~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to libdrizzle/net_serv.c

  • Committer: Brian Aker
  • Date: 2008-07-16 03:31:49 UTC
  • Revision ID: brian@tangent.org-20080716033149-1z4kyxcy1pgo8ih5
Removed thd alarm around socket, we now use timeouts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
145
145
 
146
146
static int net_data_is_ready(my_socket sd)
147
147
{
148
 
#ifdef HAVE_POLL
149
148
  struct pollfd ufds;
150
149
  int res;
151
150
 
156
155
  if (res < 0 || !(ufds.revents & (POLLIN | POLLPRI)))
157
156
    return 0;
158
157
  return 1;
159
 
#else
160
 
  fd_set sfds;
161
 
  struct timeval tv;
162
 
  int res;
163
 
 
164
 
  /* Windows uses an _array_ of 64 fd's as default, so it's safe */
165
 
  if (sd >= FD_SETSIZE)
166
 
    return -1;
167
 
#define NET_DATA_IS_READY_CAN_RETURN_MINUS_ONE
168
 
 
169
 
  FD_ZERO(&sfds);
170
 
  FD_SET(sd, &sfds);
171
 
 
172
 
  tv.tv_sec= tv.tv_usec= 0;
173
 
 
174
 
  if ((res= select(sd+1, &sfds, NULL, NULL, &tv)) < 0)
175
 
    return 0;
176
 
  else
177
 
    return test(res ? FD_ISSET(sd, &sfds) : 0);
178
 
#endif /* HAVE_POLL */
179
158
}
180
159
 
181
160
/**
220
199
        break;
221
200
      }
222
201
    }
223
 
#ifdef NET_DATA_IS_READY_CAN_RETURN_MINUS_ONE
224
 
    /* 'net_data_is_ready' returned "don't know" */
225
 
    if (ready == -1)
226
 
    {
227
 
      /* Read unblocking to clear net */
228
 
      bool old_mode;
229
 
      if (!vio_blocking(net->vio, false, &old_mode))
230
 
      {
231
 
        while ((long) (count= vio_read(net->vio, net->buff,
232
 
                                       (size_t) net->max_packet)) > 0)
233
 
          DBUG_PRINT("info",("skipped %ld bytes from file: %s",
234
 
                             (long) count, vio_description(net->vio)));
235
 
        vio_blocking(net->vio, true, &old_mode);
236
 
      }
237
 
    }
238
 
#endif /* NET_DATA_IS_READY_CAN_RETURN_MINUS_ONE */
239
202
  }
240
203
  net->pkt_nr=net->compress_pkt_nr=0;           /* Ready for new command */
241
204
  net->write_pos=net->buff;
589
552
  size_t length;
590
553
  uint i,retry_count=0;
591
554
  ulong len=packet_error;
592
 
  thr_alarm_t alarmed;
593
 
  my_bool net_blocking=vio_is_blocking(net->vio);
594
555
  uint32 remain= (net->compress ? NET_HEADER_SIZE+COMP_HEADER_SIZE :
595
 
                  NET_HEADER_SIZE);
 
556
                  NET_HEADER_SIZE);
 
557
  /* Backup of the original SO_RCVTIMEO timeout */
 
558
  struct timeval backtime;
 
559
  int error;
 
560
 
596
561
  *complen = 0;
597
562
 
598
 
  net->reading_or_writing=1;
599
 
  thr_alarm_init(&alarmed);
 
563
  net->reading_or_writing= 1;
600
564
  /* Read timeout is set in my_net_set_read_timeout */
601
565
 
602
 
    pos = net->buff + net->where_b;             /* net->packet -4 */
603
 
    for (i=0 ; i < 2 ; i++)
 
566
  pos = net->buff + net->where_b;               /* net->packet -4 */
 
567
 
 
568
 
 
569
  /* Check for error, currently assert */
 
570
  {
 
571
    struct timeval waittime;
 
572
    socklen_t length;
 
573
 
 
574
    waittime.tv_sec= net->read_timeout;
 
575
    waittime.tv_usec= 0;
 
576
 
 
577
    memset(&backtime, 0, sizeof(struct timeval));
 
578
    error= getsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO, 
 
579
                      &backtime, &length);
 
