~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/slave.cc

  • Committer: Brian Aker
  • Date: 2008-07-10 19:37:55 UTC
  • mfrom: (51.1.67 remove-dbug)
  • Revision ID: brian@tangent.org-20080710193755-f5g761uieqa3wxmt
Merge.

Show diffs side-by-side

added added

removed removed

Lines of Context:
165
165
{
166
166
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
167
167
  register int tmp_mask=0;
168
 
  DBUG_ENTER("init_thread_mask");
169
168
 
170
169
  if (set_io)
171
170
    tmp_mask |= SLAVE_IO;
174
173
  if (inverse)
175
174
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
176
175
  *mask = tmp_mask;
177
 
  DBUG_VOID_RETURN;
 
176
  return;
178
177
}
179
178
 
180
179
 
184
183
 
185
184
void lock_slave_threads(Master_info* mi)
186
185
{
187
 
  DBUG_ENTER("lock_slave_threads");
188
 
 
189
186
  //TODO: see if we can do this without dual mutex
190
187
  pthread_mutex_lock(&mi->run_lock);
191
188
  pthread_mutex_lock(&mi->rli.run_lock);
192
 
  DBUG_VOID_RETURN;
 
189
  return;
193
190
}
194
191
 
195
192
 
199
196
 
200
197
void unlock_slave_threads(Master_info* mi)
201
198
{
202
 
  DBUG_ENTER("unlock_slave_threads");
203
 
 
204
199
  //TODO: see if we can do this without dual mutex
205
200
  pthread_mutex_unlock(&mi->rli.run_lock);
206
201
  pthread_mutex_unlock(&mi->run_lock);
207
 
  DBUG_VOID_RETURN;
 
202
  return;
208
203
}
209
204
 
210
205
 
212
207
 
213
208
int init_slave()
214
209
{
215
 
  DBUG_ENTER("init_slave");
216
 
 
217
210
  /*
218
211
    This is called when mysqld starts. Before client connections are
219
212
    accepted. However bootstrap may conflict with us if it does START SLAVE.
260
253
    }
261
254
  }
262
255
  pthread_mutex_unlock(&LOCK_active_mi);
263
 
  DBUG_RETURN(0);
 
256
  return(0);
264
257
 
265
258
err:
266
259
  pthread_mutex_unlock(&LOCK_active_mi);
267
 
  DBUG_RETURN(1);
 
260
  return(1);
268
261
}
269
262
 
270
263
 
282
275
void init_slave_skip_errors(const char* arg)
283
276
{
284
277
  const char *p;
285
 
  DBUG_ENTER("init_slave_skip_errors");
286
278
 
287
279
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
288
280
  {
295
287
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
296
288
  {
297
289
    bitmap_set_all(&slave_error_mask);
298
 
    DBUG_VOID_RETURN;
 
290
    return;
299
291
  }
300
292
  for (p= arg ; *p; )
301
293
  {
307
299
    while (!my_isdigit(system_charset_info,*p) && *p)
308
300
      p++;
309
301
  }
310
 
  DBUG_VOID_RETURN;
 
302
  return;
311
303
}
312
304
 
313
305
 
314
306
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
315
307
{
316
 
  DBUG_ENTER("terminate_slave_threads");
317
 
 
318
308
  if (!mi->inited)
319
 
    DBUG_RETURN(0); /* successfully do nothing */
 
309
    return(0); /* successfully do nothing */
320
310
  int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
321
311
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
322
312
 
323
313
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
324
314
  {
325
 
    DBUG_PRINT("info",("Terminating IO thread"));
326
315
    mi->abort_slave=1;
327
316
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
328
317
                                      &mi->stop_cond,
329
318
                                      &mi->slave_running,
330
319
                                      skip_lock)) &&
331
320
        !force_all)
332
 
      DBUG_RETURN(error);
 
321
      return(error);
333
322
  }
334
323
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
335
324
  {
336
 
    DBUG_PRINT("info",("Terminating SQL thread"));
337
325
    mi->rli.abort_slave=1;
338
326
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
339
327
                                      &mi->rli.stop_cond,
340
328
                                      &mi->rli.slave_running,
341
329
                                      skip_lock)) &&
342
330
        !force_all)
343
 
      DBUG_RETURN(error);
 
331
      return(error);
344
332
  }
345
 
  DBUG_RETURN(0);
 
333
  return(0);
346
334
}
347
335
 
348
336
 
384
372
{
385
373
  int error;
386
374
 
387
 
  DBUG_ENTER("terminate_slave_thread");
388
 
 
389
375
  if (!skip_lock)
390
376
    pthread_mutex_lock(term_lock);
391
377
 
395
381
  {
396
382
    if (!skip_lock)
397
383
      pthread_mutex_unlock(term_lock);
398
 
    DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
 
384
    return(ER_SLAVE_NOT_RUNNING);
399
385
  }
400
 
  DBUG_ASSERT(thd != 0);
 
386
  assert(thd != 0);
401
387
  THD_CHECK_SENTRY(thd);
402
388
 
403
389
  /*
407
393
 
408
394
  while (*slave_running)                        // Should always be true
409
395
  {
410
 
    DBUG_PRINT("loop", ("killing slave thread"));
411
 
 
412
396
    pthread_mutex_lock(&thd->LOCK_delete);
413
397
#ifndef DONT_USE_THR_ALARM
414
398
    /*
416
400
      EINVAL: invalid signal number (can't happen)
417
401
      ESRCH: thread already killed (can happen, should be ignored)
418
402
    */
419
 
    IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm);
420
 
    DBUG_ASSERT(err != EINVAL);
 
403
    int err= pthread_kill(thd->real_id, thr_client_alarm);
 
404
    assert(err != EINVAL);
421
405
#endif
422
406
    thd->awake(THD::NOT_KILLED);
423
407
    pthread_mutex_unlock(&thd->LOCK_delete);
429
413
    struct timespec abstime;
430
414
    set_timespec(abstime,2);
431
415
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
432
 
    DBUG_ASSERT(error == ETIMEDOUT || error == 0);
 
416
    assert(error == ETIMEDOUT || error == 0);
433
417
  }
434
418
 
435
 
  DBUG_ASSERT(*slave_running == 0);
 
419
  assert(*slave_running == 0);
436
420
 
437
421
  if (!skip_lock)
438
422
    pthread_mutex_unlock(term_lock);
439
 
  DBUG_RETURN(0);
 
423
  return(0);
440
424
}
441
425
 
442
426
 
450
434
{
451
435
  pthread_t th;
452
436
  ulong start_id;
453
 
  DBUG_ENTER("start_slave_thread");
454
437
 
455
 
  DBUG_ASSERT(mi->inited);
 
438
  assert(mi->inited);
456
439
 
457
440
  if (start_lock)
458
441
    pthread_mutex_lock(start_lock);
463
446
    if (start_lock)
464
447
      pthread_mutex_unlock(start_lock);
465
448
    sql_print_error("Server id not set, will not start slave");
466
 
    DBUG_RETURN(ER_BAD_SLAVE);
 
449
    return(ER_BAD_SLAVE);
467
450
  }
468
451
 
469
452
  if (*slave_running)
472
455
      pthread_cond_broadcast(start_cond);
473
456
    if (start_lock)
474
457
      pthread_mutex_unlock(start_lock);
475
 
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
 
458
    return(ER_SLAVE_MUST_STOP);
476
459
  }
477
460
  start_id= *slave_run_id;
478
 
  DBUG_PRINT("info",("Creating new slave thread"));
479
461
  if (high_priority)
480
462
  {
481
463
    struct sched_param tmp_sched_param;
488
470
  {
489
471
    if (start_lock)
490
472
      pthread_mutex_unlock(start_lock);
491
 
    DBUG_RETURN(ER_SLAVE_THREAD);
 
473
    return(ER_SLAVE_THREAD);
492
474
  }
493
475
  if (start_cond && cond_lock) // caller has cond_lock
494
476
  {
495
477
    THD* thd = current_thd;
496
478
    while (start_id == *slave_run_id)
497
479
    {
498
 
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
499
480
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
500
481
                                            "Waiting for slave thread to start");
501
482
      pthread_cond_wait(start_cond,cond_lock);
502
483
      thd->exit_cond(old_msg);
503
484
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
504
485
      if (thd->killed)
505
 
        DBUG_RETURN(thd->killed_errno());
 
486
        return(thd->killed_errno());
506
487
    }
507
488
  }
508
489
  if (start_lock)
509
490
    pthread_mutex_unlock(start_lock);
