~drizzle-trunk/drizzle/development

1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
2
 *
3
 * PrimeBase Media Stream for MySQL
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License
16
 * along with this program; if not, write to the Free Software
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 *
19
 * Original author: Paul McCullagh (H&G2JCtL)
20
 * Continued development: Barry Leslie
21
 *
22
 * 2007-05-24
23
 *
24
 * CORE SYSTEM:
25
 * Basic socket I/O.
26
 *
27
 */
28
29
#include "CSConfig.h"
30
31
#include <stdio.h>
32
#include <sys/types.h>
33
#include <sys/socket.h>
34
#include <netdb.h>
35
#include <netinet/in.h>
36
#include <arpa/inet.h>
37
#include <netinet/in.h>
38
#include <netinet/tcp.h>
39
#include <ctype.h>
40
#include <string.h>
41
#include <stdlib.h>
42
43
#include "CSSocket.h"
44
#include "CSStream.h"
45
#include "CSGlobal.h"
46
#include "CSStrUtil.h"
47
#include "CSFile.h"
48
49
/*
50
 * ---------------------------------------------------------------
51
 * CORE SYSTEM SOCKET FACTORY
52
 */
53
54
/*
 * Factory for sockets: creates an SCSocket through the new_() macro
 * and returns it as a pointer to the CSSocket interface.
 */
CSSocket *CSSocket::newSocket()
{
	SCSocket *sock;

	new_(sock, SCSocket());
	return (CSSocket *) sock;
}
61
62
/*
63
 * ---------------------------------------------------------------
64
 * INTERNAL UTILITIES
65
 */
66
67
void SCSocket::formatAddress(size_t size, char *buffer)
68
{
69
	if (iHost) {
70
		cs_strcpy(size, buffer, iHost);
71
		if (iService)
72
			cs_strcat(size, buffer, ":");
73
	}
74
	else
75
		*buffer = 0;
76
	if (iService)
77
		cs_strcat(size, buffer, iService);
78
}
79
80
void SCSocket::throwError(const char *func, const char *file, int line, char *address, int err)
81
{
82
	if (err)
83
		CSException::throwFileError(func, file, line, address, err);
84
	else
85
		CSException::throwEOFError(func, file, line, address);
86
}
87
88
void SCSocket::throwError(const char *func, const char *file, int line, int err)
89
{
90
	char address[CS_SOCKET_ADDRESS_SIZE];
91
92
	formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
93
	throwError(func, file, line, address, err);
94
}
95
96
void SCSocket::setInternalOptions()
97
{
98
	int flag = 1;
99
100
	if (setsockopt(iHandle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) == -1)
101
		CSException::throwOSError(CS_CONTEXT, errno);
102
}
103
104
void SCSocket::openInternal()
105
{
106
	iHandle = socket(AF_INET, SOCK_STREAM, 0);
107
	if (iHandle == -1)
108
		CSException::throwOSError(CS_CONTEXT, errno);
109
	setInternalOptions();
110
}
111
112
/*
113
 * ---------------------------------------------------------------
114
 * SOCKET BASED ON THE STANDARD C SOCKET
115
 */
116
117
/*
 * Return a new output stream that writes to this socket.
 * The stream is handed a reference obtained with RETAIN(this).
 */
CSOutputStream *CSSocket::getOutputStream()
{
	return CSSocketOutputStream::newStream(
		RETAIN(this)
	);
}
121
122
/*
 * Return a new input stream that reads from this socket.
 * The stream is handed a reference obtained with RETAIN(this).
 */
