~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/pbms.h

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
merge lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2010 PrimeBase Technologies GmbH
2
 
 * All rights reserved.
3
 
 * 
4
 
 * Redistribution and use in source and binary forms, with or without 
5
 
 * modification, are permitted provided that the following conditions are met:
6
 
 * 
7
 
 *     * Redistributions of source code must retain the above copyright notice, 
8
 
 *              this list of conditions and the following disclaimer.
9
 
 *     * Redistributions in binary form must reproduce the above copyright notice, 
10
 
 *              this list of conditions and the following disclaimer in the documentation 
11
 
 *              and/or other materials provided with the distribution.
12
 
 *     * Neither the name of the "PrimeBase Technologies GmbH" nor the names of its 
13
 
 *              contributors may be used to endorse or promote products derived from this 
14
 
 *              software without specific prior written permission.
15
 
 * 
16
 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 
17
 
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
18
 
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
19
 
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 
20
 
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 
21
 
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
22
 
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
23
 
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
24
 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
25
 
 * POSSIBILITY OF SUCH DAMAGE. 
26
 
 *  
27
 
 * PrimeBase Media Stream for MySQL and Drizzle
28
 
 *
29
 
 * Original author: Paul McCullagh
30
 
 * Continued development: Barry Leslie
31
 
 * H&G2JCtL
32
 
 *
33
 
 * 2007-06-01
34
 
 *
35
 
 * This file contains the BLOB streaming interface engines that
36
 
 * are streaming enabled.
37
 
 *
38
 
 */
39
 
#pragma once
40
 
#ifndef __PBMS_H__
41
 
#define __PBMS_H__
42
 
 
43
 
#include <stdio.h>
44
 
#include <sys/types.h>
45
 
#include <unistd.h>
46
 
#include <stdlib.h>
47
 
#include <fcntl.h>
48
 
#include <string.h>
49
 
#include <dirent.h>
50
 
#include <signal.h>
51
 
#include <ctype.h>
52
 
#include <errno.h>
53
 
#include <inttypes.h>
54
 
#include <stdint.h>
55
 
 
56
 
#ifdef USE_PRAGMA_INTERFACE
57
 
#pragma interface                       /* gcc class implementation */
58
 
#endif
59
 
 
60
 
 
61
 
/* Identification and versioning of the engine interface and the shared memory block. */
#define MS_SHARED_MEMORY_MAGIC          0x7E9A120C
#define MS_ENGINE_VERSION               3
#define MS_CALLBACK_VERSION             6
#define MS_SHARED_MEMORY_VERSION        2
#define MS_ENGINE_LIST_SIZE             10
#define MS_TEMP_FILE_PREFIX             "pbms_temp_"

#define MS_BLOB_HANDLE_SIZE             300

/* File mode bits: read/write for user and group, read-only for others. */
#define SH_MASK                         (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)

/* Result codes. */
#define MS_OK                           0
#define MS_ERR_ENGINE                   1       /* Internal engine error. */
#define MS_ERR_UNKNOWN_TABLE            2       /* The engine cannot open the given table. */
#define MS_ERR_NOT_FOUND                3       /* The BLOB cannot be found. */
#define MS_ERR_TABLE_LOCKED             4       /* Table is currently locked. */
#define MS_ERR_INCORRECT_URL            5
#define MS_ERR_AUTH_FAILED              6
#define MS_ERR_NOT_IMPLEMENTED          7
#define MS_ERR_UNKNOWN_DB               8
#define MS_ERR_REMOVING_REPO            9
#define MS_ERR_DATABASE_DELETED         10
#define MS_ERR_DUPLICATE                11      /* Attempt to insert a duplicate key into a system table. */
#define MS_ERR_INVALID_RECORD           12
#define MS_ERR_RECOVERY_IN_PROGRESS     13
#define MS_ERR_DUPLICATE_DB             14
#define MS_ERR_DUPLICATE_DB_ID          15
#define MS_ERR_INVALID_OPERATION        16
#define MS_ERR_MISSING_CLOUD_REFFERENCE 17
#define MS_ERR_SYSTAB_VERSION           18

/* Lock modes. */
#define MS_LOCK_NONE                    0
#define MS_LOCK_READONLY                1
#define MS_LOCK_READ_WRITE              2

