~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/command_log/command_log.cc

Merging the latest from Jay's replication branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
 *
24
24
 * Defines the implementation of the default command log.
25
25
 *
26
 
 * @see drizzled/plugin/replicator.h
27
 
 * @see drizzled/plugin/applier.h
 
26
 * @see drizzled/plugin/command_replicator.h
 
27
 * @see drizzled/plugin/command_applier.h
28
28
 *
29
29
 * @details
30
30
 *
57
57
#include <drizzled/session.h>
58
58
#include <drizzled/set_var.h>
59
59
#include <drizzled/gettext.h>
60
 
#include <drizzled/message/transaction.pb.h>
 
60
#include <drizzled/message/replication.pb.h>
61
61
 
62
62
#include <vector>
63
63
#include <string>
64
64
#include <unistd.h>
65
65
 
66
66
using namespace std;
 
67
using namespace drizzled;
67
68
 
68
69
/** 
69
70
 * Command Log plugin system variable - Is the log enabled? Only used on init().  
79
80
 
80
81
CommandLog::CommandLog(const char *in_log_file_path)
81
82
  : 
82
 
    drizzled::plugin::Applier(),
83
 
    state(CommandLog::OFFLINE),
 
83
    plugin::CommandApplier(),
 
84
    state(OFFLINE),
84
85
    log_file_path(in_log_file_path)
85
86
{
86
87
  is_enabled= true; /* If constructed, the plugin is enabled until taken offline with disable() */
90
91
  log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
91
92
  if (log_file == -1)
92
93
  {
93
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file.  Got error: %s"), strerror(errno));
 
94
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s.  Got error: %s\n"), log_file_path, strerror(errno));
94
95
    is_active= false;
95
96
    return;
96
97
  }
101
102
   */
102
103
  log_offset= lseek(log_file, 0, SEEK_END);
103
104
 
104
 
  state= CommandLog::ONLINE;
 
105
  state= ONLINE;
105
106
  is_active= true;
106
107
}
107
108
 
119
120
  return is_enabled && is_active;
120
121
}
121
122
 
122
 
void CommandLog::apply(drizzled::message::Command *to_apply)
 
