~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/*
2
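  Concurrency test for azio: worker threads repeatedly scan a shared
  archive file for a fixed amount of time and report how many rows they read.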
3
  */
4
#include "azio.h"
383.1.44 by Monty Taylor
Renamed drizzle.h to libdrizzle.h.
5
#include <libdrizzle/libdrizzle.h>
212.5.21 by Monty Taylor
Moved my_getopt.h
6
#include <mysys/my_getopt.h>
1 by brian
clean slate
7
#include <stdio.h>
8
#include <stdlib.h>
9
#include <sys/types.h>
10
#include <sys/stat.h>
11
#include <sys/types.h>
12
#include <sys/mman.h>
13
#include <fcntl.h>
14
#include <sys/time.h>
15
#include <pthread.h>
212.6.1 by Mats Kindahl
Replacing all bzero() calls with memset() calls and removing the bzero.c file.
16
#include <string.h>                             /* Pull in memset() */
1 by brian
clean slate
17
#ifndef __WIN__
18
#include <sys/wait.h>
19
#endif
20
21
#ifdef __WIN__
22
#define srandom  srand
23
#define random   rand
24
#define snprintf _snprintf
25
#endif
26
27
#include "azio.h"
28
29
/* Rows written by create_data_file() before any worker is started. */
#define DEFAULT_INITIAL_LOAD 10000
/* Number of seconds handed to the timer thread. */
#define DEFAULT_EXECUTE_SECONDS 120
/* Number of run_task() threads started per scheduler() call. */
#define DEFAULT_CONCURRENCY 8
/* File that is created, written and scanned by the test. */
#define TEST_FILENAME "concurrency_test.az"

/* Size of the row buffer used by write_row(). */
#define HUGE_STRING_LENGTH 8192
35
36
/*
  Count of worker threads that have not finished yet. It is modified
  under counter_mutex; a worker signals count_threshhold after
  decrementing it.
*/
unsigned int thread_counter;
pthread_mutex_t counter_mutex;
pthread_cond_t count_threshhold;

/*
  Start gate. Threads wait on sleep_threshhold while master_wakeup is
  non-zero; scheduler() clears it under sleeper_mutex and broadcasts.
*/
unsigned int master_wakeup;
pthread_mutex_t sleeper_mutex;
pthread_cond_t sleep_threshhold;

/*
  Set to true by scheduler() when it starts the timer thread and reset to
  false by timer_thread(); both writes happen under timer_alarm_mutex.
*/
static bool timer_alarm= false;
pthread_mutex_t timer_alarm_mutex;
pthread_cond_t timer_alarm_threshold;

/* Held around azwrite_row() in write_row(). */
pthread_mutex_t row_lock;
48
49
/* Prototypes */
50
void *run_task(void *p);
51
void *timer_thread(void *p);
52
void scheduler(az_method use_aio);
53
void create_data_file(azio_stream *write_handler, unsigned long long rows);
54
unsigned int write_row(azio_stream *s);
55
56
typedef struct thread_context_st thread_context_st;
57
struct thread_context_st {
58
  unsigned int how_often_to_write;
59
  unsigned long long counter;
60
  az_method use_aio;
61
  azio_stream *writer;
62
};
63
64
/*
  Alphabet used for random row payloads. The previous literal skipped
  'U' and 'V'; the full set of digits and letters is used here.
*/
static const char ALPHANUMERICS[]=
  "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

/* Number of usable characters (the terminating NUL is excluded). */
#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)

/*
  Fill buffer with exactly `size` random characters from ALPHANUMERICS.

  buffer  destination, must have room for at least `size` bytes
  size    number of characters to generate; 0 writes nothing

  No terminating NUL is written: callers pass the length along with the
  buffer.
*/
static void get_random_string(char *buffer, size_t size)
{
  size_t i;

  /*
    An index loop instead of `while (--size)`: the pre-decrement wrapped
    around for size == 0 and ran far past the end of the buffer.
  */
  for (i= 0; i < size; i++)
    buffer[i]= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
}
78
79
int main(int argc, char *argv[])
80
{
81
82
  az_method method;
83
  my_init();
84
85
  MY_INIT(argv[0]);
86
87
  if (argc > 1)
88
    exit(1);
89
90
  srandom(time(NULL));
91
92
  pthread_mutex_init(&counter_mutex, NULL);
93
  pthread_cond_init(&count_threshhold, NULL);
94
  pthread_mutex_init(&sleeper_mutex, NULL);
95
  pthread_cond_init(&sleep_threshhold, NULL);
96
  VOID(pthread_mutex_init(&timer_alarm_mutex, NULL));
97
  VOID(pthread_cond_init(&timer_alarm_threshold, NULL));
98
  VOID(pthread_mutex_init(&row_lock, NULL));
99
100
  for (method= AZ_METHOD_BLOCK; method < AZ_METHOD_MAX; method++)
101
    scheduler(method);
102
103
  (void)pthread_mutex_destroy(&counter_mutex);
104
  (void)pthread_cond_destroy(&count_threshhold);
105
  (void)pthread_mutex_destroy(&sleeper_mutex);
106
  (void)pthread_cond_destroy(&sleep_threshhold);
107
  VOID(pthread_mutex_destroy(&timer_alarm_mutex));
108
  VOID(pthread_cond_destroy(&timer_alarm_threshold));
109
  VOID(pthread_mutex_destroy(&row_lock));
110
111
  return 0;
112
}
113
114
/*
  Run one round of the test with the given access method.

  Creates the data file, starts DEFAULT_CONCURRENCY detached run_task()
  threads plus (when DEFAULT_EXECUTE_SECONDS is non-zero) one
  timer_thread(), opens the start gate, waits until thread_counter drops
  back to zero and finally prints the sum of the per-thread row counters.

  use_aio  stored in every thread context and passed on to run_task()

  Exits the process with status 1 when memory or a thread cannot be
  obtained.
*/
void scheduler(az_method use_aio)
{
  unsigned int x;
  unsigned long long total;
  azio_stream writer_handle;
  thread_context_st *context;
  pthread_t mainthread;            /* Thread descriptor */
  pthread_attr_t attr;             /* Thread attributes */
  /*
    The timer thread receives a pointer to this value and dereferences it
    only after the start gate has opened, so it needs static storage; a
    block-scoped local would already be out of scope by then.
  */
  static time_t opt_timer_length= DEFAULT_EXECUTE_SECONDS;

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr,
                              PTHREAD_CREATE_DETACHED);

  /* Held until every thread has been created and counted. */
  pthread_mutex_lock(&counter_mutex);
  thread_counter= 0;

  create_data_file(&writer_handle, DEFAULT_INITIAL_LOAD);

  /* Close the start gate: new threads block until it is reopened below. */
  pthread_mutex_lock(&sleeper_mutex);
  master_wakeup= 1;
  pthread_mutex_unlock(&sleeper_mutex);

  /*
    calloc() hands back zeroed memory, and the result is checked before
    it is touched (the old code ran memset() on an unchecked pointer).
  */
  context= (thread_context_st *)calloc(DEFAULT_CONCURRENCY,
                                       sizeof(thread_context_st));
  if (!context)
  {
    fprintf(stderr, "Could not allocate memory for context\n");
    exit(1);
  }

  for (x= 0; x < DEFAULT_CONCURRENCY; x++)
  {
    /* 1..1000: run_task() uses this as a modulus, so it must not be zero */
    context[x].how_often_to_write= random()%1000 + 1;
    context[x].writer= &writer_handle;
    context[x].counter= 0;
    context[x].use_aio= use_aio;

    /*
      Each thread gets its own slot; passing the array base would make
      all threads share (and race on) element 0.
    */
    if (pthread_create(&mainthread, &attr, run_task,
                       (void *)&context[x]) != 0)
    {
      fprintf(stderr,"Could not create thread\n");
      exit(1);
    }
    thread_counter++;
  }

  if (DEFAULT_EXECUTE_SECONDS)
  {
    pthread_mutex_lock(&timer_alarm_mutex);
    timer_alarm= true;
    pthread_mutex_unlock(&timer_alarm_mutex);

    if (pthread_create(&mainthread, &attr, timer_thread,
                       (void *)&opt_timer_length) != 0)
    {
      fprintf(stderr,"%s: Could not create timer thread\n", my_progname);
      exit(1);
    }
  }

  pthread_mutex_unlock(&counter_mutex);
  pthread_attr_destroy(&attr);

  /* Open the start gate and wake every waiting thread. */
  pthread_mutex_lock(&sleeper_mutex);
  master_wakeup= 0;
  pthread_mutex_unlock(&sleeper_mutex);
  pthread_cond_broadcast(&sleep_threshhold);

  /*
    We loop until we know that all children have cleaned up.
  */
  pthread_mutex_lock(&counter_mutex);
  while (thread_counter)
  {
    struct timespec abstime;

    /*
      pthread_cond_timedwait() takes an absolute deadline. A literal
      tv_sec of 1 lies in the past and turned this loop into a busy
      wait, so build "now + 1 second" the same way timer_thread() does.
    */
    set_timespec(abstime, 1);

    pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
  }
  pthread_mutex_unlock(&counter_mutex);

  for (total= x= 0; x < DEFAULT_CONCURRENCY; x++)
    total+= context[x].counter;

  free(context);
  azclose(&writer_handle);

  printf("Read %llu rows\n", total);
}
209
210
void *timer_thread(void *p)
211
{
212
  time_t *timer_length= (time_t *)p;
213
  struct timespec abstime;
214
215
  /* 
216
    We lock around the initial call in case were we in a loop. This 
217
    also keeps the value properly syncronized across call threads.
218
  */
219
  pthread_mutex_lock(&sleeper_mutex);
220
  while (master_wakeup)
221
  {
222
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
223
  }
224
  pthread_mutex_unlock(&sleeper_mutex);
225
226
  set_timespec(abstime, *timer_length);
227
228
  pthread_mutex_lock(&timer_alarm_mutex);
229
  pthread_cond_timedwait(&timer_alarm_threshold, &timer_alarm_mutex, &abstime);
230
  pthread_mutex_unlock(&timer_alarm_mutex);
231
232
  pthread_mutex_lock(&timer_alarm_mutex);
163 by Brian Aker
Merge Monty's code.
233
  timer_alarm= false;
1 by brian
clean slate
234
  pthread_mutex_unlock(&timer_alarm_mutex);
235
236
  return 0;
237
}
238
239
void *run_task(void *p)
240
{
241
  thread_context_st *context= (thread_context_st *)p;
242
  unsigned long long count;
243
  int ret;
244
  int error;
245
  azio_stream reader_handle;
246
247
  if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY,
248
                    context->use_aio)))
249
  {
250
    printf("Could not open test file\n");
251
    return 0;
252
  }
253
254
  pthread_mutex_lock(&sleeper_mutex);
255
  while (master_wakeup)
256
  {
257
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
258
  } 
259
  pthread_mutex_unlock(&sleeper_mutex);
260
261
  /* Do Stuff */
262
  count= 0;
263
  while (1)
264
  {
265
    azread_init(&reader_handle);
266
    while ((ret= azread_row(&reader_handle, &error)))
267
      context->counter++;
268
269
    if (count % context->how_often_to_write)
270
    {
271
      write_row(context->writer);
272
    }
273
274
    /* If the timer is set, and the alarm is not active then end */
163 by Brian Aker
Merge Monty's code.
275
    if (timer_alarm == false)
1 by brian
clean slate
276
      break;
277
  }
278
279
  pthread_mutex_lock(&counter_mutex);
280
  thread_counter--;
281
  pthread_cond_signal(&count_threshhold);
282
  pthread_mutex_unlock(&counter_mutex);
283
  azclose(&reader_handle);
284
285
  return NULL;
286
}
287
288
/*
  Create (or truncate) TEST_FILENAME and fill it with random rows.

  write_handler  stream to open; it is left open for the caller
  rows           number of rows to write through write_row()

  Prints a message and exits with status 1 when the file cannot be
  opened. The stream is flushed with Z_SYNC_FLUSH before returning.
*/
void create_data_file(azio_stream *write_handler, unsigned long long rows)
{
  unsigned long long remaining;
  int opened;

  opened= azopen(write_handler, TEST_FILENAME,
                 O_CREAT|O_RDWR|O_TRUNC|O_BINARY, AZ_METHOD_BLOCK);
  if (opened == 0)
  {
    printf("Could not create test file\n");
    exit(1);
  }

  /* One write_row() call per requested row. */
  for (remaining= rows; remaining > 0; remaining--)
    write_row(write_handler);

  azflush(write_handler, Z_SYNC_FLUSH);
}
305
306
/*
  Append one row of random characters to the stream.

  s  stream handed to azwrite_row()

  The row length is picked at random between 1 and HUGE_STRING_LENGTH.
  The azwrite_row() call is made while holding row_lock. Always
  returns 0.
*/
unsigned int write_row(azio_stream *s)
{
  char row[HUGE_STRING_LENGTH];
  /* The +1 avoids zero length strings. */
  size_t row_length= (size_t)(random() % HUGE_STRING_LENGTH) + 1;

  /* Generate the payload before taking the lock. */
  get_random_string(row, row_length);

  pthread_mutex_lock(&row_lock);
  azwrite_row(s, row, row_length);
  pthread_mutex_unlock(&row_lock);

  return 0;
}