/* Buffer sizes. */
#define PBMS_BLOB_URL_SIZE              120

#define PBMS_FIELD_COL_SIZE             128
#define PBMS_FIELD_COND_SIZE            300

#define MS_RESULT_MESSAGE_SIZE          300
#define MS_RESULT_STACK_SIZE            200
103
 
 
104
 
typedef struct PBMSResultRec {
105
 
        uint8_t                         mr_had_blobs;                                                   /* A flag to indicate if the statement had any PBMS blobs. */
106
 
        int                                             mr_code;                                                                /* Engine specific error code. */ 
107
 
        char                                    mr_message[MS_RESULT_MESSAGE_SIZE];             /* Error message, required if non-zero return code. */
108
 
        char                                    mr_stack[MS_RESULT_STACK_SIZE];                 /* Trace information about where the error occurred. */
109
 
} PBMSResultRec, *PBMSResultPtr;
110
 
 
111
 
 
112
 
 
113
 
/*
 * Numeric identification of a BLOB. For BLOBs of type REPO, two of the
 * fields carry repository information instead (noted per field).
 */
typedef struct PBMSBlobID {
        uint32_t bi_db_id;
        uint64_t bi_blob_size;
        uint64_t bi_blob_id;            /* Repository file offset when the type is REPO. */
        uint64_t bi_blob_ref_id;
        uint32_t bi_tab_id;             /* Repository ID when the type is REPO. */
        uint32_t bi_auth_code;
        uint32_t bi_blob_type;
} PBMSBlobIDRec, *PBMSBlobIDPtr;
122
 
 
123
 
 
124
 
/*
 * Parsed (binary) form of a BLOB URL. PBMSBlobURLTools converts between
 * this record and the textual URL.
 */
typedef struct MSBlobURL {
        uint8_t  bu_type;
        uint32_t bu_db_id;
        uint32_t bu_tab_id;             /* Repository ID when the type is REPO. */
        uint64_t bu_blob_id;            /* Repository file offset when the type is REPO. */
        uint32_t bu_auth_code;
        uint32_t bu_server_id;
        uint64_t bu_blob_size;
        uint64_t bu_blob_ref_id;        /* Unique identifier of the BLOB reference. */
} MSBlobURLRec, *MSBlobURLPtr;
134
 
 
135
 
 
136
 
typedef struct PBMSBlobURL {
137
 
        char                                    bu_data[PBMS_BLOB_URL_SIZE];
138
 
} PBMSBlobURLRec, *PBMSBlobURLPtr;
139
 
 
140
 
/*
 * Description of a storage engine that registers itself for BLOB streaming.
 */
typedef struct PBMSEngineRec {
        int  ms_version;                /* MS_ENGINE_VERSION */
        int  ms_index;                  /* The index into the engine list. */
        int  ms_removing;               /* TRUE (1) if the engine is being removed. */
        int  ms_internal;               /* TRUE (1) if the engine is supported directly in the MySQL/Drizzle handler code. */
        char ms_engine_name[32];
        int  ms_has_transactions;       /* TRUE (1) if the engine supports transactions. */
} PBMSEngineRec, *PBMSEnginePtr;
148
 
 
149
 
/*
 * Textual BLOB URL format:
 *
 *   "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
 *
 * The original header annotated the components with the figures
 * 2, 10, 1, 10, 20, 10, 10, 20, 20 -- presumably the maximum number of
 * characters each component can occupy (TODO confirm).
 *
 * URL_FMT is used both as a printf() and as a scanf() format.
 *
 * The <inttypes.h> macros are separated from the adjacent string literals by
 * whitespace: in C++11 and later, a string literal immediately followed by
 * an identifier is lexed as a user-defined literal suffix, so
 * "..."PRIu32"..." does not concatenate.
 */

#ifndef PRIu64
#define URL_FMT "~*%u%c%u-%llu-%x-%u-%llu-%llu"
#else
#define URL_FMT "~*%" PRIu32 "%c%" PRIu32 "-%" PRIu64 "-%x-%" PRIu32 "-%" PRIu64 "-%" PRIu64 ""
#endif

