~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/dict/create_replication.c

  • Committer: Brian Aker
  • Date: 2010-12-18 10:14:05 UTC
  • mfrom: (2008.1.3 clean)
  • Revision ID: brian@tangent.org-20101218101405-qjbse29shi9coklg
Merge of user identifier work

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2010 Brian Aker
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
#include "config.h"
 
22
 
 
23
#include "read_replication.h"
 
24
#include "create_replication.h"
 
25
 
 
26
#ifdef UNIV_NONINL
 
27
#include "dict0crea.ic"
 
28
#endif
 
29
 
 
30
#include "btr0pcur.h"
 
31
#include "btr0btr.h"
 
32
#include "page0page.h"
 
33
#include "mach0data.h"
 
34
#include "dict0boot.h"
 
35
#include "dict0dict.h"
 
36
#include "que0que.h"
 
37
#include "row0ins.h"
 
38
#include "row0mysql.h"
 
39
#include "pars0pars.h"
 
40
#include "trx0roll.h"
 
41
#include "usr0sess.h"
 
42
#include "ut0vec.h"
 
43
#include "row0merge.h"
 
44
 
 
45
UNIV_INTERN ulint dict_create_sys_replication_log(void)
 
46
{
 
47
  dict_table_t* table1;
 
48
  ulint error;
 
49
  trx_t *trx;
 
50
 
 
51
  mutex_enter(&(dict_sys->mutex));
 
52
 
 
53
  table1 = dict_table_get_low("SYS_REPLICATION_LOG");
 
54
 
 
55
  if (table1) 
 
56
  {
 
57
    mutex_exit(&(dict_sys->mutex));
 
58
 
 
59
    return(DB_SUCCESS);
 
60
  }
 
61
 
 
62
  mutex_exit(&(dict_sys->mutex));
 
63
 
 
64
  trx= trx_allocate_for_mysql();
 
65
 
 
66
  trx->op_info= "creating replication sys table";
 
67
 
 
68
  row_mysql_lock_data_dictionary(trx);
 
69
 
 
70
  pars_info_t *info= pars_info_create();
 
71
 
 
72
 
 
73
  error = que_eval_sql(info,
 
74
                       "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
 
75
                       "BEGIN\n"
 
76
                       "CREATE TABLE SYS_REPLICATION_LOG(ID BINARY(8), MESSAGE BLOB);\n"
 
77
                       "CREATE UNIQUE CLUSTERED INDEX ID_IND ON SYS_REPLICATION_LOG (ID);\n"
 
78
                       "END;\n"
 
79
                       , FALSE, trx);
 
80
 
 
81
 
 
82
 
 
83
  if (error != DB_SUCCESS)
 
84
  {
 
85
    fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
 
86
 
 
87
    ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
 
88
 
 
89
    fprintf(stderr,
 
90
            "InnoDB: creation failed\n"
 
91
            "InnoDB: tablespace is full\n"
 
92
            "InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
 
93
 
 
94
    row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
 
95
 
 
96
    error = DB_MUST_GET_MORE_FILE_SPACE;
 
97
  }
 
98
 
 
99
  trx_commit_for_mysql(trx);
 
100
 
 
101
  row_mysql_unlock_data_dictionary(trx);
 
102
 
 
103
  trx_free_for_mysql(trx);
 
104
 
 
105
  return(error);
 
106
}
 
107
 
 
108
extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t*    prebuilt);
 
109
 
 
110
ulint insert_replication_message(const char *message, size_t size, 
 
111
                                 trx_t *trx, uint64_t trx_id)
 
