~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSSocket.cc

  • Committer: Stewart Smith
  • Date: 2010-11-03 03:27:09 UTC
  • mto: (1902.1.1 build) (1910.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 1903.
  • Revision ID: stewart@flamingspork.com-20101103032709-oyvfrc6eb8fzj0mr
fix docs warning: docs/unlock.rst:2: (WARNING/2) Title underline too short.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
18
 *
 
19
 * Original author: Paul McCullagh (H&G2JCtL)
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-05-24
 
23
 *
 
24
 * CORE SYSTEM:
 
25
 * Basic socket I/O.
 
26
 *
 
27
 */
 
28
 
 
29
#include "CSConfig.h"
 
30
 
 
31
#include <stdio.h>
 
32
#include <sys/types.h>
 
33
 
 
34
#ifdef OS_WINDOWS
 
35
#include <winsock.h>
 
36
typedef int socklen_t;
 
37
#define SHUT_RDWR 2
 
38
#define CLOSE_SOCKET(s) closesocket(s)
 
39
#define IOCTL_SOCKET    ioctlsocket
 
40
#define SOCKET_ERRORNO  WSAGetLastError()
 
41
#define EWOULDBLOCK             WSAEWOULDBLOCK
 
42
 
 
43
#else
 
44
#include <sys/ioctl.h>
 
45
#include <sys/socket.h>
 
46
#include <netdb.h>
 
47
#include <netinet/in.h>
 
48
#include <arpa/inet.h>
 
49
#include <netinet/in.h>
 
50
#include <netinet/tcp.h>
 
51
#include <sys/select.h>
 
52
#include <fcntl.h>
 
53
 
 
54
extern void unix_close(int h);
 
55
#define CLOSE_SOCKET(s) unix_close(s)
 
56
#define IOCTL_SOCKET    ioctl
 
57
#define SOCKET_ERRORNO  errno
 
58
 
 
59
#endif
 
60
 
 
61
#include <ctype.h>
 
62
#include <string.h>
 
63
#include <stdlib.h>
 
64
 
 
65
#include "CSSocket.h"
 
66
#include "CSStream.h"
 
67
#include "CSGlobal.h"
 
68
#include "CSStrUtil.h"
 
69
#include "CSFile.h"
 
70
 
 
71
void CSSocket::initSockets()
 
72
{
 
73
#ifdef OS_WINDOWS
 
74
        int             err;
 
75
        WSADATA data;
 
76
        WORD    version = MAKEWORD (1,1);
 
77
        static int inited = 0;
 
78
 
 
79
        if (!inited) {
 
80
                err = WSAStartup(version, &data);
 
81
 
 
82
                if (err != 0) {
 
83
                        CSException::throwException(CS_CONTEXT, err, "WSAStartup error");
 
84
                }
 
85
                
 
86
                inited = 1;
 
87
        }
 
88
        
 
89
#endif
 
90
}
 
91
 
 
92
/*
 
93
 * ---------------------------------------------------------------
 
94
 * CORE SYSTEM SOCKET FACTORY
 
95
 */
 
96
 
 
97
CSSocket *CSSocket::newSocket()
 
98
{
 
99
        CSSocket *s;
 
100
        
 
101
        new_(s, CSSocket());
 
102
        return s;
 
103
}
 
104
 
 
105
/*
 
106
 * ---------------------------------------------------------------
 
107
 * INTERNAL UTILITIES
 
108
 */
 
109
 
 
110
void CSSocket::formatAddress(size_t size, char *buffer)
 
111
{
 
112
        if (iHost) {
 
113
                cs_strcpy(size, buffer, iHost);
 
114
                if (iService)
 
115
                        cs_strcat(size, buffer, ":");
 
116
        }
 
117
        else
 
118
                *buffer = 0;
 
119
        if (iService)
 
120
                cs_strcat(size, buffer, iService);
 
121
}
 
122
 
 
123
void CSSocket::throwError(const char *func, const char *file, int line, char *address, int err)
 
124
{
 
125
        if (err)
 
126
                CSException::throwFileError(func, file, line, address, err);
 
127
        else
 
128
                CSException::throwEOFError(func, file, line, address);
 
129
}
 
130
 
 
131
void CSSocket::throwError(const char *func, const char *file, int line, int err)
 
132
{
 
133
        char address[CS_SOCKET_ADDRESS_SIZE];
 
134
 
 
135
        formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
 
136
        throwError(func, file, line, address, err);
 
137
}
 
138
 
 
139
void CSSocket::setNoDelay()
 
140
{
 
141
        int flag = 1;
 
142
 
 
143
        if (setsockopt(iHandle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) == -1)
 
144
                CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
145
}
 
146
 
 
147
void CSSocket::setNonBlocking()
 
148
{
 
149
        if (iTimeout) {
 
150
                unsigned long block = 1;
 
151
 
 
152
                if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
 
153
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
 
154
        }
 
155
}
 
156
 
 
157
void CSSocket::setBlocking()
 
158
{
 
159
        /* No timeout, set blocking: */
 
160
        if (!iTimeout) {
 
161
                unsigned long block = 0;
 
162
 
 
163
                if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
 
164
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
 
165
        }
 
166
}
 
167
 
 
168
void CSSocket::openInternal()
 
169
{
 
170
        iHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
 
171
        if (iHandle == -1)
 
172
                CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
173
        setNoDelay();
 
174
        setNonBlocking();
 
175
}
 
176
 
 
177
void CSSocket::writeBlock(const void *data, size_t len)
 
178
{
 
179
        ssize_t out;
 
180
 
 
181
        enter_();
 
182
        while (len > 0) {
 
183
                out = send(iHandle, (const char *) data, len, 0);
 
184
                self->interrupted();
 
185
                if (out == -1) {
 
186
                        int err = SOCKET_ERRORNO;
 
187
 
 
188
                        if (err == EWOULDBLOCK || err == EINTR)
 
189
                                continue;
 
190
                        throwError(CS_CONTEXT, err);
 
191
                }
 
192
                if ((size_t) out > len)
 
193
                        break;
 
194
                len -= (size_t) out;
 
195
                data = ((char *) data) + (size_t) out;
 
196
        }
 
197
        exit_();
 
198
}
 
199
 
 
200
int CSSocket::timeoutRead(CSThread *self, void *buffer, size_t length)
 
201
{      
 
202
        int                     in;
 
203
        uint64_t        start_time;
 
204
        uint64_t        timeout = iTimeout * 1000;
 
205
        
 
206
        start_time = CSTime::getTimeCurrentTicks();
 
207
 
 
208
        retry:
 
209
        in = recv(iHandle, (char *) buffer, length, 0);
 
210
        if (in == -1) {
 
211
                if (SOCKET_ERRORNO == EWOULDBLOCK) {
 
212
                        fd_set                  readfds;
 
213
                        uint64_t                time_diff;
 
214
                        struct timeval  tv_timeout;
 
215
 
 
216
                        FD_ZERO(&readfds);
 
217
                        self->interrupted();
 
218
 
 
219
                        time_diff = CSTime::getTimeCurrentTicks() - start_time;
 
220
                        if (time_diff >= timeout) {
 
221
                                char address[CS_SOCKET_ADDRESS_SIZE];
 
222
 
 
223
                                formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
 
224
                                CSException::throwExceptionf(CS_CONTEXT, CS_ERR_RECEIVE_TIMEOUT, "Receive timeout: %lu ms, on: %s", iTimeout, address);
 
225
                        }
 
226
 
 
227
                        /* Calculate how much time we can wait: */
 
228
                        time_diff = timeout - time_diff;
 
229
                        tv_timeout.tv_sec = (long)time_diff / 1000000;
 
230
                        tv_timeout.tv_usec = (long)time_diff % 1000000;
 
231
 
 
232
                        FD_SET(iHandle, &readfds);
 
233
                        in = select(iHandle+1, &readfds, NULL, NULL, &tv_timeout);
 
234
                        if (in != -1)
 
235
                                goto retry;
 
236
                }
 
237
        }
 
238
        return in;
 
239
}
 
240
 
 
241
/*
 
242
 * ---------------------------------------------------------------
 
243
 * SOCKET BASED ON THE STANDARD C SOCKET
 
244
 */
 
245
 
 
246
void CSSocket::setTimeout(uint32_t milli_sec)
 
247
{
 
248
        if (iTimeout != milli_sec) {
 
249
                if ((iTimeout = milli_sec))
 
250
                        setNonBlocking();
 
251
                else
 
252
                        setBlocking();
 
253
        }
 
254
}
 
255
 
 
256
CSOutputStream *CSSocket::getOutputStream()
 
257
{
 
258
        return CSSocketOutputStream::newStream(RETAIN(this));
 
259
}
 
260
 
 
261
CSInputStream *CSSocket::getInputStream()
 
262
{
 
263
        return CSSocketInputStream::newStream(RETAIN(this));
 
264
}
 
265
 
 
266
void CSSocket::publish(char *service, int default_port)
 
267
{
 
268
        enter_();
 
269
        close();
 
270
        try_(a) {
 
271
                struct servent          *servp;
 
272
                struct sockaddr_in      server;
 
273
                struct servent          s;
 
274
                int                                     flag = 1;
 
275
 
 
276
                openInternal();
 
277
                if (service) {
 
278
                        if (isdigit(service[0])) {
 
279
                                int i =  atoi(service);
 
280
 
 
281
                                if (!i)
 
282
                                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, service);
 
283
                                servp = &s;
 
284
                                s.s_port = htons((uint16_t) i);
 
285
                                iService = cs_strdup(service);
 
286
                        }
 
287
                        else if ((servp = getservbyname(service, "tcp")) == NULL) {
 
288
                                if (!default_port)
 
289
                                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, service);
 
290
                                servp = &s;
 
291
                                s.s_port = htons((uint16_t) default_port);
 
292
                                iService = cs_strdup(default_port);
 
293
                        }
 
294
                        else
 
295
                                iService = cs_strdup(service);
 
296
                }
 
297
                else {
 
298
                        if (!default_port)
 
299
                                CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, "");
 
300
                        servp = &s;
 
301
                        s.s_port = htons((uint16_t) default_port);
 
302
                        iService = cs_strdup(default_port);
 
303
                }
 
