~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2000-2006 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/* Write a row to a MyISAM table */
17
18
#include "fulltext.h"
19
#include "rt_index.h"
20
21
#define MAX_POINTER_LENGTH 8
22
23
	/* Functions declared in this file */
24
25
static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
26
		    uint comp_flag, uchar *key,
27
		    uint key_length, my_off_t pos, uchar *father_buff,
28
		    uchar *father_keypos, my_off_t father_page,
29
		    my_bool insert_last);
30
static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
31
			    uchar *curr_buff,uchar *father_buff,
32
			    uchar *father_keypos,my_off_t father_page);
33
static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
34
				uchar *key, uint *return_key_length,
35
				uchar **after_key);
36
int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
37
		      uint key_length);
38
int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
39
		       uint key_length);
40
41
	/* Write new record to database */
42
43
int mi_write(MI_INFO *info, uchar *record)
44
{
45
  MYISAM_SHARE *share=info->s;
46
  uint i;
47
  int save_errno;
48
  my_off_t filepos;
49
  uchar *buff;
50
  my_bool lock_tree= share->concurrent_insert;
51
  DBUG_ENTER("mi_write");
52
  DBUG_PRINT("enter",("isam: %d  data: %d",info->s->kfile,info->dfile));
53
54
  DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
55
                  mi_print_error(info->s, HA_ERR_CRASHED);
56
                  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
57
  if (share->options & HA_OPTION_READ_ONLY_DATA)
58
  {
59
    DBUG_RETURN(my_errno=EACCES);
60
  }
61
  if (_mi_readinfo(info,F_WRLCK,1))
62
    DBUG_RETURN(my_errno);
63
  dont_break();				/* Dont allow SIGHUP or SIGINT */
64
#if !defined(NO_LOCKING) && defined(USE_RECORD_LOCK)
65
  if (!info->locked && my_lock(info->dfile,F_WRLCK,0L,F_TO_EOF,
66
			       MYF(MY_SEEK_NOT_DONE) | info->lock_wait))
67
    goto err;
68
#endif
69
  filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
70
             !info->append_insert_at_end) ?
71
	    share->state.dellink :
72
	    info->state->data_file_length);
73
74
  if (share->base.reloc == (ha_rows) 1 &&
75
      share->base.records == (ha_rows) 1 &&
76
      info->state->records == (ha_rows) 1)
77
  {						/* System file */
78
    my_errno=HA_ERR_RECORD_FILE_FULL;
79
    goto err2;
80
  }
81
  if (info->state->key_file_length >= share->base.margin_key_file_length)
82
  {
83
    my_errno=HA_ERR_INDEX_FILE_FULL;
84
    goto err2;
85
  }
86
  if (_mi_mark_file_changed(info))
87
    goto err2;
88
89
  /* Calculate and check all unique constraints */
90
  for (i=0 ; i < share->state.header.uniques ; i++)
91
  {
92
    if (mi_check_unique(info,share->uniqueinfo+i,record,
93
		     mi_unique_hash(share->uniqueinfo+i,record),
94
		     HA_OFFSET_ERROR))
95
      goto err2;
96
  }
97
98
	/* Write all keys to indextree */
99
100
  buff=info->lastkey2;
101
  for (i=0 ; i < share->base.keys ; i++)
102
  {
103
    if (mi_is_key_active(share->state.key_map, i))
104
    {
105
      my_bool local_lock_tree= (lock_tree &&
106
                                !(info->bulk_insert &&
107
                                  is_tree_inited(&info->bulk_insert[i])));
108
      if (local_lock_tree)
109
      {
110
	rw_wrlock(&share->key_root_lock[i]);
111
	share->keyinfo[i].version++;
112
      }
113
      if (share->keyinfo[i].flag & HA_FULLTEXT )
114
      {
115
        if (_mi_ft_add(info,i, buff, record, filepos))
116
        {
117
	  if (local_lock_tree)
118
	    rw_unlock(&share->key_root_lock[i]);
119
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
120
          goto err;
121
        }
122
      }
123
      else
124
      {
125
        if (share->keyinfo[i].ck_insert(info,i,buff,
126
			_mi_make_key(info,i,buff,record,filepos)))
127
        {
128
          if (local_lock_tree)
129
            rw_unlock(&share->key_root_lock[i]);
130
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
131
          goto err;
132
        }
133
      }
134
135
      /* The above changed info->lastkey2. Inform mi_rnext_same(). */
136
      info->update&= ~HA_STATE_RNEXT_SAME;
137
138
      if (local_lock_tree)
139
        rw_unlock(&share->key_root_lock[i]);
140
    }
141
  }
