~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/row/row0merge.c

  • Committer: Stewart Smith
  • Author(s): Marko Mäkelä
  • Date: 2010-12-20 03:21:44 UTC
  • mto: (2021.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 2022.
  • Revision ID: stewart@flamingspork.com-20101220032144-7aqh2z403u7d7bdp
Merge Revision revid:marko.makela@oracle.com-20101104131215-pfxnpidlrzd4krg0 from MySQL InnoDB

Original revid:marko.makela@oracle.com-20101104131215-pfxnpidlrzd4krg0

Original Authors: Marko Mäkelä <marko.makela@oracle.com>
Original commit message:
row_ins_index_entry(): Note that only CREATE INDEX sets foreign=FALSE.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*****************************************************************************
 
2
 
 
3
Copyright (c) 2005, 2010, Innobase Oy. All Rights Reserved.
 
4
 
 
5
This program is free software; you can redistribute it and/or modify it under
 
6
the terms of the GNU General Public License as published by the Free Software
 
7
Foundation; version 2 of the License.
 
8
 
 
9
This program is distributed in the hope that it will be useful, but WITHOUT
 
10
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 
11
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 
12
 
 
13
You should have received a copy of the GNU General Public License along with
 
14
this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
 
15
St, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
*****************************************************************************/
 
18
 
 
19
/**************************************************//**
 
20
@file row/row0merge.c
 
21
New index creation routines using a merge sort
 
22
 
 
23
Created 12/4/2005 Jan Lindstrom
 
24
Completed by Sunny Bains and Marko Makela
 
25
*******************************************************/
 
26
 
 
27
#include "row0merge.h"
 
28
#include "row0ext.h"
 
29
#include "row0row.h"
 
30
#include "row0upd.h"
 
31
#include "row0ins.h"
 
32
#include "row0sel.h"
 
33
#include "dict0dict.h"
 
34
#include "dict0mem.h"
 
35
#include "dict0boot.h"
 
36
#include "dict0crea.h"
 
37
#include "dict0load.h"
 
38
#include "btr0btr.h"
 
39
#include "mach0data.h"
 
40
#include "trx0rseg.h"
 
41
#include "trx0trx.h"
 
42
#include "trx0roll.h"
 
43
#include "trx0undo.h"
 
44
#include "trx0purge.h"
 
45
#include "trx0rec.h"
 
46
#include "que0que.h"
 
47
#include "rem0cmp.h"
 
48
#include "read0read.h"
 
49
#include "os0file.h"
 
50
#include "lock0lock.h"
 
51
#include "data0data.h"
 
52
#include "data0type.h"
 
53
#include "que0que.h"
 
54
#include "pars0pars.h"
 
55
#include "mem0mem.h"
 
56
#include "log0log.h"
 
57
#include "ut0sort.h"
 
58
#include "handler0alter.h"
 
59
#include <unistd.h>
 
60
 
 
61
/* Ignore posix_fadvise() on those platforms where it does not exist */
 
62
#if defined __WIN__
 
63
# define posix_fadvise(fd, offset, len, advice) /* nothing */
 
64
#endif /* __WIN__ */
 
65
 
 
66
#ifdef UNIV_DEBUG
 
67
/** Set these in order ot enable debug printout. */
 
68
/* @{ */
 
69
/** Log the outcome of each row_merge_cmp() call, comparing records. */
 
70
static ibool    row_merge_print_cmp;
 
71
/** Log each record read from temporary file. */
 
72
static ibool    row_merge_print_read;
 
73
/** Log each record write to temporary file. */
 
74
static ibool    row_merge_print_write;
 
75
/** Log each row_merge_blocks() call, merging two blocks of records to
 
76
a bigger one. */
 
77
static ibool    row_merge_print_block;
 
78
/** Log each block read from temporary file. */
 
79
static ibool    row_merge_print_block_read;
 
80
/** Log each block read from temporary file. */
 
81
static ibool    row_merge_print_block_write;
 
82
/* @} */
 
83
#endif /* UNIV_DEBUG */
 
84
 
 
85
/** @brief Block size for I/O operations in merge sort.
 
86
 
 
87
The minimum is UNIV_PAGE_SIZE, or page_get_free_space_of_empty()
 
88
rounded to a power of 2.
 
89
 
 
90
When not creating a PRIMARY KEY that contains column prefixes, this
 
91
can be set as small as UNIV_PAGE_SIZE / 2.  See the comment above
 
92
ut_ad(data_size < sizeof(row_merge_block_t)). */
 
93
typedef byte    row_merge_block_t[1048576];
 
94
 
 
95
/** @brief Secondary buffer for I/O operations of merge records.
 
96
 
 
97
This buffer is used for writing or reading a record that spans two
 
98
row_merge_block_t.  Thus, it must be able to hold one merge record,
 
99
whose maximum size is the same as the minimum size of
 
100
row_merge_block_t. */
 
101
typedef byte    mrec_buf_t[UNIV_PAGE_SIZE];
 
102
 
 
103
/** @brief Merge record in row_merge_block_t.
 
104
 
 
105
The format is the same as a record in ROW_FORMAT=COMPACT with the
 
106
exception that the REC_N_NEW_EXTRA_BYTES are omitted. */
 
107
typedef byte    mrec_t;
 
108
 
 
109
/** Buffer for sorting in main memory. */
 
110
struct row_merge_buf_struct {
 
111
        mem_heap_t*     heap;           /*!< memory heap where allocated */
 
112
        dict_index_t*   index;          /*!< the index the tuples belong to */
 
113
        ulint           total_size;     /*!< total amount of data bytes */
 
114
        ulint           n_tuples;       /*!< number of data tuples */
 
115
        ulint           max_tuples;     /*!< maximum number of data tuples */
 
116
        const dfield_t**tuples;         /*!< array of pointers to
 
117
                                        arrays of fields that form
 
118
                                        the data tuples */
 
119
        const dfield_t**tmp_tuples;     /*!< temporary copy of tuples,
 
120
                                        for sorting */
 
121
};
 
122
 
 
123
/** Buffer for sorting in main memory. */
 
124
typedef struct row_merge_buf_struct row_merge_buf_t;
 
125
 
 
126
/** Information about temporary files used in merge sort */
 
127
struct merge_file_struct {
 
128
        int             fd;             /*!< file descriptor */
 
129
        ulint           offset;         /*!< file offset (end of file) */
 
130
        ib_uint64_t     n_rec;          /*!< number of records in the file */
 
131
};
 
132
 
 
133
/** Information about temporary files used in merge sort */
 
134
typedef struct merge_file_struct merge_file_t;
 
135
 
 
136
#ifdef UNIV_DEBUG
 
137
/******************************************************//**
 
138
Display a merge tuple. */
 
139
static
 
140
void
 
141
row_merge_tuple_print(
 
142
/*==================*/
 
143
        FILE*           f,      /*!< in: output stream */
 
144
        const dfield_t* entry,  /*!< in: tuple to print */
 
145
        ulint           n_fields)/*!< in: number of fields in the tuple */
 
146
{
 
147
        ulint   j;
 
148
 
 
149
        for (j = 0; j < n_fields; j++) {
 
150
                const dfield_t* field = &entry[j];
 
151
 
 
152
                if (dfield_is_null(field)) {
 
153
                        fputs("\n NULL;", f);
 
154
                } else {
 
155
                        ulint   field_len       = dfield_get_len(field);
 
156
                        ulint   len             = ut_min(field_len, 20);
 
157
                        if (dfield_is_ext(field)) {
 
158
                                fputs("\nE", f);
 
159
                        } else {
 
160
                                fputs("\n ", f);
 
161
                        }
 
162
                        ut_print_buf(f, dfield_get_data(field), len);
 
163
                        if (len != field_len) {
 
164
                                fprintf(f, " (total %lu bytes)", field_len);
 
165
                        }
 
166
                }
 
167
        }
 
168
        putc('\n', f);
 
169
}
 
170
#endif /* UNIV_DEBUG */
 
171
 
 
172
/******************************************************//**
 
173
Allocate a sort buffer.
 
174
@return own: sort buffer */
 
175
static
 
176
row_merge_buf_t*
 
177
row_merge_buf_create_low(
 
178
/*=====================*/
 
179
        mem_heap_t*     heap,           /*!< in: heap where allocated */
 
180
        dict_index_t*   index,          /*!< in: secondary index */
 
181
        ulint           max_tuples,     /*!< in: maximum number of data tuples */
 
182
        ulint           buf_size)       /*!< in: size of the buffer, in bytes */
 
183
{
 
184
        row_merge_buf_t*        buf;
 
185
 
 
186
        ut_ad(max_tuples > 0);
 
187
        ut_ad(max_tuples <= sizeof(row_merge_block_t));
 
188
        ut_ad(max_tuples < buf_size);
 
189
 
 
190
        buf = mem_heap_zalloc(heap, buf_size);
 
191
        buf->heap = heap;
 
192
        buf->index = index;
 
193
        buf->max_tuples = max_tuples;
 
194
        buf->tuples = mem_heap_alloc(heap,
 
195
                                     2 * max_tuples * sizeof *buf->tuples);
 
196
        buf->tmp_tuples = buf->tuples + max_tuples;
 
197
 
 
198
        return(buf);
 
199
}
 
200
 
 
201
/******************************************************//**
 
202
Allocate a sort buffer.
 
203
@return own: sort buffer */
 
204
static
 
205
row_merge_buf_t*
 
206
row_merge_buf_create(
 
207
/*=================*/
 
208
        dict_index_t*   index)  /*!< in: secondary index */
 
209
{
 
210
        row_merge_buf_t*        buf;
 
211
        ulint                   max_tuples;
 
212
        ulint                   buf_size;
 
213
        mem_heap_t*             heap;
 
214
 
 
215
        max_tuples = sizeof(row_merge_block_t)
 
216
                / ut_max(1, dict_index_get_min_size(index));
 
217
 
 
218
        buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
 
219
 
 
220
        heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));
 
221
 
 
222
        buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
 
223
 
 
224
        return(buf);
 
225
}
 
226
 
 
227
/******************************************************//**
 
228
Empty a sort buffer.
 
229
@return sort buffer */
 
230
static
 
231
row_merge_buf_t*
 
232
row_merge_buf_empty(
 
233
/*================*/
 
234
        row_merge_buf_t*        buf)    /*!< in,own: sort buffer */
 
235
{
 
236
        ulint           buf_size;
 
237
        ulint           max_tuples      = buf->max_tuples;
 
238
        mem_heap_t*     heap            = buf->heap;
 
239
        dict_index_t*   index           = buf->index;
 
240
 
 
241
        buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
 
242
 
 
243
        mem_heap_empty(heap);
 
244
 
 
245
        return(row_merge_buf_create_low(heap, index, max_tuples, buf_size));
 
246
}
 
247
 
 
248
/******************************************************//**
 
249
Deallocate a sort buffer. */
 
250
static
 
251
void
 
252
row_merge_buf_free(
 
253
/*===============*/
 
254
        row_merge_buf_t*        buf)    /*!< in,own: sort buffer, to be freed */
 
255
{
 
256
        mem_heap_free(buf->heap);
 
257
}
 
258
 
 
259
/******************************************************//**
 
260
Insert a data tuple into a sort buffer.
 
261
@return TRUE if added, FALSE if out of space */
 
262
static
 
263
ibool
 
264
row_merge_buf_add(
 
265
/*==============*/
 
266
        row_merge_buf_t*        buf,    /*!< in/out: sort buffer */
 
267
        const dtuple_t*         row,    /*!< in: row in clustered index */
 
268
        const row_ext_t*        ext)    /*!< in: cache of externally stored
 
269
                                        column prefixes, or NULL */
 
