~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

  • Committer: Monty Taylor
  • Date: 2009-08-12 06:25:19 UTC
  • mto: (1114.1.1 innodb-plugin-merge)
  • mto: This revision was merged to the branch mainline in revision 1183.
  • Revision ID: mordred@inaugust.com-20090812062519-cij02mrrunvnxblt
Tags: innodb-plugin-1.0.4
InnoDB Plugin 1.0.4

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  Authors:
8
 
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com.com>
10
 
 *
11
 
 *  This program is free software; you can redistribute it and/or modify
12
 
 *  it under the terms of the GNU General Public License as published by
13
 
 *  the Free Software Foundation; either version 2 of the License, or
14
 
 *  (at your option) any later version.
15
 
 *
16
 
 *  This program is distributed in the hope that it will be useful,
17
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
18
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19
 
 *  GNU General Public License for more details.
20
 
 *
21
 
 *  You should have received a copy of the GNU General Public License
22
 
 *  along with this program; if not, write to the Free Software
23
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
24
 
 */
25
 
 
26
 
/**
27
 
 * @file
28
 
 *
29
 
 * Defines the implementation of the transaction log file descriptor.
30
 
 *
31
 
 * @details
32
 
 *
33
 
 * Currently, the transaction log file uses a simple, single-file, append-only
34
 
 * format.
35
 
 *
36
 
 * We have an atomic off_t called log_offset which keeps track of the 
37
 
 * offset into the log file for writing the next log entry.  The log
38
 
 * entries are written, one after the other, in the following way:
39
 
 *
40
 
 * <pre>
41
 
 * --------------------------------------
42
 
 * |<- 4 bytes ->|<- # Bytes of Entry ->|
43
 
 * --------------------------------------
44
 
 * |  Entry Type |  Serialized Entry    |
45
 
 * --------------------------------------
46
 
 * </pre>
47
 
 *
48
 
 * The Entry Type is an integer defined as an enumeration in the 
49
 
 * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
50
 
 *
51
 
 * Each transaction log entry type is written to the log differently.  Here,
52
 
 * we cover the format of each log entry type.
53
 
 *
54
 
 * Committed and Prepared Transaction Log Entries
55
 
 * -----------------------------------------------
56
 
 * 
57
 
 * <pre>
58
 
 * ------------------------------------------------------------------
59
 
 * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
 
 * ------------------------------------------------------------------
61
 
 * |   Length    |   Serialized Transaction Message   |   Checksum  |
62
 
 * ------------------------------------------------------------------
63
 
 * </pre>
64
 
 *
65
 
 * @todo
66
 
 *
67
 
 * Possibly look at a scoreboard approach with multiple file segments.  For
68
 
 * right now, though, this is just a quick simple implementation to serve
69
 
 * as a skeleton and a springboard.
70
 
 */
71
 
 
72
 
#include "config.h"
73
 
#include "transaction_log.h"
74
 
 
75
 
#include <sys/stat.h>
76
 
#include <fcntl.h>
77
 
#include <unistd.h>
78
 
#include <errno.h>
79
 
 
80
 
#include <vector>
81
 
#include <string>
82
 
 
83
 
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
 
#include <drizzled/errmsg_print.h>
85
 
#include <drizzled/gettext.h>
86
 
#include <drizzled/message/transaction.pb.h>
87
 
#include <drizzled/transaction_services.h>
88
 
#include <drizzled/algorithm/crc32.h>
89
 
 
90
 
#include <google/protobuf/io/coded_stream.h>
91
 
 
92
 
using namespace std;
93
 
using namespace drizzled;
94
 
using namespace google;
95
 
 
96
 
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
97
 
 
98
 
TransactionLog::TransactionLog(const string in_log_file_path,
99
 
                               uint32_t in_flush_frequency,
100
 
                               bool in_do_checksum) : 
101
 
    state(OFFLINE),
102
 
    log_file_path(in_log_file_path),
103
 
    has_error(false),
104
 
    error_message(),
105
 
    flush_frequency(in_flush_frequency),
106
 
    do_checksum(in_do_checksum)
107
 
{
108
 
  /* Setup our log file and determine the next write offset... */
109
 
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
110
 
  if (log_file == -1)
111
 
  {
112
 
    char errmsg[STRERROR_MAX];
113
 
    strerror_r(errno, errmsg, sizeof(errmsg));
114
 
    error_message.assign(_("Failed to open transaction log file "));
115
 
    error_message.append(log_file_path);
116
 
    error_message.append("  Got error: ");
117
 
    error_message.append(errmsg);
118
 
    error_message.push_back('\n');
119
 
    has_error= true;
120
 
    return;
121
 
  }
122
 
 
123
 
  /* For convenience, grab the log file name from the path */
124
 
  if (log_file_path.find_first_of('/') != string::npos)
125
 
  {
126
 
    /* Strip to last / */
127
 
    string tmp;
128
 
    tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
129
 
    log_file_name.assign(tmp);
130
 
  }
131
 
  else
132
 
    log_file_name.assign(log_file_path);
133
 
 
134
 
  /* 
135
 
   * The offset of the next write is the current position of the log
136
 
   * file, since it's opened in append mode...
137
 
   */
138
 
  log_offset= lseek(log_file, 0, SEEK_END);
139
 
 
140
 
  state= ONLINE;
141
 
}
142
 
 
143
 
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
144
 
                                                     uint8_t *buffer,
145
 
                                                     uint32_t *checksum_out)
146
 
