~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/pbms.h

  • Committer: Stewart Smith
  • Date: 2010-11-03 03:27:09 UTC
  • mto: (1902.1.1 build) (1910.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 1903.
  • Revision ID: stewart@flamingspork.com-20101103032709-oyvfrc6eb8fzj0mr
fix docs warning: docs/unlock.rst:2: (WARNING/2) Title underline too short.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2010 PrimeBase Technologies GmbH
 
2
 * All rights reserved.
 
3
 * 
 
4
 * Redistribution and use in source and binary forms, with or without 
 
5
 * modification, are permitted provided that the following conditions are met:
 
6
 * 
 
7
 *     * Redistributions of source code must retain the above copyright notice, 
 
8
 *              this list of conditions and the following disclaimer.
 
9
 *     * Redistributions in binary form must reproduce the above copyright notice, 
 
10
 *              this list of conditions and the following disclaimer in the documentation 
 
11
 *              and/or other materials provided with the distribution.
 
12
 *     * Neither the name of the "PrimeBase Technologies GmbH" nor the names of its 
 
13
 *              contributors may be used to endorse or promote products derived from this 
 
14
 *              software without specific prior written permission.
 
15
 * 
 
16
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 
 
17
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
 
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
 
19
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 
 
20
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 
 
21
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 
22
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
 
23
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 
24
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 
25
 * POSSIBILITY OF SUCH DAMAGE. 
 
26
 *  
 
27
 * PrimeBase Media Stream for MySQL and Drizzle
 
28
 *
 
29
 * Original author: Paul McCullagh
 
30
 * Continued development: Barry Leslie
 
31
 * H&G2JCtL
 
32
 *
 
33
 * 2007-06-01
 
34
 *
 
35
 * This file contains the BLOB streaming interface engines that
 
36
 * are streaming enabled.
 
37
 *
 
38
 */
 
39
#ifndef __PBMS_H__
 
40
#define __PBMS_H__
 
41
 
 
42
#include <stdio.h>
 
43
#include <sys/types.h>
 
44
#include <unistd.h>
 
45
#include <stdlib.h>
 
46
#include <fcntl.h>
 
47
#include <string.h>
 
48
#include <dirent.h>
 
49
#include <signal.h>
 
50
#include <ctype.h>
 
51
#include <errno.h>
 
52
#include <inttypes.h>
 
53
#include <stdint.h>
 
54
 
 
55
#ifdef USE_PRAGMA_INTERFACE
 
56
#pragma interface                       /* gcc class implementation */
 
57
#endif
 
58
 
 
59
 
 
60
/* Identification and version numbers of the shared memory and engine interface. */
#define MS_SHARED_MEMORY_MAGIC          0x7E9A120C
#define MS_ENGINE_VERSION               3
#define MS_CALLBACK_VERSION             6
#define MS_SHARED_MEMORY_VERSION        2
#define MS_ENGINE_LIST_SIZE             10      /* Number of slots in the shared engine list. */
#define MS_TEMP_FILE_PREFIX             "pbms_temp_"

#define MS_BLOB_HANDLE_SIZE             300

/* File mode bits: read/write for user and group, read only for others. */
#define SH_MASK                         ((S_IRUSR | S_IWUSR) | (S_IRGRP | S_IWGRP) | (S_IROTH))

/* Return codes. */
#define MS_OK                           0
#define MS_ERR_ENGINE                   1       /* Internal engine error. */
#define MS_ERR_UNKNOWN_TABLE            2       /* Returned if the engine cannot open the given table. */
#define MS_ERR_NOT_FOUND                3       /* The BLOB cannot be found. */
#define MS_ERR_TABLE_LOCKED             4       /* Table is currently locked. */
#define MS_ERR_INCORRECT_URL            5
#define MS_ERR_AUTH_FAILED              6
#define MS_ERR_NOT_IMPLEMENTED          7
#define MS_ERR_UNKNOWN_DB               8
#define MS_ERR_REMOVING_REPO            9
#define MS_ERR_DATABASE_DELETED         10
#define MS_ERR_DUPLICATE                11      /* Attempt to insert a duplicate key into a system table. */
#define MS_ERR_INVALID_RECORD           12
#define MS_ERR_RECOVERY_IN_PROGRESS     13
#define MS_ERR_DUPLICATE_DB             14
#define MS_ERR_DUPLICATE_DB_ID          15
#define MS_ERR_INVALID_OPERATION        16
#define MS_ERR_MISSING_CLOUD_REFFERENCE 17
#define MS_ERR_SYSTAB_VERSION           18

/* Lock modes. */
#define MS_LOCK_NONE                    0
#define MS_LOCK_READONLY                1
#define MS_LOCK_READ_WRITE              2

/* Size of the buffer that holds a textual BLOB URL. */
#define PBMS_BLOB_URL_SIZE              120

#define PBMS_FIELD_COL_SIZE             128
#define PBMS_FIELD_COND_SIZE            300

/* Sizes of the message and stack buffers in PBMSResultRec. */
#define MS_RESULT_MESSAGE_SIZE          300
#define MS_RESULT_STACK_SIZE            200
 
102
 
 
103
typedef struct PBMSResultRec {
 
104
        uint8_t                         mr_had_blobs;                                                   /* A flag to indicate if the statement had any PBMS blobs. */
 
105
        int                                             mr_code;                                                                /* Engine specific error code. */ 
 
106
        char                                    mr_message[MS_RESULT_MESSAGE_SIZE];             /* Error message, required if non-zero return code. */
 
107
        char                                    mr_stack[MS_RESULT_STACK_SIZE];                 /* Trace information about where the error occurred. */
 
108
} PBMSResultRec, *PBMSResultPtr;
 
109
 
 
110
 
 
111
 
 
112
/* Binary identification of a BLOB. */
typedef struct PBMSBlobID {
        uint32_t bi_db_id;
        uint64_t bi_blob_size;
        /* Or repo file offset if type = REPO. */
        uint64_t bi_blob_id;
        uint64_t bi_blob_ref_id;
        /* Or repo ID if type = REPO. */
        uint32_t bi_tab_id;
        uint32_t bi_auth_code;
        uint32_t bi_blob_type;
} PBMSBlobIDRec, *PBMSBlobIDPtr;
 
121
 
 
122
 
 
123
/* Decoded form of a textual BLOB URL. */
typedef struct MSBlobURL {
        uint8_t  bu_type;
        uint32_t bu_db_id;
        /* Or repo ID if type = REPO. */
        uint32_t bu_tab_id;
        /* Or repo file offset if type = REPO. */
        uint64_t bu_blob_id;
        uint32_t bu_auth_code;
        uint32_t bu_server_id;
        uint64_t bu_blob_size;
        /* Unique identifier of the blob reference. */
        uint64_t bu_blob_ref_id;
} MSBlobURLRec, *MSBlobURLPtr;
 
133
 
 
134
 
 
135
typedef struct PBMSBlobURL {
 
136
        char                                    bu_data[PBMS_BLOB_URL_SIZE];
 
137
} PBMSBlobURLRec, *PBMSBlobURLPtr;
 
138
 
 
139
/* Description of a streaming enabled engine, as passed to registerEngine(). */
typedef struct PBMSEngineRec {
        /* MS_ENGINE_VERSION */
        int  ms_version;
        /* The index into the engine list. */
        int  ms_index;
        /* TRUE (1) if the engine is being removed. */
        int  ms_removing;
        /* TRUE (1) if the engine is supported directly in the mysql/drizzle handler code. */
        int  ms_internal;
        char ms_engine_name[32];
        /* TRUE (1) if the engine supports transactions. */
        int  ms_has_transactions;
} PBMSEngineRec, *PBMSEnginePtr;
 
147
 
 
148
/*                      2       10              1                       10                      20                      10                              10                      20                              20
 
149
 * Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
 
150
 */
 
151
 
 
152
/*
 * printf()/scanf() format of a BLOB URL. The same format is used to build
 * and to parse URLs.
 *
 * Each string literal is separated from the adjacent PRI* macro by white
 * space: C++11 would otherwise parse the macro name as a user-defined
 * literal suffix.
 */
#ifndef  PRIu64
#define URL_FMT "~*%u%c%u-%llu-%x-%u-%llu-%llu"
#else
#define URL_FMT "~*%" PRIu32 "%c%" PRIu32 "-%" PRIu64 "-%x-%" PRIu32 "-%" PRIu64 "-%" PRIu64 ""
#endif
/* Values of the type character that separates <db_id> from <tab_id>. */
#define MS_URL_TYPE_BLOB        '~'
#define MS_URL_TYPE_REPO        '_'
 
159
class PBMSBlobURLTools
 
160
{
 
161
        public:
 
162
        static bool couldBeURL(const char *blob_url, size_t size, MSBlobURLPtr blob)
 
163
        {
 
164
                if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
 
165
                        MSBlobURLRec ignored_blob;
 
166
                        char    buffer[PBMS_BLOB_URL_SIZE+1];
 
167
                        char    junk[5];
 
168
                        int             scanned;
 
169
                        
 
170
                        if (!blob)
 
171
                                blob = &ignored_blob;
 
172
                        
 
173
                        junk[0] = 0;
 
174
                        if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
 
175
                                memcpy(buffer, blob_url, size);
 
176
                                buffer[size] = 0;
 
177
                                blob_url = buffer;
 
178
                        }
 
179
                        
 
180
                        scanned = sscanf(blob_url, URL_FMT"%4s", 
 
181
                                &blob->bu_db_id, 
 
182
                                &blob->bu_type, 
 
183
                                &blob->bu_tab_id, 
 
184
                                &blob->bu_blob_id, 
 
185
                                &blob->bu_auth_code, 
 
186
                                &blob->bu_server_id, 
 
187
                                &blob->bu_blob_ref_id, 
 
188
                                &blob->bu_blob_size, 
 
189
                                junk);
 
190
                                
 
191
                        if ((scanned != 8) || (blob->bu_type != MS_URL_TYPE_BLOB && blob->bu_type != MS_URL_TYPE_REPO)) {// If junk is found at the end this will also result in an invalid URL. 
 
192
                                //printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
 
193
                                return false;
 
194
                        }
 
195
                
 
196
                        return true;
 
197
                }
 
198
                
 
199
                return false;
 
200
        }
 