270
{
 
271
        ulint                   i;
 
272
        ulint                   n_fields;
 
273
        ulint                   data_size;
 
274
        ulint                   extra_size;
 
275
        const dict_index_t*     index;
 
276
        dfield_t*               entry;
 
277
        dfield_t*               field;
 
278
        const dict_field_t*     ifield;
 
279
 
 
280
        if (buf->n_tuples >= buf->max_tuples) {
 
281
                return(FALSE);
 
282
        }
 
283
 
 
284
        UNIV_PREFETCH_R(row->fields);
 
285
 
 
286
        index = buf->index;
 
287
 
 
288
        n_fields = dict_index_get_n_fields(index);
 
289
 
 
290
        entry = mem_heap_alloc(buf->heap, n_fields * sizeof *entry);
 
291
        buf->tuples[buf->n_tuples] = entry;
 
292
        field = entry;
 
293
 
 
294
        data_size = 0;
 
295
        extra_size = UT_BITS_IN_BYTES(index->n_nullable);
 
296
 
 
297
        ifield = dict_index_get_nth_field(index, 0);
 
298
 
 
299
        for (i = 0; i < n_fields; i++, field++, ifield++) {
 
300
                const dict_col_t*       col;
 
301
                ulint                   col_no;
 
302
                const dfield_t*         row_field;
 
303
                ulint                   len;
 
304
 
 
305
                col = ifield->col;
 
306
                col_no = dict_col_get_no(col);
 
307
                row_field = dtuple_get_nth_field(row, col_no);
 
308
                dfield_copy(field, row_field);
 
309
                len = dfield_get_len(field);
 
310
 
 
311
                if (dfield_is_null(field)) {
 
312
                        ut_ad(!(col->prtype & DATA_NOT_NULL));
 
313
                        continue;
 
314
                } else if (UNIV_LIKELY(!ext)) {
 
315
                } else if (dict_index_is_clust(index)) {
 
316
                        /* Flag externally stored fields. */
 
317
                        const byte*     buf = row_ext_lookup(ext, col_no,
 
318
                                                             &len);
 
319
                        if (UNIV_LIKELY_NULL(buf)) {
 
320
                                ut_a(buf != field_ref_zero);
 
321
                                if (i < dict_index_get_n_unique(index)) {
 
322
                                        dfield_set_data(field, buf, len);
 
323
                                } else {
 
324
                                        dfield_set_ext(field);
 
325
                                        len = dfield_get_len(field);
 
326
                                }
 
327
                        }
 
328
                } else {
 
329
                        const byte*     buf = row_ext_lookup(ext, col_no,
 
330
                                                             &len);
 
331
                        if (UNIV_LIKELY_NULL(buf)) {
 
332
                                ut_a(buf != field_ref_zero);
 
333
                                dfield_set_data(field, buf, len);
 
334
                        }
 
335
                }
 
336
 
 
337
                /* If a column prefix index, take only the prefix */
 
338
 
 
339
                if (ifield->prefix_len) {
 
340
                        len = dtype_get_at_most_n_mbchars(
 
341
                                col->prtype,
 
342
                                col->mbminmaxlen,
 
343
                                ifield->prefix_len,
 
344
                                len, dfield_get_data(field));
 
345
                        dfield_set_len(field, len);
 
346
                }
 
347
 
 
348
                ut_ad(len <= col->len || col->mtype == DATA_BLOB);
 
349
 
 
350
                if (ifield->fixed_len) {
 
351
                        ut_ad(len == ifield->fixed_len);
 
352
                        ut_ad(!dfield_is_ext(field));
 
353
                } else if (dfield_is_ext(field)) {
 
354
                        extra_size += 2;
 
355
                } else if (len < 128
 
356
                           || (col->len < 256 && col->mtype != DATA_BLOB)) {
 
357
                        extra_size++;
 
358
                } else {
 
359
                        /* For variable-length columns, we look up the
 
360
                        maximum length from the column itself.  If this
 
361
                        is a prefix index column shorter than 256 bytes,
 
362
                        this will waste one byte. */
 
363
                        extra_size += 2;
 
364
                }
 
365
                data_size += len;
 
366
        }
 
367
 
 
368
#ifdef UNIV_DEBUG
 
369
        {
 
370
                ulint   size;
 
371
                ulint   extra;
 
372
 
 
373
                size = rec_get_converted_size_comp(index,
 
374
                                                   REC_STATUS_ORDINARY,
 
375
                                                   entry, n_fields, &extra);
 
376
 
 
377
                ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
 
378
                ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
 
379
        }
 
380
#endif /* UNIV_DEBUG */
 
381
 
 
382
        /* Add to the total size of the record in row_merge_block_t
 
383
        the encoded length of extra_size and the extra bytes (extra_size).
 
384
        See row_merge_buf_write() for the variable-length encoding
 
385
        of extra_size. */
 
386
        data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
 
387
 
 
388
        /* The following assertion may fail if row_merge_block_t is
 
389
        declared very small and a PRIMARY KEY is being created with
 
390
        many prefix columns.  In that case, the record may exceed the
 
391
        page_zip_rec_needs_ext() limit.  However, no further columns
 
392
        will be moved to external storage until the record is inserted
 
393
        to the clustered index B-tree. */
 
394
        ut_ad(data_size < sizeof(row_merge_block_t));
 
395
 
 
396
        /* Reserve one byte for the end marker of row_merge_block_t. */
 
397
        if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
 
398
                return(FALSE);
 
399
        }
 
400
 
 
401
        buf->total_size += data_size;
 
402
        buf->n_tuples++;
 
403
 
 
404
        field = entry;
 
405
 
 
406
        /* Copy the data fields. */
 
407
 
 
408
        do {
 
409
                dfield_dup(field++, buf->heap);
 
410
        } while (--n_fields);
 
411
 
 
412
        return(TRUE);
 
413
}
 
414
 
 
415
/** Structure for reporting duplicate records. */
 
416
struct row_merge_dup_struct {
 
417
        const dict_index_t*     index;          /*!< index being sorted */
 
418
        struct TABLE*           table;          /*!< MySQL table object */
 
419
        ulint                   n_dup;          /*!< number of duplicates */
 
420
};
 
421
 
 
422
/** Structure for reporting duplicate records. */
 
423
typedef struct row_merge_dup_struct row_merge_dup_t;
 
424
 
 
425
/*************************************************************//**
 
426
Report a duplicate key. */
 
427
static
 
428
void
 
429
row_merge_dup_report(
 
430
/*=================*/
 
431
        row_merge_dup_t*        dup,    /*!< in/out: for reporting duplicates */
 
432
        const dfield_t*         entry)  /*!< in: duplicate index entry */
 
433
{
 
434
        mrec_buf_t*             buf;
 
435
        const dtuple_t*         tuple;
 
436
        dtuple_t                tuple_store;
 
437
        const rec_t*            rec;
 
438
        const dict_index_t*     index   = dup->index;
 
439
        ulint                   n_fields= dict_index_get_n_fields(index);
 
440
        mem_heap_t*             heap;
 
441
        ulint*                  offsets;
 
442
        ulint                   n_ext;
 
443
 
 
444
        if (dup->n_dup++) {
 
445
                /* Only report the first duplicate record,
 
446
                but count all duplicate records. */
 
447
                return;
 
448
        }
 
449
 
 
450
        /* Convert the tuple to a record and then to MySQL format. */
 
451
        heap = mem_heap_create((1 + REC_OFFS_HEADER_SIZE + n_fields)
 
452
                               * sizeof *offsets
 
453
                               + sizeof *buf);
 
454
 
 
455
        buf = mem_heap_alloc(heap, sizeof *buf);
 
456
 
 
457
        tuple = dtuple_from_fields(&tuple_store, entry, n_fields);
 
458
        n_ext = dict_index_is_clust(index) ? dtuple_get_n_ext(tuple) : 0;
 
459
 
 
460
        rec = rec_convert_dtuple_to_rec(*buf, index, tuple, n_ext);
 
461
        offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
 
462
 
 
463
        innobase_rec_to_mysql(dup->table, rec, index, offsets);
 
464
 
 
465
        mem_heap_free(heap);
 
466
}
 
467
 
 
468
/*************************************************************//**
 
469
Compare two tuples.
 
470
@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
 
471
static
 
472
int
 
473
row_merge_tuple_cmp(
 
474
/*================*/
 
475
        ulint                   n_field,/*!< in: number of fields */
 
476
        const dfield_t*         a,      /*!< in: first tuple to be compared */
 
477
        const dfield_t*         b,      /*!< in: second tuple to be compared */
 
478
        row_merge_dup_t*        dup)    /*!< in/out: for reporting duplicates */
 
479
{
 
480
        int             cmp;
 
481
        const dfield_t* field   = a;
 
482
 
 
483
        /* Compare the fields of the tuples until a difference is
 
484
        found or we run out of fields to compare.  If !cmp at the
 
485
        end, the tuples are equal. */
 
486
        do {
 
487
                cmp = cmp_dfield_dfield(a++, b++);
 
488
        } while (!cmp && --n_field);
 
489
 
 
490
        if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
 
491
                /* Report a duplicate value error if the tuples are
 
492
                logically equal.  NULL columns are logically inequal,
 
493
                although they are equal in the sorting order.  Find
 
494
                out if any of the fields are NULL. */
 
495
                for (b = field; b != a; b++) {
 
496
                        if (dfield_is_null(b)) {
 
497
 
 
498
                                goto func_exit;
 
499
                        }
 
500
                }
 
501
 
 
502
                row_merge_dup_report(dup, field);
 
503
        }
 
504
 
 
505
func_exit:
 
506
        return(cmp);
 
507
}
 
508
 
 
509
/** Wrapper for row_merge_tuple_sort() to inject some more context to
 
510
UT_SORT_FUNCTION_BODY().
 
511
@param a        array of tuples that being sorted
 
512
@param b        aux (work area), same size as tuples[]
 
513
@param c        lower bound of the sorting area, inclusive
 
514
@param d        upper bound of the sorting area, inclusive */
 
515
#define row_merge_tuple_sort_ctx(a,b,c,d) \
 
516
        row_merge_tuple_sort(n_field, dup, a, b, c, d)
 
517
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
 
518
UT_SORT_FUNCTION_BODY().
 
519
@param a        first tuple to be compared
 
520
@param b        second tuple to be compared
 
521
@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
 
522
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)
 
523
 
 
524
/**********************************************************************//**
 
525
Merge sort the tuple buffer in main memory. */
 
526
static
 
527
void
 
528
row_merge_tuple_sort(
 
529
/*=================*/
 
530
        ulint                   n_field,/*!< in: number of fields */
 
531
        row_merge_dup_t*        dup,    /*!< in/out: for reporting duplicates */
 
532
        const dfield_t**        tuples, /*!< in/out: tuples */
 
533
        const dfield_t**        aux,    /*!< in/out: work area */
 
534
        ulint                   low,    /*!< in: lower bound of the
 
535
                                        sorting area, inclusive */
 
536
        ulint                   high)   /*!< in: upper bound of the
 
537
                                        sorting area, exclusive */
 
538
{
 
539
        UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
 
540
                              tuples, aux, low, high, row_merge_tuple_cmp_ctx);
 
541
}
 
542
 
 
543
/******************************************************//**
 
544
Sort a buffer. */
 
545
static
 
546
void
 
547
row_merge_buf_sort(
 
548
/*===============*/
 
549
        row_merge_buf_t*        buf,    /*!< in/out: sort buffer */
 
550
        row_merge_dup_t*        dup)    /*!< in/out: for reporting duplicates */
 
551
{
 
552
        row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
 
553
                             buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
 
554
}
 
555
 
 
556
/******************************************************//**
 
557
Write a buffer to a block. */
 
558
static
 
559
void
 
560
row_merge_buf_write(
 
561
/*================*/
 
562
        const row_merge_buf_t*  buf,    /*!< in: sorted buffer */
 
563
#ifdef UNIV_DEBUG
 
564
        const merge_file_t*     of,     /*!< in: output file */
 
565
#endif /* UNIV_DEBUG */
 
566
        row_merge_block_t*      block)  /*!< out: buffer for writing to file */
 
567
#ifndef UNIV_DEBUG
 
568
# define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
 
569
#endif /* !UNIV_DEBUG */
 
570
{
 
571
        const dict_index_t*     index   = buf->index;
 
572
        ulint                   n_fields= dict_index_get_n_fields(index);
 
573
        byte*                   b       = &(*block)[0];
 
574
 
 
575
        ulint           i;
 
576
 
 
577
        for (i = 0; i < buf->n_tuples; i++) {
 
578
                ulint           size;
 
579
                ulint           extra_size;
 
580
                const dfield_t* entry           = buf->tuples[i];
 
581
 
 
582
                size = rec_get_converted_size_comp(index,
 
583
                                                   REC_STATUS_ORDINARY,
 
584
                                                   entry, n_fields,
 
585
                                                   &extra_size);
 
586
                ut_ad(size > extra_size);
 
587
                ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
 
588
                extra_size -= REC_N_NEW_EXTRA_BYTES;
 
589
                size -= REC_N_NEW_EXTRA_BYTES;
 
590
 
 
591
                /* Encode extra_size + 1 */
 
592
                if (extra_size + 1 < 0x80) {
 
593
                        *b++ = (byte) (extra_size + 1);
 
594
                } else {
 
595
                        ut_ad((extra_size + 1) < 0x8000);
 
596
                        *b++ = (byte) (0x80 | ((extra_size + 1) >> 8));
 
597
                        *b++ = (byte) (extra_size + 1);
 
598
                }
 
599
 
 
600
                ut_ad(b + size < block[1]);
 
601
 
 
602
                rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
 
603
                                               REC_STATUS_ORDINARY,
 
604
                                               entry, n_fields);
 
605
 
 
606
                b += size;
 
607
 
 
608
#ifdef UNIV_DEBUG
 
609
                if (row_merge_print_write) {
 
610
                        fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
 
611
                                (void*) b, of->fd, (ulong) of->offset,
 
612
                                (ulong) i);
 
613
                        row_merge_tuple_print(stderr, entry, n_fields);
 
614
                }
 
615
#endif /* UNIV_DEBUG */
 
616
        }
 
617
 
 
618
        /* Write an "end-of-chunk" marker. */
 
619
        ut_a(b < block[1]);
 
620
        ut_a(b == block[0] + buf->total_size);
 
621
        *b++ = 0;
 
622
#ifdef UNIV_DEBUG_VALGRIND
 
623
        /* The rest of the block is uninitialized.  Initialize it
 
624
        to avoid bogus warnings. */
 
625
        memset(b, 0xff, block[1] - b);
 
626
#endif /* UNIV_DEBUG_VALGRIND */
 
627
#ifdef UNIV_DEBUG
 
628
        if (row_merge_print_write) {
 
629
                fprintf(stderr, "row_merge_buf_write %p,%d,%lu EOF\n",
 
630
                        (void*) b, of->fd, (ulong) of->offset);
 
631
        }
 
632
#endif /* UNIV_DEBUG */
 
633
}
 
634
 
 
635
/******************************************************//**
 
636
Create a memory heap and allocate space for row_merge_rec_offsets()
 
637
and mrec_buf_t[3].
 
638
@return memory heap */
 
639
static
 
640
mem_heap_t*
 
641
row_merge_heap_create(
 
642
/*==================*/
 
643
        const dict_index_t*     index,          /*!< in: record descriptor */
 
644
        mrec_buf_t**            buf,            /*!< out: 3 buffers */
 
645
        ulint**                 offsets1,       /*!< out: offsets */
 
646
        ulint**                 offsets2)       /*!< out: offsets */
 
