~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/command_reader.cc

* New serial event log plugin

Implemented in /plugin/serial_event_log/.

Adds a very simple serialized event log to the server.  This simple
applier takes Command messages and writes them to a log file as it
received them.  Nothing complex for right now.

* New default replicator plugin

This plugin is extremely simple and merely passes a received Command
message on to all registered Appliers (of which the new serial event
log is one of those appliers)

The plugin is disabled by default.  It can be enabled on startup
with --default-replicator-enable.

* New command reader test program

There is a new test program in /drizzled/message/ which is similar to
the transaction_reader program but can read single Command messages, 
and not Transaction messages which contain a vector of Command messages.

* New serial_event_log test suite

The test case is very simple right now, but serves to show how plugin
test suites can be written, and how test programs in the server source
tree can be used in the drizzletest program language.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <drizzled/global.h>
 
2
#include <sys/types.h>
 
3
#include <sys/stat.h>
 
4
#include <fcntl.h>
 
5
#include <iostream>
 
6
#include <fstream>
 
7
#include <string>
 
8
#include <unistd.h>
 
9
#include <drizzled/message/transaction.pb.h>
 
10
 
 
11
using namespace std;
 
12
using namespace drizzled::message;
 
13
 
 
14
/**
 
15
 * @file Example application for reading change records (Command messages)
 
16
 *
 
17
 * @note
 
18
 *
 
19
 * This program is used in the serial_event_log test suite to verify
 
20
 * the log written by that plugin.
 
21
 */
 
22
 
 
23
void printInsert(const drizzled::message::Command &container, const drizzled::message::InsertRecord &record)
 
24
{
 
25
 
 
26
  cout << "INSERT INTO `" << container.schema() << "`.`" << container.table() << "` (";
 
27
 
 
28
  int32_t num_fields= record.insert_field_size();
 
29
 
 
30
  int32_t x;
 
31
  for (x= 0; x < num_fields; x++)
 
32
  {
 
33
    if (x != 0)
 
34
      cout << ", ";
 
35
 
 
36
    const Table::Field f= record.insert_field(x);
 
37
 
 
38
    cout << "`" << f.name() << "`";
 
39
  }
 
40
 
 
41
  cout << ") VALUES ";
 
42
 
 
43
  /* 
 
44
   * There may be an INSERT VALUES (),() type statement.  We know the
 
45
   * number of records is equal to the field_values array size divided
 
46
   * by the number of fields.
 
47
   *
 
48
   * So, we do an inner and an outer loop.  Outer loop is on the number
 
49
   * of records and the inner loop on the number of fields.  In this way, 
 
50
   * we know that record.field_values(outer_loop * num_fields) + inner_loop))
 
51
   * always gives us our correct field value.
 
52
   */
 
53
  int32_t num_records= (record.insert_value_size() / num_fields);
 
54
  int32_t y;
 
55
  for (x= 0; x < num_records; x++)
 
56
  {
 
57
    if (x != 0)
 
58
      cout << ", ";
 
59
 
 
60
    cout << "(";
 
61
    for (y= 0; y < num_fields; y++)
 
62
    {
 
63
      if (y != 0)
 
64
        cout << ", ";
 
65
 
 
66
      cout << "\"" << record.insert_value((x * num_fields) + y) << "\"";
 
67
    }
 
68
    cout << ")";
 
69
  }
 
70
 
 
71
  cout << ";";
 
72
}
 
73
 
 
74
void printDeleteWithPK(const drizzled::message::Command &container, const drizzled::message::DeleteRecord &record)
 
75
{
 
76
  cout << "DELETE FROM `" << container.schema() << "`.`" << container.table() << "`";
 
77
 
 
78
  int32_t num_where_fields= record.where_field_size();
 
79
  /* 
 
80
   * Make sure we catch anywhere we're not aligning the fields with
 
81
   * the field_values arrays...
 
82
   */
 
83
  assert(num_where_fields == record.where_value_size());
 
84
 
 
85
  cout << " WHERE ";
 
86
  int32_t x;
 
87
  for (x= 0; x < num_where_fields; x++)
 
88
  {
 
89
    if (x != 0)
 
90
      cout << " AND "; /* Always AND condition with a multi-column PK */
 
91
 
 
92
    const Table::Field f= record.where_field(x);
 
93
 
 
94
    /* Always equality conditions */
 
95
    cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
 
96
  }
 
97
}
 
98
 
 
99
void printUpdateWithPK(const drizzled::message::Command &container, const drizzled::message::UpdateRecord &record)
 
