80
/**
 * Fills in the transaction context of the supplied Transaction message.
 *
 * The context receives the current value of the file-level transaction_id
 * counter (which is post-incremented here), the value returned by
 * getNanoTimestamp() as its start timestamp, and the file-level server_id.
 *
 * @param transaction  Transaction message whose context is populated
 */
static void initTransactionContext(message::Transaction &transaction)
{
  message::TransactionContext &context= *transaction.mutable_transaction_context();

  context.set_transaction_id(transaction_id++);
  context.set_start_timestamp(getNanoTimestamp());
  context.set_server_id(server_id);
}
88
/**
 * Stamps the transaction context of the supplied Transaction message with
 * an end timestamp taken from getNanoTimestamp().
 *
 * @param transaction  Transaction message whose context is completed
 */
static void finalizeTransactionContext(message::Transaction &transaction)
{
  message::TransactionContext &context= *transaction.mutable_transaction_context();
  context.set_end_timestamp(getNanoTimestamp());
}
94
/**
 * Appends a RAW_SQL statement to the transaction which creates table t1
 * (a single VARCHAR(32) primary key column "a").
 *
 * The statement's start and end timestamps are both taken from
 * getNanoTimestamp(), one call after the other.
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doCreateTable1(message::Transaction &transaction)
{
  message::Statement *statement= transaction.add_statement();

  statement->set_type(message::Statement::RAW_SQL);
  /* The key column list of a PRIMARY KEY clause must be parenthesised. */
  statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY (a)) ENGINE=InnoDB");
  statement->set_start_timestamp(getNanoTimestamp());
  statement->set_end_timestamp(getNanoTimestamp());
}
104
/**
 * Appends a RAW_SQL statement to the transaction which creates table t2
 * (a single INTEGER primary key column "a").
 *
 * The statement's start and end timestamps are both taken from
 * getNanoTimestamp(), one call after the other.
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doCreateTable2(message::Transaction &transaction)
{
  message::Statement *statement= transaction.add_statement();

  statement->set_type(message::Statement::RAW_SQL);
  /* The key column list of a PRIMARY KEY clause must be parenthesised. */
  statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY (a)) ENGINE=InnoDB");
  statement->set_start_timestamp(getNanoTimestamp());
  statement->set_end_timestamp(getNanoTimestamp());
}
114
/**
 * Appends a RAW_SQL statement to the transaction which creates table t3
 * (INTEGER primary key column "a" plus a BLOB column "b").
 *
 * The statement's start and end timestamps are both taken from
 * getNanoTimestamp(), one call after the other.
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doCreateTable3(message::Transaction &transaction)
{
  message::Statement *statement= transaction.add_statement();

  statement->set_type(message::Statement::RAW_SQL);
  /* The key column list of a PRIMARY KEY clause must be parenthesised. */
  statement->set_sql("CREATE TABLE t3 (a INTEGER NOT NULL, b BLOB NOT NULL, PRIMARY KEY (a)) ENGINE=InnoDB");
  statement->set_start_timestamp(getNanoTimestamp());
  statement->set_end_timestamp(getNanoTimestamp());
}
124
/**
 * Appends an INSERT statement to the transaction which adds two rows
 * ("1" and "2") to the VARCHAR column "a" of test.t1.
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doSimpleInsert(message::Transaction &transaction)
{
  message::Statement *stmt= transaction.add_statement();

  /* Attributes common to every statement type */
  stmt->set_type(message::Statement::INSERT);
  stmt->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
  stmt->set_start_timestamp(getNanoTimestamp());

  /* INSERT header: target table and the inserted column */
  message::InsertHeader *insert_header= stmt->mutable_insert_header();

  message::TableMetadata *table= insert_header->mutable_table_metadata();
  table->set_schema_name("test");
  table->set_table_name("t1");

  message::FieldMetadata *column= insert_header->add_field_metadata();
  column->set_name("a");
  column->set_type(message::Table::Field::VARCHAR);

  /* INSERT data: one segment, flagged as the last, with one record per row */
  message::InsertData *rows= stmt->mutable_insert_data();
  rows->set_segment_id(1);
  rows->set_end_segment(true);

  rows->add_record()->add_insert_value("1");
  rows->add_record()->add_insert_value("2");

  stmt->set_end_timestamp(getNanoTimestamp());
}
159
/**
 * Appends an INSERT statement to the transaction which adds two rows
 * (1 and 2) to the INTEGER column "a" of test.t2. The values are still
 * recorded as the strings "1" and "2".
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doNonVarcharInsert(message::Transaction &transaction)
{
  message::Statement *stmt= transaction.add_statement();

  /* Attributes common to every statement type */
  stmt->set_type(message::Statement::INSERT);
  stmt->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
  stmt->set_start_timestamp(getNanoTimestamp());

  /* INSERT header: target table and the inserted column */
  message::InsertHeader *insert_header= stmt->mutable_insert_header();

  message::TableMetadata *table= insert_header->mutable_table_metadata();
  table->set_schema_name("test");
  table->set_table_name("t2");

  message::FieldMetadata *column= insert_header->add_field_metadata();
  column->set_name("a");
  column->set_type(message::Table::Field::INTEGER);

  /* INSERT data: one segment, flagged as the last, with one record per row */
  message::InsertData *rows= stmt->mutable_insert_data();
  rows->set_segment_id(1);
  rows->set_end_segment(true);

  rows->add_record()->add_insert_value("1");
  rows->add_record()->add_insert_value("2");

  stmt->set_end_timestamp(getNanoTimestamp());
}
194
/**
 * Appends an INSERT statement to the transaction which adds one row to
 * test.t3 whose BLOB column "b" holds a value with an embedded NUL byte.
 *
 * Both the SQL text and the BLOB value are passed with an explicit length
 * so that the bytes following the embedded '\0' are not cut off.
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doBlobInsert(message::Transaction &transaction)
{
  message::Statement *stmt= transaction.add_statement();

  /* Attributes common to every statement type */
  stmt->set_type(message::Statement::INSERT);
  stmt->set_sql("INSERT INTO t3 (a, b) VALUES (1, 'test\0me')", 43); /* 43 bytes, embedded \0 included */
  stmt->set_start_timestamp(getNanoTimestamp());

  /* INSERT header: target table followed by both inserted columns */
  message::InsertHeader *insert_header= stmt->mutable_insert_header();

  message::TableMetadata *table= insert_header->mutable_table_metadata();
  table->set_schema_name("test");
  table->set_table_name("t3");

  message::FieldMetadata *int_column= insert_header->add_field_metadata();
  int_column->set_name("a");
  int_column->set_type(message::Table::Field::INTEGER);

  message::FieldMetadata *blob_column= insert_header->add_field_metadata();
  blob_column->set_name("b");
  blob_column->set_type(message::Table::Field::BLOB);

  /* INSERT data: one segment, flagged as the last, holding a single record */
  message::InsertData *rows= stmt->mutable_insert_data();
  rows->set_segment_id(1);
  rows->set_end_segment(true);

  message::InsertRecord *row= rows->add_record();
  row->add_insert_value("1");
  row->add_insert_value("test\0me", 7); /* 7 bytes, embedded \0 included */

  stmt->set_end_timestamp(getNanoTimestamp());
}
232
/**
 * Appends a DELETE statement to the transaction which removes the row of
 * test.t1 identified by key column "a" == "1".
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doSimpleDelete(message::Transaction &transaction)
{
  message::Statement *stmt= transaction.add_statement();

  /* Attributes common to every statement type */
  stmt->set_type(message::Statement::DELETE);
  stmt->set_sql("DELETE FROM t1 WHERE a = \"1\"");
  stmt->set_start_timestamp(getNanoTimestamp());

  /* DELETE header: target table and the key column */
  message::DeleteHeader *delete_header= stmt->mutable_delete_header();

  message::TableMetadata *table= delete_header->mutable_table_metadata();
  table->set_schema_name("test");
  table->set_table_name("t1");

  message::FieldMetadata *key_column= delete_header->add_key_field_metadata();
  key_column->set_name("a");
  key_column->set_type(message::Table::Field::VARCHAR);

  /* DELETE data: one segment, flagged as the last, holding a single key */
  message::DeleteData *keys= stmt->mutable_delete_data();
  keys->set_segment_id(1);
  keys->set_end_segment(true);

  keys->add_record()->add_key_value("1");

  stmt->set_end_timestamp(getNanoTimestamp());
}
265
/**
 * Appends an UPDATE statement to the transaction which changes column "a"
 * of test.t1 to "5" for the row whose key "a" is "1".
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doSimpleUpdate(message::Transaction &transaction)
{
  message::Statement *stmt= transaction.add_statement();

  /* Attributes common to every statement type */
  stmt->set_type(message::Statement::UPDATE);
  stmt->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
  stmt->set_start_timestamp(getNanoTimestamp());

  /* UPDATE header: target table, then key column, then modified column */
  message::UpdateHeader *update_header= stmt->mutable_update_header();

  message::TableMetadata *table= update_header->mutable_table_metadata();
  table->set_schema_name("test");
  table->set_table_name("t1");

  message::FieldMetadata *key_column= update_header->add_key_field_metadata();
  key_column->set_name("a");
  key_column->set_type(message::Table::Field::VARCHAR);

  message::FieldMetadata *set_column= update_header->add_set_field_metadata();
  set_column->set_name("a");
  set_column->set_type(message::Table::Field::VARCHAR);

  /* UPDATE data: one segment, flagged as the last, holding a single record */
  message::UpdateData *changes= stmt->mutable_update_data();
  changes->set_segment_id(1);
  changes->set_end_segment(true);

  message::UpdateRecord *change= changes->add_record();
  change->add_after_value("5");
  change->add_key_value("1");

  stmt->set_end_timestamp(getNanoTimestamp());
}
303
/**
 * Appends an UPDATE statement to the transaction which touches two rows of
 * test.t1: the rows keyed "1" and "2" both get column "a" set to "5".
 *
 * @param transaction  Transaction message the statement is added to
 */
