~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/myisam/mi_write.cc

  • Committer: Brian Aker
  • Date: 2009-05-21 19:15:01 UTC
  • mfrom: (991.1.12 for-brian)
  • Revision ID: brian@gaz-20090521191501-u5qe56byfubioj1r
Merge Stewart

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2006 MySQL AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
 
 
16
 
/* Write a row to a MyISAM table */
17
 
 
18
 
#include "myisam_priv.h"
19
 
 
20
 
#include "drizzled/internal/m_string.h"
21
 
#include <drizzled/util/test.h>
22
 
 
23
 
using namespace drizzled;
24
 
 
25
 
#define MAX_POINTER_LENGTH 8
26
 
 
27
 
        /* Functions declared in this file */
28
 
 
29
 
static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
30
 
                    uint32_t comp_flag, unsigned char *key,
31
 
                    uint32_t key_length, internal::my_off_t pos, unsigned char *father_buff,
32
 
                    unsigned char *father_keypos, internal::my_off_t father_page,
33
 
                    bool insert_last);
34
 
static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,unsigned char *key,
35
 
                            unsigned char *curr_buff,unsigned char *father_buff,
36
 
                            unsigned char *father_keypos,internal::my_off_t father_page);
37
 
static unsigned char *_mi_find_last_pos(MI_KEYDEF *keyinfo, unsigned char *page,
38
 
                                unsigned char *key, uint32_t *return_key_length,
39
 
                                unsigned char **after_key);
40
 
int _mi_ck_write_tree(register MI_INFO *info, uint32_t keynr,unsigned char *key,
41
 
                      uint32_t key_length);
42
 
int _mi_ck_write_btree(register MI_INFO *info, uint32_t keynr,unsigned char *key,
43
 
                       uint32_t key_length);
44
 
 
45
 
        /* Write new record to database */
46
 
 
47
 
int mi_write(MI_INFO *info, unsigned char *record)
48
 
{
49
 
  MYISAM_SHARE *share=info->s;
50
 
  uint32_t i;
51
 
  int save_errno;
52
 
  internal::my_off_t filepos;
53
 
  unsigned char *buff;
54
 
  bool lock_tree= share->concurrent_insert;
55
 
 
56
 
  if (share->options & HA_OPTION_READ_ONLY_DATA)
57
 
  {
58
 
    return(errno=EACCES);
59
 
  }
60
 
  if (_mi_readinfo(info,F_WRLCK,1))
61
 
    return(errno);
62
 
  filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
63
 
             !info->append_insert_at_end) ?
64
 
            share->state.dellink :
65
 
            info->state->data_file_length);
66
 
 
67
 
  if (share->base.reloc == (ha_rows) 1 &&
68
 
      share->base.records == (ha_rows) 1 &&
69
 
      info->state->records == (ha_rows) 1)
70
 
  {                                             /* System file */
71
 
    errno=HA_ERR_RECORD_FILE_FULL;
72
 
    goto err2;
73
 
  }
74
 
  if (info->state->key_file_length >= share->base.margin_key_file_length)
75
 
  {
76
 
    errno=HA_ERR_INDEX_FILE_FULL;
77
 
    goto err2;
78
 
  }
79
 
  if (_mi_mark_file_changed(info))
80
 
    goto err2;
81
 
 
82
 
  /* Calculate and check all unique constraints */
83
 
  for (i=0 ; i < share->state.header.uniques ; i++)
84
 
  {
85
 
    if (mi_check_unique(info,share->uniqueinfo+i,record,
86
 
                     mi_unique_hash(share->uniqueinfo+i,record),
87
 
                     HA_OFFSET_ERROR))
88
 
      goto err2;
89
 
  }
90
 
 
91
 
        /* Write all keys to indextree */
92
 
 
93
 
  buff=info->lastkey2;
94
 
  for (i=0 ; i < share->base.keys ; i++)
95
 
  {
96
 
    if (mi_is_key_active(share->state.key_map, i))
97
 
    {
98
 
      bool local_lock_tree= (lock_tree &&
99
 
                                !(info->bulk_insert &&
100
 
                                  is_tree_inited(&info->bulk_insert[i])));
101
 
      if (local_lock_tree)
102
 
      {
103
 
        share->keyinfo[i].version++;
104
 
      }
105
 
      {
106
 
        if (share->keyinfo[i].ck_insert(info,i,buff,
107
 
                        _mi_make_key(info,i,buff,record,filepos)))
108
 
        {
109
 
          goto err;
110
 
        }
111
 
      }
112
 
 
113
 
      /* The above changed info->lastkey2. Inform mi_rnext_same(). */
114
 
      info->update&= ~HA_STATE_RNEXT_SAME;
115
 
    }
116
 
  }
