~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/concurrency_test.c

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
  Just a test application for threads.
 
3
  */
 
4
#include "azio.h"
 
5
#include <mysql.h>
 
6
#include <my_getopt.h>
 
7
#include <mysql_version.h>
 
8
#include <stdio.h>
 
9
#include <stdlib.h>
 
10
#include <sys/types.h>
 
11
#include <sys/stat.h>
 
12
#include <sys/types.h>
 
13
#include <sys/mman.h>
 
14
#include <fcntl.h>
 
15
#include <sys/time.h>
 
16
#include <pthread.h>
 
17
#include <strings.h>
 
18
#ifndef __WIN__
 
19
#include <sys/wait.h>
 
20
#endif
 
21
 
 
22
#ifdef __WIN__
 
23
#define srandom  srand
 
24
#define random   rand
 
25
#define snprintf _snprintf
 
26
#endif
 
27
 
 
28
#include "azio.h"
 
29
 
 
30
#define DEFAULT_INITIAL_LOAD 10000
 
31
#define DEFAULT_EXECUTE_SECONDS 120
 
32
#define DEFAULT_CONCURRENCY 8
 
33
#define TEST_FILENAME "concurrency_test.az"
 
34
 
 
35
#define HUGE_STRING_LENGTH 8192
 
36
 
 
37
/* Global Thread counter */
 
38
unsigned int thread_counter;
 
39
pthread_mutex_t counter_mutex;
 
40
pthread_cond_t count_threshhold;
 
41
unsigned int master_wakeup;
 
42
pthread_mutex_t sleeper_mutex;
 
43
pthread_cond_t sleep_threshhold;
 
44
static my_bool timer_alarm= FALSE;
 
45
pthread_mutex_t timer_alarm_mutex;
 
46
pthread_cond_t timer_alarm_threshold;
 
47
 
 
48
pthread_mutex_t row_lock;
 
49
 
 
50
/* Prototypes */
 
51
void *run_task(void *p);
 
52
void *timer_thread(void *p);
 
53
void scheduler(az_method use_aio);
 
54
void create_data_file(azio_stream *write_handler, unsigned long long rows);
 
55
unsigned int write_row(azio_stream *s);
 
56
 
 
57
typedef struct thread_context_st thread_context_st;
 
58
struct thread_context_st {
 
59
  unsigned int how_often_to_write;
 
60
  unsigned long long counter;
 
61
  az_method use_aio;
 
62
  azio_stream *writer;
 
63
};
 
64
 
 
65
/* Use this for string generation */
 
66
static const char ALPHANUMERICS[]=
 
67
  "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
 
68
 
 
69
#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)
 
70
 
 
71
static void get_random_string(char *buffer, size_t size)
 
72
{
 
73
  char *buffer_ptr= buffer;
 
74
 
 
75
  while (--size)
 
76
    *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
 
77
  *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
 
78
}
 
79
 
 
80
int main(int argc, char *argv[])
 
81
{
 
82
 
 
83
  az_method method;
 
84
  my_init();
 
85
 
 
86
  MY_INIT(argv[0]);
 
87
 
 
88
  if (argc > 1)
 
89
    exit(1);
 
90
 
 
91
  if (!(mysql_thread_safe()))
 
92
      fprintf(stderr, "This application was compiled incorrectly. Please recompile with thread support.\n");
 
93
 
 
94
  srandom(time(NULL));
 
95
 
 
96
  pthread_mutex_init(&counter_mutex, NULL);
 
97
  pthread_cond_init(&count_threshhold, NULL);
 
98
  pthread_mutex_init(&sleeper_mutex, NULL);
 
99
  pthread_cond_init(&sleep_threshhold, NULL);
 
100
  VOID(pthread_mutex_init(&timer_alarm_mutex, NULL));
 
101
  VOID(pthread_cond_init(&timer_alarm_threshold, NULL));
 
102
  VOID(pthread_mutex_init(&row_lock, NULL));
 
103
 
 
104
  for (method= AZ_METHOD_BLOCK; method < AZ_METHOD_MAX; method++)
 
105
    scheduler(method);
 
106
 
 
107
  (void)pthread_mutex_destroy(&counter_mutex);
 
108
  (void)pthread_cond_destroy(&count_threshhold);
 
109
  (void)pthread_mutex_destroy(&sleeper_mutex);
 
110
  (void)pthread_cond_destroy(&sleep_threshhold);
 
111
  VOID(pthread_mutex_destroy(&timer_alarm_mutex));
 
112
  VOID(pthread_cond_destroy(&timer_alarm_threshold));
 
113
  VOID(pthread_mutex_destroy(&row_lock));
 
114
 
 
115
  return 0;
 
116
}
 
