~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.h

  • Committer: Brian Aker
  • Date: 2010-07-20 02:17:09 UTC
  • mto: This revision was merged to the branch mainline in revision 1665.
  • Revision ID: brian@gaz-20100720021709-1yeehbw8d5vbtmlc
Removed identifier bit

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
{
37
37
  class MonitoredInTransaction;
38
38
  class XaResourceManager;
39
 
  class XaStorageEngine;
40
39
  class TransactionalStorageEngine;
41
40
}
42
41
 
43
42
class Session;
44
43
class NamedSavepoint;
45
 
class Field;
46
44
 
47
45
/**
48
46
 * This is a class which manages the XA transaction processing
52
50
{
53
51
public:
54
52
  static const size_t DEFAULT_RECORD_SIZE= 100;
55
 
  
56
 
  TransactionServices();
 
53
  typedef uint64_t TransactionId;
 
54
  /**
 
55
   * Constructor
 
56
   */
 
57
  TransactionServices()
 
58
  {
 
59
    /**
 
60
     * @todo set transaction ID to the last one from an applier...
 
61
     */
 
62
    current_transaction_id= 0;
 
63
  }
57
64
 
58
65
  /**
59
66
   * Singleton method
73
80
  /**
74
81
   * Method which returns the active Transaction message
75
82
   * for the supplied Session.  If one is not found, a new Transaction
76
 
   * message is allocated, initialized, and returned. It is possible that
77
 
   * we may want to NOT increment the transaction id for a new Transaction
78
 
   * object (e.g., splitting up Transactions into smaller chunks). The
79
 
   * should_inc_trx_id flag controls if we do this.
 
83
   * message is allocated, initialized, and returned.
80
84
   *
81
 
   * @param in_session The session processing the transaction
82
 
   * @param should_inc_trx_id If true, increments the transaction id for a new trx
 
85
   * @param The session processing the transaction
83
86
   */
84
 
  message::Transaction *getActiveTransactionMessage(Session *in_session,
85
 
                                                    bool should_inc_trx_id= true);
 
87
  message::Transaction *getActiveTransactionMessage(Session *in_session);
86
88
  /** 
87
89
   * Method which attaches a transaction context
88
90
   * the supplied transaction based on the supplied Session's
89
91
   * transaction information.  This method also ensure the
90
92
   * transaction message is attached properly to the Session object
91
93
   *
92
 
   * @param in_transaction The transaction message to initialize
93
 
   * @param in_session The Session processing this transaction
94
 
   * @param should_inc_trx_id If true, increments the transaction id for a new trx
 
94
   * @param The transaction message to initialize
 
95
   * @param The Session processing this transaction
95
96
   */
96
 
  void initTransactionMessage(message::Transaction &in_transaction,
97
 
                              Session *in_session,
98
 
                              bool should_inc_trx_id);
 
97
  void initTransactionMessage(message::Transaction &in_transaction, Session *in_session);
99
98
  /** 
100
99
   * Helper method which finalizes data members for the 
101
100
   * supplied transaction's context.
102
101
   *
103
 
   * @param in_transaction The transaction message to finalize 
104
 
   * @param in_session The Session processing this transaction
 
102
   * @param The transaction message to finalize 
 
103
   * @param The Session processing this transaction
105
104
   */
106
105
  void finalizeTransactionMessage(message::Transaction &in_transaction, Session *in_session);
107
106
  /**
110
109
   */
111
110
  void cleanupTransactionMessage(message::Transaction *in_transaction,
112
111
                                 Session *in_session);
113
 
 
114
112
  /**
115
113
   * Helper method which initializes a Statement message
116
114
   *
117
 
   * @param statement The statement to initialize
118
 
   * @param in_type The type of the statement
119
 
   * @param in_session The session processing this statement
 
115
   * @param The statement to initialize
 
116
   * @param The type of the statement
 
117
   * @param The session processing this statement
120
118
   */