142
  if (share->calc_checksum)
143
    info->checksum=(*share->calc_checksum)(info,record);
144
  if (!(info->opt_flag & OPT_NO_ROWS))
145
  {
146
    if ((*share->write_record)(info,record))
147
      goto err;
148
    info->state->checksum+=info->checksum;
149
  }
150
  if (share->base.auto_key)
151
    set_if_bigger(info->s->state.auto_increment,
152
                  retrieve_auto_increment(info, record));
153
  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
154
		 HA_STATE_ROW_CHANGED);
155
  info->state->records++;
156
  info->lastpos=filepos;
157
  myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
158
  VOID(_mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE));
159
  if (info->invalidator != 0)
160
  {
161
    DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
162
    (*info->invalidator)(info->filename);
163
    info->invalidator=0;
164
  }
165
166
  /*
167
    Update status of the table. We need to do so after each row write
168
    for the log tables, as we want the new row to become visible to
169
    other threads as soon as possible. We don't lock mutex here
170
    (as it is required by pthread memory visibility rules) as (1) it's
171
    not critical to use outdated share->is_log_table value (2) locking
172
    mutex here for every write is too expensive.
173
  */
174
  if (share->is_log_table)
175
    mi_update_status((void*) info);
176
177
  allow_break();				/* Allow SIGHUP & SIGINT */
178
  DBUG_RETURN(0);
179
180
err:
181
  save_errno=my_errno;
182
  if (my_errno == HA_ERR_FOUND_DUPP_KEY || my_errno == HA_ERR_RECORD_FILE_FULL ||
183
      my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_OUT_OF_MEM)
184
  {
185
    if (info->bulk_insert)
186
    {
187
      uint j;
188
      for (j=0 ; j < share->base.keys ; j++)
189
        mi_flush_bulk_insert(info, j);
190
    }
191
    info->errkey= (int) i;
192
    while ( i-- > 0)
193
    {
194
      if (mi_is_key_active(share->state.key_map, i))
195
      {
196
	my_bool local_lock_tree= (lock_tree &&
197
                                  !(info->bulk_insert &&
198
                                    is_tree_inited(&info->bulk_insert[i])));
199
	if (local_lock_tree)
200
	  rw_wrlock(&share->key_root_lock[i]);
201
	if (share->keyinfo[i].flag & HA_FULLTEXT)
202
        {
203
          if (_mi_ft_del(info,i, buff,record,filepos))
204
	  {
205
	    if (local_lock_tree)
206
	      rw_unlock(&share->key_root_lock[i]);
207
            break;
208
	  }
209
        }
210
        else
211
	{
212
	  uint key_length=_mi_make_key(info,i,buff,record,filepos);
213
	  if (_mi_ck_delete(info,i,buff,key_length))
214
	  {
215
	    if (local_lock_tree)
216
	      rw_unlock(&share->key_root_lock[i]);
217
	    break;
218
	  }
219
	}
220
	if (local_lock_tree)
221
	  rw_unlock(&share->key_root_lock[i]);
222
      }
223
    }
224
  }
225
  else
226
  {
227
    mi_print_error(info->s, HA_ERR_CRASHED);
228
    mi_mark_crashed(info);
229
  }
230
  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
231
  my_errno=save_errno;
232
err2:
233
  save_errno=my_errno;
234
  myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
235
  VOID(_mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
236
  allow_break();			/* Allow SIGHUP & SIGINT */
237
  DBUG_RETURN(my_errno=save_errno);
238
} /* mi_write */
239
240
241
	/* Write one key to btree */
242
243
int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
244
{
245
  DBUG_ENTER("_mi_ck_write");
246
247
  if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
248
  {
249
    DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
250
  }
251
  else
252
  {
253
    DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
254
  }
255
} /* _mi_ck_write */
256
257
258
/**********************************************************************
259
 *                Normal insert code                                  *
260
 **********************************************************************/
261
262
int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
263
		       uint key_length)
