1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2009 Sun Microsystems, Inc.
8
* Jay Pipes <joinfu@sun.com>
10
* This program is free software; you can redistribute it and/or modify
11
* it under the terms of the GNU General Public License as published by
12
* the Free Software Foundation; version 2 of the License.
14
* This program is distributed in the hope that it will be useful,
15
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
* GNU General Public License for more details.
19
* You should have received a copy of the GNU General Public License
20
* along with this program; if not, write to the Free Software
21
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
#include "drizzled/algorithm/crc32.h"
26
#include "drizzled/gettext.h"
27
#include "drizzled/replication_services.h"
29
4
#include <sys/types.h>
30
5
#include <sys/stat.h>
36
#if TIME_WITH_SYS_TIME
37
# include <sys/time.h>
41
# include <sys/time.h>
47
#include <drizzled/message/transaction.pb.h>
49
#include <google/protobuf/io/coded_stream.h>
50
#include <google/protobuf/io/zero_copy_stream_impl.h>
52
#include "drizzled/gettext.h"
55
* @file Example script for writing transactions to a log file.
10
#include <drizzled/serialize/replication_event.pb.h>
58
12
using namespace std;
59
using namespace drizzled;
60
using namespace google;
62
/* Server id stamped into every transaction context by initTransactionContext(). */
static uint32_t server_id= 1;
63
/* Next transaction id to hand out; post-incremented by initTransactionContext(). */
static uint64_t transaction_id= 1;
65
/**
 * Returns the current wall-clock time as nanoseconds since the Unix epoch.
 *
 * Uses clock_gettime(CLOCK_REALTIME) when HAVE_CLOCK_GETTIME is defined and
 * falls back to gettimeofday() otherwise, in which case the microsecond
 * part is scaled up to nanoseconds.
 *
 * @return Nanoseconds elapsed since 1970-01-01 00:00:00 UTC.
 */
static uint64_t getNanoTimestamp()
{
  /* One second is 10^9 nanoseconds; a smaller factor makes the seconds and
   * sub-second parts overlap and yields a value that is not nanoseconds. */
  static const uint64_t NANOS_PER_SECOND= 1000000000ULL;
#ifdef HAVE_CLOCK_GETTIME
  struct timespec tp;
  clock_gettime(CLOCK_REALTIME, &tp);
  return static_cast<uint64_t>(tp.tv_sec) * NANOS_PER_SECOND
       + static_cast<uint64_t>(tp.tv_nsec);
#else
  struct timeval tv;
  gettimeofday(&tv, NULL);
  /* tv_usec is in microseconds: 1 microsecond == 1000 nanoseconds. */
  return static_cast<uint64_t>(tv.tv_sec) * NANOS_PER_SECOND
       + static_cast<uint64_t>(tv.tv_usec) * 1000;
#endif
}
80
/**
 * Prepares the context of a transaction that is about to be filled in:
 * assigns the next transaction id, records the current time as the start
 * timestamp and stamps the writer's server id.
 */
static void initTransactionContext(message::Transaction &transaction)
{
  message::TransactionContext &context= *transaction.mutable_transaction_context();

  context.set_transaction_id(transaction_id++);
  context.set_start_timestamp(getNanoTimestamp());
  context.set_server_id(server_id);
}
88
/**
 * Closes out a transaction's context by recording the current time as its
 * end timestamp.
 */
static void finalizeTransactionContext(message::Transaction &transaction)
{
  message::TransactionContext &context= *transaction.mutable_transaction_context();

  context.set_end_timestamp(getNanoTimestamp());
}
94
/**
 * Appends a RAW_SQL statement to the transaction that creates table t1
 * (single VARCHAR column "a").
 */
static void doCreateTable1(message::Transaction &transaction)
{
  message::Statement &raw_sql= *transaction.add_statement();

  raw_sql.set_type(message::Statement::RAW_SQL);
  raw_sql.set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
  raw_sql.set_start_timestamp(getNanoTimestamp());
  raw_sql.set_end_timestamp(getNanoTimestamp());
}
104
/**
 * Appends a RAW_SQL statement to the transaction that creates table t2
 * (single INTEGER column "a").
 */
static void doCreateTable2(message::Transaction &transaction)
{
  message::Statement &raw_sql= *transaction.add_statement();

  raw_sql.set_type(message::Statement::RAW_SQL);
  raw_sql.set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
  raw_sql.set_start_timestamp(getNanoTimestamp());
  raw_sql.set_end_timestamp(getNanoTimestamp());
}
114
/**
 * Appends a RAW_SQL statement to the transaction that creates table t3
 * (INTEGER column "a" and BLOB column "b").
 */
