~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSSocket.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh (H&G2JCtL)
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-05-24
23
 
 *
24
 
 * CORE SYSTEM:
25
 
 * Basic socket I/O.
26
 
 *
27
 
 */
28
 
 
29
 
#include "CSConfig.h"
30
 
 
31
 
#include <stdio.h>
32
 
#include <sys/types.h>
33
 
 
34
 
#ifdef OS_WINDOWS
35
 
#include <winsock.h>
36
 
typedef int socklen_t;
37
 
#define SHUT_RDWR 2
38
 
#define CLOSE_SOCKET(s) closesocket(s)
39
 
#define IOCTL_SOCKET    ioctlsocket
40
 
#define SOCKET_ERRORNO  WSAGetLastError()
41
 
#define EWOULDBLOCK             WSAEWOULDBLOCK
42
 
 
43
 
#else
44
 
#include <sys/ioctl.h>
45
 
#include <sys/socket.h>
46
 
#include <netdb.h>
47
 
#include <netinet/in.h>
48
 
#include <arpa/inet.h>
49
 
#include <netinet/in.h>
50
 
#include <netinet/tcp.h>
51
 
#include <sys/select.h>
52
 
#include <fcntl.h>
53
 
 
54
 
extern void unix_close(int h);
55
 
#define CLOSE_SOCKET(s) unix_close(s)
56
 
#define IOCTL_SOCKET    ioctl
57
 
#define SOCKET_ERRORNO  errno
58
 
 
59
 
#endif
60
 
 
61
 
#include <ctype.h>
62
 
#include <string.h>
63
 
#include <stdlib.h>
64
 
 
65
 
#include "CSSocket.h"
66
 
#include "CSStream.h"
67
 
#include "CSGlobal.h"
68
 
#include "CSStrUtil.h"
69
 
#include "CSFile.h"
70
 
 
71
 
void CSSocket::initSockets()
72
 
{
73
 
#ifdef OS_WINDOWS
74
 
        int             err;
75
 
        WSADATA data;
76
 
        WORD    version = MAKEWORD (1,1);
77
 
        static int inited = 0;
78
 
 
79
 
        if (!inited) {
80
 
                err = WSAStartup(version, &data);
81
 
 
82
 
                if (err != 0) {
83
 
                        CSException::throwException(CS_CONTEXT, err, "WSAStartup error");
84
 
                }
85
 
                
86
 
                inited = 1;
87
 
        }
88
 
        
89
 
#endif
90
 
}
91
 
 
92
 
/*
93
 
 * ---------------------------------------------------------------
94
 
 * CORE SYSTEM SOCKET FACTORY
95
 
 */
96
 
 
97
 
CSSocket *CSSocket::newSocket()
98
 
{
99
 
        CSSocket *s;
100
 
        
101
 
        new_(s, CSSocket());
102
 
        return s;
103
 
}
104
 
 
105
 
/*
106
 
 * ---------------------------------------------------------------
107
 
 * INTERNAL UTILITIES
108
 
 */
109
 
 
110
 
void CSSocket::formatAddress(size_t size, char *buffer)
111
 
{
112
 
        if (iHost) {
113
 
                cs_strcpy(size, buffer, iHost);
114
 
                if (iService)
115
 
                        cs_strcat(size, buffer, ":");
116
 
        }
117
 
        else
118
 
                *buffer = 0;
119
 
        if (iService)
120
 
                cs_strcat(size, buffer, iService);
121
 
}
122
 
 
123
 
void CSSocket::throwError(const char *func, const char *file, int line, char *address, int err)
124
 
{
125
 
        if (err)
126
 
                CSException::throwFileError(func, file, line, address, err);
127
 
        else
128
 
                CSException::throwEOFError(func, file, line, address);
129
 
}
130
 
 
131
 
void CSSocket::throwError(const char *func, const char *file, int line, int err)
132
 
{
133
 
        char address[CS_SOCKET_ADDRESS_SIZE];
134
 
 
135
 
        formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
136
 
        throwError(func, file, line, address, err);
137
 
}
138
 
 
139
 
void CSSocket::setNoDelay()
140
 
{
141
 
        int flag = 1;
142
 
 
143
 
        if (setsockopt(iHandle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) == -1)
144
 
                CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
145
 
}
146
 
 
147
 
void CSSocket::setNonBlocking()
148
 
{
149
 
        if (iTimeout) {
150
 
                unsigned long block = 1;
151
 
 
152
 
                if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
153
 
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
154
 
        }
155
 
}
156
 
 
157
 
void CSSocket::setBlocking()
158
 
{
159
 
        /* No timeout, set blocking: */
160
 
        if (!iTimeout) {
161
 
                unsigned long block = 0;
162
 
 
163
 
                if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
164
 
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
165
 
        }
166
 
}
167
 
 
168
 
