~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-12-08 22:35:56 UTC
  • mfrom: (1819.9.158 update-innobase)
  • Revision ID: brian@tangent.org-20101208223556-37mi4omqg7lkjzf3
Merge in Stewart's changes, 1.3 changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2008 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
 
6
 *
 
7
 *  This program is free software; you can redistribute it and/or modify
 
8
 *  it under the terms of the GNU General Public License as published by
 
9
 *  the Free Software Foundation; version 2 of the License.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
/**
 
22
 * @file Transaction processing code
 
23
 *
 
24
 * @note
 
25
 *
 
26
 * The TransactionServices component takes internal events (for instance the start of a 
 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
 
29
 * component and used during replication.
 
30
 *
 
31
 * The reason for this functionality is to encapsulate all communication
 
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
33
 * Instead of the plugin having to understand the (often fluidly changing)
 
34
 * mechanics of the kernel, all the plugin needs to understand is the message
 
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
36
 * these messages.
 
37
 *
 
38
 * @see /drizzled/message/transaction.proto
 
39
 *
 
40
 * @todo
 
41
 *
 
42
 * We really should store the raw bytes in the messages, not the
 
43
 * String value of the Field.  But, to do that, the
 
44
 * statement_transform library needs first to be updated
 
45
 * to include the transformation code to convert raw
 
46
 * Drizzle-internal Field byte representation into something
 
47
 * plugins can understand.
 
48
 */
 
49
 
 
50
#include "config.h"
 
51
#include "drizzled/my_hash.h"
 
52
#include "drizzled/error.h"
 
53
#include "drizzled/gettext.h"
 
54
#include "drizzled/probes.h"
 
55
#include "drizzled/sql_parse.h"
 
56
#include "drizzled/session.h"
 
57
#include "drizzled/sql_base.h"
 
58
#include "drizzled/replication_services.h"
 
59
#include "drizzled/transaction_services.h"
 
60
#include "drizzled/transaction_context.h"
 
61
#include "drizzled/message/transaction.pb.h"
 
62
#include "drizzled/message/statement_transform.h"
 
63
#include "drizzled/resource_context.h"
 
64
#include "drizzled/lock.h"
 
65
#include "drizzled/item/int.h"
 
66
#include "drizzled/item/empty_string.h"
 
67
#include "drizzled/field/timestamp.h"
 
68
#include "drizzled/plugin/client.h"
 
69
#include "drizzled/plugin/monitored_in_transaction.h"
 
70
#include "drizzled/plugin/transactional_storage_engine.h"
 
71
#include "drizzled/plugin/xa_resource_manager.h"
 
72
#include "drizzled/plugin/xa_storage_engine.h"
 
73
#include "drizzled/internal/my_sys.h"
 
74
 
 
75
#include <vector>
 
76
#include <algorithm>
 
77
#include <functional>
 
78
#include <google/protobuf/repeated_field.h>
 
79
 
 
80
using namespace std;
 
81
using namespace google;
 
82
 
 
83
namespace drizzled
 
