~drizzle-trunk/drizzle/development

641.2.2 by Monty Taylor
InnoDB Plugin 1.0.3
1
/*****************************************************************************
2
3
Copyright (c) 1996, 2009, Innobase Oy. All Rights Reserved.
4
5
This program is free software; you can redistribute it and/or modify it under
6
the terms of the GNU General Public License as published by the Free Software
7
Foundation; version 2 of the License.
8
9
This program is distributed in the hope that it will be useful, but WITHOUT
10
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13
You should have received a copy of the GNU General Public License along with
14
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
15
Place, Suite 330, Boston, MA 02111-1307 USA
16
17
*****************************************************************************/
18
641.1.2 by Monty Taylor
Imported 1.0.1 with clean - with no changes.
19
/******************************************************
Insert into a table

Created 4/20/1996 Heikki Tuuri
*******************************************************/
24
25
#include "row0ins.h"
26
27
#ifdef UNIV_NONINL
28
#include "row0ins.ic"
29
#endif
30
31
#include "dict0dict.h"
32
#include "dict0boot.h"
33
#include "trx0undo.h"
34
#include "btr0btr.h"
35
#include "btr0cur.h"
36
#include "mach0data.h"
37
#include "que0que.h"
38
#include "row0upd.h"
39
#include "row0sel.h"
40
#include "row0row.h"
41
#include "rem0cmp.h"
42
#include "lock0lock.h"
43
#include "log0log.h"
44
#include "eval0eval.h"
45
#include "data0data.h"
46
#include "usr0sess.h"
47
#include "buf0lru.h"
48
49
/* Selector value 1, named for the "previous" direction. NOTE(review): no use
of this macro is visible in this part of the file -- confirm it is still
referenced before relying on it. */
#define	ROW_INS_PREV	1
50
/* Selector value 2, named for the "next" direction. NOTE(review): no use of
this macro is visible in this part of the file -- confirm it is still
referenced before relying on it. */
#define	ROW_INS_NEXT	2
51
52
53
/*************************************************************************
54
Creates an insert node struct. */
55
UNIV_INTERN
56
ins_node_t*
57
ins_node_create(
58
/*============*/
59
					/* out, own: insert node struct */
60
	ulint		ins_type,	/* in: INS_VALUES, ... */
61
	dict_table_t*	table,		/* in: table where to insert */
62
	mem_heap_t*	heap)		/* in: mem heap where created */
63
{
64
	ins_node_t*	node;
65
66
	node = mem_heap_alloc(heap, sizeof(ins_node_t));
67
68
	node->common.type = QUE_NODE_INSERT;
69
70
	node->ins_type = ins_type;
71
72
	node->state = INS_NODE_SET_IX_LOCK;
73
	node->table = table;
74
	node->index = NULL;
75
	node->entry = NULL;
76
77
	node->select = NULL;
78
79
	node->trx_id = ut_dulint_zero;
80
81
	node->entry_sys_heap = mem_heap_create(128);
82
83
	node->magic_n = INS_NODE_MAGIC_N;
84
85
	return(node);
86
}
87
88
/***************************************************************
89
Creates an entry template for each index of a table. */
90
UNIV_INTERN
91
void
92
ins_node_create_entry_list(
93
/*=======================*/
94
	ins_node_t*	node)	/* in: row insert node */
95
{
96
	dict_index_t*	index;
97
	dtuple_t*	entry;
98
99
	ut_ad(node->entry_sys_heap);
100
101
	UT_LIST_INIT(node->entry_list);
102
103
	index = dict_table_get_first_index(node->table);
104
105
	while (index != NULL) {
106
		entry = row_build_index_entry(node->row, NULL, index,
107
					      node->entry_sys_heap);
108
		UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
109
110
		index = dict_table_get_next_index(index);
111
	}
112
}
113
114
/*********************************************************************
115
Adds system field buffers to a row. */
116
static
117
void
118
row_ins_alloc_sys_fields(
119
/*=====================*/
120
	ins_node_t*	node)	/* in: insert node */
121
{
122
	dtuple_t*		row;
123
	dict_table_t*		table;
124
	mem_heap_t*		heap;
125
	const dict_col_t*	col;
126
	dfield_t*		dfield;
127
	byte*			ptr;
128
129
	row = node->row;
130
	table = node->table;
131
	heap = node->entry_sys_heap;
132
133
	ut_ad(row && table && heap);
134
	ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
135
136
	/* 1. Allocate buffer for row id */
137
138
	col = dict_table_get_sys_col(table, DATA_ROW_ID);
139
140
	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
141
142
	ptr = mem_heap_alloc(heap, DATA_ROW_ID_LEN);
143
144
	dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
145
146
	node->row_id_buf = ptr;
147
148
	/* 3. Allocate buffer for trx id */
149
150
	col = dict_table_get_sys_col(table, DATA_TRX_ID);
151
152
	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
153
	ptr = mem_heap_alloc(heap, DATA_TRX_ID_LEN);
154
155
	dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
156
157
	node->trx_id_buf = ptr;
158
159
	/* 4. Allocate buffer for roll ptr */
160
161
	col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
162
163
	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
164
	ptr = mem_heap_alloc(heap, DATA_ROLL_PTR_LEN);
165
166
	dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
167
}
168
169
/*************************************************************************
170
Sets a new row to insert for an INS_DIRECT node. This function is only used
171
if we have constructed the row separately, which is a rare case; this
172
function is quite slow. */
173
UNIV_INTERN
174
void
175
ins_node_set_new_row(
176
/*=================*/
177
	ins_node_t*	node,	/* in: insert node */
178
	dtuple_t*	row)	/* in: new row (or first row) for the node */
179
{
180
	node->state = INS_NODE_SET_IX_LOCK;
181
	node->index = NULL;
182
	node->entry = NULL;
183
184
	node->row = row;
185
186
	mem_heap_empty(node->entry_sys_heap);
187
188
	/* Create templates for index entries */
189
190
	ins_node_create_entry_list(node);
191
192
	/* Allocate from entry_sys_heap buffers for sys fields */
193
194
	row_ins_alloc_sys_fields(node);
195
196
	/* As we allocated a new trx id buf, the trx id should be written
197
	there again: */
198
199
	node->trx_id = ut_dulint_zero;
200
}
201
202
/***********************************************************************
203
Does an insert operation by updating a delete-marked existing record
204
in the index. This situation can occur if the delete-marked record is
205
kept in the index for consistent reads. */
206
static
207
ulint
208
row_ins_sec_index_entry_by_modify(
209
/*==============================*/
210
				/* out: DB_SUCCESS or error code */
211
	ulint		mode,	/* in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
212
				depending on whether mtr holds just a leaf
213
				latch or also a tree latch */
214
	btr_cur_t*	cursor,	/* in: B-tree cursor */
215
	const dtuple_t*	entry,	/* in: index entry to insert */
216
	que_thr_t*	thr,	/* in: query thread */
217
	mtr_t*		mtr)	/* in: mtr; must be committed before
218
				latching any further pages */
219
{
220
	big_rec_t*	dummy_big_rec;
221
	mem_heap_t*	heap;
222
	upd_t*		update;
223
	rec_t*		rec;
224
	ulint		err;
225
226
	rec = btr_cur_get_rec(cursor);
227
228
	ut_ad(!dict_index_is_clust(cursor->index));
229
	ut_ad(rec_get_deleted_flag(rec,
230
				   dict_table_is_comp(cursor->index->table)));
231
232
	/* We know that in the alphabetical ordering, entry and rec are
233
	identified. But in their binary form there may be differences if
234
	there are char fields in them. Therefore we have to calculate the
235
	difference. */
236
237
	heap = mem_heap_create(1024);
238
239
	update = row_upd_build_sec_rec_difference_binary(
240
		cursor->index, entry, rec, thr_get_trx(thr), heap);
241
	if (mode == BTR_MODIFY_LEAF) {
242
		/* Try an optimistic updating of the record, keeping changes
243
		within the page */
244
245
		err = btr_cur_optimistic_update(BTR_KEEP_SYS_FLAG, cursor,
246
						update, 0, thr, mtr);
247
		switch (err) {
248
		case DB_OVERFLOW:
249
		case DB_UNDERFLOW:
250
		case DB_ZIP_OVERFLOW:
251
			err = DB_FAIL;
252
		}
253
	} else {
254
		ut_a(mode == BTR_MODIFY_TREE);
255
		if (buf_LRU_buf_pool_running_out()) {
256
257
			err = DB_LOCK_TABLE_FULL;
258
259
			goto func_exit;
260
		}
261
262
		err = btr_cur_pessimistic_update(BTR_KEEP_SYS_FLAG, cursor,
263
						 &heap, &dummy_big_rec, update,
264
						 0, thr, mtr);
265
		ut_ad(!dummy_big_rec);
266
	}
267
func_exit:
268
	mem_heap_free(heap);
269
270
	return(err);
271
}
272
273
/***********************************************************************
274
Does an insert operation by delete unmarking and updating a delete marked
275
existing record in the index. This situation can occur if the delete marked
276
record is kept in the index for consistent reads. */
277
static
278
ulint
279
row_ins_clust_index_entry_by_modify(
280
/*================================*/
281
				/* out: DB_SUCCESS, DB_FAIL, or error code */
282
	ulint		mode,	/* in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
283
				depending on whether mtr holds just a leaf
284
				latch or also a tree latch */
285
	btr_cur_t*	cursor,	/* in: B-tree cursor */
286
	mem_heap_t**	heap,	/* in/out: pointer to memory heap, or NULL */
287
	big_rec_t**	big_rec,/* out: possible big rec vector of fields
288
				which have to be stored externally by the
289
				caller */
290
	const dtuple_t*	entry,	/* in: index entry to insert */
291
	que_thr_t*	thr,	/* in: query thread */
292
	mtr_t*		mtr)	/* in: mtr; must be committed before
293
				latching any further pages */
