1
/* Copyright (c) 2009 PrimeBase Technologies GmbH
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* 2009-09-07 Paul McCullagh
24
#include "xt_config.h"
26
#ifdef MYSQL_SUPPORTS_BACKUP
34
#include "mysql_priv.h"
35
#include <backup/api_types.h>
36
#include <backup/backup_engine.h>
37
#include <backup/backup_aux.h> // for build_table_list()
42
#include "backup_xt.h"
43
#include "pthread_xt.h"
44
#include "filesys_xt.h"
45
#include "database_xt.h"
46
#include "strutil_xt.h"
47
#include "memory_xt.h"
60
//#define TRACE_BACKUP_CALLS
61
//#define TEST_SMALL_BLOCK 100000
65
using backup::result_t;
66
using backup::version_t;
67
using backup::Table_list;
68
using backup::Table_ref;
71
#ifdef TRACE_BACKUP_CALLS
72
#define XT_TRACE_CALL() ha_trace_function(__FUNC__, NULL)
74
#define XT_TRACE_CALL()
77
// Restore batch size: PBXTRestoreDriver::send_data() compares rb_insert_count
// against this value and, when equal, commits and begins a new transaction.
#define XT_RESTORE_BATCH_SIZE 10000
79
// Values of PBXTBackupDriver::bd_state. The driver is constructed in the
// BEFORE_LOCK state; lock() switches it to AFTER_LOCK, and get_data() only
// returns backup::READY while the state is still BEFORE_LOCK.
#define BUP_STATE_BEFORE_LOCK 0
80
#define BUP_STATE_AFTER_LOCK 1
82
// Record type identifiers: db_write_block() stores one of these in the first
// byte of each block, and send_data() has a case for each of them.
#define BUP_STANDARD_VAR_RECORD 1
83
#define BUP_RECORD_BLOCK_4_START 2 // Part of a record, with a 4 byte total length, and 4 byte data length
84
#define BUP_RECORD_BLOCK_4 3 // Part of a record, with a 4 byte length
85
#define BUP_RECORD_BLOCK_4_END 4 // Last part of a record with a 4 byte length
88
* -----------------------------------------------------------------------
92
#ifdef TRACE_BACKUP_CALLS
93
// Debug helper, compiled only when TRACE_BACKUP_CALLS is defined: prints the
// name of the current XT thread (or "-unknown-") followed by a function name,
// and the table name in parentheses in the variant that receives one.
//   function - name text of the caller; XT_TRACE_CALL() passes __FUNC__.
//   table    - table name to print; XT_TRACE_CALL() passes NULL.
static void ha_trace_function(const char *function, char *table)
95
// Working copy of the name; xt_strcpy() below is given 50 as the size.
char func_buf[50], *ptr;
96
XTThreadPtr thread = xt_get_self();
98
// When the text contains a '(', the loop below runs while ptr is still past
// the start of the string and tests for a character that is neither
// alphanumeric nor '_'. The stepping lines are not visible in this excerpt;
// presumably this isolates the bare identifier before the '(' - TODO confirm.
// NOTE(review): strchr() on a const char * is assigned to a non-const char *.
if ((ptr = strchr(function, '('))) {
100
while (ptr > function) {
101
if (!(isalnum(*ptr) || *ptr == '_'))
106
xt_strcpy(50, func_buf, ptr);
107
if ((ptr = strchr(func_buf, '(')))
111
// No '(' found: use the text as given.
xt_strcpy(50, func_buf, function);
113
printf("%s %s (%s)\n", thread ? thread->t_name : "-unknown-", func_buf, table);
115
printf("%s %s\n", thread ? thread->t_name : "-unknown-", func_buf);
120
* -----------------------------------------------------------------------
124
// Backup driver for PBXT tables: derives from Backup_driver and overrides its
// virtual size/begin/end/get_data/lock-phase methods.
// Only part of the member list is visible in this excerpt.
class PBXTBackupDriver: public Backup_driver
127
PBXTBackupDriver(const Table_list &);
128
virtual ~PBXTBackupDriver();
130
virtual size_t size();
131
virtual size_t init_size();
132
virtual result_t begin(const size_t);
133
virtual result_t end();
134
virtual result_t get_data(Buffer &);
135
virtual result_t prelock();
136
virtual result_t lock();
137
virtual result_t unlock();
138
virtual result_t cancel();
140
void lock_tables_TL_READ_NO_INSERT();
143
// XT thread used by the driver; assigned in begin() from xt_ha_set_current_thread().
XTThreadPtr bd_thread;
146
// Open table being scanned; assigned in get_data() by xt_db_open_table_using_tab().
XTOpenTablePtr bd_ot;
149
/* Non-zero if we last returned only part of
152
// Writes a block consisting of the type byte followed by row_len bytes of row data.
xtWord1 *db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *size, xtWord4 row_len);
153
// Writes a block with 4-byte length fields; used for rows split over several blocks.
xtWord1 *db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *size, xtWord4 total_len, xtWord4 row_len);
155
// Offset into bd_ot->ot_row_wbuffer of the next part of a split row to write.
xtWord4 bd_row_offset;
160
// Constructs the driver for the given table list. The list is handed to the
// Backup_driver base class and bd_state starts as BUP_STATE_BEFORE_LOCK.
// The remaining initializers are not visible in this excerpt.
PBXTBackupDriver::PBXTBackupDriver(const Table_list &tables):
161
Backup_driver(tables),
162
bd_state(BUP_STATE_BEFORE_LOCK),
171
PBXTBackupDriver::~PBXTBackupDriver()
175
/** Estimates total size of backup. @todo improve it */
176
size_t PBXTBackupDriver::size()
182
/** Estimates size of backup before lock. @todo improve it */
183
size_t PBXTBackupDriver::init_size()
189
// Starts the backup: obtains the XT thread for the current MySQL thread
// (current_thd) and stores it in bd_thread. The size_t argument is unnamed
// and unused. If no XT thread can be obtained, the exception is logged and
// backup::ERROR is returned.
result_t PBXTBackupDriver::begin(const size_t)
191
THD *thd = current_thd;
196
// 'e' is declared on a line not visible in this excerpt.
if (!(bd_thread = xt_ha_set_current_thread(thd, &e))) {
197
xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
198
return backup::ERROR;
204
// Finishes the backup: ends the sequential scan on bd_ot, returns the open
// table to the pool, and commits the transaction of bd_thread if it has one.
// Returns backup::ERROR when the commit fails.
result_t PBXTBackupDriver::end()
208
// Presumably guarded by a check that bd_ot is set - the guard is not visible here.
xt_tab_seq_exit(bd_ot);
209
xt_db_return_table_to_pool_ns(bd_ot);
212
// st_xact_data is non-NULL while the thread has a transaction to finish.
if (bd_thread->st_xact_data) {
213
if (!xt_xn_commit(bd_thread))
214
return backup::ERROR;
219
// Writes one block made of the type byte followed by row_len bytes copied
// from the start of bd_ot->ot_row_wbuffer.
//   buffer   - output position in the backup buffer.
//   bup_type - record type identifier stored in the first byte.
//   ret_size - in/out space counter; read into 'size' here and presumably
//              written back before returning (those lines are not visible).
//   row_len  - number of row bytes to copy.
// Returns an xtWord1 pointer; get_data() assigns it back to its write position.
xtWord1 *PBXTBackupDriver::db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *ret_size, xtWord4 row_len)
221
register size_t size = *ret_size;
223
*buffer = bup_type; // Record type identifier.
226
// The advance of 'buffer' past the type byte is on a line not visible here.
memcpy(buffer, bd_ot->ot_row_wbuffer, row_len);
233
// Writes one block of a row that is split over several blocks: the type byte,
// the total length (only for BUP_RECORD_BLOCK_4_START), a 4-byte data length,
// and row_len bytes taken from bd_ot->ot_row_wbuffer at bd_row_offset.
//   buffer    - output position in the backup buffer.
//   bup_type  - record type identifier stored in the first byte.
//   ret_size  - in/out space counter; read into 'size' here and presumably
//               written back before returning (those lines are not visible).
//   total_len - total row length, written only for the START block.
//   row_len   - number of row bytes carried by this block.
// Side effects: bd_row_size shrinks and bd_row_offset grows by row_len.
xtWord1 *PBXTBackupDriver::db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *ret_size, xtWord4 total_len, xtWord4 row_len)
235
register size_t size = *ret_size;
237
*buffer = bup_type; // Record type identifier.
240
if (bup_type == BUP_RECORD_BLOCK_4_START) {
241
XT_SET_DISK_4(buffer, total_len);
245
XT_SET_DISK_4(buffer, row_len);
248
// Copy the next unwritten slice of the row.
memcpy(buffer, bd_ot->ot_row_wbuffer+bd_row_offset, row_len);
251
// Track how much of the row is still pending and where the next slice starts.
bd_row_size -= row_len;
252
bd_row_offset += row_len;
257
// Fills buf with the next chunk of backup data.
// Visible flow: return backup::READY while still before the lock; open the
// next table and start a sequential scan when needed; first flush any row
// that was only partly written on the previous call; then store rows from the
// scan into the buffer; finally set buf.size to the number of bytes used.
// On failure the exception of bd_thread is logged and backup::ERROR returned.
// NOTE: local declarations, the loop, else branches and the try/catch lines
// are not visible in this excerpt.
result_t PBXTBackupDriver::get_data(Buffer &buf)
266
// Nothing is sent until lock() has moved bd_state to BUP_STATE_AFTER_LOCK.
if (bd_state == BUP_STATE_BEFORE_LOCK) {
270
return backup::READY;
273
/* Open the backup table: */
275
XTThreadPtr self = bd_thread;
279
// bd_table_no equal to the table count means every table has been handled.
if (bd_table_no == m_tables.count()) {
286
// Resolve the table's internal name into 'path', open its database, then
// take a reference to the table and open it for this thread.
m_tables[bd_table_no].internal_name(path, sizeof(path));
289
xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
290
tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE);
291
pushr_(xt_heap_release, tab);
292
if (!(bd_ot = xt_db_open_table_using_tab(tab, bd_thread)))
294
freer_(); // xt_heap_release(tab)
296
/* Prepare the sequential scan: */
297
xt_tab_seq_exit(bd_ot);
298
if (!xt_tab_seq_init(bd_ot))
302
// Replace the row buffer with one of the table's dic_mysql_buf_size.
xt_free(self, bd_row_buf);
305
bd_row_buf = (xtWord1 *) xt_malloc(self, bd_ot->ot_table->tab_dic.dic_mysql_buf_size);
306
// Set the requested column count to the table's number of columns.
bd_ot->ot_cols_req = bd_ot->ot_table->tab_dic.dic_no_of_cols;
317
buf.table_num = bd_table_no;
318
// Test aid: when TEST_SMALL_BLOCK is defined, the buffer size is overridden.
#ifdef TEST_SMALL_BLOCK
319
buf.size = TEST_SMALL_BLOCK;
322
buffer = (xtWord1 *) buf.data;
325
/* First check if a record was partially written
329
// bd_row_size > 0: bytes of the current row are still pending.
if (bd_row_size > 0) {
330
row_len = bd_row_size;
331
// Offset 0: no part of the row has been written yet. If it does not fit with
// its type byte, a START block is written, otherwise a standard record.
if (bd_row_offset == 0) {
332
if (row_len+1 > size) {
335
buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4_START, &size, bd_row_size, row_len);
338
buffer = db_write_block(buffer, BUP_STANDARD_VAR_RECORD, &size, row_len);
342
// Remainder of a split row: the test includes 5 extra bytes, presumably the
// type byte plus the 4-byte length. A continuation block is written if the
// rest does not fit, otherwise the END block.
if (row_len+5 > size) {
344
buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4, &size, 0, row_len);
347
buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4_END, &size, 0, row_len);
351
/* Now continue with the sequential scan. */
353
if (!xt_tab_seq_next(bd_ot, bd_row_buf, &eof))
356
/* We will go the next table, on the next call. */
357
xt_tab_seq_exit(bd_ot);
358
xt_db_return_table_to_pool_ns(bd_ot);
362
// Store the row via myxt_store_row_data(); a zero length takes the
// statement on the following (not visible) line, presumably the error path.
if (!(row_len = myxt_store_row_data(bd_ot, 0, (char *) bd_row_buf)))
364
// The row plus its type byte does not fit in the remaining space.
if (row_len+1 > size) {
367
bd_row_size = row_len;
368
/* Only add part of the row, if there is still
369
* quite a bit of space left:
371
if (size >= (32 * 1024))
375
buffer = db_write_block(buffer, BUP_STANDARD_VAR_RECORD, &size, row_len);
379
// 'size' is the space left over, so this yields the bytes actually used.
buf.size = buf.size - size;
380
/* This indicates end of data for a table! */
386
// Error path: log and clear the thread's exception, then report failure.
xt_log_and_clear_exception(bd_thread);
387
return backup::ERROR;
390
// Prelock step of the backup: the visible body only reports backup::READY.
result_t PBXTBackupDriver::prelock()
393
return backup::READY;
396
// Lock step of the backup: resets the transaction-related fields of
// bd_thread, selects XT_XACT_COMMITTED_READ as the transaction mode, begins a
// transaction, and moves the driver to BUP_STATE_AFTER_LOCK so that
// get_data() starts delivering rows. Returns backup::ERROR when the
// transaction cannot be started.
result_t PBXTBackupDriver::lock()
399
bd_thread->st_xact_mode = XT_XACT_COMMITTED_READ;
400
bd_thread->st_ignore_fkeys = FALSE;
401
bd_thread->st_auto_commit = FALSE;
402
bd_thread->st_table_trans = FALSE;
403
bd_thread->st_abort_trans = FALSE;
404
bd_thread->st_stat_ended = FALSE;
405
bd_thread->st_stat_trans = FALSE;
406
bd_thread->st_is_update = NULL;
407
if (!xt_xn_begin(bd_thread))
408
return backup::ERROR;
409
bd_state = BUP_STATE_AFTER_LOCK;
413
result_t PBXTBackupDriver::unlock()
419
// Cancels the backup. No cleanup is done here; it is left to free().
result_t PBXTBackupDriver::cancel()
422
return backup::OK; // free() will be called and suffice
425
// Releases what the driver holds: ends the sequential scan, returns the open
// table to the pool, frees the row buffer, and rolls back the transaction of
// bd_thread if it has one. Guards around the first calls are not visible in
// this excerpt.
void PBXTBackupDriver::free()
429
xt_tab_seq_exit(bd_ot);
430
xt_db_return_table_to_pool_ns(bd_ot);
434
xt_free_ns(bd_row_buf);
437
// Unlike end(), a transaction with data is rolled back, not committed.
if (bd_thread->st_xact_data)
438
xt_xn_rollback(bd_thread);
442
void PBXTBackupDriver::lock_tables_TL_READ_NO_INSERT()
448
* -----------------------------------------------------------------------
452
// Restore driver for PBXT tables: derives from Restore_driver and overrides
// begin/end/send_data/cancel.
// Only part of the member list is visible in this excerpt.
class PBXTRestoreDriver: public Restore_driver
455
PBXTRestoreDriver(const Table_list &tables);
456
virtual ~PBXTRestoreDriver();
458
virtual result_t begin(const size_t);
459
virtual result_t end();
460
virtual result_t send_data(Buffer &buf);
461
virtual result_t cancel();
465
// XT thread used by the driver; assigned in begin() from xt_ha_set_current_thread().
XTThreadPtr rd_thread;
467
// Open table currently being restored; assigned in send_data().
XTOpenTablePtr rd_ot;
468
// MySQL table object of rd_ot (tab_dic.dic_my_table); assigned in send_data().
STRUCT_TABLE *rd_my_table;
471
// Compared against XT_RESTORE_BATCH_SIZE in send_data() to trigger a commit.
u_int rb_insert_count;
473
/* Long rows are accumulated here: */
475
// rb_data_size is the allocated size of rb_row_data; it is grown with
// xt_realloc_ns() in send_data() when a START block announces a longer row.
xtWord4 rb_data_size;
476
xtWord1 *rb_row_data;
479
PBXTRestoreDriver::PBXTRestoreDriver(const Table_list &tables):
480
Restore_driver(tables),
491
PBXTRestoreDriver::~PBXTRestoreDriver()
495
// Starts the restore: obtains the XT thread for the current MySQL thread
// (current_thd) and stores it in rd_thread. The size_t argument is unnamed
// and unused. If no XT thread can be obtained, the exception is logged and
// backup::ERROR is returned.
result_t PBXTRestoreDriver::begin(const size_t)
497
THD *thd = current_thd;
502
// 'e' is declared on a line not visible in this excerpt.
if (!(rd_thread = xt_ha_set_current_thread(thd, &e))) {
503
xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
504
return backup::ERROR;
510
// Finishes the restore: returns the open table to the pool, frees the
// long-row accumulation buffer, and commits the transaction of rd_thread if
// it has one. Returns backup::ERROR when the commit fails.
result_t PBXTRestoreDriver::end()
514
xt_db_return_table_to_pool_ns(rd_ot);
518
// rb_row_buf is not freed here: send_data() points it at record[0] of the
// MySQL table instead of allocating it.
// xt_free_ns(rb_row_buf);
519
// rb_row_buf = NULL;
522
xt_free_ns(rb_row_data);
525
if (rd_thread->st_xact_data) {
526
if (!xt_xn_commit(rd_thread))
527
return backup::ERROR;
533
// Consumes one buffer of backup data and inserts the rows it contains.
// Visible flow: when buf.table_num differs from rd_table_no, switch to that
// table (return the previous one, commit and begin a new transaction, open
// the new table); then decode record blocks by their type byte, reassembling
// split rows in rb_row_data; load each complete row into rb_row_buf and
// insert it with xt_tab_new_record(); commit and begin a new transaction when
// rb_insert_count equals XT_RESTORE_BATCH_SIZE.
// On failure the exception of rd_thread is logged and backup::ERROR returned.
// NOTE: local declarations, the loop/switch skeleton, pointer advances and the
// statements following each error check are not visible in this excerpt.
result_t PBXTRestoreDriver::send_data(Buffer &buf)
543
// A different table number means the stream has moved on to another table.
if (buf.table_num != rd_table_no) {
544
XTThreadPtr self = rd_thread;
549
xt_db_return_table_to_pool_ns(rd_ot);
553
// Finish the transaction used for the previous table and start a new one.
if (rd_thread->st_xact_data) {
554
if (!xt_xn_commit(rd_thread))
557
if (!xt_xn_begin(rd_thread))
561
rd_table_no = buf.table_num;
562
// The table number is used 1-based here: index rd_table_no-1 into m_tables.
m_tables[rd_table_no-1].internal_name(path, sizeof(path));
564
xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
565
tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE);
566
pushr_(xt_heap_release, tab);
567
if (!(rd_ot = xt_db_open_table_using_tab(tab, rd_thread)))
569
freer_(); // xt_heap_release(tab)
571
rd_my_table = rd_ot->ot_table->tab_dic.dic_my_table;
572
// If the table has a found_next_number_field, set up the MySQL table object
// for it: owning thread, next_number_field, and the columns of its index.
if (rd_my_table->found_next_number_field) {
573
rd_my_table->in_use = current_thd;
574
rd_my_table->next_number_field = rd_my_table->found_next_number_field;
575
rd_my_table->mark_columns_used_by_index_no_reset(rd_my_table->s->next_number_index, rd_my_table->read_set);
578
/* This is safe because only one thread can restore a table at
581
// Rows are loaded directly into record[0] of the MySQL table.
rb_row_buf = (xtWord1 *) rd_my_table->record[0];
583
// xt_free(self, rb_row_buf);
584
// rb_row_buf = NULL;
586
//rb_row_buf = (xtWord1 *) xt_malloc(self, rd_ot->ot_table->tab_dic.dic_mysql_buf_size);
588
rb_col_cnt = rd_ot->ot_table->tab_dic.dic_no_of_cols;
600
buffer = (xtWord1 *) buf.data;
606
// Standard record: the row data starts right after the type byte.
case BUP_STANDARD_VAR_RECORD:
607
rec_data = buffer + 1;
609
// First block of a split row: a 4-byte total length, then a 4-byte length of
// the data carried by this block.
case BUP_RECORD_BLOCK_4_START:
611
row_len = XT_GET_DISK_4(buffer);
613
// Grow the accumulation buffer to the announced total length if needed.
if (rb_data_size < row_len) {
614
if (!xt_realloc_ns((void **) &rb_row_data, row_len))
616
rb_data_size = row_len;
618
row_len = XT_GET_DISK_4(buffer);
620
// The block may not carry more bytes than the accumulation buffer holds.
ASSERT_NS(row_len <= rb_data_size);
621
if (row_len > rb_data_size) {
622
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
625
memcpy(rb_row_data, buffer, row_len);
626
rb_row_len = row_len;
628
// 9 presumably covers the type byte and the two 4-byte length fields.
// NOTE(review): in the visible order this check against the remaining input
// 'size' comes after the memcpy above, so a malformed stream could be read
// past the input before being rejected - confirm against the full source.
if (row_len + 9 > size) {
629
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
634
// Continuation block of a split row: appended at rb_row_len.
case BUP_RECORD_BLOCK_4:
636
row_len = XT_GET_DISK_4(buffer);
638
ASSERT_NS(rb_row_len + row_len <= rb_data_size);
639
if (rb_row_len + row_len > rb_data_size) {
640
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
643
memcpy(rb_row_data + rb_row_len, buffer, row_len);
644
rb_row_len += row_len;
646
// 5 presumably covers the type byte and the 4-byte length field.
// NOTE(review): as above, the check against 'size' follows the copy.
if (row_len + 5 > size) {
647
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
652
// Last block of a split row: appended, then the row is taken from rb_row_data.
case BUP_RECORD_BLOCK_4_END:
654
row_len = XT_GET_DISK_4(buffer);
656
ASSERT_NS(rb_row_len + row_len <= rb_data_size);
657
if (rb_row_len + row_len > rb_data_size) {
658
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
661
memcpy(rb_row_data + rb_row_len, buffer, row_len);
663
// NOTE(review): as above, the check against 'size' follows the copy.
if (row_len + 5 > size) {
664
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
668
rec_data = rb_row_data;
671
// Presumably the default case: an unknown type byte is a format error.
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
675
// Load the row from rec_data into rb_row_buf; a zero length takes the
// statement on the following (not visible) line, presumably the error path.
if (!(row_len = myxt_load_row_data(rd_ot, rec_data, rb_row_buf, rb_col_cnt)))
678
if (rd_ot->ot_table->tab_dic.dic_my_table->found_next_number_field)
679
ha_set_auto_increment(rd_ot, rd_ot->ot_table->tab_dic.dic_my_table->found_next_number_field);
681
// Insert the loaded row into the table.
if (!xt_tab_new_record(rd_ot, rb_row_buf))
684
// For a standard record the length is only known after loading, so the
// check against the remaining input is made here.
if (type == BUP_STANDARD_VAR_RECORD) {
686
if (row_len + 1 > size) {
687
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
694
// Commit in batches and continue in a new transaction.
if (rb_insert_count == XT_RESTORE_BATCH_SIZE) {
695
if (!xt_xn_commit(rd_thread))
697
if (!xt_xn_begin(rd_thread))
706
// Error path: log and clear the thread's exception, then report failure.
xt_log_and_clear_exception(rd_thread);
707
return backup::ERROR;
711
result_t PBXTRestoreDriver::cancel()
714
/* Nothing to do in cancel(); free() will suffice */
718
// Releases what the driver holds: returns the open table to the pool, frees
// the long-row accumulation buffer, and rolls back the transaction of
// rd_thread if it has one. Guards around the first calls are not visible in
// this excerpt.
void PBXTRestoreDriver::free()
722
xt_db_return_table_to_pool_ns(rd_ot);
726
// rb_row_buf is not freed here: send_data() points it at record[0] of the
// MySQL table instead of allocating it.
// xt_free_ns(rb_row_buf);
727
// rb_row_buf = NULL;
730
xt_free_ns(rb_row_data);
733
// Unlike end(), a transaction with data is rolled back, not committed.
if (rd_thread->st_xact_data)
734
xt_xn_rollback(rd_thread);
739
* -----------------------------------------------------------------------
740
* BACKUP ENGINE FACTORY
743
#define PBXT_BACKUP_VERSION 1
746
// Backup engine factory for PBXT: derives from Backup_engine, reports the
// backup image version, and creates the backup and restore drivers.
class PBXTBackupEngine: public Backup_engine
749
PBXTBackupEngine() { };
751
// Version of the backup format written by this engine.
virtual version_t version() const {
752
return PBXT_BACKUP_VERSION;
755
// Creates a PBXTBackupDriver for the given tables and returns it through the
// Backup_driver reference.
virtual result_t get_backup(const uint32, const Table_list &, Backup_driver* &);
757
// Creates a PBXTRestoreDriver for the given tables; the version_t argument is
// checked against PBXT_BACKUP_VERSION in the definition.
virtual result_t get_restore(const version_t, const uint32, const Table_list &,Restore_driver* &);
765
// Creates the backup driver for the given tables.
//   count  - not used in the visible lines.
//   tables - tables to back up, passed to the PBXTBackupDriver constructor.
//   drv    - output reference; its assignment is not visible in this excerpt.
// NOTE(review): the backup::ERROR return is presumably guarded by a null
// check on ptr. Plain new reports failure by throwing unless the build
// replaces operator new or disables exceptions - confirm for this build.
// NOTE(review): the class declares the first parameter as uint32 while this
// definition uses u_int; presumably the same type - verify.
result_t PBXTBackupEngine::get_backup(const u_int count, const Table_list &tables, Backup_driver* &drv)
767
PBXTBackupDriver *ptr = new PBXTBackupDriver(tables);
770
return backup::ERROR;
775
// Creates the restore driver for the given tables.
//   ver    - version of the backup image; images newer than
//            PBXT_BACKUP_VERSION are rejected with backup::ERROR.
//   tables - tables to restore, passed to the PBXTRestoreDriver constructor.
//   drv    - output reference; receives the new driver.
// The unnamed uint32 argument is unused.
result_t PBXTBackupEngine::get_restore(const version_t ver, const uint32,
776
const Table_list &tables, Restore_driver* &drv)
778
if (ver > PBXT_BACKUP_VERSION)
780
return backup::ERROR;
783
PBXTRestoreDriver *ptr = new PBXTRestoreDriver(tables);
786
// Presumably guarded by a null check on ptr that is not visible here.
return backup::ERROR;
787
drv = (Restore_driver *) ptr;
792
// Entry point that creates the PBXT backup engine.
//   self - handlerton argument; not used in the visible lines.
//   be   - output reference; receives a new PBXTBackupEngine.
// The backup::ERROR return is presumably guarded by a null check on be that
// is not visible in this excerpt.
Backup_result_t pbxt_backup_engine(handlerton *self, Backup_engine* &be)
794
be = new PBXTBackupEngine();
797
return backup::ERROR;