~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/pbms.h

lp:drizzle + pbxt 1.1 + test results

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2007 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 * H&G2JCtL
 
22
 *
 
23
 * 2007-06-01
 
24
 *
 
25
 * This file contains the BLOB streaming interface for engines that
 
26
 * are streaming enabled.
 
27
 *
 
28
 */
 
29
#ifndef __streaming_unx_h__
 
30
#define __streaming_unx_h__
 
31
 
 
32
#include <stdio.h>
 
33
#include <sys/types.h>
 
34
#include <unistd.h>
 
35
#include <stdlib.h>
 
36
#include <fcntl.h>
 
37
#include <string.h>
 
38
#include <dirent.h>
 
39
#include <signal.h>
 
40
#include <ctype.h>
 
41
#include <errno.h>
 
42
 
 
43
 
 
44
#ifdef USE_PRAGMA_INTERFACE
 
45
#pragma interface                       /* gcc class implementation */
 
46
#endif
 
47
 
 
48
/*                      2       10              1                       10                      20                      10                              10                      20                              20
 
49
 * Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
 
50
 */
 
51
//If URL_FMT changes do not forget to update couldBeURL() in this file.
 
52
 
 
53
#define URL_FMT "~*%lu%c%lu-%llu-%lx-%lu-%llu-%llu"
 
54
 
 
55
#define MS_SHARED_MEMORY_MAGIC                  0x7E9A120C
 
56
#define MS_ENGINE_VERSION                               1
 
57
#define MS_CALLBACK_VERSION                             4
 
58
#define MS_SHARED_MEMORY_VERSION                2
 
59
#define MS_ENGINE_LIST_SIZE                             10
 
60
#define MS_TEMP_FILE_PREFIX                             "pbms_temp_"
 
61
 
 
62
#define MS_BLOB_HANDLE_SIZE                             300
 
63
 
 
64
#define SH_MASK                                                 ((S_IRUSR | S_IWUSR) | (S_IRGRP | S_IWGRP) | (S_IROTH))
 
65
 
 
66
#define MS_OK                                                   0
 
67
#define MS_ERR_ENGINE                                   1                                                       /* Internal engine error. */
 
68
#define MS_ERR_UNKNOWN_TABLE                    2                                                       /* Returned if the engine cannot open the given table. */
 
69
#define MS_ERR_NOT_FOUND                                3                                                       /* The BLOB cannot be found. */
 
70
#define MS_ERR_TABLE_LOCKED                             4                                                       /* Table is currently locked. */
 
71
#define MS_ERR_INCORRECT_URL                    5
 
72
#define MS_ERR_AUTH_FAILED                              6
 
73
#define MS_ERR_NOT_IMPLEMENTED                  7
 
74
#define MS_ERR_UNKNOWN_DB                               8
 
75
#define MS_ERR_REMOVING_REPO                    9
 
76
#define MS_ERR_DATABASE_DELETED                 10
 
77
#define MS_ERR_DUPLICATE                                11                                              /* Attempt to insert a duplicate key into a system table. */
 
78
#define MS_ERR_INVALID_RECORD                   12
 
79
#define MS_ERR_RECOVERY_IN_PROGRESS             13
 
80
#define MS_ERR_DUPLICATE_DB                             14
 
81
#define MS_ERR_DUPLICATE_DB_ID                  15
 
82
#define MS_ERR_INVALID_OPERATION                16
 
83
 
 
84
#define MS_LOCK_NONE                                    0
 
85
#define MS_LOCK_READONLY                                1
 
86
#define MS_LOCK_READ_WRITE                              2
 
87
 
 
88
#define PBMS_BLOB_URL_SIZE                              120
 
89
 
 
90
#define PBMS_FIELD_COL_SIZE                             128
 
91
#define PBMS_FIELD_COND_SIZE                    300
 
92
 
 
93
#define MS_RESULT_MESSAGE_SIZE                  300
 
94
#define MS_RESULT_STACK_SIZE                    200
 
95
 
 
96
typedef struct PBMSResultRec {
 
97
        int                                             mr_code;                                                                /* Engine specific error code. */ 
 
98
        char                                    mr_message[MS_RESULT_MESSAGE_SIZE];             /* Error message, required if non-zero return code. */
 
99
        char                                    mr_stack[MS_RESULT_STACK_SIZE];                 /* Trace information about where the error occurred. */
 
100
} PBMSResultRec, *PBMSResultPtr;
 
