1
/* Copyright (C) 2010 PrimeBase Technologies GmbH
4
* Redistribution and use in source and binary forms, with or without
5
* modification, are permitted provided that the following conditions are met:
7
* * Redistributions of source code must retain the above copyright notice,
8
* this list of conditions and the following disclaimer.
9
* * Redistributions in binary form must reproduce the above copyright notice,
10
* this list of conditions and the following disclaimer in the documentation
11
* and/or other materials provided with the distribution.
12
* * Neither the name of the "PrimeBase Technologies GmbH" nor the names of its
13
* contributors may be used to endorse or promote products derived from this
14
* software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
20
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25
* POSSIBILITY OF SUCH DAMAGE.
28
* PrimeBase Media Stream for MySQL and Drizzle
37
* PBMS interface used to enable engines for use with the PBMS daemon.
39
* For an example on how to build this into an engine have a look at the PBXT engine
40
* in file ha_pbxt.cc. Search for 'PBMS_ENABLED'.
45
#if defined(MSDOS) || defined(__WIN__)
46
#include "pbms_enabled.h"
48
// Windows is not supported yet, so the functions in this branch are stubs.
49
// Stub used when MSDOS or __WIN__ is defined; every parameter is marked unused.
bool pbms_initialize(const char *engine_name __attribute__((unused)),
50
bool isServer __attribute__((unused)),
51
bool isTransactional __attribute__((unused)),
52
PBMSResultPtr result __attribute__((unused)),
53
IsPBMSFilterFunc is_pbms_blob __attribute__((unused))
55
// Windows stub: there is nothing to deregister. It takes the same engine_name
// parameter as the non-Windows pbms_finalize() in this file so that both
// builds expose a single signature to callers.
void pbms_finalize(const char *engine_name __attribute__((unused))) {}
56
// Stub used when MSDOS or __WIN__ is defined; every parameter is marked unused.
int pbms_write_row_blobs(const TABLE *table __attribute__((unused)),
57
unsigned char *buf __attribute__((unused)),
58
PBMSResultPtr result __attribute__((unused))
60
// Stub used when MSDOS or __WIN__ is defined; every parameter is marked unused.
int pbms_update_row_blobs(const TABLE *table __attribute__((unused)),
61
const unsigned char *old_row __attribute__((unused)),
62
unsigned char *new_row __attribute__((unused)),
63
PBMSResultPtr result __attribute__((unused))
65
// Stub used when MSDOS or __WIN__ is defined; every parameter is marked unused.
int pbms_delete_row_blobs(const TABLE *table __attribute__((unused)),
66
const unsigned char *buf __attribute__((unused)),
67
PBMSResultPtr result __attribute__((unused))
69
// Stub used when MSDOS or __WIN__ is defined; every parameter is marked unused.
int pbms_rename_table_with_blobs(const char *old_table_path __attribute__((unused)),
70
const char *new_table_path __attribute__((unused)),
71
PBMSResultPtr result __attribute__((unused))
73
// Stub used when MSDOS or __WIN__ is defined; every parameter is marked unused.
int pbms_delete_table_with_blobs(const char *table_path __attribute__((unused)),
74
PBMSResultPtr result __attribute__((unused))
76
// Stub used when MSDOS or __WIN__ is defined; every parameter is marked unused.
// NOTE(review): this stub takes `TABLE *` while the non-Windows definition in
// this file takes `const TABLE *` - confirm which one the header declares.
void pbms_completed(TABLE *table __attribute__((unused)),
77
bool ok __attribute__((unused))
80
#define PBMS_API pbms_enabled_api
82
#include "pbms_enabled.h"
83
#include "mysql_priv.h"
84
#include <mysql/plugin.h>
85
// Allocates `size` bytes for the given session by forwarding to thd_alloc().
// Expands to a plain expression (no trailing ';') so that it can be used
// inside larger expressions as well as in ordinary statements.
#define session_alloc(sess, size) thd_alloc((sess), (size))
86
#define current_session current_thd
88
// Yields the i-th BLOB column of table t as a Field_blob pointer:
// t->s->blob_field[i] is the index into t->field of that column.
// The arguments and the whole expansion are parenthesized so the cast applies
// to the complete expression and the result can be followed directly by '->'.
#define GET_BLOB_FIELD(t, i) ((Field_blob *)((t)->field[(t)->s->blob_field[(i)]]))
89
// Database name string (table->s->db.str) of the table that field f belongs to.
// The argument is parenthesized so expressions such as `&obj` can be passed.
#define DB_NAME(f) ((f)->table->s->db.str)
90
// Table name of field f, read by dereferencing the field's table_name pointer.
// The argument is parenthesized so expressions such as `&obj` can be passed.
#define TAB_NAME(f) (*((f)->table_name))
92
// File-local PBMS API object; the pbms_* functions below make their calls
// into PBMS (registerEngine, retainBlob, releaseBlob, ...) through it.
static PBMS_API pbms_api;
95
* A callback function to check if the column is a PBMS BLOB.
96
* Can be NULL if no check is to be done.
98
static IsPBMSFilterFunc is_pbms_blob = NULL;
100
//====================
101
// Registers this storage engine with PBMS and records the BLOB filter callback.
//   engine_name      - copied into ms_engine_name, truncated to at most 31 characters.
//   isServer         - stored in ms_internal.
//   isTransactional  - stored in ms_has_transactions.
//   result           - passed through to registerEngine().
//   is_pbms_blob_arg - saved in the file-level is_pbms_blob; may be NULL, in
//                      which case no per-column check is done.
// NOTE(review): the return statement is not visible in this view; presumably
// it is derived from the registerEngine() status held in err - confirm.
bool pbms_initialize(const char *engine_name, bool isServer, bool isTransactional, PBMSResultPtr result, IsPBMSFilterFunc is_pbms_blob_arg)
104
PBMSEngineRec enabled_engine = {
113
// strncpy() leaves the buffer unterminated when engine_name has 32 or more
// characters; the explicit terminator written at index 31 below covers that.
strncpy(enabled_engine.ms_engine_name, engine_name, 32);
114
enabled_engine.ms_internal = isServer;
115
enabled_engine.ms_has_transactions = isTransactional;
116
enabled_engine.ms_engine_name[31] = 0;
118
err = pbms_api.registerEngine(&enabled_engine, result);
119
// Remember the filter so the row functions can skip BLOB columns it rejects.
is_pbms_blob = is_pbms_blob_arg;
125
//====================
126
// Deregisters the engine called engine_name from PBMS by forwarding the name
// to pbms_api.deregisterEngine().
void pbms_finalize(const char *engine_name)
128
pbms_api.deregisterEngine(engine_name);
131
//==================================
132
// Asks PBMS to retain one BLOB value and, if PBMS hands back different data
// (a BLOB URL), rewrites the BLOB stored in the row to that URL.
//   field      - the BLOB column; supplies database name, table name and position.
//   blob       - pointer to the current BLOB data.
//   org_length - length of the current BLOB data.
//   blob_rec   - start of this column's data in the row buffer: packlength
//                length bytes followed by the data pointer.
//   packlength - number of bytes used to store the length in blob_rec.
//   result     - passed through to retainBlob().
// NOTE(review): the handling of err and the return statement are not visible
// in this view; presumably the retainBlob() status is returned - confirm.
static int insertRecord(Field_blob *field, char *blob, size_t org_length, unsigned char *blob_rec, size_t packlength, PBMSResultPtr result)
136
PBMSBlobURLRec blob_url;
138
err = pbms_api.retainBlob(DB_NAME(field), TAB_NAME(field), &blob_url, blob, org_length, field->position(), result);
142
// If the BLOB length changed reset it.
143
// This will happen if the BLOB data was replaced with a BLOB reference.
144
// The URL length counts its terminating NUL byte.
length = strlen(blob_url.bu_data) +1;
145
if ((length != org_length) || memcmp(blob_url.bu_data, blob, length)) {
146
if (length != org_length) {
147
field->store_length(blob_rec, packlength, length);
150
if (length > org_length) {
151
// This can only happen if the BLOB URL is actually larger than the BLOB itself.
152
// The existing buffer is too small, so get a new one from the session and
// store its address in the row right after the length bytes.
blob = (char *) session_alloc(current_session, length);
153
memcpy(blob_rec+packlength, &blob, sizeof(char*));
155
// Overwrite the BLOB data with the URL returned by PBMS.
memcpy(blob, blob_url.bu_data, length);
161
//====================
162
// Handles the BLOB columns of a row update: for every BLOB column whose value
// changed between old_row and new_row, the old BLOB reference is released and
// the new BLOB is retained through insertRecord().
//   table   - table whose BLOB columns are examined.
//   old_row - row image before the update.
//   new_row - row image after the update; may be modified by insertRecord().
//   result  - receives mr_had_blobs (set to true once a changed BLOB is seen)
//             and is passed through to the PBMS calls.
// NOTE(review): several statements (the bodies of the early checks and the
// error returns) are not visible in this view; the comments below describe
// only the visible conditions.
int pbms_update_row_blobs(const TABLE *table, const unsigned char *old_row, unsigned char *new_row, PBMSResultPtr result)
165
uint32_t field_offset;
166
const unsigned char *old_blob_rec;
167
unsigned char *new_blob_rec;
168
char *old_blob_url, *new_blob_url;
169
size_t packlength, i, old_length, new_length;
171
bool old_null_blob, new_null_blob;
173
result->mr_had_blobs = false;
175
// Checks whether PBMS is loaded.
if (!pbms_api.isPBMSLoaded())
178
// Checks whether the table has any BLOB columns at all.
if (table->s->blob_fields == 0)
181
for (i= 0; i < table->s->blob_fields; i++) {
182
field = GET_BLOB_FIELD(table, i);
184
old_null_blob = field->is_null_in_record(old_row);
185
new_null_blob = field->is_null_in_record(new_row);
186
// NULL before and NULL after.
if (old_null_blob && new_null_blob)
191
// Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
192
field->sql_type(type_name);
193
// strcasecmp() is non-zero when the column's SQL type name is not "LongBlob".
if (strcasecmp(type_name.c_ptr(), "LongBlob"))
197
// The optional engine-supplied filter rejected this column.
if( is_pbms_blob && !is_pbms_blob(field) )
201
// Get the blob record:
202
// The offset is computed relative to record[0] and then applied to both rows.
field_offset = field->offset(field->table->record[0]);
203
// Number of bytes holding the BLOB length; the data pointer follows them.
packlength = field->pack_length() - field->table->s->blob_ptr_size;
208
new_blob_rec = new_row + field_offset;
209
new_length = field->get_length(new_blob_rec);
210
memcpy(&new_blob_url, new_blob_rec +packlength, sizeof(char*));
216
old_blob_rec = old_row + field_offset;
217
old_length = field->get_length(old_blob_rec);
218
memcpy(&old_blob_url, old_blob_rec +packlength, sizeof(char*));
221
// Check to see if the BLOBs are the same.
222
// I am assuming that if the BLOB pointer is different then the BLOB has changed.
223
// Zero length BLOBs are a special case because they may have a NULL data pointer,
224
// to catch this and distinguish it from a NULL BLOB I do a check to see if one field was NULL:
225
// (old_null_blob != new_null_blob)
226
if ((old_blob_url != new_blob_url) || (old_null_blob != new_null_blob)) {
228
result->mr_had_blobs = true;
230
// The BLOB was updated so delete the old one and insert the new one.
231
if ((old_null_blob == false) && (err = pbms_api.releaseBlob(DB_NAME(field), TAB_NAME(field), old_blob_url, old_length, result)))
234
if ((new_null_blob == false) && (err = insertRecord(field, new_blob_url, new_length, new_blob_rec, packlength, result)))
242
//====================
243
// Handles the BLOB columns of a newly written row: each BLOB column that gets
// past the checks below is passed to insertRecord(), which retains it in PBMS.
//   table      - table whose BLOB columns are examined.
//   row_buffer - the row being written; may be modified by insertRecord().
//   result     - receives mr_had_blobs (set to true once a column reaches the
//                insert step) and is passed through to insertRecord().
// NOTE(review): the bodies of the early checks and the error return are not
// visible in this view; the comments below describe only the visible conditions.
int pbms_write_row_blobs(const TABLE *table, unsigned char *row_buffer, PBMSResultPtr result)
247
unsigned char *blob_rec;
249
size_t packlength, i, length;
252
result->mr_had_blobs = false;
254
// Checks whether PBMS is loaded.
if (!pbms_api.isPBMSLoaded())
257
// Checks whether the table has any BLOB columns at all.
if (table->s->blob_fields == 0)
260
for (i= 0; i < table->s->blob_fields; i++) {
261
field = GET_BLOB_FIELD(table, i);
263
// The column is NULL in this row.
if (field->is_null_in_record(row_buffer))
268
// Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
269
field->sql_type(type_name);
270
// strcasecmp() is non-zero when the column's SQL type name is not "LongBlob".
if (strcasecmp(type_name.c_ptr(), "LongBlob"))
274
// The optional engine-supplied filter rejected this column.
if( is_pbms_blob && !is_pbms_blob(field) )
277
result->mr_had_blobs = true;
279
// Get the blob record:
280
// Number of bytes holding the BLOB length; the data pointer follows them.
packlength = field->pack_length() - field->table->s->blob_ptr_size;
281
blob_rec = row_buffer + field->offset(field->table->record[0]);
283
length = field->get_length(blob_rec);
284
memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
286
if ((err = insertRecord(field, blob_url, length, blob_rec, packlength, result)))
293
//====================
294
// Handles the BLOB columns of a deleted row: for each BLOB column that gets
// past the checks below, PBMS is told to release its reference to the BLOB.
//   table      - table whose BLOB columns are examined.
//   row_buffer - the row being deleted (read only).
//   result     - receives mr_had_blobs (set to true once a column reaches the
//                release step) and is passed through to releaseBlob().
// NOTE(review): how err and call_failed are combined into the return value is
// not visible in this view.
int pbms_delete_row_blobs(const TABLE *table, const unsigned char *row_buffer, PBMSResultPtr result)
297
const unsigned char *blob_rec;
299
size_t packlength, i, length;
300
bool call_failed = false;
303
result->mr_had_blobs = false;
305
// Checks whether PBMS is loaded.
if (!pbms_api.isPBMSLoaded())
308
// Checks whether the table has any BLOB columns at all.
if (table->s->blob_fields == 0)
311
for (i= 0; i < table->s->blob_fields; i++) {
312
field = GET_BLOB_FIELD(table, i);
314
// The column is NULL in this row.
if (field->is_null_in_record(row_buffer))
319
// Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
320
field->sql_type(type_name);
321
// strcasecmp() is non-zero when the column's SQL type name is not "LongBlob".
if (strcasecmp(type_name.c_ptr(), "LongBlob"))
325
// The optional engine-supplied filter rejected this column.
if(is_pbms_blob && !is_pbms_blob(field) )
328
result->mr_had_blobs = true;
330
// Get the blob record:
331
// Number of bytes holding the BLOB length; the data pointer follows them.
packlength = field->pack_length() - field->table->s->blob_ptr_size;
333
blob_rec = row_buffer + field->offset(field->table->record[0]);
334
length = field->get_length(blob_rec);
335
memcpy(&blob, blob_rec +packlength, sizeof(char*));
337
// Signal PBMS to delete the reference to the BLOB.
338
err = pbms_api.releaseBlob(DB_NAME(field), TAB_NAME(field), blob, length, result);
346
// Size in bytes (terminator included) of the database-name and table-name
// buffers that parse_table_path() fills.
#define MAX_NAME_SIZE 64
347
// Splits a table path into its last two '/'-separated components: the last
// one is copied to tab_name and the one before it to db_name.
//   path     - table path; scanned backwards from its last character.
//   db_name  - output buffer, expected to hold MAX_NAME_SIZE bytes.
//   tab_name - output buffer, expected to hold MAX_NAME_SIZE bytes.
// Both outputs start out as empty strings and names that do not fit are truncated.
// NOTE(review): the statements that adjust ptr/eptr between the scans and that
// compute len are not visible in this view.
// NOTE(review): for an empty path, `path + strlen(path) - 1` points one byte
// before the buffer - confirm callers never pass "".
static void parse_table_path(const char *path, char *db_name, char *tab_name)
349
const char *ptr = path + strlen(path) -1, *eptr;
352
*db_name = *tab_name = 0;
354
// Walk back to the '/' in front of the table name (or to the start of path).
while ((ptr > path) && (*ptr != '/'))ptr --;
358
// strncpy() may leave the buffer unterminated, so terminate it explicitly.
strncpy(tab_name, ptr+1, MAX_NAME_SIZE);
359
tab_name[MAX_NAME_SIZE-1] = 0;
363
// Walk back to the '/' in front of the database name (or to the start of path).
while ((ptr > path) && (*ptr != '/'))ptr --;
369
// Clamp the length so the name plus a terminator fits in db_name.
if (len >= MAX_NAME_SIZE)
370
len = MAX_NAME_SIZE-1;
372
memcpy(db_name, ptr, len);
377
//====================
378
// Tells PBMS that a table was renamed.
//   old_table_path - path of the table before the rename.
//   new_table_path - path of the table after the rename.
//   result         - receives mr_had_blobs and is passed through to renameTable().
// Both paths are split into database and table names with parse_table_path()
// and the status of pbms_api.renameTable() is returned.
// NOTE(review): the statement executed when PBMS is not loaded is not visible
// in this view; presumably an early return - confirm.
int pbms_rename_table_with_blobs(const char *old_table_path, const char *new_table_path, PBMSResultPtr result)
380
char o_db_name[MAX_NAME_SIZE], n_db_name[MAX_NAME_SIZE], o_tab_name[MAX_NAME_SIZE], n_tab_name[MAX_NAME_SIZE];
382
result->mr_had_blobs = false;
383
if (!pbms_api.isPBMSLoaded())
386
result->mr_had_blobs = true; // Assume it has blobs.
388
parse_table_path(old_table_path, o_db_name, o_tab_name);
389
parse_table_path(new_table_path, n_db_name, n_tab_name);
391
return pbms_api.renameTable(o_db_name, o_tab_name, n_db_name, n_tab_name, result);
394
//====================
395
// Tells PBMS that a table was dropped.
//   table_path - path of the dropped table.
//   result     - receives mr_had_blobs and is passed through to dropTable().
// The path is split into database and table names with parse_table_path() and
// the status of pbms_api.dropTable() is returned.
// NOTE(review): the statement executed when PBMS is not loaded is not visible
// in this view; presumably an early return - confirm.
int pbms_delete_table_with_blobs(const char *table_path, PBMSResultPtr result)
397
char db_name[MAX_NAME_SIZE], tab_name[MAX_NAME_SIZE];
399
result->mr_had_blobs = false;
400
if (!pbms_api.isPBMSLoaded())
403
result->mr_had_blobs = true; // Assume it has blobs.
404
parse_table_path(table_path, db_name, tab_name);
406
return pbms_api.dropTable(db_name, tab_name, result);
409
//====================
410
// Reports the outcome of an operation to PBMS via pbms_api.completed(ok).
//   table - table the operation worked on; may be NULL.
//   ok    - outcome flag, forwarded unchanged.
// The call is made when table is NULL or when the table has BLOB columns.
// NOTE(review): the statement executed when PBMS is not loaded is not visible
// in this view; presumably an early return - confirm.
void pbms_completed(const TABLE *table, bool ok)
412
if (!pbms_api.isPBMSLoaded())
415
if ((!table) || (table->s->blob_fields != 0))
416
pbms_api.completed(ok) ;