580
    assert(error == 0);
 
581
    error= setsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO, 
 
582
                      &waittime, (socklen_t)sizeof(struct timeval));
 
583
    assert(error == 0);
 
584
  }
 
585
 
 
586
  for (i= 0; i < 2 ; i++)
 
587
  {
 
588
    while (remain > 0)
604
589
    {
605
 
      while (remain > 0)
606
 
      {
607
 
        /* First read is done with non blocking mode */
608
 
        if ((long) (length= vio_read(net->vio, pos, remain)) <= 0L)
609
 
        {
610
 
          my_bool interrupted = vio_should_retry(net->vio);
611
 
 
612
 
          DBUG_PRINT("info",("vio_read returned %ld  errno: %d",
613
 
                             (long) length, vio_errno(net->vio)));
614
 
          if (thr_alarm_in_use(&alarmed) && !thr_got_alarm(&alarmed) &&
615
 
              interrupted)
616
 
          {                                     /* Probably in MIT threads */
617
 
            if (retry_count++ < net->retry_count)
618
 
              continue;
619
 
#ifdef EXTRA_DEBUG
620
 
            fprintf(stderr, "%s: read looped with error %d, aborting thread\n",
621
 
                    my_progname,vio_errno(net->vio));
622
 
#endif /* EXTRA_DEBUG */
623
 
          }
624
 
          if (vio_errno(net->vio) == SOCKET_EINTR)
625
 
          {
626
 
            DBUG_PRINT("warning",("Interrupted read. Retrying..."));
627
 
            continue;
628
 
          }
629
 
          DBUG_PRINT("error",("Couldn't read packet: remain: %u  errno: %d  length: %ld",
630
 
                              remain, vio_errno(net->vio), (long) length));
631
 
          len= packet_error;
632
 
          net->error= 2;                                /* Close socket */
633
 
          net->last_errno= (vio_was_interrupted(net->vio) ?
634
 
                                   ER_NET_READ_INTERRUPTED :
635
 
                                   ER_NET_READ_ERROR);
636
 
          goto end;
637
 
        }
638
 
        remain -= (uint32) length;
639
 
        pos+= length;
640
 
        update_statistics(thd_increment_bytes_received(length));
641
 
      }
642
 
      if (i == 0)
643
 
      {                                 /* First parts is packet length */
644
 
        ulong helping;
645
 
        DBUG_DUMP("packet_header", net->buff+net->where_b,
646
 
                  NET_HEADER_SIZE);
647
 
        if (net->buff[net->where_b + 3] != (uchar) net->pkt_nr)
648
 
        {
649
 
          if (net->buff[net->where_b] != (uchar) 255)
650
 
          {
651
 
            DBUG_PRINT("error",
652
 
                       ("Packets out of order (Found: %d, expected %u)",
653
 
                        (int) net->buff[net->where_b + 3],
654
 
                        net->pkt_nr));
655
 
#ifdef EXTRA_DEBUG
656
 
            fflush(stdout);
657
 
            fprintf(stderr,"Error: Packets out of order (Found: %d, expected %d)\n",
658
 
                    (int) net->buff[net->where_b + 3],
659
 
                    (uint) (uchar) net->pkt_nr);
660
 
            fflush(stderr);
661
 
            DBUG_ASSERT(0);
662
 
#endif
663
 
          }
664
 
          len= packet_error;
665
 
          /* Not a NET error on the client. XXX: why? */
666
 
          goto end;
667
 
        }
668
 
        net->compress_pkt_nr= ++net->pkt_nr;
669
 
        if (net->compress)
670
 
        {
671
 
          /*
672
 
            If the packet is compressed then complen > 0 and contains the
673
 
            number of bytes in the uncompressed packet
674
 
          */
675
 
          *complen=uint3korr(&(net->buff[net->where_b + NET_HEADER_SIZE]));
676
 
        }
677
 
 
678
 
        len=uint3korr(net->buff+net->where_b);
679
 
        if (!len)                               /* End of big multi-packet */
680
 
          goto end;
681
 
        helping = max(len,*complen) + net->where_b;
682
 
        /* The necessary size of net->buff */
683
 
        if (helping >= net->max_packet)
684
 
        {
685
 
          if (net_realloc(net,helping))
686
 
          {
687
 
            len= packet_error;          /* Return error and close connection */
688
 
            goto end;
689
 
          }
690
 
        }
691
 
        pos=net->buff + net->where_b;
692
 
        remain = (uint32) len;
693
 
      }
694
 
    }
 
590
      /* First read is done with non blocking mode */
 
591
      if ((long) (length= vio_read(net->vio, pos, remain)) <= 0L)
 
592
      {
 
593
        bool interrupted = vio_should_retry(net->vio);
 
594
 
 
595
        DBUG_PRINT("info",("vio_read returned %ld  errno: %d",
 
596
                           (long) length, vio_errno(net->vio)));
 
597
        if (interrupted)
 
598
        {                                       /* Probably in MIT threads */
 
599
          if (retry_count++ < net->retry_count)
 
600
            continue;
 
601
        }
 
602
        if (vio_errno(net->vio) == SOCKET_EINTR)
 
603
        {
 
604
          continue;
 
605
        }
 
606
        len= packet_error;
 
607
        net->error= 2;                          /* Close socket */
 
608
        net->last_errno= (vio_was_interrupted(net->vio) ?
 
609
                          ER_NET_READ_INTERRUPTED :
 
610
                          ER_NET_READ_ERROR);
 
611
        goto end;
 
612
      }
 
613
      remain -= (uint32) length;
 
614
      pos+= length;
 
615
      update_statistics(thd_increment_bytes_received(length));
 
616
    }
 
