~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/dict/create_replication.cc

  • Committer: Lee Bieber
  • Date: 2011-03-13 16:37:38 UTC
  • mfrom: (2227.4.18 session2)
  • Revision ID: kalebral@gmail.com-20110313163738-7ti21zk40o2xi3ew
Merge Olaf - Refactor Session

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
19
 */
20
20
 
21
 
#include "config.h"
 
21
#include <config.h>
22
22
 
23
23
#include "read_replication.h"
24
24
#include "create_replication.h"
41
41
#include "usr0sess.h"
42
42
#include "ut0vec.h"
43
43
#include "row0merge.h"
 
44
#include "row0mysql.h"
44
45
 
45
46
UNIV_INTERN ulint dict_create_sys_replication_log(void)
46
47
{
52
53
 
53
54
  table1 = dict_table_get_low("SYS_REPLICATION_LOG");
54
55
 
 
56
  trx_sys_read_commit_id();
 
57
 
55
58
  if (table1) 
56
59
  {
57
60
    mutex_exit(&(dict_sys->mutex));
73
76
  error = que_eval_sql(info,
74
77
                       "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
75
78
                       "BEGIN\n"
76
 
                       "CREATE TABLE SYS_REPLICATION_LOG(ID BINARY(8), MESSAGE BLOB);\n"
77
 
                       "CREATE UNIQUE CLUSTERED INDEX ID_IND ON SYS_REPLICATION_LOG (ID);\n"
 
79
                       "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" 
 
80
                       "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
 
81
                       "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
78
82
                       "END;\n"
79
83
                       , FALSE, trx);
80
84
 
105
109
  return(error);
106
110
}
107
111
 
 
112
UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
 
113
{
 
114
  std::string search_string(table_name);
 
115
  boost::algorithm::to_lower(search_string);
 
116
 
 
117
  if (search_string.compare("sys_replication_log") != 0)
 
118
    return -1;
 
119
 
 
120
  drizzled::message::Engine *engine= table_message->mutable_engine();
 
121
  engine->set_name("InnoDB");
 
122
  table_message->set_name("SYS_REPLICATION_LOG");
 
123
  table_message->set_schema("DATA_DICTIONARY");
 
124
  table_message->set_type(drizzled::message::Table::STANDARD);
 
125
  table_message->set_creation_timestamp(0);
 
126
  table_message->set_update_timestamp(0);
 
127
 
 
128
  drizzled::message::Table::TableOptions *options= table_message->mutable_options();
 
129
  options->set_collation_id(drizzled::my_charset_bin.number);
 
130
  options->set_collation(drizzled::my_charset_bin.name);
 
131
  options->set_dont_replicate(true);
 
132
 
 
133
  drizzled::message::Table::Field *field= table_message->add_field();
 
134
  field->set_name("ID");
 
135
  field->set_type(drizzled::message::Table::Field::BIGINT);
 
136
 
 
137
  field= table_message->add_field();
 
138
  field->set_name("SEGID");
 
139
  field->set_type(drizzled::message::Table::Field::INTEGER);
 
140
 
 
141
  field= table_message->add_field();
 
142
  field->set_name("COMMIT_ID");
 
143
  field->set_type(drizzled::message::Table::Field::BIGINT);
 
144
 
 
145
  field= table_message->add_field();
 
146
  field->set_name("END_TIMESTAMP");
 
147
  field->set_type(drizzled::message::Table::Field::BIGINT);
 
148
 
 
149
  field= table_message->add_field();
 
150
  field->set_name("MESSAGE_LEN");
 
151
  field->set_type(drizzled::message::Table::Field::INTEGER);
 
152
 
 
153
  field= table_message->add_field();
 
154
  field->set_name("MESSAGE");
 
155
  field->set_type(drizzled::message::Table::Field::BLOB);
 
156
  drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
 
157
  stropt->set_collation_id(drizzled::my_charset_bin.number);
 
158
  stropt->set_collation(drizzled::my_charset_bin.name);
 
159
 
 
160
  drizzled::message::Table::Index *index= table_message->add_indexes();
 
161
  index->set_name("PRIMARY");
 
162
  index->set_is_primary(true);
 
163
  index->set_is_unique(true);
 
164
  index->set_type(drizzled::message::Table::Index::BTREE);
 
165
  index->set_key_length(12);
 
166
  drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
 
167
  part->set_fieldnr(0);
 
168
  part->set_compare_length(8);
 
169
  part= index->add_index_part();
 
170
  part->set_fieldnr(1);
 
171
  part->set_compare_length(4);
 
172
 
 
173
  index= table_message->add_indexes();
 
174
  index->set_name("COMMIT_IDX");
 
175
  index->set_is_primary(false);
 
176
  index->set_is_unique(false);
 
177
  index->set_type(drizzled::message::Table::Index::BTREE);
 
178
  index->set_key_length(16);
 
179
  part= index->add_index_part();
 
180
  part->set_fieldnr(2);
 
181
  part->set_compare_length(8);
 
182
  part= index->add_index_part();
 
183
  part->set_fieldnr(0);
 
184
  part->set_compare_length(8);
 
185
 
 
186
  return 0;
 
187
}
 
188
 
108
189
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t*    prebuilt);
109
190
 
