~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/log.cc

  • Committer: Monty Taylor
  • Date: 2008-10-08 01:10:45 UTC
  • mto: This revision was merged to the branch mainline in revision 491.
  • Revision ID: monty@inaugust.com-20081008011045-zqozbc81f8qhmxok
Get rid of pragma interface/pragma implementation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @file
 
19
 
 
20
  @brief
 
21
  logging of commands
 
22
 
 
23
  @todo
 
24
    Abort logging when we get an error in reading or writing log files
 
25
*/
 
26
 
 
27
#include <drizzled/server_includes.h>
 
28
#include "sql_repl.h"
 
29
#include "rpl_filter.h"
 
30
#include "rpl_rli.h"
 
31
 
 
32
#include <mysys/my_dir.h>
 
33
#include <stdarg.h>
 
34
 
 
35
#include <drizzled/plugin.h>
 
36
#include <drizzled/drizzled_error_messages.h>
 
37
#include <libdrizzle/gettext.h>
 
38
 
 
39
/* max size of the log message */
 
40
#define MAX_LOG_BUFFER_SIZE 1024
 
41
#define MAX_USER_HOST_SIZE 512
 
42
#define MAX_TIME_SIZE 32
 
43
#define MY_OFF_T_UNDEF (~(my_off_t)0UL)
 
44
 
 
45
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
 
46
 
 
47
LOGGER logger;
 
48
 
 
49
DRIZZLE_BIN_LOG mysql_bin_log;
 
50
ulong sync_binlog_counter= 0;
 
51
 
 
52
static bool test_if_number(const char *str,
 
53
                           long *res, bool allow_wildcards);
 
54
static int binlog_init(void *p);
 
55
static int binlog_close_connection(handlerton *hton, THD *thd);
 
56
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv);
 
57
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
 
58
static int binlog_commit(handlerton *hton, THD *thd, bool all);
 
59
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
 
60
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
 
61
 
 
62
 
 
63
sql_print_message_func sql_print_message_handlers[3] =
 
64
{
 
65
  sql_print_information,
 
66
  sql_print_warning,
 
67
  sql_print_error
 
68
};
 
69
 
 
70
 
 
71
char *make_default_log_name(char *buff,const char* log_ext)
 
72
{
 
73
  strmake(buff, pidfile_name, FN_REFLEN-5);
 
74
  return fn_format(buff, buff, mysql_data_home, log_ext,
 
75
                   MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT));
 
76
}
 
77
 
 
78
/*
 
79
  Helper class to hold a mutex for the duration of the
 
80
  block.
 
81
 
 
82
  Eliminates the need for explicit unlocking of mutexes on, e.g.,
 
83
  error returns.  On passing a null pointer, the sentry will not do
 
84
  anything.
 
85
 */
 
86
class Mutex_sentry
 
87
{
 
88
public:
 
89
  Mutex_sentry(pthread_mutex_t *mutex)
 
90
    : m_mutex(mutex)
 
91
  {
 
92
    if (m_mutex)
 
93
      pthread_mutex_lock(mutex);
 
94
  }
 
95
 
 
96
  ~Mutex_sentry()
 
97
  {
 
98
    if (m_mutex)
 
99
      pthread_mutex_unlock(m_mutex);
 
100
    m_mutex= 0;
 
101
  }
 
102
 
 
103
private:
 
104
  pthread_mutex_t *m_mutex;
 
105
 
 
106
  // It's not allowed to copy this object in any way
 
107
  Mutex_sentry(Mutex_sentry const&);
 
108
  void operator=(Mutex_sentry const&);
 
109
};
 
110
 
 
111
/*
 
112
  Helper class to store binary log transaction data.
 
113
*/
 
114
class binlog_trx_data {
 
115
public:
 
116
  binlog_trx_data()
 
117
    : at_least_one_stmt(0), m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF)
 
118
  {
 
119
    trans_log.end_of_file= max_binlog_cache_size;
 
120
  }
 
121
 
 
122
  ~binlog_trx_data()
 
123
  {
 
124
    assert(pending() == NULL);
 
125
    close_cached_file(&trans_log);
 
126
  }
 
127
 
 
128
  my_off_t position() const {
 
129
    return my_b_tell(&trans_log);
 
130
  }
 
131
 
 
132
  bool empty() const
 
133
  {
 
134
    return pending() == NULL && my_b_tell(&trans_log) == 0;
 
135
  }
 
136
 
 
137
  /*
 
138
    Truncate the transaction cache to a certain position. This
 
139
    includes deleting the pending event.
 
140
   */
 
141
  void truncate(my_off_t pos)
 
142
  {
 
143
    delete pending();
 
144
    set_pending(0);
 
145
    reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0);
 
146
    if (pos < before_stmt_pos)
 
147
      before_stmt_pos= MY_OFF_T_UNDEF;
 
148
 
 
149
    /*
 
150
      The only valid positions that can be truncated to are at the
 
151
      beginning of a statement. We are relying on this fact to be able
 
152
      to set the at_least_one_stmt flag correctly. In other word, if
 
153
      we are truncating to the beginning of the transaction cache,
 
154
      there will be no statements in the cache, otherwhise, we will
 
155
      have at least one statement in the transaction cache.
 
156
     */
 
157
    at_least_one_stmt= (pos > 0);
 
158
  }
 
159
 
 
160
  /*
 
161
    Reset the entire contents of the transaction cache, emptying it
 
162
    completely.
 
163
   */
 
164
  void reset() {
 
165
    if (!empty())
 
166
      truncate(0);
 
167
    before_stmt_pos= MY_OFF_T_UNDEF;
 
168
    trans_log.end_of_file= max_binlog_cache_size;
 
169
  }
 
170
 
 
171
  Rows_log_event *pending() const
 
172
  {
 
173
    return m_pending;
 
174
  }
 
175
 
 
176
  void set_pending(Rows_log_event *const pending)
 
177
  {
 
178
    m_pending= pending;
 
179
  }
 
180
 
 
181
  IO_CACHE trans_log;                         // The transaction cache
 
182
 
 
183
  /**
 
184
    Boolean that is true if there is at least one statement in the
 
185
    transaction cache.
 
186
  */
 
187
  bool at_least_one_stmt;
 
188
 
 
189
private:
 
190
  /*
 
191
    Pending binrows event. This event is the event where the rows are
 
192
    currently written.
 
193
   */
 
194
  Rows_log_event *m_pending;
 
195
 
 
196
public:
 
197
  /*
 
198
    Binlog position before the start of the current statement.
 
199
  */
 
200
  my_off_t before_stmt_pos;
 
201
};
 
202
 
 
203
handlerton *binlog_hton;
 
204
 
 
205
 
 
206
/* Check if a given table is opened log table */
 
207
int check_if_log_table(uint32_t db_len __attribute__((unused)),
 
208
                       const char *db __attribute__((unused)),
 
209
                       uint32_t table_name_len __attribute__((unused)),
 
210
                       const char *table_name __attribute__((unused)),
 
211
                       uint32_t check_if_opened __attribute__((unused)))
 
212
{
 
213
  return 0;
 
214
}
 
215
 
 
216
/*
 
217
  Log error with all enabled log event handlers
 
218
 
 
219
  SYNOPSIS
 
220
    error_log_print()
 
221
 
 
222
    level             The level of the error significance: NOTE,
 
223
                      WARNING or ERROR.
 
224
    format            format string for the error message
 
225
    args              list of arguments for the format string
 
226
 
 
227
  RETURN
 
228
    FALSE - OK
 
229
    TRUE - error occured
 
230
*/
 
231
 
 
232
bool LOGGER::error_log_print(enum loglevel level, const char *format,
 
233
                             va_list args)
 
234
{
 
235
  bool error= false;
 
236
  Log_event_handler **current_handler;
 
237
 
 
238
  /* currently we don't need locking here as there is no error_log table */
 
239
  for (current_handler= error_log_handler_list ; *current_handler ;)
 
240
    error= (*current_handler++)->log_error(level, format, args) || error;
 
241
 
 
242
  return error;
 
243
}
 
244
 
 
245
 
 
246
void LOGGER::cleanup_base()
 
247
{
 
248
  assert(inited == 1);
 
249
  rwlock_destroy(&LOCK_logger);
 
250
}
 
251
 
 
252
 
 
253
void LOGGER::cleanup_end()
 
254
{
 
255
  assert(inited == 1);
 
256
}
 
257
 
 
258
 
 
259
/**
 
260
  Perform basic log initialization: create file-based log handler and
 
261
  init error log.
 
262
*/
 
263
void LOGGER::init_base()
 
264
{
 
265
  assert(inited == 0);
 
266
  inited= 1;
 
267
 
 
268
  /* by default we use traditional error log */
 
269
  init_error_log(LOG_FILE);
 
270
 
 
271
  my_rwlock_init(&LOCK_logger, NULL);
 
272
}
 
273
 
 
274
 
 
275
bool LOGGER::flush_logs(THD *thd __attribute__((unused)))
 
276
{
 
277
  int rc= 0;
 
278
 
 
279
  /*
 
280
    Now we lock logger, as nobody should be able to use logging routines while
 
281
    log tables are closed
 
282
  */
 
283
  logger.lock_exclusive();
 
284
 
 
285
  /* end of log flush */
 
286
  logger.unlock();
 
287
  return rc;
 
288
}
 
289
 
 
290
void LOGGER::init_error_log(uint32_t error_log_printer)
 
291
{
 
292
  if (error_log_printer & LOG_NONE)
 
293
  {
 
294
    error_log_handler_list[0]= 0;
 
295
    return;
 
296
  }
 
297
 
 
298
}
 
299
 
 
300
int LOGGER::set_handlers(uint32_t error_log_printer)
 
301
{
 
302
  /* error log table is not supported yet */
 
303
  lock_exclusive();
 
304
 
 
305
  init_error_log(error_log_printer);
 
306
  unlock();
 
307
 
 
308
  return 0;
 
309
}
 
310
 
 
311
 
 
312
 /*
 
313
  Save position of binary log transaction cache.
 
314
 
 
315
  SYNPOSIS
 
316
    binlog_trans_log_savepos()
 
317
 
 
318
    thd      The thread to take the binlog data from
 
319
    pos      Pointer to variable where the position will be stored
 
320
 
 
321
  DESCRIPTION
 
322
 
 
323
    Save the current position in the binary log transaction cache into
 
324
    the variable pointed to by 'pos'
 
325
 */
 
326
 
 
327
static void
 
328
binlog_trans_log_savepos(THD *thd, my_off_t *pos)
 
329
{
 
330
  assert(pos != NULL);
 
331
  if (thd_get_ha_data(thd, binlog_hton) == NULL)
 
332
    thd->binlog_setup_trx_data();
 
333
  binlog_trx_data *const trx_data=
 
334
    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
335
  assert(mysql_bin_log.is_open());
 
336
  *pos= trx_data->position();
 
337
  return;
 
338
}
 
339
 
 
340
 
 
341
/*
 
342
  Truncate the binary log transaction cache.
 
343
 
 
344
  SYNPOSIS
 
345
    binlog_trans_log_truncate()
 
346
 
 
347
    thd      The thread to take the binlog data from
 
348
    pos      Position to truncate to
 
349
 
 
350
  DESCRIPTION
 
351
 
 
352
    Truncate the binary log to the given position. Will not change
 
353
    anything else.
 
354
 
 
355
 */
 
356
static void
 
357
binlog_trans_log_truncate(THD *thd, my_off_t pos)
 
358
{
 
359
  assert(thd_get_ha_data(thd, binlog_hton) != NULL);
 
360
  /* Only true if binlog_trans_log_savepos() wasn't called before */
 
361
  assert(pos != ~(my_off_t) 0);
 
362
 
 
363
  binlog_trx_data *const trx_data=
 
364
    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
365
  trx_data->truncate(pos);
 
366
  return;
 
367
}
 
368
 
 
369
 
 
370
/*
 
371
  this function is mostly a placeholder.
 
372
  conceptually, binlog initialization (now mostly done in DRIZZLE_BIN_LOG::open)
 
373
  should be moved here.
 
374
*/
 
375
 
 
376
int binlog_init(void *p)
 
377
{
 
378
  binlog_hton= (handlerton *)p;
 
379
  binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
 
380
  binlog_hton->savepoint_offset= sizeof(my_off_t);
 
381
  binlog_hton->close_connection= binlog_close_connection;
 
382
  binlog_hton->savepoint_set= binlog_savepoint_set;
 
383
  binlog_hton->savepoint_rollback= binlog_savepoint_rollback;
 
384
  binlog_hton->commit= binlog_commit;
 
385
  binlog_hton->rollback= binlog_rollback;
 
386
  binlog_hton->prepare= binlog_prepare;
 
387
  binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
 
388
 
 
389
  return 0;
 
390
}
 
391
 
 
392
static int binlog_close_connection(handlerton *hton __attribute__((unused)),
 
393
                                   THD *thd)
 
394
{
 
395
  binlog_trx_data *const trx_data=
 
396
    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
397
  assert(trx_data->empty());
 
398
  thd_set_ha_data(thd, binlog_hton, NULL);
 
399
  trx_data->~binlog_trx_data();
 
400
  free((unsigned char*)trx_data);
 
401
  return 0;
 
402
}
 
403
 
 
404
/*
 
405
  End a transaction.
 
406
 
 
407
  SYNOPSIS
 
408
    binlog_end_trans()
 
409
 
 
410
    thd      The thread whose transaction should be ended
 
411
    trx_data Pointer to the transaction data to use
 
412
    end_ev   The end event to use, or NULL
 
413
    all      True if the entire transaction should be ended, false if
 
414
             only the statement transaction should be ended.
 
415
 
 
416
  DESCRIPTION
 
417
 
 
418
    End the currently open transaction. The transaction can be either
 
419
    a real transaction (if 'all' is true) or a statement transaction
 
420
    (if 'all' is false).
 
421
 
 
422
    If 'end_ev' is NULL, the transaction is a rollback of only
 
423
    transactional tables, so the transaction cache will be truncated
 
424
    to either just before the last opened statement transaction (if
 
425
    'all' is false), or reset completely (if 'all' is true).
 
426
 */
 
427
static int
 
428
binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
 
429
                 Log_event *end_ev, bool all)
 
430
{
 
431
  int error=0;
 
432
  IO_CACHE *trans_log= &trx_data->trans_log;
 
433
 
 
434
  /*
 
435
    NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of
 
436
    only transactional tables.  If the transaction contain changes to
 
437
    any non-transactiona tables, we need write the transaction and log
 
438
    a ROLLBACK last.
 
439
  */
 
440
  if (end_ev != NULL)
 
441
  {
 
442
    /*
 
443
      Doing a commit or a rollback including non-transactional tables,
 
444
      i.e., ending a transaction where we might write the transaction
 
445
      cache to the binary log.
 
446
 
 
447
      We can always end the statement when ending a transaction since
 
448
      transactions are not allowed inside stored functions.  If they
 
449
      were, we would have to ensure that we're not ending a statement
 
450
      inside a stored function.
 
451
     */
 
452
    thd->binlog_flush_pending_rows_event(true);
 
453
 
 
454
    error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev);
 
455
    trx_data->reset();
 
456
 
 
457
    /*
 
458
      We need to step the table map version after writing the
 
459
      transaction cache to disk.
 
460
    */
 
461
    mysql_bin_log.update_table_map_version();
 
462
    statistic_increment(binlog_cache_use, &LOCK_status);
 
463
    if (trans_log->disk_writes != 0)
 
464
    {
 
465
      statistic_increment(binlog_cache_disk_use, &LOCK_status);
 
466
      trans_log->disk_writes= 0;
 
467
    }
 
468
  }
 
469
  else
 
470
  {
 
471
    /*
 
472
      If rolling back an entire transaction or a single statement not
 
473
      inside a transaction, we reset the transaction cache.
 
474
 
 
475
      If rolling back a statement in a transaction, we truncate the
 
476
      transaction cache to remove the statement.
 
477
     */
 
478
    if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
 
479
    {
 
480
      trx_data->reset();
 
481
 
 
482
      assert(!thd->binlog_get_pending_rows_event());
 
483
      thd->clear_binlog_table_maps();
 
484
    }
 
485
    else                                        // ...statement
 
486
      trx_data->truncate(trx_data->before_stmt_pos);
 
487
 
 
488
    /*
 
489
      We need to step the table map version on a rollback to ensure
 
490
      that a new table map event is generated instead of the one that
 
491
      was written to the thrown-away transaction cache.
 
492
    */
 
493
    mysql_bin_log.update_table_map_version();
 
494
  }
 
495
 
 
496
  return(error);
 
497
}
 
498
 
 
499
static int binlog_prepare(handlerton *hton __attribute__((unused)),
 
500
                          THD *thd __attribute__((unused)),
 
501
                          bool all __attribute__((unused)))
 
502
{
 
503
  /*
 
504
    do nothing.
 
505
    just pretend we can do 2pc, so that MySQL won't
 
506
    switch to 1pc.
 
507
    real work will be done in DRIZZLE_BIN_LOG::log_xid()
 
508
  */
 
509
  return 0;
 
510
}
 
511
 
 
512
#define YESNO(X) ((X) ? "yes" : "no")
 
513
 
 
514
/**
 
515
  This function is called once after each statement.
 
516
 
 
517
  It has the responsibility to flush the transaction cache to the
 
518
  binlog file on commits.
 
519
 
 
520
  @param hton  The binlog handlerton.
 
521
  @param thd   The client thread that executes the transaction.
 
522
  @param all   This is @c true if this is a real transaction commit, and
 
523
               @false otherwise.
 
524
 
 
525
  @see handlerton::commit
 
526
*/
 
527
static int binlog_commit(handlerton *hton __attribute__((unused)),
 
528
                         THD *thd, bool all)
 
529
{
 
530
  binlog_trx_data *const trx_data=
 
531
    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
532
 
 
533
  if (trx_data->empty())
 
534
  {
 
535
    // we're here because trans_log was flushed in DRIZZLE_BIN_LOG::log_xid()
 
536
    trx_data->reset();
 
537
    return(0);
 
538
  }
 
539
 
 
540
  /*
 
541
    Decision table for committing a transaction. The top part, the
 
542
    *conditions* represent different cases that can occur, and hte
 
543
    bottom part, the *actions*, represent what should be done in that
 
544
    particular case.
 
545
 
 
546
    Real transaction        'all' was true
 
547
 
 
548
    Statement in cache      There were at least one statement in the
 
549
                            transaction cache
 
550
 
 
551
    In transaction          We are inside a transaction
 
552
 
 
553
    Stmt modified non-trans The statement being committed modified a
 
554
                            non-transactional table
 
555
 
 
556
    All modified non-trans  Some statement before this one in the
 
557
                            transaction modified a non-transactional
 
558
                            table
 
559
 
 
560
 
 
561
    =============================  = = = = = = = = = = = = = = = =
 
562
    Real transaction               N N N N N N N N N N N N N N N N
 
563
    Statement in cache             N N N N N N N N Y Y Y Y Y Y Y Y
 
564
    In transaction                 N N N N Y Y Y Y N N N N Y Y Y Y
 
565
    Stmt modified non-trans        N N Y Y N N Y Y N N Y Y N N Y Y
 
566
    All modified non-trans         N Y N Y N Y N Y N Y N Y N Y N Y
 
567
 
 
568
    Action: (C)ommit/(A)ccumulate  C C - C A C - C - - - - A A - A
 
569
    =============================  = = = = = = = = = = = = = = = =
 
570
 
 
571
 
 
572
    =============================  = = = = = = = = = = = = = = = =
 
573
    Real transaction               Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y
 
574
    Statement in cache             N N N N N N N N Y Y Y Y Y Y Y Y
 
575
    In transaction                 N N N N Y Y Y Y N N N N Y Y Y Y
 
576
    Stmt modified non-trans        N N Y Y N N Y Y N N Y Y N N Y Y
 
577
    All modified non-trans         N Y N Y N Y N Y N Y N Y N Y N Y
 
578
 
 
579
    (C)ommit/(A)ccumulate/(-)      - - - - C C - C - - - - C C - C
 
580
    =============================  = = = = = = = = = = = = = = = =
 
581
 
 
582
    In other words, we commit the transaction if and only if both of
 
583
    the following are true:
 
584
     - We are not in a transaction and committing a statement
 
585
 
 
586
     - We are in a transaction and one (or more) of the following are
 
587
       true:
 
588
 
 
589
       - A full transaction is committed
 
590
 
 
591
         OR
 
592
 
 
593
       - A non-transactional statement is committed and there is
 
594
         no statement cached
 
595
 
 
596
    Otherwise, we accumulate the statement
 
597
  */
 
598
  uint64_t const in_transaction=
 
599
    thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
600
  if ((in_transaction && (all || (!trx_data->at_least_one_stmt && thd->transaction.stmt.modified_non_trans_table))) || (!in_transaction && !all))
 
601
  {
 
602
    Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), true, false);
 