294
{
295
	rec_t*		rec;
296
	upd_t*		update;
297
	ulint		err;
298
299
	ut_ad(dict_index_is_clust(cursor->index));
300
301
	*big_rec = NULL;
302
303
	rec = btr_cur_get_rec(cursor);
304
305
	ut_ad(rec_get_deleted_flag(rec,
306
				   dict_table_is_comp(cursor->index->table)));
307
308
	if (!*heap) {
309
		*heap = mem_heap_create(1024);
310
	}
311
312
	/* Build an update vector containing all the fields to be modified;
313
	NOTE that this vector may NOT contain system columns trx_id or
314
	roll_ptr */
315
316
	update = row_upd_build_difference_binary(cursor->index, entry, rec,
317
						 thr_get_trx(thr), *heap);
318
	if (mode == BTR_MODIFY_LEAF) {
319
		/* Try optimistic updating of the record, keeping changes
320
		within the page */
321
322
		err = btr_cur_optimistic_update(0, cursor, update, 0, thr,
323
						mtr);
324
		switch (err) {
325
		case DB_OVERFLOW:
326
		case DB_UNDERFLOW:
327
		case DB_ZIP_OVERFLOW:
328
			err = DB_FAIL;
329
		}
330
	} else {
331
		ut_a(mode == BTR_MODIFY_TREE);
332
		if (buf_LRU_buf_pool_running_out()) {
333
334
			return(DB_LOCK_TABLE_FULL);
335
336
		}
337
		err = btr_cur_pessimistic_update(0, cursor,
338
						 heap, big_rec, update,
339
						 0, thr, mtr);
340
	}
341
342
	return(err);
343
}
344
345
/*************************************************************************
346
Returns TRUE if in a cascaded update/delete an ancestor node of node
347
updates (not DELETE, but UPDATE) table. */
348
static
349
ibool
350
row_ins_cascade_ancestor_updates_table(
351
/*===================================*/
352
				/* out: TRUE if an ancestor updates table */
353
	que_node_t*	node,	/* in: node in a query graph */
354
	dict_table_t*	table)	/* in: table */
355
{
356
	que_node_t*	parent;
357
	upd_node_t*	upd_node;
358
359
	parent = que_node_get_parent(node);
360
361
	while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
362
363
		upd_node = parent;
364
365
		if (upd_node->table == table && upd_node->is_delete == FALSE) {
366
367
			return(TRUE);
368
		}
369
370
		parent = que_node_get_parent(parent);
371
372
		ut_a(parent);
373
	}
374
375
	return(FALSE);
376
}
377
378
/*************************************************************************
379
Returns the number of ancestor UPDATE or DELETE nodes of a
380
cascaded update/delete node. */
381
static
382
ulint
383
row_ins_cascade_n_ancestors(
384
/*========================*/
385
				/* out: number of ancestors */
386
	que_node_t*	node)	/* in: node in a query graph */
387
{
388
	que_node_t*	parent;
389
	ulint		n_ancestors = 0;
390
391
	parent = que_node_get_parent(node);
392
393
	while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
394
		n_ancestors++;
395
396
		parent = que_node_get_parent(parent);
397
398
		ut_a(parent);
399
	}
400
401
	return(n_ancestors);
402
}
403
404
/**********************************************************************
405
Calculates the update vector node->cascade->update for a child table in
406
a cascaded update. */
407
static
408
ulint
409
row_ins_cascade_calc_update_vec(
410
/*============================*/
411
					/* out: number of fields in the
412
					calculated update vector; the value
413
					can also be 0 if no foreign key
414
					fields changed; the returned value
415
					is ULINT_UNDEFINED if the column
416
					type in the child table is too short
417
					to fit the new value in the parent
418
					table: that means the update fails */
419
	upd_node_t*	node,		/* in: update node of the parent
420
					table */
421
	dict_foreign_t*	foreign,	/* in: foreign key constraint whose
422
					type is != 0 */
423
	mem_heap_t*	heap)		/* in: memory heap to use as
424
					temporary storage */
425
{
426
	upd_node_t*	cascade		= node->cascade_node;
427
	dict_table_t*	table		= foreign->foreign_table;
428
	dict_index_t*	index		= foreign->foreign_index;
429
	upd_t*		update;
430
	upd_field_t*	ufield;
431
	dict_table_t*	parent_table;
432
	dict_index_t*	parent_index;
433
	upd_t*		parent_update;
434
	upd_field_t*	parent_ufield;
435
	ulint		n_fields_updated;
436
	ulint		parent_field_no;
437
	ulint		i;
438
	ulint		j;
439
440
	ut_a(node);
441
	ut_a(foreign);
442
	ut_a(cascade);
443
	ut_a(table);
444
	ut_a(index);
445
446
	/* Calculate the appropriate update vector which will set the fields
447
	in the child index record to the same value (possibly padded with
448
	spaces if the column is a fixed length CHAR or FIXBINARY column) as
449
	the referenced index record will get in the update. */
450
451
	parent_table = node->table;
452
	ut_a(parent_table == foreign->referenced_table);
453
	parent_index = foreign->referenced_index;
454
	parent_update = node->update;
455
456
	update = cascade->update;
457
458
	update->info_bits = 0;
459
	update->n_fields = foreign->n_fields;
460
461
	n_fields_updated = 0;
462
463
	for (i = 0; i < foreign->n_fields; i++) {
464
465
		parent_field_no = dict_table_get_nth_col_pos(
466
			parent_table,
467
			dict_index_get_nth_col_no(parent_index, i));
468
469
		for (j = 0; j < parent_update->n_fields; j++) {
470
			parent_ufield = parent_update->fields + j;
471
472
			if (parent_ufield->field_no == parent_field_no) {
473
474
				ulint			min_size;
475
				const dict_col_t*	col;
476
				ulint			ufield_len;
477
478
				col = dict_index_get_nth_col(index, i);
479
480
				/* A field in the parent index record is
481
				updated. Let us make the update vector
482
				field for the child table. */
483
484
				ufield = update->fields + n_fields_updated;
485
486
				ufield->field_no
487
					= dict_table_get_nth_col_pos(
488
					table, dict_col_get_no(col));
489
				ufield->exp = NULL;
490
491
				ufield->new_val = parent_ufield->new_val;
492
				ufield_len = dfield_get_len(&ufield->new_val);
493
494
				/* Clear the "external storage" flag */
495
				dfield_set_len(&ufield->new_val, ufield_len);
496
497
				/* Do not allow a NOT NULL column to be
498
				updated as NULL */
499
500
				if (dfield_is_null(&ufield->new_val)
501
				    && (col->prtype & DATA_NOT_NULL)) {
502
503
					return(ULINT_UNDEFINED);
504
				}
505
506
				/* If the new value would not fit in the
507
				column, do not allow the update */
508
509
				if (!dfield_is_null(&ufield->new_val)
510
				    && dtype_get_at_most_n_mbchars(
511
					col->prtype,
512
					col->mbminlen, col->mbmaxlen,
513
					col->len,
514
					ufield_len,
515
					dfield_get_data(&ufield->new_val))
516
				    < ufield_len) {
517
518
					return(ULINT_UNDEFINED);
519
				}
520
521
				/* If the parent column type has a different
522
				length than the child column type, we may
523
				need to pad with spaces the new value of the
524
				child column */
525
526
				min_size = dict_col_get_min_size(col);
527
528
				/* Because UNIV_SQL_NULL (the marker
529
				of SQL NULL values) exceeds all possible
530
				values of min_size, the test below will
531
				not hold for SQL NULL columns. */
532
533
				if (min_size > ufield_len) {
534
535
					char*		pad_start;
536
					const char*	pad_end;
537
					char*		padded_data
538
						= mem_heap_alloc(
539
							heap, min_size);
540
					pad_start = padded_data + ufield_len;
541
					pad_end = padded_data + min_size;
542
543
					memcpy(padded_data,
544
					       dfield_get_data(&ufield
545
							       ->new_val),
546
					       dfield_get_len(&ufield
547
							      ->new_val));
548
549
					switch (UNIV_EXPECT(col->mbminlen,1)) {
550
					default:
551
						ut_error;
552
						return(ULINT_UNDEFINED);
553
					case 1:
554
						if (UNIV_UNLIKELY
555
						    (dtype_get_charset_coll(
556
							    col->prtype)
557
						     == DATA_MYSQL_BINARY_CHARSET_COLL)) {
558
							/* Do not pad BINARY
559
							columns. */
560
							return(ULINT_UNDEFINED);
561
						}
562
563
						/* space=0x20 */
564
						memset(pad_start, 0x20,
565
						       pad_end - pad_start);
566
						break;
567
					case 2:
568
						/* space=0x0020 */
569
						ut_a(!(ufield_len % 2));
570
						ut_a(!(min_size % 2));
571
						do {
572
							*pad_start++ = 0x00;
573
							*pad_start++ = 0x20;
574
						} while (pad_start < pad_end);
575
						break;
576
					}
577
578
					dfield_set_data(&ufield->new_val,
579
							padded_data, min_size);
580
				}
581
582
				n_fields_updated++;
583
			}
584
		}
585
	}
586
587
	update->n_fields = n_fields_updated;
588
589
	return(n_fields_updated);
590
}
591
592
/*************************************************************************
593
Set detailed error message associated with foreign key errors for
594
the given transaction. */
595
static
596
void
597
row_ins_set_detailed(
598
/*=================*/
599
	trx_t*		trx,		/* in: transaction */
600
	dict_foreign_t*	foreign)	/* in: foreign key constraint */
601
{
602
	mutex_enter(&srv_misc_tmpfile_mutex);
603
	rewind(srv_misc_tmpfile);
604
605
	if (os_file_set_eof(srv_misc_tmpfile)) {
606
		ut_print_name(srv_misc_tmpfile, trx, TRUE,
607
			      foreign->foreign_table_name);
608
		dict_print_info_on_foreign_key_in_create_format(
609
			srv_misc_tmpfile, trx, foreign, FALSE);
610
		trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
611
	} else {
612
		trx_set_detailed_error(trx, "temp file operation failed");
613
	}
614
615
	mutex_exit(&srv_misc_tmpfile_mutex);
616
}
617
618
/*************************************************************************
619
Reports a foreign key error associated with an update or a delete of a
620
parent table index entry. */
621
static
622
void
623
row_ins_foreign_report_err(
624
/*=======================*/
625
	const char*	errstr,		/* in: error string from the viewpoint
626
					of the parent table */
627
	que_thr_t*	thr,		/* in: query thread whose run_node
628
					is an update node */
629
	dict_foreign_t*	foreign,	/* in: foreign key constraint */
630
	const rec_t*	rec,		/* in: a matching index record in the
631
					child table */
632
	const dtuple_t*	entry)		/* in: index entry in the parent
633
					table */
634
{
635
	FILE*	ef	= dict_foreign_err_file;
636
	trx_t*	trx	= thr_get_trx(thr);
637
638
	row_ins_set_detailed(trx, foreign);
639
640
	mutex_enter(&dict_foreign_err_mutex);
641
	rewind(ef);
642
	ut_print_timestamp(ef);
643
	fputs(" Transaction:\n", ef);
644
	trx_print(ef, trx, 600);
645
646
	fputs("Foreign key constraint fails for table ", ef);
647
	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
648
	fputs(":\n", ef);
649
	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
650
							TRUE);
651
	putc('\n', ef);
652
	fputs(errstr, ef);
653
	fputs(" in parent table, in index ", ef);
654
	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
655
	if (entry) {
656
		fputs(" tuple:\n", ef);
657
		dtuple_print(ef, entry);
658
	}
659
	fputs("\nBut in child table ", ef);
660
	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
661
	fputs(", in index ", ef);
662
	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
663
	if (rec) {
664
		fputs(", there is a record:\n", ef);
665
		rec_print(ef, rec, foreign->foreign_index);
666
	} else {
667
		fputs(", the record is not available\n", ef);
668
	}
