~drizzle-trunk/drizzle/development

636.1.1 by Mark Atwood
add replicator plugin type
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 *
6
 *  Authors:
7
 *
8
 *    Jay Pipes <joinfu@sun.com>
636.1.1 by Mark Atwood
add replicator plugin type
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; version 2 of the License.
13
 *
14
 *  This program is distributed in the hope that it will be useful,
15
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 *  GNU General Public License for more details.
18
 *
19
 *  You should have received a copy of the GNU General Public License
20
 *  along with this program; if not, write to the Free Software
21
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
22
 */
23
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
24
/**
25
 * @file Server-side utility which is responsible for managing the 
26
 * communication between the kernel, replicator plugins, and applier plugins.
27
 *
28
 * TransactionServices is a bridge between modules and the kernel, and its
29
 * primary function is to take internal events (for instance the start of a 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
31
 * and construct GPB Messages that are passed to the registered replicator and
32
 * applier plugins.
33
 *
34
 * The reason for this functionality is to encapsulate all communication
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
36
 * Instead of the plugin having to understand the (often fluidly changing)
37
 * mechanics of the kernel, all the plugin needs to understand is the message
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
39
 * these messages.
40
 *
41
 * @see /drizzled/message/transaction.proto
42
 */
43
44
#include "drizzled/server_includes.h"
45
#include "drizzled/transaction_services.h"
46
#include "drizzled/plugin/replicator.h"
47
#include "drizzled/plugin/applier.h"
48
#include "drizzled/message/transaction.pb.h"
49
#include "drizzled/message/table.pb.h"
50
#include "drizzled/gettext.h"
51
#include "drizzled/session.h"
971.1.49 by Monty Taylor
Hooked transaction_services into Plugin_registry.
52
#include "drizzled/plugin_registry.h"
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
53
54
#include <vector>
55
56
drizzled::TransactionServices transaction_services;
57
971.1.49 by Monty Taylor
Hooked transaction_services into Plugin_registry.
58
void add_replicator(drizzled::plugin::Replicator *repl)
59
{
60
  transaction_services.attachReplicator(repl);
61
}
62
971.1.52 by Monty Taylor
Did the finalizers. Renamed plugin_registry.
63
void remove_replicator(drizzled::plugin::Replicator *repl)
660.1.3 by Eric Herman
removed trailing whitespace with simple script:
64
{
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
65
  transaction_services.detachReplicator(repl);
636.1.1 by Mark Atwood
add replicator plugin type
66
}
67
971.1.52 by Monty Taylor
Did the finalizers. Renamed plugin_registry.
68
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
69
namespace drizzled
70
{
71
72
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
73
{
74
  replicators.push_back(in_replicator);
75
}
76
77
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
78
{
79
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
80
}
81
82
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
83
{
84
  appliers.push_back(in_applier);
85
}
86
87
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
88
{
89
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
90
}
91
92
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
93
                                                     , Session *in_session) const
