~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSStream.cc

  • Committer: Monty Taylor
  • Date: 2010-07-04 20:02:43 UTC
  • mfrom: (1548.2.40 drizzle_pbms)
  • mto: This revision was merged to the branch mainline in revision 1644.
  • Revision ID: mordred@inaugust.com-20100704200243-2vkq9gi6ysauj2tb
Merge PBMS from Barry.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh (H&G2JCtL)
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-06-07
 
23
 *
 
24
 * CORE SYSTEM:
 
25
 * Basic input and output streams.
 
26
 *
 
27
 * These objects wrap the system streams, and simplify things.
 
28
 * I also want to standardize exceptions and implement
 
29
 * socket based streams.
 
30
 *
 
31
 */
 
32
 
 
33
#include "CSConfig.h"
 
34
 
 
35
#include <string.h>
 
36
#include <inttypes.h>
 
37
 
 
38
#include "CSMemory.h"
 
39
#include "CSStream.h"
 
40
#include "CSGlobal.h"
 
41
 
 
42
using namespace std;
 
43
 
 
44
/*
 
45
 * ---------------------------------------------------------------
 
46
 * STREAM UTILITIES
 
47
 */
 
48
 
 
49
void CSStream::pipe(CSOutputStream *out, CSInputStream *in)
 
50
{
 
51
        char    *buffer;
 
52
        size_t  size;
 
53
 
 
54
        enter_();
 
55
        push_(out);
 
56
        push_(in);
 
57
        
 
58
        buffer = (char *) cs_malloc(DEFAULT_BUFFER_SIZE);
 
59
        push_ptr_(buffer);
 
60
        
 
61
        for (;;) {
 
62
                size = in->read(buffer, DEFAULT_BUFFER_SIZE);
 
63
                self->interrupted();
 
64
                if (!size)
 
65
                        break;
 
66
                out->write(buffer, size);
 
67
                self->interrupted();
 
68
        }
 
69
        in->close();
 
70
        out->close();
 
71
        
 
72
        release_(buffer);
 
73
        release_(in);
 
74
        release_(out);
 
75
        exit_();
 
76
}
 
77
 
 
78
/*
 
79
 * ---------------------------------------------------------------
 
80
 * INPUT STREAMS
 
81
 */
 
82
 
 
83
CSStringBuffer *CSInputStream::readLine()
 
84
{
 
85
        int                             ch;
 
86
        CSStringBuffer  *sb = NULL;
 
87
 
 
88
        enter_();
 
89
        
 
90
        ch = read();
 
91
        if (ch != -1) {
 
92
                new_(sb, CSStringBuffer(20));
 
93
                push_(sb);
 
94
                
 
95
                while (ch != '\n' && ch != '\r' && ch != -1) {
 
96
                        sb->append((char) ch);
 
97
                        ch = read();
 
98
                }
 
99
                if (ch == '\r') {
 
100
                        if (peek() == '\n')
 
101
                                ch = read();
 
102
                }
 
103
 
 
104
                pop_(sb);
 
105
        }
 
106
 
 
107
        return_(sb);
 
108
}
 
109
 
 
110
/*
 
111
 * ---------------------------------------------------------------
 
112
 * OUTPUT STREAMS
 
113
 */
 
114
 
 
115
void CSOutputStream::printLine(const char *cstr)
 
116
{
 
117
        enter_();
 
118
        print(cstr);
 
119
        print(getEOL());
 
120
        flush();
 
121
        exit_();
 
122
}
 
123
 
 
124
void CSOutputStream::print(const char *cstr)
 
125
{
 
126
        enter_();
 
127
        write(cstr, strlen(cstr));
 
128
        exit_();
 
129
}
 
130
 
 
131
void CSOutputStream::print(CSString *s)
 
132
{
 
133
        enter_();
 
134
        print(s->getCString());
 
135
        exit_();
 
136
}
 
137
 
 
138
void CSOutputStream::print(int value)
 
139
{
 
140
        char buffer[20];
 
141
 
 
142
        snprintf(buffer, 20, "%d", value);
 
143
        print(buffer);
 
144
}
 
145
 
 
146
void CSOutputStream::print(uint64_t value)
 
147
{
 
148
        char buffer[30];
 
149
 
 
150
        snprintf(buffer, 30, "%"PRIu64"", value);
 
151
        print(buffer);
 
152
}
 