117
 
  if (share->calc_checksum)
118
 
    info->checksum=(*share->calc_checksum)(info,record);
119
 
  if (!(info->opt_flag & OPT_NO_ROWS))
120
 
  {
121
 
    if ((*share->write_record)(info,record))
122
 
      goto err;
123
 
    info->state->checksum+=info->checksum;
124
 
  }
125
 
  if (share->base.auto_key)
126
 
    set_if_bigger(info->s->state.auto_increment,
127
 
                  retrieve_auto_increment(info, record));
128
 
  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
129
 
                 HA_STATE_ROW_CHANGED);
130
 
  info->state->records++;
131
 
  info->lastpos=filepos;
132
 
  _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
133
 
 
134
 
  /*
135
 
    Update status of the table. We need to do so after each row write
136
 
    for the log tables, as we want the new row to become visible to
137
 
    other threads as soon as possible. We don't lock mutex here
138
 
    (as it is required by pthread memory visibility rules) as (1) it's
139
 
    not critical to use outdated share->is_log_table value (2) locking
140
 
    mutex here for every write is too expensive.
141
 
  */
142
 
  if (share->is_log_table) // Log table do not exist in Drizzle
143
 
    assert(0);
144
 
 
145
 
  return(0);
146
 
 
147
 
err:
148
 
  save_errno=errno;
149
 
  if (errno == HA_ERR_FOUND_DUPP_KEY || errno == HA_ERR_RECORD_FILE_FULL ||
150
 
      errno == HA_ERR_NULL_IN_SPATIAL || errno == HA_ERR_OUT_OF_MEM)
151
 
  {
152
 
    if (info->bulk_insert)
153
 
    {
154
 
      uint32_t j;
155
 
      for (j=0 ; j < share->base.keys ; j++)
156
 
        mi_flush_bulk_insert(info, j);
157
 
    }
158
 
    info->errkey= (int) i;
159
 
    while ( i-- > 0)
160
 
    {
161
 
      if (mi_is_key_active(share->state.key_map, i))
162
 
      {
163
 
        {
164
 
          uint32_t key_length=_mi_make_key(info,i,buff,record,filepos);
165
 
          if (_mi_ck_delete(info,i,buff,key_length))
166
 
          {
167
 
            break;
168
 
          }
169
 
        }
170
 
      }
171
 
    }
172
 
  }
173
 
  else
174
 
  {
175
 
    mi_print_error(info->s, HA_ERR_CRASHED);
176
 
    mi_mark_crashed(info);
177
 
  }
178
 
  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
179
 
  errno=save_errno;
180
 
err2:
181
 
  save_errno=errno;
182
 
  _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
183
 
  return(errno=save_errno);
184
 
} /* mi_write */
185
 
 
186
 
 
187
 
        /* Write one key to btree */
188
 
 
189
 
int _mi_ck_write(MI_INFO *info, uint32_t keynr, unsigned char *key, uint32_t key_length)
190
 
{
191
 
  if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
192
 
  {
193
 
    return(_mi_ck_write_tree(info, keynr, key, key_length));
194
 
  }
195
 
  else
196
 
  {
197
 
    return(_mi_ck_write_btree(info, keynr, key, key_length));
198
 
  }
199
 
} /* _mi_ck_write */
200
 
 
201
 
 
202
 
/**********************************************************************
203
 
 *                Normal insert code                                  *
204
 
 **********************************************************************/
205
 
 
206
 
int _mi_ck_write_btree(register MI_INFO *info, uint32_t keynr, unsigned char *key,
207
 
                       uint32_t key_length)
208
 
