~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/log.cc

  • Committer: Brian Aker
  • Date: 2009-01-23 02:15:04 UTC
  • mfrom: (798.2.32 drizzle)
  • Revision ID: brian@tangent.org-20090123021504-2j99e6hxab1ew601
Merge for replication removal.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
*/
26
26
 
27
27
#include <drizzled/server_includes.h>
28
 
#include <drizzled/replication/replication.h>
29
28
#include <libdrizzle/libdrizzle.h>
30
29
#include <drizzled/replicator.h>
31
30
#include <mysys/hash.h>
32
 
#include <drizzled/replication/rli.h>
33
31
 
34
32
#include <mysys/my_dir.h>
35
33
#include <stdarg.h>
39
37
#include <drizzled/errmsg_print.h>
40
38
#include <drizzled/gettext.h>
41
39
#include <drizzled/data_home.h>
42
 
#include <drizzled/log_event.h>
 
40
#include <drizzled/session.h>
 
41
#include <drizzled/handler.h>
43
42
 
44
43
/* max size of the log message */
45
44
#define MY_OFF_T_UNDEF (~(my_off_t)0UL)
46
45
 
47
 
DRIZZLE_BIN_LOG drizzle_bin_log;
48
 
uint64_t sync_binlog_counter= 0; /* We should rationalize the largest possible counters for binlog sync */
49
 
 
50
 
static bool test_if_number(const char *str,
51
 
                           long *res, bool allow_wildcards);
52
46
static int binlog_init(void *p);
53
47
static int binlog_close_connection(handlerton *hton, Session *session);
54
48
static int binlog_savepoint_set(handlerton *hton, Session *session, void *sv);
58
52
static int binlog_prepare(handlerton *hton, Session *session, bool all);
59
53
 
60
54
 
61
 
char *make_default_log_name(char *buff,const char *log_ext)
62
 
{
63
 
  strncpy(buff, pidfile_name, FN_REFLEN-5);
64
 
  return fn_format(buff, buff, drizzle_data_home, log_ext,
65
 
                   MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT));
66
 
}
67
 
 
68
 
/*
69
 
  Helper class to hold a mutex for the duration of the
70
 
  block.
71
 
 
72
 
  Eliminates the need for explicit unlocking of mutexes on, e.g.,
73
 
  error returns.  On passing a null pointer, the sentry will not do
74
 
  anything.
75
 
 */
76
 
class Mutex_sentry
77
 
{
78
 
public:
79
 
  Mutex_sentry(pthread_mutex_t *mutex)
80
 
    : m_mutex(mutex)
81
 
  {
82
 
    if (m_mutex)
83
 
      pthread_mutex_lock(mutex);
84
 
  }
85
 
 
86
 
  ~Mutex_sentry()
87
 
  {
88
 
    if (m_mutex)
89
 
      pthread_mutex_unlock(m_mutex);
90
 
    m_mutex= 0;
91
 
  }
92
 
 
93
 
private:
94
 
  pthread_mutex_t *m_mutex;
95
 
 
96
 
  // It's not allowed to copy this object in any way
97
 
  Mutex_sentry(Mutex_sentry const&);
98
 
  void operator=(Mutex_sentry const&);
99
 
};
100
 
 
101
55
handlerton *binlog_hton;
102
56
 
103
57
 
318
272
  return error;
319
273
}
320
274
 
321
 
 
322
 
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
323
 
{
324
 
  char magic[4];
325
 
  assert(my_b_tell(log) == 0);
326
 
 
327
 
  if (my_b_read(log, (unsigned char*) magic, sizeof(magic)))
328
 
  {
329
 
    *errmsg = _("I/O error reading the header from the binary log");
330
 
    errmsg_printf(ERRMSG_LVL_ERROR, "%s, errno=%d, io cache code=%d", *errmsg, my_errno,
331
 
                    log->error);
332
 
    return 1;
333
 
  }
334
 
  if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
335
 
  {
336
 
    *errmsg = _("Binlog has bad magic number;  It's not a binary log file "
337
 
                "that can be used by this version of Drizzle");
338
 
    return 1;
339
 
  }
340
 
  return 0;
341
 
}
342
 
 
343
 
 
344
 
File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg)
345
 
{
346
 
  File file;
347
 
 
348
 
  if ((file = my_open(log_file_name, O_RDONLY,
349
 
                      MYF(MY_WME))) < 0)
350
 
  {
351
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open log (file '%s', errno %d)"),
352
 
                    log_file_name, my_errno);
353
 
    *errmsg = _("Could not open log file");
354
 
    goto err;
355
 
  }
356
 
  if (init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
357
 
                    MYF(MY_WME|MY_DONT_CHECK_FILESIZE)))
358
 
  {
359
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to create a cache on log (file '%s')"),
360
 
                    log_file_name);
361
 
    *errmsg = _("Could not open log file");
362
 
    goto err;
363
 
  }
364
 
  if (check_binlog_magic(log,errmsg))
365
 
    goto err;
366
 
  return(file);
367
 
 
368
 
err:
369
 
  if (file >= 0)
370
 
  {
371
 
    my_close(file,MYF(0));
372
 
    end_io_cache(log);
373
 
  }
374
 
  return(-1);
375
 
}
376
 
 
377
 
 
378
 
/**
379
 
  Find a unique filename for 'filename.#'.
380
 
 
381
 
  Set '#' to a number as low as possible.
382
 
 
383
 
  @return
384
 
    nonzero if not possible to get unique filename
385
 
*/
386
 
 
387
 
static int find_uniq_filename(char *name)
388
 
{
389
 
  long                  number;
390
 
  uint32_t                  i;
391
 
  char                  buff[FN_REFLEN];
392
 
  struct st_my_dir     *dir_info;
393
 
  register struct fileinfo *file_info;
394
 
  ulong                 max_found=0;
395
 
  size_t                buf_length, length;
396
 
  char                  *start, *end;
397
 
 
398
 
  length= dirname_part(buff, name, &buf_length);
399
 
  start=  name + length;
400
 
  end= strchr(start, '\0');
401
 
 
402
 
  *end='.';
403
 
  length= (size_t) (end-start+1);
404
 
 
405
 
  if (!(dir_info = my_dir(buff,MYF(MY_DONT_SORT))))
406
 
  {                                             // This shouldn't happen
407
 
    strcpy(end,".1");                           // use name+1
408
 
    return(0);
409
 
  }
410
 
  file_info= dir_info->dir_entry;
411
 
  for (i=dir_info->number_off_files ; i-- ; file_info++)
412
 
  {
413
 
    if (memcmp(file_info->name, start, length) == 0 &&
414
 
        test_if_number(file_info->name+length, &number,0))
415
 
    {
416
 
      set_if_bigger(max_found,(ulong) number);
417
 
    }
418
 
  }
419
 
  my_dirend(dir_info);
420
 
 
421
 
  *end++='.';
422
 
  sprintf(end,"%06ld",max_found+1);
423
 
  return(0);
424
 
}
425
 
 
426
 
 
427
 
void DRIZZLE_LOG::init(enum_log_type log_type_arg,
428
 
                     enum cache_type io_cache_type_arg)
429
 
{
430
 
  log_type= log_type_arg;
431
 
  io_cache_type= io_cache_type_arg;
432
 
  return;
433
 
}
434
 
 
435
 
 
436
 
/*
437
 
  Open a (new) log file.
438
 
 
439
 
  SYNOPSIS
440
 
    open()
441
 
 
442
 
    log_name            The name of the log to open
443
 
    log_type_arg        The type of the log. E.g. LOG_NORMAL
444
 
    new_name            The new name for the logfile. This is only needed
445
 
                        when the method is used to open the binlog file.
446
 
    io_cache_type_arg   The type of the IO_CACHE to use for this log file
447
 
 
448
 
  DESCRIPTION
449
 
    Open the logfile, init IO_CACHE and write startup messages
450
 
    (in case of general and slow query logs).
451
 
 
452
 
  RETURN VALUES
453
 
    0   ok
454
 
    1   error
455
 
*/
456
 
 
457
 
bool DRIZZLE_LOG::open(const char *log_name, enum_log_type log_type_arg,
458
 
                     const char *new_name, enum cache_type io_cache_type_arg)
459
 
{
460
 
  char buff[FN_REFLEN];
461
 
  File file= -1;
462
 
  int open_flags= O_CREAT;
463
 
 
464
 
  write_error= 0;
465
 
 
466
 
  init(log_type_arg, io_cache_type_arg);
467
 
 
468
 
  if (!(name= strdup(log_name)))
469
 
  {
470
 
    name= (char *)log_name; // for the error message
471
 
    goto err;
472
 
  }
473
 
 
474
 
  if (new_name)
475
 
    strcpy(log_file_name, new_name);
476
 
  else if (generate_new_name(log_file_name, name))
477
 
    goto err;
478
 
 
479
 
  if (io_cache_type == SEQ_READ_APPEND)
480
 
    open_flags |= O_RDWR | O_APPEND;
481
 
  else
482
 
    open_flags |= O_WRONLY | (log_type == LOG_BIN ? 0 : O_APPEND);
483
 
 
484
 
  db[0]= 0;
485
 
 
486
 
  if ((file= my_open(log_file_name, open_flags,
487
 
                     MYF(MY_WME | ME_WAITTANG))) < 0 ||
488
 
      init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
489
 
                    lseek(file, 0, SEEK_CUR), 0,
490
 
                    MYF(MY_WME | MY_NABP |
491
 
                        ((log_type == LOG_BIN) ? MY_WAIT_IF_FULL : 0))))
492
 
    goto err;
493
 
 
494
 
  if (log_type == LOG_NORMAL)
495
 
  {
496
 
    int len= snprintf(buff, sizeof(buff), "%s, Version: %s (%s). "
497
 
                      "started with:\nTCP Port: %d, Named Pipe: %s\n",
498
 
                      my_progname, server_version, COMPILATION_COMMENT,
499
 
                      drizzled_port, "");
500
 
    len+= sprintf(buff+len, "Time                 Id Command    Argument\n");
501
 
 
502
 
    if (my_b_write(&log_file, (unsigned char*) buff, len) ||
503
 
        flush_io_cache(&log_file))
504
 
      goto err;
505
 
  }
506
 
 
507
 
  log_state= LOG_OPENED;
508
 
  return(0);
509
 
 
510
 
err:
511
 
  errmsg_printf(ERRMSG_LVL_ERROR, _("Could not use %s for logging (error %d). "
512
 
                    "Turning logging off for the whole duration of the "
513
 
                    "Drizzle server process. "
514
 
                    "To turn it on again: fix the cause, "
515
 
                    "shutdown the Drizzle server and restart it."),
516
 
                    name, errno);
517
 
  if (file >= 0)
518
 
    my_close(file, MYF(0));
519
 
  end_io_cache(&log_file);
520
 
  if (name)
521
 
  {
522
 
    free(name);
523
 
    name= NULL;
524
 
  }
525
 
  log_state= LOG_CLOSED;
526
 
  return(1);
527
 
}
528
 
 
529
 
DRIZZLE_LOG::DRIZZLE_LOG()
530
 
  : name(0), write_error(false), inited(false), log_type(LOG_UNKNOWN),
531
 
    log_state(LOG_CLOSED)
532
 
{
533
 
  /*
534
 
    We don't want to initialize LOCK_Log here as such initialization depends on
535
 
    safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is
536
 
    called only in main(). Doing initialization here would make it happen
537
 
    before main().
538
 
  */
539
 
  memset(&log_file, 0, sizeof(log_file));
540
 
}
541
 
 
542
 
void DRIZZLE_LOG::init_pthread_objects()
543
 
{
544
 
  assert(inited == 0);
545
 
  inited= 1;
546
 
  (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW);
547
 
}
548
 
 
549
 
/*
550
 
  Close the log file
551
 
 
552
 
  SYNOPSIS
553
 
    close()
554
 
    exiting     Bitmask. For the slow and general logs the only used bit is
555
 
                LOG_CLOSE_TO_BE_OPENED. This is used if we intend to call
556
 
                open at once after close.
557
 
 
558
 
  NOTES
559
 
    One can do an open on the object at once after doing a close.
560
 
    The internal structures are not freed until cleanup() is called
561
 
*/
562
 
 
563
 
void DRIZZLE_LOG::close(uint32_t exiting)
564
 
{                                       // One can't set log_type here!
565
 
  if (log_state == LOG_OPENED)
566
 
  {
567
 
    end_io_cache(&log_file);
568
 
 
569
 
    if (my_sync(log_file.file, MYF(MY_WME)) && ! write_error)
570
 
    {
571
 
      write_error= 1;
572
 
      errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_ERROR_ON_WRITE), name, errno);
573
 
    }
574
 
 
575
 
    if (my_close(log_file.file, MYF(MY_WME)) && ! write_error)
576
 
    {
577
 
      write_error= 1;
578
 
      errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_ERROR_ON_WRITE), name, errno);
579
 
    }
580
 
  }
581
 
 
582
 
  log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
583
 
  if (name)
584
 
  {
585
 
    free(name);
586
 
    name= NULL;
587
 
  }
588
 
  return;
589
 
}
590
 
 
591
 
/** This is called only once. */
592
 
 
593
 
void DRIZZLE_LOG::cleanup()
594
 
{
595
 
  if (inited)
596
 
  {
597
 
    inited= 0;
598
 
    (void) pthread_mutex_destroy(&LOCK_log);
599
 
    close(0);
600
 
  }
601
 
  return;
602
 
}
603
 
 
604
 
 
605
 
int DRIZZLE_LOG::generate_new_name(char *new_name, const char *log_name)
606
 
{
607
 
  fn_format(new_name, log_name, drizzle_data_home, "", 4);
608
 
  if (log_type == LOG_BIN)
609
 
  {
610
 
    if (!fn_ext(log_name)[0])
611
 
    {
612
 
      if (find_uniq_filename(new_name))
613
 
      {
614
 
        errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_NO_UNIQUE_LOGFILE), log_name);
615
 
        return 1;
616
 
      }
617
 
    }
618
 
  }
619
 
  return 0;
620
 
}
621
 
 
622
 
 
623
 
/**
624
 
  @todo
625
 
  The following should be using fn_format();  We just need to
626
 
  first change fn_format() to cut the file name if it's too long.
627
 
*/
628
 
