21
21
#include "sql_repl.h" // For check_binlog_magic
22
22
#include "rpl_utility.h"
24
static int32_t count_relay_log_space(Relay_log_info* rli);
24
static int count_relay_log_space(Relay_log_info* rli);
26
26
// Defined in slave.cc
27
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
28
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
27
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
28
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
29
29
const char *default_val);
32
32
Relay_log_info::Relay_log_info()
33
33
:Slave_reporting_capability("SQL"),
34
no_storage(false), replicate_same_server_id(::replicate_same_server_id),
34
no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
35
35
info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
36
#if defined(HAVE_purify) && HAVE_purify
39
39
cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
40
40
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
45
45
tables_to_lock(0), tables_to_lock_count(0),
46
46
last_event_start_time(0), m_flags(0)
48
DBUG_ENTER("Relay_log_info::Relay_log_info");
48
50
group_relay_log_name[0]= event_relay_log_name[0]=
49
51
group_master_log_name[0]= 0;
50
52
until_log_name[0]= ign_master_log_name_end[0]= 0;
59
61
pthread_cond_init(&stop_cond, NULL);
60
62
pthread_cond_init(&log_space_cond, NULL);
61
63
relay_log.init_pthread_objects();
66
68
Relay_log_info::~Relay_log_info()
70
DBUG_ENTER("Relay_log_info::~Relay_log_info");
68
72
pthread_mutex_destroy(&run_lock);
69
73
pthread_mutex_destroy(&data_lock);
70
74
pthread_mutex_destroy(&log_space_lock);
73
77
pthread_cond_destroy(&stop_cond);
74
78
pthread_cond_destroy(&log_space_cond);
75
79
relay_log.cleanup();
80
int32_t init_relay_log_info(Relay_log_info* rli,
84
int init_relay_log_info(Relay_log_info* rli,
81
85
const char* info_fname)
83
87
char fname[FN_REFLEN+128];
85
89
const char* msg = 0;
87
assert(!rli->no_storage); // Don't init if there is no storage
91
DBUG_ENTER("init_relay_log_info");
92
DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage
89
94
if (rli->inited) // Set if this function called
91
96
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
92
97
pthread_mutex_lock(&rli->data_lock);
93
98
info_fd = rli->info_fd;
194
199
reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
198
203
if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
200
205
sql_print_error("\
216
221
rli->info_fd= -1;
217
222
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
218
223
pthread_mutex_unlock(&rli->data_lock);
223
228
rli->info_fd = info_fd;
224
int32_t relay_log_pos, master_log_pos;
229
int relay_log_pos, master_log_pos;
225
230
if (init_strvar_from_file(rli->group_relay_log_name,
226
231
sizeof(rli->group_relay_log_name),
227
232
&rli->info_file, "") ||
256
char llbuf1[22], llbuf2[22];
257
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
258
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
264
char llbuf1[22], llbuf2[22];
265
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
266
llstr(my_b_tell(rli->cur_log),llbuf1),
267
llstr(rli->event_relay_log_pos,llbuf2)));
268
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
269
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
261
274
Now change the cache from READ to WRITE - must do this
281
294
rli->info_fd= -1;
282
295
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
283
296
pthread_mutex_unlock(&rli->data_lock);
288
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
301
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
304
DBUG_ENTER("add_relay_log");
291
305
if (stat(linfo->log_file_name,&s))
293
307
sql_print_error("log %s listed in the index, but failed to stat",
294
308
linfo->log_file_name);
297
311
rli->log_space_total += s.st_size;
314
DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
302
static int32_t count_relay_log_space(Relay_log_info* rli)
320
static int count_relay_log_space(Relay_log_info* rli)
323
DBUG_ENTER("count_relay_log_space");
305
324
rli->log_space_total= 0;
306
325
if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
308
327
sql_print_error("Could not find first log while counting relay log space");
313
332
if (add_relay_log(rli,&linfo))
315
334
} while (!rli->relay_log.find_next_log(&linfo, 1));
317
336
As we have counted everything, including what may have written in a
372
393
1 error. errmsg is set to point to the error message
375
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
376
uint64_t pos, bool need_data_lock,
396
int init_relay_log_pos(Relay_log_info* rli,const char* log,
397
ulonglong pos, bool need_data_lock,
377
398
const char** errmsg,
378
399
bool look_for_description_event)
401
DBUG_ENTER("init_relay_log_pos");
402
DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
381
405
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
467
491
Read the possible Format_description_log_event; if position
468
492
was 4, no need, it will be read naturally.
494
DBUG_PRINT("info",("looking for a Format_description_log_event"));
470
496
if (my_b_tell(rli->cur_log) >= pos)
477
503
if (!(ev=Log_event::read_log_event(rli->cur_log,0,
478
504
rli->relay_log.description_event_for_exec)))
506
DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
507
rli->cur_log->error));
480
508
if (rli->cur_log->error) /* not EOF */
482
510
*errmsg= "I/O error reading event at position 4";
487
515
else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
517
DBUG_PRINT("info",("found Format_description_log_event"));
489
518
delete rli->relay_log.description_event_for_exec;
490
519
rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
543
DBUG_PRINT("info",("found event of another type=%d",
544
ev->get_type_code()));
514
545
look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
518
549
my_b_seek(rli->cur_log,(off_t)pos);
552
char llbuf1[22], llbuf2[22];
553
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
554
llstr(my_b_tell(rli->cur_log),llbuf1),
555
llstr(rli->event_relay_log_pos,llbuf2)));
535
574
if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
536
575
*errmsg= "Invalid Format_description log event; could be out of memory";
538
return ((*errmsg) ? 1 : 0);
577
DBUG_RETURN ((*errmsg) ? 1 : 0);
551
590
timeout timeout in seconds before giving up waiting
554
timeout is longlong whereas it should be uint32_t ; but this is
593
timeout is longlong whereas it should be ulong ; but this is
555
594
to catch if the user submitted a negative timeout.
564
603
before reaching the desired log/position
567
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
606
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
568
607
longlong log_pos,
569
608
longlong timeout)
571
int32_t event_count = 0;
572
uint32_t init_abort_pos_wait;
611
ulong init_abort_pos_wait;
574
613
struct timespec abstime; // for timeout checking
615
DBUG_ENTER("Relay_log_info::wait_for_pos");
620
DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
621
log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
580
623
set_timespec(abstime,timeout);
581
624
pthread_mutex_lock(&data_lock);
602
645
handle all possible log names comparisons (e.g. 999 vs 1000).
603
We use uint32_t for string->number conversion ; this is no
646
We use ulong for string->number conversion ; this is no
604
647
stronger limitation than in find_uniq_filename in sql/log.cc
606
uint32_t log_name_extension;
649
ulong log_name_extension;
607
650
char log_name_tmp[FN_REFLEN]; //make a char[] from String
609
652
strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
638
681
bool pos_reached;
639
int32_t cmp_result= 0;
685
("init_abort_pos_wait: %ld abort_pos_wait: %ld",
686
init_abort_pos_wait, abort_pos_wait));
687
DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
688
group_master_log_name, (ulong) group_master_log_pos));
642
691
group_master_log_name can be "", if we are just after a fresh
662
711
if the names do not match up to '.' included, return error
664
713
char *q= (char*)(fn_ext(basename)+1);
665
if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
714
if (strncmp(basename, log_name_tmp, (int)(q-basename)))
670
719
// Now compare extensions.
672
uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
721
ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
673
722
if (group_master_log_name_extension < log_name_extension)
676
725
cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
678
pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
727
pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
680
729
if (pos_reached || thd->killed)
765
DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
717
769
thd->exit_cond(msg);
770
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
771
improper_arguments: %d timed_out: %d",
773
(int) (init_abort_pos_wait != abort_pos_wait),
776
(int) (error == -1)));
718
777
if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
723
return( error ? error : event_count );
782
DBUG_RETURN( error ? error : event_count );
727
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
786
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
789
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
731
792
pthread_mutex_lock(&data_lock);
732
793
inc_event_relay_log_pos();
766
827
the relay log is not "val".
767
828
With the end_log_pos solution, we avoid computations involving lengthes.
830
DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
831
(long) log_pos, (long) group_master_log_pos));
769
832
if (log_pos) // 3.23 binlogs don't have log_posx
771
834
group_master_log_pos= log_pos;
773
836
pthread_cond_broadcast(&data_cond);
775
838
pthread_mutex_unlock(&data_lock);
780
843
void Relay_log_info::close_temporary_tables()
782
845
TABLE *table,*next;
846
DBUG_ENTER("Relay_log_info::close_temporary_tables");
784
848
for (table=save_temporary_tables ; table ; table=next)
788
852
Don't ask for disk deletion. For now, anyway they will be deleted when
789
853
slave restarts, but it is a better intention to not delete them.
855
DBUG_PRINT("info", ("table: 0x%lx", (long) table));
791
856
close_temporary(table, 1, 0);
793
858
save_temporary_tables= 0;
794
859
slave_open_temp_tables= 0;
802
867
Assumes to have a run lock on rli and that no slave thread are running.
805
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
870
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
806
871
const char** errmsg)
874
DBUG_ENTER("purge_relay_logs");
811
877
Even if rli->inited==0, we still try to empty rli->master_log_* variables.
831
897
if (!rli->inited)
899
DBUG_PRINT("info", ("rli->inited == 0"));
836
assert(rli->slave_running == 0);
837
assert(rli->mi->slave_running == 0);
903
DBUG_ASSERT(rli->slave_running == 0);
904
DBUG_ASSERT(rli->mi->slave_running == 0);
839
906
rli->slave_skip_counter=0;
840
907
pthread_mutex_lock(&rli->data_lock);
875
942
0 /* do not need data lock */, errmsg, 0);
948
DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
878
949
pthread_mutex_unlock(&rli->data_lock);
916
987
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
918
989
const char *log_name;
991
DBUG_ENTER("Relay_log_info::is_until_satisfied");
921
assert(until_condition != UNTIL_NONE);
993
DBUG_ASSERT(until_condition != UNTIL_NONE);
923
995
if (until_condition == UNTIL_MASTER_POS)
931
1003
log_pos= group_relay_log_pos;
1009
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1010
group_master_log_name, llstr(group_master_log_pos, buf)));
1011
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1012
group_relay_log_name, llstr(group_relay_log_pos, buf)));
1013
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1014
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1015
log_name, llstr(log_pos, buf)));
1016
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1017
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1018
until_log_name, llstr(until_log_pos, buf)));
934
1022
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
946
1034
const char *basename= log_name + dirname_length(log_name);
948
1036
const char *q= (const char*)(fn_ext(basename)+1);
949
if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
1037
if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
951
1039
/* Now compare extensions. */
953
uint32_t log_name_extension= strtoul(q, &q_end, 10);
1041
ulong log_name_extension= strtoul(q, &q_end, 10);
954
1042
if (log_name_extension < until_log_name_extension)
955
1043
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
963
1051
/* Probably error so we aborting */
964
1052
sql_print_error("Slave SQL thread is stopped because UNTIL "
965
1053
"condition is bad.");
970
return(until_log_pos == 0);
1058
DBUG_RETURN(until_log_pos == 0);
973
return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1061
DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
974
1062
log_pos >= until_log_pos) ||
975
1063
until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
979
1067
void Relay_log_info::cached_charset_invalidate()
1069
DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
981
1071
/* Full of zeroes means uninitialized. */
982
1072
bzero(cached_charset, sizeof(cached_charset));
987
1077
bool Relay_log_info::cached_charset_compare(char *charset) const
1079
DBUG_ENTER("Relay_log_info::cached_charset_compare");
989
1081
if (bcmp((uchar*) cached_charset, (uchar*) charset,
990
1082
sizeof(cached_charset)))
992
1084
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
999
1091
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
1000
1092
time_t event_creation_time)
1002
extern uint32_t debug_not_change_ts_if_art_event;
1095
extern uint debug_not_change_ts_if_art_event;
1003
1097
clear_flag(IN_STMT);
1039
1133
is that value may take some time to display in
1040
1134
Seconds_Behind_Master - not critical).
1042
1137
if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
1043
last_master_timestamp= event_creation_time;
1139
if (event_creation_time != 0)
1141
last_master_timestamp= event_creation_time;
1047
1145
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1048
1146
void Relay_log_info::cleanup_context(THD *thd, bool error)
1050
assert(sql_thd == thd);
1148
DBUG_ENTER("Relay_log_info::cleanup_context");
1150
DBUG_ASSERT(sql_thd == thd);
1052
1152
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1053
1153
may have opened tables, which we cannot be sure have been closed (because
1086
1186
if (tables_to_lock->m_tabledef_valid)
1088
1188
tables_to_lock->m_tabledef.table_def::~table_def();
1089
tables_to_lock->m_tabledef_valid= false;
1189
tables_to_lock->m_tabledef_valid= FALSE;
1091
1191
tables_to_lock=
1092
1192
static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1093
1193
tables_to_lock_count--;
1094
1194
my_free(to_free, MYF(MY_WME));
1096
assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1196
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);