84
{
 
85
 
 
86
/**
 
87
 * @defgroup Transactions
 
88
 *
 
89
 * @brief
 
90
 *
 
91
 * Transaction handling in the server
 
92
 *
 
93
 * @detail
 
94
 *
 
95
 * In each client connection, Drizzle maintains two transaction
 
96
 * contexts representing the state of the:
 
97
 *
 
98
 * 1) Statement Transaction
 
99
 * 2) Normal Transaction
 
100
 *
 
101
 * These two transaction contexts represent the transactional
 
102
 * state of a Session's SQL and XA transactions for a single
 
103
 * SQL statement or a series of SQL statements.
 
104
 *
 
105
 * When the Session's connection is in AUTOCOMMIT mode, there
 
106
 * is no practical difference between the statement and the
 
107
 * normal transaction, as each SQL statement is committed or
 
108
 * rolled back depending on the success or failure of the
 
109
 * indvidual SQL statement.
 
110
 *
 
111
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
 
112
 * the Session has explicitly begun a normal SQL transaction using
 
113
 * a BEGIN WORK/START TRANSACTION statement, then the normal
 
114
 * transaction context tracks the aggregate transaction state of
 
115
 * the SQL transaction's individual statements, and the SQL
 
116
 * transaction's commit or rollback is done atomically for all of
 
117
 * the SQL transaction's statement's data changes.
 
118
 *
 
119
 * Technically, a statement transaction can be viewed as a savepoint 
 
120
 * which is maintained automatically in order to make effects of one
 
121
 * statement atomic.
 
122
 *
 
123
 * The normal transaction is started by the user and is typically
 
124
 * ended (COMMIT or ROLLBACK) upon an explicity user request as well.
 
125
 * The exception to this is that DDL statements implicitly COMMIT
 
126
 * any previously active normal transaction before they begin executing.
 
127
 *
 
128
 * In Drizzle, unlike MySQL, plugins other than a storage engine
 
129
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
 
130
 * plugins will automatically be monitored by Drizzle's transaction 
 
131
 * manager (implemented in this source file), as will all plugins which
 
132
 * implement plugin::XaResourceManager and register with the transaction
 
133
 * manager.
 
134
 *
 
135
 * If Drizzle's transaction manager sees that more than one resource
 
136
 * manager (transactional storage engine or XA resource manager) has modified
 
137
 * data state during a statement or normal transaction, the transaction
 
138
 * manager will automatically use a two-phase commit protocol for all
 
139
 * resources which support XA's distributed transaction protocol.  Unlike
 
140
 * MySQL, storage engines need not manually register with the transaction
 
141
 * manager during a statement's execution.  Previously, in MySQL, all
 
142
 * handlertons would have to call trans_register_ha() at some point after
 
143
 * modifying data state in order to have MySQL include that handler in
 
144
 * an XA transaction.  Drizzle does all of this grunt work behind the
 
145
 * scenes for the storage engine implementers.
 
146
 *
 
147
 * When a connection is closed, the current normal transaction, if
 
148
 * any is currently active, is rolled back.
 
149
 *
 
150
 * Transaction life cycle
 
151
 * ----------------------
 
152
 *
 
153
 * When a new connection is established, session->transaction
 
154
 * members are initialized to an empty state. If a statement uses any tables, 
 
155
 * all affected engines are registered in the statement engine list automatically
 
156
 * in plugin::StorageEngine::startStatement() and 
 
157
 * plugin::TransactionalStorageEngine::startTransaction().
 
158
 *
 
159
 * You can view the lifetime of a normal transaction in the following
 
160
 * call-sequence:
 
161
 *
 
162
 * drizzled::statement::Statement::execute()
 
163
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
 
164
 *     drizzled::TransactionServices::registerResourceForTransaction()
 
165
 *     drizzled::TransactionServices::registerResourceForStatement()
 
166
 *     drizzled::plugin::StorageEngine::startStatement()
 
167
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
 
168
 *     drizzled::plugin::StorageEngine::endStatement()
 
169
 *   drizzled::TransactionServices::autocommitOrRollback()
 
170
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
 
171
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
 
172
 *
 
173
 * Roles and responsibilities
 
174
 * --------------------------
 
175
 *
 
176
 * Beginning of SQL Statement (and Statement Transaction)
 
177
 * ------------------------------------------------------
 
178
 *
 
179
 * At the start of each SQL statement, for each storage engine
 
180
 * <strong>that is involved in the SQL statement</strong>, the kernel 
 
181
 * calls the engine's plugin::StoragEngine::startStatement() method.  If the
 
182
 * engine needs to track some data for the statement, it should use
 
183
 * this method invocation to initialize this data.  This is the
 
184
 * beginning of what is called the "statement transaction".
 
185
 *
 
186
 * <strong>For transaction storage engines (those storage engines
 
187
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
 
188
 * kernel automatically determines if the start of the SQL statement 
 
189
 * transaction should <em>also</em> begin the normal SQL transaction.
 
190
 * This occurs when the connection is in NOT in autocommit mode. If
 
191
 * the kernel detects this, then the kernel automatically starts the
 
192
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
 
193
 * method and then calls plugin::StorageEngine::startStatement()
 
194
 * afterwards.
 
195
 *
 
196
 * Beginning of an SQL "Normal" Transaction
 
197
 * ----------------------------------------
 
198
 *
 
199
 * As noted above, a "normal SQL transaction" may be started when
 
200
 * an SQL statement is started in a connection and the connection is
 
201
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
 
202
 *
 
203
 * In addition, when a user executes a START TRANSACTION or
 
204
 * BEGIN WORK statement in a connection, the kernel explicitly
 
205
 * calls each transactional storage engine's startTransaction() method.
 
206
 *
 
207
 * Ending of an SQL Statement (and Statement Transaction)
 
208
 * ------------------------------------------------------
 
209
 *
 
210
 * At the end of each SQL statement, for each of the aforementioned
 
211
 * involved storage engines, the kernel calls the engine's
 
212
 * plugin::StorageEngine::endStatement() method.  If the engine
 
213
 * has initialized or modified some internal data about the
 
214
 * statement transaction, it should use this method to reset or destroy
 
215
 * this data appropriately.
 
216
 *
 
217
 * Ending of an SQL "Normal" Transaction
 
218
 * -------------------------------------
 
219
 *
 
220
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
 
221
 * depending on the success or failure of the statement transaction(s) 
 
222
 * it encloses.
 
223
 *
 
224
 * The end of a "normal transaction" occurs when any of the following
 
225
 * occurs:
 
226
 *
 
227
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
 
228
 *    then the normal transaction which encloses the statement
 
229
 *    transaction ends
 
230
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
 
231
 * 3) Just before a DDL operation occurs, the kernel will implicitly
 
232
 *    commit the active normal transaction
 
233
 *
 
234
 * Transactions and Non-transactional Storage Engines
 
235
 * --------------------------------------------------
 
236
 *
 
237
 * For non-transactional engines, this call can be safely ignored, an
 
238
 * the kernel tracks whether a non-transactional engine has changed
 
239
 * any data state, and warns the user appropriately if a transaction
 
240
 * (statement or normal) is rolled back after such non-transactional
 
241
 * data changes have been made.
 
242
 *
 
243
 * XA Two-phase Commit Protocol
 
244
 * ----------------------------
 
245
 *
 
246
 * During statement execution, whenever any of data-modifying
 
247
 * PSEA API methods is used, e.g. Cursor::write_row() or
 
248
 * Cursor::update_row(), the read-write flag is raised in the
 
249
 * statement transaction for the involved engine.
 
250
 * Currently All PSEA calls are "traced", and the data can not be
 
251
 * changed in a way other than issuing a PSEA call. Important:
 
252
 * unless this invariant is preserved the server will not know that
 
253
 * a transaction in a given engine is read-write and will not
 
254
 * involve the two-phase commit protocol!
 
255
 *
 
256
 * At the end of a statement, TransactionServices::autocommitOrRollback()
 
257
 * is invoked. This call in turn
 
258
 * invokes plugin::XaResourceManager::xapPepare() for every involved XA
 
259
 * resource manager.
 
260
 *
 
261
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
 
262
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
 
263
 * is...)
 
264
 * 
 
265
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
 
266
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
 
267
 * At statement commit, the statement-related read-write engine
 
268
 * flag is propagated to the corresponding flag in the normal
 
269
 * transaction.  When the commit is complete, the list of registered
 
270
 * engines is cleared.
 
271
 *
 
272
 * Rollback is handled in a similar fashion.
 
273
 *
 
274
 * Additional notes on DDL and the normal transaction.
 
275
 * ---------------------------------------------------
 
276
 *
 
277
 * CREATE TABLE .. SELECT can start a *new* normal transaction
 
278
 * because of the fact that SELECTs on a transactional storage
 
279
 * engine participate in the normal SQL transaction (due to
 
280
 * isolation level issues and consistent read views).
 
281
 *
 
282
 * Behaviour of the server in this case is currently badly
 
283
 * defined.
 
284
 *
 
285
 * DDL statements use a form of "semantic" logging
 
286
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
 
287
 * the newly created table is deleted.
 
288
 * 
 
289
 * In addition, some DDL statements issue interim transaction
 
290
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
 
291
 * from the original table to the internal temporary table. Other
 
292
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
 
293
 * after itself.
 
294
 *
 
295
 * And finally there is a group of DDL statements such as
 
296
 * RENAME/DROP TABLE that doesn't start a new transaction
 
297
 * and doesn't commit.
 
298
 *
 
299
 * A consistent behaviour is perhaps to always commit the normal
 
300
 * transaction after all DDLs, just like the statement transaction
 
301
 * is always committed at the end of all statements.
 
302
 */
 
303
TransactionServices::TransactionServices()
 
304
{
 
305
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
 
306
  if (engine)
 
307
  {
 
308
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
 
309
  }
 
310
  else 
 
311
  {
 
312
    xa_storage_engine= NULL;
 
313
  }
 
314
}
 
315
 
 
316
void TransactionServices::registerResourceForStatement(Session *session,
 
317
                                                       plugin::MonitoredInTransaction *monitored,
 
318
                                                       plugin::TransactionalStorageEngine *engine)
 
319
{
 
320
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
321
  {
 
322
    /* 
 
323
     * Now we automatically register this resource manager for the
 
324
     * normal transaction.  This is fine because a statement
 
325
     * transaction registration should always enlist the resource
 
326
     * in the normal transaction which contains the statement
 
327
     * transaction.
 
328
     */
 
329
    registerResourceForTransaction(session, monitored, engine);
 
330
  }
 
331
 
 
332
  TransactionContext *trans= &session->transaction.stmt;
 
333
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
334
 
 
335
  if (resource_context->isStarted())
 
336
    return; /* already registered, return */
 
337
 
 
338
  assert(monitored->participatesInSqlTransaction());
 
339
  assert(not monitored->participatesInXaTransaction());
 
340
 
 
341
  resource_context->setMonitored(monitored);
 
342
  resource_context->setTransactionalStorageEngine(engine);
 
343
  trans->registerResource(resource_context);
 
344
 
 
345
  trans->no_2pc|= true;
 
346
}
 
347
 
 
348
void TransactionServices::registerResourceForStatement(Session *session,
 
349
                                                       plugin::MonitoredInTransaction *monitored,
 
350
                                                       plugin::TransactionalStorageEngine *engine,
 
351
                                                       plugin::XaResourceManager *resource_manager)
 
352
{
 
353
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
354
  {
 
355
    /* 
 
356
     * Now we automatically register this resource manager for the
 
357
     * normal transaction.  This is fine because a statement
 
358
     * transaction registration should always enlist the resource
 
359
     * in the normal transaction which contains the statement
 
360
     * transaction.
 
361
     */
 
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
 
363
  }
 
364
 
 
365
  TransactionContext *trans= &session->transaction.stmt;
 
366
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
367
 
 
368
  if (resource_context->isStarted())
 
369
    return; /* already registered, return */
 
370
 
 
371
  assert(monitored->participatesInXaTransaction());
 
372
  assert(monitored->participatesInSqlTransaction());
 
373
 
 
374
  resource_context->setMonitored(monitored);
 
375
  resource_context->setTransactionalStorageEngine(engine);
 
376
  resource_context->setXaResourceManager(resource_manager);
 
377
  trans->registerResource(resource_context);
 
378
 
 
379
  trans->no_2pc|= false;
 
380
}
 
381
 
 
382
void TransactionServices::registerResourceForTransaction(Session *session,
 
383
                                                         plugin::MonitoredInTransaction *monitored,
 
384
                                                         plugin::TransactionalStorageEngine *engine)
 
385
{
 
386
  TransactionContext *trans= &session->transaction.all;
 
387
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
388
 
 
389
  if (resource_context->isStarted())
 
390
    return; /* already registered, return */
 
391
 
 
392
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
393
 
 
394
  trans->registerResource(resource_context);
 
395
 
 
396
  assert(monitored->participatesInSqlTransaction());
 
397
  assert(not monitored->participatesInXaTransaction());
 
398
 
 
399
  resource_context->setMonitored(monitored);
 
400
  resource_context->setTransactionalStorageEngine(engine);
 
401
  trans->no_2pc|= true;
 
402
 
 
403
  if (session->transaction.xid_state.xid.is_null())
 
404
    session->transaction.xid_state.xid.set(session->getQueryId());
 
405
 
 
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
 
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
408
    registerResourceForStatement(session, monitored, engine);
 
409
}
 
410
 
 
411
void TransactionServices::registerResourceForTransaction(Session *session,
 
412
                                                         plugin::MonitoredInTransaction *monitored,
 
413
                                                         plugin::TransactionalStorageEngine *engine,
 
414
                                                         plugin::XaResourceManager *resource_manager)
 
415
{
 
416
  TransactionContext *trans= &session->transaction.all;
 
417
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
418
 
 
419
  if (resource_context->isStarted())
 
420
    return; /* already registered, return */
 
421
 
 
422
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
423
 
 
424
  trans->registerResource(resource_context);
 
425
 
 
426
  assert(monitored->participatesInSqlTransaction());
 
427
 
 
428
  resource_context->setMonitored(monitored);
 
429
  resource_context->setXaResourceManager(resource_manager);
 
430
  resource_context->setTransactionalStorageEngine(engine);
 
431
  trans->no_2pc|= true;
 
432
 
 
433
  if (session->transaction.xid_state.xid.is_null())
 
434
    session->transaction.xid_state.xid.set(session->getQueryId());
 
435
 
 
436
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
 
437
 
 
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
 
439
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
 
441
}
 
442
 
 
443
void TransactionServices::allocateNewTransactionId()
 
444
{
 
445
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
446
  if (! replication_services.isActive())
 
447
  {
 
448
    return;
 
449
  }
 
450
 
 
451
  Session *my_session= current_session;
 
452
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
 
453
  my_session->setXaId(xa_id);
 
454
}
 
455
 
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
 
457
{
 
458
  if (session->getXaId() == 0)
 
459
  {
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
 
461
  }
 
462
 
 
463
  return session->getXaId();
 
464
}
 
465
 
 
466
/**
 
467
  @retval
 
468
    0   ok
 
469
  @retval
 
470
    1   transaction was rolled back
 
471
  @retval
 
472
    2   error during commit, data may be inconsistent
 
473
 
 
474
  @todo
 
475
    Since we don't support nested statement transactions in 5.0,
 
476
    we can't commit or rollback stmt transactions while we are inside
 
477
    stored functions or triggers. So we simply do nothing now.
 
478
    TODO: This should be fixed in later ( >= 5.1) releases.
 
479
*/
 
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
 
481
{
 
482
  int error= 0, cookie= 0;
 
483
  /*
 
484
    'all' means that this is either an explicit commit issued by
 
485
    user, or an implicit commit issued by a DDL.
 
486
  */
 
487
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
488
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
489
 
 
490
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
491
 
 
492
  /*
 
493
    We must not commit the normal transaction if a statement
 
494
    transaction is pending. Otherwise statement transaction
 
495
    flags will not get propagated to its normal transaction's
 
496
    counterpart.
 
497
  */
 
498
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
499
              trans == &session->transaction.stmt);
 
500
 
 
501
  if (resource_contexts.empty() == false)
 
502
  {
 
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
 
504
    {
 
505
      rollbackTransaction(session, normal_transaction);
 
506
      return 1;
 
507
    }
 
508
 
 
509
    /*
 
510
     * If replication is on, we do a PREPARE on the resource managers, push the
 
511
     * Transaction message across the replication stream, and then COMMIT if the
 
512
     * replication stream returned successfully.
 
513
     */
 
514
    if (shouldConstructMessages())
 
515
    {
 
516
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
517
           it != resource_contexts.end() && ! error;
 
518
           ++it)
 
519
      {
 
520
        ResourceContext *resource_context= *it;
 
521
        int err;
 
522
        /*
 
523
          Do not call two-phase commit if this particular
 
524
          transaction is read-only. This allows for simpler
 
525
          implementation in engines that are always read-only.
 
526
        */
 
527
        if (! resource_context->hasModifiedData())
 
528
          continue;
 
529
 
 
530
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
 
531
 
 
532
        if (resource->participatesInXaTransaction())
 
533
        {
 
534
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
 
535
          {
 
536
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
 
537
            error= 1;
 
538
          }
 
539
          else
 
540
          {
 
541
            session->status_var.ha_prepare_count++;
 
542
          }
 
543
        }
 
544
      }
 
545
      if (error == 0 && is_real_trans)
 
546
      {
 
547
        /*
 
548
         * Push the constructed Transaction messages across to
 
549
         * replicators and appliers.
 
550
         */
 
551
        error= commitTransactionMessage(session);
 
552
      }
 
553
      if (error)
 
554
      {
 
555
        rollbackTransaction(session, normal_transaction);
 
556
        error= 1;
 
557
        goto end;
 
558
      }
 
559
    }
 
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
 
561
end:
 
562
    if (is_real_trans)
 
563
      session->startWaitingGlobalReadLock();
 
564
  }
 
