1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2009 Sun Microsystems
8
* Jay Pipes <joinfu@sun.com>
10
* This program is free software; you can redistribute it and/or modify
11
* it under the terms of the GNU General Public License as published by
12
* the Free Software Foundation; version 2 of the License.
14
* This program is distributed in the hope that it will be useful,
15
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
* GNU General Public License for more details.
19
* You should have received a copy of the GNU General Public License
20
* along with this program; if not, write to the Free Software
21
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
1
24
#include <drizzled/global.h>
25
#include <drizzled/gettext.h>
2
26
#include <sys/types.h>
3
27
#include <sys/stat.h>
9
#include <drizzled/message/replication.pb.h>
33
#include <drizzled/message/transaction.pb.h>
34
#include <drizzled/message/statement_transform.h>
36
#include <google/protobuf/io/coded_stream.h>
37
#include <google/protobuf/io/zero_copy_stream_impl.h>
11
39
using namespace std;
12
40
using namespace drizzled;
15
* @file Example application for reading change records and transactions
18
static void printInsert(const message::Command &container,
19
const message::InsertRecord &record)
22
cout << "INSERT INTO `" << container.schema() << "`.`"
23
<< container.table() << "` (";
25
int32_t num_fields= record.insert_field_size();
28
for (x= 0; x < num_fields; x++)
33
const message::Table::Field f= record.insert_field(x);
35
cout << "`" << f.name() << "`";
41
* There may be an INSERT VALUES (),() type statement. We know the
42
* number of records is equal to the field_values array size divided
43
* by the number of fields.
45
* So, we do an inner and an outer loop. Outer loop is on the number
46
* of records and the inner loop on the number of fields. In this way,
47
* we know that record.field_values(outer_loop * num_fields) + inner_loop))
48
* always gives us our correct field value.
50
int32_t num_records= (record.insert_value_size() / num_fields);
52
for (x= 0; x < num_records; x++)
58
for (y= 0; y < num_fields; y++)
63
cout << "\"" << record.insert_value((x * num_fields) + y) << "\"";
71
static void printDeleteWithPK(const message::Command &container,
72
const message::DeleteRecord &record)
74
cout << "DELETE FROM `" << container.schema() << "`.`" << container.table() << "`";
76
int32_t num_where_fields= record.where_field_size();
78
* Make sure we catch anywhere we're not aligning the fields with
79
* the field_values arrays...
81
assert(num_where_fields == record.where_value_size());
85
for (x= 0; x < num_where_fields; x++)
88
cout << " AND "; /* Always AND condition with a multi-column PK */
90
const message::Table::Field f= record.where_field(x);
92
/* Always equality conditions */
93
cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
97
static void printUpdateWithPK(const message::Command &container,
98
const message::UpdateRecord &record)
100
int32_t num_update_fields= record.update_field_size();
103
cout << "UPDATE `" << container.schema() << "`.`" << container.table() << "` SET ";
105
for (x= 0;x < num_update_fields; x++)
107
message::Table::Field f= record.update_field(x);
112
cout << "`" << f.name() << "` = \"" << record.after_value(x) << "\"";
115
int32_t num_where_fields= record.where_field_size();
117
* Make sure we catch anywhere we're not aligning the fields with
118
* the field_values arrays...
120
assert(num_where_fields == record.where_value_size());
123
for (x= 0;x < num_where_fields; x++)
126
cout << " AND "; /* Always AND condition with a multi-column PK */
128
const message::Table::Field f= record.where_field(x);
130
/* Always equality conditions */
131
cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
41
using namespace google;
43
static void printStatement(const message::Statement &statement)
45
cout << "/* Start Timestamp: " << statement.start_timestamp() << " ";
46
cout << " End Timestamp: " << statement.end_timestamp() << " */" << endl;
48
vector<string> sql_strings;
50
message::transformStatementToSql(statement, sql_strings, message::DRIZZLE);
52
vector<string>::iterator sql_string_iter= sql_strings.begin();
53
const std::string newline= "\n";
54
while (sql_string_iter != sql_strings.end())
56
string &sql= *sql_string_iter;
58
* Replace \n with spaces so that SQL statements
59
* are always on a single line
61
while (sql.find(newline) != std::string::npos)
62
sql.replace(sql.find(newline), 1, " ");
64
cout << sql << ';' << endl;
135
69
static void printTransaction(const message::Transaction &transaction)
139
cout << "/* Start Time: " << transaction.start_timestamp() << " */ START TRANSACTION;"<< endl;
141
for (e_size= 0; e_size < transaction.command_size(); e_size++)
71
const message::TransactionContext trx= transaction.transaction_context();
73
cout << "/* SERVER ID: " << trx.server_id() << " TRX ID: " << trx.transaction_id() << " */ " << endl;
75
size_t num_statements= transaction.statement_size();
78
for (x= 0; x < num_statements; ++x)
143
const message::Command command= transaction.command(e_size);
145
message::TransactionContext trx= command.transaction_context();
147
cout << "/* SID: " << trx.server_id() << " XID: " << trx.transaction_id() << " */ ";
149
switch (command.type())
151
case message::Command::START_TRANSACTION:
152
cout << "START TRANSACTION;";
154
case message::Command::COMMIT:
157
case message::Command::ROLLBACK:
160
case message::Command::INSERT:
162
printInsert(command, command.insert_record());
165
case message::Command::DELETE:
167
printDeleteWithPK(command, command.delete_record());
170
case message::Command::UPDATE:
172
printUpdateWithPK(command, command.update_record());
80
const message::Statement &statement= transaction.statement(x);
81
printStatement(statement);
180
cout << "/* Commit Time: " << transaction.end_timestamp() << " */ COMMIT;" << endl;
183
85
int main(int argc, char* argv[])
190
cerr << "Usage: " << argv[0] << " TRANSACTION_LOG" << endl;
92
fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
194
96
message::Transaction transaction;
196
if ((file= open(argv[1], O_RDONLY)) == -1)
98
file= open(argv[1], O_RDONLY);
198
cerr << "Can not open file: " << argv[1] << endl;
101
fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
105
protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
106
protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
201
108
char *buffer= NULL;
109
char *temp_buffer= NULL;
111
uint64_t previous_length= 0;
114
/* Read in the length of the command */
115
while (result == true && coded_input->ReadLittleEndian64(&length) == true)
209
if (read(file, &length, sizeof(uint64_t)) != sizeof(uint64_t))
212
117
if (length > SIZE_MAX)
214
cerr << "Attempted to read record bigger than SIZE_MAX" << endl;
119
fprintf(stderr, _("Attempted to read record bigger than SIZE_MAX\n"));
218
temp_buffer= (char *)realloc(buffer, (size_t)length);
126
* First time around...just malloc the length. This block gets rid
127
* of a GCC warning about uninitialized temp_buffer.
129
temp_buffer= (char *) malloc((size_t) length);
131
/* No need to allocate if we have a buffer big enough... */
132
else if (length > previous_length)
134
temp_buffer= (char *) realloc(buffer, (size_t) length);
219
137
if (temp_buffer == NULL)
221
cerr << "Memory allocation failure trying to allocate " << length << " bytes." << endl;
224
memset(temp_buffer, 0, (size_t)length);
226
size_t read_bytes= 0;
228
/* Read the transaction */
229
if ((read_bytes= read(file, buffer, (size_t)length)) != (size_t)length)
231
cerr << "Could not read entire transaction. Read " << read_bytes << " bytes instead of " << length << " bytes." << endl;
234
transaction.ParseFromArray(buffer, (int) length);
139
fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
140
static_cast<uint64_t>(length));
146
/* Read the Command */
147
result= coded_input->ReadRaw(buffer, length);
150
fprintf(stderr, _("Could not read transaction message.\n"));
151
fprintf(stderr, _("GPB ERROR: %s.\n"), strerror(errno));
152
fprintf(stderr, _("Raw buffer read: %s.\n"), buffer);
156
result= transaction.ParseFromArray(buffer, static_cast<size_t>(length));
159
fprintf(stderr, _("Unable to parse command. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
161
fprintf(stderr, _("BUFFER: %s\n"), buffer);
236
165
/* Print the transaction */
237
166
printTransaction(transaction);
168
previous_length= length;
176
return (result == true ? 0 : 1);