~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/rpl_mi.cc

  • Committer: Monty Taylor
  • Date: 2008-10-22 21:31:15 UTC
  • Revision ID: monty@inaugust.com-20081022213115-xuxc80r939tl88p1
Renamed drizzle_common again. Removed sql_common. (empty) 
Now all we need to do is merge/disect base.h, common.h, common_includes.h, server_includes.h and globa.h (good grief)

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
#include <drizzled/server_includes.h>
17
17
#include "rpl_mi.h"
 
18
#include <iostream>
 
19
#include <fstream>
 
20
 
 
21
using namespace std;
18
22
 
19
23
#define DEFAULT_CONNECT_RETRY 60
20
24
 
26
30
 
27
31
Master_info::Master_info()
28
32
  :Slave_reporting_capability("I/O"),
29
 
   ssl(0), ssl_verify_server_cert(0), fd(-1),  io_thd(0), port(DRIZZLE_PORT),
30
33
   connect_retry(DEFAULT_CONNECT_RETRY), heartbeat_period(0),
31
34
   received_heartbeats(0), inited(0),
32
35
   abort_slave(0), slave_running(0), slave_run_id(0)
33
36
{
34
37
  host[0] = 0; user[0] = 0; password[0] = 0;
35
 
  ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
36
 
  ssl_cipher[0]= 0; ssl_key[0]= 0;
 
38
  io_session= NULL;
 
39
  port= DRIZZLE_PORT;
37
40
 
38
 
  memset(&file, 0, sizeof(file));
39
41
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
40
42
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
41
43
  pthread_cond_init(&data_cond, NULL);
52
54
  pthread_cond_destroy(&stop_cond);
53
55
}
54
56
 
55
 
 
56
 
void init_master_log_pos(Master_info* mi)
57
 
{
58
 
  mi->master_log_name[0] = 0;
59
 
  mi->master_log_pos = BIN_LOG_HEADER_SIZE;             // skip magic number
60
 
  /* 
61
 
    always request heartbeat unless master_heartbeat_period is set
62
 
    explicitly zero.  Here is the default value for heartbeat period
63
 
    if CHANGE MASTER did not specify it.  (no data loss in conversion
64
 
    as hb period has a max)
65
 
  */
66
 
  mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
67
 
                                    (slave_net_timeout/2.0));
68
 
  assert(mi->heartbeat_period > (float) 0.001
69
 
              || mi->heartbeat_period == 0);
70
 
  return;
71
 
}
72
 
 
73
 
 
74
 
enum {
75
 
  LINES_IN_MASTER_INFO_WITH_SSL= 14,
76
 
 
77
 
  /* 5.1.16 added value of master_ssl_verify_server_cert */
78
 
  LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15,
79
 
 
80
 
  /* 6.0 added value of master_heartbeat_period */
81
 
  LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16,
82
 
 
83
 
  /* Number of lines currently used when saving master info file */
84
 
  LINES_IN_MASTER_INFO= LINE_FOR_MASTER_HEARTBEAT_PERIOD
85
 
};
86
 
 
87
 
 
88
 
int init_master_info(Master_info* mi, const char* master_info_fname,
89
 
                     const char* slave_info_fname,
90
 
                     bool abort_if_no_master_info_file,
91
 
                     int thread_mask)
92
 
