~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.h

  • Committer: Monty Taylor
  • Date: 2009-12-25 02:37:09 UTC
  • mto: (1253.2.3 out-of-tree)
  • mto: This revision was merged to the branch mainline in revision 1258.
  • Revision ID: mordred@inaugust.com-20091225023709-r5xr0agiyzyfbhin
pandora-buildĀ v0.88

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
8
 *    Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
26
25
#define DRIZZLED_REPLICATION_SERVICES_H
27
26
 
28
27
#include "drizzled/atomics.h"
29
 
#include "drizzled/plugin/replication.h"
30
 
 
31
 
#include <string>
 
28
 
 
29
#include "drizzled/message/transaction.pb.h"
 
30
 
32
31
#include <vector>
33
 
#include <utility>
34
 
 
35
 
#include "drizzled/visibility.h"
36
 
 
37
 
namespace drizzled
38
 
{
39
32
 
40
33
/* some forward declarations needed */
41
34
class Session;
42
35
class Table;
43
36
 
44
 
namespace plugin
45
 
{
46
 
  class TransactionReplicator;
47
 
  class TransactionApplier;
48
 
}
49
 
namespace message
50
 
{
51
 
  class Transaction;
52
 
}
 
37
namespace drizzled
 
38
{
 
39
  namespace plugin
 
40
  {
 
41
    class TransactionReplicator;
 
42
    class TransactionApplier;
 
43
  }
53
44
 
54
45
/**
55
46
 * This is a class which manages transforming internal 
56
47
 * transactional events into GPB messages and sending those
57
48
 * events out through registered replicators and appliers.
58
49
 */
59
 
class DRIZZLED_API ReplicationServices
 
50
class ReplicationServices
60
51
{
61
52
public:
 
53
  static const size_t DEFAULT_RECORD_SIZE= 100;
62
54
  typedef uint64_t GlobalTransactionId;
63
55
  /**
64
56
   * Types of messages that can go in the transaction
71
63
    TRANSACTION= 1, /* A GPB Transaction Message */
72
64
    BLOB= 2 /* A BLOB value */
73
65
  };
74
 
  typedef std::pair<plugin::TransactionReplicator *, plugin::TransactionApplier *> ReplicationPair;
75
 
  typedef std::vector<ReplicationPair> ReplicationStreams;
76
 
  /**
77
 
   * Method which is called after plugins have been loaded but
78
 
   * before the first client connects.  It determines if the registration
79
 
   * of applier and replicator plugins is proper and pairs
80
 
   * the applier and requested replicator plugins into the replication
81
 
   * streams.
82
 
   *
83
 
   * @todo
84
 
   *
85
 
   * This is only necessary because we don't yet have plugin dependency
86
 
   * tracking...
87
 
   */
88
 
  bool evaluateRegisteredPlugins();
89
 
  /** 
90
 
   * Helper method which pushes a constructed message out to the registered
91
 
   * replicator and applier plugins.
92
 
   *
93
 
   * @param Session descriptor
 
66
  typedef std::vector<plugin::TransactionReplicator *> Replicators;
 
67
  typedef std::vector<plugin::TransactionApplier *> Appliers;
 
68
private:
 
69
  /** 
 
70
   * Atomic boolean set to true if any *active* replicators
 
71
   * or appliers are actually registered.
 
72
   */
 
73
  atomic<bool> is_active;
 
74
  /**
 
75
   * The timestamp of the last time a Transaction message was successfully
 
76
   * applied (sent to an Applier)
 
77
   */
 
78
  atomic<uint64_t> last_applied_timestamp;
 
79
  /** Our collection of replicator plugins */
 
80
  Replicators replicators;
 
81
  /** Our collection of applier plugins */
 
82
  Appliers appliers;
 
83
  /**
 
84
   * Helper method which is called after any change in the
 
85
   * registered appliers or replicators to evaluate whether
 
86
   * any remaining plugins are actually active.
 
87
   * 
 
88
   * This method properly sets the is_active member variable.
 
89
   */
 
90
  void evaluateActivePlugins();
 
91
  /**
 
92
   * Helper method which returns the active Transaction message
 
93
   * for the supplied Session.  If one is not found, a new Transaction
 
94
   * message is allocated, initialized, and returned.
 
95
   *
 
96
   * @param The session processing the transaction
 
97
   */
 
98
  drizzled::message::Transaction *getActiveTransaction(Session *in_session) const;
 
99
  /** 
 
100
   * Helper method which attaches a transaction context
 
101
   * the supplied transaction based on the supplied Session's
 
102
   * transaction information.  This method also ensure the
 
103
   * transaction message is attached properly to the Session object
 
104
   *
 
105
   * @param The transaction message to initialize
 
106
   * @param The Session processing this transaction
 
107
   */
 
108
  void initTransaction(drizzled::message::Transaction &in_command, Session *in_session) const;
 
109
  /** 
 
110
   * Helper method which finalizes data members for the 
 
111
   * supplied transaction's context.
 
112
   *
 
113
   * @param The transaction message to finalize 
 
114
   * @param The Session processing this transaction
 
115
   */
 
116
  void finalizeTransaction(drizzled::message::Transaction &in_command, Session *in_session) const;
 
117
  /**
 
118
   * Helper method which deletes transaction memory and
 
119
   * unsets Session's transaction and statement messages.
 
120
   */
 
121
  void cleanupTransaction(message::Transaction *in_transaction,
 
122
                          Session *in_session) const;
 
123
  /**
 
124
   * Returns true if the transaction contains any Statement
 
125
   * messages which are not end segments (i.e. a bulk statement has
 
126
   * previously been sent to replicators).
 
127
   *
 
128
   * @param The transaction to check
 
129
   */
 
130
  bool transactionContainsBulkSegment(const drizzled::message::Transaction &transaction) const;
 
131
  /**
 
132
   * Helper method which initializes a Statement message
 
133
   *
 
134
   * @param The statement to initialize
 
135
   * @param The type of the statement
 
136
   * @param The session processing this statement
 
137
   */
 
138
  void initStatement(drizzled::message::Statement &statement,
 
139
                     drizzled::message::Statement::Type in_type,
 
140
                     Session *in_session) const;
 
141
  /**
 
142
   * Helper method which returns an initialized Statement
 
143
   * message for methods doing insertion of data.
 
144
   *
 
145
   * @param[in] Pointer to the Session doing the processing
 
146
   * @param[in] Pointer to the Table object being inserted into
 
147
   */
 
148
  message::Statement &getInsertStatement(Session *in_session,
 
149
                                         Table *in_table) const;
 
150
 
 
151
  /**
 
152
   * Helper method which initializes the header message for
 
153
   * insert operations.
 
154
   *
 
155
   * @param[inout] Statement message container to modify
 
156
   * @param[in] Pointer to the Session doing the processing
 
157
   * @param[in] Pointer to the Table being inserted into
 
158
   */
 
159
  void setInsertHeader(message::Statement &statement,
 
160
                       Session *in_session,
 
161
                       Table *in_table) const;
 
162
  /**
 
163
   * Helper method which returns an initialized Statement
 
164
   * message for methods doing updates of data.
 
165
   *
 
166
   * @param[in] Pointer to the Session doing the processing
 
167
   * @param[in] Pointer to the Table object being updated
 
168
   * @param[in] Pointer to the old data in the record
 
169
   * @param[in] Pointer to the new data in the record
 
170
   */
 
171
  message::Statement &getUpdateStatement(Session *in_session,
 
172
                                         Table *in_table,
 
173
                                         const unsigned char *old_record, 
 
174
                                         const unsigned char *new_record) const;
 
175
  /**
 
176
   * Helper method which initializes the header message for
 
177
   * update operations.
 
178
   *
 
179
   * @param[inout] Statement message container to modify
 
180
   * @param[in] Pointer to the Session doing the processing
 
181
   * @param[in] Pointer to the Table being updated
 
182
   * @param[in] Pointer to the old data in the record
 
183
   * @param[in] Pointer to the new data in the record
 
184
   */
 
185
  void setUpdateHeader(message::Statement &statement,
 
186
                       Session *in_session,
 
187
                       Table *in_table,
 
188
                       const unsigned char *old_record, 
 
189
                       const unsigned char *new_record) const;
 
190
  /**
 
191
   * Helper method which returns an initialized Statement
 
192
   * message for methods doing deletion of data.
 
193
   *
 
194
   * @param[in] Pointer to the Session doing the processing
 
195
   * @param[in] Pointer to the Table object being deleted from
 
196
   */
 
197
  message::Statement &getDeleteStatement(Session *in_session,
 
198
                                         Table *in_table) const;
 
199
 
 
200
  /**
 
201
   * Helper method which initializes the header message for
 
202
   * insert operations.
 
203
   *
 
204
   * @param[inout] Statement message container to modify
 
205
   * @param[in] Pointer to the Session doing the processing
 
206
   * @param[in] Pointer to the Table being deleted from
 
207
   */
 
208
  void setDeleteHeader(message::Statement &statement,
 
209
                       Session *in_session,
 
210
                       Table *in_table) const;
 
211
  /**
 
212
   * Helper method which pushes a constructed message out
 
213
   * to the registered replicator and applier plugins.
 
214
   *
94
215
   * @param Message to push out
95
216
   */
96
 
  plugin::ReplicationReturnCode pushTransactionMessage(Session &in_session,
97
 
                                                       message::Transaction &to_push);
 
217
  void push(drizzled::message::Transaction &to_push);
 
218
public:
98
219
  /**
99
220
   * Constructor
100
221
   */
116
237
   * a replicator and an applier that are *active*?
117
238
   */
118
239
  bool isActive() const;
119
 
 
120
 
  /**
121
 
   * Returns the list of replication streams
122
 
   */
123
 
  ReplicationStreams &getReplicationStreams();
124
 
 
125
240
  /**
126
241
   * Attaches a replicator to our internal collection of
127
242
   * replicators.
128
243
   *
129
244
   * @param Pointer to a replicator to attach/register
130
245
   */
131
 
  void attachReplicator(plugin::TransactionReplicator *in_replicator);
132
 
  
 
246
  void attachReplicator(drizzled::plugin::TransactionReplicator *in_replicator);
133
247
  /**
134
248
   * Detaches/unregisters a replicator with our internal
135
249
   * collection of replicators.
136
250
   *
137
251
   * @param Pointer to the replicator to detach
138
252
   */
139
 
  void detachReplicator(plugin::TransactionReplicator *in_replicator);
140
 
  
 
253
  void detachReplicator(drizzled::plugin::TransactionReplicator *in_replicator);
141
254
  /**
142
255
   * Attaches a applier to our internal collection of
143
256
   * appliers.
144
257
   *
145
258
   * @param Pointer to a applier to attach/register
146
 
   * @param The name of the replicator to pair with
147
259
   */
148
 
  void attachApplier(plugin::TransactionApplier *in_applier, const std::string &requested_replicator);
149
 
  
 
260
  void attachApplier(drizzled::plugin::TransactionApplier *in_applier);
150
261
  /**
151
262
   * Detaches/unregisters a applier with our internal
152
263
   * collection of appliers.
153
264
   *
154
265
   * @param Pointer to the applier to detach
155
266
   */
156
 
  void detachApplier(plugin::TransactionApplier *in_applier);
157
 
 
158
 
  /** 
159
 
   * Returns the timestamp of the last Transaction which was sent to an
160
 
   * applier.
 
267
  void detachApplier(drizzled::plugin::TransactionApplier *in_applier);
 
268
  /**
 
269
   * Commits a normal transaction (see above) and pushes the
 
270
   * transaction message out to the replicators.
 
271
   *
 
272
   * @param Pointer to the Session committing the transaction
 
273
   */
 
274
  void commitTransaction(Session *in_session);
 
275
  /**
 
276
   * Marks the current active transaction message as being rolled
 
277
   * back and pushes the transaction message out to replicators.
 
278
   *
 
279
   * @param Pointer to the Session committing the transaction
 
280
   */
 
281
  void rollbackTransaction(Session *in_session);
 
282
  /**
 
283
   * Finalizes a Statement message and sets the Session's statement
 
284
   * message to NULL.
 
285
   *
 
286
   * @param The statement to initialize
 
287
   * @param The session processing this statement
 
288
   */
 
289
  void finalizeStatement(drizzled::message::Statement &statement,
 
290
                         Session *in_session) const;
 
291
  /**
 
292
   * Creates a new InsertRecord GPB message and pushes it to
 
293
   * replicators.
 
294
   *
 
295
   * @param Pointer to the Session which has inserted a record
 
296
   * @param Pointer to the Table containing insert information
 
297
   *
 
298
   * Grr, returning "true" here on error because of the cursor
 
299
   * reversed bool return crap...fix that.
 
300
   */
 
301
  bool insertRecord(Session *in_session, Table *in_table);
 
302
  /**
 
303
   * Creates a new UpdateRecord GPB message and pushes it to
 
304
   * replicators.
 
305
   *
 
306
   * @param Pointer to the Session which has updated a record
 
307
   * @param Pointer to the Table containing update information
 
308
   * @param Pointer to the raw bytes representing the old record/row
 
309
   * @param Pointer to the raw bytes representing the new record/row 
 
310
   */
 
311
  void updateRecord(Session *in_session, 
 
312
                    Table *in_table, 
 
313
                    const unsigned char *old_record, 
 
314
                    const unsigned char *new_record);
 
315
  /**
 
316
   * Creates a new DeleteRecord GPB message and pushes it to
 
317
   * replicators.
 
318
   *
 
319
   * @param Pointer to the Session which has deleted a record
 
320
   * @param Pointer to the Table containing delete information
 
321
   */
 
322
  void deleteRecord(Session *in_session, Table *in_table);
 
323
  /**
 
324
   * Creates a TruncateTable Statement GPB message and add it
 
325
   * to the Session's active Transaction GPB message for pushing
 
326
   * out to the replicator streams.
 
327
   *
 
328
   * @param[in] Pointer to the Session which issued the statement
 
329
   * @param[in] The Table being truncated
 
330
   */
 
331
  void truncateTable(Session *in_session, Table *in_table);
 
332
  /**
 
333
   * Creates a new RawSql GPB message and pushes it to 
 
334
   * replicators.
 
335
   *
 
336
   * @TODO With a real data dictionary, this really shouldn't
 
337
   * be needed.  CREATE TABLE would map to insertRecord call
 
338
   * on the I_S, etc.  Not sure what to do with administrative
 
339
   * commands like CHECK TABLE, though..
 
340
   *
 
341
   * @param Pointer to the Session which issued the statement
 
342
   * @param Query string
 
343
   * @param Length of the query string
 
344
   */
 
345
  void rawStatement(Session *in_session, const char *in_query, size_t in_query_len);
 
346
  /**
 
347
   * Returns the timestamp of the last Transaction which was sent to 
 
348
   * an applier.
161
349
   */
162
350
  uint64_t getLastAppliedTimestamp() const;
163
 
private:
164
 
  typedef std::vector<plugin::TransactionReplicator *> Replicators;
165
 
  typedef std::vector<std::pair<std::string, plugin::TransactionApplier *> > Appliers;
166
 
  /** 
167
 
   * Atomic boolean set to true if any *active* replicators
168
 
   * or appliers are actually registered.
169
 
   */
170
 
  bool is_active;
171
 
  /**
172
 
   * The timestamp of the last time a Transaction message was successfully
173
 
   * applied (sent to an Applier)
174
 
   */
175
 
  atomic<uint64_t> last_applied_timestamp;
176
 
  /** Our collection of registered replicator plugins */
177
 
  Replicators replicators;
178
 
  /** Our collection of registered applier plugins and their requested replicator plugin names */
179
 
  Appliers appliers;
180
 
  /** Our replication streams */
181
 
  ReplicationStreams replication_streams;
182
 
  /**
183
 
   * Strips underscores and lowercases supplied replicator name
184
 
   * or requested name, and appends the suffix "replicator" if missing...
185
 
   */
186
 
  void normalizeReplicatorName(std::string &name);
187
351
};
188
352
 
189
 
} /* namespace drizzled */
 
353
} /* end namespace drizzled */
190
354
 
191
355
#endif /* DRIZZLED_REPLICATION_SERVICES_H */