~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: Monty Taylor
  • Date: 2010-07-31 21:37:09 UTC
  • mto: (1680.1.1 build)
  • mto: This revision was merged to the branch mainline in revision 1681.
  • Revision ID: mordred@inaugust.com-20100731213709-p2tlj3txhwdi9339
Removed our unordered wrappers. We use Boost now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
 
13
13
  You should have received a copy of the GNU General Public License
14
14
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
15
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
16
 
17
17
 
18
18
#include "config.h"
19
19
 
20
20
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
21
 
24
22
using namespace std;
25
23
using namespace drizzled;
93
91
    -Brian
94
92
*/
95
93
 
 
94
/* Variables for archive share methods */
 
95
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
 
96
 
96
97
/* When the engine starts up set the first version */
97
98
static uint64_t global_version= 1;
98
99
 
131
132
}
132
133
 
133
134
 
 
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory, 
 
136
                                    const SchemaIdentifier&,
 
137
                                    set<string>& set_of_names)
 
138
{
 
139
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
140
 
 
141
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
142
       entry_iter != entries.end(); ++entry_iter)
 
143
  {
 
144
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
145
    const string *filename= &entry->filename;
 
146
 
 
147
    assert(filename->size());
 
148
 
 
149
    const char *ext= strchr(filename->c_str(), '.');
 
150
 
 
151
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
152
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
153
    {  }
 
154
    else
 
155
    {
 
156
      char uname[NAME_LEN + 1];
 
157
      uint32_t file_name_len;
 
158
 
 
159
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
160
      // TODO: Remove need for memory copy here
 
161
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
162
      set_of_names.insert(uname);
 
163
    }
 
164
  }
 
165
}
 
166
 
 
167
 
134
168
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
169
{
136
170
  string new_path(identifier.getPath());
166
200
    error= EEXIST;
167
201
 
168
202
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
203
    azio_stream proto_stream;
170
204
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
205
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
206
      return HA_ERR_CRASHED_ON_USAGE;
173
207
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
208
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
209
    if (proto_string == NULL)
176
210
    {
177
 
      azclose(proto_stream.get());
 
211
      azclose(&proto_stream);
178
212
      return ENOMEM;
179
213
    }
180
214
 
181
 
    azread_frm(proto_stream.get(), proto_string);
 
215
    azread_frm(&proto_stream, proto_string);
182
216
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
217
    if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
218
      error= HA_ERR_CRASHED_ON_USAGE;
185
219
 
186
 
    azclose(proto_stream.get());
 
220
    azclose(&proto_stream);
187
221
    free(proto_string);
188
222
  }
189
223
 
198
232
 
199
233
 
200
234
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
 
235
                       TableShare &table_arg)
202
236
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
203
237
{
204
238
  /* Set our original buffer from pre-allocated memory */
236
270
{
237
271
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
272
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
 
273
  internal::fn_format(data_file_name, table_name.c_str(), "",
 
274
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
241
275
  /*
242
276
    We will use this lock for rows.
243
277
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
 
278
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
245
279
}
246
280
 
247
281
ArchiveShare::~ArchiveShare()
248
282
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
 
283
  thr_lock_delete(&lock);
 
284
  pthread_mutex_destroy(&mutex);
251
285
  /*
252
286
    We need to make sure we don't reset the crashed state.
253
287
    If we open a crashed file, wee need to close it as crashed unless
257
291
  */
258
292
  if (archive_write_open == true)
259
293
    (void)azclose(&archive_write);
 
294
  pthread_mutex_destroy(&archive_mutex);
260
295
}
261
296
 
262
297
bool ArchiveShare::prime(uint64_t *auto_increment)
263
298
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
299
  azio_stream archive_tmp;
265
300
 
266
301
  /*
267
302
    We read the meta file, but do not mark it dirty. Since we are not
269
304
    anything but reading... open it for write and we will generate null
270
305
    compression writes).
271
306
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
307
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
308
               AZ_METHOD_BLOCK)))
274
309
    return false;
275
310
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
 
311
  *auto_increment= archive_tmp.auto_increment + 1;
 
312
  rows_recorded= (ha_rows)archive_tmp.rows;
 
313
  crashed= archive_tmp.dirty;
279
314
  if (version < global_version)
280
315
  {
281
316
    version_rows= rows_recorded;
282
317
    version= global_version;
283
318
  }
284
 
  azclose(archive_tmp.get());
 
319
  azclose(&archive_tmp);
285
320
 
286
321
  return true;
287
322
}
296
331
*/
297
332
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
298
333
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
 
334
  pthread_mutex_lock(&archive_mutex);
 
335
 
 
336
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
337
  share= a_engine->findOpenTable(table_name);
304
338
 
305
339
  if (!share)
308
342
 
309
343
    if (share == NULL)
310
344
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
345
      pthread_mutex_unlock(&archive_mutex);
312
346
      *rc= HA_ERR_OUT_OF_MEM;
313
347
      return(NULL);
314
348
    }
315
349
 
316
350
    if (share->prime(&stats.auto_increment_value) == false)
317
351
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
352
      pthread_mutex_unlock(&archive_mutex);
319
353
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
354
      delete share;
321
355
 
323
357
    }