{
93
 
  int fd,error;
94
 
  char fname[FN_REFLEN+128];
95
 
 
96
 
  if (mi->inited)
 
57
bool Master_info::setPassword(const char *pword)
 
58
{
 
59
  password.assign(pword);
 
60
 
 
61
  return true;
 
62
}
 
63
 
 
64
const char *Master_info::getPassword()
 
65
{
 
66
  return password.c_str();
 
67
}
 
68
 
 
69
bool Master_info::setUsername(const char *username)
 
70
{
 
71
  user.assign(username);
 
72
 
 
73
  return true;
 
74
}
 
75
 
 
76
const char *Master_info::getUsername()
 
77
{
 
78
  return user.c_str();
 
79
}
 
80
 
 
81
bool Master_info::setHost(const char *hostname, uint16_t new_port)
 
82
{
 
83
  host.assign(hostname);
 
84
  port= new_port;
 
85
 
 
86
  return true;
 
87
}
 
88
 
 
89
const char *Master_info::getHostname()
 
90
{
 
91
  return host.c_str();
 
92
}
 
93
 
 
94
uint16_t Master_info::getPort()
 
95
{
 
96
  return port;
 
97
}
 
98
 
 
99
off_t Master_info::getLogPosition()
 
100
{
 
101
  return log_pos;
 
102
}
 
103
 
 
104
bool Master_info::setLogPosition(off_t position)
 
105
{
 
106
  log_pos= position;
 
107
 
 
108
  return true;
 
109
}
 
110
 
 
111
void Master_info::incrementLogPosition(off_t position)
 
112
{
 
113
  log_pos+= position;
 
114
}
 
115
 
 
116
const char *Master_info::getLogName()
 
117
{
 
118
  return log_name.c_str();
 
119
}
 
120
 
 
121
bool Master_info::setLogName(const char *name)
 
122
{
 
123
 log_name.assign(name);
 
124
 
 
125
  return true;
 
126
}
 
127
 
 
128
uint32_t Master_info::getConnectionRetry()
 
129
{
 
130
  return connect_retry;
 
131
}
 
132
 
 
133
bool Master_info::setConnectionRetry(uint32_t retry)
 
134
{
 
135
  connect_retry= retry;
 
136
 
 
137
  return true;
 
138
}
 
139
 
 
140
 
 
141
void Master_info::reset()
 
142
{
 
143
  log_name.clear();
 
144
  log_pos= 0; 
 
145
}
 
146
 
 
147
 
 
148
int Master_info::init_master_info(const char* master_info_fname,
 
149
                                  const char* slave_info_fname,
 
150
                                  int thread_mask)
 
151
{
 
152
  int error;
 
153
 
 
154
  if (inited)
97
155
  {
98
156
    /*
99
157
      We have to reset read position of relay-log-bin as we may have
109
167
    */
110
168
    if (thread_mask & SLAVE_SQL)
111
169
    {
112
 
      my_b_seek(mi->rli.cur_log, (my_off_t) 0);
 
170
      my_b_seek(rli.cur_log, (my_off_t) 0);
113
171
    }
114
172
    return(0);
115
173
  }
116
174
 
117
 
  mi->drizzle=0;
118
 
  mi->file_id=1;
119
 
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
 
175
  drizzle= 0;
 
176
  file_id= 1;
 
177
  {
 
178
    char fname[FN_REFLEN+128];
 
179
 
 
180
    fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
 
181
    info_filename.assign(fname);
 
182
  }
120
183
 
121
184
  /*
122
185
    We need a mutex while we are changing master info parameters to
123
186
    keep other threads from reading bogus info
124
187
  */
125
188
 
126
 
  pthread_mutex_lock(&mi->data_lock);
127
 
  fd = mi->fd;
 
189
  pthread_mutex_lock(&data_lock);
128
190
 
129
191
  /* does master.info exist ? */
130
192
 
131
 
  if (access(fname,F_OK))
 
193
  if (access(info_filename.c_str(), F_OK))
132
194
  {
133
 
    if (abort_if_no_master_info_file)
134
 
    {
135
 
      pthread_mutex_unlock(&mi->data_lock);
136
 
      return(0);
137
 
    }
138
 
    /*
139
 
      if someone removed the file from underneath our feet, just close
140
 
      the old descriptor and re-create the old file
141
 
    */
142
 
    if (fd >= 0)
143
 
      my_close(fd, MYF(MY_WME));
144
 
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
145
 
    {
146
 
      sql_print_error(_("Failed to create a new master info file (file '%s', errno %d)"), fname, my_errno);
147
 
      goto err;
148
 
    }
149
 
    if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
150
 
                      MYF(MY_WME)))
151
 
    {
152
 
      sql_print_error(_("Failed to create a cache on master info file (file '%s')"), fname);
153
 
      goto err;
154
 
    }
155
 
 
156
 
    mi->fd = fd;
157
 
    init_master_log_pos(mi);
158
 
 
 
195
    drizzle::MasterList_Record *record;
 
196
 
 
197
    reset();
 
198
 
 
199
    /* Write new Master info file here (from info_filename) */
 
200
    record= list.add_record();
 
201
    record->set_hostname(host);
 
202
    record->set_username(user);
 
203
    record->set_password(password);
 
204
    record->set_port(port);
 
205
    record->set_connect_retry(connect_retry);
 
206
    record->set_log_name(log_name);
 
207
    record->set_log_position(log_pos);
 
208
 
 
209
    fstream output(info_filename.c_str(), ios::out | ios::trunc | ios::binary);
 
210
    if (!list.SerializeToOstream(&output)) 
 
211
    { 
 
212
      assert(0);
 
213
      return -1;
 
214
    }
159
215
  }
