1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2009 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21
Just a test application for threads.
27
#include "drizzled/option.h"
30
#include <sys/types.h>
32
#include <sys/types.h>
37
#include <string.h> /* Pull in memset() */
45
#define snprintf _snprintf
50
#define DEFAULT_INITIAL_LOAD 10000
51
#define DEFAULT_EXECUTE_SECONDS 120
52
#define TEST_FILENAME "concurrency_test.az"
54
#define HUGE_STRING_LENGTH 8192
56
/*
 * Shared synchronization state used by every thread in this test program.
 */
/* Global Thread counter */
/*
 * scheduler() holds counter_mutex and waits on count_threshhold for as long
 * as thread_counter is non-zero; run_concurrent_task() signals
 * count_threshhold while holding counter_mutex.
 */
57
unsigned int thread_counter;
58
pthread_mutex_t counter_mutex;
59
pthread_cond_t count_threshhold;
60
/*
 * Start gate: timer_thread() and run_concurrent_task() wait on
 * sleep_threshhold (under sleeper_mutex) while master_wakeup is non-zero;
 * scheduler() broadcasts sleep_threshhold to release them.
 */
unsigned int master_wakeup;
61
pthread_mutex_t sleeper_mutex;
62
pthread_cond_t sleep_threshhold;
63
/*
 * Flag tested by run_concurrent_task() (timer_alarm == false).
 * timer_thread() performs its timed wait on timer_alarm_threshold while
 * holding timer_alarm_mutex.
 */
static bool timer_alarm= false;
64
pthread_mutex_t timer_alarm_mutex;
65
pthread_cond_t timer_alarm_threshold;
67
/* Held around the azwrite_row() call in write_row(). */
pthread_mutex_t row_lock;
71
/*
 * Forward declarations.
 * run_concurrent_task() and timer_thread() are the start routines that
 * scheduler() hands to pthread_create().
 */
void *run_concurrent_task(void *p);
72
void *timer_thread(void *p);
74
/* Runs one test pass; use_aio is copied into every thread context. */
void scheduler(az_method use_aio);
75
/* Opens TEST_FILENAME through write_handler and writes `rows` rows to it. */
void create_data_file(azio_stream *write_handler, uint64_t rows);
76
/* Writes one randomly generated row to stream s. */
unsigned int write_row(azio_stream *s);
78
/*
 * Work description handed to run_concurrent_task() through the void*
 * thread argument.
 * NOTE(review): only the first member is visible in this excerpt;
 * scheduler() also assigns members named writer, counter and use_aio.
 */
