~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/replication_slave.cc

  • Committer: Lee Bieber
  • Date: 2011-04-02 03:51:56 UTC
  • mfrom: (2260.1.1 revert_slave)
  • Revision ID: kalebral@gmail.com-20110402035156-lhfvo7o6yqtli0xs
Merge Shrews - Revert the multi-master slave plugin changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <config.h>
22
22
#include <plugin/slave/replication_slave.h>
 
23
#include <drizzled/program_options/config_file.h>
23
24
#include <drizzled/errmsg_print.h>
24
 
#include <drizzled/program_options/config_file.h>
25
 
#include <boost/lexical_cast.hpp>
26
25
#include <boost/program_options.hpp>
27
26
#include <fstream>
28
27
#include <drizzled/plugin.h>
35
34
namespace slave
36
35
{
37
36
 
 
37
/* Gets called after all plugins are initialized. */
38
38
void ReplicationSlave::startup(Session &session)
39
39
{
40
40
  (void)session;
41
41
  if (not initWithConfig())
42
42
  {
43
 
    errmsg_printf(error::ERROR, _("Could not start slave services: %s\n"),
44
 
                  _error.c_str());
 
43
    errmsg_printf(error::ERROR,
 
44
                  _("Could not start slave services: %s\n"),
 
45
                  getError().c_str());
45
46
  }
46
47
  else
47
48
  {
48
 
    /* Start the IO threads */
49
 
    boost::unordered_map<uint32_t, Master *>::const_iterator it;
50
 
    for (it= _masters.begin(); it != _masters.end(); ++it)
51
 
    {
52
 
      it->second->start();
53
 
      /* Consumer must know server IDs */
54
 
      _consumer.addMasterId(it->first);
55
 
    }
56
 
 
57
49
    _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
 
50
    _producer_thread= boost::thread(&QueueProducer::run, &_producer);
58
51
  }
59
52
}
60
53
 
63
56
  po::variables_map vm;
64
57
  po::options_description slave_options("Options for the slave plugin");
65
58
 
66
 
  /* Common options */
67
59
  slave_options.add_options()
 
60
    ("master-host", po::value<string>()->default_value(""))
 
61
    ("master-port", po::value<uint16_t>()->default_value(3306))
 
62
    ("master-user", po::value<string>()->default_value(""))
 
63
    ("master-pass", po::value<string>()->default_value(""))
 
64
    ("max-reconnects", po::value<uint32_t>()->default_value(10))
68
65
    ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
69
66
    ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
70
 
    ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
71
 
    ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
72
 
 
73
 
  /* Master defining options */
74
 
  for (size_t num= 1; num <= 10; num++)
75
 
  {
76
 
    string section("master");
77
 
    section.append(boost::lexical_cast<string>(num));
78
 
    slave_options.add_options()
79
 
      ((section + ".master-host").c_str(), po::value<string>()->default_value(""))
80
 
      ((section + ".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
81
 
      ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
82
 
      ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
83
 
      ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
84
 
      ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
85
 
   }
 
67
    ("applier-thread-sleep", po::value<uint32_t>()->default_value(5));
86
68
 
87
69
  ifstream cf_stream(_config_file.c_str());
88
70
 
97
79
 
98
80
  po::notify(vm);
99
81
 
100
 
  /*
101
 
   * We will support 10 masters. This loope effectively creates the Master
102
 
   * objects as they are referenced.
103
 
   *
104
 
   * @todo Support a variable number of master hosts.
105
 
   */
106
 
  for (size_t num= 1; num <= 10; num++)
107
 
  {
108
 
    string section("master");
109
 
    section.append(boost::lexical_cast<string>(num));
110
 
 
111
 
    /* WARNING! Hack!
112
 
     * We need to be able to determine when a master host is actually defined
113
 
     * by the user vs. we are just using defaults. So if the hostname is ever
114
 
     * the default value of "", then we'll assume that this section was not
115
 
     * user defined.
116
 
     */
117
 
    if (vm[section + ".master-host"].as<string>() == "")
118
 
      continue;
119
 
 
120
 
    _masters[num]= new (std::nothrow) Master(num);
121
 
 
122
 
    if (vm.count(section + ".master-host"))
123
 
      master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
124
 
 
125
 
    if (vm.count(section + ".master-port"))
126
 
      master(num).producer().setMasterPort(vm[section + ".master-port"].as<uint16_t>());
127
 
 
128
 
    if (vm.count(section + ".master-user"))
129
 
      master(num).producer().setMasterUser(vm[section + ".master-user"].as<string>());
130
 
 
131
 
    if (vm.count(section + ".master-pass"))
132
 
      master(num).producer().setMasterPassword(vm[section + ".master-pass"].as<string>());
133
 
 
134
 
    if (vm.count(section + ".max-commit-id"))
135
 
      master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
136
 
  }
137
 
 
138
 
  boost::unordered_map<uint32_t, Master *>::const_iterator it;
139
 
 
140
 
  for (it= _masters.begin(); it != _masters.end(); ++it)
141
 
  {
142
 
    if (vm.count("max-reconnects"))
143
 
      it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
144
 
 
145
 
    if (vm.count("seconds-between-reconnects"))
146
 
      it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
147
 
 
148
 
    if (vm.count("io-thread-sleep"))
149
 
      it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
150
 
  }
 
82
  if (vm.count("master-host"))
 
83
    _producer.setMasterHost(vm["master-host"].as<string>());
 
84
 
 
85
  if (vm.count("master-port"))
 
86
    _producer.setMasterPort(vm["master-port"].as<uint16_t>());
 
87
 
 
88
  if (vm.count("master-user"))
 
89
    _producer.setMasterUser(vm["master-user"].as<string>());
 
90
 
 
91
  if (vm.count("master-pass"))
 
92
    _producer.setMasterPassword(vm["master-pass"].as<string>());
 
93
 
 
94
  if (vm.count("max-reconnects"))
 
95
    _producer.setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
 
96
 
 
97
  if (vm.count("seconds-between-reconnects"))
 
98
    _producer.setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
 
99
 
 
100
  if (vm.count("io-thread-sleep"))
 
101
    _producer.setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
151
102
 
152
103
  if (vm.count("applier-thread-sleep"))
153
104
    _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
154
 
  if (vm.count("ignore-errors"))
155
 
    _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
156
105
 
157
106
  /* setup schema and tables */
158
107
  ReplicationSchema rs;
162
111
    return false;
163
112
  }
164
113
 
165
 
  for (it= _masters.begin(); it != _masters.end(); ++it)
 
114
  if (_initial_max_commit_id)
166
115
  {
167
 
    /* make certain a row exists for each master */
168
 
    rs.createInitialApplierRow(it->first);
169
 
    rs.createInitialIORow(it->first);
170
 
 
171
 
    uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
172
 
    if (cachedValue)
 
116
    if (not rs.setInitialMaxCommitId(_initial_max_commit_id))
173
117
    {
174
 
      if (not rs.setInitialMaxCommitId(it->first, cachedValue))
175
 
      {
176
 
        _error= rs.getErrorMessage();
177
 
        return false;
178
 
      }
 
118
      _error= rs.getErrorMessage();
 
119
      return false;
179
120
    }
180
121
  }
181
122