123
void CommandLog::apply(const message::Command &to_apply)
123
124
{
124
 
  std::string buffer; /* Buffer we will write serialized command to */
 
125
  /* 
 
126
   * There is an issue on Solaris/SunStudio where if the std::string buffer is
 
127
   * NOT initialized with the below, the code produces an EFAULT when accessing
 
128
   * c_str() later on.  Stoopid, but true.
 
129
   */
 
130
  string buffer(""); /* Buffer we will write serialized command to */
 
131
 
125
132
  uint64_t length;
126
133
  ssize_t written;
127
134
  off_t cur_offset;
128
135
 
129
 
  to_apply->SerializeToString(&buffer);
 
136
  to_apply.SerializeToString(&buffer);
130
137
 
131
 
  /* We force to uint64_t since this is what is reserved as the length header in the written serial log */
 
138
  /* We force to uint64_t since this is what is reserved as the length header in the written log */
132
139
  length= (uint64_t) buffer.length(); 
133
140
 
134
141
  /*
135
142
   * Do an atomic increment on the offset of the log file position
136
143
   */
137
144
  cur_offset= log_offset.fetch_and_add((off_t) (sizeof(uint64_t) + length));
138
 
  /** 
139
 
   * @TODO
140
 
   *
141
 
   * Not sure about the following problem:
142
 
   *
143
 
   * If log_offset is incremented by thread 2 *before* cur_offset
144
 
   * is assigned to the log_offset value, then thread 2's write will
145
 
   * clobber thread 1's write since the cur_offset will be wrong.
146
 
   *
147
 
   * Do we need to do the following check?
148
 
   *
149
 
   * if (unlikely(cur_offset != log_offset))
150
 
   * {
151
 
   *   usleep(random_time_period);
152
 
   *   restart from beginning...
153
 
   * }
154
 
   */
 
145
 
155
146
  /*
156
147
   * We adjust cur_offset back to the original log_offset before
157
148
   * the increment above...
163
154
   * not be active, therefore a caller could have been ready
164
155
   * to write...but the log is crashed.
165
156
   */
166
 
  if (unlikely(state == CommandLog::CRASHED))
 
157
  if (unlikely(state == CRASHED))
167
158
    return;
168
159
 
169
160
  /* We always write in network byte order */
170
161
  unsigned char nbo_length[8];
171
 
#ifdef WORDS_BIGENDIAN
172
 
  int8store(&nbo_length, length);
173
 
#else
174
 
  int64_tstore(&nbo_length, length);
175
 
#endif
 
162
  int8store(nbo_length, length);
176
163
 
177
164
  do
178
165
  {
179
 
    written= pwrite(log_file, &length, sizeof(uint64_t), cur_offset);
 
166
    written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
180
167
  }
181
168
  while (written == EINTR); /* Just retry the write when interrupted by a signal... */
182
169
 
 
170
  if (unlikely(written != sizeof(uint64_t)))
 
171
  {
 
172
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
173
                  _("Failed to write full size of command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
174
                  (int64_t) length, 
 
175
                  (int64_t) cur_offset,
 
176
                  (int64_t) written, 
 
177
                  strerror(errno));
 
178
    state= CRASHED;
 
179
    /* 
 
180
     * Reset the log's offset in case we want to produce a decent error message including
 
181
     * the original offset where an error occurred.
 
182
     */
 
183
    log_offset= cur_offset;
 
184
    is_active= false;
 
185
    return;
 
186
  }
 
187
 
183
188
  cur_offset+= (off_t) written;
184
 
  if (unlikely(written != sizeof(uint64_t)))
185
 
  {
186
 
    errmsg_printf(ERRMSG_LVL_ERROR, 
187
 
                  _("Failed to write full size of command.  Tried to write %" PRId64 ", but only wrote %" PRId64 ".  Error: %s"), 
188
 
                  (int64_t) length, 
189
 
                  (int64_t) written, 
190
 
                  strerror(errno));
191
 
    state= CRASHED;
192
 
    /* 
193
 
     * Reset the log's offset in case we want to produce a decent error message including
194
 
     * the original offset where an error occurred.
195
 
     */
196
 
    log_offset= cur_offset;
197
 
    is_active= false;
198
 
    return;
199
 
  }
200
189
 
201
190
  /* 
202
191
   * Quick safety...if an error occurs above in another writer, the log 
203
192
   * file will be in a crashed state.
204
193
   */
205
 
  if (unlikely(state == CommandLog::CRASHED))
 
194
  if (unlikely(state == CRASHED))
206
195
  {
207
196
    /* 
208
197
     * Reset the log's offset in case we want to produce a decent error message including
221
210
  if (unlikely(written != (ssize_t) length))
222
211
  {
223
212
    errmsg_printf(ERRMSG_LVL_ERROR, 
224
 
                  _("Failed to write full serialized command.  Tried to write %" PRId64 ", but only wrote %" PRId64 ".  Error: %s"), 
 
213
                  _("Failed to write full serialized command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
225
214
                  (int64_t) length, 
 
215
                  (int64_t) cur_offset,
226
216
                  (int64_t) written, 
227
217
                  strerror(errno));
228
218
    state= CRASHED;
262
252
  is_enabled= orig_is_enabled;
263
253
}
264
254
 
 
255
bool CommandLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
 
256
                                                        string &out_filename) const
 
257
{
 
258
  /* 
 
259
   * Currently, we simply return the single logfile name
 
260
   * Eventually, we'll have an index/hash with upper and
 
261
   * lower bounds to look up a log file with a transaction id
 
262
   */
 
263
  out_filename.assign(log_file_path);
 
264
  return true;
 
265
}
 
266
 
265
267
static CommandLog *command_log= NULL; /* The singleton command log */
266
268
 
267
269
static int init(PluginRegistry &registry)
331
333
  "command_log",
332
334
  "0.1",
333
335
  "Jay Pipes",
334
 
  N_("Simple Command Message Log"),
 
336
  N_("Command Message Log"),
335
337
  PLUGIN_LICENSE_GPL,
336
338
  init, /* Plugin Init */
337
339
  deinit, /* Plugin Deinit */