~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Brian Aker
  • Date: 2008-10-20 03:40:03 UTC
  • mto: (492.3.21 drizzle-clean-code)
  • mto: This revision was merged to the branch mainline in revision 530.
  • Revision ID: brian@tangent.org-20081020034003-t2dcnl0ayr2ymm8k
THD -> Session rename

Show diffs side-by-side

added added

removed removed

Lines of Context:
54
54
bool use_slave_mask = 0;
55
55
MY_BITMAP slave_error_mask;
56
56
 
57
 
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
 
57
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
58
58
 
59
59
char* slave_load_tmpdir = 0;
60
60
Master_info *active_mi= 0;
125
125
};
126
126
 
127
127
 
128
 
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
 
128
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
129
129
 
130
130
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
131
131
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
132
132
static bool wait_for_relay_log_space(Relay_log_info* rli);
133
 
static inline bool io_slave_killed(THD* thd,Master_info* mi);
134
 
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
135
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
136
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
137
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
133
static inline bool io_slave_killed(Session* thd,Master_info* mi);
 
134
static inline bool sql_slave_killed(Session* thd,Relay_log_info* rli);
 
135
static int32_t init_slave_thread(Session* thd, SLAVE_Session_TYPE thd_type);
 
136
static int32_t safe_connect(Session* thd, DRIZZLE *drizzle, Master_info* mi);
 
137
static int32_t safe_reconnect(Session* thd, DRIZZLE *drizzle, Master_info* mi,
138
138
                          bool suppress_warnings);
139
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
139
static int32_t connect_to_master(Session* thd, DRIZZLE *drizzle, Master_info* mi,
140
140
                             bool reconnect, bool suppress_warnings);
141
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
141
static int32_t safe_sleep(Session* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
142
142
                      void* thread_killed_arg);
143
143
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
144
144
static Log_event* next_event(Relay_log_info* rli);
145
145
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
146
 
