~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

  • Committer: Brian Aker
  • Date: 2010-12-08 22:35:56 UTC
  • mfrom: (1819.9.158 update-innobase)
  • Revision ID: brian@tangent.org-20101208223556-37mi4omqg7lkjzf3
Merge in Stewart's changes, 1.3 changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
 
6
 *
 
7
 *  Authors:
 
8
 *
 
9
 *    Jay Pipes <jaypipes@gmail.com.com>
 
10
 *
 
11
 *  This program is free software; you can redistribute it and/or modify
 
12
 *  it under the terms of the GNU General Public License as published by
 
13
 *  the Free Software Foundation; either version 2 of the License, or
 
14
 *  (at your option) any later version.
 
15
 *
 
16
 *  This program is distributed in the hope that it will be useful,
 
17
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
18
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
19
 *  GNU General Public License for more details.
 
20
 *
 
21
 *  You should have received a copy of the GNU General Public License
 
22
 *  along with this program; if not, write to the Free Software
 
23
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
24
 */
 
25
 
 
26
/**
 
27
 * @file
 
28
 *
 
29
 * Defines the implementation of the transaction log file descriptor.
 
30
 *
 
31
 * @details
 
32
 *
 
33
 * Currently, the transaction log file uses a simple, single-file, append-only
 
34
 * format.
 
35
 *
 
36
 * We have an atomic off_t called log_offset which keeps track of the 
 
37
 * offset into the log file for writing the next log entry.  The log
 
38
 * entries are written, one after the other, in the following way:
 
39
 *
 
40
 * <pre>
 
41
 * --------------------------------------
 
42
 * |<- 4 bytes ->|<- # Bytes of Entry ->|
 
43
 * --------------------------------------
 
44
 * |  Entry Type |  Serialized Entry    |
 
45
 * --------------------------------------
 
46
 * </pre>
 
47
 *
 
48
 * The Entry Type is an integer defined as an enumeration in the 
 
49
 * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
 
50
 *
 
51
 * Each transaction log entry type is written to the log differently.  Here,
 
52
 * we cover the format of each log entry type.
 
53
 *
 
54
 * Committed and Prepared Transaction Log Entries
 
55
 * -----------------------------------------------
 
56
 * 
 
57
 * <pre>
 
58
 * ------------------------------------------------------------------
 
59
 * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
 
60
 * ------------------------------------------------------------------
 
61
 * |   Length    |   Serialized Transaction Message   |   Checksum  |
 
62
 * ------------------------------------------------------------------
 
63
 * </pre>
 
64
 *
 
65
 * @todo
 
66
 *
 
67
 * Possibly look at a scoreboard approach with multiple file segments.  For
 
68
 * right now, though, this is just a quick simple implementation to serve
 
69
 * as a skeleton and a springboard.
 
70
 */
 
71
 
 
72
#include "config.h"
 
73
#include "transaction_log.h"
 
74
 
 
75
#include <sys/stat.h>
 
76
#include <fcntl.h>
 
77
#include <unistd.h>
 
78
#include <errno.h>
 
79
 
 
80
#include <vector>
 
81
#include <string>
 
82
 
 
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
 
84
#include <drizzled/errmsg_print.h>
 
85
#include <drizzled/gettext.h>
 
86
#include <drizzled/message/transaction.pb.h>
 
87
#include <drizzled/transaction_services.h>
 
88
#include <drizzled/algorithm/crc32.h>
 
89
 
 
90
#include <google/protobuf/io/coded_stream.h>
 
91
 
 
92
using namespace std;
 
93
using namespace drizzled;
 
94
using namespace google;
 
95
 
 
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
 
97
 
 
98
TransactionLog::TransactionLog(const string in_log_file_path,
 
99
                               uint32_t in_flush_frequency,
 
100
                               bool in_do_checksum) : 
 
101
    state(OFFLINE),
 
102
    log_file_path(in_log_file_path),
 
103
    has_error(false),
 
104
    error_message(),
 
105
    flush_frequency(in_flush_frequency),
 
106
    do_checksum(in_do_checksum)
 
107
{
 
108
  /* Setup our log file and determine the next write offset... */
 
109
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
 
110
  if (log_file == -1)
 
111
  {
 
112
    char errmsg[STRERROR_MAX];
 
113
    strerror_r(errno, errmsg, sizeof(errmsg));
 
114
    error_message.assign(_("Failed to open transaction log file "));
 
115
    error_message.append(log_file_path);
 
116
    error_message.append("  Got error: ");
 
117
    error_message.append(errmsg);
 
118
    error_message.push_back('\n');
 
119
    has_error= true;
 
120
    return;
 
121
  }
 
122
 
 
123
  /* For convenience, grab the log file name from the path */
 
124
  if (log_file_path.find_first_of('/') != string::npos)
 
125
  {
 
126
    /* Strip to last / */
 
127
    string tmp;
 
128
    tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
 
129
    log_file_name.assign(tmp);
 
130
  }
 
131
  else
 
132
    log_file_name.assign(log_file_path);
 
133
 
 
134
  /* 
 
135
   * The offset of the next write is the current position of the log
 
136
   * file, since it's opened in append mode...
 
137
   */
 
138
  log_offset= lseek(log_file, 0, SEEK_END);
 
139
 
 
140
  state= ONLINE;
 
141
}
 
142
 
 
143
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
 
144
                                                     uint8_t *buffer,
 
145
                                                     uint32_t *checksum_out)
 
