~drizzle-trunk/drizzle/development

1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
2
 *
3
 * PrimeBase Media Stream for MySQL
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License
16
 * along with this program; if not, write to the Free Software
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 *
19
 * Original author: Paul McCullagh (H&G2JCtL)
20
 * Continued development: Barry Leslie
21
 *
22
 * 2007-06-07
23
 *
24
 * CORE SYSTEM:
25
 * Basic input and output streams.
26
 *
27
 * These objects wrap the system streams, and simplify things.
28
 * I also want to standardize exceptions and implement
29
 * socket based streams.
30
 *
31
 */
32
33
#include "CSConfig.h"
34
35
#include <string.h>
36
#include <inttypes.h>
37
38
#include "CSMemory.h"
39
#include "CSStream.h"
40
#include "CSGlobal.h"
41
42
using namespace std;
43
44
/*
45
 * ---------------------------------------------------------------
46
 * STREAM UTILITIES
47
 */
48
49
/*
 * Copy everything readable from 'in' to 'out' in chunks of
 * DEFAULT_BUFFER_SIZE bytes, then close both streams.
 *
 * Both streams and the temporary buffer are pushed onto the
 * thread's cleanup stack for the duration of the copy and are
 * released again before returning.
 */
void CSStream::pipe(CSOutputStream *out, CSInputStream *in)
{
	char	*buffer;
	size_t	count;

	enter_();
	push_(out);
	push_(in);

	buffer = (char *) cs_malloc(DEFAULT_BUFFER_SIZE);
	push_ptr_(buffer);

	/* A read of zero bytes marks the end of the input. */
	do {
		count = in->read(buffer, DEFAULT_BUFFER_SIZE);
		self->interrupted();
		if (count) {
			out->write(buffer, count);
			self->interrupted();
		}
	} while (count);

	in->close();
	out->close();

	release_(buffer);
	release_(in);
	release_(out);
	exit_();
}
77
78
/*
79
 * ---------------------------------------------------------------
80
 * INPUT STREAMS
81
 */
82
83
/*
 * Read one line of text from the stream.
 *
 * A line ends at '\n', '\r', "\r\n" or when read() returns -1.
 * The terminator is consumed but not stored.
 *
 * Returns a new CSStringBuffer holding the line (possibly empty),
 * or NULL if the very first read() already returned -1.
 */
CSStringBuffer *CSInputStream::readLine()
{
	int				c;
	CSStringBuffer	*line = NULL;

	enter_();

	c = read();
	if (c != -1) {
		new_(line, CSStringBuffer(20));
		push_(line);

		for (; !(c == '\n' || c == '\r' || c == -1); c = read())
			line->append((char) c);

		/* Swallow the '\n' of a "\r\n" pair. */
		if (c == '\r' && peek() == '\n')
			c = read();

		pop_(line);
	}

	return_(line);
}
109
110
/*
111
 * ---------------------------------------------------------------
112
 * OUTPUT STREAMS
113
 */
114
115
void CSOutputStream::printLine(const char *cstr)
116
{
117
	enter_();
118
	print(cstr);
119
	print(getEOL());
120
	flush();
121
	exit_();
122
}
123
124
void CSOutputStream::print(const char *cstr)
125
{
126
	enter_();
127
	write(cstr, strlen(cstr));
128
	exit_();
129
}
130
131
/*
 * Write the C-string form of 's' (as returned by
 * s->getCString()) to the stream.
 */
void CSOutputStream::print(CSString *s)
{
	enter_();
	this->print(s->getCString());
	exit_();
}
137
138
void CSOutputStream::print(int value)
139
{
140
	char buffer[20];
141
142
	snprintf(buffer, 20, "%d", value);
143
	print(buffer);
144
}
145
146
void CSOutputStream::print(uint64_t value)
147
{
148
	char buffer[30];
149
150
	snprintf(buffer, 30, "%"PRIu64"", value);
151
	print(buffer);
152
}
153
154
/*
155
 * ---------------------------------------------------------------
156
 * FILE INPUT STREAMS
157
 */
158
159
/*
 * Destructor: calls release() on the underlying file, if one
 * is attached.
 */
