~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2008-10-06 06:47:29 UTC
  • Revision ID: brian@tangent.org-20081006064729-2i9mhjkzyvow9xsm
RemoveĀ uint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  This program is free software; you can redistribute it and/or modify
8
 
 *  it under the terms of the GNU General Public License as published by
9
 
 *  the Free Software Foundation; version 2 of the License.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
/**
22
 
 * @file Transaction processing code
23
 
 *
24
 
 * @note
25
 
 *
26
 
 * The TransactionServices component takes internal events (for instance the start of a 
27
 
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 
 * component and used during replication.
30
 
 *
31
 
 * The reason for this functionality is to encapsulate all communication
32
 
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 
 * Instead of the plugin having to understand the (often fluidly changing)
34
 
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 
 * these messages.
37
 
 *
38
 
 * @see /drizzled/message/transaction.proto
39
 
 *
40
 
 * @todo
41
 
 *
42
 
 * We really should store the raw bytes in the messages, not the
43
 
 * String value of the Field.  But, to do that, the
44
 
 * statement_transform library needs first to be updated
45
 
 * to include the transformation code to convert raw
46
 
 * Drizzle-internal Field byte representation into something
47
 
 * plugins can understand.
48
 
 */
49
 
 
50
 
#include "config.h"
51
 
#include "drizzled/my_hash.h"
52
 
#include "drizzled/error.h"
53
 
#include "drizzled/gettext.h"
54
 
#include "drizzled/probes.h"
55
 
#include "drizzled/sql_parse.h"
56
 
#include "drizzled/session.h"
57
 
#include "drizzled/sql_base.h"
58
 
#include "drizzled/replication_services.h"
59
 
#include "drizzled/transaction_services.h"
60
 
#include "drizzled/transaction_context.h"
61
 
#include "drizzled/message/transaction.pb.h"
62
 
#include "drizzled/message/statement_transform.h"
63
 
#include "drizzled/resource_context.h"
64
 
#include "drizzled/lock.h"
65
 
#include "drizzled/item/int.h"
66
 
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/timestamp.h"
68
 
#include "drizzled/plugin/client.h"
69
 
#include "drizzled/plugin/monitored_in_transaction.h"
70
 
#include "drizzled/plugin/transactional_storage_engine.h"
71
 
#include "drizzled/plugin/xa_resource_manager.h"
72
 
#include "drizzled/internal/my_sys.h"
73
 
 
74
 
using namespace std;
75
 
 
76
 
#include <vector>
77
 
#include <algorithm>
78
 
#include <functional>
79
 
 
80
 
namespace drizzled
81
 
