~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/pbms.h

  • Committer: Daniel Nichter
  • Date: 2011-10-23 16:01:37 UTC
  • mto: This revision was merged to the branch mainline in revision 2448.
  • Revision ID: daniel@percona.com-20111023160137-7ac3blgz8z4tf8za
Add Administration Getting Started and Logging.  Capitalize SQL clause keywords.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2010 PrimeBase Technologies GmbH
 
2
 * All rights reserved.
 
3
 * 
 
4
 * Redistribution and use in source and binary forms, with or without 
 
5
 * modification, are permitted provided that the following conditions are met:
 
6
 * 
 
7
 *     * Redistributions of source code must retain the above copyright notice, 
 
8
 *              this list of conditions and the following disclaimer.
 
9
 *     * Redistributions in binary form must reproduce the above copyright notice, 
 
10
 *              this list of conditions and the following disclaimer in the documentation 
 
11
 *              and/or other materials provided with the distribution.
 
12
 *     * Neither the name of the "PrimeBase Technologies GmbH" nor the names of its 
 
13
 *              contributors may be used to endorse or promote products derived from this 
 
14
 *              software without specific prior written permission.
 
15
 * 
 
16
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 
 
17
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
 
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
 
19
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 
 
20
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 
 
21
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 
22
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
 
23
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 
24
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 
25
 * POSSIBILITY OF SUCH DAMAGE. 
 
26
 *  
 
27
 * PrimeBase Media Stream for MySQL and Drizzle
 
28
 *
 
29
 * Original author: Paul McCullagh
 
30
 * Continued development: Barry Leslie
 
31
 * H&G2JCtL
 
32
 *
 
33
 * 2007-06-01
 
34
 *
 
35
 * This file contains the BLOB streaming interface engines that
 
36
 * are streaming enabled.
 
37
 *
 
38
 */
 
39
#pragma once
 
40
#ifndef __PBMS_H__
 
41
#define __PBMS_H__
 
42
 
 
43
#include <stdio.h>
 
44
#include <sys/types.h>
 
45
#include <unistd.h>
 
46
#include <stdlib.h>
 
47
#include <fcntl.h>
 
48
#include <string.h>
 
49
#include <dirent.h>
 
50
#include <signal.h>
 
51
#include <ctype.h>
 
52
#include <errno.h>
 
53
#include <inttypes.h>
 
54
#include <stdint.h>
 
55
 
 
56
#ifdef USE_PRAGMA_INTERFACE
 
57
#pragma interface                       /* gcc class implementation */
 
58
#endif
 
59
 
 
60
 
 
61
/* Identification and versioning of the engine <-> PBMS interface. */
#define MS_SHARED_MEMORY_MAGIC          0x7E9A120C
#define MS_ENGINE_VERSION               3
#define MS_CALLBACK_VERSION             6
#define MS_SHARED_MEMORY_VERSION        2
#define MS_ENGINE_LIST_SIZE             10
#define MS_TEMP_FILE_PREFIX             "pbms_temp_"

#define MS_BLOB_HANDLE_SIZE             300

/* Mode bits passed to open(): read/write for user and group, read for others. */
#define SH_MASK                         ((S_IRUSR | S_IWUSR) | (S_IRGRP | S_IWGRP) | (S_IROTH))

/* Result codes. */
#define MS_OK                           0
#define MS_ERR_ENGINE                   1       /* Internal engine error. */
#define MS_ERR_UNKNOWN_TABLE            2       /* Returned if the engine cannot open the given table. */
#define MS_ERR_NOT_FOUND                3       /* The BLOB cannot be found. */
#define MS_ERR_TABLE_LOCKED             4       /* Table is currently locked. */
#define MS_ERR_INCORRECT_URL            5
#define MS_ERR_AUTH_FAILED              6
#define MS_ERR_NOT_IMPLEMENTED          7
#define MS_ERR_UNKNOWN_DB               8
#define MS_ERR_REMOVING_REPO            9
#define MS_ERR_DATABASE_DELETED         10
#define MS_ERR_DUPLICATE                11      /* Attempt to insert a duplicate key into a system table. */
#define MS_ERR_INVALID_RECORD           12
#define MS_ERR_RECOVERY_IN_PROGRESS     13
#define MS_ERR_DUPLICATE_DB             14
#define MS_ERR_DUPLICATE_DB_ID          15
#define MS_ERR_INVALID_OPERATION        16
#define MS_ERR_MISSING_CLOUD_REFFERENCE 17
#define MS_ERR_SYSTAB_VERSION           18

