~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to libdrizzle/net_serv.c

  • Committer: Monty Taylor
  • Date: 2008-09-16 00:00:48 UTC
  • mto: This revision was merged to the branch mainline in revision 391.
  • Revision ID: monty@inaugust.com-20080916000048-3rvrv3gv9l0ad3gs
Fixed copyright headers in drizzled/

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2008 Sun Microsystems, Inc.
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
#include <drizzled/global.h>
 
22
#include "libdrizzle.h"
 
23
#include <libdrizzle/errmsg.h>
 
24
#include <vio/violite.h>
 
25
#include <signal.h>
 
26
#include <errno.h>
 
27
#include <sys/poll.h>
 
28
#include <zlib.h>
 
29
 
 
30
/*
 
31
  The following handles the differences when this is linked between the
 
32
  client and the server.
 
33
 
 
34
  This gives an error if a too big packet is found
 
35
  The server can change this with the -O switch, but because the client
 
36
  can't normally do this the client should have a bigger max_allowed_packet.
 
37
*/
 
38
 
 
39
 
 
40
#define update_statistics(A)
 
41
#define thd_increment_bytes_sent(N)
 
42
 
 
43
#define TEST_BLOCKING        8
 
44
#define MAX_PACKET_LENGTH (256L*256L*256L-1)
 
45
#define MIN_COMPRESS_LENGTH             50      /* Don't compress small bl. */
 
46
 
 
47
static bool net_write_buff(NET *net, const unsigned char *packet, uint32_t len);
 
48
 
 
49
 
 
50
/** Init with packet info. */
 
51
 
 
52
bool my_net_init(NET *net, Vio* vio)
 
53
{
 
54
  net->vio = vio;
 
55
  my_net_local_init(net);            /* Set some limits */
 
56
  if (!(net->buff=(uchar*) malloc((size_t) net->max_packet+
 
57
                                  NET_HEADER_SIZE + COMP_HEADER_SIZE)))
 
58
    return(1);
 
59
  net->buff_end=net->buff+net->max_packet;
 
60
  net->error=0; net->return_status=0;
 
61
  net->pkt_nr=net->compress_pkt_nr=0;
 
62
  net->write_pos=net->read_pos = net->buff;
 
63
  net->last_error[0]=0;
 
64
  net->compress=0; net->reading_or_writing=0;
 
65
  net->where_b = net->remain_in_buf=0;
 
66
  net->last_errno=0;
 
67
  net->unused= 0;
 
68
 
 
69
  if (vio != 0)                    /* If real connection */
 
70
  {
 
71
    net->fd  = vio_fd(vio);            /* For perl DBI/DBD */
 
72
    vio_fastsend(vio);
 
73
  }
 
74
  return(0);
 
75
}
 
76
 
 
77
bool net_init_sock(NET * net, int sock, int flags)
 
78
{
 
79
 
 
80
  Vio *vio_tmp= vio_new(sock, VIO_TYPE_TCPIP, flags);
 
81
  if (vio_tmp == NULL)
 
82
    return true;
 
83
  else
 
84
    if (my_net_init(net, vio_tmp))
 
85
    {
 
86
      /* Only delete the temporary vio if we didn't already attach it to the
 
87
       * NET object.
 
88
       */
 
89
      if (vio_tmp && (net->vio != vio_tmp))
 
90
        vio_delete(vio_tmp);
 
91
      else
 
92
      {
 
93
        (void) shutdown(sock, SHUT_RDWR);
 
94
        (void) close(sock);
 
95
      }
 
96
      return true;
 
97
    }
 
98
  return false;
 
99
}
 
100
 
 
101
void net_end(NET *net)
 
102
{
 
103
  if (net->buff != NULL)
 
104
    free(net->buff);
 
105
  net->buff=0;
 
106
  return;
 
107
}
 
108
 
 
109
void net_close(NET *net)
 
110
{
 
111
  if (net->vio != NULL)
 
112
  {
 
113
    vio_delete(net->vio);
 
114
    net->vio= 0;
 
115
  }
 
116
}
 
117
 
 
118
bool net_peer_addr(NET *net, char *buf, uint16_t *port, size_t buflen)
 
119
{
 
120
  return vio_peer_addr(net->vio, buf, port, buflen);
 
121
}
 
122
 
 
123
void net_keepalive(NET *net, bool flag)
 