CSFileInputStream::~CSFileInputStream()
{
	if (iFile != NULL) {
		iFile->release();
	}
}
164
165
size_t CSFileInputStream::read(char *b, size_t len)
166
{
167
	size_t size;
168
169
	enter_();
170
	size = iFile->read(b, iReadOffset, len, 0);
171
	iReadOffset += size;
172
	return_(size);
173
}
174
175
int CSFileInputStream::read()
176
{
177
	size_t	size;
178
	char	ch;
179
180
	enter_();
181
	size = iFile->read(&ch, iReadOffset, 1, 0);
182
	iReadOffset += size;
183
	return_(size == 0 ? -1 : (int) ch);
184
}
185
186
/*
 * Move the read position back to offset 0.
 */
void CSFileInputStream::reset()
{
	iReadOffset = 0;
}
187
188
int CSFileInputStream::peek()
189
{
190
	size_t	size;
191
	char	ch;
192
193
	enter_();
194
	size = iFile->read(&ch, iReadOffset, 1, 0);
195
	return_(size == 0 ? -1 : (int) ch);
196
}
197
198
void CSFileInputStream::close()
199
{
200
	enter_();
201
	iFile->close();
202
	exit_();
203
}
204
205
/*
 * Create an input stream that reads from file 'f'.
 *
 * The new stream stores 'f' in iFile; its destructor later calls
 * release() on it. If the stream object cannot be allocated,
 * 'f' is released here and an ENOMEM error is thrown.
 */
CSFileInputStream *CSFileInputStream::newStream(CSFile *f)
{
	CSFileInputStream *stream = new CSFileInputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		f->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iFile = f;
	return stream;
}
216
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
217
/*
 * Create an input stream that reads from file 'f', starting at
 * byte position 'offset'.
 *
 * The new stream stores 'f' in iFile; its destructor later calls
 * release() on it. If the stream object cannot be allocated,
 * 'f' is released here and an ENOMEM error is thrown.
 */
CSFileInputStream *CSFileInputStream::newStream(CSFile *f, off64_t offset)
{
	CSFileInputStream *stream = new CSFileInputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		f->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iFile = f;
	stream->iReadOffset = offset;
	return stream;
}
229
230
/*
231
 * ---------------------------------------------------------------
232
 * FILE OUTPUT STREAMS
233
 */
234
235
/*
 * Destructor: calls release() on the underlying file, if one
 * is attached.
 */
CSFileOutputStream::~CSFileOutputStream()
{
	if (iFile != NULL) {
		iFile->release();
	}
}
240
241
void CSFileOutputStream::write(const char *b, size_t len)
242
{
243
	enter_();
244
	iFile->write(b, iWriteOffset, len);
245
	iWriteOffset += len;
246
	exit_();
247
}
248
249
const char *CSFileOutputStream::getEOL()
250
{
251
	enter_();
252
	return_(iFile->getEOL());
253
}
254
255
void CSFileOutputStream::flush()
256
{
257
	enter_();
258
	iFile->flush();
259
	exit_();
260
}
261
262
void CSFileOutputStream::write(char b)
263
{
264
	enter_();
265
	iFile->write(&b, iWriteOffset, 1);
266
	iWriteOffset += 1;
267
	exit_();
268
}
269
270
/*
 * Move the write position back to offset 0.
 */
void CSFileOutputStream::reset()
{
	iWriteOffset = 0;
}
271
272
void CSFileOutputStream::close()
273
{
274
	enter_();
275
	iFile->close();
276
	exit_();
277
}
278
279
/*
 * Create an output stream that writes to file 'f'.
 *
 * The new stream stores 'f' in iFile; its destructor later calls
 * release() on it. If the stream object cannot be allocated,
 * 'f' is released here and an ENOMEM error is thrown.
 */
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f)
{
	CSFileOutputStream *stream = new CSFileOutputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		f->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iFile = f;
	return stream;
}
290
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
291
/*
 * Create an output stream that writes to file 'f', starting at
 * byte position 'offset'.
 *
 * The new stream stores 'f' in iFile; its destructor later calls
 * release() on it. If the stream object cannot be allocated,
 * 'f' is released here and an ENOMEM error is thrown.
 */
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f, off64_t offset)
{
	CSFileOutputStream *stream = new CSFileOutputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		f->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iFile = f;
	stream->iWriteOffset = offset;
	return stream;
}
303
304
/*
305
 * ---------------------------------------------------------------
306
 * SOCKET INPUT STREAMS
307
 */