{
209
 
  uint32_t error;
210
 
  uint32_t comp_flag;
211
 
  MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
212
 
  internal::my_off_t  *root=&info->s->state.key_root[keynr];
213
 
 
214
 
  if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
215
 
    comp_flag=SEARCH_BIGGER;                    /* Put after same key */
216
 
  else if (keyinfo->flag & (HA_NOSAME))
217
 
  {
218
 
    comp_flag=SEARCH_FIND | SEARCH_UPDATE;      /* No duplicates */
219
 
    if (keyinfo->flag & HA_NULL_ARE_EQUAL)
220
 
      comp_flag|= SEARCH_NULL_ARE_EQUAL;
221
 
  }
222
 
  else
223
 
    comp_flag=SEARCH_SAME;                      /* Keys in rec-pos order */
224
 
 
225
 
  error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
226
 
                                root, comp_flag);
227
 
  return(error);
228
 
} /* _mi_ck_write_btree */
229
 
 
230
 
int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
231
 
    unsigned char *key, uint32_t key_length, internal::my_off_t *root, uint32_t comp_flag)
232
 
{
233
 
  int error;
234
 
  /* key_length parameter is used only if comp_flag is SEARCH_FIND */
235
 
  if (*root == HA_OFFSET_ERROR ||
236
 
      (error=w_search(info, keyinfo, comp_flag, key, key_length,
237
 
                      *root, (unsigned char *) 0, (unsigned char*) 0,
238
 
                      (internal::my_off_t) 0, 1)) > 0)
239
 
    error=_mi_enlarge_root(info,keyinfo,key,root);
240
 
  return(error);
241
 
} /* _mi_ck_real_write_btree */
242
 
 
243
 
 
244
 
        /* Make a new root with key as only pointer */
245
 
 
246
 
int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, unsigned char *key,
247
 
                     internal::my_off_t *root)
248
 
{
249
 
  uint32_t t_length,nod_flag;
250
 
  MI_KEY_PARAM s_temp;
251
 
  MYISAM_SHARE *share=info->s;
252
 
 
253
 
  nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
254
 
  _mi_kpointer(info,info->buff+2,*root); /* if nod */
255
 
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(unsigned char*) 0,
256
 
                                (unsigned char*) 0, (unsigned char*) 0, key,&s_temp);
257
 
  mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
258
 
  (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
259
 
  info->buff_used=info->page_changed=1;         /* info->buff is used */
260
 
  if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
261
 
      _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
262
 
    return(-1);
263
 
  return(0);
264
 
} /* _mi_enlarge_root */
265
 
 
266
 
 
267
 
        /*
268
 
          Search after a position for a key and store it there
269
 
          Returns -1 = error
270
 
                   0  = ok
271
 
                   1  = key should be stored in higher tree
272
 
        */
273
 
 
274
 
static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
275
 
                    uint32_t comp_flag, unsigned char *key, uint32_t key_length, internal::my_off_t page,
276
 
                    unsigned char *father_buff, unsigned char *father_keypos,
277
 
                    internal::my_off_t father_page, bool insert_last)
278
 
{
279
 
  int error,flag;
280
 
  uint32_t nod_flag, search_key_length;
281
 
  unsigned char *temp_buff,*keypos;
282
 
  unsigned char keybuff[MI_MAX_KEY_BUFF];
283
 
  bool was_last_key;
284
 
  internal::my_off_t next_page, dupp_key_pos;
285
 
 
286
 
  search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
287
 
  if (!(temp_buff= (unsigned char*) malloc(keyinfo->block_length+
288
 
                                           MI_MAX_KEY_BUFF*2)))
289
 
    return(-1);
290
 
  if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
291
 
    goto err;
292
 
 
293
 
  flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
294
 
                              comp_flag, &keypos, keybuff, &was_last_key);
295
 
  nod_flag=mi_test_if_nod(temp_buff);
296
 
  if (flag == 0)
297
 
  {
298
 
    uint32_t tmp_key_length;
299
 
        /* get position to record with duplicated key */
300
 
    tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
301
 
    if (tmp_key_length)
302
 
      dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
303
 
    else
304
 
      dupp_key_pos= HA_OFFSET_ERROR;
305
 
 
306
 
    {
307
 
      info->dupp_key_pos= dupp_key_pos;
308
 
      free(temp_buff);
309
 
      errno=HA_ERR_FOUND_DUPP_KEY;
310
 
      return(-1);
311
 
    }
312
 
  }
313
 
  if (flag == MI_FOUND_WRONG_KEY)
314
 
    return(-1);
315
 
  if (!was_last_key)
316
 
    insert_last=0;
