~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: Monty Taylor
  • Date: 2010-04-22 02:46:23 UTC
  • mto: (1497.3.4 enable-dtrace)
  • mto: This revision was merged to the branch mainline in revision 1527.
  • Revision ID: mordred@inaugust.com-20100422024623-4urw8fi8eraci08p
Don't overwrite the pandora_vc_revinfo file if we don't have new
authoratative information.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
 
13
13
  You should have received a copy of the GNU General Public License
14
14
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
15
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
16
 
17
17
 
18
18
#include "config.h"
19
19
 
20
20
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
21
 
24
22
using namespace std;
25
23
using namespace drizzled;
93
91
    -Brian
94
92
*/
95
93
 
 
94
/* Variables for archive share methods */
 
95
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
 
96
 
96
97
/* When the engine starts up set the first version */
97
98
static uint64_t global_version= 1;
98
99
 
131
132
}
132
133
 
133
134
 
134
 
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
 
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory, 
 
136
                                    SchemaIdentifier&,
 
137
                                    set<string>& set_of_names)
 
138
{
 
139
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
140
 
 
141
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
142
       entry_iter != entries.end(); ++entry_iter)
 
143
  {
 
144
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
145
    const string *filename= &entry->filename;
 
146
 
 
147
    assert(filename->size());
 
148
 
 
149
    const char *ext= strchr(filename->c_str(), '.');
 
150
 
 
151
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
152
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
153
    {  }
 
154
    else
 
155
    {
 
156
      char uname[NAME_LEN + 1];
 
157
      uint32_t file_name_len;
 
158
 
 
159
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
160
      // TODO: Remove need for memory copy here
 
161
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
162
      set_of_names.insert(uname);
 
163
    }
 
164
  }
 
165
}
 
166
 
 
167
 
 
168
int ArchiveEngine::doDropTable(Session&, TableIdentifier &identifier)
135
169
{
136
170
  string new_path(identifier.getPath());
137
171
 
148
182
}
149
183
 
150
184
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const TableIdentifier &identifier,
 
185
                                        TableIdentifier &identifier,
152
186
                                        drizzled::message::Table &table_proto)
153
187
{
154
188
  struct stat stat_info;
166
200
    error= EEXIST;
167
201
 
168
202
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
203
    azio_stream proto_stream;
170
204
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
205
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
206
      return HA_ERR_CRASHED_ON_USAGE;
173
207
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
208
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
209
    if (proto_string == NULL)
176
210
    {
177
 
      azclose(proto_stream.get());
 
211
      azclose(&proto_stream);
178
212
      return ENOMEM;
179
213
    }
180
214
 
181
 
    azread_frm(proto_stream.get(), proto_string);
 
215
    azread_frm(&proto_stream, proto_string);
182
216
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
217
    if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
218
      error= HA_ERR_CRASHED_ON_USAGE;
185
219
 
186
 
    azclose(proto_stream.get());
 
220
    azclose(&proto_stream);
187
221
    free(proto_string);
188
222
  }
189
223
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
224
  return error;
197
225
}
198
226
 
199
227
 
200
228
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
 
229
                       TableShare &table_arg)
202
230
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
203
231
{
204
232
  /* Set our original buffer from pre-allocated memory */
236
264
{
237
265
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
266
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
 
267
  internal::fn_format(data_file_name, table_name.c_str(), "",
 
268
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
241
269
  /*
242
270
    We will use this lock for rows.
243
271
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
 
272
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
245
273
}
246
274
 
247
275
ArchiveShare::~ArchiveShare()
248
276
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
 
277
  thr_lock_delete(&lock);
 
278
  pthread_mutex_destroy(&mutex);
251
279
  /*
252
280
    We need to make sure we don't reset the crashed state.
253
281
    If we open a crashed file, wee need to close it as crashed unless
257
285
  */
258
286
  if (archive_write_open == true)
259
287
    (void)azclose(&archive_write);
 
288
  pthread_mutex_destroy(&archive_mutex);
260
289
}
261
290
 
262
291
bool ArchiveShare::prime(uint64_t *auto_increment)
263
292
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
293
  azio_stream archive_tmp;
265
294
 
266
295
  /*
267
296
    We read the meta file, but do not mark it dirty. Since we are not
269
298
    anything but reading... open it for write and we will generate null
270
299
    compression writes).
271
300
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
301
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
302
               AZ_METHOD_BLOCK)))
274
303
    return false;
275
304
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
 
305
  *auto_increment= archive_tmp.auto_increment + 1;
 
306
  rows_recorded= (ha_rows)archive_tmp.rows;
 
307
  crashed= archive_tmp.dirty;
279
308
  if (version < global_version)
280
309
  {
281
310
    version_rows= rows_recorded;
282
311
    version= global_version;
283
312
  }
284
 
  azclose(archive_tmp.get());
 
313
  azclose(&archive_tmp);
285
314
 
286
315
  return true;
287
316
}
296
325
*/
297
326
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
298
327
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
 
328
  pthread_mutex_lock(&archive_mutex);
 
329
 
 
330
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
331
  share= a_engine->findOpenTable(table_name);
304
332
 
305
333
  if (!share)
308
336
 
309
337
    if (share == NULL)
310
338
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
339
      pthread_mutex_unlock(&archive_mutex);
312
340
      *rc= HA_ERR_OUT_OF_MEM;
313
341
      return(NULL);
314
342
    }
315
343
 
316
344
    if (share->prime(&stats.auto_increment_value) == false)
317
345
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
346
      pthread_mutex_unlock(&archive_mutex);
319
347
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
348
      delete share;
321
349
 
323
351
    }