124
{
 
125
  vio_keepalive(net->vio, flag);
 
126
}
 
127
 
 
128
int net_get_sd(NET *net)
 
129
{
 
130
  return net->vio->sd;
 
131
}
 
132
 
 
133
bool net_should_close(NET *net)
 
134
{
 
135
  return net->error || (net->vio == 0);
 
136
}
 
137
 
 
138
bool net_more_data(NET *net)
 
139
{
 
140
  return (net->vio == 0 || net->vio->read_pos < net->vio->read_end);
 
141
}
 
142
 
 
143
/** Realloc the packet buffer. */
 
144
 
 
145
bool net_realloc(NET *net, size_t length)
 
146
{
 
147
  uchar *buff;
 
148
  size_t pkt_length;
 
149
 
 
150
  if (length >= net->max_packet_size)
 
151
  {
 
152
    /* @todo: 1 and 2 codes are identical. */
 
153
    net->error= 1;
 
154
    net->last_errno= CR_NET_PACKET_TOO_LARGE;
 
155
    return(1);
 
156
  }
 
157
  pkt_length = (length+IO_SIZE-1) & ~(IO_SIZE-1);
 
158
  /*
 
159
    We must allocate some extra bytes for the end 0 and to be able to
 
160
    read big compressed blocks
 
161
  */
 
162
  if (!(buff= (uchar*) realloc((char*) net->buff, pkt_length +
 
163
                               NET_HEADER_SIZE + COMP_HEADER_SIZE)))
 
164
  {
 
165
    /* @todo: 1 and 2 codes are identical. */
 
166
    net->error= 1;
 
167
    net->last_errno= CR_OUT_OF_MEMORY;
 
168
    /* In the server the error is reported by MY_WME flag. */
 
169
    return(1);
 
170
  }
 
171
  net->buff=net->write_pos=buff;
 
172
  net->buff_end=buff+(net->max_packet= (uint32_t) pkt_length);
 
173
  return(0);
 
174
}
 
175
 
 
176
 
 
177
/**
 
178
   Check if there is any data to be read from the socket.
 
179
 
 
180
   @param sd   socket descriptor
 
181
 
 
182
   @retval
 
183
   0  No data to read
 
184
   @retval
 
185
   1  Data or EOF to read
 
186
   @retval
 
187
   -1   Don't know if data is ready or not
 
188
*/
 
189
 
 
190
static bool net_data_is_ready(int sd)
 
191
{
 
192
  struct pollfd ufds;
 
193
  int res;
 
194
 
 
195
  ufds.fd= sd;
 
196
  ufds.events= POLLIN | POLLPRI;
 
197
  if (!(res= poll(&ufds, 1, 0)))
 
198
    return 0;
 
199
  if (res < 0 || !(ufds.revents & (POLLIN | POLLPRI)))
 
200
    return 0;
 
201
  return 1;
 
202
}
 
203
 
 
204
/**
 
205
   Remove unwanted characters from connection
 
206
   and check if disconnected.
 
207
 
 
208
   Read from socket until there is nothing more to read. Discard
 
209
   what is read.
 
210
 
 
211
   If there is anything when to read 'net_clear' is called this
 
212
   normally indicates an error in the protocol.
 
213
 
 
214
   When connection is properly closed (for TCP it means with
 
215
   a FIN packet), then select() considers a socket "ready to read",
 
216
   in the sense that there's EOF to read, but read() returns 0.
 
217
 
 
218
   @param net            NET handler
 
219
   @param clear_buffer           if <> 0, then clear all data from comm buff
 
220
*/
 
221
 
 
222
void net_clear(NET *net, bool clear_buffer)
 
223
{
 
224
  if (clear_buffer)
 
225
  {
 
226
    while (net_data_is_ready(net->vio->sd) > 0)
 
227
    {
 
228
      /* The socket is ready */
 
229
      if (vio_read(net->vio, net->buff,
 
230
                   (size_t) net->max_packet) <= 0)
 
231
      {
 
232
        net->error= 2;
 
233
        break;
 
234
      }
 
235
    }
 
236
  }
 
237
  net->pkt_nr=net->compress_pkt_nr=0;        /* Ready for new command */
 
238
  net->write_pos=net->buff;
 
239
  return;
 
240
}
 
241
 
 
242
 
 
243
/** Flush write_buffer if not empty. */
 