317
 
  next_page=_mi_kpos(nod_flag,keypos);
318
 
  if (next_page == HA_OFFSET_ERROR ||
319
 
      (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
320
 
                      temp_buff, keypos, page, insert_last)) >0)
321
 
  {
322
 
    error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
323
 
                     father_keypos,father_page, insert_last);
324
 
    if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
325
 
      goto err;
326
 
  }
327
 
  free(temp_buff);
328
 
  return(error);
329
 
err:
330
 
  free(temp_buff);
331
 
  return (-1);
332
 
} /* w_search */
333
 
 
334
 
 
335
 
/*
336
 
  Insert new key.
337
 
 
338
 
  SYNOPSIS
339
 
    _mi_insert()
340
 
    info                        Open table information.
341
 
    keyinfo                     Key definition information.
342
 
    key                         New key.
343
 
    anc_buff                    Key page (beginning).
344
 
    key_pos                     Position in key page where to insert.
345
 
    key_buff                    Copy of previous key.
346
 
    father_buff                 parent key page for balancing.
347
 
    father_key_pos              position in parent key page for balancing.
348
 
    father_page                 position of parent key page in file.
349
 
    insert_last                 If to append at end of page.
350
 
 
351
 
  DESCRIPTION
352
 
    Insert new key at right of key_pos.
353
 
 
354
 
  RETURN
355
 
    2           if key contains key to upper level.
356
 
    0           OK.
357
 
    < 0         Error.
358
 
*/
359
 
 
360
 
int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
361
 
               unsigned char *key, unsigned char *anc_buff, unsigned char *key_pos, unsigned char *key_buff,
362
 
               unsigned char *father_buff, unsigned char *father_key_pos, internal::my_off_t father_page,
363
 
               bool insert_last)
364
 
{
365
 
  uint32_t a_length,nod_flag;
366
 
  int t_length;
367
 
  unsigned char *endpos, *prev_key;
368
 
  MI_KEY_PARAM s_temp;
369
 
 
370
 
  nod_flag=mi_test_if_nod(anc_buff);
371
 
  a_length=mi_getint(anc_buff);
372
 
  endpos= anc_buff+ a_length;
373
 
  prev_key=(key_pos == anc_buff+2+nod_flag ? (unsigned char*) 0 : key_buff);
374
 
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
375
 
                                (key_pos == endpos ? (unsigned char*) 0 : key_pos),
376
 
                                prev_key, prev_key,
377
 
                                key,&s_temp);
378
 
 
379
 
  if (t_length > 0)
380
 
  {
381
 
    if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
382
 
    {
383
 
      mi_print_error(info->s, HA_ERR_CRASHED);
384
 
      errno=HA_ERR_CRASHED;
385
 
      return(-1);
386
 
    }
387
 
    internal::bmove_upp((unsigned char*) endpos+t_length,(unsigned char*) endpos,(uint) (endpos-key_pos));
388
 
  }
389
 
  else
390
 
  {
391
 
    if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
392
 
    {
393
 
      mi_print_error(info->s, HA_ERR_CRASHED);
394
 
      errno=HA_ERR_CRASHED;
395
 
      return(-1);
396
 
    }
397
 
    memmove(key_pos, key_pos - t_length, endpos - key_pos + t_length);
398
 
  }
399
 
  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
400
 
  a_length+=t_length;
401
 
  mi_putint(anc_buff,a_length,nod_flag);
402
 
  if (a_length <= keyinfo->block_length)
403
 
  {
404
 
    return(0);                          /* There is room on page */
405
 
  }
406
 
  /* Page is full */
407
 
  if (nod_flag)
408
 
    insert_last=0;
409
 
  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
410
 
      father_buff && !insert_last)
411
 
    return(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
412
 
                                 father_key_pos,father_page));
413
 
  return(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
414
 
} /* _mi_insert */
415
 
 
416
 
 
417
 
        /* split a full page in two and assign emerging item to key */
418
 
 
419
 
int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
420
 
                   unsigned char *key, unsigned char *buff, unsigned char *key_buff,
421
 
                   bool insert_last_key)
422
 
