~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/command_reader.cc

Renamed namespace slot to namespace service.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <drizzled/server_includes.h>
 
2
#include <drizzled/gettext.h>
 
3
#include <sys/types.h>
 
4
#include <sys/stat.h>
 
5
#include <fcntl.h>
 
6
#include <iostream>
 
7
#include <fstream>
 
8
#include <string>
 
9
#include <unistd.h>
 
10
#include <cstdio>
 
11
#include <drizzled/message/replication.pb.h>
 
12
 
 
13
#include "drizzled/korr.h"
 
14
 
 
15
using namespace std;
 
16
using namespace drizzled;
 
17
 
 
18
/**
 
19
 * @file Example application for reading change records (Command messages)
 
20
 *
 
21
 * @note
 
22
 *
 
23
 * This program is used in the serial_event_log test suite to verify
 
24
 * the log written by that plugin.
 
25
 */
 
26
 
 
27
static void printInsert(const message::Command &container,
 
28
                        const message::InsertRecord &record)
 
29
{
 
30
 
 
31
  cout << "INSERT INTO `" << container.schema() << "`.`" << container.table() << "` (";
 
32
  
 
33
  assert(record.insert_field_size() > 0);
 
34
  assert(record.insert_value_size() > 0);
 
35
  assert(record.insert_value_size() % record.insert_field_size() == 0);
 
36
 
 
37
  int32_t num_fields= record.insert_field_size();
 
38
 
 
39
  int32_t x;
 
40
  for (x= 0; x < num_fields; x++)
 
41
  {
 
42
    if (x != 0)
 
43
      cout << ", ";
 
44
 
 
45
    const message::Table::Field f= record.insert_field(x);
 
46
 
 
47
    cout << "`" << f.name() << "`";
 
48
  }
 
49
 
 
50
  cout << ") VALUES ";
 
51
 
 
52
  /* 
 
53
   * There may be an INSERT VALUES (),() type statement.  We know the
 
54
   * number of records is equal to the field_values array size divided
 
55
   * by the number of fields.
 
56
   *
 
57
   * So, we do an inner and an outer loop.  Outer loop is on the number
 
58
   * of records and the inner loop on the number of fields.  In this way, 
 
59
   * we know that record.field_values(outer_loop * num_fields) + inner_loop))
 
60
   * always gives us our correct field value.
 
61
   */
 
62
  int32_t num_records= (record.insert_value_size() / num_fields);
 
63
  int32_t y;
 
64
  for (x= 0; x < num_records; x++)
 
65
  {
 
66
    if (x != 0)
 
67
      cout << ", ";
 
68
 
 
69
    cout << "(";
 
70
    for (y= 0; y < num_fields; y++)
 
71
    {
 
72
      if (y != 0)
 
73
        cout << ", ";
 
74
 
 
75
      cout << "\"" << record.insert_value((x * num_fields) + y) << "\"";
 
76
    }
 
77
    cout << ")";
 
78
  }
 
79
 
 
80
  cout << ";";
 
81
}
 
82
 
 
83
static void printDeleteWithPK(const message::Command &container,
 
84
                              const message::DeleteRecord &record)
 
85
{
 
86
  cout << "DELETE FROM `" << container.schema() << "`.`" << container.table() << "`";
 
87
  
 
88
  assert(record.where_field_size() > 0);
 
89
  assert(record.where_value_size() == record.where_field_size());
 
90
 
 
91
  int32_t num_where_fields= record.where_field_size();
 
92
  /* 
 
93
   * Make sure we catch anywhere we're not aligning the fields with
 
94
   * the field_values arrays...
 
95
   */
 
96
  assert(num_where_fields == record.where_value_size());
 
97
 
 
98
  cout << " WHERE ";
 
99
  int32_t x;
 
100
  for (x= 0; x < num_where_fields; x++)
 
101
  {
 
102
    if (x != 0)
 
103
      cout << " AND "; /* Always AND condition with a multi-column PK */
 
104
 
 
105
    const message::Table::Field f= record.where_field(x);
 
106
 
 
107
    /* Always equality conditions */
 
108
    cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
 
109
  }
 
110
 
 
111
  cout << ";";
 
112
}
 