const char *DRIZZLE_LOG::generate_name(const char *log_name,
629
 
                                      const char *suffix,
630
 
                                      bool strip_ext, char *buff)
631
 
{
632
 
  if (!log_name || !log_name[0])
633
 
  {
634
 
    strncpy(buff, pidfile_name, FN_REFLEN - strlen(suffix) - 1);
635
 
    return (const char *)
636
 
      fn_format(buff, buff, "", suffix, MYF(MY_REPLACE_EXT|MY_REPLACE_DIR));
637
 
  }
638
 
  // get rid of extension if the log is binary to avoid problems
639
 
  if (strip_ext)
640
 
  {
641
 
    char *p= fn_ext(log_name);
642
 
    uint32_t length= cmin((uint32_t)(p - log_name), FN_REFLEN);
643
 
    strncpy(buff, log_name, length);
644
 
    buff[length]= '\0';
645
 
    return (const char*)buff;
646
 
  }
647
 
  return log_name;
648
 
}
649
 
 
650
 
 
651
 
 
652
 
DRIZZLE_BIN_LOG::DRIZZLE_BIN_LOG()
653
 
  :bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
654
 
   need_start_event(true), m_table_map_version(0),
655
 
   description_event_for_exec(0), description_event_for_queue(0)
656
 
{
657
 
  /*
658
 
    We don't want to initialize locks here as such initialization depends on
659
 
    safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is
660
 
    called only in main(). Doing initialization here would make it happen
661
 
    before main().
662
 
  */
663
 
  index_file_name[0] = 0;
664
 
  memset(&index_file, 0, sizeof(index_file));
665
 
}
666
 
 
667
 
/* this is called only once */
668
 
 
669
 
void DRIZZLE_BIN_LOG::cleanup()
670
 
{
671
 
  if (inited)
672
 
  {
673
 
    inited= 0;
674
 
    close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
675
 
    delete description_event_for_queue;
676
 
    delete description_event_for_exec;
677
 
    (void) pthread_mutex_destroy(&LOCK_log);
678
 
    (void) pthread_mutex_destroy(&LOCK_index);
679
 
    (void) pthread_cond_destroy(&update_cond);
680
 
  }
681
 
  return;
682
 
}
683
 
 
684
 
 
685
 
/* Init binlog-specific vars */
686
 
void DRIZZLE_BIN_LOG::init(bool no_auto_events_arg, ulong max_size_arg)
687
 
{
688
 
  no_auto_events= no_auto_events_arg;
689
 
  max_size= max_size_arg;
690
 
  return;
691
 
}
692
 
 
693
 
 
694
 
void DRIZZLE_BIN_LOG::init_pthread_objects()
695
 
{
696
 
  assert(inited == 0);
697
 
  inited= 1;
698
 
  (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW);
699
 
  (void) pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW);
700
 
  (void) pthread_cond_init(&update_cond, 0);
701
 
}
702
 
 
703
 
 
704
 
bool DRIZZLE_BIN_LOG::open_index_file(const char *index_file_name_arg,
705
 
                                const char *log_name)
706
 
{
707
 
  File index_file_nr= -1;
708
 
  assert(!my_b_inited(&index_file));
709
 
 
710
 
  /*
711
 
    First open of this class instance
712
 
    Create an index file that will hold all file names uses for logging.
713
 
    Add new entries to the end of it.
714
 
  */
715
 
  myf opt= MY_UNPACK_FILENAME;
716
 
  if (!index_file_name_arg)
717
 
  {
718
 
    index_file_name_arg= log_name;    // Use same basename for index file
719
 
    opt= MY_UNPACK_FILENAME | MY_REPLACE_EXT;
720
 
  }
721
 
  fn_format(index_file_name, index_file_name_arg, drizzle_data_home,
722
 
            ".index", opt);
723
 
  if ((index_file_nr= my_open(index_file_name,
724
 
                              O_RDWR | O_CREAT,
725
 
                              MYF(MY_WME))) < 0 ||
726
 
       my_sync(index_file_nr, MYF(MY_WME)) ||
727
 
       init_io_cache(&index_file, index_file_nr,
728
 
                     IO_SIZE, WRITE_CACHE,
729
 
                     lseek(index_file_nr,0,SEEK_END),
730
 
                        0, MYF(MY_WME | MY_WAIT_IF_FULL)))
731
 
  {
732
 
    /*
733
 
      TODO: all operations creating/deleting the index file or a log, should
734
 
      call my_sync_dir() or my_sync_dir_by_file() to be durable.
735
 
      TODO: file creation should be done with my_create() not my_open().
736
 
    */
737
 
    if (index_file_nr >= 0)
738
 
      my_close(index_file_nr,MYF(0));
739
 
    return true;
740
 
  }
741
 
  return false;
742
 
}
743
 
 
744
 
 
745
 
/**
746
 
  Open a (new) binlog file.
747
 
 
748
 
  - Open the log file and the index file. Register the new
749
 
  file name in it
750
 
  - When calling this when the file is in use, you must have a locks
751
 
  on LOCK_log and LOCK_index.
752
 
 
753
 
  @retval
754
 
    0   ok
755
 
  @retval
756
 
    1   error
757
 
*/
758
 
 
759
 
bool DRIZZLE_BIN_LOG::open(const char *log_name,
760
 
                         enum_log_type log_type_arg,
761
 
                         const char *new_name,
762
 
                         enum cache_type io_cache_type_arg,
763
 
                         bool no_auto_events_arg,
764
 
                         ulong max_size_arg,
765
 
                         bool null_created_arg)
766
 
{
767
 
  File file= -1;
768
 
 
769
 
  write_error=0;
770
 
 
771
 
  /* open the main log file */
772
 
  if (DRIZZLE_LOG::open(log_name, log_type_arg, new_name, io_cache_type_arg))
773
 
    return(1);                            /* all warnings issued */
774
 
 
775
 
  init(no_auto_events_arg, max_size_arg);
776
 
 
777
 
  open_count++;
778
 
 
779
 
  assert(log_type == LOG_BIN);
780
 
 
781
 
  {
782
 
    bool write_file_name_to_index_file=0;
783
 
 
784
 
    if (!my_b_filelength(&log_file))
785
 
    {
786
 
      /*
787
 
        The binary log file was empty (probably newly created)
788
 
        This is the normal case and happens when the user doesn't specify
789
 
        an extension for the binary log files.
790
 
        In this case we write a standard header to it.
791
 
      */
792
 
      if (my_b_safe_write(&log_file, (unsigned char*) BINLOG_MAGIC,
793
 
                          BIN_LOG_HEADER_SIZE))
794
 
        goto err;
795
 
      bytes_written+= BIN_LOG_HEADER_SIZE;
796
 
      write_file_name_to_index_file= 1;
797
 
    }
798
 
 
799
 
    assert(my_b_inited(&index_file) != 0);
800
 
    reinit_io_cache(&index_file, WRITE_CACHE,
801
 
                    my_b_filelength(&index_file), 0, 0);
802
 
    if (need_start_event && !no_auto_events)
803
 
    {
804
 
      /*
805
 
        In 4.x we set need_start_event=0 here, but in 5.0 we want a Start event
806
 
        even if this is not the very first binlog.
807
 
      */
808
 
      Format_description_log_event s(BINLOG_VERSION);
809
 
      /*
810
 
        don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache
811
 
        as we won't be able to reset it later
812
 
      */
813
 
      if (io_cache_type == WRITE_CACHE)
814
 
        s.flags|= LOG_EVENT_BINLOG_IN_USE_F;
815
 
      if (!s.is_valid())
816
 
        goto err;
817
 
      s.dont_set_created= null_created_arg;
818
 
      if (s.write(&log_file))
819
 
        goto err;
820
 
      bytes_written+= s.data_written;
821
 
    }
822
 
    if (description_event_for_queue &&
823
 
        description_event_for_queue->binlog_version>=4)
824
 
    {
825
 
      /*
826
 
        This is a relay log written to by the I/O slave thread.
827
 
        Write the event so that others can later know the format of this relay
828
 
        log.
829
 
        Note that this event is very close to the original event from the
830
 
        master (it has binlog version of the master, event types of the
831
 
        master), so this is suitable to parse the next relay log's event. It
832
 
        has been produced by
833
 
        Format_description_log_event::Format_description_log_event(char* buf,).
834
 
        Why don't we want to write the description_event_for_queue if this
835
 
        event is for format<4 (3.23 or 4.x): this is because in that case, the
836
 
        description_event_for_queue describes the data received from the
837
 
        master, but not the data written to the relay log (*conversion*),
838
 
        which is in format 4 (slave's).
839
 
      */
840
 
      /*
841
 
        Set 'created' to 0, so that in next relay logs this event does not
842
 
        trigger cleaning actions on the slave in
843
 
        Format_description_log_event::apply_event_impl().
844
 
      */
845
 
      description_event_for_queue->created= 0;
846
 
      /* Don't set log_pos in event header */
847
 
      description_event_for_queue->artificial_event=1;
848
 
 
849
 
      if (description_event_for_queue->write(&log_file))
850
 
        goto err;
851
 
      bytes_written+= description_event_for_queue->data_written;
852
 
    }
853
 
    if (flush_io_cache(&log_file) ||
854
 
        my_sync(log_file.file, MYF(MY_WME)))
855
 
      goto err;
856
 
 
857
 
    if (write_file_name_to_index_file)
858
 
    {
859
 
      /*
860
 
        As this is a new log file, we write the file name to the index
861
 
        file. As every time we write to the index file, we sync it.
862
 
      */
863
 
      if (my_b_write(&index_file, (unsigned char*) log_file_name,
864
 
                     strlen(log_file_name)) ||
865
 
          my_b_write(&index_file, (unsigned char*) "\n", 1) ||
866
 
          flush_io_cache(&index_file) ||
867
 
          my_sync(index_file.file, MYF(MY_WME)))
868
 
        goto err;
869
 
    }
870
 
  }
871
 
  log_state= LOG_OPENED;
872
 
 
873
 
  return(0);
874
 
 
875
 
err:
876
 
  errmsg_printf(ERRMSG_LVL_ERROR, _("Could not use %s for logging (error %d). "
877
 
                    "Turning logging off for the whole duration of the "
878
 
                    "Drizzle server process. "
879
 
                    "To turn it on again: fix the cause, "
880
 
                    "shutdown the Drizzle server and restart it."),
881
 
                    name, errno);
882
 
  if (file >= 0)
883
 
    my_close(file,MYF(0));
884
 
  end_io_cache(&log_file);
885
 
  end_io_cache(&index_file);
886
 
  if (name)
887
 
  {
888
 
    free(name);
889
 
    name= NULL;
890
 
  }
891
 
  log_state= LOG_CLOSED;
892
 
  return(1);
893
 
}
894
 
 
895
 
 
896
 
int DRIZZLE_BIN_LOG::get_current_log(LOG_INFO* linfo)
897
 
{
898
 
  pthread_mutex_lock(&LOCK_log);
899
 
  int ret = raw_get_current_log(linfo);
900
 
  pthread_mutex_unlock(&LOCK_log);
901
 
  return ret;
902
 
}
903
 
 
904
 
int DRIZZLE_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
905
 
{
906
 
  strncpy(linfo->log_file_name, log_file_name, sizeof(linfo->log_file_name)-1);
907
 
  linfo->pos = my_b_tell(&log_file);
908
 
  return 0;
909
 
}
910
 
 
911
 
/**
912
 
  Move all data up in a file in an filename index file.
913
 
 
914
 
    We do the copy outside of the IO_CACHE as the cache buffers would just
915
 
    make things slower and more complicated.
916
 
    In most cases the copy loop should only do one read.
917
 
 
918
 
  @param index_file                     File to move
919
 
  @param offset                 Move everything from here to beginning
920
 
 
921
 
  @note
922
 
    File will be truncated to be 'offset' shorter or filled up with newlines
923
 
 
924
 
  @retval
925
 
    0   ok
926
 
*/
927
 
 
928
 
static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset)
929
 
{
930
 
  int bytes_read;
931
 
  my_off_t init_offset= offset;
932
 
  File file= index_file->file;
933
 
  unsigned char io_buf[IO_SIZE*2];
934
 
 
935
 
  for (;; offset+= bytes_read)
936
 
  {
937
 
    (void) lseek(file, offset, SEEK_SET);
938
 
    if ((bytes_read= (int) my_read(file, io_buf, sizeof(io_buf), MYF(MY_WME)))
939
 
        < 0)
940
 
      goto err;
941
 
    if (!bytes_read)
942
 
      break;                                    // end of file
943
 
    (void) lseek(file, offset-init_offset, SEEK_SET);
944
 
    if (my_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
945
 
      goto err;
946
 
  }
947
 
  /* The following will either truncate the file or fill the end with \n' */
948
 
  if (ftruncate(file, offset - init_offset) || my_sync(file, MYF(MY_WME)))
949
 
    goto err;
950
 
 
951
 
  /* Reset data in old index cache */
952
 
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 1);
953
 
  return(0);
954
 
 
955
 
err:
956
 
  return(1);
957
 
}
958
 
 
959
 
/**
960
 
  Find the position in the log-index-file for the given log name.
961
 
 
962
 
  @param linfo          Store here the found log file name and position to
963
 
                       the NEXT log file name in the index file.
964
 
  @param log_name       Filename to find in the index file.
965
 
                       Is a null pointer if we want to read the first entry
966
 
  @param need_lock      Set this to 1 if the parent doesn't already have a
967
 
                       lock on LOCK_index
968
 
 
969
 
  @note
970
 
    On systems without the truncate function the file will end with one or
971
 
    more empty lines.  These will be ignored when reading the file.
972
 
 
973
 
  @retval
974
 
    0                   ok
975
 
  @retval
976
 
    LOG_INFO_EOF                End of log-index-file found
977
 
  @retval
978
 
    LOG_INFO_IO         Got IO error while reading file
979
 
*/
980
 
 
981
 
int DRIZZLE_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
982
 
                            bool need_lock)
983
 
{
984
 
  int error= 0;
985
 
  char *fname= linfo->log_file_name;
986
 
  uint32_t log_name_len= log_name ? (uint) strlen(log_name) : 0;
987
 
 
988
 
  /*
989
 
    Mutex needed because we need to make sure the file pointer does not
990
 
    move from under our feet
991
 
  */
992
 
  if (need_lock)
993
 
    pthread_mutex_lock(&LOCK_index);
994
 
  safe_mutex_assert_owner(&LOCK_index);
995
 
 
996
 
  /* As the file is flushed, we can't get an error here */
997
 
  (void) reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0);
