~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Monty Taylor
  • Date: 2008-08-02 01:03:15 UTC
  • mto: (236.1.42 codestyle)
  • mto: This revision was merged to the branch mainline in revision 261.
  • Revision ID: monty@inaugust.com-20080802010315-65h5938pymg9d99z
Moved m4 macros to top-level m4 dir, per GNU standards (and where gettext wanted it :)

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#include <drizzled/server_includes.h>
 
16
#include "mysql_priv.h"
 
17
#ifdef HAVE_REPLICATION
17
18
 
18
 
#include <drizzled/replication/mi.h>
19
 
#include <drizzled/replication/replication.h>
20
 
#include <drizzled/log_event.h>
21
 
#include <libdrizzle/libdrizzle.h>
22
 
#include <mysys/hash.h>
23
 
#include <drizzled/error.h>
24
 
#include <drizzled/gettext.h>
25
 
#include <drizzled/data_home.h>
26
 
#include <drizzled/unireg.h>
27
 
#include <drizzled/item/return_int.h>
28
 
#include <drizzled/item/empty_string.h>
 
19
#include "rpl_mi.h"
 
20
#include "sql_repl.h"
 
21
#include "log_event.h"
 
22
#include "rpl_filter.h"
 
23
#include <drizzled/drizzled_error_messages.h>
29
24
 
30
25
int max_binlog_dump_events = 0; // unlimited
31
26
 
32
27
/*
33
 
  fake_rotate_event() builds a fake (=which does not exist physically in any
34
 
  binlog) Rotate event, which contains the name of the binlog we are going to
35
 
  send to the slave (because the slave may not know it if it just asked for
36
 
  MASTER_LOG_FILE='', MASTER_LOG_POS=4).
37
 
  < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
38
 
  After this version we always call it, so that a 3.23.58 slave can rely on
39
 
  it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
40
 
  zeros in the good positions which, by chance, make it possible for the 3.23
41
 
  slave to detect that this event is unexpected) (this is luck which happens
42
 
  because the master and slave disagree on the size of the header of
43
 
  Log_event).
 
28
    fake_rotate_event() builds a fake (=which does not exist physically in any
 
29
    binlog) Rotate event, which contains the name of the binlog we are going to
 
30
    send to the slave (because the slave may not know it if it just asked for
 
31
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
 
32
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
 
33
    After this version we always call it, so that a 3.23.58 slave can rely on
 
34
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
 
35
    zeros in the good positions which, by chance, make it possible for the 3.23
 
36
    slave to detect that this event is unexpected) (this is luck which happens
 
37
    because the master and slave disagree on the size of the header of
 
38
    Log_event).
44
39
 
45
 
  Relying on the event length of the Rotate event instead of these
46
 
  well-placed zeros was not possible as Rotate events have a variable-length
47
 
  part.
 
40
    Relying on the event length of the Rotate event instead of these
 
41
    well-placed zeros was not possible as Rotate events have a variable-length
 
42
    part.
48
43
*/
49
44
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
50
45
                             uint64_t position, const char** errmsg)
58
53
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
59
54
 
60
55
  char* p = log_file_name+dirname_length(log_file_name);
61
 
  uint32_t ident_len = (uint32_t) strlen(p);
62
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
56
  uint ident_len = (uint) strlen(p);
 
57
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
63
58
  int4store(header + SERVER_ID_OFFSET, server_id);
64
59
  int4store(header + EVENT_LEN_OFFSET, event_len);
65
60
  int2store(header + FLAGS_OFFSET, 0);
71
66
  int8store(buf+R_POS_OFFSET,position);
72
67
  packet->append(buf, ROTATE_HEADER_LEN);
73
68
  packet->append(p,ident_len);
74
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
69
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
75
70
  {
76
71
    *errmsg = "failed on my_net_write()";
77
72
    return(-1);
79
74
  return(0);
80
75
}
81
76
 
82
 
static int send_file(Session *session)
 
77
static int send_file(THD *thd)
83
78
{
84
 
  NET* net = &session->net;
 
79
  NET* net = &thd->net;
85
80
  int fd = -1, error = 1;
86
81
  size_t bytes;
87
82
  char fname[FN_REFLEN+1];
88
83
  const char *errmsg = 0;
89
84
  int old_timeout;
90
85
  unsigned long packet_len;
91
 
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
 
86
  uchar buf[IO_SIZE];                           // It's safe to alloc this
92
87
 
93
88
  /*
94
89
    The client might be slow loading the data, give him wait_timeout to do
95
90
    the job
96
91
  */
97
92
  old_timeout= net->read_timeout;
98
 
  my_net_set_read_timeout(net, session->variables.net_wait_timeout);
 
93
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
99
94
 
100
95
  /*
101
96
    We need net_flush here because the client will not know it needs to send
103
98
  */
104
99
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
105
100
  {
106
 
    errmsg = _("Failed in send_file() while reading file name");
 
101
    errmsg = "while reading file name";
107
102
    goto err;
108
103
  }
109
104
 
116
111
 
117
112
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
118
113
  {
119
 
    errmsg = _("Failed in send_file() on open of file");
 
114
    errmsg = "on open of file";
120
115
    goto err;
121
116
  }
122
117
 
124
119
  {
125
120
    if (my_net_write(net, buf, bytes))
126
121
    {
127
 
      errmsg = _("Failed in send_file() while writing data to client");
 
122
      errmsg = "while writing data to client";
128
123
      goto err;
129
124
    }
130
125
  }
131
126
 
132
127
 end:
133
 
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
 
128
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
134
129
      (my_net_read(net) == packet_error))
135
130
  {
136
 
    errmsg = _("Failed in send_file() while negotiating file transfer close");
 
131
    errmsg = "while negotiating file transfer close";
137
132
    goto err;
138
133
  }
139
134
  error = 0;
144
139
    (void) my_close(fd, MYF(0));
145
140
  if (errmsg)
146
141
  {
147
 
    errmsg_printf(ERRMSG_LVL_ERROR, "%s",errmsg);
 
142
    sql_print_error("Failed in send_file() %s", errmsg);
148
143
  }
149
144
  return(error);
150
145
}
173
168
 