113
 
 
114
static void printUpdateWithPK(const message::Command &container,
 
115
                              const message::UpdateRecord &record)
 
116
{
 
117
  int32_t num_update_fields= record.update_field_size();
 
118
  int32_t x;
 
119
  
 
120
  assert(record.update_field_size() > 0);
 
121
  assert(record.where_field_size() > 0);
 
122
  assert(record.where_value_size() == record.where_field_size());
 
123
 
 
124
  cout << "UPDATE `" << container.schema() << "`.`" << container.table() << "` SET ";
 
125
 
 
126
  for (x= 0;x < num_update_fields; x++)
 
127
  {
 
128
    message::Table::Field f= record.update_field(x);
 
129
    
 
130
    if (x != 0)
 
131
      cout << ", ";
 
132
 
 
133
    cout << "`" << f.name() << "` = \"" << record.after_value(x) << "\"";
 
134
  }
 
135
 
 
136
  int32_t num_where_fields= record.where_field_size();
 
137
  /* 
 
138
   * Make sure we catch anywhere we're not aligning the fields with
 
139
   * the field_values arrays...
 
140
   */
 
141
  assert(num_where_fields == record.where_value_size());
 
142
 
 
143
  cout << " WHERE ";
 
144
  for (x= 0;x < num_where_fields; x++)
 
145
  {
 
146
    if (x != 0)
 
147
      cout << " AND "; /* Always AND condition with a multi-column PK */
 
148
 
 
149
    const message::Table::Field f= record.where_field(x);
 
150
 
 
151
    /* Always equality conditions */
 
152
    cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
 
153
  }
 
154
  cout << ";";
 
155
}
 
156
 
 
157
static void printCommand(const message::Command &command)
 
158
{
 
159
  cout << "/* Timestamp: " << command.timestamp() << " */"<< endl;
 
160
 
 
161
  message::TransactionContext trx= command.transaction_context();
 
162
 
 
163
  cout << "/* SERVER ID: " << trx.server_id() << " TRX ID: " << trx.transaction_id();
 
164
  
 
165
  if (command.has_session_id())
 
166
    cout << " SESSION ID: " << command.session_id();
 
167
 
 
168
  cout << " */ ";
 
169
 
 
170
  switch (command.type())
 
171
  {
 
172
    case message::Command::START_TRANSACTION:
 
173
      cout << "START TRANSACTION;";
 
174
      break;
 
175
    case message::Command::COMMIT:
 
176
      cout << "COMMIT;";
 
177
      break;
 
178
    case message::Command::ROLLBACK:
 
179
      cout << "ROLLBACK;";
 
180
      break;
 
181
    case message::Command::INSERT:
 
182
    {
 
183
      printInsert(command, command.insert_record());
 
184
      break;
 
185
    }
 
186
    case message::Command::DELETE:
 
187
    {
 
188
      printDeleteWithPK(command, command.delete_record());
 
189
      break;
 
190
    }
 
191
    case message::Command::UPDATE:
 
192
    {
 
193
      printUpdateWithPK(command, command.update_record());
 
194
      break;
 
195
    }
 
196
    case message::Command::RAW_SQL:
 
197
    {
 
198
      std::string sql= command.sql();
 
199
      /* Replace \n with spaces */
 
200
      const std::string newline= "\n";
 
201
      while (sql.find(newline) != std::string::npos)
 
202
        sql.replace(sql.find(newline), 1, " ");
 
203
 
 
204
      cout << sql << ";";
 
205
      break;
 
206
    }
 
207
    default:
 
208
      cout << "Received an unknown Command type: " << (int32_t) command.type();
 
209
  }
 
210
  cout << endl;
 
211
}
 
212
 
 
213
int main(int argc, char* argv[])
 