201
        
 
202
        static bool couldBeURL(const char *blob_url, MSBlobURLPtr blob)
 
203
        {
 
204
                return couldBeURL(blob_url, strlen(blob_url), blob);
 
205
        }
 
206
        
 
207
        static void buildBlobURL(MSBlobURLPtr blob, PBMSBlobURLPtr url)
 
208
        {
 
209
                snprintf(url->bu_data, PBMS_BLOB_URL_SIZE, URL_FMT,     blob->bu_db_id, 
 
210
                                                                blob->bu_type, 
 
211
                                                                blob->bu_tab_id, 
 
212
                                                                blob->bu_blob_id, 
 
213
                                                                blob->bu_auth_code, 
 
214
                                                                blob->bu_server_id, 
 
215
                                                                blob->bu_blob_ref_id, 
 
216
                                                                blob->bu_blob_size);
 
217
        }
 
218
};
 
219
 
 
220
#ifndef DRIZZLED
 
221
/*
 
222
 * This function should never be called directly, it is called
 
223
 * by deregisterEngine() below.
 
224
 */
 
225
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
 
226
 
 
227
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
 
228
 
 
229
/*
 
230
 * Call this function to store a BLOB in the repository the BLOB's
 
231
 * URL will be returned. The returned URL buffer is expected to be atleast 
 
232
 * PBMS_BLOB_URL_SIZE long.
 
233
 *
 
234
 * The BLOB URL must still be retained or it will automaticly be deleted after a timeout expires.
 
235
 */
 
