~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/transaction_writer.cc

  • Committer: Stewart Smith
  • Date: 2009-10-12 05:13:54 UTC
  • mfrom: (1178 staging)
  • mto: This revision was merged to the branch mainline in revision 1179.
  • Revision ID: stewart@flamingspork.com-20091012051354-2n7zpid9f67ddsa0
mergeĀ lp:drizzle/build

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#include <iostream>
2
 
#include <fstream>
3
 
#include <unistd.h>
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2009 Sun Microsystems
 
5
 *
 
6
 *  Authors:
 
7
 *
 
8
 *    Jay Pipes <joinfu@sun.com>
 
9
 *
 
10
 *  This program is free software; you can redistribute it and/or modify
 
11
 *  it under the terms of the GNU General Public License as published by
 
12
 *  the Free Software Foundation; version 2 of the License.
 
13
 *
 
14
 *  This program is distributed in the hope that it will be useful,
 
15
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
17
 *  GNU General Public License for more details.
 
18
 *
 
19
 *  You should have received a copy of the GNU General Public License
 
20
 *  along with this program; if not, write to the Free Software
 
21
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
22
 */
 
23
 
 
24
#include <drizzled/global.h>
 
25
#include <drizzled/gettext.h>
4
26
#include <sys/types.h>
5
27
#include <sys/stat.h>
6
28
#include <fcntl.h>
7
29
#include <string>
8
 
 
9
 
#include <sys/time.h>
10
 
 
11
 
#include <drizzled/message/replication.pb.h>
 
30
#include <fstream>
 
31
#include <unistd.h>
 
32
#include <drizzled/message/transaction.pb.h>
 
33
 
 
34
#include <google/protobuf/io/coded_stream.h>
 
35
#include <google/protobuf/io/zero_copy_stream_impl.h>
12
36
 
13
37
/** 
14
38
 * @file Example script for writing transactions to a log file.
16
40
 
17
41
using namespace std;
18
42
using namespace drizzled;
 
43
using namespace google;
19
44
 
20
45
static uint32_t server_id= 1;
21
 
static uint64_t transaction_id= 0;
 
46
static uint64_t transaction_id= 1;
22
47
 
23
48
static uint64_t getNanoTimestamp()
24
49
{
35
60
#endif
36
61
}
37
62
 
38
 
static void writeCommit(message::Command &record)
39
 
{
40
 
  record.set_type(message::Command::COMMIT);
41
 
  record.set_timestamp(getNanoTimestamp());
42
 
 
43
 
  message::TransactionContext *trx= record.mutable_transaction_context();
44
 
  trx->set_server_id(server_id);
45
 
  trx->set_transaction_id(transaction_id);
46
 
}
47
 
 
48
 
#if 0
49
 
static void writeRollback(message::Command &record)
50
 
{
51
 
  record.set_type(message::Command::ROLLBACK);
52
 
  record.set_timestamp(getNanoTimestamp());
53
 
 
54
 
  message::TransactionContext *trx= record.mutable_transaction_context();
55
 
  trx->set_server_id(server_id);
56
 
  trx->set_transaction_id(transaction_id);
57
 
}
58
 
#endif
59
 
 
60
 
static void writeStartTransaction(message::Command &record)
61
 
{
62
 
  record.set_type(message::Command::START_TRANSACTION);
63
 
  record.set_timestamp(getNanoTimestamp());
64
 
 
65
 
  message::TransactionContext *trx= record.mutable_transaction_context();
66
 
  trx->set_server_id(server_id);
67
 
  trx->set_transaction_id(transaction_id);
68
 
}
69
 
 
70
 
static void writeInsert(message::Command &record)
71
 
{
72
 
  record.set_type(message::Command::INSERT);
73
 
  record.set_sql("INSERT INTO t1 (a) VALUES (1) (2)");
74
 
  record.set_timestamp(getNanoTimestamp());
75
 
  record.set_schema("test");
76
 
  record.set_table("t1");
77
 
 
78
 
  message::TransactionContext *trx= record.mutable_transaction_context();
79
 
  trx->set_server_id(server_id);
80
 
  trx->set_transaction_id(transaction_id);
81
 
 
82
 
  message::InsertRecord *irecord= record.mutable_insert_record();
83
 
 
84
 
  /* Add Fields and Values... */
