~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/events_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 *  Copyright (C) 2010 PrimeBase Technologies GmbH, Germany
3
 
 *
4
 
 *  This program is free software; you can redistribute it and/or modify
5
 
 *  it under the terms of the GNU General Public License as published by
6
 
 *  the Free Software Foundation; version 2 of the License.
7
 
 *
8
 
 *  This program is distributed in the hope that it will be useful,
9
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
 *  GNU General Public License for more details.
12
 
 *
13
 
 *  You should have received a copy of the GNU General Public License
14
 
 *  along with this program; if not, write to the Free Software
15
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
16
 
 *
17
 
 * Barry Leslie
18
 
 *
19
 
 * 2010-06-01
20
 
 */
21
 
 
22
 
#include <config.h>
23
 
#include <string>
24
 
#include <inttypes.h>
25
 
 
26
 
#include <drizzled/session.h>
27
 
#include <drizzled/field/blob.h>
28
 
#include <drizzled/sql_lex.h>
29
 
 
30
 
#include "cslib/CSConfig.h"
31
 
#include "cslib/CSGlobal.h"
32
 
#include "cslib/CSStrUtil.h"
33
 
#include "cslib/CSThread.h"
34
 
 
35
 
#include "events_ms.h"
36
 
#include "parameters_ms.h"
37
 
#include "engine_ms.h"
38
 
 
39
 
using namespace drizzled;
40
 
using namespace plugin;
41
 
using namespace std;
42
 
 
43
 
 
44
 
//==================================
45
 
// My table event observers: 
46
 
static bool insertRecord(const char *db, const char *table_name, char *possible_blob_url,  size_t length, 
47
 
        Session &session, Field_blob *field, unsigned char *blob_rec, size_t packlength)
48
 
{
49
 
        char *blob_url;
50
 
        char safe_url[PBMS_BLOB_URL_SIZE+1];
51
 
        PBMSBlobURLRec blob_url_buffer;
52
 
        size_t org_length = length;
53
 
        int32_t err;
54
 
        PBMSResultRec result;
55
 
        
56
 
        // Tell PBMS to record a new reference to the BLOB.
57
 
        // If 'blob' is not a BLOB URL then it will be stored in the repositor as a new BLOB
58
 
        // and a reference to it will be created.
59
 
        
60
 
        if (MSEngine::couldBeURL(possible_blob_url, length) == false) {
61
 
                err = MSEngine::createBlob(db, table_name, possible_blob_url, length, &blob_url_buffer, &result);
62
 
                if (err) {
63
 
                        // If it fails log the error and continue to try and release any other BLOBs in the row.
64
 
                        fprintf(stderr, "PBMSEvents: createBlob(\"%s.%s\") error (%d):'%s'\n", 
65
 
                                db, table_name, result.mr_code,  result.mr_message);
66
 
                                
67
 
                        return true;
68
 
                }                               
69
 
                blob_url = blob_url_buffer.bu_data;
70
 
        } else {
71
 
                // The BLOB URL may not be null terminate, if so
72
 
                // then copy it to a safe buffer and terminate it.
73
 
                if (possible_blob_url[length]) {
74
 
                        memcpy(safe_url, possible_blob_url, length);
75
 
                        safe_url[length] = 0;
76
 
                        blob_url = safe_url;
77
 
                } else
78
 
                        blob_url = possible_blob_url;
79
 
        }
80
 
        
81
 
        // Tell PBMS to add a reference to the BLOB.
82
 
        err = MSEngine::referenceBlob(db, table_name, &blob_url_buffer, blob_url, field->position(), &result);
83
 
        if (err) {
84
 
                // If it fails log the error and continue to try and release any other BLOBs in the row.
85
 
                fprintf(stderr, "PBMSEvents: referenceBlob(\"%s.%s\", \"%s\" ) error (%d):'%s'\n", 
86
 
                        db, table_name, blob_url, result.mr_code,  result.mr_message);
87
 
                        
88
 
                return true;
89
 
        }
90
 
        
91
 
        // The URL is modified on insert so if the BLOB length changed reset it. 
92
 
        // This will happen if the BLOB data was replaced with a BLOB reference. 
93
 
        length = strlen(blob_url_buffer.bu_data)  +1;
94
 
        if ((length != org_length) || memcmp(blob_url_buffer.bu_data, possible_blob_url, length)) {
95
 
                char *blob = possible_blob_url; // This is the BLOB as the server currently sees it.
96
 
                
97
 
                if (length != org_length) {
98
 
                        field->store_length(blob_rec, length);
99
 
                }
100
 
                
101
 
                if (length > org_length) {
102
 
                        // This can only happen if the BLOB URL is actually larger than the BLOB itself.
103
 
                        blob = (char *) session.mem.alloc(length);
104
 
                        memcpy(blob_rec+packlength, &blob, sizeof(char*));
105
 
                }                       
106
 
                memcpy(blob, blob_url_buffer.bu_data, length);
107
 
        } 
108
 
 
109
 
        return false;
110
 
}
111
 
 
112
 
