~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/innobase/page/page0cur.c

Tags: innodb-plugin-1.0.1
Imported 1.0.1 with clean - with no changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/************************************************************************
 
2
The page cursor
 
3
 
 
4
(c) 1994-1996 Innobase Oy
 
5
 
 
6
Created 10/4/1994 Heikki Tuuri
 
7
*************************************************************************/
 
8
 
 
9
#include "page0cur.h"
 
10
#ifdef UNIV_NONINL
 
11
#include "page0cur.ic"
 
12
#endif
 
13
 
 
14
#include "page0zip.h"
 
15
#include "rem0cmp.h"
 
16
#include "mtr0log.h"
 
17
#include "log0recv.h"
 
18
#include "rem0cmp.h"
 
19
 
 
20
static ulint    page_rnd        = 976722341;
 
21
 
 
22
#ifdef PAGE_CUR_ADAPT
 
23
# ifdef UNIV_SEARCH_PERF_STAT
 
24
static ulint    page_cur_short_succ     = 0;
 
25
# endif /* UNIV_SEARCH_PERF_STAT */
 
26
 
 
27
/********************************************************************
 
28
Tries a search shortcut based on the last insert. */
 
29
UNIV_INLINE
 
30
ibool
 
31
page_cur_try_search_shortcut(
 
32
/*=========================*/
 
33
                                        /* out: TRUE on success */
 
34
        const buf_block_t*      block,  /* in: index page */
 
35
        const dict_index_t*     index,  /* in: record descriptor */
 
36
        const dtuple_t*         tuple,  /* in: data tuple */
 
37
        ulint*                  iup_matched_fields,
 
38
                                        /* in/out: already matched
 
39
                                        fields in upper limit record */
 
40
        ulint*                  iup_matched_bytes,
 
41
                                        /* in/out: already matched
 
42
                                        bytes in a field not yet
 
43
                                        completely matched */
 
44
        ulint*                  ilow_matched_fields,
 
45
                                        /* in/out: already matched
 
46
                                        fields in lower limit record */
 
47
        ulint*                  ilow_matched_bytes,
 
48
                                        /* in/out: already matched
 
49
                                        bytes in a field not yet
 
50
                                        completely matched */
 
51
        page_cur_t*             cursor) /* out: page cursor */
 
52
{
 
53
        const rec_t*    rec;
 
54
        const rec_t*    next_rec;
 
55
        ulint           low_match;
 
56
        ulint           low_bytes;
 
57
        ulint           up_match;
 
58
        ulint           up_bytes;
 
59
#ifdef UNIV_SEARCH_DEBUG
 
60
        page_cur_t      cursor2;
 
61
#endif
 
62
        ibool           success         = FALSE;
 
63
        const page_t*   page            = buf_block_get_frame(block);
 
64
        mem_heap_t*     heap            = NULL;
 
65
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
66
        ulint*          offsets         = offsets_;
 
67
        rec_offs_init(offsets_);
 
68
 
 
69
        ut_ad(dtuple_check_typed(tuple));
 
70
 
 
71
        rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
 
72
        offsets = rec_get_offsets(rec, index, offsets,
 
73
                                  dtuple_get_n_fields(tuple), &heap);
 
74
 
 
75
        ut_ad(rec);
 
76
        ut_ad(page_rec_is_user_rec(rec));
 
77
 
 
78
        ut_pair_min(&low_match, &low_bytes,
 
79
                    *ilow_matched_fields, *ilow_matched_bytes,
 
80
                    *iup_matched_fields, *iup_matched_bytes);
 
81
 
 
82
        up_match = low_match;
 
83
        up_bytes = low_bytes;
 
84
 
 
85
        if (page_cmp_dtuple_rec_with_match(tuple, rec, offsets,
 
86
                                           &low_match, &low_bytes) < 0) {
 
87
                goto exit_func;
 
88
        }
 
89
 
 
90
        next_rec = page_rec_get_next_const(rec);
 
91
        offsets = rec_get_offsets(next_rec, index, offsets,
 
92
                                  dtuple_get_n_fields(tuple), &heap);
 
93
 
 
94
        if (page_cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
 
95
                                           &up_match, &up_bytes) >= 0) {
 
96
                goto exit_func;
 
97
        }
 
98
 
 
99
        page_cur_position(rec, block, cursor);
 
100
 
 
101
#ifdef UNIV_SEARCH_DEBUG
 
102
        page_cur_search_with_match(block, index, tuple, PAGE_CUR_DBG,
 
103
                                   iup_matched_fields,
 
104
                                   iup_matched_bytes,
 
105
                                   ilow_matched_fields,
 
106
                                   ilow_matched_bytes,
 
107
                                   &cursor2);
 
108
        ut_a(cursor2.rec == cursor->rec);
 
109
 
 
110
        if (!page_rec_is_supremum(next_rec)) {
 
111
 
 
112
                ut_a(*iup_matched_fields == up_match);
 
113
                ut_a(*iup_matched_bytes == up_bytes);
 
114
        }
 
115
 
 
116
        ut_a(*ilow_matched_fields == low_match);
 
117
        ut_a(*ilow_matched_bytes == low_bytes);
 
118
#endif
 
119
        if (!page_rec_is_supremum(next_rec)) {
 
120
 
 
121
                *iup_matched_fields = up_match;
 
122
                *iup_matched_bytes = up_bytes;
 
123
        }
 
124
 
 
125
        *ilow_matched_fields = low_match;
 
126
        *ilow_matched_bytes = low_bytes;
 
127
 
 
128
#ifdef UNIV_SEARCH_PERF_STAT
 
129
        page_cur_short_succ++;
 
130
#endif
 
131
        success = TRUE;
 
132
exit_func:
 
133
        if (UNIV_LIKELY_NULL(heap)) {
 
134
                mem_heap_free(heap);
 
135
        }
 
136
        return(success);
 
137
}
 
138
 
 
139
#endif
 
140
 
 
141
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
142
/********************************************************************
 
143
Checks if the nth field in a record is a character type field which extends
 
144
the nth field in tuple, i.e., the field is longer or equal in length and has
 
145
common first characters. */
 
146
static
 
147
ibool
 
148
page_cur_rec_field_extends(
 
149
/*=======================*/
 
150
                                /* out: TRUE if rec field
 
151
                                extends tuple field */
 
152
        const dtuple_t* tuple,  /* in: data tuple */
 
153
        const rec_t*    rec,    /* in: record */
 
154
        const ulint*    offsets,/* in: array returned by rec_get_offsets() */
 
155
        ulint           n)      /* in: compare nth field */
 
156
{
 
157
        const dtype_t*  type;
 
158
        const dfield_t* dfield;
 
159
        const byte*     rec_f;
 
160
        ulint           rec_f_len;
 
161
 
 
162
        ut_ad(rec_offs_validate(rec, NULL, offsets));
 
163
        dfield = dtuple_get_nth_field(tuple, n);
 
164
 
 
165
        type = dfield_get_type(dfield);
 
166
 
 
167
        rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
 
168
 
 
169
        if (type->mtype == DATA_VARCHAR
 
170
            || type->mtype == DATA_CHAR
 
171
            || type->mtype == DATA_FIXBINARY
 
172
            || type->mtype == DATA_BINARY
 
173
            || type->mtype == DATA_BLOB
 
174
            || type->mtype == DATA_VARMYSQL
 
175
            || type->mtype == DATA_MYSQL) {
 
176
 
 
177
                if (dfield_get_len(dfield) != UNIV_SQL_NULL
 
178
                    && rec_f_len != UNIV_SQL_NULL
 
179
                    && rec_f_len >= dfield_get_len(dfield)
 
180
                    && !cmp_data_data_slow(type->mtype, type->prtype,
 
181
                                           dfield_get_data(dfield),
 
182
                                           dfield_get_len(dfield),
 
183
                                           rec_f, dfield_get_len(dfield))) {
 
184
 
 
185
                        return(TRUE);
 
186
                }
 
187
        }
 
188
 
 
189
        return(FALSE);
 
190
}
 
191
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
192
 
 
193
/********************************************************************
 
194
Searches the right position for a page cursor. */
 
195
UNIV_INTERN
 
196
void
 
197
page_cur_search_with_match(
 
198
/*=======================*/
 
199
        const buf_block_t*      block,  /* in: buffer block */
 
200
        const dict_index_t*     index,  /* in: record descriptor */
 
201
        const dtuple_t*         tuple,  /* in: data tuple */
 
202
        ulint                   mode,   /* in: PAGE_CUR_L,
 
203
                                        PAGE_CUR_LE, PAGE_CUR_G, or
 
204
                                        PAGE_CUR_GE */
 
205
        ulint*                  iup_matched_fields,
 
206
                                        /* in/out: already matched
 
207
                                        fields in upper limit record */
 
208
        ulint*                  iup_matched_bytes,
 
209
                                        /* in/out: already matched
 
210
                                        bytes in a field not yet
 
211
                                        completely matched */
 
212
        ulint*                  ilow_matched_fields,
 
213
                                        /* in/out: already matched
 
214
                                        fields in lower limit record */
 
215
        ulint*                  ilow_matched_bytes,
 
216
                                        /* in/out: already matched
 
217
                                        bytes in a field not yet
 
218
                                        completely matched */
 
219
        page_cur_t*             cursor) /* out: page cursor */
 
