~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/transaction_writer.cc

  • Committer: Jay Pipes
  • Date: 2009-04-10 17:06:58 UTC
  • mto: (971.1.47 mordred)
  • mto: This revision was merged to the branch mainline in revision 990.
  • Revision ID: jpipes@serialcoder-20090410170658-d3azdnas1fn8v68l
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
plugin.  New transaction_services.cc class implementation of the API for
converting between internal formats and GPB Command Messages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
#include <sys/stat.h>
6
6
#include <fcntl.h>
7
7
#include <string>
8
 
#include <uuid/uuid.h>
9
 
 
10
 
#include <drizzled/message/replication_event.pb.h>
 
8
 
 
9
#include <sys/time.h>
 
10
 
 
11
#include <drizzled/message/transaction.pb.h>
 
12
 
 
13
/** 
 
14
 * @file Example script for writing transactions to a log file.
 
15
 */
11
16
 
12
17
using namespace std;
13
18
using namespace drizzled::message;
14
19
 
15
 
static uint64_t query_id= 0;
16
 
char transaction_id[37];
17
 
 
18
 
/*
19
 
  Example script for reader a Drizzle master replication list.
20
 
*/
21
 
 
22
 
void write_ddl(::drizzled::message::Event *record, const char *sql)
23
 
{
24
 
  uuid_t uu;
25
 
 
26
 
  uuid_generate_time(uu);
27
 
  uuid_unparse(uu, transaction_id);
28
 
 
29
 
  record->set_type(Event::DDL);
30
 
  record->set_autocommit(true);
31
 
  record->set_server_id("localhost");
32
 
  record->set_query_id(query_id++);
33
 
  record->set_transaction_id(transaction_id);
34
 
  record->set_schema("test");
35
 
  record->set_sql(sql);
36
 
}
37
 
 
38
 
void write_insert(::drizzled::message::Event *record, const char *trx)
39
 
{
40
 
  Event::Value *value;
41
 
 
42
 
  record->set_type(Event::INSERT);
43
 
  record->set_autocommit(true);
44
 
  record->set_server_id("localhost");
45
 
  record->set_query_id(query_id++);
46
 
  record->set_transaction_id(trx);
47
 
  record->set_schema("test");
48
 
  record->set_table("t1");
49
 
  record->set_sql("INSERT INTO t1 (a) VALUES (1) (2)");
50
 
 
51
 
  /* Add Field Names */
52
 
  record->add_field_names("a");
53
 
 
54
 
  /* Add values (first row) */
55
 
  value= record->add_values();
56
 
  value->add_val("1");
57
 
 
58
 
  /* Add values (second row) */
59
 
  value= record->add_values();
60
 
  value->add_val("2");
61
 
}
62
 
 
63
 
void write_delete(::drizzled::message::Event *record, const char *trx)
64
 
{
65
 
  Event::Value *value;
66
 
 
67
 
  record->set_type(Event::DELETE);
68
 
  record->set_autocommit(true);
69
 
  record->set_server_id("localhost");
70
 
  record->set_query_id(query_id++);
71
 
  record->set_transaction_id(trx);
72
 
  record->set_schema("test");
73
 
  record->set_table("t1");
74
 
  record->set_sql("DELETE FROM t1 WHERE a IN (1, 2)");
75
 
 
76
 
  /* Add Field Names */
77
 
  record->set_primary_key("a");
78
 
 
79
 
  /* Add values for IN() */
80
 
  value= record->add_values();
81
 
  value->add_val("1");
82
 
  value->add_val("2");
83
 
}
84
 
 
85
 
void write_update(::drizzled::message::Event *record, const char *trx)
86
 
{
87
 
  Event::Value *value;
88
 
 
89
 
  record->set_type(Event::UPDATE);
90
 
  record->set_autocommit(true);
91
 
  record->set_server_id("localhost");
92
 
  record->set_query_id(query_id++);
93
 
  record->set_transaction_id(trx);
94
 
  record->set_schema("test");
95
 
  record->set_table("t1");
96
 
  record->set_sql("UPDATE t1 SET a=5 WHERE a = 1 ");
97
 
  record->set_primary_key("a");
98
 
 
99
 
  /* Add Field Names */
100
 
  record->add_field_names("a");
101
 
 
102
 
  /* Add values (first row) */
103
 
  value= record->add_values();
104
 
  value->add_val("1"); // The first value is always the primary key comparison value
105
 
  value->add_val("5");
106
 
 
107
 
  /* Add values (second row) */
108
 
  value= record->add_values();
109
 
  value->add_val("2");
110
 
  value->add_val("6");
111
 
}
112
 
 
113
 
