~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSStream.h

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original Author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-06-07
 
23
 *
 
24
 * CORE SYSTEM:
 
25
 * Basic input and output streams.
 
26
 *
 
27
 * These objects wrap the system streams, and simplify things.
 
28
 * I also want to standardize exceptions and implement
 
29
 * socket based streams.
 
30
 *
 
31
 */
 
32
 
 
33
#ifndef __CSSTREAM_H__
 
34
#define __CSSTREAM_H__
 
35
 
 
36
#define DEFAULT_BUFFER_SIZE             64000
 
37
 
 
38
#include "CSDefs.h"
 
39
#include "CSFile.h"
 
40
#include "CSSocket.h"
 
41
 
 
42
class CSInputStream : public CSRefObject {
 
43
public:
 
44
        CSInputStream() { }
 
45
        virtual ~CSInputStream() { }
 
46
 
 
47
        /*
 
48
         * Closes this input stream and releases any system
 
49
         * resources associated  with the stream.
 
50
         */
 
51
        virtual void close() = 0;
 
52
 
 
53
        /*
 
54
         * Reads up to len bytes of data from the input stream into an
 
55
         * array of bytes. Return the number of bytes read.
 
56
         *
 
57
         * Zero will be returned in the case of EOF.
 
58
         *
 
59
         * This call blocks until at least one byte is
 
60
         * returned.
 
61
         */
 
62
        virtual size_t read(char *b, size_t len) = 0;
 
63
 
 
64
        /* Reads the next byte of data from the input stream.
 
65
         * Returns -1 if EOF.
 
66
         */
 
67
        virtual int read() = 0;
 
68
 
 
69
        /* Read one character ahead. */
 
70
        virtual int peek() = 0;
 
71
 
 
72
        /*
 
73
         *  Reset this output stream to the start inorder to restart the write.
 
74
         */
 
75
        virtual void reset() = 0; 
 
76
 
 
77
        /*
 
78
         * Read a line from the input stream. This function
 
79
         * handles all types of line endings. The function
 
80
         * return NULL on EOF.
 
81
         */
 
82
        CSStringBuffer *readLine();
 
83
};
 
84
 
 
85
class CSOutputStream : public CSRefObject {
 
86
public:
 
87
        /*
 
88
         * Closes this input stream and releases any system
 
89
         * resources associated  with the stream.
 
90
         */
 
91
        virtual void close() = 0;
 
92
 
 
93
        /*
 
94
         * Writes len bytes from the specified byte array starting at
 
95
         * offset off to this output stream.
 
96
         */
 
97
        virtual void write(const char *b, size_t len) = 0;
 
98
 
 
99
        /*
 
100
         * Returns the default EOL indicator.
 
101
         * Will be \n, \r or \r\n.
 
102
         */
 
103
        virtual const char *getEOL() = 0;
 
104
 
 
105
        /*
 
106
         *  Flushes this output stream and forces any buffered output
 
107
         * bytes to be written out.
 
108
         */
 
109
        virtual void flush() = 0;
 
110
 
 
111
        /*
 
112
         *  Writes the specified byte to this output stream.
 
113
         */
 
114
        virtual void write(char b) = 0; 
 
115
 
 
116
        /*
 
117
         *  Reset this output stream to the start inorder to restart the write.
 
118
         */
 
119
        virtual void reset() = 0; 
 
120
 
 
121
        /*
 
122
         * Write a line. Terminator is specific to the
 
123
         * type of output stream and may depend on the
 
124
         * current platform.
 
125
         */
 
126
        void printLine(const char *cstr);
 
127
 
 
128
        /*
 
129
         * Various write types:
 
130
         */
 
131
        virtual void print(const char *cstr);
 
132
        virtual void print(CSString *s);
 
133
        virtual void print(int value);
 
134
        virtual void print(uint64_t value);
 
135
};
 
136
 
 
137
class CSStream : public CSObject {
 
138
public:
 
139
        static void pipe(CSOutputStream *out, CSInputStream *in);
 
140
};
 