998
 
 
999
 
  for (;;)
1000
 
  {
1001
 
    uint32_t length;
1002
 
    my_off_t offset= my_b_tell(&index_file);
1003
 
    /* If we get 0 or 1 characters, this is the end of the file */
1004
 
 
1005
 
    if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
1006
 
    {
1007
 
      /* Did not find the given entry; Return not found or error */
1008
 
      error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
1009
 
      break;
1010
 
    }
1011
 
 
1012
 
    // if the log entry matches, null string matching anything
1013
 
    if (!log_name ||
1014
 
        (log_name_len == length-1 && fname[log_name_len] == '\n' &&
1015
 
         !memcmp(fname, log_name, log_name_len)))
1016
 
    {
1017
 
      fname[length-1]=0;                        // remove last \n
1018
 
      linfo->index_file_start_offset= offset;
1019
 
      linfo->index_file_offset = my_b_tell(&index_file);
1020
 
      break;
1021
 
    }
1022
 
  }
1023
 
 
1024
 
  if (need_lock)
1025
 
    pthread_mutex_unlock(&LOCK_index);
1026
 
  return(error);
1027
 
}
1028
 
 
1029
 
 
1030
 
/**
1031
 
  Find the position in the log-index-file for the given log name.
1032
 
 
1033
 
  @param
1034
 
    linfo               Store here the next log file name and position to
1035
 
                        the file name after that.
1036
 
  @param
1037
 
    need_lock           Set this to 1 if the parent doesn't already have a
1038
 
                        lock on LOCK_index
1039
 
 
1040
 
  @note
1041
 
    - Before calling this function, one has to call find_log_pos()
1042
 
    to set up 'linfo'
1043
 
    - Mutex needed because we need to make sure the file pointer does not move
1044
 
    from under our feet
1045
 
 
1046
 
  @retval
1047
 
    0                   ok
1048
 
  @retval
1049
 
    LOG_INFO_EOF                End of log-index-file found
1050
 
  @retval
1051
 
    LOG_INFO_IO         Got IO error while reading file
1052
 
*/
1053
 
 
1054
 
int DRIZZLE_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
1055
 
{
1056
 
  int error= 0;
1057
 
  uint32_t length;
1058
 
  char *fname= linfo->log_file_name;
1059
 
 
1060
 
  if (need_lock)
1061
 
    pthread_mutex_lock(&LOCK_index);
1062
 
  safe_mutex_assert_owner(&LOCK_index);
1063
 
 
1064
 
  /* As the file is flushed, we can't get an error here */
1065
 
  (void) reinit_io_cache(&index_file, READ_CACHE, linfo->index_file_offset, 0,
1066
 
                         0);
1067
 
 
1068
 
  linfo->index_file_start_offset= linfo->index_file_offset;
1069
 
  if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
1070
 
  {
1071
 
    error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
1072
 
    goto err;
1073
 
  }
1074
 
  fname[length-1]=0;                            // kill \n
1075
 
  linfo->index_file_offset = my_b_tell(&index_file);
1076
 
 
1077
 
err:
1078
 
  if (need_lock)
1079
 
    pthread_mutex_unlock(&LOCK_index);
1080
 
  return error;
1081
 
}
1082
 
 
1083
 
 
1084
 
/**
1085
 
  Delete all logs refered to in the index file.
1086
 
  Start writing to a new log file.
1087
 
 
1088
 
  The new index file will only contain this file.
1089
 
 
1090
 
  @param session                Thread
1091
 
 
1092
 
  @note
1093
 
    If not called from slave thread, write start event to new log
1094
 
 
1095
 
  @retval
1096
 
    0   ok
1097
 
  @retval
1098
 
    1   error
1099
 
*/
1100
 
 
1101
 
bool DRIZZLE_BIN_LOG::reset_logs(Session* session)
1102
 
{
1103
 
  LOG_INFO linfo;
1104
 
  bool error=0;
1105
 
  const char* save_name;
1106
 
 
1107
 
  /*
1108
 
    We need to get both locks to be sure that no one is trying to
1109
 
    write to the index log file.
1110
 
  */
1111
 
  pthread_mutex_lock(&LOCK_log);
1112
 
  pthread_mutex_lock(&LOCK_index);
1113
 
 
1114
 
  /*
1115
 
    The following mutex is needed to ensure that no threads call
1116
 
    'delete session' as we would then risk missing a 'rollback' from this
1117
 
    thread. If the transaction involved MyISAM tables, it should go
1118
 
    into binlog even on rollback.
1119
 
  */
1120
 
  pthread_mutex_lock(&LOCK_thread_count);
1121
 
 
1122
 
  /* Save variables so that we can reopen the log */
1123
 
  save_name=name;
1124
 
  name=0;                                       // Protect against free
1125
 
  close(LOG_CLOSE_TO_BE_OPENED);
1126
 
 
1127
 
  /* First delete all old log files */
1128
 
 
1129
 
  if (find_log_pos(&linfo, NULL, 0))
1130
 
  {
1131
 
    error=1;
1132
 
    goto err;
1133
 
  }
1134
 
 
1135
 
  for (;;)
1136
 
  {
1137
 
    if ((error= my_delete_allow_opened(linfo.log_file_name, MYF(0))) != 0)
1138
 
    {
1139
 
      if (my_errno == ENOENT)
1140
 
      {
1141
 
        push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1142
 
                            ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
1143
 
                            linfo.log_file_name);
1144
 
        errmsg_printf(ERRMSG_LVL_INFO, _("Failed to delete file '%s'"),
1145
 
                              linfo.log_file_name);
1146
 
        my_errno= 0;
1147
 
        error= 0;
1148
 
      }
1149
 
      else
1150
 
      {
1151
 
        push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
1152
 
                            ER_BINLOG_PURGE_FATAL_ERR,
1153
 
                            _("a problem with deleting %s; "
1154
 
                            "consider examining correspondence "
1155
 
                            "of your binlog index file "
1156
 
                            "to the actual binlog files"),
1157
 
                            linfo.log_file_name);
1158
 
        error= 1;
1159
 
        goto err;
1160
 
      }
1161
 
    }
1162
 
    if (find_next_log(&linfo, 0))
1163
 
      break;
1164
 
  }
1165
 
 
1166
 
  /* Start logging with a new file */
1167
 
  close(LOG_CLOSE_INDEX);
1168
 
  if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update)
1169
 
  {
1170
 
    if (my_errno == ENOENT)
1171
 
    {
1172
 
      push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1173
 
                          ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
1174
 
                          index_file_name);
1175
 
      errmsg_printf(ERRMSG_LVL_INFO, _("Failed to delete file '%s'"),
1176
 
                            index_file_name);
1177
 
      my_errno= 0;
1178
 
      error= 0;
1179
 
    }
1180
 
    else
1181
 
    {
1182
 
      push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
1183
 
                          ER_BINLOG_PURGE_FATAL_ERR,
1184
 
                          "a problem with deleting %s; "
1185
 
                          "consider examining correspondence "
1186
 
                          "of your binlog index file "
1187
 
                          "to the actual binlog files",
1188
 
                          index_file_name);
1189
 
      error= 1;
1190
 
      goto err;
1191
 
    }
1192
 
  }
1193
 
  if (!session->slave_thread)
1194
 
    need_start_event=1;
1195
 
  if (!open_index_file(index_file_name, 0))
1196
 
    open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0);
1197
 
  free((unsigned char*) save_name);
1198
 
 
1199
 
err:
1200
 
  pthread_mutex_unlock(&LOCK_thread_count);
1201
 
  pthread_mutex_unlock(&LOCK_index);
1202
 
  pthread_mutex_unlock(&LOCK_log);
1203
 
  return(error);
1204
 
}
1205
 
 
1206
 
 
1207
 
/**
1208
 
  Delete relay log files prior to rli->group_relay_log_name
1209
 
  (i.e. all logs which are not involved in a non-finished group
1210
 
  (transaction)), remove them from the index file and start on next
1211
 
  relay log.
1212
 
 
1213
 
  IMPLEMENTATION
1214
 
  - Protects index file with LOCK_index
1215
 
  - Delete relevant relay log files
1216
 
  - Copy all file names after these ones to the front of the index file
1217
 
  - If the OS has truncate, truncate the file, else fill it with \n'
1218
 
  - Read the next file name from the index file and store in rli->linfo
1219
 
 
1220
 
  @param rli           Relay log information
1221
 
  @param included     If false, all relay logs that are strictly before
1222
 
                      rli->group_relay_log_name are deleted ; if true, the
1223
 
                      latter is deleted too (i.e. all relay logs
1224
 
                      read by the SQL slave thread are deleted).
1225
 
 
1226
 
  @note
1227
 
    - This is only called from the slave-execute thread when it has read
1228
 
    all commands from a relay log and want to switch to a new relay log.
1229
 
    - When this happens, we can be in an active transaction as
1230
 
    a transaction can span over two relay logs
1231
 
    (although it is always written as a single block to the master's binary
1232
 
    log, hence cannot span over two master's binary logs).
1233
 
 
1234
 
  @retval
1235
 
    0                   ok
1236
 
  @retval
1237
 
    LOG_INFO_EOF                End of log-index-file found
1238
 
  @retval
1239
 
    LOG_INFO_SEEK       Could not allocate IO cache
1240
 
  @retval
1241
 
    LOG_INFO_IO         Got IO error while reading file
1242
 
*/
1243
 
 
1244
 
 
1245
 
int DRIZZLE_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
1246
 
{
1247
 
  int error;
1248
 
 
1249
 
  assert(is_open());
1250
 
  assert(rli->slave_running == 1);
1251
 
  assert(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name.c_str()));
1252
 
 
1253
 
  pthread_mutex_lock(&LOCK_index);
1254
 
  pthread_mutex_lock(&rli->log_space_lock);
1255
 
  rli->relay_log.purge_logs(rli->group_relay_log_name.c_str(), included,
1256
 
                            0, 0, &rli->log_space_total);
1257
 
  // Tell the I/O thread to take the relay_log_space_limit into account
1258
 
  rli->ignore_log_space_limit= 0;
1259
 
  pthread_mutex_unlock(&rli->log_space_lock);
1260
 
 
1261
 
  /*
1262
 
    Ok to broadcast after the critical region as there is no risk of
1263
 
    the mutex being destroyed by this thread later - this helps save
1264
 
    context switches
1265
 
  */
1266
 
  pthread_cond_broadcast(&rli->log_space_cond);
1267
 
 
1268
 
  /*
1269
 
    Read the next log file name from the index file and pass it back to
1270
 
    the caller
1271
 
    If included is true, we want the first relay log;
1272
 
    otherwise we want the one after event_relay_log_name.
1273
 
  */
1274
 
  if ((included && (error=find_log_pos(&rli->linfo, NULL, 0))) ||
1275
 
      (!included &&
1276
 
       ((error=find_log_pos(&rli->linfo, rli->event_relay_log_name.c_str(), 0)) ||
1277
 
        (error=find_next_log(&rli->linfo, 0)))))
1278
 
  {
1279
 
    char buff[22];
1280
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("next log error: %d  offset: %s  log: %s included: %d"),
1281
 
                    error,
1282
 
                    llstr(rli->linfo.index_file_offset,buff),
1283
 
                    rli->group_relay_log_name.c_str(),
1284
 
                    included);
1285
 
    goto err;
1286
 
  }
1287
 
 
1288
 
  /*
1289
 
    Reset rli's coordinates to the current log.
1290
 
  */
1291
 
  rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1292
 
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
1293
 
 
1294
 
  /*
1295
 
    If we removed the rli->group_relay_log_name file,
1296
 
    we must update the rli->group* coordinates, otherwise do not touch it as the
1297
 
    group's execution is not finished (e.g. COMMIT not executed)
1298
 
  */
1299
 
  if (included)
1300
 
  {
1301
 
    rli->group_relay_log_pos = BIN_LOG_HEADER_SIZE;
1302
 
    rli->group_relay_log_name.assign(rli->linfo.log_file_name);
1303
 
    rli->notify_group_relay_log_name_update();
1304
 
  }
1305
 
 
1306
 
  /* Store where we are in the new file for the execution thread */
1307
 
  flush_relay_log_info(rli);
1308
 
 
1309
 
err:
1310
 
  pthread_mutex_unlock(&LOCK_index);
1311
 
  return(error);
1312
 
}
1313
 
 
1314
 
/**
1315
 
  Update log index_file.
1316
 
*/
1317
 
 
1318
 
int DRIZZLE_BIN_LOG::update_log_index(LOG_INFO* log_info, bool need_update_threads)
1319
 
{
1320
 
  if (copy_up_file_and_fill(&index_file, log_info->index_file_start_offset))
1321
 
    return LOG_INFO_IO;
1322
 
 
1323
 
  // now update offsets in index file for running threads
1324
 
  if (need_update_threads)
1325
 
    adjust_linfo_offsets(log_info->index_file_start_offset);
1326
 
  return 0;
1327
 
}
1328
 
 
1329
 
/**
1330
 
  Remove all logs before the given log from disk and from the index file.
1331
 
 
1332
 
  @param to_log       Delete all log file name before this file.
1333
 
  @param included            If true, to_log is deleted too.
1334
 
  @param need_mutex
1335
 
  @param need_update_threads If we want to update the log coordinates of
1336
 
                             all threads. False for relay logs, true otherwise.
1337
 
  @param freed_log_space     If not null, decrement this variable of
1338
 
                             the amount of log space freed
1339
 
 
1340
 
  @note
1341
 
    If any of the logs before the deleted one is in use,
1342
 
    only purge logs up to this one.
1343
 
 
1344
 
  @retval
1345
 
    0                   ok
1346
 
  @retval
1347
 
    LOG_INFO_EOF                to_log not found
1348
 
    LOG_INFO_EMFILE             too many files opened
1349
 
    LOG_INFO_FATAL              if any other than ENOENT error from
1350
 
                                stat() or my_delete()
1351
 
*/
1352
 
 
1353
 
int DRIZZLE_BIN_LOG::purge_logs(const char *to_log,
1354
 
                          bool included,
1355
 
                          bool need_mutex,
1356
 
                          bool need_update_threads,
1357
 
                          uint64_t *decrease_log_space)
1358
 
