~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/serialize/replication_event_writer.cc

  • Committer: Brian Aker
  • Date: 2009-02-21 00:18:15 UTC
  • Revision ID: brian@tangent.org-20090221001815-x20e8h71e984lvs1
Completion (?) of uint conversion.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2009 Sun Microsystems, Inc.
5
 
 *
6
 
 *  Authors:
7
 
 *
8
 
 *    Jay Pipes <joinfu@sun.com>
9
 
 *
10
 
 *  This program is free software; you can redistribute it and/or modify
11
 
 *  it under the terms of the GNU General Public License as published by
12
 
 *  the Free Software Foundation; version 2 of the License.
13
 
 *
14
 
 *  This program is distributed in the hope that it will be useful,
15
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 
 *  GNU General Public License for more details.
18
 
 *
19
 
 *  You should have received a copy of the GNU General Public License
20
 
 *  along with this program; if not, write to the Free Software
21
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
22
 
 */
23
 
 
24
 
#include "config.h"
25
 
#include "drizzled/algorithm/crc32.h"
26
 
#include "drizzled/gettext.h"
27
 
#include "drizzled/replication_services.h"
28
 
 
 
1
#include <iostream>
 
2
#include <fstream>
 
3
#include <unistd.h>
29
4
#include <sys/types.h>
30
5
#include <sys/stat.h>
31
6
#include <fcntl.h>
32
7
#include <string>
33
 
#include <fstream>
34
 
#include <unistd.h>
35
 
 
36
 
#if TIME_WITH_SYS_TIME
37
 
# include <sys/time.h>
38
 
# include <time.h>
39
 
#else
40
 
# if HAVE_SYS_TIME_H
41
 
#  include <sys/time.h>
42
 
# else
43
 
#  include <time.h>
44
 
# endif
45
 
#endif
46
 
 
47
 
#include <drizzled/message/transaction.pb.h>
48
 
 
49
 
#include <google/protobuf/io/coded_stream.h>
50
 
#include <google/protobuf/io/zero_copy_stream_impl.h>
51
 
 
52
 
#include "drizzled/gettext.h"
53
 
 
54
 
/** 
55
 
 * @file Example script for writing transactions to a log file.
56
 
 */
 
8
#include <uuid/uuid.h>
 
9
 
 
10
#include <drizzled/serialize/replication_event.pb.h>
57
11
 
58
12
using namespace std;
59
 
using namespace drizzled;
60
 
using namespace google;
61
 
 
62
 
static uint32_t server_id= 1;
63
 
static uint64_t transaction_id= 1;
64
 
 
65
 
static uint64_t getNanoTimestamp()
66
 
{
67
 
#ifdef HAVE_CLOCK_GETTIME
68
 
  struct timespec tp;
69
 
  clock_gettime(CLOCK_REALTIME, &tp);
70
 
  return (uint64_t) tp.tv_sec * 10000000
71
 
       + (uint64_t) tp.tv_nsec;
72
 
#else
73
 
  struct timeval tv;
74
 
  gettimeofday(&tv,NULL);
75
 
  return (uint64_t) tv.tv_sec * 10000000
76
 
       + (uint64_t) tv.tv_usec * 1000;
77
 
#endif
78
 
}
79
 
 
80
 
static void initTransactionContext(message::Transaction &transaction)
81
 
{
82
 
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
83
 
  ctx->set_transaction_id(transaction_id++);
84
 
  ctx->set_start_timestamp(getNanoTimestamp());
85
 
  ctx->set_server_id(server_id);
86
 
}
87
 
 
88
 
static void finalizeTransactionContext(message::Transaction &transaction)
89
 
{
90
 
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
91
 
  ctx->set_end_timestamp(getNanoTimestamp());
92
 
}
93
 
 
94
 
static void doCreateTable1(message::Transaction &transaction)
95
 
