~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/myisam/mi_check.cc

  • Committer: Brian Aker
  • Date: 2009-08-06 15:04:21 UTC
  • mfrom: (1093.1.52 captain)
  • Revision ID: brian@gaz-20090806150421-w8yrasl1m8exorxs
Merge Jay

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/* Describe, check and repair of MyISAM tables */
17
17
 
40
40
  only. And it is sufficient to calculate the checksum once only.
41
41
*/
42
42
 
43
 
#include "myisam_priv.h"
44
 
#include "drizzled/internal/m_string.h"
 
43
#include "myisamdef.h"
 
44
#include <mystrings/m_string.h>
45
45
#include <stdarg.h>
 
46
#include <mysys/my_getopt.h>
46
47
#ifdef HAVE_SYS_VADVISE_H
47
48
#include <sys/vadvise.h>
48
49
#endif
53
54
#include <sys/mman.h>
54
55
#endif
55
56
#include <drizzled/util/test.h>
56
 
#include "drizzled/error.h"
57
57
 
58
58
#include <algorithm>
59
59
 
60
60
using namespace std;
61
 
using namespace drizzled;
62
 
using namespace drizzled::internal;
63
 
 
64
 
 
65
 
#define my_off_t2double(A)  ((double) (my_off_t) (A))
 
61
 
66
62
 
67
63
/* Functions defined in this file */
68
64
 
74
70
static ha_checksum calc_checksum(ha_rows count);
75
71
static int writekeys(MI_SORT_PARAM *sort_param);
76
72
static int sort_one_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo,
77
 
                          my_off_t pagepos, int new_file);
78
 
int sort_key_read(MI_SORT_PARAM *sort_param,void *key);
79
 
int sort_get_next_record(MI_SORT_PARAM *sort_param);
80
 
int sort_key_cmp(MI_SORT_PARAM *sort_param, const void *a,const void *b);
81
 
int sort_key_write(MI_SORT_PARAM *sort_param, const void *a);
82
 
my_off_t get_record_for_key(MI_INFO *info,MI_KEYDEF *keyinfo,
83
 
                            unsigned char *key);
84
 
int sort_insert_key(MI_SORT_PARAM  *sort_param,
85
 
                    register SORT_KEY_BLOCKS *key_block,
86
 
                    unsigned char *key, my_off_t prev_block);
87
 
int sort_delete_record(MI_SORT_PARAM *sort_param);
88
 
 
 
73
                          my_off_t pagepos, File new_file);
 
74
extern "C"
 
75
{
 
76
  int sort_key_read(MI_SORT_PARAM *sort_param,void *key);
 
77
  int sort_get_next_record(MI_SORT_PARAM *sort_param);
 
78
  int sort_key_cmp(MI_SORT_PARAM *sort_param, const void *a,const void *b);
 
79
  int sort_key_write(MI_SORT_PARAM *sort_param, const void *a);
 
80
  my_off_t get_record_for_key(MI_INFO *info,MI_KEYDEF *keyinfo,
 
81
                              unsigned char *key);
 
82
  int sort_insert_key(MI_SORT_PARAM  *sort_param,
 
83
                      register SORT_KEY_BLOCKS *key_block,
 
84
                      unsigned char *key, my_off_t prev_block);
 
85
  int sort_delete_record(MI_SORT_PARAM *sort_param);
 
86
}
89
87
/*static int flush_pending_blocks(MI_CHECK *param);*/
90
88
static SORT_KEY_BLOCKS  *alloc_key_blocks(MI_CHECK *param, uint32_t blocks,
91
89
                                          uint32_t buffer_length);
258
256
  unsigned char *buff;
259
257
 
260
258
  if (param->testflag & T_VERBOSE)
261
 
    printf("block_size %4u:", block_size);
 
259
    printf("block_size %4u:", block_size); /* purecov: tested */
262
260
 
263
261
  next_link=info->s->state.key_del[nr];
264
262
  records= (ha_rows) (info->state->key_file_length / block_size);
272
270
    /* Key blocks must lay within the key file length entirely. */
273
271
    if (next_link + block_size > info->state->key_file_length)
274
272
    {
 
273
      /* purecov: begin tested */
275
274
      mi_check_print_error(param, "Invalid key block position: %s  "
276
275
                           "key block size: %u  file_length: %s",
277
276
                           llstr(next_link, llbuff), block_size,
278
277
                           llstr(info->state->key_file_length, llbuff2));
279
278
      return(1);
 
279
      /* purecov: end */
280
280
    }
281
281
 
282
282
    /* Key blocks must be aligned at MI_MIN_KEY_BLOCK_LENGTH. */
283
283
    if (next_link & (MI_MIN_KEY_BLOCK_LENGTH - 1))
284
284
    {
 
285
      /* purecov: begin tested */
285
286
      mi_check_print_error(param, "Mis-aligned key block: %s  "
286
287
                           "minimum key block length: %u",
287
288
                           llstr(next_link, llbuff), MI_MIN_KEY_BLOCK_LENGTH);
288
289
      return(1);
 
290
      /* purecov: end */
289
291
    }
290
292
 
291
293
    /*
293
295
      If the key cache block size is smaller than block_size, we can so
294
296
      avoid unecessary eviction of cache block.
295
297
    */
296
 
    if (!(buff=key_cache_read(info->s->getKeyCache(),
 
298
    if (!(buff=key_cache_read(info->s->key_cache,
297
299
                              info->s->kfile, next_link, DFLT_INIT_HITS,
298
300
                              (unsigned char*) info->buff, MI_MIN_KEY_BLOCK_LENGTH,
299
301
                              MI_MIN_KEY_BLOCK_LENGTH, 1)))
300
302
    {
 
303
      /* purecov: begin tested */
301
304
      mi_check_print_error(param, "key cache read error for block: %s",
302
305
                           llstr(next_link,llbuff));
303
306
      return(1);
 
307
      /* purecov: end */
304
308
    }
305
309
    next_link=mi_sizekorr(buff);
306
310
    records--;
328
332
  if (!(param->testflag & T_SILENT)) puts("- check file-size");
329
333
 
330
334
  /* The following is needed if called externally (not from myisamchk) */
331
 
  flush_key_blocks(info->s->getKeyCache(),
 
335
  flush_key_blocks(info->s->key_cache,
332
336
                   info->s->kfile, FLUSH_FORCE_WRITE);
333
337
 
334
338
  size= lseek(info->s->kfile, 0, SEEK_END);
459
463
                  llstr(share->state.key_root[key],buff));
460
464
      if (!(param->testflag & T_INFO))
461
465
        return(-1);
462
 
      result= UINT32_MAX;
 
466
      result= -1;
463
467
      continue;
464
468
    }
465
469
    param->key_file_blocks+=keyinfo->block_length;
478
482
                    llstr(info->state->records,buff2));
479
483
        if (!(param->testflag & T_INFO))
480
484
        return(-1);
481
 
        result= UINT32_MAX;
 
485
        result= -1;
482
486
        continue;
483
487
      }
484
488
      if (found_keys - full_text_keys == 1 &&
495
499
          mi_check_print_error(param,"Key 1 doesn't point at all records");
496
500
        if (!(param->testflag & T_INFO))
497
501
          return(-1);
498
 
        result= UINT32_MAX;
 
502
        result= -1;
499
503
        continue;
500
504
      }
501
505
    }
585
589
  /* Key blocks must lay within the key file length entirely. */
586
590
  if (page + keyinfo->block_length > info->state->key_file_length)
587
591
  {
 
592
    /* purecov: begin tested */
588
593
    /* Give it a chance to fit in the real file size. */
589
594
    my_off_t max_length= lseek(info->s->kfile, 0, SEEK_END);
590
595
    mi_check_print_error(param, "Invalid key block position: %s  "
596
601
    /* Fix the remebered key file length. */
597
602
    info->state->key_file_length= (max_length &
598
603
                                   ~ (my_off_t) (keyinfo->block_length - 1));
 
604
    /* purecov: end */
599
605
  }
600
606
 
601
607
  /* Key blocks must be aligned at MI_MIN_KEY_BLOCK_LENGTH. */
602
608
  if (page & (MI_MIN_KEY_BLOCK_LENGTH - 1))
603
609
  {
 
610
    /* purecov: begin tested */
604
611
    mi_check_print_error(param, "Mis-aligned key block: %s  "
605
612
                         "minimum key block length: %u",
606
613
                         llstr(page, llbuff), MI_MIN_KEY_BLOCK_LENGTH);
607
614
    goto err;
 
615
    /* purecov: end */
608
616
  }
609
617
 
610
618
  if (!_mi_fetch_keypage(info,keyinfo,page, DFLT_INIT_HITS,buff,0))
619
627
 
620
628
  return(0);
621
629
 
 
630
  /* purecov: begin tested */
622
631
err:
623
632
  return(1);
 
633
  /* purecov: end */
624
634
}
625
635
 
