~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSStream.h

  • Committer: Monty Taylor
  • Date: 2010-12-24 02:13:05 UTC
  • mto: This revision was merged to the branch mainline in revision 2038.
  • Revision ID: mordred@inaugust.com-20101224021305-e3slv1cyjczqorij
Changed the bzrignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
18
 *
 
19
 * Original Author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-06-07
 
23
 *
 
24
 * CORE SYSTEM:
 
25
 * Basic input and output streams.
 
26
 *
 
27
 * These objects wrap the system streams, and simplify things.
 
28
 * I also want to standardize exceptions and implement
 
29
 * socket based streams.
 
30
 *
 
31
 */
 
32
 
 
33
#ifndef __CSSTREAM_H__
 
34
#define __CSSTREAM_H__
 
35
 
 
36
#define DEFAULT_BUFFER_SIZE             64000
 
37
 
 
38
#include "CSDefs.h"
 
39
#include "CSFile.h"
 
40
#include "CSSocket.h"
 
41
 
 
42
class CSInputStream : public CSRefObject {
 
43
public:
 
44
        CSInputStream() { }
 
45
        virtual ~CSInputStream() { }
 
46
 
 
47
        /*
 
48
         * Closes this input stream and releases any system
 
49
         * resources associated  with the stream.
 
50
         */
 
51
        virtual void close() = 0;
 
52
 
 
53
        /*
 
54
         * Reads up to len bytes of data from the input stream into an
 
55
         * array of bytes. Return the number of bytes read.
 
56
         *
 
57
         * Zero will be returned in the case of EOF.
 
58
         *
 
59
         * This call blocks until at least one byte is
 
60
         * returned.
 
61
         */
 
62
        virtual size_t read(char *b, size_t len) = 0;
 
63
 
 
64
        /* Reads the next byte of data from the input stream.
 
65
         * Returns -1 if EOF.
 
66
         */
 
67
        virtual int read() = 0;
 
68
 
 
69
        /* Read one character ahead. */
 
70
        virtual int peek() = 0;
 
71
 
 
72
        /*
 
73
         *  Reset this output stream to the start inorder to restart the write.
 
74
         */
 
75
        virtual void reset() = 0; 
 
76
 
 
77
        /* Return the name of the file, or whatever: */
 
78
        virtual const char *identify() = 0;
 
79
 
 
80
        /*
 
81
         * Read a line from the input stream. This function
 
82
         * handles all types of line endings. The function
 
83
         * return NULL on EOF.
 
84
         */
 
85
        CSStringBuffer *readLine();
 
86
};
 
87
 
 
88
class CSOutputStream : public CSRefObject {
 
89
public:
 
90
        /*
 
91
         * Closes this input stream and releases any system
 
92
         * resources associated  with the stream.
 
93
         */
 
94
        virtual void close() = 0;
 
95
 
 
96
        /*
 
97
         * Writes len bytes from the specified byte array starting at
 
98
         * offset off to this output stream.
 
99
         */
 
100
        virtual void write(const char *b, size_t len) = 0;
 
101
 
 
102
        /*
 
103
         * Returns the default EOL indicator.
 
104
         * Will be \n, \r or \r\n.
 
105
         */
 
106
        virtual const char *getEOL() = 0;
 
107
 
 
108
        /*
 
109
         *  Flushes this output stream and forces any buffered output
 
110
         * bytes to be written out.
 
111
         */
 
112
        virtual void flush() = 0;
 
113
 
 
114
        /*
 
115
         * Writes the specified byte to this output stream.
 
116
         */
 
117
        virtual void write(char b) = 0; 
 
118
 
 
119
        /*
 
120
         * Reset this output stream to the start inorder to restart the write.
 
121
         */
 
122
        virtual void reset() = 0; 
 
123
 
 
124
        virtual const char *identify() = 0;
 
125
 
 
126
        /*
 
127
         * Write a line. Terminator is specific to the
 
128
         * type of output stream and may depend on the
 
129
         * current platform.
 
130
         */
 
131
        void printLine(const char *cstr);
 
132
 
 
133
        /*
 
134
         * Various write types:
 
135
         */
 
136
        virtual void print(const char *cstr);
 
137
        virtual void print(CSString *s);
 
138
        virtual void print(int value);
 
139
        virtual void print(uint64_t value);
 
140
};
 
