~drizzle-trunk/drizzle/development

2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2011 David Shrewsbury
5
 *
6
 *  This program is free software; you can redistribute it and/or modify
7
 *  it under the terms of the GNU General Public License as published by
8
 *  the Free Software Foundation; either version 2 of the License, or
9
 *  (at your option) any later version.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
2116.1.38 by David Shrewsbury
Change include style
21
#include <config.h>
22
#include <plugin/slave/replication_slave.h>
2360.1.1 by Mark Atwood
restore multi master replication
23
#include <drizzled/errmsg_print.h>
2260.1.1 by David Shrewsbury
Revert multi-master code
24
#include <drizzled/program_options/config_file.h>
2360.1.1 by Mark Atwood
restore multi master replication
25
#include <boost/lexical_cast.hpp>
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
26
#include <boost/program_options.hpp>
27
#include <fstream>
2239.1.6 by Olaf van der Spek
Refactor includes
28
#include <drizzled/plugin.h>
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
29
30
using namespace std;
31
using namespace drizzled;
32
33
namespace po= boost::program_options;
34
35
namespace slave
36
{
37
38
/**
 * Bring up the slave services.
 *
 * Loads the configuration through initWithConfig(). When that fails, the
 * stored error text is logged and nothing is started. Otherwise every
 * configured master is started, its ID is handed to the queue consumer,
 * and finally the consumer thread is launched.
 *
 * @param session Unused.
 */
void ReplicationSlave::startup(Session &session)
{
  (void)session;

  /* Nothing can be started without a usable configuration. */
  if (not initWithConfig())
  {
    errmsg_printf(error::ERROR, _("Could not start slave services: %s\n"),
                  _error.c_str());
    return;
  }

  typedef boost::unordered_map<uint32_t, Master *>::const_iterator MasterIterator;

  /* Start the IO thread of each master; the consumer must know server IDs. */
  for (MasterIterator entry= _masters.begin(); entry != _masters.end(); ++entry)
  {
    Master *current= entry->second;
    current->start();
    _consumer.addMasterId(entry->first);
  }

  _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
}
60
61
bool ReplicationSlave::initWithConfig()
62
{
63
  po::variables_map vm;
64
  po::options_description slave_options("Options for the slave plugin");
65
2360.1.1 by Mark Atwood
restore multi master replication
66
  /* Common options */
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
67
  slave_options.add_options()
2207.4.1 by David Shrewsbury
Add config variables for io/applier thread sleep
68
    ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
69
    ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
2360.1.1 by Mark Atwood
restore multi master replication
70
    ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
71
    ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
72
73
  /* Master defining options */
74
  for (size_t num= 1; num <= 10; num++)
75
  {
76
    string section("master");
77
    section.append(boost::lexical_cast<string>(num));
78
    slave_options.add_options()
79
      ((section + ".master-host").c_str(), po::value<string>()->default_value(""))
80
      ((section + ".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
81
      ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
82
      ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
83
      ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
84
      ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
85
   }
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
86
87
  ifstream cf_stream(_config_file.c_str());
2213.2.1 by David Shrewsbury
Verify that slave config file is opened successfully and report a better error message if not. Update docs with better information about specifying config file and with some missing options.
88
89
  if (not cf_stream.is_open())
90
  {
91
    _error= "Unable to open file ";
92
    _error.append(_config_file);
93
    return false;
94
  }
95
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
96
  po::store(drizzled::program_options::parse_config_file(cf_stream, slave_options), vm);
97
98
  po::notify(vm);
99
2360.1.1 by Mark Atwood
restore multi master replication
100
  /*
101
   * We will support 10 masters. This loope effectively creates the Master
102
   * objects as they are referenced.
103
   *
104
   * @todo Support a variable number of master hosts.
105
   */
106
  for (size_t num= 1; num <= 10; num++)
107
  {
108
    string section("master");
109
    section.append(boost::lexical_cast<string>(num));
110
111
    /* WARNING! Hack!
112
     * We need to be able to determine when a master host is actually defined
113
     * by the user vs. we are just using defaults. So if the hostname is ever
114
     * the default value of "", then we'll assume that this section was not
115
     * user defined.
116
     */
117
    if (vm[section + ".master-host"].as<string>() == "")
118
      continue;
119
120
    _masters[num]= new (std::nothrow) Master(num);
121
122
    if (vm.count(section + ".master-host"))
123
      master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
124
125
    if (vm.count(section + ".master-port"))
126
      master(num).producer().setMasterPort(vm[section + ".master-port"].as<uint16_t>());
127
128
    if (vm.count(section + ".master-user"))
129
      master(num).producer().setMasterUser(vm[section + ".master-user"].as<string>());
130
131
    if (vm.count(section + ".master-pass"))
132
      master(num).producer().setMasterPassword(vm[section + ".master-pass"].as<string>());
133
134
    if (vm.count(section + ".max-commit-id"))
135
      master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
136
  }
137
138
  boost::unordered_map<uint32_t, Master *>::const_iterator it;
139
140
  for (it= _masters.begin(); it != _masters.end(); ++it)
141
  {
142
    if (vm.count("max-reconnects"))
143
      it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
144
145
    if (vm.count("seconds-between-reconnects"))
146
      it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
147
148
    if (vm.count("io-thread-sleep"))
149
      it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
150
  }
2207.4.1 by David Shrewsbury
Add config variables for io/applier thread sleep
151
152
  if (vm.count("applier-thread-sleep"))
153
    _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
2360.1.1 by Mark Atwood
restore multi master replication
154
  if (vm.count("ignore-errors"))
155
    _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
2207.4.1 by David Shrewsbury
Add config variables for io/applier thread sleep
156
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
157
  /* setup schema and tables */
158
  ReplicationSchema rs;
2116.1.32 by David Shrewsbury
incremental
159
  if (not rs.create())
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
160
  {
161
    _error= rs.getErrorMessage();
162
    return false;
163
  }
164
2360.1.1 by Mark Atwood
restore multi master replication
165
  for (it= _masters.begin(); it != _masters.end(); ++it)
2225.3.1 by David Shrewsbury
Persist --slave.max-commit-id value to applier_state table.
166
  {
2360.1.1 by Mark Atwood
restore multi master replication
167
    /* make certain a row exists for each master */
168
    rs.createInitialApplierRow(it->first);
169
    rs.createInitialIORow(it->first);
170
171
    uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
172
    if (cachedValue)
2225.3.1 by David Shrewsbury
Persist --slave.max-commit-id value to applier_state table.
173
    {
2360.1.1 by Mark Atwood
restore multi master replication
174
      if (not rs.setInitialMaxCommitId(it->first, cachedValue))
175
      {
176
        _error= rs.getErrorMessage();
177
        return false;
178
      }
2225.3.1 by David Shrewsbury
Persist --slave.max-commit-id value to applier_state table.
179
    }
180
  }
181
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
182
  return true;
183
}
184
185
} /* namespace slave */