~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/sql_repl.cc

Removed dead variable, sorted authors file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#include <drizzled/server_includes.h>
 
16
#include "mysql_priv.h"
 
17
#ifdef HAVE_REPLICATION
17
18
 
18
 
#include <drizzled/replication/mi.h>
19
 
#include <drizzled/replication/replication.h>
20
 
#include <drizzled/log_event.h>
21
 
#include <libdrizzle/libdrizzle.h>
22
 
#include <mysys/hash.h>
23
 
#include <drizzled/error.h>
24
 
#include <drizzled/gettext.h>
25
 
#include <drizzled/data_home.h>
26
 
#include <drizzled/unireg.h>
 
19
#include "rpl_mi.h"
 
20
#include "sql_repl.h"
 
21
#include "log_event.h"
 
22
#include "rpl_filter.h"
 
23
#include <my_dir.h>
27
24
 
28
25
int max_binlog_dump_events = 0; // unlimited
29
26
 
30
27
/*
31
 
  fake_rotate_event() builds a fake (=which does not exist physically in any
32
 
  binlog) Rotate event, which contains the name of the binlog we are going to
33
 
  send to the slave (because the slave may not know it if it just asked for
34
 
  MASTER_LOG_FILE='', MASTER_LOG_POS=4).
35
 
  < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
36
 
  After this version we always call it, so that a 3.23.58 slave can rely on
37
 
  it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
38
 
  zeros in the good positions which, by chance, make it possible for the 3.23
39
 
  slave to detect that this event is unexpected) (this is luck which happens
40
 
  because the master and slave disagree on the size of the header of
41
 
  Log_event).
 
28
    fake_rotate_event() builds a fake (=which does not exist physically in any
 
29
    binlog) Rotate event, which contains the name of the binlog we are going to
 
30
    send to the slave (because the slave may not know it if it just asked for
 
31
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
 
32
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
 
33
    After this version we always call it, so that a 3.23.58 slave can rely on
 
34
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
 
35
    zeros in the good positions which, by chance, make it possible for the 3.23
 
36
    slave to detect that this event is unexpected) (this is luck which happens
 
37
    because the master and slave disagree on the size of the header of
 
38
    Log_event).
42
39
 
43
 
  Relying on the event length of the Rotate event instead of these
44
 
  well-placed zeros was not possible as Rotate events have a variable-length
45
 
  part.
 
40
    Relying on the event length of the Rotate event instead of these
 
41
    well-placed zeros was not possible as Rotate events have a variable-length
 
42
    part.
46
43
*/
47
44
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
48
45
                             uint64_t position, const char** errmsg)
56
53
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
57
54
 
58
55
  char* p = log_file_name+dirname_length(log_file_name);
59
 
  uint32_t ident_len = (uint32_t) strlen(p);
60
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
56
  uint ident_len = (uint) strlen(p);
 
57
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
61
58
  int4store(header + SERVER_ID_OFFSET, server_id);
62
59
  int4store(header + EVENT_LEN_OFFSET, event_len);
63
60
  int2store(header + FLAGS_OFFSET, 0);
69
66
  int8store(buf+R_POS_OFFSET,position);
70
67
  packet->append(buf, ROTATE_HEADER_LEN);
71
68
  packet->append(p,ident_len);
72
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
69
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
73
70
  {
74
71
    *errmsg = "failed on my_net_write()";
75
72
    return(-1);
77
74
  return(0);
78
75
}
79
76
 
80
 
static int send_file(Session *session)
 
77
static int send_file(THD *thd)
81
78
{
82
 
  NET* net = &session->net;
 
79
  NET* net = &thd->net;
83
80
  int fd = -1, error = 1;
84
81
  size_t bytes;
85
82
  char fname[FN_REFLEN+1];
86
83
  const char *errmsg = 0;
87
84
  int old_timeout;
88
85
  unsigned long packet_len;
89
 
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
 
86
  uchar buf[IO_SIZE];                           // It's safe to alloc this
90
87
 
91
88
  /*
92
89
    The client might be slow loading the data, give him wait_timeout to do
93
90
    the job
94
91
  */
95
92
  old_timeout= net->read_timeout;
96
 
  my_net_set_read_timeout(net, session->variables.net_wait_timeout);
 
93
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
97
94
 
98
95
  /*
99
96
    We need net_flush here because the client will not know it needs to send
101
98
  */
102
99
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
103
100
  {
104
 
    errmsg = _("Failed in send_file() while reading file name");
 
101
    errmsg = "while reading file name";
105
102
    goto err;
106
103
  }
107
104
 
114
111
 
115
112
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
116
113
  {
117
 
    errmsg = _("Failed in send_file() on open of file");
 
114
    errmsg = "on open of file";
118
115
    goto err;
119
116
  }
120
117
 
122
119
  {
123
120
    if (my_net_write(net, buf, bytes))
124
121
    {
125
 
      errmsg = _("Failed in send_file() while writing data to client");
 
122
      errmsg = "while writing data to client";
126
123
      goto err;
127
124
    }
128
125
  }
129
126
 
130
127
 end:
131
 
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
 
128
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
132
129
      (my_net_read(net) == packet_error))
133
130
  {
134
 
    errmsg = _("Failed in send_file() while negotiating file transfer close");
 
131
    errmsg = "while negotiating file transfer close";
135
132
    goto err;
136
133
  }
137
134
  error = 0;
142
139
    (void) my_close(fd, MYF(0));
143
140
  if (errmsg)
144
141
  {
145
 
    sql_print_error("%s",errmsg);
 
142
    sql_print_error("Failed in send_file() %s", errmsg);
146
143
  }
147
144
  return(error);
148
145
}
171
168
 