void CSSocket::openInternal()
169
 
{
170
 
        iHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
171
 
        if (iHandle == -1)
172
 
                CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
173
 
        setNoDelay();
174
 
        setNonBlocking();
175
 
}
176
 
 
177
 
void CSSocket::writeBlock(const void *data, size_t len)
178
 
{
179
 
        ssize_t out;
180
 
 
181
 
        enter_();
182
 
        while (len > 0) {
183
 
                out = send(iHandle, (const char *) data, len, 0);
184
 
                self->interrupted();
185
 
                if (out == -1) {
186
 
                        int err = SOCKET_ERRORNO;
187
 
 
188
 
                        if (err == EWOULDBLOCK || err == EINTR)
189
 
                                continue;
190
 
                        throwError(CS_CONTEXT, err);
191
 
                }
192
 
                if ((size_t) out > len)
193
 
                        break;
194
 
                len -= (size_t) out;
195
 
                data = ((char *) data) + (size_t) out;
196
 
        }
197
 
        exit_();
198
 
}
199
 
 
200
 
int CSSocket::timeoutRead(CSThread *self, void *buffer, size_t length)
201
 
{      
202
 
        int                     in;
203
 
        uint64_t        start_time;
204
 
        uint64_t        timeout = iTimeout * 1000;
205
 
        
206
 
        start_time = CSTime::getTimeCurrentTicks();
207
 
 
208
 
        retry:
209
 
        in = recv(iHandle, (char *) buffer, length, 0);
210
 
        if (in == -1) {
211
 
                if (SOCKET_ERRORNO == EWOULDBLOCK) {
212
 
                        fd_set                  readfds;
213
 
                        uint64_t                time_diff;
214
 
                        struct timeval  tv_timeout;
215
 
 
216
 
                        FD_ZERO(&readfds);
217
 
                        self->interrupted();
218
 
 
219
 
                        time_diff = CSTime::getTimeCurrentTicks() - start_time;
220
 
                        if (time_diff >= timeout) {
221
 
                                char address[CS_SOCKET_ADDRESS_SIZE];
222
 
 
223
 
                                formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
224
 
                                CSException::throwExceptionf(CS_CONTEXT, CS_ERR_RECEIVE_TIMEOUT, "Receive timeout: %lu ms, on: %s", iTimeout, address);
225
 
                        }
226
 
 
227
 
                        /* Calculate how much time we can wait: */
228
 
                        time_diff = timeout - time_diff;
229
 
                        tv_timeout.tv_sec = (long)time_diff / 1000000;
230
 
                        tv_timeout.tv_usec = (long)time_diff % 1000000;
231
 
 
232
 
                        FD_SET(iHandle, &readfds);
233
 
                        in = select(iHandle+1, &readfds, NULL, NULL, &tv_timeout);
234
 
                        if (in != -1)
235
 
                                goto retry;
236
 
                }
237
 
        }
238
 
        return in;
239
 
}
240
 
 
241
 
/*
242
 
 * ---------------------------------------------------------------
243
 
 * SOCKET BASED ON THE STANDARD C SOCKET
244
 
 */
245
 
 
246
 
void CSSocket::setTimeout(uint32_t milli_sec)
247
 
{
248
 
        if (iTimeout != milli_sec) {
249
 
                if ((iTimeout = milli_sec))
250
 
                        setNonBlocking();
251
 
                else
252
 
                        setBlocking();
253
 
        }
254
 
}
255
 
 
256
 
CSOutputStream *CSSocket::getOutputStream()
257
 
{
258
 
        return CSSocketOutputStream::newStream(RETAIN(this));
259
 
}
260
 
 
261
 
CSInputStream *CSSocket::getInputStream()
262
 
{
263
 
        return CSSocketInputStream::newStream(RETAIN(this));
264
 
}
265
 
 
266
 
void CSSocket::publish(char *service, int default_port)
267
 
{
268
 
        enter_();
269
 
        close();
270
 
        try_(a) {
271
 
                struct servent          *servp;
272
 
                struct sockaddr_in      server;
273
 
                struct servent          s;
274
 
                int                                     flag = 1;
275
 
 
276
 
                openInternal();
277
 
                if (service) {
278
 
                        if (isdigit(service[0])) {
279
 
                                int i =  atoi(service);
280
 
 
281
 
                                if (!i)
282
 
                                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, service);
283
 
                                servp = &s;
284
 
                                s.s_port = htons((uint16_t) i);
285
 
                                iService = cs_strdup(service);
286
 
                        }
287
 
                        else if ((servp = getservbyname(service, "tcp")) == NULL) {
288
 
                                if (!default_port)
289
 
                                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, service);
290
 
                                servp = &s;
291
 
                                s.s_port = htons((uint16_t) default_port);
292
 
                                iService = cs_strdup(default_port);
293
 
                        }
294
 
                        else
295
 
                                iService = cs_strdup(service);
296
 
                }