//---
113
 
static bool deleteRecord(const char *db, const char *table_name, char *blob_url,  size_t length)
114
 
{
115
 
        int32_t err;
116
 
        char safe_url[PBMS_BLOB_URL_SIZE+1];
117
 
        PBMSResultRec result;
118
 
        bool call_failed = false;
119
 
        
120
 
        // Check to see if this is a valid URL.
121
 
        if (MSEngine::couldBeURL(blob_url, length)) {
122
 
        
123
 
                // The BLOB URL may not be null terminate, if so
124
 
                // then copy it to a safe buffer and terminate it.
125
 
                if (blob_url[length]) {
126
 
                        memcpy(safe_url, blob_url, length);
127
 
                        safe_url[length] = 0;
128
 
                        blob_url = safe_url;
129
 
                }
130
 
                
131
 
                // Signal PBMS to delete the reference to the BLOB.
132
 
                err = MSEngine::dereferenceBlob(db, table_name, blob_url, &result);
133
 
                if (err) {
134
 
                        // If it fails log the error and continue to try and release any other BLOBs in the row.
135
 
                        fprintf(stderr, "PBMSEvents: dereferenceBlob(\"%s.%s\") error (%d):'%s'\n", 
136
 
                                db, table_name, result.mr_code,  result.mr_message);
137
 
                                
138
 
                        call_failed = true;
139
 
                }
140
 
        }
141
 
 
142
 
        return call_failed;
143
 
}
144
 
 
145
 
//---
146
 
static bool observeBeforeInsertRecord(BeforeInsertRecordEventData &data)
147
 
{
148
 
        Field_blob *field;
149
 
        unsigned char *blob_rec;
150
 
        char *blob_url;
151
 
        size_t packlength, i, length;
152
 
 
153
 
        for (i= 0; i < data.table.sizeBlobFields(); i++) {
154
 
                field = data.table.getBlobFieldAt(i);
155
 
                
156
 
                if (field->is_null_in_record(data.row))
157
 
                        continue;
158
 
                        
159
 
                // Get the blob record:
160
 
                packlength = field->pack_length() - data.table.getBlobPtrSize();
161
 
 
162
 
                blob_rec = (unsigned char *)data.row + field->offset(data.table.getInsertRecord());
163
 
                length = field->get_length(blob_rec);
164
 
                memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
165
 
 
166
 
                if (insertRecord(data.table.getSchemaName(), data.table.getTableName(), 
167
 
                        blob_url, length, data.session, field, blob_rec, packlength))
168
 
                        return true;
169
 
        }
170
 
 
171
 
        return false;
172
 
}
173
 
 
174
 
//---
175
 
static bool observeAfterInsertRecord(AfterInsertRecordEventData &data)
176
 
{
177
 
        bool has_blob = false;
178
 
        
179
 
        for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
180
 
                Field_blob *field = data.table.getBlobFieldAt(i);
181
 
                
182
 
                if ( field->is_null_in_record(data.row) == false)
183
 
                        has_blob = true;
184
 
        }
185
 
        
186
 
        if  (has_blob)
187
 
                MSEngine::callCompleted(data.err == 0);
188
 
        
189
 
        return false;
190
 
}
191
 
 
192
 
//---
193
 
static bool observeBeforeUpdateRecord(BeforeUpdateRecordEventData &data)
194
 