{
1359
 
  int error;
1360
 
  int ret = 0;
1361
 
  bool exit_loop= 0;
1362
 
  LOG_INFO log_info;
1363
 
 
1364
 
  if (need_mutex)
1365
 
    pthread_mutex_lock(&LOCK_index);
1366
 
  if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/)))
1367
 
    goto err;
1368
 
 
1369
 
  /*
1370
 
    File name exists in index file; delete until we find this file
1371
 
    or a file that is used.
1372
 
  */
1373
 
  if ((error=find_log_pos(&log_info, NULL, 0 /*no mutex*/)))
1374
 
    goto err;
1375
 
  while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) &&
1376
 
         !log_in_use(log_info.log_file_name))
1377
 
  {
1378
 
    struct stat s;
1379
 
    if (stat(log_info.log_file_name, &s))
1380
 
    {
1381
 
      if (errno == ENOENT)
1382
 
      {
1383
 
        /*
1384
 
          It's not fatal if we can't stat a log file that does not exist;
1385
 
          If we could not stat, we won't delete.
1386
 
        */
1387
 
        push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1388
 
                            ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
1389
 
                            log_info.log_file_name);
1390
 
        errmsg_printf(ERRMSG_LVL_INFO, _("Failed to execute stat() on file '%s'"),
1391
 
                              log_info.log_file_name);
1392
 
        my_errno= 0;
1393
 
      }
1394
 
      else
1395
 
      {
1396
 
        /*
1397
 
          Other than ENOENT are fatal
1398
 
        */
1399
 
        push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
1400
 
                            ER_BINLOG_PURGE_FATAL_ERR,
1401
 
                            _("a problem with getting info on being purged %s; "
1402
 
                            "consider examining correspondence "
1403
 
                            "of your binlog index file "
1404
 
                            "to the actual binlog files"),
1405
 
                            log_info.log_file_name);
1406
 
        error= LOG_INFO_FATAL;
1407
 
        goto err;
1408
 
      }
1409
 
    }
1410
 
    else
1411
 
    {
1412
 
      if (!my_delete(log_info.log_file_name, MYF(0)))
1413
 
      {
1414
 
        if (decrease_log_space)
1415
 
          *decrease_log_space-= s.st_size;
1416
 
      }
1417
 
      else
1418
 
      {
1419
 
        if (my_errno == ENOENT)
1420
 
        {
1421
 
          push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1422
 
                              ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
1423
 
                              log_info.log_file_name);
1424
 
          errmsg_printf(ERRMSG_LVL_INFO, _("Failed to delete file '%s'"),
1425
 
                                log_info.log_file_name);
1426
 
          my_errno= 0;
1427
 
        }
1428
 
        else
1429
 
        {
1430
 
          push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
1431
 
                              ER_BINLOG_PURGE_FATAL_ERR,
1432
 
                              _("a problem with deleting %s; "
1433
 
                              "consider examining correspondence "
1434
 
                              "of your binlog index file "
1435
 
                              "to the actual binlog files"),
1436
 
                              log_info.log_file_name);
1437
 
          if (my_errno == EMFILE)
1438
 
          {
1439
 
            error= LOG_INFO_EMFILE;
1440
 
          }
1441
 
          error= LOG_INFO_FATAL;
1442
 
          goto err;
1443
 
        }
1444
 
      }
1445
 
    }
1446
 
 
1447
 
    if (find_next_log(&log_info, 0) || exit_loop)
1448
 
      break;
1449
 
  }
1450
 
 
1451
 
  /*
1452
 
    If we get killed -9 here, the sysadmin would have to edit
1453
 
    the log index file after restart - otherwise, this should be safe
1454
 
  */
1455
 
  error= update_log_index(&log_info, need_update_threads);
1456
 
  if (error == 0) {
1457
 
    error = ret;
1458
 
  }
1459
 
 
1460
 
err:
1461
 
  if (need_mutex)
1462
 
    pthread_mutex_unlock(&LOCK_index);
1463
 
  return(error);
1464
 
}
1465
 
 
1466
 
/**
1467
 
  Remove all logs before the given file date from disk and from the
1468
 
  index file.
1469
 
 
1470
 
  @param session                Thread pointer
1471
 
  @param before_date    Delete all log files before given date.
1472
 
 
1473
 
  @note
1474
 
    If any of the logs before the deleted one is in use,
1475
 
    only purge logs up to this one.
1476
 
 
1477
 
  @retval
1478
 
    0                           ok
1479
 
  @retval
1480
 
    LOG_INFO_PURGE_NO_ROTATE    Binary file that can't be rotated
1481
 
    LOG_INFO_FATAL              if any other than ENOENT error from
1482
 
                                stat() or my_delete()
1483
 
*/
1484
 
 
1485
 
int DRIZZLE_BIN_LOG::purge_logs_before_date(time_t purge_time)
1486
 
{
1487
 
  int error;
1488
 
  LOG_INFO log_info;
1489
 
  struct stat stat_area;
1490
 
 
1491
 
  pthread_mutex_lock(&LOCK_index);
1492
 
 
1493
 
  /*
1494
 
    Delete until we find curren file
1495
 
    or a file that is used or a file
1496
 
    that is older than purge_time.
1497
 
  */
1498
 
  if ((error=find_log_pos(&log_info, NULL, 0 /*no mutex*/)))
1499
 
    goto err;
1500
 
 
1501
 
  while (strcmp(log_file_name, log_info.log_file_name) &&
1502
 
         !log_in_use(log_info.log_file_name))
1503
 
  {
1504
 
    if (stat(log_info.log_file_name, &stat_area))
1505
 
    {
1506
 
      if (errno == ENOENT)
1507
 
      {
1508
 
        /*
1509
 
          It's not fatal if we can't stat a log file that does not exist.
1510
 
        */
1511
 
        push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1512
 
                            ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
1513
 
                            log_info.log_file_name);
1514
 
        errmsg_printf(ERRMSG_LVL_INFO, _("Failed to execute stat() on file '%s'"),
1515
 
                              log_info.log_file_name);
1516
 
        my_errno= 0;
1517
 
      }
1518
 
      else
1519
 
      {
1520
 
        /*
1521
 
          Other than ENOENT are fatal
1522
 
        */
1523
 
        push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
1524
 
                            ER_BINLOG_PURGE_FATAL_ERR,
1525
 
                            _("a problem with getting info on being purged %s; "
1526
 
                            "consider examining correspondence "
1527
 
                            "of your binlog index file "
1528
 
                            "to the actual binlog files"),
1529
 
                            log_info.log_file_name);
1530
 
        error= LOG_INFO_FATAL;
1531
 
        goto err;
1532
 
      }
1533
 
    }
1534
 
    else
1535
 
    {
1536
 
      if (stat_area.st_mtime >= purge_time)
1537
 
        break;
1538
 
      if (my_delete(log_info.log_file_name, MYF(0)))
1539
 
      {
1540
 
        if (my_errno == ENOENT)
1541
 
        {
1542
 
          /* It's not fatal even if we can't delete a log file */
1543
 
          push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1544
 
                              ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
1545
 
                              log_info.log_file_name);
1546
 
          errmsg_printf(ERRMSG_LVL_INFO, _("Failed to delete file '%s'"),
1547
 
                                log_info.log_file_name);
1548
 
          my_errno= 0;
1549
 
        }
1550
 
        else
1551
 
        {
1552
 
          push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
1553
 
                              ER_BINLOG_PURGE_FATAL_ERR,
1554
 
                              _("a problem with deleting %s; "
1555
 
                              "consider examining correspondence "
1556
 
                              "of your binlog index file "
1557
 
                              "to the actual binlog files"),
1558
 
                              log_info.log_file_name);
1559
 
          error= LOG_INFO_FATAL;
1560
 
          goto err;
1561
 
        }
1562
 
      }
1563
 
    }
1564
 
    if (find_next_log(&log_info, 0))
1565
 
      break;
1566
 
  }
1567
 
 
1568
 
  /*
1569
 
    If we get killed -9 here, the sysadmin would have to edit
1570
 
    the log index file after restart - otherwise, this should be safe
1571
 
  */
1572
 
  error= update_log_index(&log_info, 1);
1573
 
 
1574
 
err:
1575
 
  pthread_mutex_unlock(&LOCK_index);
1576
 
  return(error);
1577
 
}
1578
 
 
1579
 
 
1580
 
/**
1581
 
  Create a new log file name.
1582
 
 
1583
 
  @param buf            buf of at least FN_REFLEN where new name is stored
1584
 
 
1585
 
  @note
1586
 
    If file name will be longer then FN_REFLEN it will be truncated
1587
 
*/
1588
 
 
1589
 
void DRIZZLE_BIN_LOG::make_log_name(char* buf, const char* log_ident)
1590
 
{
1591
 
  uint32_t dir_len = dirname_length(log_file_name);
1592
 
  if (dir_len >= FN_REFLEN)
1593
 
    dir_len=FN_REFLEN-1;
1594
 
  strncpy(buf, log_file_name, dir_len);
1595
 
  strncpy(buf+dir_len, log_ident, FN_REFLEN - dir_len -1);
1596
 
}
1597
 
 
1598
 
 
1599
 
/**
1600
 
  Check if we are writing/reading to the given log file.
1601
 
*/
1602
 
 
1603
 
bool DRIZZLE_BIN_LOG::is_active(const char *log_file_name_arg)
1604
 
{
1605
 
  return !strcmp(log_file_name, log_file_name_arg);
1606
 
}
1607
 
 
1608
 
 
1609
 
/*
1610
 
  Wrappers around new_file_impl to avoid using argument
1611
 
  to control locking. The argument 1) less readable 2) breaks
1612
 
  incapsulation 3) allows external access to the class without
1613
 
  a lock (which is not possible with private new_file_without_locking
1614
 
  method).
1615
 
*/
1616
 
 
1617
 
void DRIZZLE_BIN_LOG::new_file()
1618
 
{
1619
 
  new_file_impl(1);
1620
 
}
1621
 
 
1622
 
 
1623
 
void DRIZZLE_BIN_LOG::new_file_without_locking()
1624
 
{
1625
 
  new_file_impl(0);
1626
 
}
1627
 
 
1628
 
 
1629
 
/**
1630
 
  Start writing to a new log file or reopen the old file.
1631
 
 
1632
 
  @param need_lock              Set to 1 if caller has not locked LOCK_log
1633
 
 
1634
 
  @note
1635
 
    The new file name is stored last in the index file
1636
 
*/
1637
 
 
1638
 
void DRIZZLE_BIN_LOG::new_file_impl(bool need_lock)
1639
 
{
1640
 
  char new_name[FN_REFLEN], *new_name_ptr, *old_name;
1641
 
 
1642
 
  if (!is_open())
1643
 
  {
1644
 
    return;
1645
 
  }
1646
 
 
1647
 
  if (need_lock)
1648
 
    pthread_mutex_lock(&LOCK_log);
1649
 
  pthread_mutex_lock(&LOCK_index);
1650
 
 
1651
 
  safe_mutex_assert_owner(&LOCK_log);
1652
 
  safe_mutex_assert_owner(&LOCK_index);
1653
 
 
1654
 
  /*
1655
 
    if binlog is used as tc log, be sure all xids are "unlogged",
1656
 
    so that on recover we only need to scan one - latest - binlog file
1657
 
    for prepared xids. As this is expected to be a rare event,
1658
 
    simple wait strategy is enough. We're locking LOCK_log to be sure no
1659
 
    new Xid_log_event's are added to the log (and prepared_xids is not
1660
 
    increased), and waiting on COND_prep_xids for late threads to
1661
 
    catch up.
1662
 
  */
1663
 
  if (prepared_xids)
1664
 
  {
1665
 
    tc_log_page_waits++;
1666
 
    pthread_mutex_lock(&LOCK_prep_xids);
1667
 
    while (prepared_xids) {
1668
 
      pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
1669
 
    }
1670
 
    pthread_mutex_unlock(&LOCK_prep_xids);
1671
 
  }
1672
 
 
1673
 
  /* Reuse old name if not binlog and not update log */
1674
 
  new_name_ptr= name;
1675
 
 
1676
 
  /*
1677
 
    If user hasn't specified an extension, generate a new log name
1678
 
    We have to do this here and not in open as we want to store the
1679
 
    new file name in the current binary log file.
1680
 
  */
1681
 
  if (generate_new_name(new_name, name))
1682
 
    goto end;
1683
 
  new_name_ptr=new_name;
1684
 
 
1685
 
  if (log_type == LOG_BIN)
1686
 
  {
1687
 
    if (!no_auto_events)
1688
 
    {
1689
 
      /*
1690
 
        We log the whole file name for log file as the user may decide
1691
 
        to change base names at some point.
1692
 
      */
1693
 
      Rotate_log_event r(new_name+dirname_length(new_name),
1694
 
                         0, LOG_EVENT_OFFSET, 0);
1695
 
      r.write(&log_file);
1696
 
      bytes_written += r.data_written;
1697
 
    }
1698
 
    /*
1699
 
      Update needs to be signalled even if there is no rotate event
1700
 
      log rotation should give the waiting thread a signal to
1701
 
      discover EOF and move on to the next log.
1702
 
    */
1703
 
    signal_update();
1704
 
  }
1705
 
  old_name=name;
1706
 
  name=0;                               // Don't free name
1707
 
  close(LOG_CLOSE_TO_BE_OPENED);
1708
 
 
1709
 
  /*
1710
 
     Note that at this point, log_state != LOG_CLOSED (important for is_open()).
1711
 
  */
1712
 
 
1713
 
  /*
1714
 
     new_file() is only used for rotation (in FLUSH LOGS or because size >
1715
 
     max_binlog_size or max_relay_log_size).
1716
 
     If this is a binary log, the Format_description_log_event at the beginning of
1717
 
     the new file should have created=0 (to distinguish with the
1718
 
     Format_description_log_event written at server startup, which should
1719
 
     trigger temp tables deletion on slaves.
1720
 
  */
1721
 
 
1722
 
  open(old_name, log_type, new_name_ptr,
1723
 
       io_cache_type, no_auto_events, max_size, 1);
1724
 
  free(old_name);
1725
 
 
1726
 
end:
1727
 
  if (need_lock)
1728
 
    pthread_mutex_unlock(&LOCK_log);
1729
 
  pthread_mutex_unlock(&LOCK_index);
1730
 
 
1731
 
  return;
1732
 
}
1733
 
 
1734
 
 
1735
 