669
	putc('\n', ef);
670
671
	mutex_exit(&dict_foreign_err_mutex);
672
}
673
674
/*************************************************************************
675
Reports a foreign key error to dict_foreign_err_file when we are trying
676
to add an index entry to a child table. Note that the adding may be the result
677
of an update, too. */
678
static
679
void
680
row_ins_foreign_report_add_err(
681
/*===========================*/
682
	trx_t*		trx,		/* in: transaction */
683
	dict_foreign_t*	foreign,	/* in: foreign key constraint */
684
	const rec_t*	rec,		/* in: a record in the parent table:
685
					it does not match entry because we
686
					have an error! */
687
	const dtuple_t*	entry)		/* in: index entry to insert in the
688
					child table */
689
{
690
	FILE*	ef	= dict_foreign_err_file;
691
692
	row_ins_set_detailed(trx, foreign);
693
694
	mutex_enter(&dict_foreign_err_mutex);
695
	rewind(ef);
696
	ut_print_timestamp(ef);
697
	fputs(" Transaction:\n", ef);
698
	trx_print(ef, trx, 600);
699
	fputs("Foreign key constraint fails for table ", ef);
700
	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
701
	fputs(":\n", ef);
702
	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
703
							TRUE);
704
	fputs("\nTrying to add in child table, in index ", ef);
705
	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
706
	if (entry) {
707
		fputs(" tuple:\n", ef);
708
		/* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
709
		It would be better to only display the user columns. */
710
		dtuple_print(ef, entry);
711
	}
712
	fputs("\nBut in parent table ", ef);
713
	ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
714
	fputs(", in index ", ef);
715
	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
716
	fputs(",\nthe closest match we can find is record:\n", ef);
717
	if (rec && page_rec_is_supremum(rec)) {
718
		/* If the cursor ended on a supremum record, it is better
719
		to report the previous record in the error message, so that
720
		the user gets a more descriptive error message. */
721
		rec = page_rec_get_prev_const(rec);
722
	}
723
724
	if (rec) {
725
		rec_print(ef, rec, foreign->referenced_index);
726
	}
727
	putc('\n', ef);
728
729
	mutex_exit(&dict_foreign_err_mutex);
730
}
731
732
/*************************************************************************
733
Invalidate the query cache for the given table. */
734
static
735
void
736
row_ins_invalidate_query_cache(
737
/*===========================*/
1003.1.19 by Brian Aker
Clean up final code bits (from previous builds).
738
	que_thr_t*	unused,		/* in: query thread whose run_node
641.1.2 by Monty Taylor
Imported 1.0.1 with clean - with no changes.
739
					is an update node */
740
	const char*	name)		/* in: table name prefixed with
741
					database name and a '/' character */
742
{
743
	char*	buf;
744
	char*	ptr;
745
	ulint	len = strlen(name) + 1;
746
1003.1.19 by Brian Aker
Clean up final code bits (from previous builds).
747
        (void)unused;
748
641.1.2 by Monty Taylor
Imported 1.0.1 with clean - with no changes.
749
	buf = mem_strdupl(name, len);
750
751
	ptr = strchr(buf, '/');
752
	ut_a(ptr);
753
	*ptr = '\0';
754
755
	mem_free(buf);
756
}
757
758
/*************************************************************************
759
Perform referential actions or checks when a parent row is deleted or updated
760
and the constraint had an ON DELETE or ON UPDATE condition which was not
761
RESTRICT. */
762
static
763
ulint
764
row_ins_foreign_check_on_constraint(
765
/*================================*/
766
					/* out: DB_SUCCESS, DB_LOCK_WAIT,
767
					or error code */
768
	que_thr_t*	thr,		/* in: query thread whose run_node
769
					is an update node */
770
	dict_foreign_t*	foreign,	/* in: foreign key constraint whose
771
					type is != 0 */
772
	btr_pcur_t*	pcur,		/* in: cursor placed on a matching
773
					index record in the child table */
774
	dtuple_t*	entry,		/* in: index entry in the parent
775
					table */
776
	mtr_t*		mtr)		/* in: mtr holding the latch of pcur
777
					page */