647
{
 
648
        ulint           i       = 1 + REC_OFFS_HEADER_SIZE
 
649
                + dict_index_get_n_fields(index);
 
650
        mem_heap_t*     heap    = mem_heap_create(2 * i * sizeof **offsets1
 
651
                                                  + 3 * sizeof **buf);
 
652
 
 
653
        *buf = mem_heap_alloc(heap, 3 * sizeof **buf);
 
654
        *offsets1 = mem_heap_alloc(heap, i * sizeof **offsets1);
 
655
        *offsets2 = mem_heap_alloc(heap, i * sizeof **offsets2);
 
656
 
 
657
        (*offsets1)[0] = (*offsets2)[0] = i;
 
658
        (*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
 
659
 
 
660
        return(heap);
 
661
}
 
662
 
 
663
/**********************************************************************//**
 
664
Search an index object by name and column names.  If several indexes match,
 
665
return the index with the max id.
 
666
@return matching index, NULL if not found */
 
667
static
 
668
dict_index_t*
 
669
row_merge_dict_table_get_index(
 
670
/*===========================*/
 
671
        dict_table_t*           table,          /*!< in: table */
 
672
        const merge_index_def_t*index_def)      /*!< in: index definition */
 
673
{
 
674
        ulint           i;
 
675
        dict_index_t*   index;
 
676
        const char**    column_names;
 
677
 
 
678
        column_names = mem_alloc(index_def->n_fields * sizeof *column_names);
 
679
 
 
680
        for (i = 0; i < index_def->n_fields; ++i) {
 
681
                column_names[i] = index_def->fields[i].field_name;
 
682
        }
 
683
 
 
684
        index = dict_table_get_index_by_max_id(
 
685
                table, index_def->name, column_names, index_def->n_fields);
 
686
 
 
687
        mem_free((void*) column_names);
 
688
 
 
689
        return(index);
 
690
}
 
691
 
 
692
/********************************************************************//**
 
693
Read a merge block from the file system.
 
694
@return TRUE if request was successful, FALSE if fail */
 
695
static
 
696
ibool
 
697
row_merge_read(
 
698
/*===========*/
 
699
        int                     fd,     /*!< in: file descriptor */
 
700
        ulint                   offset, /*!< in: offset where to read
 
701
                                        in number of row_merge_block_t
 
702
                                        elements */
 
703
        row_merge_block_t*      buf)    /*!< out: data */
 
704
{
 
705
        ib_uint64_t     ofs = ((ib_uint64_t) offset) * sizeof *buf;
 
706
        ibool           success;
 
707
 
 
708
#ifdef UNIV_DEBUG
 
709
        if (row_merge_print_block_read) {
 
710
                fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
 
711
                        fd, (ulong) offset);
 
712
        }
 
713
#endif /* UNIV_DEBUG */
 
714
 
 
715
        success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
 
716
                                                 (ulint) (ofs & 0xFFFFFFFF),
 
717
                                                 (ulint) (ofs >> 32),
 
718
                                                 sizeof *buf);
 
719
#ifdef POSIX_FADV_DONTNEED
 
720
        /* Each block is read exactly once.  Free up the file cache. */
 
721
        posix_fadvise(fd, ofs, sizeof *buf, POSIX_FADV_DONTNEED);
 
722
#endif /* POSIX_FADV_DONTNEED */
 
723
 
 
724
        if (UNIV_UNLIKELY(!success)) {
 
725
                ut_print_timestamp(stderr);
 
726
                fprintf(stderr,
 
727
                        "  InnoDB: failed to read merge block at %"PRIu64"\n", ofs);
 
728
        }
 
729
 
 
730
        return(UNIV_LIKELY(success));
 
731
}
 
732
 
 
733
/********************************************************************//**
 
734
Write a merge block to the file system.
 
735
@return TRUE if request was successful, FALSE if fail */
 
736
static
 
737
ibool
 
738
row_merge_write(
 
739
/*============*/
 
740
        int             fd,     /*!< in: file descriptor */
 
741
        ulint           offset, /*!< in: offset where to write,
 
742
                                in number of row_merge_block_t elements */
 
743
        const void*     buf)    /*!< in: data */
 
744
{
 
745
        size_t          buf_len = sizeof(row_merge_block_t);
 
746
        ib_uint64_t     ofs = buf_len * (ib_uint64_t) offset;
 
747
        ibool           ret;
 
748
 
 
749
        ret = os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
 
750
                            (ulint) (ofs & 0xFFFFFFFF),
 
751
                            (ulint) (ofs >> 32),
 
752
                            buf_len);
 
753
 
 
754
#ifdef UNIV_DEBUG
 
755
        if (row_merge_print_block_write) {
 
756
                fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
 
757
                        fd, (ulong) offset);
 
758
        }
 
759
#endif /* UNIV_DEBUG */
 
760
 
 
761
#ifdef POSIX_FADV_DONTNEED
 
762
        /* The block will be needed on the next merge pass,
 
763
        but it can be evicted from the file cache meanwhile. */
 
764
        posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
 
765
#endif /* POSIX_FADV_DONTNEED */
 
766
 
 
767
        return(UNIV_LIKELY(ret));
 
768
}
 
769
 
 
770
/********************************************************************//**
 
771
Read a merge record.
 
772
@return pointer to next record, or NULL on I/O error or end of list */
 
773
static __attribute__((nonnull))
 
774
const byte*
 
775
row_merge_read_rec(
 
776
/*===============*/
 
777
        row_merge_block_t*      block,  /*!< in/out: file buffer */
 
778
        mrec_buf_t*             buf,    /*!< in/out: secondary buffer */
 
779
        const byte*             b,      /*!< in: pointer to record */
 
780
        const dict_index_t*     index,  /*!< in: index of the record */
 
781
        int                     fd,     /*!< in: file descriptor */
 
782
        ulint*                  foffs,  /*!< in/out: file offset */
 
783
        const mrec_t**          mrec,   /*!< out: pointer to merge record,
 
784
                                        or NULL on end of list
 
785
                                        (non-NULL on I/O error) */
 
786
        ulint*                  offsets)/*!< out: offsets of mrec */
 
787
{
 
788
        ulint   extra_size;
 
789
        ulint   data_size;
 
790
        ulint   avail_size;
 
791
 
 
792
        ut_ad(block);
 
793
        ut_ad(buf);
 
794
        ut_ad(b >= block[0]);
 
795
        ut_ad(b < block[1]);
 
796
        ut_ad(index);
 
797
        ut_ad(foffs);
 
798
        ut_ad(mrec);
 
799
        ut_ad(offsets);
 
800
 
 
801
        ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
 
802
              + dict_index_get_n_fields(index));
 
803
 
 
804
        extra_size = *b++;
 
805
 
 
806
        if (UNIV_UNLIKELY(!extra_size)) {
 
807
                /* End of list */
 
808
                *mrec = NULL;
 
809
#ifdef UNIV_DEBUG
 
810
                if (row_merge_print_read) {
 
811
                        fprintf(stderr, "row_merge_read %p,%p,%d,%lu EOF\n",
 
812
                                (const void*) b, (const void*) block,
 
813
                                fd, (ulong) *foffs);
 
814
                }
 
815
#endif /* UNIV_DEBUG */
 
816
                return(NULL);
 
817
        }
 
818
 
 
819
        if (extra_size >= 0x80) {
 
820
                /* Read another byte of extra_size. */
 
821
 
 
822
                if (UNIV_UNLIKELY(b >= block[1])) {
 
823
                        if (!row_merge_read(fd, ++(*foffs), block)) {
 
824
err_exit:
 
825
                                /* Signal I/O error. */
 
826
                                *mrec = b;
 
827
                                return(NULL);
 
828
                        }
 
829
 
 
830
                        /* Wrap around to the beginning of the buffer. */
 
831
                        b = block[0];
 
832
                }
 
833
 
 
834
                extra_size = (extra_size & 0x7f) << 8;
 
835
                extra_size |= *b++;
 
836
        }
 
837
 
 
838
        /* Normalize extra_size.  Above, value 0 signals "end of list". */
 
839
        extra_size--;
 
840
 
 
841
        /* Read the extra bytes. */
 
842
 
 
843
        if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
 
844
                /* The record spans two blocks.  Copy the entire record
 
845
                to the auxiliary buffer and handle this as a special
 
846
                case. */
 
847
 
 
848
                avail_size = block[1] - b;
 
849
 
 
850
                memcpy(*buf, b, avail_size);
 
851
 
 
852
                if (!row_merge_read(fd, ++(*foffs), block)) {
 
853
 
 
854
                        goto err_exit;
 
855
                }
 
856
 
 
857
                /* Wrap around to the beginning of the buffer. */
 
858
                b = block[0];
 
859
 
 
860
                /* Copy the record. */
 
861
                memcpy(*buf + avail_size, b, extra_size - avail_size);
 
862
                b += extra_size - avail_size;
 
863
 
 
864
                *mrec = *buf + extra_size;
 
865
 
 
866
                rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
 
867
 
 
868
                data_size = rec_offs_data_size(offsets);
 
869
 
 
870
                /* These overflows should be impossible given that
 
871
                records are much smaller than either buffer, and
 
872
                the record starts near the beginning of each buffer. */
 
873
                ut_a(extra_size + data_size < sizeof *buf);
 
874
                ut_a(b + data_size < block[1]);
 
875
 
 
876
                /* Copy the data bytes. */
 
877
                memcpy(*buf + extra_size, b, data_size);
 
878
                b += data_size;
 
879
 
 
880
                goto func_exit;
 
881
        }
 
882
 
 
883
        *mrec = b + extra_size;
 
884
 
 
885
        rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
 
886
 
 
887
        data_size = rec_offs_data_size(offsets);
 
888
        ut_ad(extra_size + data_size < sizeof *buf);
 
889
 
 
890
        b += extra_size + data_size;
 
891
 
 
892
        if (UNIV_LIKELY(b < block[1])) {
 
893
                /* The record fits entirely in the block.
 
894
                This is the normal case. */
 
895
                goto func_exit;
 
896
        }
 
897
 
 
898
        /* The record spans two blocks.  Copy it to buf. */
 
899
 
 
900
        b -= extra_size + data_size;
 
901
        avail_size = block[1] - b;
 
902
        memcpy(*buf, b, avail_size);
 
903
        *mrec = *buf + extra_size;
 
904
#ifdef UNIV_DEBUG
 
905
        /* We cannot invoke rec_offs_make_valid() here, because there
 
906
        are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
 
907
        Similarly, rec_offs_validate() would fail, because it invokes
 
908
        rec_get_status(). */
 
909
        offsets[2] = (ulint) *mrec;
 
910
        offsets[3] = (ulint) index;
 
911
#endif /* UNIV_DEBUG */
 
912
 
 
913
        if (!row_merge_read(fd, ++(*foffs), block)) {
 
914
 
 
915
                goto err_exit;
 
916
        }
 
917
 
 
918
        /* Wrap around to the beginning of the buffer. */
 
919
        b = block[0];
 
920
 
 
921
        /* Copy the rest of the record. */
 
922
        memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
 
923
        b += extra_size + data_size - avail_size;
 
924
 
 
925
func_exit:
 
926
#ifdef UNIV_DEBUG
 
927
        if (row_merge_print_read) {
 
928
                fprintf(stderr, "row_merge_read %p,%p,%d,%lu ",
 
929
                        (const void*) b, (const void*) block,
 
930
                        fd, (ulong) *foffs);
 
931
                rec_print_comp(stderr, *mrec, offsets);
 
932
                putc('\n', stderr);
 
933
        }
 
934
#endif /* UNIV_DEBUG */
 
935
 
 
936
        return(b);
 
937
}
 
938
 
 
939
/********************************************************************//**
 
940
Write a merge record. */
 
941
static
 
942
void
 
943
row_merge_write_rec_low(
 
944
/*====================*/
 
945
        byte*           b,      /*!< out: buffer */
 
946
        ulint           e,      /*!< in: encoded extra_size */
 
947
#ifdef UNIV_DEBUG
 
948
        ulint           size,   /*!< in: total size to write */
 
949
        int             fd,     /*!< in: file descriptor */
 
950
        ulint           foffs,  /*!< in: file offset */
 
951
#endif /* UNIV_DEBUG */
 
952
        const mrec_t*   mrec,   /*!< in: record to write */
 
953
        const ulint*    offsets)/*!< in: offsets of mrec */
 
954
#ifndef UNIV_DEBUG
 
955
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)  \
 
956
        row_merge_write_rec_low(b, e, mrec, offsets)
 
957
#endif /* !UNIV_DEBUG */
 
958
{
 
959
#ifdef UNIV_DEBUG
 
960
        const byte* const end = b + size;
 
961
        ut_ad(e == rec_offs_extra_size(offsets) + 1);
 
962
 
 
963
        if (row_merge_print_write) {
 
964
                fprintf(stderr, "row_merge_write %p,%d,%lu ",
 
965
                        (void*) b, fd, (ulong) foffs);
 
966
                rec_print_comp(stderr, mrec, offsets);
 
967
                putc('\n', stderr);
 
968
        }
 
969
#endif /* UNIV_DEBUG */
 
970
 
 
971
        if (e < 0x80) {
 
972
                *b++ = (byte) e;
 
973
        } else {
 
974
                *b++ = (byte) (0x80 | (e >> 8));
 
975
                *b++ = (byte) e;
 
976
        }
 
977
 
 
978
        memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
 
979
        ut_ad(b + rec_offs_size(offsets) == end);
 
980
}
 
981
 
 
982
/********************************************************************//**
 
983
Write a merge record.
 
984
@return pointer to end of block, or NULL on error */
 
985
static
 
986
byte*
 
987
row_merge_write_rec(
 
988
/*================*/
 
989
        row_merge_block_t*      block,  /*!< in/out: file buffer */
 
990
        mrec_buf_t*             buf,    /*!< in/out: secondary buffer */
 
991
        byte*                   b,      /*!< in: pointer to end of block */
 
992
        int                     fd,     /*!< in: file descriptor */
 
993
        ulint*                  foffs,  /*!< in/out: file offset */
 
994
        const mrec_t*           mrec,   /*!< in: record to write */
 
995
        const ulint*            offsets)/*!< in: offsets of mrec */
 
996
{
 
997
        ulint   extra_size;
 
998
        ulint   size;
 
999
        ulint   avail_size;
 
1000
 
 
1001
        ut_ad(block);
 
1002
        ut_ad(buf);
 
1003
        ut_ad(b >= block[0]);
 
1004
        ut_ad(b < block[1]);
 
1005
        ut_ad(mrec);
 
1006
        ut_ad(foffs);
 
1007
        ut_ad(mrec < block[0] || mrec > block[1]);
 
1008
        ut_ad(mrec < buf[0] || mrec > buf[1]);
 
1009
 
 
1010
        /* Normalize extra_size.  Value 0 signals "end of list". */
 
1011
        extra_size = rec_offs_extra_size(offsets) + 1;
 
1012
 
 
1013
        size = extra_size + (extra_size >= 0x80)
 
1014
                + rec_offs_data_size(offsets);
 
1015
 
 
1016
        if (UNIV_UNLIKELY(b + size >= block[1])) {
 
1017
                /* The record spans two blocks.
 
1018
                Copy it to the temporary buffer first. */
 
1019
                avail_size = block[1] - b;
 
1020
 
 
1021
                row_merge_write_rec_low(buf[0],
 
1022
                                        extra_size, size, fd, *foffs,
 
1023
                                        mrec, offsets);
 
1024
 
 
1025
                /* Copy the head of the temporary buffer, write
 
1026
                the completed block, and copy the tail of the
 
1027
                record to the head of the new block. */
 
1028
                memcpy(b, buf[0], avail_size);
 
1029
 
 
1030
                if (!row_merge_write(fd, (*foffs)++, block)) {
 
1031
                        return(NULL);
 
1032
                }
 
1033
 
 
1034
                UNIV_MEM_INVALID(block[0], sizeof block[0]);
 
1035
 
 
1036
                /* Copy the rest. */
 
1037
                b = block[0];
 
1038
                memcpy(b, buf[0] + avail_size, size - avail_size);
 
1039
                b += size - avail_size;
 
1040
        } else {
 
1041
                row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
 
1042
                                        mrec, offsets);
 
1043
                b += size;
 
1044
        }
 