bool DRIZZLE_BIN_LOG::append(Log_event* ev)
1736
 
{
1737
 
  bool error = 0;
1738
 
  pthread_mutex_lock(&LOCK_log);
1739
 
 
1740
 
  assert(log_file.type == SEQ_READ_APPEND);
1741
 
  /*
1742
 
    Log_event::write() is smart enough to use my_b_write() or
1743
 
    my_b_append() depending on the kind of cache we have.
1744
 
  */
1745
 
  if (ev->write(&log_file))
1746
 
  {
1747
 
    error=1;
1748
 
    goto err;
1749
 
  }
1750
 
  bytes_written+= ev->data_written;
1751
 
  if ((uint) my_b_append_tell(&log_file) > max_size)
1752
 
    new_file_without_locking();
1753
 
 
1754
 
err:
1755
 
  pthread_mutex_unlock(&LOCK_log);
1756
 
  signal_update();                              // Safe as we don't call close
1757
 
  return(error);
1758
 
}
1759
 
 
1760
 
 
1761
 
bool DRIZZLE_BIN_LOG::appendv(const char* buf, uint32_t len,...)
1762
 
{
1763
 
  bool error= 0;
1764
 
  va_list(args);
1765
 
  va_start(args,len);
1766
 
 
1767
 
  assert(log_file.type == SEQ_READ_APPEND);
1768
 
 
1769
 
  safe_mutex_assert_owner(&LOCK_log);
1770
 
  do
1771
 
  {
1772
 
    if (my_b_append(&log_file,(unsigned char*) buf,len))
1773
 
    {
1774
 
      error= 1;
1775
 
      goto err;
1776
 
    }
1777
 
    bytes_written += len;
1778
 
  } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
1779
 
  if ((uint) my_b_append_tell(&log_file) > max_size)
1780
 
    new_file_without_locking();
1781
 
 
1782
 
err:
1783
 
  if (!error)
1784
 
    signal_update();
1785
 
  return(error);
1786
 
}
1787
 
 
1788
 
 
1789
 
bool DRIZZLE_BIN_LOG::flush_and_sync()
1790
 
{
1791
 
  int err=0, fd=log_file.file;
1792
 
  safe_mutex_assert_owner(&LOCK_log);
1793
 
  if (flush_io_cache(&log_file))
1794
 
    return 1;
1795
 
  if (++sync_binlog_counter >= sync_binlog_period && sync_binlog_period)
1796
 
  {
1797
 
    sync_binlog_counter= 0;
1798
 
    err=my_sync(fd, MYF(MY_WME));
1799
 
  }
1800
 
  return err;
1801
 
}
1802
 
 
1803
 
void DRIZZLE_BIN_LOG::start_union_events(Session *session, query_id_t query_id_param)
1804
 
{
1805
 
  assert(!session->binlog_evt_union.do_union);
1806
 
  session->binlog_evt_union.do_union= true;
1807
 
  session->binlog_evt_union.unioned_events= false;
1808
 
  session->binlog_evt_union.unioned_events_trans= false;
1809
 
  session->binlog_evt_union.first_query_id= query_id_param;
1810
 
}
1811
 
 
1812
 
void DRIZZLE_BIN_LOG::stop_union_events(Session *session)
1813
 
{
1814
 
  assert(session->binlog_evt_union.do_union);
1815
 
  session->binlog_evt_union.do_union= false;
1816
 
}
1817
 
 
1818
 
bool DRIZZLE_BIN_LOG::is_query_in_union(Session *session, query_id_t query_id_param)
1819
 
{
1820
 
  return (session->binlog_evt_union.do_union &&
1821
 
          query_id_param >= session->binlog_evt_union.first_query_id);
1822
 
}
1823
 
 
1824
 
/*
1825
 
  Moves the last bunch of rows from the pending Rows event to the binlog
1826
 
  (either cached binlog if transaction, or disk binlog). Sets a new pending
1827
 
  event.
1828
 
*/
1829
 
int
1830
 
DRIZZLE_BIN_LOG::flush_and_set_pending_rows_event(Session *, Rows_log_event*)
1831
 
{
1832
 
  assert(drizzle_bin_log.is_open());
1833
 
 
1834
 
  return false;
1835
 
}
1836
 
 
1837
 
/**
1838
 
  Write an event to the binary log.
1839
 
*/
1840
 
 
1841
 
bool DRIZZLE_BIN_LOG::write(Log_event *event_info)
1842
 
{
1843
 
  Session *session= event_info->session;
1844
 
  bool error= 1;
1845
 
 
1846
 
  if (session->binlog_evt_union.do_union)
1847
 
  {
1848
 
    /*
1849
 
      In Stored function; Remember that function call caused an update.
1850
 
      We will log the function call to the binary log on function exit
1851
 
    */
1852
 
    session->binlog_evt_union.unioned_events= true;
1853
 
    session->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt;
1854
 
    return(0);
1855
 
  }
1856
 
 
1857
 
  /*
1858
 
    Flush the pending rows event to the transaction cache or to the
1859
 
    log file.  Since this function potentially aquire the LOCK_log
1860
 
    mutex, we do this before aquiring the LOCK_log mutex in this
1861
 
    function.
1862
 
 
1863
 
    We only end the statement if we are in a top-level statement.  If
1864
 
    we are inside a stored function, we do not end the statement since
1865
 
    this will close all tables on the slave.
1866
 
  */
1867
 
 
1868
 
  pthread_mutex_lock(&LOCK_log);
1869
 
 
1870
 
  /*
1871
 
     In most cases this is only called if 'is_open()' is true; in fact this is
1872
 
     mostly called if is_open() *was* true a few instructions before, but it
1873
 
     could have changed since.
1874
 
  */
1875
 
  if (likely(is_open()))
1876
 
  {
1877
 
    IO_CACHE *file= &log_file;
1878
 
    /*
1879
 
      In the future we need to add to the following if tests like
1880
 
      "do the involved tables match (to be implemented)
1881
 
      binlog_[wild_]{do|ignore}_table?" (WL#1049)"
1882
 
    */
1883
 
    if (session && !(session->options & OPTION_BIN_LOG))
1884
 
    {
1885
 
      pthread_mutex_unlock(&LOCK_log);
1886
 
      return(0);
1887
 
    }
1888
 
 
1889
 
    /*
1890
 
       Write the SQL command
1891
 
     */
1892
 
 
1893
 
    if (event_info->write(file))
1894
 
      goto err;
1895
 
 
1896
 
    if (file == &log_file) // we are writing to the real log (disk)
1897
 
    {
1898
 
      if (flush_and_sync())
1899
 
        goto err;
1900
 
      signal_update();
1901
 
      rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
1902
 
    }
1903
 
    error=0;
1904
 
 
1905
 
err:
1906
 
    if (error)
1907
 
    {
1908
 
      if (my_errno == EFBIG)
1909
 
        my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(0));
1910
 
      else
1911
 
        my_error(ER_ERROR_ON_WRITE, MYF(0), name, errno);
1912
 
      write_error=1;
1913
 
    }
1914
 
  }
1915
 
 
1916
 
  if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F)
1917
 
    ++m_table_map_version;
1918
 
 
1919
 
  pthread_mutex_unlock(&LOCK_log);
1920
 
  return(error);
1921
 
}
1922
 
 
1923
 
void DRIZZLE_BIN_LOG::rotate_and_purge(uint32_t flags)
1924
 
{
1925
 
  if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
1926
 
    pthread_mutex_lock(&LOCK_log);
1927
 
  if ((flags & RP_FORCE_ROTATE) ||
1928
 
      (my_b_tell(&log_file) >= (my_off_t) max_size))
1929
 
  {
1930
 
    new_file_without_locking();
1931
 
    if (expire_logs_days)
1932
 
    {
1933
 
      time_t purge_time= time(0) - expire_logs_days*24*60*60;
1934
 
      if (purge_time >= 0)
1935
 
        purge_logs_before_date(purge_time);
1936
 
    }
1937
 
  }
1938
 
  if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
1939
 
    pthread_mutex_unlock(&LOCK_log);
1940
 
}
1941
 
 
1942
 
uint32_t DRIZZLE_BIN_LOG::next_file_id()
1943
 
{
1944
 
  uint32_t res;
1945
 
  pthread_mutex_lock(&LOCK_log);
1946
 
  res = file_id++;
1947
 
  pthread_mutex_unlock(&LOCK_log);
1948
 
  return res;
1949
 
}
1950
 
 
1951
 
 
1952
 
/*
1953
 
  Write the contents of a cache to the binary log.
1954
 
 
1955
 
  SYNOPSIS
1956
 
    write_cache()
1957
 
    cache    Cache to write to the binary log
1958
 
    lock_log True if the LOCK_log mutex should be aquired, false otherwise
1959
 
    sync_log True if the log should be flushed and sync:ed
1960
 
 
1961
 
  DESCRIPTION
1962
 
    Write the contents of the cache to the binary log. The cache will
1963
 
    be reset as a READ_CACHE to be able to read the contents from it.
1964
 
 */
1965
 
 
1966
 
int DRIZZLE_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
1967
 
{
1968
 
  Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
1969
 
 
1970
 
  if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
1971
 
    return ER_ERROR_ON_WRITE;
1972
 
  uint32_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
1973
 
  long val;
1974
 
  unsigned char header[LOG_EVENT_HEADER_LEN];
1975
 
 
1976
 
  /*
1977
 
    The events in the buffer have incorrect end_log_pos data
1978
 
    (relative to beginning of group rather than absolute),
1979
 
    so we'll recalculate them in situ so the binlog is always
1980
 
    correct, even in the middle of a group. This is possible
1981
 
    because we now know the start position of the group (the
1982
 
    offset of this cache in the log, if you will); all we need
1983
 
    to do is to find all event-headers, and add the position of
1984
 
    the group to the end_log_pos of each event.  This is pretty
1985
 
    straight forward, except that we read the cache in segments,
1986
 
    so an event-header might end up on the cache-border and get
1987
 
    split.
1988
 
  */
1989
 
 
1990
 
  group= (uint)my_b_tell(&log_file);
1991
 
  hdr_offs= carry= 0;
1992
 
 
1993
 
  do
1994
 
  {
1995
 
 
1996
 
    /*
1997
 
      if we only got a partial header in the last iteration,
1998
 
      get the other half now and process a full header.
1999
 
    */
2000
 
    if (unlikely(carry > 0))
2001
 
    {
2002
 
      assert(carry < LOG_EVENT_HEADER_LEN);
2003
 
 
2004
 
      /* assemble both halves */
2005
 
      memcpy(&header[carry], cache->read_pos, LOG_EVENT_HEADER_LEN - carry);
2006
 
 
2007
 
      /* fix end_log_pos */
2008
 
      val= uint4korr(&header[LOG_POS_OFFSET]) + group;
2009
 
      int4store(&header[LOG_POS_OFFSET], val);
2010
 
 
2011
 
      /* write the first half of the split header */
2012
 
      if (my_b_write(&log_file, header, carry))
2013
 
        return ER_ERROR_ON_WRITE;
2014
 
 
2015
 
      /*
2016
 
        copy fixed second half of header to cache so the correct
2017
 
        version will be written later.
2018
 
      */
2019
 
      memcpy(cache->read_pos, &header[carry], LOG_EVENT_HEADER_LEN - carry);
2020
 
 
2021
 
      /* next event header at ... */
2022
 
      hdr_offs = uint4korr(&header[EVENT_LEN_OFFSET]) - carry;
2023
 
 
2024
 
      carry= 0;
2025
 
    }
2026
 
 
2027
 
    /* if there is anything to write, process it. */
2028
 
 
2029
 
    if (likely(length > 0))
2030
 
    {
2031
 
      /*
2032
 
        process all event-headers in this (partial) cache.
2033
 
        if next header is beyond current read-buffer,
2034
 
        we'll get it later (though not necessarily in the
2035
 
        very next iteration, just "eventually").
2036
 
      */
2037
 
 
2038
 
      while (hdr_offs < length)
2039
 
      {
2040
 
        /*
2041
 
          partial header only? save what we can get, process once
2042
 
          we get the rest.
2043
 
        */
2044
 
 
2045
 
        if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
2046
 
        {
2047
 
          carry= length - hdr_offs;
2048
 
          memcpy(header, cache->read_pos + hdr_offs, carry);
2049
 
          length= hdr_offs;
2050
 
        }
2051
 
        else
2052
 
        {
2053
 
          /* we've got a full event-header, and it came in one piece */
2054
 
 
2055
 
          unsigned char *log_pos= (unsigned char *)cache->read_pos + hdr_offs + LOG_POS_OFFSET;
2056
 
 
2057
 
          /* fix end_log_pos */
2058
 
          val= uint4korr(log_pos) + group;
2059
 
          int4store(log_pos, val);
2060
 
 
2061
 
          /* next event header at ... */
2062
 
          log_pos= (unsigned char *)cache->read_pos + hdr_offs + EVENT_LEN_OFFSET;
2063
 
          hdr_offs += uint4korr(log_pos);
2064
 
 
2065
 
        }
2066
 
      }
2067
 
 
2068
 
      /*
2069
 
        Adjust hdr_offs. Note that it may still point beyond the segment
2070
 
        read in the next iteration; if the current event is very long,
2071
 
        it may take a couple of read-iterations (and subsequent adjustments
2072
 
        of hdr_offs) for it to point into the then-current segment.
2073
 
        If we have a split header (!carry), hdr_offs will be set at the
2074
 
        beginning of the next iteration, overwriting the value we set here:
2075
 
      */
2076
 
      hdr_offs -= length;
2077
 
    }
2078
 
 
2079
 
    /* Write data to the binary log file */
2080
 
    if (my_b_write(&log_file, cache->read_pos, length))
2081
 
      return ER_ERROR_ON_WRITE;
2082
 
    cache->read_pos=cache->read_end;            // Mark buffer used up
2083
 
  } while ((length= my_b_fill(cache)));
2084
 
 
2085
 
  assert(carry == 0);
2086
 
 
2087
 
  if (sync_log)
2088
 
    flush_and_sync();
2089
 
 
2090
 
  return 0;                                     // All OK
2091
 
}
2092
 
 
2093
 