{
96
 
  message::Statement *statement= transaction.add_statement();
97
 
 
98
 
  statement->set_type(message::Statement::RAW_SQL);
99
 
  statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
100
 
  statement->set_start_timestamp(getNanoTimestamp());
101
 
  statement->set_end_timestamp(getNanoTimestamp());
102
 
}
103
 
 
104
 
static void doCreateTable2(message::Transaction &transaction)
105
 
{
106
 
  message::Statement *statement= transaction.add_statement();
107
 
 
108
 
  statement->set_type(message::Statement::RAW_SQL);
109
 
  statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
110
 
  statement->set_start_timestamp(getNanoTimestamp());
111
 
  statement->set_end_timestamp(getNanoTimestamp());
112
 
}
113
 
 
114
 
static void doCreateTable3(message::Transaction &transaction)
115
 
{
116
 
  message::Statement *statement= transaction.add_statement();
117
 
 
118
 
  statement->set_type(message::Statement::RAW_SQL);
119
 
  statement->set_sql("CREATE TABLE t3 (a INTEGER NOT NULL, b BLOB NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
120
 
  statement->set_start_timestamp(getNanoTimestamp());
121
 
  statement->set_end_timestamp(getNanoTimestamp());
122
 
}
123
 
 
124
 
static void doSimpleInsert(message::Transaction &transaction)
125
 
{
126
 
  message::Statement *statement= transaction.add_statement();
127
 
 
128
 
  /* Do generic Statement setup */
129
 
  statement->set_type(message::Statement::INSERT);
130
 
  statement->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
131
 
  statement->set_start_timestamp(getNanoTimestamp());
132
 
 
133
 
  /* Do INSERT-specific header and setup */
134
 
  message::InsertHeader *header= statement->mutable_insert_header();
135
 
 
136
 
  /* Add table and field metadata for the statement */
137
 
  message::TableMetadata *t_meta= header->mutable_table_metadata();
138
 
  t_meta->set_schema_name("test");
139
 
  t_meta->set_table_name("t1");
140
 
 
141
 
  message::FieldMetadata *f_meta= header->add_field_metadata();
142
 
  f_meta->set_name("a");
143
 
  f_meta->set_type(message::Table::Field::VARCHAR);
144
 
 
145
 
  /* Add new values... */
146
 
  message::InsertData *data= statement->mutable_insert_data();
147
 
  data->set_segment_id(1);
148
 
  data->set_end_segment(true);
149
 
 
150
 
  message::InsertRecord *record1= data->add_record();
151
 
  message::InsertRecord *record2= data->add_record();
152
 
 
153
 
  record1->add_insert_value("1");
154
 
  record2->add_insert_value("2");
155
 
 
156
 
  statement->set_end_timestamp(getNanoTimestamp());
157
 
}
158
 
 
159
 
static void doNonVarcharInsert(message::Transaction &transaction)
160
 
{
161
 
  message::Statement *statement= transaction.add_statement();
162
 
 
163
 
  /* Do generic Statement setup */
164
 
  statement->set_type(message::Statement::INSERT);
165
 
  statement->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
166
 
  statement->set_start_timestamp(getNanoTimestamp());
167
 
 
168
 
  /* Do INSERT-specific header and setup */
169
 
  message::InsertHeader *header= statement->mutable_insert_header();
170
 
 
171
 
  /* Add table and field metadata for the statement */
172
 
  message::TableMetadata *t_meta= header->mutable_table_metadata();
173
 
  t_meta->set_schema_name("test");
174
 
  t_meta->set_table_name("t2");
175
 
 
176
 
  message::FieldMetadata *f_meta= header->add_field_metadata();
177
 
  f_meta->set_name("a");
178
 
  f_meta->set_type(message::Table::Field::INTEGER);
179
 
 
180
 
  /* Add new values... */
181
 
  message::InsertData *data= statement->mutable_insert_data();
182
 
  data->set_segment_id(1);
183
 
  data->set_end_segment(true);
184
 
 
185
 
  message::InsertRecord *record1= data->add_record();
186
 
  message::InsertRecord *record2= data->add_record();
187
 
 
188
 
  record1->add_insert_value("1");
189
 
  record2->add_insert_value("2");
190
 
 
191
 
  statement->set_end_timestamp(getNanoTimestamp());
192
 
}
193
 
 
194
 
static void doBlobInsert(message::Transaction &transaction)
195
 
{
196
 
  message::Statement *statement= transaction.add_statement();
197
 
 
198
 
  /* Do generic Statement setup */
199
 
  statement->set_type(message::Statement::INSERT);
200
 
  statement->set_sql("INSERT INTO t3 (a, b) VALUES (1, 'test\0me')", 43); /* 43 == length including \0 */
201
 
  statement->set_start_timestamp(getNanoTimestamp());
202
 
 
203
 
  /* Do INSERT-specific header and setup */
204
 
  message::InsertHeader *header= statement->mutable_insert_header();
205
 
 
206
 
  /* Add table and field metadata for the statement */
207
 
  message::TableMetadata *t_meta= header->mutable_table_metadata();
208
 
  t_meta->set_schema_name("test");
209
 
  t_meta->set_table_name("t3");
210
 
 
211
 
  message::FieldMetadata *f_meta= header->add_field_metadata();
212
 
  f_meta->set_name("a");
213
 
  f_meta->set_type(message::Table::Field::INTEGER);
214
 
 
215
 
  f_meta= header->add_field_metadata();
216
 
  f_meta->set_name("b");
217
 
  f_meta->set_type(message::Table::Field::BLOB);
218
 
 
219
 
  /* Add new values... */
220
 
  message::InsertData *data= statement->mutable_insert_data();
221
 
  data->set_segment_id(1);
222
 
  data->set_end_segment(true);
223
 
 
224
 
  message::InsertRecord *record1= data->add_record();
225
 
 
226
 
  record1->add_insert_value("1");
227
 
  record1->add_insert_value("test\0me", 7); /* 7 == length including \0 */
228
 
 
229
 
  statement->set_end_timestamp(getNanoTimestamp());
230
 
}
231
 
 
232
 
static void doSimpleDelete(message::Transaction &transaction)
233
 
{
234
 
  message::Statement *statement= transaction.add_statement();
235
 
 
236
 
  /* Do generic Statement setup */
237
 
  statement->set_type(message::Statement::DELETE);
238
 
  statement->set_sql("DELETE FROM t1 WHERE a = \"1\"");
239
 
  statement->set_start_timestamp(getNanoTimestamp());
240
 
 
241
 
  /* Do DELETE-specific header and setup */
242
 
  message::DeleteHeader *header= statement->mutable_delete_header();
243
 
 
244
 
  /* Add table and field metadata for the statement */
245
 
  message::TableMetadata *t_meta= header->mutable_table_metadata();
246
 
  t_meta->set_schema_name("test");
247
 
  t_meta->set_table_name("t1");
248
 
 
249
 
  message::FieldMetadata *f_meta= header->add_key_field_metadata();
250
 
  f_meta->set_name("a");
251
 
  f_meta->set_type(message::Table::Field::VARCHAR);
252
 
 
253
 
  /* Add new values... */
254
 
  message::DeleteData *data= statement->mutable_delete_data();
255
 
  data->set_segment_id(1);
256
 
  data->set_end_segment(true);
257
 
 
258
 
  message::DeleteRecord *record1= data->add_record();
259
 
 
260
 
  record1->add_key_value("1");
261
 
 
262
 
  statement->set_end_timestamp(getNanoTimestamp());
263
 
}
264
 
 
265
 
static void doSimpleUpdate(message::Transaction &transaction)
266
 
{
267
 
  message::Statement *statement= transaction.add_statement();
268
 
 
269
 
  /* Do generic Statement setup */
270
 
  statement->set_type(message::Statement::UPDATE);
271
 
  statement->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
272
 
  statement->set_start_timestamp(getNanoTimestamp());
273
 
 
274
 
  /* Do UPDATE-specific header and setup */
275
 
  message::UpdateHeader *header= statement->mutable_update_header();
276
 
 
277
 
  /* Add table and field metadata for the statement */
278
 
  message::TableMetadata *t_meta= header->mutable_table_metadata();
279
 
  t_meta->set_schema_name("test");
280
 
  t_meta->set_table_name("t1");
281
 
 
282
 
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
283
 
  kf_meta->set_name("a");
284
 
  kf_meta->set_type(message::Table::Field::VARCHAR);
285
 
 
286
 
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
287
 
  sf_meta->set_name("a");
288
 
  sf_meta->set_type(message::Table::Field::VARCHAR);
289
 
 
290
 
  /* Add new values... */
291
 
  message::UpdateData *data= statement->mutable_update_data();
292
 
  data->set_segment_id(1);
293
 
  data->set_end_segment(true);
294
 
 
295
 
  message::UpdateRecord *record1= data->add_record();
296
 
 
297
 
  record1->add_after_value("5");
298
 
  record1->add_key_value("1");
299
 
 
300
 
  statement->set_end_timestamp(getNanoTimestamp());
301
 
}
302
 
 
303
 
static void doMultiKeyUpdate(message::Transaction &transaction)
304
 
{
305
 
  message::Statement *statement= transaction.add_statement();
306
 
 
307
 
  /* Do generic Statement setup */
308
 
  statement->set_type(message::Statement::UPDATE);
309
 
  statement->set_sql("UPDATE t1 SET a = \"5\"");
310
 
  statement->set_start_timestamp(getNanoTimestamp());
311
 
 
312
 
  /* Do UPDATE-specific header and setup */
313
 
  message::UpdateHeader *header= statement->mutable_update_header();
314
 
 
315
 
  /* Add table and field metadata for the statement */
316
 
  message::TableMetadata *t_meta= header->mutable_table_metadata();
317
 
  t_meta->set_schema_name("test");
318
 
  t_meta->set_table_name("t1");
319
 
 
320
 
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
321
 
  kf_meta->set_name("a");
322
 
  kf_meta->set_type(message::Table::Field::VARCHAR);
323
 
 
324
 
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
325
 
  sf_meta->set_name("a");
326
 
  sf_meta->set_type(message::Table::Field::VARCHAR);
327
 
 
328
 
  /* Add new values... */
329
 
  message::UpdateData *data= statement->mutable_update_data();
330
 
  data->set_segment_id(1);
331
 
  data->set_end_segment(true);
332
 
 
333
 
  message::UpdateRecord *record1= data->add_record();
334
 
  message::UpdateRecord *record2= data->add_record();
335
 
 
336
 
  record1->add_after_value("5");
337
 
  record1->add_key_value("1");
338
 
  record2->add_after_value("5");
339
 
  record2->add_key_value("2");
340
 
 
341
 
  statement->set_end_timestamp(getNanoTimestamp());
342
 
}
343
 
 
344
 
static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
345
 
{
346
 
  std::string buffer("");
347
 
  finalizeTransactionContext(transaction);
348
 
  transaction.SerializeToString(&buffer);
349
 
 
350
 
  size_t length= buffer.length();
351
 
 
352
 
  output->WriteLittleEndian32(static_cast<uint32_t>(ReplicationServices::TRANSACTION));
353
 
  output->WriteLittleEndian32(static_cast<uint32_t>(length));
354
 
  output->WriteString(buffer);
355
 
  output->WriteLittleEndian32(drizzled::algorithm::crc32(buffer.c_str(), length)); /* checksum */
356
 
}
 
13
using namespace drizzle;
 
14
 
 
15
static uint64_t query_id= 0;
 
16
char transaction_id[37];
 
17
 
 
18
/*
 
19
  Example script for reader a Drizzle master replication list.
 
20
*/
 
21
 
 
22
void write_ddl(::drizzle::Event *record, const char *sql)
 
23
{
 
24
  uuid_t uu;
 
25
 
 
26
  uuid_generate_time(uu);
 
27
  uuid_unparse(uu, transaction_id);
 
28
 
 
29
  record->set_type(Event::DDL);
 
30
  record->set_autocommit(true);
 
31
  record->set_server_id("localhost");
 
32
  record->set_query_id(query_id++);
 
33
  record->set_transaction_id(transaction_id);
 
34
  record->set_schema("test");
 
35
  record->set_sql(sql);
 
36
}
 
37
 
 
38
void write_insert(::drizzle::Event *record, const char *trx)
 
39
{
 
40
  Event::Value *value;
 
41
 
 
42
  record->set_type(Event::INSERT);
 
43
  record->set_autocommit(true);
 
44
  record->set_server_id("localhost");
 
45
  record->set_query_id(query_id++);
 
46
  record->set_transaction_id(trx);
 
47
  record->set_schema("test");
 
48
  record->set_table("t1");
 
49
  record->set_sql("INSERT INTO t1 (a) VALUES (1) (2)");
 
50
 
 
51
  /* Add Field Names */
 
52
  record->add_field_names("a");
 
53
 
 
54
  /* Add values (first row) */
 
55
  value= record->add_values();
 
56
  value->add_value("1");
 
57
 
 
58
  /* Add values (second row) */
 
59
  value= record->add_values();
 
60
  value->add_value("2");
 
61
}
 
62
 
 
63
void write_delete(::drizzle::Event *record, const char *trx)
 
64
{
 
65
  Event::Value *value;
 
66
 
 
67
  record->set_type(Event::DELETE);
 
68
  record->set_autocommit(true);
 
69
  record->set_server_id("localhost");
 
70
  record->set_query_id(query_id++);
 
71
  record->set_transaction_id(trx);
 
72
  record->set_schema("test");
 
73
  record->set_table("t1");
 
74
  record->set_sql("DELETE FROM t1 WHERE a IN (1, 2)");
 
75
 
 
76
  /* Add Field Names */
 
77
  record->set_primary_key("a");
 
78
 
 
79
  /* Add values for IN() */
 
80
  value= record->add_values();
 
81
  value->add_value("1");
 
82
  value->add_value("2");
 
83
}
 
84
 
 
85
void write_update(::drizzle::Event *record, const char *trx)
 
86
{
 
87
  Event::Value *value;
 
88
 
 
89
  record->set_type(Event::UPDATE);
 
90
  record->set_autocommit(true);
 
91
  record->set_server_id("localhost");
 
92
  record->set_query_id(query_id++);
 
93
  record->set_transaction_id(trx);
 
94
  record->set_schema("test");
 
95
  record->set_table("t1");
 
96
  record->set_sql("UPDATE t1 SET a=5 WHERE a = 1 ");
 
97
  record->set_primary_key("a");
 
98
 
 
99
  /* Add Field Names */
 
100
  record->add_field_names("a");
 
101
 
 
102
  /* Add values (first row) */
 
103
  value= record->add_values();
 
104
  value->add_value("1"); // The first value is always the primary key comparison value
 
105
  value->add_value("5");
 
106
 
 
107
  /* Add values (second row) */
 
108
  value= record->add_values();
 
109
  value->add_value("2");
 
110
  value->add_value("6");
 
111
}
 
112
 
 
113
void write_to_disk(int file, ::drizzle::EventList *list)
 
114
{
 
115
  std::string buffer;
 
116
  size_t length;
 
117
  size_t written;
 
118
 
 
119
  list->SerializePartialToString(&buffer);
 
120
 
 
121
  length= buffer.length();
 
122
 
 
123
  cout << "Writing record of " << length << "." << endl;
 
124
 
 
125
  if ((written= write(file, &length, sizeof(uint64_t))) != sizeof(uint64_t))
 
126
  {
 
127
    cerr << "Only wrote " << written << " out of " << length << "." << endl;
 
128
    exit(1);
 
129
  }
 
130
 
 
131
  if ((written= write(file, buffer.c_str(), length)) != length)
 
132
  {
 
133
    cerr << "Only wrote " << written << " out of " << length << "." << endl;
 
134
    exit(1);
 
135
  }
 
136
}
 
137
 
357
138
 
358
139
int main(int argc, char* argv[])
359
140
{
362
143
 
363
144
  if (argc != 2) 
364
145
  {
365
 
    fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
 
146
    cerr << "Usage:  " << argv[0] << " REPLICATION_EVENT_LOG " << endl;
366
147
    return -1;
367
148
  }
368
149
 
369
150
  if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
370
151
  {
371
 
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
372
 
    return -1;
 
152
    cerr << "Can not open file: " << argv[0] << endl;
 
153
   exit(0);
373
154
  }
374
155
 
375
 
  protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
376
 
  protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
377
 
 
378
 
  /* Write a series of statements which test each type of Statement */
379
 
  message::Transaction transaction;
380
 
 
381
 
  /* Simple CREATE TABLE statements as raw sql */
382
 
  initTransactionContext(transaction);
383
 
  doCreateTable1(transaction);
384
 
  writeTransaction(coded_output, transaction);
385
 
  transaction.Clear();
386
 
 
387
 
  initTransactionContext(transaction);
388
 
  doCreateTable2(transaction);
389
 
  writeTransaction(coded_output, transaction);
390
 
  transaction.Clear();
391
 
 
392
 
  /* Simple INSERT statement */
393
 
  initTransactionContext(transaction);
394
 
  doSimpleInsert(transaction);
395
 
  writeTransaction(coded_output, transaction);
396
 
  transaction.Clear();
397
 
 
398
 
  /* Write a DELETE and an UPDATE in one transaction */
399
 
  initTransactionContext(transaction);
400
 
  doSimpleDelete(transaction);
401
 
  doSimpleUpdate(transaction);
402
 
  writeTransaction(coded_output, transaction);
403
 
  transaction.Clear();
404
 
 
405
 
  /* Test an INSERT into non-varchar columns */
406
 
  initTransactionContext(transaction);
407
 
  doNonVarcharInsert(transaction);
408
 
  writeTransaction(coded_output, transaction);
409
 
  transaction.Clear();
410
 
 
411
 
  /* Write an UPDATE which affects >1 row */
412
 
  initTransactionContext(transaction);
413
 
  doMultiKeyUpdate(transaction);
414
 
  writeTransaction(coded_output, transaction);
415
 
  transaction.Clear();
416
 
 
417
 
  /* Write an INSERT which writes BLOB data */
418
 
  initTransactionContext(transaction);
419
 
  doCreateTable3(transaction);
420
 
  doBlobInsert(transaction);
421
 
  writeTransaction(coded_output, transaction);
422
 
  transaction.Clear();
423
 
 
424
 
  delete coded_output;
425
 
  delete raw_output;
 
156
  EventList list;
 
157
 
 
158
  /* Write first set of records */
 
159
  write_ddl(list.add_event(), "CREATE TABLE A (a int) ENGINE=innodb");
 
160
  write_insert(list.add_event(), transaction_id);
 
161
 
 
162
  write_to_disk(file, &list);
 
163
 
 
164
  /* Write Second set of records */
 
165
  write_ddl(list.add_event(), "CREATE TABLE A (a int) ENGINE=innodb");
 
166
  write_delete(list.add_event(), transaction_id);
 
167
  write_update(list.add_event(), transaction_id);
 
168
 
 
169
  write_to_disk(file, &list);
 
170
 
 
171
  close(file);
426
172
 
427
173
  return 0;
428
174
}