~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/protobuf_replicator/protobuf_replicator.cc

  • Committer: Brian Aker
  • Date: 2009-03-22 21:27:04 UTC
  • mfrom: (960.2.22 mordred)
  • Revision ID: brian@tangent.org-20090322212704-ysn4mkkjg2u9kv22
Merge Monty

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
#include <drizzled/gettext.h>
18
18
#include <drizzled/session.h>
19
19
#include <drizzled/error.h>
20
 
#include <drizzled/plugin_replicator.h>
 
20
#include <drizzled/plugin/replicator.h>
21
21
#include <drizzled/serialize/serialize.h>
22
22
 
23
23
#include <iostream>
24
24
#include <fstream>
25
25
#include <string>
 
26
 
26
27
using namespace std;
27
28
 
28
29
static bool isEnabled;
29
30
static char *log_directory= NULL;
30
 
int log_file= -1;
31
31
 
32
32
static bool write_to_disk(int file, drizzle::EventList *list)
33
33
{
34
 
  std::string buffer;
 
34
  string buffer;
35
35
  size_t length;
36
36
  size_t written;
37
37
 
56
56
  return false;
57
57
}
58
58
 
59
 
static bool statement(Session *session, const char *query, size_t)
60
 
{
61
 
  using namespace drizzle;
62
 
 
63
 
  drizzle::EventList list;
64
 
 
65
 
  if (isEnabled == false)
66
 
    return false;
67
 
  cerr << "Got into statement" <<endl;
68
 
 
69
 
  drizzle::Event *record= list.add_event();
70
 
  record->set_type(Event::DDL);
71
 
  record->set_autocommit(true);
72
 
  record->set_server_id("localhost");
73
 
  record->set_query_id(10);
74
 
  record->set_transaction_id("junk");
75
 
  record->set_schema(session->db);
76
 
  record->set_sql(query);
77
 
 
78
 
  return write_to_disk(log_file, &list);
79
 
}
80
 
 
81
 
static bool session_init(Session *session)
82
 
{
83
 
  using namespace drizzle;
84
 
 
85
 
  if (isEnabled == false)
86
 
    return false;
87
 
 
88
 
  drizzle::EventList *list= new drizzle::EventList;
89
 
  session->setReplicationData(list);
90
 
 
91
 
  drizzle::Event *record= list->add_event();
92
 
 
93
 
  record->set_type(Event::DDL);
94
 
  record->set_autocommit(true);
95
 
  record->set_server_id("localhost");
96
 
  record->set_query_id(10);
97
 
  record->set_transaction_id("junk");
98
 
  record->set_schema(session->db);
99
 
  record->set_sql("BEGIN");
100
 
 
101
 
  return false;
102
 
}
103
 
 
104
 
static bool row_insert(Session *session, Table *)
105
 
{
106
 
  using namespace drizzle;
107
 
 
108
 
  if (isEnabled == false)
109
 
    return false;
110
 
 
111
 
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
112
 
  drizzle::Event *record= list->add_event();
113
 
 
114
 
  record->set_type(Event::INSERT);
115
 
  record->set_autocommit(true);
116
 
  record->set_server_id("localhost");
117
 
  record->set_query_id(10);
118
 
  record->set_transaction_id("junk");
119
 
  record->set_schema(session->db);
120
 
  record->set_sql(session->query);
121
 
 
122
 
  return false;
123
 
}
124
 
 
125
 
static bool row_update(Session *session, Table *, 
126
 
                          const unsigned char *, 
127
 
                          const unsigned char *)
128
 
{
129
 
  using namespace drizzle;
130
 
 
131
 
  if (isEnabled == false)
132
 
    return false;
133
 
 
134
 
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
135
 
  drizzle::Event *record= list->add_event();
136
 
 
137
 
  record->set_type(Event::UPDATE);
138
 
  record->set_autocommit(true);
139
 
  record->set_server_id("localhost");
140
 
  record->set_query_id(10);
141
 
  record->set_transaction_id("junk");
142
 
  record->set_schema(session->db);
143
 
  record->set_sql(session->query);
144
 
 
145
 
  return false;
146
 
}
147
 
 
148
 
static bool row_delete(Session *session, Table *)
149
 
{
150
 
  using namespace drizzle;
151
 
 
152
 
  if (isEnabled == false)
153
 
    return false;
154
 
 
155
 
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
156
 
  drizzle::Event *record= list->add_event();
157
 
 
158
 
  record->set_type(Event::DELETE);
159
 
  record->set_autocommit(true);
160
 
  record->set_server_id("localhost");
161
 
  record->set_query_id(10);
162
 
  record->set_transaction_id("junk");
163
 
  record->set_schema(session->db);
164
 
  record->set_sql(session->query);
165
 
 
166
 
  return false;
167
 
}
168
 
 
169
 