/* Lock modes. */
#define MS_LOCK_NONE                    0
#define MS_LOCK_READONLY                1
#define MS_LOCK_READ_WRITE              2

/* Size of the character buffer that holds a BLOB URL (see PBMSBlobURL). */
#define PBMS_BLOB_URL_SIZE              120

#define PBMS_FIELD_COL_SIZE             128
#define PBMS_FIELD_COND_SIZE            300

/* Sizes of the message and stack buffers in PBMSResultRec. */
#define MS_RESULT_MESSAGE_SIZE          300
#define MS_RESULT_STACK_SIZE            200
 
103
 
 
104
typedef struct PBMSResultRec {
 
105
        uint8_t                         mr_had_blobs;                                                   /* A flag to indicate if the statement had any PBMS blobs. */
 
106
        int                                             mr_code;                                                                /* Engine specific error code. */ 
 
107
        char                                    mr_message[MS_RESULT_MESSAGE_SIZE];             /* Error message, required if non-zero return code. */
 
108
        char                                    mr_stack[MS_RESULT_STACK_SIZE];                 /* Trace information about where the error occurred. */
 
109
} PBMSResultRec, *PBMSResultPtr;
 
110
 
 
111
 
 
112
 
 
113
/*
 * Numeric identification of a BLOB.
 */
typedef struct PBMSBlobID {
        uint32_t bi_db_id;
        uint64_t bi_blob_size;
        uint64_t bi_blob_id;            /* or repo file offset if type = REPO */
        uint64_t bi_blob_ref_id;
        uint32_t bi_tab_id;             /* or repo ID if type = REPO */
        uint32_t bi_auth_code;
        uint32_t bi_blob_type;
} PBMSBlobIDRec, *PBMSBlobIDPtr;
 
122
 
 
123
 
 
124
/*
 * Decoded form of a BLOB URL: one field per component of the URL_FMT text layout.
 */
typedef struct MSBlobURL {
        uint8_t  bu_type;
        uint32_t bu_db_id;
        uint32_t bu_tab_id;             /* or repo ID if type = REPO */
        uint64_t bu_blob_id;            /* or repo file offset if type = REPO */
        uint32_t bu_auth_code;
        uint32_t bu_server_id;
        uint64_t bu_blob_size;
        uint64_t bu_blob_ref_id;        /* Unique identifier of the blob reference */
} MSBlobURLRec, *MSBlobURLPtr;
 
134
 
 
135
 
 
136
typedef struct PBMSBlobURL {
 
137
        char                                    bu_data[PBMS_BLOB_URL_SIZE];
 
138
} PBMSBlobURLRec, *PBMSBlobURLPtr;
 
139
 
 
140
/*
 * Description of a storage engine that registers itself for BLOB streaming.
 */
typedef struct PBMSEngineRec {
        /* MS_ENGINE_VERSION */
        int  ms_version;
        /* The index into the engine list. */
        int  ms_index;
        /* TRUE (1) if the engine is being removed. */
        int  ms_removing;
        /* TRUE (1) if the engine is supported directly in the MySQL/Drizzle handler code. */
        int  ms_internal;
        char ms_engine_name[32];
        /* TRUE (1) if the engine supports transactions. */
        int  ms_has_transactions;
} PBMSEngineRec, *PBMSEnginePtr;
 
148
 
 
149
/*                      2       10              1                       10                      20                      10                              10                      20                              20
 
150
 * Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
 
151
 */
 
152
 
 
153
/*
 * printf-style layout of a BLOB URL.
 *
 * The spaces between the string literals and the PRI* macros are required:
 * without them C++11 reads `"..."PRIu32` as a string literal with a
 * user-defined-literal suffix instead of a literal followed by a macro.
 */
#ifndef PRIu64
#define URL_FMT "~*%u%c%u-%llu-%x-%u-%llu-%llu"
#else
#define URL_FMT "~*%" PRIu32 "%c%" PRIu32 "-%" PRIu64 "-%x-%" PRIu32 "-%" PRIu64 "-%" PRIu64 ""
#endif
/* The two values accepted for the URL type character. */
#define MS_URL_TYPE_BLOB        '~'
#define MS_URL_TYPE_REPO        '_'
 