static void doMultiKeyUpdate(message::Transaction &transaction)
{
  message::Statement *stmt= transaction.add_statement();

  /* Attributes common to every statement type */
  stmt->set_type(message::Statement::UPDATE);
  stmt->set_sql("UPDATE t1 SET a = \"5\"");
  stmt->set_start_timestamp(getNanoTimestamp());

  /* UPDATE header: target table, then key column, then modified column */
  message::UpdateHeader *update_header= stmt->mutable_update_header();

  message::TableMetadata *table= update_header->mutable_table_metadata();
  table->set_schema_name("test");
  table->set_table_name("t1");

  message::FieldMetadata *key_column= update_header->add_key_field_metadata();
  key_column->set_name("a");
  key_column->set_type(message::Table::Field::VARCHAR);

  message::FieldMetadata *set_column= update_header->add_set_field_metadata();
  set_column->set_name("a");
  set_column->set_type(message::Table::Field::VARCHAR);

  /* UPDATE data: one segment, flagged as the last, one record per row */
  message::UpdateData *changes= stmt->mutable_update_data();
  changes->set_segment_id(1);
  changes->set_end_segment(true);

  message::UpdateRecord *first_change= changes->add_record();
  first_change->add_after_value("5");
  first_change->add_key_value("1");

  message::UpdateRecord *second_change= changes->add_record();
  second_change->add_after_value("5");
  second_change->add_key_value("2");

  stmt->set_end_timestamp(getNanoTimestamp());
}
344
/*
 * Opening lines of the CodedOutputStream-based writeTransaction(): an
 * empty serialization buffer is declared and the transaction's context is
 * completed via finalizeTransactionContext().
 *
 * NOTE(review): the statements that follow here belong to other function
 * definitions; the remainder of this function's body is not contiguous
 * with this header -- confirm against the authoritative file.
 */
