~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

Merge Joe, plus I updated the tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
 
13
13
  You should have received a copy of the GNU General Public License
14
14
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
15
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
16
 
17
17
 
18
18
#include "config.h"
19
19
 
20
20
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
21
 
24
22
using namespace std;
25
23
using namespace drizzled;
93
91
    -Brian
94
92
*/
95
93
 
 
94
/* Variables for archive share methods */
 
95
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
 
96
 
96
97
/* When the engine starts up set the first version */
97
98
static uint64_t global_version= 1;
98
99
 
131
132
}
132
133
 
133
134
 
134
 
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
 
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory, 
 
136
                                    const SchemaIdentifier&,
 
137
                                    set<string>& set_of_names)
 
138
{
 
139
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
140
 
 
141
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
142
       entry_iter != entries.end(); ++entry_iter)
 
143
  {
 
144
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
145
    const string *filename= &entry->filename;
 
146
 
 
147
    assert(filename->size());
 
148
 
 
149
    const char *ext= strchr(filename->c_str(), '.');
 
150
 
 
151
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
152
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
153
    {  }
 
154
    else
 
155
    {
 
156
      char uname[NAME_LEN + 1];
 
157
      uint32_t file_name_len;
 
158
 
 
159
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
160
      // TODO: Remove need for memory copy here
 
161
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
162
      set_of_names.insert(uname);
 
163
    }
 
164
  }
 
165
}
 
166
 
 
167
 
 
168
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
169
{
136
170
  string new_path(identifier.getPath());
137
171
 
148
182
}
149
183
 
150
184
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const identifier::Table &identifier,
 
185
                                        const TableIdentifier &identifier,
152
186
                                        drizzled::message::Table &table_proto)
153
187
{
154
188
  struct stat stat_info;
166
200
    error= EEXIST;
167
201
 
168
202
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
203
    azio_stream proto_stream;
170
204
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
205
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
206
      return HA_ERR_CRASHED_ON_USAGE;
173
207
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
208
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
209
    if (proto_string == NULL)
176
210
    {
177
 
      azclose(proto_stream.get());
 
211
      azclose(&proto_stream);
178
212
      return ENOMEM;
179
213
    }
180
214
 
181
 
    azread_frm(proto_stream.get(), proto_string);
 
215
    azread_frm(&proto_stream, proto_string);
182
216
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
217
    if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
218
      error= HA_ERR_CRASHED_ON_USAGE;
185
219
 
186
 
    azclose(proto_stream.get());
 
220
    azclose(&proto_stream);
187
221
    free(proto_string);
188
222
  }
189
223
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
224
  return error;
197
225
}
198
226
 
199
227
 
200
228
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
 
229
                       TableShare &table_arg)
202
230
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
203
231
{
204
232
  /* Set our original buffer from pre-allocated memory */
236
264
{
237
265
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
266
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
 
267
  internal::fn_format(data_file_name, table_name.c_str(), "",
 
268
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
241
269
  /*
242
270
    We will use this lock for rows.
243
271
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
 
272
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
245
273
}
246
274
 
247
275
ArchiveShare::~ArchiveShare()
248
276
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
 
277
  thr_lock_delete(&lock);
 
278
  pthread_mutex_destroy(&mutex);
251
279
  /*
252
280
    We need to make sure we don't reset the crashed state.
253
281
    If we open a crashed file, wee need to close it as crashed unless
257
285
  */
258
286
  if (archive_write_open == true)
259
287
    (void)azclose(&archive_write);
 
288
  pthread_mutex_destroy(&archive_mutex);
260
289
}
261
290
 
262
291
bool ArchiveShare::prime(uint64_t *auto_increment)
263
292
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
293
  azio_stream archive_tmp;
265
294
 
266
295
  /*
267
296
    We read the meta file, but do not mark it dirty. Since we are not
269
298
    anything but reading... open it for write and we will generate null
270
299
    compression writes).
271
300
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
301
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
302
               AZ_METHOD_BLOCK)))
274
303
    return false;
275
304
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
 
305
  *auto_increment= archive_tmp.auto_increment + 1;
 
306
  rows_recorded= (ha_rows)archive_tmp.rows;
 
307
  crashed= archive_tmp.dirty;
279
308
  if (version < global_version)
280
309
  {
281
310
    version_rows= rows_recorded;
282
311
    version= global_version;
283
312
  }
284
 
  azclose(archive_tmp.get());
 
313
  azclose(&archive_tmp);
285
314
 
286
315
  return true;
287
316
}
296
325
*/
297
326
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
298
327
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
 
328
  pthread_mutex_lock(&archive_mutex);
 
329
 
 
330
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
331
  share= a_engine->findOpenTable(table_name);
304
332
 
305
333
  if (!share)
308
336
 
309
337
    if (share == NULL)
310
338
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
339
      pthread_mutex_unlock(&archive_mutex);
312
340
      *rc= HA_ERR_OUT_OF_MEM;
313
341
      return(NULL);
314
342
    }
315
343
 
316
344
    if (share->prime(&stats.auto_increment_value) == false)
317
345
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
346
      pthread_mutex_unlock(&archive_mutex);
319
347
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
348
      delete share;
321
349
 
323
351
    }