264
{
265
  int error;
266
  uint comp_flag;
267
  MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
268
  my_off_t  *root=&info->s->state.key_root[keynr];
269
  DBUG_ENTER("_mi_ck_write_btree");
270
271
  if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
272
    comp_flag=SEARCH_BIGGER;			/* Put after same key */
273
  else if (keyinfo->flag & (HA_NOSAME|HA_FULLTEXT))
274
  {
275
    comp_flag=SEARCH_FIND | SEARCH_UPDATE;	/* No duplicates */
276
    if (keyinfo->flag & HA_NULL_ARE_EQUAL)
277
      comp_flag|= SEARCH_NULL_ARE_EQUAL;
278
  }
279
  else
280
    comp_flag=SEARCH_SAME;			/* Keys in rec-pos order */
281
282
  error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
283
                                root, comp_flag);
284
  if (info->ft1_to_ft2)
285
  {
286
    if (!error)
287
      error= _mi_ft_convert_to_ft2(info, keynr, key);
288
    delete_dynamic(info->ft1_to_ft2);
289
    my_free((uchar*)info->ft1_to_ft2, MYF(0));
290
    info->ft1_to_ft2=0;
291
  }
292
  DBUG_RETURN(error);
293
} /* _mi_ck_write_btree */
294
295
int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
296
    uchar *key, uint key_length, my_off_t *root, uint comp_flag)
297
{
298
  int error;
299
  DBUG_ENTER("_mi_ck_real_write_btree");
300
  /* key_length parameter is used only if comp_flag is SEARCH_FIND */
301
  if (*root == HA_OFFSET_ERROR ||
302
      (error=w_search(info, keyinfo, comp_flag, key, key_length,
303
		      *root, (uchar *) 0, (uchar*) 0,
304
		      (my_off_t) 0, 1)) > 0)
305
    error=_mi_enlarge_root(info,keyinfo,key,root);
306
  DBUG_RETURN(error);
307
} /* _mi_ck_real_write_btree */
308
309
310
	/* Make a new root with key as only pointer */
311
312
int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
313
                     my_off_t *root)
314
{
315
  uint t_length,nod_flag;
316
  MI_KEY_PARAM s_temp;
317
  MYISAM_SHARE *share=info->s;
318
  DBUG_ENTER("_mi_enlarge_root");
319
320
  nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
321
  _mi_kpointer(info,info->buff+2,*root); /* if nod */
322
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
323
				(uchar*) 0, (uchar*) 0, key,&s_temp);
324
  mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
325
  (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
326
  info->buff_used=info->page_changed=1;		/* info->buff is used */
327
  if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
328
      _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
329
    DBUG_RETURN(-1);
330
  DBUG_RETURN(0);
331
} /* _mi_enlarge_root */
332
333
334
	/*
335
	  Search after a position for a key and store it there
336
	  Returns -1 = error
337
		   0  = ok
338
		   1  = key should be stored in higher tree
339
	*/
340
341
static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
342
		    uint comp_flag, uchar *key, uint key_length, my_off_t page,
343
		    uchar *father_buff, uchar *father_keypos,
344
		    my_off_t father_page, my_bool insert_last)
345
{
346
  int error,flag;
347
  uint nod_flag, search_key_length;
348
  uchar *temp_buff,*keypos;
349
  uchar keybuff[MI_MAX_KEY_BUFF];
350
  my_bool was_last_key;
351
  my_off_t next_page, dupp_key_pos;
352
  DBUG_ENTER("w_search");
353
  DBUG_PRINT("enter",("page: %ld", (long) page));
354
355
  search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
356
  if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
357
				      MI_MAX_KEY_BUFF*2)))
358
    DBUG_RETURN(-1);
359
  if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
360
    goto err;
361
362
  flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
363
			      comp_flag, &keypos, keybuff, &was_last_key);
364
  nod_flag=mi_test_if_nod(temp_buff);
365
  if (flag == 0)