603
    qev.error_code= 0; // see comment in DRIZZLE_LOG::write(THD, IO_CACHE)
 
604
    int error= binlog_end_trans(thd, trx_data, &qev, all);
 
605
    return(error);
 
606
  }
 
607
  return(0);
 
608
}
 
609
 
 
610
/**
 
611
  This function is called when a transaction involving a transactional
 
612
  table is rolled back.
 
613
 
 
614
  It has the responsibility to flush the transaction cache to the
 
615
  binlog file. However, if the transaction does not involve
 
616
  non-transactional tables, nothing needs to be logged.
 
617
 
 
618
  @param hton  The binlog handlerton.
 
619
  @param thd   The client thread that executes the transaction.
 
620
  @param all   This is @c true if this is a real transaction rollback, and
 
621
               @false otherwise.
 
622
 
 
623
  @see handlerton::rollback
 
624
*/
 
625
static int binlog_rollback(handlerton *hton __attribute__((unused)),
 
626
                           THD *thd, bool all)
 
627
{
 
628
  int error=0;
 
629
  binlog_trx_data *const trx_data=
 
630
    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
631
 
 
632
  if (trx_data->empty()) {
 
633
    trx_data->reset();
 
634
    return(0);
 
635
  }
 
636
 
 
637
  if ((all && thd->transaction.all.modified_non_trans_table) ||
 
638
      (!all && thd->transaction.stmt.modified_non_trans_table) ||
 
639
      (thd->options & OPTION_KEEP_LOG))
 
640
  {
 
641
    /*
 
642
      We write the transaction cache with a rollback last if we have
 
643
      modified any non-transactional table. We do this even if we are
 
644
      committing a single statement that has modified a
 
645
      non-transactional table since it can have modified a
 
646
      transactional table in that statement as well, which needs to be
 
647
      rolled back on the slave.
 
648
    */
 
649
    Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), true, false);
 
650
    qev.error_code= 0; // see comment in DRIZZLE_LOG::write(THD, IO_CACHE)
 
651
    error= binlog_end_trans(thd, trx_data, &qev, all);
 
652
  }
 
653
  else if ((all && !thd->transaction.all.modified_non_trans_table) ||
 
654
           (!all && !thd->transaction.stmt.modified_non_trans_table))
 
655
  {
 
656
    /*
 
657
      If we have modified only transactional tables, we can truncate
 
658
      the transaction cache without writing anything to the binary
 
659
      log.
 
660
     */
 
661
    error= binlog_end_trans(thd, trx_data, 0, all);
 
662
  }
 
663
  return(error);
 
664
}
 
665
 
 
666
/**
 
667
  @note
 
668
  How do we handle this (unlikely but legal) case:
 
669
  @verbatim
 
670
    [transaction] + [update to non-trans table] + [rollback to savepoint] ?
 
671
  @endverbatim
 
672
  The problem occurs when a savepoint is before the update to the
 
673
  non-transactional table. Then when there's a rollback to the savepoint, if we
 
674
  simply truncate the binlog cache, we lose the part of the binlog cache where
 
675
  the update is. If we want to not lose it, we need to write the SAVEPOINT
 
676
  command and the ROLLBACK TO SAVEPOINT command to the binlog cache. The latter
 
677
  is easy: it's just write at the end of the binlog cache, but the former
 
678
  should be *inserted* to the place where the user called SAVEPOINT. The
 
679
  solution is that when the user calls SAVEPOINT, we write it to the binlog
 
680
  cache (so no need to later insert it). As transactions are never intermixed
 
681
  in the binary log (i.e. they are serialized), we won't have conflicts with
 
682
  savepoint names when using mysqlbinlog or in the slave SQL thread.
 
683
  Then when ROLLBACK TO SAVEPOINT is called, if we updated some
 
684
  non-transactional table, we don't truncate the binlog cache but instead write
 
685
  ROLLBACK TO SAVEPOINT to it; otherwise we truncate the binlog cache (which
 
686
  will chop the SAVEPOINT command from the binlog cache, which is good as in
 
687
  that case there is no need to have it in the binlog).
 
688
*/
 
689
 
 
690
static int binlog_savepoint_set(handlerton *hton __attribute__((unused)),
 
691
                                THD *thd, void *sv)
 
692
{
 
693
  binlog_trans_log_savepos(thd, (my_off_t*) sv);
 
694
  /* Write it to the binary log */
 
695
  
 
696
  int const error=
 
697
    thd->binlog_query(THD::STMT_QUERY_TYPE,
 
698
                      thd->query, thd->query_length, true, false);
 
699
  return(error);
 
700
}
 
701
 
 
702
static int binlog_savepoint_rollback(handlerton *hton __attribute__((unused)),
 
703
                                     THD *thd, void *sv)
 
704
{
 
705
  /*
 
706
    Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
 
707
    non-transactional table. Otherwise, truncate the binlog cache starting
 
708
    from the SAVEPOINT command.
 
709
  */
 
710
  if (unlikely(thd->transaction.all.modified_non_trans_table || 
 
711
               (thd->options & OPTION_KEEP_LOG)))
 
712
  {
 
713
    int error=
 
714
      thd->binlog_query(THD::STMT_QUERY_TYPE,
 
715
                        thd->query, thd->query_length, true, false);
 
716
    return(error);
 
717
  }
 
718
  binlog_trans_log_truncate(thd, *(my_off_t*)sv);
 
719
  return(0);
 
720
}
 
721
 
 
722
 
 
723
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
 
724
{
 
725
  char magic[4];
 
726
  assert(my_b_tell(log) == 0);
 
727
 
 
728
  if (my_b_read(log, (unsigned char*) magic, sizeof(magic)))
 
729
  {
 
730
    *errmsg = _("I/O error reading the header from the binary log");
 
731
    sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
 
732
                    log->error);
 
733
    return 1;
 
734
  }
 
735
  if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
 
736
  {
 
737
    *errmsg = _("Binlog has bad magic number;  It's not a binary log file "
 
738
                "that can be used by this version of Drizzle");
 
739
    return 1;
 
740
  }
 
741
  return 0;
 
742
}
 
743
 
 
744
 
 
745
File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg)
 
746
{
 
747
  File file;
 
748
 
 
749
  if ((file = my_open(log_file_name, O_RDONLY | O_BINARY | O_SHARE, 
 
750
                      MYF(MY_WME))) < 0)
 
751
  {
 
752
    sql_print_error(_("Failed to open log (file '%s', errno %d)"),
 
753
                    log_file_name, my_errno);
 
754
    *errmsg = _("Could not open log file");
 
755
    goto err;
 
756
  }
 
757
  if (init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
 
758
                    MYF(MY_WME|MY_DONT_CHECK_FILESIZE)))
 
759
  {
 
760
    sql_print_error(_("Failed to create a cache on log (file '%s')"),
 
761
                    log_file_name);
 
762
    *errmsg = _("Could not open log file");
 
763
    goto err;
 
764
  }
 
765
  if (check_binlog_magic(log,errmsg))
 
766
    goto err;
 
767
  return(file);
 
768
 
 
769
err:
 
770
  if (file >= 0)
 
771
  {
 
772
    my_close(file,MYF(0));
 
773
    end_io_cache(log);
 
774
  }
 
775
  return(-1);
 
776
}
 
777
 
 
778
 
 
779
/**
 
780
  Find a unique filename for 'filename.#'.
 
781
 
 
782
  Set '#' to a number as low as possible.
 
783
 
 
784
  @return
 
785
    nonzero if not possible to get unique filename
 
786
*/
 
787
 
 
788
static int find_uniq_filename(char *name)
 
789
{
 
790
  long                  number;
 
791
  uint32_t                  i;
 
792
  char                  buff[FN_REFLEN];
 
793
  struct st_my_dir     *dir_info;
 
794
  register struct fileinfo *file_info;
 
795
  ulong                 max_found=0;
 
796
  size_t                buf_length, length;
 
797
  char                  *start, *end;
 
798
 
 
799
  length= dirname_part(buff, name, &buf_length);
 
800
  start=  name + length;
 
801
  end= strchr(start, '\0');
 
802
 
 
803
  *end='.';
 
804
  length= (size_t) (end-start+1);
 
805
 
 
806
  if (!(dir_info = my_dir(buff,MYF(MY_DONT_SORT))))
 
807
  {                                             // This shouldn't happen
 
808
    my_stpcpy(end,".1");                                // use name+1
 
809
    return(0);
 
810
  }
 
811
  file_info= dir_info->dir_entry;
 
812
  for (i=dir_info->number_off_files ; i-- ; file_info++)
 
813
  {
 
814
    if (memcmp(file_info->name, start, length) == 0 &&
 
815
        test_if_number(file_info->name+length, &number,0))
 
816
    {
 
817
      set_if_bigger(max_found,(ulong) number);
 
818
    }
 
819
  }
 
820
  my_dirend(dir_info);
 
821
 
 
822
  *end++='.';
 
823
  sprintf(end,"%06ld",max_found+1);
 
824
  return(0);
 
825
}
 
826
 
 
827
 
 
828
void DRIZZLE_LOG::init(enum_log_type log_type_arg,
 
829
                     enum cache_type io_cache_type_arg)
 
830
{
 
831
  log_type= log_type_arg;
 
832
  io_cache_type= io_cache_type_arg;
 
833
  return;
 
834
}
 
835
 
 
836
 
 
837
/*
 
838
  Open a (new) log file.
 
839
 
 
840
  SYNOPSIS
 
841
    open()
 
842
 
 
843
    log_name            The name of the log to open
 
844
    log_type_arg        The type of the log. E.g. LOG_NORMAL
 
845
    new_name            The new name for the logfile. This is only needed
 
846
                        when the method is used to open the binlog file.
 
847
    io_cache_type_arg   The type of the IO_CACHE to use for this log file
 
848
 
 
849
  DESCRIPTION
 
850
    Open the logfile, init IO_CACHE and write startup messages
 
851
    (in case of general and slow query logs).
 
852
 
 
853
  RETURN VALUES
 
854
    0   ok
 
855
    1   error
 
856
*/
 
857
 
 
858
bool DRIZZLE_LOG::open(const char *log_name, enum_log_type log_type_arg,
 
859
                     const char *new_name, enum cache_type io_cache_type_arg)
 
860
{
 
861
  char buff[FN_REFLEN];
 
862
  File file= -1;
 
863
  int open_flags= O_CREAT | O_BINARY;
 
864
 
 
865
  write_error= 0;
 
866
 
 
867
  init(log_type_arg, io_cache_type_arg);
 
868
 
 
869
  if (!(name= my_strdup(log_name, MYF(MY_WME))))
 
870
  {
 
871
    name= (char *)log_name; // for the error message
 
872
    goto err;
 
873
  }
 
874
 
 
875
  if (new_name)
 
876
    my_stpcpy(log_file_name, new_name);
 
877
  else if (generate_new_name(log_file_name, name))
 
878
    goto err;
 
879
 
 
880
  if (io_cache_type == SEQ_READ_APPEND)
 
881
    open_flags |= O_RDWR | O_APPEND;
 
882
  else
 
883
    open_flags |= O_WRONLY | (log_type == LOG_BIN ? 0 : O_APPEND);
 
884
 
 
885
  db[0]= 0;
 
886
 
 
887
  if ((file= my_open(log_file_name, open_flags,
 
888
                     MYF(MY_WME | ME_WAITTANG))) < 0 ||
 
889
      init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
 
890
                    my_tell(file, MYF(MY_WME)), 0,
 
891
                    MYF(MY_WME | MY_NABP |
 
892
                        ((log_type == LOG_BIN) ? MY_WAIT_IF_FULL : 0))))
 
893
    goto err;
 
894
 
 
895
  if (log_type == LOG_NORMAL)
 
896
  {
 
897
    char *end;
 
898
    int len=snprintf(buff, sizeof(buff), "%s, Version: %s (%s). "
 
899
                     "started with:\nTCP Port: %d, Named Pipe: %s\n",
 
900
                     my_progname, server_version, DRIZZLE_COMPILATION_COMMENT,
 
901
                     mysqld_port, ""
 
902
                     );
 
903
    end= my_stpncpy(buff + len, "Time                 Id Command    Argument\n",
 
904
                 sizeof(buff) - len);
 
905
    if (my_b_write(&log_file, (unsigned char*) buff, (uint) (end-buff)) ||
 
906
        flush_io_cache(&log_file))
 
907
      goto err;
 
908
  }
 
909
 
 
910
  log_state= LOG_OPENED;
 
911
  return(0);
 
912
 
 
913
err:
 
914
  sql_print_error(_("Could not use %s for logging (error %d). "
 
915
                    "Turning logging off for the whole duration of the "
 
916
                    "Drizzle server process. "
 
917
                    "To turn it on again: fix the cause, "
 
918
                    "shutdown the Drizzle server and restart it."),
 
919
                    name, errno);
 
920
  if (file >= 0)
 
921
    my_close(file, MYF(0));
 
922
  end_io_cache(&log_file);
 
923
  if (name)
 
924
  {
 
925
    free(name);
 
926
    name= NULL;
 
927
  }
 
928
  log_state= LOG_CLOSED;
 
929
  return(1);
 
930
}
 
931
 
 
932
DRIZZLE_LOG::DRIZZLE_LOG()
 
933
  : name(0), write_error(false), inited(false), log_type(LOG_UNKNOWN),
 
934
    log_state(LOG_CLOSED)
 
935
{
 
936
  /*
 
937
    We don't want to initialize LOCK_Log here as such initialization depends on
 
938
    safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is
 
939
    called only in main(). Doing initialization here would make it happen
 
940
    before main().
 
941
  */
 
942
  memset(&log_file, 0, sizeof(log_file));
 
943
}
 
944
 
 
945
void DRIZZLE_LOG::init_pthread_objects()
 
946
{
 
947
  assert(inited == 0);
 
948
  inited= 1;
 
949
  (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW);
 
950
}
 
951
 
 
952
/*
 
953
  Close the log file
 
954
 
 
955
  SYNOPSIS
 
956
    close()
 
957
    exiting     Bitmask. For the slow and general logs the only used bit is
 
958
                LOG_CLOSE_TO_BE_OPENED. This is used if we intend to call
 
959
                open at once after close.
 
960
 
 
961
  NOTES
 
962
    One can do an open on the object at once after doing a close.
 
963
    The internal structures are not freed until cleanup() is called
 
964
*/
 
965
 
 
966
void DRIZZLE_LOG::close(uint32_t exiting)
 
967
{                                       // One can't set log_type here!
 
968
  if (log_state == LOG_OPENED)
 
969
  {
 
970
    end_io_cache(&log_file);
 
971
 
 
972
    if (my_sync(log_file.file, MYF(MY_WME)) && ! write_error)
 
973
    {
 
974
      write_error= 1;
 
975
      sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
 
976
    }
 
977
 
 
978
    if (my_close(log_file.file, MYF(MY_WME)) && ! write_error)
 
979
    {
 
980
      write_error= 1;
 
981
      sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
 
982
    }
 
983
  }
 
984
 
 
985
  log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
 
986
  if (name)
 
987
  {
 
988
    free(name);
 
989
    name= NULL;
 
990
  }
 
991
  return;
 
992
}
 
993
 
 
994
/** This is called only once. */
 
995
 
 
996
void DRIZZLE_LOG::cleanup()
 
997
{
 
998
  if (inited)
 
999
  {
 
1000
    inited= 0;
 
1001
    (void) pthread_mutex_destroy(&LOCK_log);
 
1002
    close(0);
 
1003
  }
 
1004
  return;
 
1005
}
 
1006
 
 
1007
 
 
1008
int DRIZZLE_LOG::generate_new_name(char *new_name, const char *log_name)
 
1009
{
 
1010
  fn_format(new_name, log_name, mysql_data_home, "", 4);
 
1011
  if (log_type == LOG_BIN)
 
1012
  {
 
1013
    if (!fn_ext(log_name)[0])
 
1014
    {
 
1015
      if (find_uniq_filename(new_name))
 
1016
      {
 
1017
        sql_print_error(ER(ER_NO_UNIQUE_LOGFILE), log_name);
 
1018
        return 1;
 
1019
      }
 
1020
    }
 
1021
  }
 
1022
  return 0;
 
1023
}
 
1024
 
 
1025
 
 
1026
/**
 
1027
  @todo
 
1028
  The following should be using fn_format();  We just need to
 
1029
  first change fn_format() to cut the file name if it's too long.
 
1030
*/
 
1031
const char *DRIZZLE_LOG::generate_name(const char *log_name,
 
1032
                                      const char *suffix,
 
1033
                                      bool strip_ext, char *buff)
 
1034
{
 
1035
  if (!log_name || !log_name[0])
 
1036
  {
 
1037
    strmake(buff, pidfile_name, FN_REFLEN - strlen(suffix) - 1);
 
1038
    return (const char *)
 
1039
      fn_format(buff, buff, "", suffix, MYF(MY_REPLACE_EXT|MY_REPLACE_DIR));
 
1040
  }
 
1041
  // get rid of extension if the log is binary to avoid problems
 
1042
  if (strip_ext)
 
1043
  {
 
1044
    char *p= fn_ext(log_name);
 
1045
    uint32_t length= (uint) (p - log_name);
 
1046
    strmake(buff, log_name, cmin(length, (uint)FN_REFLEN));
 
1047
    return (const char*)buff;
 
1048
  }
 
1049
  return log_name;
 
1050
}
 
1051
 
 
1052
 
 
1053
 
 
1054
DRIZZLE_BIN_LOG::DRIZZLE_BIN_LOG()
 
1055
  :bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
 
1056
   need_start_event(true), m_table_map_version(0),
 
1057
   description_event_for_exec(0), description_event_for_queue(0)
 
1058
{
 
1059
  /*
 
1060
    We don't want to initialize locks here as such initialization depends on
 
1061
    safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is
 
1062
    called only in main(). Doing initialization here would make it happen
 
1063
    before main().
 
1064
  */
 
1065
  index_file_name[0] = 0;
 
1066
  memset(&index_file, 0, sizeof(index_file));
 
1067
}
 
1068
 
 
1069
/* this is called only once */
 
