2331
2348
free((unsigned char*) sort_info.key_block);
2332
2349
free(sort_info.buff);
2333
param->read_cache.end_io_cache();
2350
end_io_cache(&param->read_cache);
2351
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2352
if (!got_error && (param->testflag & T_UNPACK))
2354
share->state.header.options[0]&= (unsigned char) ~HA_OPTION_COMPRESS_RECORD;
2355
share->pack.header_length=0;
2361
Threaded repair of table using sorting
2364
mi_repair_parallel()
2365
param Repair parameters
2366
info MyISAM handler to repair
2367
name Name of table (for warnings)
2368
rep_quick set to <> 0 if we should not change data file
2371
Same as mi_repair_by_sort but do it multithreaded
2372
Each key is handled by a separate thread.
2373
TODO: make the number of threads a parameter
2375
In parallel repair we use one thread per index. There are two modes:
2379
Only the indexes are rebuilt. All threads share a read buffer.
2380
Every thread that needs fresh data in the buffer enters the shared
2381
cache lock. The last thread joining the lock reads the buffer from
2382
the data file and wakes all other threads.
2386
The data file is rebuilt and all indexes are rebuilt to point to
2387
the new record positions. One thread is the master thread. It
2388
reads from the old data file and writes to the new data file. It
2389
also creates one of the indexes. The other threads read from a
2390
buffer which is filled by the master. If they need fresh data,
2391
they enter the shared cache lock. If the master's write buffer is
2392
full, it flushes it to the new data file and enters the shared
2393
cache lock too. When all threads joined in the lock, the master
2394
copies its write buffer to the read buffer for the other threads
2402
int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
2403
const char * name, int rep_quick)
2406
uint32_t i,key, total_key_length, istep;
2408
ha_rows start_records;
2409
my_off_t new_header_length,del;
2411
MI_SORT_PARAM *sort_param=0;
2412
MYISAM_SHARE *share=info->s;
2413
ulong *rec_per_key_part;
2416
IO_CACHE new_data_cache; /* For non-quick repair. */
2417
IO_CACHE_SHARE io_share;
2418
SORT_INFO sort_info;
2419
uint64_t key_map= 0;
2420
pthread_attr_t thr_attr;
2421
ulong max_pack_reclength;
2423
start_records=info->state->records;
2426
new_header_length=(param->testflag & T_UNPACK) ? 0 :
2427
share->pack.header_length;
2428
if (!(param->testflag & T_SILENT))
2430
printf("- parallel recovering (with sort) MyISAM-table '%s'\n",name);
2431
printf("Data records: %s\n", llstr(start_records,llbuff));
2433
param->testflag|=T_REP; /* for easy checking */
2435
if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
2436
param->testflag|=T_CALC_CHECKSUM;
2439
Quick repair (not touching data file, rebuilding indexes):
2441
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
2444
Non-quick repair (rebuilding data file and indexes):
2448
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
2449
Write cache is (MI_INFO *info)->rec_cache using new_file.
2453
Read cache is new_data_cache synced to master rec_cache.
2455
The final assignment of the file descriptor for rec_cache is done
2456
after the cache creation.
2458
Don't check file size on new_data_cache, as the resulting file size
2461
As rec_cache and new_data_cache are synced, write_buffer_length is
2462
used for the read cache 'new_data_cache'. Both start at the same
2463
position 'new_header_length'.
2466
memset(&sort_info, 0, sizeof(sort_info));
2467
/* Initialize pthread structures before goto err. */
2468
pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST);
2469
pthread_cond_init(&sort_info.cond, 0);
2471
if (!(sort_info.key_block=
2472
alloc_key_blocks(param, (uint) param->sort_key_blocks,
2473
share->base.max_key_block_length)) ||
2474
init_io_cache(&param->read_cache, info->dfile,
2475
(uint) param->read_buffer_length,
2476
READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
2478
(init_io_cache(&info->rec_cache, info->dfile,
2479
(uint) param->write_buffer_length,
2480
WRITE_CACHE, new_header_length, 1,
2481
MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
2482
init_io_cache(&new_data_cache, -1,
2483
(uint) param->write_buffer_length,
2484
READ_CACHE, new_header_length, 1,
2485
MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
2487
sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
2488
info->opt_flag|=WRITE_CACHE_USED;
2489
info->rec_cache.file=info->dfile; /* for sort_delete_record */
2493
/* Get real path for data file */
2494
if ((new_file=my_create(fn_format(param->temp_filename,
2495
share->data_file_name, "",
2498
0,param->tmpfile_createflag,
2501
mi_check_print_error(param,"Can't create new tempfile: '%s'",
2502
param->temp_filename);
2505
if (new_header_length &&
2506
filecopy(param, new_file,info->dfile,0L,new_header_length,
2509
if (param->testflag & T_UNPACK)
2511
share->options&= ~HA_OPTION_COMPRESS_RECORD;
2512
mi_int2store(share->state.header.options,share->options);
2514
share->state.dellink= HA_OFFSET_ERROR;
2515
info->rec_cache.file=new_file;
2518
info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
2520
/* Optionally drop indexes and optionally modify the key_map. */
2521
mi_drop_all_indexes(param, info, false);
2522
key_map= share->state.key_map;
2523
if (param->testflag & T_CREATE_MISSING_KEYS)
2525
/* Invert the copied key_map to recreate all disabled indexes. */
2529
sort_info.info=info;
2530
sort_info.param = param;
2532
set_data_file_type(&sort_info, share);
2535
param->read_cache.end_of_file=sort_info.filelength=
2536
lseek(param->read_cache.file,0L,SEEK_END);
2538
if (share->data_file_type == DYNAMIC_RECORD)
2539
rec_length=cmax(share->base.min_pack_length+1,share->base.min_block_length);
2540
else if (share->data_file_type == COMPRESSED_RECORD)
2541
rec_length=share->base.min_block_length;
2543
rec_length=share->base.pack_reclength;
2545
The +1 below is a required hack for parallel repair mode.
2546
The info->state->records value, that is compared later
2547
to sort_info.max_records and cannot exceed it, is
2548
increased in sort_key_write. In mi_repair_by_sort, sort_key_write
2549
is called after sort_key_read, where the comparison is performed,
2550
but in parallel mode master thread can call sort_key_write
2551
before some other repair thread calls sort_key_read.
2552
Furthermore I'm not even sure +1 would be enough.
2553
Maybe sort_info.max_records should always be set to the max value in
2556
sort_info.max_records=
2557
((param->testflag & T_CREATE_MISSING_KEYS) ? info->state->records + 1:
2558
(ha_rows) (sort_info.filelength/rec_length+1));
2560
del=info->state->del;
2562
/* for compressed tables */
2563
max_pack_reclength= share->base.pack_reclength;
2564
if (share->options & HA_OPTION_COMPRESS_RECORD)
2565
set_if_bigger(max_pack_reclength, share->max_pack_length);
2566
if (!(sort_param=(MI_SORT_PARAM *)
2567
malloc(share->base.keys *
2568
(sizeof(MI_SORT_PARAM) + max_pack_reclength))))
2570
mi_check_print_error(param,"Not enough memory for key!");
2573
memset(sort_param, 0, share->base.keys *
2574
(sizeof(MI_SORT_PARAM) + max_pack_reclength));
2576
rec_per_key_part= param->rec_per_key_part;
2577
info->state->records=info->state->del=share->state.split=0;
2578
info->state->empty=0;
2580
for (i=key=0, istep=1 ; key < share->base.keys ;
2581
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
2583
sort_param[i].key=key;
2584
sort_param[i].keyinfo=share->keyinfo+key;
2585
sort_param[i].seg=sort_param[i].keyinfo->seg;
2587
Skip this index if it is marked disabled in the copied
2588
(and possibly inverted) key_map.
2590
if (! mi_is_key_active(key_map, key))
2592
/* Remember old statistics for key */
2593
assert(rec_per_key_part >= param->rec_per_key_part);
2594
memcpy(rec_per_key_part,
2595
(share->state.rec_per_key_part +
2596
(rec_per_key_part - param->rec_per_key_part)),
2597
sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
2602
if ((!(param->testflag & T_SILENT)))
2603
printf ("- Fixing index %d\n",key+1);
2605
sort_param[i].key_read=sort_key_read;
2606
sort_param[i].key_write=sort_key_write;
2608
sort_param[i].key_cmp=sort_key_cmp;
2609
sort_param[i].lock_in_memory=lock_memory;
2610
sort_param[i].sort_info=&sort_info;
2611
sort_param[i].master=0;
2612
sort_param[i].fix_datafile=0;
2613
sort_param[i].calc_checksum= 0;
2615
sort_param[i].filepos=new_header_length;
2616
sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;
2618
sort_param[i].record= (((unsigned char *)(sort_param+share->base.keys))+
2619
(max_pack_reclength * i));
2620
if (!mi_alloc_rec_buff(info, -1, &sort_param[i].rec_buff))
2622
mi_check_print_error(param,"Not enough memory!");
2626
sort_param[i].key_length=share->rec_reflength;
2627
for (keyseg=sort_param[i].seg; keyseg->type != HA_KEYTYPE_END;
2630
sort_param[i].key_length+=keyseg->length;
2631
if (keyseg->flag & HA_SPACE_PACK)
2632
sort_param[i].key_length+=get_pack_length(keyseg->length);
2633
if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
2634
sort_param[i].key_length+=2 + test(keyseg->length >= 127);
2635
if (keyseg->flag & HA_NULL_PART)
2636
sort_param[i].key_length++;
2638
total_key_length+=sort_param[i].key_length;
2640
sort_info.total_keys=i;
2641
sort_param[0].master= 1;
2642
sort_param[0].fix_datafile= (bool)(! rep_quick);
2643
sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM);
2645
sort_info.got_error=0;
2646
pthread_mutex_lock(&sort_info.mutex);
2649
Initialize the I/O cache share for use with the read caches and, in
2650
case of non-quick repair, the write cache. When all threads join on
2651
the cache lock, the writer copies the write cache contents to the
2657
init_io_cache_share(&param->read_cache, &io_share, NULL, i);
2659
init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
2662
io_share.total_threads= 0; /* share not used */
2664
(void) pthread_attr_init(&thr_attr);
2665
(void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
2667
for (i=0 ; i < sort_info.total_keys ; i++)
2670
Copy the properly initialized IO_CACHE structure so that every
2671
thread has its own copy. In quick mode param->read_cache is shared
2672
for use by all threads. In non-quick mode all threads but the
2673
first copy the shared new_data_cache, which is synchronized to the
2674
write cache of the first thread. The first thread copies
2675
param->read_cache, which is not shared.
2677
sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
2681
two approaches: the same amount of memory for each thread
2682
or the memory for the same number of keys for each thread...
2683
In the second one all the threads will fill their sort_buffers
2684
(and call write_keys) at the same time, putting more stress on i/o.
2686
sort_param[i].sortbuff_size=
2687
#ifndef USING_SECOND_APPROACH
2688
param->sort_buffer_length/sort_info.total_keys;
2690
param->sort_buffer_length*sort_param[i].key_length/total_key_length;
2692
if (pthread_create(&sort_param[i].thr, &thr_attr,
2694
(void *) (sort_param+i)))
2696
mi_check_print_error(param,"Cannot start a repair thread");
2697
/* Cleanup: Detach from the share so that other threads are not blocked. */
2698
if (io_share.total_threads)
2699
remove_io_thread(&sort_param[i].read_cache);
2700
sort_info.got_error=1;
2703
sort_info.threads_running++;
2705
(void) pthread_attr_destroy(&thr_attr);
2707
/* waiting for all threads to finish */
2708
while (sort_info.threads_running)
2709
pthread_cond_wait(&sort_info.cond, &sort_info.mutex);
2710
pthread_mutex_unlock(&sort_info.mutex);
2712
if ((got_error= thr_write_keys(sort_param)))
2714
param->retry_repair=1;
2717
got_error=1; /* Assume the following may go wrong */
2719
if (sort_param[0].fix_datafile)
2722
Append some nuls to the end of a memory mapped file. Destroy the
2723
write cache. The master thread did already detach from the share
2724
by remove_io_thread() in sort.c:thr_find_all_keys().
2726
if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
2728
if (param->testflag & T_SAFE_REPAIR)
2730
/* Don't repair if we lost more than one row */
2731
if (info->state->records+1 < start_records)
2733
info->state->records=start_records;
2737
share->state.state.data_file_length= info->state->data_file_length=
2738
sort_param->filepos;
2739
/* Only whole records */
2740
share->state.version=(ulong) time((time_t*) 0);
2743
Exchange the data file descriptor of the table, so that we use the
2744
new file from now on.
2746
my_close(info->dfile,MYF(0));
2747
info->dfile=new_file;
2749
share->data_file_type=sort_info.new_data_file_type;
2750
share->pack.header_length=(ulong) new_header_length;
2753
info->state->data_file_length=sort_param->max_pos;
2755
if (rep_quick && del+sort_info.dupp != info->state->del)
2757
mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records");
2758
mi_check_print_error(param,"Run recovery again without -q");
2759
param->retry_repair=1;
2760
param->testflag|=T_RETRY_WITHOUT_QUICK;
2764
if (rep_quick & T_FORCE_UNIQUENESS)
2766
my_off_t skr=info->state->data_file_length+
2767
(share->options & HA_OPTION_COMPRESS_RECORD ?
2768
MEMMAP_EXTRA_MARGIN : 0);
2770
if (share->data_file_type == STATIC_RECORD &&
2771
skr < share->base.reloc*share->base.min_pack_length)
2772
skr=share->base.reloc*share->base.min_pack_length;
2774
if (skr != sort_info.filelength && !info->s->base.raid_type)
2775
if (ftruncate(info->dfile, skr))
2776
mi_check_print_warning(param,
2777
"Can't change size of datafile, error: %d",
2780
if (param->testflag & T_CALC_CHECKSUM)
2781
info->state->checksum=param->glob_crc;
2783
if (ftruncate(share->kfile, info->state->key_file_length))
2784
mi_check_print_warning(param,
2785
"Can't change size of indexfile, error: %d", my_errno);
2787
if (!(param->testflag & T_SILENT))
2789
if (start_records != info->state->records)
2790
printf("Data records: %s\n", llstr(info->state->records,llbuff));
2792
mi_check_print_warning(param,
2793
"%s records have been removed",
2794
llstr(sort_info.dupp,llbuff));
2798
if (&share->state.state != info->state)
2799
memcpy(&share->state.state, info->state, sizeof(*info->state));
2802
got_error|= flush_blocks(param, share->key_cache, share->kfile);
2804
Destroy the write cache. The master thread did already detach from
2805
the share by remove_io_thread() or it was not yet started (if the
2806
error happened before creating the thread).
2808
end_io_cache(&info->rec_cache);
2810
Destroy the new data cache in case of non-quick repair. All slave
2811
threads did either detach from the share by remove_io_thread()
2812
already or they were not yet started (if the error happened before
2813
creating the threads).
2816
end_io_cache(&new_data_cache);
2819
/* Replace the actual file with the temporary file */
2822
my_close(new_file,MYF(0));
2823
info->dfile=new_file= -1;
2824
if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
2825
DATA_TMP_EXT, share->base.raid_chunks,
2826
(param->testflag & T_BACKUP_DATA ?
2827
MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
2828
mi_open_datafile(info,share,-1))
2834
if (! param->error_printed)
2835
mi_check_print_error(param,"%d when fixing table",my_errno);
2838
my_close(new_file,MYF(0));
2839
my_delete(param->temp_filename, MYF(MY_WME));
2840
if (info->dfile == new_file)
2843
mi_mark_crashed_on_repair(info);
2845
else if (key_map == share->state.key_map)
2846
share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
2847
share->state.changed|=STATE_NOT_SORTED_PAGES;
2849
pthread_cond_destroy (&sort_info.cond);
2850
pthread_mutex_destroy(&sort_info.mutex);
2852
free((unsigned char*) sort_info.key_block);
2853
free((unsigned char*) sort_param);
2854
free(sort_info.buff);
2855
end_io_cache(&param->read_cache);
2334
2856
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2335
2857
if (!got_error && (param->testflag & T_UNPACK))
3152
3676
(my_off_t) info->s->base.max_data_file_length;
3679
/* Recreate the table with a bigger allocation for record data */
3681
int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename)
3686
MI_KEYDEF *keyinfo,*key,*key_end;
3687
HA_KEYSEG *keysegs,*keyseg;
3688
MI_COLUMNDEF *recdef,*rec,*end;
3689
MI_UNIQUEDEF *uniquedef,*u_ptr,*u_end;
3690
MI_STATUS_INFO status_info;
3691
uint32_t unpack,key_parts;
3692
ha_rows max_records;
3693
uint64_t file_length,tmp_length;
3694
MI_CREATE_INFO create_info;
3696
error=1; /* Default error */
3698
status_info= (*org_info)->state[0];
3699
info.state= &status_info;
3700
share= *(*org_info)->s;
3701
unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
3702
(param->testflag & T_UNPACK);
3703
if (!(keyinfo=(MI_KEYDEF*) malloc(sizeof(MI_KEYDEF)*share.base.keys)))
3705
memcpy(keyinfo,share.keyinfo,sizeof(MI_KEYDEF)*share.base.keys);
3707
key_parts= share.base.all_key_parts;
3708
if (!(keysegs=(HA_KEYSEG*) malloc(sizeof(HA_KEYSEG)*
3709
(key_parts+share.base.keys))))
3714
if (!(recdef=(MI_COLUMNDEF*)
3715
malloc(sizeof(MI_COLUMNDEF)*(share.base.fields+1))))
3721
if (!(uniquedef=(MI_UNIQUEDEF*)
3722
malloc(sizeof(MI_UNIQUEDEF)*(share.state.header.uniques+1))))
3730
/* Copy the column definitions */
3731
memcpy(recdef, share.rec, sizeof(MI_COLUMNDEF)*(share.base.fields+1));
3732
for (rec=recdef,end=recdef+share.base.fields; rec != end ; rec++)
3734
if (unpack && !(share.options & HA_OPTION_PACK_RECORD) &&
3735
rec->type != FIELD_BLOB &&
3736
rec->type != FIELD_VARCHAR &&
3737
rec->type != FIELD_CHECK)
3738
rec->type=(int) FIELD_NORMAL;
3741
/* Change the new key to point at the saved key segments */
3742
memcpy(keysegs,share.keyparts,
3743
sizeof(HA_KEYSEG)*(key_parts+share.base.keys+
3744
share.state.header.uniques));
3746
for (key=keyinfo,key_end=keyinfo+share.base.keys; key != key_end ; key++)
3749
for (; keyseg->type ; keyseg++)
3751
if (param->language)
3752
keyseg->language=param->language; /* change language */
3754
keyseg++; /* Skip end pointer */
3757
/* Copy the unique definitions and change them to point at the new key
3759
memcpy(uniquedef,share.uniqueinfo,
3760
sizeof(MI_UNIQUEDEF)*(share.state.header.uniques));
3761
for (u_ptr=uniquedef,u_end=uniquedef+share.state.header.uniques;
3762
u_ptr != u_end ; u_ptr++)
3765
keyseg+=u_ptr->keysegs+1;
3767
if (share.options & HA_OPTION_COMPRESS_RECORD)
3768
share.base.records=max_records=info.state->records;
3769
else if (share.base.min_pack_length)
3770
max_records=(ha_rows) (lseek(info.dfile,0L,SEEK_END) /
3771
(ulong) share.base.min_pack_length);
3774
unpack= (share.options & HA_OPTION_COMPRESS_RECORD) &&
3775
(param->testflag & T_UNPACK);
3776
share.options&= ~HA_OPTION_TEMP_COMPRESS_RECORD;
3778
file_length=(uint64_t) lseek(info.dfile,0L,SEEK_END);
3779
tmp_length= file_length+file_length/10;
3780
set_if_bigger(file_length,param->max_data_file_length);
3781
set_if_bigger(file_length,tmp_length);
3782
set_if_bigger(file_length,(uint64_t) share.base.max_data_file_length);
3784
mi_close(*org_info);
3785
memset(&create_info, 0, sizeof(create_info));
3786
create_info.max_rows=cmax(max_records,share.base.records);
3787
create_info.reloc_rows=share.base.reloc;
3788
create_info.old_options=(share.options |
3789
(unpack ? HA_OPTION_TEMP_COMPRESS_RECORD : 0));
3791
create_info.data_file_length=file_length;
3792
create_info.auto_increment=share.state.auto_increment;
3793
create_info.language = (param->language ? param->language :
3794
share.state.header.language);
3795
create_info.key_file_length= status_info.key_file_length;
3797
Allow for creating an auto_increment key. This has an effect only if
3798
an auto_increment key exists in the original table.
3800
create_info.with_auto_increment= true;
3801
/* We don't have to handle symlinks here because we are using
3802
HA_DONT_TOUCH_DATA */
3803
if (mi_create(filename,
3804
share.base.keys - share.state.header.uniques,
3805
keyinfo, share.base.fields, recdef,
3806
share.state.header.uniques, uniquedef,
3808
HA_DONT_TOUCH_DATA))
3810
mi_check_print_error(param,"Got error %d when trying to recreate indexfile",my_errno);
3813
*org_info=mi_open(filename,O_RDWR,
3814
(param->testflag & T_WAIT_FOREVER) ? HA_OPEN_WAIT_IF_LOCKED :
3815
(param->testflag & T_DESCRIPT) ? HA_OPEN_IGNORE_IF_LOCKED :
3816
HA_OPEN_ABORT_IF_LOCKED);
3819
mi_check_print_error(param,"Got error %d when trying to open re-created indexfile",
3823
/* We are modifying */
3824
(*org_info)->s->options&= ~HA_OPTION_READ_ONLY_DATA;
3825
_mi_readinfo(*org_info,F_WRLCK,0);
3826
(*org_info)->state->records=info.state->records;
3827
if (share.state.create_time)
3828
(*org_info)->s->state.create_time=share.state.create_time;
3829
(*org_info)->s->state.unique=(*org_info)->this_unique=
3831
(*org_info)->state->checksum=info.state->checksum;
3832
(*org_info)->state->del=info.state->del;
3833
(*org_info)->s->state.dellink=share.state.dellink;
3834
(*org_info)->state->empty=info.state->empty;
3835
(*org_info)->state->data_file_length=info.state->data_file_length;
3836
if (update_state_info(param,*org_info,UPDATE_TIME | UPDATE_STAT |
3156
3849
/* write suffix to data file if needed */