~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/row/row0merge.cc

  • Committer: Monty Taylor
  • Date: 2008-09-16 00:00:48 UTC
  • mto: This revision was merged to the branch mainline in revision 391.
  • Revision ID: monty@inaugust.com-20080916000048-3rvrv3gv9l0ad3gs
Fixed copyright headers in drizzled/

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*****************************************************************************
2
 
 
3
 
Copyright (C) 2005, 2010, Innobase Oy. All Rights Reserved.
4
 
 
5
 
This program is free software; you can redistribute it and/or modify it under
6
 
the terms of the GNU General Public License as published by the Free Software
7
 
Foundation; version 2 of the License.
8
 
 
9
 
This program is distributed in the hope that it will be useful, but WITHOUT
10
 
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
 
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
 
 
13
 
You should have received a copy of the GNU General Public License along with
14
 
this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
15
 
St, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
*****************************************************************************/
18
 
 
19
 
/**************************************************//**
20
 
@file row/row0merge.c
21
 
New index creation routines using a merge sort
22
 
 
23
 
Created 12/4/2005 Jan Lindstrom
24
 
Completed by Sunny Bains and Marko Makela
25
 
*******************************************************/
26
 
 
27
 
#include "row0merge.h"
28
 
#include "row0ext.h"
29
 
#include "row0row.h"
30
 
#include "row0upd.h"
31
 
#include "row0ins.h"
32
 
#include "row0sel.h"
33
 
#include "dict0dict.h"
34
 
#include "dict0mem.h"
35
 
#include "dict0boot.h"
36
 
#include "dict0crea.h"
37
 
#include "dict0load.h"
38
 
#include "btr0btr.h"
39
 
#include "mach0data.h"
40
 
#include "trx0rseg.h"
41
 
#include "trx0trx.h"
42
 
#include "trx0roll.h"
43
 
#include "trx0undo.h"
44
 
#include "trx0purge.h"
45
 
#include "trx0rec.h"
46
 
#include "que0que.h"
47
 
#include "rem0cmp.h"
48
 
#include "read0read.h"
49
 
#include "os0file.h"
50
 
#include "lock0lock.h"
51
 
#include "data0data.h"
52
 
#include "data0type.h"
53
 
#include "que0que.h"
54
 
#include "pars0pars.h"
55
 
#include "mem0mem.h"
56
 
#include "log0log.h"
57
 
#include "ut0sort.h"
58
 
#include "handler0alter.h"
59
 
#include <unistd.h>
60
 
 
61
 
/* Ignore posix_fadvise() on those platforms where it does not exist */
62
 
#if defined __WIN__
63
 
# define posix_fadvise(fd, offset, len, advice) /* nothing */
64
 
#endif /* __WIN__ */
65
 
 
66
 
#ifdef UNIV_DEBUG
67
 
/** Set these in order ot enable debug printout. */
68
 
/* @{ */
69
 
/** Log the outcome of each row_merge_cmp() call, comparing records. */
70
 
static ibool    row_merge_print_cmp;
71
 
/** Log each record read from temporary file. */
72
 
static ibool    row_merge_print_read;
73
 
/** Log each record write to temporary file. */
74
 
static ibool    row_merge_print_write;
75
 
/** Log each row_merge_blocks() call, merging two blocks of records to
76
 
a bigger one. */
77
 
static ibool    row_merge_print_block;
78
 
/** Log each block read from temporary file. */
79
 
static ibool    row_merge_print_block_read;
80
 
/** Log each block read from temporary file. */
81
 
static ibool    row_merge_print_block_write;
82
 
/* @} */
83
 
#endif /* UNIV_DEBUG */
84
 
 
85
 
/** @brief Block size for I/O operations in merge sort.
86
 
 
87
 
The minimum is UNIV_PAGE_SIZE, or page_get_free_space_of_empty()
88
 
rounded to a power of 2.
89
 
 
90
 
When not creating a PRIMARY KEY that contains column prefixes, this
91
 
can be set as small as UNIV_PAGE_SIZE / 2.  See the comment above
92
 
ut_ad(data_size < sizeof(row_merge_block_t)). */
93
 
typedef byte    row_merge_block_t[1048576];
94
 
 
95
 
/** @brief Secondary buffer for I/O operations of merge records.
96
 
 
97
 
This buffer is used for writing or reading a record that spans two
98
 
row_merge_block_t.  Thus, it must be able to hold one merge record,
99
 
whose maximum size is the same as the minimum size of
100
 
row_merge_block_t. */
101
 
typedef byte    mrec_buf_t[UNIV_PAGE_SIZE];
102
 
 
103
 
/** @brief Merge record in row_merge_block_t.
104
 
 
105
 
The format is the same as a record in ROW_FORMAT=COMPACT with the
106
 
exception that the REC_N_NEW_EXTRA_BYTES are omitted. */
107
 
typedef byte    mrec_t;
108
 
 
109
 
/** Buffer for sorting in main memory. */
110
 
struct row_merge_buf_struct {
111
 
        mem_heap_t*     heap;           /*!< memory heap where allocated */
112
 
        dict_index_t*   index;          /*!< the index the tuples belong to */
113
 
        ulint           total_size;     /*!< total amount of data bytes */
114
 
        ulint           n_tuples;       /*!< number of data tuples */
115
 
        ulint           max_tuples;     /*!< maximum number of data tuples */
116
 
        const dfield_t**tuples;         /*!< array of pointers to
117
 
                                        arrays of fields that form
118
 
                                        the data tuples */
119
 
        const dfield_t**tmp_tuples;     /*!< temporary copy of tuples,
120
 
                                        for sorting */
121
 
};
122
 
 
123
 
/** Buffer for sorting in main memory. */
124
 
typedef struct row_merge_buf_struct row_merge_buf_t;
125
 
 
126
 
/** Information about temporary files used in merge sort */
127
 
struct merge_file_struct {
128
 
        int             fd;             /*!< file descriptor */
129
 
        ulint           offset;         /*!< file offset (end of file) */
130
 
        ib_uint64_t     n_rec;          /*!< number of records in the file */
131
 
};
132
 
 
133
 
/** Information about temporary files used in merge sort */
134
 
typedef struct merge_file_struct merge_file_t;
135
 
 
136
 
#ifdef UNIV_DEBUG
137
 
/******************************************************//**
138
 
Display a merge tuple. */
139
 
static
140
 
void
141
 
row_merge_tuple_print(
142
 
/*==================*/
143
 
        FILE*           f,      /*!< in: output stream */
144
 
        const dfield_t* entry,  /*!< in: tuple to print */
145
 
        ulint           n_fields)/*!< in: number of fields in the tuple */
146
 
{
147
 
        ulint   j;
148
 
 
149
 
        for (j = 0; j < n_fields; j++) {
150
 
                const dfield_t* field = &entry[j];
151
 
 
152
 
                if (dfield_is_null(field)) {
153
 
                        fputs("\n NULL;", f);
154
 
                } else {
155
 
                        ulint   field_len       = dfield_get_len(field);
156
 
                        ulint   len             = ut_min(field_len, 20);
157
 
                        if (dfield_is_ext(field)) {
158
 
                                fputs("\nE", f);
159
 
                        } else {
160
 
                                fputs("\n ", f);
161
 
                        }
162
 
                        ut_print_buf(f, dfield_get_data(field), len);
163
 
                        if (len != field_len) {
164
 
                                fprintf(f, " (total %lu bytes)", field_len);
165
 
                        }
166
 
                }
167
 
        }
168
 
        putc('\n', f);
169
 
}
170
 
#endif /* UNIV_DEBUG */
171
 
 
172
 
/******************************************************//**
173
 
Allocate a sort buffer.
174
 
@return own: sort buffer */
175
 
static
176
 
row_merge_buf_t*
177
 
row_merge_buf_create_low(
178
 
/*=====================*/
179
 
        mem_heap_t*     heap,           /*!< in: heap where allocated */
180
 
        dict_index_t*   index,          /*!< in: secondary index */
181
 
        ulint           max_tuples,     /*!< in: maximum number of data tuples */
182
 
        ulint           buf_size)       /*!< in: size of the buffer, in bytes */
183
 
{
184
 
        row_merge_buf_t*        buf;
185
 
 
186
 
        ut_ad(max_tuples > 0);
187
 
        ut_ad(max_tuples <= sizeof(row_merge_block_t));
188
 
        ut_ad(max_tuples < buf_size);
189
 
 
190
 
        buf = static_cast<row_merge_buf_t *>(mem_heap_zalloc(heap, buf_size));
191
 
        buf->heap = heap;
192
 
        buf->index = index;
193
 
        buf->max_tuples = max_tuples;
194
 
        buf->tuples = static_cast<const dfield_t **>(mem_heap_alloc(heap,
195
 
                                     2 * max_tuples * sizeof *buf->tuples));
196
 
        buf->tmp_tuples = buf->tuples + max_tuples;
197
 
 
198
 
        return(buf);
199
 
}
200
 
 
201
 
/******************************************************//**
202
 
Allocate a sort buffer.
203
 
@return own: sort buffer */
204
 
static
205
 
row_merge_buf_t*
206
 
row_merge_buf_create(
207
 
/*=================*/
208
 
        dict_index_t*   index)  /*!< in: secondary index */
209
 
{
210
 
        row_merge_buf_t*        buf;
211
 
        ulint                   max_tuples;
212
 
        ulint                   buf_size;
213
 
        mem_heap_t*             heap;
214
 
 
215
 
        max_tuples = sizeof(row_merge_block_t)
216
 
                / ut_max(1, dict_index_get_min_size(index));
217
 
 
218
 
        buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
219
 
 
220
 
        heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));
221
 
 
222
 
        buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
223
 
 
224
 
        return(buf);
225
 
}
226
 
 
227
 
/******************************************************//**
228
 
Empty a sort buffer.
229
 
@return sort buffer */
230
 
static
231
 
row_merge_buf_t*
232
 
row_merge_buf_empty(
233
 
/*================*/
234
 
        row_merge_buf_t*        buf)    /*!< in,own: sort buffer */
235
 
{
236
 
        ulint           buf_size;
237
 
        ulint           max_tuples      = buf->max_tuples;
238
 
        mem_heap_t*     heap            = buf->heap;
239
 
        dict_index_t*   index           = buf->index;
240
 
 
241
 
        buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
242
 
 
243
 
        mem_heap_empty(heap);
244
 
 
245
 
        return(row_merge_buf_create_low(heap, index, max_tuples, buf_size));
246
 
}
247
 
 
248
 
/******************************************************//**
249
 
Deallocate a sort buffer. */
250
 
static
251
 
void
252
 
row_merge_buf_free(
253
 
/*===============*/
254
 
        row_merge_buf_t*        buf)    /*!< in,own: sort buffer, to be freed */
255
 
{
256
 
        mem_heap_free(buf->heap);
257
 
}
258
 
 
259
 
/******************************************************//**
260
 
Insert a data tuple into a sort buffer.
261
 
@return TRUE if added, FALSE if out of space */
262
 
static
263
 
ibool
264
 
row_merge_buf_add(
265
 
/*==============*/
266
 
        row_merge_buf_t*        buf,    /*!< in/out: sort buffer */
267
 
        const dtuple_t*         row,    /*!< in: row in clustered index */
268
 
        const row_ext_t*        ext)    /*!< in: cache of externally stored
269
 
                                        column prefixes, or NULL */
270
 
{
271
 
        ulint                   i;
272
 
        ulint                   n_fields;
273
 
        ulint                   data_size;
274
 
        ulint                   extra_size;
275
 
        const dict_index_t*     index;
276
 
        dfield_t*               entry;
277
 
        dfield_t*               field;
278
 
        const dict_field_t*     ifield;
279
 
 
280
 
        if (buf->n_tuples >= buf->max_tuples) {
281
 
                return(FALSE);
282
 
        }
283
 
 
284
 
        UNIV_PREFETCH_R(row->fields);
285
 
 
286
 
        index = buf->index;
287
 
 
288
 
        n_fields = dict_index_get_n_fields(index);
289
 
 
290
 
        entry = static_cast<dfield_t *>(mem_heap_alloc(buf->heap, n_fields * sizeof *entry));
291
 
        buf->tuples[buf->n_tuples] = entry;
292
 
        field = entry;
293
 
 
294
 
        data_size = 0;
295
 
        extra_size = UT_BITS_IN_BYTES(index->n_nullable);
296
 
 
297
 
        ifield = dict_index_get_nth_field(index, 0);
298
 
 
299
 
        for (i = 0; i < n_fields; i++, field++, ifield++) {
300
 
                const dict_col_t*       col;
301
 
                ulint                   col_no;
302
 
                const dfield_t*         row_field;
303
 
                ulint                   len;
304
 
 
305
 
                col = ifield->col;
306
 
                col_no = dict_col_get_no(col);
307
 
                row_field = dtuple_get_nth_field(row, col_no);
308
 
                dfield_copy(field, row_field);
309
 
                len = dfield_get_len(field);
310
 
 
311
 
                if (dfield_is_null(field)) {
312
 
                        ut_ad(!(col->prtype & DATA_NOT_NULL));
313
 
                        continue;
314
 
                } else if (UNIV_LIKELY(!ext)) {
315
 
                } else if (dict_index_is_clust(index)) {
316
 
                        /* Flag externally stored fields. */
317
 
                        const byte*     row_buf = row_ext_lookup(ext, col_no,
318
 
                                                             &len);
319
 
                        if (UNIV_LIKELY_NULL(row_buf)) {
320
 
                                ut_a(row_buf != field_ref_zero);
321
 
                                if (i < dict_index_get_n_unique(index)) {
322
 
                                        dfield_set_data(field, row_buf, len);
323
 
                                } else {
324
 
                                        dfield_set_ext(field);
325
 
                                        len = dfield_get_len(field);
326
 
                                }
327
 
                        }
328
 
                } else {
329
 
                        const byte*     row_buf = row_ext_lookup(ext, col_no,
330
 
                                                             &len);
331
 
                        if (UNIV_LIKELY_NULL(row_buf)) {
332
 
                                ut_a(row_buf != field_ref_zero);
333
 
                                dfield_set_data(field, row_buf, len);
334
 
                        }
335
 
                }
336
 
 
337
 
                /* If a column prefix index, take only the prefix */
338
 
 
339
 
                if (ifield->prefix_len) {
340
 
                        len = dtype_get_at_most_n_mbchars(
341
 
                                col->prtype,
342
 
                                col->mbminmaxlen,
343
 
                                ifield->prefix_len,
344
 
                                len, static_cast<const char *>(dfield_get_data(field)));
345
 
                        dfield_set_len(field, len);
346
 
                }
347
 
 
348
 
                ut_ad(len <= col->len || col->mtype == DATA_BLOB);
349
 
 
350
 
                if (ifield->fixed_len) {
351
 
                        ut_ad(len == ifield->fixed_len);
352
 
                        ut_ad(!dfield_is_ext(field));
353
 
                } else if (dfield_is_ext(field)) {
354
 
                        extra_size += 2;
355
 
                } else if (len < 128
356
 
                           || (col->len < 256 && col->mtype != DATA_BLOB)) {
357
 
                        extra_size++;
358
 
                } else {
359
 
                        /* For variable-length columns, we look up the
360
 
                        maximum length from the column itself.  If this
361
 
                        is a prefix index column shorter than 256 bytes,
362
 
                        this will waste one byte. */
363
 
                        extra_size += 2;
364
 
                }
365
 
                data_size += len;
366
 
        }
367
 
 
368
 
#ifdef UNIV_DEBUG
369
 
        {
370
 
                ulint   size;
371
 
                ulint   extra;
372
 
 
373
 
                size = rec_get_converted_size_comp(index,
374
 
                                                   REC_STATUS_ORDINARY,
375
 
                                                   entry, n_fields, &extra);
376
 
 
377
 
                ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
378
 
                ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
379
 
        }
380
 
#endif /* UNIV_DEBUG */
381
 
 
382
 
        /* Add to the total size of the record in row_merge_block_t
383
 
        the encoded length of extra_size and the extra bytes (extra_size).
384
 
        See row_merge_buf_write() for the variable-length encoding
385
 
        of extra_size. */
386
 
        data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
387
 
 
388
 
        /* The following assertion may fail if row_merge_block_t is
389
 
        declared very small and a PRIMARY KEY is being created with
390
 
        many prefix columns.  In that case, the record may exceed the
391
 
        page_zip_rec_needs_ext() limit.  However, no further columns
392
 
        will be moved to external storage until the record is inserted
393
 
        to the clustered index B-tree. */
394
 
        ut_ad(data_size < sizeof(row_merge_block_t));
395
 
 
396
 
        /* Reserve one byte for the end marker of row_merge_block_t. */
397
 
        if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
398
 
                return(FALSE);
399
 
        }