324
358
 
325
359
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
360
    thr_lock_init(&share->lock);
327
361
  }
328
362
  share->use_count++;
329
363
 
330
364
  if (share->crashed)
331
365
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
366
  pthread_mutex_unlock(&archive_mutex);
333
367
 
334
368
  return(share);
335
369
}
341
375
*/
342
376
int ha_archive::free_share()
343
377
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
345
 
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
378
  pthread_mutex_lock(&archive_mutex);
347
379
  if (!--share->use_count)
348
380
  {
 
381
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
382
    a_engine->deleteOpenTable(share->table_name);
350
383
    delete share;
351
384
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
385
  pthread_mutex_unlock(&archive_mutex);
353
386
 
354
387
  return 0;
355
388
}
361
394
    a gzip file that can be both read and written we keep a writer open
362
395
    that is shared amoung all open tables.
363
396
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
397
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
398
               O_RDWR, AZ_METHOD_BLOCK)))
366
399
  {
367
400
    share->crashed= true;
387
420
  {
388
421
    az_method method;
389
422
 
390
 
    if (archive_aio_state())
 
423
    switch (archive_aio_state())
391
424
    {
 
425
    case false:
 
426
      method= AZ_METHOD_BLOCK;
 
427
      break;
 
428
    case true:
392
429
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
430
      break;
 
431
    default:
396
432
      method= AZ_METHOD_BLOCK;
397
433
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
434
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
435
                 method)))
400
436
    {
401
437
      share->crashed= true;
437
473
 
438
474
  assert(share);
439
475
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
 
476
  record_buffer= create_record_buffer(table->getShare()->getRecordLength() +
 
477
                                      ARCHIVE_ROW_HEADER_SIZE);
 
478
 
 
479
  if (!record_buffer)
 
480
  {
 
481
    free_share();
 
482
    return(HA_ERR_OUT_OF_MEM);
 
483
  }
 
484
 
 
485
  thr_lock_data_init(&share->lock, &lock, NULL);
443
486
 
444
487
  return(rc);
445
488
}
473
516
{
474
517
  int rc= 0;
475
518
 
476
 
  record_buffer.clear();
 
519
  destroy_record_buffer(record_buffer);
477
520
 
478
521
  /* First close stream */
479
522
  if (archive_reader_open == true)
502
545
                                 const drizzled::TableIdentifier &identifier,
503
546
                                 drizzled::message::Table& proto)
504
547
{
 
548
  char name_buff[FN_REFLEN];
505
549
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
550
  azio_stream create_stream;            /* Archive file we are working with */
507
551
  uint64_t auto_increment_value;
508
552
  string serialized_proto;
509
553
 
521
565
 
522
566
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
567
      {
524
 
        return -1;
 
568
        error= -1;
 
569
        goto error;
525
570
      }
526
571
    }
