~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/scheduler.cc

Removed/replaced DBUG symbols and TRUE/FALSE

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2007 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Implementation for the thread scheduler
 
18
*/
 
19
 
 
20
#ifdef USE_PRAGMA_INTERFACE
 
21
#pragma implementation
 
22
#endif
 
23
 
 
24
#include <mysql_priv.h>
 
25
#include "event.h"
 
26
 
 
27
 
 
28
/*
 
29
  'Dummy' functions to be used when we don't need any handling for a scheduler
 
30
  event
 
31
 */
 
32
 
 
33
static bool init_dummy(void) {return 0;}
 
34
static void post_kill_dummy(THD *thd __attribute__((__unused__))) {}
 
35
static void end_dummy(void) {}
 
36
static bool end_thread_dummy(THD *thd __attribute__((__unused__)),
 
37
                             bool cache_thread __attribute__((__unused__)))
 
38
{ return 0; }
 
39
 
 
40
/*
 
41
  Initialize default scheduler with dummy functions so that setup functions
 
42
  only need to declare those that are relvant for their usage
 
43
*/
 
44
 
 
45
scheduler_functions::scheduler_functions()
 
46
  :init(init_dummy),
 
47
   init_new_connection_thread(init_new_connection_handler_thread),
 
48
   add_connection(0),                           // Must be defined
 
49
   post_kill_notification(post_kill_dummy),
 
50
   end_thread(end_thread_dummy), end(end_dummy)
 
51
{}
 
52
 
 
53
static uint created_threads, killed_threads;
 
54
static bool kill_pool_threads;
 
55
 
 
56
static struct event thd_add_event;
 
57
static struct event thd_kill_event;
 
58
 
 
59
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
 
60
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
 
61
 
 
62
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
63
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
64
 
 
65
/*
 
66
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
 
67
  event_del) and thds_need_processing and thds_waiting_for_io.
 
68
*/
 
69
static pthread_mutex_t LOCK_event_loop;
 
70
static LIST *thds_need_processing; /* list of thds that needs some processing */
 
71
static LIST *thds_waiting_for_io; /* list of thds with added events */
 
72
 
 
73
pthread_handler_t libevent_thread_proc(void *arg);
 
74
static void libevent_end();
 
75
static bool libevent_needs_immediate_processing(THD *thd);
 
76
static void libevent_connection_close(THD *thd);
 
77
static bool libevent_should_close_connection(THD* thd);
 
78
static void libevent_thd_add(THD* thd);
 
79
void libevent_io_callback(int Fd, short Operation, void *ctx);
 
80
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
 
81
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
 
82
 
 
83
 
 
84
/*
 
85
  Create a pipe and set to non-blocking.
 
86
  Returns true if there is an error.
 
87
*/
 
88
 
 
89
static bool init_pipe(int pipe_fds[])
 
90
{
 
91
  int flags;
 
92
  return pipe(pipe_fds) < 0 ||
 
93
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
 
94
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
 
95
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
 
96
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 
97
}
 
98
 
 
99
 
 
100
/*
 
101
  thd_scheduler keeps the link between THD and events.
 
102
  It's embedded in the THD class.
 
103
*/
 
104
 
 
105
thd_scheduler::thd_scheduler()
 
106
  : logged_in(false), io_event(NULL), thread_attached(false)
 
107
{  
 
108
  dbug_explain_buf[0]= 0;
 
109
}
 
110
 
 
111
 
 
112
thd_scheduler::~thd_scheduler()
 
113
{
 
114
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
 
115
}
 
116
 
 
117
 
 
118
bool thd_scheduler::init(THD *parent_thd)
 
119
{
 
120
  io_event=
 
121
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
 
122
    
 
123
  if (!io_event)
 
124
  {
 
125
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
 
126
    return true;
 
127
  }
 
128
  
 
129
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
 
130
            libevent_io_callback, (void*)parent_thd);
 
131
    
 
132
  list.data= parent_thd;
 
133
  
 
134
  return false;
 
135
}
 
136
 
 
137
 
 
138
/*
 
139
  Attach/associate the connection with the OS thread, for command processing.
 
140
*/
 
