2347
2348
return(got_error);
2351
Threaded repair of table using sorting
2354
mi_repair_parallel()
2355
param Repair parameters
2356
info MyISAM handler to repair
2357
name Name of table (for warnings)
2358
rep_quick set to <> 0 if we should not change data file
2361
Same as mi_repair_by_sort but do it multithreaded
2362
Each key is handled by a separate thread.
2363
TODO: make the number of threads a parameter
2365
In parallel repair we use one thread per index. There are two modes:
2369
Only the indexes are rebuilt. All threads share a read buffer.
2370
Every thread that needs fresh data in the buffer enters the shared
2371
cache lock. The last thread joining the lock reads the buffer from
2372
the data file and wakes all other threads.
2376
The data file is rebuilt and all indexes are rebuilt to point to
2377
the new record positions. One thread is the master thread. It
2378
reads from the old data file and writes to the new data file. It
2379
also creates one of the indexes. The other threads read from a
2380
buffer which is filled by the master. If they need fresh data,
2381
they enter the shared cache lock. If the master's write buffer is
2382
full, it flushes it to the new data file and enters the shared
2383
cache lock too. When all threads joined in the lock, the master
2384
copies its write buffer to the read buffer for the other threads
2392
int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
2393
const char * name, int rep_quick)
2396
uint32_t i,key, total_key_length, istep;
2398
ha_rows start_records;
2399
my_off_t new_header_length,del;
2401
MI_SORT_PARAM *sort_param=0;
2402
MYISAM_SHARE *share=info->s;
2403
ulong *rec_per_key_part;
2406
IO_CACHE new_data_cache; /* For non-quick repair. */
2407
IO_CACHE_SHARE io_share;
2408
SORT_INFO sort_info;
2409
uint64_t key_map= 0;
2410
pthread_attr_t thr_attr;
2411
ulong max_pack_reclength;
2413
start_records=info->state->records;
2416
new_header_length=(param->testflag & T_UNPACK) ? 0 :
2417
share->pack.header_length;
2418
if (!(param->testflag & T_SILENT))
2420
printf("- parallel recovering (with sort) MyISAM-table '%s'\n",name);
2421
printf("Data records: %s\n", llstr(start_records,llbuff));
2423
param->testflag|=T_REP; /* for easy checking */
2425
if (info->s->options & (HA_OPTION_COMPRESS_RECORD))
2426
param->testflag|=T_CALC_CHECKSUM;
2429
Quick repair (not touching data file, rebuilding indexes):
2431
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
2434
Non-quick repair (rebuilding data file and indexes):
2438
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
2439
Write cache is (MI_INFO *info)->rec_cache using new_file.
2443
Read cache is new_data_cache synced to master rec_cache.
2445
The final assignment of the file descriptor for rec_cache is done
2446
after the cache creation.
2448
Don't check file size on new_data_cache, as the resulting file size
2451
As rec_cache and new_data_cache are synced, write_buffer_length is
2452
used for the read cache 'new_data_cache'. Both start at the same
2453
position 'new_header_length'.
2456
memset(&sort_info, 0, sizeof(sort_info));
2457
/* Initialize pthread structures before goto err. */
2458
pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST);
2459
pthread_cond_init(&sort_info.cond, 0);
2461
if (!(sort_info.key_block=
2462
alloc_key_blocks(param, (uint) param->sort_key_blocks,
2463
share->base.max_key_block_length)) ||
2464
init_io_cache(&param->read_cache, info->dfile,
2465
(uint) param->read_buffer_length,
2466
READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
2468
(init_io_cache(&info->rec_cache, info->dfile,
2469
(uint) param->write_buffer_length,
2470
WRITE_CACHE, new_header_length, 1,
2471
MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
2472
init_io_cache(&new_data_cache, -1,
2473
(uint) param->write_buffer_length,
2474
READ_CACHE, new_header_length, 1,
2475
MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
2477
sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
2478
info->opt_flag|=WRITE_CACHE_USED;
2479
info->rec_cache.file=info->dfile; /* for sort_delete_record */
2483
/* Get real path for data file */
2484
if ((new_file=my_create(internal::fn_format(param->temp_filename,
2485
share->data_file_name, "",
2488
0,param->tmpfile_createflag,
2491
mi_check_print_error(param,"Can't create new tempfile: '%s'",
2492
param->temp_filename);
2495
if (new_header_length &&
2496
filecopy(param, new_file,info->dfile,0L,new_header_length,
2499
if (param->testflag & T_UNPACK)
2501
share->options&= ~HA_OPTION_COMPRESS_RECORD;
2502
mi_int2store(share->state.header.options,share->options);
2504
share->state.dellink= HA_OFFSET_ERROR;
2505
info->rec_cache.file=new_file;
2508
info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
2510
/* Optionally drop indexes and optionally modify the key_map. */
2511
mi_drop_all_indexes(param, info, false);
2512
key_map= share->state.key_map;
2513
if (param->testflag & T_CREATE_MISSING_KEYS)
2515
/* Invert the copied key_map to recreate all disabled indexes. */
2519
sort_info.info=info;
2520
sort_info.param = param;
2522
set_data_file_type(&sort_info, share);
2525
param->read_cache.end_of_file=sort_info.filelength=
2526
lseek(param->read_cache.file,0L,SEEK_END);
2528
if (share->data_file_type == DYNAMIC_RECORD)
2529
rec_length=max(share->base.min_pack_length+1,share->base.min_block_length);
2530
else if (share->data_file_type == COMPRESSED_RECORD)
2531
rec_length=share->base.min_block_length;
2533
rec_length=share->base.pack_reclength;
2535
The +1 below is a required hack for parallel repair mode.
2536
The info->state->records value, that is compared later
2537
to sort_info.max_records and cannot exceed it, is
2538
increased in sort_key_write. In mi_repair_by_sort, sort_key_write
2539
is called after sort_key_read, where the comparison is performed,
2540
but in parallel mode master thread can call sort_key_write
2541
before some other repair thread calls sort_key_read.
2542
Furthermore I'm not even sure +1 would be enough.
2543
Maybe sort_info.max_records should always be set to max value in
2546
sort_info.max_records=
2547
((param->testflag & T_CREATE_MISSING_KEYS) ? info->state->records + 1:
2548
(ha_rows) (sort_info.filelength/rec_length+1));
2550
del=info->state->del;
2552
/* for compressed tables */
2553
max_pack_reclength= share->base.pack_reclength;
2554
if (share->options & HA_OPTION_COMPRESS_RECORD)
2555
set_if_bigger(max_pack_reclength, share->max_pack_length);
2556
if (!(sort_param=(MI_SORT_PARAM *)
2557
malloc(share->base.keys *
2558
(sizeof(MI_SORT_PARAM) + max_pack_reclength))))
2560
mi_check_print_error(param,"Not enough memory for key!");
2563
memset(sort_param, 0, share->base.keys *
2564
(sizeof(MI_SORT_PARAM) + max_pack_reclength));
2566
rec_per_key_part= param->rec_per_key_part;
2567
info->state->records=info->state->del=share->state.split=0;
2568
info->state->empty=0;
2570
for (i=key=0, istep=1 ; key < share->base.keys ;
2571
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
2573
sort_param[i].key=key;
2574
sort_param[i].keyinfo=share->keyinfo+key;
2575
sort_param[i].seg=sort_param[i].keyinfo->seg;
2577
Skip this index if it is marked disabled in the copied
2578
(and possibly inverted) key_map.
2580
if (! mi_is_key_active(key_map, key))
2582
/* Remember old statistics for key */
2583
assert(rec_per_key_part >= param->rec_per_key_part);
2584
memcpy(rec_per_key_part,
2585
(share->state.rec_per_key_part +
2586
(rec_per_key_part - param->rec_per_key_part)),
2587
sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
2592
if ((!(param->testflag & T_SILENT)))
2593
printf ("- Fixing index %d\n",key+1);
2595
sort_param[i].key_read=sort_key_read;
2596
sort_param[i].key_write=sort_key_write;
2598
sort_param[i].key_cmp=sort_key_cmp;
2599
sort_param[i].lock_in_memory=lock_memory;
2600
sort_param[i].sort_info=&sort_info;
2601
sort_param[i].master=0;
2602
sort_param[i].fix_datafile=0;
2603
sort_param[i].calc_checksum= 0;
2605
sort_param[i].filepos=new_header_length;
2606
sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;
2608
sort_param[i].record= (((unsigned char *)(sort_param+share->base.keys))+
2609
(max_pack_reclength * i));
2610
if (!mi_alloc_rec_buff(info, -1, &sort_param[i].rec_buff))
2612
mi_check_print_error(param,"Not enough memory!");
2616
sort_param[i].key_length=share->rec_reflength;
2617
for (keyseg=sort_param[i].seg; keyseg->type != HA_KEYTYPE_END;
2620
sort_param[i].key_length+=keyseg->length;
2621
if (keyseg->flag & HA_SPACE_PACK)
2622
sort_param[i].key_length+=get_pack_length(keyseg->length);
2623
if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
2624
sort_param[i].key_length+=2 + test(keyseg->length >= 127);
2625
if (keyseg->flag & HA_NULL_PART)
2626
sort_param[i].key_length++;
2628
total_key_length+=sort_param[i].key_length;
2630
sort_info.total_keys=i;
2631
sort_param[0].master= 1;
2632
sort_param[0].fix_datafile= (bool)(! rep_quick);
2633
sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM);
2635
sort_info.got_error=0;
2636
pthread_mutex_lock(&sort_info.mutex);
2639
Initialize the I/O cache share for use with the read caches and, in
2640
case of non-quick repair, the write cache. When all threads join on
2641
the cache lock, the writer copies the write cache contents to the
2647
init_io_cache_share(&param->read_cache, &io_share, NULL, i);
2649
init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
2652
io_share.total_threads= 0; /* share not used */
2654
(void) pthread_attr_init(&thr_attr);
2655
(void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
2657
for (i=0 ; i < sort_info.total_keys ; i++)
2660
Copy the properly initialized IO_CACHE structure so that every
2661
thread has its own copy. In quick mode param->read_cache is shared
2662
for use by all threads. In non-quick mode all threads but the
2663
first copy the shared new_data_cache, which is synchronized to the
2664
write cache of the first thread. The first thread copies
2665
param->read_cache, which is not shared.
2667
sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
2671
two approaches: the same amount of memory for each thread
2672
or the memory for the same number of keys for each thread...
2673
In the second one all the threads will fill their sort_buffers
2674
(and call write_keys) at the same time, putting more stress on i/o.
2676
sort_param[i].sortbuff_size=
2677
#ifndef USING_SECOND_APPROACH
2678
param->sort_buffer_length/sort_info.total_keys;
2680
param->sort_buffer_length*sort_param[i].key_length/total_key_length;
2682
if (pthread_create(&sort_param[i].thr, &thr_attr,
2684
(void *) (sort_param+i)))
2686
mi_check_print_error(param,"Cannot start a repair thread");
2687
/* Cleanup: Detach from the share. Avoid others to be blocked. */
2688
if (io_share.total_threads)
2689
remove_io_thread(&sort_param[i].read_cache);
2690
sort_info.got_error=1;
2693
sort_info.threads_running++;
2695
(void) pthread_attr_destroy(&thr_attr);
2697
/* waiting for all threads to finish */
2698
while (sort_info.threads_running)
2699
pthread_cond_wait(&sort_info.cond, &sort_info.mutex);
2700
pthread_mutex_unlock(&sort_info.mutex);
2702
if ((got_error= thr_write_keys(sort_param)))
2704
param->retry_repair=1;
2707
got_error=1; /* Assume the following may go wrong */
2709
if (sort_param[0].fix_datafile)
2712
Append some nuls to the end of a memory mapped file. Destroy the
2713
write cache. The master thread did already detach from the share
2714
by remove_io_thread() in sort.c:thr_find_all_keys().
2716
if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
2718
if (param->testflag & T_SAFE_REPAIR)
2720
/* Don't repair if we lost more than one row */
2721
if (info->state->records+1 < start_records)
2723
info->state->records=start_records;
2727
share->state.state.data_file_length= info->state->data_file_length=
2728
sort_param->filepos;
2729
/* Only whole records */
2730
share->state.version=(ulong) time((time_t*) 0);
2733
Exchange the data file descriptor of the table, so that we use the
2734
new file from now on.
2736
internal::my_close(info->dfile,MYF(0));
2737
info->dfile=new_file;
2739
share->data_file_type=sort_info.new_data_file_type;
2740
share->pack.header_length=(ulong) new_header_length;
2743
info->state->data_file_length=sort_param->max_pos;
2745
if (rep_quick && del+sort_info.dupp != info->state->del)
2747
mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records");
2748
mi_check_print_error(param,"Run recovery again without -q");
2749
param->retry_repair=1;
2750
param->testflag|=T_RETRY_WITHOUT_QUICK;
2754
if (rep_quick & T_FORCE_UNIQUENESS)
2756
my_off_t skr=info->state->data_file_length+
2757
(share->options & HA_OPTION_COMPRESS_RECORD ?
2758
MEMMAP_EXTRA_MARGIN : 0);
2760
if (share->data_file_type == STATIC_RECORD &&
2761
skr < share->base.reloc*share->base.min_pack_length)
2762
skr=share->base.reloc*share->base.min_pack_length;
2764
if (skr != sort_info.filelength && !info->s->base.raid_type)
2765
if (ftruncate(info->dfile, skr))
2766
mi_check_print_warning(param,
2767
"Can't change size of datafile, error: %d",
2770
if (param->testflag & T_CALC_CHECKSUM)
2771
info->state->checksum=param->glob_crc;
2773
if (ftruncate(share->kfile, info->state->key_file_length))
2774
mi_check_print_warning(param,
2775
"Can't change size of indexfile, error: %d", errno);
2777
if (!(param->testflag & T_SILENT))
2779
if (start_records != info->state->records)
2780
printf("Data records: %s\n", llstr(info->state->records,llbuff));
2782
mi_check_print_warning(param,
2783
"%s records have been removed",
2784
llstr(sort_info.dupp,llbuff));
2788
if (&share->state.state != info->state)
2789
memcpy(&share->state.state, info->state, sizeof(*info->state));
2792
got_error|= flush_blocks(param, share->key_cache, share->kfile);
2794
Destroy the write cache. The master thread did already detach from
2795
the share by remove_io_thread() or it was not yet started (if the
2796
error happened before creating the thread).
2798
end_io_cache(&info->rec_cache);
2800
Destroy the new data cache in case of non-quick repair. All slave
2801
threads did either detach from the share by remove_io_thread()
2802
already or they were not yet started (if the error happened before
2803
creating the threads).
2806
end_io_cache(&new_data_cache);
2809
/* Replace the actual file with the temporary file */
2812
internal::my_close(new_file,MYF(0));
2813
info->dfile=new_file= -1;
2814
if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
2815
DATA_TMP_EXT, share->base.raid_chunks,
2816
(param->testflag & T_BACKUP_DATA ?
2817
MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
2818
mi_open_datafile(info,share,-1))
2824
if (! param->error_printed)
2825
mi_check_print_error(param,"%d when fixing table",errno);
2828
internal::my_close(new_file,MYF(0));
2829
my_delete(param->temp_filename, MYF(MY_WME));
2830
if (info->dfile == new_file)
2833
mi_mark_crashed_on_repair(info);
2835
else if (key_map == share->state.key_map)
2836
share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
2837
share->state.changed|=STATE_NOT_SORTED_PAGES;
2839
pthread_cond_destroy (&sort_info.cond);
2840
pthread_mutex_destroy(&sort_info.mutex);
2842
free((unsigned char*) sort_info.key_block);
2843
free((unsigned char*) sort_param);
2844
free(sort_info.buff);
2845
end_io_cache(&param->read_cache);
2846
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2847
if (!got_error && (param->testflag & T_UNPACK))
2849
share->state.header.options[0]&= (unsigned char) ~HA_OPTION_COMPRESS_RECORD;
2850
share->pack.header_length=0;
2855
2351
/* Read next record and return next key */
2857
2353
int sort_key_read(MI_SORT_PARAM *sort_param, void *key)
3664
3160
(my_off_t) info->s->base.max_data_file_length;
3667
/* Recreate table with bigger, more allocated record-data */
3669
int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename)
3674
MI_KEYDEF *keyinfo,*key,*key_end;
3675
HA_KEYSEG *keysegs,*keyseg;
3676
MI_COLUMNDEF *recdef,*rec,*end;
3677
MI_UNIQUEDEF *uniquedef,*u_ptr,*u_end;
3678
MI_STATUS_INFO status_info;
3679
uint32_t unpack,key_parts;
3680
ha_rows max_records;
3681
uint64_t file_length,tmp_length;
3682
MI_CREATE_INFO create_info;
3684
error=1; /* Default error */
3686
status_info= (*org_info)->state[0];
3687
info.state= &status_info;
3688
share= *(*org_info)->s;
3689
unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
3690
(param->testflag & T_UNPACK);
3691
if (!(keyinfo=(MI_KEYDEF*) malloc(sizeof(MI_KEYDEF)*share.base.keys)))
3693
memcpy(keyinfo,share.keyinfo,sizeof(MI_KEYDEF)*share.base.keys);
3695
key_parts= share.base.all_key_parts;
3696
if (!(keysegs=(HA_KEYSEG*) malloc(sizeof(HA_KEYSEG)*
3697
(key_parts+share.base.keys))))
3702
if (!(recdef=(MI_COLUMNDEF*)
3703
malloc(sizeof(MI_COLUMNDEF)*(share.base.fields+1))))
3709
if (!(uniquedef=(MI_UNIQUEDEF*)
3710
malloc(sizeof(MI_UNIQUEDEF)*(share.state.header.uniques+1))))
3718
/* Copy the column definitions */
3719
memcpy(recdef, share.rec, sizeof(MI_COLUMNDEF)*(share.base.fields+1));
3720
for (rec=recdef,end=recdef+share.base.fields; rec != end ; rec++)
3722
if (unpack && !(share.options & HA_OPTION_PACK_RECORD) &&
3723
rec->type != FIELD_BLOB &&
3724
rec->type != FIELD_VARCHAR &&
3725
rec->type != FIELD_CHECK)
3726
rec->type=(int) FIELD_NORMAL;
3729
/* Change the new key to point at the saved key segments */
3730
memcpy(keysegs,share.keyparts,
3731
sizeof(HA_KEYSEG)*(key_parts+share.base.keys+
3732
share.state.header.uniques));
3734
for (key=keyinfo,key_end=keyinfo+share.base.keys; key != key_end ; key++)
3737
for (; keyseg->type ; keyseg++)
3739
if (param->language)
3740
keyseg->language=param->language; /* change language */
3742
keyseg++; /* Skip end pointer */
3745
/* Copy the unique definitions and change them to point at the new key
3747
memcpy(uniquedef,share.uniqueinfo,
3748
sizeof(MI_UNIQUEDEF)*(share.state.header.uniques));
3749
for (u_ptr=uniquedef,u_end=uniquedef+share.state.header.uniques;
3750
u_ptr != u_end ; u_ptr++)
3753
keyseg+=u_ptr->keysegs+1;
3755
if (share.options & HA_OPTION_COMPRESS_RECORD)
3756
share.base.records=max_records=info.state->records;
3757
else if (share.base.min_pack_length)
3758
max_records=(ha_rows) (lseek(info.dfile,0L,SEEK_END) /
3759
(ulong) share.base.min_pack_length);
3762
unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
3763
(param->testflag & T_UNPACK);
3764
share.options&= ~HA_OPTION_TEMP_COMPRESS_RECORD;
3766
file_length=(uint64_t) lseek(info.dfile,0L,SEEK_END);
3767
tmp_length= file_length+file_length/10;
3768
set_if_bigger(file_length,param->max_data_file_length);
3769
set_if_bigger(file_length,tmp_length);
3770
set_if_bigger(file_length,(uint64_t) share.base.max_data_file_length);
3772
mi_close(*org_info);
3773
memset(&create_info, 0, sizeof(create_info));
3774
create_info.max_rows=max(max_records,share.base.records);
3775
create_info.reloc_rows=share.base.reloc;
3776
create_info.old_options=(share.options |
3777
(unpack ? HA_OPTION_TEMP_COMPRESS_RECORD : 0));
3779
create_info.data_file_length=file_length;
3780
create_info.auto_increment=share.state.auto_increment;
3781
create_info.language = (param->language ? param->language :
3782
share.state.header.language);
3783
create_info.key_file_length= status_info.key_file_length;
3785
Allow for creating an auto_increment key. This has an effect only if
3786
an auto_increment key exists in the original table.
3788
create_info.with_auto_increment= true;
3789
/* We don't have to handle symlinks here because we are using
3790
HA_DONT_TOUCH_DATA */
3791
if (mi_create(filename,
3792
share.base.keys - share.state.header.uniques,
3793
keyinfo, share.base.fields, recdef,
3794
share.state.header.uniques, uniquedef,
3796
HA_DONT_TOUCH_DATA))
3798
mi_check_print_error(param,"Got error %d when trying to recreate indexfile",errno);
3801
*org_info=mi_open(filename,O_RDWR,
3802
(param->testflag & T_WAIT_FOREVER) ? HA_OPEN_WAIT_IF_LOCKED :
3803
(param->testflag & T_DESCRIPT) ? HA_OPEN_IGNORE_IF_LOCKED :
3804
HA_OPEN_ABORT_IF_LOCKED);
3807
mi_check_print_error(param,"Got error %d when trying to open re-created indexfile",
3811
/* We are modifying */
3812
(*org_info)->s->options&= ~HA_OPTION_READ_ONLY_DATA;
3813
_mi_readinfo(*org_info,F_WRLCK,0);
3814
(*org_info)->state->records=info.state->records;
3815
if (share.state.create_time)
3816
(*org_info)->s->state.create_time=share.state.create_time;
3817
(*org_info)->s->state.unique=(*org_info)->this_unique=
3819
(*org_info)->state->checksum=info.state->checksum;
3820
(*org_info)->state->del=info.state->del;
3821
(*org_info)->s->state.dellink=share.state.dellink;
3822
(*org_info)->state->empty=info.state->empty;
3823
(*org_info)->state->data_file_length=info.state->data_file_length;
3824
if (update_state_info(param,*org_info,UPDATE_TIME | UPDATE_STAT |
3837
3164
/* write suffix to data file if needed */