565
  return error;
 
566
}
 
567
 
 
568
/**
 
569
  @note
 
570
  This function does not care about global read lock. A caller should.
 
571
*/
 
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
 
573
{
 
574
  int error=0;
 
575
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
576
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
577
 
 
578
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
579
 
 
580
  if (resource_contexts.empty() == false)
 
581
  {
 
582
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
583
         it != resource_contexts.end();
 
584
         ++it)
 
585
    {
 
586
      int err;
 
587
      ResourceContext *resource_context= *it;
 
588
 
 
589
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
 
590
 
 
591
      if (resource->participatesInXaTransaction())
 
592
      {
 
593
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
 
594
        {
 
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
 
596
          error= 1;
 
597
        }
 
598
        else if (normal_transaction)
 
599
        {
 
600
          session->status_var.ha_commit_count++;
 
601
        }
 
602
      }
 
603
      else if (resource->participatesInSqlTransaction())
 
604
      {
 
605
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
 
606
        {
 
607
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
 
608
          error= 1;
 
609
        }
 
610
        else if (normal_transaction)
 
611
        {
 
612
          session->status_var.ha_commit_count++;
 
613
        }
 
614
      }
 
615
      resource_context->reset(); /* keep it conveniently zero-filled */
 
616
    }
 
617
 
 
618
    if (is_real_trans)
 
619
      session->transaction.xid_state.xid.null();
 
620
 
 
621
    if (normal_transaction)
 
622
    {
 
623
      session->variables.tx_isolation= session->session_tx_isolation;
 
624
      session->transaction.cleanup();
 
625
    }
 
626
  }
 
627
  trans->reset();
 
628
  return error;
 
629
}
 
630
 
 
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
 
632
{
 
633
  int error= 0;
 
634
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
635
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
636
 
 
637
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
638
 
 
639
  /*
 
640
    We must not rollback the normal transaction if a statement
 
641
    transaction is pending.
 
642
  */
 
643
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
644
              trans == &session->transaction.stmt);
 
645
 
 
646
  if (resource_contexts.empty() == false)
 
647
  {
 
648
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
649
         it != resource_contexts.end();
 
650
         ++it)
 
651
    {
 
652
      int err;
 
653
      ResourceContext *resource_context= *it;
 
654
 
 
655
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
 
656
 
 
657
      if (resource->participatesInXaTransaction())
 
658
      {
 
659
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
 
660
        {
 
661
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
 
662
          error= 1;
 
663
        }
 
664
        else if (normal_transaction)
 
665
        {
 
666
          session->status_var.ha_rollback_count++;
 
667
        }
 
668
      }
 
669
      else if (resource->participatesInSqlTransaction())
 
670
      {
 
671
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
 
672
        {
 
673
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
 
674
          error= 1;
 
675
        }
 
676
        else if (normal_transaction)
 
677
        {
 
678
          session->status_var.ha_rollback_count++;
 
679
        }
 
680
      }
 
681
      resource_context->reset(); /* keep it conveniently zero-filled */
 
682
    }
 
683
    
 
684
    /* 
 
685
     * We need to signal the ROLLBACK to ReplicationServices here
 
686
     * BEFORE we set the transaction ID to NULL.  This is because
 
687
     * if a bulk segment was sent to replicators, we need to send
 
688
     * a rollback statement with the corresponding transaction ID
 
689
     * to rollback.
 
690
     */
 
691
    if (normal_transaction)
 
692
      rollbackTransactionMessage(session);
 
693
 
 
694
    if (is_real_trans)
 
695
      session->transaction.xid_state.xid.null();
 
696
    if (normal_transaction)
 
697
    {
 
698
      session->variables.tx_isolation=session->session_tx_isolation;
 
699
      session->transaction.cleanup();
 
700
    }
 
701
  }
 
702
  if (normal_transaction)
 
703
    session->transaction_rollback_request= false;
 
704
 
 
705
  /*
 
706
   * If a non-transactional table was updated, warn the user
 
707
   */
 
708
  if (is_real_trans &&
 
709
      session->transaction.all.hasModifiedNonTransData() &&
 
710
      session->getKilled() != Session::KILL_CONNECTION)
 
711
  {
 
712
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
713
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
 
714
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
 
715
  }
 
716
  trans->reset();
 
717
  return error;
 
718
}
 
719
 
 
720
/**
 
721
  This is used to commit or rollback a single statement depending on
 
722
  the value of error.
 
723
 
 
724
  @note
 
725
    Note that if the autocommit is on, then the following call inside
 
726
    InnoDB will commit or rollback the whole transaction (= the statement). The
 
727
    autocommit mechanism built into InnoDB is based on counting locks, but if
 
728
    the user has used LOCK TABLES then that mechanism does not know to do the
 
729
    commit.
 
730
*/
 
731
int TransactionServices::autocommitOrRollback(Session *session, int error)
 
732
{
 
733
 
 
734
  if (session->transaction.stmt.getResourceContexts().empty() == false)
 
735
  {
 
736
    TransactionContext *trans = &session->transaction.stmt;
 
737
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
738
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
739
         it != resource_contexts.end();
 
740
         ++it)
 
741
    {
 
742
      ResourceContext *resource_context= *it;
 
743
 
 
744
      resource_context->getTransactionalStorageEngine()->endStatement(session);
 
745
    }
 
746
 
 
747
    if (! error)
 
748
    {
 
749
      if (commitTransaction(session, false))
 
750
        error= 1;
 
751
    }
 
752
    else
 
753
    {
 
754
      (void) rollbackTransaction(session, false);
 
755
      if (session->transaction_rollback_request)
 
756
        (void) rollbackTransaction(session, true);
 
757
    }
 
758
 
 
759
    session->variables.tx_isolation= session->session_tx_isolation;
 
760
  }
 
761
  return error;
 
762
}
 
763
 
 
764
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
 
765
{
 
766
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
 
767
  {
 
768
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
 
769
     * resources aren't the same... */
 
770
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
 
771
  }
 
772
};
 
773
 
 
774
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
 
775
{
 
776
  int error= 0;
 
777
  TransactionContext *trans= &session->transaction.all;
 
778
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
 
779
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
 
780
 
 
781
  trans->no_2pc= false;
 
782
  /*
 
783
    rolling back to savepoint in all storage engines that were part of the
 
784
    transaction when the savepoint was set
 
785
  */
 
786
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
 
787
       it != sv_resource_contexts.end();
 
788
       ++it)
 
789
  {
 
790
    int err;
 
791
    ResourceContext *resource_context= *it;
 
792
 
 
793
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
 
794
 
 
795
    if (resource->participatesInSqlTransaction())
 
796
    {
 
797
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
 
798
      {
 
799
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
 
800
        error= 1;
 
801
      }
 
802
      else
 
803
      {
 
804
        session->status_var.ha_savepoint_rollback_count++;
 
805
      }
 
806
    }
 
807
    trans->no_2pc|= not resource->participatesInXaTransaction();
 
808
  }
 
809
  /*
 
810
    rolling back the transaction in all storage engines that were not part of
 
811
    the transaction when the savepoint was set
 
812
  */
 
813
  {
 
814
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
 
815
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
 
816
    TransactionContext::ResourceContexts set_difference_contexts;
 
817
 
 
818
    /* 
 
819
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
 
820
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
 
821
     * here
 
822
     */
 
823
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
 
824
 
 
825
    sort(sorted_tran_resource_contexts.begin(),
 
826
         sorted_tran_resource_contexts.end(),
 
827
         ResourceContextCompare());
 
828
    sort(sorted_sv_resource_contexts.begin(),
 
829
         sorted_sv_resource_contexts.end(),
 
830
         ResourceContextCompare());
 
831
    set_difference(sorted_tran_resource_contexts.begin(),
 
832
                   sorted_tran_resource_contexts.end(),
 
833
                   sorted_sv_resource_contexts.begin(),
 
834
                   sorted_sv_resource_contexts.end(),
 
835
                   set_difference_contexts.begin(),
 
836
                   ResourceContextCompare());
 
837
    /* 
 
838
     * set_difference_contexts now contains all resource contexts
 
839
     * which are in the transaction context but were NOT in the
 
840
     * savepoint's resource contexts.
 
841
     */
 
842
        
 
843
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
 
844
         it != set_difference_contexts.end();
 
845
         ++it)
 