/* Values of the single type character that follows <db_id> in the URL. */
#define MS_URL_TYPE_BLOB        '~'
#define MS_URL_TYPE_REPO        '_'
160
 
class PBMSBlobURLTools
161
 
{
162
 
        public:
163
 
        static bool couldBeURL(const char *blob_url, size_t size, MSBlobURLPtr blob)
164
 
        {
165
 
                if (blob_url && (size < PBMS_BLOB_URL_SIZE) && (size > 16)) {
166
 
                        MSBlobURLRec ignored_blob;
167
 
                        char    buffer[PBMS_BLOB_URL_SIZE+1];
168
 
                        char    junk[5];
169
 
                        int             scanned;
170
 
                        
171
 
                        if (!blob)
172
 
                                blob = &ignored_blob;
173
 
                        
174
 
                        junk[0] = 0;
175
 
                        
176
 
                        // There is no guarantee that the URL will be null terminated
177
 
                        // so always copy it into our own buffer to be safe.
178
 
                        memcpy(buffer, blob_url, size);
179
 
                        buffer[size] = 0;
180
 
                        blob_url = buffer;
181
 
                        
182
 
                        scanned = sscanf(blob_url, URL_FMT"%4s", 
183
 
                                &blob->bu_db_id, 
184
 
                                &blob->bu_type, 
185
 
                                &blob->bu_tab_id, 
186
 
                                &blob->bu_blob_id, 
187
 
                                &blob->bu_auth_code, 
188
 
                                &blob->bu_server_id, 
189
 
                                &blob->bu_blob_ref_id, 
190
 
                                &blob->bu_blob_size, 
191
 
                                junk);
192
 
                                
193
 
                        if ((scanned != 8) || (blob->bu_type != MS_URL_TYPE_BLOB && blob->bu_type != MS_URL_TYPE_REPO)) {// If junk is found at the end this will also result in an invalid URL. 
194
 
                                //printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
195
 
                                return false;
196
 
                        }
197
 
                
198
 
                        return true;
199
 
                }
200
 
                
201
 
                return false;
202
 
        }
203
 
        
204
 
        static bool couldBeURL(const char *blob_url, MSBlobURLPtr blob)
205
 
        {
206
 
                return couldBeURL(blob_url, strlen(blob_url), blob);
207
 
        }
208
 
        
209
 
        static void buildBlobURL(MSBlobURLPtr blob, PBMSBlobURLPtr url)
210
 
        {
211
 
                snprintf(url->bu_data, PBMS_BLOB_URL_SIZE, URL_FMT,     blob->bu_db_id, 
212
 
                                                                blob->bu_type, 
213
 
                                                                blob->bu_tab_id, 
214
 
                                                                blob->bu_blob_id, 
215
 
                                                                blob->bu_auth_code, 
216
 
                                                                blob->bu_server_id, 
217
 
                                                                blob->bu_blob_ref_id, 
218
 
                                                                blob->bu_blob_size);
219
 
        }
220
 
};
221
 
 
222
 
#ifndef DRIZZLED
223
 
/*
224
 
 * This function should never be called directly, it is called
225
 
 * by deregisterEngine() below.
226
 
 */
227
 
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
228
 
 
229
 
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
230
 
 
231
 
/*
232
 
 * Call this function to store a BLOB in the repository the BLOB's
233
 
 * URL will be returned. The returned URL buffer is expected to be atleast 
234
 
 * PBMS_BLOB_URL_SIZE long.
235
 
 *
236
 
 * The BLOB URL must still be retained or it will automaticly be deleted after a timeout expires.
237
 
 */
238
 
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result);
239
 
 
240
 
/*
241
 
 * Call this function for each BLOB to be retained. When a BLOB is used, the 
242
 
 * URL may be changed. The returned URL buffer is expected to be atleast 
243
 
 * PBMS_BLOB_URL_SIZE long.
244
 
 *
245
 
 * The returned URL must be inserted into the row in place of the given
246
 
 * URL.
247
 
 */
248
 
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
249
 
 
250
 
/*
251
 
 * If a row containing a BLOB is deleted, then the BLOBs in the
252
 
 * row must be released.
253
 
 *
254
 
 * Note: if a table is dropped, all the BLOBs referenced by the
255
 
 * table are automatically released.
256
 
 */