112
{
 
113
  ulint error;
 
114
  row_prebuilt_t*       prebuilt;       /* For reading rows */
 
115
  dict_table_t *table;
 
116
  que_thr_t*    thr;
 
117
 
 
118
  table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
 
119
 
 
120
  prebuilt = row_create_prebuilt(table);
 
121
 
 
122
  if (prebuilt->trx != trx) 
 
123
  {
 
124
    row_update_prebuilt_trx(prebuilt, trx);
 
125
  }
 
126
 
 
127
  /* DDL operations create table/drop table call
 
128
   * innobase_commit_low() which will commit the trx
 
129
   * that leaves the operation of committing to the
 
130
   * log in a new trx. If that is the case we need
 
131
   * to keep track and commit the trx later in this
 
132
   * function. 
 
133
   */ 
 
134
  bool is_started= true;
 
135
  if (trx->conc_state == TRX_NOT_STARTED)
 
136
  {
 
137
    is_started= false;
 
138
  }
 
139
 
 
140
  dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
 
141
  dfield_t *dfield;
 
142
  dfield = dtuple_get_nth_field(dtuple, 0);
 
143
 
 
144
  dfield_set_data(dfield, &trx_id, 8);
 
145
 
 
146
  dfield = dtuple_get_nth_field(dtuple, 1);
 
147
  dfield_set_data(dfield, message, size);
 
148
 
 
149
  ins_node_t*   node            = prebuilt->ins_node;
 
150
 
 
151
  thr = que_fork_get_first_thr(prebuilt->ins_graph);
 
152
 
 
153
  if (prebuilt->sql_stat_start) {
 
154
    node->state = INS_NODE_SET_IX_LOCK;
 
155
    prebuilt->sql_stat_start = FALSE;
 
156
  } else {
 
157
    node->state = INS_NODE_ALLOC_ROW_ID;
 
158
  }
 
159
 
 
160
  que_thr_move_to_run_state_for_mysql(thr, trx);
 
161
 
 
162
//run_again:
 
163
  thr->run_node = node;
 
164
  thr->prev_node = node;
 
165
 
 
166
  row_ins_step(thr);
 
167
 
 
168
  error = trx->error_state;
 
169
 
 
170
  que_thr_stop_for_mysql_no_error(thr, trx);
 
171
  row_prebuilt_free(prebuilt, FALSE);
 
172
 
 
173
  if (! is_started)
 
174
  {
 
175
    trx_commit_for_mysql(trx);
 
176
  }
 
177
 
 
178
  return error;
 
179
}
 
180
 
 
181
UNIV_INTERN struct read_replication_state_st *replication_read_init(void)
 
182
{
 
183
  struct read_replication_state_st *state= calloc(1, sizeof(struct read_replication_state_st));
 
184
 
 
185
  mutex_enter(&(dict_sys->mutex));
 
186
 
 
187
  mtr_start(&state->mtr);
 
188
  state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
 
189
  state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
 
190
 
 
191
  mutex_exit(&(dict_sys->mutex));
 
192
 
 
193
  btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
 
194
 
 
195
  return state;
 
196
}
 
197
 
 
198
UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
 
199
{
 
200
  btr_pcur_close(&state->pcur);
 
201
  mtr_commit(&state->mtr);
 
202
  free(state);
 
203
}
 
204
 
 
205
UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
 
206
{
 
207
  struct read_replication_return_st ret;
 
208
  const rec_t *rec;
 
209
 
 
210
  btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
 
211
 
 
212
  rec= btr_pcur_get_rec(&state->pcur);
 
213
 
 
214
  while (btr_pcur_is_on_user_rec(&state->pcur))
 
215
  {
 
216
    const byte* field;
 
217
    ulint len;
 
218
 
 
219
    // Is the row deleted? If so go fetch the next
 
220
    if (rec_get_deleted_flag(rec, 0))
 
221
      continue;
 
222
 
 
223
    // Store transaction id
 
224
    field = rec_get_nth_field_old(rec, 0, &len);
 
225
    ret.id= *(uint64_t *)field;
 
226
 
 
227
    // Handler message
 
228
    field = rec_get_nth_field_old(rec, 3, &len);
 
229
    ret.message= (char *)field;
 
230
    ret.message_length= len;
 
231
 
 
232
    // @todo double check that "field" will continue to be value past this
 
233
    // point.
 
234
    btr_pcur_store_position(&state->pcur, &state->mtr);
 
235
    mtr_commit(&state->mtr);
 
236
 
 
237
    mtr_start(&state->mtr);
 
238
 
 
239
    btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
 
240
 
 
241
    return ret;
 
242
  }
 
243
 
 
244
  /* end of index */
 
245
  memset(&ret, 0, sizeof(ret));
 
246
 
 
247
  return ret;
 
248
}