1070
 
 
1071
void DRIZZLE_BIN_LOG::cleanup()
 
1072
{
 
1073
  if (inited)
 
1074
  {
 
1075
    inited= 0;
 
1076
    close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
 
1077
    delete description_event_for_queue;
 
1078
    delete description_event_for_exec;
 
1079
    (void) pthread_mutex_destroy(&LOCK_log);
 
1080
    (void) pthread_mutex_destroy(&LOCK_index);
 
1081
    (void) pthread_cond_destroy(&update_cond);
 
1082
  }
 
1083
  return;
 
1084
}
 
1085
 
 
1086
 
 
1087
/* Init binlog-specific vars */
 
1088
void DRIZZLE_BIN_LOG::init(bool no_auto_events_arg, ulong max_size_arg)
 
1089
{
 
1090
  no_auto_events= no_auto_events_arg;
 
1091
  max_size= max_size_arg;
 
1092
  return;
 
1093
}
 
1094
 
 
1095
 
 
1096
void DRIZZLE_BIN_LOG::init_pthread_objects()
 
1097
{
 
1098
  assert(inited == 0);
 
1099
  inited= 1;
 
1100
  (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW);
 
1101
  (void) pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW);
 
1102
  (void) pthread_cond_init(&update_cond, 0);
 
1103
}
 
1104
 
 
1105
 
 
1106
bool DRIZZLE_BIN_LOG::open_index_file(const char *index_file_name_arg,
 
1107
                                const char *log_name)
 
1108
{
 
1109
  File index_file_nr= -1;
 
1110
  assert(!my_b_inited(&index_file));
 
1111
 
 
1112
  /*
 
1113
    First open of this class instance
 
1114
    Create an index file that will hold all file names uses for logging.
 
1115
    Add new entries to the end of it.
 
1116
  */
 
1117
  myf opt= MY_UNPACK_FILENAME;
 
1118
  if (!index_file_name_arg)
 
1119
  {
 
1120
    index_file_name_arg= log_name;    // Use same basename for index file
 
1121
    opt= MY_UNPACK_FILENAME | MY_REPLACE_EXT;
 
1122
  }
 
1123
  fn_format(index_file_name, index_file_name_arg, mysql_data_home,
 
1124
            ".index", opt);
 
1125
  if ((index_file_nr= my_open(index_file_name,
 
1126
                              O_RDWR | O_CREAT | O_BINARY ,
 
1127
                              MYF(MY_WME))) < 0 ||
 
1128
       my_sync(index_file_nr, MYF(MY_WME)) ||
 
1129
       init_io_cache(&index_file, index_file_nr,
 
1130
                     IO_SIZE, WRITE_CACHE,
 
1131
                     my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
 
1132
                        0, MYF(MY_WME | MY_WAIT_IF_FULL)))
 
1133
  {
 
1134
    /*
 
1135
      TODO: all operations creating/deleting the index file or a log, should
 
1136
      call my_sync_dir() or my_sync_dir_by_file() to be durable.
 
1137
      TODO: file creation should be done with my_create() not my_open().
 
1138
    */
 
1139
    if (index_file_nr >= 0)
 
1140
      my_close(index_file_nr,MYF(0));
 
1141
    return true;
 
1142
  }
 
1143
  return false;
 
1144
}
 
1145
 
 
1146
 
 
1147
/**
 
1148
  Open a (new) binlog file.
 
1149
 
 
1150
  - Open the log file and the index file. Register the new
 
1151
  file name in it
 
1152
  - When calling this when the file is in use, you must have a locks
 
1153
  on LOCK_log and LOCK_index.
 
1154
 
 
1155
  @retval
 
1156
    0   ok
 
1157
  @retval
 
1158
    1   error
 
1159
*/
 
1160
 
 
1161
bool DRIZZLE_BIN_LOG::open(const char *log_name,
 
1162
                         enum_log_type log_type_arg,
 
1163
                         const char *new_name,
 
1164
                         enum cache_type io_cache_type_arg,
 
1165
                         bool no_auto_events_arg,
 
1166
                         ulong max_size_arg,
 
1167
                         bool null_created_arg)
 
1168
{
 
1169
  File file= -1;
 
1170
 
 
1171
  write_error=0;
 
1172
 
 
1173
  /* open the main log file */
 
1174
  if (DRIZZLE_LOG::open(log_name, log_type_arg, new_name, io_cache_type_arg))
 
1175
    return(1);                            /* all warnings issued */
 
1176
 
 
1177
  init(no_auto_events_arg, max_size_arg);
 
1178
 
 
1179
  open_count++;
 
1180
 
 
1181
  assert(log_type == LOG_BIN);
 
1182
 
 
1183
  {
 
1184
    bool write_file_name_to_index_file=0;
 
1185
 
 
1186
    if (!my_b_filelength(&log_file))
 
1187
    {
 
1188
      /*
 
1189
        The binary log file was empty (probably newly created)
 
1190
        This is the normal case and happens when the user doesn't specify
 
1191
        an extension for the binary log files.
 
1192
        In this case we write a standard header to it.
 
1193
      */
 
1194
      if (my_b_safe_write(&log_file, (unsigned char*) BINLOG_MAGIC,
 
1195
                          BIN_LOG_HEADER_SIZE))
 
1196
        goto err;
 
1197
      bytes_written+= BIN_LOG_HEADER_SIZE;
 
1198
      write_file_name_to_index_file= 1;
 
1199
    }
 
1200
 
 
1201
    assert(my_b_inited(&index_file) != 0);
 
1202
    reinit_io_cache(&index_file, WRITE_CACHE,
 
1203
                    my_b_filelength(&index_file), 0, 0);
 
1204
    if (need_start_event && !no_auto_events)
 
1205
    {
 
1206
      /*
 
1207
        In 4.x we set need_start_event=0 here, but in 5.0 we want a Start event
 
1208
        even if this is not the very first binlog.
 
1209
      */
 
1210
      Format_description_log_event s(BINLOG_VERSION);
 
1211
      /*
 
1212
        don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache
 
1213
        as we won't be able to reset it later
 
1214
      */
 
1215
      if (io_cache_type == WRITE_CACHE)
 
1216
        s.flags|= LOG_EVENT_BINLOG_IN_USE_F;
 
1217
      if (!s.is_valid())
 
1218
        goto err;
 
1219
      s.dont_set_created= null_created_arg;
 
1220
      if (s.write(&log_file))
 
1221
        goto err;
 
1222
      bytes_written+= s.data_written;
 
1223
    }
 
1224
    if (description_event_for_queue &&
 
1225
        description_event_for_queue->binlog_version>=4)
 
1226
    {
 
1227
      /*
 
1228
        This is a relay log written to by the I/O slave thread.
 
1229
        Write the event so that others can later know the format of this relay
 
1230
        log.
 
1231
        Note that this event is very close to the original event from the
 
1232
        master (it has binlog version of the master, event types of the
 
1233
        master), so this is suitable to parse the next relay log's event. It
 
1234
        has been produced by
 
1235
        Format_description_log_event::Format_description_log_event(char* buf,).
 
1236
        Why don't we want to write the description_event_for_queue if this
 
1237
        event is for format<4 (3.23 or 4.x): this is because in that case, the
 
1238
        description_event_for_queue describes the data received from the
 
1239
        master, but not the data written to the relay log (*conversion*),
 
1240
        which is in format 4 (slave's).
 
1241
      */
 
1242
      /*
 
1243
        Set 'created' to 0, so that in next relay logs this event does not
 
1244
        trigger cleaning actions on the slave in
 
1245
        Format_description_log_event::apply_event_impl().
 
1246
      */
 
1247
      description_event_for_queue->created= 0;
 
1248
      /* Don't set log_pos in event header */
 
1249
      description_event_for_queue->artificial_event=1;
 
1250
 
 
1251
      if (description_event_for_queue->write(&log_file))
 
1252
        goto err;
 
1253
      bytes_written+= description_event_for_queue->data_written;
 
1254
    }
 
1255
    if (flush_io_cache(&log_file) ||
 
1256
        my_sync(log_file.file, MYF(MY_WME)))
 
1257
      goto err;
 
1258
 
 
1259
    if (write_file_name_to_index_file)
 
1260
    {
 
1261
      /*
 
1262
        As this is a new log file, we write the file name to the index
 
1263
        file. As every time we write to the index file, we sync it.
 
1264
      */
 
1265
      if (my_b_write(&index_file, (unsigned char*) log_file_name,
 
1266
                     strlen(log_file_name)) ||
 
1267
          my_b_write(&index_file, (unsigned char*) "\n", 1) ||
 
1268
          flush_io_cache(&index_file) ||
 
1269
          my_sync(index_file.file, MYF(MY_WME)))
 
1270
        goto err;
 
1271
    }
 
1272
  }
 
1273
  log_state= LOG_OPENED;
 
1274
 
 
1275
  return(0);
 
1276
 
 
1277
err:
 
1278
  sql_print_error(_("Could not use %s for logging (error %d). "
 
1279
                    "Turning logging off for the whole duration of the "
 
1280
                    "Drizzle server process. "
 
1281
                    "To turn it on again: fix the cause, "
 
1282
                    "shutdown the Drizzle server and restart it."),
 
1283
                    name, errno);
 
1284
  if (file >= 0)
 
1285
    my_close(file,MYF(0));
 
1286
  end_io_cache(&log_file);
 
1287
  end_io_cache(&index_file);
 
1288
  if (name)
 
1289
  {
 
1290
    free(name);
 
1291
    name= NULL;
 
1292
  }
 
1293
  log_state= LOG_CLOSED;
 
1294
  return(1);
 
1295
}
 
1296
 
 
1297
 
 
1298
int DRIZZLE_BIN_LOG::get_current_log(LOG_INFO* linfo)
 
1299
{
 
1300
  pthread_mutex_lock(&LOCK_log);
 
1301
  int ret = raw_get_current_log(linfo);
 
1302
  pthread_mutex_unlock(&LOCK_log);
 
1303
  return ret;
 
1304
}
 
1305
 
 
1306
int DRIZZLE_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
 
1307
{
 
1308
  strmake(linfo->log_file_name, log_file_name, sizeof(linfo->log_file_name)-1);
 
1309
  linfo->pos = my_b_tell(&log_file);
 
1310
  return 0;
 
1311
}
 
1312
 
 
1313
/**
 
1314
  Move all data up in a file in an filename index file.
 
1315
 
 
1316
    We do the copy outside of the IO_CACHE as the cache buffers would just
 
1317
    make things slower and more complicated.
 
1318
    In most cases the copy loop should only do one read.
 
1319
 
 
1320
  @param index_file                     File to move
 
1321
  @param offset                 Move everything from here to beginning
 
1322
 
 
1323
  @note
 
1324
    File will be truncated to be 'offset' shorter or filled up with newlines
 
1325
 
 
1326
  @retval
 
1327
    0   ok
 
1328
*/
 
1329
 
 
1330
static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset)
 
1331
{
 
1332
  int bytes_read;
 
1333
  my_off_t init_offset= offset;
 
1334
  File file= index_file->file;
 
1335
  unsigned char io_buf[IO_SIZE*2];
 
1336
 
 
1337
  for (;; offset+= bytes_read)
 
1338
  {
 
1339
    (void) my_seek(file, offset, MY_SEEK_SET, MYF(0));
 
1340
    if ((bytes_read= (int) my_read(file, io_buf, sizeof(io_buf), MYF(MY_WME)))
 
1341
        < 0)
 
1342
      goto err;
 
1343
    if (!bytes_read)
 
1344
      break;                                    // end of file
 
1345
    (void) my_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0));
 
1346
    if (my_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
 
1347
      goto err;
 
1348
  }
 
1349
  /* The following will either truncate the file or fill the end with \n' */
 
1350
  if (ftruncate(file, offset - init_offset) || my_sync(file, MYF(MY_WME)))
 
1351
    goto err;
 
1352
 
 
1353
  /* Reset data in old index cache */
 
1354
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 1);
 
1355
  return(0);
 
1356
 
 
1357
err:
 
1358
  return(1);
 
1359
}
 
1360
 
 
1361
/**
 
1362
  Find the position in the log-index-file for the given log name.
 
1363
 
 
1364
  @param linfo          Store here the found log file name and position to
 
1365
                       the NEXT log file name in the index file.
 
1366
  @param log_name       Filename to find in the index file.
 
1367
                       Is a null pointer if we want to read the first entry
 
1368
  @param need_lock      Set this to 1 if the parent doesn't already have a
 
1369
                       lock on LOCK_index
 
1370
 
 
1371
  @note
 
1372
    On systems without the truncate function the file will end with one or
 
1373
    more empty lines.  These will be ignored when reading the file.
 
1374
 
 
1375
  @retval
 
1376
    0                   ok
 
1377
  @retval
 
1378
    LOG_INFO_EOF                End of log-index-file found
 
1379
  @retval
 
1380
    LOG_INFO_IO         Got IO error while reading file
 
1381
*/
 
1382
 
 
1383
int DRIZZLE_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
 
1384
                            bool need_lock)
 
1385
{
 
1386
  int error= 0;
 
1387
  char *fname= linfo->log_file_name;
 
1388
  uint32_t log_name_len= log_name ? (uint) strlen(log_name) : 0;
 
1389
 
 
1390
  /*
 
1391
    Mutex needed because we need to make sure the file pointer does not
 
1392
    move from under our feet
 
1393
  */
 
1394
  if (need_lock)
 
1395
    pthread_mutex_lock(&LOCK_index);
 
1396
  safe_mutex_assert_owner(&LOCK_index);
 
1397
 
 
1398
  /* As the file is flushed, we can't get an error here */
 
1399
  (void) reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0);
 
1400
 
 
1401
  for (;;)
 
1402
  {
 
1403
    uint32_t length;
 
1404
    my_off_t offset= my_b_tell(&index_file);
 
1405
    /* If we get 0 or 1 characters, this is the end of the file */
 
1406
 
 
1407
    if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
 
1408
    {
 
1409
      /* Did not find the given entry; Return not found or error */
 
1410
      error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
 
1411
      break;
 
1412
    }
 
1413
 
 
1414
    // if the log entry matches, null string matching anything
 
1415
    if (!log_name ||
 
1416
        (log_name_len == length-1 && fname[log_name_len] == '\n' &&
 
1417
         !memcmp(fname, log_name, log_name_len)))
 
1418
    {
 
1419
      fname[length-1]=0;                        // remove last \n
 
1420
      linfo->index_file_start_offset= offset;
 
1421
      linfo->index_file_offset = my_b_tell(&index_file);
 
1422
      break;
 
1423
    }
 
1424
  }
 
1425
 
 
1426
  if (need_lock)
 
1427
    pthread_mutex_unlock(&LOCK_index);
 
1428
  return(error);
 
1429
}
 
1430
 
 
1431
 
 
1432
/**
 
1433
  Find the position in the log-index-file for the given log name.
 
1434
 
 
1435
  @param
 
1436
    linfo               Store here the next log file name and position to
 
1437
                        the file name after that.
 
1438
  @param
 
1439
    need_lock           Set this to 1 if the parent doesn't already have a
 
1440
                        lock on LOCK_index
 
1441
 
 
1442
  @note
 
1443
    - Before calling this function, one has to call find_log_pos()
 
1444
    to set up 'linfo'
 
1445
    - Mutex needed because we need to make sure the file pointer does not move
 
1446
    from under our feet
 
1447
 
 
1448
  @retval
 
1449
    0                   ok
 
1450
  @retval
 
1451
    LOG_INFO_EOF                End of log-index-file found
 
1452
  @retval
 
1453
    LOG_INFO_IO         Got IO error while reading file
 
1454
*/
 
1455
 
 
1456
int DRIZZLE_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
 
1457
{
 
1458
  int error= 0;
 
1459
  uint32_t length;
 
1460
  char *fname= linfo->log_file_name;
 
1461
 
 
1462
  if (need_lock)
 
1463
    pthread_mutex_lock(&LOCK_index);
 
1464
  safe_mutex_assert_owner(&LOCK_index);
 
1465
 
 
1466
  /* As the file is flushed, we can't get an error here */
 
1467
  (void) reinit_io_cache(&index_file, READ_CACHE, linfo->index_file_offset, 0,
 
1468
                         0);
 
1469
 
 
1470
  linfo->index_file_start_offset= linfo->index_file_offset;
 
1471
  if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
 
1472
  {
 
1473
    error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
 
1474
    goto err;
 
1475
  }
 
1476
  fname[length-1]=0;                            // kill \n
 
1477
  linfo->index_file_offset = my_b_tell(&index_file);
 
1478
 
 
1479
err:
 
1480
  if (need_lock)
 
1481
    pthread_mutex_unlock(&LOCK_index);
 
1482
  return error;
 
1483
}
 
1484
 
 
1485
 
 
1486
/**
 
1487
  Delete all logs refered to in the index file.
 
1488
  Start writing to a new log file.
 
1489
 
 
1490
  The new index file will only contain this file.
 
1491
 
 
1492
  @param thd            Thread
 
1493
 
 
1494
  @note
 
1495
    If not called from slave thread, write start event to new log
 
1496
 
 
1497
  @retval
 
1498
    0   ok
 
1499
  @retval
 
1500
    1   error
 
1501
*/
 
1502
 
 
1503
bool DRIZZLE_BIN_LOG::reset_logs(THD* thd)
 
1504
{
 
1505
  LOG_INFO linfo;
 
1506
  bool error=0;
 
1507
  const char* save_name;
 
1508
 
 
1509
  /*
 
1510
    We need to get both locks to be sure that no one is trying to
 
1511
    write to the index log file.
 
1512
  */
 
1513
  pthread_mutex_lock(&LOCK_log);
 
1514
  pthread_mutex_lock(&LOCK_index);
 
1515
 
 
1516
  /*
 
1517
    The following mutex is needed to ensure that no threads call
 
1518
    'delete thd' as we would then risk missing a 'rollback' from this
 
1519
    thread. If the transaction involved MyISAM tables, it should go
 
1520
    into binlog even on rollback.
 
1521
  */
 
1522
  pthread_mutex_lock(&LOCK_thread_count);
 
1523
 
 
1524
  /* Save variables so that we can reopen the log */
 
1525
  save_name=name;
 
1526
  name=0;                                       // Protect against free
 
1527
  close(LOG_CLOSE_TO_BE_OPENED);
 
1528
 
 
1529
  /* First delete all old log files */
 
1530
 
 
1531
  if (find_log_pos(&linfo, NULL, 0))
 
1532
  {
 
1533
    error=1;
 
1534
    goto err;
 
1535
  }
 
1536
 
 
1537
  for (;;)
 
1538
  {
 
1539
    if ((error= my_delete_allow_opened(linfo.log_file_name, MYF(0))) != 0)
 
1540
    {
 
1541
      if (my_errno == ENOENT) 
 
1542
      {
 
1543
        push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1544
                            ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
 
1545
                            linfo.log_file_name);
 
1546
        sql_print_information(_("Failed to delete file '%s'"),
 
1547
                              linfo.log_file_name);
 
1548
        my_errno= 0;
 
1549
        error= 0;
 
1550
      }
 
1551
      else
 
1552
      {
 
1553
        push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
 
1554
                            ER_BINLOG_PURGE_FATAL_ERR,
 
1555
                            _("a problem with deleting %s; "
 
1556
                            "consider examining correspondence "
 
1557
                            "of your binlog index file "
 
1558
                            "to the actual binlog files"),
 
1559
                            linfo.log_file_name);
 
1560
        error= 1;
 
1561
        goto err;
 
1562
      }
 
1563
    }
 