244
 
 
245
bool net_flush(NET *net)
 
246
{
 
247
  bool error= 0;
 
248
  if (net->buff != net->write_pos)
 
249
  {
 
250
    error=test(net_real_write(net, net->buff,
 
251
                              (size_t) (net->write_pos - net->buff)));
 
252
    net->write_pos=net->buff;
 
253
  }
 
254
  /* Sync packet number if using compression */
 
255
  if (net->compress)
 
256
    net->pkt_nr=net->compress_pkt_nr;
 
257
  return(error);
 
258
}
 
259
 
 
260
 
 
261
/*****************************************************************************
 
262
 ** Write something to server/client buffer
 
263
 *****************************************************************************/
 
264
 
 
265
/**
 
266
   Write a logical packet with packet header.
 
267
 
 
268
   Format: Packet length (3 bytes), packet number(1 byte)
 
269
   When compression is used a 3 byte compression length is added
 
270
 
 
271
   @note
 
272
   If compression is used the original package is modified!
 
273
*/
 
274
 
 
275
bool
 
276
my_net_write(NET *net,const uchar *packet,size_t len)
 
277
{
 
278
  uchar buff[NET_HEADER_SIZE];
 
279
  if (unlikely(!net->vio)) /* nowhere to write */
 
280
    return 0;
 
281
  /*
 
282
    Big packets are handled by splitting them in packets of MAX_PACKET_LENGTH
 
283
    length. The last packet is always a packet that is < MAX_PACKET_LENGTH.
 
284
    (The last packet may even have a length of 0)
 
285
  */
 
286
  while (len >= MAX_PACKET_LENGTH)
 
287
  {
 
288
    const uint32_t z_size = MAX_PACKET_LENGTH;
 
289
    int3store(buff, z_size);
 
290
    buff[3]= (uchar) net->pkt_nr++;
 
291
    if (net_write_buff(net, buff, NET_HEADER_SIZE) ||
 
292
        net_write_buff(net, packet, z_size))
 
293
      return 1;
 
294
    packet += z_size;
 
295
    len-=     z_size;
 
296
  }
 
297
  /* Write last packet */
 
298
  int3store(buff,len);
 
299
  buff[3]= (uchar) net->pkt_nr++;
 
300
  if (net_write_buff(net, buff, NET_HEADER_SIZE))
 
301
    return 1;
 
302
  return test(net_write_buff(net,packet,len));
 
303
}
 
304
 
 
305
/**
 
306
   Send a command to the server.
 
307
 
 
308
   The reason for having both header and packet is so that libdrizzle
 
309
   can easy add a header to a special command (like prepared statements)
 
310
   without having to re-alloc the string.
 
311
 
 
312
   As the command is part of the first data packet, we have to do some data
 
313
   juggling to put the command in there, without having to create a new
 
314
   packet.
 
315
 
 
316
   This function will split big packets into sub-packets if needed.
 
317
   (Each sub packet can only be 2^24 bytes)
 
318
 
 
319
   @param net        NET handler
 
320
   @param command    Command in MySQL server (enum enum_server_command)
 
321
   @param header    Header to write after command
 
322
   @param head_len    Length of header
 
323
   @param packet    Query or parameter to query
 
324
   @param len        Length of packet
 
325
 
 
326
   @retval
 
327
   0    ok
 
328
   @retval
 
329
   1    error
 
330
*/
 
331
 
 
332
bool
 
333
net_write_command(NET *net,uchar command,
 
334
                  const uchar *header, size_t head_len,
 
335
                  const uchar *packet, size_t len)
 