846
    {
 
847
      ResourceContext *resource_context= *it;
 
848
      int err;
 
849
 
 
850
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
 
851
 
 
852
      if (resource->participatesInSqlTransaction())
 
853
      {
 
854
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
 
855
        {
 
856
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
 
857
          error= 1;
 
858
        }
 
859
        else
 
860
        {
 
861
          session->status_var.ha_rollback_count++;
 
862
        }
 
863
      }
 
864
      resource_context->reset(); /* keep it conveniently zero-filled */
 
865
    }
 
866
  }
 
867
  trans->setResourceContexts(sv_resource_contexts);
 
868
 
 
869
  if (shouldConstructMessages())
 
870
  {
 
871
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
 
872
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
 
873
    if (savepoint_transaction != NULL)
 
874
    {
 
875
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
 
876
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
 
877
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
 
878
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
 
879
      */ 
 
880
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
 
881
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
 
882
      if (num_statements == 0)
 
883
      {    
 
884
        session->setStatementMessage(NULL);
 
885
      }    
 
886
      else 
 
887
      {
 
888
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
889
      }    
 
890
      session->setTransactionMessage(savepoint_transaction_copy);
 
891
    }
 
892
  }
 
893
 
 
894
  return error;
 
895
}
 
896
 
 
897
/**
 
898
  @note
 
899
  according to the sql standard (ISO/IEC 9075-2:2003)
 
900
  section "4.33.4 SQL-statements and transaction states",
 
901
  NamedSavepoint is *not* transaction-initiating SQL-statement
 
902
*/
 
903
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
 
904
{
 
905
  int error= 0;
 
906
  TransactionContext *trans= &session->transaction.all;
 
907
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
908
 
 
909
  if (resource_contexts.empty() == false)
 
910
  {
 
911
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
912
         it != resource_contexts.end();
 
913
         ++it)
 
914
    {
 
915
      ResourceContext *resource_context= *it;
 
916
      int err;
 
917
 
 
918
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
 
919
 
 
920
      if (resource->participatesInSqlTransaction())
 
921
      {
 
922
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
 
923
        {
 
924
          my_error(ER_GET_ERRNO, MYF(0), err);
 
925
          error= 1;
 
926
        }
 
927
        else
 
928
        {
 
929
          session->status_var.ha_savepoint_count++;
 
930
        }
 
931
      }
 
932
    }
 
933
  }
 
934
  /*
 
935
    Remember the list of registered storage engines.
 
936
  */
 
937
  sv.setResourceContexts(resource_contexts);
 
938
 
 
939
  if (shouldConstructMessages())
 
940
  {
 
941
    message::Transaction *transaction= session->getTransactionMessage();
 
942
                  
 
943
    if (transaction != NULL)
 
944
    {
 
945
      message::Transaction *transaction_savepoint= 
 
946
        new message::Transaction(*transaction);
 
947
      sv.setTransactionMessage(transaction_savepoint);
 
948
    }
 
949
  } 
 
950
 
 
951
  return error;
 
952
}
 
953
 
 
954
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
 
955
{
 
956
  int error= 0;
 
957
 
 
958
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
 
959
 
 
960
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
961
       it != resource_contexts.end();
 
962
       ++it)
 
963
  {
 
964
    int err;
 
965
    ResourceContext *resource_context= *it;
 
966
 
 
967
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
 
968
 
 
969
    if (resource->participatesInSqlTransaction())
 
970
    {
 
971
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
 
972
      {
 
973
        my_error(ER_GET_ERRNO, MYF(0), err);
 
974
        error= 1;
 
975
      }
 
976
    }
 
977
  }
 
978
  
 
979
  return error;
 
980
}
 
981
 
 
982
bool TransactionServices::shouldConstructMessages()
 
983
{
 
984
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
985
  return replication_services.isActive();
 
986
}
 
987
 
 
988
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
 
989
{
 
990
  message::Transaction *transaction= in_session->getTransactionMessage();
 
991
 
 
992
  if (unlikely(transaction == NULL))
 
993
  {
 
994
    /* 
 
995
     * Allocate and initialize a new transaction message 
 
996
     * for this Session object.  Session is responsible for
 
997
     * deleting transaction message when done with it.
 
998
     */
 
999
    transaction= new (nothrow) message::Transaction();
 
1000
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
1001
    in_session->setTransactionMessage(transaction);
 
1002
    return transaction;
 
1003
  }
 
1004
  else
 
1005
    return transaction;
 
1006
}
 
1007
 
 
1008
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1009
                                                 Session *in_session,
 
1010
                                                 bool should_inc_trx_id)
 
1011
{
 
1012
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1013
  trx->set_server_id(in_session->getServerId());
 
1014
 
 
1015
  if (should_inc_trx_id)
 
1016
  {
 
1017
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1018
    in_session->setXaId(0);
 
1019
  }  
 
1020
  else
 
1021
  { 
 
1022
    trx->set_transaction_id(0);
 
1023
  }
 
1024
 
 
1025
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1026
}
 
1027
 
 
1028
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1029
                                              Session *in_session)
 
1030
{
 
1031
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1032
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1033
}
 
1034
 
 
1035
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1036
                                             Session *in_session)
 
1037
{
 
1038
  delete in_transaction;
 
1039
  in_session->setStatementMessage(NULL);
 
1040
  in_session->setTransactionMessage(NULL);
 
1041
  in_session->setXaId(0);
 
1042
}
 
1043
 
 
1044
int TransactionServices::commitTransactionMessage(Session *in_session)
 
1045
{
 
1046
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1047
  if (! replication_services.isActive())
 
1048
    return 0;
 
1049
 
 
1050
  /* If there is an active statement message, finalize it */
 
1051
  message::Statement *statement= in_session->getStatementMessage();
 
1052
 
 
1053
  if (statement != NULL)
 
1054
  {
 
1055
    finalizeStatementMessage(*statement, in_session);
 
1056
  }
 
1057
  else
 
1058
    return 0; /* No data modification occurred inside the transaction */
 
1059
  
 
1060
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1061
 
 
1062
  finalizeTransactionMessage(*transaction, in_session);
 
1063
  
 
1064
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
1065
 
 
1066
  cleanupTransactionMessage(transaction, in_session);
 
1067
 
 
1068
  return static_cast<int>(result);
 
1069
}
 
1070
 
 
1071
void TransactionServices::initStatementMessage(message::Statement &statement,
 
1072
                                        message::Statement::Type in_type,
 
1073
                                        Session *in_session)
 
1074
{
 
1075
  statement.set_type(in_type);
 
1076
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1077
 
 
1078
  if (in_session->variables.replicate_query)
 
1079
    statement.set_sql(in_session->getQueryString()->c_str());
 
1080
}
 
1081
 
 
1082
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
 
1083
                                            Session *in_session)
 
1084
{
 
1085
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1086
  in_session->setStatementMessage(NULL);
 
1087
}
 
1088
 
 
1089
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1090
{
 
1091
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1092
  if (! replication_services.isActive())
 
1093
    return;
 
1094
  
 
1095
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1096
 
 
1097
  /*
 
1098
   * OK, so there are two situations that we need to deal with here:
 
1099
   *
 
1100
   * 1) We receive an instruction to ROLLBACK the current transaction
 
1101
   *    and the currently-stored Transaction message is *self-contained*, 
 
1102
   *    meaning that no Statement messages in the Transaction message
 
1103
   *    contain a message having its segment_id member greater than 1.  If
 
1104
   *    no non-segment ID 1 members are found, we can simply clear the
 
1105
   *    current Transaction message and remove it from memory.
 
1106
   *
 
1107
   * 2) If the Transaction message does indeed have a non-end segment, that
 
1108
   *    means that a bulk update/delete/insert Transaction message segment
 
1109
   *    has previously been sent over the wire to replicators.  In this case, 
 
1110
   *    we need to package a Transaction with a Statement message of type
 
1111
   *    ROLLBACK to indicate to replicators that previously-transmitted
 
1112
   *    messages must be un-applied.
 
1113
   */
 
1114
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
 
1115
  {
 
1116
    /* Remember the transaction ID so we can re-use it */
 
1117
    uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1118
 
 
1119
    /*
 
1120
     * Clear the transaction, create a Rollback statement message, 
 
1121
     * attach it to the transaction, and push it to replicators.
 
1122
     */
 
1123
    transaction->Clear();
 
1124
    initTransactionMessage(*transaction, in_session, false);
 
1125
 
 
1126
    /* Set the transaction ID to match the previous messages */
 
1127
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1128
 
 
1129
    message::Statement *statement= transaction->add_statement();
 
1130
 
 
1131
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1132
    finalizeStatementMessage(*statement, in_session);
 
1133
 
 
1134
    finalizeTransactionMessage(*transaction, in_session);
 
1135
    
 
1136
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1137
  }
 
1138
  cleanupTransactionMessage(transaction, in_session);
 
1139
}
 
1140
 
 
1141
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1142
                                                            Table *in_table,
 
1143
                                                            uint32_t *next_segment_id)
 