324
352
 
325
353
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
354
    thr_lock_init(&share->lock);
327
355
  }
328
356
  share->use_count++;
329
357
 
330
358
  if (share->crashed)
331
359
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
360
  pthread_mutex_unlock(&archive_mutex);
333
361
 
334
362
  return(share);
335
363
}
341
369
*/
342
370
int ha_archive::free_share()
343
371
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
345
 
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
372
  pthread_mutex_lock(&archive_mutex);
347
373
  if (!--share->use_count)
348
374
  {
 
375
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
376
    a_engine->deleteOpenTable(share->table_name);
350
377
    delete share;
351
378
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
379
  pthread_mutex_unlock(&archive_mutex);
353
380
 
354
381
  return 0;
355
382
}
361
388
    a gzip file that can be both read and written we keep a writer open
362
389
    that is shared amoung all open tables.
363
390
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
391
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
392
               O_RDWR, AZ_METHOD_BLOCK)))
366
393
  {
367
394
    share->crashed= true;
387
414
  {
388
415
    az_method method;
389
416
 
390
 
    if (archive_aio_state())
 
417
    switch (archive_aio_state())
391
418
    {
 
419
    case false:
 
420
      method= AZ_METHOD_BLOCK;
 
421
      break;
 
422
    case true:
392
423
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
424
      break;
 
425
    default:
396
426
      method= AZ_METHOD_BLOCK;
397
427
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
428
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
429
                 method)))
400
430
    {
401
431
      share->crashed= true;
413
443
  Init out lock.
414
444
  We open the file we will read from.
415
445
*/
416
 
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
446
int ha_archive::open(const char *name, int, uint32_t)
417
447
{
418
448
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
 
449
  share= get_share(name, &rc);
420
450
 
421
451
  /** 
422
452
    We either fix it ourselves, or we just take it offline 
437
467
 
438
468
  assert(share);
439
469
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
 
470
  record_buffer= create_record_buffer(table->s->reclength +
 
471
                                      ARCHIVE_ROW_HEADER_SIZE);
 
472
 
 
473
  if (!record_buffer)
 
474
  {
 
475
    free_share();
 
476
    return(HA_ERR_OUT_OF_MEM);
 
477
  }
 
478
 
 
479
  thr_lock_data_init(&share->lock, &lock, NULL);
443
480
 
444
481
  return(rc);
445
482
}
446
483
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
452
 
}
453
 
 
454
484
 
455
485
/*
456
486
  Closes the file.
473
503
{
474
504
  int rc= 0;
475
505
 
476
 
  record_buffer.clear();
 
506
  destroy_record_buffer(record_buffer);
477
507
 
478
508
  /* First close stream */
479
509
  if (archive_reader_open == true)
499
529
 
500
530
int ArchiveEngine::doCreateTable(Session &,
501
531
                                 Table& table_arg,
502
 
                                 const drizzled::TableIdentifier &identifier,
 
532
                                 drizzled::TableIdentifier &identifier,
503
533
                                 drizzled::message::Table& proto)
504
534
{
 
535
  char name_buff[FN_REFLEN];
505
536
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
537
  azio_stream create_stream;            /* Archive file we are working with */
507
538
  uint64_t auto_increment_value;
508
539
  string serialized_proto;
509
540
 
511
542
 
512
543
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
513
544
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
545
    KEY *pos= table_arg.key_info+key;
 
546
    KEY_PART_INFO *key_part=     pos->key_part;
 
547
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
548
 
518
549
    for (; key_part != key_part_end; key_part++)
519
550
    {
521
552
 
522
553
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
554
      {
524
 
        return -1;
 
555
        error= -1;
 
556
        goto error;
525
557
      }
526
558
    }
527
559
  }
528
560
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
 
561
  /*
 
562
    We reuse name_buff since it is available.
 
563
  */
 
564
  internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
 
565
                      MY_REPLACE_EXT | MY_UNPACK_FILENAME);
531
566
 
532
567
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
568
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
569
             AZ_METHOD_BLOCK) == 0)
