~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Monty Taylor
  • Date: 2008-08-02 00:06:32 UTC
  • mto: (236.1.42 codestyle)
  • mto: This revision was merged to the branch mainline in revision 261.
  • Revision ID: monty@inaugust.com-20080802000632-jsse0zdd9r6ic5ku
Actually turn gettext on...

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  This program is free software; you can redistribute it and/or modify
8
 
 *  it under the terms of the GNU General Public License as published by
9
 
 *  the Free Software Foundation; version 2 of the License.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
/**
22
 
 * @file Transaction processing code
23
 
 *
24
 
 * @note
25
 
 *
26
 
 * The TransactionServices component takes internal events (for instance the start of a 
27
 
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 
 * component and used during replication.
30
 
 *
31
 
 * The reason for this functionality is to encapsulate all communication
32
 
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 
 * Instead of the plugin having to understand the (often fluidly changing)
34
 
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 
 * these messages.
37
 
 *
38
 
 * @see /drizzled/message/transaction.proto
39
 
 *
40
 
 * @todo
41
 
 *
42
 
 * We really should store the raw bytes in the messages, not the
43
 
 * String value of the Field.  But, to do that, the
44
 
 * statement_transform library needs first to be updated
45
 
 * to include the transformation code to convert raw
46
 
 * Drizzle-internal Field byte representation into something
47
 
 * plugins can understand.
48
 
 */
49
 
 
50
 
#include "config.h"
51
 
#include "drizzled/my_hash.h"
52
 
#include "drizzled/error.h"
53
 
#include "drizzled/gettext.h"
54
 
#include "drizzled/probes.h"
55
 
#include "drizzled/sql_parse.h"
56
 
#include "drizzled/session.h"
57
 
#include "drizzled/sql_base.h"
58
 
#include "drizzled/replication_services.h"
59
 
#include "drizzled/transaction_services.h"
60
 
#include "drizzled/transaction_context.h"
61
 
#include "drizzled/message/transaction.pb.h"
62
 
#include "drizzled/message/statement_transform.h"
63
 
#include "drizzled/resource_context.h"
64
 
#include "drizzled/lock.h"
65
 
#include "drizzled/item/int.h"
66
 
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/timestamp.h"
68
 
#include "drizzled/plugin/client.h"
69
 
#include "drizzled/plugin/monitored_in_transaction.h"
70
 
#include "drizzled/plugin/transactional_storage_engine.h"
71
 
#include "drizzled/plugin/xa_resource_manager.h"
72
 
#include "drizzled/internal/my_sys.h"
73
 
 
74
 
using namespace std;
75
 
 
76
 
#include <vector>
77
 
#include <algorithm>
78
 
#include <functional>
79
 
 
80
 
namespace drizzled
81
 