static bool end_transaction(Session *session, bool autocommit, bool commit)
170
 
{
171
 
  bool error;
172
 
  using namespace drizzle;
173
 
 
174
 
  if (isEnabled == false)
175
 
    return false;
176
 
 
177
 
  cerr << "Got into end" <<endl;
178
 
 
179
 
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
180
 
  drizzle::Event *record= list->add_event();
181
 
 
182
 
  record->set_type(Event::DELETE);
183
 
  record->set_autocommit(true);
184
 
  record->set_server_id("localhost");
185
 
  record->set_query_id(10);
186
 
  record->set_transaction_id("junk");
187
 
  record->set_schema(session->db);
188
 
 
189
 
  if (commit)
190
 
  {
191
 
    if (autocommit)
192
 
      record->set_sql("COMMIT");
 
59
class Protobuf_replicator: public Replicator
 
60
{
 
61
  int log_file;
 
62
public:
 
63
  Protobuf_replicator() : log_file(-1)
 
64
  {
 
65
    if (isEnabled)
 
66
    {
 
67
      string logname;
 
68
  
 
69
      logname.append(log_directory ? log_directory : "/tmp");
 
70
      logname.append("/replication_log");
 
71
  
 
72
      if ((log_file= open(logname.c_str(), O_TRUNC|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
 
73
      {
 
74
        cerr << "Can not open file: " << logname.c_str() << endl;
 
75
        log_file= -1;
 
76
        isEnabled= false;
 
77
      }
 
78
    }
 
79
  }
 
80
 
 
81
  ~Protobuf_replicator()
 
82
  {
 
83
    if (log_file != -1)
 
84
      close(log_file);
 
85
  }
 
86
 
 
87
  virtual bool statement_hook(Session *session, const char *query, size_t)
 
88
  {
 
89
    using namespace drizzle;
 
90
  
 
91
    drizzle::EventList list;
 
92
  
 
93
    if (isEnabled == false)
 
94
      return false;
 
95
    cerr << "Got into statement" <<endl;
 
96
  
 
97
    drizzle::Event *record= list.add_event();
 
98
    record->set_type(Event::DDL);
 
99
    record->set_autocommit(true);
 
100
    record->set_server_id("localhost");
 
101
    record->set_query_id(10);
 
102
    record->set_transaction_id("junk");
 
103
    record->set_schema(session->db);
 
104
    record->set_sql(query);
 
105
  
 
106
    return write_to_disk(log_file, &list);
 
107
  }
 
108
  
 
109
  virtual bool session_init_hook(Session *session)
 
110
  {
 
111
    using namespace drizzle;
 
112
  
 
113
    if (isEnabled == false)
 
114
      return false;
 
115
  
 
116
    drizzle::EventList *list= new drizzle::EventList;
 
117
    session->setReplicationData(list);
 
118
  
 
119
    drizzle::Event *record= list->add_event();
 
120
  
 
121
    record->set_type(Event::DDL);
 
122
    record->set_autocommit(true);
 
123
    record->set_server_id("localhost");
 
124
    record->set_query_id(10);
 
125
    record->set_transaction_id("junk");
 
126
    record->set_schema(session->db);
 
127
    record->set_sql("BEGIN");
 
128
  
 
129
    return false;
 
130
  }
 
131
  
 
132
  virtual bool row_insert_hook(Session *session, Table *)
 
133
  {
 
134
    using namespace drizzle;
 
135
  
 
136
    if (isEnabled == false)
 
137
      return false;
 
138
  
 
139
    drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
140
    drizzle::Event *record= list->add_event();
 
141
  
 
142
    record->set_type(Event::INSERT);
 
143
    record->set_autocommit(true);
 
144
    record->set_server_id("localhost");
 
145
    record->set_query_id(10);
 
146
    record->set_transaction_id("junk");
 
147
    record->set_schema(session->db);
 
148
    record->set_sql(session->query);
 
149
  
 
150
    return false;
 
151
  }
 
152
  
 
153
  virtual bool row_update_hook(Session *session, Table *, 
 
154
                               const unsigned char *, 
 
155
                               const unsigned char *)
 
156
  {
 
157
    using namespace drizzle;
 
158
  
 
159
    if (isEnabled == false)
 
160
      return false;
 
161
  
 
162
    drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
163
    drizzle::Event *record= list->add_event();
 
164
  
 
165
    record->set_type(Event::UPDATE);
 
166
    record->set_autocommit(true);
 
167
    record->set_server_id("localhost");
 
168
    record->set_query_id(10);
 
169
    record->set_transaction_id("junk");
 
170
    record->set_schema(session->db);
 
171
    record->set_sql(session->query);
 
172
  
 
173
    return false;
 
174
  }
 
175
  
 
176
  virtual bool row_delete_hook(Session *session, Table *)
 
177
  {
 
178
    using namespace drizzle;
 
179
  
 
180
    if (isEnabled == false)
 
181
      return false;
 
182
  
 
183
    drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
184
    drizzle::Event *record= list->add_event();
 
185
  
 
186
    record->set_type(Event::DELETE);
 
187
    record->set_autocommit(true);
 
188
    record->set_server_id("localhost");
 
189
    record->set_query_id(10);
 
190
    record->set_transaction_id("junk");
 
191
    record->set_schema(session->db);
 
192
    record->set_sql(session->query);
 
193
  
 
194
    return false;
 
195
  }
 
196
  
 
197
  virtual bool end_transaction_hook(Session *session,
 
198
                                    bool autocommit, bool commit)
 
199
  {
 
200
    bool error;
 
201
    using namespace drizzle;
 
202
  
 
203
    if (isEnabled == false)
 
204
      return false;
 
205
  
 
206
    cerr << "Got into end" <<endl;
 
207
  
 
208
    drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
209
    drizzle::Event *record= list->add_event();
 
210
  
 
211
    record->set_type(Event::DELETE);
 
212
    record->set_autocommit(true);
 
213
    record->set_server_id("localhost");
 
214
    record->set_query_id(10);
 
215
    record->set_transaction_id("junk");
 
216
    record->set_schema(session->db);
 
217
  
 
218
    if (commit)
 
219
    {
 
220
      if (autocommit)
 
221
        record->set_sql("COMMIT");
 
222
      else
 
223
        record->set_sql("AUTOCOMMIT");
 
224
    }
193
225
    else
194
 
      record->set_sql("AUTOCOMMIT");
 
226
      record->set_sql("ROLLBACK");
 
227
  
 
228
    error= write_to_disk(log_file, list);
 
229
  
 
230
    session->setReplicationData(NULL);
 
231
    delete(list);
 
232
  
 
233
    return error;
195
234
  }
196
 
  else
197
 
    record->set_sql("ROLLBACK");
198
 
 
199
 
  error= write_to_disk(log_file, list);
200
 
 
201
 
  session->setReplicationData(NULL);
202
 
  delete(list);
203
 
 
204
 
  return error;
205
 
}
 
235
};
206
236
 
207
237
static int init(void *p)
208
238
{
209
 
  replicator_t *repl = (replicator_t *)p;
210
 
 
211
 
  repl->statement= statement;
212
 
  repl->session_init= session_init;
213
 
  repl->row_insert= row_insert;
214
 
  repl->row_delete= row_delete;
215
 
  repl->row_update= row_update;
216
 
  repl->end_transaction= end_transaction;
217
 
 
218
 
 
219
 
  if (isEnabled)
220
 
  {
221
 
    using std::string;
222
 
    string logname;
223
 
 
224
 
    logname.append(log_directory ? log_directory : "/tmp");
225
 
    logname.append("/replication_log");
226
 
 
227
 
    if ((log_file= open(logname.c_str(), O_TRUNC|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
228
 
    {
229
 
      cerr << "Can not open file: " << logname.c_str() << endl;
230
 
      exit(0);
231
 
    }
232
 
  }
 
239
  Replicator **repl = static_cast<Replicator **>(p);
 
240
 
 
241
  *repl= new Protobuf_replicator();
233
242
 
234
243
  return 0;
235
244
}
236
245
 
237
 
static int deinit(void *)
 
246
static int deinit(void *p)
238
247
{
239
 
  if (log_file != -1)
240
 
    close(log_file);
 
248
  Protobuf_replicator *repl = static_cast<Protobuf_replicator *>(p);
 
249
 
 
250
  delete repl;
241
251
 
242
252
  return 0;
243
253
}
266
276
  NULL,
267
277
};
268
278
 
269
 
drizzle_declare_plugin(replicator)
 
279
drizzle_declare_plugin(protobuf_replicator)
270
280
{
271
281
  DRIZZLE_REPLICATOR_PLUGIN,
272
282
  "replicator",