336
{
 
337
  uint32_t length=len+1+head_len;            /* 1 extra byte for command */
 
338
  uchar buff[NET_HEADER_SIZE+1];
 
339
  uint header_size=NET_HEADER_SIZE+1;
 
340
 
 
341
  buff[4]=command;                /* For first packet */
 
342
 
 
343
  if (length >= MAX_PACKET_LENGTH)
 
344
  {
 
345
    /* Take into account that we have the command in the first header */
 
346
    len= MAX_PACKET_LENGTH - 1 - head_len;
 
347
    do
 
348
    {
 
349
      int3store(buff, MAX_PACKET_LENGTH);
 
350
      buff[3]= (uchar) net->pkt_nr++;
 
351
      if (net_write_buff(net, buff, header_size) ||
 
352
          net_write_buff(net, header, head_len) ||
 
353
          net_write_buff(net, packet, len))
 
354
        return(1);
 
355
      packet+= len;
 
356
      length-= MAX_PACKET_LENGTH;
 
357
      len= MAX_PACKET_LENGTH;
 
358
      head_len= 0;
 
359
      header_size= NET_HEADER_SIZE;
 
360
    } while (length >= MAX_PACKET_LENGTH);
 
361
    len=length;                    /* Data left to be written */
 
362
  }
 
363
  int3store(buff,length);
 
364
  buff[3]= (uchar) net->pkt_nr++;
 
365
  return(test(net_write_buff(net, buff, header_size) ||
 
366
              (head_len && net_write_buff(net, header, head_len)) ||
 
367
              net_write_buff(net, packet, len) || net_flush(net)));
 
368
}
 
369
 
 
370
/**
 
371
   Caching the data in a local buffer before sending it.
 
372
 
 
373
   Fill up net->buffer and send it to the client when full.
 
374
 
 
375
   If the rest of the to-be-sent-packet is bigger than buffer,
 
376
   send it in one big block (to avoid copying to internal buffer).
 
377
   If not, copy the rest of the data to the buffer and return without
 
378
   sending data.
 
379
 
 
380
   @param net        Network handler
 
381
   @param packet    Packet to send
 
382
   @param len        Length of packet
 
383
 
 
384
   @note
 
385
   The cached buffer can be sent as it is with 'net_flush()'.
 
386
   In this code we have to be careful to not send a packet longer than
 
387
   MAX_PACKET_LENGTH to net_real_write() if we are using the compressed
 
388
   protocol as we store the length of the compressed packet in 3 bytes.
 
389
 
 
390
   @retval
 
391
   0    ok
 
392
   @retval
 
393
   1
 
394
*/
 
395
 
 
396
static bool
 
397
net_write_buff(NET *net, const unsigned char *packet, uint32_t len)
 
398
{
 
399
  uint32_t left_length;
 
400
  if (net->compress && net->max_packet > MAX_PACKET_LENGTH)
 
401
    left_length= MAX_PACKET_LENGTH - (net->write_pos - net->buff);
 
402
  else
 
403
    left_length= (uint32_t) (net->buff_end - net->write_pos);
 
404
 
 
405
  if (len > left_length)
 
406
  {
 
407
    if (net->write_pos != net->buff)
 
408
    {
 
409
      /* Fill up already used packet and write it */
 
410
      memcpy(net->write_pos,packet,left_length);
 
411
      if (net_real_write(net, net->buff,
 
412
                         (size_t) (net->write_pos - net->buff) + left_length))
 
413
        return 1;
 
414
      net->write_pos= net->buff;
 
415
      packet+= left_length;
 
416
      len-= left_length;
 
417
    }
 
418
    if (net->compress)
 
419
    {
 
420
      /*
 
421
        We can't have bigger packets than 16M with compression
 
422
        Because the uncompressed length is stored in 3 bytes
 
423
      */
 
424
      left_length= MAX_PACKET_LENGTH;
 
425
      while (len > left_length)
 
426
      {
 
427
        if (net_real_write(net, packet, left_length))
 
428
          return 1;
 
429
        packet+= left_length;
 
430
        len-= left_length;
 
431
      }
 
432
    }
 
433
    if (len > net->max_packet)
 
434
      return net_real_write(net, packet, len) ? 1 : 0;
 
435
    /* Send out rest of the blocks as full sized blocks */
 
436
  }
 
437
  memcpy(net->write_pos,packet,len);
 
438
  net->write_pos+= len;
 
439
  return 0;
 
440
}
 
441
 
 
442
 
 
443
/**
 
444
   Read and write one packet using timeouts.
 
445
   If needed, the packet is compressed before sending.
 
446
 
 
447
   @todo
 
448
   - TODO is it needed to set this variable if we have no socket
 
449
*/
 
450
 
 
451
/*
 
452
  TODO: rewrite this in a manner to do non-block writes. If a write can not be made, and we are
 
453
  in the server, yield to another process and come back later.
 
454
*/
 
455
int
 
456
net_real_write(NET *net,const uchar *packet, size_t len)
 