141
 
 
142
bool thd_scheduler::thread_attach()
 
143
{
 
144
  assert(!thread_attached);
 
145
  THD* thd = (THD*)list.data;
 
146
  if (libevent_should_close_connection(thd) ||
 
147
      setup_connection_thread_globals(thd))
 
148
  {
 
149
    return true;
 
150
  }
 
151
  my_errno= 0;
 
152
  thd->mysys_var->abort= 0;
 
153
  thread_attached= true;
 
154
  swap_dbug_explain();
 
155
  return false;
 
156
}
 
157
 
 
158
 
 
159
/*
 
160
  Detach/disassociate the connection with the OS thread.
 
161
*/
 
162
 
 
163
void thd_scheduler::thread_detach()
 
164
{
 
165
  if (thread_attached)
 
166
  {
 
167
    THD* thd = (THD*)list.data;
 
168
    thd->mysys_var= NULL;
 
169
    thread_attached= false;
 
170
    swap_dbug_explain();
 
171
  }
 
172
}
 
173
 
 
174
 
 
175
/*
 
176
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
 
177
 
 
178
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
 
179
  thread during a command, but each command is handled by a different thread.
 
180
*/
 
181
void thd_scheduler::swap_dbug_explain()
 
182
{
 
183
  char buffer[sizeof(dbug_explain_buf)];
 
184
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
 
185
}
 
186
 
 
187
/**
 
188
  Create all threads for the thread pool
 
189
 
 
190
  NOTES
 
191
    After threads are created we wait until all threads has signaled that
 
192
    they have started before we return
 
193
 
 
194
  RETURN
 
195
    0  ok
 
196
    1  We got an error creating the thread pool
 
197
       In this case we will abort all created threads
 
198
*/
 
199
 
 
200
static bool libevent_init(void)
 
201
{
 
202
  uint i;
 
203
 
 
204
  event_init();
 
205
  
 
206
  created_threads= 0;
 
207
  killed_threads= 0;
 
208
  kill_pool_threads= false;
 
209
  
 
210
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
211
  pthread_mutex_init(&LOCK_thd_add, NULL);
 
212
  
 
213
  /* set up the pipe used to add new thds to the event pool */
 
214
  if (init_pipe(thd_add_pipe))
 
215
  {
 
216
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
 
217
    return(1);
 
218
  }
 
219
  /* set up the pipe used to kill thds in the event queue */
 
220
  if (init_pipe(thd_kill_pipe))
 
221
  {
 
222
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
 
223
    close(thd_add_pipe[0]);
 
224
    close(thd_add_pipe[1]);
 
225
    return(1);
 
226
  }
 
227
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
 
228
            libevent_add_thd_callback, NULL);
 
229
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
 
230
            libevent_kill_thd_callback, NULL);
 
231
 
 
232
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
 
233
 {
 
234
   sql_print_error("thd_add_event event_add error in libevent_init\n");
 
235
   libevent_end();
 
236
   return(1);
 
237
   
 
238
 }
 
239
  /* Set up the thread pool */
 
240
  created_threads= killed_threads= 0;
 
241
  pthread_mutex_lock(&LOCK_thread_count);
 
242
 
 
243
  for (i= 0; i < thread_pool_size; i++)
 
244
  {
 
245
    pthread_t thread;
 
246
    int error;
 
247
    if ((error= pthread_create(&thread, &connection_attrib,
 
248
                               libevent_thread_proc, 0)))
 
249
    {
 
250
      sql_print_error("Can't create completion port thread (error %d)",
 
251
                      error);
 
252
      pthread_mutex_unlock(&LOCK_thread_count);
 
253
      libevent_end();                      // Cleanup
 
254
      return(true);
 
255
    }
 
256
  }
 
257
 
 
258
  /* Wait until all threads are created */
 
259
  while (created_threads != thread_pool_size)
 
260
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
261
  pthread_mutex_unlock(&LOCK_thread_count);
 
262
  
 
263
  return(false);
 
264
}
 