void write_to_disk(int file, ::drizzled::message::EventList *list)
 
20
static uint32_t server_id= 1;
 
21
static uint64_t transaction_id= 0;
 
22
 
 
23
uint64_t getNanoTimestamp()
 
24
{
 
25
#ifdef HAVE_CLOCK_GETTIME
 
26
  struct timespec tp;
 
27
  clock_gettime(CLOCK_REALTIME, &tp);
 
28
  return (uint64_t) tp.tv_sec * 10000000
 
29
       + (uint64_t) tp.tv_nsec;
 
30
#else
 
31
  struct timeval tv;
 
32
  gettimeofday(&tv,NULL);
 
33
  return (uint64_t) tv.tv_sec * 10000000
 
34
       + (uint64_t) tv.tv_usec * 1000;
 
35
#endif
 
36
}
 
37
 
 
38
void writeCommit(drizzled::message::Command &record)
 
39
{
 
40
  record.set_type(Command::COMMIT);
 
41
  record.set_timestamp(getNanoTimestamp());
 
42
 
 
43
  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
 
44
  trx->set_server_id(server_id);
 
45
  trx->set_transaction_id(transaction_id);
 
46
}
 
47
 
 
48
void writeRollback(drizzled::message::Command &record)
 
49
{
 
50
  record.set_type(Command::ROLLBACK);
 
51
  record.set_timestamp(getNanoTimestamp());
 
52
 
 
53
  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
 
54
  trx->set_server_id(server_id);
 
55
  trx->set_transaction_id(transaction_id);
 
56
}
 
57
 
 
58
void writeStartTransaction(drizzled::message::Command &record)
 
59
{
 
60
  record.set_type(Command::START_TRANSACTION);
 
61
  record.set_timestamp(getNanoTimestamp());
 
62
 
 
63
  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
 
64
  trx->set_server_id(server_id);
 
65
  trx->set_transaction_id(transaction_id);
 
66
}
 
67
 
 
68
void writeInsert(drizzled::message::Command &record)
 
69
{
 
70
  record.set_type(Command::INSERT);
 
71
  record.set_sql("INSERT INTO t1 (a) VALUES (1) (2)");
 
72
  record.set_timestamp(getNanoTimestamp());
 
73
  record.set_schema("test");
 
74
  record.set_table("t1");
 
75
 
 
76
  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
 
77
  trx->set_server_id(server_id);
 
78
  trx->set_transaction_id(transaction_id);
 
79
 
 
80
  drizzled::message::InsertRecord *irecord= record.mutable_insert_record();
 
81
 
 
82
  /* Add Fields and Values... */
 
83
 
 
84
  Table::Field *field= irecord->add_insert_field();
 
85
  field->set_name("a");
 
86
  field->set_type(drizzled::message::Table::Field::VARCHAR);
 
87
 
 
88
  irecord->add_insert_value("1");
 
89
  irecord->add_insert_value("2");
 
90
}
 
91
 
 
92
void writeDeleteWithPK(drizzled::message::Command &record)
 
93
{
 
94
  record.set_type(Command::DELETE);
 
95
  record.set_sql("DELETE FROM t1 WHERE a = 1");
 
96
  record.set_timestamp(getNanoTimestamp());
 
97
  record.set_schema("test");
 
98
  record.set_table("t1");
 
99
 
 
100
  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
 
101
  trx->set_server_id(server_id);
 
102
  trx->set_transaction_id(transaction_id);
 
103
 
 
104
  drizzled::message::DeleteRecord *drecord= record.mutable_delete_record();
 
105
 
 
106
  Table::Field *field= drecord->add_where_field();
 
107
  field->set_name("a");
 
108
  field->set_type(drizzled::message::Table::Field::VARCHAR);
 
109
 
 
110
  drecord->add_where_value("1");
 
111
}
 
112
 
 
113
void writeUpdateWithPK(drizzled::message::Command &record)
 