304
                        
 
305
                iPort = ntohs(servp->s_port);
 
306
 
 
307
                memset(&server, 0, sizeof(server));
 
308
                server.sin_family = AF_INET;
 
309
                server.sin_addr.s_addr = INADDR_ANY;
 
310
                server.sin_port = (uint16_t) servp->s_port;
 
311
 
 
312
                if (setsockopt(iHandle, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof(int)) == -1)
 
313
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
314
 
 
315
                if (bind(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
 
316
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
317
 
 
318
                if (listen(iHandle, SOMAXCONN) == -1)
 
319
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
320
        }
 
321
        catch_(a) {
 
322
                close();
 
323
                throw_();
 
324
        }
 
325
        cont_(a);
 
326
        exit_();
 
327
}
 
328
 
 
329
void CSSocket::open(CSSocket *listener)
 
330
{
 
331
        enter_();
 
332
 
 
333
        close();
 
334
        try_(a) {
 
335
                int listener_handle;
 
336
                char address[CS_SOCKET_ADDRESS_SIZE];
 
337
                struct sockaddr_in      remote;
 
338
                socklen_t                       addrlen = sizeof(remote);
 
339
 
 
340
                /* First get all the information we need from the listener: */
 
341
                listener_handle = ((CSSocket *) listener)->iHandle;
 
342
                listener->formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
 
343
 
 
344
                /* I want to make sure no error occurs after the connect!
 
345
                 * So I allocate a buffer for the host name up front.
 
346
                 * This means it may be to small, but this is not a problem
 
347
                 * because the host name stored here is is only used for display
 
348
                 * of error message etc.
 
349
                 */
 
350
                iHost = (char *) cs_malloc(100);
 
351
                iHandle = accept(listener_handle, (struct sockaddr *) &remote, &addrlen);
 
352
                if (iHandle == -1)
 
353
                        throwError(CS_CONTEXT, address, SOCKET_ERRORNO);
 
354
 
 
355
                cs_strcpy(100, iHost, inet_ntoa(remote.sin_addr));
 
356
                iPort = ntohs(remote.sin_port);
 
357
 
 
358
                setNoDelay();
 
359
                setNonBlocking();
 
360
        }
 
361
        catch_(a) {
 
362
                close();
 
363
                throw_();
 
364
        }
 
365
        cont_(a);
 
366
        exit_();
 
367
}
 