172
169
void adjust_linfo_offsets(my_off_t purge_offset)
173
170
{
174
 
  Session *tmp;
 
171
  THD *tmp;
175
172
 
176
173
  pthread_mutex_lock(&LOCK_thread_count);
177
 
  I_List_iterator<Session> it(threads);
 
174
  I_List_iterator<THD> it(threads);
178
175
 
179
176
  while ((tmp=it++))
180
177
  {
201
198
bool log_in_use(const char* log_name)
202
199
{
203
200
  int log_name_len = strlen(log_name) + 1;
204
 
  Session *tmp;
 
201
  THD *tmp;
205
202
  bool result = 0;
206
203
 
207
204
  pthread_mutex_lock(&LOCK_thread_count);
208
 
  I_List_iterator<Session> it(threads);
 
205
  I_List_iterator<THD> it(threads);
209
206
 
210
207
  while ((tmp=it++))
211
208
  {
213
210
    if ((linfo = tmp->current_linfo))
214
211
    {
215
212
      pthread_mutex_lock(&linfo->lock);
216
 
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
 
213
      result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
 
214
                     log_name_len);
217
215
      pthread_mutex_unlock(&linfo->lock);
218
216
      if (result)
219
217
        break;
224
222
  return result;
225
223
}
226
224
 
227
 
bool purge_error_message(Session* session, int res)
 
225
bool purge_error_message(THD* thd, int res)
228
226
{
229
 
  uint32_t errmsg= 0;
 
227
  uint errmsg= 0;
230
228
 
231
229
  switch (res)  {
232
230
  case 0: break;
246
244
    my_message(errmsg, ER(errmsg), MYF(0));
247
245
    return true;
248
246
  }
249
 
  my_ok(session);
 
247
  my_ok(thd);
250
248
  return false;
251
249
}
252
250
 
253
251
 
254
 
bool purge_master_logs(Session* session, const char* to_log)
 
252
bool purge_master_logs(THD* thd, const char* to_log)
255
253
{
256
254
  char search_file_name[FN_REFLEN];
257
 
  if (!drizzle_bin_log.is_open())
 
255
  if (!mysql_bin_log.is_open())
258
256
  {
259
 
    my_ok(session);
 
257
    my_ok(thd);
260
258
    return false;
261
259
  }
262
260
 
263
 
  drizzle_bin_log.make_log_name(search_file_name, to_log);
264
 
  return purge_error_message(session,
265
 
                             drizzle_bin_log.purge_logs(search_file_name, 0, 1,
 
261
  mysql_bin_log.make_log_name(search_file_name, to_log);
 
262
  return purge_error_message(thd,
 
263
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
266
264
                                                      1, NULL));
267
265
}
268
266
 
269
267
 
270
 
bool purge_master_logs_before_date(Session* session, time_t purge_time)
 
268
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
271
269
{
272
 
  if (!drizzle_bin_log.is_open())
 
270
  if (!mysql_bin_log.is_open())
273
271
  {
274
 
    my_ok(session);
 
272
    my_ok(thd);
275
273
    return 0;
276
274
  }
277
 
  return purge_error_message(session,
278
 
                             drizzle_bin_log.purge_logs_before_date(purge_time));
 
275
  return purge_error_message(thd,
 
276
                             mysql_bin_log.purge_logs_before_date(purge_time));
279
277
}
280
278
 
281
279
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
312
310
  An auxiliary function for calling in mysql_binlog_send
313
311
  to initialize the heartbeat timeout in waiting for a binlogged event.
314
312
 
315
 
  @param[in]    session  Session to access a user variable
 
313
  @param[in]    thd  THD to access a user variable
316
314
 
317
315
  @return        heartbeat period an uint64_t of nanoseconds
318
316
                 or zero if heartbeat was not demanded by slave
319
 
*/
320
 
static uint64_t get_heartbeat_period(Session * session)
 
317
*/ 
 
318
static uint64_t get_heartbeat_period(THD * thd)
321
319
{
322
 
  bool null_value;
 
320
  my_bool null_value;
323
321
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
324
 
  user_var_entry *entry=
325
 
    (user_var_entry*) hash_search(&session->user_vars, (unsigned char*) name.str,
 
322
  user_var_entry *entry= 
 
323
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
326
324
                                  name.length);
327
325
  return entry? entry->val_int(&null_value) : 0;
328
326
}
330
328
/*
331
329
  Function prepares and sends repliation heartbeat event.
332
330
 
333
 
  @param net                net object of Session
 
331
  @param net                net object of THD
334
332
  @param packet             buffer to store the heartbeat instance
335
333
  @param event_coordinates  binlog file name and position of the last
336
334
                            real event master sent from binlog
337
335
 
338
 
  @note
 
336
  @note 
339
337
    Among three essential pieces of heartbeat data Log_event::when
340
338
    is computed locally.
341
339
    The  error to send is serious and should force terminating
355
353
 
356
354
  char* p= coord->file_name + dirname_length(coord->file_name);
357
355
 
358
 
  uint32_t ident_len = strlen(p);
359
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
356
  uint ident_len = strlen(p);
 
357
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
360
358
  int4store(header + SERVER_ID_OFFSET, server_id);
361
359
  int4store(header + EVENT_LEN_OFFSET, event_len);
362
360
  int2store(header + FLAGS_OFFSET, 0);
366
364
  packet->append(header, sizeof(header));
367
365
  packet->append(p, ident_len);             // log_file_name
368
366
 
369
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
 
367
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
370
368
      net_flush(net))
371
369
  {
372
370
    return(-1);
379
377
  TODO: Clean up loop to only have one call to send_file()
380
378
*/
381
379
 
382
 
void mysql_binlog_send(Session* session, char* log_ident, my_off_t pos,
383
 
                       uint16_t flags)
 
380
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
 
381
                       ushort flags)
384
382
{
385
383
  LOG_INFO linfo;
386
384
  char *log_file_name = linfo.log_file_name;
387
385
  char search_file_name[FN_REFLEN], *name;
388
386
  IO_CACHE log;
389
387
  File file = -1;
390
 
  String* packet = &session->packet;
 
388
  String* packet = &thd->packet;
391
389
  int error;
392
390
  const char *errmsg = "Unknown error";
393
 
  NET* net = &session->net;
 
391
  NET* net = &thd->net;
394
392
  pthread_mutex_t *log_lock;
395
393
  bool binlog_can_be_corrupted= false;
396
394
 
397
 
  memset(&log, 0, sizeof(log));
398
 
  /*
 
395
  bzero((char*) &log,sizeof(log));
 
396
  /* 
399
397
     heartbeat_period from @master_heartbeat_period user variable
400
398
  */
401
 
  uint64_t heartbeat_period= get_heartbeat_period(session);
 
399
  uint64_t heartbeat_period= get_heartbeat_period(thd);
402
400
  struct timespec heartbeat_buf;
403
401
  struct event_coordinates coord_buf;
404
402
  struct timespec *heartbeat_ts= NULL;
405
403
  struct event_coordinates *coord= NULL;
406
 
  if (heartbeat_period != 0L)
 
404
  if (heartbeat_period != 0LL)
407
405
  {
408
406
    heartbeat_ts= &heartbeat_buf;
409
407
    set_timespec_nsec(*heartbeat_ts, 0);
412
410
    coord->pos= pos;
413
411
  }
414
412
 
415
 
  if (!drizzle_bin_log.is_open())
 
413
  if (!mysql_bin_log.is_open())
416
414
  {
417
415
    errmsg = "Binary log is not open";
418
416
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
427
425
 
428
426
  name=search_file_name;
429
427
  if (log_ident[0])
430
 
    drizzle_bin_log.make_log_name(search_file_name, log_ident);
 
428
    mysql_bin_log.make_log_name(search_file_name, log_ident);
431
429
  else
432
430
    name=0;                                     // Find first log
433
431
 
434
432
  linfo.index_file_offset = 0;
435
433
 
436
 
  if (drizzle_bin_log.find_log_pos(&linfo, name, 1))
 
434
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
437
435
  {
438
436
    errmsg = "Could not find first log file name in binary log index file";
439
437
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
441
439
  }
442
440
 
443
441
  pthread_mutex_lock(&LOCK_thread_count);
444
 
  session->current_linfo = &linfo;
 
442
  thd->current_linfo = &linfo;
445
443
  pthread_mutex_unlock(&LOCK_thread_count);
446
444
 
447
445
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
504
502
  packet->set("\0", 1, &my_charset_bin);
505
503
  /*
506
504
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
507
 
    this larger than the corresponding packet (query) sent
 
505
    this larger than the corresponding packet (query) sent 
508
506
    from client to master.
509
507
  */
510
 
  session->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
508
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
511
509
 
512
510
  /*
513
511
    We can set log_lock now, it does not move (it's a member of
514
 
    drizzle_bin_log, and it's already inited, and it will be destroyed
 
512
    mysql_bin_log, and it's already inited, and it will be destroyed
515
513
    only at shutdown).
516
514
  */
517
 
  log_lock = drizzle_bin_log.get_log_lock();
 
515
  log_lock = mysql_bin_log.get_log_lock();
518
516
  if (pos > BIN_LOG_HEADER_SIZE)
519
517
  {
520
518
     /*
543
541
           to avoid destroying temp tables.
544
542
          */
545
543
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
546
 
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
 
544
                   ST_CREATED_OFFSET+1, (ulong) 0);
547
545
         /* send it */
548
 
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
546
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
549
547
         {
550
548
           errmsg = "Failed on my_net_write()";
551
549
           my_errno= ER_UNKNOWN_ERROR;
580
578
  /* seek to the requested position, to start the requested dump */
581
579
  my_b_seek(&log, pos);                 // Seek will done on next read
582
580
 
583
 
  while (!net->error && net->vio != 0 && !session->killed)
 
581
  while (!net->error && net->vio != 0 && !thd->killed)
584
582
  {
585
583
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
586
584
    {
599
597
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
600
598
        binlog_can_be_corrupted= false;
601
599
 
602
 
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
600
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
603
601
      {
604
602
        errmsg = "Failed on my_net_write()";
605
603
        my_errno= ER_UNKNOWN_ERROR;
608
606
 
609
607
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
610
608
      {
611
 
        if (send_file(session))
 
609
        if (send_file(thd))
612
610
        {
613
611
          errmsg = "failed in send_file()";
614
612
          my_errno= ER_UNKNOWN_ERROR;
635
633
      goto err;
636
634
 
637
635
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
638
 
        drizzle_bin_log.is_active(log_file_name))
 
636
        mysql_bin_log.is_active(log_file_name))
639
637
    {
640
638
      /*
641
639
        Block until there is more data in the log
680
678
        case LOG_READ_EOF:
681
679
        {
682
680
          int ret;
683
 
          if (session->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
681
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
684
682
          {
685
683
            pthread_mutex_unlock(log_lock);
686
684
            goto end;
687
685
          }
688
686
 
689
 
          do
 
687
          do 
690
688
          {
691
689
            if (coord)
692
690
            {
693
 
              assert(heartbeat_ts && heartbeat_period != 0L);
 
691
              assert(heartbeat_ts && heartbeat_period != 0LL);
694
692
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
695
693
            }
696
 
            ret= drizzle_bin_log.wait_for_update_bin_log(session, heartbeat_ts);
697
 
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
 
694
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
 
695
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
698
696
            if (ret == ETIMEDOUT || ret == ETIME)
699
697
            {
700
698
              if (send_heartbeat_event(net, packet, coord))
709
707
            {
710
708
              assert(ret == 0);
711
709
            }
712
 
          } while (ret != 0 && coord != NULL && !session->killed);
 
710
          } while (ret != 0 && coord != NULL && !thd->killed);
713
711
          pthread_mutex_unlock(log_lock);
714
 
        }
 
712
        }    
715
713
        break;
716
 
 
 
714
            
717
715
        default:
718
716
          pthread_mutex_unlock(log_lock);
719
717
          fatal_error = 1;
722
720
 
723
721
        if (read_packet)
724
722
        {
725
 
          session->set_proc_info("Sending binlog event to slave");
726
 
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
 
723
          thd_proc_info(thd, "Sending binlog event to slave");
 
724
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
727
725
          {
728
726
            errmsg = "Failed on my_net_write()";
729
727
            my_errno= ER_UNKNOWN_ERROR;
732
730
 
733
731
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
734
732
          {
735
 
            if (send_file(session))
 
733
            if (send_file(thd))
736
734
            {
737
735
              errmsg = "failed in send_file()";
738
736
              my_errno= ER_UNKNOWN_ERROR;
760
758
      bool loop_breaker = 0;
761
759
      /* need this to break out of the for loop from switch */
762
760
 
763
 
      session->set_proc_info("Finished reading one binlog; switching to next binlog");
764
 
      switch (drizzle_bin_log.find_next_log(&linfo, 1)) {
 
761
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
762
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
765
763
      case LOG_INFO_EOF:
766
764
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
767
765
        break;
807
805
  end_io_cache(&log);
808
806
  (void)my_close(file, MYF(MY_WME));
809
807
 
810
 
  my_eof(session);
811
 
  session->set_proc_info("Waiting to finalize termination");
 
808
  my_eof(thd);
 
809
  thd_proc_info(thd, "Waiting to finalize termination");
812
810
  pthread_mutex_lock(&LOCK_thread_count);
813
 
  session->current_linfo = 0;
 
811
  thd->current_linfo = 0;
814
812
  pthread_mutex_unlock(&LOCK_thread_count);
815
813
  return;
816
814
 
817
815
err:
818
 
  session->set_proc_info("Waiting to finalize termination");
 
816
  thd_proc_info(thd, "Waiting to finalize termination");
819
817
  end_io_cache(&log);
820
818
  /*
821
819
    Exclude  iteration through thread list
822
820
    this is needed for purge_logs() - it will iterate through
823
 
    thread list and update session->current_linfo->index_file_offset
 
821
    thread list and update thd->current_linfo->index_file_offset
824
822
    this mutex will make sure that it never tried to update our linfo
825
823
    after we return from this stack frame
826
824
  */
827
825
  pthread_mutex_lock(&LOCK_thread_count);
828
 
  session->current_linfo = 0;
 
826
  thd->current_linfo = 0;
829
827
  pthread_mutex_unlock(&LOCK_thread_count);
830
828
  if (file >= 0)
831
829
    (void) my_close(file, MYF(MY_WME));
834
832
  return;
835
833
}
836
834
 
837
 
int start_slave(Session* session , Master_info* mi,  bool net_report)
 
835
int start_slave(THD* thd , Master_info* mi,  bool net_report)
838
836
{
839
837
  int slave_errno= 0;
840
838
  int thread_mask;
848
846
    don't wan't to touch the other thread), so set the bit to 0 for the
849
847
    other thread
850
848
  */
851
 
  if (session->lex->slave_session_opt)
852
 
    thread_mask&= session->lex->slave_session_opt;
 
849
  if (thd->lex->slave_thd_opt)
 
850
    thread_mask&= thd->lex->slave_thd_opt;
853
851
  if (thread_mask) //some threads are stopped, start them
854
852
  {
855
 
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
853
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
 
854
                         thread_mask))
856
855
      slave_errno=ER_MASTER_INFO;
857
 
    else if (server_id_supplied && *mi->getHostname())
 
856
    else if (server_id_supplied && *mi->host)
858
857
    {
859
858
      /*
860
859
        If we will start SQL thread we will care about UNTIL options If
865
864
      {
866
865
        pthread_mutex_lock(&mi->rli.data_lock);
867
866
 
868
 
        if (session->lex->mi.pos)
 
867
        if (thd->lex->mi.pos)
869
868
        {
870
869
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
871
 
          mi->rli.until_log_pos= session->lex->mi.pos;
 
870
          mi->rli.until_log_pos= thd->lex->mi.pos;
872
871
          /*
873
 
             We don't check session->lex->mi.log_file_name for NULL here
 
872
             We don't check thd->lex->mi.log_file_name for NULL here
874
873
             since it is checked in sql_yacc.yy
875
874
          */
876
 
          strncpy(mi->rli.until_log_name, session->lex->mi.log_file_name,
 
875
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
877
876
                  sizeof(mi->rli.until_log_name)-1);
878
877
        }
879
 
        else if (session->lex->mi.relay_log_pos)
 
878
        else if (thd->lex->mi.relay_log_pos)
880
879
        {
881
880
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
882
 
          mi->rli.until_log_pos= session->lex->mi.relay_log_pos;
883
 
          strncpy(mi->rli.until_log_name, session->lex->mi.relay_log_name,
 
881
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
 
882
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
884
883
                  sizeof(mi->rli.until_log_name)-1);
885
884
        }
886
885
        else
912
911
 
913
912
          /* Issuing warning then started without --skip-slave-start */
914
913
          if (!opt_skip_slave_start)
915
 
            push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
914
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
916
915
                         ER_MISSING_SKIP_SLAVE,
917
916
                         ER(ER_MISSING_SKIP_SLAVE));
918
917
        }
919
918
 
920
919
        pthread_mutex_unlock(&mi->rli.data_lock);
921
920
      }
922
 
      else if (session->lex->mi.pos || session->lex->mi.relay_log_pos)
923
 
        push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
921
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
 
922
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
924
923
                     ER(ER_UNTIL_COND_IGNORED));
925
924
 
926
925
      if (!slave_errno)
936
935
  else
937
936
  {
938
937
    /* no error if all threads are already started, only a warning */
939
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
938
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
940
939
                 ER(ER_SLAVE_WAS_RUNNING));
941
940
  }
942
941
 
949
948
    return(1);
950
949
  }
951
950
  else if (net_report)
952
 
    my_ok(session);
 
951
    my_ok(thd);
953
952
 
954
953
  return(0);
955
954
}
956
955
 
957
956
 
958
 
int stop_slave(Session* session, Master_info* mi, bool net_report )
 
957
int stop_slave(THD* thd, Master_info* mi, bool net_report )
959
958
{
960
959
  int slave_errno;
961
 
  if (!session)
962
 
    session = current_session;
 
960
  if (!thd)
 
961
    thd = current_thd;
963
962
 
964
 
  session->set_proc_info("Killing slave");
 
963
  thd_proc_info(thd, "Killing slave");
965
964
  int thread_mask;
966
965
  lock_slave_threads(mi);
967
966
  // Get a mask of _running_ threads
972
971
    was stopped (as we don't wan't to touch the other thread), so set the
973
972
    bit to 0 for the other thread
974
973
  */
975
 
  if (session->lex->slave_session_opt)
976
 
    thread_mask &= session->lex->slave_session_opt;
 
974
  if (thd->lex->slave_thd_opt)
 
975
    thread_mask &= thd->lex->slave_thd_opt;
977
976
 
978
977
  if (thread_mask)
979
978
  {
984
983
  {
985
984
    //no error if both threads are already stopped, only a warning
986
985
    slave_errno= 0;
987
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
986
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
988
987
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
989
988
  }
990
989
  unlock_slave_threads(mi);
991
 
  session->set_proc_info(0);
 
990
  thd_proc_info(thd, 0);
992
991
 
993
992
  if (slave_errno)
994
993
  {
997
996
    return(1);
998
997
  }
999
998
  else if (net_report)
1000
 
    my_ok(session);
 
999
    my_ok(thd);
1001
1000
 
1002
1001
  return(0);
1003
1002
}
1008
1007
 
1009
1008
  SYNOPSIS
1010
1009
    reset_slave()
1011
 
    session                     Thread handler
 
1010
    thd                 Thread handler
1012
1011
    mi                  Master info for the slave
1013
1012
 
1014
1013
  RETURN
1017
1016
*/
1018
1017
 
1019
1018
 
1020
 
int reset_slave(Session *session, Master_info* mi)
 
1019
int reset_slave(THD *thd, Master_info* mi)
1021
1020
{
1022
1021
  struct stat stat_area;
1023
1022
  char fname[FN_REFLEN];
1024
1023
  int thread_mask= 0, error= 0;
1025
 
  uint32_t sql_errno=0;
 
1024
  uint sql_errno=0;
1026
1025
  const char* errmsg=0;
1027
1026
 
1028
1027
  lock_slave_threads(mi);
1034
1033
    goto err;
1035
1034
  }
1036
1035
 
 
1036
  ha_reset_slave(thd);
 
1037
 
1037
1038
  // delete relay logs, clear relay log coordinates
1038
 
  if ((error= purge_relay_logs(&mi->rli, session,
 
1039
  if ((error= purge_relay_logs(&mi->rli, thd,
1039
1040
                               1 /* just reset */,
1040
1041
                               &errmsg)))
1041
1042
    goto err;
1042
1043
 
1043
1044
  /* Clear master's log coordinates */
1044
 
  mi->reset();
 
1045
  init_master_log_pos(mi);
1045
1046
  /*
1046
1047
     Reset errors (the idea is that we forget about the
1047
1048
     old master).
1050
1051
  mi->rli.clear_until_condition();
1051
1052
 
1052
1053
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1053
 
  mi->end_master_info();
 
1054
  end_master_info(mi);
1054
1055
  // and delete these two files
1055
 
  fn_format(fname, master_info_file, drizzle_data_home, "", 4+32);
 
1056
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1056
1057
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1057
1058
  {
1058
1059
    error=1;
1059
1060
    goto err;
1060
1061
  }
1061
1062
  // delete relay_log_info_file
1062
 
  fn_format(fname, relay_log_info_file, drizzle_data_home, "", 4+32);
 
1063
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1063
1064
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1064
1065
  {
1065
1066
    error=1;
1093
1094
*/
1094
1095
 
1095
1096
 
1096
 
void kill_zombie_dump_threads(uint32_t slave_server_id)
 
1097
void kill_zombie_dump_threads(uint32 slave_server_id)
1097
1098
{
1098
1099
  pthread_mutex_lock(&LOCK_thread_count);
1099
 
  I_List_iterator<Session> it(threads);
1100
 
  Session *tmp;
 
1100
  I_List_iterator<THD> it(threads);
 
1101
  THD *tmp;
1101
1102
 
1102
1103
  while ((tmp=it++))
1103
1104
  {
1116
1117
      it will be slow because it will iterate through the list
1117
1118
      again. We just to do kill the thread ourselves.
1118
1119
    */
1119
 
    tmp->awake(Session::KILL_QUERY);
 
1120
    tmp->awake(THD::KILL_QUERY);
1120
1121
    pthread_mutex_unlock(&tmp->LOCK_delete);
1121
1122
  }
1122
1123
}
1123
1124
 
1124
1125
 
1125
 
bool change_master(Session* session, Master_info* mi)
 
1126
bool change_master(THD* thd, Master_info* mi)
1126
1127
{
1127
1128
  int thread_mask;
1128
1129
  const char* errmsg= 0;
1137
1138
    return(true);
1138
1139
  }
1139
1140
 
1140
 
  session->set_proc_info("Changing master");
1141
 
  LEX_MASTER_INFO* lex_mi= &session->lex->mi;
 
1141
  thd_proc_info(thd, "Changing master");
 
1142
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1142
1143
  // TODO: see if needs re-write
1143
 
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
1144
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
 
1145
                       thread_mask))
1144
1146
  {
1145
1147
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1146
1148
    unlock_slave_threads(mi);
1159
1161
  */
1160
1162
 
1161
1163
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1162
 
    mi->reset();
 
1164
  {
 
1165
    mi->master_log_name[0] = 0;
 
1166
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
 
1167
  }
1163
1168
 
1164
1169
  if (lex_mi->log_file_name)
1165
 
    mi->setLogName(lex_mi->log_file_name);
 
1170
    strmake(mi->master_log_name, lex_mi->log_file_name,
 
1171
            sizeof(mi->master_log_name)-1);
1166
1172
  if (lex_mi->pos)
1167
1173
  {
1168
 
    mi->setLogPosition(lex_mi->pos);
 
1174
    mi->master_log_pos= lex_mi->pos;
1169
1175
  }
1170
1176
 
1171
1177
  if (lex_mi->host)
1172
 
    mi->setHost(lex_mi->host, lex_mi->port);
 
1178
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1173
1179
  if (lex_mi->user)
1174
 
    mi->setUsername(lex_mi->user);
 
1180
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1175
1181
  if (lex_mi->password)
1176
 
    mi->setPassword(lex_mi->password);
 
1182
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
 
1183
  if (lex_mi->port)
 
1184
    mi->port = lex_mi->port;
1177
1185
  if (lex_mi->connect_retry)
1178
1186
    mi->connect_retry = lex_mi->connect_retry;
1179
1187
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1180
1188
    mi->heartbeat_period = lex_mi->heartbeat_period;
1181
1189
  else
1182
 
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1190
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1183
1191
                                      (slave_net_timeout/2.0));
1184
 
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
 
1192
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
 
1193
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1194
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1195
 
 
1196
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1197
    mi->ssl_verify_server_cert=
 
1198
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1199
 
 
1200
  if (lex_mi->ssl_ca)
 
1201
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
 
1202
  if (lex_mi->ssl_capath)
 
1203
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
 
1204
  if (lex_mi->ssl_cert)
 
1205
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
 
1206
  if (lex_mi->ssl_cipher)
 
1207
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
 
1208
  if (lex_mi->ssl_key)
 
1209
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
 
1210
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
 
1211
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
 
1212
      lex_mi->ssl_verify_server_cert )
 
1213
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
1214
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1185
1215
 
1186
1216
  if (lex_mi->relay_log_name)
1187
1217
  {
1188
1218
    need_relay_log_purge= 0;
1189
 
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
 
1219
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
 
1220
            sizeof(mi->rli.group_relay_log_name)-1);
 
1221
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
 
1222
            sizeof(mi->rli.event_relay_log_name)-1);
1190
1223
  }
1191
1224
 
1192
1225
  if (lex_mi->relay_log_pos)
1215
1248
   {
1216
1249
     /*
1217
1250
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1218
 
       not initialized), so we use a cmax().
 
1251
       not initialized), so we use a max().
1219
1252
       What happens to mi->rli.master_log_pos during the initialization stages
1220
1253
       of replication is not 100% clear, so we guard against problems using
1221
 
       cmax().
 
1254
       max().
1222
1255
      */
1223
 
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1224
 
                         ? BIN_LOG_HEADER_SIZE
1225
 
                         : mi->rli.group_master_log_pos));