1045
 
 
1046
        return(b);
 
1047
}
 
1048
 
 
1049
/********************************************************************//**
 
1050
Write an end-of-list marker.
 
1051
@return pointer to end of block, or NULL on error */
 
1052
static
 
1053
byte*
 
1054
row_merge_write_eof(
 
1055
/*================*/
 
1056
        row_merge_block_t*      block,  /*!< in/out: file buffer */
 
1057
        byte*                   b,      /*!< in: pointer to end of block */
 
1058
        int                     fd,     /*!< in: file descriptor */
 
1059
        ulint*                  foffs)  /*!< in/out: file offset */
 
1060
{
 
1061
        ut_ad(block);
 
1062
        ut_ad(b >= block[0]);
 
1063
        ut_ad(b < block[1]);
 
1064
        ut_ad(foffs);
 
1065
#ifdef UNIV_DEBUG
 
1066
        if (row_merge_print_write) {
 
1067
                fprintf(stderr, "row_merge_write %p,%p,%d,%lu EOF\n",
 
1068
                        (void*) b, (void*) block, fd, (ulong) *foffs);
 
1069
        }
 
1070
#endif /* UNIV_DEBUG */
 
1071
 
 
1072
        *b++ = 0;
 
1073
        UNIV_MEM_ASSERT_RW(block[0], b - block[0]);
 
1074
        UNIV_MEM_ASSERT_W(block[0], sizeof block[0]);
 
1075
#ifdef UNIV_DEBUG_VALGRIND
 
1076
        /* The rest of the block is uninitialized.  Initialize it
 
1077
        to avoid bogus warnings. */
 
1078
        memset(b, 0xff, block[1] - b);
 
1079
#endif /* UNIV_DEBUG_VALGRIND */
 
1080
 
 
1081
        if (!row_merge_write(fd, (*foffs)++, block)) {
 
1082
                return(NULL);
 
1083
        }
 
1084
 
 
1085
        UNIV_MEM_INVALID(block[0], sizeof block[0]);
 
1086
        return(block[0]);
 
1087
}
 
1088
 
 
1089
/*************************************************************//**
 
1090
Compare two merge records.
 
1091
@return 1, 0, -1 if mrec1 is greater, equal, less, respectively, than mrec2 */
 
1092
static
 
1093
int
 
1094
row_merge_cmp(
 
1095
/*==========*/
 
1096
        const mrec_t*           mrec1,          /*!< in: first merge
 
1097
                                                record to be compared */
 
1098
        const mrec_t*           mrec2,          /*!< in: second merge
 
1099
                                                record to be compared */
 
1100
        const ulint*            offsets1,       /*!< in: first record offsets */
 
1101
        const ulint*            offsets2,       /*!< in: second record offsets */
 
1102
        const dict_index_t*     index,          /*!< in: index */
 
1103
        ibool*                  null_eq)        /*!< out: set to TRUE if
 
1104
                                                found matching null values */
 
1105
{
 
1106
        int     cmp;
 
1107
 
 
1108
        cmp = cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index,
 
1109
                                 null_eq);
 
1110
 
 
1111
#ifdef UNIV_DEBUG
 
1112
        if (row_merge_print_cmp) {
 
1113
                fputs("row_merge_cmp1 ", stderr);
 
1114
                rec_print_comp(stderr, mrec1, offsets1);
 
1115
                fputs("\nrow_merge_cmp2 ", stderr);
 
1116
                rec_print_comp(stderr, mrec2, offsets2);
 
1117
                fprintf(stderr, "\nrow_merge_cmp=%d\n", cmp);
 
1118
        }
 
1119
#endif /* UNIV_DEBUG */
 
1120
 
 
1121
        return(cmp);
 
1122
}
 
1123
 
 
1124
/********************************************************************//**
 
1125
Reads clustered index of the table and create temporary files
 
1126
containing the index entries for the indexes to be built.
 
1127
@return DB_SUCCESS or error */
 
1128
static __attribute__((nonnull))
 
1129
ulint
 
1130
row_merge_read_clustered_index(
 
1131
/*===========================*/
 
1132
        trx_t*                  trx,    /*!< in: transaction */
 
1133
        struct TABLE*           table,  /*!< in/out: MySQL table object,
 
1134
                                        for reporting erroneous records */
 
1135
        const dict_table_t*     old_table,/*!< in: table where rows are
 
1136
                                        read from */
 
1137
        const dict_table_t*     new_table,/*!< in: table where indexes are
 
1138
                                        created; identical to old_table
 
1139
                                        unless creating a PRIMARY KEY */
 
1140
        dict_index_t**          index,  /*!< in: indexes to be created */
 
1141
        merge_file_t*           files,  /*!< in: temporary files */
 
1142
        ulint                   n_index,/*!< in: number of indexes to create */
 
1143
        row_merge_block_t*      block)  /*!< in/out: file buffer */
 
1144
{
 
1145
        dict_index_t*           clust_index;    /* Clustered index */
 
1146
        mem_heap_t*             row_heap;       /* Heap memory to create
 
1147
                                                clustered index records */
 
1148
        row_merge_buf_t**       merge_buf;      /* Temporary list for records*/
 
1149
        btr_pcur_t              pcur;           /* Persistent cursor on the
 
1150
                                                clustered index */
 
1151
        mtr_t                   mtr;            /* Mini transaction */
 
1152
        ulint                   err = DB_SUCCESS;/* Return code */
 
1153
        ulint                   i;
 
1154
        ulint                   n_nonnull = 0;  /* number of columns
 
1155
                                                changed to NOT NULL */
 
1156
        ulint*                  nonnull = NULL; /* NOT NULL columns */
 
1157
 
 
1158
        trx->op_info = "reading clustered index";
 
1159
 
 
1160
        ut_ad(trx);
 
1161
        ut_ad(old_table);
 
1162
        ut_ad(new_table);
 
1163
        ut_ad(index);
 
1164
        ut_ad(files);
 
1165
 
 
1166
        /* Create and initialize memory for record buffers */
 
1167
 
 
1168
        merge_buf = mem_alloc(n_index * sizeof *merge_buf);
 
1169
 
 
1170
        for (i = 0; i < n_index; i++) {
 
1171
                merge_buf[i] = row_merge_buf_create(index[i]);
 
1172
        }
 
1173
 
 
1174
        mtr_start(&mtr);
 
1175
 
 
1176
        /* Find the clustered index and create a persistent cursor
 
1177
        based on that. */
 
1178
 
 
1179
        clust_index = dict_table_get_first_index(old_table);
 
1180
 
 
1181
        btr_pcur_open_at_index_side(
 
1182
                TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
 
1183
 
 
1184
        if (UNIV_UNLIKELY(old_table != new_table)) {
 
1185
                ulint   n_cols = dict_table_get_n_cols(old_table);
 
1186
 
 
1187
                /* A primary key will be created.  Identify the
 
1188
                columns that were flagged NOT NULL in the new table,
 
1189
                so that we can quickly check that the records in the
 
1190
                (old) clustered index do not violate the added NOT
 
1191
                NULL constraints. */
 
1192
 
 
1193
                ut_a(n_cols == dict_table_get_n_cols(new_table));
 
1194
 
 
1195
                nonnull = mem_alloc(n_cols * sizeof *nonnull);
 
1196
 
 
1197
                for (i = 0; i < n_cols; i++) {
 
1198
                        if (dict_table_get_nth_col(old_table, i)->prtype
 
1199
                            & DATA_NOT_NULL) {
 
1200
 
 
1201
                                continue;
 
1202
                        }
 
1203
 
 
1204
                        if (dict_table_get_nth_col(new_table, i)->prtype
 
1205
                            & DATA_NOT_NULL) {
 
1206
 
 
1207
                                nonnull[n_nonnull++] = i;
 
1208
                        }
 
1209
                }
 
1210
 
 
1211
                if (!n_nonnull) {
 
1212
                        mem_free(nonnull);
 
1213
                        nonnull = NULL;
 
1214
                }
 
1215
        }
 
1216
 
 
1217
        row_heap = mem_heap_create(sizeof(mrec_buf_t));
 
1218
 
 
1219
        /* Scan the clustered index. */
 
1220
        for (;;) {
 
1221
                const rec_t*    rec;
 
1222
                ulint*          offsets;
 
1223
                dtuple_t*       row             = NULL;
 
1224
                row_ext_t*      ext;
 
1225
                ibool           has_next        = TRUE;
 
1226
 
 
1227
                btr_pcur_move_to_next_on_page(&pcur);
 
1228
 
 
1229
                /* When switching pages, commit the mini-transaction
 
1230
                in order to release the latch on the old page. */
 
1231
 
 
1232
                if (btr_pcur_is_after_last_on_page(&pcur)) {
 
1233
                        if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1234
                                err = DB_INTERRUPTED;
 
1235
                                trx->error_key_num = 0;
 
1236
                                goto func_exit;
 
1237
                        }
 
1238
 
 
1239
                        btr_pcur_store_position(&pcur, &mtr);
 
1240
                        mtr_commit(&mtr);
 
1241
                        mtr_start(&mtr);
 
1242
                        btr_pcur_restore_position(BTR_SEARCH_LEAF,
 
1243
                                                  &pcur, &mtr);
 
1244
                        has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
 
1245
                }
 
1246
 
 
1247
                if (UNIV_LIKELY(has_next)) {
 
1248
                        rec = btr_pcur_get_rec(&pcur);
 
1249
                        offsets = rec_get_offsets(rec, clust_index, NULL,
 
1250
                                                  ULINT_UNDEFINED, &row_heap);
 
1251
 
 
1252
                        /* Skip delete marked records. */
 
1253
                        if (rec_get_deleted_flag(
 
1254
                                    rec, dict_table_is_comp(old_table))) {
 
1255
                                continue;
 
1256
                        }
 
1257
 
 
1258
                        srv_n_rows_inserted++;
 
1259
 
 
1260
                        /* Build a row based on the clustered index. */
 
1261
 
 
1262
                        row = row_build(ROW_COPY_POINTERS, clust_index,
 
1263
                                        rec, offsets,
 
1264
                                        new_table, &ext, row_heap);
 
1265
 
 
1266
                        if (UNIV_LIKELY_NULL(nonnull)) {
 
1267
                                for (i = 0; i < n_nonnull; i++) {
 
1268
                                        dfield_t*       field
 
1269
                                                = &row->fields[nonnull[i]];
 
1270
                                        dtype_t*        field_type
 
1271
                                                = dfield_get_type(field);
 
1272
 
 
1273
                                        ut_a(!(field_type->prtype
 
1274
                                               & DATA_NOT_NULL));
 
1275
 
 
1276
                                        if (dfield_is_null(field)) {
 
1277
                                                err = DB_PRIMARY_KEY_IS_NULL;
 
1278
                                                trx->error_key_num = 0;
 
1279
                                                goto func_exit;
 
1280
                                        }
 
1281
 
 
1282
                                        field_type->prtype |= DATA_NOT_NULL;
 
1283
                                }
 
1284
                        }
 
1285
                }
 
1286
 
 
1287
                /* Build all entries for all the indexes to be created
 
1288
                in a single scan of the clustered index. */
 
1289
 
 
1290
                for (i = 0; i < n_index; i++) {
 
1291
                        row_merge_buf_t*        buf     = merge_buf[i];
 
1292
                        merge_file_t*           file    = &files[i];
 
1293
                        const dict_index_t*     index   = buf->index;
 
1294
 
 
1295
                        if (UNIV_LIKELY
 
1296
                            (row && row_merge_buf_add(buf, row, ext))) {
 
1297
                                file->n_rec++;
 
1298
                                continue;
 
1299
                        }
 
1300
 
 
1301
                        /* The buffer must be sufficiently large
 
1302
                        to hold at least one record. */
 
1303
                        ut_ad(buf->n_tuples || !has_next);
 
1304
 
 
1305
                        /* We have enough data tuples to form a block.
 
1306
                        Sort them and write to disk. */
 
1307
 
 
1308
                        if (buf->n_tuples) {
 
1309
                                if (dict_index_is_unique(index)) {
 
1310
                                        row_merge_dup_t dup;
 
1311
                                        dup.index = buf->index;
 
1312
                                        dup.table = table;
 
1313
                                        dup.n_dup = 0;
 
1314
 
 
1315
                                        row_merge_buf_sort(buf, &dup);
 
1316
 
 
1317
                                        if (dup.n_dup) {
 
1318
                                                err = DB_DUPLICATE_KEY;
 
1319
                                                trx->error_key_num = i;
 
1320
                                                goto func_exit;
 
1321
                                        }
 
1322
                                } else {
 
1323
                                        row_merge_buf_sort(buf, NULL);
 
1324
                                }
 
1325
                        }
 
1326
 
 
1327
                        row_merge_buf_write(buf, file, block);
 
1328
 
 
1329
                        if (!row_merge_write(file->fd, file->offset++,
 
1330
                                             block)) {
 
1331
                                err = DB_OUT_OF_FILE_SPACE;
 
1332
                                trx->error_key_num = i;
 
1333
                                goto func_exit;
 
1334
                        }
 
1335
 
 
1336
                        UNIV_MEM_INVALID(block[0], sizeof block[0]);
 
1337
                        merge_buf[i] = row_merge_buf_empty(buf);
 
1338
 
 
1339
                        if (UNIV_LIKELY(row != NULL)) {
 
1340
                                /* Try writing the record again, now
 
1341
                                that the buffer has been written out
 
1342
                                and emptied. */
 
1343
 
 
1344
                                if (UNIV_UNLIKELY
 
1345
                                    (!row_merge_buf_add(buf, row, ext))) {
 
1346
                                        /* An empty buffer should have enough
 
1347
                                        room for at least one record. */
 
1348
                                        ut_error;
 
1349
                                }
 
1350
 
 
1351
                                file->n_rec++;
 
1352
                        }
 
1353
                }
 
1354
 
 
1355
                mem_heap_empty(row_heap);
 
1356
 
 
1357
                if (UNIV_UNLIKELY(!has_next)) {
 
1358
                        goto func_exit;
 
1359
                }
 
1360
        }
 
1361
 
 
1362
func_exit:
 
1363
        btr_pcur_close(&pcur);
 
1364
        mtr_commit(&mtr);
 
1365
        mem_heap_free(row_heap);
 
1366
 
 
1367
        if (UNIV_LIKELY_NULL(nonnull)) {
 
1368
                mem_free(nonnull);
 
1369
        }
 
1370
 
 
1371
        for (i = 0; i < n_index; i++) {
 
1372
                row_merge_buf_free(merge_buf[i]);
 
1373
        }
 
1374
 
 
1375
        mem_free(merge_buf);
 
1376
 
 
1377
        trx->op_info = "";
 
1378
 
 
1379
        return(err);
 
1380
}
 
