~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/myisam/mi_write.c

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/* Write a row to a MyISAM table */
 
17
 
 
18
#include "fulltext.h"
 
19
#include "rt_index.h"
 
20
 
 
21
#define MAX_POINTER_LENGTH 8
 
22
 
 
23
        /* Functions declared in this file */
 
24
 
 
25
static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
 
26
                    uint comp_flag, uchar *key,
 
27
                    uint key_length, my_off_t pos, uchar *father_buff,
 
28
                    uchar *father_keypos, my_off_t father_page,
 
29
                    my_bool insert_last);
 
30
static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
 
31
                            uchar *curr_buff,uchar *father_buff,
 
32
                            uchar *father_keypos,my_off_t father_page);
 
33
static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
 
34
                                uchar *key, uint *return_key_length,
 
35
                                uchar **after_key);
 
36
int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
 
37
                      uint key_length);
 
38
int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
 
39
                       uint key_length);
 
40
 
 
41
        /* Write new record to database */
 
42
 
 
43
int mi_write(MI_INFO *info, uchar *record)
 
44
{
 
45
  MYISAM_SHARE *share=info->s;
 
46
  uint i;
 
47
  int save_errno;
 
48
  my_off_t filepos;
 
49
  uchar *buff;
 
50
  my_bool lock_tree= share->concurrent_insert;
 
51
  DBUG_ENTER("mi_write");
 
52
  DBUG_PRINT("enter",("isam: %d  data: %d",info->s->kfile,info->dfile));
 
53
 
 
54
  DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
 
55
                  mi_print_error(info->s, HA_ERR_CRASHED);
 
56
                  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
 
57
  if (share->options & HA_OPTION_READ_ONLY_DATA)
 
58
  {
 
59
    DBUG_RETURN(my_errno=EACCES);
 
60
  }
 
61
  if (_mi_readinfo(info,F_WRLCK,1))
 
62
    DBUG_RETURN(my_errno);
 
63
  dont_break();                         /* Dont allow SIGHUP or SIGINT */
 
64
#if !defined(NO_LOCKING) && defined(USE_RECORD_LOCK)
 
65
  if (!info->locked && my_lock(info->dfile,F_WRLCK,0L,F_TO_EOF,
 
66
                               MYF(MY_SEEK_NOT_DONE) | info->lock_wait))
 
67
    goto err;
 
68
#endif
 
69
  filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
 
70
             !info->append_insert_at_end) ?
 
71
            share->state.dellink :
 
72
            info->state->data_file_length);
 
73
 
 
74
  if (share->base.reloc == (ha_rows) 1 &&
 
75
      share->base.records == (ha_rows) 1 &&
 
76
      info->state->records == (ha_rows) 1)
 
77
  {                                             /* System file */
 
78
    my_errno=HA_ERR_RECORD_FILE_FULL;
 
79
    goto err2;
 
80
  }
 
81
  if (info->state->key_file_length >= share->base.margin_key_file_length)
 
82
  {
 
83
    my_errno=HA_ERR_INDEX_FILE_FULL;
 
84
    goto err2;
 
85
  }
 
86
  if (_mi_mark_file_changed(info))
 
87
    goto err2;
 
88
 
 
89
  /* Calculate and check all unique constraints */
 
90
  for (i=0 ; i < share->state.header.uniques ; i++)
 
91
  {
 
92
    if (mi_check_unique(info,share->uniqueinfo+i,record,
 
93
                     mi_unique_hash(share->uniqueinfo+i,record),
 
94
                     HA_OFFSET_ERROR))
 
95
      goto err2;
 
96
  }
 
97
 
 
98
        /* Write all keys to indextree */
 
99
 
 
100
  buff=info->lastkey2;
 
101
  for (i=0 ; i < share->base.keys ; i++)
 
102
  {
 
103
    if (mi_is_key_active(share->state.key_map, i))
 
104
    {
 
105
      my_bool local_lock_tree= (lock_tree &&
 
106
                                !(info->bulk_insert &&
 
107
                                  is_tree_inited(&info->bulk_insert[i])));
 
108
      if (local_lock_tree)
 
109
      {
 
110
        rw_wrlock(&share->key_root_lock[i]);
 
111
        share->keyinfo[i].version++;
 
112
      }
 
113
      if (share->keyinfo[i].flag & HA_FULLTEXT )
 
114
      {
 
115
        if (_mi_ft_add(info,i, buff, record, filepos))
 
116
        {
 
117
          if (local_lock_tree)
 
118
            rw_unlock(&share->key_root_lock[i]);
 
119
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
 
120
          goto err;
 
121
        }
 
122
      }
 
123
      else
 
124
      {
 
125
        if (share->keyinfo[i].ck_insert(info,i,buff,
 
126
                        _mi_make_key(info,i,buff,record,filepos)))
 
127
        {
 
128
          if (local_lock_tree)
 
129
            rw_unlock(&share->key_root_lock[i]);
 
130
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
 
131
          goto err;
 
132
        }
 
133
      }
 
134
 
 
135
      /* The above changed info->lastkey2. Inform mi_rnext_same(). */
 
136
      info->update&= ~HA_STATE_RNEXT_SAME;
 
137
 
 
138
      if (local_lock_tree)
 
139
        rw_unlock(&share->key_root_lock[i]);
 
140
    }
 
141
  }
 