220
{
 
221
        ulint           up;
 
222
        ulint           low;
 
223
        ulint           mid;
 
224
        const page_t*   page;
 
225
        const page_dir_slot_t* slot;
 
226
        const rec_t*    up_rec;
 
227
        const rec_t*    low_rec;
 
228
        const rec_t*    mid_rec;
 
229
        ulint           up_matched_fields;
 
230
        ulint           up_matched_bytes;
 
231
        ulint           low_matched_fields;
 
232
        ulint           low_matched_bytes;
 
233
        ulint           cur_matched_fields;
 
234
        ulint           cur_matched_bytes;
 
235
        int             cmp;
 
236
#ifdef UNIV_SEARCH_DEBUG
 
237
        int             dbg_cmp;
 
238
        ulint           dbg_matched_fields;
 
239
        ulint           dbg_matched_bytes;
 
240
#endif
 
241
#ifdef UNIV_ZIP_DEBUG
 
242
        const page_zip_des_t*   page_zip = buf_block_get_page_zip(block);
 
243
#endif /* UNIV_ZIP_DEBUG */
 
244
        mem_heap_t*     heap            = NULL;
 
245
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
246
        ulint*          offsets         = offsets_;
 
247
        rec_offs_init(offsets_);
 
248
 
 
249
        ut_ad(block && tuple && iup_matched_fields && iup_matched_bytes
 
250
              && ilow_matched_fields && ilow_matched_bytes && cursor);
 
251
        ut_ad(dtuple_validate(tuple));
 
252
#ifdef UNIV_DEBUG
 
253
# ifdef PAGE_CUR_DBG
 
254
        if (mode != PAGE_CUR_DBG)
 
255
# endif /* PAGE_CUR_DBG */
 
256
# ifdef PAGE_CUR_LE_OR_EXTENDS
 
257
                if (mode != PAGE_CUR_LE_OR_EXTENDS)
 
258
# endif /* PAGE_CUR_LE_OR_EXTENDS */
 
259
                        ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
 
260
                              || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
 
261
#endif /* UNIV_DEBUG */
 
262
        page = buf_block_get_frame(block);
 
263
#ifdef UNIV_ZIP_DEBUG
 
264
        ut_a(!page_zip || page_zip_validate(page_zip, page));
 
265
#endif /* UNIV_ZIP_DEBUG */
 
266
 
 
267
        page_check_dir(page);
 
268
 
 
269
#ifdef PAGE_CUR_ADAPT
 
270
        if (page_is_leaf(page)
 
271
            && (mode == PAGE_CUR_LE)
 
272
            && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
 
273
            && (page_header_get_ptr(page, PAGE_LAST_INSERT))
 
274
            && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {
 
275
 
 
276
                if (page_cur_try_search_shortcut(
 
277
                            block, index, tuple,
 
278
                            iup_matched_fields, iup_matched_bytes,
 
279
                            ilow_matched_fields, ilow_matched_bytes,
 
280
                            cursor)) {
 
281
                        return;
 
282
                }
 
283
        }
 
284
# ifdef PAGE_CUR_DBG
 
285
        if (mode == PAGE_CUR_DBG) {
 
286
                mode = PAGE_CUR_LE;
 
287
        }
 
288
# endif
 
289
#endif
 
290
 
 
291
        /* The following flag does not work for non-latin1 char sets because
 
292
        cmp_full_field does not tell how many bytes matched */
 
293
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
294
        ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
 
295
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
296
 
 
297
        /* If mode PAGE_CUR_G is specified, we are trying to position the
 
298
        cursor to answer a query of the form "tuple < X", where tuple is
 
299
        the input parameter, and X denotes an arbitrary physical record on
 
300
        the page. We want to position the cursor on the first X which
 
301
        satisfies the condition. */
 
302
 
 
303
        up_matched_fields  = *iup_matched_fields;
 
304
        up_matched_bytes   = *iup_matched_bytes;
 
305
        low_matched_fields = *ilow_matched_fields;
 
306
        low_matched_bytes  = *ilow_matched_bytes;
 
307
 
 
308
        /* Perform binary search. First the search is done through the page
 
309
        directory, after that as a linear search in the list of records
 
310
        owned by the upper limit directory slot. */
 
311
 
 
312
        low = 0;
 
313
        up = page_dir_get_n_slots(page) - 1;
 
314
 
 
315
        /* Perform binary search until the lower and upper limit directory
 
316
        slots come to the distance 1 of each other */
 
317
 
 
318
        while (up - low > 1) {
 
319
                mid = (low + up) / 2;
 
320
                slot = page_dir_get_nth_slot(page, mid);
 
321
                mid_rec = page_dir_slot_get_rec(slot);
 
322
 
 
323
                ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
 
324
                            low_matched_fields, low_matched_bytes,
 
325
                            up_matched_fields, up_matched_bytes);
 
326
 
 
327
                offsets = rec_get_offsets(mid_rec, index, offsets,
 
328
                                          dtuple_get_n_fields_cmp(tuple),
 
329
                                          &heap);
 
330
 
 
331
                cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
 
332
                                                &cur_matched_fields,
 
333
                                                &cur_matched_bytes);
 
334
                if (UNIV_LIKELY(cmp > 0)) {
 
335
low_slot_match:
 
336
                        low = mid;
 
337
                        low_matched_fields = cur_matched_fields;
 
338
                        low_matched_bytes = cur_matched_bytes;
 
339
 
 
340
                } else if (UNIV_EXPECT(cmp, -1)) {
 
341
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
342
                        if (mode == PAGE_CUR_LE_OR_EXTENDS
 
343
                            && page_cur_rec_field_extends(
 
344
                                    tuple, mid_rec, offsets,
 
345
                                    cur_matched_fields)) {
 
346
 
 
347
                                goto low_slot_match;
 
348
                        }
 
349
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
350
up_slot_match:
 
351
                        up = mid;
 
352
                        up_matched_fields = cur_matched_fields;
 
353
                        up_matched_bytes = cur_matched_bytes;
 
354
 
 
355
                } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
 
356
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
357
                           || mode == PAGE_CUR_LE_OR_EXTENDS
 
358
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
359
                           ) {
 
360
 
 
361
                        goto low_slot_match;
 
362
                } else {
 
363
 
 
364
                        goto up_slot_match;
 
365
                }
 
366
        }
 
367
 
 
368
        slot = page_dir_get_nth_slot(page, low);
 
369
        low_rec = page_dir_slot_get_rec(slot);
 
370
        slot = page_dir_get_nth_slot(page, up);
 
371
        up_rec = page_dir_slot_get_rec(slot);
 
372
 
 
373
        /* Perform linear search until the upper and lower records come to
 
374
        distance 1 of each other. */
 
375
 
 
376
        while (page_rec_get_next_const(low_rec) != up_rec) {
 
377
 
 
378
                mid_rec = page_rec_get_next_const(low_rec);
 
379
 
 
380
                ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
 
381
                            low_matched_fields, low_matched_bytes,
 
382
                            up_matched_fields, up_matched_bytes);
 
383
 
 
384
                offsets = rec_get_offsets(mid_rec, index, offsets,
 
385
                                          dtuple_get_n_fields_cmp(tuple),
 
386
                                          &heap);
 
387
 
 
388
                cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
 
389
                                                &cur_matched_fields,
 
390
                                                &cur_matched_bytes);
 
391
                if (UNIV_LIKELY(cmp > 0)) {
 
392
low_rec_match:
 
393
                        low_rec = mid_rec;
 
394
                        low_matched_fields = cur_matched_fields;
 
395
                        low_matched_bytes = cur_matched_bytes;
 
396
 
 
397
                } else if (UNIV_EXPECT(cmp, -1)) {
 
398
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
399
                        if (mode == PAGE_CUR_LE_OR_EXTENDS
 
400
                            && page_cur_rec_field_extends(
 
401
                                    tuple, mid_rec, offsets,
 
402
                                    cur_matched_fields)) {
 
403
 
 
404
                                goto low_rec_match;
 
405
                        }
 
406
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
407
up_rec_match:
 
408
                        up_rec = mid_rec;
 
409
                        up_matched_fields = cur_matched_fields;
 
410
                        up_matched_bytes = cur_matched_bytes;
 
411
                } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
 
412
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
413
                           || mode == PAGE_CUR_LE_OR_EXTENDS
 
414
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
415
                           ) {
 
416
 
 
417
                        goto low_rec_match;
 
418
                } else {
 
419
 
 
420
                        goto up_rec_match;
 
421
                }
 
422
        }
 
423
 
 
424
#ifdef UNIV_SEARCH_DEBUG
 
425
 
 
426
        /* Check that the lower and upper limit records have the
 
427
        right alphabetical order compared to tuple. */
 
428
        dbg_matched_fields = 0;
 
429
        dbg_matched_bytes = 0;
 
430
 
 
431
        offsets = rec_get_offsets(low_rec, index, offsets,
 
432
                                  ULINT_UNDEFINED, &heap);
 
433
        dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, low_rec, offsets,
 
434
                                                 &dbg_matched_fields,
 
435
                                                 &dbg_matched_bytes);
 
436
        if (mode == PAGE_CUR_G) {
 
437
                ut_a(dbg_cmp >= 0);
 
438
        } else if (mode == PAGE_CUR_GE) {
 
439
                ut_a(dbg_cmp == 1);
 
440
        } else if (mode == PAGE_CUR_L) {
 
441
                ut_a(dbg_cmp == 1);
 
442
        } else if (mode == PAGE_CUR_LE) {
 
443
                ut_a(dbg_cmp >= 0);
 
444
        }
 
445
 
 
446
        if (!page_rec_is_infimum(low_rec)) {
 
447
 
 
448
                ut_a(low_matched_fields == dbg_matched_fields);
 
449
                ut_a(low_matched_bytes == dbg_matched_bytes);
 
450
        }
 
451
 
 
452
        dbg_matched_fields = 0;
 
453
        dbg_matched_bytes = 0;
 
454
 
 
455
        offsets = rec_get_offsets(up_rec, index, offsets,
 
456
                                  ULINT_UNDEFINED, &heap);
 
457
        dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, up_rec, offsets,
 
458
                                                 &dbg_matched_fields,
 
459
                                                 &dbg_matched_bytes);
 