236
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result);
 
237
 
 
238
/*
 
239
 * Call this function for each BLOB to be retained. When a BLOB is used, the 
 
240
 * URL may be changed. The returned URL buffer is expected to be atleast 
 
241
 * PBMS_BLOB_URL_SIZE long.
 
242
 *
 
243
 * The returned URL must be inserted into the row in place of the given
 
244
 * URL.
 
245
 */
 
246
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
 
247
 
 
248
/*
 
249
 * If a row containing a BLOB is deleted, then the BLOBs in the
 
250
 * row must be released.
 
251
 *
 
252
 * Note: if a table is dropped, all the BLOBs referenced by the
 
253
 * table are automatically released.
 
254
 */
 
255
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
 
256
 
 
257
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
 
258
 
 
259
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result);
 
260
 
 
261
typedef void (*ECCallCompleted)(bool built_in, bool ok);
 
262
 
 
263
typedef struct PBMSCallbacksRec {
 
264
        int                                             cb_version;                                                     /* MS_CALLBACK_VERSION */
 
265
        ECRegisterdFunc                 cb_register;
 
266
        ECDeregisterdFunc               cb_deregister;
 
267
        ECCreateBlobsFunc               cb_create_blob;
 
268
        ECRetainBlobsFunc               cb_retain_blob;
 
269
        ECReleaseBlobFunc               cb_release_blob;
 
270
        ECDropTable                             cb_drop_table;
 
271
        ECRenameTable                   cb_rename_table;
 
272
        ECCallCompleted                 cb_completed;
 
273
} PBMSCallbacksRec, *PBMSCallbacksPtr;
 