626
636
 
894
904
      puts("- check record links");
895
905
  }
896
906
 
897
 
  if (!mi_alloc_rec_buff(info, SIZE_MAX, &record))
 
907
  if (!mi_alloc_rec_buff(info, -1, &record))
898
908
  {
899
909
    mi_check_print_error(param,"Not enough memory for record");
900
910
    return(-1);
1186
1196
  }
1187
1197
  else if (param->glob_crc != info->state->checksum &&
1188
1198
           (info->s->options &
1189
 
            (HA_OPTION_COMPRESS_RECORD)))
 
1199
            (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD)))
1190
1200
  {
1191
1201
    mi_check_print_warning(param,
1192
1202
                           "Record checksum is not the same as checksum stored in the index file\n");
1265
1275
  free(mi_get_rec_buff_ptr(info, record));
1266
1276
  return (error);
1267
1277
 err:
1268
 
  mi_check_print_error(param,"got error: %d when reading datafile at record: %s",errno, llstr(records,llbuff));
 
1278
  mi_check_print_error(param,"got error: %d when reading datafile at record: %s",my_errno, llstr(records,llbuff));
1269
1279
 err2:
1270
1280
  free(mi_get_rec_buff_ptr(info, record));
1271
1281
  param->testflag|=T_RETRY_WITHOUT_QUICK;
1313
1323
 
1314
1324
    However, there is an exception. Sometimes MySQL disables non-unique
1315
1325
    indexes when the table is empty (e.g. when copying a table in
1316
 
    drizzled::alter_table()). When enabling the non-unique indexes, they
 
1326
    mysql_alter_table()). When enabling the non-unique indexes, they
1317
1327
    are still empty. So there is no index block that can be lost. This
1318
1328
    optimization is implemented in this function.
1319
1329
 
1362
1372
        Flush dirty blocks of this index file from key cache and remove
1363
1373
        all blocks of this index file from key cache.
1364
1374
      */
1365
 
      error= flush_key_blocks(share->getKeyCache(), share->kfile,
 
1375
      error= flush_key_blocks(share->key_cache, share->kfile,
1366
1376
                              FLUSH_FORCE_WRITE);
1367
1377
      goto end;
1368
1378
    }
1375
1385
  }
1376
1386
 
1377
1387
  /* Remove all key blocks of this index file from key cache. */