510
 
  DBUG_RETURN(0);
 
491
  return(0);
511
492
}
512
493
 
513
494
 
529
510
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
530
511
  pthread_cond_t* cond_io=0,*cond_sql=0;
531
512
  int error=0;
532
 
  DBUG_ENTER("start_slave_threads");
533
513
 
534
514
  if (need_slave_mutex)
535
515
  {
558
538
    if (error)
559
539
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
560
540
  }
561
 
  DBUG_RETURN(error);
 
541
  return(error);
562
542
}
563
543
 
564
544
 
565
545
#ifdef NOT_USED_YET
566
546
static int end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
567
547
{
568
 
  DBUG_ENTER("end_slave_on_walk");
569
 
 
570
548
  end_master_info(mi);
571
 
  DBUG_RETURN(0);
 
549
  return(0);
572
550
}
573
551
#endif
574
552
 
582
560
 
583
561
void end_slave()
584
562
{
585
 
  DBUG_ENTER("end_slave");
586
 
 
587
563
  /*
588
564
    This is called when the server terminates, in close_connections().
589
565
    It terminates slave threads. However, some CHANGE MASTER etc may still be
605
581
    active_mi= 0;
606
582
  }
607
583
  pthread_mutex_unlock(&LOCK_active_mi);
608
 
  DBUG_VOID_RETURN;
 
584
  return;
609
585
}
610
586
 
611
587
 
612
588
static bool io_slave_killed(THD* thd, Master_info* mi)
613
589
{
614
 
  DBUG_ENTER("io_slave_killed");
615
 
 
616
 
  DBUG_ASSERT(mi->io_thd == thd);
617
 
  DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
618
 
  DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
 
590
  assert(mi->io_thd == thd);
 
591
  assert(mi->slave_running); // tracking buffer overrun
 
592
  return(mi->abort_slave || abort_loop || thd->killed);
619
593
}
620
594
 
621
595
 
622
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
623
597
{
624
 
  DBUG_ENTER("sql_slave_killed");
625
 
 
626
 
  DBUG_ASSERT(rli->sql_thd == thd);
627
 
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
 
598
  assert(rli->sql_thd == thd);
 
599
  assert(rli->slave_running == 1);// tracking buffer overrun
628
600
  if (abort_loop || thd->killed || rli->abort_slave)
629
601
  {
630
602
    /*
637
609
      is actively working.
638
610
    */
639
611
    if (rli->last_event_start_time == 0)
640
 
      DBUG_RETURN(1);
641
 
    DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
642
 
                        "it some grace period"));
 
612
      return(1);
643
613
    if (difftime(time(0), rli->last_event_start_time) > 60)
644
614
    {
645
615
      rli->report(ERROR_LEVEL, 0,
649
619
                  "There is a risk of duplicate updates when the slave "
650
620
                  "SQL thread is restarted. Please check your tables' "
651
621
                  "contents after restart.");
652
 
      DBUG_RETURN(1);
 
622
      return(1);
653
623
    }
654
624
  }
655
 
  DBUG_RETURN(0);
 
625
  return(0);
656
626
}
657
627
 
658
628
 
665
635
 
666
636
void skip_load_data_infile(NET *net)
667
637
{
668
 
  DBUG_ENTER("skip_load_data_infile");
669
 
 
670
638
  (void)net_request_file(net, "/dev/null");
671
639
  (void)my_net_read(net);                               // discard response
672
640
  (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
673
 
  DBUG_VOID_RETURN;
 
641
  return;
674
642
}
675
643
 
676
644
 
677
645
bool net_request_file(NET* net, const char* fname)
678
646
{
679
 
  DBUG_ENTER("net_request_file");
680
 
  DBUG_RETURN(net_write_command(net, 251, (uchar*) fname, strlen(fname),
 
647
  return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
681
648
                                (uchar*) "", 0));
682
649
}
683
650
 
690
657
 
691
658
const char *print_slave_db_safe(const char* db)
692
659
{
693
 
  DBUG_ENTER("*print_slave_db_safe");
694
 
 
695
 
  DBUG_RETURN((db ? db : ""));
 
660
  return((db ? db : ""));
696
661
}
697
662
 
698
663
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
699
664
                                 const char *default_val)
700
665
{
701
666
  uint length;
702
 
  DBUG_ENTER("init_strvar_from_file");
703
667
 
704
668
  if ((length=my_b_gets(f,var, max_size)))
705
669
  {
715
679
      int c;
716
680
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
717
681
    }
718
 
    DBUG_RETURN(0);
 
682
    return(0);
719
683
  }
720
684
  else if (default_val)
721
685
  {
722
686
    strmake(var,  default_val, max_size-1);
723
 
    DBUG_RETURN(0);
 
687
    return(0);
724
688
  }
725
 
  DBUG_RETURN(1);
 
689
  return(1);
726
690
}
727
691
 
728
692
 
729
693
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
730
694
{
731
695
  char buf[32];
732
 
  DBUG_ENTER("init_intvar_from_file");
733
696
 
734
697
 
735
698
  if (my_b_gets(f, buf, sizeof(buf)))
736
699
  {
737
700
    *var = atoi(buf);
738
 
    DBUG_RETURN(0);
 
701
    return(0);
739
702
  }
740
703
  else if (default_val)
741
704
  {
742
705
    *var = default_val;
743
 
    DBUG_RETURN(0);
 
706
    return(0);
744
707
  }
745
 
  DBUG_RETURN(1);
 
708
  return(1);
746
709
}
747
710
 
748
711
int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
749
712
{
750
713
  char buf[16];
751
 
  DBUG_ENTER("init_floatvar_from_file");
752
714
 
753
715
 
754
716
  if (my_b_gets(f, buf, sizeof(buf)))
755
717
  {
756
718
    if (sscanf(buf, "%f", var) != 1)
757
 
      DBUG_RETURN(1);
 
719
      return(1);
758
720
    else
759
 
      DBUG_RETURN(0);
 
721
      return(0);
760
722
  }
761
723
  else if (default_val != 0.0)
762
724
  {
763
725
    *var = default_val;
764
 
    DBUG_RETURN(0);
 
726
    return(0);
765
727
  }
766
 
  DBUG_RETURN(1);
 
728
  return(1);
767
729
}
768
730
 
769
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
772
734
  {
773
735
    if (info && global_system_variables.log_warnings)
774
736
      sql_print_information(info);
775
 
    return TRUE;
 
737
    return true;
776
738
  }
777
 
  return FALSE;
 
739
  return false;
778
740
}
779
741
 
780
742
 
801
763
  int err_code= 0;
802
764
  MYSQL_RES *master_res= 0;
803
765
  MYSQL_ROW master_row;
804
 
  DBUG_ENTER("get_master_version_and_clock");
805
766
 
806
767
  err_msg.length(0);
807
768
  /*
1043
1004
  if (err_msg.length() != 0)
1044
1005
  {
1045
1006
    sql_print_error(err_msg.ptr());
1046
 
    DBUG_ASSERT(err_code != 0);
 
1007
    assert(err_code != 0);
1047
1008
    mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
1048
 
    DBUG_RETURN(1);
 
1009
    return(1);
1049
1010
  }
1050
1011
 
1051
 
  DBUG_RETURN(0);
 
1012
  return(0);
1052
1013
}
1053
1014
 
1054
1015
 
1058
1019
  Master_info* mi = rli->mi;
1059
1020
  const char *save_proc_info;
1060
1021
  THD* thd = mi->io_thd;
1061
 
  DBUG_ENTER("wait_for_relay_log_space");
1062
1022
 
1063
1023
  pthread_mutex_lock(&rli->log_space_lock);
1064
1024
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
1070
1030
         !rli->ignore_log_space_limit)
1071
1031
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1072
1032
  thd->exit_cond(save_proc_info);
1073
 
  DBUG_RETURN(slave_killed);
 
1033
  return(slave_killed);
1074
1034
}
1075
1035
 
1076
1036
 
1092
1052
{
1093
1053
  Relay_log_info *rli= &mi->rli;
1094
1054
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1095
 
  DBUG_ENTER("write_ignored_events_info_to_relay_log");
1096
1055
 
1097
 
  DBUG_ASSERT(thd == mi->io_thd);
 
1056
  assert(thd == mi->io_thd);
1098
1057
  pthread_mutex_lock(log_lock);
1099
1058
  if (rli->ign_master_log_name_end[0])
1100
1059
  {
1101
 
    DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
1102
1060
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1103
1061
                                               0, rli->ign_master_log_pos_end,
1104
1062
                                               Rotate_log_event::DUP_NAME);
1127
1085
  }
1128
1086
  else
1129
1087
    pthread_mutex_unlock(log_lock);
1130
 
  DBUG_VOID_RETURN;
 
1088
  return;
1131
1089
}
1132
1090
 
1133
1091
 
1136
1094
{
1137
1095
  uchar buf[1024], *pos= buf;
1138
1096
  uint report_host_len, report_user_len=0, report_password_len=0;
1139
 
  DBUG_ENTER("register_slave_on_master");
1140
1097
 
1141
 
  *suppress_warnings= FALSE;
 
1098
  *suppress_warnings= false;
1142
1099
  if (!report_host)
1143
 
    DBUG_RETURN(0);
 
1100
    return(0);
1144
1101
  report_host_len= strlen(report_host);
1145
1102
  if (report_user)
1146
1103
    report_user_len= strlen(report_user);
1149
1106
  /* 30 is a good safety margin */
