~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_producer.h

  • Committer: Olaf van der Spek
  • Date: 2011-02-28 14:09:50 UTC
  • mfrom: (2207 bootstrap)
  • mto: (2209.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 2210.
  • Revision ID: olafvdspek@gmail.com-20110228140950-2nu0hyzhuww3wssx
Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2011 David Shrewsbury
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
#ifndef PLUGIN_SLAVE_QUEUE_PRODUCER_H
 
22
#define PLUGIN_SLAVE_QUEUE_PRODUCER_H
 
23
 
 
24
#include <plugin/slave/queue_thread.h>
 
25
#include <plugin/slave/sql_executor.h>
 
26
#include <client/client_priv.h>
 
27
#include <string>
 
28
#include <vector>
 
29
 
 
30
namespace slave
 
31
{
 
32
  
 
33
class QueueProducer : public QueueThread, public SQLExecutor
 
34
{
 
35
public:
 
36
  QueueProducer() :
 
37
    SQLExecutor("slave", "replication"),
 
38
    _check_interval(5),
 
39
    _master_port(3306),
 
40
    _last_return(DRIZZLE_RETURN_OK),
 
41
    _is_connected(false),
 
42
    _saved_max_commit_id(0),
 
43
    _max_reconnects(10),
 
44
    _seconds_between_reconnects(30)
 
45
  {}
 
46
 
 
47
  virtual ~QueueProducer();
 
48
 
 
49
  bool init();
 
50
  bool process();
 
51
  void shutdown();
 
52
 
 
53
  void setSleepInterval(uint32_t seconds)
 
54
  {
 
55
    _check_interval= seconds;
 
56
  }
 
57
 
 
58
  uint32_t getSleepInterval()
 
59
  {
 
60
    return _check_interval;
 
61
  }
 
62
 
 
63
  void setMasterHost(const std::string &host)
 
64
  {
 
65
    _master_host= host;
 
66
  }
 
67
 
 
68
  void setMasterPort(uint16_t port)
 
69
  {
 
70
    _master_port= port;
 
71
  }
 
72
 
 
73
  void setMasterUser(const std::string &user)
 
74
  {
 
75
    _master_user= user;
 
76
  }
 
77
 
 
78
  void setMasterPassword(const std::string &password)
 
79
  {
 
80
    _master_pass= password;
 
81
  }
 
82
 
 
83
  void setMaxReconnectAttempts(uint32_t max)
 
84
  {
 
85
    _max_reconnects= max;
 
86
  }
 
87
 
 
88
  void setSecondsBetweenReconnects(uint32_t seconds)
 
89
  {
 
90
    _seconds_between_reconnects= seconds;
 
91
  }
 
92
 
 
93
private:
 
94
  /** Number of seconds to sleep between checking queue for messages */
 
95
  uint32_t _check_interval;
 
96
 
 
97
  /* Master server connection parameters */
 
98
  std::string _master_host;
 
99
  uint16_t    _master_port;
 
100
  std::string _master_user;
 
101
  std::string _master_pass;
 
102
 
 
103
  drizzle_st _drizzle;
 
104
  drizzle_con_st _connection;
 
105
  drizzle_return_t _last_return;
 
106
 
 
107
  bool _is_connected;
 
108
  uint64_t _saved_max_commit_id;
 
109
  uint32_t _max_reconnects;
 
110
  uint32_t _seconds_between_reconnects;
 
111
 
 
112
  std::string _last_error_message;
 
113
 
 
114
  /**
 
115
   * Open connection to the master server.
 
116
   */
 
117
  bool openConnection();
 
118
 
 
119
  /**
 
120
   * Close connection to the master server.
 
121
   */
 
122
  bool closeConnection();
 
123
 
 
124
  /**
 
125
   * Attempt to reconnect to the master server.
 
126
   *
 
127
   * This method does not return until reconnect succeeds, or we exceed our
 
128
   * maximum number of retries defined by _max_reconnects.
 
129
   *
 
130
   * @retval true Reconnect succeeded
 
131
   * @retval false Reconnect failed
 
132
   */
 
133
  bool reconnect();
 
134
 
 
135
  bool queryForMaxCommitId(uint64_t *max_commit_id);
 
136
  bool queryForReplicationEvents(uint64_t max_commit_id);
 
137
  bool queryForTrxIdList(uint64_t max_commit_id, std::vector<uint64_t> &list);
 
138
  bool queueInsert(const char *trx_id,
 
139
                   const char *seg_id,
 
140
                   const char *commit_id,
 
141
                   const char *msg,
 
142
                   const char *msg_length);
 
143
 
 
144
  /**
 
145
   * Update IO thread status in state table.
 
146
   *
 
147
   * @param err_msg Error message string
 
148
   * @param status false = STOPPED, true = RUNNING
 
149
   */
 
150
  void setIOState(const std::string &err_msg, bool status);
 
151
 
 
152
};
 
153
 
 
154
} /* namespace slave */
 
155
 
 
156
#endif /* PLUGIN_SLAVE_QUEUE_PRODUCER_H */