~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/rpl_mi.cc

  • Committer: Monty Taylor
  • Date: 2008-10-13 09:29:43 UTC
  • mfrom: (509 drizzle)
  • mto: (509.1.4 codestyle)
  • mto: This revision was merged to the branch mainline in revision 511.
  • Revision ID: monty@inaugust.com-20081013092943-rwvx4a6d85b5l2dh
MergedĀ inĀ trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
#include <drizzled/server_includes.h>
17
17
#include "rpl_mi.h"
 
18
#include <iostream>
 
19
#include <fstream>
18
20
 
19
21
#define DEFAULT_CONNECT_RETRY 60
20
22
 
26
28
 
27
29
Master_info::Master_info()
28
30
  :Slave_reporting_capability("I/O"),
29
 
   fd(-1),  io_thd(0), port(DRIZZLE_PORT),
30
31
   connect_retry(DEFAULT_CONNECT_RETRY), heartbeat_period(0),
31
32
   received_heartbeats(0), inited(0),
32
33
   abort_slave(0), slave_running(0), slave_run_id(0)
33
34
{
34
35
  host[0] = 0; user[0] = 0; password[0] = 0;
 
36
  io_thd= NULL;
 
37
  port= DRIZZLE_PORT;
35
38
 
36
 
  memset(&file, 0, sizeof(file));
37
39
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
38
40
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
39
41
  pthread_cond_init(&data_cond, NULL);
50
52
  pthread_cond_destroy(&stop_cond);
51
53
}
52
54
 
53
 
 
54
 
void init_master_log_pos(Master_info* mi)
55
 
{
56
 
  mi->master_log_name[0] = 0;
57
 
  mi->master_log_pos = BIN_LOG_HEADER_SIZE;             // skip magic number
58
 
  /* 
59
 
    always request heartbeat unless master_heartbeat_period is set
60
 
    explicitly zero.  Here is the default value for heartbeat period
61
 
    if CHANGE MASTER did not specify it.  (no data loss in conversion
62
 
    as hb period has a max)
63
 
  */
64
 
  mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
65
 
                                    (slave_net_timeout/2.0));
66
 
  assert(mi->heartbeat_period > (float) 0.001
67
 
              || mi->heartbeat_period == 0);
68
 
  return;
69
 
}
70
 
 
71
 
 
72
 
enum {
73
 
  LINES_IN_MASTER_INFO_WITH_SSL= 14,
74
 
 
75
 
  /* 5.1.16 added value of master_ssl_verify_server_cert */
76
 
  LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15,
77
 
 
78
 
  /* 6.0 added value of master_heartbeat_period */
79
 
  LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16,
80
 
 
81
 
  /* Number of lines currently used when saving master info file */
82
 
  LINES_IN_MASTER_INFO= LINE_FOR_MASTER_HEARTBEAT_PERIOD
83
 
};
84
 
 
85
 
 
86
 
int init_master_info(Master_info* mi, const char* master_info_fname,
87
 
                     const char* slave_info_fname,
88
 
                     bool abort_if_no_master_info_file,
89
 
                     int thread_mask)
90
 