265
 
 
266
 
 
267
/*
 
268
  This is called when data is ready on the socket.
 
269
  
 
270
  NOTES
 
271
    This is only called by the thread that owns LOCK_event_loop.
 
272
  
 
273
    We add the thd that got the data to thds_need_processing, and 
 
274
    cause the libevent event_loop() to terminate. Then this same thread will
 
275
    return from event_loop and pick the thd value back up for processing.
 
276
*/
 
277
 
 
278
void libevent_io_callback(int, short, void *ctx)
 
279
{    
 
280
  safe_mutex_assert_owner(&LOCK_event_loop);
 
281
  THD *thd= (THD*)ctx;
 
282
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
 
283
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
 
284
}
 
285
 
 
286
/*
 
287
  This is called when we have a thread we want to be killed.
 
288
  
 
289
  NOTES
 
290
    This is only called by the thread that owns LOCK_event_loop.
 
291
*/
 
292
 
 
293
void libevent_kill_thd_callback(int Fd, short, void*)
 
294
{    
 
295
  safe_mutex_assert_owner(&LOCK_event_loop);
 
296
 
 
297
  /* clear the pending events */
 
298
  char c;
 
299
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
300
  {}
 
301
 
 
302
  LIST* list= thds_waiting_for_io;
 
303
  while (list)
 
304
  {
 
305
    THD *thd= (THD*)list->data;
 
306
    list= list_rest(list);
 
307
    if (thd->killed == THD::KILL_CONNECTION)
 
308
    {
 
309
      /*
 
310
        Delete from libevent and add to the processing queue.
 
311
      */
 
312
      event_del(thd->scheduler.io_event);
 
313
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
 
314
                                       &thd->scheduler.list);
 
315
      thds_need_processing= list_add(thds_need_processing,
 
316
                                     &thd->scheduler.list);
 
317
    }
 
318
  }
 
319
}
 
320
 
 
321
 
 
322
/*
 
323
  This is used to add connections to the pool. This callback is invoked from
 
324
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
 
325
  written to it.
 
326
  
 
327
  NOTES
 
328
    This is only called by the thread that owns LOCK_event_loop.
 
329
*/
 
330
 
 
331
void libevent_add_thd_callback(int Fd, short, void *)
 
332
 
333
  safe_mutex_assert_owner(&LOCK_event_loop);
 
334
 
 
335
  /* clear the pending events */
 
336
  char c;
 
337
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
338
  {}
 
339
 
 
340
  pthread_mutex_lock(&LOCK_thd_add);
 
341
  while (thds_need_adding)
 
342
  {
 
343
    /* pop the first thd off the list */
 
344
    THD* thd= (THD*)thds_need_adding->data;
 
345
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
 
346
 
 
347
    pthread_mutex_unlock(&LOCK_thd_add);
 
348
    
 
349
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
 
350
    {
 
351
      /*
 
352
        Add thd to thds_need_processing list. If it needs closing we'll close
 
353
        it outside of event_loop().
 
354
      */
 
355
      thds_need_processing= list_add(thds_need_processing,
 
356
                                     &thd->scheduler.list);
 
357
    }
 
358
    else
 
359
    {
 
360
      /* Add to libevent */
 
361
      if (event_add(thd->scheduler.io_event, NULL))
 
362
      {
 
363
        sql_print_error("event_add error in libevent_add_thd_callback\n");
 
364
        libevent_connection_close(thd);
 
365
      } 
 
366
      else
 
367
      {
 
368
        thds_waiting_for_io= list_add(thds_waiting_for_io,
 
369
                                      &thd->scheduler.list);
 
370
      }
 
371
    }
 
372
    pthread_mutex_lock(&LOCK_thd_add);
 
373
  }
 
374
  pthread_mutex_unlock(&LOCK_thd_add);
 
375
}
 
376
 
 
377
 
 
378
/**
 
379
  Notify the thread pool about a new connection
 
380
 
 
381
  NOTES
 
382
    LOCK_thread_count is locked on entry. This function MUST unlock it!
 
383
*/
 
384
 
 
385
static void libevent_add_connection(THD *thd)
 