257
 
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
258
 
 
259
 
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
260
 
 
261
 
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result);
262
 
 
263
 
typedef void (*ECCallCompleted)(bool built_in, bool ok);
264
 
 
265
 
typedef struct PBMSCallbacksRec {
266
 
        int                                             cb_version;                                                     /* MS_CALLBACK_VERSION */
267
 
        ECRegisterdFunc                 cb_register;
268
 
        ECDeregisterdFunc               cb_deregister;
269
 
        ECCreateBlobsFunc               cb_create_blob;
270
 
        ECRetainBlobsFunc               cb_retain_blob;
271
 
        ECReleaseBlobFunc               cb_release_blob;
272
 
        ECDropTable                             cb_drop_table;
273
 
        ECRenameTable                   cb_rename_table;
274
 
        ECCallCompleted                 cb_completed;
275
 
} PBMSCallbacksRec, *PBMSCallbacksPtr;
276
 
 
277
 
typedef struct PBMSSharedMemoryRec {
278
 
        int                                             sm_magic;                                                       /* MS_SHARED_MEMORY_MAGIC */
279
 
        int                                             sm_version;                                                     /* MS_SHARED_MEMORY_VERSION */
280
 
        volatile int                    sm_shutdown_lock;                                       /* "Cheap" lock for shutdown! */
281
 
        PBMSCallbacksPtr                sm_callbacks;
282
 
        int                                             sm_reserved1[20];
283
 
        void                                    *sm_reserved2[20];
284
 
        int                                             sm_list_size;
285
 
        int                                             sm_list_len;
286
 
        PBMSEnginePtr                   sm_engine_list[MS_ENGINE_LIST_SIZE];
287
 
} PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
288
 
 
289
 
#ifdef PBMS_API
290
 
 
291
 
class PBMS_API
292
 
{
293
 
private:
294
 
        /* NULL-terminated list of temp-file name prefixes; filled in by the constructor. */
        const char *temp_prefix[3];
295
 
        /* Set by registerEngine() from the engine's ms_internal flag; passed as the first argument to the PBMS callbacks. */
        bool built_in;
296
 
 
297
 
public:
298
 
        PBMS_API(): sharedMemory(NULL) { 
299
 
                int i = 0;
300
 
                temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
301
 
                temp_prefix[i++] = NULL;
302
 
                
303
 
        }
304
 
 
305
 
        /* Releases nothing: the shared memory block is freed by removeSharedMemory(), not here. */
        ~PBMS_API() { }
306
 
 
307
 
        /*
308
 
         * This method is called by the PBMS engine during startup.
309
 
         */
310
 
        int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
311
 
                int err;
312
 
                
313
 
                deleteTempFiles();
314
 
                err = getSharedMemory(true, result);
315
 
                if (!err)
316
 
                        sharedMemory->sm_callbacks = callbacks;
317
 
                        
318
 
                return err;
319
 
        }
320
 
 
321
 
        /*
322
 
         * This method is called by the PBMS engine during startup.
323
 
         */
324
 
        void PBMSShutdown() {
325
 
                
326
 
                if (!sharedMemory)
327
 
                        return;
328
 
                        
329
 
                lock();
330
 
                sharedMemory->sm_callbacks = NULL;
331
 
 
332
 
                bool empty = true;
333
 
                for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
334
 
                        if (sharedMemory->sm_engine_list[i]) 
335
 
                                empty = false;
336
 
                }
337
 
 
338
 
                unlock();
339
 
                
340
 
                if (empty) 
341
 
                        removeSharedMemory();
342
 
        }
343
 
 
344
 
        /*
345
 
         * Register the engine with the Stream Daemon.
346
 
         */
