~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/replicator/replicator.cc

  • Committer: Brian Aker
  • Date: 2008-12-10 02:15:50 UTC
  • Revision ID: brian@tangent.org-20081210021550-1r8uoxfmsrbvcj61
Cleaned up events for writing in replication (using simple file
format...length + block)

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
#define DRIZZLE_SERVER 1 /* for session variable max_allowed_packet */
17
17
#include <drizzled/server_includes.h>
 
18
#include <drizzled/gettext.h>
18
19
#include <drizzled/session.h>
19
20
#include <drizzled/error.h>
20
21
#include <drizzled/item/strfunc.h>
21
22
#include <drizzled/plugin_replicator.h>
22
 
 
23
 
static char anchor[100];
24
 
 
25
 
static bool statement(Session *, const char *query, size_t query_length)
 
23
#include <drizzled/serialize/serialize.h>
 
24
 
 
25
#include <iostream>
 
26
#include <fstream>
 
27
#include <string>
 
28
using namespace std;
 
29
 
 
30
static bool isEnabled;
 
31
static char *log_directory= NULL;
 
32
int log_file= -1;
 
33
 
 
34
static bool write_to_disk(int file, drizzle::EventList *list)
26
35
{
27
 
  fprintf(stderr, "STATEMENT: %.*s\n", (uint32_t)query_length, query);
 
36
  std::string buffer;
 
37
  uint64_t length;
 
38
  size_t written;
 
39
 
 
40
  list->SerializePartialToString(&buffer);
 
41
 
 
42
  length= buffer.length();
 
43
 
 
44
  cout << "Writing record of " << length << "." << endl;
 
45
 
 
46
  if ((written= write(file, &length, sizeof(uint64_t))) != sizeof(uint64_t))
 
47
  {
 
48
    cerr << "Only wrote " << written << " out of " << length << "." << endl;
 
49
    return true;
 
50
  }
 
51
 
 
52
  if ((written= write(file, buffer.c_str(), length)) != length)
 
53
  {
 
54
    cerr << "Only wrote " << written << " out of " << length << "." << endl;
 
55
    return true;
 
56
  }
28
57
 
29
58
  return false;
30
59
}
31
60
 
 
61
static bool statement(Session *session, const char *query, size_t)
 
62
{
 
63
  using namespace drizzle;
 
64
 
 
65
  drizzle::EventList list;
 
66
 
 
67
  if (isEnabled == false)
 
68
    return false;
 
69
  cerr << "Got into statement" <<endl;
 
70
 
 
71
  drizzle::Event *record= list.add_event();
 
72
  record->set_type(Event::DDL);
 
73
  record->set_autocommit(true);
 
74
  record->set_server_id("localhost");
 
75
  record->set_query_id(10);
 
76
  record->set_transaction_id("junk");
 
77
  record->set_schema(session->db);
 
78
  record->set_sql(query);
 
79
 
 
80
  return write_to_disk(log_file, &list);
 
81
}
 
82
 
32
83
static bool session_init(Session *session)
33
84
{
34
 
  fprintf(stderr, "Starting Session\n");
35
 
  session->setReplicationData(anchor);
 
85
  using namespace drizzle;
 
86
 
 
87
  if (isEnabled == false)
 
88
    return false;
 
89
 
 
90
  drizzle::EventList *list= new drizzle::EventList;
 
91
  session->setReplicationData(list);
 
92
 
 
93
  drizzle::Event *record= list->add_event();
 
94
 
 
95
  record->set_type(Event::DDL);
 
96
  record->set_autocommit(true);
 
97
  record->set_server_id("localhost");
 
98
  record->set_query_id(10);
 
99
  record->set_transaction_id("junk");
 
100
  record->set_schema(session->db);
 
101
  record->set_sql("BEGIN");
36
102
 
37
103
  return false;
38
104
}
39
105
 
40
106
static bool row_insert(Session *session, Table *)
41
107
{
42
 
  fprintf(stderr, "INSERT: %.*s\n", (uint32_t)session->query_length, session->query);
 
108
  using namespace drizzle;
 
109
 
 
110
  if (isEnabled == false)
 
111
    return false;
 
112
 
 
113
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
114
  drizzle::Event *record= list->add_event();
 
115
 
 
116
  record->set_type(Event::INSERT);
 
117
  record->set_autocommit(true);
 
118
  record->set_server_id("localhost");
 
119
  record->set_query_id(10);
 
120
  record->set_transaction_id("junk");
 
121
  record->set_schema(session->db);
 
122
  record->set_sql(session->query);
43
123
 
44
124
  return false;
45
125
}
48
128
                          const unsigned char *, 
49
129
                          const unsigned char *)
50
130
{
51
 
  fprintf(stderr, "UPDATE: %.*s\n", (uint32_t)session->query_length, session->query);
 
131
  using namespace drizzle;
 
132
 
 
133
  if (isEnabled == false)
 
134
    return false;
 
135
 
 
136
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
137
  drizzle::Event *record= list->add_event();
 
138
 
 
139
  record->set_type(Event::UPDATE);
 
140
  record->set_autocommit(true);
 
141
  record->set_server_id("localhost");
 
142
  record->set_query_id(10);
 
143
  record->set_transaction_id("junk");
 
144
  record->set_schema(session->db);
 
145
  record->set_sql(session->query);
52
146
 
53
147
  return false;
54
148
}
55
149
 