386
{
 
387
  if (thd->scheduler.init(thd))
 
388
  {
 
389
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
 
390
    pthread_mutex_unlock(&LOCK_thread_count);
 
391
    libevent_connection_close(thd);
 
392
    return;
 
393
  }
 
394
  threads.append(thd);
 
395
  libevent_thd_add(thd);
 
396
  
 
397
  pthread_mutex_unlock(&LOCK_thread_count);
 
398
  return;
 
399
}
 
400
 
 
401
 
 
402
/**
 
403
  @brief Signal a waiting connection it's time to die.
 
404
 
 
405
  @details This function will signal libevent the THD should be killed.
 
406
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
 
407
    upon entry.
 
408
 
 
409
  @param[in]  thd The connection to kill
 
410
*/
 
411
 
 
412
static void libevent_post_kill_notification(THD *)
 
413
{
 
414
  /*
 
415
    Note, we just wake up libevent with an event that a THD should be killed,
 
416
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
 
417
    find the THDs it should kill.
 
418
    
 
419
    So we don't actually tell it which one and we don't actually use the
 
420
    THD being passed to us, but that's just a design detail that could change
 
421
    later.
 
422
  */
 
423
  char c= 0;
 
424
  write(thd_kill_pipe[1], &c, sizeof(c));
 
425
}
 
426
 
 
427
 
 
428
/*
 
429
  Close and delete a connection.
 
430
*/
 
431
 
 
432
static void libevent_connection_close(THD *thd)
 
433
{
 
434
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
 
435
 
 
436
  if (thd->net.vio->sd >= 0)                  // not already closed
 
437
  {
 
438
    end_connection(thd);
 
439
    close_connection(thd, 0, 1);
 
440
  }
 
441
  thd->scheduler.thread_detach();
 
442
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
 
443
  pthread_mutex_unlock(&LOCK_thread_count);
 
444
 
 
445
  return;
 
446
}
 
447
 
 
448
 
 
449
/*
 
450
  Returns true if we should close and delete a THD connection.
 
451
*/
 
452
 
 
453
static bool libevent_should_close_connection(THD* thd)
 
454
{
 
455
  return thd->net.error ||
 
456
         thd->net.vio == 0 ||
 
457
         thd->killed == THD::KILL_CONNECTION;
 
458
}
 
459
 
 
460
 
 
461
/*
 
462
  libevent_thread_proc is the outer loop of each thread in the thread pool.
 
463
  These procs only return/terminate on shutdown (kill_pool_threads == true).
 
464
*/
 
465
 
 
466
pthread_handler_t libevent_thread_proc(void *arg __attribute__((__unused__)))
 
467
{
 
468
  if (init_new_connection_handler_thread())
 
469
  {
 
470
    my_thread_global_end();
 
471
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
 
472
    exit(1);
 
473
  }
 
474
 
 
475
  /*
 
476
    Signal libevent_init() when all threads has been created and are ready to
 
477
    receive events.
 
478
  */
 
479
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
480
  created_threads++;
 
481
  if (created_threads == thread_pool_size)
 
482
    (void) pthread_cond_signal(&COND_thread_count);
 
483
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
484
  
 
485
  for (;;)
 
486
  {
 
487
    THD *thd= NULL;
 
488
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
489
    
 
490
    /* get thd(s) to process */
 
491
    while (!thds_need_processing)
 
492
    {
 
493
      if (kill_pool_threads)
 
494
      {
 
495
        /* the flag that we should die has been set */
 
496
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
497
        goto thread_exit;
 
498
      }
 
499
      event_loop(EVLOOP_ONCE);
 
500
    }
 
501
    
 
502
    /* pop the first thd off the list */
 
503
    thd= (THD*)thds_need_processing->data;
 
504
    thds_need_processing= list_delete(thds_need_processing,
 
505
                                      thds_need_processing);
 
506
    
 
507
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
508
    
 
509
    /* now we process the connection (thd) */
 
510
    
 
511
    /* set up the thd<->thread links. */
 
512
    thd->thread_stack= (char*) &thd;
 
513
    
 
514
    if (thd->scheduler.thread_attach())
 
515
    {
 
516
      libevent_connection_close(thd);
 
517
      continue;
 
518
    }
 
519
 
 
520
    /* is the connection logged in yet? */
 
521
    if (!thd->scheduler.logged_in)
 
522
    {
 
523
      if (login_connection(thd))
 
524
      {
 
525
        /* Failed to log in */
 
526
        libevent_connection_close(thd);
 
527
        continue;
 
528
      }
 
529
      else
 
530
      {
 
531
        /* login successful */
 
532
        thd->scheduler.logged_in= true;
 
533
        prepare_new_connection_state(thd);
 
534
        if (!libevent_needs_immediate_processing(thd))
 
535
          continue; /* New connection is now waiting for data in libevent*/
 
536
      }
 
537
    }
 
538
 
 
539
    do
 
540
    {
 
541
      /* Process a query */
 
542
      if (do_command(thd))
 
543
      {
 
544
        libevent_connection_close(thd);
 
545
        break;
 
546
      }
 
547
    } while (libevent_needs_immediate_processing(thd));
 
548
  }
 
549
  
 
550
thread_exit:
 
551
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
552
  killed_threads++;
 
553
  pthread_cond_broadcast(&COND_thread_count);
 
554
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
555
  my_thread_end();
 
556
  pthread_exit(0);
 
557
  return(0);                               /* purify: deadcode */
 
558
}
 