114
{
 
115
  record.set_type(Command::UPDATE);
 
116
  record.set_sql("UPDATE t1 SET a = 5 WHERE a = 1;");
 
117
  record.set_timestamp(getNanoTimestamp());
 
118
  record.set_schema("test");
 
119
  record.set_table("t1");
 
120
 
 
121
  drizzled::message::TransactionContext *trx= record.mutable_transaction_context();
 
122
  trx->set_server_id(server_id);
 
123
  trx->set_transaction_id(transaction_id);
 
124
 
 
125
  drizzled::message::UpdateRecord *urecord= record.mutable_update_record();
 
126
 
 
127
  Table::Field *field;
 
128
  
 
129
  field= urecord->add_update_field();
 
130
  field->set_name("a");
 
131
  field->set_type(drizzled::message::Table::Field::VARCHAR);
 
132
 
 
133
  urecord->add_after_value("5");
 
134
 
 
135
  field= urecord->add_where_field();
 
136
  field->set_name("a");
 
137
  field->set_type(drizzled::message::Table::Field::VARCHAR);
 
138
 
 
139
  urecord->add_where_value("1");
 
140
}
 
141
 
 
142
void writeTransaction(int file, drizzled::message::Transaction &transaction)
114
143
{
115
144
  std::string buffer;
116
145
  size_t length;
117
146
  size_t written;
118
147
 
119
 
  list->SerializePartialToString(&buffer);
 
148
  drizzled::message::TransactionContext *trx= transaction.mutable_transaction_context();
 
149
  trx->set_server_id(server_id);
 
150
  trx->set_transaction_id(transaction_id);
 
151
 
 
152
  transaction.SerializeToString(&buffer);
120
153
 
121
154
  length= buffer.length();
122
155
 
123
 
  cout << "Writing record of " << length << "." << endl;
 
156
  cout << "Writing transaction of " << length << " length." << endl;
124
157
 
125
158
  if ((written= write(file, &length, sizeof(uint64_t))) != sizeof(uint64_t))
126
159
  {
135
168
  }
136
169
}
137
170
 
138
 
 
139
171
int main(int argc, char* argv[])
140
172
{
141
173
  GOOGLE_PROTOBUF_VERIFY_VERSION;
143
175
 
144
176
  if (argc != 2) 
145
177
  {
146
 
    cerr << "Usage:  " << argv[0] << " REPLICATION_EVENT_LOG " << endl;
 
178
    cerr << "Usage:  " << argv[0] << " TRANSACTION_LOG" << endl;
147
179
    return -1;
148
180
  }
149
181
 
153
185
   exit(0);
154
186
  }
155
187
 
156
 
  EventList list;
157
 
 
158
 
  /* Write first set of records */
159
 
  write_ddl(list.add_event(), "CREATE TABLE A (a int) ENGINE=innodb");
160
 
  write_insert(list.add_event(), transaction_id);
161
 
 
162
 
  write_to_disk(file, &list);
163
 
 
164
 
  /* Write Second set of records */
165
 
  write_ddl(list.add_event(), "CREATE TABLE A (a int) ENGINE=innodb");
166
 
  write_delete(list.add_event(), transaction_id);
167
 
  write_update(list.add_event(), transaction_id);
168
 
 
169
 
  write_to_disk(file, &list);
 
188
  /* Write a series of statements which test each type of record class */
 
189
  transaction_id++;
 
190
 
 
191
  /* Simple INSERT statement */
 
192
  Transaction transaction;
 
193
  transaction.set_start_timestamp(getNanoTimestamp());
 
194
  writeStartTransaction(*transaction.add_command());
 
195
  writeInsert(*transaction.add_command());
 
196
  writeCommit(*transaction.add_command());
 
197
  transaction.set_end_timestamp(getNanoTimestamp());
 
198
 
 
199
  writeTransaction(file, transaction);
 
200
 
 
201
  transaction.Clear();
 
202
 
 
203
  /* Write a DELETE and an UPDATE in one transaction */
 
204
  transaction_id++;
 
205
  transaction.set_start_timestamp(getNanoTimestamp());
 
206
  writeStartTransaction(*transaction.add_command());
 
207
  writeDeleteWithPK(*transaction.add_command());
 
208
  writeUpdateWithPK(*transaction.add_command());
 
209
  writeCommit(*transaction.add_command());
 
210
  transaction.set_end_timestamp(getNanoTimestamp());
 
211
 
 
212
  writeTransaction(file, transaction);
170
213
 
171
214
  close(file);
172
215