101
 
 
102
 
 
103
 
 
104
typedef struct PBMSBlobID {
 
105
        u_int32_t                               bi_db_id;       
 
106
        u_int64_t                               bi_blob_size;   
 
107
        u_int64_t                               bi_blob_id;                             // or repo file offset if type = REPO
 
108
        u_int64_t                               bi_blob_ref_id;                 
 
109
        u_int32_t                               bi_tab_id;                              // or repo ID if type = REPO
 
110
        u_int32_t                               bi_auth_code;
 
111
        u_int32_t                               bi_blob_type;
 
112
} PBMSBlobIDRec, *PBMSBlobIDPtr;
 
113
 
 
114
typedef struct PBMSBlobURL {
 
115
        char                                    bu_data[PBMS_BLOB_URL_SIZE];
 
116
} PBMSBlobURLRec, *PBMSBlobURLPtr;
 
117
 
 
118
typedef struct PBMSEngineRec {
 
119
        int                                             ms_version;                                                     /* MS_ENGINE_VERSION */
 
120
        int                                             ms_index;                                                       /* The index into the engine list. */
 
121
        int                                             ms_removing;                                            /* TRUE (1) if the engine is being removed. */
 
122
        int                                             ms_internal;                                            /* TRUE (1) if the engine is supported directly in the mysq/drizzle handler code . */
 
123
        char                                    ms_engine_name[32];
 
124
} PBMSEngineRec, *PBMSEnginePtr;
 
125
 
 
126
/*
 
127
 * This function should never be called directly, it is called
 
128
 * by deregisterEngine() below.
 
129
 */
 
130
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
 
131
 
 
132
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
 
133
 
 
134
/*
 
135
 * Call this function to store a BLOB in the repository the BLOB's
 
136
 * URL will be returned. The returned URL buffer is expected to be at least
 
137
 * PBMS_BLOB_URL_SIZE long.
 
138
 *
 
139
 * The BLOB URL must still be retained or it will automatically be deleted after a timeout expires.
 
140
 */
 
141
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, char *blob_url, unsigned short col_index, PBMSResultPtr result);
 
142
 
 
143
/*
 
144
 * Call this function for each BLOB to be retained. When a BLOB is used, the 
 
145
 * URL may be changed. The returned URL buffer is expected to be at least
 
146
 * PBMS_BLOB_URL_SIZE long.
 
147
 *
 
148
 * The returned URL must be inserted into the row in place of the given
 
149
 * URL.
 
150
 */
 
151
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
 
152
 
 
153
/*
 
154
 * If a row containing a BLOB is deleted, then the BLOBs in the
 
155
 * row must be released.
 
156
 *
 
157
 * Note: if a table is dropped, all the BLOBs referenced by the
 
158
 * table are automatically released.
 
159
 */
 
160
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
 
161
 
 
162
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
 
163
 
 
164
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result);
 
165
 
 
166
typedef void (*ECCallCompleted)(bool built_in, bool ok);
 
167
 
 
168
typedef struct PBMSCallbacksRec {
 
169
        int                                             cb_version;                                                     /* MS_CALLBACK_VERSION */
 
170
        ECRegisterdFunc                 cb_register;
 
171
        ECDeregisterdFunc               cb_deregister;
 
172
        ECCreateBlobsFunc               cb_create_blob;
 
173
        ECRetainBlobsFunc               cb_retain_blob;
 
174
        ECReleaseBlobFunc               cb_release_blob;
 
175
        ECDropTable                             cb_drop_table;
 
176
        ECRenameTable                   cb_rename_table;
 
177
        ECCallCompleted                 cb_completed;
 
178
} PBMSCallbacksRec, *PBMSCallbacksPtr;
 