527
572
  }
528
573
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
 
574
  /*
 
575
    We reuse name_buff since it is available.
 
576
  */
 
577
  internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
 
578
                      MY_REPLACE_EXT | MY_UNPACK_FILENAME);
531
579
 
532
580
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
581
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
582
             AZ_METHOD_BLOCK) == 0)
535
583
  {
536
584
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
 
585
    goto error2;
540
586
  }
541
587
 
542
588
  try {
544
590
  }
545
591
  catch (...)
546
592
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
 
593
    goto error2;
550
594
  }
551
595
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
596
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
597
                  serialized_proto.length()))
554
598
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
 
599
    goto error2;
558
600
  }
559
601
 
560
602
  if (proto.options().has_comment())
561
603
  {
562
604
    int write_length;
563
605
 
564
 
    write_length= azwrite_comment(create_stream.get(),
 
606
    write_length= azwrite_comment(&create_stream,
565
607
                                  proto.options().comment().c_str(),
566
608
                                  proto.options().comment().length());
567
609
 
568
610
    if (write_length < 0)
569
611
    {
570
612
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
 
613
      goto error2;
574
614
    }
575
615
  }
576
616
 
578
618
    Yes you need to do this, because the starting value
579
619
    for the autoincrement may not be zero.
580
620
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
 
621
  create_stream.auto_increment= auto_increment_value ?
582
622
    auto_increment_value - 1 : 0;
583
623
 
584
 
  if (azclose(create_stream.get()))
 
624
  if (azclose(&create_stream))
585
625
  {
586
626
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
627
    goto error2;
590
628
  }
591
629
 
592
630
  return(0);
 
631
 
 
632
error2:
 
633
  unlink(name_buff);
 
634
 
 
635
error:
 
636
  /* Return error number, if we got one */
 
637
  return(error ? error : -1);
593
638
}
594
639
 
595
640
/*
603
648
  /* We pack the row for writing */
604
649
  r_pack_length= pack_row(buf);
605
650
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
651
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
652
  if (written != r_pack_length)
608
653
  {
609
654
    return(-1);
623
668
 
624
669
uint32_t ha_archive::max_row_length(const unsigned char *)
625
670
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
671
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
672
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
673
 
629
674
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
675
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
676
       ptr != end ;
632
677
       ptr++)
633
678
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
679
      length += 2 + ((Field_blob*)table->getField(*ptr))->get_length();
635
680
  }
636
681
 
637
682
  return length;
646
691
    return(HA_ERR_OUT_OF_MEM);
647
692
 
648
693
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
694
  memcpy(record_buffer->buffer, record, table->getShare()->null_bytes);
 
695
  ptr= record_buffer->buffer + table->getShare()->null_bytes;
651
696
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
697
  for (Field **field=table->getFields() ; *field ; field++)
653
698
  {
654
699
    if (!((*field)->is_null()))
655
700
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
701
  }
657
702
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
703
  return((unsigned int) (ptr - record_buffer->buffer));
659
704
}
660
705
 
661
706
 
673
718
  int rc;
674
719
  unsigned char *read_buf= NULL;
675
720
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
721
  unsigned char *record=  table->record[0];
677
722
 
678
723
  if (share->crashed)
679
724
    return(HA_ERR_CRASHED_ON_USAGE);
680
725
 
681
 
  pthread_mutex_lock(&share->mutex());
 
726
  pthread_mutex_lock(&share->mutex);
682
727
 
683
728
  if (share->archive_write_open == false)
684
729
    if (init_archive_writer())
685
730
      return(HA_ERR_CRASHED_ON_USAGE);
686
731
 
687
732
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
733
  if (table->next_number_field && record == table->record[0])
689
734
  {
690
735
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
736
    temp_auto= table->next_number_field->val_int();
692
737
 
693
738
    /*
694
739
      We don't support decremening auto_increment. They make the performance
695
740
      just cry.
696
741
    */
697
742
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
743
        table->getShare()->getKeyInfo(0).flags & HA_NOSAME)