1564
    if (find_next_log(&linfo, 0))
 
1565
      break;
 
1566
  }
 
1567
 
 
1568
  /* Start logging with a new file */
 
1569
  close(LOG_CLOSE_INDEX);
 
1570
  if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update)
 
1571
  {
 
1572
    if (my_errno == ENOENT) 
 
1573
    {
 
1574
      push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1575
                          ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
 
1576
                          index_file_name);
 
1577
      sql_print_information(_("Failed to delete file '%s'"),
 
1578
                            index_file_name);
 
1579
      my_errno= 0;
 
1580
      error= 0;
 
1581
    }
 
1582
    else
 
1583
    {
 
1584
      push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
 
1585
                          ER_BINLOG_PURGE_FATAL_ERR,
 
1586
                          "a problem with deleting %s; "
 
1587
                          "consider examining correspondence "
 
1588
                          "of your binlog index file "
 
1589
                          "to the actual binlog files",
 
1590
                          index_file_name);
 
1591
      error= 1;
 
1592
      goto err;
 
1593
    }
 
1594
  }
 
1595
  if (!thd->slave_thread)
 
1596
    need_start_event=1;
 
1597
  if (!open_index_file(index_file_name, 0))
 
1598
    open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0);
 
1599
  free((unsigned char*) save_name);
 
1600
 
 
1601
err:
 
1602
  pthread_mutex_unlock(&LOCK_thread_count);
 
1603
  pthread_mutex_unlock(&LOCK_index);
 
1604
  pthread_mutex_unlock(&LOCK_log);
 
1605
  return(error);
 
1606
}
 
1607
 
 
1608
 
 
1609
/**
 
1610
  Delete relay log files prior to rli->group_relay_log_name
 
1611
  (i.e. all logs which are not involved in a non-finished group
 
1612
  (transaction)), remove them from the index file and start on next
 
1613
  relay log.
 
1614
 
 
1615
  IMPLEMENTATION
 
1616
  - Protects index file with LOCK_index
 
1617
  - Delete relevant relay log files
 
1618
  - Copy all file names after these ones to the front of the index file
 
1619
  - If the OS has truncate, truncate the file, else fill it with \n'
 
1620
  - Read the next file name from the index file and store in rli->linfo
 
1621
 
 
1622
  @param rli           Relay log information
 
1623
  @param included     If false, all relay logs that are strictly before
 
1624
                      rli->group_relay_log_name are deleted ; if true, the
 
1625
                      latter is deleted too (i.e. all relay logs
 
1626
                      read by the SQL slave thread are deleted).
 
1627
 
 
1628
  @note
 
1629
    - This is only called from the slave-execute thread when it has read
 
1630
    all commands from a relay log and want to switch to a new relay log.
 
1631
    - When this happens, we can be in an active transaction as
 
1632
    a transaction can span over two relay logs
 
1633
    (although it is always written as a single block to the master's binary
 
1634
    log, hence cannot span over two master's binary logs).
 
1635
 
 
1636
  @retval
 
1637
    0                   ok
 
1638
  @retval
 
1639
    LOG_INFO_EOF                End of log-index-file found
 
1640
  @retval
 
1641
    LOG_INFO_SEEK       Could not allocate IO cache
 
1642
  @retval
 
1643
    LOG_INFO_IO         Got IO error while reading file
 
1644
*/
 
1645
 
 
1646
 
 
1647
int DRIZZLE_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
 
1648
{
 
1649
  int error;
 
1650
 
 
1651
  assert(is_open());
 
1652
  assert(rli->slave_running == 1);
 
1653
  assert(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
 
1654
 
 
1655
  pthread_mutex_lock(&LOCK_index);
 
1656
  pthread_mutex_lock(&rli->log_space_lock);
 
1657
  rli->relay_log.purge_logs(rli->group_relay_log_name, included,
 
1658
                            0, 0, &rli->log_space_total);
 
1659
  // Tell the I/O thread to take the relay_log_space_limit into account
 
1660
  rli->ignore_log_space_limit= 0;
 
1661
  pthread_mutex_unlock(&rli->log_space_lock);
 
1662
 
 
1663
  /*
 
1664
    Ok to broadcast after the critical region as there is no risk of
 
1665
    the mutex being destroyed by this thread later - this helps save
 
1666
    context switches
 
1667
  */
 
1668
  pthread_cond_broadcast(&rli->log_space_cond);
 
1669
  
 
1670
  /*
 
1671
    Read the next log file name from the index file and pass it back to
 
1672
    the caller
 
1673
    If included is true, we want the first relay log;
 
1674
    otherwise we want the one after event_relay_log_name.
 
1675
  */
 
1676
  if ((included && (error=find_log_pos(&rli->linfo, NULL, 0))) ||
 
1677
      (!included &&
 
1678
       ((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)) ||
 
1679
        (error=find_next_log(&rli->linfo, 0)))))
 
1680
  {
 
1681
    char buff[22];
 
1682
    sql_print_error(_("next log error: %d  offset: %s  log: %s included: %d"),
 
1683
                    error,
 
1684
                    llstr(rli->linfo.index_file_offset,buff),
 
1685
                    rli->group_relay_log_name,
 
1686
                    included);
 
1687
    goto err;
 
1688
  }
 
1689
 
 
1690
  /*
 
1691
    Reset rli's coordinates to the current log.
 
1692
  */
 
1693
  rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
 
1694
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
1695
          sizeof(rli->event_relay_log_name)-1);
 
1696
 
 
1697
  /*
 
1698
    If we removed the rli->group_relay_log_name file,
 
1699
    we must update the rli->group* coordinates, otherwise do not touch it as the
 
1700
    group's execution is not finished (e.g. COMMIT not executed)
 
1701
  */
 
1702
  if (included)
 
1703
  {
 
1704
    rli->group_relay_log_pos = BIN_LOG_HEADER_SIZE;
 
1705
    strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
 
1706
            sizeof(rli->group_relay_log_name)-1);
 
1707
    rli->notify_group_relay_log_name_update();
 
1708
  }
 
1709
 
 
1710
  /* Store where we are in the new file for the execution thread */
 
1711
  flush_relay_log_info(rli);
 
1712
 
 
1713
err:
 
1714
  pthread_mutex_unlock(&LOCK_index);
 
1715
  return(error);
 
1716
}
 
1717
 
 
1718
/**
 
1719
  Update log index_file.
 
1720
*/
 
1721
 
 
1722
int DRIZZLE_BIN_LOG::update_log_index(LOG_INFO* log_info, bool need_update_threads)
 
1723
{
 
1724
  if (copy_up_file_and_fill(&index_file, log_info->index_file_start_offset))
 
1725
    return LOG_INFO_IO;
 
1726
 
 
1727
  // now update offsets in index file for running threads
 
1728
  if (need_update_threads)
 
1729
    adjust_linfo_offsets(log_info->index_file_start_offset);
 
1730
  return 0;
 
1731
}
 
1732
 
 
1733
/**
 
1734
  Remove all logs before the given log from disk and from the index file.
 
1735
 
 
1736
  @param to_log       Delete all log file name before this file.
 
1737
  @param included            If true, to_log is deleted too.
 
1738
  @param need_mutex
 
1739
  @param need_update_threads If we want to update the log coordinates of
 
1740
                             all threads. False for relay logs, true otherwise.
 
1741
  @param freed_log_space     If not null, decrement this variable of
 
1742
                             the amount of log space freed
 
1743
 
 
1744
  @note
 
1745
    If any of the logs before the deleted one is in use,
 
1746
    only purge logs up to this one.
 
1747
 
 
1748
  @retval
 
1749
    0                   ok
 
1750
  @retval
 
1751
    LOG_INFO_EOF                to_log not found
 
1752
    LOG_INFO_EMFILE             too many files opened
 
1753
    LOG_INFO_FATAL              if any other than ENOENT error from
 
1754
                                stat() or my_delete()
 
1755
*/
 
1756
 
 
1757
int DRIZZLE_BIN_LOG::purge_logs(const char *to_log, 
 
1758
                          bool included,
 
1759
                          bool need_mutex, 
 
1760
                          bool need_update_threads, 
 
1761
                          uint64_t *decrease_log_space)
 
1762
{
 
1763
  int error;
 
1764
  int ret = 0;
 
1765
  bool exit_loop= 0;
 
1766
  LOG_INFO log_info;
 
1767
 
 
1768
  if (need_mutex)
 
1769
    pthread_mutex_lock(&LOCK_index);
 
1770
  if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/)))
 
1771
    goto err;
 
1772
 
 
1773
  /*
 
1774
    File name exists in index file; delete until we find this file
 
1775
    or a file that is used.
 
1776
  */
 
1777
  if ((error=find_log_pos(&log_info, NULL, 0 /*no mutex*/)))
 
1778
    goto err;
 
1779
  while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) &&
 
1780
         !log_in_use(log_info.log_file_name))
 
1781
  {
 
1782
    struct stat s;
 
1783
    if (stat(log_info.log_file_name, &s))
 
1784
    {
 
1785
      if (errno == ENOENT) 
 
1786
      {
 
1787
        /*
 
1788
          It's not fatal if we can't stat a log file that does not exist;
 
1789
          If we could not stat, we won't delete.
 
1790
        */     
 
1791
        push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1792
                            ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
 
1793
                            log_info.log_file_name);
 
1794
        sql_print_information(_("Failed to execute stat() on file '%s'"),
 
1795
                              log_info.log_file_name);
 
1796
        my_errno= 0;
 
1797
      }
 
1798
      else
 
1799
      {
 
1800
        /*
 
1801
          Other than ENOENT are fatal
 
1802
        */
 
1803
        push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
 
1804
                            ER_BINLOG_PURGE_FATAL_ERR,
 
1805
                            _("a problem with getting info on being purged %s; "
 
1806
                            "consider examining correspondence "
 
1807
                            "of your binlog index file "
 
1808
                            "to the actual binlog files"),
 
1809
                            log_info.log_file_name);
 
1810
        error= LOG_INFO_FATAL;
 
1811
        goto err;
 
1812
      }
 
1813
    }
 
1814
    else
 
1815
    {
 
1816
      if (!my_delete(log_info.log_file_name, MYF(0)))
 
1817
      {
 
1818
        if (decrease_log_space)
 
1819
          *decrease_log_space-= s.st_size;
 
1820
      }
 
1821
      else
 
1822
      {
 
1823
        if (my_errno == ENOENT) 
 
1824
        {
 
1825
          push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1826
                              ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
 
1827
                              log_info.log_file_name);
 
1828
          sql_print_information(_("Failed to delete file '%s'"),
 
1829
                                log_info.log_file_name);
 
1830
          my_errno= 0;
 
1831
        }
 
1832
        else
 
1833
        {
 
1834
          push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
 
1835
                              ER_BINLOG_PURGE_FATAL_ERR,
 
1836
                              _("a problem with deleting %s; "
 
1837
                              "consider examining correspondence "
 
1838
                              "of your binlog index file "
 
1839
                              "to the actual binlog files"),
 
1840
                              log_info.log_file_name);
 
1841
          if (my_errno == EMFILE)
 
1842
          {
 
1843
            error= LOG_INFO_EMFILE;
 
1844
          }
 
1845
          error= LOG_INFO_FATAL;
 
1846
          goto err;
 
1847
        }
 
1848
      }
 
1849
    }
 
1850
 
 
1851
    if (find_next_log(&log_info, 0) || exit_loop)
 
1852
      break;
 
1853
  }
 
1854
  
 
1855
  /*
 
1856
    If we get killed -9 here, the sysadmin would have to edit
 
1857
    the log index file after restart - otherwise, this should be safe
 
1858
  */
 
1859
  error= update_log_index(&log_info, need_update_threads);
 
1860
  if (error == 0) {
 
1861
    error = ret;
 
1862
  }
 
1863
 
 
1864
err:
 
1865
  if (need_mutex)
 
1866
    pthread_mutex_unlock(&LOCK_index);
 
1867
  return(error);
 
1868
}
 
1869
 
 
1870
/**
 
1871
  Remove all logs before the given file date from disk and from the
 
1872
  index file.
 
1873
 
 
1874
  @param thd            Thread pointer
 
1875
  @param before_date    Delete all log files before given date.
 
1876
 
 
1877
  @note
 
1878
    If any of the logs before the deleted one is in use,
 
1879
    only purge logs up to this one.
 
1880
 
 
1881
  @retval
 
1882
    0                           ok
 
1883
  @retval
 
1884
    LOG_INFO_PURGE_NO_ROTATE    Binary file that can't be rotated
 
1885
    LOG_INFO_FATAL              if any other than ENOENT error from
 
1886
                                stat() or my_delete()
 
1887
*/
 
1888
 
 
1889
int DRIZZLE_BIN_LOG::purge_logs_before_date(time_t purge_time)
 
1890
{
 
1891
  int error;
 
1892
  LOG_INFO log_info;
 
1893
  struct stat stat_area;
 
1894
 
 
1895
  pthread_mutex_lock(&LOCK_index);
 
1896
 
 
1897
  /*
 
1898
    Delete until we find curren file
 
1899
    or a file that is used or a file
 
1900
    that is older than purge_time.
 
1901
  */
 
1902
  if ((error=find_log_pos(&log_info, NULL, 0 /*no mutex*/)))
 
1903
    goto err;
 
1904
 
 
1905
  while (strcmp(log_file_name, log_info.log_file_name) &&
 
1906
         !log_in_use(log_info.log_file_name))
 
1907
  {
 
1908
    if (stat(log_info.log_file_name, &stat_area))
 
1909
    {
 
1910
      if (errno == ENOENT) 
 
1911
      {
 
1912
        /*
 
1913
          It's not fatal if we can't stat a log file that does not exist.
 
1914
        */     
 
1915
        push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1916
                            ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
 
1917
                            log_info.log_file_name);
 
1918
        sql_print_information(_("Failed to execute stat() on file '%s'"),
 
1919
                              log_info.log_file_name);
 
1920
        my_errno= 0;
 
1921
      }
 
1922
      else
 
1923
      {
 
1924
        /*
 
1925
          Other than ENOENT are fatal
 
1926
        */
 
1927
        push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
 
1928
                            ER_BINLOG_PURGE_FATAL_ERR,
 
1929
                            _("a problem with getting info on being purged %s; "
 
1930
                            "consider examining correspondence "
 
1931
                            "of your binlog index file "
 
1932
                            "to the actual binlog files"),
 
1933
                            log_info.log_file_name);
 
1934
        error= LOG_INFO_FATAL;
 
1935
        goto err;
 
1936
      }
 
1937
    }
 
1938
    else
 
1939
    {
 
1940
      if (stat_area.st_mtime >= purge_time)
 
1941
        break;
 
1942
      if (my_delete(log_info.log_file_name, MYF(0)))
 
1943
      {
 
1944
        if (my_errno == ENOENT) 
 
1945
        {
 
1946
          /* It's not fatal even if we can't delete a log file */
 
1947
          push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1948
                              ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
 
1949
                              log_info.log_file_name);
 
1950
          sql_print_information(_("Failed to delete file '%s'"),
 
1951
                                log_info.log_file_name);
 
1952
          my_errno= 0;
 
1953
        }
 
1954
        else
 
1955
        {
 
1956
          push_warning_printf(current_thd, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
 
1957
                              ER_BINLOG_PURGE_FATAL_ERR,
 
1958
                              _("a problem with deleting %s; "
 
1959
                              "consider examining correspondence "
 
1960
                              "of your binlog index file "
 
1961
                              "to the actual binlog files"),
 
1962
                              log_info.log_file_name);
 
1963
          error= LOG_INFO_FATAL;
 
1964
          goto err;
 
1965
        }
 
1966
      }
 
1967
    }
 
1968
    if (find_next_log(&log_info, 0))
 
1969
      break;
 
1970
  }
 
1971
 
 
1972
  /*
 
1973
    If we get killed -9 here, the sysadmin would have to edit
 
1974
    the log index file after restart - otherwise, this should be safe
 
1975
  */
 
1976
  error= update_log_index(&log_info, 1);
 
1977
 
 
1978
err:
 
1979
  pthread_mutex_unlock(&LOCK_index);
 
1980
  return(error);
 
1981
}
 
1982
 
 
1983
 
 
1984
/**
 
1985
  Create a new log file name.
 
1986
 
 
1987
  @param buf            buf of at least FN_REFLEN where new name is stored
 
1988
 
 
1989
  @note
 
1990
    If file name will be longer then FN_REFLEN it will be truncated
 
1991
*/
 
1992
 
 
1993
void DRIZZLE_BIN_LOG::make_log_name(char* buf, const char* log_ident)
 
1994
{
 
1995
  uint32_t dir_len = dirname_length(log_file_name); 
 
1996
  if (dir_len >= FN_REFLEN)
 
1997
    dir_len=FN_REFLEN-1;
 
1998
  my_stpncpy(buf, log_file_name, dir_len);
 
1999
  strmake(buf+dir_len, log_ident, FN_REFLEN - dir_len -1);
 
2000
}
 
2001
 
 
2002
 
 
2003
/**
 
2004
  Check if we are writing/reading to the given log file.
 
2005
*/
 
2006
 
 
2007
bool DRIZZLE_BIN_LOG::is_active(const char *log_file_name_arg)
 
2008
{
 
2009
  return !strcmp(log_file_name, log_file_name_arg);
 
2010
}
 
2011
 
 
2012
 
 
2013
/*
 
2014
  Wrappers around new_file_impl to avoid using argument
 
2015
  to control locking. The argument 1) less readable 2) breaks
 
2016
  incapsulation 3) allows external access to the class without
 
2017
  a lock (which is not possible with private new_file_without_locking
 
2018
  method).
 
2019
*/
 
2020
 
 
2021
void DRIZZLE_BIN_LOG::new_file()
 
2022
{
 
2023
  new_file_impl(1);
 
2024
}
 
2025
 
 
2026
 
 
2027
void DRIZZLE_BIN_LOG::new_file_without_locking()
 
2028
{
 
2029
  new_file_impl(0);
 
2030
}
 
2031
 
 
2032
 
 
2033
/**
 
2034
  Start writing to a new log file or reopen the old file.
 
2035
 
 
2036
  @param need_lock              Set to 1 if caller has not locked LOCK_log
 
2037
 
 
2038
  @note
 
2039
    The new file name is stored last in the index file
 
2040
*/
 
2041
 
 
2042
void DRIZZLE_BIN_LOG::new_file_impl(bool need_lock)
 