153
 
 
154
/*
 
155
 * ---------------------------------------------------------------
 
156
 * FILE INPUT STREAMS
 
157
 */
 
158
 
 
159
CSFileInputStream::~CSFileInputStream()
 
160
{
 
161
        if (iFile)
 
162
                iFile->release();
 
163
}
 
164
 
 
165
size_t CSFileInputStream::read(char *b, size_t len)
 
166
{
 
167
        size_t size;
 
168
 
 
169
        enter_();
 
170
        size = iFile->read(b, iReadOffset, len, 0);
 
171
        iReadOffset += size;
 
172
        return_(size);
 
173
}
 
174
 
 
175
int CSFileInputStream::read()
 
176
{
 
177
        size_t  size;
 
178
        char    ch;
 
179
 
 
180
        enter_();
 
181
        size = iFile->read(&ch, iReadOffset, 1, 0);
 
182
        iReadOffset += size;
 
183
        return_(size == 0 ? -1 : (int) ch);
 
184
}
 
185
 
 
186
void CSFileInputStream::reset() {iReadOffset = 0;}
 
187
 
 
188
int CSFileInputStream::peek()
 
189
{
 
190
        size_t  size;
 
191
        char    ch;
 
192
 
 
193
        enter_();
 
194
        size = iFile->read(&ch, iReadOffset, 1, 0);
 
195
        return_(size == 0 ? -1 : (int) ch);
 
196
}
 
197
 
 
198
void CSFileInputStream::close()
 
199
{
 
200
        enter_();
 
201
        iFile->close();
 
202
        exit_();
 
203
}
 
204
 
 
205
CSFileInputStream *CSFileInputStream::newStream(CSFile *f)
 
206
{
 
207
        CSFileInputStream *s;
 
208
 
 
209
        if (!(s = new CSFileInputStream())) {
 
210
                f->release();
 
211
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
212
        }
 
213
        s->iFile = f;
 
214
        return s;
 
215
}
 
216
 
 
217
CSFileInputStream *CSFileInputStream::newStream(CSFile *f, off64_t offset)
 
218
{
 
219
        CSFileInputStream *s;
 
220
 
 
221
        if (!(s = new CSFileInputStream())) {
 
222
                f->release();
 
223
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
224
        }
 
225
        s->iFile = f;
 
226
        s->iReadOffset = offset;
 
227
        return s;
 
228
}
 
229
 
 
230
/*
 
231
 * ---------------------------------------------------------------
 
232
 * FILE OUTPUT STREAMS
 
233
 */
 
234
 
 
235
CSFileOutputStream::~CSFileOutputStream()
 
236
{
 
237
        if (iFile)
 
238
                iFile->release();
 
239
}
 
240
 
 
241
void CSFileOutputStream::write(const char *b, size_t len)
 
242
{
 
243
        enter_();
 
244
        iFile->write(b, iWriteOffset, len);
 
245
        iWriteOffset += len;
 
246
        exit_();
 
247
}
 
248
 
 
249
const char *CSFileOutputStream::getEOL()
 
250
{
 
251
        enter_();
 
252
        return_(iFile->getEOL());
 
253
}
 
254
 
 
255
void CSFileOutputStream::flush()
 
256
{
 
257
        enter_();
 
258
        iFile->flush();
 
259
        exit_();
 
260
}
 
261
 
 
262
void CSFileOutputStream::write(char b)
 
263
{
 
264
        enter_();
 
265
        iFile->write(&b, iWriteOffset, 1);
 
266
        iWriteOffset += 1;
 
267
        exit_();
 
268
}
 
269
 
 
270
void CSFileOutputStream::reset() {iWriteOffset = 0;}
 
271
 
 
272
void CSFileOutputStream::close()
 
273
{
 
274
        enter_();
 
275
        iFile->close();
 
276
        exit_();
 
277
}
 
278
 
 
279
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f)
 
280
{
 
281
        CSFileOutputStream *s;
 
282
 
 
283
        if (!(s = new CSFileOutputStream())) {
 
284
                f->release();
 
285
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
286
        }
 
287
        s->iFile = f;
 
288
        return  s;
 
289
}
 
290
 
 
291
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f, off64_t offset)
 
292
{
 
293
        CSFileOutputStream *s;
 
294
 
 
295
        if (!(s = new CSFileOutputStream())) {
 
296
                f->release();
 
297
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
298
        }
 
299
        s->iFile = f;
 
300
        s->iWriteOffset = offset;
 
301
        return  s;
 
302
}
 