141
 
 
142
class CSStream : public CSObject {
 
143
public:
 
144
        static void pipe(CSOutputStream *out, CSInputStream *in);
 
145
};
 
146
 
 
147
/* File Stream: */
 
148
 
 
149
class CSFileInputStream : public CSInputStream {
 
150
public:
 
151
        CSFileInputStream(): iFile(NULL), iReadOffset(0) { }
 
152
        virtual ~CSFileInputStream();
 
153
 
 
154
        virtual void close();
 
155
 
 
156
        virtual size_t read(char *b, size_t len);
 
157
 
 
158
        virtual int read();
 
159
 
 
160
        virtual int peek();
 
161
 
 
162
        /*
 
163
         *  Reset this output stream to the start inorder to restart the read.
 
164
         */
 
165
        virtual void reset(); 
 
166
 
 
167
        virtual const char *identify();
 
168
 
 
169
        static CSFileInputStream *newStream(CSFile* f);
 
170
        static CSFileInputStream *newStream(CSFile* f, off64_t offset);
 
171
 
 
172
private:
 
173
        CSFile  *iFile;
 
174
        off64_t iReadOffset;
 
175
};
 
176
 
 
177
class CSFileOutputStream : public CSOutputStream {
 
178
public:
 
179
        CSFileOutputStream(): iFile(NULL), iWriteOffset(0) { }
 
180
        virtual ~CSFileOutputStream();
 
181
 
 
182
        virtual void close();
 
183
 
 
184
        virtual void write(const char *b, size_t len);
 
185
 
 
186
        virtual const char *getEOL();
 
187
 
 
188
        virtual void flush();
 
189
 
 
190
        virtual void write(char b);
 
191
 
 
192
        virtual void reset(); 
 
193
 
 
194
        virtual const char *identify();
 
195
 
 
196
        static CSFileOutputStream *newStream(CSFile* f);
 
197
        static CSFileOutputStream *newStream(CSFile* f, off64_t offset);
 
198
 
 
199
private:
 
200
        CSFile  *iFile;
 
201
        off64_t iWriteOffset;
 
202
};
 
203
 
 
204
/* Socket Stream */
 
205
 
 
206
class CSSocketInputStream : public CSInputStream {
 
207
public:
 
208
        CSSocketInputStream(): iSocket(NULL) { }
 
209
        virtual ~CSSocketInputStream();
 
210
 
 
211
        virtual void close();
 
212
 
 
213
        virtual size_t read(char *b, size_t len);
 
214
 
 
215
        virtual int read();
 
216
 
 
217
        virtual int peek();
 
218
 
 
219
        virtual void reset(); 
 
220
 
 
221
        virtual const char *identify();
 
222
 
 
223
        static CSSocketInputStream *newStream(CSSocket *s);
 
224
 
 
225
private:
 
226
        CSSocket* iSocket;
 
227
};
 
228
 
 
229
class CSSocketOutputStream : public CSOutputStream {
 
230
public:
 
231
        CSSocketOutputStream(): iSocket(NULL) { }
 
232
        virtual ~CSSocketOutputStream();
 
233
 
 
234
        virtual void close();
 
235
 
 
236
        virtual void write(const char *b, size_t len);
 
237
 
 
238
        virtual const char *getEOL() { return "\n"; };
 
239
 
 
240
        virtual void flush();
 
241
 
 
242
        virtual void write(char b);
 
243
 
 
244
        virtual void reset(); 
 
245
 
 
246
        virtual const char *identify();
 
247
 
 
248
        static CSSocketOutputStream *newStream(CSSocket *s);
 
249
 
 
250
private:
 
251
        CSSocket* iSocket;
 
252
};
 
253
 
 
254
/* Buffered Stream: */
 
255
#ifdef DEBUG_disabled
 
256
#define CS_STREAM_BUFFER_SIZE                   80
 
257
#else
 
258
#define CS_STREAM_BUFFER_SIZE                   (32 * 1024)
 
259
#endif
 