static void doCreateTable3(message::Transaction &transaction)
{
  message::Statement &raw_sql= *transaction.add_statement();

  raw_sql.set_type(message::Statement::RAW_SQL);
  raw_sql.set_sql("CREATE TABLE t3 (a INTEGER NOT NULL, b BLOB NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
  raw_sql.set_start_timestamp(getNanoTimestamp());
  raw_sql.set_end_timestamp(getNanoTimestamp());
}
124
/**
 * Appends an INSERT statement to the transaction that adds two rows
 * ("1" and "2") to the VARCHAR column "a" of test.t1.
 */
static void doSimpleInsert(message::Transaction &transaction)
{
  message::Statement &stmt= *transaction.add_statement();

  /* Attributes common to every statement type */
  stmt.set_type(message::Statement::INSERT);
  stmt.set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
  stmt.set_start_timestamp(getNanoTimestamp());

  /* INSERT header: the target table and the inserted column */
  message::InsertHeader &insert_header= *stmt.mutable_insert_header();

  message::TableMetadata &table= *insert_header.mutable_table_metadata();
  table.set_schema_name("test");
  table.set_table_name("t1");

  message::FieldMetadata &column= *insert_header.add_field_metadata();
  column.set_name("a");
  column.set_type(message::Table::Field::VARCHAR);

  /* INSERT data: a single segment, marked as the last one, with two records */
  message::InsertData &insert_data= *stmt.mutable_insert_data();
  insert_data.set_segment_id(1);
  insert_data.set_end_segment(true);

  message::InsertRecord &first_row= *insert_data.add_record();
  message::InsertRecord &second_row= *insert_data.add_record();

  first_row.add_insert_value("1");
  second_row.add_insert_value("2");

  stmt.set_end_timestamp(getNanoTimestamp());
}
159
/**
 * Appends an INSERT statement to the transaction that adds two rows
 * (1 and 2) to the INTEGER column "a" of test.t2. The values are still
 * supplied as strings.
 */
static void doNonVarcharInsert(message::Transaction &transaction)
{
  message::Statement &stmt= *transaction.add_statement();

  /* Attributes common to every statement type */
  stmt.set_type(message::Statement::INSERT);
  stmt.set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
  stmt.set_start_timestamp(getNanoTimestamp());

  /* INSERT header: the target table and the inserted column */
  message::InsertHeader &insert_header= *stmt.mutable_insert_header();

  message::TableMetadata &table= *insert_header.mutable_table_metadata();
  table.set_schema_name("test");
  table.set_table_name("t2");

  message::FieldMetadata &column= *insert_header.add_field_metadata();
  column.set_name("a");
  column.set_type(message::Table::Field::INTEGER);

  /* INSERT data: a single segment, marked as the last one, with two records */
  message::InsertData &insert_data= *stmt.mutable_insert_data();
  insert_data.set_segment_id(1);
  insert_data.set_end_segment(true);

  message::InsertRecord &first_row= *insert_data.add_record();
  message::InsertRecord &second_row= *insert_data.add_record();

  first_row.add_insert_value("1");
  second_row.add_insert_value("2");

  stmt.set_end_timestamp(getNanoTimestamp());
}
194
/**
 * Appends an INSERT statement to the transaction that adds one row to
 * test.t3, whose BLOB column "b" receives a value containing an embedded
 * NUL byte. Explicit lengths are passed wherever the data contains \0 so
 * the string is not cut short at the NUL.
 */
static void doBlobInsert(message::Transaction &transaction)
{
  message::Statement &stmt= *transaction.add_statement();

  /* Attributes common to every statement type */
  stmt.set_type(message::Statement::INSERT);
  stmt.set_sql("INSERT INTO t3 (a, b) VALUES (1, 'test\0me')", 43); /* 43 == length including \0 */
  stmt.set_start_timestamp(getNanoTimestamp());

  /* INSERT header: the target table and both inserted columns */
  message::InsertHeader &insert_header= *stmt.mutable_insert_header();

  message::TableMetadata &table= *insert_header.mutable_table_metadata();
  table.set_schema_name("test");
  table.set_table_name("t3");

  message::FieldMetadata &int_column= *insert_header.add_field_metadata();
  int_column.set_name("a");
  int_column.set_type(message::Table::Field::INTEGER);

  message::FieldMetadata &blob_column= *insert_header.add_field_metadata();
  blob_column.set_name("b");
  blob_column.set_type(message::Table::Field::BLOB);

  /* INSERT data: a single segment, marked as the last one, with one record */
  message::InsertData &insert_data= *stmt.mutable_insert_data();
  insert_data.set_segment_id(1);
  insert_data.set_end_segment(true);

  message::InsertRecord &row= *insert_data.add_record();

  row.add_insert_value("1");
  row.add_insert_value("test\0me", 7); /* 7 == length including \0 */

  stmt.set_end_timestamp(getNanoTimestamp());
}
232
/**
 * Appends a DELETE statement to the transaction that removes the row of
 * test.t1 whose key column "a" equals "1".
 */
static void doSimpleDelete(message::Transaction &transaction)
{
  message::Statement &stmt= *transaction.add_statement();

  /* Attributes common to every statement type */
  stmt.set_type(message::Statement::DELETE);
  stmt.set_sql("DELETE FROM t1 WHERE a = \"1\"");
  stmt.set_start_timestamp(getNanoTimestamp());

  /* DELETE header: the target table and its key column */
  message::DeleteHeader &delete_header= *stmt.mutable_delete_header();

  message::TableMetadata &table= *delete_header.mutable_table_metadata();
  table.set_schema_name("test");
  table.set_table_name("t1");

  message::FieldMetadata &key_column= *delete_header.add_key_field_metadata();
  key_column.set_name("a");
  key_column.set_type(message::Table::Field::VARCHAR);

  /* DELETE data: a single segment, marked as the last one, with one record */
  message::DeleteData &delete_data= *stmt.mutable_delete_data();
  delete_data.set_segment_id(1);
  delete_data.set_end_segment(true);

  message::DeleteRecord &row= *delete_data.add_record();

  row.add_key_value("1");

  stmt.set_end_timestamp(getNanoTimestamp());
}
265
/**
 * Appends an UPDATE statement to the transaction that changes column "a"
 * of test.t1 to "5" for the row whose key "a" equals "1".
 */
static void doSimpleUpdate(message::Transaction &transaction)
{
  message::Statement &stmt= *transaction.add_statement();

  /* Attributes common to every statement type */
  stmt.set_type(message::Statement::UPDATE);
  stmt.set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
  stmt.set_start_timestamp(getNanoTimestamp());

  /* UPDATE header: the target table, its key column and the column being set */
  message::UpdateHeader &update_header= *stmt.mutable_update_header();

  message::TableMetadata &table= *update_header.mutable_table_metadata();
  table.set_schema_name("test");
  table.set_table_name("t1");

  message::FieldMetadata &key_column= *update_header.add_key_field_metadata();
  key_column.set_name("a");
  key_column.set_type(message::Table::Field::VARCHAR);

  message::FieldMetadata &set_column= *update_header.add_set_field_metadata();
  set_column.set_name("a");
  set_column.set_type(message::Table::Field::VARCHAR);

  /* UPDATE data: a single segment, marked as the last one, with one record */
  message::UpdateData &update_data= *stmt.mutable_update_data();
  update_data.set_segment_id(1);
  update_data.set_end_segment(true);

  message::UpdateRecord &row= *update_data.add_record();

  row.add_after_value("5");
  row.add_key_value("1");

  stmt.set_end_timestamp(getNanoTimestamp());
}
303
/**
 * Appends an UPDATE statement to the transaction that touches more than
 * one row: column "a" of test.t1 becomes "5" for the rows keyed "1" and "2".
 */
static void doMultiKeyUpdate(message::Transaction &transaction)
{
  message::Statement &stmt= *transaction.add_statement();

  /* Attributes common to every statement type */
  stmt.set_type(message::Statement::UPDATE);
  stmt.set_sql("UPDATE t1 SET a = \"5\"");
  stmt.set_start_timestamp(getNanoTimestamp());

  /* UPDATE header: the target table, its key column and the column being set */
  message::UpdateHeader &update_header= *stmt.mutable_update_header();

  message::TableMetadata &table= *update_header.mutable_table_metadata();
  table.set_schema_name("test");
  table.set_table_name("t1");

  message::FieldMetadata &key_column= *update_header.add_key_field_metadata();
  key_column.set_name("a");
  key_column.set_type(message::Table::Field::VARCHAR);

  message::FieldMetadata &set_column= *update_header.add_set_field_metadata();
  set_column.set_name("a");
  set_column.set_type(message::Table::Field::VARCHAR);

  /* UPDATE data: a single segment, marked as the last one, with two records */
  message::UpdateData &update_data= *stmt.mutable_update_data();
  update_data.set_segment_id(1);
  update_data.set_end_segment(true);

  message::UpdateRecord &first_row= *update_data.add_record();
  message::UpdateRecord &second_row= *update_data.add_record();

  first_row.add_after_value("5");
  first_row.add_key_value("1");
  second_row.add_after_value("5");
  second_row.add_key_value("2");

  stmt.set_end_timestamp(getNanoTimestamp());
}
344
/**
 * Finalizes the transaction and writes it to the output stream as one log
 * entry made of four parts, in this order:
 *   1. entry type (ReplicationServices::TRANSACTION), 32-bit little-endian
 *   2. length of the serialized message, 32-bit little-endian
 *   3. the serialized Transaction message itself
 *   4. crc32 checksum of the serialized message, 32-bit little-endian
 *
 * @param output       Coded stream the entry is written to.
 * @param transaction  Transaction to finalize and serialize.
 */
static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
{
  finalizeTransactionContext(transaction);

  std::string serialized;
  transaction.SerializeToString(&serialized);

  const size_t length= serialized.length();
  const uint32_t entry_type= static_cast<uint32_t>(ReplicationServices::TRANSACTION);

  output->WriteLittleEndian32(entry_type);
  output->WriteLittleEndian32(static_cast<uint32_t>(length));
  output->WriteString(serialized);
  output->WriteLittleEndian32(drizzled::algorithm::crc32(serialized.c_str(), length)); /* checksum */
}
13
using namespace drizzle;
15
/* Query id counter; post-incremented each time a write_* function fills in an event. */
static uint64_t query_id= 0;
16
/*
 * Text buffer that write_ddl() fills via uuid_unparse(); presumably sized for
 * a 36-character UUID string plus the terminating NUL.
 * NOTE(review): this name is also declared earlier in the visible source as
 * `static uint64_t transaction_id` - the two look like different revisions of
 * the file; confirm which one is meant to remain.
 */
char transaction_id[37];
19
Example script for writing a Drizzle master replication list.
22
/*
 * Fills in `record` as a DDL event. A time-based UUID is generated and
 * unparsed into the global transaction_id buffer, which is then used as the
 * event's transaction id; type, autocommit, server id, query id and schema
 * are set to fixed example values.
 *
 * NOTE(review): no visible line uses the `sql` parameter or declares `uu` -
 * confirm against the complete file.
 */
void write_ddl(::drizzle::Event *record, const char *sql)
26
uuid_generate_time(uu);
27
/* Store the textual form of the UUID in the global transaction_id buffer. */
uuid_unparse(uu, transaction_id);
29
record->set_type(Event::DDL);
30
record->set_autocommit(true);
31
record->set_server_id("localhost");
32
/* Post-increment: this event gets the current id, the next event the following one. */
record->set_query_id(query_id++);
33
record->set_transaction_id(transaction_id);
34
record->set_schema("test");
38
/*
 * Fills in `record` as an INSERT event on test.t1 belonging to transaction
 * `trx`: one field name ("a") and two value rows ("1" and "2").
 *
 * NOTE(review): the declaration of `value` is not visible here.
 */
void write_insert(::drizzle::Event *record, const char *trx)
42
record->set_type(Event::INSERT);
43
record->set_autocommit(true);
44
record->set_server_id("localhost");
45
record->set_query_id(query_id++);
46
record->set_transaction_id(trx);
47
record->set_schema("test");
48
record->set_table("t1");
49
/* NOTE(review): the SQL text has no comma between the two value tuples - confirm whether that is intended. */
record->set_sql("INSERT INTO t1 (a) VALUES (1) (2)");
52
record->add_field_names("a");
54
/* Add values (first row) */
55
value= record->add_values();
56
value->add_value("1");
58
/* Add values (second row) */
59
value= record->add_values();
60
value->add_value("2");
63
/*
 * Fills in `record` as a DELETE event on test.t1 belonging to transaction
 * `trx`, with "a" as the primary key and a single values entry holding the
 * two key values ("1" and "2") named in the IN() list of the SQL text.
 *
 * NOTE(review): the declaration of `value` is not visible here.
 */
void write_delete(::drizzle::Event *record, const char *trx)
67
record->set_type(Event::DELETE);
68
record->set_autocommit(true);
69
record->set_server_id("localhost");
70
record->set_query_id(query_id++);
71
record->set_transaction_id(trx);
72
record->set_schema("test");
73
record->set_table("t1");
74
record->set_sql("DELETE FROM t1 WHERE a IN (1, 2)");
77
record->set_primary_key("a");
79
/* Add values for IN() */
80
value= record->add_values();
81
value->add_value("1");
82
value->add_value("2");
85
/*
 * Fills in `record` as an UPDATE event on test.t1 belonging to transaction
 * `trx`, with "a" as both the primary key and the updated field. Two value
 * rows are added; in each, the first value is the key to match and the
 * second is the new field value.
 *
 * NOTE(review): the declaration of `value` is not visible here.
 */
void write_update(::drizzle::Event *record, const char *trx)
89
record->set_type(Event::UPDATE);
90
record->set_autocommit(true);
91
record->set_server_id("localhost");
92
record->set_query_id(query_id++);
93
record->set_transaction_id(trx);
94
record->set_schema("test");
95
record->set_table("t1");
96
/* NOTE(review): the SQL text only covers a = 1 -> 5, while a second row (2 -> 6) is added below - confirm. */
record->set_sql("UPDATE t1 SET a=5 WHERE a = 1 ");
97
record->set_primary_key("a");
100
record->add_field_names("a");
102
/* Add values (first row) */
103
value= record->add_values();
104
value->add_value("1"); // The first value is always the primary key comparison value
105
value->add_value("5");
107
/* Add values (second row) */
108
value= record->add_values();
109
value->add_value("2");
110
value->add_value("6");
113
/*
 * Serializes `list` and writes it to the file descriptor `file` as a
 * length prefix followed by the serialized bytes. Progress goes to cout and
 * short writes are reported on cerr.
 *
 * NOTE(review): the declarations of `buffer`, `length` and `written` are not
 * visible here, nor is whatever follows each failed-write message.
 */
void write_to_disk(int file, ::drizzle::EventList *list)
119
list->SerializePartialToString(&buffer);
121
length= buffer.length();
123
cout << "Writing record of " << length << "." << endl;
125
/*
 * Length prefix: sizeof(uint64_t) bytes are read from &length.
 * NOTE(review): assumes `length` is at least 64 bits wide - confirm its declared type.
 */
if ((written= write(file, &length, sizeof(uint64_t))) != sizeof(uint64_t))
127
/* NOTE(review): reports `length` as the expected count, although this write expects sizeof(uint64_t) bytes. */
cerr << "Only wrote " << written << " out of " << length << "." << endl;
131
/* Payload: the serialized event list itself. */
if ((written= write(file, buffer.c_str(), length)) != length)
133
cerr << "Only wrote " << written << " out of " << length << "." << endl;
358
139
int main(int argc, char* argv[])
365
fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
146
cerr << "Usage: " << argv[0] << " REPLICATION_EVENT_LOG " << endl;
369
150
if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
371
fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
152
cerr << "Can not open file: " << argv[0] << endl;
375
protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
376
protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
378
/* Write a series of statements which test each type of Statement */
379
message::Transaction transaction;
381
/* Simple CREATE TABLE statements as raw sql */
382
initTransactionContext(transaction);
383
doCreateTable1(transaction);
384
writeTransaction(coded_output, transaction);
387
initTransactionContext(transaction);
388
doCreateTable2(transaction);
389
writeTransaction(coded_output, transaction);
392
/* Simple INSERT statement */
393
initTransactionContext(transaction);
394
doSimpleInsert(transaction);
395
writeTransaction(coded_output, transaction);
398
/* Write a DELETE and an UPDATE in one transaction */
399
initTransactionContext(transaction);
400
doSimpleDelete(transaction);
401
doSimpleUpdate(transaction);
402
writeTransaction(coded_output, transaction);
405
/* Test an INSERT into non-varchar columns */
406
initTransactionContext(transaction);
407
doNonVarcharInsert(transaction);
408
writeTransaction(coded_output, transaction);
411
/* Write an UPDATE which affects >1 row */
412
initTransactionContext(transaction);
413
doMultiKeyUpdate(transaction);
414
writeTransaction(coded_output, transaction);
417
/* Write an INSERT which writes BLOB data */
418
initTransactionContext(transaction);
419
doCreateTable3(transaction);
420
doBlobInsert(transaction);
421
writeTransaction(coded_output, transaction);
158
/* Write first set of records */
159
write_ddl(list.add_event(), "CREATE TABLE A (a int) ENGINE=innodb");
160
write_insert(list.add_event(), transaction_id);
162
write_to_disk(file, &list);
164
/* Write Second set of records */
165
write_ddl(list.add_event(), "CREATE TABLE A (a int) ENGINE=innodb");
166
write_delete(list.add_event(), transaction_id);
167
write_update(list.add_event(), transaction_id);
169
write_to_disk(file, &list);