~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/systab_cloud_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-10-21
22
 
 *
23
 
 * System cloud starage info table.
24
 
 *
25
 
 */
26
 
#ifdef DRIZZLED
27
 
#include <config.h>
28
 
#include <drizzled/common.h>
29
 
#include <drizzled/session.h>
30
 
#include <drizzled/sql_lex.h>
31
 
#endif
32
 
 
33
 
#include "cslib/CSConfig.h"
34
 
#include <inttypes.h>
35
 
 
36
 
#include <sys/types.h>
37
 
#include <sys/stat.h>
38
 
#include <stdlib.h>
39
 
#include <time.h>
40
 
 
41
 
//#include "mysql_priv.h"
42
 
#include "cslib/CSGlobal.h"
43
 
#include "cslib/CSStrUtil.h"
44
 
#include "cslib/CSLog.h"
45
 
#include "cslib/CSPath.h"
46
 
#include "cslib/CSDirectory.h"
47
 
 
48
 
#include "ha_pbms.h"
49
 
//#include <plugin.h>
50
 
 
51
 
#include "mysql_ms.h"
52
 
#include "database_ms.h"
53
 
#include "open_table_ms.h"
54
 
#include "discover_ms.h"
55
 
#include "systab_util_ms.h"
56
 
 
57
 
#include "systab_cloud_ms.h"
58
 
 
59
 
DT_FIELD_INFO pbms_cloud_info[]=
60
 