{
423
 
  uint32_t length,a_length,key_ref_length,t_length,nod_flag,key_length;
424
 
  unsigned char *key_pos,*pos, *after_key= NULL;
425
 
  internal::my_off_t new_pos;
426
 
  MI_KEY_PARAM s_temp;
427
 
 
428
 
  if (info->s->keyinfo+info->lastinx == keyinfo)
429
 
    info->page_changed=1;                       /* Info->buff is used */
430
 
  info->buff_used=1;
431
 
  nod_flag=mi_test_if_nod(buff);
432
 
  key_ref_length=2+nod_flag;
433
 
  if (insert_last_key)
434
 
    key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
435
 
  else
436
 
    key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
437
 
                              &after_key);
438
 
  if (!key_pos)
439
 
    return(-1);
440
 
 
441
 
  length=(uint) (key_pos-buff);
442
 
  a_length=mi_getint(buff);
443
 
  mi_putint(buff,length,nod_flag);
444
 
 
445
 
  key_pos=after_key;
446
 
  if (nod_flag)
447
 
  {
448
 
    pos=key_pos-nod_flag;
449
 
    memcpy(info->buff + 2, pos, nod_flag);
450
 
  }
451
 
 
452
 
        /* Move middle item to key and pointer to new page */
453
 
  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
454
 
    return(-1);
455
 
  _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
456
 
 
457
 
        /* Store new page */
458
 
  if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
459
 
    return(-1);
460
 
 
461
 
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(unsigned char *) 0,
462
 
                                (unsigned char*) 0, (unsigned char*) 0,
463
 
                                key_buff, &s_temp);
464
 
  length=(uint) ((buff+a_length)-key_pos);
465
 
  memcpy(info->buff+key_ref_length+t_length, key_pos, length);
466
 
  (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
467
 
  mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
468
 
 
469
 
  if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
470
 
    return(-1);
471
 
  return(2);                            /* Middle key up */
472
 
} /* _mi_split_page */
473
 
 
474
 
 
475
 
        /*
476
 
          Calculate how to much to move to split a page in two
477
 
          Returns pointer to start of key.
478
 
          key will contain the key.
479
 
          return_key_length will contain the length of key
480
 
          after_key will contain the position to where the next key starts
481
 
        */
482
 
 
483
 
unsigned char *_mi_find_half_pos(uint32_t nod_flag, MI_KEYDEF *keyinfo, unsigned char *page,
484
 
                         unsigned char *key, uint32_t *return_key_length,
485
 
                         unsigned char **after_key)
486
 
{
487
 
  uint32_t keys,length,key_ref_length;
488
 
  unsigned char *end,*lastpos;
489
 
 
490
 
  key_ref_length=2+nod_flag;
491
 
  length=mi_getint(page)-key_ref_length;
492
 
  page+=key_ref_length;
493
 
  if (!(keyinfo->flag &
494
 
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
495
 
         HA_BINARY_PACK_KEY)))
496
 
  {
497
 
    key_ref_length=keyinfo->keylength+nod_flag;
498
 
    keys=length/(key_ref_length*2);
499
 
    *return_key_length=keyinfo->keylength;
500
 
    end=page+keys*key_ref_length;
501
 
    *after_key=end+key_ref_length;
502
 
    memcpy(key,end,key_ref_length);
503
 
    return(end);
504
 
  }
505
 
 
506
 
  end=page+length/2-key_ref_length;             /* This is aprox. half */
507
 
  *key='\0';
508
 
  do
509
 
  {
510
 
    lastpos=page;
511
 
    if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
512
 
      return(0);
513
 
  } while (page < end);
514
 
  *return_key_length=length;
515
 
  *after_key=page;
516
 
  return(lastpos);
517
 
} /* _mi_find_half_pos */
518
 
 
519
 
 
520
 
        /*
521
 
          Split buffer at last key
522
 
          Returns pointer to the start of the key before the last key
523
 
          key will contain the last key
524
 
        */
525
 
 
526
 
static unsigned char *_mi_find_last_pos(MI_KEYDEF *keyinfo, unsigned char *page,
527
 
                                unsigned char *key, uint32_t *return_key_length,
528
 
                                unsigned char **after_key)
529
 
{
530
 
  uint32_t keys;
531
 
  uint32_t length;
532
 
  uint32_t last_length= 0;
533
 
  uint32_t key_ref_length;
534
 
  unsigned char *end, *lastpos, *prevpos= NULL;
535
 
  unsigned char key_buff[MI_MAX_KEY_BUFF];
536
 
 
537
 
  key_ref_length=2;
538
 
  length=mi_getint(page)-key_ref_length;
539
 
  page+=key_ref_length;
540
 
  if (!(keyinfo->flag &
541
 
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
542
 
         HA_BINARY_PACK_KEY)))
