~drizzle-trunk/drizzle/development

1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2009 Sun Microsystems
5
 *
6
 *  Authors:
7
 *
8
 *    Jay Pipes <joinfu@sun.com>
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; version 2 of the License.
13
 *
14
 *  This program is distributed in the hope that it will be useful,
15
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 *  GNU General Public License for more details.
18
 *
19
 *  You should have received a copy of the GNU General Public License
20
 *  along with this program; if not, write to the Free Software
21
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
22
 */
23
1122.2.2 by Monty Taylor
Added missing copyright headers. Added drizzled/global.h to a few things that
24
#include "drizzled/global.h"
1143.2.23 by Jay Pipes
Merge trunk and resolve all conflicts.
25
#include "drizzled/hash/crc32.h"
26
#include "drizzled/gettext.h"
27
#include "drizzled/replication_services.h"
1122.2.2 by Monty Taylor
Added missing copyright headers. Added drizzled/global.h to a few things that
28
671 by Brian Aker
Cleaned up events for writing in replication (using simple file
29
#include <sys/types.h>
30
#include <sys/stat.h>
31
#include <fcntl.h>
636 by Brian Aker
First pass with new event API (yeah... it will be better).
32
#include <string>
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
33
#include <fstream>
34
#include <unistd.h>
35
#include <drizzled/message/transaction.pb.h>
36
37
#include <google/protobuf/io/coded_stream.h>
38
#include <google/protobuf/io/zero_copy_stream_impl.h>
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
39
1130.3.29 by Monty Taylor
Merged with latest.
40
#include "drizzled/gettext.h"
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
41
42
/** 
43
 * @file Example script for writing transactions to a log file.
44
 */
779.3.18 by Monty Taylor
Cleaned up warnings up through innodb.
45
636 by Brian Aker
First pass with new event API (yeah... it will be better).
46
using namespace std;
1101.2.1 by Monty Taylor
Fixed the first set of using namespace
47
using namespace drizzled;
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
48
using namespace google;
636 by Brian Aker
First pass with new event API (yeah... it will be better).
49
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
50
static uint32_t server_id= 1;
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
51
static uint64_t transaction_id= 1;
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
52
1085.1.2 by Monty Taylor
Fixed -Wmissing-declarations
53
static uint64_t getNanoTimestamp()
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
54
{
55
#ifdef HAVE_CLOCK_GETTIME
56
  struct timespec tp;
57
  clock_gettime(CLOCK_REALTIME, &tp);
58
  return (uint64_t) tp.tv_sec * 10000000
59
       + (uint64_t) tp.tv_nsec;
60
#else
61
  struct timeval tv;
62
  gettimeofday(&tv,NULL);
63
  return (uint64_t) tv.tv_sec * 10000000
64
       + (uint64_t) tv.tv_usec * 1000;
65
#endif
66
}
67
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
68
static void initTransactionContext(message::Transaction &transaction)
69
{
70
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
71
  ctx->set_transaction_id(transaction_id++);
72
  ctx->set_start_timestamp(getNanoTimestamp());
73
  ctx->set_server_id(server_id);
74
}
75
76
static void finalizeTransactionContext(message::Transaction &transaction)
77
{
78
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
79
  ctx->set_end_timestamp(getNanoTimestamp());
80
}
81
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
82
static void doCreateTable1(message::Transaction &transaction)
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
83
{
84
  message::Statement *statement= transaction.add_statement();
85
86
  statement->set_type(message::Statement::RAW_SQL);
87
  statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
88
  statement->set_start_timestamp(getNanoTimestamp());
89
  statement->set_end_timestamp(getNanoTimestamp());
90
}
91
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
92
static void doCreateTable2(message::Transaction &transaction)
93
{
94
  message::Statement *statement= transaction.add_statement();
95
96
  statement->set_type(message::Statement::RAW_SQL);
97
  statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
98
  statement->set_start_timestamp(getNanoTimestamp());
99
  statement->set_end_timestamp(getNanoTimestamp());
100
}
101
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
102
static void doSimpleInsert(message::Transaction &transaction)
103
{
104
  message::Statement *statement= transaction.add_statement();
105
106
  /* Do generic Statement setup */
107
  statement->set_type(message::Statement::INSERT);
108
  statement->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
109
  statement->set_start_timestamp(getNanoTimestamp());
110
111
  /* Do INSERT-specific header and setup */
112
  message::InsertHeader *header= statement->mutable_insert_header();
113
114
  /* Add table and field metadata for the statement */
115
  message::TableMetadata *t_meta= header->mutable_table_metadata();
116
  t_meta->set_schema_name("test");
117
  t_meta->set_table_name("t1");
118
119
  message::FieldMetadata *f_meta= header->add_field_metadata();
120
  f_meta->set_name("a");
121
  f_meta->set_type(message::Table::Field::VARCHAR);
122
123
  /* Add new values... */
124
  message::InsertData *data= statement->mutable_insert_data();
125
  data->set_segment_id(1);
126
  data->set_end_segment(true);
127
128
  message::InsertRecord *record1= data->add_record();
129
  message::InsertRecord *record2= data->add_record();
130
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
131
  record1->add_insert_value("1");
132
  record2->add_insert_value("2");
133
134
  statement->set_end_timestamp(getNanoTimestamp());
135
}
136
137
static void doNonVarcharInsert(message::Transaction &transaction)
138
{
139
  message::Statement *statement= transaction.add_statement();
140
141
  /* Do generic Statement setup */
142
  statement->set_type(message::Statement::INSERT);
143
  statement->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
144
  statement->set_start_timestamp(getNanoTimestamp());
145
146
  /* Do INSERT-specific header and setup */
147
  message::InsertHeader *header= statement->mutable_insert_header();
148
149
  /* Add table and field metadata for the statement */
150
  message::TableMetadata *t_meta= header->mutable_table_metadata();
151
  t_meta->set_schema_name("test");
152
  t_meta->set_table_name("t2");
153
154
  message::FieldMetadata *f_meta= header->add_field_metadata();
155
  f_meta->set_name("a");
156
  f_meta->set_type(message::Table::Field::INTEGER);
157
158
  /* Add new values... */
159
  message::InsertData *data= statement->mutable_insert_data();
160
  data->set_segment_id(1);
161
  data->set_end_segment(true);
162
163
  message::InsertRecord *record1= data->add_record();
164
  message::InsertRecord *record2= data->add_record();
165
166
  record1->add_insert_value("1");
167
  record2->add_insert_value("2");
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
168
169
  statement->set_end_timestamp(getNanoTimestamp());
170
}
171
172
static void doSimpleDelete(message::Transaction &transaction)
173
{
174
  message::Statement *statement= transaction.add_statement();
175
176
  /* Do generic Statement setup */
177
  statement->set_type(message::Statement::DELETE);
178
  statement->set_sql("DELETE FROM t1 WHERE a = \"1\"");
179
  statement->set_start_timestamp(getNanoTimestamp());
180
181
  /* Do DELETE-specific header and setup */
182
  message::DeleteHeader *header= statement->mutable_delete_header();
183
184
  /* Add table and field metadata for the statement */
185
  message::TableMetadata *t_meta= header->mutable_table_metadata();
186
  t_meta->set_schema_name("test");
187
  t_meta->set_table_name("t1");
188
189
  message::FieldMetadata *f_meta= header->add_key_field_metadata();
190
  f_meta->set_name("a");
191
  f_meta->set_type(message::Table::Field::VARCHAR);
192
193
  /* Add new values... */
194
  message::DeleteData *data= statement->mutable_delete_data();
195
  data->set_segment_id(1);
196
  data->set_end_segment(true);
197
198
  message::DeleteRecord *record1= data->add_record();
199
200
  record1->add_key_value("1");
201
202
  statement->set_end_timestamp(getNanoTimestamp());
203
}
204
205
static void doSimpleUpdate(message::Transaction &transaction)
206
{
207
  message::Statement *statement= transaction.add_statement();
208
209
  /* Do generic Statement setup */
210
  statement->set_type(message::Statement::UPDATE);
211
  statement->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
212
  statement->set_start_timestamp(getNanoTimestamp());
213
214
  /* Do UPDATE-specific header and setup */
215
  message::UpdateHeader *header= statement->mutable_update_header();
216
217
  /* Add table and field metadata for the statement */
218
  message::TableMetadata *t_meta= header->mutable_table_metadata();
219
  t_meta->set_schema_name("test");
220
  t_meta->set_table_name("t1");
221
222
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
223
  kf_meta->set_name("a");
224
  kf_meta->set_type(message::Table::Field::VARCHAR);
225
226
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
227
  sf_meta->set_name("a");
228
  sf_meta->set_type(message::Table::Field::VARCHAR);
229
230
  /* Add new values... */
231
  message::UpdateData *data= statement->mutable_update_data();
232
  data->set_segment_id(1);
233
  data->set_end_segment(true);
234
235
  message::UpdateRecord *record1= data->add_record();
236
1143.2.33 by Jay Pipes
This patch fixes a bug in the replication service and transaction
237
  record1->add_after_value("5");
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
238
  record1->add_key_value("1");
239
240
  statement->set_end_timestamp(getNanoTimestamp());
241
}
242
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
243
static void doMultiKeyUpdate(message::Transaction &transaction)
244
{
245
  message::Statement *statement= transaction.add_statement();
246
247
  /* Do generic Statement setup */
248
  statement->set_type(message::Statement::UPDATE);
249
  statement->set_sql("UPDATE t1 SET a = \"5\"");
250
  statement->set_start_timestamp(getNanoTimestamp());
251
252
  /* Do UPDATE-specific header and setup */
253
  message::UpdateHeader *header= statement->mutable_update_header();
254
255
  /* Add table and field metadata for the statement */
256
  message::TableMetadata *t_meta= header->mutable_table_metadata();
257
  t_meta->set_schema_name("test");
258
  t_meta->set_table_name("t1");
259
260
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
261
  kf_meta->set_name("a");
262
  kf_meta->set_type(message::Table::Field::VARCHAR);
263
264
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
265
  sf_meta->set_name("a");
266
  sf_meta->set_type(message::Table::Field::VARCHAR);
267
268
  /* Add new values... */
269
  message::UpdateData *data= statement->mutable_update_data();
270
  data->set_segment_id(1);
271
  data->set_end_segment(true);
272
273
  message::UpdateRecord *record1= data->add_record();
274
  message::UpdateRecord *record2= data->add_record();
275
1143.2.33 by Jay Pipes
This patch fixes a bug in the replication service and transaction
276
  record1->add_after_value("5");
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
277
  record1->add_key_value("1");
1143.2.33 by Jay Pipes
This patch fixes a bug in the replication service and transaction
278
  record2->add_after_value("5");
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
279
  record2->add_key_value("2");
280
281
  statement->set_end_timestamp(getNanoTimestamp());
282
}
283
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
284
static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
285
{
286
  std::string buffer("");
287
  finalizeTransactionContext(transaction);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
288
  transaction.SerializeToString(&buffer);
671 by Brian Aker
Cleaned up events for writing in replication (using simple file
289
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
290
  size_t length= buffer.length();
291
1143.2.15 by Jay Pipes
This patch does the following:
292
  output->WriteLittleEndian32(static_cast<uint32_t>(ReplicationServices::TRANSACTION));
1143.2.12 by Jay Pipes
Fixes 64-bit length encoding to be 32-bit, which is what is supported by GPB. We will need to use a segment strategy for large blob records.
293
  output->WriteLittleEndian32(static_cast<uint32_t>(length));
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
294
  output->WriteString(buffer);
1143.2.10 by Jay Pipes
Phase 2 new replication work:
295
  output->WriteLittleEndian32(drizzled::hash::crc32(buffer.c_str(), length)); /* checksum */
671 by Brian Aker
Cleaned up events for writing in replication (using simple file
296
}
297
660.1.3 by Eric Herman
removed trailing whitespace with simple script:
298
int main(int argc, char* argv[])
636 by Brian Aker
First pass with new event API (yeah... it will be better).
299
{
300
  GOOGLE_PROTOBUF_VERIFY_VERSION;
671 by Brian Aker
Cleaned up events for writing in replication (using simple file
301
  int file;
636 by Brian Aker
First pass with new event API (yeah... it will be better).
302
671 by Brian Aker
Cleaned up events for writing in replication (using simple file
303
  if (argc != 2) 
304
  {
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
305
    fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
636 by Brian Aker
First pass with new event API (yeah... it will be better).
306
    return -1;
307
  }
308
671 by Brian Aker
Cleaned up events for writing in replication (using simple file
309
  if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
310
  {
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
311
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
312
    return -1;
671 by Brian Aker
Cleaned up events for writing in replication (using simple file
313
  }
314
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
315
  protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
316
  protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
317
318
  /* Write a series of statements which test each type of Statement */
319
  message::Transaction transaction;
320
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
321
  /* Simple CREATE TABLE statements as raw sql */
322
  initTransactionContext(transaction);
323
  doCreateTable1(transaction);
324
  writeTransaction(coded_output, transaction);
325
  transaction.Clear();
326
327
  initTransactionContext(transaction);
328
  doCreateTable2(transaction);
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
329
  writeTransaction(coded_output, transaction);
330
  transaction.Clear();
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
331
332
  /* Simple INSERT statement */
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
333
  initTransactionContext(transaction);
334
  doSimpleInsert(transaction);
335
  writeTransaction(coded_output, transaction);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
336
  transaction.Clear();
337
338
  /* Write a DELETE and an UPDATE in one transaction */
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
339
  initTransactionContext(transaction);
340
  doSimpleDelete(transaction);
341
  doSimpleUpdate(transaction);
342
  writeTransaction(coded_output, transaction);
1143.2.7 by Jay Pipes
Based on IRC discussions with Brian, this patch reworks the Statement
343
  transaction.Clear();
344
345
  /* Test an INSERT into non-varchar columns */
346
  initTransactionContext(transaction);
347
  doNonVarcharInsert(transaction);
348
  writeTransaction(coded_output, transaction);
349
  transaction.Clear();
350
351
  /* Write an UPDATE which affects >1 row */
352
  initTransactionContext(transaction);
353
  doMultiKeyUpdate(transaction);
354
  writeTransaction(coded_output, transaction);
355
  transaction.Clear();
1143.2.4 by Jay Pipes
New transaction proto file containing message definitions to be
356
357
  delete coded_output;
358
  delete raw_output;
636 by Brian Aker
First pass with new event API (yeah... it will be better).
359
360
  return 0;
361
}