~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSStream.cc

Renamed more stuff to drizzle.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 
 *
19
 
 * Original author: Paul McCullagh (H&G2JCtL)
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-06-07
23
 
 *
24
 
 * CORE SYSTEM:
25
 
 * Basic input and output streams.
26
 
 *
27
 
 * These objects wrap the system streams, and simplify things.
28
 
 * I also want to standardize exceptions and implement
29
 
 * socket based streams.
30
 
 *
31
 
 */
32
 
 
33
 
#include "CSConfig.h"
34
 
 
35
 
#include <string.h>
36
 
#include <inttypes.h>
37
 
 
38
 
#include "CSMemory.h"
39
 
#include "CSStream.h"
40
 
#include "CSGlobal.h"
41
 
 
42
 
using namespace std;
43
 
 
44
 
/*
45
 
 * ---------------------------------------------------------------
46
 
 * STREAM UTILITIES
47
 
 */
48
 
 
49
 
void CSStream::pipe(CSOutputStream *out, CSInputStream *in)
50
 
{
51
 
        char    *buffer;
52
 
        size_t  size;
53
 
 
54
 
        enter_();
55
 
        push_(out);
56
 
        push_(in);
57
 
        
58
 
        buffer = (char *) cs_malloc(DEFAULT_BUFFER_SIZE);
59
 
        push_ptr_(buffer);
60
 
        
61
 
        for (;;) {
62
 
                size = in->read(buffer, DEFAULT_BUFFER_SIZE);
63
 
                self->interrupted();
64
 
                if (!size)
65
 
                        break;
66
 
                out->write(buffer, size);
67
 
                self->interrupted();
68
 
        }
69
 
        in->close();
70
 
        out->close();
71
 
        
72
 
        release_(buffer);
73
 
        release_(in);
74
 
        release_(out);
75
 
        exit_();
76
 
}
77
 
 
78
 
/*
79
 
 * ---------------------------------------------------------------
80
 
 * INPUT STREAMS
81
 
 */
82
 
 
83
 
CSStringBuffer *CSInputStream::readLine()
84
 
{
85
 
        int                             ch;
86
 
        CSStringBuffer  *sb = NULL;
87
 
 
88
 
        enter_();
89
 
        
90
 
        ch = read();
91
 
        if (ch != -1) {
92
 
                new_(sb, CSStringBuffer(20));
93
 
                push_(sb);
94
 
                
95
 
                while (ch != '\n' && ch != '\r' && ch != -1) {
96
 
                        sb->append((char) ch);
97
 
                        ch = read();
98
 
                }
99
 
                if (ch == '\r') {
100
 
                        if (peek() == '\n')
101
 
                                ch = read();
102
 
                }
103
 
 
104
 
                pop_(sb);
105
 
        }
106
 
 
107
 
        return_(sb);
108
 
}
109
 
 
110
 
/*
111
 
 * ---------------------------------------------------------------
112
 
 * OUTPUT STREAMS
113
 
 */
114
 
 
115
 
void CSOutputStream::printLine(const char *cstr)
116
 
{
117
 
        enter_();
118
 
        print(cstr);
119
 
        print(getEOL());
120
 
        flush();
121
 
        exit_();
122
 
}
123
 
 
124
 
void CSOutputStream::print(const char *cstr)
125
 
{
126
 
        enter_();
127
 
        write(cstr, strlen(cstr));
128
 
        exit_();
129
 
}
130
 
 
131
 
void CSOutputStream::print(CSString *s)
132
 
{
133
 
        enter_();
134
 
        print(s->getCString());
135
 
        exit_();
136
 
}
137
 
 
138
 
void CSOutputStream::print(int value)
139
 
{
140
 
        char buffer[20];
141
 
 
142
 
        snprintf(buffer, 20, "%d", value);
143
 
        print(buffer);
144
 
}
145
 
 
146
 
void CSOutputStream::print(uint64_t value)
147
 
{
148
 
        char buffer[30];
149
 
 
150
 
        snprintf(buffer, 30, "%"PRIu64"", value);
151
 
        print(buffer);
152
 
}
153
 
 
154
 
/*
155
 
 * ---------------------------------------------------------------
156
 
 * FILE INPUT STREAMS
157
 
 */
158
 
 
159
 
/*
 * Destructor: calls release() on the attached file, if there is one.
 */
CSFileInputStream::~CSFileInputStream()
{
	if (iFile != NULL)
		iFile->release();
}
164
 
 
165
 