{
61
 
        {"Id",                  NOVAL,  NULL, MYSQL_TYPE_LONG,          NULL,                   NOT_NULL_FLAG,  "The Cloud storage reference ID"},
62
 
        {"Server",              1024,   NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 server name"},
63
 
        {"Bucket",              124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 bucket name"},
64
 
        {"PublicKey",   124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 public key"},
65
 
        {"PrivateKey",  124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 private key"},
66
 
        {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
67
 
};
68
 
 
69
 
DT_KEY_INFO pbms_cloud_keys[]=
70
 
{
71
 
        {"pbms_cloud_pk", PRI_KEY_FLAG, {"Id", NULL}},
72
 
        {NULL, 0, {NULL}}
73
 
};
74
 
 
75
 
#define MIN_CLOUD_TABLE_SIZE 4
76
 
 
77
 
//----------------------------
78
 
void MSCloudTable::startUp()
79
 
{
80
 
        MSCloudInfo::startUp();
81
 
}
82
 
 
83
 
//----------------------------
84
 
void MSCloudTable::shutDown()
85
 
{
86
 
        MSCloudInfo::shutDown();
87
 
}
88
 
 
89
 
//----------------------------
90
 
void MSCloudTable::loadTable(MSDatabase *db)
91
 
{
92
 
 
93
 
        enter_();
94
 
        
95
 
        push_(db);
96
 
        lock_(MSCloudInfo::gCloudInfo);
97
 
        
98
 
        if (MSCloudInfo::gMaxInfoRef == 0) {
99
 
                CSPath  *path;
100
 
                path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
101
 
                push_(path);
102
 
 
103
 
                if (path->exists()) {
104
 
                        CSFile          *file;
105
 
                        SysTabRec       *cloudData;
106
 
                        const char      *server, *bucket, *pubKey, *privKey;
107
 
                        uint32_t                info_id;
108
 
                        MSCloudInfo     *info;
109
 
                        size_t          size;
110
 
                        
111
 
                        new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
112
 
                        push_(cloudData);
113
 
 
114
 
                        file = path->openFile(CSFile::READONLY);
115
 
                        push_(file);
116
 
                        size = file->getEOF();
117
 
                        cloudData->setLength(size);
118
 
                        file->read(cloudData->getBuffer(0), 0, size, size);
119
 
                        release_(file);
120
 
                        
121
 
                        cloudData->firstRecord();
122
 
                        MSCloudInfo::gMaxInfoRef = cloudData->getInt4Field();
123
 
                        
124
 
                        if (! cloudData->isValidRecord()) 
125
 
                                MSCloudInfo::gMaxInfoRef = 1;
126
 
                        
127
 
                        while (cloudData->nextRecord()) {
128
 
                                info_id = cloudData->getInt4Field();
129
 
                                server = cloudData->getStringField();
130
 
                                bucket = cloudData->getStringField();
131
 
                                pubKey = cloudData->getStringField();
132
 
                                privKey = cloudData->getStringField();
133
 
                                
134
 
                                if (cloudData->isValidRecord()) {
135
 
                                        if (info_id > MSCloudInfo::gMaxInfoRef) {
136
 
                                                char msg[80];
137
 
                                                snprintf(msg, 80, "Cloud info id (%"PRIu32") larger than expected (%"PRIu32")\n", info_id, MSCloudInfo::gMaxInfoRef);
138
 
                                                CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
139
 
                                                CSL.log(self, CSLog::Warning, msg);
140
 
                                                MSCloudInfo::gMaxInfoRef = info_id +1;
141
 
                                        }
142
 
                                        if ( MSCloudInfo::gCloudInfo->get(info_id)) {
143
 
                                                char msg[80];
144
 
                                                snprintf(msg, 80, "Duplicate Cloud info id (%"PRIu32") being ignored\n", info_id);
145
 
                                                CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
146
 
                                                CSL.log(self, CSLog::Warning, msg);
147
 
                                        } else {
148
 
                                                new_(info, MSCloudInfo( info_id, server, bucket, pubKey, privKey));
149
 
                                                MSCloudInfo::gCloudInfo->set(info_id, info);
150
 
                                        }
151
 
                                }
152
 
                        }
153
 
                        release_(cloudData); cloudData = NULL;
154
 
                        
155
 
                } else
156
 
                        MSCloudInfo::gMaxInfoRef = 1;
157
 
                
158
 
                release_(path);
159
 
                
160
 
        }
161
 
        unlock_(MSCloudInfo::gCloudInfo);
162
 
 
163
 
        release_(db);
164
 
 
165
 
        exit_();
166
 
}
167
 
 
168
 
void MSCloudTable::saveTable(MSDatabase *db)
169
 
{
170
 
        SysTabRec               *cloudData;
171
 
        MSCloudInfo             *info;
172
 
        enter_();
173
 
        
174
 
        push_(db);
175
 
        
176
 
        new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
177
 
        push_(cloudData);
178
 
        
179
 
        // Build the table records
180
 
        cloudData->clear();
181
 
        lock_(MSCloudInfo::gCloudInfo);
182
 
        
183
 
        cloudData->beginRecord();       
184
 
        cloudData->setInt4Field(MSCloudInfo::gMaxInfoRef);
185
 
        cloudData->endRecord(); 
186
 
        for  (int i = 0;(info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->itemAt(i)); i++) { // info is not referenced.
187
 
                
188
 
                cloudData->beginRecord();       
189
 
                cloudData->setInt4Field(info->getCloudRefId());
190
 
                cloudData->setStringField(info->getServer());
191
 
                cloudData->setStringField(info->getBucket());
192
 
                cloudData->setStringField(info->getPublicKey());
193
 
                cloudData->setStringField(info->getPrivateKey());
194
 
                cloudData->endRecord();                 
195
 
        }
196
 
        unlock_(MSCloudInfo::gCloudInfo);
197
 
 
198
 
        restoreTable(RETAIN(db), cloudData->getBuffer(0), cloudData->length(), false);
199
 
        
200
 
        release_(cloudData);
201
 
        release_(db);
202
 
        exit_();
203
 
}
204
 
 
205
 
 
206
 
MSCloudTable::MSCloudTable(MSSystemTableShare *share, TABLE *table):
207
 
MSOpenSystemTable(share, table),
208
 
iCloudIndex(0)
209
 
{
210
 
}
211
 
 
212
 
MSCloudTable::~MSCloudTable()
213
 
{
214
 
        //unuse();
215
 
}
216
 
 
217
 
void MSCloudTable::use()
218
 
{
219
 
        MSCloudInfo::gCloudInfo->lock();
220
 
}
221
 
 
222
 
void MSCloudTable::unuse()
223
 
{
224
 
        MSCloudInfo::gCloudInfo->unlock();
225
 
        
226
 
}
227
 
 
228
 
 
229
 
void MSCloudTable::seqScanInit()
230
 
{
231
 
        iCloudIndex = 0;
232
 
}
233
 
 
234
 
#define MAX_PASSWORD ((int32_t)64)
235
 
bool MSCloudTable::seqScanNext(char *buf)
236
 
{
237
 
        char            passwd[MAX_PASSWORD +1];
238
 
        TABLE           *table = mySQLTable;
239
 
        Field           *curr_field;
240
 
        byte            *save;
241
 
        MY_BITMAP       *save_write_set;
242
 
        MSCloudInfo     *info;
243
 
        const char      *val;
244
 
        
245
 
        enter_();
246
 
        
247
 
        info = (MSCloudInfo     *) MSCloudInfo::gCloudInfo->itemAt(iCloudIndex++); // Object is not referenced.
248
 
        if (!info)
249
 
                return_(false);
250
 
        
251
 
        save_write_set = table->write_set;
252
 
        table->write_set = NULL;
253
 
 
254
 
#ifdef DRIZZLED
255
 
        memset(buf, 0xFF, table->getNullBytes());
256
 
#else
257
 
        memset(buf, 0xFF, table->s->null_bytes);
258
 
#endif
259
 
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
260
 
                curr_field = *field;
261
 
                save = curr_field->ptr;
262
 
#if MYSQL_VERSION_ID < 50114
263
 
                curr_field->ptr = (byte *) buf + curr_field->offset();
264
 
#else
265
 
#ifdef DRIZZLED
266
 
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
267
 
#else
268
 
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
269
 
#endif
270
 
#endif
271
 
                switch (curr_field->field_name[0]) {
272
 
                        case 'I':
273
 
                                ASSERT(strcmp(curr_field->field_name, "Id") == 0);
274
 
                                curr_field->store(info->getCloudRefId(), true);
275
 
                                break;
276
 
 
277
 
                        case 'S':
278
 
                                ASSERT(strcmp(curr_field->field_name, "Server") == 0);
279
 
                                val = info->getServer();
280
 
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
281
 
                                setNotNullInRecord(curr_field, buf);
282
 
                                break;
283
 
 
284
 
                        case 'B': 
285
 
                                ASSERT(strcmp(curr_field->field_name, "Bucket") == 0);
286
 
                                val = info->getBucket();
287
 
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
288
 
                                setNotNullInRecord(curr_field, buf);
289
 
                                break;
290
 
 
291
 
                        case 'P': 
292
 
                                if (curr_field->field_name[1] == 'u') {
293
 
                                        ASSERT(strcmp(curr_field->field_name, "PublicKey") == 0);
294
 
                                        val = info->getPublicKey();
295
 
                                } else if (curr_field->field_name[1] == 'r') {
296
 
                                        ASSERT(strcmp(curr_field->field_name, "PrivateKey") == 0);
297
 
                                        val = info->getPrivateKey();
298
 
                                        
299
 
                                        int32_t i;
300
 
                                        for (i = 0; (i < MAX_PASSWORD) && (i < (int32_t)strlen(val)); i++) passwd[i] = '*';
301
 
                                        passwd[i] = 0;
302
 
                                        val = passwd;
303
 
                                } else {
304
 
                                        ASSERT(false);
305
 
                                        break;
306
 
                                }
307
 
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
308
 
                                setNotNullInRecord(curr_field, buf);
309
 
                                break;
310
 
                                
311
 
                        default:
312
 
                                ASSERT(false);
313
 
                }
314
 
                curr_field->ptr = save;
315
 
        }
316
 
 
317
 
        table->write_set = save_write_set;
318
 
        
319
 
        return_(true);
320
 
}
321
 
 
322
 
void MSCloudTable::seqScanPos(unsigned char *pos )
323
 
{
324
 
        int32_t index = iCloudIndex -1;
325
 
        if (index < 0)
326
 
                index = 0; // This is probably an error condition.
327
 
                
328
 
        mi_int4store(pos, index);
329
 
}
330
 
 
331
 
void MSCloudTable::seqScanRead(unsigned char *pos , char *buf)
332
 
{
333
 
        iCloudIndex = mi_uint4korr(pos);
334
 
        seqScanNext(buf);
335
 
}
336
 
 
337
 
void MSCloudTable::updateRow(char *old_data, char *new_data) 
338
 
{
339
 
        uint32_t n_id, o_id, o_indx, n_indx;
340
 
        const char *realPrivKey;
341
 
        String server, bucket, pubKey, privKey;
342
 
        String o_server, o_bucket, o_pubKey, o_privKey;
343
 
        MSCloudInfo *info;
344
 
 
345
 
        enter_();
346
 
        
347
 
        getFieldValue(new_data, 0, &n_id);
348
 
        getFieldValue(new_data, 1, &server);
349
 
        getFieldValue(new_data, 2, &bucket);
350
 
        getFieldValue(new_data, 3, &pubKey);
351
 
        getFieldValue(new_data, 4, &privKey);
352
 
 
353
 
        getFieldValue(old_data, 0, &o_id);
354
 
        getFieldValue(old_data, 1, &o_server);
355
 
        getFieldValue(old_data, 2, &o_bucket);
356
 
        getFieldValue(old_data, 3, &o_pubKey);
357
 
        getFieldValue(old_data, 4, &o_privKey);
358
 
 
359
 
        // The cloud ID must be unique
360
 
        if ((o_id !=  n_id) && MSCloudInfo::gCloudInfo->get(n_id)) {
361
 
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to update a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
362
 
        }
363
 
 
364
 
        // The private key is masked when returned to the caller, so
365
 
        // unless the caller has updated it we need to get the real 
366
 
        // private key from the old record.
367
 
        if (strcmp(privKey.c_ptr(), o_privKey.c_ptr()))
368
 
                realPrivKey = privKey.c_ptr();
369
 
        else {
370
 
                info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->get(o_id); // unreference pointer
371
 
                realPrivKey = info->getPrivateKey();
372
 
        }
373
 
        
374
 
        new_(info, MSCloudInfo( n_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), realPrivKey));
375
 
        push_(info);
376
 
        
377
 
        o_indx = MSCloudInfo::gCloudInfo->getIndex(o_id);
378
 
 
379
 
        MSCloudInfo::gCloudInfo->remove(o_id);
380
 
        pop_(info);
381
 
        MSCloudInfo::gCloudInfo->set(n_id, info);
382
 
        n_indx = MSCloudInfo::gCloudInfo->getIndex(n_id);
383
 
        
384
 
        // Adjust the current position in the array if required.
385
 
        if (o_indx < n_indx )
386
 
                iCloudIndex--;
387
 
                
388
 
        saveTable(RETAIN(myShare->mySysDatabase));
389
 
        exit_();
390
 
}
391
 
 
392
 
void MSCloudTable::insertRow(char *data) 
393
 
{
394
 
        uint32_t ref_id;
395
 
        String server, bucket, pubKey, privKey;
396
 
        MSCloudInfo *info;
397
 
 
398
 
        enter_();
399
 
        
400
 
        getFieldValue(data, 0, &ref_id);
401
 
                
402
 
        // The cloud ID must be unique
403
 
        if (ref_id && MSCloudInfo::gCloudInfo->get(ref_id)) {
404
 
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to insert a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
405
 
        }
406
 
        
407
 
        getFieldValue(data, 1, &server);
408
 
        getFieldValue(data, 2, &bucket);
409
 
        getFieldValue(data, 3, &pubKey);
410
 
        getFieldValue(data, 4, &privKey);
411
 
 
412
 
        if (ref_id == 0)
413
 
                ref_id = MSCloudInfo::gMaxInfoRef++;
414
 
        else if (ref_id >= MSCloudInfo::gMaxInfoRef)
415
 
                MSCloudInfo::gMaxInfoRef = ref_id +1;
416
 
                
417
 
        new_(info, MSCloudInfo( ref_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), privKey.c_ptr()));
418
 
        MSCloudInfo::gCloudInfo->set(ref_id, info);
419
 
        
420
 
        saveTable(RETAIN(myShare->mySysDatabase));
421
 
        exit_();
422
 
}
423
 
 
424
 
void MSCloudTable::deleteRow(char *data) 
425
 
{
426
 
        uint32_t ref_id, indx;
427
 
 
428
 
        enter_();
429
 
        
430
 
        getFieldValue(data, 0, &ref_id);
431
 
                
432
 
        // Adjust the current position in the array if required.
433
 
        indx = MSCloudInfo::gCloudInfo->getIndex(ref_id);
434
 
        if (indx <= iCloudIndex)
435
 
                iCloudIndex--;
436
 
 
437
 
        MSCloudInfo::gCloudInfo->remove(ref_id);
438
 
        saveTable(RETAIN(myShare->mySysDatabase));
439
 
        exit_();
440
 
}
441
 
 
442
 
void MSCloudTable::transferTable(MSDatabase *to_db, MSDatabase *from_db)
443
 
{
444
 
        CSPath  *path;
445
 
        enter_();
446
 
        
447
 
        push_(from_db);
448
 
        push_(to_db);
449
 
        
450
 
        path = CSPath::newPath(getPBMSPath(RETAIN(from_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
451
 
        push_(path);
452
 
        if (path->exists()) {
453
 
                CSPath  *bu_path;
454
 
                bu_path = CSPath::newPath(getPBMSPath(RETAIN(to_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
455
 
                path->copyTo(bu_path, true);
456
 
        }
457
 
        
458
 
        release_(path);
459
 
        release_(to_db);
460
 
        release_(from_db);
461
 
        
462
 
        exit_();
463
 
}
464
 
 
465
 
CSStringBuffer *MSCloudTable::dumpTable(MSDatabase *db)
466
 
{
467
 
 
468
 
        CSPath                  *path;
469
 
        CSStringBuffer  *dump;
470
 
 
471
 
        enter_();
472
 
        
473
 
        push_(db);
474
 
        path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
475
 
        release_(db);
476
 
        
477
 
        push_(path);
478
 
        new_(dump, CSStringBuffer(20));
479
 
        push_(dump);
480
 
 
481
 
        if (path->exists()) {
482
 
                CSFile  *file;
483
 
                size_t  size;
484
 
                
485
 
                file = path->openFile(CSFile::READONLY);
486
 
                push_(file);
487
 
                
488
 
                size = file->getEOF();
489
 
                dump->setLength(size);
490
 
                file->read(dump->getBuffer(0), 0, size, size);
491
 
                release_(file);
492
 
        }
493
 
        
494
 
        pop_(dump);
495
 
        release_(path);
496
 
        return_(dump);
497
 
}
498
 
 
499
 
void MSCloudTable::restoreTable(MSDatabase *db, const char *data, size_t size, bool reload)
500
 
{
501
 
        CSPath  *path;
502
 
        CSFile  *file;
503
 
 
504
 
        enter_();
505
 
        
506
 
        push_(db);
507
 
        path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
508
 
        push_(path);
509
 
        
510
 
        file = path->openFile(CSFile::CREATE | CSFile::TRUNCATE);
511
 
        push_(file);
512
 
        
513
 
        file->write(data, 0, size);
514
 
        file->close();
515
 
        release_(file);
516
 
        
517
 
        release_(path);
518
 
        
519
 
        pop_(db);
520
 
        if (reload)
521
 
                loadTable(db);
522
 
        else
523
 
                db->release();
524
 
                
525
 
        exit_();
526
 
}
527
 
 
528
 
void MSCloudTable::removeTable(CSString *db_path)
529
 
{
530
 
        CSPath  *path;
531
 
        char pbms_path[PATH_MAX];
532
 
        
533
 
        enter_();
534
 
        
535
 
        push_(db_path); 
536
 
        cs_strcpy(PATH_MAX, pbms_path, db_path->getCString());
537
 
        release_(db_path);
538
 
        
539
 
        if (strcmp(cs_last_name_of_path(pbms_path), "pbms")  != 0)
540
 
                exit_();
541
 
                
542
 
        cs_remove_last_name_of_path(pbms_path);
543
 
 
544
 
        path = getSysFile(CSString::newString(pbms_path), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
545
 
        push_(path);
546
 
        
547
 
        if (path->exists())
548
 
                path->removeFile();
549
 
        release_(path);
550
 
        
551
 
        exit_();
552
 
}
553