{
147
 
  uint8_t *orig_buffer= buffer;
148
 
  size_t message_byte_length= trx.ByteSize();
149
 
 
150
 
  /*
151
 
   * Write the header information, which is the message type and
152
 
   * the length of the transaction message into the buffer
153
 
   */
154
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
 
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
 
      static_cast<uint32_t>(message_byte_length), buffer);
158
 
  
159
 
  /*
160
 
   * Now write the serialized transaction message, followed
161
 
   * by the optional checksum into the buffer.
162
 
   */
163
 
  buffer= trx.SerializeWithCachedSizesToArray(buffer);
164
 
 
165
 
  if (do_checksum)
166
 
  {
167
 
    *checksum_out= drizzled::algorithm::crc32(
168
 
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
169
 
  }
170
 
  else
171
 
    *checksum_out= 0;
172
 
 
173
 
  /* We always write in network byte order */
174
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
 
  /* Reset the pointer back to its original location... */
176
 
  buffer= orig_buffer;
177
 
  return orig_buffer;
178
 
}
179
 
 
180
 
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
181
 
{
182
 
  ssize_t written= 0;
183
 
 
184
 
  /*
185
 
   * Do an atomic increment on the offset of the log file position
186
 
   */
187
 
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
188
 
 
189
 
  /* 
190
 
   * Quick safety...if an error occurs above in another writer, the log 
191
 
   * file will be in a crashed state.
192
 
   */
193
 
  if (unlikely(state == CRASHED))
194
 
  {
195
 
    /* 
196
 
     * Reset the log's offset in case we want to produce a decent error message including
197
 
     * the original offset where an error occurred.
198
 
     */
199
 
    log_offset= cur_offset;
200
 
    return log_offset;
201
 
  }
202
 
 
203
 
  /* Write the full buffer in one swoop */
204
 
  do
205
 
  {
206
 
    written= pwrite(log_file, data, data_length, cur_offset);
207
 
  }
208
 
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
209
 
 
210
 
  if (unlikely(written != static_cast<ssize_t>(data_length)))
211
 
  {
212
 
    char errmsg[STRERROR_MAX];
213
 
    strerror_r(errno, errmsg, sizeof(errmsg));
214
 
    errmsg_printf(error::ERROR, 
215
 
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
216
 
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
217
 
                  static_cast<int64_t>(data_length),
218
 
                  static_cast<int64_t>(cur_offset),
219
 
                  static_cast<int32_t>(written), 
220
 
                  errmsg);
221
 
    state= CRASHED;
222
 
    /* 
223
 
     * Reset the log's offset in case we want to produce a decent error message including
224
 
     * the original offset where an error occurred.
225
 
     */
226
 
    log_offset= cur_offset;
227
 
  }
228
 
 
229
 
  int error_code= syncLogFile();
230
 
 
231
 
  if (unlikely(error_code != 0))
232
 
  {
233
 
    sql_perror(_("Failed to sync log file."));
234
 
  }
235
 
 
236
 
  return cur_offset;
237
 
}
238
 
 
239
 
int TransactionLog::syncLogFile()
240
 
{
241
 
  switch (flush_frequency)
242
 
  {
243
 
  case FLUSH_FREQUENCY_EVERY_WRITE:
244
 
    return internal::my_sync(log_file, 0);
245
 
  case FLUSH_FREQUENCY_EVERY_SECOND:
246
 
    {
247
 
      time_t now_time= time(NULL);
248
 
      if (last_sync_time <= (now_time - 1))
249
 
      {
250
 
        last_sync_time= now_time;
251
 
        return internal::my_sync(log_file, 0);
252
 
      }
253
 
      return 0;
254
 
    }
255
 
  case FLUSH_FREQUENCY_OS:
256
 
  default:
257
 
    return 0;
258
 
  }
259
 
}
260
 
 
261
 
const string &TransactionLog::getLogFilename()
262
 
{
263
 
  return log_file_name;
264
 
}
265
 
 
266
 
const string &TransactionLog::getLogFilepath()
267
 
{
268
 
  return log_file_path;
269
 
}
270
 
 
271
 
void TransactionLog::truncate()
272
 
{
273
 
  /* 
274
 
   * @note
275
 
   *
276
 
   * This is NOT THREAD SAFE! DEBUG/TEST code only!
277
 
   */
278
 
  log_offset= (off_t) 0;
279
 
  int result;
280
 
  do
281
 
  {
282
 
    result= ftruncate(log_file, log_offset);
283
 
  }
284
 
  while (result == -1 && errno == EINTR);
285
 
}
286
 
 
287
 
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
288
 
                                                            string &out_filename) const
289
 
{
290
 
  /* 
291
 
   * Currently, we simply return the single logfile name
292
 
   * Eventually, we'll have an index/hash with upper and
293
 
   * lower bounds to look up a log file with a transaction id
294
 
   */
295
 
  out_filename.assign(log_file_path);
296
 
  return true;
297
 
}
298
 
 
299
 
bool TransactionLog::hasError() const
300
 
{
301
 
  return has_error;
302
 
}
303
 
 
304
 
void TransactionLog::clearError()
305
 
{
306
 
  has_error= false;
307
 
  error_message.clear();
308
 
}
309
 
 
310
 
const string &TransactionLog::getErrorMessage() const
311
 
{
312
 
  return error_message;
313
 
}
314
 
 
315
 
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
316
 
{
317
 
  return trx.ByteSize() + HEADER_TRAILER_BYTES;
318
 
}