18
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
#include <drizzled/message.h>
24
23
#include "read_replication.h"
25
24
#include "create_replication.h"
77
76
error = que_eval_sql(info,
78
77
"PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
80
"CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), ORIGINATING_SERVER_UUID BLOB, ORIGINATING_COMMIT_ID INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
79
"CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
81
80
"CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
82
81
"CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
129
128
drizzled::message::Table::TableOptions *options= table_message->mutable_options();
130
129
options->set_collation_id(drizzled::my_charset_bin.number);
131
130
options->set_collation(drizzled::my_charset_bin.name);
132
drizzled::message::set_is_replicated(*table_message, false);
134
132
drizzled::message::Table::Field *field= table_message->add_field();
135
133
field->set_name("ID");
148
146
field->set_type(drizzled::message::Table::Field::BIGINT);
150
148
field= table_message->add_field();
151
field->set_name("ORIGINATING_SERVER_UUID");
152
field->set_type(drizzled::message::Table::Field::BLOB);
154
field= table_message->add_field();
155
field->set_name("ORIGINATING_COMMIT_ID");
156
field->set_type(drizzled::message::Table::Field::BIGINT);
158
field= table_message->add_field();
159
149
field->set_name("MESSAGE_LEN");
160
150
field->set_type(drizzled::message::Table::Field::INTEGER);
171
161
index->set_is_primary(true);
172
162
index->set_is_unique(true);
173
163
index->set_type(drizzled::message::Table::Index::BTREE);
174
index->set_key_length(12);
164
index->set_key_length(8);
175
165
drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
176
166
part->set_fieldnr(0);
177
167
part->set_compare_length(8);
184
174
index->set_is_primary(false);
185
175
index->set_is_unique(false);
186
176
index->set_type(drizzled::message::Table::Index::BTREE);
187
index->set_key_length(16);
177
index->set_key_length(8);
188
178
part= index->add_index_part();
189
179
part->set_fieldnr(2);
190
180
part->set_compare_length(8);
200
190
ulint insert_replication_message(const char *message, size_t size,
201
191
trx_t *trx, uint64_t trx_id,
202
192
uint64_t end_timestamp, bool is_end_segment,
203
uint32_t seg_id, const char *server_uuid,
204
bool use_originating_server_uuid,
205
const char *originating_server_uuid,
206
uint64_t originating_commit_id)
209
196
row_prebuilt_t* prebuilt; /* For reading rows */
263
250
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
264
251
dfield_set_data(dfield, data, 8);
266
if (not use_originating_server_uuid)
268
/* This transaction originated from this server, rather then being
269
replicated to this server reset the values to reflect that */
270
originating_server_uuid= server_uuid;
271
originating_commit_id= commit_id;
274
253
dfield = dtuple_get_nth_field(dtuple, 4);
275
dfield_set_data(dfield, originating_server_uuid, 36);
254
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
255
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
256
dfield_set_data(dfield, data, 4);
277
258
dfield = dtuple_get_nth_field(dtuple, 5);
278
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
279
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_commit_id, 8, dict_table_is_comp(prebuilt->table));
280
dfield_set_data(dfield, data, 8);
282
dfield = dtuple_get_nth_field(dtuple, 6);
283
data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
284
row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
285
dfield_set_data(dfield, data, 4);
287
dfield = dtuple_get_nth_field(dtuple, 7);
288
259
dfield_set_data(dfield, message, size);
290
261
ins_node_t* node = prebuilt->ins_node;
383
354
convert_to_mysql_format(timestampbyte, field, 8);
384
355
ret.end_timestamp= *(uint64_t *)timestampbyte;
386
field = rec_get_nth_field_old(rec, 6, &len);
387
ret.originating_server_uuid= (char *)field;
389
358
field = rec_get_nth_field_old(rec, 7, &len);
390
byte originatingcommitbyte[8];
391
convert_to_mysql_format(originatingcommitbyte, field, 8);
392
ret.originating_commit_id= *(uint64_t *)originatingcommitbyte;
395
field = rec_get_nth_field_old(rec, 9, &len);
396
359
ret.message= (char *)field;
397
360
ret.message_length= len;