~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/cursor.cc

  • Committer: Stewart Smith
  • Date: 2010-07-27 00:49:32 UTC
  • mto: (1720.1.1 drizzle)
  • mto: This revision was merged to the branch mainline in revision 1721.
  • Revision ID: stewart@flamingspork.com-20100727004932-basq3vx9szmmbswm
fix storing and manipulating foreign keys in the proto around ALTER TABLE, CREATE TABLE and ALTER TABLE ADD/DROP FOREIGN KEY. We also (mostly) emulate the naming of innodb foreign keys in the upper layer.

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
#include "drizzled/session.h"
37
37
#include "drizzled/sql_base.h"
38
38
#include "drizzled/transaction_services.h"
39
 
#include "drizzled/replication_services.h"
40
39
#include "drizzled/lock.h"
41
40
#include "drizzled/item/int.h"
42
41
#include "drizzled/item/empty_string.h"
44
43
#include "drizzled/message/table.pb.h"
45
44
#include "drizzled/plugin/client.h"
46
45
#include "drizzled/internal/my_sys.h"
47
 
#include "drizzled/transaction_services.h"
 
46
#include "drizzled/plugin/event_observer.h"
48
47
 
49
48
using namespace std;
50
49
 
58
57
               TableShare &share_arg)
59
58
  : table_share(&share_arg), table(0),
60
59
    estimation_rows_to_insert(0), engine(&engine_arg),
61
 
    ref(0), in_range_check_pushed_down(false),
 
60
    ref(0),
62
61
    key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
63
62
    ref_length(sizeof(internal::my_off_t)),
64
63
    inited(NONE),
65
 
    locked(false), implicit_emptied(0),
 
64
    locked(false),
66
65
    next_insert_id(0), insert_id_for_cur_row(0)
67
66
{ }
68
67
 
73
72
}
74
73
 
75
74
 
 
75
/*
 
76
 * @note this only used in
 
77
 * optimizer::QuickRangeSelect::init_ror_merged_scan(bool reuse_handler) as
 
78
 * of the writing of this comment. -Brian
 
79
 */