274
 
 
275
typedef struct PBMSSharedMemoryRec {
 
276
        int                                             sm_magic;                                                       /* MS_SHARED_MEMORY_MAGIC */
 
277
        int                                             sm_version;                                                     /* MS_SHARED_MEMORY_VERSION */
 
278
        volatile int                    sm_shutdown_lock;                                       /* "Cheap" lock for shutdown! */
 
279
        PBMSCallbacksPtr                sm_callbacks;
 
280
        int                                             sm_reserved1[20];
 
281
        void                                    *sm_reserved2[20];
 
282
        int                                             sm_list_size;
 
283
        int                                             sm_list_len;
 
284
        PBMSEnginePtr                   sm_engine_list[MS_ENGINE_LIST_SIZE];
 
285
} PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
 
286
 
 
287
#ifdef PBMS_API
 
288
 
 
289
class PBMS_API
 
290
{
 
291
private:
 
292
        const char *temp_prefix[3];
 
293
        bool built_in;
 
294
 
 
295
public:
 
296
        PBMS_API(): sharedMemory(NULL) { 
 
297
                int i = 0;
 
298
                temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
 
299
                temp_prefix[i++] = NULL;
 
300
                
 
301
        }
 
302
 
 
303
        ~PBMS_API() { }
 
304
 
 
305
        /*
 
306
         * This method is called by the PBMS engine during startup.
 
307
         */
 
308
        int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
 
309
                int err;
 
310
                
 
311
                deleteTempFiles();
 
312
                err = getSharedMemory(true, result);
 
313
                if (!err)
 
314
                        sharedMemory->sm_callbacks = callbacks;
 
315
                        
 
316
                return err;
 
317
        }
 
318
 
 
319
        /*
 
320
         * This method is called by the PBMS engine during shutdown.
 
321
         */
 
322
        void PBMSShutdown() {
 
323
                
 
324
                if (!sharedMemory)
 
325
                        return;
 
326
                        
 
327
                lock();
 
328
                sharedMemory->sm_callbacks = NULL;
 
329
 
 
330
                bool empty = true;
 
331
                for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
 
332
                        if (sharedMemory->sm_engine_list[i]) 
 
333
                                empty = false;
 
334
                }
 
335
 
 
336
                unlock();
 
337
                
 
338
                if (empty) 
 
339
                        removeSharedMemory();
 
340
        }
 
341
 
 
342
        /*
 
343
         * Register the engine with the Stream Daemon.
 
344
         */
 
