~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/pbms.h

  • Committer: Brian Aker
  • Date: 2010-05-26 00:24:31 UTC
  • Revision ID: brian@gaz-20100526002431-33l06jm4z369luxy
Remove restriction on schemas named temporary.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2010 PrimeBase Technologies GmbH
2
 
 * All rights reserved.
3
 
 * 
4
 
 * Redistribution and use in source and binary forms, with or without 
5
 
 * modification, are permitted provided that the following conditions are met:
6
 
 * 
7
 
 *     * Redistributions of source code must retain the above copyright notice, 
8
 
 *              this list of conditions and the following disclaimer.
9
 
 *     * Redistributions in binary form must reproduce the above copyright notice, 
10
 
 *              this list of conditions and the following disclaimer in the documentation 
11
 
 *              and/or other materials provided with the distribution.
12
 
 *     * Neither the name of the "PrimeBase Technologies GmbH" nor the names of its 
13
 
 *              contributors may be used to endorse or promote products derived from this 
14
 
 *              software without specific prior written permission.
15
 
 * 
16
 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 
17
 
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
18
 
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
19
 
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 
20
 
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 
21
 
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
22
 
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
23
 
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
24
 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
25
 
 * POSSIBILITY OF SUCH DAMAGE. 
26
 
 *  
27
 
 * PrimeBase Media Stream for MySQL and Drizzle
28
 
 *
29
 
 * Original author: Paul McCullagh
30
 
 * Continued development: Barry Leslie
31
 
 * H&G2JCtL
32
 
 *
33
 
 * 2007-06-01
34
 
 *
35
 
 * This file contains the BLOB streaming interface engines that
36
 
 * are streaming enabled.
37
 
 *
38
 
 */
39
 
#ifndef __PBMS_H__
40
 
#define __PBMS_H__
41
 
 
42
 
#include <stdio.h>
43
 
#include <sys/types.h>
44
 
#include <unistd.h>
45
 
#include <stdlib.h>
46
 
#include <fcntl.h>
47
 
#include <string.h>
48
 
#include <dirent.h>
49
 
#include <signal.h>
50
 
#include <ctype.h>
51
 
#include <errno.h>
52
 
#include <inttypes.h>
53
 
#include <stdint.h>
54
 
 
55
 
#ifdef USE_PRAGMA_INTERFACE
56
 
#pragma interface                       /* gcc class implementation */
57
 
#endif
58
 
 
59
 
 
60
 
#define MS_SHARED_MEMORY_MAGIC                  0x7E9A120C
61
 
#define MS_ENGINE_VERSION                               3
62
 
#define MS_CALLBACK_VERSION                             6
63
 
#define MS_SHARED_MEMORY_VERSION                2
64
 
#define MS_ENGINE_LIST_SIZE                             10
65
 
#define MS_TEMP_FILE_PREFIX                             "pbms_temp_"
66
 
 
67
 
#define MS_BLOB_HANDLE_SIZE                             300
68
 
 
69
 
#define SH_MASK                                                 ((S_IRUSR | S_IWUSR) | (S_IRGRP | S_IWGRP) | (S_IROTH))
70
 
 
71
 
#define MS_OK                                                   0
72
 
#define MS_ERR_ENGINE                                   1                                                       /* Internal engine error. */
73
 
#define MS_ERR_UNKNOWN_TABLE                    2                                                       /* Returned if the engine cannot open the given table. */
74
 
#define MS_ERR_NOT_FOUND                                3                                                       /* The BLOB cannot be found. */
75
 
#define MS_ERR_TABLE_LOCKED                             4                                                       /* Table is currently locked. */
76
 
#define MS_ERR_INCORRECT_URL                    5
77
 
#define MS_ERR_AUTH_FAILED                              6
78
 
#define MS_ERR_NOT_IMPLEMENTED                  7
79
 
#define MS_ERR_UNKNOWN_DB                               8
80
 
#define MS_ERR_REMOVING_REPO                    9
81
 
#define MS_ERR_DATABASE_DELETED                 10
82
 
#define MS_ERR_DUPLICATE                                11                                              /* Attempt to insert a duplicate key into a system table. */
83
 
#define MS_ERR_INVALID_RECORD                   12
84
 
#define MS_ERR_RECOVERY_IN_PROGRESS             13
85
 
#define MS_ERR_DUPLICATE_DB                             14
86
 
#define MS_ERR_DUPLICATE_DB_ID                  15
87
 
#define MS_ERR_INVALID_OPERATION                16
88
 