179
 
 
180
typedef struct PBMSSharedMemoryRec {
 
181
        int                                             sm_magic;                                                       /* MS_SHARED_MEMORY_MAGIC */
 
182
        int                                             sm_version;                                                     /* MS_SHARED_MEMORY_VERSION */
 
183
        volatile int                    sm_shutdown_lock;                                       /* "Cheap" lock for shutdown! */
 
184
        PBMSCallbacksPtr                sm_callbacks;
 
185
        int                                             sm_reserved1[20];
 
186
        void                                    *sm_reserved2[20];
 
187
        int                                             sm_list_size;
 
188
        int                                             sm_list_len;
 
189
        PBMSEnginePtr                   sm_engine_list[MS_ENGINE_LIST_SIZE];
 
190
} PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
 
191
 
 
192
#ifdef PBMS_API
 
193
 
 
194
class PBMS_API
 
195
{
 
196
private:
 
197
        const char *temp_prefix[3];
 
198
        bool built_in;
 
199
 
 
200
public:
 
201
        PBMS_API(): sharedMemory(NULL) { 
 
202
                int i = 0;
 
203
                temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
 
204
                temp_prefix[i++] = NULL;
 
205
                
 
206
        }
 
207
 
 
208
        ~PBMS_API() { }
 
209
 
 
210
        /*
 
211
         * This method is called by the PBMS engine during startup.
 
212
         */
 
213
        int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
 
214
                int err;
 
215
                
 
216
                deleteTempFiles();
 
217
                err = getSharedMemory(true, result);
 
218
                if (!err)
 
219
                        sharedMemory->sm_callbacks = callbacks;
 
220
                        
 
221
                return err;
 
222
        }
 
223
 
 
224
        /*
 
225
         * This method is called by the PBMS engine during startup.
 
226
         */
 
227
        void PBMSShutdown() {
 
228
                
 
229
                if (!sharedMemory)
 
230
                        return;
 
231
                        
 
232
                lock();
 
233
                sharedMemory->sm_callbacks = NULL;
 
234
 
 
235
                bool empty = true;
 
236
                for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
 
237
                        if (sharedMemory->sm_engine_list[i]) 
 
238
                                empty = false;
 
239
                }
 
240
 
 
241
                unlock();
 
242
                
 
243
                if (empty) 
 
244
                        removeSharedMemory();
 
245
        }
 
246
 
 
247
        /*
 
248
         * Register the engine with the Stream Engine.
 
249
         */
 
250
        int registerEngine(PBMSEnginePtr engine, PBMSResultPtr result) {
 
251
                int err;
 
252
 
 
253
                deleteTempFiles();
 
254
 
 
255
                // The first engine to register creates the shared memory.
 
256
                if ((err = getSharedMemory(true, result)))
 
257
                        return err;
 
258
 
 
259
                for (int i=0; i<sharedMemory->sm_list_size; i++) {
 
260
                        if (!sharedMemory->sm_engine_list[i]) {
 
261
                                sharedMemory->sm_engine_list[i] = engine;
 
262
                                engine->ms_index = i;
 
263
                                if (i >= sharedMemory->sm_list_len)
 
264
                                        sharedMemory->sm_list_len = i+1;
 
265
                                if (sharedMemory->sm_callbacks)
 
266
                                        sharedMemory->sm_callbacks->cb_register(engine);
 
267
                                        
 
268
                                built_in = (engine->ms_internal == 1);
 
269
                                return MS_OK;
 
270
                        }
 
271
                }
 
272
                
 
273
                result->mr_code = 15010;
 
274
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
 
275
                *result->mr_stack = 0;
 
276
                return MS_ERR_ENGINE;
 
277
        }
 
278
 
 
279
        /*
         * Acquire the "cheap" shutdown lock kept in the shared memory.
         *
         * Polls (sleeping between checks) until sm_shutdown_lock reads zero
         * and then increments it.  If the counter is not exactly 1 after the
         * increment, the method pauses for a random interval, undoes its
         * increment, pauses again and re-increments, repeating until the
         * counter reads 1.
         *
         * sharedMemory is dereferenced without a NULL check.
         * NOTE(review): the ++/-- on the volatile int are ordinary
         * read-modify-write operations -- confirm this is sufficient for the
         * intended callers.
         */
        void lock() {
 
280
                while (sharedMemory->sm_shutdown_lock)
 
281
                        usleep(10000);
 
282
                sharedMemory->sm_shutdown_lock++;
 
283
                while (sharedMemory->sm_shutdown_lock != 1) {
 
284
                        usleep(random() % 10000);
 
285
                        sharedMemory->sm_shutdown_lock--;
 
286
                        usleep(10000);
 
287
                        sharedMemory->sm_shutdown_lock++;
 
288
                }
 
289
        }
 
