1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2010 Brian Aker
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
#include "read_replication.h"
24
#include "create_replication.h"
27
#include "dict0crea.ic"
32
#include "page0page.h"
33
#include "mach0data.h"
34
#include "dict0boot.h"
35
#include "dict0dict.h"
38
#include "row0mysql.h"
39
#include "pars0pars.h"
43
#include "row0merge.h"
45
UNIV_INTERN ulint dict_create_sys_replication_log(void)
51
mutex_enter(&(dict_sys->mutex));
53
table1 = dict_table_get_low("SYS_REPLICATION_LOG");
57
mutex_exit(&(dict_sys->mutex));
62
mutex_exit(&(dict_sys->mutex));
64
trx= trx_allocate_for_mysql();
66
trx->op_info= "creating replication sys table";
68
row_mysql_lock_data_dictionary(trx);
70
pars_info_t *info= pars_info_create();
73
error = que_eval_sql(info,
74
"PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
76
"CREATE TABLE SYS_REPLICATION_LOG(ID BINARY(8), MESSAGE BLOB);\n"
77
"CREATE UNIQUE CLUSTERED INDEX ID_IND ON SYS_REPLICATION_LOG (ID);\n"
83
if (error != DB_SUCCESS)
85
fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
87
ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
90
"InnoDB: creation failed\n"
91
"InnoDB: tablespace is full\n"
92
"InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
94
row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
96
error = DB_MUST_GET_MORE_FILE_SPACE;
99
trx_commit_for_mysql(trx);
101
row_mysql_unlock_data_dictionary(trx);
103
trx_free_for_mysql(trx);
108
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t* prebuilt);
110
ulint insert_replication_message(const char *message, size_t size,
111
trx_t *trx, uint64_t trx_id)
114
row_prebuilt_t* prebuilt; /* For reading rows */
118
table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
120
prebuilt = row_create_prebuilt(table);
122
if (prebuilt->trx != trx)
124
row_update_prebuilt_trx(prebuilt, trx);
127
/* DDL operations create table/drop table call
128
* innobase_commit_low() which will commit the trx
129
* that leaves the operation of committing to the
130
* log in a new trx. If that is the case we need
131
* to keep track and commit the trx later in this
134
bool is_started= true;
135
if (trx->conc_state == TRX_NOT_STARTED)
140
dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
142
dfield = dtuple_get_nth_field(dtuple, 0);
144
dfield_set_data(dfield, &trx_id, 8);
146
dfield = dtuple_get_nth_field(dtuple, 1);
147
dfield_set_data(dfield, message, size);
149
ins_node_t* node = prebuilt->ins_node;
151
thr = que_fork_get_first_thr(prebuilt->ins_graph);
153
if (prebuilt->sql_stat_start) {
154
node->state = INS_NODE_SET_IX_LOCK;
155
prebuilt->sql_stat_start = FALSE;
157
node->state = INS_NODE_ALLOC_ROW_ID;
160
que_thr_move_to_run_state_for_mysql(thr, trx);
163
thr->run_node = node;
164
thr->prev_node = node;
168
error = trx->error_state;
170
que_thr_stop_for_mysql_no_error(thr, trx);
171
row_prebuilt_free(prebuilt, FALSE);
175
trx_commit_for_mysql(trx);
181
UNIV_INTERN struct read_replication_state_st *replication_read_init(void)
183
struct read_replication_state_st *state= calloc(1, sizeof(struct read_replication_state_st));
185
mutex_enter(&(dict_sys->mutex));
187
mtr_start(&state->mtr);
188
state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
189
state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
191
mutex_exit(&(dict_sys->mutex));
193
btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
198
UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
200
btr_pcur_close(&state->pcur);
201
mtr_commit(&state->mtr);
205
UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
207
struct read_replication_return_st ret;
210
btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
212
rec= btr_pcur_get_rec(&state->pcur);
214
while (btr_pcur_is_on_user_rec(&state->pcur))
219
// Is the row deleted? If so go fetch the next
220
if (rec_get_deleted_flag(rec, 0))
223
// Store transaction id
224
field = rec_get_nth_field_old(rec, 0, &len);
225
ret.id= *(uint64_t *)field;
228
field = rec_get_nth_field_old(rec, 3, &len);
229
ret.message= (char *)field;
230
ret.message_length= len;
232
// @todo double check that "field" will continue to be value past this
234
btr_pcur_store_position(&state->pcur, &state->mtr);
235
mtr_commit(&state->mtr);
237
mtr_start(&state->mtr);
239
btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
245
memset(&ret, 0, sizeof(ret));