#define MS_ERR_MISSING_CLOUD_REFFERENCE 17
89
 
#define MS_ERR_SYSTAB_VERSION                   18
90
 
 
91
 
#define MS_LOCK_NONE                                    0
92
 
#define MS_LOCK_READONLY                                1
93
 
#define MS_LOCK_READ_WRITE                              2
94
 
 
95
 
#define PBMS_BLOB_URL_SIZE                              120
96
 
 
97
 
#define PBMS_FIELD_COL_SIZE                             128
98
 
#define PBMS_FIELD_COND_SIZE                    300
99
 
 
100
 
#define MS_RESULT_MESSAGE_SIZE                  300
101
 
#define MS_RESULT_STACK_SIZE                    200
102
 
 
103
 
/*
 * Result record passed into the PBMS API calls. On failure the callee
 * stores an error code, a human readable message and (optionally) trace
 * text here; the message buffers are fixed size and NUL terminated.
 */
typedef struct PBMSResultRec {
        uint8_t                         mr_had_blobs;                                                   /* A flag to indicate if the statement had any PBMS blobs. */
        int                                             mr_code;                                                                /* Engine specific error code. */ 
        char                                    mr_message[MS_RESULT_MESSAGE_SIZE];             /* Error message, required if non-zero return code. */
        char                                    mr_stack[MS_RESULT_STACK_SIZE];                 /* Trace information about where the error occurred. */
} PBMSResultRec, *PBMSResultPtr;
109
 
 
110
 
 
111
 
 
112
 
/*
 * Binary identifier of a BLOB. The field names mirror those of MSBlobURL.
 * NOTE(review): this header does not show where the record is produced or
 * consumed -- confirm against the callers before relying on its layout.
 */
typedef struct PBMSBlobID {
        uint32_t                                bi_db_id;       
        uint64_t                                bi_blob_size;   
        uint64_t                                bi_blob_id;                             // or repo file offset if type = REPO
        uint64_t                                bi_blob_ref_id;                 
        uint32_t                                bi_tab_id;                              // or repo ID if type = REPO
        uint32_t                                bi_auth_code;
        uint32_t                                bi_blob_type;
} PBMSBlobIDRec, *PBMSBlobIDPtr;
121
 
 
122
 
 
123
 
/*
 * Parsed form of a textual BLOB URL. PBMSBlobURLTools::couldBeURL() fills
 * this record from a URL string and PBMSBlobURLTools::buildBlobURL()
 * formats it back into one, both using URL_FMT.
 */
typedef struct MSBlobURL {
        uint8_t                                 bu_type;                                // MS_URL_TYPE_BLOB or MS_URL_TYPE_REPO
        uint32_t                                bu_db_id;
        uint32_t                                bu_tab_id;                              // or repo ID if type = REPO
        uint64_t                                bu_blob_id;                             // or repo file offset if type = REPO
        uint32_t                                bu_auth_code;                   // written and parsed in hexadecimal (%x in URL_FMT)
        uint32_t                                bu_server_id;
        uint64_t                                bu_blob_size;                   
        uint64_t                                bu_blob_ref_id;                 // Unique identifier of the blob reference
} MSBlobURLRec, *MSBlobURLPtr;
133
 
 
134
 
 
135
 
/*
 * Fixed size buffer holding a textual BLOB URL.
 * PBMSBlobURLTools::buildBlobURL() writes at most PBMS_BLOB_URL_SIZE bytes
 * (including the terminating NUL) into bu_data.
 */
typedef struct PBMSBlobURL {
        char                                    bu_data[PBMS_BLOB_URL_SIZE];
} PBMSBlobURLRec, *PBMSBlobURLPtr;
138
 
 
139
 
/*
 * Description of a storage engine registered for BLOB streaming.
 * PBMS_API::registerEngine() stores a heap copy of this record in the
 * shared engine list and overwrites ms_index with the slot it was given.
 */
typedef struct PBMSEngineRec {
        int                                             ms_version;                                                     /* MS_ENGINE_VERSION */
        int                                             ms_index;                                                       /* The index into the engine list. */
        int                                             ms_removing;                                            /* TRUE (1) if the engine is being removed. */
        int                                             ms_internal;                                            /* TRUE (1) if the engine is supported directly in the mysql/drizzle handler code. */
        char                                    ms_engine_name[32];                                     /* Compared with strcmp() by deregisterEngine(). */
        int                                             ms_has_transactions;                            /* TRUE (1) if the engine supports transactions. */
} PBMSEngineRec, *PBMSEnginePtr;
147
 
 
148
 
