~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/system_table_ms.cc

  • Committer: Monty Taylor
  • Date: 2010-07-04 20:02:43 UTC
  • mfrom: (1548.2.40 drizzle_pbms)
  • mto: This revision was merged to the branch mainline in revision 1644.
  • Revision ID: mordred@inaugust.com-20100704200243-2vkq9gi6ysauj2tb
Merge PBMS from Barry.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-07-18
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * System tables.
 
27
 *
 
28
 */
 
29
 
 
30
#ifdef DRIZZLED
 
31
#include "config.h"
 
32
#include <drizzled/common.h>
 
33
#include <drizzled/session.h>
 
34
#include <drizzled/table.h>
 
35
#include <drizzled/field.h>
 
36
#include <drizzled/field/blob.h>
 
37
 
 
38
#include <drizzled/message/table.pb.h>
 
39
#include "drizzled/charset_info.h"
 
40
#include <drizzled/table_proto.h>
 
41
#endif
 
42
 
 
43
 
 
44
#include "cslib/CSConfig.h"
 
45
#include <inttypes.h>
 
46
 
 
47
#include <sys/types.h>
 
48
#include <sys/stat.h>
 
49
#include <stdlib.h>
 
50
#include <time.h>
 
51
 
 
52
//#include "mysql_priv.h"
 
53
//#include <plugin.h>
 
54
 
 
55
#include "cslib/CSGlobal.h"
 
56
#include "cslib/CSStrUtil.h"
 
57
#include "ha_pbms.h"
 
58
 
 
59
#include "mysql_ms.h"
 
60
#include "engine_ms.h"
 
61
#include "system_table_ms.h"
 
62
#include "repository_ms.h"
 
63
#include "database_ms.h"
 
64
#include "compactor_ms.h"
 
65
#include "open_table_ms.h"
 
66
#include "metadata_ms.h"
 
67
#ifdef HAVE_ALIAS_SUPPORT
 
68
#include "alias_ms.h"
 
69
#endif
 
70
#include "cloud_ms.h"
 
71
#include "transaction_ms.h"
 
72
 
 
73
#include "systab_httpheader_ms.h"
 
74
#include "systab_dump_ms.h"
 
75
#include "systab_variable_ms.h"
 
76
#include "systab_cloud_ms.h"
 
77
#include "systab_backup_ms.h"
 
78
#ifndef DRIZZLED
 
79
#include "systab_enabled_ms.h"
 
80
#endif
 
81
#include "discover_ms.h"
 
82
#include "parameters_ms.h"
 
83
 
 
84
///* Note: mysql_priv.h messes with new, which caused a crash. */
 
85
//#ifdef new
 
86
//#undef new
 
87
//#endif
 
88
 
 
89
/* Definitions for PBMS table discovery: */
 
90
//--------------------------------
 
91
static DT_FIELD_INFO pbms_repository_info[]=
 
92
{
 
93
#ifdef HAVE_ALIAS_SUPPORT
 
94
        {"Blob_alias",                  BLOB_ALIAS_LENGTH, NULL, MYSQL_TYPE_VARCHAR,            &my_charset_utf8_bin,   0,                                                              "The BLOB alias"},
 
95
#endif
 
96
        {"Repository_id",               NULL, NULL, MYSQL_TYPE_LONG,            NULL,                                   NOT_NULL_FLAG,                                  "The repository file number"},
 
97
        {"Repo_blob_offset",    NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                                   NOT_NULL_FLAG,                                  "The offset of the BLOB in the repository file"},
 
98
        {"Blob_size",                   NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                                   NOT_NULL_FLAG,                                  "The size of the BLOB in bytes"},
 
99
        {"MD5_Checksum",                32,   NULL,     MYSQL_TYPE_VARCHAR,             system_charset_info,    0,                                                              "The MD5 Digest of the BLOB data."},
 
100
        {"Head_size",                   NULL, NULL, MYSQL_TYPE_SHORT,           NULL,                                   NOT_NULL_FLAG | UNSIGNED_FLAG,  "The size of the BLOB header - proceeds the BLOB data"},
 
101
        {"Access_code",                 NULL, NULL, MYSQL_TYPE_LONG,            NULL,                                   NOT_NULL_FLAG,                                  "The 4-byte authorisation code required to access the BLOB - part of the BLOB URL"},
 
102
        {"Creation_time",               NULL, NULL, MYSQL_TYPE_TIMESTAMP,       NULL,                                   NOT_NULL_FLAG,                                  "The time the BLOB was created"},
 
103
        {"Last_ref_time",               NULL, NULL, MYSQL_TYPE_TIMESTAMP,       NULL,                                   0,                                                              "The last time the BLOB was referenced"},
 
104
        {"Last_access_time",    NULL, NULL, MYSQL_TYPE_TIMESTAMP,       NULL,                                   0,                                                              "The last time the BLOB was accessed (read)"},
 
105
        {"Access_count",                NULL, NULL, MYSQL_TYPE_LONG,            NULL,                                   NOT_NULL_FLAG,                                  "The count of the number of times the BLOB has been read"},
 
106
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
107
};
 
108
 
 
109
#ifdef PBMS_HAS_KEYS
 
110
static DT_KEY_INFO pbms_repository_keys[]=
 