121
119
  void initStatementMessage(message::Statement &statement,
122
120
                            message::Statement::Type in_type,
125
123
   * Finalizes a Statement message and sets the Session's statement
126
124
   * message to NULL.
127
125
   *
128
 
   * @param statement The statement to initialize
129
 
   * @param in_session The session processing this statement
 
126
   * @param The statement to initialize
 
127
   * @param The session processing this statement
130
128
   */
131
129
  void finalizeStatementMessage(message::Statement &statement,
132
130
                                Session *in_session);
133
131
  /** Helper method which returns an initialized Statement message for methods
134
132
   * doing insertion of data.
135
133
   *
136
 
   * @param[in] in_session Pointer to the Session doing the processing
137
 
   * @param[in] in_table Pointer to the Table object being inserted into
138
 
   * @param[out] next_segment_id The next Statement segment id to be used
 
134
   * @param[in] Pointer to the Session doing the processing
 
135
   * @param[in] Pointer to the Table object being inserted into
139
136
   */
140
137
  message::Statement &getInsertStatement(Session *in_session,
141
 
                                         Table *in_table,
142
 
                                         uint32_t *next_segment_id);
 
138
                                         Table *in_table);
143
139
 
144
140
  /**
145
141
   * Helper method which initializes the header message for
146
142
   * insert operations.
147
143
   *
148
 
   * @param[in,out] statement Statement message container to modify
149
 
   * @param[in] in_session Pointer to the Session doing the processing
150
 
   * @param[in] in_table Pointer to the Table being inserted into
 
144
   * @param[inout] Statement message container to modify
 
145
   * @param[in] Pointer to the Session doing the processing
 
146
   * @param[in] Pointer to the Table being inserted into
151
147
   */
152
148
  void setInsertHeader(message::Statement &statement,
153
149
                       Session *in_session,
156
152
   * Helper method which returns an initialized Statement
157
153
   * message for methods doing updates of data.
158
154
   *
159
 
   * @param[in] in_session Pointer to the Session doing the processing
160
 
   * @param[in] in_table Pointer to the Table object being updated
161
 
   * @param[in] old_record Pointer to the old data in the record
162
 
   * @param[in] new_record Pointer to the new data in the record
163
 
   * @param[out] next_segment_id The next Statement segment id to be used
 
155
   * @param[in] Pointer to the Session doing the processing
 
156
   * @param[in] Pointer to the Table object being updated
 
157
   * @param[in] Pointer to the old data in the record
 
158
   * @param[in] Pointer to the new data in the record
164
159
   */
165
160
  message::Statement &getUpdateStatement(Session *in_session,
166
161
                                         Table *in_table,
167
162
                                         const unsigned char *old_record, 
168
 
                                         const unsigned char *new_record,
169
 
                                         uint32_t *next_segment_id);
 
163
                                         const unsigned char *new_record);
170
164
  /**
171
165
   * Helper method which initializes the header message for
172
166
   * update operations.
173
167
   *
174
 
   * @param[in,out] statement Statement message container to modify
175
 
   * @param[in] in_session Pointer to the Session doing the processing
176
 
   * @param[in] in_table Pointer to the Table being updated
177
 
   * @param[in] old_record Pointer to the old data in the record
178
 
   * @param[in] new_record Pointer to the new data in the record
 
168
   * @param[inout] Statement message container to modify
 
169
   * @param[in] Pointer to the Session doing the processing
 
170
   * @param[in] Pointer to the Table being updated
 
171
   * @param[in] Pointer to the old data in the record
 
172
   * @param[in] Pointer to the new data in the record
179
173
   */
180
174
  void setUpdateHeader(message::Statement &statement,
181
175
                       Session *in_session,
186
180
   * Helper method which returns an initialized Statement
187
181
   * message for methods doing deletion of data.
188
182
   *
189
 
   * @param[in] in_session Pointer to the Session doing the processing
190
 
   * @param[in] in_table Pointer to the Table object being deleted from
191
 
   * @param[out] next_segment_id The next Statement segment id to be used
 
183
   * @param[in] Pointer to the Session doing the processing
 
184
   * @param[in] Pointer to the Table object being deleted from
192
185
   */
193
186
  message::Statement &getDeleteStatement(Session *in_session,
194
 
                                         Table *in_table,
195
 
                                         uint32_t *next_segment_id);
 
187
                                         Table *in_table);
196
188
 