{
82
 
 
83
 
/**
84
 
 * @defgroup Transactions
85
 
 *
86
 
 * @brief
87
 
 *
88
 
 * Transaction handling in the server
89
 
 *
90
 
 * @detail
91
 
 *
92
 
 * In each client connection, Drizzle maintains two transaction
93
 
 * contexts representing the state of the:
94
 
 *
95
 
 * 1) Statement Transaction
96
 
 * 2) Normal Transaction
97
 
 *
98
 
 * These two transaction contexts represent the transactional
99
 
 * state of a Session's SQL and XA transactions for a single
100
 
 * SQL statement or a series of SQL statements.
101
 
 *
102
 
 * When the Session's connection is in AUTOCOMMIT mode, there
103
 
 * is no practical difference between the statement and the
104
 
 * normal transaction, as each SQL statement is committed or
105
 
 * rolled back depending on the success or failure of the
106
 
 * indvidual SQL statement.
107
 
 *
108
 
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
109
 
 * the Session has explicitly begun a normal SQL transaction using
110
 
 * a BEGIN WORK/START TRANSACTION statement, then the normal
111
 
 * transaction context tracks the aggregate transaction state of
112
 
 * the SQL transaction's individual statements, and the SQL
113
 
 * transaction's commit or rollback is done atomically for all of
114
 
 * the SQL transaction's statement's data changes.
115
 
 *
116
 
 * Technically, a statement transaction can be viewed as a savepoint 
117
 
 * which is maintained automatically in order to make effects of one
118
 
 * statement atomic.
119
 
 *
120
 
 * The normal transaction is started by the user and is typically
121
 
 * ended (COMMIT or ROLLBACK) upon an explicity user request as well.
122
 
 * The exception to this is that DDL statements implicitly COMMIT
123
 
 * any previously active normal transaction before they begin executing.
124
 
 *
125
 
 * In Drizzle, unlike MySQL, plugins other than a storage engine
126
 
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
127
 
 * plugins will automatically be monitored by Drizzle's transaction 
128
 
 * manager (implemented in this source file), as will all plugins which
129
 
 * implement plugin::XaResourceManager and register with the transaction
130
 
 * manager.
131
 
 *
132
 
 * If Drizzle's transaction manager sees that more than one resource
133
 
 * manager (transactional storage engine or XA resource manager) has modified
134
 
 * data state during a statement or normal transaction, the transaction
135
 
 * manager will automatically use a two-phase commit protocol for all
136
 
 * resources which support XA's distributed transaction protocol.  Unlike
137
 
 * MySQL, storage engines need not manually register with the transaction
138
 
 * manager during a statement's execution.  Previously, in MySQL, all
139
 
 * handlertons would have to call trans_register_ha() at some point after
140
 
 * modifying data state in order to have MySQL include that handler in
141
 
 * an XA transaction.  Drizzle does all of this grunt work behind the
142
 
 * scenes for the storage engine implementers.
143
 
 *
144
 
 * When a connection is closed, the current normal transaction, if
145
 
 * any is currently active, is rolled back.
146
 
 *
147
 
 * Transaction life cycle
148
 
 * ----------------------
149
 
 *
150
 
 * When a new connection is established, session->transaction
151
 
 * members are initialized to an empty state. If a statement uses any tables, 
152
 
 * all affected engines are registered in the statement engine list automatically
153
 
 * in plugin::StorageEngine::startStatement() and 
154
 
 * plugin::TransactionalStorageEngine::startTransaction().
155
 
 *
156
 
 * You can view the lifetime of a normal transaction in the following
157
 
 * call-sequence:
158
 
 *
159
 
 * drizzled::statement::Statement::execute()
160
 
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
161
 
 *     drizzled::TransactionServices::registerResourceForTransaction()
162
 
 *     drizzled::TransactionServices::registerResourceForStatement()
163
 
 *     drizzled::plugin::StorageEngine::startStatement()
164
 
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
165
 
 *     drizzled::plugin::StorageEngine::endStatement()
166
 
 *   drizzled::TransactionServices::autocommitOrRollback()
167
 
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
168
 
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
169
 
 *
170
 
 * Roles and responsibilities
171
 
 * --------------------------
172
 
 *
173
 
 * Beginning of SQL Statement (and Statement Transaction)
174
 
 * ------------------------------------------------------
175
 
 *
176
 
 * At the start of each SQL statement, for each storage engine
177
 
 * <strong>that is involved in the SQL statement</strong>, the kernel 
178
 
 * calls the engine's plugin::StoragEngine::startStatement() method.  If the
179
 
 * engine needs to track some data for the statement, it should use
180
 
 * this method invocation to initialize this data.  This is the
181
 
 * beginning of what is called the "statement transaction".
182
 
 *
183
 
 * <strong>For transaction storage engines (those storage engines
184
 
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
185
 
 * kernel automatically determines if the start of the SQL statement 
186
 
 * transaction should <em>also</em> begin the normal SQL transaction.
187
 
 * This occurs when the connection is in NOT in autocommit mode. If
188
 
 * the kernel detects this, then the kernel automatically starts the
189
 
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
190
 
 * method and then calls plugin::StorageEngine::startStatement()
191
 
 * afterwards.
192
 
 *
193
 
 * Beginning of an SQL "Normal" Transaction
194
 
 * ----------------------------------------
195
 
 *
196
 
 * As noted above, a "normal SQL transaction" may be started when
197
 
 * an SQL statement is started in a connection and the connection is
198
 
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
199
 
 *
200
 
 * In addition, when a user executes a START TRANSACTION or
201
 
 * BEGIN WORK statement in a connection, the kernel explicitly
202
 
 * calls each transactional storage engine's startTransaction() method.
203
 
 *
204
 
 * Ending of an SQL Statement (and Statement Transaction)
205
 
 * ------------------------------------------------------
206
 
 *
207
 
 * At the end of each SQL statement, for each of the aforementioned
208
 
 * involved storage engines, the kernel calls the engine's
209
 
 * plugin::StorageEngine::endStatement() method.  If the engine
210
 
 * has initialized or modified some internal data about the
211
 
 * statement transaction, it should use this method to reset or destroy
212
 
 * this data appropriately.
213
 
 *
214
 
 * Ending of an SQL "Normal" Transaction
215
 
 * -------------------------------------
216
 
 *
217
 
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
218
 
 * depending on the success or failure of the statement transaction(s) 
219
 
 * it encloses.
220
 
 *
221
 
 * The end of a "normal transaction" occurs when any of the following
222
 
 * occurs:
223
 
 *
224
 
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
225
 
 *    then the normal transaction which encloses the statement
226
 
 *    transaction ends
227
 
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
228
 
 * 3) Just before a DDL operation occurs, the kernel will implicitly
229
 
 *    commit the active normal transaction
230
 
 *
231
 
 * Transactions and Non-transactional Storage Engines
232
 
 * --------------------------------------------------
233
 
 *
234
 
 * For non-transactional engines, this call can be safely ignored, an
235
 
 * the kernel tracks whether a non-transactional engine has changed
236
 
 * any data state, and warns the user appropriately if a transaction
237
 
 * (statement or normal) is rolled back after such non-transactional
238
 
 * data changes have been made.
239
 
 *
240
 
 * XA Two-phase Commit Protocol
241
 
 * ----------------------------
242
 
 *
243
 
 * During statement execution, whenever any of data-modifying
244
 
 * PSEA API methods is used, e.g. Cursor::write_row() or
245
 
 * Cursor::update_row(), the read-write flag is raised in the
246
 
 * statement transaction for the involved engine.
247
 
 * Currently All PSEA calls are "traced", and the data can not be
248
 
 * changed in a way other than issuing a PSEA call. Important:
249
 
 * unless this invariant is preserved the server will not know that
250
 
 * a transaction in a given engine is read-write and will not
251
 
 * involve the two-phase commit protocol!
252
 
 *
253
 
 * At the end of a statement, TransactionServices::autocommitOrRollback()
254
 
 * is invoked. This call in turn
255
 
 * invokes plugin::XaResourceManager::xapPepare() for every involved XA
256
 
 * resource manager.
257
 
 *
258
 
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
259
 
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
260
 
 * is...)
261
 
 * 
262
 
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
263
 
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
264
 
 * At statement commit, the statement-related read-write engine
265
 
 * flag is propagated to the corresponding flag in the normal
266
 
 * transaction.  When the commit is complete, the list of registered
267
 
 * engines is cleared.
268
 
 *
269
 
 * Rollback is handled in a similar fashion.
270
 
 *
271
 
 * Additional notes on DDL and the normal transaction.
272
 
 * ---------------------------------------------------
273
 
 *
274
 
 * CREATE TABLE .. SELECT can start a *new* normal transaction
275
 
 * because of the fact that SELECTs on a transactional storage
276
 
 * engine participate in the normal SQL transaction (due to
277
 
 * isolation level issues and consistent read views).
278
 
 *
279
 
 * Behaviour of the server in this case is currently badly
280
 
 * defined.
281
 
 *
282
 
 * DDL statements use a form of "semantic" logging
283
 
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
284
 
 * the newly created table is deleted.
285
 
 * 
286
 
 * In addition, some DDL statements issue interim transaction
287
 
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
288
 
 * from the original table to the internal temporary table. Other
289
 
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
290
 
 * after itself.
291
 
 *
292
 
 * And finally there is a group of DDL statements such as
293
 
 * RENAME/DROP TABLE that doesn't start a new transaction
294
 
 * and doesn't commit.
295
 
 *
296
 
 * A consistent behaviour is perhaps to always commit the normal
297
 
 * transaction after all DDLs, just like the statement transaction
298
 
 * is always committed at the end of all statements.
299
 
 */
300
 
void TransactionServices::registerResourceForStatement(Session *session,
301
 
                                                       plugin::MonitoredInTransaction *monitored,
302
 
                                                       plugin::TransactionalStorageEngine *engine)
303
 
{
304
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
305
 
  {
306
 
    /* 
307
 
     * Now we automatically register this resource manager for the
308
 
     * normal transaction.  This is fine because a statement
309
 
     * transaction registration should always enlist the resource
310
 
     * in the normal transaction which contains the statement
311
 
     * transaction.
312
 
     */
313
 
    registerResourceForTransaction(session, monitored, engine);
314
 
  }
315
 
 
316
 
  TransactionContext *trans= &session->transaction.stmt;
317
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
318
 
 
319
 
  if (resource_context->isStarted())
320
 
    return; /* already registered, return */
321
 
 
322
 
  assert(monitored->participatesInSqlTransaction());
323
 
  assert(not monitored->participatesInXaTransaction());
324
 
 
325
 
  resource_context->setMonitored(monitored);
326
 
  resource_context->setTransactionalStorageEngine(engine);
327
 
  trans->registerResource(resource_context);
328
 
 
329
 
  trans->no_2pc|= true;
330
 
}
331
 
 
332
 
void TransactionServices::registerResourceForStatement(Session *session,
333
 
                                                       plugin::MonitoredInTransaction *monitored,
334
 
                                                       plugin::TransactionalStorageEngine *engine,
335
 
                                                       plugin::XaResourceManager *resource_manager)
336
 
{
337
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
338
 
  {
339
 
    /* 
340
 
     * Now we automatically register this resource manager for the
341
 
     * normal transaction.  This is fine because a statement
342
 
     * transaction registration should always enlist the resource
343
 
     * in the normal transaction which contains the statement
344
 
     * transaction.
345
 
     */
346
 
    registerResourceForTransaction(session, monitored, engine, resource_manager);
347
 
  }
348
 
 
349
 
  TransactionContext *trans= &session->transaction.stmt;
350
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
351
 
 
352
 
  if (resource_context->isStarted())
353
 
    return; /* already registered, return */
354
 
 
355
 
  assert(monitored->participatesInXaTransaction());
356
 
  assert(monitored->participatesInSqlTransaction());
357
 
 
358
 
  resource_context->setMonitored(monitored);
359
 
  resource_context->setTransactionalStorageEngine(engine);
360
 
  resource_context->setXaResourceManager(resource_manager);
361
 
  trans->registerResource(resource_context);
362
 
 
363
 
  trans->no_2pc|= false;
364
 
}
365
 
 
366
 
void TransactionServices::registerResourceForTransaction(Session *session,
367
 
                                                         plugin::MonitoredInTransaction *monitored,
368
 
                                                         plugin::TransactionalStorageEngine *engine)
369
 
{
370
 
  TransactionContext *trans= &session->transaction.all;
371
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
372
 
 
373
 
  if (resource_context->isStarted())
374
 
    return; /* already registered, return */
375
 
 
376
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
377
 
 
378
 
  trans->registerResource(resource_context);
379
 
 
380
 
  assert(monitored->participatesInSqlTransaction());
381
 
  assert(not monitored->participatesInXaTransaction());
382
 
 
383
 
  resource_context->setMonitored(monitored);
384
 
  resource_context->setTransactionalStorageEngine(engine);
385
 
  trans->no_2pc|= true;
386
 
 
387
 
  if (session->transaction.xid_state.xid.is_null())
388
 
    session->transaction.xid_state.xid.set(session->getQueryId());
389
 
 
390
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
391
 
 
392
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
393
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
394
 
    registerResourceForStatement(session, monitored, engine);
395
 
}
396
 
 
397
 
void TransactionServices::registerResourceForTransaction(Session *session,
398
 
                                                         plugin::MonitoredInTransaction *monitored,
399
 
                                                         plugin::TransactionalStorageEngine *engine,
400
 
                                                         plugin::XaResourceManager *resource_manager)
401
 
{
402
 
  TransactionContext *trans= &session->transaction.all;
403
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
404
 
 
405
 
  if (resource_context->isStarted())
406
 
    return; /* already registered, return */
407
 
 
408
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
409
 
 
410
 
  trans->registerResource(resource_context);
411
 
 
412
 
  assert(monitored->participatesInSqlTransaction());
413
 
 
414
 
  resource_context->setMonitored(monitored);
415
 
  resource_context->setXaResourceManager(resource_manager);
416
 
  resource_context->setTransactionalStorageEngine(engine);
417
 
  trans->no_2pc|= true;
418
 
 
419
 
  if (session->transaction.xid_state.xid.is_null())
420
 
    session->transaction.xid_state.xid.set(session->getQueryId());
421
 
 
422
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
423
 
 
424
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
425
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
426
 
    registerResourceForStatement(session, monitored, engine, resource_manager);
427
 
}
428
 
 
429
 
/**
430
 
  Check if we can skip the two-phase commit.
431
 
 
432
 
  A helper function to evaluate if two-phase commit is mandatory.
433
 
  As a side effect, propagates the read-only/read-write flags
434
 
  of the statement transaction to its enclosing normal transaction.
435
 
 
436
 
  @retval true   we must run a two-phase commit. Returned
437
 
                 if we have at least two engines with read-write changes.
438
 
  @retval false  Don't need two-phase commit. Even if we have two
439
 
                 transactional engines, we can run two independent
440
 
                 commits if changes in one of the engines are read-only.
441
 
*/
442
 
static
443
 
bool
444
 
ha_check_and_coalesce_trx_read_only(Session *session,
445
 
                                    TransactionContext::ResourceContexts &resource_contexts,
446
 
                                    bool normal_transaction)
447
 
{
448
 
  /* The number of storage engines that have actual changes. */
449
 
  unsigned num_resources_modified_data= 0;
450
 
  ResourceContext *resource_context;
451
 
 
452
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
 
       it != resource_contexts.end();
454
 
       ++it)
455
 
  {
456
 
    resource_context= *it;
457
 
    if (resource_context->hasModifiedData())
458
 
      ++num_resources_modified_data;
459
 
 
460
 
    if (! normal_transaction)
461
 
    {
462
 
      ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
 
      assert(resource_context != resource_context_normal);
464
 
      /*
465
 
        Merge read-only/read-write information about statement
466
 
        transaction to its enclosing normal transaction. Do this
467
 
        only if in a real transaction -- that is, if we know
468
 
        that resource_context_all is registered in session->transaction.all.
469
 
        Since otherwise we only clutter the normal transaction flags.
470
 
      */
471
 
      if (resource_context_normal->isStarted()) /* false if autocommit. */
472
 
        resource_context_normal->coalesceWith(resource_context);
473
 
    }
474
 
    else if (num_resources_modified_data > 1)
475
 
    {
476
 
      /*
477
 
        It is a normal transaction, so we don't need to merge read/write
478
 
        information up, and the need for two-phase commit has been
479
 
        already established. Break the loop prematurely.
480
 
      */
481
 
      break;
482
 
    }
483
 
  }
484
 
  return num_resources_modified_data > 1;
485
 
}
486
 
 
487
 
 
488
 
