~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/replication_schema.cc

  • Committer: Lee Bieber
  • Date: 2011-04-02 03:51:56 UTC
  • mfrom: (2260.1.1 revert_slave)
  • Revision ID: kalebral@gmail.com-20110402035156-lhfvo7o6yqtli0xs
Merge Shrews - Revert the multi-master slave plugin changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
   * Create our IO thread state information table if we need to.
48
48
   */
49
49
 
50
 
  /*
51
 
   * Table: io_state
52
 
   * Version 1.0: Initial definition
53
 
   * Version 1.1: Added master_id and PK on master_id
54
 
   */
55
 
 
56
50
  sql.clear();
57
51
  sql.push_back("COMMIT");
58
52
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
59
 
                " `master_id` BIGINT NOT NULL,"
60
53
                " `status` VARCHAR(20) NOT NULL,"
61
 
                " `error_msg` VARCHAR(250),"
62
 
                " PRIMARY KEY(`master_id`))"
63
 
                " COMMENT = 'VERSION 1.1'");
 
54
                " `error_msg` VARCHAR(250))"
 
55
                " COMMENT = 'VERSION 1.0'");
64
56
 
65
57
  if (not executeSQL(sql))
66
58
    return false;
67
59
 
 
60
  sql.clear();
 
61
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
 
62
 
 
63
  {
 
64
    sql::ResultSet result_set(1);
 
65
    Execute execute(*(_session.get()), true);
 
66
    execute.run(sql[0], result_set);
 
67
    result_set.next();
 
68
    string count= result_set.getString(0);
 
69
 
 
70
    /* Must always be at least one row in the table */
 
71
    if (count == "0")
 
72
    {
 
73
      sql.clear();
 
74
      sql.push_back("INSERT INTO `sys_replication`.`io_state` (`status`)"
 
75
                    " VALUES ('STOPPED')");
 
76
      if (not executeSQL(sql))
 
77
        return false;
 
78
    }
 
79
  }
 
80
 
68
81
  /*
69
82
   * Create our applier thread state information table if we need to.
70
83
   */
71
84
 
72
 
  /*
73
 
   * Table: applier_state
74
 
   * Version 1.0: Initial definition
75
 
   * Version 1.1: Added master_id and changed PK to master_id
76
 
   */
77
 
 
78
85
  sql.clear();
79
86
  sql.push_back("COMMIT");
80
 
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` ("
81
 
                " `master_id` BIGINT NOT NULL,"
82
 
                " `last_applied_commit_id` BIGINT NOT NULL,"
 
87
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
 
88
                " (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
83
89
                " `status` VARCHAR(20) NOT NULL,"
84
 
                " `error_msg` VARCHAR(250),"
85
 
                " PRIMARY KEY(`master_id`))"
86
 
                " COMMENT = 'VERSION 1.1'");
 
90
                " `error_msg` VARCHAR(250))"
 
91
                " COMMENT = 'VERSION 1.0'");
87
92
 
88
93
  if (not executeSQL(sql))
89
94
    return false;
90
95
 
 
96
  sql.clear();
 
97
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
 
98
 
 
99
  {
 
100
    sql::ResultSet result_set(1);
 
101
    Execute execute(*(_session.get()), true);
 
102
    execute.run(sql[0], result_set);
 
103
    result_set.next();
 
104
    string count= result_set.getString(0);
 
105
 
 
106
    /* Must always be at least one row in the table */
 
107
    if (count == "0")
 
108
    {
 
109
      sql.clear();
 
110
      sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
 
111
                    " (`last_applied_commit_id`, `status`)"
 
112
                    " VALUES (0, 'STOPPED')");
 
113
      if (not executeSQL(sql))
 
114
        return false;
 
115
    }
 
116
  }
 
117
 
91
118
  /*
92
119
   * Create our message queue table if we need to.
93
 
   * Version 1.0: Initial definition
94
 
   * Version 1.1: Added master_id and changed PK to (master_id, trx_id, seg_id)
95
120
   */
96
121
 
97
122
  sql.clear();
98
123
  sql.push_back("COMMIT");
99
 
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue` ("
100
 
                " `master_id` BIGINT NOT NULL,"
101
 
                " `trx_id` BIGINT NOT NULL,"
102
 
                " `seg_id` INT NOT NULL,"
103
 
                " `commit_order` BIGINT,"
104
 
                " `msg` BLOB,"
105
 
                " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"
106
 
                " COMMENT = 'VERSION 1.1'");
 
124
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue`"
 
125
                " (`trx_id` BIGINT NOT NULL, `seg_id` INT NOT NULL,"
 
126
                " `commit_order` BIGINT, `msg` BLOB,"
 
127
                " PRIMARY KEY(`trx_id`, `seg_id`))"
 
128
                " COMMENT = 'VERSION 1.0'");
107
129
  if (not executeSQL(sql))
108
130
    return false;
109
131
 
110
132
  return true;
111
133
}
112
134
 
113
 
bool ReplicationSchema::createInitialIORow(uint32_t master_id)
114
 
{
115
 
  vector<string> sql;
116
 
 
117
 
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
118
 
 
119
 
  sql::ResultSet result_set(1);
120
 
  Execute execute(*(_session.get()), true);
121
 
  execute.run(sql[0], result_set);
122
 
  result_set.next();
123
 
  string count= result_set.getString(0);
124
 
 
125
 
  if (count == "0")
126
 
  {
127
 
    sql.clear();
128
 
    sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
129
 
                  + boost::lexical_cast<string>(master_id)
130
 
                  + ", 'STOPPED')");
131
 
    if (not executeSQL(sql))
132
 
      return false;
133
 
  }
134
 
 
135
 
  return true;
136
 
}
137
 
 
138
 
bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
139
 
{
140
 
  vector<string> sql;
141
 
 
142
 
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
143
 
 
144
 
  sql::ResultSet result_set(1);
145
 
  Execute execute(*(_session.get()), true);
146
 
  execute.run(sql[0], result_set);
147
 
  result_set.next();
148
 
  string count= result_set.getString(0);
149
 
 
150
 
  if (count == "0")
151
 
  {
152
 
    sql.clear();
153
 
    sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
154
 
                  " (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
155
 
                  + boost::lexical_cast<string>(master_id)
156
 
                  + ",0 , 'STOPPED')");
157
 
    if (not executeSQL(sql))
158
 
      return false;
159
 
  }
160
 
 
161
 
  return true;
162
 
}
163
 
 
164
 
bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
 
135
bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
165
136
{
166
137
  vector<string> sql;
167
138
 
168
139
  sql.push_back("UPDATE `sys_replication`.`applier_state`"
169
140
                " SET `last_applied_commit_id` = "
170
 
                + lexical_cast<string>(value)
171
 
                + " WHERE `master_id` = "
172
 
                + lexical_cast<string>(master_id));
 
141
                + lexical_cast<string>(value));
173
142
 
174
143
  return executeSQL(sql);
175
144
}