{
91
 
  int fd,error;
92
 
  char fname[FN_REFLEN+128];
93
 
 
94
 
  if (mi->inited)
 
55
bool Master_info::setPassword(const char *pword)
 
56
{
 
57
  password.assign(pword);
 
58
 
 
59
  return true;
 
60
}
 
61
 
 
62
const char *Master_info::getPassword()
 
63
{
 
64
  return password.c_str();
 
65
}
 
66
 
 
67
bool Master_info::setUsername(const char *username)
 
68
{
 
69
  user.assign(username);
 
70
 
 
71
  return true;
 
72
}
 
73
 
 
74
const char *Master_info::getUsername()
 
75
{
 
76
  return user.c_str();
 
77
}
 
78
 
 
79
bool Master_info::setHost(const char *hostname, uint16_t new_port)
 
80
{
 
81
  host.assign(hostname);
 
82
  port= new_port;
 
83
 
 
84
  return true;
 
85
}
 
86
 
 
87
const char *Master_info::getHostname()
 
88
{
 
89
  return host.c_str();
 
90
}
 
91
 
 
92
uint16_t Master_info::getPort()
 
93
{
 
94
  return port;
 
95
}
 
96
 
 
97
off_t Master_info::getLogPosition()
 
98
{
 
99
  return log_pos;
 
100
}
 
101
 
 
102
bool Master_info::setLogPosition(off_t position)
 
103
{
 
104
  log_pos= position;
 
105
 
 
106
  return true;
 
107
}
 
108
 
 
109
void Master_info::incrementLogPosition(off_t position)
 
110
{
 
111
  log_pos+= position;
 
112
}
 
113
 
 
114
const char *Master_info::getLogName()
 
115
{
 
116
  return log_name.c_str();
 
117
}
 
118
 
 
119
bool Master_info::setLogName(const char *name)
 
120
{
 
121
 log_name.assign(name);
 
122
 
 
123
  return true;
 
124
}
 
125
 
 
126
uint32_t Master_info::getConnectionRetry()
 
127
{
 
128
  return connect_retry;
 
129
}
 
130
 
 
131
bool Master_info::setConnectionRetry(uint32_t retry)
 
132
{
 
133
  connect_retry= retry;
 
134
 
 
135
  return true;
 
136
}
 
137
 
 
138
 
 
139
void Master_info::reset()
 
140
{
 
141
  log_name.clear();
 
142
  log_pos= 0; 
 
143
}
 
144
 
 
145
 
 
146
int Master_info::init_master_info(const char* master_info_fname,
 
147
                                  const char* slave_info_fname,
 
148
                                  int thread_mask)
 
149
{
 
150
  int error;
 
151
 
 
152
  if (inited)
95
153
  {
96
154
    /*
97
155
      We have to reset read position of relay-log-bin as we may have
107
165
    */
108
166
    if (thread_mask & SLAVE_SQL)
109
167
    {
110
 
      my_b_seek(mi->rli.cur_log, (my_off_t) 0);
 
168
      my_b_seek(rli.cur_log, (my_off_t) 0);
111
169
    }
112
170
    return(0);
113
171
  }
114
172
 
115
 
  mi->drizzle=0;
116
 
  mi->file_id=1;
117
 
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
 
173
  drizzle= 0;
 
174
  file_id= 1;
 
175
  {
 
176
    char fname[FN_REFLEN+128];
 
177
 
 
178
    fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
 
179
    info_filename.assign(fname);
 
180
  }
118
181
 
119
182
  /*
120
183
    We need a mutex while we are changing master info parameters to
121
184
    keep other threads from reading bogus info
122
185
  */
123
186
 
124
 
  pthread_mutex_lock(&mi->data_lock);
125
 
  fd = mi->fd;
 
187
  pthread_mutex_lock(&data_lock);
126
188
 
127
189
  /* does master.info exist ? */
128
190
 
129
 
  if (access(fname,F_OK))
 
191
  if (access(info_filename.c_str(), F_OK))
130
192
  {
131
 
    if (abort_if_no_master_info_file)
132
 
    {
133
 
      pthread_mutex_unlock(&mi->data_lock);
134
 
      return(0);
135
 
    }
136
 
    /*
137
 
      if someone removed the file from underneath our feet, just close
138
 
      the old descriptor and re-create the old file
139
 
    */
140
 
    if (fd >= 0)
141
 
      my_close(fd, MYF(MY_WME));
142
 
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
143
 
    {
144
 
      sql_print_error(_("Failed to create a new master info file (file '%s', errno %d)"), fname, my_errno);
145
 
      goto err;
146
 
    }
147
 
    if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
148
 
                      MYF(MY_WME)))
149
 
    {
150
 
      sql_print_error(_("Failed to create a cache on master info file (file '%s')"), fname);
151
 
      goto err;
152
 
    }
153
 
 
154
 
    mi->fd = fd;
155
 
    init_master_log_pos(mi);
156
 
 
 
193
    drizzle::MasterList_Record *record;
 
194
 
 
195
    reset();
 
196
 
 
197
    /* Write new Master info file here (from info_filename) */
 
198
    record= list.add_record();
 
199
    record->set_hostname(host);
 
200
    record->set_username(user);
 
201
    record->set_password(password);
 
202
    record->set_port(port);
 
203
    record->set_connect_retry(connect_retry);
 
204
    record->set_log_name(log_name);
 
205
    record->set_log_position(log_pos);
 
206
 
 
207
    fstream output(info_filename.c_str(), ios::out | ios::trunc | ios::binary);
 
208
    if (!list.SerializeToOstream(&output)) 
 
209
    { 
 
210
      assert(0);
 
211
      return -1;
 
212
    }
157
213
  }
158
214
  else // file exists
159
215
  {
160
 
    if (fd >= 0)
161
 
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
162
 
    else
163
 
    {
164
 
      if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
165
 
      {
166
 
        sql_print_error(_("Failed to open the existing master info file (file '%s', errno %d)"), fname, my_errno);
167
 
        goto err;
168
 
      }
169
 
      if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
170
 
                        0, MYF(MY_WME)))
171
 
      {
172
 
        sql_print_error(_("Failed to create a cache on master info file (file '%s')"), fname);
173
 
        goto err;
174
 
      }
175
 
    }
176
 
 
177
 
    mi->fd = fd;