160
class PBMSBlobURLTools
 
161
{
 
162
        public:
 
163
        static bool couldBeURL(const char *blob_url, size_t size, MSBlobURLPtr blob)
 
164
        {
 
165
                if (blob_url && (size < PBMS_BLOB_URL_SIZE) && (size > 16)) {
 
166
                        MSBlobURLRec ignored_blob;
 
167
                        char    buffer[PBMS_BLOB_URL_SIZE+1];
 
168
                        char    junk[5];
 
169
                        int             scanned;
 
170
                        
 
171
                        if (!blob)
 
172
                                blob = &ignored_blob;
 
173
                        
 
174
                        junk[0] = 0;
 
175
                        
 
176
                        // There is no guarantee that the URL will be null terminated
 
177
                        // so always copy it into our own buffer to be safe.
 
178
                        memcpy(buffer, blob_url, size);
 
179
                        buffer[size] = 0;
 
180
                        blob_url = buffer;
 
181
                        
 
182
                        scanned = sscanf(blob_url, URL_FMT"%4s", 
 
183
                                &blob->bu_db_id, 
 
184
                                &blob->bu_type, 
 
185
                                &blob->bu_tab_id, 
 
186
                                &blob->bu_blob_id, 
 
187
                                &blob->bu_auth_code, 
 
188
                                &blob->bu_server_id, 
 
189
                                &blob->bu_blob_ref_id, 
 
190
                                &blob->bu_blob_size, 
 
191
                                junk);
 
192
                                
 
193
                        if ((scanned != 8) || (blob->bu_type != MS_URL_TYPE_BLOB && blob->bu_type != MS_URL_TYPE_REPO)) {// If junk is found at the end this will also result in an invalid URL. 
 
194
                                //printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
 
195
                                return false;
 
196
                        }
 
197
                
 
198
                        return true;
 
199
                }
 
200
                
 
201
                return false;
 
202
        }
 
203
        
 
204
        static bool couldBeURL(const char *blob_url, MSBlobURLPtr blob)
 
205
        {
 
206
                return couldBeURL(blob_url, strlen(blob_url), blob);
 
207
        }
 
208
        
 
209
        static void buildBlobURL(MSBlobURLPtr blob, PBMSBlobURLPtr url)
 
210
        {
 
211
                snprintf(url->bu_data, PBMS_BLOB_URL_SIZE, URL_FMT,     blob->bu_db_id, 
 
212
                                                                blob->bu_type, 
 
213
                                                                blob->bu_tab_id, 
 
214
                                                                blob->bu_blob_id, 
 
215
                                                                blob->bu_auth_code, 
 
216
                                                                blob->bu_server_id, 
 
217
                                                                blob->bu_blob_ref_id, 
 
218
                                                                blob->bu_blob_size);
 
219
        }
 
220
};
 
221
 
 
222
#ifndef DRIZZLED
 
223
/*
 
224
 * This function should never be called directly, it is called
 
225
 * by deregisterEngine() below.
 
226
 */
 
227
/* Type of PBMSCallbacksRec::cb_register: receives the engine record being registered. */
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
 
228
 
 
229
/* Type of PBMSCallbacksRec::cb_deregister: receives the engine record being removed. */
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
 
230
 
 
231
/*
 
232
 * Call this function to store a BLOB in the repository; the BLOB's
 
233
 * URL will be returned. The returned URL buffer is expected to be at least
 
234
 * PBMS_BLOB_URL_SIZE long.
 
235
 *
 
236
 * The BLOB URL must still be retained or it will automatically be deleted after a timeout expires.
 
237
 */
 
238
/*
 * Type of PBMSCallbacksRec::cb_create_blob.
 * blob / blob_len give the BLOB data, blob_url receives the new URL, and
 * result receives error details. A non-zero return is treated as an error.
 */
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result);
 
239
 
 
240
/*
 
241
 * Call this function for each BLOB to be retained. When a BLOB is used, the 
 
242
 * URL may be changed. The returned URL buffer is expected to be at least
 
243
 * PBMS_BLOB_URL_SIZE long.
 
244
 *
 
245
 * The returned URL must be inserted into the row in place of the given
 
246
 * URL.
 
247
 */
 