366
  {
367
    uint tmp_key_length;
368
	/* get position to record with duplicated key */
369
    tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
370
    if (tmp_key_length)
371
      dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
372
    else
373
      dupp_key_pos= HA_OFFSET_ERROR;
374
375
    if (keyinfo->flag & HA_FULLTEXT)
376
    {
377
      uint off;
378
      int  subkeys;
379
380
      get_key_full_length_rdonly(off, keybuff);
381
      subkeys=ft_sintXkorr(keybuff+off);
382
      comp_flag=SEARCH_SAME;
383
      if (subkeys >= 0)
384
      {
385
        /* normal word, one-level tree structure */
386
        flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
387
                                    USE_WHOLE_KEY, comp_flag,
388
                                    &keypos, keybuff, &was_last_key);
389
      }
390
      else
391
      {
392
        /* popular word. two-level tree. going down */
393
        my_off_t root=dupp_key_pos;
394
        keyinfo=&info->s->ft2_keyinfo;
395
        get_key_full_length_rdonly(off, key);
396
        key+=off;
397
        keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
398
        error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
399
                                      &root, comp_flag);
400
        _mi_dpointer(info, keypos+HA_FT_WLEN, root);
401
        subkeys--; /* should there be underflow protection ? */
402
        DBUG_ASSERT(subkeys < 0);
403
        ft_intXstore(keypos, subkeys);
404
        if (!error)
405
          error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
406
        my_afree((uchar*) temp_buff);
407
        DBUG_RETURN(error);
408
      }
409
    }
410
    else /* not HA_FULLTEXT, normal HA_NOSAME key */
411
    {
412
      info->dupp_key_pos= dupp_key_pos;
413
      my_afree((uchar*) temp_buff);
414
      my_errno=HA_ERR_FOUND_DUPP_KEY;
415
      DBUG_RETURN(-1);
416
    }
417
  }
418
  if (flag == MI_FOUND_WRONG_KEY)
419
    DBUG_RETURN(-1);
420
  if (!was_last_key)
421
    insert_last=0;
422
  next_page=_mi_kpos(nod_flag,keypos);
423
  if (next_page == HA_OFFSET_ERROR ||
424
      (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
425
		      temp_buff, keypos, page, insert_last)) >0)
426
  {
427
    error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
428
		     father_keypos,father_page, insert_last);
429
    if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
430
      goto err;
431
  }
432
  my_afree((uchar*) temp_buff);
433
  DBUG_RETURN(error);
434
err:
435
  my_afree((uchar*) temp_buff);
436
  DBUG_PRINT("exit",("Error: %d",my_errno));
437
  DBUG_RETURN (-1);
438
} /* w_search */
439
440
441
/*
442
  Insert new key.
443
444
  SYNOPSIS
445
    _mi_insert()
446
    info                        Open table information.
447
    keyinfo                     Key definition information.
448
    key                         New key.
449
    anc_buff                    Key page (beginning).
450
    key_pos                     Position in key page where to insert.
451
    key_buff                    Copy of previous key.
452
    father_buff                 parent key page for balancing.
453
    father_key_pos              position in parent key page for balancing.
454
    father_page                 position of parent key page in file.
455
    insert_last                 If to append at end of page.
456
457
  DESCRIPTION
458
    Insert new key at right of key_pos.
459
460
  RETURN
461
    2           if key contains key to upper level.
462
    0           OK.
463
    < 0         Error.
464
*/
465
466
int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
467
	       uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
468
               uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
469
	       my_bool insert_last)
470
{
471
  uint a_length,nod_flag;
472
  int t_length;
473
  uchar *endpos, *prev_key;
474
  MI_KEY_PARAM s_temp;
475
  DBUG_ENTER("_mi_insert");
476
  DBUG_PRINT("enter",("key_pos: 0x%lx", (long) key_pos));
477
  DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
478
479
  nod_flag=mi_test_if_nod(anc_buff);
480
  a_length=mi_getint(anc_buff);
481
  endpos= anc_buff+ a_length;
482
  prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
483
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
484
				(key_pos == endpos ? (uchar*) 0 : key_pos),
485
				prev_key, prev_key,
486
				key,&s_temp);
487
#ifndef DBUG_OFF
488
  if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
489
					 (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
490
  {
491
    DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
492
  }
493
  if (keyinfo->flag & HA_PACK_KEY)
494
  {
495
    DBUG_PRINT("test",("t_length: %d  ref_len: %d",
496
		       t_length,s_temp.ref_length));
497
    DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: 0x%lx",
498
		       s_temp.n_ref_length,s_temp.n_length, (long) s_temp.key));
499
  }
500
#endif
501
  if (t_length > 0)
502
  {
503
    if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
504
    {
505
      mi_print_error(info->s, HA_ERR_CRASHED);
506
      my_errno=HA_ERR_CRASHED;
507
      DBUG_RETURN(-1);
508
    }
509
    bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
510
  }
511
  else
512
  {
513
    if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
514
    {
515
      mi_print_error(info->s, HA_ERR_CRASHED);
516
      my_errno=HA_ERR_CRASHED;
517
      DBUG_RETURN(-1);
518
    }
519
    bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
520
  }