/**
489
 
  @retval
490
 
    0   ok
491
 
  @retval
492
 
    1   transaction was rolled back
493
 
  @retval
494
 
    2   error during commit, data may be inconsistent
495
 
 
496
 
  @todo
497
 
    Since we don't support nested statement transactions in 5.0,
498
 
    we can't commit or rollback stmt transactions while we are inside
499
 
    stored functions or triggers. So we simply do nothing now.
500
 
    TODO: This should be fixed in later ( >= 5.1) releases.
501
 
*/
502
 
int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
503
 
{
504
 
  int error= 0, cookie= 0;
505
 
  /*
506
 
    'all' means that this is either an explicit commit issued by
507
 
    user, or an implicit commit issued by a DDL.
508
 
  */
509
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
510
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
511
 
 
512
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
513
 
 
514
 
  /*
515
 
    We must not commit the normal transaction if a statement
516
 
    transaction is pending. Otherwise statement transaction
517
 
    flags will not get propagated to its normal transaction's
518
 
    counterpart.
519
 
  */
520
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
521
 
              trans == &session->transaction.stmt);
522
 
 
523
 
  if (resource_contexts.empty() == false)
524
 
  {
525
 
    bool must_2pc;
526
 
 
527
 
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
528
 
    {
529
 
      ha_rollback_trans(session, normal_transaction);
530
 
      return 1;
531
 
    }
532
 
 
533
 
    must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
534
 
 
535
 
    if (! trans->no_2pc && must_2pc)
536
 
    {
537
 
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
 
           it != resource_contexts.end() && ! error;
539
 
           ++it)
540
 
      {
541
 
        ResourceContext *resource_context= *it;
542
 
        int err;
543
 
        /*
544
 
          Do not call two-phase commit if this particular
545
 
          transaction is read-only. This allows for simpler
546
 
          implementation in engines that are always read-only.
547
 
        */
548
 
        if (! resource_context->hasModifiedData())
549
 
          continue;
550
 
 
551
 
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
552
 
 
553
 
        if (resource->participatesInXaTransaction())
554
 
        {
555
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
556
 
          {
557
 
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
558
 
            error= 1;
559
 
          }
560
 
          else
561
 
          {
562
 
            status_var_increment(session->status_var.ha_prepare_count);
563
 
          }
564
 
        }
565
 
      }
566
 
      if (error)
567
 
      {
568
 
        ha_rollback_trans(session, normal_transaction);
569
 
        error= 1;
570
 
        goto end;
571
 
      }
572
 
    }
573
 
    error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
574
 
end:
575
 
    if (is_real_trans)
576
 
      start_waiting_global_read_lock(session);
577
 
  }
578
 
  return error;
579
 
}
580
 
 
581
 
/**
582
 
  @note
583
 
  This function does not care about global read lock. A caller should.
584
 
*/
585
 
int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
586
 