56
150
static bool row_delete(Session *session, Table *)
57
151
{
58
 
  fprintf(stderr, "DELETE: %.*s\n", (uint32_t)session->query_length, session->query);
 
152
  using namespace drizzle;
 
153
 
 
154
  if (isEnabled == false)
 
155
    return false;
 
156
 
 
157
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
158
  drizzle::Event *record= list->add_event();
 
159
 
 
160
  record->set_type(Event::DELETE);
 
161
  record->set_autocommit(true);
 
162
  record->set_server_id("localhost");
 
163
  record->set_query_id(10);
 
164
  record->set_transaction_id("junk");
 
165
  record->set_schema(session->db);
 
166
  record->set_sql(session->query);
59
167
 
60
168
  return false;
61
169
}
62
170
 
63
171
static bool end_transaction(Session *session, bool autocommit, bool commit)
64
172
{
 
173
  bool error;
 
174
  using namespace drizzle;
 
175
 
 
176
  if (isEnabled == false)
 
177
    return false;
 
178
 
 
179
  cerr << "Got into end" <<endl;
 
180
 
 
181
  drizzle::EventList *list= (drizzle::EventList *)session->getReplicationData();
 
182
  drizzle::Event *record= list->add_event();
 
183
 
 
184
  record->set_type(Event::DELETE);
 
185
  record->set_autocommit(true);
 
186
  record->set_server_id("localhost");
 
187
  record->set_query_id(10);
 
188
  record->set_transaction_id("junk");
 
189
  record->set_schema(session->db);
 
190
 
65
191
  if (commit)
66
192
  {
67
193
    if (autocommit)
68
 
      fprintf(stderr, "COMMIT\n");
 
194
      record->set_sql("COMMIT");
69
195
    else
70
 
      fprintf(stderr, "AUTOCOMMIT\n");
 
196
      record->set_sql("AUTOCOMMIT");
71
197
  }
72
198
  else
73
 
    fprintf(stderr, "ROLLBACK\n");
 
199
    record->set_sql("ROLLBACK");
 
200
 
 
201
  error= write_to_disk(log_file, list);
74
202
 
75
203
  session->setReplicationData(NULL);
 
204
  delete(list);
76
205
 
77
 
  return false;
 
206
  return error;
78
207
}
79
208
 
80
209
static int init(void *p)
88
217
  repl->row_update= row_update;
89
218
  repl->end_transaction= end_transaction;
90
219
 
91
 
  return 0;
92
 
}
 
220
 
 
221
  if (isEnabled)
 
222
  {
 
223
    using std::string;
 
224
    string logname;
 
225
 
 
226
    logname.append(log_directory ? log_directory : "/tmp");
 
227
    logname.append("/replication_log");
 
228
 
 
229
    if ((log_file= open(logname.c_str(), O_TRUNC|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
 
230
    {
 
231
      cerr << "Can not open file: " << logname.c_str() << endl;
 
232
      exit(0);
 
233
    }
 
234
  }
 
235
 
 
236
  return 0;
 
237
}
 
238
 
 
239
static int deinit(void *)
 
240
{
 
241
  if (log_file != -1)
 
242
    close(log_file);
 
243
 
 
244
  return 0;
 
245
}
 
246
 
 
247
static DRIZZLE_SYSVAR_BOOL(
 
248
  enabled,
 
249
  isEnabled,
 
250
  PLUGIN_VAR_NOCMDARG,
 
251
  N_("Enable Replicator"),
 
252
  NULL, /* check func */
 
253
  NULL, /* update func */
 
254
  false /* default */);
 
255
 
 
256
static DRIZZLE_SYSVAR_STR(
 
257
  directory,
 
258
  log_directory,
 
259
  PLUGIN_VAR_READONLY,
 
260
  N_("Directory to place replication logs."),
 
261
  NULL, /* check func */
 
262
  NULL, /* update func*/
 
263
  NULL /* default */);
 
264
 
 
265
static struct st_mysql_sys_var* system_variables[]= {
 
266
  DRIZZLE_SYSVAR(directory),
 
267
  DRIZZLE_SYSVAR(enabled),
 
268
  NULL,
 
269
};
93
270
 
94
271
mysql_declare_plugin(replicator)
95
272
{
100
277
  "Basic replication module",
101
278
  PLUGIN_LICENSE_GPL,
102
279
  init, /* Plugin Init */
103
 
  NULL, /* Plugin Deinit */
 
280
  deinit, /* Plugin Deinit */
104
281
  NULL,   /* status variables */
105
 
  NULL,   /* system variables */
 
282
  system_variables,   /* system variables */
106
283
  NULL    /* config options */
107
284
}
108
285
mysql_declare_plugin_end;