~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/slave.cc

  • Committer: Brian Aker
  • Date: 2008-07-11 00:10:24 UTC
  • Revision ID: brian@tangent.org-20080711001024-5lrgorqxoc80i0nl
ulong cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
54
54
char* slave_load_tmpdir = 0;
55
55
Master_info *active_mi= 0;
56
56
my_bool replicate_same_server_id;
57
 
ulonglong relay_log_space_limit = 0;
 
57
uint64_t relay_log_space_limit = 0;
58
58
 
59
59
/*
60
60
  When slave thread exits, we need to remember the temporary tables so we
63
63
  TODO: move the vars below under Master_info
64
64
*/
65
65
 
66
 
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
67
 
int events_till_abort = -1;
 
66
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 
67
int32_t events_till_abort = -1;
68
68
 
69
69
enum enum_slave_reconnect_actions
70
70
{
120
120
 
121
121
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
122
122
 
123
 
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
 
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
 
123
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 
124
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
125
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
 
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
 
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
130
 
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
 
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
 
129
static int32_t safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
 
130
static int32_t safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
131
131
                          bool suppress_warnings);
132
 
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
 
132
static int32_t connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
133
133
                             bool reconnect, bool suppress_warnings);
134
 
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
 
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
135
                      void* thread_killed_arg);
136
 
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
 
136
static int32_t get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
137
137
static Log_event* next_event(Relay_log_info* rli);
138
 
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
139
 
static int terminate_slave_thread(THD *thd,
 
138
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
 
139
static int32_t terminate_slave_thread(THD *thd,
140
140
                                  pthread_mutex_t* term_lock,
141
141
                                  pthread_cond_t* term_cond,
142
 
                                  volatile uint *slave_running,
 
142
                                  volatile uint32_t *slave_running,
143
143
                                  bool skip_lock);
144
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
145
145
 
161
161
                If inverse == 1, stopped threads
162
162
*/
163
163
 
164
 
void init_thread_mask(int* mask,Master_info* mi,bool inverse)
 
164
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
165
165
{
166
166
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
167
 
  register int tmp_mask=0;
 
167
  register int32_t tmp_mask=0;
168
168
 
169
169
  if (set_io)
170
170
    tmp_mask |= SLAVE_IO;
205
205
 
206
206
/* Initialize slave structures */
207
207
 
208
 
int init_slave()
 
208
int32_t init_slave()
209
209
{
210
210
  /*
211
211
    This is called when mysqld starts. Before client connections are
295
295
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
296
296
      break;
297
297
    if (err_code < MAX_SLAVE_ERROR)
298
 
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
 
298
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
299
299
    while (!my_isdigit(system_charset_info,*p) && *p)
300
300
      p++;
301
301
  }
303
303
}
304
304
 
305
305
 
306
 
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
 
306
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
307
307
{
308
308
  if (!mi->inited)
309
309
    return(0); /* successfully do nothing */
310
 
  int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
 
310
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
311
311
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
312
312
 
313
313
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
363
363
 
364
364
   @retval 0 All OK
365
365
 */
366
 
static int
 
366
static int32_t
367
367
terminate_slave_thread(THD *thd,
368
368
                       pthread_mutex_t* term_lock,
369
369
                       pthread_cond_t* term_cond,
370
 
                       volatile uint *slave_running,
 
370
                       volatile uint32_t *slave_running,
371
371
                       bool skip_lock)
372
372
{
373
 
  int error;
 
373
  int32_t error;
374
374
 
375
375
  if (!skip_lock)
376
376
    pthread_mutex_lock(term_lock);
400
400
      EINVAL: invalid signal number (can't happen)
401
401
      ESRCH: thread already killed (can happen, should be ignored)
402
402
    */
403
 
    int err= pthread_kill(thd->real_id, thr_client_alarm);
 
403
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
404
404
    assert(err != EINVAL);
405
405
#endif
406
406
    thd->awake(THD::NOT_KILLED);
424
424
}
425
425
 
426
426
 
427
 
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
428
 
                       pthread_mutex_t *cond_lock,
429
 
                       pthread_cond_t *start_cond,
430
 
                       volatile uint *slave_running,
431
 
                       volatile ulong *slave_run_id,
432
 
                       Master_info* mi,
433
 
                       bool high_priority)
 
427
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
 
428
                           pthread_mutex_t *cond_lock,
 
429
                           pthread_cond_t *start_cond,
 
430
                           volatile uint32_t *slave_running,
 
431
                           volatile uint32_t *slave_run_id,
 
432
                           Master_info* mi,
 
433
                           bool high_priority)
434
434
{
435
435
  pthread_t th;
436
 
  ulong start_id;
 
436
  uint32_t start_id;
437
437
 
438
438
  assert(mi->inited);
439
439
 
501
501
    started the threads that were not previously running
502
502
*/
503
503
 
504
 
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
 
504
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
505
505
                        Master_info* mi,
506
506
                        const char* master_info_fname __attribute__((__unused__)),
507
507
                        const char* slave_info_fname __attribute__((__unused__)),
508
 
                        int thread_mask)
 
508
                        int32_t thread_mask)
509
509
{
510
510
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
511
511
  pthread_cond_t* cond_io=0,*cond_sql=0;
512
 
  int error=0;
 
512
  int32_t error=0;
513
513
 
514
514
  if (need_slave_mutex)
515
515
  {
525
525
  }
526
526
 
527
527
  if (thread_mask & SLAVE_IO)
528
 
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
529
 
                             cond_io,
530
 
                             &mi->slave_running, &mi->slave_run_id,
531
 
                             mi, 1); //high priority, to read the most possible
 
528
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
 
529
                              cond_io,
 
530
                              &mi->slave_running, &mi->slave_run_id,
 
531
                              mi, 1); //high priority, to read the most possible