142
  if (share->calc_checksum)
 
143
    info->checksum=(*share->calc_checksum)(info,record);
 
144
  if (!(info->opt_flag & OPT_NO_ROWS))
 
145
  {
 
146
    if ((*share->write_record)(info,record))
 
147
      goto err;
 
148
    info->state->checksum+=info->checksum;
 
149
  }
 
150
  if (share->base.auto_key)
 
151
    set_if_bigger(info->s->state.auto_increment,
 
152
                  retrieve_auto_increment(info, record));
 
153
  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
 
154
                 HA_STATE_ROW_CHANGED);
 
155
  info->state->records++;
 
156
  info->lastpos=filepos;
 
157
  myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
 
158
  VOID(_mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE));
 
159
  if (info->invalidator != 0)
 
160
  {
 
161
    DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
 
162
    (*info->invalidator)(info->filename);
 
163
    info->invalidator=0;
 
164
  }
 
165
 
 
166
  /*
 
167
    Update status of the table. We need to do so after each row write
 
168
    for the log tables, as we want the new row to become visible to
 
169
    other threads as soon as possible. We don't lock mutex here
 
170
    (as it is required by pthread memory visibility rules) as (1) it's
 
171
    not critical to use outdated share->is_log_table value (2) locking
 
172
    mutex here for every write is too expensive.
 
173
  */
 
174
  if (share->is_log_table)
 
175
    mi_update_status((void*) info);
 
176
 
 
177
  allow_break();                                /* Allow SIGHUP & SIGINT */
 
178
  DBUG_RETURN(0);
 
179
 
 
180
err:
 
181
  save_errno=my_errno;
 
182
  if (my_errno == HA_ERR_FOUND_DUPP_KEY || my_errno == HA_ERR_RECORD_FILE_FULL ||
 
183
      my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_OUT_OF_MEM)
 
184
  {
 
185
    if (info->bulk_insert)
 
186
    {
 
187
      uint j;
 
188
      for (j=0 ; j < share->base.keys ; j++)
 
189
        mi_flush_bulk_insert(info, j);
 
190
    }
 
191
    info->errkey= (int) i;
 
192
    while ( i-- > 0)
 
193
    {
 
194
      if (mi_is_key_active(share->state.key_map, i))
 
195
      {
 
196
        my_bool local_lock_tree= (lock_tree &&
 
197
                                  !(info->bulk_insert &&
 
198
                                    is_tree_inited(&info->bulk_insert[i])));
 
199
        if (local_lock_tree)
 
200
          rw_wrlock(&share->key_root_lock[i]);
 
201
        if (share->keyinfo[i].flag & HA_FULLTEXT)
 
202
        {
 
203
          if (_mi_ft_del(info,i, buff,record,filepos))
 
204
          {
 
205
            if (local_lock_tree)
 
206
              rw_unlock(&share->key_root_lock[i]);
 
207
            break;
 
208
          }
 
209
        }
 
210
        else
 
211
        {
 
212
          uint key_length=_mi_make_key(info,i,buff,record,filepos);
 
213
          if (_mi_ck_delete(info,i,buff,key_length))
 
214
          {
 
215
            if (local_lock_tree)
 
216
              rw_unlock(&share->key_root_lock[i]);
 
217
            break;
 
218
          }
 
219
        }
 
220
        if (local_lock_tree)
 
221
          rw_unlock(&share->key_root_lock[i]);
 
222
      }
 
223
    }
 
224
  }
 
225
  else
 
226
  {
 
227
    mi_print_error(info->s, HA_ERR_CRASHED);
 
228
    mi_mark_crashed(info);
 
229
  }
 
230
  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
 
231
  my_errno=save_errno;
 
232
err2:
 
233
  save_errno=my_errno;
 
234
  myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
 
235
  VOID(_mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
 
236
  allow_break();                        /* Allow SIGHUP & SIGINT */
 
237
  DBUG_RETURN(my_errno=save_errno);
 
238
} /* mi_write */
 
239
 
 
240
 
 
241
        /* Write one key to btree */
 
242
 
 
243
int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
 
244
{
 
245
  DBUG_ENTER("_mi_ck_write");
 
246
 
 
247
  if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
 
248
  {
 
249
    DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
 
250
  }
 
251
  else
 
252
  {
 
253
    DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
 
254
  }
 
255
} /* _mi_ck_write */
 
256
 
 
257
 
 
258
/**********************************************************************
 
259
 *                Normal insert code                                  *
 
260
 **********************************************************************/
 
261
 
 
262
int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
 
263
                       uint key_length)
 
