37
/* Gets called after all plugins are initialized. */
38
38
void ReplicationSlave::startup(Session &session)
41
41
if (not initWithConfig())
43
errmsg_printf(error::ERROR,
44
_("Could not start slave services: %s\n"),
43
errmsg_printf(error::ERROR, _("Could not start slave services: %s\n"),
48
/* Start the IO threads */
49
boost::unordered_map<uint32_t, Master *>::const_iterator it;
50
for (it= _masters.begin(); it != _masters.end(); ++it)
53
/* Consumer must know server IDs */
54
_consumer.addMasterId(it->first);
49
57
_consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
50
_producer_thread= boost::thread(&QueueProducer::run, &_producer);
56
63
po::variables_map vm;
57
64
po::options_description slave_options("Options for the slave plugin");
59
67
slave_options.add_options()
60
("master-host", po::value<string>()->default_value(""))
61
("master-port", po::value<uint16_t>()->default_value(3306))
62
("master-user", po::value<string>()->default_value(""))
63
("master-pass", po::value<string>()->default_value(""))
64
("max-reconnects", po::value<uint32_t>()->default_value(10))
65
68
("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
66
69
("io-thread-sleep", po::value<uint32_t>()->default_value(5))
67
("applier-thread-sleep", po::value<uint32_t>()->default_value(5));
70
("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
71
("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
73
/* Master defining options */
74
for (size_t num= 1; num <= 10; num++)
76
string section("master");
77
section.append(boost::lexical_cast<string>(num));
78
slave_options.add_options()
79
((section + ".master-host").c_str(), po::value<string>()->default_value(""))
80
((section + ".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
81
((section + ".master-user").c_str(), po::value<string>()->default_value(""))
82
((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
83
((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
84
((section + ".max-commit-id").c_str(), po::value<uint64_t>());
69
87
ifstream cf_stream(_config_file.c_str());
82
if (vm.count("master-host"))
83
_producer.setMasterHost(vm["master-host"].as<string>());
85
if (vm.count("master-port"))
86
_producer.setMasterPort(vm["master-port"].as<uint16_t>());
88
if (vm.count("master-user"))
89
_producer.setMasterUser(vm["master-user"].as<string>());
91
if (vm.count("master-pass"))
92
_producer.setMasterPassword(vm["master-pass"].as<string>());
94
if (vm.count("max-reconnects"))
95
_producer.setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
97
if (vm.count("seconds-between-reconnects"))
98
_producer.setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
100
if (vm.count("io-thread-sleep"))
101
_producer.setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
101
* We will support 10 masters. This loope effectively creates the Master
102
* objects as they are referenced.
104
* @todo Support a variable number of master hosts.
106
for (size_t num= 1; num <= 10; num++)
108
string section("master");
109
section.append(boost::lexical_cast<string>(num));
112
* We need to be able to determine when a master host is actually defined
113
* by the user vs. we are just using defaults. So if the hostname is ever
114
* the default value of "", then we'll assume that this section was not
117
if (vm[section + ".master-host"].as<string>() == "")
120
_masters[num]= new (std::nothrow) Master(num);
122
if (vm.count(section + ".master-host"))
123
master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
125
if (vm.count(section + ".master-port"))
126
master(num).producer().setMasterPort(vm[section + ".master-port"].as<uint16_t>());
128
if (vm.count(section + ".master-user"))
129
master(num).producer().setMasterUser(vm[section + ".master-user"].as<string>());
131
if (vm.count(section + ".master-pass"))
132
master(num).producer().setMasterPassword(vm[section + ".master-pass"].as<string>());
134
if (vm.count(section + ".max-commit-id"))
135
master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
138
boost::unordered_map<uint32_t, Master *>::const_iterator it;
140
for (it= _masters.begin(); it != _masters.end(); ++it)
142
if (vm.count("max-reconnects"))
143
it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
145
if (vm.count("seconds-between-reconnects"))
146
it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
148
if (vm.count("io-thread-sleep"))
149
it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
103
152
if (vm.count("applier-thread-sleep"))
104
153
_consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
154
if (vm.count("ignore-errors"))
155
_consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
106
157
/* setup schema and tables */
107
158
ReplicationSchema rs;
114
if (_initial_max_commit_id)
165
for (it= _masters.begin(); it != _masters.end(); ++it)
116
if (not rs.setInitialMaxCommitId(_initial_max_commit_id))
167
/* make certain a row exists for each master */
168
rs.createInitialApplierRow(it->first);
169
rs.createInitialIORow(it->first);
171
uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
118
_error= rs.getErrorMessage();
174
if (not rs.setInitialMaxCommitId(it->first, cachedValue))
176
_error= rs.getErrorMessage();