368
 
 
369
void CSSocket::open(char *address, int default_port)
 
370
{
 
371
        enter_();
 
372
        close();
 
373
        try_(a) {
 
374
                char                            *portp = strchr(address, ':');
 
375
                struct servent          s;
 
376
                struct servent          *servp;
 
377
                struct hostent          *hostp;
 
378
                struct sockaddr_in      server;
 
379
 
 
380
                openInternal();
 
381
                if (!portp) {
 
382
                        iHost = cs_strdup(address);
 
383
                        if (!default_port)
 
384
                                CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
 
385
                        iService = cs_strdup(default_port);
 
386
                }
 
387
                else {
 
388
                        iHost = cs_strdup(address, (size_t) (portp - address));
 
389
                        iService = cs_strdup(portp+1);
 
390
                }
 
391
        
 
392
                if (isdigit(iService[0])) {
 
393
                        int i =  atoi(iService);
 
394
 
 
395
                        if (!i)
 
396
                                CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
 
397
                        servp = &s;
 
398
                        s.s_port = htons((uint16_t) i);
 
399
                }
 
400
                else if ((servp = getservbyname(iService, "tcp")) == NULL)
 
401
                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, iService);
 
402
                iPort = (int) ntohs(servp->s_port);
 
403
 
 
404
                if ((hostp = gethostbyname(iHost)) == 0)
 
405
                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_HOST, iHost);
 