1150
1107
  if (report_host_len + report_user_len + report_password_len + 30 >
1151
1108
      sizeof(buf))
1152
 
    DBUG_RETURN(0);                                     // safety
 
1109
    return(0);                                     // safety
1153
1110
 
1154
1111
  int4store(pos, server_id); pos+= 4;
1155
1112
  pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1164
1121
  {
1165
1122
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1166
1123
    {
1167
 
      *suppress_warnings= TRUE;                 // Suppress reconnect warning
 
1124
      *suppress_warnings= true;                 // Suppress reconnect warning
1168
1125
    }
1169
1126
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1170
1127
    {
1174
1131
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1175
1132
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1176
1133
    }
1177
 
    DBUG_RETURN(1);
 
1134
    return(1);
1178
1135
  }
1179
 
  DBUG_RETURN(0);
 
1136
  return(0);
1180
1137
}
1181
1138
 
1182
1139
 
1185
1142
  // TODO: fix this for multi-master
1186
1143
  List<Item> field_list;
1187
1144
  Protocol *protocol= thd->protocol;
1188
 
  DBUG_ENTER("show_master_info");
1189
1145
 
1190
1146
  field_list.push_back(new Item_empty_string("Slave_IO_State",
1191
1147
                                                     14));
1250
1206
 
1251
1207
  if (protocol->send_fields(&field_list,
1252
1208
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1253
 
    DBUG_RETURN(TRUE);
 
1209
    return(true);
1254
1210
 
1255
1211
  if (mi->host[0])
1256
1212
  {
1257
 
    DBUG_PRINT("info",("host is set: '%s'", mi->host));
1258
1213
    String *packet= &thd->packet;
1259
1214
    protocol->prepare_for_resend();
1260
1215
 
1367
1322
    pthread_mutex_unlock(&mi->data_lock);
1368
1323
 
1369
1324
    if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1370
 
      DBUG_RETURN(TRUE);
 
1325
      return(true);
1371
1326
  }
1372
1327
  my_eof(thd);
1373
 
  DBUG_RETURN(FALSE);
 
1328
  return(false);
1374
1329
}
1375
1330
 
1376
1331
 
1377
1332
void set_slave_thread_options(THD* thd)
1378
1333
{
1379
 
  DBUG_ENTER("set_slave_thread_options");
1380
1334
  /*
1381
1335
     It's nonsense to constrain the slave threads with max_join_size; if a
1382
1336
     query succeeded on master, we HAVE to execute it. So set
1393
1347
    options&= ~OPTION_BIN_LOG;
1394
1348
  thd->options= options;
1395
1349
  thd->variables.completion_type= 0;
1396
 
  DBUG_VOID_RETURN;
 
1350
  return;
1397
1351
}
1398
1352
 
1399
1353
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1400
1354
{
1401
 
  DBUG_ENTER("set_slave_thread_default_charset");
1402
 
 
1403
1355
  thd->variables.character_set_client=
1404
1356
    global_system_variables.character_set_client;
1405
1357
  thd->variables.collation_connection=
1415
1367
    effect.
1416
1368
   */
1417
1369
  const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1418
 
  DBUG_VOID_RETURN;
 
1370
  return;
1419
1371
}
1420
1372
 
1421
1373
/*
1424
1376
 
1425
1377
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1426
1378
{
1427
 
  DBUG_ENTER("init_slave_thread");
1428
 
#if !defined(DBUG_OFF)
1429
1379
  int simulate_error= 0;
1430
 
#endif
1431
1380
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1432
1381
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1433
1382
  thd->security_ctx->skip_grants();
1447
1396
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1448
1397
  pthread_mutex_unlock(&LOCK_thread_count);
1449
1398
 
1450
 
  DBUG_EXECUTE_IF("simulate_io_slave_error_on_init",
1451
 
                  simulate_error|= (1 << SLAVE_THD_IO););
1452
 
  DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init",
1453
 
                  simulate_error|= (1 << SLAVE_THD_SQL););
1454
 
#if !defined(DBUG_OFF)
 
1399
 simulate_error|= (1 << SLAVE_THD_IO);
 
1400
 simulate_error|= (1 << SLAVE_THD_SQL);
1455
1401
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1456
 
#else
1457
 
  if (init_thr_lock() || thd->store_globals())
1458
 
#endif
1459
1402
  {
1460
1403
    thd->cleanup();
1461
 
    DBUG_RETURN(-1);
 
1404
    return(-1);
1462
1405
  }
1463
1406
  lex_start(thd);
1464
1407
 
1468
1411
    thd_proc_info(thd, "Waiting for master update");
1469
1412
  thd->version=refresh_version;
1470
1413
  thd->set_time();
1471
 
  DBUG_RETURN(0);
 
1414
  return(0);
1472
1415
}
1473
1416
 
1474
1417
 
1477
1420
{
1478
1421
  int nap_time;
1479
1422
  thr_alarm_t alarmed;
1480
 
  DBUG_ENTER("safe_sleep");
1481
1423
 
1482
1424
  thr_alarm_init(&alarmed);
1483
1425
  time_t start_time= my_time(0);
1496
1438
    thr_end_alarm(&alarmed);
1497
1439
 
1498
1440
    if ((*thread_killed)(thd,thread_killed_arg))
1499
 
      DBUG_RETURN(1);
 
1441
      return(1);
1500
1442
    start_time= my_time(0);
1501
1443
  }
1502
 
  DBUG_RETURN(0);
 
1444
  return(0);
1503
1445
}
1504
1446
 
1505
1447
 
1510
1452
  int len;
1511
1453
  int binlog_flags = 0; // for now
1512
1454
  char* logname = mi->master_log_name;
1513
 
  DBUG_ENTER("request_dump");
1514
1455
  
1515
 
  *suppress_warnings= FALSE;
 
1456
  *suppress_warnings= false;
1516
1457
 
1517
1458
  // TODO if big log files: Change next to int8store()
1518
1459
  int4store(buf, (ulong) mi->master_log_pos);
1528
1469
      now we just fill up the error log :-)
1529
1470
    */
1530
1471
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1531
 
      *suppress_warnings= TRUE;                 // Suppress reconnect warning
 
1472
      *suppress_warnings= true;                 // Suppress reconnect warning
1532
1473
    else
1533
1474
      sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
1534
1475
                      mysql_errno(mysql), mysql_error(mysql),
1535
1476
                      mi->connect_retry);
1536
 
    DBUG_RETURN(1);
 
1477
    return(1);
1537
1478
  }
1538
1479
 
1539
 
  DBUG_RETURN(0);
 
1480
  return(0);
1540
1481
}
1541
1482
 
1542
1483
/*
1561
1502
                        bool* suppress_warnings)
1562
1503
{
1563
1504
  ulong len;
1564
 
  DBUG_ENTER("read_event");
1565
1505
 
1566
 
  *suppress_warnings= FALSE;
 
1506
  *suppress_warnings= false;
1567
1507
  /*
1568
1508
    my_real_read() will time us out
1569
1509
    We check if we were told to die, and if not, try reading again
1570
1510
  */
1571
 
#ifndef DBUG_OFF
1572
1511
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1573
 
    DBUG_RETURN(packet_error);
1574
 
#endif
 
1512
    return(packet_error);
1575
1513
 
1576
1514
  len = cli_safe_read(mysql);
1577
1515
  if (len == packet_error || (long) len < 1)
1583
1521
        we suppress prints to .err file as long as the reconnect
1584
1522
        happens without problems
1585
1523
      */
1586
 
      *suppress_warnings= TRUE;
 
1524
      *suppress_warnings= true;
1587
1525
    }