543
 
  {
544
 
    keys=length/keyinfo->keylength-2;
545
 
    *return_key_length=length=keyinfo->keylength;
546
 
    end=page+keys*length;
547
 
    *after_key=end+length;
548
 
    memcpy(key,end,length);
549
 
    return(end);
550
 
  }
551
 
 
552
 
  end= page + length - key_ref_length;
553
 
  *key='\0';
554
 
  length=0;
555
 
  lastpos=page;
556
 
  while (page < end)
557
 
  {
558
 
    prevpos=lastpos; lastpos=page;
559
 
    last_length=length;
560
 
    memcpy(key, key_buff, length);              /* previous key */
561
 
    if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
562
 
    {
563
 
      mi_print_error(keyinfo->share, HA_ERR_CRASHED);
564
 
      errno=HA_ERR_CRASHED;
565
 
      return(0);
566
 
    }
567
 
  }
568
 
  *return_key_length=last_length;
569
 
  *after_key=lastpos;
570
 
  return(prevpos);
571
 
} /* _mi_find_last_pos */
572
 
 
573
 
 
574
 
        /* Balance page with not packed keys with page on right/left */
575
 
        /* returns 0 if balance was done */
576
 
 
577
 
static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
578
 
                            unsigned char *key, unsigned char *curr_buff, unsigned char *father_buff,
579
 
                            unsigned char *father_key_pos, internal::my_off_t father_page)
580
 