1381
 
 
1382
/** Write a record via buffer 2 and read the next record to buffer N.
 
1383
@param N        number of the buffer (0 or 1)
 
1384
@param AT_END   statement to execute at end of input */
 
1385
#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END)                             \
 
1386
        do {                                                            \
 
1387
                b2 = row_merge_write_rec(&block[2], &buf[2], b2,        \
 
1388
                                         of->fd, &of->offset,           \
 
1389
                                         mrec##N, offsets##N);          \
 
1390
                if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {  \
 
1391
                        goto corrupt;                                   \
 
1392
                }                                                       \
 
1393
                b##N = row_merge_read_rec(&block[N], &buf[N],           \
 
1394
                                          b##N, index,                  \
 
1395
                                          file->fd, foffs##N,           \
 
1396
                                          &mrec##N, offsets##N);        \
 
1397
                if (UNIV_UNLIKELY(!b##N)) {                             \
 
1398
                        if (mrec##N) {                                  \
 
1399
                                goto corrupt;                           \
 
1400
                        }                                               \
 
1401
                        AT_END;                                         \
 
1402
                }                                                       \
 
1403
        } while (0)
 
1404
 
 
1405
/*************************************************************//**
 
1406
Merge two blocks of records on disk and write a bigger block.
 
1407
@return DB_SUCCESS or error code */
 
1408
static
 
1409
ulint
 
1410
row_merge_blocks(
 
1411
/*=============*/
 
1412
        const dict_index_t*     index,  /*!< in: index being created */
 
1413
        const merge_file_t*     file,   /*!< in: file containing
 
1414
                                        index entries */
 
1415
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
 
1416
        ulint*                  foffs0, /*!< in/out: offset of first
 
1417
                                        source list in the file */
 
1418
        ulint*                  foffs1, /*!< in/out: offset of second
 
1419
                                        source list in the file */
 
1420
        merge_file_t*           of,     /*!< in/out: output file */
 
1421
        struct TABLE*           table)  /*!< in/out: MySQL table, for
 
1422
                                        reporting erroneous key value
 
1423
                                        if applicable */
 
1424
{
 
1425
        mem_heap_t*     heap;   /*!< memory heap for offsets0, offsets1 */
 
1426
 
 
1427
        mrec_buf_t*     buf;    /*!< buffer for handling
 
1428
                                split mrec in block[] */
 
1429
        const byte*     b0;     /*!< pointer to block[0] */
 
1430
        const byte*     b1;     /*!< pointer to block[1] */
 
1431
        byte*           b2;     /*!< pointer to block[2] */
 
1432
        const mrec_t*   mrec0;  /*!< merge rec, points to block[0] or buf[0] */
 
1433
        const mrec_t*   mrec1;  /*!< merge rec, points to block[1] or buf[1] */
 
1434
        ulint*          offsets0;/* offsets of mrec0 */
 
1435
        ulint*          offsets1;/* offsets of mrec1 */
 
1436
 
 
1437
#ifdef UNIV_DEBUG
 
1438
        if (row_merge_print_block) {
 
1439
                fprintf(stderr,
 
1440
                        "row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
 
1441
                        " = fd=%d ofs=%lu\n",
 
1442
                        file->fd, (ulong) *foffs0,
 
1443
                        file->fd, (ulong) *foffs1,
 
1444
                        of->fd, (ulong) of->offset);
 
1445
        }
 
1446
#endif /* UNIV_DEBUG */
 
1447
 
 
1448
        heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
 
1449
 
 
1450
        buf = mem_heap_alloc(heap, sizeof(mrec_buf_t) * 3);
 
1451
 
 
1452
        /* Write a record and read the next record.  Split the output
 
1453
        file in two halves, which can be merged on the following pass. */
 
1454
 
 
1455
        if (!row_merge_read(file->fd, *foffs0, &block[0])
 
1456
            || !row_merge_read(file->fd, *foffs1, &block[1])) {
 
1457
corrupt:
 
1458
                mem_heap_free(heap);
 
1459
                return(DB_CORRUPTION);
 
1460
        }
 
1461
 
 
1462
        b0 = block[0];
 
1463
        b1 = block[1];
 
1464
        b2 = block[2];
 
1465
 
 
1466
        b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
 
1467
                                foffs0, &mrec0, offsets0);
 
1468
        b1 = row_merge_read_rec(&block[1], &buf[1], b1, index, file->fd,
 
1469
                                foffs1, &mrec1, offsets1);
 
1470
        if (UNIV_UNLIKELY(!b0 && mrec0)
 
1471
            || UNIV_UNLIKELY(!b1 && mrec1)) {
 
1472
 
 
1473
                goto corrupt;
 
1474
        }
 
1475
 
 
1476
        while (mrec0 && mrec1) {
 
1477
                ibool   null_eq = FALSE;
 
1478
                switch (row_merge_cmp(mrec0, mrec1,
 
1479
                                      offsets0, offsets1, index,
 
1480
                                      &null_eq)) {
 
1481
                case 0:
 
1482
                        if (UNIV_UNLIKELY
 
1483
                            (dict_index_is_unique(index) && !null_eq)) {
 
1484
                                innobase_rec_to_mysql(table, mrec0,
 
1485
                                                      index, offsets0);
 
1486
                                mem_heap_free(heap);
 
1487
                                return(DB_DUPLICATE_KEY);
 
1488
                        }
 
1489
                        /* fall through */
 
1490
                case -1:
 
1491
                        ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
 
1492
                        break;
 
1493
                case 1:
 
1494
                        ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
 
1495
                        break;
 
1496
                default:
 
1497
                        ut_error;
 
1498
                }
 
1499
 
 
1500
        }
 
1501
 
 
1502
merged:
 
1503
        if (mrec0) {
 
1504
                /* append all mrec0 to output */
 
1505
                for (;;) {
 
1506
                        ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
 
1507
                }
 
1508
        }
 
1509
done0:
 
1510
        if (mrec1) {
 
1511
                /* append all mrec1 to output */
 
1512
                for (;;) {
 
1513
                        ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
 
1514
                }
 
1515
        }
 
1516
done1:
 
1517
 
 
1518
        mem_heap_free(heap);
 
1519
        b2 = row_merge_write_eof(&block[2], b2, of->fd, &of->offset);
 
1520
        return(b2 ? DB_SUCCESS : DB_CORRUPTION);
 
1521
}
 
1522
 
 
1523
/*************************************************************//**
 
1524
Copy a block of index entries.
 
1525
@return TRUE on success, FALSE on failure */
 
1526
static __attribute__((nonnull))
 
1527
ibool
 
1528
row_merge_blocks_copy(
 
1529
/*==================*/
 
1530
        const dict_index_t*     index,  /*!< in: index being created */
 
1531
        const merge_file_t*     file,   /*!< in: input file */
 
1532
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
 
1533
        ulint*                  foffs0, /*!< in/out: input file offset */
 
1534
        merge_file_t*           of)     /*!< in/out: output file */
 
1535
{
 
1536
        mem_heap_t*     heap;   /*!< memory heap for offsets0, offsets1 */
 
1537
 
 
1538
        mrec_buf_t*     buf;    /*!< buffer for handling
 
1539
                                split mrec in block[] */
 
1540
        const byte*     b0;     /*!< pointer to block[0] */
 
1541
        byte*           b2;     /*!< pointer to block[2] */
 
1542
        const mrec_t*   mrec0;  /*!< merge rec, points to block[0] */
 
1543
        ulint*          offsets0;/* offsets of mrec0 */
 
1544
        ulint*          offsets1;/* dummy offsets */
 
1545
 
 
1546
#ifdef UNIV_DEBUG
 
1547
        if (row_merge_print_block) {
 
1548
                fprintf(stderr,
 
1549
                        "row_merge_blocks_copy fd=%d ofs=%lu"
 
1550
                        " = fd=%d ofs=%lu\n",
 
1551
                        file->fd, (ulong) foffs0,
 
1552
                        of->fd, (ulong) of->offset);
 
1553
        }
 
1554
#endif /* UNIV_DEBUG */
 
1555
 
 
1556
        heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
 
1557
        buf = mem_heap_alloc(heap, sizeof(mrec_buf_t) * 3);
 
1558
 
 
1559
        /* Write a record and read the next record.  Split the output
 
1560
        file in two halves, which can be merged on the following pass. */
 
1561
 
 
1562
        if (!row_merge_read(file->fd, *foffs0, &block[0])) {
 
1563
corrupt:
 
1564
                mem_heap_free(heap);
 
1565
                return(FALSE);
 
1566
        }
 
1567
 
 
1568
        b0 = block[0];
 
1569
        b2 = block[2];
 
1570
 
 
1571
        b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
 
1572
                                foffs0, &mrec0, offsets0);
 
1573
        if (UNIV_UNLIKELY(!b0 && mrec0)) {
 
1574
 
 
1575
                goto corrupt;
 
1576
        }
 
1577
 
 
1578
        if (mrec0) {
 
1579
                /* append all mrec0 to output */
 
1580
                for (;;) {
 
1581
                        ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
 
1582
                }
 
1583
        }
 
1584
done0:
 
1585
 
 
1586
        /* The file offset points to the beginning of the last page
 
1587
        that has been read.  Update it to point to the next block. */
 
1588
        (*foffs0)++;
 
1589
 
 
1590
        mem_heap_free(heap);
 
1591
        return(row_merge_write_eof(&block[2], b2, of->fd, &of->offset)
 
1592
               != NULL);
 
1593
}
 
1594
 
 
1595
/*************************************************************//**
 
1596
Merge disk files.
 
1597
@return DB_SUCCESS or error code */
 
1598
static __attribute__((nonnull))
 
1599
ulint
 
1600
row_merge(
 
1601
/*======*/
 
1602
        trx_t*                  trx,    /*!< in: transaction */
 
1603
        const dict_index_t*     index,  /*!< in: index being created */
 
1604
        merge_file_t*           file,   /*!< in/out: file containing
 
1605
                                        index entries */
 
1606
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
 
1607
        int*                    tmpfd,  /*!< in/out: temporary file handle */
 
1608
        struct TABLE*           table,  /*!< in/out: MySQL table, for
 
1609
                                        reporting erroneous key value
 
1610
                                        if applicable */
 
1611
        ulint*                  num_run,/*!< in/out: Number of runs remain
 
1612
                                        to be merged */
 
1613
        ulint*                  run_offset) /*!< in/out: Array contains the
 
1614
                                        first offset number for each merge
 
1615
                                        run */
 
1616
{
 
1617
        ulint           foffs0; /*!< first input offset */
 
1618
        ulint           foffs1; /*!< second input offset */
 
1619
        ulint           error;  /*!< error code */
 
1620
        merge_file_t    of;     /*!< output file */
 
1621
        const ulint     ihalf   = run_offset[*num_run / 2];
 
1622
                                /*!< half the input file */
 
1623
        ulint           n_run   = 0;
 
1624
                                /*!< num of runs generated from this merge */
 
1625
 
 
1626
 
 
1627
        UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
 
1628
        ut_ad(ihalf < file->offset);
 
1629
 
 
1630
        of.fd = *tmpfd;
 
1631
        of.offset = 0;
 
1632
        of.n_rec = 0;
 
1633
 
 
1634
#ifdef POSIX_FADV_SEQUENTIAL
 
1635
        /* The input file will be read sequentially, starting from the
 
1636
        beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
 
1637
        affects the entire file.  Each block will be read exactly once. */
 
1638
        posix_fadvise(file->fd, 0, 0,
 
1639
                      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
 
1640
#endif /* POSIX_FADV_SEQUENTIAL */
 
1641
 
 
1642
        /* Merge blocks to the output file. */
 
1643
        foffs0 = 0;
 
1644
        foffs1 = ihalf;
 
1645
 
 
1646
        UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);
 
1647
 
 
1648
        for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
 
1649
 
 
1650
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1651
                        return(DB_INTERRUPTED);
 
1652
                }
 
1653
 
 
1654
                /* Remember the offset number for this run */
 
1655
                run_offset[n_run++] = of.offset;
 
1656
 
 
1657
                error = row_merge_blocks(index, file, block,
 
1658
                                         &foffs0, &foffs1, &of, table);
 
1659
 
 
1660
                if (error != DB_SUCCESS) {
 
1661
                        return(error);
 
1662
                }
 
1663
 
 
1664
        }
 
1665
 
 
1666
        /* Copy the last blocks, if there are any. */
 
1667
 
 
1668
        while (foffs0 < ihalf) {
 
1669
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1670
                        return(DB_INTERRUPTED);
 
1671
                }
 
1672
 
 
1673
                /* Remember the offset number for this run */
 
1674
                run_offset[n_run++] = of.offset;
 
1675
 
 
1676
                if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
 
1677
                        return(DB_CORRUPTION);
 
1678
                }
 
1679
        }
 
1680
 
 
1681
        ut_ad(foffs0 == ihalf);
 