264
{
 
265
  int error;
 
266
  uint comp_flag;
 
267
  MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
 
268
  my_off_t  *root=&info->s->state.key_root[keynr];
 
269
  DBUG_ENTER("_mi_ck_write_btree");
 
270
 
 
271
  if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
 
272
    comp_flag=SEARCH_BIGGER;                    /* Put after same key */
 
273
  else if (keyinfo->flag & (HA_NOSAME|HA_FULLTEXT))
 
274
  {
 
275
    comp_flag=SEARCH_FIND | SEARCH_UPDATE;      /* No duplicates */
 
276
    if (keyinfo->flag & HA_NULL_ARE_EQUAL)
 
277
      comp_flag|= SEARCH_NULL_ARE_EQUAL;
 
278
  }
 
279
  else
 
280
    comp_flag=SEARCH_SAME;                      /* Keys in rec-pos order */
 
281
 
 
282
  error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
 
283
                                root, comp_flag);
 
284
  if (info->ft1_to_ft2)
 
285
  {
 
286
    if (!error)
 
287
      error= _mi_ft_convert_to_ft2(info, keynr, key);
 
288
    delete_dynamic(info->ft1_to_ft2);
 
289
    my_free((uchar*)info->ft1_to_ft2, MYF(0));
 
290
    info->ft1_to_ft2=0;
 
291
  }
 
292
  DBUG_RETURN(error);
 
293
} /* _mi_ck_write_btree */
 
294
 
 
295
int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
 
296
    uchar *key, uint key_length, my_off_t *root, uint comp_flag)
 
297
{
 
298
  int error;
 
299
  DBUG_ENTER("_mi_ck_real_write_btree");
 
300
  /* key_length parameter is used only if comp_flag is SEARCH_FIND */
 
301
  if (*root == HA_OFFSET_ERROR ||
 
302
      (error=w_search(info, keyinfo, comp_flag, key, key_length,
 
303
                      *root, (uchar *) 0, (uchar*) 0,
 
304
                      (my_off_t) 0, 1)) > 0)
 
305
    error=_mi_enlarge_root(info,keyinfo,key,root);
 
306
  DBUG_RETURN(error);
 
307
} /* _mi_ck_real_write_btree */
 
308
 
 
309
 
 
310
        /* Make a new root with key as only pointer */
 
311
 
 
312
int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
 
313
                     my_off_t *root)
 
314
{
 
315
  uint t_length,nod_flag;
 
316
  MI_KEY_PARAM s_temp;
 
317
  MYISAM_SHARE *share=info->s;
 
318
  DBUG_ENTER("_mi_enlarge_root");
 
319
 
 
320
  nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
 
321
  _mi_kpointer(info,info->buff+2,*root); /* if nod */
 
322
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
 
323
                                (uchar*) 0, (uchar*) 0, key,&s_temp);
 
324
  mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
 
325
  (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
 
326
  info->buff_used=info->page_changed=1;         /* info->buff is used */
 
327
  if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
 
328
      _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
 
329
    DBUG_RETURN(-1);
 
330
  DBUG_RETURN(0);
 
331
} /* _mi_enlarge_root */
 
332
 
 
333
 
 
334
        /*
 
335
          Search after a position for a key and store it there
 
336
          Returns -1 = error
 
337
                   0  = ok
 
338
                   1  = key should be stored in higher tree
 
339
        */
 
340
 
 
341
static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
 
342
                    uint comp_flag, uchar *key, uint key_length, my_off_t page,
 
343
                    uchar *father_buff, uchar *father_keypos,
 
344
                    my_off_t father_page, my_bool insert_last)
 
345
{
 
346
  int error,flag;
 
347
  uint nod_flag, search_key_length;
 
348
  uchar *temp_buff,*keypos;
 
349
  uchar keybuff[MI_MAX_KEY_BUFF];
 
350
  my_bool was_last_key;
 
351
  my_off_t next_page, dupp_key_pos;
 
352
  DBUG_ENTER("w_search");
 
353
  DBUG_PRINT("enter",("page: %ld", (long) page));
 
354
 
 
355
  search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
 
356
  if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
 
357
                                      MI_MAX_KEY_BUFF*2)))
 
358
    DBUG_RETURN(-1);
 
359
  if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
 
360
    goto err;
 
361
 
 
362
  flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
 
363
                              comp_flag, &keypos, keybuff, &was_last_key);
 
364
  nod_flag=mi_test_if_nod(temp_buff);
 
365
  if (flag == 0)
 
366
  {
 
367
    uint tmp_key_length;
 
368
        /* get position to record with duplicated key */
 
369
    tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
 
370
    if (tmp_key_length)
 
371
      dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
 
372
    else
 
373
      dupp_key_pos= HA_OFFSET_ERROR;
 
374
 
 
375
    if (keyinfo->flag & HA_FULLTEXT)
 
376
    {
 
377
      uint off;
 
378
      int  subkeys;
 
379
 
 
380
      get_key_full_length_rdonly(off, keybuff);
 
381
      subkeys=ft_sintXkorr(keybuff+off);
 
382
      comp_flag=SEARCH_SAME;
 
383
      if (subkeys >= 0)
 
384
      {
 
385
        /* normal word, one-level tree structure */
 
386
        flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
 
387
                                    USE_WHOLE_KEY, comp_flag,
 
388
                                    &keypos, keybuff, &was_last_key);
 
389
      }
 
390
      else
 
391
      {
 
392
        /* popular word. two-level tree. going down */
 
393
        my_off_t root=dupp_key_pos;
 
394
        keyinfo=&info->s->ft2_keyinfo;
 
395
        get_key_full_length_rdonly(off, key);
 
396
        key+=off;
 
397
        keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
 
398
        error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
 
399
                                      &root, comp_flag);
 
400
        _mi_dpointer(info, keypos+HA_FT_WLEN, root);
 
401
        subkeys--; /* should there be underflow protection ? */
 
402
        DBUG_ASSERT(subkeys < 0);
 
403
        ft_intXstore(keypos, subkeys);
 
404
        if (!error)
 
405
          error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
 
406
        my_afree((uchar*) temp_buff);
 
407
        DBUG_RETURN(error);
 
408
      }
 
