1
/* Copyright (c) 2007 PrimeBase Technologies GmbH
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
25
* This file contains the BLOB streaming interface engines that
26
* are streaming enabled.
29
#ifndef __streaming_unx_h__
30
#define __streaming_unx_h__
33
#include <sys/types.h>
44
#ifdef USE_PRAGMA_INTERFACE
45
#pragma interface /* gcc class implementation */
48
/* 2 10 1 10 20 10 10 20 20
49
* Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
51
//If URL_FMT changes do not forget to update couldBeURL() in this file.
53
/* printf/sscanf-style layout of a BLOB URL. couldBeURL() scans with this format followed by "%4s" to catch trailing junk. */
#define URL_FMT "~*%lu%c%lu-%llu-%lx-%lu-%llu-%llu"
55
/* Value stored in PBMSSharedMemoryRec.sm_magic; getSharedMemory() treats any other value as "no valid record". */
#define MS_SHARED_MEMORY_MAGIC 0x7E9A120C
56
/* Expected value of PBMSEngineRec.ms_version. */
#define MS_ENGINE_VERSION 1
57
/* Expected value of PBMSCallbacksRec.cb_version. */
#define MS_CALLBACK_VERSION 4
58
/* Expected value of PBMSSharedMemoryRec.sm_version; getSharedMemory() reports an error on mismatch. */
#define MS_SHARED_MEMORY_VERSION 2
59
/* Number of slots in PBMSSharedMemoryRec.sm_engine_list. */
#define MS_ENGINE_LIST_SIZE 10
60
/* Prefix of the per-process temp file name built by getTempFileName() ("/tmp/<prefix><pid>"). */
#define MS_TEMP_FILE_PREFIX "pbms_temp_"
62
/* NOTE(review): not referenced in the visible part of this file. */
#define MS_BLOB_HANDLE_SIZE 300
64
/* Permission bits passed to open() in getSharedMemory(): read/write for user and group, read for others. */
#define SH_MASK ((S_IRUSR | S_IWUSR) | (S_IRGRP | S_IWGRP) | (S_IROTH))
67
/*
 * Error codes. Within this file MS_ERR_ENGINE is returned by registerEngine(),
 * getSharedMemory() and setOSResult(); MS_ERR_INCORRECT_URL is returned (and stored
 * in PBMSResultRec.mr_code) by retainBlob() when no callbacks are installed.
 * Codes without a comment are not used in the visible part of this file.
 */
#define MS_ERR_ENGINE 1 /* Internal engine error. */
68
#define MS_ERR_UNKNOWN_TABLE 2 /* Returned if the engine cannot open the given table. */
69
#define MS_ERR_NOT_FOUND 3 /* The BLOB cannot be found. */
70
#define MS_ERR_TABLE_LOCKED 4 /* Table is currently locked. */
71
#define MS_ERR_INCORRECT_URL 5
72
#define MS_ERR_AUTH_FAILED 6
73
#define MS_ERR_NOT_IMPLEMENTED 7
74
#define MS_ERR_UNKNOWN_DB 8
75
#define MS_ERR_REMOVING_REPO 9
76
#define MS_ERR_DATABASE_DELETED 10
77
#define MS_ERR_DUPLICATE 11 /* Attempt to insert a duplicate key into a system table. */
78
#define MS_ERR_INVALID_RECORD 12
79
#define MS_ERR_RECOVERY_IN_PROGRESS 13
80
#define MS_ERR_DUPLICATE_DB 14
81
#define MS_ERR_DUPLICATE_DB_ID 15
82
#define MS_ERR_INVALID_OPERATION 16
84
/* Lock mode constants. NOTE(review): none of these is referenced in the visible part of this file. */
#define MS_LOCK_NONE 0
85
#define MS_LOCK_READONLY 1
86
#define MS_LOCK_READ_WRITE 2
88
/* Size of a BLOB URL buffer (PBMSBlobURL.bu_data); couldBeURL() rejects any candidate whose size is not below this. */
#define PBMS_BLOB_URL_SIZE 120
90
/* NOTE(review): the two PBMS_FIELD_* sizes are not referenced in the visible part of this file. */
#define PBMS_FIELD_COL_SIZE 128
91
#define PBMS_FIELD_COND_SIZE 300
93
/* Size of PBMSResultRec.mr_message; also the bound passed to the strcpy()/strcat() helpers in this file. */
#define MS_RESULT_MESSAGE_SIZE 300
94
/* Size of PBMSResultRec.mr_stack. */
#define MS_RESULT_STACK_SIZE 200
96
/*
 * Error report filled in by the API functions. The functions in this file set
 * mr_code, build mr_message with the bounded strcpy()/strcat() helpers and
 * reset mr_stack to an empty string.
 */
