~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

  • Committer: Stewart Smith
  • Date: 2008-12-18 23:54:47 UTC
  • mto: This revision was merged to the branch mainline in revision 719.
  • Revision ID: stewart@flamingspork.com-20081218235447-qf6dofgd6guwefo2
fix RENAME TABLE

(problem was missing hton to mysql_rename_table)

If engine implements table_exists_in_engine we get the hton from there.

Else, we reintroduce the mysql_frm_type hackery (temporarily, it *will* go away again with FRM removal).

also fix rename test: no views, plus current FLUSH LOCKS bug.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2007 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Implementation for the thread scheduler
 
18
*/
 
19
 
 
20
#include <drizzled/server_includes.h>
 
21
#include <libdrizzle/libdrizzle.h>
 
22
#include <event.h>
 
23
#include <drizzled/gettext.h>
 
24
#include <drizzled/sql_parse.h>
 
25
#include <drizzled/scheduler.h>
 
26
#include <drizzled/session.h>
 
27
/* API for connecting, logging in to a drizzled server */
 
28
#include <drizzled/connect.h>
 
29
 
 
30
/*
 
31
  'Dummy' functions to be used when we don't need any handling for a scheduler
 
32
  event
 
33
 */
 
34
 
 
35
static bool init_dummy(void) {return 0;}
 
36
static void post_kill_dummy(Session *session __attribute__((unused))) {}
 
37
static void end_dummy(void) {}
 
38
static bool end_thread_dummy(Session *session __attribute__((unused)),
 
39
                             bool cache_thread __attribute__((unused)))
 
40
{ return 0; }
 
41
 
 
42
/*
 
43
  Initialize default scheduler with dummy functions so that setup functions
 
44
  only need to declare those that are relvant for their usage
 
45
*/
 
46
 
 
47
scheduler_functions::scheduler_functions()
 
48
  :init(init_dummy),
 
49
   init_new_connection_thread(init_new_connection_handler_thread),
 
50
   add_connection(0),                           // Must be defined
 
51
   post_kill_notification(post_kill_dummy),
 
52
   end_thread(end_thread_dummy), end(end_dummy)
 
53
{}
 
54
 
 
55
static uint32_t created_threads, killed_threads;
 
56
static bool kill_pool_threads;
 
57
 
 
58
static struct event session_add_event;
 
59
static struct event session_kill_event;
 
60
 
 
61
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
 
62
static LIST *sessions_need_adding;    /* list of sessions to add to libevent queue */
 
63
 
 
64
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
65
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
66
 
 
67
/*
 
68
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
 
69
  event_del) and sessions_need_processing and sessions_waiting_for_io.
 
70
*/
 
71
static pthread_mutex_t LOCK_event_loop;
 
72
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
 
73
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
 
74
 
 
75
pthread_handler_t libevent_thread_proc(void *arg);
 
76
static void libevent_end();
 
77
static bool libevent_needs_immediate_processing(Session *session);
 
78
static void libevent_connection_close(Session *session);
 
79
static bool libevent_should_close_connection(Session* session);
 
80
static void libevent_session_add(Session* session);
 
81
void libevent_io_callback(int Fd, short Operation, void *ctx);
 
82
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
 
83
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
 
84
 
 
85
 
 
86
/*
 
87
  Create a pipe and set to non-blocking.
 
88
  Returns true if there is an error.
 
89
*/
 
90
 
 
91
static bool init_pipe(int pipe_fds[])
 
92
{
 
93
  int flags;
 
94
  return pipe(pipe_fds) < 0 ||
 
95
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
 
96
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
 
97
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
 
98
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 
99
}
 
100
 
 
101
 
 
102
/*
 
103
  session_scheduler keeps the link between Session and events.
 
104
  It's embedded in the Session class.
 
105
*/
 
106
 
 
107
session_scheduler::session_scheduler()
 
108
  : logged_in(false), io_event(NULL), thread_attached(false)
 
109
{
 
110
}
 
111
 
 
112
 
 
113
session_scheduler::~session_scheduler()
 
114
{
 
115
  delete io_event;
 
116
}
 
117
 
 
118
 
 
119
session_scheduler::session_scheduler(const session_scheduler&)
 
