~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

This patch does a few things:

* Renames the TransactionServices component in the kernel
  to ReplicationServices.
* Renames transaction.proto to replication.proto and updates the
  various programs referencing drizzled/message/transaction.pb.h
  to include drizzled/message/replication.pb.h instead.
* Adds a public method to the kernel's ReplicationServices component,
  getLastAppliedTimestamp(), which returns the timestamp of the last Command message
  that was successfully sent to a registered Applier plugin (i.e. the Command was "applied").
* Updates the ReplicationServices::push() method to update an atomic timestamp
  when a Command is successfully applied by a replicator to an applier.

ReplicationServices::getLastAppliedTimestamp() is critical to the upcoming
Publisher plugin, as it allows the publisher to ask the kernel when the last Command
message was applied.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
 * @file Server-side utility which is responsible for managing the 
26
26
 * communication between the kernel, replicator plugins, and applier plugins.
27
27
 *
28
 
 * TransactionServices is a bridge between modules and the kernel, and its
 
28
 * ReplicationServices is a bridge between modules and the kernel, and its
29
29
 * primary function is to take internal events (for instance the start of a 
30
30
 * transaction, the changing of a record, or the rollback of a transaction) 
31
31
 * and construct GPB Messages that are passed to the registered replicator and
38
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
39
39
 * these messages.
40
40
 *
41
 
 * @see /drizzled/message/transaction.proto
 
41
 * @see /drizzled/message/replication.proto
42
42
 */
43
43
 
44
44
#include "drizzled/server_includes.h"
45
 
#include "drizzled/transaction_services.h"
 
45
#include "drizzled/replication_services.h"
46
46
#include "drizzled/plugin/replicator.h"
47
47
#include "drizzled/plugin/applier.h"
48
 
#include "drizzled/message/transaction.pb.h"
 
48
#include "drizzled/message/replication.pb.h"
49
49
#include "drizzled/message/table.pb.h"
50
50
#include "drizzled/gettext.h"
51
51
#include "drizzled/session.h"
53
53
 
54
54
#include <vector>
55
55
 
56
 
drizzled::TransactionServices transaction_services;
 
56
drizzled::ReplicationServices replication_services;
57
57
 
58
58
void add_replicator(drizzled::plugin::Replicator *replicator)
59
59
{
60
 
  transaction_services.attachReplicator(replicator);
 
60
  replication_services.attachReplicator(replicator);
61
61
}
62
62
 
63
63
void remove_replicator(drizzled::plugin::Replicator *replicator)
64
64
{
65
 
  transaction_services.detachReplicator(replicator);
 
65
  replication_services.detachReplicator(replicator);
66
66
}
67
67
 
68
68
void add_applier(drizzled::plugin::Applier *applier)
69
69
{
70
 
  transaction_services.attachApplier(applier);
 
70
  replication_services.attachApplier(applier);
71
71
}
72
72
 
73
73
void remove_applier(drizzled::plugin::Applier *applier)
74
74
{
75
 
  transaction_services.detachApplier(applier);
 
75
  replication_services.detachApplier(applier);
76
76
}
77
77
 