static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
346
std::string buffer("");
347
finalizeTransactionContext(transaction);
38
/**
 * Turns the supplied Command message into a COMMIT record carrying the
 * value of getNanoTimestamp() and a transaction context built from the
 * file-level server_id and transaction_id.
 *
 * @param record  Command message to populate
 */
void writeCommit(drizzled::message::Command &record)
{
  record.set_type(Command::COMMIT);
  record.set_timestamp(getNanoTimestamp());

  drizzled::message::TransactionContext &context= *record.mutable_transaction_context();
  context.set_server_id(server_id);
  context.set_transaction_id(transaction_id);
}
48
/**
 * Turns the supplied Command message into a ROLLBACK record carrying the
 * value of getNanoTimestamp() and a transaction context built from the
 * file-level server_id and transaction_id.
 *
 * @param record  Command message to populate
 */
void writeRollback(drizzled::message::Command &record)
{
  record.set_type(Command::ROLLBACK);
  record.set_timestamp(getNanoTimestamp());

  drizzled::message::TransactionContext &context= *record.mutable_transaction_context();
  context.set_server_id(server_id);
  context.set_transaction_id(transaction_id);
}
58
/**
 * Turns the supplied Command message into a START_TRANSACTION record
 * carrying the value of getNanoTimestamp() and a transaction context built
 * from the file-level server_id and transaction_id.
 *
 * @param record  Command message to populate
 */