{
195
 
        Field_blob *field;
196
 
        uint32_t field_offset;
197
 
        const unsigned char *old_blob_rec;
198
 
        unsigned char *new_blob_rec= NULL;
199
 
        char *old_blob_url, *new_blob_url;
200
 
        size_t packlength, i, old_length= 0, new_length= 0;
201
 
        const unsigned char *old_row = data.old_row;
202
 
        unsigned char *new_row = data.new_row;
203
 
        const char *db = data.table.getSchemaName();
204
 
        const char *table_name = data.table.getTableName();
205
 
        bool old_null, new_null;
206
 
 
207
 
        for (i= 0; i < data.table.sizeBlobFields(); i++) {
208
 
                field = data.table.getBlobFieldAt(i);
209
 
                
210
 
                new_null = field->is_null_in_record(new_row);           
211
 
                old_null = field->is_null_in_record(old_row);
212
 
                
213
 
                if (new_null && old_null)
214
 
                        continue;
215
 
                
216
 
                // Check to see if the BLOB data was updated.
217
 
 
218
 
                // Get the blob records:
219
 
                field_offset = field->offset(data.table.getInsertRecord());
220
 
                packlength = field->pack_length() - data.table.getBlobPtrSize();
221
 
 
222
 
                if (new_null) {
223
 
                        new_blob_url = NULL;
224
 
                } else {
225
 
                        new_blob_rec = new_row + field_offset;
226
 
                        new_length = field->get_length(new_blob_rec);
227
 
                        memcpy(&new_blob_url, new_blob_rec +packlength, sizeof(char*));
228
 
                }
229
 
                
230
 
                if (old_null) {
231
 
                        old_blob_url = NULL;
232
 
                } else {
233
 
                        old_blob_rec = old_row + field_offset;
234
 
                        old_length = field->get_length(old_blob_rec);
235
 
                        memcpy(&old_blob_url, old_blob_rec +packlength, sizeof(char*));
236
 
                }
237
 
                
238
 
                // Check to see if the BLOBs are the same.
239
 
                // I am assuming that if the BLOB pointer is different then teh BLOB has changed.
240
 
                // Zero length BLOBs are a special case because they may have a NULL data pointer,
241
 
                // to catch this and distiguish it from a NULL BLOB I do a check to see if one field was NULL:
242
 
                // (old_null != new_null)
243
 
                if ((old_blob_url != new_blob_url) || (old_null != new_null)) {
244
 
                        
245
 
                        // The BLOB was updated so delete the old one and insert the new one.
246
 
                        if ((old_null == false) && deleteRecord(db, table_name, old_blob_url, old_length))
247
 
                                return true;
248
 
                                
249
 
                        if ((new_null == false) && insertRecord(db, table_name, new_blob_url, new_length, data.session, field, new_blob_rec, packlength))
250
 
                                return true;
251
 
 
252
 
                }
253
 
                
254
 
        }
255
 
 
256
 
        return false;
257
 
}
258
 
 
259
 
//---
260
 
static bool observeAfterUpdateRecord(AfterUpdateRecordEventData &data)
261
 
{
262
 
        bool has_blob = false;
263
 
        const unsigned char *old_row = data.old_row;
264
 
        const unsigned char *new_row = data.new_row;
265
 
        
266
 
        for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
267
 
                Field_blob *field = data.table.getBlobFieldAt(i);               
268
 
                bool new_null = field->is_null_in_record(new_row);              
269
 
                bool old_null = field->is_null_in_record(old_row);
270
 
                
271
 
                if ( (new_null == false) || (old_null == false)) {
272
 
                        const unsigned char *blob_rec;                  
273
 
                        size_t field_offset = field->offset(data.table.getInsertRecord());
274
 
                        size_t packlength = field->pack_length() - data.table.getBlobPtrSize();
275
 
                        char *old_blob_url, *new_blob_url;
276
 
                        
277
 
                        blob_rec = new_row + field_offset;
278
 
                        memcpy(&new_blob_url, blob_rec +packlength, sizeof(char*));
279
 
 
280
 
                        blob_rec = old_row + field_offset;
281
 
                        memcpy(&old_blob_url, blob_rec +packlength, sizeof(char*));
282
 
 
283
 
                        has_blob = ((old_blob_url != new_blob_url) || (old_null != new_null));
284
 
                }
285
 
        }
