~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSStream.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 07:00:09 UTC
  • mto: This revision was merged to the branch mainline in revision 2476.
  • Revision ID: me@mark.atwood.name-20111220070009-dve2mlryq200razw
remove references to pbms in the gettext po files

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh (H&G2JCtL)
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-06-07
23
 
 *
24
 
 * CORE SYSTEM:
25
 
 * Basic input and output streams.
26
 
 *
27
 
 * These objects wrap the system streams, and simplify things.
28
 
 * I also want to standardize exceptions and implement
29
 
 * socket based streams.
30
 
 *
31
 
 */
32
 
 
33
 
#include "CSConfig.h"
34
 
 
35
 
#include <string.h>
36
 
#include <inttypes.h>
37
 
 
38
 
#include "CSMemory.h"
39
 
#include "CSStream.h"
40
 
#include "CSGlobal.h"
41
 
 
42
 
/*
43
 
 * ---------------------------------------------------------------
44
 
 * STREAM UTILITIES
45
 
 */
46
 
 
47
 
void CSStream::pipe(CSOutputStream *out, CSInputStream *in)
48
 
{
49
 
        char    *buffer;
50
 
        size_t  size;
51
 
 
52
 
        enter_();
53
 
        push_(out);
54
 
        push_(in);
55
 
        
56
 
        buffer = (char *) cs_malloc(DEFAULT_BUFFER_SIZE);
57
 
        push_ptr_(buffer);
58
 
        
59
 
        for (;;) {
60
 
                size = in->read(buffer, DEFAULT_BUFFER_SIZE);
61
 
                self->interrupted();
62
 
                if (!size)
63
 
                        break;
64
 
                out->write(buffer, size);
65
 
                self->interrupted();
66
 
        }
67
 
        in->close();
68
 
        out->close();
69
 
        
70
 
        release_(buffer);
71
 
        release_(in);
72
 
        release_(out);
73
 
        exit_();
74
 
}
75
 
 
76
 
/*
77
 
 * ---------------------------------------------------------------
78
 
 * INPUT STREAMS
79
 
 */
80
 
 
81
 
CSStringBuffer *CSInputStream::readLine()
82
 
{
83
 
        int                             ch;
84
 
        CSStringBuffer  *sb = NULL;
85
 
 
86
 
        enter_();
87
 
        
88
 
        ch = read();
89
 
        if (ch != -1) {
90
 
                new_(sb, CSStringBuffer(20));
91
 
                push_(sb);
92
 
                
93
 
                while (ch != '\n' && ch != '\r' && ch != -1) {
94
 
                        sb->append((char) ch);
95
 
                        ch = read();
96
 
                }
97
 
                if (ch == '\r') {
98
 
                        if (peek() == '\n')
99
 
                                ch = read();
100
 
                }
101
 
 
102
 
                pop_(sb);
103
 
        }
104
 
 
105
 
        return_(sb);
106
 
}
107
 
 
108
 
/*
109
 
 * ---------------------------------------------------------------
110
 
 * OUTPUT STREAMS
111
 
 */
112
 
 
113
 
void CSOutputStream::printLine(const char *cstr)
114
 
{
115
 
        enter_();
116
 
        print(cstr);
117
 
        print(getEOL());
118
 
        flush();
119
 
        exit_();
120
 
}
121
 
 
122
 
void CSOutputStream::print(const char *cstr)
123
 
{
124
 
        enter_();
125
 
        write(cstr, strlen(cstr));
126
 
        exit_();
127
 
}
128
 
 
129
 
void CSOutputStream::print(CSString *s)
130
 
{
131
 
        enter_();
132
 
        print(s->getCString());
133
 
        exit_();
134
 
}
135
 
 
136
 
void CSOutputStream::print(int value)
137
 
{
138
 
        char buffer[20];
139
 
 
140
 
        snprintf(buffer, 20, "%d", value);
141
 
        print(buffer);
142
 
}
143
 
 
144
 