CSInputStream *CSSocket::getInputStream()
{
	return CSSocketInputStream::newStream(
		RETAIN(this)
	);
}
126
127
void SCSocket::publish(char *service, int default_port)
128
{
129
	enter_();
130
	close();
131
	try_(a) {
132
		struct servent		*servp;
133
		struct sockaddr_in	server;
134
		struct servent		s;
135
		int					flag = 1;
136
137
		openInternal();
138
		if (service) {
139
			if (isdigit(service[0])) {
140
				int i =  atoi(service);
141
142
				if (!i)
143
					CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, service);
144
				servp = &s;
145
				s.s_port = htons((uint16_t) i);
146
				iService = cs_strdup(service);
147
			}
148
			else if ((servp = getservbyname(service, "tcp")) == NULL) {
149
				if (!default_port)
150
					CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, service);
151
				servp = &s;
152
				s.s_port = htons((uint16_t) default_port);
153
				iService = cs_strdup(default_port);
154
			}
155
			else
156
				iService = cs_strdup(service);
157
		}
158
		else {
159
			if (!default_port)
160
				CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, "");
161
			servp = &s;
162
			s.s_port = htons((uint16_t) default_port);
163
			iService = cs_strdup(default_port);
164
		}
165
			
166
		iPort = ntohs(servp->s_port);
167
168
		memset(&server, 0, sizeof(server));
169
		server.sin_family = AF_INET;
170
		server.sin_addr.s_addr = INADDR_ANY;
171
		server.sin_port = (uint16_t) servp->s_port;
172
173
		if (setsockopt(iHandle, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof(int)) == -1)
174
			CSException::throwOSError(CS_CONTEXT, errno);
175
176
		if (bind(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
177
			CSException::throwOSError(CS_CONTEXT, errno);
178
179
		if (listen(iHandle, SOMAXCONN) == -1)
180
			CSException::throwOSError(CS_CONTEXT, errno);
181
	}
182
	catch_(a) {
183
		close();
184
		throw_();
185
	}
186
	cont_(a);
187
	exit_();
188
}
189
190
/*
 * Accept one incoming connection from 'listener' and make this
 * object the socket for that connection.
 *
 * Any previous connection on this object is closed first. On
 * success iHost holds the peer's IPv4 address in dotted form and
 * iPort the peer's port. If accept() or the option setup fails the
 * socket is closed again and the error re-thrown.
 */
void SCSocket::open(CSSocket *listener)
{
	enter_();

	close();
	try_(a) {
		int listener_handle;
		char address[CS_SOCKET_ADDRESS_SIZE];
		struct sockaddr_in	remote;
		socklen_t			addrlen = sizeof(remote);
		const socklen_t		host_size = 100;

		/* First get all the information we need from the listener: */
		listener_handle = ((SCSocket *) listener)->iHandle;
		listener->formatAddress(CS_SOCKET_ADDRESS_SIZE, address);

		/* I want to make sure no error occurs after the connect!
		 * So I allocate a buffer for the host name up front.
		 * This means it may be too small, but this is not a problem
		 * because the host name stored here is only used for display
		 * in error messages etc.
		 */
		iHost = (char *) cs_malloc(host_size);
		iHandle = accept(listener_handle, (struct sockaddr *) &remote, &addrlen);
		if (iHandle == -1)
			throwError(CS_CONTEXT, address, errno);

		/* inet_ntop() writes directly into our own buffer, unlike
		 * inet_ntoa() which returns a static buffer shared between
		 * threads. If the conversion fails, leave the name empty.
		 */
		if (!inet_ntop(AF_INET, &remote.sin_addr, iHost, host_size))
			*iHost = 0;
		iPort = ntohs(remote.sin_port);

		setInternalOptions();
	}
	catch_(a) {
		close();
		throw_();
	}
	cont_(a);
	exit_();
}
228
229
void SCSocket::open(char *address, int default_port)
230
{
231
	enter_();
232
	close();
233
	try_(a) {
234
		char				*portp = strchr(address, ':');
235
		struct servent		s;
236
		struct servent		*servp;
237
		struct hostent		*hostp;
238
		struct sockaddr_in	server;
239
240
		openInternal();
241
		if (!portp) {
242
			iHost = cs_strdup(address);
243
			if (!default_port)
244
				CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
245
			iService = cs_strdup(default_port);
246
		}
247
		else {
248
			iHost = cs_strdup(address, (size_t) (portp - address));
249
			iService = cs_strdup(portp+1);
250
		}
251
	
252
		if (isdigit(iService[0])) {
253
			int i =  atoi(iService);
254
255
			if (!i)
256
				CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
257
			servp = &s;
258
			s.s_port = htons((uint16_t) i);
259
		}
260
		else if ((servp = getservbyname(iService, "tcp")) == NULL)
261
			CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, iService);
262
		iPort = (int) ntohs(servp->s_port);
263
264
		if ((hostp = gethostbyname(iHost)) == 0)
265
			CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_HOST, iHost);