{
587
 
  int error=0;
588
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
589
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
590
 
 
591
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
592
 
 
593
 
  if (resource_contexts.empty() == false)
594
 
  {
595
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
596
 
         it != resource_contexts.end();
597
 
         ++it)
598
 
    {
599
 
      int err;
600
 
      ResourceContext *resource_context= *it;
601
 
 
602
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
603
 
 
604
 
      if (resource->participatesInXaTransaction())
605
 
      {
606
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
607
 
        {
608
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
609
 
          error= 1;
610
 
        }
611
 
        else if (normal_transaction)
612
 
        {
613
 
          status_var_increment(session->status_var.ha_commit_count);
614
 
        }
615
 
      }
616
 
      else if (resource->participatesInSqlTransaction())
617
 
      {
618
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
619
 
        {
620
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
621
 
          error= 1;
622
 
        }
623
 
        else if (normal_transaction)
624
 
        {
625
 
          status_var_increment(session->status_var.ha_commit_count);
626
 
        }
627
 
      }
628
 
      resource_context->reset(); /* keep it conveniently zero-filled */
629
 
    }
630
 
 
631
 
    if (is_real_trans)
632
 
      session->transaction.xid_state.xid.null();
633
 
 
634
 
    if (normal_transaction)
635
 
    {
636
 
      session->variables.tx_isolation= session->session_tx_isolation;
637
 
      session->transaction.cleanup();
638
 
    }
639
 
  }
640
 
  trans->reset();
641
 
  if (error == 0)
642
 
  {
643
 
    if (is_real_trans)
644
 
    {
645
 
      /* 
646
 
       * We commit the normal transaction by finalizing the transaction message
647
 
       * and propogating the message to all registered replicators.
648
 
       */
649
 
      commitTransactionMessage(session);
650
 
    }
651
 
  }
652
 
  return error;
653
 
}
654
 
 
655
 
int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
656
 
{
657
 
  int error= 0;
658
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
659
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
660
 
 
661
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
662
 
 
663
 
  /*
664
 
    We must not rollback the normal transaction if a statement
665
 
    transaction is pending.
666
 
  */
667
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
668
 
              trans == &session->transaction.stmt);
669
 
 
670
 
  if (resource_contexts.empty() == false)
671
 
  {
672
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
673
 
         it != resource_contexts.end();
674
 
         ++it)
675
 
    {
676
 
      int err;
677
 
      ResourceContext *resource_context= *it;
678
 
 
679
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
680
 
 
681
 
      if (resource->participatesInXaTransaction())
682
 
      {
683
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
684
 
        {
685
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
686
 
          error= 1;
687
 
        }
688
 
        else if (normal_transaction)
689
 
        {
690
 
          status_var_increment(session->status_var.ha_rollback_count);
691
 
        }
692
 
      }
693
 
      else if (resource->participatesInSqlTransaction())
694
 
      {
695
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
696
 
        {
697
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
698
 
          error= 1;
699
 
        }
700
 
        else if (normal_transaction)
701
 
        {
702
 
          status_var_increment(session->status_var.ha_rollback_count);
703
 
        }
704
 
      }
705
 
      resource_context->reset(); /* keep it conveniently zero-filled */
706
 
    }
707
 
    
708
 
    /* 
709
 
     * We need to signal the ROLLBACK to ReplicationServices here
710
 
     * BEFORE we set the transaction ID to NULL.  This is because
711
 
     * if a bulk segment was sent to replicators, we need to send
712
 
     * a rollback statement with the corresponding transaction ID
713
 
     * to rollback.
714
 
     */
715
 
    rollbackTransactionMessage(session);
716
 
 
717
 
    if (is_real_trans)
718
 
      session->transaction.xid_state.xid.null();
719
 
    if (normal_transaction)
720
 
    {
721
 
      session->variables.tx_isolation=session->session_tx_isolation;
722
 
      session->transaction.cleanup();
723
 
    }
724
 
  }
725
 
  if (normal_transaction)
726
 
    session->transaction_rollback_request= false;
727
 
 
728
 
  /*
729
 
   * If a non-transactional table was updated, warn the user
730
 
   */
731
 
  if (is_real_trans &&
732
 
      session->transaction.all.hasModifiedNonTransData() &&
733
 
      session->killed != Session::KILL_CONNECTION)
734
 
  {
735
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
736
 
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
737
 
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
738
 
  }
739
 
  trans->reset();
740
 
  return error;
741
 
}
742
 
 
743
 
/**
744
 
  This is used to commit or rollback a single statement depending on
745
 
  the value of error.
746
 
 
747
 
  @note
748
 
    Note that if the autocommit is on, then the following call inside
749
 
    InnoDB will commit or rollback the whole transaction (= the statement). The
750
 
    autocommit mechanism built into InnoDB is based on counting locks, but if
751
 
    the user has used LOCK TABLES then that mechanism does not know to do the
752
 
    commit.
753
 
*/
754
 
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
755
 
{
756
 
  if (session->transaction.stmt.getResourceContexts().empty() == false)
757
 
  {
758
 
    if (! error)
759
 
    {
760
 
      if (ha_commit_trans(session, false))
761
 
        error= 1;
762
 
    }
763
 
    else
764
 
    {
765
 
      (void) ha_rollback_trans(session, false);
766
 
      if (session->transaction_rollback_request)
767
 
        (void) ha_rollback_trans(session, true);
768
 
    }
769
 
 
770
 
    session->variables.tx_isolation= session->session_tx_isolation;
771
 
  }
772
 
  return error;
773
 
}
774
 
 
775
 
/**
776
 
  return the list of XID's to a client, the same way SHOW commands do.
777
 
 
778
 
  @note
779
 
    I didn't find in XA specs that an RM cannot return the same XID twice,
780
 
    so mysql_xa_recover does not filter XID's to ensure uniqueness.
781
 
    It can be easily fixed later, if necessary.
782
 
*/
783
 
bool TransactionServices::mysql_xa_recover(Session *session)
784
 
{
785
 
  List<Item> field_list;
786
 
  int i= 0;
787
 
  XID_STATE *xs;
788
 
 
789
 
  field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
790
 
  field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
791
 
  field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
792
 
  field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
793
 
 
794
 
  if (session->client->sendFields(&field_list))
795
 
    return 1;
796
 
 
797
 
  pthread_mutex_lock(&LOCK_xid_cache);
798
 
  while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
799
 
  {
800
 
    if (xs->xa_state==XA_PREPARED)
801
 
    {
802
 
      session->client->store((int64_t)xs->xid.formatID);
803
 
      session->client->store((int64_t)xs->xid.gtrid_length);
804
 
      session->client->store((int64_t)xs->xid.bqual_length);
805
 
      session->client->store(xs->xid.data,
806
 
                             xs->xid.gtrid_length+xs->xid.bqual_length);
807
 
      if (session->client->flush())
808
 
      {
809
 
        pthread_mutex_unlock(&LOCK_xid_cache);
810
 
        return 1;
811
 
      }
812
 
    }
813
 
  }
814
 
 
815
 
  pthread_mutex_unlock(&LOCK_xid_cache);
816
 
  session->my_eof();
817
 
  return 0;
818
 
}
819
 
 
820
 
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
821
 
{
822
 
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
823
 
  {
824
 
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
825
 
     * resources aren't the same... */
826
 
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
827
 
  }
828
 
};
829
 
 
830
 
int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)
831
 