248
/*
 * Type of PBMSCallbacksRec::cb_retain_blob.
 * blob_url is the URL to retain, ret_blob_url receives the URL to store in
 * the row, and col_index is passed through from the caller.
 */
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
 
249
 
 
250
/*
 
251
 * If a row containing a BLOB is deleted, then the BLOBs in the
 
252
 * row must be released.
 
253
 *
 
254
 * Note: if a table is dropped, all the BLOBs referenced by the
 
255
 * table are automatically released.
 
256
 */
 
257
/* Type of PBMSCallbacksRec::cb_release_blob: blob_url is the URL of the BLOB being released. */
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
 
258
 
 
259
/* Type of PBMSCallbacksRec::cb_drop_table: names the table being dropped. */
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
 
260
 
 
261
/* Type of PBMSCallbacksRec::cb_rename_table: source database/table followed by target database/table. */
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result);
 
262
 
 
263
/* Type of PBMSCallbacksRec::cb_completed: `ok` carries the caller's success flag. */
typedef void (*ECCallCompleted)(bool built_in, bool ok);
 
264
 
 
265
typedef struct PBMSCallbacksRec {
 
266
        int                                             cb_version;                                                     /* MS_CALLBACK_VERSION */
 
267
        ECRegisterdFunc                 cb_register;
 
268
        ECDeregisterdFunc               cb_deregister;
 
269
        ECCreateBlobsFunc               cb_create_blob;
 
270
        ECRetainBlobsFunc               cb_retain_blob;
 
271
        ECReleaseBlobFunc               cb_release_blob;
 
272
        ECDropTable                             cb_drop_table;
 
273
        ECRenameTable                   cb_rename_table;
 
274
        ECCallCompleted                 cb_completed;
 
275
} PBMSCallbacksRec, *PBMSCallbacksPtr;
 
276
 
 
277
typedef struct PBMSSharedMemoryRec {
 
278
        int                                             sm_magic;                                                       /* MS_SHARED_MEMORY_MAGIC */
 
279
        int                                             sm_version;                                                     /* MS_SHARED_MEMORY_VERSION */
 
280
        volatile int                    sm_shutdown_lock;                                       /* "Cheap" lock for shutdown! */
 
281
        PBMSCallbacksPtr                sm_callbacks;
 
282
        int                                             sm_reserved1[20];
 
283
        void                                    *sm_reserved2[20];
 
284
        int                                             sm_list_size;
 
285
        int                                             sm_list_len;
 
286
        PBMSEnginePtr                   sm_engine_list[MS_ENGINE_LIST_SIZE];
 
287
} PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
 
288
 
 
289
#ifdef PBMS_API
 
290
 
 
291
class PBMS_API
 
292
{
 
293
private:
 
294
        const char *temp_prefix[3];
 
295
        bool built_in;
 
296
 
 
297
public:
 
298
        PBMS_API(): sharedMemory(NULL) { 
 
299
                int i = 0;
 
300
                temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
 
301
                temp_prefix[i++] = NULL;
 
302
                
 
303
        }
 
304
 
 
305
        /* Empty destructor: the shared memory is released by removeSharedMemory(), not here. */
        ~PBMS_API() { }
 
306
 
 
307
        /*
 
308
         * This method is called by the PBMS engine during startup.
 
309
         */
 
310
        int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
 
311
                int err;
 
312
                
 
313
                deleteTempFiles();
 
314
                err = getSharedMemory(true, result);
 
315
                if (!err)
 
316
                        sharedMemory->sm_callbacks = callbacks;
 
317
                        
 
318
                return err;
 
319
        }
 
320
 
 
321
        /*
 
322
         * This method is called by the PBMS engine during shutdown.
 
323
         */
 
324
        void PBMSShutdown() {
 
325
                
 
326
                if (!sharedMemory)
 
327
                        return;
 
328
                        
 
329
                lock();
 
330
                sharedMemory->sm_callbacks = NULL;
 
331
 
 
332
                bool empty = true;
 
333
                for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
 
334
                        if (sharedMemory->sm_engine_list[i]) 
 
335
                                empty = false;
 
336
                }
 
337
 
 
338
                unlock();
 
339
                
 
340
                if (empty) 
 
341
                        removeSharedMemory();
 
342
        }
 
343
 
 
344
        /*
 
345
         * Register the engine with the Stream Daemon.
 
346
         */
 