85
 
 
86
 
  message::Table::Field *field= irecord->add_insert_field();
87
 
  field->set_name("a");
88
 
  field->set_type(message::Table::Field::VARCHAR);
89
 
 
90
 
  irecord->add_insert_value("1");
91
 
  irecord->add_insert_value("2");
92
 
}
93
 
 
94
 
static void writeDeleteWithPK(message::Command &record)
95
 
{
96
 
  record.set_type(message::Command::DELETE);
97
 
  record.set_sql("DELETE FROM t1 WHERE a = 1");
98
 
  record.set_timestamp(getNanoTimestamp());
99
 
  record.set_schema("test");
100
 
  record.set_table("t1");
101
 
 
102
 
  message::TransactionContext *trx= record.mutable_transaction_context();
103
 
  trx->set_server_id(server_id);
104
 
  trx->set_transaction_id(transaction_id);
105
 
 
106
 
  message::DeleteRecord *drecord= record.mutable_delete_record();
107
 
 
108
 
  message::Table::Field *field= drecord->add_where_field();
109
 
  field->set_name("a");
110
 
  field->set_type(message::Table::Field::VARCHAR);
111
 
 
112
 
  drecord->add_where_value("1");
113
 
}
114
 
 
115
 
static void writeUpdateWithPK(message::Command &record)
116
 
{
117
 
  record.set_type(message::Command::UPDATE);
118
 
  record.set_sql("UPDATE t1 SET a = 5 WHERE a = 1;");
119
 
  record.set_timestamp(getNanoTimestamp());
120
 
  record.set_schema("test");
121
 
  record.set_table("t1");
122
 
 
123
 
  message::TransactionContext *trx= record.mutable_transaction_context();
124
 
  trx->set_server_id(server_id);
125
 
  trx->set_transaction_id(transaction_id);
126
 
 
127
 
  message::UpdateRecord *urecord= record.mutable_update_record();
128
 
 
129
 
  message::Table::Field *field;
130
 
  
131
 
  field= urecord->add_update_field();
132
 
  field->set_name("a");
133
 
  field->set_type(message::Table::Field::VARCHAR);
134
 
 
135
 
  urecord->add_after_value("5");
136
 
 
137
 
  field= urecord->add_where_field();
138
 
  field->set_name("a");
139
 
  field->set_type(message::Table::Field::VARCHAR);
140
 
 
141
 
  urecord->add_where_value("1");
142
 
}
143
 
 
144
 
static void writeTransaction(int file, message::Transaction &transaction)
145
 