457
{
 
458
  size_t length;
 
459
  const uchar *pos,*end;
 
460
  uint retry_count= 0;
 
461
 
 
462
  /* Backup of the original SO_RCVTIMEO timeout */
 
463
  struct timeval backtime;
 
464
  int error;
 
465
 
 
466
  if (net->error == 2)
 
467
    return(-1);                /* socket can't be used */
 
468
 
 
469
  net->reading_or_writing=2;
 
470
  if (net->compress)
 
471
  {
 
472
    size_t complen;
 
473
    uchar *b;
 
474
    const uint header_length=NET_HEADER_SIZE+COMP_HEADER_SIZE;
 
475
    if (!(b= (uchar*) malloc(len + NET_HEADER_SIZE +
 
476
                             COMP_HEADER_SIZE)))
 
477
    {
 
478
      net->error= 2;
 
479
      net->last_errno= CR_OUT_OF_MEMORY;
 
480
      /* In the server, the error is reported by MY_WME flag. */
 
481
      net->reading_or_writing= 0;
 
482
      return(1);
 
483
    }
 
484
    memcpy(b+header_length,packet,len);
 
485
 
 
486
    complen= len * 120 / 100 + 12;
 
487
    unsigned char * compbuf= (unsigned char *) malloc(complen);
 
488
    if (compbuf != NULL)
 
489
    {
 
490
      uLongf tmp_complen= complen;
 
491
      int res= compress((Bytef*) compbuf, &tmp_complen,
 
492
                        (Bytef*) (b+header_length),
 
493
                        len);
 
494
      complen= tmp_complen;
 
495
 
 
496
      free(compbuf);
 
497
 
 
498
      if ((res != Z_OK) || (complen >= len))
 
499
        complen= 0;
 
500
      else
 
501
      {
 
502
        size_t tmplen= complen;
 
503
        complen= len;
 
504
        len= tmplen;
 
505
      }
 
506
    }
 
507
    else
 
508
    {
 
509
      complen=0;
 
510
    }
 
511
    int3store(&b[NET_HEADER_SIZE],complen);
 
512
    int3store(b,len);
 
513
    b[3]=(uchar) (net->compress_pkt_nr++);
 
514
    len+= header_length;
 
515
    packet= b;
 
516
  }
 
517
 
 
518
  /* Check for error, currently assert */
 
519
  if (net->write_timeout)
 
520
  {
 
521
    struct timeval waittime;
 
522
    socklen_t length;
 
523
 
 
524
    waittime.tv_sec= net->write_timeout;
 
525
    waittime.tv_usec= 0;
 
526
 
 
527
    memset(&backtime, 0, sizeof(struct timeval));
 
528
    length= sizeof(struct timeval);
 
529
    error= getsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO,
 
530
                      &backtime, &length);
 
531
    if (error != 0)
 
532
    {
 
533
      perror("getsockopt");
 
534
      assert(error == 0);
 
535
    }
 
536
    error= setsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO,
 
537
                      &waittime, (socklen_t)sizeof(struct timeval));
 
538
    assert(error == 0);
 
539
  }
 
540
  pos= packet;
 
541
  end=pos+len;
 
542
  /* Loop until we have read everything */
 
543
  while (pos != end)
 
544
  {
 
545
    if ((long) (length= vio_write(net->vio,pos,(size_t) (end-pos))) <= 0)
 
546
    {
 
547
      const bool interrupted= vio_should_retry(net->vio);
 
548
      /*
 
549
        If we read 0, or we were interrupted this means that
 
550
        we need to switch to blocking mode and wait until the timeout
 
551
        on the socket kicks in.
 
552
      */
 
553
      if ((interrupted || length == 0))
 
554
      {
 
555
        bool old_mode;
 
556
 
 
557
        while (vio_blocking(net->vio, true, &old_mode) < 0)
 
558
        {
 
559
          if (vio_should_retry(net->vio) && retry_count++ < net->retry_count)
 
560
            continue;
 
561
          net->error= 2;                     /* Close socket */
 
562
          net->last_errno= CR_NET_PACKET_TOO_LARGE;
 
563
          goto end;
 
564
        }
 
565
        retry_count=0;
 
566
        continue;
 
567
      }
 
568
      else
 
569
      {
 
570
        if (retry_count++ < net->retry_count)
 
571
          continue;
 
572
      }
 
573
 
 
574
      if (vio_errno(net->vio) == SOCKET_EINTR)
 
575
      {
 
576
        continue;
 
577
      }
 
578
      net->error= 2;                /* Close socket */
 
579
      net->last_errno= (interrupted ? CR_NET_WRITE_INTERRUPTED :
 
580
                        CR_NET_ERROR_ON_WRITE);
 
581
      break;
 
582
    }
 
583
    pos+=length;
 
584
    update_statistics(thd_increment_bytes_sent(length));
 
585
  }
 