1588
1526
    else
1589
1527
      sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
1590
1528
                      mysql_error(mysql), mysql_errno(mysql));
1591
 
    DBUG_RETURN(packet_error);
 
1529
    return(packet_error);
1592
1530
  }
1593
1531
 
1594
1532
  /* Check if eof packet */
1597
1535
    sql_print_information("Slave: received end packet from server, apparent "
1598
1536
                          "master shutdown: %s",
1599
1537
                     mysql_error(mysql));
1600
 
     DBUG_RETURN(packet_error);
 
1538
     return(packet_error);
1601
1539
  }
1602
1540
 
1603
 
  DBUG_PRINT("exit", ("len: %lu  net->read_pos[4]: %d",
1604
 
                      len, mysql->net.read_pos[4]));
1605
 
  DBUG_RETURN(len - 1);
 
1541
  return(len - 1);
1606
1542
}
1607
1543
 
1608
1544
 
1610
1546
                         Relay_log_info const *rli __attribute__((__unused__)),
1611
1547
                         int expected_error)
1612
1548
{
1613
 
  DBUG_ENTER("check_expected_error");
1614
 
 
1615
1549
  switch (expected_error) {
1616
1550
  case ER_NET_READ_ERROR:
1617
1551
  case ER_NET_ERROR_ON_WRITE:
1618
1552
  case ER_QUERY_INTERRUPTED:
1619
1553
  case ER_SERVER_SHUTDOWN:
1620
1554
  case ER_NEW_ABORTING_CONNECTION:
1621
 
    DBUG_RETURN(1);
 
1555
    return(1);
1622
1556
  default:
1623
 
    DBUG_RETURN(0);
 
1557
    return(0);
1624
1558
  }
1625
1559
}
1626
1560
 
1634
1568
*/
1635
1569
static int has_temporary_error(THD *thd)
1636
1570
{
1637
 
  DBUG_ENTER("has_temporary_error");
1638
 
 
1639
1571
  if (thd->is_fatal_error)
1640
 
    DBUG_RETURN(0);
 
1572
    return(0);
1641
1573
 
1642
 
  DBUG_EXECUTE_IF("all_errors_are_temporary_errors",
1643
 
                  if (thd->main_da.is_error())
1644
 
                  {
1645
 
                    thd->clear_error();
1646
 
                    my_error(ER_LOCK_DEADLOCK, MYF(0));
1647
 
                  });
 
1574
  if (thd->main_da.is_error())
 
1575
  {
 
1576
    thd->clear_error();
 
1577
    my_error(ER_LOCK_DEADLOCK, MYF(0));
 
1578
  }
1648
1579
 
1649
1580
  /*
1650
1581
    If there is no message in THD, we can't say if it's a temporary
1652
1583
    which sets no message. Return FALSE.
1653
1584
  */
1654
1585
  if (!thd->is_error())
1655
 
    DBUG_RETURN(0);
 
1586
    return(0);
1656
1587
 
1657
1588
  /*
1658
1589
    Temporary error codes:
1661
1592
  */
1662
1593
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1663
1594
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1664
 
    DBUG_RETURN(1);
 
1595
    return(1);
1665
1596
 
1666
 
  DBUG_RETURN(0);
 
1597
  return(0);
1667
1598
}
1668
1599
 
1669
1600
 
1698
1629
{
1699
1630
  int exec_res= 0;
1700
1631
 
1701
 
  DBUG_ENTER("apply_event_and_update_pos");
1702
 
 
1703
 
  DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
1704
 
                           ev->get_type_str(), ev->get_type_code(),
1705
 
                           ev->server_id));
1706
 
  DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
1707
 
                      FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
1708
 
                      FLAGSTR(thd->options, OPTION_BEGIN),
1709
 
                      rli->last_event_start_time));
1710
 
 
1711
1632
  /*
1712
1633
    Execute the event to change the database and update the binary
1713
1634
    log coordinates, but first we set some data that is needed for
1747
1668
    pthread_mutex_unlock(&rli->data_lock);
1748
1669
    if (reason == Log_event::EVENT_SKIP_NOT)
1749
1670
      exec_res= ev->apply_event(rli);
1750
 
#ifndef DBUG_OFF
1751
 
    /*
1752
 
      This only prints information to the debug trace.
1753
 
 
1754
 
      TODO: Print an informational message to the error log?
1755
 
    */
1756
 
    static const char *const explain[] = {
1757
 
      // EVENT_SKIP_NOT,
1758
 
      "not skipped",
1759
 
      // EVENT_SKIP_IGNORE,
1760
 
      "skipped because event should be ignored",
1761
 
      // EVENT_SKIP_COUNT
1762
 
      "skipped because event skip counter was non-zero"
1763
 
    };
1764
 
    DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
1765
 
                        thd->options & OPTION_BEGIN ? 1 : 0,
1766
 
                        rli->get_flag(Relay_log_info::IN_STMT)));
1767
 
    DBUG_PRINT("skip_event", ("%s event was %s",
1768
 
                              ev->get_type_str(), explain[reason]));
1769
 
#endif
1770
1671
  }
1771
1672
  else
1772
1673
    exec_res= ev->apply_event(rli);
1773
1674
 
1774
 
  DBUG_PRINT("info", ("apply_event error = %d", exec_res));
1775
1675
  if (exec_res == 0)
1776
1676
  {
1777
1677
    int error= ev->update_pos(rli);
1778
 
#ifdef HAVE_purify
1779
 
    if (!rli->is_fake)
1780
 
#endif
1781
 
    {
1782
 
#ifndef DBUG_OFF
1783
 
      char buf[22];
1784
 
#endif
1785
 
      DBUG_PRINT("info", ("update_pos error = %d", error));
1786
 
      DBUG_PRINT("info", ("group %s %s",
1787
 
                          llstr(rli->group_relay_log_pos, buf),
1788
 
                          rli->group_relay_log_name));
1789
 
      DBUG_PRINT("info", ("event %s %s",
1790
 
                          llstr(rli->event_relay_log_pos, buf),
1791
 
                          rli->event_relay_log_name));
1792
 
    }
1793
1678
    /*
1794
1679
      The update should not fail, so print an error message and
1795
1680
      return an error code.
1807
1692
                  " Stopped in %s position %s",
1808
1693
                  rli->group_relay_log_name,
1809
1694
                  llstr(rli->group_relay_log_pos, buf));
1810
 
      DBUG_RETURN(2);
 
1695
      return(2);
1811
1696
    }
1812
1697
  }
1813
1698
 
1814
 
  DBUG_RETURN(exec_res ? 1 : 0);
 
1699
  return(exec_res ? 1 : 0);
1815
1700
}
1816
1701
 
1817
1702
 
1845
1730
*/
1846
1731
static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
1847
1732
{
1848
 
  DBUG_ENTER("exec_relay_log_event");
1849
 
 
1850
1733
  /*
1851
1734
     We acquire this mutex since we need it for all operations except
1852
1735
     event execution. But we will release it in places where we will
1856
1739
 
1857
1740
  Log_event * ev = next_event(rli);
1858
1741
 
1859
 
  DBUG_ASSERT(rli->sql_thd==thd);
 
1742
  assert(rli->sql_thd==thd);
1860
1743
 
1861
1744
  if (sql_slave_killed(thd,rli))
1862
1745
  {
1863
1746
    pthread_mutex_unlock(&rli->data_lock);
1864
1747
    delete ev;
1865
 
    DBUG_RETURN(1);
 
1748
    return(1);
1866
1749
  }
1867
1750
  if (ev)
1868
1751
  {
1887
1770
      rli->abort_slave= 1;
1888
1771
      pthread_mutex_unlock(&rli->data_lock);
1889
1772
      delete ev;
1890
 
      DBUG_RETURN(1);
 
1773
      return(1);
1891
1774
    }
1892
 
    exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE);
 
1775
    exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1893
1776
 
1894
1777
    /*
1895
1778
      Format_description_log_event should not be deleted because it will be
1898
1781
    */
1899
1782
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1900
1783
    {
1901
 
      DBUG_PRINT("info", ("Deleting the event after it has been executed"));
1902
1784
      delete ev;
1903
1785
    }
1904
1786
 
1907
1789
      retry.
1908
1790
    */
1909
1791
    if (exec_res == 2)
1910
 
      DBUG_RETURN(1);
 
1792
      return(1);
1911
1793
 
1912
1794
    if (slave_trans_retries)
1913
1795
    {
1950
1832
            rli->trans_retries++;
1951
1833
            rli->retried_trans++;
1952
1834
            pthread_mutex_unlock(&rli->data_lock);
1953
 
            DBUG_PRINT("info", ("Slave retries transaction "
1954
 
                                "rli->trans_retries: %lu", rli->trans_retries));
1955
1835
          }
1956
1836
        }
1957
1837
        else
1971
1851
          non-transient error, the slave will stop with an error.
1972
1852
         */
1973
1853
        rli->trans_retries= 0; // restart from fresh
1974
 
        DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu",
1975
 
                            rli->trans_retries));