303
 
 
304
/*
 
305
 * ---------------------------------------------------------------
 
306
 * SOCKET INPUT STREAMS
 
307
 */
 
308
 
 
309
CSSocketInputStream::~CSSocketInputStream()
 
310
{
 
311
        if (iSocket)
 
312
                iSocket->release();
 
313
}
 
314
 
 
315
void CSSocketInputStream::close()
 
316
{
 
317
        enter_();
 
318
        iSocket->close();
 
319
        exit_();
 
320
}
 
321
 
 
322
size_t CSSocketInputStream::read(char *b, size_t len)
 
323
{
 
324
        enter_();
 
325
        return_(iSocket->read(b, len));
 
326
}
 
327
 
 
328
int CSSocketInputStream::read()
 
329
{
 
330
        enter_();
 
331
        return_(iSocket->read());
 
332
}
 
333
 
 
334
int CSSocketInputStream::peek()
 
335
{
 
336
        enter_();
 
337
        return_(iSocket->peek());
 
338
}
 
339
 
 
340
void CSSocketInputStream::reset()
 
341
{
 
342
        enter_();
 
343
        CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketInputStream::reset() not supported");
 
344
        exit_();
 
345
}
 
346
 
 
347
 
 
348
CSSocketInputStream *CSSocketInputStream::newStream(CSSocket *s)
 
349
{
 
350
        CSSocketInputStream *str;
 
351
 
 
352
        if (!(str = new CSSocketInputStream())) {
 
353
                s->release();
 
354
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
355
        }
 
356
        str->iSocket = s;
 
357
        return  str;
 
358
}
 
359
 
 
360
/*
 
361
 * ---------------------------------------------------------------
 
362
 * SOCKET OUTPUT STREAMS
 
363
 */
 
364
 
 
365
CSSocketOutputStream::~CSSocketOutputStream()
 
366
{
 
367
        if (iSocket)
 
368
                iSocket->release();
 
369
}
 
370
 
 
371
void CSSocketOutputStream::close()
 
372
{
 
373
        enter_();
 
374
        iSocket->close();
 
375
        exit_();
 
376
}
 
377
 
 
378
void CSSocketOutputStream::write(const char *b, size_t len)
 
379
{
 
380
        enter_();
 
381
        iSocket->write(b, len);
 
382
        exit_();
 
383
}
 
384
 
 
385
void CSSocketOutputStream::flush()
 
386
{
 
387
        enter_();
 
388
        iSocket->flush();
 
389
        exit_();
 
390
}
 
391
 
 
392
void CSSocketOutputStream::write(char b)
 
393
{
 
394
        enter_();
 
395
        iSocket->write(b);
 
396
        exit_();
 
397
}
 
398
 
 
399
void CSSocketOutputStream::reset()
 
400
{
 
401
        enter_();
 
402
        CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketOutputStream::reset() not supported");
 
403
        exit_();
 
404
}
 
405
 
 
406
CSSocketOutputStream *CSSocketOutputStream::newStream(CSSocket *s)
 
407
{
 
408
        CSSocketOutputStream *str;
 
409
 
 
410
        if (!(str = new CSSocketOutputStream())) {
 
411
                s->release();
 
412
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
413
        }
 
414
        str->iSocket = s;
 
415
        return  str;
 
416
}
 
417
 
 
418
/*
 
419
 * ---------------------------------------------------------------
 
420
 * BUFFERED INPUT STREAMS
 
421
 */
 
422
 
 
423
CSBufferedInputStream::~CSBufferedInputStream()
 
424
{
 
425
        if (iStream)
 
426
                iStream->release();
 
427
}
 
428
 
 
429
void CSBufferedInputStream::close()
 
430
{
 
431
        enter_();
 
432
        iStream->close();
 
433
        exit_();
 
434
}
 
435
 
 
436
size_t CSBufferedInputStream::read(char *b, size_t len)
 
437
{
 
438
        size_t tfer;
 
439
 
 
440
        enter_();
 
441
        if (iBuffPos < iBuffTotal) {
 
442
                tfer = iBuffTotal - iBuffPos;
 
443
                if (tfer > len)
 
444
                        tfer = len;
 
445
                memcpy(b, iBuffer + iBuffPos, tfer);
 
446
                iBuffPos += tfer;
 
447
        }
 
448
        else
 
449
                tfer = iStream->read(b, len);
 
450
        return_(tfer);
 
451
}
 