699
744
    {
700
745
      rc= HA_ERR_FOUND_DUPP_KEY;
701
746
      goto error;
715
760
  share->rows_recorded++;
716
761
  rc= real_write_row(buf,  &(share->archive_write));
717
762
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
763
  pthread_mutex_unlock(&share->mutex);
719
764
  if (read_buf)
720
765
    free((unsigned char*) read_buf);
721
766
 
747
792
{
748
793
  int rc;
749
794
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
795
  current_k_offset= table->getShare()->getKeyInfo(0).key_part->offset;
751
796
  current_key= key;
752
797
  current_key_len= key_len;
753
798
 
832
877
/* Reallocate buffer if needed */
833
878
bool ha_archive::fix_rec_buff(unsigned int length)
834
879
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
880
  assert(record_buffer->buffer);
 
881
 
 
882
  if (length > record_buffer->length)
 
883
  {
 
884
    unsigned char *newptr;
 
885
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
886
      return(1);
 
887
    record_buffer->buffer= newptr;
 
888
    record_buffer->length= length;
 
889
  }
 
890
 
 
891
  assert(length <= record_buffer->length);
 
892
 
 
893
  return(0);
838
894
}
839
895
 
840
896
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
908
  }
853
909
 
854
910
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
911
  memcpy(record, ptr, table->getNullBytes());
 
912
  ptr+= table->getNullBytes();
 
913
  for (Field **field= table->getFields() ; *field ; field++)
858
914
  {
859
915
    if (!((*field)->is_null()))
860
916
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
917
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
918
    }
863
919
  }
864
920
  return(0);
893
949
  current_position= aztell(&archive);
894
950
  rc= get_row(&archive, buf);
895
951
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
952
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
953
 
898
954
  return(rc);
899
955
}
951
1007
int ha_archive::optimize()
952
1008
{
953
1009
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1010
  azio_stream writer;
 
1011
  char writer_filename[FN_REFLEN];
955
1012
 
956
1013
  init_archive_reader();
957
1014
 
971
1028
  azread_frm(&archive, proto_string);
972
1029
 
973
1030
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1031
  internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1032
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1033
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1034
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
1035
  {
979
1036
    free(proto_string);
980
1037
    return(HA_ERR_CRASHED_ON_USAGE);
981
1038
  }
982
1039
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
1040
  azwrite_frm(&writer, proto_string, archive.frm_length);
984
1041
 
985
1042
  /*
986
1043
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1061
    */
1005
1062
    if (!rc)
1006
1063
    {
 
1064
      uint64_t x;
1007
1065
      uint64_t rows_restored;
1008
1066
      share->rows_recorded= 0;
1009
1067
      stats.auto_increment_value= 1;
1011
1069
 
1012
1070
      rows_restored= archive.rows;
1013
1071
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1072
      for (x= 0; x < rows_restored ; x++)
1015
1073
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1074
        rc= get_row(&archive, table->record[0]);
1017
1075
 
1018
1076
        if (rc != 0)
1019
1077
          break;
1020
1078
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1079
        real_write_row(table->record[0], &writer);
1022
1080
        /*
1023
1081
          Long term it should be possible to optimize this so that
1024
1082
          it is not called on each row.
1025
1083
        */
1026
 
        if (getTable()->found_next_number_field)
 
1084
        if (table->found_next_number_field)
1027
1085
        {
1028
 
          Field *field= getTable()->found_next_number_field;
 
1086
          Field *field= table->found_next_number_field;
1029
1087
 
1030
1088
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
1089
          field->setReadSet();
1032
1090
 
1033
1091
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1092
            (uint64_t) field->val_int(table->record[0] +
 
1093
                                       field->offset(table->record[0]));
1036
1094
          if (share->archive_write.auto_increment < auto_value)
1037
1095
            stats.auto_increment_value=
1038
1096
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1097
        }
1040
1098
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1099
      share->rows_recorded= (ha_rows)writer.rows;
1042
1100
    }
1043
1101
 
1044
1102
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1105
    }