347
 
        int registerEngine(PBMSEnginePtr the_engine, PBMSResultPtr result) {
348
 
                int err;
349
 
 
350
 
                deleteTempFiles();
351
 
 
352
 
                // The first engine to register creates the shared memory.
353
 
                if ((err = getSharedMemory(true, result)))
354
 
                        return err;
355
 
 
356
 
                lock();
357
 
                for (int i=0; i<sharedMemory->sm_list_size; i++) {
358
 
                        if (!sharedMemory->sm_engine_list[i]) {
359
 
                                PBMSEnginePtr engine;
360
 
                                engine = (PBMSEnginePtr) malloc(sizeof(PBMSEngineRec));
361
 
                                if (!engine) {
362
 
                                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Out of memory.");
363
 
                                        err = MS_ERR_ENGINE;
364
 
                                        goto done;
365
 
                                }
366
 
                                memcpy(engine, the_engine, sizeof(PBMSEngineRec));
367
 
                                
368
 
                                sharedMemory->sm_engine_list[i] = engine;
369
 
                                engine->ms_index = i;
370
 
                                if (i >= sharedMemory->sm_list_len)
371
 
                                        sharedMemory->sm_list_len = i+1;
372
 
                                if (sharedMemory->sm_callbacks)
373
 
                                        sharedMemory->sm_callbacks->cb_register(engine);
374
 
                                        
375
 
                                built_in = (engine->ms_internal == 1);
376
 
                                err =  MS_OK;
377
 
                                goto done;
378
 
                        }
379
 
                }
380
 
                
381
 
                result->mr_code = 15010;
382
 
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
383
 
                *result->mr_stack = 0;
384
 
                
385
 
                err = MS_ERR_ENGINE;
386
 
                
387
 
        done:
388
 
                unlock();
389
 
                return err;
390
 
        }
391
 
 
392
 
        void lock() {
393
 
                while (sharedMemory->sm_shutdown_lock)
394
 
                        usleep(10000);
395
 
                sharedMemory->sm_shutdown_lock++;
396
 
                while (sharedMemory->sm_shutdown_lock != 1) {
397
 
                        usleep(random() % 10000);
398
 
                        sharedMemory->sm_shutdown_lock--;
399
 
                        usleep(10000);
400
 
                        sharedMemory->sm_shutdown_lock++;
401
 
                }
402
 
        }
403
 
 
404
 
        void unlock() {
405
 
                sharedMemory->sm_shutdown_lock--;
406
 
        }
407
 
 
408
 
        void deregisterEngine(const char *engine_name) {
409
 
                PBMSResultRec result;
410
 
                int err;
411
 
 
412
 
                if ((err = getSharedMemory(false, &result)))
413
 
                        return;
414
 
 
415
 
                lock();
416
 
 
417
 
                bool empty = true;
418
 
                for (int i=0; i<sharedMemory->sm_list_len; i++) {
419
 
                        PBMSEnginePtr engine = sharedMemory->sm_engine_list[i];
420
 
                        if (engine) {                            
421
 
                                if (strcmp(engine->ms_engine_name, engine_name) == 0) {
422
 
                                        if (sharedMemory->sm_callbacks)
423
 
                                                sharedMemory->sm_callbacks->cb_deregister(engine);
424
 
                                        free(engine);
425
 
                                        sharedMemory->sm_engine_list[i] = NULL;
426
 
                                }
427
 
                                else
428
 
                                        empty = false;
429
 
                        }
430
 
                }
431
 
 
432
 
                unlock();
433
 
 
434
 
                if (empty) 
435
 
                        removeSharedMemory();
436
 
        }
437
 
 
438
 
        void removeSharedMemory() 
439
 
        {
440
 
                const char **prefix = temp_prefix;
441
 
                char    temp_file[100];
442
 
 
443
 
                // Do not remove the sharfed memory until after
444
 
                // the PBMS engine has shutdown.
445
 
                if (sharedMemory->sm_callbacks)
446
 
                        return;
447
 
                        
448
 
                sharedMemory->sm_magic = 0;
449
 
                free(sharedMemory);
450
 
                sharedMemory = NULL;
451
 
                
452
 
                while (*prefix) {
453
 
                        getTempFileName(temp_file, 100, *prefix, getpid());
454
 
                        unlink(temp_file);
455
 
                        prefix++;
456
 
                }
457
 
        }
458
 
        
459
 
        bool isPBMSLoaded()
460
 
        {
461
 
                PBMSResultRec result;
462
 
                if (getSharedMemory(false, &result))
463
 
                        return false;
464
 
                        
465
 
                return (sharedMemory->sm_callbacks != NULL);
466
 
        }
467
 
        
468
 
        int  retainBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