347
        int registerEngine(PBMSEnginePtr the_engine, PBMSResultPtr result) {
 
348
                int err;
 
349
 
 
350
                deleteTempFiles();
 
351
 
 
352
                // The first engine to register creates the shared memory.
 
353
                if ((err = getSharedMemory(true, result)))
 
354
                        return err;
 
355
 
 
356
                lock();
 
357
                for (int i=0; i<sharedMemory->sm_list_size; i++) {
 
358
                        if (!sharedMemory->sm_engine_list[i]) {
 
359
                                PBMSEnginePtr engine;
 
360
                                engine = (PBMSEnginePtr) malloc(sizeof(PBMSEngineRec));
 
361
                                if (!engine) {
 
362
                                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Out of memory.");
 
363
                                        err = MS_ERR_ENGINE;
 
364
                                        goto done;
 
365
                                }
 
366
                                memcpy(engine, the_engine, sizeof(PBMSEngineRec));
 
367
                                
 
368
                                sharedMemory->sm_engine_list[i] = engine;
 
369
                                engine->ms_index = i;
 
370
                                if (i >= sharedMemory->sm_list_len)
 
371
                                        sharedMemory->sm_list_len = i+1;
 
372
                                if (sharedMemory->sm_callbacks)
 
373
                                        sharedMemory->sm_callbacks->cb_register(engine);
 
374
                                        
 
375
                                built_in = (engine->ms_internal == 1);
 
376
                                err =  MS_OK;
 
377
                                goto done;
 
378
                        }
 
379
                }
 
380
                
 
381
                result->mr_code = 15010;
 
382
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
 
383
                *result->mr_stack = 0;
 
384
                
 
385
                err = MS_ERR_ENGINE;
 
386
                
 
387
        done:
 
388
                unlock();
 
389
                return err;
 
390
        }
 
391
 
 
392
        void lock() {
 
393
                while (sharedMemory->sm_shutdown_lock)
 
394
                        usleep(10000);
 
395
                sharedMemory->sm_shutdown_lock++;
 
396
                while (sharedMemory->sm_shutdown_lock != 1) {
 
397
                        usleep(random() % 10000);
 
398
                        sharedMemory->sm_shutdown_lock--;
 
399
                        usleep(10000);
 
400
                        sharedMemory->sm_shutdown_lock++;
 
401
                }
 
402
        }
 
403
 
 
404
        void unlock() {
 
405
                sharedMemory->sm_shutdown_lock--;
 
406
        }
 
407
 
 
408
        void deregisterEngine(const char *engine_name) {
 
409
                PBMSResultRec result;
 
410
                int err;
 
411
 
 
412
                if ((err = getSharedMemory(false, &result)))
 
413
                        return;
 
414
 
 
415
                lock();
 
416
 
 
417
                bool empty = true;
 
418
                for (int i=0; i<sharedMemory->sm_list_len; i++) {
 
419
                        PBMSEnginePtr engine = sharedMemory->sm_engine_list[i];
 
420
                        if (engine) {                            
 
421
                                if (strcmp(engine->ms_engine_name, engine_name) == 0) {
 
422
                                        if (sharedMemory->sm_callbacks)
 
423
                                                sharedMemory->sm_callbacks->cb_deregister(engine);
 
424
                                        free(engine);
 
425
                                        sharedMemory->sm_engine_list[i] = NULL;
 
426
                                }
 
427
                                else
 
428
                                        empty = false;
 
429
                        }
 
430
                }
 
431
 
 
432
                unlock();
 
433
 
 
434
                if (empty) 
 
435
                        removeSharedMemory();
 
436
        }
 
437
 
 
438
        void removeSharedMemory() 
 
439
        {
 
440
                const char **prefix = temp_prefix;
 
441
                char    temp_file[100];
 
442
 
 
443
                // Do not remove the sharfed memory until after
 
444
                // the PBMS engine has shutdown.
 
445
                if (sharedMemory->sm_callbacks)
 
446
                        return;
 
447
                        
 
448
                sharedMemory->sm_magic = 0;
 
449
                free(sharedMemory);
 
450
                sharedMemory = NULL;
 
451
                
 
452
                while (*prefix) {
 
453
                        getTempFileName(temp_file, 100, *prefix, getpid());
 
454
                        unlink(temp_file);
 
455
                        prefix++;
 
456
                }
 
457
        }
 
