~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/dict/create_replication.cc

  • Committer: Brian Aker
  • Date: 2010-08-12 17:19:46 UTC
  • mfrom: (1701.1.1 turn-off-csv)
  • Revision ID: brian@tangent.org-20100812171946-n44naaqhg27gehlh
MErge Monty, remove CSV from auto-build

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2010 Brian Aker
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; either version 2 of the License, or
9
 
 *  (at your option) any later version.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
#include <config.h>
22
 
 
23
 
#include "read_replication.h"
24
 
#include "create_replication.h"
25
 
 
26
 
#ifdef UNIV_NONINL
27
 
#include "dict0crea.ic"
28
 
#endif
29
 
 
30
 
#include "btr0pcur.h"
31
 
#include "btr0btr.h"
32
 
#include "page0page.h"
33
 
#include "mach0data.h"
34
 
#include "dict0boot.h"
35
 
#include "dict0dict.h"
36
 
#include "que0que.h"
37
 
#include "row0ins.h"
38
 
#include "row0mysql.h"
39
 
#include "pars0pars.h"
40
 
#include "trx0roll.h"
41
 
#include "usr0sess.h"
42
 
#include "ut0vec.h"
43
 
#include "row0merge.h"
44
 
#include "row0mysql.h"
45
 
 
46
 
UNIV_INTERN ulint dict_create_sys_replication_log(void)
47
 
{
48
 
  dict_table_t* table1;
49
 
  ulint error;
50
 
  trx_t *trx;
51
 
 
52
 
  mutex_enter(&(dict_sys->mutex));
53
 
 
54
 
  table1 = dict_table_get_low("SYS_REPLICATION_LOG");
55
 
 
56
 
  trx_sys_read_commit_id();
57
 
 
58
 
  if (table1) 
59
 
  {
60
 
    mutex_exit(&(dict_sys->mutex));
61
 
 
62
 
    return(DB_SUCCESS);
63
 
  }
64
 
 
65
 
  mutex_exit(&(dict_sys->mutex));
66
 
 
67
 
  trx= trx_allocate_for_mysql();
68
 
 
69
 
  trx->op_info= "creating replication sys table";
70
 
 
71
 
  row_mysql_lock_data_dictionary(trx);
72
 
 
73
 
  pars_info_t *info= pars_info_create();
74
 
 
75
 
 
76
 
  error = que_eval_sql(info,
77
 
                       "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
78
 
                       "BEGIN\n"
79
 
                       "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" 
80
 
                       "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
81
 
                       "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
82
 
                       "END;\n"
83
 
                       , FALSE, trx);
84
 
 
85
 
 
86
 
 
87
 
  if (error != DB_SUCCESS)
88
 
  {
89
 
    fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
90
 
 
91
 
    ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
92
 
 
93
 
    fprintf(stderr,
94
 
            "InnoDB: creation failed\n"
95
 
            "InnoDB: tablespace is full\n"
96
 
            "InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
97
 
 
98
 
    row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
99
 
 
100
 
    error = DB_MUST_GET_MORE_FILE_SPACE;
101
 
  }
102
 
 
103
 
  trx_commit_for_mysql(trx);
104
 
 
105
 
  row_mysql_unlock_data_dictionary(trx);
106
 
 
107
 
  trx_free_for_mysql(trx);
108
 
 
109
 
  return(error);
110
 
}
111
 
 
112
 
UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
113
 
{
114
 
  std::string search_string(table_name);
115
 
  boost::algorithm::to_lower(search_string);
116
 
 
117
 
  if (search_string.compare("sys_replication_log") != 0)
118
 
    return -1;
119
 
 
120
 
  drizzled::message::Engine *engine= table_message->mutable_engine();
121
 
  engine->set_name("InnoDB");
122
 
  table_message->set_name("SYS_REPLICATION_LOG");
123
 
  table_message->set_schema("DATA_DICTIONARY");
124
 
  table_message->set_type(drizzled::message::Table::STANDARD);
125
 
  table_message->set_creation_timestamp(0);
126
 
  table_message->set_update_timestamp(0);
127
 
 
128
 
  drizzled::message::Table::TableOptions *options= table_message->mutable_options();
129
 
  options->set_collation_id(drizzled::my_charset_bin.number);
130
 
  options->set_collation(drizzled::my_charset_bin.name);
131
 
  options->set_dont_replicate(true);
132
 
 
133
 
  drizzled::message::Table::Field *field= table_message->add_field();
134
 
  field->set_name("ID");
135
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
136
 
 
137
 
  field= table_message->add_field();
138
 
  field->set_name("SEGID");
139
 
  field->set_type(drizzled::message::Table::Field::INTEGER);
140
 
 
141
 
  field= table_message->add_field();
142
 
  field->set_name("COMMIT_ID");
143
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
144
 
 
145
 
  field= table_message->add_field();
146
 
  field->set_name("END_TIMESTAMP");
147
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
148
 
 
149
 
  field= table_message->add_field();
150
 
  field->set_name("MESSAGE_LEN");
151
 
  field->set_type(drizzled::message::Table::Field::INTEGER);
152
 
 
153
 
  field= table_message->add_field();
154
 
  field->set_name("MESSAGE");
155
 
  field->set_type(drizzled::message::Table::Field::BLOB);
156
 
  drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
157
 
  stropt->set_collation_id(drizzled::my_charset_bin.number);
158
 
  stropt->set_collation(drizzled::my_charset_bin.name);
159
 
 
160
 
  drizzled::message::Table::Index *index= table_message->add_indexes();
161
 
  index->set_name("PRIMARY");
162
 
  index->set_is_primary(true);
163
 
  index->set_is_unique(true);
164
 
  index->set_type(drizzled::message::Table::Index::BTREE);
165
 
  index->set_key_length(8);
166
 
  drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
167
 
  part->set_fieldnr(0);
168
 
  part->set_compare_length(8);
169
 
  part= index->add_index_part();
170
 
  part->set_fieldnr(1);
171
 
  part->set_compare_length(4);
172
 
 
173
 
  index= table_message->add_indexes();
174
 
  index->set_name("COMMIT_IDX");
175
 
  index->set_is_primary(false);
176
 
  index->set_is_unique(false);
177
 
  index->set_type(drizzled::message::Table::Index::BTREE);
178
 
  index->set_key_length(8);
179
 
  part= index->add_index_part();
180
 
  part->set_fieldnr(2);
181
 
  part->set_compare_length(8);
182
 
  part= index->add_index_part();
183
 
  part->set_fieldnr(0);
184
 
  part->set_compare_length(8);
185
 
 
186
 
  return 0;
187
 
}
188
 
 
189
 
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t*    prebuilt);
190
 
 
191
 