size_t CSFileInputStream::read(char *b, size_t len)
166
 
{
167
 
        size_t size;
168
 
 
169
 
        enter_();
170
 
        size = iFile->read(b, iReadOffset, len, 0);
171
 
        iReadOffset += size;
172
 
        return_(size);
173
 
}
174
 
 
175
 
int CSFileInputStream::read()
176
 
{
177
 
        size_t  size;
178
 
        char    ch;
179
 
 
180
 
        enter_();
181
 
        size = iFile->read(&ch, iReadOffset, 1, 0);
182
 
        iReadOffset += size;
183
 
        return_(size == 0 ? -1 : (int) ch);
184
 
}
185
 
 
186
 
void CSFileInputStream::reset() {iReadOffset = 0;}
187
 
 
188
 
int CSFileInputStream::peek()
189
 
{
190
 
        size_t  size;
191
 
        char    ch;
192
 
 
193
 
        enter_();
194
 
        size = iFile->read(&ch, iReadOffset, 1, 0);
195
 
        return_(size == 0 ? -1 : (int) ch);
196
 
}
197
 
 
198
 
void CSFileInputStream::close()
199
 
{
200
 
        enter_();
201
 
        iFile->close();
202
 
        exit_();
203
 
}
204
 
 
205
 
CSFileInputStream *CSFileInputStream::newStream(CSFile *f)
206
 
{
207
 
        CSFileInputStream *s;
208
 
 
209
 
        if (!(s = new CSFileInputStream())) {
210
 
                f->release();
211
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
212
 
        }
213
 
        s->iFile = f;
214
 
        return s;
215
 
}
216
 
 
217
 
CSFileInputStream *CSFileInputStream::newStream(CSFile *f, off64_t offset)
218
 
{
219
 
        CSFileInputStream *s;
220
 
 
221
 
        if (!(s = new CSFileInputStream())) {
222
 
                f->release();
223
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
224
 
        }
225
 
        s->iFile = f;
226
 
        s->iReadOffset = offset;
227
 
        return s;
228
 
}
229
 
 
230
 
/*
231
 
 * ---------------------------------------------------------------
232
 
 * FILE OUTPUT STREAMS
233
 
 */
234
 
 
235
 
/*
 * Destructor: calls release() on the attached file, if there is one.
 */
CSFileOutputStream::~CSFileOutputStream()
{
	if (iFile != NULL)
		iFile->release();
}
240
 
 
241
 
void CSFileOutputStream::write(const char *b, size_t len)
242
 
{
243
 
        enter_();
244
 
        iFile->write(b, iWriteOffset, len);
245
 
        iWriteOffset += len;
246
 
        exit_();
247
 
}
248
 
 
249
 
const char *CSFileOutputStream::getEOL()
250
 
{
251
 
        enter_();
252
 
        return_(iFile->getEOL());
253
 
}
254
 
 
255
 
void CSFileOutputStream::flush()
256
 
{
257
 
        enter_();
258
 
        iFile->flush();
259
 
        exit_();
260
 
}
261
 
 
262
 
void CSFileOutputStream::write(char b)
263
 
{
264
 
        enter_();
265
 
        iFile->write(&b, iWriteOffset, 1);
266
 
        iWriteOffset += 1;
267
 
        exit_();
268
 
}
269
 
 
270
 
void CSFileOutputStream::reset() {iWriteOffset = 0;}
271
 
 
272
 
void CSFileOutputStream::close()
273
 
{
274
 
        enter_();
275
 
        iFile->close();
276
 
        exit_();
277
 
}
278
 
 
279
 
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f)
280
 
{
281
 
        CSFileOutputStream *s;
282
 
 
283
 
        if (!(s = new CSFileOutputStream())) {
284
 
                f->release();
285
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
286
 
        }
287
 
        s->iFile = f;
288
 
        return  s;
289
 
}
290
 
 
291
 
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f, off64_t offset)
292
 
{
293
 
        CSFileOutputStream *s;
294
 
 
295
 
        if (!(s = new CSFileOutputStream())) {
296
 
                f->release();
297
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
298
 
        }
299
 
        s->iFile = f;
300
 
        s->iWriteOffset = offset;
301
 
        return  s;
302
 
}
303
 
 
304
 
/*
305
 
 * ---------------------------------------------------------------
306
 
 * SOCKET INPUT STREAMS
307
 
 */