{
832
 
  int error= 0;
833
 
  TransactionContext *trans= &session->transaction.all;
834
 
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
835
 
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
836
 
 
837
 
  trans->no_2pc= false;
838
 
  /*
839
 
    rolling back to savepoint in all storage engines that were part of the
840
 
    transaction when the savepoint was set
841
 
  */
842
 
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
843
 
       it != sv_resource_contexts.end();
844
 
       ++it)
845
 
  {
846
 
    int err;
847
 
    ResourceContext *resource_context= *it;
848
 
 
849
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
850
 
 
851
 
    if (resource->participatesInSqlTransaction())
852
 
    {
853
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
854
 
      {
855
 
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
856
 
        error= 1;
857
 
      }
858
 
      else
859
 
      {
860
 
        status_var_increment(session->status_var.ha_savepoint_rollback_count);
861
 
      }
862
 
    }
863
 
    trans->no_2pc|= not resource->participatesInXaTransaction();
864
 
  }
865
 
  /*
866
 
    rolling back the transaction in all storage engines that were not part of
867
 
    the transaction when the savepoint was set
868
 
  */
869
 
  {
870
 
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
871
 
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
872
 
    TransactionContext::ResourceContexts set_difference_contexts;
873
 
 
874
 
    sort(sorted_tran_resource_contexts.begin(),
875
 
         sorted_tran_resource_contexts.end(),
876
 
         ResourceContextCompare());
877
 
    sort(sorted_sv_resource_contexts.begin(),
878
 
         sorted_sv_resource_contexts.end(),
879
 
         ResourceContextCompare());
880
 
    set_difference(sorted_tran_resource_contexts.begin(),
881
 
                   sorted_tran_resource_contexts.end(),
882
 
                   sorted_sv_resource_contexts.begin(),
883
 
                   sorted_sv_resource_contexts.end(),
884
 
                   set_difference_contexts.begin(),
885
 
                   ResourceContextCompare());
886
 
    /* 
887
 
     * set_difference_contexts now contains all resource contexts
888
 
     * which are in the transaction context but were NOT in the
889
 
     * savepoint's resource contexts.
890
 
     */
891
 
        
892
 
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
893
 
         it != set_difference_contexts.end();
894
 
         ++it)
895
 
    {
896
 
      ResourceContext *resource_context= *it;
897
 
      int err;
898
 
 
899
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
900
 
 
901
 
      if (resource->participatesInSqlTransaction())
902
 
      {
903
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
904
 
        {
905
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
906
 
          error= 1;
907
 
        }
908
 
        else
909
 
        {
910
 
          status_var_increment(session->status_var.ha_rollback_count);
911
 
        }
912
 
      }
913
 
      resource_context->reset(); /* keep it conveniently zero-filled */
914
 
    }
915
 
  }
916
 
  trans->setResourceContexts(sv_resource_contexts);
917
 
  return error;
918
 
}
919
 
 
920
 
/**
921
 
  @note
922
 
  according to the sql standard (ISO/IEC 9075-2:2003)
923
 
  section "4.33.4 SQL-statements and transaction states",
924
 
  NamedSavepoint is *not* transaction-initiating SQL-statement
925
 
*/
926
 
int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
927
 
{
928
 
  int error= 0;
929
 
  TransactionContext *trans= &session->transaction.all;
930
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
931
 
 
932
 
  if (resource_contexts.empty() == false)
933
 
  {
934
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
935
 
         it != resource_contexts.end();
936
 
         ++it)
937
 
    {
938
 
      ResourceContext *resource_context= *it;
939
 
      int err;
940
 
 
941
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
942
 
 
943
 
      if (resource->participatesInSqlTransaction())
944
 
      {
945
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
946
 
        {
947
 
          my_error(ER_GET_ERRNO, MYF(0), err);
948
 
          error= 1;
949
 
        }
950
 
        else
951
 
        {
952
 
          status_var_increment(session->status_var.ha_savepoint_count);
953
 
        }
954
 
      }
955
 
    }
956
 
  }
957
 
  /*
958
 
    Remember the list of registered storage engines.
959
 
  */
960
 
  sv.setResourceContexts(resource_contexts);
961
 
  return error;
962
 
}
963
 
 
964
 
int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)
965
 
{
966
 
  int error= 0;
967
 
 
968
 
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
969
 
 
970
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
971
 
       it != resource_contexts.end();
972
 
       ++it)
973
 
  {
974
 
    int err;
975
 
    ResourceContext *resource_context= *it;
976
 
 
977
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
978
 
 
979
 
    if (resource->participatesInSqlTransaction())
980
 
    {
981
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
982
 
      {
983
 
        my_error(ER_GET_ERRNO, MYF(0), err);
984
 
        error= 1;
985
 
      }
986
 
    }
987
 
  }
988
 
  return error;
989
 
}
990
 
 
991
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
992
 
{
993
 
  message::Transaction *transaction= in_session->getTransactionMessage();
994
 
 
995
 
  if (unlikely(transaction == NULL))
996
 
  {
997
 
    /* 
998
 
     * Allocate and initialize a new transaction message 
999
 
     * for this Session object.  Session is responsible for
1000
 
     * deleting transaction message when done with it.
1001
 
     */
1002
 
    transaction= new (nothrow) message::Transaction();
1003
 
    initTransactionMessage(*transaction, in_session);
1004
 
    in_session->setTransactionMessage(transaction);
1005
 
    return transaction;
1006
 
  }
1007
 
  else
1008
 
    return transaction;
1009
 
}
1010
 
 
1011
 
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1012
 
                                          Session *in_session)
1013
 
{
1014
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015
 
  trx->set_server_id(in_session->getServerId());
1016
 
  trx->set_transaction_id(in_session->getQueryId());
1017
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1018
 
}
1019
 
 
1020
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1021
 
                                              Session *in_session)
1022
 
{
1023
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1024
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
1025
 
}
1026
 
 
1027
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1028
 
                                             Session *in_session)
1029
 
{
1030
 
  delete in_transaction;
1031
 
  in_session->setStatementMessage(NULL);
1032
 
  in_session->setTransactionMessage(NULL);
1033
 
}
1034
 
 
1035
 
void TransactionServices::commitTransactionMessage(Session *in_session)
1036
 
{
1037
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1038
 
  if (! replication_services.isActive())
1039
 
    return;
1040
 
 
1041
 
  /* If there is an active statement message, finalize it */
1042
 
  message::Statement *statement= in_session->getStatementMessage();
1043
 
 
1044
 
  if (statement != NULL)
1045
 
  {
1046
 
    finalizeStatementMessage(*statement, in_session);
1047
 
  }
1048
 
  else
1049
 
    return; /* No data modification occurred inside the transaction */
1050
 
  
1051
 
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1052
 
 
1053
 
  finalizeTransactionMessage(*transaction, in_session);
1054
 
  
1055
 
  replication_services.pushTransactionMessage(*transaction);
1056
 
 
1057
 
  cleanupTransactionMessage(transaction, in_session);
1058
 
}
1059
 
 
1060
 
void TransactionServices::initStatementMessage(message::Statement &statement,
1061
 
                                        message::Statement::Type in_type,
1062
 
                                        Session *in_session)
1063
 
{
1064
 
  statement.set_type(in_type);
1065
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1066
 
  /** @TODO Set sql string optionally */
1067
 
}
1068
 
 
1069
 
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1070
 
                                            Session *in_session)
1071
 
{
1072
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1073
 
  in_session->setStatementMessage(NULL);
1074
 
}
1075
 
 
1076
 
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1077
 