{
82
 
 
83
 
/** @TODO: Make this a system variable */
84
 
static const size_t trx_msg_threshold= 1024 * 1024;
85
 
 
86
 
/**
87
 
 * @defgroup Transactions
88
 
 *
89
 
 * @brief
90
 
 *
91
 
 * Transaction handling in the server
92
 
 *
93
 
 * @detail
94
 
 *
95
 
 * In each client connection, Drizzle maintains two transaction
96
 
 * contexts representing the state of the:
97
 
 *
98
 
 * 1) Statement Transaction
99
 
 * 2) Normal Transaction
100
 
 *
101
 
 * These two transaction contexts represent the transactional
102
 
 * state of a Session's SQL and XA transactions for a single
103
 
 * SQL statement or a series of SQL statements.
104
 
 *
105
 
 * When the Session's connection is in AUTOCOMMIT mode, there
106
 
 * is no practical difference between the statement and the
107
 
 * normal transaction, as each SQL statement is committed or
108
 
 * rolled back depending on the success or failure of the
109
 
 * indvidual SQL statement.
110
 
 *
111
 
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
 
 * the Session has explicitly begun a normal SQL transaction using
113
 
 * a BEGIN WORK/START TRANSACTION statement, then the normal
114
 
 * transaction context tracks the aggregate transaction state of
115
 
 * the SQL transaction's individual statements, and the SQL
116
 
 * transaction's commit or rollback is done atomically for all of
117
 
 * the SQL transaction's statement's data changes.
118
 
 *
119
 
 * Technically, a statement transaction can be viewed as a savepoint 
120
 
 * which is maintained automatically in order to make effects of one
121
 
 * statement atomic.
122
 
 *
123
 
 * The normal transaction is started by the user and is typically
124
 
 * ended (COMMIT or ROLLBACK) upon an explicity user request as well.
125
 
 * The exception to this is that DDL statements implicitly COMMIT
126
 
 * any previously active normal transaction before they begin executing.
127
 
 *
128
 
 * In Drizzle, unlike MySQL, plugins other than a storage engine
129
 
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
130
 
 * plugins will automatically be monitored by Drizzle's transaction 
131
 
 * manager (implemented in this source file), as will all plugins which
132
 
 * implement plugin::XaResourceManager and register with the transaction
133
 
 * manager.
134
 
 *
135
 
 * If Drizzle's transaction manager sees that more than one resource
136
 
 * manager (transactional storage engine or XA resource manager) has modified
137
 
 * data state during a statement or normal transaction, the transaction
138
 
 * manager will automatically use a two-phase commit protocol for all
139
 
 * resources which support XA's distributed transaction protocol.  Unlike
140
 
 * MySQL, storage engines need not manually register with the transaction
141
 
 * manager during a statement's execution.  Previously, in MySQL, all
142
 
 * handlertons would have to call trans_register_ha() at some point after
143
 
 * modifying data state in order to have MySQL include that handler in
144
 
 * an XA transaction.  Drizzle does all of this grunt work behind the
145
 
 * scenes for the storage engine implementers.
146
 
 *
147
 
 * When a connection is closed, the current normal transaction, if
148
 
 * any is currently active, is rolled back.
149
 
 *
150
 
 * Transaction life cycle
151
 
 * ----------------------
152
 
 *
153
 
 * When a new connection is established, session->transaction
154
 
 * members are initialized to an empty state. If a statement uses any tables, 
155
 
 * all affected engines are registered in the statement engine list automatically
156
 
 * in plugin::StorageEngine::startStatement() and 
157
 
 * plugin::TransactionalStorageEngine::startTransaction().
158
 
 *
159
 
 * You can view the lifetime of a normal transaction in the following
160
 
 * call-sequence:
161
 
 *
162
 
 * drizzled::statement::Statement::execute()
163
 
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
 
 *     drizzled::TransactionServices::registerResourceForTransaction()
165
 
 *     drizzled::TransactionServices::registerResourceForStatement()
166
 
 *     drizzled::plugin::StorageEngine::startStatement()
167
 
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
 
 *     drizzled::plugin::StorageEngine::endStatement()
169
 
 *   drizzled::TransactionServices::autocommitOrRollback()
170
 
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
 
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
172
 
 *
173
 
 * Roles and responsibilities
174
 
 * --------------------------
175
 
 *
176
 
 * Beginning of SQL Statement (and Statement Transaction)
177
 
 * ------------------------------------------------------
178
 
 *
179
 
 * At the start of each SQL statement, for each storage engine
180
 
 * <strong>that is involved in the SQL statement</strong>, the kernel 
181
 
 * calls the engine's plugin::StoragEngine::startStatement() method.  If the
182
 
 * engine needs to track some data for the statement, it should use
183
 
 * this method invocation to initialize this data.  This is the
184
 
 * beginning of what is called the "statement transaction".
185
 
 *
186
 
 * <strong>For transaction storage engines (those storage engines
187
 
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
 
 * kernel automatically determines if the start of the SQL statement 
189
 
 * transaction should <em>also</em> begin the normal SQL transaction.
190
 
 * This occurs when the connection is in NOT in autocommit mode. If
191
 
 * the kernel detects this, then the kernel automatically starts the
192
 
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
 
 * method and then calls plugin::StorageEngine::startStatement()
194
 
 * afterwards.
195
 
 *
196
 
 * Beginning of an SQL "Normal" Transaction
197
 
 * ----------------------------------------
198
 
 *
199
 
 * As noted above, a "normal SQL transaction" may be started when
200
 
 * an SQL statement is started in a connection and the connection is
201
 
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
202
 
 *
203
 
 * In addition, when a user executes a START TRANSACTION or
204
 
 * BEGIN WORK statement in a connection, the kernel explicitly
205
 
 * calls each transactional storage engine's startTransaction() method.
206
 
 *
207
 
 * Ending of an SQL Statement (and Statement Transaction)
208
 
 * ------------------------------------------------------
209
 
 *
210
 
 * At the end of each SQL statement, for each of the aforementioned
211
 
 * involved storage engines, the kernel calls the engine's
212
 
 * plugin::StorageEngine::endStatement() method.  If the engine
213
 
 * has initialized or modified some internal data about the
214
 
 * statement transaction, it should use this method to reset or destroy
215
 
 * this data appropriately.
216
 
 *
217
 
 * Ending of an SQL "Normal" Transaction
218
 
 * -------------------------------------
219
 
 *
220
 
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
221
 
 * depending on the success or failure of the statement transaction(s) 
222
 
 * it encloses.
223
 
 *
224
 
 * The end of a "normal transaction" occurs when any of the following
225
 
 * occurs:
226
 
 *
227
 
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
 
 *    then the normal transaction which encloses the statement
229
 
 *    transaction ends
230
 
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
 
 * 3) Just before a DDL operation occurs, the kernel will implicitly
232
 
 *    commit the active normal transaction
233
 
 *
234
 
 * Transactions and Non-transactional Storage Engines
235
 
 * --------------------------------------------------
236
 
 *
237
 
 * For non-transactional engines, this call can be safely ignored, an
238
 
 * the kernel tracks whether a non-transactional engine has changed
239
 
 * any data state, and warns the user appropriately if a transaction
240
 
 * (statement or normal) is rolled back after such non-transactional
241
 
 * data changes have been made.
242
 
 *
243
 
 * XA Two-phase Commit Protocol
244
 
 * ----------------------------
245
 
 *
246
 
 * During statement execution, whenever any of data-modifying
247
 
 * PSEA API methods is used, e.g. Cursor::write_row() or
248
 
 * Cursor::update_row(), the read-write flag is raised in the
249
 
 * statement transaction for the involved engine.
250
 
 * Currently All PSEA calls are "traced", and the data can not be
251
 
 * changed in a way other than issuing a PSEA call. Important:
252
 
 * unless this invariant is preserved the server will not know that
253
 
 * a transaction in a given engine is read-write and will not
254
 
 * involve the two-phase commit protocol!
255
 
 *
256
 
 * At the end of a statement, TransactionServices::autocommitOrRollback()
257
 
 * is invoked. This call in turn
258
 
 * invokes plugin::XaResourceManager::xapPepare() for every involved XA
259
 
 * resource manager.
260
 
 *
261
 
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
 
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
263
 
 * is...)
264
 
 * 
265
 
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
 
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
 
 * At statement commit, the statement-related read-write engine
268
 
 * flag is propagated to the corresponding flag in the normal
269
 
 * transaction.  When the commit is complete, the list of registered
270
 
 * engines is cleared.
271
 
 *
272
 
 * Rollback is handled in a similar fashion.
273
 
 *
274
 
 * Additional notes on DDL and the normal transaction.
275
 
 * ---------------------------------------------------
276
 
 *
277
 
 * CREATE TABLE .. SELECT can start a *new* normal transaction
278
 
 * because of the fact that SELECTs on a transactional storage
279
 
 * engine participate in the normal SQL transaction (due to
280
 
 * isolation level issues and consistent read views).
281
 
 *
282
 
 * Behaviour of the server in this case is currently badly
283
 
 * defined.
284
 
 *
285
 
 * DDL statements use a form of "semantic" logging
286
 
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
 
 * the newly created table is deleted.
288
 
 * 
289
 
 * In addition, some DDL statements issue interim transaction
290
 
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
 
 * from the original table to the internal temporary table. Other
292
 
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
293
 
 * after itself.
294
 
 *
295
 
 * And finally there is a group of DDL statements such as
296
 
 * RENAME/DROP TABLE that doesn't start a new transaction
297
 
 * and doesn't commit.
298
 
 *
299
 
 * A consistent behaviour is perhaps to always commit the normal
300
 
 * transaction after all DDLs, just like the statement transaction
301
 
 * is always committed at the end of all statements.
302
 
 */
303
 
void TransactionServices::registerResourceForStatement(Session *session,
304
 
                                                       plugin::MonitoredInTransaction *monitored,
305
 
                                                       plugin::TransactionalStorageEngine *engine)
306
 
{
307
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
308
 
  {
309
 
    /* 
310
 
     * Now we automatically register this resource manager for the
311
 
     * normal transaction.  This is fine because a statement
312
 
     * transaction registration should always enlist the resource
313
 
     * in the normal transaction which contains the statement
314
 
     * transaction.
315
 
     */
316
 
    registerResourceForTransaction(session, monitored, engine);
317
 
  }
318
 
 
319
 
  TransactionContext *trans= &session->transaction.stmt;
320
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
321
 
 
322
 
  if (resource_context->isStarted())
323
 
    return; /* already registered, return */
324
 
 
325
 
  assert(monitored->participatesInSqlTransaction());
326
 
  assert(not monitored->participatesInXaTransaction());
327
 
 
328
 
  resource_context->setMonitored(monitored);
329
 
  resource_context->setTransactionalStorageEngine(engine);
330
 
  trans->registerResource(resource_context);
331
 
 
332
 
  trans->no_2pc|= true;
333
 
}
334
 
 
335
 
void TransactionServices::registerResourceForStatement(Session *session,
336
 
                                                       plugin::MonitoredInTransaction *monitored,
337
 
                                                       plugin::TransactionalStorageEngine *engine,
338
 
                                                       plugin::XaResourceManager *resource_manager)
339
 
{
340
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
341
 
  {
342
 
    /* 
343
 
     * Now we automatically register this resource manager for the
344
 
     * normal transaction.  This is fine because a statement
345
 
     * transaction registration should always enlist the resource
346
 
     * in the normal transaction which contains the statement
347
 
     * transaction.
348
 
     */
349
 
    registerResourceForTransaction(session, monitored, engine, resource_manager);
350
 
  }
351
 
 
352
 
  TransactionContext *trans= &session->transaction.stmt;
353
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
354
 
 
355
 
  if (resource_context->isStarted())
356
 
    return; /* already registered, return */
357
 
 
358
 
  assert(monitored->participatesInXaTransaction());
359
 
  assert(monitored->participatesInSqlTransaction());
360
 
 
361
 
  resource_context->setMonitored(monitored);
362
 
  resource_context->setTransactionalStorageEngine(engine);
363
 
  resource_context->setXaResourceManager(resource_manager);
364
 
  trans->registerResource(resource_context);
365
 
 
366
 
  trans->no_2pc|= false;
367
 
}
368
 
 
369
 
void TransactionServices::registerResourceForTransaction(Session *session,
370
 
                                                         plugin::MonitoredInTransaction *monitored,
371
 
                                                         plugin::TransactionalStorageEngine *engine)
372
 
{
373
 
  TransactionContext *trans= &session->transaction.all;
374
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
375
 
 
376
 
  if (resource_context->isStarted())
377
 
    return; /* already registered, return */
378
 
 
379
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
380
 
 
381
 
  trans->registerResource(resource_context);
382
 
 
383
 
  assert(monitored->participatesInSqlTransaction());
384
 
  assert(not monitored->participatesInXaTransaction());
385
 
 
386
 
  resource_context->setMonitored(monitored);
387
 
  resource_context->setTransactionalStorageEngine(engine);
388
 
  trans->no_2pc|= true;
389
 
 
390
 
  if (session->transaction.xid_state.xid.is_null())
391
 
    session->transaction.xid_state.xid.set(session->getQueryId());
392
 
 
393
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
394
 
 
395
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
396
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
397
 
    registerResourceForStatement(session, monitored, engine);
398
 
}
399
 
 
400
 
void TransactionServices::registerResourceForTransaction(Session *session,
401
 
                                                         plugin::MonitoredInTransaction *monitored,
402
 
                                                         plugin::TransactionalStorageEngine *engine,
403
 
                                                         plugin::XaResourceManager *resource_manager)
404
 
{
405
 
  TransactionContext *trans= &session->transaction.all;
406
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
407
 
 
408
 
  if (resource_context->isStarted())
409
 
    return; /* already registered, return */
410
 
 
411
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
412
 
 
413
 
  trans->registerResource(resource_context);
414
 
 
415
 
  assert(monitored->participatesInSqlTransaction());
416
 
 
417
 
  resource_context->setMonitored(monitored);
418
 
  resource_context->setXaResourceManager(resource_manager);
419
 
  resource_context->setTransactionalStorageEngine(engine);
420
 
  trans->no_2pc|= true;
421
 
 
422
 
  if (session->transaction.xid_state.xid.is_null())
423
 
    session->transaction.xid_state.xid.set(session->getQueryId());
424
 
 
425
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
426
 
 
427
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
428
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
429
 
    registerResourceForStatement(session, monitored, engine, resource_manager);
430
 
}
431
 
 
432
 
/**
433
 
  @retval
434
 
    0   ok
435
 
  @retval
436
 
    1   transaction was rolled back
437
 
  @retval
438
 
    2   error during commit, data may be inconsistent
439
 
 
440
 
  @todo
441
 
    Since we don't support nested statement transactions in 5.0,
442
 
    we can't commit or rollback stmt transactions while we are inside
443
 
    stored functions or triggers. So we simply do nothing now.
444
 
    TODO: This should be fixed in later ( >= 5.1) releases.
445
 
*/
446
 
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
447
 
{
448
 
  int error= 0, cookie= 0;
449
 
  /*
450
 
    'all' means that this is either an explicit commit issued by
451
 
    user, or an implicit commit issued by a DDL.
452
 
  */
453
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
454
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
455
 
 
456
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
457
 
 
458
 
  /*
459
 
    We must not commit the normal transaction if a statement
460
 
    transaction is pending. Otherwise statement transaction
461
 
    flags will not get propagated to its normal transaction's
462
 
    counterpart.
463
 
  */
464
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
465
 
              trans == &session->transaction.stmt);
466
 
 
467
 
  if (resource_contexts.empty() == false)
468
 
  {
469
 
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
470
 
    {
471
 
      rollbackTransaction(session, normal_transaction);
472
 
      return 1;
473
 
    }
474
 
 
475
 
    /*
476
 
     * If replication is on, we do a PREPARE on the resource managers, push the
477
 
     * Transaction message across the replication stream, and then COMMIT if the
478
 
     * replication stream returned successfully.
479
 
     */
480
 
    if (shouldConstructMessages())
481
 
    {
482
 
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
483
 
           it != resource_contexts.end() && ! error;
484
 
           ++it)
485
 
      {
486
 
        ResourceContext *resource_context= *it;
487
 
        int err;
488
 
        /*
489
 
          Do not call two-phase commit if this particular
490
 
          transaction is read-only. This allows for simpler
491
 
          implementation in engines that are always read-only.
492
 
        */
493
 
        if (! resource_context->hasModifiedData())
494
 
          continue;
495
 
 
496
 
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
497
 
 
498
 
        if (resource->participatesInXaTransaction())
499
 
        {
500
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
501
 
          {
502
 
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
503
 
            error= 1;
504
 
          }
505
 
          else
506
 
          {
507
 
            session->status_var.ha_prepare_count++;
508
 
          }
509
 
        }
510
 
      }
511
 
      if (error == 0 && is_real_trans)
512
 
      {
513
 
        /*
514
 
         * Push the constructed Transaction messages across to
515
 
         * replicators and appliers.
516
 
         */
517
 
        error= commitTransactionMessage(session);
518
 
      }
519
 
      if (error)
520
 
      {
521
 
        rollbackTransaction(session, normal_transaction);
522
 
        error= 1;
523
 
        goto end;
524
 
      }
525
 
    }
526
 
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
527
 
end:
528
 
    if (is_real_trans)
529
 
      start_waiting_global_read_lock(session);
530
 
  }