120
  : logged_in(false), io_event(NULL), thread_attached(false)
 
121
{}
 
122
 
 
123
void session_scheduler::operator=(const session_scheduler&)
 
124
{}
 
125
 
 
126
bool session_scheduler::init(Session *parent_session)
 
127
{
 
128
  io_event= new struct event;
 
129
 
 
130
  if (io_event == NULL)
 
131
  {
 
132
    sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
 
133
    return true;
 
134
  }
 
135
  memset(io_event, 0, sizeof(*io_event));
 
136
 
 
137
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
 
138
            libevent_io_callback, (void*)parent_session);
 
139
 
 
140
  list.data= parent_session;
 
141
 
 
142
  return false;
 
143
}
 
144
 
 
145
 
 
146
/*
 
147
  Attach/associate the connection with the OS thread, for command processing.
 
148
*/
 
149
 
 
150
bool session_scheduler::thread_attach()
 
151
{
 
152
  assert(!thread_attached);
 
153
  Session* session = (Session*)list.data;
 
154
  if (libevent_should_close_connection(session) ||
 
155
      setup_connection_thread_globals(session))
 
156
  {
 
157
    return true;
 
158
  }
 
159
  my_errno= 0;
 
160
  session->mysys_var->abort= 0;
 
161
  thread_attached= true;
 
162
 
 
163
  return false;
 
164
}
 
165
 
 
166
 
 
167
/*
 
168
  Detach/disassociate the connection with the OS thread.
 
169
*/
 
170
 
 
171
void session_scheduler::thread_detach()
 
172
{
 
173
  if (thread_attached)
 
174
  {
 
175
    Session* session = (Session*)list.data;
 
176
    session->mysys_var= NULL;
 
177
    thread_attached= false;
 
178
  }
 
179
}
 
180
 
 
181
/**
 
182
  Create all threads for the thread pool
 
183
 
 
184
  NOTES
 
185
    After threads are created we wait until all threads has signaled that
 
186
    they have started before we return
 
187
 
 
188
  RETURN
 
189
    0  ok
 
190
    1  We got an error creating the thread pool
 
191
       In this case we will abort all created threads
 
192
*/
 
193
 
 
194
static bool libevent_init(void)
 
195
{
 
196
  uint32_t i;
 
197
 
 
198
  event_init();
 
199
 
 
200
  created_threads= 0;
 
201
  killed_threads= 0;
 
202
  kill_pool_threads= false;
 
203
 
 
204
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
205
  pthread_mutex_init(&LOCK_session_add, NULL);
 
206
 
 
207
  /* set up the pipe used to add new sessions to the event pool */
 
208
  if (init_pipe(session_add_pipe))
 
209
  {
 
210
    sql_print_error(_("init_pipe(session_add_pipe) error in libevent_init\n"));
 
211
    return(1);
 
212
  }
 
213
  /* set up the pipe used to kill sessions in the event queue */
 
214
  if (init_pipe(session_kill_pipe))
 
215
  {
 
216
    sql_print_error(_("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
217
    close(session_add_pipe[0]);
 
218
    close(session_add_pipe[1]);
 
219
    return(1);
 
220
  }
 
221
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
222
            libevent_add_session_callback, NULL);
 
223
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
224
            libevent_kill_session_callback, NULL);
 
225
 
 
226
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
 
227
 {
 
228
   sql_print_error(_("session_add_event event_add error in libevent_init\n"));
 
229
   libevent_end();
 
230
   return(1);
 
231
 
 
232
 }
 
233
  /* Set up the thread pool */
 
234
  created_threads= killed_threads= 0;
 
235
  pthread_mutex_lock(&LOCK_thread_count);
 
236
 
 
237
  for (i= 0; i < thread_pool_size; i++)
 
238
  {
 
239
    pthread_t thread;
 
240
    int error;
 
241
    if ((error= pthread_create(&thread, &connection_attrib,
 
242
                               libevent_thread_proc, 0)))
 
243
    {
 
244
      sql_print_error(_("Can't create completion port thread (error %d)"),
 
245
                      error);
 
246
      pthread_mutex_unlock(&LOCK_thread_count);
 
247
      libevent_end();                      // Cleanup
 
248
      return(true);
 
249
    }
 
250
  }
 
251
 
 
252
  /* Wait until all threads are created */
 
253
  while (created_threads != thread_pool_size)
 
254
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
255
  pthread_mutex_unlock(&LOCK_thread_count);
 
256
 
 
257
  return(false);
 
258
}
 