{
1078
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1079
 
  if (! replication_services.isActive())
1080
 
    return;
1081
 
  
1082
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1083
 
 
1084
 
  /*
1085
 
   * OK, so there are two situations that we need to deal with here:
1086
 
   *
1087
 
   * 1) We receive an instruction to ROLLBACK the current transaction
1088
 
   *    and the currently-stored Transaction message is *self-contained*, 
1089
 
   *    meaning that no Statement messages in the Transaction message
1090
 
   *    contain a message having its segment_id member greater than 1.  If
1091
 
   *    no non-segment ID 1 members are found, we can simply clear the
1092
 
   *    current Transaction message and remove it from memory.
1093
 
   *
1094
 
   * 2) If the Transaction message does indeed have a non-end segment, that
1095
 
   *    means that a bulk update/delete/insert Transaction message segment
1096
 
   *    has previously been sent over the wire to replicators.  In this case, 
1097
 
   *    we need to package a Transaction with a Statement message of type
1098
 
   *    ROLLBACK to indicate to replicators that previously-transmitted
1099
 
   *    messages must be un-applied.
1100
 
   */
1101
 
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1102
 
  {
1103
 
    /*
1104
 
     * Clear the transaction, create a Rollback statement message, 
1105
 
     * attach it to the transaction, and push it to replicators.
1106
 
     */
1107
 
    transaction->Clear();
1108
 
    initTransactionMessage(*transaction, in_session);
1109
 
 
1110
 
    message::Statement *statement= transaction->add_statement();
1111
 
 
1112
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1113
 
    finalizeStatementMessage(*statement, in_session);
1114
 
 
1115
 
    finalizeTransactionMessage(*transaction, in_session);
1116
 
    
1117
 
    replication_services.pushTransactionMessage(*transaction);
1118
 
  }
1119
 
  cleanupTransactionMessage(transaction, in_session);
1120
 
}
1121
 
 
1122
 
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1123
 
                                                                 Table *in_table)
1124
 
{
1125
 
  message::Statement *statement= in_session->getStatementMessage();
1126
 
  /*
1127
 
   * We check to see if the current Statement message is of type INSERT.
1128
 
   * If it is not, we finalize the current Statement and ensure a new
1129
 
   * InsertStatement is created.
1130
 
   */
1131
 
  if (statement != NULL &&
1132
 
      statement->type() != message::Statement::INSERT)
1133
 
  {
1134
 
    finalizeStatementMessage(*statement, in_session);
1135
 
    statement= in_session->getStatementMessage();
1136
 
  }
1137
 
 
1138
 
  if (statement == NULL)
1139
 
  {
1140
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
1141
 
    /* 
1142
 
     * Transaction message initialized and set, but no statement created
1143
 
     * yet.  We construct one and initialize it, here, then return the
1144
 
     * message after attaching the new Statement message pointer to the 
1145
 
     * Session for easy retrieval later...
1146
 
     */
1147
 
    statement= transaction->add_statement();
1148
 
    setInsertHeader(*statement, in_session, in_table);
1149
 
    in_session->setStatementMessage(statement);
1150
 
  }
1151
 
  return *statement;
1152
 
}
1153
 
 
1154
 
void TransactionServices::setInsertHeader(message::Statement &statement,
1155
 
                                          Session *in_session,
1156
 
                                          Table *in_table)
1157
 
{
1158
 
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1159
 
 
1160
 
  /* 
1161
 
   * Now we construct the specialized InsertHeader message inside
1162
 
   * the generalized message::Statement container...
1163
 
   */
1164
 
  /* Set up the insert header */
1165
 
  message::InsertHeader *header= statement.mutable_insert_header();
1166
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1167
 
 
1168
 
  string schema_name;
1169
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1170
 
  string table_name;
1171
 
  (void) in_table->getShare()->getTableName(table_name);
1172
 
 
1173
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1174
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1175
 
 
1176
 
  Field *current_field;
1177
 
  Field **table_fields= in_table->field;
1178
 
 
1179
 
  message::FieldMetadata *field_metadata;
1180
 
 
1181
 
  /* We will read all the table's fields... */
1182
 
  in_table->setReadSet();
1183
 
 
1184
 
  while ((current_field= *table_fields++) != NULL) 
1185
 
  {
1186
 
    field_metadata= header->add_field_metadata();
1187
 
    field_metadata->set_name(current_field->field_name);
1188
 
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1189
 
  }
1190
 
}
1191
 
 
1192
 
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1193
 
{
1194
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1195
 
  if (! replication_services.isActive())
1196
 
    return false;
1197
 
  /**
1198
 
   * We do this check here because we don't want to even create a 
1199
 
   * statement if there isn't a primary key on the table...
1200
 
   *
1201
 
   * @todo
1202
 
   *
1203
 
   * Multi-column primary keys are handled how exactly?
1204
 
   */
1205
 
  if (in_table->s->primary_key == MAX_KEY)
1206
 
  {
1207
 
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1208
 
    return true;
1209
 
  }
1210
 
 
1211
 
  message::Statement &statement= getInsertStatement(in_session, in_table);
1212
 
 
1213
 
  message::InsertData *data= statement.mutable_insert_data();
1214
 
  data->set_segment_id(1);
1215
 
  data->set_end_segment(true);
1216
 
  message::InsertRecord *record= data->add_record();
1217
 
 
1218
 
  Field *current_field;
1219
 
  Field **table_fields= in_table->field;
1220
 
 
1221
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1222
 
  string_value->set_charset(system_charset_info);
1223
 
 
1224
 
  /* We will read all the table's fields... */
1225
 
  in_table->setReadSet();
1226
 
 
1227
 
  while ((current_field= *table_fields++) != NULL) 
1228
 
  {
1229
 
    string_value= current_field->val_str(string_value);
1230
 
    record->add_insert_value(string_value->c_ptr(), string_value->length());
1231
 
    string_value->free();
1232
 
  }
1233
 
  return false;
1234
 
}
1235
 
 
1236
 
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1237
 
                                                            Table *in_table,
1238
 
                                                            const unsigned char *old_record, 
1239
 
                                                            const unsigned char *new_record)
1240
 
{
1241
 
  message::Statement *statement= in_session->getStatementMessage();
1242
 
  /*
1243
 
   * We check to see if the current Statement message is of type UPDATE.
1244
 
   * If it is not, we finalize the current Statement and ensure a new
1245
 
   * UpdateStatement is created.
1246
 
   */
1247
 
  if (statement != NULL &&
1248
 
      statement->type() != message::Statement::UPDATE)
1249
 
  {
1250
 
    finalizeStatementMessage(*statement, in_session);
1251
 
    statement= in_session->getStatementMessage();
1252
 
  }
1253
 
 
1254
 
  if (statement == NULL)
1255
 
  {
1256
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
1257
 
    /* 
1258
 
     * Transaction message initialized and set, but no statement created
1259
 
     * yet.  We construct one and initialize it, here, then return the
1260
 
     * message after attaching the new Statement message pointer to the 
1261
 
     * Session for easy retrieval later...
1262
 
     */
1263
 
    statement= transaction->add_statement();
1264
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1265
 
    in_session->setStatementMessage(statement);
1266
 
  }
1267
 
  return *statement;
1268
 
}
1269
 
 
1270
 
void TransactionServices::setUpdateHeader(message::Statement &statement,
1271
 
                                          Session *in_session,
1272
 
                                          Table *in_table,
1273
 
                                          const unsigned char *old_record, 
1274
 
                                          const unsigned char *new_record)