1378
 
  if ((error= flush_key_blocks(share->getKeyCache(), share->kfile,
 
1388
  if ((error= flush_key_blocks(share->key_cache, share->kfile,
1379
1389
                               FLUSH_IGNORE_CHANGED)))
1380
 
    goto end;
 
1390
    goto end; /* purecov: inspected */
1381
1391
 
1382
1392
  /* Clear index root block pointers. */
1383
1393
  for (i= 0; i < share->base.keys; i++)
1406
1416
  int error,got_error;
1407
1417
  ha_rows start_records,new_header_length;
1408
1418
  my_off_t del;
1409
 
  int new_file;
 
1419
  File new_file;
1410
1420
  MYISAM_SHARE *share=info->s;
1411
1421
  char llbuff[22],llbuff2[22];
1412
1422
  SORT_INFO sort_info;
1428
1438
  }
1429
1439
  param->testflag|=T_REP; /* for easy checking */
1430
1440
 
1431
 
  if (info->s->options & (HA_OPTION_COMPRESS_RECORD))
 
1441
  if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
1432
1442
    param->testflag|=T_CALC_CHECKSUM;
1433
1443
 
1434
1444
  if (!param->using_global_keycache)
1435
 
    assert(0);
 
1445
    init_key_cache(dflt_key_cache, param->key_cache_block_size,
 
1446
                   (size_t)param->use_buffers, 0, 0);
1436
1447
 
1437
 
  if (param->read_cache.init_io_cache(info->dfile, (uint) param->read_buffer_length, READ_CACHE,share->pack.header_length,1,MYF(MY_WME)))
 
1448
  if (init_io_cache(&param->read_cache,info->dfile,
 
1449
                    (uint) param->read_buffer_length,
 
1450
                    READ_CACHE,share->pack.header_length,1,MYF(MY_WME)))
1438
1451
  {
1439
1452
    memset(&info->rec_cache, 0, sizeof(info->rec_cache));
1440
1453
    goto err;
1441
1454
  }
1442
 
  if (not rep_quick)
1443
 
  {
1444
 
    if (info->rec_cache.init_io_cache(-1, (uint) param->write_buffer_length, WRITE_CACHE, new_header_length, 1, MYF(MY_WME | MY_WAIT_IF_FULL)))
1445
 
    {
 
1455
  if (!rep_quick)
 
1456
    if (init_io_cache(&info->rec_cache,-1,(uint) param->write_buffer_length,
 
1457
                      WRITE_CACHE, new_header_length, 1,
 
1458
                      MYF(MY_WME | MY_WAIT_IF_FULL)))
1446
1459
      goto err;
1447
 
    }
1448
 
  }
1449
1460
  info->opt_flag|=WRITE_CACHE_USED;
1450
 
  if (!mi_alloc_rec_buff(info, SIZE_MAX, &sort_param.record) ||
1451
 
      !mi_alloc_rec_buff(info, SIZE_MAX, &sort_param.rec_buff))
 
1461
  if (!mi_alloc_rec_buff(info, -1, &sort_param.record) ||
 
1462
      !mi_alloc_rec_buff(info, -1, &sort_param.rec_buff))
1452
1463
  {
1453
1464
    mi_check_print_error(param, "Not enough memory for extra record");
1454
1465
    goto err;
1457
1468
  if (!rep_quick)
1458
1469
  {
1459
1470
    /* Get real path for data file */
1460
 
    if ((new_file=my_create(internal::fn_format(param->temp_filename,
 
1471
    if ((new_file=my_create(fn_format(param->temp_filename,
1461
1472
                                      share->data_file_name, "",
1462
1473
                                      DATA_TMP_EXT, 2+4),
1463
1474
                            0,param->tmpfile_createflag,
1513
1524
  {
1514
1525
    if (writekeys(&sort_param))
1515
1526
    {
1516
 
      if (errno != HA_ERR_FOUND_DUPP_KEY)
 
1527
      if (my_errno != HA_ERR_FOUND_DUPP_KEY)
1517
1528
        goto err;
1518
1529
      mi_check_print_info(param,"Duplicate key %2d for record at %10s against new record at %10s",
1519
1530
                          info->errkey+1,
1548
1559
  {
1549
1560
    mi_check_print_warning(param,
1550
1561
                           "Can't change size of indexfile, error: %d",
1551
 
                           errno);
 
1562
                           my_errno);
1552
1563
    goto err;
1553
1564
  }
1554
1565
 
1574
1585
 
1575
1586
  if (!rep_quick)
1576
1587
  {
1577
 
    internal::my_close(info->dfile,MYF(0));
 
1588
    my_close(info->dfile,MYF(0));
1578
1589
    info->dfile=new_file;
1579
1590
    info->state->data_file_length=sort_param.filepos;
1580
1591
    share->state.version=(ulong) time((time_t*) 0);     /* Force reopen */
1607
1618
    /* Replace the actual file with the temporary file */
1608
1619
    if (new_file >= 0)
1609
1620
    {
1610
 
      internal::my_close(new_file,MYF(0));
 
1621
      my_close(new_file,MYF(0));
1611
1622
      info->dfile=new_file= -1;
1612
1623
      if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
1613
1624
                            DATA_TMP_EXT, share->base.raid_chunks,
1620
1631
  if (got_error)
1621
1632
  {
1622
1633
    if (! param->error_printed)
1623
 
      mi_check_print_error(param,"%d for record at pos %s",errno,
 
1634
      mi_check_print_error(param,"%d for record at pos %s",my_errno,
1624
1635
                  llstr(sort_param.start_recpos,llbuff));
1625
1636
    if (new_file >= 0)
1626
1637
    {
1627
 
      internal::my_close(new_file,MYF(0));
 
1638
      my_close(new_file,MYF(0));
1628
1639
      my_delete(param->temp_filename, MYF(MY_WME));
1629
1640
      info->rec_cache.file=-1; /* don't flush data to new_file, it's closed */
1630
1641
    }
1641
1652
  rec_buff_ptr= NULL;
1642
1653
 
1643
1654
  free(sort_info.buff);
1644
 
  param->read_cache.end_io_cache();
 
1655
  end_io_cache(&param->read_cache);
1645
1656
  info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
1646
 
  info->rec_cache.end_io_cache();
1647
 
  got_error|= flush_blocks(param, share->getKeyCache(), share->kfile);
1648
 
  if (not got_error && param->testflag & T_UNPACK)
 
1657
  end_io_cache(&info->rec_cache);
 
1658
  got_error|=flush_blocks(param, share->key_cache, share->kfile);
 
1659
  if (!got_error && param->testflag & T_UNPACK)
1649
1660
  {
1650
1661
    share->state.header.options[0]&= (unsigned char) ~HA_OPTION_COMPRESS_RECORD;
1651
1662
    share->pack.header_length=0;
1682
1693
  return(0);
1683
1694
 
1684
1695
 err:
1685
 
  if (errno == HA_ERR_FOUND_DUPP_KEY)
 
1696
  if (my_errno == HA_ERR_FOUND_DUPP_KEY)
1686
1697
  {
1687
1698
    info->errkey=(int) i;                       /* This key was found */
1688
1699
    while ( i-- > 0 )
1751
1762
 
1752
1763
        /* Tell system that we want all memory for our cache */
1753
1764
 
1754
 
void lock_memory(MI_CHECK *)
 
1765
void lock_memory(MI_CHECK *param)
1755
1766
{
 
1767
#ifdef SUN_OS                           /* Key-cacheing thrases on sun 4.1 */
 
1768
  if (param->opt_lock_memory)
 
1769
  {
 
1770
    int success = mlockall(MCL_CURRENT);        /* or plock(DATLOCK); */
 
1771
    if (geteuid() == 0 && success != 0)
 
1772
      mi_check_print_warning(param,
 
1773
                             "Failed to lock memory. errno %d",my_errno);
 
1774
  }
 
1775
#else
 
1776
  (void)param;
 
1777
#endif
1756
1778
} /* lock_memory */
1757
1779
 
1758
1780
 
1759
1781
        /* Flush all changed blocks to disk */
1760
1782
 
1761
 
int flush_blocks(MI_CHECK *param, KEY_CACHE *key_cache, int file)
 
1783
int flush_blocks(MI_CHECK *param, KEY_CACHE *key_cache, File file)
1762
1784
{
1763
1785
  if (flush_key_blocks(key_cache, file, FLUSH_RELEASE))
1764
1786
  {
1765
 
    mi_check_print_error(param,"%d when trying to write bufferts",errno);
 
1787
    mi_check_print_error(param,"%d when trying to write bufferts",my_errno);
1766
1788
    return(1);
1767
1789
  }
1768
1790
  if (!param->using_global_keycache)
1777
1799
{
1778
1800
  register uint32_t key;
1779
1801
  register MI_KEYDEF *keyinfo;
1780
 
  int new_file;
 
1802
  File new_file;
1781
1803
  my_off_t index_pos[HA_MAX_POSSIBLE_KEY];
1782
1804
  uint32_t r_locks,w_locks;
1783
1805
  int old_lock;
1792
1814
    printf("- Sorting index for MyISAM-table '%s'\n",name);
1793
1815
 
1794
1816
  /* Get real path for index file */
1795
 
  internal::fn_format(param->temp_filename,name,"", MI_NAME_IEXT,2+4+32);
1796
 
  if ((new_file=my_create(internal::fn_format(param->temp_filename,param->temp_filename,
 
1817
  fn_format(param->temp_filename,name,"", MI_NAME_IEXT,2+4+32);
 
1818
  if ((new_file=my_create(fn_format(param->temp_filename,param->temp_filename,
1797
1819
                                    "", INDEX_TMP_EXT,2+4),
1798
1820
                          0,param->tmpfile_createflag,MYF(0))) <= 0)
1799
1821
  {
1824
1846
  }
1825
1847
 
1826
1848
  /* Flush key cache for this file if we are calling this outside myisamchk */
1827
 
  flush_key_blocks(share->getKeyCache(), share->kfile, FLUSH_IGNORE_CHANGED);
 
1849
  flush_key_blocks(share->key_cache,share->kfile, FLUSH_IGNORE_CHANGED);
1828
1850
 
1829
1851
  share->state.version=(ulong) time((time_t*) 0);
1830
1852
  old_state= share->state;                      /* save state if not stored */
1835
1857
        /* Put same locks as old file */
1836
1858
  share->r_locks= share->w_locks= share->tot_locks= 0;
1837
1859
  (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
1838
 
  internal::my_close(share->kfile,MYF(MY_WME));
 
1860
  my_close(share->kfile,MYF(MY_WME));
1839
1861
  share->kfile = -1;
1840
 
  internal::my_close(new_file,MYF(MY_WME));
 
1862
  my_close(new_file,MYF(MY_WME));
1841
1863
  if (change_to_newfile(share->index_file_name,MI_NAME_IEXT,INDEX_TMP_EXT,0,
1842
1864
                        MYF(0)) ||
1843
1865
      mi_open_keyfile(share))
1861
1883
  return(0);
1862
1884
 
1863
1885
err:
1864
 
  internal::my_close(new_file,MYF(MY_WME));
 
1886
  my_close(new_file,MYF(MY_WME));
1865
1887
err2:
1866
1888
  my_delete(param->temp_filename,MYF(MY_WME));
1867
1889
  return(-1);
1871
1893
         /* Sort records recursive using one index */
1872
1894
 
1873
1895
static int sort_one_index(MI_CHECK *param, MI_INFO *info, MI_KEYDEF *keyinfo,
1874
 
                          my_off_t pagepos, int new_file)
 
1896
                          my_off_t pagepos, File new_file)
1875
1897
{
1876
1898
  uint32_t length,nod_flag,used_length, key_length;
1877
1899
  unsigned char *buff,*keypos,*endpos;
1922
1944
  if (my_pwrite(new_file,(unsigned char*) buff,(uint) keyinfo->block_length,
1923
1945
                new_page_pos,MYF(MY_NABP | MY_WAIT_IF_FULL)))
1924
1946
  {
1925
 
    mi_check_print_error(param,"Can't write indexblock, error: %d",errno);
 
1947
    mi_check_print_error(param,"Can't write indexblock, error: %d",my_errno);
1926
1948
    goto err;
1927
1949
  }
1928
1950
  free(buff);
1950
1972
  (void)raid_chunks;
1951
1973
  char old_filename[FN_REFLEN],new_filename[FN_REFLEN];
1952
1974
  /* Get real path to filename */
1953
 
  (void) internal::fn_format(old_filename,filename,"",old_ext,2+4+32);
 
1975
  (void) fn_format(old_filename,filename,"",old_ext,2+4+32);
1954
1976
  return my_redel(old_filename,
1955
 
                  internal::fn_format(new_filename,old_filename,"",new_ext,2+4),
 
1977
                  fn_format(new_filename,old_filename,"",new_ext,2+4),
1956
1978
                  MYF(MY_WME | MY_LINK_WARNING | MyFlags));
1957
1979
} /* change_to_newfile */
1958
1980
 
1960
1982
 
1961
1983
        /* Copy a block between two files */
1962
1984
 
1963
 
int filecopy(MI_CHECK *param, int to,int from,my_off_t start,
 
1985
int filecopy(MI_CHECK *param, File to,File from,my_off_t start,
1964
1986
             my_off_t length, const char *type)
1965
1987
{
1966
1988
  char tmp_buff[IO_SIZE],*buff;
1990
2012
  if (buff != tmp_buff)
1991
2013
    free(buff);
1992
2014
  mi_check_print_error(param,"Can't copy %s to tempfile, error %d",
1993
 
                       type,errno);
 
2015
                       type,my_errno);
1994
2016
  return(1);
1995
2017
}
1996
2018
 
2018
2040
  ulong length;
2019
2041
  ha_rows start_records;
2020
2042
  my_off_t new_header_length,del;
2021
 
  int new_file;
 
2043
  File new_file;
2022
2044
  MI_SORT_PARAM sort_param;
2023
2045
  MYISAM_SHARE *share=info->s;
2024
2046
  HA_KEYSEG *keyseg;
2039
2061
  }
2040
2062
  param->testflag|=T_REP; /* for easy checking */
2041
2063
 
2042
 
  if (info->s->options & (HA_OPTION_COMPRESS_RECORD))
 
2064
  if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
2043
2065
    param->testflag|=T_CALC_CHECKSUM;
2044
2066
 
2045
2067
  memset(&sort_info, 0, sizeof(sort_info));
2046
2068
  memset(&sort_param, 0, sizeof(sort_param));
2047
2069
  if (!(sort_info.key_block=
2048
 
        alloc_key_blocks(param, (uint) param->sort_key_blocks, share->base.max_key_block_length))
2049
 
      || param->read_cache.init_io_cache(info->dfile, (uint) param->read_buffer_length, READ_CACHE,share->pack.header_length,1,MYF(MY_WME))
2050
 
      || (! rep_quick && info->rec_cache.init_io_cache(info->dfile, (uint) param->write_buffer_length, WRITE_CACHE,new_header_length,1, MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw)))
2051
 
  {
 
2070
        alloc_key_blocks(param,
 
2071
                         (uint) param->sort_key_blocks,
 
2072
                         share->base.max_key_block_length))
 
2073
      || init_io_cache(&param->read_cache,info->dfile,
 
2074
                       (uint) param->read_buffer_length,
 
2075
                       READ_CACHE,share->pack.header_length,1,MYF(MY_WME)) ||
 
2076
      (! rep_quick &&
 
2077
       init_io_cache(&info->rec_cache,info->dfile,
 
2078
                     (uint) param->write_buffer_length,
 
2079
                     WRITE_CACHE,new_header_length,1,
 
2080
                     MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw)))
2052
2081
    goto err;
2053
 
  }
2054
2082
  sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
2055
2083
  info->opt_flag|=WRITE_CACHE_USED;
2056
2084
  info->rec_cache.file=info->dfile;             /* for sort_delete_record */
2057
2085
 
2058
 
  if (!mi_alloc_rec_buff(info, SIZE_MAX, &sort_param.record) ||
2059
 
      !mi_alloc_rec_buff(info, SIZE_MAX, &sort_param.rec_buff))
 
2086
  if (!mi_alloc_rec_buff(info, -1, &sort_param.record) ||
 
2087
      !mi_alloc_rec_buff(info, -1, &sort_param.rec_buff))
2060
2088
  {
2061
2089
    mi_check_print_error(param, "Not enough memory for extra record");
2062
2090
    goto err;
2064
2092
  if (!rep_quick)
2065
2093
  {
2066
2094
    /* Get real path for data file */
2067
 
    if ((new_file=my_create(internal::fn_format(param->temp_filename,
 
2095
    if ((new_file=my_create(fn_format(param->temp_filename,
2068
2096
                                      share->data_file_name, "",
2069
2097
                                      DATA_TMP_EXT, 2+4),
2070
2098
                            0,param->tmpfile_createflag,
2185
2213
    }
2186
2214
    /* No need to calculate checksum again. */
2187
2215
    sort_param.calc_checksum= 0;
2188
 
    sort_param.wordroot.free_root(MYF(0));
 
2216
    free_root(&sort_param.wordroot, MYF(0));
2189
2217
 
2190
2218
    /* Set for next loop */
2191
2219
    sort_info.max_records= (ha_rows) info->state->records;
2201
2229
    if (sort_param.fix_datafile)
2202
2230
    {
2203
2231
      param->read_cache.end_of_file=sort_param.filepos;
2204
 
      if (write_data_suffix(&sort_info, 1) || info->rec_cache.end_io_cache())
2205
 
      {
 
2232
      if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
2206
2233
        goto err;
2207
 
      }
2208
2234
      if (param->testflag & T_SAFE_REPAIR)
2209
2235
      {
2210
2236
        /* Don't repair if we loosed more than one row */
2218
2244
        sort_param.filepos;
2219
2245
      /* Only whole records */
2220
2246
      share->state.version=(ulong) time((time_t*) 0);
2221
 
      internal::my_close(info->dfile,MYF(0));
 
2247
      my_close(info->dfile,MYF(0));
2222
2248
      info->dfile=new_file;
2223
2249
      share->data_file_type=sort_info.new_data_file_type;
2224
2250
      share->pack.header_length=(ulong) new_header_length;
2228
2254
      info->state->data_file_length=sort_param.max_pos;
2229
2255
 
2230
2256
    param->read_cache.file=info->dfile;         /* re-init read cache */
2231
 
    param->read_cache.reinit_io_cache(READ_CACHE,share->pack.header_length, 1,1);
 
2257
    reinit_io_cache(&param->read_cache,READ_CACHE,share->pack.header_length,
 
2258
                    1,1);
2232
2259
  }
2233
2260
 
2234
2261
  if (param->testflag & T_WRITE_LOOP)
2260
2287
      if (ftruncate(info->dfile, skr))
2261
2288
        mi_check_print_warning(param,
2262
2289
                               "Can't change size of datafile,  error: %d",
2263
 
                               errno);
 
2290
                               my_errno);
2264
2291
  }
2265
2292
  if (param->testflag & T_CALC_CHECKSUM)
2266
2293
    info->state->checksum=param->glob_crc;
2268
2295
  if (ftruncate(share->kfile, info->state->key_file_length))
2269
2296
    mi_check_print_warning(param,
2270
2297
                           "Can't change size of indexfile, error: %d",
2271
 
                           errno);
 
2298
                           my_errno);
2272
2299
 
2273
2300
  if (!(param->testflag & T_SILENT))
2274
2301
  {
2285
2312
    memcpy( &share->state.state, info->state, sizeof(*info->state));
2286
2313
 
2287
2314
err:
2288
 
  got_error|= flush_blocks(param, share->getKeyCache(), share->kfile);
2289
 
  info->rec_cache.end_io_cache();
 
2315
  got_error|= flush_blocks(param, share->key_cache, share->kfile);
 
2316
  end_io_cache(&info->rec_cache);
2290
2317
  if (!got_error)
2291
2318
  {
2292
2319
    /* Replace the actual file with the temporary file */
2293
2320
    if (new_file >= 0)
2294
2321
    {
2295
 
      internal::my_close(new_file,MYF(0));
 
2322
      my_close(new_file,MYF(0));
2296
2323
      info->dfile=new_file= -1;
2297
2324
      if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
2298
2325
                            DATA_TMP_EXT, share->base.raid_chunks,
2305
2332
  if (got_error)
2306
2333
  {
2307
2334
    if (! param->error_printed)
2308
 
      mi_check_print_error(param,"%d when fixing table",errno);
 
2335
      mi_check_print_error(param,"%d when fixing table",my_errno);
2309
2336
    if (new_file >= 0)
2310
2337
    {
2311
 
      internal::my_close(new_file,MYF(0));
 
2338
      my_close(new_file,MYF(0));
2312
2339
      my_delete(param->temp_filename, MYF(MY_WME));
2313
2340
      if (info->dfile == new_file)
2314
2341
        info->dfile= -1;
2330
2357
 
2331
2358
  free((unsigned char*) sort_info.key_block);
2332
2359
  free(sort_info.buff);
2333
 
  param->read_cache.end_io_cache();
 
2360
  end_io_cache(&param->read_cache);
 
2361
  info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
 
2362
  if (!got_error && (param->testflag & T_UNPACK))
 
2363
  {
 
2364
    share->state.header.options[0]&= (unsigned char) ~HA_OPTION_COMPRESS_RECORD;
 
2365
    share->pack.header_length=0;
 
2366
  }
 
2367
  return(got_error);
 
2368
}
 
2369
 
 
2370
/*
 
2371
  Threaded repair of table using sorting
 
2372
 
 
2373
  SYNOPSIS
 
2374
    mi_repair_parallel()
 
2375
    param               Repair parameters
 
2376
    info                MyISAM handler to repair
 
2377
    name                Name of table (for warnings)
 
2378
    rep_quick           set to <> 0 if we should not change data file
 
2379
 
 
2380
  DESCRIPTION
 
2381
    Same as mi_repair_by_sort but do it multithreaded
 
2382
    Each key is handled by a separate thread.
 
2383
    TODO: make a number of threads a parameter
 
2384
 
 
2385
    In parallel repair we use one thread per index. There are two modes:
 
2386
 
 
2387
    Quick
 
2388
 
 
2389
      Only the indexes are rebuilt. All threads share a read buffer.
 
2390
      Every thread that needs fresh data in the buffer enters the shared
 
2391
      cache lock. The last thread joining the lock reads the buffer from
 
2392
      the data file and wakes all other threads.
 
2393
 
 
2394
    Non-quick
 
2395
 
 
2396
      The data file is rebuilt and all indexes are rebuilt to point to
 
2397
      the new record positions. One thread is the master thread. It
 
2398
      reads from the old data file and writes to the new data file. It
 
2399
      also creates one of the indexes. The other threads read from a
 
2400
      buffer which is filled by the master. If they need fresh data,
 
2401
      they enter the shared cache lock. If the masters write buffer is
 
2402
      full, it flushes it to the new data file and enters the shared
 
2403
      cache lock too. When all threads joined in the lock, the master
 
2404
      copies its write buffer to the read buffer for the other threads
 
2405
      and wakes them.
 
2406
 
 
2407
  RESULT
 
2408
    0   ok
 
2409
    <>0 Error
 
2410
*/
 
2411
 
 
2412
int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
 
2413
                        const char * name, int rep_quick)
 
2414
{
 
2415
  int got_error;
 
2416
  uint32_t i,key, total_key_length, istep;
 
2417
  ulong rec_length;
 
2418
  ha_rows start_records;
 
2419
  my_off_t new_header_length,del;
 
2420
  File new_file;
 
2421
  MI_SORT_PARAM *sort_param=0;
 
2422
  MYISAM_SHARE *share=info->s;
 
2423
  ulong   *rec_per_key_part;
 
2424
  HA_KEYSEG *keyseg;
 
2425
  char llbuff[22];
 
2426
  IO_CACHE new_data_cache; /* For non-quick repair. */
 
2427
  IO_CACHE_SHARE io_share;
 
2428
  SORT_INFO sort_info;
 
2429
  uint64_t key_map= 0;
 
2430
  pthread_attr_t thr_attr;
 
2431
  ulong max_pack_reclength;
 
2432
 
 
2433
  start_records=info->state->records;
 
2434
  got_error=1;
 
2435
  new_file= -1;
 
2436
  new_header_length=(param->testflag & T_UNPACK) ? 0 :
 
2437
    share->pack.header_length;
 
2438
  if (!(param->testflag & T_SILENT))
 
2439
  {
 
2440
    printf("- parallel recovering (with sort) MyISAM-table '%s'\n",name);
 
2441
    printf("Data records: %s\n", llstr(start_records,llbuff));
 
2442
  }
 
2443
  param->testflag|=T_REP; /* for easy checking */
 
2444
 
 
2445
  if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
 
2446
    param->testflag|=T_CALC_CHECKSUM;
 
2447
 
 
2448
  /*
 
2449
    Quick repair (not touching data file, rebuilding indexes):
 
2450
    {
 
2451
      Read  cache is (MI_CHECK *param)->read_cache using info->dfile.
 
2452
    }
 
2453
 
 
2454
    Non-quick repair (rebuilding data file and indexes):
 
2455
    {
 
2456
      Master thread:
 
2457
 
 
2458
        Read  cache is (MI_CHECK *param)->read_cache using info->dfile.
 
2459
        Write cache is (MI_INFO   *info)->rec_cache  using new_file.
 
2460
 
 
2461
      Slave threads:
 
2462
 
 
2463
        Read  cache is new_data_cache synced to master rec_cache.
 
2464
 
 
2465
      The final assignment of the filedescriptor for rec_cache is done
 
2466
      after the cache creation.
 
2467
 
 
2468
      Don't check file size on new_data_cache, as the resulting file size
 
2469
      is not known yet.
 
2470
 
 
2471
      As rec_cache and new_data_cache are synced, write_buffer_length is
 
2472
      used for the read cache 'new_data_cache'. Both start at the same
 
2473
      position 'new_header_length'.
 
2474
    }
 
2475
  */
 
2476
  memset(&sort_info, 0, sizeof(sort_info));
 
2477
  /* Initialize pthread structures before goto err. */
 
2478
  pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST);
 
2479
  pthread_cond_init(&sort_info.cond, 0);
 
2480
 
 
2481
  if (!(sort_info.key_block=
 
2482
        alloc_key_blocks(param, (uint) param->sort_key_blocks,
 
2483
                         share->base.max_key_block_length)) ||
 
2484
      init_io_cache(&param->read_cache, info->dfile,
 
2485
                    (uint) param->read_buffer_length,
 
2486
                    READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
 
2487
      (!rep_quick &&
 
2488
       (init_io_cache(&info->rec_cache, info->dfile,
 
2489
                      (uint) param->write_buffer_length,
 
2490
                      WRITE_CACHE, new_header_length, 1,
 
2491
                      MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
 
2492
        init_io_cache(&new_data_cache, -1,
 
2493
                      (uint) param->write_buffer_length,
 
2494
                      READ_CACHE, new_header_length, 1,
 
2495
                      MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
 
2496
    goto err;
 
2497
  sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
 
2498
  info->opt_flag|=WRITE_CACHE_USED;
 
2499
  info->rec_cache.file=info->dfile;         /* for sort_delete_record */
 
2500
 
 
2501
  if (!rep_quick)
 
2502
  {
 
2503
    /* Get real path for data file */
 
2504
    if ((new_file=my_create(fn_format(param->temp_filename,
 
2505
                                      share->data_file_name, "",
 
2506
                                      DATA_TMP_EXT,
 
2507
                                      2+4),
 
2508
                            0,param->tmpfile_createflag,
 
2509
                            MYF(0))) < 0)
 
2510
    {
 
2511
      mi_check_print_error(param,"Can't create new tempfile: '%s'",
 
2512
                           param->temp_filename);
 
2513
      goto err;
 
2514
    }
 
2515
    if (new_header_length &&
 
2516
        filecopy(param, new_file,info->dfile,0L,new_header_length,
 
2517
                 "datafile-header"))
 
2518
      goto err;
 
2519
    if (param->testflag & T_UNPACK)
 
2520
    {
 
2521
      share->options&= ~HA_OPTION_COMPRESS_RECORD;
 
2522
      mi_int2store(share->state.header.options,share->options);
 
2523
    }
 
2524
    share->state.dellink= HA_OFFSET_ERROR;
 
2525
    info->rec_cache.file=new_file;
 
2526
  }
 
2527
 
 
2528
  info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
 
2529
 
 
2530
  /* Optionally drop indexes and optionally modify the key_map. */
 
2531
  mi_drop_all_indexes(param, info, false);
 
2532
  key_map= share->state.key_map;
 
2533
  if (param->testflag & T_CREATE_MISSING_KEYS)
 
2534
  {
 
2535
    /* Invert the copied key_map to recreate all disabled indexes. */
 
2536
    key_map= ~key_map;
 
2537
  }
 
2538
 
 
2539
  sort_info.info=info;
 
2540
  sort_info.param = param;
 
2541
 
 
2542
  set_data_file_type(&sort_info, share);
 
2543
  sort_info.dupp=0;
 
2544
  sort_info.buff=0;
 
2545
  param->read_cache.end_of_file=sort_info.filelength=
 
2546
    lseek(param->read_cache.file,0L,SEEK_END);
 
2547
 
 
2548
  if (share->data_file_type == DYNAMIC_RECORD)
 
2549
    rec_length=max(share->base.min_pack_length+1,share->base.min_block_length);
 
2550
  else if (share->data_file_type == COMPRESSED_RECORD)
 
2551
    rec_length=share->base.min_block_length;
 
2552
  else
 
2553
    rec_length=share->base.pack_reclength;
 
2554
  /*
 
2555
    +1 below is required hack for parallel repair mode.
 
2556
    The info->state->records value, that is compared later
 
2557
    to sort_info.max_records and cannot exceed it, is
 
2558
    increased in sort_key_write. In mi_repair_by_sort, sort_key_write
 
2559
    is called after sort_key_read, where the comparison is performed,
 
2560
    but in parallel mode master thread can call sort_key_write
 
2561
    before some other repair thread calls sort_key_read.
 
2562
    Furthermore I'm not even sure +1 would be enough.
 
2563
    May be sort_info.max_records shold be always set to max value in
 
2564
    parallel mode.
 
2565
  */
 
2566
  sort_info.max_records=
 
2567
    ((param->testflag & T_CREATE_MISSING_KEYS) ? info->state->records + 1:
 
2568
     (ha_rows) (sort_info.filelength/rec_length+1));
 
2569
 
 
2570
  del=info->state->del;
 
2571
  param->glob_crc=0;
 
2572
  /* for compressed tables */
 
2573
  max_pack_reclength= share->base.pack_reclength;
 
2574
  if (share->options & HA_OPTION_COMPRESS_RECORD)
 
2575
    set_if_bigger(max_pack_reclength, share->max_pack_length);
 
2576
  if (!(sort_param=(MI_SORT_PARAM *)
 
2577
        malloc(share->base.keys *
 
2578
               (sizeof(MI_SORT_PARAM) + max_pack_reclength))))
 
2579
  {
 
2580
    mi_check_print_error(param,"Not enough memory for key!");
 
2581
    goto err;
 
2582
  }
 
2583
  memset(sort_param, 0, share->base.keys *
 
2584
                        (sizeof(MI_SORT_PARAM) + max_pack_reclength));
 
2585
  total_key_length=0;
 
2586
  rec_per_key_part= param->rec_per_key_part;
 
2587
  info->state->records=info->state->del=share->state.split=0;
 
2588
  info->state->empty=0;
 
2589
 
 
2590
  for (i=key=0, istep=1 ; key < share->base.keys ;
 
2591
       rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
 
2592
  {
 
2593
    sort_param[i].key=key;
 
2594
    sort_param[i].keyinfo=share->keyinfo+key;
 
2595
    sort_param[i].seg=sort_param[i].keyinfo->seg;
 
2596
    /*
 
2597
      Skip this index if it is marked disabled in the copied
 
2598
      (and possibly inverted) key_map.
 
2599
    */
 
2600
    if (! mi_is_key_active(key_map, key))
 
2601
    {
 
2602
      /* Remember old statistics for key */
 
2603
      assert(rec_per_key_part >= param->rec_per_key_part);
 
2604
      memcpy(rec_per_key_part,
 
2605
             (share->state.rec_per_key_part +
 
2606
              (rec_per_key_part - param->rec_per_key_part)),
 
2607
             sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
 
2608
      istep=0;
 
2609
      continue;
 
2610
    }
 
2611
    istep=1;
 
2612
    if ((!(param->testflag & T_SILENT)))
 
2613
      printf ("- Fixing index %d\n",key+1);
 
2614
    {
 
2615
      sort_param[i].key_read=sort_key_read;
 
2616
      sort_param[i].key_write=sort_key_write;
 
2617
    }
 
2618
    sort_param[i].key_cmp=sort_key_cmp;
 
2619
    sort_param[i].lock_in_memory=lock_memory;
 
2620
    sort_param[i].sort_info=&sort_info;
 
2621
    sort_param[i].master=0;
 
2622
    sort_param[i].fix_datafile=0;
 
2623
    sort_param[i].calc_checksum= 0;
 
2624
 
 
2625
    sort_param[i].filepos=new_header_length;
 
2626
    sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;
 
2627
 
 
2628
    sort_param[i].record= (((unsigned char *)(sort_param+share->base.keys))+
 
2629
                           (max_pack_reclength * i));
 
2630
    if (!mi_alloc_rec_buff(info, -1, &sort_param[i].rec_buff))
 
2631
    {
 
2632
      mi_check_print_error(param,"Not enough memory!");
 
2633
      goto err;
 
2634
    }
 
2635
 
 
2636
    sort_param[i].key_length=share->rec_reflength;
 
2637
    for (keyseg=sort_param[i].seg; keyseg->type != HA_KEYTYPE_END;
 
2638
         keyseg++)
 
2639
    {
 
2640
      sort_param[i].key_length+=keyseg->length;
 
2641
      if (keyseg->flag & HA_SPACE_PACK)
 
2642
        sort_param[i].key_length+=get_pack_length(keyseg->length);
 
2643
      if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
 
2644
        sort_param[i].key_length+=2 + test(keyseg->length >= 127);
 
2645
      if (keyseg->flag & HA_NULL_PART)
 
2646
        sort_param[i].key_length++;
 
2647
    }
 
2648
    total_key_length+=sort_param[i].key_length;
 
2649
  }
 
2650
  sort_info.total_keys=i;
 
2651
  sort_param[0].master= 1;
 
2652
  sort_param[0].fix_datafile= (bool)(! rep_quick);
 
2653
  sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM);
 
2654
 
 
2655
  sort_info.got_error=0;
 
2656
  pthread_mutex_lock(&sort_info.mutex);
 
2657
 
 
2658
  /*
 
2659
    Initialize the I/O cache share for use with the read caches and, in
 
2660
    case of non-quick repair, the write cache. When all threads join on
 
2661
    the cache lock, the writer copies the write cache contents to the
 
2662
    read caches.
 
2663
  */
 
2664
  if (i > 1)
 
2665
  {
 
2666
    if (rep_quick)
 
2667
      init_io_cache_share(&param->read_cache, &io_share, NULL, i);
 
2668
    else
 
2669
      init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
 
2670
  }
 
2671
  else
 
2672
    io_share.total_threads= 0; /* share not used */
 
2673
 
 
2674
  (void) pthread_attr_init(&thr_attr);
 
2675
  (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
 
2676
 
 
2677
  for (i=0 ; i < sort_info.total_keys ; i++)
 
2678
  {
 
2679
    /*
 
2680
      Copy the properly initialized IO_CACHE structure so that every
 
2681
      thread has its own copy. In quick mode param->read_cache is shared
 
2682
      for use by all threads. In non-quick mode all threads but the
 
2683
      first copy the shared new_data_cache, which is synchronized to the
 
2684
      write cache of the first thread. The first thread copies
 
2685
      param->read_cache, which is not shared.
 
2686
    */
 
2687
    sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
 
2688
                               new_data_cache);
 
2689
 
 
2690
    /*
 
2691
      two approaches: the same amount of memory for each thread
 
2692
      or the memory for the same number of keys for each thread...
 
2693
      In the second one all the threads will fill their sort_buffers
 
2694
      (and call write_keys) at the same time, putting more stress on i/o.
 
2695
    */
 
2696
    sort_param[i].sortbuff_size=
 
2697
#ifndef USING_SECOND_APPROACH
 
2698
      param->sort_buffer_length/sort_info.total_keys;
 
2699
#else
 
2700
      param->sort_buffer_length*sort_param[i].key_length/total_key_length;
 
2701
#endif
 
2702
    if (pthread_create(&sort_param[i].thr, &thr_attr,
 
2703
                       thr_find_all_keys,
 
2704
                       (void *) (sort_param+i)))
 
2705
    {
 
2706
      mi_check_print_error(param,"Cannot start a repair thread");
 
2707
      /* Cleanup: Detach from the share. Avoid others to be blocked. */
 
2708
      if (io_share.total_threads)
 
2709
        remove_io_thread(&sort_param[i].read_cache);
 
2710
      sort_info.got_error=1;
 
2711
    }
 
2712
    else
 
2713
      sort_info.threads_running++;
 
2714
  }
 
2715
  (void) pthread_attr_destroy(&thr_attr);
 
2716
 
 
2717
  /* waiting for all threads to finish */
 
2718
  while (sort_info.threads_running)
 
2719
    pthread_cond_wait(&sort_info.cond, &sort_info.mutex);
 
2720
  pthread_mutex_unlock(&sort_info.mutex);
 
2721
 
 
2722
  if ((got_error= thr_write_keys(sort_param)))
 
2723
  {
 
2724
    param->retry_repair=1;
 
2725
    goto err;
 
2726
  }
 
2727
  got_error=1;                          /* Assume the following may go wrong */
 
2728
 
 
2729
  if (sort_param[0].fix_datafile)
 
2730
  {
 
2731
    /*
 
2732
      Append some nuls to the end of a memory mapped file. Destroy the
 
2733
      write cache. The master thread did already detach from the share
 
2734
      by remove_io_thread() in sort.c:thr_find_all_keys().
 
2735
    */
 
2736
    if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
 
2737
      goto err;
 
2738
    if (param->testflag & T_SAFE_REPAIR)
 
2739
    {
 
2740
      /* Don't repair if we loosed more than one row */
 
2741
      if (info->state->records+1 < start_records)
 
2742
      {
 
2743
        info->state->records=start_records;
 
2744
        goto err;
 
2745
      }
 
2746
    }
 
2747
    share->state.state.data_file_length= info->state->data_file_length=
 
2748
      sort_param->filepos;
 
2749
    /* Only whole records */
 
2750
    share->state.version=(ulong) time((time_t*) 0);
 
2751
 
 
2752
    /*
 
2753
      Exchange the data file descriptor of the table, so that we use the
 
2754
      new file from now on.
 
2755
     */
 
2756
    my_close(info->dfile,MYF(0));
 
2757
    info->dfile=new_file;
 
2758
 
 
2759
    share->data_file_type=sort_info.new_data_file_type;
 
2760
    share->pack.header_length=(ulong) new_header_length;
 
2761
  }
 
2762
  else
 
2763
    info->state->data_file_length=sort_param->max_pos;
 
2764
 
 
2765
  if (rep_quick && del+sort_info.dupp != info->state->del)
 
2766
  {
 
2767
    mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records");
 
2768
    mi_check_print_error(param,"Run recovery again without -q");
 
2769
    param->retry_repair=1;
 
2770
    param->testflag|=T_RETRY_WITHOUT_QUICK;
 
2771
    goto err;
 
2772
  }
 
2773
 
 
2774
  if (rep_quick & T_FORCE_UNIQUENESS)
 
2775
  {
 
2776
    my_off_t skr=info->state->data_file_length+
 
2777
      (share->options & HA_OPTION_COMPRESS_RECORD ?
 
2778
       MEMMAP_EXTRA_MARGIN : 0);
 
2779
#ifdef USE_RELOC
 
2780
    if (share->data_file_type == STATIC_RECORD &&
 
2781
        skr < share->base.reloc*share->base.min_pack_length)
 
2782
      skr=share->base.reloc*share->base.min_pack_length;
 
2783
#endif
 
2784
    if (skr != sort_info.filelength && !info->s->base.raid_type)
 
2785
      if (ftruncate(info->dfile, skr))
 
2786
        mi_check_print_warning(param,
 
2787
                               "Can't change size of datafile,  error: %d",
 
2788
                               my_errno);
 
2789
  }
 
2790
  if (param->testflag & T_CALC_CHECKSUM)
 
2791
    info->state->checksum=param->glob_crc;
 
2792
 
 
2793
  if (ftruncate(share->kfile, info->state->key_file_length))
 
2794
    mi_check_print_warning(param,
 
2795
                           "Can't change size of indexfile, error: %d", my_errno);
 
2796
 
 
2797
  if (!(param->testflag & T_SILENT))
 
2798
  {
 
2799
    if (start_records != info->state->records)
 
2800
      printf("Data records: %s\n", llstr(info->state->records,llbuff));
 
2801
    if (sort_info.dupp)
 
2802
      mi_check_print_warning(param,
 
2803
                             "%s records have been removed",
 
2804
                             llstr(sort_info.dupp,llbuff));
 
2805
  }
 
2806
  got_error=0;
 
2807
 
 
2808
  if (&share->state.state != info->state)
 
2809
    memcpy(&share->state.state, info->state, sizeof(*info->state));
 
2810
 
 
2811
err:
 
2812
  got_error|= flush_blocks(param, share->key_cache, share->kfile);
 
2813
  /*
 
2814
    Destroy the write cache. The master thread did already detach from
 
2815
    the share by remove_io_thread() or it was not yet started (if the
 
2816
    error happend before creating the thread).
 
2817
  */
 
2818
  end_io_cache(&info->rec_cache);
 
2819
  /*
 
2820
    Destroy the new data cache in case of non-quick repair. All slave
 
2821
    threads did either detach from the share by remove_io_thread()
 
2822
    already or they were not yet started (if the error happend before
 
2823
    creating the threads).
 
2824
  */
 
2825
  if (!rep_quick)
 
2826
    end_io_cache(&new_data_cache);
 
2827
  if (!got_error)
 
2828
  {
 
2829
    /* Replace the actual file with the temporary file */
 
2830
    if (new_file >= 0)
 
2831
    {
 
2832
      my_close(new_file,MYF(0));
 
2833
      info->dfile=new_file= -1;
 
2834
      if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
 
2835
                            DATA_TMP_EXT, share->base.raid_chunks,
 
2836
                            (param->testflag & T_BACKUP_DATA ?
 
2837
                             MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
 
2838
          mi_open_datafile(info,share,-1))
 
2839
        got_error=1;
 
2840
    }
 
2841
  }
 
2842
  if (got_error)
 
2843
  {
 
2844
    if (! param->error_printed)
 
2845
      mi_check_print_error(param,"%d when fixing table",my_errno);
 
2846
    if (new_file >= 0)
 
2847
    {
 
2848
      my_close(new_file,MYF(0));
 
2849
      my_delete(param->temp_filename, MYF(MY_WME));
 
2850
      if (info->dfile == new_file)
 
2851
        info->dfile= -1;
 
2852
    }
 
2853
    mi_mark_crashed_on_repair(info);
 
2854
  }
 
2855
  else if (key_map == share->state.key_map)
 
2856
    share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
 
2857
  share->state.changed|=STATE_NOT_SORTED_PAGES;
 
2858
 
 
2859
  pthread_cond_destroy (&sort_info.cond);
 
2860
  pthread_mutex_destroy(&sort_info.mutex);
 
2861
 
 
2862
  free((unsigned char*) sort_info.key_block);
 
2863
  free((unsigned char*) sort_param);
 
2864
  free(sort_info.buff);
 
2865
  end_io_cache(&param->read_cache);
2334
2866
  info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2335
2867
  if (!got_error && (param->testflag & T_UNPACK))
2336
2868
  {
2361
2893
    (info->s->rec_reflength+
2362
2894
     _mi_make_key(info, sort_param->key, (unsigned char*) key,
2363
2895
                  sort_param->record, sort_param->filepos));
2364
 
#ifdef HAVE_VALGRIND
 
2896
#ifdef HAVE_purify
2365
2897
  memset((unsigned char *)key+sort_param->real_key_length, 0,
2366
2898
         (sort_param->key_length-sort_param->real_key_length));
2367
2899
#endif
2690
3222
        {
2691
3223
          mi_check_print_info(param,
2692
3224
                              "Read error for block at: %s (error: %d); Skipped",
2693
 
                              llstr(block_info.filepos,llbuff),errno);
 
3225
                              llstr(block_info.filepos,llbuff),my_errno);
2694
3226
          goto try_next;
2695
3227
        }
2696
3228
        left_length-=block_info.data_len;
2783
3315
      if (my_b_write(&info->rec_cache,sort_param->record,
2784
3316
                     share->base.pack_reclength))
2785
3317
      {
2786
 
        mi_check_print_error(param,"%d when writing to datafile",errno);
 
3318
        mi_check_print_error(param,"%d when writing to datafile",my_errno);
2787
3319
        return(1);
2788
3320
      }
2789
3321
      sort_param->filepos+=share->base.pack_reclength;
2837
3369
                                  sort_param->filepos+block_length,
2838
3370
                                  &from,&reclength,&flag))
2839
3371
        {
2840
 
          mi_check_print_error(param,"%d when writing to datafile",errno);
 
3372
          mi_check_print_error(param,"%d when writing to datafile",my_errno);
2841
3373
          return(1);
2842
3374
        }
2843
3375
        sort_param->filepos+=block_length;
3152
3684
         (my_off_t) info->s->base.max_data_file_length;
3153
3685
}
3154
3686
 
 
3687
        /* Recreate table with bigger more alloced record-data */
 
3688
 
 
3689
int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename)
 
3690
{
 
3691
  int error;
 
3692
  MI_INFO info;
 
3693
  MYISAM_SHARE share;
 
3694
  MI_KEYDEF *keyinfo,*key,*key_end;
 
3695
  HA_KEYSEG *keysegs,*keyseg;
 
3696
  MI_COLUMNDEF *recdef,*rec,*end;
 
3697
  MI_UNIQUEDEF *uniquedef,*u_ptr,*u_end;
 
3698
  MI_STATUS_INFO status_info;
 
3699
  uint32_t unpack,key_parts;
 
3700
  ha_rows max_records;
 
3701
  uint64_t file_length,tmp_length;
 
3702
  MI_CREATE_INFO create_info;
 
3703
 
 
3704
  error=1;                                      /* Default error */
 
3705
  info= **org_info;
 
3706
  status_info= (*org_info)->state[0];
 
3707
  info.state= &status_info;
 
3708
  share= *(*org_info)->s;
 
3709
  unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
 
3710
    (param->testflag & T_UNPACK);
 
3711
  if (!(keyinfo=(MI_KEYDEF*) malloc(sizeof(MI_KEYDEF)*share.base.keys)))
 
3712
    return(0);
 
3713
  memcpy(keyinfo,share.keyinfo,sizeof(MI_KEYDEF)*share.base.keys);
 
3714
 
 
3715
  key_parts= share.base.all_key_parts;
 
3716
  if (!(keysegs=(HA_KEYSEG*) malloc(sizeof(HA_KEYSEG)*
 
3717
                                    (key_parts+share.base.keys))))
 
3718
  {
 
3719
    free(keyinfo);
 
3720
    return(1);
 
3721
  }
 
3722
  if (!(recdef=(MI_COLUMNDEF*)
 
3723
        malloc(sizeof(MI_COLUMNDEF)*(share.base.fields+1))))
 
3724
  {
 
3725
    free(keyinfo);
 
3726
    free(keysegs);
 
3727
    return(1);
 
3728
  }
 
3729
  if (!(uniquedef=(MI_UNIQUEDEF*)
 
3730
        malloc(sizeof(MI_UNIQUEDEF)*(share.state.header.uniques+1))))
 
3731
  {
 
3732
    free(recdef);
 
3733
    free(keyinfo);
 
3734
    free(keysegs);
 
3735
    return(1);
 
3736
  }
 
3737
 
 
3738
  /* Copy the column definitions */
 
3739
  memcpy(recdef, share.rec, sizeof(MI_COLUMNDEF)*(share.base.fields+1));
 
3740
  for (rec=recdef,end=recdef+share.base.fields; rec != end ; rec++)
 
3741
  {
 
3742
    if (unpack && !(share.options & HA_OPTION_PACK_RECORD) &&
 
3743
        rec->type != FIELD_BLOB &&
 
3744
        rec->type != FIELD_VARCHAR &&
 
3745
        rec->type != FIELD_CHECK)
 
3746
      rec->type=(int) FIELD_NORMAL;
 
3747
  }
 
3748
 
 
3749
  /* Change the new key to point at the saved key segments */
 
3750
  memcpy(keysegs,share.keyparts,
 
3751
         sizeof(HA_KEYSEG)*(key_parts+share.base.keys+
 
3752
                            share.state.header.uniques));
 
3753
  keyseg=keysegs;
 
3754
  for (key=keyinfo,key_end=keyinfo+share.base.keys; key != key_end ; key++)
 
3755
  {
 
3756
    key->seg=keyseg;
 
3757
    for (; keyseg->type ; keyseg++)
 
3758
    {
 
3759
      if (param->language)
 
3760
        keyseg->language=param->language;       /* change language */
 
3761
    }
 
3762
    keyseg++;                                   /* Skip end pointer */
 
3763
  }
 
3764
 
 
3765
  /* Copy the unique definitions and change them to point at the new key
 
3766
     segments*/
 
3767
  memcpy(uniquedef,share.uniqueinfo,
 
3768
         sizeof(MI_UNIQUEDEF)*(share.state.header.uniques));
 
3769
  for (u_ptr=uniquedef,u_end=uniquedef+share.state.header.uniques;
 
3770
       u_ptr != u_end ; u_ptr++)
 
3771
  {
 
3772
    u_ptr->seg=keyseg;
 
3773
    keyseg+=u_ptr->keysegs+1;
 
3774
  }
 
3775
  if (share.options & HA_OPTION_COMPRESS_RECORD)
 
3776
    share.base.records=max_records=info.state->records;
 
3777
  else if (share.base.min_pack_length)
 
3778
    max_records=(ha_rows) (lseek(info.dfile,0L,SEEK_END) /
 
3779
                           (ulong) share.base.min_pack_length);
 
3780
  else
 
3781
    max_records=0;
 
3782
  unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
 
3783
    (param->testflag & T_UNPACK);
 
3784
  share.options&= ~HA_OPTION_TEMP_COMPRESS_RECORD;
 
3785
 
 
3786
  file_length=(uint64_t) lseek(info.dfile,0L,SEEK_END);
 
3787
  tmp_length= file_length+file_length/10;
 
3788
  set_if_bigger(file_length,param->max_data_file_length);
 
3789
  set_if_bigger(file_length,tmp_length);
 
3790
  set_if_bigger(file_length,(uint64_t) share.base.max_data_file_length);
 
3791
 
 
3792
  mi_close(*org_info);
 
3793
  memset(&create_info, 0, sizeof(create_info));
 
3794
  create_info.max_rows=max(max_records,share.base.records);
 
3795
  create_info.reloc_rows=share.base.reloc;
 
3796
  create_info.old_options=(share.options |
 
3797
                           (unpack ? HA_OPTION_TEMP_COMPRESS_RECORD : 0));
 
3798
 
 
3799
  create_info.data_file_length=file_length;
 
3800
  create_info.auto_increment=share.state.auto_increment;
 
3801
  create_info.language = (param->language ? param->language :
 
3802
                          share.state.header.language);
 
3803
  create_info.key_file_length=  status_info.key_file_length;
 
3804
  /*
 
3805
    Allow for creating an auto_increment key. This has an effect only if
 
3806
    an auto_increment key exists in the original table.
 
3807
  */
 
3808
  create_info.with_auto_increment= true;
 
3809
  /* We don't have to handle symlinks here because we are using
 
3810
     HA_DONT_TOUCH_DATA */
 
3811
  if (mi_create(filename,
 
3812
                share.base.keys - share.state.header.uniques,
 
3813
                keyinfo, share.base.fields, recdef,
 
3814
                share.state.header.uniques, uniquedef,
 
3815
                &create_info,
 
3816
                HA_DONT_TOUCH_DATA))
 
3817
  {
 
3818
    mi_check_print_error(param,"Got error %d when trying to recreate indexfile",my_errno);
 
3819
    goto end;
 
3820
  }
 
3821
  *org_info=mi_open(filename,O_RDWR,
 
3822
                    (param->testflag & T_WAIT_FOREVER) ? HA_OPEN_WAIT_IF_LOCKED :
 
3823
                    (param->testflag & T_DESCRIPT) ? HA_OPEN_IGNORE_IF_LOCKED :
 
3824
                    HA_OPEN_ABORT_IF_LOCKED);
 
3825
  if (!*org_info)
 
3826
  {
 
3827
    mi_check_print_error(param,"Got error %d when trying to open re-created indexfile",
 
3828
                my_errno);
 
3829
    goto end;
 
3830
  }
 
3831
  /* We are modifing */
 
3832
  (*org_info)->s->options&= ~HA_OPTION_READ_ONLY_DATA;
 
3833
  _mi_readinfo(*org_info,F_WRLCK,0);
 
3834
  (*org_info)->state->records=info.state->records;
 
3835
  if (share.state.create_time)
 
3836
    (*org_info)->s->state.create_time=share.state.create_time;
 
3837
  (*org_info)->s->state.unique=(*org_info)->this_unique=
 
3838
    share.state.unique;
 
3839
  (*org_info)->state->checksum=info.state->checksum;
 
3840
  (*org_info)->state->del=info.state->del;
 
3841
  (*org_info)->s->state.dellink=share.state.dellink;
 
3842
  (*org_info)->state->empty=info.state->empty;
 
3843
  (*org_info)->state->data_file_length=info.state->data_file_length;
 
3844
  if (update_state_info(param,*org_info,UPDATE_TIME | UPDATE_STAT |
 
3845
                        UPDATE_OPEN_COUNT))
 
3846
    goto end;
 
3847
  error=0;
 
3848
end:
 
3849
  free(uniquedef);
 
3850
  free(keyinfo);
 
3851
  free(recdef);
 
3852
  free(keysegs);
 
3853
  return(error);
 
3854
}
 
3855
 
3155
3856
 
3156
3857
        /* write suffix to data file if neaded */
3157
3858
 
3166
3867
    if (my_b_write(&info->rec_cache,buff,sizeof(buff)))
3167
3868
    {
3168
3869
      mi_check_print_error(sort_info->param,
3169
 
                           "%d when writing to datafile",errno);
 
3870
                           "%d when writing to datafile",my_errno);
3170
3871
      return 1;
3171
3872
    }
3172
3873
    sort_info->param->read_cache.end_of_file+=sizeof(buff);
3231
3932
      return 0;
3232
3933
  }
3233
3934
err:
3234
 
  mi_check_print_error(param,"%d when updating keyfile",errno);
 
3935
  mi_check_print_error(param,"%d when updating keyfile",my_errno);
3235
3936
  return 1;
3236
3937
}
3237
3938
 
3269
3970
    We have to use an allocated buffer instead of info->rec_buff as
3270
3971
    _mi_put_key_in_record() may use info->rec_buff
3271
3972
  */
3272
 
  if (!mi_alloc_rec_buff(info, SIZE_MAX, &record))
 
3973
  if (!mi_alloc_rec_buff(info, -1, &record))
3273
3974
  {
3274
3975
    mi_check_print_error(param,"Not enough memory for extra record");
3275
3976
    return;
3278
3979
  mi_extra(info,HA_EXTRA_KEYREAD,0);
3279
3980
  if (mi_rlast(info, record, info->s->base.auto_key-1))
3280
3981
  {
3281
 
    if (errno != HA_ERR_END_OF_FILE)
 
3982
    if (my_errno != HA_ERR_END_OF_FILE)
3282
3983
    {
3283
3984
      mi_extra(info,HA_EXTRA_NO_KEYREAD,0);
3284
3985
      free(mi_get_rec_buff_ptr(info, record));
3285
 
      mi_check_print_error(param,"%d when reading last record",errno);
 
3986
      mi_check_print_error(param,"%d when reading last record",my_errno);
3286
3987
      return;
3287
3988
    }
3288
3989
    if (!repair_only)
3412
4113
  uint32_t key_maxlength=key->maxlength;
3413
4114
  return (key->flag & (HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY) &&
3414
4115
          ((uint64_t) rows * key_maxlength >
3415
 
           (uint64_t) MAX_FILE_SIZE));
 
4116
           (uint64_t) myisam_max_temp_length));
3416
4117
}
3417
4118
 
3418
4119
/*