406
 
 
407
                memset(&server, 0, sizeof(server));
 
408
                server.sin_family = AF_INET;
 
409
                memcpy(&server.sin_addr, hostp->h_addr, (size_t) hostp->h_length);
 
410
                server.sin_port = (uint16_t) servp->s_port;
 
411
                if (connect(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
 
412
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
 
413
        }
 
414
        catch_(a) {
 
415
                close();
 
416
                throw_();
 
417
        }
 
418
        cont_(a);
 
419
        exit_();
 
420
}
 
421
 
 
422
void CSSocket::close()
 
423
{
 
424
        flush();
 
425
        if (iHandle != -1) {
 
426
                shutdown(iHandle, SHUT_RDWR);
 
427
                /* shutdown does not close the socket!!? */
 
428
                CLOSE_SOCKET(iHandle);
 
429
                iHandle = -1;
 
430
        }
 
431
        if (iHost) {
 
432
                cs_free(iHost);
 
433
                iHost = NULL;
 
434
        }
 
435
        if (iService) {
 
436
                cs_free(iService);
 
437
                iService = NULL;
 
438
        }
 
439
        if (iIdentity) {
 
440
                cs_free(iIdentity);
 
441
                iIdentity = NULL;
 
442
        }
 
443
        iPort = 0;
 
444
}
 
445
 
 
446
size_t CSSocket::read(void *data, size_t len)
 