/**
2094
 
  Write a cached log entry to the binary log.
2095
 
  - To support transaction over replication, we wrap the transaction
2096
 
  with BEGIN/COMMIT or BEGIN/ROLLBACK in the binary log.
2097
 
  We want to write a BEGIN/ROLLBACK block when a non-transactional table
2098
 
  was updated in a transaction which was rolled back. This is to ensure
2099
 
  that the same updates are run on the slave.
2100
 
 
2101
 
  @param session
2102
 
  @param cache          The cache to copy to the binlog
2103
 
  @param commit_event   The commit event to print after writing the
2104
 
                        contents of the cache.
2105
 
 
2106
 
  @note
2107
 
    We only come here if there is something in the cache.
2108
 
  @note
2109
 
    The thing in the cache is always a complete transaction.
2110
 
  @note
2111
 
    'cache' needs to be reinitialized after this functions returns.
2112
 
*/
2113
 
 
2114
 
bool DRIZZLE_BIN_LOG::write(Session *session, IO_CACHE *cache, Log_event *commit_event)
2115
 
{
2116
 
  pthread_mutex_lock(&LOCK_log);
2117
 
 
2118
 
  /* NULL would represent nothing to replicate after ROLLBACK */
2119
 
  assert(commit_event != NULL);
2120
 
 
2121
 
  assert(is_open());
2122
 
  if (likely(is_open()))                       // Should always be true
2123
 
  {
2124
 
    /*
2125
 
      We only bother to write to the binary log if there is anything
2126
 
      to write.
2127
 
     */
2128
 
    if (my_b_tell(cache) > 0)
2129
 
    {
2130
 
      /*
2131
 
        Log "BEGIN" at the beginning of every transaction.  Here, a
2132
 
        transaction is either a BEGIN..COMMIT block or a single
2133
 
        statement in autocommit mode.
2134
 
      */
2135
 
      Query_log_event qinfo(session, STRING_WITH_LEN("BEGIN"), true, false);
2136
 
      /*
2137
 
        Imagine this is rollback due to net timeout, after all
2138
 
        statements of the transaction succeeded. Then we want a
2139
 
        zero-error code in BEGIN.  In other words, if there was a
2140
 
        really serious error code it's already in the statement's
2141
 
        events, there is no need to put it also in this internally
2142
 
        generated event, and as this event is generated late it would
2143
 
        lead to false alarms.
2144
 
 
2145
 
        This is safer than session->clear_error() against kills at shutdown.
2146
 
      */
2147
 
      qinfo.error_code= 0;
2148
 
      /*
2149
 
        Now this Query_log_event has artificial log_pos 0. It must be
2150
 
        adjusted to reflect the real position in the log. Not doing it
2151
 
        would confuse the slave: it would prevent this one from
2152
 
        knowing where he is in the master's binlog, which would result
2153
 
        in wrong positions being shown to the user, MASTER_POS_WAIT
2154
 
        undue waiting etc.
2155
 
      */
2156
 
      if (qinfo.write(&log_file))
2157
 
        goto err;
2158
 
 
2159
 
      if ((write_error= write_cache(cache, false, false)))
2160
 
        goto err;
2161
 
 
2162
 
      if (commit_event && commit_event->write(&log_file))
2163
 
        goto err;
2164
 
      if (flush_and_sync())
2165
 
        goto err;
2166
 
      if (cache->error)                         // Error on read
2167
 
      {
2168
 
        errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_ERROR_ON_READ), cache->file_name, errno);
2169
 
        write_error=1;                          // Don't give more errors
2170
 
        goto err;
2171
 
      }
2172
 
      signal_update();
2173
 
    }
2174
 
 
2175
 
    /*
2176
 
      if commit_event is Xid_log_event, increase the number of
2177
 
      prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
2178
 
      if there're prepared xids in it - see the comment in new_file() for
2179
 
      an explanation.
2180
 
      If the commit_event is not Xid_log_event (then it's a Query_log_event)
2181
 
      rotate binlog, if necessary.
2182
 
    */
2183
 
    if (commit_event && commit_event->get_type_code() == XID_EVENT)
2184
 
    {
2185
 
      pthread_mutex_lock(&LOCK_prep_xids);
2186
 
      prepared_xids++;
2187
 
      pthread_mutex_unlock(&LOCK_prep_xids);
2188
 
    }
2189
 
    else
2190
 
      rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
2191
 
  }
2192
 
  pthread_mutex_unlock(&LOCK_log);
2193
 
 
2194
 
  return(0);
2195
 
 
2196
 
err:
2197
 
  if (!write_error)
2198
 
  {
2199
 
    write_error= 1;
2200
 
    errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_ERROR_ON_WRITE), name, errno);
2201
 
  }
2202
 
  pthread_mutex_unlock(&LOCK_log);
2203
 
  return(1);
2204
 
}
2205
 
 
2206
 
 
2207
 
/**
2208
 
  Wait until we get a signal that the relay log has been updated
2209
 
 
2210
 
  @param[in] session   a Session struct
2211
 
  @note
2212
 
    LOCK_log must be taken before calling this function.
2213
 
    It will be released at the end of the function.
2214
 
*/
2215
 
 
2216
 
void DRIZZLE_BIN_LOG::wait_for_update_relay_log(Session* session)
2217
 
{
2218
 
  const char *old_msg;
2219
 
  old_msg= session->enter_cond(&update_cond, &LOCK_log,
2220
 
                           "Slave has read all relay log; "
2221
 
                           "waiting for the slave I/O "
2222
 
                           "thread to update it" );
2223
 
  pthread_cond_wait(&update_cond, &LOCK_log);
2224
 
  session->exit_cond(old_msg);
2225
 
  return;
2226
 
}
2227
 
 
2228
 
 
2229
 
/**
2230
 
  Wait until we get a signal that the binary log has been updated.
2231
 
  Applies to master only.
2232
 
 
2233
 
  NOTES
2234
 
  @param[in] session        a Session struct
2235
 
  @param[in] timeout    a pointer to a timespec;
2236
 
                        NULL means to wait w/o timeout.
2237
 
  @retval    0          if got signalled on update
2238
 
  @retval    non-0      if wait timeout elapsed
2239
 
  @note
2240
 
    LOCK_log must be taken before calling this function.
2241
 
    LOCK_log is being released while the thread is waiting.
2242
 
    LOCK_log is released by the caller.
2243
 
*/
2244
 
 
2245
 
int DRIZZLE_BIN_LOG::wait_for_update_bin_log(Session* session,
2246
 
                                           const struct timespec *timeout)
2247
 
{
2248
 
  int ret= 0;
2249
 
  const char* old_msg = session->get_proc_info();
2250
 
  old_msg= session->enter_cond(&update_cond, &LOCK_log,
2251
 
                           "Master has sent all binlog to slave; "
2252
 
                           "waiting for binlog to be updated");
2253
 
  if (!timeout)
2254
 
    pthread_cond_wait(&update_cond, &LOCK_log);
2255
 
  else
2256
 
    ret= pthread_cond_timedwait(&update_cond, &LOCK_log,
2257
 
                                const_cast<struct timespec *>(timeout));
2258
 
  return(ret);
2259
 
}
2260
 
 
2261
 
 
2262
 
/**
2263
 
  Close the log file.
2264
 
 
2265
 
  @param exiting     Bitmask for one or more of the following bits:
2266
 
          - LOG_CLOSE_INDEX : if we should close the index file
2267
 
          - LOG_CLOSE_TO_BE_OPENED : if we intend to call open
2268
 
                                     at once after close.
2269
 
          - LOG_CLOSE_STOP_EVENT : write a 'stop' event to the log
2270
 
 
2271
 
  @note
2272
 
    One can do an open on the object at once after doing a close.
2273
 
    The internal structures are not freed until cleanup() is called
2274
 
*/
2275
 
 
2276
 
void DRIZZLE_BIN_LOG::close(uint32_t exiting)
2277
 
{                                       // One can't set log_type here!
2278
 
  if (log_state == LOG_OPENED)
2279
 
  {
2280
 
    if (log_type == LOG_BIN && !no_auto_events &&
2281
 
        (exiting & LOG_CLOSE_STOP_EVENT))
2282
 
    {
2283
 
      Stop_log_event s;
2284
 
      s.write(&log_file);
2285
 
      bytes_written+= s.data_written;
2286
 
      signal_update();
2287
 
    }
2288
 
 
2289
 
    /* don't pwrite in a file opened with O_APPEND - it doesn't work */
2290
 
    if (log_file.type == WRITE_CACHE && log_type == LOG_BIN)
2291
 
    {
2292
 
      my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
2293
 
      unsigned char flags= 0;            // clearing LOG_EVENT_BINLOG_IN_USE_F
2294
 
      assert(pwrite(log_file.file, &flags, 1, offset)==1);
2295
 
    }
2296
 
 
2297
 
    /* this will cleanup IO_CACHE, sync and close the file */
2298
 
    DRIZZLE_LOG::close(exiting);
2299
 
  }
2300
 
 
2301
 
  /*
2302
 
    The following test is needed even if is_open() is not set, as we may have
2303
 
    called a not complete close earlier and the index file is still open.
2304
 
  */
2305
 
 
2306
 
  if ((exiting & LOG_CLOSE_INDEX) && my_b_inited(&index_file))
2307
 
  {
2308
 
    end_io_cache(&index_file);
2309
 
    if (my_close(index_file.file, MYF(0)) < 0 && ! write_error)
2310
 
    {
2311
 
      write_error= 1;
2312
 
      errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_ERROR_ON_WRITE), index_file_name, errno);
2313
 
    }
2314
 
  }
2315
 
  log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
2316
 
  if (name)
2317
 
  {
2318
 
    free(name);
2319
 
    name= NULL;
2320
 
  }
2321
 
  return;
2322
 
}
2323
 
 
2324
 
 
2325
 
void DRIZZLE_BIN_LOG::set_max_size(ulong max_size_arg)
2326
 
{
2327
 
  /*
2328
 
    We need to take locks, otherwise this may happen:
2329
 
    new_file() is called, calls open(old_max_size), then before open() starts,
2330
 
    set_max_size() sets max_size to max_size_arg, then open() starts and
2331
 
    uses the old_max_size argument, so max_size_arg has been overwritten and
2332
 
    it's like if the SET command was never run.
2333
 
  */
2334
 
  pthread_mutex_lock(&LOCK_log);
2335
 
  if (is_open())
2336
 
    max_size= max_size_arg;
2337
 
  pthread_mutex_unlock(&LOCK_log);
2338
 
  return;
2339
 
}
2340
 
 
2341
 
 
2342
 
/**
2343
 
  Check if a string is a valid number.
2344
 
 
2345
 
  @param str                    String to test
2346
 
  @param res                    Store value here
2347
 
  @param allow_wildcards        Set to 1 if we should ignore '%' and '_'
2348
 
 
2349
 
  @note
2350
 
    For the moment the allow_wildcards argument is not used
2351
 
    Should be move to some other file.
2352
 
 
2353
 
  @retval
2354
 
    1   String is a number
2355
 
  @retval
2356
 
    0   Error
2357
 
*/
2358
 
 
2359
 
static bool test_if_number(register const char *str,
2360
 
                           long *res, bool allow_wildcards)
2361
 
{
2362
 
  register int flag;
2363
 
  const char *start;
2364
 
 
2365
 
  flag= 0;
2366
 
  start= str;
2367
 
  while (*str++ == ' ') ;
2368
 
  if (*--str == '-' || *str == '+')
2369
 
    str++;
2370
 
  while (my_isdigit(files_charset_info,*str) ||
2371
 
         (allow_wildcards && (*str == wild_many || *str == wild_one)))
2372
 
  {
2373
 
    flag=1;
2374
 
    str++;
2375
 
  }
2376
 
  if (*str == '.')
2377
 
  {
2378
 
    for (str++ ;
2379
 
         my_isdigit(files_charset_info,*str) ||
2380
 
           (allow_wildcards && (*str == wild_many || *str == wild_one)) ;
2381
 
         str++, flag=1) ;
2382
 
  }
2383
 
  if (*str != 0 || flag == 0)
2384
 
    return(0);
2385
 
  if (res)
2386
 
    *res=atol(start);
2387
 
  return(1);                    /* Number ok */
2388
 
} /* test_if_number */
2389
 
 
2390
 
void DRIZZLE_BIN_LOG::signal_update()
2391
 
{
2392
 
  pthread_cond_broadcast(&update_cond);
2393
 
  return;
2394
 
}
2395
 
 
2396
 
/********* transaction coordinator log for 2pc - mmap() based solution *******/
2397
 
 
2398
 
/*
2399
 
  the log consists of a file, mmapped to a memory.
2400
 
  file is divided on pages of tc_log_page_size size.
2401
 
  (usable size of the first page is smaller because of log header)
2402
 
  there's PAGE control structure for each page
2403
 
  each page (or rather PAGE control structure) can be in one of three
2404
 
  states - active, syncing, pool.
2405
 
  there could be only one page in active or syncing states,
2406
 
  but many in pool - pool is fifo queue.
2407
 
  usual lifecycle of a page is pool->active->syncing->pool
2408
 
  "active" page - is a page where new xid's are logged.
2409
 
  the page stays active as long as syncing slot is taken.
2410
 
  "syncing" page is being synced to disk. no new xid can be added to it.
2411
 
  when the sync is done the page is moved to a pool and an active page
2412
 
  becomes "syncing".
2413
 
 
2414
 
  the result of such an architecture is a natural "commit grouping" -
2415
 
  If commits are coming faster than the system can sync, they do not
2416
 
  stall. Instead, all commit that came since the last sync are
2417
 
  logged to the same page, and they all are synced with the next -
2418
 
  one - sync. Thus, thought individual commits are delayed, throughput
2419
 
  is not decreasing.
2420
 
 
2421
 
  when a xid is added to an active page, the thread of this xid waits
2422
 
  for a page's condition until the page is synced. when syncing slot
2423
 
  becomes vacant one of these waiters is awaken to take care of syncing.
2424
 
  it syncs the page and signals all waiters that the page is synced.
2425
 
  PAGE::waiters is used to count these waiters, and a page may never
2426
 
  become active again until waiters==0 (that is all waiters from the
2427
 
  previous sync have noticed the sync was completed)
2428
 
 
2429
 
  note, that the page becomes "dirty" and has to be synced only when a
2430
 
  new xid is added into it. Removing a xid from a page does not make it
2431
 
  dirty - we don't sync removals to disk.
2432
 
*/
2433
 
 
2434
 
