~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/innobase/row/row0merge.c

Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
61
61
#ifdef UNIV_DEBUG
62
62
/** Set these in order to enable debug printout. */
63
63
/* @{ */
 
64
/** Log the outcome of each row_merge_cmp() call, comparing records. */
64
65
static ibool    row_merge_print_cmp;
 
66
/** Log each record read from temporary file. */
65
67
static ibool    row_merge_print_read;
 
68
/** Log each record written to temporary file. */
66
69
static ibool    row_merge_print_write;
 
70
/** Log each row_merge_blocks() call, merging two blocks of records to
 
71
a bigger one. */
 
72
static ibool    row_merge_print_block;
 
73
/** Log each block read from temporary file. */
 
74
static ibool    row_merge_print_block_read;
 
75
/** Log each block written to temporary file. */
 
76
static ibool    row_merge_print_block_write;
67
77
/* @} */
68
78
#endif /* UNIV_DEBUG */
69
79
 
110
120
 
111
121
/** Information about temporary files used in merge sort */
112
122
struct merge_file_struct {
113
 
        int     fd;             /*!< file descriptor */
114
 
        ulint   offset;         /*!< file offset */
 
123
        int             fd;             /*!< file descriptor */
 
124
        ulint           offset;         /*!< file offset (end of file) */
 
125
        ib_uint64_t     n_rec;          /*!< number of records in the file */
115
126
};
116
127
 
117
128
/** Information about temporary files used in merge sort */
683
694
        ib_uint64_t     ofs = ((ib_uint64_t) offset) * sizeof *buf;
684
695
        ibool           success;
685
696
 
 
697
#ifdef UNIV_DEBUG
 
698
        if (row_merge_print_block_read) {
 
699
                fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
 
700
                        fd, (ulong) offset);
 
701
        }
 
702
#endif /* UNIV_DEBUG */
 
703
 
686
704
        success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
687
705
                                                 (ulint) (ofs & 0xFFFFFFFF),
688
706
                                                 (ulint) (ofs >> 32),
710
728
        ib_uint64_t     ofs = ((ib_uint64_t) offset)
711
729
                * sizeof(row_merge_block_t);
712
730
 
 
731
#ifdef UNIV_DEBUG
 
732
        if (row_merge_print_block_write) {
 
733
                fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
 
734
                        fd, (ulong) offset);
 
735
        }
 
736
#endif /* UNIV_DEBUG */
 
737
 
