2331
2346
free((unsigned char*) sort_info.key_block);
2332
2347
free(sort_info.buff);
2333
param->read_cache.end_io_cache();
2348
end_io_cache(¶m->read_cache);
2349
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2350
if (!got_error && (param->testflag & T_UNPACK))
2352
share->state.header.options[0]&= (unsigned char) ~HA_OPTION_COMPRESS_RECORD;
2353
share->pack.header_length=0;
2359
Threaded repair of table using sorting
2362
mi_repair_parallel()
2363
param Repair parameters
2364
info MyISAM handler to repair
2365
name Name of table (for warnings)
2366
rep_quick set to <> 0 if we should not change data file
2369
Same as mi_repair_by_sort but do it multithreaded
2370
Each key is handled by a separate thread.
2371
TODO: make the number of threads a parameter
2373
In parallel repair we use one thread per index. There are two modes:
2377
Only the indexes are rebuilt. All threads share a read buffer.
2378
Every thread that needs fresh data in the buffer enters the shared
2379
cache lock. The last thread joining the lock reads the buffer from
2380
the data file and wakes all other threads.
2384
The data file is rebuilt and all indexes are rebuilt to point to
2385
the new record positions. One thread is the master thread. It
2386
reads from the old data file and writes to the new data file. It
2387
also creates one of the indexes. The other threads read from a
2388
buffer which is filled by the master. If they need fresh data,
2389
they enter the shared cache lock. If the master's write buffer is
2390
full, it flushes it to the new data file and enters the shared
2391
cache lock too. When all threads joined in the lock, the master
2392
copies its write buffer to the read buffer for the other threads
2400
int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
2401
const char * name, int rep_quick)
2404
uint32_t i,key, total_key_length, istep;
2406
ha_rows start_records;
2407
my_off_t new_header_length,del;
2409
MI_SORT_PARAM *sort_param=0;
2410
MYISAM_SHARE *share=info->s;
2411
ulong *rec_per_key_part;
2414
IO_CACHE new_data_cache; /* For non-quick repair. */
2415
IO_CACHE_SHARE io_share;
2416
SORT_INFO sort_info;
2417
uint64_t key_map= 0;
2418
pthread_attr_t thr_attr;
2419
ulong max_pack_reclength;
2421
start_records=info->state->records;
2424
new_header_length=(param->testflag & T_UNPACK) ? 0 :
2425
share->pack.header_length;
2426
if (!(param->testflag & T_SILENT))
2428
printf("- parallel recovering (with sort) MyISAM-table '%s'\n",name);
2429
printf("Data records: %s\n", llstr(start_records,llbuff));
2431
param->testflag|=T_REP; /* for easy checking */
2433
if (info->s->options & (HA_OPTION_COMPRESS_RECORD))
2434
param->testflag|=T_CALC_CHECKSUM;
2437
Quick repair (not touching data file, rebuilding indexes):
2439
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
2442
Non-quick repair (rebuilding data file and indexes):
2446
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
2447
Write cache is (MI_INFO *info)->rec_cache using new_file.
2451
Read cache is new_data_cache synced to master rec_cache.
2453
The final assignment of the filedescriptor for rec_cache is done
2454
after the cache creation.
2456
Don't check file size on new_data_cache, as the resulting file size
2459
As rec_cache and new_data_cache are synced, write_buffer_length is
2460
used for the read cache 'new_data_cache'. Both start at the same
2461
position 'new_header_length'.
2464
memset(&sort_info, 0, sizeof(sort_info));
2465
/* Initialize pthread structures before goto err. */
2466
pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST);
2467
pthread_cond_init(&sort_info.cond, 0);
2469
if (!(sort_info.key_block=
2470
alloc_key_blocks(param, (uint) param->sort_key_blocks,
2471
share->base.max_key_block_length)) ||
2472
init_io_cache(¶m->read_cache, info->dfile,
2473
(uint) param->read_buffer_length,
2474
READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
2476
(init_io_cache(&info->rec_cache, info->dfile,
2477
(uint) param->write_buffer_length,
2478
WRITE_CACHE, new_header_length, 1,
2479
MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
2480
init_io_cache(&new_data_cache, -1,
2481
(uint) param->write_buffer_length,
2482
READ_CACHE, new_header_length, 1,
2483
MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
2485
sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
2486
info->opt_flag|=WRITE_CACHE_USED;
2487
info->rec_cache.file=info->dfile; /* for sort_delete_record */
2491
/* Get real path for data file */
2492
if ((new_file=my_create(fn_format(param->temp_filename,
2493
share->data_file_name, "",
2496
0,param->tmpfile_createflag,
2499
mi_check_print_error(param,"Can't create new tempfile: '%s'",
2500
param->temp_filename);
2503
if (new_header_length &&
2504
filecopy(param, new_file,info->dfile,0L,new_header_length,
2507
if (param->testflag & T_UNPACK)
2509
share->options&= ~HA_OPTION_COMPRESS_RECORD;
2510
mi_int2store(share->state.header.options,share->options);
2512
share->state.dellink= HA_OFFSET_ERROR;
2513
info->rec_cache.file=new_file;
2516
info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
2518
/* Optionally drop indexes and optionally modify the key_map. */
2519
mi_drop_all_indexes(param, info, false);
2520
key_map= share->state.key_map;
2521
if (param->testflag & T_CREATE_MISSING_KEYS)
2523
/* Invert the copied key_map to recreate all disabled indexes. */
2527
sort_info.info=info;
2528
sort_info.param = param;
2530
set_data_file_type(&sort_info, share);
2533
param->read_cache.end_of_file=sort_info.filelength=
2534
lseek(param->read_cache.file,0L,SEEK_END);
2536
if (share->data_file_type == DYNAMIC_RECORD)
2537
rec_length=max(share->base.min_pack_length+1,share->base.min_block_length);
2538
else if (share->data_file_type == COMPRESSED_RECORD)
2539
rec_length=share->base.min_block_length;
2541
rec_length=share->base.pack_reclength;
2543
+1 below is required hack for parallel repair mode.
2544
The info->state->records value, that is compared later
2545
to sort_info.max_records and cannot exceed it, is
2546
increased in sort_key_write. In mi_repair_by_sort, sort_key_write
2547
is called after sort_key_read, where the comparison is performed,
2548
but in parallel mode master thread can call sort_key_write
2549
before some other repair thread calls sort_key_read.
2550
Furthermore I'm not even sure +1 would be enough.
2551
Maybe sort_info.max_records should always be set to max value in
2554
sort_info.max_records=
2555
((param->testflag & T_CREATE_MISSING_KEYS) ? info->state->records + 1:
2556
(ha_rows) (sort_info.filelength/rec_length+1));
2558
del=info->state->del;
2560
/* for compressed tables */
2561
max_pack_reclength= share->base.pack_reclength;
2562
if (share->options & HA_OPTION_COMPRESS_RECORD)
2563
set_if_bigger(max_pack_reclength, share->max_pack_length);
2564
if (!(sort_param=(MI_SORT_PARAM *)
2565
malloc(share->base.keys *
2566
(sizeof(MI_SORT_PARAM) + max_pack_reclength))))
2568
mi_check_print_error(param,"Not enough memory for key!");
2571
memset(sort_param, 0, share->base.keys *
2572
(sizeof(MI_SORT_PARAM) + max_pack_reclength));
2574
rec_per_key_part= param->rec_per_key_part;
2575
info->state->records=info->state->del=share->state.split=0;
2576
info->state->empty=0;
2578
for (i=key=0, istep=1 ; key < share->base.keys ;
2579
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
2581
sort_param[i].key=key;
2582
sort_param[i].keyinfo=share->keyinfo+key;
2583
sort_param[i].seg=sort_param[i].keyinfo->seg;
2585
Skip this index if it is marked disabled in the copied
2586
(and possibly inverted) key_map.
2588
if (! mi_is_key_active(key_map, key))
2590
/* Remember old statistics for key */
2591
assert(rec_per_key_part >= param->rec_per_key_part);
2592
memcpy(rec_per_key_part,
2593
(share->state.rec_per_key_part +
2594
(rec_per_key_part - param->rec_per_key_part)),
2595
sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
2600
if ((!(param->testflag & T_SILENT)))
2601
printf ("- Fixing index %d\n",key+1);
2603
sort_param[i].key_read=sort_key_read;
2604
sort_param[i].key_write=sort_key_write;
2606
sort_param[i].key_cmp=sort_key_cmp;
2607
sort_param[i].lock_in_memory=lock_memory;
2608
sort_param[i].sort_info=&sort_info;
2609
sort_param[i].master=0;
2610
sort_param[i].fix_datafile=0;
2611
sort_param[i].calc_checksum= 0;
2613
sort_param[i].filepos=new_header_length;
2614
sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;
2616
sort_param[i].record= (((unsigned char *)(sort_param+share->base.keys))+
2617
(max_pack_reclength * i));
2618
if (!mi_alloc_rec_buff(info, -1, &sort_param[i].rec_buff))
2620
mi_check_print_error(param,"Not enough memory!");
2624
sort_param[i].key_length=share->rec_reflength;
2625
for (keyseg=sort_param[i].seg; keyseg->type != HA_KEYTYPE_END;
2628
sort_param[i].key_length+=keyseg->length;
2629
if (keyseg->flag & HA_SPACE_PACK)
2630
sort_param[i].key_length+=get_pack_length(keyseg->length);
2631
if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
2632
sort_param[i].key_length+=2 + test(keyseg->length >= 127);
2633
if (keyseg->flag & HA_NULL_PART)
2634
sort_param[i].key_length++;
2636
total_key_length+=sort_param[i].key_length;
2638
sort_info.total_keys=i;
2639
sort_param[0].master= 1;
2640
sort_param[0].fix_datafile= (bool)(! rep_quick);
2641
sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM);
2643
sort_info.got_error=0;
2644
pthread_mutex_lock(&sort_info.mutex);
2647
Initialize the I/O cache share for use with the read caches and, in
2648
case of non-quick repair, the write cache. When all threads join on
2649
the cache lock, the writer copies the write cache contents to the
2655
init_io_cache_share(¶m->read_cache, &io_share, NULL, i);
2657
init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
2660
io_share.total_threads= 0; /* share not used */
2662
(void) pthread_attr_init(&thr_attr);
2663
(void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
2665
for (i=0 ; i < sort_info.total_keys ; i++)
2668
Copy the properly initialized IO_CACHE structure so that every
2669
thread has its own copy. In quick mode param->read_cache is shared
2670
for use by all threads. In non-quick mode all threads but the
2671
first copy the shared new_data_cache, which is synchronized to the
2672
write cache of the first thread. The first thread copies
2673
param->read_cache, which is not shared.
2675
sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
2679
two approaches: the same amount of memory for each thread
2680
or the memory for the same number of keys for each thread...
2681
In the second one all the threads will fill their sort_buffers
2682
(and call write_keys) at the same time, putting more stress on i/o.
2684
sort_param[i].sortbuff_size=
2685
#ifndef USING_SECOND_APPROACH
2686
param->sort_buffer_length/sort_info.total_keys;
2688
param->sort_buffer_length*sort_param[i].key_length/total_key_length;
2690
if (pthread_create(&sort_param[i].thr, &thr_attr,
2692
(void *) (sort_param+i)))
2694
mi_check_print_error(param,"Cannot start a repair thread");
2695
/* Cleanup: Detach from the share. Avoid others to be blocked. */
2696
if (io_share.total_threads)
2697
remove_io_thread(&sort_param[i].read_cache);
2698
sort_info.got_error=1;
2701
sort_info.threads_running++;
2703
(void) pthread_attr_destroy(&thr_attr);
2705
/* waiting for all threads to finish */
2706
while (sort_info.threads_running)
2707
pthread_cond_wait(&sort_info.cond, &sort_info.mutex);
2708
pthread_mutex_unlock(&sort_info.mutex);
2710
if ((got_error= thr_write_keys(sort_param)))
2712
param->retry_repair=1;
2715
got_error=1; /* Assume the following may go wrong */
2717
if (sort_param[0].fix_datafile)
2720
Append some nuls to the end of a memory mapped file. Destroy the
2721
write cache. The master thread did already detach from the share
2722
by remove_io_thread() in sort.c:thr_find_all_keys().
2724
if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
2726
if (param->testflag & T_SAFE_REPAIR)
2728
/* Don't repair if we lost more than one row */
2729
if (info->state->records+1 < start_records)
2731
info->state->records=start_records;
2735
share->state.state.data_file_length= info->state->data_file_length=
2736
sort_param->filepos;
2737
/* Only whole records */
2738
share->state.version=(ulong) time((time_t*) 0);
2741
Exchange the data file descriptor of the table, so that we use the
2742
new file from now on.
2744
my_close(info->dfile,MYF(0));
2745
info->dfile=new_file;
2747
share->data_file_type=sort_info.new_data_file_type;
2748
share->pack.header_length=(ulong) new_header_length;
2751
info->state->data_file_length=sort_param->max_pos;
2753
if (rep_quick && del+sort_info.dupp != info->state->del)
2755
mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records");
2756
mi_check_print_error(param,"Run recovery again without -q");
2757
param->retry_repair=1;
2758
param->testflag|=T_RETRY_WITHOUT_QUICK;
2762
if (rep_quick & T_FORCE_UNIQUENESS)
2764
my_off_t skr=info->state->data_file_length+
2765
(share->options & HA_OPTION_COMPRESS_RECORD ?
2766
MEMMAP_EXTRA_MARGIN : 0);
2768
if (share->data_file_type == STATIC_RECORD &&
2769
skr < share->base.reloc*share->base.min_pack_length)
2770
skr=share->base.reloc*share->base.min_pack_length;
2772
if (skr != sort_info.filelength && !info->s->base.raid_type)
2773
if (ftruncate(info->dfile, skr))
2774
mi_check_print_warning(param,
2775
"Can't change size of datafile, error: %d",
2778
if (param->testflag & T_CALC_CHECKSUM)
2779
info->state->checksum=param->glob_crc;
2781
if (ftruncate(share->kfile, info->state->key_file_length))
2782
mi_check_print_warning(param,
2783
"Can't change size of indexfile, error: %d", my_errno);
2785
if (!(param->testflag & T_SILENT))
2787
if (start_records != info->state->records)
2788
printf("Data records: %s\n", llstr(info->state->records,llbuff));
2790
mi_check_print_warning(param,
2791
"%s records have been removed",
2792
llstr(sort_info.dupp,llbuff));
2796
if (&share->state.state != info->state)
2797
memcpy(&share->state.state, info->state, sizeof(*info->state));
2800
got_error|= flush_blocks(param, share->key_cache, share->kfile);
2802
Destroy the write cache. The master thread did already detach from
2803
the share by remove_io_thread() or it was not yet started (if the
2804
error happened before creating the thread).
2806
end_io_cache(&info->rec_cache);
2808
Destroy the new data cache in case of non-quick repair. All slave
2809
threads did either detach from the share by remove_io_thread()
2810
already or they were not yet started (if the error happened before
2811
creating the threads).
2814
end_io_cache(&new_data_cache);
2817
/* Replace the actual file with the temporary file */
2820
my_close(new_file,MYF(0));
2821
info->dfile=new_file= -1;
2822
if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
2823
DATA_TMP_EXT, share->base.raid_chunks,
2824
(param->testflag & T_BACKUP_DATA ?
2825
MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
2826
mi_open_datafile(info,share,-1))
2832
if (! param->error_printed)
2833
mi_check_print_error(param,"%d when fixing table",my_errno);
2836
my_close(new_file,MYF(0));
2837
my_delete(param->temp_filename, MYF(MY_WME));
2838
if (info->dfile == new_file)
2841
mi_mark_crashed_on_repair(info);
2843
else if (key_map == share->state.key_map)
2844
share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
2845
share->state.changed|=STATE_NOT_SORTED_PAGES;
2847
pthread_cond_destroy (&sort_info.cond);
2848
pthread_mutex_destroy(&sort_info.mutex);
2850
free((unsigned char*) sort_info.key_block);
2851
free((unsigned char*) sort_param);
2852
free(sort_info.buff);
2853
end_io_cache(¶m->read_cache);
2334
2854
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2335
2855
if (!got_error && (param->testflag & T_UNPACK))
3152
3672
(my_off_t) info->s->base.max_data_file_length;
3675
/* Recreate table with bigger (more allocated) record-data */
3677
int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename)
3682
MI_KEYDEF *keyinfo,*key,*key_end;
3683
HA_KEYSEG *keysegs,*keyseg;
3684
MI_COLUMNDEF *recdef,*rec,*end;
3685
MI_UNIQUEDEF *uniquedef,*u_ptr,*u_end;
3686
MI_STATUS_INFO status_info;
3687
uint32_t unpack,key_parts;
3688
ha_rows max_records;
3689
uint64_t file_length,tmp_length;
3690
MI_CREATE_INFO create_info;
3692
error=1; /* Default error */
3694
status_info= (*org_info)->state[0];
3695
info.state= &status_info;
3696
share= *(*org_info)->s;
3697
unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
3698
(param->testflag & T_UNPACK);
3699
if (!(keyinfo=(MI_KEYDEF*) malloc(sizeof(MI_KEYDEF)*share.base.keys)))
3701
memcpy(keyinfo,share.keyinfo,sizeof(MI_KEYDEF)*share.base.keys);
3703
key_parts= share.base.all_key_parts;
3704
if (!(keysegs=(HA_KEYSEG*) malloc(sizeof(HA_KEYSEG)*
3705
(key_parts+share.base.keys))))
3710
if (!(recdef=(MI_COLUMNDEF*)
3711
malloc(sizeof(MI_COLUMNDEF)*(share.base.fields+1))))
3717
if (!(uniquedef=(MI_UNIQUEDEF*)
3718
malloc(sizeof(MI_UNIQUEDEF)*(share.state.header.uniques+1))))
3726
/* Copy the column definitions */
3727
memcpy(recdef, share.rec, sizeof(MI_COLUMNDEF)*(share.base.fields+1));
3728
for (rec=recdef,end=recdef+share.base.fields; rec != end ; rec++)
3730
if (unpack && !(share.options & HA_OPTION_PACK_RECORD) &&
3731
rec->type != FIELD_BLOB &&
3732
rec->type != FIELD_VARCHAR &&
3733
rec->type != FIELD_CHECK)
3734
rec->type=(int) FIELD_NORMAL;
3737
/* Change the new key to point at the saved key segments */
3738
memcpy(keysegs,share.keyparts,
3739
sizeof(HA_KEYSEG)*(key_parts+share.base.keys+
3740
share.state.header.uniques));
3742
for (key=keyinfo,key_end=keyinfo+share.base.keys; key != key_end ; key++)
3745
for (; keyseg->type ; keyseg++)
3747
if (param->language)
3748
keyseg->language=param->language; /* change language */
3750
keyseg++; /* Skip end pointer */
3753
/* Copy the unique definitions and change them to point at the new key
3755
memcpy(uniquedef,share.uniqueinfo,
3756
sizeof(MI_UNIQUEDEF)*(share.state.header.uniques));
3757
for (u_ptr=uniquedef,u_end=uniquedef+share.state.header.uniques;
3758
u_ptr != u_end ; u_ptr++)
3761
keyseg+=u_ptr->keysegs+1;
3763
if (share.options & HA_OPTION_COMPRESS_RECORD)
3764
share.base.records=max_records=info.state->records;
3765
else if (share.base.min_pack_length)
3766
max_records=(ha_rows) (lseek(info.dfile,0L,SEEK_END) /
3767
(ulong) share.base.min_pack_length);
3770
unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
3771
(param->testflag & T_UNPACK);
3772
share.options&= ~HA_OPTION_TEMP_COMPRESS_RECORD;
3774
file_length=(uint64_t) lseek(info.dfile,0L,SEEK_END);
3775
tmp_length= file_length+file_length/10;
3776
set_if_bigger(file_length,param->max_data_file_length);
3777
set_if_bigger(file_length,tmp_length);
3778
set_if_bigger(file_length,(uint64_t) share.base.max_data_file_length);
3780
mi_close(*org_info);
3781
memset(&create_info, 0, sizeof(create_info));
3782
create_info.max_rows=max(max_records,share.base.records);
3783
create_info.reloc_rows=share.base.reloc;
3784
create_info.old_options=(share.options |
3785
(unpack ? HA_OPTION_TEMP_COMPRESS_RECORD : 0));
3787
create_info.data_file_length=file_length;
3788
create_info.auto_increment=share.state.auto_increment;
3789
create_info.language = (param->language ? param->language :
3790
share.state.header.language);
3791
create_info.key_file_length= status_info.key_file_length;
3793
Allow for creating an auto_increment key. This has an effect only if
3794
an auto_increment key exists in the original table.
3796
create_info.with_auto_increment= true;
3797
/* We don't have to handle symlinks here because we are using
3798
HA_DONT_TOUCH_DATA */
3799
if (mi_create(filename,
3800
share.base.keys - share.state.header.uniques,
3801
keyinfo, share.base.fields, recdef,
3802
share.state.header.uniques, uniquedef,
3804
HA_DONT_TOUCH_DATA))
3806
mi_check_print_error(param,"Got error %d when trying to recreate indexfile",my_errno);
3809
*org_info=mi_open(filename,O_RDWR,
3810
(param->testflag & T_WAIT_FOREVER) ? HA_OPEN_WAIT_IF_LOCKED :
3811
(param->testflag & T_DESCRIPT) ? HA_OPEN_IGNORE_IF_LOCKED :
3812
HA_OPEN_ABORT_IF_LOCKED);
3815
mi_check_print_error(param,"Got error %d when trying to open re-created indexfile",
3819
/* We are modifying */
3820
(*org_info)->s->options&= ~HA_OPTION_READ_ONLY_DATA;
3821
_mi_readinfo(*org_info,F_WRLCK,0);
3822
(*org_info)->state->records=info.state->records;
3823
if (share.state.create_time)
3824
(*org_info)->s->state.create_time=share.state.create_time;
3825
(*org_info)->s->state.unique=(*org_info)->this_unique=
3827
(*org_info)->state->checksum=info.state->checksum;
3828
(*org_info)->state->del=info.state->del;
3829
(*org_info)->s->state.dellink=share.state.dellink;
3830
(*org_info)->state->empty=info.state->empty;
3831
(*org_info)->state->data_file_length=info.state->data_file_length;
3832
if (update_state_info(param,*org_info,UPDATE_TIME | UPDATE_STAT |
3156
3845
/* write suffix to data file if needed */