400
 
 
401
 
        buf->total_size += data_size;
402
 
        buf->n_tuples++;
403
 
 
404
 
        field = entry;
405
 
 
406
 
        /* Copy the data fields. */
407
 
 
408
 
        do {
409
 
                dfield_dup(field++, buf->heap);
410
 
        } while (--n_fields);
411
 
 
412
 
        return(TRUE);
413
 
}
414
 
 
415
 
/** Structure for reporting duplicate records. */
416
 
struct row_merge_dup_struct {
417
 
        const dict_index_t*     index;          /*!< index being sorted */
418
 
        TABLE*          table;          /*!< MySQL table object */
419
 
        ulint                   n_dup;          /*!< number of duplicates */
420
 
};
421
 
 
422
 
/** Structure for reporting duplicate records. */
423
 
typedef struct row_merge_dup_struct row_merge_dup_t;
424
 
 
425
 
/*************************************************************//**
426
 
Report a duplicate key. */
427
 
static
428
 
void
429
 
row_merge_dup_report(
430
 
/*=================*/
431
 
        row_merge_dup_t*        dup,    /*!< in/out: for reporting duplicates */
432
 
        const dfield_t*         entry)  /*!< in: duplicate index entry */
433
 
{
434
 
        mrec_buf_t*             buf;
435
 
        const dtuple_t*         tuple;
436
 
        dtuple_t                tuple_store;
437
 
        const rec_t*            rec;
438
 
        const dict_index_t*     index   = dup->index;
439
 
        ulint                   n_fields= dict_index_get_n_fields(index);
440
 
        mem_heap_t*             heap;
441
 
        ulint*                  offsets;
442
 
        ulint                   n_ext;
443
 
 
444
 
        if (dup->n_dup++) {
445
 
                /* Only report the first duplicate record,
446
 
                but count all duplicate records. */
447
 
                return;
448
 
        }
449
 
 
450
 
        /* Convert the tuple to a record and then to MySQL format. */
451
 
        heap = mem_heap_create((1 + REC_OFFS_HEADER_SIZE + n_fields)
452
 
                               * sizeof *offsets
453
 
                               + sizeof *buf);
454
 
 
455
 
        buf = static_cast<mrec_buf_t *>(mem_heap_alloc(heap, sizeof *buf));
456
 
 
457
 
        tuple = dtuple_from_fields(&tuple_store, entry, n_fields);
458
 
        n_ext = dict_index_is_clust(index) ? dtuple_get_n_ext(tuple) : 0;
459
 
 
460
 
        rec = rec_convert_dtuple_to_rec(*buf, index, tuple, n_ext);
461
 
        offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
462
 
 
463
 
        innobase_rec_to_mysql(dup->table, rec, index, offsets);
464
 
 
465
 
        mem_heap_free(heap);
466
 
}
467
 
 
468
 
/*************************************************************//**
469
 
Compare two tuples.
470
 
@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
471
 
static
472
 
int
473
 
row_merge_tuple_cmp(
474
 
/*================*/
475
 
        ulint                   n_field,/*!< in: number of fields */
476
 
        const dfield_t*         a,      /*!< in: first tuple to be compared */
477
 
        const dfield_t*         b,      /*!< in: second tuple to be compared */
478
 
        row_merge_dup_t*        dup)    /*!< in/out: for reporting duplicates */
479
 
{
480
 
        int             cmp;
481
 
        const dfield_t* field   = a;
482
 
 
483
 
        /* Compare the fields of the tuples until a difference is
484
 
        found or we run out of fields to compare.  If !cmp at the
485
 
        end, the tuples are equal. */
486
 
        do {
487
 
                cmp = cmp_dfield_dfield(a++, b++);
488
 
        } while (!cmp && --n_field);
489
 
 
490
 
        if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
491
 
                /* Report a duplicate value error if the tuples are
492
 
                logically equal.  NULL columns are logically inequal,
493
 
                although they are equal in the sorting order.  Find
494
 
                out if any of the fields are NULL. */
495
 
                for (b = field; b != a; b++) {
496
 
                        if (dfield_is_null(b)) {
497
 
 
498
 
                                goto func_exit;
499
 
                        }
500
 
                }
501
 
 
502
 
                row_merge_dup_report(dup, field);
503
 
        }
504
 
 
505
 
func_exit:
506
 
        return(cmp);
507
 
}
508
 
 
509
 
/** Wrapper for row_merge_tuple_sort() to inject some more context to
510
 
UT_SORT_FUNCTION_BODY().
511
 
@param a        array of tuples that being sorted
512
 
@param b        aux (work area), same size as tuples[]
513
 
@param c        lower bound of the sorting area, inclusive
514
 
@param d        upper bound of the sorting area, inclusive */
515
 
#define row_merge_tuple_sort_ctx(a,b,c,d) \
516
 
        row_merge_tuple_sort(n_field, dup, a, b, c, d)
517
 
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
518
 
UT_SORT_FUNCTION_BODY().
519
 
@param a        first tuple to be compared
520
 
@param b        second tuple to be compared
521
 
@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
522
 
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)
523
 
 
524
 
/**********************************************************************//**
525
 
Merge sort the tuple buffer in main memory. */
526
 
static
527
 
void
528
 
row_merge_tuple_sort(
529
 
/*=================*/
530
 
        ulint                   n_field,/*!< in: number of fields */
531
 
        row_merge_dup_t*        dup,    /*!< in/out: for reporting duplicates */
532
 
        const dfield_t**        tuples, /*!< in/out: tuples */
533
 
        const dfield_t**        aux,    /*!< in/out: work area */
534
 
        ulint                   low,    /*!< in: lower bound of the
535
 
                                        sorting area, inclusive */
536
 
        ulint                   high)   /*!< in: upper bound of the
537
 
                                        sorting area, exclusive */
538
 
{
539
 
        UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
540
 
                              tuples, aux, low, high, row_merge_tuple_cmp_ctx);
541
 
}
542
 
 
543
 
/******************************************************//**
544
 
Sort a buffer. */
545
 
static
546
 
void
547
 
row_merge_buf_sort(
548
 
/*===============*/
549
 
        row_merge_buf_t*        buf,    /*!< in/out: sort buffer */
550
 
        row_merge_dup_t*        dup)    /*!< in/out: for reporting duplicates */
551
 
{
552
 
        row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
553
 
                             buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
554
 
}
555
 
 
556
 
/******************************************************//**
557
 
Write a buffer to a block. */
558
 
static
559
 
void
560
 
row_merge_buf_write(
561
 
/*================*/
562
 
        const row_merge_buf_t*  buf,    /*!< in: sorted buffer */
563
 
#ifdef UNIV_DEBUG
564
 
        const merge_file_t*     of,     /*!< in: output file */
565
 
#endif /* UNIV_DEBUG */
566
 
        row_merge_block_t*      block)  /*!< out: buffer for writing to file */
567
 
#ifndef UNIV_DEBUG
568
 
# define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
569
 
#endif /* !UNIV_DEBUG */
570
 
{
571
 
        const dict_index_t*     index   = buf->index;
572
 
        ulint                   n_fields= dict_index_get_n_fields(index);
573
 
        byte*                   b       = &(*block)[0];
574
 
 
575
 
        ulint           i;
576
 
 
577
 
        for (i = 0; i < buf->n_tuples; i++) {
578
 
                ulint           size;
579
 
                ulint           extra_size;
580
 
                const dfield_t* entry           = buf->tuples[i];
581
 
 
582
 
                size = rec_get_converted_size_comp(index,
583
 
                                                   REC_STATUS_ORDINARY,
584
 
                                                   entry, n_fields,
585
 
                                                   &extra_size);
586
 
                ut_ad(size > extra_size);
587
 
                ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
588
 
                extra_size -= REC_N_NEW_EXTRA_BYTES;
589
 
                size -= REC_N_NEW_EXTRA_BYTES;
590
 
 
591
 
                /* Encode extra_size + 1 */
592
 
                if (extra_size + 1 < 0x80) {
593
 
                        *b++ = (byte) (extra_size + 1);
594
 
                } else {
595
 
                        ut_ad((extra_size + 1) < 0x8000);
596
 
                        *b++ = (byte) (0x80 | ((extra_size + 1) >> 8));
597
 
                        *b++ = (byte) (extra_size + 1);
598
 
                }
599
 
 
600
 
                ut_ad(b + size < block[1]);
601
 
 
602
 
                rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
603
 
                                               REC_STATUS_ORDINARY,
604
 
                                               entry, n_fields);
605
 
 
606
 
                b += size;
607
 
 
608
 
#ifdef UNIV_DEBUG
609
 
                if (row_merge_print_write) {
610
 
                        fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
611
 
                                (void*) b, of->fd, (ulong) of->offset,
612
 
                                (ulong) i);
613
 
                        row_merge_tuple_print(stderr, entry, n_fields);
614
 
                }
615
 
#endif /* UNIV_DEBUG */
616
 
        }
617
 
 
618
 
        /* Write an "end-of-chunk" marker. */
619
 
        ut_a(b < block[1]);
620
 
        ut_a(b == block[0] + buf->total_size);
621
 
        *b++ = 0;
622
 
#ifdef UNIV_DEBUG_VALGRIND
623
 
        /* The rest of the block is uninitialized.  Initialize it
624
 
        to avoid bogus warnings. */
625
 
        memset(b, 0xff, block[1] - b);
626
 
#endif /* UNIV_DEBUG_VALGRIND */
627
 
#ifdef UNIV_DEBUG
628
 
        if (row_merge_print_write) {
629
 
                fprintf(stderr, "row_merge_buf_write %p,%d,%lu EOF\n",
630
 
                        (void*) b, of->fd, (ulong) of->offset);
631
 
        }
632
 
#endif /* UNIV_DEBUG */
633
 
}
634
 
 
635
 
/******************************************************//**
636
 
Create a memory heap and allocate space for row_merge_rec_offsets()
637
 
and mrec_buf_t[3].
638
 
@return memory heap */
639
 
static
640
 
mem_heap_t*
641
 
row_merge_heap_create(
642
 
/*==================*/
643
 
        const dict_index_t*     index,          /*!< in: record descriptor */
644
 
        mrec_buf_t**            buf,            /*!< out: 3 buffers */
645
 
        ulint**                 offsets1,       /*!< out: offsets */
646
 
        ulint**                 offsets2)       /*!< out: offsets */
647
 