void CSOutputStream::print(uint64_t value)
145
 
{
146
 
        char buffer[30];
147
 
 
148
 
        snprintf(buffer, 30, "%"PRIu64"", value);
149
 
        print(buffer);
150
 
}
151
 
 
152
 
/*
153
 
 * ---------------------------------------------------------------
154
 
 * FILE INPUT STREAMS
155
 
 */
156
 
 
157
 
/*
 * Release the file held in iFile, if one is set.
 */
CSFileInputStream::~CSFileInputStream()
{
        if (iFile != NULL)
                iFile->release();
}
162
 
 
163
 
size_t CSFileInputStream::read(char *b, size_t len)
164
 
{
165
 
        size_t size;
166
 
 
167
 
        enter_();
168
 
        size = iFile->read(b, iReadOffset, len, 0);
169
 
        iReadOffset += size;
170
 
        return_(size);
171
 
}
172
 
 
173
 
int CSFileInputStream::read()
174
 
{
175
 
        size_t  size;
176
 
        char    ch;
177
 
 
178
 
        enter_();
179
 
        size = iFile->read(&ch, iReadOffset, 1, 0);
180
 
        iReadOffset += size;
181
 
        return_(size == 0 ? -1 : (int) ch);
182
 
}
183
 
 
184
 
void CSFileInputStream::reset()
185
 
{
186
 
        iReadOffset = 0;
187
 
}
188
 
 
189
 
const char *CSFileInputStream::identify()
190
 
{
191
 
        return iFile->myFilePath->getCString();
192
 
}
193
 
 
194
 
int CSFileInputStream::peek()
195
 
{
196
 
        size_t  size;
197
 
        char    ch;
198
 
 
199
 
        enter_();
200
 
        size = iFile->read(&ch, iReadOffset, 1, 0);
201
 
        return_(size == 0 ? -1 : (int) ch);
202
 
}
203
 
 
204
 
void CSFileInputStream::close()
205
 
{
206
 
        enter_();
207
 
        iFile->close();
208
 
        exit_();
209
 
}
210
 
 
211
 
CSFileInputStream *CSFileInputStream::newStream(CSFile *f)
212
 
{
213
 
        CSFileInputStream *s;
214
 
 
215
 
        if (!(s = new CSFileInputStream())) {
216
 
                f->release();
217
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
218
 
        }
219
 
        s->iFile = f;
220
 
        return s;
221
 
}
222
 
 
223
 
CSFileInputStream *CSFileInputStream::newStream(CSFile *f, off64_t offset)
224
 
{
225
 
        CSFileInputStream *s;
226
 
 
227
 
        if (!(s = new CSFileInputStream())) {
228
 
                f->release();
229
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
230
 
        }
231
 
        s->iFile = f;
232
 
        s->iReadOffset = offset;
233
 
        return s;
234
 
}
235
 
 
236
 
/*
237
 
 * ---------------------------------------------------------------
238
 
 * FILE OUTPUT STREAMS
239
 
 */
240
 
 
241
 
/*
 * Release the file held in iFile, if one is set.
 */
CSFileOutputStream::~CSFileOutputStream()
{
        if (iFile != NULL)
                iFile->release();
}
246
 
 
247
 
void CSFileOutputStream::write(const char *b, size_t len)
248
 
{
249
 
        enter_();
250
 
        iFile->write(b, iWriteOffset, len);
251
 
        iWriteOffset += len;
252
 
        exit_();
253
 
}
254
 
 
255
 
const char *CSFileOutputStream::getEOL()
256
 
{
257
 
        enter_();
258
 
        return_(iFile->getEOL());
259
 
}
260
 
 
261
 
void CSFileOutputStream::flush()
262
 
{
263
 
        enter_();
264
 
        iFile->flush();
265
 
        exit_();
266
 
}
267
 
 
268
 
void CSFileOutputStream::write(char b)
269
 
