~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_producer.h

  • Committer: Monty Taylor
  • Date: 2011-03-10 18:09:05 UTC
  • mfrom: (2225.2.2 refactor)
  • mto: This revision was merged to the branch mainline in revision 2228.
  • Revision ID: mordred@inaugust.com-20110310180905-ttx05t7q7ff6nl7c
Merge Olad: Refactoring

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2011 David Shrewsbury
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
#ifndef PLUGIN_SLAVE_QUEUE_PRODUCER_H
 
22
#define PLUGIN_SLAVE_QUEUE_PRODUCER_H
 
23
 
 
24
#include <client/client_priv.h>
 
25
#include <drizzled/error_t.h>
 
26
#include <plugin/slave/queue_thread.h>
 
27
#include <plugin/slave/sql_executor.h>
 
28
#include <string>
 
29
#include <vector>
 
30
 
 
31
namespace slave
 
32
{
 
33
  
 
34
class QueueProducer : public QueueThread, public SQLExecutor
 
35
{
 
36
public:
 
37
  QueueProducer() :
 
38
    SQLExecutor("slave", "replication"),
 
39
    _check_interval(5),
 
40
    _master_port(3306),
 
41
    _last_return(DRIZZLE_RETURN_OK),
 
42
    _is_connected(false),
 
43
    _saved_max_commit_id(0),
 
44
    _max_reconnects(10),
 
45
    _seconds_between_reconnects(30)
 
46
  {}
 
47
 
 
48
  virtual ~QueueProducer();
 
49
 
 
50
  bool init();
 
51
  bool process();
 
52
  void shutdown();
 
53
 
 
54
  void setSleepInterval(uint32_t seconds)
 
55
  {
 
56
    _check_interval= seconds;
 
57
  }
 
58
 
 
59
  uint32_t getSleepInterval()
 
60
  {
 
61
    return _check_interval;
 
62
  }
 
63
 
 
64
  void setMasterHost(const std::string &host)
 
65
  {
 
66
    _master_host= host;
 
67
  }
 
68
 
 
69
  void setMasterPort(uint16_t port)
 
70
  {
 
71
    _master_port= port;
 
72
  }
 
73
 
 
74
  void setMasterUser(const std::string &user)
 
75
  {
 
76
    _master_user= user;
 
77
  }
 
78
 
 
79
  void setMasterPassword(const std::string &password)
 
80
  {
 
81
    _master_pass= password;
 
82
  }
 
83
 
 
84
  void setMaxReconnectAttempts(uint32_t max)
 
85
  {
 
86
    _max_reconnects= max;
 
87
  }
 
88
 
 
89
  void setSecondsBetweenReconnects(uint32_t seconds)
 
90
  {
 
91
    _seconds_between_reconnects= seconds;
 
92
  }
 
93
 
 
94
  void setMaxCommitId(uint64_t value)
 
95
  {
 
96
    _saved_max_commit_id= value;
 
97
  }
 
98
 
 
99
private:
 
100
  /** Number of seconds to sleep between checking queue for messages */
 
101
  uint32_t _check_interval;
 
102
 
 
103
  /* Master server connection parameters */
 
104
  std::string _master_host;
 
105
  uint16_t    _master_port;
 
106
  std::string _master_user;
 
107
  std::string _master_pass;
 
108
 
 
109
  drizzle_st _drizzle;
 
110
  drizzle_con_st _connection;
 
111
  drizzle_return_t _last_return;
 
112
 
 
113
  bool _is_connected;
 
114
  uint64_t _saved_max_commit_id;
 
115
  uint32_t _max_reconnects;
 
116
  uint32_t _seconds_between_reconnects;
 
117
 
 
118
  std::string _last_error_message;
 
119
 
 
120
  /**
 
121
   * Open connection to the master server.
 
122
   */
 
123
  bool openConnection();
 
124
 
 
125
  /**
 
126
   * Close connection to the master server.
 
127
   */
 
128
  bool closeConnection();
 
129
 
 
130
  /**
 
131
   * Attempt to reconnect to the master server.
 
132
   *
 
133
   * This method does not return until reconnect succeeds, or we exceed our
 
134
   * maximum number of retries defined by _max_reconnects.
 
135
   *
 
136
   * @retval true Reconnect succeeded
 
137
   * @retval false Reconnect failed
 
138
   */
 
139
  bool reconnect(bool initial_connection);
 
140
 
 
141
  /**
 
142
   * Get maximum commit ID that we have stored locally on the slave.
 
143
   *
 
144
   * This method determines where this slave is in relation to the master,
 
145
   * or, in other words, how "caught up" we are.
 
146
   *
 
147
   * @param[out] max_commit_id Maximum commit ID we have on this slave.
 
148
   */
 
149
  bool queryForMaxCommitId(uint64_t *max_commit_id);
 
150
 
 
151
  /**
 
152
   * Get replication events/messages from the master.
 
153
   *
 
154
   * Calling this method will a limited number of events from the master.
 
155
   * It should be repeatedly called until it returns -1, which means there
 
156
   * were no more events to retrieve.
 
157
   *
 
158
   * @param[in] max_commit_id Largest commit ID we have stored locally.
 
159
   *
 
160
   * @retval EE_OK  Successfully retrieved events
 
161
   * @retval ER_NO  No errors, but no more events to retrieve
 
162
   * @retval ER_YES Error
 
163
   */
 
164
  enum drizzled::error_t queryForReplicationEvents(uint64_t max_commit_id);
 
165
 
 
166
  bool queryForTrxIdList(uint64_t max_commit_id, std::vector<uint64_t> &list);
 
167
  bool queueInsert(const char *trx_id,
 
168
                   const char *seg_id,
 
169
                   const char *commit_id,
 
170
                   const char *msg,
 
171
                   const char *msg_length);
 
172
 
 
173
  /**
 
174
   * Update IO thread status in state table.
 
175
   *
 
176
   * @param err_msg Error message string
 
177
   * @param status false = STOPPED, true = RUNNING
 
178
   */
 
179
  void setIOState(const std::string &err_msg, bool status);
 
180
 
 
181
};
 
182
 
 
183
} /* namespace slave */
 
184
 
 
185
#endif /* PLUGIN_SLAVE_QUEUE_PRODUCER_H */