1048
1106
  }
1049
1107
 
1050
 
  azclose(writer.get());
 
1108
  azclose(&writer);
1051
1109
  share->dirty= false;
1052
1110
 
1053
1111
  azclose(&archive);
1054
1112
 
1055
1113
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
 
1114
  rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1057
1115
 
1058
1116
  free(proto_string);
1059
1117
  return(rc);
1060
1118
error:
1061
1119
  free(proto_string);
1062
 
  azclose(writer.get());
 
1120
  azclose(&writer);
1063
1121
 
1064
1122
  return(rc);
1065
1123
}
1115
1173
    If dirty, we lock, and then reset/flush the data.
1116
1174
    I found that just calling azflush() doesn't always work.
1117
1175
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1176
  pthread_mutex_lock(&share->mutex);
1119
1177
  if (share->dirty == true)
1120
1178
  {
1121
1179
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1192
    cause the number to be inaccurate.
1135
1193
  */
1136
1194
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1195
  pthread_mutex_unlock(&share->mutex);
1138
1196
 
1139
1197
  scan_rows= stats.records;
1140
1198
  stats.deleted= 0;
1144
1202
  {
1145
1203
    struct stat file_stat;  // Stat information for the data file
1146
1204
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1205
    stat(share->data_file_name, &file_stat);
1148
1206
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1207
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1208
    stats.data_file_length= file_stat.st_size;
1151
1209
    stats.create_time= file_stat.st_ctime;
1152
1210
    stats.update_time= file_stat.st_mtime;
1158
1216
  if (flag & HA_STATUS_AUTO)
1159
1217
  {
1160
1218
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1219
    pthread_mutex_lock(&share->mutex);
1162
1220
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1221
    pthread_mutex_unlock(&share->mutex);
1164
1222
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1223
  }
1166
1224
 
1211
1269
{
1212
1270
  int rc= 0;
1213
1271
  const char *old_proc_info;
 
1272
  uint64_t x;
1214
1273
 
1215
1274
  old_proc_info= get_session_proc_info(session);
1216
1275
  set_session_proc_info(session, "Checking table");
1217
1276
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1277
  pthread_mutex_lock(&share->mutex);
1219
1278
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1279
  pthread_mutex_unlock(&share->mutex);
1221
1280
 
1222
1281
  /*
1223
1282
    Now we will rewind the archive file so that we are positioned at the
1226
1285
  init_archive_reader();
1227
1286
  azflush(&archive, Z_SYNC_FLUSH);
1228
1287
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1288
  for (x= 0; x < share->archive_write.rows; x++)
1230
1289
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1290
    rc= get_row(&archive, table->record[0]);
1232
1291
 
1233
1292
    if (rc != 0)
1234
1293
      break;
1247
1306
  }
1248
1307
}
1249
1308
 
 
1309
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1310
{
 
1311
  archive_record_buffer *r;
 
1312
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1313
  {
 
1314
    return(NULL);
 
1315
  }
 
1316
  r->length= (int)length;
 
1317
 
 
1318
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1319
  {
 
1320
    free((char*) r);
 
1321
    return(NULL);
 
1322
  }
 
1323
 
 
1324
  return(r);
 
1325
}
 
1326
 
 
1327
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1328
{
 
1329
  free((char*) r->buffer);
 
1330
  free((char*) r);
 
1331
  return;
 
1332
}
 
1333
 
1250
1334
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1251
1335
{
1252
1336
  int error= 0;
1280
1364
 
1281
1365
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
1366
                                          const drizzled::SchemaIdentifier &schema_identifier,
1283
 
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
 
1367
                                          drizzled::TableIdentifiers &set_of_identifiers)
1284
1368
{
1285
1369
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
1370
 
1310
1394
    }
1311
1395
  }
1312
1396
}
 
1397