{
270
 
        enter_();
271
 
        iFile->write(&b, iWriteOffset, 1);
272
 
        iWriteOffset += 1;
273
 
        exit_();
274
 
}
275
 
 
276
 
void CSFileOutputStream::reset()
277
 
{
278
 
        iWriteOffset = 0;
279
 
}
280
 
 
281
 
const char *CSFileOutputStream::identify()
282
 
{
283
 
        return iFile->myFilePath->getCString();
284
 
}
285
 
 
286
 
void CSFileOutputStream::close()
287
 
{
288
 
        enter_();
289
 
        iFile->close();
290
 
        exit_();
291
 
}
292
 
 
293
 
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f)
294
 
{
295
 
        CSFileOutputStream *s;
296
 
 
297
 
        if (!(s = new CSFileOutputStream())) {
298
 
                f->release();
299
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
300
 
        }
301
 
        s->iFile = f;
302
 
        return  s;
303
 
}
304
 
 
305
 
CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f, off64_t offset)
306
 
{
307
 
        CSFileOutputStream *s;
308
 
 
309
 
        if (!(s = new CSFileOutputStream())) {
310
 
                f->release();
311
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
312
 
        }
313
 
        s->iFile = f;
314
 
        s->iWriteOffset = offset;
315
 
        return  s;
316
 
}
317
 
 
318
 
/*
319
 
 * ---------------------------------------------------------------
320
 
 * SOCKET INPUT STREAMS
321
 
 */
322
 
 
323
 
/*
 * Release the socket held in iSocket, if one is set.
 */
CSSocketInputStream::~CSSocketInputStream()
{
        if (iSocket != NULL)
                iSocket->release();
}
328
 
 
329
 
void CSSocketInputStream::close()
330
 
{
331
 
        enter_();
332
 
        iSocket->close();
333
 
        exit_();
334
 
}
335
 
 
336
 
size_t CSSocketInputStream::read(char *b, size_t len)
337
 
{
338
 
        enter_();
339
 
        return_(iSocket->read(b, len));
340
 
}
341
 
 
342
 
int CSSocketInputStream::read()
343
 
{
344
 
        enter_();
345
 
        return_(iSocket->read());
346
 
}
347
 
 
348
 
int CSSocketInputStream::peek()
349
 
{
350
 
        enter_();
351
 
        return_(iSocket->peek());
352
 
}
353
 
 
354
 
void CSSocketInputStream::reset()
355
 
{
356
 
        enter_();
357
 
        CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketInputStream::reset() not supported");
358
 
        exit_();
359
 
}
360
 
 
361
 
const char *CSSocketInputStream::identify()
362
 
{
363
 
        return iSocket->identify();
364
 
}
365
 
 
366
 
CSSocketInputStream *CSSocketInputStream::newStream(CSSocket *s)
367
 
{
368
 
        CSSocketInputStream *str;
369
 
 
370
 
        if (!(str = new CSSocketInputStream())) {
371
 
                s->release();
372
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
373
 
        }
374
 
        str->iSocket = s;
375
 
        return  str;
376
 
}
377
 
 
378
 
/*
379
 
 * ---------------------------------------------------------------
380
 
 * SOCKET OUTPUT STREAMS
381
 
 */
382
 
 
383
 
/*
 * Release the socket held in iSocket, if one is set.
 */
CSSocketOutputStream::~CSSocketOutputStream()
{
        if (iSocket != NULL)
                iSocket->release();
}
388
 
 
389
 
void CSSocketOutputStream::close()
390
 
{
391
 
        enter_();
392
 
        iSocket->close();
393
 
        exit_();
394
 
}
395
 
 
396
 
void CSSocketOutputStream::write(const char *b, size_t len)
397
 
{
398
 
        enter_();
399
 
        iSocket->write(b, len);
400
 
        exit_();
401
 
}
402
 
 
403
 
void CSSocketOutputStream::flush()
404
 