111
{
 
112
        {"pbms_repository_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
113
        {NULL, 0, {NULL}}
 
114
};
 
115
#endif
 
116
 
 
117
static DT_FIELD_INFO pbms_metadata_info[]=
 
118
{
 
119
        {"Repository_id",               NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                                   NOT_NULL_FLAG,  "The repository file number"},
 
120
        {"Repo_blob_offset",    NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                                   NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
 
121
        {"Name",                                MS_META_NAME_SIZE,      NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "Metadata name"},
 
122
        {"Value",                               MS_META_VALUE_SIZE,     NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,                  NOT_NULL_FLAG,  "Metadata value"},
 
123
        {NULL,                                  NULL,                                   NULL, MYSQL_TYPE_STRING,        NULL, 0, NULL}
 
124
};
 
125
 
 
126
#ifdef PBMS_HAS_KEYS
 
127
static DT_KEY_INFO pbms_metadata_keys[]=
 
128
{
 
129
        {"pbms_metadata_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
130
        {NULL, 0, {NULL}}
 
131
};
 
132
#endif
 
133
 
 
134
 
 
135
#ifdef HAVE_ALIAS_SUPPORT
 
136
static DT_FIELD_INFO pbms_alias_info[]=
 
137
{
 
138
        {"Repository_id",               NULL, NULL, MYSQL_TYPE_LONG,            NULL,                           NOT_NULL_FLAG,  "The repository file number"},
 
139
        {"Repo_blob_offset",    NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                           NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
 
140
        {"Blob_alias",                  BLOB_ALIAS_LENGTH, NULL, MYSQL_TYPE_VARCHAR,            &my_charset_utf8_bin,   NOT_NULL_FLAG,                  "The BLOB alias"},
 
141
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
142
};
 
143
 
 
144
static DT_KEY_INFO pbms_alias_keys[]=
 
145
{
 
146
        {"pbms_alias_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
147
        {NULL, 0, {NULL}}
 
148
};
 
149
#endif
 
150
 
 
151
static DT_FIELD_INFO pbms_blobs_info[]=
 
152
{
 
153
        {"Repository_id",               NULL, NULL, MYSQL_TYPE_LONG,            NULL,                           NOT_NULL_FLAG,  "The repository file number"},
 
154
        {"Repo_blob_offset",    NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                           NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
 
155
        {"Blob_data",                   NULL, NULL, MYSQL_TYPE_LONG_BLOB,       &my_charset_bin,        NOT_NULL_FLAG,  "The data of this BLOB"},
 
156
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
157
};
 
158
 
 
159
#ifdef PBMS_HAS_KEYS
 
160
static DT_KEY_INFO pbms_blobs_keys[]=
 
161
{
 
162
        {"pbms_blobs_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
163
        {NULL, 0, {NULL}}
 
164
};
 
165
#endif
 
166
 
 
167
static DT_FIELD_INFO pbms_reference_info[]=
 
168
{
 
169
        {"Table_name",          MS_TABLE_NAME_SIZE,             NULL, MYSQL_TYPE_STRING,        system_charset_info,    0,      "The name of the referencing table"},
 
170
        {"Column_ordinal",      NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   0,      "The column ordinal of the referencing field"},
 
171
        {"Blob_id",                     NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   NOT_NULL_FLAG,  "The BLOB reference number - part of the BLOB URL"},
 
172
        {"Blob_url",            PBMS_BLOB_URL_SIZE,             NULL, MYSQL_TYPE_VARCHAR,       system_charset_info,    0,      "The BLOB URL for HTTP GET access"},
 
173
        {"Repository_id",       NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   NOT_NULL_FLAG,  "The repository file number of the BLOB"},
 
174
        {"Repo_blob_offset",NULL,                                       NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   NOT_NULL_FLAG,  "The offset in the repository file"},
 
175
        {"Blob_size",           NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   NOT_NULL_FLAG,  "The size of the BLOB in bytes"},
 
176
        {"Deletion_time",       NULL,                                   NULL, MYSQL_TYPE_TIMESTAMP,     NULL,                                   0,                              "The time the BLOB was deleted"},
 
177
        {"Remove_in",           NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   0,                              "The number of seconds before the reference/BLOB is removed perminently"},
 
178
        {"Temp_log_id",         NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   0,                              "Temporary log number of the referencing deletion entry"},
 
179
        {"Temp_log_offset",     NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   0,                              "Temporary log offset of the referencing deletion entry"},
 
180
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
181
};
 
182
 
 
183
#ifdef PBMS_HAS_KEYS
 
184
static DT_KEY_INFO pbms_reference_keys[]=
 
185
{
 
186
        {"pbms_reference_pk", PRI_KEY_FLAG, {"Table_name", "Blob_id", NULL}},
 
187
        {"pbms_reference_k", MULTIPLE_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
188
        {NULL, 0, {NULL}}
 
189
};
 
190
#endif
 
191
 
 
192
 
 
193
typedef enum {  SYS_REP = 0, 
 
194
                                SYS_REF, 
 
195
                                SYS_BLOB, 
 
196
                                SYS_DUMP, 
 
197
                                SYS_META, 
 
198
                                SYS_HTTP, 
 
199
#ifdef HAVE_ALIAS_SUPPORT
 
200
                                SYS_ALIAS, 
 
201
#endif
 
202
                                SYS_VARIABLE, 
 
203
                                SYS_CLOUD, 
 
204
                                SYS_BACKUP, 
 
205
#ifndef DRIZZLED
 
206
                                SYS_ENABLED, 
 
207
#endif
 
208
                                SYS_UNKNOWN} SysTableType;
 
209
                                
 
210
static const char *sysTableNames[] = {
 
211
        "pbms_repository",
 
212
        "pbms_reference",
 
213
        "pbms_blob",
 
214
        "pbms_dump",
 
215
        "pbms_metadata",
 
216
        METADATA_HEADER_NAME,
 
217
#ifdef HAVE_ALIAS_SUPPORT
 
218
        "pbms_alias",
 
219
#endif
 
220
        VARIABLES_TABLE_NAME,
 
221
        CLOUD_TABLE_NAME,
 
222
        BACKUP_TABLE_NAME,
 
223
#ifndef DRIZZLED
 
224
        ENABLED_TABLE_NAME,
 
225
#endif
 
226
        NULL
 
227
};
 
228
 
 
229
static INTERRNAL_TABLE_INFO pbms_internal_tables[]=
 
230
{
 
231
#ifdef PBMS_HAS_KEYS
 
232
        { false, sysTableNames[SYS_REP],pbms_repository_info, pbms_repository_keys},
 
233
        { false, sysTableNames[SYS_REF], pbms_reference_info, pbms_reference_keys},
 
234
        { false, sysTableNames[SYS_BLOB], pbms_blobs_info, pbms_blobs_keys},
 
235
        { false, sysTableNames[SYS_DUMP], pbms_dump_info, pbms_dump_keys},
 
236
        { false, sysTableNames[SYS_META], pbms_metadata_info, pbms_metadata_keys},
 
237
        { false, sysTableNames[SYS_HTTP], pbms_metadata_headers_info, pbms_metadata_headers_keys},
 
238
#ifdef HAVE_ALIAS_SUPPORT
 
239
        { false, sysTableNames[SYS_ALIAS], pbms_alias_info, pbms_alias_keys},
 
240
#endif
 
241
        { false, sysTableNames[SYS_VARIABLE], pbms_variable_info, pbms_variable_keys},
 
242
        { true, sysTableNames[SYS_CLOUD], pbms_cloud_info, pbms_cloud_keys},
 
243
        { true, sysTableNames[SYS_BACKUP], pbms_backup_info, pbms_backup_keys},
 
244
#ifndef DRIZZLED
 
245
        { true, sysTableNames[SYS_ENABLED], pbms_enabled_info, pbms_enabled_keys},
 
246
#endif
 
247
#else
 
248
        { false, sysTableNames[SYS_REP], pbms_repository_info, NULL},
 
249
        { false, sysTableNames[SYS_REF], pbms_reference_info, NULL},
 
250
        { false, sysTableNames[SYS_BLOB], pbms_blobs_info, NULL},
 
251
        { false, sysTableNames[SYS_DUMP], pbms_dump_info, NULL},
 
252
        { false, sysTableNames[SYS_META], pbms_metadata_info, NULL},
 
253
        { false, sysTableNames[SYS_HTTP], pbms_metadata_headers_info, NULL},
 
254
#ifdef HAVE_ALIAS_SUPPORT
 
255
        { false, sysTableNames[SYS_ALIAS], pbms_alias_info, NULL},
 
256
#endif
 
257
        { false, sysTableNames[SYS_VARIABLE], pbms_variable_info, NULL},
 
258
        { true, sysTableNames[SYS_CLOUD], pbms_cloud_info, NULL},
 
259
        { true, sysTableNames[SYS_BACKUP], pbms_backup_info, NULL},
 
260
#ifndef DRIZZLED
 
261
        { true, sysTableNames[SYS_ENABLED], pbms_enabled_info, NULL},
 
262
#endif
 
263
#endif
 
264
 
 
265
        { false, NULL, NULL, NULL}
 
266
        
 
267
};
 
268
 
 
269
//--------------------------
 
270
static SysTableType pbms_systable_type(const char *table)
 
271
{
 
272
        int i = 0;
 
273
        
 
274
        while ((i < SYS_UNKNOWN) && strcasecmp(table, sysTableNames[i])) i++;
 
275
        
 
276
        return((SysTableType) i );
 
277
}
 
278
 
 
279
//--------------------------
 
280
bool PBMSSystemTables::isSystemTable(bool isPBMS, const char *table)
 
281
{
 
282
        SysTableType i;
 
283
        
 
284
        i = pbms_systable_type(table);
 
285
 
 
286
        if (i == SYS_UNKNOWN)
 
287
                return false;
 
288
                
 
289
        return (pbms_internal_tables[i].is_pbms == isPBMS);
 
290
}
 
291
 
 
292
//--------------------------
 
293
#ifdef DRIZZLED
 
294
using namespace std;
 
295
using namespace drizzled;
 
296
#undef TABLE
 
297
#undef Field
 
298
static int pbms_create_proto_table(const char *engine_name, const char *name, DT_FIELD_INFO *info, DT_KEY_INFO *keys, drizzled::message::Table &table)
 
299
{
 
300
        message::Table::Field *field;
 
301
        message::Table::Field::FieldConstraints *field_constraints;
 
302
        message::Table::Field::StringFieldOptions *string_field_options;
 
303
        message::Table::TableOptions *table_options;
 
304
 
 
305
        table.set_name(name);
 
306
        table.set_name(name);
 
307
        table.set_type(message::Table::STANDARD);
 
308
        table.mutable_engine()->set_name(engine_name);
 
309
        
 
310
        table_options = table.mutable_options();
 
311
        table_options->set_collation_id(my_charset_utf8_bin.number);
 
312
        table_options->set_collation(my_charset_utf8_bin.name);
 
313
        
 
314
        while (info->field_name) {      
 
315
                field= table.add_field();
 
316
                
 
317
                field->set_name(info->field_name);
 
318
                if (info->comment)
 
319
                        field->set_comment(info->comment);
 
320
                        
 
321
                field_constraints= field->mutable_constraints();
 
322
                if (info->field_flags & NOT_NULL_FLAG)
 
323
                        field_constraints->set_is_nullable(false);
 
324
                else
 
325
                        field_constraints->set_is_nullable(true);
 
326
                
 
327
                if (info->field_flags & UNSIGNED_FLAG)
 
328
                        field_constraints->set_is_unsigned(true);
 
329
                else
 
330
                        field_constraints->set_is_unsigned(false);
 
331
 
 
332
                switch (info->field_type) {
 
333
                        case DRIZZLE_TYPE_VARCHAR:
 
334
                                string_field_options = field->mutable_string_options();
 
335
                                
 
336
                                field->set_type(message::Table::Field::VARCHAR);
 
337
                                string_field_options->set_length(info->field_length);
 
338
                                if (info->field_charset) {
 
339
                                        string_field_options->set_collation(info->field_charset->name);
 
340
                                        string_field_options->set_collation_id(info->field_charset->number);
 
341
                                }
 
342
                                break;
 
343
                                
 
344
                        case DRIZZLE_TYPE_LONG:
 
345
                                field->set_type(message::Table::Field::INTEGER);
 
346
                                break;
 
347
                                
 
348
                        case DRIZZLE_TYPE_DOUBLE:
 
349
                                field->set_type(message::Table::Field::DOUBLE);
 
350
                                break;
 
351
                                
 
352
                        case DRIZZLE_TYPE_LONGLONG:
 
353
                                field->set_type(message::Table::Field::BIGINT);
 
354
                                break;
 
355
                                
 
356
                        case DRIZZLE_TYPE_TIMESTAMP:
 
357
                                field->set_type(message::Table::Field::TIMESTAMP);
 
358
                                break;
 
359
                                
 
360
                        case DRIZZLE_TYPE_BLOB:
 
361
                                field->set_type(message::Table::Field::BLOB);
 
362
                                if (info->field_charset) {
 
363
                                        string_field_options = field->mutable_string_options();
 
364
                                        string_field_options->set_collation(info->field_charset->name);
 
365
                                        string_field_options->set_collation_id(info->field_charset->number);
 
366
                                }
 
367
                                break;
 
368
                                
 
369
                        default:
 
370
                                assert(0); 
 
371
                }
 
372
                info++;
 
373
        }
 
374
        
 
375
                        
 
376
        if (keys) {
 
377
                while (keys->key_name) {
 
378
                        // To be done later. (maybe)
 
379
                        keys++;
 
380
                }
 
381
        }
 
382
 
 
383
        return 0;
 
384
}
 
385
#define TABLE                                                           drizzled::Table
 
386
#define Field                                                           drizzled::Field
 
387
 
 
388
int PBMSSystemTables::getSystemTableInfo(const char *name, drizzled::message::Table &table)
 
389
{
 
390
        int err = 1, i = 0;
 
391
                        
 
392
        while (pbms_internal_tables[i].name) {
 
393
                if (strcasecmp(name, pbms_internal_tables[i].name) == 0){
 
394
                        err = pbms_create_proto_table("PBMS", name, pbms_internal_tables[i].info, pbms_internal_tables[i].keys, table);
 
395
                        break;
 
396
                }
 
397
                i++;
 
398
        }
 
399
        
 
400
        return err;
 
401
}
 
402
 
 
403
void PBMSSystemTables::getSystemTableNames(bool isPBMS, std::set<std::string> &set_of_names)
 
404
{
 
405
        int i = 0;
 
406
                        
 
407
        while (pbms_internal_tables[i].name) {
 
408
                if ( isPBMS == pbms_internal_tables[i].is_pbms){
 
409
                        set_of_names.insert(pbms_internal_tables[i].name);
 
410
                }
 
411
                i++;
 
412
        }
 
413
        
 
414
}
 
415
 
 
416
#else // DRIZZLED
 
417
//--------------------------
 
418
static bool pbms_database_follder_exists( const char *db)
 
419
{
 
420
        struct stat stat_info;  
 
421
        char path[PATH_MAX];
 
422
        
 
423
        if (!db)
 
424
                return false;
 
425
                
 
426
        cs_strcpy(PATH_MAX, path, ms_my_get_mysql_home_path());
 
427
        cs_add_name_to_path(PATH_MAX, path, db);
 
428
        
 
429
        if (stat(path, &stat_info) == 0)
 
430
                return(stat_info.st_mode & S_IFDIR);
 
431
                
 
432
        return false;
 
433
}
 
434
 
 
435
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen)
 
436
{
 
437
        int err = 1, i = 0;
 
438
        bool is_pbms = false;
 
439
 
 
440
        // Check that the database exists!
 
441
        if (!pbms_database_follder_exists(db))
 
442
                return err;
 
443
                
 
444
        is_pbms = (strcmp(db, "pbms") == 0);
 
445
                
 
446
        
 
447
        while (pbms_internal_tables[i].name) {
 
448
                if ((!strcasecmp(name, pbms_internal_tables[i].name)) && ( is_pbms == pbms_internal_tables[i].is_pbms)){
 
449
                        err = ms_create_table_frm(hton, thd, db, name, pbms_internal_tables[i].info, pbms_internal_tables[i].keys, frmblob, frmlen);
 
450
                        break;
 
451
                }
 
452
                i++;
 
453
        }
 
454
        
 
455
        return err;
 
456
}
 
457
#endif // DRIZZLED
 
458
 
 
459
// Transfer any physical PBMS ststem tables to another database.
 
460
void PBMSSystemTables::transferSystemTables(MSDatabase *dst_db, MSDatabase *src_db)
 
461
{
 
462
        enter_();
 
463
        push_(dst_db);
 
464
        push_(src_db);
 
465
        
 
466
        MSHTTPHeaderTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
467
        MSVariableTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
468
        MSCloudTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
469
        MSBackupTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
470
        
 
471
        release_(src_db);
 
472
        release_(dst_db);
 
473
        exit_();
 
474
}
 
475
 
 
476
//----------------
 
477
void PBMSSystemTables::removeSystemTables(CSString *db_path)
 
478
{
 
479
        enter_();
 
480
        push_(db_path);
 
481
        
 
482
        MSHTTPHeaderTable::removeTable(RETAIN(db_path));
 
483
        MSVariableTable::removeTable(RETAIN(db_path));
 
484
        MSCloudTable::removeTable(RETAIN(db_path));
 
485
        MSBackupTable::removeTable(RETAIN(db_path));
 
486
        
 
487
        release_(db_path);
 
488
        exit_();
 
489
}
 
490
 
 
491
//----------------
 
492
void PBMSSystemTables::loadSystemTables(MSDatabase *db)
 
493
{
 
494
        enter_();
 
495
        push_(db);
 
496
        
 
497
        for ( int i = 0; i < 4; i++) {
 
498
                try_(a) {
 
499
                        switch (i) {
 
500
                                case 0:
 
501
                                        MSHTTPHeaderTable::loadTable(RETAIN(db));
 
502
                                        break;
 
503
                                case 1:
 
504
                                        MSCloudTable::loadTable(RETAIN(db));
 
505
                                        break;
 
506
                                case 2:
 
507
                                        MSBackupTable::loadTable(RETAIN(db));
 
508
                                        break;
 
509
                                case 3:
 
510
                                        // Variable must be loaded after cloud and backup info
 
511
                                        // incase BLOB recovery is required.
 
512
                                        MSVariableTable::loadTable(RETAIN(db)); 
 
513
                                        break;
 
514
                                        
 
515
                                default:
 
516
                                        ASSERT(false);
 
517
                        }
 
518
                }
 
519
                catch_(a) {
 
520
                        self->logException();
 
521
                }
 
522
                cont_(a);
 
523
        }
 
524
        
 
525
        release_(db);
 
526
        exit_();
 
527
}
 
528
 
 
529
//----------------
 
530
// Dump all the system tables into one buffer.
 
531
typedef struct {
 
532
        CSDiskValue4 size_4;
 
533
        CSDiskValue1 tab_id_1;
 
534
        CSDiskValue4 tab_version_4;
 
535
} DumpHeaderRec, *DumpHeaderPtr;
 
536
 
 
537
typedef union {
 
538
                char *rec_chars;
 
539
                const char *rec_cchars;
 
540
                DumpHeaderPtr dumpHeader;
 
541
} DumpDiskData;
 
542
 
 
543
CSStringBuffer *PBMSSystemTables::dumpSystemTables(MSDatabase *db)
 
544
{
 
545
        CSStringBuffer *sysDump, *tabDump;
 
546
        uint32_t size, pos, tab_version;
 
547
        uint8_t tab_id;
 
548
        DumpDiskData    d;
 
549
 
 
550
        enter_();
 
551
        push_(db);
 
552
        new_(sysDump, CSStringBuffer());
 
553
        push_(sysDump);
 
554
        pos = 0;
 
555
        
 
556
        for ( int i = 0; i < 4; i++) {
 
557
                switch (i) {
 
558
                        case 0:
 
559
                                tabDump = MSHTTPHeaderTable::dumpTable(RETAIN(db));
 
560
                                tab_id = MSHTTPHeaderTable::tableID;
 
561
                                tab_version = MSHTTPHeaderTable::tableVersion;
 
562
                                break;
 
563
                        case 1:
 
564
                                tabDump = MSCloudTable::dumpTable(RETAIN(db));
 
565
                                tab_id = MSCloudTable::tableID;
 
566
                                tab_version = MSCloudTable::tableVersion;
 
567
                                break;
 
568
                        case 2:
 
569
                                tabDump = MSBackupTable::dumpTable(RETAIN(db));
 
570
                                tab_id = MSBackupTable::tableID;
 
571
                                tab_version = MSBackupTable::tableVersion;
 
572
                                break;
 
573
                        case 3:
 
574
                                tabDump = MSVariableTable::dumpTable(RETAIN(db)); // Dump the variables table last.
 
575
                                tab_id = MSVariableTable::tableID;
 
576
                                tab_version = MSVariableTable::tableVersion;
 
577
                                break;
 
578
                                
 
579
                        default:
 
580
                                ASSERT(false);
 
581
                }
 
582
                
 
583
                push_(tabDump);
 
584
                size = tabDump->length();
 
585
                
 
586
                // Grow the buffer for the header
 
587
                sysDump->setLength(pos + sizeof(DumpHeaderRec));
 
588
                
 
589
                // Add the dump header 
 
590
                d.rec_chars = sysDump->getBuffer(pos); 
 
591
                CS_SET_DISK_4(d.dumpHeader->size_4, size);;             
 
592
                CS_SET_DISK_1(d.dumpHeader->tab_id_1, tab_id);          
 
593
                CS_SET_DISK_4(d.dumpHeader->tab_version_4, tab_version);        
 
594
        
 
595
                sysDump->append(tabDump->getBuffer(0), size);
 
596
                pos += size + sizeof(DumpHeaderRec);
 
597
                release_(tabDump);
 
598
        }
 
599
        
 
600
        
 
601
        pop_(sysDump);
 
602
        release_(db);
 
603
        return_(sysDump);
 
604
}
 
605
 
 
606
//----------------
 
607
void PBMSSystemTables::restoreSystemTables(MSDatabase *db, const char *data, size_t size)
 
608
{
 
609
        uint32_t tab_size, tab_version;
 
610
        uint8_t tab_id;
 
611
        DumpDiskData    d;
 
612
 
 
613
        enter_();
 
614
        push_(db);
 
615
        
 
616
        while  ( size >= sizeof(DumpHeaderRec)) {
 
617
                d.rec_cchars = data;
 
618
                tab_size = CS_GET_DISK_4(d.dumpHeader->size_4);
 
619
                tab_id = CS_GET_DISK_1(d.dumpHeader->tab_id_1);                 
 
620
                tab_version = CS_GET_DISK_4(d.dumpHeader->tab_version_4);       
 
621
                data += sizeof(DumpHeaderRec);
 
622
                size -= sizeof(DumpHeaderRec);
 
623
                
 
624
                if (size < tab_size) {
 
625
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "PBMS system table restore data truncated.");
 
626
                }
 
627
                
 
628
                switch (tab_id) {
 
629
                        case MSHTTPHeaderTable::tableID:
 
630
                                if (MSHTTPHeaderTable::tableVersion == tab_version)
 
631
                                        MSHTTPHeaderTable::restoreTable(RETAIN(db), data, tab_size);
 
632
                                else
 
633
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "METADATA_HEADER_NAME" failed, incompatable table version" );
 
634
                                break;
 
635
                        case MSCloudTable::tableID:
 
636
                                if (MSCloudTable::tableVersion == tab_version)
 
637
                                        MSCloudTable::restoreTable(RETAIN(db), data, tab_size);
 
638
                                else
 
639
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "CLOUD_TABLE_NAME" failed, incompatable table version" );
 
640
                                break;
 
641
                        case MSBackupTable::tableID:
 
642
                                if (MSBackupTable::tableVersion == tab_version)
 
643
                                        MSBackupTable::restoreTable(RETAIN(db), data, tab_size);
 
644
                                else
 
645
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "BACKUP_TABLE_NAME" failed, incompatable table version" );
 
646
                                break;                          
 
647
                        case MSVariableTable::tableID:
 
648
                                if (MSVariableTable::tableVersion == tab_version)
 
649
                                        MSVariableTable::restoreTable(RETAIN(db), data, tab_size);
 
650
                                else
 
651
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "VARIABLES_TABLE_NAME" failed, incompatable table version" );
 
652
                                break;
 
653
                        default:
 
654
                                ASSERT(false);
 
655
                }
 
656
                 data += tab_size;
 
657
                 size -= tab_size;
 
658
        }
 
659
        
 
660
        if (size) {
 
661
                CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "PBMS trailing garbage in system table restore data being ignored.");
 
662
        }
 
663
                
 
664
        release_(db);
 
665
        exit_();
 
666
}
 
667
 
 
668
/*
 
669
 * -------------------------------------------------------------------------
 
670
 * MYSQL UTILITIES
 
671
 */
 
672
 
 
673
void MSOpenSystemTable::setNotNullInRecord(Field *field, char *record)
 
674
{
 
675
        if (field->null_ptr)
 
676
                record[(uint) (field->null_ptr - (uchar *) field->table->record[0])] &= (uchar) ~field->null_bit;
 
677
}
 
678
 
 
679
/*
 
680
 * -------------------------------------------------------------------------
 
681
 * OPEN SYSTEM TABLES
 
682
 */
 
683
 
 
684
MSOpenSystemTable::MSOpenSystemTable(MSSystemTableShare *share, TABLE *table):
 
685
CSRefObject()
 
686
{
 
687
        myShare = share;
 
688
        mySQLTable = table;
 
689
}
 
690
 
 
691
MSOpenSystemTable::~MSOpenSystemTable()
 
692
{
 
693
        MSSystemTableShare::releaseSystemTable(this);
 
694
}
 
695
 
 
696
 
 
697
/*
 
698
 * -------------------------------------------------------------------------
 
699
 * REPOSITORY TABLE
 
700
 */
 
701
 
 
702
MSRepositoryTable::MSRepositoryTable(MSSystemTableShare *share, TABLE *table):
 
703
MSOpenSystemTable(share, table),
 
704
iCompactor(NULL),
 
705
iRepoFile(NULL),
 
706
iBlobBuffer(NULL)
 
707
{
 
708
}
 
709
 
 
710
//-----------------------
 
711
MSRepositoryTable::~MSRepositoryTable()
 
712
{
 
713
        unuse();
 
714
        
 
715
        if (iBlobBuffer)
 
716
                iBlobBuffer->release();
 
717
}
 
718
 
 
719
//-----------------------
 
720
void MSRepositoryTable::use()
 
721
{
 
722
}
 
723
 
 
724
//-----------------------
 
725
void MSRepositoryTable::unuse()
 
726
{
 
727
        if (iCompactor) {
 
728
                iCompactor->resume();
 
729
                iCompactor->release();
 
730
                iCompactor = NULL;
 
731
        }
 
732
        if (iRepoFile) {
 
733
                iRepoFile->release();
 
734
                iRepoFile = NULL;
 
735
        }
 
736
        if (iBlobBuffer)
 
737
                iBlobBuffer->clear();
 
738
}
 
739
 
 
740
 
 
741
//-----------------------
 
742
void MSRepositoryTable::seqScanInit()
 
743
{
 
744
        MSDatabase *db;
 
745
 
 
746
        enter_();
 
747
        
 
748
        // Flush all committed transactions to the repository file.
 
749
        MSTransactionManager::flush();
 
750
 
 
751
        iRepoIndex = 0;
 
752
        iRepoOffset = 0;
 
753
        if (!iBlobBuffer)
 
754
                new_(iBlobBuffer, CSStringBuffer(20));
 
755
        db = myShare->mySysDatabase;
 
756
        if ((iCompactor = db->getCompactorThread())) {
 
757
                if (iCompactor->isMe(self))
 
758
                        iCompactor = NULL;
 
759
                else {
 
760
                        iCompactor->retain();
 
761
                        iCompactor->suspend();
 
762
                }
 
763
        }
 
764
                
 
765
        exit_();
 
766
}
 
767
 
 
768
//-----------------------
 
769
bool MSRepositoryTable::resetScan(bool positioned, uint32_t repo_index)
 
770
{
 
771
        if (positioned) {
 
772
                if (iRepoFile && (repo_index != iRepoIndex)) {
 
773
                        iRepoFile->release();
 
774
                        iRepoFile = NULL;
 
775
                }
 
776
                
 
777
                iRepoIndex = repo_index;
 
778
        }
 
779
        if (iRepoFile) 
 
780
                return true;
 
781
                
 
782
        enter_();
 
783
        MSRepository    *repo = NULL;
 
784
        CSSyncVector    *repo_list = myShare->mySysDatabase->getRepositoryList();
 
785
 
 
786
        lock_(repo_list);
 
787
        for (; iRepoIndex<repo_list->size(); iRepoIndex++) {
 
788
                if ((repo = (MSRepository *) repo_list->get(iRepoIndex))) {
 
789
                        iRepoFile = repo->openRepoFile();
 
790
                        break;
 
791
                }
 
792
        }
 
793
        unlock_(repo_list);
 
794
        
 
795
        if (!iRepoFile)
 
796
                return_(false);
 
797
        iRepoFileSize = repo->getRepoFileSize();
 
798
        if ((!iRepoOffset) || !positioned) 
 
799
                iRepoOffset = repo->getRepoHeadSize();
 
800
                
 
801
        iRepoCurrentOffset = iRepoOffset;
 
802
                
 
803
        return_(true);
 
804
}
 
805
 
 
806
//-----------------------
 
807
bool MSRepositoryTable::seqScanNext(char *buf)
 
808
{
 
809
 
 
810
        enter_();
 
811
        iRepoCurrentOffset = iRepoOffset;
 
812
        
 
813
        if (returnSubRecord(buf))
 
814
                goto exit;
 
815
 
 
816
        restart:
 
817
        if ((!iRepoFile) && !MSRepositoryTable::resetScan(false))
 
818
                return false;
 
819
 
 
820
        while (iRepoOffset < iRepoFileSize) {
 
821
                if (returnRecord(buf))
 
822
                        goto exit;
 
823
        }
 
824
 
 
825
        if (iRepoFile) {
 
826
                iRepoFile->release();
 
827
                iRepoFile = NULL;
 
828
                iRepoOffset = 0;
 
829
        }
 
830
        iRepoIndex++;
 
831
        goto restart;
 
832
 
 
833
        exit:
 
834
        return_(true);
 
835
}
 
836
 
 
837
//-----------------------
 
838
int     MSRepositoryTable::getRefLen()
 
839
{
 
840
        return sizeof(iRepoIndex) + sizeof(iRepoOffset);
 
841
}
 
842
 
 
843
 
 
844
//-----------------------
 
845
void MSRepositoryTable::seqScanPos(uint8_t *pos)
 
846
{
 
847
        mi_int4store(pos, iRepoIndex); pos +=4;
 
848
        mi_int8store(pos, iRepoCurrentOffset);
 
849
}
 
850
 
 
851
//-----------------------
 
852
void MSRepositoryTable::seqScanRead(uint32_t repo, uint64_t offset, char *buf)
 
853
{
 
854
        iRepoOffset = offset;
 
855
 
 
856
        if (!resetScan(true, repo))
 
857
                return;
 
858
                
 
859
        seqScanNext(buf);
 
860
}
 
861
 
 
862
//-----------------------
 
863
void MSRepositoryTable::seqScanRead(uint8_t *pos, char *buf)
 
864
{
 
865
        seqScanRead(mi_uint4korr( pos), mi_uint8korr(pos +4), buf);
 
866
}
 
867
 
 
868
//-----------------------
 
869
bool MSRepositoryTable::returnRecord(char *buf)
 
870
{
 
871
        CSMutex                 *lock;
 
872
        MSBlobHeadRec   blob;
 
873
        uint16_t                        head_size;
 
874
        uint64_t                        blob_size;
 
875
        int                             ref_count;
 
876
        size_t                  ref_size;
 
877
        uint8_t                 status;
 
878
 
 
879
        enter_();
 
880
        retry_read:
 
881
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
882
        lock_(lock);
 
883
        if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
 
884
                unlock_(lock);
 
885
                iRepoOffset = iRepoFileSize;
 
886
                return_(false);
 
887
        }
 
888
        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
889
        blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
 
890
        ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
 
891
        ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
 
892
        status = CS_GET_DISK_1(blob.rb_status_1);
 
893
        if (ref_count <= 0 || ref_size == 0 ||
 
894
                head_size < iRepoFile->myRepo->getRepoBlobHeadSize() + ref_count * ref_size || 
 
895
                !VALID_BLOB_STATUS(status)) {
 
896
                /* Can't be true. Assume this is garbage! */
 
897
                unlock_(lock);
 
898
                iRepoOffset++;
 
899
                goto retry_read;
 
900
        }
 
901
 
 
902
        if (IN_USE_BLOB_STATUS(status)) {
 
903
                unlock_(lock);
 
904
                if (!returnRow(&blob, buf)) {
 
905
                        /* This record may not have had any data of interest. */
 
906
                        iRepoOffset++;
 
907
                        goto retry_read;
 
908
                }
 
909
                iRepoOffset += head_size + blob_size;
 
910
                return_(true);
 
911
        }
 
912
        unlock_(lock);
 
913
        iRepoOffset += head_size + blob_size;
 
914
        return_(false);
 
915
}
 
916
 
 
917
//-----------------------
 
918
bool MSRepositoryTable::returnSubRecord(char *)
 
919
{
 
920
        return false;
 
921
}
 
922
 
 
923
//-----------------------
 
924
bool MSRepositoryTable::returnRow(MSBlobHeadPtr blob, char *buf)
 
925
{
 
926
        TABLE           *table = mySQLTable;
 
927
        uint8_t         storage_type;
 
928
        uint32_t                access_count;
 
929
        uint32_t                last_access;
 
930
        uint32_t                last_ref;
 
931
        uint32_t                creation_time;
 
932
        uint32_t                access_code;
 
933
        uint16_t                head_size;
 
934
        uint16_t                alias_offset;
 
935
        uint64_t                blob_size;
 
936
        Field           *curr_field;
 
937
        byte            *save;
 
938
        MY_BITMAP       *save_write_set;
 
939
        
 
940
        enter_();
 
941
 
 
942
        storage_type = CS_GET_DISK_1(blob->rb_storage_type_1);
 
943
        last_access = CS_GET_DISK_4(blob->rb_last_access_4);
 
944
        access_count = CS_GET_DISK_4(blob->rb_access_count_4);
 
945
        last_ref = CS_GET_DISK_4(blob->rb_last_ref_4);
 
946
        creation_time = CS_GET_DISK_4(blob->rb_create_time_4);
 
947
        head_size = CS_GET_DISK_2(blob->rb_head_size_2);
 
948
        blob_size = CS_GET_DISK_6(blob->rb_blob_repo_size_6);
 
949
        access_code = CS_GET_DISK_4(blob->rb_auth_code_4);
 
950
        alias_offset = CS_GET_DISK_2(blob->rb_alias_offset_2);
 
951
 
 
952
        /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
 
953
         * I use store()!??
 
954
         * But I want to use it! :(
 
955
         */
 
956
        save_write_set = table->write_set;
 
957
        table->write_set = NULL;
 
958
 
 
959
        memset(buf, 0xFF, table->s->null_bytes);
 
960
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
961
                curr_field = *field;
 
962
 
 
963
                save = curr_field->ptr;
 
964
#if MYSQL_VERSION_ID < 50114
 
965
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
966
#else
 
967
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
 
968
#endif
 
969
                switch (curr_field->field_name[0]) {
 
970
                        case 'A':
 
971
                                switch (curr_field->field_name[9]) {
 
972
                                        case 'd':
 
973
                                                ASSERT(strcmp(curr_field->field_name, "Access_code") == 0);
 
974
                                                curr_field->store(access_code, true);
 
975
                                                setNotNullInRecord(curr_field, buf);
 
976
                                                break;
 
977
                                        case 'u':
 
978
                                                ASSERT(strcmp(curr_field->field_name, "Access_count") == 0);
 
979
                                                curr_field->store(access_count, true);
 
980
                                                setNotNullInRecord(curr_field, buf);
 
981
                                                break;
 
982
                                }
 
983
                                break;
 
984
                        case 'R':
 
985
                                switch (curr_field->field_name[6]) {
 
986
                                        case 't':
 
987
                                                // Repository_id     INT
 
988
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
989
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
990
                                                setNotNullInRecord(curr_field, buf);
 
991
                                                break;
 
992
                                        case 'l':
 
993
                                                // Repo_blob_offset  BIGINT
 
994
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
995
                                                curr_field->store(iRepoOffset, true);
 
996
                                                setNotNullInRecord(curr_field, buf);
 
997
                                                break;
 
998
                                }
 
999
                                break;
 
1000
                        case 'B':
 
1001
                                switch (curr_field->field_name[5]) {
 
1002
                                        case 's':
 
1003
                                                // Blob_size         BIGINT
 
1004
                                                ASSERT(strcmp(curr_field->field_name, "Blob_size") == 0);
 
1005
                                                curr_field->store(blob_size, true);
 
1006
                                                setNotNullInRecord(curr_field, buf);
 
1007
                                                break;
 
1008
                                        case 'a':
 
1009
                                                // Blob_alias         
 
1010
                                                ASSERT(strcmp(curr_field->field_name, "Blob_alias") == 0);
 
1011
#ifdef HAVE_ALIAS_SUPPORT
 
1012
                                                if (alias_offset) {
 
1013
                                                        char blob_alias[BLOB_ALIAS_LENGTH +1];
 
1014
                                                        CSMutex *lock;
 
1015
                                                        uint64_t rsize;
 
1016
                                                        
 
1017
                                                        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1018
                                                        lock_(lock);
 
1019
                                                        rsize = iRepoFile->read(blob_alias, iRepoOffset + alias_offset, BLOB_ALIAS_LENGTH, 0);
 
1020
                                                        unlock_(lock);
 
1021
                                                        blob_alias[rsize] = 0;
 
1022
                                                        curr_field->store(blob_alias, strlen(blob_alias), &my_charset_utf8_general_ci);
 
1023
                                                        setNotNullInRecord(curr_field, buf);
 
1024
                                                } else {
 
1025
                                                        curr_field->store((uint64_t) 0, true);
 
1026
                                                }
 
1027
#else
 
1028
                                                curr_field->store((uint64_t) 0, true);
 
1029
#endif
 
1030
                                                
 
1031
                                                break;
 
1032
                                }
 
1033
 
 
1034
                                break;
 
1035
                        case 'M': // MD5_Checksum
 
1036
                                if (storage_type == MS_STANDARD_STORAGE) {
 
1037
                                        char checksum[sizeof(Md5Digest) *2];
 
1038
                                        
 
1039
                                        ASSERT(strcmp(curr_field->field_name, "MD5_Checksum") == 0);
 
1040
                                        cs_bin_to_hex(sizeof(Md5Digest) *2, checksum, sizeof(Md5Digest), &(blob->rb_blob_checksum_md5d));
 
1041
                                        curr_field->store(checksum, sizeof(Md5Digest) *2, system_charset_info);
 
1042
                                        setNotNullInRecord(curr_field, buf);
 
1043
                                        
 
1044
                                } else
 
1045
                                        curr_field->store((uint64_t) 0, true);
 
1046
                        
 
1047
                                break;
 
1048
                        case 'H':
 
1049
                                // Head_size         SMALLINT UNSIGNED
 
1050
                                ASSERT(strcmp(curr_field->field_name, "Head_size") == 0);
 
1051
                                curr_field->store(head_size, true);
 
1052
                                setNotNullInRecord(curr_field, buf);
 
1053
                                break;
 
1054
                        case 'C':
 
1055
                                // Creation_time     TIMESTAMP
 
1056
                                ASSERT(strcmp(curr_field->field_name, "Creation_time") == 0);
 
1057
                                curr_field->store(ms_my_1970_to_mysql_time(creation_time), true);
 
1058
                                setNotNullInRecord(curr_field, buf);
 
1059
                                break;
 
1060
                        case 'L':
 
1061
                                switch (curr_field->field_name[5]) {
 
1062
                                        case 'r':
 
1063
                                                // Last_ref_time     TIMESTAMP
 
1064
                                                ASSERT(strcmp(curr_field->field_name, "Last_ref_time") == 0);
 
1065
                                                curr_field->store(ms_my_1970_to_mysql_time(last_ref), true);
 
1066
                                                setNotNullInRecord(curr_field, buf);
 
1067
                                                break;
 
1068
                                        case 'a':
 
1069
                                                // Last_access_time  TIMESTAMP
 
1070
                                                ASSERT(strcmp(curr_field->field_name, "Last_access_time") == 0);
 
1071
                                                curr_field->store(ms_my_1970_to_mysql_time(last_access), true);
 
1072
                                                setNotNullInRecord(curr_field, buf);
 
1073
                                                break;
 
1074
                                }
 
1075
                                break;
 
1076
                }
 
1077
                curr_field->ptr = save;
 
1078
        }
 
1079
 
 
1080
        table->write_set = save_write_set;
 
1081
        return_(true);
 
1082
}
 
1083
 
 
1084
/*
 
1085
 * -------------------------------------------------------------------------
 
1086
 * BLOB DATA TABLE
 
1087
 */
 
1088
//-----------------------
 
1089
MSBlobDataTable::MSBlobDataTable(MSSystemTableShare *share, TABLE *table):MSRepositoryTable(share, table)
 
1090
{
 
1091
}
 
1092
 
 
1093
//-----------------------
 
1094
MSBlobDataTable::~MSBlobDataTable()
 
1095
{       
 
1096
}
 
1097
 
 
1098
//-----------------------
 
1099
bool MSBlobDataTable::returnRow(MSBlobHeadPtr blob, char *buf)
 
1100
{
 
1101
        TABLE           *table = mySQLTable;
 
1102
        uint8_t         blob_type;
 
1103
        uint16_t                head_size;
 
1104
        uint64_t                blob_size;
 
1105
        uint32          len;
 
1106
        Field           *curr_field;
 
1107
        byte            *save;
 
1108
        MY_BITMAP       *save_write_set;
 
1109
 
 
1110
        head_size = CS_GET_DISK_2(blob->rb_head_size_2);
 
1111
        blob_size = CS_GET_DISK_6(blob->rb_blob_repo_size_6);
 
1112
        blob_type = CS_GET_DISK_1(blob->rb_storage_type_1);
 
1113
 
 
1114
        /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
 
1115
         * I use store()!??
 
1116
         * But I want to use it! :(
 
1117
         */
 
1118
        save_write_set = table->write_set;
 
1119
        table->write_set = NULL;
 
1120
 
 
1121
        memset(buf, 0xFF, table->s->null_bytes);
 
1122
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1123
                curr_field = *field;
 
1124
 
 
1125
                save = curr_field->ptr;
 
1126
#if MYSQL_VERSION_ID < 50114
 
1127
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1128
#else
 
1129
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
 
1130
#endif
 
1131
                switch (curr_field->field_name[0]) {
 
1132
                        case 'R':
 
1133
                                switch (curr_field->field_name[6]) {
 
1134
                                        case 't':
 
1135
                                                // Repository_id     INT
 
1136
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1137
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
1138
                                                setNotNullInRecord(curr_field, buf);
 
1139
                                                break;
 
1140
                                        case 'l':
 
1141
                                                // Repo_blob_offset  BIGINT
 
1142
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1143
                                                curr_field->store(iRepoOffset, true);
 
1144
                                                setNotNullInRecord(curr_field, buf);
 
1145
                                                break;
 
1146
                                }
 
1147
                                break;
 
1148
                        case 'B':
 
1149
                                // Blob_data         LONGBLOB
 
1150
                                ASSERT(strcmp(curr_field->field_name, "Blob_data") == 0);
 
1151
                                if (blob_size <= 0xFFFFFFF) {
 
1152
                                        iBlobBuffer->setLength((uint32_t) blob_size);
 
1153
                                        if (BLOB_IN_REPOSITORY(blob_type)) {
 
1154
                                                len = iRepoFile->read(iBlobBuffer->getBuffer(0), iRepoOffset + head_size, (size_t) blob_size, 0);
 
1155
                                        } else {
 
1156
                                                CloudDB *blobCloud = myShare->mySysDatabase->myBlobCloud;
 
1157
                                                CloudKeyRec key;
 
1158
                                                ASSERT(blobCloud != NULL);
 
1159
                                                
 
1160
                                                MSRepoFile::getBlobKey(blob, &key);
 
1161
                                                
 
1162
                                                len = blobCloud->cl_getData(&key, iBlobBuffer->getBuffer(0), blob_size);
 
1163
                                        }
 
1164
                                        ((Field_blob *) curr_field)->set_ptr(len, (byte *) iBlobBuffer->getBuffer(0));
 
1165
                                        setNotNullInRecord(curr_field, buf);
 
1166
                                }
 
1167
                                break;
 
1168
                }
 
1169
                curr_field->ptr = save;
 
1170
        }
 
1171
 
 
1172
        table->write_set = save_write_set;
 
1173
        return true;
 
1174
}
 
1175
 
 
1176
#ifdef HAVE_ALIAS_SUPPORT
 
1177
/*
 
1178
 * -------------------------------------------------------------------------
 
1179
 * Alias DATA TABLE
 
1180
 */
 
1181
bool MSBlobAliasTable::returnRow(MSBlobHeadPtr blob, char *buf)
 
1182
{
 
1183
        TABLE           *table = mySQLTable;
 
1184
        Field           *curr_field;
 
1185
        byte            *save;
 
1186
        MY_BITMAP       *save_write_set;
 
1187
        uint16_t                alias_offset; 
 
1188
        char            blob_alias[BLOB_ALIAS_LENGTH +1];
 
1189
        CSMutex         *lock;
 
1190
        uint64_t                rsize;
 
1191
        enter_();
 
1192
        
 
1193
        alias_offset = CS_GET_DISK_2(blob->rb_alias_offset_2);
 
1194
 
 
1195
        if (!alias_offset)
 
1196
                return_(false);
 
1197
 
 
1198
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1199
        lock_(lock);
 
1200
        rsize = iRepoFile->read(blob_alias, iRepoOffset + alias_offset, BLOB_ALIAS_LENGTH, 0);
 
1201
        unlock_(lock);
 
1202
        blob_alias[rsize] = 0;
 
1203
        
 
1204
        /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
 
1205
         * I use store()!??
 
1206
         * But I want to use it! :(
 
1207
         */
 
1208
        save_write_set = table->write_set;
 
1209
        table->write_set = NULL;
 
1210
 
 
1211
        memset(buf, 0xFF, table->s->null_bytes);
 
1212
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1213
                curr_field = *field;
 
1214
 
 
1215
                save = curr_field->ptr;
 
1216
#if MYSQL_VERSION_ID < 50114
 
1217
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1218
#else
 
1219
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
 
1220
#endif
 
1221
                switch (curr_field->field_name[0]) {
 
1222
                        case 'R':
 
1223
                                switch (curr_field->field_name[6]) {
 
1224
                                        case 't':
 
1225
                                                // Repository_id     INT
 
1226
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1227
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
1228
                                                setNotNullInRecord(curr_field, buf);
 
1229
                                                break;
 
1230
                                        case 'l':
 
1231
                                                // Repo_blob_offset  BIGINT
 
1232
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1233
                                                curr_field->store(iRepoOffset, true);
 
1234
                                                setNotNullInRecord(curr_field, buf);
 
1235
                                                break;
 
1236
                                }
 
1237
                                break;
 
1238
                        case 'B':
 
1239
                                // Blob_alias         
 
1240
                                ASSERT(strcmp(curr_field->field_name, "Blob_alias") == 0);
 
1241
                                curr_field->store(blob_alias, strlen(blob_alias), &UTF8_CHARSET);
 
1242
                                setNotNullInRecord(curr_field, buf);
 
1243
                                
 
1244
                                break;
 
1245
                }
 
1246
                curr_field->ptr = save;
 
1247
        }
 
1248
 
 
1249
        table->write_set = save_write_set;
 
1250
        return_(true);
 
1251
}
 
1252
 
 
1253
#endif
 
1254
 
 
1255
/*
 
1256
 * -------------------------------------------------------------------------
 
1257
 * REFERENCE TABLE
 
1258
 */
 
1259
 
 
1260
MSReferenceTable::MSReferenceTable(MSSystemTableShare *share, TABLE *table):
 
1261
MSRepositoryTable(share, table),
 
1262
iRefDataList(NULL), 
 
1263
iRefDataSize(0), 
 
1264
iRefDataUsed(0),
 
1265
iRefDataPos(0),
 
1266
iRefOpenTable(NULL),
 
1267
iRefTempLog(NULL)
 
1268
{
 
1269
}
 
1270
 
 
1271
MSReferenceTable::~MSReferenceTable()
 
1272
{
 
1273
        if (iRefDataList)
 
1274
                cs_free(iRefDataList);
 
1275
        if (iRefOpenTable)
 
1276
                iRefOpenTable->returnToPool();
 
1277
        if (iRefTempLog)
 
1278
                iRefTempLog->release();
 
1279
}
 
1280
 
 
1281
void MSReferenceTable::unuse()
 
1282
{
 
1283
        MSRepositoryTable::unuse();
 
1284
        if (iRefDataList) {
 
1285
                cs_free(iRefDataList);
 
1286
                iRefDataList = NULL;
 
1287
        }
 
1288
        iRefDataSize = 0;
 
1289
        if (iRefOpenTable) {
 
1290
                iRefOpenTable->returnToPool();
 
1291
                iRefOpenTable = NULL;
 
1292
        }
 
1293
        if (iRefTempLog) {
 
1294
                iRefTempLog->release();
 
1295
                iRefTempLog = NULL;
 
1296
        }
 
1297
}
 
1298
 
 
1299
 
 
1300
void MSReferenceTable::seqScanInit()
 
1301
{
 
1302
        MSRepositoryTable::seqScanInit();
 
1303
        iRefDataUsed = 0;
 
1304
        iRefDataPos = 0;
 
1305
}
 
1306
 
 
1307
int     MSReferenceTable::getRefLen()
 
1308
{
 
1309
        return sizeof(iRefDataUsed) + sizeof(iRefDataPos) + sizeof(iRefCurrentIndex) + sizeof(iRefCurrentOffset);
 
1310
}
 
1311
 
 
1312
void MSReferenceTable::seqScanPos(uint8_t *pos)
 
1313
{
 
1314
        mi_int4store(pos, iRefCurrentDataPos); pos +=4;
 
1315
        mi_int4store(pos, iRefCurrentDataUsed);pos +=4; 
 
1316
        mi_int4store(pos, iRefCurrentIndex); pos +=4;
 
1317
        mi_int8store(pos, iRefCurrentOffset);
 
1318
}
 
1319
 
 
1320
void MSReferenceTable::seqScanRead(uint8_t *pos, char *buf)
 
1321
{
 
1322
        iRefDataPos = mi_uint4korr( pos); pos +=4;
 
1323
        iRefDataUsed = mi_uint4korr(pos); pos +=4;
 
1324
        iRefBlobRepo = mi_uint4korr(pos); pos +=4;
 
1325
        iRefBlobOffset = mi_uint8korr(pos);
 
1326
        MSRepositoryTable::seqScanRead(iRefBlobRepo, iRefBlobOffset, buf);
 
1327
}
 
1328
 
 
1329
bool MSReferenceTable::seqScanNext(char *buf)
 
1330
{
 
1331
        iRefCurrentDataPos = iRefDataPos;
 
1332
        iRefCurrentDataUsed = iRefDataUsed;
 
1333
        
 
1334
        // Reset the position
 
1335
        return MSRepositoryTable::seqScanNext(buf);
 
1336
}
 
1337
// select * from pbms_reference order by blob_size DESC;
 
1338
// select * from pbms_reference order by Table_name DESC;
 
1339
 
 
1340
bool MSReferenceTable::resetScan(bool positioned, uint32_t repo_index)
 
1341
{
 
1342
        CSMutex                         *lock;
 
1343
        MSBlobHeadRec           blob;
 
1344
        uint16_t                        head_size;
 
1345
        uint64_t                        blob_size;
 
1346
        MSRepoPointersRec       ptr;
 
1347
        size_t                          ref_size;
 
1348
        uint32_t                        tab_index;
 
1349
        uint32_t                        ref_count;
 
1350
        uint8_t                         status;
 
1351
 
 
1352
        enter_();
 
1353
        
 
1354
        if (!MSRepositoryTable::resetScan(positioned, repo_index))
 
1355
                return_(false);
 
1356
        
 
1357
        retry_read:
 
1358
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1359
        lock_(lock);
 
1360
        if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
 
1361
                unlock_(lock);
 
1362
                iRepoOffset = iRepoFileSize;
 
1363
                return_(false);
 
1364
        }
 
1365
        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
1366
        blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
 
1367
        ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
 
1368
        ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
 
1369
        status = CS_GET_DISK_1(blob.rb_status_1);
 
1370
        if (ref_count <= 0 || ref_size == 0 ||
 
1371
                head_size < iRepoFile->myRepo->getRepoBlobHeadSize() + ref_count * ref_size || 
 
1372
                !VALID_BLOB_STATUS(status)) {
 
1373
                /* Can't be true. Assume this is garbage! */
 
1374
                unlock_(lock);
 
1375
                iRepoOffset++;
 
1376
                goto retry_read;
 
1377
        }
 
1378
 
 
1379
        if (IN_USE_BLOB_STATUS(status)) {
 
1380
                iBlobBuffer->setLength((uint32_t) head_size);
 
1381
                if (iRepoFile->read(iBlobBuffer->getBuffer(0), iRepoOffset, head_size, 0) < head_size) {
 
1382
                        unlock_(lock);
 
1383
                        iRepoOffset = iRepoFileSize;
 
1384
                        return_(false);
 
1385
                }
 
1386
                unlock_(lock);
 
1387
 
 
1388
                iRefAuthCode = CS_GET_DISK_4(blob.rb_auth_code_4);
 
1389
                iRefBlobSize = CS_GET_DISK_6(blob.rb_blob_data_size_6);;
 
1390
                iRefBlobRepo = iRepoFile->myRepo->getRepoID();
 
1391
                iRefBlobOffset = iRepoOffset;
 
1392
 
 
1393
                if (ref_count > iRefDataSize) {
 
1394
                        cs_realloc((void **) &iRefDataList, sizeof(MSRefDataRec) * ref_count);
 
1395
                        iRefDataSize = ref_count;
 
1396
                }
 
1397
                
 
1398
                if (!positioned) 
 
1399
                        iRefDataPos = 0;
 
1400
 
 
1401
                // When ever the data position is reset the current location information
 
1402
                // must also be reset so that it is consisent with the data position.
 
1403
                iRefCurrentDataPos = iRefDataPos;
 
1404
                iRefCurrentOffset = iRepoOffset;
 
1405
                iRefCurrentIndex = iRepoIndex;
 
1406
                iRefCurrentDataUsed = iRefDataUsed = ref_count;
 
1407
                memset(iRefDataList, 0, sizeof(MSRefDataRec) * ref_count);
 
1408
 
 
1409
                uint32_t h = iRepoFile->myRepo->getRepoBlobHeadSize();
 
1410
                ptr.rp_chars = iBlobBuffer->getBuffer(0) + h;
 
1411
                for (uint32_t i=0; i<ref_count; i++) {
 
1412
                        switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
 
1413
                                case MS_BLOB_FREE_REF:
 
1414
                                        break;
 
1415
                                case MS_BLOB_TABLE_REF: // The blob is intended for a file but has not been inserted yet.
 
1416
                                        iRefDataList[i].rd_tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
 
1417
                                        iRefDataList[i].rd_blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
 
1418
                                        iRefDataList[i].rd_col_index = INVALID_INDEX;  // Means not used
 
1419
                                        break;
 
1420
                                case MS_BLOB_DELETE_REF:
 
1421
                                        tab_index = CS_GET_DISK_2(ptr.rp_temp_ref->tp_del_ref_2);
 
1422
                                        if (tab_index && (tab_index <= ref_count)) {
 
1423
                                                tab_index--;
 
1424
                                                iRefDataList[tab_index].rd_temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
 
1425
                                                iRefDataList[tab_index].rd_temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
 
1426
                                        }
 
1427
                                        else if (tab_index == INVALID_INDEX) {
 
1428
                                                /* The is a reference from the temporary log only!! */
 
1429
                                                iRefDataList[i].rd_tab_id = 0xFFFFFFFF;  // Indicates no table
 
1430
                                                iRefDataList[i].rd_blob_id = iRepoOffset;
 
1431
                                                iRefDataList[i].rd_blob_ref_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);;
 
1432
                                                iRefDataList[i].rd_col_index = INVALID_INDEX;  // Means not used
 
1433
                                                iRefDataList[i].rd_temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
 
1434
                                                iRefDataList[i].rd_temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
 
1435
                                        }
 
1436
                                        break;
 
1437
                                default:
 
1438
                                        MSRepoTableRefPtr       tab_ref;
 
1439
 
 
1440
                                        tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1;
 
1441
                                        if (tab_index < ref_count) {
 
1442
                                                tab_ref = (MSRepoTableRefPtr) (iBlobBuffer->getBuffer(0) + iRepoFile->myRepo->getRepoBlobHeadSize() + tab_index * ref_size);
 
1443
        
 
1444
                                                iRefDataList[i].rd_tab_id = CS_GET_DISK_4(tab_ref->tr_table_id_4);
 
1445
                                                iRefDataList[i].rd_blob_id = CS_GET_DISK_6(tab_ref->tr_blob_id_6);
 
1446
                                                iRefDataList[i].rd_col_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_col_index_2);
 
1447
                                                iRefDataList[i].rd_blob_ref_id = COMMIT_MASK(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8));
 
1448
 
 
1449
                                                iRefDataList[tab_index].rd_ref_count++;
 
1450
                                        }
 
1451
                                        else {
 
1452
                                                /* Can't be true. Assume this is garbage! */
 
1453
                                                unlock_(lock);
 
1454
                                                iRepoOffset++;
 
1455
                                                goto retry_read;
 
1456
                                        }
 
1457
                                        break;
 
1458
                        }
 
1459
                        ptr.rp_chars += ref_size;
 
1460
                }
 
1461
 
 
1462
                iRepoOffset += head_size + blob_size;
 
1463
 
 
1464
                return_(true);
 
1465
        }
 