/*                      2       10              1                       10                      20                      10                              10                      20                              20
149
 
 * Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
150
 
 */
151
 
 
152
 
/*
 * printf()/sscanf() format shared by PBMSBlobURLTools for building and
 * parsing BLOB URLs. When the <inttypes.h> format macros are not visible
 * the plain integer conversions are used instead.
 *
 * Each string literal is separated from the following macro by a space:
 * without it C++11 and later parse "..."PRIu32 as a user-defined literal
 * suffix and reject the header.
 */
#ifndef  PRIu64
#define URL_FMT "~*%u%c%u-%llu-%x-%u-%llu-%llu"
#else
#define URL_FMT "~*%" PRIu32 "%c%" PRIu32 "-%" PRIu64 "-%x-%" PRIu32 "-%" PRIu64 "-%" PRIu64 ""
#endif
157
 
#define MS_URL_TYPE_BLOB        '~'
158
 
#define MS_URL_TYPE_REPO        '_'
159
 
class PBMSBlobURLTools
160
 
{
161
 
        public:
162
 
        static bool couldBeURL(const char *blob_url, size_t size, MSBlobURLPtr blob)
163
 
        {
164
 
                if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
165
 
                        MSBlobURLRec ignored_blob;
166
 
                        char    buffer[PBMS_BLOB_URL_SIZE+1];
167
 
                        char    junk[5];
168
 
                        int             scanned;
169
 
                        
170
 
                        if (!blob)
171
 
                                blob = &ignored_blob;
172
 
                        
173
 
                        junk[0] = 0;
174
 
                        if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
175
 
                                memcpy(buffer, blob_url, size);
176
 
                                buffer[size] = 0;
177
 
                                blob_url = buffer;
178
 
                        }
179
 
                        
180
 
                        scanned = sscanf(blob_url, URL_FMT"%4s", 
181
 
                                &blob->bu_db_id, 
182
 
                                &blob->bu_type, 
183
 
                                &blob->bu_tab_id, 
184
 
                                &blob->bu_blob_id, 
185
 
                                &blob->bu_auth_code, 
186
 
                                &blob->bu_server_id, 
187
 
                                &blob->bu_blob_ref_id, 
188
 
                                &blob->bu_blob_size, 
189
 
                                junk);
190
 
                                
191
 
                        if ((scanned != 8) || (blob->bu_type != MS_URL_TYPE_BLOB && blob->bu_type != MS_URL_TYPE_REPO)) {// If junk is found at the end this will also result in an invalid URL. 
192
 
                                //printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
193
 
                                return false;
194
 
                        }
195
 
                
196
 
                        return true;
197
 
                }
198
 
                
199
 
                return false;
200
 
        }
201
 
        
202
 
        static bool couldBeURL(const char *blob_url, MSBlobURLPtr blob)
203
 
        {
204
 
                return couldBeURL(blob_url, strlen(blob_url), blob);
205
 
        }
206
 
        
207
 
        static void buildBlobURL(MSBlobURLPtr blob, PBMSBlobURLPtr url)
208
 
        {
209
 
                snprintf(url->bu_data, PBMS_BLOB_URL_SIZE, URL_FMT,     blob->bu_db_id, 
210
 
                                                                blob->bu_type, 
211
 
                                                                blob->bu_tab_id, 
212
 
                                                                blob->bu_blob_id, 
213
 
                                                                blob->bu_auth_code, 
214
 
                                                                blob->bu_server_id, 
215
 
                                                                blob->bu_blob_ref_id, 
216
 
                                                                blob->bu_blob_size);
217
 
        }
218
 
};
219
 
 
220
 
#ifndef DRIZZLED
221
 
/*
 * Callback signatures implemented by the PBMS engine and published through
 * PBMSCallbacksRec. Engines do not call them directly; they go through the
 * PBMS_API wrapper methods below.
 */

/*
 * Called by registerEngine() below after the engine has been added to the
 * shared engine list.
 */
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);

/*
 * This function should never be called directly, it is called
 * by deregisterEngine() below.
 */
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);

/*
 * Call this function to store a BLOB in the repository the BLOB's
 * URL will be returned. The returned URL buffer is expected to be at least 
 * PBMS_BLOB_URL_SIZE long.
 *
 * The BLOB URL must still be retained or it will automatically be deleted after a timeout expires.
 */
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result);

/*
 * Call this function for each BLOB to be retained. When a BLOB is used, the 
 * URL may be changed. The returned URL buffer is expected to be at least 
 * PBMS_BLOB_URL_SIZE long.
 *
 * The returned URL must be inserted into the row in place of the given
 * URL.
 */
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);