713
738
        return(UNIV_LIKELY(os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
714
739
                                         (ulint) (ofs & 0xFFFFFFFF),
715
740
                                         (ulint) (ofs >> 32),
719
744
/********************************************************************//**
720
745
Read a merge record.
721
746
@return pointer to next record, or NULL on I/O error or end of list */
722
 
static
 
747
static __attribute__((nonnull))
723
748
const byte*
724
749
row_merge_read_rec(
725
750
/*===============*/
1071
1096
Reads the clustered index of the table and creates temporary files
1072
1097
containing the index entries for the indexes to be built.
1073
1098
@return DB_SUCCESS or error */
1074
 
static
 
1099
static __attribute__((nonnull))
1075
1100
ulint
1076
1101
row_merge_read_clustered_index(
1077
1102
/*===========================*/
1176
1201
                in order to release the latch on the old page. */
1177
1202
 
1178
1203
                if (btr_pcur_is_after_last_on_page(&pcur)) {
 
1204
                        if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1205
                                err = DB_INTERRUPTED;
 
1206
                                trx->error_key_num = 0;
 
1207
                                goto func_exit;
 
1208
                        }
 
1209
 
1179
1210
                        btr_pcur_store_position(&pcur, &mtr);
1180
1211
                        mtr_commit(&mtr);
1181
1212
                        mtr_start(&mtr);
1215
1246
 
1216
1247
                                        if (dfield_is_null(field)) {
1217
1248
                                                err = DB_PRIMARY_KEY_IS_NULL;
1218
 
                                                i = 0;
1219
 
                                                trx->error_key_num = i;
 
1249
                                                trx->error_key_num = 0;
1220
1250
                                                goto func_exit;
1221
1251
                                        }
1222
1252
 
1235
1265
 
1236
1266
                        if (UNIV_LIKELY
1237
1267
                            (row && row_merge_buf_add(buf, row, ext))) {
 
1268
                                file->n_rec++;
1238
1269
                                continue;
1239
1270
                        }
1240
1271
 
1256
1287
 
1257
1288
                                        if (dup.n_dup) {
1258
1289
                                                err = DB_DUPLICATE_KEY;
1259
 
err_exit:
1260
1290
                                                trx->error_key_num = i;
1261
1291
                                                goto func_exit;
1262
1292
                                        }
1270
1300
                        if (!row_merge_write(file->fd, file->offset++,
1271
1301
                                             block)) {
1272
1302
                                err = DB_OUT_OF_FILE_SPACE;
1273
 
                                goto err_exit;
 
1303
                                trx->error_key_num = i;
 
1304
                                goto func_exit;
1274
1305
                        }
1275
1306
 
1276
1307
                        UNIV_MEM_INVALID(block[0], sizeof block[0]);
1277
1308
                        merge_buf[i] = row_merge_buf_empty(buf);
1278
1309
 
1279
 
                        /* Try writing the record again, now that
1280
 
                        the buffer has been written out and emptied. */
1281
 
 
1282
 
                        if (UNIV_UNLIKELY
1283
 
                            (row && !row_merge_buf_add(buf, row, ext))) {
1284
 
                                /* An empty buffer should have enough
1285
 
                                room for at least one record. */
1286
 
                                ut_error;
 
1310
                        if (UNIV_LIKELY(row != NULL)) {
 
1311
                                /* Try writing the record again, now
 
1312
                                that the buffer has been written out
 
1313
                                and emptied. */
 
1314
 
 
1315
                                if (UNIV_UNLIKELY
 
1316
                                    (!row_merge_buf_add(buf, row, ext))) {
 
1317
                                        /* An empty buffer should have enough
 
1318
                                        room for at least one record. */
 
1319
                                        ut_error;
 
1320
                                }
 
1321
 
 
1322
                                file->n_rec++;
1287
1323
                        }
1288
1324
                }
1289
1325
 
1322
1358
                b2 = row_merge_write_rec(&block[2], &buf[2], b2,        \
1323
1359
                                         of->fd, &of->offset,           \
1324
1360
                                         mrec##N, offsets##N);          \
1325
 
                if (UNIV_UNLIKELY(!b2)) {                               \
 
1361
                if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {  \
1326
1362
                        goto corrupt;                                   \
1327
1363
                }                                                       \
1328
1364
                b##N = row_merge_read_rec(&block[N], &buf[N],           \
1338
1374
        } while (0)
1339
1375
 
1340
1376
/*************************************************************//**
1341
 
Merge two blocks of linked lists on disk and write a bigger block.
 
1377
Merge two blocks of records on disk and write a bigger block.
1342
1378
@return DB_SUCCESS or error code */
1343
1379
static
1344
1380
ulint
1345
1381
row_merge_blocks(
1346
1382
/*=============*/
1347
1383
        const dict_index_t*     index,  /*!< in: index being created */
1348
 
        merge_file_t*           file,   /*!< in/out: file containing
 
1384
        const merge_file_t*     file,   /*!< in: file containing
1349
1385
                                        index entries */
1350
1386
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
1351
1387
        ulint*                  foffs0, /*!< in/out: offset of first
1368
1404
        ulint*          offsets0;/* offsets of mrec0 */
1369
1405
        ulint*          offsets1;/* offsets of mrec1 */
1370
1406
 
 
1407
#ifdef UNIV_DEBUG
 
1408
        if (row_merge_print_block) {
 
1409
                fprintf(stderr,
 
1410
                        "row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
 
1411
                        " = fd=%d ofs=%lu\n",
 
1412
                        file->fd, (ulong) *foffs0,
 
1413
                        file->fd, (ulong) *foffs1,
 
1414
                        of->fd, (ulong) of->offset);
 
1415
        }
 
1416
#endif /* UNIV_DEBUG */
 
1417
 
1371
1418
        heap = row_merge_heap_create(index, &offsets0, &offsets1);
1372
1419
 
1373
1420
        /* Write a record and read the next record.  Split the output
1440
1487
}
1441
1488
 
1442
1489
/*************************************************************//**
 
1490
Copy a block of index entries.
 
1491
@return TRUE on success, FALSE on failure */
 
1492
static __attribute__((nonnull))
 
1493
ibool
 
1494
row_merge_blocks_copy(
 
1495
/*==================*/
 
1496
        const dict_index_t*     index,  /*!< in: index being created */
 
1497
        const merge_file_t*     file,   /*!< in: input file */
 
1498
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
 
1499
        ulint*                  foffs0, /*!< in/out: input file offset */
 
1500
        merge_file_t*           of)     /*!< in/out: output file */
 
1501
{
 
1502
        mem_heap_t*     heap;   /*!< memory heap for offsets0, offsets1 */
 
1503
 
 
1504
        mrec_buf_t      buf[3]; /*!< buffer for handling
 
1505
                                split mrec in block[] */
 
1506
        const byte*     b0;     /*!< pointer to block[0] */
 
1507
        byte*           b2;     /*!< pointer to block[2] */
 
1508
        const mrec_t*   mrec0;  /*!< merge rec, points to block[0] */
 
1509
        ulint*          offsets0;/* offsets of mrec0 */
 
1510
        ulint*          offsets1;/* dummy offsets */
 
1511
 
 
1512
#ifdef UNIV_DEBUG
 
1513
        if (row_merge_print_block) {
 
1514
                fprintf(stderr,
 
1515
                        "row_merge_blocks_copy fd=%d ofs=%lu"
 
1516
                        " = fd=%d ofs=%lu\n",
 
1517
                        file->fd, (ulong) foffs0,
 
1518
                        of->fd, (ulong) of->offset);
 
1519
        }
 
1520
#endif /* UNIV_DEBUG */
 
1521
 
 
1522
        heap = row_merge_heap_create(index, &offsets0, &offsets1);
 
1523
 
 
1524
        /* Write a record and read the next record.  Split the output
 
1525
        file in two halves, which can be merged on the following pass. */
 
1526
 
 
1527
        if (!row_merge_read(file->fd, *foffs0, &block[0])) {
 
1528
corrupt:
 
1529
                mem_heap_free(heap);
 
1530
                return(FALSE);
 
1531
        }
 
1532
 
 
1533
        b0 = block[0];
 
1534
        b2 = block[2];
 
1535
 
 
1536
        b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
 
1537
                                foffs0, &mrec0, offsets0);
 
1538
        if (UNIV_UNLIKELY(!b0 && mrec0)) {
 
1539
 
 
1540
                goto corrupt;
 
1541
        }
 
1542
 
 
1543
        if (mrec0) {
 
1544
                /* append all mrec0 to output */
 
1545
                for (;;) {
 
1546
                        ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
 
1547
                }
 
1548
        }
 
1549
done0:
 
1550
 
 
1551
        /* The file offset points to the beginning of the last page
 
1552
        that has been read.  Update it to point to the next block. */
 
1553
        (*foffs0)++;
 
1554
 
 
1555
        mem_heap_free(heap);
 
1556
        return(row_merge_write_eof(&block[2], b2, of->fd, &of->offset)
 
1557
               != NULL);
 
1558
}
 
1559
 
 
1560
/*************************************************************//**
1443
1561
Merge disk files.
1444
1562
@return DB_SUCCESS or error code */
1445
 
static
 
1563
static __attribute__((nonnull))
1446
1564
ulint
1447
1565
row_merge(
1448
1566
/*======*/
 
1567
        trx_t*                  trx,    /*!< in: transaction */
1449
1568
        const dict_index_t*     index,  /*!< in: index being created */
1450
1569
        merge_file_t*           file,   /*!< in/out: file containing
1451
1570
                                        index entries */
1452
 
        ulint                   half,   /*!< in: half the file */
 
1571
        ulint*                  half,   /*!< in/out: half the file */
1453
1572
        row_merge_block_t*      block,  /*!< in/out: 3 buffers */
1454
1573
        int*                    tmpfd,  /*!< in/out: temporary file handle */
1455
1574
        TABLE*                  table)  /*!< in/out: MySQL table, for
1460
1579
        ulint           foffs1; /*!< second input offset */
1461
1580
        ulint           error;  /*!< error code */
1462
1581
        merge_file_t    of;     /*!< output file */
 
1582
        const ulint     ihalf   = *half;
 
1583
                                /*!< half the input file */
 
1584
        ulint           ohalf;  /*!< half the output file */
1463
1585
 
1464
1586
        UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
1465
 
        ut_ad(half > 0);
 
1587
        ut_ad(ihalf < file->offset);
1466
1588
 
1467
1589
        of.fd = *tmpfd;
1468
1590
        of.offset = 0;
 
1591
        of.n_rec = 0;
1469
1592
 
1470
1593
        /* Merge blocks to the output file. */
 
1594
        ohalf = 0;
1471
1595
        foffs0 = 0;
1472
 
        foffs1 = half;
1473
 
 
1474
 
        for (; foffs0 < half && foffs1 < file->offset; foffs0++, foffs1++) {
 
1596
        foffs1 = ihalf;
 
1597
 
 
1598
        for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
 
1599
                ulint   ahalf;  /*!< arithmetic half the input file */
 
1600
 
 
1601
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1602
                        return(DB_INTERRUPTED);
 
1603
                }
 
1604
 
1475
1605
                error = row_merge_blocks(index, file, block,
1476
1606
                                         &foffs0, &foffs1, &of, table);
1477
1607
 
1478
1608
                if (error != DB_SUCCESS) {
1479
1609
                        return(error);
1480
1610
                }
 
1611
 
 
1612
                /* Record the offset of the output file when
 
1613
                approximately half the output has been generated.  In
 
1614
                this way, the next invocation of row_merge() will
 
1615
                spend most of the time in this loop.  The initial
 
1616
                estimate is ohalf==0. */
 
1617
                ahalf = file->offset / 2;
 
1618
                ut_ad(ohalf <= of.offset);
 
1619
 
 
1620
                /* Improve the estimate until reaching half the input
 
1621
                file size, or we can not get any closer to it.  All
 
1622
                comparands should be non-negative when !(ohalf < ahalf)
 
1623
                because ohalf <= of.offset. */
 
1624
                if (ohalf < ahalf || of.offset - ahalf < ohalf - ahalf) {
 
1625
                        ohalf = of.offset;
 
1626
                }
1481
1627
        }
1482
1628
 
1483
 
        /* Copy the last block, if there is one. */
1484
 
        while (foffs0 < half) {
1485
 
                if (!row_merge_read(file->fd, foffs0++, block)
1486
 
                    || !row_merge_write(of.fd, of.offset++, block)) {
 
1629
        /* Copy the last blocks, if there are any. */
 
1630
 
 
1631
        while (foffs0 < ihalf) {
 
1632
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1633
                        return(DB_INTERRUPTED);
 
1634
                }
 
1635
 
 
1636
                if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
1487
1637
                        return(DB_CORRUPTION);
1488
1638
                }
1489
1639
        }
 
1640
 
 
1641
        ut_ad(foffs0 == ihalf);
 
1642
 
1490
1643
        while (foffs1 < file->offset) {
1491
 
                if (!row_merge_read(file->fd, foffs1++, block)
1492
 
                    || !row_merge_write(of.fd, of.offset++, block)) {
 
1644
                if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
 
1645
                        return(DB_INTERRUPTED);
 
1646
                }
 
1647
 
 
1648
                if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
1493
1649
                        return(DB_CORRUPTION);
1494
1650
                }
1495
1651
        }
1496
1652
 
 
1653
        ut_ad(foffs1 == file->offset);
 
1654
 
 
1655
        if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
 
1656
                return(DB_CORRUPTION);
 
1657
        }
 
1658
 
1497
1659
        /* Swap file descriptors for the next pass. */
1498
1660
        *tmpfd = file->fd;
1499
1661
        *file = of;
 
1662
        *half = ohalf;
1500
1663
 
1501
1664
        UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);
1502
1665
 
1510
1673
ulint
1511
1674
row_merge_sort(
1512
1675
/*===========*/
 
1676
        trx_t*                  trx,    /*!< in: transaction */
1513
1677
        const dict_index_t*     index,  /*!< in: index being created */
1514
1678
        merge_file_t*           file,   /*!< in/out: file containing
1515
1679
                                        index entries */
1519
1683
                                        reporting erroneous key value
1520
1684
                                        if applicable */
1521
1685
{
1522
 
        ulint   blksz;  /*!< block size */
1523
 
 
1524
 
        for (blksz = 1; blksz < file->offset; blksz *= 2) {
1525
 
                ulint   half;
 
1686
        ulint   half = file->offset / 2;
 
1687
 
 
1688
        /* The file should always contain at least one byte (the end
 
1689
        of file marker).  Thus, it must be at least one block. */
 
1690
        ut_ad(file->offset > 0);
 
1691
 
 
1692
        do {
1526
1693
                ulint   error;
1527
1694
 
1528
 
                ut_ad(ut_is_2pow(blksz));
1529
 
                half = ut_2pow_round((file->offset + (blksz - 1)) / 2, blksz);
1530
 
                error = row_merge(index, file, half, block, tmpfd, table);
 
1695
                error = row_merge(trx, index, file, &half,
 
1696
                                  block, tmpfd, table);
1531
1697
 
1532
1698
                if (error != DB_SUCCESS) {
1533
1699
                        return(error);
1534
1700
                }
1535
 
        }
 
1701
 
 
1702
                /* half > 0 should hold except when the file consists
 
1703
                of one block.  No need to merge further then. */
 
1704
                ut_ad(half > 0 || file->offset == 1);
 
1705
        } while (half < file->offset && half > 0);
1536
1706
 
1537
1707
        return(DB_SUCCESS);
1538
1708
}
1799
1969
        static const char str1[] =
1800
1970
                "PROCEDURE DROP_INDEX_PROC () IS\n"
1801
1971
                "BEGIN\n"
 
1972
                /* Rename the index, so that it will be dropped by
 
1973
                row_merge_drop_temp_indexes() at crash recovery
 
1974
                if the server crashes before this trx is committed. */
 
1975
                "UPDATE SYS_INDEXES SET NAME=CONCAT('"
 
1976
                TEMP_INDEX_PREFIX_STR "', NAME) WHERE ID = :indexid;\n"
 
1977
                "COMMIT WORK;\n"
 
1978
                /* Drop the field definitions of the index. */
1802
1979
                "DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
 
1980
                /* Drop the index definition and the B-tree. */
1803
1981
                "DELETE FROM SYS_INDEXES WHERE ID = :indexid\n"
1804
1982
                "               AND TABLE_ID = :tableid;\n"
1805
1983
                "END;\n";
1912
2090
{
1913
2091
        merge_file->fd = innobase_mysql_tmpfile();
1914
2092
        merge_file->offset = 0;
 
2093
        merge_file->n_rec = 0;
1915
2094
}
1916
2095
 
1917
2096
/*********************************************************************//**
2132
2311
        if (err != DB_SUCCESS) {
2133
2312
err_exit:
2134
2313
                trx->error_state = DB_SUCCESS;
2135
 
                trx_general_rollback_for_mysql(trx, FALSE, NULL);
 
2314
                trx_general_rollback_for_mysql(trx, NULL);
2136
2315
                trx->error_state = DB_SUCCESS;
2137
2316
        }
2138
2317
 
2334
2513
        sorting and inserting. */
2335
2514
 
2336
2515
        for (i = 0; i < n_indexes; i++) {
2337
 
                error = row_merge_sort(indexes[i], &merge_files[i],
 
2516
                error = row_merge_sort(trx, indexes[i], &merge_files[i],
2338
2517
                                       block, &tmpfd, table);
2339
2518
 
2340
2519
                if (error == DB_SUCCESS) {