308
 
 
309
 
/*
 * Destructor: calls release() on the attached socket, if there is one.
 */
CSSocketInputStream::~CSSocketInputStream()
{
	if (iSocket != NULL)
		iSocket->release();
}
314
 
 
315
 
void CSSocketInputStream::close()
316
 
{
317
 
        enter_();
318
 
        iSocket->close();
319
 
        exit_();
320
 
}
321
 
 
322
 
size_t CSSocketInputStream::read(char *b, size_t len)
323
 
{
324
 
        enter_();
325
 
        return_(iSocket->read(b, len));
326
 
}
327
 
 
328
 
int CSSocketInputStream::read()
329
 
{
330
 
        enter_();
331
 
        return_(iSocket->read());
332
 
}
333
 
 
334
 
int CSSocketInputStream::peek()
335
 
{
336
 
        enter_();
337
 
        return_(iSocket->peek());
338
 
}
339
 
 
340
 
void CSSocketInputStream::reset()
341
 
{
342
 
        enter_();
343
 
        CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketInputStream::reset() not supported");
344
 
        exit_();
345
 
}
346
 
 
347
 
 
348
 
CSSocketInputStream *CSSocketInputStream::newStream(CSSocket *s)
349
 
{
350
 
        CSSocketInputStream *str;
351
 
 
352
 
        if (!(str = new CSSocketInputStream())) {
353
 
                s->release();
354
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
355
 
        }
356
 
        str->iSocket = s;
357
 
        return  str;
358
 
}
359
 
 
360
 
/*
361
 
 * ---------------------------------------------------------------
362
 
 * SOCKET OUTPUT STREAMS
363
 
 */
364
 
 
365
 
/*
 * Destructor: calls release() on the attached socket, if there is one.
 */
CSSocketOutputStream::~CSSocketOutputStream()
{
	if (iSocket != NULL)
		iSocket->release();
}
370
 
 
371
 
void CSSocketOutputStream::close()
372
 
{
373
 
        enter_();
374
 
        iSocket->close();
375
 
        exit_();
376
 
}
377
 
 
378
 
void CSSocketOutputStream::write(const char *b, size_t len)
379
 
{
380
 
        enter_();
381
 
        iSocket->write(b, len);
382
 
        exit_();
383
 
}
384
 
 
385
 
void CSSocketOutputStream::flush()
386
 
{
387
 
        enter_();
388
 
        iSocket->flush();
389
 
        exit_();
390
 
}
391
 
 
392
 
void CSSocketOutputStream::write(char b)
393
 
{
394
 
        enter_();
395
 
        iSocket->write(b);
396
 
        exit_();
397
 
}
398
 
 
399
 
void CSSocketOutputStream::reset()
400
 
{
401
 
        enter_();
402
 
        CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketOutputStream::reset() not supported");
403
 
        exit_();
404
 
}
405
 
 
406
 
CSSocketOutputStream *CSSocketOutputStream::newStream(CSSocket *s)
407
 
{
408
 
        CSSocketOutputStream *str;
409
 
 
410
 
        if (!(str = new CSSocketOutputStream())) {
411
 
                s->release();
412
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
413
 
        }
414
 
        str->iSocket = s;
415
 
        return  str;
416
 
}
417
 
 
418
 
/*
419
 
 * ---------------------------------------------------------------
420
 
 * BUFFERED INPUT STREAMS
421
 
 */
422
 
 
423
 
/*
 * Destructor: calls release() on the wrapped stream, if there is one.
 */
CSBufferedInputStream::~CSBufferedInputStream()
{
	if (iStream != NULL)
		iStream->release();
}
428
 
 
429
 
void CSBufferedInputStream::close()
430
 
{
431
 
        enter_();
432
 
        iStream->close();
433
 
        exit_();
434
 
}
435
 
 
436
 
size_t CSBufferedInputStream::read(char *b, size_t len)
437
 
{
438
 
        size_t tfer;
439
 
 
440
 
        enter_();
441
 
        if (iBuffPos < iBuffTotal) {
442
 
                tfer = iBuffTotal - iBuffPos;
443
 
                if (tfer > len)
444
 
                        tfer = len;
445
 
                memcpy(b, iBuffer + iBuffPos, tfer);
446
 
                iBuffPos += tfer;
447
 
        }
448
 
        else
449
 
                tfer = iStream->read(b, len);
450
 
        return_(tfer);
451
 
}
452
 
 
453
 