1682
 
 
1683
        while (foffs1 < file->offset) {
 
1684
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1685
                        return(DB_INTERRUPTED);
 
1686
                }
 
1687
 
 
1688
                /* Remember the offset number for this run */
 
1689
                run_offset[n_run++] = of.offset;
 
1690
 
 
1691
                if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
 
1692
                        return(DB_CORRUPTION);
 
1693
                }
 
1694
        }
 
1695
 
 
1696
        ut_ad(foffs1 == file->offset);
 
1697
 
 
1698
        if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
 
1699
                return(DB_CORRUPTION);
 
1700
        }
 
1701
 
 
1702
        ut_ad(n_run <= *num_run);
 
1703
 
 
1704
        *num_run = n_run;
 
1705
 
 
1706
        /* Each run can contain one or more offsets. As merge goes on,
 
1707
        the number of runs (to merge) will reduce until we have one
 
1708
        single run. So the number of runs will always be smaller than
 
1709
        the number of offsets in file */
 
1710
        ut_ad((*num_run) <= file->offset);
 
1711
 
 
1712
        /* The number of offsets in output file is always equal or
 
1713
        smaller than input file */
 
1714
        ut_ad(of.offset <= file->offset);
 
1715
 
 
1716
        /* Swap file descriptors for the next pass. */
 
1717
        *tmpfd = file->fd;
 
1718
        *file = of;
 
1719
 
 
1720
        UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);
 
1721
 
 
1722
        return(DB_SUCCESS);
 
1723
}
 
1724
 
 
1725
/*************************************************************//**
 
1726
Merge disk files.
 
1727
@return DB_SUCCESS or error code */
 
1728
static
 
1729
ulint
 
1730
row_merge_sort(
 
1731
/*===========*/
 
1732
        trx_t*                  trx,    /*!< in: transaction */
 
1733
        const dict_index_t*     index,  /*!< in: index being created */
 
1734
        merge_file_t*           file,   /*!< in/out: file containing
 
1735
                                        index entries */
 
1736
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
 
1737
        int*                    tmpfd,  /*!< in/out: temporary file handle */
 
1738
        struct TABLE*           table)  /*!< in/out: MySQL table, for
 
1739
                                        reporting erroneous key value
 
1740
                                        if applicable */
 
1741
{
 
1742
        ulint   half = file->offset / 2;
 
1743
        ulint   num_runs;
 
1744
        ulint*  run_offset;
 
1745
        ulint   error = DB_SUCCESS;
 
1746
 
 
1747
        /* Record the number of merge runs we need to perform */
 
1748
        num_runs = file->offset;
 
1749
 
 
1750
        /* If num_runs are less than 1, nothing to merge */
 
1751
        if (num_runs <= 1) {
 
1752
                return(error);
 
1753
        }
 
1754
 
 
1755
        /* "run_offset" records each run's first offset number */
 
1756
        run_offset = (ulint*) mem_alloc(file->offset * sizeof(ulint));
 
1757
 
 
1758
        /* This tells row_merge() where to start for the first round
 
1759
        of merge. */
 
1760
        run_offset[half] = half;
 
1761
 
 
1762
        /* The file should always contain at least one byte (the end
 
1763
        of file marker).  Thus, it must be at least one block. */
 
1764
        ut_ad(file->offset > 0);
 
1765
 
 
1766
        /* Merge the runs until we have one big run */
 
1767
        do {
 
1768
                error = row_merge(trx, index, file, block, tmpfd,
 
1769
                                  table, &num_runs, run_offset);
 
1770
 
 
1771
                UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
 
1772
 
 
1773
                if (error != DB_SUCCESS) {
 
1774
                        break;
 
1775
                }
 
1776
        } while (num_runs > 1);
 
1777
 
 
1778
        mem_free(run_offset);
 
1779
 
 
1780
        return(error);
 
1781
}
 
1782
 
 
1783
/*************************************************************//**
 
1784
Copy externally stored columns to the data tuple. */
 
1785
static
 
1786
void
 
1787
row_merge_copy_blobs(
 
1788
/*=================*/
 
1789
        const mrec_t*   mrec,   /*!< in: merge record */
 
1790
        const ulint*    offsets,/*!< in: offsets of mrec */
 
1791
        ulint           zip_size,/*!< in: compressed page size in bytes, or 0 */
 
1792
        dtuple_t*       tuple,  /*!< in/out: data tuple */
 
1793
        mem_heap_t*     heap)   /*!< in/out: memory heap */
 
1794
{
 
1795
        ulint   i;
 
1796
        ulint   n_fields = dtuple_get_n_fields(tuple);
 
1797
 
 
1798
        for (i = 0; i < n_fields; i++) {
 
1799
                ulint           len;
 
1800
                const void*     data;
 
1801
                dfield_t*       field = dtuple_get_nth_field(tuple, i);
 
1802
 
 
1803
                if (!dfield_is_ext(field)) {
 
1804
                        continue;
 
1805
                }
 
1806
 
 
1807
                ut_ad(!dfield_is_null(field));
 
1808
 
 
1809
                /* The table is locked during index creation.
 
1810
                Therefore, externally stored columns cannot possibly
 
1811
                be freed between the time the BLOB pointers are read
 
1812
                (row_merge_read_clustered_index()) and dereferenced
 
1813
                (below). */
 
1814
                data = btr_rec_copy_externally_stored_field(
 
1815
                        mrec, offsets, zip_size, i, &len, heap);
 
1816
                /* Because we have locked the table, any records
 
1817
                written by incomplete transactions must have been
 
1818
                rolled back already. There must not be any incomplete
 
1819
                BLOB columns. */
 
1820
                ut_a(data);
 
1821
 
 
1822
                dfield_set_data(field, data, len);
 
1823
        }
 
1824
}
 
1825
 
 
1826
/********************************************************************//**
 
1827
Read sorted file containing index data tuples and insert these data
 
1828
tuples to the index
 
1829
@return DB_SUCCESS or error number */
 
1830
static
 
1831
ulint
 
1832
row_merge_insert_index_tuples(
 
1833
/*==========================*/
 
1834
        trx_t*                  trx,    /*!< in: transaction */
 
1835
        dict_index_t*           index,  /*!< in: index */
 
1836
        dict_table_t*           table,  /*!< in: new table */
 
1837
        ulint                   zip_size,/*!< in: compressed page size of
 
1838
                                         the old table, or 0 if uncompressed */
 
1839
        int                     fd,     /*!< in: file descriptor */
 
1840
        row_merge_block_t*      block)  /*!< in/out: file buffer */
 
1841
{
 
1842
        const byte*             b;
 
1843
        que_thr_t*              thr;
 
1844
        ins_node_t*             node;
 
1845
        mem_heap_t*             tuple_heap;
 
1846
        mem_heap_t*             graph_heap;
 
1847
        ulint                   error = DB_SUCCESS;
 
1848
        ulint                   foffs = 0;
 
1849
        ulint*                  offsets;
 
1850
 
 
1851
        ut_ad(trx);
 
1852
        ut_ad(index);
 
1853
        ut_ad(table);
 
1854
 
 
1855
        /* We use the insert query graph as the dummy graph
 
1856
        needed in the row module call */
 
1857
 
 
1858
        trx->op_info = "inserting index entries";
 
1859
 
 
1860
        graph_heap = mem_heap_create(500 + sizeof(mrec_buf_t));
 
1861
        node = ins_node_create(INS_DIRECT, table, graph_heap);
 
1862
 
 
1863
        thr = pars_complete_graph_for_exec(node, trx, graph_heap);
 
1864
 
 
1865
        que_thr_move_to_run_state_for_mysql(thr, trx);
 
1866
 
 
1867
        tuple_heap = mem_heap_create(1000);
 
1868
 
 
1869
        {
 
1870
                ulint i = 1 + REC_OFFS_HEADER_SIZE
 
1871
                        + dict_index_get_n_fields(index);
 
1872
                offsets = mem_heap_alloc(graph_heap, i * sizeof *offsets);
 
1873
                offsets[0] = i;
 
1874
                offsets[1] = dict_index_get_n_fields(index);
 
1875
        }
 
1876
 
 
1877
        b = *block;
 
1878
 
 
1879
        if (!row_merge_read(fd, foffs, block)) {
 
1880
                error = DB_CORRUPTION;
 
1881
        } else {
 
1882
                mrec_buf_t*     buf = mem_heap_alloc(graph_heap, sizeof *buf);
 
1883
 
 
1884
                for (;;) {
 
1885
                        const mrec_t*   mrec;
 
1886
                        dtuple_t*       dtuple;
 
1887
                        ulint           n_ext;
 
1888
 
 
1889
                        b = row_merge_read_rec(block, buf, b, index,
 
1890
                                               fd, &foffs, &mrec, offsets);
 
1891
                        if (UNIV_UNLIKELY(!b)) {
 
1892
                                /* End of list, or I/O error */
 
1893
                                if (mrec) {
 
1894
                                        error = DB_CORRUPTION;
 
1895
                                }
 
1896
                                break;
 
1897
                        }
 
1898
 
 
1899
                        dtuple = row_rec_to_index_entry_low(
 
1900
                                mrec, index, offsets, &n_ext, tuple_heap);
 
1901
 
 
1902
                        if (UNIV_UNLIKELY(n_ext)) {
 
1903
                                row_merge_copy_blobs(mrec, offsets, zip_size,
 
1904
                                                     dtuple, tuple_heap);
 
1905
                        }
 
1906
 
 
1907
                        node->row = dtuple;
 
1908
                        node->table = table;
 
1909
                        node->trx_id = trx->id;
 
1910
 
 
1911
                        ut_ad(dtuple_validate(dtuple));
 
1912
 
 
1913
                        do {
 
1914
                                thr->run_node = thr;
 
1915
                                thr->prev_node = thr->common.parent;
 
1916
 
 
1917
                                error = row_ins_index_entry(index, dtuple,
 
1918
                                                            0, FALSE, thr);
 
1919
 
 
1920
                                if (UNIV_LIKELY(error == DB_SUCCESS)) {
 
1921
 
 
1922
                                        goto next_rec;
 
1923
                                }
 
1924
 
 
1925
                                thr->lock_state = QUE_THR_LOCK_ROW;
 
1926
                                trx->error_state = error;
 
1927
                                que_thr_stop_for_mysql(thr);
 
1928
                                thr->lock_state = QUE_THR_LOCK_NOLOCK;
 
1929
                        } while (row_mysql_handle_errors(&error, trx,
 
1930
                                                         thr, NULL));
 
1931
 
 
1932
                        goto err_exit;
 
1933
next_rec:
 
1934
                        mem_heap_empty(tuple_heap);
 
1935
                }
 
1936
        }
 
1937
 
 
1938
        que_thr_stop_for_mysql_no_error(thr, trx);
 
1939
err_exit:
 
1940
        que_graph_free(thr->graph);
 
1941
 
 
1942
        trx->op_info = "";
 
1943
 
 
1944
        mem_heap_free(tuple_heap);
 
1945
 
 
1946
        return(error);
 
1947
}
 
1948
 
 
1949
/*********************************************************************//**
 
1950
Sets an exclusive lock on a table, for the duration of creating indexes.
 
1951
@return error code or DB_SUCCESS */
 
1952
UNIV_INTERN
 
1953
ulint
 
1954
row_merge_lock_table(
 
1955
/*=================*/
 
1956
        trx_t*          trx,            /*!< in/out: transaction */
 
1957
        dict_table_t*   table,          /*!< in: table to lock */
 
1958
        enum lock_mode  mode)           /*!< in: LOCK_X or LOCK_S */
 
1959
{
 
1960
        mem_heap_t*     heap;
 
1961
        que_thr_t*      thr;
 
1962
        ulint           err;
 
1963
        sel_node_t*     node;
 
1964
 
 
1965
        ut_ad(trx);
 
1966
        ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
 
1967
        ut_ad(mode == LOCK_X || mode == LOCK_S);
 
1968
 
 
1969
        heap = mem_heap_create(512);
 
1970
 
 
1971
        trx->op_info = "setting table lock for creating or dropping index";
 
1972
 
 
1973
        node = sel_node_create(heap);
 
1974
        thr = pars_complete_graph_for_exec(node, trx, heap);
 
1975
        thr->graph->state = QUE_FORK_ACTIVE;
 
1976
 
 
1977
        /* We use the select query graph as the dummy graph needed
 
1978
        in the lock module call */
 
1979
 
 
1980
        thr = que_fork_get_first_thr(que_node_get_parent(thr));
 
1981
        que_thr_move_to_run_state_for_mysql(thr, trx);
 
1982
 
 
1983
run_again:
 
1984
        thr->run_node = thr;
 
1985
        thr->prev_node = thr->common.parent;
 
1986
 
 
1987
        err = lock_table(0, table, mode, thr);
 
1988
 
 
1989
        trx->error_state = err;
 
1990
 
 
1991
        if (UNIV_LIKELY(err == DB_SUCCESS)) {
 
1992
                que_thr_stop_for_mysql_no_error(thr, trx);
 
1993
        } else {
 
1994
                que_thr_stop_for_mysql(thr);
 
1995
 
 
1996
                if (err != DB_QUE_THR_SUSPENDED) {
 
1997
                        ibool   was_lock_wait;
 
1998
 
 
1999
                        was_lock_wait = row_mysql_handle_errors(
 
2000
                                &err, trx, thr, NULL);
 
2001
 
 
2002
                        if (was_lock_wait) {
 
2003
                                goto run_again;
 
2004
                        }
 
2005
                } else {
 
2006
                        que_thr_t*      run_thr;
 
2007
                        que_node_t*     parent;
 
2008
 
 
2009
                        parent = que_node_get_parent(thr);
 
2010
                        run_thr = que_fork_start_command(parent);
 
2011
 
 
2012
                        ut_a(run_thr == thr);
 
2013
 
 
2014
                        /* There was a lock wait but the thread was not
 
2015
                        in a ready to run or running state. */
 
2016
                        trx->error_state = DB_LOCK_WAIT;
 
2017
 
 
2018
                        goto run_again;
 
2019
                }
 
2020
        }
 
2021
 
 
2022
        que_graph_free(thr->graph);
 
2023
        trx->op_info = "";
 
2024
 
 
2025
        return(err);
 
2026
}
 