94
{
95
  using namespace drizzled::message;
96
97
  TransactionContext *trx= in_command->mutable_transaction_context();
98
  trx->set_server_id(in_session->getServerId());
99
  trx->set_transaction_id(in_session->getTransactionId());
100
}
101
102
void TransactionServices::startTransaction(Session *in_session)
103
{
104
  using namespace drizzled::message;
105
  
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
106
  if (replicators.size() == 0 || appliers.size() == 0)
107
    return;
108
  
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
109
  Command command;
110
  command.set_type(Command::START_TRANSACTION);
111
  command.set_timestamp(in_session->getCurrentTimestamp());
112
113
  setCommandTransactionContext(&command, in_session);
114
  
115
  push(&command);
116
}
117
118
void TransactionServices::commitTransaction(Session *in_session)
119
{
120
  using namespace drizzled::message;
121
  
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
122
  if (replicators.size() == 0 || appliers.size() == 0)
123
    return;
124
  
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
125
  Command command;
126
  command.set_type(Command::COMMIT);
127
  command.set_timestamp(in_session->getCurrentTimestamp());
128
129
  setCommandTransactionContext(&command, in_session);
130
  
131
  push(&command);
132
}
133
134
void TransactionServices::rollbackTransaction(Session *in_session)
135
{
136
  using namespace drizzled::message;
137
  
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
138
  if (replicators.size() == 0 || appliers.size() == 0)
139
    return;
140
  
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
141
  Command command;
142
  command.set_type(Command::ROLLBACK);
143
  command.set_timestamp(in_session->getCurrentTimestamp());
144
145
  setCommandTransactionContext(&command, in_session);
146
  
147
  push(&command);
148
}
149
150
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
151
{
152
  using namespace drizzled::message;
153
  
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
154
  if (replicators.size() == 0 || appliers.size() == 0)
155
    return;
156
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
157
  Command command;
158
  command.set_type(Command::INSERT);
159
  command.set_timestamp(in_session->getCurrentTimestamp());
160
161
  setCommandTransactionContext(&command, in_session);
162
163
  const char *schema_name= in_table->getShare()->db.str;
164
  const char *table_name= in_table->getShare()->table_name.str;
165
166
  command.set_schema(schema_name);
167
  command.set_table(table_name);
168
169
  /* 
170
   * Now we construct the specialized InsertRecord command inside
171
   * the Command container...
172
   */
173
  InsertRecord *change_record= command.mutable_insert_record();
174
175
  Field *current_field;
176
  Field **table_fields= in_table->field;
177
  String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
178
  string_value->set_charset(system_charset_info);
179
180
  Table::Field *cur_field;
181
182
  while ((current_field= *table_fields++) != NULL) 
183
  {
184
    cur_field= change_record->add_insert_field();
185
    cur_field->set_name(std::string(current_field->field_name));
186
    cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
187
    string_value= current_field->val_str(string_value);
188
    change_record->add_insert_value(std::string(string_value->c_ptr()));
189
    string_value->free(); /* I wish there was a clear() method... */
190
  }
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
191
192
  if (string_value)
193
    delete string_value; /* Is this needed with memroot allocation? */
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
194
  
195
  push(&command);
196
}
197
198
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
199
{
200
  using namespace drizzled::message;
201
  
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
202
  if (replicators.size() == 0 || appliers.size() == 0)
203
    return;
204
  
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
205
  Command command;
206
  command.set_type(Command::UPDATE);
207
  command.set_timestamp(in_session->getCurrentTimestamp());
208
209
  setCommandTransactionContext(&command, in_session);
210
211
  const char *schema_name= in_table->getShare()->db.str;
212
  const char *table_name= in_table->getShare()->table_name.str;
213
214
  command.set_schema(schema_name);
215
  command.set_table(table_name);
216
217
  /* 
218
   * Now we construct the specialized UpdateRecord command inside
219
   * the Command container...
220
   */
221
  //UpdateRecord *change_record= command.mutable_update_record();
222
223
  push(&command);
224
}
225
226
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
227
{
228
  using namespace drizzled::message;
229
  
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
230
  if (replicators.size() == 0 || appliers.size() == 0)
231
    return;
232
  
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
233
  Command command;
234
  command.set_type(Command::DELETE);
235
  command.set_timestamp(in_session->getCurrentTimestamp());
236
237
  setCommandTransactionContext(&command, in_session);
238
239
  const char *schema_name= in_table->getShare()->db.str;
240
  const char *table_name= in_table->getShare()->table_name.str;
241
242
  command.set_schema(schema_name);
243
  command.set_table(table_name);
244
245
  /* 
246
   * Now we construct the specialized DeleteRecord command inside
247
   * the Command container...
248
   */
249
  //DeleteRecord *change_record= command.mutable_delete_record();
250
  
251
  push(&command);
252
}
253
254
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
255
{
256
  using namespace drizzled::message;
257
  
988.1.6 by Jay Pipes
Removed old protobuf_replicator plugin, fixed up db.cc and other files to use new
258
  if (replicators.size() == 0 || appliers.size() == 0)
259
    return;
260
  
988.1.5 by Jay Pipes
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
261
  Command command;
262
  command.set_type(Command::RAW_SQL);
263
  command.set_timestamp(in_session->getCurrentTimestamp());
264
265
  setCommandTransactionContext(&command, in_session);
266
267
  std::string query(in_query, in_query_len);
268
  command.set_sql(query);
269
270
  push(&command);
271
  
272
}
273
274
void TransactionServices::push(drizzled::message::Command *to_push)
275
{
276
  std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
277
  std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
278
  appl_start_iter= appliers.begin();
279
280
  drizzled::plugin::Replicator *cur_repl;
281
  drizzled::plugin::Applier *cur_appl;
282
283
  while (repl_iter != replicators.end())
284
  {
285
    cur_repl= *repl_iter;
286
    if (! cur_repl->isActive())
287
    {
288
      ++repl_iter;
289
      continue;
290
    }
291
    
292
    appl_iter= appl_start_iter;
293
    while (appl_iter != appliers.end())
294
    {
295
      cur_appl= *appl_iter;
296
297
      if (! cur_appl->isActive())
298
      {
299
        ++appl_iter;
300
        continue;
301
      }
302
303
      cur_repl->replicate(cur_appl, to_push);
304
      ++appl_iter;
305
    }
306
    ++repl_iter;
307
  }
308
}
309
310
311
} /* end namespace drizzled */