1144
{
 
1145
  message::Statement *statement= in_session->getStatementMessage();
 
1146
  message::Transaction *transaction= NULL;
 
1147
 
 
1148
  /* 
 
1149
   * Check the type for the current Statement message, if it is anything
 
1150
   * other then INSERT we need to call finalize, this will ensure a 
 
1151
   * new InsertStatement is created. If it is of type INSERT check
 
1152
   * what table the INSERT belongs to, if it is a different table
 
1153
   * call finalize, so a new InsertStatement can be created. 
 
1154
   */
 
1155
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1156
  {
 
1157
    finalizeStatementMessage(*statement, in_session);
 
1158
    statement= in_session->getStatementMessage();
 
1159
  } 
 
1160
  else if (statement != NULL)
 
1161
  {
 
1162
    transaction= getActiveTransactionMessage(in_session);
 
1163
 
 
1164
    /*
 
1165
     * If we've passed our threshold for the statement size (possible for
 
1166
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1167
     * the Transaction will keep it from getting huge).
 
1168
     */
 
1169
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1170
      in_session->variables.transaction_message_threshold)
 
1171
    {
 
1172
      /* Remember the transaction ID so we can re-use it */
 
1173
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1174
 
 
1175
      message::InsertData *current_data= statement->mutable_insert_data();
 
1176
 
 
1177
      /* Caller should use this value when adding a new record */
 
1178
      *next_segment_id= current_data->segment_id() + 1;
 
1179
 
 
1180
      current_data->set_end_segment(false);
 
1181
 
 
1182
      /* 
 
1183
       * Send the trx message to replicators after finalizing the 
 
1184
       * statement and transaction. This will also set the Transaction
 
1185
       * and Statement objects in Session to NULL.
 
1186
       */
 
1187
      commitTransactionMessage(in_session);
 
1188
 
 
1189
      /*
 
1190
       * Statement and Transaction should now be NULL, so new ones will get
 
1191
       * created. We reuse the transaction id since we are segmenting
 
1192
       * one transaction.
 
1193
       */
 
1194
      statement= in_session->getStatementMessage();
 
1195
      transaction= getActiveTransactionMessage(in_session, false);
 
1196
      assert(transaction != NULL);
 
1197
 
 
1198
      /* Set the transaction ID to match the previous messages */
 
1199
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1200
    }
 
1201
    else
 
1202
    {
 
1203
      const message::InsertHeader &insert_header= statement->insert_header();
 
1204
      string old_table_name= insert_header.table_metadata().table_name();
 
1205
     
 
1206
      string current_table_name;
 
1207
      (void) in_table->getShare()->getTableName(current_table_name);
 
1208
 
 
1209
      if (current_table_name.compare(old_table_name))
 
1210
      {
 
1211
        finalizeStatementMessage(*statement, in_session);
 
1212
        statement= in_session->getStatementMessage();
 
1213
      }
 
1214
      else
 
1215
      {
 
1216
        /* append this INSERT query string */
 
1217
        if (in_session->variables.replicate_query)
 
1218
        {
 
1219
          string s(statement->sql());
 
1220
          if (not s.empty())
 
1221
          {
 
1222
            s.append(" ; ");
 
1223
            s.append(in_session->getQueryString()->c_str());
 
1224
            statement->set_sql(s);
 
1225
          }
 
1226
          else
 
1227
            statement->set_sql(in_session->getQueryString()->c_str());
 
1228
        }
 
1229
 
 
1230
        /* carry forward the existing segment id */
 
1231
        const message::InsertData &current_data= statement->insert_data();
 
1232
        *next_segment_id= current_data.segment_id();
 
1233
      }
 
1234
    }
 
1235
  } 
 
1236
 
 
1237
  if (statement == NULL)
 
1238
  {
 
1239
    /*
 
1240
     * Transaction will be non-NULL only if we had to segment it due to
 
1241
     * transaction size above.
 
1242
     */
 
1243
    if (transaction == NULL)
 
1244
      transaction= getActiveTransactionMessage(in_session);
 
1245
 
 
1246
    /* 
 
1247
     * Transaction message initialized and set, but no statement created
 
1248
     * yet.  We construct one and initialize it, here, then return the
 
1249
     * message after attaching the new Statement message pointer to the 
 
1250
     * Session for easy retrieval later...
 
1251
     */
 
1252
    statement= transaction->add_statement();
 
1253
    setInsertHeader(*statement, in_session, in_table);
 
1254
    in_session->setStatementMessage(statement);
 
1255
  }
 
1256
  return *statement;
 
1257
}
 
1258
 
 
1259
void TransactionServices::setInsertHeader(message::Statement &statement,
 
1260
                                          Session *in_session,
 
1261
                                          Table *in_table)
 
1262
{
 
1263
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1264
 
 
1265
  /* 
 
1266
   * Now we construct the specialized InsertHeader message inside
 
1267
   * the generalized message::Statement container...
 
1268
   */
 
1269
  /* Set up the insert header */
 
1270
  message::InsertHeader *header= statement.mutable_insert_header();
 
1271
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1272
 
 
1273
  string schema_name;
 
1274
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1275
  string table_name;
 
1276
  (void) in_table->getShare()->getTableName(table_name);
 
1277
 
 
1278
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1279
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1280
 
 
1281
  Field *current_field;
 
1282
  Field **table_fields= in_table->getFields();
 
1283
 
 
1284
  message::FieldMetadata *field_metadata;
 
1285
 
 
1286
  /* We will read all the table's fields... */
 
1287
  in_table->setReadSet();
 
1288
 
 
1289
  while ((current_field= *table_fields++) != NULL) 
 
1290
  {
 
1291
    field_metadata= header->add_field_metadata();
 
1292
    field_metadata->set_name(current_field->field_name);
 
1293
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1294
  }
 
1295
}
 
1296
 
 
1297
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1298
{
 
1299
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1300
  if (! replication_services.isActive())
 
1301
    return false;
 
1302
  /**
 
1303
   * We do this check here because we don't want to even create a 
 
1304
   * statement if there isn't a primary key on the table...
 
1305
   *
 
1306
   * @todo
 
1307
   *
 
1308
   * Multi-column primary keys are handled how exactly?
 
1309
   */
 
1310
  if (not in_table->getShare()->hasPrimaryKey())
 
1311
  {
 
1312
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
 
1313
    return true;
 
1314
  }
 
1315
 
 
1316
  uint32_t next_segment_id= 1;
 
1317
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
 
1318
 
 
1319
  message::InsertData *data= statement.mutable_insert_data();
 
1320
  data->set_segment_id(next_segment_id);
 
1321
  data->set_end_segment(true);
 
1322
  message::InsertRecord *record= data->add_record();
 
1323
 
 
1324
  Field *current_field;
 
1325
  Field **table_fields= in_table->getFields();
 
1326
 
 
1327
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1328
  string_value->set_charset(system_charset_info);
 
1329
 
 
1330
  /* We will read all the table's fields... */
 
1331
  in_table->setReadSet();
 
1332
 
 
1333
  while ((current_field= *table_fields++) != NULL) 
 
1334
  {
 
1335
    if (current_field->is_null())
 
1336
    {
 
1337
      record->add_is_null(true);
 
1338
      record->add_insert_value("", 0);
 
1339
    } 
 
1340
    else 
 
1341
    {
 
1342
      string_value= current_field->val_str(string_value);
 
1343
      record->add_is_null(false);
 
1344
      record->add_insert_value(string_value->c_ptr(), string_value->length());
 
1345
      string_value->free();
 
1346
    }
 
1347
  }
 
1348
  return false;
 
1349
}
 
1350
 
 
1351
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1352
                                                            Table *in_table,
 
1353
                                                            const unsigned char *old_record, 
 
1354
                                                            const unsigned char *new_record,
 
1355
                                                            uint32_t *next_segment_id)
 
1356
{
 
1357
  message::Statement *statement= in_session->getStatementMessage();
 
1358
  message::Transaction *transaction= NULL;
 
1359
 
 
1360
  /*
 
1361
   * Check the type for the current Statement message, if it is anything
 
1362
   * other then UPDATE we need to call finalize, this will ensure a
 
1363
   * new UpdateStatement is created. If it is of type UPDATE check
 
1364
   * what table the UPDATE belongs to, if it is a different table
 
1365
   * call finalize, so a new UpdateStatement can be created.
 
1366
   */
 
1367
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
 
1368
  {
 
1369
    finalizeStatementMessage(*statement, in_session);
 
1370
    statement= in_session->getStatementMessage();
 
1371
  }
 
1372
  else if (statement != NULL)
 
1373
  {
 
1374
    transaction= getActiveTransactionMessage(in_session);
 
1375
 
 
1376
    /*
 
1377
     * If we've passed our threshold for the statement size (possible for
 
1378
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1379
     * the Transaction will keep it from getting huge).
 
1380
     */
 
1381
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1382
      in_session->variables.transaction_message_threshold)
 
1383
    {
 
1384
      /* Remember the transaction ID so we can re-use it */
 
1385
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1386
 
 
1387
      message::UpdateData *current_data= statement->mutable_update_data();
 
1388
 
 
1389
      /* Caller should use this value when adding a new record */
 
1390
      *next_segment_id= current_data->segment_id() + 1;
 
1391
 
 
1392
      current_data->set_end_segment(false);
 
1393
 
 
1394
      /*
 
1395
       * Send the trx message to replicators after finalizing the 
 
1396
       * statement and transaction. This will also set the Transaction
 
1397
       * and Statement objects in Session to NULL.
 
1398
       */
 
1399
      commitTransactionMessage(in_session);
 
1400
 
 
1401
      /*
 
1402
       * Statement and Transaction should now be NULL, so new ones will get
 
1403
       * created. We reuse the transaction id since we are segmenting
 
1404
       * one transaction.
 
1405
       */
 
1406
      statement= in_session->getStatementMessage();
 
1407
      transaction= getActiveTransactionMessage(in_session, false);
 
1408
      assert(transaction != NULL);
 
1409
 
 
1410
      /* Set the transaction ID to match the previous messages */
 
1411
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1412
    }
 
1413
    else
 
1414
    {
 
1415
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
 
1416
      {
 
1417
        /* append this UPDATE query string */
 
1418
        if (in_session->variables.replicate_query)
 
1419
        {
 
1420
          string s(statement->sql());
 
1421
          if (not s.empty())
 
1422
          {
 
1423
            s.append(" ; ");
 
1424
            s.append(in_session->getQueryString()->c_str());
 
1425
            statement->set_sql(s);
 
1426
          }
 
1427
          else
 
1428
            statement->set_sql(in_session->getQueryString()->c_str());
 
1429
        }
 
1430
 
 
1431
        /* carry forward the existing segment id */
 
1432
        const message::UpdateData &current_data= statement->update_data();
 
1433
        *next_segment_id= current_data.segment_id();
 
1434
      } 
 
1435
      else 
 
1436
      {
 
1437
        finalizeStatementMessage(*statement, in_session);
 
1438
        statement= in_session->getStatementMessage();
 
1439
      }
 
1440
    }
 
1441
  }
 
