~drizzle-trunk/drizzle/development

636.1.1 by Mark Atwood
add replicator plugin type
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 *
6
 *  Authors:
7
 *
8
 *    Jay Pipes <joinfu@sun.com>
636.1.1 by Mark Atwood
add replicator plugin type
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; version 2 of the License.
13
 *
14
 *  This program is distributed in the hope that it will be useful,
15
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 *  GNU General Public License for more details.
18
 *
19
 *  You should have received a copy of the GNU General Public License
20
 *  along with this program; if not, write to the Free Software
21
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
22
 */
23
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
24
/**
25
 * @file Server-side utility which is responsible for managing the 
26
 * communication between the kernel, replicator plugins, and applier plugins.
27
 *
1039.5.31 by Jay Pipes
This patch does a few things:
28
 * ReplicationServices is a bridge between modules and the kernel, and its
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
29
 * primary function is to take internal events (for instance the start of a 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
31
 * and construct GPB Messages that are passed to the registered replicator and
32
 * applier plugins.
33
 *
34
 * The reason for this functionality is to encapsulate all communication
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
36
 * Instead of the plugin having to understand the (often fluidly changing)
37
 * mechanics of the kernel, all the plugin needs to understand is the message
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
39
 * these messages.
40
 *
1039.5.31 by Jay Pipes
This patch does a few things:
41
 * @see /drizzled/message/replication.proto
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
42
 */