460
        if (mode == PAGE_CUR_G) {
 
461
                ut_a(dbg_cmp == -1);
 
462
        } else if (mode == PAGE_CUR_GE) {
 
463
                ut_a(dbg_cmp <= 0);
 
464
        } else if (mode == PAGE_CUR_L) {
 
465
                ut_a(dbg_cmp <= 0);
 
466
        } else if (mode == PAGE_CUR_LE) {
 
467
                ut_a(dbg_cmp == -1);
 
468
        }
 
469
 
 
470
        if (!page_rec_is_supremum(up_rec)) {
 
471
 
 
472
                ut_a(up_matched_fields == dbg_matched_fields);
 
473
                ut_a(up_matched_bytes == dbg_matched_bytes);
 
474
        }
 
475
#endif
 
476
        if (mode <= PAGE_CUR_GE) {
 
477
                page_cur_position(up_rec, block, cursor);
 
478
        } else {
 
479
                page_cur_position(low_rec, block, cursor);
 
480
        }
 
481
 
 
482
        *iup_matched_fields  = up_matched_fields;
 
483
        *iup_matched_bytes   = up_matched_bytes;
 
484
        *ilow_matched_fields = low_matched_fields;
 
485
        *ilow_matched_bytes  = low_matched_bytes;
 
486
        if (UNIV_LIKELY_NULL(heap)) {
 
487
                mem_heap_free(heap);
 
488
        }
 
489
}
 
490
 
 
491
/***************************************************************
 
492
Positions a page cursor on a randomly chosen user record on a page. If there
 
493
are no user records, sets the cursor on the infimum record. */
 
494
UNIV_INTERN
 
495
void
 
496
page_cur_open_on_rnd_user_rec(
 
497
/*==========================*/
 
498
        buf_block_t*    block,  /* in: page */
 
499
        page_cur_t*     cursor) /* out: page cursor */
 
500
{
 
501
        ulint   rnd;
 
502
        ulint   n_recs = page_get_n_recs(buf_block_get_frame(block));
 
503
 
 
504
        page_cur_set_before_first(block, cursor);
 
505
 
 
506
        if (UNIV_UNLIKELY(n_recs == 0)) {
 
507
 
 
508
                return;
 
509
        }
 
510
 
 
511
        page_rnd += 87584577;
 
512
 
 
513
        rnd = page_rnd % n_recs;
 
514
 
 
515
        do {
 
516
                page_cur_move_to_next(cursor);
 
517
        } while (rnd--);
 
518
}
 
519
 
 
520
/***************************************************************
 
521
Writes the log record of a record insert on a page. */
 
522
static
 
523
void
 
524
page_cur_insert_rec_write_log(
 
525
/*==========================*/
 
526
        rec_t*          insert_rec,     /* in: inserted physical record */
 
527
        ulint           rec_size,       /* in: insert_rec size */
 
528
        rec_t*          cursor_rec,     /* in: record the
 
529
                                        cursor is pointing to */
 
530
        dict_index_t*   index,          /* in: record descriptor */
 
531
        mtr_t*          mtr)            /* in: mini-transaction handle */
 
532
{
 
533
        ulint   cur_rec_size;
 
534
        ulint   extra_size;
 
535
        ulint   cur_extra_size;
 
536
        const byte* ins_ptr;
 
537
        byte*   log_ptr;
 
538
        const byte* log_end;
 
539
        ulint   i;
 
540
 
 
541
        ut_a(rec_size < UNIV_PAGE_SIZE);
 
542
        ut_ad(page_align(insert_rec) == page_align(cursor_rec));
 
543
        ut_ad(!page_rec_is_comp(insert_rec)
 
544
              == !dict_table_is_comp(index->table));
 
545
 
 
546
        {
 
547
                mem_heap_t*     heap            = NULL;
 
548
                ulint           cur_offs_[REC_OFFS_NORMAL_SIZE];
 
549
                ulint           ins_offs_[REC_OFFS_NORMAL_SIZE];
 
550
 
 
551
                ulint*          cur_offs;
 
552
                ulint*          ins_offs;
 
553
 
 
554
                rec_offs_init(cur_offs_);
 
555
                rec_offs_init(ins_offs_);
 
556
 
 
557
                cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
 
558
                                           ULINT_UNDEFINED, &heap);
 
559
                ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
 
560
                                           ULINT_UNDEFINED, &heap);
 
561
 
 
562
                extra_size = rec_offs_extra_size(ins_offs);
 
563
                cur_extra_size = rec_offs_extra_size(cur_offs);
 
564
                ut_ad(rec_size == rec_offs_size(ins_offs));
 
565
                cur_rec_size = rec_offs_size(cur_offs);
 
566
 
 
567
                if (UNIV_LIKELY_NULL(heap)) {
 
568
                        mem_heap_free(heap);
 
569
                }
 
570
        }
 
571
 
 
572
        ins_ptr = insert_rec - extra_size;
 
573
 
 
574
        i = 0;
 
575
 
 
576
        if (cur_extra_size == extra_size) {
 
577
                ulint           min_rec_size = ut_min(cur_rec_size, rec_size);
 
578
 
 
579
                const byte*     cur_ptr = cursor_rec - cur_extra_size;
 
580
 
 
581
                /* Find out the first byte in insert_rec which differs from
 
582
                cursor_rec; skip the bytes in the record info */
 
583
 
 
584
                do {
 
585
                        if (*ins_ptr == *cur_ptr) {
 
586
                                i++;
 
587
                                ins_ptr++;
 
588
                                cur_ptr++;
 
589
                        } else if ((i < extra_size)
 
590
                                   && (i >= extra_size
 
591
                                       - page_rec_get_base_extra_size
 
592
                                       (insert_rec))) {
 
593
                                i = extra_size;
 
594
                                ins_ptr = insert_rec;
 
595
                                cur_ptr = cursor_rec;
 
596
                        } else {
 
597
                                break;
 
598
                        }
 
599
                } while (i < min_rec_size);
 
600
        }
 
601
 
 
602
        if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
 
603
 
 
604
                if (page_rec_is_comp(insert_rec)) {
 
605
                        log_ptr = mlog_open_and_write_index(
 
606
                                mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
 
607
                                2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
 
608
                        if (UNIV_UNLIKELY(!log_ptr)) {
 
609
                                /* Logging in mtr is switched off
 
610
                                during crash recovery: in that case
 
611
                                mlog_open returns NULL */
 
612
                                return;
 
613
                        }
 
614
                } else {
 
615
                        log_ptr = mlog_open(mtr, 11
 
616
                                            + 2 + 5 + 1 + 5 + 5
 
617
                                            + MLOG_BUF_MARGIN);
 
618
                        if (UNIV_UNLIKELY(!log_ptr)) {
 
619
                                /* Logging in mtr is switched off
 
620
                                during crash recovery: in that case
 
621
                                mlog_open returns NULL */
 
622
                                return;
 
623
                        }
 
624
 
 
625
                        log_ptr = mlog_write_initial_log_record_fast(
 
626
                                insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
 
627
                }
 
628
 
 
629
                log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
 
630
                /* Write the cursor rec offset as a 2-byte ulint */
 
631
                mach_write_to_2(log_ptr, page_offset(cursor_rec));
 
632
                log_ptr += 2;
 
633
        } else {
 
634
                log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
 
635
                if (!log_ptr) {
 
636
                        /* Logging in mtr is switched off during crash
 
637
                        recovery: in that case mlog_open returns NULL */
 
638
                        return;
 
639
                }
 
640
                log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
 
641
        }
 
642
 
 
643
        if (page_rec_is_comp(insert_rec)) {
 
644
                if (UNIV_UNLIKELY
 
645
                    (rec_get_info_and_status_bits(insert_rec, TRUE)
 
646
                     != rec_get_info_and_status_bits(cursor_rec, TRUE))) {
 
647
 
 
648
                        goto need_extra_info;
 
649
                }
 
650
        } else {
 
651
                if (UNIV_UNLIKELY
 
652
                    (rec_get_info_and_status_bits(insert_rec, FALSE)
 
653
                     != rec_get_info_and_status_bits(cursor_rec, FALSE))) {
 
654
 
 
655
                        goto need_extra_info;
 
656
                }
 
657
        }
 
658
 
 
659
        if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
 
660
need_extra_info:
 
661
                /* Write the record end segment length
 
662
                and the extra info storage flag */
 
663
                log_ptr += mach_write_compressed(log_ptr,
 
664
                                                 2 * (rec_size - i) + 1);
 
665
 
 
666
                /* Write the info bits */
 
667
                mach_write_to_1(log_ptr,
 
668
                                rec_get_info_and_status_bits(
 
669
                                        insert_rec,
 
670
                                        page_rec_is_comp(insert_rec)));
 
671
                log_ptr++;
 
672
 
 
673
                /* Write the record origin offset */
 
674
                log_ptr += mach_write_compressed(log_ptr, extra_size);
 
675
 
 
676
                /* Write the mismatch index */
 
677
                log_ptr += mach_write_compressed(log_ptr, i);
 
678
 
 
679
                ut_a(i < UNIV_PAGE_SIZE);
 
680
                ut_a(extra_size < UNIV_PAGE_SIZE);
 
681
        } else {
 
682
                /* Write the record end segment length
 
683
                and the extra info storage flag */
 
684
                log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
 
685
        }
 
686
 
 
687
        /* Write to the log the inserted index record end segment which
 
688
        differs from the cursor record */
 
689
 
 
690
        rec_size -= i;
 
691
 
 
692
        if (log_ptr + rec_size <= log_end) {
 
693
                memcpy(log_ptr, ins_ptr, rec_size);
 
694
                mlog_close(mtr, log_ptr + rec_size);
 
695
        } else {
 
696
                mlog_close(mtr, log_ptr);
 
697
                ut_a(rec_size < UNIV_PAGE_SIZE);
 
698
                mlog_catenate_string(mtr, ins_ptr, rec_size);
 
699
        }
 
700
}
 
701
 
 
702
/***************************************************************
 
703
Parses a log record of a record insert on a page. */
 