308
309
/*
 * Destructor: calls release() on the underlying socket, if one
 * is attached.
 */
CSSocketInputStream::~CSSocketInputStream()
{
	if (iSocket != NULL) {
		iSocket->release();
	}
}
314
315
void CSSocketInputStream::close()
316
{
317
	enter_();
318
	iSocket->close();
319
	exit_();
320
}
321
322
size_t CSSocketInputStream::read(char *b, size_t len)
323
{
324
	enter_();
325
	return_(iSocket->read(b, len));
326
}
327
328
int CSSocketInputStream::read()
329
{
330
	enter_();
331
	return_(iSocket->read());
332
}
333
334
int CSSocketInputStream::peek()
335
{
336
	enter_();
337
	return_(iSocket->peek());
338
}
339
340
void CSSocketInputStream::reset()
341
{
342
	enter_();
343
	CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketInputStream::reset() not supported");
344
	exit_();
345
}
346
347
348
/*
 * Create an input stream that reads from socket 's'.
 *
 * The new stream stores 's' in iSocket; its destructor later
 * calls release() on it. If the stream object cannot be
 * allocated, 's' is released here and an ENOMEM error is thrown.
 */
CSSocketInputStream *CSSocketInputStream::newStream(CSSocket *s)
{
	CSSocketInputStream *stream = new CSSocketInputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		s->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iSocket = s;
	return stream;
}
359
360
/*
361
 * ---------------------------------------------------------------
362
 * SOCKET OUTPUT STREAMS
363
 */
364
365
/*
 * Destructor: calls release() on the underlying socket, if one
 * is attached.
 */
CSSocketOutputStream::~CSSocketOutputStream()
{
	if (iSocket != NULL) {
		iSocket->release();
	}
}
370
371
void CSSocketOutputStream::close()
372
{
373
	enter_();
374
	iSocket->close();
375
	exit_();
376
}
377
378
void CSSocketOutputStream::write(const char *b, size_t len)
379
{
380
	enter_();
381
	iSocket->write(b, len);
382
	exit_();
383
}
384
385
void CSSocketOutputStream::flush()
386
{
387
	enter_();
388
	iSocket->flush();
389
	exit_();
390
}
391
392
void CSSocketOutputStream::write(char b)
393
{
394
	enter_();
395
	iSocket->write(b);
396
	exit_();
397
}
398
399
void CSSocketOutputStream::reset()
400
{
401
	enter_();
402
	CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketOutputStream::reset() not supported");
403
	exit_();
404
}
405
406
/*
 * Create an output stream that writes to socket 's'.
 *
 * The new stream stores 's' in iSocket; its destructor later
 * calls release() on it. If the stream object cannot be
 * allocated, 's' is released here and an ENOMEM error is thrown.
 */
CSSocketOutputStream *CSSocketOutputStream::newStream(CSSocket *s)
{
	CSSocketOutputStream *stream = new CSSocketOutputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		s->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iSocket = s;
	return stream;
}
417
418
/*
419
 * ---------------------------------------------------------------
420
 * BUFFERED INPUT STREAMS
421
 */
422
423
/*
 * Destructor: calls release() on the wrapped input stream, if
 * one is attached.
 */