531
 
  return error;
532
 
}
533
 
 
534
 
/**
535
 
  @note
536
 
  This function does not care about global read lock. A caller should.
537
 
*/
538
 
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
539
 
{
540
 
  int error=0;
541
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
542
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
543
 
 
544
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
545
 
 
546
 
  if (resource_contexts.empty() == false)
547
 
  {
548
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
549
 
         it != resource_contexts.end();
550
 
         ++it)
551
 
    {
552
 
      int err;
553
 
      ResourceContext *resource_context= *it;
554
 
 
555
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
556
 
 
557
 
      if (resource->participatesInXaTransaction())
558
 
      {
559
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
560
 
        {
561
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
562
 
          error= 1;
563
 
        }
564
 
        else if (normal_transaction)
565
 
        {
566
 
          session->status_var.ha_commit_count++;
567
 
        }
568
 
      }
569
 
      else if (resource->participatesInSqlTransaction())
570
 
      {
571
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
572
 
        {
573
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
574
 
          error= 1;
575
 
        }
576
 
        else if (normal_transaction)
577
 
        {
578
 
          session->status_var.ha_commit_count++;
579
 
        }
580
 
      }
581
 
      resource_context->reset(); /* keep it conveniently zero-filled */
582
 
    }
583
 
 
584
 
    if (is_real_trans)
585
 
      session->transaction.xid_state.xid.null();
586
 
 
587
 
    if (normal_transaction)
588
 
    {
589
 
      session->variables.tx_isolation= session->session_tx_isolation;
590
 
      session->transaction.cleanup();
591
 
    }
592
 
  }
593
 
  trans->reset();
594
 
  return error;
595
 
}
596
 
 
597
 
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
598
 
{
599
 
  int error= 0;
600
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
601
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
602
 
 
603
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
604
 
 
605
 
  /*
606
 
    We must not rollback the normal transaction if a statement
607
 
    transaction is pending.
608
 
  */
609
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
610
 
              trans == &session->transaction.stmt);
611
 
 
612
 
  if (resource_contexts.empty() == false)
613
 
  {
614
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
615
 
         it != resource_contexts.end();
616
 
         ++it)
617
 
    {
618
 
      int err;
619
 
      ResourceContext *resource_context= *it;
620
 
 
621
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
622
 
 
623
 
      if (resource->participatesInXaTransaction())
624
 
      {
625
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
626
 
        {
627
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
628
 
          error= 1;
629
 
        }
630
 
        else if (normal_transaction)
631
 
        {
632
 
          session->status_var.ha_rollback_count++;
633
 
        }
634
 
      }
635
 
      else if (resource->participatesInSqlTransaction())
636
 
      {
637
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
638
 
        {
639
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
640
 
          error= 1;
641
 
        }
642
 
        else if (normal_transaction)
643
 
        {
644
 
          session->status_var.ha_rollback_count++;
645
 
        }
646
 
      }
647
 
      resource_context->reset(); /* keep it conveniently zero-filled */
648
 
    }
649
 
    
650
 
    /* 
651
 
     * We need to signal the ROLLBACK to ReplicationServices here
652
 
     * BEFORE we set the transaction ID to NULL.  This is because
653
 
     * if a bulk segment was sent to replicators, we need to send
654
 
     * a rollback statement with the corresponding transaction ID
655
 
     * to rollback.
656
 
     */
657
 
    rollbackTransactionMessage(session);
658
 
 
659
 
    if (is_real_trans)
660
 
      session->transaction.xid_state.xid.null();
661
 
    if (normal_transaction)
662
 
    {
663
 
      session->variables.tx_isolation=session->session_tx_isolation;
664
 
      session->transaction.cleanup();
665
 
    }
666
 
  }
667
 
  if (normal_transaction)
668
 
    session->transaction_rollback_request= false;
669
 
 
670
 
  /*
671
 
   * If a non-transactional table was updated, warn the user
672
 
   */
673
 
  if (is_real_trans &&
674
 
      session->transaction.all.hasModifiedNonTransData() &&
675
 
      session->killed != Session::KILL_CONNECTION)
676
 
  {
677
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
678
 
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
679
 
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
680
 
  }
681
 
  trans->reset();
682
 
  return error;
683
 
}
684
 
 
685
 
/**
686
 
  This is used to commit or rollback a single statement depending on
687
 
  the value of error.
688
 
 
689
 
  @note
690
 
    Note that if the autocommit is on, then the following call inside
691
 
    InnoDB will commit or rollback the whole transaction (= the statement). The
692
 
    autocommit mechanism built into InnoDB is based on counting locks, but if
693
 
    the user has used LOCK TABLES then that mechanism does not know to do the
694
 
    commit.
695
 
*/
696
 