/*
 * If a row containing a BLOB is deleted, then the BLOBs in the
 * row must be released.
 *
 * Note: if a table is dropped, all the BLOBs referenced by the
 * table are automatically released.
 */
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);

/* Invoked by PBMS_API::dropTable(). */
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);

/* Invoked by PBMS_API::renameTable(). */
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result);

/* Invoked by PBMS_API::completed(); 'ok' is the value passed by the caller. */
typedef void (*ECCallCompleted)(bool built_in, bool ok);
262
 
 
263
 
/*
 * Table of entry points supplied by the PBMS engine. PBMSStartup() stores
 * a pointer to it in the shared memory record and PBMSShutdown() clears
 * that pointer again.
 */
typedef struct PBMSCallbacksRec {
        int                                             cb_version;                                                     /* MS_CALLBACK_VERSION */
        ECRegisterdFunc                 cb_register;                                            /* Called from registerEngine(). */
        ECDeregisterdFunc               cb_deregister;                                          /* Called from deregisterEngine(). */
        ECCreateBlobsFunc               cb_create_blob;                                         /* Called from retainBlob() for data that is not yet a URL. */
        ECRetainBlobsFunc               cb_retain_blob;                                         /* Called from retainBlob(). */
        ECReleaseBlobFunc               cb_release_blob;                                        /* Called from releaseBlob(). */
        ECDropTable                             cb_drop_table;                                          /* Called from dropTable(). */
        ECRenameTable                   cb_rename_table;                                        /* Called from renameTable(). */
        ECCallCompleted                 cb_completed;                                           /* Called from completed(). */
} PBMSCallbacksRec, *PBMSCallbacksPtr;
274
 
 
275
 
/*
 * Record through which the PBMS engine and the registered storage engines
 * find each other. It is allocated with calloc() by getSharedMemory() and
 * its address is written as text to a temp file named after the process
 * id, from which later callers read it back.
 */
typedef struct PBMSSharedMemoryRec {
        int                                             sm_magic;                                                       /* MS_SHARED_MEMORY_MAGIC */
        int                                             sm_version;                                                     /* MS_SHARED_MEMORY_VERSION */
        volatile int                    sm_shutdown_lock;                                       /* "Cheap" lock for shutdown! */
        PBMSCallbacksPtr                sm_callbacks;                                           /* Set by PBMSStartup(), NULL when the PBMS engine is not running. */
        int                                             sm_reserved1[20];
        void                                    *sm_reserved2[20];
        int                                             sm_list_size;                                           /* Capacity of sm_engine_list (MS_ENGINE_LIST_SIZE). */
        int                                             sm_list_len;                                            /* One past the highest slot ever used; free slots may lie below it. */
        PBMSEnginePtr                   sm_engine_list[MS_ENGINE_LIST_SIZE];    /* NULL entries are free slots. */
} PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
286
 
 
287
 
#ifdef PBMS_API
288
 
 
289
 
class PBMS_API
290
 
{
291
 
private:
292
 
        const char *temp_prefix[3];
293
 
        bool built_in;
294
 
 
295
 
public:
296
 
        PBMS_API(): sharedMemory(NULL) { 
297
 
                int i = 0;
298
 
                temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
299
 
                temp_prefix[i++] = NULL;
300
 
                
301
 
        }
302
 
 
303
 
        /* Releases nothing: the shared memory record is freed by removeSharedMemory(). */
        ~PBMS_API() { }
304
 
 
305
 
        /*
306
 
         * This method is called by the PBMS engine during startup.
307
 
         */
308
 
        int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
309
 
                int err;
310
 
                
311
 
                deleteTempFiles();
312
 
                err = getSharedMemory(true, result);
313
 
                if (!err)
314
 
                        sharedMemory->sm_callbacks = callbacks;
315
 
                        
316
 
                return err;
317
 
        }
318
 
 
319
 
        /*
320
 
         * This method is called by the PBMS engine during shutdown.
321
 
         */
322
 
        void PBMSShutdown() {
323
 
                
324
 
                if (!sharedMemory)
325
 
                        return;
326
 
                        
327
 
                lock();
328
 
                sharedMemory->sm_callbacks = NULL;
329
 
 
330
 
                bool empty = true;
331
 
                for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
332
 
                        if (sharedMemory->sm_engine_list[i]) 
333
 
                                empty = false;
334
 
                }
335
 
 
336
 
                unlock();