704
UNIV_INTERN
 
705
byte*
 
706
page_cur_parse_insert_rec(
 
707
/*======================*/
 
708
                                /* out: end of log record or NULL */
 
709
        ibool           is_short,/* in: TRUE if short inserts */
 
710
        byte*           ptr,    /* in: buffer */
 
711
        byte*           end_ptr,/* in: buffer end */
 
712
        buf_block_t*    block,  /* in: page or NULL */
 
713
        dict_index_t*   index,  /* in: record descriptor */
 
714
        mtr_t*          mtr)    /* in: mtr or NULL */
 
715
{
 
716
        ulint   origin_offset;
 
717
        ulint   end_seg_len;
 
718
        ulint   mismatch_index;
 
719
        page_t* page;
 
720
        rec_t*  cursor_rec;
 
721
        byte    buf1[1024];
 
722
        byte*   buf;
 
723
        byte*   ptr2                    = ptr;
 
724
        ulint   info_and_status_bits = 0; /* remove warning */
 
725
        page_cur_t cursor;
 
726
        mem_heap_t*     heap            = NULL;
 
727
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
728
        ulint*          offsets         = offsets_;
 
729
        rec_offs_init(offsets_);
 
730
 
 
731
        page = block ? buf_block_get_frame(block) : NULL;
 
732
 
 
733
        if (is_short) {
 
734
                cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
 
735
        } else {
 
736
                ulint   offset;
 
737
 
 
738
                /* Read the cursor rec offset as a 2-byte ulint */
 
739
 
 
740
                if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
 
741
 
 
742
                        return(NULL);
 
743
                }
 
744
 
 
745
                offset = mach_read_from_2(ptr);
 
746
                ptr += 2;
 
747
 
 
748
                cursor_rec = page + offset;
 
749
 
 
750
                if (UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)) {
 
751
 
 
752
                        recv_sys->found_corrupt_log = TRUE;
 
753
 
 
754
                        return(NULL);
 
755
                }
 
756
        }
 
757
 
 
758
        ptr = mach_parse_compressed(ptr, end_ptr, &end_seg_len);
 
759
 
 
760
        if (ptr == NULL) {
 
761
 
 
762
                return(NULL);
 
763
        }
 
764
 
 
765
        if (UNIV_UNLIKELY(end_seg_len >= UNIV_PAGE_SIZE << 1)) {
 
766
                recv_sys->found_corrupt_log = TRUE;
 
767
 
 
768
                return(NULL);
 
769
        }
 
770
 
 
771
        if (end_seg_len & 0x1UL) {
 
772
                /* Read the info bits */
 
773
 
 
774
                if (end_ptr < ptr + 1) {
 
775
 
 
776
                        return(NULL);
 
777
                }
 
778
 
 
779
                info_and_status_bits = mach_read_from_1(ptr);
 
780
                ptr++;
 
781
 
 
782
                ptr = mach_parse_compressed(ptr, end_ptr, &origin_offset);
 
783
 
 
784
                if (ptr == NULL) {
 
785
 
 
786
                        return(NULL);
 
787
                }
 
788
 
 
789
                ut_a(origin_offset < UNIV_PAGE_SIZE);
 
790
 
 
791
                ptr = mach_parse_compressed(ptr, end_ptr, &mismatch_index);
 
792
 
 
793
                if (ptr == NULL) {
 
794
 
 
795
                        return(NULL);
 
796
                }
 
797
 
 
798
                ut_a(mismatch_index < UNIV_PAGE_SIZE);
 
799
        }
 
800
 
 
801
        if (UNIV_UNLIKELY(end_ptr < ptr + (end_seg_len >> 1))) {
 
802
 
 
803
                return(NULL);
 
804
        }
 
805
 
 
806
        if (!block) {
 
807
 
 
808
                return(ptr + (end_seg_len >> 1));
 
809
        }
 
810
 
 
811
        ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
 
812
        ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
 
813
 
 
814
        /* Read from the log the inserted index record end segment which
 
815
        differs from the cursor record */
 
816
 
 
817
        offsets = rec_get_offsets(cursor_rec, index, offsets,
 
818
                                  ULINT_UNDEFINED, &heap);
 
819
 
 
820
        if (!(end_seg_len & 0x1UL)) {
 
821
                info_and_status_bits = rec_get_info_and_status_bits(
 
822
                        cursor_rec, page_is_comp(page));
 
823
                origin_offset = rec_offs_extra_size(offsets);
 
824
                mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
 
825
        }
 
826
 
 
827
        end_seg_len >>= 1;
 
828
 
 
829
        if (mismatch_index + end_seg_len < sizeof buf1) {
 
830
                buf = buf1;
 
831
        } else {
 
832
                buf = mem_alloc(mismatch_index + end_seg_len);
 
833
        }
 
834
 
 
835
        /* Build the inserted record to buf */
 
836
 
 
837
        if (UNIV_UNLIKELY(mismatch_index >= UNIV_PAGE_SIZE)) {
 
838
                fprintf(stderr,
 
839
                        "Is short %lu, info_and_status_bits %lu, offset %lu, "
 
840
                        "o_offset %lu\n"
 
841
                        "mismatch index %lu, end_seg_len %lu\n"
 
842
                        "parsed len %lu\n",
 
843
                        (ulong) is_short, (ulong) info_and_status_bits,
 
844
                        (ulong) page_offset(cursor_rec),
 
845
                        (ulong) origin_offset,
 
846
                        (ulong) mismatch_index, (ulong) end_seg_len,
 
847
                        (ulong) (ptr - ptr2));
 
848
 
 
849
                fputs("Dump of 300 bytes of log:\n", stderr);
 
850
                ut_print_buf(stderr, ptr2, 300);
 
851
 
 
852
                buf_page_print(page, 0);
 
853
 
 
854
                ut_error;
 
855
        }
 
856
 
 
857
        ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
 
858
        ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
 
859
 
 
860
        if (page_is_comp(page)) {
 
861
                rec_set_info_and_status_bits(buf + origin_offset,
 
862
                                     info_and_status_bits);
 
863
        } else {
 
864
                rec_set_info_bits_old(buf + origin_offset,
 
865
                                                        info_and_status_bits);
 
866
        }
 
867
 
 
868
        page_cur_position(cursor_rec, block, &cursor);
 
869
 
 
870
        offsets = rec_get_offsets(buf + origin_offset, index, offsets,
 
871
                                  ULINT_UNDEFINED, &heap);
 
872
        if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
 
873
                                               buf + origin_offset,
 
874
                                               index, offsets, mtr))) {
 
875
                /* The redo log record should only have been written
 
876
                after the write was successful. */
 
877
                ut_error;
 
878
        }
 
879
 
 
880
        if (buf != buf1) {
 
881
 
 
882
                mem_free(buf);
 
883
        }
 
884
 
 
885
        if (UNIV_LIKELY_NULL(heap)) {
 
886
                mem_heap_free(heap);
 
887
        }
 
888
 
 
889
        return(ptr + end_seg_len);
 
890
}
 
891
 
 
892
/***************************************************************
 
893
Inserts a record next to page cursor on an uncompressed page.
 
894
Returns pointer to inserted record if succeed, i.e., enough
 
895
space available, NULL otherwise. The cursor stays at the same position. */
 
896
UNIV_INTERN
 
897
rec_t*
 
898
page_cur_insert_rec_low(
 
899
/*====================*/
 
900
                                /* out: pointer to record if succeed, NULL
 
901
                                otherwise */
 
902
        rec_t*          current_rec,/* in: pointer to current record after
 
903
                                which the new record is inserted */
 
904
        dict_index_t*   index,  /* in: record descriptor */
 
905
        const rec_t*    rec,    /* in: pointer to a physical record */
 
906
        ulint*          offsets,/* in/out: rec_get_offsets(rec, index) */
 
907
        mtr_t*          mtr)    /* in: mini-transaction handle, or NULL */
 