778
{
779
	upd_node_t*	node;
780
	upd_node_t*	cascade;
781
	dict_table_t*	table		= foreign->foreign_table;
782
	dict_index_t*	index;
783
	dict_index_t*	clust_index;
784
	dtuple_t*	ref;
785
	mem_heap_t*	upd_vec_heap	= NULL;
786
	const rec_t*	rec;
787
	const rec_t*	clust_rec;
788
	const buf_block_t* clust_block;
789
	upd_t*		update;
790
	ulint		n_to_update;
791
	ulint		err;
792
	ulint		i;
793
	trx_t*		trx;
794
	mem_heap_t*	tmp_heap	= NULL;
795
796
	ut_a(thr);
797
	ut_a(foreign);
798
	ut_a(pcur);
799
	ut_a(mtr);
800
801
	trx = thr_get_trx(thr);
802
803
	/* Since we are going to delete or update a row, we have to invalidate
804
	the MySQL query cache for table. A deadlock of threads is not possible
805
	here because the caller of this function does not hold any latches with
806
	the sync0sync.h rank above the kernel mutex. The query cache mutex has
807
	a rank just above the kernel mutex. */
808
809
	row_ins_invalidate_query_cache(thr, table->name);
810
811
	node = thr->run_node;
812
813
	if (node->is_delete && 0 == (foreign->type
814
				     & (DICT_FOREIGN_ON_DELETE_CASCADE
815
					| DICT_FOREIGN_ON_DELETE_SET_NULL))) {
816
817
		row_ins_foreign_report_err("Trying to delete",
818
					   thr, foreign,
819
					   btr_pcur_get_rec(pcur), entry);
820
821
		return(DB_ROW_IS_REFERENCED);
822
	}
823
824
	if (!node->is_delete && 0 == (foreign->type
825
				      & (DICT_FOREIGN_ON_UPDATE_CASCADE
826
					 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
827
828
		/* This is an UPDATE */
829
830
		row_ins_foreign_report_err("Trying to update",
831
					   thr, foreign,
832
					   btr_pcur_get_rec(pcur), entry);
833
834
		return(DB_ROW_IS_REFERENCED);
835
	}
836
837
	if (node->cascade_node == NULL) {
838
		/* Extend our query graph by creating a child to current
839
		update node. The child is used in the cascade or set null
840
		operation. */
841
842
		node->cascade_heap = mem_heap_create(128);
843
		node->cascade_node = row_create_update_node_for_mysql(
844
			table, node->cascade_heap);
845
		que_node_set_parent(node->cascade_node, node);
846
	}
847
848
	/* Initialize cascade_node to do the operation we want. Note that we
849
	use the SAME cascade node to do all foreign key operations of the
850
	SQL DELETE: the table of the cascade node may change if there are
851
	several child tables to the table where the delete is done! */
852
853
	cascade = node->cascade_node;
854
855
	cascade->table = table;
856
857
	cascade->foreign = foreign;
858
859
	if (node->is_delete
860
	    && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
861
		cascade->is_delete = TRUE;
862
	} else {
863
		cascade->is_delete = FALSE;
864
865
		if (foreign->n_fields > cascade->update_n_fields) {
866
			/* We have to make the update vector longer */
867
868
			cascade->update = upd_create(foreign->n_fields,
869
						     node->cascade_heap);
870
			cascade->update_n_fields = foreign->n_fields;
871
		}
872
	}
873
874
	/* We do not allow cyclic cascaded updating (DELETE is allowed,
875
	but not UPDATE) of the same table, as this can lead to an infinite
876
	cycle. Check that we are not updating the same table which is
877
	already being modified in this cascade chain. We have to check
878
	this also because the modification of the indexes of a 'parent'
879
	table may still be incomplete, and we must avoid seeing the indexes
880
	of the parent table in an inconsistent state! */
881
882
	if (!cascade->is_delete
883
	    && row_ins_cascade_ancestor_updates_table(cascade, table)) {
884
885
		/* We do not know if this would break foreign key
886
		constraints, but play safe and return an error */
887
888
		err = DB_ROW_IS_REFERENCED;
889
890
		row_ins_foreign_report_err(
891
			"Trying an update, possibly causing a cyclic"
892
			" cascaded update\n"
893
			"in the child table,", thr, foreign,
894
			btr_pcur_get_rec(pcur), entry);
895
896
		goto nonstandard_exit_func;
897
	}
898
899
	if (row_ins_cascade_n_ancestors(cascade) >= 15) {
900
		err = DB_ROW_IS_REFERENCED;
901
902
		row_ins_foreign_report_err(
903
			"Trying a too deep cascaded delete or update\n",
904
			thr, foreign, btr_pcur_get_rec(pcur), entry);
905
906
		goto nonstandard_exit_func;
907
	}
908
909
	index = btr_pcur_get_btr_cur(pcur)->index;
910
911
	ut_a(index == foreign->foreign_index);
912
913
	rec = btr_pcur_get_rec(pcur);
914
915
	if (dict_index_is_clust(index)) {
916
		/* pcur is already positioned in the clustered index of
917
		the child table */
918
919
		clust_index = index;
920
		clust_rec = rec;
921
		clust_block = btr_pcur_get_block(pcur);
922
	} else {
923
		/* We have to look for the record in the clustered index
924
		in the child table */
925
926
		clust_index = dict_table_get_first_index(table);
927
928
		tmp_heap = mem_heap_create(256);
929
930
		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
931
					tmp_heap);
932
		btr_pcur_open_with_no_init(clust_index, ref,
933
					   PAGE_CUR_LE, BTR_SEARCH_LEAF,
934
					   cascade->pcur, 0, mtr);
935
936
		clust_rec = btr_pcur_get_rec(cascade->pcur);
937
		clust_block = btr_pcur_get_block(cascade->pcur);
938
939
		if (!page_rec_is_user_rec(clust_rec)
940
		    || btr_pcur_get_low_match(cascade->pcur)
941
		    < dict_index_get_n_unique(clust_index)) {
942
943
			fputs("InnoDB: error in cascade of a foreign key op\n"
944
			      "InnoDB: ", stderr);
945
			dict_index_name_print(stderr, trx, index);
946
947
			fputs("\n"
948
			      "InnoDB: record ", stderr);
949
			rec_print(stderr, rec, index);
950
			fputs("\n"
951
			      "InnoDB: clustered record ", stderr);
952
			rec_print(stderr, clust_rec, clust_index);
953
			fputs("\n"
954
			      "InnoDB: Submit a detailed bug report to"
955
			      " http://bugs.mysql.com\n", stderr);
956
957
			err = DB_SUCCESS;
958
959
			goto nonstandard_exit_func;
960
		}
961
	}
962
963
	/* Set an X-lock on the row to delete or update in the child table */
964
965
	err = lock_table(0, table, LOCK_IX, thr);
966
967
	if (err == DB_SUCCESS) {
968
		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
969
		we already have a normal shared lock on the appropriate
970
		gap if the search criterion was not unique */
971
972
		err = lock_clust_rec_read_check_and_lock_alt(
973
			0, clust_block, clust_rec, clust_index,
974
			LOCK_X, LOCK_REC_NOT_GAP, thr);
975
	}
976
977
	if (err != DB_SUCCESS) {
978
979
		goto nonstandard_exit_func;
980
	}
981
982
	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
983
		/* This can happen if there is a circular reference of
984
		rows such that cascading delete comes to delete a row
985
		already in the process of being delete marked */
986
		err = DB_SUCCESS;
987
988
		goto nonstandard_exit_func;
989
	}
990
991
	if ((node->is_delete
992
	     && (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL))
993
	    || (!node->is_delete
994
		&& (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
995
996
		/* Build the appropriate update vector which sets
997
		foreign->n_fields first fields in rec to SQL NULL */
998
999
		update = cascade->update;
1000
1001
		update->info_bits = 0;
1002
		update->n_fields = foreign->n_fields;
1003
1004
		for (i = 0; i < foreign->n_fields; i++) {
1005
			upd_field_t*	ufield = &update->fields[i];
1006
1007
			ufield->field_no = dict_table_get_nth_col_pos(
1008
				table,
1009
				dict_index_get_nth_col_no(index, i));
1010
			ufield->orig_len = 0;
1011
			ufield->exp = NULL;
1012
			dfield_set_null(&ufield->new_val);
1013
		}
1014
	}
1015
1016
	if (!node->is_delete
1017
	    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1018
1019
		/* Build the appropriate update vector which sets changing
1020
		foreign->n_fields first fields in rec to new values */
1021
1022
		upd_vec_heap = mem_heap_create(256);
1023
1024
		n_to_update = row_ins_cascade_calc_update_vec(node, foreign,
1025
							      upd_vec_heap);
1026
		if (n_to_update == ULINT_UNDEFINED) {
1027
			err = DB_ROW_IS_REFERENCED;
1028
1029
			row_ins_foreign_report_err(
1030
				"Trying a cascaded update where the"
1031
				" updated value in the child\n"
1032
				"table would not fit in the length"
1033
				" of the column, or the value would\n"
1034
				"be NULL and the column is"
1035
				" declared as not NULL in the child table,",
1036
				thr, foreign, btr_pcur_get_rec(pcur), entry);
1037
1038
			goto nonstandard_exit_func;
1039
		}
1040
1041
		if (cascade->update->n_fields == 0) {
1042
1043
			/* The update does not change any columns referred
1044
			to in this foreign key constraint: no need to do
1045
			anything */
1046
1047
			err = DB_SUCCESS;
1048
1049
			goto nonstandard_exit_func;
1050
		}
1051
	}
1052
1053
	/* Store pcur position and initialize or store the cascade node
1054
	pcur stored position */
1055
1056
	btr_pcur_store_position(pcur, mtr);
1057
1058
	if (index == clust_index) {
1059
		btr_pcur_copy_stored_position(cascade->pcur, pcur);
1060
	} else {
1061
		btr_pcur_store_position(cascade->pcur, mtr);
1062
	}
1063
1064
	mtr_commit(mtr);
1065
1066
	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1067
1068
	cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1069
1070
	err = row_update_cascade_for_mysql(thr, cascade,
1071
					   foreign->foreign_table);
1072
1073
	if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
1074
		fprintf(stderr,
1075
			"InnoDB: error: table %s has the counter 0"
1076
			" though there is\n"
1077
			"InnoDB: a FOREIGN KEY check running on it.\n",
1078
			foreign->foreign_table->name);
1079
	}
1080
1081
	/* Release the data dictionary latch for a while, so that we do not
1082
	starve other threads from doing CREATE TABLE etc. if we have a huge
1083
	cascaded operation running. The counter n_foreign_key_checks_running
1084
	will prevent other users from dropping or ALTERing the table when we
1085
	release the latch. */
1086
1087
	row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1088
	row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1089
1090
	mtr_start(mtr);
1091
1092
	/* Restore pcur position */
1093
1094
	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1095
1096
	if (tmp_heap) {
1097
		mem_heap_free(tmp_heap);
1098
	}
1099
1100
	if (upd_vec_heap) {
1101
		mem_heap_free(upd_vec_heap);
1102
	}
1103
1104
	return(err);
1105
1106
nonstandard_exit_func:
1107
	if (tmp_heap) {
1108
		mem_heap_free(tmp_heap);
1109
	}
1110
1111
	if (upd_vec_heap) {
1112
		mem_heap_free(upd_vec_heap);
1113
	}
1114
1115
	btr_pcur_store_position(pcur, mtr);
1116
1117
	mtr_commit(mtr);
1118
	mtr_start(mtr);
1119
1120
	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1121
1122
	return(err);
1123
}
1124
1125
/*************************************************************************
1126
Sets a shared lock on a record. Used in locking possible duplicate key
1127
records and also in checking foreign key constraints. */
1128
static
1129
ulint
1130
row_ins_set_shared_rec_lock(
1131
/*========================*/
1132
					/* out: DB_SUCCESS or error code */
1133
	ulint			type,	/* in: LOCK_ORDINARY, LOCK_GAP, or
1134
					LOCK_REC_NOT_GAP type lock */
1135
	const buf_block_t*	block,	/* in: buffer block of rec */
1136
	const rec_t*		rec,	/* in: record */
1137
	dict_index_t*		index,	/* in: index */
1138
	const ulint*		offsets,/* in: rec_get_offsets(rec, index) */
1139
	que_thr_t*		thr)	/* in: query thread */
1140
{
1141
	ulint	err;
1142
1143
	ut_ad(rec_offs_validate(rec, index, offsets));
1144
1145
	if (dict_index_is_clust(index)) {
1146
		err = lock_clust_rec_read_check_and_lock(
1147
			0, block, rec, index, offsets, LOCK_S, type, thr);
1148
	} else {
1149
		err = lock_sec_rec_read_check_and_lock(
1150
			0, block, rec, index, offsets, LOCK_S, type, thr);
1151
	}
1152
1153
	return(err);
1154
}
1155
1156
#ifndef UNIV_HOTBACKUP
1157
/*************************************************************************
1158
Sets a exclusive lock on a record. Used in locking possible duplicate key
1159
records */
1160
static
1161
ulint
1162
row_ins_set_exclusive_rec_lock(
1163
/*===========================*/
1164
					/* out: DB_SUCCESS or error code */
1165
	ulint			type,	/* in: LOCK_ORDINARY, LOCK_GAP, or
1166
					LOCK_REC_NOT_GAP type lock */
1167
	const buf_block_t*	block,	/* in: buffer block of rec */
1168
	const rec_t*		rec,	/* in: record */
1169
	dict_index_t*		index,	/* in: index */
1170
	const ulint*		offsets,/* in: rec_get_offsets(rec, index) */
1171
	que_thr_t*		thr)	/* in: query thread */
1172
{
1173
	ulint	err;
1174
1175
	ut_ad(rec_offs_validate(rec, index, offsets));
1176
1177
	if (dict_index_is_clust(index)) {
1178
		err = lock_clust_rec_read_check_and_lock(
1179
			0, block, rec, index, offsets, LOCK_X, type, thr);
1180
	} else {
1181
		err = lock_sec_rec_read_check_and_lock(
1182
			0, block, rec, index, offsets, LOCK_X, type, thr);
1183
	}
1184
1185
	return(err);
1186
}
1187
#endif /* !UNIV_HOTBACKUP */
1188
1189
/*******************************************************************
1190
Checks if foreign key constraint fails for an index entry. Sets shared locks
1191
which lock either the success or the failure of the constraint. NOTE that
1192
the caller must have a shared latch on dict_operation_lock. */
1193
UNIV_INTERN
1194
ulint
1195
row_ins_check_foreign_constraint(
1196
/*=============================*/
1197
				/* out: DB_SUCCESS,
1198
				DB_NO_REFERENCED_ROW,
1199
				or DB_ROW_IS_REFERENCED */
1200
	ibool		check_ref,/* in: TRUE if we want to check that
1201
				the referenced table is ok, FALSE if we
1202
				want to to check the foreign key table */
1203
	dict_foreign_t*	foreign,/* in: foreign constraint; NOTE that the
1204
				tables mentioned in it must be in the
1205
				dictionary cache if they exist at all */
1206
	dict_table_t*	table,	/* in: if check_ref is TRUE, then the foreign
1207
				table, else the referenced table */
1208
	dtuple_t*	entry,	/* in: index entry for index */
1209
	que_thr_t*	thr)	/* in: query thread */
1210
{
1211
	upd_node_t*	upd_node;
1212
	dict_table_t*	check_table;
1213
	dict_index_t*	check_index;
1214
	ulint		n_fields_cmp;
1215
	btr_pcur_t	pcur;
1216
	ibool		moved;
1217
	int		cmp;
1218
	ulint		err;
1219
	ulint		i;
1220
	mtr_t		mtr;
1221
	trx_t*		trx		= thr_get_trx(thr);
1222
	mem_heap_t*	heap		= NULL;
1223
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1224
	ulint*		offsets		= offsets_;
1225
	rec_offs_init(offsets_);
1226
1227
run_again:
1228
#ifdef UNIV_SYNC_DEBUG
1229
	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
1230
#endif /* UNIV_SYNC_DEBUG */
1231
1232
	err = DB_SUCCESS;
1233
1234
	if (trx->check_foreigns == FALSE) {
1235
		/* The user has suppressed foreign key checks currently for
1236
		this session */
1237
		goto exit_func;
1238
	}
1239
1240
	/* If any of the foreign key fields in entry is SQL NULL, we
1241
	suppress the foreign key check: this is compatible with Oracle,
1242
	for example */
1243
1244
	for (i = 0; i < foreign->n_fields; i++) {
1245
		if (UNIV_SQL_NULL == dfield_get_len(
1246
			    dtuple_get_nth_field(entry, i))) {
1247
1248
			goto exit_func;
1249
		}
1250
	}
1251
1252
	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1253
		upd_node = thr->run_node;
1254
1255
		if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1256
			/* If a cascaded update is done as defined by a
1257
			foreign key constraint, do not check that
1258
			constraint for the child row. In ON UPDATE CASCADE
1259
			the update of the parent row is only half done when
1260
			we come here: if we would check the constraint here
1261
			for the child row it would fail.
1262
1263
			A QUESTION remains: if in the child table there are
1264
			several constraints which refer to the same parent
1265
			table, we should merge all updates to the child as
1266
			one update? And the updates can be contradictory!
1267
			Currently we just perform the update associated
1268
			with each foreign key constraint, one after
1269
			another, and the user has problems predicting in
1270
			which order they are performed. */
1271
1272
			goto exit_func;
1273
		}
1274
	}
