1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2010 Brian Aker
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
#include "read_replication.h"
24
#include "create_replication.h"
27
#include "dict0crea.ic"
32
#include "page0page.h"
33
#include "mach0data.h"
34
#include "dict0boot.h"
35
#include "dict0dict.h"
38
#include "row0mysql.h"
39
#include "pars0pars.h"
43
#include "row0merge.h"
44
#include "row0mysql.h"
46
UNIV_INTERN ulint dict_create_sys_replication_log(void)
52
mutex_enter(&(dict_sys->mutex));
54
table1 = dict_table_get_low("SYS_REPLICATION_LOG");
56
trx_sys_read_commit_id();
60
mutex_exit(&(dict_sys->mutex));
65
mutex_exit(&(dict_sys->mutex));
67
trx= trx_allocate_for_mysql();
69
trx->op_info= "creating replication sys table";
71
row_mysql_lock_data_dictionary(trx);
73
pars_info_t *info= pars_info_create();
76
error = que_eval_sql(info,
77
"PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
79
"CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
80
"CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
81
"CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
87
if (error != DB_SUCCESS)
89
fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
91
ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
94
"InnoDB: creation failed\n"
95
"InnoDB: tablespace is full\n"
96
"InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
98
row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
100
error = DB_MUST_GET_MORE_FILE_SPACE;
103
trx_commit_for_mysql(trx);
105
row_mysql_unlock_data_dictionary(trx);
107
trx_free_for_mysql(trx);
112
UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
114
std::string search_string(table_name);
115
boost::algorithm::to_lower(search_string);
117
if (search_string.compare("sys_replication_log") != 0)
120
drizzled::message::Engine *engine= table_message->mutable_engine();
121
engine->set_name("InnoDB");
122
table_message->set_name("SYS_REPLICATION_LOG");
123
table_message->set_schema("DATA_DICTIONARY");
124
table_message->set_type(drizzled::message::Table::STANDARD);
125
table_message->set_creation_timestamp(0);
126
table_message->set_update_timestamp(0);
128
drizzled::message::Table::TableOptions *options= table_message->mutable_options();
129
options->set_collation_id(drizzled::my_charset_bin.number);
130
options->set_collation(drizzled::my_charset_bin.name);
131
options->set_dont_replicate(true);
133
drizzled::message::Table::Field *field= table_message->add_field();
134
field->set_name("ID");
135
field->set_type(drizzled::message::Table::Field::BIGINT);
137
field= table_message->add_field();
138
field->set_name("SEGID");
139
field->set_type(drizzled::message::Table::Field::INTEGER);
141
field= table_message->add_field();
142
field->set_name("COMMIT_ID");
143
field->set_type(drizzled::message::Table::Field::BIGINT);
145
field= table_message->add_field();
146
field->set_name("END_TIMESTAMP");
147
field->set_type(drizzled::message::Table::Field::BIGINT);
149
field= table_message->add_field();
150
field->set_name("MESSAGE_LEN");
151
field->set_type(drizzled::message::Table::Field::INTEGER);
153
field= table_message->add_field();
154
field->set_name("MESSAGE");
155
field->set_type(drizzled::message::Table::Field::BLOB);
156
drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
157
stropt->set_collation_id(drizzled::my_charset_bin.number);
158
stropt->set_collation(drizzled::my_charset_bin.name);
160
drizzled::message::Table::Index *index= table_message->add_indexes();
161
index->set_name("PRIMARY");
162
index->set_is_primary(true);
163
index->set_is_unique(true);
164
index->set_type(drizzled::message::Table::Index::BTREE);
165
index->set_key_length(8);
166
drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
167
part->set_fieldnr(0);
168
part->set_compare_length(8);
169
part= index->add_index_part();
170
part->set_fieldnr(1);
171
part->set_compare_length(4);
173
index= table_message->add_indexes();
174
index->set_name("COMMIT_IDX");
175
index->set_is_primary(false);
176
index->set_is_unique(false);
177
index->set_type(drizzled::message::Table::Index::BTREE);
178
index->set_key_length(8);
179
part= index->add_index_part();
180
part->set_fieldnr(2);
181
part->set_compare_length(8);
182
part= index->add_index_part();
183
part->set_fieldnr(0);
184
part->set_compare_length(8);
189
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t* prebuilt);
191
ulint insert_replication_message(const char *message, size_t size,
192
trx_t *trx, uint64_t trx_id,
193
uint64_t end_timestamp, bool is_end_segment,
197
row_prebuilt_t* prebuilt; /* For reading rows */
202
table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
204
prebuilt = row_create_prebuilt(table);
206
if (prebuilt->trx != trx)
208
row_update_prebuilt_trx(prebuilt, trx);
211
/* DDL operations create table/drop table call
212
* innobase_commit_low() which will commit the trx
213
* that leaves the operation of committing to the
214
* log in a new trx. If that is the case we need
215
* to keep track and commit the trx later in this
218
bool is_started= true;
219
if (trx->conc_state == TRX_NOT_STARTED)
224
dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
227
dfield = dtuple_get_nth_field(dtuple, 0);
228
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
229
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
230
dfield_set_data(dfield, data, 8);
232
dfield = dtuple_get_nth_field(dtuple, 1);
234
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
235
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
236
dfield_set_data(dfield, data, 4);
238
uint64_t commit_id= 0;
241
commit_id= trx_sys_commit_id.increment();
244
dfield = dtuple_get_nth_field(dtuple, 2);
245
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
246
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
247
dfield_set_data(dfield, data, 8);
249
dfield = dtuple_get_nth_field(dtuple, 3);
250
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
251
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
252
dfield_set_data(dfield, data, 8);
254
dfield = dtuple_get_nth_field(dtuple, 4);
255
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
256
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
257
dfield_set_data(dfield, data, 4);
259
dfield = dtuple_get_nth_field(dtuple, 5);
260
dfield_set_data(dfield, message, size);
262
ins_node_t* node = prebuilt->ins_node;
264
thr = que_fork_get_first_thr(prebuilt->ins_graph);
266
if (prebuilt->sql_stat_start) {
267
node->state = INS_NODE_SET_IX_LOCK;
268
prebuilt->sql_stat_start = FALSE;
270
node->state = INS_NODE_ALLOC_ROW_ID;
273
que_thr_move_to_run_state_for_mysql(thr, trx);
276
thr->run_node = node;
277
thr->prev_node = node;
281
error = trx->error_state;
283
que_thr_stop_for_mysql_no_error(thr, trx);
284
row_prebuilt_free(prebuilt, FALSE);
288
trx_commit_for_mysql(trx);
294
UNIV_INTERN read_replication_state_st *replication_read_init(void)
296
read_replication_state_st *state= new read_replication_state_st;
298
mutex_enter(&(dict_sys->mutex));
300
mtr_start(&state->mtr);
301
state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
302
state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
304
mutex_exit(&(dict_sys->mutex));
306
btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
311
UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
313
btr_pcur_close(&state->pcur);
314
mtr_commit(&state->mtr);
318
UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
320
struct read_replication_return_st ret;
323
btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
325
rec= btr_pcur_get_rec(&state->pcur);
327
while (btr_pcur_is_on_user_rec(&state->pcur))
332
// Is the row deleted? If so go fetch the next
333
if (rec_get_deleted_flag(rec, 0))
336
// Store transaction id
337
field = rec_get_nth_field_old(rec, 0, &len);
339
convert_to_mysql_format(idbyte, field, 8);
340
ret.id= *(uint64_t *)idbyte;
343
field = rec_get_nth_field_old(rec, 1, &len);
345
convert_to_mysql_format(segbyte, field, 4);
346
ret.seg_id= *(uint32_t *)segbyte;
348
field = rec_get_nth_field_old(rec, 4, &len);
350
convert_to_mysql_format(commitbyte, field, 8);
351
ret.commit_id= *(uint64_t *)commitbyte;
353
field = rec_get_nth_field_old(rec, 5, &len);
354
byte timestampbyte[8];
355
convert_to_mysql_format(timestampbyte, field, 8);
356
ret.end_timestamp= *(uint64_t *)timestampbyte;
359
field = rec_get_nth_field_old(rec, 7, &len);
360
ret.message= (char *)field;
361
ret.message_length= len;
363
// @todo double check that "field" will continue to be value past this
365
btr_pcur_store_position(&state->pcur, &state->mtr);
366
mtr_commit(&state->mtr);
368
mtr_start(&state->mtr);
370
btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
376
memset(&ret, 0, sizeof(ret));
381
UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
395
out[len - 1] = (byte) (out[len - 1] ^ 128);