~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/replication_schema.cc

  • Committer: Mark Atwood
  • Date: 2011-07-06 23:13:00 UTC
  • mto: This revision was merged to the branch mainline in revision 2361.
  • Revision ID: me@mark.atwood.name-20110706231300-y49o7wu01avy1jh5
restore multi master replication

Show diffs side-by-side

added added

removed removed

Lines of Context:
44
44
    return false;
45
45
 
46
46
  /*
47
 
   * Create our IO thread state information table if we need to.
 
47
    Create our IO thread state information table if we need to.
48
48
   */
49
49
 
 
50
  /*
 
51
    Table: io_state
 
52
    Version 1.0: Initial definition
 
53
    Version 1.1: Added master_id and PK on master_id
 
54
  */
 
55
 
 
56
 
50
57
  sql.clear();
51
58
  sql.push_back("COMMIT");
52
59
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
 
60
                " `master_id` BIGINT NOT NULL,"
53
61
                " `status` VARCHAR(20) NOT NULL,"
54
62
                " `error_msg` VARCHAR(250))"
55
 
                " COMMENT = 'VERSION 1.0'");
 
63
                " COMMENT = 'VERSION 1.1'");
56
64
 
57
65
  if (not executeSQL(sql))
58
66
    return false;
59
67
 
 
68
#if 0
60
69
  sql.clear();
61
70
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
62
71
 
77
86
        return false;
78
87
    }
79
88
  }
 
89
#endif
80
90
 
81
91
  /*
82
92
   * Create our applier thread state information table if we need to.
86
96
   * Table: applier_state
87
97
   * Version 1.0: Initial definition
88
98
   * Version 1.1: Added originating_server_uuid and originating_commit_id
 
99
   * Version 1.2: Added master_id and changed PK to master_id
89
100
   */
90
101
 
91
102
  sql.clear();
92
103
  sql.push_back("COMMIT");
93
104
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
94
105
                " (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
 
106
                " `master_id` BIGINT NOT NULL,"
95
107
                " `originating_server_uuid` VARCHAR(36) NOT NULL,"
96
108
                " `originating_commit_id` BIGINT NOT NULL,"
97
109
                " `status` VARCHAR(20) NOT NULL,"
98
110
                " `error_msg` VARCHAR(250))"
99
 
                " COMMENT = 'VERSION 1.1'");
 
111
                " COMMENT = 'VERSION 1.2'");
100
112
 
101
113
  if (not executeSQL(sql))
102
114
    return false;
103
115
 
 
116
#if 0
104
117
  sql.clear();
105
118
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
106
119
 
123
136
        return false;
124
137
    }
125
138
  }
 
139
#endif
126
140
 
127
141
  /*
128
142
   * Create our message queue table if we need to.
129
143
   * Version 1.0: Initial definition
130
144
   * Version 1.1: Added originating_server_uuid and originating_commit_id
 
145
   * Version 1.2: Added master_id and changed PK to (master_id, trx_id, seg_id)
131
146
   */
132
147
 
133
148
  sql.clear();
138
153
                " `originating_server_uuid` VARCHAR(36) NOT NULL,"
139
154
                " `originating_commit_id` BIGINT NOT NULL,"
140
155
                " `msg` BLOB,"
 
156
                " `master_id` BIGINT NOT NULL,"
141
157
                " PRIMARY KEY(`trx_id`, `seg_id`))"
142
 
                " COMMENT = 'VERSION 1.1'");
 
158
                " COMMENT = 'VERSION 1.2'");
143
159
  if (not executeSQL(sql))
144
160
    return false;
145
161
 
146
162
  return true;
147
163
}
148
164
 
149
 
bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
 
165
bool ReplicationSchema::createInitialIORow(uint32_t master_id)
 
166
{
 
167
  vector<string> sql;
 
168
 
 
169
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
 
170
 
 
171
  sql::ResultSet result_set(1);
 
172
  Execute execute(*(_session.get()), true);
 
173
  execute.run(sql[0], result_set);
 
174
  result_set.next();
 
175
  string count= result_set.getString(0);
 
176
 
 
177
  if (count == "0")
 
178
  {
 
179
    sql.clear();
 
180
    sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
 
181
                  + boost::lexical_cast<string>(master_id)
 
182
                  + ", 'STOPPED')");
 
183
    if (not executeSQL(sql))
 
184
      return false;
 
185
  }
 
186
 
 
187
  return true;
 
188
}
 
189
 
 
190
bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
 
191
{
 
192
  vector<string> sql;
 
193
 
 
194
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
 
195
 
 
196
  sql::ResultSet result_set(1);
 
197
  Execute execute(*(_session.get()), true);
 
198
  execute.run(sql[0], result_set);
 
199
  result_set.next();
 
200
  string count= result_set.getString(0);
 
201
 
 
202
  if (count == "0")
 
203
  {
 
204
    sql.clear();
 
205
    sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
 
206
                  " (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
 
207
                  + boost::lexical_cast<string>(master_id)
 
208
                  + ",0 , 'STOPPED')");
 
209
    if (not executeSQL(sql))
 
210
      return false;
 
211
  }
 
212
 
 
213
  return true;
 
214
}
 
215
 
 
216
 
 
217
bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
150
218
{
151
219
  vector<string> sql;
152
220
 
153
221
  sql.push_back("UPDATE `sys_replication`.`applier_state`"
154
222
                " SET `last_applied_commit_id` = "
155
 
                + lexical_cast<string>(value));
 
223
                + lexical_cast<string>(value)
 
224
                + " WHERE `master_id` = "
 
225
                + lexical_cast<string>(master_id));
156
226
 
157
227
  return executeSQL(sql);
158
228
}