1976
1854
      }
1977
1855
    }
1978
 
    DBUG_RETURN(exec_res);
 
1856
    return(exec_res);
1979
1857
  }
1980
1858
  pthread_mutex_unlock(&rli->data_lock);
1981
1859
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1988
1866
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
1989
1867
on this slave.\
1990
1868
");
1991
 
  DBUG_RETURN(1);
 
1869
  return(1);
1992
1870
}
1993
1871
 
1994
1872
 
2079
1957
  char llbuff[22];
2080
1958
  uint retry_count;
2081
1959
  bool suppress_warnings;
2082
 
#ifndef DBUG_OFF
2083
1960
  uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
2084
 
#endif
2085
1961
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
2086
1962
  my_thread_init();
2087
 
  DBUG_ENTER("handle_slave_io");
2088
1963
 
2089
 
  DBUG_ASSERT(mi->inited);
 
1964
  assert(mi->inited);
2090
1965
  mysql= NULL ;
2091
1966
  retry_count= 0;
2092
1967
 
2094
1969
  /* Inform waiting threads that slave has started */
2095
1970
  mi->slave_run_id++;
2096
1971
 
2097
 
#ifndef DBUG_OFF
2098
1972
  mi->events_till_disconnect = disconnect_slave_event_count;
2099
 
#endif
2100
1973
 
2101
1974
  thd= new THD; // note that contructor of THD uses DBUG_ !
2102
1975
  THD_CHECK_SENTRY(thd);
2119
1992
  pthread_mutex_unlock(&mi->run_lock);
2120
1993
  pthread_cond_broadcast(&mi->start_cond);
2121
1994
 
2122
 
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
2123
 
                            mi->master_log_name,
2124
 
                            llstr(mi->master_log_pos,llbuff)));
2125
 
 
2126
1995
  if (!(mi->mysql = mysql = mysql_init(NULL)))
2127
1996
  {
2128
1997
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2181
2050
        goto err;
2182
2051
      goto connected;
2183
2052
    }
2184
 
    DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_REG", 
2185
 
      if (!retry_count_reg)
2186
 
      {
2187
 
        retry_count_reg++;
2188
 
        sql_print_information("Forcing to reconnect slave I/O thread");
2189
 
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2190
 
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
2191
 
          goto err;
2192
 
        goto connected;
2193
 
      });
 
2053
    if (!retry_count_reg)
 
2054
    {
 
2055
      retry_count_reg++;
 
2056
      sql_print_information("Forcing to reconnect slave I/O thread");
 
2057
      if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2058
                         reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2059
        goto err;
 
2060
      goto connected;
 
2061
    }
2194
2062
  }
2195
2063
 
2196
 
  DBUG_PRINT("info",("Starting reading binary log from master"));
2197
2064
  while (!io_slave_killed(thd,mi))
2198
2065
  {
2199
2066
    thd_proc_info(thd, "Requesting binlog dump");
2207
2074
        goto err;
2208
2075
      goto connected;
2209
2076
    }
2210
 
    DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_DUMP", 
2211
 
      if (!retry_count_dump)
2212
 
      {
2213
 
        retry_count_dump++;
2214
 
        sql_print_information("Forcing to reconnect slave I/O thread");
2215
 
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2216
 
                             reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2217
 
          goto err;
2218
 
        goto connected;
2219
 
      });
 
2077
    if (!retry_count_dump)
 
2078
    {
 
2079
      retry_count_dump++;
 
2080
      sql_print_information("Forcing to reconnect slave I/O thread");
 
2081
      if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2082
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2083
        goto err;
 
2084
      goto connected;
 
2085
    }
2220
2086
 
2221
2087
    while (!io_slave_killed(thd,mi))
2222
2088
    {
2232
2098
      if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
2233
2099
reading event"))
2234
2100
        goto err;
2235
 
      DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
2236
 
        if (!retry_count_event)
2237
 
        {
2238
 
          retry_count_event++;
2239
 
          sql_print_information("Forcing to reconnect slave I/O thread");
2240
 
          if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2241
 
                               reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2242
 
            goto err;
2243
 
          goto connected;
2244
 
        });
 
2101
      if (!retry_count_event)
 
2102
      {
 
2103
        retry_count_event++;
 
2104
        sql_print_information("Forcing to reconnect slave I/O thread");
 
2105
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2106
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2107
          goto err;
 
2108
        goto connected;
 
2109
      }
2245
2110
 
2246
2111
      if (event_len == packet_error)
2247
2112
      {
2293
2158
        for no reason, but this function will do a clean read, notice the clean
2294
2159
        value and exit immediately.
2295
2160
      */
2296
 
#ifndef DBUG_OFF
2297
 
      {
2298
 
        char llbuf1[22], llbuf2[22];
2299
 
        DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
2300
 
ignore_log_space_limit=%d",
2301
 
                            llstr(rli->log_space_limit,llbuf1),
2302
 
                            llstr(rli->log_space_total,llbuf2),
2303
 
                            (int) rli->ignore_log_space_limit));
2304
 
      }
2305
 
#endif
2306
 
 
2307
2161
      if (rli->log_space_limit && rli->log_space_limit <
2308
2162
          rli->log_space_total &&
2309
2163
          !rli->ignore_log_space_limit)
2350
2204
  mi->rli.relay_log.description_event_for_queue= 0;
2351
2205
  // TODO: make rpl_status part of Master_info
2352
2206
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2353
 
  DBUG_ASSERT(thd->net.buff != 0);
 
2207
  assert(thd->net.buff != 0);
2354
2208
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2355
2209
  close_thread_tables(thd);
2356
2210
  pthread_mutex_lock(&LOCK_thread_count);
2369
2223
  pthread_mutex_unlock(&mi->run_lock);
2370
2224
  my_thread_end();
2371
2225
  pthread_exit(0);
2372
 
  DBUG_RETURN(0);                               // Can't return anything here
 
2226
  return(0);                               // Can't return anything here
2373
2227
}
2374
2228
 
2375
2229
 
2385
2239
 
2386
2240
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
2387
2241
  my_thread_init();
2388
 
  DBUG_ENTER("handle_slave_sql");
2389
2242
 
2390
 
  DBUG_ASSERT(rli->inited);
 
2243
  assert(rli->inited);
2391
2244
  pthread_mutex_lock(&rli->run_lock);
2392
 
  DBUG_ASSERT(!rli->slave_running);
 
2245
  assert(!rli->slave_running);
2393
2246
  errmsg= 0;
2394
 
#ifndef DBUG_OFF
2395
2247
  rli->events_till_abort = abort_slave_event_count;
2396
 
#endif
2397
2248
 
2398
2249
  thd = new THD; // note that contructor of THD uses DBUG_ !
2399
2250
  thd->thread_stack = (char*)&thd; // remember where our stack is
2449
2300
  rli->ignore_log_space_limit= 0;
2450
2301
  pthread_mutex_unlock(&rli->log_space_lock);
2451
2302
  rli->trans_retries= 0; // start from "no error"
2452
 
  DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries));
2453
2303
 
2454
2304
  if (init_relay_log_pos(rli,
2455
2305
                         rli->group_relay_log_name,
2462
2312
    goto err;
2463
2313
  }
2464
2314
  THD_CHECK_SENTRY(thd);
2465
 
#ifndef DBUG_OFF
2466
 
  {
2467
 
    char llbuf1[22], llbuf2[22];
2468
 
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
2469
 
                        llstr(my_b_tell(rli->cur_log),llbuf1),
2470
 
                        llstr(rli->event_relay_log_pos,llbuf2)));
2471
 
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2472
 
    /*
2473
 
      Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2474
 
      correct position when it's called just after my_b_seek() (the questionable
2475
 
      stuff is those "seek is done on next read" comments in the my_b_seek()
2476
 
      source code).
2477
 
      The crude reality is that this assertion randomly fails whereas
2478
 
      replication seems to work fine. And there is no easy explanation why it
2479
 
      fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2480
 
      init_relay_log_pos() called above). Maybe the assertion would be
2481
 
      meaningful if we held rli->data_lock between the my_b_seek() and the
2482
 
      DBUG_ASSERT().
2483
 
    */