141
 
 
142
/* File Stream: */
 
143
 
 
144
class CSFileInputStream : public CSInputStream {
 
145
public:
 
146
        CSFileInputStream(): iFile(NULL), iReadOffset(0) { }
 
147
        virtual ~CSFileInputStream();
 
148
 
 
149
        virtual void close();
 
150
 
 
151
        virtual size_t read(char *b, size_t len);
 
152
 
 
153
        virtual int read();
 
154
 
 
155
        virtual int peek();
 
156
 
 
157
        /*
 
158
         *  Reset this output stream to the start inorder to restart the read.
 
159
         */
 
160
        virtual void reset(); 
 
161
 
 
162
        static CSFileInputStream *newStream(CSFile* f);
 
163
        static CSFileInputStream *newStream(CSFile* f, off_t offset);
 
164
 
 
165
private:
 
166
        CSFile  *iFile;
 
167
        off_t   iReadOffset;
 
168
};
 
169
 
 
170
class CSFileOutputStream : public CSOutputStream {
 
171
public:
 
172
        CSFileOutputStream(): iFile(NULL), iWriteOffset(0) { }
 
173
        virtual ~CSFileOutputStream();
 
174
 
 
175
        virtual void close();
 
176
 
 
177
        virtual void write(const char *b, size_t len);
 
178
 
 
179
        virtual const char *getEOL();
 
180
 
 
181
        virtual void flush();
 
182
 
 
183
        virtual void write(char b);
 
184
 
 
185
        virtual void reset(); 
 
186
 
 
187
        static CSFileOutputStream *newStream(CSFile* f);
 
188
        static CSFileOutputStream *newStream(CSFile* f, off_t offset);
 
189
 
 
190
private:
 
191
        CSFile  *iFile;
 
192
        off_t   iWriteOffset;
 
193
};
 
194
 
 
195
/* Socket Stream */
 
196
 
 
197
class CSSocketInputStream : public CSInputStream {
 
198
public:
 
199
        CSSocketInputStream(): iSocket(NULL) { }
 
200
        virtual ~CSSocketInputStream();
 
201
 
 
202
        virtual void close();
 
203
 
 
204
        virtual size_t read(char *b, size_t len);
 
205
 
 
206
        virtual int read();
 
207
 
 
208
        virtual int peek();
 
209
 
 
210
        virtual void reset(); 
 
211
 
 
212
        static CSSocketInputStream *newStream(CSSocket *s);
 
213
 
 
214
private:
 
215
        CSSocket* iSocket;
 
216
};
 
217
 
 
218
class CSSocketOutputStream : public CSOutputStream {
 
219
public:
 
220
        CSSocketOutputStream(): iSocket(NULL) { }
 
221
        virtual ~CSSocketOutputStream();
 
222
 
 
223
        virtual void close();
 
224
 
 
225
        virtual void write(const char *b, size_t len);
 
226
 
 
227
        virtual const char *getEOL() { return "\n"; };
 
228
 
 
229
        virtual void flush();
 
230
 
 
231
        virtual void write(char b);
 
232
 
 
233
        virtual void reset(); 
 
234
 
 
235
        static CSSocketOutputStream *newStream(CSSocket *s);
 
236
 
 
237
private:
 
238
        CSSocket* iSocket;
 
239
};
 
240
 
 
241
/* Buffered Stream: */
 
242
#ifdef DEBUG
 
243
#define CS_STREAM_BUFFER_SIZE                   11
 
244
//#define CS_STREAM_BUFFER_SIZE                 (64 * 1024)
 
245
#else
 
246
#define CS_STREAM_BUFFER_SIZE                   (64 * 1024)
 
247
#endif
 