{
648
 
        ulint           i       = 1 + REC_OFFS_HEADER_SIZE
649
 
                + dict_index_get_n_fields(index);
650
 
        mem_heap_t*     heap    = mem_heap_create(2 * i * sizeof **offsets1
651
 
                                                  + 3 * sizeof **buf);
652
 
 
653
 
        *buf = static_cast<mrec_buf_t*>(mem_heap_alloc(heap, 3 * sizeof **buf));
654
 
        *offsets1 = static_cast<ulint*>(mem_heap_alloc(heap, i * sizeof **offsets1));
655
 
        *offsets2 = static_cast<ulint*>(mem_heap_alloc(heap, i * sizeof **offsets2));
656
 
 
657
 
        (*offsets1)[0] = (*offsets2)[0] = i;
658
 
        (*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
659
 
 
660
 
        return(heap);
661
 
}
662
 
 
663
 
/**********************************************************************//**
664
 
Search an index object by name and column names.  If several indexes match,
665
 
return the index with the max id.
666
 
@return matching index, NULL if not found */
667
 
static
668
 
dict_index_t*
669
 
row_merge_dict_table_get_index(
670
 
/*===========================*/
671
 
        dict_table_t*           table,          /*!< in: table */
672
 
        const merge_index_def_t*index_def)      /*!< in: index definition */
673
 
{
674
 
        ulint           i;
675
 
        dict_index_t*   index;
676
 
        const char**    column_names;
677
 
 
678
 
        column_names = static_cast<const char **>(mem_alloc(index_def->n_fields * sizeof *column_names));
679
 
 
680
 
        for (i = 0; i < index_def->n_fields; ++i) {
681
 
                column_names[i] = index_def->fields[i].field_name;
682
 
        }
683
 
 
684
 
        index = dict_table_get_index_by_max_id(
685
 
                table, index_def->name, column_names, index_def->n_fields);
686
 
 
687
 
        mem_free((void*) column_names);
688
 
 
689
 
        return(index);
690
 
}
691
 
 
692
 
/********************************************************************//**
693
 
Read a merge block from the file system.
694
 
@return TRUE if request was successful, FALSE if fail */
695
 
static
696
 
ibool
697
 
row_merge_read(
698
 
/*===========*/
699
 
        int                     fd,     /*!< in: file descriptor */
700
 
        ulint                   offset, /*!< in: offset where to read
701
 
                                        in number of row_merge_block_t
702
 
                                        elements */
703
 
        row_merge_block_t*      buf)    /*!< out: data */
704
 
{
705
 
        ib_uint64_t     ofs = ((ib_uint64_t) offset) * sizeof *buf;
706
 
        ibool           success;
707
 
 
708
 
#ifdef UNIV_DEBUG
709
 
        if (row_merge_print_block_read) {
710
 
                fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
711
 
                        fd, (ulong) offset);
712
 
        }
713
 
#endif /* UNIV_DEBUG */
714
 
 
715
 
        success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
716
 
                                                 (ulint) (ofs & 0xFFFFFFFF),
717
 
                                                 (ulint) (ofs >> 32),
718
 
                                                 sizeof *buf);
719
 
#ifdef POSIX_FADV_DONTNEED
720
 
        /* Each block is read exactly once.  Free up the file cache. */
721
 
        posix_fadvise(fd, ofs, sizeof *buf, POSIX_FADV_DONTNEED);
722
 
#endif /* POSIX_FADV_DONTNEED */
723
 
 
724
 
        if (UNIV_UNLIKELY(!success)) {
725
 
                ut_print_timestamp(stderr);
726
 
                fprintf(stderr,
727
 
                        "  InnoDB: failed to read merge block at %"PRIu64"\n", ofs);
728
 
        }
729
 
 
730
 
        return(UNIV_LIKELY(success));
731
 
}
732
 
 
733
 
/********************************************************************//**
734
 
Write a merge block to the file system.
735
 
@return TRUE if request was successful, FALSE if fail */
736
 
static
737
 
ibool
738
 
row_merge_write(
739
 
/*============*/
740
 
        int             fd,     /*!< in: file descriptor */
741
 
        ulint           offset, /*!< in: offset where to write,
742
 
                                in number of row_merge_block_t elements */
743
 
        const void*     buf)    /*!< in: data */
744
 
{
745
 
        size_t          buf_len = sizeof(row_merge_block_t);
746
 
        ib_uint64_t     ofs = buf_len * (ib_uint64_t) offset;
747
 
        ibool           ret;
748
 
 
749
 
        ret = os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
750
 
                            (ulint) (ofs & 0xFFFFFFFF),
751
 
                            (ulint) (ofs >> 32),
752
 
                            buf_len);
753
 
 
754
 
#ifdef UNIV_DEBUG
755
 
        if (row_merge_print_block_write) {
756
 
                fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
757
 
                        fd, (ulong) offset);
758
 
        }
759
 
#endif /* UNIV_DEBUG */
760
 
 
761
 
#ifdef POSIX_FADV_DONTNEED
762
 
        /* The block will be needed on the next merge pass,
763
 
        but it can be evicted from the file cache meanwhile. */
764
 
        posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
765
 
#endif /* POSIX_FADV_DONTNEED */
766
 
 
767
 
        return(UNIV_LIKELY(ret));
768
 
}
769
 
 
770
 
/********************************************************************//**
771
 
Read a merge record.
772
 
@return pointer to next record, or NULL on I/O error or end of list */
773
 
static __attribute__((nonnull))
774
 
const byte*
775
 
row_merge_read_rec(
776
 
/*===============*/
777
 
        row_merge_block_t*      block,  /*!< in/out: file buffer */
778
 
        mrec_buf_t*             buf,    /*!< in/out: secondary buffer */
779
 
        const byte*             b,      /*!< in: pointer to record */
780
 
        const dict_index_t*     index,  /*!< in: index of the record */
781
 
        int                     fd,     /*!< in: file descriptor */
782
 
        ulint*                  foffs,  /*!< in/out: file offset */
783
 
        const mrec_t**          mrec,   /*!< out: pointer to merge record,
784
 
                                        or NULL on end of list
785
 
                                        (non-NULL on I/O error) */
786
 
        ulint*                  offsets)/*!< out: offsets of mrec */
787
 
{
788
 
        ulint   extra_size;
789
 
        ulint   data_size;
790
 
        ulint   avail_size;
791
 
 
792
 
        ut_ad(block);
793
 
        ut_ad(buf);
794
 
        ut_ad(b >= block[0]);
795
 
        ut_ad(b < block[1]);
796
 
        ut_ad(index);
797
 
        ut_ad(foffs);
798
 
        ut_ad(mrec);
799
 
        ut_ad(offsets);
800
 
 
801
 
        ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
802
 
              + dict_index_get_n_fields(index));
803
 
 
804
 
        extra_size = *b++;
805
 
 
806
 
        if (UNIV_UNLIKELY(!extra_size)) {
807
 
                /* End of list */
808
 
                *mrec = NULL;
809
 
#ifdef UNIV_DEBUG
810
 
                if (row_merge_print_read) {
811
 
                        fprintf(stderr, "row_merge_read %p,%p,%d,%lu EOF\n",
812
 
                                (const void*) b, (const void*) block,
813
 
                                fd, (ulong) *foffs);
814
 
                }
815
 
#endif /* UNIV_DEBUG */
816
 
                return(NULL);
817
 
        }
818
 
 
819
 
        if (extra_size >= 0x80) {
820
 
                /* Read another byte of extra_size. */
821
 
 
822
 
                if (UNIV_UNLIKELY(b >= block[1])) {
823
 
                        if (!row_merge_read(fd, ++(*foffs), block)) {
824
 
err_exit:
825
 
                                /* Signal I/O error. */
826
 
                                *mrec = b;
827
 
                                return(NULL);
828
 
                        }
829
 
 
830
 
                        /* Wrap around to the beginning of the buffer. */
831
 
                        b = block[0];
832
 
                }
833
 
 
834
 
                extra_size = (extra_size & 0x7f) << 8;
835
 
                extra_size |= *b++;
836
 
        }
837
 
 
838
 
        /* Normalize extra_size.  Above, value 0 signals "end of list". */
839
 
        extra_size--;
840
 
 
841
 
        /* Read the extra bytes. */
842
 
 
843
 
        if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
844
 
                /* The record spans two blocks.  Copy the entire record
845
 
                to the auxiliary buffer and handle this as a special
846
 
                case. */
847
 
 
848
 
                avail_size = block[1] - b;
849
 
 
850
 
                memcpy(*buf, b, avail_size);
851
 
 
852
 
                if (!row_merge_read(fd, ++(*foffs), block)) {
853
 
 
854
 
                        goto err_exit;
855
 
                }
856
 
 
857
 
                /* Wrap around to the beginning of the buffer. */
858
 
                b = block[0];
859
 
 
860
 
                /* Copy the record. */
861
 
                memcpy(*buf + avail_size, b, extra_size - avail_size);
862
 
                b += extra_size - avail_size;
863
 
 
864
 
                *mrec = *buf + extra_size;
865
 
 
866
 
                rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
867
 
 
868
 
                data_size = rec_offs_data_size(offsets);
869
 
 
870
 
                /* These overflows should be impossible given that
871
 
                records are much smaller than either buffer, and
872
 
                the record starts near the beginning of each buffer. */
873
 
                ut_a(extra_size + data_size < sizeof *buf);
874
 
                ut_a(b + data_size < block[1]);
875
 
 
876
 
                /* Copy the data bytes. */
877
 
                memcpy(*buf + extra_size, b, data_size);
878
 
                b += data_size;
879
 
 
880
 
                goto func_exit;
881
 
        }
882
 
 
883
 
        *mrec = b + extra_size;
884
 
 
885
 
        rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
886
 
 
887
 
        data_size = rec_offs_data_size(offsets);
888
 
        ut_ad(extra_size + data_size < sizeof *buf);
889
 
 
890
 
        b += extra_size + data_size;
891
 
 
892
 
        if (UNIV_LIKELY(b < block[1])) {
893
 
                /* The record fits entirely in the block.
894
 
                This is the normal case. */
895
 
                goto func_exit;
896
 
        }
897
 
 
898
 
        /* The record spans two blocks.  Copy it to buf. */
899
 
 
900
 
        b -= extra_size + data_size;
901
 
        avail_size = block[1] - b;
902
 
        memcpy(*buf, b, avail_size);
903
 
        *mrec = *buf + extra_size;
904
 
#ifdef UNIV_DEBUG
905
 
        /* We cannot invoke rec_offs_make_valid() here, because there
906
 
        are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
907
 
        Similarly, rec_offs_validate() would fail, because it invokes
908
 
        rec_get_status(). */
909
 
        offsets[2] = (ulint) *mrec;
910
 
        offsets[3] = (ulint) index;
911
 
#endif /* UNIV_DEBUG */
912
 
 
913
 
        if (!row_merge_read(fd, ++(*foffs), block)) {
914
 
 
915
 
                goto err_exit;
916
 
        }
917
 
 
918
 
        /* Wrap around to the beginning of the buffer. */
919
 
        b = block[0];
920
 
 
921
 
        /* Copy the rest of the record. */
922
 
        memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
923
 
        b += extra_size + data_size - avail_size;
924
 
 
925
 
func_exit:
926
 
#ifdef UNIV_DEBUG
927
 
        if (row_merge_print_read) {
928
 
                fprintf(stderr, "row_merge_read %p,%p,%d,%lu ",
929
 
                        (const void*) b, (const void*) block,
930
 
                        fd, (ulong) *foffs);
931
 
                rec_print_comp(stderr, *mrec, offsets);
932
 
                putc('\n', stderr);
933
 
        }
934
 
#endif /* UNIV_DEBUG */
935
 
 
936
 
        return(b);
937
 
}
938
 
 
939
 
/********************************************************************//**
940
 
Write a merge record. */
941
 
static
942
 
void
943
 
row_merge_write_rec_low(
944
 
/*====================*/
945
 
        byte*           b,      /*!< out: buffer */
946
 
        ulint           e,      /*!< in: encoded extra_size */
947
 
#ifdef UNIV_DEBUG
948
 
        ulint           size,   /*!< in: total size to write */
949
 
        int             fd,     /*!< in: file descriptor */
950
 
        ulint           foffs,  /*!< in: file offset */
951
 
#endif /* UNIV_DEBUG */
952
 
        const mrec_t*   mrec,   /*!< in: record to write */
953
 
        const ulint*    offsets)/*!< in: offsets of mrec */
954
 
#ifndef UNIV_DEBUG
955
 
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)  \
956
 
        row_merge_write_rec_low(b, e, mrec, offsets)
957
 
#endif /* !UNIV_DEBUG */
958
 
{
959
 
#ifdef UNIV_DEBUG
960
 
        const byte* const end = b + size;
961
 
        ut_ad(e == rec_offs_extra_size(offsets) + 1);
962
 
 
963
 
        if (row_merge_print_write) {
964
 
                fprintf(stderr, "row_merge_write %p,%d,%lu ",
965
 
                        (void*) b, fd, (ulong) foffs);
966
 
                rec_print_comp(stderr, mrec, offsets);
967
 
                putc('\n', stderr);
968
 
        }
969
 
#endif /* UNIV_DEBUG */
970
 
 
971
 
        if (e < 0x80) {
972
 
                *b++ = (byte) e;
973
 
        } else {
974
 
                *b++ = (byte) (0x80 | (e >> 8));
975
 
                *b++ = (byte) e;
976
 
        }
977
 
 
978
 
        memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
979
 
        ut_ad(b + rec_offs_size(offsets) == end);
980
 
}
981
 
 
982
 
/********************************************************************//**
983
 
Write a merge record.
984
 
@return pointer to end of block, or NULL on error */
985
 
static
986
 
byte*
987
 
row_merge_write_rec(
988
 
/*================*/
989
 
        row_merge_block_t*      block,  /*!< in/out: file buffer */
990
 
        mrec_buf_t*             buf,    /*!< in/out: secondary buffer */
991
 
        byte*                   b,      /*!< in: pointer to end of block */
992
 
        int                     fd,     /*!< in: file descriptor */
993
 
        ulint*                  foffs,  /*!< in/out: file offset */
994
 
        const mrec_t*           mrec,   /*!< in: record to write */
995
 
        const ulint*            offsets)/*!< in: offsets of mrec */
996
 
{
997
 
        ulint   extra_size;
998
 
        ulint   size;
999
 
        ulint   avail_size;
1000
 
 
1001
 
        ut_ad(block);
1002
 
        ut_ad(buf);
1003
 
        ut_ad(b >= block[0]);
1004
 
        ut_ad(b < block[1]);
1005
 
        ut_ad(mrec);
1006
 
        ut_ad(foffs);
1007
 
        ut_ad(mrec < block[0] || mrec > block[1]);
1008
 
        ut_ad(mrec < buf[0] || mrec > buf[1]);
1009
 
 
1010
 
        /* Normalize extra_size.  Value 0 signals "end of list". */
1011
 
        extra_size = rec_offs_extra_size(offsets) + 1;
1012
 
 
1013
 
        size = extra_size + (extra_size >= 0x80)
1014
 
                + rec_offs_data_size(offsets);
1015
 
 
1016
 
        if (UNIV_UNLIKELY(b + size >= block[1])) {
1017
 
                /* The record spans two blocks.
1018
 
                Copy it to the temporary buffer first. */
1019
 
                avail_size = block[1] - b;
1020
 
 
1021
 
                row_merge_write_rec_low(buf[0],
1022
 
                                        extra_size, size, fd, *foffs,
1023
 
                                        mrec, offsets);
1024
 
 
1025
 
                /* Copy the head of the temporary buffer, write
1026
 
                the completed block, and copy the tail of the
1027
 
                record to the head of the new block. */
1028
 
                memcpy(b, buf[0], avail_size);
1029
 
 
1030
 
                if (!row_merge_write(fd, (*foffs)++, block)) {
1031
 
                        return(NULL);
1032
 
                }
1033
 
 
1034
 
                UNIV_MEM_INVALID(block[0], sizeof block[0]);
1035
 
 
1036
 
                /* Copy the rest. */
1037
 
                b = block[0];
1038
 
                memcpy(b, buf[0] + avail_size, size - avail_size);
1039
 
                b += size - avail_size;
1040
 
        } else {
1041
 
                row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
1042
 
                                        mrec, offsets);
1043
 
                b += size;
1044
 
        }