{
581
 
  bool right;
582
 
  uint32_t k_length,father_length,father_keylength,nod_flag,curr_keylength,
583
 
       right_length,left_length,new_right_length,new_left_length,extra_length,
584
 
       length,keys;
585
 
  unsigned char *pos,*buff,*extra_buff;
586
 
  internal::my_off_t next_page,new_pos;
587
 
  unsigned char tmp_part_key[MI_MAX_KEY_BUFF];
588
 
 
589
 
  k_length=keyinfo->keylength;
590
 
  father_length=mi_getint(father_buff);
591
 
  father_keylength=k_length+info->s->base.key_reflength;
592
 
  nod_flag=mi_test_if_nod(curr_buff);
593
 
  curr_keylength=k_length+nod_flag;
594
 
  info->page_changed=1;
595
 
 
596
 
  if ((father_key_pos != father_buff+father_length &&
597
 
       (info->state->records & 1)) ||
598
 
      father_key_pos == father_buff+2+info->s->base.key_reflength)
599
 
  {
600
 
    right=1;
601
 
    next_page= _mi_kpos(info->s->base.key_reflength,
602
 
                        father_key_pos+father_keylength);
603
 
    buff=info->buff;
604
 
  }
605
 
  else
606
 
  {
607
 
    right=0;
608
 
    father_key_pos-=father_keylength;
609
 
    next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
610
 
                                        /* Fix that curr_buff is to left */
611
 
    buff=curr_buff; curr_buff=info->buff;
612
 
  }                                     /* father_key_pos ptr to parting key */
613
 
 
614
 
  if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
615
 
    goto err;
616
 
 
617
 
        /* Test if there is room to share keys */
618
 
 
619
 
  left_length=mi_getint(curr_buff);
620
 
  right_length=mi_getint(buff);
621
 
  keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
622
 
 
623
 
  if ((right ? right_length : left_length) + curr_keylength <=
624
 
      keyinfo->block_length)
625
 
  {                                             /* Merge buffs */
626
 
    new_left_length=2+nod_flag+(keys/2)*curr_keylength;
627
 
    new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
628
 
    mi_putint(curr_buff,new_left_length,nod_flag);
629
 
    mi_putint(buff,new_right_length,nod_flag);
630
 
 
631
 
    if (left_length < new_left_length)
632
 
    {                                           /* Move keys buff -> leaf */
633
 
      pos=curr_buff+left_length;
634
 
      memcpy(pos, father_key_pos, k_length);
635
 
      length= new_left_length - left_length - k_length;
636
 
      memcpy(pos+k_length, buff+2, length);
637
 
      pos=buff+2+length;
638
 
      memcpy(father_key_pos, pos, k_length);
639
 
      memmove(buff+2, pos+k_length, new_right_length);
640
 
    }
641
 
    else
642
 
    {                                           /* Move keys -> buff */
643
 
 
644
 
      internal::bmove_upp((unsigned char*) buff+new_right_length,(unsigned char*) buff+right_length,
645
 
                right_length-2);
646
 
      length=new_right_length-right_length-k_length;
647
 
      memcpy(buff+2+length,father_key_pos, k_length);
648
 
      pos=curr_buff+new_left_length;
649
 
      memcpy(father_key_pos, pos, k_length);
650
 
      memcpy(buff+2, pos+k_length, length);
651
 
    }
652
 
 
653
 
    if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
654
 
        _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
655
 
      goto err;
656
 
    return(0);
657
 
  }
658
 
 
659
 
        /* curr_buff[] and buff[] are full, lets split and make new nod */
660
 
 
661
 
  extra_buff=info->buff+info->s->base.max_key_block_length;
662
 
  new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
663
 
  if (keys == 5)                                /* Too few keys to balance */
664
 
    new_left_length-=curr_keylength;
665
 
  extra_length=nod_flag+left_length+right_length-
666
 
    new_left_length-new_right_length-curr_keylength;
667
 
  mi_putint(curr_buff,new_left_length,nod_flag);
668
 
  mi_putint(buff,new_right_length,nod_flag);
669
 
  mi_putint(extra_buff,extra_length+2,nod_flag);
670
 
 
671
 
  /* move first largest keys to new page  */
672
 
  pos=buff+right_length-extra_length;
673
 
  memcpy(extra_buff+2, pos, extra_length);
674
 
  /* Save new parting key */
675
 
  memcpy(tmp_part_key, pos-k_length,k_length);
676
 
  /* Make place for new keys */
677
 
  internal::bmove_upp((unsigned char*) buff+new_right_length,(unsigned char*) pos-k_length,
678
 
            right_length-extra_length-k_length-2);
679
 
  /* Copy keys from left page */
680
 
  pos= curr_buff+new_left_length;
681
 
  length= left_length - new_left_length - k_length;
682
 
  memcpy(buff+2, pos+k_length, length);
683
 
  /* Copy old parting key */
684
 
  memcpy(buff+2+length, father_key_pos, k_length);
685
 
 
686
 
  /* Move new parting keys up to caller */
687
 
  memcpy((right ? key : father_key_pos), pos, k_length);
688
 
  memcpy((right ? father_key_pos : key), tmp_part_key, k_length);
689
 
 
690
 
  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
691
 
    goto err;
692
 
  _mi_kpointer(info,key+k_length,new_pos);
693
 
  if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
694
 
                        DFLT_INIT_HITS,info->buff) ||
695
 
      _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
696
 
                        DFLT_INIT_HITS,extra_buff))
697
 
    goto err;
698
 
 
699
 
  return(1);                            /* Middle key up */
700
 
 
701
 
err:
702
 
  return(-1);
703
 
} /* _mi_balance_page */
704
 
 
705
 
/**********************************************************************
706
 
 *                Bulk insert code                                    *
707
 
 **********************************************************************/
708
 
 
709
 
typedef struct {
710
 
  MI_INFO *info;
711
 
  uint32_t keynr;
712
 
} bulk_insert_param;
713
 
 
714
 
int _mi_ck_write_tree(register MI_INFO *info, uint32_t keynr, unsigned char *key,
715
 
                      uint32_t key_length)
716
 
{
717
 
  int error;
718
 
 
719
 
  error= tree_insert(&info->bulk_insert[keynr], key,
720
 
         key_length + info->s->rec_reflength,
721
 
         info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
722
 
 
723
 
  return(error);
724
 
} /* _mi_ck_write_tree */
725
 
 
726
 
 
727
 
/* typeof(_mi_keys_compare)=qsort_cmp2 */
728
 
 
729
 
static int keys_compare(bulk_insert_param *param, unsigned char *key1, unsigned char *key2)
730
 
{
731
 
  uint32_t not_used[2];
732
 
  return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
733
 
                    key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
734
 
                    not_used);
735
 
}
736
 
 
737
 
 
738
 
static int keys_free(unsigned char *key, TREE_FREE mode, bulk_insert_param *param)
739
 