1442
 
 
1443
  if (statement == NULL)
 
1444
  {
 
1445
    /*
 
1446
     * Transaction will be non-NULL only if we had to segment it due to
 
1447
     * transaction size above.
 
1448
     */
 
1449
    if (transaction == NULL)
 
1450
      transaction= getActiveTransactionMessage(in_session);
 
1451
 
 
1452
    /* 
 
1453
     * Transaction message initialized and set, but no statement created
 
1454
     * yet.  We construct one and initialize it, here, then return the
 
1455
     * message after attaching the new Statement message pointer to the 
 
1456
     * Session for easy retrieval later...
 
1457
     */
 
1458
    statement= transaction->add_statement();
 
1459
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1460
    in_session->setStatementMessage(statement);
 
1461
  }
 
1462
  return *statement;
 
1463
}
 
1464
 
 
1465
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
 
1466
                                                  Table *in_table,
 
1467
                                                  const unsigned char *old_record,
 
1468
                                                  const unsigned char *new_record)
 
1469
{
 
1470
  const message::UpdateHeader &update_header= statement.update_header();
 
1471
  string old_table_name= update_header.table_metadata().table_name();
 
1472
 
 
1473
  string current_table_name;
 
1474
  (void) in_table->getShare()->getTableName(current_table_name);
 
1475
  if (current_table_name.compare(old_table_name))
 
1476
  {
 
1477
    return false;
 
1478
  }
 
1479
  else
 
1480
  {
 
1481
    /* Compare the set fields in the existing UpdateHeader and see if they
 
1482
     * match the updated fields in the new record, if they do not we must
 
1483
     * create a new UpdateHeader 
 
1484
     */
 
1485
    size_t num_set_fields= update_header.set_field_metadata_size();
 
1486
 
 
1487
    Field *current_field;
 
1488
    Field **table_fields= in_table->getFields();
 
1489
    in_table->setReadSet();
 
1490
 
 
1491
    size_t num_calculated_updated_fields= 0;
 
1492
    bool found= false;
 
1493
    while ((current_field= *table_fields++) != NULL)
 
1494
    {
 
1495
      if (num_calculated_updated_fields > num_set_fields)
 
1496
      {
 
1497
        break;
 
1498
      }
 
1499
 
 
1500
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1501
      {
 
1502
        /* check that this field exists in the UpdateHeader record */
 
1503
        found= false;
 
1504
 
 
1505
        for (size_t x= 0; x < num_set_fields; ++x)
 
1506
        {
 
1507
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
 
1508
          string name= field_metadata.name();
 
1509
          if (name.compare(current_field->field_name) == 0)
 
1510
          {
 
1511
            found= true;
 
1512
            ++num_calculated_updated_fields;
 
1513
            break;
 
1514
          } 
 
1515
        }
 
1516
        if (! found)
 
1517
        {
 
1518
          break;
 
1519
        } 
 
1520
      }
 
1521
    }
 
1522
 
 
1523
    if ((num_calculated_updated_fields == num_set_fields) && found)
 
1524
    {
 
1525
      return true;
 
1526
    } 
 
1527
    else 
 
1528
    {
 
1529
      return false;
 
1530
    }
 
1531
  }
 
1532
}  
 
1533
 
 
1534
void TransactionServices::setUpdateHeader(message::Statement &statement,
 
1535
                                          Session *in_session,
 
1536
                                          Table *in_table,
 
1537
                                          const unsigned char *old_record, 
 
1538
                                          const unsigned char *new_record)
 
1539
{
 
1540
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1541
 
 
1542
  /* 
 
1543
   * Now we construct the specialized UpdateHeader message inside
 
1544
   * the generalized message::Statement container...
 
1545
   */
 
1546
  /* Set up the update header */
 
1547
  message::UpdateHeader *header= statement.mutable_update_header();
 
1548
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1549
 
 
1550
  string schema_name;
 
1551
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1552
  string table_name;
 
1553
  (void) in_table->getShare()->getTableName(table_name);
 
1554
 
 
1555
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1556
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1557
 
 
1558
  Field *current_field;
 
1559
  Field **table_fields= in_table->getFields();
 
1560
 
 
1561
  message::FieldMetadata *field_metadata;
 
1562
 
 
1563
  /* We will read all the table's fields... */
 
1564
  in_table->setReadSet();
 
1565
 
 
1566
  while ((current_field= *table_fields++) != NULL) 
 
1567
  {
 
1568
    /*
 
1569
     * We add the "key field metadata" -- i.e. the fields which is
 
1570
     * the primary key for the table.
 
1571
     */
 
1572
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1573
    {
 
1574
      field_metadata= header->add_key_field_metadata();
 
1575
      field_metadata->set_name(current_field->field_name);
 
1576
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1577
    }
 
1578
 
 
1579
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1580
    {
 
1581
      /* Field is changed from old to new */
 
1582
      field_metadata= header->add_set_field_metadata();
 
1583
      field_metadata->set_name(current_field->field_name);
 
1584
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1585
    }
 
1586
  }
 
1587
}
 
1588
void TransactionServices::updateRecord(Session *in_session,
 
1589
                                       Table *in_table, 
 
1590
                                       const unsigned char *old_record, 
 
1591
                                       const unsigned char *new_record)
 
1592
{
 
1593
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1594
  if (! replication_services.isActive())
 
1595
    return;
 
1596
 
 
1597
  uint32_t next_segment_id= 1;
 
1598
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
 
1599
 
 
1600
  message::UpdateData *data= statement.mutable_update_data();
 
1601
  data->set_segment_id(next_segment_id);
 
1602
  data->set_end_segment(true);
 
1603
  message::UpdateRecord *record= data->add_record();
 
1604
 
 
1605
  Field *current_field;
 
1606
  Field **table_fields= in_table->getFields();
 
1607
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1608
  string_value->set_charset(system_charset_info);
 
1609
 
 
1610
  while ((current_field= *table_fields++) != NULL) 
 
1611
  {
 
1612
    /*
 
1613
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
1614
     * but then realized that an UPDATE statement could potentially have different values for
 
1615
     * the SET field.  For instance, imagine this SQL scenario:
 
1616
     *
 
1617
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
1618
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
1619
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
1620
     *
 
1621
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
1622
     */
 
1623
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1624
    {
 
1625
      /* Store the original "read bit" for this field */
 
1626
      bool is_read_set= current_field->isReadSet();
 
1627
 
 
1628
      /* We need to mark that we will "read" this field... */
 
1629
      in_table->setReadSet(current_field->field_index);
 
1630
 
 
1631
      /* Read the string value of this field's contents */
 
1632
      string_value= current_field->val_str(string_value);
 
1633
 
 
1634
      /* 
 
1635
       * Reset the read bit after reading field to its original state.  This 
 
1636
       * prevents the field from being included in the WHERE clause
 
1637
       */
 
1638
      current_field->setReadSet(is_read_set);
 
1639
 
 
1640
      if (current_field->is_null())
 
1641
      {
 
1642
        record->add_is_null(true);
 
1643
        record->add_after_value("", 0);
 
1644
      }
 
1645
      else
 
1646
      {
 
1647
        record->add_is_null(false);
 
1648
        record->add_after_value(string_value->c_ptr(), string_value->length());
 
1649
      }
 
1650
      string_value->free();
 
1651
    }
 
1652
 
 
1653
    /* 
 
1654
     * Add the WHERE clause values now...for now, this means the
 
1655
     * primary key field value.  Replication only supports tables
 
1656
     * with a primary key.
 
1657
     */
 
1658
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1659
    {
 
1660
      /**
 
1661
       * To say the below is ugly is an understatement. But it works.
 
1662
       * 
 
1663
       * @todo Move this crap into a real Record API.
 
1664
       */
 
1665
      string_value= current_field->val_str(string_value,
 
1666
                                           old_record + 
 
1667
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
1668
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1669
      string_value->free();
 
1670
    }
 
1671
 
 
1672
  }
 
1673
}
 
1674
 
 
1675
bool TransactionServices::isFieldUpdated(Field *current_field,
 
1676
                                         Table *in_table,
 
1677
                                         const unsigned char *old_record,
 
1678
                                         const unsigned char *new_record)
 
1679
{
 
1680
  /*
 
1681
   * The below really should be moved into the Field API and Record API.  But for now
 
1682
   * we do this crazy pointer fiddling to figure out if the current field
 
1683
   * has been updated in the supplied record raw byte pointers.
 
1684
   */
 
1685
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1686
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1687
 
 
1688
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1689
 
 
1690
  bool old_value_is_null= current_field->is_null_in_record(old_record);
 
1691
  bool new_value_is_null= current_field->is_null_in_record(new_record);
 
1692
 
 
1693
  bool isUpdated= false;
 
1694
  if (old_value_is_null != new_value_is_null)
 
1695
  {
 
1696
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
 
1697
    {
 
1698
      isUpdated= true;
 
1699
    }
 
1700
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
 
1701
    {
 
1702
      isUpdated= true;
 
1703
    }
 
1704
  }
 
1705
 
 
1706
  if (! isUpdated)
 
1707
  {
 
1708
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1709
    {
 
1710
      isUpdated= true;
 
1711
    }
 
1712
  }
 
1713
  return isUpdated;
 
1714
}  
 
1715
 
 
1716
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1717
                                                            Table *in_table,
 
1718
                                                            uint32_t *next_segment_id)
 