uint64_t tc_log_page_waits= 0;
2435
 
 
2436
 
#ifdef HAVE_MMAP
2437
 
 
2438
 
#define TC_LOG_HEADER_SIZE (sizeof(tc_log_magic)+1)
2439
 
 
2440
 
static const char tc_log_magic[]={(char) 254, 0x23, 0x05, 0x74};
2441
 
 
2442
 
uint64_t opt_tc_log_size= TC_LOG_MIN_SIZE;
2443
 
uint64_t tc_log_max_pages_used= 0;
2444
 
uint64_t tc_log_page_size= 0;
2445
 
uint64_t tc_log_cur_pages_used= 0;
2446
 
 
2447
 
int TC_LOG_MMAP::open(const char *opt_name)
2448
 
{
2449
 
  uint32_t i;
2450
 
  bool crashed= false;
2451
 
  PAGE *pg;
2452
 
 
2453
 
  assert(total_ha_2pc > 1);
2454
 
  assert(opt_name && opt_name[0]);
2455
 
 
2456
 
  tc_log_page_size= getpagesize();
2457
 
  assert(TC_LOG_PAGE_SIZE % tc_log_page_size == 0);
2458
 
 
2459
 
  fn_format(logname,opt_name,drizzle_data_home,"",MY_UNPACK_FILENAME);
2460
 
  if ((fd= my_open(logname, O_RDWR, MYF(0))) < 0)
2461
 
  {
2462
 
    if (my_errno != ENOENT)
2463
 
      goto err;
2464
 
    if (using_heuristic_recover())
2465
 
      return 1;
2466
 
    if ((fd= my_create(logname, CREATE_MODE, O_RDWR, MYF(MY_WME))) < 0)
2467
 
      goto err;
2468
 
    inited=1;
2469
 
    file_length= opt_tc_log_size;
2470
 
    if (ftruncate(fd, file_length))
2471
 
      goto err;
2472
 
  }
2473
 
  else
2474
 
  {
2475
 
    inited= 1;
2476
 
    crashed= true;
2477
 
    errmsg_printf(ERRMSG_LVL_INFO, _("Recovering after a crash using %s"), opt_name);
2478
 
    if (tc_heuristic_recover)
2479
 
    {
2480
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Cannot perform automatic crash recovery when "
2481
 
                      "--tc-heuristic-recover is used"));
2482
 
      goto err;
2483
 
    }
2484
 
    file_length= lseek(fd, 0, SEEK_END);
2485
 
    if (file_length == OFF_T_MAX || file_length % tc_log_page_size)
2486
 
      goto err;
2487
 
  }
2488
 
 
2489
 
  data= (unsigned char *)my_mmap(0, (size_t)file_length, PROT_READ|PROT_WRITE,
2490
 
                        MAP_NOSYNC|MAP_SHARED, fd, 0);
2491
 
  if (data == MAP_FAILED)
2492
 
  {
2493
 
    my_errno=errno;
2494
 
    goto err;
2495
 
  }
2496
 
  inited=2;
2497
 
 
2498
 
  npages=(uint)file_length/tc_log_page_size;
2499
 
  assert(npages >= 3);             // to guarantee non-empty pool
2500
 
  if (!(pages=(PAGE *)malloc(npages*sizeof(PAGE))))
2501
 
    goto err;
2502
 
  memset(pages, 0, npages*sizeof(PAGE));
2503
 
  inited=3;
2504
 
  for (pg=pages, i=0; i < npages; i++, pg++)
2505
 
  {
2506
 
    pg->next=pg+1;
2507
 
    pg->waiters=0;
2508
 
    pg->state=POOL;
2509
 
    pthread_mutex_init(&pg->lock, MY_MUTEX_INIT_FAST);
2510
 
    pthread_cond_init (&pg->cond, 0);
2511
 
    pg->start=(my_xid *)(data + i*tc_log_page_size);
2512
 
    pg->ptr=pg->start;
2513
 
    pg->end=(my_xid *)(pg->start + tc_log_page_size);
2514
 
    pg->size=pg->free=tc_log_page_size/sizeof(my_xid);
2515
 
  }
2516
 
  pages[0].size=pages[0].free=
2517
 
                (tc_log_page_size-TC_LOG_HEADER_SIZE)/sizeof(my_xid);
2518
 
  pages[0].start=pages[0].end-pages[0].size;
2519
 
  pages[npages-1].next=0;
2520
 
  inited=4;
2521
 
 
2522
 
  if (crashed && recover())
2523
 
      goto err;
2524
 
 
2525
 
  memcpy(data, tc_log_magic, sizeof(tc_log_magic));
2526
 
  data[sizeof(tc_log_magic)]= (unsigned char)total_ha_2pc;
2527
 
  // must cast data to (char *) for solaris. Arg1 is (void *) on linux
2528
 
  //   so the cast should be fine.
2529
 
  msync((char *)data, tc_log_page_size, MS_SYNC);
2530
 
  my_sync(fd, MYF(0));
2531
 
  inited=5;
2532
 
 
2533
 
  pthread_mutex_init(&LOCK_sync,    MY_MUTEX_INIT_FAST);
2534
 
  pthread_mutex_init(&LOCK_active,  MY_MUTEX_INIT_FAST);
2535
 
  pthread_mutex_init(&LOCK_pool,    MY_MUTEX_INIT_FAST);
2536
 
  pthread_cond_init(&COND_active, 0);
2537
 
  pthread_cond_init(&COND_pool, 0);
2538
 
 
2539
 
  inited=6;
2540
 
 
2541
 
  syncing= 0;
2542
 
  active=pages;
2543
 
  pool=pages+1;
2544
 
  pool_last=pages+npages-1;
2545
 
 
2546
 
  return 0;
2547
 
 
2548
 
err:
2549
 
  close();
2550
 
  return 1;
2551
 
}
2552
 
 
2553
 
/**
2554
 
  there is no active page, let's got one from the pool.
2555
 
 
2556
 
  Two strategies here:
2557
 
    -# take the first from the pool
2558
 
    -# if there're waiters - take the one with the most free space.
2559
 
 
2560
 
  @todo
2561
 
    TODO page merging. try to allocate adjacent page first,
2562
 
    so that they can be flushed both in one sync
2563
 
*/
2564
 
 
2565
 
void TC_LOG_MMAP::get_active_from_pool()
2566
 
{
2567
 
  PAGE **p, **best_p=0;
2568
 
  int best_free;
2569
 
 
2570
 
  if (syncing)
2571
 
    pthread_mutex_lock(&LOCK_pool);
2572
 
 
2573
 
  do
2574
 
  {
2575
 
    best_p= p= &pool;
2576
 
    if ((*p)->waiters == 0) // can the first page be used ?
2577
 
      break;                // yes - take it.
2578
 
 
2579
 
    best_free=0;            // no - trying second strategy
2580
 
    for (p=&(*p)->next; *p; p=&(*p)->next)
2581
 
    {
2582
 
      if ((*p)->waiters == 0 && (*p)->free > best_free)
2583
 
      {
2584
 
        best_free=(*p)->free;
2585
 
        best_p=p;
2586
 
      }
2587
 
    }
2588
 
  }
2589
 
  while ((*best_p == 0 || best_free == 0) && overflow());
2590
 
 
2591
 
  active=*best_p;
2592
 
  if (active->free == active->size) // we've chosen an empty page
2593
 
  {
2594
 
    tc_log_cur_pages_used++;
2595
 
    set_if_bigger(tc_log_max_pages_used, tc_log_cur_pages_used);
2596
 
  }
2597
 
 
2598
 
  if ((*best_p)->next)              // unlink the page from the pool
2599
 
    *best_p=(*best_p)->next;
2600
 
  else
2601
 
    pool_last=*best_p;
2602
 
 
2603
 
  if (syncing)
2604
 
    pthread_mutex_unlock(&LOCK_pool);
2605
 
}
2606
 
 
2607
 
/**
2608
 
  @todo
2609
 
  perhaps, increase log size ?
2610
 
*/
2611
 
int TC_LOG_MMAP::overflow()
2612
 
{
2613
 
  /*
2614
 
    simple overflow handling - just wait
2615
 
    TODO perhaps, increase log size ?
2616
 
    let's check the behaviour of tc_log_page_waits first
2617
 
  */
2618
 
  tc_log_page_waits++;
2619
 
  pthread_cond_wait(&COND_pool, &LOCK_pool);
2620
 
  return 1; // always return 1
2621
 
}
2622
 
 
2623
 
/**
2624
 
  Record that transaction XID is committed on the persistent storage.
2625
 
 
2626
 
    This function is called in the middle of two-phase commit:
2627
 
    First all resources prepare the transaction, then tc_log->log() is called,
2628
 
    then all resources commit the transaction, then tc_log->unlog() is called.
2629
 
 
2630
 
    All access to active page is serialized but it's not a problem, as
2631
 
    we're assuming that fsync() will be a main bottleneck.
2632
 
    That is, parallelizing writes to log pages we'll decrease number of
2633
 
    threads waiting for a page, but then all these threads will be waiting
2634
 
    for a fsync() anyway
2635
 
 
2636
 
   If tc_log == DRIZZLE_LOG then tc_log writes transaction to binlog and
2637
 
   records XID in a special Xid_log_event.
2638
 
   If tc_log = TC_LOG_MMAP then xid is written in a special memory-mapped
2639
 
   log.
2640
 
 
2641
 
  @retval
2642
 
    0  - error
2643
 
  @retval
2644
 
    \# - otherwise, "cookie", a number that will be passed as an argument
2645
 
    to unlog() call. tc_log can define it any way it wants,
2646
 
    and use for whatever purposes. TC_LOG_MMAP sets it
2647
 
    to the position in memory where xid was logged to.
2648
 
*/
2649
 
 
2650
 
int TC_LOG_MMAP::log_xid(Session *, my_xid xid)
2651
 
{
2652
 
  int err;
2653
 
  PAGE *p;
2654
 
  ulong cookie;
2655
 
 
2656
 
  pthread_mutex_lock(&LOCK_active);
2657
 
 
2658
 
  /*
2659
 
    if active page is full - just wait...
2660
 
    frankly speaking, active->free here accessed outside of mutex
2661
 
    protection, but it's safe, because it only means we may miss an
2662
 
    unlog() for the active page, and we're not waiting for it here -
2663
 
    unlog() does not signal COND_active.
2664
 
  */
2665
 
  while (unlikely(active && active->free == 0))
2666
 
    pthread_cond_wait(&COND_active, &LOCK_active);
2667
 
 
2668
 
  /* no active page ? take one from the pool */
2669
 
  if (active == 0)
2670
 
    get_active_from_pool();
2671
 
 
2672
 
  p=active;
2673
 
  pthread_mutex_lock(&p->lock);
2674
 
 
2675
 
  /* searching for an empty slot */
2676
 
  while (*p->ptr)
2677
 
  {
2678
 
    p->ptr++;
2679
 
    assert(p->ptr < p->end);               // because p->free > 0
2680
 
  }
2681
 
 
2682
 
  /* found! store xid there and mark the page dirty */
2683
 
  cookie= (ulong)((unsigned char *)p->ptr - data);      // can never be zero
2684
 
  *p->ptr++= xid;
2685
 
  p->free--;
2686
 
  p->state= DIRTY;
2687
 
 
2688
 
  /* to sync or not to sync - this is the question */
2689
 
  pthread_mutex_unlock(&LOCK_active);
2690
 
  pthread_mutex_lock(&LOCK_sync);
2691
 
  pthread_mutex_unlock(&p->lock);
2692
 
 
2693
 
  if (syncing)
2694
 
  {                                          // somebody's syncing. let's wait
2695
 
    p->waiters++;
2696
 
    /*
2697
 
      note - it must be while (), not do ... while () here
2698
 
      as p->state may be not DIRTY when we come here
2699
 
    */
2700
 
    while (p->state == DIRTY && syncing)
2701
 
      pthread_cond_wait(&p->cond, &LOCK_sync);
2702
 
    p->waiters--;
2703
 
    err= p->state == ERROR;
2704
 
    if (p->state != DIRTY)                   // page was synced
2705
 
    {
2706
 
      if (p->waiters == 0)
2707
 
        pthread_cond_signal(&COND_pool);     // in case somebody's waiting
2708
 
      pthread_mutex_unlock(&LOCK_sync);
2709
 
      goto done;                             // we're done
2710
 
    }
2711
 
  }                                          // page was not synced! do it now
2712
 
  assert(active == p && syncing == 0);
2713
 
  pthread_mutex_lock(&LOCK_active);
2714
 
  syncing=p;                                 // place is vacant - take it
2715
 
  active=0;                                  // page is not active anymore
2716
 
  pthread_cond_broadcast(&COND_active);      // in case somebody's waiting
2717
 
  pthread_mutex_unlock(&LOCK_active);
2718
 
  pthread_mutex_unlock(&LOCK_sync);
2719
 
  err= sync();
2720
 
 
2721
 
done:
2722
 
  return err ? 0 : cookie;
2723
 
}
2724
 
 
2725
 
int TC_LOG_MMAP::sync()
2726
 
{
2727
 
  int err;
2728
 
 
2729
 
  assert(syncing != active);
2730
 
 
2731
 
  /*
2732
 
    sit down and relax - this can take a while...
2733
 
    note - no locks are held at this point
2734
 
  */
2735
 
  // must cast data to (char *) for solaris. Arg1 is (void *) on linux
2736
 
  //   so the cast should be fine.
2737
 
  err= msync((char *)syncing->start, 1, MS_SYNC);
2738
 
  if(err==0)
2739
 
    err= my_sync(fd, MYF(0));
2740
 
 
2741
 
  /* page is synced. let's move it to the pool */
2742
 
  pthread_mutex_lock(&LOCK_pool);
2743
 
  pool_last->next=syncing;
2744
 
  pool_last=syncing;
2745
 
  syncing->next=0;
2746
 
  syncing->state= err ? ERROR : POOL;
2747
 
  pthread_cond_broadcast(&syncing->cond);    // signal "sync done"
2748
 
  pthread_cond_signal(&COND_pool);           // in case somebody's waiting
2749
 
  pthread_mutex_unlock(&LOCK_pool);
2750
 
 
2751
 
  /* marking 'syncing' slot free */
2752
 
  pthread_mutex_lock(&LOCK_sync);
2753
 
  syncing=0;
2754
 
  pthread_cond_signal(&active->cond);        // wake up a new syncer
2755
 
  pthread_mutex_unlock(&LOCK_sync);
2756
 
  return err;
2757
 
}
2758
 
 
2759
 