int TransactionServices::autocommitOrRollback(Session *session, int error)
697
 
{
698
 
  if (session->transaction.stmt.getResourceContexts().empty() == false)
699
 
  {
700
 
    if (! error)
701
 
    {
702
 
      if (commitTransaction(session, false))
703
 
        error= 1;
704
 
    }
705
 
    else
706
 
    {
707
 
      (void) rollbackTransaction(session, false);
708
 
      if (session->transaction_rollback_request)
709
 
        (void) rollbackTransaction(session, true);
710
 
    }
711
 
 
712
 
    session->variables.tx_isolation= session->session_tx_isolation;
713
 
  }
714
 
  return error;
715
 
}
716
 
 
717
 
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
718
 
{
719
 
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
720
 
  {
721
 
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
722
 
     * resources aren't the same... */
723
 
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
724
 
  }
725
 
};
726
 
 
727
 
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
728
 
{
729
 
  int error= 0;
730
 
  TransactionContext *trans= &session->transaction.all;
731
 
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
732
 
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
733
 
 
734
 
  trans->no_2pc= false;
735
 
  /*
736
 
    rolling back to savepoint in all storage engines that were part of the
737
 
    transaction when the savepoint was set
738
 
  */
739
 
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
740
 
       it != sv_resource_contexts.end();
741
 
       ++it)
742
 
  {
743
 
    int err;
744
 
    ResourceContext *resource_context= *it;
745
 
 
746
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
747
 
 
748
 
    if (resource->participatesInSqlTransaction())
749
 
    {
750
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
751
 
      {
752
 
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
753
 
        error= 1;
754
 
      }
755
 
      else
756
 
      {
757
 
        session->status_var.ha_savepoint_rollback_count++;
758
 
      }
759
 
    }
760
 
    trans->no_2pc|= not resource->participatesInXaTransaction();
761
 
  }
762
 
  /*
763
 
    rolling back the transaction in all storage engines that were not part of
764
 
    the transaction when the savepoint was set
765
 
  */
766
 
  {
767
 
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
768
 
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
769
 
    TransactionContext::ResourceContexts set_difference_contexts;
770
 
 
771
 
    /* 
772
 
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
773
 
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
774
 
     * here
775
 
     */
776
 
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
777
 
 
778
 
    sort(sorted_tran_resource_contexts.begin(),
779
 
         sorted_tran_resource_contexts.end(),
780
 
         ResourceContextCompare());
781
 
    sort(sorted_sv_resource_contexts.begin(),
782
 
         sorted_sv_resource_contexts.end(),
783
 
         ResourceContextCompare());
784
 
    set_difference(sorted_tran_resource_contexts.begin(),
785
 
                   sorted_tran_resource_contexts.end(),
786
 
                   sorted_sv_resource_contexts.begin(),
787
 
                   sorted_sv_resource_contexts.end(),
788
 
                   set_difference_contexts.begin(),
789
 
                   ResourceContextCompare());
790
 
    /* 
791
 
     * set_difference_contexts now contains all resource contexts
792
 
     * which are in the transaction context but were NOT in the
793
 
     * savepoint's resource contexts.
794
 
     */
795
 
        
796
 
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
797
 
         it != set_difference_contexts.end();
798
 
         ++it)
799
 
    {
800
 
      ResourceContext *resource_context= *it;
801
 
      int err;
802
 
 
803
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
804
 
 
805
 
      if (resource->participatesInSqlTransaction())
806
 
      {
807
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
808
 
        {
809
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
810
 
          error= 1;
811
 
        }
812
 
        else
813
 
        {
814
 
          session->status_var.ha_rollback_count++;
815
 
        }
816
 
      }
817
 
      resource_context->reset(); /* keep it conveniently zero-filled */
818
 
    }
819
 
  }
820
 
  trans->setResourceContexts(sv_resource_contexts);
821
 
 
822
 
  if (shouldConstructMessages())
823
 
  {
824
 
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
825
 
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
826
 
    if (savepoint_transaction != NULL)
827
 
    {
828
 
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
829
 
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
830
 
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
831
 
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
832
 
      */ 
833
 
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
834
 
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
835
 
      if (num_statements == 0)
836
 
      {    
837
 
        session->setStatementMessage(NULL);
838
 
      }    
839
 
      else 
840
 
      {
841
 
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
842
 
      }    
843
 
      session->setTransactionMessage(savepoint_transaction_copy);
844
 
    }
845
 
  }
846
 
 
847
 
  return error;
848
 
}
849
 
 
850
 
/**
851
 
  @note
852
 
  according to the sql standard (ISO/IEC 9075-2:2003)
853
 
  section "4.33.4 SQL-statements and transaction states",
854
 
  NamedSavepoint is *not* transaction-initiating SQL-statement
855
 
*/
856
 
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
857
 
{
858
 
  int error= 0;
859
 
  TransactionContext *trans= &session->transaction.all;
860
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
861
 
 
862
 
  if (resource_contexts.empty() == false)
863
 
  {
864
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
865
 
         it != resource_contexts.end();
866
 
         ++it)
867
 
    {
868
 
      ResourceContext *resource_context= *it;
869
 
      int err;
870
 
 
871
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
872
 
 
873
 
      if (resource->participatesInSqlTransaction())
874
 
      {
875
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
876
 
        {
877
 
          my_error(ER_GET_ERRNO, MYF(0), err);
878
 
          error= 1;
879
 
        }
880
 
        else
881
 
        {
882
 
          session->status_var.ha_savepoint_count++;
883
 
        }
884
 
      }
885
 
    }
886
 
  }
887
 
  /*
888
 
    Remember the list of registered storage engines.
889
 
  */
890
 
  sv.setResourceContexts(resource_contexts);
891
 
 
892
 
  if (shouldConstructMessages())
893
 
  {
894
 
    message::Transaction *transaction= session->getTransactionMessage();
895
 
                  
896
 
    if (transaction != NULL)
897
 
    {
898
 
      message::Transaction *transaction_savepoint= 
899
 
        new message::Transaction(*transaction);
900
 
      sv.setTransactionMessage(transaction_savepoint);
901
 
    }
902
 
  } 
903
 
 
904
 
  return error;
905
 
}
906
 
 
907
 
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
908
 
{
909
 
  int error= 0;
910
 
 
911
 
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
912
 
 
913
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
914
 
       it != resource_contexts.end();
915
 
       ++it)
916
 
  {
917
 
    int err;
918
 
    ResourceContext *resource_context= *it;
919
 
 
920
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
921
 
 
922
 
    if (resource->participatesInSqlTransaction())
923
 
    {
924
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
925
 
      {
926
 
        my_error(ER_GET_ERRNO, MYF(0), err);
927
 
        error= 1;
928
 
      }
929
 
    }
930
 
  }
931
 
  
932
 
  return error;
933
 
}
934
 
 
935
 
bool TransactionServices::shouldConstructMessages()
936
 
{
937
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
938
 
  return replication_services.isActive();
939
 
}
940
 
 
941
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
942
 
{
943
 
  message::Transaction *transaction= in_session->getTransactionMessage();
944
 
 
945
 
  if (unlikely(transaction == NULL))
946
 
  {
947
 
    /* 
948
 
     * Allocate and initialize a new transaction message 
949
 
     * for this Session object.  Session is responsible for
950
 
     * deleting transaction message when done with it.
951
 
     */
952
 
    transaction= new (nothrow) message::Transaction();
953
 
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
954
 
    in_session->setTransactionMessage(transaction);
955
 
    return transaction;
956
 
  }
957
 
  else
958
 
    return transaction;
959
 
}
960
 
 
961
 
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
962
 
                                                 Session *in_session,
963
 
                                                 bool should_inc_trx_id)
964
 
{
965
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
966
 
  trx->set_server_id(in_session->getServerId());
967
 
 
968
 
  if (should_inc_trx_id)
969
 
    trx->set_transaction_id(getNextTransactionId());
970
 
 
971
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
972
 
}
973
 
 
974
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
975
 
                                              Session *in_session)
976
 
{
977
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
978
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
979
 
}
980
 
 
981
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
982
 
                                             Session *in_session)
