~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/dict/create_replication.cc

  • Committer: lbieber
  • Date: 2010-10-06 16:34:16 UTC
  • mfrom: (1816.1.3 build)
  • Revision ID: lbieber@orisndriz08-20101006163416-ea0sl59qgpglk21y
Merge Monty - Change the requirement from either libinnodb to libhaildb. Also, tied it to version 2.2
Merge Andrew - fix bug 650935: remove --compress from all clients
Merge Andrew - fix bug 653471: Add -A to drizzle client
Merge Travis - 621861 = To change C structs to C++ classes in Drizzle

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2010 Brian Aker
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; either version 2 of the License, or
9
 
 *  (at your option) any later version.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
#include "config.h"
22
 
 
23
 
#include "read_replication.h"
24
 
#include "create_replication.h"
25
 
 
26
 
#ifdef UNIV_NONINL
27
 
#include "dict0crea.ic"
28
 
#endif
29
 
 
30
 
#include "btr0pcur.h"
31
 
#include "btr0btr.h"
32
 
#include "page0page.h"
33
 
#include "mach0data.h"
34
 
#include "dict0boot.h"
35
 
#include "dict0dict.h"
36
 
#include "que0que.h"
37
 
#include "row0ins.h"
38
 
#include "row0mysql.h"
39
 
#include "pars0pars.h"
40
 
#include "trx0roll.h"
41
 
#include "usr0sess.h"
42
 
#include "ut0vec.h"
43
 
#include "row0merge.h"
44
 
#include "row0mysql.h"
45
 
 
46
 
UNIV_INTERN ulint dict_create_sys_replication_log(void)
47
 
{
48
 
  dict_table_t* table1;
49
 
  ulint error;
50
 
  trx_t *trx;
51
 
 
52
 
  mutex_enter(&(dict_sys->mutex));
53
 
 
54
 
  table1 = dict_table_get_low("SYS_REPLICATION_LOG");
55
 
 
56
 
  trx_sys_read_commit_id();
57
 
 
58
 
  if (table1) 
59
 
  {
60
 
    mutex_exit(&(dict_sys->mutex));
61
 
 
62
 
    return(DB_SUCCESS);
63
 
  }
64
 
 
65
 
  mutex_exit(&(dict_sys->mutex));
66
 
 
67
 
  trx= trx_allocate_for_mysql();
68
 
 
69
 
  trx->op_info= "creating replication sys table";
70
 
 
71
 
  row_mysql_lock_data_dictionary(trx);
72
 
 
73
 
  pars_info_t *info= pars_info_create();
74
 
 
75
 
 
76
 
  error = que_eval_sql(info,
77
 
                       "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
78
 
                       "BEGIN\n"
79
 
                       "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" 
80
 
                       "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
81
 
                       "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
82
 
                       "END;\n"
83
 
                       , FALSE, trx);
84
 
 
85
 
 
86
 
 
87
 
  if (error != DB_SUCCESS)
88
 
  {
89
 
    fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
90
 
 
91
 
    ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
92
 
 
93
 
    fprintf(stderr,
94
 
            "InnoDB: creation failed\n"
95
 
            "InnoDB: tablespace is full\n"
96
 
            "InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
97
 
 
98
 
    row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
99
 
 
100
 
    error = DB_MUST_GET_MORE_FILE_SPACE;
101
 
  }
102
 
 
103
 
  trx_commit_for_mysql(trx);
104
 
 
105
 
  row_mysql_unlock_data_dictionary(trx);
106
 
 
107
 
  trx_free_for_mysql(trx);
108
 
 
109
 
  return(error);
110
 
}
111
 
 
112
 
UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
113
 
{
114
 
  std::string search_string(table_name);
115
 
  boost::algorithm::to_lower(search_string);
116
 
 
117
 
  if (search_string.compare("sys_replication_log") != 0)
118
 
    return -1;
119
 
 
120
 
  drizzled::message::Engine *engine= table_message->mutable_engine();
121
 
  engine->set_name("InnoDB");
122
 
  table_message->set_name("SYS_REPLICATION_LOG");
123
 
  table_message->set_schema("DATA_DICTIONARY");
124
 
  table_message->set_type(drizzled::message::Table::STANDARD);
125
 
  table_message->set_creation_timestamp(0);
126
 
  table_message->set_update_timestamp(0);
127
 
 
128
 
  drizzled::message::Table::TableOptions *options= table_message->mutable_options();
129
 
  options->set_collation_id(drizzled::my_charset_bin.number);
130
 
  options->set_collation(drizzled::my_charset_bin.name);
131
 
 
132
 
  drizzled::message::Table::Field *field= table_message->add_field();
133
 
  field->set_name("ID");
134
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
135
 
 
136
 
  field= table_message->add_field();
137
 
  field->set_name("SEGID");
138
 
  field->set_type(drizzled::message::Table::Field::INTEGER);
139
 
 
140
 
  field= table_message->add_field();
141
 
  field->set_name("COMMIT_ID");
142
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
143
 
 
144
 
  field= table_message->add_field();
145
 
  field->set_name("END_TIMESTAMP");
146
 
  field->set_type(drizzled::message::Table::Field::BIGINT);
147
 
 
148
 
  field= table_message->add_field();
149
 
  field->set_name("MESSAGE_LEN");
150
 
  field->set_type(drizzled::message::Table::Field::INTEGER);
151
 
 
152
 
  field= table_message->add_field();
153
 
  field->set_name("MESSAGE");
154
 
  field->set_type(drizzled::message::Table::Field::BLOB);
155
 
  drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
156
 
  stropt->set_collation_id(drizzled::my_charset_bin.number);
157
 
  stropt->set_collation(drizzled::my_charset_bin.name);
158
 
 
159
 
  drizzled::message::Table::Index *index= table_message->add_indexes();
160
 
  index->set_name("PRIMARY");
161
 
  index->set_is_primary(true);
162
 
  index->set_is_unique(true);
163
 
  index->set_type(drizzled::message::Table::Index::BTREE);
164
 
  index->set_key_length(8);
165
 
  drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
166
 
  part->set_fieldnr(0);
167
 
  part->set_compare_length(8);
168
 
  part= index->add_index_part();
169
 
  part->set_fieldnr(1);
170
 
  part->set_compare_length(4);
171
 
 
172
 
  index= table_message->add_indexes();
173
 
  index->set_name("COMMIT_IDX");
174
 
  index->set_is_primary(false);
175
 
  index->set_is_unique(false);
176
 
  index->set_type(drizzled::message::Table::Index::BTREE);
177
 
  index->set_key_length(8);
178
 
  part= index->add_index_part();
179
 
  part->set_fieldnr(2);
180
 
  part->set_compare_length(8);
181
 
  part= index->add_index_part();
182
 
  part->set_fieldnr(0);
183
 
  part->set_compare_length(8);
184
 
 
185
 
  return 0;
186
 
}
187
 
 
188
 
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t*    prebuilt);
189
 
 
190
 