174
169
void adjust_linfo_offsets(my_off_t purge_offset)
175
170
{
176
 
  Session *tmp;
 
171
  THD *tmp;
177
172
 
178
173
  pthread_mutex_lock(&LOCK_thread_count);
179
 
  I_List_iterator<Session> it(threads);
 
174
  I_List_iterator<THD> it(threads);
180
175
 
181
176
  while ((tmp=it++))
182
177
  {
203
198
bool log_in_use(const char* log_name)
204
199
{
205
200
  int log_name_len = strlen(log_name) + 1;
206
 
  Session *tmp;
 
201
  THD *tmp;
207
202
  bool result = 0;
208
203
 
209
204
  pthread_mutex_lock(&LOCK_thread_count);
210
 
  I_List_iterator<Session> it(threads);
 
205
  I_List_iterator<THD> it(threads);
211
206
 
212
207
  while ((tmp=it++))
213
208
  {
215
210
    if ((linfo = tmp->current_linfo))
216
211
    {
217
212
      pthread_mutex_lock(&linfo->lock);
218
 
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
 
213
      result = !memcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
 
214
                     log_name_len);
219
215
      pthread_mutex_unlock(&linfo->lock);
220
216
      if (result)
221
217
        break;
226
222
  return result;
227
223
}
228
224
 
229
 
bool purge_error_message(Session* session, int res)
 
225
bool purge_error_message(THD* thd, int res)
230
226
{
231
 
  uint32_t errmsg= 0;
 
227
  uint errmsg= 0;
232
228
 
233
229
  switch (res)  {
234
230
  case 0: break;
248
244
    my_message(errmsg, ER(errmsg), MYF(0));
249
245
    return true;
250
246
  }
251
 
  my_ok(session);
 
247
  my_ok(thd);
252
248
  return false;
253
249
}
254
250
 
255
251
 
256
 
bool purge_master_logs(Session* session, const char* to_log)
 
252
bool purge_master_logs(THD* thd, const char* to_log)
257
253
{
258
254
  char search_file_name[FN_REFLEN];
259
 
  if (!drizzle_bin_log.is_open())
 
255
  if (!mysql_bin_log.is_open())
260
256
  {
261
 
    my_ok(session);
 
257
    my_ok(thd);
262
258
    return false;
263
259
  }
264
260
 
265
 
  drizzle_bin_log.make_log_name(search_file_name, to_log);
266
 
  return purge_error_message(session,
267
 
                             drizzle_bin_log.purge_logs(search_file_name, 0, 1,
 
261
  mysql_bin_log.make_log_name(search_file_name, to_log);
 
262
  return purge_error_message(thd,
 
263
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
268
264
                                                      1, NULL));
269
265
}
270
266
 
271
267
 
272
 
bool purge_master_logs_before_date(Session* session, time_t purge_time)
 
268
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
273
269
{
274
 
  if (!drizzle_bin_log.is_open())
 
270
  if (!mysql_bin_log.is_open())
275
271
  {
276
 
    my_ok(session);
 
272
    my_ok(thd);
277
273
    return 0;
278
274
  }
279
 
  return purge_error_message(session,
280
 
                             drizzle_bin_log.purge_logs_before_date(purge_time));
 
275
  return purge_error_message(thd,
 
276
                             mysql_bin_log.purge_logs_before_date(purge_time));
281
277
}
282
278
 
283
279
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
314
310
  An auxiliary function for calling in mysql_binlog_send
315
311
  to initialize the heartbeat timeout in waiting for a binlogged event.
316
312
 
317
 
  @param[in]    session  Session to access a user variable
 
313
  @param[in]    thd  THD to access a user variable
318
314
 
319
315
  @return        heartbeat period an uint64_t of nanoseconds
320
316
                 or zero if heartbeat was not demanded by slave
321
 
*/
322
 
static uint64_t get_heartbeat_period(Session * session)
 
317
*/ 
 
318
static uint64_t get_heartbeat_period(THD * thd)
323
319
{
324
 
  bool null_value;
 
320
  my_bool null_value;
325
321
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
326
 
  user_var_entry *entry=
327
 
    (user_var_entry*) hash_search(&session->user_vars, (unsigned char*) name.str,
 
322
  user_var_entry *entry= 
 
323
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
328
324
                                  name.length);
329
325
  return entry? entry->val_int(&null_value) : 0;
330
326
}
332
328
/*
333
329
  Function prepares and sends repliation heartbeat event.
334
330
 
335
 
  @param net                net object of Session
 
331
  @param net                net object of THD
336
332
  @param packet             buffer to store the heartbeat instance
337
333
  @param event_coordinates  binlog file name and position of the last
338
334
                            real event master sent from binlog
339
335
 
340
 
  @note
 
336
  @note 
341
337
    Among three essential pieces of heartbeat data Log_event::when
342
338
    is computed locally.
343
339
    The  error to send is serious and should force terminating
357
353
 
358
354
  char* p= coord->file_name + dirname_length(coord->file_name);
359
355
 
360
 
  uint32_t ident_len = strlen(p);
361
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
356
  uint ident_len = strlen(p);
 
357
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
362
358
  int4store(header + SERVER_ID_OFFSET, server_id);
363
359
  int4store(header + EVENT_LEN_OFFSET, event_len);
364
360
  int2store(header + FLAGS_OFFSET, 0);
368
364
  packet->append(header, sizeof(header));
369
365
  packet->append(p, ident_len);             // log_file_name
370
366
 
371
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
 
367
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
372
368
      net_flush(net))
373
369
  {
374
370
    return(-1);
381
377
  TODO: Clean up loop to only have one call to send_file()
382
378
*/
383
379
 
384
 
void mysql_binlog_send(Session* session, char* log_ident, my_off_t pos,
385
 
                       uint16_t flags)
 
380
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
 
381
                       ushort flags)
386
382
{
387
383
  LOG_INFO linfo;
388
384
  char *log_file_name = linfo.log_file_name;
389
385
  char search_file_name[FN_REFLEN], *name;
390
386
  IO_CACHE log;
391
387
  File file = -1;
392
 
  String* packet = &session->packet;
 
388
  String* packet = &thd->packet;
393
389
  int error;
394
390
  const char *errmsg = "Unknown error";
395
 
  NET* net = &session->net;
 
391
  NET* net = &thd->net;
396
392
  pthread_mutex_t *log_lock;
397
393
  bool binlog_can_be_corrupted= false;
398
394
 
399
 
  memset(&log, 0, sizeof(log));
400
 
  /*
 
395
  memset((char*) &log, 0, sizeof(log));
 
396
  /* 
401
397
     heartbeat_period from @master_heartbeat_period user variable
402
398
  */
403
 
  uint64_t heartbeat_period= get_heartbeat_period(session);
 
399
  uint64_t heartbeat_period= get_heartbeat_period(thd);
404
400
  struct timespec heartbeat_buf;
405
401
  struct event_coordinates coord_buf;
406
402
  struct timespec *heartbeat_ts= NULL;
407
403
  struct event_coordinates *coord= NULL;
408
 
  if (heartbeat_period != 0L)
 
404
  if (heartbeat_period != 0LL)
409
405
  {
410
406
    heartbeat_ts= &heartbeat_buf;
411
407
    set_timespec_nsec(*heartbeat_ts, 0);
414
410
    coord->pos= pos;
415
411
  }
416
412
 
417
 
  if (!drizzle_bin_log.is_open())
 
413
  if (!mysql_bin_log.is_open())
418
414
  {
419
415
    errmsg = "Binary log is not open";
420
416
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
429
425
 
430
426
  name=search_file_name;
431
427
  if (log_ident[0])
432
 
    drizzle_bin_log.make_log_name(search_file_name, log_ident);
 
428
    mysql_bin_log.make_log_name(search_file_name, log_ident);
433
429
  else
434
430
    name=0;                                     // Find first log
435
431
 
436
432
  linfo.index_file_offset = 0;
437
433
 
438
 
  if (drizzle_bin_log.find_log_pos(&linfo, name, 1))
 
434
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
439
435
  {
440
436
    errmsg = "Could not find first log file name in binary log index file";
441
437
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
443
439
  }
444
440
 
445
441
  pthread_mutex_lock(&LOCK_thread_count);
446
 
  session->current_linfo = &linfo;
 
442
  thd->current_linfo = &linfo;
447
443
  pthread_mutex_unlock(&LOCK_thread_count);
448
444
 
449
445
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
506
502
  packet->set("\0", 1, &my_charset_bin);
507
503
  /*
508
504
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
509
 
    this larger than the corresponding packet (query) sent
 
505
    this larger than the corresponding packet (query) sent 
510
506
    from client to master.
511
507
  */
512
 
  session->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
508
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
513
509
 
514
510
  /*
515
511
    We can set log_lock now, it does not move (it's a member of
516
 
    drizzle_bin_log, and it's already inited, and it will be destroyed
 
512
    mysql_bin_log, and it's already inited, and it will be destroyed
517
513
    only at shutdown).
518
514
  */
519
 
  log_lock = drizzle_bin_log.get_log_lock();
 
515
  log_lock = mysql_bin_log.get_log_lock();
520
516
  if (pos > BIN_LOG_HEADER_SIZE)
521
517
  {
522
518
     /*
545
541
           to avoid destroying temp tables.
546
542
          */
547
543
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
548
 
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
 
544
                   ST_CREATED_OFFSET+1, (ulong) 0);
549
545
         /* send it */
550
 
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
546
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
551
547
         {
552
548
           errmsg = "Failed on my_net_write()";
553
549
           my_errno= ER_UNKNOWN_ERROR;
582
578
  /* seek to the requested position, to start the requested dump */
583
579
  my_b_seek(&log, pos);                 // Seek will done on next read
584
580
 
585
 
  while (!net->error && net->vio != 0 && !session->killed)
 
581
  while (!net->error && net->vio != 0 && !thd->killed)
586
582
  {
587
583
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
588
584
    {
601
597
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
602
598
        binlog_can_be_corrupted= false;
603
599
 
604
 
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
600
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
605
601
      {
606
602
        errmsg = "Failed on my_net_write()";
607
603
        my_errno= ER_UNKNOWN_ERROR;
610
606
 
611
607
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
612
608
      {
613
 
        if (send_file(session))
 
609
        if (send_file(thd))
614
610
        {
615
611
          errmsg = "failed in send_file()";
616
612
          my_errno= ER_UNKNOWN_ERROR;
637
633
      goto err;
638
634
 
639
635
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
640
 
        drizzle_bin_log.is_active(log_file_name))
 
636
        mysql_bin_log.is_active(log_file_name))
641
637
    {
642
638
      /*
643
639
        Block until there is more data in the log
682
678
        case LOG_READ_EOF:
683
679
        {
684
680
          int ret;
685
 
          if (session->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
681
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
686
682
          {
687
683
            pthread_mutex_unlock(log_lock);
688
684
            goto end;
689
685
          }
690
686
 
691
 
          do
 
687
          do 
692
688
          {
693
689
            if (coord)
694
690
            {
695
 
              assert(heartbeat_ts && heartbeat_period != 0L);
 
691
              assert(heartbeat_ts && heartbeat_period != 0LL);
696
692
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
697
693
            }
698
 
            ret= drizzle_bin_log.wait_for_update_bin_log(session, heartbeat_ts);
699
 
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
 
694
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
 
695
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
700
696
            if (ret == ETIMEDOUT || ret == ETIME)
701
697
            {
702
698
              if (send_heartbeat_event(net, packet, coord))
711
707
            {
712
708
              assert(ret == 0);
713
709
            }
714
 
          } while (ret != 0 && coord != NULL && !session->killed);
 
710
          } while (ret != 0 && coord != NULL && !thd->killed);
715
711
          pthread_mutex_unlock(log_lock);
716
 
        }
 
712
        }    
717
713
        break;
718
 
 
 
714
            
719
715
        default:
720
716
          pthread_mutex_unlock(log_lock);
721
717
          fatal_error = 1;
724
720
 
725
721
        if (read_packet)
726
722
        {
727
 
          session->set_proc_info("Sending binlog event to slave");
728
 
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
 
723
          thd_proc_info(thd, "Sending binlog event to slave");
 
724
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
729
725
          {
730
726
            errmsg = "Failed on my_net_write()";
731
727
            my_errno= ER_UNKNOWN_ERROR;
734
730
 
735
731
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
736
732
          {
737
 
            if (send_file(session))
 
733
            if (send_file(thd))
738
734
            {
739
735
              errmsg = "failed in send_file()";
740
736
              my_errno= ER_UNKNOWN_ERROR;
762
758
      bool loop_breaker = 0;
763
759
      /* need this to break out of the for loop from switch */
764
760
 
765
 
      session->set_proc_info("Finished reading one binlog; switching to next binlog");
766
 
      switch (drizzle_bin_log.find_next_log(&linfo, 1)) {
 
761
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
762
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
767
763
      case LOG_INFO_EOF:
768
764
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
769
765
        break;
809
805
  end_io_cache(&log);
810
806
  (void)my_close(file, MYF(MY_WME));
811
807
 
812
 
  my_eof(session);
813
 
  session->set_proc_info("Waiting to finalize termination");
 
808
  my_eof(thd);
 
809
  thd_proc_info(thd, "Waiting to finalize termination");
814
810
  pthread_mutex_lock(&LOCK_thread_count);
815
 
  session->current_linfo = 0;
 
811
  thd->current_linfo = 0;
816
812
  pthread_mutex_unlock(&LOCK_thread_count);
817
813
  return;
818
814
 
819
815
err:
820
 
  session->set_proc_info("Waiting to finalize termination");
 
816
  thd_proc_info(thd, "Waiting to finalize termination");
821
817
  end_io_cache(&log);
822
818
  /*
823
819
    Exclude  iteration through thread list
824
820
    this is needed for purge_logs() - it will iterate through
825
 
    thread list and update session->current_linfo->index_file_offset
 
821
    thread list and update thd->current_linfo->index_file_offset
826
822
    this mutex will make sure that it never tried to update our linfo
827
823
    after we return from this stack frame
828
824
  */
829
825
  pthread_mutex_lock(&LOCK_thread_count);
830
 
  session->current_linfo = 0;
 
826
  thd->current_linfo = 0;
831
827
  pthread_mutex_unlock(&LOCK_thread_count);
832
828
  if (file >= 0)
833
829
    (void) my_close(file, MYF(MY_WME));
836
832
  return;
837
833
}
838
834
 
839
 
int start_slave(Session* session , Master_info* mi,  bool net_report)
 
835
int start_slave(THD* thd , Master_info* mi,  bool net_report)
840
836
{
841
837
  int slave_errno= 0;
842
838
  int thread_mask;
850
846
    don't wan't to touch the other thread), so set the bit to 0 for the
851
847
    other thread
852
848
  */
853
 
  if (session->lex->slave_session_opt)
854
 
    thread_mask&= session->lex->slave_session_opt;
 
849
  if (thd->lex->slave_thd_opt)
 
850
    thread_mask&= thd->lex->slave_thd_opt;
855
851
  if (thread_mask) //some threads are stopped, start them
856
852
  {
857
 
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
853
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
 
854
                         thread_mask))
858
855
      slave_errno=ER_MASTER_INFO;
859
 
    else if (server_id_supplied && *mi->getHostname())
 
856
    else if (server_id_supplied && *mi->host)
860
857
    {
861
858
      /*
862
859
        If we will start SQL thread we will care about UNTIL options If
867
864
      {
868
865
        pthread_mutex_lock(&mi->rli.data_lock);
869
866
 
870
 
        if (session->lex->mi.pos)
 
867
        if (thd->lex->mi.pos)
871
868
        {
872
869
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
873
 
          mi->rli.until_log_pos= session->lex->mi.pos;
 
870
          mi->rli.until_log_pos= thd->lex->mi.pos;
874
871
          /*
875
 
             We don't check session->lex->mi.log_file_name for NULL here
 
872
             We don't check thd->lex->mi.log_file_name for NULL here
876
873
             since it is checked in sql_yacc.yy
877
874
          */
878
 
          strncpy(mi->rli.until_log_name, session->lex->mi.log_file_name,
 
875
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
879
876
                  sizeof(mi->rli.until_log_name)-1);
880
877
        }
881
 
        else if (session->lex->mi.relay_log_pos)
 
878
        else if (thd->lex->mi.relay_log_pos)
882
879
        {
883
880
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
884
 
          mi->rli.until_log_pos= session->lex->mi.relay_log_pos;
885
 
          strncpy(mi->rli.until_log_name, session->lex->mi.relay_log_name,
 
881
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
 
882
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
886
883
                  sizeof(mi->rli.until_log_name)-1);
887
884
        }
888
885
        else
914
911
 
915
912
          /* Issuing warning then started without --skip-slave-start */
916
913
          if (!opt_skip_slave_start)
917
 
            push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
914
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
918
915
                         ER_MISSING_SKIP_SLAVE,
919
916
                         ER(ER_MISSING_SKIP_SLAVE));
920
917
        }
921
918
 
922
919
        pthread_mutex_unlock(&mi->rli.data_lock);
923
920
      }
924
 
      else if (session->lex->mi.pos || session->lex->mi.relay_log_pos)
925
 
        push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
921
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
 
922
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
926
923
                     ER(ER_UNTIL_COND_IGNORED));
927
924
 
928
925
      if (!slave_errno)
938
935
  else
939
936
  {
940
937
    /* no error if all threads are already started, only a warning */
941
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
938
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
942
939
                 ER(ER_SLAVE_WAS_RUNNING));
943
940
  }
944
941
 
951
948
    return(1);
952
949
  }
953
950
  else if (net_report)
954
 
    my_ok(session);
 
951
    my_ok(thd);
955
952
 
956
953
  return(0);
957
954
}
958
955
 
959
956
 
960
 
int stop_slave(Session* session, Master_info* mi, bool net_report )
 
957
int stop_slave(THD* thd, Master_info* mi, bool net_report )
961
958
{
962
959
  int slave_errno;
963
 
  if (!session)
964
 
    session = current_session;
 
960
  if (!thd)
 
961
    thd = current_thd;
965
962
 
966
 
  session->set_proc_info("Killing slave");
 
963
  thd_proc_info(thd, "Killing slave");
967
964
  int thread_mask;
968
965
  lock_slave_threads(mi);
969
966
  // Get a mask of _running_ threads
974
971
    was stopped (as we don't wan't to touch the other thread), so set the
975
972
    bit to 0 for the other thread
976
973
  */
977
 
  if (session->lex->slave_session_opt)
978
 
    thread_mask &= session->lex->slave_session_opt;
 
974
  if (thd->lex->slave_thd_opt)
 
975
    thread_mask &= thd->lex->slave_thd_opt;
979
976
 
980
977
  if (thread_mask)
981
978
  {
986
983
  {
987
984
    //no error if both threads are already stopped, only a warning
988
985
    slave_errno= 0;
989
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
986
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
990
987
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
991
988
  }
992
989
  unlock_slave_threads(mi);
993
 
  session->set_proc_info(0);
 
990
  thd_proc_info(thd, 0);
994
991
 
995
992
  if (slave_errno)
996
993
  {
999
996
    return(1);
1000
997
  }
1001
998
  else if (net_report)
1002
 
    my_ok(session);
 
999
    my_ok(thd);
1003
1000
 
1004
1001
  return(0);
1005
1002
}
1010
1007
 
1011
1008
  SYNOPSIS
1012
1009
    reset_slave()
1013
 
    session                     Thread handler
 
1010
    thd                 Thread handler
1014
1011
    mi                  Master info for the slave
1015
1012
 
1016
1013
  RETURN
1019
1016
*/
1020
1017
 
1021
1018
 
1022
 
int reset_slave(Session *session, Master_info* mi)
 
1019
int reset_slave(THD *thd, Master_info* mi)
1023
1020
{
1024
1021
  struct stat stat_area;
1025
1022
  char fname[FN_REFLEN];
1026
1023
  int thread_mask= 0, error= 0;
1027
 
  uint32_t sql_errno=0;
 
1024
  uint sql_errno=0;
1028
1025
  const char* errmsg=0;
1029
1026
 
1030
1027
  lock_slave_threads(mi);
1036
1033
    goto err;
1037
1034
  }
1038
1035
 
 
1036
  ha_reset_slave(thd);
 
1037
 
1039
1038
  // delete relay logs, clear relay log coordinates
1040
 
  if ((error= purge_relay_logs(&mi->rli, session,
 
1039
  if ((error= purge_relay_logs(&mi->rli, thd,
1041
1040
                               1 /* just reset */,
1042
1041
                               &errmsg)))
1043
1042
    goto err;
1044
1043
 
1045
1044
  /* Clear master's log coordinates */
1046
 
  mi->reset();
 
1045
  init_master_log_pos(mi);
1047
1046
  /*
1048
1047
     Reset errors (the idea is that we forget about the
1049
1048
     old master).
1052
1051
  mi->rli.clear_until_condition();
1053
1052
 
1054
1053
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1055
 
  mi->end_master_info();
 
1054
  end_master_info(mi);
1056
1055
  // and delete these two files
1057
 
  fn_format(fname, master_info_file, drizzle_data_home, "", 4+32);
 
1056
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1058
1057
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1059
1058
  {
1060
1059
    error=1;
1061
1060
    goto err;
1062
1061
  }
1063
1062
  // delete relay_log_info_file
1064
 
  fn_format(fname, relay_log_info_file, drizzle_data_home, "", 4+32);
 
1063
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1065
1064
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1066
1065
  {
1067
1066
    error=1;
1098
1097
void kill_zombie_dump_threads(uint32_t slave_server_id)
1099
1098
{
1100
1099
  pthread_mutex_lock(&LOCK_thread_count);
1101
 
  I_List_iterator<Session> it(threads);
1102
 
  Session *tmp;
 
1100
  I_List_iterator<THD> it(threads);
 
1101
  THD *tmp;
1103
1102
 
1104
1103
  while ((tmp=it++))
1105
1104
  {
1118
1117
      it will be slow because it will iterate through the list
1119
1118
      again. We just to do kill the thread ourselves.
1120
1119
    */
1121
 
    tmp->awake(Session::KILL_QUERY);
 
1120
    tmp->awake(THD::KILL_QUERY);
1122
1121
    pthread_mutex_unlock(&tmp->LOCK_delete);
1123
1122
  }
1124
1123
}
1125
1124
 
1126
1125
 
1127
 
bool change_master(Session* session, Master_info* mi)
 
1126
bool change_master(THD* thd, Master_info* mi)
1128
1127
{
1129
1128
  int thread_mask;
1130
1129
  const char* errmsg= 0;
1139
1138
    return(true);
1140
1139
  }
1141
1140
 
1142
 
  session->set_proc_info("Changing master");
1143
 
  LEX_MASTER_INFO* lex_mi= &session->lex->mi;
 
1141
  thd_proc_info(thd, "Changing master");
 
1142
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1144
1143
  // TODO: see if needs re-write
1145
 
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
1144
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
 
1145
                       thread_mask))
1146
1146
  {
1147
1147
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1148
1148
    unlock_slave_threads(mi);
1161
1161
  */
1162
1162
 
1163
1163
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1164
 
    mi->reset();
 
1164
  {
 
1165
    mi->master_log_name[0] = 0;
 
1166
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
 
1167
  }
1165
1168
 
1166
1169
  if (lex_mi->log_file_name)
1167
 
    mi->setLogName(lex_mi->log_file_name);
 
1170
    strmake(mi->master_log_name, lex_mi->log_file_name,
 
1171
            sizeof(mi->master_log_name)-1);
1168
1172
  if (lex_mi->pos)
1169
1173
  {
1170
 
    mi->setLogPosition(lex_mi->pos);
 
1174
    mi->master_log_pos= lex_mi->pos;
1171
1175
  }
1172
1176
 
1173
1177
  if (lex_mi->host)
1174
 
    mi->setHost(lex_mi->host, lex_mi->port);
 
1178
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1175
1179
  if (lex_mi->user)
1176
 
    mi->setUsername(lex_mi->user);
 
1180
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1177
1181
  if (lex_mi->password)
1178
 
    mi->setPassword(lex_mi->password);
 
1182
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
 
1183
  if (lex_mi->port)
 
1184
    mi->port = lex_mi->port;
1179
1185
  if (lex_mi->connect_retry)
1180
1186
    mi->connect_retry = lex_mi->connect_retry;
1181
1187
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1182
1188
    mi->heartbeat_period = lex_mi->heartbeat_period;
1183
1189
  else
1184
 
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1190
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1185
1191
                                      (slave_net_timeout/2.0));
1186
 
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
 
1192
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
 
1193
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1194
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1195
 
 
1196
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1197
    mi->ssl_verify_server_cert=
 
1198
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1199
 
 
1200
  if (lex_mi->ssl_ca)
 
1201
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
 
1202
  if (lex_mi->ssl_capath)
 
1203
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
 
1204
  if (lex_mi->ssl_cert)
 
1205
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
 
1206
  if (lex_mi->ssl_cipher)
 
1207
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
 
1208
  if (lex_mi->ssl_key)
 
1209
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
 
1210
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
 
1211
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
 
1212
      lex_mi->ssl_verify_server_cert )
 
1213
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
1214
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1187
1215
 
1188
1216
  if (lex_mi->relay_log_name)
1189
1217
  {
1190
1218
    need_relay_log_purge= 0;
1191
 
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
 
1219
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
 
1220
            sizeof(mi->rli.group_relay_log_name)-1);
 
1221
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
 
1222
            sizeof(mi->rli.event_relay_log_name)-1);