908
{
 
909
        byte*           insert_buf      = NULL;
 
910
        ulint           rec_size;
 
911
        page_t*         page;           /* the relevant page */
 
912
        rec_t*          last_insert;    /* cursor position at previous
 
913
                                        insert */
 
914
        rec_t*          free_rec;       /* a free record that was reused,
 
915
                                        or NULL */
 
916
        rec_t*          insert_rec;     /* inserted record */
 
917
        ulint           heap_no;        /* heap number of the inserted
 
918
                                        record */
 
919
 
 
920
        ut_ad(rec_offs_validate(rec, index, offsets));
 
921
 
 
922
        page = page_align(current_rec);
 
923
        ut_ad(dict_table_is_comp(index->table)
 
924
              == (ibool) !!page_is_comp(page));
 
925
 
 
926
        ut_ad(!page_rec_is_supremum(current_rec));
 
927
 
 
928
        /* 1. Get the size of the physical record in the page */
 
929
        rec_size = rec_offs_size(offsets);
 
930
 
 
931
#ifdef UNIV_DEBUG_VALGRIND
 
932
        {
 
933
                const void*     rec_start
 
934
                        = rec - rec_offs_extra_size(offsets);
 
935
                ulint           extra_size
 
936
                        = rec_offs_extra_size(offsets)
 
937
                        - (rec_offs_comp(offsets)
 
938
                           ? REC_N_NEW_EXTRA_BYTES
 
939
                           : REC_N_OLD_EXTRA_BYTES);
 
940
 
 
941
                /* All data bytes of the record must be valid. */
 
942
                UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
 
943
                /* The variable-length header must be valid. */
 
944
                UNIV_MEM_ASSERT_RW(rec_start, extra_size);
 
945
        }
 
946
#endif /* UNIV_DEBUG_VALGRIND */
 
947
 
 
948
        /* 2. Try to find suitable space from page memory management */
 
949
 
 
950
        free_rec = page_header_get_ptr(page, PAGE_FREE);
 
951
        if (UNIV_LIKELY_NULL(free_rec)) {
 
952
                /* Try to allocate from the head of the free list. */
 
953
                ulint           foffsets_[REC_OFFS_NORMAL_SIZE];
 
954
                ulint*          foffsets        = foffsets_;
 
955
                mem_heap_t*     heap            = NULL;
 
956
 
 
957
                rec_offs_init(foffsets_);
 
958
 
 
959
                foffsets = rec_get_offsets(free_rec, index, foffsets,
 
960
                                        ULINT_UNDEFINED, &heap);
 
961
                if (rec_offs_size(foffsets) < rec_size) {
 
962
                        if (UNIV_LIKELY_NULL(heap)) {
 
963
                                mem_heap_free(heap);
 
964
                        }
 
965
 
 
966
                        goto use_heap;
 
967
                }
 
968
 
 
969
                insert_buf = free_rec - rec_offs_extra_size(foffsets);
 
970
 
 
971
                if (page_is_comp(page)) {
 
972
                        heap_no = rec_get_heap_no_new(free_rec);
 
973
                        page_mem_alloc_free(page, NULL,
 
974
                                        rec_get_next_ptr(free_rec, TRUE),
 
975
                                        rec_size);
 
976
                } else {
 
977
                        heap_no = rec_get_heap_no_old(free_rec);
 
978
                        page_mem_alloc_free(page, NULL,
 
979
                                        rec_get_next_ptr(free_rec, FALSE),
 
980
                                        rec_size);
 
981
                }
 
982
 
 
983
                if (UNIV_LIKELY_NULL(heap)) {
 
984
                        mem_heap_free(heap);
 
985
                }
 
986
        } else {
 
987
use_heap:
 
988
                free_rec = NULL;
 
989
                insert_buf = page_mem_alloc_heap(page, NULL,
 
990
                                                 rec_size, &heap_no);
 
991
 
 
992
                if (UNIV_UNLIKELY(insert_buf == NULL)) {
 
993
                        return(NULL);
 
994
                }
 
995
        }
 
996
 
 
997
        /* 3. Create the record */
 
998
        insert_rec = rec_copy(insert_buf, rec, offsets);
 
999
        rec_offs_make_valid(insert_rec, index, offsets);
 
1000
 
 
1001
        /* 4. Insert the record in the linked list of records */
 
1002
        ut_ad(current_rec != insert_rec);
 
1003
 
 
1004
        {
 
1005
                /* next record after current before the insertion */
 
1006
                rec_t*  next_rec = page_rec_get_next(current_rec);
 
1007
#ifdef UNIV_DEBUG
 
1008
                if (page_is_comp(page)) {
 
1009
                        ut_ad(rec_get_status(current_rec)
 
1010
                                <= REC_STATUS_INFIMUM);
 
1011
                        ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
 
1012
                        ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
 
1013
                }
 
1014
#endif
 
1015
                page_rec_set_next(insert_rec, next_rec);
 
1016
                page_rec_set_next(current_rec, insert_rec);
 
1017
        }
 
1018
 
 
1019
        page_header_set_field(page, NULL, PAGE_N_RECS,
 
1020
                              1 + page_get_n_recs(page));
 
1021
 
 
1022
        /* 5. Set the n_owned field in the inserted record to zero,
 
1023
        and set the heap_no field */
 
1024
        if (page_is_comp(page)) {
 
1025
                rec_set_n_owned_new(insert_rec, NULL, 0);
 
1026
                rec_set_heap_no_new(insert_rec, heap_no);
 
1027
        } else {
 
1028
                rec_set_n_owned_old(insert_rec, 0);
 
1029
                rec_set_heap_no_old(insert_rec, heap_no);
 
1030
        }
 
1031
 
 
1032
        UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
 
1033
                           rec_offs_size(offsets));
 
1034
        /* 6. Update the last insertion info in page header */
 
1035
 
 
1036
        last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
 
1037
        ut_ad(!last_insert || !page_is_comp(page)
 
1038
              || rec_get_node_ptr_flag(last_insert)
 
1039
              == rec_get_node_ptr_flag(insert_rec));
 
1040
 
 
1041
        if (UNIV_UNLIKELY(last_insert == NULL)) {
 
1042
                page_header_set_field(page, NULL, PAGE_DIRECTION,
 
1043
                                      PAGE_NO_DIRECTION);
 
1044
                page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
 
1045
 
 
1046
        } else if ((last_insert == current_rec)
 
1047
                   && (page_header_get_field(page, PAGE_DIRECTION)
 
1048
                       != PAGE_LEFT)) {
 
1049
 
 
1050
                page_header_set_field(page, NULL, PAGE_DIRECTION,
 
1051
                                                        PAGE_RIGHT);
 
1052
                page_header_set_field(page, NULL, PAGE_N_DIRECTION,
 
1053
                                      page_header_get_field(
 
1054
                                              page, PAGE_N_DIRECTION) + 1);
 
1055
 
 
1056
        } else if ((page_rec_get_next(insert_rec) == last_insert)
 
1057
                   && (page_header_get_field(page, PAGE_DIRECTION)
 
1058
                       != PAGE_RIGHT)) {
 
1059
 
 
1060
                page_header_set_field(page, NULL, PAGE_DIRECTION,
 
1061
                                                        PAGE_LEFT);
 
1062
                page_header_set_field(page, NULL, PAGE_N_DIRECTION,
 
1063
                                      page_header_get_field(
 
1064
                                              page, PAGE_N_DIRECTION) + 1);
 
1065
        } else {
 
1066
                page_header_set_field(page, NULL, PAGE_DIRECTION,
 
1067
                                                        PAGE_NO_DIRECTION);
 
1068
                page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
 
1069
        }
 
1070
 
 
1071
        page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
 
1072
 
 
1073
        /* 7. It remains to update the owner record. */
 
1074
        {
 
1075
                rec_t*  owner_rec       = page_rec_find_owner_rec(insert_rec);
 
1076
                ulint   n_owned;
 
1077
                if (page_is_comp(page)) {
 
1078
                        n_owned = rec_get_n_owned_new(owner_rec);
 
1079
                        rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
 
1080
                } else {
 
1081
                        n_owned = rec_get_n_owned_old(owner_rec);
 
1082
                        rec_set_n_owned_old(owner_rec, n_owned + 1);
 
1083
                }
 
1084
 
 
1085
                /* 8. Now we have incremented the n_owned field of the owner
 
1086
                record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
 
1087
                we have to split the corresponding directory slot in two. */
 
1088
 
 
1089
                if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
 
1090
                        page_dir_split_slot(
 
1091
                                page, NULL,
 
1092
                                page_dir_find_owner_slot(owner_rec));
 
1093
                }
 
1094
        }
 
1095
 
 
1096
        /* 9. Write log record of the insert */
 
1097
        if (UNIV_LIKELY(mtr != NULL)) {
 
1098
                page_cur_insert_rec_write_log(insert_rec, rec_size,
 
1099
                                              current_rec, index, mtr);
 
1100
        }
 
1101
 
 
1102
        return(insert_rec);
 
1103
}
 
1104
 
 
1105
/***************************************************************
 
1106
Compresses or reorganizes a page after an optimistic insert. */
 
1107
static
 
1108
rec_t*
 
1109
page_cur_insert_rec_zip_reorg(
 
1110
/*==========================*/
 
1111
                                /* out: rec if succeed, NULL otherwise */
 
1112
        rec_t**         current_rec,/* in/out: pointer to current record after
 
1113
                                which the new record is inserted */
 
1114
        buf_block_t*    block,  /* in: buffer block */
 
1115
        dict_index_t*   index,  /* in: record descriptor */
 
1116
        rec_t*          rec,    /* in: inserted record */
 
1117
        page_t*         page,   /* in: uncompressed page */
 
1118
        page_zip_des_t* page_zip,/* in: compressed page */
 
1119
        mtr_t*          mtr)    /* in: mini-transaction, or NULL */
 
1120
{
 
1121
        ulint           pos;
 
1122
 
 
1123
        /* Recompress or reorganize and recompress the page. */
 
1124
        if (UNIV_LIKELY(page_zip_compress(page_zip, page, index, mtr))) {
 
1125
                return(rec);
 
1126
        }
 
1127
 
 
1128
        /* Before trying to reorganize the page,
 
1129
        store the number of preceding records on the page. */
 
1130
        pos = page_rec_get_n_recs_before(rec);
 
1131
 
 
1132
        if (page_zip_reorganize(block, index, mtr)) {
 
1133
                /* The page was reorganized: Find rec by seeking to pos,
 
1134
                and update *current_rec. */
 
1135
                rec = page + PAGE_NEW_INFIMUM;
 
1136
 
 
1137
                while (--pos) {
 
1138
                        rec = page + rec_get_next_offs(rec, TRUE);
 
1139
                }
 
1140
 
 
1141
                *current_rec = rec;
 
1142
                rec = page + rec_get_next_offs(rec, TRUE);
 
1143
 
 
1144
                return(rec);
 
1145
        }
 
1146
 
 
1147
        /* Out of space: restore the page */
 
1148
        if (!page_zip_decompress(page_zip, page)) {
 
1149
                ut_error; /* Memory corrupted? */
 
1150
        }
 
1151
        ut_ad(page_validate(page, index));
 
1152
        return(NULL);
 
1153
}
 
1154
 
 
1155
/***************************************************************
 
1156
Inserts a record next to page cursor on a compressed and uncompressed
 
1157
page. Returns pointer to inserted record if succeed, i.e.,
 
1158
enough space available, NULL otherwise.
 
1159
The cursor stays at the same position. */
 
1160
UNIV_INTERN
 
1161
rec_t*
 