324
352
 
325
353
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
354
    thr_lock_init(&share->lock);
327
355
  }
328
356
  share->use_count++;
329
357
 
330
358
  if (share->crashed)
331
359
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
360
  pthread_mutex_unlock(&archive_mutex);
333
361
 
334
362
  return(share);
335
363
}
341
369
*/
342
370
int ha_archive::free_share()
343
371
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
345
 
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
372
  pthread_mutex_lock(&archive_mutex);
347
373
  if (!--share->use_count)
348
374
  {
 
375
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
376
    a_engine->deleteOpenTable(share->table_name);
350
377
    delete share;
351
378
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
379
  pthread_mutex_unlock(&archive_mutex);
353
380
 
354
381
  return 0;
355
382
}
361
388
    a gzip file that can be both read and written we keep a writer open
362
389
    that is shared amoung all open tables.
363
390
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
391
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
392
               O_RDWR, AZ_METHOD_BLOCK)))
366
393
  {
367
394
    share->crashed= true;
387
414
  {
388
415
    az_method method;
389
416
 
390
 
    if (archive_aio_state())
 
417
    switch (archive_aio_state())
391
418
    {
 
419
    case false:
 
420
      method= AZ_METHOD_BLOCK;
 
421
      break;
 
422
    case true:
392
423
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
424
      break;
 
425
    default:
396
426
      method= AZ_METHOD_BLOCK;
397
427
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
428
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
429
                 method)))
400
430
    {
401
431
      share->crashed= true;
413
443
  Init out lock.
414
444
  We open the file we will read from.
415
445
*/
416
 
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
 
446
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
417
447
{
418
448
  int rc= 0;
419
449
  share= get_share(identifier.getPath().c_str(), &rc);
437
467
 
438
468
  assert(share);
439
469
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
 
470
  record_buffer= create_record_buffer(table->getShare()->getRecordLength() +
 
471
                                      ARCHIVE_ROW_HEADER_SIZE);
 
472
 
 
473
  if (!record_buffer)
 
474
  {
 
475
    free_share();
 
476
    return(HA_ERR_OUT_OF_MEM);
 
477
  }
 
478
 
 
479
  thr_lock_data_init(&share->lock, &lock, NULL);
443
480
 
444
481
  return(rc);
445
482
}
473
510
{
474
511
  int rc= 0;
475
512
 
476
 
  record_buffer.clear();
 
513
  destroy_record_buffer(record_buffer);
477
514
 
478
515
  /* First close stream */
479
516
  if (archive_reader_open == true)
499
536
 
500
537
int ArchiveEngine::doCreateTable(Session &,
501
538
                                 Table& table_arg,
502
 
                                 const drizzled::identifier::Table &identifier,
 
539
                                 const drizzled::TableIdentifier &identifier,
503
540
                                 drizzled::message::Table& proto)
504
541
{
 
542
  char name_buff[FN_REFLEN];
505
543
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
544
  azio_stream create_stream;            /* Archive file we are working with */
507
545
  uint64_t auto_increment_value;
508
546
  string serialized_proto;
509
547
 
521
559
 
522
560
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
561
      {
524
 
        return -1;
 
562
        error= -1;
 
563
        goto error;
525
564
      }
526
565
    }
527
566
  }
528
567
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
 
568
  /*
 
569
    We reuse name_buff since it is available.
 
570
  */
 
571
  internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
 
572
                      MY_REPLACE_EXT | MY_UNPACK_FILENAME);