1226
 
     mi->setLogName(mi->rli.group_master_log_name.c_str());
 
1256
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
 
1257
                              mi->rli.group_master_log_pos);
 
1258
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
 
1259
             sizeof(mi->master_log_name)-1);
1227
1260
  }
1228
1261
  /*
1229
1262
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1230
1263
    a slave before).
1231
1264
  */
1232
 
  if (mi->flush())
 
1265
  if (flush_master_info(mi, 0))
1233
1266
  {
1234
1267
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1235
1268
    unlock_slave_threads(mi);
1238
1271
  if (need_relay_log_purge)
1239
1272
  {
1240
1273
    relay_log_purge= 1;
1241
 
    session->set_proc_info("Purging old relay logs");
1242
 
    if (purge_relay_logs(&mi->rli, session,
 
1274
    thd_proc_info(thd, "Purging old relay logs");
 
1275
    if (purge_relay_logs(&mi->rli, thd,
1243
1276
                         0 /* not only reset, but also reinit */,
1244
1277
                         &errmsg))
1245
1278
    {
1254
1287
    relay_log_purge= 0;
1255
1288
    /* Relay log is already initialized */
1256
1289
    if (init_relay_log_pos(&mi->rli,
1257
 
                           mi->rli.group_relay_log_name.c_str(),
 
1290
                           mi->rli.group_relay_log_name,
1258
1291
                           mi->rli.group_relay_log_pos,
1259
1292
                           0 /*no data lock*/,
1260
1293
                           &msg, 0))
1274
1307
    ''/0: we have lost all copies of the original good coordinates.
1275
1308
    That's why we always save good coords in rli.
1276
1309
  */
1277
 
  mi->rli.group_master_log_pos= mi->getLogPosition();
1278
 
  mi->rli.group_master_log_name.assign(mi->getLogName());
 
1310
  mi->rli.group_master_log_pos= mi->master_log_pos;
 
1311
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
 
1312
          sizeof(mi->rli.group_master_log_name)-1);
1279
1313
 
1280
 
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
1281
 
    mi->rli.group_master_log_pos= 0;
 
1314
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
 
1315
    mi->rli.group_master_log_pos=0;
1282
1316
 
1283
1317
  pthread_mutex_lock(&mi->rli.data_lock);
1284
1318
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1297
1331
  pthread_mutex_unlock(&mi->rli.data_lock);
1298
1332
 
1299
1333
  unlock_slave_threads(mi);
1300
 
  session->set_proc_info(0);
1301
 
  my_ok(session);
 
1334
  thd_proc_info(thd, 0);
 
1335
  my_ok(thd);
1302
1336
  return(false);
1303
1337
}
1304
1338
 
1305
 
int reset_master(Session* session)
 
1339
int reset_master(THD* thd)
1306
1340
{
1307
 
  if (!drizzle_bin_log.is_open())
 
1341
  if (!mysql_bin_log.is_open())
1308
1342
  {
1309
1343
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1310
1344
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1311
1345
    return 1;
1312
1346
  }
1313
 
  return drizzle_bin_log.reset_logs(session);
 
1347
  return mysql_bin_log.reset_logs(thd);
1314
1348
}
1315
1349
 
1316
1350
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1317
1351
                   const char* log_file_name2, uint64_t log_pos2)
1318
1352
{
1319
1353
  int res;
1320
 
  uint32_t log_file_name1_len=  strlen(log_file_name1);
1321
 
  uint32_t log_file_name2_len=  strlen(log_file_name2);
 
1354
  uint log_file_name1_len=  strlen(log_file_name1);
 
1355
  uint log_file_name2_len=  strlen(log_file_name2);
1322
1356
 
1323
1357
  //  We assume that both log names match up to '.'
1324
1358
  if (log_file_name1_len == log_file_name2_len)
1331
1365
}
1332
1366
 
1333
1367
 
1334
 
bool show_binlog_info(Session* session)
1335
 
{
1336
 
  Protocol *protocol= session->protocol;
 
1368
bool mysql_show_binlog_events(THD* thd)
 
1369
{
 
1370
  Protocol *protocol= thd->protocol;
 
1371
  List<Item> field_list;
 
1372
  const char *errmsg= 0;
 
1373
  bool ret= true;
 
1374
  IO_CACHE log;
 
1375
  File file= -1;
 
1376
 
 
1377
  Log_event::init_show_field_list(&field_list);
 
1378
  if (protocol->send_fields(&field_list,
 
1379
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1380
    return(true);
 
1381
 
 
1382
  Format_description_log_event *description_event= new
 
1383
    Format_description_log_event(3); /* MySQL 4.0 by default */
 
1384
 
 
1385
  /*
 
1386
    Wait for handlers to insert any pending information
 
1387
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
 
1388
    this is needed so that the uses sees all its own commands in the binlog
 
1389
  */
 
1390
  ha_binlog_wait(thd);
 
1391
 
 
1392
  if (mysql_bin_log.is_open())
 
1393
  {
 
1394
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
 
1395
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
 
1396
    ha_rows event_count, limit_start, limit_end;
 
1397
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1398
    char search_file_name[FN_REFLEN], *name;
 
1399
    const char *log_file_name = lex_mi->log_file_name;
 
1400
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
 
1401
    LOG_INFO linfo;
 
1402
    Log_event* ev;
 
1403
 
 
1404
    unit->set_limit(thd->lex->current_select);
 
1405
    limit_start= unit->offset_limit_cnt;
 
1406
    limit_end= unit->select_limit_cnt;
 
1407
 
 
1408
    name= search_file_name;
 
1409
    if (log_file_name)
 
1410
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
 
1411
    else
 
1412
      name=0;                                   // Find first log
 
1413
 
 
1414
    linfo.index_file_offset = 0;
 
1415
 
 
1416
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
1417
    {
 
1418
      errmsg = "Could not find target log";
 
1419
      goto err;
 
1420
    }
 
1421
 
 
1422
    pthread_mutex_lock(&LOCK_thread_count);
 
1423
    thd->current_linfo = &linfo;
 
1424
    pthread_mutex_unlock(&LOCK_thread_count);
 
1425
 
 
1426
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
 
1427
      goto err;
 
1428
 
 
1429
    /*
 
1430
      to account binlog event header size
 
1431
    */
 
1432
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
 
1433
 
 
1434
    pthread_mutex_lock(log_lock);
 
1435
 
 
1436
    /*
 
1437
      open_binlog() sought to position 4.
 
1438
      Read the first event in case it's a Format_description_log_event, to
 
1439
      know the format. If there's no such event, we are 3.23 or 4.x. This
 
1440
      code, like before, can't read 3.23 binlogs.
 
1441
      This code will fail on a mixed relay log (one which has Format_desc then
 
1442
      Rotate then Format_desc).
 
1443
    */
 
1444
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
 
1445
    if (ev)
 
1446
    {
 
1447
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
1448
      {
 
1449
        delete description_event;
 
1450
        description_event= (Format_description_log_event*) ev;
 
1451
      }
 
1452
      else
 
1453
        delete ev;
 
1454
    }
 
1455
 
 
1456
    my_b_seek(&log, pos);
 
1457
 
 
1458
    if (!description_event->is_valid())
 
1459
    {
 
1460
      errmsg="Invalid Format_description event; could be out of memory";
 
1461
      goto err;
 
1462
    }
 
1463
 
 
1464
    for (event_count = 0;
 
1465
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
 
1466
                                         description_event)); )
 
1467
    {
 
1468
      if (event_count >= limit_start &&
 
1469
          ev->net_send(protocol, linfo.log_file_name, pos))
 
1470
      {
 
1471
        errmsg = "Net error";
 
1472
        delete ev;
 
1473
        pthread_mutex_unlock(log_lock);
 
1474
        goto err;
 
1475
      }
 
1476
 
 
1477
      pos = my_b_tell(&log);
 
1478
      delete ev;
 
1479
 
 
1480
      if (++event_count >= limit_end)
 
1481
        break;
 
1482
    }
 
1483
 
 
1484
    if (event_count < limit_end && log.error)
 
1485
    {
 
1486
      errmsg = "Wrong offset or I/O error";
 
1487
      pthread_mutex_unlock(log_lock);
 
1488
      goto err;
 
1489
    }
 
1490
 
 
1491
    pthread_mutex_unlock(log_lock);
 
1492
  }
 
1493
 
 
1494
  ret= false;
 
1495
 
 
1496
err:
 
1497
  delete description_event;
 
1498
  if (file >= 0)
 
1499
  {
 
1500
    end_io_cache(&log);
 
1501
    (void) my_close(file, MYF(MY_WME));
 
1502
  }
 
1503
 
 
1504
  if (errmsg)
 
1505
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
1506
             "SHOW BINLOG EVENTS", errmsg);
 
1507
  else
 
1508
    my_eof(thd);
 
1509
 
 
1510
  pthread_mutex_lock(&LOCK_thread_count);
 
1511
  thd->current_linfo = 0;
 
1512
  pthread_mutex_unlock(&LOCK_thread_count);
 
1513
  return(ret);
 
1514
}
 
1515
 
 
1516
 
 
1517
bool show_binlog_info(THD* thd)
 
1518
{
 
1519
  Protocol *protocol= thd->protocol;
1337
1520
  List<Item> field_list;
1338
1521
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1339
1522
  field_list.push_back(new Item_return_int("Position",20,
1340
 
                                           DRIZZLE_TYPE_LONGLONG));
 
1523
                                           MYSQL_TYPE_LONGLONG));
 
1524
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
 
1525
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1341
1526
 
1342
1527
  if (protocol->send_fields(&field_list,
1343
1528
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1344
1529
    return(true);
1345
1530
  protocol->prepare_for_resend();
1346
1531
 
1347
 
  if (drizzle_bin_log.is_open())
 
1532
  if (mysql_bin_log.is_open())
1348
1533
  {
1349
1534
    LOG_INFO li;
1350
 
    drizzle_bin_log.get_current_log(&li);
 
1535
    mysql_bin_log.get_current_log(&li);
1351
1536
    int dir_len = dirname_length(li.log_file_name);
1352
1537
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1353
1538
    protocol->store((uint64_t) li.pos);
 
1539
    protocol->store(binlog_filter->get_do_db());
 
1540
    protocol->store(binlog_filter->get_ignore_db());
1354
1541
    if (protocol->write())
1355
1542
      return(true);
1356
1543
  }
1357
 
  my_eof(session);
 
1544
  my_eof(thd);
1358
1545
  return(false);
1359
1546
}
1360
1547
 
1364
1551
 
1365
1552
  SYNOPSIS
1366
1553
    show_binlogs()
1367
 
    session             Thread specific variable
 
1554
    thd         Thread specific variable
1368
1555
 
1369
1556
  RETURN VALUES
1370
1557
    false OK
1371
1558
    true  error
1372
1559
*/
1373
1560
 
1374
 
bool show_binlogs(Session* session)
 
1561
bool show_binlogs(THD* thd)
1375
1562
{
1376
1563
  IO_CACHE *index_file;
1377
1564
  LOG_INFO cur;
1378
1565
  File file;
1379
1566
  char fname[FN_REFLEN];
1380
1567
  List<Item> field_list;
1381
 
  uint32_t length;
 
1568
  uint length;
1382
1569
  int cur_dir_len;
1383
 
  Protocol *protocol= session->protocol;
 
1570
  Protocol *protocol= thd->protocol;
1384
1571
 
1385
 
  if (!drizzle_bin_log.is_open())
 
1572
  if (!mysql_bin_log.is_open())
1386
1573
  {
1387
1574
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1388
1575
    return 1;
1390
1577
 
1391
1578
  field_list.push_back(new Item_empty_string("Log_name", 255));
1392
1579
  field_list.push_back(new Item_return_int("File_size", 20,
1393
 
                                           DRIZZLE_TYPE_LONGLONG));
 
1580
                                           MYSQL_TYPE_LONGLONG));
1394
1581
  if (protocol->send_fields(&field_list,
1395
1582
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1396
1583
    return(true);
1397
 
 
1398
 
  pthread_mutex_lock(drizzle_bin_log.get_log_lock());
1399
 
  drizzle_bin_log.lock_index();
1400
 
  index_file= drizzle_bin_log.get_index_file();
1401
 
 
1402
 
  drizzle_bin_log.raw_get_current_log(&cur); // dont take mutex
1403
 
  pthread_mutex_unlock(drizzle_bin_log.get_log_lock()); // lockdep, OK
1404
 
 
 
1584
  
 
1585
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
 
1586
  mysql_bin_log.lock_index();
 
1587
  index_file=mysql_bin_log.get_index_file();
 
1588
  
 
1589
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
 
1590
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
 
1591
  
1405
1592
  cur_dir_len= dirname_length(cur.log_file_name);
1406
1593
 
1407
1594
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1410
1597
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1411
1598
  {
1412
1599
    int dir_len;
1413
 
    off_t file_length= 0;                   // Length if open fails
 
1600
    uint64_t file_length= 0;                   // Length if open fails
1414
1601
    fname[--length] = '\0';                     // remove the newline
1415
1602
 
1416
1603
    protocol->prepare_for_resend();
1423
1610
    else
1424
1611
    {
1425
1612
      /* this is an old log, open it and find the size */
1426
 
      if ((file= my_open(fname, O_RDONLY,
 
1613
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1427
1614
                         MYF(0))) >= 0)
1428
1615
      {
1429
 
        file_length= lseek(file, 0L, SEEK_END);
 
1616
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1430
1617
        my_close(file, MYF(0));
1431
1618
      }
1432
1619
    }
1434
1621
    if (protocol->write())
1435
1622
      goto err;
1436
1623
  }
1437
 
  drizzle_bin_log.unlock_index();
1438
 
  my_eof(session);
 
1624
  mysql_bin_log.unlock_index();
 
1625
  my_eof(thd);
1439
1626
  return(false);
1440
1627
 
1441
1628
err:
1442
 
  drizzle_bin_log.unlock_index();
 
1629
  mysql_bin_log.unlock_index();
1443
1630
  return(true);
1444
1631
}
1445
1632
 
1448
1635
   before a chunk of data is being read into the cache's buffer
1449
1636
   The fuction instantianates and writes into the binlog
1450
1637
   replication events along LOAD DATA processing.
1451
 
 
 
1638
   
1452
1639
   @param file  pointer to io-cache
1453
1640
   @return 0
1454
1641
*/
1455
1642
int log_loaded_block(IO_CACHE* file)
1456
1643
{
1457
1644
  LOAD_FILE_INFO *lf_info;
1458
 
  uint32_t block_len;
 
1645
  uint block_len;
1459
1646
  /* buffer contains position where we started last read */
1460
 
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1461
 
  uint32_t max_event_size= current_session->variables.max_allowed_packet;
 
1647
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
 
1648
  uint max_event_size= current_thd->variables.max_allowed_packet;
1462
1649
  lf_info= (LOAD_FILE_INFO*) file->arg;
1463
 
  if (true)
 
1650
  if (lf_info->thd->current_stmt_binlog_row_based)
1464
1651
    return(0);
1465
1652
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1466
1653
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1467
1654
    return(0);
1468
 
 
 
1655
  
1469
1656
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1470
 
       buffer += cmin(block_len, max_event_size),
1471
 
       block_len -= cmin(block_len, max_event_size))
 
1657
       buffer += min(block_len, max_event_size),
 
1658
       block_len -= min(block_len, max_event_size))
1472
1659
  {
1473
1660
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1474
1661
    if (lf_info->wrote_create_file)
1475
1662
    {
1476
 
      Append_block_log_event a(lf_info->session, lf_info->session->db, buffer,
1477
 
                               cmin(block_len, max_event_size),
 
1663
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
 
1664
                               min(block_len, max_event_size),
1478
1665
                               lf_info->log_delayed);
1479
 
      drizzle_bin_log.write(&a);
 
1666
      mysql_bin_log.write(&a);
1480
1667
    }
1481
1668
    else
1482
1669
    {
1483
 
      Begin_load_query_log_event b(lf_info->session, lf_info->session->db,
 
1670
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1484
1671
                                   buffer,
1485
 
                                   cmin(block_len, max_event_size),
 
1672
                                   min(block_len, max_event_size),
1486
1673
                                   lf_info->log_delayed);
1487
 
      drizzle_bin_log.write(&b);
 
1674
      mysql_bin_log.write(&b);
1488
1675
      lf_info->wrote_create_file= 1;
1489
1676
    }
1490
1677
  }
1501
1688
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1502
1689
    :sys_var(name_arg)
1503
1690
  { chain_sys_var(chain); }
1504
 
  bool check(Session *session, set_var *var);
1505
 
  bool update(Session *session, set_var *var);
 
1691
  bool check(THD *thd, set_var *var);
 
1692
  bool update(THD *thd, set_var *var);
1506
1693
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1507
1694
  /*
1508
1695
    We can't retrieve the value of this, so we don't have to define
1514
1701
{
1515
1702
public:
1516
1703
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1517
 
                             uint64_t *value_ptr)
1518
 
    :sys_var_long_ptr(chain, name_arg, value_ptr) {}
1519
 
  bool update(Session *session, set_var *var);
 
1704
                             ulong *value_ptr)
 
1705
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
 
1706
  bool update(THD *thd, set_var *var);
1520
1707
};
1521
1708
 
1522
 
static void fix_slave_net_timeout(Session *session,
1523
 
                                  enum_var_type type __attribute__((unused)))
 
1709
static void fix_slave_net_timeout(THD *thd,
 
1710
                                  enum_var_type type __attribute__((__unused__)))
1524
1711
{
 
1712
#ifdef HAVE_REPLICATION
1525
1713
  pthread_mutex_lock(&LOCK_active_mi);
1526
1714
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1527
 
    push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1715
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1528
1716
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1529
1717
                        "The currect value for master_heartbeat_period"
1530
1718
                        " exceeds the new value of `slave_net_timeout' sec."
1531
1719
                        " A sensible value for the period should be"
1532
1720
                        " less than the timeout.");
1533
1721
  pthread_mutex_unlock(&LOCK_active_mi);
 
1722
#endif
1534
1723
  return;
1535
1724
}
1536
1725
 
1541
1730
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1542
1731
                                              &slave_net_timeout,
1543
1732
                                              fix_slave_net_timeout);
1544
 
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries", &slave_trans_retries);
 
1733
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
 
1734
                                                &slave_trans_retries);
1545
1735
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1546
1736
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1547
1737
 
1548
 
static int show_slave_skip_errors(Session *session, SHOW_VAR *var, char *buff);
1549
 
 
1550
 
 
1551
 
static int show_slave_skip_errors(Session *session __attribute__((unused)),
 
1738
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
 
1739
 
 
1740
 
 
1741
static int show_slave_skip_errors(THD *thd __attribute__((__unused__)),
1552
1742
                                  SHOW_VAR *var, char *buff)
1553
1743
{
1554
1744
  var->type=SHOW_CHAR;
1580
1770
    if (var->value != buff)
1581
1771
      buff--;                           // Remove last ','
1582
1772
    if (i < MAX_SLAVE_ERROR)
1583
 
      buff= strcpy(buff, "...")+3;  // Couldn't show all errors
 
1773
      buff= strmov(buff, "...");  // Couldn't show all errors
1584
1774
    *buff=0;
1585
1775
  }
1586
1776
  return 0;
1600
1790
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1601
1791
};
1602
1792
 
1603
 
bool sys_var_slave_skip_counter::check(Session *session __attribute__((unused)),
 
1793
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((__unused__)),
1604
1794
                                       set_var *var)
1605
1795
{
1606
1796
  int result= 0;
1613
1803
  }
1614
1804
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1615
1805
  pthread_mutex_unlock(&LOCK_active_mi);
1616
 
  var->save_result.uint32_t_value= (uint32_t) var->value->val_int();
 
1806
  var->save_result.ulong_value= (ulong) var->value->val_int();
1617
1807
  return result;
1618
1808
}
1619
1809
 
1620
1810
 
1621
 
bool sys_var_slave_skip_counter::update(Session *session __attribute__((unused)),
 
1811
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((__unused__)),
1622
1812
                                        set_var *var)
1623
1813
{
1624
1814
  pthread_mutex_lock(&LOCK_active_mi);
1631
1821
  if (!active_mi->rli.slave_running)
1632
1822
  {
1633
1823
    pthread_mutex_lock(&active_mi->rli.data_lock);
1634
 
    active_mi->rli.slave_skip_counter= var->save_result.uint32_t_value;
 
1824
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1635
1825
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1636
1826
  }
1637
1827
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1640
1830
}
1641
1831
 
1642
1832
 
1643
 
bool sys_var_sync_binlog_period::update(Session *session __attribute__((unused)),
 
1833
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((__unused__)),
1644
1834
                                        set_var *var)
1645
1835
{
1646
 
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
 
1836
  sync_binlog_period= (ulong) var->save_result.uint64_t_value;
1647
1837
  return 0;
1648
1838
}
1649
1839
 
1659
1849
  }
1660
1850
  return 0;
1661
1851
}
 
1852
 
 
1853
#endif /* HAVE_REPLICATION */
 
1854
 
 
1855