{
405
 
        enter_();
406
 
        iSocket->flush();
407
 
        exit_();
408
 
}
409
 
 
410
 
void CSSocketOutputStream::write(char b)
411
 
{
412
 
        enter_();
413
 
        iSocket->write(b);
414
 
        exit_();
415
 
}
416
 
 
417
 
void CSSocketOutputStream::reset()
418
 
{
419
 
        enter_();
420
 
        CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketOutputStream::reset() not supported");
421
 
        exit_();
422
 
}
423
 
 
424
 
const char *CSSocketOutputStream::identify()
425
 
{
426
 
        return iSocket->identify();
427
 
}
428
 
 
429
 
CSSocketOutputStream *CSSocketOutputStream::newStream(CSSocket *s)
430
 
{
431
 
        CSSocketOutputStream *str;
432
 
 
433
 
        if (!(str = new CSSocketOutputStream())) {
434
 
                s->release();
435
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
436
 
        }
437
 
        str->iSocket = s;
438
 
        return  str;
439
 
}
440
 
 
441
 
/*
442
 
 * ---------------------------------------------------------------
443
 
 * BUFFERED INPUT STREAMS
444
 
 */
445
 
 
446
 
/*
 * Release the wrapped stream held in iStream, if one is set.
 */
CSBufferedInputStream::~CSBufferedInputStream()
{
        if (iStream != NULL)
                iStream->release();
}
451
 
 
452
 
void CSBufferedInputStream::close()
453
 
{
454
 
        enter_();
455
 
        iStream->close();
456
 
        exit_();
457
 
}
458
 
 
459
 
size_t CSBufferedInputStream::read(char *b, size_t len)
460
 
{
461
 
        size_t tfer;
462
 
 
463
 
        enter_();
464
 
        if (iBuffPos < iBuffTotal) {
465
 
                tfer = iBuffTotal - iBuffPos;
466
 
                if (tfer > len)
467
 
                        tfer = len;
468
 
                memcpy(b, iBuffer + iBuffPos, tfer);
469
 
                iBuffPos += tfer;
470
 
        }
471
 
        else
472
 
                tfer = iStream->read(b, len);
473
 
        return_(tfer);
474
 
}
475
 
 
476
 
int CSBufferedInputStream::read()
477
 
{
478
 
        int ch;
479
 
        
480
 
        enter_();
481
 
        if (iBuffPos == iBuffTotal) {
482
 
                iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
483
 
                iBuffPos = 0;
484
 
        }
485
 
        if (iBuffPos < iBuffTotal) {
486
 
                ch = iBuffer[iBuffPos];
487
 
                iBuffPos++;
488
 
        }
489
 
        else
490
 
                ch = -1;
491
 
        return_(ch);
492
 
}
493
 
 
494
 
int CSBufferedInputStream::peek()
495
 
{
496
 
        int ch;
497
 
        
498
 
        enter_();
499
 
        if (iBuffPos == iBuffTotal) {
500
 
                iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
501
 
                iBuffPos = 0;
502
 
        }
503
 
        if (iBuffPos < iBuffTotal)
504
 
                ch = iBuffer[iBuffPos];
505
 
        else
506
 
                ch = -1;
507
 
        return_(ch);
508
 
}
509
 
 
510
 
void CSBufferedInputStream::reset()
511
 
{
512
 
        iBuffPos = iBuffTotal =0;
513
 
        iStream->reset();
514
 
}
515
 
 
516
 
const char *CSBufferedInputStream::identify()
517
 
{
518
 
        return iStream->identify();
519
 
}
520
 
 
521
 
CSBufferedInputStream *CSBufferedInputStream::newStream(CSInputStream* i)
522
 
{
523
 
        CSBufferedInputStream *s;
524
 
 
525
 
        if (!(s = new CSBufferedInputStream())) {
526
 
                i->release();
527
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
528
 
        }
529
 
        s->iStream = i;
530
 
        return  s;
531
 
}
532
 
 
533
 
