23
23
Handler-calling-functions
26
#include "drizzled/server_includes.h"
27
#include "mysys/hash.h"
30
28
#include "drizzled/error.h"
31
#include "drizzled/field/epoch.h"
32
29
#include "drizzled/gettext.h"
33
#include "drizzled/internal/my_sys.h"
34
#include "drizzled/item/empty_string.h"
35
#include "drizzled/item/int.h"
36
#include "drizzled/lock.h"
37
#include "drizzled/message/table.h"
38
#include "drizzled/my_hash.h"
39
#include "drizzled/optimizer/cost_vector.h"
40
#include "drizzled/plugin/client.h"
41
#include "drizzled/plugin/event_observer.h"
42
#include "drizzled/plugin/storage_engine.h"
43
30
#include "drizzled/probes.h"
31
#include "drizzled/sql_parse.h"
32
#include "drizzled/cost_vect.h"
44
33
#include "drizzled/session.h"
45
34
#include "drizzled/sql_base.h"
46
#include "drizzled/sql_parse.h"
47
#include "drizzled/transaction_services.h"
35
#include "drizzled/replication_services.h"
36
#include "drizzled/lock.h"
37
#include "drizzled/item/int.h"
38
#include "drizzled/item/empty_string.h"
39
#include "drizzled/unireg.h" // for mysql_frm_type
40
#include "drizzled/field/timestamp.h"
41
#include "drizzled/message/table.pb.h"
42
#include "drizzled/plugin/client.h"
49
44
using namespace std;
45
using namespace drizzled;
54
47
/****************************************************************************
55
48
** General Cursor functions
56
49
****************************************************************************/
57
Cursor::Cursor(plugin::StorageEngine &engine_arg,
61
estimation_rows_to_insert(0),
63
key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
64
ref_length(sizeof(internal::my_off_t)),
67
next_insert_id(0), insert_id_for_cur_row(0)
70
50
Cursor::~Cursor(void)
72
52
assert(locked == false);
78
* @note this is only used in
79
* optimizer::QuickRangeSelect::init_ror_merged_scan(bool reuse_handler) as
80
* of the writing of this comment. -Brian
82
Cursor *Cursor::clone(memory::Root *mem_root)
57
Cursor *Cursor::clone(MEM_ROOT *mem_root)
84
Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
59
Cursor *new_handler= table->s->db_type()->getCursor(*table->s, mem_root);
87
62
Allocate Cursor->ref here because otherwise ha_open will allocate it
88
63
on this->table->mem_root and we will not be able to reclaim that memory
89
64
when the clone Cursor object is destroyed.
91
if (!(new_handler->ref= (unsigned char*) mem_root->alloc_root(ALIGN_SIZE(ref_length)*2)))
66
if (!(new_handler->ref= (unsigned char*) alloc_root(mem_root, ALIGN_SIZE(ref_length)*2)))
94
identifier::Table identifier(getTable()->getShare()->getSchemaName(),
95
getTable()->getShare()->getTableName(),
96
getTable()->getShare()->getType());
98
if (new_handler && !new_handler->ha_open(identifier,
99
getTable()->getDBStat(),
68
if (new_handler && !new_handler->ha_open(table,
69
table->s->normalized_path.str,
100
71
HA_OPEN_IGNORE_IF_LOCKED))
101
72
return new_handler;
107
given a buffer with a key value, and a map of keyparts
108
that are present in this value, returns the length of the value
110
uint32_t Cursor::calculate_key_len(uint32_t key_position, key_part_map keypart_map_arg)
112
/* works only with key prefixes */
113
assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
115
const KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
116
const KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
119
while (key_part_found < end_key_part_found && keypart_map_arg)
121
length+= key_part_found->store_length;
122
keypart_map_arg >>= 1;
128
int Cursor::startIndexScan(uint32_t idx, bool sorted)
76
int Cursor::ha_index_init(uint32_t idx, bool sorted)
131
79
assert(inited == NONE);
132
if (!(result= doStartIndexScan(idx, sorted)))
80
if (!(result= index_init(idx, sorted)))
138
int Cursor::endIndexScan()
86
int Cursor::ha_index_end()
140
88
assert(inited==INDEX);
143
return(doEndIndexScan());
146
int Cursor::startTableScan(bool scan)
94
int Cursor::ha_rnd_init(bool scan)
149
97
assert(inited==NONE || (inited==RND && scan));
150
inited= (result= doStartTableScan(scan)) ? NONE: RND;
98
inited= (result= rnd_init(scan)) ? NONE: RND;
155
int Cursor::endTableScan()
103
int Cursor::ha_rnd_end()
157
105
assert(inited==RND);
159
return(doEndTableScan());
162
110
int Cursor::ha_index_or_rnd_end()
164
return inited == INDEX ? endIndexScan() : inited == RND ? endTableScan() : 0;
112
return inited == INDEX ? ha_index_end() : inited == RND ? ha_rnd_end() : 0;
115
plugin::StorageEngine::Table_flags Cursor::ha_table_flags() const
117
return engine->table_flags();
167
120
void Cursor::ha_start_bulk_insert(ha_rows rows)
222
181
Try O_RDONLY if cannot open as O_RDWR
223
182
Don't wait for locks if HA_OPEN_WAIT_IF_LOCKED is not set
225
int Cursor::ha_open(const identifier::Table &identifier,
184
int Cursor::ha_open(Table *table_arg, const char *name, int mode,
231
if ((error= doOpen(identifier, mode, test_if_locked)))
190
assert(table->s == table_share);
191
assert(alloc_root_inited(&table->mem_root));
193
if ((error=open(name,mode,test_if_locked)))
233
195
if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
234
(getTable()->db_stat & HA_TRY_READ_ONLY))
196
(table->db_stat & HA_TRY_READ_ONLY))
236
getTable()->db_stat|=HA_READ_ONLY;
237
error= doOpen(identifier, O_RDONLY,test_if_locked);
198
table->db_stat|=HA_READ_ONLY;
199
error=open(name,O_RDONLY,test_if_locked);
242
errno= error; /* Safeguard */
204
my_errno= error; /* Safeguard */
246
if (getTable()->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
247
getTable()->db_stat|=HA_READ_ONLY;
208
if (table->s->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
209
table->db_stat|=HA_READ_ONLY;
248
210
(void) extra(HA_EXTRA_NO_READCHECK); // Not needed in SQL
250
212
/* ref is already allocated for us if we're called from Cursor::clone() */
251
if (!ref && !(ref= (unsigned char*) getTable()->alloc_root(ALIGN_SIZE(ref_length)*2)))
213
if (!ref && !(ref= (unsigned char*) alloc_root(&table->mem_root,
214
ALIGN_SIZE(ref_length)*2)))
254
217
error=HA_ERR_OUT_OF_MEM;
606
583
@param first_value (OUT) the first value reserved by the Cursor
607
584
@param nb_reserved_values (OUT) how many values the Cursor reserved
586
void Cursor::get_auto_increment(uint64_t ,
589
uint64_t *first_value,
590
uint64_t *nb_reserved_values)
595
(void) extra(HA_EXTRA_KEYREAD);
596
table->mark_columns_used_by_index_no_reset(table->s->next_number_index);
597
index_init(table->s->next_number_index, 1);
598
if (table->s->next_number_keypart == 0)
599
{ // Autoincrement at key-start
600
error=index_last(table->record[1]);
602
MySQL implicitly assumes such method does locking (as MySQL decides to
603
use nr+increment without checking again with the Cursor, in
604
Cursor::update_auto_increment()), so reserves to infinite.
606
*nb_reserved_values= UINT64_MAX;
610
unsigned char key[MAX_KEY_LENGTH];
611
key_copy(key, table->record[0],
612
table->key_info + table->s->next_number_index,
613
table->s->next_number_key_offset);
614
error= index_read_map(table->record[1], key,
615
make_prev_keypart_map(table->s->next_number_keypart),
616
HA_READ_PREFIX_LAST);
618
MySQL needs to call us for next row: assume we are inserting ("a",null)
619
here, we return 3, and next this statement will want to insert
620
("b",null): there is no reason why ("b",3+1) would be the good row to
621
insert: maybe it already exists, maybe 3+1 is too large...
623
*nb_reserved_values= 1;
629
nr= ((uint64_t) table->next_number_field->
630
val_int_offset(table->s->rec_buff_length)+1);
632
(void) extra(HA_EXTRA_NO_KEYREAD);
610
637
void Cursor::ha_release_auto_increment()
677
694
Unfortunately here we can't know for sure if the engine
678
695
has registered the transaction or not, so we must check.
680
if (resource_context->isStarted())
697
if (ha_info->is_started())
682
resource_context->markModifiedData();
699
ha_info->set_trx_read_write();
704
Bulk update row: public interface.
706
@sa Cursor::bulk_update_row()
710
Cursor::ha_bulk_update_row(const unsigned char *old_data, unsigned char *new_data,
711
uint32_t *dup_key_found)
713
mark_trx_read_write();
715
return bulk_update_row(old_data, new_data, dup_key_found);
688
720
Delete all rows: public interface.
690
722
@sa Cursor::delete_all_rows()
694
This is now equivalent to TRUNCATE TABLE.
698
726
Cursor::ha_delete_all_rows()
700
setTransactionReadWrite();
702
int result= delete_all_rows();
707
* Trigger post-truncate notification to plugins...
709
* @todo Make TransactionServices generic to AfterTriggerServices
712
Session *const session= getTable()->in_use;
713
TransactionServices &transaction_services= TransactionServices::singleton();
714
transaction_services.truncateTable(*session, *getTable());
728
mark_trx_read_write();
730
return delete_all_rows();
801
829
Cursor::closeMarkForDelete(const char *name)
803
setTransactionReadWrite();
831
mark_trx_read_write();
805
833
return drop_table(name);
837
Tell the storage engine that it is allowed to "disable transaction" in the
838
Cursor. It is a hint that ACID is not required - it is used in NDB for
839
ALTER Table, for example, when data are copied to temporary table.
840
A storage engine may treat this hint any way it likes. NDB for example
841
starts to commit every now and then automatically.
842
This hint can be safely ignored.
844
int ha_enable_transaction(Session *session, bool on)
848
if ((session->transaction.on= on))
851
Now all storage engines should have transaction handling enabled.
852
But some may have it enabled all the time - "disabling" transactions
853
is an optimization hint that storage engine is free to ignore.
854
So, let's commit an open transaction (if any) now.
856
if (!(error= ha_commit_trans(session, 0)))
857
if (! session->endTransaction(COMMIT))
808
864
int Cursor::index_next_same(unsigned char *buf, const unsigned char *key, uint32_t keylen)
811
867
if (!(error=index_next(buf)))
813
ptrdiff_t ptrdiff= buf - getTable()->getInsertRecord();
869
ptrdiff_t ptrdiff= buf - table->record[0];
814
870
unsigned char *save_record_0= NULL;
815
KeyInfo *key_info= NULL;
816
KeyPartInfo *key_part;
817
KeyPartInfo *key_part_end= NULL;
872
KEY_PART_INFO *key_part;
873
KEY_PART_INFO *key_part_end= NULL;
820
key_cmp_if_same() compares table->getInsertRecord() against 'key'.
821
In parts it uses table->getInsertRecord() directly, in parts it uses
822
field objects with their local pointers into table->getInsertRecord().
823
If 'buf' is distinct from table->getInsertRecord(), we need to move
824
all record references. This is table->getInsertRecord() itself and
876
key_cmp_if_same() compares table->record[0] against 'key'.
877
In parts it uses table->record[0] directly, in parts it uses
878
field objects with their local pointers into table->record[0].
879
If 'buf' is distinct from table->record[0], we need to move
880
all record references. This is table->record[0] itself and
825
881
the field pointers of the fields used in this key.
829
save_record_0= getTable()->getInsertRecord();
830
getTable()->record[0]= buf;
831
key_info= getTable()->key_info + active_index;
885
save_record_0= table->record[0];
886
table->record[0]= buf;
887
key_info= table->key_info + active_index;
832
888
key_part= key_info->key_part;
833
889
key_part_end= key_part + key_info->key_parts;
834
890
for (; key_part < key_part_end; key_part++)
1016
1076
int Cursor::multi_range_read_info(uint32_t keyno, uint32_t n_ranges, uint32_t n_rows,
1017
uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
1077
uint32_t *bufsz, uint32_t *flags, COST_VECT *cost)
1019
1079
*bufsz= 0; /* Default implementation doesn't need a buffer */
1021
1081
*flags |= HA_MRR_USE_DEFAULT_IMPL;
1024
cost->setAvgIOCost(1); /* assume random seeks */
1084
cost->avg_io_cost= 1; /* assume random seeks */
1026
1086
/* Produce the same cost as non-MRR code does */
1027
1087
if (*flags & HA_MRR_INDEX_ONLY)
1028
cost->setIOCount(index_only_read_time(keyno, n_rows));
1088
cost->io_count= index_only_read_time(keyno, n_rows);
1030
cost->setIOCount(read_time(keyno, n_ranges, n_rows));
1090
cost->io_count= read_time(keyno, n_ranges, n_rows);
1331
Same as compare_key() but doesn't check in_range_check_pushed_down.
1332
This is used by index condition pushdown implementation.
1335
int Cursor::compare_key2(key_range *range)
1339
return 0; // no max range
1340
cmp= key_cmp(range_key_part, range->key, range->length);
1342
cmp= key_compare_result_on_equal;
1269
1346
int Cursor::index_read_idx_map(unsigned char * buf, uint32_t index,
1270
1347
const unsigned char * key,
1271
1348
key_part_map keypart_map,
1272
1349
enum ha_rkey_function find_flag)
1274
1351
int error, error1;
1275
error= doStartIndexScan(index, 0);
1352
error= index_init(index, 0);
1278
1355
error= index_read_map(buf, key, keypart_map, find_flag);
1279
error1= doEndIndexScan();
1356
error1= index_end();
1281
1358
return error ? error : error1;
1292
1369
const unsigned char *before_record,
1293
1370
const unsigned char *after_record)
1295
TransactionServices &transaction_services= TransactionServices::singleton();
1372
ReplicationServices &replication_services= ReplicationServices::singleton();
1296
1373
Session *const session= table->in_use;
1298
if (table->getShare()->getType() || not transaction_services.shouldConstructMessages())
1375
if (table->s->tmp_table || ! replication_services.isActive())
1303
1378
switch (session->lex->sql_command)
1305
case SQLCOM_CREATE_TABLE:
1307
* We are in a CREATE TABLE ... SELECT statement
1308
* and the kernel has already created the table
1309
* and put a CreateTableStatement in the active
1310
* Transaction message. Here, we add a new InsertRecord
1311
* to a new Transaction message (because the above
1312
* CREATE TABLE will commit the transaction containing
1315
result= transaction_services.insertRecord(*session, *table);
1317
1380
case SQLCOM_REPLACE:
1318
1381
case SQLCOM_REPLACE_SELECT:
1320
1383
* This is a total hack because of the code that is
1321
1384
* in write_record() in sql_insert.cc. During
1322
* a REPLACE statement, a call to insertRecord() is
1323
* called. If it fails, then a call to deleteRecord()
1385
* a REPLACE statement, a call to ha_write_row() is
1386
* called. If it fails, then a call to ha_delete_row()
1324
1387
* is called, followed by a repeat of the original
1325
* call to insertRecord(). So, log_row_for_replication
1326
* could be called multiple times for a REPLACE
1388
* call to ha_write_row(). So, log_row_for_replication
1389
* could be called either once or twice for a REPLACE
1327
1390
* statement. The below looks at the values of before_record
1328
1391
* and after_record to determine which call to this
1329
1392
* function is for the delete or the insert, since NULL
1337
1400
if (after_record == NULL)
1340
* The storage engine is passed the record in table->record[1]
1341
* as the row to delete (this is the conflicting row), so
1342
* we need to notify TransactionService to use that row.
1344
transaction_services.deleteRecord(*session, *table, true);
1402
replication_services.deleteRecord(session, table);
1346
1404
* We set the "current" statement message to NULL. This triggers
1347
1405
* the replication services component to generate a new statement
1348
1406
* message for the inserted record which will come next.
1350
transaction_services.finalizeStatementMessage(*session->getStatementMessage(), *session);
1408
replication_services.finalizeStatement(*session->getStatementMessage(), session);
1354
1412
if (before_record == NULL)
1355
result= transaction_services.insertRecord(*session, *table);
1413
replication_services.insertRecord(session, table);
1357
transaction_services.updateRecord(*session, *table, before_record, after_record);
1415
replication_services.updateRecord(session, table, before_record, after_record);
1360
1418
case SQLCOM_INSERT:
1361
1419
case SQLCOM_INSERT_SELECT:
1364
1421
* The else block below represents an
1365
1422
* INSERT ... ON DUPLICATE KEY UPDATE that
1369
1426
if (before_record == NULL)
1370
result= transaction_services.insertRecord(*session, *table);
1427
replication_services.insertRecord(session, table);
1372
transaction_services.updateRecord(*session, *table, before_record, after_record);
1429
replication_services.updateRecord(session, table, before_record, after_record);
1375
1432
case SQLCOM_UPDATE:
1376
transaction_services.updateRecord(*session, *table, before_record, after_record);
1433
replication_services.updateRecord(session, table, before_record, after_record);
1379
1436
case SQLCOM_DELETE:
1380
transaction_services.deleteRecord(*session, *table);
1437
replication_services.deleteRecord(session, table);
1389
1446
int Cursor::ha_external_lock(Session *session, int lock_type)
1396
1453
assert(next_insert_id == 0);
1398
if (DRIZZLE_CURSOR_RDLOCK_START_ENABLED() ||
1399
DRIZZLE_CURSOR_WRLOCK_START_ENABLED() ||
1400
DRIZZLE_CURSOR_UNLOCK_START_ENABLED())
1402
if (lock_type == F_RDLCK)
1404
DRIZZLE_CURSOR_RDLOCK_START(getTable()->getShare()->getSchemaName(),
1405
getTable()->getShare()->getTableName());
1407
else if (lock_type == F_WRLCK)
1409
DRIZZLE_CURSOR_WRLOCK_START(getTable()->getShare()->getSchemaName(),
1410
getTable()->getShare()->getTableName());
1412
else if (lock_type == F_UNLCK)
1414
DRIZZLE_CURSOR_UNLOCK_START(getTable()->getShare()->getSchemaName(),
1415
getTable()->getShare()->getTableName());
1420
1456
We cache the table flags if the locking succeeded. Otherwise, we
1421
1457
keep them as they were when they were fetched in ha_open().
1451
1469
int Cursor::ha_reset()
1453
1471
/* Check that we have called all proper deallocation functions */
1454
assert(! getTable()->getShare()->all_set.none());
1455
assert(getTable()->key_read == 0);
1456
/* ensure that ha_index_end / endTableScan has been called */
1472
assert((unsigned char*) table->def_read_set.getBitmap() +
1473
table->s->column_bitmap_size ==
1474
(unsigned char*) table->def_write_set.getBitmap());
1475
assert(table->s->all_set.isSetAll());
1476
assert(table->key_read == 0);
1477
/* ensure that ha_index_end / ha_rnd_end has been called */
1457
1478
assert(inited == NONE);
1458
1479
/* Free cache used by filesort */
1459
getTable()->free_io_cache();
1480
table->free_io_cache();
1460
1481
/* reset the bitmaps to point to defaults */
1461
getTable()->default_column_bitmaps();
1482
table->default_column_bitmaps();
1462
1483
return(reset());
1466
int Cursor::insertRecord(unsigned char *buf)
1487
int Cursor::ha_write_row(unsigned char *buf)
1471
* If we have a timestamp column, update it to the current time
1492
* If we have a timestamp column, update it to the current time
1473
1494
* @TODO Technically, the below two lines can be taken even further out of the
1474
1495
* Cursor interface and into the fill_record() method.
1476
if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
1478
getTable()->timestamp_field->set_time();
1481
DRIZZLE_INSERT_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1482
setTransactionReadWrite();
1484
if (unlikely(plugin::EventObserver::beforeInsertRecord(*getTable(), buf)))
1486
error= ER_EVENT_OBSERVER_PLUGIN;
1490
error= doInsertRecord(buf);
1491
if (unlikely(plugin::EventObserver::afterInsertRecord(*getTable(), buf, error)))
1493
error= ER_EVENT_OBSERVER_PLUGIN;
1497
ha_statistic_increment(&system_status_var::ha_write_count);
1499
DRIZZLE_INSERT_ROW_DONE(error);
1501
if (unlikely(error))
1497
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
1498
table->timestamp_field->set_time();
1500
mark_trx_read_write();
1502
if (unlikely(error= write_row(buf)))
1506
if (unlikely(log_row_for_replication(getTable(), NULL, buf)))
1507
if (unlikely(log_row_for_replication(table, NULL, buf)))
1507
1508
return HA_ERR_RBR_LOGGING_FAILED;
1513
int Cursor::updateRecord(const unsigned char *old_data, unsigned char *new_data)
1514
int Cursor::ha_update_row(const unsigned char *old_data, unsigned char *new_data)
1518
Some storage engines require that the new record is in getInsertRecord()
1519
(and the old record is in getUpdateRecord()).
1519
Some storage engines require that the new record is in record[0]
1520
(and the old record is in record[1]).
1521
assert(new_data == getTable()->getInsertRecord());
1523
DRIZZLE_UPDATE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1524
setTransactionReadWrite();
1525
if (unlikely(plugin::EventObserver::beforeUpdateRecord(*getTable(), old_data, new_data)))
1527
error= ER_EVENT_OBSERVER_PLUGIN;
1531
if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1533
getTable()->timestamp_field->set_time();
1536
error= doUpdateRecord(old_data, new_data);
1537
if (unlikely(plugin::EventObserver::afterUpdateRecord(*getTable(), old_data, new_data, error)))
1539
error= ER_EVENT_OBSERVER_PLUGIN;
1543
ha_statistic_increment(&system_status_var::ha_update_count);
1545
DRIZZLE_UPDATE_ROW_DONE(error);
1547
if (unlikely(error))
1522
assert(new_data == table->record[0]);
1524
mark_trx_read_write();
1526
if (unlikely(error= update_row(old_data, new_data)))
1552
if (unlikely(log_row_for_replication(getTable(), old_data, new_data)))
1531
if (unlikely(log_row_for_replication(table, old_data, new_data)))
1553
1532
return HA_ERR_RBR_LOGGING_FAILED;
1557
TableShare *Cursor::getShare()
1559
return getTable()->getMutableShare();
1562
int Cursor::deleteRecord(const unsigned char *buf)
1537
int Cursor::ha_delete_row(const unsigned char *buf)
1566
DRIZZLE_DELETE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1567
setTransactionReadWrite();
1568
if (unlikely(plugin::EventObserver::beforeDeleteRecord(*getTable(), buf)))
1570
error= ER_EVENT_OBSERVER_PLUGIN;
1574
error= doDeleteRecord(buf);
1575
if (unlikely(plugin::EventObserver::afterDeleteRecord(*getTable(), buf, error)))
1577
error= ER_EVENT_OBSERVER_PLUGIN;
1581
ha_statistic_increment(&system_status_var::ha_delete_count);
1583
DRIZZLE_DELETE_ROW_DONE(error);
1585
if (unlikely(error))
1541
mark_trx_read_write();
1543
if (unlikely(error= delete_row(buf)))
1588
if (unlikely(log_row_for_replication(getTable(), buf, NULL)))
1546
if (unlikely(log_row_for_replication(table, buf, NULL)))
1589
1547
return HA_ERR_RBR_LOGGING_FAILED;
1594
} /* namespace drizzled */