259
 
 
260
 
 
261
/*
 
262
  This is called when data is ready on the socket.
 
263
 
 
264
  NOTES
 
265
    This is only called by the thread that owns LOCK_event_loop.
 
266
 
 
267
    We add the session that got the data to sessions_need_processing, and
 
268
    cause the libevent event_loop() to terminate. Then this same thread will
 
269
    return from event_loop and pick the session value back up for processing.
 
270
*/
 
271
 
 
272
void libevent_io_callback(int, short, void *ctx)
 
273
{
 
274
  safe_mutex_assert_owner(&LOCK_event_loop);
 
275
  Session *session= (Session*)ctx;
 
276
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
 
277
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
 
278
}
 
279
 
 
280
/*
 
281
  This is called when we have a thread we want to be killed.
 
282
 
 
283
  NOTES
 
284
    This is only called by the thread that owns LOCK_event_loop.
 
285
*/
 
286
 
 
287
void libevent_kill_session_callback(int Fd, short, void*)
 
288
{
 
289
  safe_mutex_assert_owner(&LOCK_event_loop);
 
290
 
 
291
  /* clear the pending events */
 
292
  char c;
 
293
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
294
  {}
 
295
 
 
296
  LIST* list= sessions_waiting_for_io;
 
297
  while (list)
 
298
  {
 
299
    Session *session= (Session*)list->data;
 
300
    list= list_rest(list);
 
301
    if (session->killed == Session::KILL_CONNECTION)
 
302
    {
 
303
      /*
 
304
        Delete from libevent and add to the processing queue.
 
305
      */
 
306
      event_del(session->scheduler.io_event);
 
307
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
 
308
                                       &session->scheduler.list);
 
309
      sessions_need_processing= list_add(sessions_need_processing,
 
310
                                     &session->scheduler.list);
 
311
    }
 
312
  }
 
313
}
 
314
 
 
315
 
 
316
/*
 
317
  This is used to add connections to the pool. This callback is invoked from
 
318
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
 
319
  written to it.
 
320
 
 
321
  NOTES
 
322
    This is only called by the thread that owns LOCK_event_loop.
 
323
*/
 
324
 
 
325
void libevent_add_session_callback(int Fd, short, void *)
 
326
{
 
327
  safe_mutex_assert_owner(&LOCK_event_loop);
 
328
 
 
329
  /* clear the pending events */
 
330
  char c;
 
331
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
332
  {}
 
333
 
 
334
  pthread_mutex_lock(&LOCK_session_add);
 
335
  while (sessions_need_adding)
 
336
  {
 
337
    /* pop the first session off the list */
 
338
    Session* session= (Session*)sessions_need_adding->data;
 
339
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
 
340
 
 
341
    pthread_mutex_unlock(&LOCK_session_add);
 
342
 
 
343
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
 
344
    {
 
345
      /*
 
346
        Add session to sessions_need_processing list. If it needs closing we'll close
 
347
        it outside of event_loop().
 
348
      */
 
349
      sessions_need_processing= list_add(sessions_need_processing,
 
350
                                     &session->scheduler.list);
 
351
    }
 
352
    else
 
353
    {
 
354
      /* Add to libevent */
 
355
      if (event_add(session->scheduler.io_event, NULL))
 
356
      {
 
357
        sql_print_error(_("event_add error in libevent_add_session_callback\n"));
 
358
        libevent_connection_close(session);
 
359
      }
 
360
      else
 
361
      {
 
362
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
 
363
                                      &session->scheduler.list);
 
364
      }
 
365
    }
 
366
    pthread_mutex_lock(&LOCK_session_add);
 
367
  }
 
368
  pthread_mutex_unlock(&LOCK_session_add);
 
369
}
 
