~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.h

  • Committer: Brian Aker
  • Date: 2009-11-18 22:58:22 UTC
  • mto: (1223.1.1 push) (1226.1.2 push)
  • mto: This revision was merged to the branch mainline in revision 1224.
  • Revision ID: brian@gaz-20091118225822-4ryr9rviir23o0kr
Second pass through bugs related to CREATE TABLE LIKE

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 
 *  Copyright (c) 2009-2010 Jay Pipes <jaypipes@gmail.com>
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
8
 *    Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
26
25
#define DRIZZLED_REPLICATION_SERVICES_H
27
26
 
28
27
#include "drizzled/atomics.h"
29
 
#include "drizzled/plugin/replication.h"
30
 
 
31
 
#include <string>
 
28
 
 
29
#include "drizzled/message/transaction.pb.h"
 
30
 
32
31
#include <vector>
33
 
#include <utility>
34
 
 
35
 
namespace drizzled
36
 
{
37
32
 
38
33
/* some forward declarations needed */
39
34
class Session;
40
35
class Table;
41
36
 
42
 
namespace plugin
43
 
{
44
 
  class TransactionReplicator;
45
 
  class TransactionApplier;
46
 
}
47
 
namespace message
48
 
{
49
 
  class Transaction;
50
 
}
 
37
namespace drizzled
 
38
{
 
39
  namespace plugin
 
40
  {
 
41
    class TransactionReplicator;
 
42
    class TransactionApplier;
 
43
  }
51
44
 
52
45
/**
53
46
 * This is a class which manages transforming internal 
57
50
class ReplicationServices
58
51
{
59
52
public:
 
53
  static const size_t DEFAULT_RECORD_SIZE= 100;
60
54
  typedef uint64_t GlobalTransactionId;
61
55
  /**
62
56
   * Types of messages that can go in the transaction
69
63
    TRANSACTION= 1, /* A GPB Transaction Message */
70
64
    BLOB= 2 /* A BLOB value */
71
65
  };
72
 
  typedef std::pair<plugin::TransactionReplicator *, plugin::TransactionApplier *> ReplicationPair;
73
 
  typedef std::vector<ReplicationPair> ReplicationStreams;
74
 
  /**
75
 
   * Method which is called after plugins have been loaded but
76
 
   * before the first client connects.  It determines if the registration
77
 
   * of applier and replicator plugins is proper and pairs
78
 
   * the applier and requested replicator plugins into the replication
79
 
   * streams.
80
 
   *
81
 
   * @todo
82
 
   *
83
 
   * This is only necessary because we don't yet have plugin dependency
84
 
   * tracking...
85
 
   */
86
 
  bool evaluateRegisteredPlugins();
87
 
  /** 
88
 
   * Helper method which pushes a constructed message out to the registered
89
 
   * replicator and applier plugins.
90
 
   *
91
 
   * @param Session descriptor
 
66
private:
 
67
  /** 
 
68
   * Atomic boolean set to true if any *active* replicators
 
69
   * or appliers are actually registered.
 
70
   */
 
71
  atomic<bool> is_active;
 
72
  /**
 
73
   * The timestamp of the last time a Transaction message was successfully
 
74
   * applied (sent to an Applier)
 
75
   */
 
76
  atomic<uint64_t> last_applied_timestamp;
 
77
  /** Our collection of replicator plugins */
 
78
  std::vector<drizzled::plugin::TransactionReplicator *> replicators;
 
79
  /** Our collection of applier plugins */
 
80
  std::vector<drizzled::plugin::TransactionApplier *> appliers;
 
81
  /**
 
82
   * Helper method which is called after any change in the
 
83
   * registered appliers or replicators to evaluate whether
 
84
   * any remaining plugins are actually active.
 
85
   * 
 
86
   * This method properly sets the is_active member variable.
 
87
   */
 
88
  void evaluateActivePlugins();
 
89
  /**
 
90
   * Helper method which returns the active Transaction message
 
91
   * for the supplied Session.  If one is not found, a new Transaction
 
92
   * message is allocated, initialized, and returned.
 
93
   *
 
94
   * @param The session processing the transaction
 
95
   */
 
96
  drizzled::message::Transaction *getActiveTransaction(Session *in_session) const;
 
97
  /** 
 
98
   * Helper method which attaches a transaction context
 
99
   * the supplied transaction based on the supplied Session's
 
100
   * transaction information.  This method also ensure the
 
101
   * transaction message is attached properly to the Session object
 
102
   *
 
103
   * @param The transaction message to initialize
 
104
   * @param The Session processing this transaction
 
105
   */
 
106
  void initTransaction(drizzled::message::Transaction &in_command, Session *in_session) const;
 
107
  /** 
 
108
   * Helper method which finalizes data members for the 
 
109
   * supplied transaction's context.
 
110
   *
 
111
   * @param The transaction message to finalize 
 
112
   * @param The Session processing this transaction
 
113
   */
 
114
  void finalizeTransaction(drizzled::message::Transaction &in_command, Session *in_session) const;
 
115
  /**
 
116
   * Helper method which deletes transaction memory and
 
117
   * unsets Session's transaction and statement messages.
 
118
   */
 
119
  void cleanupTransaction(message::Transaction *in_transaction,
 
120
                          Session *in_session) const;
 
121
  /**
 
122
   * Helper method which initializes a Statement message
 
123
   *
 
124
   * @param The statement to initialize
 
125
   * @param The type of the statement
 
126
   * @param The session processing this statement
 
127
   */
 
128
  void initStatement(drizzled::message::Statement &statement,
 
129
                     drizzled::message::Statement::Type in_type,
 
130
                     Session *in_session) const;
 
131
  /**
 
132
   * Helper method which returns an initialized Statement
 
133
   * message for methods doing insertion of data.
 
134
   *
 
135
   * @param[in] Pointer to the Session doing the processing
 
136
   * @param[in] Pointer to the Table object being inserted into
 
137
   */
 
138
  message::Statement &getInsertStatement(Session *in_session,
 
139
                                         Table *in_table) const;
 
140
 
 
141
  /**
 
142
   * Helper method which initializes the header message for
 
143
   * insert operations.
 
144
   *
 
145
   * @param[inout] Statement message container to modify
 
146
   * @param[in] Pointer to the Session doing the processing
 
147
   * @param[in] Pointer to the Table being inserted into
 
148
   */
 
149
  void setInsertHeader(message::Statement &statement,
 
150
                       Session *in_session,
 
151
                       Table *in_table) const;
 
152
  /**
 
153
   * Helper method which returns an initialized Statement
 
154
   * message for methods doing updates of data.
 
155
   *
 
156
   * @param[in] Pointer to the Session doing the processing
 
157
   * @param[in] Pointer to the Table object being updated
 
158
   * @param[in] Pointer to the old data in the record
 
159
   * @param[in] Pointer to the new data in the record
 
160
   */
 
161
  message::Statement &getUpdateStatement(Session *in_session,
 
162
                                         Table *in_table,
 
163
                                         const unsigned char *old_record, 
 
164
                                         const unsigned char *new_record) const;
 
165
  /**
 
166
   * Helper method which initializes the header message for
 
167
   * update operations.
 
168
   *
 
169
   * @param[inout] Statement message container to modify
 
170
   * @param[in] Pointer to the Session doing the processing
 
171
   * @param[in] Pointer to the Table being updated
 
172
   * @param[in] Pointer to the old data in the record
 
173
   * @param[in] Pointer to the new data in the record
 
174
   */
 
175
  void setUpdateHeader(message::Statement &statement,
 
176
                       Session *in_session,
 
177
                       Table *in_table,
 
178
                       const unsigned char *old_record, 
 
179
                       const unsigned char *new_record) const;
 
180
  /**
 
181
   * Helper method which returns an initialized Statement
 
182
   * message for methods doing deletion of data.
 
183
   *
 
184
   * @param[in] Pointer to the Session doing the processing
 
185
   * @param[in] Pointer to the Table object being deleted from
 
186
   */
 
187
  message::Statement &getDeleteStatement(Session *in_session,
 
188
                                         Table *in_table) const;
 
189
 
 
190
  /**
 
191
   * Helper method which initializes the header message for
 
192
   * insert operations.
 
193
   *
 
194
   * @param[inout] Statement message container to modify
 
195
   * @param[in] Pointer to the Session doing the processing
 
196
   * @param[in] Pointer to the Table being deleted from
 
197
   */
 
198
  void setDeleteHeader(message::Statement &statement,
 
199
                       Session *in_session,
 
200
                       Table *in_table) const;
 
201
  /**
 
202
   * Helper method which pushes a constructed message out
 
203
   * to the registered replicator and applier plugins.
 
204
   *
92
205
   * @param Message to push out
93
206
   */
94
 
  plugin::ReplicationReturnCode pushTransactionMessage(Session &in_session,
95
 
                                                       message::Transaction &to_push);
 
207
  void push(drizzled::message::Transaction &to_push);
 
208
public:
96
209
  /**
97
210
   * Constructor
98
211
   */
114
227
   * a replicator and an applier that are *active*?
115
228
   */
116
229
  bool isActive() const;
117
 
 
118
 
  /**
119
 
   * Returns the list of replication streams
120
 
   */
121
 
  ReplicationStreams &getReplicationStreams();
122
 
 
123
230
  /**
124
231
   * Attaches a replicator to our internal collection of
125
232
   * replicators.
126
233
   *
127
234
   * @param Pointer to a replicator to attach/register
128
235
   */
129
 
  void attachReplicator(plugin::TransactionReplicator *in_replicator);
130
 
  
 
236
  void attachReplicator(drizzled::plugin::TransactionReplicator *in_replicator);
131
237
  /**
132
238
   * Detaches/unregisters a replicator with our internal
133
239
   * collection of replicators.
134
240
   *
135
241
   * @param Pointer to the replicator to detach
136
242
   */
137
 
  void detachReplicator(plugin::TransactionReplicator *in_replicator);
138
 
  
 
243
  void detachReplicator(drizzled::plugin::TransactionReplicator *in_replicator);
139
244
  /**
140
245
   * Attaches a applier to our internal collection of
141
246
   * appliers.
142
247
   *
143
248
   * @param Pointer to a applier to attach/register
144
 
   * @param The name of the replicator to pair with
145
249
   */
146
 
  void attachApplier(plugin::TransactionApplier *in_applier, const std::string &requested_replicator);
147
 
  
 
250
  void attachApplier(drizzled::plugin::TransactionApplier *in_applier);
148
251
  /**
149
252
   * Detaches/unregisters a applier with our internal
150
253
   * collection of appliers.
151
254
   *
152
255
   * @param Pointer to the applier to detach
153
256
   */
154
 
  void detachApplier(plugin::TransactionApplier *in_applier);
155
 
 
156
 
  /** 
157
 
   * Returns the timestamp of the last Transaction which was sent to an
158
 
   * applier.
 
257
  void detachApplier(drizzled::plugin::TransactionApplier *in_applier);
 
258
  /**
 
259
   * Creates a new Transaction GPB message and attaches the message
 
260
   * to the supplied session object.
 
261
   *
 
262
   * @note
 
263
   *
 
264
   * This method is called when a "normal" transaction -- i.e. an 
 
265
   * explicitly-started transaction from a client -- is started with 
 
266
   * BEGIN or START TRANSACTION.
 
267
   *
 
268
   * @param Pointer to the Session starting the transaction
 
269
   */
 
270
  void startNormalTransaction(Session *in_session);
 
271
  /**
 
272
   * Commits a normal transaction (see above) and pushes the
 
273
   * transaction message out to the replicators.
 
274
   *
 
275
   * @param Pointer to the Session committing the transaction
 
276
   */
 
277
  void commitNormalTransaction(Session *in_session);
 
278
  /**
 
279
   * Marks the current active transaction message as being rolled
 
280
   * back and pushes the transaction message out to replicators.
 
281
   *
 
282
   * @param Pointer to the Session committing the transaction
 
283
   */
 
284
  void rollbackTransaction(Session *in_session);
 
285
  /**
 
286
   * Finalizes a Statement message and sets the Session's statement
 
287
   * message to NULL.
 
288
   *
 
289
   * @param The statement to initialize
 
290
   * @param The session processing this statement
 
291
   */
 
292
  void finalizeStatement(drizzled::message::Statement &statement,
 
293
                         Session *in_session) const;
 
294
  /**
 
295
   * Creates a new InsertRecord GPB message and pushes it to
 
296
   * replicators.
 
297
   *
 
298
   * @param Pointer to the Session which has inserted a record
 
299
   * @param Pointer to the Table containing insert information
 
300
   */
 
301
  void insertRecord(Session *in_session, Table *in_table);
 
302
  /**
 
303
   * Creates a new UpdateRecord GPB message and pushes it to
 
304
   * replicators.
 
305
   *
 
306
   * @param Pointer to the Session which has updated a record
 
307
   * @param Pointer to the Table containing update information
 
308
   * @param Pointer to the raw bytes representing the old record/row
 
309
   * @param Pointer to the raw bytes representing the new record/row 
 
310
   */
 
311
  void updateRecord(Session *in_session, 
 
312
                    Table *in_table, 
 
313
                    const unsigned char *old_record, 
 
314
                    const unsigned char *new_record);
 
315
  /**
 
316
   * Creates a new DeleteRecord GPB message and pushes it to
 
317
   * replicators.
 
318
   *
 
319
   * @param Pointer to the Session which has deleted a record
 
320
   * @param Pointer to the Table containing delete information
 
321
   */
 
322
  void deleteRecord(Session *in_session, Table *in_table);
 
323
  /**
 
324
   * Creates a new RawSql GPB message and pushes it to 
 
325
   * replicators.
 
326
   *
 
327
   * @TODO With a real data dictionary, this really shouldn't
 
328
   * be needed.  CREATE TABLE would map to insertRecord call
 
329
   * on the I_S, etc.  Not sure what to do with administrative
 
330
   * commands like CHECK TABLE, though..
 
331
   *
 
332
   * @param Pointer to the Session which issued the statement
 
333
   * @param Query string
 
334
   * @param Length of the query string
 
335
   */
 
336
  void rawStatement(Session *in_session, const char *in_query, size_t in_query_len);
 
337
  /**
 
338
   * Returns the timestamp of the last Transaction which was sent to 
 
339
   * an applier.
159
340
   */
160
341
  uint64_t getLastAppliedTimestamp() const;
161
 
private:
162
 
  typedef std::vector<plugin::TransactionReplicator *> Replicators;
163
 
  typedef std::vector<std::pair<std::string, plugin::TransactionApplier *> > Appliers;
164
 
  /** 
165
 
   * Atomic boolean set to true if any *active* replicators
166
 
   * or appliers are actually registered.
167
 
   */
168
 
  bool is_active;
169
 
  /**
170
 
   * The timestamp of the last time a Transaction message was successfully
171
 
   * applied (sent to an Applier)
172
 
   */
173
 
  atomic<uint64_t> last_applied_timestamp;
174
 
  /** Our collection of registered replicator plugins */
175
 
  Replicators replicators;
176
 
  /** Our collection of registered applier plugins and their requested replicator plugin names */
177
 
  Appliers appliers;
178
 
  /** Our replication streams */
179
 
  ReplicationStreams replication_streams;
180
 
  /**
181
 
   * Strips underscores and lowercases supplied replicator name
182
 
   * or requested name, and appends the suffix "replicator" if missing...
183
 
   */
184
 
  void normalizeReplicatorName(std::string &name);
185
342
};
186
343
 
187
 
} /* namespace drizzled */
 
344
} /* end namespace drizzled */
188
345
 
189
346
#endif /* DRIZZLED_REPLICATION_SERVICES_H */