248
 
 
249
class CSBufferedInputStream : public CSInputStream {
 
250
public:
 
251
        CSBufferedInputStream(): iStream(NULL), iBuffTotal(0), iBuffPos(0) { }
 
252
        virtual ~CSBufferedInputStream();
 
253
 
 
254
        virtual void close();
 
255
 
 
256
        virtual size_t read(char *b, size_t len);
 
257
 
 
258
        virtual int read();
 
259
 
 
260
        virtual int peek();
 
261
 
 
262
        virtual void reset(); 
 
263
 
 
264
        static CSBufferedInputStream *newStream(CSInputStream* i);
 
265
 
 
266
private:
 
267
        CSInputStream* iStream;
 
268
        u_char iBuffer[CS_STREAM_BUFFER_SIZE];
 
269
        u_int iBuffTotal;
 
270
        u_int iBuffPos;
 
271
};
 
272
 
 
273
class CSBufferedOutputStream : public CSOutputStream {
 
274
public:
 
275
        CSBufferedOutputStream(): iStream(NULL), iBuffTotal(0) { }
 
276
        virtual ~CSBufferedOutputStream();
 
277
 
 
278
        virtual void close();
 
279
 
 
280
        virtual void write(const char *b, size_t len);
 
281
 
 
282
        virtual const char *getEOL() { return "\n"; };
 
283
 
 
284
        virtual void flush();
 
285
 
 
286
        virtual void write(char b);
 
287
 
 
288
        virtual void reset(); 
 
289
 
 
290
        static CSBufferedOutputStream *newStream(CSOutputStream* i);
 
291
 
 
292
private:
 
293
        CSOutputStream* iStream;
 
294
        u_char iBuffer[CS_STREAM_BUFFER_SIZE];
 
295
        u_int iBuffTotal;
 
296
};
 
297
 
 
298
/* memory Stream */
 
299
class CSMemoryInputStream : public CSInputStream {
 
300
public:
 
301
        CSMemoryInputStream(): iMemory(NULL), iMemTotal(0), iMemPos(0) { }
 
302
        ~CSMemoryInputStream(){}
 
303
 
 
304
        virtual void close() {}
 
305
 
 
306
        virtual size_t read(char *b, size_t len)
 
307
        {
 
308
                if (len > (iMemTotal - iMemPos))
 
309
                        len = iMemTotal - iMemPos;
 
310
                
 
311
                memcpy(b, iMemory + iMemPos, len);
 
312
                iMemPos += len; 
 
313
                return len;
 
314
        }
 
315
 
 
316
        virtual int read()
 
317
        {
 
318
                int b = -1;
 
319
                if (iMemPos < iMemTotal) 
 
320
                        b = iMemory[iMemPos++];
 
321
                return b;
 
322
        }
 
323
 
 
324
        virtual int peek()
 
325
        {
 
326
                int b = -1;
 
327
                if (iMemPos < iMemTotal) 
 
328
                        b = iMemory[iMemPos];
 
329
                return b;
 
330
        }
 
331
 
 
332
        virtual void reset() {iMemPos = 0;}
 
333
        
 
334
        static CSMemoryInputStream *newStream(const u_char* buffer, u_int length);
 
335
 
 
336
private:
 
337
        const u_char *iMemory;
 
338
        u_int iMemTotal;
 
339
        u_int iMemPos;
 
340
};
 
341
 
 
342
 
 
343
class CSMemoryOutputStream : public CSOutputStream {
 
344
public:
 
345
        CSMemoryOutputStream(): iMemory(NULL), iMemTotal(0), iMemSpace(0), iMemMin(0), iMemPos(NULL){ }
 
346
        virtual ~CSMemoryOutputStream();
 
347
 
 
348
        virtual void close() {}
 
349
 
 
350
        virtual void write(const char *b, size_t len);
 
351
        virtual const char *getEOL() { return "\n"; };
 
352
 
 
353
        virtual void flush() {}
 
354
 
 
355
        virtual void write(char b);
 
356
 
 
357
        const u_char *getMemory(size_t *len)
 
358
        {
 
359
                *len = iMemPos - iMemory;
 
360
                return iMemory;
 
361
        }
 
362
        
 
363
        virtual void reset();
 
364
        
 
365
        static CSMemoryOutputStream *newStream(size_t init_length, size_t min_alloc);
 
366
 
 
367
private:
 
368
        u_char *iMemory;
 
369
        u_int iMemTotal;
 
370
        u_int iMemSpace;
 
371
        u_int iMemMin;
 
372
        u_char *iMemPos;
 
373
};
 