983
 
{
984
 
  delete in_transaction;
985
 
  in_session->setStatementMessage(NULL);
986
 
  in_session->setTransactionMessage(NULL);
987
 
}
988
 
 
989
 
int TransactionServices::commitTransactionMessage(Session *in_session)
990
 
{
991
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
992
 
  if (! replication_services.isActive())
993
 
    return 0;
994
 
 
995
 
  /* If there is an active statement message, finalize it */
996
 
  message::Statement *statement= in_session->getStatementMessage();
997
 
 
998
 
  if (statement != NULL)
999
 
  {
1000
 
    finalizeStatementMessage(*statement, in_session);
1001
 
  }
1002
 
  else
1003
 
    return 0; /* No data modification occurred inside the transaction */
1004
 
  
1005
 
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1006
 
 
1007
 
  finalizeTransactionMessage(*transaction, in_session);
1008
 
  
1009
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1010
 
 
1011
 
  cleanupTransactionMessage(transaction, in_session);
1012
 
 
1013
 
  return static_cast<int>(result);
1014
 
}
1015
 
 
1016
 
void TransactionServices::initStatementMessage(message::Statement &statement,
1017
 
                                        message::Statement::Type in_type,
1018
 
                                        Session *in_session)
1019
 
{
1020
 
  statement.set_type(in_type);
1021
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1022
 
  /** @TODO Set sql string optionally */
1023
 
}
1024
 
 
1025
 
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1026
 
                                            Session *in_session)
1027
 
{
1028
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1029
 
  in_session->setStatementMessage(NULL);
1030
 
}
1031
 
 
1032
 
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1033
 
{
1034
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1035
 
  if (! replication_services.isActive())
1036
 
    return;
1037
 
  
1038
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1039
 
 
1040
 
  /*
1041
 
   * OK, so there are two situations that we need to deal with here:
1042
 
   *
1043
 
   * 1) We receive an instruction to ROLLBACK the current transaction
1044
 
   *    and the currently-stored Transaction message is *self-contained*, 
1045
 
   *    meaning that no Statement messages in the Transaction message
1046
 
   *    contain a message having its segment_id member greater than 1.  If
1047
 
   *    no non-segment ID 1 members are found, we can simply clear the
1048
 
   *    current Transaction message and remove it from memory.
1049
 
   *
1050
 
   * 2) If the Transaction message does indeed have a non-end segment, that
1051
 
   *    means that a bulk update/delete/insert Transaction message segment
1052
 
   *    has previously been sent over the wire to replicators.  In this case, 
1053
 
   *    we need to package a Transaction with a Statement message of type
1054
 
   *    ROLLBACK to indicate to replicators that previously-transmitted
1055
 
   *    messages must be un-applied.
1056
 
   */
1057
 
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1058
 
  {
1059
 
    /* Remember the transaction ID so we can re-use it */
1060
 
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1061
 
 
1062
 
    /*
1063
 
     * Clear the transaction, create a Rollback statement message, 
1064
 
     * attach it to the transaction, and push it to replicators.
1065
 
     */
1066
 
    transaction->Clear();
1067
 
    initTransactionMessage(*transaction, in_session, false);
1068
 
 
1069
 
    /* Set the transaction ID to match the previous messages */
1070
 
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1071
 
 
1072
 
    message::Statement *statement= transaction->add_statement();
1073
 
 
1074
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1075
 
    finalizeStatementMessage(*statement, in_session);
1076
 
 
1077
 
    finalizeTransactionMessage(*transaction, in_session);
1078
 
    
1079
 
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1080
 
  }
1081
 
  cleanupTransactionMessage(transaction, in_session);
1082
 
}
1083
 
 
1084
 
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1085
 
                                                            Table *in_table,
1086
 
                                                            uint32_t *next_segment_id)
1087
 
{
1088
 
  message::Statement *statement= in_session->getStatementMessage();
1089
 
  message::Transaction *transaction= NULL;
1090
 
 
1091
 
  /* 
1092
 
   * Check the type for the current Statement message, if it is anything
1093
 
   * other then INSERT we need to call finalize, this will ensure a 
1094
 
   * new InsertStatement is created. If it is of type INSERT check
1095
 
   * what table the INSERT belongs to, if it is a different table
1096
 
   * call finalize, so a new InsertStatement can be created. 
1097
 
   */
1098
 
  if (statement != NULL && statement->type() != message::Statement::INSERT)
1099
 
  {
1100
 
    finalizeStatementMessage(*statement, in_session);
1101
 
    statement= in_session->getStatementMessage();
1102
 
  } 
1103
 
  else if (statement != NULL)
1104
 
  {
1105
 
    transaction= getActiveTransactionMessage(in_session);
1106
 
 
1107
 
    /*
1108
 
     * If we've passed our threshold for the statement size (possible for
1109
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1110
 
     * the Transaction will keep it from getting huge).
1111
 
     */
1112
 
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
1113
 
    {
1114
 
      /* Remember the transaction ID so we can re-use it */
1115
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1116
 
 
1117
 
      message::InsertData *current_data= statement->mutable_insert_data();
1118
 
 
1119
 
      /* Caller should use this value when adding a new record */
1120
 
      *next_segment_id= current_data->segment_id() + 1;
1121
 
 
1122
 
      current_data->set_end_segment(false);
1123
 
 
1124
 
      /* 
1125
 
       * Send the trx message to replicators after finalizing the 
1126
 
       * statement and transaction. This will also set the Transaction
1127
 
       * and Statement objects in Session to NULL.
1128
 
       */
1129
 
      commitTransactionMessage(in_session);
1130
 
 
1131
 
      /*
1132
 
       * Statement and Transaction should now be NULL, so new ones will get
1133
 
       * created. We reuse the transaction id since we are segmenting
1134
 
       * one transaction.
1135
 
       */
1136
 
      statement= in_session->getStatementMessage();
1137
 
      transaction= getActiveTransactionMessage(in_session, false);
1138
 
      assert(transaction != NULL);
1139
 
 
1140
 
      /* Set the transaction ID to match the previous messages */
1141
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1142
 
    }
1143
 
    else
1144
 
    {
1145
 
      const message::InsertHeader &insert_header= statement->insert_header();
1146
 
      string old_table_name= insert_header.table_metadata().table_name();
1147
 
     
1148
 
      string current_table_name;
1149
 
      (void) in_table->getShare()->getTableName(current_table_name);
1150
 
 
1151
 
      if (current_table_name.compare(old_table_name))
1152
 
      {
1153
 
        finalizeStatementMessage(*statement, in_session);
1154
 
        statement= in_session->getStatementMessage();
1155
 
      }
1156
 
      else
1157
 
      {
1158
 
        /* carry forward the existing segment id */
1159
 
        const message::InsertData &current_data= statement->insert_data();
1160
 
        *next_segment_id= current_data.segment_id();
1161
 
      }
1162
 
    }
1163
 
  } 
1164
 
 
1165
 
  if (statement == NULL)
1166
 
  {
1167
 
    /*
1168
 
     * Transaction will be non-NULL only if we had to segment it due to
1169
 
     * transaction size above.
1170
 
     */
1171
 
    if (transaction == NULL)
1172
 
      transaction= getActiveTransactionMessage(in_session);
1173
 
 
1174
 
    /* 
1175
 
     * Transaction message initialized and set, but no statement created
1176
 
     * yet.  We construct one and initialize it, here, then return the
1177
 
     * message after attaching the new Statement message pointer to the 
1178
 
     * Session for easy retrieval later...
1179
 
     */
1180
 
    statement= transaction->add_statement();
1181
 
    setInsertHeader(*statement, in_session, in_table);
1182
 
    in_session->setStatementMessage(statement);
1183
 
  }
1184
 
  return *statement;
1185
 
}
1186
 
 
1187
 
void TransactionServices::setInsertHeader(message::Statement &statement,
1188
 
                                          Session *in_session,
1189
 
                                          Table *in_table)
1190
 
