~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/concurrency_test.cc

Merge Monty.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2009 Sun Microsystems
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; version 2 of the License.
 
9
 *
 
10
 *  This program is distributed in the hope that it will be useful,
 
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 *  GNU General Public License for more details.
 
14
 *
 
15
 *  You should have received a copy of the GNU General Public License
 
16
 *  along with this program; if not, write to the Free Software
 
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 */
 
19
 
1
20
/*
2
21
  Just a test application for threads.
3
22
  */
 
23
 
 
24
#include "config.h"
 
25
 
4
26
#include "azio.h"
5
 
#include <drizzle.h>
6
 
#include <my_getopt.h>
7
 
#include <drizzle_version.h>
 
27
#include "drizzled/my_getopt.h"
8
28
#include <stdio.h>
9
29
#include <stdlib.h>
10
30
#include <sys/types.h>
14
34
#include <fcntl.h>
15
35
#include <sys/time.h>
16
36
#include <pthread.h>
17
 
#include <strings.h>
 
37
#include <string.h>                             /* Pull in memset() */
18
38
#ifndef __WIN__
19
39
#include <sys/wait.h>
20
40
#endif
29
49
 
30
50
#define DEFAULT_INITIAL_LOAD 10000
31
51
#define DEFAULT_EXECUTE_SECONDS 120
32
 
#define DEFAULT_CONCURRENCY 8
33
52
#define TEST_FILENAME "concurrency_test.az"
34
53
 
35
54
#define HUGE_STRING_LENGTH 8192
41
60
unsigned int master_wakeup;
42
61
pthread_mutex_t sleeper_mutex;
43
62
pthread_cond_t sleep_threshhold;
44
 
static my_bool timer_alarm= false;
 
63
static bool timer_alarm= false;
45
64
pthread_mutex_t timer_alarm_mutex;
46
65
pthread_cond_t timer_alarm_threshold;
47
66
 
48
67
pthread_mutex_t row_lock;
49
68
 
50
69
/* Prototypes */
51
 
void *run_task(void *p);
52
 
void *timer_thread(void *p);
 
70
extern "C" {
 
71
  void *run_concurrent_task(void *p);
 
72
  void *timer_thread(void *p);
 
73
}
53
74
void scheduler(az_method use_aio);
54
 
void create_data_file(azio_stream *write_handler, unsigned long long rows);
 
75
void create_data_file(azio_stream *write_handler, uint64_t rows);
55
76
unsigned int write_row(azio_stream *s);
56
77
 
57
78
typedef struct thread_context_st thread_context_st;
58
79
struct thread_context_st {
59
80
  unsigned int how_often_to_write;
60
 