1045
 
 
1046
 
        return(b);
1047
 
}
1048
 
 
1049
 
/********************************************************************//**
1050
 
Write an end-of-list marker.
1051
 
@return pointer to end of block, or NULL on error */
1052
 
static
1053
 
byte*
1054
 
row_merge_write_eof(
1055
 
/*================*/
1056
 
        row_merge_block_t*      block,  /*!< in/out: file buffer */
1057
 
        byte*                   b,      /*!< in: pointer to end of block */
1058
 
        int                     fd,     /*!< in: file descriptor */
1059
 
        ulint*                  foffs)  /*!< in/out: file offset */
1060
 
{
1061
 
        ut_ad(block);
1062
 
        ut_ad(b >= block[0]);
1063
 
        ut_ad(b < block[1]);
1064
 
        ut_ad(foffs);
1065
 
#ifdef UNIV_DEBUG
1066
 
        if (row_merge_print_write) {
1067
 
                fprintf(stderr, "row_merge_write %p,%p,%d,%lu EOF\n",
1068
 
                        (void*) b, (void*) block, fd, (ulong) *foffs);
1069
 
        }
1070
 
#endif /* UNIV_DEBUG */
1071
 
 
1072
 
        *b++ = 0;
1073
 
        UNIV_MEM_ASSERT_RW(block[0], b - block[0]);
1074
 
        UNIV_MEM_ASSERT_W(block[0], sizeof block[0]);
1075
 
#ifdef UNIV_DEBUG_VALGRIND
1076
 
        /* The rest of the block is uninitialized.  Initialize it
1077
 
        to avoid bogus warnings. */
1078
 
        memset(b, 0xff, block[1] - b);
1079
 
#endif /* UNIV_DEBUG_VALGRIND */
1080
 
 
1081
 
        if (!row_merge_write(fd, (*foffs)++, block)) {
1082
 
                return(NULL);
1083
 
        }
1084
 
 
1085
 
        UNIV_MEM_INVALID(block[0], sizeof block[0]);
1086
 
        return(block[0]);
1087
 
}
1088
 
 
1089
 
/*************************************************************//**
1090
 
Compare two merge records.
1091
 
@return 1, 0, -1 if mrec1 is greater, equal, less, respectively, than mrec2 */
1092
 
static
1093
 
int
1094
 
row_merge_cmp(
1095
 
/*==========*/
1096
 
        const mrec_t*           mrec1,          /*!< in: first merge
1097
 
                                                record to be compared */
1098
 
        const mrec_t*           mrec2,          /*!< in: second merge
1099
 
                                                record to be compared */
1100
 
        const ulint*            offsets1,       /*!< in: first record offsets */
1101
 
        const ulint*            offsets2,       /*!< in: second record offsets */
1102
 
        const dict_index_t*     index,          /*!< in: index */
1103
 
        ibool*                  null_eq)        /*!< out: set to TRUE if
1104
 
                                                found matching null values */
1105
 
{
1106
 
        int     cmp;
1107
 
 
1108
 
        cmp = cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index,
1109
 
                                 null_eq);
1110
 
 
1111
 
#ifdef UNIV_DEBUG
1112
 
        if (row_merge_print_cmp) {
1113
 
                fputs("row_merge_cmp1 ", stderr);
1114
 
                rec_print_comp(stderr, mrec1, offsets1);
1115
 
                fputs("\nrow_merge_cmp2 ", stderr);
1116
 
                rec_print_comp(stderr, mrec2, offsets2);
1117
 
                fprintf(stderr, "\nrow_merge_cmp=%d\n", cmp);
1118
 
        }
1119
 
#endif /* UNIV_DEBUG */
1120
 
 
1121
 
        return(cmp);
1122
 
}
1123
 
 
1124
 
/********************************************************************//**
1125
 
Reads clustered index of the table and create temporary files
1126
 
containing the index entries for the indexes to be built.
1127
 
@return DB_SUCCESS or error */
1128
 
static __attribute__((nonnull))
1129
 
ulint
1130
 
row_merge_read_clustered_index(
1131
 
/*===========================*/
1132
 
        trx_t*                  trx,    /*!< in: transaction */
1133
 
        TABLE*          table,  /*!< in/out: MySQL table object,
1134
 
                                        for reporting erroneous records */
1135
 
        const dict_table_t*     old_table,/*!< in: table where rows are
1136
 
                                        read from */
1137
 
        const dict_table_t*     new_table,/*!< in: table where indexes are
1138
 
                                        created; identical to old_table
1139
 
                                        unless creating a PRIMARY KEY */
1140
 
        dict_index_t**          index,  /*!< in: indexes to be created */
1141
 
        merge_file_t*           files,  /*!< in: temporary files */
1142
 
        ulint                   n_index,/*!< in: number of indexes to create */
1143
 
        row_merge_block_t*      block)  /*!< in/out: file buffer */
1144
 
{
1145
 
        dict_index_t*           clust_index;    /* Clustered index */
1146
 
        mem_heap_t*             row_heap;       /* Heap memory to create
1147
 
                                                clustered index records */
1148
 
        row_merge_buf_t**       merge_buf;      /* Temporary list for records*/
1149
 
        btr_pcur_t              pcur;           /* Persistent cursor on the
1150
 
                                                clustered index */
1151
 
        mtr_t                   mtr;            /* Mini transaction */
1152
 
        ulint                   err = DB_SUCCESS;/* Return code */
1153
 
        ulint                   i;
1154
 
        ulint                   n_nonnull = 0;  /* number of columns
1155
 
                                                changed to NOT NULL */
1156
 
        ulint*                  nonnull = NULL; /* NOT NULL columns */
1157
 
 
1158
 
        trx->op_info = "reading clustered index";
1159
 
 
1160
 
        ut_ad(trx);
1161
 
        ut_ad(old_table);
1162
 
        ut_ad(new_table);
1163
 
        ut_ad(index);
1164
 
        ut_ad(files);
1165
 
 
1166
 
        /* Create and initialize memory for record buffers */
1167
 
 
1168
 
        merge_buf = static_cast<row_merge_buf_t **>(mem_alloc(n_index * sizeof *merge_buf));
1169
 
 
1170
 
        for (i = 0; i < n_index; i++) {
1171
 
                merge_buf[i] = row_merge_buf_create(index[i]);
1172
 
        }
1173
 
 
1174
 
        mtr_start(&mtr);
1175
 
 
1176
 
        /* Find the clustered index and create a persistent cursor
1177
 
        based on that. */
1178
 
 
1179
 
        clust_index = dict_table_get_first_index(old_table);
1180
 
 
1181
 
        btr_pcur_open_at_index_side(
1182
 
                TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
1183
 
 
1184
 
        if (UNIV_UNLIKELY(old_table != new_table)) {
1185
 
                ulint   n_cols = dict_table_get_n_cols(old_table);
1186
 
 
1187
 
                /* A primary key will be created.  Identify the
1188
 
                columns that were flagged NOT NULL in the new table,
1189
 
                so that we can quickly check that the records in the
1190
 
                (old) clustered index do not violate the added NOT
1191
 
                NULL constraints. */
1192
 
 
1193
 
                ut_a(n_cols == dict_table_get_n_cols(new_table));
1194
 
 
1195
 
                nonnull = static_cast<ulint*>(mem_alloc(n_cols * sizeof *nonnull));
1196
 
 
1197
 
                for (i = 0; i < n_cols; i++) {
1198
 
                        if (dict_table_get_nth_col(old_table, i)->prtype
1199
 
                            & DATA_NOT_NULL) {
1200
 
 
1201
 
                                continue;
1202
 
                        }
1203
 
 
1204
 
                        if (dict_table_get_nth_col(new_table, i)->prtype
1205
 
                            & DATA_NOT_NULL) {
1206
 
 
1207
 
                                nonnull[n_nonnull++] = i;
1208
 
                        }
1209
 
                }
1210
 
 
1211
 
                if (!n_nonnull) {
1212
 
                        mem_free(nonnull);
1213
 
                        nonnull = NULL;
1214
 
                }
1215
 
        }
1216
 
 
1217
 
        row_heap = mem_heap_create(sizeof(mrec_buf_t));
1218
 
 
1219
 
        /* Scan the clustered index. */
1220
 
        for (;;) {
1221
 
                const rec_t*    rec;
1222
 
                ulint*          offsets;
1223
 
                dtuple_t*       row             = NULL;
1224
 
                row_ext_t*      ext;
1225
 
                ibool           has_next        = TRUE;
1226
 
 
1227
 
                btr_pcur_move_to_next_on_page(&pcur);
1228
 
 
1229
 
                /* When switching pages, commit the mini-transaction
1230
 
                in order to release the latch on the old page. */
1231
 
 
1232
 
                if (btr_pcur_is_after_last_on_page(&pcur)) {
1233
 
                        if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1234
 
                                err = DB_INTERRUPTED;
1235
 
                                trx->error_key_num = 0;
1236
 
                                goto func_exit;
1237
 
                        }
1238
 
 
1239
 
                        btr_pcur_store_position(&pcur, &mtr);
1240
 
                        mtr_commit(&mtr);
1241
 
                        mtr_start(&mtr);
1242
 
                        btr_pcur_restore_position(BTR_SEARCH_LEAF,
1243
 
                                                  &pcur, &mtr);
1244
 
                        has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1245
 
                }
1246
 
 
1247
 
                if (UNIV_LIKELY(has_next)) {
1248
 
                        rec = btr_pcur_get_rec(&pcur);
1249
 
                        offsets = rec_get_offsets(rec, clust_index, NULL,
1250
 
                                                  ULINT_UNDEFINED, &row_heap);
1251
 
 
1252
 
                        /* Skip delete marked records. */
1253
 
                        if (rec_get_deleted_flag(
1254
 
                                    rec, dict_table_is_comp(old_table))) {
1255
 
                                continue;
1256
 
                        }
1257
 
 
1258
 
                        srv_n_rows_inserted++;
1259
 
 
1260
 
                        /* Build a row based on the clustered index. */
1261
 
 
1262
 
                        row = row_build(ROW_COPY_POINTERS, clust_index,
1263
 
                                        rec, offsets,
1264
 
                                        new_table, &ext, row_heap);
1265
 
 
1266
 
                        if (UNIV_LIKELY_NULL(nonnull)) {
1267
 
                                for (i = 0; i < n_nonnull; i++) {
1268
 
                                        dfield_t*       field
1269
 
                                                = &row->fields[nonnull[i]];
1270
 
                                        dtype_t*        field_type
1271
 
                                                = dfield_get_type(field);
1272
 
 
1273
 
                                        ut_a(!(field_type->prtype
1274
 
                                               & DATA_NOT_NULL));
1275
 
 
1276
 
                                        if (dfield_is_null(field)) {
1277
 
                                                err = DB_PRIMARY_KEY_IS_NULL;
1278
 
                                                trx->error_key_num = 0;
1279
 
                                                goto func_exit;
1280
 
                                        }
1281
 
 
1282
 
                                        field_type->prtype |= DATA_NOT_NULL;
1283
 
                                }
1284
 
                        }
1285
 
                }
1286
 
 
1287
 
                /* Build all entries for all the indexes to be created
1288
 
                in a single scan of the clustered index. */
1289
 
 
1290
 
                for (i = 0; i < n_index; i++) {
1291
 
                        row_merge_buf_t*        buf     = merge_buf[i];
1292
 
                        merge_file_t*           file    = &files[i];
1293
 
                        const dict_index_t*     buf_index       = buf->index;
1294
 
 
1295
 
                        if (UNIV_LIKELY
1296
 
                            (row && row_merge_buf_add(buf, row, ext))) {
1297
 
                                file->n_rec++;
1298
 
                                continue;
1299
 
                        }
1300
 
 
1301
 
                        /* The buffer must be sufficiently large
1302
 
                        to hold at least one record. */
1303
 
                        ut_ad(buf->n_tuples || !has_next);
1304
 
 
1305
 
                        /* We have enough data tuples to form a block.
1306
 
                        Sort them and write to disk. */
1307
 
 
1308
 
                        if (buf->n_tuples) {
1309
 
                                if (dict_index_is_unique(buf_index)) {
1310
 
                                        row_merge_dup_t dup;
1311
 
                                        dup.index = buf->index;
1312
 
                                        dup.table = table;
1313
 
                                        dup.n_dup = 0;
1314
 
 
1315
 
                                        row_merge_buf_sort(buf, &dup);
1316
 
 
1317
 
                                        if (dup.n_dup) {
1318
 
                                                err = DB_DUPLICATE_KEY;
1319
 
                                                trx->error_key_num = i;
1320
 
                                                goto func_exit;
1321
 
                                        }
1322
 
                                } else {
1323
 
                                        row_merge_buf_sort(buf, NULL);
1324
 
                                }
1325
 
                        }
1326
 
 
1327
 
                        row_merge_buf_write(buf, file, block);
1328
 
 
1329
 
                        if (!row_merge_write(file->fd, file->offset++,
1330
 
                                             block)) {
1331
 
                                err = DB_OUT_OF_FILE_SPACE;
1332
 
                                trx->error_key_num = i;
1333
 
                                goto func_exit;
1334
 
                        }
1335
 
 
1336
 
                        UNIV_MEM_INVALID(block[0], sizeof block[0]);
1337
 
                        merge_buf[i] = row_merge_buf_empty(buf);
1338
 
 
1339
 
                        if (UNIV_LIKELY(row != NULL)) {
1340
 
                                /* Try writing the record again, now
1341
 
                                that the buffer has been written out
1342
 
                                and emptied. */
1343
 
 
1344
 
                                if (UNIV_UNLIKELY
1345
 
                                    (!row_merge_buf_add(buf, row, ext))) {
1346
 
                                        /* An empty buffer should have enough
1347
 
                                        room for at least one record. */
1348
 
                                        ut_error;
1349
 
                                }
1350
 
 
1351
 
                                file->n_rec++;
1352
 
                        }
1353
 
                }
1354
 
 
1355
 
                mem_heap_empty(row_heap);
1356
 
 
1357
 
                if (UNIV_UNLIKELY(!has_next)) {
1358
 
                        goto func_exit;
1359
 
                }
1360
 
        }
1361
 
 
1362
 
func_exit:
1363
 
        btr_pcur_close(&pcur);
1364
 
        mtr_commit(&mtr);
1365
 
        mem_heap_free(row_heap);
1366
 
 
1367
 
        if (UNIV_LIKELY_NULL(nonnull)) {
1368
 
                mem_free(nonnull);
1369
 
        }
1370
 
 
1371
 
        for (i = 0; i < n_index; i++) {
1372
 
                row_merge_buf_free(merge_buf[i]);
1373
 
        }
1374
 
 
1375
 
        mem_free(merge_buf);
1376
 
 
1377
 
        trx->op_info = "";
1378
 
 
1379
 
        return(err);
1380
 
}
1381
 
 
1382
 