110
191
ulint insert_replication_message(const char *message, size_t size, 
111
 
                                 trx_t *trx, uint64_t trx_id)
 
192
                                 trx_t *trx, uint64_t trx_id, 
 
193
                                 uint64_t end_timestamp, bool is_end_segment, 
 
194
                                 uint32_t seg_id) 
112
195
{
113
196
  ulint error;
114
197
  row_prebuilt_t*       prebuilt;       /* For reading rows */
115
198
  dict_table_t *table;
116
199
  que_thr_t*    thr;
 
200
  byte*  data;
117
201
 
118
202
  table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
119
203
 
139
223
 
140
224
  dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
141
225
  dfield_t *dfield;
 
226
 
142
227
  dfield = dtuple_get_nth_field(dtuple, 0);
143
 
 
144
 
  dfield_set_data(dfield, &trx_id, 8);
 
228
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
 
229
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
 
230
  dfield_set_data(dfield, data, 8);
145
231
 
146
232
  dfield = dtuple_get_nth_field(dtuple, 1);
 
233
 
 
234
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
 
235
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
 
236
  dfield_set_data(dfield, data, 4);
 
237
  
 
238
  uint64_t commit_id= 0;
 
239
  if (is_end_segment)
 
240
  {
 
241
    commit_id= trx_sys_commit_id.increment();
 
242
  } 
 
243
 
 
244
  dfield = dtuple_get_nth_field(dtuple, 2);
 
245
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
 
246
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
 
247
  dfield_set_data(dfield, data, 8);
 
248
 
 
249
  dfield = dtuple_get_nth_field(dtuple, 3);
 
250
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
 
251
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
 
252
  dfield_set_data(dfield, data, 8);
 
253
 
 
254
  dfield = dtuple_get_nth_field(dtuple, 4);
 
255
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
 
256
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
 
257
  dfield_set_data(dfield, data, 4);
 
258
 
 
259
  dfield = dtuple_get_nth_field(dtuple, 5);
147
260
  dfield_set_data(dfield, message, size);
148
261
 
149
262
  ins_node_t*   node            = prebuilt->ins_node;
178
291
  return error;
179
292
}
180
293
 
181
 
UNIV_INTERN struct read_replication_state_st *replication_read_init(void)
 
294
UNIV_INTERN read_replication_state_st *replication_read_init(void)
182
295
{
183
 
  struct read_replication_state_st *state= calloc(1, sizeof(struct read_replication_state_st));
 
296
  read_replication_state_st *state= new read_replication_state_st;
184
297
 
185
298
  mutex_enter(&(dict_sys->mutex));
186
299
 
199
312
{
200
313
  btr_pcur_close(&state->pcur);
201
314
  mtr_commit(&state->mtr);
202
 
  free(state);
 
315
  delete state;
203
316
}
204
317
 
205
318
UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
222
335
 
223
336
    // Store transaction id
224
337
    field = rec_get_nth_field_old(rec, 0, &len);
225
 
    ret.id= *(uint64_t *)field;
 
338
    byte idbyte[8];
 
339
    convert_to_mysql_format(idbyte, field, 8);
 
340
    ret.id= *(uint64_t *)idbyte;
 
341
 
 
342
    // Store segment id
 
343
    field = rec_get_nth_field_old(rec, 1, &len);
 
344
    byte segbyte[4];
 
345
    convert_to_mysql_format(segbyte, field, 4);
 
346
    ret.seg_id= *(uint32_t *)segbyte;
 
347
 
 
348
    field = rec_get_nth_field_old(rec, 4, &len);
 
349
    byte commitbyte[8];
 
350
    convert_to_mysql_format(commitbyte, field, 8);
 
351
    ret.commit_id= *(uint64_t *)commitbyte;
 
352
 
 
353
    field = rec_get_nth_field_old(rec, 5, &len);
 
354
    byte timestampbyte[8];
 
355
    convert_to_mysql_format(timestampbyte, field, 8);
 
356
    ret.end_timestamp= *(uint64_t *)timestampbyte;
226
357
 
227
358
    // Handler message
228
 
    field = rec_get_nth_field_old(rec, 3, &len);
 
359
    field = rec_get_nth_field_old(rec, 7, &len);
229
360
    ret.message= (char *)field;
230
361
    ret.message_length= len;
231
362
 
246
377
 
247
378
  return ret;
248
379
}
 
380
 
 
381
UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
 
382
{
 
383
  byte *ptr;
 
384
  ptr = out + len;
 
385
 
 
386
  for (;;) {
 
387
    ptr--;
 
388
    *ptr = *in;
 
389
    if (ptr == out) {
 
390
      break;
 
391
    }
 
392
    in++;
 
393
  }
 
394
 
 
395
  out[len - 1] = (byte) (out[len - 1] ^ 128);
 
396
 
 
397
}