531
573
 
532
574
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
575
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
576
             AZ_METHOD_BLOCK) == 0)
535
577
  {
536
578
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
 
579
    goto error2;
540
580
  }
541
581
 
542
582
  try {
544
584
  }
545
585
  catch (...)
546
586
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
 
587
    goto error2;
550
588
  }
551
589
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
590
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
591
                  serialized_proto.length()))
554
592
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
 
593
    goto error2;
558
594
  }
559
595
 
560
596
  if (proto.options().has_comment())
561
597
  {
562
598
    int write_length;
563
599
 
564
 
    write_length= azwrite_comment(create_stream.get(),
 
600
    write_length= azwrite_comment(&create_stream,
565
601
                                  proto.options().comment().c_str(),
566
602
                                  proto.options().comment().length());
567
603
 
568
604
    if (write_length < 0)
569
605
    {
570
606
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
 
607
      goto error2;
574
608
    }
575
609
  }
576
610
 
578
612
    Yes you need to do this, because the starting value
579
613
    for the autoincrement may not be zero.
580
614
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
 
615
  create_stream.auto_increment= auto_increment_value ?
582
616
    auto_increment_value - 1 : 0;
583
617
 
584
 
  if (azclose(create_stream.get()))
 
618
  if (azclose(&create_stream))
585
619
  {
586
620
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
621
    goto error2;
590
622
  }
591
623
 
592
624
  return(0);
 
625
 
 
626
error2:
 
627
  unlink(name_buff);
 
628
 
 
629
error:
 
630
  /* Return error number, if we got one */
 
631
  return(error ? error : -1);
593
632
}
594
633
 
595
634
/*
603
642
  /* We pack the row for writing */
604
643
  r_pack_length= pack_row(buf);
605
644
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
645
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
646
  if (written != r_pack_length)
608
647
  {
609
648
    return(-1);
623
662
 
624
663
uint32_t ha_archive::max_row_length(const unsigned char *)
625
664
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
665
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
666
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
667
 
629
668
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
669
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
670
       ptr != end ;
632
671
       ptr++)
633
672
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
673
      length += 2 + ((Field_blob*)table->getField(*ptr))->get_length();
635
674
  }
636
675
 
637
676
  return length;
646
685
    return(HA_ERR_OUT_OF_MEM);
647
686
 
648
687
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
688
  memcpy(record_buffer->buffer, record, table->getShare()->null_bytes);
 
689
  ptr= record_buffer->buffer + table->getShare()->null_bytes;
651
690
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
691
  for (Field **field=table->getFields() ; *field ; field++)
653
692
  {
654
693
    if (!((*field)->is_null()))
655
694
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
695
  }
657
696
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
697
  return((unsigned int) (ptr - record_buffer->buffer));
659
698
}
660
699
 
661
700
 
673
712
  int rc;
674
713
  unsigned char *read_buf= NULL;
675
714
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
715
  unsigned char *record=  table->record[0];
677
716
 
678
717
  if (share->crashed)
679
718
    return(HA_ERR_CRASHED_ON_USAGE);
680
719
 
681
 
  pthread_mutex_lock(&share->mutex());
 
720
  pthread_mutex_lock(&share->mutex);
682
721
 
683
722
  if (share->archive_write_open == false)
684
723
    if (init_archive_writer())
685
724
      return(HA_ERR_CRASHED_ON_USAGE);
686
725
 
687
726
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
727
  if (table->next_number_field && record == table->record[0])
689
728
  {
690
729
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
730
    temp_auto= table->next_number_field->val_int();
692
731
 
693
732
    /*
694
733
      We don't support decremening auto_increment. They make the performance
695
734
      just cry.
696
735
    */
697
736
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
737
        table->getShare()->getKeyInfo(0).flags & HA_NOSAME)
