~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/SysTab_cloud.cc

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Barry Leslie
 
20
 *
 
21
 * 2009-10-21
 
22
 *
 
23
 * System cloud storage info table.
 
24
 *
 
25
 */
 
26
#include "CSConfig.h"
 
27
#include <inttypes.h>
 
28
 
 
29
#include <sys/types.h>
 
30
#include <sys/stat.h>
 
31
#include <stdlib.h>
 
32
#include <time.h>
 
33
 
 
34
#ifdef DRIZZLED
 
35
#include <drizzled/server_includes.h>
 
36
#endif
 
37
 
 
38
//#include "mysql_priv.h"
 
39
#include "CSGlobal.h"
 
40
#include "CSStrUtil.h"
 
41
#include "CSLog.h"
 
42
#include "CSPath.h"
 
43
#include "CSDirectory.h"
 
44
 
 
45
#include "ha_pbms.h"
 
46
//#include <plugin.h>
 
47
 
 
48
#include "ms_mysql.h"
 
49
#include "Database_ms.h"
 
50
#include "OpenTable_ms.h"
 
51
#include "Util_ms.h"
 
52
#include "Discover_ms.h"
 
53
#include "SysTab_util.h"
 
54
 
 
55
#include "SysTab_cloud.h"
 
56
 
 
57
DT_FIELD_INFO pbms_cloud_info[]=
 
