~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/listen.cc

  • Committer: devananda
  • Date: 2009-06-30 14:27:54 UTC
  • mfrom: (1030.2.4 trunk)
  • mto: (1093.1.7 captain)
  • mto: This revision was merged to the branch mainline in revision 1095.
  • Revision ID: devananda.vdv@gmail.com-20090630142754-vm9w374yxkf1pikc
mergeĀ fromĀ lp

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2008 Sun Microsystems
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; version 2 of the License.
 
9
 *
 
10
 *  This program is distributed in the hope that it will be useful,
 
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 *  GNU General Public License for more details.
 
14
 *
 
15
 *  You should have received a copy of the GNU General Public License
 
16
 *  along with this program; if not, write to the Free Software
 
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 */
 
19
 
 
20
#include <drizzled/server_includes.h>
 
21
#include <drizzled/listen.h>
 
22
#include "drizzled/plugin_registry.h"
 
23
#include <drizzled/gettext.h>
 
24
#include <drizzled/error.h>
 
25
 
 
26
#include <netdb.h>
 
27
#include <netinet/tcp.h>
 
28
#include <poll.h>
 
29
 
 
30
using namespace std;
 
31
 
 
32
/* This is needed for the plugin registry interface. */
 
33
static ListenHandler *_default_listen_handler= NULL;
 
34
 
 
35
ListenHandler::ListenHandler(): fd_list(NULL), fd_count(0)
 
36
{
 
37
  /* Don't allow more than one ListenHandler to be created for now. */
 
38
  assert(_default_listen_handler == NULL);
 
39
  _default_listen_handler= this;
 
40
}
 
41
 
 
42
ListenHandler::~ListenHandler()
 
43
{
 
44
  if (fd_list != NULL)
 
45
    free(fd_list);
 
46
 
 
47
  assert(_default_listen_handler == this);
 
48
  _default_listen_handler= NULL;
 
49
}
 
50
 
 
51
void ListenHandler::addListen(const Listen &listen_obj)
 
52
{
 
53
  listen_list.push_back(&listen_obj);
 
54
}
 
55
 
 
56
void ListenHandler::removeListen(const Listen &listen_obj)
 
57
{
 
58
  listen_list.erase(remove(listen_list.begin(),
 
59
                           listen_list.end(),
 
60
                           &listen_obj),
 
61
                    listen_list.end());
 
62
}
 
63
 
 
64
bool ListenHandler::bindAll(const char *host, uint32_t bind_timeout)
 