2043
{
 
2044
  char new_name[FN_REFLEN], *new_name_ptr, *old_name;
 
2045
 
 
2046
  if (!is_open())
 
2047
  {
 
2048
    return;
 
2049
  }
 
2050
 
 
2051
  if (need_lock)
 
2052
    pthread_mutex_lock(&LOCK_log);
 
2053
  pthread_mutex_lock(&LOCK_index);
 
2054
 
 
2055
  safe_mutex_assert_owner(&LOCK_log);
 
2056
  safe_mutex_assert_owner(&LOCK_index);
 
2057
 
 
2058
  /*
 
2059
    if binlog is used as tc log, be sure all xids are "unlogged",
 
2060
    so that on recover we only need to scan one - latest - binlog file
 
2061
    for prepared xids. As this is expected to be a rare event,
 
2062
    simple wait strategy is enough. We're locking LOCK_log to be sure no
 
2063
    new Xid_log_event's are added to the log (and prepared_xids is not
 
2064
    increased), and waiting on COND_prep_xids for late threads to
 
2065
    catch up.
 
2066
  */
 
2067
  if (prepared_xids)
 
2068
  {
 
2069
    tc_log_page_waits++;
 
2070
    pthread_mutex_lock(&LOCK_prep_xids);
 
2071
    while (prepared_xids) {
 
2072
      pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
 
2073
    }
 
2074
    pthread_mutex_unlock(&LOCK_prep_xids);
 
2075
  }
 
2076
 
 
2077
  /* Reuse old name if not binlog and not update log */
 
2078
  new_name_ptr= name;
 
2079
 
 
2080
  /*
 
2081
    If user hasn't specified an extension, generate a new log name
 
2082
    We have to do this here and not in open as we want to store the
 
2083
    new file name in the current binary log file.
 
2084
  */
 
2085
  if (generate_new_name(new_name, name))
 
2086
    goto end;
 
2087
  new_name_ptr=new_name;
 
2088
 
 
2089
  if (log_type == LOG_BIN)
 
2090
  {
 
2091
    if (!no_auto_events)
 
2092
    {
 
2093
      /*
 
2094
        We log the whole file name for log file as the user may decide
 
2095
        to change base names at some point.
 
2096
      */
 
2097
      Rotate_log_event r(new_name+dirname_length(new_name),
 
2098
                         0, LOG_EVENT_OFFSET, 0);
 
2099
      r.write(&log_file);
 
2100
      bytes_written += r.data_written;
 
2101
    }
 
2102
    /*
 
2103
      Update needs to be signalled even if there is no rotate event
 
2104
      log rotation should give the waiting thread a signal to
 
2105
      discover EOF and move on to the next log.
 
2106
    */
 
2107
    signal_update();
 
2108
  }
 
2109
  old_name=name;
 
2110
  name=0;                               // Don't free name
 
2111
  close(LOG_CLOSE_TO_BE_OPENED);
 
2112
 
 
2113
  /*
 
2114
     Note that at this point, log_state != LOG_CLOSED (important for is_open()).
 
2115
  */
 
2116
 
 
2117
  /*
 
2118
     new_file() is only used for rotation (in FLUSH LOGS or because size >
 
2119
     max_binlog_size or max_relay_log_size).
 
2120
     If this is a binary log, the Format_description_log_event at the beginning of
 
2121
     the new file should have created=0 (to distinguish with the
 
2122
     Format_description_log_event written at server startup, which should
 
2123
     trigger temp tables deletion on slaves.
 
2124
  */
 
2125
 
 
2126
  open(old_name, log_type, new_name_ptr,
 
2127
       io_cache_type, no_auto_events, max_size, 1);
 
2128
  free(old_name);
 
2129
 
 
2130
end:
 
2131
  if (need_lock)
 
2132
    pthread_mutex_unlock(&LOCK_log);
 
2133
  pthread_mutex_unlock(&LOCK_index);
 
2134
 
 
2135
  return;
 
2136
}
 
2137
 
 
2138
 
 
2139
bool DRIZZLE_BIN_LOG::append(Log_event* ev)
 
2140
{
 
2141
  bool error = 0;
 
2142
  pthread_mutex_lock(&LOCK_log);
 
2143
 
 
2144
  assert(log_file.type == SEQ_READ_APPEND);
 
2145
  /*
 
2146
    Log_event::write() is smart enough to use my_b_write() or
 
2147
    my_b_append() depending on the kind of cache we have.
 
2148
  */
 
2149
  if (ev->write(&log_file))
 
2150
  {
 
2151
    error=1;
 
2152
    goto err;
 
2153
  }
 
2154
  bytes_written+= ev->data_written;
 
2155
  if ((uint) my_b_append_tell(&log_file) > max_size)
 
2156
    new_file_without_locking();
 
2157
 
 
2158
err:
 
2159
  pthread_mutex_unlock(&LOCK_log);
 
2160
  signal_update();                              // Safe as we don't call close
 
2161
  return(error);
 
2162
}
 
2163
 
 
2164
 
 
2165
bool DRIZZLE_BIN_LOG::appendv(const char* buf, uint32_t len,...)
 
2166
{
 
2167
  bool error= 0;
 
2168
  va_list(args);
 
2169
  va_start(args,len);
 
2170
 
 
2171
  assert(log_file.type == SEQ_READ_APPEND);
 
2172
 
 
2173
  safe_mutex_assert_owner(&LOCK_log);
 
2174
  do
 
2175
  {
 
2176
    if (my_b_append(&log_file,(unsigned char*) buf,len))
 
2177
    {
 
2178
      error= 1;
 
2179
      goto err;
 
2180
    }
 
2181
    bytes_written += len;
 
2182
  } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
 
2183
  if ((uint) my_b_append_tell(&log_file) > max_size)
 
2184
    new_file_without_locking();
 
2185
 
 
2186
err:
 
2187
  if (!error)
 
2188
    signal_update();
 
2189
  return(error);
 
2190
}
 
2191
 
 
2192
 
 
2193
bool DRIZZLE_BIN_LOG::flush_and_sync()
 
2194
{
 
2195
  int err=0, fd=log_file.file;
 
2196
  safe_mutex_assert_owner(&LOCK_log);
 
2197
  if (flush_io_cache(&log_file))
 
2198
    return 1;
 
2199
  if (++sync_binlog_counter >= sync_binlog_period && sync_binlog_period)
 
2200
  {
 
2201
    sync_binlog_counter= 0;
 
2202
    err=my_sync(fd, MYF(MY_WME));
 
2203
  }
 
2204
  return err;
 
2205
}
 
2206
 
 
2207
void DRIZZLE_BIN_LOG::start_union_events(THD *thd, query_id_t query_id_param)
 
2208
{
 
2209
  assert(!thd->binlog_evt_union.do_union);
 
2210
  thd->binlog_evt_union.do_union= true;
 
2211
  thd->binlog_evt_union.unioned_events= false;
 
2212
  thd->binlog_evt_union.unioned_events_trans= false;
 
2213
  thd->binlog_evt_union.first_query_id= query_id_param;
 
2214
}
 
2215
 
 
2216
void DRIZZLE_BIN_LOG::stop_union_events(THD *thd)
 
2217
{
 
2218
  assert(thd->binlog_evt_union.do_union);
 
2219
  thd->binlog_evt_union.do_union= false;
 
2220
}
 
2221
 
 
2222
bool DRIZZLE_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param)
 
2223
{
 
2224
  return (thd->binlog_evt_union.do_union && 
 
2225
          query_id_param >= thd->binlog_evt_union.first_query_id);
 
2226
}
 
2227
 
 
2228
 
 
2229
/*
 
2230
  These functions are placed in this file since they need access to
 
2231
  binlog_hton, which has internal linkage.
 
2232
*/
 
2233
 
 
2234
int THD::binlog_setup_trx_data()
 
2235
{
 
2236
  binlog_trx_data *trx_data=
 
2237
    (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
 
2238
 
 
2239
  if (trx_data)
 
2240
    return(0);                             // Already set up
 
2241
 
 
2242
  trx_data= (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL));
 
2243
  if (!trx_data ||
 
2244
      open_cached_file(&trx_data->trans_log, mysql_tmpdir,
 
2245
                       LOG_PREFIX, binlog_cache_size, MYF(MY_WME)))
 
2246
  {
 
2247
    free((unsigned char*)trx_data);
 
2248
    return(1);                      // Didn't manage to set it up
 
2249
  }
 
2250
  thd_set_ha_data(this, binlog_hton, trx_data);
 
2251
 
 
2252
  trx_data= new (thd_get_ha_data(this, binlog_hton)) binlog_trx_data;
 
2253
 
 
2254
  return(0);
 
2255
}
 
2256
 
 
2257
/*
 
2258
  Function to start a statement and optionally a transaction for the
 
2259
  binary log.
 
2260
 
 
2261
  SYNOPSIS
 
2262
    binlog_start_trans_and_stmt()
 
2263
 
 
2264
  DESCRIPTION
 
2265
 
 
2266
    This function does three things:
 
2267
    - Start a transaction if not in autocommit mode or if a BEGIN
 
2268
      statement has been seen.
 
2269
 
 
2270
    - Start a statement transaction to allow us to truncate the binary
 
2271
      log.
 
2272
 
 
2273
    - Save the currrent binlog position so that we can roll back the
 
2274
      statement by truncating the transaction log.
 
2275
 
 
2276
      We only update the saved position if the old one was undefined,
 
2277
      the reason is that there are some cases (e.g., for CREATE-SELECT)
 
2278
      where the position is saved twice (e.g., both in
 
2279
      select_create::prepare() and THD::binlog_write_table_map()) , but
 
2280
      we should use the first. This means that calls to this function
 
2281
      can be used to start the statement before the first table map
 
2282
      event, to include some extra events.
 
2283
 */
 
2284
 
 
2285
void
 
2286
THD::binlog_start_trans_and_stmt()
 
2287
{
 
2288
  binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
 
2289
 
 
2290
  if (trx_data == NULL ||
 
2291
      trx_data->before_stmt_pos == MY_OFF_T_UNDEF)
 
2292
  {
 
2293
    this->binlog_set_stmt_begin();
 
2294
    if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
2295
      trans_register_ha(this, true, binlog_hton);
 
2296
    trans_register_ha(this, false, binlog_hton);
 
2297
    /*
 
2298
      Mark statement transaction as read/write. We never start
 
2299
      a binary log transaction and keep it read-only,
 
2300
      therefore it's best to mark the transaction read/write just
 
2301
      at the same time we start it.
 
2302
      Not necessary to mark the normal transaction read/write
 
2303
      since the statement-level flag will be propagated automatically
 
2304
      inside ha_commit_trans.
 
2305
    */
 
2306
    ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write();
 
2307
  }
 
2308
  return;
 
2309
}
 
2310
 
 
2311
void THD::binlog_set_stmt_begin() {
 
2312
  binlog_trx_data *trx_data=
 
2313
    (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
 
2314
 
 
2315
  /*
 
2316
    The call to binlog_trans_log_savepos() might create the trx_data
 
2317
    structure, if it didn't exist before, so we save the position
 
2318
    into an auto variable and then write it into the transaction
 
2319
    data for the binary log (i.e., trx_data).
 
2320
  */
 
2321
  my_off_t pos= 0;
 
2322
  binlog_trans_log_savepos(this, &pos);
 
2323
  trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
 
2324
  trx_data->before_stmt_pos= pos;
 
2325
}
 
2326
 
 
2327
 
 
2328
/*
 
2329
  Write a table map to the binary log.
 
2330
 */
 
2331
 
 
2332
int THD::binlog_write_table_map(Table *table, bool is_trans)
 
2333
{
 
2334
  int error;
 
2335
 
 
2336
  /* Pre-conditions */
 
2337
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2338
  assert(table->s->table_map_id != UINT32_MAX);
 
2339
 
 
2340
  Table_map_log_event::flag_set const
 
2341
    flags= Table_map_log_event::TM_NO_FLAGS;
 
2342
 
 
2343
  Table_map_log_event
 
2344
    the_event(this, table, table->s->table_map_id, is_trans, flags);
 
2345
 
 
2346
  if (is_trans && binlog_table_maps == 0)
 
2347
    binlog_start_trans_and_stmt();
 
2348
 
 
2349
  if ((error= mysql_bin_log.write(&the_event)))
 
2350
    return(error);
 
2351
 
 
2352
  binlog_table_maps++;
 
2353
  table->s->table_map_version= mysql_bin_log.table_map_version();
 
2354
  return(0);
 
2355
}
 
2356
 
 
2357
Rows_log_event*
 
2358
THD::binlog_get_pending_rows_event() const
 
2359
{
 
2360
  binlog_trx_data *const trx_data=
 
2361
    (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
 
2362
  /*
 
2363
    This is less than ideal, but here's the story: If there is no
 
2364
    trx_data, prepare_pending_rows_event() has never been called
 
2365
    (since the trx_data is set up there). In that case, we just return
 
2366
    NULL.
 
2367
   */
 
2368
  return trx_data ? trx_data->pending() : NULL;
 
2369
}
 
2370
 
 
2371
void
 
2372
THD::binlog_set_pending_rows_event(Rows_log_event* ev)
 
2373
{
 
2374
  if (thd_get_ha_data(this, binlog_hton) == NULL)
 
2375
    binlog_setup_trx_data();
 
2376
 
 
2377
  binlog_trx_data *const trx_data=
 
2378
    (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
 
2379
 
 
2380
  assert(trx_data);
 
2381
  trx_data->set_pending(ev);
 
2382
}
 
2383
 
 
2384
 
 
2385
/*
 
2386
  Moves the last bunch of rows from the pending Rows event to the binlog
 
2387
  (either cached binlog if transaction, or disk binlog). Sets a new pending
 
2388
  event.
 
2389
*/
 
2390
int
 
2391
DRIZZLE_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
 
2392
                                                Rows_log_event* event)
 
2393
{
 
2394
  assert(mysql_bin_log.is_open());
 
2395
 
 
2396
  int error= 0;
 
2397
 
 
2398
  binlog_trx_data *const trx_data=
 
2399
    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
2400
 
 
2401
  assert(trx_data);
 
2402
 
 
2403
  if (Rows_log_event* pending= trx_data->pending())
 
2404
  {
 
2405
    IO_CACHE *file= &log_file;
 
2406
 
 
2407
    /*
 
2408
      Decide if we should write to the log file directly or to the
 
2409
      transaction log.
 
2410
    */
 
2411
    if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log))
 
2412
      file= &trx_data->trans_log;
 
2413
 
 
2414
    /*
 
2415
      If we are writing to the log file directly, we could avoid
 
2416
      locking the log. This does not work since we need to step the
 
2417
      m_table_map_version below, and that change has to be protected
 
2418
      by the LOCK_log mutex.
 
2419
    */
 
2420
    pthread_mutex_lock(&LOCK_log);
 
2421
 
 
2422
    /*
 
2423
      Write pending event to log file or transaction cache
 
2424
    */
 
2425
    if (pending->write(file))
 
2426
    {
 
2427
      pthread_mutex_unlock(&LOCK_log);
 
2428
      return(1);
 
2429
    }
 
2430
 
 
2431
    /*
 
2432
      We step the table map version if we are writing an event
 
2433
      representing the end of a statement.  We do this regardless of
 
2434
      wheather we write to the transaction cache or to directly to the
 
2435
      file.
 
2436
 
 
2437
      In an ideal world, we could avoid stepping the table map version
 
2438
      if we were writing to a transaction cache, since we could then
 
2439
      reuse the table map that was written earlier in the transaction
 
2440
      cache.  This does not work since STMT_END_F implies closing all
 
2441
      table mappings on the slave side.
 
2442
 
 
2443
      TODO: Find a solution so that table maps does not have to be
 
2444
      written several times within a transaction.
 
2445
     */
 
2446
    if (pending->get_flags(Rows_log_event::STMT_END_F))
 
2447
      ++m_table_map_version;
 
2448
 
 
2449
    delete pending;
 
2450
 
 
2451
    if (file == &log_file)
 
2452
    {
 
2453
      error= flush_and_sync();
 
2454
      if (!error)
 
2455
      {
 
2456
        signal_update();
 
2457
        rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
 
2458
      }
 
2459
    }
 
2460
 
 
2461
    pthread_mutex_unlock(&LOCK_log);
 
2462
  }
 
2463
 
 
2464
  thd->binlog_set_pending_rows_event(event);
 
2465
 
 
2466
  return(error);
 
2467
}
 
2468
 
 
2469
/**
 
2470
  Write an event to the binary log.
 
2471
*/
 
2472
 
 
2473
bool DRIZZLE_BIN_LOG::write(Log_event *event_info)
 
2474
{
 
2475
  THD *thd= event_info->thd;
 
2476
  bool error= 1;
 
2477
 
 
2478
  if (thd->binlog_evt_union.do_union)
 
2479
  {
 
2480
    /*
 
2481
      In Stored function; Remember that function call caused an update.
 
2482
      We will log the function call to the binary log on function exit
 
2483
    */
 
2484
    thd->binlog_evt_union.unioned_events= true;
 
2485
    thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt;
 
2486
    return(0);
 
2487
  }
 
2488
 
 
2489
  /*
 
2490
    Flush the pending rows event to the transaction cache or to the
 
2491
    log file.  Since this function potentially aquire the LOCK_log
 
2492
    mutex, we do this before aquiring the LOCK_log mutex in this
 
2493
    function.
 
2494
 
 
2495
    We only end the statement if we are in a top-level statement.  If
 
2496
    we are inside a stored function, we do not end the statement since
 
2497
    this will close all tables on the slave.
 
2498
  */
 
2499
  bool const end_stmt= false;
 
2500
  thd->binlog_flush_pending_rows_event(end_stmt);
 
2501
 
 
2502
  pthread_mutex_lock(&LOCK_log);
 
2503
 
 
2504
  /*
 
2505
     In most cases this is only called if 'is_open()' is true; in fact this is
 
2506
     mostly called if is_open() *was* true a few instructions before, but it
 
2507
     could have changed since.
 
2508
  */
 
2509
  if (likely(is_open()))
 
2510
  {
 
2511
    IO_CACHE *file= &log_file;
 
2512
    /*
 
2513
      In the future we need to add to the following if tests like
 
2514
      "do the involved tables match (to be implemented)
 
2515
      binlog_[wild_]{do|ignore}_table?" (WL#1049)"
 
2516
    */
 
2517
    const char *local_db= event_info->get_db();
 
2518
    if ((thd && !(thd->options & OPTION_BIN_LOG)) ||
 
2519
        (!binlog_filter->db_ok(local_db)))
 
2520
    {
 
2521
      pthread_mutex_unlock(&LOCK_log);
 
2522
      return(0);
 
2523
    }
 
2524
 
 
2525
    /*
 
2526
      Should we write to the binlog cache or to the binlog on disk?
 
2527
      Write to the binlog cache if:
 
2528
      - it is already not empty (meaning we're in a transaction; note that the
 
2529
     present event could be about a non-transactional table, but still we need
 
2530
     to write to the binlog cache in that case to handle updates to mixed
 
2531
     trans/non-trans table types the best possible in binlogging)
 
2532
      - or if the event asks for it (cache_stmt == TRUE).
 
2533
    */
 
2534
    if (opt_using_transactions && thd)
 
2535
    {
 
2536
      if (thd->binlog_setup_trx_data())
 
2537
        goto err;
 
2538
 
 
2539
      binlog_trx_data *const trx_data=
 
2540
        (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
2541
      IO_CACHE *trans_log= &trx_data->trans_log;
 
2542
      my_off_t trans_log_pos= my_b_tell(trans_log);
 
2543
      if (event_info->get_cache_stmt() || trans_log_pos != 0)
 
2544
      {
 
2545
        if (trans_log_pos == 0)
 
2546
          thd->binlog_start_trans_and_stmt();
 
2547
        file= trans_log;
 
2548
      }
 
2549
      /*
 
2550
        TODO as Mats suggested, for all the cases above where we write to
 
2551
        trans_log, it sounds unnecessary to lock LOCK_log. We should rather
 
2552
        test first if we want to write to trans_log, and if not, lock
 
2553
        LOCK_log.
 
2554
      */
 
2555
    }
 
2556
 
 
2557
    /*
 
2558
      No check for auto events flag here - this write method should
 
2559
      never be called if auto-events are enabled
 
2560
    */
 
2561
 
 
2562
    /*
 
2563
      1. Write first log events which describe the 'run environment'
 
2564
      of the SQL command
 
2565
    */
 
2566
 
 
2567
    /*
 
2568
      If row-based binlogging, Insert_id, Rand and other kind of "setting
 
2569
      context" events are not needed.
 
2570
    */
 
2571
    if (thd)
 
2572
    {
 
2573
      if (!thd->current_stmt_binlog_row_based)
 
2574
      {
 
2575
        if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
 
2576
        {
 
2577
          Intvar_log_event e(thd,(unsigned char) LAST_INSERT_ID_EVENT,
 
2578
                             thd->first_successful_insert_id_in_prev_stmt_for_binlog);
 
2579
          if (e.write(file))
 
2580
            goto err;
 
2581
        }
 
2582
        if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
 
2583
        {
 
2584
          /*
 
2585
            If the auto_increment was second in a table's index (possible with
 
2586
            MyISAM or BDB) (table->next_number_keypart != 0), such event is
 
2587
            in fact not necessary. We could avoid logging it.
 
2588
          */
 
2589
          Intvar_log_event e(thd, (unsigned char) INSERT_ID_EVENT,
 
2590
                             thd->auto_inc_intervals_in_cur_stmt_for_binlog.
 
2591
                             minimum());
 
2592
          if (e.write(file))
 
2593
            goto err;
 
2594
        }
 
2595
        if (thd->rand_used)
 
2596
        {
 
2597
          Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2);
 
2598
          if (e.write(file))
 
2599
            goto err;
 
2600
        }
 
2601
        if (thd->user_var_events.elements)
 
2602
        {
 
2603
          for (uint32_t i= 0; i < thd->user_var_events.elements; i++)
 
2604
          {
 
2605
            BINLOG_USER_VAR_EVENT *user_var_event;
 
2606
            get_dynamic(&thd->user_var_events,(unsigned char*) &user_var_event, i);
 
2607
            User_var_log_event e(thd, user_var_event->user_var_event->name.str,
 
2608
                                 user_var_event->user_var_event->name.length,
 
2609
                                 user_var_event->value,
 
2610
                                 user_var_event->length,
 
2611
                                 user_var_event->type,
 
2612
                                 user_var_event->charset_number);
 
2613
            if (e.write(file))
 
2614
              goto err;
 
2615
          }
 
2616
        }
 
2617
      }
 
2618
    }
 
2619
 
 
2620
    /*
 
2621
       Write the SQL command
 
2622
     */
 
2623
 
 
2624
    if (event_info->write(file))
 
2625
      goto err;
 
2626
 
 
2627
    if (file == &log_file) // we are writing to the real log (disk)
 
2628
    {
 
2629
      if (flush_and_sync())
 
2630
        goto err;
 
2631
      signal_update();
 
2632
      rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
 
2633
    }
 
2634
    error=0;
 
2635
 
 
2636
err:
 
2637
    if (error)
 
2638
    {
 
2639
      if (my_errno == EFBIG)
 
2640
        my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(0));
 
2641
      else
 
2642
        my_error(ER_ERROR_ON_WRITE, MYF(0), name, errno);
 
2643
      write_error=1;
 
2644
    }
 
2645
  }
 