117
 
 
118
void scheduler(az_method use_aio)
 
119
{
 
120
  unsigned int x;
 
121
  unsigned long long total;
 
122
  azio_stream writer_handle;
 
123
  thread_context_st *context;
 
124
  pthread_t mainthread;            /* Thread descriptor */
 
125
  pthread_attr_t attr;          /* Thread attributes */
 
126
 
 
127
  pthread_attr_init(&attr);
 
128
  pthread_attr_setdetachstate(&attr,
 
129
                              PTHREAD_CREATE_DETACHED);
 
130
 
 
131
  pthread_mutex_lock(&counter_mutex);
 
132
  thread_counter= 0;
 
133
 
 
134
  create_data_file(&writer_handle, DEFAULT_INITIAL_LOAD);
 
135
 
 
136
  pthread_mutex_lock(&sleeper_mutex);
 
137
  master_wakeup= 1;
 
138
  pthread_mutex_unlock(&sleeper_mutex);
 
139
 
 
140
  context= (thread_context_st *)malloc(sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
 
141
  bzero(context, sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
 
142
 
 
143
  if (!context)
 
144
  {
 
145
    fprintf(stderr, "Could not allocate memory for context\n");
 
146
    exit(1);
 
147
  }
 
148
 
 
149
  for (x= 0; x < DEFAULT_CONCURRENCY; x++)
 
150
  {
 
151
 
 
152
    context[x].how_often_to_write= random()%1000;
 
153
    context[x].writer= &writer_handle;
 
154
    context[x].counter= 0;
 
155
    context[x].use_aio= use_aio;
 
156
 
 
157
    /* now you create the thread */
 
158
    if (pthread_create(&mainthread, &attr, run_task,
 
159
                       (void *)context) != 0)
 
160
    {
 
161
      fprintf(stderr,"Could not create thread\n");
 
162
      exit(1);
 
163
    }
 
164
    thread_counter++;
 
165
  }
 
166
 
 
167
  if (DEFAULT_EXECUTE_SECONDS)
 
168
  {
 
169
    time_t opt_timer_length= DEFAULT_EXECUTE_SECONDS;
 
170
    pthread_mutex_lock(&timer_alarm_mutex);
 
171
    timer_alarm= TRUE;
 
172
    pthread_mutex_unlock(&timer_alarm_mutex);
 
173
 
 
174
    if (pthread_create(&mainthread, &attr, timer_thread, 
 
175
                       (void *)&opt_timer_length) != 0)
 
176
    {
 
177
      fprintf(stderr,"%s: Could not create timer thread\n", my_progname);
 
178
      exit(1);
 
179
    }
 
180
  }
 
181
 
 
182
  pthread_mutex_unlock(&counter_mutex);
 
183
  pthread_attr_destroy(&attr);
 
184
 
 
185
  pthread_mutex_lock(&sleeper_mutex);
 
186
  master_wakeup= 0;
 
187
  pthread_mutex_unlock(&sleeper_mutex);
 
188
  pthread_cond_broadcast(&sleep_threshhold);
 
189
 
 
190
  /*
 
191
    We loop until we know that all children have cleaned up.
 
192
  */
 
193
  pthread_mutex_lock(&counter_mutex);
 
194
  while (thread_counter)
 
195
  {
 
196
    struct timespec abstime;
 
197
 
 
198
    bzero(&abstime, sizeof(struct timespec));
 
199
    abstime.tv_sec= 1;
 
200
 
 
201
    pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
 
202
  }
 
203
  pthread_mutex_unlock(&counter_mutex);
 
204
 
 
205
  for (total= x= 0; x < DEFAULT_CONCURRENCY; x++)
 
206
    total+= context[x].counter;
 
207
 
 
208
  free(context);
 
209
  azclose(&writer_handle);
 
210
 
 
211
  printf("Read %llu rows\n", total);
 
212
}
 
213
 
 
214
void *timer_thread(void *p)
 
215
{
 
216
  time_t *timer_length= (time_t *)p;
 
217
  struct timespec abstime;
 
218
 
 
219
  if (mysql_thread_init())
 
220
  {
 
221
    fprintf(stderr,"%s: mysql_thread_init() failed.\n",
 
222
            my_progname);
 
223
    exit(1);
 
224
  }
 
225
 
 
226
  /* 
 
227
    We lock around the initial call in case were we in a loop. This 
 
228
    also keeps the value properly syncronized across call threads.
 
229
  */
 
230
  pthread_mutex_lock(&sleeper_mutex);
 
231
  while (master_wakeup)
 
232
  {
 
233
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
 
234
  }
 
235
  pthread_mutex_unlock(&sleeper_mutex);
 
236
 
 
237
  set_timespec(abstime, *timer_length);
 
238
 
 
239
  pthread_mutex_lock(&timer_alarm_mutex);
 
240
  pthread_cond_timedwait(&timer_alarm_threshold, &timer_alarm_mutex, &abstime);
 
241
  pthread_mutex_unlock(&timer_alarm_mutex);
 
242
 
 
243
  pthread_mutex_lock(&timer_alarm_mutex);
 
244
  timer_alarm= FALSE;
 
245
  pthread_mutex_unlock(&timer_alarm_mutex);
 
246
 
 
247
  mysql_thread_end();
 
248
 
 
249
  return 0;
 
250
}
 
251
 
 
252
void *run_task(void *p)
 
253
{
 
254
  thread_context_st *context= (thread_context_st *)p;
 
255
  unsigned long long count;
 
256
  int ret;
 
257
  int error;
 
258
  azio_stream reader_handle;
 
259
 
 
260
  if (mysql_thread_init())
 
261
  {
 
262
    fprintf(stderr,"%s: mysql_thread_init() failed.\n", my_progname);
 
263
    exit(1);
 
264
  }
 
265
 
 
266
  if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY,
 
267
                    context->use_aio)))
 