1162
page_cur_insert_rec_zip(
 
1163
/*====================*/
 
1164
                                /* out: pointer to record if succeed, NULL
 
1165
                                otherwise */
 
1166
        rec_t**         current_rec,/* in/out: pointer to current record after
 
1167
                                which the new record is inserted */
 
1168
        buf_block_t*    block,  /* in: buffer block of *current_rec */
 
1169
        dict_index_t*   index,  /* in: record descriptor */
 
1170
        const rec_t*    rec,    /* in: pointer to a physical record */
 
1171
        ulint*          offsets,/* in/out: rec_get_offsets(rec, index) */
 
1172
        mtr_t*          mtr)    /* in: mini-transaction handle, or NULL */
 
1173
{
 
1174
        byte*           insert_buf      = NULL;
 
1175
        ulint           rec_size;
 
1176
        page_t*         page;           /* the relevant page */
 
1177
        rec_t*          last_insert;    /* cursor position at previous
 
1178
                                        insert */
 
1179
        rec_t*          free_rec;       /* a free record that was reused,
 
1180
                                        or NULL */
 
1181
        rec_t*          insert_rec;     /* inserted record */
 
1182
        ulint           heap_no;        /* heap number of the inserted
 
1183
                                        record */
 
1184
        page_zip_des_t* page_zip;
 
1185
 
 
1186
        page_zip = buf_block_get_page_zip(block);
 
1187
        ut_ad(page_zip);
 
1188
 
 
1189
        ut_ad(rec_offs_validate(rec, index, offsets));
 
1190
 
 
1191
        page = page_align(*current_rec);
 
1192
        ut_ad(dict_table_is_comp(index->table));
 
1193
        ut_ad(page_is_comp(page));
 
1194
 
 
1195
        ut_ad(!page_rec_is_supremum(*current_rec));
 
1196
#ifdef UNIV_ZIP_DEBUG
 
1197
        ut_a(page_zip_validate(page_zip, page));
 
1198
#endif /* UNIV_ZIP_DEBUG */
 
1199
 
 
1200
        /* 1. Get the size of the physical record in the page */
 
1201
        rec_size = rec_offs_size(offsets);
 
1202
 
 
1203
#ifdef UNIV_DEBUG_VALGRIND
 
1204
        {
 
1205
                const void*     rec_start
 
1206
                        = rec - rec_offs_extra_size(offsets);
 
1207
                ulint           extra_size
 
1208
                        = rec_offs_extra_size(offsets)
 
1209
                        - (rec_offs_comp(offsets)
 
1210
                           ? REC_N_NEW_EXTRA_BYTES
 
1211
                           : REC_N_OLD_EXTRA_BYTES);
 
1212
 
 
1213
                /* All data bytes of the record must be valid. */
 
1214
                UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
 
1215
                /* The variable-length header must be valid. */
 
1216
                UNIV_MEM_ASSERT_RW(rec_start, extra_size);
 
1217
        }
 
1218
#endif /* UNIV_DEBUG_VALGRIND */
 
1219
 
 
1220
        /* 2. Try to find suitable space from page memory management */
 
1221
        if (!page_zip_available(page_zip, dict_index_is_clust(index),
 
1222
                                rec_size, 1)) {
 
1223
 
 
1224
                /* Try compressing the whole page afterwards. */
 
1225
                insert_rec = page_cur_insert_rec_low(*current_rec,
 
1226
                                                     index, rec, offsets,
 
1227
                                                     NULL);
 
1228
 
 
1229
                if (UNIV_LIKELY(insert_rec != NULL)) {
 
1230
                        insert_rec = page_cur_insert_rec_zip_reorg(
 
1231
                                current_rec, block, index, insert_rec,
 
1232
                                page, page_zip, mtr);
 
1233
                }
 
1234
 
 
1235
                return(insert_rec);
 
1236
        }
 
1237
 
 
1238
        free_rec = page_header_get_ptr(page, PAGE_FREE);
 
1239
        if (UNIV_LIKELY_NULL(free_rec)) {
 
1240
                /* Try to allocate from the head of the free list. */
 
1241
                lint    extra_size_diff;
 
1242
                ulint           foffsets_[REC_OFFS_NORMAL_SIZE];
 
1243
                ulint*          foffsets        = foffsets_;
 
1244
                mem_heap_t*     heap            = NULL;
 
1245
 
 
1246
                rec_offs_init(foffsets_);
 
1247
 
 
1248
                foffsets = rec_get_offsets(free_rec, index, foffsets,
 
1249
                                        ULINT_UNDEFINED, &heap);
 
1250
                if (rec_offs_size(foffsets) < rec_size) {
 
1251
too_small:
 
1252
                        if (UNIV_LIKELY_NULL(heap)) {
 
1253
                                mem_heap_free(heap);
 
1254
                        }
 
1255
 
 
1256
                        goto use_heap;
 
1257
                }
 
1258
 
 
1259
                insert_buf = free_rec - rec_offs_extra_size(foffsets);
 
1260
 
 
1261
                /* On compressed pages, do not relocate records from
 
1262
                the free list.  If extra_size would grow, use the heap. */
 
1263
                extra_size_diff
 
1264
                        = rec_offs_extra_size(offsets)
 
1265
                        - rec_offs_extra_size(foffsets);
 
1266
 
 
1267
                if (UNIV_UNLIKELY(extra_size_diff < 0)) {
 
1268
                        /* Add an offset to the extra_size. */
 
1269
                        if (rec_offs_size(foffsets)
 
1270
                            < rec_size - extra_size_diff) {
 
1271
 
 
1272
                                goto too_small;
 
1273
                        }
 
1274
 
 
1275
                        insert_buf -= extra_size_diff;
 
1276
                } else if (UNIV_UNLIKELY(extra_size_diff)) {
 
1277
                        /* Do not allow extra_size to grow */
 
1278
 
 
1279
                        goto too_small;
 
1280
                }
 
1281
 
 
1282
                heap_no = rec_get_heap_no_new(free_rec);
 
1283
                page_mem_alloc_free(page, page_zip,
 
1284
                                    rec_get_next_ptr(free_rec, TRUE),
 
1285
                                    rec_size);
 
1286
 
 
1287
                if (UNIV_LIKELY_NULL(heap)) {
 
1288
                        mem_heap_free(heap);
 
1289
                }
 
1290
        } else {
 
1291
use_heap:
 
1292
                free_rec = NULL;
 
1293
                insert_buf = page_mem_alloc_heap(page, page_zip,
 
1294
                                                 rec_size, &heap_no);
 
1295
 
 
1296
                if (UNIV_UNLIKELY(insert_buf == NULL)) {
 
1297
                        return(NULL);
 
1298
                }
 
1299
 
 
1300
                page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
 
1301
        }
 
1302
 
 
1303
        /* 3. Create the record */
 
1304
        insert_rec = rec_copy(insert_buf, rec, offsets);
 
1305
        rec_offs_make_valid(insert_rec, index, offsets);
 
1306
 
 
1307
        /* 4. Insert the record in the linked list of records */
 
1308
        ut_ad(*current_rec != insert_rec);
 
1309
 
 
1310
        {
 
1311
                /* next record after current before the insertion */
 
1312
                rec_t*  next_rec = page_rec_get_next(*current_rec);
 
1313
                ut_ad(rec_get_status(*current_rec)
 
1314
                      <= REC_STATUS_INFIMUM);
 
1315
                ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
 
1316
                ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
 
1317
 
 
1318
                page_rec_set_next(insert_rec, next_rec);
 
1319
                page_rec_set_next(*current_rec, insert_rec);
 
1320
        }
 
1321
 
 
1322
        page_header_set_field(page, page_zip, PAGE_N_RECS,
 
1323
                              1 + page_get_n_recs(page));
 
1324
 
 
1325
        /* 5. Set the n_owned field in the inserted record to zero,
 
1326
        and set the heap_no field */
 
1327
        rec_set_n_owned_new(insert_rec, NULL, 0);
 
1328
        rec_set_heap_no_new(insert_rec, heap_no);
 
1329
 
 
1330
        UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
 
1331
                           rec_offs_size(offsets));
 
1332
 
 
1333
        page_zip_dir_insert(page_zip, *current_rec, free_rec, insert_rec);
 
1334
 
 
1335
        /* 6. Update the last insertion info in page header */
 
1336
 
 
1337
        last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
 
1338
        ut_ad(!last_insert
 
1339
              || rec_get_node_ptr_flag(last_insert)
 
1340
              == rec_get_node_ptr_flag(insert_rec));
 
1341
 
 
1342
        if (UNIV_UNLIKELY(last_insert == NULL)) {
 
1343
                page_header_set_field(page, page_zip, PAGE_DIRECTION,
 
1344
                                                        PAGE_NO_DIRECTION);
 
1345
                page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
 
1346
 
 
1347
        } else if ((last_insert == *current_rec)
 
1348
                   && (page_header_get_field(page, PAGE_DIRECTION)
 
1349
                       != PAGE_LEFT)) {
 
1350
 
 
1351
                page_header_set_field(page, page_zip, PAGE_DIRECTION,
 
1352
                                                        PAGE_RIGHT);
 
1353
                page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
 
1354
                                      page_header_get_field(
 
1355
                                              page, PAGE_N_DIRECTION) + 1);
 
1356
 
 
1357
        } else if ((page_rec_get_next(insert_rec) == last_insert)
 
1358
                   && (page_header_get_field(page, PAGE_DIRECTION)
 
1359
                       != PAGE_RIGHT)) {
 
1360
 
 
1361
                page_header_set_field(page, page_zip, PAGE_DIRECTION,
 
1362
                                                        PAGE_LEFT);
 
1363
                page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
 
1364
                                      page_header_get_field(
 
1365
                                              page, PAGE_N_DIRECTION) + 1);
 
1366
        } else {
 
1367
                page_header_set_field(page, page_zip, PAGE_DIRECTION,
 
1368
                                                        PAGE_NO_DIRECTION);
 
1369
                page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
 
1370
        }
 
1371
 
 
1372
        page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
 
1373
 
 
1374
        /* 7. It remains to update the owner record. */
 