65
{
 
66
  vector<const Listen *>::iterator it;
 
67
  int ret;
 
68
  char host_buf[NI_MAXHOST];
 
69
  char port_buf[NI_MAXSERV];
 
70
  struct addrinfo hints;
 
71
  struct addrinfo *ai;
 
72
  struct addrinfo *ai_list;
 
73
  int fd= -1;
 
74
  uint32_t waited;
 
75
  uint32_t this_wait;
 
76
  uint32_t retry;
 
77
  struct linger ling= {0, 0};
 
78
  int flags= 1;
 
79
  struct pollfd *tmp_fd_list;
 
80
 
 
81
  for (it= listen_list.begin(); it < listen_list.end(); ++it)
 
82
  {
 
83
    memset(&hints, 0, sizeof(struct addrinfo));
 
84
    hints.ai_flags= AI_PASSIVE;
 
85
    hints.ai_socktype= SOCK_STREAM;
 
86
 
 
87
    snprintf(port_buf, NI_MAXSERV, "%d", (*it)->getPort());
 
88
    ret= getaddrinfo(host, port_buf, &hints, &ai_list);
 
89
    if (ret != 0)
 
90
    {
 
91
      errmsg_printf(ERRMSG_LVL_ERROR, _("getaddrinfo() failed with error %s"),
 
92
                    gai_strerror(ret));
 
93
      return true;
 
94
    }
 
95
 
 
96
    for (ai= ai_list; ai != NULL; ai= ai->ai_next)
 
97
    {
 
98
      ret= getnameinfo(ai->ai_addr, ai->ai_addrlen, host_buf, NI_MAXHOST,
 
99
                       port_buf, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
 
100
      if (ret != 0)
 
101
      { 
 
102
        strcpy(host_buf, "-");
 
103
        strcpy(port_buf, "-");
 
104
      }
 
105
 
 
106
      fd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
 
107
      if (fd == -1)
 
108
      {
 
109
        /*
 
110
          Call to socket() can fail for some getaddrinfo results, try another.
 
111
        */
 
112
        continue;
 
113
      }
 
114
 
 
115
#ifdef IPV6_V6ONLY
 
116
      if (ai->ai_family == AF_INET6)
 
117
      {
 
118
        flags= 1;
 
119
        ret= setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &flags, sizeof(flags));
 
120
        if (ret != 0)
 
121
        {
 
122
          errmsg_printf(ERRMSG_LVL_ERROR,
 
123
                        _("setsockopt(IPV6_V6ONLY) failed with errno %d"),
 
124
                        errno);
 
125
          return true;
 
126
        }
 
127
      }
 
128
#endif
 
129
 
 
130
      ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
 
131
      if (ret != 0)
 
132
      {
 
133
        errmsg_printf(ERRMSG_LVL_ERROR,
 
134
                      _("setsockopt(SO_REUSEADDR) failed with errno %d"),
 
135
                      errno);
 
136
        return true;
 
137
      }
 
138
 
 
139
      ret= setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
 
140
      if (ret != 0)
 
141
      {
 
142
        errmsg_printf(ERRMSG_LVL_ERROR,
 
143
                      _("setsockopt(SO_KEEPALIVE) failed with errno %d"),
 
144
                      errno);
 
145
        return true;
 
146
      }
 
147
 
 
148
      ret= setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
 
149
      if (ret != 0)
 
150
      {
 
151
        errmsg_printf(ERRMSG_LVL_ERROR,
 
152
                      _("setsockopt(SO_LINGER) failed with errno %d"),
 
153
                      errno);
 
154
        return true;
 
155
      }
 
156
 
 
157
      ret= setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
 
158
      if (ret != 0)
 
159
      {
 
160
        errmsg_printf(ERRMSG_LVL_ERROR,
 
161
                      _("setsockopt(TCP_NODELAY) failed with errno %d"),
 
162
                      errno);
 
163
        return true;
 
164
      }
 
165
 
 
166
      /*
 
167
        Sometimes the port is not released fast enough when stopping and
 
168
        restarting the server. This happens quite often with the test suite
 
169
        on busy Linux systems. Retry to bind the address at these intervals:
 
170
        Sleep intervals: 1, 2, 4,  6,  9, 13, 17, 22, ...
 
171
        Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ...
 
172
        Limit the sequence by bind_timeout.
 
173
      */
 
174
      for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
 
175
      {
 
176
        if (((ret= ::bind(fd, ai->ai_addr, ai->ai_addrlen)) == 0) ||
 
177
            (errno != EADDRINUSE) || (waited >= bind_timeout))
 
178
        {
 
179
          break;
 
180
        }
 
181
 
 
182
        errmsg_printf(ERRMSG_LVL_INFO, _("Retrying bind() on %u"),
 
183
                      (*it)->getPort());
 
184
        this_wait= retry * retry / 3 + 1;
 
185
        sleep(this_wait);
 
186
      }
 
187
 
 
188
      if (ret < 0)
 
189
      {
 
190
        errmsg_printf(ERRMSG_LVL_ERROR, _("bind() failed with errno: %d"),
 
191
                      errno);
 
192
        errmsg_printf(ERRMSG_LVL_ERROR,
 
193
                      _("Do you already have another drizzled running?"));
 
194
        return true;
 
195
      }
 
196
 
 
197
      if (listen(fd, (int) back_log) < 0)
 
198
      {
 
199
        errmsg_printf(ERRMSG_LVL_ERROR,
 
200
                      _("listen() failed with errno %d"), errno);
 
201
        return true;
 
202
      }
 
203
 
 
204
      tmp_fd_list= (struct pollfd *)realloc(fd_list,
 
205
                                        sizeof(struct pollfd) * (fd_count + 1));
 
206
      if (tmp_fd_list == NULL)
 
207
      {
 
208
        errmsg_printf(ERRMSG_LVL_ERROR, _("realloc() failed with errno %d"),
 
209
                      errno);
 
210
        return true;
 
211
      }
 
212
 
 
213
      fd_list= tmp_fd_list;
 
214
      fd_list[fd_count].fd= fd;
 
215
      fd_list[fd_count].events= POLLIN | POLLERR;
 
216
      listen_fd_list.push_back(*it);
 
217
      fd_count++;
 
218
 
 
219
      errmsg_printf(ERRMSG_LVL_INFO, _("Listening on %s:%s\n"), host_buf,
 
220
                    port_buf);
 
221
    }
 
222
  }
 
223
 
 
224
  if (fd_count == 0)
 
225
  {
 
226
    errmsg_printf(ERRMSG_LVL_ERROR,
 
227
                  _("No sockets could be bound for listening"));
 
228
    return true;
 
229
  }
 
230
 
 
231
  freeaddrinfo(ai_list);
 
232
 
 
233
  /*
 
234
    We need a pipe to wakeup the listening thread since some operating systems
 
235
    are stupid. *cough* OSX *cough*
 
236
  */
 
237
  if (pipe(wakeup_pipe) == -1)
 
238
  {
 
239
    errmsg_printf(ERRMSG_LVL_ERROR, _("pipe() failed with errno %d"), errno);
 
240
    return true;
 
241
  }
 
242
 
 
243
  tmp_fd_list= (struct pollfd *)realloc(fd_list,
 
244
                                        sizeof(struct pollfd) * (fd_count + 1));
 
245
  if (tmp_fd_list == NULL)
 
246
  {
 
247
    errmsg_printf(ERRMSG_LVL_ERROR, _("realloc() failed with errno %d"), errno);
 
248
    return true;
 
249
  }
 
250
 
 
251
  fd_list= tmp_fd_list;
 
252
  fd_list[fd_count].fd= wakeup_pipe[0];
 
253
  fd_list[fd_count].events= POLLIN | POLLERR;
 
254
  fd_count++;
 
255
 
 
256
  return false;
 
257
}
 