699
738
    {
700
739
      rc= HA_ERR_FOUND_DUPP_KEY;
701
740
      goto error;
715
754
  share->rows_recorded++;
716
755
  rc= real_write_row(buf,  &(share->archive_write));
717
756
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
757
  pthread_mutex_unlock(&share->mutex);
719
758
  if (read_buf)
720
759
    free((unsigned char*) read_buf);
721
760
 
747
786
{
748
787
  int rc;
749
788
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
789
  current_k_offset= table->getShare()->getKeyInfo(0).key_part->offset;
751
790
  current_key= key;
752
791
  current_key_len= key_len;
753
792
 
832
871
/* Reallocate buffer if needed */
833
872
bool ha_archive::fix_rec_buff(unsigned int length)
834
873
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
874
  assert(record_buffer->buffer);
 
875
 
 
876
  if (length > record_buffer->length)
 
877
  {
 
878
    unsigned char *newptr;
 
879
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
880
      return(1);
 
881
    record_buffer->buffer= newptr;
 
882
    record_buffer->length= length;
 
883
  }
 
884
 
 
885
  assert(length <= record_buffer->length);
 
886
 
 
887
  return(0);
838
888
}
839
889
 
840
890
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
902
  }
853
903
 
854
904
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
905
  memcpy(record, ptr, table->getNullBytes());
 
906
  ptr+= table->getNullBytes();
 
907
  for (Field **field= table->getFields() ; *field ; field++)
858
908
  {
859
909
    if (!((*field)->is_null()))
860
910
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
911
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
912
    }
863
913
  }
864
914
  return(0);
893
943
  current_position= aztell(&archive);
894
944
  rc= get_row(&archive, buf);
895
945
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
946
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
947
 
898
948
  return(rc);
899
949
}
951
1001
int ha_archive::optimize()
952
1002
{
953
1003
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1004
  azio_stream writer;
 
1005
  char writer_filename[FN_REFLEN];
955
1006
 
956
1007
  init_archive_reader();
957
1008
 
971
1022
  azread_frm(&archive, proto_string);
972
1023
 
973
1024
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1025
  internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1026
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1027
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1028
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
1029
  {
979
1030
    free(proto_string);
980
1031
    return(HA_ERR_CRASHED_ON_USAGE);
981
1032
  }
982
1033
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
1034
  azwrite_frm(&writer, proto_string, archive.frm_length);
984
1035
 
985
1036
  /*
986
1037
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1055
    */
1005
1056
    if (!rc)
1006
1057
    {
 
1058
      uint64_t x;
1007
1059
      uint64_t rows_restored;
1008
1060
      share->rows_recorded= 0;
1009
1061
      stats.auto_increment_value= 1;
1011
1063
 
1012
1064
      rows_restored= archive.rows;
1013
1065
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1066
      for (x= 0; x < rows_restored ; x++)
1015
1067
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1068
        rc= get_row(&archive, table->record[0]);
1017
1069
 
1018
1070
        if (rc != 0)
1019
1071
          break;
1020
1072
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1073
        real_write_row(table->record[0], &writer);
1022
1074
        /*
1023
1075
          Long term it should be possible to optimize this so that
1024
1076
          it is not called on each row.
1025
1077
        */
1026
 
        if (getTable()->found_next_number_field)
 
1078
        if (table->found_next_number_field)
1027
1079
        {
1028
 
          Field *field= getTable()->found_next_number_field;
 
1080
          Field *field= table->found_next_number_field;
1029
1081
 
1030
1082
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
1083
          field->setReadSet();
1032
1084
 
1033
1085
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1086
            (uint64_t) field->val_int(table->record[0] +
 
1087
                                       field->offset(table->record[0]));
1036
1088
          if (share->archive_write.auto_increment < auto_value)
1037
1089
            stats.auto_increment_value=
1038
1090
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1091
        }
1040
1092
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1093
      share->rows_recorded= (ha_rows)writer.rows;
1042
1094
    }
1043
1095
 
1044
1096
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1099
    }
1048
1100
  }
1049
1101
 
1050
 
  azclose(writer.get());
 
1102
  azclose(&writer);
1051
1103
  share->dirty= false;
1052
1104
 
1053
1105
  azclose(&archive);
1054
1106
 
1055
1107
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
 
1108
  rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1057
1109
 
1058
1110
  free(proto_string);
1059
1111
  return(rc);
1060
1112
error:
1061
1113
  free(proto_string);
1062
 
  azclose(writer.get());
 
1114
  azclose(&writer);
1063
1115
 
1064
1116
  return(rc);
1065
1117
}
1084
1136
 
1085
1137
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
1138
         lock_type <= TL_WRITE)
1087
 
        && ! session->doing_tablespace_operation())
 
1139
        && !session_tablespace_op(session))
1088
1140
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1141
 