1375
        {
 
1376
                rec_t*  owner_rec       = page_rec_find_owner_rec(insert_rec);
 
1377
                ulint   n_owned;
 
1378
 
 
1379
                n_owned = rec_get_n_owned_new(owner_rec);
 
1380
                rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
 
1381
 
 
1382
                /* 8. Now we have incremented the n_owned field of the owner
 
1383
                record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
 
1384
                we have to split the corresponding directory slot in two. */
 
1385
 
 
1386
                if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
 
1387
                        page_dir_split_slot(
 
1388
                                page, page_zip,
 
1389
                                page_dir_find_owner_slot(owner_rec));
 
1390
                }
 
1391
        }
 
1392
 
 
1393
        page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
 
1394
 
 
1395
        /* 9. Write log record of the insert */
 
1396
        if (UNIV_LIKELY(mtr != NULL)) {
 
1397
                page_cur_insert_rec_write_log(insert_rec, rec_size,
 
1398
                                              *current_rec, index, mtr);
 
1399
        }
 
1400
 
 
1401
        return(insert_rec);
 
1402
}
 
1403
 
 
1404
/**************************************************************
 
1405
Writes a log record of copying a record list end to a new created page. */
 
1406
UNIV_INLINE
 
1407
byte*
 
1408
page_copy_rec_list_to_created_page_write_log(
 
1409
/*=========================================*/
 
1410
                                /* out: 4-byte field where to
 
1411
                                write the log data length,
 
1412
                                or NULL if logging is disabled */
 
1413
        page_t*         page,   /* in: index page */
 
1414
        dict_index_t*   index,  /* in: record descriptor */
 
1415
        mtr_t*          mtr)    /* in: mtr */
 
1416
{
 
1417
        byte*   log_ptr;
 
1418
 
 
1419
        ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
 
1420
 
 
1421
        log_ptr = mlog_open_and_write_index(mtr, page, index,
 
1422
                                            page_is_comp(page)
 
1423
                                            ? MLOG_COMP_LIST_END_COPY_CREATED
 
1424
                                            : MLOG_LIST_END_COPY_CREATED, 4);
 
1425
        if (UNIV_LIKELY(log_ptr != NULL)) {
 
1426
                mlog_close(mtr, log_ptr + 4);
 
1427
        }
 
1428
 
 
1429
        return(log_ptr);
 
1430
}
 
1431
 
 
1432
/**************************************************************
 
1433
Parses a log record of copying a record list end to a new created page. */
 
1434
UNIV_INTERN
 
1435
byte*
 
1436
page_parse_copy_rec_list_to_created_page(
 
1437
/*=====================================*/
 
1438
                                /* out: end of log record or NULL */
 
1439
        byte*           ptr,    /* in: buffer */
 
1440
        byte*           end_ptr,/* in: buffer end */
 
1441
        buf_block_t*    block,  /* in: page or NULL */
 
1442
        dict_index_t*   index,  /* in: record descriptor */
 
1443
        mtr_t*          mtr)    /* in: mtr or NULL */
 
1444
{
 
1445
        byte*           rec_end;
 
1446
        ulint           log_data_len;
 
1447
        page_t*         page;
 
1448
        page_zip_des_t* page_zip;
 
1449
 
 
1450
        if (ptr + 4 > end_ptr) {
 
1451
 
 
1452
                return(NULL);
 
1453
        }
 
1454
 
 
1455
        log_data_len = mach_read_from_4(ptr);
 
1456
        ptr += 4;
 
1457
 
 
1458
        rec_end = ptr + log_data_len;
 
1459
 
 
1460
        if (rec_end > end_ptr) {
 
1461
 
 
1462
                return(NULL);
 
1463
        }
 
1464
 
 
1465
        if (!block) {
 
1466
 
 
1467
                return(rec_end);
 
1468
        }
 
1469
 
 
1470
        while (ptr < rec_end) {
 
1471
                ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
 
1472
                                                block, index, mtr);
 
1473
        }
 
1474
 
 
1475
        ut_a(ptr == rec_end);
 
1476
 
 
1477
        page = buf_block_get_frame(block);
 
1478
        page_zip = buf_block_get_page_zip(block);
 
1479
 
 
1480
        page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
 
1481
        page_header_set_field(page, page_zip, PAGE_DIRECTION,
 
1482
                                                        PAGE_NO_DIRECTION);
 
1483
        page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
 
1484
 
 
1485
        return(rec_end);
 
1486
}
 
1487
 
 
1488
/*****************************************************************
 
1489
Copies records from page to a newly created page, from a given record onward,
 
1490
including that record. Infimum and supremum records are not copied. */
 
1491
UNIV_INTERN
 
1492
void
 
1493
page_copy_rec_list_end_to_created_page(
 
1494
/*===================================*/
 
1495
        page_t*         new_page,       /* in/out: index page to copy to */
 
1496
        rec_t*          rec,            /* in: first record to copy */
 
1497
        dict_index_t*   index,          /* in: record descriptor */
 
1498
        mtr_t*          mtr)            /* in: mtr */
 
1499
{
 
1500
        page_dir_slot_t* slot = 0; /* remove warning */
 
1501
        byte*   heap_top;
 
1502
        rec_t*  insert_rec = 0; /* remove warning */
 
1503
        rec_t*  prev_rec;
 
1504
        ulint   count;
 
1505
        ulint   n_recs;
 
1506
        ulint   slot_index;
 
1507
        ulint   rec_size;
 
1508
        ulint   log_mode;
 
1509
        byte*   log_ptr;
 
1510
        ulint   log_data_len;
 
1511
        mem_heap_t*     heap            = NULL;
 
1512
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
1513
        ulint*          offsets         = offsets_;
 
1514
        rec_offs_init(offsets_);
 
1515
 
 
1516
        ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
 
1517
        ut_ad(page_align(rec) != new_page);
 
1518
        ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
 
1519
 
 
1520
        if (page_rec_is_infimum(rec)) {
 
1521
 
 
1522
                rec = page_rec_get_next(rec);
 
1523
        }
 
1524
 
 
1525
        if (page_rec_is_supremum(rec)) {
 
1526
 
 
1527
                return;
 
1528
        }
 
1529
 
 
1530
#ifdef UNIV_DEBUG
 
1531
        /* To pass the debug tests we have to set these dummy values
 
1532
        in the debug version */
 
1533
        page_dir_set_n_slots(new_page, NULL, UNIV_PAGE_SIZE / 2);
 
1534
        page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
 
1535
                            new_page + UNIV_PAGE_SIZE - 1);
 
1536
#endif
 
1537
 
 
1538
        log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
 
1539
                                                               index, mtr);
 
1540
 
 
1541
        log_data_len = dyn_array_get_data_size(&(mtr->log));
 
1542
 
 
1543
        /* Individual inserts are logged in a shorter form */
 
1544
 
 
1545
        log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
 
1546
 
 
1547
        prev_rec = page_get_infimum_rec(new_page);
 
1548
        if (page_is_comp(new_page)) {
 
1549
                heap_top = new_page + PAGE_NEW_SUPREMUM_END;
 
1550
        } else {
 
1551
                heap_top = new_page + PAGE_OLD_SUPREMUM_END;
 
1552
        }
 
1553
        count = 0;
 
1554
        slot_index = 0;
 
1555
        n_recs = 0;
 
1556
 
 
1557
        do {
 
1558
                offsets = rec_get_offsets(rec, index, offsets,
 
1559
                                          ULINT_UNDEFINED, &heap);
 
1560
                insert_rec = rec_copy(heap_top, rec, offsets);
 
1561
 
 
1562
                if (page_is_comp(new_page)) {
 
1563
                        rec_set_next_offs_new(prev_rec,
 
1564
                                              page_offset(insert_rec));
 
1565
 
 
1566
                        rec_set_n_owned_new(insert_rec, NULL, 0);
 
1567
                        rec_set_heap_no_new(insert_rec,
 
1568
                                            PAGE_HEAP_NO_USER_LOW + n_recs);
 
1569
                } else {
 
1570
                        rec_set_next_offs_old(prev_rec,
 
1571
                                              page_offset(insert_rec));
 
1572
 
 
1573
                        rec_set_n_owned_old(insert_rec, 0);
 
1574
                        rec_set_heap_no_old(insert_rec,
 
1575
                                            PAGE_HEAP_NO_USER_LOW + n_recs);
 
1576
                }
 
1577
 
 
1578
                count++;
 
1579
                n_recs++;
 
1580
 
 
1581
                if (UNIV_UNLIKELY
 
1582
                    (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {
 
1583
 
 
1584
                        slot_index++;
 
1585
 
 
1586
                        slot = page_dir_get_nth_slot(new_page, slot_index);
 
1587
 
 
1588
                        page_dir_slot_set_rec(slot, insert_rec);
 
1589
                        page_dir_slot_set_n_owned(slot, NULL, count);
 
1590
 
 
1591
                        count = 0;
 
1592
                }
 
1593
 
 
1594
                rec_size = rec_offs_size(offsets);
 
1595
 
 
1596
                ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);
 
1597
 
 
1598
                heap_top += rec_size;
 
1599
 
 
1600
                page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
 
1601
                                              index, mtr);
 
1602
                prev_rec = insert_rec;
 
1603
                rec = page_rec_get_next(rec);
 
1604
        } while (!page_rec_is_supremum(rec));
 