/** Write a record via buffer 2 and read the next record to buffer N.
1383
 
@param N        number of the buffer (0 or 1)
1384
 
@param AT_END   statement to execute at end of input */
1385
 
#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END)                             \
1386
 
        do {                                                            \
1387
 
                b2 = row_merge_write_rec(&block[2], &buf[2], b2,        \
1388
 
                                         of->fd, &of->offset,           \
1389
 
                                         mrec##N, offsets##N);          \
1390
 
                if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {  \
1391
 
                        goto corrupt;                                   \
1392
 
                }                                                       \
1393
 
                b##N = row_merge_read_rec(&block[N], &buf[N],           \
1394
 
                                          b##N, index,                  \
1395
 
                                          file->fd, foffs##N,           \
1396
 
                                          &mrec##N, offsets##N);        \
1397
 
                if (UNIV_UNLIKELY(!b##N)) {                             \
1398
 
                        if (mrec##N) {                                  \
1399
 
                                goto corrupt;                           \
1400
 
                        }                                               \
1401
 
                        AT_END;                                         \
1402
 
                }                                                       \
1403
 
        } while (0)
1404
 
 
1405
 
/*************************************************************//**
1406
 
Merge two blocks of records on disk and write a bigger block.
1407
 
@return DB_SUCCESS or error code */
1408
 
static
1409
 
ulint
1410
 
row_merge_blocks(
1411
 
/*=============*/
1412
 
        const dict_index_t*     index,  /*!< in: index being created */
1413
 
        const merge_file_t*     file,   /*!< in: file containing
1414
 
                                        index entries */
1415
 
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
1416
 
        ulint*                  foffs0, /*!< in/out: offset of first
1417
 
                                        source list in the file */
1418
 
        ulint*                  foffs1, /*!< in/out: offset of second
1419
 
                                        source list in the file */
1420
 
        merge_file_t*           of,     /*!< in/out: output file */
1421
 
        TABLE*          table)  /*!< in/out: MySQL table, for
1422
 
                                        reporting erroneous key value
1423
 
                                        if applicable */
1424
 
{
1425
 
        mem_heap_t*     heap;   /*!< memory heap for offsets0, offsets1 */
1426
 
 
1427
 
        mrec_buf_t*     buf;    /*!< buffer for handling
1428
 
                                split mrec in block[] */
1429
 
        const byte*     b0;     /*!< pointer to block[0] */
1430
 
        const byte*     b1;     /*!< pointer to block[1] */
1431
 
        byte*           b2;     /*!< pointer to block[2] */
1432
 
        const mrec_t*   mrec0;  /*!< merge rec, points to block[0] or buf[0] */
1433
 
        const mrec_t*   mrec1;  /*!< merge rec, points to block[1] or buf[1] */
1434
 
        ulint*          offsets0;/* offsets of mrec0 */
1435
 
        ulint*          offsets1;/* offsets of mrec1 */
1436
 
 
1437
 
#ifdef UNIV_DEBUG
1438
 
        if (row_merge_print_block) {
1439
 
                fprintf(stderr,
1440
 
                        "row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
1441
 
                        " = fd=%d ofs=%lu\n",
1442
 
                        file->fd, (ulong) *foffs0,
1443
 
                        file->fd, (ulong) *foffs1,
1444
 
                        of->fd, (ulong) of->offset);
1445
 
        }
1446
 
#endif /* UNIV_DEBUG */
1447
 
 
1448
 
        heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
1449
 
 
1450
 
        buf = static_cast<mrec_buf_t *>(mem_heap_alloc(heap, sizeof(mrec_buf_t) * 3));
1451
 
 
1452
 
        /* Write a record and read the next record.  Split the output
1453
 
        file in two halves, which can be merged on the following pass. */
1454
 
 
1455
 
        if (!row_merge_read(file->fd, *foffs0, &block[0])
1456
 
            || !row_merge_read(file->fd, *foffs1, &block[1])) {
1457
 
corrupt:
1458
 
                mem_heap_free(heap);
1459
 
                return(DB_CORRUPTION);
1460
 
        }
1461
 
 
1462
 
        b0 = block[0];
1463
 
        b1 = block[1];
1464
 
        b2 = block[2];
1465
 
 
1466
 
        b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
1467
 
                                foffs0, &mrec0, offsets0);
1468
 
        b1 = row_merge_read_rec(&block[1], &buf[1], b1, index, file->fd,
1469
 
                                foffs1, &mrec1, offsets1);
1470
 
        if (UNIV_UNLIKELY(!b0 && mrec0)
1471
 
            || UNIV_UNLIKELY(!b1 && mrec1)) {
1472
 
 
1473
 
                goto corrupt;
1474
 
        }
1475
 
 
1476
 
        while (mrec0 && mrec1) {
1477
 
                ibool   null_eq = FALSE;
1478
 
                switch (row_merge_cmp(mrec0, mrec1,
1479
 
                                      offsets0, offsets1, index,
1480
 
                                      &null_eq)) {
1481
 
                case 0:
1482
 
                        if (UNIV_UNLIKELY
1483
 
                            (dict_index_is_unique(index) && !null_eq)) {
1484
 
                                innobase_rec_to_mysql(table, mrec0,
1485
 
                                                      index, offsets0);
1486
 
                                mem_heap_free(heap);
1487
 
                                return(DB_DUPLICATE_KEY);
1488
 
                        }
1489
 
                        /* fall through */
1490
 
                case -1:
1491
 
                        ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
1492
 
                        break;
1493
 
                case 1:
1494
 
                        ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
1495
 
                        break;
1496
 
                default:
1497
 
                        ut_error;
1498
 
                }
1499
 
 
1500
 
        }
1501
 
 
1502
 
merged:
1503
 
        if (mrec0) {
1504
 
                /* append all mrec0 to output */
1505
 
                for (;;) {
1506
 
                        ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
1507
 
                }
1508
 
        }
1509
 
done0:
1510
 
        if (mrec1) {
1511
 
                /* append all mrec1 to output */
1512
 
                for (;;) {
1513
 
                        ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
1514
 
                }
1515
 
        }
1516
 
done1:
1517
 
 
1518
 
        mem_heap_free(heap);
1519
 
        b2 = row_merge_write_eof(&block[2], b2, of->fd, &of->offset);
1520
 
        return(b2 ? DB_SUCCESS : DB_CORRUPTION);
1521
 
}
1522
 
 
1523
 
/*************************************************************//**
1524
 
Copy a block of index entries.
1525
 
@return TRUE on success, FALSE on failure */
1526
 
static __attribute__((nonnull))
1527
 
ibool
1528
 
row_merge_blocks_copy(
1529
 
/*==================*/
1530
 
        const dict_index_t*     index,  /*!< in: index being created */
1531
 
        const merge_file_t*     file,   /*!< in: input file */
1532
 
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
1533
 
        ulint*                  foffs0, /*!< in/out: input file offset */
1534
 
        merge_file_t*           of)     /*!< in/out: output file */
1535
 
{
1536
 
        mem_heap_t*     heap;   /*!< memory heap for offsets0, offsets1 */
1537
 
 
1538
 
        mrec_buf_t*     buf;    /*!< buffer for handling
1539
 
                                split mrec in block[] */
1540
 
        const byte*     b0;     /*!< pointer to block[0] */
1541
 
        byte*           b2;     /*!< pointer to block[2] */
1542
 
        const mrec_t*   mrec0;  /*!< merge rec, points to block[0] */
1543
 
        ulint*          offsets0;/* offsets of mrec0 */
1544
 
        ulint*          offsets1;/* dummy offsets */
1545
 
 
1546
 
#ifdef UNIV_DEBUG
1547
 
        if (row_merge_print_block) {
1548
 
                fprintf(stderr,
1549
 
                        "row_merge_blocks_copy fd=%d ofs=%lu"
1550
 
                        " = fd=%d ofs=%lu\n",
1551
 
                        file->fd, (ulong) foffs0,
1552
 
                        of->fd, (ulong) of->offset);
1553
 
        }
1554
 
#endif /* UNIV_DEBUG */
1555
 
 
1556
 
        heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
1557
 
        buf = static_cast<mrec_buf_t *>(mem_heap_alloc(heap, sizeof(mrec_buf_t) * 3));
1558
 
 
1559
 
        /* Write a record and read the next record.  Split the output
1560
 
        file in two halves, which can be merged on the following pass. */
1561
 
 
1562
 
        if (!row_merge_read(file->fd, *foffs0, &block[0])) {
1563
 
corrupt:
1564
 
                mem_heap_free(heap);
1565
 
                return(FALSE);
1566
 
        }
1567
 
 
1568
 
        b0 = block[0];
1569
 
        b2 = block[2];
1570
 
 
1571
 
        b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
1572
 
                                foffs0, &mrec0, offsets0);
1573
 
        if (UNIV_UNLIKELY(!b0 && mrec0)) {
1574
 
 
1575
 
                goto corrupt;
1576
 
        }
1577
 
 
1578
 
        if (mrec0) {
1579
 
                /* append all mrec0 to output */
1580
 
                for (;;) {
1581
 
                        ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
1582
 
                }
1583
 
        }
1584
 
done0:
1585
 
 
1586
 
        /* The file offset points to the beginning of the last page
1587
 
        that has been read.  Update it to point to the next block. */
1588
 
        (*foffs0)++;
1589
 
 
1590
 
        mem_heap_free(heap);
1591
 
        return(row_merge_write_eof(&block[2], b2, of->fd, &of->offset)
1592
 
               != NULL);
1593
 
}
1594
 
 
1595
 
/*************************************************************//**
1596
 
Merge disk files.
1597
 
@return DB_SUCCESS or error code */
1598
 
static __attribute__((nonnull))
1599
 
ulint
1600
 
row_merge(
1601
 
/*======*/
1602
 
        trx_t*                  trx,    /*!< in: transaction */
1603
 
        const dict_index_t*     index,  /*!< in: index being created */
1604
 
        merge_file_t*           file,   /*!< in/out: file containing
1605
 
                                        index entries */
1606
 
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
1607
 
        int*                    tmpfd,  /*!< in/out: temporary file handle */
1608
 
        TABLE*          table,  /*!< in/out: MySQL table, for
1609
 
                                        reporting erroneous key value
1610
 
                                        if applicable */
1611
 
        ulint*                  num_run,/*!< in/out: Number of runs remain
1612
 
                                        to be merged */
1613
 
        ulint*                  run_offset) /*!< in/out: Array contains the
1614
 
                                        first offset number for each merge
1615
 
                                        run */
1616
 
{
1617
 
        ulint           foffs0; /*!< first input offset */
1618
 
        ulint           foffs1; /*!< second input offset */
1619
 
        ulint           error;  /*!< error code */
1620
 
        merge_file_t    of;     /*!< output file */
1621
 
        const ulint     ihalf   = run_offset[*num_run / 2];
1622
 
                                /*!< half the input file */
1623
 
        ulint           n_run   = 0;
1624
 
                                /*!< num of runs generated from this merge */
1625
 
 
1626
 
 
1627
 
        UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
1628
 
        ut_ad(ihalf < file->offset);
1629
 
 
1630
 
        of.fd = *tmpfd;
1631
 
        of.offset = 0;
1632
 
        of.n_rec = 0;
1633
 
 
1634
 
#ifdef POSIX_FADV_SEQUENTIAL
1635
 
        /* The input file will be read sequentially, starting from the
1636
 
        beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
1637
 
        affects the entire file.  Each block will be read exactly once. */
1638
 
        posix_fadvise(file->fd, 0, 0,
1639
 
                      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
1640
 
#endif /* POSIX_FADV_SEQUENTIAL */
1641
 
 
1642
 
        /* Merge blocks to the output file. */
1643
 
        foffs0 = 0;
1644
 
        foffs1 = ihalf;
1645
 
 
1646
 
        UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);
1647
 
 
1648
 
        for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
1649
 
 
1650
 
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1651
 
                        return(DB_INTERRUPTED);
1652
 
                }
1653
 
 
1654
 
                /* Remember the offset number for this run */
1655
 
                run_offset[n_run++] = of.offset;
1656
 
 
1657
 
                error = row_merge_blocks(index, file, block,
1658
 
                                         &foffs0, &foffs1, &of, table);
1659
 
 
1660
 
                if (error != DB_SUCCESS) {
1661
 
                        return(error);
1662
 
                }
1663
 
 
1664
 
        }
1665
 
 
1666
 
        /* Copy the last blocks, if there are any. */
1667
 
 
1668
 
        while (foffs0 < ihalf) {
1669
 
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1670
 
                        return(DB_INTERRUPTED);
1671
 
                }
1672
 
 
1673
 
                /* Remember the offset number for this run */
1674
 
                run_offset[n_run++] = of.offset;
1675
 
 
1676
 
                if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
1677
 
                        return(DB_CORRUPTION);
1678
 
                }
1679
 
        }
1680
 
 
1681
 
        ut_ad(foffs0 == ihalf);
1682
 
 
1683
 
        while (foffs1 < file->offset) {
1684
 
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1685
 
                        return(DB_INTERRUPTED);
1686
 
                }