286
 
        
287
 
        if  (has_blob)
288
 
                MSEngine::callCompleted(data.err == 0);
289
 
 
290
 
  return false;
291
 
}
292
 
 
293
 
//---
294
 
static bool observeAfterDeleteRecord(AfterDeleteRecordEventData &data)
295
 
{
296
 
        Field_blob *field;
297
 
        const unsigned char *blob_rec;
298
 
        char *blob_url;
299
 
        size_t packlength, i, length;
300
 
        bool call_failed = false;
301
 
        bool has_blob = false;
302
 
        
303
 
        if (data.err != 0)
304
 
                return false;
305
 
 
306
 
        for (i= 0; (i < data.table.sizeBlobFields()) && (call_failed == false); i++) {
307
 
                field = data.table.getBlobFieldAt(i);
308
 
                
309
 
                if (field->is_null_in_record(data.row))
310
 
                        continue;
311
 
                        
312
 
                has_blob = true;        
313
 
                // Get the blob record:
314
 
                packlength = field->pack_length() - data.table.getBlobPtrSize();
315
 
 
316
 
                blob_rec = data.row + field->offset(data.table.getInsertRecord());
317
 
                length = field->get_length(blob_rec);
318
 
                memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
319
 
 
320
 
                if (deleteRecord(data.table.getSchemaName(), data.table.getTableName(), blob_url, length))
321
 
                        call_failed = true;
322
 
        }
323
 
        
324
 
        if (has_blob)
325
 
                MSEngine::callCompleted(call_failed == false);
326
 
                
327
 
        return call_failed;
328
 
}
329
 
 
330
 
//==================================
331
 
// My session event observers: 
332
 
static bool observeAfterDropDatabase(AfterDropDatabaseEventData &data)
333
 
{
334
 
        PBMSResultRec result;
335
 
        if (data.err != 0)
336
 
                return false;
337
 
 
338
 
        if (MSEngine::dropDatabase(data.db.c_str(), &result) != 0) {
339
 
                fprintf(stderr, "PBMSEvents: dropDatabase(\"%s\") error (%d):'%s'\n", 
340
 
                        data.db.c_str(), result.mr_code,  result.mr_message);
341
 
        }
342
 
        
343
 
        // Always return no error for after drop database. What could the server do about it?
344
 
        return false;
345
 
}
346
 
 
347
 
//==================================
348
 
// My schema event observers: 
349
 
static bool observeAfterDropTable(AfterDropTableEventData &data)
350
 
{
351
 
        PBMSResultRec result;
352
 
        if (data.err != 0)
353
 
                return false;
354
 
 
355
 
        if (MSEngine::dropTable(data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), &result) != 0) {
356
 
                fprintf(stderr, "PBMSEvents: dropTable(\"%s.%s\") error (%d):'%s'\n", 
357
 
                        data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), result.mr_code,  result.mr_message);
358
 
                return true;
359
 
        }
360
 
        MSEngine::callCompleted(true);
361
 
        
362
 
        return false;
363
 
}
364
 
 
365
 
//---
366
 
static bool observeAfterRenameTable(AfterRenameTableEventData &data)
367
 
{
368
 
        PBMSResultRec result;
369
 
        if (data.err != 0)
370
 
                return false;
371
 
 
372
 
        const char *from_db = data.from.getSchemaName().c_str();
373
 
        const char *from_table = data.from.getTableName().c_str();
374
 
        const char *to_db = data.to.getSchemaName().c_str();
375
 
        const char *to_table = data.to.getTableName().c_str();
376
 
        
377
 
        if (MSEngine::renameTable(from_db, from_table, to_db, to_table, &result) != 0) {
378
 
                fprintf(stderr, "PBMSEvents: renameTable(\"%s.%s\" To \"%s.%s\") error (%d):'%s'\n", 
379
 
                        from_db, from_table, to_db, to_table, result.mr_code,  result.mr_message);
380
 
                return true;
381
 
        }
382
 
        MSEngine::callCompleted(true);
383
 
        
384
 
        return false;
385
 
}
386
 
 
387
 