int CSBufferedInputStream::read()
454
 
{
455
 
        int ch;
456
 
        
457
 
        enter_();
458
 
        if (iBuffPos == iBuffTotal) {
459
 
                iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
460
 
                iBuffPos = 0;
461
 
        }
462
 
        if (iBuffPos < iBuffTotal) {
463
 
                ch = iBuffer[iBuffPos];
464
 
                iBuffPos++;
465
 
        }
466
 
        else
467
 
                ch = -1;
468
 
        return_(ch);
469
 
}
470
 
 
471
 
int CSBufferedInputStream::peek()
472
 
{
473
 
        int ch;
474
 
        
475
 
        enter_();
476
 
        if (iBuffPos == iBuffTotal) {
477
 
                iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
478
 
                iBuffPos = 0;
479
 
        }
480
 
        if (iBuffPos < iBuffTotal)
481
 
                ch = iBuffer[iBuffPos];
482
 
        else
483
 
                ch = -1;
484
 
        return_(ch);
485
 
}
486
 
 
487
 
void CSBufferedInputStream::reset()
488
 
{
489
 
        iBuffPos = iBuffTotal =0;
490
 
        iStream->reset();
491
 
}
492
 
 
493
 
CSBufferedInputStream *CSBufferedInputStream::newStream(CSInputStream* i)
494
 
{
495
 
        CSBufferedInputStream *s;
496
 
 
497
 
        if (!(s = new CSBufferedInputStream())) {
498
 
                i->release();
499
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
500
 
        }
501
 
        s->iStream = i;
502
 
        return  s;
503
 
}
504
 
 
505
 
/*
506
 
 * ---------------------------------------------------------------
507
 
 * BUFFERED OUTPUT STREAMS
508
 
 */
509
 
 
510
 
/*
 * Destructor: calls release() on the wrapped stream, if there is one.
 * Buffered data is not written out here.
 */
CSBufferedOutputStream::~CSBufferedOutputStream()
{
	if (iStream != NULL)
		iStream->release();
}
515
 
 
516
 
void CSBufferedOutputStream::close()
517
 
{
518
 
        enter_();
519
 
        iStream->close();
520
 
        exit_();
521
 
}
522
 
 
523
 
void CSBufferedOutputStream::write(const char *b, size_t len)
524
 
{
525
 
        size_t tfer;
526
 
 
527
 
        enter_();
528
 
        if (iBuffTotal < CS_STREAM_BUFFER_SIZE) {
529
 
                tfer = CS_STREAM_BUFFER_SIZE - iBuffTotal;
530
 
                
531
 
                if (tfer > len)
532
 
                        tfer = len;
533
 
                memcpy(iBuffer + iBuffTotal, b, tfer);
534
 
                iBuffTotal += tfer;
535
 
                b += tfer;
536
 
                len -= tfer;
537
 
        }
538
 
        if (len > 0) {
539
 
                flush();
540
 
                if (len > CS_STREAM_BUFFER_SIZE)
541
 
                        iStream->write(b, len);
542
 
                else {
543
 
                        memcpy(iBuffer, b, len);
544
 
                        iBuffTotal = len;
545
 
                }
546
 
        }
547
 
        exit_();
548
 
}
549
 
 
550
 
void CSBufferedOutputStream::flush()
551
 
{
552
 
        enter_();
553
 
        if (iBuffTotal > 0) {
554
 
                iStream->write((char *) iBuffer, iBuffTotal);
555
 
                iBuffTotal = 0;
556
 
        }
557
 
        exit_();
558
 
}
559
 
 
560
 
void CSBufferedOutputStream::write(char b)
561
 
{
562
 
        enter_();
563
 
        if (iBuffTotal == CS_STREAM_BUFFER_SIZE)
564
 
                flush();
565
 
        iBuffer[iBuffTotal] = b;
566
 
        iBuffTotal++;
567
 
        exit_();
568
 
}
569
 
 
570
 
void CSBufferedOutputStream::reset()
571
 
{
572
 
        iBuffTotal = 0;
573
 
        iStream->reset();
574
 
}
575
 
 
576
 
CSBufferedOutputStream *CSBufferedOutputStream::newStream(CSOutputStream* i)
577
 
{
578
 
        CSBufferedOutputStream *s;
579
 
 
580
 
        if (!(s = new CSBufferedOutputStream())) {
581
 
                i->release();
582
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
583
 
        }
584
 
        s->iStream = i;
585
 
        return s;
586
 
}
587
 
 
588
 
