~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/drizzled.cc

  • Committer: Brian Aker
  • Date: 2008-07-29 07:26:35 UTC
  • Revision ID: brian@tangent.org-20080729072635-vlzlrd02sg54ad3u
This is the second pass through the code. Mainly to get the socket loop
finished. Next pass will push out poll() and move to the direct libeven
queue.

Show diffs side-by-side

added added

removed removed

Lines of Context:
263
263
bool opt_safe_user_create = 0;
264
264
bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
265
265
bool opt_log_slave_updates= 0;
266
 
static fd_set clientFDs;
 
266
static struct pollfd fds[UINT8_MAX];
 
267
static uint8_t pollfd_count= 0;
267
268
 
268
269
/*
269
270
  Legacy global handlerton. These will be removed (please do not add more).
468
469
static char **defaults_argv;
469
470
static char *opt_bin_logname;
470
471
 
471
 
static my_socket ip_sock;
472
472
struct rand_struct sql_rand; ///< used by sql_class.cc:THD::THD()
473
473
 
474
474
struct passwd *user_info;
557
557
 
558
558
 
559
559
  /* Abort listening to new connections */
560
 
  if (ip_sock != INVALID_SOCKET)
561
560
  {
562
 
    (void) shutdown(ip_sock, SHUT_RDWR);
563
 
    (void) closesocket(ip_sock);
564
 
    ip_sock= INVALID_SOCKET;
 
561
    int x;
 
562
    
 
563
    for (x= 0; x < pollfd_count; x++)
 
564
    {
 
565
      if (fds[x].fd != INVALID_SOCKET)
 
566
      {
 
567
        (void) shutdown(fds[x].fd, SHUT_RDWR);
 
568
        (void) closesocket(fds[x].fd);
 
569
        fds[x].fd= INVALID_SOCKET;
 
570
      }
 
571
    }
565
572
  }
566
573
 
567
574
  end_thr_alarm(0);                      // Abort old alarms.
644
651
static void close_server_sock()
645
652
{
646
653
#ifdef HAVE_CLOSE_SERVER_SOCK
647
 
  my_socket tmp_sock;
648
 
  tmp_sock=ip_sock;
649
 
  if (tmp_sock != INVALID_SOCKET)
650
654
  {
651
 
    ip_sock=INVALID_SOCKET;
652
 
    VOID(shutdown(tmp_sock, SHUT_RDWR));
 
655
    int x;
 
656
    
 
657
    for (x= 0; x < pollfd_count; x++)
 
658
    {
 
659
      if (fds[x].fd != INVALID_SOCKET)
 
660
      {
 
661
        (void) shutdown(fds[x].fd, SHUT_RDWR);
 
662
        (void) closesocket(fds[x].fd);
 
663
        fds[x].fd= INVALID_SOCKET;
 
664
      }
 
665
    }
653
666
  }
654
 
  return;;
655
667
#endif
656
668
}
657
669
 
946
958
      line options.
947
959
    */
948
960
 
949
 
#if MYSQL_PORT_DEFAULT == 0
950
961
    struct  servent *serv_ptr;
951
962
    if ((serv_ptr= getservbyname("mysql", "tcp")))
952
963
      mysqld_port= ntohs((u_short) serv_ptr->s_port); /* purecov: inspected */
953
 
#endif
 
964
 
954
965
    if ((env = getenv("MYSQL_TCP_PORT")))
955
966
      mysqld_port= (uint) atoi(env);            /* purecov: inspected */
 
967
 
 
968
    assert(mysqld_port);
956
969
  }
957
970
}
958
971
 
1077
1090
 
