~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/dict/create_replication.cc

  • Committer: Lee Bieber
  • Date: 2011-02-11 20:30:05 UTC
  • mfrom: (2157.1.3 build)
  • Revision ID: kalebral@gmail.com-20110211203005-757o1y2yf78dxzqr
Merge Stewart - 716848: drizzleimport displays wrong program_name
Merge Stewart - update README file
Merge Andrew and Joe - Exposes the InnoDB SYS_REPLICATION_LOG to data_dictionary so that it is fast and fixes many issues we have been having

Show diffs side-by-side

added added

removed removed

Lines of Context:
41
41
#include "usr0sess.h"
42
42
#include "ut0vec.h"
43
43
#include "row0merge.h"
 
44
#include "row0mysql.h"
44
45
 
45
46
UNIV_INTERN ulint dict_create_sys_replication_log(void)
46
47
{
52
53
 
53
54
  table1 = dict_table_get_low("SYS_REPLICATION_LOG");
54
55
 
 
56
  trx_sys_read_commit_id();
 
57
 
55
58
  if (table1) 
56
59
  {
57
60
    mutex_exit(&(dict_sys->mutex));
73
76
  error = que_eval_sql(info,
74
77
                       "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
75
78
                       "BEGIN\n"
76
 
                       "CREATE TABLE SYS_REPLICATION_LOG(ID BINARY(8), SEGID BINARY(4), MESSAGE BLOB);\n"
77
 
                       "CREATE UNIQUE CLUSTERED INDEX ID_IND ON SYS_REPLICATION_LOG (ID);\n"
 
79
                       "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" 
 
80
                       "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
 
81
                       "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
78
82
                       "END;\n"
79
83
                       , FALSE, trx);
80
84
 
105
109
  return(error);
106
110
}
107
111
 
 
112
UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
 
113
{
 
114
  std::string search_string(table_name);
 
115
  boost::algorithm::to_lower(search_string);
 
116
 
 
117
  if (search_string.compare("sys_replication_log") != 0)
 
118
    return -1;
 
119
 
 
120
  drizzled::message::Engine *engine= table_message->mutable_engine();
 
121
  engine->set_name("InnoDB");
 
122
  table_message->set_name("SYS_REPLICATION_LOG");
 
123
  table_message->set_schema("DATA_DICTIONARY");
 
124
  table_message->set_type(drizzled::message::Table::STANDARD);
 
125
  table_message->set_creation_timestamp(0);
 
126
  table_message->set_update_timestamp(0);
 
127
 
 
128
  drizzled::message::Table::TableOptions *options= table_message->mutable_options();
 
129
  options->set_collation_id(drizzled::my_charset_bin.number);
 
130
  options->set_collation(drizzled::my_charset_bin.name);
 
131
 
 
132
  drizzled::message::Table::Field *field= table_message->add_field();
 
133
  field->set_name("ID");
 
134
  field->set_type(drizzled::message::Table::Field::BIGINT);
 
135
 
 
136
  field= table_message->add_field();
 
137
  field->set_name("SEGID");
 
138
  field->set_type(drizzled::message::Table::Field::INTEGER);
 
139
 
 
140
  field= table_message->add_field();
 
141
  field->set_name("COMMIT_ID");
 
142
  field->set_type(drizzled::message::Table::Field::BIGINT);
 
143
 
 
144
  field= table_message->add_field();
 
145
  field->set_name("END_TIMESTAMP");
 
146
  field->set_type(drizzled::message::Table::Field::BIGINT);
 
147
 
 
148
  field= table_message->add_field();
 
149
  field->set_name("MESSAGE_LEN");
 
150
  field->set_type(drizzled::message::Table::Field::INTEGER);
 
151
 
 
152
  field= table_message->add_field();
 
153
  field->set_name("MESSAGE");
 
154
  field->set_type(drizzled::message::Table::Field::BLOB);
 
155
  drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
 
156
  stropt->set_collation_id(drizzled::my_charset_bin.number);
 
157
  stropt->set_collation(drizzled::my_charset_bin.name);
 
158
 
 
159
  drizzled::message::Table::Index *index= table_message->add_indexes();
 
160
  index->set_name("PRIMARY");
 
161
  index->set_is_primary(true);
 
162
  index->set_is_unique(true);
 
163
  index->set_type(drizzled::message::Table::Index::BTREE);
 
164
  index->set_key_length(8);
 
165
  drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
 
166
  part->set_fieldnr(0);
 
167
  part->set_compare_length(8);
 
168
  part= index->add_index_part();
 
169
  part->set_fieldnr(1);
 
170
  part->set_compare_length(4);
 
171
 
 
172
  index= table_message->add_indexes();
 
173
  index->set_name("COMMIT_IDX");
 
174
  index->set_is_primary(false);
 
175
  index->set_is_unique(false);
 
176
  index->set_type(drizzled::message::Table::Index::BTREE);
 
177
  index->set_key_length(8);
 
178
  part= index->add_index_part();
 
179
  part->set_fieldnr(2);
 
180
  part->set_compare_length(8);
 
181
  part= index->add_index_part();
 
182
  part->set_fieldnr(0);
 
183
  part->set_compare_length(8);
 
184
 
 
185
  return 0;
 
186
}
 
187
 
108
188
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t*    prebuilt);
109
189
 