1466
        unlock_(lock);
 
1467
        iRepoOffset += head_size + blob_size;
 
1468
        return_(false);
 
1469
}
 
1470
 
 
1471
bool MSReferenceTable::returnRecord(char *buf)
 
1472
{
 
1473
        if (!resetScan(false))
 
1474
                return false;
 
1475
                
 
1476
        return(returnSubRecord(buf));
 
1477
}
 
1478
 
 
1479
bool MSReferenceTable::returnSubRecord(char *buf)
 
1480
{
 
1481
        uint32_t i;
 
1482
        
 
1483
        while (iRefDataPos < iRefDataUsed) {
 
1484
                i = iRefDataPos++;
 
1485
                if (iRefDataList[i].rd_tab_id) {
 
1486
                        if (iRefDataList[i].rd_col_index == INVALID_INDEX) {
 
1487
                                /* This is a table reference */
 
1488
                                if (!iRefDataList[i].rd_ref_count || iRefDataList[i].rd_temp_log_id) {
 
1489
                                        returnRow(&iRefDataList[i], buf);
 
1490
                                        return true;
 
1491
                                }
 
1492
                        }
 
1493
                        else {
 
1494
                                /* This is an engine reference */
 
1495
                                returnRow(&iRefDataList[i], buf);
 
1496
                                return true;
 
1497
                        }
 
1498
                }
 
1499
        }
 
1500
 
 
1501
        return false;
 
1502
}
 