160
216
  else // file exists
161
217
  {
162
 
    if (fd >= 0)
163
 
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
164
 
    else
165
 
    {
166
 
      if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
167
 
      {
168
 
        sql_print_error(_("Failed to open the existing master info file (file '%s', errno %d)"), fname, my_errno);
169
 
        goto err;
170
 
      }
171
 
      if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
172
 
                        0, MYF(MY_WME)))
173
 
      {
174
 
        sql_print_error(_("Failed to create a cache on master info file (file '%s')"), fname);
175
 
        goto err;
176
 
      }
177
 
    }
178
 
 
179
 
    mi->fd = fd;
180
 
    int port, connect_retry, master_log_pos, lines;
181
 
    int ssl= 0, ssl_verify_server_cert= 0;
182
 
    float master_heartbeat_period= 0.0;
183
 
    char *first_non_digit;
184
 
 
185
 
    /*
186
 
       Starting from 4.1.x master.info has new format. Now its
187
 
       first line contains number of lines in file. By reading this
188
 
       number we will be always distinguish to which version our
189
 
       master.info corresponds to. We can't simply count lines in
190
 
       file since versions before 4.1.x could generate files with more
191
 
       lines than needed.
192
 
       If first line doesn't contain a number or contain number less than
193
 
       LINES_IN_MASTER_INFO_WITH_SSL then such file is treated like file
194
 
       from pre 4.1.1 version.
195
 
       There is no ambiguity when reading an old master.info, as before
196
 
       4.1.1, the first line contained the binlog's name, which is either
197
 
       empty or has an extension (contains a '.'), so can't be confused
198
 
       with an integer.
199
 
 
200
 
       So we're just reading first line and trying to figure which version
201
 
       is this.
202
 
    */
203
 
 
204
 
    /*
205
 
       The first row is temporarily stored in mi->master_log_name,
206
 
       if it is line count and not binlog name (new format) it will be
207
 
       overwritten by the second row later.
208
 
    */
209
 
    if (init_strvar_from_file(mi->master_log_name,
210
 
                              sizeof(mi->master_log_name), &mi->file,
211
 
                              ""))
212
 
      goto errwithmsg;
213
 
 
214
 
    lines= strtoul(mi->master_log_name, &first_non_digit, 10);
215
 
 
216
 
    if (mi->master_log_name[0]!='\0' &&
217
 
        *first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
218
 
    {
219
 
      /* Seems to be new format => read master log name from next line */
220
 
      if (init_strvar_from_file(mi->master_log_name,
221
 
            sizeof(mi->master_log_name), &mi->file, ""))
222
 
        goto errwithmsg;
223
 
    }
224
 
    else
225
 
      lines= 7;
226
 
 
227
 
    if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
228
 
        init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, 0) ||
229
 
        init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file, "test") ||
230
 
        init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
231
 
                              &mi->file, 0 ) ||
232
 
        init_intvar_from_file(&port, &mi->file, DRIZZLE_PORT) ||
233
 
        init_intvar_from_file(&connect_retry, &mi->file, DEFAULT_CONNECT_RETRY))
234
 
      goto errwithmsg;
235
 
 
236
 
    /*
237
 
      If file has ssl part use it even if we have server without
238
 
      SSL support. But these option will be ignored later when
239
 
      slave will try connect to master, so in this case warning
240
 
      is printed.
241
 
    */