521
  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
522
  a_length+=t_length;
523
  mi_putint(anc_buff,a_length,nod_flag);
524
  if (a_length <= keyinfo->block_length)
525
  {
526
    if (keyinfo->block_length - a_length < 32 &&
527
        keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
528
        info->s->base.key_reflength <= info->s->base.rec_reflength &&
529
        info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
530
    {
531
      /*
532
        Normal word. One-level tree. Page is almost full.
533
        Let's consider converting.
534
        We'll compare 'key' and the first key at anc_buff
535
       */
536
      uchar *a=key, *b=anc_buff+2+nod_flag;
537
      uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
538
      /* the very first key on the page is always unpacked */
539
      DBUG_ASSERT((*b & 128) == 0);
540
#if HA_FT_MAXLEN >= 127
541
      blen= mi_uint2korr(b); b+=2;
542
#else
543
      blen= *b++;
544
#endif
545
      get_key_length(alen,a);
546
      DBUG_ASSERT(info->ft1_to_ft2==0);
547
      if (alen == blen &&
548
          ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0, 0)==0)
549
      {
550
        /* yup. converting */
551
        info->ft1_to_ft2=(DYNAMIC_ARRAY *)
552
          my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
553
        my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50);
554
555
        /*
556
          now, adding all keys from the page to dynarray
557
          if the page is a leaf (if not keys will be deleted later)
558
        */
559
        if (!nod_flag)
560
        {
561
          /* let's leave the first key on the page, though, because
562
             we cannot easily dispatch an empty page here */
563
          b+=blen+ft2len+2;
564
          for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
565
            insert_dynamic(info->ft1_to_ft2, b);
566
567
          /* fixing the page's length - it contains only one key now */
568
          mi_putint(anc_buff,2+blen+ft2len+2,0);
569
        }
570
        /* the rest will be done when we're back from recursion */
571
      }
572
    }
573
    DBUG_RETURN(0);				/* There is room on page */
574
  }
575
  /* Page is full */
576
  if (nod_flag)
577
    insert_last=0;
578
  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
579
      father_buff && !insert_last)
580
    DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
581
				 father_key_pos,father_page));
582
  DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
583
} /* _mi_insert */
584
585
586
	/* split a full page in two and assign emerging item to key */
587
588
int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
589
		   uchar *key, uchar *buff, uchar *key_buff,
590
		   my_bool insert_last_key)
591
{
592
  uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
593
  uchar *key_pos,*pos, *after_key= NULL;
594
  my_off_t new_pos;
595
  MI_KEY_PARAM s_temp;
596
  DBUG_ENTER("mi_split_page");
597
  DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
598
599
  if (info->s->keyinfo+info->lastinx == keyinfo)
600
    info->page_changed=1;			/* Info->buff is used */
601
  info->buff_used=1;
602
  nod_flag=mi_test_if_nod(buff);
603
  key_ref_length=2+nod_flag;
604
  if (insert_last_key)
605
    key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
606
  else
607
    key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
608
			      &after_key);
609
  if (!key_pos)
610
    DBUG_RETURN(-1);
611
612
  length=(uint) (key_pos-buff);
613
  a_length=mi_getint(buff);
614
  mi_putint(buff,length,nod_flag);
615
616
  key_pos=after_key;
617
  if (nod_flag)
618
  {
619
    DBUG_PRINT("test",("Splitting nod"));
620
    pos=key_pos-nod_flag;
621
    memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
622
  }
623
624
	/* Move middle item to key and pointer to new page */
625
  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
626
    DBUG_RETURN(-1);
627
  _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
628
629
	/* Store new page */
630
  if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
631
    DBUG_RETURN(-1);
632
633
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
634
				(uchar*) 0, (uchar*) 0,
635
				key_buff, &s_temp);
636
  length=(uint) ((buff+a_length)-key_pos);
637
  memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
638
	 (size_t) length);
639
  (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
640
  mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
641
642
  if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
643
    DBUG_RETURN(-1);
644
  DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
645
  DBUG_RETURN(2);				/* Middle key up */
646
} /* _mi_split_page */
647
648
649
	/*
650
	  Calculate how to much to move to split a page in two
651
	  Returns pointer to start of key.
652
	  key will contain the key.
653
	  return_key_length will contain the length of key
654
	  after_key will contain the position to where the next key starts
655
	*/