/*
534
 
 * ---------------------------------------------------------------
535
 
 * BUFFERED OUTPUT STREAMS
536
 
 */
537
 
 
538
 
/*
 * Release the wrapped stream held in iStream, if one is set.
 */
CSBufferedOutputStream::~CSBufferedOutputStream()
{
        if (iStream != NULL)
                iStream->release();
}
543
 
 
544
 
void CSBufferedOutputStream::close()
545
 
{
546
 
        enter_();
547
 
        iStream->close();
548
 
        exit_();
549
 
}
550
 
 
551
 
void CSBufferedOutputStream::write(const char *b, size_t len)
552
 
{
553
 
        size_t tfer;
554
 
 
555
 
        // If the length of the data being written is greater than half
556
 
        // the buffer size then the data is written directly through
557
 
        // with out buffering.
558
 
        enter_();
559
 
        if (iBuffTotal < CS_STREAM_BUFFER_SIZE/2) {
560
 
                tfer = CS_STREAM_BUFFER_SIZE - iBuffTotal;
561
 
                
562
 
                if (tfer > len)
563
 
                        tfer = len;
564
 
                memcpy(iBuffer + iBuffTotal, b, tfer);
565
 
                iBuffTotal += tfer;
566
 
                b += tfer;
567
 
                len -= tfer;
568
 
        }
569
 
        if (len > 0) {
570
 
                flush();
571
 
                if (len > CS_STREAM_BUFFER_SIZE/2)
572
 
                        iStream->write(b, len);
573
 
                else {
574
 
                        memcpy(iBuffer, b, len);
575
 
                        iBuffTotal = len;
576
 
                }
577
 
        }
578
 
        exit_();
579
 
}
580
 
 
581
 
void CSBufferedOutputStream::flush()
582
 
{
583
 
        size_t len;
584
 
 
585
 
        enter_();
586
 
        if ((len = iBuffTotal)) {
587
 
                /* Discard the contents of the buffer
588
 
                 * if flush fails, because we do
589
 
                 * not know how much was written anyway!
590
 
                 */
591
 
                iBuffTotal = 0;
592
 
                iStream->write((char *) iBuffer, len);
593
 
        }
594
 
        exit_();
595
 
}
596
 
 
597
 
void CSBufferedOutputStream::write(char b)
598
 
{
599
 
        enter_();
600
 
        if (iBuffTotal == CS_STREAM_BUFFER_SIZE)
601
 
                flush();
602
 
        iBuffer[iBuffTotal] = b;
603
 
        iBuffTotal++;
604
 
        exit_();
605
 
}
606
 
 
607
 
void CSBufferedOutputStream::reset()
608
 
{
609
 
        iBuffTotal = 0;
610
 
        iStream->reset();
611
 
}
612
 
 
613
 
const char *CSBufferedOutputStream::identify()
614
 
{
615
 
        return iStream->identify();
616
 
}
617
 
 
618
 
CSBufferedOutputStream *CSBufferedOutputStream::newStream(CSOutputStream* i)
619
 
{
620
 
        CSBufferedOutputStream *s;
621
 
 
622
 
        if (!(s = new CSBufferedOutputStream())) {
623
 
                i->release();
624
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
625
 
        }
626
 
        s->iStream = i;
627
 
        return s;
628
 
}
629
 
 
630
 
/*
631
 
 * ---------------------------------------------------------------
632
 
 * MEMORY INPUT STREAMS
633
 
 */
634
 
CSMemoryInputStream *CSMemoryInputStream::newStream(const u_char* buffer, uint32_t length)
635
 
{
636
 
        CSMemoryInputStream *s;
637
 
 
638
 
        if (!(s = new CSMemoryInputStream())) {
639
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
640
 
        }
641
 
        s->iMemory = buffer;
642
 
        s->iMemTotal = length;
643
 
        return s;
644
 
}
645
 
 
646
 
 
647
 