2646
 
 
2647
  if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F)
 
2648
    ++m_table_map_version;
 
2649
 
 
2650
  pthread_mutex_unlock(&LOCK_log);
 
2651
  return(error);
 
2652
}
 
2653
 
 
2654
 
 
2655
int error_log_print(enum loglevel level, const char *format,
 
2656
                    va_list args)
 
2657
{
 
2658
  return logger.error_log_print(level, format, args);
 
2659
}
 
2660
 
 
2661
void DRIZZLE_BIN_LOG::rotate_and_purge(uint32_t flags)
 
2662
{
 
2663
  if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
 
2664
    pthread_mutex_lock(&LOCK_log);
 
2665
  if ((flags & RP_FORCE_ROTATE) ||
 
2666
      (my_b_tell(&log_file) >= (my_off_t) max_size))
 
2667
  {
 
2668
    new_file_without_locking();
 
2669
    if (expire_logs_days)
 
2670
    {
 
2671
      time_t purge_time= my_time(0) - expire_logs_days*24*60*60;
 
2672
      if (purge_time >= 0)
 
2673
        purge_logs_before_date(purge_time);
 
2674
    }
 
2675
  }
 
2676
  if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
 
2677
    pthread_mutex_unlock(&LOCK_log);
 
2678
}
 
2679
 
 
2680
uint32_t DRIZZLE_BIN_LOG::next_file_id()
 
2681
{
 
2682
  uint32_t res;
 
2683
  pthread_mutex_lock(&LOCK_log);
 
2684
  res = file_id++;
 
2685
  pthread_mutex_unlock(&LOCK_log);
 
2686
  return res;
 
2687
}
 
2688
 
 
2689
 
 
2690
/*
 
2691
  Write the contents of a cache to the binary log.
 
2692
 
 
2693
  SYNOPSIS
 
2694
    write_cache()
 
2695
    cache    Cache to write to the binary log
 
2696
    lock_log True if the LOCK_log mutex should be aquired, false otherwise
 
2697
    sync_log True if the log should be flushed and sync:ed
 
2698
 
 
2699
  DESCRIPTION
 
2700
    Write the contents of the cache to the binary log. The cache will
 
2701
    be reset as a READ_CACHE to be able to read the contents from it.
 
2702
 */
 
2703
 
 
2704
int DRIZZLE_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
 
2705
{
 
2706
  Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
 
2707
 
 
2708
  if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
 
2709
    return ER_ERROR_ON_WRITE;
 
2710
  uint32_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
 
2711
  long val;
 
2712
  unsigned char header[LOG_EVENT_HEADER_LEN];
 
2713
 
 
2714
  /*
 
2715
    The events in the buffer have incorrect end_log_pos data
 
2716
    (relative to beginning of group rather than absolute),
 
2717
    so we'll recalculate them in situ so the binlog is always
 
2718
    correct, even in the middle of a group. This is possible
 
2719
    because we now know the start position of the group (the
 
2720
    offset of this cache in the log, if you will); all we need
 
2721
    to do is to find all event-headers, and add the position of
 
2722
    the group to the end_log_pos of each event.  This is pretty
 
2723
    straight forward, except that we read the cache in segments,
 
2724
    so an event-header might end up on the cache-border and get
 
2725
    split.
 
2726
  */
 
2727
 
 
2728
  group= (uint)my_b_tell(&log_file);
 
2729
  hdr_offs= carry= 0;
 
2730
 
 
2731
  do
 
2732
  {
 
2733
 
 
2734
    /*
 
2735
      if we only got a partial header in the last iteration,
 
2736
      get the other half now and process a full header.
 
2737
    */
 
2738
    if (unlikely(carry > 0))
 
2739
    {
 
2740
      assert(carry < LOG_EVENT_HEADER_LEN);
 
2741
 
 
2742
      /* assemble both halves */
 
2743
      memcpy(&header[carry], cache->read_pos, LOG_EVENT_HEADER_LEN - carry);
 
2744
 
 
2745
      /* fix end_log_pos */
 
2746
      val= uint4korr(&header[LOG_POS_OFFSET]) + group;
 
2747
      int4store(&header[LOG_POS_OFFSET], val);
 
2748
 
 
2749
      /* write the first half of the split header */
 
2750
      if (my_b_write(&log_file, header, carry))
 
2751
        return ER_ERROR_ON_WRITE;
 
2752
 
 
2753
      /*
 
2754
        copy fixed second half of header to cache so the correct
 
2755
        version will be written later.
 
2756
      */
 
2757
      memcpy(cache->read_pos, &header[carry], LOG_EVENT_HEADER_LEN - carry);
 
2758
 
 
2759
      /* next event header at ... */
 
2760
      hdr_offs = uint4korr(&header[EVENT_LEN_OFFSET]) - carry;
 
2761
 
 
2762
      carry= 0;
 
2763
    }
 
2764
 
 
2765
    /* if there is anything to write, process it. */
 
2766
 
 
2767
    if (likely(length > 0))
 
2768
    {
 
2769
      /*
 
2770
        process all event-headers in this (partial) cache.
 
2771
        if next header is beyond current read-buffer,
 
2772
        we'll get it later (though not necessarily in the
 
2773
        very next iteration, just "eventually").
 
2774
      */
 
2775
 
 
2776
      while (hdr_offs < length)
 
2777
      {
 
2778
        /*
 
2779
          partial header only? save what we can get, process once
 
2780
          we get the rest.
 
2781
        */
 
2782
 
 
2783
        if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
 
2784
        {
 
2785
          carry= length - hdr_offs;
 
2786
          memcpy(header, cache->read_pos + hdr_offs, carry);
 
2787
          length= hdr_offs;
 
2788
        }
 
2789
        else
 
2790
        {
 
2791
          /* we've got a full event-header, and it came in one piece */
 
2792
 
 
2793
          unsigned char *log_pos= (unsigned char *)cache->read_pos + hdr_offs + LOG_POS_OFFSET;
 
2794
 
 
2795
          /* fix end_log_pos */
 
2796
          val= uint4korr(log_pos) + group;
 
2797
          int4store(log_pos, val);
 
2798
 
 
2799
          /* next event header at ... */
 
2800
          log_pos= (unsigned char *)cache->read_pos + hdr_offs + EVENT_LEN_OFFSET;
 
2801
          hdr_offs += uint4korr(log_pos);
 
2802
 
 
2803
        }
 
2804
      }
 
2805
 
 
2806
      /*
 
2807
        Adjust hdr_offs. Note that it may still point beyond the segment
 
2808
        read in the next iteration; if the current event is very long,
 
2809
        it may take a couple of read-iterations (and subsequent adjustments
 
2810
        of hdr_offs) for it to point into the then-current segment.
 
2811
        If we have a split header (!carry), hdr_offs will be set at the
 
2812
        beginning of the next iteration, overwriting the value we set here:
 
2813
      */
 
2814
      hdr_offs -= length;
 
2815
    }
 
2816
 
 
2817
    /* Write data to the binary log file */
 
2818
    if (my_b_write(&log_file, cache->read_pos, length))
 
2819
      return ER_ERROR_ON_WRITE;
 
2820
    cache->read_pos=cache->read_end;            // Mark buffer used up
 
2821
  } while ((length= my_b_fill(cache)));
 
2822
 
 
2823
  assert(carry == 0);
 
2824
 
 
2825
  if (sync_log)
 
2826
    flush_and_sync();
 
2827
 
 
2828
  return 0;                                     // All OK
 
2829
}
 
2830
 
 
2831
/**
 
2832
  Write a cached log entry to the binary log.
 
2833
  - To support transaction over replication, we wrap the transaction
 
2834
  with BEGIN/COMMIT or BEGIN/ROLLBACK in the binary log.
 
2835
  We want to write a BEGIN/ROLLBACK block when a non-transactional table
 
2836
  was updated in a transaction which was rolled back. This is to ensure
 
2837
  that the same updates are run on the slave.
 
2838
 
 
2839
  @param thd
 
2840
  @param cache          The cache to copy to the binlog
 
2841
  @param commit_event   The commit event to print after writing the
 
2842
                        contents of the cache.
 
2843
 
 
2844
  @note
 
2845
    We only come here if there is something in the cache.
 
2846
  @note
 
2847
    The thing in the cache is always a complete transaction.
 
2848
  @note
 
2849
    'cache' needs to be reinitialized after this functions returns.
 
2850
*/
 
2851
 
 
2852
bool DRIZZLE_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event)
 
2853
{
 
2854
  pthread_mutex_lock(&LOCK_log);
 
2855
 
 
2856
  /* NULL would represent nothing to replicate after ROLLBACK */
 
2857
  assert(commit_event != NULL);
 
2858
 
 
2859
  assert(is_open());
 
2860
  if (likely(is_open()))                       // Should always be true
 
2861
  {
 
2862
    /*
 
2863
      We only bother to write to the binary log if there is anything
 
2864
      to write.
 
2865
     */
 
2866
    if (my_b_tell(cache) > 0)
 
2867
    {
 
2868
      /*
 
2869
        Log "BEGIN" at the beginning of every transaction.  Here, a
 
2870
        transaction is either a BEGIN..COMMIT block or a single
 
2871
        statement in autocommit mode.
 
2872
      */
 
2873
      Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), true, false);
 
2874
      /*
 
2875
        Imagine this is rollback due to net timeout, after all
 
2876
        statements of the transaction succeeded. Then we want a
 
2877
        zero-error code in BEGIN.  In other words, if there was a
 
2878
        really serious error code it's already in the statement's
 
2879
        events, there is no need to put it also in this internally
 
2880
        generated event, and as this event is generated late it would
 
2881
        lead to false alarms.
 
2882
 
 
2883
        This is safer than thd->clear_error() against kills at shutdown.
 
2884
      */
 
2885
      qinfo.error_code= 0;
 
2886
      /*
 
2887
        Now this Query_log_event has artificial log_pos 0. It must be
 
2888
        adjusted to reflect the real position in the log. Not doing it
 
2889
        would confuse the slave: it would prevent this one from
 
2890
        knowing where he is in the master's binlog, which would result
 
2891
        in wrong positions being shown to the user, MASTER_POS_WAIT
 
2892
        undue waiting etc.
 
2893
      */
 
2894
      if (qinfo.write(&log_file))
 
2895
        goto err;
 
2896
 
 
2897
      if ((write_error= write_cache(cache, false, false)))
 
2898
        goto err;
 
2899
 
 
2900
      if (commit_event && commit_event->write(&log_file))
 
2901
        goto err;
 
2902
      if (flush_and_sync())
 
2903
        goto err;
 
2904
      if (cache->error)                         // Error on read
 
2905
      {
 
2906
        sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
 
2907
        write_error=1;                          // Don't give more errors
 
2908
        goto err;
 
2909
      }
 
2910
      signal_update();
 
2911
    }
 
2912
 
 
2913
    /*
 
2914
      if commit_event is Xid_log_event, increase the number of
 
2915
      prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
 
2916
      if there're prepared xids in it - see the comment in new_file() for
 
2917
      an explanation.
 
2918
      If the commit_event is not Xid_log_event (then it's a Query_log_event)
 
2919
      rotate binlog, if necessary.
 
2920
    */
 
2921
    if (commit_event && commit_event->get_type_code() == XID_EVENT)
 
2922
    {
 
2923
      pthread_mutex_lock(&LOCK_prep_xids);
 
2924
      prepared_xids++;
 
2925
      pthread_mutex_unlock(&LOCK_prep_xids);
 
2926
    }
 
2927
    else
 
2928
      rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
 
2929
  }
 
2930
  pthread_mutex_unlock(&LOCK_log);
 
2931
 
 
2932
  return(0);
 
2933
 
 
2934
err:
 
2935
  if (!write_error)
 
2936
  {
 
2937
    write_error= 1;
 
2938
    sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
 
2939
  }
 
2940
  pthread_mutex_unlock(&LOCK_log);
 
2941
  return(1);
 
2942
}
 
2943
 
 
2944
 
 
2945
/**
 
2946
  Wait until we get a signal that the relay log has been updated
 
2947
 
 
2948
  @param[in] thd   a THD struct
 
2949
  @note
 
2950
    LOCK_log must be taken before calling this function.
 
2951
    It will be released at the end of the function.
 
2952
*/
 
2953
 
 
2954
void DRIZZLE_BIN_LOG::wait_for_update_relay_log(THD* thd)
 
2955
{
 
2956
  const char *old_msg;
 
2957
  old_msg= thd->enter_cond(&update_cond, &LOCK_log,
 
2958
                           "Slave has read all relay log; " 
 
2959
                           "waiting for the slave I/O "
 
2960
                           "thread to update it" );
 
2961
  pthread_cond_wait(&update_cond, &LOCK_log);
 
2962
  thd->exit_cond(old_msg);
 
2963
  return;
 
2964
}
 
2965
 
 
2966
 
 
2967
/**
 
2968
  Wait until we get a signal that the binary log has been updated.
 
2969
  Applies to master only.
 
2970
     
 
2971
  NOTES
 
2972
  @param[in] thd        a THD struct
 
2973
  @param[in] timeout    a pointer to a timespec;
 
2974
                        NULL means to wait w/o timeout.
 
2975
  @retval    0          if got signalled on update
 
2976
  @retval    non-0      if wait timeout elapsed
 
2977
  @note
 
2978
    LOCK_log must be taken before calling this function.
 
2979
    LOCK_log is being released while the thread is waiting.
 
2980
    LOCK_log is released by the caller.
 
2981
*/
 
2982
 
 
2983
int DRIZZLE_BIN_LOG::wait_for_update_bin_log(THD* thd,
 
2984
                                           const struct timespec *timeout)
 
2985
{
 
2986
  int ret= 0;
 
2987
  const char* old_msg = thd->get_proc_info();
 
2988
  old_msg= thd->enter_cond(&update_cond, &LOCK_log,
 
2989
                           "Master has sent all binlog to slave; "
 
2990
                           "waiting for binlog to be updated");
 
2991
  if (!timeout)
 
2992
    pthread_cond_wait(&update_cond, &LOCK_log);
 
2993
  else
 
2994
    ret= pthread_cond_timedwait(&update_cond, &LOCK_log,
 
2995
                                const_cast<struct timespec *>(timeout));
 
2996
  return(ret);
 
2997
}
 
2998
 
 
2999
 
 
3000
/**
 
3001
  Close the log file.
 
3002
 
 
3003
  @param exiting     Bitmask for one or more of the following bits:
 
3004
          - LOG_CLOSE_INDEX : if we should close the index file
 
3005
          - LOG_CLOSE_TO_BE_OPENED : if we intend to call open
 
3006
                                     at once after close.
 
3007
          - LOG_CLOSE_STOP_EVENT : write a 'stop' event to the log
 
3008
 
 
3009
  @note
 
3010
    One can do an open on the object at once after doing a close.
 
3011
    The internal structures are not freed until cleanup() is called
 
3012
*/
 
3013
 
 
3014
void DRIZZLE_BIN_LOG::close(uint32_t exiting)
 
3015
{                                       // One can't set log_type here!
 
3016
  if (log_state == LOG_OPENED)
 
3017
  {
 
3018
    if (log_type == LOG_BIN && !no_auto_events &&
 
3019
        (exiting & LOG_CLOSE_STOP_EVENT))
 
3020
    {
 
3021
      Stop_log_event s;
 
3022
      s.write(&log_file);
 
3023
      bytes_written+= s.data_written;
 
3024
      signal_update();
 
3025
    }
 
3026
 
 
3027
    /* don't pwrite in a file opened with O_APPEND - it doesn't work */
 
3028
    if (log_file.type == WRITE_CACHE && log_type == LOG_BIN)
 
3029
    {
 
3030
      my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
 
3031
      unsigned char flags= 0;            // clearing LOG_EVENT_BINLOG_IN_USE_F
 
3032
      pwrite(log_file.file, &flags, 1, offset);
 
3033
    }
 
3034
 
 
3035
    /* this will cleanup IO_CACHE, sync and close the file */
 
3036
    DRIZZLE_LOG::close(exiting);
 
3037
  }
 
3038
 
 
3039
  /*
 
3040
    The following test is needed even if is_open() is not set, as we may have
 
3041
    called a not complete close earlier and the index file is still open.
 
3042
  */
 
3043
 
 
3044
  if ((exiting & LOG_CLOSE_INDEX) && my_b_inited(&index_file))
 
3045
  {
 
3046
    end_io_cache(&index_file);
 
3047
    if (my_close(index_file.file, MYF(0)) < 0 && ! write_error)
 
3048
    {
 
3049
      write_error= 1;
 
3050
      sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name, errno);
 
3051
    }
 
3052
  }
 