100
{
 
101
  int32_t num_update_fields= record.update_field_size();
 
102
  int32_t x;
 
103
 
 
104
  cout << "UPDATE `" << container.schema() << "`.`" << container.table() << "` SET ";
 
105
 
 
106
  for (x= 0;x < num_update_fields; x++)
 
107
  {
 
108
    Table::Field f= record.update_field(x);
 
109
    
 
110
    if (x != 0)
 
111
      cout << ", ";
 
112
 
 
113
    cout << "`" << f.name() << "` = \"" << record.after_value(x) << "\"";
 
114
  }
 
115
 
 
116
  int32_t num_where_fields= record.where_field_size();
 
117
  /* 
 
118
   * Make sure we catch anywhere we're not aligning the fields with
 
119
   * the field_values arrays...
 
120
   */
 
121
  assert(num_where_fields == record.where_value_size());
 
122
 
 
123
  cout << " WHERE ";
 
124
  for (x= 0;x < num_where_fields; x++)
 
125
  {
 
126
    if (x != 0)
 
127
      cout << " AND "; /* Always AND condition with a multi-column PK */
 
128
 
 
129
    const Table::Field f= record.where_field(x);
 
130
 
 
131
    /* Always equality conditions */
 
132
    cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
 
133
  }
 
134
}
 
135
 
 
136
void printCommand(const drizzled::message::Command &command)
 
137
{
 
138
  cout << "/* Timestamp: " << command.timestamp() << " */"<< endl;
 
139
 
 
140
  drizzled::message::TransactionContext trx= command.transaction_context();
 
141
 
 
142
  cout << "/* SID: " << trx.server_id() << " XID: " << trx.transaction_id() << " */ ";
 
143
 
 
144
  switch (command.type())
 
145
  {
 
146
    case Command::START_TRANSACTION:
 
147
      cout << "START TRANSACTION;";
 
148
      break;
 
149
    case Command::COMMIT:
 
150
      cout << "COMMIT;";
 
151
      break;
 
152
    case Command::ROLLBACK:
 
153
      cout << "ROLLBACK;";
 
154
      break;
 
155
    case Command::INSERT:
 
156
    {
 
157
      printInsert(command, command.insert_record());
 
158
      break;
 
159
    }
 
160
    case Command::DELETE:
 
161
    {
 
162
      printDeleteWithPK(command, command.delete_record());
 
163
      break;
 
164
    }
 
165
    case Command::UPDATE:
 
166
    {
 
167
      printUpdateWithPK(command, command.update_record());
 
168
      break;
 
169
    }
 
170
    case Command::RAW_SQL:
 
171
    {
 
172
      std::string sql= command.sql();
 
173
      /* Replace \n with spaces */
 
174
      const std::string newline= "\n";
 
175
      while (sql.find(newline) != std::string::npos)
 
176
        sql.replace(sql.find(newline), 1, " ");
 
177
 
 
178
      cout << sql << ";";
 
179
      break;
 
180
    }
 
181
    default:
 
182
      cout << "Received an unknown Command type: " << (int32_t) command.type();
 
183
  }
 
184
  cout << endl;
 
185
}
 
186
 
 
187
int main(int argc, char* argv[])
 
188
{
 
189
  GOOGLE_PROTOBUF_VERIFY_VERSION;
 
190
  int file;
 
191
 
 
192
  if (argc != 2)
 
193
  {
 
194
    cerr << "Usage:  " << argv[0] << " TRANSACTION_LOG" << endl;
 
195
    return -1;
 
196
  }
 
197
 
 
198
  Command command;
 
199
 
 
200
  if ((file= open(argv[1], O_RDONLY)) == -1)
 
201
  {
 
202
    cerr << "Can not open file: " << argv[1] << endl;
 
203
  }
 
204
 
 
205
  char *buffer= NULL;
 
206
  char *temp_buffer;
 
207
 
 
208
  while (1)
 
209
  {
 
210
    uint64_t length;
 
211
 
 
212
    /* Read the size */
 
213
    if (read(file, &length, sizeof(uint64_t)) != sizeof(uint64_t))
 
214
      break;
 
215
 
 
216
    if (length > SIZE_MAX)
 
217
    {
 
218
      cerr << "Attempted to read record bigger than SIZE_MAX" << endl;
 
219
      exit(1);
 
220
    }
 
221
 
 
222
    temp_buffer= (char *)realloc(buffer, (size_t)length);
 
223
    if (temp_buffer == NULL)
 
224
    {
 
225
      cerr << "Memory allocation failure trying to allocate " << length << " bytes."  << endl;
 
226
      exit(1);
 
227
    }
 
228
    memset(temp_buffer, 0, (size_t)length);
 
229
    buffer= temp_buffer;
 
230
    size_t read_bytes= 0;
 
231
 
 
232
    /* Read the transaction */
 
233
    if ((read_bytes= read(file, buffer, (size_t)length)) != (size_t)length)
 
234
    {
 
235
      cerr << "Could not read entire transaction. Read " << read_bytes << " bytes instead of " << length << " bytes." << endl;
 
236
      exit(1);
 
237
    }
 
238
    command.ParseFromArray(buffer, (int) length);
 
239
 
 
240
    /* Print the command */
 
241
    printCommand(command);
 
242
  }
 
243
  return 0;
 
244
}