458
        
 
459
        bool isPBMSLoaded()
 
460
        {
 
461
                PBMSResultRec result;
 
462
                if (getSharedMemory(false, &result))
 
463
                        return false;
 
464
                        
 
465
                return (sharedMemory->sm_callbacks != NULL);
 
466
        }
 
467
        
 
468
        int  retainBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
 
469
        {
 
470
                int err;
 
471
                char safe_url[PBMS_BLOB_URL_SIZE+1];
 
472
 
 
473
 
 
474
                if ((err = getSharedMemory(false, result)))
 
475
                        return err;
 
476
 
 
477
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL)) {
 
478
                
 
479
                        if (!sharedMemory->sm_callbacks)  {
 
480
                                ret_blob_url->bu_data[0] = 0;
 
481
                                return MS_OK;
 
482
                        }
 
483
                        err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, result);
 
484
                        if (err)
 
485
                                return err;
 
486
                                
 
487
                        blob_url = ret_blob_url->bu_data;
 
488
                } else {
 
489
                        // Make sure the url is a C string:
 
490
                        if (blob_url[blob_size]) {
 
491
                                memcpy(safe_url, blob_url, blob_size);
 
492
                                safe_url[blob_size] = 0;
 
493
                                blob_url = safe_url;
 
494
                        }
 
495
                }
 
496
                
 
497
 
 
498
                if (!sharedMemory->sm_callbacks) {
 
499
                        result->mr_code = MS_ERR_INCORRECT_URL;
 
500
                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming daemon (PBMS) not installed");
 
501
                        *result->mr_stack = 0;
 
502
                        return MS_ERR_INCORRECT_URL;
 
503
                }
 
504
 
 
505
                return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
 
506
        }
 
507
 
 
508
        int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
 
509
        {
 
510
                int err;
 
511
                char safe_url[PBMS_BLOB_URL_SIZE+1];
 
512
 
 
513
                if ((err = getSharedMemory(false, result)))
 
514
                        return err;
 
515
 
 
516
                if (!sharedMemory->sm_callbacks)
 
517
                        return MS_OK;
 
518
 
 
519
                if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL))
 
520
                        return MS_OK;
 
521
 
 
522
                if (blob_url[blob_size]) {
 
523
                        memcpy(safe_url, blob_url, blob_size);
 
524
                        safe_url[blob_size] = 0;
 
525
                        blob_url = safe_url;
 
526
                }
 
527
                
 
528
                return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
 
529
        }
 
530
 
 
531
        int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
 
532
        {
 
533
                int err;
 
534
 
 
535
                if ((err = getSharedMemory(false, result)))
 
536
                        return err;
 
537
 
 
538
                if (!sharedMemory->sm_callbacks)
 
539
                        return MS_OK;
 
540
                        
 
541
                return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
 
542
        }
 
543
 
 
544
        int renameTable(const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
 
545
        {
 
546
                int err;
 
547
 
 
548
                if ((err = getSharedMemory(false, result)))
 
549
                        return err;
 
550
 
 
551
                if (!sharedMemory->sm_callbacks)
 
552
                        return MS_OK;
 
553
                        
 
554
                return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_db, to_table, result);
 
555
        }
 
556
 
 
557
        void completed(int ok)
 
558
        {
 
559
                PBMSResultRec result;
 
560
 
 
561
                if (getSharedMemory(false, &result))
 
562
                        return;
 
563
 
 
564
                if (!sharedMemory->sm_callbacks)
 
565
                        return;
 
566
                        
 
567
                sharedMemory->sm_callbacks->cb_completed(built_in, ok);
 
568
        }
 
569
        
 
570
        volatile PBMSSharedMemoryPtr sharedMemory;
 
571
 
 
572
private:
 
573
        int getSharedMemory(bool create, PBMSResultPtr result)
 