CSBufferedInputStream::~CSBufferedInputStream()
{
	if (iStream != NULL) {
		iStream->release();
	}
}
428
429
void CSBufferedInputStream::close()
430
{
431
	enter_();
432
	iStream->close();
433
	exit_();
434
}
435
436
size_t CSBufferedInputStream::read(char *b, size_t len)
437
{
438
	size_t tfer;
439
440
	enter_();
441
	if (iBuffPos < iBuffTotal) {
442
		tfer = iBuffTotal - iBuffPos;
443
		if (tfer > len)
444
			tfer = len;
445
		memcpy(b, iBuffer + iBuffPos, tfer);
446
		iBuffPos += tfer;
447
	}
448
	else
449
		tfer = iStream->read(b, len);
450
	return_(tfer);
451
}
452
453
int CSBufferedInputStream::read()
454
{
455
	int ch;
456
	
457
	enter_();
458
	if (iBuffPos == iBuffTotal) {
459
		iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
460
		iBuffPos = 0;
461
	}
462
	if (iBuffPos < iBuffTotal) {
463
		ch = iBuffer[iBuffPos];
464
		iBuffPos++;
465
	}
466
	else
467
		ch = -1;
468
	return_(ch);
469
}
470
471
int CSBufferedInputStream::peek()
472
{
473
	int ch;
474
	
475
	enter_();
476
	if (iBuffPos == iBuffTotal) {
477
		iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
478
		iBuffPos = 0;
479
	}
480
	if (iBuffPos < iBuffTotal)
481
		ch = iBuffer[iBuffPos];
482
	else
483
		ch = -1;
484
	return_(ch);
485
}
486
487
void CSBufferedInputStream::reset()
488
{
489
	iBuffPos = iBuffTotal =0;
490
	iStream->reset();
491
}
492
493
/*
 * Create a buffered input stream on top of input stream 'i'.
 *
 * The new stream stores 'i' in iStream; its destructor later
 * calls release() on it. If the stream object cannot be
 * allocated, 'i' is released here and an ENOMEM error is thrown.
 */
CSBufferedInputStream *CSBufferedInputStream::newStream(CSInputStream* i)
{
	CSBufferedInputStream *stream = new CSBufferedInputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		i->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iStream = i;
	return stream;
}
504
505
/*
506
 * ---------------------------------------------------------------
507
 * BUFFERED OUTPUT STREAMS
508
 */
509
510
/*
 * Destructor: calls release() on the wrapped output stream, if
 * one is attached.
 */
CSBufferedOutputStream::~CSBufferedOutputStream()
{
	if (iStream != NULL) {
		iStream->release();
	}
}
515
516
void CSBufferedOutputStream::close()
517
{
518
	enter_();
519
	iStream->close();
520
	exit_();
521
}
522
523
void CSBufferedOutputStream::write(const char *b, size_t len)
524
{
525
	size_t tfer;
526
527
	enter_();
528
	if (iBuffTotal < CS_STREAM_BUFFER_SIZE) {
529
		tfer = CS_STREAM_BUFFER_SIZE - iBuffTotal;
530
		
531
		if (tfer > len)
532
			tfer = len;
533
		memcpy(iBuffer + iBuffTotal, b, tfer);
534
		iBuffTotal += tfer;
535
		b += tfer;
536
		len -= tfer;
537
	}
538
	if (len > 0) {
539
		flush();
540
		if (len > CS_STREAM_BUFFER_SIZE)
541
			iStream->write(b, len);
542
		else {
543
			memcpy(iBuffer, b, len);
544
			iBuffTotal = len;
545
		}
546
	}
547
	exit_();
548
}
549
550
void CSBufferedOutputStream::flush()
551
{
552
	enter_();
553
	if (iBuffTotal > 0) {
554
		iStream->write((char *) iBuffer, iBuffTotal);
555
		iBuffTotal = 0;
556
	}
557
	exit_();
558
}
559
560
void CSBufferedOutputStream::write(char b)
561
{
562
	enter_();
563
	if (iBuffTotal == CS_STREAM_BUFFER_SIZE)
564
		flush();
565
	iBuffer[iBuffTotal] = b;
566
	iBuffTotal++;
567
	exit_();
568
}
569
570
void CSBufferedOutputStream::reset()
571
{
572
	iBuffTotal = 0;
573
	iStream->reset();
574
}
575
576
/*
 * Create a buffered output stream on top of output stream 'i'.
 *
 * The new stream stores 'i' in iStream; its destructor later
 * calls release() on it. If the stream object cannot be
 * allocated, 'i' is released here and an ENOMEM error is thrown.
 */
CSBufferedOutputStream *CSBufferedOutputStream::newStream(CSOutputStream* i)
{
	CSBufferedOutputStream *stream = new CSBufferedOutputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		i->release();
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iStream = i;
	return stream;
}
587
588
/*
589
 * ---------------------------------------------------------------
590
 * MEMORY INPUT STREAMS
591
 */
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
592
/*
 * Create an input stream over the 'length' bytes at 'buffer'.
 *
 * Only the pointer is stored (in iMemory); the bytes are not
 * copied here, so the buffer has to remain valid while the
 * stream is in use.
 */
