~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSStream.h

* Completes the blueprint for splitting the XA Resource Manager
  API from the storage engine API:

We add a new plugin::XaResourceManager abstract interface class
which exposes the X/Open XA distributed transaction protocol for
resource managers.

We add a new plugin::MonitoredInTransaction base class from
which all plugins that need monitored by Drizzle's transaction
manager (drizzled::TransactionServices component) derive.

All plugin::StorageEngine's now derive from plugin::MonitoredInTransaction
since all storage engines a monitored by the transaction manager
and the Session keeps a "slot" available for keeping the engine's
per-session data state.  In a future patch, the transaction log's
XaApplier plugin will also derive from MonitoredInTransaction, as
the transaction log, in XA mode, is also monitored by Drizzle's
transaction manager and automatically enlisted in XA transactions.

* Updates all documentation in /drizzled/transaction_services.cc
  to accurately reflect Drizzle's new transaction management
  process and explicit transaction and statement boundaries.

* Kills off dead code:

  binlog_format_names
  ha_init()
  total_ha, total_ha_2pc (no longer necessary, as the above-mentioned
  abstract base classes provide all of this functionality)
  StorageEngine::slot (now plugin::MonitoredInTransaction::getId())
  TransactionalStorageEngine::two_phase_commit (same as above)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original Author: Paul McCullagh
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-06-07
23
 
 *
24
 
 * CORE SYSTEM:
25
 
 * Basic input and output streams.
26
 
 *
27
 
 * These objects wrap the system streams, and simplify things.
28
 
 * I also want to standardize exceptions and implement
29
 
 * socket based streams.
30
 
 *
31
 
 */
32
 
 
33
 
#ifndef __CSSTREAM_H__
34
 
#define __CSSTREAM_H__
35
 
 
36
 
#define DEFAULT_BUFFER_SIZE             64000
37
 
 
38
 
#include "CSDefs.h"
39
 
#include "CSFile.h"
40
 
#include "CSSocket.h"
41
 
 
42
 
class CSInputStream : public CSRefObject {
43
 
public:
44
 
        CSInputStream() { }
45
 
        virtual ~CSInputStream() { }
46
 
 
47
 
        /*
48
 
         * Closes this input stream and releases any system
49
 
         * resources associated  with the stream.
50
 
         */
51
 
        virtual void close() = 0;
52
 
 
53
 
        /*
54
 
         * Reads up to len bytes of data from the input stream into an
55
 
         * array of bytes. Return the number of bytes read.
56
 
         *
57
 
         * Zero will be returned in the case of EOF.
58
 
         *
59
 
         * This call blocks until at least one byte is
60
 
         * returned.
61
 
         */
62
 
        virtual size_t read(char *b, size_t len) = 0;
63
 
 
64
 
        /* Reads the next byte of data from the input stream.
65
 
         * Returns -1 if EOF.
66
 
         */
67
 
        virtual int read() = 0;
68
 
 
69
 
        /* Read one character ahead. */
70
 
        virtual int peek() = 0;
71
 
 
72
 
        /*
73
 
         *  Reset this output stream to the start inorder to restart the write.
74
 
         */
75
 
        virtual void reset() = 0; 
76
 
 
77
 
        /* Return the name of the file, or whatever: */
78
 
        virtual const char *identify() = 0;
79
 
 
80
 
        /*
81
 
         * Read a line from the input stream. This function
82
 
         * handles all types of line endings. The function
83
 
         * return NULL on EOF.
84
 
         */
85
 
        CSStringBuffer *readLine();
86
 
};
87
 
 
88
 
class CSOutputStream : public CSRefObject {
89
 
public:
90
 
        /*
91
 
         * Closes this input stream and releases any system
92
 
         * resources associated  with the stream.
93
 
         */
94
 
        virtual void close() = 0;
95
 
 
96
 
        /*
97
 
         * Writes len bytes from the specified byte array starting at
98
 
         * offset off to this output stream.
99
 
         */
100
 
        virtual void write(const char *b, size_t len) = 0;
101
 
 
102
 
        /*
103
 
         * Returns the default EOL indicator.
104
 
         * Will be \n, \r or \r\n.
105
 
         */
106
 
        virtual const char *getEOL() = 0;
107
 
 
108
 
        /*
109
 
         *  Flushes this output stream and forces any buffered output
110
 
         * bytes to be written out.
111
 
         */
112
 
        virtual void flush() = 0;
113
 
 
114
 
        /*
115
 
         * Writes the specified byte to this output stream.
116
 
         */
117
 