297
 
                else {
298
 
                        if (!default_port)
299
 
                                CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, "");
300
 
                        servp = &s;
301
 
                        s.s_port = htons((uint16_t) default_port);
302
 
                        iService = cs_strdup(default_port);
303
 
                }
304
 
                        
305
 
                iPort = ntohs(servp->s_port);
306
 
 
307
 
                memset(&server, 0, sizeof(server));
308
 
                server.sin_family = AF_INET;
309
 
                server.sin_addr.s_addr = INADDR_ANY;
310
 
                server.sin_port = (uint16_t) servp->s_port;
311
 
 
312
 
                if (setsockopt(iHandle, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof(int)) == -1)
313
 
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
314
 
 
315
 
                if (bind(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
316
 
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
317
 
 
318
 
                if (listen(iHandle, SOMAXCONN) == -1)
319
 
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
320
 
        }
321
 
        catch_(a) {
322
 
                close();
323
 
                throw_();
324
 
        }
325
 
        cont_(a);
326
 
        exit_();
327
 
}
328
 
 
329
 
void CSSocket::open(CSSocket *listener)
330
 
{
331
 
        enter_();
332
 
 
333
 
        close();
334
 
        try_(a) {
335
 
                int listener_handle;
336
 
                char address[CS_SOCKET_ADDRESS_SIZE];
337
 
                struct sockaddr_in      remote;
338
 
                socklen_t                       addrlen = sizeof(remote);
339
 
 
340
 
                /* First get all the information we need from the listener: */
341
 
                listener_handle = ((CSSocket *) listener)->iHandle;
342
 
                listener->formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
343
 
 
344
 
                /* I want to make sure no error occurs after the connect!
345
 
                 * So I allocate a buffer for the host name up front.
346
 
                 * This means it may be to small, but this is not a problem
347
 
                 * because the host name stored here is is only used for display
348
 
                 * of error message etc.
349
 
                 */
350
 
                iHost = (char *) cs_malloc(100);
351
 
                iHandle = accept(listener_handle, (struct sockaddr *) &remote, &addrlen);
352
 
                if (iHandle == -1)
353
 
                        throwError(CS_CONTEXT, address, SOCKET_ERRORNO);
354
 
 
355
 
                cs_strcpy(100, iHost, inet_ntoa(remote.sin_addr));
356
 
                iPort = ntohs(remote.sin_port);
357
 
 
358
 
                setNoDelay();
359
 
                setNonBlocking();
360
 
        }
361
 
        catch_(a) {
362
 
                close();
363
 
                throw_();
364
 
        }
365
 
        cont_(a);
366
 
        exit_();
367
 
}
368
 
 
369
 
void CSSocket::open(char *address, int default_port)
370
 
{
371
 
        enter_();
372
 
        close();
373
 
        try_(a) {
374
 
                char                            *portp = strchr(address, ':');
375
 
                struct servent          s;
376
 
                struct servent          *servp;
377
 
                struct hostent          *hostp;
378
 
                struct sockaddr_in      server;
379
 
 
380
 
                openInternal();
381
 
                if (!portp) {
382
 
                        iHost = cs_strdup(address);
383
 
                        if (!default_port)
384
 
                                CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
385
 
                        iService = cs_strdup(default_port);
386
 
                }
387
 
                else {
388
 
                        iHost = cs_strdup(address, (size_t) (portp - address));
389
 
                        iService = cs_strdup(portp+1);
390
 
                }
391
 
        
392
 
                if (isdigit(iService[0])) {
393
 
                        int i =  atoi(iService);
394
 
 
395
 
                        if (!i)
396
 
                                CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
397
 
                        servp = &s;
398
 
                        s.s_port = htons((uint16_t) i);
399
 
                }
400
 
                else if ((servp = getservbyname(iService, "tcp")) == NULL)
401
 
                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, iService);
402
 
                iPort = (int) ntohs(servp->s_port);
403
 
 
404
 
                if ((hostp = gethostbyname(iHost)) == 0)
405
 
                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_HOST, iHost);
406
 
 
407
 
                memset(&server, 0, sizeof(server));
408
 
                server.sin_family = AF_INET;
409
 
                memcpy(&server.sin_addr, hostp->h_addr, (size_t) hostp->h_length);
410
 
                server.sin_port = (uint16_t) servp->s_port;
411
 
                if (connect(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
412
 
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
413
 
        }
414
 
        catch_(a) {
415
 
                close();
416
 
                throw_();
417
 
        }
418
 
        cont_(a);
419
 
        exit_();
420
 
}
421
 
 
422
 