447
{
 
448
        ssize_t in;
 
449
 
 
450
        enter_();
 
451
        /* recv, by default will block until at lease one byte is
 
452
         * returned.
 
453
         * So a return of zero means EOF!
 
454
         */
 
455
        retry:
 
456
        if (iTimeout)
 
457
                in = timeoutRead(self, data, len);
 
458
        else
 
459
                in = recv(iHandle, (char *) data, len, 0);
 
460
        self->interrupted();
 
461
        if (in == -1) {
 
462
                /* Note, we actually ignore all errors on the socket.
 
463
                 * If no data was returned by the read so far, then
 
464
                 * the error will be considered EOF.
 
465
                 */
 
466
                int err = SOCKET_ERRORNO;
 
467
 
 
468
                if (err == EWOULDBLOCK || err == EINTR)
 
469
                        goto retry;
 
470
                throwError(CS_CONTEXT, err);
 
471
                in = 0;
 
472
        }
 
473
        return_((size_t) in);
 
474
}
 
475
 
 
476
int CSSocket::read()
 
477
{
 
478
        int             ch;
 
479
        u_char  buffer[1];
 
480
 
 
481
        enter_();
 
482
        if (read(buffer, 1) == 1)
 
483
                ch = buffer[0];
 
484
        else
 
485
                ch = -1;
 
486
        return_(ch);
 
487
}
 
488
 
 
489
int CSSocket::peek()
 
490
{
 
491
        return -1;
 
492
}
 
493
 
 
494
void CSSocket::write(const void *data, size_t len)
 
495
{
 
496
#ifdef CS_USE_OUTPUT_BUFFER
 
497
        if (len <= CS_MIN_WRITE_SIZE) {
 
498
                if (iDataLen + len > CS_OUTPUT_BUFFER_SIZE) {
 
499
                        /* This is the amount of data that will still fit
 
500
                         * intp the buffer:
 
501
                         */
 
502
                        size_t tfer = CS_OUTPUT_BUFFER_SIZE - iDataLen;
 
503
 
 
504
                        memcpy(iOutputBuffer + iDataLen, data, tfer);
 
505
                        flush();
 
506
                        len -= tfer;
 
507
                        memcpy(iOutputBuffer, ((char *) data) + tfer, len);
 
508
                        iDataLen = len;
 
509
                }
 
510
                else {
 
511
                        memcpy(iOutputBuffer + iDataLen, data, len);
 
512
                        iDataLen += len;
 
513
                }
 
514
        }
 
515
        else {
 
516
                /* If the block give is large enough, the
 
517
                 * writing directly from the block saves copying the
 
518
                 * data to the local output buffer buffer:
 
519
                 */
 
520
                flush();
 
521
                writeBlock(data, len);
 
522
        }
 
523
#else
 
524
        writeBlock(data, len);
 
525
#endif
 
526
}
 
527
 
 
528
void CSSocket::write(char ch)
 
529
{
 
530
        enter_();
 
531
        writeBlock(&ch, 1);
 
532
        exit_();
 
533
}
 
534
 
 
535
void CSSocket::flush()
 
536
{
 
537
#ifdef CS_USE_OUTPUT_BUFFER
 
538
        uint32_t len;
 
539
 
 
540
        if ((len = iDataLen)) {
 
541
                iDataLen = 0;
 
542
                /* Note: we discard the data to be written if an
 
543
                 * exception occurs.
 
544
                 */
 
545
                writeBlock(iOutputBuffer, len);
 
546
        }
 
547
#endif
 
548
}
 
549
 
 
550
const char *CSSocket::identify()
 
551
{
 
552
        enter_();
 
553
        if (!iIdentity) {
 
554
                char buffer[200];
 
555
 
 
556
                formatAddress(200, buffer);
 
557
                iIdentity = cs_strdup(buffer);
 
558
        }
 
559
        return_(iIdentity);
 
560
}
 
561
 
 
562
 
 
563