        virtual void write(char b) = 0; 
118
 
 
119
 
        /*
120
 
         * Reset this output stream to the start inorder to restart the write.
121
 
         */
122
 
        virtual void reset() = 0; 
123
 
 
124
 
        virtual const char *identify() = 0;
125
 
 
126
 
        /*
127
 
         * Write a line. Terminator is specific to the
128
 
         * type of output stream and may depend on the
129
 
         * current platform.
130
 
         */
131
 
        void printLine(const char *cstr);
132
 
 
133
 
        /*
134
 
         * Various write types:
135
 
         */
136
 
        virtual void print(const char *cstr);
137
 
        virtual void print(CSString *s);
138
 
        virtual void print(int value);
139
 
        virtual void print(uint64_t value);
140
 
};
141
 
 
142
 
class CSStream : public CSObject {
143
 
public:
144
 
        static void pipe(CSOutputStream *out, CSInputStream *in);
145
 
};
146
 
 
147
 
/* File Stream: */
148
 
 
149
 
class CSFileInputStream : public CSInputStream {
150
 
public:
151
 
        CSFileInputStream(): iFile(NULL), iReadOffset(0) { }
152
 
        virtual ~CSFileInputStream();
153
 
 
154
 
        virtual void close();
155
 
 
156
 
        virtual size_t read(char *b, size_t len);
157
 
 
158
 
        virtual int read();
159
 
 
160
 
        virtual int peek();
161
 
 
162
 
        /*
163
 
         *  Reset this output stream to the start inorder to restart the read.
164
 
         */
165
 
        virtual void reset(); 
166
 
 
167
 
        virtual const char *identify();
168
 
 
169
 
        static CSFileInputStream *newStream(CSFile* f);
170
 
        static CSFileInputStream *newStream(CSFile* f, off64_t offset);
171
 
 
172
 
private:
173
 
        CSFile  *iFile;
174
 
        off64_t iReadOffset;
175
 
};
176
 
 
177
 
class CSFileOutputStream : public CSOutputStream {
178
 
public:
179
 
        CSFileOutputStream(): iFile(NULL), iWriteOffset(0) { }
180
 
        virtual ~CSFileOutputStream();
181
 
 
182
 
        virtual void close();
183
 
 
184
 
        virtual void write(const char *b, size_t len);
185
 
 
186
 
        virtual const char *getEOL();
187
 
 
188
 
        virtual void flush();
189
 
 
190
 
        virtual void write(char b);
191
 
 
192
 
        virtual void reset(); 
193
 
 
194
 
        virtual const char *identify();
195
 
 
196
 
        static CSFileOutputStream *newStream(CSFile* f);
197
 
        static CSFileOutputStream *newStream(CSFile* f, off64_t offset);
198
 
 
199
 
private:
200
 
        CSFile  *iFile;
201
 
        off64_t iWriteOffset;
202
 
};
203
 
 
204
 
/* Socket Stream */
205
 
 
206
 
class CSSocketInputStream : public CSInputStream {
207
 
public:
208
 
        CSSocketInputStream(): iSocket(NULL) { }
209
 
        virtual ~CSSocketInputStream();
210
 
 
211
 
        virtual void close();
212
 
 
213
 
        virtual size_t read(char *b, size_t len);
214
 
 
215
 
        virtual int read();
216
 
 
217
 
        virtual int peek();
218
 
 
219
 
        virtual void reset(); 
220
 
 
221
 
        virtual const char *identify();
222
 
 
223
 
        static CSSocketInputStream *newStream(CSSocket *s);
224
 
 
225
 
private:
226
 
        CSSocket* iSocket;
227
 
};
228
 
 
229
 
class CSSocketOutputStream : public CSOutputStream {
230
 
public:
231
 
        CSSocketOutputStream(): iSocket(NULL) { }
232
 
        virtual ~CSSocketOutputStream();
233
 
 
234
 
        virtual void close();
235
 
 
236
 
        virtual void write(const char *b, size_t len);
237
 
 
238
 
        virtual const char *getEOL() { return "\n"; };
239
 
 
240
 
        virtual void flush();
241
 
 
242
 
        virtual void write(char b);
243
 
 
244
 
        virtual void reset(); 
245
 
 
246
 
        virtual const char *identify();
247
 
 
248
 
        static CSSocketOutputStream *newStream(CSSocket *s);
249
 
 
250
 
private:
251
 
        CSSocket* iSocket;
252
 
};
253
 
 
254
 