337
 
                
338
 
                if (empty) 
339
 
                        removeSharedMemory();
340
 
        }
341
 
 
342
 
        /*
343
 
         * Register the engine with the Stream Daemon.
344
 
         */
345
 
        int registerEngine(PBMSEnginePtr the_engine, PBMSResultPtr result) {
346
 
                int err;
347
 
 
348
 
                deleteTempFiles();
349
 
 
350
 
                // The first engine to register creates the shared memory.
351
 
                if ((err = getSharedMemory(true, result)))
352
 
                        return err;
353
 
 
354
 
                lock();
355
 
                for (int i=0; i<sharedMemory->sm_list_size; i++) {
356
 
                        if (!sharedMemory->sm_engine_list[i]) {
357
 
                                PBMSEnginePtr engine;
358
 
                                engine = (PBMSEnginePtr) malloc(sizeof(PBMSEngineRec));
359
 
                                if (!engine) {
360
 
                                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Out of memory.");
361
 
                                        err = MS_ERR_ENGINE;
362
 
                                        goto done;
363
 
                                }
364
 
                                memcpy(engine, the_engine, sizeof(PBMSEngineRec));
365
 
                                
366
 
                                sharedMemory->sm_engine_list[i] = engine;
367
 
                                engine->ms_index = i;
368
 
                                if (i >= sharedMemory->sm_list_len)
369
 
                                        sharedMemory->sm_list_len = i+1;
370
 
                                if (sharedMemory->sm_callbacks)
371
 
                                        sharedMemory->sm_callbacks->cb_register(engine);
372
 
                                        
373
 
                                built_in = (engine->ms_internal == 1);
374
 
                                err =  MS_OK;
375
 
                                goto done;
376
 
                        }
377
 
                }
378
 
                
379
 
                result->mr_code = 15010;
380
 
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
381
 
                *result->mr_stack = 0;
382
 
                
383
 
                err = MS_ERR_ENGINE;
384
 
                
385
 
        done:
386
 
                unlock();
387
 
                return err;
388
 
        }
389
 
 
390
 
        /*
         * Acquire the "cheap" shutdown lock held in the shared memory record.
         * The caller must already be attached (sharedMemory != NULL).
         *
         * The counter is changed with plain ++/-- on a volatile int, not with
         * an atomic primitive. NOTE(review): mutual exclusion presumably
         * relies on the back-off loop below noticing a value other than 1 --
         * confirm this is sufficient for the expected concurrency.
         */
        void lock() {
                // Wait until nobody appears to hold the lock.
                while (sharedMemory->sm_shutdown_lock)
                        usleep(10000);
                // Claim it.
                sharedMemory->sm_shutdown_lock++;
                // Any value other than 1 means another caller claimed it too:
                // withdraw the claim, pause, and claim again.
                while (sharedMemory->sm_shutdown_lock != 1) {
                        usleep(random() % 10000);
                        sharedMemory->sm_shutdown_lock--;
                        usleep(10000);
                        sharedMemory->sm_shutdown_lock++;
                }
        }
401
 
 
402
 
        /* Release the lock taken by lock() by withdrawing this caller's claim. */
        void unlock() {
                sharedMemory->sm_shutdown_lock--;
        }
405
 
 
406
 
        void deregisterEngine(const char *engine_name) {
407
 
                PBMSResultRec result;
408
 
                int err;
409
 
 
410
 
                if ((err = getSharedMemory(false, &result)))
411
 
                        return;
412
 
 
413
 
                lock();
414
 
 
415
 
                bool empty = true;
416
 
                for (int i=0; i<sharedMemory->sm_list_len; i++) {
417
 
                        PBMSEnginePtr engine = sharedMemory->sm_engine_list[i];
418
 
                        if (engine) {                            
419
 
                                if (strcmp(engine->ms_engine_name, engine_name) == 0) {
420
 
                                        if (sharedMemory->sm_callbacks)
421
 
                                                sharedMemory->sm_callbacks->cb_deregister(engine);
422
 
                                        free(engine);
423
 
                                        sharedMemory->sm_engine_list[i] = NULL;
424
 
                                }
425
 
                                else
426
 
                                        empty = false;
427
 
                        }
428
 
                }
429
 
 
430
 
                unlock();
431
 
 
432
 
                if (empty) 
433
 
                        removeSharedMemory();
434
 
        }
435
 
 
436
 
        void removeSharedMemory() 