76
80
Cursor *Cursor::clone(memory::Root *mem_root)
77
81
{
78
 
  Cursor *new_handler= table->s->db_type()->getCursor(*table->s, mem_root);
 
82
  Cursor *new_handler= table->getMutableShare()->db_type()->getCursor(*table->getMutableShare(), mem_root);
79
83
 
80
84
  /*
81
85
    Allocate Cursor->ref here because otherwise ha_open will allocate it
82
86
    on this->table->mem_root and we will not be able to reclaim that memory
83
87
    when the clone Cursor object is destroyed.
84
88
  */
85
 
  if (!(new_handler->ref= (unsigned char*) alloc_root(mem_root, ALIGN_SIZE(ref_length)*2)))
 
89
  if (!(new_handler->ref= (unsigned char*) mem_root->alloc_root(ALIGN_SIZE(ref_length)*2)))
86
90
    return NULL;
87
 
  if (new_handler && !new_handler->ha_open(table,
88
 
                                           table->s->normalized_path.str,
 
91
 
 
92
  TableIdentifier identifier(table->getShare()->getSchemaName(),
 
93
                             table->getShare()->getTableName(),
 
94
                             table->getShare()->getType());
 
95
 
 
96
  if (new_handler && !new_handler->ha_open(identifier,
 
97
                                           table,
 
98
                                           table->getMutableShare()->getNormalizedPath(),
89
99
                                           table->getDBStat(),
90
100
                                           HA_OPEN_IGNORE_IF_LOCKED))
91
101
    return new_handler;
92
102
  return NULL;
93
103
}
94
104
 
95
 
int Cursor::ha_index_init(uint32_t idx, bool sorted)
 
105
/*
 
106
  DESCRIPTION
 
107
    given a buffer with a key value, and a map of keyparts
 
108
    that are present in this value, returns the length of the value
 
109
*/
 
110
uint32_t Cursor::calculate_key_len(uint32_t key_position, key_part_map keypart_map_arg)
 
111
{
 
112
  /* works only with key prefixes */
 
113
  assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
 
114
 
 
115
  const KeyPartInfo *key_part_found= table->getShare()->getKeyInfo(key_position).key_part;
 
116
  const KeyPartInfo *end_key_part_found= key_part_found + table->getShare()->getKeyInfo(key_position).key_parts;
 
117
  uint32_t length= 0;
 
118
 
 
119
  while (key_part_found < end_key_part_found && keypart_map_arg)
 
120
  {
 
121
    length+= key_part_found->store_length;
 
122
    keypart_map_arg >>= 1;
 
123
    key_part_found++;
 
124
  }
 
125
  return length;
 
126
}
 
127
 
 
128
int Cursor::startIndexScan(uint32_t idx, bool sorted)
96
129
{
97
130
  int result;
98
131
  assert(inited == NONE);
99
 
  if (!(result= index_init(idx, sorted)))
 
132
  if (!(result= doStartIndexScan(idx, sorted)))
100
133
    inited=INDEX;
101
134
  end_range= NULL;
102
135
  return result;
103
136
}
104
137
 
105
 
int Cursor::ha_index_end()
 
138
int Cursor::endIndexScan()
106
139
{
107
140
  assert(inited==INDEX);
108
141
  inited=NONE;
109
142
  end_range= NULL;
110
 
  return(index_end());
 
143
  return(doEndIndexScan());
111
144
}
112
145
 
113
 
int Cursor::ha_rnd_init(bool scan)
 
146
int Cursor::startTableScan(bool scan)
114
147
{
115
148
  int result;
116
149
  assert(inited==NONE || (inited==RND && scan));
117
 
  inited= (result= rnd_init(scan)) ? NONE: RND;
 
150
  inited= (result= doStartTableScan(scan)) ? NONE: RND;
118
151
 
119
152
  return result;
120
153
}
121
154
 
122
 
int Cursor::ha_rnd_end()
 
155
int Cursor::endTableScan()
123
156
{
124
157
  assert(inited==RND);
125
158
  inited=NONE;
126
 
  return(rnd_end());
 
159
  return(doEndTableScan());
127
160
}
128
161
 
129
162
int Cursor::ha_index_or_rnd_end()
130
163
{
131
 
  return inited == INDEX ? ha_index_end() : inited == RND ? ha_rnd_end() : 0;
 
164
  return inited == INDEX ? endIndexScan() : inited == RND ? endTableScan() : 0;
132
165
}
133
166
 
134
167
void Cursor::ha_start_bulk_insert(ha_rows rows)
156
189
 
157
190
bool Cursor::has_transactions()
158
191
{
159
 
  return (table->s->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
 
192
  return (table->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
160
193
}
161
194
 
162
 
void Cursor::ha_statistic_increment(ulong system_status_var::*offset) const
 
195
void Cursor::ha_statistic_increment(uint64_t system_status_var::*offset) const
163
196
{
164
197
  status_var_increment(table->in_use->status_var.*offset);
165
198
}
169
202
  return session->getEngineData(engine);
170
203
}
171
204
 
172
 
Session *Cursor::ha_session(void) const
173
 
{
174
 
  assert(!table || !table->in_use || table->in_use == current_session);
175
 
  return (table && table->in_use) ? table->in_use : current_session;
176
 
}
177
 
 
178
 
 
179
205
bool Cursor::is_fatal_error(int error, uint32_t flags)
180
206
{
181
207
  if (!error ||
191
217
uint64_t Cursor::tableSize() { return stats.index_file_length + stats.data_file_length; }
192
218
uint64_t Cursor::rowSize() { return table->getRecordLength() + table->sizeFields(); }
193
219
 
 
220
int Cursor::doOpen(const TableIdentifier &identifier, int mode, uint32_t test_if_locked)
 
221
{
 
222
  return open(identifier.getPath().c_str(), mode, test_if_locked);
 
223
}
 
224
 
194
225
/**
195
226
  Open database-Cursor.
196
227
 
197
228
  Try O_RDONLY if cannot open as O_RDWR
198
229
  Don't wait for locks if not HA_OPEN_WAIT_IF_LOCKED is set
199
230
*/
200
 
int Cursor::ha_open(Table *table_arg, const char *name, int mode,
201
 
                     int test_if_locked)
 
231
int Cursor::ha_open(const TableIdentifier &identifier,
 
232
                    Table *table_arg, const char *name, int mode,
 
233
                    int test_if_locked)
202
234
{
203
235
  int error;
204
236
 
205
237
  table= table_arg;
206
 
  assert(table->s == table_share);
207
 
  assert(alloc_root_inited(&table->mem_root));
 
238
  assert(table->getShare() == table_share);
208
239
 
209
 
  if ((error=open(name, mode, test_if_locked)))
 
240
  assert(identifier.getPath().compare(name) == 0);
 
241
  if ((error= doOpen(identifier, mode, test_if_locked)))
210
242
  {
211
243
    if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
212
244
        (table->db_stat & HA_TRY_READ_ONLY))
213
245
    {
214
246
      table->db_stat|=HA_READ_ONLY;
215
 
      error=open(name,O_RDONLY,test_if_locked);
 
247
      error= doOpen(identifier, O_RDONLY,test_if_locked);
216
248
    }
217
249
  }
218
250
  if (error)
221
253
  }
222
254
  else
223
255
  {
224
 
    if (table->s->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
 
256
    if (table->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
225
257
      table->db_stat|=HA_READ_ONLY;
226
258
    (void) extra(HA_EXTRA_NO_READCHECK);        // Not needed in SQL
227
259
 
228
260
    /* ref is already allocated for us if we're called from Cursor::clone() */
229
 
    if (!ref && !(ref= (unsigned char*) alloc_root(&table->mem_root,
230
 
                                          ALIGN_SIZE(ref_length)*2)))
 
261
    if (!ref && !(ref= (unsigned char*) table->alloc_root(ALIGN_SIZE(ref_length)*2)))
231
262
    {
232
263
      close();
233
264
      error=HA_ERR_OUT_OF_MEM;
239
270
}
240
271
 
241
272
/**
242
 
  one has to use this method when to find
243
 
  random position by record as the plain
244
 
  position() call doesn't work for some
245
 
  handlers for random position
246
 
*/
247
 
 
248
 
int Cursor::rnd_pos_by_record(unsigned char *record)
249
 
{
250
 
  register int error;
251
 
 
252
 
  position(record);
253
 
  if (inited && (error= ha_index_end()))
254
 
    return error;
255
 
  if ((error= ha_rnd_init(false)))
256
 
    return error;
257
 
 
258
 
  return rnd_pos(record, ref);
259
 
}
260
 
 
261
 
/**
262
273
  Read first row (only) from a table.
263
274
 
264
275
  This is never called for InnoDB tables, as these table types
278
289
  if (stats.deleted < 10 || primary_key >= MAX_KEY ||
279
290
      !(table->index_flags(primary_key) & HA_READ_ORDER))
280
291
  {
281
 
    (void) ha_rnd_init(1);
 
292
    (void) startTableScan(1);
282
293
    while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
283
 
    (void) ha_rnd_end();
 
294
    (void) endTableScan();
284
295
  }
285
296
  else
286
297
  {
287
298
    /* Find the first row through the primary key */
288
 
    (void) ha_index_init(primary_key, 0);
 
299
    (void) startIndexScan(primary_key, 0);
289
300
    error=index_first(buf);
290
 
    (void) ha_index_end();
 
301
    (void) endIndexScan();
291
302
  }
292
303
  return error;
293
304
}
533
544
      nr= compute_next_insert_id(nr-1, variables);
534
545
    }
535
546
 
536
 
    if (table->s->next_number_keypart == 0)
 
547
    if (table->getShare()->next_number_keypart == 0)
537
548
    {
538
549
      /* We must defer the appending until "nr" has been possibly truncated */
539
550
      append= true;
651
662
void
652
663
Cursor::setTransactionReadWrite()
653
664
{
654
 
  ResourceContext *resource_context= ha_session()->getResourceContext(engine);
 
665
  ResourceContext *resource_context;
 
666
 
 
667
  /*
 
668
   * If the cursor has not context for execution then there should be no
 
669
   * possible resource to gain (and if there is... then there is a bug such
 
670
   * that in_use should have been set.
 
671
 */
 
672
  if (not table || not table->in_use)
 
673
    return;
 
674
 
 
675
  resource_context= table->in_use->getResourceContext(engine);
655
676
  /*
656
677
    When a storage engine method is called, the transaction must
657
678
    have been started, unless it's a DDL call, for which the
795
816
  {
796
817
    ptrdiff_t ptrdiff= buf - table->record[0];
797
818
    unsigned char *save_record_0= NULL;
798
 
    KEY *key_info= NULL;
799
 
    KEY_PART_INFO *key_part;
800
 
    KEY_PART_INFO *key_part_end= NULL;
 
819
    KeyInfo *key_info= NULL;
 
820
    KeyPartInfo *key_part;
 
821
    KeyPartInfo *key_part_end= NULL;
801
822
 
802
823
    /*
803
824
      key_cmp_if_same() compares table->record[0] against 'key'.
918
939
  range_seq_t seq_it;
919
940
  ha_rows rows, total_rows= 0;
920
941
  uint32_t n_ranges=0;
921
 
  Session *session= current_session;
922
942
 
923
943
  /* Default MRR implementation doesn't need buffer */
924
944
  *bufsz= 0;
926
946
  seq_it= seq->init(seq_init_param, n_ranges, *flags);
927
947
  while (!seq->next(seq_it, &range))
928
948
  {
929
 
    if (unlikely(session->killed != 0))
930
 
      return HA_POS_ERROR;
931
 
 
932
949
    n_ranges++;
933
950
    key_range *min_endp, *max_endp;
934
951
    {
1038
1055
  @param buf             INOUT: memory buffer to be used
1039
1056
 
1040
1057
  @note
1041
 
    One must have called index_init() before calling this function. Several
 
1058
    One must have called doStartIndexScan() before calling this function. Several
1042
1059
    multi_range_read_init() calls may be made in course of one query.
1043
1060
 
1044
1061
    Until WL#2623 is done (see its text, section 3.2), the following will
1053
1070
    The callee consumes all or some fraction of the provided buffer space, and
1054
1071
    sets the HANDLER_BUFFER members accordingly.
1055
1072
    The callee may use the buffer memory until the next multi_range_read_init()
1056
 
    call is made, all records have been read, or until index_end() call is
 
1073
    call is made, all records have been read, or until doEndIndexScan() call is
1057
1074
    made, whichever comes first.
1058
1075
 
1059
1076
  @retval 0  OK
1062
1079
 
1063
1080
int
1064
1081
Cursor::multi_range_read_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
1065
 
                               uint32_t n_ranges, uint32_t mode,
1066
 
                               HANDLER_BUFFER *)
 
1082
                               uint32_t n_ranges, uint32_t mode)
1067
1083
{
1068
1084
  mrr_iter= seq_funcs->init(seq_init_param, n_ranges, mode);
1069
1085
  mrr_funcs= *seq_funcs;
1070
1086
  mrr_is_output_sorted= test(mode & HA_MRR_SORTED);
1071
1087
  mrr_have_range= false;
 
1088
 
1072
1089
  return 0;
1073
1090
}
1074
1091
 
1091
1108
  int result= 0;
1092
1109
  int range_res= 0;
1093
1110
 
1094
 
  if (!mrr_have_range)
 
1111
  if (not mrr_have_range)
1095
1112
  {
1096
1113
    mrr_have_range= true;
1097
1114
    goto start;
1245
1262
int Cursor::compare_key(key_range *range)
1246
1263
{
1247
1264
  int cmp;
1248
 
  if (!range || in_range_check_pushed_down)
 
1265
  if (not range)
1249
1266
    return 0;                                   // No max range
1250
1267
  cmp= key_cmp(range_key_part, range->key, range->length);
1251
1268
  if (!cmp)
1253
1270
  return cmp;
1254
1271
}
1255
1272
 
1256
 
 
1257
 
/*
1258
 
  Same as compare_key() but doesn't check have in_range_check_pushed_down.
1259
 
  This is used by index condition pushdown implementation.
1260
 
*/
1261
 
 
1262
 
int Cursor::compare_key2(key_range *range)
1263
 
{
1264
 
  int cmp;
1265
 
  if (!range)
1266
 
    return 0;                                   // no max range
1267
 
  cmp= key_cmp(range_key_part, range->key, range->length);
1268
 
  if (!cmp)
1269
 
    cmp= key_compare_result_on_equal;
1270
 
  return cmp;
1271
 
}
1272
 
 
1273
1273
int Cursor::index_read_idx_map(unsigned char * buf, uint32_t index,
1274
1274
                                const unsigned char * key,
1275
1275
                                key_part_map keypart_map,
1276
1276
                                enum ha_rkey_function find_flag)
1277
1277
{
1278
1278
  int error, error1;
1279
 
  error= index_init(index, 0);
 
1279
  error= doStartIndexScan(index, 0);
1280
1280
  if (!error)
1281
1281
  {
1282
1282
    error= index_read_map(buf, key, keypart_map, find_flag);
1283
 
    error1= index_end();
 
1283
    error1= doEndIndexScan();
1284
1284
  }
1285
1285
  return error ?  error : error1;
1286
1286
}
1297
1297
                                    const unsigned char *after_record)
1298
1298
{
1299
1299
  TransactionServices &transaction_services= TransactionServices::singleton();
1300
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1301
1300
  Session *const session= table->in_use;
1302
1301
 
1303
 
  if (table->s->tmp_table || not replication_services.isActive())
 
1302
  if (table->getShare()->getType() || not transaction_services.shouldConstructMessages())
1304
1303
    return false;
1305
1304
 
1306
1305
  bool result= false;
1324
1323
    /*
1325
1324
     * This is a total hack because of the code that is
1326
1325
     * in write_record() in sql_insert.cc. During
1327
 
     * a REPLACE statement, a call to ha_write_row() is
1328
 
     * called.  If it fails, then a call to ha_delete_row()
 
1326
     * a REPLACE statement, a call to insertRecord() is
 
1327
     * called.  If it fails, then a call to deleteRecord()
1329
1328
     * is called, followed by a repeat of the original
1330
 
     * call to ha_write_row().  So, log_row_for_replication
 
1329
     * call to insertRecord().  So, log_row_for_replication
1331
1330
     * could be called either once or twice for a REPLACE
1332
1331
     * statement.  The below looks at the values of before_record
1333
1332
     * and after_record to determine which call to this
1451
1450
{
1452
1451
  /* Check that we have called all proper deallocation functions */
1453
1452
  assert((unsigned char*) table->def_read_set.getBitmap() +
1454
 
              table->s->column_bitmap_size ==
 
1453
              table->getShare()->column_bitmap_size ==
1455
1454
              (unsigned char*) table->def_write_set.getBitmap());
1456
 
  assert(table->s->all_set.isSetAll());
 
1455
  assert(table->getShare()->all_set.isSetAll());
1457
1456
  assert(table->key_read == 0);
1458
 
  /* ensure that ha_index_end / ha_rnd_end has been called */
 
1457
  /* ensure that ha_index_end / endTableScan has been called */
1459
1458
  assert(inited == NONE);
1460
1459
  /* Free cache used by filesort */
1461
1460
  table->free_io_cache();
1465
1464
}
1466
1465
 
1467
1466
 
1468
 
int Cursor::ha_write_row(unsigned char *buf)
 
1467
int Cursor::insertRecord(unsigned char *buf)
1469
1468
{
1470
1469
  int error;
1471
1470
 
1476
1475
   * Cursor interface and into the fill_record() method.
1477
1476
   */
1478
1477
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
1478
  {
1479
1479
    table->timestamp_field->set_time();
 
1480
  }
1480
1481
 
1481
1482
  DRIZZLE_INSERT_ROW_START(table_share->getSchemaName(), table_share->getTableName());
1482
1483
  setTransactionReadWrite();
1483
 
  error= write_row(buf);
 
1484
  
 
1485
  if (unlikely(plugin::EventObserver::beforeInsertRecord(*table, buf)))
 
1486
  {
 
1487
    error= ER_EVENT_OBSERVER_PLUGIN;
 
1488
  }
 
1489
  else
 
1490
  {
 
1491
    error= doInsertRecord(buf);
 
1492
    if (unlikely(plugin::EventObserver::afterInsertRecord(*table, buf, error))) 
 
1493
    {
 
1494
      error= ER_EVENT_OBSERVER_PLUGIN;
 
1495
    }
 
1496
  }
 
1497
 
 
1498
  ha_statistic_increment(&system_status_var::ha_write_count);
 
1499
 
1484
1500
  DRIZZLE_INSERT_ROW_DONE(error);
1485
1501
 
1486
1502
  if (unlikely(error))
1495
1511
}
1496
1512
 
1497
1513
 
1498
 
int Cursor::ha_update_row(const unsigned char *old_data, unsigned char *new_data)
 
1514
int Cursor::updateRecord(const unsigned char *old_data, unsigned char *new_data)
1499
1515
{
1500
1516
  int error;
1501
1517
 
1507
1523
 
1508
1524
  DRIZZLE_UPDATE_ROW_START(table_share->getSchemaName(), table_share->getTableName());
1509
1525
  setTransactionReadWrite();
1510
 
  error= update_row(old_data, new_data);
 
1526
  if (unlikely(plugin::EventObserver::beforeUpdateRecord(*table, old_data, new_data)))
 
1527
  {
 
1528
    error= ER_EVENT_OBSERVER_PLUGIN;
 
1529
  }
 
1530
  else
 
1531
  {
 
1532
    if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
1533
    {
 
1534
      table->timestamp_field->set_time();
 
1535
    }
 
1536
 
 
1537
    error= doUpdateRecord(old_data, new_data);
 
1538
    if (unlikely(plugin::EventObserver::afterUpdateRecord(*table, old_data, new_data, error)))
 
1539
    {
 
1540
      error= ER_EVENT_OBSERVER_PLUGIN;
 
1541
    }
 
1542
  }
 
1543
 
 
1544
  ha_statistic_increment(&system_status_var::ha_update_count);
 
1545
 
1511
1546
  DRIZZLE_UPDATE_ROW_DONE(error);
1512
1547
 
1513
1548
  if (unlikely(error))
1521
1556
  return 0;
1522
1557
}
1523
1558
 
1524
 
int Cursor::ha_delete_row(const unsigned char *buf)
 
1559
int Cursor::deleteRecord(const unsigned char *buf)
1525
1560
{
1526
1561
  int error;
1527
1562
 
1528
1563
  DRIZZLE_DELETE_ROW_START(table_share->getSchemaName(), table_share->getTableName());
1529
1564
  setTransactionReadWrite();
1530
 
  error= delete_row(buf);
 
1565
  if (unlikely(plugin::EventObserver::beforeDeleteRecord(*table, buf)))
 
1566
  {
 
1567
    error= ER_EVENT_OBSERVER_PLUGIN;
 
1568
  }
 
1569
  else
 
1570
  {
 
1571
    error= doDeleteRecord(buf);
 
1572
    if (unlikely(plugin::EventObserver::afterDeleteRecord(*table, buf, error)))
 
1573
    {
 
1574
      error= ER_EVENT_OBSERVER_PLUGIN;
 
1575
    }
 
1576
  }
 
1577
 
 
1578
  ha_statistic_increment(&system_status_var::ha_delete_count);
 
1579
 
1531
1580
  DRIZZLE_DELETE_ROW_DONE(error);
1532
1581
 
1533
1582
  if (unlikely(error))