178
 
    int port, connect_retry, master_log_pos, lines;
179
 
    float master_heartbeat_period= 0.0;
180
 
    char *first_non_digit;
181
 
 
182
 
    /*
183
 
       Starting from 4.1.x master.info has new format. Now its
184
 
       first line contains number of lines in file. By reading this
185
 
       number we will be always distinguish to which version our
186
 
       master.info corresponds to. We can't simply count lines in
187
 
       file since versions before 4.1.x could generate files with more
188
 
       lines than needed.
189
 
       If first line doesn't contain a number or contain number less than
190
 
       LINES_IN_MASTER_INFO_WITH_SSL then such file is treated like file
191
 
       from pre 4.1.1 version.
192
 
       There is no ambiguity when reading an old master.info, as before
193
 
       4.1.1, the first line contained the binlog's name, which is either
194
 
       empty or has an extension (contains a '.'), so can't be confused
195
 
       with an integer.
196
 
 
197
 
       So we're just reading first line and trying to figure which version
198
 
       is this.
199
 
    */
200
 
 
201
 
    /*
202
 
       The first row is temporarily stored in mi->master_log_name,
203
 
       if it is line count and not binlog name (new format) it will be
204
 
       overwritten by the second row later.
205
 
    */
206
 
    if (init_strvar_from_file(mi->master_log_name,
207
 
                              sizeof(mi->master_log_name), &mi->file,
208
 
                              ""))
209
 
      goto errwithmsg;
210
 
 
211
 
    lines= strtoul(mi->master_log_name, &first_non_digit, 10);
212
 
 
213
 
    if (mi->master_log_name[0]!='\0' &&
214
 
        *first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
215
 
    {
216
 
      /* Seems to be new format => read master log name from next line */
217
 
      if (init_strvar_from_file(mi->master_log_name,
218
 
            sizeof(mi->master_log_name), &mi->file, ""))
219
 
        goto errwithmsg;
220
 
    }
221
 
    else
222
 
      lines= 7;
223
 
 
224
 
    if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
225
 
        init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, 0) ||
226
 
        init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file, "test") ||
227
 
        init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
228
 
                              &mi->file, 0 ) ||
229
 
        init_intvar_from_file(&port, &mi->file, DRIZZLE_PORT) ||
230
 
        init_intvar_from_file(&connect_retry, &mi->file, DEFAULT_CONNECT_RETRY))
231
 
      goto errwithmsg;
232
 
 
233
 
    /*
234
 
      If file has ssl part use it even if we have server without
235
 
      SSL support. But these option will be ignored later when
236
 
      slave will try connect to master, so in this case warning
237
 
      is printed.
238
 
    */
239
 
    if (lines >= LINES_IN_MASTER_INFO_WITH_SSL)
240
 
    {
241
 
      /*
242
 
        Starting from 6.0 master_heartbeat_period might be
243
 
        in the file
244
 
      */
245
 
      if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD &&
246
 
          init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0))
247
 
        goto errwithmsg;
248
 
    }
249
 
 
250
 
    /*
251
 
      This has to be handled here as init_intvar_from_file can't handle
252
 
      my_off_t types
253
 
    */
254
 
    mi->master_log_pos= (my_off_t) master_log_pos;
255
 
    mi->port= (uint) port;
256
 
    mi->connect_retry= (uint) connect_retry;
257
 
    mi->heartbeat_period= master_heartbeat_period;
 
216
    /* Read Master info file here (from info_filename) */
 
217
    fstream input(info_filename.c_str(), ios::in | ios::binary);
 
218
    if (!list.ParseFromIstream(&input)) 
 
219
    {
 
220
      assert(0);
 
221
      return -1;
 
222
    }
 
223
 
 
224
    /* We do not support multi-master just yet */
 
225
    assert(list.record_size() == 1);
 
226
    const drizzle::MasterList_Record record= list.record(0);
 
227
 
 
228
    if (record.has_username())
 
229
      user= record.username();
 
230
    if (record.has_password())
 
231
      password= record.password();
 
232
    if (record.has_port())
 
233
      port= record.port();
 
234
    if (record.has_connect_retry())
 
235
      connect_retry= record.connect_retry();
 
236
    if (record.has_log_name())
 
237
      log_name= record.log_name();
 
238
    if (record.has_log_position())
 
239
      log_pos= record.log_position();
258
240
  }
259
241
 
260
 
  mi->rli.mi = mi;
261
 
  if (init_relay_log_info(&mi->rli, slave_info_fname))
 
242
  rli.mi = this;
 
243
  if (init_relay_log_info(&rli, slave_info_fname))
262
244
    goto err;
263
245
 
264
 
  mi->inited = 1;
