43
43
#include "drizzled/session.h"
44
44
#include "drizzled/error.h"
48
50
using namespace std;
53
ReplicationServices::ReplicationServices()
55
ReplicationServices::ReplicationServices() :
58
void ReplicationServices::evaluateActivePlugins()
60
void ReplicationServices::normalizeReplicatorName(string &name)
61
* We loop through replicators and appliers, evaluating
62
* whether or not there is at least one active replicator
63
* and one active applier. If not, we set is_active
66
bool tmp_is_active= false;
68
if (replicators.empty() || appliers.empty())
75
* Determine if any remaining replicators and if those
76
* replicators are active...if not, set is_active
79
for (Replicators::iterator repl_iter= replicators.begin();
80
repl_iter != replicators.end();
83
if ((*repl_iter)->isEnabled())
62
transform(name.begin(),
66
if (name.find("replicator") == string::npos)
67
name.append("replicator", 10);
69
size_t found_underscore= name.find('_');
70
while (found_underscore != string::npos)
72
name.erase(found_underscore, 1);
73
found_underscore= name.find('_');
91
/* No active replicators. Set is_active to false and exit. */
78
bool ReplicationServices::evaluateRegisteredPlugins()
97
* OK, we know there's at least one active replicator.
99
* Now determine if any remaining replicators and if those
100
* replicators are active...if not, set is_active
81
* We loop through appliers that have registered with us
82
* and attempts to pair the applier with its requested
83
* replicator. If an applier has requested a replicator
84
* that has either not been built or has not registered
85
* with the replication services, we print an error and
91
if (replicators.empty() && not appliers.empty())
93
errmsg_printf(ERRMSG_LVL_ERROR,
94
N_("You registered a TransactionApplier plugin but no "
95
"TransactionReplicator plugins were registered.\n"));
103
99
for (Appliers::iterator appl_iter= appliers.begin();
104
100
appl_iter != appliers.end();
107
if ((*appl_iter)->isEnabled())
103
plugin::TransactionApplier *applier= (*appl_iter).second;
104
string requested_replicator_name= (*appl_iter).first;
105
normalizeReplicatorName(requested_replicator_name);
108
Replicators::iterator repl_iter;
109
for (repl_iter= replicators.begin();
110
repl_iter != replicators.end();
113
string replicator_name= (*repl_iter)->getName();
114
normalizeReplicatorName(replicator_name);
116
if (requested_replicator_name.compare(replicator_name) == 0)
124
errmsg_printf(ERRMSG_LVL_ERROR,
125
N_("You registered a TransactionApplier plugin but no "
126
"TransactionReplicator plugins were registered that match the "
127
"requested replicator name of '%s'.\n"
128
"We have deactivated the TransactionApplier '%s'.\n"),
129
requested_replicator_name.c_str(),
130
applier->getName().c_str());
131
applier->deactivate();
136
replication_streams.push_back(make_pair(*repl_iter, applier));
113
/* If we get here, there are no active appliers */
117
143
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
119
145
replicators.push_back(in_replicator);
120
evaluateActivePlugins();
123
148
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
125
150
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
126
evaluateActivePlugins();
129
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
131
appliers.push_back(in_applier);
132
evaluateActivePlugins();
135
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
137
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
138
evaluateActivePlugins();
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
155
appliers.push_back(make_pair(requested_replicator_name, in_applier));
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
141
162
bool ReplicationServices::isActive() const
146
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
147
168
message::Transaction &to_push)
149
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
150
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
151
appl_start_iter= appliers.begin();
153
plugin::TransactionReplicator *cur_repl;
154
plugin::TransactionApplier *cur_appl;
156
170
plugin::ReplicationReturnCode result= plugin::SUCCESS;
158
while (repl_iter != replicators.end())
172
for (ReplicationStreams::iterator iter= replication_streams.begin();
173
iter != replication_streams.end();
160
cur_repl= *repl_iter;
161
if (! cur_repl->isEnabled())
167
appl_iter= appl_start_iter;
168
while (appl_iter != appliers.end())
170
cur_appl= *appl_iter;
172
if (! cur_appl->isEnabled())
178
result= cur_repl->replicate(cur_appl, in_session, to_push);
180
if (result == plugin::SUCCESS)
183
* We update the timestamp for the last applied Transaction so that
184
* publisher plugins can ask the replication services when the
185
* last known applied Transaction was using the getLastAppliedTimestamp()
188
last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
176
plugin::TransactionReplicator *cur_repl= (*iter).first;
177
plugin::TransactionApplier *cur_appl= (*iter).second;
179
result= cur_repl->replicate(cur_appl, in_session, to_push);
181
if (result == plugin::SUCCESS)
184
* We update the timestamp for the last applied Transaction so that
185
* publisher plugins can ask the replication services when the
186
* last known applied Transaction was using the getLastAppliedTimestamp()
189
last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
197
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
199
return replication_streams;
199
202
} /* namespace drizzled */