3053
  log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
 
3054
  if (name)
 
3055
  {
 
3056
    free(name);
 
3057
    name= NULL;
 
3058
  }
 
3059
  return;
 
3060
}
 
3061
 
 
3062
 
 
3063
void DRIZZLE_BIN_LOG::set_max_size(ulong max_size_arg)
 
3064
{
 
3065
  /*
 
3066
    We need to take locks, otherwise this may happen:
 
3067
    new_file() is called, calls open(old_max_size), then before open() starts,
 
3068
    set_max_size() sets max_size to max_size_arg, then open() starts and
 
3069
    uses the old_max_size argument, so max_size_arg has been overwritten and
 
3070
    it's like if the SET command was never run.
 
3071
  */
 
3072
  pthread_mutex_lock(&LOCK_log);
 
3073
  if (is_open())
 
3074
    max_size= max_size_arg;
 
3075
  pthread_mutex_unlock(&LOCK_log);
 
3076
  return;
 
3077
}
 
3078
 
 
3079
 
 
3080
/**
 
3081
  Check if a string is a valid number.
 
3082
 
 
3083
  @param str                    String to test
 
3084
  @param res                    Store value here
 
3085
  @param allow_wildcards        Set to 1 if we should ignore '%' and '_'
 
3086
 
 
3087
  @note
 
3088
    For the moment the allow_wildcards argument is not used
 
3089
    Should be move to some other file.
 
3090
 
 
3091
  @retval
 
3092
    1   String is a number
 
3093
  @retval
 
3094
    0   Error
 
3095
*/
 
3096
 
 
3097
static bool test_if_number(register const char *str,
 
3098
                           long *res, bool allow_wildcards)
 
3099
{
 
3100
  register int flag;
 
3101
  const char *start;
 
3102
 
 
3103
  flag= 0; 
 
3104
  start= str;
 
3105
  while (*str++ == ' ') ;
 
3106
  if (*--str == '-' || *str == '+')
 
3107
    str++;
 
3108
  while (my_isdigit(files_charset_info,*str) ||
 
3109
         (allow_wildcards && (*str == wild_many || *str == wild_one)))
 
3110
  {
 
3111
    flag=1;
 
3112
    str++;
 
3113
  }
 
3114
  if (*str == '.')
 
3115
  {
 
3116
    for (str++ ;
 
3117
         my_isdigit(files_charset_info,*str) ||
 
3118
           (allow_wildcards && (*str == wild_many || *str == wild_one)) ;
 
3119
         str++, flag=1) ;
 
3120
  }
 
3121
  if (*str != 0 || flag == 0)
 
3122
    return(0);
 
3123
  if (res)
 
3124
    *res=atol(start);
 
3125
  return(1);                    /* Number ok */
 
3126
} /* test_if_number */
 
3127
 
 
3128
 
 
3129
void sql_perror(const char *message)
 
3130
{
 
3131
  sql_print_error("%s: %s",message, strerror(errno));
 
3132
}
 
3133
 
 
3134
 
 
3135
bool flush_error_log()
 
3136
{
 
3137
  bool result=0;
 
3138
  if (opt_error_log)
 
3139
  {
 
3140
    char err_renamed[FN_REFLEN], *end;
 
3141
    end= strmake(err_renamed,log_error_file,FN_REFLEN-4);
 
3142
    my_stpcpy(end, "-old");
 
3143
    pthread_mutex_lock(&LOCK_error_log);
 
3144
    char err_temp[FN_REFLEN+4];
 
3145
    /*
 
3146
     On Windows is necessary a temporary file for to rename
 
3147
     the current error file.
 
3148
    */
 
3149
    strxmov(err_temp, err_renamed,"-tmp",NULL);
 
3150
    (void) my_delete(err_temp, MYF(0)); 
 
3151
    if (freopen(err_temp,"a+",stdout))
 
3152
    {
 
3153
      int fd;
 
3154
      size_t bytes;
 
3155
      unsigned char buf[IO_SIZE];
 
3156
 
 
3157
      freopen(err_temp,"a+",stderr);
 
3158
      (void) my_delete(err_renamed, MYF(0));
 
3159
      my_rename(log_error_file,err_renamed,MYF(0));
 
3160
      if (freopen(log_error_file,"a+",stdout))
 
3161
        freopen(log_error_file,"a+",stderr);
 
3162
 
 
3163
      if ((fd = my_open(err_temp, O_RDONLY, MYF(0))) >= 0)
 
3164
      {
 
3165
        while ((bytes= my_read(fd, buf, IO_SIZE, MYF(0))) &&
 
3166
               bytes != MY_FILE_ERROR)
 
3167
          my_fwrite(stderr, buf, bytes, MYF(0));
 
3168
        my_close(fd, MYF(0));
 
3169
      }
 
3170
      (void) my_delete(err_temp, MYF(0)); 
 
3171
    }
 
3172
    else
 
3173
     result= 1;
 
3174
    pthread_mutex_unlock(&LOCK_error_log);
 
3175
  }
 
3176
   return result;
 
3177
}
 
3178
 
 
3179
void DRIZZLE_BIN_LOG::signal_update()
 
3180
{
 
3181
  pthread_cond_broadcast(&update_cond);
 
3182
  return;
 
3183
}
 
3184
 
 
3185
/**
 
3186
  Prints a printf style message to the error log and, under NT, to the
 
3187
  Windows event log.
 
3188
 
 
3189
  This function prints the message into a buffer and then sends that buffer
 
3190
  to other functions to write that message to other logging sources.
 
3191
 
 
3192
  @param event_type          Type of event to write (Error, Warning, or Info)
 
3193
  @param format              Printf style format of message
 
3194
  @param args                va_list list of arguments for the message
 
3195
 
 
3196
  @returns
 
3197
    The function always returns 0. The return value is present in the
 
3198
    signature to be compatible with other logging routines, which could
 
3199
    return an error (e.g. logging to the log tables)
 
3200
*/
 
3201
static void print_buffer_to_file(enum loglevel level,
 
3202
                                 int error_code __attribute__((unused)),
 
3203
                                 const char *buffer,
 
3204
                                 size_t buffer_length __attribute__((unused)))
 
3205
{
 
3206
  time_t skr;
 
3207
  struct tm tm_tmp;
 
3208
  struct tm *start;
 
3209
 
 
3210
  pthread_mutex_lock(&LOCK_error_log);
 
3211
 
 
3212
  skr= my_time(0);
 
3213
  localtime_r(&skr, &tm_tmp);
 
3214
  start=&tm_tmp;
 
3215
 
 
3216
  fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %s\n",
 
3217
          start->tm_year % 100,
 
3218
          start->tm_mon+1,
 
3219
          start->tm_mday,
 
3220
          start->tm_hour,
 
3221
          start->tm_min,
 
3222
          start->tm_sec,
 
3223
          (level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ?
 
3224
           "Warning" : "Note"),
 
3225
          buffer);
 
3226
 
 
3227
  fflush(stderr);
 
3228
 
 
3229
  pthread_mutex_unlock(&LOCK_error_log);
 
3230
  return;
 
3231
}
 
3232
 
 
3233
 
 
3234
int vprint_msg_to_log(enum loglevel level, const char *format, va_list args)
 
3235
{
 
3236
  char   buff[1024];
 
3237
  size_t length;
 
3238
  int error_code= errno;
 
3239
 
 
3240
  length= vsnprintf(buff, sizeof(buff), format, args);
 
3241
 
 
3242
  print_buffer_to_file(level, error_code, buff, length);
 
3243
 
 
3244
  return(0);
 
3245
}
 
3246
 
 
3247
 
 
3248
void sql_print_error(const char *format, ...) 
 
3249
{
 
3250
  va_list args;
 
3251
 
 
3252
  va_start(args, format);
 
3253
  error_log_print(ERROR_LEVEL, format, args);
 
3254
  va_end(args);
 
3255
 
 
3256
  return;
 
3257
}
 
3258
 
 
3259
 
 
3260
void sql_print_warning(const char *format, ...) 
 
3261
{
 
3262
  va_list args;
 
3263
 
 
3264
  va_start(args, format);
 
3265
  error_log_print(WARNING_LEVEL, format, args);
 
3266
  va_end(args);
 
3267
 
 
3268
  return;
 
3269
}
 
3270
 
 
3271
 
 
3272
void sql_print_information(const char *format, ...) 
 
3273
{
 
3274
  va_list args;
 
3275
 
 
3276
  va_start(args, format);
 
3277
  error_log_print(INFORMATION_LEVEL, format, args);
 
3278
  va_end(args);
 
3279
 
 
3280
  return;
 
3281
}
 
3282
 
 
3283
 
 
3284
/********* transaction coordinator log for 2pc - mmap() based solution *******/
 
3285
 
 
3286
/*
 
3287
  the log consists of a file, mmapped to a memory.
 
3288
  file is divided on pages of tc_log_page_size size.
 
3289
  (usable size of the first page is smaller because of log header)
 
3290
  there's PAGE control structure for each page
 
3291
  each page (or rather PAGE control structure) can be in one of three
 
3292
  states - active, syncing, pool.
 
3293
  there could be only one page in active or syncing states,
 
3294
  but many in pool - pool is fifo queue.
 
3295
  usual lifecycle of a page is pool->active->syncing->pool
 
3296
  "active" page - is a page where new xid's are logged.
 
3297
  the page stays active as long as syncing slot is taken.
 
3298
  "syncing" page is being synced to disk. no new xid can be added to it.
 
3299
  when the sync is done the page is moved to a pool and an active page
 
3300
  becomes "syncing".
 
3301
 
 
3302
  the result of such an architecture is a natural "commit grouping" -
 
3303
  If commits are coming faster than the system can sync, they do not
 
3304
  stall. Instead, all commit that came since the last sync are
 
3305
  logged to the same page, and they all are synced with the next -
 
3306
  one - sync. Thus, thought individual commits are delayed, throughput
 
3307
  is not decreasing.
 
3308
 
 
3309
  when a xid is added to an active page, the thread of this xid waits
 
3310
  for a page's condition until the page is synced. when syncing slot
 
3311
  becomes vacant one of these waiters is awaken to take care of syncing.
 
3312
  it syncs the page and signals all waiters that the page is synced.
 
3313
  PAGE::waiters is used to count these waiters, and a page may never
 
3314
  become active again until waiters==0 (that is all waiters from the
 
3315
  previous sync have noticed the sync was completed)
 
3316
 
 
3317
  note, that the page becomes "dirty" and has to be synced only when a
 
3318
  new xid is added into it. Removing a xid from a page does not make it
 
3319
  dirty - we don't sync removals to disk.
 
3320
*/
 
3321
 
 
3322
ulong tc_log_page_waits= 0;
 
3323
 
 
3324
#ifdef HAVE_MMAP
 
3325
 
 
3326
#define TC_LOG_HEADER_SIZE (sizeof(tc_log_magic)+1)
 
3327
 
 
3328
static const char tc_log_magic[]={(char) 254, 0x23, 0x05, 0x74};
 
3329
 
 
3330
ulong opt_tc_log_size= TC_LOG_MIN_SIZE;
 
3331
ulong tc_log_max_pages_used=0, tc_log_page_size=0, tc_log_cur_pages_used=0;
 
3332
 
 
3333
int TC_LOG_MMAP::open(const char *opt_name)
 
3334
{
 
3335
  uint32_t i;
 
3336
  bool crashed= false;
 
3337
  PAGE *pg;
 
3338
 
 
3339
  assert(total_ha_2pc > 1);
 
3340
  assert(opt_name && opt_name[0]);
 
3341
 
 
3342
  tc_log_page_size= getpagesize();
 
3343
  assert(TC_LOG_PAGE_SIZE % tc_log_page_size == 0);
 
3344
 
 
3345
  fn_format(logname,opt_name,mysql_data_home,"",MY_UNPACK_FILENAME);
 
3346
  if ((fd= my_open(logname, O_RDWR, MYF(0))) < 0)
 
3347
  {
 
3348
    if (my_errno != ENOENT)
 
3349
      goto err;
 
3350
    if (using_heuristic_recover())
 
3351
      return 1;
 
3352
    if ((fd= my_create(logname, CREATE_MODE, O_RDWR, MYF(MY_WME))) < 0)
 
3353
      goto err;
 
3354
    inited=1;
 
3355
    file_length= opt_tc_log_size;
 
3356
    if (ftruncate(fd, file_length))
 
3357
      goto err;
 
3358
  }
 
3359
  else
 
3360
  {
 
3361
    inited= 1;
 
3362
    crashed= true;
 
3363
    sql_print_information(_("Recovering after a crash using %s"), opt_name);
 
3364
    if (tc_heuristic_recover)
 
3365
    {
 
3366
      sql_print_error(_("Cannot perform automatic crash recovery when "
 
3367
                      "--tc-heuristic-recover is used"));
 
3368
      goto err;
 
3369
    }
 
3370
    file_length= my_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE));
 
3371
    if (file_length == MY_FILEPOS_ERROR || file_length % tc_log_page_size)
 
3372
      goto err;
 
3373
  }
 
3374
 
 
3375
  data= (unsigned char *)my_mmap(0, (size_t)file_length, PROT_READ|PROT_WRITE,
 
3376
                        MAP_NOSYNC|MAP_SHARED, fd, 0);
 
3377
  if (data == MAP_FAILED)
 
3378
  {
 
3379
    my_errno=errno;
 
3380
    goto err;
 
3381
  }
 
3382
  inited=2;
 
3383
 
 
3384
  npages=(uint)file_length/tc_log_page_size;
 
3385
  assert(npages >= 3);             // to guarantee non-empty pool
 
3386
  if (!(pages=(PAGE *)my_malloc(npages*sizeof(PAGE), MYF(MY_WME|MY_ZEROFILL))))
 
3387
    goto err;
 
3388
  inited=3;
 
3389
  for (pg=pages, i=0; i < npages; i++, pg++)
 
3390
  {
 
3391
    pg->next=pg+1;
 
3392
    pg->waiters=0;
 
3393
    pg->state=POOL;
 
3394
    pthread_mutex_init(&pg->lock, MY_MUTEX_INIT_FAST);
 
3395
    pthread_cond_init (&pg->cond, 0);
 
3396
    pg->start=(my_xid *)(data + i*tc_log_page_size);
 
3397
    pg->ptr=pg->start;
 
3398
    pg->end=(my_xid *)(pg->start + tc_log_page_size);
 
3399
    pg->size=pg->free=tc_log_page_size/sizeof(my_xid);
 
3400
  }
 
3401
  pages[0].size=pages[0].free=
 
3402
                (tc_log_page_size-TC_LOG_HEADER_SIZE)/sizeof(my_xid);
 
3403
  pages[0].start=pages[0].end-pages[0].size;
 
3404
  pages[npages-1].next=0;
 
3405
  inited=4;
 
3406
 
 
3407
  if (crashed && recover())
 
3408
      goto err;
 
3409
 
 
3410
  memcpy(data, tc_log_magic, sizeof(tc_log_magic));
 
3411
  data[sizeof(tc_log_magic)]= (unsigned char)total_ha_2pc;
 
3412
  msync(data, tc_log_page_size, MS_SYNC);
 
3413
  my_sync(fd, MYF(0));
 
3414
  inited=5;
 
3415
 
 
3416
  pthread_mutex_init(&LOCK_sync,    MY_MUTEX_INIT_FAST);
 
3417
  pthread_mutex_init(&LOCK_active,  MY_MUTEX_INIT_FAST);
 
3418
  pthread_mutex_init(&LOCK_pool,    MY_MUTEX_INIT_FAST);
 
3419
  pthread_cond_init(&COND_active, 0);
 
3420
  pthread_cond_init(&COND_pool, 0);
 
3421
 
 
3422
  inited=6;
 
3423
 
 
3424
  syncing= 0;
 
3425
  active=pages;
 
3426
  pool=pages+1;
 
3427
  pool_last=pages+npages-1;
 
3428
 
 
3429
  return 0;
 
3430
 
 
3431
err:
 
3432
  close();
 
3433
  return 1;
 
3434
}
 
3435
 
 
3436
/**
 
3437
  there is no active page, let's got one from the pool.
 
3438
 
 
3439
  Two strategies here:
 
3440
    -# take the first from the pool
 
3441
    -# if there're waiters - take the one with the most free space.
 
3442
 
 
3443
  @todo
 
3444
    TODO page merging. try to allocate adjacent page first,
 
3445
    so that they can be flushed both in one sync
 
3446
*/
 
3447
 
 
3448
void TC_LOG_MMAP::get_active_from_pool()
 
3449
{
 
3450
  PAGE **p, **best_p=0;
 
3451
  int best_free;
 
3452
 
 
3453
  if (syncing)
 
3454
    pthread_mutex_lock(&LOCK_pool);
 
3455
 
 
3456
  do
 
3457
  {
 
3458
    best_p= p= &pool;
 
3459
    if ((*p)->waiters == 0) // can the first page be used ?
 
3460
      break;                // yes - take it.
 
3461
 
 
3462
    best_free=0;            // no - trying second strategy
 
3463
    for (p=&(*p)->next; *p; p=&(*p)->next)
 
3464
    {
 
3465
      if ((*p)->waiters == 0 && (*p)->free > best_free)
 
3466
      {
 
3467
        best_free=(*p)->free;
 
3468
        best_p=p;
 
3469
      }
 
3470
    }
 
3471
  }
 
3472
  while ((*best_p == 0 || best_free == 0) && overflow());
 
3473
 
 
3474
  active=*best_p;
 
3475
  if (active->free == active->size) // we've chosen an empty page
 
3476
  {
 
3477
    tc_log_cur_pages_used++;
 
3478
    set_if_bigger(tc_log_max_pages_used, tc_log_cur_pages_used);
 
3479
  }
 
3480
 
 
3481
  if ((*best_p)->next)              // unlink the page from the pool
 
3482
    *best_p=(*best_p)->next;
 
3483
  else
 
3484
    pool_last=*best_p;
 
3485
 
 
3486
  if (syncing)
 
3487
    pthread_mutex_unlock(&LOCK_pool);
 
3488
}
 
3489
 
 
3490
/**
 
3491
  @todo
 
3492
  perhaps, increase log size ?
 
3493
*/
 
3494
int TC_LOG_MMAP::overflow()
 
3495
{
 
3496
  /*
 
3497
    simple overflow handling - just wait
 
3498
    TODO perhaps, increase log size ?
 
3499
    let's check the behaviour of tc_log_page_waits first
 
3500
  */
 
3501
  tc_log_page_waits++;
 
3502
  pthread_cond_wait(&COND_pool, &LOCK_pool);
 
3503
  return 1; // always return 1
 
3504
}
 
3505
 
 
3506
/**
 
3507
  Record that transaction XID is committed on the persistent storage.
 
3508
 
 
3509
    This function is called in the middle of two-phase commit:
 
3510
    First all resources prepare the transaction, then tc_log->log() is called,
 
3511
    then all resources commit the transaction, then tc_log->unlog() is called.
 
3512
 
 
3513
    All access to active page is serialized but it's not a problem, as
 
3514
    we're assuming that fsync() will be a main bottleneck.
 
3515
    That is, parallelizing writes to log pages we'll decrease number of
 
3516
    threads waiting for a page, but then all these threads will be waiting
 
3517
    for a fsync() anyway
 
3518
 
 
3519
   If tc_log == DRIZZLE_LOG then tc_log writes transaction to binlog and
 
3520
   records XID in a special Xid_log_event.
 
3521
   If tc_log = TC_LOG_MMAP then xid is written in a special memory-mapped
 
3522
   log.
 
3523
 
 
3524
  @retval
 
3525
    0  - error
 
3526
  @retval
 
3527
    \# - otherwise, "cookie", a number that will be passed as an argument
 
3528
    to unlog() call. tc_log can define it any way it wants,
 
3529
    and use for whatever purposes. TC_LOG_MMAP sets it
 
3530
    to the position in memory where xid was logged to.
 
3531
*/
 