535
570
  {
536
571
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
572
    goto error2;
 
573
  }
 
574
 
 
575
  proto.SerializeToString(&serialized_proto);
 
576
 
 
577
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
578
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
 
579
    goto error2;
559
580
 
560
581
  if (proto.options().has_comment())
561
582
  {
562
583
    int write_length;
563
584
 
564
 
    write_length= azwrite_comment(create_stream.get(),
 
585
    write_length= azwrite_comment(&create_stream,
565
586
                                  proto.options().comment().c_str(),
566
587
                                  proto.options().comment().length());
567
588
 
568
589
    if (write_length < 0)
569
590
    {
570
591
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
 
592
      goto error2;
574
593
    }
575
594
  }
576
595
 
578
597
    Yes you need to do this, because the starting value
579
598
    for the autoincrement may not be zero.
580
599
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
 
600
  create_stream.auto_increment= auto_increment_value ?
582
601
    auto_increment_value - 1 : 0;
583
602
 
584
 
  if (azclose(create_stream.get()))
 
603
  if (azclose(&create_stream))
585
604
  {
586
605
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
606
    goto error2;
590
607
  }
591
608
 
592
609
  return(0);
 
610
 
 
611
error2:
 
612
  unlink(name_buff);
 
613
 
 
614
error:
 
615
  /* Return error number, if we got one */
 
616
  return(error ? error : -1);
593
617
}
594
618
 
595
619
/*
603
627
  /* We pack the row for writing */
604
628
  r_pack_length= pack_row(buf);
605
629
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
630
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
631
  if (written != r_pack_length)
608
632
  {
609
633
    return(-1);
623
647
 
624
648
uint32_t ha_archive::max_row_length(const unsigned char *)
625
649
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
650
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
651
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
652
 
629
653
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
654
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
655
       ptr != end ;
632
656
       ptr++)
633
657
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
658
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
659
  }
636
660
 
637
661
  return length;
646
670
    return(HA_ERR_OUT_OF_MEM);
647
671
 
648
672
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
673
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
674
  ptr= record_buffer->buffer + table->s->null_bytes;
651
675
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
676
  for (Field **field=table->field ; *field ; field++)
653
677
  {
654
678
    if (!((*field)->is_null()))
655
679
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
680
  }
657
681
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
682
  return((unsigned int) (ptr - record_buffer->buffer));
659
683
}
660
684
 
661
685
 
673
697
  int rc;
674
698
  unsigned char *read_buf= NULL;
675
699
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
700
  unsigned char *record=  table->record[0];
677
701
 
678
702
  if (share->crashed)
679
703
    return(HA_ERR_CRASHED_ON_USAGE);
680
704
 
681
 
  pthread_mutex_lock(&share->mutex());
 
705
  ha_statistic_increment(&system_status_var::ha_write_count);
 
706
  pthread_mutex_lock(&share->mutex);
682
707
 
683
708
  if (share->archive_write_open == false)
684
709
    if (init_archive_writer())
685
710
      return(HA_ERR_CRASHED_ON_USAGE);
686
711
 
687
712
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
713
  if (table->next_number_field && record == table->record[0])
689
714
  {
 
715
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
716
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
717
    temp_auto= table->next_number_field->val_int();
692
718
 
693
719
    /*
694
720
      We don't support decremening auto_increment. They make the performance
695
721
      just cry.
696
722
    */
697
723
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
724
        mkey->flags & HA_NOSAME)