1503
 
 
1504
void MSReferenceTable::returnRow(MSRefDataPtr ref_data, char *buf)
 
1505
{
 
1506
        TABLE                   *table = mySQLTable;
 
1507
        Field                   *curr_field;
 
1508
        byte                    *save;
 
1509
        MY_BITMAP               *save_write_set;
 
1510
        MY_BITMAP               *save_read_set;
 
1511
        bool                    have_times = false;
 
1512
        time_t                  delete_time;
 
1513
        int32_t                 countdown = 0;
 
1514
 
 
1515
        if (iRefOpenTable) {
 
1516
                if (iRefOpenTable->getDBTable()->myTableID != ref_data->rd_tab_id) {
 
1517
                        iRefOpenTable->returnToPool();
 
1518
                        iRefOpenTable = NULL;
 
1519
                }
 
1520
        }
 
1521
 
 
1522
        if (!iRefOpenTable && ref_data->rd_tab_id != 0xFFFFFFFF)
 
1523
                iRefOpenTable = MSTableList::getOpenTableByID(myShare->mySysDatabase->myDatabaseID, ref_data->rd_tab_id);
 
1524
 
 
1525
        if (ref_data->rd_temp_log_id) {
 
1526
                if (iRefTempLog) {
 
1527
                        if (iRefTempLog->myTempLogID != ref_data->rd_temp_log_id) {
 
1528
                                iRefTempLog->release();
 
1529
                                iRefTempLog = NULL;
 
1530
                        }
 
1531
                }
 
1532
                if (!iRefTempLog)
 
1533
                        iRefTempLog = myShare->mySysDatabase->openTempLogFile(ref_data->rd_temp_log_id, NULL, NULL);
 
1534
 
 
1535
                if (iRefTempLog) {
 
1536
                        MSTempLogItemRec        log_item;
 
1537
 
 
1538
                        if (iRefTempLog->read(&log_item, ref_data->rd_temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
 
1539
                                have_times = true;
 
1540
                                delete_time = CS_GET_DISK_4(log_item.ti_time_4);
 
1541
                                countdown = (int32_t) (delete_time + PBMSParameters::getTempBlobTimeout()) - (int32_t) time(NULL);
 
1542
                        }
 
1543
                }
 
1544
        }
 
1545
 
 
1546
        if (ref_data->rd_col_index != INVALID_INDEX) {
 
1547
                if (iRefOpenTable) {
 
1548
                        if (iRefOpenTable->getDBTable()->isToDelete()) {
 
1549
                                have_times = true;
 
1550
                                iRefOpenTable->getDBTable()->getDeleteInfo(&ref_data->rd_temp_log_id, &ref_data->rd_temp_log_offset, &delete_time);
 
1551
                                ref_data->rd_col_index = INVALID_INDEX;
 
1552
                                countdown = (int32_t) (delete_time + PBMSParameters::getTempBlobTimeout()) - (int32_t) time(NULL);
 
1553
                        }
 
1554
                }
 
1555
                else
 
1556
                        ref_data->rd_col_index = INVALID_INDEX;
 
1557
        }
 
1558
 
 
1559
        save_write_set = table->write_set;
 
1560
        save_read_set = table->read_set;
 
1561
        table->write_set = NULL;
 
1562
        table->read_set = NULL;
 
1563
 
 
1564
        memset(buf, 0xFF, table->s->null_bytes);
 
1565
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1566
                curr_field = *field;
 
1567
 
 
1568
                save = curr_field->ptr;
 
1569
#if MYSQL_VERSION_ID < 50114
 
1570
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1571
#else
 
1572
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
 
1573
#endif
 
1574
                switch (curr_field->field_name[0]) {
 
1575
                        case 'B':
 
1576
                                switch (curr_field->field_name[5]) {
 
1577
                                        case 'i':
 
1578
                                                // Blob_id           BIGINT,
 
1579
                                                ASSERT(strcmp(curr_field->field_name, "Blob_id") == 0);
 
1580
                                                if (ref_data->rd_tab_id == 0xFFFFFFFF)
 
1581
                                                        curr_field->store(0, true);
 
1582
                                                else {
 
1583
                                                        curr_field->store(ref_data->rd_blob_id, true);
 
1584
                                                        setNotNullInRecord(curr_field, buf);
 
1585
                                                }
 
1586
                                                break;
 
1587
                                        case 'u':
 
1588
                                                // Blob_url          VARCHAR(120),
 
1589
                                                PBMSBlobURLRec blob_url;
 
1590
 
 
1591
                                                ASSERT(strcmp(curr_field->field_name, "Blob_url") == 0);
 
1592
                                                if (ref_data->rd_tab_id != 0xFFFFFFFF) {
 
1593
                                                        iRefOpenTable->formatBlobURL(&blob_url, ref_data->rd_blob_id, iRefAuthCode, iRefBlobSize, ref_data->rd_blob_ref_id);
 
1594
                                                        curr_field->store(blob_url.bu_data, strlen(blob_url.bu_data) +1, &UTF8_CHARSET); // Include the null char in the url. This is the way it is stored in user tables.
 
1595
                                                        setNotNullInRecord(curr_field, buf);
 
1596
                                                }
 
1597
                                                break;
 
1598
                                        case 's':
 
1599
                                                // Blob_size         BIGINT,
 
1600
                                                ASSERT(strcmp(curr_field->field_name, "Blob_size") == 0);
 
1601
                                                curr_field->store(iRefBlobSize, true);
 
1602
                                                setNotNullInRecord(curr_field, buf);
 
1603
                                                break;
 
1604
                                }
 
1605
                                break;
 
1606
                        case 'C':
 
1607
                                // Column_ordinal       INT,
 
1608
                                ASSERT(strcmp(curr_field->field_name, "Column_ordinal") == 0);
 
1609
                                if (ref_data->rd_col_index != INVALID_INDEX) {
 
1610
                                        curr_field->store(ref_data->rd_col_index +1, true);
 
1611
                                        setNotNullInRecord(curr_field, buf);
 
1612
                                }
 
1613
                                break;
 
1614
                        case 'D':
 
1615
                                // Deletion_time     TIMESTAMP,
 
1616
                                ASSERT(strcmp(curr_field->field_name, "Deletion_time") == 0);
 
1617
                                if (have_times) {
 
1618
                                        curr_field->store(ms_my_1970_to_mysql_time(delete_time), true);
 
1619
                                        setNotNullInRecord(curr_field, buf);
 
1620
                                }
 
1621
                                break;
 
1622
                        case 'R':
 
1623
                                switch (curr_field->field_name[5]) {
 
1624
                                        case 'i':
 
1625
                                                // Repository_id     INT,
 
1626
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1627
                                                curr_field->store(iRefBlobRepo, true);
 
1628
                                                setNotNullInRecord(curr_field, buf);
 
1629
                                                break;
 
1630
                                        case 'b':
 
1631
                                                // Repo_blob_offset  BIGINT,
 
1632
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1633
                                                curr_field->store(iRefBlobOffset, true);
 
1634
                                                setNotNullInRecord(curr_field, buf);
 
1635
                                                break;
 
1636
                                        case 'e':
 
1637
                                                // Remove_in INT
 
1638
                                                ASSERT(strcmp(curr_field->field_name, "Remove_in") == 0);
 
1639
                                                if (have_times) {
 
1640
                                                        curr_field->store(countdown, false);
 
1641
                                                        setNotNullInRecord(curr_field, buf);
 
1642
                                                }
 
1643
                                                break;
 
1644
                                }
 
1645
                                break;
 
1646
                        case 'T':
 
1647
                                switch (curr_field->field_name[9]) {
 
1648
                                        case 'e':
 
1649
                                                // Table_name        CHAR(64),
 
1650
                                                ASSERT(strcmp(curr_field->field_name, "Table_name") == 0);
 
1651
                                                if (ref_data->rd_tab_id != 0xFFFFFFFF) {
 
1652
                                                        if (iRefOpenTable) {
 
1653
                                                                CSString *table_name = iRefOpenTable->getDBTable()->getTableName();
 
1654
                                                                curr_field->store(table_name->getCString(), table_name->length(), &UTF8_CHARSET);
 
1655
                                                        }
 
1656
                                                        else {
 
1657
                                                                char buffer[MS_TABLE_NAME_SIZE];
 
1658
                                                                
 
1659
                                                                snprintf(buffer, MS_TABLE_NAME_SIZE, "Table #%"PRIu32"", ref_data->rd_tab_id);
 
1660
                                                                curr_field->store(buffer, strlen(buffer), &UTF8_CHARSET);
 
1661
                                                        }
 
1662
                                                        setNotNullInRecord(curr_field, buf);
 
1663
                                                }
 
1664
                                                break;
 
1665
                                        case 'i':
 
1666
                                                // Temp_log_id       INT,
 
1667
                                                ASSERT(strcmp(curr_field->field_name, "Temp_log_id") == 0);
 
1668
                                                if (ref_data->rd_temp_log_id) {
 
1669
                                                        curr_field->store(ref_data->rd_temp_log_id, true);
 
1670
                                                        setNotNullInRecord(curr_field, buf);
 
1671
                                                }
 
1672
                                                break;
 
1673
                                        case 'o':
 
1674
                                                // Temp_log_offset   BIGINT
 
1675
                                                ASSERT(strcmp(curr_field->field_name, "Temp_log_offset") == 0);
 
1676
                                                if (ref_data->rd_temp_log_id) {
 
1677
                                                        curr_field->store(ref_data->rd_temp_log_offset, true);
 
1678
                                                        setNotNullInRecord(curr_field, buf);
 
1679
                                                }
 
1680
                                                break;
 
1681
                                }
 
1682
                                break;
 
1683
                }
 
1684
                curr_field->ptr = save;
 
1685
        }
 
1686
 
 
1687
        table->write_set = save_write_set;
 
1688
        table->read_set = save_read_set;
 
1689
}
 