345
        int registerEngine(PBMSEnginePtr the_engine, PBMSResultPtr result) {
 
346
                int err;
 
347
 
 
348
                deleteTempFiles();
 
349
 
 
350
                // The first engine to register creates the shared memory.
 
351
                if ((err = getSharedMemory(true, result)))
 
352
                        return err;
 
353
 
 
354
                lock();
 
355
                for (int i=0; i<sharedMemory->sm_list_size; i++) {
 
356
                        if (!sharedMemory->sm_engine_list[i]) {
 
357
                                PBMSEnginePtr engine;
 
358
                                engine = (PBMSEnginePtr) malloc(sizeof(PBMSEngineRec));
 
359
                                if (!engine) {
 
360
                                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Out of memory.");
 
361
                                        err = MS_ERR_ENGINE;
 
362
                                        goto done;
 
363
                                }
 
364
                                memcpy(engine, the_engine, sizeof(PBMSEngineRec));
 
365
                                
 
366
                                sharedMemory->sm_engine_list[i] = engine;
 
367
                                engine->ms_index = i;
 
368
                                if (i >= sharedMemory->sm_list_len)
 
369
                                        sharedMemory->sm_list_len = i+1;
 
370
                                if (sharedMemory->sm_callbacks)
 
371
                                        sharedMemory->sm_callbacks->cb_register(engine);
 
372
                                        
 
373
                                built_in = (engine->ms_internal == 1);
 
374
                                err =  MS_OK;
 
375
                                goto done;
 
376
                        }
 
377
                }
 
378
                
 
379
                result->mr_code = 15010;
 
380
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
 
381
                *result->mr_stack = 0;
 
382
                
 
383
                err = MS_ERR_ENGINE;
 
384
                
 
385
        done:
 
386
                unlock();
 
387
                return err;
 
388
        }
 
389
 
 
390
        void lock() {
 
391
                while (sharedMemory->sm_shutdown_lock)
 
392
                        usleep(10000);
 
393
                sharedMemory->sm_shutdown_lock++;
 
394
                while (sharedMemory->sm_shutdown_lock != 1) {
 
395
                        usleep(random() % 10000);
 
396
                        sharedMemory->sm_shutdown_lock--;
 
397
                        usleep(10000);
 
398
                        sharedMemory->sm_shutdown_lock++;
 
399
                }
 
400
        }
 
401
 
 
402
        void unlock() {
 
403
                sharedMemory->sm_shutdown_lock--;
 
404
        }
 
405
 
 
406
        void deregisterEngine(const char *engine_name) {
 
407
                PBMSResultRec result;
 
408
                int err;
 
409
 
 
410
                if ((err = getSharedMemory(false, &result)))
 
411
                        return;
 
412
 
 
413
                lock();
 
414
 
 
415
                bool empty = true;
 
416
                for (int i=0; i<sharedMemory->sm_list_len; i++) {
 
417
                        PBMSEnginePtr engine = sharedMemory->sm_engine_list[i];
 
418
                        if (engine) {                            
 
419
                                if (strcmp(engine->ms_engine_name, engine_name) == 0) {
 
420
                                        if (sharedMemory->sm_callbacks)
 
421
                                                sharedMemory->sm_callbacks->cb_deregister(engine);
 
422
                                        free(engine);
 
423
                                        sharedMemory->sm_engine_list[i] = NULL;
 
424
                                }
 
425
                                else
 
426
                                        empty = false;
 
427
                        }
 
428
                }
 
429
 
 
430
                unlock();
 
431
 
 
432
                if (empty) 
 
433
                        removeSharedMemory();
 
434
        }
 
435
 
 
436
        void removeSharedMemory() 
 
437
        {
 
438
                const char **prefix = temp_prefix;
 
439
                char    temp_file[100];
 
440
 
 
441
                // Do not remove the sharfed memory until after
 
442
                // the PBMS engine has shutdown.
 
443
                if (sharedMemory->sm_callbacks)
 
444
                        return;
 
445
                        
 
446
                sharedMemory->sm_magic = 0;
 
447
                free(sharedMemory);
 
448
                sharedMemory = NULL;
 
449
                
 
450
                while (*prefix) {
 
451
                        getTempFileName(temp_file, 100, *prefix, getpid());
 
452
                        unlink(temp_file);
 
453
                        prefix++;
 
454
                }
 
455
        }
 
456
        
 
457
        bool isPBMSLoaded()
 
458
        {
 
459
                PBMSResultRec result;
 
460
                if (getSharedMemory(false, &result))
 
461
                        return false;
 
462
                        
 
463
                return (sharedMemory->sm_callbacks != NULL);
 
464
        }
 
465
        
 
466
        int  retainBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
 