699
725
    {
700
726
      rc= HA_ERR_FOUND_DUPP_KEY;
701
727
      goto error;
715
741
  share->rows_recorded++;
716
742
  rc= real_write_row(buf,  &(share->archive_write));
717
743
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
744
  pthread_mutex_unlock(&share->mutex);
719
745
  if (read_buf)
720
746
    free((unsigned char*) read_buf);
721
747
 
730
756
  *first_value= share->archive_write.auto_increment + 1;
731
757
}
732
758
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
759
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
760
int ha_archive::index_init(uint32_t keynr, bool)
735
761
{
736
762
  active_index= keynr;
737
763
  return(0);
747
773
{
748
774
  int rc;
749
775
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
776
  KEY *mkey= &table->s->key_info[0];
 
777
  current_k_offset= mkey->key_part->offset;
751
778
  current_key= key;
752
779
  current_key_len= key_len;
753
780
 
754
 
  rc= doStartTableScan(true);
 
781
  rc= rnd_init(true);
755
782
 
756
783
  if (rc)
757
784
    goto error;
795
822
  we assume the position will be set.
796
823
*/
797
824
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
825
int ha_archive::rnd_init(bool scan)
799
826
{
800
827
  if (share->crashed)
801
828
      return(HA_ERR_CRASHED_ON_USAGE);
832
859
/* Reallocate buffer if needed */
833
860
bool ha_archive::fix_rec_buff(unsigned int length)
834
861
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
862
  assert(record_buffer->buffer);
 
863
 
 
864
  if (length > record_buffer->length)
 
865
  {
 
866
    unsigned char *newptr;
 
867
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
868
      return(1);
 
869
    record_buffer->buffer= newptr;
 
870
    record_buffer->length= length;
 
871
  }
 
872
 
 
873
  assert(length <= record_buffer->length);
 
874
 
 
875
  return(0);
838
876
}
839
877
 
840
878
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
890
  }
853
891
 
854
892
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
893
  memcpy(record, ptr, table->getNullBytes());
 
894
  ptr+= table->getNullBytes();
 
895
  for (Field **field=table->field ; *field ; field++)
858
896
  {
859
897
    if (!((*field)->is_null()))
860
898
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
899
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
900
    }
863
901
  }
864
902
  return(0);
893
931
  current_position= aztell(&archive);
894
932
  rc= get_row(&archive, buf);
895
933
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
934
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
935
 
898
936
  return(rc);
899
937
}
951
989
int ha_archive::optimize()
952
990
{
953
991
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
992
  azio_stream writer;
 
993
  char writer_filename[FN_REFLEN];
955
994
 
956
995
  init_archive_reader();
957
996
 
971
1010
  azread_frm(&archive, proto_string);
972
1011
 
973
1012
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1013
  internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1014
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1015
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1016
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
1017
  {
979
1018
    free(proto_string);
980
1019
    return(HA_ERR_CRASHED_ON_USAGE);
981
1020
  }
982
1021
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
1022
  azwrite_frm(&writer, proto_string, archive.frm_length);
984
1023
 
985
1024
  /*
986
1025
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1043
    */
1005
1044
    if (!rc)
1006
1045
    {
 
1046
      uint64_t x;
1007
1047
      uint64_t rows_restored;
1008
1048
      share->rows_recorded= 0;
1009
1049
      stats.auto_increment_value= 1;
1011
1051
 
1012
1052
      rows_restored= archive.rows;
1013
1053
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1054
      for (x= 0; x < rows_restored ; x++)
1015
1055
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1056
        rc= get_row(&archive, table->record[0]);
1017
1057
 
1018
1058
        if (rc != 0)
1019
1059
          break;
1020
1060
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1061
        real_write_row(table->record[0], &writer);
1022
1062
        /*
1023
1063
          Long term it should be possible to optimize this so that
1024
1064
          it is not called on each row.
1025
1065
        */
1026
 
        if (getTable()->found_next_number_field)
 
1066
        if (table->found_next_number_field)
1027
1067
        {
1028
 
          Field *field= getTable()->found_next_number_field;
 
1068
          Field *field= table->found_next_number_field;
1029
1069
 
1030
1070
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
1071
          field->setReadSet();
1032
1072
 
1033
1073
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1074
            (uint64_t) field->val_int(table->record[0] +
 
1075
                                       field->offset(table->record[0]));
1036
1076
          if (share->archive_write.auto_increment < auto_value)
1037
1077
            stats.auto_increment_value=
1038
1078
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1079
        }
1040
1080
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1081
      share->rows_recorded= (ha_rows)writer.rows;
1042
1082
    }
1043
1083
 
1044
1084
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1087
    }