/* Buffered Stream: */
255
 
#ifdef DEBUG_disabled
256
 
#define CS_STREAM_BUFFER_SIZE                   80
257
 
#else
258
 
#define CS_STREAM_BUFFER_SIZE                   (32 * 1024)
259
 
#endif
260
 
 
261
 
class CSBufferedInputStream : public CSInputStream {
262
 
public:
263
 
        CSBufferedInputStream(): iStream(NULL), iBuffTotal(0), iBuffPos(0) { }
264
 
        virtual ~CSBufferedInputStream();
265
 
 
266
 
        virtual void close();
267
 
 
268
 
        virtual size_t read(char *b, size_t len);
269
 
 
270
 
        virtual int read();
271
 
 
272
 
        virtual int peek();
273
 
 
274
 
        virtual void reset(); 
275
 
 
276
 
        virtual const char *identify();
277
 
 
278
 
        static CSBufferedInputStream *newStream(CSInputStream* i);
279
 
 
280
 
private:
281
 
        CSInputStream* iStream;
282
 
        u_char iBuffer[CS_STREAM_BUFFER_SIZE];
283
 
        uint32_t iBuffTotal;
284
 
        uint32_t iBuffPos;
285
 
};
286
 
 
287
 
class CSBufferedOutputStream : public CSOutputStream {
288
 
public:
289
 
        CSBufferedOutputStream(): iStream(NULL), iBuffTotal(0) { }
290
 
        virtual ~CSBufferedOutputStream();
291
 
 
292
 
        virtual void close();
293
 
 
294
 
        virtual void write(const char *b, size_t len);
295
 
 
296
 
        virtual const char *getEOL() { return "\n"; };
297
 
 
298
 
        virtual void flush();
299
 
 
300
 
        virtual void write(char b);
301
 
 
302
 
        virtual void reset(); 
303
 
 
304
 
        virtual const char *identify();
305
 
 
306
 
        static CSBufferedOutputStream *newStream(CSOutputStream* i);
307
 
 
308
 
private:
309
 
        CSOutputStream* iStream;
310
 
        u_char iBuffer[CS_STREAM_BUFFER_SIZE];
311
 
        uint32_t iBuffTotal;
312
 
};
313
 
 
314
 
/* memory Stream */
315
 
class CSMemoryInputStream : public CSInputStream {
316
 
public:
317
 
        CSMemoryInputStream(): iMemory(NULL), iMemTotal(0), iMemPos(0) { }
318
 
        ~CSMemoryInputStream(){}
319
 
 
320
 
        virtual void close() {}
321
 
 
322
 
        virtual size_t read(char *b, size_t len)
323
 
        {
324
 
                if (len > (iMemTotal - iMemPos))
325
 
                        len = iMemTotal - iMemPos;
326
 
                
327
 
                memcpy(b, iMemory + iMemPos, len);
328
 
                iMemPos += len; 
329
 
                return len;
330
 
        }
331
 
 
332
 
        virtual int read()
333
 
        {
334
 
                int b = -1;
335
 
                if (iMemPos < iMemTotal) 
336
 
                        b = iMemory[iMemPos++];
337
 
                return b;
338
 
        }
339
 
 
340
 
        virtual int peek()
341
 
        {
342
 
                int b = -1;
343
 
                if (iMemPos < iMemTotal) 
344
 
                        b = iMemory[iMemPos];
345
 
                return b;
346
 
        }
347
 
 
348
 
        virtual void reset() {iMemPos = 0;}
349
 
        
350
 
        virtual const char *identify() 
351
 
        {
352
 
                return "memory stream";
353
 
        }
354
 
 
355
 
        static CSMemoryInputStream *newStream(const u_char* buffer, uint32_t length);
356
 
 
357
 
private:
358
 
        const u_char *iMemory;
359
 
        uint32_t iMemTotal;
360
 
        uint32_t iMemPos;
361
 
};
362
 
 
363
 
 
364
 
class CSMemoryOutputStream : public CSOutputStream {
365
 
public:
366
 
        CSMemoryOutputStream(): iMemory(NULL), iMemTotal(0), iMemSpace(0), iMemMin(0), iMemPos(NULL){ }
367
 
        virtual ~CSMemoryOutputStream();
368
 
 
369
 
        virtual void close() {}
370
 
 
371
 
        virtual void write(const char *b, size_t len);
372
 
        virtual const char *getEOL() { return "\n"; };
373
 
 
374
 
        virtual void flush() {}
375
 
 
376
 
        virtual void write(char b);
377
 
 
378
 