197
189
  /**
198
190
   * Helper method which initializes the header message for
199
191
   * insert operations.
200
192
   *
201
 
   * @param[in,out] statement Statement message container to modify
202
 
   * @param[in] in_session Pointer to the Session doing the processing
203
 
   * @param[in] in_table Pointer to the Table being deleted from
 
193
   * @param[inout] Statement message container to modify
 
194
   * @param[in] Pointer to the Session doing the processing
 
195
   * @param[in] Pointer to the Table being deleted from
204
196
   */
205
197
  void setDeleteHeader(message::Statement &statement,
206
198
                       Session *in_session,
209
201
   * Commits a normal transaction (see above) and pushes the transaction
210
202
   * message out to the replicators.
211
203
   *
212
 
   * @param in_session Pointer to the Session committing the transaction
 
204
   * @param Pointer to the Session committing the transaction
213
205
   */
214
206
  int commitTransactionMessage(Session *in_session);
215
207
  /** 
216
208
   * Marks the current active transaction message as being rolled back and
217
209
   * pushes the transaction message out to replicators.
218
210
   *
219
 
   * @param in_session Pointer to the Session committing the transaction
 
211
   * @param Pointer to the Session committing the transaction
220
212
   */
221
213
  void rollbackTransactionMessage(Session *in_session);
222
214
  /**
223
 
   * Rolls back the current statement, deleting the last Statement out of
224
 
   * the current Transaction message.
225
 
   *
226
 
   * @note This depends on having clear statement boundaries (i.e., one
227
 
   * Statement message per actual SQL statement.
228
 
   */
229
 
  void rollbackStatementMessage(Session *in_session);
230
 
  /**
231
215
   * Creates a new InsertRecord GPB message and pushes it to
232
216
   * replicators.
233
217
   *
234
 
   * @param in_session Pointer to the Session which has inserted a record
235
 
   * @param in_table Pointer to the Table containing insert information
 
218
   * @param Pointer to the Session which has inserted a record
 
219
   * @param Pointer to the Table containing insert information
236
220
   *
237
221
   * Grr, returning "true" here on error because of the cursor
238
222
   * reversed bool return crap...fix that.
242
226
   * Creates a new UpdateRecord GPB message and pushes it to
243
227
   * replicators.
244
228
   *
245
 
   * @param in_session Pointer to the Session which has updated a record
246
 
   * @param in_table Pointer to the Table containing update information
247
 
   * @param old_record Pointer to the raw bytes representing the old record/row
248
 
   * @param new_record Pointer to the raw bytes representing the new record/row 
 
229
   * @param Pointer to the Session which has updated a record
 
230
   * @param Pointer to the Table containing update information
 
231
   * @param Pointer to the raw bytes representing the old record/row
 
232
   * @param Pointer to the raw bytes representing the new record/row 
249
233
   */
250
234
  void updateRecord(Session *in_session, 
251
235
                    Table *in_table, 
255
239
   * Creates a new DeleteRecord GPB message and pushes it to
256
240
   * replicators.
257
241
   *
258
 
   * @param in_session Pointer to the Session which has deleted a record
259
 
   * @param in_table Pointer to the Table containing delete information
260
 
   * @param use_update_record If true, uses the values from the update row instead
261
 
   */
262
 
  void deleteRecord(Session *in_session, Table *in_table, bool use_update_record= false);
263
 
 
264
 
  /**
265
 
   * Used to undo effects of a failed statement.
266
 
   *
267
 
   * An SQL statement, like an UPDATE, that affects multiple rows could
268
 
   * potentially fail mid-way through processing the rows. In such a case,
269
 
   * the successfully modified rows that preceeded the failing row would
270
 
   * have been added to the Statement message. This method is used for
271
 
   * rolling back that change.
272
 
   *
273
 
   * @note
274
 
   * This particular failure is seen on column constraint violations
275
 
   * during a multi-row UPDATE and a multi-row INSERT..SELECT.
276
 
   *
277
 
   * @param in_session Pointer to the Session containing the Statement
278
 
   * @param count The number of records to remove from Statement.
279
 
   *
280
 
   * @retval true Successfully removed 'count' records
281
 
   * @retval false Failure
282
 
   */
283
 
  bool removeStatementRecords(Session *in_session, uint32_t count);
284
 
 
 
242
   * @param Pointer to the Session which has deleted a record
 
243
   * @param Pointer to the Table containing delete information
 
244
   */
 
245
  void deleteRecord(Session *in_session, Table *in_table);
285
246
  /**
286
247
   * Creates a CreateSchema Statement GPB message and adds it
287
248
   * to the Session's active Transaction GPB message for pushing
288
249
   * out to the replicator streams.
289
250
   *
290
 
   * @param[in] in_session Pointer to the Session which issued the statement
291
 
   * @param[in] schema message::Schema message describing new schema
 
251
   * @param[in] Pointer to the Session which issued the statement
 
252
   * @param[in] message::Schema message describing new schema
292
253
   */
293
254
  void createSchema(Session *in_session, const message::Schema &schema);
294
255
  /**
296
257
   * to the Session's active Transaction GPB message for pushing
297
258
   * out to the replicator streams.
298
259
   *
299
 
   * @param[in] in_session Pointer to the Session which issued the statement
300
 
   * @param[in] schema_name message::Schema message describing new schema
 
260
   * @param[in] Pointer to the Session which issued the statement
 
261
   * @param[in] message::Schema message describing new schema
301
262
   */
302
263
  void dropSchema(Session *in_session, const std::string &schema_name);
303
264
  /**
305
266
   * to the Session's active Transaction GPB message for pushing
306
267
   * out to the replicator streams.
307
268
   *
308
 
   * @param[in] in_session Pointer to the Session which issued the statement
309
 
   * @param[in] table message::Table message describing new schema
 
269
   * @param[in] Pointer to the Session which issued the statement
 
270
   * @param[in] message::Table message describing new schema
310
271
   */
311
272
  void createTable(Session *in_session, const message::Table &table);
312
273
  /**
314
275
   * to the Session's active Transaction GPB message for pushing
315
276
   * out to the replicator streams.
316
277
   *
317
 
   * @param[in] in_session Pointer to the Session which issued the statement
318
 
   * @param[in] schema_name The schema of the table being dropped
319
 
   * @param[in] table_name The table name of the table being dropped
320
 
   * @param[in] if_exists Did the user specify an IF EXISTS clause?
 
278
   * @param[in] Pointer to the Session which issued the statement
 
279
   * @param[in] The schema of the table being dropped
 
280
   * @param[in] The table name of the table being dropped
 
281
   * @param[in] Did the user specify an IF EXISTS clause?
321
282
   */
322
283
  void dropTable(Session *in_session,
323
284
                     const std::string &schema_name,
328
289
   * to the Session's active Transaction GPB message for pushing
329
290
   * out to the replicator streams.
330
291
   *
331
 
   * @param[in] in_session Pointer to the Session which issued the statement
332
 
   * @param[in] in_table The Table being truncated
 
292
   * @param[in] Pointer to the Session which issued the statement
 
293
   * @param[in] The Table being truncated
333
294
   */
334
295
  void truncateTable(Session *in_session, Table *in_table);
335
296
  /**
341
302
   * on the I_S, etc.  Not sure what to do with administrative
342
303
   * commands like CHECK TABLE, though..
343
304
   *
344
 
   * @param in_session Pointer to the Session which issued the statement
345
 
   * @param query Query string
 
305
   * @param Pointer to the Session which issued the statement
 
306
   * @param Query string
346
307
   */
347
308
  void rawStatement(Session *in_session, const std::string &query);
348
309
  /* transactions: interface to plugin::StorageEngine functions */
372
333
   * per statement, and therefore should not need to be idempotent.
373
334
   * Put in assert()s to test this.
374
335
   *
375
 
   * @param[in] session Session pointer
376
 
   * @param[in] monitored Descriptor for the resource which will be participating
377
 
   * @param[in] engine Pointer to the TransactionalStorageEngine resource
 
336
   * @param[in] Session pointer
 
337
   * @param[in] Descriptor for the resource which will be participating
 
338
   * @param[in] Pointer to the TransactionalStorageEngine resource
378
339
   */
379
340
  void registerResourceForStatement(Session *session,
380
341
                                    plugin::MonitoredInTransaction *monitored,
394
355
   * per statement, and therefore should not need to be idempotent.
395
356
   * Put in assert()s to test this.
396
357
   *
397
 
   * @param[in] session Session pointer
398
 
   * @param[in] monitored Descriptor for the resource which will be participating
399
 
   * @param[in] engine Pointer to the TransactionalStorageEngine resource
400
 
   * @param[in] resource_manager Pointer to the XaResourceManager resource manager
 
358
   * @param[in] Session pointer
 
359
   * @param[in] Descriptor for the resource which will be participating
 
360
   * @param[in] Pointer to the TransactionalStorageEngine resource
 
361
   * @param[in] Pointer to the XaResourceManager resource manager
401
362
   */
402
363
  void registerResourceForStatement(Session *session,
403
364
                                    plugin::MonitoredInTransaction *monitored,
435
396
                                      plugin::MonitoredInTransaction *monitored,
436
397
                                      plugin::TransactionalStorageEngine *engine,
437
398
                                      plugin::XaResourceManager *resource_manager);
438
 
 
439
 
  uint64_t getCurrentTransactionId(Session *session);
440
 
 
441
 
  void allocateNewTransactionId();
442
 
 
443
 
  /**************
444
 
   * Events API
445
 
   **************/
446
 
 
447
 
  /**
448
 
   * Send server startup event.
449
 
   *
450
 
   * @param session Session pointer
451
 
   *
452
 
   * @retval true Success
453
 
   * @retval false Failure
454
 
   */
455
 
  bool sendStartupEvent(Session *session);
456
 
 
457
 
  /**
458
 
   * Send server shutdown event.
459
 
   *
460
 
   * @param session Session pointer
461
 
   *
462
 
   * @retval true Success
463
 
   * @retval false Failure
464
 
   */
465
 
  bool sendShutdownEvent(Session *session);
466
 
 
 
399
  TransactionId getNextTransactionId()
 
400
  {
 
401
    return current_transaction_id.increment();
 
402
  }
 
403
  TransactionId getCurrentTransactionId()
 
404
  {
 
405
    return current_transaction_id;
 
406
  }
 
407
  /**
 
408
   * DEBUG ONLY.  See plugin::TransactionLog::truncate()
 
409
   */
 
410
  void resetTransactionId()
 
411
  {
 
412
    current_transaction_id= 0;
 
413
  }
467
414
private:
468
 
 
469
 
  /**
470
 
   * Checks if a field has been updated 
471
 
   *
472
 
   * @param current_field Pointer to the field to check if it is updated 
473
 
   * @in_table Pointer to the Table containing update information
474
 
   * @param old_record Pointer to the raw bytes representing the old record/row
475
 
   * @param new_record Pointer to the raw bytes representing the new record/row
476
 
   */
477
 
  bool isFieldUpdated(Field *current_field,
478
 
                      Table *in_table,
479
 
                      const unsigned char *old_record,
480
 
                      const unsigned char *new_record);
481
 
 
482
 
  /**
483
 
   * Create a Transaction that contains event information and send it off.
484
 
   *
485
 
   * This differs from other uses of Transaction in that we don't use the
486
 
   * message associated with Session. We create a totally new message and
487
 
   * use it.
488
 
   *
489
 
   * @param session Session pointer
490
 
   * @param event Event message to send
491
 
   *
492
 
   * @note Used by the public Events API.
493
 
   *
494
 
   * @returns Non-zero on error
495
 
   */
496
 
  int sendEvent(Session *session, const message::Event &event);
497
 
 
498
 
  /**
499
 
   * Helper method which checks the UpdateHeader to determine 
500
 
   * if it needs to be finalized.  
501
 
   *
502
 
   * @param[in] statement Statement message container to check 
503
 
   * @param[in] in_table Pointer to the Table being updated
504
 
   * @param[in] old_record Pointer to the old data in the record
505
 
   * @param[in] new_record Pointer to the new data in the record
506
 
   */
507
 
  bool useExistingUpdateHeader(message::Statement &statement,
508
 
                               Table *in_table,
509
 
                               const unsigned char *old_record,
510
 
                               const unsigned char *new_record);
511
 
 
512
 
  plugin::XaStorageEngine *xa_storage_engine;
 
415
  atomic<TransactionId> current_transaction_id;
513
416
};
514
417
 
515
418
} /* namespace drizzled */