ulint insert_replication_message(const char *message, size_t size, 
191
 
                                 trx_t *trx, uint64_t trx_id, 
192
 
                                 uint64_t end_timestamp, bool is_end_segment, 
193
 
                                 uint32_t seg_id) 
194
 
{
195
 
  ulint error;
196
 
  row_prebuilt_t*       prebuilt;       /* For reading rows */
197
 
  dict_table_t *table;
198
 
  que_thr_t*    thr;
199
 
  byte*  data;
200
 
 
201
 
  table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
202
 
 
203
 
  prebuilt = row_create_prebuilt(table);
204
 
 
205
 
  if (prebuilt->trx != trx) 
206
 
  {
207
 
    row_update_prebuilt_trx(prebuilt, trx);
208
 
  }
209
 
 
210
 
  /* DDL operations create table/drop table call
211
 
   * innobase_commit_low() which will commit the trx
212
 
   * that leaves the operation of committing to the
213
 
   * log in a new trx. If that is the case we need
214
 
   * to keep track and commit the trx later in this
215
 
   * function. 
216
 
   */ 
217
 
  bool is_started= true;
218
 
  if (trx->conc_state == TRX_NOT_STARTED)
219
 
  {
220
 
    is_started= false;
221
 
  }
222
 
 
223
 
  dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
224
 
  dfield_t *dfield;
225
 
 
226
 
  dfield = dtuple_get_nth_field(dtuple, 0);
227
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
228
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
229
 
  dfield_set_data(dfield, data, 8);
230
 
 
231
 
  dfield = dtuple_get_nth_field(dtuple, 1);
232
 
 
233
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
234
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
235
 
  dfield_set_data(dfield, data, 4);
236
 
  
237
 
  uint64_t commit_id= 0;
238
 
  if (is_end_segment)
239
 
  {
240
 
    commit_id= trx_sys_commit_id.increment();
241
 
  } 
242
 
 
243
 
  dfield = dtuple_get_nth_field(dtuple, 2);
244
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
245
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
246
 
  dfield_set_data(dfield, data, 8);
247
 
 
248
 
  dfield = dtuple_get_nth_field(dtuple, 3);
249
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
250
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
251
 
  dfield_set_data(dfield, data, 8);
252
 
 
253
 
  dfield = dtuple_get_nth_field(dtuple, 4);
254
 
  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
255
 
  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
256
 
  dfield_set_data(dfield, data, 4);
257
 
 
258
 
  dfield = dtuple_get_nth_field(dtuple, 5);
259
 
  dfield_set_data(dfield, message, size);
260
 
 
261
 
  ins_node_t*   node            = prebuilt->ins_node;
262
 
 
263
 
  thr = que_fork_get_first_thr(prebuilt->ins_graph);
264
 
 
265
 
  if (prebuilt->sql_stat_start) {
266
 
    node->state = INS_NODE_SET_IX_LOCK;
267
 
    prebuilt->sql_stat_start = FALSE;
268
 
  } else {
269
 
    node->state = INS_NODE_ALLOC_ROW_ID;
270
 
  }
271
 
 
272
 
  que_thr_move_to_run_state_for_mysql(thr, trx);
273
 
 
274
 
//run_again:
275
 
  thr->run_node = node;
276
 
  thr->prev_node = node;
277
 
 
278
 
  row_ins_step(thr);
279
 
 
280
 
  error = trx->error_state;
281
 
 
282
 
  que_thr_stop_for_mysql_no_error(thr, trx);
283
 
  row_prebuilt_free(prebuilt, FALSE);
284
 
 
285
 
  if (! is_started)
286
 
  {
287
 
    trx_commit_for_mysql(trx);
288
 
  }
289
 
 
290
 
  return error;
291
 
}
292
 
 
293
 