1048
1088
  }
1049
1089
 
1050
 
  azclose(writer.get());
 
1090
  azclose(&writer);
1051
1091
  share->dirty= false;
1052
1092
 
1053
1093
  azclose(&archive);
1054
1094
 
1055
1095
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
 
1096
  rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1057
1097
 
1058
1098
  free(proto_string);
1059
1099
  return(rc);
1060
1100
error:
1061
1101
  free(proto_string);
1062
 
  azclose(writer.get());
 
1102
  azclose(&writer);
1063
1103
 
1064
1104
  return(rc);
1065
1105
}
1115
1155
    If dirty, we lock, and then reset/flush the data.
1116
1156
    I found that just calling azflush() doesn't always work.
1117
1157
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1158
  pthread_mutex_lock(&share->mutex);
1119
1159
  if (share->dirty == true)
1120
1160
  {
1121
1161
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1174
    cause the number to be inaccurate.
1135
1175
  */
1136
1176
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1177
  pthread_mutex_unlock(&share->mutex);
1138
1178
 
1139
1179
  scan_rows= stats.records;
1140
1180
  stats.deleted= 0;
1144
1184
  {
1145
1185
    struct stat file_stat;  // Stat information for the data file
1146
1186
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1187
    stat(share->data_file_name, &file_stat);
1148
1188
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1189
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1190
    stats.data_file_length= file_stat.st_size;
1151
1191
    stats.create_time= file_stat.st_ctime;
1152
1192
    stats.update_time= file_stat.st_mtime;
1158
1198
  if (flag & HA_STATUS_AUTO)
1159
1199
  {
1160
1200
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1201
    pthread_mutex_lock(&share->mutex);
1162
1202
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1203
    pthread_mutex_unlock(&share->mutex);
1164
1204
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1205
  }
1166
1206
 
1211
1251
{
1212
1252
  int rc= 0;
1213
1253
  const char *old_proc_info;
 
1254
  uint64_t x;
1214
1255
 
1215
1256
  old_proc_info= get_session_proc_info(session);
1216
1257
  set_session_proc_info(session, "Checking table");
1217
1258
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1259
  pthread_mutex_lock(&share->mutex);
1219
1260
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1261
  pthread_mutex_unlock(&share->mutex);
1221
1262
 
1222
1263
  /*
1223
1264
    Now we will rewind the archive file so that we are positioned at the
1226
1267
  init_archive_reader();
1227
1268
  azflush(&archive, Z_SYNC_FLUSH);
1228
1269
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1270
  for (x= 0; x < share->archive_write.rows; x++)
1230
1271
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1272
    rc= get_row(&archive, table->record[0]);
1232
1273
 
1233
1274
    if (rc != 0)
1234
1275
      break;
1247
1288
  }
1248
1289
}
1249
1290
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
 
1291
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1292
{
 
1293
  archive_record_buffer *r;
 
1294
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1295
  {
 
1296
    return(NULL);
 
1297
  }
 
1298
  r->length= (int)length;
 
1299
 
 
1300
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1301
  {
 
1302
    free((char*) r);
 
1303
    return(NULL);
 
1304
  }
 
1305
 
 
1306
  return(r);
 
1307
}
 
1308
 
 
1309
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1310
{
 
1311
  free((char*) r->buffer);
 
1312
  free((char*) r);
 
1313
  return;
 
1314
}
 
1315
 
 
1316
int ArchiveEngine::doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to)
1251
1317
{
1252
1318
  int error= 0;
1253
1319
 
1265
1331
}
1266
1332
 
1267
1333
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const TableIdentifier &identifier)
 
1334
                                     TableIdentifier &identifier)
1269
1335
{
1270
1336
  string proto_path(identifier.getPath());
1271
1337
  proto_path.append(ARZ);
1279
1345
}
1280
1346
 
1281
1347
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::SchemaIdentifier &schema_identifier,
1283
 
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
 
1348
                                          drizzled::SchemaIdentifier &schema_identifier,
 
1349
                                          drizzled::TableIdentifiers &set_of_identifiers)
1284
1350
{
1285
1351
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
1352
 
1302
1368
      char uname[NAME_LEN + 1];
1303
1369
      uint32_t file_name_len;
1304
1370
 
1305
 
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
1371
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
1372
      // TODO: Remove need for memory copy here
1307
1373
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
1374
 
1310
1376
    }
1311
1377
  }
1312
1378
}
 
1379