110
190
ulint insert_replication_message(const char *message, size_t size, 
111
 
                                 trx_t *trx, uint64_t trx_id, uint32_t seg_id)
 
191
                                 trx_t *trx, uint64_t trx_id, 
 
192
                                 uint64_t end_timestamp, bool is_end_segment, 
 
193
                                 uint32_t seg_id) 
112
194
{
113
195
  ulint error;
114
196
  row_prebuilt_t*       prebuilt;       /* For reading rows */
115
197
  dict_table_t *table;
116
198
  que_thr_t*    thr;
 
199
  byte*  data;
117
200
 
118
201
  table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
119
202
 
139
222
 
140
223
  dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
141
224
  dfield_t *dfield;
 
225
 
142
226
  dfield = dtuple_get_nth_field(dtuple, 0);
143
 
 
144
 
  dfield_set_data(dfield, &trx_id, 8);
 
227
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
 
228
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
 
229
  dfield_set_data(dfield, data, 8);
145
230
 
146
231
  dfield = dtuple_get_nth_field(dtuple, 1);
147
 
  dfield_set_data(dfield, &seg_id, 4);
 
232
 
 
233
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
 
234
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
 
235
  dfield_set_data(dfield, data, 4);
 
236
  
 
237
  uint64_t commit_id= 0;
 
238
  if (is_end_segment)
 
239
  {
 
240
    commit_id= trx_sys_commit_id.increment();
 
241
  } 
148
242
 
149
243
  dfield = dtuple_get_nth_field(dtuple, 2);
 
244
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
 
245
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
 
246
  dfield_set_data(dfield, data, 8);
 
247
 
 
248
  dfield = dtuple_get_nth_field(dtuple, 3);
 
249
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
 
250
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
 
251
  dfield_set_data(dfield, data, 8);
 
252
 
 
253
  dfield = dtuple_get_nth_field(dtuple, 4);
 
254
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
 
255
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
 
256
  dfield_set_data(dfield, data, 4);
 
257
 
 
258
  dfield = dtuple_get_nth_field(dtuple, 5);
150
259
  dfield_set_data(dfield, message, size);
151
260
 
152
261
  ins_node_t*   node            = prebuilt->ins_node;
225
334
 
226
335
    // Store transaction id
227
336
    field = rec_get_nth_field_old(rec, 0, &len);
228
 
    ret.id= *(uint64_t *)field;
 
337
    byte idbyte[8];
 
338
    convert_to_mysql_format(idbyte, field, 8);
 
339
    ret.id= *(uint64_t *)idbyte;
229
340
 
230
341
    // Store segment id
231
 
    field = rec_get_nth_field_old(rec, 3, &len);
232
 
    ret.seg_id= *(uint32_t *)field;
 
342
    field = rec_get_nth_field_old(rec, 1, &len);
 
343
    byte segbyte[4];
 
344
    convert_to_mysql_format(segbyte, field, 4);
 
345
    ret.seg_id= *(uint32_t *)segbyte;
 
346
 
 
347
    field = rec_get_nth_field_old(rec, 4, &len);
 
348
    byte commitbyte[8];
 
349
    convert_to_mysql_format(commitbyte, field, 8);
 
350
    ret.commit_id= *(uint64_t *)commitbyte;
 
351
 
 
352
    field = rec_get_nth_field_old(rec, 5, &len);
 
353
    byte timestampbyte[8];
 
354
    convert_to_mysql_format(timestampbyte, field, 8);
 
355
    ret.end_timestamp= *(uint64_t *)timestampbyte;
233
356
 
234
357
    // Handler message
235
 
    field = rec_get_nth_field_old(rec, 4, &len);
 
358
    field = rec_get_nth_field_old(rec, 7, &len);
236
359
    ret.message= (char *)field;
237
360
    ret.message_length= len;
238
361
 
253
376
 
254
377
  return ret;
255
378
}
 
379
 
 
380
UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
 
381
{
 
382
  byte *ptr;
 
383
  ptr = out + len;
 
384
 
 
385
  for (;;) {
 
386
    ptr--;
 
387
    *ptr = *in;
 
388
    if (ptr == out) {
 
389
      break;
 
390
    }
 
391
    in++;
 
392
  }
 
393
 
 
394
  out[len - 1] = (byte) (out[len - 1] ^ 128);
 
395
 
 
396
}