1275
1276
	if (check_ref) {
1277
		check_table = foreign->referenced_table;
1278
		check_index = foreign->referenced_index;
1279
	} else {
1280
		check_table = foreign->foreign_table;
1281
		check_index = foreign->foreign_index;
1282
	}
1283
1284
	if (check_table == NULL || check_table->ibd_file_missing) {
1285
		if (check_ref) {
1286
			FILE*	ef = dict_foreign_err_file;
1287
1288
			row_ins_set_detailed(trx, foreign);
1289
1290
			mutex_enter(&dict_foreign_err_mutex);
1291
			rewind(ef);
1292
			ut_print_timestamp(ef);
1293
			fputs(" Transaction:\n", ef);
1294
			trx_print(ef, trx, 600);
1295
			fputs("Foreign key constraint fails for table ", ef);
1296
			ut_print_name(ef, trx, TRUE,
1297
				      foreign->foreign_table_name);
1298
			fputs(":\n", ef);
1299
			dict_print_info_on_foreign_key_in_create_format(
1300
				ef, trx, foreign, TRUE);
1301
			fputs("\nTrying to add to index ", ef);
1302
			ut_print_name(ef, trx, FALSE,
1303
				      foreign->foreign_index->name);
1304
			fputs(" tuple:\n", ef);
1305
			dtuple_print(ef, entry);
1306
			fputs("\nBut the parent table ", ef);
1307
			ut_print_name(ef, trx, TRUE,
1308
				      foreign->referenced_table_name);
1309
			fputs("\nor its .ibd file does"
1310
			      " not currently exist!\n", ef);
1311
			mutex_exit(&dict_foreign_err_mutex);
1312
1313
			err = DB_NO_REFERENCED_ROW;
1314
		}
1315
1316
		goto exit_func;
1317
	}
1318
1319
	ut_a(check_table);
1320
	ut_a(check_index);
1321
1322
	if (check_table != table) {
1323
		/* We already have a LOCK_IX on table, but not necessarily
1324
		on check_table */
1325
1326
		err = lock_table(0, check_table, LOCK_IS, thr);
1327
1328
		if (err != DB_SUCCESS) {
1329
1330
			goto do_possible_lock_wait;
1331
		}
1332
	}
1333
1334
	mtr_start(&mtr);
1335
1336
	/* Store old value on n_fields_cmp */
1337
1338
	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1339
1340
	dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1341
1342
	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1343
		      BTR_SEARCH_LEAF, &pcur, &mtr);
1344
1345
	/* Scan index records and check if there is a matching record */
1346
1347
	for (;;) {
1348
		const rec_t*		rec = btr_pcur_get_rec(&pcur);
1349
		const buf_block_t*	block = btr_pcur_get_block(&pcur);
1350
1351
		if (page_rec_is_infimum(rec)) {
1352
1353
			goto next_rec;
1354
		}
1355
1356
		offsets = rec_get_offsets(rec, check_index,
1357
					  offsets, ULINT_UNDEFINED, &heap);
1358
1359
		if (page_rec_is_supremum(rec)) {
1360
1361
			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1362
							  rec, check_index,
1363
							  offsets, thr);
1364
			if (err != DB_SUCCESS) {
1365
1366
				break;
1367
			}
1368
1369
			goto next_rec;
1370
		}
1371
1372
		cmp = cmp_dtuple_rec(entry, rec, offsets);
1373
1374
		if (cmp == 0) {
1375
			if (rec_get_deleted_flag(rec,
1376
						 rec_offs_comp(offsets))) {
1377
				err = row_ins_set_shared_rec_lock(
1378
					LOCK_ORDINARY, block,
1379
					rec, check_index, offsets, thr);
1380
				if (err != DB_SUCCESS) {
1381
1382
					break;
1383
				}
1384
			} else {
1385
				/* Found a matching record. Lock only
1386
				a record because we can allow inserts
1387
				into gaps */
1388
1389
				err = row_ins_set_shared_rec_lock(
1390
					LOCK_REC_NOT_GAP, block,
1391
					rec, check_index, offsets, thr);
1392
1393
				if (err != DB_SUCCESS) {
1394
1395
					break;
1396
				}
1397
1398
				if (check_ref) {
1399
					err = DB_SUCCESS;
1400
1401
					break;
1402
				} else if (foreign->type != 0) {
1403
					/* There is an ON UPDATE or ON DELETE
1404
					condition: check them in a separate
1405
					function */
1406
1407
					err = row_ins_foreign_check_on_constraint(
1408
						thr, foreign, &pcur, entry,
1409
						&mtr);
1410
					if (err != DB_SUCCESS) {
1411
						/* Since reporting a plain
1412
						"duplicate key" error
1413
						message to the user in
1414
						cases where a long CASCADE
1415
						operation would lead to a
1416
						duplicate key in some
1417
						other table is very
1418
						confusing, map duplicate
1419
						key errors resulting from
1420
						FK constraints to a
1421
						separate error code. */
1422
1423
						if (err == DB_DUPLICATE_KEY) {
1424
							err = DB_FOREIGN_DUPLICATE_KEY;
1425
						}
1426
1427
						break;
1428
					}
1429
1430
					/* row_ins_foreign_check_on_constraint
1431
					may have repositioned pcur on a
1432
					different block */
1433
					block = btr_pcur_get_block(&pcur);
1434
				} else {
1435
					row_ins_foreign_report_err(
1436
						"Trying to delete or update",
1437
						thr, foreign, rec, entry);
1438
1439
					err = DB_ROW_IS_REFERENCED;
1440
					break;
1441
				}
1442
			}
1443
		}
1444
1445
		if (cmp < 0) {
1446
			err = row_ins_set_shared_rec_lock(
1447
				LOCK_GAP, block,
1448
				rec, check_index, offsets, thr);
1449
			if (err != DB_SUCCESS) {
1450
1451
				break;
1452
			}
1453
1454
			if (check_ref) {
1455
				err = DB_NO_REFERENCED_ROW;
1456
				row_ins_foreign_report_add_err(
1457
					trx, foreign, rec, entry);
1458
			} else {
1459
				err = DB_SUCCESS;
1460
			}
1461
1462
			break;
1463
		}
1464
1465
		ut_a(cmp == 0);
1466
next_rec:
1467
		moved = btr_pcur_move_to_next(&pcur, &mtr);
1468
1469
		if (!moved) {
1470
			if (check_ref) {
1471
				rec = btr_pcur_get_rec(&pcur);
1472
				row_ins_foreign_report_add_err(
1473
					trx, foreign, rec, entry);
1474
				err = DB_NO_REFERENCED_ROW;
1475
			} else {
1476
				err = DB_SUCCESS;
1477
			}
1478
1479
			break;
1480
		}
1481
	}
1482
1483
	btr_pcur_close(&pcur);
1484
1485
	mtr_commit(&mtr);
1486
1487
	/* Restore old value */
1488
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1489
1490
do_possible_lock_wait:
1491
	if (err == DB_LOCK_WAIT) {
1492
		trx->error_state = err;
1493
1494
		que_thr_stop_for_mysql(thr);
1495
1496
		srv_suspend_mysql_thread(thr);
1497
1498
		if (trx->error_state == DB_SUCCESS) {
1499
1500
			goto run_again;
1501
		}
1502
1503
		err = trx->error_state;
1504
	}
1505
1506
exit_func:
1507
	if (UNIV_LIKELY_NULL(heap)) {
1508
		mem_heap_free(heap);
1509
	}
1510
	return(err);
1511
}
1512
1513
/*******************************************************************
1514
Checks if foreign key constraints fail for an index entry. If index
1515
is not mentioned in any constraint, this function does nothing,
1516
Otherwise does searches to the indexes of referenced tables and
1517
sets shared locks which lock either the success or the failure of
1518
a constraint. */
1519
static
1520
ulint
1521
row_ins_check_foreign_constraints(
1522
/*==============================*/
1523
				/* out: DB_SUCCESS or error code */
1524
	dict_table_t*	table,	/* in: table */
1525
	dict_index_t*	index,	/* in: index */
1526
	dtuple_t*	entry,	/* in: index entry for index */
1527
	que_thr_t*	thr)	/* in: query thread */
1528
{
1529
	dict_foreign_t*	foreign;
1530
	ulint		err;
1531
	trx_t*		trx;
1532
	ibool		got_s_lock	= FALSE;
1533
1534
	trx = thr_get_trx(thr);
1535
1536
	foreign = UT_LIST_GET_FIRST(table->foreign_list);
1537
1538
	while (foreign) {
1539
		if (foreign->foreign_index == index) {
1540
1541
			if (foreign->referenced_table == NULL) {
1542
				dict_table_get(foreign->referenced_table_name,
1543
					       FALSE);
1544
			}
1545
1546
			if (0 == trx->dict_operation_lock_mode) {
1547
				got_s_lock = TRUE;
1548
1549
				row_mysql_freeze_data_dictionary(trx);
1550
			}
1551
1552
			if (foreign->referenced_table) {
1553
				mutex_enter(&(dict_sys->mutex));
1554
1555
				(foreign->referenced_table
1556
				 ->n_foreign_key_checks_running)++;
1557
1558
				mutex_exit(&(dict_sys->mutex));
1559
			}
1560
1561
			/* NOTE that if the thread ends up waiting for a lock
1562
			we will release dict_operation_lock temporarily!
1563
			But the counter on the table protects the referenced
1564
			table from being dropped while the check is running. */
1565
1566
			err = row_ins_check_foreign_constraint(
1567
				TRUE, foreign, table, entry, thr);
1568
1569
			if (foreign->referenced_table) {
1570
				mutex_enter(&(dict_sys->mutex));
1571
1572
				ut_a(foreign->referenced_table
1573
				     ->n_foreign_key_checks_running > 0);
1574
				(foreign->referenced_table
1575
				 ->n_foreign_key_checks_running)--;
1576
1577
				mutex_exit(&(dict_sys->mutex));
1578
			}
1579
1580
			if (got_s_lock) {
1581
				row_mysql_unfreeze_data_dictionary(trx);
1582
			}
1583
1584
			if (err != DB_SUCCESS) {
1585
				return(err);
1586
			}
1587
		}
1588
1589
		foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
1590
	}
1591
1592
	return(DB_SUCCESS);
1593
}
1594
1595
#ifndef UNIV_HOTBACKUP
1596
/*******************************************************************
1597
Checks if a unique key violation to rec would occur at the index entry
1598
insert. */
1599
static
1600
ibool
1601
row_ins_dupl_error_with_rec(
1602
/*========================*/
1603
				/* out: TRUE if error */
1604
	const rec_t*	rec,	/* in: user record; NOTE that we assume
1605
				that the caller already has a record lock on
1606
				the record! */
1607
	const dtuple_t*	entry,	/* in: entry to insert */
1608
	dict_index_t*	index,	/* in: index */
1609
	const ulint*	offsets)/* in: rec_get_offsets(rec, index) */
