  free((unsigned char*) sort_info.key_block);
  free(sort_info.buff);
  end_io_cache(&param->read_cache);
  info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
  if (!got_error && (param->testflag & T_UNPACK))
  {
    share->state.header.options[0]&= (unsigned char) ~HA_OPTION_COMPRESS_RECORD;
    share->pack.header_length=0;
  }
/*
  Threaded repair of table using sorting

  SYNOPSIS
    mi_repair_parallel()
      param             Repair parameters
      info              MyISAM handler to repair
      name              Name of table (for warnings)
      rep_quick         set to <> 0 if we should not change data file

  DESCRIPTION
    Same as mi_repair_by_sort but do it multithreaded.
    Each key is handled by a separate thread.
    TODO: make the number of threads a parameter

    In parallel repair we use one thread per index. There are two modes:

    quick

      Only the indexes are rebuilt. All threads share a read buffer.
      Every thread that needs fresh data in the buffer enters the shared
      cache lock. The last thread joining the lock reads the buffer from
      the data file and wakes all other threads.

    non-quick

      The data file is rebuilt and all indexes are rebuilt to point to
      the new record positions. One thread is the master thread. It
      reads from the old data file and writes to the new data file. It
      also creates one of the indexes. The other threads read from a
      buffer which is filled by the master. If they need fresh data,
      they enter the shared cache lock. If the master's write buffer is
      full, it flushes it to the new data file and enters the shared
      cache lock too. When all threads have joined in the lock, the
      master copies its write buffer to the read buffer for the other
      threads and wakes them. (A standalone sketch of this hand-over
      follows this comment.)
*/
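/*
  Illustrative only: a minimal standalone sketch (not part of the original
  MyISAM sources) of the "last thread joining the lock refills the shared
  buffer and wakes the others" hand-over described above. The names used
  here (shared_buf_t, shared_refill) are hypothetical; the real mechanism
  is the IO_CACHE_SHARE code in mysys, used via init_io_cache_share()
  further below.
*/
#if 0
#include <pthread.h>
#include <stdio.h>

typedef struct {
  pthread_mutex_t mutex;
  pthread_cond_t  cond;
  int             total_threads;   /* number of threads sharing the buffer */
  int             waiting;         /* threads that have consumed the buffer */
  unsigned long   generation;      /* bumped on every refill */
  unsigned char   buf[8192];
  size_t          length;          /* valid bytes in buf, 0 at end of file */
} shared_buf_t;

/* Called by a thread once it has processed the current buffer content. */
static size_t shared_refill(shared_buf_t *s, FILE *data_file)
{
  size_t length;
  unsigned long my_generation;
  pthread_mutex_lock(&s->mutex);
  my_generation= s->generation;
  if (++s->waiting < s->total_threads)
  {
    /* Not the last thread: wait until the last one has refilled the buffer. */
    while (s->generation == my_generation)
      pthread_cond_wait(&s->cond, &s->mutex);
  }
  else
  {
    /* Last thread to join: read the next block and wake everybody else. */
    s->length= fread(s->buf, 1, sizeof(s->buf), data_file);
    s->waiting= 0;
    s->generation++;
    pthread_cond_broadcast(&s->cond);
  }
  length= s->length;
  pthread_mutex_unlock(&s->mutex);
  return length;                    /* 0 means end of file for every thread */
}
#endif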
int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
                       const char *name, int rep_quick)
{
  uint32_t i,key, total_key_length, istep;
  ha_rows start_records;
  my_off_t new_header_length,del;
  MI_SORT_PARAM *sort_param=0;
  MYISAM_SHARE *share=info->s;
  ulong *rec_per_key_part;
  IO_CACHE new_data_cache; /* For non-quick repair. */
  IO_CACHE_SHARE io_share;
  SORT_INFO sort_info;
  uint64_t key_map= 0;
  pthread_attr_t thr_attr;
  ulong max_pack_reclength;

  start_records=info->state->records;
  new_header_length=(param->testflag & T_UNPACK) ? 0 :
    share->pack.header_length;
  if (!(param->testflag & T_SILENT))
  {
    printf("- parallel recovering (with sort) MyISAM-table '%s'\n",name);
    printf("Data records: %s\n", llstr(start_records,llbuff));
  }
  param->testflag|=T_REP; /* for easy checking */

  if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
    param->testflag|=T_CALC_CHECKSUM;
  /*
    Quick repair (not touching data file, rebuilding indexes):

      Read cache is (MI_CHECK *param)->read_cache using info->dfile.

    Non-quick repair (rebuilding data file and indexes):

      Master thread:
        Read cache is (MI_CHECK *param)->read_cache using info->dfile.
        Write cache is (MI_INFO *info)->rec_cache using new_file.

      Slave threads:
        Read cache is new_data_cache synced to the master's rec_cache.

      The final assignment of the file descriptor for rec_cache is done
      after the cache creation.

      Don't check file size on new_data_cache, as the resulting file size
      is not known yet.

      As rec_cache and new_data_cache are synced, write_buffer_length is
      used for the read cache 'new_data_cache'. Both start at the same
      position 'new_header_length'.
  */
  memset(&sort_info, 0, sizeof(sort_info));
  /* Initialize pthread structures before goto err. */
  pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&sort_info.cond, 0);

  if (!(sort_info.key_block=
        alloc_key_blocks(param, (uint) param->sort_key_blocks,
                         share->base.max_key_block_length)) ||
      init_io_cache(&param->read_cache, info->dfile,
                    (uint) param->read_buffer_length,
                    READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
      (!rep_quick &&
       (init_io_cache(&info->rec_cache, info->dfile,
                      (uint) param->write_buffer_length,
                      WRITE_CACHE, new_header_length, 1,
                      MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
        init_io_cache(&new_data_cache, -1,
                      (uint) param->write_buffer_length,
                      READ_CACHE, new_header_length, 1,
                      MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
    goto err;
  sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
  info->opt_flag|=WRITE_CACHE_USED;
  info->rec_cache.file=info->dfile; /* for sort_delete_record */
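/*
  Illustrative only: a tiny standalone sketch of the READ_CACHE role
  described in the comment above, independent of the repair code. It relies
  on this file's existing mysys includes (IO_CACHE, init_io_cache, my_b_read,
  end_io_cache, my_open, my_close); the function name scan_data_file is
  hypothetical.
*/
#if 0
static int scan_data_file(const char *path, my_off_t header_length)
{
  IO_CACHE cache;
  unsigned char block[IO_SIZE];
  File fd= my_open(path, O_RDONLY, MYF(MY_WME));
  if (fd < 0)
    return 1;
  /* Buffered, sequential reads starting right after the pack header. */
  if (init_io_cache(&cache, fd, (size_t) IO_SIZE*16,
                    READ_CACHE, header_length, 1, MYF(MY_WME)))
  {
    my_close(fd, MYF(0));
    return 1;
  }
  while (!my_b_read(&cache, block, sizeof(block)))
  {
    /* process one block; the loop ends at the first short read (EOF) */
  }
  end_io_cache(&cache);
  my_close(fd, MYF(0));
  return 0;
}
#endif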
  if (!rep_quick)
  {
    /* Get real path for data file */
    if ((new_file=my_create(fn_format(param->temp_filename,
                                      share->data_file_name, "",
                            0,param->tmpfile_createflag,
      mi_check_print_error(param,"Can't create new tempfile: '%s'",
                           param->temp_filename);
    if (new_header_length &&
        filecopy(param, new_file,info->dfile,0L,new_header_length,
    if (param->testflag & T_UNPACK)
    {
      share->options&= ~HA_OPTION_COMPRESS_RECORD;
      mi_int2store(share->state.header.options,share->options);
    }
    share->state.dellink= HA_OFFSET_ERROR;
    info->rec_cache.file=new_file;
  }

  info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);

  /* Optionally drop indexes and optionally modify the key_map. */
  mi_drop_all_indexes(param, info, false);
  key_map= share->state.key_map;
  if (param->testflag & T_CREATE_MISSING_KEYS)
  {
    /* Invert the copied key_map to recreate all disabled indexes. */
    key_map= ~key_map;
  }

  sort_info.info=info;
  sort_info.param= param;

  set_data_file_type(&sort_info, share);
  param->read_cache.end_of_file=sort_info.filelength=
    lseek(param->read_cache.file,0L,SEEK_END);

  if (share->data_file_type == DYNAMIC_RECORD)
    rec_length=max(share->base.min_pack_length+1,share->base.min_block_length);
  else if (share->data_file_type == COMPRESSED_RECORD)
    rec_length=share->base.min_block_length;
  else
    rec_length=share->base.pack_reclength;
  /*
    The +1 below is a required hack for parallel repair mode.
    The info->state->records value, which is compared later to
    sort_info.max_records and cannot exceed it, is increased in
    sort_key_write. In mi_repair_by_sort, sort_key_write is called
    after sort_key_read, where the comparison is performed, but in
    parallel mode the master thread can call sort_key_write before
    some other repair thread calls sort_key_read.
    Furthermore I'm not even sure +1 would be enough.
    Maybe sort_info.max_records should always be set to the maximum
    value in case of T_CREATE_MISSING_KEYS.
  */
  sort_info.max_records=
    ((param->testflag & T_CREATE_MISSING_KEYS) ? info->state->records + 1 :
     (ha_rows) (sort_info.filelength/rec_length+1));
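/*
  Illustrative only: a worked example (made-up numbers) of the
  max_records estimate above for the non-T_CREATE_MISSING_KEYS case.
  Not part of the original sources.
*/
#if 0
static void max_records_example(void)
{
  unsigned long long filelength= 1000000;  /* bytes in the data file */
  unsigned long rec_length= 24;            /* smallest possible record/block */
  /* At most floor(1000000 / 24) = 41666 records can fit; the +1 keeps the
     estimate strictly above any reachable record count. */
  unsigned long long max_records= filelength / rec_length + 1;
  assert(max_records == 41667);
}
#endif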
  del=info->state->del;

  /* for compressed tables */
  max_pack_reclength= share->base.pack_reclength;
  if (share->options & HA_OPTION_COMPRESS_RECORD)
    set_if_bigger(max_pack_reclength, share->max_pack_length);
  if (!(sort_param=(MI_SORT_PARAM *)
        malloc(share->base.keys *
               (sizeof(MI_SORT_PARAM) + max_pack_reclength))))
  {
    mi_check_print_error(param,"Not enough memory for key!");
    goto err;
  }
  memset(sort_param, 0, share->base.keys *
         (sizeof(MI_SORT_PARAM) + max_pack_reclength));
  rec_per_key_part= param->rec_per_key_part;
  info->state->records=info->state->del=share->state.split=0;
  info->state->empty=0;
  for (i=key=0, istep=1 ; key < share->base.keys ;
       rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
  {
    sort_param[i].key=key;
    sort_param[i].keyinfo=share->keyinfo+key;
    sort_param[i].seg=sort_param[i].keyinfo->seg;
    /*
      Skip this index if it is marked disabled in the copied
      (and possibly inverted) key_map.
    */
    if (! mi_is_key_active(key_map, key))
    {
      /* Remember old statistics for key */
      assert(rec_per_key_part >= param->rec_per_key_part);
      memcpy(rec_per_key_part,
             (share->state.rec_per_key_part +
              (rec_per_key_part - param->rec_per_key_part)),
             sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
      istep=0;
      continue;
    }
    istep=1;
    if ((!(param->testflag & T_SILENT)))
      printf ("- Fixing index %d\n",key+1);

    sort_param[i].key_read=sort_key_read;
    sort_param[i].key_write=sort_key_write;
    sort_param[i].key_cmp=sort_key_cmp;
    sort_param[i].lock_in_memory=lock_memory;
    sort_param[i].sort_info=&sort_info;
    sort_param[i].master=0;
    sort_param[i].fix_datafile=0;
    sort_param[i].calc_checksum= 0;

    sort_param[i].filepos=new_header_length;
    sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;

    sort_param[i].record= (((unsigned char *)(sort_param+share->base.keys))+
                           (max_pack_reclength * i));
    if (!mi_alloc_rec_buff(info, -1, &sort_param[i].rec_buff))
    {
      mi_check_print_error(param,"Not enough memory!");
      goto err;
    }
    sort_param[i].key_length=share->rec_reflength;
    for (keyseg=sort_param[i].seg; keyseg->type != HA_KEYTYPE_END;
         keyseg++)
    {
      sort_param[i].key_length+=keyseg->length;
      if (keyseg->flag & HA_SPACE_PACK)
        sort_param[i].key_length+=get_pack_length(keyseg->length);
      if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
        sort_param[i].key_length+=2 + test(keyseg->length >= 127);
      if (keyseg->flag & HA_NULL_PART)
        sort_param[i].key_length++;
    }
    total_key_length+=sort_param[i].key_length;
  }
  sort_info.total_keys=i;
  sort_param[0].master= 1;
  sort_param[0].fix_datafile= (bool)(! rep_quick);
  sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM);

  sort_info.got_error=0;
  pthread_mutex_lock(&sort_info.mutex);

  /*
    Initialize the I/O cache share for use with the read caches and, in
    case of non-quick repair, the write cache. When all threads join on
    the cache lock, the writer copies the write cache contents to the
    read caches.
  */
  if (i > 1)
  {
    if (rep_quick)
      init_io_cache_share(&param->read_cache, &io_share, NULL, i);
    else
      init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
  }
  else
    io_share.total_threads= 0; /* share not used */

  (void) pthread_attr_init(&thr_attr);
  (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
  for (i=0 ; i < sort_info.total_keys ; i++)
  {
    /*
      Copy the properly initialized IO_CACHE structure so that every
      thread has its own copy. In quick mode param->read_cache is shared
      for use by all threads. In non-quick mode all threads but the
      first copy the shared new_data_cache, which is synchronized to the
      write cache of the first thread. The first thread copies
      param->read_cache, which is not shared.
    */
    sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
                               new_data_cache);
    /*
      Two approaches: the same amount of memory for each thread,
      or the memory for the same number of keys for each thread.
      In the second one all the threads will fill their sort_buffers
      (and call write_keys) at the same time, putting more stress on I/O.
      (See the worked example after this loop.)
    */
    sort_param[i].sortbuff_size=
#ifndef USING_SECOND_APPROACH
      param->sort_buffer_length/sort_info.total_keys;
#else
      param->sort_buffer_length*sort_param[i].key_length/total_key_length;
#endif

    if (pthread_create(&sort_param[i].thr, &thr_attr,
                       thr_find_all_keys,
                       (void *) (sort_param+i)))
    {
      mi_check_print_error(param,"Cannot start a repair thread");
      /* Cleanup: detach from the share so that other threads are not blocked. */
      if (io_share.total_threads)
        remove_io_thread(&sort_param[i].read_cache);
      sort_info.got_error=1;
    }
    else
      sort_info.threads_running++;
  }
  (void) pthread_attr_destroy(&thr_attr);
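/*
  Illustrative only: a worked example (made-up numbers) of the two
  sort-buffer distribution approaches discussed in the loop above.
  Not part of the original sources; sort_buffer_length and the key
  lengths are hypothetical.
*/
#if 0
static void sortbuff_split_example(void)
{
  unsigned long sort_buffer_length= 12000000;      /* 12 MB for all threads */
  unsigned long key_length[3]= { 8, 24, 88 };      /* per-key sort key sizes */
  unsigned long total_key_length= 8 + 24 + 88;     /* 120 */
  unsigned long i;

  for (i= 0; i < 3; i++)
  {
    /* First approach: equal split, 4000000 bytes per thread. */
    unsigned long equal_share= sort_buffer_length / 3;
    /* Second approach: proportional split, 800000 / 2400000 / 8800000. */
    unsigned long proportional_share=
      sort_buffer_length * key_length[i] / total_key_length;
    printf("key %lu: equal=%lu proportional=%lu\n",
           i, equal_share, proportional_share);
  }
}
#endif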
  /* waiting for all threads to finish */
  while (sort_info.threads_running)
    pthread_cond_wait(&sort_info.cond, &sort_info.mutex);
  pthread_mutex_unlock(&sort_info.mutex);
  if ((got_error= thr_write_keys(sort_param)))
  {
    param->retry_repair=1;
    goto err;
  }
  got_error=1; /* Assume the following may go wrong */

  if (sort_param[0].fix_datafile)
  {
    /*
      Append some nuls to the end of a memory mapped file. Destroy the
      write cache. The master thread has already detached from the share
      via remove_io_thread() in sort.c:thr_find_all_keys().
    */
    if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
      goto err;
    if (param->testflag & T_SAFE_REPAIR)
    {
      /* Don't repair if we lost more than one row */
      if (info->state->records+1 < start_records)
      {
        info->state->records=start_records;
        goto err;
      }
    }
    share->state.state.data_file_length= info->state->data_file_length=
      sort_param->filepos;
    /* Only whole records */
    share->state.version=(ulong) time((time_t*) 0);

    /*
      Exchange the data file descriptor of the table, so that we use the
      new file from now on.
    */
    my_close(info->dfile,MYF(0));
    info->dfile=new_file;

    share->data_file_type=sort_info.new_data_file_type;
    share->pack.header_length=(ulong) new_header_length;
  }
  else
    info->state->data_file_length=sort_param->max_pos;

  if (rep_quick && del+sort_info.dupp != info->state->del)
  {
    mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records");
    mi_check_print_error(param,"Run recovery again without -q");
    param->retry_repair=1;
    param->testflag|=T_RETRY_WITHOUT_QUICK;
    goto err;
  }
  if (rep_quick & T_FORCE_UNIQUENESS)
  {
    my_off_t skr=info->state->data_file_length+
      (share->options & HA_OPTION_COMPRESS_RECORD ?
       MEMMAP_EXTRA_MARGIN : 0);
    if (share->data_file_type == STATIC_RECORD &&
        skr < share->base.reloc*share->base.min_pack_length)
      skr=share->base.reloc*share->base.min_pack_length;
    if (skr != sort_info.filelength && !info->s->base.raid_type)
      if (ftruncate(info->dfile, skr))
        mi_check_print_warning(param,
                               "Can't change size of datafile, error: %d",
                               my_errno);
  }

  if (param->testflag & T_CALC_CHECKSUM)
    info->state->checksum=param->glob_crc;
  if (ftruncate(share->kfile, info->state->key_file_length))
    mi_check_print_warning(param,
                           "Can't change size of indexfile, error: %d", my_errno);

  if (!(param->testflag & T_SILENT))
  {
    if (start_records != info->state->records)
      printf("Data records: %s\n", llstr(info->state->records,llbuff));
    if (sort_info.dupp)
      mi_check_print_warning(param,
                             "%s records have been removed",
                             llstr(sort_info.dupp,llbuff));
  }

  if (&share->state.state != info->state)
    memcpy(&share->state.state, info->state, sizeof(*info->state));
err:
  got_error|= flush_blocks(param, share->key_cache, share->kfile);
  /*
    Destroy the write cache. The master thread has already detached from
    the share via remove_io_thread(), or it was never started (if the
    error happened before creating the thread).
  */
  end_io_cache(&info->rec_cache);
  /*
    Destroy the new data cache in case of non-quick repair. All slave
    threads have either already detached from the share via
    remove_io_thread(), or they were never started (if the error happened
    before creating the threads).
  */
  if (!rep_quick)
    end_io_cache(&new_data_cache);
  /* Replace the actual file with the temporary file */
  my_close(new_file,MYF(0));
  info->dfile=new_file= -1;
  if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
                        DATA_TMP_EXT, share->base.raid_chunks,
                        (param->testflag & T_BACKUP_DATA ?
                         MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
      mi_open_datafile(info,share,-1))
    got_error=1;

  if (! param->error_printed)
    mi_check_print_error(param,"%d when fixing table",my_errno);
  my_close(new_file,MYF(0));
  my_delete(param->temp_filename, MYF(MY_WME));
  if (info->dfile == new_file)
    info->dfile= -1;
  mi_mark_crashed_on_repair(info);

  else if (key_map == share->state.key_map)
    share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
  share->state.changed|=STATE_NOT_SORTED_PAGES;
  pthread_cond_destroy (&sort_info.cond);
  pthread_mutex_destroy(&sort_info.mutex);

  free((unsigned char*) sort_info.key_block);
  free((unsigned char*) sort_param);
  free(sort_info.buff);
  end_io_cache(&param->read_cache);
  info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
  if (!got_error && (param->testflag & T_UNPACK))

    (my_off_t) info->s->base.max_data_file_length;
/* Recreate table with a bigger, more generously allocated record buffer */

int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename)
{
  MI_KEYDEF *keyinfo,*key,*key_end;
  HA_KEYSEG *keysegs,*keyseg;
  MI_COLUMNDEF *recdef,*rec,*end;
  MI_UNIQUEDEF *uniquedef,*u_ptr,*u_end;
  MI_STATUS_INFO status_info;
  uint32_t unpack,key_parts;
  ha_rows max_records;
  uint64_t file_length,tmp_length;
  MI_CREATE_INFO create_info;

  error=1; /* Default error */
  status_info= (*org_info)->state[0];
  info.state= &status_info;
  share= *(*org_info)->s;
  unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
    (param->testflag & T_UNPACK);
  if (!(keyinfo=(MI_KEYDEF*) malloc(sizeof(MI_KEYDEF)*share.base.keys)))
  memcpy(keyinfo,share.keyinfo,sizeof(MI_KEYDEF)*share.base.keys);

  key_parts= share.base.all_key_parts;
  if (!(keysegs=(HA_KEYSEG*) malloc(sizeof(HA_KEYSEG)*
                                    (key_parts+share.base.keys))))
  if (!(recdef=(MI_COLUMNDEF*)
        malloc(sizeof(MI_COLUMNDEF)*(share.base.fields+1))))
  if (!(uniquedef=(MI_UNIQUEDEF*)
        malloc(sizeof(MI_UNIQUEDEF)*(share.state.header.uniques+1))))
/* Copy the column definitions */
3739
memcpy(recdef, share.rec, sizeof(MI_COLUMNDEF)*(share.base.fields+1));
3740
for (rec=recdef,end=recdef+share.base.fields; rec != end ; rec++)
3742
if (unpack && !(share.options & HA_OPTION_PACK_RECORD) &&
3743
rec->type != FIELD_BLOB &&
3744
rec->type != FIELD_VARCHAR &&
3745
rec->type != FIELD_CHECK)
3746
rec->type=(int) FIELD_NORMAL;
3749
/* Change the new key to point at the saved key segments */
3750
memcpy(keysegs,share.keyparts,
3751
sizeof(HA_KEYSEG)*(key_parts+share.base.keys+
3752
share.state.header.uniques));
3754
for (key=keyinfo,key_end=keyinfo+share.base.keys; key != key_end ; key++)
3757
for (; keyseg->type ; keyseg++)
3759
if (param->language)
3760
keyseg->language=param->language; /* change language */
3762
keyseg++; /* Skip end pointer */
3765
/* Copy the unique definitions and change them to point at the new key
3767
memcpy(uniquedef,share.uniqueinfo,
3768
sizeof(MI_UNIQUEDEF)*(share.state.header.uniques));
3769
for (u_ptr=uniquedef,u_end=uniquedef+share.state.header.uniques;
3770
u_ptr != u_end ; u_ptr++)
3773
keyseg+=u_ptr->keysegs+1;
3775
if (share.options & HA_OPTION_COMPRESS_RECORD)
3776
share.base.records=max_records=info.state->records;
3777
else if (share.base.min_pack_length)
3778
max_records=(ha_rows) (lseek(info.dfile,0L,SEEK_END) /
3779
(ulong) share.base.min_pack_length);
3782
unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
3783
(param->testflag & T_UNPACK);
3784
share.options&= ~HA_OPTION_TEMP_COMPRESS_RECORD;
3786
file_length=(uint64_t) lseek(info.dfile,0L,SEEK_END);
3787
tmp_length= file_length+file_length/10;
3788
set_if_bigger(file_length,param->max_data_file_length);
3789
set_if_bigger(file_length,tmp_length);
3790
set_if_bigger(file_length,(uint64_t) share.base.max_data_file_length);
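/*
  Illustrative only: a worked example (made-up numbers) of how the new
  data_file_length above is chosen. The explicit ifs mirror what
  set_if_bigger() does. Not part of the original sources.
*/
#if 0
static unsigned long long new_data_file_length_example(void)
{
  unsigned long long file_length= 50000000;        /* current data file: 50 MB */
  unsigned long long max_data_file_length= 40000000;
  unsigned long long param_max_data_file_length= 0;
  /* Reserve 10% headroom on top of the current size. */
  unsigned long long tmp_length= file_length + file_length/10;   /* 55000000 */
  if (file_length < param_max_data_file_length)
    file_length= param_max_data_file_length;
  if (file_length < tmp_length)
    file_length= tmp_length;                       /* now 55000000 */
  if (file_length < max_data_file_length)
    file_length= max_data_file_length;             /* still 55000000 */
  return file_length;
}
#endif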
  mi_close(*org_info);
  memset(&create_info, 0, sizeof(create_info));
  create_info.max_rows=max(max_records,share.base.records);
  create_info.reloc_rows=share.base.reloc;
  create_info.old_options=(share.options |
                           (unpack ? HA_OPTION_TEMP_COMPRESS_RECORD : 0));

  create_info.data_file_length=file_length;
  create_info.auto_increment=share.state.auto_increment;
  create_info.language= (param->language ? param->language :
                         share.state.header.language);
  create_info.key_file_length= status_info.key_file_length;
  /*
    Allow for creating an auto_increment key. This has an effect only if
    an auto_increment key exists in the original table.
  */
  create_info.with_auto_increment= true;
  /*
    We don't have to handle symlinks here because we are using
    HA_DONT_TOUCH_DATA.
  */
  if (mi_create(filename,
                share.base.keys - share.state.header.uniques,
                keyinfo, share.base.fields, recdef,
                share.state.header.uniques, uniquedef,
                &create_info,
                HA_DONT_TOUCH_DATA))
    mi_check_print_error(param,"Got error %d when trying to recreate indexfile",my_errno);
  *org_info=mi_open(filename,O_RDWR,
                    (param->testflag & T_WAIT_FOREVER) ? HA_OPEN_WAIT_IF_LOCKED :
                    (param->testflag & T_DESCRIPT) ? HA_OPEN_IGNORE_IF_LOCKED :
                    HA_OPEN_ABORT_IF_LOCKED);
  if (!*org_info)
    mi_check_print_error(param,"Got error %d when trying to open re-created indexfile",
                         my_errno);

  /* We are modifying */
  (*org_info)->s->options&= ~HA_OPTION_READ_ONLY_DATA;
  _mi_readinfo(*org_info,F_WRLCK,0);
  (*org_info)->state->records=info.state->records;
  if (share.state.create_time)
    (*org_info)->s->state.create_time=share.state.create_time;
  (*org_info)->s->state.unique=(*org_info)->this_unique=
    share.state.unique;
  (*org_info)->state->checksum=info.state->checksum;
  (*org_info)->state->del=info.state->del;
  (*org_info)->s->state.dellink=share.state.dellink;
  (*org_info)->state->empty=info.state->empty;
  (*org_info)->state->data_file_length=info.state->data_file_length;
  if (update_state_info(param,*org_info,UPDATE_TIME | UPDATE_STAT |
/* write suffix to data file if needed */