{
1191
 
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1192
 
 
1193
 
  /* 
1194
 
   * Now we construct the specialized InsertHeader message inside
1195
 
   * the generalized message::Statement container...
1196
 
   */
1197
 
  /* Set up the insert header */
1198
 
  message::InsertHeader *header= statement.mutable_insert_header();
1199
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1200
 
 
1201
 
  string schema_name;
1202
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1203
 
  string table_name;
1204
 
  (void) in_table->getShare()->getTableName(table_name);
1205
 
 
1206
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1207
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1208
 
 
1209
 
  Field *current_field;
1210
 
  Field **table_fields= in_table->getFields();
1211
 
 
1212
 
  message::FieldMetadata *field_metadata;
1213
 
 
1214
 
  /* We will read all the table's fields... */
1215
 
  in_table->setReadSet();
1216
 
 
1217
 
  while ((current_field= *table_fields++) != NULL) 
1218
 
  {
1219
 
    field_metadata= header->add_field_metadata();
1220
 
    field_metadata->set_name(current_field->field_name);
1221
 
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1222
 
  }
1223
 
}
1224
 
 
1225
 
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1226
 
{
1227
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1228
 
  if (! replication_services.isActive())
1229
 
    return false;
1230
 
  /**
1231
 
   * We do this check here because we don't want to even create a 
1232
 
   * statement if there isn't a primary key on the table...
1233
 
   *
1234
 
   * @todo
1235
 
   *
1236
 
   * Multi-column primary keys are handled how exactly?
1237
 
   */
1238
 
  if (not in_table->getShare()->hasPrimaryKey())
1239
 
  {
1240
 
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1241
 
    return true;
1242
 
  }
1243
 
 
1244
 
  uint32_t next_segment_id= 1;
1245
 
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1246
 
 
1247
 
  message::InsertData *data= statement.mutable_insert_data();
1248
 
  data->set_segment_id(next_segment_id);
1249
 
  data->set_end_segment(true);
1250
 
  message::InsertRecord *record= data->add_record();
1251
 
 
1252
 
  Field *current_field;
1253
 
  Field **table_fields= in_table->getFields();
1254
 
 
1255
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1256
 
  string_value->set_charset(system_charset_info);
1257
 
 
1258
 
  /* We will read all the table's fields... */
1259
 
  in_table->setReadSet();
1260
 
 
1261
 
  while ((current_field= *table_fields++) != NULL) 
1262
 
  {
1263
 
    if (current_field->is_null())
1264
 
    {
1265
 
      record->add_is_null(true);
1266
 
      record->add_insert_value("", 0);
1267
 
    } 
1268
 
    else 
1269
 
    {
1270
 
      string_value= current_field->val_str(string_value);
1271
 
      record->add_is_null(false);
1272
 
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1273
 
      string_value->free();
1274
 
    }
1275
 
  }
1276
 
  return false;
1277
 
}
1278
 
 
1279
 
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1280
 
                                                            Table *in_table,
1281
 
                                                            const unsigned char *old_record, 
1282
 
                                                            const unsigned char *new_record,
1283
 
                                                            uint32_t *next_segment_id)
1284
 
{
1285
 
  message::Statement *statement= in_session->getStatementMessage();
1286
 
  message::Transaction *transaction= NULL;
1287
 
 
1288
 
  /*
1289
 
   * Check the type for the current Statement message, if it is anything
1290
 
   * other then UPDATE we need to call finalize, this will ensure a
1291
 
   * new UpdateStatement is created. If it is of type UPDATE check
1292
 
   * what table the UPDATE belongs to, if it is a different table
1293
 
   * call finalize, so a new UpdateStatement can be created.
1294
 
   */
1295
 
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1296
 
  {
1297
 
    finalizeStatementMessage(*statement, in_session);
1298
 
    statement= in_session->getStatementMessage();
1299
 
  }
1300
 
  else if (statement != NULL)
1301
 
  {
1302
 
    transaction= getActiveTransactionMessage(in_session);
1303
 
 
1304
 
    /*
1305
 
     * If we've passed our threshold for the statement size (possible for
1306
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1307
 
     * the Transaction will keep it from getting huge).
1308
 
     */
1309
 
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
1310
 
    {
1311
 
      /* Remember the transaction ID so we can re-use it */
1312
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1313
 
 
1314
 
      message::UpdateData *current_data= statement->mutable_update_data();
1315
 
 
1316
 
      /* Caller should use this value when adding a new record */
1317
 
      *next_segment_id= current_data->segment_id() + 1;
1318
 
 
1319
 
      current_data->set_end_segment(false);
1320
 
 
1321
 
      /*
1322
 
       * Send the trx message to replicators after finalizing the 
1323
 
       * statement and transaction. This will also set the Transaction
1324
 
       * and Statement objects in Session to NULL.
1325
 
       */
1326
 
      commitTransactionMessage(in_session);
1327
 
 
1328
 
      /*
1329
 
       * Statement and Transaction should now be NULL, so new ones will get
1330
 
       * created. We reuse the transaction id since we are segmenting
1331
 
       * one transaction.
1332
 
       */
1333
 
      statement= in_session->getStatementMessage();
1334
 
      transaction= getActiveTransactionMessage(in_session, false);
1335
 
      assert(transaction != NULL);
1336
 
 
1337
 
      /* Set the transaction ID to match the previous messages */
1338
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1339
 
    }
1340
 
    else
1341
 
    {
1342
 
      const message::UpdateHeader &update_header= statement->update_header();
1343
 
      string old_table_name= update_header.table_metadata().table_name();
1344
 
 
1345
 
      string current_table_name;
1346
 
      (void) in_table->getShare()->getTableName(current_table_name);
1347
 
      if (current_table_name.compare(old_table_name))
1348
 
      {
1349
 
        finalizeStatementMessage(*statement, in_session);
1350
 
        statement= in_session->getStatementMessage();
1351
 
      }
1352
 
      else
1353
 
      {
1354
 
        /* carry forward the existing segment id */
1355
 
        const message::UpdateData &current_data= statement->update_data();
1356
 
        *next_segment_id= current_data.segment_id();
1357
 
      }
1358
 
    }
1359
 
  }
1360
 
 
1361
 
  if (statement == NULL)
1362
 
  {
1363
 
    /*
1364
 
     * Transaction will be non-NULL only if we had to segment it due to
1365
 
     * transaction size above.
1366
 
     */
1367
 
    if (transaction == NULL)
1368
 
      transaction= getActiveTransactionMessage(in_session);
1369
 
 
1370
 
    /* 
1371
 
     * Transaction message initialized and set, but no statement created
1372
 
     * yet.  We construct one and initialize it, here, then return the
1373
 
     * message after attaching the new Statement message pointer to the 
1374
 
     * Session for easy retrieval later...
1375
 
     */
1376
 
    statement= transaction->add_statement();
1377
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1378
 
    in_session->setStatementMessage(statement);
1379
 
  }
1380
 
  return *statement;
1381
 
}
1382
 
 
1383
 
void TransactionServices::setUpdateHeader(message::Statement &statement,
1384
 
                                          Session *in_session,
1385
 
                                          Table *in_table,
1386
 
                                          const unsigned char *old_record, 
1387
 
                                          const unsigned char *new_record)
1388
 
{
1389
 
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1390
 
 
1391
 
  /* 
1392
 
   * Now we construct the specialized UpdateHeader message inside
1393
 
   * the generalized message::Statement container...
1394
 
   */
1395
 
  /* Set up the update header */
1396
 
  message::UpdateHeader *header= statement.mutable_update_header();
1397
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1398
 
 
1399
 
  string schema_name;
1400
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1401
 
  string table_name;
1402
 
  (void) in_table->getShare()->getTableName(table_name);
1403
 
 
1404
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1405
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1406
 
 
1407
 
  Field *current_field;
1408
 
  Field **table_fields= in_table->getFields();
1409
 
 
1410
 
  message::FieldMetadata *field_metadata;
1411
 
 
1412
 
  /* We will read all the table's fields... */
1413
 
  in_table->setReadSet();
1414
 
 
1415
 
  while ((current_field= *table_fields++) != NULL) 
1416
 
  {
1417
 
    /*
1418
 
     * We add the "key field metadata" -- i.e. the fields which is
1419
 
     * the primary key for the table.
1420
 
     */
1421
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1422
 
    {
1423
 
      field_metadata= header->add_key_field_metadata();
1424
 
      field_metadata->set_name(current_field->field_name);
1425
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1426
 
    }
1427
 
 
1428
 
    /*
1429
 
     * The below really should be moved into the Field API and Record API.  But for now
1430
 
     * we do this crazy pointer fiddling to figure out if the current field
1431
 
     * has been updated in the supplied record raw byte pointers.
1432
 
     */
1433
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1434
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1435
 
 
1436
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1437
 
 
1438
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1439
 
    {
1440
 
      /* Field is changed from old to new */
1441
 
      field_metadata= header->add_set_field_metadata();
1442
 
      field_metadata->set_name(current_field->field_name);
1443
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1444
 
    }
1445
 
  }
1446
 
}
1447
 
