121
121
/*
  Kind of slave thread; passed as the thd_type argument of
  init_slave_thread(). Presumably SLAVE_THD_IO selects the I/O thread and
  SLAVE_THD_SQL the SQL thread -- confirm against init_slave_thread().
*/
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
123
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
123
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
125
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
130
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
static int32_t safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
130
static int32_t safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
131
131
bool suppress_warnings);
132
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
132
static int32_t connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
133
133
bool reconnect, bool suppress_warnings);
134
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
135
void* thread_killed_arg);
136
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
136
static int32_t get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
137
137
static Log_event* next_event(Relay_log_info* rli);
138
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
139
static int terminate_slave_thread(THD *thd,
138
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
139
static int32_t terminate_slave_thread(THD *thd,
140
140
pthread_mutex_t* term_lock,
141
141
pthread_cond_t* term_cond,
142
volatile uint *slave_running,
142
volatile uint32_t *slave_running,
144
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
161
161
If inverse == 1, stopped threads
164
void init_thread_mask(int* mask,Master_info* mi,bool inverse)
164
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
166
166
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
167
register int tmp_mask=0;
167
register int32_t tmp_mask=0;
170
170
tmp_mask |= SLAVE_IO;
295
295
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
297
297
if (err_code < MAX_SLAVE_ERROR)
298
bitmap_set_bit(&slave_error_mask,(uint)err_code);
298
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
299
299
while (!my_isdigit(system_charset_info,*p) && *p)
306
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
306
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
309
309
return(0); /* successfully do nothing */
310
int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
310
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
311
311
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
313
313
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
367
367
terminate_slave_thread(THD *thd,
368
368
pthread_mutex_t* term_lock,
369
369
pthread_cond_t* term_cond,
370
volatile uint *slave_running,
370
volatile uint32_t *slave_running,
376
376
pthread_mutex_lock(term_lock);
427
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
428
pthread_mutex_t *cond_lock,
429
pthread_cond_t *start_cond,
430
volatile uint *slave_running,
431
volatile ulong *slave_run_id,
427
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
428
pthread_mutex_t *cond_lock,
429
pthread_cond_t *start_cond,
430
volatile uint32_t *slave_running,
431
volatile uint32_t *slave_run_id,
438
438
assert(mi->inited);
501
501
started the threads that were not previously running
504
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
504
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
506
506
const char* master_info_fname __attribute__((__unused__)),
507
507
const char* slave_info_fname __attribute__((__unused__)),
510
510
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
511
511
pthread_cond_t* cond_io=0,*cond_sql=0;
514
514
if (need_slave_mutex)
527
527
if (thread_mask & SLAVE_IO)
528
error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
530
&mi->slave_running, &mi->slave_run_id,
531
mi, 1); //high priority, to read the most possible
528
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
530
&mi->slave_running, &mi->slave_run_id,
531
mi, 1); //high priority, to read the most possible
532
532
if (!error && (thread_mask & SLAVE_SQL))
534
error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
536
&mi->rli.slave_running, &mi->rli.slave_run_id,
534
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
536
&mi->rli.slave_running, &mi->rli.slave_run_id,
539
539
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
977
977
const char query_format[]= "SET @master_heartbeat_period= %s";
978
978
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
980
the period is an ulonglong of nano-secs.
980
the period is a uint64_t number of nanoseconds.
982
llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
982
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
983
983
my_sprintf(query, (query, query_format, llbuf));
985
985
if (mysql_real_query(mysql, query, strlen(query))
1092
int register_slave_on_master(MYSQL* mysql, Master_info *mi,
1092
int32_t register_slave_on_master(MYSQL* mysql, Master_info *mi,
1093
1093
bool *suppress_warnings)
1095
1095
uchar buf[1024], *pos= buf;
1096
uint report_host_len, report_user_len=0, report_password_len=0;
1096
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1098
1098
*suppress_warnings= false;
1099
1099
if (!report_host)
1228
1228
protocol->store((uint32) mi->port);
1229
1229
protocol->store((uint32) mi->connect_retry);
1230
1230
protocol->store(mi->master_log_name, &my_charset_bin);
1231
protocol->store((ulonglong) mi->master_log_pos);
1231
protocol->store((uint64_t) mi->master_log_pos);
1232
1232
protocol->store(mi->rli.group_relay_log_name +
1233
1233
dirname_length(mi->rli.group_relay_log_name),
1234
1234
&my_charset_bin);
1235
protocol->store((ulonglong) mi->rli.group_relay_log_pos);
1235
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1236
1236
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1237
1237
protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
1238
1238
"Yes" : "No", &my_charset_bin);
1254
1254
protocol->store(mi->rli.last_error().number);
1255
1255
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1256
1256
protocol->store((uint32) mi->rli.slave_skip_counter);
1257
protocol->store((ulonglong) mi->rli.group_master_log_pos);
1258
protocol->store((ulonglong) mi->rli.log_space_total);
1257
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1258
protocol->store((uint64_t) mi->rli.log_space_total);
1260
1260
protocol->store(
1261
1261
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1262
1262
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1263
1263
"Relay"), &my_charset_bin);
1264
1264
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1265
protocol->store((ulonglong) mi->rli.until_log_pos);
1265
protocol->store((uint64_t) mi->rli.until_log_pos);
1267
1267
protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1268
1268
protocol->store(mi->ssl_ca, &my_charset_bin);
1418
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
1418
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1419
1419
void* thread_killed_arg)
1422
1422
thr_alarm_t alarmed;
1424
1424
thr_alarm_init(&alarmed);
1425
1425
time_t start_time= my_time(0);
1426
1426
time_t end_time= start_time+sec;
1428
while ((nap_time= (int) (end_time - start_time)) > 0)
1428
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1430
1430
ALARM alarm_buff;
1448
static int request_dump(MYSQL* mysql, Master_info* mi,
1448
static int32_t request_dump(MYSQL* mysql, Master_info* mi,
1449
1449
bool *suppress_warnings)
1451
1451
uchar buf[FN_REFLEN + 10];
1453
int binlog_flags = 0; // for now
1453
int32_t binlog_flags = 0; // for now
1454
1454
char* logname = mi->master_log_name;
1456
1456
*suppress_warnings= false;
1458
1458
// TODO if big log files: Change next to int8store()
1459
int4store(buf, (ulong) mi->master_log_pos);
1459
int4store(buf, (uint32_t) mi->master_log_pos);
1460
1460
int2store(buf + 4, binlog_flags);
1461
1461
int4store(buf + 6, server_id);
1462
len = (uint) strlen(logname);
1462
len = (uint32_t) strlen(logname);
1463
1463
memcpy(buf + 10, logname,len);
1464
1464
if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
1896
1896
@retval 1 There was an error.
1899
static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
1900
uint *retry_count, bool suppress_warnings,
1899
static int32_t try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
1900
uint32_t *retry_count, bool suppress_warnings,
1901
1901
const char *messages[SLAVE_RECON_MSG_MAX])
1903
1903
mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
2783
2783
Reads a 4.0 event and converts it to the slave's format. This code was copied
2784
2784
from queue_binlog_ver_1_event(), with some affordable simplifications.
2786
static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2786
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2789
2789
const char *errmsg = 0;
2791
2791
char *tmp_buf = 0;
2792
2792
Relay_log_info *rli= &mi->rli;
2960
2960
error= ER_SLAVE_HEARTBEAT_FAILURE;
2961
2961
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2962
2962
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2963
error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
2963
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2964
2964
error_msg.append(STRING_WITH_LEN(" log_pos "));
2965
2965
llstr(hb.log_pos, llbuf);
2966
2966
error_msg.append(llbuf, strlen(llbuf));
2987
2987
error= ER_SLAVE_HEARTBEAT_FAILURE;
2988
2988
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2989
2989
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2990
error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
2990
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2991
2991
error_msg.append(STRING_WITH_LEN(" log_pos "));
2992
2992
llstr(hb.log_pos, llbuf);
2993
2993
error_msg.append(llbuf, strlen(llbuf));
3131
3131
master_retry_count times
3134
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
3134
static int32_t connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
3135
3135
bool reconnect, bool suppress_warnings)
3137
int slave_was_killed;
3138
int last_errno= -2; // impossible error
3137
int32_t slave_was_killed;
3138
int32_t last_errno= -2; // impossible error
3139
uint32_t err_count=0;
3140
3140
char llbuff[22];
3142
3142
mi->events_till_disconnect = disconnect_slave_event_count;
3143
ulong client_flag= CLIENT_REMEMBER_OPTIONS;
3143
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3144
3144
if (opt_slave_compressed_protocol)
3145
3145
client_flag=CLIENT_COMPRESS; /* We will use compression */
3219
3219
master_retry_count times
3222
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
3222
static int32_t safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
3223
3223
bool suppress_warnings)
3225
3225
return(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
3528
3528
If the group's coordinates are equal to the event's coordinates
3529
3529
(i.e. the relay log was not rotated in the middle of a group),
3530
3530
we can purge this relay log too.
3531
We do ulonglong and string comparisons, this may be slow but
3531
We do uint64_t and string comparisons; this may be slow, but
3532
3532
- purging the last relay log is nice (it can save 1GB of disk), so we
3533
3533
like to detect the case where we can do it, and given this,
3534
3534
- I see no better detection method
3703
3703
@param report bool whether to report an error message; defaults to true
3704
3704
@return true if the master has the bug, false if it does not.
3706
bool rpl_master_has_bug(Relay_log_info *rli, uint bug_id, bool report)
3706
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3708
3708
struct st_version_range_for_one_bug {
3710
3710
const uchar introduced_in[3]; // first version with bug
3711
3711
const uchar fixed_in[3]; // first version with fix