242
 
    if (lines >= LINES_IN_MASTER_INFO_WITH_SSL)
243
 
    {
244
 
      if (init_intvar_from_file(&ssl, &mi->file, 0) ||
245
 
          init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
246
 
                                &mi->file, 0) ||
247
 
          init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
248
 
                                &mi->file, 0) ||
249
 
          init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
250
 
                                &mi->file, 0) ||
251
 
          init_strvar_from_file(mi->ssl_cipher, sizeof(mi->ssl_cipher),
252
 
                                &mi->file, 0) ||
253
 
          init_strvar_from_file(mi->ssl_key, sizeof(mi->ssl_key),
254
 
                               &mi->file, 0))
255
 
      goto errwithmsg;
256
 
 
257
 
      /*
258
 
        Starting from 5.1.16 ssl_verify_server_cert might be
259
 
        in the file
260
 
      */
261
 
      if (lines >= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT &&
262
 
          init_intvar_from_file(&ssl_verify_server_cert, &mi->file, 0))
263
 
        goto errwithmsg;
264
 
      /*
265
 
        Starting from 6.0 master_heartbeat_period might be
266
 
        in the file
267
 
      */
268
 
      if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD &&
269
 
          init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0))
270
 
        goto errwithmsg;
271
 
    }
272
 
 
273
 
    if (ssl)
274
 
      sql_print_warning(_("SSL information in the master info file "
275
 
                          "('%s') are ignored because this MySQL slave was "
276
 
                          "compiled without SSL support."), fname);
277
 
 
278
 
    /*
279
 
      This has to be handled here as init_intvar_from_file can't handle
280
 
      my_off_t types
281
 
    */
282
 
    mi->master_log_pos= (my_off_t) master_log_pos;
283
 
    mi->port= (uint) port;
284
 
    mi->connect_retry= (uint) connect_retry;
285
 
    mi->ssl= (bool) ssl;
286
 
    mi->ssl_verify_server_cert= ssl_verify_server_cert;
287
 
    mi->heartbeat_period= master_heartbeat_period;
 
218
    /* Read Master info file here (from info_filename) */
 
219
    fstream input(info_filename.c_str(), ios::in | ios::binary);
 
220
    if (!list.ParseFromIstream(&input)) 
 
221
    {
 
222
      assert(0);
 
223
      return -1;
 
224
    }
 
225
 
 
226
    /* We do not support multi-master just yet */
 
227
    assert(list.record_size() == 1);
 
228
    const drizzle::MasterList_Record record= list.record(0);
 
229
 
 
230
    if (record.has_username())
 
231
      user= record.username();
 
232
    if (record.has_password())
 
233
      password= record.password();
 
234
    if (record.has_port())
 
235
      port= record.port();
 
236
    if (record.has_connect_retry())
 
237
      connect_retry= record.connect_retry();
 
238
    if (record.has_log_name())
 
239
      log_name= record.log_name();
 
240
    if (record.has_log_position())
 
241
      log_pos= record.log_position();
288
242
  }
289
243
 
290
 
  mi->rli.mi = mi;
291
 
  if (init_relay_log_info(&mi->rli, slave_info_fname))
 
244
  rli.mi = this;
 
245
  if (init_relay_log_info(&rli, slave_info_fname))
292
246
    goto err;
293
247
 
294
 
  mi->inited = 1;
295
 
  // now change cache READ -> WRITE - must do this before flush_master_info
296
 
  reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
297
 
  if ((error=test(flush_master_info(mi, 1))))
 
248
  inited= 1;
 
249
  if ((error= test(flush())))
298
250
    sql_print_error(_("Failed to flush master info file"));
299
 
  pthread_mutex_unlock(&mi->data_lock);
 
251
  pthread_mutex_unlock(&data_lock);
300
252
  return(error);
301
253
 
302
 
errwithmsg:
303
 
  sql_print_error(_("Error reading master configuration"));
304
 
 
305
254
err:
306
 
  if (fd >= 0)
