24
24
* Defines the implementation of the default command log.
26
* @see drizzled/plugin/replicator.h
27
* @see drizzled/plugin/applier.h
26
* @see drizzled/plugin/command_replicator.h
27
* @see drizzled/plugin/command_applier.h
57
57
#include <drizzled/session.h>
58
58
#include <drizzled/set_var.h>
59
59
#include <drizzled/gettext.h>
60
#include <drizzled/message/transaction.pb.h>
60
#include <drizzled/message/replication.pb.h>
64
64
#include <unistd.h>
66
66
using namespace std;
67
using namespace drizzled;
69
70
* Command Log plugin system variable - Is the log enabled? Only used on init().
80
81
CommandLog::CommandLog(const char *in_log_file_path)
82
drizzled::plugin::Applier(),
83
state(CommandLog::OFFLINE),
83
plugin::CommandApplier(),
84
85
log_file_path(in_log_file_path)
86
87
is_enabled= true; /* If constructed, the plugin is enabled until taken offline with disable() */
90
91
log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
91
92
if (log_file == -1)
93
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file. Got error: %s"), strerror(errno));
94
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s. Got error: %s\n"), log_file_path, strerror(errno));
119
120
return is_enabled && is_active;
122
void CommandLog::apply(drizzled::message::Command *to_apply)
123
void CommandLog::apply(const message::Command &to_apply)
124
std::string buffer; /* Buffer we will write serialized command to */
126
* There is an issue on Solaris/SunStudio where if the std::string buffer is
127
* NOT initialized with the below, the code produces an EFAULT when accessing
128
* c_str() later on. Stupid, but true.
130
string buffer(""); /* Buffer we will write serialized command to */
127
134
off_t cur_offset;
129
to_apply->SerializeToString(&buffer);
136
to_apply.SerializeToString(&buffer);
131
/* We cast the length to uint64_t since that is the width reserved for the length header in the written serial log */
138
/* We cast the length to uint64_t since that is the width reserved for the length header in the written log */
132
139
length= (uint64_t) buffer.length();
135
142
* Do an atomic increment on the offset of the log file position
137
144
cur_offset= log_offset.fetch_and_add((off_t) (sizeof(uint64_t) + length));
141
* Not sure about the following problem:
143
* If log_offset is incremented by thread 2 *before* cur_offset
144
* is assigned to the log_offset value, then thread 2's write will
145
* clobber thread 1's write since the cur_offset will be wrong.
147
* Do we need to do the following check?
149
* if (unlikely(cur_offset != log_offset))
151
* usleep(random_time_period);
152
* restart from beginning...
156
147
* We adjust cur_offset back to the original log_offset before
157
148
* the increment above...
163
154
* not be active, therefore a caller could have been ready
164
155
* to write...but the log is crashed.
166
if (unlikely(state == CommandLog::CRASHED))
157
if (unlikely(state == CRASHED))
169
160
/* We always write in network byte order */
170
161
unsigned char nbo_length[8];
171
#ifdef WORDS_BIGENDIAN
172
int8store(&nbo_length, length);
174
int64_tstore(&nbo_length, length);
162
int8store(nbo_length, length);
179
written= pwrite(log_file, &length, sizeof(uint64_t), cur_offset);
166
written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
181
168
while (written == EINTR); /* Just retry the write when interrupted by a signal... */
170
if (unlikely(written != sizeof(uint64_t)))
172
errmsg_printf(ERRMSG_LVL_ERROR,
173
_("Failed to write full size of command. Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
175
(int64_t) cur_offset,
180
* Reset the log's offset in case we want to produce a decent error message including
181
* the original offset where an error occurred.
183
log_offset= cur_offset;
183
188
cur_offset+= (off_t) written;
184
if (unlikely(written != sizeof(uint64_t)))
186
errmsg_printf(ERRMSG_LVL_ERROR,
187
_("Failed to write full size of command. Tried to write %" PRId64 ", but only wrote %" PRId64 ". Error: %s"),
193
* Reset the log's offset in case we want to produce a decent error message including
194
* the original offset where an error occurred.
196
log_offset= cur_offset;
202
191
* Quick safety...if an error occurs above in another writer, the log
203
192
* file will be in a crashed state.
205
if (unlikely(state == CommandLog::CRASHED))
194
if (unlikely(state == CRASHED))
208
197
* Reset the log's offset in case we want to produce a decent error message including
221
210
if (unlikely(written != (ssize_t) length))
223
212
errmsg_printf(ERRMSG_LVL_ERROR,
224
_("Failed to write full serialized command. Tried to write %" PRId64 ", but only wrote %" PRId64 ". Error: %s"),
213
_("Failed to write full serialized command. Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
225
214
(int64_t) length,
215
(int64_t) cur_offset,
226
216
(int64_t) written,
227
217
strerror(errno));
262
252
is_enabled= orig_is_enabled;
255
bool CommandLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
256
string &out_filename) const
259
* Currently, we simply return the single logfile name
260
* Eventually, we'll have an index/hash with upper and
261
* lower bounds to look up a log file with a transaction id
263
out_filename.assign(log_file_path);
265
267
static CommandLog *command_log= NULL; /* The singleton command log */
267
269
static int init(PluginRegistry ®istry)