409
    }
 
410
    else /* not HA_FULLTEXT, normal HA_NOSAME key */
 
411
    {
 
412
      info->dupp_key_pos= dupp_key_pos;
 
413
      my_afree((uchar*) temp_buff);
 
414
      my_errno=HA_ERR_FOUND_DUPP_KEY;
 
415
      DBUG_RETURN(-1);
 
416
    }
 
417
  }
 
418
  if (flag == MI_FOUND_WRONG_KEY)
 
419
    DBUG_RETURN(-1);
 
420
  if (!was_last_key)
 
421
    insert_last=0;
 
422
  next_page=_mi_kpos(nod_flag,keypos);
 
423
  if (next_page == HA_OFFSET_ERROR ||
 
424
      (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
 
425
                      temp_buff, keypos, page, insert_last)) >0)
 
426
  {
 
427
    error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
 
428
                     father_keypos,father_page, insert_last);
 
429
    if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
 
430
      goto err;
 
431
  }
 
432
  my_afree((uchar*) temp_buff);
 
433
  DBUG_RETURN(error);
 
434
err:
 
435
  my_afree((uchar*) temp_buff);
 
436
  DBUG_PRINT("exit",("Error: %d",my_errno));
 
437
  DBUG_RETURN (-1);
 
438
} /* w_search */
 
439
 
 
440
 
 
441
/*
 
442
  Insert new key.
 
443
 
 
444
  SYNOPSIS
 
445
    _mi_insert()
 
446
    info                        Open table information.
 
447
    keyinfo                     Key definition information.
 
448
    key                         New key.
 
449
    anc_buff                    Key page (beginning).
 
450
    key_pos                     Position in key page where to insert.
 
451
    key_buff                    Copy of previous key.
 
452
    father_buff                 parent key page for balancing.
 
453
    father_key_pos              position in parent key page for balancing.
 
454
    father_page                 position of parent key page in file.
 
455
    insert_last                 If to append at end of page.
 
456
 
 
457
  DESCRIPTION
 
458
    Insert new key at right of key_pos.
 
459
 
 
460
  RETURN
 
461
    2           if key contains key to upper level.
 
462
    0           OK.
 
463
    < 0         Error.
 
464
*/
 
465
 
 
466
int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
 
467
               uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
 
468
               uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
 
469
               my_bool insert_last)
 
470
{
 
471
  uint a_length,nod_flag;
 
472
  int t_length;
 
473
  uchar *endpos, *prev_key;
 
474
  MI_KEY_PARAM s_temp;
 
475
  DBUG_ENTER("_mi_insert");
 
476
  DBUG_PRINT("enter",("key_pos: 0x%lx", (long) key_pos));
 
477
  DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
 
478
 
 
479
  nod_flag=mi_test_if_nod(anc_buff);
 
480
  a_length=mi_getint(anc_buff);
 
481
  endpos= anc_buff+ a_length;
 
482
  prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
 
483
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
 
484
                                (key_pos == endpos ? (uchar*) 0 : key_pos),
 
485
                                prev_key, prev_key,
 
486
                                key,&s_temp);
 
487
#ifndef DBUG_OFF
 
488
  if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
 
489
                                         (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
 
490
  {
 
491
    DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
 
492
  }
 
493
  if (keyinfo->flag & HA_PACK_KEY)
 
494
  {
 
495
    DBUG_PRINT("test",("t_length: %d  ref_len: %d",
 
496
                       t_length,s_temp.ref_length));
 
497
    DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: 0x%lx",
 
498
                       s_temp.n_ref_length,s_temp.n_length, (long) s_temp.key));
 
499
  }
 
500
#endif
 
501
  if (t_length > 0)
 
502
  {
 
503
    if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
 
504
    {
 
505
      mi_print_error(info->s, HA_ERR_CRASHED);
 
506
      my_errno=HA_ERR_CRASHED;
 
507
      DBUG_RETURN(-1);
 
508
    }
 
509
    bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
 
510
  }
 
511
  else
 
512
  {
 
513
    if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
 
514
    {
 
515
      mi_print_error(info->s, HA_ERR_CRASHED);
 
516
      my_errno=HA_ERR_CRASHED;
 
517
      DBUG_RETURN(-1);
 
518
    }
 
519
    bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
 
520
  }
 