574
        {
 
575
                int             tmp_f;
 
576
                int             r;
 
577
                char    temp_file[100];
 
578
                const char      **prefix = temp_prefix;
 
579
 
 
580
                if (sharedMemory)
 
581
                        return MS_OK;
 
582
 
 
583
                while (*prefix) {
 
584
                        getTempFileName(temp_file, 100, *prefix, getpid());
 
585
                        tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
 
586
                        if (tmp_f == -1)
 
587
                                return setOSResult(errno, "open", temp_file, result);
 
588
 
 
589
                        r = lseek(tmp_f, 0, SEEK_SET);
 
590
                        if (r == -1) {
 
591
                                close(tmp_f);
 
592
                                return setOSResult(errno, "lseek", temp_file, result);
 
593
                        }
 
594
                        ssize_t tfer;
 
595
                        char buffer[100];
 
596
                        
 
597
                        tfer = read(tmp_f, buffer, 100);
 
598
                        if (tfer == -1) {
 
599
                                close(tmp_f);
 
600
                                return setOSResult(errno, "read", temp_file, result);
 
601
                        }
 
602
 
 
603
                        buffer[tfer] = 0;
 
604
                        sscanf(buffer, "%p", (void**) &sharedMemory);
 
605
                        if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
 
606
                                if (!create)
 
607
                                        return MS_OK;
 
608
 
 
609
                                sharedMemory = (PBMSSharedMemoryPtr) calloc(1, sizeof(PBMSSharedMemoryRec));
 
610
                                sharedMemory->sm_magic = MS_SHARED_MEMORY_MAGIC;
 
611
                                sharedMemory->sm_version = MS_SHARED_MEMORY_VERSION;
 
612
                                sharedMemory->sm_list_size = MS_ENGINE_LIST_SIZE;
 
613
 
 
614
                                r = lseek(tmp_f, 0, SEEK_SET);
 
615
                                if (r == -1) {
 
616
                                        close(tmp_f);
 
617
                                        return setOSResult(errno, "fseek", temp_file, result);
 
618
                                }
 
619
 
 
620
                                snprintf(buffer, 100, "%p", (void*) sharedMemory);
 
621
                                tfer = write(tmp_f, buffer, strlen(buffer));
 
622
                                if (tfer != (ssize_t) strlen(buffer)) {
 
623
                                        close(tmp_f);
 
624
                                        return setOSResult(errno, "write", temp_file, result);
 
625
                                }
 
626
                                r = fsync(tmp_f);
 
627
                                if (r == -1) {
 
628
                                        close(tmp_f);
 
629
                                        return setOSResult(errno, "fsync", temp_file, result);
 
630
                                }
 
631
                        }
 
632
                        else if (sharedMemory->sm_version != MS_SHARED_MEMORY_VERSION) {
 
633
                                close(tmp_f);
 
634
                                result->mr_code = -1000;
 
635
                                *result->mr_stack = 0;
 
636
                                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Shared memory version: ");          
 
637
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, sharedMemory->sm_version);           
 
638
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ", does not match engine shared memory version: ");          
 
639
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, MS_SHARED_MEMORY_VERSION);           
 
640
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ".");                
 
641
                                return MS_ERR_ENGINE;
 
642
                        }
 
643
                        close(tmp_f);
 
644
                        
 
645
                        // For backward compatability we need to create the old versions but we only need to read the current version.
 
646
                        if (create)
 
647
                                prefix++;
 
648
                        else
 
649
                                break;
 
650
                }
 
651
                return MS_OK;
 
652
        }
 
653
 
 
654
        void strcpy(size_t size, char *to, const char *from)
 
655
        {
 
656
                if (size > 0) {
 
657
                        size--;
 
658
                        while (*from && size--)
 
659
                                *to++ = *from++;
 
660
                        *to = 0;
 
661
                }
 
662
        }
 
663
 
 
664
        void strcat(size_t size, char *to, const char *from)
 
665
        {
 
666
                while (*to && size--) to++;
 
667
                strcpy(size, to, from);
 
668
        }
 
669
 
 
670
        void strcat(size_t size, char *to, int val)
 
671
        {
 
672
                char buffer[20];
 
673
 
 
674
                snprintf(buffer, 20, "%d", val);
 
675
                strcat(size, to, buffer);
 
676
        }
 
677
 
 
678
        int setOSResult(int err, const char *func, char *file, PBMSResultPtr result) {
 
679
                char *msg;
 
680
 
 
681
                result->mr_code = err;
 
682
                *result->mr_stack = 0;
 
683
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "System call ");             
 
684
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, func);               
 
685
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "() failed on ");            
 
686
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, file);               
 
687
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ": ");               
 
688
 
 
689
#ifdef XT_WIN
 