258
 
 
259
Protocol *ListenHandler::getProtocol(void) const
 
260
{
 
261
  int ready;
 
262
  uint32_t x;
 
263
  uint32_t retry;
 
264
  int fd;
 
265
  Protocol *protocol;
 
266
  uint32_t error_count= 0;
 
267
 
 
268
  while (1)
 
269
  {
 
270
    ready= poll(fd_list, fd_count, -1);
 
271
    if (ready == -1)
 
272
    {
 
273
      if (errno != EINTR)
 
274
      {
 
275
        errmsg_printf(ERRMSG_LVL_ERROR, _("poll() failed with errno %d"),
 
276
                      errno);
 
277
      }
 
278
 
 
279
      continue;
 
280
    }
 
281
    else if (ready == 0)
 
282
      continue;
 
283
 
 
284
    for (x= 0; x < fd_count; x++)
 
285
    {
 
286
      if (fd_list[x].revents != POLLIN)
 
287
        continue;
 
288
 
 
289
      /* Check to see if the wakeup_pipe was written to. */
 
290
      if (x == fd_count - 1)
 
291
      {
 
292
        /* Close all file descriptors now. */
 
293
        for (x= 0; x < fd_count; x++)
 
294
        {
 
295
          (void) shutdown(fd_list[x].fd, SHUT_RDWR);
 
296
          (void) close(fd_list[x].fd);
 
297
          fd_list[x].fd= -1;
 
298
        }
 
299
 
 
300
        /* wakeup_pipe[0] was closed in the for loop above. */
 
301
        (void) close(wakeup_pipe[1]);
 
302
 
 
303
        return NULL;
 
304
      }
 
305
 
 
306
      for (retry= 0; retry < MAX_ACCEPT_RETRY; retry++)
 
307
      {
 
308
        fd= accept(fd_list[x].fd, NULL, 0);
 
309
        if (fd != -1 || (errno != EINTR && errno != EAGAIN))
 
310
          break;
 
311
      }
 
312
 
 
313
      if (fd == -1)
 
314
      {
 
315
        if ((error_count++ & 255) == 0)
 
316
        {
 
317
          errmsg_printf(ERRMSG_LVL_ERROR, _("accept() failed with errno %d"),
 
318
                        errno);
 
319
        }
 
320
 
 
321
        if (errno == ENFILE || errno == EMFILE)
 
322
          sleep(1);
 
323
 
 
324
        continue;
 
325
      }
 
326
 
 
327
      if (!(protocol= listen_fd_list[x]->protocolFactory()))
 
328
      {
 
329
        (void) shutdown(fd, SHUT_RDWR);
 
330
        close(fd);
 
331
        continue;
 
332
      }
 
333
 
 
334
      if (protocol->setFileDescriptor(fd))
 
335
      {
 
336
        (void) shutdown(fd, SHUT_RDWR);
 
337
        close(fd);
 
338
        delete protocol;
 
339
        continue;
 
340
      }
 
341
 
 
342
      return protocol;
 
343
    }
 
344
  }
 
345
}
 
346
 
 
347
Protocol *ListenHandler::getTmpProtocol(void) const
 
348
{
 
349
  assert(listen_list.size() > 0);
 
350
  return listen_list[0]->protocolFactory();
 
351
}
 
352
 
 
353
void ListenHandler::wakeup(void)
 
354
{
 
355
  ssize_t ret= write(wakeup_pipe[1], "\0", 1);
 
356
  assert(ret == 1);
 
357
}
 
358
 
 
359
void add_listen(const Listen &listen_obj)
 
360
{
 
361
  assert(_default_listen_handler != NULL);
 
362
  _default_listen_handler->addListen(listen_obj);
 
363
}
 
364
 
 
365
void remove_listen(const Listen &listen_obj)
 
366
{
 
367
  assert(_default_listen_handler != NULL);
 
368
  _default_listen_handler->removeListen(listen_obj);
 
369
}
 
370
 
 
371
void listen_abort(void)
 
372
{
 
373
  assert(_default_listen_handler != NULL);
 
374
  _default_listen_handler->wakeup();
 
375
}