452
 
 
453
int CSBufferedInputStream::read()
 
454
{
 
455
        int ch;
 
456
        
 
457
        enter_();
 
458
        if (iBuffPos == iBuffTotal) {
 
459
                iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
 
460
                iBuffPos = 0;
 
461
        }
 
462
        if (iBuffPos < iBuffTotal) {
 
463
                ch = iBuffer[iBuffPos];
 
464
                iBuffPos++;
 
465
        }
 
466
        else
 
467
                ch = -1;
 
468
        return_(ch);
 
469
}
 
470
 
 
471
int CSBufferedInputStream::peek()
 
472
{
 
473
        int ch;
 
474
        
 
475
        enter_();
 
476
        if (iBuffPos == iBuffTotal) {
 
477
                iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
 
478
                iBuffPos = 0;
 
479
        }
 
480
        if (iBuffPos < iBuffTotal)
 
481
                ch = iBuffer[iBuffPos];
 
482
        else
 
483
                ch = -1;
 
484
        return_(ch);
 
485
}
 
486
 
 
487
void CSBufferedInputStream::reset()
 
488
{
 
489
        iBuffPos = iBuffTotal =0;
 
490
        iStream->reset();
 
491
}
 
492
 
 
493
CSBufferedInputStream *CSBufferedInputStream::newStream(CSInputStream* i)
 
494
{
 
495
        CSBufferedInputStream *s;
 
496
 
 
497
        if (!(s = new CSBufferedInputStream())) {
 
498
                i->release();
 
499
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
500
        }
 
501
        s->iStream = i;
 
502
        return  s;
 
503
}
 
504
 
 
505
/*
 
506
 * ---------------------------------------------------------------
 
507
 * BUFFERED OUTPUT STREAMS
 
508
 */
 
509
 
 
510
CSBufferedOutputStream::~CSBufferedOutputStream()
 
511
{
 
512
        if (iStream)
 
513
                iStream->release();
 
514
}
 
515
 
 
516
void CSBufferedOutputStream::close()
 
517
{
 
518
        enter_();
 
519
        iStream->close();
 
520
        exit_();
 
521
}
 
522
 
 
523
void CSBufferedOutputStream::write(const char *b, size_t len)
 
524
{
 
525
        size_t tfer;
 
526
 
 
527
        enter_();
 
528
        if (iBuffTotal < CS_STREAM_BUFFER_SIZE) {
 
529
                tfer = CS_STREAM_BUFFER_SIZE - iBuffTotal;
 
530
                
 
531
                if (tfer > len)
 
532
                        tfer = len;
 
533
                memcpy(iBuffer + iBuffTotal, b, tfer);
 
534
                iBuffTotal += tfer;
 
535
                b += tfer;
 
536
                len -= tfer;
 
537
        }
 
538
        if (len > 0) {
 
539
                flush();
 
540
                if (len > CS_STREAM_BUFFER_SIZE)
 
541
                        iStream->write(b, len);
 
542
                else {
 
543
                        memcpy(iBuffer, b, len);
 
544
                        iBuffTotal = len;
 
545
                }
 
546
        }
 
547
        exit_();
 
548
}
 
549
 
 
550
void CSBufferedOutputStream::flush()
 
551
{
 
552
        enter_();
 
553
        if (iBuffTotal > 0) {
 
554
                iStream->write((char *) iBuffer, iBuffTotal);
 
555
                iBuffTotal = 0;
 
556
        }
 
557
        exit_();
 
558
}
 
559
 
 
560
void CSBufferedOutputStream::write(char b)
 
561
{
 
562
        enter_();
 
563
        if (iBuffTotal == CS_STREAM_BUFFER_SIZE)
 
564
                flush();
 
565
        iBuffer[iBuffTotal] = b;
 
566
        iBuffTotal++;
 
567
        exit_();
 
568
}
 
569
 
 
570
void CSBufferedOutputStream::reset()
 
571
{
 
572
        iBuffTotal = 0;
 
573
        iStream->reset();
 
574
}
 
575
 
 
576
CSBufferedOutputStream *CSBufferedOutputStream::newStream(CSOutputStream* i)
 
577
{
 
578
        CSBufferedOutputStream *s;
 
579
 
 
580
        if (!(s = new CSBufferedOutputStream())) {
 
581
                i->release();
 
582
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
583
        }
 
584
        s->iStream = i;
 
585
        return s;
 
586
}
 