690
                if (FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, iMessage + strlen(iMessage), MS_RESULT_MESSAGE_SIZE - strlen(iMessage), NULL)) {
 
691
                        char *ptr;
 
692
 
 
693
                        ptr = &iMessage[strlen(iMessage)];
 
694
                        while (ptr-1 > err_msg) {
 
695
                                if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
 
696
                                        break;
 
697
                                ptr--;
 
698
                        }
 
699
                        *ptr = 0;
 
700
 
 
701
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
 
702
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
703
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
 
704
                        return MS_ERR_ENGINE;
 
705
                }
 
706
#endif
 
707
 
 
708
                msg = strerror(err);
 
709
                if (msg) {
 
710
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, msg);
 
711
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
 
712
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
713
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
 
714
                }
 
715
                else {
 
716
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Unknown OS error code ");
 
717
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
718
                }
 
719
 
 
720
                return MS_ERR_ENGINE;
 
721
        }
 
722
 
 
723
        void getTempFileName(char *temp_file, int buffer_size, const char * prefix, int pid)
 
724
        {
 
725
                snprintf(temp_file, buffer_size, "/tmp/%s%d", prefix,  pid);
 
726
        }
 
727
 
 
728
        bool startsWith(const char *cstr, const char *w_cstr)
 
729
        {
 
730
                while (*cstr && *w_cstr) {
 
731
                        if (*cstr != *w_cstr)
 
732
                                return false;
 
733
                        cstr++;
 
734
                        w_cstr++;
 
735
                }
 
736
                return *cstr || !*w_cstr;
 
737
        }
 
738
 
 
739
        void deleteTempFiles()
 
740
        {
 
741
                struct dirent   entry;
 
742
                struct dirent   *result;
 
743
                DIR                             *odir;
 
744
                int                             err;
 
745
                char                    temp_file[100];
 
746
 
 
747
                if (!(odir = opendir("/tmp/")))
 
748
                        return;
 
749
                err = readdir_r(odir, &entry, &result);
 
750
                while (!err && result) {
 
751
                        const char **prefix = temp_prefix;
 
752
                        
 
753
                        while (*prefix) {
 
754
                                if (startsWith(entry.d_name, *prefix)) {
 
755
                                        int pid = atoi(entry.d_name + strlen(*prefix));
 
756
                                        
 
757
                                        /* If the process does not exist: */
 
758
                                        if (kill(pid, 0) == -1 && errno == ESRCH) {
 
759
                                                getTempFileName(temp_file, 100, *prefix, pid);
 
760
                                                unlink(temp_file);
 
761
                                        }
 
762
                                }
 
763
                                prefix++;
 
764
                        }
 
765
                        
 
766
                        err = readdir_r(odir, &entry, &result);
 
767
                }
 
768
                closedir(odir);
 
769
        }
 
770
};
 
771
#endif // PBMS_API
 
772
 
 
773
/*
 
774
 * The following is a low level API for accessing blobs directly.
 
775
 */
 
776
 
 
777
 
 
778
/*
 
779
 * Any threads using the direct blob access API must first register themselves with the
 
780
 * blob streaming engine before using the blob access functions. This is done by calling
 
781
 * PBMSInitBlobStreamingThread(). Call PBMSDeinitBlobStreamingThread() after the thread is
 
782
 * done using the direct blob access API
 
783
 */
 
784
 
 
785
/* 
 
786
* PBMSInitBlobStreamingThread(): Returns a pointer to a blob streaming thread.
 
787
*/
 
788
extern void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result);
 
789
extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
 
790
 
 
791
/* 
 
792
* PBMSGetError():Gets the last error reported by a blob streaming thread.
 
793
*/
 
794
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
 
795
 
 
796
/* 
 
797
* PBMSCreateBlob():Creates a new blob in the database of the given size.
 
798
*/
 
799
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
 
800
 
 
801
/* 
 
802
* PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chunks of 
 
803
* data written to the blob must match the size specified when the blob was created.
 
804
*/
 
805
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
 
806
 
 
807
/* 
 
808
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
 
809
*/
 
810
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
 
811
 
 
812
/*
 
813
* PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be at least PBMS_BLOB_URL_SIZE bytes in size.
 
814
*/
 
815
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
 
816
 
 
817
/*
 
818
* PBMSURLToID():Convert a blob URL to a blob ID.
 
819
*/
 
820
extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);
 
821
#endif //DRIZZLED 
 
822
 
 
823
 
 
824
#endif //__PBMS_H__