656
657
uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
658
			 uchar *key, uint *return_key_length,
659
			 uchar **after_key)
660
{
661
  uint keys,length,key_ref_length;
662
  uchar *end,*lastpos;
663
  DBUG_ENTER("_mi_find_half_pos");
664
665
  key_ref_length=2+nod_flag;
666
  length=mi_getint(page)-key_ref_length;
667
  page+=key_ref_length;
668
  if (!(keyinfo->flag &
669
	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
670
	 HA_BINARY_PACK_KEY)))
671
  {
672
    key_ref_length=keyinfo->keylength+nod_flag;
673
    keys=length/(key_ref_length*2);
674
    *return_key_length=keyinfo->keylength;
675
    end=page+keys*key_ref_length;
676
    *after_key=end+key_ref_length;
677
    memcpy(key,end,key_ref_length);
678
    DBUG_RETURN(end);
679
  }
680
681
  end=page+length/2-key_ref_length;		/* This is aprox. half */
682
  *key='\0';
683
  do
684
  {
685
    lastpos=page;
686
    if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
687
      DBUG_RETURN(0);
688
  } while (page < end);
689
  *return_key_length=length;
690
  *after_key=page;
691
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  half: 0x%lx",
692
                     (long) lastpos, (long) page, (long) end));
693
  DBUG_RETURN(lastpos);
694
} /* _mi_find_half_pos */
695
696
697
	/*
698
	  Split buffer at last key
699
	  Returns pointer to the start of the key before the last key
700
	  key will contain the last key
701
	*/
702
703
static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
704
				uchar *key, uint *return_key_length,
705
				uchar **after_key)
706
{
707
  uint keys, length, last_length, key_ref_length;
708
  uchar *end, *lastpos, *prevpos= NULL;
709
  uchar key_buff[MI_MAX_KEY_BUFF];
710
  DBUG_ENTER("_mi_find_last_pos");
711
712
  key_ref_length=2;
713
  length=mi_getint(page)-key_ref_length;
714
  page+=key_ref_length;
715
  if (!(keyinfo->flag &
716
	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
717
	 HA_BINARY_PACK_KEY)))
718
  {
719
    keys=length/keyinfo->keylength-2;
720
    *return_key_length=length=keyinfo->keylength;
721
    end=page+keys*length;
722
    *after_key=end+length;
723
    memcpy(key,end,length);
724
    DBUG_RETURN(end);
725
  }
726
727
  end= page + length - key_ref_length;
728
  *key='\0';
729
  length=0;
730
  lastpos=page;
731
  while (page < end)
732
  {
733
    prevpos=lastpos; lastpos=page;
734
    last_length=length;
735
    memcpy(key, key_buff, length);		/* previous key */
736
    if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
737
    {
738
      mi_print_error(keyinfo->share, HA_ERR_CRASHED);
739
      my_errno=HA_ERR_CRASHED;
740
      DBUG_RETURN(0);
741
    }
742
  }
743
  *return_key_length=last_length;
744
  *after_key=lastpos;
745
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  end: 0x%lx",
746
                     (long) prevpos,(long) page,(long) end));
747
  DBUG_RETURN(prevpos);
748
} /* _mi_find_last_pos */
749
750
751
	/* Balance page with not packed keys with page on right/left */
752
	/* returns 0 if balance was done */
753
754
static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
755
			    uchar *key, uchar *curr_buff, uchar *father_buff,
756
			    uchar *father_key_pos, my_off_t father_page)
