2
* Copyright (C) 2010 PrimeBase Technologies GmbH, Germany
4
* This program is free software; you can redistribute it and/or modify
5
* it under the terms of the GNU General Public License as published by
6
* the Free Software Foundation; version 2 of the License.
8
* This program is distributed in the hope that it will be useful,
9
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
* GNU General Public License for more details.
13
* You should have received a copy of the GNU General Public License
14
* along with this program; if not, write to the Free Software
15
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
26
#include <drizzled/session.h>
27
#include <drizzled/field/blob.h>
29
#include "cslib/CSConfig.h"
30
#include "cslib/CSGlobal.h"
31
#include "cslib/CSStrUtil.h"
32
#include "cslib/CSThread.h"
34
#include "events_ms.h"
35
#include "parameters_ms.h"
36
#include "engine_ms.h"
38
using namespace drizzled;
39
using namespace plugin;
43
//==================================
44
// My table event observers:
46
static bool insertRecord(TableEventData &data, unsigned char *new_row)
49
unsigned char *blob_rec;
50
char *blob_url, *possible_blob_url;
51
char safe_url[PBMS_BLOB_URL_SIZE+1];
52
PBMSBlobURLRec blob_url_buffer;
53
size_t packlength, i, length, org_length;
57
for (i= 0; i < data.table.sizeBlobFields(); i++) {
58
field = data.table.getBlobFieldAt(i);
60
// Get the blob record:
61
blob_rec = new_row + field->offset(data.table.record[0]);
62
packlength = field->pack_length() - data.table.getBlobPtrSize();
64
length = field->get_length(blob_rec);
65
memcpy(&possible_blob_url, blob_rec +packlength, sizeof(char*));
66
org_length = field->get_length(blob_rec);
68
// Signal PBMS to record a new reference to the BLOB.
69
// If 'blob' is not a BLOB URL then it will be stored in the repositor as a new BLOB
70
// and a reference to it will be created.
71
if (MSEngine::couldBeURL(possible_blob_url, length) == false) {
72
err = MSEngine::createBlob(data.table.getSchemaName(), data.table.getTableName(), possible_blob_url, length, &blob_url_buffer, &result);
74
// If it fails log the error and continue to try and release any other BLOBs in the row.
75
fprintf(stderr, "PBMSEvents: createBlob(\"%s.%s\") error (%d):'%s'\n",
76
data.table.getSchemaName(), data.table.getTableName(), result.mr_code, result.mr_message);
80
blob_url = blob_url_buffer.bu_data;
82
// The BLOB URL may not be null terminate, if so
83
// then copy it to a safe buffer and terminate it.
84
if (possible_blob_url[length]) {
85
memcpy(safe_url, possible_blob_url, length);
89
blob_url = possible_blob_url;
92
// Signal PBMS to delete the reference to the BLOB.
93
err = MSEngine::referenceBlob(data.table.getSchemaName(), data.table.getTableName(), &blob_url_buffer, blob_url, field->field_index, &result);
95
// If it fails log the error and continue to try and release any other BLOBs in the row.
96
fprintf(stderr, "PBMSEvents: referenceBlob(\"%s.%s\", \"%s\" ) error (%d):'%s'\n",
97
data.table.getSchemaName(), data.table.getTableName(), blob_url, result.mr_code, result.mr_message);
102
// The URL is modified on insert so if the BLOB length changed reset it.
103
// This will happen if the BLOB data was replaced with a BLOB reference.
104
length = strlen(blob_url_buffer.bu_data) +1;
105
if ((length != org_length) || memcmp(blob_url_buffer.bu_data, possible_blob_url, length)) {
106
char *blob = possible_blob_url; // This is the BLOB as the server currently sees it.
108
if (length != org_length) {
109
field->store_length(blob_rec, packlength, length);
112
if (length > org_length) {
113
// This can only happen if the BLOB URL is actually larger than the BLOB itself.
114
blob = (char *) data.session.alloc(length);
115
memcpy(blob_rec+packlength, &blob, sizeof(char*));
117
memcpy(blob, blob_url_buffer.bu_data, length);
125
static bool deleteRecord(TableEventData &data, const unsigned char *old_row)
128
const char *blob_rec;
129
unsigned char *blob_url;
130
size_t packlength, i, length;
132
PBMSResultRec result;
133
bool call_failed = false;
135
for (i= 0; i < data.table.sizeBlobFields(); i++) {
136
field = data.table.getBlobFieldAt(i);
138
// Get the blob record:
139
blob_rec = (char *)old_row + field->offset(data.table.record[0]);
140
packlength = field->pack_length() - data.table.getBlobPtrSize();
142
length = field->get_length((unsigned char *)blob_rec);
143
memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
145
// Check to see if this is a valid URL.
146
if (MSEngine::couldBeURL(blob_url, length)) {
148
// The BLOB URL may not be null terminate, if so
149
// then copy it to a safe buffer and terminate it.
150
char safe_url[PBMS_BLOB_URL_SIZE+1];
151
if (blob_url[length]) {
152
memcpy(safe_url, blob_url, length);
153
safe_url[length] = 0;
157
// Signal PBMS to delete the reference to the BLOB.
158
err = MSEngine::dereferenceBlob(data.table.getSchemaName(), data.table.getTableName(), blob_url, &result);
160
// If it fails log the error and continue to try and release any other BLOBs in the row.
161
fprintf(stderr, "PBMSEvents: dereferenceBlob(\"%s.%s\") error (%d):'%s'\n",
162
data.table.getSchemaName(), data.table.getTableName(), result.mr_code, result.mr_message);
173
static bool insertRecord(const char *db, const char *table_name, char *possible_blob_url, size_t length,
174
Session &session, Field_blob *field, unsigned char *blob_rec, size_t packlength)
177
char safe_url[PBMS_BLOB_URL_SIZE+1];
178
PBMSBlobURLRec blob_url_buffer;
179
size_t org_length = length;
181
PBMSResultRec result;
183
// Tell PBMS to record a new reference to the BLOB.
184
// If 'blob' is not a BLOB URL then it will be stored in the repositor as a new BLOB
185
// and a reference to it will be created.
187
if (MSEngine::couldBeURL(possible_blob_url, length) == false) {
188
err = MSEngine::createBlob(db, table_name, possible_blob_url, length, &blob_url_buffer, &result);
190
// If it fails log the error and continue to try and release any other BLOBs in the row.
191
fprintf(stderr, "PBMSEvents: createBlob(\"%s.%s\") error (%d):'%s'\n",
192
db, table_name, result.mr_code, result.mr_message);
196
blob_url = blob_url_buffer.bu_data;
198
// The BLOB URL may not be null terminate, if so
199
// then copy it to a safe buffer and terminate it.
200
if (possible_blob_url[length]) {
201
memcpy(safe_url, possible_blob_url, length);
202
safe_url[length] = 0;
205
blob_url = possible_blob_url;
208
// Tell PBMS to add a reference to the BLOB.
209
err = MSEngine::referenceBlob(db, table_name, &blob_url_buffer, blob_url, field->field_index, &result);
211
// If it fails log the error and continue to try and release any other BLOBs in the row.
212
fprintf(stderr, "PBMSEvents: referenceBlob(\"%s.%s\", \"%s\" ) error (%d):'%s'\n",
213
db, table_name, blob_url, result.mr_code, result.mr_message);
218
// The URL is modified on insert so if the BLOB length changed reset it.
219
// This will happen if the BLOB data was replaced with a BLOB reference.
220
length = strlen(blob_url_buffer.bu_data) +1;
221
if ((length != org_length) || memcmp(blob_url_buffer.bu_data, possible_blob_url, length)) {
222
char *blob = possible_blob_url; // This is the BLOB as the server currently sees it.
224
if (length != org_length) {
225
field->store_length(blob_rec, packlength, length);
228
if (length > org_length) {
229
// This can only happen if the BLOB URL is actually larger than the BLOB itself.
230
blob = (char *) session.alloc(length);
231
memcpy(blob_rec+packlength, &blob, sizeof(char*));
233
memcpy(blob, blob_url_buffer.bu_data, length);
240
static bool deleteRecord(const char *db, const char *table_name, char *blob_url, size_t length)
243
char safe_url[PBMS_BLOB_URL_SIZE+1];
244
PBMSResultRec result;
245
bool call_failed = false;
247
// Check to see if this is a valid URL.
248
if (MSEngine::couldBeURL(blob_url, length)) {
250
// The BLOB URL may not be null terminate, if so
251
// then copy it to a safe buffer and terminate it.
252
if (blob_url[length]) {
253
memcpy(safe_url, blob_url, length);
254
safe_url[length] = 0;
258
// Signal PBMS to delete the reference to the BLOB.
259
err = MSEngine::dereferenceBlob(db, table_name, blob_url, &result);
261
// If it fails log the error and continue to try and release any other BLOBs in the row.
262
fprintf(stderr, "PBMSEvents: dereferenceBlob(\"%s.%s\") error (%d):'%s'\n",
263
db, table_name, result.mr_code, result.mr_message);
273
static bool observeBeforeInsertRecord(BeforeInsertRecordEventData &data)
276
unsigned char *blob_rec;
278
size_t packlength, i, length;
280
for (i= 0; i < data.table.sizeBlobFields(); i++) {
281
field = data.table.getBlobFieldAt(i);
283
if (field->is_null_in_record(data.row))
286
// Get the blob record:
287
packlength = field->pack_length() - data.table.getBlobPtrSize();
289
blob_rec = (unsigned char *)data.row + field->offset(data.table.record[0]);
290
length = field->get_length(blob_rec);
291
memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
293
if (insertRecord(data.table.getSchemaName(), data.table.getTableName(),
294
blob_url, length, data.session, field, blob_rec, packlength))
302
static bool observeAfterInsertRecord(AfterInsertRecordEventData &data)
304
bool has_blob = false;
306
for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
307
Field_blob *field = data.table.getBlobFieldAt(i);
309
if ( field->is_null_in_record(data.row) == false)
314
MSEngine::callCompleted(data.err == 0);
320
static bool observeBeforeUpdateRecord(BeforeUpdateRecordEventData &data)
323
uint32_t field_offset;
324
const unsigned char *old_blob_rec;
325
unsigned char *new_blob_rec= NULL;
326
char *old_blob_url, *new_blob_url;
327
size_t packlength, i, old_length= 0, new_length= 0;
328
const unsigned char *old_row = data.old_row;
329
unsigned char *new_row = data.new_row;
330
const char *db = data.table.getSchemaName();
331
const char *table_name = data.table.getTableName();
332
bool old_null, new_null;
334
for (i= 0; i < data.table.sizeBlobFields(); i++) {
335
field = data.table.getBlobFieldAt(i);
337
new_null = field->is_null_in_record(new_row);
338
old_null = field->is_null_in_record(old_row);
340
if (new_null && old_null)
343
// Check to see if the BLOB data was updated.
345
// Get the blob records:
346
field_offset = field->offset(data.table.record[0]);
347
packlength = field->pack_length() - data.table.getBlobPtrSize();
352
new_blob_rec = new_row + field_offset;
353
new_length = field->get_length(new_blob_rec);
354
memcpy(&new_blob_url, new_blob_rec +packlength, sizeof(char*));
360
old_blob_rec = old_row + field_offset;
361
old_length = field->get_length(old_blob_rec);
362
memcpy(&old_blob_url, old_blob_rec +packlength, sizeof(char*));
365
// Check to see if the BLOBs are the same.
366
// I am assuming that if the BLOB pointer is different then teh BLOB has changed.
367
// Zero length BLOBs are a special case because they may have a NULL data pointer,
368
// to catch this and distiguish it from a NULL BLOB I do a check to see if one field was NULL:
369
// (old_null != new_null)
370
if ((old_blob_url != new_blob_url) || (old_null != new_null)) {
372
// The BLOB was updated so delete the old one and insert the new one.
373
if ((old_null == false) && deleteRecord(db, table_name, old_blob_url, old_length))
376
if ((new_null == false) && insertRecord(db, table_name, new_blob_url, new_length, data.session, field, new_blob_rec, packlength))
387
static bool observeAfterUpdateRecord(AfterUpdateRecordEventData &data)
389
bool has_blob = false;
390
const unsigned char *old_row = data.old_row;
391
const unsigned char *new_row = data.new_row;
393
for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
394
Field_blob *field = data.table.getBlobFieldAt(i);
395
bool new_null = field->is_null_in_record(new_row);
396
bool old_null = field->is_null_in_record(old_row);
398
if ( (new_null == false) || (old_null == false)) {
399
const unsigned char *blob_rec;
400
size_t field_offset = field->offset(data.table.record[0]);
401
size_t packlength = field->pack_length() - data.table.getBlobPtrSize();
402
char *old_blob_url, *new_blob_url;
404
blob_rec = new_row + field_offset;
405
memcpy(&new_blob_url, blob_rec +packlength, sizeof(char*));
407
blob_rec = old_row + field_offset;
408
memcpy(&old_blob_url, blob_rec +packlength, sizeof(char*));
410
has_blob = ((old_blob_url != new_blob_url) || (old_null != new_null));
415
MSEngine::callCompleted(data.err == 0);
421
static bool observeAfterDeleteRecord(AfterDeleteRecordEventData &data)
424
const unsigned char *blob_rec;
426
size_t packlength, i, length;
427
bool call_failed = false;
428
bool has_blob = false;
433
for (i= 0; (i < data.table.sizeBlobFields()) && (call_failed == false); i++) {
434
field = data.table.getBlobFieldAt(i);
436
if (field->is_null_in_record(data.row))
440
// Get the blob record:
441
packlength = field->pack_length() - data.table.getBlobPtrSize();
443
blob_rec = data.row + field->offset(data.table.record[0]);
444
length = field->get_length(blob_rec);
445
memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
447
if (deleteRecord(data.table.getSchemaName(), data.table.getTableName(), blob_url, length))
452
MSEngine::callCompleted(call_failed == false);
457
//==================================
458
// My session event observers:
459
static bool observeAfterDropDatabase(AfterDropDatabaseEventData &data)
461
PBMSResultRec result;
465
if (MSEngine::dropDatabase(data.db.c_str(), &result) != 0) {
466
fprintf(stderr, "PBMSEvents: dropDatabase(\"%s\") error (%d):'%s'\n",
467
data.db.c_str(), result.mr_code, result.mr_message);
470
// Always return no error for after drop database. What could the server do about it?
474
//==================================
475
// My schema event observers:
476
static bool observeAfterDropTable(AfterDropTableEventData &data)
478
PBMSResultRec result;
482
if (MSEngine::dropTable(data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), &result) != 0) {
483
fprintf(stderr, "PBMSEvents: dropTable(\"%s.%s\") error (%d):'%s'\n",
484
data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), result.mr_code, result.mr_message);
487
MSEngine::callCompleted(true);
493
static bool observeAfterRenameTable(AfterRenameTableEventData &data)
495
PBMSResultRec result;
499
const char *from_db = data.from.getSchemaName().c_str();
500
const char *from_table = data.from.getTableName().c_str();
501
const char *to_db = data.to.getSchemaName().c_str();
502
const char *to_table = data.to.getTableName().c_str();
504
if (MSEngine::renameTable(from_db, from_table, to_db, to_table, &result) != 0) {
505
fprintf(stderr, "PBMSEvents: renameTable(\"%s.%s\" To \"%s.%s\") error (%d):'%s'\n",
506
from_db, from_table, to_db, to_table, result.mr_code, result.mr_message);
509
MSEngine::callCompleted(true);
514
//==================================
515
/* This is where I register which table events my pluggin is interested in.*/
516
void PBMSEvents::registerTableEventsDo(TableShare &table_share, EventObserverList &observers)
518
if ((PBMSParameters::isPBMSEventsEnabled() == false)
519
|| (PBMSParameters::isBLOBTable(table_share.getSchemaName(), table_share.getTableName()) == false))
522
if (table_share.blob_fields > 0) {
523
registerEvent(observers, BEFORE_INSERT_RECORD, PBMSParameters::getBeforeInsertEventPosition()); // I want to be called first if passible
524
registerEvent(observers, AFTER_INSERT_RECORD);
525
registerEvent(observers, BEFORE_UPDATE_RECORD, PBMSParameters::getBeforeUptateEventPosition());
526
registerEvent(observers, AFTER_UPDATE_RECORD);
527
registerEvent(observers, AFTER_DELETE_RECORD);
531
//==================================
532
/* This is where I register which schema events my pluggin is interested in.*/
533
void PBMSEvents::registerSchemaEventsDo(const std::string &db, EventObserverList &observers)
535
if ((PBMSParameters::isPBMSEventsEnabled() == false)
536
|| (PBMSParameters::isBLOBDatabase(db.c_str()) == false))
539
registerEvent(observers, AFTER_DROP_TABLE);
540
registerEvent(observers, AFTER_RENAME_TABLE);
543
//==================================
544
/* This is where I register which schema events my pluggin is interested in.*/
545
void PBMSEvents::registerSessionEventsDo(Session &, EventObserverList &observers)
547
if (PBMSParameters::isPBMSEventsEnabled() == false)
550
registerEvent(observers, AFTER_DROP_DATABASE);
553
//==================================
554
/* The event observer.*/
555
bool PBMSEvents::observeEventDo(EventData &data)
559
switch (data.event) {
560
case AFTER_DROP_DATABASE:
561
result = observeAfterDropDatabase((AfterDropDatabaseEventData &)data);
564
case AFTER_DROP_TABLE:
565
result = observeAfterDropTable((AfterDropTableEventData &)data);
568
case AFTER_RENAME_TABLE:
569
result = observeAfterRenameTable((AfterRenameTableEventData &)data);
572
case BEFORE_INSERT_RECORD:
573
result = observeBeforeInsertRecord((BeforeInsertRecordEventData &)data);
576
case AFTER_INSERT_RECORD:
577
result = observeAfterInsertRecord((AfterInsertRecordEventData &)data);
580
case BEFORE_UPDATE_RECORD:
581
result = observeBeforeUpdateRecord((BeforeUpdateRecordEventData &)data);
584
case AFTER_UPDATE_RECORD:
585
result = observeAfterUpdateRecord((AfterUpdateRecordEventData &)data);
588
case AFTER_DELETE_RECORD:
589
result = observeAfterDeleteRecord((AfterDeleteRecordEventData &)data);
593
fprintf(stderr, "PBMSEvents: Unexpected event '%s'\n", EventObserver::eventName(data.event));