1090
1142
    /*
1115
1167
    If dirty, we lock, and then reset/flush the data.
1116
1168
    I found that just calling azflush() doesn't always work.
1117
1169
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1170
  pthread_mutex_lock(&share->mutex);
1119
1171
  if (share->dirty == true)
1120
1172
  {
1121
1173
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1186
    cause the number to be inaccurate.
1135
1187
  */
1136
1188
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1189
  pthread_mutex_unlock(&share->mutex);
1138
1190
 
1139
1191
  scan_rows= stats.records;
1140
1192
  stats.deleted= 0;
1144
1196
  {
1145
1197
    struct stat file_stat;  // Stat information for the data file
1146
1198
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1199
    stat(share->data_file_name, &file_stat);
1148
1200
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1201
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1202
    stats.data_file_length= file_stat.st_size;
1151
1203
    stats.create_time= file_stat.st_ctime;
1152
1204
    stats.update_time= file_stat.st_mtime;
1158
1210
  if (flag & HA_STATUS_AUTO)
1159
1211
  {
1160
1212
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1213
    pthread_mutex_lock(&share->mutex);
1162
1214
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1215
    pthread_mutex_unlock(&share->mutex);
1164
1216
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1217
  }
1166
1218
 
1211
1263
{
1212
1264
  int rc= 0;
1213
1265
  const char *old_proc_info;
 
1266
  uint64_t x;
1214
1267
 
1215
 
  old_proc_info= session->get_proc_info();
1216
 
  session->set_proc_info("Checking table");
 
1268
  old_proc_info= get_session_proc_info(session);
 
1269
  set_session_proc_info(session, "Checking table");
1217
1270
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1271
  pthread_mutex_lock(&share->mutex);
1219
1272
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1273
  pthread_mutex_unlock(&share->mutex);
1221
1274
 
1222
1275
  /*
1223
1276
    Now we will rewind the archive file so that we are positioned at the
1226
1279
  init_archive_reader();
1227
1280
  azflush(&archive, Z_SYNC_FLUSH);
1228
1281
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1282
  for (x= 0; x < share->archive_write.rows; x++)
1230
1283
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1284
    rc= get_row(&archive, table->record[0]);
1232
1285
 
1233
1286
    if (rc != 0)
1234
1287
      break;
1235
1288
  }
1236
1289
 
1237
 
  session->set_proc_info(old_proc_info);
 
1290
  set_session_proc_info(session, old_proc_info);
1238
1291
 
1239
1292
  if ((rc && rc != HA_ERR_END_OF_FILE))
1240
1293
  {
1247
1300
  }
1248
1301
}
1249
1302
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
 
1303
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1304
{
 
1305
  archive_record_buffer *r;
 
1306
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1307
  {
 
1308
    return(NULL);
 
1309
  }
 
1310
  r->length= (int)length;
 
1311
 
 
1312
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1313
  {
 
1314
    free((char*) r);
 
1315
    return(NULL);
 
1316
  }
 
1317
 
 
1318
  return(r);
 
1319
}
 
1320
 
 
1321
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1322
{
 
1323
  free((char*) r->buffer);
 
1324
  free((char*) r);
 
1325
  return;
 
1326
}
 
1327
 
 
1328
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1251
1329
{
1252
1330
  int error= 0;
1253
1331
 
1265
1343
}
1266
1344
 
1267
1345
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const identifier::Table &identifier)
 
1346
                                     const TableIdentifier &identifier)
1269
1347
{
1270
1348
  string proto_path(identifier.getPath());
1271
1349
  proto_path.append(ARZ);
1279
1357
}
1280
1358
 
1281
1359
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::identifier::Schema &schema_identifier,
1283
 
                                          drizzled::identifier::Table::vector &set_of_identifiers)
 
1360
                                          const drizzled::SchemaIdentifier &schema_identifier,
 
1361
                                          drizzled::TableIdentifiers &set_of_identifiers)
1284
1362
{
1285
1363
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
1364
 
1302
1380
      char uname[NAME_LEN + 1];
1303
1381
      uint32_t file_name_len;
1304
1382
 
1305
 
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
1383
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
1384
      // TODO: Remove need for memory copy here
1307
1385
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
1386
 
1309
 
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
 
1387
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1310
1388
    }
1311
1389
  }
1312
1390
}
 
1391