260
 
 
261
class CSBufferedInputStream : public CSInputStream {
 
262
public:
 
263
        CSBufferedInputStream(): iStream(NULL), iBuffTotal(0), iBuffPos(0) { }
 
264
        virtual ~CSBufferedInputStream();
 
265
 
 
266
        virtual void close();
 
267
 
 
268
        virtual size_t read(char *b, size_t len);
 
269
 
 
270
        virtual int read();
 
271
 
 
272
        virtual int peek();
 
273
 
 
274
        virtual void reset(); 
 
275
 
 
276
        virtual const char *identify();
 
277
 
 
278
        static CSBufferedInputStream *newStream(CSInputStream* i);
 
279
 
 
280
private:
 
281
        CSInputStream* iStream;
 
282
        u_char iBuffer[CS_STREAM_BUFFER_SIZE];
 
283
        uint32_t iBuffTotal;
 
284
        uint32_t iBuffPos;
 
285
};
 
286
 
 
287
class CSBufferedOutputStream : public CSOutputStream {
 
288
public:
 
289
        CSBufferedOutputStream(): iStream(NULL), iBuffTotal(0) { }
 
290
        virtual ~CSBufferedOutputStream();
 
291
 
 
292
        virtual void close();
 
293
 
 
294
        virtual void write(const char *b, size_t len);
 
295
 
 
296
        virtual const char *getEOL() { return "\n"; };
 
297
 
 
298
        virtual void flush();
 
299
 
 
300
        virtual void write(char b);
 
301
 
 
302
        virtual void reset(); 
 
303
 
 
304
        virtual const char *identify();
 
305
 
 
306
        static CSBufferedOutputStream *newStream(CSOutputStream* i);
 
307
 
 
308
private:
 
309
        CSOutputStream* iStream;
 
310
        u_char iBuffer[CS_STREAM_BUFFER_SIZE];
 
311
        uint32_t iBuffTotal;
 
312
};
 
313
 
 
314
/* memory Stream */
 
315
class CSMemoryInputStream : public CSInputStream {
 
316
public:
 
317
        CSMemoryInputStream(): iMemory(NULL), iMemTotal(0), iMemPos(0) { }
 
318
        ~CSMemoryInputStream(){}
 
319
 
 
320
        virtual void close() {}
 
321
 
 
322
        virtual size_t read(char *b, size_t len)
 
323
        {
 
324
                if (len > (iMemTotal - iMemPos))
 
325
                        len = iMemTotal - iMemPos;
 
326
                
 
327
                memcpy(b, iMemory + iMemPos, len);
 
328
                iMemPos += len; 
 
329
                return len;
 
330
        }
 
331
 
 
332
        virtual int read()
 
333
        {
 
334
                int b = -1;
 
335
                if (iMemPos < iMemTotal) 
 
336
                        b = iMemory[iMemPos++];
 
337
                return b;
 
338
        }
 
339
 
 
340
        virtual int peek()
 
341
        {
 
342
                int b = -1;
 
343
                if (iMemPos < iMemTotal) 
 
344
                        b = iMemory[iMemPos];
 
345
                return b;
 
346
        }
 
347
 
 
348
        virtual void reset() {iMemPos = 0;}
 
349
        
 
350
        virtual const char *identify() 
 
351
        {
 
352
                return "memory stream";
 
353
        }
 
354
 
 
355
        static CSMemoryInputStream *newStream(const u_char* buffer, uint32_t length);
 
356
 
 
357
private:
 
358
        const u_char *iMemory;
 
359
        uint32_t iMemTotal;
 
360
        uint32_t iMemPos;
 
361
};
 
362
 
 
363
 
 
364
class CSMemoryOutputStream : public CSOutputStream {
 
365
public:
 
366
        CSMemoryOutputStream(): iMemory(NULL), iMemTotal(0), iMemSpace(0), iMemMin(0), iMemPos(NULL){ }
 
367
        virtual ~CSMemoryOutputStream();
 
368
 
 
369
        virtual void close() {}
 
370
 
 
371
        virtual void write(const char *b, size_t len);
 
372
        virtual const char *getEOL() { return "\n"; };
 
373
 
 
374
        virtual void flush() {}
 
375
 
 
376
        virtual void write(char b);
 
377
 
 
378
        const u_char *getMemory(size_t *len)
 
379
        {
 
380
                *len = iMemPos - iMemory;
 
381
                return iMemory;
 
382
        }
 
383
        
 
384
        virtual void reset();
 
385
        
 
386
        virtual const char *identify();
 
387
        
 
388
        static CSMemoryOutputStream *newStream(size_t init_length, size_t min_alloc);
 
389
 
 
390
private:
 
391
        u_char *iMemory;
 
392
        uint32_t iMemTotal;
 
393
        uint32_t iMemSpace;
 
394
        uint32_t iMemMin;
 
395
        u_char *iMemPos;
 
396
};
 