void TransactionServices::updateRecord(Session *in_session,
1448
 
                                       Table *in_table, 
1449
 
                                       const unsigned char *old_record, 
1450
 
                                       const unsigned char *new_record)
1451
 
{
1452
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1453
 
  if (! replication_services.isActive())
1454
 
    return;
1455
 
 
1456
 
  uint32_t next_segment_id= 1;
1457
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1458
 
 
1459
 
  message::UpdateData *data= statement.mutable_update_data();
1460
 
  data->set_segment_id(next_segment_id);
1461
 
  data->set_end_segment(true);
1462
 
  message::UpdateRecord *record= data->add_record();
1463
 
 
1464
 
  Field *current_field;
1465
 
  Field **table_fields= in_table->getFields();
1466
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1467
 
  string_value->set_charset(system_charset_info);
1468
 
 
1469
 
  while ((current_field= *table_fields++) != NULL) 
1470
 
  {
1471
 
    /*
1472
 
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1473
 
     * but then realized that an UPDATE statement could potentially have different values for
1474
 
     * the SET field.  For instance, imagine this SQL scenario:
1475
 
     *
1476
 
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1477
 
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1478
 
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1479
 
     *
1480
 
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1481
 
     *
1482
 
     * The below really should be moved into the Field API and Record API.  But for now
1483
 
     * we do this crazy pointer fiddling to figure out if the current field
1484
 
     * has been updated in the supplied record raw byte pointers.
1485
 
     */
1486
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1487
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1488
 
 
1489
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1490
 
 
1491
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1492
 
    {
1493
 
      /* Store the original "read bit" for this field */
1494
 
      bool is_read_set= current_field->isReadSet();
1495
 
 
1496
 
      /* We need to mark that we will "read" this field... */
1497
 
      in_table->setReadSet(current_field->field_index);
1498
 
 
1499
 
      /* Read the string value of this field's contents */
1500
 
      string_value= current_field->val_str(string_value);
1501
 
 
1502
 
      /* 
1503
 
       * Reset the read bit after reading field to its original state.  This 
1504
 
       * prevents the field from being included in the WHERE clause
1505
 
       */
1506
 
      current_field->setReadSet(is_read_set);
1507
 
 
1508
 
      if (current_field->is_null())
1509
 
      {
1510
 
        record->add_is_null(true);
1511
 
        record->add_after_value("", 0);
1512
 
      }
1513
 
      else
1514
 
      {
1515
 
        record->add_is_null(false);
1516
 
        record->add_after_value(string_value->c_ptr(), string_value->length());
1517
 
      }
1518
 
      string_value->free();
1519
 
    }
1520
 
 
1521
 
    /* 
1522
 
     * Add the WHERE clause values now...for now, this means the
1523
 
     * primary key field value.  Replication only supports tables
1524
 
     * with a primary key.
1525
 
     */
1526
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1527
 
    {
1528
 
      /**
1529
 
       * To say the below is ugly is an understatement. But it works.
1530
 
       * 
1531
 
       * @todo Move this crap into a real Record API.
1532
 
       */
1533
 
      string_value= current_field->val_str(string_value,
1534
 
                                           old_record + 
1535
 
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1536
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1537
 
      string_value->free();
1538
 
    }
1539
 
 
1540
 
  }
1541
 
}
1542
 
 
1543
 
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1544
 
                                                            Table *in_table,
1545
 
                                                            uint32_t *next_segment_id)
1546
 
{
1547
 
  message::Statement *statement= in_session->getStatementMessage();
1548
 
  message::Transaction *transaction= NULL;
1549
 
 
1550
 
  /*
1551
 
   * Check the type for the current Statement message, if it is anything
1552
 
   * other then DELETE we need to call finalize, this will ensure a
1553
 
   * new DeleteStatement is created. If it is of type DELETE check
1554
 
   * what table the DELETE belongs to, if it is a different table
1555
 
   * call finalize, so a new DeleteStatement can be created.
1556
 
   */
1557
 
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1558
 
  {
1559
 
    finalizeStatementMessage(*statement, in_session);
1560
 
    statement= in_session->getStatementMessage();
1561
 
  }
1562
 
  else if (statement != NULL)
1563
 
  {
1564
 
    transaction= getActiveTransactionMessage(in_session);
1565
 
 
1566
 
    /*
1567
 
     * If we've passed our threshold for the statement size (possible for
1568
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1569
 
     * the Transaction will keep it from getting huge).
1570
 
     */
1571
 
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
1572
 
    {
1573
 
      /* Remember the transaction ID so we can re-use it */
1574
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1575
 
 
1576
 
      message::DeleteData *current_data= statement->mutable_delete_data();
1577
 
 
1578
 
      /* Caller should use this value when adding a new record */
1579
 
      *next_segment_id= current_data->segment_id() + 1;
1580
 
 
1581
 
      current_data->set_end_segment(false);
1582
 
 
1583
 
      /* 
1584
 
       * Send the trx message to replicators after finalizing the 
1585
 
       * statement and transaction. This will also set the Transaction
1586
 
       * and Statement objects in Session to NULL.
1587
 
       */
1588
 
      commitTransactionMessage(in_session);
1589
 
 
1590
 
      /*
1591
 
       * Statement and Transaction should now be NULL, so new ones will get
1592
 
       * created. We reuse the transaction id since we are segmenting
1593
 
       * one transaction.
1594
 
       */
1595
 
      statement= in_session->getStatementMessage();
1596
 
      transaction= getActiveTransactionMessage(in_session, false);
1597
 
      assert(transaction != NULL);
1598
 
 
1599
 
      /* Set the transaction ID to match the previous messages */
1600
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1601
 
    }
1602
 
    else
1603
 
    {
1604
 
      const message::DeleteHeader &delete_header= statement->delete_header();
1605
 
      string old_table_name= delete_header.table_metadata().table_name();
1606
 
 
1607
 
      string current_table_name;
1608
 
      (void) in_table->getShare()->getTableName(current_table_name);
1609
 
      if (current_table_name.compare(old_table_name))
1610
 
      {
1611
 
        finalizeStatementMessage(*statement, in_session);
1612
 
        statement= in_session->getStatementMessage();
1613
 
      }
1614
 
      else
1615
 
      {
1616
 
        /* carry forward the existing segment id */
1617
 
        const message::DeleteData &current_data= statement->delete_data();
1618
 
        *next_segment_id= current_data.segment_id();
1619
 
      }
1620
 
    }
1621
 
  }
1622
 
 
1623
 
  if (statement == NULL)
1624
 
  {
1625
 
    /*
1626
 
     * Transaction will be non-NULL only if we had to segment it due to
1627
 
     * transaction size above.
1628
 
     */
1629
 
    if (transaction == NULL)
1630
 
      transaction= getActiveTransactionMessage(in_session);
1631
 
 
1632
 
    /* 
1633
 
     * Transaction message initialized and set, but no statement created
1634
 
     * yet.  We construct one and initialize it, here, then return the
1635
 
     * message after attaching the new Statement message pointer to the 
1636
 
     * Session for easy retrieval later...
1637
 
     */
1638
 
    statement= transaction->add_statement();
1639
 
    setDeleteHeader(*statement, in_session, in_table);
1640
 
    in_session->setStatementMessage(statement);
1641
 
  }
1642
 
  return *statement;
1643
 
}
1644
 
 
1645
 
void TransactionServices::setDeleteHeader(message::Statement &statement,
1646
 
                                          Session *in_session,
1647
 
                                          Table *in_table)
1648
 