586
end:
 
587
  if ((net->compress) && (packet != NULL))
 
588
    free((char*) packet);
 
589
  net->reading_or_writing=0;
 
590
 
 
591
  if (net->write_timeout)
 
592
    error= setsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO,
 
593
                      &backtime, (socklen_t)sizeof(struct timeval));
 
594
 
 
595
  return(((int) (pos != end)));
 
596
}
 
597
 
 
598
 
 
599
/**
 
600
   Reads one packet to net->buff + net->where_b.
 
601
   Long packets are handled by my_net_read().
 
602
   This function reallocates the net->buff buffer if necessary.
 
603
 
 
604
   @return
 
605
   Returns length of packet.
 
606
*/
 
607
 
 
608
static uint32_t
 
609
my_real_read(NET *net, size_t *complen)
 
610
{
 
611
  uchar *pos;
 
612
  size_t length;
 
613
  uint i,retry_count=0;
 
614
  uint32_t len=packet_error;
 
615
  uint32_t remain= (net->compress ? NET_HEADER_SIZE+COMP_HEADER_SIZE :
 
616
                    NET_HEADER_SIZE);
 
617
  /* Backup of the original SO_RCVTIMEO timeout */
 
618
  struct timeval backtime;
 
619
  int error= 0;
 
620
 
 
621
  *complen = 0;
 
622
 
 
623
  net->reading_or_writing= 1;
 
624
  /* Read timeout is set in my_net_set_read_timeout */
 
625
 
 
626
  pos = net->buff + net->where_b;        /* net->packet -4 */
 
627
 
 
628
 
 
629
  /* Check for error, currently assert */
 
630
  if (net->read_timeout)
 
631
  {
 
632
    struct timeval waittime;
 
633
    socklen_t length;
 
634
 
 
635
    waittime.tv_sec= net->read_timeout;
 
636
    waittime.tv_usec= 0;
 
637
 
 
638
    memset(&backtime, 0, sizeof(struct timeval));
 
639
    length= sizeof(struct timeval);
 
640
    error= getsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO,
 
641
                      &backtime, &length);
 
642
    if (error != 0)
 
643
    {
 
644
      perror("getsockopt");
 
645
      assert(error == 0);
 
646
    }
 
647
    error= setsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO,
 
648
                      &waittime, (socklen_t)sizeof(struct timeval));
 
649
    assert(error == 0);
 
650
  }
 
651
 
 
652
  for (i= 0; i < 2 ; i++)
 
653
  {
 
654
    while (remain > 0)
 
655
    {
 
656
      /* First read is done with non blocking mode */
 
657
      if ((long) (length= vio_read(net->vio, pos, remain)) <= 0L)
 
658
      {
 
659
        const bool interrupted = vio_should_retry(net->vio);
 
660
 
 
661
        if (interrupted)
 
662
        {                    /* Probably in MIT threads */
 
663
          if (retry_count++ < net->retry_count)
 
664
            continue;
 
665
        }
 
666
        if (vio_errno(net->vio) == SOCKET_EINTR)
 
667
        {
 
668
          continue;
 
669
        }
 
670
        len= packet_error;
 
671
        net->error= 2;                /* Close socket */
 
672
        net->last_errno= (vio_was_interrupted(net->vio) ?
 
673
                          CR_NET_READ_INTERRUPTED :
 
674
                          CR_NET_READ_ERROR);
 
675
        ER(net->last_errno);
 
676
        goto end;
 
677
      }
 
678
      remain -= (uint32_t) length;
 
679
      pos+= length;
 
680
      update_statistics(thd_increment_bytes_received(length));
 
681
    }
 
682
    if (i == 0)
 
683
    {                    /* First parts is packet length */
 
684
      uint32_t helping;
 
685
 
 
686
      if (net->buff[net->where_b + 3] != (uchar) net->pkt_nr)
 
687
      {
 
688
        len= packet_error;
 
689
        /* Not a NET error on the client. XXX: why? */
 
690
        goto end;
 
691
      }
 
692
      net->compress_pkt_nr= ++net->pkt_nr;
 
693
      if (net->compress)
 
694
      {
 
695
        /*
 
696
          If the packet is compressed then complen > 0 and contains the
 
697
          number of bytes in the uncompressed packet
 
698
        */
 
699
        *complen=uint3korr(&(net->buff[net->where_b + NET_HEADER_SIZE]));
 
700
      }
 
701
 
 
702
      len=uint3korr(net->buff+net->where_b);
 
703
      if (!len)                /* End of big multi-packet */
 
704
        goto end;
 
705
      helping = max(len,*complen) + net->where_b;
 
706
      /* The necessary size of net->buff */
 
707
      if (helping >= net->max_packet)
 
708
      {
 
709
        if (net_realloc(net,helping))
 
710
        {
 
711
          len= packet_error;          /* Return error and close connection */
 
712
          goto end;
 
713
        }
 
714
      }
 
715
      pos=net->buff + net->where_b;
 
716
      remain = (uint32_t) len;
 
717
    }
 
718
  }
 