2027
 
 
2028
/*********************************************************************//**
 
2029
Drop an index from the InnoDB system tables.  The data dictionary must
 
2030
have been locked exclusively by the caller, because the transaction
 
2031
will not be committed. */
 
2032
UNIV_INTERN
 
2033
void
 
2034
row_merge_drop_index(
 
2035
/*=================*/
 
2036
        dict_index_t*   index,  /*!< in: index to be removed */
 
2037
        dict_table_t*   table,  /*!< in: table */
 
2038
        trx_t*          trx)    /*!< in: transaction handle */
 
2039
{
 
2040
        ulint           err;
 
2041
        pars_info_t*    info = pars_info_create();
 
2042
 
 
2043
        /* We use the private SQL parser of Innobase to generate the
 
2044
        query graphs needed in deleting the dictionary data from system
 
2045
        tables in Innobase. Deleting a row from SYS_INDEXES table also
 
2046
        frees the file segments of the B-tree associated with the index. */
 
2047
 
 
2048
        static const char str1[] =
 
2049
                "PROCEDURE DROP_INDEX_PROC () IS\n"
 
2050
                "BEGIN\n"
 
2051
                /* Rename the index, so that it will be dropped by
 
2052
                row_merge_drop_temp_indexes() at crash recovery
 
2053
                if the server crashes before this trx is committed. */
 
2054
                "UPDATE SYS_INDEXES SET NAME=CONCAT('"
 
2055
                TEMP_INDEX_PREFIX_STR "', NAME) WHERE ID = :indexid;\n"
 
2056
                "COMMIT WORK;\n"
 
2057
                /* Drop the field definitions of the index. */
 
2058
                "DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
 
2059
                /* Drop the index definition and the B-tree. */
 
2060
                "DELETE FROM SYS_INDEXES WHERE ID = :indexid;\n"
 
2061
                "END;\n";
 
2062
 
 
2063
        ut_ad(index && table && trx);
 
2064
 
 
2065
        pars_info_add_ull_literal(info, "indexid", index->id);
 
2066
 
 
2067
        trx_start_if_not_started(trx);
 
2068
        trx->op_info = "dropping index";
 
2069
 
 
2070
        ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
 
2071
 
 
2072
        err = que_eval_sql(info, str1, FALSE, trx);
 
2073
 
 
2074
        ut_a(err == DB_SUCCESS);
 
2075
 
 
2076
        /* Replace this index with another equivalent index for all
 
2077
        foreign key constraints on this table where this index is used */
 
2078
 
 
2079
        dict_table_replace_index_in_foreign_list(table, index);
 
2080
        dict_index_remove_from_cache(table, index);
 
2081
 
 
2082
        trx->op_info = "";
 
2083
}
 
2084
 
 
2085
/*********************************************************************//**
 
2086
Drop those indexes which were created before an error occurred when
 
2087
building an index.  The data dictionary must have been locked
 
2088
exclusively by the caller, because the transaction will not be
 
2089
committed. */
 
2090
UNIV_INTERN
 
2091
void
 
2092
row_merge_drop_indexes(
 
2093
/*===================*/
 
2094
        trx_t*          trx,            /*!< in: transaction */
 
2095
        dict_table_t*   table,          /*!< in: table containing the indexes */
 
2096
        dict_index_t**  index,          /*!< in: indexes to drop */
 
2097
        ulint           num_created)    /*!< in: number of elements in index[] */
 
2098
{
 
2099
        ulint   key_num;
 
2100
 
 
2101
        for (key_num = 0; key_num < num_created; key_num++) {
 
2102
                row_merge_drop_index(index[key_num], table, trx);
 
2103
        }
 
2104
}
 
2105
 
 
2106
/*********************************************************************//**
 
2107
Drop all partially created indexes during crash recovery. */
 
2108
UNIV_INTERN
 
2109
void
 
2110
row_merge_drop_temp_indexes(void)
 
2111
/*=============================*/
 
2112
{
 
2113
        trx_t*          trx;
 
2114
        btr_pcur_t      pcur;
 
2115
        mtr_t           mtr;
 
2116
 
 
2117
        /* Load the table definitions that contain partially defined
 
2118
        indexes, so that the data dictionary information can be checked
 
2119
        when accessing the tablename.ibd files. */
 
2120
 
 
2121
        trx = trx_allocate_for_background();
 
2122
        trx->op_info = "dropping partially created indexes";
 
2123
        row_mysql_lock_data_dictionary(trx);
 
2124
 
 
2125
        mtr_start(&mtr);
 
2126
 
 
2127
        btr_pcur_open_at_index_side(
 
2128
                TRUE,
 
2129
                dict_table_get_first_index(dict_sys->sys_indexes),
 
2130
                BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
 
2131
 
 
2132
        for (;;) {
 
2133
                const rec_t*    rec;
 
2134
                const byte*     field;
 
2135
                ulint           len;
 
2136
                table_id_t      table_id;
 
2137
                dict_table_t*   table;
 
2138
 
 
2139
                btr_pcur_move_to_next_user_rec(&pcur, &mtr);
 
2140
 
 
2141
                if (!btr_pcur_is_on_user_rec(&pcur)) {
 
2142
                        break;
 
2143
                }
 
2144
 
 
2145
                rec = btr_pcur_get_rec(&pcur);
 
2146
                field = rec_get_nth_field_old(rec, DICT_SYS_INDEXES_NAME_FIELD,
 
2147
                                              &len);
 
2148
                if (len == UNIV_SQL_NULL || len == 0
 
2149
                    || (char) *field != TEMP_INDEX_PREFIX) {
 
2150
                        continue;
 
2151
                }
 
2152
 
 
2153
                /* This is a temporary index. */
 
2154
 
 
2155
                field = rec_get_nth_field_old(rec, 0/*TABLE_ID*/, &len);
 
2156
                if (len != 8) {
 
2157
                        /* Corrupted TABLE_ID */
 
2158
                        continue;
 
2159
                }
 
2160
 
 
2161
                table_id = mach_read_from_8(field);
 
2162
 
 
2163
                btr_pcur_store_position(&pcur, &mtr);
 
2164
                btr_pcur_commit_specify_mtr(&pcur, &mtr);
 
2165
 
 
2166
                table = dict_table_get_on_id_low(table_id);
 
2167
 
 
2168
                if (table) {
 
2169
                        dict_index_t*   index;
 
2170
                        dict_index_t*   next_index;
 
2171
 
 
2172
                        for (index = dict_table_get_first_index(table);
 
2173
                             index; index = next_index) {
 
2174
 
 
2175
                                next_index = dict_table_get_next_index(index);
 
2176
 
 
2177
                                if (*index->name == TEMP_INDEX_PREFIX) {
 
2178
                                        row_merge_drop_index(index, table, trx);
 
2179
                                        trx_commit_for_mysql(trx);
 
2180
                                }
 
2181
                        }
 
2182
                }
 
2183
 
 
2184
                mtr_start(&mtr);
 
2185
                btr_pcur_restore_position(BTR_SEARCH_LEAF,
 
2186
                                          &pcur, &mtr);
 
2187
        }
 
2188
 
 
2189
        btr_pcur_close(&pcur);
 
2190
        mtr_commit(&mtr);
 
2191
        row_mysql_unlock_data_dictionary(trx);
 
2192
        trx_free_for_background(trx);
 
2193
}
 
2194
 
 
2195
/*********************************************************************//**
 
2196
Create a merge file. */
 
2197
static
 
2198
void
 
2199
row_merge_file_create(
 
2200
/*==================*/
 
2201
        merge_file_t*   merge_file)     /*!< out: merge file structure */
 
2202
{
 
2203
#ifdef UNIV_PFS_IO
 
2204
        /* This temp file open does not go through normal
 
2205
        file APIs, add instrumentation to register with
 
2206
        performance schema */
 
2207
        struct PSI_file_locker* locker = NULL;
 
2208
        PSI_file_locker_state   state;
 
2209
        register_pfs_file_open_begin(&state, locker, innodb_file_temp_key,
 
2210
                                     PSI_FILE_OPEN,
 
2211
                                     "Innodb Merge Temp File",
 
2212
                                     __FILE__, __LINE__);
 
2213
#endif
 
2214
        merge_file->fd = innobase_mysql_tmpfile();
 
2215
        merge_file->offset = 0;
 
2216
        merge_file->n_rec = 0;
 
2217
#ifdef UNIV_PFS_IO
 
2218
        register_pfs_file_open_end(locker, merge_file->fd);
 
2219
#endif
 
2220
}
 
2221
 
 
2222
/*********************************************************************//**
 
2223
Destroy a merge file. */
 
2224
static
 
2225
void
 
2226
row_merge_file_destroy(
 
2227
/*===================*/
 
2228
        merge_file_t*   merge_file)     /*!< out: merge file structure */
 
2229
{
 
2230
#ifdef UNIV_PFS_IO
 
2231
        struct PSI_file_locker* locker = NULL;
 
2232
        PSI_file_locker_state   state;
 
2233
        register_pfs_file_io_begin(&state, locker, merge_file->fd, 0, PSI_FILE_CLOSE,
 
2234
                                   __FILE__, __LINE__);
 
2235
#endif
 
2236
        if (merge_file->fd != -1) {
 
2237
                close(merge_file->fd);
 
2238
                merge_file->fd = -1;
 
2239
        }
 
2240
 
 
2241
#ifdef UNIV_PFS_IO
 
2242
        register_pfs_file_io_end(locker, 0);
 
2243
#endif
 
2244
}
 
2245
 
 
2246
/*********************************************************************//**
 
2247
Determine the precise type of a column that is added to a tem
 
2248
if a column must be constrained NOT NULL.
 
2249
@return col->prtype, possibly ORed with DATA_NOT_NULL */
 
2250
UNIV_INLINE
 
2251
ulint
 
2252
row_merge_col_prtype(
 
2253
/*=================*/
 
2254
        const dict_col_t*       col,            /*!< in: column */
 
2255
        const char*             col_name,       /*!< in: name of the column */
 
2256
        const merge_index_def_t*index_def)      /*!< in: the index definition
 
2257
                                                of the primary key */
 
2258
{
 
2259
        ulint   prtype = col->prtype;
 
2260
        ulint   i;
 
2261
 
 
2262
        ut_ad(index_def->ind_type & DICT_CLUSTERED);
 
2263
 
 
2264
        if (prtype & DATA_NOT_NULL) {
 
2265
 
 
2266
                return(prtype);
 
2267
        }
 
2268
 
 
2269
        /* All columns that are included
 
2270
        in the PRIMARY KEY must be NOT NULL. */
 
2271
 
 
2272
        for (i = 0; i < index_def->n_fields; i++) {
 
2273
                if (!strcmp(col_name, index_def->fields[i].field_name)) {
 
2274
                        return(prtype | DATA_NOT_NULL);
 
2275
                }
 
2276
        }
 
2277
 
 
2278
        return(prtype);
 
2279
}
 
2280
 
 
2281
/*********************************************************************//**
 
2282
Create a temporary table for creating a primary key, using the definition
 
2283
of an existing table.
 
2284
@return table, or NULL on error */
 
2285
UNIV_INTERN
 
2286
dict_table_t*
 
2287
row_merge_create_temporary_table(
 
2288
/*=============================*/
 
2289
        const char*             table_name,     /*!< in: new table name */
 
2290
        const merge_index_def_t*index_def,      /*!< in: the index definition
 
2291
                                                of the primary key */
 
2292
        const dict_table_t*     table,          /*!< in: old table definition */
 
2293
        trx_t*                  trx)            /*!< in/out: transaction
 
2294
                                                (sets error_state) */
 
2295
{
 
2296
        ulint           i;
 
2297
        dict_table_t*   new_table = NULL;
 
2298
        ulint           n_cols = dict_table_get_n_user_cols(table);
 
2299
        ulint           error;
 
2300
        mem_heap_t*     heap = mem_heap_create(1000);
 
2301
 
 
2302
        ut_ad(table_name);
 
2303
        ut_ad(index_def);
 
2304
        ut_ad(table);
 
2305
        ut_ad(mutex_own(&dict_sys->mutex));
 
2306
 
 
2307
        new_table = dict_mem_table_create(table_name, 0, n_cols, table->flags);
 
2308
 
 
2309
        for (i = 0; i < n_cols; i++) {
 
2310
                const dict_col_t*       col;
 
2311
                const char*             col_name;
 
2312
 
 
2313
                col = dict_table_get_nth_col(table, i);
 
2314
                col_name = dict_table_get_col_name(table, i);
 
2315
 
 
2316
                dict_mem_table_add_col(new_table, heap, col_name, col->mtype,
 
2317
                                       row_merge_col_prtype(col, col_name,
 
2318
                                                            index_def),
 
2319
                                       col->len);
 
2320
        }
 
2321
 
 
2322
        error = row_create_table_for_mysql(new_table, trx);
 
2323
        mem_heap_free(heap);
 
2324
 
 
2325
        if (error != DB_SUCCESS) {
 
2326
                trx->error_state = error;
 
2327
                new_table = NULL;
 
2328
        }
 
2329
 
 
2330
        return(new_table);
 
2331
}
 
2332
 
 
2333
/*********************************************************************//**
 
2334
Rename the temporary indexes in the dictionary to permanent ones.  The
 
2335
data dictionary must have been locked exclusively by the caller,
 
2336
because the transaction will not be committed.
 
2337
@return DB_SUCCESS if all OK */
 
2338
UNIV_INTERN
 
2339
ulint
 
2340
row_merge_rename_indexes(
 
2341
/*=====================*/
 
2342
        trx_t*          trx,            /*!< in/out: transaction */
 
2343
        dict_table_t*   table)          /*!< in/out: table with new indexes */
 
2344
{
 
2345
        ulint           err = DB_SUCCESS;
 
2346
        pars_info_t*    info = pars_info_create();
 
2347
 
 
2348
        /* We use the private SQL parser of Innobase to generate the
 
2349
        query graphs needed in renaming indexes. */
 
2350
 
 
2351
        static const char rename_indexes[] =
 
2352
                "PROCEDURE RENAME_INDEXES_PROC () IS\n"
 
2353
                "BEGIN\n"
 
2354
                "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
 
2355
                "WHERE TABLE_ID = :tableid AND SUBSTR(NAME,0,1)='"
 
2356
                TEMP_INDEX_PREFIX_STR "';\n"
 
2357
                "END;\n";
 
2358
 
 
2359
        ut_ad(table);
 
2360
        ut_ad(trx);
 
2361
        ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
 
2362
 
 
2363
        trx->op_info = "renaming indexes";
 
2364
 
 
2365
        pars_info_add_ull_literal(info, "tableid", table->id);
 
2366
 
 
2367
        err = que_eval_sql(info, rename_indexes, FALSE, trx);
 
2368
 
 
2369
        if (err == DB_SUCCESS) {
 
2370
                dict_index_t*   index = dict_table_get_first_index(table);
 
2371
                do {
 
2372
                        if (*index->name == TEMP_INDEX_PREFIX) {
 
2373
                                index->name++;
 
2374
                        }
 
2375
                        index = dict_table_get_next_index(index);
 
2376
                } while (index);
 
2377
        }
 
2378
 
 
2379
        trx->op_info = "";
 
2380
 
 
2381
        return(err);
 
2382
}
 
2383
 
 
2384
/*********************************************************************//**
 
2385
Rename the tables in the data dictionary.  The data dictionary must
 
2386
have been locked exclusively by the caller, because the transaction
 
2387
will not be committed.
 
2388
@return error code or DB_SUCCESS */
 
2389
UNIV_INTERN
 
2390
ulint
 
2391
row_merge_rename_tables(
 
2392
/*====================*/
 
2393
        dict_table_t*   old_table,      /*!< in/out: old table, renamed to
 
2394
                                        tmp_name */
 
2395
        dict_table_t*   new_table,      /*!< in/out: new table, renamed to
 
2396
                                        old_table->name */
 
2397
        const char*     tmp_name,       /*!< in: new name for old_table */
 
2398
        trx_t*          trx)            /*!< in: transaction handle */
 
2399
{
 
2400
        ulint           err     = DB_ERROR;
 
2401
        pars_info_t*    info;
 
2402
        char            old_name[MAX_TABLE_NAME_LEN + 1];
 
2403
 
 
2404
        ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
 
2405
        ut_ad(old_table != new_table);
 
2406
        ut_ad(mutex_own(&dict_sys->mutex));
 
2407
 
 
2408
        ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
 
2409
 
 
2410
        /* store the old/current name to an automatic variable */
 
2411
        if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
 
2412
                memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
 
2413
        } else {
 
2414
                ut_print_timestamp(stderr);
 
2415
                fprintf(stderr, "InnoDB: too long table name: '%s', "
 
2416
                        "max length is %d\n", old_table->name,
 
2417
                        MAX_TABLE_NAME_LEN);
 
2418
                ut_error;
 
2419
        }
 
2420
 
 
2421
        /* store the old/current name to an automatic variable */
 
2422
        if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
 
2423
                memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
 
2424
        } else {
 
2425
                ut_print_timestamp(stderr);
 
2426
                fprintf(stderr, "InnoDB: too long table name: '%s', "
 
2427
                        "max length is %d\n", old_table->name,
 
2428
                        MAX_TABLE_NAME_LEN);
 
2429
                ut_error;
 
2430
        }
 