{
1649
 
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1650
 
 
1651
 
  /* 
1652
 
   * Now we construct the specialized DeleteHeader message inside
1653
 
   * the generalized message::Statement container...
1654
 
   */
1655
 
  message::DeleteHeader *header= statement.mutable_delete_header();
1656
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1657
 
 
1658
 
  string schema_name;
1659
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1660
 
  string table_name;
1661
 
  (void) in_table->getShare()->getTableName(table_name);
1662
 
 
1663
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1664
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1665
 
 
1666
 
  Field *current_field;
1667
 
  Field **table_fields= in_table->getFields();
1668
 
 
1669
 
  message::FieldMetadata *field_metadata;
1670
 
 
1671
 
  while ((current_field= *table_fields++) != NULL) 
1672
 
  {
1673
 
    /* 
1674
 
     * Add the WHERE clause values now...for now, this means the
1675
 
     * primary key field value.  Replication only supports tables
1676
 
     * with a primary key.
1677
 
     */
1678
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1679
 
    {
1680
 
      field_metadata= header->add_key_field_metadata();
1681
 
      field_metadata->set_name(current_field->field_name);
1682
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1683
 
    }
1684
 
  }
1685
 
}
1686
 
 
1687
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1688
 
{
1689
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1690
 
  if (! replication_services.isActive())
1691
 
    return;
1692
 
 
1693
 
  uint32_t next_segment_id= 1;
1694
 
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1695
 
 
1696
 
  message::DeleteData *data= statement.mutable_delete_data();
1697
 
  data->set_segment_id(next_segment_id);
1698
 
  data->set_end_segment(true);
1699
 
  message::DeleteRecord *record= data->add_record();
1700
 
 
1701
 
  Field *current_field;
1702
 
  Field **table_fields= in_table->getFields();
1703
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1704
 
  string_value->set_charset(system_charset_info);
1705
 
 
1706
 
  while ((current_field= *table_fields++) != NULL) 
1707
 
  {
1708
 
    /* 
1709
 
     * Add the WHERE clause values now...for now, this means the
1710
 
     * primary key field value.  Replication only supports tables
1711
 
     * with a primary key.
1712
 
     */
1713
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1714
 
    {
1715
 
      if (use_update_record)
1716
 
      {
1717
 
        /*
1718
 
         * Temporarily point to the update record to get its value.
1719
 
         * This is pretty much a hack in order to get the PK value from
1720
 
         * the update record rather than the insert record. Field::val_str()
1721
 
         * should not change anything in Field::ptr, so this should be safe.
1722
 
         * We are careful not to change anything in old_ptr.
1723
 
         */
1724
 
        const unsigned char *old_ptr= current_field->ptr;
1725
 
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1726
 
        string_value= current_field->val_str(string_value);
1727
 
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1728
 
      }
1729
 
      else
1730
 
      {
1731
 
        string_value= current_field->val_str(string_value);
1732
 
        /**
1733
 
         * @TODO Store optional old record value in the before data member
1734
 
         */
1735
 
      }
1736
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1737
 
      string_value->free();
1738
 
    }
1739
 
  }
1740
 
}
1741
 
 
1742
 
void TransactionServices::createTable(Session *in_session,
1743
 
                                      const message::Table &table)
1744
 
{
1745
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1746
 
  if (! replication_services.isActive())
1747
 
    return;
1748
 
  
1749
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1750
 
  message::Statement *statement= transaction->add_statement();
1751
 
 
1752
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1753
 
 
1754
 
  /* 
1755
 
   * Construct the specialized CreateTableStatement message and attach
1756
 
   * it to the generic Statement message
1757
 
   */
1758
 
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1759
 
  message::Table *new_table_message= create_table_statement->mutable_table();
1760
 
  *new_table_message= table;
1761
 
 
1762
 
  finalizeStatementMessage(*statement, in_session);
1763
 
 
1764
 
  finalizeTransactionMessage(*transaction, in_session);
1765
 
  
1766
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1767
 
 
1768
 
  cleanupTransactionMessage(transaction, in_session);
1769
 
 
1770
 
}
1771
 
 
1772
 
void TransactionServices::createSchema(Session *in_session,
1773
 
                                       const message::Schema &schema)
1774
 
{
1775
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1776
 
  if (! replication_services.isActive())
1777
 
    return;
1778
 
  
1779
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1780
 
  message::Statement *statement= transaction->add_statement();
1781
 
 
1782
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1783
 
 
1784
 
  /* 
1785
 
   * Construct the specialized CreateSchemaStatement message and attach
1786
 
   * it to the generic Statement message
1787
 
   */
1788
 
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1789
 
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1790
 
  *new_schema_message= schema;
1791
 
 
1792
 
  finalizeStatementMessage(*statement, in_session);
1793
 
 
1794
 
  finalizeTransactionMessage(*transaction, in_session);
1795
 
  
1796
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1797
 
 
1798
 
  cleanupTransactionMessage(transaction, in_session);
1799
 
 
1800
 
}
1801
 
 
1802
 
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1803
 
{
1804
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1805
 
  if (! replication_services.isActive())
1806
 
    return;
1807
 
  
1808
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1809
 
  message::Statement *statement= transaction->add_statement();
1810
 
 
1811
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1812
 
 
1813
 
  /* 
1814
 
   * Construct the specialized DropSchemaStatement message and attach
1815
 
   * it to the generic Statement message
1816
 
   */
1817
 
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1818
 
 
1819
 
  drop_schema_statement->set_schema_name(schema_name);
1820
 
 
1821
 
  finalizeStatementMessage(*statement, in_session);
1822
 
 
1823
 
  finalizeTransactionMessage(*transaction, in_session);
1824
 
  
1825
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1826
 
 
1827
 
  cleanupTransactionMessage(transaction, in_session);
1828
 
}
1829
 
 
1830
 
void TransactionServices::dropTable(Session *in_session,
1831
 
                                    const string &schema_name,
1832
 
                                    const string &table_name,
1833
 
                                    bool if_exists)
1834
 
{
1835
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1836
 
  if (! replication_services.isActive())
1837
 
    return;
1838
 
  
1839
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1840
 
  message::Statement *statement= transaction->add_statement();
1841
 
 
1842
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1843
 
 
1844
 
  /* 
1845
 
   * Construct the specialized DropTableStatement message and attach
1846
 
   * it to the generic Statement message
1847
 
   */
1848
 
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1849
 
 
1850
 
  drop_table_statement->set_if_exists_clause(if_exists);
1851
 
 
1852
 
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1853
 
 
1854
 
  table_metadata->set_schema_name(schema_name);
1855
 
  table_metadata->set_table_name(table_name);
1856
 
 
1857
 
  finalizeStatementMessage(*statement, in_session);
1858
 
 
1859
 
  finalizeTransactionMessage(*transaction, in_session);
1860
 
  
1861
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1862
 
 
1863
 
  cleanupTransactionMessage(transaction, in_session);
1864
 
}
1865
 
 
1866
 
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
1867
 
{
1868
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1869
 
  if (! replication_services.isActive())
1870
 
    return;
1871
 
  
1872
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1873
 
  message::Statement *statement= transaction->add_statement();
1874
 
 
1875
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
1876
 
 
1877
 
  /* 
1878
 
   * Construct the specialized TruncateTableStatement message and attach
1879
 
   * it to the generic Statement message
1880
 
   */
1881
 
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
1882
 
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
1883
 
 
1884
 
  string schema_name;
1885
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1886
 
  string table_name;
1887
 
  (void) in_table->getShare()->getTableName(table_name);
1888
 
 
1889
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1890
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1891
 
 
1892
 
  finalizeStatementMessage(*statement, in_session);
1893
 
 
1894
 
  finalizeTransactionMessage(*transaction, in_session);
1895
 
  
1896
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1897
 
 
1898
 
  cleanupTransactionMessage(transaction, in_session);
1899
 
}
1900
 
 
1901
 
void TransactionServices::rawStatement(Session *in_session, const string &query)
1902
 
{
1903
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1904
 
  if (! replication_services.isActive())
1905
 
    return;
1906
 
  
1907
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1908
 
  message::Statement *statement= transaction->add_statement();
1909
 
 
1910
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
1911
 
  statement->set_sql(query);
1912
 
  finalizeStatementMessage(*statement, in_session);
1913
 
 
1914
 
  finalizeTransactionMessage(*transaction, in_session);
1915
 
  
1916
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1917
 
 
1918
 
  cleanupTransactionMessage(transaction, in_session);
1919
 
}
1920
 
 
1921
 
} /* namespace drizzled */