ulint insert_replication_message(const char *message, size_t size, 
192
 
                                 trx_t *trx, uint64_t trx_id, 
193
 
                                 uint64_t end_timestamp, bool is_end_segment, 
194
 
                                 uint32_t seg_id) 
195
 
{
196
 
  ulint error;
197
 
  row_prebuilt_t*       prebuilt;       /* For reading rows */
198
 
  dict_table_t *table;
199
 
  que_thr_t*    thr;
200
 
  byte*  data;
201
 
 
202
 
  table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
203
 
 
204
 
  prebuilt = row_create_prebuilt(table);
205
 
 
206
 
  if (prebuilt->trx != trx) 
207
 
  {
208
 
    row_update_prebuilt_trx(prebuilt, trx);
209
 
  }
210
 
 
211
 
  /* DDL operations create table/drop table call
212
 
   * innobase_commit_low() which will commit the trx
213
 
   * that leaves the operation of committing to the
214
 
   * log in a new trx. If that is the case we need
215
 
   * to keep track and commit the trx later in this
216
 
   * function. 
217
 
   */ 
218
 
  bool is_started= true;
219
 
  if (trx->conc_state == TRX_NOT_STARTED)
220
 
  {
221
 
    is_started= false;
222
 
  }
223
 
 
224
 
  dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
225
 
  dfield_t *dfield;
226
 
 
227
 
  dfield = dtuple_get_nth_field(dtuple, 0);
228
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
229
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
230
 
  dfield_set_data(dfield, data, 8);
231
 
 
232
 
  dfield = dtuple_get_nth_field(dtuple, 1);
233
 
 
234
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
235
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
236
 
  dfield_set_data(dfield, data, 4);
237
 
  
238
 
  uint64_t commit_id= 0;
239
 
  if (is_end_segment)
240
 
  {
241
 
    commit_id= trx_sys_commit_id.increment();
242
 
  } 
243
 
 
244
 
  dfield = dtuple_get_nth_field(dtuple, 2);
245
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
246
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
247
 
  dfield_set_data(dfield, data, 8);
248
 
 
249
 
  dfield = dtuple_get_nth_field(dtuple, 3);
250
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
251
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
252
 
  dfield_set_data(dfield, data, 8);
253
 
 
254
 
  dfield = dtuple_get_nth_field(dtuple, 4);
255
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
256
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
257
 
  dfield_set_data(dfield, data, 4);
258
 
 
259
 
  dfield = dtuple_get_nth_field(dtuple, 5);
260
 
  dfield_set_data(dfield, message, size);
261
 
 
262
 
  ins_node_t*   node            = prebuilt->ins_node;
263
 
 
264
 
  thr = que_fork_get_first_thr(prebuilt->ins_graph);
265
 
 
266
 
  if (prebuilt->sql_stat_start) {
267
 
    node->state = INS_NODE_SET_IX_LOCK;
268
 
    prebuilt->sql_stat_start = FALSE;
269
 
  } else {
270
 
    node->state = INS_NODE_ALLOC_ROW_ID;
271
 
  }
272
 
 
273
 
  que_thr_move_to_run_state_for_mysql(thr, trx);
274
 
 
275
 
//run_again:
276
 
  thr->run_node = node;
277
 
  thr->prev_node = node;
278
 
 
279
 
  row_ins_step(thr);
280
 
 
281
 
  error = trx->error_state;
282
 
 
283
 
  que_thr_stop_for_mysql_no_error(thr, trx);
284
 
  row_prebuilt_free(prebuilt, FALSE);
285
 
 
286
 
  if (! is_started)
287
 
  {
288
 
    trx_commit_for_mysql(trx);
289
 
  }
290
 
 
291
 
  return error;
292
 
}
293
 
 
294
 