1192
1223
  }
1193
1224
 
1194
1225
  if (lex_mi->relay_log_pos)
1217
1248
   {
1218
1249
     /*
1219
1250
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1220
 
       not initialized), so we use a cmax().
 
1251
       not initialized), so we use a max().
1221
1252
       What happens to mi->rli.master_log_pos during the initialization stages
1222
1253
       of replication is not 100% clear, so we guard against problems using
1223
 
       cmax().
 
1254
       max().
1224
1255
      */
1225
 
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1226
 
                         ? BIN_LOG_HEADER_SIZE
1227
 
                         : mi->rli.group_master_log_pos));
1228
 
     mi->setLogName(mi->rli.group_master_log_name.c_str());
 
1256
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
 
1257
                              mi->rli.group_master_log_pos);
 
1258
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
 
1259
             sizeof(mi->master_log_name)-1);
1229
1260
  }
1230
1261
  /*
1231
1262
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1232
1263
    a slave before).
1233
1264
  */
1234
 
  if (mi->flush())
 
1265
  if (flush_master_info(mi, 0))
1235
1266
  {
1236
1267
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1237
1268
    unlock_slave_threads(mi);
1240
1271
  if (need_relay_log_purge)
1241
1272
  {
1242
1273
    relay_log_purge= 1;
1243
 
    session->set_proc_info("Purging old relay logs");
1244
 
    if (purge_relay_logs(&mi->rli, session,
 
1274
    thd_proc_info(thd, "Purging old relay logs");
 
1275
    if (purge_relay_logs(&mi->rli, thd,
1245
1276
                         0 /* not only reset, but also reinit */,
1246
1277
                         &errmsg))
1247
1278
    {
1256
1287
    relay_log_purge= 0;
1257
1288
    /* Relay log is already initialized */
1258
1289
    if (init_relay_log_pos(&mi->rli,
1259
 
                           mi->rli.group_relay_log_name.c_str(),
 
1290
                           mi->rli.group_relay_log_name,
1260
1291
                           mi->rli.group_relay_log_pos,
1261
1292
                           0 /*no data lock*/,
1262
1293
                           &msg, 0))
1276
1307
    ''/0: we have lost all copies of the original good coordinates.
1277
1308
    That's why we always save good coords in rli.
1278
1309
  */
1279
 
  mi->rli.group_master_log_pos= mi->getLogPosition();
1280
 
  mi->rli.group_master_log_name.assign(mi->getLogName());
 
1310
  mi->rli.group_master_log_pos= mi->master_log_pos;
 
1311
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
 
1312
          sizeof(mi->rli.group_master_log_name)-1);
1281
1313
 
1282
 
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
1283
 
    mi->rli.group_master_log_pos= 0;
 
1314
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
 
1315
    mi->rli.group_master_log_pos=0;
1284
1316
 
1285
1317
  pthread_mutex_lock(&mi->rli.data_lock);
1286
1318
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1299
1331
  pthread_mutex_unlock(&mi->rli.data_lock);
1300
1332
 
1301
1333
  unlock_slave_threads(mi);
1302
 
  session->set_proc_info(0);
1303
 
  my_ok(session);
 
1334
  thd_proc_info(thd, 0);
 
1335
  my_ok(thd);
1304
1336
  return(false);
1305
1337
}
1306
1338
 
1307
 
int reset_master(Session* session)
 
1339
int reset_master(THD* thd)
1308
1340
{
1309
 
  if (!drizzle_bin_log.is_open())
 
1341
  if (!mysql_bin_log.is_open())
1310
1342
  {
1311
1343
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1312
1344
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1313
1345
    return 1;
1314
1346
  }
1315
 
  return drizzle_bin_log.reset_logs(session);
 
1347
  return mysql_bin_log.reset_logs(thd);
1316
1348
}
1317
1349
 
1318
1350
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1319
1351
                   const char* log_file_name2, uint64_t log_pos2)
1320
1352
{
1321
1353
  int res;
1322
 
  uint32_t log_file_name1_len=  strlen(log_file_name1);
1323
 
  uint32_t log_file_name2_len=  strlen(log_file_name2);
 
1354
  uint log_file_name1_len=  strlen(log_file_name1);
 
1355
  uint log_file_name2_len=  strlen(log_file_name2);
1324
1356
 
1325
1357
  //  We assume that both log names match up to '.'
1326
1358
  if (log_file_name1_len == log_file_name2_len)
1333
1365
}
1334
1366
 
1335
1367
 
1336
 
bool show_binlog_info(Session* session)
1337
 
{
1338
 
  Protocol *protocol= session->protocol;
 
1368
bool mysql_show_binlog_events(THD* thd)
 
1369
{
 
1370
  Protocol *protocol= thd->protocol;
 
1371
  List<Item> field_list;
 
1372
  const char *errmsg= 0;
 
1373
  bool ret= true;
 
1374
  IO_CACHE log;
 
1375
  File file= -1;
 
1376
 
 
1377
  Log_event::init_show_field_list(&field_list);
 
1378
  if (protocol->send_fields(&field_list,
 
1379
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1380
    return(true);
 
1381
 
 
1382
  Format_description_log_event *description_event= new
 
1383
    Format_description_log_event(3); /* MySQL 4.0 by default */
 
1384
 
 
1385
  /*
 
1386
    Wait for handlers to insert any pending information
 
1387
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
 
1388
    this is needed so that the uses sees all its own commands in the binlog
 
1389
  */
 
1390
  ha_binlog_wait(thd);
 
1391
 
 
1392
  if (mysql_bin_log.is_open())
 
1393
  {
 
1394
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
 
1395
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
 
1396
    ha_rows event_count, limit_start, limit_end;
 
1397
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1398
    char search_file_name[FN_REFLEN], *name;
 
1399
    const char *log_file_name = lex_mi->log_file_name;
 
1400
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
 
1401
    LOG_INFO linfo;
 
1402
    Log_event* ev;
 
1403
 
 
1404
    unit->set_limit(thd->lex->current_select);
 
1405
    limit_start= unit->offset_limit_cnt;
 
1406
    limit_end= unit->select_limit_cnt;
 
1407
 
 
1408
    name= search_file_name;
 
1409
    if (log_file_name)
 
1410
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
 
1411
    else
 
1412
      name=0;                                   // Find first log
 
1413
 
 
1414
    linfo.index_file_offset = 0;
 
1415
 
 
1416
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
1417
    {
 
1418
      errmsg = "Could not find target log";
 
1419
      goto err;
 
1420
    }
 
1421
 
 
1422
    pthread_mutex_lock(&LOCK_thread_count);
 
1423
    thd->current_linfo = &linfo;
 
1424
    pthread_mutex_unlock(&LOCK_thread_count);
 
1425
 
 
1426
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
 
1427
      goto err;
 
1428
 
 
1429
    /*
 
1430
      to account binlog event header size
 
1431
    */
 
1432
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
 
1433
 
 
1434
    pthread_mutex_lock(log_lock);
 
1435
 
 
1436
    /*
 
1437
      open_binlog() sought to position 4.
 
1438
      Read the first event in case it's a Format_description_log_event, to
 
1439
      know the format. If there's no such event, we are 3.23 or 4.x. This
 
1440
      code, like before, can't read 3.23 binlogs.
 
1441
      This code will fail on a mixed relay log (one which has Format_desc then
 
1442
      Rotate then Format_desc).
 
1443
    */
 
1444
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
 
1445
    if (ev)
 
1446
    {
 
1447
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
1448
      {
 
1449
        delete description_event;
 
1450
        description_event= (Format_description_log_event*) ev;
 
1451
      }
 
1452
      else
 
1453
        delete ev;
 
1454
    }
 
1455
 
 
1456
    my_b_seek(&log, pos);
 
1457
 
 
1458
    if (!description_event->is_valid())
 
1459
    {
 
1460
      errmsg="Invalid Format_description event; could be out of memory";
 
1461
      goto err;
 
1462
    }
 
1463
 
 
1464
    for (event_count = 0;
 
1465
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
 
1466
                                         description_event)); )
 
1467
    {
 
1468
      if (event_count >= limit_start &&
 
1469
          ev->net_send(protocol, linfo.log_file_name, pos))
 
1470
      {
 
1471
        errmsg = "Net error";
 
1472
        delete ev;
 
1473
        pthread_mutex_unlock(log_lock);
 
1474
        goto err;
 
1475
      }
 
1476
 
 
1477
      pos = my_b_tell(&log);
 
1478
      delete ev;
 
1479
 
 
1480
      if (++event_count >= limit_end)
 
1481
        break;
 
1482
    }
 
1483
 
 
1484
    if (event_count < limit_end && log.error)
 
1485
    {
 
1486
      errmsg = "Wrong offset or I/O error";
 
1487
      pthread_mutex_unlock(log_lock);
 
1488
      goto err;
 
1489
    }
 
1490
 
 
1491
    pthread_mutex_unlock(log_lock);
 
1492
  }
 
1493
 
 
1494
  ret= false;
 
1495
 
 
1496
err:
 
1497
  delete description_event;
 
1498
  if (file >= 0)
 
1499
  {
 
1500
    end_io_cache(&log);
 
1501
    (void) my_close(file, MYF(MY_WME));
 
1502
  }
 
1503
 
 
1504
  if (errmsg)
 
1505
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
1506
             "SHOW BINLOG EVENTS", errmsg);
 
1507
  else
 
1508
    my_eof(thd);
 
1509
 
 
1510
  pthread_mutex_lock(&LOCK_thread_count);
 
1511
  thd->current_linfo = 0;
 
1512
  pthread_mutex_unlock(&LOCK_thread_count);
 
1513
  return(ret);
 
1514
}
 
1515
 
 
1516
 
 
1517
bool show_binlog_info(THD* thd)
 
1518
{
 
1519
  Protocol *protocol= thd->protocol;
1339
1520
  List<Item> field_list;
1340
1521
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1341
1522
  field_list.push_back(new Item_return_int("Position",20,
1342
1523
                                           DRIZZLE_TYPE_LONGLONG));
 
1524
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
 
1525
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1343
1526
 
1344
1527
  if (protocol->send_fields(&field_list,
1345
1528
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1346
1529
    return(true);
1347
1530
  protocol->prepare_for_resend();
1348
1531
 
1349
 
  if (drizzle_bin_log.is_open())
 
1532
  if (mysql_bin_log.is_open())
1350
1533
  {
1351
1534
    LOG_INFO li;
1352
 
    drizzle_bin_log.get_current_log(&li);
 
1535
    mysql_bin_log.get_current_log(&li);
1353
1536
    int dir_len = dirname_length(li.log_file_name);
1354
1537
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1355
1538
    protocol->store((uint64_t) li.pos);
 
1539
    protocol->store(binlog_filter->get_do_db());
 
1540
    protocol->store(binlog_filter->get_ignore_db());
1356
1541
    if (protocol->write())
1357
1542
      return(true);
1358
1543
  }
1359
 
  my_eof(session);
 
1544
  my_eof(thd);
1360
1545
  return(false);
1361
1546
}
1362
1547
 
1366
1551
 
1367
1552
  SYNOPSIS
1368
1553
    show_binlogs()
1369
 
    session             Thread specific variable
 
1554
    thd         Thread specific variable
1370
1555
 
1371
1556
  RETURN VALUES
1372
1557
    false OK
1373
1558
    true  error
1374
1559
*/
1375
1560
 
1376
 
bool show_binlogs(Session* session)
 
1561
bool show_binlogs(THD* thd)
1377
1562
{
1378
1563
  IO_CACHE *index_file;
1379
1564
  LOG_INFO cur;
1380
1565
  File file;
1381
1566
  char fname[FN_REFLEN];
1382
1567
  List<Item> field_list;
1383
 
  uint32_t length;
 
1568
  uint length;
1384
1569
  int cur_dir_len;
1385
 
  Protocol *protocol= session->protocol;
 
1570
  Protocol *protocol= thd->protocol;
1386
1571
 
1387
 
  if (!drizzle_bin_log.is_open())
 
1572
  if (!mysql_bin_log.is_open())
1388
1573
  {
1389
1574
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1390
1575
    return 1;
1396
1581
  if (protocol->send_fields(&field_list,
1397
1582
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1398
1583
    return(true);
1399
 
 
1400
 
  pthread_mutex_lock(drizzle_bin_log.get_log_lock());
1401
 
  drizzle_bin_log.lock_index();
1402
 
  index_file= drizzle_bin_log.get_index_file();
1403
 
 
1404
 
  drizzle_bin_log.raw_get_current_log(&cur); // dont take mutex
1405
 
  pthread_mutex_unlock(drizzle_bin_log.get_log_lock()); // lockdep, OK
1406
 
 
 
1584
  
 
1585
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
 
1586
  mysql_bin_log.lock_index();
 
1587
  index_file=mysql_bin_log.get_index_file();
 
1588
  
 
1589
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
 
1590
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
 
1591
  
1407
1592
  cur_dir_len= dirname_length(cur.log_file_name);
1408
1593
 
1409
1594
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1412
1597
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1413
1598
  {
1414
1599
    int dir_len;
1415
 
    off_t file_length= 0;                   // Length if open fails
 
1600
    uint64_t file_length= 0;                   // Length if open fails
1416
1601
    fname[--length] = '\0';                     // remove the newline
1417
1602
 
1418
1603
    protocol->prepare_for_resend();
1425
1610
    else
1426
1611
    {
1427
1612
      /* this is an old log, open it and find the size */
1428
 
      if ((file= my_open(fname, O_RDONLY,
 
1613
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1429
1614
                         MYF(0))) >= 0)
1430
1615
      {
1431
 
        file_length= lseek(file, 0L, SEEK_END);
 
1616
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1432
1617
        my_close(file, MYF(0));
1433
1618
      }
1434
1619
    }
1436
1621
    if (protocol->write())
1437
1622
      goto err;
1438
1623
  }
1439
 
  drizzle_bin_log.unlock_index();
1440
 
  my_eof(session);
 
1624
  mysql_bin_log.unlock_index();
 
1625
  my_eof(thd);
1441
1626
  return(false);
1442
1627
 
1443
1628
err:
1444
 
  drizzle_bin_log.unlock_index();
 
1629
  mysql_bin_log.unlock_index();
1445
1630
  return(true);
1446
1631
}
1447
1632
 
1450
1635
   before a chunk of data is being read into the cache's buffer
1451
1636
   The fuction instantianates and writes into the binlog
1452
1637
   replication events along LOAD DATA processing.
1453
 
 
 
1638
   
1454
1639
   @param file  pointer to io-cache
1455
1640
   @return 0
1456
1641
*/
1457
1642
int log_loaded_block(IO_CACHE* file)
1458
1643
{
1459
1644
  LOAD_FILE_INFO *lf_info;
1460
 
  uint32_t block_len;
 
1645
  uint block_len;
1461
1646
  /* buffer contains position where we started last read */
1462
 
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1463
 
  uint32_t max_event_size= current_session->variables.max_allowed_packet;
 
1647
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
 
1648
  uint max_event_size= current_thd->variables.max_allowed_packet;
1464
1649
  lf_info= (LOAD_FILE_INFO*) file->arg;
1465
 
  if (true)
 
1650
  if (lf_info->thd->current_stmt_binlog_row_based)
1466
1651
    return(0);
1467
1652
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1468
1653
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1469
1654
    return(0);
1470
 
 
 
1655
  
1471
1656
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1472
 
       buffer += cmin(block_len, max_event_size),
1473
 
       block_len -= cmin(block_len, max_event_size))
 
1657
       buffer += min(block_len, max_event_size),
 
1658
       block_len -= min(block_len, max_event_size))
1474
1659
  {
1475
1660
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1476
1661
    if (lf_info->wrote_create_file)
1477
1662
    {
1478
 
      Append_block_log_event a(lf_info->session, lf_info->session->db, buffer,
1479
 
                               cmin(block_len, max_event_size),
 
1663
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
 
1664
                               min(block_len, max_event_size),
1480
1665
                               lf_info->log_delayed);
1481
 
      drizzle_bin_log.write(&a);
 
1666
      mysql_bin_log.write(&a);
1482
1667
    }
1483
1668
    else
1484
1669
    {
1485
 
      Begin_load_query_log_event b(lf_info->session, lf_info->session->db,
 
1670
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1486
1671
                                   buffer,
1487
 
                                   cmin(block_len, max_event_size),
 
1672
                                   min(block_len, max_event_size),
1488
1673
                                   lf_info->log_delayed);
1489
 
      drizzle_bin_log.write(&b);
 
1674
      mysql_bin_log.write(&b);
1490
1675
      lf_info->wrote_create_file= 1;
1491
1676
    }
1492
1677
  }
1503
1688
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1504
1689
    :sys_var(name_arg)
1505
1690
  { chain_sys_var(chain); }
1506
 
  bool check(Session *session, set_var *var);
1507
 
  bool update(Session *session, set_var *var);
 
1691
  bool check(THD *thd, set_var *var);
 
1692
  bool update(THD *thd, set_var *var);
1508
1693
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1509
1694
  /*
1510
1695
    We can't retrieve the value of this, so we don't have to define
1516
1701
{
1517
1702
public:
1518
1703
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1519
 
                             uint64_t *value_ptr)
1520
 
    :sys_var_long_ptr(chain, name_arg, value_ptr) {}
1521
 
  bool update(Session *session, set_var *var);
 
1704
                             ulong *value_ptr)
 
1705
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
 
1706
  bool update(THD *thd, set_var *var);
1522
1707
};
1523
1708
 
1524
 
static void fix_slave_net_timeout(Session *session,
 
1709
static void fix_slave_net_timeout(THD *thd,
1525
1710
                                  enum_var_type type __attribute__((unused)))
1526
1711
{
 
1712
#ifdef HAVE_REPLICATION
1527
1713
  pthread_mutex_lock(&LOCK_active_mi);
1528
1714
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1529
 
    push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1715
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1530
1716
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1531
1717
                        "The currect value for master_heartbeat_period"
1532
1718
                        " exceeds the new value of `slave_net_timeout' sec."
1533
1719
                        " A sensible value for the period should be"
1534
1720
                        " less than the timeout.");
1535
1721
  pthread_mutex_unlock(&LOCK_active_mi);
 
1722
#endif
1536
1723
  return;
1537
1724
}
1538
1725
 
1543
1730
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1544
1731
                                              &slave_net_timeout,
1545
1732
                                              fix_slave_net_timeout);
1546
 
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries", &slave_trans_retries);
 
1733
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
 
1734
                                                &slave_trans_retries);
1547
1735
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1548
1736
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1549
1737
 
1550
 
static int show_slave_skip_errors(Session *session, SHOW_VAR *var, char *buff);
1551
 
 
1552
 
 
1553
 
static int show_slave_skip_errors(Session *session __attribute__((unused)),
 
1738
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
 
1739
 
 
1740
 
 
1741
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1554
1742
                                  SHOW_VAR *var, char *buff)
1555
1743
{
1556
1744
  var->type=SHOW_CHAR;
1582
1770
    if (var->value != buff)
1583
1771
      buff--;                           // Remove last ','
1584
1772
    if (i < MAX_SLAVE_ERROR)
1585
 
      buff= strcpy(buff, "...")+3;  // Couldn't show all errors
 
1773
      buff= strmov(buff, "...");  // Couldn't show all errors
1586
1774
    *buff=0;
1587
1775
  }
1588
1776
  return 0;
1602
1790
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1603
1791
};
1604
1792
 
1605
 
bool sys_var_slave_skip_counter::check(Session *session __attribute__((unused)),
 
1793
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1606
1794
                                       set_var *var)
1607
1795
{
1608
1796
  int result= 0;
1615
1803
  }
1616
1804
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1617
1805
  pthread_mutex_unlock(&LOCK_active_mi);
1618
 
  var->save_result.uint32_t_value= (uint32_t) var->value->val_int();
 
1806
  var->save_result.ulong_value= (ulong) var->value->val_int();
1619
1807
  return result;
1620
1808
}
1621
1809
 
1622
1810
 
1623
 
bool sys_var_slave_skip_counter::update(Session *session __attribute__((unused)),
 
1811
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
1624
1812
                                        set_var *var)
1625
1813
{
1626
1814
  pthread_mutex_lock(&LOCK_active_mi);
1633
1821
  if (!active_mi->rli.slave_running)
1634
1822
  {
1635
1823
    pthread_mutex_lock(&active_mi->rli.data_lock);
1636
 
    active_mi->rli.slave_skip_counter= var->save_result.uint32_t_value;
 
1824
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1637
1825
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1638
1826
  }
1639
1827
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1642
1830
}
1643
1831
 
1644
1832
 
1645
 
bool sys_var_sync_binlog_period::update(Session *session __attribute__((unused)),
 
1833
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1646
1834
                                        set_var *var)
1647
1835
{
1648
 
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
 
1836
  sync_binlog_period= (ulong) var->save_result.uint64_t_value;
1649
1837
  return 0;
1650
1838
}
1651
1839
 
1661
1849
  }
1662
1850
  return 0;
1663
1851
}
 
1852
 
 
1853
#endif /* HAVE_REPLICATION */
 
1854
 
 
1855