757
{
758
  my_bool right;
759
  uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
760
       right_length,left_length,new_right_length,new_left_length,extra_length,
761
       length,keys;
762
  uchar *pos,*buff,*extra_buff;
763
  my_off_t next_page,new_pos;
764
  uchar tmp_part_key[MI_MAX_KEY_BUFF];
765
  DBUG_ENTER("_mi_balance_page");
766
767
  k_length=keyinfo->keylength;
768
  father_length=mi_getint(father_buff);
769
  father_keylength=k_length+info->s->base.key_reflength;
770
  nod_flag=mi_test_if_nod(curr_buff);
771
  curr_keylength=k_length+nod_flag;
772
  info->page_changed=1;
773
774
  if ((father_key_pos != father_buff+father_length &&
775
       (info->state->records & 1)) ||
776
      father_key_pos == father_buff+2+info->s->base.key_reflength)
777
  {
778
    right=1;
779
    next_page= _mi_kpos(info->s->base.key_reflength,
780
			father_key_pos+father_keylength);
781
    buff=info->buff;
782
    DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
783
  }
784
  else
785
  {
786
    right=0;
787
    father_key_pos-=father_keylength;
788
    next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
789
					/* Fix that curr_buff is to left */
790
    buff=curr_buff; curr_buff=info->buff;
791
    DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
792
  }					/* father_key_pos ptr to parting key */
793
794
  if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
795
    goto err;
796
  DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
797
798
	/* Test if there is room to share keys */
799
800
  left_length=mi_getint(curr_buff);
801
  right_length=mi_getint(buff);
802
  keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
803
804
  if ((right ? right_length : left_length) + curr_keylength <=
805
      keyinfo->block_length)
806
  {						/* Merge buffs */
807
    new_left_length=2+nod_flag+(keys/2)*curr_keylength;
808
    new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
809
    mi_putint(curr_buff,new_left_length,nod_flag);
810
    mi_putint(buff,new_right_length,nod_flag);
811
812
    if (left_length < new_left_length)
813
    {						/* Move keys buff -> leaf */
814
      pos=curr_buff+left_length;
815
      memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
816
      memcpy((uchar*) pos+k_length, (uchar*) buff+2,
817
	     (size_t) (length=new_left_length - left_length - k_length));
818
      pos=buff+2+length;
819
      memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
820
      bmove((uchar*) buff+2,(uchar*) pos+k_length,new_right_length);
821
    }
822
    else
823
    {						/* Move keys -> buff */
824
825
      bmove_upp((uchar*) buff+new_right_length,(uchar*) buff+right_length,
826
		right_length-2);
827
      length=new_right_length-right_length-k_length;
828
      memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
829
      pos=curr_buff+new_left_length;
830
      memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
831
      memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
832
    }
833
834
    if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
835
	_mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
836
      goto err;
837
    DBUG_RETURN(0);
838
  }
839
840
	/* curr_buff[] and buff[] are full, lets split and make new nod */
841
842
  extra_buff=info->buff+info->s->base.max_key_block_length;
843
  new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
844
  if (keys == 5)				/* Too few keys to balance */
845
    new_left_length-=curr_keylength;
846
  extra_length=nod_flag+left_length+right_length-
847
    new_left_length-new_right_length-curr_keylength;
848
  DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
849
		     left_length, right_length,
850
		     new_left_length, new_right_length,
851
		     extra_length));
852
  mi_putint(curr_buff,new_left_length,nod_flag);
853
  mi_putint(buff,new_right_length,nod_flag);
854
  mi_putint(extra_buff,extra_length+2,nod_flag);
855
856
  /* move first largest keys to new page  */
857
  pos=buff+right_length-extra_length;
858
  memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
859
  /* Save new parting key */
860
  memcpy(tmp_part_key, pos-k_length,k_length);
861
  /* Make place for new keys */
862
  bmove_upp((uchar*) buff+new_right_length,(uchar*) pos-k_length,
863
	    right_length-extra_length-k_length-2);
864
  /* Copy keys from left page */
865
  pos= curr_buff+new_left_length;
866
  memcpy((uchar*) buff+2,(uchar*) pos+k_length,
867
	 (size_t) (length=left_length-new_left_length-k_length));
868
  /* Copy old parting key */
869
  memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
870
871
  /* Move new parting keys up to caller */
872
  memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
873
  memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
874
875
  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
876
    goto err;
877
  _mi_kpointer(info,key+k_length,new_pos);
878
  if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
879
			DFLT_INIT_HITS,info->buff) ||
880
      _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
881
                        DFLT_INIT_HITS,extra_buff))
882
    goto err;
883
884
  DBUG_RETURN(1);				/* Middle key up */
885
886
err:
887
  DBUG_RETURN(-1);
888
} /* _mi_balance_page */
889
890
/**********************************************************************
891
 *                Bulk insert code                                    *
892
 **********************************************************************/
893
894
typedef struct {
895
  MI_INFO *info;
896
  uint keynr;
897
} bulk_insert_param;
898
899
int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
900
		      uint key_length)
