~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/replication_slave.h

  • Committer: Mark Atwood
  • Date: 2011-07-06 23:13:00 UTC
  • mto: This revision was merged to the branch mainline in revision 2361.
  • Revision ID: me@mark.atwood.name-20110706231300-y49o7wu01avy1jh5
restore multi master replication

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
#include <plugin/slave/replication_schema.h>
26
26
#include <drizzled/plugin/daemon.h>
27
27
#include <boost/thread.hpp>
 
28
#include <boost/unordered_map.hpp>
28
29
 
29
30
namespace drizzled
30
31
{
40
41
 
41
42
  ReplicationSlave(const std::string &config)
42
43
    : drizzled::plugin::Daemon("Replication Slave"),
43
 
      _config_file(config),
44
 
      _initial_max_commit_id(0)
 
44
      _config_file(config)
45
45
  {}
46
46
  
47
47
  ~ReplicationSlave()
48
48
  {
49
49
    _consumer_thread.interrupt();
50
 
    _producer_thread.interrupt();
 
50
 
 
51
    boost::unordered_map<uint32_t, Master *>::const_iterator it;
 
52
 
 
53
    for (it= _masters.begin(); it != _masters.end(); ++it)
 
54
    {
 
55
      it->second->thread().interrupt();
 
56
    }
51
57
  }
52
58
 
 
59
  /** Gets called after all plugins are initialized */
53
60
  void startup(drizzled::Session &session);
54
61
 
55
 
  /**
56
 
   * Get the error message describing what went wrong during setup.
57
 
   */
58
 
  const std::string &getError() const
59
 
  {
60
 
    return _error;
61
 
  }
62
 
 
63
 
  /**
64
 
   * Set the initial value for the slave's maximum commit ID.
65
 
   *
66
 
   * This value basically determines where to start retrieving events from
67
 
   * the master. Normally this is computed automatically based on the
68
 
   * contents of the queue and/or the last applied commit ID. This allows
69
 
   * us to override those values and start from another point. E.g., new
70
 
   * slave provisioning or skipping a trouble statement.
71
 
   *
72
 
   * @param[in] value The commit ID value.
73
 
   */
74
 
  void setMaxCommitId(uint64_t value)
75
 
  {
76
 
    /* must tell producer to set its cached value */
77
 
    _producer.setCachedMaxCommitId(value);
78
 
    /* setting this indicates that we should store it permanently */
79
 
    _initial_max_commit_id= value;
80
 
  }
81
 
 
82
62
private:
 
63
 
 
64
  /**
 
65
   * Class representing a master server.
 
66
   */
 
67
  class Master
 
68
  {
 
69
  public:
 
70
    Master(uint32_t id)
 
71
    {
 
72
      _producer.setMasterId(id);
 
73
    }
 
74
 
 
75
    QueueProducer &producer()
 
76
    {
 
77
      return _producer;
 
78
    }
 
79
  
 
80
    boost::thread &thread()
 
81
    {
 
82
      return _producer_thread;
 
83
    }
 
84
 
 
85
    void start()
 
86
    {
 
87
      _producer_thread= boost::thread(&QueueProducer::run, &_producer);
 
88
    }
 
89
 
 
90
  private:
 
91
    /** Manages a single master */
 
92
    QueueProducer _producer;
 
93
 
 
94
    /** I/O thread that will populate the work queue */
 
95
    boost::thread _producer_thread;
 
96
  };
 
97
 
 
98
  /** Configuration file containing master info */
83
99
  std::string _config_file;
 
100
 
84
101
  std::string _error;
85
 
 
 
102
  
 
103
  /** Object to use with the consumer thread */
86
104
  QueueConsumer _consumer;
87
 
  QueueProducer _producer;
88
105
 
89
 
  /** Applier thread that will drain the work queue */
 
106
  /**
 
107
   * Applier thread that will drain the work queue.
 
108
   * @todo Support more than one consumer thread.
 
109
   */
90
110
  boost::thread _consumer_thread;
91
111
 
92
 
  /** I/O thread that will populate the work queue */
93
 
  boost::thread _producer_thread;
 
112
  /** List of master objects, one per master */
 
113
  boost::unordered_map<uint32_t, Master *> _masters;
94
114
 
95
 
  uint64_t _initial_max_commit_id;
 
115
  /** Convenience method to get object reference */
 
116
  Master &master(size_t index)
 
117
  {
 
118
    return *(_masters[index]);
 
119
  }
96
120
 
97
121
  /**
98
122
   * Initialize slave services with the given configuration file.
99
123
   *
100
 
   * In case of an error during initialization, you can call the getError()
101
 
   * method to get a string describing what went wrong.
 
124
   * In case of an error during initialization, _error contains a
 
125
   * string describing what went wrong.
102
126
   *
103
127
   * @retval true Success
104
128
   * @retval false Failure