1719
{
 
1720
  message::Statement *statement= in_session->getStatementMessage();
 
1721
  message::Transaction *transaction= NULL;
 
1722
 
 
1723
  /*
 
1724
   * Check the type for the current Statement message, if it is anything
 
1725
   * other then DELETE we need to call finalize, this will ensure a
 
1726
   * new DeleteStatement is created. If it is of type DELETE check
 
1727
   * what table the DELETE belongs to, if it is a different table
 
1728
   * call finalize, so a new DeleteStatement can be created.
 
1729
   */
 
1730
  if (statement != NULL && statement->type() != message::Statement::DELETE)
 
1731
  {
 
1732
    finalizeStatementMessage(*statement, in_session);
 
1733
    statement= in_session->getStatementMessage();
 
1734
  }
 
1735
  else if (statement != NULL)
 
1736
  {
 
1737
    transaction= getActiveTransactionMessage(in_session);
 
1738
 
 
1739
    /*
 
1740
     * If we've passed our threshold for the statement size (possible for
 
1741
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1742
     * the Transaction will keep it from getting huge).
 
1743
     */
 
1744
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1745
      in_session->variables.transaction_message_threshold)
 
1746
    {
 
1747
      /* Remember the transaction ID so we can re-use it */
 
1748
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1749
 
 
1750
      message::DeleteData *current_data= statement->mutable_delete_data();
 
1751
 
 
1752
      /* Caller should use this value when adding a new record */
 
1753
      *next_segment_id= current_data->segment_id() + 1;
 
1754
 
 
1755
      current_data->set_end_segment(false);
 
1756
 
 
1757
      /* 
 
1758
       * Send the trx message to replicators after finalizing the 
 
1759
       * statement and transaction. This will also set the Transaction
 
1760
       * and Statement objects in Session to NULL.
 
1761
       */
 
1762
      commitTransactionMessage(in_session);
 
1763
 
 
1764
      /*
 
1765
       * Statement and Transaction should now be NULL, so new ones will get
 
1766
       * created. We reuse the transaction id since we are segmenting
 
1767
       * one transaction.
 
1768
       */
 
1769
      statement= in_session->getStatementMessage();
 
1770
      transaction= getActiveTransactionMessage(in_session, false);
 
1771
      assert(transaction != NULL);
 
1772
 
 
1773
      /* Set the transaction ID to match the previous messages */
 
1774
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1775
    }
 
1776
    else
 
1777
    {
 
1778
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1779
      string old_table_name= delete_header.table_metadata().table_name();
 
1780
 
 
1781
      string current_table_name;
 
1782
      (void) in_table->getShare()->getTableName(current_table_name);
 
1783
      if (current_table_name.compare(old_table_name))
 
1784
      {
 
1785
        finalizeStatementMessage(*statement, in_session);
 
1786
        statement= in_session->getStatementMessage();
 
1787
      }
 
1788
      else
 
1789
      {
 
1790
        /* append this DELETE query string */
 
1791
        if (in_session->variables.replicate_query)
 
1792
        {
 
1793
          string s(statement->sql());
 
1794
          if (not s.empty())
 
1795
          {
 
1796
            s.append(" ; ");
 
1797
            s.append(in_session->getQueryString()->c_str());
 
1798
            statement->set_sql(s);
 
1799
          }
 
1800
          else
 
1801
            statement->set_sql(in_session->getQueryString()->c_str());
 
1802
        }
 
1803
 
 
1804
        /* carry forward the existing segment id */
 
1805
        const message::DeleteData &current_data= statement->delete_data();
 
1806
        *next_segment_id= current_data.segment_id();
 
1807
      }
 
1808
    }
 
1809
  }
 
1810
 
 
1811
  if (statement == NULL)
 
1812
  {
 
1813
    /*
 
1814
     * Transaction will be non-NULL only if we had to segment it due to
 
1815
     * transaction size above.
 
1816
     */
 
1817
    if (transaction == NULL)
 
1818
      transaction= getActiveTransactionMessage(in_session);
 
1819
 
 
1820
    /* 
 
1821
     * Transaction message initialized and set, but no statement created
 
1822
     * yet.  We construct one and initialize it, here, then return the
 
1823
     * message after attaching the new Statement message pointer to the 
 
1824
     * Session for easy retrieval later...
 
1825
     */
 
1826
    statement= transaction->add_statement();
 
1827
    setDeleteHeader(*statement, in_session, in_table);
 
1828
    in_session->setStatementMessage(statement);
 
1829
  }
 
1830
  return *statement;
 
1831
}
 
1832
 
 
1833
void TransactionServices::setDeleteHeader(message::Statement &statement,
 
1834
                                          Session *in_session,
 
1835
                                          Table *in_table)
 
1836
{
 
1837
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1838
 
 
1839
  /* 
 
1840
   * Now we construct the specialized DeleteHeader message inside
 
1841
   * the generalized message::Statement container...
 
1842
   */
 
1843
  message::DeleteHeader *header= statement.mutable_delete_header();
 
1844
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1845
 
 
1846
  string schema_name;
 
1847
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1848
  string table_name;
 
1849
  (void) in_table->getShare()->getTableName(table_name);
 
1850
 
 
1851
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1852
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1853
 
 
1854
  Field *current_field;
 
1855
  Field **table_fields= in_table->getFields();
 
1856
 
 
1857
  message::FieldMetadata *field_metadata;
 
1858
 
 
1859
  while ((current_field= *table_fields++) != NULL) 
 
1860
  {
 
1861
    /* 
 
1862
     * Add the WHERE clause values now...for now, this means the
 
1863
     * primary key field value.  Replication only supports tables
 
1864
     * with a primary key.
 
1865
     */
 
1866
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1867
    {
 
1868
      field_metadata= header->add_key_field_metadata();
 
1869
      field_metadata->set_name(current_field->field_name);
 
1870
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1871
    }
 
1872
  }
 
1873
}
 
1874
 
 
1875
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
 
1876
{
 
1877
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1878
  if (! replication_services.isActive())
 
1879
    return;
 
1880
 
 
1881
  uint32_t next_segment_id= 1;
 
1882
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
 
1883
 
 
1884
  message::DeleteData *data= statement.mutable_delete_data();
 
1885
  data->set_segment_id(next_segment_id);
 
1886
  data->set_end_segment(true);
 
1887
  message::DeleteRecord *record= data->add_record();
 
1888
 
 
1889
  Field *current_field;
 
1890
  Field **table_fields= in_table->getFields();
 
1891
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1892
  string_value->set_charset(system_charset_info);
 
1893
 
 
1894
  while ((current_field= *table_fields++) != NULL) 
 
1895
  {
 
1896
    /* 
 
1897
     * Add the WHERE clause values now...for now, this means the
 
1898
     * primary key field value.  Replication only supports tables
 
1899
     * with a primary key.
 
1900
     */
 
1901
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1902
    {
 
1903
      if (use_update_record)
 
1904
      {
 
1905
        /*
 
1906
         * Temporarily point to the update record to get its value.
 
1907
         * This is pretty much a hack in order to get the PK value from
 
1908
         * the update record rather than the insert record. Field::val_str()
 
1909
         * should not change anything in Field::ptr, so this should be safe.
 
1910
         * We are careful not to change anything in old_ptr.
 
1911
         */
 
1912
        const unsigned char *old_ptr= current_field->ptr;
 
1913
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1914
        string_value= current_field->val_str(string_value);
 
1915
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
 
1916
      }
 
1917
      else
 
1918
      {
 
1919
        string_value= current_field->val_str(string_value);
 
1920
        /**
 
1921
         * @TODO Store optional old record value in the before data member
 
1922
         */
 
1923
      }
 
1924
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1925
      string_value->free();
 
1926
    }
 
1927
  }
 
1928
}
 
1929
 
 
1930
 
 
1931
/**
 
1932
 * Template for removing Statement records of different types.
 
1933
 *
 
1934
 * The code for removing records from different Statement message types
 
1935
 * is identical except for the class types that are embedded within the
 
1936
 * Statement.
 
1937
 *
 
1938
 * There are 3 scenarios we need to look for:
 
1939
 *   - We've been asked to remove more records than exist in the Statement
 
1940
 *   - We've been asked to remove less records than exist in the Statement
 
1941
 *   - We've been asked to remove ALL records that exist in the Statement
 
1942
 *
 
1943
 * If we are removing ALL records, then effectively we would be left with
 
1944
 * an empty Statement message, so we should just remove it and clean up
 
1945
 * message pointers in the Session object.
 
1946
 */
 
1947
template <class DataType, class RecordType>
 
1948
static bool removeStatementRecordsWithType(Session *session,
 
1949
                                           DataType *data,
 
1950
                                           uint32_t count)
 
