2116.1.23
by David Shrewsbury
Added empty version of producer thread |
1 |
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
|
2 |
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
|
|
3 |
*
|
|
4 |
* Copyright (C) 2011 David Shrewsbury
|
|
5 |
*
|
|
6 |
* This program is free software; you can redistribute it and/or modify
|
|
7 |
* it under the terms of the GNU General Public License as published by
|
|
8 |
* the Free Software Foundation; either version 2 of the License, or
|
|
9 |
* (at your option) any later version.
|
|
10 |
*
|
|
11 |
* This program is distributed in the hope that it will be useful,
|
|
12 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
13 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
14 |
* GNU General Public License for more details.
|
|
15 |
*
|
|
16 |
* You should have received a copy of the GNU General Public License
|
|
17 |
* along with this program; if not, write to the Free Software
|
|
18 |
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
19 |
*/
|
|
20 |
||
2116.1.38
by David Shrewsbury
Change include style |
21 |
#include <plugin/slave/queue_producer.h> |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
22 |
#include <drizzled/errmsg_print.h> |
2116.1.32
by David Shrewsbury
incremental |
23 |
#include <drizzled/sql/result_set.h> |
24 |
#include <drizzled/execute.h> |
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
25 |
#include <drizzled/gettext.h> |
2116.1.32
by David Shrewsbury
incremental |
26 |
#include <drizzled/message/transaction.pb.h> |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
27 |
#include <boost/lexical_cast.hpp> |
2116.1.32
by David Shrewsbury
incremental |
28 |
#include <google/protobuf/text_format.h> |
2360.1.1
by Mark Atwood
restore multi master replication |
29 |
#include <string> |
30 |
#include <vector> |
|
2116.1.23
by David Shrewsbury
Added empty version of producer thread |
31 |
|
2116.1.29
by David Shrewsbury
Initial stages of master connection |
32 |
using namespace std; |
33 |
using namespace drizzled; |
|
2116.1.28
by David Shrewsbury
More config file work |
34 |
|
2116.1.23
by David Shrewsbury
Added empty version of producer thread |
35 |
namespace slave |
36 |
{
|
|
37 |
||
2116.1.29
by David Shrewsbury
Initial stages of master connection |
38 |
/*
 * Destructor: closes the master connection if one is still flagged as open.
 */
QueueProducer::~QueueProducer()
{
  if (not _is_connected)
    return;

  closeConnection();
}
|
43 |
||
2116.1.23
by David Shrewsbury
Added empty version of producer thread |
44 |
bool QueueProducer::init() |
45 |
{
|
|
2116.1.33
by David Shrewsbury
incremental |
46 |
setIOState("", true); |
2116.2.1
by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908 |
47 |
return reconnect(true); |
2116.1.23
by David Shrewsbury
Added empty version of producer thread |
48 |
}
|
49 |
||
50 |
bool QueueProducer::process() |
|
51 |
{
|
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
52 |
if (_saved_max_commit_id == 0) |
53 |
{
|
|
54 |
if (not queryForMaxCommitId(&_saved_max_commit_id)) |
|
55 |
{
|
|
2116.1.32
by David Shrewsbury
incremental |
56 |
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION) |
57 |
{
|
|
2116.2.1
by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908 |
58 |
if (reconnect(false)) |
2116.1.32
by David Shrewsbury
incremental |
59 |
{
|
60 |
return true; /* reconnect successful, try again */ |
|
61 |
}
|
|
62 |
else
|
|
63 |
{
|
|
2116.1.33
by David Shrewsbury
incremental |
64 |
_last_error_message= "Master offline"; |
2116.1.32
by David Shrewsbury
incremental |
65 |
return false; /* reconnect failed, shutdown the thread */ |
66 |
}
|
|
67 |
}
|
|
68 |
else
|
|
69 |
{
|
|
70 |
return false; /* unrecoverable error, shutdown the thread */ |
|
71 |
}
|
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
72 |
}
|
73 |
}
|
|
74 |
||
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
75 |
/* Keep getting events until caught up */
|
2221.4.2
by David Shrewsbury
Fix ambiguity of error_t in slave plugin. |
76 |
enum drizzled::error_t err; |
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
77 |
while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK) |
78 |
{}
|
|
79 |
||
80 |
if (err == ER_YES) /* We encountered an error */ |
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
81 |
{
|
2116.1.32
by David Shrewsbury
incremental |
82 |
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION) |
83 |
{
|
|
2116.2.1
by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908 |
84 |
if (reconnect(false)) |
2116.1.32
by David Shrewsbury
incremental |
85 |
{
|
86 |
return true; /* reconnect successful, try again */ |
|
87 |
}
|
|
88 |
else
|
|
89 |
{
|
|
2116.1.33
by David Shrewsbury
incremental |
90 |
_last_error_message= "Master offline"; |
2116.1.32
by David Shrewsbury
incremental |
91 |
return false; /* reconnect failed, shutdown the thread */ |
92 |
}
|
|
93 |
}
|
|
94 |
else
|
|
95 |
{
|
|
96 |
return false; /* unrecoverable error, shutdown the thread */ |
|
97 |
}
|
|
2116.1.30
by David Shrewsbury
incremental |
98 |
}
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
99 |
|
2116.1.23
by David Shrewsbury
Added empty version of producer thread |
100 |
return true; |
101 |
}
|
|
102 |
||
103 |
void QueueProducer::shutdown() |
|
104 |
{
|
|
2116.1.33
by David Shrewsbury
incremental |
105 |
setIOState(_last_error_message, false); |
2116.1.30
by David Shrewsbury
incremental |
106 |
if (_is_connected) |
107 |
closeConnection(); |
|
2116.1.23
by David Shrewsbury
Added empty version of producer thread |
108 |
}
|
109 |
||
2116.2.2
by Joseph Daly
fix bad variable names |
110 |
bool QueueProducer::reconnect(bool initial_connection) |
2116.1.32
by David Shrewsbury
incremental |
111 |
{
|
2116.2.2
by Joseph Daly
fix bad variable names |
112 |
if (not initial_connection) |
2116.2.1
by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908 |
113 |
{
|
114 |
errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting.")); |
|
115 |
}
|
|
2116.1.32
by David Shrewsbury
incremental |
116 |
|
117 |
_is_connected= false; |
|
118 |
_last_return= DRIZZLE_RETURN_OK; |
|
2116.1.33
by David Shrewsbury
incremental |
119 |
_last_error_message.clear(); |
2116.1.32
by David Shrewsbury
incremental |
120 |
boost::posix_time::seconds duration(_seconds_between_reconnects); |
121 |
||
122 |
uint32_t attempts= 1; |
|
123 |
||
124 |
while (not openConnection()) |
|
125 |
{
|
|
126 |
if (attempts++ == _max_reconnects) |
|
127 |
break; |
|
128 |
boost::this_thread::sleep(duration); |
|
129 |
}
|
|
130 |
||
131 |
return _is_connected; |
|
132 |
}
|
|
133 |
||
2116.1.29
by David Shrewsbury
Initial stages of master connection |
134 |
bool QueueProducer::openConnection() |
135 |
{
|
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
136 |
if (drizzle_create(&_drizzle) == NULL) |
2116.1.29
by David Shrewsbury
Initial stages of master connection |
137 |
{
|
2116.1.32
by David Shrewsbury
incremental |
138 |
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR; |
2116.1.33
by David Shrewsbury
incremental |
139 |
_last_error_message= "Replication slave: "; |
140 |
_last_error_message.append(drizzle_error(&_drizzle)); |
|
141 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2116.1.29
by David Shrewsbury
Initial stages of master connection |
142 |
return false; |
143 |
}
|
|
144 |
||
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
145 |
if (drizzle_con_create(&_drizzle, &_connection) == NULL) |
2116.1.29
by David Shrewsbury
Initial stages of master connection |
146 |
{
|
2116.1.32
by David Shrewsbury
incremental |
147 |
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR; |
2116.1.33
by David Shrewsbury
incremental |
148 |
_last_error_message= "Replication slave: "; |
149 |
_last_error_message.append(drizzle_error(&_drizzle)); |
|
150 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2116.1.29
by David Shrewsbury
Initial stages of master connection |
151 |
return false; |
152 |
}
|
|
153 |
||
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
154 |
drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port); |
155 |
drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str()); |
|
2116.1.29
by David Shrewsbury
Initial stages of master connection |
156 |
|
2116.1.32
by David Shrewsbury
incremental |
157 |
drizzle_return_t ret= drizzle_con_connect(&_connection); |
2116.1.29
by David Shrewsbury
Initial stages of master connection |
158 |
|
159 |
if (ret != DRIZZLE_RETURN_OK) |
|
160 |
{
|
|
2116.1.32
by David Shrewsbury
incremental |
161 |
_last_return= ret; |
2116.1.33
by David Shrewsbury
incremental |
162 |
_last_error_message= "Replication slave: "; |
163 |
_last_error_message.append(drizzle_error(&_drizzle)); |
|
164 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2116.1.29
by David Shrewsbury
Initial stages of master connection |
165 |
return false; |
166 |
}
|
|
167 |
||
2116.1.30
by David Shrewsbury
incremental |
168 |
_is_connected= true; |
169 |
||
2116.1.29
by David Shrewsbury
Initial stages of master connection |
170 |
return true; |
171 |
}
|
|
172 |
||
173 |
bool QueueProducer::closeConnection() |
|
174 |
{
|
|
175 |
drizzle_return_t ret; |
|
176 |
drizzle_result_st result; |
|
177 |
||
2116.1.30
by David Shrewsbury
incremental |
178 |
_is_connected= false; |
179 |
||
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
180 |
if (drizzle_quit(&_connection, &result, &ret) == NULL) |
181 |
{
|
|
2116.1.32
by David Shrewsbury
incremental |
182 |
_last_return= ret; |
2217.1.2
by Andrew Hutchings
Fix leaks in slave result sets |
183 |
drizzle_result_free(&result); |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
184 |
return false; |
185 |
}
|
|
186 |
||
187 |
drizzle_result_free(&result); |
|
188 |
||
189 |
return true; |
|
190 |
}
|
|
191 |
||
2116.1.50
by David Shrewsbury
Fix commit_id to correct uint64_t type |
192 |
bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id) |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
193 |
{
|
194 |
/*
|
|
195 |
* This SQL will get the maximum commit_id value we have pulled over from
|
|
196 |
* the master. We query two tables because either the queue will be empty,
|
|
197 |
* in which case the last_applied_commit_id will be the value we want, or
|
|
198 |
* we have yet to drain the queue, we get the maximum value still in
|
|
199 |
* the queue.
|
|
200 |
*/
|
|
201 |
string sql("SELECT MAX(x.cid) FROM" |
|
2116.1.52
by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word. |
202 |
" (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
|
2360.1.1
by Mark Atwood
restore multi master replication |
203 |
" WHERE `master_id` = "
|
204 |
+ boost::lexical_cast<string>(masterId()) |
|
205 |
+ " UNION ALL SELECT `last_applied_commit_id` AS cid" |
|
206 |
+ " FROM `sys_replication`.`applier_state` WHERE `master_id` = " |
|
207 |
+ boost::lexical_cast<string>(masterId()) |
|
208 |
+ ") AS x"); |
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
209 |
|
2116.1.32
by David Shrewsbury
incremental |
210 |
sql::ResultSet result_set(1); |
211 |
Execute execute(*(_session.get()), true); |
|
212 |
execute.run(sql, result_set); |
|
213 |
assert(result_set.getMetaData().getColumnCount() == 1); |
|
214 |
||
215 |
/* Really should only be 1 returned row */
|
|
216 |
uint32_t found_rows= 0; |
|
217 |
while (result_set.next()) |
|
218 |
{
|
|
219 |
string value= result_set.getString(0); |
|
220 |
||
221 |
if ((value == "") || (found_rows == 1)) |
|
222 |
break; |
|
223 |
||
224 |
assert(result_set.isNull(0) == false); |
|
2116.1.50
by David Shrewsbury
Fix commit_id to correct uint64_t type |
225 |
*max_commit_id= boost::lexical_cast<uint64_t>(value); |
2116.1.32
by David Shrewsbury
incremental |
226 |
found_rows++; |
227 |
}
|
|
228 |
||
229 |
if (found_rows == 0) |
|
2116.1.33
by David Shrewsbury
incremental |
230 |
{
|
231 |
_last_error_message= "Could not determine last committed transaction."; |
|
2116.1.32
by David Shrewsbury
incremental |
232 |
return false; |
2116.1.33
by David Shrewsbury
incremental |
233 |
}
|
2116.1.32
by David Shrewsbury
incremental |
234 |
|
235 |
return true; |
|
236 |
}
|
|
237 |
||
2116.1.50
by David Shrewsbury
Fix commit_id to correct uint64_t type |
238 |
bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id, |
2116.1.32
by David Shrewsbury
incremental |
239 |
vector<uint64_t> &list) |
240 |
{
|
|
241 |
(void)list; |
|
2116.1.40
by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word) |
242 |
string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`" |
2116.1.50
by David Shrewsbury
Fix commit_id to correct uint64_t type |
243 |
" WHERE `commit_id` > "); |
2116.1.32
by David Shrewsbury
incremental |
244 |
sql.append(boost::lexical_cast<string>(max_commit_id)); |
2116.1.51
by David Shrewsbury
Fix for bug 720886: cached max commit_id value was not being set properly causing us to pull the same message from the master more than once |
245 |
sql.append(" ORDER BY `commit_id` LIMIT 25"); |
2116.1.32
by David Shrewsbury
incremental |
246 |
|
247 |
drizzle_return_t ret; |
|
248 |
drizzle_result_st result; |
|
249 |
drizzle_query_str(&_connection, &result, sql.c_str(), &ret); |
|
250 |
||
251 |
if (ret != DRIZZLE_RETURN_OK) |
|
252 |
{
|
|
253 |
_last_return= ret; |
|
2116.1.33
by David Shrewsbury
incremental |
254 |
_last_error_message= "Replication slave: "; |
255 |
_last_error_message.append(drizzle_error(&_drizzle)); |
|
256 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2217.1.2
by Andrew Hutchings
Fix leaks in slave result sets |
257 |
drizzle_result_free(&result); |
2116.1.32
by David Shrewsbury
incremental |
258 |
return false; |
259 |
}
|
|
260 |
||
261 |
ret= drizzle_result_buffer(&result); |
|
262 |
||
263 |
if (ret != DRIZZLE_RETURN_OK) |
|
264 |
{
|
|
2116.1.33
by David Shrewsbury
incremental |
265 |
_last_return= ret; |
266 |
_last_error_message= "Replication slave: "; |
|
267 |
_last_error_message.append(drizzle_error(&_drizzle)); |
|
268 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2116.1.32
by David Shrewsbury
incremental |
269 |
drizzle_result_free(&result); |
270 |
return false; |
|
271 |
}
|
|
272 |
||
273 |
drizzle_row_t row; |
|
274 |
||
275 |
while ((row= drizzle_row_next(&result)) != NULL) |
|
276 |
{
|
|
277 |
if (row[0]) |
|
278 |
{
|
|
279 |
list.push_back(boost::lexical_cast<uint32_t>(row[0])); |
|
280 |
}
|
|
281 |
else
|
|
282 |
{
|
|
2116.1.33
by David Shrewsbury
incremental |
283 |
_last_return= ret; |
284 |
_last_error_message= "Replication slave: Unexpected NULL for trx id"; |
|
285 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2116.1.32
by David Shrewsbury
incremental |
286 |
drizzle_result_free(&result); |
287 |
return false; |
|
288 |
}
|
|
289 |
}
|
|
290 |
||
291 |
drizzle_result_free(&result); |
|
292 |
return true; |
|
293 |
}
|
|
294 |
||
295 |
||
296 |
bool QueueProducer::queueInsert(const char *trx_id, |
|
297 |
const char *seg_id, |
|
298 |
const char *commit_id, |
|
2290.1.3
by Joseph Daly
slave plugin work |
299 |
const char *originating_server_uuid, |
300 |
const char *originating_commit_id, |
|
2116.1.32
by David Shrewsbury
incremental |
301 |
const char *msg, |
302 |
const char *msg_length) |
|
303 |
{
|
|
304 |
message::Transaction message; |
|
305 |
||
306 |
message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length)); |
|
307 |
||
308 |
/*
|
|
309 |
* The SQL to insert our results into the local queue.
|
|
310 |
*/
|
|
2116.1.52
by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word. |
311 |
string sql= "INSERT INTO `sys_replication`.`queue`" |
2360.1.1
by Mark Atwood
restore multi master replication |
312 |
" (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
|
2290.1.5
by Joseph Daly
slave fixes |
313 |
" `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES ("; |
2360.1.1
by Mark Atwood
restore multi master replication |
314 |
sql.append(boost::lexical_cast<string>(masterId())); |
315 |
sql.append(", ", 2); |
|
2116.1.32
by David Shrewsbury
incremental |
316 |
sql.append(trx_id); |
317 |
sql.append(", ", 2); |
|
318 |
sql.append(seg_id); |
|
319 |
sql.append(", ", 2); |
|
320 |
sql.append(commit_id); |
|
2290.1.5
by Joseph Daly
slave fixes |
321 |
sql.append(", '", 3); |
2290.1.3
by Joseph Daly
slave plugin work |
322 |
sql.append(originating_server_uuid); |
2290.1.5
by Joseph Daly
slave fixes |
323 |
sql.append("' , ", 4); |
2290.1.3
by Joseph Daly
slave plugin work |
324 |
sql.append(originating_commit_id); |
2116.1.32
by David Shrewsbury
incremental |
325 |
sql.append(", '", 3); |
326 |
||
327 |
/*
|
|
328 |
* Ideally we would store the Transaction message in binary form, as it
|
|
329 |
* it stored on the master and tranferred to the slave. However, we are
|
|
330 |
* inserting using drizzle::Execute which doesn't really handle binary
|
|
331 |
* data. Until that is changed, we store as plain text.
|
|
332 |
*/
|
|
333 |
string message_text; |
|
334 |
google::protobuf::TextFormat::PrintToString(message, &message_text); |
|
335 |
||
2116.1.34
by David Shrewsbury
Working replication. |
336 |
/*
|
2116.1.46
by David Shrewsbury
Fix for bug 720819 |
337 |
* Execution using drizzled::Execute requires some special escaping.
|
2116.1.34
by David Shrewsbury
Working replication. |
338 |
*/
|
2116.1.32
by David Shrewsbury
incremental |
339 |
string::iterator it= message_text.begin(); |
340 |
for (; it != message_text.end(); ++it) |
|
341 |
{
|
|
2116.1.46
by David Shrewsbury
Fix for bug 720819 |
342 |
if (*it == '\"') |
343 |
{
|
|
344 |
it= message_text.insert(it, '\\'); |
|
345 |
++it; |
|
346 |
}
|
|
347 |
else if (*it == '\'') |
|
348 |
{
|
|
349 |
it= message_text.insert(it, '\\'); |
|
350 |
++it; |
|
351 |
it= message_text.insert(it, '\\'); |
|
352 |
++it; |
|
353 |
}
|
|
354 |
else if (*it == '\\') |
|
355 |
{
|
|
356 |
it= message_text.insert(it, '\\'); |
|
357 |
++it; |
|
358 |
it= message_text.insert(it, '\\'); |
|
359 |
++it; |
|
2116.1.34
by David Shrewsbury
Working replication. |
360 |
it= message_text.insert(it, '\\'); |
361 |
++it; |
|
362 |
}
|
|
2218.1.2
by David Shrewsbury
Escape semicolons properly in slave plugin. |
363 |
else if (*it == ';') |
364 |
{
|
|
365 |
it= message_text.insert(it, '\\'); |
|
366 |
++it; /* advance back to the semicolon */ |
|
367 |
}
|
|
2116.1.32
by David Shrewsbury
incremental |
368 |
}
|
369 |
||
370 |
sql.append(message_text); |
|
371 |
sql.append("')", 2); |
|
372 |
||
2116.1.33
by David Shrewsbury
incremental |
373 |
vector<string> statements; |
374 |
statements.push_back(sql); |
|
375 |
||
376 |
if (not executeSQL(statements)) |
|
377 |
{
|
|
378 |
markInErrorState(); |
|
379 |
return false; |
|
380 |
}
|
|
381 |
||
2116.1.51
by David Shrewsbury
Fix for bug 720886: cached max commit_id value was not being set properly causing us to pull the same message from the master more than once |
382 |
uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id); |
383 |
if (tmp_commit_id > _saved_max_commit_id) |
|
384 |
_saved_max_commit_id= tmp_commit_id; |
|
385 |
||
2116.1.32
by David Shrewsbury
incremental |
386 |
return true; |
387 |
}
|
|
388 |
||
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
389 |
|
2221.4.2
by David Shrewsbury
Fix ambiguity of error_t in slave plugin. |
390 |
enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id) |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
391 |
{
|
2116.1.32
by David Shrewsbury
incremental |
392 |
vector<uint64_t> trx_id_list; |
393 |
||
394 |
if (not queryForTrxIdList(max_commit_id, trx_id_list)) |
|
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
395 |
return ER_YES; |
2116.1.32
by David Shrewsbury
incremental |
396 |
|
2116.1.34
by David Shrewsbury
Working replication. |
397 |
if (trx_id_list.size() == 0) /* nothing to get from the master */ |
2116.1.32
by David Shrewsbury
incremental |
398 |
{
|
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
399 |
return ER_NO; |
2116.1.32
by David Shrewsbury
incremental |
400 |
}
|
401 |
||
402 |
/*
|
|
403 |
* The SQL to pull everything we need from the master.
|
|
404 |
*/
|
|
2290.1.5
by Joseph Daly
slave fixes |
405 |
string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_uuid`," |
2290.1.3
by Joseph Daly
slave plugin work |
406 |
" `originating_commit_id`, `message`, `message_len` "
|
2116.1.40
by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word) |
407 |
" FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN ("; |
2116.1.32
by David Shrewsbury
incremental |
408 |
|
409 |
for (size_t x= 0; x < trx_id_list.size(); x++) |
|
410 |
{
|
|
411 |
if (x > 0) |
|
412 |
sql.append(", ", 2); |
|
413 |
sql.append(boost::lexical_cast<string>(trx_id_list[x])); |
|
414 |
}
|
|
415 |
||
416 |
sql.append(")", 1); |
|
2270.2.1
by Joseph Daly
bug 755201 must order the select from the sys_replication log this order is later used for the insertion order |
417 |
sql.append(" ORDER BY `commit_id` ASC"); |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
418 |
|
419 |
drizzle_return_t ret; |
|
420 |
drizzle_result_st result; |
|
2116.1.32
by David Shrewsbury
incremental |
421 |
drizzle_query_str(&_connection, &result, sql.c_str(), &ret); |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
422 |
|
423 |
if (ret != DRIZZLE_RETURN_OK) |
|
424 |
{
|
|
2116.1.32
by David Shrewsbury
incremental |
425 |
_last_return= ret; |
2116.1.33
by David Shrewsbury
incremental |
426 |
_last_error_message= "Replication slave: "; |
427 |
_last_error_message.append(drizzle_error(&_drizzle)); |
|
428 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2217.1.2
by Andrew Hutchings
Fix leaks in slave result sets |
429 |
drizzle_result_free(&result); |
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
430 |
return ER_YES; |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
431 |
}
|
432 |
||
2116.1.32
by David Shrewsbury
incremental |
433 |
/* TODO: Investigate 1-row-at-a-time buffering */
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
434 |
|
435 |
ret= drizzle_result_buffer(&result); |
|
436 |
||
437 |
if (ret != DRIZZLE_RETURN_OK) |
|
438 |
{
|
|
2116.1.33
by David Shrewsbury
incremental |
439 |
_last_return= ret; |
440 |
_last_error_message= "Replication slave: "; |
|
441 |
_last_error_message.append(drizzle_error(&_drizzle)); |
|
442 |
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str()); |
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
443 |
drizzle_result_free(&result); |
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
444 |
return ER_YES; |
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
445 |
}
|
446 |
||
447 |
drizzle_row_t row; |
|
448 |
||
2116.1.32
by David Shrewsbury
incremental |
449 |
while ((row= drizzle_row_next(&result)) != NULL) |
450 |
{
|
|
2290.1.3
by Joseph Daly
slave plugin work |
451 |
if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6])) |
2116.1.32
by David Shrewsbury
incremental |
452 |
{
|
453 |
errmsg_printf(error::ERROR, |
|
454 |
_("Replication slave: Unable to insert into queue.")); |
|
2217.1.2
by Andrew Hutchings
Fix leaks in slave result sets |
455 |
drizzle_result_free(&result); |
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
456 |
return ER_YES; |
2116.1.32
by David Shrewsbury
incremental |
457 |
}
|
2116.1.31
by David Shrewsbury
Major refactor of common functionality into new classes. |
458 |
}
|
459 |
||
460 |
drizzle_result_free(&result); |
|
2116.1.32
by David Shrewsbury
incremental |
461 |
|
2221.4.1
by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep. |
462 |
return EE_OK; |
2116.1.29
by David Shrewsbury
Initial stages of master connection |
463 |
}
|
464 |
||
2116.1.33
by David Shrewsbury
incremental |
465 |
|
466 |
void QueueProducer::setIOState(const string &err_msg, bool status) |
|
467 |
{
|
|
468 |
vector<string> statements; |
|
469 |
string sql; |
|
470 |
string msg(err_msg); |
|
471 |
||
472 |
if (not status) |
|
473 |
{
|
|
2116.1.52
by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word. |
474 |
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'"; |
2116.1.33
by David Shrewsbury
incremental |
475 |
}
|
476 |
else
|
|
477 |
{
|
|
2116.1.52
by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word. |
478 |
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'"; |
2116.1.33
by David Shrewsbury
incremental |
479 |
}
|
480 |
||
2116.1.40
by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word) |
481 |
sql.append(", `error_msg` = '", 17); |
2116.1.33
by David Shrewsbury
incremental |
482 |
|
483 |
/* Escape embedded quotes and statement terminators */
|
|
484 |
string::iterator it; |
|
485 |
for (it= msg.begin(); it != msg.end(); ++it) |
|
486 |
{
|
|
487 |
if (*it == '\'') |
|
488 |
{
|
|
489 |
it= msg.insert(it, '\''); |
|
490 |
++it; /* advance back to the quote */ |
|
491 |
}
|
|
492 |
else if (*it == ';') |
|
493 |
{
|
|
494 |
it= msg.insert(it, '\\'); |
|
495 |
++it; /* advance back to the semicolon */ |
|
496 |
}
|
|
497 |
}
|
|
498 |
||
499 |
sql.append(msg); |
|
2360.1.1
by Mark Atwood
restore multi master replication |
500 |
sql.append("' WHERE `master_id` = "); |
501 |
sql.append(boost::lexical_cast<string>(masterId())); |
|
2116.1.33
by David Shrewsbury
incremental |
502 |
|
503 |
statements.push_back(sql); |
|
504 |
executeSQL(statements); |
|
505 |
}
|
|
506 |
||
2116.1.23
by David Shrewsbury
Added empty version of producer thread |
507 |
} /* namespace slave */ |