1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
* System cloud storage info table.
28
#include <drizzled/common.h>
29
#include <drizzled/session.h>
30
#include <drizzled/sql_lex.h>
33
#include "cslib/CSConfig.h"
36
#include <sys/types.h>
41
//#include "mysql_priv.h"
42
#include "cslib/CSGlobal.h"
43
#include "cslib/CSStrUtil.h"
44
#include "cslib/CSLog.h"
45
#include "cslib/CSPath.h"
46
#include "cslib/CSDirectory.h"
52
#include "database_ms.h"
53
#include "open_table_ms.h"
54
#include "discover_ms.h"
55
#include "systab_util_ms.h"
57
#include "systab_cloud_ms.h"
59
// Column definitions for the cloud system table: a numeric "Id" column
// followed by four UTF-8 VARCHAR columns (Server, Bucket, PublicKey,
// PrivateKey). Every column carries NOT_NULL_FLAG, and the last string in
// each entry is the column's descriptive comment.
// The final entry with a NULL name presumably terminates the list for
// whatever walks DT_FIELD_INFO arrays -- confirm against its consumers.
DT_FIELD_INFO pbms_cloud_info[]=
61
{"Id", NOVAL, NULL, MYSQL_TYPE_LONG, NULL, NOT_NULL_FLAG, "The Cloud storage reference ID"},
62
{"Server", 1024, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 server name"},
63
{"Bucket", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 bucket name"},
64
{"PublicKey", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 public key"},
65
{"PrivateKey", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 private key"},
66
{NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
69
// Key definitions for the cloud system table: a single primary key,
// "pbms_cloud_pk", over the "Id" column.
// NOTE(review): any terminating entry of this array is not visible in this view.
DT_KEY_INFO pbms_cloud_keys[]=
71
{"pbms_cloud_pk", PRI_KEY_FLAG, {"Id", NULL}},
75
// Size argument handed to getSysFile() whenever the cloud table file is
// located. Presumably the smallest valid file size in bytes (the 4-byte
// max-reference-id header record written by saveTable) -- TODO confirm
// against getSysFile().
#define MIN_CLOUD_TABLE_SIZE 4
77
//----------------------------
78
// Start-up hook for the cloud system table: delegates to
// MSCloudInfo::startUp().
void MSCloudTable::startUp()
80
MSCloudInfo::startUp();
83
//----------------------------
84
// Shut-down hook for the cloud system table: delegates to
// MSCloudInfo::shutDown().
void MSCloudTable::shutDown()
86
MSCloudInfo::shutDown();
89
//----------------------------
90
// Populates MSCloudInfo::gCloudInfo from the cloud table file of the given
// database. The work is done while holding the gCloudInfo lock and only when
// MSCloudInfo::gMaxInfoRef is still 0.
//
// File layout as read here: a first record holding the max reference id
// (int4), then one record per cloud entry: id (int4), server, bucket,
// public key, private key (strings).
//
// db: database whose PBMS directory is searched for the cloud table file.
void MSCloudTable::loadTable(MSDatabase *db)
96
lock_(MSCloudInfo::gCloudInfo);
98
// A max reference id of 0 is treated as "nothing loaded yet".
if (MSCloudInfo::gMaxInfoRef == 0) {
100
path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
103
if (path->exists()) {
105
SysTabRec *cloudData;
106
const char *server, *bucket, *pubKey, *privKey;
111
new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
114
// Read the whole file into the record buffer in one go.
file = path->openFile(CSFile::READONLY);
116
size = file->getEOF();
117
cloudData->setLength(size);
118
file->read(cloudData->getBuffer(0), 0, size, size);
121
// The first record carries the max reference id; fall back to 1 if that
// record does not validate.
cloudData->firstRecord();
122
MSCloudInfo::gMaxInfoRef = cloudData->getInt4Field();
124
if (! cloudData->isValidRecord())
125
MSCloudInfo::gMaxInfoRef = 1;
127
// Remaining records: one cloud entry each, fields read in stored order.
while (cloudData->nextRecord()) {
128
info_id = cloudData->getInt4Field();
129
server = cloudData->getStringField();
130
bucket = cloudData->getStringField();
131
pubKey = cloudData->getStringField();
132
privKey = cloudData->getStringField();
134
if (cloudData->isValidRecord()) {
135
// An id above the recorded maximum is logged as possible damage and the
// maximum is bumped past it.
if (info_id > MSCloudInfo::gMaxInfoRef) {
137
snprintf(msg, 80, "Cloud info id (%"PRIu32") larger than expected (%"PRIu32")\n", info_id, MSCloudInfo::gMaxInfoRef);
138
CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
139
CSL.log(self, CSLog::Warning, msg);
140
MSCloudInfo::gMaxInfoRef = info_id +1;
142
// An id that is already registered is logged as a duplicate.
if ( MSCloudInfo::gCloudInfo->get(info_id)) {
144
snprintf(msg, 80, "Duplicate Cloud info id (%"PRIu32") being ignored\n", info_id);
145
CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
146
CSL.log(self, CSLog::Warning, msg);
148
// NOTE(review): presumably the non-duplicate branch -- the enclosing
// `else` is not visible in this view; confirm.
new_(info, MSCloudInfo( info_id, server, bucket, pubKey, privKey));
149
MSCloudInfo::gCloudInfo->set(info_id, info);
153
release_(cloudData); cloudData = NULL;
156
// NOTE(review): presumably the "file does not exist" branch, starting the
// reference ids at 1 -- the enclosing `else` is not visible here.
MSCloudInfo::gMaxInfoRef = 1;
161
unlock_(MSCloudInfo::gCloudInfo);
168
// Serializes the in-memory cloud info list into a SysTabRec buffer and hands
// it to restoreTable() to be written to the database's cloud table file.
//
// Record layout written: a first record with MSCloudInfo::gMaxInfoRef, then
// one record per entry (id, server, bucket, public key, private key) -- the
// same order loadTable() reads.
//
// db: database whose cloud table file is rewritten.
void MSCloudTable::saveTable(MSDatabase *db)
170
SysTabRec *cloudData;
176
new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
179
// Build the table records
181
// The list is locked only while the records are being built.
lock_(MSCloudInfo::gCloudInfo);
183
cloudData->beginRecord();
184
cloudData->setInt4Field(MSCloudInfo::gMaxInfoRef);
185
cloudData->endRecord();
186
// Iteration stops at the first index for which itemAt() yields NULL.
for (int i = 0;(info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->itemAt(i)); i++) { // info is not referenced.
188
cloudData->beginRecord();
189
cloudData->setInt4Field(info->getCloudRefId());
190
cloudData->setStringField(info->getServer());
191
cloudData->setStringField(info->getBucket());
192
cloudData->setStringField(info->getPublicKey());
193
cloudData->setStringField(info->getPrivateKey());
194
cloudData->endRecord();
196
unlock_(MSCloudInfo::gCloudInfo);
198
// Write the buffer out; `false` is the `reload` argument of restoreTable().
restoreTable(RETAIN(db), cloudData->getBuffer(0), cloudData->length(), false);
206
// Constructor: forwards the system table share and the server table object
// to the MSOpenSystemTable base class.
// NOTE(review): the remaining member initializers and the body are not
// visible in this view.
MSCloudTable::MSCloudTable(MSSystemTableShare *share, TABLE *table):
207
MSOpenSystemTable(share, table),
212
// Destructor.
// NOTE(review): the body is not visible in this view.
MSCloudTable::~MSCloudTable()
217
// Acquires the lock on the shared cloud info list (MSCloudInfo::gCloudInfo).
// Paired with unuse(), which releases it.
void MSCloudTable::use()
219
MSCloudInfo::gCloudInfo->lock();
222
// Releases the lock on the shared cloud info list taken by use().
void MSCloudTable::unuse()
224
MSCloudInfo::gCloudInfo->unlock();
229
// Prepares a sequential scan over the cloud info list.
// NOTE(review): the body is not visible in this view; presumably it resets
// iCloudIndex, which seqScanNext() advances -- confirm.
void MSCloudTable::seqScanInit()
234
// Upper bound on the number of '*' characters used when masking the private
// key in seqScanNext().
#define MAX_PASSWORD ((int32_t)64)
235
// Fetches the next entry of the cloud info list (at iCloudIndex, which is
// post-incremented) and fills the row buffer `buf` with its column values.
//
// buf: row buffer to fill; each field's data pointer is temporarily
//      redirected into it.
// NOTE(review): the return statements are not visible in this view;
// presumably false is returned once itemAt() yields no entry -- confirm.
bool MSCloudTable::seqScanNext(char *buf)
237
// Holds the masked private key: up to MAX_PASSWORD characters plus one spare byte.
char passwd[MAX_PASSWORD +1];
238
TABLE *table = mySQLTable;
241
MY_BITMAP *save_write_set;
247
info = (MSCloudInfo *) MSCloudInfo::gCloudInfo->itemAt(iCloudIndex++); // Object is not referenced.
251
// The table's write_set is cleared while the fields are stored and put back
// at the end of the function.
save_write_set = table->write_set;
252
table->write_set = NULL;
255
// Set every null-indicator byte of the row. The two memset variants are
// presumably alternatives selected by build configuration (the guarding
// preprocessor lines are not visible here).
memset(buf, 0xFF, table->getNullBytes());
257
memset(buf, 0xFF, table->s->null_bytes);
259
for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
261
// Point the field at its slot inside buf; the original pointer is restored
// after the value has been stored. The three assignments below are
// presumably per-version/per-server alternatives -- only the first #if is
// visible in this view.
save = curr_field->ptr;
262
#if MYSQL_VERSION_ID < 50114
263
curr_field->ptr = (byte *) buf + curr_field->offset();
266
curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
268
curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
271
// Dispatch on the first letter of the column name (case labels are not
// visible in this view); the ASSERTs verify the full name.
switch (curr_field->field_name[0]) {
273
ASSERT(strcmp(curr_field->field_name, "Id") == 0);
274
curr_field->store(info->getCloudRefId(), true);
278
ASSERT(strcmp(curr_field->field_name, "Server") == 0);
279
val = info->getServer();
280
curr_field->store(val, strlen(val), &UTF8_CHARSET);
281
setNotNullInRecord(curr_field, buf);
285
ASSERT(strcmp(curr_field->field_name, "Bucket") == 0);
286
val = info->getBucket();
287
curr_field->store(val, strlen(val), &UTF8_CHARSET);
288
setNotNullInRecord(curr_field, buf);
292
// "PublicKey" and "PrivateKey" share a first letter, so the second letter
// tells them apart.
if (curr_field->field_name[1] == 'u') {
293
ASSERT(strcmp(curr_field->field_name, "PublicKey") == 0);
294
val = info->getPublicKey();
295
} else if (curr_field->field_name[1] == 'r') {
296
ASSERT(strcmp(curr_field->field_name, "PrivateKey") == 0);
297
val = info->getPrivateKey();
300
// Build a mask of '*' characters, one per key character, capped at MAX_PASSWORD.
// NOTE(review): terminating passwd and substituting it for val are not
// visible in this view; updateRow() documents that the private key is
// masked when returned to the caller -- confirm the mask is what is stored.
for (i = 0; (i < MAX_PASSWORD) && (i < (int32_t)strlen(val)); i++) passwd[i] = '*';
307
curr_field->store(val, strlen(val), &UTF8_CHARSET);
308
setNotNullInRecord(curr_field, buf);
314
curr_field->ptr = save;
317
table->write_set = save_write_set;
322
// Stores the position of the row most recently returned by the scan into
// `pos` as a 4-byte integer (via mi_int4store).
//
// pos: output buffer; at least 4 bytes are written.
void MSCloudTable::seqScanPos(unsigned char *pos )
324
// iCloudIndex was already advanced past the current row by seqScanNext().
int32_t index = iCloudIndex -1;
326
// NOTE(review): the condition guarding this reset is not visible in this
// view; presumably it applies when index is negative -- confirm.
index = 0; // This is probably an error condition.
328
mi_int4store(pos, index);
331
// Repositions the scan at a position previously saved by seqScanPos():
// iCloudIndex is set from the 4-byte value decoded from `pos`.
//
// pos: position bytes as written by seqScanPos().
// buf: row buffer. NOTE(review): its use is not visible in this view;
//      presumably the row is then read into it via seqScanNext() -- confirm.
void MSCloudTable::seqScanRead(unsigned char *pos , char *buf)
333
iCloudIndex = mi_uint4korr(pos);
337
// Replaces one cloud info entry with the values from an updated row and
// persists the table via saveTable().
//
// old_data: row image before the update.
// new_data: row image after the update.
// Throws (CSException, MS_ERR_DUPLICATE) when the id is changed to one that
// is already registered.
void MSCloudTable::updateRow(char *old_data, char *new_data)
339
uint32_t n_id, o_id, o_indx, n_indx;
340
const char *realPrivKey;
341
String server, bucket, pubKey, privKey;
342
String o_server, o_bucket, o_pubKey, o_privKey;
347
// Column order: 0 = Id, 1 = Server, 2 = Bucket, 3 = PublicKey, 4 = PrivateKey.
getFieldValue(new_data, 0, &n_id);
348
getFieldValue(new_data, 1, &server);
349
getFieldValue(new_data, 2, &bucket);
350
getFieldValue(new_data, 3, &pubKey);
351
getFieldValue(new_data, 4, &privKey);
353
getFieldValue(old_data, 0, &o_id);
354
getFieldValue(old_data, 1, &o_server);
355
getFieldValue(old_data, 2, &o_bucket);
356
getFieldValue(old_data, 3, &o_pubKey);
357
getFieldValue(old_data, 4, &o_privKey);
359
// The cloud ID must be unique
360
if ((o_id != n_id) && MSCloudInfo::gCloudInfo->get(n_id)) {
361
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to update a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
364
// The private key is masked when returned to the caller, so
365
// unless the caller has updated it we need to get the real
366
// private key from the old record.
367
// A difference between the new and old column values means the caller
// supplied a new key.
if (strcmp(privKey.c_ptr(), o_privKey.c_ptr()))
368
realPrivKey = privKey.c_ptr();
370
// NOTE(review): presumably the `else` branch (key unchanged) -- the `else`
// itself is not visible in this view.
info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->get(o_id); // unreferenced pointer
371
realPrivKey = info->getPrivateKey();
374
new_(info, MSCloudInfo( n_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), realPrivKey));
377
// Record the old entry's index before removing it so it can be compared
// with the index of the replacement.
o_indx = MSCloudInfo::gCloudInfo->getIndex(o_id);
379
MSCloudInfo::gCloudInfo->remove(o_id);
381
MSCloudInfo::gCloudInfo->set(n_id, info);
382
n_indx = MSCloudInfo::gCloudInfo->getIndex(n_id);
384
// Adjust the current position in the array if required.
385
// NOTE(review): the statement controlled by this `if` is not visible in
// this view; presumably it adjusts iCloudIndex -- confirm.
if (o_indx < n_indx )
388
saveTable(RETAIN(myShare->mySysDatabase));
392
// Adds a new cloud info entry built from the inserted row and persists the
// table via saveTable().
//
// data: row image of the inserted row.
// Throws (CSException, MS_ERR_DUPLICATE) when a non-zero id is supplied that
// is already registered.
void MSCloudTable::insertRow(char *data)
395
String server, bucket, pubKey, privKey;
400
getFieldValue(data, 0, &ref_id);
402
// The cloud ID must be unique
403
if (ref_id && MSCloudInfo::gCloudInfo->get(ref_id)) {
404
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to insert a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
407
getFieldValue(data, 1, &server);
408
getFieldValue(data, 2, &bucket);
409
getFieldValue(data, 3, &pubKey);
410
getFieldValue(data, 4, &privKey);
413
// Allocate the next reference id from the running maximum.
// NOTE(review): the `if` guarding this assignment is not visible in this
// view (the `else if` below implies one); presumably it applies when no id
// was supplied (ref_id == 0) -- confirm.
ref_id = MSCloudInfo::gMaxInfoRef++;
414
// A caller-supplied id at or above the maximum pushes the maximum past it.
else if (ref_id >= MSCloudInfo::gMaxInfoRef)
415
MSCloudInfo::gMaxInfoRef = ref_id +1;
417
new_(info, MSCloudInfo( ref_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), privKey.c_ptr()));
418
MSCloudInfo::gCloudInfo->set(ref_id, info);
420
saveTable(RETAIN(myShare->mySysDatabase));
424
// Removes the cloud info entry identified by the row's Id column and
// persists the table via saveTable().
//
// data: row image of the row being deleted.
void MSCloudTable::deleteRow(char *data)
426
uint32_t ref_id, indx;
430
getFieldValue(data, 0, &ref_id);
432
// Adjust the current position in the array if required.
433
indx = MSCloudInfo::gCloudInfo->getIndex(ref_id);
434
// NOTE(review): the statement controlled by this `if` is not visible in
// this view; presumably it steps iCloudIndex back so the scan does not skip
// an entry -- confirm.
if (indx <= iCloudIndex)
437
MSCloudInfo::gCloudInfo->remove(ref_id);
438
saveTable(RETAIN(myShare->mySysDatabase));
442
// Copies the cloud table data file (CLOUD_TABLE_NAME".dat") from one
// database's PBMS directory to another's, if the source file exists.
//
// to_db:   database receiving the copy.
// from_db: database whose cloud table file is copied.
void MSCloudTable::transferTable(MSDatabase *to_db, MSDatabase *from_db)
450
path = CSPath::newPath(getPBMSPath(RETAIN(from_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
452
if (path->exists()) {
454
bu_path = CSPath::newPath(getPBMSPath(RETAIN(to_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
455
// NOTE(review): the `true` argument presumably allows overwriting an
// existing destination file -- confirm against CSPath::copyTo().
path->copyTo(bu_path, true);
465
// Reads the raw contents of the database's cloud table file into a newly
// created CSStringBuffer.
//
// db: database whose cloud table file is dumped.
// NOTE(review): the return statement is not visible in this view;
// presumably `dump` is returned (left at its initial state when the file
// does not exist) -- confirm.
CSStringBuffer *MSCloudTable::dumpTable(MSDatabase *db)
469
CSStringBuffer *dump;
474
path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
478
new_(dump, CSStringBuffer(20));
481
if (path->exists()) {
485
file = path->openFile(CSFile::READONLY);
488
// Size the buffer to the file length and read the whole file into it.
size = file->getEOF();
489
dump->setLength(size);
490
file->read(dump->getBuffer(0), 0, size, size);
499
// Writes `size` bytes of raw table data to the database's cloud table file,
// creating the file if needed and truncating any previous contents.
//
// db:     database whose cloud table file is written.
// data:   raw table image (as produced by saveTable() or dumpTable()).
// size:   number of bytes in `data`.
// reload: NOTE(review): its use is not visible in this view; presumably it
//         requests re-loading the in-memory list after the write -- confirm.
void MSCloudTable::restoreTable(MSDatabase *db, const char *data, size_t size, bool reload)
507
path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
510
file = path->openFile(CSFile::CREATE | CSFile::TRUNCATE);
513
// The data is written starting at file offset 0.
file->write(data, 0, size);
528
void MSCloudTable::removeTable(CSString *db_path)
531
char pbms_path[PATH_MAX];
536
cs_strcpy(PATH_MAX, pbms_path, db_path->getCString());
539
if (strcmp(cs_last_name_of_path(pbms_path), "pbms") != 0)
542
cs_remove_last_name_of_path(pbms_path);
544
path = getSysFile(CSString::newString(pbms_path), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);