1951
{
 
1952
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
 
1953
 
 
1954
  /* If there aren't enough records to remove 'count' of them, error. */
 
1955
  if (num_avail_recs < count)
 
1956
    return false;
 
1957
 
 
1958
  /*
 
1959
   * If we are removing all of the data records, we'll just remove this
 
1960
   * entire Statement message.
 
1961
   */
 
1962
  if (num_avail_recs == count)
 
1963
  {
 
1964
    message::Transaction *transaction= session->getTransactionMessage();
 
1965
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
 
1966
    statements->RemoveLast();
 
1967
 
 
1968
    /*
 
1969
     * Now need to set the Session Statement pointer to either the previous
 
1970
     * Statement, or NULL if there isn't one.
 
1971
     */
 
1972
    if (statements->size() == 0)
 
1973
    {
 
1974
      session->setStatementMessage(NULL);
 
1975
    }
 
1976
    else
 
1977
    {
 
1978
      /*
 
1979
       * There isn't a great way to get a pointer to the previous Statement
 
1980
       * message using the RepeatedPtrField object, so we'll just get to it
 
1981
       * using the Transaction message.
 
1982
       */
 
1983
      int last_stmt_idx= transaction->statement_size() - 1;
 
1984
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
 
1985
    }
 
1986
  }
 
1987
  /* We only need to remove 'count' records */
 
1988
  else if (num_avail_recs > count)
 
1989
  {
 
1990
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
 
1991
    while (count--)
 
1992
      records->RemoveLast();
 
1993
  }
 
1994
 
 
1995
  return true;
 
1996
}
 
1997
 
 
1998
 
 
1999
bool TransactionServices::removeStatementRecords(Session *session,
 
2000
                                                 uint32_t count)
 
2001
{
 
2002
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2003
  if (! replication_services.isActive())
 
2004
    return false;
 
2005
 
 
2006
  /* Get the most current Statement */
 
2007
  message::Statement *statement= session->getStatementMessage();
 
2008
 
 
2009
  /* Make sure we have work to do */
 
2010
  if (statement == NULL)
 
2011
    return false;
 
2012
 
 
2013
  bool retval= false;
 
2014
 
 
2015
  switch (statement->type())
 
2016
  {
 
2017
    case message::Statement::INSERT:
 
2018
    {
 
2019
      message::InsertData *data= statement->mutable_insert_data();
 
2020
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
 
2021
      break;
 
2022
    }
 
2023
 
 
2024
    case message::Statement::UPDATE:
 
2025
    {
 
2026
      message::UpdateData *data= statement->mutable_update_data();
 
2027
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
 
2028
      break;
 
2029
    }
 
2030
 
 
2031
    case message::Statement::DELETE:  /* not sure if this one is possible... */
 
2032
    {
 
2033
      message::DeleteData *data= statement->mutable_delete_data();
 
2034
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
 
2035
      break;
 
2036
    }
 
2037
 
 
2038
    default:
 
2039
      retval= false;
 
2040
      break;
 
2041
  }
 
2042
 
 
2043
  return retval;
 
2044
}
 
2045
 
 
2046
 
 
2047
void TransactionServices::createTable(Session *in_session,
 
2048
                                      const message::Table &table)
 
2049
{
 
2050
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2051
  if (! replication_services.isActive())
 
2052
    return;
 
2053
  
 
2054
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2055
  message::Statement *statement= transaction->add_statement();
 
2056
 
 
2057
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
2058
 
 
2059
  /* 
 
2060
   * Construct the specialized CreateTableStatement message and attach
 
2061
   * it to the generic Statement message
 
2062
   */
 
2063
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
 
2064
  message::Table *new_table_message= create_table_statement->mutable_table();
 
2065
  *new_table_message= table;
 
2066
 
 
2067
  finalizeStatementMessage(*statement, in_session);
 
2068
 
 
2069
  finalizeTransactionMessage(*transaction, in_session);
 
2070
  
 
2071
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2072
 
 
2073
  cleanupTransactionMessage(transaction, in_session);
 
2074
 
 
2075
}
 
2076
 
 
2077
void TransactionServices::createSchema(Session *in_session,
 
2078
                                       const message::Schema &schema)
 
2079
{
 
2080
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2081
  if (! replication_services.isActive())
 
2082
    return;
 
2083
  
 
2084
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2085
  message::Statement *statement= transaction->add_statement();
 
2086
 
 
2087
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
2088
 
 
2089
  /* 
 
2090
   * Construct the specialized CreateSchemaStatement message and attach
 
2091
   * it to the generic Statement message
 
2092
   */
 
2093
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
 
2094
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
 
2095
  *new_schema_message= schema;
 
2096
 
 
2097
  finalizeStatementMessage(*statement, in_session);
 
2098
 
 
2099
  finalizeTransactionMessage(*transaction, in_session);
 
2100
  
 
2101
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2102
 
 
2103
  cleanupTransactionMessage(transaction, in_session);
 
2104
 
 
2105
}
 
2106
 
 
2107
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
 
2108
{
 
2109
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2110
  if (! replication_services.isActive())
 
2111
    return;
 
2112
  
 
2113
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2114
  message::Statement *statement= transaction->add_statement();
 
2115
 
 
2116
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
2117
 
 
2118
  /* 
 
2119
   * Construct the specialized DropSchemaStatement message and attach
 
2120
   * it to the generic Statement message
 
2121
   */
 
2122
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
 
2123
 
 
2124
  drop_schema_statement->set_schema_name(schema_name);
 
2125
 
 
2126
  finalizeStatementMessage(*statement, in_session);
 
2127
 
 
2128
  finalizeTransactionMessage(*transaction, in_session);
 
2129
  
 
2130
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2131
 
 
2132
  cleanupTransactionMessage(transaction, in_session);
 
2133
}
 
2134
 
 
2135
void TransactionServices::dropTable(Session *in_session,
 
2136
                                    const string &schema_name,
 
2137
                                    const string &table_name,
 
2138
                                    bool if_exists)
 
2139
{
 
2140
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2141
  if (! replication_services.isActive())
 
2142
    return;
 
2143
  
 
2144
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2145
  message::Statement *statement= transaction->add_statement();
 
2146
 
 
2147
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
2148
 
 
2149
  /* 
 
2150
   * Construct the specialized DropTableStatement message and attach
 
2151
   * it to the generic Statement message
 
2152
   */
 
2153
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
 
2154
 
 
2155
  drop_table_statement->set_if_exists_clause(if_exists);
 
2156
 
 
2157
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
 
2158
 
 
2159
  table_metadata->set_schema_name(schema_name);
 
2160
  table_metadata->set_table_name(table_name);
 
2161
 
 
2162
  finalizeStatementMessage(*statement, in_session);
 
2163
 
 
2164
  finalizeTransactionMessage(*transaction, in_session);
 
2165
  
 
2166
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2167
 
 
2168
  cleanupTransactionMessage(transaction, in_session);
 
2169
}
 
2170
 
 
2171
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
2172
{
 
2173
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2174
  if (! replication_services.isActive())
 
2175
    return;
 
2176
  
 
2177
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2178
  message::Statement *statement= transaction->add_statement();
 
2179
 
 
2180
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
2181
 
 
2182
  /* 
 
2183
   * Construct the specialized TruncateTableStatement message and attach
 
2184
   * it to the generic Statement message
 
2185
   */
 
2186
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
 
2187
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
 
2188
 
 
2189
  string schema_name;
 
2190
  (void) in_table->getShare()->getSchemaName(schema_name);
 
2191
  string table_name;
 
2192
  (void) in_table->getShare()->getTableName(table_name);
 
2193
 
 
2194
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
2195
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
2196
 
 
2197
  finalizeStatementMessage(*statement, in_session);
 
2198
 
 
2199
  finalizeTransactionMessage(*transaction, in_session);
 
2200
  
 
2201
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2202
 
 
2203
  cleanupTransactionMessage(transaction, in_session);
 
2204
}
 
2205
 
 
2206
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
2207
{
 
2208
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2209
  if (! replication_services.isActive())
 
2210
    return;
 
2211
 
 
2212
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2213
  message::Statement *statement= transaction->add_statement();
 
2214
 
 
2215
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
2216
  statement->set_sql(query);
 
2217
  finalizeStatementMessage(*statement, in_session);
 
2218
 
 
2219
  finalizeTransactionMessage(*transaction, in_session);
 
2220
  
 
2221
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2222
 
 
2223
  cleanupTransactionMessage(transaction, in_session);
 
2224
}
 
2225
 
 
2226
int TransactionServices::sendEvent(Session *session, const message::Event &event)
 
2227
{
 
2228
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2229
  if (! replication_services.isActive())
 
2230
    return 0;
 
2231
 
 
2232
  message::Transaction *transaction= new (nothrow) message::Transaction();
 
2233
 
 
2234
  // set server id, start timestamp
 
2235
  initTransactionMessage(*transaction, session, true);
 
2236
 
 
2237
  // set end timestamp
 
2238
  finalizeTransactionMessage(*transaction, session);
 
2239
 
 
2240
  message::Event *trx_event= transaction->mutable_event();
 
2241
 
 
2242
  trx_event->CopyFrom(event);
 
2243
 
 
2244
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
 
2245
 
 
2246
  delete transaction;
 
2247
 
 
2248
  return static_cast<int>(result);
 
2249
}
 
2250
 
 
2251
bool TransactionServices::sendStartupEvent(Session *session)
 
2252
{
 
2253
  message::Event event;
 
2254
  event.set_type(message::Event::STARTUP);
 
2255
  if (sendEvent(session, event) != 0)
 
2256
    return false;
 
2257
  return true;
 
2258
}
 
2259
 
 
2260
bool TransactionServices::sendShutdownEvent(Session *session)
 
2261
{
 
2262
  message::Event event;
 
2263
  event.set_type(message::Event::SHUTDOWN);
 
2264
  if (sendEvent(session, event) != 0)
 
2265
    return false;
 
2266
  return true;
 
2267
}
 
2268
 
 
2269
} /* namespace drizzled */