1687
 
 
1688
 
                /* Remember the offset number for this run */
1689
 
                run_offset[n_run++] = of.offset;
1690
 
 
1691
 
                if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
1692
 
                        return(DB_CORRUPTION);
1693
 
                }
1694
 
        }
1695
 
 
1696
 
        ut_ad(foffs1 == file->offset);
1697
 
 
1698
 
        if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
1699
 
                return(DB_CORRUPTION);
1700
 
        }
1701
 
 
1702
 
        ut_ad(n_run <= *num_run);
1703
 
 
1704
 
        *num_run = n_run;
1705
 
 
1706
 
        /* Each run can contain one or more offsets. As merge goes on,
1707
 
        the number of runs (to merge) will reduce until we have one
1708
 
        single run. So the number of runs will always be smaller than
1709
 
        the number of offsets in file */
1710
 
        ut_ad((*num_run) <= file->offset);
1711
 
 
1712
 
        /* The number of offsets in output file is always equal or
1713
 
        smaller than input file */
1714
 
        ut_ad(of.offset <= file->offset);
1715
 
 
1716
 
        /* Swap file descriptors for the next pass. */
1717
 
        *tmpfd = file->fd;
1718
 
        *file = of;
1719
 
 
1720
 
        UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);
1721
 
 
1722
 
        return(DB_SUCCESS);
1723
 
}
1724
 
 
1725
 
/*************************************************************//**
1726
 
Merge disk files.
1727
 
@return DB_SUCCESS or error code */
1728
 
static
1729
 
ulint
1730
 
row_merge_sort(
1731
 
/*===========*/
1732
 
        trx_t*                  trx,    /*!< in: transaction */
1733
 
        const dict_index_t*     index,  /*!< in: index being created */
1734
 
        merge_file_t*           file,   /*!< in/out: file containing
1735
 
                                        index entries */
1736
 
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
1737
 
        int*                    tmpfd,  /*!< in/out: temporary file handle */
1738
 
        TABLE*          table)  /*!< in/out: MySQL table, for
1739
 
                                        reporting erroneous key value
1740
 
                                        if applicable */
1741
 
{
1742
 
        ulint   half = file->offset / 2;
1743
 
        ulint   num_runs;
1744
 
        ulint*  run_offset;
1745
 
        ulint   error = DB_SUCCESS;
1746
 
 
1747
 
        /* Record the number of merge runs we need to perform */
1748
 
        num_runs = file->offset;
1749
 
 
1750
 
        /* If num_runs are less than 1, nothing to merge */
1751
 
        if (num_runs <= 1) {
1752
 
                return(error);
1753
 
        }
1754
 
 
1755
 
        /* "run_offset" records each run's first offset number */
1756
 
        run_offset = (ulint*) mem_alloc(file->offset * sizeof(ulint));
1757
 
 
1758
 
        /* This tells row_merge() where to start for the first round
1759
 
        of merge. */
1760
 
        run_offset[half] = half;
1761
 
 
1762
 
        /* The file should always contain at least one byte (the end
1763
 
        of file marker).  Thus, it must be at least one block. */
1764
 
        ut_ad(file->offset > 0);
1765
 
 
1766
 
        /* Merge the runs until we have one big run */
1767
 
        do {
1768
 
                error = row_merge(trx, index, file, block, tmpfd,
1769
 
                                  table, &num_runs, run_offset);
1770
 
 
1771
 
                UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
1772
 
 
1773
 
                if (error != DB_SUCCESS) {
1774
 
                        break;
1775
 
                }
1776
 
        } while (num_runs > 1);
1777
 
 
1778
 
        mem_free(run_offset);
1779
 
 
1780
 
        return(error);
1781
 
}
1782
 
 
1783
 
/*************************************************************//**
1784
 
Copy externally stored columns to the data tuple. */
1785
 
static
1786
 
void
1787
 
row_merge_copy_blobs(
1788
 
/*=================*/
1789
 
        const mrec_t*   mrec,   /*!< in: merge record */
1790
 
        const ulint*    offsets,/*!< in: offsets of mrec */
1791
 
        ulint           zip_size,/*!< in: compressed page size in bytes, or 0 */
1792
 
        dtuple_t*       tuple,  /*!< in/out: data tuple */
1793
 
        mem_heap_t*     heap)   /*!< in/out: memory heap */
1794
 
{
1795
 
        ulint   i;
1796
 
        ulint   n_fields = dtuple_get_n_fields(tuple);
1797
 
 
1798
 
        for (i = 0; i < n_fields; i++) {
1799
 
                ulint           len;
1800
 
                const void*     data;
1801
 
                dfield_t*       field = dtuple_get_nth_field(tuple, i);
1802
 
 
1803
 
                if (!dfield_is_ext(field)) {
1804
 
                        continue;
1805
 
                }
1806
 
 
1807
 
                ut_ad(!dfield_is_null(field));
1808
 
 
1809
 
                /* The table is locked during index creation.
1810
 
                Therefore, externally stored columns cannot possibly
1811
 
                be freed between the time the BLOB pointers are read
1812
 
                (row_merge_read_clustered_index()) and dereferenced
1813
 
                (below). */
1814
 
                data = btr_rec_copy_externally_stored_field(
1815
 
                        mrec, offsets, zip_size, i, &len, heap);
1816
 
                /* Because we have locked the table, any records
1817
 
                written by incomplete transactions must have been
1818
 
                rolled back already. There must not be any incomplete
1819
 
                BLOB columns. */
1820
 
                ut_a(data);
1821
 
 
1822
 
                dfield_set_data(field, data, len);
1823
 
        }
1824
 
}
1825
 
 
1826
 
/********************************************************************//**
1827
 
Read sorted file containing index data tuples and insert these data
1828
 
tuples to the index
1829
 
@return DB_SUCCESS or error number */
1830
 
static
1831
 
ulint
1832
 
row_merge_insert_index_tuples(
1833
 
/*==========================*/
1834
 
        trx_t*                  trx,    /*!< in: transaction */
1835
 
        dict_index_t*           index,  /*!< in: index */
1836
 
        dict_table_t*           table,  /*!< in: new table */
1837
 
        ulint                   zip_size,/*!< in: compressed page size of
1838
 
                                         the old table, or 0 if uncompressed */
1839
 
        int                     fd,     /*!< in: file descriptor */
1840
 
        row_merge_block_t*      block)  /*!< in/out: file buffer */
1841
 
{
1842
 
        const byte*             b;
1843
 
        que_thr_t*              thr;
1844
 
        ins_node_t*             node;
1845
 
        mem_heap_t*             tuple_heap;
1846
 
        mem_heap_t*             graph_heap;
1847
 
        ulint                   error = DB_SUCCESS;
1848
 
        ulint                   foffs = 0;
1849
 
        ulint*                  offsets;
1850
 
 
1851
 
        ut_ad(trx);
1852
 
        ut_ad(index);
1853
 
        ut_ad(table);
1854
 
 
1855
 
        /* We use the insert query graph as the dummy graph
1856
 
        needed in the row module call */
1857
 
 
1858
 
        trx->op_info = "inserting index entries";
1859
 
 
1860
 
        graph_heap = mem_heap_create(500 + sizeof(mrec_buf_t));
1861
 
        node = ins_node_create(INS_DIRECT, table, graph_heap);
1862
 
 
1863
 
        thr = pars_complete_graph_for_exec(node, trx, graph_heap);
1864
 
 
1865
 
        que_thr_move_to_run_state_for_mysql(thr, trx);
1866
 
 
1867
 
        tuple_heap = mem_heap_create(1000);
1868
 
 
1869
 
        {
1870
 
                ulint i = 1 + REC_OFFS_HEADER_SIZE
1871
 
                        + dict_index_get_n_fields(index);
1872
 
                offsets = static_cast<ulint *>(mem_heap_alloc(graph_heap, i * sizeof *offsets));
1873
 
                offsets[0] = i;
1874
 
                offsets[1] = dict_index_get_n_fields(index);
1875
 
        }
1876
 
 
1877
 
        b = *block;
1878
 
 
1879
 
        if (!row_merge_read(fd, foffs, block)) {
1880
 
                error = DB_CORRUPTION;
1881
 
        } else {
1882
 
                mrec_buf_t*     buf = static_cast<mrec_buf_t *>(mem_heap_alloc(graph_heap, sizeof *buf));
1883
 
 
1884
 
                for (;;) {
1885
 
                        const mrec_t*   mrec;
1886
 
                        dtuple_t*       dtuple;
1887
 
                        ulint           n_ext;
1888
 
 
1889
 
                        b = row_merge_read_rec(block, buf, b, index,
1890
 
                                               fd, &foffs, &mrec, offsets);
1891
 
                        if (UNIV_UNLIKELY(!b)) {
1892
 
                                /* End of list, or I/O error */
1893
 
                                if (mrec) {
1894
 
                                        error = DB_CORRUPTION;
1895
 
                                }
1896
 
                                break;
1897
 
                        }
1898
 
 
1899
 
                        dtuple = row_rec_to_index_entry_low(
1900
 
                                mrec, index, offsets, &n_ext, tuple_heap);
1901
 
 
1902
 
                        if (UNIV_UNLIKELY(n_ext)) {
1903
 
                                row_merge_copy_blobs(mrec, offsets, zip_size,
1904
 
                                                     dtuple, tuple_heap);
1905
 
                        }
1906
 
 
1907
 
                        node->row = dtuple;
1908
 
                        node->table = table;
1909
 
                        node->trx_id = trx->id;
1910
 
 
1911
 
                        ut_ad(dtuple_validate(dtuple));
1912
 
 
1913
 
                        do {
1914
 
                                thr->run_node = thr;
1915
 
                                thr->prev_node = thr->common.parent;
1916
 
 
1917
 
                                error = row_ins_index_entry(index, dtuple,
1918
 
                                                            0, FALSE, thr);
1919
 
 
1920
 
                                if (UNIV_LIKELY(error == DB_SUCCESS)) {
1921
 
 
1922
 
                                        goto next_rec;
1923
 
                                }
1924
 
 
1925
 
                                thr->lock_state = QUE_THR_LOCK_ROW;
1926
 
                                trx->error_state = error;
1927
 
                                que_thr_stop_for_mysql(thr);
1928
 
                                thr->lock_state = QUE_THR_LOCK_NOLOCK;
1929
 
                        } while (row_mysql_handle_errors(&error, trx,
1930
 
                                                         thr, NULL));
1931
 
 
1932
 
                        goto err_exit;
1933
 
next_rec:
1934
 
                        mem_heap_empty(tuple_heap);
1935
 
                }
1936
 
        }
1937
 
 
1938
 
        que_thr_stop_for_mysql_no_error(thr, trx);
1939
 
err_exit:
1940
 
        que_graph_free(thr->graph);
1941
 
 
1942
 
        trx->op_info = "";
1943
 
 
1944
 
        mem_heap_free(tuple_heap);
1945
 
 
1946
 
        return(error);
1947
 
}
1948
 
 
1949
 
/*********************************************************************//**
1950
 
Sets an exclusive lock on a table, for the duration of creating indexes.
1951
 
@return error code or DB_SUCCESS */
1952
 
UNIV_INTERN
1953
 
ulint
1954
 
row_merge_lock_table(
1955
 
/*=================*/
1956
 
        trx_t*          trx,            /*!< in/out: transaction */
1957
 
        dict_table_t*   table,          /*!< in: table to lock */
1958
 
        enum lock_mode  mode)           /*!< in: LOCK_X or LOCK_S */
1959
 
{
1960
 
        mem_heap_t*     heap;
1961
 
        que_thr_t*      thr;
1962
 
        ulint           err;
1963
 
        sel_node_t*     node;
1964
 
 
1965
 
        ut_ad(trx);
1966
 
        ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
1967
 
        ut_ad(mode == LOCK_X || mode == LOCK_S);
1968
 
 
1969
 
        heap = mem_heap_create(512);
1970
 
 
1971
 
        trx->op_info = "setting table lock for creating or dropping index";
1972
 
 
1973
 
        node = sel_node_create(heap);
1974
 
        thr = pars_complete_graph_for_exec(node, trx, heap);
1975
 
        thr->graph->state = QUE_FORK_ACTIVE;
1976
 
 
1977
 
        /* We use the select query graph as the dummy graph needed
1978
 
        in the lock module call */
1979
 
 
1980
 
        thr = que_fork_get_first_thr(static_cast<que_fork_t *>(que_node_get_parent(thr)));
1981
 
        que_thr_move_to_run_state_for_mysql(thr, trx);
1982
 
 
1983
 
run_again:
1984
 
        thr->run_node = thr;
1985
 
        thr->prev_node = thr->common.parent;
1986
 
 
1987
 
        err = lock_table(0, table, mode, thr);
1988
 
 
1989
 
        trx->error_state = err;
1990
 
 
1991
 
        if (UNIV_LIKELY(err == DB_SUCCESS)) {
1992
 
                que_thr_stop_for_mysql_no_error(thr, trx);
1993
 
        } else {
1994
 
                que_thr_stop_for_mysql(thr);
1995
 
 
1996
 
                if (err != DB_QUE_THR_SUSPENDED) {
1997
 
                        ibool   was_lock_wait;
1998
 
 
1999
 
                        was_lock_wait = row_mysql_handle_errors(
2000
 
                                &err, trx, thr, NULL);
2001
 
 
2002
 
                        if (was_lock_wait) {
2003
 
                                goto run_again;
2004
 
                        }
2005
 
                } else {
2006
 
                        que_thr_t*      run_thr;
2007
 
                        que_node_t*     parent;
2008
 
 
2009
 
                        parent = que_node_get_parent(thr);
2010
 
                        run_thr = que_fork_start_command(static_cast<que_fork_t *>(parent));
2011
 
 
2012
 
                        ut_a(run_thr == thr);
2013
 
 
2014
 
                        /* There was a lock wait but the thread was not
2015
 
                        in a ready to run or running state. */
2016
 
                        trx->error_state = DB_LOCK_WAIT;
2017
 
 
2018
 
                        goto run_again;
2019
 
                }
2020
 
        }
2021
 
 
2022
 
        que_graph_free(thr->graph);
2023
 
        trx->op_info = "";
2024
 
 
2025
 
        return(err);
2026
 
}
2027
 
 
2028
 