719
 
 
720
end:
 
721
  if  (net->read_timeout)
 
722
    error= setsockopt(net->vio->sd, SOL_SOCKET, SO_RCVTIMEO,
 
723
                      &backtime, (socklen_t)sizeof(struct timeval));
 
724
  assert(error == 0);
 
725
  net->reading_or_writing= 0;
 
726
 
 
727
  return(len);
 
728
}
 
729
 
 
730
 
 
731
/**
 
732
   Read a packet from the client/server and return it without the internal
 
733
   package header.
 
734
 
 
735
   If the packet is the first packet of a multi-packet packet
 
736
   (which is indicated by the length of the packet = 0xffffff) then
 
737
   all sub packets are read and concatenated.
 
738
 
 
739
   If the packet was compressed, its uncompressed and the length of the
 
740
   uncompressed packet is returned.
 
741
 
 
742
   @return
 
743
   The function returns the length of the found packet or packet_error.
 
744
   net->read_pos points to the read data.
 
745
*/
 
746
 
 
747
uint32_t
 
748
my_net_read(NET *net)
 
749
{
 
750
  size_t len, complen;
 
751
 
 
752
  if (!net->compress)
 
753
  {
 
754
    len = my_real_read(net,&complen);
 
755
    if (len == MAX_PACKET_LENGTH)
 
756
    {
 
757
      /* First packet of a multi-packet.  Concatenate the packets */
 
758
      uint32_t save_pos = net->where_b;
 
759
      size_t total_length= 0;
 
760
      do
 
761
      {
 
762
        net->where_b += len;
 
763
        total_length += len;
 
764
        len = my_real_read(net,&complen);
 
765
      } while (len == MAX_PACKET_LENGTH);
 
766
      if (len != packet_error)
 
767
        len+= total_length;
 
768
      net->where_b = save_pos;
 
769
    }
 
770
    net->read_pos = net->buff + net->where_b;
 
771
    if (len != packet_error)
 
772
      net->read_pos[len]=0;        /* Safeguard for drizzle_use_result */
 
773
    return len;
 
774
  }
 
775
  else
 
776
  {
 
777
    /* We are using the compressed protocol */
 
778
 
 
779
    uint32_t buf_length;
 
780
    uint32_t start_of_packet;
 
781
    uint32_t first_packet_offset;
 
782
    uint read_length, multi_byte_packet=0;
 
783
 
 
784
    if (net->remain_in_buf)
 
785
    {
 
786
      buf_length= net->buf_length;        /* Data left in old packet */
 
787
      first_packet_offset= start_of_packet= (net->buf_length -
 
788
                                             net->remain_in_buf);
 
789
      /* Restore the character that was overwritten by the end 0 */
 
790
      net->buff[start_of_packet]= net->save_char;
 
791
    }
 
792
    else
 
793
    {
 
794
      /* reuse buffer, as there is nothing in it that we need */
 
795
      buf_length= start_of_packet= first_packet_offset= 0;
 
796
    }
 
797
    for (;;)
 
798
    {
 
799
      uint32_t packet_len;
 
800
 
 
801
      if (buf_length - start_of_packet >= NET_HEADER_SIZE)
 
802
      {
 
803
        read_length = uint3korr(net->buff+start_of_packet);
 
804
        if (!read_length)
 
805
        {
 
806
          /* End of multi-byte packet */
 
807
          start_of_packet += NET_HEADER_SIZE;
 
808
          break;
 
809
        }
 
810
        if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet)
 
811
        {
 
812
          if (multi_byte_packet)
 
813
          {
 
814
            /* Remove packet header for second packet */
 
815
            memmove(net->buff + first_packet_offset + start_of_packet,
 
816
                    net->buff + first_packet_offset + start_of_packet +
 
817
                    NET_HEADER_SIZE,
 
818
                    buf_length - start_of_packet);
 
819
            start_of_packet += read_length;
 
820
            buf_length -= NET_HEADER_SIZE;
 
821
          }
 
822
          else
 
823
            start_of_packet+= read_length + NET_HEADER_SIZE;
 
824
 
 
825
          if (read_length != MAX_PACKET_LENGTH)    /* last package */
 
826
          {
 
827
            multi_byte_packet= 0;        /* No last zero len packet */
 
828
            break;
 
829
          }
 
830
          multi_byte_packet= NET_HEADER_SIZE;
 
831
          /* Move data down to read next data packet after current one */
 
832
          if (first_packet_offset)
 
833
          {
 
834
            memmove(net->buff,net->buff+first_packet_offset,
 
835
                    buf_length-first_packet_offset);
 
836
            buf_length-=first_packet_offset;
 
837
            start_of_packet -= first_packet_offset;
 
838
            first_packet_offset=0;
 
839
          }
 
840
          continue;
 
841
        }
 
842
      }
 
843
      /* Move data down to read next data packet after current one */
 
844
      if (first_packet_offset)
 
845
      {
 
846
        memmove(net->buff,net->buff+first_packet_offset,
 
847
                buf_length-first_packet_offset);
 
848
        buf_length-=first_packet_offset;
 
849
        start_of_packet -= first_packet_offset;
 
850
        first_packet_offset=0;
 
851
      }
 
852
 
 
853
      net->where_b=buf_length;
 
854
      if ((packet_len = my_real_read(net,&complen)) == packet_error)
 
855
        return packet_error;
 
856
 
 
857
      if (complen)
 
858
      {
 
859
        unsigned char * compbuf= (unsigned char *) malloc(complen);
 
860
        if (compbuf != NULL)
 
861
        {
 
862
          uLongf tmp_complen= complen;
 
863
          int error= uncompress((Bytef*) compbuf, &tmp_complen,
 
864
                                (Bytef*) (net->buff + net->where_b),
 
865
                                (uLong)packet_len);
 
866
          complen= tmp_complen;
 
867
 
 
868
          if (error != Z_OK)
 
869
          {
 
870
            net->error= 2;            /* caller will close socket */
 
871
            net->last_errno= CR_NET_UNCOMPRESS_ERROR;
 
872
          }
 
873
          else
 
874
          {
 
875
            memcpy((net->buff + net->where_b), compbuf, complen);
 
876
          }
 
877
          free(compbuf);
 
878
        }
 
879
      }
 
880
      else
 
881
        complen= packet_len;
 
882
 
 
883
    }
 
884
    buf_length+= complen;
 
885
 
 
886
    net->read_pos=      net->buff+ first_packet_offset + NET_HEADER_SIZE;
 
887
    net->buf_length=    buf_length;
 
888
    net->remain_in_buf= (uint32_t) (buf_length - start_of_packet);
 
889
    len = ((uint32_t) (start_of_packet - first_packet_offset) - NET_HEADER_SIZE -
 
890
           multi_byte_packet);
 
891
    net->save_char= net->read_pos[len];    /* Must be saved */
 
892
    net->read_pos[len]=0;        /* Safeguard for drizzle_use_result */
 
893
  }
 
894
  return len;
 
895
  }
 
896
 
 
897
 
 
898
void my_net_set_read_timeout(NET *net, uint timeout)
 
899
{
 
900
  net->read_timeout= timeout;
 
901
  if (net->vio)
 
902
    vio_timeout(net->vio, 0, timeout);
 
903
  return;
 
904
}
 
905
 
 
906
 
 
907
void my_net_set_write_timeout(NET *net, uint timeout)
 
908
{
 
909
  net->write_timeout= timeout;
 
910
  if (net->vio)
 
911
    vio_timeout(net->vio, 1, timeout);
 
912
  return;
 
913
}