~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.h

  • Committer: Jay Pipes
  • Date: 2009-10-07 23:59:47 UTC
  • mto: (1234.1.1 push) (1237.2.10 push)
  • mto: This revision was merged to the branch mainline in revision 1193.
  • Revision ID: jpipes@serialcoder-20091007235947-18simrecnzwv8t1q
Phase 2 new replication work:

* Removes old replication.proto file, old command_transform library
* Removes use of korr.h macro calls in favor of GPB's CodedOutputStream
  API.
* Updates transaction_log, default_replicator, and filtered_replicator module
  to use new Transaction message.
* Updates ReplicationServices to construct the new Transaction messages and
  associated Statement sub-messages
* Corrects transaction boundaries.  AUTOCOMMIT now works properly, and I have
  added a new test case to verify AUTOCOMMIT variable modification of the way
  in which Transaction messages are bundled up and sent across to replicators.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
#define DRIZZLED_REPLICATION_SERVICES_H
26
26
 
27
27
#include "drizzled/atomics.h"
 
28
 
 
29
#include "drizzled/message/transaction.pb.h"
 
30
 
28
31
#include <vector>
29
32
 
30
33
/* some forward declarations needed */
35
38
{
36
39
  namespace plugin
37
40
  {
38
 
    class CommandReplicator;
39
 
    class CommandApplier;
40
 
  }
41
 
  namespace message
42
 
  {
43
 
    class Command;
 
41
    class TransactionReplicator;
 
42
    class TransactionApplier;
44
43
  }
45
44
 
46
45
 
61
60
   */
62
61
  atomic<bool> is_active;
63
62
  /**
64
 
   * The timestamp of the last time a Command message was successfully
 
63
   * The timestamp of the last time a Transaction message was successfully
65
64
   * applied (sent to an Applier)
66
65
   */
67
66
  atomic<uint64_t> last_applied_timestamp;
68
67
  /** Our collection of replicator plugins */
69
 
  std::vector<drizzled::plugin::CommandReplicator *> replicators;
 
68
  std::vector<drizzled::plugin::TransactionReplicator *> replicators;
70
69
  /** Our collection of applier plugins */
71
 
  std::vector<drizzled::plugin::CommandApplier *> appliers;
 
70
  std::vector<drizzled::plugin::TransactionApplier *> appliers;
72
71
  /**
73
72
   * Helper method which is called after any change in the
74
73
   * registered appliers or replicators to evaluate whether
77
76
   * This method properly sets the is_active member variable.
78
77
   */
79
78
  void evaluateActivePlugins();
 
79
  /**
 
80
   * Helper method which returns the active Transaction message
 
81
   * for the supplied Session.  If one is not found, a new Transaction
 
82
   * message is allocated, initialized, and returned.
 
83
   *
 
84
   * @param The session processing the transaction
 
85
   */
 
86
  drizzled::message::Transaction *getActiveTransaction(Session *in_session) const;
80
87
  /** 
81
88
   * Helper method which attaches a transaction context
82
 
   * the supplied command based on the supplied Session's
83
 
   * transaction information.
84
 
   */
85
 
  void setCommandTransactionContext(drizzled::message::Command &in_command, Session *in_session) const;
 
89
   * the supplied transaction based on the supplied Session's
 
90
   * transaction information.  This method also ensure the
 
91
   * transaction message is attached properly to the Session object
 
92
   *
 
93
   * @param The transaction message to initialize
 
94
   * @param The Session processing this transaction
 
95
   */
 
96
  void initTransaction(drizzled::message::Transaction &in_command, Session *in_session) const;
 
97
  /** 
 
98
   * Helper method which finalizes data members for the 
 
99
   * supplied transaction's context.
 
100
   *
 
101
   * @param The transaction message to finalize 
 
102
   * @param The Session processing this transaction
 
103
   */
 
104
  void finalizeTransaction(drizzled::message::Transaction &in_command, Session *in_session) const;
 
105
  /**
 
106
   * Helper method which deletes transaction memory and
 
107
   * unsets Session's transaction and statement messages.
 
108
   */
 
109
  void cleanupTransaction(message::Transaction *in_transaction,
 
110
                          Session *in_session) const;
 
111
  /**
 
112
   * Helper method which initializes a Statement message
 
113
   *
 
114
   * @param The statement to initialize
 
115
   * @param The type of the statement
 
116
   * @param The session processing this statement
 
117
   */
 
118
  void initStatement(drizzled::message::Statement &statement,
 
119
                     drizzled::message::Statement::Type in_type,
 
120
                     Session *in_session) const;
 
121
  /**
 
122
   * Helper method which finalizes a Statement message
 
123
   *
 
124
   * @param The statement to initialize
 
125
   * @param The session processing this statement
 
126
   */
 
127
  void finalizeStatement(drizzled::message::Statement &statement,
 
128
                         Session *in_session) const;
 
129
  /**
 
130
   * Helper method which returns an initialized Statement
 
131
   * message for methods doing insertion of data.
 
132
   *
 
133
   * @param[in] Pointer to the Session doing the processing
 
134
   * @param[in] Pointer to the Table object being inserted into
 
135
   */
 
136
  message::Statement &getInsertStatement(Session *in_session,
 
137
                                         Table *in_table) const;
 
138
 
 
139
  /**
 
140
   * Helper method which initializes the header message for
 
141
   * insert operations.
 
142
   *
 
143
   * @param[inout] Statement message container to modify
 
144
   * @param[in] Pointer to the Session doing the processing
 
145
   * @param[in] Pointer to the Table being inserted into
 
146
   */
 
147
  void setInsertHeader(message::Statement &statement,
 
148
                       Session *in_session,
 
149
                       Table *in_table) const;
 
150
  /**
 
151
   * Helper method which returns an initialized Statement
 
152
   * message for methods doing updates of data.
 
153
   *
 
154
   * @param[in] Pointer to the Session doing the processing
 
155
   * @param[in] Pointer to the Table object being updated
 
156
   * @param[in] Pointer to the old data in the record
 
157
   * @param[in] Pointer to the new data in the record
 
158
   */
 
159
  message::Statement &getUpdateStatement(Session *in_session,
 
160
                                         Table *in_table,
 
161
                                         const unsigned char *old_record, 
 
162
                                         const unsigned char *new_record) const;
 
163
  /**
 
164
   * Helper method which initializes the header message for
 
165
   * update operations.
 
166
   *
 
167
   * @param[inout] Statement message container to modify
 
168
   * @param[in] Pointer to the Session doing the processing
 
169
   * @param[in] Pointer to the Table being updated
 
170
   * @param[in] Pointer to the old data in the record
 
171
   * @param[in] Pointer to the new data in the record
 
172
   */
 
173
  void setUpdateHeader(message::Statement &statement,
 
174
                       Session *in_session,
 
175
                       Table *in_table,
 
176
                       const unsigned char *old_record, 
 
177
                       const unsigned char *new_record) const;
 
178
  /**
 
179
   * Helper method which returns an initialized Statement
 
180
   * message for methods doing deletion of data.
 
181
   *
 
182
   * @param[in] Pointer to the Session doing the processing
 
183
   * @param[in] Pointer to the Table object being deleted from
 
184
   */
 
185
  message::Statement &getDeleteStatement(Session *in_session,
 
186
                                         Table *in_table) const;
 
187
 
 
188
  /**
 
189
   * Helper method which initializes the header message for
 
190
   * insert operations.
 
191
   *
 
192
   * @param[inout] Statement message container to modify
 
193
   * @param[in] Pointer to the Session doing the processing
 
194
   * @param[in] Pointer to the Table being deleted from
 
195
   */
 
196
  void setDeleteHeader(message::Statement &statement,
 
197
                       Session *in_session,
 
198
                       Table *in_table) const;
86
199
  /**
87
200
   * Helper method which pushes a constructed message out
88
201
   * to the registered replicator and applier plugins.
89
202
   *
90
203
   * @param Message to push out
91
204
   */
92
 
  void push(drizzled::message::Command &to_push);
 
205
  void push(drizzled::message::Transaction &to_push);
93
206
public:
94
207
  /**
95
208
   * Constructor
118
231
   *
119
232
   * @param Pointer to a replicator to attach/register
120
233
   */
121
 
  void attachReplicator(drizzled::plugin::CommandReplicator *in_replicator);
 
234
  void attachReplicator(drizzled::plugin::TransactionReplicator *in_replicator);
122
235
  /**
123
236
   * Detaches/unregisters a replicator with our internal
124
237
   * collection of replicators.
125
238
   *
126
239
   * @param Pointer to the replicator to detach
127
240
   */
128
 
  void detachReplicator(drizzled::plugin::CommandReplicator *in_replicator);
 
241
  void detachReplicator(drizzled::plugin::TransactionReplicator *in_replicator);
129
242
  /**
130
243
   * Attaches a applier to our internal collection of
131
244
   * appliers.
132
245
   *
133
246
   * @param Pointer to a applier to attach/register
134
247
   */
135
 
  void attachApplier(drizzled::plugin::CommandApplier *in_applier);
 
248
  void attachApplier(drizzled::plugin::TransactionApplier *in_applier);
136
249
  /**
137
250
   * Detaches/unregisters a applier with our internal
138
251
   * collection of appliers.
139
252
   *
140
253
   * @param Pointer to the applier to detach
141
254
   */
142
 
  void detachApplier(drizzled::plugin::CommandApplier *in_applier);
 
255
  void detachApplier(drizzled::plugin::TransactionApplier *in_applier);
143
256
  /**
144
 
   * Creates a new StartTransaction GPB message and pushes
145
 
   * it to replicators.
 
257
   * Creates a new Transaction GPB message and attaches the message
 
258
   * to the supplied session object.
 
259
   *
 
260
   * @note
 
261
   *
 
262
   * This method is called when a "normal" transaction -- i.e. an 
 
263
   * explicitly-started transaction from a client -- is started with 
 
264
   * BEGIN or START TRANSACTION.
146
265
   *
147
266
   * @param Pointer to the Session starting the transaction
148
267
   */
149
 
  void startTransaction(Session *in_session);
 
268
  void startNormalTransaction(Session *in_session);
150
269
  /**
151
 
   * Creates a new CommitTransaction GPB message and pushes
152
 
   * it to replicators.
 
270
   * Commits a normal transaction (see above) and pushes the
 
271
   * transaction message out to the replicators.
153
272
   *
154
273
   * @param Pointer to the Session committing the transaction
155
274
   */
156
 
  void commitTransaction(Session *in_session);
 
275
  void commitNormalTransaction(Session *in_session);
157
276
  /**
158
 
   * Creates a new RollbackTransaction GPB message and pushes
159
 
   * it to replicators.
 
277
   * Marks the current active transaction message as being rolled
 
278
   * back and pushes the transaction message out to replicators.
160
279
   *
161
280
   * @param Pointer to the Session committing the transaction
162
281
   */
205
324
   */
206
325
  void rawStatement(Session *in_session, const char *in_query, size_t in_query_len);
207
326
  /**
208
 
   * Returns the timestamp of the last Command which was sent to 
 
327
   * Returns the timestamp of the last Transaction which was sent to 
209
328
   * an applier.
210
329
   */
211
330
  uint64_t getLastAppliedTimestamp() const;