1690
 
 
1691
/*
 
1692
 * -------------------------------------------------------------------------
 
1693
 * META DATA TABLE
 
1694
 */
 
1695
MSMetaDataTable::MSMetaDataTable(MSSystemTableShare *share, TABLE *table):
 
1696
MSRepositoryTable(share, table),
 
1697
iMetData(NULL), 
 
1698
iMetCurrentBlobRepo(0),
 
1699
iMetCurrentBlobOffset(0),
 
1700
iMetCurrentDataPos(0),
 
1701
iMetCurrentDataSize(0),
 
1702
iMetDataPos(0),
 
1703
iMetDataSize(0), 
 
1704
iMetBlobRepo(0),
 
1705
iMetBlobOffset(0),
 
1706
iMetStateSaved(false)
 
1707
{
 
1708
}
 
1709
 
 
1710
MSMetaDataTable::~MSMetaDataTable()
 
1711
{
 
1712
        if (iMetData) {
 
1713
                iMetData->release();
 
1714
                iMetData = NULL;
 
1715
        }
 
1716
}
 
1717
 
 
1718
MSMetaDataTable *MSMetaDataTable::newMSMetaDataTable(MSDatabase *db)
 
1719
{
 
1720
        char path[PATH_MAX];
 
1721
        
 
1722
        cs_strcpy(PATH_MAX, path, db->myDatabasePath->getCString());
 
1723
        db->release();
 
1724
        cs_add_dir_char(PATH_MAX, path);
 
1725
        cs_strcat(PATH_MAX, path, sysTableNames[SYS_META]);
 
1726
        
 
1727
        return  (MSMetaDataTable*) MSSystemTableShare::openSystemTable(path, NULL);
 
1728
}
 
