~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/command_reader.cc

  • Committer: Brian Aker
  • Date: 2009-08-18 07:19:56 UTC
  • mfrom: (1116.1.3 stewart)
  • mto: This revision was merged to the branch mainline in revision 1118.
  • Revision ID: brian@gaz-20090818071956-nfpoe9rp3i7p50kx
Merge my branch from Stewart into one branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <drizzled/global.h>
 
2
#include <sys/types.h>
 
3
#include <sys/stat.h>
 
4
#include <fcntl.h>
 
5
#include <iostream>
 
6
#include <fstream>
 
7
#include <string>
 
8
#include <unistd.h>
 
9
#include <drizzled/message/replication.pb.h>
 
10
 
 
11
#include "drizzled/korr.h"
 
12
 
 
13
using namespace std;
 
14
using namespace drizzled;
 
15
 
 
16
/**
 
17
 * @file Example application for reading change records (Command messages)
 
18
 *
 
19
 * @note
 
20
 *
 
21
 * This program is used in the serial_event_log test suite to verify
 
22
 * the log written by that plugin.
 
23
 */
 
24
 
 
25
static void printInsert(const message::Command &container,
 
26
                        const message::InsertRecord &record)
 
27
{
 
28
 
 
29
  cout << "INSERT INTO `" << container.schema() << "`.`" << container.table() << "` (";
 
30
  
 
31
  assert(record.insert_field_size() > 0);
 
32
  assert(record.insert_value_size() > 0);
 
33
  assert(record.insert_value_size() % record.insert_field_size() == 0);
 
34
 
 
35
  int32_t num_fields= record.insert_field_size();
 
36
 
 
37
  int32_t x;
 
38
  for (x= 0; x < num_fields; x++)
 
39
  {
 
40
    if (x != 0)
 
41
      cout << ", ";
 
42
 
 
43
    const message::Table::Field f= record.insert_field(x);
 
44
 
 
45
    cout << "`" << f.name() << "`";
 
46
  }
 
47
 
 
48
  cout << ") VALUES ";
 
49
 
 
50
  /* 
 
51
   * There may be an INSERT VALUES (),() type statement.  We know the
 
52
   * number of records is equal to the field_values array size divided
 
53
   * by the number of fields.
 
54
   *
 
55
   * So, we do an inner and an outer loop.  Outer loop is on the number
 
56
   * of records and the inner loop on the number of fields.  In this way, 
 
57
   * we know that record.field_values(outer_loop * num_fields) + inner_loop))
 
58
   * always gives us our correct field value.
 
59
   */
 
60
  int32_t num_records= (record.insert_value_size() / num_fields);
 
61
  int32_t y;
 
62
  for (x= 0; x < num_records; x++)
 
63
  {
 
64
    if (x != 0)
 
65
      cout << ", ";
 
66
 
 
67
    cout << "(";
 
68
    for (y= 0; y < num_fields; y++)
 
69
    {
 
70
      if (y != 0)
 
71
        cout << ", ";
 
72
 
 
73
      cout << "\"" << record.insert_value((x * num_fields) + y) << "\"";
 
74
    }
 
75
    cout << ")";
 
76
  }
 
77
 
 
78
  cout << ";";
 
79
}
 
80
 
 
81
static void printDeleteWithPK(const message::Command &container,
 
82
                              const message::DeleteRecord &record)
 
83
{
 
84
  cout << "DELETE FROM `" << container.schema() << "`.`" << container.table() << "`";
 
85
  
 
86
  assert(record.where_field_size() > 0);
 
87
  assert(record.where_value_size() == record.where_field_size());
 
88
 
 
89
  int32_t num_where_fields= record.where_field_size();
 
90
  /* 
 
91
   * Make sure we catch anywhere we're not aligning the fields with
 
92
   * the field_values arrays...
 
93
   */
 
94
  assert(num_where_fields == record.where_value_size());
 
95
 
 
96
  cout << " WHERE ";
 
97
  int32_t x;
 
98
  for (x= 0; x < num_where_fields; x++)
 
99
  {
 
100
    if (x != 0)
 
101
      cout << " AND "; /* Always AND condition with a multi-column PK */
 
102
 
 
103
    const message::Table::Field f= record.where_field(x);
 
104
 
 
105
    /* Always equality conditions */
 
106
    cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
 
107
  }
 
108
 
 
109
  cout << ";";
 
110
}
 
111
 
 
112
static void printUpdateWithPK(const message::Command &container,
 
113
                              const message::UpdateRecord &record)
 
114
{
 
115
  int32_t num_update_fields= record.update_field_size();
 
116
  int32_t x;
 
117
  
 
118
  assert(record.update_field_size() > 0);
 
119
  assert(record.where_field_size() > 0);
 
120
  assert(record.where_value_size() == record.where_field_size());
 
121
 
 
122
  cout << "UPDATE `" << container.schema() << "`.`" << container.table() << "` SET ";
 
123
 
 
124
  for (x= 0;x < num_update_fields; x++)
 
125
  {
 
126
    message::Table::Field f= record.update_field(x);
 
127
    
 
128
    if (x != 0)
 
129
      cout << ", ";
 
130
 
 
131
    cout << "`" << f.name() << "` = \"" << record.after_value(x) << "\"";
 
132
  }
 
133
 
 
134
  int32_t num_where_fields= record.where_field_size();
 
135
  /* 
 
136
   * Make sure we catch anywhere we're not aligning the fields with
 
137
   * the field_values arrays...
 
138
   */
 
139
  assert(num_where_fields == record.where_value_size());
 
140
 
 
141
  cout << " WHERE ";
 
142
  for (x= 0;x < num_where_fields; x++)
 
143
  {
 
144
    if (x != 0)
 
145
      cout << " AND "; /* Always AND condition with a multi-column PK */
 
146
 
 
147
    const message::Table::Field f= record.where_field(x);
 
148
 
 
149
    /* Always equality conditions */
 
150
    cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
 
151
  }
 
152
  cout << ";";
 
153
}
 