559
 
 
560
 
 
561
/*
 
562
  Returns true if the connection needs immediate processing and false if 
 
563
  instead it's queued for libevent processing or closed,
 
564
*/
 
565
 
 
566
static bool libevent_needs_immediate_processing(THD *thd)
 
567
{
 
568
  if (libevent_should_close_connection(thd))
 
569
  {
 
570
    libevent_connection_close(thd);
 
571
    return false;
 
572
  }
 
573
  /*
 
574
    If more data in the socket buffer, return true to process another command.
 
575
 
 
576
    Note: we cannot add for event processing because the whole request might
 
577
    already be buffered and we wouldn't receive an event.
 
578
  */
 
579
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
 
580
    return true;
 
581
  
 
582
  thd->scheduler.thread_detach();
 
583
  libevent_thd_add(thd);
 
584
  return false;
 
585
}
 
586
 
 
587
 
 
588
/*
 
589
  Adds a THD to queued for libevent processing.
 
590
  
 
591
  This call does not actually register the event with libevent.
 
592
  Instead, it places the THD onto a queue and signals libevent by writing
 
593
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
 
594
  be invoked which will find the THD on the queue and add it to libevent.
 
595
*/
 
596
 
 
597
static void libevent_thd_add(THD* thd)
 
598
{
 
599
  char c=0;
 
600
  pthread_mutex_lock(&LOCK_thd_add);
 
601
  /* queue for libevent */
 
602
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
 
603
  /* notify libevent */
 
604
  write(thd_add_pipe[1], &c, sizeof(c));
 
605
  pthread_mutex_unlock(&LOCK_thd_add);
 
606
}
 
607
 
 
608
 
 
609
/**
 
610
  Wait until all pool threads have been deleted for clean shutdown
 
611
*/
 
612
 
 
613
static void libevent_end()
 
614
{
 
615
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
616
  
 
617
  kill_pool_threads= true;
 
618
  while (killed_threads != created_threads)
 
619
  {
 
620
    /* wake up the event loop */
 
621
    char c= 0;
 
622
    write(thd_add_pipe[1], &c, sizeof(c));
 
623
 
 
624
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
625
  }
 
626
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
627
  
 
628
  event_del(&thd_add_event);
 
629
  close(thd_add_pipe[0]);
 
630
  close(thd_add_pipe[1]);
 
631
  event_del(&thd_kill_event);
 
632
  close(thd_kill_pipe[0]);
 
633
  close(thd_kill_pipe[1]);
 
634
 
 
635
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
636
  (void) pthread_mutex_destroy(&LOCK_thd_add);
 
637
  return;
 
638
}
 
639
 
 
640
 
 
641
void pool_of_threads_scheduler(scheduler_functions* func)
 
642
{
 
643
  func->max_threads= thread_pool_size;
 
644
  func->init= libevent_init;
 
645
  func->end=  libevent_end;
 
646
  func->post_kill_notification= libevent_post_kill_notification;
 
647
  func->add_connection= libevent_add_connection;
 
648
}