2484
 
#ifdef SHOULD_BE_CHECKED
2485
 
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2486
 
#endif
2487
 
  }
2488
 
#endif
2489
 
  DBUG_ASSERT(rli->sql_thd == thd);
 
2315
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
2316
  /*
 
2317
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
 
2318
    correct position when it's called just after my_b_seek() (the questionable
 
2319
    stuff is those "seek is done on next read" comments in the my_b_seek()
 
2320
    source code).
 
2321
    The crude reality is that this assertion randomly fails whereas
 
2322
    replication seems to work fine. And there is no easy explanation why it
 
2323
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
 
2324
    init_relay_log_pos() called above). Maybe the assertion would be
 
2325
    meaningful if we held rli->data_lock between the my_b_seek() and the
 
2326
    assert().
 
2327
  */
 
2328
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
2329
  assert(rli->sql_thd == thd);
2490
2330
 
2491
 
  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
2492
 
                            rli->group_master_log_name,
2493
 
                            llstr(rli->group_master_log_pos,llbuff)));
2494
2331
  if (global_system_variables.log_warnings)
2495
2332
    sql_print_information("Slave SQL thread initialized, starting replication in \
2496
2333
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
2530
2367
  while (!sql_slave_killed(thd,rli))
2531
2368
  {
2532
2369
    thd_proc_info(thd, "Reading event from the relay log");
2533
 
    DBUG_ASSERT(rli->sql_thd == thd);
 
2370
    assert(rli->sql_thd == thd);
2534
2371
    THD_CHECK_SENTRY(thd);
2535
2372
    if (exec_relay_log_event(thd,rli))
2536
2373
    {
2537
 
      DBUG_PRINT("info", ("exec_relay_log_event() failed"));
2538
2374
      // do not scare the user if SQL thread was simply killed or stopped
2539
2375
      if (!sql_slave_killed(thd,rli))
2540
2376
      {
2549
2385
        {
2550
2386
          char const *const errmsg= thd->main_da.message();
2551
2387
 
2552
 
          DBUG_PRINT("info",
2553
 
                     ("thd->main_da.sql_errno()=%d; rli->last_error.number=%d",
2554
 
                      thd->main_da.sql_errno(), last_errno));
2555
2388
          if (last_errno == 0)
2556
2389
          {
2557
2390
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2620
2453
  pthread_mutex_lock(&rli->run_lock);
2621
2454
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2622
2455
  pthread_mutex_lock(&rli->data_lock);
2623
 
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
 
2456
  assert(rli->slave_running == 1); // tracking buffer overrun
2624
2457
  /* When master_pos_wait() wakes up it will check this and terminate */
2625
2458
  rli->slave_running= 0;
2626
2459
  /* Forget the relay log's format */
2628
2461
  rli->relay_log.description_event_for_exec= 0;
2629
2462
  /* Wake up master_pos_wait() */
2630
2463
  pthread_mutex_unlock(&rli->data_lock);
2631
 
  DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
2632
2464
  pthread_cond_broadcast(&rli->data_cond);
2633
2465
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2634
2466
  /* we die so won't remember charset - re-update them on next thread start */
2640
2472
    to avoid unneeded position re-init
2641
2473
  */
2642
2474
  thd->temporary_tables = 0; // remove tempation from destructor to close them
2643
 
  DBUG_ASSERT(thd->net.buff != 0);
 
2475
  assert(thd->net.buff != 0);
2644
2476
  net_end(&thd->net); // destructor will not free it, because we are weird
2645
 
  DBUG_ASSERT(rli->sql_thd == thd);
 
2477
  assert(rli->sql_thd == thd);
2646
2478
  THD_CHECK_SENTRY(thd);
2647
2479
  rli->sql_thd= 0;
2648
2480
  pthread_mutex_lock(&LOCK_thread_count);
2659
2491
  
2660
2492
  my_thread_end();
2661
2493
  pthread_exit(0);
2662
 
  DBUG_RETURN(0);                               // Can't return anything here
 
2494
  return(0);                               // Can't return anything here
2663
2495
}
2664
2496
 
2665
2497
 
2674
2506
  bool cev_not_written;
2675
2507
  THD *thd = mi->io_thd;
2676
2508
  NET *net = &mi->mysql->net;
2677
 
  DBUG_ENTER("process_io_create_file");
2678
2509
 
2679
2510
  if (unlikely(!cev->is_valid()))
2680
 
    DBUG_RETURN(1);
 
2511
    return(1);
2681
2512
 
2682
2513
  if (!rpl_filter->db_ok(cev->db))
2683
2514
  {
2684
2515
    skip_load_data_infile(net);
2685
 
    DBUG_RETURN(0);
 
2516
    return(0);
2686
2517
  }
2687
 
  DBUG_ASSERT(cev->inited_from_old);
 
2518
  assert(cev->inited_from_old);
2688
2519
  thd->file_id = cev->file_id = mi->file_id++;
2689
2520
  thd->server_id = cev->server_id;
2690
2521
  cev_not_written = 1;
2768
2599
  }
2769
2600
  error=0;
2770
2601
err:
2771
 
  DBUG_RETURN(error);
 
2602
  return(error);
2772
2603
}
2773
2604
 
2774
2605
 
2796
2627
 
2797
2628
static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2798
2629
{
2799
 
  DBUG_ENTER("process_io_rotate");
2800
2630
  safe_mutex_assert_owner(&mi->data_lock);
2801
2631
 
2802
2632
  if (unlikely(!rev->is_valid()))
2803
 
    DBUG_RETURN(1);
 
2633
    return(1);
2804
2634
 
2805
2635
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2806
2636
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2807
2637
  mi->master_log_pos= rev->pos;
2808
 
  DBUG_PRINT("info", ("master_log_pos: '%s' %lu",
2809
 
                      mi->master_log_name, (ulong) mi->master_log_pos));
2810
 
#ifndef DBUG_OFF
2811
2638
  /*
2812
2639
    If we do not do this, we will be getting the first
2813
2640
    rotate event forever, so we need to not disconnect after one.
2814
2641
  */
2815
2642
  if (disconnect_slave_event_count)
2816
2643
    mi->events_till_disconnect++;
2817
 
#endif
2818
2644
 
2819
2645
  /*
2820
2646
    If description_event_for_queue is format <4, there is conversion in the
2835
2661
    start or mysqlbinlog)
2836
2662
  */
2837
2663
  rotate_relay_log(mi); /* will take the right mutexes */
2838
 
  DBUG_RETURN(0);
 
2664
  return(0);
2839
2665
}
2840
2666
 
2841
2667
/*
2850
2676
  bool ignore_event= 0;
2851
2677
  char *tmp_buf = 0;
2852
2678
  Relay_log_info *rli= &mi->rli;
2853
 
  DBUG_ENTER("queue_binlog_ver_1_event");
2854
2679
 
2855
2680
  /*
2856
2681
    If we get Load event, we need to pass a non-reusable buffer
2862
2687
    {
2863
2688
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2864
2689
                 ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
2865
 
      DBUG_RETURN(1);
 
2690
      return(1);
2866
2691
    }
2867
2692
    memcpy(tmp_buf,buf,event_len);
2868
2693
    /*
2890
2715
 master could be corrupt but a more likely cause of this is a bug",
2891
2716
                    errmsg);
2892
2717
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2893
 
    DBUG_RETURN(1);
 
2718
    return(1);
2894
2719
  }
2895
2720
 
2896
2721
  pthread_mutex_lock(&mi->data_lock);
2905
2730
    {
2906
2731
      delete ev;
2907
2732
      pthread_mutex_unlock(&mi->data_lock);
2908
 
      DBUG_RETURN(1);
 
2733
      return(1);
2909
2734
    }
2910
2735
    inc_pos= 0;
2911
2736
    break;
2918
2743
    */
2919
2744
  {
2920
2745
    /* We come here when and only when tmp_buf != 0 */
2921
 
    DBUG_ASSERT(tmp_buf != 0);
 
2746
    assert(tmp_buf != 0);
2922
2747
    inc_pos=event_len;
2923
2748
    ev->log_pos+= inc_pos;
2924
2749
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
2925
2750
    delete ev;
2926
2751
    mi->master_log_pos += inc_pos;
2927
 
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
2928
2752
    pthread_mutex_unlock(&mi->data_lock);
2929
2753
    my_free((char*)tmp_buf, MYF(0));
2930
 
    DBUG_RETURN(error);
 
2754
    return(error);
2931
2755
  }
2932
2756
  default:
2933
2757
    inc_pos= event_len;
2945
2769
    {
2946
2770
      delete ev;
2947
2771
      pthread_mutex_unlock(&mi->data_lock);
2948
 
      DBUG_RETURN(1);
 
2772
      return(1);
2949
2773
    }
2950
2774
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2951
2775
  }
2952
2776
  delete ev;
2953
2777
  mi->master_log_pos+= inc_pos;
2954
 
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
2955
2778
  pthread_mutex_unlock(&mi->data_lock);
2956
 
  DBUG_RETURN(0);
 
2779
  return(0);
2957
2780
}
2958
2781
 
2959
2782
/*
2967
2790
  ulong inc_pos;
2968
2791
  char *tmp_buf = 0;
2969
2792
  Relay_log_info *rli= &mi->rli;
2970
 
  DBUG_ENTER("queue_binlog_ver_3_event");
2971
2793
 
2972
2794
  /* read_log_event() will adjust log_pos to be end_log_pos */
2973
2795
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2978
2800
 master could be corrupt but a more likely cause of this is a bug",
2979
2801
                    errmsg);
