~drizzle-trunk/drizzle/development

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 *
 *  Copyright (C) 2011 David Shrewsbury
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */

#ifndef PLUGIN_SLAVE_QUEUE_PRODUCER_H
#define PLUGIN_SLAVE_QUEUE_PRODUCER_H

#include <plugin/slave/queue_thread.h>
#include <plugin/slave/sql_executor.h>
#include <client/client_priv.h>
#include <string>
#include <vector>

namespace slave
{
  
/**
 * Producer half of the slave plugin: holds the connection parameters and
 * libdrizzle handles used to talk to a master server, plus the reconnect
 * policy (attempt limit and delay between attempts).
 *
 * Inherits from both QueueThread and SQLExecutor; their contracts are
 * declared in plugin/slave/queue_thread.h and plugin/slave/sql_executor.h.
 */
class QueueProducer : public QueueThread, public SQLExecutor
{
public:
  /**
   * Construct a producer with default settings:
   *  - check interval: 5 seconds
   *  - master port: 3306
   *  - not connected, saved max commit ID of 0
   *  - at most 10 reconnect attempts, 30 seconds apart
   *
   * Host, user and password start out as empty strings. The _drizzle and
   * _connection handles are not initialized here.
   *
   * NOTE(review): the meaning of the two SQLExecutor arguments ("slave",
   * "replication") is defined by SQLExecutor -- confirm against
   * sql_executor.h.
   */
  QueueProducer() :
    SQLExecutor("slave", "replication"),
    _check_interval(5),
    _master_port(3306),
    _last_return(DRIZZLE_RETURN_OK),
    _is_connected(false),
    _saved_max_commit_id(0),
    _max_reconnects(10),
    _seconds_between_reconnects(30)
  {}

  virtual ~QueueProducer();

  /*
   * Defined out of line.
   * NOTE(review): presumably the thread life-cycle hooks expected by
   * QueueThread -- confirm against queue_thread.h.
   */
  bool init();
  bool process();
  void shutdown();

  /**
   * Set the number of seconds to sleep between queue checks.
   *
   * @param seconds New check interval, in seconds
   */
  void setSleepInterval(uint32_t seconds)
  {
    _check_interval= seconds;
  }

  /**
   * Get the number of seconds to sleep between queue checks.
   *
   * Declared const so it can be called on a const QueueProducer.
   *
   * @return Current check interval, in seconds
   */
  uint32_t getSleepInterval() const
  {
    return _check_interval;
  }

  /** Set the host name of the master server. */
  void setMasterHost(const std::string &host)
  {
    _master_host= host;
  }

  /** Set the port of the master server. */
  void setMasterPort(uint16_t port)
  {
    _master_port= port;
  }

  /** Set the user name used for the master server connection. */
  void setMasterUser(const std::string &user)
  {
    _master_user= user;
  }

  /** Set the password used for the master server connection. */
  void setMasterPassword(const std::string &password)
  {
    _master_pass= password;
  }

  /**
   * Set the maximum number of reconnect attempts (see reconnect()).
   *
   * @param max Maximum number of retries
   */
  void setMaxReconnectAttempts(uint32_t max)
  {
    _max_reconnects= max;
  }

  /**
   * Set the delay between reconnect attempts.
   *
   * @param seconds Delay, in seconds
   */
  void setSecondsBetweenReconnects(uint32_t seconds)
  {
    _seconds_between_reconnects= seconds;
  }

private:
  /** Number of seconds to sleep between checking queue for messages */
  uint32_t _check_interval;

  /* Master server connection parameters */
  std::string _master_host;
  uint16_t    _master_port;
  std::string _master_user;
  std::string _master_pass;

  /* libdrizzle client state; not set up by the constructor. */
  drizzle_st _drizzle;
  drizzle_con_st _connection;

  /** Starts out as DRIZZLE_RETURN_OK. */
  drizzle_return_t _last_return;

  /** Starts out false. */
  bool _is_connected;

  /** Starts out as 0. */
  uint64_t _saved_max_commit_id;

  /** Maximum number of reconnect retries (default 10). */
  uint32_t _max_reconnects;

  /** Seconds between reconnect attempts (default 30). */
  uint32_t _seconds_between_reconnects;

  std::string _last_error_message;

  /**
   * Open connection to the master server.
   */
  bool openConnection();

  /**
   * Close connection to the master server.
   */
  bool closeConnection();

  /**
   * Attempt to reconnect to the master server.
   *
   * This method does not return until reconnect succeeds, or we exceed our
   * maximum number of retries defined by _max_reconnects.
   *
   * @param initial_connection NOTE(review): presumably true when this is the
   *                           first connection attempt rather than a retry
   *                           after a dropped connection -- confirm against
   *                           the implementation.
   *
   * @retval true Reconnect succeeded
   * @retval false Reconnect failed
   */
  bool reconnect(bool initial_connection);

  /*
   * Query and queue helpers, defined out of line.
   * NOTE(review): judging by the names these query the master for commit
   * IDs, transaction IDs and replication events; the meaning of the bool
   * results is not visible here -- confirm against the implementation.
   */
  bool queryForMaxCommitId(uint64_t *max_commit_id);
  bool queryForReplicationEvents(uint64_t max_commit_id);
  bool queryForTrxIdList(uint64_t max_commit_id, std::vector<uint64_t> &list);
  bool queueInsert(const char *trx_id,
                   const char *seg_id,
                   const char *commit_id,
                   const char *msg,
                   const char *msg_length);

  /**
   * Update IO thread status in state table.
   *
   * @param err_msg Error message string
   * @param status false = STOPPED, true = RUNNING
   */
  void setIOState(const std::string &err_msg, bool status);

};

} /* namespace slave */

#endif /* PLUGIN_SLAVE_QUEUE_PRODUCER_H */