146
{
 
147
  uint8_t *orig_buffer= buffer;
 
148
  size_t message_byte_length= trx.ByteSize();
 
149
 
 
150
  /*
 
151
   * Write the header information, which is the message type and
 
152
   * the length of the transaction message into the buffer
 
153
   */
 
154
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
155
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
 
156
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
157
      static_cast<uint32_t>(message_byte_length), buffer);
 
158
  
 
159
  /*
 
160
   * Now write the serialized transaction message, followed
 
161
   * by the optional checksum into the buffer.
 
162
   */
 
163
  buffer= trx.SerializeWithCachedSizesToArray(buffer);
 
164
 
 
165
  if (do_checksum)
 
166
  {
 
167
    *checksum_out= drizzled::algorithm::crc32(
 
168
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
 
169
  }
 
170
  else
 
171
    *checksum_out= 0;
 
172
 
 
173
  /* We always write in network byte order */
 
174
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
 
175
  /* Reset the pointer back to its original location... */
 
176
  buffer= orig_buffer;
 
177
  return orig_buffer;
 
178
}
 
179
 
 
180
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
 
181
{
 
182
  ssize_t written= 0;
 
183
 
 
184
  /*
 
185
   * Do an atomic increment on the offset of the log file position
 
186
   */
 
187
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
 
188
 
 
189
  /* 
 
190
   * Quick safety...if an error occurs above in another writer, the log 
 
191
   * file will be in a crashed state.
 
192
   */
 
193
  if (unlikely(state == CRASHED))
 
194
  {
 
195
    /* 
 
196
     * Reset the log's offset in case we want to produce a decent error message including
 
197
     * the original offset where an error occurred.
 
198
     */
 
199
    log_offset= cur_offset;
 
200
    return log_offset;
 
201
  }
 
202
 
 
203
  /* Write the full buffer in one swoop */
 
204
  do
 
205
  {
 
206
    written= pwrite(log_file, data, data_length, cur_offset);
 
207
  }
 
208
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
209
 
 
210
  if (unlikely(written != static_cast<ssize_t>(data_length)))
 
211
  {
 
212
    char errmsg[STRERROR_MAX];
 
213
    strerror_r(errno, errmsg, sizeof(errmsg));
 
214
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
215
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
 
216
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
 
217
                  static_cast<int64_t>(data_length),
 
218
                  static_cast<int64_t>(cur_offset),
 
219
                  static_cast<int32_t>(written), 
 
220
                  errmsg);
 
221
    state= CRASHED;
 
222
    /* 
 
223
     * Reset the log's offset in case we want to produce a decent error message including
 
224
     * the original offset where an error occurred.
 
225
     */
 
226
    log_offset= cur_offset;
 
227
  }
 
228
 
 
229
  int error_code= syncLogFile();
 
230
 
 
231
  if (unlikely(error_code != 0))
 
232
  {
 
233
    char errmsg[STRERROR_MAX];
 
234
    strerror_r(errno, errmsg, sizeof(errmsg));
 
235
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
236
                  _("Failed to sync log file. Got error: %s\n"), 
 
237
                  errmsg);
 
238
  }
 
239
  return cur_offset;
 
240
}
 
241
 
 
242
int TransactionLog::syncLogFile()
 
243
{
 
244
  switch (flush_frequency)
 
245
  {
 
246
  case FLUSH_FREQUENCY_EVERY_WRITE:
 
247
    return internal::my_sync(log_file, 0);
 
248
  case FLUSH_FREQUENCY_EVERY_SECOND:
 
249
    {
 
250
      time_t now_time= time(NULL);
 
251
      if (last_sync_time <= (now_time - 1))
 
252
      {
 
253
        last_sync_time= now_time;
 
254
        return internal::my_sync(log_file, 0);
 
255
      }
 
256
      return 0;
 
257
    }
 
258
  case FLUSH_FREQUENCY_OS:
 
259
  default:
 
260
    return 0;
 
261
  }
 
262
}
 
263
 
 
264
const string &TransactionLog::getLogFilename()
 
265
{
 
266
  return log_file_name;
 
267
}
 
268
 
 
269
const string &TransactionLog::getLogFilepath()
 
270
{
 
271
  return log_file_path;
 
272
}
 
273
 
 
274
void TransactionLog::truncate()
 
275
{
 
276
  /* 
 
277
   * @note
 
278
   *
 
279
   * This is NOT THREAD SAFE! DEBUG/TEST code only!
 
280
   */
 
281
  log_offset= (off_t) 0;
 
282
  int result;
 
283
  do
 
284
  {
 
285
    result= ftruncate(log_file, log_offset);
 
286
  }
 
287
  while (result == -1 && errno == EINTR);
 
288
}
 
289
 
 
290
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
 
291
                                                            string &out_filename) const
 
292
{
 
293
  /* 
 
294
   * Currently, we simply return the single logfile name
 
295
   * Eventually, we'll have an index/hash with upper and
 
296
   * lower bounds to look up a log file with a transaction id
 
297
   */
 
298
  out_filename.assign(log_file_path);
 
299
  return true;
 
300
}
 
301
 
 
302
bool TransactionLog::hasError() const
 
303
{
 
304
  return has_error;
 
305
}
 
306
 
 
307
void TransactionLog::clearError()
 
308
{
 
309
  has_error= false;
 
310
  error_message.clear();
 
311
}
 
312
 
 
313
const string &TransactionLog::getErrorMessage() const
 
314
{
 
315
  return error_message;
 
316
}
 
317
 
 
318
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
 
319
{
 
320
  return trx.ByteSize() + HEADER_TRAILER_BYTES;
 
321
}