13
13
along with this program; if not, write to the Free Software
14
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <drizzled/server_includes.h>
16
#include "mysql_priv.h"
18
18
#include "rpl_mi.h"
19
19
#include "rpl_rli.h"
20
21
#include "sql_repl.h" // For check_binlog_magic
21
22
#include "rpl_utility.h"
23
#include <libdrizzle/gettext.h>
25
static int32_t count_relay_log_space(Relay_log_info* rli);
24
static int count_relay_log_space(Relay_log_info* rli);
27
26
// Defined in slave.cc
28
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
29
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
27
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
28
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
30
29
const char *default_val);
33
32
Relay_log_info::Relay_log_info()
34
33
:Slave_reporting_capability("SQL"),
35
no_storage(false), replicate_same_server_id(::replicate_same_server_id),
34
no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
36
35
info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
37
#if defined(HAVE_purify) && HAVE_purify
40
39
cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
41
40
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
46
45
tables_to_lock(0), tables_to_lock_count(0),
47
46
last_event_start_time(0), m_flags(0)
48
DBUG_ENTER("Relay_log_info::Relay_log_info");
49
50
group_relay_log_name[0]= event_relay_log_name[0]=
50
51
group_master_log_name[0]= 0;
51
52
until_log_name[0]= ign_master_log_name_end[0]= 0;
52
memset(&info_file, 0, sizeof(info_file));
53
memset(&cache_buf, 0, sizeof(cache_buf));
53
bzero((char*) &info_file, sizeof(info_file));
54
bzero((char*) &cache_buf, sizeof(cache_buf));
54
55
cached_charset_invalidate();
55
56
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
56
57
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
60
61
pthread_cond_init(&stop_cond, NULL);
61
62
pthread_cond_init(&log_space_cond, NULL);
62
63
relay_log.init_pthread_objects();
67
68
Relay_log_info::~Relay_log_info()
70
DBUG_ENTER("Relay_log_info::~Relay_log_info");
69
72
pthread_mutex_destroy(&run_lock);
70
73
pthread_mutex_destroy(&data_lock);
71
74
pthread_mutex_destroy(&log_space_lock);
74
77
pthread_cond_destroy(&stop_cond);
75
78
pthread_cond_destroy(&log_space_cond);
76
79
relay_log.cleanup();
81
int32_t init_relay_log_info(Relay_log_info* rli,
84
int init_relay_log_info(Relay_log_info* rli,
82
85
const char* info_fname)
84
87
char fname[FN_REFLEN+128];
86
89
const char* msg = 0;
88
assert(!rli->no_storage); // Don't init if there is no storage
91
DBUG_ENTER("init_relay_log_info");
92
DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage
90
94
if (rli->inited) // Set if this function called
92
96
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
93
97
pthread_mutex_lock(&rli->data_lock);
94
98
info_fd = rli->info_fd;
131
135
instead require a name. But as we don't want to break many existing
132
136
setups, we only give a warning, not an error.
134
sql_print_warning(_("Neither --relay-log nor --relay-log-index were used;"
138
sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
135
139
" so replication "
136
140
"may break when this MySQL server acts as a "
137
141
"slave and has his hostname changed!! Please "
138
"use '--relay-log=%s' to avoid this problem."), ln);
142
"use '--relay-log=%s' to avoid this problem.", ln);
139
143
name_warning_sent= 1;
165
168
my_close(info_fd, MYF(MY_WME));
166
169
if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
168
sql_print_error(_("Failed to create a new relay log info file "
169
"( file '%s', errno %d)"), fname, my_errno);
171
sql_print_error("Failed to create a new relay log info file (\
172
file '%s', errno %d)", fname, my_errno);
170
173
msg= current_thd->main_da.message();
173
176
if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
176
sql_print_error(_("Failed to create a cache on relay log info file '%s'"),
179
sql_print_error("Failed to create a cache on relay log info file '%s'",
178
181
msg= current_thd->main_da.message();
182
185
/* Init relay log with first entry in the relay index file */
183
if (init_relay_log_pos(rli,NULL,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
186
if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
186
sql_print_error(_("Failed to open the relay log 'FIRST' (relay_log_pos 4)"));
189
sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
189
192
rli->group_master_log_name[0]= 0;
196
199
reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
200
203
if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
202
sql_print_error(_("Failed to open the existing relay log info "
203
"file '%s' (errno %d)"),
206
Failed to open the existing relay log info file '%s' (errno %d)",
204
207
fname, my_errno);
207
210
else if (init_io_cache(&rli->info_file, info_fd,
208
211
IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
210
sql_print_error(_("Failed to create a cache on relay log info "
213
sql_print_error("Failed to create a cache on relay log info file '%s'",
219
221
rli->info_fd= -1;
220
222
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
221
223
pthread_mutex_unlock(&rli->data_lock);
226
228
rli->info_fd = info_fd;
227
int32_t relay_log_pos, master_log_pos;
229
int relay_log_pos, master_log_pos;
228
230
if (init_strvar_from_file(rli->group_relay_log_name,
229
231
sizeof(rli->group_relay_log_name),
230
232
&rli->info_file, "") ||
253
sql_print_error(_("Failed to open the relay log '%s' (relay_log_pos %s)"),
255
sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
254
256
rli->group_relay_log_name,
255
257
llstr(rli->group_relay_log_pos, llbuf));
259
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
260
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
264
char llbuf1[22], llbuf2[22];
265
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
266
llstr(my_b_tell(rli->cur_log),llbuf1),
267
llstr(rli->event_relay_log_pos,llbuf2)));
268
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
269
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
263
274
Now change the cache from READ to WRITE - must do this
266
277
reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
267
278
if ((error= flush_relay_log_info(rli)))
268
sql_print_error(_("Failed to flush relay log info file"));
279
sql_print_error("Failed to flush relay log info file");
269
280
if (count_relay_log_space(rli))
271
msg=_("Error counting relay log space");
282
msg="Error counting relay log space";
275
286
pthread_mutex_unlock(&rli->data_lock);
279
290
sql_print_error(msg);
283
294
rli->info_fd= -1;
284
295
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
285
296
pthread_mutex_unlock(&rli->data_lock);
290
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
301
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
304
DBUG_ENTER("add_relay_log");
293
305
if (stat(linfo->log_file_name,&s))
295
sql_print_error(_("log %s listed in the index, but failed to stat"),
307
sql_print_error("log %s listed in the index, but failed to stat",
296
308
linfo->log_file_name);
299
311
rli->log_space_total += s.st_size;
314
DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
304
static int32_t count_relay_log_space(Relay_log_info* rli)
320
static int count_relay_log_space(Relay_log_info* rli)
323
DBUG_ENTER("count_relay_log_space");
307
324
rli->log_space_total= 0;
308
if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
325
if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
310
sql_print_error(_("Could not find first log while counting relay "
327
sql_print_error("Could not find first log while counting relay log space");
316
332
if (add_relay_log(rli,&linfo))
318
334
} while (!rli->relay_log.find_next_log(&linfo, 1));
320
336
As we have counted everything, including what may have been written in a
375
393
1 error. errmsg is set to point to the error message
378
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
379
uint64_t pos, bool need_data_lock,
396
int init_relay_log_pos(Relay_log_info* rli,const char* log,
397
ulonglong pos, bool need_data_lock,
380
398
const char** errmsg,
381
399
bool look_for_description_event)
401
DBUG_ENTER("init_relay_log_pos");
402
DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
384
405
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
420
441
Test to see if the previous run was with the skip of purging
421
442
If yes, we do not purge when we restart
423
if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
444
if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
425
446
*errmsg="Could not find first log during relay log initialization";
470
491
Read the possible Format_description_log_event; if position
471
492
was 4, no need, it will be read naturally.
494
DBUG_PRINT("info",("looking for a Format_description_log_event"));
473
496
if (my_b_tell(rli->cur_log) >= pos)
480
503
if (!(ev=Log_event::read_log_event(rli->cur_log,0,
481
504
rli->relay_log.description_event_for_exec)))
506
DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
507
rli->cur_log->error));
483
508
if (rli->cur_log->error) /* not EOF */
485
510
*errmsg= "I/O error reading event at position 4";
490
515
else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
517
DBUG_PRINT("info",("found Format_description_log_event"));
492
518
delete rli->relay_log.description_event_for_exec;
493
519
rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
543
DBUG_PRINT("info",("found event of another type=%d",
544
ev->get_type_code()));
517
545
look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
521
549
my_b_seek(rli->cur_log,(off_t)pos);
552
char llbuf1[22], llbuf2[22];
553
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
554
llstr(my_b_tell(rli->cur_log),llbuf1),
555
llstr(rli->event_relay_log_pos,llbuf2)));
538
574
if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
539
575
*errmsg= "Invalid Format_description log event; could be out of memory";
541
return ((*errmsg) ? 1 : 0);
577
DBUG_RETURN ((*errmsg) ? 1 : 0);
554
590
timeout timeout in seconds before giving up waiting
557
timeout is int64_t whereas it should be uint32_t ; but this is
593
timeout is longlong whereas it should be ulong ; but this is
558
594
to catch if the user submitted a negative timeout.
567
603
before reaching the desired log/position
570
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
606
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
574
int32_t event_count = 0;
575
uint32_t init_abort_pos_wait;
611
ulong init_abort_pos_wait;
577
613
struct timespec abstime; // for timeout checking
615
DBUG_ENTER("Relay_log_info::wait_for_pos");
620
DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
621
log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
583
623
set_timespec(abstime,timeout);
584
624
pthread_mutex_lock(&data_lock);
605
645
handle all possible log names comparisons (e.g. 999 vs 1000).
606
We use uint32_t for string->number conversion ; this is no
646
We use ulong for string->number conversion ; this is no
607
647
stronger limitation than in find_uniq_filename in sql/log.cc
609
uint32_t log_name_extension;
649
ulong log_name_extension;
610
650
char log_name_tmp[FN_REFLEN]; //make a char[] from String
612
strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
652
strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
614
654
char *p= fn_ext(log_name_tmp);
621
661
// Convert 0-3 to 4
622
log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
662
log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
623
663
/* p points to '.' */
624
664
log_name_extension= strtoul(++p, &p_end, 10);
641
681
bool pos_reached;
642
int32_t cmp_result= 0;
685
("init_abort_pos_wait: %ld abort_pos_wait: %ld",
686
init_abort_pos_wait, abort_pos_wait));
687
DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
688
group_master_log_name, (ulong) group_master_log_pos));
645
691
group_master_log_name can be "", if we are just after a fresh
665
711
if the names do not match up to '.' included, return error
667
713
char *q= (char*)(fn_ext(basename)+1);
668
if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
714
if (strncmp(basename, log_name_tmp, (int)(q-basename)))
673
719
// Now compare extensions.
675
uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
721
ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
676
722
if (group_master_log_name_extension < log_name_extension)
679
725
cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
681
pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
727
pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
683
729
if (pos_reached || thd->killed)
765
DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
720
769
thd->exit_cond(msg);
770
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
771
improper_arguments: %d timed_out: %d",
773
(int) (init_abort_pos_wait != abort_pos_wait),
776
(int) (error == -1)));
721
777
if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
726
return( error ? error : event_count );
782
DBUG_RETURN( error ? error : event_count );
730
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
786
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
789
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
734
792
pthread_mutex_lock(&data_lock);
735
793
inc_event_relay_log_pos();
769
827
the relay log is not "val".
770
828
With the end_log_pos solution, we avoid computations involving lengths.
830
DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
831
(long) log_pos, (long) group_master_log_pos));
772
832
if (log_pos) // 3.23 binlogs don't have log_posx
774
834
group_master_log_pos= log_pos;
776
836
pthread_cond_broadcast(&data_cond);
778
838
pthread_mutex_unlock(&data_lock);
783
843
void Relay_log_info::close_temporary_tables()
846
DBUG_ENTER("Relay_log_info::close_temporary_tables");
787
848
for (table=save_temporary_tables ; table ; table=next)
791
852
Don't ask for disk deletion. For now, anyway they will be deleted when
792
853
slave restarts, but it is a better intention to not delete them.
855
DBUG_PRINT("info", ("table: 0x%lx", (long) table));
794
856
close_temporary(table, 1, 0);
796
858
save_temporary_tables= 0;
797
859
slave_open_temp_tables= 0;
805
867
Assumes the caller holds a run lock on rli and that no slave threads are running.
808
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
870
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
809
871
const char** errmsg)
874
DBUG_ENTER("purge_relay_logs");
814
877
Even if rli->inited==0, we still try to empty rli->master_log_* variables.
834
897
if (!rli->inited)
899
DBUG_PRINT("info", ("rli->inited == 0"));
839
assert(rli->slave_running == 0);
840
assert(rli->mi->slave_running == 0);
903
DBUG_ASSERT(rli->slave_running == 0);
904
DBUG_ASSERT(rli->mi->slave_running == 0);
842
906
rli->slave_skip_counter=0;
843
907
pthread_mutex_lock(&rli->data_lock);
878
942
0 /* do not need data lock */, errmsg, 0);
948
DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
881
949
pthread_mutex_unlock(&rli->data_lock);
919
987
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
921
989
const char *log_name;
991
DBUG_ENTER("Relay_log_info::is_until_satisfied");
924
assert(until_condition != UNTIL_NONE);
993
DBUG_ASSERT(until_condition != UNTIL_NONE);
926
995
if (until_condition == UNTIL_MASTER_POS)
934
1003
log_pos= group_relay_log_pos;
1009
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1010
group_master_log_name, llstr(group_master_log_pos, buf)));
1011
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1012
group_relay_log_name, llstr(group_relay_log_pos, buf)));
1013
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1014
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1015
log_name, llstr(log_pos, buf)));
1016
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1017
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1018
until_log_name, llstr(until_log_pos, buf)));
937
1022
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
949
1034
const char *basename= log_name + dirname_length(log_name);
951
1036
const char *q= (const char*)(fn_ext(basename)+1);
952
if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
1037
if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
954
1039
/* Now compare extensions. */
956
uint32_t log_name_extension= strtoul(q, &q_end, 10);
1041
ulong log_name_extension= strtoul(q, &q_end, 10);
957
1042
if (log_name_extension < until_log_name_extension)
958
1043
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
966
1051
/* Probably error so we aborting */
967
sql_print_error(_("Slave SQL thread is stopped because UNTIL "
968
"condition is bad."));
1052
sql_print_error("Slave SQL thread is stopped because UNTIL "
1053
"condition is bad.");
973
return(until_log_pos == 0);
1058
DBUG_RETURN(until_log_pos == 0);
976
return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1061
DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
977
1062
log_pos >= until_log_pos) ||
978
1063
until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
982
1067
void Relay_log_info::cached_charset_invalidate()
1069
DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
984
1071
/* Full of zeroes means uninitialized. */
985
memset(cached_charset, 0, sizeof(cached_charset));
1072
bzero(cached_charset, sizeof(cached_charset));
990
1077
bool Relay_log_info::cached_charset_compare(char *charset) const
992
if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1079
DBUG_ENTER("Relay_log_info::cached_charset_compare");
1081
if (bcmp((uchar*) cached_charset, (uchar*) charset,
1082
sizeof(cached_charset)))
994
1084
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1001
1091
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
1002
1092
time_t event_creation_time)
1004
extern uint32_t debug_not_change_ts_if_art_event;
1095
extern uint debug_not_change_ts_if_art_event;
1005
1097
clear_flag(IN_STMT);
1041
1133
is that value may take some time to display in
1042
1134
Seconds_Behind_Master - not critical).
1044
1137
if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
1045
last_master_timestamp= event_creation_time;
1139
if (event_creation_time != 0)
1141
last_master_timestamp= event_creation_time;
1145
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1049
1146
void Relay_log_info::cleanup_context(THD *thd, bool error)
1051
assert(sql_thd == thd);
1148
DBUG_ENTER("Relay_log_info::cleanup_context");
1150
DBUG_ASSERT(sql_thd == thd);
1053
1152
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1054
1153
may have opened tables, which we cannot be sure have been closed (because
1076
1175
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1077
1176
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1078
1177
last_event_start_time= 0;
1082
1181
void Relay_log_info::clear_tables_to_lock()
1084
1183
while (tables_to_lock)
1086
unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
1185
uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1087
1186
if (tables_to_lock->m_tabledef_valid)
1089
1188
tables_to_lock->m_tabledef.table_def::~table_def();
1090
tables_to_lock->m_tabledef_valid= false;
1189
tables_to_lock->m_tabledef_valid= FALSE;
1092
1191
tables_to_lock=
1093
static_cast<RPL_TableList*>(tables_to_lock->next_global);
1192
static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1094
1193
tables_to_lock_count--;
1194
my_free(to_free, MYF(MY_WME));
1097
assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1196
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);