~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Olaf van der Spek
  • Date: 2011-02-12 18:24:24 UTC
  • mto: (2167.1.2 build) (2172.1.4 build)
  • mto: This revision was merged to the branch mainline in revision 2168.
  • Revision ID: olafvdspek@gmail.com-20110212182424-kgnm9osi7qo97at2
casts

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 
 *  Copyright (c) 2009-2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
 
5
 *  Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
6
6
 *
7
7
 *  Authors:
8
8
 *
39
39
#include "drizzled/plugin/transaction_replicator.h"
40
40
#include "drizzled/plugin/transaction_applier.h"
41
41
#include "drizzled/message/transaction.pb.h"
42
 
#include "drizzled/message/table.pb.h"
43
 
#include "drizzled/message/statement_transform.h"
44
42
#include "drizzled/gettext.h"
45
43
#include "drizzled/session.h"
46
44
#include "drizzled/error.h"
47
45
 
 
46
#include <string>
48
47
#include <vector>
 
48
#include <algorithm>
49
49
 
50
50
using namespace std;
51
51
 
52
52
namespace drizzled
53
53
{
54
54
 
55
 
ReplicationServices::ReplicationServices()
 
55
ReplicationServices::ReplicationServices() :
 
56
  is_active(false)
56
57
{
57
 
  is_active= false;
58
58
}
59
59
 
60
 
void ReplicationServices::evaluateActivePlugins()
 
60
void ReplicationServices::normalizeReplicatorName(string &name)
61
61
{
62
 
  /* 
63
 
   * We loop through replicators and appliers, evaluating
64
 
   * whether or not there is at least one active replicator
65
 
   * and one active applier.  If not, we set is_active
66
 
   * to false.
67
 
   */
68
 
  bool tmp_is_active= false;
69
 
 
70
 
  if (replicators.empty() || appliers.empty())
71
 
  {
72
 
    is_active= false;
73
 
    return;
74
 
  }
75
 
 
76
 
  /* 
77
 
   * Determine if any remaining replicators and if those
78
 
   * replicators are active...if not, set is_active
79
 
   * to false
80
 
   */
81
 
  for (Replicators::iterator repl_iter= replicators.begin();
82
 
       repl_iter != replicators.end();
83
 
       ++repl_iter)
84
 
  {
85
 
    if ((*repl_iter)->isEnabled())
 
62
  transform(name.begin(),
 
63
            name.end(),
 
64
            name.begin(),
 
65
            ::tolower);
 
66
  if (name.find("replicator") == string::npos)
 
67
    name.append("replicator", 10);
 
68
  {
 
69
    size_t found_underscore= name.find('_');
 
70
    while (found_underscore != string::npos)
86
71
    {
87
 
      tmp_is_active= true;
88
 
      break;
 
72
      name.erase(found_underscore, 1);
 
73
      found_underscore= name.find('_');
89
74
    }
90
75
  }
91
 
  if (! tmp_is_active)
92
 
  {
93
 
    /* No active replicators. Set is_active to false and exit. */
94
 
    is_active= false;
95
 
    return;
96
 
  }
 
76
}
97
77
 
 
78
bool ReplicationServices::evaluateRegisteredPlugins()
 
79
{
98
80
  /* 
99
 
   * OK, we know there's at least one active replicator.
100
 
   *
101
 
   * Now determine if any remaining replicators and if those
102
 
   * replicators are active...if not, set is_active
103
 
   * to false
 
81
   * We loop through appliers that have registered with us
 
82
   * and attempts to pair the applier with its requested
 
83
   * replicator.  If an applier has requested a replicator
 
84
   * that has either not been built or has not registered
 
85
   * with the replication services, we print an error and
 
86
   * return false
104
87
   */
 
88
  if (appliers.empty())
 
89
    return true;
 
90
 
 
91
  if (replicators.empty() && not appliers.empty())
 
92
  {
 
93
    errmsg_printf(error::ERROR,
 
94
                  N_("You registered a TransactionApplier plugin but no "
 
95
                     "TransactionReplicator plugins were registered.\n"));
 
96
    return false;
 
97
  }
 
98
 
105
99
  for (Appliers::iterator appl_iter= appliers.begin();
106
100
       appl_iter != appliers.end();
107
101
       ++appl_iter)
108
102
  {
109
 
    if ((*appl_iter)->isEnabled())
110
 
    {
111
 
      is_active= true;
112
 
      return;
 
103
    plugin::TransactionApplier *applier= (*appl_iter).second;
 
104
    string requested_replicator_name= (*appl_iter).first;
 
105
    normalizeReplicatorName(requested_replicator_name);
 
106
 
 
107
    bool found= false;
 
108
    Replicators::iterator repl_iter;
 
109
    for (repl_iter= replicators.begin();
 
110
         repl_iter != replicators.end();
 
111
         ++repl_iter)
 
112
    {
 
113
      string replicator_name= (*repl_iter)->getName();
 
114
      normalizeReplicatorName(replicator_name);
 
115
 
 
116
      if (requested_replicator_name.compare(replicator_name) == 0)
 
117
      {
 
118
        found= true;
 
119
        break;
 
120
      }
 
121
    }
 
122
    if (not found)
 
123
    {
 
124
      errmsg_printf(error::ERROR,
 
125
                    N_("You registered a TransactionApplier plugin but no "
 
126
                       "TransactionReplicator plugins were registered that match the "
 
127
                       "requested replicator name of '%s'.\n"
 
128
                       "We have deactivated the TransactionApplier '%s'.\n"),
 
129
                       requested_replicator_name.c_str(),
 
130
                       applier->getName().c_str());
 
131
      applier->deactivate();
 
132
      return false;
 
133
    }
 
134
    else
 
135
    {
 
136
      replication_streams.push_back(make_pair(*repl_iter, applier));
113
137
    }
114
138
  }
115
 
  /* If we get here, there are no active appliers */
116
 
  is_active= false;
 
139
  is_active= true;
 
140
  return true;
117
141
}
118
142
 
119
143
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
120
144
{
121
145
  replicators.push_back(in_replicator);
122
 
  evaluateActivePlugins();
123
146
}
124
147
 
125
148
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
126
149
{
127
150
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
128
 
  evaluateActivePlugins();
129
 
}
130
 
 
131
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
132
 
{
133
 
  appliers.push_back(in_applier);
134
 
  evaluateActivePlugins();
135
 
}
136
 
 
137
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
138
 
{
139
 
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
140
 
  evaluateActivePlugins();
 
151
}
 
152
 
 
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
 
154
{
 
155
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
 
156
}
 
157
 
 
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
 
159
{
141
160
}
142
161
 
143
162
bool ReplicationServices::isActive() const
145
164
  return is_active;
146
165
}
147
166
 
148
 
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
 
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
 
168
                                                                          message::Transaction &to_push)
149
169
{
150
 
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
151
 
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
152
 
  appl_start_iter= appliers.begin();
153
 
 
154
 
  plugin::TransactionReplicator *cur_repl;
155
 
  plugin::TransactionApplier *cur_appl;
156
 
 
157
 
  while (repl_iter != replicators.end())
 
170
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
 
171
 
 
172
  for (ReplicationStreams::iterator iter= replication_streams.begin();
 
173
       iter != replication_streams.end();
 
174
       ++iter)
158
175
  {
159
 
    cur_repl= *repl_iter;
160
 
    if (! cur_repl->isEnabled())
161
 
    {
162
 
      ++repl_iter;
163
 
      continue;
164
 
    }
165
 
    
166
 
    appl_iter= appl_start_iter;
167
 
    while (appl_iter != appliers.end())
168
 
    {
169
 
      cur_appl= *appl_iter;
170
 
 
171
 
      if (! cur_appl->isEnabled())
172
 
      {
173
 
        ++appl_iter;
174
 
        continue;
175
 
      }
176
 
 
177
 
      cur_repl->replicate(cur_appl, to_push);
178
 
      
 
176
    plugin::TransactionReplicator *cur_repl= (*iter).first;
 
177
    plugin::TransactionApplier *cur_appl= (*iter).second;
 
178
 
 
179
    result= cur_repl->replicate(cur_appl, in_session, to_push);
 
180
 
 
181
    if (result == plugin::SUCCESS)
 
182
    {
179
183
      /* 
180
184
       * We update the timestamp for the last applied Transaction so that
181
185
       * publisher plugins can ask the replication services when the
183
187
       * method.
184
188
       */
185
189
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
186
 
      ++appl_iter;
187
190
    }
188
 
    ++repl_iter;
 
191
    else
 
192
      return result;
189
193
  }
 
194
  return result;
 
195
}
 
196
 
 
197
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
 
198
{
 
199
  return replication_streams;
190
200
}
191
201
 
192
202
} /* namespace drizzled */