~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSSocket.cc

Merge Joe, plus I updated the tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
2
2
 *
3
3
 * PrimeBase Media Stream for MySQL
4
4
 *
14
14
 *
15
15
 * You should have received a copy of the GNU General Public License
16
16
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
18
 *
19
19
 * Original author: Paul McCullagh (H&G2JCtL)
20
20
 * Continued development: Barry Leslie
30
30
 
31
31
#include <stdio.h>
32
32
#include <sys/types.h>
33
 
 
34
 
#ifdef OS_WINDOWS
35
 
#include <winsock.h>
36
 
typedef int socklen_t;
37
 
#define SHUT_RDWR 2
38
 
#define CLOSE_SOCKET(s) closesocket(s)
39
 
#define IOCTL_SOCKET    ioctlsocket
40
 
#define SOCKET_ERRORNO  WSAGetLastError()
41
 
#define EWOULDBLOCK             WSAEWOULDBLOCK
42
 
 
43
 
#else
44
 
#include <sys/ioctl.h>
45
33
#include <sys/socket.h>
46
34
#include <netdb.h>
47
35
#include <netinet/in.h>
48
36
#include <arpa/inet.h>
49
37
#include <netinet/in.h>
50
38
#include <netinet/tcp.h>
51
 
#include <sys/select.h>
52
 
#include <fcntl.h>
53
 
 
54
 
extern void unix_close(int h);
55
 
#define CLOSE_SOCKET(s) unix_close(s)
56
 
#define IOCTL_SOCKET    ioctl
57
 
#define SOCKET_ERRORNO  errno
58
 
 
59
 
#endif
60
 
 
61
39
#include <ctype.h>
62
40
#include <string.h>
63
41
#include <stdlib.h>
68
46
#include "CSStrUtil.h"
69
47
#include "CSFile.h"
70
48
 
71
 
void CSSocket::initSockets()
72
 
{
73
 
#ifdef OS_WINDOWS
74
 
        int             err;
75
 
        WSADATA data;
76
 
        WORD    version = MAKEWORD (1,1);
77
 
        static int inited = 0;
78
 
 
79
 
        if (!inited) {
80
 
                err = WSAStartup(version, &data);
81
 
 
82
 
                if (err != 0) {
83
 
                        CSException::throwException(CS_CONTEXT, err, "WSAStartup error");
84
 
                }
85
 
                
86
 
                inited = 1;
87
 
        }
88
 
        
89
 
#endif
90
 
}
91
 
 
92
49
/*
93
50
 * ---------------------------------------------------------------
94
51
 * CORE SYSTEM SOCKET FACTORY
96
53
 
97
54
CSSocket *CSSocket::newSocket()
98
55
{
99
 
        CSSocket *s;
 
56
        SCSocket *s;
100
57
        
101
 
        new_(s, CSSocket());
102
 
        return s;
 
58
        new_(s, SCSocket());
 
59
        return (CSSocket *) s;
103
60
}
104
61
 
105
62
/*
107
64
 * INTERNAL UTILITIES
108
65
 */
109
66
 
110
 
void CSSocket::formatAddress(size_t size, char *buffer)
 