/**
2760
 
  erase xid from the page, update page free space counters/pointers.
2761
 
  cookie points directly to the memory where xid was logged.
2762
 
*/
2763
 
 
2764
 
void TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
2765
 
{
2766
 
  PAGE *p=pages+(cookie/tc_log_page_size);
2767
 
  my_xid *x=(my_xid *)(data+cookie);
2768
 
 
2769
 
  assert(*x == xid);
2770
 
  assert(x >= p->start && x < p->end);
2771
 
  *x=0;
2772
 
 
2773
 
  pthread_mutex_lock(&p->lock);
2774
 
  p->free++;
2775
 
  assert(p->free <= p->size);
2776
 
  set_if_smaller(p->ptr, x);
2777
 
  if (p->free == p->size)               // the page is completely empty
2778
 
    statistic_decrement(tc_log_cur_pages_used, &LOCK_status);
2779
 
  if (p->waiters == 0)                 // the page is in pool and ready to rock
2780
 
    pthread_cond_signal(&COND_pool);   // ping ... for overflow()
2781
 
  pthread_mutex_unlock(&p->lock);
2782
 
}
2783
 
 
2784
 
void TC_LOG_MMAP::close()
2785
 
{
2786
 
  uint32_t i;
2787
 
  switch (inited) {
2788
 
  case 6:
2789
 
    pthread_mutex_destroy(&LOCK_sync);
2790
 
    pthread_mutex_destroy(&LOCK_active);
2791
 
    pthread_mutex_destroy(&LOCK_pool);
2792
 
    pthread_cond_destroy(&COND_pool);
2793
 
  case 5:
2794
 
    data[0]='A'; // garble the first (signature) byte, in case my_delete fails
2795
 
  case 4:
2796
 
    for (i=0; i < npages; i++)
2797
 
    {
2798
 
      if (pages[i].ptr == 0)
2799
 
        break;
2800
 
      pthread_mutex_destroy(&pages[i].lock);
2801
 
      pthread_cond_destroy(&pages[i].cond);
2802
 
    }
2803
 
  case 3:
2804
 
    free((unsigned char*)pages);
2805
 
  case 2:
2806
 
    my_munmap((char*)data, (size_t)file_length);
2807
 
  case 1:
2808
 
    my_close(fd, MYF(0));
2809
 
  }
2810
 
  if (inited>=5) // cannot do in the switch because of Windows
2811
 
    my_delete(logname, MYF(MY_WME));
2812
 
  inited=0;
2813
 
}
2814
 
 
2815
 
int TC_LOG_MMAP::recover()
2816
 
{
2817
 
  HASH xids;
2818
 
  PAGE *p=pages, *end_p=pages+npages;
2819
 
 
2820
 
  if (memcmp(data, tc_log_magic, sizeof(tc_log_magic)))
2821
 
  {
2822
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Bad magic header in tc log"));
2823
 
    goto err1;
2824
 
  }
2825
 
 
2826
 
  /*
2827
 
    the first byte after magic signature is set to current
2828
 
    number of storage engines on startup
2829
 
  */
2830
 
  if (data[sizeof(tc_log_magic)] != total_ha_2pc)
2831
 
  {
2832
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Recovery failed! You must enable "
2833
 
                    "exactly %d storage engines that support "
2834
 
                    "two-phase commit protocol"),
2835
 
                    data[sizeof(tc_log_magic)]);
2836
 
    goto err1;
2837
 
  }
2838
 
 
2839
 
  if (hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0,
2840
 
                sizeof(my_xid), 0, 0, MYF(0)))
2841
 
    goto err1;
2842
 
 
2843
 
  for ( ; p < end_p ; p++)
2844
 
  {
2845
 
    for (my_xid *x=p->start; x < p->end; x++)
2846
 
      if (*x && my_hash_insert(&xids, (unsigned char *)x))
2847
 
        goto err2; // OOM
2848
 
  }
2849
 
 
2850
 
  if (ha_recover(&xids))
2851
 
    goto err2;
2852
 
 
2853
 
  hash_free(&xids);
2854
 
  memset(data, 0, (size_t)file_length);
2855
 
  return 0;
2856
 
 
2857
 
err2:
2858
 
  hash_free(&xids);
2859
 
err1:
2860
 
  errmsg_printf(ERRMSG_LVL_ERROR, _("Crash recovery failed. Either correct the problem "
2861
 
                  "(if it's, for example, out of memory error) and restart, "
2862
 
                  "or delete tc log and start drizzled with "
2863
 
                  "--tc-heuristic-recover={commit|rollback}"));
2864
 
  return 1;
2865
 
}
2866
 
#endif
2867
 
 
2868
 
TC_LOG *tc_log;
2869
 
TC_LOG_DUMMY tc_log_dummy;
2870
 
TC_LOG_MMAP  tc_log_mmap;
2871
 
 
2872
 
/**
2873
 
  Perform heuristic recovery, if --tc-heuristic-recover was used.
2874
 
 
2875
 
  @note
2876
 
    no matter whether heuristic recovery was successful or not
2877
 
    mysqld must exit. So, return value is the same in both cases.
2878
 
 
2879
 
  @retval
2880
 
    0   no heuristic recovery was requested
2881
 
  @retval
2882
 
    1   heuristic recovery was performed
2883
 
*/
2884
 
 
2885
 
int TC_LOG::using_heuristic_recover()
2886
 
{
2887
 
  if (!tc_heuristic_recover)
2888
 
    return 0;
2889
 
 
2890
 
  errmsg_printf(ERRMSG_LVL_INFO, _("Heuristic crash recovery mode"));
2891
 
  if (ha_recover(0))
2892
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Heuristic crash recovery failed"));
2893
 
  errmsg_printf(ERRMSG_LVL_INFO, _("Please restart mysqld without --tc-heuristic-recover"));
2894
 
  return 1;
2895
 
}
2896
 
 
2897
 
/****** transaction coordinator log for 2pc - binlog() based solution ******/
2898
 
#define TC_LOG_BINLOG DRIZZLE_BIN_LOG
2899
 
 
2900
 
/**
2901
 
  @todo
2902
 
  keep in-memory list of prepared transactions
2903
 
  (add to list in log(), remove on unlog())
2904
 
  and copy it to the new binlog if rotated
2905
 
  but let's check the behaviour of tc_log_page_waits first!
2906
 
*/
2907
 
 
2908
 
int TC_LOG_BINLOG::open(const char *opt_name)
2909
 
{
2910
 
  LOG_INFO log_info;
2911
 
  int      error= 1;
2912
 
 
2913
 
  assert(total_ha_2pc > 1);
2914
 
  assert(opt_name && opt_name[0]);
2915
 
 
2916
 
  pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST);
2917
 
  pthread_cond_init (&COND_prep_xids, 0);
2918
 
 
2919
 
  if (!my_b_inited(&index_file))
2920
 
  {
2921
 
    /* There was a failure to open the index file, can't open the binlog */
2922
 
    cleanup();
2923
 
    return 1;
2924
 
  }
2925
 
 
2926
 
  if (using_heuristic_recover())
2927
 
  {
2928
 
    /* generate a new binlog to mask a corrupted one */
2929
 
    open(opt_name, LOG_BIN, 0, WRITE_CACHE, 0, max_binlog_size, 0);
2930
 
    cleanup();
2931
 
    return 1;
2932
 
  }
2933
 
 
2934
 
  if ((error= find_log_pos(&log_info, NULL, 1)))
2935
 
  {
2936
 
    if (error != LOG_INFO_EOF)
2937
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("find_log_pos() failed (error: %d)"), error);
2938
 
    else
2939
 
      error= 0;
2940
 
    goto err;
2941
 
  }
2942
 
 
2943
 
  {
2944
 
    const char *errmsg;
2945
 
    IO_CACHE    log;
2946
 
    File        file;
2947
 
    Log_event  *ev=0;
2948
 
    Format_description_log_event fdle(BINLOG_VERSION);
2949
 
    char        log_name[FN_REFLEN];
2950
 
 
2951
 
    if (! fdle.is_valid())
2952
 
      goto err;
2953
 
 
2954
 
    do
2955
 
    {
2956
 
      strncpy(log_name, log_info.log_file_name, sizeof(log_name)-1);
2957
 
    } while (!(error= find_next_log(&log_info, 1)));
2958
 
 
2959
 
    if (error !=  LOG_INFO_EOF)
2960
 
    {
2961
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("find_log_pos() failed (error: %d)"), error);
2962
 
      goto err;
2963
 
    }
2964
 
 
2965
 
    if ((file= open_binlog(&log, log_name, &errmsg)) < 0)
2966
 
    {
2967
 
      errmsg_printf(ERRMSG_LVL_ERROR, "%s", errmsg);
2968
 
      goto err;
2969
 
    }
2970
 
 
2971
 
    if ((ev= Log_event::read_log_event(&log, 0, &fdle)) &&
2972
 
        ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
2973
 
        ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
2974
 
    {
2975
 
      errmsg_printf(ERRMSG_LVL_INFO, _("Recovering after a crash using %s"), opt_name);
2976
 
      error= recover(&log, (Format_description_log_event *)ev);
2977
 
    }
2978
 
    else
2979
 
      error=0;
2980
 
 
2981
 
    delete ev;
2982
 
    end_io_cache(&log);
2983
 
    my_close(file, MYF(MY_WME));
2984
 
 
2985
 
    if (error)
2986
 
      goto err;
2987
 
  }
2988
 
 
2989
 
err:
2990
 
  return error;
2991
 
}
2992
 
 
2993
 
/** This is called on shutdown, after ha_panic. */
2994
 
void TC_LOG_BINLOG::close()
2995
 
{
2996
 
  assert(prepared_xids==0);
2997
 
  pthread_mutex_destroy(&LOCK_prep_xids);
2998
 
  pthread_cond_destroy (&COND_prep_xids);
2999
 
}
3000
 
 
3001
 
/**
3002
 
  @todo
3003
 
  group commit
3004
 
 
3005
 
  @retval
3006
 
    0    error
3007
 
  @retval
3008
 
    1    success
3009
 
*/
3010
 
int TC_LOG_BINLOG::log_xid(Session *session, my_xid xid)
3011
 
{
3012
 
  Xid_log_event xle(session, xid);
3013
 
  /* TODO: Fix return type */
3014
 
  /*
3015
 
    We always commit the entire transaction when writing an XID. Also
3016
 
    note that the return value is inverted.
3017
 
 
3018
 
    TODO: fix backasswards logic on this method
3019
 
   */
3020
 
 
3021
 
  return replicator_end_transaction(session, true, true) ? false : true;
3022
 
}
3023
 
 
3024
 
void TC_LOG_BINLOG::unlog(ulong, my_xid)
3025
 
{
3026
 
  pthread_mutex_lock(&LOCK_prep_xids);
3027
 
  assert(prepared_xids > 0);
3028
 
  if (--prepared_xids == 0) {
3029
 
    pthread_cond_signal(&COND_prep_xids);
3030
 
  }
3031
 
  pthread_mutex_unlock(&LOCK_prep_xids);
3032
 
  rotate_and_purge(0);     // as ::write() did not rotate
3033
 
}
3034
 
 
3035
 
int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
3036
 
{
3037
 
  Log_event  *ev;
3038
 
  HASH xids;
3039
 
  MEM_ROOT mem_root;
3040
 
 
3041
 
  if (! fdle->is_valid() ||
3042
 
      hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
3043
 
                sizeof(my_xid), 0, 0, MYF(0)))
3044
 
    goto err1;
3045
 
 
3046
 
  init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
3047
 
 
3048
 
  fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error
3049
 
 
3050
 
  while ((ev= Log_event::read_log_event(log,0,fdle)) && ev->is_valid())
3051
 
  {
3052
 
    if (ev->get_type_code() == XID_EVENT)
3053
 
    {
3054
 
      Xid_log_event *xev=(Xid_log_event *)ev;
3055
 
      unsigned char *x= (unsigned char *) memdup_root(&mem_root, (unsigned char*) &xev->xid,
3056
 
                                      sizeof(xev->xid));
3057
 
      if (! x)
3058
 
        goto err2;
3059
 
      my_hash_insert(&xids, x);
3060
 
    }
3061
 
    delete ev;
3062
 
  }
3063
 
 
3064
 
  if (ha_recover(&xids))
3065
 
    goto err2;
3066
 
 
3067
 
  free_root(&mem_root, MYF(0));
3068
 
  hash_free(&xids);
3069
 
  return 0;
3070
 
 
3071
 
err2:
3072
 
  free_root(&mem_root, MYF(0));
3073
 
  hash_free(&xids);
3074
 
err1:
3075
 
  errmsg_printf(ERRMSG_LVL_ERROR, 
3076
 
                _("Crash recovery failed. Either correct the problem "
3077
 
                  "(if it's, for example, out of memory error) and restart, "
3078
 
                  "or delete (or rename) binary log and start mysqld with "
3079
 
                  "--tc-heuristic-recover={commit|rollback}"));
3080
 
  return 1;
3081
 
}
3082
 
 
3083
 
 
3084
 
bool DRIZZLE_BIN_LOG::is_table_mapped(Table *table) const
3085
 
{
3086
 
  return table->s->table_map_version == table_map_version();
3087
 
}
3088
 
 
3089
 
/**
3090
 
  Get the file name of the MySQL binlog.
3091
 
  @return the name of the binlog file
3092
 
*/
3093
 
extern "C"
3094
 
const char* drizzle_bin_log_file_name(void)
3095
 
{
3096
 
  return drizzle_bin_log.get_log_fname();
3097
 
}
3098
 
 
3099
 
 
3100
 
/**
3101
 
  Get the current position of the MySQL binlog.
3102
 
  @return byte offset from the beginning of the binlog
3103
 
*/
3104
 
extern "C"
3105
 
uint64_t drizzle_bin_log_file_pos(void)
3106
 
{
3107
 
  return (uint64_t) drizzle_bin_log.get_log_file()->pos_in_file;
3108
 
}
3109
 
 
3110
 
 
3111
275
mysql_declare_plugin(binlog)
3112
276
{
3113
277
  DRIZZLE_STORAGE_ENGINE_PLUGIN,