typedef struct thread_context_st thread_context_st;
79
struct thread_context_st {
80
/*
 * scheduler() sets this to random()%1000 and run_concurrent_task() uses it
 * as the divisor in `count % how_often_to_write`.
 * NOTE(review): a value of 0 would make that a modulo by zero.
 */
unsigned int how_often_to_write;
86
/*
 * Character pool used for random string generation: the ten digits followed
 * by the complete upper-case and lower-case ASCII alphabets (62 characters).
 * Every letter must be present, including 'U' and 'V'.
 */
static const char ALPHANUMERICS[]=
  "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

/* Number of selectable characters in ALPHANUMERICS (excludes the NUL). */
#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)
92
/*
 * Fill `buffer` with characters drawn at random from ALPHANUMERICS.
 *
 * buffer  destination for the generated characters
 * size    requested length; write_row() passes a value derived from
 *         random() % HUGE_STRING_LENGTH
 *
 * NOTE(review): the braces and whatever loop consumes `size` are not
 * visible in this excerpt; none of the visible statements writes a NUL
 * terminator.
 */
static void get_random_string(char *buffer, size_t size)
94
/* Write cursor that advances through buffer one character at a time. */
char *buffer_ptr= buffer;
97
*buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
98
*buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
101
/*
 * Program entry point: initializes the shared mutexes and condition
 * variables, runs scheduler() once for every az_method value from
 * AZ_METHOD_BLOCK up to (but not including) AZ_METHOD_MAX, then destroys
 * the synchronization objects again.
 *
 * NOTE(review): the declaration of `method`, the braces and the return
 * statement are not visible in this excerpt.
 */
int main(int argc, char *argv[])
105
drizzled::internal::my_init();
114
/* Default attributes (NULL) for every mutex and condition variable.
 * The return values of the init calls are not checked. */
pthread_mutex_init(&counter_mutex, NULL);
115
pthread_cond_init(&count_threshhold, NULL);
116
pthread_mutex_init(&sleeper_mutex, NULL);
117
pthread_cond_init(&sleep_threshhold, NULL);
118
pthread_mutex_init(&timer_alarm_mutex, NULL);
119
pthread_cond_init(&timer_alarm_threshold, NULL);
120
pthread_mutex_init(&row_lock, NULL);
122
/* One complete scheduler() pass per access method. */
for (method= AZ_METHOD_BLOCK; method < AZ_METHOD_MAX; method++)
123
scheduler((az_method)method);
125
/* Tear down everything that was initialized above. */
(void)pthread_mutex_destroy(&counter_mutex);
126
(void)pthread_cond_destroy(&count_threshhold);
127
(void)pthread_mutex_destroy(&sleeper_mutex);
128
(void)pthread_cond_destroy(&sleep_threshhold);
129
pthread_mutex_destroy(&timer_alarm_mutex);
130
pthread_cond_destroy(&timer_alarm_threshold);
131
pthread_mutex_destroy(&row_lock);
136
/*
 * Run one test pass for the given access method.
 *
 * Creates the data file with DEFAULT_INITIAL_LOAD rows, starts
 * DEFAULT_CONCURRENCY detached threads running run_concurrent_task() plus
 * one detached timer_thread(), broadcasts sleep_threshhold to release them,
 * waits while thread_counter is non-zero, and finally prints the sum of the
 * per-context counters.
 *
 * use_aio  copied into the use_aio member of every thread context
 *
 * NOTE(review): braces, the declarations of `x` and `total`, and several
 * statements between the visible lines are missing from this excerpt.
 */
void scheduler(az_method use_aio)
140
azio_stream writer_handle;
141
thread_context_st *context;
142
pthread_t mainthread; /* Thread descriptor */
143
pthread_attr_t attr; /* Thread attributes */
145
/* Every thread created below is detached, so none of them is joined. */
pthread_attr_init(&attr);
146
pthread_attr_setdetachstate(&attr,
147
PTHREAD_CREATE_DETACHED);
149
/* counter_mutex stays locked until all threads have been created. */
pthread_mutex_lock(&counter_mutex);
152
create_data_file(&writer_handle, DEFAULT_INITIAL_LOAD);
154
pthread_mutex_lock(&sleeper_mutex);
156
pthread_mutex_unlock(&sleeper_mutex);
158
/*
 * NOTE(review): memset() is applied to the malloc() result before the
 * allocation-failure message below, so a NULL return would be dereferenced
 * first. Any NULL check should come before the memset().
 */
context= (thread_context_st *)malloc(sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
159
memset(context, 0, sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
163
fprintf(stderr, "Could not allocate memory for context\n");
167
for (x= 0; x < DEFAULT_CONCURRENCY; x++)
170
/* random()%1000 can yield 0; this value is later used as a divisor. */
context[x].how_often_to_write= random()%1000;
171
/* All contexts share the single writer stream owned by this function. */
context[x].writer= &writer_handle;
172
context[x].counter= 0;
173
context[x].use_aio= use_aio;
175
/* now you create the thread */
/*
 * NOTE(review): the thread argument is `context` (the array base), not
 * &context[x]; as written every worker receives element 0.
 */
176
if (pthread_create(&mainthread, &attr, run_concurrent_task,
177
(void *)context) != 0)
179
fprintf(stderr,"Could not create thread\n");
185
if (DEFAULT_EXECUTE_SECONDS)
187
/*
 * NOTE(review): the address of this local is passed to a detached thread;
 * timer_thread() dereferences it only after its wait on sleep_threshhold.
 * Confirm the variable is still alive at that point.
 */
time_t opt_timer_length= DEFAULT_EXECUTE_SECONDS;
188
pthread_mutex_lock(&timer_alarm_mutex);
190
pthread_mutex_unlock(&timer_alarm_mutex);
192
if (pthread_create(&mainthread, &attr, timer_thread,
193
(void *)&opt_timer_length) != 0)
195
fprintf(stderr,"%s: Could not create timer thread\n", drizzled::internal::my_progname);
200
pthread_mutex_unlock(&counter_mutex);
201
pthread_attr_destroy(&attr);
203
/* Release every thread parked on the start gate. */
pthread_mutex_lock(&sleeper_mutex);
205
pthread_mutex_unlock(&sleeper_mutex);
206
pthread_cond_broadcast(&sleep_threshhold);
209
We loop until we know that all children have cleaned up.
211
pthread_mutex_lock(&counter_mutex);
212
while (thread_counter)
214
struct timespec abstime;
216
/*
 * NOTE(review): pthread_cond_timedwait() takes an absolute time. As far as
 * is visible here abstime is all zeros, i.e. already in the past. Confirm
 * how it is set before the wait.
 */
memset(&abstime, 0, sizeof(struct timespec));
219
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
221
pthread_mutex_unlock(&counter_mutex);
223
/* Sum the per-context counters for the final report. */
for (total= x= 0; x < DEFAULT_CONCURRENCY; x++)
224
total+= context[x].counter;
227
azclose(&writer_handle);
229
printf("Read %"PRIu64" rows\n", total);
232
/*
 * Thread start routine for the run timer.
 *
 * p  points to a time_t holding the timer length; scheduler() passes the
 *    address of its opt_timer_length variable
 *
 * Waits on sleep_threshhold while master_wakeup is non-zero, builds an
 * absolute deadline with set_timespec(), performs one timed wait on
 * timer_alarm_threshold, then takes timer_alarm_mutex once more.
 *
 * NOTE(review): braces, the statement between the final lock/unlock pair
 * and the return statement are not visible in this excerpt.
 */
void *timer_thread(void *p)
234
time_t *timer_length= (time_t *)p;
235
struct timespec abstime;
238
We lock around the initial call in case were we in a loop. This
239
also keeps the value properly syncronized across call threads.
241
/* Park on the start gate until scheduler() broadcasts sleep_threshhold. */
pthread_mutex_lock(&sleeper_mutex);
242
while (master_wakeup)
244
pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
246
pthread_mutex_unlock(&sleeper_mutex);
248
set_timespec(abstime, *timer_length);
250
/*
 * The return value of pthread_cond_timedwait() is not checked, so a timeout
 * and any other wakeup are handled the same way.
 */
pthread_mutex_lock(&timer_alarm_mutex);
251
pthread_cond_timedwait(&timer_alarm_threshold, &timer_alarm_mutex, &abstime);
252
pthread_mutex_unlock(&timer_alarm_mutex);
254
pthread_mutex_lock(&timer_alarm_mutex);
256
pthread_mutex_unlock(&timer_alarm_mutex);
261
/*
 * Thread start routine for a reader/writer worker.
 *
 * p  points to the thread_context_st supplied by scheduler()
 *
 * Opens TEST_FILENAME read-only, waits on the start gate, then reads rows
 * with azread_row(), calling write_row() on the shared writer depending on
 * `count % how_often_to_write`. It tests timer_alarm, signals
 * count_threshhold under counter_mutex and closes its reader.
 *
 * NOTE(review): braces, the declarations of ret/error/count, the updates
 * to count, the context counter and thread_counter, and the return
 * statement are not visible in this excerpt.
 */
void *run_concurrent_task(void *p)
263
thread_context_st *context= (thread_context_st *)p;
267
azio_stream reader_handle;
269
if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY,
272
printf("Could not open test file\n");
276
/* Park on the start gate until scheduler() broadcasts sleep_threshhold. */
pthread_mutex_lock(&sleeper_mutex);
277
while (master_wakeup)
279
pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
281
pthread_mutex_unlock(&sleeper_mutex);
287
azread_init(&reader_handle);
288
while ((ret= azread_row(&reader_handle, &error)))
291
/*
 * NOTE(review): how_often_to_write comes from random()%1000 and may be 0,
 * which makes this a modulo by zero. The test is true whenever the
 * remainder is NON-zero, so confirm that this is the intended write
 * frequency.
 */
if (count % context->how_often_to_write)
293
write_row(context->writer);
296
/* If the timer is set, and the alarm is not active then end */
/* NOTE(review): no lock is visible around this read of timer_alarm. */
297
if (timer_alarm == false)
301
/* Wake scheduler(), which waits on count_threshhold with counter_mutex. */
pthread_mutex_lock(&counter_mutex);
303
pthread_cond_signal(&count_threshhold);
304
pthread_mutex_unlock(&counter_mutex);
305
azclose(&reader_handle);
310
/*
 * Create (or truncate) TEST_FILENAME and fill it with initial data.
 *
 * write_handler  stream that is opened here; scheduler() keeps using it and
 *                closes it with azclose()
 * rows           number of rows written through write_row()
 *
 * NOTE(review): braces, the declarations of ret and x, and the rest of the
 * azopen() call are not visible in this excerpt.
 */
void create_data_file(azio_stream *write_handler, uint64_t rows)
315
if (!(ret= azopen(write_handler, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC,
318
printf("Could not create test file\n");
322
for (x= 0; x < rows; x++)
323
write_row(write_handler);
325
/* Flush the stream once all initial rows have been written. */
azflush(write_handler, Z_SYNC_FLUSH);
328
/*
 * Generate one random row and append it to stream s.
 *
 * s  destination stream; the azwrite_row() call is serialized with row_lock
 *    because worker threads share one writer
 *
 * NOTE(review): braces, the declaration of `length`, the statement that
 * follows the "Avoid zero length strings" comment and the return statement
 * are not visible in this excerpt. The result of azwrite_row() is not
 * checked.
 */
unsigned int write_row(azio_stream *s)
331
char buffer[HUGE_STRING_LENGTH];
333
/* Yields a value in the range 0 .. HUGE_STRING_LENGTH-1. */
length= random() % HUGE_STRING_LENGTH;
335
/* Avoid zero length strings */
338
get_random_string(buffer, length);
339
pthread_mutex_lock(&row_lock);
340
azwrite_row(s, buffer, length);
341
pthread_mutex_unlock(&row_lock);