1605
 
 
1606
        if ((slot_index > 0) && (count + 1
 
1607
                                 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
 
1608
                                 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
 
1609
                /* We can merge the two last dir slots. This operation is
 
1610
                here to make this function imitate exactly the equivalent
 
1611
                task made using page_cur_insert_rec, which we use in database
 
1612
                recovery to reproduce the task performed by this function.
 
1613
                To be able to check the correctness of recovery, it is good
 
1614
                that it imitates exactly. */
 
1615
 
 
1616
                count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
 
1617
 
 
1618
                page_dir_slot_set_n_owned(slot, NULL, 0);
 
1619
 
 
1620
                slot_index--;
 
1621
        }
 
1622
 
 
1623
        if (UNIV_LIKELY_NULL(heap)) {
 
1624
                mem_heap_free(heap);
 
1625
        }
 
1626
 
 
1627
        log_data_len = dyn_array_get_data_size(&(mtr->log)) - log_data_len;
 
1628
 
 
1629
        ut_a(log_data_len < 100 * UNIV_PAGE_SIZE);
 
1630
 
 
1631
        if (UNIV_LIKELY(log_ptr != NULL)) {
 
1632
                mach_write_to_4(log_ptr, log_data_len);
 
1633
        }
 
1634
 
 
1635
        if (page_is_comp(new_page)) {
 
1636
                rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
 
1637
        } else {
 
1638
                rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
 
1639
        }
 
1640
 
 
1641
        slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
 
1642
 
 
1643
        page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
 
1644
        page_dir_slot_set_n_owned(slot, NULL, count + 1);
 
1645
 
 
1646
        page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
 
1647
        page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
 
1648
        page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
 
1649
        page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
 
1650
 
 
1651
        page_header_set_ptr(new_page, NULL, PAGE_LAST_INSERT, NULL);
 
1652
        page_header_set_field(new_page, NULL, PAGE_DIRECTION,
 
1653
                                                        PAGE_NO_DIRECTION);
 
1654
        page_header_set_field(new_page, NULL, PAGE_N_DIRECTION, 0);
 
1655
 
 
1656
        /* Restore the log mode */
 
1657
 
 
1658
        mtr_set_log_mode(mtr, log_mode);
 
1659
}
 
1660
 
 
1661
/***************************************************************
 
1662
Writes log record of a record delete on a page. */
 
1663
UNIV_INLINE
 
1664
void
 
1665
page_cur_delete_rec_write_log(
 
1666
/*==========================*/
 
1667
        rec_t*          rec,    /* in: record to be deleted */
 
1668
        dict_index_t*   index,  /* in: record descriptor */
 
1669
        mtr_t*          mtr)    /* in: mini-transaction handle */
 
1670
{
 
1671
        byte*   log_ptr;
 
1672
 
 
1673
        ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
 
1674
 
 
1675
        log_ptr = mlog_open_and_write_index(mtr, rec, index,
 
1676
                                            page_rec_is_comp(rec)
 
1677
                                            ? MLOG_COMP_REC_DELETE
 
1678
                                            : MLOG_REC_DELETE, 2);
 
1679
 
 
1680
        if (!log_ptr) {
 
1681
                /* Logging in mtr is switched off during crash recovery:
 
1682
                in that case mlog_open returns NULL */
 
1683
                return;
 
1684
        }
 
1685
 
 
1686
        /* Write the cursor rec offset as a 2-byte ulint */
 
1687
        mach_write_to_2(log_ptr, page_offset(rec));
 
1688
 
 
1689
        mlog_close(mtr, log_ptr + 2);
 
1690
}
 
1691
 
 
1692
/***************************************************************
 
1693
Parses log record of a record delete on a page. */
 
1694
UNIV_INTERN
 
1695
byte*
 
1696
page_cur_parse_delete_rec(
 
1697
/*======================*/
 
1698
                                /* out: pointer to record end or NULL */
 
1699
        byte*           ptr,    /* in: buffer */
 
1700
        byte*           end_ptr,/* in: buffer end */
 
1701
        buf_block_t*    block,  /* in: page or NULL */
 
1702
        dict_index_t*   index,  /* in: record descriptor */
 
1703
        mtr_t*          mtr)    /* in: mtr or NULL */
 
1704
{
 
1705
        ulint           offset;
 
1706
        page_cur_t      cursor;
 
1707
 
 
1708
        if (end_ptr < ptr + 2) {
 
1709
 
 
1710
                return(NULL);
 
1711
        }
 
1712
 
 
1713
        /* Read the cursor rec offset as a 2-byte ulint */
 
1714
        offset = mach_read_from_2(ptr);
 
1715
        ptr += 2;
 
1716
 
 
1717
        ut_a(offset <= UNIV_PAGE_SIZE);
 
1718
 
 
1719
        if (block) {
 
1720
                page_t*         page            = buf_block_get_frame(block);
 
1721
                mem_heap_t*     heap            = NULL;
 
1722
                ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
1723
                rec_t*          rec             = page + offset;
 
1724
                rec_offs_init(offsets_);
 
1725
 
 
1726
                page_cur_position(rec, block, &cursor);
 
1727
                ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
 
1728
 
 
1729
                page_cur_delete_rec(&cursor, index,
 
1730
                                    rec_get_offsets(rec, index, offsets_,
 
1731
                                                    ULINT_UNDEFINED, &heap),
 
1732
                                    mtr);
 
1733
                if (UNIV_LIKELY_NULL(heap)) {
 
1734
                        mem_heap_free(heap);
 
1735
                }
 
1736
        }
 
1737
 
 
1738
        return(ptr);
 
1739
}
 
1740
 
 
1741
/***************************************************************
 
1742
Deletes a record at the page cursor. The cursor is moved to the next
 
1743
record after the deleted one. */
 
1744
UNIV_INTERN
 
1745
void
 
1746
page_cur_delete_rec(
 
1747
/*================*/
 
1748
        page_cur_t*     cursor, /* in/out: a page cursor */
 
1749
        dict_index_t*   index,  /* in: record descriptor */
 
1750
        const ulint*    offsets,/* in: rec_get_offsets(cursor->rec, index) */
 
1751
        mtr_t*          mtr)    /* in: mini-transaction handle */
 
1752
{
 
1753
        page_dir_slot_t* cur_dir_slot;
 
1754
        page_dir_slot_t* prev_slot;
 
1755
        page_t*         page;
 
1756
        page_zip_des_t* page_zip;
 
1757
        rec_t*          current_rec;
 
1758
        rec_t*          prev_rec        = NULL;
 
1759
        rec_t*          next_rec;
 
1760
        ulint           cur_slot_no;
 
1761
        ulint           cur_n_owned;
 
1762
        rec_t*          rec;
 
1763
 
 
1764
        ut_ad(cursor && mtr);
 
1765
 
 
1766
        page = page_cur_get_page(cursor);
 
1767
        page_zip = page_cur_get_page_zip(cursor);
 
1768
        /* page_zip_validate() may fail here when
 
1769
        btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark(). */
 
1770
        current_rec = cursor->rec;
 
1771
        ut_ad(rec_offs_validate(current_rec, index, offsets));
 
1772
        ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
 
1773
 
 
1774
        /* The record must not be the supremum or infimum record. */
 
1775
        ut_ad(page_rec_is_user_rec(current_rec));
 
1776
 
 
1777
        /* Save to local variables some data associated with current_rec */
 
1778
        cur_slot_no = page_dir_find_owner_slot(current_rec);
 
1779
        cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
 
1780
        cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
 
1781
 
 
1782
        /* 0. Write the log record */
 
1783
        page_cur_delete_rec_write_log(current_rec, index, mtr);
 
1784
 
 
1785
        /* 1. Reset the last insert info in the page header and increment
 
1786
        the modify clock for the frame */
 
1787
 
 
1788
        page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
 
1789
 
 
1790
        /* The page gets invalid for optimistic searches: increment the
 
1791
        frame modify clock */
 
1792
 
 
1793
        buf_block_modify_clock_inc(page_cur_get_block(cursor));
 
1794
 
 
1795
        /* 2. Find the next and the previous record. Note that the cursor is
 
1796
        left at the next record. */
 
1797
 
 
1798
        ut_ad(cur_slot_no > 0);
 
1799
        prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
 
1800
 
 
1801
        rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
 
1802
 
 
1803
        /* rec now points to the record of the previous directory slot. Look
 
1804
        for the immediate predecessor of current_rec in a loop. */
 
1805
 
 
1806
        while(current_rec != rec) {
 
1807
                prev_rec = rec;
 
1808
                rec = page_rec_get_next(rec);
 
1809
        }
 
1810
 
 
1811
        page_cur_move_to_next(cursor);
 
1812
        next_rec = cursor->rec;
 
1813
 
 
1814
        /* 3. Remove the record from the linked list of records */
 
1815
 
 
1816
        page_rec_set_next(prev_rec, next_rec);
 
1817
 
 
1818
        /* 4. If the deleted record is pointed to by a dir slot, update the
 
1819
        record pointer in slot. In the following if-clause we assume that
 
1820
        prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
 
1821
        >= 2. */
 
1822
 
 
1823
#if PAGE_DIR_SLOT_MIN_N_OWNED < 2
 
1824
# error "PAGE_DIR_SLOT_MIN_N_OWNED < 2"
 
1825
#endif
 
1826
        ut_ad(cur_n_owned > 1);
 
1827
 
 
1828
        if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
 
1829
                page_dir_slot_set_rec(cur_dir_slot, prev_rec);
 
1830
        }
 
1831
 
 
1832
        /* 5. Update the number of owned records of the slot */
 
1833
 
 
1834
        page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
 
1835
 
 
1836
        /* 6. Free the memory occupied by the record */
 
1837
        page_mem_free(page, page_zip, current_rec, index, offsets);
 
1838
 
 
1839
        /* 7. Now we have decremented the number of owned records of the slot.
 
1840
        If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
 
1841
        slots. */
 
1842
 
 
1843
        if (UNIV_UNLIKELY(cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED)) {
 
1844
                page_dir_balance_slot(page, page_zip, cur_slot_no);
 
1845
        }
 
1846
 
 
1847
#ifdef UNIV_ZIP_DEBUG
 
1848
        ut_a(!page_zip || page_zip_validate(page_zip, page));
 
1849
#endif /* UNIV_ZIP_DEBUG */
 
1850
}