~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Stewart Smith
  • Date: 2010-03-18 12:01:34 UTC
  • mto: (1666.2.3 build)
  • mto: This revision was merged to the branch mainline in revision 1596.
  • Revision ID: stewart@flamingspork.com-20100318120134-45fdnsw8g3j6c7oy
move RAND() into a plugin

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
#include "drizzled/plugin/transaction_replicator.h"
40
40
#include "drizzled/plugin/transaction_applier.h"
41
41
#include "drizzled/message/transaction.pb.h"
 
42
#include "drizzled/message/table.pb.h"
 
43
#include "drizzled/message/statement_transform.h"
42
44
#include "drizzled/gettext.h"
43
45
#include "drizzled/session.h"
44
46
#include "drizzled/error.h"
45
47
 
46
 
#include <string>
47
48
#include <vector>
48
 
#include <algorithm>
49
49
 
50
50
using namespace std;
51
51
 
52
52
namespace drizzled
53
53
{
54
54
 
55
 
ReplicationServices::ReplicationServices() :
56
 
  is_active(false)
 
55
ReplicationServices::ReplicationServices()
57
56
{
 
57
  is_active= false;
58
58
}
59
59
 
60
 
void ReplicationServices::normalizeReplicatorName(string &name)
 
60
void ReplicationServices::evaluateActivePlugins()
61
61
{
62
 
  transform(name.begin(),
63
 
            name.end(),
64
 
            name.begin(),
65
 
            ::tolower);
66
 
  if (name.find("replicator") == string::npos)
67
 
    name.append("replicator", 10);
68
 
  {
69
 
    size_t found_underscore= name.find('_');
70
 
    while (found_underscore != string::npos)
 
62
  /* 
 
63
   * We loop through replicators and appliers, evaluating
 
64
   * whether or not there is at least one active replicator
 
65
   * and one active applier.  If not, we set is_active
 
66
   * to false.
 
67
   */
 
68
  bool tmp_is_active= false;
 
69
 
 
70
  if (replicators.empty() || appliers.empty())
 
71
  {
 
72
    is_active= false;
 
73
    return;
 
74
  }
 
75
 
 
76
  /* 
 
77
   * Determine if there are any remaining replicators and if those
 
78
   * replicators are active...if not, set is_active
 
79
   * to false
 
80
   */
 
81
  for (Replicators::iterator repl_iter= replicators.begin();
 
82
       repl_iter != replicators.end();
 
83
       ++repl_iter)
 
84
  {
 
85
    if ((*repl_iter)->isEnabled())
71
86
    {
72
 
      name.erase(found_underscore, 1);
73
 
      found_underscore= name.find('_');
 
87
      tmp_is_active= true;
 
88
      break;
74
89
    }
75
90
  }
76
 
}
 
91
  if (! tmp_is_active)
 
92
  {
 
93
    /* No active replicators. Set is_active to false and exit. */
 
94
    is_active= false;
 
95
    return;
 
96
  }
77
97
 
78
 
bool ReplicationServices::evaluateRegisteredPlugins()
79
 
{
80
98
  /* 
81
 
   * We loop through appliers that have registered with us
82
 
   * and attempt to pair the applier with its requested
83
 
   * replicator.  If an applier has requested a replicator
84
 
   * that has either not been built or has not registered
85
 
   * with the replication services, we print an error and
86
 
   * return false
 
99
   * OK, we know there's at least one active replicator.
 
100
   *
 
101
   * Now determine if there are any appliers and if those
 
102
   * appliers are active...if not, set is_active
 
103
   * to false
87
104
   */
88
 
  if (appliers.empty())
89
 
    return true;
90
 
 
91
 
  if (replicators.empty() && not appliers.empty())
92
 
  {
93
 
    errmsg_printf(ERRMSG_LVL_ERROR,
94
 
                  N_("You registered a TransactionApplier plugin but no "
95
 
                     "TransactionReplicator plugins were registered.\n"));
96
 
    return false;
97
 
  }
98
 
 
99
105
  for (Appliers::iterator appl_iter= appliers.begin();
100
106
       appl_iter != appliers.end();
101
107
       ++appl_iter)
102
108
  {
103
 
    plugin::TransactionApplier *applier= (*appl_iter).second;
104
 
    string requested_replicator_name= (*appl_iter).first;
105
 
    normalizeReplicatorName(requested_replicator_name);
106
 
 
107
 
    bool found= false;
108
 
    Replicators::iterator repl_iter;
109
 
    for (repl_iter= replicators.begin();
110
 
         repl_iter != replicators.end();
111
 
         ++repl_iter)
112
 
    {
113
 
      string replicator_name= (*repl_iter)->getName();
114
 
      normalizeReplicatorName(replicator_name);
115
 
 
116
 
      if (requested_replicator_name.compare(replicator_name) == 0)
117
 
      {
118
 
        found= true;
119
 
        break;
120
 
      }
121
 
    }
122
 
    if (not found)
123
 
    {
124
 
      errmsg_printf(ERRMSG_LVL_ERROR,
125
 
                    N_("You registered a TransactionApplier plugin but no "
126
 
                       "TransactionReplicator plugins were registered that match the "
127
 
                       "requested replicator name of '%s'.\n"
128
 
                       "We have deactivated the TransactionApplier '%s'.\n"),
129
 
                       requested_replicator_name.c_str(),
130
 
                       applier->getName().c_str());
131
 
      applier->deactivate();
132
 
      return false;
133
 
    }
134
 
    else
135
 
    {
136
 
      replication_streams.push_back(make_pair(*repl_iter, applier));
 
109
    if ((*appl_iter)->isEnabled())
 
110
    {
 
111
      is_active= true;
 
112
      return;
137
113
    }
138
114
  }
139
 
  is_active= true;
140
 
  return true;
 
115
  /* If we get here, there are no active appliers */
 
116
  is_active= false;
141
117
}
142
118
 
143
119
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
144
120
{
145
121
  replicators.push_back(in_replicator);
 
122
  evaluateActivePlugins();
146
123
}
147
124
 
148
125
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
149
126
{
150
127
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
151
 
}
152
 
 
153
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
154
 
{
155
 
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
156
 
}
157
 
 
158
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
159
 
{
 
128
  evaluateActivePlugins();
 
129
}
 
130
 
 
131
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
 
132
{
 
133
  appliers.push_back(in_applier);
 
134
  evaluateActivePlugins();
 
135
}
 
136
 
 
137
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
 
138
{
 
139
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
 
140
  evaluateActivePlugins();
160
141
}
161
142
 
162
143
bool ReplicationServices::isActive() const
164
145
  return is_active;
165
146
}
166
147
 
167
 
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
 
                                                                          message::Transaction &to_push)
 
148
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
169
149
{
170
 
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
171
 
 
172
 
  for (ReplicationStreams::iterator iter= replication_streams.begin();
173
 
       iter != replication_streams.end();
174
 
       ++iter)
 
150
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
 
151
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
 
152
  appl_start_iter= appliers.begin();
 
153
 
 
154
  plugin::TransactionReplicator *cur_repl;
 
155
  plugin::TransactionApplier *cur_appl;
 
156
 
 
157
  while (repl_iter != replicators.end())
175
158
  {
176
 
    plugin::TransactionReplicator *cur_repl= (*iter).first;
177
 
    plugin::TransactionApplier *cur_appl= (*iter).second;
178
 
 
179
 
    result= cur_repl->replicate(cur_appl, in_session, to_push);
180
 
 
181
 
    if (result == plugin::SUCCESS)
182
 
    {
 
159
    cur_repl= *repl_iter;
 
160
    if (! cur_repl->isEnabled())
 
161
    {
 
162
      ++repl_iter;
 
163
      continue;
 
164
    }
 
165
    
 
166
    appl_iter= appl_start_iter;
 
167
    while (appl_iter != appliers.end())
 
168
    {
 
169
      cur_appl= *appl_iter;
 
170
 
 
171
      if (! cur_appl->isEnabled())
 
172
      {
 
173
        ++appl_iter;
 
174
        continue;
 
175
      }
 
176
 
 
177
      cur_repl->replicate(cur_appl, to_push);
 
178
      
183
179
      /* 
184
180
       * We update the timestamp for the last applied Transaction so that
185
181
       * publisher plugins can ask the replication services when the
187
183
       * method.
188
184
       */
189
185
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
 
186
      ++appl_iter;
190
187
    }
191
 
    else
192
 
      return result;
 
188
    ++repl_iter;
193
189
  }
194
 
  return result;
195
 
}
196
 
 
197
 
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
198
 
{
199
 
  return replication_streams;
200
190
}
201
191
 
202
192
} /* namespace drizzled */