  unsigned long long counter;
 
81
  uint64_t counter;
61
82
  az_method use_aio;
62
83
  azio_stream *writer;
63
84
};
80
101
int main(int argc, char *argv[])
81
102
{
82
103
 
83
 
  az_method method;
 
104
  unsigned int method;
84
105
  my_init();
85
106
 
86
107
  MY_INIT(argv[0]);
88
109
  if (argc > 1)
89
110
    exit(1);
90
111
 
91
 
  if (!(drizzle_thread_safe()))
92
 
      fprintf(stderr, "This application was compiled incorrectly. Please recompile with thread support.\n");
93
 
 
94
112
  srandom(time(NULL));
95
113
 
96
114
  pthread_mutex_init(&counter_mutex, NULL);
97
115
  pthread_cond_init(&count_threshhold, NULL);
98
116
  pthread_mutex_init(&sleeper_mutex, NULL);
99
117
  pthread_cond_init(&sleep_threshhold, NULL);
100
 
  VOID(pthread_mutex_init(&timer_alarm_mutex, NULL));
101
 
  VOID(pthread_cond_init(&timer_alarm_threshold, NULL));
102
 
  VOID(pthread_mutex_init(&row_lock, NULL));
 
118
  pthread_mutex_init(&timer_alarm_mutex, NULL);
 
119
  pthread_cond_init(&timer_alarm_threshold, NULL);
 
120
  pthread_mutex_init(&row_lock, NULL);
103
121
 
104
122
  for (method= AZ_METHOD_BLOCK; method < AZ_METHOD_MAX; method++)
105
 
    scheduler(method);
 
123
    scheduler((az_method)method);
106
124
 
107
125
  (void)pthread_mutex_destroy(&counter_mutex);
108
126
  (void)pthread_cond_destroy(&count_threshhold);
109
127
  (void)pthread_mutex_destroy(&sleeper_mutex);
110
128
  (void)pthread_cond_destroy(&sleep_threshhold);
111
 
  VOID(pthread_mutex_destroy(&timer_alarm_mutex));
112
 
  VOID(pthread_cond_destroy(&timer_alarm_threshold));
113
 
  VOID(pthread_mutex_destroy(&row_lock));
 
129
  pthread_mutex_destroy(&timer_alarm_mutex);
 
130
  pthread_cond_destroy(&timer_alarm_threshold);
 
131
  pthread_mutex_destroy(&row_lock);
114
132
 
115
133
  return 0;
116
134
}
118
136
void scheduler(az_method use_aio)
119
137
{
120
138
  unsigned int x;
121
 
  unsigned long long total;
 
139
  uint64_t total;
122
140
  azio_stream writer_handle;
123
141
  thread_context_st *context;
124
142
  pthread_t mainthread;            /* Thread descriptor */
138
156
  pthread_mutex_unlock(&sleeper_mutex);
139
157
 
140
158
  context= (thread_context_st *)malloc(sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
141
 
  bzero(context, sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
 
159
  memset(context, 0, sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
142
160
 
143
161
  if (!context)
144
162
  {
155
173
    context[x].use_aio= use_aio;
156
174
 
157
175
    /* now you create the thread */
158
 
    if (pthread_create(&mainthread, &attr, run_task,
 
176
    if (pthread_create(&mainthread, &attr, run_concurrent_task,
159
177
                       (void *)context) != 0)
160
178
    {
161
179
      fprintf(stderr,"Could not create thread\n");
171
189
    timer_alarm= true;
172
190
    pthread_mutex_unlock(&timer_alarm_mutex);
173
191
 
174
 
    if (pthread_create(&mainthread, &attr, timer_thread, 
 
192
    if (pthread_create(&mainthread, &attr, timer_thread,
175
193
                       (void *)&opt_timer_length) != 0)
176
194
    {
177
195
      fprintf(stderr,"%s: Could not create timer thread\n", my_progname);
195
213
  {
196
214
    struct timespec abstime;
197
215
 
198
 
    bzero(&abstime, sizeof(struct timespec));
 
216
    memset(&abstime, 0, sizeof(struct timespec));
199
217
    abstime.tv_sec= 1;
200
218
 
201
219
    pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
208
226
  free(context);
209
227
  azclose(&writer_handle);
210
228
 
211
 
  printf("Read %llu rows\n", total);
 
229
  printf("Read %"PRIu64" rows\n", total);
212
230
}
213
231
 
214
232
void *timer_thread(void *p)
216
234
  time_t *timer_length= (time_t *)p;
217
235
  struct timespec abstime;
218
236
 
219
 
  if (drizzle_thread_init())
220
 
  {
221
 
    fprintf(stderr,"%s: drizzle_thread_init() failed.\n",
222
 
            my_progname);
223
 
    exit(1);
224
 
  }
225
 
 
226
 
  /* 
227
 
    We lock around the initial call in case were we in a loop. This 
 
237
  /*
 
238
    We lock around the initial call in case were we in a loop. This
228
239
    also keeps the value properly syncronized across call threads.
229
240
  */
230
241
  pthread_mutex_lock(&sleeper_mutex);
244
255
  timer_alarm= false;
245
256
  pthread_mutex_unlock(&timer_alarm_mutex);
246
257
 
247
 
  drizzle_thread_end();
248
 
 
249
258
  return 0;
250
259
}
251
260
 
252
 
void *run_task(void *p)
 
261
void *run_concurrent_task(void *p)
253
262
{
254
263
  thread_context_st *context= (thread_context_st *)p;
255
 
  unsigned long long count;
 
264
  uint64_t count;
256
265
  int ret;
257
266
  int error;
258
267
  azio_stream reader_handle;
259
268
 
260
 
  if (drizzle_thread_init())
261
 
  {
262
 
    fprintf(stderr,"%s: drizzle_thread_init() failed.\n", my_progname);
263
 
    exit(1);
264
 
  }
265
 
 
266
 
  if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY,
 
269
  if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY,
267
270
                    context->use_aio)))
268
271
  {
269
272
    printf("Could not open test file\n");
274
277
  while (master_wakeup)
275
278
  {
276
279
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
277
 
  } 
 
280
  }
278
281
  pthread_mutex_unlock(&sleeper_mutex);
279
282
 
280
283
  /* Do Stuff */
301
304
  pthread_mutex_unlock(&counter_mutex);
302
305
  azclose(&reader_handle);
303
306
 
304
 
  drizzle_thread_end();
305
 
 
306
307
  return NULL;
307
308
}
308
309
 
309
 
void create_data_file(azio_stream *write_handler, unsigned long long rows)
 
310
void create_data_file(azio_stream *write_handler, uint64_t rows)
310
311
{
311
312
  int ret;
312
 
  unsigned long long x;
 
313
  uint64_t x;
313
314
 
314
 
  if (!(ret= azopen(write_handler, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC|O_BINARY,
 
315
  if (!(ret= azopen(write_handler, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC,
315
316
                    AZ_METHOD_BLOCK)))
316
317
  {
317
318
    printf("Could not create test file\n");
334
335
  /* Avoid zero length strings */
335
336
  length++;
336
337
 
337
 
  get_random_string(buffer, length); 
 
338
  get_random_string(buffer, length);
338
339
  pthread_mutex_lock(&row_lock);
339
340
  azwrite_row(s, buffer, length);
340
341
  pthread_mutex_unlock(&row_lock);