//==================================
388
 
/* This is where I register which table events my pluggin is interested in.*/
389
 
void PBMSEvents::registerTableEventsDo(TableShare &table_share, EventObserverList &observers)
390
 
{
391
 
  if ((PBMSParameters::isPBMSEventsEnabled() == false) 
392
 
    || (PBMSParameters::isBLOBTable(table_share.getSchemaName(), table_share.getTableName()) == false))
393
 
    return;
394
 
    
395
 
  if (table_share.blob_fields > 0) {
396
 
          registerEvent(observers, BEFORE_INSERT_RECORD, PBMSParameters::getBeforeInsertEventPosition()); // I want to be called first if passible
397
 
          registerEvent(observers, AFTER_INSERT_RECORD); 
398
 
          registerEvent(observers, BEFORE_UPDATE_RECORD, PBMSParameters::getBeforeUptateEventPosition());
399
 
          registerEvent(observers, AFTER_UPDATE_RECORD); 
400
 
          registerEvent(observers, AFTER_DELETE_RECORD);
401
 
 }
402
 
}
403
 
 
404
 
//==================================
405
 
/* This is where I register which schema events my pluggin is interested in.*/
406
 
void PBMSEvents::registerSchemaEventsDo(const std::string &db, EventObserverList &observers)
407
 
{
408
 
  if ((PBMSParameters::isPBMSEventsEnabled() == false) 
409
 
    || (PBMSParameters::isBLOBDatabase(db.c_str()) == false))
410
 
    return;
411
 
    
412
 
  registerEvent(observers, AFTER_DROP_TABLE);
413
 
  registerEvent(observers, AFTER_RENAME_TABLE);
414
 
}
415
 
 
416
 
//==================================
417
 
/* This is where I register which schema events my pluggin is interested in.*/
418
 
void PBMSEvents::registerSessionEventsDo(Session &, EventObserverList &observers)
419
 
{
420
 
  if (PBMSParameters::isPBMSEventsEnabled() == false) 
421
 
    return;
422
 
    
423
 
  registerEvent(observers, AFTER_DROP_DATABASE);
424
 
}
425
 
 
426
 
//==================================
427
 
/* The event observer.*/
428
 
bool PBMSEvents::observeEventDo(EventData &data)
429
 
{
430
 
  bool result= false;
431
 
  
432
 
  switch (data.event) {
433
 
  case AFTER_DROP_DATABASE:
434
 
    result = observeAfterDropDatabase((AfterDropDatabaseEventData &)data);
435
 
    break;
436
 
    
437
 
  case AFTER_DROP_TABLE:
438
 
    result = observeAfterDropTable((AfterDropTableEventData &)data);
439
 
    break;
440
 
    
441
 
  case AFTER_RENAME_TABLE:
442
 
    result = observeAfterRenameTable((AfterRenameTableEventData &)data);
443
 
    break;
444
 
    
445
 
  case BEFORE_INSERT_RECORD:
446
 
     result = observeBeforeInsertRecord((BeforeInsertRecordEventData &)data);
447
 
    break;
448
 
    
449
 
  case AFTER_INSERT_RECORD:
450
 
    result = observeAfterInsertRecord((AfterInsertRecordEventData &)data);
451
 
    break;
452
 
    
453
 
 case BEFORE_UPDATE_RECORD:
454
 
    result = observeBeforeUpdateRecord((BeforeUpdateRecordEventData &)data);
455
 
    break;
456
 
             
457
 
  case AFTER_UPDATE_RECORD:
458
 
    result = observeAfterUpdateRecord((AfterUpdateRecordEventData &)data);
459
 
    break;
460
 
    
461
 
  case AFTER_DELETE_RECORD:
462
 
    result = observeAfterDeleteRecord((AfterDeleteRecordEventData &)data);
463
 
    break;
464
 
 
465
 
  default:
466
 
    fprintf(stderr, "PBMSEvents: Unexpected event '%s'\n", EventObserver::eventName(data.event));
467
 
 
468
 
  }
469
 
  
470
 
  return result;
471
 
}
472