void CSSocket::close()
423
 
{
424
 
        flush();
425
 
        if (iHandle != -1) {
426
 
                shutdown(iHandle, SHUT_RDWR);
427
 
                /* shutdown does not close the socket!!? */
428
 
                CLOSE_SOCKET(iHandle);
429
 
                iHandle = -1;
430
 
        }
431
 
        if (iHost) {
432
 
                cs_free(iHost);
433
 
                iHost = NULL;
434
 
        }
435
 
        if (iService) {
436
 
                cs_free(iService);
437
 
                iService = NULL;
438
 
        }
439
 
        if (iIdentity) {
440
 
                cs_free(iIdentity);
441
 
                iIdentity = NULL;
442
 
        }
443
 
        iPort = 0;
444
 
}
445
 
 
446
 
size_t CSSocket::read(void *data, size_t len)
447
 
{
448
 
        ssize_t in;
449
 
 
450
 
        enter_();
451
 
        /* recv, by default will block until at lease one byte is
452
 
         * returned.
453
 
         * So a return of zero means EOF!
454
 
         */
455
 
        retry:
456
 
        if (iTimeout)
457
 
                in = timeoutRead(self, data, len);
458
 
        else
459
 
                in = recv(iHandle, (char *) data, len, 0);
460
 
        self->interrupted();
461
 
        if (in == -1) {
462
 
                /* Note, we actually ignore all errors on the socket.
463
 
                 * If no data was returned by the read so far, then
464
 
                 * the error will be considered EOF.
465
 
                 */
466
 
                int err = SOCKET_ERRORNO;
467
 
 
468
 
                if (err == EWOULDBLOCK || err == EINTR)
469
 
                        goto retry;
470
 
                throwError(CS_CONTEXT, err);
471
 
                in = 0;
472
 
        }
473
 
        return_((size_t) in);
474
 
}
475
 
 
476
 
int CSSocket::read()
477
 
{
478
 
        int             ch;
479
 
        u_char  buffer[1];
480
 
 
481
 
        enter_();
482
 
        if (read(buffer, 1) == 1)
483
 
                ch = buffer[0];
484
 
        else
485
 
                ch = -1;
486
 
        return_(ch);
487
 
}
488
 
 
489
 
int CSSocket::peek()
490
 
{
491
 
        return -1;
492
 
}
493
 
 
494
 
void CSSocket::write(const void *data, size_t len)
495
 
{
496
 
#ifdef CS_USE_OUTPUT_BUFFER
497
 
        if (len <= CS_MIN_WRITE_SIZE) {
498
 
                if (iDataLen + len > CS_OUTPUT_BUFFER_SIZE) {
499
 
                        /* This is the amount of data that will still fit
500
 
                         * intp the buffer:
501
 
                         */
502
 
                        size_t tfer = CS_OUTPUT_BUFFER_SIZE - iDataLen;
503
 
 
504
 
                        memcpy(iOutputBuffer + iDataLen, data, tfer);
505
 
                        flush();
506
 
                        len -= tfer;
507
 
                        memcpy(iOutputBuffer, ((char *) data) + tfer, len);
508
 
                        iDataLen = len;
509
 
                }
510
 
                else {
511
 
                        memcpy(iOutputBuffer + iDataLen, data, len);
512
 
                        iDataLen += len;
513
 
                }
514
 
        }
515
 
        else {
516
 
                /* If the block give is large enough, the
517
 
                 * writing directly from the block saves copying the
518
 
                 * data to the local output buffer buffer:
519
 
                 */
520
 
                flush();
521
 
                writeBlock(data, len);
522
 
        }
523
 
#else
524
 
        writeBlock(data, len);
525
 
#endif
526
 
}
527
 
 
528
 
void CSSocket::write(char ch)
529
 
{
530
 
        enter_();
531
 
        writeBlock(&ch, 1);
532
 
        exit_();
533
 
}
534
 
 
535
 
void CSSocket::flush()
536
 
{
537
 
#ifdef CS_USE_OUTPUT_BUFFER
538
 
        uint32_t len;
539
 
 
540
 
        if ((len = iDataLen)) {
541
 
                iDataLen = 0;
542
 
                /* Note: we discard the data to be written if an
543
 
                 * exception occurs.
544
 
                 */
545
 
                writeBlock(iOutputBuffer, len);
546
 
        }
547
 
#endif
548
 
}
549
 
 
550
 
const char *CSSocket::identify()
551
 
{
552
 
        enter_();
553
 
        if (!iIdentity) {
554
 
                char buffer[200];
555
 
 
556
 
                formatAddress(200, buffer);
557
 
                iIdentity = cs_strdup(buffer);
558
 
        }
559
 
        return_(iIdentity);
560
 
}
561
 
 
562
 
 
563