78
78
namespace drizzled
79
79
{
80
80
 
81
 
TransactionServices::TransactionServices()
 
81
ReplicationServices::ReplicationServices()
82
82
{
83
83
  is_active= false;
84
84
}
85
85
 
86
 
void TransactionServices::evaluateActivePlugins()
 
86
void ReplicationServices::evaluateActivePlugins()
87
87
{
88
88
  /* 
89
89
   * We loop through replicators and appliers, evaluating
142
142
  is_active= false;
143
143
}
144
144
 
145
 
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
 
145
void ReplicationServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
146
146
{
147
147
  replicators.push_back(in_replicator);
148
148
  evaluateActivePlugins();
149
149
}
150
150
 
151
 
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
 
151
void ReplicationServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
152
152
{
153
153
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
154
154
  evaluateActivePlugins();
155
155
}
156
156
 
157
 
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
 
157
void ReplicationServices::attachApplier(drizzled::plugin::Applier *in_applier)
158
158
{
159
159
  appliers.push_back(in_applier);
160
160
  evaluateActivePlugins();
161
161
}
162
162
 
163
 
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
 
163
void ReplicationServices::detachApplier(drizzled::plugin::Applier *in_applier)
164
164
{
165
165
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
166
166
  evaluateActivePlugins();
167
167
}
168
168
 
169
 
bool TransactionServices::isActive() const
 
169
bool ReplicationServices::isActive() const
170
170
{
171
171
  return is_active;
172
172
}
173
173
 
174
 
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
 
174
void ReplicationServices::setCommandTransactionContext(drizzled::message::Command *in_command
175
175
                                                     , Session *in_session) const
176
176
{
177
177
  using namespace drizzled::message;
181
181
  trx->set_transaction_id(in_session->getTransactionId());
182
182
}
183
183
 
184
 
void TransactionServices::startTransaction(Session *in_session)
 
184
void ReplicationServices::startTransaction(Session *in_session)
185
185
{
186
186
  using namespace drizzled::message;
187
187
  
197
197
  push(&command);
198
198
}
199
199
 
200
 
void TransactionServices::commitTransaction(Session *in_session)
 
200
void ReplicationServices::commitTransaction(Session *in_session)
201
201
{
202
202
  using namespace drizzled::message;
203
203
  
213
213
  push(&command);
214
214
}
215
215
 
216
 
void TransactionServices::rollbackTransaction(Session *in_session)
 
216
void ReplicationServices::rollbackTransaction(Session *in_session)
217
217
{
218
218
  using namespace drizzled::message;
219
219
  
229
229
  push(&command);
230
230
}
231
231
 
232
 
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
232
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
233
233
{
234
234
  using namespace drizzled::message;
235
235
  
256
256
 
257
257
  Field *current_field;
258
258
  Field **table_fields= in_table->field;
259
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
259
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
260
260
  string_value->set_charset(system_charset_info);
261
261
 
262
262
  Table::Field *current_proto_field;
277
277
  push(&command);
278
278
}
279
279
 
280
 
void TransactionServices::updateRecord(Session *in_session,
 
280
void ReplicationServices::updateRecord(Session *in_session,
281
281
                                       Table *in_table, 
282
282
                                       const unsigned char *old_record, 
283
283
                                       const unsigned char *new_record)
307
307
 
308
308
  Field *current_field;
309
309
  Field **table_fields= in_table->field;
310
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
310
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
311
311
  string_value->set_charset(system_charset_info);
312
312
 
313
313
  Table::Field *current_proto_field;
358
358
  push(&command);
359
359
}
360
360
 
361
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
361
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
362
362
{
363
363
  using namespace drizzled::message;
364
364
  
385
385
 
386
386
  Field *current_field;
387
387
  Field **table_fields= in_table->field;
388
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
388
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
389
389
  string_value->set_charset(system_charset_info);
390
390
 
391
391
  Table::Field *current_proto_field;
411
411
  push(&command);
412
412
}
413
413
 
414
 
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
414
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
415
415
{
416
416
  using namespace drizzled::message;
417
417
  
430
430
  push(&command);
431
431
}
432
432
 
433
 
void TransactionServices::push(drizzled::message::Command *to_push)
 
433
void ReplicationServices::push(drizzled::message::Command *to_push)
434
434
{
435
435
  std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
436
436
  std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
460
460
      }
461
461
 
462
462
      cur_repl->replicate(cur_appl, to_push);
 
463
      
 
464
      /* 
 
465
       * We update the timestamp for the last applied Command so that
 
466
       * publisher plugins can ask the replication services when the
 
467
       * last known applied Command was using the getLastAppliedTimestamp()
 
468
       * method.
 
469
       */
 
470
      last_applied_timestamp.fetch_and_store(to_push->timestamp());
463
471
      ++appl_iter;
464
472
    }
465
473
    ++repl_iter;