void writeStartTransaction(drizzled::message::Command &record)
{
  record.set_type(Command::START_TRANSACTION);
  record.set_timestamp(getNanoTimestamp());

  drizzled::message::TransactionContext &context= *record.mutable_transaction_context();
  context.set_server_id(server_id);
  context.set_transaction_id(transaction_id);
}
68
/**
 * Turns the supplied Command message into an INSERT record for test.t1
 * which inserts the values "1" and "2" into VARCHAR column "a".
 *
 * The record carries the value of getNanoTimestamp() and a transaction
 * context built from the file-level server_id and transaction_id.
 *
 * @param record  Command message to populate
 */
void writeInsert(drizzled::message::Command &record)
{
  record.set_type(Command::INSERT);
  /* Row tuples in a multi-row VALUES list are comma separated. */
  record.set_sql("INSERT INTO t1 (a) VALUES (1), (2)");
  record.set_timestamp(getNanoTimestamp());
  record.set_schema("test");
  record.set_table("t1");

  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
  trx->set_server_id(server_id);
  trx->set_transaction_id(transaction_id);

  drizzled::message::InsertRecord *irecord= record.mutable_insert_record();

  /* Add Fields and Values... */

  Table::Field *field= irecord->add_insert_field();
  /* Name the column, as the DELETE and UPDATE writers do for their fields. */
  field->set_name("a");
  field->set_type(drizzled::message::Table::Field::VARCHAR);

  irecord->add_insert_value("1");
  irecord->add_insert_value("2");
}
92
/**
 * Turns the supplied Command message into a DELETE record for test.t1
 * whose WHERE clause matches VARCHAR column "a" against the value "1".
 *
 * The record carries the value of getNanoTimestamp() and a transaction
 * context built from the file-level server_id and transaction_id.
 *
 * @param record  Command message to populate
 */
void writeDeleteWithPK(drizzled::message::Command &record)
{
  record.set_type(Command::DELETE);
  record.set_sql("DELETE FROM t1 WHERE a = 1");
  record.set_timestamp(getNanoTimestamp());
  record.set_schema("test");
  record.set_table("t1");

  drizzled::message::TransactionContext &context= *record.mutable_transaction_context();
  context.set_server_id(server_id);
  context.set_transaction_id(transaction_id);

  drizzled::message::DeleteRecord &deletion= *record.mutable_delete_record();

  /* WHERE column, then the value it is compared against */
  Table::Field *where_column= deletion.add_where_field();
  where_column->set_name("a");
  where_column->set_type(drizzled::message::Table::Field::VARCHAR);

  deletion.add_where_value("1");
}
113
/**
 * Turns the supplied Command message into an UPDATE record for test.t1
 * which sets VARCHAR column "a" to "5" where column "a" equals "1".
 *
 * The record carries the value of getNanoTimestamp() and a transaction
 * context built from the file-level server_id and transaction_id.
 *
 * @param record  Command message to populate
 */
void writeUpdateWithPK(drizzled::message::Command &record)
{
  record.set_type(Command::UPDATE);
  record.set_sql("UPDATE t1 SET a = 5 WHERE a = 1;");
  record.set_timestamp(getNanoTimestamp());
  record.set_schema("test");
  record.set_table("t1");

  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
  trx->set_server_id(server_id);
  trx->set_transaction_id(transaction_id);

  drizzled::message::UpdateRecord *urecord= record.mutable_update_record();

  /*
   * Each field pointer is declared and initialised where it is obtained,
   * so no pointer is ever used without a declaration or left unset.
   */
  Table::Field *update_field= urecord->add_update_field();
  update_field->set_name("a");
  update_field->set_type(drizzled::message::Table::Field::VARCHAR);

  urecord->add_after_value("5");

  Table::Field *where_field= urecord->add_where_field();
  where_field->set_name("a");
  where_field->set_type(drizzled::message::Table::Field::VARCHAR);

  urecord->add_where_value("1");
}
142
/*
 * File-descriptor based writeTransaction().
 *
 * NOTE(review): the statements below appear to mix two implementations --
 * one writing through a CodedOutputStream named `output`, one writing to
 * the descriptor `file` with write() -- and the declarations of `buffer`
 * and `written` are not visible here. Confirm against the authoritative
 * file before changing any code in this span.
 */