1275
 
{
1276
 
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1277
 
 
1278
 
  /* 
1279
 
   * Now we construct the specialized UpdateHeader message inside
1280
 
   * the generalized message::Statement container...
1281
 
   */
1282
 
  /* Set up the update header */
1283
 
  message::UpdateHeader *header= statement.mutable_update_header();
1284
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1285
 
 
1286
 
  string schema_name;
1287
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1288
 
  string table_name;
1289
 
  (void) in_table->getShare()->getTableName(table_name);
1290
 
 
1291
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1292
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1293
 
 
1294
 
  Field *current_field;
1295
 
  Field **table_fields= in_table->field;
1296
 
 
1297
 
  message::FieldMetadata *field_metadata;
1298
 
 
1299
 
  /* We will read all the table's fields... */
1300
 
  in_table->setReadSet();
1301
 
 
1302
 
  while ((current_field= *table_fields++) != NULL) 
1303
 
  {
1304
 
    /*
1305
 
     * We add the "key field metadata" -- i.e. the fields which is
1306
 
     * the primary key for the table.
1307
 
     */
1308
 
    if (in_table->s->fieldInPrimaryKey(current_field))
1309
 
    {
1310
 
      field_metadata= header->add_key_field_metadata();
1311
 
      field_metadata->set_name(current_field->field_name);
1312
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1313
 
    }
1314
 
 
1315
 
    /*
1316
 
     * The below really should be moved into the Field API and Record API.  But for now
1317
 
     * we do this crazy pointer fiddling to figure out if the current field
1318
 
     * has been updated in the supplied record raw byte pointers.
1319
 
     */
1320
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1321
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1322
 
 
1323
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1324
 
 
1325
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1326
 
    {
1327
 
      /* Field is changed from old to new */
1328
 
      field_metadata= header->add_set_field_metadata();
1329
 
      field_metadata->set_name(current_field->field_name);
1330
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1331
 
    }
1332
 
  }
1333
 
}
1334
 
void TransactionServices::updateRecord(Session *in_session,
1335
 
                                       Table *in_table, 
1336
 
                                       const unsigned char *old_record, 
1337
 
                                       const unsigned char *new_record)
1338
 
{
1339
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1340
 
  if (! replication_services.isActive())
1341
 
    return;
1342
 
 
1343
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1344
 
 
1345
 
  message::UpdateData *data= statement.mutable_update_data();
1346
 
  data->set_segment_id(1);
1347
 
  data->set_end_segment(true);
1348
 
  message::UpdateRecord *record= data->add_record();
1349
 
 
1350
 
  Field *current_field;
1351
 
  Field **table_fields= in_table->field;
1352
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1353
 
  string_value->set_charset(system_charset_info);
1354
 
 
1355
 
  while ((current_field= *table_fields++) != NULL) 
1356
 
  {
1357
 
    /*
1358
 
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1359
 
     * but then realized that an UPDATE statement could potentially have different values for
1360
 
     * the SET field.  For instance, imagine this SQL scenario:
1361
 
     *
1362
 
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1363
 
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1364
 
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1365
 
     *
1366
 
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1367
 
     *
1368
 
     * The below really should be moved into the Field API and Record API.  But for now
1369
 
     * we do this crazy pointer fiddling to figure out if the current field
1370
 
     * has been updated in the supplied record raw byte pointers.
1371
 
     */
1372
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1373
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1374
 
 
1375
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1376
 
 
1377
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1378
 
    {
1379
 
      /* Store the original "read bit" for this field */
1380
 
      bool is_read_set= current_field->isReadSet();
1381
 
 
1382
 
      /* We need to mark that we will "read" this field... */
1383
 
      in_table->setReadSet(current_field->field_index);
1384
 
 
1385
 
      /* Read the string value of this field's contents */
1386
 
      string_value= current_field->val_str(string_value);
1387
 
 
1388
 
      /* 
1389
 
       * Reset the read bit after reading field to its original state.  This 
1390
 
       * prevents the field from being included in the WHERE clause
1391
 
       */
1392
 
      current_field->setReadSet(is_read_set);
1393
 
 
1394
 
      record->add_after_value(string_value->c_ptr(), string_value->length());
1395
 
      string_value->free();
1396
 
    }
1397
 
 
1398
 
    /* 
1399
 
     * Add the WHERE clause values now...for now, this means the
1400
 
     * primary key field value.  Replication only supports tables
1401
 
     * with a primary key.
1402
 
     */
1403
 
    if (in_table->s->fieldInPrimaryKey(current_field))
1404
 
    {
1405
 
      /**
1406
 
       * To say the below is ugly is an understatement. But it works.
1407
 
       * 
1408
 
       * @todo Move this crap into a real Record API.
1409
 
       */
1410
 
      string_value= current_field->val_str(string_value,
1411
 
                                           old_record + 
1412
 
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1413
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1414
 
      string_value->free();
1415
 
    }
1416
 
 
1417
 
  }
1418
 
}
1419
 
 
1420
 
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1421
 
                                                            Table *in_table)
1422
 
{
1423
 
  message::Statement *statement= in_session->getStatementMessage();
1424
 
  /*
1425
 
   * We check to see if the current Statement message is of type DELETE.
1426
 
   * If it is not, we finalize the current Statement and ensure a new
1427
 
   * DeleteStatement is created.
1428
 
   */
1429
 
  if (statement != NULL &&
1430
 
      statement->type() != message::Statement::DELETE)
1431
 
  {
1432
 
    finalizeStatementMessage(*statement, in_session);
1433
 
    statement= in_session->getStatementMessage();
1434
 
  }
1435
 
 
1436
 
  if (statement == NULL)
1437
 
  {
1438
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
1439
 
    /* 
1440
 
     * Transaction message initialized and set, but no statement created
1441
 
     * yet.  We construct one and initialize it, here, then return the
1442
 
     * message after attaching the new Statement message pointer to the 
1443
 
     * Session for easy retrieval later...
1444
 
     */
1445
 
    statement= transaction->add_statement();
1446
 
    setDeleteHeader(*statement, in_session, in_table);
1447
 
    in_session->setStatementMessage(statement);
1448
 
  }
1449
 
  return *statement;
1450
 
}
1451
 
 
1452
 
void TransactionServices::setDeleteHeader(message::Statement &statement,
1453
 
                                          Session *in_session,
1454
 
                                          Table *in_table)
1455
 
{
1456
 
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1457
 
 
1458
 
  /* 
1459
 
   * Now we construct the specialized DeleteHeader message inside
1460
 
   * the generalized message::Statement container...
1461
 
   */
1462
 
  message::DeleteHeader *header= statement.mutable_delete_header();
1463
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1464
 
 
1465
 
  string schema_name;
1466
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1467
 
  string table_name;
1468
 
  (void) in_table->getShare()->getTableName(table_name);
1469
 
 
1470
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1471
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1472
 
 
1473
 
  Field *current_field;
1474
 
  Field **table_fields= in_table->field;
1475
 
 
1476
 
  message::FieldMetadata *field_metadata;
1477
 
 
1478
 
  while ((current_field= *table_fields++) != NULL) 
1479
 
  {
1480
 
    /* 
1481
 
     * Add the WHERE clause values now...for now, this means the
1482
 
     * primary key field value.  Replication only supports tables
1483
 
     * with a primary key.
1484
 
     */
1485
 
    if (in_table->s->fieldInPrimaryKey(current_field))
1486
 
    {
1487
 
      field_metadata= header->add_key_field_metadata();
1488
 
      field_metadata->set_name(current_field->field_name);
1489
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1490
 
    }
1491
 
  }
1492
 
}
1493
 
 
1494
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1495
 