CSMemoryOutputStream *CSMemoryOutputStream::newStream(size_t init_length, size_t min_alloc)
648
 
{
649
 
        CSMemoryOutputStream *s;
650
 
 
651
 
        if (!(s = new CSMemoryOutputStream())) {
652
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
653
 
        }
654
 
        
655
 
        s->iMemory = (u_char *) cs_malloc(init_length);
656
 
        s->iMemTotal = init_length;
657
 
        s->iMemSpace = init_length;
658
 
        s->iMemPos = s->iMemory;
659
 
        s->iMemMin = min_alloc;
660
 
        return s;
661
 
}
662
 
 
663
 
/*
 * Free the memory buffer held in iMemory, if one is set.
 */
CSMemoryOutputStream::~CSMemoryOutputStream()
{
        if (iMemory != NULL)
                cs_free(iMemory);
}
668
 
 
669
 
void CSMemoryOutputStream::write(const char *b, size_t len)
670
 
{
671
 
        if (iMemSpace < len) {
672
 
                size_t new_size = iMemTotal + ((len < iMemMin)? iMemMin:len);
673
 
                
674
 
                cs_realloc((void**) &iMemory, new_size);
675
 
                iMemPos = iMemory + (iMemTotal - iMemSpace);
676
 
                iMemSpace += (new_size - iMemTotal);
677
 
                iMemTotal = new_size;           
678
 
        }
679
 
        memcpy(iMemPos, b, len);
680
 
        iMemPos +=len;
681
 
        iMemSpace -= len;
682
 
}
683
 
 
684
 
void CSMemoryOutputStream::write(const char b)
685
 
{
686
 
        if (!iMemSpace) {
687
 
                cs_realloc((void**) &iMemory, iMemTotal + iMemMin);
688
 
                iMemPos = iMemory + iMemTotal;
689
 
                iMemSpace += iMemMin;
690
 
                iMemTotal += iMemMin;           
691
 
        }
692
 
        *iMemPos = b;
693
 
        iMemPos++;
694
 
        iMemSpace--;
695
 
}
696
 
 
697
 
void CSMemoryOutputStream::reset()
698
 
{
699
 
        iMemPos = iMemory;
700
 
        iMemSpace = iMemTotal;
701
 
}
702
 
 
703
 
const char *CSMemoryOutputStream::identify()
704
 
{
705
 
        return "memory stream";
706
 
}
707
 
 
708
 
/*
709
 
 * ---------------------------------------------------------------
710
 
 * STATIC (user) MEMORY OUTPUT STREAM
711
 
 */
712
 
void CSStaticMemoryOutputStream::write(const char *b, size_t len)
713
 
{
714
 
        if (iMemSpace < len) {
715
 
                enter_();
716
 
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
717
 
                exit_();
718
 
        }
719
 
        memcpy(iMemPos, b, len);
720
 
        iMemPos +=len;
721
 
        iMemSpace -= len;
722
 
}
723
 
 
724
 
void CSStaticMemoryOutputStream::write(const char b)
725
 
{
726
 
        if (!iMemSpace) {
727
 
                enter_();
728
 
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
729
 
                exit_();
730
 
        }
731
 
        *iMemPos = b;
732
 
        iMemPos++;
733
 
        iMemSpace--;
734
 
}
735
 
 
736
 
/*
737
 
 * ---------------------------------------------------------------
738
 
 * CALLBACK INPUT STREAM
739
 
 */
740
 
 
741
 
CSCallbackInputStream *CSCallbackInputStream::newStream(CSStreamReadCallbackFunc in_callback, void *user_data)
742
 
{
743
 
        CSCallbackInputStream *s;
744
 
 
745
 
        if (!(s = new CSCallbackInputStream())) {
746
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
747
 
        }
748
 
        
749
 
        s->callback = in_callback;
750
 
        s->cb_data = user_data;
751
 
        return s;
752
 
}
753