~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/dict/create_replication.cc

  • Committer: Brian Aker
  • Date: 2011-01-12 06:45:23 UTC
  • mto: (2073.1.4 catalogs)
  • mto: This revision was merged to the branch mainline in revision 2080.
  • Revision ID: brian@tangent.org-20110112064523-rqhptaqbph22qmj1
Remove custom error.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
19
 */
20
20
 
21
 
#include <config.h>
 
21
#include "config.h"
22
22
 
23
23
#include "read_replication.h"
24
24
#include "create_replication.h"
41
41
#include "usr0sess.h"
42
42
#include "ut0vec.h"
43
43
#include "row0merge.h"
44
 
#include "row0mysql.h"
45
44
 
46
45
UNIV_INTERN ulint dict_create_sys_replication_log(void)
47
46
{
53
52
 
54
53
  table1 = dict_table_get_low("SYS_REPLICATION_LOG");
55
54
 
56
 
  trx_sys_read_commit_id();
57
 
 
58
55
  if (table1) 
59
56
  {
60
57
    mutex_exit(&(dict_sys->mutex));
76
73
  error = que_eval_sql(info,
77
74
                       "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
78
75
                       "BEGIN\n"
79
 
                       "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" 
80
 
                       "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
81
 
                       "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
 
76
                       "CREATE TABLE SYS_REPLICATION_LOG(ID BINARY(8), MESSAGE BLOB);\n"
 
77
                       "CREATE UNIQUE CLUSTERED INDEX ID_IND ON SYS_REPLICATION_LOG (ID);\n"
82
78
                       "END;\n"
83
79
                       , FALSE, trx);
84
80
 
109
105
  return(error);
110
106
}
111
107
 
112
 
UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
113
 
{
114
 
  std::string search_string(table_name);
115
 
  boost::algorithm::to_lower(search_string);
116
 
 
117
 
  if (search_string.compare("sys_replication_log") != 0)
118
 
    return -1;
119
 
 
120
 
  drizzled::message::Engine *engine= table_message->mutable_engine();
121
 
  engine->set_name("InnoDB");
122
 
  table_message->set_name("SYS_REPLICATION_LOG");
123
 
  table_message->set_schema("DATA_DICTIONARY");
124
 
  table_message->set_type(drizzled::message::Table::STANDARD);
125
 
  table_message->set_creation_timestamp(0);
126
 
  table_message->set_update_timestamp(0);
127
 
 
128
 
  drizzled::message::Table::TableOptions *options= table_message->mutable_options();
129
 
  options->set_collation_id(drizzled::my_charset_bin.number);
130
 
  options->set_collation(drizzled::my_charset_bin.name);
131
 
  options->set_dont_replicate(true);
132
 
 
133
 
  drizzled::message::Table::Field *field= table_message->add_field();
134
 
  field->set_name("ID");
135
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
136
 
 
137
 
  field= table_message->add_field();
138
 
  field->set_name("SEGID");
139
 
  field->set_type(drizzled::message::Table::Field::INTEGER);
140
 
 
141
 
  field= table_message->add_field();
142
 
  field->set_name("COMMIT_ID");
143
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
144
 
 
145
 
  field= table_message->add_field();
146
 
  field->set_name("END_TIMESTAMP");
147
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
148
 
 
149
 
  field= table_message->add_field();
150
 
  field->set_name("MESSAGE_LEN");
151
 
  field->set_type(drizzled::message::Table::Field::INTEGER);
152
 
 
153
 
  field= table_message->add_field();
154
 
  field->set_name("MESSAGE");
155
 
  field->set_type(drizzled::message::Table::Field::BLOB);
156
 
  drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
157
 
  stropt->set_collation_id(drizzled::my_charset_bin.number);
158
 
  stropt->set_collation(drizzled::my_charset_bin.name);
159
 
 
160
 
  drizzled::message::Table::Index *index= table_message->add_indexes();
161
 
  index->set_name("PRIMARY");
162
 
  index->set_is_primary(true);
163
 
  index->set_is_unique(true);
164
 
  index->set_type(drizzled::message::Table::Index::BTREE);
165
 
  index->set_key_length(8);
166
 
  drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
167
 
  part->set_fieldnr(0);
168
 
  part->set_compare_length(8);
169
 
  part= index->add_index_part();
170
 
  part->set_fieldnr(1);
171
 
  part->set_compare_length(4);
172
 
 
173
 
  index= table_message->add_indexes();
174
 
  index->set_name("COMMIT_IDX");
175
 
  index->set_is_primary(false);
176
 
  index->set_is_unique(false);
177
 
  index->set_type(drizzled::message::Table::Index::BTREE);
178
 
  index->set_key_length(8);
179
 
  part= index->add_index_part();
180
 
  part->set_fieldnr(2);
181
 
  part->set_compare_length(8);
182
 
  part= index->add_index_part();
183
 
  part->set_fieldnr(0);
184
 
  part->set_compare_length(8);
185
 
 
186
 
  return 0;
187
 
}
188
 
 
189
108
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t*    prebuilt);
190
109
 