void writeTransaction(int file, drizzled::message::Transaction &transaction)
148
/* Stamp the transaction context with the file-level server and transaction ids. */
drizzled::message::TransactionContext *trx= transaction.mutable_transaction_context();
149
trx->set_server_id(server_id);
150
trx->set_transaction_id(transaction_id);
348
152
/* Serialize the whole message into `buffer`. */
transaction.SerializeToString(&buffer);
350
size_t length= buffer.length();
352
/* Stream framing: type tag, payload length, payload bytes, then a crc32 of the payload. */
output->WriteLittleEndian32(static_cast<uint32_t>(ReplicationServices::TRANSACTION));
353
output->WriteLittleEndian32(static_cast<uint32_t>(length));
354
output->WriteString(buffer);
355
output->WriteLittleEndian32(drizzled::algorithm::crc32(buffer.c_str(), length)); /* checksum */
154
length= buffer.length();
156
cout << "Writing transaction of " << length << " length." << endl;
158
/* Descriptor framing: sizeof(uint64_t) bytes read from `length`, then the payload. */
/* NOTE(review): on a short write the message below reports `length`, not sizeof(uint64_t), as the expected byte count. */
if ((written= write(file, &length, sizeof(uint64_t))) != sizeof(uint64_t))
160
cerr << "Only wrote " << written << " out of " << length << "." << endl;
164
/* Write the serialized payload itself and report a short write on cerr. */
if ((written= write(file, buffer.c_str(), length)) != length)
166
cerr << "Only wrote " << written << " out of " << length << "." << endl;
358
171
int main(int argc, char* argv[])
365
fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
178
cerr << "Usage: " << argv[0] << " TRANSACTION_LOG" << endl;
369
182
if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
371
fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
184
cerr << "Can not open file: " << argv[0] << endl;
375
protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
376
protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
378
/* Write a series of statements which test each type of Statement */
379
message::Transaction transaction;
381
/* Simple CREATE TABLE statements as raw sql */
382
initTransactionContext(transaction);
383
doCreateTable1(transaction);
384
writeTransaction(coded_output, transaction);
387
initTransactionContext(transaction);
388
doCreateTable2(transaction);
389
writeTransaction(coded_output, transaction);
188
/* Write a series of statements which test each type of record class */
392
191
/* Simple INSERT statement */
393
initTransactionContext(transaction);
394
doSimpleInsert(transaction);
395
writeTransaction(coded_output, transaction);
192
Transaction transaction;
193
transaction.set_start_timestamp(getNanoTimestamp());
194
writeStartTransaction(*transaction.add_command());
195
writeInsert(*transaction.add_command());
196
writeCommit(*transaction.add_command());
197
transaction.set_end_timestamp(getNanoTimestamp());
199
writeTransaction(file, transaction);
396
201
transaction.Clear();
398
203
/* Write a DELETE and an UPDATE in one transaction */
399
initTransactionContext(transaction);
400
doSimpleDelete(transaction);
401
doSimpleUpdate(transaction);
402
writeTransaction(coded_output, transaction);
405
/* Test an INSERT into non-varchar columns */
406
initTransactionContext(transaction);
407
doNonVarcharInsert(transaction);
408
writeTransaction(coded_output, transaction);
411
/* Write an UPDATE which affects >1 row */
412
initTransactionContext(transaction);
413
doMultiKeyUpdate(transaction);
414
writeTransaction(coded_output, transaction);
417
/* Write an INSERT which writes BLOB data */
418
initTransactionContext(transaction);
419
doCreateTable3(transaction);
420
doBlobInsert(transaction);
421
writeTransaction(coded_output, transaction);
205
transaction.set_start_timestamp(getNanoTimestamp());
206
writeStartTransaction(*transaction.add_command());
207
writeDeleteWithPK(*transaction.add_command());
208
writeUpdateWithPK(*transaction.add_command());
209
writeCommit(*transaction.add_command());
210
transaction.set_end_timestamp(getNanoTimestamp());
212
writeTransaction(file, transaction);