370
 
 
371
 
 
372
/**
 
373
  Notify the thread pool about a new connection
 
374
 
 
375
  NOTES
 
376
    LOCK_thread_count is locked on entry. This function MUST unlock it!
 
377
*/
 
378
 
 
379
static void libevent_add_connection(Session *session)
 
380
{
 
381
  if (session->scheduler.init(session))
 
382
  {
 
383
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
 
384
    pthread_mutex_unlock(&LOCK_thread_count);
 
385
    libevent_connection_close(session);
 
386
    return;
 
387
  }
 
388
  threads.append(session);
 
389
  libevent_session_add(session);
 
390
 
 
391
  pthread_mutex_unlock(&LOCK_thread_count);
 
392
  return;
 
393
}
 
394
 
 
395
 
 
396
/**
 
397
  @brief Signal a waiting connection it's time to die.
 
398
 
 
399
  @details This function will signal libevent the Session should be killed.
 
400
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
 
401
    upon entry.
 
402
 
 
403
  @param[in]  session The connection to kill
 
404
*/
 
405
 
 
406
static void libevent_post_kill_notification(Session *)
 
407
{
 
408
  /*
 
409
    Note, we just wake up libevent with an event that a Session should be killed,
 
410
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
 
411
    find the Sessions it should kill.
 
412
 
 
413
    So we don't actually tell it which one and we don't actually use the
 
414
    Session being passed to us, but that's just a design detail that could change
 
415
    later.
 
416
  */
 
417
  char c= 0;
 
418
  assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
 
419
}
 
420
 
 
421
 
 
422
/*
 
423
  Close and delete a connection.
 
424
*/
 
425
 
 
426
static void libevent_connection_close(Session *session)
 
427
{
 
428
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
 
429
 
 
430
  if (net_get_sd(&(session->net)) >= 0)                  // not already closed
 
431
  {
 
432
    end_connection(session);
 
433
    session->close_connection(0, 1);
 
434
  }
 
435
  session->scheduler.thread_detach();
 
436
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
 
437
  pthread_mutex_unlock(&LOCK_thread_count);
 
438
 
 
439
  return;
 
440
}
 
441
 
 
442
 
 
443
/*
 
444
  Returns true if we should close and delete a Session connection.
 
445
*/
 
446
 
 
447
static bool libevent_should_close_connection(Session* session)
 
448
{
 
449
  return net_should_close(&(session->net)) ||
 
450
         session->killed == Session::KILL_CONNECTION;
 
451
}
 
452
 
 
453
 
 
454
/*
 
455
  libevent_thread_proc is the outer loop of each thread in the thread pool.
 
456
  These procs only return/terminate on shutdown (kill_pool_threads == true).
 
457
*/
 
458
 
 
459
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
 
460
{
 
461
  if (init_new_connection_handler_thread())
 
462
  {
 
463
    my_thread_global_end();
 
464
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
 
465
    exit(1);
 
466
  }
 
467
 
 
468
  /*
 
469
    Signal libevent_init() when all threads has been created and are ready to
 
470
    receive events.
 
471
  */
 
472
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
473
  created_threads++;
 
474
  if (created_threads == thread_pool_size)
 
475
    (void) pthread_cond_signal(&COND_thread_count);
 
476
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
477
 
 
478
  for (;;)
 
479
  {
 
480
    Session *session= NULL;
 
481
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
482
 
 
483
    /* get session(s) to process */
 
484
    while (!sessions_need_processing)
 
485
    {
 
486
      if (kill_pool_threads)
 
487
      {
 
488
        /* the flag that we should die has been set */
 
489
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
490
        goto thread_exit;
 
491
      }
 
492
      event_loop(EVLOOP_ONCE);
 
493
    }
 
494
 
 
495
    /* pop the first session off the list */
 
496
    session= (Session*)sessions_need_processing->data;
 
497
    sessions_need_processing= list_delete(sessions_need_processing,
 
498
                                      sessions_need_processing);
 
499
 
 
500
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
501
 
 
502
    /* now we process the connection (session) */
 
503
 
 
504
    /* set up the session<->thread links. */
 
505
    session->thread_stack= (char*) &session;
 
506
 
 
507
    if (session->scheduler.thread_attach())
 
508
    {
 
509
      libevent_connection_close(session);
 
510
      continue;
 
511
    }
 
512
 
 
513
    /* is the connection logged in yet? */
 
514
    if (!session->scheduler.logged_in)
 
515
    {
 
516
      if (login_connection(session))
 
517
      {
 
518
        /* Failed to log in */
 
519
        libevent_connection_close(session);
 
520
        continue;
 
521
      }
 
522
      else
 
523
      {
 
524
        /* login successful */
 
525
        session->scheduler.logged_in= true;
 
526
        prepare_new_connection_state(session);
 
527
        if (!libevent_needs_immediate_processing(session))
 
528
          continue; /* New connection is now waiting for data in libevent*/
 
529
      }
 
530
    }
 
531
 
 
532
    do
 
533
    {
 
534
      /* Process a query */
 
535
      if (do_command(session))
 
536
      {
 
537
        libevent_connection_close(session);
 
538
        break;
 
539
      }
 
540
    } while (libevent_needs_immediate_processing(session));
 
541
  }
 
542
 
 
543
thread_exit:
 
544
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
545
  killed_threads++;
 
546
  pthread_cond_broadcast(&COND_thread_count);
 
547
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
548
  my_thread_end();
 
549
  pthread_exit(0);
 
550
  return(0);                               /* purify: deadcode */
 
551
}
 