/*********************************************************************//**
2029
 
Drop an index from the InnoDB system tables.  The data dictionary must
2030
 
have been locked exclusively by the caller, because the transaction
2031
 
will not be committed. */
2032
 
UNIV_INTERN
2033
 
void
2034
 
row_merge_drop_index(
2035
 
/*=================*/
2036
 
        dict_index_t*   index,  /*!< in: index to be removed */
2037
 
        dict_table_t*   table,  /*!< in: table */
2038
 
        trx_t*          trx)    /*!< in: transaction handle */
2039
 
{
2040
 
        ulint           err;
2041
 
        pars_info_t*    info = pars_info_create();
2042
 
 
2043
 
        /* We use the private SQL parser of Innobase to generate the
2044
 
        query graphs needed in deleting the dictionary data from system
2045
 
        tables in Innobase. Deleting a row from SYS_INDEXES table also
2046
 
        frees the file segments of the B-tree associated with the index. */
2047
 
 
2048
 
        static const char str1[] =
2049
 
                "PROCEDURE DROP_INDEX_PROC () IS\n"
2050
 
                "BEGIN\n"
2051
 
                /* Rename the index, so that it will be dropped by
2052
 
                row_merge_drop_temp_indexes() at crash recovery
2053
 
                if the server crashes before this trx is committed. */
2054
 
                "UPDATE SYS_INDEXES SET NAME=CONCAT('"
2055
 
                TEMP_INDEX_PREFIX_STR "', NAME) WHERE ID = :indexid;\n"
2056
 
                "COMMIT WORK;\n"
2057
 
                /* Drop the field definitions of the index. */
2058
 
                "DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
2059
 
                /* Drop the index definition and the B-tree. */
2060
 
                "DELETE FROM SYS_INDEXES WHERE ID = :indexid;\n"
2061
 
                "END;\n";
2062
 
 
2063
 
        ut_ad(index && table && trx);
2064
 
 
2065
 
        pars_info_add_ull_literal(info, "indexid", index->id);
2066
 
 
2067
 
        trx_start_if_not_started(trx);
2068
 
        trx->op_info = "dropping index";
2069
 
 
2070
 
        ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2071
 
 
2072
 
        err = que_eval_sql(info, str1, FALSE, trx);
2073
 
 
2074
 
        ut_a(err == DB_SUCCESS);
2075
 
 
2076
 
        /* Replace this index with another equivalent index for all
2077
 
        foreign key constraints on this table where this index is used */
2078
 
 
2079
 
        dict_table_replace_index_in_foreign_list(table, index, trx);
2080
 
        dict_index_remove_from_cache(table, index);
2081
 
 
2082
 
        trx->op_info = "";
2083
 
}
2084
 
 
2085
 
/*********************************************************************//**
2086
 
Drop those indexes which were created before an error occurred when
2087
 
building an index.  The data dictionary must have been locked
2088
 
exclusively by the caller, because the transaction will not be
2089
 
committed. */
2090
 
UNIV_INTERN
2091
 
void
2092
 
row_merge_drop_indexes(
2093
 
/*===================*/
2094
 
        trx_t*          trx,            /*!< in: transaction */
2095
 
        dict_table_t*   table,          /*!< in: table containing the indexes */
2096
 
        dict_index_t**  index,          /*!< in: indexes to drop */
2097
 
        ulint           num_created)    /*!< in: number of elements in index[] */
2098
 
{
2099
 
        ulint   key_num;
2100
 
 
2101
 
        for (key_num = 0; key_num < num_created; key_num++) {
2102
 
                row_merge_drop_index(index[key_num], table, trx);
2103
 
        }
2104
 
}
2105
 
 
2106
 
/*********************************************************************//**
2107
 
Drop all partially created indexes during crash recovery. */
2108
 
UNIV_INTERN
2109
 
void
2110
 
row_merge_drop_temp_indexes(void)
2111
 
/*=============================*/
2112
 
{
2113
 
        trx_t*          trx;
2114
 
        btr_pcur_t      pcur;
2115
 
        mtr_t           mtr;
2116
 
 
2117
 
        /* Load the table definitions that contain partially defined
2118
 
        indexes, so that the data dictionary information can be checked
2119
 
        when accessing the tablename.ibd files. */
2120
 
 
2121
 
        trx = trx_allocate_for_background();
2122
 
        trx->op_info = "dropping partially created indexes";
2123
 
        row_mysql_lock_data_dictionary(trx);
2124
 
 
2125
 
        mtr_start(&mtr);
2126
 
 
2127
 
        btr_pcur_open_at_index_side(
2128
 
                TRUE,
2129
 
                dict_table_get_first_index(dict_sys->sys_indexes),
2130
 
                BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
2131
 
 
2132
 
        for (;;) {
2133
 
                const rec_t*    rec;
2134
 
                const byte*     field;
2135
 
                ulint           len;
2136
 
                table_id_t      table_id;
2137
 
                dict_table_t*   table;
2138
 
 
2139
 
                btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2140
 
 
2141
 
                if (!btr_pcur_is_on_user_rec(&pcur)) {
2142
 
                        break;
2143
 
                }
2144
 
 
2145
 
                rec = btr_pcur_get_rec(&pcur);
2146
 
                field = rec_get_nth_field_old(rec, DICT_SYS_INDEXES_NAME_FIELD,
2147
 
                                              &len);
2148
 
                if (len == UNIV_SQL_NULL || len == 0
2149
 
                    || (char) *field != TEMP_INDEX_PREFIX) {
2150
 
                        continue;
2151
 
                }
2152
 
 
2153
 
                /* This is a temporary index. */
2154
 
 
2155
 
                field = rec_get_nth_field_old(rec, 0/*TABLE_ID*/, &len);
2156
 
                if (len != 8) {
2157
 
                        /* Corrupted TABLE_ID */
2158
 
                        continue;
2159
 
                }
2160
 
 
2161
 
                table_id = mach_read_from_8(field);
2162
 
 
2163
 
                btr_pcur_store_position(&pcur, &mtr);
2164
 
                btr_pcur_commit_specify_mtr(&pcur, &mtr);
2165
 
 
2166
 
                table = dict_table_get_on_id_low(table_id);
2167
 
 
2168
 
                if (table) {
2169
 
                        dict_index_t*   index;
2170
 
                        dict_index_t*   next_index;
2171
 
 
2172
 
                        for (index = dict_table_get_first_index(table);
2173
 
                             index; index = next_index) {
2174
 
 
2175
 
                                next_index = dict_table_get_next_index(index);
2176
 
 
2177
 
                                if (*index->name == TEMP_INDEX_PREFIX) {
2178
 
                                        row_merge_drop_index(index, table, trx);
2179
 
                                        trx_commit_for_mysql(trx);
2180
 
                                }
2181
 
                        }
2182
 
                }
2183
 
 
2184
 
                mtr_start(&mtr);
2185
 
                btr_pcur_restore_position(BTR_SEARCH_LEAF,
2186
 
                                          &pcur, &mtr);
2187
 
        }
2188
 
 
2189
 
        btr_pcur_close(&pcur);
2190
 
        mtr_commit(&mtr);
2191
 
        row_mysql_unlock_data_dictionary(trx);
2192
 
        trx_free_for_background(trx);
2193
 
}
2194
 
 
2195
 
/*********************************************************************//**
2196
 
Create a merge file. */
2197
 
static
2198
 
void
2199
 
row_merge_file_create(
2200
 
/*==================*/
2201
 
        merge_file_t*   merge_file)     /*!< out: merge file structure */
2202
 
{
2203
 
#ifdef UNIV_PFS_IO
2204
 
        /* This temp file open does not go through normal
2205
 
        file APIs, add instrumentation to register with
2206
 
        performance schema */
2207
 
        struct PSI_file_locker* locker = NULL;
2208
 
        PSI_file_locker_state   state;
2209
 
        register_pfs_file_open_begin(&state, locker, innodb_file_temp_key,
2210
 
                                     PSI_FILE_OPEN,
2211
 
                                     "Innodb Merge Temp File",
2212
 
                                     __FILE__, __LINE__);
2213
 
#endif
2214
 
        merge_file->fd = innobase_mysql_tmpfile();
2215
 
        merge_file->offset = 0;
2216
 
        merge_file->n_rec = 0;
2217
 
#ifdef UNIV_PFS_IO
2218
 
        register_pfs_file_open_end(locker, merge_file->fd);
2219
 
#endif
2220
 
}
2221
 
 
2222
 
/*********************************************************************//**
2223
 
Destroy a merge file. */
2224
 
static
2225
 
void
2226
 
row_merge_file_destroy(
2227
 
/*===================*/
2228
 
        merge_file_t*   merge_file)     /*!< out: merge file structure */
2229
 
{
2230
 
#ifdef UNIV_PFS_IO
2231
 
        struct PSI_file_locker* locker = NULL;
2232
 
        PSI_file_locker_state   state;
2233
 
        register_pfs_file_io_begin(&state, locker, merge_file->fd, 0, PSI_FILE_CLOSE,
2234
 
                                   __FILE__, __LINE__);
2235
 
#endif
2236
 
        if (merge_file->fd != -1) {
2237
 
                close(merge_file->fd);
2238
 
                merge_file->fd = -1;
2239
 
        }
2240
 
 
2241
 
#ifdef UNIV_PFS_IO
2242
 
        register_pfs_file_io_end(locker, 0);
2243
 
#endif
2244
 
}
2245
 
 
2246
 
/*********************************************************************//**
2247
 
Determine the precise type of a column that is added to a tem
2248
 
if a column must be constrained NOT NULL.
2249
 
@return col->prtype, possibly ORed with DATA_NOT_NULL */
2250
 
UNIV_INLINE
2251
 
ulint
2252
 
row_merge_col_prtype(
2253
 
/*=================*/
2254
 
        const dict_col_t*       col,            /*!< in: column */
2255
 
        const char*             col_name,       /*!< in: name of the column */
2256
 
        const merge_index_def_t*index_def)      /*!< in: the index definition
2257
 
                                                of the primary key */
2258
 
{
2259
 
        ulint   prtype = col->prtype;
2260
 
        ulint   i;
2261
 
 
2262
 
        ut_ad(index_def->ind_type & DICT_CLUSTERED);
2263
 
 
2264
 
        if (prtype & DATA_NOT_NULL) {
2265
 
 
2266
 
                return(prtype);
2267
 
        }
2268
 
 
2269
 
        /* All columns that are included
2270
 
        in the PRIMARY KEY must be NOT NULL. */
2271
 
 
2272
 
        for (i = 0; i < index_def->n_fields; i++) {
2273
 
                if (!strcmp(col_name, index_def->fields[i].field_name)) {
2274
 
                        return(prtype | DATA_NOT_NULL);
2275
 
                }
2276
 
        }
2277
 
 
2278
 
        return(prtype);
2279
 
}
2280
 
 
2281
 
/*********************************************************************//**
2282
 
Create a temporary table for creating a primary key, using the definition
2283
 
of an existing table.
2284
 
@return table, or NULL on error */
2285
 
UNIV_INTERN
2286
 
dict_table_t*
2287
 
row_merge_create_temporary_table(
2288
 
/*=============================*/
2289
 
        const char*             table_name,     /*!< in: new table name */
2290
 
        const merge_index_def_t*index_def,      /*!< in: the index definition
2291
 
                                                of the primary key */
2292
 
        const dict_table_t*     table,          /*!< in: old table definition */
2293
 
        trx_t*                  trx)            /*!< in/out: transaction
2294
 
                                                (sets error_state) */
2295
 
{
2296
 
        ulint           i;
2297
 
        dict_table_t*   new_table = NULL;
2298
 
        ulint           n_cols = dict_table_get_n_user_cols(table);
2299
 
        ulint           error;
2300
 
        mem_heap_t*     heap = mem_heap_create(1000);
2301
 
 
2302
 
        ut_ad(table_name);
2303
 
        ut_ad(index_def);
2304
 
        ut_ad(table);
2305
 
        ut_ad(mutex_own(&dict_sys->mutex));
2306
 
 
2307
 
        new_table = dict_mem_table_create(table_name, 0, n_cols, table->flags);
2308
 
 
2309
 
        for (i = 0; i < n_cols; i++) {
2310
 
                const dict_col_t*       col;
2311
 
                const char*             col_name;
2312
 
 
2313
 
                col = dict_table_get_nth_col(table, i);
2314
 
                col_name = dict_table_get_col_name(table, i);
2315
 
 
2316
 
                dict_mem_table_add_col(new_table, heap, col_name, col->mtype,
2317
 
                                       row_merge_col_prtype(col, col_name,
2318
 
                                                            index_def),
2319
 
                                       col->len);
2320
 
        }
2321
 
 
2322
 
        error = row_create_table_for_mysql(new_table, trx);
2323
 
        mem_heap_free(heap);
2324
 
 
2325
 
        if (error != DB_SUCCESS) {
2326
 
                trx->error_state = error;
2327
 
                new_table = NULL;
2328
 
        }
2329
 
 
2330
 
        return(new_table);
2331
 
}
2332
 
 
2333
 
/*********************************************************************//**
2334
 
Rename the temporary indexes in the dictionary to permanent ones.  The
2335
 
data dictionary must have been locked exclusively by the caller,
2336
 
because the transaction will not be committed.
2337
 
@return DB_SUCCESS if all OK */
2338
 
UNIV_INTERN
2339
 
ulint
2340
 
row_merge_rename_indexes(
2341
 
/*=====================*/
2342
 
        trx_t*          trx,            /*!< in/out: transaction */
2343
 
        dict_table_t*   table)          /*!< in/out: table with new indexes */
2344
 
{
2345
 
        ulint           err = DB_SUCCESS;
2346
 
        pars_info_t*    info = pars_info_create();
2347
 
 
2348
 
        /* We use the private SQL parser of Innobase to generate the
2349
 
        query graphs needed in renaming indexes. */
2350
 
 
2351
 
        static const char rename_indexes[] =
2352
 
                "PROCEDURE RENAME_INDEXES_PROC () IS\n"
2353
 
                "BEGIN\n"
2354
 
                "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
2355
 
                "WHERE TABLE_ID = :tableid AND SUBSTR(NAME,0,1)='"
2356
 
                TEMP_INDEX_PREFIX_STR "';\n"
2357
 
                "END;\n";
2358
 
 
2359
 
        ut_ad(table);
2360
 
        ut_ad(trx);
2361
 
        ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2362
 
 
2363
 
        trx->op_info = "renaming indexes";
2364
 
 
2365
 
        pars_info_add_ull_literal(info, "tableid", table->id);
2366
 
 
2367
 
        err = que_eval_sql(info, rename_indexes, FALSE, trx);
2368
 
 
2369
 
        if (err == DB_SUCCESS) {
2370
 
                dict_index_t*   index = dict_table_get_first_index(table);
2371
 
                do {
2372
 
                        if (*index->name == TEMP_INDEX_PREFIX) {
2373
 
                                index->name++;
2374
 
                        }
2375
 
                        index = dict_table_get_next_index(index);
2376
 
                } while (index);
2377
 
        }
2378
 
 
2379
 
        trx->op_info = "";
2380
 
 
2381
 
        return(err);
2382
 
}
2383
 
 
2384
 
/*********************************************************************//**
2385
 
Rename the tables in the data dictionary.  The data dictionary must
2386
 
have been locked exclusively by the caller, because the transaction
2387
 
will not be committed.
2388
 
@return error code or DB_SUCCESS */
2389
 
UNIV_INTERN
2390
 
ulint
2391
 
row_merge_rename_tables(
2392
 
/*====================*/
2393
 
        dict_table_t*   old_table,      /*!< in/out: old table, renamed to
2394
 
                                        tmp_name */
2395
 
        dict_table_t*   new_table,      /*!< in/out: new table, renamed to
2396
 
                                        old_table->name */
2397
 
        const char*     tmp_name,       /*!< in: new name for old_table */
2398
 
        trx_t*          trx)            /*!< in: transaction handle */
2399
 
{
2400
 
        ulint           err     = DB_ERROR;
2401
 
        pars_info_t*    info;
2402
 
        char            old_name[MAX_TABLE_NAME_LEN + 1];
2403
 
 
2404
 
        ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
2405
 
        ut_ad(old_table != new_table);
2406
 
        ut_ad(mutex_own(&dict_sys->mutex));
2407
 
 
2408
 
        ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2409
 
 
2410
 
        /* store the old/current name to an automatic variable */
2411
 
        if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
2412
 
                memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
2413
 
        } else {
2414
 
                ut_print_timestamp(stderr);
2415
 
                fprintf(stderr, "InnoDB: too long table name: '%s', "
2416
 
                        "max length is %d\n", old_table->name,
2417
 
                        MAX_TABLE_NAME_LEN);
2418
 
                ut_error;
2419
 
        }
2420
 
 
2421
 
        /* store the old/current name to an automatic variable */
2422
 
        if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
2423
 
                memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
2424
 
        } else {
2425
 
                ut_print_timestamp(stderr);
2426
 
                fprintf(stderr, "InnoDB: too long table name: '%s', "
2427
 
                        "max length is %d\n", old_table->name,
2428
 
                        MAX_TABLE_NAME_LEN);
2429
 
                ut_error;
2430
 
        }