587
 
 
588
/*
 
589
 * ---------------------------------------------------------------
 
590
 * MEMORY INPUT STREAMS
 
591
 */
 
592
CSMemoryInputStream *CSMemoryInputStream::newStream(const u_char* buffer, uint32_t length)
 
593
{
 
594
        CSMemoryInputStream *s;
 
595
 
 
596
        if (!(s = new CSMemoryInputStream())) {
 
597
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
598
        }
 
599
        s->iMemory = buffer;
 
600
        s->iMemTotal = length;
 
601
        return s;
 
602
}
 
603
 
 
604
 
 
605
CSMemoryOutputStream *CSMemoryOutputStream::newStream(size_t init_length, size_t min_alloc)
 
606
{
 
607
        CSMemoryOutputStream *s;
 
608
 
 
609
        if (!(s = new CSMemoryOutputStream())) {
 
610
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
611
        }
 
612
        
 
613
        s->iMemory = (u_char *) cs_malloc(init_length);
 
614
        s->iMemTotal = init_length;
 
615
        s->iMemSpace = init_length;
 
616
        s->iMemPos = s->iMemory;
 
617
        s->iMemMin = min_alloc;
 
618
        return s;
 
619
}
 
620
 
 
621
CSMemoryOutputStream::~CSMemoryOutputStream()
 
622
{
 
623
        if (iMemory)
 
624
                cs_free(iMemory);
 
625
}
 
626
 
 
627
void CSMemoryOutputStream::write(const char *b, size_t len)
 
628
{
 
629
        if (iMemSpace < len) {
 
630
                size_t new_size = iMemTotal + ((len < iMemMin)? iMemMin:len);
 
631
                
 
632
                cs_realloc((void**) &iMemory, new_size);
 
633
                iMemPos = iMemory + (iMemTotal - iMemSpace);
 
634
                iMemSpace += (new_size - iMemTotal);
 
635
                iMemTotal = new_size;           
 
636
        }
 
637
        memcpy(iMemPos, b, len);
 
638
        iMemPos +=len;
 
639
        iMemSpace -= len;
 
640
}
 
641
 
 
642
void CSMemoryOutputStream::write(const char b)
 
643
{
 
644
        if (!iMemSpace) {
 
645
                cs_realloc((void**) &iMemory, iMemTotal + iMemMin);
 
646
                iMemPos = iMemory + iMemTotal;
 
647
                iMemSpace += iMemMin;
 
648
                iMemTotal += iMemMin;           
 
649
        }
 
650
        *iMemPos = b;
 
651
        iMemPos++;
 
652
        iMemSpace--;
 
653
}
 
654
 
 
655
void CSMemoryOutputStream::reset()
 
656
{
 
657
        iMemPos = iMemory;
 
658
        iMemSpace = iMemTotal;
 
659
}
 
660
 
 
661
/*
 
662
 * ---------------------------------------------------------------
 
663
 * STATIC (user) MEMORY OUTPUT STREAM
 
664
 */
 
665
void CSStaticMemoryOutputStream::write(const char *b, size_t len)
 
666
{
 
667
        if (iMemSpace < len) {
 
668
                enter_();
 
669
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
 
670
                exit_();
 
671
        }
 
672
        memcpy(iMemPos, b, len);
 
673
        iMemPos +=len;
 
674
        iMemSpace -= len;
 
675
}
 
676
 
 
677
void CSStaticMemoryOutputStream::write(const char b)
 
678
{
 
679
        if (!iMemSpace) {
 
680
                enter_();
 
681
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
 
682
                exit_();
 
683
        }
 
684
        *iMemPos = b;
 
685
        iMemPos++;
 
686
        iMemSpace--;
 
687
}
 
688
 
 
689
/*
 
690
 * ---------------------------------------------------------------
 
691
 * Callback InPUT STREAM
 
692
 */
 
693
 
 
694
CSCallbackInputStream *CSCallbackInputStream::newStream(CSStreamReadCallbackFunc callback, void *user_data)
 
695
{
 
696
        CSCallbackInputStream *s;
 
697
 
 
698
        if (!(s = new CSCallbackInputStream())) {
 
699
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
700
        }
 
701
        
 
702
        s->callback = callback;
 
703
        s->cb_data = user_data;
 
704
        return s;
 
705
}
 
706