1729
 
 
1730
void MSMetaDataTable::unuse()
 
1731
{
 
1732
        MSRepositoryTable::unuse();
 
1733
        if (iMetData) {
 
1734
                iMetData->release();
 
1735
                iMetData = NULL;
 
1736
        }
 
1737
        iMetDataSize = 0;
 
1738
}
 
1739
 
 
1740
 
 
1741
void MSMetaDataTable::seqScanInit()
 
1742
{
 
1743
        MSRepositoryTable::seqScanInit();
 
1744
        iMetDataSize = 0;
 
1745
        iMetDataPos = 0;
 
1746
        iMetBlobRepo = 0;
 
1747
        iMetBlobOffset = 0;
 
1748
        new_(iMetData, CSStringBuffer(80));
 
1749
        iMetStateSaved = false;
 
1750
}
 
1751
 
 
1752
void MSMetaDataTable::seqScanReset()
 
1753
{
 
1754
        seqScanPos(iMetState);
 
1755
        seqScanInit();
 
1756
        iMetStateSaved = true;
 
1757
}
 
1758
 
 
1759
int     MSMetaDataTable::getRefLen()
 
1760
{
 
1761
        return sizeof(iMetCurrentDataPos) + sizeof(iMetCurrentDataSize) + sizeof(iMetCurrentBlobRepo) + sizeof(iMetCurrentBlobOffset);
 
1762
}
 
1763
 
 
1764
void MSMetaDataTable::seqScanPos(uint8_t *pos)
 
1765
{
 
1766
        mi_int4store(pos, iMetCurrentDataPos); pos +=4;
 
1767
        mi_int4store(pos, iMetCurrentDataSize);pos +=4; 
 
1768
        mi_int4store(pos, iMetCurrentBlobRepo); pos +=4;
 
1769
        mi_int8store(pos, iMetCurrentBlobOffset);
 
1770
}
 
1771
 
 
1772
void MSMetaDataTable::seqScanRead(uint8_t *pos, char *buf)
 
1773
{
 
1774
        iMetStateSaved = false;
 
1775
        iMetDataPos = mi_uint4korr( pos); pos +=4;
 
1776
        iMetDataSize = mi_uint4korr(pos); pos +=4;
 
1777
        iMetBlobRepo = mi_uint4korr(pos); pos +=4;
 
1778
        iMetBlobOffset = mi_uint8korr(pos);
 
1779
        MSRepositoryTable::seqScanRead(iMetBlobRepo, iMetBlobOffset, buf);
 
1780
}
 
1781
 
 
1782
bool MSMetaDataTable::seqScanNext(char *buf)
 
1783
{
 
1784
        if (iMetStateSaved) {
 
1785
                bool have_data;
 
1786
                uint8_t *pos = iMetState;
 
1787
                iMetDataPos = mi_uint4korr( pos); pos +=4;
 
1788
                // Do not reset the meta data size.
 
1789
                /*iMetDataSize = mi_uint4korr(pos); */pos +=4;
 
1790
                iMetBlobRepo = mi_uint4korr(pos); pos +=4;
 
1791
                iMetBlobOffset = mi_uint8korr(pos);
 
1792
                iMetStateSaved = false;
 
1793
                resetScan(true, &have_data, iMetBlobRepo);
 
1794
        }
 
1795
        
 
1796
        iMetCurrentDataPos = iMetDataPos;
 
1797
        iMetCurrentDataSize = iMetDataSize;
 
1798
        
 
1799
        return MSRepositoryTable::seqScanNext(buf);
 
1800
}
 
1801
 
 
1802
bool MSMetaDataTable::resetScan(bool positioned, bool *have_data, uint32_t repo_index)
 
1803
{
 
1804
        CSMutex                         *lock;
 
1805
        MSBlobHeadRec           blob;
 
1806
        uint16_t                                head_size;
 
1807
        uint64_t                                blob_size;
 
1808
        size_t                          mdata_size, mdata_offset;
 
1809
        uint8_t                         status;
 
1810
 
 
1811
        enter_();
 
1812
        
 
1813
        *have_data = false;
 
1814
        if (!MSRepositoryTable::resetScan(positioned, repo_index))
 
1815
                return_(false);
 
1816
        
 
1817
        retry_read:
 
1818
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1819
        lock_(lock);
 
1820
        if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
 
1821
                unlock_(lock);
 
1822
                iRepoOffset = iRepoFileSize;
 
1823
                return_(false);
 
1824
        }
 
1825
        
 
1826
        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
1827
        blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
 
1828
        mdata_size = CS_GET_DISK_2(blob.rb_mdata_size_2);
 
1829
        mdata_offset = CS_GET_DISK_2(blob.rb_mdata_offset_2);
 
1830
        
 
1831
        status = CS_GET_DISK_1(blob.rb_status_1);
 