901
{
902
  int error;
903
  DBUG_ENTER("_mi_ck_write_tree");
904
905
  error= tree_insert(&info->bulk_insert[keynr], key,
906
         key_length + info->s->rec_reflength,
907
         info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
908
909
  DBUG_RETURN(error);
910
} /* _mi_ck_write_tree */
911
912
913
/* typeof(_mi_keys_compare)=qsort_cmp2 */
914
915
static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
916
{
917
  uint not_used[2];
918
  return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
919
                    key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
920
                    not_used);
921
}
922
923
924
static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
925
{
926
  /*
927
    Probably I can use info->lastkey here, but I'm not sure,
928
    and to be safe I'd better use local lastkey.
929
  */
930
  uchar lastkey[MI_MAX_KEY_BUFF];
931
  uint keylen;
932
  MI_KEYDEF *keyinfo;
933
934
  switch (mode) {
935
  case free_init:
936
    if (param->info->s->concurrent_insert)
937
    {
938
      rw_wrlock(&param->info->s->key_root_lock[param->keynr]);
939
      param->info->s->keyinfo[param->keynr].version++;
940
    }
941
    return 0;
942
  case free_free:
943
    keyinfo=param->info->s->keyinfo+param->keynr;
944
    keylen=_mi_keylength(keyinfo, key);
945
    memcpy(lastkey, key, keylen);
946
    return _mi_ck_write_btree(param->info,param->keynr,lastkey,
947
			      keylen - param->info->s->rec_reflength);
948
  case free_end:
949
    if (param->info->s->concurrent_insert)
950
      rw_unlock(&param->info->s->key_root_lock[param->keynr]);
951
    return 0;
952
  }
953
  return -1;
954
}
955
956
957
int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows)
958
{
959
  MYISAM_SHARE *share=info->s;
960
  MI_KEYDEF *key=share->keyinfo;
961
  bulk_insert_param *params;
962
  uint i, num_keys, total_keylength;
963
  ulonglong key_map;
964
  DBUG_ENTER("_mi_init_bulk_insert");
965
  DBUG_PRINT("enter",("cache_size: %lu", cache_size));
966
967
  DBUG_ASSERT(!info->bulk_insert &&
968
	      (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
969
970
  mi_clear_all_keys_active(key_map);
971
  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
972
  {
973
    if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
974
        mi_is_key_active(share->state.key_map, i))
975
    {
976
      num_keys++;
977
      mi_set_key_active(key_map, i);
978
      total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
979
    }
980
  }
981
982
  if (num_keys==0 ||
983
      num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
984
    DBUG_RETURN(0);
985
986
  if (rows && rows*total_keylength < cache_size)
987
    cache_size= (ulong)rows;
988
  else
989
    cache_size/=total_keylength*16;
990
991
  info->bulk_insert=(TREE *)
992
    my_malloc((sizeof(TREE)*share->base.keys+
993
               sizeof(bulk_insert_param)*num_keys),MYF(0));
994
995
  if (!info->bulk_insert)
996
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
997
998
  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
999
  for (i=0 ; i < share->base.keys ; i++)
1000
  {
1001
    if (mi_is_key_active(key_map, i))
1002
    {
1003
      params->info=info;
1004
      params->keynr=i;
1005
      /* Only allocate a 16'th of the buffer at a time */
1006
      init_tree(&info->bulk_insert[i],
1007
                cache_size * key[i].maxlength,
1008
                cache_size * key[i].maxlength, 0,
1009
		(qsort_cmp2)keys_compare, 0,
1010
		(tree_element_free) keys_free, (void *)params++);
1011
    }
1012
    else
1013
     info->bulk_insert[i].root=0;
1014
  }
1015
1016
  DBUG_RETURN(0);
1017
}
1018
1019
void mi_flush_bulk_insert(MI_INFO *info, uint inx)
1020
{
1021
  if (info->bulk_insert)
1022
  {
1023
    if (is_tree_inited(&info->bulk_insert[inx]))
1024
      reset_tree(&info->bulk_insert[inx]);
1025
  }
1026
}
1027
1028
void mi_end_bulk_insert(MI_INFO *info)
1029
{
1030
  if (info->bulk_insert)
1031
  {
1032
    uint i;
1033
    for (i=0 ; i < info->s->base.keys ; i++)
1034
    {
1035
      if (is_tree_inited(& info->bulk_insert[i]))
1036
      {
1037
        delete_tree(& info->bulk_insert[i]);
1038
      }
1039
    }
1040
    my_free((void *)info->bulk_insert, MYF(0));
1041
    info->bulk_insert=0;
1042
  }
1043
}