266
267
		memset(&server, 0, sizeof(server));
268
		server.sin_family = AF_INET;
269
		memcpy(&server.sin_addr, hostp->h_addr, (size_t) hostp->h_length);
270
		server.sin_port = (uint16_t) servp->s_port;
271
		if (connect(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
272
			throwError(CS_CONTEXT, errno);
273
	}
274
	catch_(a) {
275
		close();
276
		throw_();
277
	}
278
	cont_(a);
279
	exit_();
280
}
281
282
void SCSocket::close()
283
{
284
	if (iHandle != -1) {
285
		shutdown(iHandle, SHUT_RDWR);
286
		/* shutdown does not close the socket!!? */
287
		unix_file_close(iHandle);
288
		iHandle = -1;
289
	}
290
	if (iHost) {
291
		cs_free(iHost);
292
		iHost = NULL;
293
	}
294
	if (iService) {
295
		cs_free(iService);
296
		iService = NULL;
297
	}
298
	iPort = 0;
299
}
300
301
size_t SCSocket::read(void *data, size_t len)
302
{
303
	ssize_t in;
304
305
	enter_();
306
	/* recv, by default will block until at lease one byte is
307
	 * returned.
308
	 * So a return of zero means EOF!
309
	 */
310
	retry:
311
	in = recv(iHandle, data, len, 0);
312
	self->interrupted();
313
	if (in == -1) {
314
		/* Note, we actually ignore all errors on the socket.
315
		 * If no data was returned by the read so far, then
316
		 * the error will be considered EOF.
317
		 */
318
		if (errno == EAGAIN || errno == EINTR)
319
			goto retry;
320
		in = 0;
321
	}
322
	return_((size_t) in);
323
}
324
325
int SCSocket::read()
326
{
327
	int		ch;
328
	u_char	buffer[1];
329
330
	enter_();
331
	if (read(buffer, 1) == 1)
332
		ch = buffer[0];
333
	else
334
		ch = -1;
335
	return_(ch);
336
}
337
338
/*
 * Look-ahead is not implemented for this socket:
 * the function unconditionally returns -1.
 */
int SCSocket::peek()
{
	return (-1);
}
342
343
void SCSocket::write(const void *data, size_t len)
344
{
345
	ssize_t	out;
346
347
	enter_();
348
	while (len > 0) {
349
		out = send(iHandle, data, len, 0);
350
		self->interrupted();
351
		if (out == -1) {
352
			int err = errno;
353
354
			if (err == EAGAIN || errno == EINTR)
355
				continue;
356
			throwError(CS_CONTEXT, err);
357
		}
358
		if ((size_t) out > len)
359
			break;
360
		len -= (size_t) out;
361
		data = ((char *) data) + (size_t) out;
362
	}
363
	exit_();
364
}
365
366
void SCSocket::write(char ch)
367
{
368
	enter_();
369
	write(&ch, 1);
370
	exit_();
371
}
372
373
/*
 * Nothing to do here: write() passes data straight to send(),
 * so this class holds no buffered output of its own.
 */
void SCSocket::flush()
{
	/* Intentionally empty. */
}
376
377