268
  {
 
269
    printf("Could not open test file\n");
 
270
    return 0;
 
271
  }
 
272
 
 
273
  pthread_mutex_lock(&sleeper_mutex);
 
274
  while (master_wakeup)
 
275
  {
 
276
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
 
277
  } 
 
278
  pthread_mutex_unlock(&sleeper_mutex);
 
279
 
 
280
  /* Do Stuff */
 
281
  count= 0;
 
282
  while (1)
 
283
  {
 
284
    azread_init(&reader_handle);
 
285
    while ((ret= azread_row(&reader_handle, &error)))
 
286
      context->counter++;
 
287
 
 
288
    if (count % context->how_often_to_write)
 
289
    {
 
290
      write_row(context->writer);
 
291
    }
 
292
 
 
293
    /* If the timer is set, and the alarm is not active then end */
 
294
    if (timer_alarm == FALSE)
 
295
      break;
 
296
  }
 
297
 
 
298
  pthread_mutex_lock(&counter_mutex);
 
299
  thread_counter--;
 
300
  pthread_cond_signal(&count_threshhold);
 
301
  pthread_mutex_unlock(&counter_mutex);
 
302
  azclose(&reader_handle);
 
303
 
 
304
  mysql_thread_end();
 
305
 
 
306
  return NULL;
 
307
}
 
308
 
 
309
void create_data_file(azio_stream *write_handler, unsigned long long rows)
 
310
{
 
311
  int ret;
 
312
  unsigned long long x;
 
313
 
 
314
  if (!(ret= azopen(write_handler, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC|O_BINARY,
 
315
                    AZ_METHOD_BLOCK)))
 
316
  {
 
317
    printf("Could not create test file\n");
 
318
    exit(1);
 
319
  }
 
320
 
 
321
  for (x= 0; x < rows; x++)
 
322
    write_row(write_handler);
 
323
 
 
324
  azflush(write_handler, Z_SYNC_FLUSH);
 
325
}
 
326
 
 
327
unsigned int write_row(azio_stream *s)
 
328
{
 
329
  size_t length;
 
330
  char buffer[HUGE_STRING_LENGTH];
 
331
 
 
332
  length= random() % HUGE_STRING_LENGTH;
 
333
 
 
334
  /* Avoid zero length strings */
 
335
  length++;
 
336
 
 
337
  get_random_string(buffer, length); 
 
338
  pthread_mutex_lock(&row_lock);
 
339
  azwrite_row(s, buffer, length);
 
340
  pthread_mutex_unlock(&row_lock);
 
341
 
 
342
  return 0;
 
343
}