67
void SCSocket::formatAddress(size_t size, char *buffer)
111
68
{
112
69
        if (iHost) {
113
70
                cs_strcpy(size, buffer, iHost);
120
77
                cs_strcat(size, buffer, iService);
121
78
}
122
79
 
123
 
void CSSocket::throwError(const char *func, const char *file, int line, char *address, int err)
 
80
void SCSocket::throwError(const char *func, const char *file, int line, char *address, int err)
124
81
{
125
82
        if (err)
126
83
                CSException::throwFileError(func, file, line, address, err);
128
85
                CSException::throwEOFError(func, file, line, address);
129
86
}
130
87
 
131
 
void CSSocket::throwError(const char *func, const char *file, int line, int err)
 
88
void SCSocket::throwError(const char *func, const char *file, int line, int err)
132
89
{
133
90
        char address[CS_SOCKET_ADDRESS_SIZE];
134
91
 
136
93
        throwError(func, file, line, address, err);
137
94
}
138
95
 
139
 
void CSSocket::setNoDelay()
 
96
void SCSocket::setInternalOptions()
140
97
{
141
98
        int flag = 1;
142
99
 
143
100
        if (setsockopt(iHandle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) == -1)
144
 
                CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
145
 
}
146
 
 
147
 
void CSSocket::setNonBlocking()
148
 
{
149
 
        if (iTimeout) {
150
 
                unsigned long block = 1;
151
 
 
152
 
                if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
153
 
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
154
 
        }
155
 
}
156
 
 
157
 
void CSSocket::setBlocking()
158
 
{
159
 
        /* No timeout, set blocking: */
160
 
        if (!iTimeout) {
161
 
                unsigned long block = 0;
162
 
 
163
 
                if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
164
 
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
165
 
        }
166
 
}
167
 
 
168
 
void CSSocket::openInternal()
169
 
{
170
 
        iHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
 
101
                CSException::throwOSError(CS_CONTEXT, errno);
 
102
}
 
103
 
 
104
void SCSocket::openInternal()
 
105
{
 
106
        iHandle = socket(AF_INET, SOCK_STREAM, 0);
171
107
        if (iHandle == -1)
172
 
                CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
173
 
        setNoDelay();
174
 
        setNonBlocking();
175
 
}
176
 
 
177
 
void CSSocket::writeBlock(const void *data, size_t len)
178
 
{
179
 
        ssize_t out;
180
 
 
181
 
        enter_();
182
 
        while (len > 0) {
183
 
                out = send(iHandle, (const char *) data, len, 0);
184
 
                self->interrupted();
185
 
                if (out == -1) {
186
 
                        int err = SOCKET_ERRORNO;
187
 
 
188
 
                        if (err == EWOULDBLOCK || err == EINTR)
189
 
                                continue;
190
 
                        throwError(CS_CONTEXT, err);
191
 
                }
192
 
                if ((size_t) out > len)
193
 
                        break;
194
 
                len -= (size_t) out;
195
 
                data = ((char *) data) + (size_t) out;
196
 
        }
197
 
        exit_();
198
 
}
199
 
 
200
 
int CSSocket::timeoutRead(CSThread *self, void *buffer, size_t length)
201
 
{      
202
 
        int                     in;
203
 
        uint64_t        start_time;
204
 
        uint64_t        timeout = iTimeout * 1000;
205
 
        
206
 
        start_time = CSTime::getTimeCurrentTicks();
207
 
 
208
 
        retry:
209
 
        in = recv(iHandle, (char *) buffer, length, 0);
210
 
        if (in == -1) {
211
 
                if (SOCKET_ERRORNO == EWOULDBLOCK) {
212
 
                        fd_set                  readfds;
213
 
                        uint64_t                time_diff;
214
 
                        struct timeval  tv_timeout;
215
 
 
216
 
                        FD_ZERO(&readfds);
217
 
                        self->interrupted();
218
 
 
219
 
                        time_diff = CSTime::getTimeCurrentTicks() - start_time;
220
 
                        if (time_diff >= timeout) {
221
 
                                char address[CS_SOCKET_ADDRESS_SIZE];
222
 
 
223
 
                                formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
224
 
                                CSException::throwExceptionf(CS_CONTEXT, CS_ERR_RECEIVE_TIMEOUT, "Receive timeout: %lu ms, on: %s", iTimeout, address);
225
 
                        }
226
 
 
227
 
                        /* Calculate how much time we can wait: */
228
 
                        time_diff = timeout - time_diff;
229
 
                        tv_timeout.tv_sec = (long)time_diff / 1000000;
230
 
                        tv_timeout.tv_usec = (long)time_diff % 1000000;
231
 
 
232
 
                        FD_SET(iHandle, &readfds);
233
 
                        in = select(iHandle+1, &readfds, NULL, NULL, &tv_timeout);
234
 
                        if (in != -1)
235
 
                                goto retry;
236
 
                }
237
 
        }
238
 
        return in;
 
108
                CSException::throwOSError(CS_CONTEXT, errno);
 
109
        setInternalOptions();
239
110
}
240
111
 
241
112
/*
243
114
 * SOCKET BASED ON THE STANDARD C SOCKET
244
115
 */
245
116
 
246
 
void CSSocket::setTimeout(uint32_t milli_sec)
247
 
{
248
 
        if (iTimeout != milli_sec) {
249
 
                if ((iTimeout = milli_sec))
250
 
                        setNonBlocking();
251
 
                else
252
 
                        setBlocking();
253
 
        }
254
 
}
255
 
 
256
117
CSOutputStream *CSSocket::getOutputStream()
257
118
{
258
119
        return CSSocketOutputStream::newStream(RETAIN(this));
263
124
        return CSSocketInputStream::newStream(RETAIN(this));
264
125
}
265
126
 
266
 
void CSSocket::publish(char *service, int default_port)
 
127
void SCSocket::publish(char *service, int default_port)
267
128
{
268
129
        enter_();
269
130
        close();
310
171
                server.sin_port = (uint16_t) servp->s_port;
311
172
 
312
173
                if (setsockopt(iHandle, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof(int)) == -1)
313
 
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
174
                        CSException::throwOSError(CS_CONTEXT, errno);
314
175
 
315
176
                if (bind(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
316
 
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
177
                        CSException::throwOSError(CS_CONTEXT, errno);
317
178
 
318
179
                if (listen(iHandle, SOMAXCONN) == -1)
319
 
                        CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
 
180
                        CSException::throwOSError(CS_CONTEXT, errno);
320
181
        }
321
182
        catch_(a) {
322
183
                close();
326
187
        exit_();
327
188
}
328
189
 
329
 
void CSSocket::open(CSSocket *listener)
 
190
void SCSocket::open(CSSocket *listener)
330
191
{
331
192
        enter_();
332
193
 
338
199
                socklen_t                       addrlen = sizeof(remote);
339
200
 
340
201
                /* First get all the information we need from the listener: */
341
 
                listener_handle = ((CSSocket *) listener)->iHandle;
 
202
                listener_handle = ((SCSocket *) listener)->iHandle;
342
203
                listener->formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
343
204
 
344
205
                /* I want to make sure no error occurs after the connect!
350
211
                iHost = (char *) cs_malloc(100);
351
212
                iHandle = accept(listener_handle, (struct sockaddr *) &remote, &addrlen);
352
213
                if (iHandle == -1)
353
 
                        throwError(CS_CONTEXT, address, SOCKET_ERRORNO);
 
214
                        throwError(CS_CONTEXT, address, errno);
354
215
 
355
216
                cs_strcpy(100, iHost, inet_ntoa(remote.sin_addr));
356
217
                iPort = ntohs(remote.sin_port);
357
218
 
358
 
                setNoDelay();
359
 
                setNonBlocking();
 
219
                setInternalOptions();
360
220
        }
361
221
        catch_(a) {
362
222
                close();
366
226
        exit_();
367
227
}
368
228
 
369
 
void CSSocket::open(char *address, int default_port)
 
229
void SCSocket::open(char *address, int default_port)
370
230
{
371
231
        enter_();
372
232
        close();
409
269
                memcpy(&server.sin_addr, hostp->h_addr, (size_t) hostp->h_length);
410
270
                server.sin_port = (uint16_t) servp->s_port;
411
271
                if (connect(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
412
 
                        throwError(CS_CONTEXT, SOCKET_ERRORNO);
 
272
                        throwError(CS_CONTEXT, errno);
413
273
        }
414
274
        catch_(a) {
415
275
                close();
419
279
        exit_();
420
280
}
421
281
 
422
 
void CSSocket::close()
 
282
void SCSocket::close()
423
283
{
424
 
        flush();
425
284
        if (iHandle != -1) {
426
285
                shutdown(iHandle, SHUT_RDWR);
427
286
                /* shutdown does not close the socket!!? */
428
 
                CLOSE_SOCKET(iHandle);
 
287
                unix_file_close(iHandle);
429
288
                iHandle = -1;
430
289
        }
431
290
        if (iHost) {
436
295
                cs_free(iService);
437
296
                iService = NULL;
438
297
        }
439
 
        if (iIdentity) {
440
 
                cs_free(iIdentity);
441
 
                iIdentity = NULL;
442
 
        }
443
298
        iPort = 0;
444
299
}
445
300
 
446
 
size_t CSSocket::read(void *data, size_t len)
 
301
size_t SCSocket::read(void *data, size_t len)
447
302
{
448
303
        ssize_t in;
449
304
 
453
308
         * So a return of zero means EOF!
454
309
         */
455
310
        retry:
456
 
        if (iTimeout)
457
 
                in = timeoutRead(self, data, len);
458
 
        else
459
 
                in = recv(iHandle, (char *) data, len, 0);
 
311
        in = recv(iHandle, data, len, 0);
460
312
        self->interrupted();
461
313
        if (in == -1) {
462
314
                /* Note, we actually ignore all errors on the socket.
463
315
                 * If no data was returned by the read so far, then
464
316
                 * the error will be considered EOF.
465
317
                 */
466
 
                int err = SOCKET_ERRORNO;
467
 
 
468
 
                if (err == EWOULDBLOCK || err == EINTR)
 
318
                if (errno == EAGAIN || errno == EINTR)
469
319
                        goto retry;
470
 
                throwError(CS_CONTEXT, err);
471
320
                in = 0;
472
321
        }
473
322
        return_((size_t) in);
474
323
}
475
324
 
476
 
int CSSocket::read()
 
325
int SCSocket::read()
477
326
{
478
327
        int             ch;
479
328
        u_char  buffer[1];
486
335
        return_(ch);
487
336
}
488
337
 
489
 
int CSSocket::peek()
 
338
int SCSocket::peek()
490
339
{
491
340
        return -1;
492
341
}
493
342
 
494
 
void CSSocket::write(const void *data, size_t len)
495
 
{
496
 
#ifdef CS_USE_OUTPUT_BUFFER
497
 
        if (len <= CS_MIN_WRITE_SIZE) {
498
 
                if (iDataLen + len > CS_OUTPUT_BUFFER_SIZE) {
499
 
                        /* This is the amount of data that will still fit
500
 
                         * intp the buffer:
501
 
                         */
502
 
                        size_t tfer = CS_OUTPUT_BUFFER_SIZE - iDataLen;
503
 
 
504
 
                        memcpy(iOutputBuffer + iDataLen, data, tfer);
505
 
                        flush();
506
 
                        len -= tfer;
507
 
                        memcpy(iOutputBuffer, ((char *) data) + tfer, len);
508
 
                        iDataLen = len;
509
 
                }
510
 
                else {
511
 
                        memcpy(iOutputBuffer + iDataLen, data, len);
512
 
                        iDataLen += len;
513
 
                }
514
 
        }
515
 
        else {
516
 
                /* If the block give is large enough, the
517
 
                 * writing directly from the block saves copying the
518
 
                 * data to the local output buffer buffer:
519
 
                 */
520
 
                flush();
521
 
                writeBlock(data, len);
522
 
        }
523
 
#else
524
 
        writeBlock(data, len);
525
 
#endif
526
 
}
527
 
 
528
 
void CSSocket::write(char ch)
529
 
{
530
 
        enter_();
531
 
        writeBlock(&ch, 1);
532
 
        exit_();
533
 
}
534
 
 
535
 
void CSSocket::flush()
536
 
{
537
 
#ifdef CS_USE_OUTPUT_BUFFER
538
 
        uint32_t len;
539
 
 
540
 
        if ((len = iDataLen)) {
541
 
                iDataLen = 0;
542
 
                /* Note: we discard the data to be written if an
543
 
                 * exception occurs.
544
 
                 */
545
 
                writeBlock(iOutputBuffer, len);
546
 
        }
547
 
#endif
548
 
}
549
 
 
550
 
const char *CSSocket::identify()
551
 
{
552
 
        enter_();
553
 
        if (!iIdentity) {
554
 
                char buffer[200];
555
 
 
556
 
                formatAddress(200, buffer);
557
 
                iIdentity = cs_strdup(buffer);
558
 
        }
559
 
        return_(iIdentity);
560
 
}
561
 
 
 
343
void SCSocket::write(const void *data, size_t len)
 
344
{
 
345
        ssize_t out;
 
346
 
 
347
        enter_();
 
348
        while (len > 0) {
 
349
                out = send(iHandle, data, len, 0);
 
350
                self->interrupted();
 
351
                if (out == -1) {
 
352
                        int err = errno;
 
353
 
 
354
                        if (err == EAGAIN || errno == EINTR)
 
355
                                continue;
 
356
                        throwError(CS_CONTEXT, err);
 
357
                }
 
358
                if ((size_t) out > len)
 
359
                        break;
 
360
                len -= (size_t) out;
 
361
                data = ((char *) data) + (size_t) out;
 
362
        }
 
363
        exit_();
 
364
}
 
365
 
 
366
void SCSocket::write(char ch)
 
367
{
 
368
        enter_();
 
369
        write(&ch, 1);
 
370
        exit_();
 
371
}
 
372
 
 
373
void SCSocket::flush()
 
374
{
 
375
}
562
376
 
563
377