1832
        if ((head_size < (mdata_offset + mdata_size)) || !VALID_BLOB_STATUS(status)) {
 
1833
                /* Can't be true. Assume this is garbage! */
 
1834
                unlock_(lock);
 
1835
                iRepoOffset++;
 
1836
                goto retry_read;
 
1837
        }
 
1838
 
 
1839
        if (mdata_size && IN_USE_BLOB_STATUS(status)) {
 
1840
                iMetData->setLength((uint32_t) mdata_size);
 
1841
                if (iRepoFile->read(iMetData->getBuffer(0), iRepoOffset + mdata_offset, mdata_size, 0) < mdata_size) {
 
1842
                        unlock_(lock);
 
1843
                        iRepoOffset = iRepoFileSize;
 
1844
                        return_(false);
 
1845
                }
 
1846
 
 
1847
                iMetBlobRepo = iRepoFile->myRepo->getRepoID();
 
1848
                iMetBlobOffset = iRepoOffset;
 
1849
 
 
1850
                if (!positioned) 
 
1851
                        iMetDataPos = 0;
 
1852
 
 
1853
                iMetDataSize = mdata_size;
 
1854
                
 
1855
                // When ever the data position is reset the current location information
 
1856
                // must also be reset to that it is consisent with the data position.
 
1857
                iMetCurrentBlobOffset = iRepoOffset;
 
1858
                iMetCurrentBlobRepo = iRepoIndex;               
 
1859
                iMetCurrentDataPos = iMetDataPos;
 
1860
                iMetCurrentDataSize = iMetDataSize;
 
1861
                
 
1862
                *have_data = true;
 
1863
        }
 
1864
        unlock_(lock);
 
1865
        iRepoOffset += head_size + blob_size;
 
1866
        return_(true);
 
1867
}
 
1868
 
 
1869
bool MSMetaDataTable::returnRecord(char *buf)
 
1870
{
 
1871
        bool have_data = false;
 
1872
 
 
1873
        if (resetScan(false, &have_data) && have_data)
 
1874
                return(returnSubRecord(buf));
 
1875
                
 
1876
        return false;
 
1877
}
 
1878
 
 
1879
bool MSMetaDataTable::nextRecord(char **name, char **value)
 
1880
{
 
1881
        if (iMetDataPos < iMetDataSize) {
 
1882
                char *data = iMetData->getBuffer(iMetDataPos);
 
1883
                
 
1884
                *name = data;
 
1885
                data += strlen(*name) +1;
 
1886
                *value = data;
 
1887
                data += strlen(*value) +1;
 
1888
                
 
1889
                iMetDataPos += data - *name;
 
1890
                ASSERT(iMetDataPos <= iMetDataSize);
 
1891
                
 
1892
                return true;            
 
1893
        }
 
1894
 
 
1895
        return false;
 
1896
        
 
1897
}
 
1898
 
 
1899
bool MSMetaDataTable::returnSubRecord(char *buf)
 
1900
{
 
1901
        char *name, *value;
 
1902
        
 
1903
        if (nextRecord(&name, &value)) {
 
1904
                returnRow(name, value, buf);            
 
1905
                return true;            
 
1906
        }
 
1907
 
 
1908
        return false;
 
1909
}
 
1910
 
 
1911
void MSMetaDataTable::returnRow(char *name, char *value, char *buf)
 
1912
{
 
1913
        TABLE           *table = mySQLTable;
 
1914
        Field           *curr_field;
 
1915
        byte            *save;
 
1916
        MY_BITMAP       *save_write_set;
 
1917
 
 
1918
        save_write_set = table->write_set;
 
1919
        table->write_set = NULL;
 
1920
 
 
1921
        memset(buf, 0xFF, table->s->null_bytes);
 
1922
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1923
                curr_field = *field;
 
1924
 
 
1925
                save = curr_field->ptr;
 
1926
#if MYSQL_VERSION_ID < 50114
 
1927
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1928
#else
 
1929
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
 
1930
#endif
 
1931
                switch (curr_field->field_name[0]) {
 
1932
                        case 'R':
 
1933
                                switch (curr_field->field_name[6]) {
 
1934
                                        case 't':
 
1935
                                                // Repository_id     INT
 
1936
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1937
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
1938
                                                setNotNullInRecord(curr_field, buf);
 
1939
                                                break;
 
1940
                                        case 'l':
 
1941
                                                // Repo_blob_offset  BIGINT
 
1942
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1943
                                                curr_field->store(iMetCurrentBlobOffset, true);
 
1944
                                                setNotNullInRecord(curr_field, buf);
 
1945
                                                break;
 
1946
                                }
 
1947
                                break;
 
1948
                        case 'N':
 
1949
                                // Name        
 
1950
                                ASSERT(strcmp(curr_field->field_name, "Name") == 0);
 
1951
                                curr_field->store(name, strlen(name), &UTF8_CHARSET);
 
1952
                                setNotNullInRecord(curr_field, buf);
 
1953
                                break;
 
1954
                        case 'V':
 
1955
                                // Value        
 
1956
                                ASSERT(strcmp(curr_field->field_name, "Value") == 0);
 
1957
                                curr_field->store(value, strlen(value), &my_charset_utf8_bin);
 
1958
                                setNotNullInRecord(curr_field, buf);
 
1959
                                break;
 
1960
                }
 
1961
                curr_field->ptr = save;
 
1962
        }
 
1963
 
 
1964
        table->write_set = save_write_set;
 
1965
}
 
1966
 
 
1967
 
 
1968
#ifdef HAVE_ALIAS_SUPPORT
 
1969
bool MSMetaDataTable::matchAlias(uint32_t repo_id, uint64_t offset, const char *alias)
 
1970
{
 
1971
        bool matched = false, have_data = false;
 
1972
        enter_();
 
1973
        
 
1974
        if (resetScan(true, &have_data, repo_id) && have_data) {
 
1975
                const char *blob_alias;
 
1976
                MetaData md(iMetData->getBuffer(0), iMetDataSize);
 
1977
                
 
1978
                blob_alias = md.findAlias();
 
1979
                matched = (blob_alias && !my_strcasecmp(&UTF8_CHARSET, blob_alias, alias));
 
1980
        }
 
1981
        
 
1982
        return_(matched);
 
1983
}
 
1984
#endif
 
1985
 
 
1986
void MSMetaDataTable::insertRow(char *buf)
 
1987
{
 
1988
        uint32_t repo_index;
 
1989
        String meta_name, meta_value;   
 
1990
        uint16_t data_len;
 
1991
        uint64_t repo_offset;
 
1992
        char *data;             
 
1993
        bool have_data, reset_alias = false;
 
1994
        
 
1995
        enter_();
 
1996
        
 
1997
        // Metadata inserts are ignored during reovery.
 
1998
        // They will be restored from the dump table.
 
1999
        if (myShare->mySysDatabase->isRecovering())
 
2000
                exit_();
 
2001
                
 
2002
        seqScanReset();
 
2003
 
 
2004
        getFieldValue(buf, 0, &repo_index);
 
2005
        getFieldValue(buf, 1, &repo_offset);
 
2006
        getFieldValue(buf, 2, &meta_name);
 
2007
        getFieldValue(buf, 3, &meta_value);
 
2008
        
 
2009
        if (!repo_index)
 
2010
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
 
2011
                
 
2012
        iRepoOffset = repo_offset;
 
2013
        if (!resetScan(true, &have_data, repo_index -1))
 
2014
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
 
2015
        
 
2016
        const char *alias = NULL, *tag = meta_name.c_ptr_safe();
 
2017
        
 
2018
        if (iMetDataSize) {
 
2019
                MetaData md(iMetData->getBuffer(0), iMetDataSize);
 
2020
                // Check to see it this is a duplicate name.
 
2021
                if (md.findName(tag))
 
2022
                        CSException::throwException(CS_CONTEXT, HA_ERR_FOUND_DUPP_KEY, "Meta data tag already exists.");
 
2023
                        
 
2024
#ifdef HAVE_ALIAS_SUPPORT
 
2025
                alias = md.findAlias();
 
2026
#endif
 
2027
        }
 
2028
        
 
2029
        // Create the meta data record.
 
2030
#ifdef HAVE_ALIAS_SUPPORT
 
2031
        if (alias)
 
2032
                data = iMetData->getBuffer(0); // This is to be able to test if the alias pointer needs to be reset.
 
2033
        else
 
2034
#endif
 
2035
                data = NULL;
 
2036
                
 
2037
        iMetData->setLength(iMetDataSize + meta_name.length() + meta_value.length()  + 2);
 
2038
        
 
2039
#ifdef HAVE_ALIAS_SUPPORT
 
2040
        if (alias && (data != iMetData->getBuffer(0))) // The buffer moved, adjust the alias.
 
2041
                alias += (iMetData->getBuffer(0) - data);
 
2042
#endif
 
2043
        
 
2044
        data = iMetData->getBuffer(0);
 
2045
        data_len = iMetDataSize;
 
2046
        
 
2047
#ifdef HAVE_ALIAS_SUPPORT
 
2048
        if ((!alias) && !my_strcasecmp(&UTF8_CHARSET, MS_ALIAS_TAG, tag)) {
 
2049
                reset_alias = true;
 
2050
                memcpy(data + data_len, MS_ALIAS_TAG, meta_name->length()); // Use my alias tag so we do not need to wory about case.
 
2051
                alias = data + data_len + meta_name->length() + 1; // Set the alias to the value location.
 
2052
        } else 
 
2053
#endif
 
2054
                memcpy(data + data_len, meta_name.c_ptr_quick(), meta_name.length());
 
2055
                
 
2056
        data_len += meta_name.length();
 
2057
        data[data_len] = 0;
 
2058
        data_len++;
 
2059
 
 
2060
        memcpy(data + data_len, meta_value.c_ptr_quick(), meta_value.length());
 
2061
        data_len += meta_value.length();
 
2062
        data[data_len] = 0;
 
2063
        data_len++;
 
2064
        
 
2065
        // Update the blob header with the new meta data.
 
2066
        MSOpenTable     *otab = MSOpenTable::newOpenTable(NULL);
 
2067
        push_(otab);
 
2068
        iRepoFile->setBlobMetaData(otab, repo_offset, data, data_len, reset_alias, alias);
 
2069
        release_(otab);
 
2070
                
 
2071
        exit_();
 
2072
}
 
2073
/*
 
2074
insert into pbms_mata_data values(1, 921, "ATAG 3", "xx");
 
2075
insert into pbms_mata_data values(1, 921, "ATAG 1", "A VALUE 1");
 
2076
insert into pbms_mata_data values(1, 921, "ATAG 2", "xx");
 
2077
insert into pbms_mata_data values(1, 383, "ATAG 2", "xx");
 
2078
select * from pbms_mata_data;
 
2079
 
 
2080
delete from pbms_mata_data where value = "xx";
 
2081
select * from pbms_mata_data;
 
2082
 
 
2083
update pbms_mata_data set value = "!!" where name = "ATAG 3";
 
2084
update pbms_mata_data set Repo_blob_offset = 383 where value = "!!";
 
2085
 
 
2086
delete from pbms_mata_data where Repo_blob_offset = 921;
 
2087
*/
 
2088
//insert into pbms_mata_data values(1, 921, "blob_ALIAs", "My_alias");
 
2089
//select * from pbms_mata_data;
 
2090
//select * from pbms_repository;
 
2091
 
 
2092
 
 
2093
void MSMetaDataTable::deleteRow(char *buf)
 
2094
{
 
2095
        uint32_t repo_index;
 
2096
        String meta_name, meta_value;   
 
2097
        uint16_t record_size;
 
2098
        uint64_t repo_offset;
 
2099
        char *data;
 
2100
        bool have_data, reset_alias = false;
 
2101
        
 
2102
        enter_();
 
2103
        
 
2104
        seqScanReset();
 
2105
 
 
2106
        getFieldValue(buf, 0, &repo_index);
 
2107
        getFieldValue(buf, 1, &repo_offset);
 
2108
        getFieldValue(buf, 2, &meta_name);
 
2109
        getFieldValue(buf, 3, &meta_value);
 
2110
 
 
2111
        if (!repo_index)
 
2112
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
 
2113
                
 
2114
        iRepoOffset = repo_offset;
 
2115
        if (!resetScan(true, &have_data, repo_index -1))
 
2116
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
 
2117
        
 
2118
        const char *alias = NULL, *value = NULL, *tag = meta_name.c_ptr_safe();
 
2119
        char *location;
 
2120
        
 
2121
        // Check to see name exists.
 
2122
        MetaData md;
 
2123
 
 
2124
        md.use_data(iMetData->getBuffer(0), iMetDataSize);
 
2125
        if (iMetDataSize) 
 
2126
                value = md.findName(tag);
 
2127
        
 
2128
        if (value == NULL)
 
2129
                CSException::throwException(CS_CONTEXT, HA_ERR_KEY_NOT_FOUND, "Meta data tag dosn't exists.");
 
2130
                        
 
2131
#ifdef HAVE_ALIAS_SUPPORT
 
2132
        alias = md.findAlias();
 
2133
        
 
2134
        if (alias == value) {
 
2135
                reset_alias = true;
 
2136
                alias = NULL;
 
2137
        }
 
2138
#endif
 
2139
        
 
2140
        // Create the meta data record.
 
2141
        data = md.getBuffer();
 
2142
        location = md.findNamePosition(tag);
 
2143
        record_size = MetaData::recSize(location);
 
2144
        iMetDataSize -= record_size;
 
2145
        memmove(location, location + record_size, iMetDataSize - (location - data)); // Shift the meta data down
 
2146
 
 
2147
#ifdef HAVE_ALIAS_SUPPORT
 
2148
        // Get the alias again incase it moved.
 
2149
        if (alias)
 
2150
                alias = md.findAlias();
 
2151
#endif
 
2152
        
 
2153
        // Update the blob header with the new meta data.
 
2154
        MSOpenTable     *otab = MSOpenTable::newOpenTable(NULL);
 
2155
        push_(otab);
 
2156
        iRepoFile->setBlobMetaData(otab, repo_offset, data, iMetDataSize, reset_alias, alias);
 
2157
        release_(otab);
 
2158
 
 
2159
        exit_();
 
2160
}
 