290
 
 
291
        /*
         * Release the shutdown lock by decrementing sm_shutdown_lock.
         * sharedMemory is dereferenced without a NULL check.
         */
        void unlock() {
 
292
                sharedMemory->sm_shutdown_lock--;
 
293
        }
 
294
 
 
295
        void deregisterEngine(PBMSEnginePtr engine) {
 
296
                PBMSResultRec result;
 
297
                int err;
 
298
 
 
299
                if ((err = getSharedMemory(false, &result)))
 
300
                        return;
 
301
 
 
302
                lock();
 
303
 
 
304
                bool empty = true;
 
305
                for (int i=0; i<sharedMemory->sm_list_len; i++) {
 
306
                        if (sharedMemory->sm_engine_list[i]) {
 
307
                                if (sharedMemory->sm_engine_list[i] == engine) {
 
308
                                        if (sharedMemory->sm_callbacks)
 
309
                                                sharedMemory->sm_callbacks->cb_deregister(engine);
 
310
                                        sharedMemory->sm_engine_list[i] = NULL;
 
311
                                }
 
312
                                else
 
313
                                        empty = false;
 
314
                        }
 
315
                }
 
316
 
 
317
                unlock();
 
318
 
 
319
                if (empty) 
 
320
                        removeSharedMemory();
 
321
        }
 
322
 
 
323
        void removeSharedMemory() 
 
324
        {
 
325
                const char **prefix = temp_prefix;
 
326
                char    temp_file[100];
 
327
 
 
328
                // Do not remove the sharfed memory until after
 
329
                // the PBMS engine has shutdown.
 
330
                if (sharedMemory->sm_callbacks)
 
331
                        return;
 
332
                        
 
333
                sharedMemory->sm_magic = 0;
 
334
                free(sharedMemory);
 
335
                sharedMemory = NULL;
 
336
                
 
337
                while (*prefix) {
 
338
                        getTempFileName(temp_file, *prefix, getpid());
 
339
                        unlink(temp_file);
 
340
                        prefix++;
 
341
                }
 
342
        }
 
343
        
 
344
        int couldBeURL(char *blob_url, int size)
 
345
        {
 
346
                if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
 
347
                        char                            buffer[PBMS_BLOB_URL_SIZE+1];
 
348
                        unsigned long           db_id = 0;
 
349
                        unsigned long           tab_id = 0;
 
350
                        unsigned long long      blob_id = 0;
 
351
                        unsigned long long      blob_ref_id = 0;
 
352
                        unsigned long long      blob_size = 0;
 
353
                        unsigned long           auth_code = 0;
 
354
                        unsigned long           server_id = 0;
 
355
                        char                            type, junk[5];
 
356
                        int                                     scanned;
 
357
 
 
358
                        junk[0] = 0;
 
359
                        if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
 
360
                                memcpy(buffer, blob_url, size);
 
361
                                buffer[size] = 0;
 
362
                                blob_url = buffer;
 
363
                        }
 
364
                        
 
365
                        scanned = sscanf(blob_url, URL_FMT"%4s", &db_id, &type, &tab_id, &blob_id, &auth_code, &server_id, &blob_ref_id, &blob_size, junk);
 
366
                        if (scanned != 8) {// If junk is found at the end this will also result in an invalid URL. 
 
367
                                printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
 
368
                                return 0;
 
369
                        }
 
370
                        
 
371
                        if (junk[0] || (type != '~' && type != '_')) {
 
372
                                printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
 
373
                                return 0;
 
374
                        }
 
375
                
 
376
                        return 1;
 
377
                }
 
378
                
 
379
                return 0;
 
380
        }
 