265
 
  // now change cache READ -> WRITE - must do this before flush_master_info
266
 
  reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
267
 
  if ((error=test(flush_master_info(mi, 1))))
 
246
  inited= 1;
 
247
  if ((error= test(flush())))
268
248
    sql_print_error(_("Failed to flush master info file"));
269
 
  pthread_mutex_unlock(&mi->data_lock);
 
249
  pthread_mutex_unlock(&data_lock);
270
250
  return(error);
271
251
 
272
 
errwithmsg:
273
 
  sql_print_error(_("Error reading master configuration"));
274
 
 
275
252
err:
276
 
  if (fd >= 0)
277
 
  {
278
 
    my_close(fd, MYF(0));
279
 
    end_io_cache(&mi->file);
280
 
  }
281
 
  mi->fd= -1;
282
 
  pthread_mutex_unlock(&mi->data_lock);
283
 
  return(1);
 
253
  pthread_mutex_unlock(&data_lock);
 
254
  return 1;
284
255
}
285
256
 
286
257
 
290
261
     1 - flush master info failed
291
262
     0 - all ok
292
263
*/
293
 
int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
 
264
int Master_info::flush()
294
265
{
295
 
  IO_CACHE* file = &mi->file;
296
 
  char lbuf[22];
297
 
 
298
266
  /*
299
267
    Flush the relay log to disk. If we don't do it, then the relay log while
300
268
    have some part (its last kilobytes) in memory only, so if the slave server
307
275
    When we come to this place in code, relay log may or not be initialized;
308
276
    the caller is responsible for setting 'flush_relay_log_cache' accordingly.
309
277
  */
310
 
  if (flush_relay_log_cache &&
311
 
      flush_io_cache(mi->rli.relay_log.get_log_file()))
312
 
    return(2);
313
 
 
314
 
  /*
315
 
    We flushed the relay log BEFORE the master.info file, because if we crash
316
 
    now, we will get a duplicate event in the relay log at restart. If we
317
 
    flushed in the other order, we would get a hole in the relay log.
318
 
    And duplicate is better than hole (with a duplicate, in later versions we
319
 
    can add detection and scrap one event; with a hole there's nothing we can
320
 
    do).
321
 
  */
322
 
 
323
 
  /*
324
 
     In certain cases this code may create master.info files that seems
325
 
     corrupted, because of extra lines filled with garbage in the end
326
 
     file (this happens if new contents take less space than previous
327
 
     contents of file). But because of number of lines in the first line
328
 
     of file we don't care about this garbage.
329
 
  */
330
 
  char heartbeat_buf[sizeof(mi->heartbeat_period) * 4]; // buffer to suffice always
331
 
  sprintf(heartbeat_buf, "%.3f", mi->heartbeat_period);
332
 
  my_b_seek(file, 0L);
333
 
  my_b_printf(file,
334
 
              "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n",
335
 
              LINES_IN_MASTER_INFO,
336
 
              mi->master_log_name, llstr(mi->master_log_pos, lbuf),
337
 
              mi->host, mi->user,
338
 
              mi->password, mi->port, mi->connect_retry,
339
 
              heartbeat_buf);
340
 
  return(-flush_io_cache(file));
 
278
 
 
279
  /* Write Master info file here (from info_filename) */
 
280
  assert(info_filename.length());
 
281
  assert(list.record_size() == 1);
 
282
  drizzle::MasterList_Record *record= list.mutable_record(0);
 
283
 
 
284
  record->set_hostname(host);
 
285
  record->set_username(user);
 
286
  record->set_password(password);
 
287
  record->set_port(port);
 
288
  record->set_connect_retry(connect_retry);
 
289
  record->set_log_name(log_name);
 
290
  record->set_log_position(log_pos);
 
291
 
 
292
  fstream output(info_filename.c_str(), ios::out | ios::trunc | ios::binary);
 
293
  if (!list.SerializeToOstream(&output)) 
 
294
  { 
 
295
    assert(0);
 
296
    return 1;
 
297
  }
 
298
 
 
299
  return 0;
341
300
}
342
301
 
343
302
 
344
 
void end_master_info(Master_info* mi)
 
303
void Master_info::end_master_info()
345
304
{
346
 
  if (!mi->inited)
 
305
  if (!inited)
347
306
    return;
348
 
  end_relay_log_info(&mi->rli);
349
 
  if (mi->fd >= 0)
350
 
  {
351
 
    end_io_cache(&mi->file);
352
 
    (void)my_close(mi->fd, MYF(MY_WME));
353
 
    mi->fd = -1;
354
 
  }
355
 
  mi->inited = 0;
 
307
  end_relay_log_info(&rli);
 
308
  inited = 0;
356
309
 
357
310
  return;
358
311
}