1610
{
1611
	ulint	matched_fields;
1612
	ulint	matched_bytes;
1613
	ulint	n_unique;
1614
	ulint	i;
1615
1616
	ut_ad(rec_offs_validate(rec, index, offsets));
1617
1618
	n_unique = dict_index_get_n_unique(index);
1619
1620
	matched_fields = 0;
1621
	matched_bytes = 0;
1622
1623
	cmp_dtuple_rec_with_match(entry, rec, offsets,
1624
				  &matched_fields, &matched_bytes);
1625
1626
	if (matched_fields < n_unique) {
1627
1628
		return(FALSE);
1629
	}
1630
1631
	/* In a unique secondary index we allow equal key values if they
1632
	contain SQL NULLs */
1633
1634
	if (!dict_index_is_clust(index)) {
1635
1636
		for (i = 0; i < n_unique; i++) {
1637
			if (UNIV_SQL_NULL == dfield_get_len(
1638
				    dtuple_get_nth_field(entry, i))) {
1639
1640
				return(FALSE);
1641
			}
1642
		}
1643
	}
1644
1645
	return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1646
}
1647
#endif /* !UNIV_HOTBACKUP */
1648
1649
/*******************************************************************
1650
Scans a unique non-clustered index at a given index entry to determine
1651
whether a uniqueness violation has occurred for the key value of the entry.
1652
Set shared locks on possible duplicate records. */
1653
static
1654
ulint
1655
row_ins_scan_sec_index_for_duplicate(
1656
/*=================================*/
1657
				/* out: DB_SUCCESS, DB_DUPLICATE_KEY, or
1658
				DB_LOCK_WAIT */
1659
	dict_index_t*	index,	/* in: non-clustered unique index */
1660
	dtuple_t*	entry,	/* in: index entry */
1661
	que_thr_t*	thr)	/* in: query thread */
1662
{
1663
#ifndef UNIV_HOTBACKUP
1664
	ulint		n_unique;
1665
	ulint		i;
1666
	int		cmp;
1667
	ulint		n_fields_cmp;
1668
	btr_pcur_t	pcur;
1669
	ulint		err		= DB_SUCCESS;
1670
	unsigned	allow_duplicates;
1671
	mtr_t		mtr;
1672
	mem_heap_t*	heap		= NULL;
1673
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1674
	ulint*		offsets		= offsets_;
1675
	rec_offs_init(offsets_);
1676
1677
	n_unique = dict_index_get_n_unique(index);
1678
1679
	/* If the secondary index is unique, but one of the fields in the
1680
	n_unique first fields is NULL, a unique key violation cannot occur,
1681
	since we define NULL != NULL in this case */
1682
1683
	for (i = 0; i < n_unique; i++) {
1684
		if (UNIV_SQL_NULL == dfield_get_len(
1685
			    dtuple_get_nth_field(entry, i))) {
1686
1687
			return(DB_SUCCESS);
1688
		}
1689
	}
1690
1691
	mtr_start(&mtr);
1692
1693
	/* Store old value on n_fields_cmp */
1694
1695
	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1696
1697
	dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
1698
1699
	btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
1700
1701
	allow_duplicates = thr_get_trx(thr)->duplicates & TRX_DUP_IGNORE;
1702
1703
	/* Scan index records and check if there is a duplicate */
1704
1705
	do {
1706
		const rec_t*		rec	= btr_pcur_get_rec(&pcur);
1707
		const buf_block_t*	block	= btr_pcur_get_block(&pcur);
1708
1709
		if (page_rec_is_infimum(rec)) {
1710
1711
			continue;
1712
		}
1713
1714
		offsets = rec_get_offsets(rec, index, offsets,
1715
					  ULINT_UNDEFINED, &heap);
1716
1717
		if (allow_duplicates) {
1718
1719
			/* If the SQL-query will update or replace
1720
			duplicate key we will take X-lock for
1721
			duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1722
			INSERT ON DUPLICATE KEY UPDATE). */
1723
1724
			err = row_ins_set_exclusive_rec_lock(
1725
				LOCK_ORDINARY, block,
1726
				rec, index, offsets, thr);
1727
		} else {
1728
1729
			err = row_ins_set_shared_rec_lock(
1730
				LOCK_ORDINARY, block,
1731
				rec, index, offsets, thr);
1732
		}
1733
1734
		if (err != DB_SUCCESS) {
1735
1736
			break;
1737
		}
1738
1739
		if (page_rec_is_supremum(rec)) {
1740
1741
			continue;
1742
		}
1743
1744
		cmp = cmp_dtuple_rec(entry, rec, offsets);
1745
1746
		if (cmp == 0) {
1747
			if (row_ins_dupl_error_with_rec(rec, entry,
1748
							index, offsets)) {
1749
				err = DB_DUPLICATE_KEY;
1750
1751
				thr_get_trx(thr)->error_info = index;
1752
1753
				break;
1754
			}
1755
		}
1756
1757
		if (cmp < 0) {
1758
			break;
1759
		}
1760
1761
		ut_a(cmp == 0);
1762
	} while (btr_pcur_move_to_next(&pcur, &mtr));
1763
1764
	if (UNIV_LIKELY_NULL(heap)) {
1765
		mem_heap_free(heap);
1766
	}
1767
	mtr_commit(&mtr);
1768
1769
	/* Restore old value */
1770
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1771
1772
	return(err);
1773
#else /* UNIV_HOTBACKUP */
1774
	/* This function depends on MySQL code that is not included in
1775
	InnoDB Hot Backup builds.  Besides, this function should never
1776
	be called in InnoDB Hot Backup. */
1777
	ut_error;
1778
	return(DB_FAIL);
1779
#endif /* UNIV_HOTBACKUP */
1780
}
1781
1782
/*******************************************************************
1783
Checks if a unique key violation error would occur at an index entry
1784
insert. Sets shared locks on possible duplicate records. Works only
1785
for a clustered index! */
1786
static
1787
ulint
1788
row_ins_duplicate_error_in_clust(
1789
/*=============================*/
1790
				/* out: DB_SUCCESS if no error,
1791
				DB_DUPLICATE_KEY if error, DB_LOCK_WAIT if we
1792
				have to wait for a lock on a possible
1793
				duplicate record */
1794
	btr_cur_t*	cursor,	/* in: B-tree cursor */
1795
	dtuple_t*	entry,	/* in: entry to insert */
1796
	que_thr_t*	thr,	/* in: query thread */
1797
	mtr_t*		mtr)	/* in: mtr */
1798
{
1799
#ifndef UNIV_HOTBACKUP
1800
	ulint	err;
1801
	rec_t*	rec;
1802
	ulint	n_unique;
1803
	trx_t*	trx		= thr_get_trx(thr);
1804
	mem_heap_t*heap		= NULL;
1805
	ulint	offsets_[REC_OFFS_NORMAL_SIZE];
1806
	ulint*	offsets		= offsets_;
1807
	rec_offs_init(offsets_);
1808
1809
	UT_NOT_USED(mtr);
1810
1811
	ut_a(dict_index_is_clust(cursor->index));
1812
	ut_ad(dict_index_is_unique(cursor->index));
1813
1814
	/* NOTE: For unique non-clustered indexes there may be any number
1815
	of delete marked records with the same value for the non-clustered
1816
	index key (remember multiversioning), and which differ only in
1817
	the row refererence part of the index record, containing the
1818
	clustered index key fields. For such a secondary index record,
1819
	to avoid race condition, we must FIRST do the insertion and after
1820
	that check that the uniqueness condition is not breached! */
1821
1822
	/* NOTE: A problem is that in the B-tree node pointers on an
1823
	upper level may match more to the entry than the actual existing
1824
	user records on the leaf level. So, even if low_match would suggest
1825
	that a duplicate key violation may occur, this may not be the case. */
1826
1827
	n_unique = dict_index_get_n_unique(cursor->index);
1828
1829
	if (cursor->low_match >= n_unique) {
1830
1831
		rec = btr_cur_get_rec(cursor);
1832
1833
		if (!page_rec_is_infimum(rec)) {
1834
			offsets = rec_get_offsets(rec, cursor->index, offsets,
1835
						  ULINT_UNDEFINED, &heap);
1836
1837
			/* We set a lock on the possible duplicate: this
1838
			is needed in logical logging of MySQL to make
1839
			sure that in roll-forward we get the same duplicate
1840
			errors as in original execution */
1841
1842
			if (trx->duplicates & TRX_DUP_IGNORE) {
1843
1844
				/* If the SQL-query will update or replace
1845
				duplicate key we will take X-lock for
1846
				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1847
				INSERT ON DUPLICATE KEY UPDATE). */
1848
1849
				err = row_ins_set_exclusive_rec_lock(
1850
					LOCK_REC_NOT_GAP,
1851
					btr_cur_get_block(cursor),
1852
					rec, cursor->index, offsets, thr);
1853
			} else {
1854
1855
				err = row_ins_set_shared_rec_lock(
1856
					LOCK_REC_NOT_GAP,
1857
					btr_cur_get_block(cursor), rec,
1858
					cursor->index, offsets, thr);
1859
			}
1860
1861
			if (err != DB_SUCCESS) {
1862
				goto func_exit;
1863
			}
1864
1865
			if (row_ins_dupl_error_with_rec(
1866
				    rec, entry, cursor->index, offsets)) {
1867
				trx->error_info = cursor->index;
1868
				err = DB_DUPLICATE_KEY;
1869
				goto func_exit;
1870
			}
1871
		}
1872
	}
1873
1874
	if (cursor->up_match >= n_unique) {
1875
1876
		rec = page_rec_get_next(btr_cur_get_rec(cursor));
1877
1878
		if (!page_rec_is_supremum(rec)) {
1879
			offsets = rec_get_offsets(rec, cursor->index, offsets,
1880
						  ULINT_UNDEFINED, &heap);
1881
1882
			if (trx->duplicates & TRX_DUP_IGNORE) {
1883
1884
				/* If the SQL-query will update or replace
1885
				duplicate key we will take X-lock for
1886
				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1887
				INSERT ON DUPLICATE KEY UPDATE). */
1888
1889
				err = row_ins_set_exclusive_rec_lock(
1890
					LOCK_REC_NOT_GAP,
1891
					btr_cur_get_block(cursor),
1892
					rec, cursor->index, offsets, thr);
1893
			} else {
1894
1895
				err = row_ins_set_shared_rec_lock(
1896
					LOCK_REC_NOT_GAP,
1897
					btr_cur_get_block(cursor),
1898
					rec, cursor->index, offsets, thr);
1899
			}
1900
1901
			if (err != DB_SUCCESS) {
1902
				goto func_exit;
1903
			}
1904
1905
			if (row_ins_dupl_error_with_rec(
1906
				    rec, entry, cursor->index, offsets)) {
1907
				trx->error_info = cursor->index;
1908
				err = DB_DUPLICATE_KEY;
1909
				goto func_exit;
1910
			}
1911
		}
1912
1913
		ut_a(!dict_index_is_clust(cursor->index));
1914
		/* This should never happen */
1915
	}
1916
1917
	err = DB_SUCCESS;
1918
func_exit:
1919
	if (UNIV_LIKELY_NULL(heap)) {
1920
		mem_heap_free(heap);
1921
	}
1922
	return(err);
1923
#else /* UNIV_HOTBACKUP */
1924
	/* This function depends on MySQL code that is not included in
1925
	InnoDB Hot Backup builds.  Besides, this function should never
1926
	be called in InnoDB Hot Backup. */
1927
	ut_error;
1928
	return(DB_FAIL);
1929
#endif /* UNIV_HOTBACKUP */
1930
}
1931
1932
/*******************************************************************
1933
Checks if an index entry has long enough common prefix with an existing
1934
record so that the intended insert of the entry must be changed to a modify of
1935
the existing record. In the case of a clustered index, the prefix must be
1936
n_unique fields long, and in the case of a secondary index, all fields must be
1937
equal. */
1938
UNIV_INLINE
1939
ulint
1940
row_ins_must_modify(
1941
/*================*/
1942
				/* out: 0 if no update, ROW_INS_PREV if
1943
				previous should be updated; currently we
1944
				do the search so that only the low_match
1945
				record can match enough to the search tuple,
1946
				not the next record */
1947
	btr_cur_t*	cursor)	/* in: B-tree cursor */