154
 
 
155
static void printCommand(const message::Command &command)
 
156
{
 
157
  cout << "/* Timestamp: " << command.timestamp() << " */"<< endl;
 
158
 
 
159
  message::TransactionContext trx= command.transaction_context();
 
160
 
 
161
  cout << "/* SID: " << trx.server_id() << " XID: " << trx.transaction_id() << " */ ";
 
162
 
 
163
  switch (command.type())
 
164
  {
 
165
    case message::Command::START_TRANSACTION:
 
166
      cout << "START TRANSACTION;";
 
167
      break;
 
168
    case message::Command::COMMIT:
 
169
      cout << "COMMIT;";
 
170
      break;
 
171
    case message::Command::ROLLBACK:
 
172
      cout << "ROLLBACK;";
 
173
      break;
 
174
    case message::Command::INSERT:
 
175
    {
 
176
      printInsert(command, command.insert_record());
 
177
      break;
 
178
    }
 
179
    case message::Command::DELETE:
 
180
    {
 
181
      printDeleteWithPK(command, command.delete_record());
 
182
      break;
 
183
    }
 
184
    case message::Command::UPDATE:
 
185
    {
 
186
      printUpdateWithPK(command, command.update_record());
 
187
      break;
 
188
    }
 
189
    case message::Command::RAW_SQL:
 
190
    {
 
191
      std::string sql= command.sql();
 
192
      /* Replace \n with spaces */
 
193
      const std::string newline= "\n";
 
194
      while (sql.find(newline) != std::string::npos)
 
195
        sql.replace(sql.find(newline), 1, " ");
 
196
 
 
197
      cout << sql << ";";
 
198
      break;
 
199
    }
 
200
    default:
 
201
      cout << "Received an unknown Command type: " << (int32_t) command.type();
 
202
  }
 
203
  cout << endl;
 
204
}
 
205
 
 
206
int main(int argc, char* argv[])
 
207
{
 
208
  GOOGLE_PROTOBUF_VERIFY_VERSION;
 
209
  int file;
 
210
 
 
211
  if (argc != 2)
 
212
  {
 
213
    cerr << "Usage:  " << argv[0] << " COMMAND_LOG" << endl;
 
214
    return -1;
 
215
  }
 
216
 
 
217
  message::Command command;
 
218
 
 
219
  if ((file= open(argv[1], O_RDONLY)) == -1)
 
220
  {
 
221
    cerr << "Cannot open file: " << argv[1] << endl;
 
222
  }
 
223
 
 
224
  char *buffer= NULL;
 
225
  char *temp_buffer= NULL;
 
226
  uint64_t previous_length= 0;
 
227
  ssize_t read_bytes= 0;
 
228
  uint64_t length= 0;
 
229
 
 
230
  /* We use korr.h macros when writing and must do the same when reading... */
 
231
  unsigned char coded_length[8];
 
232
 
 
233
  /* Read in the length of the command */
 
234
  while ((read_bytes= read(file, coded_length, sizeof(uint64_t))) != 0)
 
235
  {
 
236
    if (read_bytes == -1)
 
237
    {
 
238
      cerr << "Failed to read initial length." << endl;
 
239
      exit(1);
 
240
    }
 
241
    length= uint8korr(coded_length);
 
242
 
 
243
    if (length > SIZE_MAX)
 
244
    {
 
245
      cerr << "Attempted to read record bigger than SIZE_MAX" << endl;
 
246
      exit(1);
 
247
    }
 
248
 
 
249
    if (buffer == NULL)
 
250
    {
 
251
      /* 
 
252
       * First time around...just malloc the length.  This block gets rid
 
253
       * of a GCC warning about uninitialized temp_buffer.
 
254
       */
 
255
      temp_buffer= (char *) malloc((size_t) length);
 
256
    }
 
257
    /* No need to allocate if we have a buffer big enough... */
 
258
    else if (length > previous_length)
 
259
    {
 
260
      temp_buffer= (char *) realloc(buffer, (size_t) length);
 
261
    }
 
262
 
 
263
    if (temp_buffer == NULL)
 
264
    {
 
265
      cerr << "Memory allocation failure trying to allocate " << length << " bytes."  << endl;
 
266
      exit(1);
 
267
    }
 
268
    else
 
269
      buffer= temp_buffer;
 
270
 
 
271
    /* Read the Command */
 
272
    read_bytes= read(file, buffer, (size_t) length);
 
273
    if ((read_bytes != (ssize_t) length))
 
274
    {
 
275
      cerr << "Could not read entire transaction. Read " << read_bytes << " bytes instead of " << length << " bytes." << endl;
 
276
      exit(1);
 
277
    }
 
278
 
 
279
    if (! command.ParseFromArray(buffer, (int) length))
 
280
    {
 
281
      cerr << "Unable to parse command. Got error: " << command.InitializationErrorString() << endl;
 
282
      if (buffer != NULL)
 
283
        cerr << "BUFFER: " << buffer << endl;
 
284
      exit(1);
 
285
    }
 
286
 
 
287
    /* Print the command */
 
288
    printCommand(command);
 
289
 
 
290
    /* Reset our length check */
 
291
    previous_length= length;
 
292
    memset(coded_length, 0, sizeof(coded_length));
 
293
  }
 
294
  if (buffer)
 
295
    free(buffer);
 
296
  return 0;
 
297
}