~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/replication_slave.cc

  • Committer: Andrew Hutchings
  • Date: 2011-03-29 20:45:43 UTC
  • mfrom: (2257 drizzle)
  • mto: (2257.1.3 build)
  • mto: This revision was merged to the branch mainline in revision 2258.
  • Revision ID: andrew@linuxjedi.co.uk-20110329204543-ssex0nuo8knncgwx
Merge with trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <config.h>
22
22
#include <plugin/slave/replication_slave.h>
 
23
#include <drizzled/errmsg_print.h>
23
24
#include <drizzled/program_options/config_file.h>
24
 
#include <drizzled/errmsg_print.h>
 
25
#include <boost/lexical_cast.hpp>
25
26
#include <boost/program_options.hpp>
26
27
#include <fstream>
27
28
#include <drizzled/plugin.h>
34
35
namespace slave
35
36
{
36
37
 
37
 
/* Gets called after all plugins are initialized. */
38
38
void ReplicationSlave::startup(Session &session)
39
39
{
40
40
  (void)session;
41
41
  if (not initWithConfig())
42
42
  {
43
 
    errmsg_printf(error::ERROR,
44
 
                  _("Could not start slave services: %s\n"),
45
 
                  getError().c_str());
 
43
    errmsg_printf(error::ERROR, _("Could not start slave services: %s\n"),
 
44
                  _error.c_str());
46
45
  }
47
46
  else
48
47
  {
 
48
    /* Start the IO threads */
 
49
    boost::unordered_map<uint32_t, Master *>::const_iterator it;
 
50
    for (it= _masters.begin(); it != _masters.end(); ++it)
 
51
    {
 
52
      it->second->start();
 
53
      /* Consumer must know server IDs */
 
54
      _consumer.addMasterId(it->first);
 
55
    }
 
56
 
49
57
    _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
50
 
    _producer_thread= boost::thread(&QueueProducer::run, &_producer);
51
58
  }
52
59
}
53
60
 
56
63
  po::variables_map vm;
57
64
  po::options_description slave_options("Options for the slave plugin");
58
65
 
 
66
  /* Common options */
59
67
  slave_options.add_options()
60
 
    ("master-host", po::value<string>()->default_value(""))
61
 
    ("master-port", po::value<uint16_t>()->default_value(3306))
62
 
    ("master-user", po::value<string>()->default_value(""))
63
 
    ("master-pass", po::value<string>()->default_value(""))
64
 
    ("max-reconnects", po::value<uint32_t>()->default_value(10))
65
68
    ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
66
69
    ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
67
 
    ("applier-thread-sleep", po::value<uint32_t>()->default_value(5));
 
70
    ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
 
71
    ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
 
72
 
 
73
  /* Master defining options */
 
74
  for (size_t num= 1; num <= 10; num++)
 
75
  {
 
76
    string section("master");
 
77
    section.append(boost::lexical_cast<string>(num));
 
78
    slave_options.add_options()
 
79
      ((section + ".master-host").c_str(), po::value<string>()->default_value(""))
 
80
      ((section + ".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
 
81
      ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
 
82
      ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
 
83
      ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
 
84
      ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
 
85
   }
68
86
 
69
87
  ifstream cf_stream(_config_file.c_str());
70
88
 
79
97
 
80
98
  po::notify(vm);
81
99
 
82
 
  if (vm.count("master-host"))
83
 
    _producer.setMasterHost(vm["master-host"].as<string>());
84
 
 
85
 
  if (vm.count("master-port"))
86
 
    _producer.setMasterPort(vm["master-port"].as<uint16_t>());
87
 
 
88
 
  if (vm.count("master-user"))
89
 
    _producer.setMasterUser(vm["master-user"].as<string>());
90
 
 
91
 
  if (vm.count("master-pass"))
92
 
    _producer.setMasterPassword(vm["master-pass"].as<string>());
93
 
 
94
 
  if (vm.count("max-reconnects"))
95
 
    _producer.setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
96
 
 
97
 
  if (vm.count("seconds-between-reconnects"))
98
 
    _producer.setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
99
 
 
100
 
  if (vm.count("io-thread-sleep"))
101
 
    _producer.setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
 
100
  /*
 
101
   * We will support 10 masters. This loope effectively creates the Master
 
102
   * objects as they are referenced.
 
103
   *
 
104
   * @todo Support a variable number of master hosts.
 
105
   */
 
106
  for (size_t num= 1; num <= 10; num++)
 
107
  {
 
108
    string section("master");
 
109
    section.append(boost::lexical_cast<string>(num));
 
110
 
 
111
    /* WARNING! Hack!
 
112
     * We need to be able to determine when a master host is actually defined
 
113
     * by the user vs. we are just using defaults. So if the hostname is ever
 
114
     * the default value of "", then we'll assume that this section was not
 
115
     * user defined.
 
116
     */
 
117
    if (vm[section + ".master-host"].as<string>() == "")
 
118
      continue;
 
119
 
 
120
    _masters[num]= new (std::nothrow) Master(num);
 
121
 
 
122
    if (vm.count(section + ".master-host"))
 
123
      master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
 
124
 
 
125
    if (vm.count(section + ".master-port"))
 
126
      master(num).producer().setMasterPort(vm[section + ".master-port"].as<uint16_t>());
 
127
 
 
128
    if (vm.count(section + ".master-user"))
 
129
      master(num).producer().setMasterUser(vm[section + ".master-user"].as<string>());
 
130
 
 
131
    if (vm.count(section + ".master-pass"))
 
132
      master(num).producer().setMasterPassword(vm[section + ".master-pass"].as<string>());
 
133
 
 
134
    if (vm.count(section + ".max-commit-id"))
 
135
      master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
 
136
  }
 
137
 
 
138
  boost::unordered_map<uint32_t, Master *>::const_iterator it;
 
139
 
 
140
  for (it= _masters.begin(); it != _masters.end(); ++it)
 
141
  {
 
142
    if (vm.count("max-reconnects"))
 
143
      it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
 
144
 
 
145
    if (vm.count("seconds-between-reconnects"))
 
146
      it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
 
147
 
 
148
    if (vm.count("io-thread-sleep"))
 
149
      it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
 
150
  }
102
151
 
103
152
  if (vm.count("applier-thread-sleep"))
104
153
    _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
 
154
  if (vm.count("ignore-errors"))
 
155
    _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
105
156
 
106
157
  /* setup schema and tables */
107
158
  ReplicationSchema rs;
111
162
    return false;
112
163
  }
113
164
 
114
 
  if (_initial_max_commit_id)
 
165
  for (it= _masters.begin(); it != _masters.end(); ++it)
115
166
  {
116
 
    if (not rs.setInitialMaxCommitId(_initial_max_commit_id))
 
167
    /* make certain a row exists for each master */
 
168
    rs.createInitialApplierRow(it->first);
 
169
    rs.createInitialIORow(it->first);
 
170
 
 
171
    uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
 
172
    if (cachedValue)
117
173
    {
118
 
      _error= rs.getErrorMessage();
119
 
      return false;
 
174
      if (not rs.setInitialMaxCommitId(it->first, cachedValue))
 
175
      {
 
176
        _error= rs.getErrorMessage();
 
177
        return false;
 
178
      }
120
179
    }
121
180
  }
122
181