3532
 
 
3533
int TC_LOG_MMAP::log_xid(THD *thd __attribute__((unused)), my_xid xid)
 
3534
{
 
3535
  int err;
 
3536
  PAGE *p;
 
3537
  ulong cookie;
 
3538
 
 
3539
  pthread_mutex_lock(&LOCK_active);
 
3540
 
 
3541
  /*
 
3542
    if active page is full - just wait...
 
3543
    frankly speaking, active->free here accessed outside of mutex
 
3544
    protection, but it's safe, because it only means we may miss an
 
3545
    unlog() for the active page, and we're not waiting for it here -
 
3546
    unlog() does not signal COND_active.
 
3547
  */
 
3548
  while (unlikely(active && active->free == 0))
 
3549
    pthread_cond_wait(&COND_active, &LOCK_active);
 
3550
 
 
3551
  /* no active page ? take one from the pool */
 
3552
  if (active == 0)
 
3553
    get_active_from_pool();
 
3554
 
 
3555
  p=active;
 
3556
  pthread_mutex_lock(&p->lock);
 
3557
 
 
3558
  /* searching for an empty slot */
 
3559
  while (*p->ptr)
 
3560
  {
 
3561
    p->ptr++;
 
3562
    assert(p->ptr < p->end);               // because p->free > 0
 
3563
  }
 
3564
 
 
3565
  /* found! store xid there and mark the page dirty */
 
3566
  cookie= (ulong)((unsigned char *)p->ptr - data);      // can never be zero
 
3567
  *p->ptr++= xid;
 
3568
  p->free--;
 
3569
  p->state= DIRTY;
 
3570
 
 
3571
  /* to sync or not to sync - this is the question */
 
3572
  pthread_mutex_unlock(&LOCK_active);
 
3573
  pthread_mutex_lock(&LOCK_sync);
 
3574
  pthread_mutex_unlock(&p->lock);
 
3575
 
 
3576
  if (syncing)
 
3577
  {                                          // somebody's syncing. let's wait
 
3578
    p->waiters++;
 
3579
    /*
 
3580
      note - it must be while (), not do ... while () here
 
3581
      as p->state may be not DIRTY when we come here
 
3582
    */
 
3583
    while (p->state == DIRTY && syncing)
 
3584
      pthread_cond_wait(&p->cond, &LOCK_sync);
 
3585
    p->waiters--;
 
3586
    err= p->state == ERROR;
 
3587
    if (p->state != DIRTY)                   // page was synced
 
3588
    {
 
3589
      if (p->waiters == 0)
 
3590
        pthread_cond_signal(&COND_pool);     // in case somebody's waiting
 
3591
      pthread_mutex_unlock(&LOCK_sync);
 
3592
      goto done;                             // we're done
 
3593
    }
 
3594
  }                                          // page was not synced! do it now
 
3595
  assert(active == p && syncing == 0);
 
3596
  pthread_mutex_lock(&LOCK_active);
 
3597
  syncing=p;                                 // place is vacant - take it
 
3598
  active=0;                                  // page is not active anymore
 
3599
  pthread_cond_broadcast(&COND_active);      // in case somebody's waiting
 
3600
  pthread_mutex_unlock(&LOCK_active);
 
3601
  pthread_mutex_unlock(&LOCK_sync);
 
3602
  err= sync();
 
3603
 
 
3604
done:
 
3605
  return err ? 0 : cookie;
 
3606
}
 
3607
 
 
3608
int TC_LOG_MMAP::sync()
 
3609
{
 
3610
  int err;
 
3611
 
 
3612
  assert(syncing != active);
 
3613
 
 
3614
  /*
 
3615
    sit down and relax - this can take a while...
 
3616
    note - no locks are held at this point
 
3617
  */
 
3618
  err= msync(syncing->start, 1, MS_SYNC);
 
3619
  if(err==0)
 
3620
    err= my_sync(fd, MYF(0));
 
3621
 
 
3622
  /* page is synced. let's move it to the pool */
 
3623
  pthread_mutex_lock(&LOCK_pool);
 
3624
  pool_last->next=syncing;
 
3625
  pool_last=syncing;
 
3626
  syncing->next=0;
 
3627
  syncing->state= err ? ERROR : POOL;
 
3628
  pthread_cond_broadcast(&syncing->cond);    // signal "sync done"
 
3629
  pthread_cond_signal(&COND_pool);           // in case somebody's waiting
 
3630
  pthread_mutex_unlock(&LOCK_pool);
 
3631
 
 
3632
  /* marking 'syncing' slot free */
 
3633
  pthread_mutex_lock(&LOCK_sync);
 
3634
  syncing=0;
 
3635
  pthread_cond_signal(&active->cond);        // wake up a new syncer
 
3636
  pthread_mutex_unlock(&LOCK_sync);
 
3637
  return err;
 
3638
}
 
3639
 
 
3640
/**
 
3641
  erase xid from the page, update page free space counters/pointers.
 
3642
  cookie points directly to the memory where xid was logged.
 
3643
*/
 
3644
 
 
3645
void TC_LOG_MMAP::unlog(ulong cookie, my_xid xid __attribute__((unused)))
 
3646
{
 
3647
  PAGE *p=pages+(cookie/tc_log_page_size);
 
3648
  my_xid *x=(my_xid *)(data+cookie);
 
3649
 
 
3650
  assert(*x == xid);
 
3651
  assert(x >= p->start && x < p->end);
 
3652
  *x=0;
 
3653
 
 
3654
  pthread_mutex_lock(&p->lock);
 
3655
  p->free++;
 
3656
  assert(p->free <= p->size);
 
3657
  set_if_smaller(p->ptr, x);
 
3658
  if (p->free == p->size)               // the page is completely empty
 
3659
    statistic_decrement(tc_log_cur_pages_used, &LOCK_status);
 
3660
  if (p->waiters == 0)                 // the page is in pool and ready to rock
 
3661
    pthread_cond_signal(&COND_pool);   // ping ... for overflow()
 
3662
  pthread_mutex_unlock(&p->lock);
 
3663
}
 
3664
 
 
3665
void TC_LOG_MMAP::close()
 
3666
{
 
3667
  uint32_t i;
 
3668
  switch (inited) {
 
3669
  case 6:
 
3670
    pthread_mutex_destroy(&LOCK_sync);
 
3671
    pthread_mutex_destroy(&LOCK_active);
 
3672
    pthread_mutex_destroy(&LOCK_pool);
 
3673
    pthread_cond_destroy(&COND_pool);
 
3674
  case 5:
 
3675
    data[0]='A'; // garble the first (signature) byte, in case my_delete fails
 
3676
  case 4:
 
3677
    for (i=0; i < npages; i++)
 
3678
    {
 
3679
      if (pages[i].ptr == 0)
 
3680
        break;
 
3681
      pthread_mutex_destroy(&pages[i].lock);
 
3682
      pthread_cond_destroy(&pages[i].cond);
 
3683
    }
 
3684
  case 3:
 
3685
    free((unsigned char*)pages);
 
3686
  case 2:
 
3687
    my_munmap((char*)data, (size_t)file_length);
 
3688
  case 1:
 
3689
    my_close(fd, MYF(0));
 
3690
  }
 
3691
  if (inited>=5) // cannot do in the switch because of Windows
 
3692
    my_delete(logname, MYF(MY_WME));
 
3693
  inited=0;
 
3694
}
 
3695
 
 
3696
int TC_LOG_MMAP::recover()
 
3697
{
 
3698
  HASH xids;
 
3699
  PAGE *p=pages, *end_p=pages+npages;
 
3700
 
 
3701
  if (memcmp(data, tc_log_magic, sizeof(tc_log_magic)))
 
3702
  {
 
3703
    sql_print_error(_("Bad magic header in tc log"));
 
3704
    goto err1;
 
3705
  }
 
3706
 
 
3707
  /*
 
3708
    the first byte after magic signature is set to current
 
3709
    number of storage engines on startup
 
3710
  */
 
3711
  if (data[sizeof(tc_log_magic)] != total_ha_2pc)
 
3712
  {
 
3713
    sql_print_error(_("Recovery failed! You must enable "
 
3714
                    "exactly %d storage engines that support "
 
3715
                    "two-phase commit protocol"),
 
3716
                    data[sizeof(tc_log_magic)]);
 
3717
    goto err1;
 
3718
  }
 
3719
 
 
3720
  if (hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0,
 
3721
                sizeof(my_xid), 0, 0, MYF(0)))
 
3722
    goto err1;
 
3723
 
 
3724
  for ( ; p < end_p ; p++)
 
3725
  {
 
3726
    for (my_xid *x=p->start; x < p->end; x++)
 
3727
      if (*x && my_hash_insert(&xids, (unsigned char *)x))
 
3728
        goto err2; // OOM
 
3729
  }
 
3730
 
 
3731
  if (ha_recover(&xids))
 
3732
    goto err2;
 
3733
 
 
3734
  hash_free(&xids);
 
3735
  memset(data, 0, (size_t)file_length);
 
3736
  return 0;
 
3737
 
 
3738
err2:
 
3739
  hash_free(&xids);
 
3740
err1:
 
3741
  sql_print_error(_("Crash recovery failed. Either correct the problem "
 
3742
                  "(if it's, for example, out of memory error) and restart, "
 
3743
                  "or delete tc log and start drizzled with "
 
3744
                  "--tc-heuristic-recover={commit|rollback}"));
 
3745
  return 1;
 
3746
}
 
3747
#endif
 
3748
 
 
3749
TC_LOG *tc_log;
 
3750
TC_LOG_DUMMY tc_log_dummy;
 
3751
TC_LOG_MMAP  tc_log_mmap;
 
3752
 
 
3753
/**
 
3754
  Perform heuristic recovery, if --tc-heuristic-recover was used.
 
3755
 
 
3756
  @note
 
3757
    no matter whether heuristic recovery was successful or not
 
3758
    mysqld must exit. So, return value is the same in both cases.
 
3759
 
 
3760
  @retval
 
3761
    0   no heuristic recovery was requested
 
3762
  @retval
 
3763
    1   heuristic recovery was performed
 
3764
*/
 
3765
 
 
3766
int TC_LOG::using_heuristic_recover()
 
3767
{
 
3768
  if (!tc_heuristic_recover)
 
3769
    return 0;
 
3770
 
 
3771
  sql_print_information(_("Heuristic crash recovery mode"));
 
3772
  if (ha_recover(0))
 
3773
    sql_print_error(_("Heuristic crash recovery failed"));
 
3774
  sql_print_information(_("Please restart mysqld without --tc-heuristic-recover"));
 
3775
  return 1;
 
3776
}
 
3777
 
 
3778
/****** transaction coordinator log for 2pc - binlog() based solution ******/
 
3779
#define TC_LOG_BINLOG DRIZZLE_BIN_LOG
 
3780
 
 
3781
/**
 
3782
  @todo
 
3783
  keep in-memory list of prepared transactions
 
3784
  (add to list in log(), remove on unlog())
 
3785
  and copy it to the new binlog if rotated
 
3786
  but let's check the behaviour of tc_log_page_waits first!
 
3787
*/
 
3788
 
 
3789
int TC_LOG_BINLOG::open(const char *opt_name)
 
3790
{
 
3791
  LOG_INFO log_info;
 
3792
  int      error= 1;
 
3793
 
 
3794
  assert(total_ha_2pc > 1);
 
3795
  assert(opt_name && opt_name[0]);
 
3796
 
 
3797
  pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST);
 
3798
  pthread_cond_init (&COND_prep_xids, 0);
 
3799
 
 
3800
  if (!my_b_inited(&index_file))
 
3801
  {
 
3802
    /* There was a failure to open the index file, can't open the binlog */
 
3803
    cleanup();
 
3804
    return 1;
 
3805
  }
 
3806
 
 
3807
  if (using_heuristic_recover())
 
3808
  {
 
3809
    /* generate a new binlog to mask a corrupted one */
 
3810
    open(opt_name, LOG_BIN, 0, WRITE_CACHE, 0, max_binlog_size, 0);
 
3811
    cleanup();
 
3812
    return 1;
 
3813
  }
 
3814
 
 
3815
  if ((error= find_log_pos(&log_info, NULL, 1)))
 
3816
  {
 
3817
    if (error != LOG_INFO_EOF)
 
3818
      sql_print_error(_("find_log_pos() failed (error: %d)"), error);
 
3819
    else
 
3820
      error= 0;
 
3821
    goto err;
 
3822
  }
 
3823
 
 
3824
  {
 
3825
    const char *errmsg;
 
3826
    IO_CACHE    log;
 
3827
    File        file;
 
3828
    Log_event  *ev=0;
 
3829
    Format_description_log_event fdle(BINLOG_VERSION);
 
3830
    char        log_name[FN_REFLEN];
 
3831
 
 
3832
    if (! fdle.is_valid())
 
3833
      goto err;
 
3834
 
 
3835
    do
 
3836
    {
 
3837
      strmake(log_name, log_info.log_file_name, sizeof(log_name)-1);
 
3838
    } while (!(error= find_next_log(&log_info, 1)));
 
3839
 
 
3840
    if (error !=  LOG_INFO_EOF)
 
3841
    {
 
3842
      sql_print_error(_("find_log_pos() failed (error: %d)"), error);
 
3843
      goto err;
 
3844
    }
 
3845
 
 
3846
    if ((file= open_binlog(&log, log_name, &errmsg)) < 0)
 
3847
    {
 
3848
      sql_print_error("%s", errmsg);
 
3849
      goto err;
 
3850
    }
 
3851
 
 
3852
    if ((ev= Log_event::read_log_event(&log, 0, &fdle)) &&
 
3853
        ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
 
3854
        ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
 
3855
    {
 
3856
      sql_print_information(_("Recovering after a crash using %s"), opt_name);
 
3857
      error= recover(&log, (Format_description_log_event *)ev);
 
3858
    }
 
3859
    else
 
3860
      error=0;
 
3861
 
 
3862
    delete ev;
 
3863
    end_io_cache(&log);
 
3864
    my_close(file, MYF(MY_WME));
 
3865
 
 
3866
    if (error)
 
3867
      goto err;
 
3868
  }
 
3869
 
 
3870
err:
 
3871
  return error;
 
3872
}
 
3873
 
 
3874
/** This is called on shutdown, after ha_panic. */
 
3875
void TC_LOG_BINLOG::close()
 
3876
{
 
3877
  assert(prepared_xids==0);
 
3878
  pthread_mutex_destroy(&LOCK_prep_xids);
 
3879
  pthread_cond_destroy (&COND_prep_xids);
 
3880
}
 
3881
 
 
3882
/**
 
3883
  @todo
 
3884
  group commit
 
3885
 
 
3886
  @retval
 
3887
    0    error
 
3888
  @retval
 
3889
    1    success
 
3890
*/
 
3891
int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid)
 
3892
{
 
3893
  Xid_log_event xle(thd, xid);
 
3894
  binlog_trx_data *trx_data=
 
3895
    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
3896
  /*
 
3897
    We always commit the entire transaction when writing an XID. Also
 
3898
    note that the return value is inverted.
 
3899
   */
 
3900
  return(!binlog_end_trans(thd, trx_data, &xle, true));
 
3901
}
 
3902
 
 
3903
void TC_LOG_BINLOG::unlog(ulong cookie __attribute__((unused)),
 
3904
                          my_xid xid __attribute__((unused)))
 
3905
{
 
3906
  pthread_mutex_lock(&LOCK_prep_xids);
 
3907
  assert(prepared_xids > 0);
 
3908
  if (--prepared_xids == 0) {
 
3909
    pthread_cond_signal(&COND_prep_xids);
 
3910
  }
 
3911
  pthread_mutex_unlock(&LOCK_prep_xids);
 
3912
  rotate_and_purge(0);     // as ::write() did not rotate
 
3913
}
 
3914
 
 
3915
int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
 
3916
{
 
3917
  Log_event  *ev;
 
3918
  HASH xids;
 
3919
  MEM_ROOT mem_root;
 
3920
 
 
3921
  if (! fdle->is_valid() ||
 
3922
      hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
 
3923
                sizeof(my_xid), 0, 0, MYF(0)))
 
3924
    goto err1;
 
3925
 
 
3926
  init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
 
3927
 
 
3928
  fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error
 
3929
 
 
3930
  while ((ev= Log_event::read_log_event(log,0,fdle)) && ev->is_valid())
 
3931
  {
 
3932
    if (ev->get_type_code() == XID_EVENT)
 
3933
    {
 
3934
      Xid_log_event *xev=(Xid_log_event *)ev;
 
3935
      unsigned char *x= (unsigned char *) memdup_root(&mem_root, (unsigned char*) &xev->xid,
 
3936
                                      sizeof(xev->xid));
 
3937
      if (! x)
 
3938
        goto err2;
 
3939
      my_hash_insert(&xids, x);
 
3940
    }
 
3941
    delete ev;
 
3942
  }
 
3943
 
 
3944
  if (ha_recover(&xids))
 
3945
    goto err2;
 
3946
 
 
3947
  free_root(&mem_root, MYF(0));
 
3948
  hash_free(&xids);
 
3949
  return 0;
 
3950
 
 
3951
err2:
 
3952
  free_root(&mem_root, MYF(0));
 
3953
  hash_free(&xids);
 
3954
err1:
 
3955
  sql_print_error(_("Crash recovery failed. Either correct the problem "
 
3956
                  "(if it's, for example, out of memory error) and restart, "
 
3957
                  "or delete (or rename) binary log and start mysqld with "
 
3958
                  "--tc-heuristic-recover={commit|rollback}"));
 
3959
  return 1;
 
3960
}
 
3961
 
 
3962
 
 
3963
#ifdef INNODB_COMPATIBILITY_HOOKS
 
3964
/**
 
3965
  Get the file name of the MySQL binlog.
 
3966
  @return the name of the binlog file
 
3967
*/
 
3968
extern "C"
 
3969
const char* mysql_bin_log_file_name(void)
 
3970
{
 
3971
  return mysql_bin_log.get_log_fname();
 
3972
}
 
3973
/**
 
3974
  Get the current position of the MySQL binlog.
 
3975
  @return byte offset from the beginning of the binlog
 
3976
*/
 
3977
extern "C"
 
3978
uint64_t mysql_bin_log_file_pos(void)
 
3979
{
 
3980
  return (uint64_t) mysql_bin_log.get_log_file()->pos_in_file;
 
3981
}
 
3982
#endif /* INNODB_COMPATIBILITY_HOOKS */
 
3983
 
 
3984
 
 
3985
mysql_declare_plugin(binlog)
 
3986
{
 
3987
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
3988
  "binlog",
 
3989
  "1.0",
 
3990
  "MySQL AB",
 
3991
  "This is a pseudo storage engine to represent the binlog in a transaction",
 
3992
  PLUGIN_LICENSE_GPL,
 
3993
  binlog_init, /* Plugin Init */
 
3994
  NULL, /* Plugin Deinit */
 
3995
  NULL,                       /* status variables                */
 
3996
  NULL,                       /* system variables                */
 
3997
  NULL                        /* config options                  */
 
3998
}
 
3999
mysql_declare_plugin_end;