2161
 
 
2162
void MSMetaDataTable::updateRow(char *old_data, char *new_data)
 
2163
{
 
2164
        uint32_t o_repo_index, n_repo_index;
 
2165
        String n_meta_name, n_meta_value;       
 
2166
        String o_meta_name, o_meta_value;       
 
2167
        uint16_t record_size;
 
2168
        uint64_t o_repo_offset, n_repo_offset;
 
2169
        char *data;     
 
2170
        bool have_data, reset_alias = false;
 
2171
        
 
2172
        enter_();
 
2173
        
 
2174
        seqScanReset();
 
2175
 
 
2176
        getFieldValue(new_data, 0, &n_repo_index);
 
2177
        getFieldValue(new_data, 1, &n_repo_offset);
 
2178
        getFieldValue(new_data, 2, &n_meta_name);
 
2179
        getFieldValue(new_data, 3, &n_meta_value);
 
2180
 
 
2181
        getFieldValue(old_data, 0, &o_repo_index);
 
2182
        getFieldValue(old_data, 1, &o_repo_offset);
 
2183
        getFieldValue(old_data, 2, &o_meta_name);
 
2184
        getFieldValue(old_data, 3, &o_meta_value);
 
2185
 
 
2186
        if ((!o_repo_index) || (!n_repo_index))
 
2187
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
 
2188
        
 
2189
        // If the meta data is not for the same BLOB then we do an insert and delete.
 
2190
        if ((n_repo_index != o_repo_index) || (n_repo_offset != o_repo_offset)) {
 
2191
                insertRow(new_data);
 
2192
                try_(a) {
 
2193
                        deleteRow(old_data);
 
2194
                }
 
2195
                catch_(a) {
 
2196
                        deleteRow(new_data);
 
2197
                        throw_();
 
2198
                }
 
2199
                cont_(a);
 
2200
                exit_();
 
2201
        }
 
2202
        
 
2203
        iRepoOffset = n_repo_offset;
 
2204
        if (!resetScan(true, &have_data, n_repo_index -1))
 
2205
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
 
2206
        
 
2207
        char *location;
 
2208
        const char *value, *alias = NULL, *n_tag = n_meta_name.c_ptr_safe(), *o_tag = o_meta_name.c_ptr_safe();
 
2209
        
 
2210
        if (!my_strcasecmp(&UTF8_CHARSET, o_tag, n_tag))
 
2211
                n_tag = NULL;
 
2212
                
 
2213
        MetaData md;
 
2214
 
 
2215
        md.use_data(iMetData->getBuffer(0), iMetDataSize);
 
2216
        
 
2217
        if ((!iMetDataSize) || ((value = md.findName(o_tag)) == NULL))
 
2218
                CSException::throwException(CS_CONTEXT, HA_ERR_KEY_NOT_FOUND, "Meta data tag dosn't exists.");
 
2219
                        
 
2220
        if (n_tag && (md.findName(n_tag) != NULL))
 
2221
                CSException::throwException(CS_CONTEXT, HA_ERR_FOUND_DUPP_KEY, "Meta data tag already exists.");
 
2222
                
 
2223
#ifdef HAVE_ALIAS_SUPPORT
 
2224
        alias = md.findAlias();
 
2225
 
 
2226
        if (alias == value) {
 
2227
                reset_alias = true;
 
2228
                alias = NULL; // The alias is being deleted.
 
2229
        }
 
2230
#endif
 
2231
        
 
2232
        if (!n_tag)
 
2233
                n_tag = o_tag;
 
2234
                
 
2235
        // Create the meta data record.
 
2236
        data = md.getBuffer();
 
2237
        location = md.findNamePosition(o_tag);
 
2238
        record_size = MetaData::recSize(location);
 
2239
        iMetDataSize -= record_size;
 
2240
        memmove(location, location + record_size, iMetDataSize - (location - data)); // Shift the meta data down
 
2241
        
 
2242
        // Add the updated meta data to the end of the block.
 
2243
        iMetData->setLength(iMetDataSize + n_meta_name.length() + n_meta_value.length()  + 2);
 
2244
        
 
2245
        md.use_data(iMetData->getBuffer(0), iMetDataSize); // Reset this incase the buffer moved.
 
2246
#ifdef HAVE_ALIAS_SUPPORT
 
2247
        // Get the alias again incase it moved.
 
2248
        if (alias)
 
2249
                alias = md.findAlias();
 
2250
#endif
 
2251
        
 
2252
        data = iMetData->getBuffer(0);
 
2253
                
 
2254
#ifdef HAVE_ALIAS_SUPPORT
 
2255
        if ((!alias) && !my_strcasecmp(&UTF8_CHARSET, MS_ALIAS_TAG, n_tag)) {
 
2256
                reset_alias = true;
 
2257
                memcpy(data + iMetDataSize, MS_ALIAS_TAG, n_meta_name.length()); // Use my alias tag so we do not need to wory about case.
 
2258
                alias = data + iMetDataSize + n_meta_name.length() + 1; // Set the alias to the value location.
 
2259
        } else 
 
2260
#endif
 
2261
                memcpy(data + iMetDataSize, n_meta_name.c_ptr_quick(), n_meta_name.length());
 
2262
                
 
2263
        iMetDataSize += n_meta_name.length();
 
2264
        data[iMetDataSize] = 0;
 
2265
        iMetDataSize++;
 
2266
 
 
2267
        memcpy(data + iMetDataSize, n_meta_value.c_ptr_quick(), n_meta_value.length());
 
2268
        iMetDataSize += n_meta_value.length();
 
2269
        data[iMetDataSize] = 0;
 
2270
        iMetDataSize++;
 
2271
        
 
2272
        
 
2273
        // Update the blob header with the new meta data.
 
2274
        MSOpenTable     *otab = MSOpenTable::newOpenTable(NULL);
 
2275
        push_(otab);
 
2276
        iRepoFile->setBlobMetaData(otab, n_repo_offset, data, iMetDataSize, reset_alias, alias);
 
2277
        release_(otab);
 
2278
                
 
2279
        exit_();
 
2280
}
 
2281
 
 
2282
/*
 
2283
 * -------------------------------------------------------------------------
 
2284
 * SYSTEM TABLE SHARES
 
2285
 */
 
2286
 
 
2287
CSSyncSortedList *MSSystemTableShare::gSystemTableList;
 
2288
 
 
2289
MSSystemTableShare::MSSystemTableShare():
 
2290
CSRefObject(),
 
2291
myTablePath(NULL),
 
2292
mySysDatabase(NULL),
 
2293
iOpenCount(0)
 
2294
{
 
2295
        thr_lock_init(&myThrLock);
 
2296
}
 
2297
 
 
2298
MSSystemTableShare::~MSSystemTableShare()
 
2299
{
 
2300
        thr_lock_delete(&myThrLock);
 
2301
        if (myTablePath) {
 
2302
                myTablePath->release();
 
2303
                myTablePath = NULL;
 
2304
        }
 
2305
        if (mySysDatabase) {
 
2306
                mySysDatabase->release();
 
2307
                mySysDatabase = NULL;
 
2308
        }
 
2309
}
 
2310
 
 
2311
CSObject *MSSystemTableShare::getKey()
 
2312
{
 
2313
        return (CSObject *) myTablePath;
 
2314
}
 
2315
 
 
2316
int MSSystemTableShare::compareKey(CSObject *key)
 
2317
{
 
2318
        return myTablePath->compare((CSString *) key);
 
2319
}
 
2320
 
 
2321
void MSSystemTableShare::startUp()
 
2322
{
 
2323
        new_(gSystemTableList, CSSyncSortedList);
 
2324
}
 
2325
 
 
2326
void MSSystemTableShare::shutDown()
 
2327
{
 
2328
        if (gSystemTableList) {
 
2329
                gSystemTableList->release();
 
2330
                gSystemTableList = NULL;
 
2331
        }
 
2332
}
 
2333
 
 
2334
MSOpenSystemTable *MSSystemTableShare::openSystemTable(const char *table_path, TABLE *table)
 
2335
{
 
2336
        CSString                        *table_url;
 
2337
        MSSystemTableShare      *share;
 
2338
        MSOpenSystemTable       *otab = NULL;
 
2339
        SysTableType            table_type;
 
2340
 
 
2341
        enter_();
 
2342
        
 
2343
        table_type =  pbms_systable_type(cs_last_name_of_path(table_path));
 
2344
        if (table_type == SYS_UNKNOWN)
 
2345
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, "Table not found");
 
2346
        
 
2347
        table_path = cs_last_name_of_path(table_path, 2);
 
2348
        table_url = CSString::newString(table_path);
 
2349
        push_(table_url);
 
2350
 
 
2351
        lock_(gSystemTableList);
 
2352
        if (!(share = (MSSystemTableShare *) gSystemTableList->find(table_url))) {
 
2353
                share = MSSystemTableShare::newTableShare(RETAIN(table_url));
 
2354
                gSystemTableList->add(share);
 
2355
        }
 
2356
        
 
2357
        switch (table_type) {
 
2358
                case SYS_REP:
 
2359
                        new_(otab, MSRepositoryTable(share, table));
 
2360
                        break;
 
2361
                case SYS_REF:
 
2362
                        new_(otab, MSReferenceTable(share, table));
 
2363
                        break;
 
2364
                case SYS_BLOB:
 
2365
                        new_(otab, MSBlobDataTable(share, table));
 
2366
                        break;
 
2367
                case SYS_DUMP:
 
2368
                        new_(otab, MSDumpTable(share, table));
 
2369
                        break;
 
2370
                case SYS_META:
 
2371
                        new_(otab, MSMetaDataTable(share, table));
 
2372
                        break;
 
2373
                case SYS_HTTP:
 
2374
                        new_(otab, MSHTTPHeaderTable(share, table));
 
2375
                        break;
 
2376
#ifdef HAVE_ALIAS_SUPPORT
 
2377
                case SYS_ALIAS:
 
2378
                        new_(otab, MSBlobAliasTable(share, table));
 
2379
                        break;
 
2380
#endif
 
2381
                case SYS_VARIABLE:
 
2382
                        new_(otab, MSVariableTable(share, table));
 
2383
                        break;
 
2384
                case SYS_CLOUD:
 
2385
                        new_(otab, MSCloudTable(share, table));
 
2386
                        break;
 
2387
                case SYS_BACKUP:
 
2388
                        new_(otab, MSBackupTable(share, table));
 
2389
                        break;
 
2390
#ifndef DRIZZLED
 
2391
                case SYS_ENABLED:
 
2392
                        new_(otab, MSEnabledTable(share, table));
 
2393
                        break;
 
2394
#endif
 
2395
                case SYS_UNKNOWN:
 
2396
                        break;
 
2397
        }
 
2398
        
 
2399
        share->iOpenCount++;
 
2400
        unlock_(gSystemTableList);
 
2401
 
 
2402
        release_(table_url);
 
2403
        return_(otab);
 
2404
}
 
2405
 
 
2406
void MSSystemTableShare::removeDatabaseSystemTables(MSDatabase *doomed_db)
 
2407
{
 
2408
        MSSystemTableShare      *share;
 
2409
        uint32_t i= 0;
 
2410
        enter_();
 
2411
        
 
2412
        push_(doomed_db);
 
2413
        lock_(gSystemTableList);
 
2414
        while ((share = (MSSystemTableShare *) gSystemTableList->itemAt(i))) {
 
2415
                if (share->mySysDatabase == doomed_db) {
 
2416
                        gSystemTableList->remove(share->myTablePath);
 
2417
                } else
 
2418
                        i++;
 
2419
        }
 
2420
        
 
2421
        unlock_(gSystemTableList);
 
2422
        release_(doomed_db);
 
2423
        exit_();
 
2424
}
 
2425
 
 
2426
void MSSystemTableShare::releaseSystemTable(MSOpenSystemTable *tab)
 
2427
{
 
2428
        enter_();
 
2429
        lock_(gSystemTableList);
 
2430
        tab->myShare->iOpenCount--;
 
2431
        if (!tab->myShare->iOpenCount) {
 
2432
                gSystemTableList->remove(tab->myShare->myTablePath);
 
2433
        }
 
2434
        unlock_(gSystemTableList);
 
2435
        exit_();
 
2436
}
 
2437
 
 
2438
MSSystemTableShare *MSSystemTableShare::newTableShare(CSString *table_path)
 
2439
{
 
2440
        MSSystemTableShare *tab;
 
2441
 
 
2442
        enter_();
 
2443
        if (!(tab = new MSSystemTableShare())) {
 
2444
                table_path->release();
 
2445
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
2446
        }
 
2447
        push_(tab);
 
2448
        tab->myTablePath = table_path;
 
2449
        tab->mySysDatabase = MSDatabase::getDatabase(table_path->left("/", -1), true);
 
2450
        pop_(tab);
 
2451
        return_(tab);
 
2452
}
 
2453
 
 
2454
void PBMSSystemTables::systemTablesStartUp()
 
2455
{
 
2456
        MSCloudTable::startUp();
 
2457
        MSBackupTable::startUp();
 
2458
}
 
2459
 
 
2460
void PBMSSystemTables::systemTableShutDown()
 
2461
{
 
2462
        MSBackupTable::shutDown();
 
2463
        MSCloudTable::shutDown();
 
2464
}
 
2465