2431
 
 
2432
        trx->op_info = "renaming tables";
 
2433
 
 
2434
        /* We use the private SQL parser of Innobase to generate the query
 
2435
        graphs needed in updating the dictionary data in system tables. */
 
2436
 
 
2437
        info = pars_info_create();
 
2438
 
 
2439
        pars_info_add_str_literal(info, "new_name", new_table->name);
 
2440
        pars_info_add_str_literal(info, "old_name", old_name);
 
2441
        pars_info_add_str_literal(info, "tmp_name", tmp_name);
 
2442
 
 
2443
        err = que_eval_sql(info,
 
2444
                           "PROCEDURE RENAME_TABLES () IS\n"
 
2445
                           "BEGIN\n"
 
2446
                           "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
 
2447
                           " WHERE NAME = :old_name;\n"
 
2448
                           "UPDATE SYS_TABLES SET NAME = :old_name\n"
 
2449
                           " WHERE NAME = :new_name;\n"
 
2450
                           "END;\n", FALSE, trx);
 
2451
 
 
2452
        if (err != DB_SUCCESS) {
 
2453
 
 
2454
                goto err_exit;
 
2455
        }
 
2456
 
 
2457
        /* The following calls will also rename the .ibd data files if
 
2458
        the tables are stored in a single-table tablespace */
 
2459
 
 
2460
        if (!dict_table_rename_in_cache(old_table, tmp_name, FALSE)
 
2461
            || !dict_table_rename_in_cache(new_table, old_name, FALSE)) {
 
2462
 
 
2463
                err = DB_ERROR;
 
2464
                goto err_exit;
 
2465
        }
 
2466
 
 
2467
        err = dict_load_foreigns(old_name, FALSE, TRUE);
 
2468
 
 
2469
        if (err != DB_SUCCESS) {
 
2470
err_exit:
 
2471
                trx->error_state = DB_SUCCESS;
 
2472
                trx_general_rollback_for_mysql(trx, NULL);
 
2473
                trx->error_state = DB_SUCCESS;
 
2474
        }
 
2475
 
 
2476
        trx->op_info = "";
 
2477
 
 
2478
        return(err);
 
2479
}
 
2480
 
 
2481
/*********************************************************************//**
 
2482
Create and execute a query graph for creating an index.
 
2483
@return DB_SUCCESS or error code */
 
2484
static
 
2485
ulint
 
2486
row_merge_create_index_graph(
 
2487
/*=========================*/
 
2488
        trx_t*          trx,            /*!< in: trx */
 
2489
        dict_table_t*   table,          /*!< in: table */
 
2490
        dict_index_t*   index)          /*!< in: index */
 
2491
{
 
2492
        ind_node_t*     node;           /*!< Index creation node */
 
2493
        mem_heap_t*     heap;           /*!< Memory heap */
 
2494
        que_thr_t*      thr;            /*!< Query thread */
 
2495
        ulint           err;
 
2496
 
 
2497
        ut_ad(trx);
 
2498
        ut_ad(table);
 
2499
        ut_ad(index);
 
2500
 
 
2501
        heap = mem_heap_create(512);
 
2502
 
 
2503
        index->table = table;
 
2504
        node = ind_create_graph_create(index, heap);
 
2505
        thr = pars_complete_graph_for_exec(node, trx, heap);
 
2506
 
 
2507
        ut_a(thr == que_fork_start_command(que_node_get_parent(thr)));
 
2508
 
 
2509
        que_run_threads(thr);
 
2510
 
 
2511
        err = trx->error_state;
 
2512
 
 
2513
        que_graph_free((que_t*) que_node_get_parent(thr));
 
2514
 
 
2515
        return(err);
 
2516
}
 
2517
 
 
2518
/*********************************************************************//**
 
2519
Create the index and load in to the dictionary.
 
2520
@return index, or NULL on error */
 
2521
UNIV_INTERN
 
2522
dict_index_t*
 
2523
row_merge_create_index(
 
2524
/*===================*/
 
2525
        trx_t*                  trx,    /*!< in/out: trx (sets error_state) */
 
2526
        dict_table_t*           table,  /*!< in: the index is on this table */
 
2527
        const merge_index_def_t*index_def)
 
2528
                                        /*!< in: the index definition */
 
2529
{
 
2530
        dict_index_t*   index;
 
2531
        ulint           err;
 
2532
        ulint           n_fields = index_def->n_fields;
 
2533
        ulint           i;
 
2534
 
 
2535
        /* Create the index prototype, using the passed in def, this is not
 
2536
        a persistent operation. We pass 0 as the space id, and determine at
 
2537
        a lower level the space id where to store the table. */
 
2538
 
 
2539
        index = dict_mem_index_create(table->name, index_def->name,
 
2540
                                      0, index_def->ind_type, n_fields);
 
2541
 
 
2542
        ut_a(index);
 
2543
 
 
2544
        for (i = 0; i < n_fields; i++) {
 
2545
                merge_index_field_t*    ifield = &index_def->fields[i];
 
2546
 
 
2547
                dict_mem_index_add_field(index, ifield->field_name,
 
2548
                                         ifield->prefix_len);
 
2549
        }
 
2550
 
 
2551
        /* Add the index to SYS_INDEXES, using the index prototype. */
 
2552
        err = row_merge_create_index_graph(trx, table, index);
 
2553
 
 
2554
        if (err == DB_SUCCESS) {
 
2555
 
 
2556
                index = row_merge_dict_table_get_index(
 
2557
                        table, index_def);
 
2558
 
 
2559
                ut_a(index);
 
2560
 
 
2561
                /* Note the id of the transaction that created this
 
2562
                index, we use it to restrict readers from accessing
 
2563
                this index, to ensure read consistency. */
 
2564
                index->trx_id = trx->id;
 
2565
        } else {
 
2566
                index = NULL;
 
2567
        }
 
2568
 
 
2569
        return(index);
 
2570
}
 
2571
 
 
2572
/*********************************************************************//**
 
2573
Check if a transaction can use an index. */
 
2574
UNIV_INTERN
 
2575
ibool
 
2576
row_merge_is_index_usable(
 
2577
/*======================*/
 
2578
        const trx_t*            trx,    /*!< in: transaction */
 
2579
        const dict_index_t*     index)  /*!< in: index to check */
 
2580
{
 
2581
        return(!trx->read_view
 
2582
               || read_view_sees_trx_id(trx->read_view, index->trx_id));
 
2583
}
 
2584
 
 
2585
/*********************************************************************//**
 
2586
Drop the old table.
 
2587
@return DB_SUCCESS or error code */
 
2588
UNIV_INTERN
 
2589
ulint
 
2590
row_merge_drop_table(
 
2591
/*=================*/
 
2592
        trx_t*          trx,            /*!< in: transaction */
 
2593
        dict_table_t*   table)          /*!< in: table to drop */
 
2594
{
 
2595
        /* There must be no open transactions on the table. */
 
2596
        ut_a(table->n_mysql_handles_opened == 0);
 
2597
 
 
2598
        return(row_drop_table_for_mysql(table->name, trx, FALSE));
 
2599
}
 
2600
 
 
2601
/*********************************************************************//**
 
2602
Build indexes on a table by reading a clustered index,
 
2603
creating a temporary file containing index entries, merge sorting
 
2604
these index entries and inserting sorted index entries to indexes.
 
2605
@return DB_SUCCESS or error code */
 
2606
UNIV_INTERN
 
2607
ulint
 
2608
row_merge_build_indexes(
 
2609
/*====================*/
 
2610
        trx_t*          trx,            /*!< in: transaction */
 
2611
        dict_table_t*   old_table,      /*!< in: table where rows are
 
2612
                                        read from */
 
2613
        dict_table_t*   new_table,      /*!< in: table where indexes are
 
2614
                                        created; identical to old_table
 
2615
                                        unless creating a PRIMARY KEY */
 
2616
        dict_index_t**  indexes,        /*!< in: indexes to be created */
 
2617
        ulint           n_indexes,      /*!< in: size of indexes[] */
 
2618
        TABLE*  table)          /*!< in/out: MySQL table, for
 
2619
                                        reporting erroneous key value
 
2620
                                        if applicable */
 
2621
{
 
2622
        merge_file_t*           merge_files;
 
2623
        row_merge_block_t*      block;
 
2624
        ulint                   block_size;
 
2625
        ulint                   i;
 
2626
        ulint                   error;
 
2627
        int                     tmpfd;
 
2628
 
 
2629
        ut_ad(trx);
 
2630
        ut_ad(old_table);
 
2631
        ut_ad(new_table);
 
2632
        ut_ad(indexes);
 
2633
        ut_ad(n_indexes);
 
2634
 
 
2635
        trx_start_if_not_started(trx);
 
2636
 
 
2637
        /* Allocate memory for merge file data structure and initialize
 
2638
        fields */
 
2639
 
 
2640
        merge_files = mem_alloc(n_indexes * sizeof *merge_files);
 
2641
        block_size = 3 * sizeof *block;
 
2642
        block = os_mem_alloc_large(&block_size);
 
2643
 
 
2644
        for (i = 0; i < n_indexes; i++) {
 
2645
 
 
2646
                row_merge_file_create(&merge_files[i]);
 
2647
        }
 
2648
 
 
2649
        tmpfd = innobase_mysql_tmpfile();
 
2650
 
 
2651
        /* Reset the MySQL row buffer that is used when reporting
 
2652
        duplicate keys. */
 
2653
        innobase_rec_reset(table);
 
2654
 
 
2655
        /* Read clustered index of the table and create files for
 
2656
        secondary index entries for merge sort */
 
2657
 
 
2658
        error = row_merge_read_clustered_index(
 
2659
                trx, table, old_table, new_table, indexes,
 
2660
                merge_files, n_indexes, block);
 
2661
 
 
2662
        if (error != DB_SUCCESS) {
 
2663
 
 
2664
                goto func_exit;
 
2665
        }
 
2666
 
 
2667
        /* Now we have files containing index entries ready for
 
2668
        sorting and inserting. */
 
2669
 
 
2670
        for (i = 0; i < n_indexes; i++) {
 
2671
                error = row_merge_sort(trx, indexes[i], &merge_files[i],
 
2672
                                       block, &tmpfd, table);
 
2673
 
 
2674
                if (error == DB_SUCCESS) {
 
2675
                        error = row_merge_insert_index_tuples(
 
2676
                                trx, indexes[i], new_table,
 
2677
                                dict_table_zip_size(old_table),
 
2678
                                merge_files[i].fd, block);
 
2679
                }
 
2680
 
 
2681
                /* Close the temporary file to free up space. */
 
2682
                row_merge_file_destroy(&merge_files[i]);
 
2683
 
 
2684
                if (error != DB_SUCCESS) {
 
2685
                        trx->error_key_num = i;
 
2686
                        goto func_exit;
 
2687
                }
 
2688
        }
 
2689
 
 
2690
func_exit:
 
2691
        close(tmpfd);
 
2692
 
 
2693
        for (i = 0; i < n_indexes; i++) {
 
2694
                row_merge_file_destroy(&merge_files[i]);
 
2695
        }
 
2696
 
 
2697
        mem_free(merge_files);
 
2698
        os_mem_free_large(block, block_size);
 
2699
 
 
2700
        return(error);
 
2701
}