521
  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
 
522
  a_length+=t_length;
 
523
  mi_putint(anc_buff,a_length,nod_flag);
 
524
  if (a_length <= keyinfo->block_length)
 
525
  {
 
526
    if (keyinfo->block_length - a_length < 32 &&
 
527
        keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
 
528
        info->s->base.key_reflength <= info->s->base.rec_reflength &&
 
529
        info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
 
530
    {
 
531
      /*
 
532
        Normal word. One-level tree. Page is almost full.
 
533
        Let's consider converting.
 
534
        We'll compare 'key' and the first key at anc_buff
 
535
       */
 
536
      uchar *a=key, *b=anc_buff+2+nod_flag;
 
537
      uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
 
538
      /* the very first key on the page is always unpacked */
 
539
      DBUG_ASSERT((*b & 128) == 0);
 
540
#if HA_FT_MAXLEN >= 127
 
541
      blen= mi_uint2korr(b); b+=2;
 
542
#else
 
543
      blen= *b++;
 
544
#endif
 
545
      get_key_length(alen,a);
 
546
      DBUG_ASSERT(info->ft1_to_ft2==0);
 
547
      if (alen == blen &&
 
548
          ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0, 0)==0)
 
549
      {
 
550
        /* yup. converting */
 
551
        info->ft1_to_ft2=(DYNAMIC_ARRAY *)
 
552
          my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
 
553
        my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50);
 
554
 
 
555
        /*
 
556
          now, adding all keys from the page to dynarray
 
557
          if the page is a leaf (if not keys will be deleted later)
 
558
        */
 
559
        if (!nod_flag)
 
560
        {
 
561
          /* let's leave the first key on the page, though, because
 
562
             we cannot easily dispatch an empty page here */
 
563
          b+=blen+ft2len+2;
 
564
          for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
 
565
            insert_dynamic(info->ft1_to_ft2, b);
 
566
 
 
567
          /* fixing the page's length - it contains only one key now */
 
568
          mi_putint(anc_buff,2+blen+ft2len+2,0);
 
569
        }
 
570
        /* the rest will be done when we're back from recursion */
 
571
      }
 
572
    }
 
573
    DBUG_RETURN(0);                             /* There is room on page */
 
574
  }
 
575
  /* Page is full */
 
576
  if (nod_flag)
 
577
    insert_last=0;
 
578
  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
 
579
      father_buff && !insert_last)
 
580
    DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
 
581
                                 father_key_pos,father_page));
 
582
  DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
 
583
} /* _mi_insert */
 
584
 
 
585
 
 
586
        /* split a full page in two and assign emerging item to key */
 
587
 
 
588
int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
 
589
                   uchar *key, uchar *buff, uchar *key_buff,
 
590
                   my_bool insert_last_key)
 
591
{
 
592
  uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
 
593
  uchar *key_pos,*pos, *after_key= NULL;
 
594
  my_off_t new_pos;
 
595
  MI_KEY_PARAM s_temp;
 
596
  DBUG_ENTER("mi_split_page");
 
597
  DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
 
598
 
 
599
  if (info->s->keyinfo+info->lastinx == keyinfo)
 
600
    info->page_changed=1;                       /* Info->buff is used */
 
601
  info->buff_used=1;
 
602
  nod_flag=mi_test_if_nod(buff);
 
603
  key_ref_length=2+nod_flag;
 
604
  if (insert_last_key)
 
605
    key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
 
606
  else
 
607
    key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
 
608
                              &after_key);
 
609
  if (!key_pos)
 
610
    DBUG_RETURN(-1);
 
611
 
 
612
  length=(uint) (key_pos-buff);
 
613
  a_length=mi_getint(buff);
 
614
  mi_putint(buff,length,nod_flag);
 
615
 
 
616
  key_pos=after_key;
 
617
  if (nod_flag)
 
618
  {
 
619
    DBUG_PRINT("test",("Splitting nod"));
 
620
    pos=key_pos-nod_flag;
 
621
    memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
 
622
  }
 
623
 
 
624
        /* Move middle item to key and pointer to new page */
 
625
  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
 
626
    DBUG_RETURN(-1);
 
627
  _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
 
628
 
 
629
        /* Store new page */
 
630
  if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
 
631
    DBUG_RETURN(-1);
 
632
 
 
633
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
 
634
                                (uchar*) 0, (uchar*) 0,
 
635
                                key_buff, &s_temp);
 
636
  length=(uint) ((buff+a_length)-key_pos);
 
637
  memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
 
638
         (size_t) length);
 
639
  (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
 
640
  mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
 
641
 
 
642
  if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
 
643
    DBUG_RETURN(-1);
 
644
  DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
 
645
  DBUG_RETURN(2);                               /* Middle key up */
 
646
} /* _mi_split_page */
 
647
 
 
648
 
 
649
        /*
 
650
          Calculate how to much to move to split a page in two
 
651
          Returns pointer to start of key.
 
652
          key will contain the key.
 
653
          return_key_length will contain the length of key
 
654
          after_key will contain the position to where the next key starts
 
655
        */
 
656
 
 
657
uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
 
