~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Monty Taylor
  • Date: 2009-04-25 20:45:19 UTC
  • mto: (997.2.5 mordred)
  • mto: This revision was merged to the branch mainline in revision 1003.
  • Revision ID: mordred@inaugust.com-20090425204519-lgrl7mz2r66v0jby
Blackhole.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 
 *  Copyright (c) 2009-2010 Jay Pipes <jaypipes@gmail.com>
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
8
 *    Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
24
23
 
25
24
/**
26
25
 * @file Server-side utility which is responsible for managing the 
27
 
 * communication between the kernel and the replication plugins:
28
 
 *
29
 
 * - TransactionReplicator
30
 
 * - TransactionApplier
31
 
 * - Publisher
32
 
 * - Subscriber
33
 
 *
34
 
 * ReplicationServices is a bridge between replication modules and the kernel,
35
 
 * and its primary function is to  */
 
26
 * communication between the kernel, replicator plugins, and applier plugins.
 
27
 *
 
28
 * TransactionServices is a bridge between modules and the kernel, and its
 
29
 * primary function is to take internal events (for instance the start of a 
 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
 
31
 * and construct GPB Messages that are passed to the registered replicator and
 
32
 * applier plugins.
 
33
 *
 
34
 * The reason for this functionality is to encapsulate all communication
 
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
36
 * Instead of the plugin having to understand the (often fluidly changing)
 
37
 * mechanics of the kernel, all the plugin needs to understand is the message
 
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
39
 * these messages.
 
40
 *
 
41
 * @see /drizzled/message/transaction.proto
 
42
 */
36
43
 
37
 
#include "config.h"
38
 
#include "drizzled/replication_services.h"
39
 
#include "drizzled/plugin/transaction_replicator.h"
40
 
#include "drizzled/plugin/transaction_applier.h"
 
44
#include "drizzled/server_includes.h"
 
45
#include "drizzled/transaction_services.h"
 
46
#include "drizzled/plugin/replicator.h"
 
47
#include "drizzled/plugin/applier.h"
41
48
#include "drizzled/message/transaction.pb.h"
42
49
#include "drizzled/message/table.pb.h"
43
 
#include "drizzled/message/statement_transform.h"
44
50
#include "drizzled/gettext.h"
45
51
#include "drizzled/session.h"
46
 
#include "drizzled/error.h"
 
52
#include "drizzled/plugin_registry.h"
47
53
 
48
54
#include <vector>
49
55
 
50
 
using namespace std;
 
56
drizzled::TransactionServices transaction_services;
 
57
 
 
58
void add_replicator(drizzled::plugin::Replicator *repl)
 
59
{
 
60
  transaction_services.attachReplicator(repl);
 
61
}
 
62
 
 
63
void remove_replicator(drizzled::plugin::Replicator *repl)
 
64
{
 
65
  transaction_services.detachReplicator(repl);
 
66
}
 
67
 
51
68
 
52
69
namespace drizzled
53
70
{
54
71
 
55
 
ReplicationServices::ReplicationServices()
56
 
{
57
 
  is_active= false;
58
 
}
59
 
 
60
 
void ReplicationServices::evaluateActivePlugins()
61
 
{
62
 
  /* 
63
 
   * We loop through replicators and appliers, evaluating
64
 
   * whether or not there is at least one active replicator
65
 
   * and one active applier.  If not, we set is_active
66
 
   * to false.
67
 
   */
68
 
  bool tmp_is_active= false;
69
 
 
70
 
  if (replicators.empty() || appliers.empty())
71
 
  {
72
 
    is_active= false;
73
 
    return;
74
 
  }
75
 
 
76
 
  /* 
77
 
   * Determine if any remaining replicators and if those
78
 
   * replicators are active...if not, set is_active
79
 
   * to false
80
 
   */
81
 
  for (Replicators::iterator repl_iter= replicators.begin();
82
 
       repl_iter != replicators.end();
83
 
       ++repl_iter)
84
 
  {
85
 
    if ((*repl_iter)->isEnabled())
86
 
    {
87
 
      tmp_is_active= true;
88
 
      break;
89
 
    }
90
 
  }
91
 
  if (! tmp_is_active)
92
 
  {
93
 
    /* No active replicators. Set is_active to false and exit. */
94
 
    is_active= false;
95
 
    return;
96
 
  }
97
 
 
98
 
  /* 
99
 
   * OK, we know there's at least one active replicator.
100
 
   *
101
 
   * Now determine if any remaining replicators and if those
102
 
   * replicators are active...if not, set is_active
103
 
   * to false
104
 
   */
105
 
  for (Appliers::iterator appl_iter= appliers.begin();
106
 
       appl_iter != appliers.end();
107
 
       ++appl_iter)
108
 
  {
109
 
    if ((*appl_iter)->isEnabled())
110
 
    {
111
 
      is_active= true;
112
 
      return;
113
 
    }
114
 
  }
115
 
  /* If we get here, there are no active appliers */
116
 
  is_active= false;
117
 
}
118
 
 
119
 
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
 
72
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
120
73
{
121
74
  replicators.push_back(in_replicator);
122
 
  evaluateActivePlugins();
123
75
}
124
76
 
125
 
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
 
77
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
126
78
{
127
79
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
128
 
  evaluateActivePlugins();
129
80
}
130
81
 
131
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
 
82
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
132
83
{
133
84
  appliers.push_back(in_applier);
134
 
  evaluateActivePlugins();
135
85
}
136
86
 
137
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
 
87
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
138
88
{
139
89
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
140
 
  evaluateActivePlugins();
141
 
}
142
 
 
143
 
bool ReplicationServices::isActive() const
144
 
{
145
 
  return is_active;
146
 
}
147
 
 
148
 
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
149
 
{
150
 
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
151
 
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
 
90
}
 
91
 
 
92
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
 
93
                                                     , Session *in_session) const
 
94
{
 
95
  using namespace drizzled::message;
 
96
 
 
97
  TransactionContext *trx= in_command->mutable_transaction_context();
 
98
  trx->set_server_id(in_session->getServerId());
 
99
  trx->set_transaction_id(in_session->getTransactionId());
 
100
}
 
101
 
 
102
void TransactionServices::startTransaction(Session *in_session)
 
103
{
 
104
  using namespace drizzled::message;
 
105
  
 
106
  if (replicators.size() == 0 || appliers.size() == 0)
 
107
    return;
 
108
  
 
109
  Command command;
 
110
  command.set_type(Command::START_TRANSACTION);
 
111
  command.set_timestamp(in_session->getCurrentTimestamp());
 
112
 
 
113
  setCommandTransactionContext(&command, in_session);
 
114
  
 
115
  push(&command);
 
116
}
 
117
 
 
118
void TransactionServices::commitTransaction(Session *in_session)
 
119
{
 
120
  using namespace drizzled::message;
 
121
  
 
122
  if (replicators.size() == 0 || appliers.size() == 0)
 
123
    return;
 
124
  
 
125
  Command command;
 
126
  command.set_type(Command::COMMIT);
 
127
  command.set_timestamp(in_session->getCurrentTimestamp());
 
128
 
 
129
  setCommandTransactionContext(&command, in_session);
 
130
  
 
131
  push(&command);
 
132
}
 
133
 
 
134
void TransactionServices::rollbackTransaction(Session *in_session)
 
135
{
 
136
  using namespace drizzled::message;
 
137
  
 
138
  if (replicators.size() == 0 || appliers.size() == 0)
 
139
    return;
 
140
  
 
141
  Command command;
 
142
  command.set_type(Command::ROLLBACK);
 
143
  command.set_timestamp(in_session->getCurrentTimestamp());
 
144
 
 
145
  setCommandTransactionContext(&command, in_session);
 
146
  
 
147
  push(&command);
 
148
}
 
149
 
 
150
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
151
{
 
152
  using namespace drizzled::message;
 
153
  
 
154
  if (replicators.size() == 0 || appliers.size() == 0)
 
155
    return;
 
156
 
 
157
  Command command;
 
158
  command.set_type(Command::INSERT);
 
159
  command.set_timestamp(in_session->getCurrentTimestamp());
 
160
 
 
161
  setCommandTransactionContext(&command, in_session);
 
162
 
 
163
  const char *schema_name= in_table->getShare()->db.str;
 
164
  const char *table_name= in_table->getShare()->table_name.str;
 
165
 
 
166
  command.set_schema(schema_name);
 
167
  command.set_table(table_name);
 
168
 
 
169
  /* 
 
170
   * Now we construct the specialized InsertRecord command inside
 
171
   * the Command container...
 
172
   */
 
173
  InsertRecord *change_record= command.mutable_insert_record();
 
174
 
 
175
  Field *current_field;
 
176
  Field **table_fields= in_table->field;
 
177
  String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
 
178
  string_value->set_charset(system_charset_info);
 
179
 
 
180
  Table::Field *cur_field;
 
181
 
 
182
  while ((current_field= *table_fields++) != NULL) 
 
183
  {
 
184
    cur_field= change_record->add_insert_field();
 
185
    cur_field->set_name(std::string(current_field->field_name));
 
186
    cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
 
187
    string_value= current_field->val_str(string_value);
 
188
    change_record->add_insert_value(std::string(string_value->c_ptr()));
 
189
    string_value->free(); /* I wish there was a clear() method... */
 
190
  }
 
191
 
 
192
  if (string_value)
 
193
    delete string_value; /* Is this needed with memroot allocation? */
 
194
  
 
195
  push(&command);
 
196
}
 
197
 
 
198
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
 
199
{
 
200
  using namespace drizzled::message;
 
201
  
 
202
  if (replicators.size() == 0 || appliers.size() == 0)
 
203
    return;
 
204
  
 
205
  Command command;
 
206
  command.set_type(Command::UPDATE);
 
207
  command.set_timestamp(in_session->getCurrentTimestamp());
 
208
 
 
209
  setCommandTransactionContext(&command, in_session);
 
210
 
 
211
  const char *schema_name= in_table->getShare()->db.str;
 
212
  const char *table_name= in_table->getShare()->table_name.str;
 
213
 
 
214
  command.set_schema(schema_name);
 
215
  command.set_table(table_name);
 
216
 
 
217
  /* 
 
218
   * Now we construct the specialized UpdateRecord command inside
 
219
   * the Command container...
 
220
   */
 
221
  //UpdateRecord *change_record= command.mutable_update_record();
 
222
 
 
223
  push(&command);
 
224
}
 
225
 
 
226
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
227
{
 
228
  using namespace drizzled::message;
 
229
  
 
230
  if (replicators.size() == 0 || appliers.size() == 0)
 
231
    return;
 
232
  
 
233
  Command command;
 
234
  command.set_type(Command::DELETE);
 
235
  command.set_timestamp(in_session->getCurrentTimestamp());
 
236
 
 
237
  setCommandTransactionContext(&command, in_session);
 
238
 
 
239
  const char *schema_name= in_table->getShare()->db.str;
 
240
  const char *table_name= in_table->getShare()->table_name.str;
 
241
 
 
242
  command.set_schema(schema_name);
 
243
  command.set_table(table_name);
 
244
 
 
245
  /* 
 
246
   * Now we construct the specialized DeleteRecord command inside
 
247
   * the Command container...
 
248
   */
 
249
  //DeleteRecord *change_record= command.mutable_delete_record();
 
250
  
 
251
  push(&command);
 
252
}
 
253
 
 
254
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
255
{
 
256
  using namespace drizzled::message;
 
257
  
 
258
  if (replicators.size() == 0 || appliers.size() == 0)
 
259
    return;
 
260
  
 
261
  Command command;
 
262
  command.set_type(Command::RAW_SQL);
 
263
  command.set_timestamp(in_session->getCurrentTimestamp());
 
264
 
 
265
  setCommandTransactionContext(&command, in_session);
 
266
 
 
267
  std::string query(in_query, in_query_len);
 
268
  command.set_sql(query);
 
269
 
 
270
  push(&command);
 
271
  
 
272
}
 
273
 
 
274
void TransactionServices::push(drizzled::message::Command *to_push)
 
275
{
 
276
  std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
 
277
  std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
152
278
  appl_start_iter= appliers.begin();
153
279
 
154
 
  plugin::TransactionReplicator *cur_repl;
155
 
  plugin::TransactionApplier *cur_appl;
 
280
  drizzled::plugin::Replicator *cur_repl;
 
281
  drizzled::plugin::Applier *cur_appl;
156
282
 
157
283
  while (repl_iter != replicators.end())
158
284
  {
159
285
    cur_repl= *repl_iter;
160
 
    if (! cur_repl->isEnabled())
 
286
    if (! cur_repl->isActive())
161
287
    {
162
288
      ++repl_iter;
163
289
      continue;
168
294
    {
169
295
      cur_appl= *appl_iter;
170
296
 
171
 
      if (! cur_appl->isEnabled())
 
297
      if (! cur_appl->isActive())
172
298
      {
173
299
        ++appl_iter;
174
300
        continue;
175
301
      }
176
302
 
177
303
      cur_repl->replicate(cur_appl, to_push);
178
 
      
179
 
      /* 
180
 
       * We update the timestamp for the last applied Transaction so that
181
 
       * publisher plugins can ask the replication services when the
182
 
       * last known applied Transaction was using the getLastAppliedTimestamp()
183
 
       * method.
184
 
       */
185
 
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
186
304
      ++appl_iter;
187
305
    }
188
306
    ++repl_iter;
189
307
  }
190
308
}
191
309
 
192
 
} /* namespace drizzled */
 
310
 
 
311
} /* end namespace drizzled */