467
        {
 
468
                int err;
 
469
                char safe_url[PBMS_BLOB_URL_SIZE+1];
 
470
 
 
471
 
 
472
                if ((err = getSharedMemory(false, result)))
 
473
                        return err;
 
474
 
 
475
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL)) {
 
476
                
 
477
                        if (!sharedMemory->sm_callbacks)  {
 
478
                                ret_blob_url->bu_data[0] = 0;
 
479
                                return MS_OK;
 
480
                        }
 
481
                        err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, result);
 
482
                        if (err)
 
483
                                return err;
 
484
                                
 
485
                        blob_url = ret_blob_url->bu_data;
 
486
                } else {
 
487
                        // Make sure the url is a C string:
 
488
                        if (blob_url[blob_size]) {
 
489
                                memcpy(safe_url, blob_url, blob_size);
 
490
                                safe_url[blob_size] = 0;
 
491
                                blob_url = safe_url;
 
492
                        }
 
493
                }
 
494
                
 
495
 
 
496
                if (!sharedMemory->sm_callbacks) {
 
497
                        result->mr_code = MS_ERR_INCORRECT_URL;
 
498
                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming daemon (PBMS) not installed");
 
499
                        *result->mr_stack = 0;
 
500
                        return MS_ERR_INCORRECT_URL;
 
501
                }
 
502
 
 
503
                return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
 
504
        }
 
505
 
 
506
        int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
 
507
        {
 
508
                int err;
 
509
                char safe_url[PBMS_BLOB_URL_SIZE+1];
 
510
 
 
511
                if ((err = getSharedMemory(false, result)))
 
512
                        return err;
 
513
 
 
514
                if (!sharedMemory->sm_callbacks)
 
515
                        return MS_OK;
 
516
 
 
517
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL))
 
518
                        return MS_OK;
 
519
 
 
520
                if (blob_url[blob_size]) {
 
521
                        memcpy(safe_url, blob_url, blob_size);
 
522
                        safe_url[blob_size] = 0;
 
523
                        blob_url = safe_url;
 
524
                }
 
525
                
 
526
                return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
 
527
        }
 
528
 
 
529
        int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
 
530
        {
 
531
                int err;
 
532
 
 
533
                if ((err = getSharedMemory(false, result)))
 
534
                        return err;
 
535
 
 
536
                if (!sharedMemory->sm_callbacks)
 
537
                        return MS_OK;
 
538
                        
 
539
                return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
 
540
        }
 
541
 
 
542
        int renameTable(const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
 
543
        {
 
544
                int err;
 
545
 
 
546
                if ((err = getSharedMemory(false, result)))
 
547
                        return err;
 
548
 
 
549
                if (!sharedMemory->sm_callbacks)
 
550
                        return MS_OK;
 
551
                        
 
552
                return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_db, to_table, result);
 
553
        }
 
554
 
 
555
        void completed(int ok)
 
556
        {
 
557
                PBMSResultRec result;
 
558
 
 
559
                if (getSharedMemory(false, &result))
 
560
                        return;
 
561
 
 
562
                if (!sharedMemory->sm_callbacks)
 
563
                        return;
 
564
                        
 
565
                sharedMemory->sm_callbacks->cb_completed(built_in, ok);
 
566
        }
 
567
        
 
568
        volatile PBMSSharedMemoryPtr sharedMemory;
 
569
 
 
570
private:
 
571
        int getSharedMemory(bool create, PBMSResultPtr result)
 