469
 
        {
470
 
                int err;
471
 
                char safe_url[PBMS_BLOB_URL_SIZE+1];
472
 
 
473
 
 
474
 
                if ((err = getSharedMemory(false, result)))
475
 
                        return err;
476
 
 
477
 
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL)) {
478
 
                
479
 
                        if (!sharedMemory->sm_callbacks)  {
480
 
                                ret_blob_url->bu_data[0] = 0;
481
 
                                return MS_OK;
482
 
                        }
483
 
                        err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, result);
484
 
                        if (err)
485
 
                                return err;
486
 
                                
487
 
                        blob_url = ret_blob_url->bu_data;
488
 
                } else {
489
 
                        // Make sure the url is a C string:
490
 
                        if (blob_url[blob_size]) {
491
 
                                memcpy(safe_url, blob_url, blob_size);
492
 
                                safe_url[blob_size] = 0;
493
 
                                blob_url = safe_url;
494
 
                        }
495
 
                }
496
 
                
497
 
 
498
 
                if (!sharedMemory->sm_callbacks) {
499
 
                        result->mr_code = MS_ERR_INCORRECT_URL;
500
 
                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming daemon (PBMS) not installed");
501
 
                        *result->mr_stack = 0;
502
 
                        return MS_ERR_INCORRECT_URL;
503
 
                }
504
 
 
505
 
                return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
506
 
        }
507
 
 
508
 
        int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
509
 
        {
510
 
                int err;
511
 
                char safe_url[PBMS_BLOB_URL_SIZE+1];
512
 
 
513
 
                if ((err = getSharedMemory(false, result)))
514
 
                        return err;
515
 
 
516
 
                if (!sharedMemory->sm_callbacks)
517
 
                        return MS_OK;
518
 
 
519
 
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL))
520
 
                        return MS_OK;
521
 
 
522
 
                if (blob_url[blob_size]) {
523
 
                        memcpy(safe_url, blob_url, blob_size);
524
 
                        safe_url[blob_size] = 0;
525
 
                        blob_url = safe_url;
526
 
                }
527
 
                
528
 
                return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
529
 
        }
530
 
 
531
 
        int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
532
 
        {
533
 
                int err;
534
 
 
535
 
                if ((err = getSharedMemory(false, result)))
536
 
                        return err;
537
 
 
538
 
                if (!sharedMemory->sm_callbacks)
539
 
                        return MS_OK;
540
 
                        
541
 
                return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
542
 
        }
543
 
 
544
 
        int renameTable(const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
545
 
        {
546
 
                int err;
547
 
 
548
 
                if ((err = getSharedMemory(false, result)))
549
 
                        return err;
550
 
 
551
 
                if (!sharedMemory->sm_callbacks)
552
 
                        return MS_OK;
553
 
                        
554
 
                return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_db, to_table, result);
555
 
        }
556
 
 
557
 
        void completed(int ok)
558
 
        {
559
 
                PBMSResultRec result;
560
 
 
561
 
                if (getSharedMemory(false, &result))
562
 
                        return;
563
 
 
564
 
                if (!sharedMemory->sm_callbacks)
565
 
                        return;
566
 
                        
567
 
                sharedMemory->sm_callbacks->cb_completed(built_in, ok);
568
 
        }
569
 
        
570
 
        /* The attached shared memory block, or NULL; set by getSharedMemory() and cleared by removeSharedMemory(). */
        volatile PBMSSharedMemoryPtr sharedMemory;
571
 
 
572
 
private:
573
 
        int getSharedMemory(bool create, PBMSResultPtr result)