typedef struct PBMSResultRec {
97
int mr_code; /* Engine specific error code. */
98
char mr_message[MS_RESULT_MESSAGE_SIZE]; /* Error message, required if non-zero return code. */
99
char mr_stack[MS_RESULT_STACK_SIZE]; /* Trace information about where the error occurred. */
100
} PBMSResultRec, *PBMSResultPtr;
104
/*
 * Binary identifier of a BLOB, used by the direct access functions declared at
 * the end of this file (PBMSCreateBlob(), PBMSIDToURL(), PBMSURLToID(), ...).
 * NOTE(review): the field names parallel the components listed for URL_FMT -- confirm the mapping.
 */
typedef struct PBMSBlobID {
106
u_int64_t bi_blob_size;
107
u_int64_t bi_blob_id; // or repo file offset if type = REPO
108
u_int64_t bi_blob_ref_id;
109
u_int32_t bi_tab_id; // or repo ID if type = REPO
110
u_int32_t bi_auth_code;
111
u_int32_t bi_blob_type; // presumably the "type" the two comments above refer to -- TODO confirm
112
} PBMSBlobIDRec, *PBMSBlobIDPtr;
114
/* Fixed-size buffer of PBMS_BLOB_URL_SIZE bytes holding a BLOB URL. */
typedef struct PBMSBlobURL {
115
char bu_data[PBMS_BLOB_URL_SIZE];
116
} PBMSBlobURLRec, *PBMSBlobURLPtr;
118
/*
 * Describes one streaming-enabled engine. A pointer to this record is stored in
 * PBMSSharedMemoryRec.sm_engine_list by registerEngine(), which also sets ms_index.
 */
typedef struct PBMSEngineRec {
119
int ms_version; /* MS_ENGINE_VERSION */
120
int ms_index; /* The index into the engine list. */
121
int ms_removing; /* TRUE (1) if the engine is being removed. */
122
int ms_internal; /* TRUE (1) if the engine is supported directly in the mysql/drizzle handler code. */
123
char ms_engine_name[32];
124
} PBMSEngineRec, *PBMSEnginePtr;
127
* This function should never be called directly, it is called
128
* by deregisterEngine() below.
130
/* Callback type of PBMSCallbacksRec.cb_register; registerEngine() calls it with the newly listed engine. */
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
132
/* Callback type of PBMSCallbacksRec.cb_deregister; deregisterEngine() calls it before clearing the engine's list slot. */
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
135
* Call this function to store a BLOB in the repository; the BLOB's
136
* URL will be returned. The returned URL buffer is expected to be at least
137
* PBMS_BLOB_URL_SIZE long.
139
* The BLOB URL must still be retained or it will automatically be deleted after a timeout expires.
141
/*
 * Callback type of PBMSCallbacksRec.cb_create_blob. retainBlob() passes the raw
 * BLOB data as blob/blob_len and its ret_blob_url buffer as blob_url.
 * Returns an int error code; details are reported through result.
 */
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, char *blob_url, unsigned short col_index, PBMSResultPtr result);
144
* Call this function for each BLOB to be retained. When a BLOB is used, the
145
* URL may be changed. The returned URL buffer is expected to be at least
146
* PBMS_BLOB_URL_SIZE long.
148
* The returned URL must be inserted into the row in place of the given
151
/*
 * Callback type of PBMSCallbacksRec.cb_retain_blob. retainBlob() passes the
 * caller's output buffer as ret_blob_url and the existing URL as blob_url,
 * and returns this callback's int result unchanged.
 */
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
154
* If a row containing a BLOB is deleted, then the BLOBs in the
155
* row must be released.
157
* Note: if a table is dropped, all the BLOBs referenced by the
158
* table are automatically released.
160
/* Callback type of PBMSCallbacksRec.cb_release_blob; invoked by releaseBlob(). */
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
162
/* Callback type of PBMSCallbacksRec.cb_drop_table; invoked by dropTable(). */
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
164
/* Callback type of PBMSCallbacksRec.cb_rename_table; invoked by renameTable(). */
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result);
166
/* Callback type of PBMSCallbacksRec.cb_completed; invoked by completed() with its ok argument. */
typedef void (*ECCallCompleted)(bool built_in, bool ok);
168
/*
 * Table of callbacks handed to PBMSStartup() and stored in
 * PBMSSharedMemoryRec.sm_callbacks. The API methods in this file forward
 * their work to these function pointers.
 */