437
 
        {
438
 
                const char **prefix = temp_prefix;
439
 
                char    temp_file[100];
440
 
 
441
 
                // Do not remove the sharfed memory until after
442
 
                // the PBMS engine has shutdown.
443
 
                if (sharedMemory->sm_callbacks)
444
 
                        return;
445
 
                        
446
 
                sharedMemory->sm_magic = 0;
447
 
                free(sharedMemory);
448
 
                sharedMemory = NULL;
449
 
                
450
 
                while (*prefix) {
451
 
                        getTempFileName(temp_file, 100, *prefix, getpid());
452
 
                        unlink(temp_file);
453
 
                        prefix++;
454
 
                }
455
 
        }
456
 
        
457
 
        bool isPBMSLoaded()
458
 
        {
459
 
                PBMSResultRec result;
460
 
                if (getSharedMemory(false, &result))
461
 
                        return false;
462
 
                        
463
 
                return (sharedMemory->sm_callbacks != NULL);
464
 
        }
465
 
        
466
 
        int  retainBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
467
 
        {
468
 
                int err;
469
 
                char safe_url[PBMS_BLOB_URL_SIZE+1];
470
 
 
471
 
 
472
 
                if ((err = getSharedMemory(false, result)))
473
 
                        return err;
474
 
 
475
 
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL)) {
476
 
                
477
 
                        if (!sharedMemory->sm_callbacks)  {
478
 
                                ret_blob_url->bu_data[0] = 0;
479
 
                                return MS_OK;
480
 
                        }
481
 
                        err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, result);
482
 
                        if (err)
483
 
                                return err;
484
 
                                
485
 
                        blob_url = ret_blob_url->bu_data;
486
 
                } else {
487
 
                        // Make sure the url is a C string:
488
 
                        if (blob_url[blob_size]) {
489
 
                                memcpy(safe_url, blob_url, blob_size);
490
 
                                safe_url[blob_size] = 0;
491
 
                                blob_url = safe_url;
492
 
                        }
493
 
                }
494
 
                
495
 
 
496
 
                if (!sharedMemory->sm_callbacks) {
497
 
                        result->mr_code = MS_ERR_INCORRECT_URL;
498
 
                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming daemon (PBMS) not installed");
499
 
                        *result->mr_stack = 0;
500
 
                        return MS_ERR_INCORRECT_URL;
501
 
                }
502
 
 
503
 
                return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
504
 
        }
505
 
 
506
 
        int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
507
 
        {
508
 
                int err;
509
 
                char safe_url[PBMS_BLOB_URL_SIZE+1];
510
 
 
511
 
                if ((err = getSharedMemory(false, result)))
512
 
                        return err;
513
 
 
514
 
                if (!sharedMemory->sm_callbacks)
515
 
                        return MS_OK;
516
 
 
517
 
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL))
518
 
                        return MS_OK;
519
 
 
520
 
                if (blob_url[blob_size]) {
521
 
                        memcpy(safe_url, blob_url, blob_size);
522
 
                        safe_url[blob_size] = 0;
523
 
                        blob_url = safe_url;
524
 
                }
525
 
                
526
 
                return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
527
 
        }
528
 
 
529
 
        int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
530
 
        {
531
 
                int err;
532
 
 
533
 
                if ((err = getSharedMemory(false, result)))
534
 
                        return err;
535
 
 
536
 
                if (!sharedMemory->sm_callbacks)
537
 
                        return MS_OK;
538
 
                        
539
 
                return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
540
 
        }
541
 
 
542
 
        int renameTable(const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
543
 
        {
544
 
                int err;
545
 
 
546
 
                if ((err = getSharedMemory(false, result)))
547
 
                        return err;
548
 
 
549
 
                if (!sharedMemory->sm_callbacks)
550
 
                        return MS_OK;
551
 
                        
552
 
                return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_db, to_table, result);
553
 
        }
554
 
 
555
 
        void completed(int ok)
556
 
        {
557
 
                PBMSResultRec result;
558
 
 
559
 
                if (getSharedMemory(false, &result))
560
 
                        return;
561
 
 
562
 
                if (!sharedMemory->sm_callbacks)
563
 
                        return;
564
 
                        
565
 
                sharedMemory->sm_callbacks->cb_completed(built_in, ok);
566
 
        }
567
 
        
568
 
        volatile PBMSSharedMemoryPtr sharedMemory;
569
 
 
570
 
private:
571
 
        int getSharedMemory(bool create, PBMSResultPtr result)