574
 
        {
575
 
                int             tmp_f;
576
 
                int             r;
577
 
                char    temp_file[100];
578
 
                const char      **prefix = temp_prefix;
579
 
 
580
 
                if (sharedMemory)
581
 
                        return MS_OK;
582
 
 
583
 
                while (*prefix) {
584
 
                        getTempFileName(temp_file, 100, *prefix, getpid());
585
 
                        tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
586
 
                        if (tmp_f == -1)
587
 
                                return setOSResult(errno, "open", temp_file, result);
588
 
 
589
 
                        r = lseek(tmp_f, 0, SEEK_SET);
590
 
                        if (r == -1) {
591
 
                                close(tmp_f);
592
 
                                return setOSResult(errno, "lseek", temp_file, result);
593
 
                        }
594
 
                        ssize_t tfer;
595
 
                        char buffer[100];
596
 
                        
597
 
                        tfer = read(tmp_f, buffer, 100);
598
 
                        if (tfer == -1) {
599
 
                                close(tmp_f);
600
 
                                return setOSResult(errno, "read", temp_file, result);
601
 
                        }
602
 
 
603
 
                        buffer[tfer] = 0;
604
 
                        sscanf(buffer, "%p", (void**) &sharedMemory);
605
 
                        if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
606
 
                                if (!create)
607
 
                                        return MS_OK;
608
 
 
609
 
                                sharedMemory = (PBMSSharedMemoryPtr) calloc(1, sizeof(PBMSSharedMemoryRec));
610
 
                                sharedMemory->sm_magic = MS_SHARED_MEMORY_MAGIC;
611
 
                                sharedMemory->sm_version = MS_SHARED_MEMORY_VERSION;
612
 
                                sharedMemory->sm_list_size = MS_ENGINE_LIST_SIZE;
613
 
 
614
 
                                r = lseek(tmp_f, 0, SEEK_SET);
615
 
                                if (r == -1) {
616
 
                                        close(tmp_f);
617
 
                                        return setOSResult(errno, "fseek", temp_file, result);
618
 
                                }
619
 
 
620
 
                                snprintf(buffer, 100, "%p", (void*) sharedMemory);
621
 
                                tfer = write(tmp_f, buffer, strlen(buffer));
622
 
                                if (tfer != (ssize_t) strlen(buffer)) {
623
 
                                        close(tmp_f);
624
 
                                        return setOSResult(errno, "write", temp_file, result);
625
 
                                }
626
 
                                r = fsync(tmp_f);
627
 
                                if (r == -1) {
628
 
                                        close(tmp_f);
629
 
                                        return setOSResult(errno, "fsync", temp_file, result);
630
 
                                }
631
 
                        }
632
 
                        else if (sharedMemory->sm_version != MS_SHARED_MEMORY_VERSION) {
633
 
                                close(tmp_f);
634
 
                                result->mr_code = -1000;
635
 
                                *result->mr_stack = 0;
636
 
                                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Shared memory version: ");          
637
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, sharedMemory->sm_version);           
638
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ", does not match engine shared memory version: ");          
639
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, MS_SHARED_MEMORY_VERSION);           
640
 
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ".");                
641
 
                                return MS_ERR_ENGINE;
642
 
                        }
643
 
                        close(tmp_f);
644
 
                        
645
 
                        // For backward compatability we need to create the old versions but we only need to read the current version.
646
 
                        if (create)
647
 
                                prefix++;
648
 
                        else
649
 
                                break;
650
 
                }
651
 
                return MS_OK;
652
 
        }
653
 
 
654
 
        void strcpy(size_t size, char *to, const char *from)
655
 
        {
656
 
                if (size > 0) {
657
 
                        size--;
658
 
                        while (*from && size--)
659
 
                                *to++ = *from++;
660
 
                        *to = 0;
661
 
                }
662
 
        }
663
 
 
664
 
        void strcat(size_t size, char *to, const char *from)
665
 
        {
666
 
                while (*to && size--) to++;
667
 
                strcpy(size, to, from);
668
 
        }
669
 
 
670
 
        void strcat(size_t size, char *to, int val)
671
 
        {
672
 
                char buffer[20];
673
 
 
674
 
                snprintf(buffer, 20, "%d", val);
675
 
                strcat(size, to, buffer);
676
 
        }
677
 
 
678
 
        int setOSResult(int err, const char *func, char *file, PBMSResultPtr result) {
679
 
                char *msg;
680
 
 
681
 
                result->mr_code = err;
682
 
                *result->mr_stack = 0;
683
 
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "System call ");             
684
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, func);               
685
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "() failed on ");            
686
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, file);               
687
 
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ": ");               
688
 
 
689
 