UNIV_INTERN read_replication_state_st *replication_read_init(void)
294
 
{
295
 
  read_replication_state_st *state= new read_replication_state_st;
296
 
 
297
 
  mutex_enter(&(dict_sys->mutex));
298
 
 
299
 
  mtr_start(&state->mtr);
300
 
  state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
301
 
  state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
302
 
 
303
 
  mutex_exit(&(dict_sys->mutex));
304
 
 
305
 
  btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
306
 
 
307
 
  return state;
308
 
}
309
 
 
310
 
UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
311
 
{
312
 
  btr_pcur_close(&state->pcur);
313
 
  mtr_commit(&state->mtr);
314
 
  delete state;
315
 
}
316
 
 
317
 
UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
318
 
{
319
 
  struct read_replication_return_st ret;
320
 
  const rec_t *rec;
321
 
 
322
 
  btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
323
 
 
324
 
  rec= btr_pcur_get_rec(&state->pcur);
325
 
 
326
 
  while (btr_pcur_is_on_user_rec(&state->pcur))
327
 
  {
328
 
    const byte* field;
329
 
    ulint len;
330
 
 
331
 
    // Is the row deleted? If so go fetch the next
332
 
    if (rec_get_deleted_flag(rec, 0))
333
 
      continue;
334
 
 
335
 
    // Store transaction id
336
 
    field = rec_get_nth_field_old(rec, 0, &len);
337
 
    byte idbyte[8];
338
 
    convert_to_mysql_format(idbyte, field, 8);
339
 
    ret.id= *(uint64_t *)idbyte;
340
 
 
341
 
    // Store segment id
342
 
    field = rec_get_nth_field_old(rec, 1, &len);
343
 
    byte segbyte[4];
344
 
    convert_to_mysql_format(segbyte, field, 4);
345
 
    ret.seg_id= *(uint32_t *)segbyte;
346
 
 
347
 
    field = rec_get_nth_field_old(rec, 4, &len);
348
 
    byte commitbyte[8];
349
 
    convert_to_mysql_format(commitbyte, field, 8);
350
 
    ret.commit_id= *(uint64_t *)commitbyte;
351
 
 
352
 
    field = rec_get_nth_field_old(rec, 5, &len);
353
 
    byte timestampbyte[8];
354
 
    convert_to_mysql_format(timestampbyte, field, 8);
355
 
    ret.end_timestamp= *(uint64_t *)timestampbyte;
356
 
 
357
 
    // Handler message
358
 
    field = rec_get_nth_field_old(rec, 7, &len);
359
 
    ret.message= (char *)field;
360
 
    ret.message_length= len;
361
 
 
362
 
    // @todo double check that "field" will continue to be value past this
363
 
    // point.
364
 
    btr_pcur_store_position(&state->pcur, &state->mtr);
365
 
    mtr_commit(&state->mtr);
366
 
 
367
 
    mtr_start(&state->mtr);
368
 
 
369
 
    btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
370
 
 
371
 
    return ret;
372
 
  }
373
 
 
374
 
  /* end of index */
375
 
  memset(&ret, 0, sizeof(ret));
376
 
 
377
 
  return ret;
378
 
}
379
 
 
380
 
UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
381
 
{
382
 
  byte *ptr;
383
 
  ptr = out + len;
384
 
 
385
 
  for (;;) {
386
 
    ptr--;
387
 
    *ptr = *in;
388
 
    if (ptr == out) {
389
 
      break;
390
 
    }
391
 
    in++;
392
 
  }
393
 
 
394
 
  out[len - 1] = (byte) (out[len - 1] ^ 128);
395
 
 
396
 
}