{
1496
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1497
 
  if (! replication_services.isActive())
1498
 
    return;
1499
 
 
1500
 
  message::Statement &statement= getDeleteStatement(in_session, in_table);
1501
 
 
1502
 
  message::DeleteData *data= statement.mutable_delete_data();
1503
 
  data->set_segment_id(1);
1504
 
  data->set_end_segment(true);
1505
 
  message::DeleteRecord *record= data->add_record();
1506
 
 
1507
 
  Field *current_field;
1508
 
  Field **table_fields= in_table->field;
1509
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1510
 
  string_value->set_charset(system_charset_info);
1511
 
 
1512
 
  while ((current_field= *table_fields++) != NULL) 
1513
 
  {
1514
 
    /* 
1515
 
     * Add the WHERE clause values now...for now, this means the
1516
 
     * primary key field value.  Replication only supports tables
1517
 
     * with a primary key.
1518
 
     */
1519
 
    if (in_table->s->fieldInPrimaryKey(current_field))
1520
 
    {
1521
 
      string_value= current_field->val_str(string_value);
1522
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1523
 
      /**
1524
 
       * @TODO Store optional old record value in the before data member
1525
 
       */
1526
 
      string_value->free();
1527
 
    }
1528
 
  }
1529
 
}
1530
 
 
1531
 
void TransactionServices::createTable(Session *in_session,
1532
 
                                      const message::Table &table)
1533
 
{
1534
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1535
 
  if (! replication_services.isActive())
1536
 
    return;
1537
 
  
1538
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1539
 
  message::Statement *statement= transaction->add_statement();
1540
 
 
1541
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1542
 
 
1543
 
  /* 
1544
 
   * Construct the specialized CreateTableStatement message and attach
1545
 
   * it to the generic Statement message
1546
 
   */
1547
 
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1548
 
  message::Table *new_table_message= create_table_statement->mutable_table();
1549
 
  *new_table_message= table;
1550
 
 
1551
 
  finalizeStatementMessage(*statement, in_session);
1552
 
 
1553
 
  finalizeTransactionMessage(*transaction, in_session);
1554
 
  
1555
 
  replication_services.pushTransactionMessage(*transaction);
1556
 
 
1557
 
  cleanupTransactionMessage(transaction, in_session);
1558
 
 
1559
 
}
1560
 
 
1561
 
void TransactionServices::createSchema(Session *in_session,
1562
 
                                       const message::Schema &schema)
1563
 
{
1564
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1565
 
  if (! replication_services.isActive())
1566
 
    return;
1567
 
  
1568
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1569
 
  message::Statement *statement= transaction->add_statement();
1570
 
 
1571
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1572
 
 
1573
 
  /* 
1574
 
   * Construct the specialized CreateSchemaStatement message and attach
1575
 
   * it to the generic Statement message
1576
 
   */
1577
 
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1578
 
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1579
 
  *new_schema_message= schema;
1580
 
 
1581
 
  finalizeStatementMessage(*statement, in_session);
1582
 
 
1583
 
  finalizeTransactionMessage(*transaction, in_session);
1584
 
  
1585
 
  replication_services.pushTransactionMessage(*transaction);
1586
 
 
1587
 
  cleanupTransactionMessage(transaction, in_session);
1588
 
 
1589
 
}
1590
 
 
1591
 
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1592
 
{
1593
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1594
 
  if (! replication_services.isActive())
1595
 
    return;
1596
 
  
1597
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1598
 
  message::Statement *statement= transaction->add_statement();
1599
 
 
1600
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1601
 
 
1602
 
  /* 
1603
 
   * Construct the specialized DropSchemaStatement message and attach
1604
 
   * it to the generic Statement message
1605
 
   */
1606
 
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1607
 
 
1608
 
  drop_schema_statement->set_schema_name(schema_name);
1609
 
 
1610
 
  finalizeStatementMessage(*statement, in_session);
1611
 
 
1612
 
  finalizeTransactionMessage(*transaction, in_session);
1613
 
  
1614
 
  replication_services.pushTransactionMessage(*transaction);
1615
 
 
1616
 
  cleanupTransactionMessage(transaction, in_session);
1617
 
}
1618
 
 
1619
 
void TransactionServices::dropTable(Session *in_session,
1620
 
                                    const string &schema_name,
1621
 
                                    const string &table_name,
1622
 
                                    bool if_exists)
1623
 
{
1624
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1625
 
  if (! replication_services.isActive())
1626
 
    return;
1627
 
  
1628
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1629
 
  message::Statement *statement= transaction->add_statement();
1630
 
 
1631
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1632
 
 
1633
 
  /* 
1634
 
   * Construct the specialized DropTableStatement message and attach
1635
 
   * it to the generic Statement message
1636
 
   */
1637
 
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1638
 
 
1639
 
  drop_table_statement->set_if_exists_clause(if_exists);
1640
 
 
1641
 
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1642
 
 
1643
 
  table_metadata->set_schema_name(schema_name);
1644
 
  table_metadata->set_table_name(table_name);
1645
 
 
1646
 
  finalizeStatementMessage(*statement, in_session);
1647
 
 
1648
 
  finalizeTransactionMessage(*transaction, in_session);
1649
 
  
1650
 
  replication_services.pushTransactionMessage(*transaction);
1651
 
 
1652
 
  cleanupTransactionMessage(transaction, in_session);
1653
 
}
1654
 
 
1655
 
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
1656
 
{
1657
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1658
 
  if (! replication_services.isActive())
1659
 
    return;
1660
 
  
1661
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1662
 
  message::Statement *statement= transaction->add_statement();
1663
 
 
1664
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
1665
 
 
1666
 
  /* 
1667
 
   * Construct the specialized TruncateTableStatement message and attach
1668
 
   * it to the generic Statement message
1669
 
   */
1670
 
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
1671
 
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
1672
 
 
1673
 
  string schema_name;
1674
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1675
 
  string table_name;
1676
 
  (void) in_table->getShare()->getTableName(table_name);
1677
 
 
1678
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1679
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1680
 
 
1681
 
  finalizeStatementMessage(*statement, in_session);
1682
 
 
1683
 
  finalizeTransactionMessage(*transaction, in_session);
1684
 
  
1685
 
  replication_services.pushTransactionMessage(*transaction);
1686
 
 
1687
 
  cleanupTransactionMessage(transaction, in_session);
1688
 
}
1689
 
 
1690
 
void TransactionServices::rawStatement(Session *in_session, const string &query)
1691
 
{
1692
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1693
 
  if (! replication_services.isActive())
1694
 
    return;
1695
 
  
1696
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1697
 
  message::Statement *statement= transaction->add_statement();
1698
 
 
1699
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
1700
 
  statement->set_sql(query);
1701
 
  finalizeStatementMessage(*statement, in_session);
1702
 
 
1703
 
  finalizeTransactionMessage(*transaction, in_session);
1704
 
  
1705
 
  replication_services.pushTransactionMessage(*transaction);
1706
 
 
1707
 
  cleanupTransactionMessage(transaction, in_session);
1708
 
}
1709
 
 
1710
 
} /* namespace drizzled */