397
 
 
398
class CSStaticMemoryOutputStream : public CSOutputStream {
 
399
public:
 
400
        CSStaticMemoryOutputStream(u_char *mem, off64_t size): iMemory(mem), iMemSpace(size), iMemSize(size), iMemPos(mem){ }
 
401
        virtual ~CSStaticMemoryOutputStream() {}
 
402
 
 
403
        virtual void close() {}
 
404
 
 
405
        virtual void write(const char *b, size_t len);
 
406
        virtual const char *getEOL() { return "\n"; };
 
407
 
 
408
        virtual void flush() {}
 
409
 
 
410
        virtual void write(char b);
 
411
        
 
412
        virtual void reset() 
 
413
        {
 
414
                iMemPos = iMemory;
 
415
                iMemSpace = iMemSize;
 
416
        }
 
417
        
 
418
        virtual const char *identify() 
 
419
        {
 
420
                return "memory stream";
 
421
        }
 
422
        
 
423
        off64_t getSize() { return iMemPos - iMemory; }
 
424
 
 
425
private:
 
426
        u_char *iMemory;
 
427
        off64_t iMemSpace;
 
428
        off64_t iMemSize;
 
429
        u_char *iMemPos;
 
430
};
 
431
 
 
432
typedef size_t (* CSStreamReadCallbackFunc) (void *caller_data, char *buffer, size_t buffer_size, u_char reset);
 
433
 
 
434
class CSCallbackInputStream : public CSInputStream {
 
435
public:
 
436
        CSCallbackInputStream(): callback(NULL), cb_data(NULL), havePeek(false), doReset(false) { }
 
437
        ~CSCallbackInputStream(){}
 
438
 
 
439
        virtual void close() {}
 
440
 
 
441
        virtual size_t read(char *b, size_t len)
 
442
        {
 
443
                size_t size = 0;
 
444
                
 
445
                if (havePeek) {
 
446
                        havePeek = false;
 
447
                        *b =  peek_char;
 
448
                        b++; len--;
 
449
                        if (len) {
 
450
                                size = callback(cb_data, b, len, doReset);
 
451
                        }
 
452
                                
 
453
                        size++;                 
 
454
                } else
 
455
                        size = callback(cb_data, b, len, doReset);
 
456
                        
 
457
                if (doReset)
 
458
                        doReset = false;
 
459
                        
 
460
                return size;
 
461
        }
 
462
 
 
463
        virtual int read()
 
464
        {
 
465
                char c;
 
466
                
 
467
                if (havePeek) {
 
468
                        havePeek = false;
 
469
                        return peek_char;
 
470
                }
 
471
                if (!callback(cb_data, &c, 1, doReset))
 
472
                        c = -1;
 
473
                        
 
474
                if (doReset)
 
475
                        doReset = false;
 
476
                
 
477
                return c;
 
478
        }
 
479
 
 
480
        virtual int peek()
 
481
        {
 
482
                if (!havePeek) {
 
483
                        if (callback(cb_data, &peek_char, 1, doReset))
 
484
                                havePeek = true;
 
485
                        else
 
486
                                return -1;
 
487
                }
 
488
                return peek_char;
 
489
        }
 
490
 
 
491
        virtual void reset() 
 
492
        {
 
493
                havePeek = false;
 
494
                doReset = false;
 
495
        }
 
496
 
 
497
        virtual const char *identify() 
 
498
        {
 
499
                return "callback stream";
 
500
        }
 
501
 
 
502
        static CSCallbackInputStream *newStream(CSStreamReadCallbackFunc callback, void *user_data);
 
503
 
 
504
private:
 
505
        CSStreamReadCallbackFunc callback;
 
506
        void *cb_data;
 
507
        char peek_char;
 
508
        bool havePeek;
 
509
        bool doReset;
 
510
};
 
511
 
 
512
#endif
 
513
 
 
514