214
{
 
215
  GOOGLE_PROTOBUF_VERIFY_VERSION;
 
216
  int file;
 
217
 
 
218
  if (argc != 2)
 
219
  {
 
220
    fprintf(stderr, _("Usage: %s COMMAND_LOG\n"), argv[0]);
 
221
    return -1;
 
222
  }
 
223
 
 
224
  message::Command command;
 
225
 
 
226
  file= open(argv[1], O_RDONLY);
 
227
  if (file == -1)
 
228
  {
 
229
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
 
230
  }
 
231
 
 
232
  char *buffer= NULL;
 
233
  char *temp_buffer= NULL;
 
234
  uint64_t previous_length= 0;
 
235
  ssize_t read_bytes= 0;
 
236
  uint64_t length= 0;
 
237
  uint32_t checksum= 0;
 
238
 
 
239
  /* We use korr.h macros when writing and must do the same when reading... */
 
240
  unsigned char coded_length[8];
 
241
  unsigned char coded_checksum[4];
 
242
 
 
243
  /* Read in the length of the command */
 
244
  while ((read_bytes= read(file, coded_length, sizeof(uint64_t))) != 0)
 
245
  {
 
246
    if (read_bytes == -1)
 
247
    {
 
248
      fprintf(stderr, _("Failed to read initial length header\n"));
 
249
      exit(1);
 
250
    }
 
251
    length= uint8korr(coded_length);
 
252
 
 
253
    if (length > SIZE_MAX)
 
254
    {
 
255
      fprintf(stderr, _("Attempted to read record bigger than SIZE_MAX\n"));
 
256
      exit(1);
 
257
    }
 
258
 
 
259
    if (buffer == NULL)
 
260
    {
 
261
      /* 
 
262
       * First time around...just malloc the length.  This block gets rid
 
263
       * of a GCC warning about uninitialized temp_buffer.
 
264
       */
 
265
      temp_buffer= (char *) malloc((size_t) length);
 
266
    }
 
267
    /* No need to allocate if we have a buffer big enough... */
 
268
    else if (length > previous_length)
 
269
    {
 
270
      temp_buffer= (char *) realloc(buffer, (size_t) length);
 
271
    }
 
272
 
 
273
    if (temp_buffer == NULL)
 
274
    {
 
275
      fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"), length);
 
276
      exit(1);
 
277
    }
 
278
    else
 
279
      buffer= temp_buffer;
 
280
 
 
281
    /* Read the Command */
 
282
    read_bytes= read(file, buffer, (size_t) length);
 
283
    if ((read_bytes != (ssize_t) length))
 
284
    {
 
285
      fprintf(stderr, _("Could not read entire transaction. Read %" PRIu64 " bytes instead of %" PRIu64 " bytes.\n"), (uint64_t) read_bytes, (uint64_t) length);
 
286
      exit(1);
 
287
    }
 
288
 
 
289
    if (! command.ParseFromArray(buffer, (int) length))
 
290
    {
 
291
      fprintf(stderr, _("Unable to parse command. Got error: %s.\n"), command.InitializationErrorString().c_str());
 
292
      if (buffer != NULL)
 
293
        fprintf(stderr, _("BUFFER: %s\n"), buffer);
 
294
      exit(1);
 
295
    }
 
296
 
 
297
    /* Read the checksum */
 
298
    read_bytes= read(file, coded_checksum, sizeof(uint32_t));
 
299
    if ((read_bytes != (ssize_t) sizeof(uint32_t)))
 
300
    {
 
301
      fprintf(stderr, _("Could not read entire checksum. Read %" PRIu64 " bytes instead of 4 bytes.\n"), (uint64_t) read_bytes);
 
302
      exit(1);
 
303
    }
 
304
    checksum= uint4korr(coded_checksum);
 
305
 
 
306
    if (checksum != 0)
 
307
    {
 
308
      /* @TODO checksumming.. */
 
309
    }
 
310
 
 
311
    /* Print the command */
 
312
    printCommand(command);
 
313
 
 
314
    /* Reset our length check */
 
315
    previous_length= length;
 
316
    memset(coded_length, 0, sizeof(coded_length));
 
317
  }
 
318
  if (buffer)
 
319
    free(buffer);
 
320
  return 0;
 
321
}