617
    if (i == 0)
 
618
    {                                   /* First parts is packet length */
 
619
      ulong helping;
 
620
 
 
621
      if (net->buff[net->where_b + 3] != (uchar) net->pkt_nr)
 
622
      {
 
623
        len= packet_error;
 
624
        /* Not a NET error on the client. XXX: why? */
 
625
        goto end;
 
626
      }
 
627
      net->compress_pkt_nr= ++net->pkt_nr;
 
628
      if (net->compress)
 
629
      {
 
630
        /*
 
631
          If the packet is compressed then complen > 0 and contains the
 
632
          number of bytes in the uncompressed packet
 
633
        */
 
634
        *complen=uint3korr(&(net->buff[net->where_b + NET_HEADER_SIZE]));
 
635
      }
 
636
 
 
637
      len=uint3korr(net->buff+net->where_b);
 
638
      if (!len)                         /* End of big multi-packet */
 
639
        goto end;
 
640
      helping = max(len,*complen) + net->where_b;
 
641
      /* The necessary size of net->buff */
 
642
      if (helping >= net->max_packet)
 
643
      {
 
644
        if (net_realloc(net,helping))
 
645
        {
 
646
          len= packet_error;          /* Return error and close connection */
 
647
          goto end;
 
648
        }
 
649
      }
 
650
      pos=net->buff + net->where_b;
 
651
      remain = (uint32) len;
 
652
    }
 
653
  }
695
654
 
696
655
end:
697
 
  if (thr_alarm_in_use(&alarmed))
698
 
  {
699
 
    bool old_mode;
700
 
    thr_end_alarm(&alarmed);
701
 
    vio_blocking(net->vio, net_blocking, &old_mode);
702
 
  }
703
 
  net->reading_or_writing=0;
704
 
#ifdef DEBUG_DATA_PACKETS
705
 
  if (len != packet_error)
706
 
    DBUG_DUMP("data", net->buff+net->where_b, len);
707
 
#endif
 
656
  error= setsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO, 
 
657
                    &backtime, (socklen_t)sizeof(struct timeval));
 
658
  assert(error == 0);
 
659
  net->reading_or_writing= 0;
 
660
 
708
661
  return(len);
709
662
}
710
663