CSMemoryInputStream *CSMemoryInputStream::newStream(const u_char* buffer, uint32_t length)
{
	CSMemoryInputStream *stream = new CSMemoryInputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}
	stream->iMemory = buffer;
	stream->iMemTotal = length;
	return stream;
}
603
604
605
/*
 * Create an output stream that collects data in a heap buffer.
 *
 * init_length: size in bytes of the buffer allocated up front.
 * min_alloc:   stored in iMemMin; the write() methods use it as
 *              the minimum amount by which the buffer is grown.
 */
CSMemoryOutputStream *CSMemoryOutputStream::newStream(size_t init_length, size_t min_alloc)
{
	CSMemoryOutputStream *stream = new CSMemoryOutputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}

	/* NOTE(review): if cs_malloc() fails by throwing, 'stream' appears to be leaked - confirm. */
	stream->iMemory = (u_char *) cs_malloc(init_length);
	stream->iMemPos = stream->iMemory;
	stream->iMemTotal = init_length;
	stream->iMemSpace = init_length;
	stream->iMemMin = min_alloc;
	return stream;
}
620
621
/*
 * Destructor: frees the data buffer, if one was allocated.
 */
CSMemoryOutputStream::~CSMemoryOutputStream()
{
	if (iMemory != NULL) {
		cs_free(iMemory);
	}
}
626
627
void CSMemoryOutputStream::write(const char *b, size_t len)
628
{
629
	if (iMemSpace < len) {
630
		size_t new_size = iMemTotal + ((len < iMemMin)? iMemMin:len);
631
		
632
		cs_realloc((void**) &iMemory, new_size);
633
		iMemPos = iMemory + (iMemTotal - iMemSpace);
634
		iMemSpace += (new_size - iMemTotal);
635
		iMemTotal = new_size;		
636
	}
637
	memcpy(iMemPos, b, len);
638
	iMemPos +=len;
639
	iMemSpace -= len;
640
}
641
642
void CSMemoryOutputStream::write(const char b)
643
{
644
	if (!iMemSpace) {
645
		cs_realloc((void**) &iMemory, iMemTotal + iMemMin);
646
		iMemPos = iMemory + iMemTotal;
647
		iMemSpace += iMemMin;
648
		iMemTotal += iMemMin;		
649
	}
650
	*iMemPos = b;
651
	iMemPos++;
652
	iMemSpace--;
653
}
654
655
void CSMemoryOutputStream::reset()
656
{
657
	iMemPos = iMemory;
658
	iMemSpace = iMemTotal;
659
}
660
661
/*
662
 * ---------------------------------------------------------------
663
 * STATIC (user) MEMORY OUTPUT STREAM
664
 */
665
void CSStaticMemoryOutputStream::write(const char *b, size_t len)
666
{
667
	if (iMemSpace < len) {
668
		enter_();
669
		CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
670
		exit_();
671
	}
672
	memcpy(iMemPos, b, len);
673
	iMemPos +=len;
674
	iMemSpace -= len;
675
}
676
677
void CSStaticMemoryOutputStream::write(const char b)
678
{
679
	if (!iMemSpace) {
680
		enter_();
681
		CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
682
		exit_();
683
	}
684
	*iMemPos = b;
685
	iMemPos++;
686
	iMemSpace--;
687
}
688
689
/*
690
 * ---------------------------------------------------------------
691
 * CALLBACK INPUT STREAM
692
 */
693
694
/*
 * Create an input stream backed by a user supplied read
 * callback.
 *
 * callback:  stored in the stream's 'callback' member.
 * user_data: stored in the stream's 'cb_data' member.
 */
CSCallbackInputStream *CSCallbackInputStream::newStream(CSStreamReadCallbackFunc callback, void *user_data)
{
	CSCallbackInputStream *stream = new CSCallbackInputStream();

	/* NOTE(review): plain 'new' normally throws rather than returning NULL - confirm this branch is reachable. */
	if (stream == NULL) {
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
	}

	stream->callback = callback;
	stream->cb_data = user_data;
	return stream;
}
706