typedef struct PBMSCallbacksRec {
169
int cb_version; /* MS_CALLBACK_VERSION */
170
ECRegisterdFunc cb_register; /* Called from registerEngine(). */
171
ECDeregisterdFunc cb_deregister; /* Called from deregisterEngine(). */
172
ECCreateBlobsFunc cb_create_blob; /* Called from retainBlob(). */
173
ECRetainBlobsFunc cb_retain_blob; /* Called from retainBlob(). */
174
ECReleaseBlobFunc cb_release_blob; /* Called from releaseBlob(). */
175
ECDropTable cb_drop_table; /* Called from dropTable(). */
176
ECRenameTable cb_rename_table; /* Called from renameTable(). */
177
ECCallCompleted cb_completed; /* Called from completed(). */
178
} PBMSCallbacksRec, *PBMSCallbacksPtr;
180
/*
 * Record shared between the PBMS engine and the registered engines. It is
 * allocated with calloc() in getSharedMemory() and its address is written as
 * text to a temp file so later calls can find it again.
 * NOTE(review): the code in this file also uses sm_list_size and sm_list_len,
 * whose declarations are not visible in this view.
 */
typedef struct PBMSSharedMemoryRec {
181
int sm_magic; /* MS_SHARED_MEMORY_MAGIC */
182
int sm_version; /* MS_SHARED_MEMORY_VERSION */
183
volatile int sm_shutdown_lock; /* "Cheap" lock for shutdown! */
184
PBMSCallbacksPtr sm_callbacks; /* Set by PBMSStartup(), reset to NULL by PBMSShutdown(). */
185
int sm_reserved1[20];
186
void *sm_reserved2[20];
189
PBMSEnginePtr sm_engine_list[MS_ENGINE_LIST_SIZE]; /* Registered engines; a NULL entry is a free slot. */
190
} PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
197
const char *temp_prefix[3];
201
/*
 * Constructor: starts with no shared memory attached and stores
 * MS_TEMP_FILE_PREFIX followed by a terminating NULL in temp_prefix.
 * NOTE(review): the declaration of i is not visible in this view.
 */