381
        
 
382
        int  retainBlob(const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
 
383
        {
 
384
                int err;
 
385
                char safe_url[PBMS_BLOB_URL_SIZE+1];
 
386
 
 
387
 
 
388
                if ((err = getSharedMemory(false, result)))
 
389
                        return err;
 
390
 
 
391
                if (!couldBeURL(blob_url, blob_size)) {
 
392
                
 
393
                        if (!sharedMemory->sm_callbacks)  {
 
394
                                *ret_blob_url = 0;
 
395
                                return MS_OK;
 
396
                        }
 
397
                        err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, col_index, result);
 
398
                        if (err)
 
399
                                return err;
 
400
                                
 
401
                        blob_url = ret_blob_url;
 
402
                } else {
 
403
                        // Make sure the url is a C string:
 
404
                        if (blob_url[blob_size]) {
 
405
                                memcpy(safe_url, blob_url, blob_size);
 
406
                                safe_url[blob_size] = 0;
 
407
                                blob_url = safe_url;
 
408
                        }
 
409
                }
 
410
                
 
411
 
 
412
                if (!sharedMemory->sm_callbacks) {
 
413
                        result->mr_code = MS_ERR_INCORRECT_URL;
 
414
                        strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming engine (PBMS) not installed");
 
415
                        *result->mr_stack = 0;
 
416
                        return MS_ERR_INCORRECT_URL;
 
417
                }
 
418
 
 
419
                return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
 
420
        }
 
421
 
 
422
        int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
 
423
        {
 
424
                int err;
 
425
                char safe_url[PBMS_BLOB_URL_SIZE+1];
 
426
 
 
427
                if ((err = getSharedMemory(false, result)))
 
428
                        return err;
 
429
 
 
430
                if (!sharedMemory->sm_callbacks)
 
431
                        return MS_OK;
 
432
 
 
433
                if (!couldBeURL(blob_url, blob_size))
 
434
                        return MS_OK;
 
435
 
 
436
                if (blob_url[blob_size]) {
 
437
                        memcpy(safe_url, blob_url, blob_size);
 
438
                        safe_url[blob_size] = 0;
 
439
                        blob_url = safe_url;
 
440
                }
 
441
                
 
442
                return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
 
443
        }
 
444
 
 
445
        int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
 
446
        {
 
447
                int err;
 
448
 
 
449
                if ((err = getSharedMemory(false, result)))
 
450
                        return err;
 
451
 
 
452
                if (!sharedMemory->sm_callbacks)
 
453
                        return MS_OK;
 
454
                        
 
455
                return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
 
456
        }
 
457
 
 
458
        int renameTable(const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result)
 
459
        {
 
460
                int err;
 
461
 
 
462
                if ((err = getSharedMemory(false, result)))
 
463
                        return err;
 
464
 
 
465
                if (!sharedMemory->sm_callbacks)
 
466
                        return MS_OK;
 
467
                        
 
468
                return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_table, result);
 
469
        }
 
470
 
 
471
        void completed(int ok)
 
472
        {
 
473
                PBMSResultRec result;
 
474
 
 
475
                if (getSharedMemory(false, &result))
 
476
                        return;
 
477
 
 
478
                if (!sharedMemory->sm_callbacks)
 
479
                        return;
 
480
                        
 
481
                sharedMemory->sm_callbacks->cb_completed(built_in, ok);
 
482
        }
 
483
        
 
484
        volatile PBMSSharedMemoryPtr sharedMemory;
 
485
 
 
486
private:
 
487
        int getSharedMemory(bool create, PBMSResultPtr result)
 