1948
{
1949
	ulint	enough_match;
1950
	rec_t*	rec;
1951
1952
	/* NOTE: (compare to the note in row_ins_duplicate_error) Because node
1953
	pointers on upper levels of the B-tree may match more to entry than
1954
	to actual user records on the leaf level, we have to check if the
1955
	candidate record is actually a user record. In a clustered index
1956
	node pointers contain index->n_unique first fields, and in the case
1957
	of a secondary index, all fields of the index. */
1958
1959
	enough_match = dict_index_get_n_unique_in_tree(cursor->index);
1960
1961
	if (cursor->low_match >= enough_match) {
1962
1963
		rec = btr_cur_get_rec(cursor);
1964
1965
		if (!page_rec_is_infimum(rec)) {
1966
1967
			return(ROW_INS_PREV);
1968
		}
1969
	}
1970
1971
	return(0);
1972
}
1973
1974
/*******************************************************************
1975
Tries to insert an index entry to an index. If the index is clustered
1976
and a record with the same unique key is found, the other record is
1977
necessarily marked deleted by a committed transaction, or a unique key
1978
violation error occurs. The delete marked record is then updated to an
1979
existing record, and we must write an undo log record on the delete
1980
marked record. If the index is secondary, and a record with exactly the
1981
same fields is found, the other record is necessarily marked deleted.
1982
It is then unmarked. Otherwise, the entry is just inserted to the index. */
1983
static
1984
ulint
1985
row_ins_index_entry_low(
1986
/*====================*/
1987
				/* out: DB_SUCCESS, DB_LOCK_WAIT, DB_FAIL
1988
				if pessimistic retry needed, or error code */
1989
	ulint		mode,	/* in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
1990
				depending on whether we wish optimistic or
1991
				pessimistic descent down the index tree */
1992
	dict_index_t*	index,	/* in: index */
1993
	dtuple_t*	entry,	/* in: index entry to insert */
1994
	ulint		n_ext,	/* in: number of externally stored columns */
1995
	que_thr_t*	thr)	/* in: query thread */
1996
{
1997
	btr_cur_t	cursor;
1998
	ulint		ignore_sec_unique	= 0;
1999
	ulint		modify = 0; /* remove warning */
2000
	rec_t*		insert_rec;
2001
	rec_t*		rec;
2002
	ulint		err;
2003
	ulint		n_unique;
2004
	big_rec_t*	big_rec			= NULL;
2005
	mtr_t		mtr;
2006
	mem_heap_t*	heap			= NULL;
2007
2008
	log_free_check();
2009
2010
	mtr_start(&mtr);
2011
2012
	cursor.thr = thr;
2013
2014
	/* Note that we use PAGE_CUR_LE as the search mode, because then
2015
	the function will return in both low_match and up_match of the
2016
	cursor sensible values */
2017
2018
	if (!(thr_get_trx(thr)->check_unique_secondary)) {
2019
		ignore_sec_unique = BTR_IGNORE_SEC_UNIQUE;
2020
	}
2021
2022
	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2023
				    mode | BTR_INSERT | ignore_sec_unique,
2024
				    &cursor, 0, &mtr);
2025
2026
	if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2027
		/* The insertion was made to the insert buffer already during
2028
		the search: we are done */
2029
2030
		err = DB_SUCCESS;
2031
2032
		goto function_exit;
2033
	}
2034
2035
#ifdef UNIV_DEBUG
2036
	{
2037
		page_t*	page = btr_cur_get_page(&cursor);
2038
		rec_t*	first_rec = page_rec_get_next(
2039
			page_get_infimum_rec(page));
2040
2041
		ut_ad(page_rec_is_supremum(first_rec)
2042
		      || rec_get_n_fields(first_rec, index)
2043
		      == dtuple_get_n_fields(entry));
2044
	}
2045
#endif
2046
2047
	n_unique = dict_index_get_n_unique(index);
2048
2049
	if (dict_index_is_unique(index) && (cursor.up_match >= n_unique
2050
					    || cursor.low_match >= n_unique)) {
2051
2052
		if (dict_index_is_clust(index)) {
2053
			/* Note that the following may return also
2054
			DB_LOCK_WAIT */
2055
2056
			err = row_ins_duplicate_error_in_clust(
2057
				&cursor, entry, thr, &mtr);
2058
			if (err != DB_SUCCESS) {
2059
2060
				goto function_exit;
2061
			}
2062
		} else {
2063
			mtr_commit(&mtr);
2064
			err = row_ins_scan_sec_index_for_duplicate(
2065
				index, entry, thr);
2066
			mtr_start(&mtr);
2067
2068
			if (err != DB_SUCCESS) {
2069
2070
				goto function_exit;
2071
			}
2072
2073
			/* We did not find a duplicate and we have now
2074
			locked with s-locks the necessary records to
2075
			prevent any insertion of a duplicate by another
2076
			transaction. Let us now reposition the cursor and
2077
			continue the insertion. */
2078
2079
			btr_cur_search_to_nth_level(index, 0, entry,
2080
						    PAGE_CUR_LE,
2081
						    mode | BTR_INSERT,
2082
						    &cursor, 0, &mtr);
2083
		}
2084
	}
2085
2086
	modify = row_ins_must_modify(&cursor);
2087
2088
	if (modify != 0) {
2089
		/* There is already an index entry with a long enough common
2090
		prefix, we must convert the insert into a modify of an
2091
		existing record */
2092
2093
		if (modify == ROW_INS_NEXT) {
2094
			rec = page_rec_get_next(btr_cur_get_rec(&cursor));
2095
2096
			btr_cur_position(index, rec,
2097
					 btr_cur_get_block(&cursor),&cursor);
2098
		}
2099
2100
		if (dict_index_is_clust(index)) {
2101
			err = row_ins_clust_index_entry_by_modify(
2102
				mode, &cursor, &heap, &big_rec, entry,
2103
				thr, &mtr);
2104
		} else {
2105
			ut_ad(!n_ext);
2106
			err = row_ins_sec_index_entry_by_modify(
2107
				mode, &cursor, entry, thr, &mtr);
2108
		}
2109
	} else {
2110
		if (mode == BTR_MODIFY_LEAF) {
2111
			err = btr_cur_optimistic_insert(
2112
				0, &cursor, entry, &insert_rec, &big_rec,
2113
				n_ext, thr, &mtr);
2114
		} else {
2115
			ut_a(mode == BTR_MODIFY_TREE);
2116
			if (buf_LRU_buf_pool_running_out()) {
2117
2118
				err = DB_LOCK_TABLE_FULL;
2119
2120
				goto function_exit;
2121
			}
2122
			err = btr_cur_pessimistic_insert(
2123
				0, &cursor, entry, &insert_rec, &big_rec,
2124
				n_ext, thr, &mtr);
2125
		}
2126
	}
2127
2128
function_exit:
2129
	mtr_commit(&mtr);
2130
2131
	if (UNIV_LIKELY_NULL(big_rec)) {
2132
		rec_t*	rec;
2133
		ulint*	offsets;
2134
		mtr_start(&mtr);
2135
2136
		btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2137
					    BTR_MODIFY_TREE, &cursor, 0, &mtr);
2138
		rec = btr_cur_get_rec(&cursor);
2139
		offsets = rec_get_offsets(rec, index, NULL,
2140
					  ULINT_UNDEFINED, &heap);
2141
2142
		err = btr_store_big_rec_extern_fields(
2143
			index, btr_cur_get_block(&cursor),
2144
			rec, offsets, big_rec, &mtr);
2145
2146
		if (modify) {
2147
			dtuple_big_rec_free(big_rec);
2148
		} else {
2149
			dtuple_convert_back_big_rec(index, entry, big_rec);
2150
		}
2151
2152
		mtr_commit(&mtr);
2153
	}
2154
2155
	if (UNIV_LIKELY_NULL(heap)) {
2156
		mem_heap_free(heap);
2157
	}
2158
	return(err);
2159
}
2160
2161
/*******************************************************************
2162
Inserts an index entry to index. Tries first optimistic, then pessimistic
2163
descent down the tree. If the entry matches enough to a delete marked record,
2164
performs the insert by updating or delete unmarking the delete marked
2165
record. */
2166
UNIV_INTERN
2167
ulint
2168
row_ins_index_entry(
2169
/*================*/
2170
				/* out: DB_SUCCESS, DB_LOCK_WAIT,
2171
				DB_DUPLICATE_KEY, or some other error code */
2172
	dict_index_t*	index,	/* in: index */
2173
	dtuple_t*	entry,	/* in: index entry to insert */
2174
	ulint		n_ext,	/* in: number of externally stored columns */
2175
	ibool		foreign,/* in: TRUE=check foreign key constraints */
2176
	que_thr_t*	thr)	/* in: query thread */
2177
{
2178
	ulint	err;
2179
2180
	if (foreign && UT_LIST_GET_FIRST(index->table->foreign_list)) {
2181
		err = row_ins_check_foreign_constraints(index->table, index,
2182
							entry, thr);
2183
		if (err != DB_SUCCESS) {
2184
2185
			return(err);
2186
		}
2187
	}
2188
2189
	/* Try first optimistic descent to the B-tree */
2190
2191
	err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
2192
				      n_ext, thr);
2193
	if (err != DB_FAIL) {
2194
2195
		return(err);
2196
	}