572
        {
 
573
                int             tmp_f;
 
574
                int             r;
 
575
                char    temp_file[100];
 
576
                const char      **prefix = temp_prefix;
 
577
 
 
578
                if (sharedMemory)
 
579
                        return MS_OK;
 
580
 
 
581
                while (*prefix) {
 
582
                        getTempFileName(temp_file, 100, *prefix, getpid());
 
583
                        tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
 
584
                        if (tmp_f == -1)
 
585
                                return setOSResult(errno, "open", temp_file, result);
 
586
 
 
587
                        r = lseek(tmp_f, 0, SEEK_SET);
 
588
                        if (r == -1) {
 
589
                                close(tmp_f);
 
590
                                return setOSResult(errno, "lseek", temp_file, result);
 
591
                        }
 
592
                        ssize_t tfer;
 
593
                        char buffer[100];
 
594
                        
 
595
                        tfer = read(tmp_f, buffer, 100);
 
596
                        if (tfer == -1) {
 
597
                                close(tmp_f);
 
598
                                return setOSResult(errno, "read", temp_file, result);
 
599
                        }
 
600
 
 
601
                        buffer[tfer] = 0;
 
602
                        sscanf(buffer, "%p", (void**) &sharedMemory);
 
603
                        if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
 
604
                                if (!create)
 
605
                                        return MS_OK;
 
606
 
 
607
                                sharedMemory = (PBMSSharedMemoryPtr) calloc(1, sizeof(PBMSSharedMemoryRec));
 
608
                                sharedMemory->sm_magic = MS_SHARED_MEMORY_MAGIC;
 
609
                                sharedMemory->sm_version = MS_SHARED_MEMORY_VERSION;
 
610
                                sharedMemory->sm_list_size = MS_ENGINE_LIST_SIZE;
 
611
 
 
612
                                r = lseek(tmp_f, 0, SEEK_SET);
 
613
                                if (r == -1) {
 
614
                                        close(tmp_f);
 
615
                                        return setOSResult(errno, "fseek", temp_file, result);
 
616
                                }
 
617
 
 
618
                                snprintf(buffer, 100, "%p", (void*) sharedMemory);
 
619
                                tfer = write(tmp_f, buffer, strlen(buffer));
 
620
                                if (tfer != (ssize_t) strlen(buffer)) {
 
621
                                        close(tmp_f);
 
622
                                        return setOSResult(errno, "write", temp_file, result);
 
623
                                }
 
624
                                r = fsync(tmp_f);
 
625
                                if (r == -1) {
 
626
                                        close(tmp_f);
 
627
                                        return setOSResult(errno, "fsync", temp_file, result);
 
628
                                }
 
629
                        }
 
630
                        else if (sharedMemory->sm_version != MS_SHARED_MEMORY_VERSION) {
 
631
                                close(tmp_f);
 
632
                                result->mr_code = -1000;
 
633
                                *result->mr_stack = 0;
 
634
                                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Shared memory version: ");          
 
635
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, sharedMemory->sm_version);           
 
636
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ", does not match engine shared memory version: ");          
 
637
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, MS_SHARED_MEMORY_VERSION);           
 
638
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ".");                
 
639
                                return MS_ERR_ENGINE;
 
640
                        }
 
641
                        close(tmp_f);
 
642
                        
 
643
                        // For backward compatability we need to create the old versions but we only need to read the current version.
 
644
                        if (create)
 
645
                                prefix++;
 
646
                        else
 
647
                                break;
 
648
                }
 
649
                return MS_OK;
 
650
        }
 
651
 
 
652
        void strcpy(size_t size, char *to, const char *from)
 
653
        {
 
654
                if (size > 0) {
 
655
                        size--;
 
656
                        while (*from && size--)
 
657
                                *to++ = *from++;
 
658
                        *to = 0;
 
659
                }
 
660
        }
 
661
 
 
662
        void strcat(size_t size, char *to, const char *from)
 
663
        {
 
664
                while (*to && size--) to++;
 
665
                strcpy(size, to, from);
 
666
        }
 
667
 
 
668
        void strcat(size_t size, char *to, int val)
 
669
        {
 
670
                char buffer[20];
 
671
 
 
672
                snprintf(buffer, 20, "%d", val);
 
673
                strcat(size, to, buffer);
 
674
        }
 
675
 
 
676
        int setOSResult(int err, const char *func, char *file, PBMSResultPtr result) {
 
677
                char *msg;
 
678
 
 
679
                result->mr_code = err;
 
680
                *result->mr_stack = 0;
 
681
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "System call ");             
 
682
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, func);               
 
683
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "() failed on ");            
 
684
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, file);               
 
685
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ": ");               
 
686
 
 
687
#ifdef XT_WIN
 
688
                if (FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, iMessage + strlen(iMessage), MS_RESULT_MESSAGE_SIZE - strlen(iMessage), NULL)) {
 
689
                        char *ptr;
 
690
 
 
691
                        ptr = &iMessage[strlen(iMessage)];
 
692
                        while (ptr-1 > err_msg) {
 
693
                                if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
 
694
                                        break;
 
695
                                ptr--;
 
696
                        }
 
697
                        *ptr = 0;
 
698
 
 
699
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
 
700
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
701
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
 
702
                        return MS_ERR_ENGINE;
 
703
                }
 