488
        {
 
489
                int             tmp_f;
 
490
                int             r;
 
491
                char    temp_file[100];
 
492
                const char      **prefix = temp_prefix;
 
493
 
 
494
                if (sharedMemory)
 
495
                        return MS_OK;
 
496
 
 
497
                while (*prefix) {
 
498
                        getTempFileName(temp_file, *prefix, getpid());
 
499
                        tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
 
500
                        if (tmp_f == -1)
 
501
                                return setOSResult(errno, "open", temp_file, result);
 
502
 
 
503
                        r = lseek(tmp_f, 0, SEEK_SET);
 
504
                        if (r == -1) {
 
505
                                close(tmp_f);
 
506
                                return setOSResult(errno, "lseek", temp_file, result);
 
507
                        }
 
508
                        ssize_t tfer;
 
509
                        char buffer[100];
 
510
                        
 
511
                        tfer = read(tmp_f, buffer, 100);
 
512
                        if (tfer == -1) {
 
513
                                close(tmp_f);
 
514
                                return setOSResult(errno, "read", temp_file, result);
 
515
                        }
 
516
 
 
517
                        buffer[tfer] = 0;
 
518
                        sscanf(buffer, "%p", &sharedMemory);
 
519
                        if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
 
520
                                if (!create)
 
521
                                        return MS_OK;
 
522
 
 
523
                                sharedMemory = (PBMSSharedMemoryPtr) calloc(1, sizeof(PBMSSharedMemoryRec));
 
524
                                sharedMemory->sm_magic = MS_SHARED_MEMORY_MAGIC;
 
525
                                sharedMemory->sm_version = MS_SHARED_MEMORY_VERSION;
 
526
                                sharedMemory->sm_list_size = MS_ENGINE_LIST_SIZE;
 
527
 
 
528
                                r = lseek(tmp_f, 0, SEEK_SET);
 
529
                                if (r == -1) {
 
530
                                        close(tmp_f);
 
531
                                        return setOSResult(errno, "fseek", temp_file, result);
 
532
                                }
 
533
 
 
534
                                sprintf(buffer, "%p", sharedMemory);
 
535
                                tfer = write(tmp_f, buffer, strlen(buffer));
 
536
                                if (tfer != strlen(buffer)) {
 
537
                                        close(tmp_f);
 
538
                                        return setOSResult(errno, "write", temp_file, result);
 
539
                                }
 
540
                                r = fsync(tmp_f);
 
541
                                if (r == -1) {
 
542
                                        close(tmp_f);
 
543
                                        return setOSResult(errno, "fsync", temp_file, result);
 
544
                                }
 
545
                        }
 
546
                        else if (sharedMemory->sm_version != MS_SHARED_MEMORY_VERSION) {
 
547
                                close(tmp_f);
 
548
                                result->mr_code = -1000;
 
549
                                *result->mr_stack = 0;
 
550
                                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Shared memory version: ");          
 
551
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, sharedMemory->sm_version);           
 
552
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ", does not match engine shared memory version: ");          
 
553
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, MS_SHARED_MEMORY_VERSION);           
 
554
                                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ".");                
 
555
                                return MS_ERR_ENGINE;
 
556
                        }
 
557
                        close(tmp_f);
 
558
                        
 
559
                        // For backward compatability we need to create the old versions but we only need to read the current version.
 
560
                        if (create)
 
561
                                prefix++;
 
562
                        else
 
563
                                break;
 
564
                }
 
565
                return MS_OK;
 
566
        }
 
567
 
 
568
        void strcpy(size_t size, char *to, const char *from)
 
569
        {
 
570
                if (size > 0) {
 
571
                        size--;
 
572
                        while (*from && size--)
 
573
                                *to++ = *from++;
 
574
                        *to = 0;
 
575
                }
 
576
        }
 
577
 
 
578
        void strcat(size_t size, char *to, const char *from)
 
579
        {
 
580
                while (*to && size--) to++;
 
581
                strcpy(size, to, from);
 
582
        }
 
583
 
 
584
        void strcat(size_t size, char *to, int val)
 
585
        {
 
586
                char buffer[100];
 
587
 
 
588
                sprintf(buffer, "%d", val);
 
589
                strcat(size, to, buffer);
 
590
        }
 
591
 
 
592
        int setOSResult(int err, const char *func, char *file, PBMSResultPtr result) {
 
593
                char *msg;
 
594
 
 
595
                result->mr_code = err;
 
596
                *result->mr_stack = 0;
 
597
                strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "System call ");             
 
598
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, func);               
 
599
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "() failed on ");            
 
600
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, file);               
 
601
                strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ": ");               
 
602
 
 
603
#ifdef XT_WIN
 
604
                if (FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, iMessage + strlen(iMessage), MS_RESULT_MESSAGE_SIZE - strlen(iMessage), NULL)) {
 
605
                        char *ptr;
 
606
 
 
607
                        ptr = &iMessage[strlen(iMessage)];
 
608
                        while (ptr-1 > err_msg) {
 
609
                                if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
 
610
                                        break;
 
611
                                ptr--;
 
612
                        }
 
613
                        *ptr = 0;
 
614
 
 
615
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
 
616
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
617
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
 
618
                        return MS_ERR_ENGINE;
 
619
                }
 