572
 
        {
573
 
                int             tmp_f;
574
 
                int             r;
575
 
                char    temp_file[100];
576
 
                const char      **prefix = temp_prefix;
577
 
 
578
 
                if (sharedMemory)
579
 
                        return MS_OK;
580
 
 
581
 
                while (*prefix) {
582
 
                        getTempFileName(temp_file, 100, *prefix, getpid());
583
 
                        tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
584
 
                        if (tmp_f == -1)
585
 
                                return setOSResult(errno, "open", temp_file, result);
586
 
 
587
 
                        r = lseek(tmp_f, 0, SEEK_SET);
588
 
                        if (r == -1) {
589
 
                                close(tmp_f);
590
 
                                return setOSResult(errno, "lseek", temp_file, result);
591
 
                        }
592
 
                        ssize_t tfer;
593
 
                        char buffer[100];
594
 
                        
595
 
                        tfer = read(tmp_f, buffer, 100);
596
 
                        if (tfer == -1) {
597
 
                                close(tmp_f);
598
 
                                return setOSResult(errno, "read", temp_file, result);
599
 
                        }
600
 
 
601
 
                        buffer[tfer] = 0;
602
 
                        sscanf(buffer, "%p", (void**) &sharedMemory);
603
 
                        if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
604
 
                                if (!create)
605
 
                                        return MS_OK;
606
 
 
607
 
                                sharedMemory = (PBMSSharedMemoryPtr) calloc(1, sizeof(PBMSSharedMemoryRec));
608
 
                                sharedMemory->sm_magic = MS_SHARED_MEMORY_MAGIC;
609
 
                                sharedMemory->sm_version = MS_SHARED_MEMORY_VERSION;
610
 
                                sharedMemory->sm_list_size = MS_ENGINE_LIST_SIZE;
611
 
 
612
 
                                r = lseek(tmp_f, 0, SEEK_SET);
613
 
                                if (r == -1) {
614
 
                                        close(tmp_f);
615
 
                                        return setOSResult(errno, "fseek", temp_file, result);
616
 
                                }
617
 
 
618
 
                                snprintf(buffer, 100, "%p", (void*) sharedMemory);
619
 
                                tfer = write(tmp_f, buffer, strlen(buffer));
620
 
                                if (tfer != (ssize_t) strlen(buffer)) {
621
 
                                        close(tmp_f);
622
 
                                        return setOSResult(errno, "write", temp_file, result);
623
 
                                }
624
 
                                r = fsync(tmp_f);
625
 
                                if (r == -1) {
626
 
                                        close(tmp_f);
627
 
                                        return setOSResult(errno, "fsync", temp_file, result);
628
 
                                }
629
 
                        }
630
 
                        else if (sharedMemory->sm_version != MS_SHARED_MEMORY_VERSION) {
631
 
                                close(tmp_f);
632
 
                                result->mr_code = -1000;
633
 
                                *result->mr_stack = 0;
634
 
                                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Shared memory version: ");          
635
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, sharedMemory->sm_version);           
636
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ", does not match engine shared memory version: ");          
637
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, MS_SHARED_MEMORY_VERSION);           
638
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ".");                
639
 
                                return MS_ERR_ENGINE;
640
 
                        }
641
 
                        close(tmp_f);
642
 
                        
643
 
                        // For backward compatability we need to create the old versions but we only need to read the current version.
644
 
                        if (create)
645
 
                                prefix++;
646
 
                        else
647
 
                                break;
648
 
                }
649
 
                return MS_OK;
650
 
        }
651
 
 
652
 
        void strcpy(size_t size, char *to, const char *from)
653
 
        {
654
 
                if (size > 0) {
655
 
                        size--;
656
 
                        while (*from && size--)
657
 
                                *to++ = *from++;
658
 
                        *to = 0;
659
 
                }
660
 
        }
661
 
 
662
 
        void strcat(size_t size, char *to, const char *from)
663
 
        {
664
 
                while (*to && size--) to++;
665
 
                strcpy(size, to, from);
666
 
        }
667
 
 
668
 
        void strcat(size_t size, char *to, int val)
669
 
        {
670
 
                char buffer[20];
671
 
 
672
 
                snprintf(buffer, 20, "%d", val);
673
 
                strcat(size, to, buffer);
674
 
        }
675
 
 
676
 
        int setOSResult(int err, const char *func, char *file, PBMSResultPtr result) {
677
 
                char *msg;
678
 
 
679
 
                result->mr_code = err;
680
 
                *result->mr_stack = 0;
681
 
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "System call ");             
682
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, func);               
683
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "() failed on ");            
684
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, file);               
685
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ": ");               
686
 
 
687
 