PBMS_API(): sharedMemory(NULL) {
203
temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
204
temp_prefix[i++] = NULL;
211
* This method is called by the PBMS engine during startup.
213
/*
 * Attaches to the shared memory record, creating it if needed (create == true),
 * and installs the given callback table in it.
 * NOTE(review): the handling of err between the two visible statements is not shown here.
 */
int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
217
err = getSharedMemory(true, result);
219
sharedMemory->sm_callbacks = callbacks;
225
* This method is called by the PBMS engine during shutdown.
227
/*
 * Uninstalls the callback table, then scans the engine list for a remaining
 * entry. NOTE(review): the body of the scan and the condition guarding
 * removeSharedMemory() are not visible; presumably the memory is removed only
 * when no engine is left ("empty") -- confirm.
 */
void PBMSShutdown() {
233
sharedMemory->sm_callbacks = NULL;
236
/* The loop stops as soon as empty becomes false. */
for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
237
if (sharedMemory->sm_engine_list[i])
244
removeSharedMemory();
248
* Register the engine with the Stream Engine.
250
/*
 * Places engine in the first free slot of sm_engine_list, records the slot in
 * engine->ms_index, extends sm_list_len when needed and notifies the PBMS
 * engine through cb_register if callbacks are installed.
 * The visible failure path sets result (code 15010) and returns MS_ERR_ENGINE
 * when no free slot was found.
 */
int registerEngine(PBMSEnginePtr engine, PBMSResultPtr result) {
255
// The first engine to register creates the shared memory.
256
if ((err = getSharedMemory(true, result)))
259
for (int i=0; i<sharedMemory->sm_list_size; i++) {
260
/* A NULL entry is a free slot. */
if (!sharedMemory->sm_engine_list[i]) {
261
sharedMemory->sm_engine_list[i] = engine;
262
engine->ms_index = i;
263
/* sm_list_len is kept one past the highest slot used so far. */
if (i >= sharedMemory->sm_list_len)
264
sharedMemory->sm_list_len = i+1;
265
if (sharedMemory->sm_callbacks)
266
sharedMemory->sm_callbacks->cb_register(engine);
268
/* built_in is later passed as the first argument of every callback. */
built_in = (engine->ms_internal == 1);
273
result->mr_code = 15010;
274
strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
275
*result->mr_stack = 0;
276
return MS_ERR_ENGINE;
280
/*
 * NOTE(review): the function header(s) enclosing these statements are not
 * visible in this view. The statements implement the "cheap" shutdown lock
 * on sm_shutdown_lock: wait while it is non-zero, increment it, and if the
 * value is then not exactly 1 back off for a random time and retry. The last
 * visible statement decrements the counter again (presumably the unlock
 * side -- confirm).
 * NOTE(review): sm_shutdown_lock is a plain volatile int; the ++/-- used here
 * are not atomic operations.
 */
while (sharedMemory->sm_shutdown_lock)
282
sharedMemory->sm_shutdown_lock++;
283
while (sharedMemory->sm_shutdown_lock != 1) {
284
usleep(random() % 10000);
285
sharedMemory->sm_shutdown_lock--;
287
sharedMemory->sm_shutdown_lock++;
292
sharedMemory->sm_shutdown_lock--;
295
/*
 * Removes engine from sm_engine_list. The shared memory is looked up without
 * creating it (create == false); errors go into a local result that is not
 * returned to the caller. If callbacks are installed, cb_deregister is called
 * before the slot is cleared.
 * NOTE(review): the condition guarding the final removeSharedMemory() call is not visible.
 */
void deregisterEngine(PBMSEnginePtr engine) {
296
PBMSResultRec result;
299
if ((err = getSharedMemory(false, &result)))
305
for (int i=0; i<sharedMemory->sm_list_len; i++) {
306
if (sharedMemory->sm_engine_list[i]) {
307
if (sharedMemory->sm_engine_list[i] == engine) {
308
if (sharedMemory->sm_callbacks)
309
sharedMemory->sm_callbacks->cb_deregister(engine);
310
sharedMemory->sm_engine_list[i] = NULL;
320
removeSharedMemory();
323
/*
 * Invalidates the shared memory record by clearing its magic number and
 * builds the name of this process's temp file for each prefix.
 * NOTE(review): what is done with temp_file afterwards, and the statement
 * guarded by the sm_callbacks test, are not visible in this view.
 */
void removeSharedMemory()
325
const char **prefix = temp_prefix;
328
// Do not remove the shared memory until after
329
// the PBMS engine has shut down.
330
if (sharedMemory->sm_callbacks)
333
/* getSharedMemory() rejects a record whose magic is not MS_SHARED_MEMORY_MAGIC. */
sharedMemory->sm_magic = 0;
338
getTempFileName(temp_file, *prefix, getpid());
344
/*
 * Checks whether the size bytes at blob_url look like a BLOB URL in URL_FMT
 * layout. Candidates that are NULL or not shorter than PBMS_BLOB_URL_SIZE are
 * not scanned. A candidate is reported as bad when sscanf() does not convert
 * exactly 8 fields, when text follows the last field, or when the separator
 * after the database id is neither '~' nor '_'.
 * NOTE(review): the return statements are not visible in this view.
 * NOTE(review): blob_url[size] reads the byte after the given size; the caller
 * must guarantee that byte is accessible.
 */
int couldBeURL(char *blob_url, int size)
346
if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
347
char buffer[PBMS_BLOB_URL_SIZE+1];
348
unsigned long db_id = 0;
349
unsigned long tab_id = 0;
350
unsigned long long blob_id = 0;
351
unsigned long long blob_ref_id = 0;
352
unsigned long long blob_size = 0;
353
unsigned long auth_code = 0;
354
unsigned long server_id = 0;
359
if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
360
/* Copy into the local buffer; presumably it is terminated and scanned instead of the input -- those lines are not visible. */
memcpy(buffer, blob_url, size);
365
/* The extra "%4s" captures anything following the last numeric field into junk. */
scanned = sscanf(blob_url, URL_FMT"%4s", &db_id, &type, &tab_id, &blob_id, &auth_code, &server_id, &blob_ref_id, &blob_size, junk);
366
if (scanned != 8) {// If junk is found at the end this will also result in an invalid URL.
367
printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]);
371
if (junk[0] || (type != '~' && type != '_')) {
372
printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]);
382
/*
 * Retains a BLOB for the given table column. If blob_url does not look like a
 * BLOB URL, the data is first stored with cb_create_blob, which writes the new
 * URL into ret_blob_url. The URL is then passed to cb_retain_blob, whose
 * return value is the result of this function.
 * When no callbacks are installed, the visible error path reports
 * MS_ERR_INCORRECT_URL with the message "BLOB streaming engine (PBMS) not installed".
 * NOTE(review): several statements of this function are not visible in this view.
 */
int retainBlob(const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
385
char safe_url[PBMS_BLOB_URL_SIZE+1];
388
if ((err = getSharedMemory(false, result)))
391
/* Not a URL: treat the buffer as raw BLOB data. */
if (!couldBeURL(blob_url, blob_size)) {
393
if (!sharedMemory->sm_callbacks) {
397
err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, col_index, result);
401
/* Continue with the URL that was just created. */
blob_url = ret_blob_url;
403
// Make sure the url is a C string:
404
if (blob_url[blob_size]) {
405
memcpy(safe_url, blob_url, blob_size);
406
safe_url[blob_size] = 0;
412
if (!sharedMemory->sm_callbacks) {
413
result->mr_code = MS_ERR_INCORRECT_URL;
414
strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming engine (PBMS) not installed");
415
*result->mr_stack = 0;
416
return MS_ERR_INCORRECT_URL;
419
return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
422
/*
 * Releases the BLOB referenced by blob_url by forwarding to cb_release_blob
 * and returning its result. If the byte after the URL is not zero, the URL is
 * first copied into a local buffer and terminated there.
 * NOTE(review): the statements taken when getSharedMemory() fails, when no
 * callbacks are installed, or when blob_url is not a URL are not visible.
 */
int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
425
char safe_url[PBMS_BLOB_URL_SIZE+1];
427
if ((err = getSharedMemory(false, result)))
430
if (!sharedMemory->sm_callbacks)
433
if (!couldBeURL(blob_url, blob_size))
436
/* Make sure the URL is a C string. */
if (blob_url[blob_size]) {
437
memcpy(safe_url, blob_url, blob_size);
438
safe_url[blob_size] = 0;
442
return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
445
/*
 * Forwards a table drop to cb_drop_table and returns its result.
 * NOTE(review): the statements taken when getSharedMemory() fails or when no
 * callbacks are installed are not visible in this view.
 */
int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
449
if ((err = getSharedMemory(false, result)))
452
if (!sharedMemory->sm_callbacks)
455
return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
458
/*
 * Forwards a table rename (from_table -> to_table) to cb_rename_table and
 * returns its result.
 * NOTE(review): the statements taken when getSharedMemory() fails or when no
 * callbacks are installed are not visible in this view.
 */
int renameTable(const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result)
462
if ((err = getSharedMemory(false, result)))
465
if (!sharedMemory->sm_callbacks)
468
return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_table, result);
471
/*
 * Passes ok on to cb_completed. Any error from getSharedMemory() lands in a
 * local result record and is not reported to the caller.
 * NOTE(review): the statements guarded by the two visible tests are not shown
 * (presumably early returns -- confirm).
 */
void completed(int ok)
473
PBMSResultRec result;
475
if (getSharedMemory(false, &result))
478
if (!sharedMemory->sm_callbacks)
481
sharedMemory->sm_callbacks->cb_completed(built_in, ok);
484
/* Shared memory record in use; NULL after construction, set by getSharedMemory(). */
volatile PBMSSharedMemoryPtr sharedMemory;
487
/*
 * Locates the shared memory record through a temp file named
 * "/tmp/<prefix><pid>", creating file and record when create is true.
 * The file holds the record's address as text ("%p"). If the address read is
 * NULL or the record's magic number is wrong, a new zeroed record is
 * allocated, initialised and its address written back to the file. If the
 * record exists but has a different sm_version, result is filled in
 * (code -1000) and MS_ERR_ENGINE is returned. Failing system calls are
 * reported through setOSResult().
 * NOTE(review): many statements (conditions, loop, close of the file, the
 * success return) are not visible in this view.
 */
int getSharedMemory(bool create, PBMSResultPtr result)
492
const char **prefix = temp_prefix;
498
getTempFileName(temp_file, *prefix, getpid());
499
/* O_CREAT is only added when the caller allows creation. */
tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
501
return setOSResult(errno, "open", temp_file, result);
503
r = lseek(tmp_f, 0, SEEK_SET);
506
return setOSResult(errno, "lseek", temp_file, result);
511
tfer = read(tmp_f, buffer, 100);
514
return setOSResult(errno, "read", temp_file, result);
518
/* The file content is the textual address of the record. */
sscanf(buffer, "%p", &sharedMemory);
519
if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
523
/* NOTE(review): no NULL check of the calloc() result is visible before it is dereferenced. */
sharedMemory = (PBMSSharedMemoryPtr) calloc(1, sizeof(PBMSSharedMemoryRec));
524
sharedMemory->sm_magic = MS_SHARED_MEMORY_MAGIC;
525
sharedMemory->sm_version = MS_SHARED_MEMORY_VERSION;
526
sharedMemory->sm_list_size = MS_ENGINE_LIST_SIZE;
528
r = lseek(tmp_f, 0, SEEK_SET);
531
/* NOTE(review): the failing call here is lseek(), but the message names "fseek". */
return setOSResult(errno, "fseek", temp_file, result);
534
sprintf(buffer, "%p", sharedMemory);
535
tfer = write(tmp_f, buffer, strlen(buffer));
536
/* A short write is treated as an error. */
if (tfer != strlen(buffer)) {
538
return setOSResult(errno, "write", temp_file, result);
543
return setOSResult(errno, "fsync", temp_file, result);
546
else if (sharedMemory->sm_version != MS_SHARED_MEMORY_VERSION) {
548
result->mr_code = -1000;
549
*result->mr_stack = 0;
550
strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Shared memory version: ");
551
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, sharedMemory->sm_version);
552
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ", does not match engine shared memory version: ");
553
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, MS_SHARED_MEMORY_VERSION);
554
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ".");
555
return MS_ERR_ENGINE;
559
// For backward compatibility we need to create the old versions but we only need to read the current version.
568
/*
 * Size-bounded string copy helper (note the argument order: size, to, from).
 * The visible loop runs while source characters remain and size is not
 * exhausted. NOTE(review): the loop body and termination of "to" are not visible.
 */
void strcpy(size_t size, char *to, const char *from)
572
while (*from && size--)
578
/*
 * Size-bounded append: skips to the end of the string in "to", reducing size
 * by one for each character skipped, then copies "from" with the bounded
 * strcpy() helper using the remaining size.
 */
void strcat(size_t size, char *to, const char *from)
580
while (*to && size--) to++;
581
strcpy(size, to, from);
584
/*
 * Overload that appends the decimal text of val: formats it with "%d" into a
 * local buffer and appends that with the string overload.
 * NOTE(review): the declaration of buffer is not visible in this view.
 */
void strcat(size_t size, char *to, int val)
588
sprintf(buffer, "%d", val);
589
strcat(size, to, buffer);
592
/*
 * Fills result for a failed system call and returns MS_ERR_ENGINE.
 * mr_code receives err, mr_stack is emptied and mr_message becomes
 * "System call <func>() failed on <file>: " followed by the OS error text and
 * " (<err>)", or by "Unknown OS error code <err>" when no text is available.
 * NOTE(review): the FormatMessage() branch is presumably the Windows variant
 * selected by preprocessor conditionals that are not visible here; the
 * declarations of iMessage, err_msg, ptr and msg are not visible either.
 */
int setOSResult(int err, const char *func, char *file, PBMSResultPtr result) {
595
result->mr_code = err;
596
*result->mr_stack = 0;
597
strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "System call ");
598
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, func);
599
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "() failed on ");
600
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, file);
601
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ": ");
604
/* The system text is written directly after what iMessage already contains. */
if (FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, iMessage + strlen(iMessage), MS_RESULT_MESSAGE_SIZE - strlen(iMessage), NULL)) {
607
/* Walk back from the end of the text; the test below looks for the last character that is not '\n', '\r' or '.'. */
ptr = &iMessage[strlen(iMessage)];
608
while (ptr-1 > err_msg) {
609
if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
615
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
616
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
617
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
618
return MS_ERR_ENGINE;
624
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, msg);
625
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
626
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
627
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
630
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Unknown OS error code ");
631
strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
634
return MS_ERR_ENGINE;
637
/*
 * Writes "/tmp/<prefix><pid>" into temp_file.
 * NOTE(review): sprintf() is unbounded; the caller must provide a buffer large
 * enough for the prefix and the decimal pid.
 */
void getTempFileName(char *temp_file, const char * prefix, int pid)
639
sprintf(temp_file, "/tmp/%s%d", prefix, pid);
642
/*
 * Prefix test of cstr against w_cstr. The strings are compared character by
 * character while both have characters left; the final expression is true
 * when cstr still has characters or w_cstr has been consumed completely.
 * NOTE(review): the statement executed on a mismatch and the pointer
 * increments are not visible in this view.
 */
bool startsWith(const char *cstr, const char *w_cstr)
644
while (*cstr && *w_cstr) {
645
if (*cstr != *w_cstr)
650
return *cstr || !*w_cstr;
653
/*
 * Scans /tmp/ for files whose names start with one of the temp_prefix entries.
 * The digits after the prefix are taken as a process id; when kill(pid, 0)
 * fails with ESRCH (no such process) the file name is rebuilt with
 * getTempFileName().
 * NOTE(review): the statement that acts on temp_file (presumably a removal),
 * the preprocessor conditionals around the two sz assignments and the cleanup
 * of entry/odir are not visible in this view.
 */
void deleteTempFiles()
655
struct dirent *entry;
656
struct dirent *result;
663
sz = sizeof(struct dirent) + pathconf("/tmp/", _PC_NAME_MAX); // Solaris, see readdir(3C)
665
sz = sizeof(struct dirent);
667
if (!(entry = (struct dirent *) malloc(sz)))
669
if (!(odir = opendir("/tmp/")))
671
err = readdir_r(odir, entry, &result);
672
/* readdir_r() sets result to NULL at the end of the directory. */
while (!err && result) {
673
const char **prefix = temp_prefix;
676
if (startsWith(entry->d_name, *prefix)) {
677
/* The part of the name after the prefix is the pid written by getTempFileName(). */
int pid = atoi(entry->d_name + strlen(*prefix));
679
/* If the process does not exist: */
680
if (kill(pid, 0) == -1 && errno == ESRCH) {
681
getTempFileName(temp_file, *prefix, pid);
688
err = readdir_r(odir, entry, &result);
697
* The following is a low level API for accessing blobs directly.
702
* Any threads using the direct blob access API must first register themselves with the
703
* blob streaming engine before using the blob access functions. This is done by calling
704
* PBMSInitBlobStreamingThread(). Call PBMSDeinitBlobStreamingThread() after the thread is
705
* done using the direct blob access API
709
* PBMSInitBlobStreamingThread(): Returns a pointer to a blob streaming thread.
711
/* thread_name: name for the registering thread; result: receives error details. The returned pointer is presumably the v_bs_thread handle expected by the functions below -- confirm against the implementation. */
extern void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result);
712
/* Counterpart of PBMSInitBlobStreamingThread(); call it once the thread is done with the direct blob access API. */
extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
715
* PBMSGetError():Gets the last error reported by a blob streaming thread.
717
/* v_bs_thread: blob streaming thread handle; result: record that receives the error. */
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
720
* PBMSCreateBlob():Creates a new blob in the database of the given size.
722
/* NOTE(review): presumably blob_id is filled in for the new blob and the bool result signals success -- the implementation is not visible here. */
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
725
* PBMSWriteBlob(): Write the data to the blob in one or more chunks. The total size of all the chunks of
726
* data written to the blob must match the size specified when the blob was created.
728
/* data/size: chunk to write; offset: presumably the position of the chunk within the blob -- confirm against the implementation. */
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
731
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
733
/* size is passed by pointer; presumably it carries the requested length in and the length actually read out -- confirm against the implementation. */
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
736
* PBMSIDToURL(): Convert a blob id to a blob URL. The 'url' buffer must be at least PBMS_BLOB_URL_SIZE bytes in size.
738
/* blob_id: identifier to convert; url: caller-supplied output buffer for the URL text. */
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
741
* PBMSURLToID(): Convert a blob URL to a blob ID.
743
/* url: BLOB URL text to parse; blob_id: record that receives the parsed identifier. */
extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);