620
#endif
 
621
 
 
622
                msg = strerror(err);
 
623
                if (msg) {
 
624
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, msg);
 
625
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
 
626
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
627
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
 
628
                }
 
629
                else {
 
630
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Unknown OS error code ");
 
631
                        strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
 
632
                }
 
633
 
 
634
                return MS_ERR_ENGINE;
 
635
        }
 
636
 
 
637
        void getTempFileName(char *temp_file, const char * prefix, int pid)
 
638
        {
 
639
                sprintf(temp_file, "/tmp/%s%d", prefix,  pid);
 
640
        }
 
641
 
 
642
        bool startsWith(const char *cstr, const char *w_cstr)
 
643
        {
 
644
                while (*cstr && *w_cstr) {
 
645
                        if (*cstr != *w_cstr)
 
646
                                return false;
 
647
                        cstr++;
 
648
                        w_cstr++;
 
649
                }
 
650
                return *cstr || !*w_cstr;
 
651
        }
 
652
 
 
653
        void deleteTempFiles()
 
654
        {
 
655
                struct dirent   *entry;
 
656
                struct dirent   *result;
 
657
                DIR                             *odir;
 
658
                int                             err;
 
659
                size_t                  sz;
 
660
                char                    temp_file[100];
 
661
 
 
662
#ifdef __sun
 
663
                sz = sizeof(struct dirent) + pathconf("/tmp/", _PC_NAME_MAX); // Solaris, see readdir(3C)
 
664
#else
 
665
                sz = sizeof(struct dirent);
 
666
#endif
 
667
                if (!(entry = (struct dirent *) malloc(sz)))
 
668
                        return;
 
669
                if (!(odir = opendir("/tmp/")))
 
670
                        return;
 
671
                err = readdir_r(odir, entry, &result);
 
672
                while (!err && result) {
 
673
                        const char **prefix = temp_prefix;
 
674
                        
 
675
                        while (*prefix) {
 
676
                                if (startsWith(entry->d_name, *prefix)) {
 
677
                                        int pid = atoi(entry->d_name + strlen(*prefix));
 
678
                                        
 
679
                                        /* If the process does not exist: */
 
680
                                        if (kill(pid, 0) == -1 && errno == ESRCH) {
 
681
                                                getTempFileName(temp_file, *prefix, pid);
 
682
                                                unlink(temp_file);
 
683
                                        }
 
684
                                }
 
685
                                prefix++;
 
686
                        }
 
687
                        
 
688
                        err = readdir_r(odir, entry, &result);
 
689
                }
 
690
                closedir(odir);
 
691
                free(entry);
 
692
        }
 
693
};
 
694
#endif // PBMS_API
 
695
 
 
696
/*
 
697
 * The following is a low level API for accessing blobs directly.
 
698
 */
 
699
 
 
700
 
 
701
/*
 
702
 * Any threads using the direct blob access API must first register themselves with the
 
703
 * blob streaming engine before using the blob access functions. This is done by calling
 
704
 * PBMSInitBlobStreamingThread(). Call PBMSDeinitBlobStreamingThread() after the thread is
 
705
 * done using the direct blob access API.
 
706
 */
 
707
 
 
708
/* 
 
709
* PBMSInitBlobStreamingThread(): Returns a pointer to a blob streaming thread.
 
710
*/
 
711
extern void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result);
 
712
extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
 
713
 
 
714
/* 
 
715
* PBMSGetError():Gets the last error reported by a blob streaming thread.
 
716
*/
 
717
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
 
718
 
 
719
/* 
 
720
* PBMSCreateBlob():Creates a new blob in the database of the given size.
 
721
*/
 
722
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
 
723
 
 
724
/* 
 
725
* PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chunks of 
 
726
* data written to the blob must match the size specified when the blob was created.
 
727
*/
 
728
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
 
729
 
 
730
/* 
 
731
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
 
732
*/
 
733
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
 
734
 
 
735
/*
 
736
* PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be at least PBMS_BLOB_URL_SIZE bytes in size.
 
737
*/
 
738
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
 
739
 
 
740
/*
 
741
* PBMSURLToID():Convert a blob URL to a blob ID.
 
742
*/
 
743
extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);
 
744
 
 
745
#endif