552
 
 
553
 
 
554
/*
 
555
  Returns true if the connection needs immediate processing and false if
 
556
  instead it's queued for libevent processing or closed,
 
557
*/
 
558
 
 
559
static bool libevent_needs_immediate_processing(Session *session)
 
560
{
 
561
  if (libevent_should_close_connection(session))
 
562
  {
 
563
    libevent_connection_close(session);
 
564
    return false;
 
565
  }
 
566
  /*
 
567
    If more data in the socket buffer, return true to process another command.
 
568
 
 
569
    Note: we cannot add for event processing because the whole request might
 
570
    already be buffered and we wouldn't receive an event.
 
571
  */
 
572
  if (net_more_data(&(session->net)))
 
573
    return true;
 
574
 
 
575
  session->scheduler.thread_detach();
 
576
  libevent_session_add(session);
 
577
  return false;
 
578
}
 
579
 
 
580
 
 
581
/*
 
582
  Adds a Session to queued for libevent processing.
 
583
 
 
584
  This call does not actually register the event with libevent.
 
585
  Instead, it places the Session onto a queue and signals libevent by writing
 
586
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
 
587
  be invoked which will find the Session on the queue and add it to libevent.
 
588
*/
 
589
 
 
590
static void libevent_session_add(Session* session)
 
591
{
 
592
  char c=0;
 
593
  pthread_mutex_lock(&LOCK_session_add);
 
594
  /* queue for libevent */
 
595
  sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
 
596
  /* notify libevent */
 
597
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
598
  pthread_mutex_unlock(&LOCK_session_add);
 
599
}
 
600
 
 
601
 
 
602
/**
 
603
  Wait until all pool threads have been deleted for clean shutdown
 
604
*/
 
605
 
 
606
static void libevent_end()
 
607
{
 
608
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
609
 
 
610
  kill_pool_threads= true;
 
611
  while (killed_threads != created_threads)
 
612
  {
 
613
    /* wake up the event loop */
 
614
    char c= 0;
 
615
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
616
 
 
617
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
618
  }
 
619
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
620
 
 
621
  event_del(&session_add_event);
 
622
  close(session_add_pipe[0]);
 
623
  close(session_add_pipe[1]);
 
624
  event_del(&session_kill_event);
 
625
  close(session_kill_pipe[0]);
 
626
  close(session_kill_pipe[1]);
 
627
 
 
628
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
629
  (void) pthread_mutex_destroy(&LOCK_session_add);
 
630
  return;
 
631
}
 
632
 
 
633
 
 
634
void pool_of_threads_scheduler(scheduler_functions* func)
 
635
{
 
636
  func->max_threads= thread_pool_size;
 
637
  func->init= libevent_init;
 
638
  func->end=  libevent_end;
 
639
  func->post_kill_notification= libevent_post_kill_notification;
 
640
  func->add_connection= libevent_add_connection;
 
641
}