532
532
  if (!error && (thread_mask & SLAVE_SQL))
533
533
  {
534
 
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
535
 
                             cond_sql,
536
 
                             &mi->rli.slave_running, &mi->rli.slave_run_id,
537
 
                             mi, 0);
 
534
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
 
535
                              cond_sql,
 
536
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
 
537
                              mi, 0);
538
538
    if (error)
539
539
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
540
540
  }
543
543
 
544
544
 
545
545
#ifdef NOT_USED_YET
546
 
static int end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
 
546
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
547
547
{
548
548
  end_master_info(mi);
549
549
  return(0);
660
660
  return((db ? db : ""));
661
661
}
662
662
 
663
 
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
 
663
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
664
664
                                 const char *default_val)
665
665
{
666
 
  uint length;
 
666
  uint32_t length;
667
667
 
668
668
  if ((length=my_b_gets(f,var, max_size)))
669
669
  {
676
676
        If we truncated a line or stopped on last char, remove all chars
677
677
        up to and including newline.
678
678
      */
679
 
      int c;
680
 
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
 
679
      int32_t c;
 
680
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
681
681
    }
682
682
    return(0);
683
683
  }
690
690
}
691
691
 
692
692
 
693
 
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
 
693
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
694
694
{
695
695
  char buf[32];
696
696
 
708
708
  return(1);
709
709
}
710
710
 
711
 
int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
 
