~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_producer.h

  • Committer: pcrews
  • Date: 2011-05-24 17:36:24 UTC
  • mfrom: (1099.4.232 drizzle)
  • Revision ID: pcrews@lucid32-20110524173624-mwr1bvq6fa1r01ao
Updated translations + 2011.05.18 tarball tag

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2011 David Shrewsbury
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
#pragma once
 
22
 
 
23
#include <client/client_priv.h>
 
24
#include <drizzled/error_t.h>
 
25
#include <plugin/slave/queue_thread.h>
 
26
#include <plugin/slave/sql_executor.h>
 
27
#include <string>
 
28
#include <vector>
 
29
 
 
30
namespace slave
 
31
{
 
32
  
 
33
class QueueProducer : public QueueThread, public SQLExecutor
 
34
{
 
35
public:
 
36
  QueueProducer() :
 
37
    SQLExecutor("slave", "replication"),
 
38
    _check_interval(5),
 
39
    _master_port(3306),
 
40
    _last_return(DRIZZLE_RETURN_OK),
 
41
    _is_connected(false),
 
42
    _saved_max_commit_id(0),
 
43
    _max_reconnects(10),
 
44
    _seconds_between_reconnects(30)
 
45
  {}
 
46
 
 
47
  virtual ~QueueProducer();
 
48
 
 
49
  bool init();
 
50
  bool process();
 
51
  void shutdown();
 
52
 
 
53
  void setSleepInterval(uint32_t seconds)
 
54
  {
 
55
    _check_interval= seconds;
 
56
  }
 
57
 
 
58
  uint32_t getSleepInterval()
 
59
  {
 
60
    return _check_interval;
 
61
  }
 
62
 
 
63
  void setMasterHost(const std::string &host)
 
64
  {
 
65
    _master_host= host;
 
66
  }
 
67
 
 
68
  void setMasterPort(uint16_t port)
 
69
  {
 
70
    _master_port= port;
 
71
  }
 
72
 
 
73
  void setMasterUser(const std::string &user)
 
74
  {
 
75
    _master_user= user;
 
76
  }
 
77
 
 
78
  void setMasterPassword(const std::string &password)
 
79
  {
 
80
    _master_pass= password;
 
81
  }
 
82
 
 
83
  void setMaxReconnectAttempts(uint32_t max)
 
84
  {
 
85
    _max_reconnects= max;
 
86
  }
 
87
 
 
88
  void setSecondsBetweenReconnects(uint32_t seconds)
 
89
  {
 
90
    _seconds_between_reconnects= seconds;
 
91
  }
 
92
 
 
93
  void setCachedMaxCommitId(uint64_t value)
 
94
  {
 
95
    _saved_max_commit_id= value;
 
96
  }
 
97
 
 
98
private:
 
99
  /** Number of seconds to sleep between checking queue for messages */
 
100
  uint32_t _check_interval;
 
101
 
 
102
  /* Master server connection parameters */
 
103
  std::string _master_host;
 
104
  uint16_t    _master_port;
 
105
  std::string _master_user;
 
106
  std::string _master_pass;
 
107
 
 
108
  drizzle_st _drizzle;
 
109
  drizzle_con_st _connection;
 
110
  drizzle_return_t _last_return;
 
111
 
 
112
  bool _is_connected;
 
113
  uint64_t _saved_max_commit_id;
 
114
  uint32_t _max_reconnects;
 
115
  uint32_t _seconds_between_reconnects;
 
116
 
 
117
  std::string _last_error_message;
 
118
 
 
119
  /**
 
120
   * Open connection to the master server.
 
121
   */
 
122
  bool openConnection();
 
123
 
 
124
  /**
 
125
   * Close connection to the master server.
 
126
   */
 
127
  bool closeConnection();
 
128
 
 
129
  /**
 
130
   * Attempt to reconnect to the master server.
 
131
   *
 
132
   * This method does not return until reconnect succeeds, or we exceed our
 
133
   * maximum number of retries defined by _max_reconnects.
 
134
   *
 
135
   * @retval true Reconnect succeeded
 
136
   * @retval false Reconnect failed
 
137
   */
 
138
  bool reconnect(bool initial_connection);
 
139
 
 
140
  /**
 
141
   * Get maximum commit ID that we have stored locally on the slave.
 
142
   *
 
143
   * This method determines where this slave is in relation to the master,
 
144
   * or, in other words, how "caught up" we are.
 
145
   *
 
146
   * @param[out] max_commit_id Maximum commit ID we have on this slave.
 
147
   */
 
148
  bool queryForMaxCommitId(uint64_t *max_commit_id);
 
149
 
 
150
  /**
 
151
   * Get replication events/messages from the master.
 
152
   *
 
153
   * Calling this method will a limited number of events from the master.
 
154
   * It should be repeatedly called until it returns -1, which means there
 
155
   * were no more events to retrieve.
 
156
   *
 
157
   * @param[in] max_commit_id Largest commit ID we have stored locally.
 
158
   *
 
159
   * @retval EE_OK  Successfully retrieved events
 
160
   * @retval ER_NO  No errors, but no more events to retrieve
 
161
   * @retval ER_YES Error
 
162
   */
 
163
  enum drizzled::error_t queryForReplicationEvents(uint64_t max_commit_id);
 
164
 
 
165
  bool queryForTrxIdList(uint64_t max_commit_id, std::vector<uint64_t> &list);
 
166
  bool queueInsert(const char *trx_id,
 
167
                   const char *seg_id,
 
168
                   const char *commit_id,
 
169
                   const char *originating_server_uuid,
 
170
                   const char *originating_commit_id,
 
171
                   const char *msg,
 
172
                   const char *msg_length);
 
173
 
 
174
  /**
 
175
   * Update IO thread status in state table.
 
176
   *
 
177
   * @param err_msg Error message string
 
178
   * @param status false = STOPPED, true = RUNNING
 
179
   */
 
180
  void setIOState(const std::string &err_msg, bool status);
 
181
 
 
182
};
 
183
 
 
184
} /* namespace slave */
 
185