58
{
 
59
        {"Id",                  NULL,   NULL, MYSQL_TYPE_LONG,          NULL,                   NOT_NULL_FLAG,  "The Cloud storage reference ID"},
 
60
        {"Server",              1024,   NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 server name"},
 
61
        {"Bucket",              124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 bucket name"},
 
62
        {"PublicKey",   124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 public key"},
 
63
        {"PrivateKey",  124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 private key"},
 
64
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
65
};
 
66
 
 
67
DT_KEY_INFO pbms_cloud_keys[]=
 
68
{
 
69
        {"pbms_cloud_pk", PRI_KEY_FLAG, {"Id", NULL}},
 
70
        {NULL, 0, {NULL}}
 
71
};
 
72
 
 
73
// Passed as the last argument to getSysFile() whenever the cloud table file is located.
// NOTE(review): presumably the minimum valid file size in bytes -- confirm against getSysFile().
#define MIN_CLOUD_TABLE_SIZE 4
 
74
 
 
75
//----------------------------
 
76
void MSCloudTable::startUp()
 
77
{
 
78
        MSCloudInfo::startUp();
 
79
}
 
80
 
 
81
//----------------------------
 
82
void MSCloudTable::shutDown()
 
83
{
 
84
        MSCloudInfo::shutDown();
 
85
}
 
86
 
 
87
//----------------------------
 
88
void MSCloudTable::loadTable(MSDatabase *db)
 
89
{
 
90
 
 
91
        enter_();
 
92
        
 
93
        push_(db);
 
94
        lock_(MSCloudInfo::gCloudInfo);
 
95
        
 
96
        if (MSCloudInfo::gMaxInfoRef == 0) {
 
97
                CSPath  *path;
 
98
                path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
 
99
                push_(path);
 
100
 
 
101
                if (path->exists()) {
 
102
                        CSFile          *file;
 
103
                        SysTabRec       *cloudData;
 
104
                        const char      *server, *bucket, *pubKey, *privKey;
 
105
                        uint32_t                info_id;
 
106
                        MSCloudInfo     *info;
 
107
                        size_t          size;
 
108
                        
 
109
                        new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
 
110
                        push_(cloudData);
 
111
 
 
112
                        file = path->openFile(CSFile::READONLY);
 
113
                        push_(file);
 
114
                        size = file->getEOF();
 
115
                        cloudData->setLength(size);
 
116
                        file->read(cloudData->getBuffer(0), 0, size, size);
 
117
                        release_(file);
 
118
                        
 
119
                        cloudData->firstRecord();
 
120
                        MSCloudInfo::gMaxInfoRef = cloudData->getInt4Field();
 
121
                        
 
122
                        if (! cloudData->isValidRecord()) 
 
123
                                MSCloudInfo::gMaxInfoRef = 1;
 
124
                        
 
125
                        while (cloudData->nextRecord()) {
 
126
                                info_id = cloudData->getInt4Field();
 
127
                                server = cloudData->getStringField();
 
128
                                bucket = cloudData->getStringField();
 
129
                                pubKey = cloudData->getStringField();
 
130
                                privKey = cloudData->getStringField();
 
131
                                
 
132
                                if (cloudData->isValidRecord()) {
 
133
                                        if (info_id > MSCloudInfo::gMaxInfoRef) {
 
134
                                                char msg[80];
 
135
                                                snprintf(msg, 80, "Cloud info id (%"PRIu32") larger than expected (%"PRIu32")\n", info_id, MSCloudInfo::gMaxInfoRef);
 
136
                                                CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
 
137
                                                CSL.log(self, CSLog::Warning, msg);
 
138
                                                MSCloudInfo::gMaxInfoRef = info_id +1;
 
139
                                        }
 
140
                                        if ( MSCloudInfo::gCloudInfo->get(info_id)) {
 
141
                                                char msg[80];
 
142
                                                snprintf(msg, 80, "Duplicate Cloud info id (%"PRIu32") being ignored\n", info_id);
 
143
                                                CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
 
144
                                                CSL.log(self, CSLog::Warning, msg);
 
145
                                        } else {
 
146
                                                new_(info, MSCloudInfo( info_id, server, bucket, pubKey, privKey));
 
147
                                                MSCloudInfo::gCloudInfo->set(info_id, info);
 
148
                                        }
 
149
                                }
 
150
                        }
 
151
                        release_(cloudData); cloudData = NULL;
 
152
                        
 
153
                } else
 
154
                        MSCloudInfo::gMaxInfoRef = 1;
 
155
                
 
156
                release_(path);
 
157
                
 
158
        }
 
159
        unlock_(MSCloudInfo::gCloudInfo);
 
160
 
 
161
        release_(db);
 
162
 
 
163
        exit_();
 
164
}
 
165
 
 
166
void MSCloudTable::saveTable(MSDatabase *db)
 
167
{
 
168
        SysTabRec               *cloudData;
 
169
        MSCloudInfo             *info;
 
170
        enter_();
 
171
        
 
172
        push_(db);
 
173
        
 
174
        new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
 
175
        push_(cloudData);
 
176
        
 
177
        // Build the table records
 
178
        cloudData->clear();
 
179
        lock_(MSCloudInfo::gCloudInfo);
 
180
        
 
181
        cloudData->beginRecord();       
 
182
        cloudData->setInt4Field(MSCloudInfo::gMaxInfoRef);
 
183
        cloudData->endRecord(); 
 
184
        for  (int i = 0;(info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->itemAt(i)); i++) { // info is not referenced.
 
185
                
 
186
                cloudData->beginRecord();       
 
187
                cloudData->setInt4Field(info->getCloudRefId());
 
188
                cloudData->setStringField(info->getServer());
 
189
                cloudData->setStringField(info->getBucket());
 
190
                cloudData->setStringField(info->getPublicKey());
 
191
                cloudData->setStringField(info->getPrivateKey());
 
192
                cloudData->endRecord();                 
 
193
        }
 
194
        unlock_(MSCloudInfo::gCloudInfo);
 
195
 
 
196
        restoreTable(RETAIN(db), cloudData->getBuffer(0), cloudData->length(), false);
 
197
        
 
198
        release_(cloudData);
 
199
        release_(db);
 
200
        exit_();
 
201
}
 
202
 
 
203
 
 
204
// Construct an open cloud system table: 'share' and 'table' are passed on to
// MSOpenSystemTable and the scan position iCloudIndex starts at 0.
MSCloudTable::MSCloudTable(MSSystemTableShare *share, TABLE *table)
	: MSOpenSystemTable(share, table),
	  iCloudIndex(0)
{
}
 
209
 
 
210
// Destructor. No cleanup is performed here; the unuse() call is left
// commented out.
MSCloudTable::~MSCloudTable()
{
	//unuse();
}
 
214
 
 
215
void MSCloudTable::use()
 
216
{
 
217
        MSCloudInfo::gCloudInfo->lock();
 
218
}
 
219
 
 
220
void MSCloudTable::unuse()
 
221
{
 
222
        MSCloudInfo::gCloudInfo->unlock();
 
223
        
 
224
}
 
225
 
 
226
 
 
227
void MSCloudTable::seqScanInit()
 
228
{
 
229
        iCloudIndex = 0;
 
230
}
 
231
 
 
232
#define MAX_PASSWORD 64
 
233
bool MSCloudTable::seqScanNext(char *buf)
 
234
{
 
235
        char            passwd[MAX_PASSWORD +1];
 
236
        TABLE           *table = mySQLTable;
 
237
        Field           *curr_field;
 
238
        byte            *save;
 
239
        MY_BITMAP       *save_write_set;
 
240
        MSCloudInfo     *info;
 
241
        const char      *val;
 
242
        
 
243
        enter_();
 
244
        
 
245
        info = (MSCloudInfo     *) MSCloudInfo::gCloudInfo->itemAt(iCloudIndex++); // Object is not referenced.
 
246
        if (!info)
 
247
                return_(false);
 
248
        
 
249
        save_write_set = table->write_set;
 
250
        table->write_set = NULL;
 
251
 
 
252
        memset(buf, 0xFF, table->s->null_bytes);
 
253
        for (Field **field=table->field ; *field ; field++) {
 
254
                curr_field = *field;
 
255
                save = curr_field->ptr;
 
256
#if MYSQL_VERSION_ID < 50114
 
257
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
258
#else
 
259
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
 
260
#endif
 
261
                switch (curr_field->field_name[0]) {
 
262
                        case 'I':
 
263
                                ASSERT(strcmp(curr_field->field_name, "Id") == 0);
 
264
                                curr_field->store(info->getCloudRefId(), true);
 
265
                                break;
 
266
 
 
267
                        case 'S':
 
268
                                ASSERT(strcmp(curr_field->field_name, "Server") == 0);
 
269
                                val = info->getServer();
 
270
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
 
271
                                ms_my_set_notnull_in_record(curr_field, buf);
 
272
                                break;
 
273
 
 
274
                        case 'B': 
 
275
                                ASSERT(strcmp(curr_field->field_name, "Bucket") == 0);
 
276
                                val = info->getBucket();
 
277
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
 
278
                                ms_my_set_notnull_in_record(curr_field, buf);
 
279
                                break;
 
280
 
 
281
                        case 'P': 
 
282
                                if (curr_field->field_name[1] == 'u') {
 
283
                                        ASSERT(strcmp(curr_field->field_name, "PublicKey") == 0);
 
284
                                        val = info->getPublicKey();
 
285
                                } else if (curr_field->field_name[1] == 'r') {
 
286
                                        ASSERT(strcmp(curr_field->field_name, "PrivateKey") == 0);
 
287
                                        val = info->getPrivateKey();
 
288
                                        
 
289
                                        int i;
 
290
                                        for (i = 0; i < MAX_PASSWORD && i < strlen(val); i++) passwd[i] = '*';
 
291
                                        passwd[i] = 0;
 
292
                                        val = passwd;
 
293
                                } else {
 
294
                                        ASSERT(false);
 
295
                                        break;
 
296
                                }
 
297
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
 
298
                                ms_my_set_notnull_in_record(curr_field, buf);
 
299
                                break;
 
300
                                
 
301
                        default:
 
302
                                ASSERT(false);
 
303
                }
 
304
                curr_field->ptr = save;
 
305
        }
 
306
 
 
307
        table->write_set = save_write_set;
 
308
        
 
309
        return_(true);
 
310
}
 
311
 
 
312
void MSCloudTable::seqScanPos(uint8_t *pos)
 
313
{
 
314
        int32_t index = iCloudIndex -1;
 
315
        if (index < 0)
 
316
                index = 0; // This is probably an error condition.
 
317
                
 
318
        mi_int4store(pos, index);
 
319
}
 
320
 
 
321
void MSCloudTable::seqScanRead(uint8_t *pos, char *buf)
 
322
{
 
323
        iCloudIndex = mi_uint4korr(pos);
 
324
        seqScanNext(buf);
 
325
}
 
326
 
 
327
void MSCloudTable::updateRow(char *old_data, char *new_data) 
 
328
{
 
329
        TABLE   *table = mySQLTable;    
 
330
        uint32_t n_id, o_id, o_indx, n_indx;
 
331
        const char *realPrivKey;
 
332
        String server_val, bucket_val, pubKey_val, privKey_val, *server = &server_val, *bucket = &bucket_val, *pubKey = &pubKey_val, *privKey = &privKey_val;
 
333
        String o_server_val, o_bucket_val, o_pubKey_val, o_privKey_val, *o_server = &o_server_val, *o_bucket = &o_bucket_val, *o_pubKey = &o_pubKey_val, *o_privKey = &o_privKey_val;
 
334
        MSCloudInfo *info;
 
335
 
 
336
        enter_();
 
337
        
 
338
        GET_INT_FIELD(n_id, 0, table, new_data);                
 
339
        GET_STR_FIELD(server, 1, table, new_data);
 
340
        GET_STR_FIELD(bucket, 2, table, new_data);
 
341
        GET_STR_FIELD(pubKey, 3, table, new_data);
 
342
        GET_STR_FIELD(privKey, 4, table, new_data);
 
343
        
 
344
        GET_INT_FIELD(o_id, 0, table, old_data);
 
345
        GET_STR_FIELD(o_server, 1, table, old_data);
 
346
        GET_STR_FIELD(o_bucket, 2, table, old_data);
 
347
        GET_STR_FIELD(o_pubKey, 3, table, old_data);
 
348
        GET_STR_FIELD(o_privKey, 4, table, old_data);
 
349
        
 
350
        // The cloud ID must be unique
 
351
        if ((o_id !=  n_id) && MSCloudInfo::gCloudInfo->get(n_id)) {
 
352
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to update a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
 
353
        }
 
354
 
 
355
        // The private key is masked when returned to the caller, so
 
356
        // unless the caller has updated it we need to get the real 
 
357
        // private key from the old record.
 
358
        if (strcmp(privKey->c_ptr(), o_privKey->c_ptr()))
 
359
                realPrivKey = privKey->c_ptr();
 
360
        else {
 
361
                info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->get(o_id); // unreference pointer
 
362
                realPrivKey = info->getPrivateKey();
 
363
        }
 
364
        
 
365
        new_(info, MSCloudInfo( n_id, server->c_ptr(), bucket->c_ptr(), pubKey->c_ptr(), realPrivKey));
 
366
        push_(info);
 
367
        
 
368
        o_indx = MSCloudInfo::gCloudInfo->getIndex(o_id);
 
369
 
 
370
        MSCloudInfo::gCloudInfo->remove(o_id);
 
371
        pop_(info);
 
372
        MSCloudInfo::gCloudInfo->set(n_id, info);
 
373
        n_indx = MSCloudInfo::gCloudInfo->getIndex(n_id);
 
374
        
 
375
        // Adjust the current position in the array if required.
 
376
        if (o_indx < n_indx )
 
377
                iCloudIndex--;
 
378
                
 
379
        saveTable(RETAIN(myShare->mySysDatabase));
 
380
        exit_();
 
381
}
 
382
 
 
383
void MSCloudTable::insertRow(char *data) 
 
384
{
 
385
        TABLE   *table = mySQLTable;    
 
386
        uint32_t ref_id;
 
387
        String server_val, bucket_val, pubKey_val, privKey_val, *server = &server_val, *bucket = &bucket_val, *pubKey = &pubKey_val, *privKey = &privKey_val;
 
388
        MSCloudInfo *info;
 
389
 
 
390
        enter_();
 
391
        
 
392
        GET_INT_FIELD(ref_id, 0, table, data);
 
393
                
 
394
        // The cloud ID must be unique
 
395
        if (ref_id && MSCloudInfo::gCloudInfo->get(ref_id)) {
 
396
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to insert a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
 
397
        }
 
398
        
 
399
        GET_STR_FIELD(server, 1, table, data);
 
400
        GET_STR_FIELD(bucket, 2, table, data);
 
401
        GET_STR_FIELD(pubKey, 3, table, data);
 
402
        GET_STR_FIELD(privKey, 4, table, data);
 
403
        
 
404
        if (ref_id == 0)
 
405
                ref_id = MSCloudInfo::gMaxInfoRef++;
 
406
        else if (ref_id >= MSCloudInfo::gMaxInfoRef)
 
407
                MSCloudInfo::gMaxInfoRef = ref_id +1;
 
408
                
 
409
        new_(info, MSCloudInfo( ref_id, server->c_ptr(), bucket->c_ptr(), pubKey->c_ptr(), privKey->c_ptr()));
 
410
        MSCloudInfo::gCloudInfo->set(ref_id, info);
 
411
        
 
412
        saveTable(RETAIN(myShare->mySysDatabase));
 
413
        exit_();
 
414
}
 
415
 
 
416
void MSCloudTable::deleteRow(char *data) 
 
417
{
 
418
        TABLE   *table = mySQLTable;    
 
419
        uint32_t ref_id, indx;
 
420
 
 
421
        enter_();
 
422
        
 
423
        GET_INT_FIELD(ref_id, 0, table, data);
 
424
                
 
425
        // Adjust the current position in the array if required.
 
426
        indx = MSCloudInfo::gCloudInfo->getIndex(ref_id);
 
427
        if (indx <= iCloudIndex)
 
428
                iCloudIndex--;
 
429
 
 
430
        MSCloudInfo::gCloudInfo->remove(ref_id);
 
431
        saveTable(RETAIN(myShare->mySysDatabase));
 
432
        exit_();
 
433
}
 
434
 
 
435
void MSCloudTable::transferTable(MSDatabase *to_db, MSDatabase *from_db)
 
436
{
 
437
        CSPath  *path;
 
438
        enter_();
 
439
        
 
440
        push_(from_db);
 
441
        push_(to_db);
 
442
        
 
443
        path = CSPath::newPath(getPBMSPath(RETAIN(from_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
 
444
        push_(path);
 
445
        if (path->exists()) {
 
446
                CSPath  *bu_path;
 
447
                bu_path = CSPath::newPath(getPBMSPath(RETAIN(to_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
 
448
                path->copyTo(bu_path, true);
 
449
        }
 
450
        
 
451
        release_(path);
 
452
        release_(to_db);
 
453
        release_(from_db);
 
454
        
 
455
        exit_();
 
456
}
 
457
 
 
458
CSStringBuffer *MSCloudTable::dumpTable(MSDatabase *db)
 
459
{
 
460
 
 
461
        CSPath                  *path;
 
462
        CSStringBuffer  *dump;
 
463
 
 
464
        enter_();
 
465
        
 
466
        push_(db);
 
467
        path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
 
468
        release_(db);
 
469
        
 
470
        push_(path);
 
471
        new_(dump, CSStringBuffer(20));
 
472
        push_(dump);
 
473
 
 
474
        if (path->exists()) {
 
475
                CSFile  *file;
 
476
                size_t  size;
 
477
                
 
478
                file = path->openFile(CSFile::READONLY);
 
479
                push_(file);
 
480
                
 
481
                size = file->getEOF();
 
482
                dump->setLength(size);
 
483
                file->read(dump->getBuffer(0), 0, size, size);
 
484
                release_(file);
 
485
        }
 
486
        
 
487
        pop_(dump);
 
488
        release_(path);
 
489
        return_(dump);
 
490
}
 
491
 
 
492
void MSCloudTable::restoreTable(MSDatabase *db, const char *data, size_t size, bool reload)
 
493
{
 
494
        CSPath  *path;
 
495
        CSFile  *file;
 
496
 
 
497
        enter_();
 
498
        
 
499
        push_(db);
 
500
        path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
 
501
        push_(path);
 
502
        
 
503
        file = path->openFile(CSFile::CREATE | CSFile::TRUNCATE);
 
504
        push_(file);
 
505
        
 
506
        file->write(data, 0, size);
 
507
        file->close();
 
508
        release_(file);
 
509
        
 
510
        release_(path);
 
511
        
 
512
        pop_(db);
 
513
        if (reload)
 
514
                loadTable(db);
 
515
        else
 
516
                db->release();
 
517
                
 
518
        exit_();
 
519
}
 
520
 
 
521
void MSCloudTable::removeTable(CSString *db_path)
 
522
{
 
523
        CSPath  *path;
 
524
        char pbms_path[PATH_MAX];
 
525
        
 
526
        enter_();
 
527
        
 
528
        push_(db_path); 
 
529
        cs_strcpy(PATH_MAX, pbms_path, db_path->getCString());
 
530
        release_(db_path);
 
531
        
 
532
        if (strcmp(cs_last_name_of_path(pbms_path), "pbms")  != 0)
 
533
                exit_();
 
534
                
 
535
        cs_remove_last_name_of_path(pbms_path);
 
536
 
 
537
        path = getSysFile(CSString::newString(pbms_path), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
 
538
        push_(path);
 
539
        
 
540
        if (path->exists())
 
541
                path->removeFile();
 
542
        release_(path);
 
543
        
 
544
        exit_();
 
545
}
 
546