/*
589
 
 * ---------------------------------------------------------------
590
 
 * MEMORY INPUT STREAMS
591
 
 */
592
 
CSMemoryInputStream *CSMemoryInputStream::newStream(const u_char* buffer, uint32_t length)
593
 
{
594
 
        CSMemoryInputStream *s;
595
 
 
596
 
        if (!(s = new CSMemoryInputStream())) {
597
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
598
 
        }
599
 
        s->iMemory = buffer;
600
 
        s->iMemTotal = length;
601
 
        return s;
602
 
}
603
 
 
604
 
 
605
 
CSMemoryOutputStream *CSMemoryOutputStream::newStream(size_t init_length, size_t min_alloc)
606
 
{
607
 
        CSMemoryOutputStream *s;
608
 
 
609
 
        if (!(s = new CSMemoryOutputStream())) {
610
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
611
 
        }
612
 
        
613
 
        s->iMemory = (u_char *) cs_malloc(init_length);
614
 
        s->iMemTotal = init_length;
615
 
        s->iMemSpace = init_length;
616
 
        s->iMemPos = s->iMemory;
617
 
        s->iMemMin = min_alloc;
618
 
        return s;
619
 
}
620
 
 
621
 
/*
 * Destructor: frees the memory buffer, if one was allocated.
 */
CSMemoryOutputStream::~CSMemoryOutputStream()
{
	if (iMemory != NULL)
		cs_free(iMemory);
}
626
 
 
627
 
void CSMemoryOutputStream::write(const char *b, size_t len)
628
 
{
629
 
        if (iMemSpace < len) {
630
 
                size_t new_size = iMemTotal + ((len < iMemMin)? iMemMin:len);
631
 
                
632
 
                cs_realloc((void**) &iMemory, new_size);
633
 
                iMemPos = iMemory + (iMemTotal - iMemSpace);
634
 
                iMemSpace += (new_size - iMemTotal);
635
 
                iMemTotal = new_size;           
636
 
        }
637
 
        memcpy(iMemPos, b, len);
638
 
        iMemPos +=len;
639
 
        iMemSpace -= len;
640
 
}
641
 
 
642
 
void CSMemoryOutputStream::write(const char b)
643
 
{
644
 
        if (!iMemSpace) {
645
 
                cs_realloc((void**) &iMemory, iMemTotal + iMemMin);
646
 
                iMemPos = iMemory + iMemTotal;
647
 
                iMemSpace += iMemMin;
648
 
                iMemTotal += iMemMin;           
649
 
        }
650
 
        *iMemPos = b;
651
 
        iMemPos++;
652
 
        iMemSpace--;
653
 
}
654
 
 
655
 
void CSMemoryOutputStream::reset()
656
 
{
657
 
        iMemPos = iMemory;
658
 
        iMemSpace = iMemTotal;
659
 
}
660
 
 
661
 
/*
662
 
 * ---------------------------------------------------------------
663
 
 * STATIC (user) MEMORY OUTPUT STREAM
664
 
 */
665
 
void CSStaticMemoryOutputStream::write(const char *b, size_t len)
666
 
{
667
 
        if (iMemSpace < len) {
668
 
                enter_();
669
 
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
670
 
                exit_();
671
 
        }
672
 
        memcpy(iMemPos, b, len);
673
 
        iMemPos +=len;
674
 
        iMemSpace -= len;
675
 
}
676
 
 
677
 
void CSStaticMemoryOutputStream::write(const char b)
678
 
{
679
 
        if (!iMemSpace) {
680
 
                enter_();
681
 
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
682
 
                exit_();
683
 
        }
684
 
        *iMemPos = b;
685
 
        iMemPos++;
686
 
        iMemSpace--;
687
 
}
688
 
 
689
 
/*
690
 
 * ---------------------------------------------------------------
691
 
 * CALLBACK INPUT STREAM
692
 
 */
693
 
 
694
 
CSCallbackInputStream *CSCallbackInputStream::newStream(CSStreamReadCallbackFunc callback, void *user_data)
695
 
{
696
 
        CSCallbackInputStream *s;
697
 
 
698
 
        if (!(s = new CSCallbackInputStream())) {
699
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
700
 
        }
701
 
        
702
 
        s->callback = callback;
703
 
        s->cb_data = user_data;
704
 
        return s;
705
 
}
706