43
44
#include "drizzled/server_includes.h"
1039.5.31 by Jay Pipes
This patch does a few things:
45
#include "drizzled/replication_services.h"
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
46
#include "drizzled/plugin/replicator.h"
47
#include "drizzled/plugin/applier.h"
1039.5.31 by Jay Pipes
This patch does a few things:
48
#include "drizzled/message/replication.pb.h"
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
49
#include "drizzled/message/table.pb.h"
50
#include "drizzled/gettext.h"
51
#include "drizzled/session.h"
971.1.49 by Monty Taylor
Hooked transaction_services into Plugin_registry.
52
#include "drizzled/plugin_registry.h"
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
53
54
#include <vector>
55
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
56
using namespace drizzled;
57
58
ReplicationServices replication_services;
59
60
void add_replicator(plugin::Replicator *replicator)
1039.5.1 by Jay Pipes
* New serial event log plugin
61
{
1039.5.31 by Jay Pipes
This patch does a few things:
62
  replication_services.attachReplicator(replicator);
1039.5.1 by Jay Pipes
* New serial event log plugin
63
}
64
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
65
void remove_replicator(plugin::Replicator *replicator)
1039.5.1 by Jay Pipes
* New serial event log plugin
66
{
1039.5.31 by Jay Pipes
This patch does a few things:
67
  replication_services.detachReplicator(replicator);
1039.5.1 by Jay Pipes
* New serial event log plugin
68
}
69
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
70
void add_applier(plugin::Applier *applier)
1039.5.1 by Jay Pipes
* New serial event log plugin
71
{
1039.5.31 by Jay Pipes
This patch does a few things:
72
  replication_services.attachApplier(applier);
1039.5.1 by Jay Pipes
* New serial event log plugin
73
}
74
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
75
void remove_applier(plugin::Applier *applier)
1039.5.1 by Jay Pipes
* New serial event log plugin
76
{
1039.5.31 by Jay Pipes
This patch does a few things:
77
  replication_services.detachApplier(applier);
1039.5.1 by Jay Pipes
* New serial event log plugin
78
}
971.1.52 by Monty Taylor
Did the finalizers. Renamed plugin_registry.
79
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
80
namespace drizzled
81
{
82
1039.5.31 by Jay Pipes
This patch does a few things:
83
ReplicationServices::ReplicationServices()
1039.5.5 by Jay Pipes
This commit does two things:
84
{
85
  is_active= false;
86
}
87
1039.5.31 by Jay Pipes
This patch does a few things:
88
void ReplicationServices::evaluateActivePlugins()
1039.5.5 by Jay Pipes
This commit does two things:
89
{
90
  /* 
91
   * We loop through replicators and appliers, evaluating
92
   * whether or not there is at least one active replicator
93
   * and one active applier.  If not, we set is_active
94
   * to false.
95
   */
96
  bool tmp_is_active= false;
97
98
  if (replicators.size() == 0 || appliers.size() == 0)
99
  {
100
    is_active= false;
101
    return;
102
  }
103
104
  /* 
105
   * Determine if any remaining replicators and if those
106
   * replicators are active...if not, set is_active
107
   * to false
108
   */
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
109
  std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
1039.5.5 by Jay Pipes
This commit does two things:
110
  while (repl_iter != replicators.end())
111
  {
112
    if ((*repl_iter)->isActive())
113
    {
114
      tmp_is_active= true;
115
      break;
116
    }
117
    ++repl_iter;
118
  }
119
  if (! tmp_is_active)
120
  {
121
    /* No active replicators. Set is_active to false and exit. */
122
    is_active= false;
123
    return;
124
  }
125
126
  /* 
127
   * OK, we know there's at least one active replicator.
128
   *
129
   * Now determine if any remaining replicators and if those
130
   * replicators are active...if not, set is_active
131
   * to false
132
   */
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
133
  std::vector<plugin::Applier *>::iterator appl_iter= appliers.begin();
1039.5.5 by Jay Pipes
This commit does two things:
134
  while (appl_iter != appliers.end())
135
  {
136
    if ((*appl_iter)->isActive())
137
    {
138
      is_active= true;
139
      return;
140
    }
141
    ++appl_iter;
142
  }
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
143
  /* If we get here, there are no active appliers */
1039.5.5 by Jay Pipes
This commit does two things:
144
  is_active= false;
145
}
146
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
147
void ReplicationServices::attachReplicator(plugin::Replicator *in_replicator)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
148
{
149
  replicators.push_back(in_replicator);
1039.5.5 by Jay Pipes
This commit does two things:
150
  evaluateActivePlugins();
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
151
}
152
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
153
void ReplicationServices::detachReplicator(plugin::Replicator *in_replicator)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
154
{
155
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
1039.5.5 by Jay Pipes
This commit does two things:
156
  evaluateActivePlugins();
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
157
}
158
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
159
void ReplicationServices::attachApplier(plugin::Applier *in_applier)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
160
{
161
  appliers.push_back(in_applier);
1039.5.5 by Jay Pipes
This commit does two things:
162
  evaluateActivePlugins();
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
163
}
164
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
165
void ReplicationServices::detachApplier(plugin::Applier *in_applier)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
166
{
167
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
1039.5.5 by Jay Pipes
This commit does two things:
168
  evaluateActivePlugins();
169
}
170
1039.5.31 by Jay Pipes
This patch does a few things:
171
bool ReplicationServices::isActive() const
1039.5.5 by Jay Pipes
This commit does two things:
172
{
173
  return is_active;
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
174
}
175
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
176
void ReplicationServices::setCommandTransactionContext(message::Command *in_command
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
177
                                                     , Session *in_session) const
178
{
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
179
  message::TransactionContext *trx= in_command->mutable_transaction_context();
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
180
  trx->set_server_id(in_session->getServerId());
181
  trx->set_transaction_id(in_session->getTransactionId());
182
}
183
1039.5.31 by Jay Pipes
This patch does a few things:
184
void ReplicationServices::startTransaction(Session *in_session)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
185
{
1039.5.5 by Jay Pipes
This commit does two things:
186
  if (! is_active)
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
187
    return;
188
  
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
189
  message::Command command;
190
  command.set_type(message::Command::START_TRANSACTION);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
191
  command.set_timestamp(in_session->getCurrentTimestamp());
192
193
  setCommandTransactionContext(&command, in_session);
194
  
195
  push(&command);
196
}
197
1039.5.31 by Jay Pipes
This patch does a few things:
198
void ReplicationServices::commitTransaction(Session *in_session)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
199
{
1039.5.5 by Jay Pipes
This commit does two things:
200
  if (! is_active)
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
201
    return;
202
  
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
203
  message::Command command;
204
  command.set_type(message::Command::COMMIT);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
205
  command.set_timestamp(in_session->getCurrentTimestamp());
206
207
  setCommandTransactionContext(&command, in_session);
208
  
209
  push(&command);
210
}
211
1039.5.31 by Jay Pipes
This patch does a few things:
212
void ReplicationServices::rollbackTransaction(Session *in_session)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
213
{
1039.5.5 by Jay Pipes
This commit does two things:
214
  if (! is_active)
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
215
    return;
216
  
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
217
  message::Command command;
218
  command.set_type(message::Command::ROLLBACK);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
219
  command.set_timestamp(in_session->getCurrentTimestamp());
220
221
  setCommandTransactionContext(&command, in_session);
222
  
223
  push(&command);
224
}
225
1039.5.31 by Jay Pipes
This patch does a few things:
226
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
227
{
1039.5.5 by Jay Pipes
This commit does two things:
228
  if (! is_active)
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
229
    return;
230
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
231
  message::Command command;
232
  command.set_type(message::Command::INSERT);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
233
  command.set_timestamp(in_session->getCurrentTimestamp());
234
235
  setCommandTransactionContext(&command, in_session);
236
237
  const char *schema_name= in_table->getShare()->db.str;
238
  const char *table_name= in_table->getShare()->table_name.str;
239
240
  command.set_schema(schema_name);
241
  command.set_table(table_name);
242
243
  /* 
244
   * Now we construct the specialized InsertRecord command inside
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
245
   * the message::Command container...
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
246
   */
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
247
  message::InsertRecord *change_record= command.mutable_insert_record();
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
248
249
  Field *current_field;
250
  Field **table_fields= in_table->field;
1039.5.31 by Jay Pipes
This patch does a few things:
251
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
252
  string_value->set_charset(system_charset_info);
253
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
254
  message::Table::Field *current_proto_field;
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
255
1039.5.27 by Jay Pipes
Had to set read bitmap for fields in insertRecord() now that certain asserts have been placed in Field classes. Oh, BTW, the bitmap interface sucks ass. Also corrected incorrect call to uint8korr macros with proper int64_tget macro call in command reader
256
  /* We will read all the table's fields... */
257
  in_table->setReadSet();
258
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
259
  while ((current_field= *table_fields++) != NULL) 
260
  {
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
261
    current_proto_field= change_record->add_insert_field();
262
    current_proto_field->set_name(std::string(current_field->field_name));
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
263
    current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
264
    string_value= current_field->val_str(string_value);
265
    change_record->add_insert_value(std::string(string_value->c_ptr()));
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
266
    string_value->free();
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
267
  }
268
  
269
  push(&command);
270
}
271
1039.5.31 by Jay Pipes
This patch does a few things:
272
void ReplicationServices::updateRecord(Session *in_session,
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
273
                                       Table *in_table, 
274
                                       const unsigned char *old_record, 
275
                                       const unsigned char *new_record)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
276
{
1039.5.5 by Jay Pipes
This commit does two things:
277
  if (! is_active)
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
278
    return;
279
  
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
280
  message::Command command;
281
  command.set_type(message::Command::UPDATE);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
282
  command.set_timestamp(in_session->getCurrentTimestamp());
283
284
  setCommandTransactionContext(&command, in_session);
285
286
  const char *schema_name= in_table->getShare()->db.str;
287
  const char *table_name= in_table->getShare()->table_name.str;
288
289
  command.set_schema(schema_name);
290
  command.set_table(table_name);
291
292
  /* 
293
   * Now we construct the specialized UpdateRecord command inside
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
294
   * the message::Command container...
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
295
   */
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
296
  message::UpdateRecord *change_record= command.mutable_update_record();
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
297
298
  Field *current_field;
299
  Field **table_fields= in_table->field;
1039.5.31 by Jay Pipes
This patch does a few things:
300
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
301
  string_value->set_charset(system_charset_info);
302
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
303
  message::Table::Field *current_proto_field;
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
304
305
  while ((current_field= *table_fields++) != NULL) 
306
  {
307
    /*
308
     * The below really should be moved into the Field API and Record API.  But for now
309
     * we do this crazy pointer fiddling to figure out if the current field
310
     * has been updated in the supplied record raw byte pointers.
311
     */
312
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
313
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
314
315
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
316
317
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
318
    {
319
      /* Field is changed from old to new */
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
320
      current_proto_field= change_record->add_update_field();
321
      current_proto_field->set_name(std::string(current_field->field_name));
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
322
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
1039.5.29 by Jay Pipes
Add call to set read bit on fields being read in updateRecord()
323
1039.5.33 by Jay Pipes
Restores original read bit on fields read during call to val_str in ReplicationServices::updateRecord()
324
      /* Store the original "read bit" for this field */
325
      bool is_read_set= current_field->isReadSet();
326
1039.5.29 by Jay Pipes
Add call to set read bit on fields being read in updateRecord()
327
      /* We need to mark that we will "read" this field... */
328
      in_table->setReadSet(current_field->field_index);
329
1039.5.33 by Jay Pipes
Restores original read bit on fields read during call to val_str in ReplicationServices::updateRecord()
330
      /* Read the string value of this field's contents */
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
331
      string_value= current_field->val_str(string_value);
1039.5.33 by Jay Pipes
Restores original read bit on fields read during call to val_str in ReplicationServices::updateRecord()
332
333
      /* 
334
       * Reset the read bit after reading field to its original state.  This 
335
       * prevents the field from being included in the WHERE clause
336
       */
337
      current_field->setReadSet(is_read_set);
338
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
339
      change_record->add_after_value(std::string(string_value->c_ptr()));
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
340
      string_value->free();
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
341
    }
342
343
    /* 
1039.5.12 by Jay Pipes
Fixes up UPDATE test case for serial event log and TransactionServices::updateRecord to properly add the WHERE clause fields for primary/unique keys on the table.
344
     * Add the WHERE clause values now...the fields which return true
345
     * for isReadSet() are in the WHERE clause.  For tables with no
346
     * primary or unique key, all fields will be returned.
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
347
     */
1039.5.12 by Jay Pipes
Fixes up UPDATE test case for serial event log and TransactionServices::updateRecord to properly add the WHERE clause fields for primary/unique keys on the table.
348
    if (current_field->isReadSet())
349
    {
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
350
      current_proto_field= change_record->add_where_field();
351
      current_proto_field->set_name(std::string(current_field->field_name));
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
352
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
1039.5.12 by Jay Pipes
Fixes up UPDATE test case for serial event log and TransactionServices::updateRecord to properly add the WHERE clause fields for primary/unique keys on the table.
353
      string_value= current_field->val_str(string_value);
354
      change_record->add_where_value(std::string(string_value->c_ptr()));
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
355
      string_value->free();
1039.5.12 by Jay Pipes
Fixes up UPDATE test case for serial event log and TransactionServices::updateRecord to properly add the WHERE clause fields for primary/unique keys on the table.
356
    }
1039.5.11 by Jay Pipes
Adds multi-value INSERT test, modifies TransactionServices to support UPDATE statements (though WHERE clause still not done...)
357
  }
358
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
359
  push(&command);
360
}
361
1039.5.31 by Jay Pipes
This patch does a few things:
362
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
363
{
1039.5.5 by Jay Pipes
This commit does two things:
364
  if (! is_active)
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
365
    return;
366
  
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
367
  message::Command command;
368
  command.set_type(message::Command::DELETE);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
369
  command.set_timestamp(in_session->getCurrentTimestamp());
370
371
  setCommandTransactionContext(&command, in_session);
372
373
  const char *schema_name= in_table->getShare()->db.str;
374
  const char *table_name= in_table->getShare()->table_name.str;
375
376
  command.set_schema(schema_name);
377
  command.set_table(table_name);
378
379
  /* 
380
   * Now we construct the specialized DeleteRecord command inside
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
381
   * the message::Command container...
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
382
   */
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
383
  message::DeleteRecord *change_record= command.mutable_delete_record();
1039.6.10 by Joe Daly
Add code to transaction_services.cc to populate a DeleteRecord, add delete test to all.test
384
 
385
  Field *current_field;
386
  Field **table_fields= in_table->field;
1039.5.31 by Jay Pipes
This patch does a few things:
387
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
1039.6.10 by Joe Daly
Add code to transaction_services.cc to populate a DeleteRecord, add delete test to all.test
388
  string_value->set_charset(system_charset_info);
389
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
390
  message::Table::Field *current_proto_field;
1039.6.10 by Joe Daly
Add code to transaction_services.cc to populate a DeleteRecord, add delete test to all.test
391
392
  while ((current_field= *table_fields++) != NULL)
393
  {
394
    /*
395
     * Add the WHERE clause values now...the fields which return true
396
     * for isReadSet() are in the WHERE clause.  For tables with no
397
     * primary or unique key, all fields will be returned.
398
     */
399
    if (current_field->isReadSet())
400
    {
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
401
      current_proto_field= change_record->add_where_field();
402
      current_proto_field->set_name(std::string(current_field->field_name));
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
403
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
1039.6.10 by Joe Daly
Add code to transaction_services.cc to populate a DeleteRecord, add delete test to all.test
404
      string_value= current_field->val_str(string_value);
405
      change_record->add_where_value(std::string(string_value->c_ptr()));
1039.5.19 by Jay Pipes
Per Stew's suggestions in code review:
406
      string_value->free();
1039.6.10 by Joe Daly
Add code to transaction_services.cc to populate a DeleteRecord, add delete test to all.test
407
    }
408
  }
409
 
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
410
  push(&command);
411
}
412
1039.5.31 by Jay Pipes
This patch does a few things:
413
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
414
{
1039.5.5 by Jay Pipes
This commit does two things:
415
  if (! is_active)
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
416
    return;
417
  
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
418
  message::Command command;
419
  command.set_type(message::Command::RAW_SQL);
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
420
  command.set_timestamp(in_session->getCurrentTimestamp());
421
422
  setCommandTransactionContext(&command, in_session);
423
424
  std::string query(in_query, in_query_len);
425
  command.set_sql(query);
426
427
  push(&command);
428
}
429
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
430
void ReplicationServices::push(message::Command *to_push)
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
431
{
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
432
  std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
433
  std::vector<plugin::Applier *>::iterator appl_start_iter, appl_iter;
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
434
  appl_start_iter= appliers.begin();
435
1101.1.8 by Monty Taylor
Finished cleaning up using namespace stuff.
436
  plugin::Replicator *cur_repl;
437
  plugin::Applier *cur_appl;
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
438
439
  while (repl_iter != replicators.end())
440
  {
441
    cur_repl= *repl_iter;
442
    if (! cur_repl->isActive())
443
    {
444
      ++repl_iter;
445
      continue;
446
    }
447
    
448
    appl_iter= appl_start_iter;
449
    while (appl_iter != appliers.end())
450
    {
451
      cur_appl= *appl_iter;
452
453
      if (! cur_appl->isActive())
454
      {
455
        ++appl_iter;
456
        continue;
457
      }
458
459
      cur_repl->replicate(cur_appl, to_push);
1039.5.31 by Jay Pipes
This patch does a few things:
460
      
461
      /* 
462
       * We update the timestamp for the last applied Command so that
463
       * publisher plugins can ask the replication services when the
464
       * last known applied Command was using the getLastAppliedTimestamp()
465
       * method.
466
       */
467
      last_applied_timestamp.fetch_and_store(to_push->timestamp());
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
468
      ++appl_iter;
469
    }
470
    ++repl_iter;
471
  }
472
}
473
474
} /* end namespace drizzled */