374
 
 
375
class CSStaticMemoryOutputStream : public CSOutputStream {
 
376
public:
 
377
        CSStaticMemoryOutputStream(u_char *mem, off_t size): iMemory(mem), iMemSpace(size), iMemSize(size), iMemPos(mem){ }
 
378
        virtual ~CSStaticMemoryOutputStream() {}
 
379
 
 
380
        virtual void close() {}
 
381
 
 
382
        virtual void write(const char *b, size_t len);
 
383
        virtual const char *getEOL() { return "\n"; };
 
384
 
 
385
        virtual void flush() {}
 
386
 
 
387
        virtual void write(char b);
 
388
        
 
389
        virtual void reset() 
 
390
        {
 
391
                iMemPos = iMemory;
 
392
                iMemSpace = iMemSize;
 
393
        }
 
394
        
 
395
        off_t getSize() { return iMemPos - iMemory; }
 
396
 
 
397
private:
 
398
        u_char *iMemory;
 
399
        off_t iMemSpace;
 
400
        off_t iMemSize;
 
401
        u_char *iMemPos;
 
402
};
 
403
 
 
404
typedef size_t (* CSStreamReadCallbackFunc) (void *caller_data, char *buffer, size_t buffer_size, u_char reset);
 
405
 
 
406
class CSCallbackInputStream : public CSInputStream {
 
407
public:
 
408
        CSCallbackInputStream(): callback(NULL), cb_data(NULL), havePeek(false), doReset(false) { }
 
409
        ~CSCallbackInputStream(){}
 
410
 
 
411
        virtual void close() {}
 
412
 
 
413
        virtual size_t read(char *b, size_t len)
 
414
        {
 
415
                size_t size = 0;
 
416
                
 
417
                if (havePeek) {
 
418
                        havePeek = false;
 
419
                        *b =  peek_char;
 
420
                        b++; len--;
 
421
                        if (len) {
 
422
                                size = callback(cb_data, b, len, doReset);
 
423
                        }
 
424
                                
 
425
                        size++;                 
 
426
                } else
 
427
                        size = callback(cb_data, b, len, doReset);
 
428
                        
 
429
                if (doReset)
 
430
                        doReset = false;
 
431
                        
 
432
                return size;
 
433
        }
 
434
 
 
435
        virtual int read()
 
436
        {
 
437
                char c;
 
438
                
 
439
                if (havePeek) {
 
440
                        havePeek = false;
 
441
                        return peek_char;
 
442
                }
 
443
                if (!callback(cb_data, &c, 1, doReset))
 
444
                        c = -1;
 
445
                        
 
446
                if (doReset)
 
447
                        doReset = false;
 
448
                
 
449
                return c;
 
450
        }
 
451
 
 
452
        virtual int peek()
 
453
        {
 
454
                if (!havePeek) {
 
455
                        if (callback(cb_data, &peek_char, 1, doReset))
 
456
                                havePeek = true;
 
457
                        else
 
458
                                return -1;
 
459
                }
 
460
                return peek_char;
 
461
        }
 
462
 
 
463
        virtual void reset() 
 
464
        {
 
465
                havePeek = false;
 
466
                doReset = false;
 
467
        }
 
468
 
 
469
        static CSCallbackInputStream *newStream(CSStreamReadCallbackFunc callback, void *user_data);
 
470
 
 
471
private:
 
472
        CSStreamReadCallbackFunc callback;
 
473
        void *cb_data;
 
474
        char peek_char;
 
475
        bool havePeek;
 
476
        bool doReset;
 
477
};
 
478
#endif
 
479
 
 
480