658
                         uchar *key, uint *return_key_length,
 
659
                         uchar **after_key)
 
660
{
 
661
  uint keys,length,key_ref_length;
 
662
  uchar *end,*lastpos;
 
663
  DBUG_ENTER("_mi_find_half_pos");
 
664
 
 
665
  key_ref_length=2+nod_flag;
 
666
  length=mi_getint(page)-key_ref_length;
 
667
  page+=key_ref_length;
 
668
  if (!(keyinfo->flag &
 
669
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
 
670
         HA_BINARY_PACK_KEY)))
 
671
  {
 
672
    key_ref_length=keyinfo->keylength+nod_flag;
 
673
    keys=length/(key_ref_length*2);
 
674
    *return_key_length=keyinfo->keylength;
 
675
    end=page+keys*key_ref_length;
 
676
    *after_key=end+key_ref_length;
 
677
    memcpy(key,end,key_ref_length);
 
678
    DBUG_RETURN(end);
 
679
  }
 
680
 
 
681
  end=page+length/2-key_ref_length;             /* This is aprox. half */
 
682
  *key='\0';
 
683
  do
 
684
  {
 
685
    lastpos=page;
 
686
    if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
 
687
      DBUG_RETURN(0);
 
688
  } while (page < end);
 
689
  *return_key_length=length;
 
690
  *after_key=page;
 
691
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  half: 0x%lx",
 
692
                     (long) lastpos, (long) page, (long) end));
 
693
  DBUG_RETURN(lastpos);
 
694
} /* _mi_find_half_pos */
 
695
 
 
696
 
 
697
        /*
 
698
          Split buffer at last key
 
699
          Returns pointer to the start of the key before the last key
 
700
          key will contain the last key
 
701
        */
 
702
 
 
703
static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
 
704
                                uchar *key, uint *return_key_length,
 
705
                                uchar **after_key)
 
706
{
 
707
  uint keys, length, last_length, key_ref_length;
 
708
  uchar *end, *lastpos, *prevpos= NULL;
 
709
  uchar key_buff[MI_MAX_KEY_BUFF];
 
710
  DBUG_ENTER("_mi_find_last_pos");
 
711
 
 
712
  key_ref_length=2;
 
713
  length=mi_getint(page)-key_ref_length;
 
714
  page+=key_ref_length;
 
715
  if (!(keyinfo->flag &
 
716
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
 
717
         HA_BINARY_PACK_KEY)))
 
718
  {
 
719
    keys=length/keyinfo->keylength-2;
 
720
    *return_key_length=length=keyinfo->keylength;
 
721
    end=page+keys*length;
 
722
    *after_key=end+length;
 
723
    memcpy(key,end,length);
 
724
    DBUG_RETURN(end);
 
725
  }
 
726
 
 
727
  end= page + length - key_ref_length;
 
728
  *key='\0';
 
729
  length=0;
 
730
  lastpos=page;
 
731
  while (page < end)
 
732
  {
 
733
    prevpos=lastpos; lastpos=page;
 
734
    last_length=length;
 
735
    memcpy(key, key_buff, length);              /* previous key */
 
736
    if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
 
737
    {
 
738
      mi_print_error(keyinfo->share, HA_ERR_CRASHED);
 
739
      my_errno=HA_ERR_CRASHED;
 
740
      DBUG_RETURN(0);
 
741
    }
 
742
  }
 
743
  *return_key_length=last_length;
 
744
  *after_key=lastpos;
 
745
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  end: 0x%lx",
 
746
                     (long) prevpos,(long) page,(long) end));
 
747
  DBUG_RETURN(prevpos);
 
748
} /* _mi_find_last_pos */
 
749
 
 
750
 
 
751
        /* Balance page with not packed keys with page on right/left */
 
752
        /* returns 0 if balance was done */
 
753
 
 
754
static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
 
755
                            uchar *key, uchar *curr_buff, uchar *father_buff,
 
756
                            uchar *father_key_pos, my_off_t father_page)
 