191
110
ulint insert_replication_message(const char *message, size_t size, 
192
 
                                 trx_t *trx, uint64_t trx_id, 
193
 
                                 uint64_t end_timestamp, bool is_end_segment, 
194
 
                                 uint32_t seg_id) 
 
111
                                 trx_t *trx, uint64_t trx_id)
195
112
{
196
113
  ulint error;
197
114
  row_prebuilt_t*       prebuilt;       /* For reading rows */
198
115
  dict_table_t *table;
199
116
  que_thr_t*    thr;
200
 
  byte*  data;
201
117
 
202
118
  table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
203
119
 
223
139
 
224
140
  dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
225
141
  dfield_t *dfield;
226
 
 
227
142
  dfield = dtuple_get_nth_field(dtuple, 0);
228
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
229
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
230
 
  dfield_set_data(dfield, data, 8);
 
143
 
 
144
  dfield_set_data(dfield, &trx_id, 8);
231
145
 
232
146
  dfield = dtuple_get_nth_field(dtuple, 1);
233
 
 
234
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
235
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
236
 
  dfield_set_data(dfield, data, 4);
237
 
  
238
 
  uint64_t commit_id= 0;
239
 
  if (is_end_segment)
240
 
  {
241
 
    commit_id= trx_sys_commit_id.increment();
242
 
  } 
243
 
 
244
 
  dfield = dtuple_get_nth_field(dtuple, 2);
245
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
246
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
247
 
  dfield_set_data(dfield, data, 8);
248
 
 
249
 
  dfield = dtuple_get_nth_field(dtuple, 3);
250
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
251
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
252
 
  dfield_set_data(dfield, data, 8);
253
 
 
254
 
  dfield = dtuple_get_nth_field(dtuple, 4);
255
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
256
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
257
 
  dfield_set_data(dfield, data, 4);
258
 
 
259
 
  dfield = dtuple_get_nth_field(dtuple, 5);
260
147
  dfield_set_data(dfield, message, size);
261
148
 
262
149
  ins_node_t*   node            = prebuilt->ins_node;
335
222
 
336
223
    // Store transaction id
337
224
    field = rec_get_nth_field_old(rec, 0, &len);
338
 
    byte idbyte[8];
339
 
    convert_to_mysql_format(idbyte, field, 8);
340
 
    ret.id= *(uint64_t *)idbyte;
341
 
 
342
 
    // Store segment id
343
 
    field = rec_get_nth_field_old(rec, 1, &len);
344
 
    byte segbyte[4];
345
 
    convert_to_mysql_format(segbyte, field, 4);
346
 
    ret.seg_id= *(uint32_t *)segbyte;
347
 
 
348
 
    field = rec_get_nth_field_old(rec, 4, &len);
349
 
    byte commitbyte[8];
350
 
    convert_to_mysql_format(commitbyte, field, 8);
351
 
    ret.commit_id= *(uint64_t *)commitbyte;
352
 
 
353
 
    field = rec_get_nth_field_old(rec, 5, &len);
354
 
    byte timestampbyte[8];
355
 
    convert_to_mysql_format(timestampbyte, field, 8);
356
 
    ret.end_timestamp= *(uint64_t *)timestampbyte;
 
225
    ret.id= *(uint64_t *)field;
357
226
 
358
227
    // Handler message
359
 
    field = rec_get_nth_field_old(rec, 7, &len);
 
228
    field = rec_get_nth_field_old(rec, 3, &len);
360
229
    ret.message= (char *)field;
361
230
    ret.message_length= len;
362
231
 
377
246
 
378
247
  return ret;
379
248
}
380
 
 
381
 
UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
382
 
{
383
 
  byte *ptr;
384
 
  ptr = out + len;
385
 
 
386
 
  for (;;) {
387
 
    ptr--;
388
 
    *ptr = *in;
389
 
    if (ptr == out) {
390
 
      break;
391
 
    }
392
 
    in++;
393
 
  }
394
 
 
395
 
  out[len - 1] = (byte) (out[len - 1] ^ 128);
396
 
 
397
 
}