UNIV_INTERN read_replication_state_st *replication_read_init(void)
295
 
{
296
 
  read_replication_state_st *state= new read_replication_state_st;
297
 
 
298
 
  mutex_enter(&(dict_sys->mutex));
299
 
 
300
 
  mtr_start(&state->mtr);
301
 
  state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
302
 
  state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
303
 
 
304
 
  mutex_exit(&(dict_sys->mutex));
305
 
 
306
 
  btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
307
 
 
308
 
  return state;
309
 
}
310
 
 
311
 
UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
312
 
{
313
 
  btr_pcur_close(&state->pcur);
314
 
  mtr_commit(&state->mtr);
315
 
  delete state;
316
 
}
317
 
 
318
 
UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
319
 
{
320
 
  struct read_replication_return_st ret;
321
 
  const rec_t *rec;
322
 
 
323
 
  btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
324
 
 
325
 
  rec= btr_pcur_get_rec(&state->pcur);
326
 
 
327
 
  while (btr_pcur_is_on_user_rec(&state->pcur))
328
 
  {
329
 
    const byte* field;
330
 
    ulint len;
331
 
 
332
 
    // Is the row deleted? If so go fetch the next
333
 
    if (rec_get_deleted_flag(rec, 0))
334
 
      continue;
335
 
 
336
 
    // Store transaction id
337
 
    field = rec_get_nth_field_old(rec, 0, &len);
338
 
    byte idbyte[8];
339
 
    convert_to_mysql_format(idbyte, field, 8);
340
 
    ret.id= *(uint64_t *)idbyte;
341
 
 
342
 
    // Store segment id
343
 
    field = rec_get_nth_field_old(rec, 1, &len);
344
 
    byte segbyte[4];
345
 
    convert_to_mysql_format(segbyte, field, 4);
346
 
    ret.seg_id= *(uint32_t *)segbyte;
347
 
 
348
 
    field = rec_get_nth_field_old(rec, 4, &len);
349
 
    byte commitbyte[8];
350
 
    convert_to_mysql_format(commitbyte, field, 8);
351
 
    ret.commit_id= *(uint64_t *)commitbyte;
352
 
 
353
 
    field = rec_get_nth_field_old(rec, 5, &len);
354
 
    byte timestampbyte[8];
355
 
    convert_to_mysql_format(timestampbyte, field, 8);
356
 
    ret.end_timestamp= *(uint64_t *)timestampbyte;
357
 
 
358
 
    // Handler message
359
 
    field = rec_get_nth_field_old(rec, 7, &len);
360
 
    ret.message= (char *)field;
361
 
    ret.message_length= len;
362
 
 
363
 
    // @todo double check that "field" will continue to be value past this
364
 
    // point.
365
 
    btr_pcur_store_position(&state->pcur, &state->mtr);
366
 
    mtr_commit(&state->mtr);
367
 
 
368
 
    mtr_start(&state->mtr);
369
 
 
370
 
    btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
371
 
 
372
 
    return ret;
373
 
  }
374
 
 
375
 
  /* end of index */
376
 
  memset(&ret, 0, sizeof(ret));
377
 
 
378
 
  return ret;
379
 
}
380
 
 
381
 
UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
382
 
{
383
 
  byte *ptr;
384
 
  ptr = out + len;
385
 
 
386
 
  for (;;) {
387
 
    ptr--;
388
 
    *ptr = *in;
389
 
    if (ptr == out) {
390
 
      break;
391
 
    }
392
 
    in++;
393
 
  }
394
 
 
395
 
  out[len - 1] = (byte) (out[len - 1] ^ 128);
396
 
 
397
 
}