        const u_char *getMemory(size_t *len)
379
 
        {
380
 
                *len = iMemPos - iMemory;
381
 
                return iMemory;
382
 
        }
383
 
        
384
 
        virtual void reset();
385
 
        
386
 
        virtual const char *identify();
387
 
        
388
 
        static CSMemoryOutputStream *newStream(size_t init_length, size_t min_alloc);
389
 
 
390
 
private:
391
 
        u_char *iMemory;
392
 
        uint32_t iMemTotal;
393
 
        uint32_t iMemSpace;
394
 
        uint32_t iMemMin;
395
 
        u_char *iMemPos;
396
 
};
397
 
 
398
 
class CSStaticMemoryOutputStream : public CSOutputStream {
399
 
public:
400
 
        CSStaticMemoryOutputStream(u_char *mem, off64_t size): iMemory(mem), iMemSpace(size), iMemSize(size), iMemPos(mem){ }
401
 
        virtual ~CSStaticMemoryOutputStream() {}
402
 
 
403
 
        virtual void close() {}
404
 
 
405
 
        virtual void write(const char *b, size_t len);
406
 
        virtual const char *getEOL() { return "\n"; };
407
 
 
408
 
        virtual void flush() {}
409
 
 
410
 
        virtual void write(char b);
411
 
        
412
 
        virtual void reset() 
413
 
        {
414
 
                iMemPos = iMemory;
415
 
                iMemSpace = iMemSize;
416
 
        }
417
 
        
418
 
        virtual const char *identify() 
419
 
        {
420
 
                return "memory stream";
421
 
        }
422
 
        
423
 
        off64_t getSize() { return iMemPos - iMemory; }
424
 
 
425
 
private:
426
 
        u_char *iMemory;
427
 
        off64_t iMemSpace;
428
 
        off64_t iMemSize;
429
 
        u_char *iMemPos;
430
 
};
431
 
 
432
 
typedef size_t (* CSStreamReadCallbackFunc) (void *caller_data, char *buffer, size_t buffer_size, u_char reset);
433
 
 
434
 
class CSCallbackInputStream : public CSInputStream {
435
 
public:
436
 
        CSCallbackInputStream(): callback(NULL), cb_data(NULL), havePeek(false), doReset(false) { }
437
 
        ~CSCallbackInputStream(){}
438
 
 
439
 
        virtual void close() {}
440
 
 
441
 
        virtual size_t read(char *b, size_t len)
442
 
        {
443
 
                size_t size = 0;
444
 
                
445
 
                if (havePeek) {
446
 
                        havePeek = false;
447
 
                        *b =  peek_char;
448
 
                        b++; len--;
449
 
                        if (len) {
450
 
                                size = callback(cb_data, b, len, doReset);
451
 
                        }
452
 
                                
453
 
                        size++;                 
454
 
                } else
455
 
                        size = callback(cb_data, b, len, doReset);
456
 
                        
457
 
                if (doReset)
458
 
                        doReset = false;
459
 
                        
460
 
                return size;
461
 
        }
462
 
 
463
 
        virtual int read()
464
 
        {
465
 
                char c;
466
 
                
467
 
                if (havePeek) {
468
 
                        havePeek = false;
469
 
                        return peek_char;
470
 
                }
471
 
                if (!callback(cb_data, &c, 1, doReset))
472
 
                        c = -1;
473
 
                        
474
 
                if (doReset)
475
 
                        doReset = false;
476
 
                
477
 
                return c;
478
 
        }
479
 
 
480
 
        virtual int peek()
481
 
        {
482
 
                if (!havePeek) {
483
 
                        if (callback(cb_data, &peek_char, 1, doReset))
484
 
                                havePeek = true;
485
 
                        else
486
 
                                return -1;
487
 
                }
488
 
                return peek_char;
489
 
        }
490
 
 
491
 
        virtual void reset() 
492
 
        {
493
 
                havePeek = false;
494
 
                doReset = false;
495
 
        }
496
 
 
497
 
        virtual const char *identify() 
498
 
        {
499
 
                return "callback stream";
500
 
        }
501
 
 
502
 
        static CSCallbackInputStream *newStream(CSStreamReadCallbackFunc callback, void *user_data);
503
 
 
504
 
private:
505
 
        CSStreamReadCallbackFunc callback;
506
 
        void *cb_data;
507
 
        char peek_char;
508
 
        bool havePeek;
509
 
        bool doReset;
510
 
};
511
 
 
512
 
#endif
513
 
 
514