757
{
 
758
  my_bool right;
 
759
  uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
 
760
       right_length,left_length,new_right_length,new_left_length,extra_length,
 
761
       length,keys;
 
762
  uchar *pos,*buff,*extra_buff;
 
763
  my_off_t next_page,new_pos;
 
764
  uchar tmp_part_key[MI_MAX_KEY_BUFF];
 
765
  DBUG_ENTER("_mi_balance_page");
 
766
 
 
767
  k_length=keyinfo->keylength;
 
768
  father_length=mi_getint(father_buff);
 
769
  father_keylength=k_length+info->s->base.key_reflength;
 
770
  nod_flag=mi_test_if_nod(curr_buff);
 
771
  curr_keylength=k_length+nod_flag;
 
772
  info->page_changed=1;
 
773
 
 
774
  if ((father_key_pos != father_buff+father_length &&
 
775
       (info->state->records & 1)) ||
 
776
      father_key_pos == father_buff+2+info->s->base.key_reflength)
 
777
  {
 
778
    right=1;
 
779
    next_page= _mi_kpos(info->s->base.key_reflength,
 
780
                        father_key_pos+father_keylength);
 
781
    buff=info->buff;
 
782
    DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
 
783
  }
 
784
  else
 
785
  {
 
786
    right=0;
 
787
    father_key_pos-=father_keylength;
 
788
    next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
 
789
                                        /* Fix that curr_buff is to left */
 
790
    buff=curr_buff; curr_buff=info->buff;
 
791
    DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
 
792
  }                                     /* father_key_pos ptr to parting key */
 
793
 
 
794
  if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
 
795
    goto err;
 
796
  DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
 
797
 
 
798
        /* Test if there is room to share keys */
 
799
 
 
800
  left_length=mi_getint(curr_buff);
 
801
  right_length=mi_getint(buff);
 
802
  keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
 
803
 
 
804
  if ((right ? right_length : left_length) + curr_keylength <=
 
805
      keyinfo->block_length)
 
806
  {                                             /* Merge buffs */
 
807
    new_left_length=2+nod_flag+(keys/2)*curr_keylength;
 
808
    new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
 
809
    mi_putint(curr_buff,new_left_length,nod_flag);
 
810
    mi_putint(buff,new_right_length,nod_flag);
 
811
 
 
812
    if (left_length < new_left_length)
 
813
    {                                           /* Move keys buff -> leaf */
 
814
      pos=curr_buff+left_length;
 
815
      memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
 
816
      memcpy((uchar*) pos+k_length, (uchar*) buff+2,
 
817
             (size_t) (length=new_left_length - left_length - k_length));
 
818
      pos=buff+2+length;
 
819
      memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
 
820
      bmove((uchar*) buff+2,(uchar*) pos+k_length,new_right_length);
 
821
    }
 
822
    else
 
823
    {                                           /* Move keys -> buff */
 
824
 
 
825
      bmove_upp((uchar*) buff+new_right_length,(uchar*) buff+right_length,
 
826
                right_length-2);
 
827
      length=new_right_length-right_length-k_length;
 
828
      memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
 
829
      pos=curr_buff+new_left_length;
 
830
      memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
 
831
      memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
 
832
    }
 
833
 
 
834
    if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
 
835
        _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
 
836
      goto err;
 
837
    DBUG_RETURN(0);
 
838
  }
 
839
 
 
840
        /* curr_buff[] and buff[] are full, lets split and make new nod */
 
841
 
 
842
  extra_buff=info->buff+info->s->base.max_key_block_length;
 
843
  new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
 
844
  if (keys == 5)                                /* Too few keys to balance */
 
845
    new_left_length-=curr_keylength;
 
846
  extra_length=nod_flag+left_length+right_length-
 
847
    new_left_length-new_right_length-curr_keylength;
 
848
  DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
 
849
                     left_length, right_length,
 
850
                     new_left_length, new_right_length,
 
851
                     extra_length));
 
852
  mi_putint(curr_buff,new_left_length,nod_flag);
 
853
  mi_putint(buff,new_right_length,nod_flag);
 
854
  mi_putint(extra_buff,extra_length+2,nod_flag);
 
855
 
 
856
  /* move first largest keys to new page  */
 
857
  pos=buff+right_length-extra_length;
 
858
  memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
 
859
  /* Save new parting key */
 
860
  memcpy(tmp_part_key, pos-k_length,k_length);
 
861
  /* Make place for new keys */
 
862
  bmove_upp((uchar*) buff+new_right_length,(uchar*) pos-k_length,
 
863
            right_length-extra_length-k_length-2);
 
864
  /* Copy keys from left page */
 
865
  pos= curr_buff+new_left_length;
 
866
  memcpy((uchar*) buff+2,(uchar*) pos+k_length,
 
867
         (size_t) (length=left_length-new_left_length-k_length));
 
868
  /* Copy old parting key */
 
869
  memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
 
870
 
 
871
  /* Move new parting keys up to caller */
 
872
  memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
 
873
  memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
 
874
 
 
875
  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
 
876
    goto err;
 
877
  _mi_kpointer(info,key+k_length,new_pos);
 
878
  if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
 
879
                        DFLT_INIT_HITS,info->buff) ||
 
880
      _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
 
881
                        DFLT_INIT_HITS,extra_buff))
 
882
    goto err;
 
883
 
 
884
  DBUG_RETURN(1);                               /* Middle key up */
 
885
 
 
886
err:
 
887
  DBUG_RETURN(-1);
 
888
} /* _mi_balance_page */
 
889
 
 
890
/**********************************************************************
 
891
 *                Bulk insert code                                    *
 
892
 **********************************************************************/
 
893
 
 
894
typedef struct {
 
895
  MI_INFO *info;
 
896
  uint keynr;
 
897
} bulk_insert_param;
 
898
 
 
899
int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
 
900
                      uint key_length)
 