2197
2198
	/* Try then pessimistic descent to the B-tree */
2199
2200
	err = row_ins_index_entry_low(BTR_MODIFY_TREE, index, entry,
2201
				      n_ext, thr);
2202
	return(err);
2203
}
2204
2205
/***************************************************************
2206
Sets the values of the dtuple fields in entry from the values of appropriate
2207
columns in row. */
2208
static
2209
void
2210
row_ins_index_entry_set_vals(
2211
/*=========================*/
2212
	dict_index_t*	index,	/* in: index */
2213
	dtuple_t*	entry,	/* in: index entry to make */
2214
	const dtuple_t*	row)	/* in: row */
2215
{
2216
	ulint	n_fields;
2217
	ulint	i;
2218
2219
	ut_ad(entry && row);
2220
2221
	n_fields = dtuple_get_n_fields(entry);
2222
2223
	for (i = 0; i < n_fields; i++) {
2224
		dict_field_t*	ind_field;
2225
		dfield_t*	field;
2226
		const dfield_t*	row_field;
2227
		ulint		len;
2228
2229
		field = dtuple_get_nth_field(entry, i);
2230
		ind_field = dict_index_get_nth_field(index, i);
2231
		row_field = dtuple_get_nth_field(row, ind_field->col->ind);
2232
		len = dfield_get_len(row_field);
2233
2234
		/* Check column prefix indexes */
2235
		if (ind_field->prefix_len > 0
2236
		    && dfield_get_len(row_field) != UNIV_SQL_NULL) {
2237
2238
			const	dict_col_t*	col
2239
				= dict_field_get_col(ind_field);
2240
2241
			len = dtype_get_at_most_n_mbchars(
2242
				col->prtype, col->mbminlen, col->mbmaxlen,
2243
				ind_field->prefix_len,
2244
				len, dfield_get_data(row_field));
2245
2246
			ut_ad(!dfield_is_ext(row_field));
2247
		}
2248
2249
		dfield_set_data(field, dfield_get_data(row_field), len);
2250
		if (dfield_is_ext(row_field)) {
2251
			ut_ad(dict_index_is_clust(index));
2252
			dfield_set_ext(field);
2253
		}
2254
	}
2255
}
2256
2257
/***************************************************************
2258
Inserts a single index entry to the table. */
2259
static
2260
ulint
2261
row_ins_index_entry_step(
2262
/*=====================*/
2263
				/* out: DB_SUCCESS if operation successfully
2264
				completed, else error code or DB_LOCK_WAIT */
2265
	ins_node_t*	node,	/* in: row insert node */
2266
	que_thr_t*	thr)	/* in: query thread */
2267
{
2268
	ulint	err;
2269
2270
	ut_ad(dtuple_check_typed(node->row));
2271
2272
	row_ins_index_entry_set_vals(node->index, node->entry, node->row);
2273
2274
	ut_ad(dtuple_check_typed(node->entry));
2275
2276
	err = row_ins_index_entry(node->index, node->entry, 0, TRUE, thr);
2277
2278
	return(err);
2279
}
2280
2281
/***************************************************************
2282
Allocates a row id for row and inits the node->index field. */
2283
UNIV_INLINE
2284
void
2285
row_ins_alloc_row_id_step(
2286
/*======================*/
2287
	ins_node_t*	node)	/* in: row insert node */
2288
{
2289
	dulint	row_id;
2290
2291
	ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
2292
2293
	if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
2294
2295
		/* No row id is stored if the clustered index is unique */
2296
2297
		return;
2298
	}
2299
2300
	/* Fill in row id value to row */
2301
2302
	row_id = dict_sys_get_new_row_id();
2303
2304
	dict_sys_write_row_id(node->row_id_buf, row_id);
2305
}
2306
2307
/***************************************************************
2308
Gets a row to insert from the values list. */
2309
UNIV_INLINE
2310
void
2311
row_ins_get_row_from_values(
2312
/*========================*/
2313
	ins_node_t*	node)	/* in: row insert node */
2314
{
2315
	que_node_t*	list_node;
2316
	dfield_t*	dfield;
2317
	dtuple_t*	row;
2318
	ulint		i;
2319
2320
	/* The field values are copied in the buffers of the select node and
2321
	it is safe to use them until we fetch from select again: therefore
2322
	we can just copy the pointers */
2323
2324
	row = node->row;
2325
2326
	i = 0;
2327
	list_node = node->values_list;
2328
2329
	while (list_node) {
2330
		eval_exp(list_node);
2331
2332
		dfield = dtuple_get_nth_field(row, i);
2333
		dfield_copy_data(dfield, que_node_get_val(list_node));
2334
2335
		i++;
2336
		list_node = que_node_get_next(list_node);
2337
	}
2338
}
2339
2340
/***************************************************************
2341
Gets a row to insert from the select list. */
2342
UNIV_INLINE
2343
void
2344
row_ins_get_row_from_select(
2345
/*========================*/
2346
	ins_node_t*	node)	/* in: row insert node */
2347
{
2348
	que_node_t*	list_node;
2349
	dfield_t*	dfield;
2350
	dtuple_t*	row;
2351
	ulint		i;
2352
2353
	/* The field values are copied in the buffers of the select node and
2354
	it is safe to use them until we fetch from select again: therefore
2355
	we can just copy the pointers */
2356
2357
	row = node->row;
2358
2359
	i = 0;
2360
	list_node = node->select->select_list;
2361
2362
	while (list_node) {
2363
		dfield = dtuple_get_nth_field(row, i);
2364
		dfield_copy_data(dfield, que_node_get_val(list_node));
2365
2366
		i++;
2367
		list_node = que_node_get_next(list_node);
2368
	}
2369
}
2370
2371
/***************************************************************
2372
Inserts a row to a table. */
2373
static
2374
ulint
2375
row_ins(
2376
/*====*/
2377
				/* out: DB_SUCCESS if operation successfully
2378
				completed, else error code or DB_LOCK_WAIT */
2379
	ins_node_t*	node,	/* in: row insert node */
2380
	que_thr_t*	thr)	/* in: query thread */
2381
{
2382
	ulint	err;
2383
2384
	ut_ad(node && thr);
2385
2386
	if (node->state == INS_NODE_ALLOC_ROW_ID) {
2387
2388
		row_ins_alloc_row_id_step(node);
2389
2390
		node->index = dict_table_get_first_index(node->table);
2391
		node->entry = UT_LIST_GET_FIRST(node->entry_list);
2392
2393
		if (node->ins_type == INS_SEARCHED) {
2394
2395
			row_ins_get_row_from_select(node);
2396
2397
		} else if (node->ins_type == INS_VALUES) {
2398
2399
			row_ins_get_row_from_values(node);
2400
		}
2401
2402
		node->state = INS_NODE_INSERT_ENTRIES;
2403
	}
2404
2405
	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
2406
2407
	while (node->index != NULL) {
2408
		err = row_ins_index_entry_step(node, thr);
2409
2410
		if (err != DB_SUCCESS) {
2411
2412
			return(err);
2413
		}
2414
2415
		node->index = dict_table_get_next_index(node->index);
2416
		node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
2417
	}
2418
2419
	ut_ad(node->entry == NULL);
2420
2421
	node->state = INS_NODE_ALLOC_ROW_ID;
2422
2423
	return(DB_SUCCESS);
2424
}
2425
2426
/***************************************************************
2427
Inserts a row to a table. This is a high-level function used in SQL execution
2428
graphs. */
2429
UNIV_INTERN
2430
que_thr_t*
2431
row_ins_step(
2432
/*=========*/
2433
				/* out: query thread to run next or NULL */
2434
	que_thr_t*	thr)	/* in: query thread */
2435
{
2436
	ins_node_t*	node;
2437
	que_node_t*	parent;
2438
	sel_node_t*	sel_node;
2439
	trx_t*		trx;
2440
	ulint		err;
2441
2442
	ut_ad(thr);
2443
2444
	trx = thr_get_trx(thr);
2445
2446
	trx_start_if_not_started(trx);
2447
2448
	node = thr->run_node;
2449
2450
	ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
2451
2452
	parent = que_node_get_parent(node);
2453
	sel_node = node->select;
2454
2455
	if (thr->prev_node == parent) {
2456
		node->state = INS_NODE_SET_IX_LOCK;
2457
	}
2458
2459
	/* If this is the first time this node is executed (or when
2460
	execution resumes after wait for the table IX lock), set an
2461
	IX lock on the table and reset the possible select node. MySQL's
2462
	partitioned table code may also call an insert within the same
2463
	SQL statement AFTER it has used this table handle to do a search.
2464
	This happens, for example, when a row update moves it to another
2465
	partition. In that case, we have already set the IX lock on the
2466
	table during the search operation, and there is no need to set
2467
	it again here. But we must write trx->id to node->trx_id_buf. */
2468
2469
	trx_write_trx_id(node->trx_id_buf, trx->id);
2470
2471
	if (node->state == INS_NODE_SET_IX_LOCK) {
2472
2473
		/* It may be that the current session has not yet started
2474
		its transaction, or it has been committed: */
2475
2476
		if (UT_DULINT_EQ(trx->id, node->trx_id)) {
2477
			/* No need to do IX-locking */
2478
2479
			goto same_trx;
2480
		}
2481
2482
		err = lock_table(0, node->table, LOCK_IX, thr);
2483
2484
		if (err != DB_SUCCESS) {
2485
2486
			goto error_handling;
2487
		}
2488
2489
		node->trx_id = trx->id;
2490
same_trx:
2491
		node->state = INS_NODE_ALLOC_ROW_ID;
2492
2493
		if (node->ins_type == INS_SEARCHED) {
2494
			/* Reset the cursor */
2495
			sel_node->state = SEL_NODE_OPEN;
2496
2497
			/* Fetch a row to insert */
2498
2499
			thr->run_node = sel_node;
2500
2501
			return(thr);
2502
		}
2503
	}
2504
2505
	if ((node->ins_type == INS_SEARCHED)
2506
	    && (sel_node->state != SEL_NODE_FETCH)) {
2507
2508
		ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
2509
2510
		/* No more rows to insert */
2511
		thr->run_node = parent;
2512
2513
		return(thr);
2514
	}
2515
2516
	/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
2517
2518
	err = row_ins(node, thr);
2519
2520
error_handling:
2521
	trx->error_state = err;
2522
2523
	if (err != DB_SUCCESS) {
2524
		/* err == DB_LOCK_WAIT or SQL error detected */
2525
		return(NULL);
2526
	}
2527
2528
	/* DO THE TRIGGER ACTIONS HERE */
2529
2530
	if (node->ins_type == INS_SEARCHED) {
2531
		/* Fetch a row to insert */
2532
2533
		thr->run_node = sel_node;
2534
	} else {
2535
		thr->run_node = que_node_get_parent(node);
2536
	}
2537
2538
	return(thr);
2539
}