2980
2802
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2981
 
    DBUG_RETURN(1);
 
2803
    return(1);
2982
2804
  }
2983
2805
  pthread_mutex_lock(&mi->data_lock);
2984
2806
  switch (ev->get_type_code()) {
2989
2811
    {
2990
2812
      delete ev;
2991
2813
      pthread_mutex_unlock(&mi->data_lock);
2992
 
      DBUG_RETURN(1);
 
2814
      return(1);
2993
2815
    }
2994
2816
    inc_pos= 0;
2995
2817
    break;
3001
2823
  {
3002
2824
    delete ev;
3003
2825
    pthread_mutex_unlock(&mi->data_lock);
3004
 
    DBUG_RETURN(1);
 
2826
    return(1);
3005
2827
  }
3006
2828
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3007
2829
  delete ev;
3008
2830
  mi->master_log_pos+= inc_pos;
3009
2831
err:
3010
 
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3011
2832
  pthread_mutex_unlock(&mi->data_lock);
3012
 
  DBUG_RETURN(0);
 
2833
  return(0);
3013
2834
}
3014
2835
 
3015
2836
/*
3027
2848
static int queue_old_event(Master_info *mi, const char *buf,
3028
2849
                           ulong event_len)
3029
2850
{
3030
 
  DBUG_ENTER("queue_old_event");
3031
 
 
3032
2851
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
3033
2852
  {
3034
2853
  case 1:
3035
 
      DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
 
2854
      return(queue_binlog_ver_1_event(mi,buf,event_len));
3036
2855
  case 3:
3037
 
      DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));
 
2856
      return(queue_binlog_ver_3_event(mi,buf,event_len));
3038
2857
  default: /* unsupported format; eg version 2 */
3039
 
    DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
3040
 
                       mi->rli.relay_log.description_event_for_queue->binlog_version));
3041
 
    DBUG_RETURN(1);
 
2858
    return(1);
3042
2859
  }
3043
2860
}
3044
2861
 
3059
2876
  ulong inc_pos;
3060
2877
  Relay_log_info *rli= &mi->rli;
3061
2878
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
3062
 
  DBUG_ENTER("queue_event");
3063
2879
 
3064
2880
 
3065
2881
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
3066
2882
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
3067
 
    DBUG_RETURN(queue_old_event(mi,buf,event_len));
 
2883
    return(queue_old_event(mi,buf,event_len));
3068
2884
 
3069
2885
  pthread_mutex_lock(&mi->data_lock);
3070
2886
 
3129
2945
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
3130
2946
    */
3131
2947
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
3132
 
    DBUG_PRINT("info",("binlog format is now %d",
3133
 
                       mi->rli.relay_log.description_event_for_queue->binlog_version));
3134
 
 
3135
2948
  }
3136
2949
  break;
3137
2950
 
3227
3040
    {
3228
3041
      mi->master_log_pos+= inc_pos;
3229
3042
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3230
 
      DBUG_ASSERT(rli->ign_master_log_name_end[0]);
 
3043
      assert(rli->ign_master_log_name_end[0]);
3231
3044
      rli->ign_master_log_pos_end= mi->master_log_pos;
3232
3045
    }
3233
3046
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3234
 
    DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
3235
 
                        (ulong) mi->master_log_pos));
3236
3047
  }
3237
3048
  else
3238
3049
  {
3240
3051
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3241
3052
    {
3242
3053
      mi->master_log_pos+= inc_pos;
3243
 
      DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3244
3054
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3245
3055
    }
3246
3056
    else
3255
3065
  
3256
3066
err:
3257
3067
  pthread_mutex_unlock(&mi->data_lock);
3258
 
  DBUG_PRINT("info", ("error: %d", error));
3259
3068
  if (error)
3260
3069
    mi->report(ERROR_LEVEL, error, ER(error), 
3261
3070
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3262
3071
               "could not queue event from master" :
3263
3072
               error_msg.ptr());
3264
 
  DBUG_RETURN(error);
 
3073
  return(error);
3265
3074
}
3266
3075
 
3267
3076
 
3268
3077
void end_relay_log_info(Relay_log_info* rli)
3269
3078
{
3270
 
  DBUG_ENTER("end_relay_log_info");
3271
 
 
3272
3079
  if (!rli->inited)
3273
 
    DBUG_VOID_RETURN;
 
3080
    return;
3274
3081
  if (rli->info_fd >= 0)
3275
3082
  {
3276
3083
    end_io_cache(&rli->info_file);
3292
3099
    of slave's temp tables after shutdown.
3293
3100
  */
3294
3101
  rli->close_temporary_tables();
3295
 
  DBUG_VOID_RETURN;
 
3102
  return;
3296
3103
}
3297
3104
 
3298
3105
/*
3311
3118
 
3312
3119
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi)
3313
3120
{
3314
 
  DBUG_ENTER("safe_connect");
3315
 
 
3316
 
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0));
 
3121
  return(connect_to_master(thd, mysql, mi, 0, 0));
3317
3122
}
3318
3123
 
3319
3124
 
3333
3138
  int last_errno= -2;                           // impossible error
3334
3139
  ulong err_count=0;
3335
3140
  char llbuff[22];
3336
 
  DBUG_ENTER("connect_to_master");
3337
3141
 
3338
 
#ifndef DBUG_OFF
3339
3142
  mi->events_till_disconnect = disconnect_slave_event_count;
3340
 
#endif
3341
3143
  ulong client_flag= CLIENT_REMEMBER_OPTIONS;
3342
3144
  if (opt_slave_compressed_protocol)
3343
3145
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
3405
3207
#endif
3406
3208
  }
3407
3209
  mysql->reconnect= 1;
3408
 
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
3409
 
  DBUG_RETURN(slave_was_killed);
 
3210
  return(slave_was_killed);
3410
3211
}
3411
3212
 
3412
3213
 
3421
3222
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
3422
3223
                          bool suppress_warnings)
3423
3224
{
3424
 
  DBUG_ENTER("safe_reconnect");
3425
 
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
 
3225
  return(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
3426
3226
}
3427
3227
 
3428
3228
 
3458
3258
bool flush_relay_log_info(Relay_log_info* rli)
3459
3259
{
3460
3260
  bool error=0;
3461
 
  DBUG_ENTER("flush_relay_log_info");
3462
3261
 
3463
3262
  if (unlikely(rli->no_storage))
3464
 
    DBUG_RETURN(0);
 
3263
    return(0);
3465
3264
 
3466
3265
  IO_CACHE *file = &rli->info_file;
3467
3266
  char buff[FN_REFLEN*2+22*2+4], *pos;
3481
3280
    error=1;
3482
3281
 
3483
3282
  /* Flushing the relay log is done by the slave I/O thread */
3484
 
  DBUG_RETURN(error);
 
3283
  return(error);
3485
3284
}
3486
3285
 
3487
3286
 
3491
3290
 
3492
3291
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3493
3292
{
3494
 
  DBUG_ENTER("reopen_relay_log");
3495
 
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
3496
 
  DBUG_ASSERT(rli->cur_log_fd == -1);
 
3293
  assert(rli->cur_log != &rli->cache_buf);
 
3294
  assert(rli->cur_log_fd == -1);
3497
3295
 
3498
3296
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3499
3297
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3500
3298
                                   errmsg)) <0)
3501
 
    DBUG_RETURN(0);
 