901
{
 
902
  int error;
 
903
  DBUG_ENTER("_mi_ck_write_tree");
 
904
 
 
905
  error= tree_insert(&info->bulk_insert[keynr], key,
 
906
         key_length + info->s->rec_reflength,
 
907
         info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
 
908
 
 
909
  DBUG_RETURN(error);
 
910
} /* _mi_ck_write_tree */
 
911
 
 
912
 
 
913
/* typeof(_mi_keys_compare)=qsort_cmp2 */
 
914
 
 
915
static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
 
916
{
 
917
  uint not_used[2];
 
918
  return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
 
919
                    key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
 
920
                    not_used);
 
921
}
 
922
 
 
923
 
 
924
static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
 
925
{
 
926
  /*
 
927
    Probably I can use info->lastkey here, but I'm not sure,
 
928
    and to be safe I'd better use local lastkey.
 
929
  */
 
930
  uchar lastkey[MI_MAX_KEY_BUFF];
 
931
  uint keylen;
 
932
  MI_KEYDEF *keyinfo;
 
933
 
 
934
  switch (mode) {
 
935
  case free_init:
 
936
    if (param->info->s->concurrent_insert)
 
937
    {
 
938
      rw_wrlock(&param->info->s->key_root_lock[param->keynr]);
 
939
      param->info->s->keyinfo[param->keynr].version++;
 
940
    }
 
941
    return 0;
 
942
  case free_free:
 
943
    keyinfo=param->info->s->keyinfo+param->keynr;
 
944
    keylen=_mi_keylength(keyinfo, key);
 
945
    memcpy(lastkey, key, keylen);
 
946
    return _mi_ck_write_btree(param->info,param->keynr,lastkey,
 
947
                              keylen - param->info->s->rec_reflength);
 
948
  case free_end:
 
949
    if (param->info->s->concurrent_insert)
 
950
      rw_unlock(&param->info->s->key_root_lock[param->keynr]);
 
951
    return 0;
 
952
  }
 
953
  return -1;
 
954
}
 
955
 
 
956
 
 
957
int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows)
 
958
{
 
959
  MYISAM_SHARE *share=info->s;
 
960
  MI_KEYDEF *key=share->keyinfo;
 
961
  bulk_insert_param *params;
 
962
  uint i, num_keys, total_keylength;
 
963
  ulonglong key_map;
 
964
  DBUG_ENTER("_mi_init_bulk_insert");
 
965
  DBUG_PRINT("enter",("cache_size: %lu", cache_size));
 
966
 
 
967
  DBUG_ASSERT(!info->bulk_insert &&
 
968
              (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
 
969
 
 
970
  mi_clear_all_keys_active(key_map);
 
971
  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
 
972
  {
 
973
    if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
 
974
        mi_is_key_active(share->state.key_map, i))
 
975
    {
 
976
      num_keys++;
 
977
      mi_set_key_active(key_map, i);
 
978
      total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
 
979
    }
 
980
  }
 
981
 
 
982
  if (num_keys==0 ||
 
983
      num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
 
984
    DBUG_RETURN(0);
 
985
 
 
986
  if (rows && rows*total_keylength < cache_size)
 
987
    cache_size= (ulong)rows;
 
988
  else
 
989
    cache_size/=total_keylength*16;
 
990
 
 
991
  info->bulk_insert=(TREE *)
 
992
    my_malloc((sizeof(TREE)*share->base.keys+
 
993
               sizeof(bulk_insert_param)*num_keys),MYF(0));
 
994
 
 
995
  if (!info->bulk_insert)
 
996
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
997
 
 
998
  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
 
999
  for (i=0 ; i < share->base.keys ; i++)
 
1000
  {
 
1001
    if (mi_is_key_active(key_map, i))
 
1002
    {
 
1003
      params->info=info;
 
1004
      params->keynr=i;
 
1005
      /* Only allocate a 16'th of the buffer at a time */
 
1006
      init_tree(&info->bulk_insert[i],
 
1007
                cache_size * key[i].maxlength,
 
1008
                cache_size * key[i].maxlength, 0,
 
1009
                (qsort_cmp2)keys_compare, 0,
 
1010
                (tree_element_free) keys_free, (void *)params++);
 
1011
    }
 
1012
    else
 
1013
     info->bulk_insert[i].root=0;
 
1014
  }
 
1015
 
 
1016
  DBUG_RETURN(0);
 
1017
}
 
1018
 
 
1019
void mi_flush_bulk_insert(MI_INFO *info, uint inx)
 
1020
{
 
1021
  if (info->bulk_insert)
 
1022
  {
 
1023
    if (is_tree_inited(&info->bulk_insert[inx]))
 
1024
      reset_tree(&info->bulk_insert[inx]);
 
1025
  }
 
1026
}
 
1027
 
 
1028
void mi_end_bulk_insert(MI_INFO *info)
 
1029
{
 
1030
  if (info->bulk_insert)
 
1031
  {
 
1032
    uint i;
 
1033
    for (i=0 ; i < info->s->base.keys ; i++)
 
1034
    {
 
1035
      if (is_tree_inited(& info->bulk_insert[i]))
 
1036
      {
 
1037
        delete_tree(& info->bulk_insert[i]);
 
1038
      }
 
1039
    }
 
1040
    my_free((void *)info->bulk_insert, MYF(0));
 
1041
    info->bulk_insert=0;
 
1042
  }
 
1043
}