704
#endif
 
705
 
 
706
                msg = strerror(err);
 
707
                if (msg) {
 
708
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, msg);
 
709
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
 
710
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
711
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
 
712
                }
 
713
                else {
 
714
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Unknown OS error code ");
 
715
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
716
                }
 
717
 
 
718
                return MS_ERR_ENGINE;
 
719
        }
 
720
 
 
721
        void getTempFileName(char *temp_file, int buffer_size, const char * prefix, int pid)
 
722
        {
 
723
                snprintf(temp_file, buffer_size, "/tmp/%s%d", prefix,  pid);
 
724
        }
 
725
 
 
726
        bool startsWith(const char *cstr, const char *w_cstr)
 
727
        {
 
728
                while (*cstr && *w_cstr) {
 
729
                        if (*cstr != *w_cstr)
 
730
                                return false;
 
731
                        cstr++;
 
732
                        w_cstr++;
 
733
                }
 
734
                return *cstr || !*w_cstr;
 
735
        }
 
736
 
 
737
        void deleteTempFiles()
 
738
        {
 
739
                struct dirent   entry;
 
740
                struct dirent   *result;
 
741
                DIR                             *odir;
 
742
                int                             err;
 
743
                char                    temp_file[100];
 
744
 
 
745
                if (!(odir = opendir("/tmp/")))
 
746
                        return;
 
747
                err = readdir_r(odir, &entry, &result);
 
748
                while (!err && result) {
 
749
                        const char **prefix = temp_prefix;
 
750
                        
 
751
                        while (*prefix) {
 
752
                                if (startsWith(entry.d_name, *prefix)) {
 
753
                                        int pid = atoi(entry.d_name + strlen(*prefix));
 
754
                                        
 
755
                                        /* If the process does not exist: */
 
756
                                        if (kill(pid, 0) == -1 && errno == ESRCH) {
 
757
                                                getTempFileName(temp_file, 100, *prefix, pid);
 
758
                                                unlink(temp_file);
 
759
                                        }
 
760
                                }
 
761
                                prefix++;
 
762
                        }
 
763
                        
 
764
                        err = readdir_r(odir, &entry, &result);
 
765
                }
 
766
                closedir(odir);
 
767
        }
 
768
};
 
769
#endif // PBMS_API
 
770
 
 
771
/*
 
772
 * The following is a low level API for accessing blobs directly.
 
773
 */
 
774
 
 
775
 
 
776
/*
 
777
 * Any threads using the direct blob access API must first register themselves with the
 
778
 * blob streaming engine before using the blob access functions. This is done by calling
 
779
 * PBMSInitBlobStreamingThread(). Call PBMSDeinitBlobStreamingThread() after the thread is
 
780
 * done using the direct blob access API
 
781
 */
 
782
 
 
783
/* 
 
784
* PBMSInitBlobStreamingThread(): Returns a pointer to a blob streaming thread.
 
785
*/
 
786
extern void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result);
 
787
extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
 
788
 
 
789
/* 
 
790
* PBMSGetError():Gets the last error reported by a blob streaming thread.
 
791
*/
 
792
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
 
793
 
 
794
/* 
 
795
* PBMSCreateBlob():Creates a new blob in the database of the given size.
 
796
*/
 
797
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
 
798
 
 
799
/* 
 
800
* PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chunks of 
 
801
* data written to the blob must match the size specified when the blob was created.
 
802
*/
 
803
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
 
804
 
 
805
/* 
 
806
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
 
807
*/
 
808
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
 
809
 
 
810
/*
 
811
* PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be at least PBMS_BLOB_URL_SIZE bytes in size.
 
812
*/
 
813
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
 
814
 
 
815
/*
 
816
* PBMSURLToID():Convert a blob URL to a blob ID.
 
817
*/
 
818
extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);
 
819
#endif //DRIZZLED 
 
820
 
 
821
 
 
822
#endif //__PBMS_H__