#ifdef XT_WIN
690
 
                if (FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, iMessage + strlen(iMessage), MS_RESULT_MESSAGE_SIZE - strlen(iMessage), NULL)) {
691
 
                        char *ptr;
692
 
 
693
 
                        ptr = &iMessage[strlen(iMessage)];
694
 
                        while (ptr-1 > err_msg) {
695
 
                                if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
696
 
                                        break;
697
 
                                ptr--;
698
 
                        }
699
 
                        *ptr = 0;
700
 
 
701
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
702
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
703
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
704
 
                        return MS_ERR_ENGINE;
705
 
                }
706
 
#endif
707
 
 
708
 
                msg = strerror(err);
709
 
                if (msg) {
710
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, msg);
711
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
712
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
713
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
714
 
                }
715
 
                else {
716
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Unknown OS error code ");
717
 
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
718
 
                }
719
 
 
720
 
                return MS_ERR_ENGINE;
721
 
        }
722
 
 
723
 
        void getTempFileName(char *temp_file, int buffer_size, const char * prefix, int pid)
724
 
        {
725
 
                snprintf(temp_file, buffer_size, "/tmp/%s%d", prefix,  pid);
726
 
        }
727
 
 
728
 
        bool startsWith(const char *cstr, const char *w_cstr)
729
 
        {
730
 
                while (*cstr && *w_cstr) {
731
 
                        if (*cstr != *w_cstr)
732
 
                                return false;
733
 
                        cstr++;
734
 
                        w_cstr++;
735
 
                }
736
 
                return *cstr || !*w_cstr;
737
 
        }
738
 
 
739
 
        void deleteTempFiles()
740
 
        {
741
 
                struct dirent   entry;
742
 
                struct dirent   *result;
743
 
                DIR                             *odir;
744
 
                int                             err;
745
 
                char                    temp_file[100];
746
 
 
747
 
                if (!(odir = opendir("/tmp/")))
748
 
                        return;
749
 
                err = readdir_r(odir, &entry, &result);
750
 
                while (!err && result) {
751
 
                        const char **prefix = temp_prefix;
752
 
                        
753
 
                        while (*prefix) {
754
 
                                if (startsWith(entry.d_name, *prefix)) {
755
 
                                        int pid = atoi(entry.d_name + strlen(*prefix));
756
 
                                        
757
 
                                        /* If the process does not exist: */
758
 
                                        if (kill(pid, 0) == -1 && errno == ESRCH) {
759
 
                                                getTempFileName(temp_file, 100, *prefix, pid);
760
 
                                                unlink(temp_file);
761
 
                                        }
762
 
                                }
763
 
                                prefix++;
764
 
                        }
765
 
                        
766
 
                        err = readdir_r(odir, &entry, &result);
767
 
                }
768
 
                closedir(odir);
769
 
        }
770
 
};
771
 
#endif // PBMS_API
772
 
 
773
 
/*
774
 
 * The following is a low level API for accessing blobs directly.
775
 
 */
776
 
 
777
 
 
778
 
/*
779
 
 * Any threads using the direct blob access API must first register themselves with the
780
 
 * blob streaming engine before using the blob access functions. This is done by calling
781
 
 * PBMSInitBlobStreamingThread(). Call PBMSDeinitBlobStreamingThread() after the thread is
782
 
 * done using the direct blob access API
783
 
 */
784
 
 
785
 
/* 
786
 
* PBMSInitBlobStreamingThread(): Returns a pointer to a blob streaming thread.
787
 
*/
788
 
extern void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result);
789
 
extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
790
 
 
791
 
/* 
792
 
* PBMSGetError():Gets the last error reported by a blob streaming thread.
793
 
*/
794
 
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
795
 
 
796
 
/* 
797
 
* PBMSCreateBlob():Creates a new blob in the database of the given size.
798
 
*/
799
 
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
800
 
 
801
 
/* 
802
 
* PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chunks of
803
 
* data written to the blob must match the size specified when the blob was created.
804
 
*/
805
 
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
806
 
 
807
 
/* 
808
 
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
809
 
*/
810
 
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
811
 
 
812
 
/*
813
 
* PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be at least PBMS_BLOB_URL_SIZE bytes in size.
814
 
*/
815
 
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
816
 
 
817
 
/*
818
 
* PBMSURLToID():Convert a blob URL to a blob ID.
819
 
*/
820
 
extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);
821
 
#endif //DRIZZLED 
822
 
 
823
 
 
824
 
#endif //__PBMS_H__