{
740
 
  /*
741
 
    Probably I can use info->lastkey here, but I'm not sure,
742
 
    and to be safe I'd better use local lastkey.
743
 
  */
744
 
  unsigned char lastkey[MI_MAX_KEY_BUFF];
745
 
  uint32_t keylen;
746
 
  MI_KEYDEF *keyinfo;
747
 
 
748
 
  switch (mode) {
749
 
  case free_init:
750
 
    if (param->info->s->concurrent_insert)
751
 
    {
752
 
      param->info->s->keyinfo[param->keynr].version++;
753
 
    }
754
 
    return 0;
755
 
  case free_free:
756
 
    keyinfo=param->info->s->keyinfo+param->keynr;
757
 
    keylen=_mi_keylength(keyinfo, key);
758
 
    memcpy(lastkey, key, keylen);
759
 
    return _mi_ck_write_btree(param->info,param->keynr,lastkey,
760
 
                              keylen - param->info->s->rec_reflength);
761
 
  case free_end:
762
 
    return 0;
763
 
  }
764
 
  return -1;
765
 
}
766
 
 
767
 
 
768
 
int mi_init_bulk_insert(MI_INFO *info, uint32_t cache_size, ha_rows rows)
769
 
{
770
 
  MYISAM_SHARE *share=info->s;
771
 
  MI_KEYDEF *key=share->keyinfo;
772
 
  bulk_insert_param *params;
773
 
  uint32_t i, num_keys, total_keylength;
774
 
  uint64_t key_map;
775
 
 
776
 
  assert(!info->bulk_insert &&
777
 
              (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
778
 
 
779
 
  mi_clear_all_keys_active(key_map);
780
 
  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
781
 
  {
782
 
    if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
783
 
        mi_is_key_active(share->state.key_map, i))
784
 
    {
785
 
      num_keys++;
786
 
      mi_set_key_active(key_map, i);
787
 
      total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
788
 
    }
789
 
  }
790
 
 
791
 
  if (num_keys==0 ||
792
 
      num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
793
 
    return(0);
794
 
 
795
 
  if (rows && rows*total_keylength < cache_size)
796
 
    cache_size= (uint32_t)rows;
797
 
  else
798
 
    cache_size/=total_keylength*16;
799
 
 
800
 
  info->bulk_insert=(TREE *)
801
 
    malloc((sizeof(TREE)*share->base.keys+
802
 
           sizeof(bulk_insert_param)*num_keys));
803
 
 
804
 
  if (!info->bulk_insert)
805
 
    return(HA_ERR_OUT_OF_MEM);
806
 
 
807
 
  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
808
 
  for (i=0 ; i < share->base.keys ; i++)
809
 
  {
810
 
    if (mi_is_key_active(key_map, i))
811
 
    {
812
 
      params->info=info;
813
 
      params->keynr=i;
814
 
      /* Only allocate a 16'th of the buffer at a time */
815
 
      init_tree(&info->bulk_insert[i],
816
 
                cache_size * key[i].maxlength,
817
 
                cache_size * key[i].maxlength, 0,
818
 
                (qsort_cmp2)keys_compare, false,
819
 
                (tree_element_free) keys_free, (void *)params++);
820
 
    }
821
 
    else
822
 
     info->bulk_insert[i].root=0;
823
 
  }
824
 
 
825
 
  return(0);
826
 
}
827
 
 
828
 
void mi_flush_bulk_insert(MI_INFO *info, uint32_t inx)
829
 
{
830
 
  if (info->bulk_insert)
831
 
  {
832
 
    if (is_tree_inited(&info->bulk_insert[inx]))
833
 
      reset_tree(&info->bulk_insert[inx]);
834
 
  }
835
 
}
836
 
 
837
 
void mi_end_bulk_insert(MI_INFO *info)
838
 
{
839
 
  if (info->bulk_insert)
840
 
  {
841
 
    uint32_t i;
842
 
    for (i=0 ; i < info->s->base.keys ; i++)
843
 
    {
844
 
      if (is_tree_inited(& info->bulk_insert[i]))
845
 
      {
846
 
        delete_tree(& info->bulk_insert[i]);
847
 
      }
848
 
    }
849
 
    free((void *)info->bulk_insert);
850
 
    info->bulk_insert=0;
851
 
  }
852
 
}