307
 
  {
308
 
    my_close(fd, MYF(0));
309
 
    end_io_cache(&mi->file);
310
 
  }
311
 
  mi->fd= -1;
312
 
  pthread_mutex_unlock(&mi->data_lock);
313
 
  return(1);
 
255
  pthread_mutex_unlock(&data_lock);
 
256
  return 1;
314
257
}
315
258
 
316
259
 
320
263
     1 - flush master info failed
321
264
     0 - all ok
322
265
*/
323
 
int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
 
266
int Master_info::flush()
324
267
{
325
 
  IO_CACHE* file = &mi->file;
326
 
  char lbuf[22];
327
 
 
328
268
  /*
329
269
    Flush the relay log to disk. If we don't do it, then the relay log while
330
270
    have some part (its last kilobytes) in memory only, so if the slave server
337
277
    When we come to this place in code, relay log may or not be initialized;
338
278
    the caller is responsible for setting 'flush_relay_log_cache' accordingly.
339
279
  */
340
 
  if (flush_relay_log_cache &&
341
 
      flush_io_cache(mi->rli.relay_log.get_log_file()))
342
 
    return(2);
343
 
 
344
 
  /*
345
 
    We flushed the relay log BEFORE the master.info file, because if we crash
346
 
    now, we will get a duplicate event in the relay log at restart. If we
347
 
    flushed in the other order, we would get a hole in the relay log.
348
 
    And duplicate is better than hole (with a duplicate, in later versions we
349
 
    can add detection and scrap one event; with a hole there's nothing we can
350
 
    do).
351
 
  */
352
 
 
353
 
  /*
354
 
     In certain cases this code may create master.info files that seems
355
 
     corrupted, because of extra lines filled with garbage in the end
356
 
     file (this happens if new contents take less space than previous
357
 
     contents of file). But because of number of lines in the first line
358
 
     of file we don't care about this garbage.
359
 
  */
360
 
  char heartbeat_buf[sizeof(mi->heartbeat_period) * 4]; // buffer to suffice always
361
 
  sprintf(heartbeat_buf, "%.3f", mi->heartbeat_period);
362
 
  my_b_seek(file, 0L);
363
 
  my_b_printf(file,
364
 
              "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n",
365
 
              LINES_IN_MASTER_INFO,
366
 
              mi->master_log_name, llstr(mi->master_log_pos, lbuf),
367
 
              mi->host, mi->user,
368
 
              mi->password, mi->port, mi->connect_retry,
369
 
              (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
370
 
              mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
371
 
              heartbeat_buf);
372
 
  return(-flush_io_cache(file));
 
280
 
 
281
  /* Write Master info file here (from info_filename) */
 
282
  assert(info_filename.length());
 
283
  assert(list.record_size() == 1);
 
284
  drizzle::MasterList_Record *record= list.mutable_record(0);
 
285
 
 
286
  record->set_hostname(host);
 
287
  record->set_username(user);
 
288
  record->set_password(password);
 
289
  record->set_port(port);
 
290
  record->set_connect_retry(connect_retry);
 
291
  record->set_log_name(log_name);
 
292
  record->set_log_position(log_pos);
 
293
 
 
294
  fstream output(info_filename.c_str(), ios::out | ios::trunc | ios::binary);
 
295
  if (!list.SerializeToOstream(&output)) 
 
296
  { 
 
297
    assert(0);
 
298
    return 1;
 
299
  }
 
300
 
 
301
  return 0;
373
302
}
374
303
 
375
304
 
376
 
void end_master_info(Master_info* mi)
 
305
void Master_info::end_master_info()
377
306
{
378
 
  if (!mi->inited)
 
307
  if (!inited)
379
308
    return;
380
 
  end_relay_log_info(&mi->rli);
381
 
  if (mi->fd >= 0)
382
 
  {
383
 
    end_io_cache(&mi->file);
384
 
    (void)my_close(mi->fd, MYF(MY_WME));
385
 
    mi->fd = -1;
386
 
  }
387
 
  mi->inited = 0;
 
309
  end_relay_log_info(&rli);
 
310
  inited = 0;
388
311
 
389
312
  return;
390
313
}