711
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
712
712
{
713
713
  char buf[16];
714
714
 
754
754
  1       error
755
755
*/
756
756
 
757
 
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
 
757
static int32_t get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
758
758
{
759
759
  char error_buf[512];
760
760
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
761
761
  char err_buff[MAX_SLAVE_ERRMSG];
762
762
  const char* errmsg= 0;
763
 
  int err_code= 0;
 
763
  int32_t err_code= 0;
764
764
  MYSQL_RES *master_res= 0;
765
765
  MYSQL_ROW master_row;
766
766
 
977
977
    const char query_format[]= "SET @master_heartbeat_period= %s";
978
978
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
979
979
    /* 
980
 
       the period is an ulonglong of nano-secs. 
 
980
       the period is an uint64_t of nano-secs. 
981
981
    */
982
 
    llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
 
982
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
983
983
    my_sprintf(query, (query, query_format, llbuf));
984
984
 
985
985
    if (mysql_real_query(mysql, query, strlen(query))
1089
1089
}
1090
1090
 
1091
1091
 
1092
 
int register_slave_on_master(MYSQL* mysql, Master_info *mi,
 
1092
int32_t register_slave_on_master(MYSQL* mysql, Master_info *mi,
1093
1093
                             bool *suppress_warnings)
1094
1094
{
1095
1095
  uchar buf[1024], *pos= buf;
1096
 
  uint report_host_len, report_user_len=0, report_password_len=0;
 
1096
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
1097
1097
 
1098
1098
  *suppress_warnings= false;
1099
1099
  if (!report_host)
1228
1228
    protocol->store((uint32) mi->port);
1229
1229
    protocol->store((uint32) mi->connect_retry);
1230
1230
    protocol->store(mi->master_log_name, &my_charset_bin);
1231
 
    protocol->store((ulonglong) mi->master_log_pos);
 
1231
    protocol->store((uint64_t) mi->master_log_pos);
1232
1232
    protocol->store(mi->rli.group_relay_log_name +
1233
1233
                    dirname_length(mi->rli.group_relay_log_name),
1234
1234
                    &my_charset_bin);
1235
 
    protocol->store((ulonglong) mi->rli.group_relay_log_pos);
 
1235
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1236
1236
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1237
1237
    protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
1238
1238
                    "Yes" : "No", &my_charset_bin);
1254
1254
    protocol->store(mi->rli.last_error().number);
1255
1255
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1256
1256
    protocol->store((uint32) mi->rli.slave_skip_counter);
1257
 
    protocol->store((ulonglong) mi->rli.group_master_log_pos);
1258
 
    protocol->store((ulonglong) mi->rli.log_space_total);
 
1257
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
 
1258
    protocol->store((uint64_t) mi->rli.log_space_total);
1259
1259
 
1260
1260
    protocol->store(
1261
1261
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1262
1262
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1263
1263
          "Relay"), &my_charset_bin);
1264
1264
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
1265
 
    protocol->store((ulonglong) mi->rli.until_log_pos);
 
1265
    protocol->store((uint64_t) mi->rli.until_log_pos);
1266
1266
 
1267
1267
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1268
1268
    protocol->store(mi->ssl_ca, &my_charset_bin);
1340
1340
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1341
1341
     only for client threads.
1342
1342
  */
1343
 
  ulonglong options= thd->options | OPTION_BIG_SELECTS;
 
1343
  uint64_t options= thd->options | OPTION_BIG_SELECTS;
1344
1344
  if (opt_log_slave_updates)
1345
1345
    options|= OPTION_BIN_LOG;
1346
1346
  else
1374
1374
  init_slave_thread()
1375
1375
*/
1376
1376
 
1377
 
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
 
1377
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1378
1378
{
1379
 
  int simulate_error= 0;
 
1379
  int32_t simulate_error= 0;
1380
1380
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1381
1381
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1382
1382
  thd->security_ctx->skip_grants();
1415
1415
}
1416
1416
 
1417
1417
 
1418
 
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
 
1418
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1419
1419
                      void* thread_killed_arg)
1420
1420
{
1421
 
  int nap_time;
 
1421
  int32_t nap_time;
1422
1422
  thr_alarm_t alarmed;
1423
1423
 
1424
1424
  thr_alarm_init(&alarmed);
1425
1425
  time_t start_time= my_time(0);
1426
1426
  time_t end_time= start_time+sec;
1427
1427
 
1428
 
  while ((nap_time= (int) (end_time - start_time)) > 0)
 
1428
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1429
1429
  {
1430
1430
    ALARM alarm_buff;
1431
1431
    /*
1445
1445
}
1446
1446
 
1447
1447
 
1448
 
static int request_dump(MYSQL* mysql, Master_info* mi,
 
1448
static int32_t request_dump(MYSQL* mysql, Master_info* mi,
1449
1449
                        bool *suppress_warnings)
1450
1450
{
1451
1451
  uchar buf[FN_REFLEN + 10];
1452
 
  int len;
1453
 
  int binlog_flags = 0; // for now
 
1452
  int32_t len;
 
1453
  int32_t binlog_flags = 0; // for now
1454
1454
  char* logname = mi->master_log_name;
1455
1455
  
1456
1456
  *suppress_warnings= false;
1457
1457
 
1458
1458
  // TODO if big log files: Change next to int8store()
1459
 
  int4store(buf, (ulong) mi->master_log_pos);
 
1459
  int4store(buf, (uint32_t) mi->master_log_pos);
1460
1460
  int2store(buf + 4, binlog_flags);
1461
1461
  int4store(buf + 6, server_id);
1462
 
  len = (uint) strlen(logname);
 
1462
  len = (uint32_t) strlen(logname);
1463
1463
  memcpy(buf + 10, logname,len);
1464
1464
  if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
1465
1465
  {
1497
1497
    number              Length of packet
1498
1498
*/
1499
1499
 
1500
 
static ulong read_event(MYSQL* mysql,
 
1500
static uint32_t read_event(MYSQL* mysql,
1501
1501
                        Master_info *mi __attribute__((__unused__)),
1502
1502
                        bool* suppress_warnings)
1503
1503
{
1504
 
  ulong len;
 
1504
  uint32_t len;
1505
1505
 
1506
1506
  *suppress_warnings= false;
1507
1507
  /*
1542
1542
}
1543
1543
 
1544
1544
 
1545
 
int check_expected_error(THD* thd __attribute__((__unused__)),
 
1545
int32_t check_expected_error(THD* thd __attribute__((__unused__)),
1546
1546
                         Relay_log_info const *rli __attribute__((__unused__)),
1547
 
                         int expected_error)
 
1547
                         int32_t expected_error)
1548
1548
{
1549
1549
  switch (expected_error) {
1550
1550
  case ER_NET_READ_ERROR:
1566
1566
  that the error is temporary by pushing a warning with the error code
1567
1567
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1568
1568
*/
1569
 
static int has_temporary_error(THD *thd)
 
1569
static int32_t has_temporary_error(THD *thd)
1570
1570
{
1571
1571
  if (thd->is_fatal_error)
1572
1572
    return(0);
1624
1624
  @retval 2 No error calling ev->apply_event(), but error calling
1625
1625
  ev->update_pos().
1626
1626
*/
1627
 
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
 
1627
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
1628
1628
                               bool skip)
1629
1629
{
1630
 
  int exec_res= 0;
 
1630
  int32_t exec_res= 0;
1631
1631
 
1632
1632
  /*
1633
1633
    Execute the event to change the database and update the binary
1662
1662
 
1663
1663
  if (skip)
1664
1664
  {
1665
 
    int reason= ev->shall_skip(rli);
 
1665
    int32_t reason= ev->shall_skip(rli);
1666
1666
    if (reason == Log_event::EVENT_SKIP_COUNT)
1667
1667
      --rli->slave_skip_counter;
1668
1668
    pthread_mutex_unlock(&rli->data_lock);
1674
1674
 
1675
1675
  if (exec_res == 0)
1676
1676
  {
1677
 
    int error= ev->update_pos(rli);
 
1677
    int32_t error= ev->update_pos(rli);
1678
1678
    /*
1679
1679
      The update should not fail, so print an error message and
1680
1680
      return an error code.
1728
1728
 
1729
1729
  @retval 1 The event was not applied.
1730
1730
*/
1731
 
static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
 
1731
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
1732
1732
{
1733
1733
  /*
1734
1734
     We acquire this mutex since we need it for all operations except
1749
1749
  }
1750
1750
  if (ev)
1751
1751
  {
1752
 
    int exec_res;
 
1752
    int32_t exec_res;
1753
1753
 
1754
1754
    /*
1755
1755
      This tests if the position of the beginning of the current event
1793
1793
 
1794
1794
    if (slave_trans_retries)
1795
1795
    {
1796
 
      int temp_err= 0;
 
1796
      int32_t temp_err= 0;
1797
1797
      if (exec_res && (temp_err= has_temporary_error(thd)))
1798
1798
      {
1799
1799
        const char *errmsg;
1896
1896
  @retval        1                   There was an error.
1897
1897
*/
1898
1898
 
1899
 
static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
1900
 
                            uint *retry_count, bool suppress_warnings,
 
1899
static int32_t try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
 
1900
                            uint32_t *retry_count, bool suppress_warnings,
1901
1901
                            const char *messages[SLAVE_RECON_MSG_MAX])
1902
1902
{
1903
1903
  mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
1955
1955
  Master_info *mi = (Master_info*)arg;
1956
1956
  Relay_log_info *rli= &mi->rli;
1957
1957
  char llbuff[22];
1958
 
  uint retry_count;
 
1958
  uint32_t retry_count;
1959
1959
  bool suppress_warnings;
1960
 
  uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 
1960
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1961
1961
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
1962
1962
  my_thread_init();
1963
1963
 
2086
2086
 
2087
2087
    while (!io_slave_killed(thd,mi))
2088
2088
    {
2089
 
      ulong event_len;
 
2089
      uint32_t event_len;
2090
2090
      /*
2091
2091
         We say "waiting" because read_event() will wait if there's nothing to
2092
2092
         read. But if there's something to read, it will not wait. The
2110
2110
 
2111
2111
      if (event_len == packet_error)
2112
2112
      {
2113
 
        uint mysql_error_number= mysql_errno(mysql);
 
2113
        uint32_t mysql_error_number= mysql_errno(mysql);
2114
2114
        switch (mysql_error_number) {
2115
2115
        case CR_NET_PACKET_TOO_LARGE:
2116
2116
          sql_print_error("\
2499
2499
  process_io_create_file()
2500
2500
*/
2501
2501
 
2502
 
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
 
2502
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2503
2503
{
2504
 
  int error = 1;
2505
 
  ulong num_bytes;
 
2504
  int32_t error = 1;
 
2505
  uint32_t num_bytes;
2506
2506
  bool cev_not_written;
2507
2507
  THD *thd = mi->io_thd;
2508
2508
  NET *net = &mi->mysql->net;
2625
2625
 
2626
2626
*/
2627
2627
 
2628
 
static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
 
2628
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2629
2629
{
2630
2630
  safe_mutex_assert_owner(&mi->data_lock);
2631
2631
 
2668
2668
  Reads a 3.23 event and converts it to the slave's format. This code was
2669
2669
  copied from MySQL 4.0.
2670
2670
*/
2671
 
static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2672
 
                           ulong event_len)
 
2671
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
 
2672
                           uint32_t event_len)
2673
2673
{
2674
2674
  const char *errmsg = 0;
2675
 
  ulong inc_pos;
 
2675
  uint32_t inc_pos;
2676
2676
  bool ignore_event= 0;
2677
2677
  char *tmp_buf = 0;
2678
2678
  Relay_log_info *rli= &mi->rli;
2746
2746
    assert(tmp_buf != 0);
2747
2747
    inc_pos=event_len;
2748
2748
    ev->log_pos+= inc_pos;
2749
 
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
 
2749
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2750
2750
    delete ev;
2751
2751
    mi->master_log_pos += inc_pos;
2752
2752
    pthread_mutex_unlock(&mi->data_lock);
2783
2783
  Reads a 4.0 event and converts it to the slave's format. This code was copied
2784
2784
  from queue_binlog_ver_1_event(), with some affordable simplifications.
2785
2785
*/
2786
 
static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2787
 
                           ulong event_len)
 
2786
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
 
2787
                           uint32_t event_len)
2788
2788
{
2789
2789
  const char *errmsg = 0;
2790
 
  ulong inc_pos;
 
2790
  uint32_t inc_pos;
2791
2791
  char *tmp_buf = 0;
2792
2792
  Relay_log_info *rli= &mi->rli;
2793
2793
 
2845
2845
    setup with 3.23 master or 4.0 master
2846
2846
*/
2847
2847
 
2848
 
static int queue_old_event(Master_info *mi, const char *buf,
2849
 
                           ulong event_len)
 
2848
static int32_t queue_old_event(Master_info *mi, const char *buf,
 
2849
                           uint32_t event_len)
2850
2850
{
2851
2851
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2852
2852
  {
2869
2869
  any >=5.0.0 format.
2870
2870
*/
2871
2871
 
2872
 
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
 
2872
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2873
2873
{
2874
 
  int error= 0;
 
2874
  int32_t error= 0;
2875
2875
  String error_msg;
2876
 
  ulong inc_pos;
 
2876
  uint32_t inc_pos;
2877
2877
  Relay_log_info *rli= &mi->rli;
2878
2878
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2879
2879
 
2960
2960
      error= ER_SLAVE_HEARTBEAT_FAILURE;
2961
2961
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2962
2962
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2963
 
      error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
 
2963
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2964
2964
      error_msg.append(STRING_WITH_LEN(" log_pos "));
2965
2965
      llstr(hb.log_pos, llbuf);
2966
2966
      error_msg.append(llbuf, strlen(llbuf));
2987
2987
      error= ER_SLAVE_HEARTBEAT_FAILURE;
2988
2988
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2989
2989
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2990
 
      error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
 
2990
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2991
2991
      error_msg.append(STRING_WITH_LEN(" log_pos "));
2992
2992
      llstr(hb.log_pos, llbuf);
2993
2993
      error_msg.append(llbuf, strlen(llbuf));
3116
3116
    #   Error
3117
3117
*/
3118
3118
 
3119
 
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi)
 
3119
static int32_t safe_connect(THD* thd, MYSQL* mysql, Master_info* mi)
3120
3120
{
3121
3121
  return(connect_to_master(thd, mysql, mi, 0, 0));
3122
3122
}
3131
3131
    master_retry_count times
3132
3132
*/
3133
3133
 
3134
 
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
 
3134
static int32_t connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
3135
3135
                             bool reconnect, bool suppress_warnings)
3136
3136
{
3137
 
  int slave_was_killed;
3138
 
  int last_errno= -2;                           // impossible error
3139
 
  ulong err_count=0;
 
3137
  int32_t slave_was_killed;
 
3138
  int32_t last_errno= -2;                           // impossible error
 
3139
  uint32_t err_count=0;
3140
3140
  char llbuff[22];
3141
3141
 
3142
3142
  mi->events_till_disconnect = disconnect_slave_event_count;
3143
 
  ulong client_flag= CLIENT_REMEMBER_OPTIONS;
 
3143
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3144
3144
  if (opt_slave_compressed_protocol)
3145
3145
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
3146
3146
 
3157
3157
                             mi->port, 0, client_flag) == 0))
3158
3158
  {
3159
3159
    /* Don't repeat last error */
3160
 
    if ((int)mysql_errno(mysql) != last_errno)
 
3160
    if ((int32_t)mysql_errno(mysql) != last_errno)
3161
3161
    {
3162
3162
      last_errno=mysql_errno(mysql);
3163
3163
      suppress_warnings= 0;
3219
3219
    master_retry_count times
3220
3220
*/
3221
3221
 
3222
 
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
 
3222
static int32_t safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
3223
3223
                          bool suppress_warnings)
3224
3224
{
3225
3225
  return(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
3528
3528
          If the group's coordinates are equal to the event's coordinates
3529
3529
          (i.e. the relay log was not rotated in the middle of a group),
3530
3530
          we can purge this relay log too.
3531
 
          We do ulonglong and string comparisons, this may be slow but
 
3531
          We do uint64_t and string comparisons, this may be slow but
3532
3532
          - purging the last relay log is nice (it can save 1GB of disk), so we
3533
3533
          like to detect the case where we can do it, and given this,
3534
3534
          - I see no better detection method
3703
3703
   @param report bool report error message, default TRUE
3704
3704
   @return true if master has the bug, FALSE if it does not.
3705
3705
*/
3706
 
bool rpl_master_has_bug(Relay_log_info *rli, uint bug_id, bool report)
 
3706
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3707
3707
{
3708
3708
  struct st_version_range_for_one_bug {
3709
 
    uint        bug_id;
 
3709
    uint32_t        bug_id;
3710
3710
    const uchar introduced_in[3]; // first version with bug
3711
3711
    const uchar fixed_in[3];      // first version with fix
3712
3712
  };
3722
3722
 
3723
3723
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3724
3724
 
3725
 
  for (uint i= 0;
 
3725
  for (uint32_t i= 0;
3726
3726
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3727
3727
  {
3728
3728
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,