3299
    return(0);
3502
3300
  /*
3503
3301
    We want to start exactly where we was before:
3504
3302
    relay_log_pos       Current log pos
3506
3304
  */
3507
3305
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
3508
3306
  my_b_seek(cur_log,rli->event_relay_log_pos);
3509
 
  DBUG_RETURN(cur_log);
 
3307
  return(cur_log);
3510
3308
}
3511
3309
 
3512
3310
 
3517
3315
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3518
3316
  const char* errmsg=0;
3519
3317
  THD* thd = rli->sql_thd;
3520
 
  DBUG_ENTER("next_event");
3521
 
 
3522
 
  DBUG_ASSERT(thd != 0);
3523
 
 
3524
 
#ifndef DBUG_OFF
 
3318
 
 
3319
  assert(thd != 0);
 
3320
 
3525
3321
  if (abort_slave_event_count && !rli->events_till_abort--)
3526
 
    DBUG_RETURN(0);
3527
 
#endif
 
3322
    return(0);
3528
3323
 
3529
3324
  /*
3530
3325
    For most operations we need to protect rli members with data_lock,
3551
3346
    bool hot_log;
3552
3347
    if ((hot_log = (cur_log != &rli->cache_buf)))
3553
3348
    {
3554
 
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
 
3349
      assert(rli->cur_log_fd == -1); // foreign descriptor
3555
3350
      pthread_mutex_lock(log_lock);
3556
3351
 
3557
3352
      /*
3575
3370
    */
3576
3371
    if (!my_b_inited(cur_log))
3577
3372
      goto err;
3578
 
#ifndef DBUG_OFF
3579
 
    {
3580
 
      /* This is an assertion which sometimes fails, let's try to track it */
3581
 
      char llbuf1[22], llbuf2[22];
3582
 
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
3583
 
                          llstr(my_b_tell(cur_log),llbuf1),
3584
 
                          llstr(rli->event_relay_log_pos,llbuf2)));
3585
 
      DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3586
 
      DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
3587
 
    }
3588
 
#endif
 
3373
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
 
3374
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
 
3375
 
3589
3376
    /*
3590
3377
      Relay log is always in new format - if the master is 3.23, the
3591
3378
      I/O thread will convert the format for us.
3602
3389
                                      rli->relay_log.description_event_for_exec)))
3603
3390
 
3604
3391
    {
3605
 
      DBUG_ASSERT(thd==rli->sql_thd);
 
3392
      assert(thd==rli->sql_thd);
3606
3393
      /*
3607
3394
        read it while we have a lock, to avoid a mutex lock in
3608
3395
        inc_event_relay_log_pos()
3610
3397
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
3611
3398
      if (hot_log)
3612
3399
        pthread_mutex_unlock(log_lock);
3613
 
      DBUG_RETURN(ev);
 
3400
      return(ev);
3614
3401
    }
3615
 
    DBUG_ASSERT(thd==rli->sql_thd);
 
3402
    assert(thd==rli->sql_thd);
3616
3403
    if (opt_reckless_slave)                     // For mysql-test
3617
3404
      cur_log->error = 0;
3618
3405
    if (cur_log->error < 0)
3655
3442
        time_t save_timestamp= rli->last_master_timestamp;
3656
3443
        rli->last_master_timestamp= 0;
3657
3444
 
3658
 
        DBUG_ASSERT(rli->relay_log.get_open_count() ==
 
3445
        assert(rli->relay_log.get_open_count() ==
3659
3446
                    rli->cur_log_old_open_count);
3660
3447
 
3661
3448
        if (rli->ign_master_log_name_end[0])
3662
3449
        {
3663
3450
          /* We generate and return a Rotate, to make our positions advance */
3664
 
          DBUG_PRINT("info",("seeing an ignored end segment"));
3665
3451
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
3666
3452
                                   0, rli->ign_master_log_pos_end,
3667
3453
                                   Rotate_log_event::DUP_NAME);
3674
3460
            goto err;
3675
3461
          }
3676
3462
          ev->server_id= 0; // don't be ignored by slave SQL thread
3677
 
          DBUG_RETURN(ev);
 
3463
          return(ev);
3678
3464
        }
3679
3465
 
3680
3466
        /*
3731
3517
        cases separately after doing some common initialization
3732
3518
      */
3733
3519
      end_io_cache(cur_log);
3734
 
      DBUG_ASSERT(rli->cur_log_fd >= 0);
 
3520
      assert(rli->cur_log_fd >= 0);
3735
3521
      my_close(rli->cur_log_fd, MYF(MY_WME));
3736
3522
      rli->cur_log_fd = -1;
3737
3523
 
3787
3573
        To guard against this, we need to have LOCK_log.
3788
3574
      */
3789
3575
 
3790
 
      DBUG_PRINT("info",("hot_log: %d",hot_log));
3791
3576
      if (!hot_log) /* if hot_log, we already have this mutex */
3792
3577
        pthread_mutex_lock(log_lock);
3793
3578
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
3799
3584
#endif
3800
3585
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
3801
3586
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3802
 
        DBUG_ASSERT(rli->cur_log_fd == -1);
 
3587
        assert(rli->cur_log_fd == -1);
3803
3588
 
3804
3589
        /*
3805
3590
          Read pointer has to be at the start since we are the only
3854
3639
  {
3855
3640
    sql_print_information("Error reading relay log event: %s",
3856
3641
                          "slave SQL thread was killed");
3857
 
    DBUG_RETURN(0);
 
3642
    return(0);
3858
3643
  }
3859
3644
 
3860
3645
err:
3861
3646
  if (errmsg)
3862
3647
    sql_print_error("Error reading relay log event: %s", errmsg);
3863
 
  DBUG_RETURN(0);
 
3648
  return(0);
3864
3649
}
3865
3650
 
3866
3651
/*
3873
3658
 
3874
3659
void rotate_relay_log(Master_info* mi)
3875
3660
{
3876
 
  DBUG_ENTER("rotate_relay_log");
3877
3661
  Relay_log_info* rli= &mi->rli;
3878
3662
 
3879
3663
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
3885
3669
  */
3886
3670
  if (!rli->inited)
3887
3671
  {
3888
 
    DBUG_PRINT("info", ("rli->inited == 0"));
3889
3672
    goto end;
3890
3673
  }
3891
3674
 
3908
3691
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3909
3692
end:
3910
3693
  pthread_mutex_unlock(&mi->run_lock);
3911
 
  DBUG_VOID_RETURN;
 
3694
  return;
3912
3695
}
3913
3696
 
3914
3697
 
3918
3701
   @param rli Relay_log_info which tells the master's version
3919
3702
   @param bug_id Number of the bug as found in bugs.mysql.com
3920
3703
   @param report bool report error message, default TRUE
3921
 
   @return TRUE if master has the bug, FALSE if it does not.
 
3704
   @return true if master has the bug, FALSE if it does not.
3922
3705
*/
3923
3706
bool rpl_master_has_bug(Relay_log_info *rli, uint bug_id, bool report)
3924
3707
{
3937
3720
  const uchar *master_ver=
3938
3721
    rli->relay_log.description_event_for_exec->server_version_split;
3939
3722
 
3940
 
  DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
 
3723
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3941
3724
 
3942
3725
  for (uint i= 0;
3943
3726
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3949
3732
        (memcmp(fixed_in,      master_ver, 3) >  0))
3950
3733
    {
3951
3734
      if (!report)
3952
 
        return TRUE;
 
3735
        return true;
3953
3736
      
3954
3737
      // a short message for SHOW SLAVE STATUS (message length constraints)
3955
3738
      my_printf_error(ER_UNKNOWN_ERROR, "master may suffer from"
3976
3759
                      rli->relay_log.description_event_for_exec->server_version,
3977
3760
                      bug_id,
3978
3761
                      fixed_in[0], fixed_in[1], fixed_in[2]);
3979
 
      return TRUE;
 
3762
      return true;
3980
3763
    }
3981
3764
  }
3982
 
  return FALSE;
 
3765
  return false;
3983
3766
}
3984
3767
 
3985
3768
/**
3997
3780
  if (active_mi && active_mi->rli.sql_thd == thd)
3998
3781
  {
3999
3782
    Relay_log_info *rli= &active_mi->rli;
4000
 
    return rpl_master_has_bug(rli, 33029, FALSE);
 
3783
    return rpl_master_has_bug(rli, 33029, false);
4001
3784
  }
4002
 
  return FALSE;
 
3785
  return false;
4003
3786
}
4004
3787
 
4005
3788
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION