18
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
#include <drizzled/message.h>
23
24
#include "read_replication.h"
24
25
#include "create_replication.h"
76
77
error = que_eval_sql(info,
77
78
"PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
79
"CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
80
"CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), ORIGINATING_SERVER_UUID BLOB, ORIGINATING_COMMIT_ID INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
80
81
"CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
81
82
"CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
128
129
drizzled::message::Table::TableOptions *options= table_message->mutable_options();
129
130
options->set_collation_id(drizzled::my_charset_bin.number);
130
131
options->set_collation(drizzled::my_charset_bin.name);
132
drizzled::message::set_is_replicated(*table_message, false);
132
134
drizzled::message::Table::Field *field= table_message->add_field();
133
135
field->set_name("ID");
146
148
field->set_type(drizzled::message::Table::Field::BIGINT);
148
150
field= table_message->add_field();
151
field->set_name("ORIGINATING_SERVER_UUID");
152
field->set_type(drizzled::message::Table::Field::BLOB);
154
field= table_message->add_field();
155
field->set_name("ORIGINATING_COMMIT_ID");
156
field->set_type(drizzled::message::Table::Field::BIGINT);
158
field= table_message->add_field();
149
159
field->set_name("MESSAGE_LEN");
150
160
field->set_type(drizzled::message::Table::Field::INTEGER);
161
171
index->set_is_primary(true);
162
172
index->set_is_unique(true);
163
173
index->set_type(drizzled::message::Table::Index::BTREE);
164
index->set_key_length(8);
174
index->set_key_length(12);
165
175
drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
166
176
part->set_fieldnr(0);
167
177
part->set_compare_length(8);
174
184
index->set_is_primary(false);
175
185
index->set_is_unique(false);
176
186
index->set_type(drizzled::message::Table::Index::BTREE);
177
index->set_key_length(8);
187
index->set_key_length(16);
178
188
part= index->add_index_part();
179
189
part->set_fieldnr(2);
180
190
part->set_compare_length(8);
190
200
ulint insert_replication_message(const char *message, size_t size,
191
201
trx_t *trx, uint64_t trx_id,
192
202
uint64_t end_timestamp, bool is_end_segment,
203
uint32_t seg_id, const char *server_uuid,
204
bool use_originating_server_uuid,
205
const char *originating_server_uuid,
206
uint64_t originating_commit_id)
196
209
row_prebuilt_t* prebuilt; /* For reading rows */
250
263
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
251
264
dfield_set_data(dfield, data, 8);
266
if (not use_originating_server_uuid)
268
/* This transaction originated from this server, rather then being
269
replicated to this server reset the values to reflect that */
270
originating_server_uuid= server_uuid;
271
originating_commit_id= commit_id;
253
274
dfield = dtuple_get_nth_field(dtuple, 4);
275
dfield_set_data(dfield, originating_server_uuid, 36);
277
dfield = dtuple_get_nth_field(dtuple, 5);
278
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
279
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_commit_id, 8, dict_table_is_comp(prebuilt->table));
280
dfield_set_data(dfield, data, 8);
282
dfield = dtuple_get_nth_field(dtuple, 6);
254
283
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
255
284
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
256
285
dfield_set_data(dfield, data, 4);
258
dfield = dtuple_get_nth_field(dtuple, 5);
287
dfield = dtuple_get_nth_field(dtuple, 7);
259
288
dfield_set_data(dfield, message, size);
261
290
ins_node_t* node = prebuilt->ins_node;
354
383
convert_to_mysql_format(timestampbyte, field, 8);
355
384
ret.end_timestamp= *(uint64_t *)timestampbyte;
386
field = rec_get_nth_field_old(rec, 6, &len);
387
ret.originating_server_uuid= (char *)field;
389
field = rec_get_nth_field_old(rec, 7, &len);
390
byte originatingcommitbyte[8];
391
convert_to_mysql_format(originatingcommitbyte, field, 8);
392
ret.originating_commit_id= *(uint64_t *)originatingcommitbyte;
357
394
// Handler message
358
field = rec_get_nth_field_old(rec, 7, &len);
395
field = rec_get_nth_field_old(rec, 9, &len);
359
396
ret.message= (char *)field;
360
397
ret.message_length= len;