static int32_t terminate_slave_thread(THD *thd,
 
146
static int32_t terminate_slave_thread(Session *thd,
147
147
                                  pthread_mutex_t* term_lock,
148
148
                                  pthread_cond_t* term_cond,
149
149
                                  volatile uint32_t *slave_running,
150
150
                                  bool skip_lock);
151
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 
151
static bool check_io_slave_killed(Session *thd, Master_info *mi, const char *info);
152
152
 
153
153
/*
154
154
  Find out which replications threads are running
370
370
   @retval 0 All OK
371
371
 */
372
372
static int32_t
373
 
terminate_slave_thread(THD *thd,
 
373
terminate_slave_thread(Session *thd,
374
374
                       pthread_mutex_t* term_lock,
375
375
                       pthread_cond_t* term_cond,
376
376
                       volatile uint32_t *slave_running,
390
390
    return(ER_SLAVE_NOT_RUNNING);
391
391
  }
392
392
  assert(thd != 0);
393
 
  THD_CHECK_SENTRY(thd);
 
393
  Session_CHECK_SENTRY(thd);
394
394
 
395
395
  /*
396
396
    Is is critical to test if the slave is running. Otherwise, we might
409
409
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
410
410
    assert(err != EINVAL);
411
411
#endif
412
 
    thd->awake(THD::NOT_KILLED);
 
412
    thd->awake(Session::NOT_KILLED);
413
413
    pthread_mutex_unlock(&thd->LOCK_delete);
414
414
 
415
415
    /*
480
480
  }
481
481
  if (start_cond && cond_lock) // caller has cond_lock
482
482
  {
483
 
    THD* thd = current_thd;
 
483
    Session* thd = current_thd;
484
484
    while (start_id == *slave_run_id)
485
485
    {
486
486
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
591
591
}
592
592
 
593
593
 
594
 
static bool io_slave_killed(THD* thd, Master_info* mi)
 
594
static bool io_slave_killed(Session* thd, Master_info* mi)
595
595
{
596
596
  assert(mi->io_thd == thd);
597
597
  assert(mi->slave_running); // tracking buffer overrun
599
599
}
600
600
 
601
601
 
602
 
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
 
602
static bool sql_slave_killed(Session* thd, Relay_log_info* rli)
603
603
{
604
604
  assert(rli->sql_thd == thd);
605
605
  assert(rli->slave_running == 1);// tracking buffer overrun
734
734
  return(1);
735
735
}
736
736
 
737
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
 
737
static bool check_io_slave_killed(Session *thd, Master_info *mi, const char *info)
738
738
{
739
739
  if (io_slave_killed(thd, mi))
740
740
  {
1026
1026
  bool slave_killed=0;
1027
1027
  Master_info* mi = rli->mi;
1028
1028
  const char *save_proc_info;
1029
 
  THD* thd = mi->io_thd;
 
1029
  Session* thd = mi->io_thd;
1030
1030
 
1031
1031
  pthread_mutex_lock(&rli->log_space_lock);
1032
1032
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
1055
1055
    ignored events' end position for the use of the slave SQL thread, by
1056
1056
    calling this function. Only that thread can call it (see assertion).
1057
1057
 */
1058
 
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
 
1058
static void write_ignored_events_info_to_relay_log(Session *thd __attribute__((unused)),
1059
1059
                                                   Master_info *mi)
1060
1060
{
1061
1061
  Relay_log_info *rli= &mi->rli;
1145
1145
}
1146
1146
 
1147
1147
 
1148
 
bool show_master_info(THD* thd, Master_info* mi)
 
1148
bool show_master_info(Session* thd, Master_info* mi)
1149
1149
{
1150
1150
  // TODO: fix this for multi-master
1151
1151
  List<Item> field_list;
1316
1316
}
1317
1317
 
1318
1318
 
1319
 
void set_slave_thread_options(THD* thd)
 
1319
void set_slave_thread_options(Session* thd)
1320
1320
{
1321
1321
  /*
1322
1322
     It's nonsense to constrain the slave threads with max_join_size; if a
1341
1341
  init_slave_thread()
1342
1342
*/
1343
1343
 
1344
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
 
1344
static int32_t init_slave_thread(Session* thd, SLAVE_Session_TYPE thd_type)
1345
1345
{
1346
1346
  int32_t simulate_error= 0;
1347
 
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
 
1347
  thd->system_thread = (thd_type == SLAVE_Session_SQL) ?
1348
1348
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1349
1349
  thd->security_ctx->skip_grants();
1350
1350
  my_net_init(&thd->net, 0);
1362
1362
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1363
1363
  pthread_mutex_unlock(&LOCK_thread_count);
1364
1364
 
1365
 
 simulate_error|= (1 << SLAVE_THD_IO);
1366
 
 simulate_error|= (1 << SLAVE_THD_SQL);
 
1365
 simulate_error|= (1 << SLAVE_Session_IO);
 
1366
 simulate_error|= (1 << SLAVE_Session_SQL);
1367
1367
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1368
1368
  {
1369
1369
    thd->cleanup();
1371
1371
  }
1372
1372
  lex_start(thd);
1373
1373
 
1374
 
  if (thd_type == SLAVE_THD_SQL)
 
1374
  if (thd_type == SLAVE_Session_SQL)
1375
1375
    thd->set_proc_info("Waiting for the next event in relay log");
1376
1376
  else
1377
1377
    thd->set_proc_info("Waiting for master update");
1381
1381
}
1382
1382
 
1383
1383
 
1384
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1384
static int32_t safe_sleep(Session* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1385
1385
                      void* thread_killed_arg)
1386
1386
{
1387
1387
  int32_t nap_time;
1508
1508
}
1509
1509
 
1510
1510
 
1511
 
int32_t check_expected_error(THD* thd __attribute__((unused)),
 
1511
int32_t check_expected_error(Session* thd __attribute__((unused)),
1512
1512
                         Relay_log_info const *rli __attribute__((unused)),
1513
1513
                         int32_t expected_error)
1514
1514
{
1532
1532
  that the error is temporary by pushing a warning with the error code
1533
1533
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1534
1534
*/
1535
 
static int32_t has_temporary_error(THD *thd)
 
1535
static int32_t has_temporary_error(Session *thd)
1536
1536
{
1537
1537
  if (thd->is_fatal_error)
1538
1538
    return(0);
1544
1544
  }
1545
1545
 
1546
1546
  /*
1547
 
    If there is no message in THD, we can't say if it's a temporary
 
1547
    If there is no message in Session, we can't say if it's a temporary
1548
1548
    error or not. This is currently the case for Incident_log_event,
1549
1549
    which sets no message. Return FALSE.
1550
1550
  */
1590
1590
  @retval 2 No error calling ev->apply_event(), but error calling
1591
1591
  ev->update_pos().
1592
1592
*/
1593
 
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
 
1593
int32_t apply_event_and_update_pos(Log_event* ev, Session* thd, Relay_log_info* rli,
1594
1594
                               bool skip)
1595
1595
{
1596
1596
  int32_t exec_res= 0;
1694
1694
 
1695
1695
  @retval 1 The event was not applied.
1696
1696
*/
1697
 
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
 
1697
static int32_t exec_relay_log_event(Session* thd, Relay_log_info* rli)
1698
1698
{
1699
1699
  /*
1700
1700
     We acquire this mutex since we need it for all operations except
1864
1864
  @retval        1                   There was an error.
1865
1865
*/
1866
1866
 
1867
 
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
 
1867
static int32_t try_to_reconnect(Session *thd, DRIZZLE *drizzle, Master_info *mi,
1868
1868
                                uint32_t *retry_count, bool suppress_warnings,
1869
1869
                                const char *messages[SLAVE_RECON_MSG_MAX])
1870
1870
{
1916
1916
 
1917
1917
pthread_handler_t handle_slave_io(void *arg)
1918
1918
{
1919
 
  THD *thd; // needs to be first for thread_stack
 
1919
  Session *thd; // needs to be first for thread_stack
1920
1920
  DRIZZLE *drizzle;
1921
1921
  Master_info *mi = (Master_info*)arg;
1922
1922
  Relay_log_info *rli= &mi->rli;
1936
1936
 
1937
1937
  mi->events_till_disconnect = disconnect_slave_event_count;
1938
1938
 
1939
 
  thd= new THD;
1940
 
  THD_CHECK_SENTRY(thd);
 
1939
  thd= new Session;
 
1940
  Session_CHECK_SENTRY(thd);
1941
1941
  mi->io_thd = thd;
1942
1942
 
1943
1943
  pthread_detach_this_thread();
1944
1944
  thd->thread_stack= (char*) &thd; // remember where our stack is
1945
 
  if (init_slave_thread(thd, SLAVE_THD_IO))
 
1945
  if (init_slave_thread(thd, SLAVE_Session_IO))
1946
1946
  {
1947
1947
    pthread_cond_broadcast(&mi->start_cond);
1948
1948
    pthread_mutex_unlock(&mi->run_lock);
2150
2150
  {
2151
2151
    /*
2152
2152
      Here we need to clear the active VIO before closing the
2153
 
      connection with the master.  The reason is that THD::awake()
 
2153
      connection with the master.  The reason is that Session::awake()
2154
2154
      might be called from terminate_slave_thread() because somebody
2155
2155
      issued a STOP SLAVE.  If that happends, the close_active_vio()
2156
2156
      can be called in the middle of closing the VIO associated with
2170
2170
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2171
2171
  close_thread_tables(thd);
2172
2172
  pthread_mutex_lock(&LOCK_thread_count);
2173
 
  THD_CHECK_SENTRY(thd);
 
2173
  Session_CHECK_SENTRY(thd);
2174
2174
  delete thd;
2175
2175
  pthread_mutex_unlock(&LOCK_thread_count);
2176
2176
  mi->abort_slave= 0;
2193
2193
 
2194
2194
pthread_handler_t handle_slave_sql(void *arg)
2195
2195
{
2196
 
  THD *thd;                     /* needs to be first for thread_stack */
 
2196
  Session *thd;                     /* needs to be first for thread_stack */
2197
2197
  char llbuff[22],llbuff1[22];
2198
2198
 
2199
2199
  Relay_log_info* rli = &((Master_info*)arg)->rli;
2207
2207
  errmsg= 0;
2208
2208
  rli->events_till_abort = abort_slave_event_count;
2209
2209
 
2210
 
  thd = new THD;
 
2210
  thd = new Session;
2211
2211
  thd->thread_stack = (char*)&thd; // remember where our stack is
2212
2212
  rli->sql_thd= thd;
2213
2213
  
2216
2216
  rli->slave_running = 1;
2217
2217
 
2218
2218
  pthread_detach_this_thread();
2219
 
  if (init_slave_thread(thd, SLAVE_THD_SQL))
 
2219
  if (init_slave_thread(thd, SLAVE_Session_SQL))
2220
2220
  {
2221
2221
    /*
2222
2222
      TODO: this is currently broken - slave start and change master
2272
2272
                    errmsg);
2273
2273
    goto err;
2274
2274
  }
2275
 
  THD_CHECK_SENTRY(thd);
 
2275
  Session_CHECK_SENTRY(thd);
2276
2276
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2277
2277
  /*
2278
2278
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2332
2332
  {
2333
2333
    thd->set_proc_info(_("Reading event from the relay log"));
2334
2334
    assert(rli->sql_thd == thd);
2335
 
    THD_CHECK_SENTRY(thd);
 
2335
    Session_CHECK_SENTRY(thd);
2336
2336
    if (exec_relay_log_event(thd,rli))
2337
2337
    {
2338
2338
      // do not scare the user if SQL thread was simply killed or stopped
2442
2442
  assert(thd->net.buff != 0);
2443
2443
  net_end(&thd->net); // destructor will not free it, because we are weird
2444
2444
  assert(rli->sql_thd == thd);
2445
 
  THD_CHECK_SENTRY(thd);
 
2445
  Session_CHECK_SENTRY(thd);
2446
2446
  rli->sql_thd= 0;
2447
2447
  pthread_mutex_lock(&LOCK_thread_count);
2448
 
  THD_CHECK_SENTRY(thd);
 
2448
  Session_CHECK_SENTRY(thd);
2449
2449
  delete thd;
2450
2450
  pthread_mutex_unlock(&LOCK_thread_count);
2451
2451
 /*
2471
2471
  int32_t error = 1;
2472
2472
  uint32_t num_bytes;
2473
2473
  bool cev_not_written;
2474
 
  THD *thd = mi->io_thd;
 
2474
  Session *thd = mi->io_thd;
2475
2475
  NET *net = &mi->drizzle->net;
2476
2476
 
2477
2477
  if (unlikely(!cev->is_valid()))
3084
3084
    #   Error
3085
3085
*/
3086
3086
 
3087
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
 
3087
static int32_t safe_connect(Session* thd, DRIZZLE *drizzle, Master_info* mi)
3088
3088
{
3089
3089
  return(connect_to_master(thd, drizzle, mi, 0, 0));
3090
3090
}
3099
3099
    master_retry_count times
3100
3100
*/
3101
3101
 
3102
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
3102
static int32_t connect_to_master(Session* thd, DRIZZLE *drizzle, Master_info* mi,
3103
3103
                                 bool reconnect, bool suppress_warnings)
3104
3104
{
3105
3105
  int32_t slave_was_killed;
3173
3173
    master_retry_count times
3174
3174
*/
3175
3175
 
3176
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
3176
static int32_t safe_reconnect(Session* thd, DRIZZLE *drizzle, Master_info* mi,
3177
3177
                          bool suppress_warnings)
3178
3178
{
3179
3179
  return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3249
3249
  IO_CACHE* cur_log = rli->cur_log;
3250
3250
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3251
3251
  const char* errmsg=0;
3252
 
  THD* thd = rli->sql_thd;
 
3252
  Session* thd = rli->sql_thd;
3253
3253
 
3254
3254
  assert(thd != 0);
3255
3255
 
3710
3710
 
3711
3711
   Detect buggy master to work around.
3712
3712
 */
3713
 
bool rpl_master_erroneous_autoinc(THD *thd)
 
3713
bool rpl_master_erroneous_autoinc(Session *thd)
3714
3714
{
3715
3715
  if (active_mi && active_mi->rli.sql_thd == thd)
3716
3716
  {