13
13
along with this program; if not, write to the Free Software
14
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <drizzled/server_includes.h>
18
#if TIME_WITH_SYS_TIME
19
# include <sys/time.h>
23
# include <sys/time.h>
16
#include "mysql_priv.h"
30
18
#include "rpl_mi.h"
31
19
#include "rpl_rli.h"
32
21
#include "sql_repl.h" // For check_binlog_magic
33
22
#include "rpl_utility.h"
35
#include <libdrizzle/gettext.h>
37
static int32_t count_relay_log_space(Relay_log_info* rli);
24
static int count_relay_log_space(Relay_log_info* rli);
39
26
// Defined in slave.cc
40
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
41
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
27
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
28
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
42
29
const char *default_val);
45
32
Relay_log_info::Relay_log_info()
46
33
:Slave_reporting_capability("SQL"),
47
no_storage(false), replicate_same_server_id(::replicate_same_server_id),
34
no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
48
35
info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
49
#if defined(HAVE_purify) && HAVE_purify
52
39
cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
53
40
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
58
45
tables_to_lock(0), tables_to_lock_count(0),
59
46
last_event_start_time(0), m_flags(0)
48
DBUG_ENTER("Relay_log_info::Relay_log_info");
61
50
group_relay_log_name[0]= event_relay_log_name[0]=
62
51
group_master_log_name[0]= 0;
63
52
until_log_name[0]= ign_master_log_name_end[0]= 0;
64
memset(&info_file, 0, sizeof(info_file));
65
memset(&cache_buf, 0, sizeof(cache_buf));
53
bzero((char*) &info_file, sizeof(info_file));
54
bzero((char*) &cache_buf, sizeof(cache_buf));
55
cached_charset_invalidate();
66
56
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
67
57
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
68
58
pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
85
77
pthread_cond_destroy(&stop_cond);
86
78
pthread_cond_destroy(&log_space_cond);
87
79
relay_log.cleanup();
92
int32_t init_relay_log_info(Relay_log_info* rli,
84
int init_relay_log_info(Relay_log_info* rli,
93
85
const char* info_fname)
95
87
char fname[FN_REFLEN+128];
97
89
const char* msg = 0;
99
assert(!rli->no_storage); // Don't init if there is no storage
91
DBUG_ENTER("init_relay_log_info");
92
DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage
101
94
if (rli->inited) // Set if this function called
103
96
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
104
97
pthread_mutex_lock(&rli->data_lock);
105
98
info_fd = rli->info_fd;
159
152
max_binlog_size), 1))
161
154
pthread_mutex_unlock(&rli->data_lock);
162
sql_print_error(_("Failed in open_log() called from "
163
"init_relay_log_info()"));
155
sql_print_error("Failed in open_log() called from init_relay_log_info()");
168
160
/* if file does not exist */
169
161
if (access(fname,F_OK))
171
/* Create a new file */
164
If someone removed the file from underneath our feet, just close
165
the old descriptor and re-create the old file
168
my_close(info_fd, MYF(MY_WME));
169
if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
171
sql_print_error("Failed to create a new relay log info file (\
172
file '%s', errno %d)", fname, my_errno);
173
msg= current_thd->main_da.message();
176
if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
179
sql_print_error("Failed to create a cache on relay log info file '%s'",
181
msg= current_thd->main_da.message();
185
/* Init relay log with first entry in the relay index file */
186
if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
189
sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
192
rli->group_master_log_name[0]= 0;
193
rli->group_master_log_pos= 0;
194
rli->info_fd= info_fd;
173
196
else // file exists
175
/* Open up fname here and pull out the relay.info data */
199
reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
203
if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
206
Failed to open the existing relay log info file '%s' (errno %d)",
210
else if (init_io_cache(&rli->info_file, info_fd,
211
IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
213
sql_print_error("Failed to create a cache on relay log info file '%s'",
220
my_close(info_fd, MYF(0));
222
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
223
pthread_mutex_unlock(&rli->data_lock);
228
rli->info_fd = info_fd;
229
int relay_log_pos, master_log_pos;
230
if (init_strvar_from_file(rli->group_relay_log_name,
231
sizeof(rli->group_relay_log_name),
232
&rli->info_file, "") ||
233
init_intvar_from_file(&relay_log_pos,
234
&rli->info_file, BIN_LOG_HEADER_SIZE) ||
235
init_strvar_from_file(rli->group_master_log_name,
236
sizeof(rli->group_master_log_name),
237
&rli->info_file, "") ||
238
init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
240
msg="Error reading slave log configuration";
243
strmake(rli->event_relay_log_name,rli->group_relay_log_name,
244
sizeof(rli->event_relay_log_name)-1);
245
rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
246
rli->group_master_log_pos= master_log_pos;
248
if (init_relay_log_pos(rli,
249
rli->group_relay_log_name,
250
rli->group_relay_log_pos,
255
sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
256
rli->group_relay_log_name,
257
llstr(rli->group_relay_log_pos, llbuf));
264
char llbuf1[22], llbuf2[22];
265
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
266
llstr(my_b_tell(rli->cur_log),llbuf1),
267
llstr(rli->event_relay_log_pos,llbuf2)));
268
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
269
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
179
274
Now change the cache from READ to WRITE - must do this
199
294
rli->info_fd= -1;
200
295
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
201
296
pthread_mutex_unlock(&rli->data_lock);
206
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
301
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
304
DBUG_ENTER("add_relay_log");
209
305
if (stat(linfo->log_file_name,&s))
211
sql_print_error(_("log %s listed in the index, but failed to stat"),
307
sql_print_error("log %s listed in the index, but failed to stat",
212
308
linfo->log_file_name);
215
311
rli->log_space_total += s.st_size;
314
DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
220
static int32_t count_relay_log_space(Relay_log_info* rli)
320
static int count_relay_log_space(Relay_log_info* rli)
323
DBUG_ENTER("count_relay_log_space");
223
324
rli->log_space_total= 0;
224
if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
325
if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
226
sql_print_error(_("Could not find first log while counting relay "
327
sql_print_error("Could not find first log while counting relay log space");
232
332
if (add_relay_log(rli,&linfo))
234
334
} while (!rli->relay_log.find_next_log(&linfo, 1));
236
336
As we have counted everything, including what may have written in a
291
393
1 error. errmsg is set to point to the error message
294
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
295
uint64_t pos, bool need_data_lock,
396
int init_relay_log_pos(Relay_log_info* rli,const char* log,
397
ulonglong pos, bool need_data_lock,
296
398
const char** errmsg,
297
399
bool look_for_description_event)
401
DBUG_ENTER("init_relay_log_pos");
402
DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
300
405
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
483
603
before reaching the desired log/position
486
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
606
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
490
int32_t event_count = 0;
491
uint32_t init_abort_pos_wait;
611
ulong init_abort_pos_wait;
493
613
struct timespec abstime; // for timeout checking
615
DBUG_ENTER("Relay_log_info::wait_for_pos");
620
DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
621
log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
499
623
set_timespec(abstime,timeout);
500
624
pthread_mutex_lock(&data_lock);
521
645
handle all possible log names comparisons (e.g. 999 vs 1000).
522
We use uint32_t for string->number conversion ; this is no
646
We use ulong for string->number conversion ; this is no
523
647
stronger limitation than in find_uniq_filename in sql/log.cc
525
uint32_t log_name_extension;
649
ulong log_name_extension;
526
650
char log_name_tmp[FN_REFLEN]; //make a char[] from String
528
strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
652
strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
530
654
char *p= fn_ext(log_name_tmp);
581
711
if the names do not match up to '.' included, return error
583
713
char *q= (char*)(fn_ext(basename)+1);
584
if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
714
if (strncmp(basename, log_name_tmp, (int)(q-basename)))
589
719
// Now compare extensions.
591
uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
721
ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
592
722
if (group_master_log_name_extension < log_name_extension)
595
725
cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
597
pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
727
pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
599
729
if (pos_reached || thd->killed)
765
DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
636
769
thd->exit_cond(msg);
770
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
771
improper_arguments: %d timed_out: %d",
773
(int) (init_abort_pos_wait != abort_pos_wait),
776
(int) (error == -1)));
637
777
if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
642
return( error ? error : event_count );
782
DBUG_RETURN( error ? error : event_count );
646
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
786
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
789
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
650
792
pthread_mutex_lock(&data_lock);
651
793
inc_event_relay_log_pos();
652
794
group_relay_log_pos= event_relay_log_pos;
653
group_relay_log_name.assign(event_relay_log_name);
795
strmake(group_relay_log_name,event_relay_log_name,
796
sizeof(group_relay_log_name)-1);
655
798
notify_group_relay_log_name_update();
832
987
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
834
989
const char *log_name;
991
DBUG_ENTER("Relay_log_info::is_until_satisfied");
837
assert(until_condition != UNTIL_NONE);
993
DBUG_ASSERT(until_condition != UNTIL_NONE);
839
995
if (until_condition == UNTIL_MASTER_POS)
841
log_name= group_master_log_name.c_str();
997
log_name= group_master_log_name;
842
998
log_pos= master_beg_pos;
845
1001
{ /* until_condition == UNTIL_RELAY_POS */
846
log_name= group_relay_log_name.c_str();
1002
log_name= group_relay_log_name;
847
1003
log_pos= group_relay_log_pos;
1009
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1010
group_master_log_name, llstr(group_master_log_pos, buf)));
1011
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1012
group_relay_log_name, llstr(group_relay_log_pos, buf)));
1013
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1014
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1015
log_name, llstr(log_pos, buf)));
1016
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1017
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1018
until_log_name, llstr(until_log_pos, buf)));
850
1022
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
879
1051
/* Probably error so we aborting */
880
sql_print_error(_("Slave SQL thread is stopped because UNTIL "
881
"condition is bad."));
1052
sql_print_error("Slave SQL thread is stopped because UNTIL "
1053
"condition is bad.");
886
return(until_log_pos == 0);
1058
DBUG_RETURN(until_log_pos == 0);
889
return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1061
DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
890
1062
log_pos >= until_log_pos) ||
891
1063
until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
1067
void Relay_log_info::cached_charset_invalidate()
1069
DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1071
/* Full of zeroes means uninitialized. */
1072
bzero(cached_charset, sizeof(cached_charset));
1077
bool Relay_log_info::cached_charset_compare(char *charset) const
1079
DBUG_ENTER("Relay_log_info::cached_charset_compare");
1081
if (bcmp((uchar*) cached_charset, (uchar*) charset,
1082
sizeof(cached_charset)))
1084
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
895
1091
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
896
1092
time_t event_creation_time)
898
extern uint32_t debug_not_change_ts_if_art_event;
1095
extern uint debug_not_change_ts_if_art_event;
899
1097
clear_flag(IN_STMT);
935
1133
is that value may take some time to display in
936
1134
Seconds_Behind_Master - not critical).
938
1137
if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
939
last_master_timestamp= event_creation_time;
1139
if (event_creation_time != 0)
1141
last_master_timestamp= event_creation_time;
1145
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
943
1146
void Relay_log_info::cleanup_context(THD *thd, bool error)
945
assert(sql_thd == thd);
1148
DBUG_ENTER("Relay_log_info::cleanup_context");
1150
DBUG_ASSERT(sql_thd == thd);
947
1152
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
948
1153
may have opened tables, which we cannot be sure have been closed (because
970
1175
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
971
1176
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
972
1177
last_event_start_time= 0;
976
1181
void Relay_log_info::clear_tables_to_lock()
978
1183
while (tables_to_lock)
980
unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
1185
uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
981
1186
if (tables_to_lock->m_tabledef_valid)
983
1188
tables_to_lock->m_tabledef.table_def::~table_def();
984
tables_to_lock->m_tabledef_valid= false;
1189
tables_to_lock->m_tabledef_valid= FALSE;
987
static_cast<RPL_TableList*>(tables_to_lock->next_global);
1192
static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
988
1193
tables_to_lock_count--;
1194
my_free(to_free, MYF(MY_WME));
991
assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1196
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);