2431
 
 
2432
 
        trx->op_info = "renaming tables";
2433
 
 
2434
 
        /* We use the private SQL parser of Innobase to generate the query
2435
 
        graphs needed in updating the dictionary data in system tables. */
2436
 
 
2437
 
        info = pars_info_create();
2438
 
 
2439
 
        pars_info_add_str_literal(info, "new_name", new_table->name);
2440
 
        pars_info_add_str_literal(info, "old_name", old_name);
2441
 
        pars_info_add_str_literal(info, "tmp_name", tmp_name);
2442
 
 
2443
 
        err = que_eval_sql(info,
2444
 
                           "PROCEDURE RENAME_TABLES () IS\n"
2445
 
                           "BEGIN\n"
2446
 
                           "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
2447
 
                           " WHERE NAME = :old_name;\n"
2448
 
                           "UPDATE SYS_TABLES SET NAME = :old_name\n"
2449
 
                           " WHERE NAME = :new_name;\n"
2450
 
                           "END;\n", FALSE, trx);
2451
 
 
2452
 
        if (err != DB_SUCCESS) {
2453
 
 
2454
 
                goto err_exit;
2455
 
        }
2456
 
 
2457
 
        /* The following calls will also rename the .ibd data files if
2458
 
        the tables are stored in a single-table tablespace */
2459
 
 
2460
 
        if (!dict_table_rename_in_cache(old_table, tmp_name, FALSE)
2461
 
            || !dict_table_rename_in_cache(new_table, old_name, FALSE)) {
2462
 
 
2463
 
                err = DB_ERROR;
2464
 
                goto err_exit;
2465
 
        }
2466
 
 
2467
 
        err = dict_load_foreigns(old_name, FALSE, TRUE);
2468
 
 
2469
 
        if (err != DB_SUCCESS) {
2470
 
err_exit:
2471
 
                trx->error_state = DB_SUCCESS;
2472
 
                trx_general_rollback_for_mysql(trx, NULL);
2473
 
                trx->error_state = DB_SUCCESS;
2474
 
        }
2475
 
 
2476
 
        trx->op_info = "";
2477
 
 
2478
 
        return(err);
2479
 
}
2480
 
 
2481
 
/*********************************************************************//**
2482
 
Create and execute a query graph for creating an index.
2483
 
@return DB_SUCCESS or error code */
2484
 
static
2485
 
ulint
2486
 
row_merge_create_index_graph(
2487
 
/*=========================*/
2488
 
        trx_t*          trx,            /*!< in: trx */
2489
 
        dict_table_t*   table,          /*!< in: table */
2490
 
        dict_index_t*   index)          /*!< in: index */
2491
 
{
2492
 
        ind_node_t*     node;           /*!< Index creation node */
2493
 
        mem_heap_t*     heap;           /*!< Memory heap */
2494
 
        que_thr_t*      thr;            /*!< Query thread */
2495
 
        ulint           err;
2496
 
 
2497
 
        ut_ad(trx);
2498
 
        ut_ad(table);
2499
 
        ut_ad(index);
2500
 
 
2501
 
        heap = mem_heap_create(512);
2502
 
 
2503
 
        index->table = table;
2504
 
        node = ind_create_graph_create(index, heap);
2505
 
        thr = pars_complete_graph_for_exec(node, trx, heap);
2506
 
 
2507
 
        ut_a(thr == que_fork_start_command(static_cast<que_fork_t *>(que_node_get_parent(thr))));
2508
 
 
2509
 
        que_run_threads(thr);
2510
 
 
2511
 
        err = trx->error_state;
2512
 
 
2513
 
        que_graph_free((que_t*) que_node_get_parent(thr));
2514
 
 
2515
 
        return(err);
2516
 
}
2517
 
 
2518
 
/*********************************************************************//**
2519
 
Create the index and load in to the dictionary.
2520
 
@return index, or NULL on error */
2521
 
UNIV_INTERN
2522
 
dict_index_t*
2523
 
row_merge_create_index(
2524
 
/*===================*/
2525
 
        trx_t*                  trx,    /*!< in/out: trx (sets error_state) */
2526
 
        dict_table_t*           table,  /*!< in: the index is on this table */
2527
 
        const merge_index_def_t*index_def)
2528
 
                                        /*!< in: the index definition */
2529
 
{
2530
 
        dict_index_t*   index;
2531
 
        ulint           err;
2532
 
        ulint           n_fields = index_def->n_fields;
2533
 
        ulint           i;
2534
 
 
2535
 
        /* Create the index prototype, using the passed in def, this is not
2536
 
        a persistent operation. We pass 0 as the space id, and determine at
2537
 
        a lower level the space id where to store the table. */
2538
 
 
2539
 
        index = dict_mem_index_create(table->name, index_def->name,
2540
 
                                      0, index_def->ind_type, n_fields);
2541
 
 
2542
 
        ut_a(index);
2543
 
 
2544
 
        for (i = 0; i < n_fields; i++) {
2545
 
                merge_index_field_t*    ifield = &index_def->fields[i];
2546
 
 
2547
 
                dict_mem_index_add_field(index, ifield->field_name,
2548
 
                                         ifield->prefix_len);
2549
 
        }
2550
 
 
2551
 
        /* Add the index to SYS_INDEXES, using the index prototype. */
2552
 
        err = row_merge_create_index_graph(trx, table, index);
2553
 
 
2554
 
        if (err == DB_SUCCESS) {
2555
 
 
2556
 
                index = row_merge_dict_table_get_index(
2557
 
                        table, index_def);
2558
 
 
2559
 
                ut_a(index);
2560
 
 
2561
 
                /* Note the id of the transaction that created this
2562
 
                index, we use it to restrict readers from accessing
2563
 
                this index, to ensure read consistency. */
2564
 
                index->trx_id = trx->id;
2565
 
        } else {
2566
 
                index = NULL;
2567
 
        }
2568
 
 
2569
 
        return(index);
2570
 
}
2571
 
 
2572
 
/*********************************************************************//**
2573
 
Check if a transaction can use an index. */
2574
 
UNIV_INTERN
2575
 
ibool
2576
 
row_merge_is_index_usable(
2577
 
/*======================*/
2578
 
        const trx_t*            trx,    /*!< in: transaction */
2579
 
        const dict_index_t*     index)  /*!< in: index to check */
2580
 
{
2581
 
        return(!trx->read_view
2582
 
               || read_view_sees_trx_id(trx->read_view, index->trx_id));
2583
 
}
2584
 
 
2585
 
/*********************************************************************//**
2586
 
Drop the old table.
2587
 
@return DB_SUCCESS or error code */
2588
 
UNIV_INTERN
2589
 
ulint
2590
 
row_merge_drop_table(
2591
 
/*=================*/
2592
 
        trx_t*          trx,            /*!< in: transaction */
2593
 
        dict_table_t*   table)          /*!< in: table to drop */
2594
 
{
2595
 
        /* There must be no open transactions on the table. */
2596
 
        ut_a(table->n_mysql_handles_opened == 0);
2597
 
 
2598
 
        return(row_drop_table_for_mysql(table->name, trx, FALSE));
2599
 
}
2600
 
 
2601
 
/*********************************************************************//**
2602
 
Build indexes on a table by reading a clustered index,
2603
 
creating a temporary file containing index entries, merge sorting
2604
 
these index entries and inserting sorted index entries to indexes.
2605
 
@return DB_SUCCESS or error code */
2606
 
UNIV_INTERN
2607
 
ulint
2608
 
row_merge_build_indexes(
2609
 
/*====================*/
2610
 
        trx_t*          trx,            /*!< in: transaction */
2611
 
        dict_table_t*   old_table,      /*!< in: table where rows are
2612
 
                                        read from */
2613
 
        dict_table_t*   new_table,      /*!< in: table where indexes are
2614
 
                                        created; identical to old_table
2615
 
                                        unless creating a PRIMARY KEY */
2616
 
        dict_index_t**  indexes,        /*!< in: indexes to be created */
2617
 
        ulint           n_indexes,      /*!< in: size of indexes[] */
2618
 
        TABLE*  table)          /*!< in/out: MySQL table, for
2619
 
                                        reporting erroneous key value
2620
 
                                        if applicable */
2621
 
{
2622
 
        merge_file_t*           merge_files;
2623
 
        row_merge_block_t*      block;
2624
 
        ulint                   block_size;
2625
 
        ulint                   i;
2626
 
        ulint                   error;
2627
 
        int                     tmpfd;
2628
 
 
2629
 
        ut_ad(trx);
2630
 
        ut_ad(old_table);
2631
 
        ut_ad(new_table);
2632
 
        ut_ad(indexes);
2633
 
        ut_ad(n_indexes);
2634
 
 
2635
 
        trx_start_if_not_started(trx);
2636
 
 
2637
 
        /* Allocate memory for merge file data structure and initialize
2638
 
        fields */
2639
 
 
2640
 
        merge_files = static_cast<merge_file_t *>(mem_alloc(n_indexes * sizeof *merge_files));
2641
 
        block_size = 3 * sizeof *block;
2642
 
        block = static_cast<row_merge_block_t *>(os_mem_alloc_large(&block_size));
2643
 
 
2644
 
        for (i = 0; i < n_indexes; i++) {
2645
 
 
2646
 
                row_merge_file_create(&merge_files[i]);
2647
 
        }
2648
 
 
2649
 
        tmpfd = innobase_mysql_tmpfile();
2650
 
 
2651
 
        /* Reset the MySQL row buffer that is used when reporting
2652
 
        duplicate keys. */
2653
 
        innobase_rec_reset(table);
2654
 
 
2655
 
        /* Read clustered index of the table and create files for
2656
 
        secondary index entries for merge sort */
2657
 
 
2658
 
        error = row_merge_read_clustered_index(
2659
 
                trx, table, old_table, new_table, indexes,
2660
 
                merge_files, n_indexes, block);
2661
 
 
2662
 
        if (error != DB_SUCCESS) {
2663
 
 
2664
 
                goto func_exit;
2665
 
        }
2666
 
 
2667
 
        /* Now we have files containing index entries ready for
2668
 
        sorting and inserting. */
2669
 
 
2670
 
        for (i = 0; i < n_indexes; i++) {
2671
 
                error = row_merge_sort(trx, indexes[i], &merge_files[i],
2672
 
                                       block, &tmpfd, table);
2673
 
 
2674
 
                if (error == DB_SUCCESS) {
2675
 
                        error = row_merge_insert_index_tuples(
2676
 
                                trx, indexes[i], new_table,
2677
 
                                dict_table_zip_size(old_table),
2678
 
                                merge_files[i].fd, block);
2679
 
                }
2680
 
 
2681
 
                /* Close the temporary file to free up space. */
2682
 
                row_merge_file_destroy(&merge_files[i]);
2683
 
 
2684
 
                if (error != DB_SUCCESS) {
2685
 
                        trx->error_key_num = i;
2686
 
                        goto func_exit;
2687
 
                }
2688
 
        }
2689
 
 
2690
 
func_exit:
2691
 
        close(tmpfd);
2692
 
 
2693
 
        for (i = 0; i < n_indexes; i++) {
2694
 
                row_merge_file_destroy(&merge_files[i]);
2695
 
        }
2696
 
 
2697
 
        mem_free(merge_files);
2698
 
        os_mem_free_large(block, block_size);
2699
 
 
2700
 
        return(error);
2701
 
}