21
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
#ifndef DRIZZLED_REPLICATION_SERVICES_H
25
#define DRIZZLED_REPLICATION_SERVICES_H
27
#include "drizzled/atomics.h"
29
#include "drizzled/message/transaction.pb.h"
37
/* some forward declarations needed */
43
class TransactionReplicator;
44
class TransactionApplier;
48
* This is a class which manages transforming internal
49
* transactional events into GPB messages and sending those
50
* events out through registered replicators and appliers.
52
class ReplicationServices
55
static const size_t DEFAULT_RECORD_SIZE= 100;
56
typedef uint64_t GlobalTransactionId;
58
* Types of messages that can go in the transaction
59
* log file. Every time something is written into the
60
* transaction log, it is preceded by a header containing
61
* the type of message which follows.
65
TRANSACTION= 1, /* A GPB Transaction Message */
66
BLOB= 2 /* A BLOB value */
68
typedef std::vector<plugin::TransactionReplicator *> Replicators;
69
typedef std::vector<plugin::TransactionApplier *> Appliers;
72
* Atomic boolean set to true if any *active* replicators
73
* or appliers are actually registered.
75
atomic<bool> is_active;
77
* The timestamp of the last time a Transaction message was successfully
78
* applied (sent to an Applier)
80
atomic<uint64_t> last_applied_timestamp;
81
/** Our collection of replicator plugins */
82
Replicators replicators;
83
/** Our collection of applier plugins */
86
* Helper method which is called after any change in the
87
* registered appliers or replicators to evaluate whether
88
* any remaining plugins are actually active.
90
* This method properly sets the is_active member variable.
92
void evaluateActivePlugins();
94
* Helper method which returns the active Transaction message
95
* for the supplied Session. If one is not found, a new Transaction
96
* message is allocated, initialized, and returned.
98
* @param The session processing the transaction
100
message::Transaction *getActiveTransaction(Session *in_session) const;
102
* Helper method which attaches a transaction context
103
* the supplied transaction based on the supplied Session's
104
* transaction information. This method also ensure the
105
* transaction message is attached properly to the Session object
107
* @param The transaction message to initialize
108
* @param The Session processing this transaction
110
void initTransaction(message::Transaction &in_command, Session *in_session) const;
112
* Helper method which finalizes data members for the
113
* supplied transaction's context.
115
* @param The transaction message to finalize
116
* @param The Session processing this transaction
118
void finalizeTransaction(message::Transaction &in_command, Session *in_session) const;
120
* Helper method which deletes transaction memory and
121
* unsets Session's transaction and statement messages.
123
void cleanupTransaction(message::Transaction *in_transaction,
124
Session *in_session) const;
126
* Returns true if the transaction contains any Statement
127
* messages which are not end segments (i.e. a bulk statement has
128
* previously been sent to replicators).
130
* @param The transaction to check
132
bool transactionContainsBulkSegment(const message::Transaction &transaction) const;
134
* Helper method which initializes a Statement message
136
* @param The statement to initialize
137
* @param The type of the statement
138
* @param The session processing this statement
140
void initStatement(message::Statement &statement,
141
message::Statement::Type in_type,
142
Session *in_session) const;
144
* Helper method which returns an initialized Statement
145
* message for methods doing insertion of data.
147
* @param[in] Pointer to the Session doing the processing
148
* @param[in] Pointer to the Table object being inserted into
150
message::Statement &getInsertStatement(Session *in_session,
151
Table *in_table) const;
154
* Helper method which initializes the header message for
157
* @param[inout] Statement message container to modify
158
* @param[in] Pointer to the Session doing the processing
159
* @param[in] Pointer to the Table being inserted into
161
void setInsertHeader(message::Statement &statement,
163
Table *in_table) const;
165
* Helper method which returns an initialized Statement
166
* message for methods doing updates of data.
168
* @param[in] Pointer to the Session doing the processing
169
* @param[in] Pointer to the Table object being updated
170
* @param[in] Pointer to the old data in the record
171
* @param[in] Pointer to the new data in the record
173
message::Statement &getUpdateStatement(Session *in_session,
175
const unsigned char *old_record,
176
const unsigned char *new_record) const;
178
* Helper method which initializes the header message for
181
* @param[inout] Statement message container to modify
182
* @param[in] Pointer to the Session doing the processing
183
* @param[in] Pointer to the Table being updated
184
* @param[in] Pointer to the old data in the record
185
* @param[in] Pointer to the new data in the record
187
void setUpdateHeader(message::Statement &statement,
190
const unsigned char *old_record,
191
const unsigned char *new_record) const;
193
* Helper method which returns an initialized Statement
194
* message for methods doing deletion of data.
196
* @param[in] Pointer to the Session doing the processing
197
* @param[in] Pointer to the Table object being deleted from
199
message::Statement &getDeleteStatement(Session *in_session,
200
Table *in_table) const;
203
* Helper method which initializes the header message for
206
* @param[inout] Statement message container to modify
207
* @param[in] Pointer to the Session doing the processing
208
* @param[in] Pointer to the Table being deleted from
210
void setDeleteHeader(message::Statement &statement,
212
Table *in_table) const;
214
* Helper method which pushes a constructed message out
215
* to the registered replicator and applier plugins.
217
* @param Message to push out
219
void push(message::Transaction &to_push);
224
ReplicationServices();
228
* Returns the singleton instance of ReplicationServices
230
static inline ReplicationServices &singleton()
232
static ReplicationServices replication_services;
233
return replication_services;
237
* Returns whether the ReplicationServices object
238
* is active. In other words, does it have both
239
* a replicator and an applier that are *active*?
241
bool isActive() const;
243
* Attaches a replicator to our internal collection of
246
* @param Pointer to a replicator to attach/register
248
void attachReplicator(plugin::TransactionReplicator *in_replicator);
250
* Detaches/unregisters a replicator with our internal
251
* collection of replicators.
253
* @param Pointer to the replicator to detach
255
void detachReplicator(plugin::TransactionReplicator *in_replicator);
257
* Attaches a applier to our internal collection of
260
* @param Pointer to a applier to attach/register
262
void attachApplier(plugin::TransactionApplier *in_applier);
264
* Detaches/unregisters a applier with our internal
265
* collection of appliers.
267
* @param Pointer to the applier to detach
269
void detachApplier(plugin::TransactionApplier *in_applier);
271
* Commits a normal transaction (see above) and pushes the
272
* transaction message out to the replicators.
274
* @param Pointer to the Session committing the transaction
276
void commitTransaction(Session *in_session);
278
* Marks the current active transaction message as being rolled
279
* back and pushes the transaction message out to replicators.
281
* @param Pointer to the Session committing the transaction
283
void rollbackTransaction(Session *in_session);
285
* Finalizes a Statement message and sets the Session's statement
288
* @param The statement to initialize
289
* @param The session processing this statement
291
void finalizeStatement(message::Statement &statement,
292
Session *in_session) const;
294
* Creates a new InsertRecord GPB message and pushes it to
297
* @param Pointer to the Session which has inserted a record
298
* @param Pointer to the Table containing insert information
300
* Grr, returning "true" here on error because of the cursor
301
* reversed bool return crap...fix that.
303
bool insertRecord(Session *in_session, Table *in_table);
305
* Creates a new UpdateRecord GPB message and pushes it to
308
* @param Pointer to the Session which has updated a record
309
* @param Pointer to the Table containing update information
310
* @param Pointer to the raw bytes representing the old record/row
311
* @param Pointer to the raw bytes representing the new record/row
313
void updateRecord(Session *in_session,
315
const unsigned char *old_record,
316
const unsigned char *new_record);
318
* Creates a new DeleteRecord GPB message and pushes it to
321
* @param Pointer to the Session which has deleted a record
322
* @param Pointer to the Table containing delete information
324
void deleteRecord(Session *in_session, Table *in_table);
326
* Creates a TruncateTable Statement GPB message and add it
327
* to the Session's active Transaction GPB message for pushing
328
* out to the replicator streams.
330
* @param[in] Pointer to the Session which issued the statement
331
* @param[in] The Table being truncated
333
void truncateTable(Session *in_session, Table *in_table);
335
* Creates a new RawSql GPB message and pushes it to
338
* @TODO With a real data dictionary, this really shouldn't
339
* be needed. CREATE TABLE would map to insertRecord call
340
* on the I_S, etc. Not sure what to do with administrative
341
* commands like CHECK TABLE, though..
343
* @param Pointer to the Session which issued the statement
344
* @param Query string
346
void rawStatement(Session *in_session, const std::string &query);
348
* Returns the timestamp of the last Transaction which was sent to
351
uint64_t getLastAppliedTimestamp() const;
354
} /* namespace drizzled */
356
#endif /* DRIZZLED_REPLICATION_SERVICES_H */
20
#ifndef DRIZZLED_REPLICATOR_H
21
#define DRIZZLED_REPLICATOR_H
23
#include <drizzled/plugin_replicator.h>
25
int replicator_initializer (st_plugin_int *plugin);
26
int replicator_finalizer (st_plugin_int *plugin);
28
/* todo, fill in this API */
29
/* these are the functions called by the rest of the drizzle server
30
to do whatever this plugin does. */
31
bool replicator_session_init (Session *session);
32
bool replicator_write_row(Session *session, Table *table);
33
bool replicator_update_row(Session *session, Table *table,
34
const unsigned char *before,
35
const unsigned char *after);
36
bool replicator_delete_row(Session *session, Table *table);
38
/* The below control transactions */
39
bool replicator_end_transaction(Session *session, bool autocommit, bool commit);
40
bool replicator_rollback_to_savepoint(Session *session, void *save_point);
41
bool replicator_savepoint_set(Session *session, void *save_point);
42
bool replicator_prepare(Session *session);
44
#endif /* DRIZZLED_REPLICATOR_H */