{
146
 
  std::string buffer;
147
 
  size_t length;
148
 
  size_t written;
149
 
 
150
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
151
 
  trx->set_server_id(server_id);
152
 
  trx->set_transaction_id(transaction_id);
153
 
 
 
63
static void initTransactionContext(message::Transaction &transaction)
 
64
{
 
65
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
 
66
  ctx->set_transaction_id(transaction_id++);
 
67
  ctx->set_start_timestamp(getNanoTimestamp());
 
68
  ctx->set_server_id(server_id);
 
69
}
 
70
 
 
71
static void finalizeTransactionContext(message::Transaction &transaction)
 
72
{
 
73
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
 
74
  ctx->set_end_timestamp(getNanoTimestamp());
 
75
}
 
76
 
 
77
static void doCreateTable1(message::Transaction &transaction)
 
78
{
 
79
  message::Statement *statement= transaction.add_statement();
 
80
 
 
81
  statement->set_type(message::Statement::RAW_SQL);
 
82
  statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
 
83
  statement->set_start_timestamp(getNanoTimestamp());
 
84
  statement->set_end_timestamp(getNanoTimestamp());
 
85
}
 
86
 
 
87
static void doCreateTable2(message::Transaction &transaction)
 
88
{
 
89
  message::Statement *statement= transaction.add_statement();
 
90
 
 
91
  statement->set_type(message::Statement::RAW_SQL);
 
92
  statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
 
93
  statement->set_start_timestamp(getNanoTimestamp());
 
94
  statement->set_end_timestamp(getNanoTimestamp());
 
95
}
 
96
 
 
97
static void doSimpleInsert(message::Transaction &transaction)
 
98
{
 
99
  message::Statement *statement= transaction.add_statement();
 
100
 
 
101
  /* Do generic Statement setup */
 
102
  statement->set_type(message::Statement::INSERT);
 
103
  statement->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
 
104
  statement->set_start_timestamp(getNanoTimestamp());
 
105
 
 
106
  /* Do INSERT-specific header and setup */
 
107
  message::InsertHeader *header= statement->mutable_insert_header();
 
108
 
 
109
  /* Add table and field metadata for the statement */
 
110
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
111
  t_meta->set_schema_name("test");
 
112
  t_meta->set_table_name("t1");
 
113
 
 
114
  message::FieldMetadata *f_meta= header->add_field_metadata();
 
115
  f_meta->set_name("a");
 
116
  f_meta->set_type(message::Table::Field::VARCHAR);
 
117
 
 
118
  /* Add new values... */
 
119
  message::InsertData *data= statement->mutable_insert_data();
 
120
  data->set_segment_id(1);
 
121
  data->set_end_segment(true);
 
122
 
 
123
  message::InsertRecord *record1= data->add_record();
 
124
  message::InsertRecord *record2= data->add_record();
 
125
 
 
126
  record1->add_insert_value("1");
 
127
  record2->add_insert_value("2");
 
128
 
 
129
  statement->set_end_timestamp(getNanoTimestamp());
 
130
}
 
131
 
 
132
static void doNonVarcharInsert(message::Transaction &transaction)
 
133
{
 
134
  message::Statement *statement= transaction.add_statement();
 
135
 
 
136
  /* Do generic Statement setup */
 
137
  statement->set_type(message::Statement::INSERT);
 
138
  statement->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
 
139
  statement->set_start_timestamp(getNanoTimestamp());
 
140
 
 
141
  /* Do INSERT-specific header and setup */
 
142
  message::InsertHeader *header= statement->mutable_insert_header();
 
143
 
 
144
  /* Add table and field metadata for the statement */
 
145
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
146
  t_meta->set_schema_name("test");
 
147
  t_meta->set_table_name("t2");
 
148
 
 
149
  message::FieldMetadata *f_meta= header->add_field_metadata();
 
150
  f_meta->set_name("a");
 
151
  f_meta->set_type(message::Table::Field::INTEGER);
 
152
 
 
153
  /* Add new values... */
 
154
  message::InsertData *data= statement->mutable_insert_data();
 
155
  data->set_segment_id(1);
 
156
  data->set_end_segment(true);
 
157
 
 
158
  message::InsertRecord *record1= data->add_record();
 
159
  message::InsertRecord *record2= data->add_record();
 
160
 
 
161
  record1->add_insert_value("1");
 
162
  record2->add_insert_value("2");
 
163
 
 
164
  statement->set_end_timestamp(getNanoTimestamp());
 
165
}
 
166
 
 
167
static void doSimpleDelete(message::Transaction &transaction)
 
168
{
 
169
  message::Statement *statement= transaction.add_statement();
 
170
 
 
171
  /* Do generic Statement setup */
 
172
  statement->set_type(message::Statement::DELETE);
 
173
  statement->set_sql("DELETE FROM t1 WHERE a = \"1\"");
 
174
  statement->set_start_timestamp(getNanoTimestamp());
 
175
 
 
176
  /* Do DELETE-specific header and setup */
 
177
  message::DeleteHeader *header= statement->mutable_delete_header();
 
178
 
 
179
  /* Add table and field metadata for the statement */
 
180
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
181
  t_meta->set_schema_name("test");
 
182
  t_meta->set_table_name("t1");
 
183
 
 
184
  message::FieldMetadata *f_meta= header->add_key_field_metadata();
 
185
  f_meta->set_name("a");
 
186
  f_meta->set_type(message::Table::Field::VARCHAR);
 
187
 
 
188
  /* Add new values... */
 
189
  message::DeleteData *data= statement->mutable_delete_data();
 
190
  data->set_segment_id(1);
 
191
  data->set_end_segment(true);
 
192
 
 
193
  message::DeleteRecord *record1= data->add_record();
 
194
 
 
195
  record1->add_key_value("1");
 
196
 
 
197
  statement->set_end_timestamp(getNanoTimestamp());
 
198
}
 
199
 
 
200
static void doSimpleUpdate(message::Transaction &transaction)
 
201
{
 
202
  message::Statement *statement= transaction.add_statement();
 
203
 
 
204
  /* Do generic Statement setup */
 
205
  statement->set_type(message::Statement::UPDATE);
 
206
  statement->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
 
207
  statement->set_start_timestamp(getNanoTimestamp());
 
208
 
 
209
  /* Do UPDATE-specific header and setup */
 
210
  message::UpdateHeader *header= statement->mutable_update_header();
 
211
 
 
212
  /* Add table and field metadata for the statement */
 
213
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
214
  t_meta->set_schema_name("test");
 
215
  t_meta->set_table_name("t1");
 
216
 
 
217
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
 
218
  kf_meta->set_name("a");
 
219
  kf_meta->set_type(message::Table::Field::VARCHAR);
 
220
 
 
221
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
 
222
  sf_meta->set_name("a");
 
223
  sf_meta->set_type(message::Table::Field::VARCHAR);
 
224
 
 
225
  header->add_set_value("5");
 
226
 
 
227
  /* Add new values... */
 
228
  message::UpdateData *data= statement->mutable_update_data();
 
229
  data->set_segment_id(1);
 
230
  data->set_end_segment(true);
 
231
 
 
232
  message::UpdateRecord *record1= data->add_record();
 
233
 
 
234
  record1->add_key_value("1");
 
235
 
 
236
  statement->set_end_timestamp(getNanoTimestamp());
 
237
}
 
238
 
 
239
static void doMultiKeyUpdate(message::Transaction &transaction)
 
240
{
 
241
  message::Statement *statement= transaction.add_statement();
 
242
 
 
243
  /* Do generic Statement setup */
 
244
  statement->set_type(message::Statement::UPDATE);
 
245
  statement->set_sql("UPDATE t1 SET a = \"5\"");
 
246
  statement->set_start_timestamp(getNanoTimestamp());
 
247
 
 
248
  /* Do UPDATE-specific header and setup */
 
249
  message::UpdateHeader *header= statement->mutable_update_header();
 
250
 
 
251
  /* Add table and field metadata for the statement */
 
252
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
253
  t_meta->set_schema_name("test");
 
254
  t_meta->set_table_name("t1");
 
255
 
 
256
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
 
257
  kf_meta->set_name("a");
 
258
  kf_meta->set_type(message::Table::Field::VARCHAR);
 
259
 
 
260
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
 
261
  sf_meta->set_name("a");
 
262
  sf_meta->set_type(message::Table::Field::VARCHAR);
 
263
 
 
264
  header->add_set_value("5");
 
265
 
 
266
  /* Add new values... */
 
267
  message::UpdateData *data= statement->mutable_update_data();
 
268
  data->set_segment_id(1);
 
269
  data->set_end_segment(true);
 
270
 
 
271
  message::UpdateRecord *record1= data->add_record();
 
272
  message::UpdateRecord *record2= data->add_record();
 
273
 
 
274
  record1->add_key_value("1");
 
275
  record2->add_key_value("2");
 
276
 
 
277
 
 
278
  statement->set_end_timestamp(getNanoTimestamp());
 
279
}
 
280
 
 
281
static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
 
282
{
 
283
  std::string buffer("");
 
284
  finalizeTransactionContext(transaction);
154
285
  transaction.SerializeToString(&buffer);
155
286
 
156
 
  length= buffer.length();
157
 
 
158
 
  cout << "Writing transaction of " << length << " length." << endl;
159
 
 
160
 
  if ((written= write(file, &length, sizeof(uint64_t))) != sizeof(uint64_t))
161
 
  {
162
 
    cerr << "Only wrote " << written << " out of " << length << "." << endl;
163
 
    exit(1);
164
 
  }
165
 
 
166
 
  if ((written= write(file, buffer.c_str(), length)) != length)
167
 
  {
168
 
    cerr << "Only wrote " << written << " out of " << length << "." << endl;
169
 
    exit(1);
170
 
  }
 
287
  size_t length= buffer.length();
 
288
 
 
289
  output->WriteLittleEndian64(static_cast<uint64_t>(length));
 
290
  output->WriteString(buffer);
171
291
}
172
292
 
173
293
int main(int argc, char* argv[])
177
297
 
178
298
  if (argc != 2) 
179
299
  {
180
 
    cerr << "Usage:  " << argv[0] << " TRANSACTION_LOG" << endl;
 
300
    fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
181
301
    return -1;
182
302
  }
183
303
 
184
304
  if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
185
305
  {
186
 
    cerr << "Can not open file: " << argv[0] << endl;
187
 
   exit(0);
 
306
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
 
307
    return -1;
188
308
  }
189
309
 
190
 
  /* Write a series of statements which test each type of record class */
191
 
  transaction_id++;
 
310
  protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
 
311
  protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
 
312
 
 
313
  /* Write a series of statements which test each type of Statement */
 
314
  message::Transaction transaction;
 
315
 
 
316
  /* Simple CREATE TABLE statements as raw sql */
 
317
  initTransactionContext(transaction);
 
318
  doCreateTable1(transaction);
 
319
  writeTransaction(coded_output, transaction);
 
320
  transaction.Clear();
 
321
 
 
322
  initTransactionContext(transaction);
 
323
  doCreateTable2(transaction);
 
324
  writeTransaction(coded_output, transaction);
 
325
  transaction.Clear();
192
326
 
193
327
  /* Simple INSERT statement */
194
 
  message::Transaction transaction;
195
 
  transaction.set_start_timestamp(getNanoTimestamp());
196
 
  writeStartTransaction(*transaction.add_command());
197
 
  writeInsert(*transaction.add_command());
198
 
  writeCommit(*transaction.add_command());
199
 
  transaction.set_end_timestamp(getNanoTimestamp());
200
 
 
201
 
  writeTransaction(file, transaction);
202
 
 
 
328
  initTransactionContext(transaction);
 
329
  doSimpleInsert(transaction);
 
330
  writeTransaction(coded_output, transaction);
203
331
  transaction.Clear();
204
332
 
205
333
  /* Write a DELETE and an UPDATE in one transaction */
206
 
  transaction_id++;
207
 
  transaction.set_start_timestamp(getNanoTimestamp());
208
 
  writeStartTransaction(*transaction.add_command());
209
 
  writeDeleteWithPK(*transaction.add_command());
210
 
  writeUpdateWithPK(*transaction.add_command());
211
 
  writeCommit(*transaction.add_command());
212
 
  transaction.set_end_timestamp(getNanoTimestamp());
213
 
 
214
 
  writeTransaction(file, transaction);
215
 
 
216
 
  close(file);
 
334
  initTransactionContext(transaction);
 
335
  doSimpleDelete(transaction);
 
336
  doSimpleUpdate(transaction);
 
337
  writeTransaction(coded_output, transaction);
 
338
  transaction.Clear();
 
339
 
 
340
  /* Test an INSERT into non-varchar columns */
 
341
  initTransactionContext(transaction);
 
342
  doNonVarcharInsert(transaction);
 
343
  writeTransaction(coded_output, transaction);
 
344
  transaction.Clear();
 
345
 
 
346
  /* Write an UPDATE which affects >1 row */
 
347
  initTransactionContext(transaction);
 
348
  doMultiKeyUpdate(transaction);
 
349
  writeTransaction(coded_output, transaction);
 
350
  transaction.Clear();
 
351
 
 
352
  delete coded_output;
 
353
  delete raw_output;
217
354
 
218
355
  return 0;
219
356
}