116
119
/* Global Thread counter */
117
120
uint32_t thread_counter;
118
pthread_mutex_t counter_mutex;
119
pthread_cond_t count_threshhold;
120
uint32_t master_wakeup;
121
pthread_mutex_t sleeper_mutex;
122
pthread_cond_t sleep_threshhold;
121
boost::mutex counter_mutex;
122
boost::condition_variable_any count_threshhold;
124
boost::mutex sleeper_mutex;
125
boost::condition_variable_any sleep_threshhold;
124
127
/* Global Thread timer */
125
128
static bool timer_alarm= false;
126
pthread_mutex_t timer_alarm_mutex;
127
pthread_cond_t timer_alarm_threshold;
129
boost::mutex timer_alarm_mutex;
130
boost::condition_variable_any timer_alarm_threshold;
129
132
std::vector < std::string > primary_keys;
264
265
user_supplied_query.append(delimiter);
270
static void run_task(ThreadContext *ctx)
272
uint64_t counter= 0, queries;
273
uint64_t detach_counter;
274
unsigned int commit_counter;
276
drizzle_result_st result;
280
sleeper_mutex.lock();
281
while (master_wakeup)
283
sleep_threshhold.wait(sleeper_mutex);
285
sleeper_mutex.unlock();
287
slap_connect(&con, true);
290
printf("connected!\n");
295
run_query(&con, NULL, "SET AUTOCOMMIT=0", strlen("SET AUTOCOMMIT=0"));
298
for (ptr= ctx->getStmt(), detach_counter= 0;
299
ptr && ptr->getLength();
300
ptr= ptr->getNext(), detach_counter++)
302
if (!opt_only_print && detach_rate && !(detach_counter % detach_rate))
305
slap_connect(&con, true);
309
We have to execute differently based on query type. This should become a function.
311
if ((ptr->getType() == UPDATE_TYPE_REQUIRES_PREFIX) ||
312
(ptr->getType() == SELECT_TYPE_REQUIRES_PREFIX))
315
unsigned int key_val;
316
char buffer[HUGE_STRING_LENGTH];
319
This should only happen if some sort of new engine was
320
implemented that didn't properly handle UPDATEs.
322
Just in case someone runs this under an experimental engine we don't
323
want a crash so the if() is placed here.
325
assert(primary_keys.size());
326
if (primary_keys.size())
328
key_val= (unsigned int)(random() % primary_keys.size());
330
key= primary_keys[key_val].c_str();
334
length= snprintf(buffer, HUGE_STRING_LENGTH, "%.*s '%s'",
335
(int)ptr->getLength(), ptr->getString(), key);
337
if (run_query(&con, &result, buffer, length))
339
fprintf(stderr,"%s: Cannot run query %.*s ERROR : %s\n",
340
internal::my_progname, (uint32_t)length, buffer, drizzle_con_error(&con));
347
if (run_query(&con, &result, ptr->getString(), ptr->getLength()))
349
fprintf(stderr,"%s: Cannot run query %.*s ERROR : %s\n",
350
internal::my_progname, (uint32_t)ptr->getLength(), ptr->getString(), drizzle_con_error(&con));
357
while ((row = drizzle_row_next(&result)))
359
drizzle_result_free(&result);
363
if (commit_rate && (++commit_counter == commit_rate))
366
run_query(&con, NULL, "COMMIT", strlen("COMMIT"));
369
/* If the timer is set, and the alarm is not active then end */
370
if (opt_timer_length && timer_alarm == false)
373
/* If limit has been reached, and we are not in a timer_alarm just end */
374
if (ctx->getLimit() && queries == ctx->getLimit() && timer_alarm == false)
378
if (opt_timer_length && timer_alarm == true)
381
if (ctx->getLimit() && queries < ctx->getLimit())
387
run_query(&con, NULL, "COMMIT", strlen("COMMIT"));
392
boost::mutex::scoped_lock scopedLock(counter_mutex);
394
count_threshhold.notify_one();
268
401
* commandline_options is the set of all options that can only be called via the command line.
1921
static void timer_thread()
1924
We lock around the initial call in case we are in a loop. This
1925
also keeps the value properly synchronized across call threads.
1927
sleeper_mutex.lock();
1928
while (master_wakeup)
1930
sleep_threshhold.wait(sleeper_mutex);
1932
sleeper_mutex.unlock();
1935
boost::mutex::scoped_lock scopedLock(timer_alarm_mutex);
1938
xtime_get(&xt, boost::TIME_UTC);
1939
xt.sec += opt_timer_length;
1941
(void)timer_alarm_threshold.timed_wait(scopedLock, xt);
1945
boost::mutex::scoped_lock scopedLock(timer_alarm_mutex);
1803
1951
run_scheduler(Stats *sptr, Statement **stmts, uint32_t concur, uint64_t limit)
1806
1953
unsigned int real_concurrency;
1807
1954
struct timeval start_time, end_time;
1808
OptionString *sql_type;
1809
pthread_t mainthread; /* Thread descriptor */
1810
pthread_attr_t attr; /* Thread attributes */
1813
pthread_attr_init(&attr);
1814
pthread_attr_setdetachstate(&attr,
1815
PTHREAD_CREATE_DETACHED);
1817
pthread_mutex_lock(&counter_mutex);
1820
pthread_mutex_lock(&sleeper_mutex);
1822
pthread_mutex_unlock(&sleeper_mutex);
1824
real_concurrency= 0;
1826
for (y= 0, sql_type= query_options;
1827
y < query_statements_count;
1828
y++, sql_type= sql_type->getNext())
1830
unsigned int options_loop= 1;
1832
if (sql_type->getOption())
1834
options_loop= strtol(sql_type->getOption(),
1836
options_loop= options_loop ? options_loop : 1;
1839
while (options_loop--)
1841
for (uint32_t x= 0; x < concur; x++)
1844
con= new ThreadContext;
1847
fprintf(stderr, "Memory Allocation error in scheduler\n");
1850
con->setStmt(stmts[y]);
1851
con->setLimit(limit);
1854
/* now you create the thread */
1855
if (pthread_create(&mainthread, &attr, run_task,
1858
fprintf(stderr,"%s: Could not create thread\n", internal::my_progname);
1867
The timer_thread belongs to all threads so it too obeys the wakeup
1868
call that run tasks obey.
1870
if (opt_timer_length)
1872
pthread_mutex_lock(&timer_alarm_mutex);
1874
pthread_mutex_unlock(&timer_alarm_mutex);
1876
if (pthread_create(&mainthread, &attr, timer_thread,
1877
(void *)&opt_timer_length) != 0)
1879
fprintf(stderr,"%s: Could not create timer thread\n", internal::my_progname);
1884
pthread_mutex_unlock(&counter_mutex);
1885
pthread_attr_destroy(&attr);
1887
pthread_mutex_lock(&sleeper_mutex);
1889
pthread_mutex_unlock(&sleeper_mutex);
1890
pthread_cond_broadcast(&sleep_threshhold);
1957
boost::mutex::scoped_lock lock(counter_mutex);
1959
OptionString *sql_type;
1963
boost::mutex::scoped_lock scopedWakeup(sleeper_mutex);
1964
master_wakeup= true;
1967
real_concurrency= 0;
1970
for (y= 0, sql_type= query_options;
1971
y < query_statements_count;
1972
y++, sql_type= sql_type->getNext())
1974
unsigned int options_loop= 1;
1976
if (sql_type->getOption())
1978
options_loop= strtol(sql_type->getOption(),
1980
options_loop= options_loop ? options_loop : 1;
1983
while (options_loop--)
1985
for (uint32_t x= 0; x < concur; x++)
1988
con= new ThreadContext;
1991
fprintf(stderr, "Memory Allocation error in scheduler\n");
1994
con->setStmt(stmts[y]);
1995
con->setLimit(limit);
1999
/* now you create the thread */
2000
boost::thread new_thread(boost::bind(&run_task, con));
2007
The timer_thread belongs to all threads so it too obeys the wakeup
2008
call that run tasks obey.
2010
if (opt_timer_length)
2013
boost::mutex::scoped_lock alarmLock(timer_alarm_mutex);
2017
boost::thread new_thread(&timer_thread);
2022
boost::mutex::scoped_lock scopedSleeper(sleeper_mutex);
2023
master_wakeup= false;
2025
sleep_threshhold.notify_all();
1892
2027
gettimeofday(&start_time, NULL);
1895
2030
We loop until we know that all children have cleaned up.
1897
pthread_mutex_lock(&counter_mutex);
1898
while (thread_counter)
1900
struct timespec abstime;
2033
boost::mutex::scoped_lock scopedLock(counter_mutex);
2034
while (thread_counter)
2037
xtime_get(&xt, boost::TIME_UTC);
1902
set_timespec(abstime, 3);
1903
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
2040
count_threshhold.timed_wait(scopedLock, xt);
1905
pthread_mutex_unlock(&counter_mutex);
1907
2044
gettimeofday(&end_time, NULL);
1910
2046
sptr->setTiming(timedif(end_time, start_time));
1911
2047
sptr->setUsers(concur);
1912
2048
sptr->setRealUsers(real_concurrency);
1919
pthread_handler_t timer_thread(void *p)
1921
uint32_t *timer_length= (uint32_t *)p;
1922
struct timespec abstime;
1926
We lock around the initial call in case we are in a loop. This
1927
also keeps the value properly synchronized across call threads.
1929
pthread_mutex_lock(&sleeper_mutex);
1930
while (master_wakeup)
1932
pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
1934
pthread_mutex_unlock(&sleeper_mutex);
1936
set_timespec(abstime, *timer_length);
1938
pthread_mutex_lock(&timer_alarm_mutex);
1939
pthread_cond_timedwait(&timer_alarm_threshold, &timer_alarm_mutex, &abstime);
1940
pthread_mutex_unlock(&timer_alarm_mutex);
1942
pthread_mutex_lock(&timer_alarm_mutex);
1944
pthread_mutex_unlock(&timer_alarm_mutex);
1949
pthread_handler_t run_task(void *p)
1951
uint64_t counter= 0, queries;
1952
uint64_t detach_counter;
1953
unsigned int commit_counter;
1955
drizzle_result_st result;
1958
ThreadContext *ctx= (ThreadContext *)p;
1960
pthread_mutex_lock(&sleeper_mutex);
1961
while (master_wakeup)
1963
pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
1965
pthread_mutex_unlock(&sleeper_mutex);
1967
slap_connect(&con, true);
1970
printf("connected!\n");
1975
run_query(&con, NULL, "SET AUTOCOMMIT=0", strlen("SET AUTOCOMMIT=0"));
1978
for (ptr= ctx->getStmt(), detach_counter= 0;
1979
ptr && ptr->getLength();
1980
ptr= ptr->getNext(), detach_counter++)
1982
if (!opt_only_print && detach_rate && !(detach_counter % detach_rate))
1985
slap_connect(&con, true);
1989
We have to execute differently based on query type. This should become a function.
1991
if ((ptr->getType() == UPDATE_TYPE_REQUIRES_PREFIX) ||
1992
(ptr->getType() == SELECT_TYPE_REQUIRES_PREFIX))
1995
unsigned int key_val;
1996
char buffer[HUGE_STRING_LENGTH];
1999
This should only happen if some sort of new engine was
2000
implemented that didn't properly handle UPDATEs.
2002
Just in case someone runs this under an experimental engine we don't
2003
want a crash so the if() is placed here.
2005
assert(primary_keys.size());
2006
if (primary_keys.size())
2008
key_val= (unsigned int)(random() % primary_keys.size());
2010
key= primary_keys[key_val].c_str();
2014
length= snprintf(buffer, HUGE_STRING_LENGTH, "%.*s '%s'",
2015
(int)ptr->getLength(), ptr->getString(), key);
2017
if (run_query(&con, &result, buffer, length))
2019
fprintf(stderr,"%s: Cannot run query %.*s ERROR : %s\n",
2020
internal::my_progname, (uint32_t)length, buffer, drizzle_con_error(&con));
2027
if (run_query(&con, &result, ptr->getString(), ptr->getLength()))
2029
fprintf(stderr,"%s: Cannot run query %.*s ERROR : %s\n",
2030
internal::my_progname, (uint32_t)ptr->getLength(), ptr->getString(), drizzle_con_error(&con));
2035
if (!opt_only_print)
2037
while ((row = drizzle_row_next(&result)))
2039
drizzle_result_free(&result);
2043
if (commit_rate && (++commit_counter == commit_rate))
2046
run_query(&con, NULL, "COMMIT", strlen("COMMIT"));
2049
/* If the timer is set, and the alarm is not active then end */
2050
if (opt_timer_length && timer_alarm == false)
2053
/* If limit has been reached, and we are not in a timer_alarm just end */
2054
if (ctx->getLimit() && queries == ctx->getLimit() && timer_alarm == false)
2058
if (opt_timer_length && timer_alarm == true)
2061
if (ctx->getLimit() && queries < ctx->getLimit())
2067
run_query(&con, NULL, "COMMIT", strlen("COMMIT"));
2071
pthread_mutex_lock(&counter_mutex);
2073
pthread_cond_signal(&count_threshhold);
2074
pthread_mutex_unlock(&counter_mutex);
2082
2055
Parse records from a comma-separated string. ':' is a reserved character and is used for options