#ifdef XT_WIN
688
 
                if (FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, iMessage + strlen(iMessage), MS_RESULT_MESSAGE_SIZE - strlen(iMessage), NULL)) {
689
 
                        char *ptr;
690
 
 
691
 
                        ptr = &iMessage[strlen(iMessage)];
692
 
                        while (ptr-1 > err_msg) {
693
 
                                if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
694
 
                                        break;
695
 
                                ptr--;
696
 
                        }
697
 
                        *ptr = 0;
698
 
 
699
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
700
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
701
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
702
 
                        return MS_ERR_ENGINE;
703
 
                }
704
 
#endif
705
 
 
706
 
                msg = strerror(err);
707
 
                if (msg) {
708
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, msg);
709
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
710
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
711
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
712
 
                }
713
 
                else {
714
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Unknown OS error code ");
715
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
716
 
                }
717
 
 
718
 
                return MS_ERR_ENGINE;
719
 
        }
720
 
 
721
 
        void getTempFileName(char *temp_file, int buffer_size, const char * prefix, int pid)
722
 
        {
723
 
                snprintf(temp_file, buffer_size, "/tmp/%s%d", prefix,  pid);
724
 
        }
725
 
 
726
 
        bool startsWith(const char *cstr, const char *w_cstr)
727
 
        {
728
 
                while (*cstr && *w_cstr) {
729
 
                        if (*cstr != *w_cstr)
730
 
                                return false;
731
 
                        cstr++;
732
 
                        w_cstr++;
733
 
                }
734
 
                return *cstr || !*w_cstr;
735
 
        }
736
 
 
737
 
        void deleteTempFiles()
738
 
        {
739
 
                struct dirent   entry;
740
 
                struct dirent   *result;
741
 
                DIR                             *odir;
742
 
                int                             err;
743
 
                char                    temp_file[100];
744
 
 
745
 
                if (!(odir = opendir("/tmp/")))
746
 
                        return;
747
 
                err = readdir_r(odir, &entry, &result);
748
 
                while (!err && result) {
749
 
                        const char **prefix = temp_prefix;
750
 
                        
751
 
                        while (*prefix) {
752
 
                                if (startsWith(entry.d_name, *prefix)) {
753
 
                                        int pid = atoi(entry.d_name + strlen(*prefix));
754
 
                                        
755
 
                                        /* If the process does not exist: */
756
 
                                        if (kill(pid, 0) == -1 && errno == ESRCH) {
757
 
                                                getTempFileName(temp_file, 100, *prefix, pid);
758
 
                                                unlink(temp_file);
759
 
                                        }
760
 
                                }
761
 
                                prefix++;
762
 
                        }
763
 
                        
764
 
                        err = readdir_r(odir, &entry, &result);
765
 
                }
766
 
                closedir(odir);
767
 
        }
768
 
};
769
 
#endif // PBMS_API
770
 
 
771
 
/*
772
 
 * The following is a low level API for accessing blobs directly.
773
 
 */
774
 
 
775
 
 
776
 
/*
777
 
 * Any threads using the direct blob access API must first register themselves with the
778
 
 * blob streaming engine before using the blob access functions. This is done by calling
779
 
 * PBMSInitBlobStreamingThread(). Call PBMSDeinitBlobStreamingThread() after the thread is
780
 
 * done using the direct blob access API
781
 
 */
782
 
 
783
 
/* 
784
 
* PBMSInitBlobStreamingThread(): Returns a pointer to a blob streaming thread.
785
 
*/
786
 
extern void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result);
787
 
extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
788
 
 
789
 
/* 
790
 
* PBMSGetError():Gets the last error reported by a blob streaming thread.
791
 
*/
792
 
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
793
 
 
794
 
/* 
795
 
* PBMSCreateBlob():Creates a new blob in the database of the given size.
796
 
*/
797
 
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
798
 
 
799
 
/* 
800
 
* PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chunks of 
801
 
* data written to the blob must match the size specified when the blob was created.
802
 
*/
803
 
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
804
 
 
805
 
/* 
806
 
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
807
 
*/
808
 
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
809
 
 
810
 
/*
811
 
* PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be at least PBMS_BLOB_URL_SIZE bytes in size.
812
 
*/
813
 
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
814
 
 
815
 
/*
816
 
* PBMSURLToID():Convert a blob URL to a blob ID.
817
 
*/
818
 
extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);
819
 
#endif //DRIZZLED 
820
 
 
821
 
 
822
 
#endif //__PBMS_H__