1078
1091
static void network_init(void)
1079
1092
{
1080
 
  int   arg;
1081
1093
  int   ret;
1082
1094
  uint  waited;
1083
1095
  uint  this_wait;
1084
1096
  uint  retry;
1085
1097
  char port_buf[NI_MAXSERV];
 
1098
  struct addrinfo *ai;
 
1099
  struct addrinfo *next;
 
1100
  struct addrinfo hints;
 
1101
  int error;
1086
1102
 
1087
1103
  if (thread_scheduler.init())
1088
1104
    unireg_abort(1);                    /* purecov: inspected */
1089
1105
 
1090
1106
  set_ports();
1091
1107
 
1092
 
  FD_ZERO(&clientFDs);
1093
 
  if (mysqld_port != 0)
1094
 
  {
1095
 
    struct addrinfo *ai;
1096
 
    struct addrinfo hints;
1097
 
    int error;
1098
 
 
1099
 
    bzero(&hints, sizeof (hints));
1100
 
    hints.ai_flags= AI_PASSIVE;
1101
 
    hints.ai_socktype= SOCK_STREAM;
1102
 
    hints.ai_family= AF_UNSPEC;
1103
 
 
1104
 
    snprintf(port_buf, NI_MAXSERV, "%d", mysqld_port);
1105
 
    error= getaddrinfo(my_bind_addr_str, port_buf, &hints, &ai);
1106
 
    if (error != 0)
1107
 
    {
1108
 
      sql_perror(ER(ER_IPSOCK_ERROR));          /* purecov: tested */
1109
 
      unireg_abort(1);                          /* purecov: tested */
1110
 
    }
1111
 
 
1112
 
 
1113
 
    ip_sock= socket(ai->ai_family, ai->ai_socktype,
1114
 
                    ai->ai_protocol);
1115
 
 
 
1108
  memset(fds, 0, sizeof(struct pollfd) * UINT8_MAX);
 
1109
  memset(&hints, 0, sizeof (hints));
 
1110
  hints.ai_flags= AI_PASSIVE;
 
1111
  hints.ai_socktype= SOCK_STREAM;
 
1112
  hints.ai_family= AF_INET;
 
1113
  hints.ai_protocol= IPPROTO_TCP;
 
1114
 
 
1115
  snprintf(port_buf, NI_MAXSERV, "%d", mysqld_port);
 
1116
  error= getaddrinfo(my_bind_addr_str, port_buf, &hints, &ai);
 
1117
  if (error != 0)
 
1118
  {
 
1119
    sql_perror(ER(ER_IPSOCK_ERROR));            /* purecov: tested */
 
1120
    unireg_abort(1);                            /* purecov: tested */
 
1121
  }
 
1122
 
 
1123
  for (next= ai, pollfd_count= 0; next; next= next->ai_next, pollfd_count++)
 
1124
  {
 
1125
    int ip_sock;
 
1126
 
 
1127
    ip_sock= socket(next->ai_family, next->ai_socktype, next->ai_protocol);
1116
1128
    if (ip_sock == INVALID_SOCKET)
1117
1129
    {
1118
1130
      sql_perror(ER(ER_IPSOCK_ERROR));          /* purecov: tested */
1119
1131
      unireg_abort(1);                          /* purecov: tested */
1120
1132
    }
1121
1133
 
1122
 
    /*
1123
 
      We should not use SO_REUSEADDR on windows as this would enable a
1124
 
      user to open two mysqld servers with the same TCP/IP port.
1125
 
    */
1126
 
    arg= 1;
1127
 
    (void) setsockopt(ip_sock,SOL_SOCKET,SO_REUSEADDR,(char*)&arg,sizeof(arg));
1128
 
 
1129
 
#ifdef IPV6_V6ONLY
1130
 
     /*
1131
 
       For interoperability with older clients, IPv6 socket should
1132
 
       listen on both IPv6 and IPv4 wildcard addresses.
1133
 
       Turn off IPV6_V6ONLY option.
1134
 
     */
1135
 
    if (ai->ai_family == AF_INET6)
1136
 
    {
1137
 
      arg= 0;      
1138
 
      (void) setsockopt(ip_sock, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
1139
 
                sizeof(arg));
1140
 
    }
1141
 
#endif
 
1134
    fds[pollfd_count].fd= ip_sock;
 
1135
    fds[pollfd_count].events= POLLIN | POLLERR;
 
1136
 
 
1137
 
1142
1138
    /*
1143
1139
      Sometimes the port is not released fast enough when stopping and
1144
1140
      restarting the server. This happens quite often with the test suite
1149
1145
    */
1150
1146
    for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
1151
1147
    {
1152
 
      if (((ret= bind(ip_sock, ai->ai_addr, ai->ai_addrlen)) >= 0 ) ||
 
1148
      if (((ret= bind(ip_sock, next->ai_addr, next->ai_addrlen)) >= 0 ) ||
1153
1149
          (socket_errno != SOCKET_EADDRINUSE) ||
1154
1150
          (waited >= mysqld_port_timeout))
1155
1151
        break;
1161
1157
    if (ret < 0)
1162
1158
    {
1163
1159
      sql_perror("Can't start server: Bind on TCP/IP port");
1164
 
      sql_print_error("Do you already have another mysqld server running on port: %d ?",mysqld_port);
 
1160
      sql_print_error("Do you already have another drizzled server running on port: %d ?",mysqld_port);
1165
1161
      unireg_abort(1);
1166
1162
    }
1167
1163
    if (listen(ip_sock,(int) back_log) < 0)
1174
1170
 
1175
1171
    /* Add the socket to our listeners */
1176
1172
    {
1177
 
      int ip_flags;
1178
 
      FD_SET(ip_sock, &clientFDs);
1179
 
      ip_flags= fcntl(ip_sock, F_GETFL, 0);
 
1173
      struct linger ling = {0, 0};
 
1174
      int flags =1;
1180
1175
 
1181
 
      if (!(test_flags & TEST_BLOCKING))
1182
 
      {
1183
 
        fcntl(ip_sock, F_SETFL, ip_flags | O_NONBLOCK);
1184
 
        fcntl(ip_sock, F_SETFL, ip_flags | O_NDELAY);
1185
 
      }
 
1176
      (void) setsockopt(ip_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&flags, sizeof(flags));
 
1177
      (void) setsockopt(ip_sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
 
1178
      (void) setsockopt(ip_sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
 
1179
      (void) setsockopt(ip_sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
1186
1180
    }
1187
1181
  }
1188
1182
 
1189
 
  return;;
 
1183
  return;
1190
1184
}
1191
1185
 
1192
1186
/**
2783
2777
 
2784
2778
void handle_connections_sockets()
2785
2779
{
 
2780
  int x;
2786
2781
  my_socket sock,new_sock;
2787
2782
  uint error_count=0;
2788
 
  uint max_used_connection= (uint)ip_sock+1;
2789
 
  fd_set readFDs;
2790
2783
  THD *thd;
2791
2784
  struct sockaddr_storage cAddr;
2792
 
  int ip_flags=0, flags;
2793
2785
  st_vio *vio_tmp;
2794
2786
 
2795
 
  ip_flags= fcntl(ip_sock, F_GETFL, 0);
2796
 
 
2797
2787
  MAYBE_BROKEN_SYSCALL;
2798
2788
  while (!abort_loop)
2799
2789
  {
2800
 
    readFDs= clientFDs;
2801
 
    if (select((int) max_used_connection, &readFDs,0,0,0) < 0)
 
2790
    int number_of;
 
2791
 
 
2792
    if ((number_of= poll(fds, pollfd_count, -1)) == -1)
2802
2793
    {
2803
2794
      if (socket_errno != SOCKET_EINTR)
2804
2795
      {
2808
2799
      MAYBE_BROKEN_SYSCALL
2809
2800
      continue;
2810
2801
    }
 
2802
    if (number_of == 0)
 
2803
      continue;
 
2804
    
 
2805
#ifdef FIXME_IF_WE_WERE_KEEPING_THIS
 
2806
    assert(number_of > 1); /* Not handling this at the moment */
 
2807
#endif
 
2808
 
2811
2809
    if (abort_loop)
2812
2810
    {
2813
2811
      MAYBE_BROKEN_SYSCALL;
2814
2812
      break;
2815
2813
    }
2816
2814
 
2817
 
    sock= ip_sock;
2818
 
    flags= ip_flags;
 
2815
    for (x= 0, sock= -1; x < pollfd_count; x++)
 
2816
    {
 
2817
      if (fds[x].revents == POLLIN)
 
2818
      {
 
2819
        sock= fds[x].fd;
 
2820
        break;
 
2821
      }
 
2822
    }
 
2823
    assert(sock != -1);
2819
2824
 
2820
2825
    for (uint retry=0; retry < MAX_ACCEPT_RETRY; retry++)
2821
2826
    {
2822
2827
      size_socket length= sizeof(struct sockaddr_storage);
2823
2828
      new_sock= accept(sock, (struct sockaddr *)(&cAddr),
2824
2829
                       &length);
2825
 
      if (new_sock != INVALID_SOCKET ||
2826
 
          (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN))
 
2830
      if (new_sock != INVALID_SOCKET || (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN))
2827
2831
        break;
2828
 
      MAYBE_BROKEN_SYSCALL;
2829
 
      if (!(test_flags & TEST_BLOCKING))
2830
 
      {
2831
 
        if (retry == MAX_ACCEPT_RETRY - 1)
2832
 
          fcntl(sock, F_SETFL, flags);          // Try without O_NONBLOCK
2833
 
      }
2834
2832
    }
2835
2833
 
2836
 
    if (!(test_flags & TEST_BLOCKING))
2837
 
      fcntl(sock, F_SETFL, flags);
2838
2834
 
2839
2835
    if (new_sock == INVALID_SOCKET)
2840
2836
    {
4221
4217
  slave_exec_mode_options= (uint)
4222
4218
    find_bit_type_or_exit(slave_exec_mode_str, &slave_exec_mode_typelib, NULL);
4223
4219
  opt_specialflag= SPECIAL_ENGLISH;
4224
 
  ip_sock= INVALID_SOCKET;
4225
4220
  mysql_home_ptr= mysql_home;
4226
4221
  pidfile_name_ptr= pidfile_name;
4227
4222
  log_error_file_ptr= log_error_file;