~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: Brian Aker
  • Date: 2010-03-15 21:50:05 UTC
  • mto: This revision was merged to the branch mainline in revision 1343.
  • Revision ID: brian@gaz-20100315215005-oqoblpbll9n0albj
Merge of table cache/def DD.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
 
13
13
  You should have received a copy of the GNU General Public License
14
14
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
15
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
16
 
17
17
 
18
18
#include "config.h"
19
19
 
20
20
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
21
 
24
22
using namespace std;
25
23
using namespace drizzled;
93
91
    -Brian
94
92
*/
95
93
 
 
94
/* Variables for archive share methods */
 
95
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
 
96
 
96
97
/* When the engine starts up set the first version */
97
98
static uint64_t global_version= 1;
98
99
 
131
132
}
132
133
 
133
134
 
134
 
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
135
 
{
136
 
  string new_path(identifier.getPath());
 
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory, 
 
136
                                    string&, 
 
137
                                    set<string>& set_of_names)
 
138
{
 
139
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
140
 
 
141
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
142
       entry_iter != entries.end(); ++entry_iter)
 
143
  {
 
144
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
145
    const string *filename= &entry->filename;
 
146
 
 
147
    assert(filename->size());
 
148
 
 
149
    const char *ext= strchr(filename->c_str(), '.');
 
150
 
 
151
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
152
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
153
    {  }
 
154
    else
 
155
    {
 
156
      char uname[NAME_LEN + 1];
 
157
      uint32_t file_name_len;
 
158
 
 
159
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
160
      // TODO: Remove need for memory copy here
 
161
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
162
      set_of_names.insert(uname);
 
163
    }
 
164
  }
 
165
}
 
166
 
 
167
 
 
168
int ArchiveEngine::doDropTable(Session&,
 
169
                               const string &table_path)
 
170
{
 
171
  string new_path(table_path);
137
172
 
138
173
  new_path+= ARZ;
139
174
 
148
183
}
149
184
 
150
185
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const identifier::Table &identifier,
152
 
                                        drizzled::message::Table &table_proto)
 
186
                                        const char* path,
 
187
                                        const char *,
 
188
                                        const char *,
 
189
                                        const bool,
 
190
                                        drizzled::message::Table *table_proto)
153
191
{
154
192
  struct stat stat_info;
155
193
  int error= ENOENT;
156
194
  string proto_path;
157
195
 
158
196
  proto_path.reserve(FN_REFLEN);
159
 
  proto_path.assign(identifier.getPath());
 
197
  proto_path.assign(path);
160
198
 
161
199
  proto_path.append(ARZ);
162
200
 
165
203
  else
166
204
    error= EEXIST;
167
205
 
 
206
  if (table_proto)
168
207
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
208
    azio_stream proto_stream;
170
209
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
210
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
211
      return HA_ERR_CRASHED_ON_USAGE;
173
212
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
213
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
214
    if (proto_string == NULL)
176
215
    {
177
 
      azclose(proto_stream.get());
 
216
      azclose(&proto_stream);
178
217
      return ENOMEM;
179
218
    }
180
219
 
181
 
    azread_frm(proto_stream.get(), proto_string);
 
220
    azread_frm(&proto_stream, proto_string);
182
221
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
222
    if (table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
223
      error= HA_ERR_CRASHED_ON_USAGE;
185
224
 
186
 
    azclose(proto_stream.get());
 
225
    azclose(&proto_stream);
187
226
    free(proto_string);
188
227
  }
189
228
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
229
  return error;
197
230
}
198
231
 
199
232
 
200
233
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
 
234
                       TableShare &table_arg)
202
235
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
203
236
{
204
237
  /* Set our original buffer from pre-allocated memory */
236
269
{
237
270
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
271
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
 
272
  internal::fn_format(data_file_name, table_name.c_str(), "",
 
273
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
241
274
  /*
242
275
    We will use this lock for rows.
243
276
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
 
277
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
245
278
}
246
279
 
247
280
ArchiveShare::~ArchiveShare()
248
281
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
 
282
  thr_lock_delete(&lock);
 
283
  pthread_mutex_destroy(&mutex);
251
284
  /*
252
285
    We need to make sure we don't reset the crashed state.
253
286
    If we open a crashed file, wee need to close it as crashed unless
261
294
 
262
295
bool ArchiveShare::prime(uint64_t *auto_increment)
263
296
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
297
  azio_stream archive_tmp;
265
298
 
266
299
  /*
267
300
    We read the meta file, but do not mark it dirty. Since we are not
269
302
    anything but reading... open it for write and we will generate null
270
303
    compression writes).
271
304
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
305
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
306
               AZ_METHOD_BLOCK)))
274
307
    return false;
275
308
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
 
309
  *auto_increment= archive_tmp.auto_increment + 1;
 
310
  rows_recorded= (ha_rows)archive_tmp.rows;
 
311
  crashed= archive_tmp.dirty;
279
312
  if (version < global_version)
280
313
  {
281
314
    version_rows= rows_recorded;
282
315
    version= global_version;
283
316
  }
284
 
  azclose(archive_tmp.get());
 
317
  azclose(&archive_tmp);
285
318
 
286
319
  return true;
287
320
}
296
329
*/
297
330
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
298
331
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
 
332
  pthread_mutex_lock(&archive_mutex);
 
333
 
 
334
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
335
  share= a_engine->findOpenTable(table_name);
304
336
 
305
337
  if (!share)
308
340
 
309
341
    if (share == NULL)
310
342
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
343
      pthread_mutex_unlock(&archive_mutex);
312
344
      *rc= HA_ERR_OUT_OF_MEM;
313
345
      return(NULL);
314
346
    }
315
347
 
316
348
    if (share->prime(&stats.auto_increment_value) == false)
317
349
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
350
      pthread_mutex_unlock(&archive_mutex);
319
351
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
352
      delete share;
321
353
 
323
355
    }
324
356
 
325
357
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
358
    thr_lock_init(&share->lock);
327
359
  }
328
360
  share->use_count++;
329
361
 
330
362
  if (share->crashed)
331
363
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
364
  pthread_mutex_unlock(&archive_mutex);
333
365
 
334
366
  return(share);
335
367
}
341
373
*/
342
374
int ha_archive::free_share()
343
375
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
345
 
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
376
  pthread_mutex_lock(&archive_mutex);
347
377
  if (!--share->use_count)
348
378
  {
 
379
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
380
    a_engine->deleteOpenTable(share->table_name);
350
381
    delete share;
351
382
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
383
  pthread_mutex_unlock(&archive_mutex);
353
384
 
354
385
  return 0;
355
386
}
361
392
    a gzip file that can be both read and written we keep a writer open
362
393
    that is shared amoung all open tables.
363
394
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
395
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
396
               O_RDWR, AZ_METHOD_BLOCK)))
366
397
  {
367
398
    share->crashed= true;
387
418
  {
388
419
    az_method method;
389
420
 
390
 
    if (archive_aio_state())
 
421
    switch (archive_aio_state())
391
422
    {
 
423
    case false:
 
424
      method= AZ_METHOD_BLOCK;
 
425
      break;
 
426
    case true:
392
427
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
428
      break;
 
429
    default:
396
430
      method= AZ_METHOD_BLOCK;
397
431
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
432
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
433
                 method)))
400
434
    {
401
435
      share->crashed= true;
413
447
  Init out lock.
414
448
  We open the file we will read from.
415
449
*/
416
 
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
 
450
int ha_archive::open(const char *name, int, uint32_t)
417
451
{
418
452
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
 
453
  share= get_share(name, &rc);
420
454
 
421
455
  /** 
422
456
    We either fix it ourselves, or we just take it offline 
437
471
 
438
472
  assert(share);
439
473
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
 
474
  record_buffer= create_record_buffer(table->s->reclength +
 
475
                                      ARCHIVE_ROW_HEADER_SIZE);
 
476
 
 
477
  if (!record_buffer)
 
478
  {
 
479
    free_share();
 
480
    return(HA_ERR_OUT_OF_MEM);
 
481
  }
 
482
 
 
483
  thr_lock_data_init(&share->lock, &lock, NULL);
443
484
 
444
485
  return(rc);
445
486
}
446
487
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
452
 
}
453
 
 
454
488
 
455
489
/*
456
490
  Closes the file.
473
507
{
474
508
  int rc= 0;
475
509
 
476
 
  record_buffer.clear();
 
510
  destroy_record_buffer(record_buffer);
477
511
 
478
512
  /* First close stream */
479
513
  if (archive_reader_open == true)
497
531
  of creation.
498
532
*/
499
533
 
500
 
int ArchiveEngine::doCreateTable(Session &,
 
534
int ArchiveEngine::doCreateTable(Session *,
 
535
                                 const char *table_name,
501
536
                                 Table& table_arg,
502
 
                                 const drizzled::identifier::Table &identifier,
503
537
                                 drizzled::message::Table& proto)
504
538
{
 
539
  char name_buff[FN_REFLEN];
505
540
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
541
  azio_stream create_stream;            /* Archive file we are working with */
507
542
  uint64_t auto_increment_value;
508
543
  string serialized_proto;
509
544
 
511
546
 
512
547
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
513
548
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
549
    KEY *pos= table_arg.key_info+key;
 
550
    KEY_PART_INFO *key_part=     pos->key_part;
 
551
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
552
 
518
553
    for (; key_part != key_part_end; key_part++)
519
554
    {
521
556
 
522
557
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
558
      {
524
 
        return -1;
 
559
        error= -1;
 
560
        goto error;
525
561
      }
526
562
    }
527
563
  }
528
564
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
 
565
  /*
 
566
    We reuse name_buff since it is available.
 
567
  */
 
568
  internal::fn_format(name_buff, table_name, "", ARZ,
 
569
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
531
570
 
532
571
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
572
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
573
             AZ_METHOD_BLOCK) == 0)
535
574
  {
536
575
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
576
    goto error2;
 
577
  }
 
578
 
 
579
  proto.SerializeToString(&serialized_proto);
 
580
 
 
581
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
582
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
 
583
    goto error2;
559
584
 
560
585
  if (proto.options().has_comment())
561
586
  {
562
587
    int write_length;
563
588
 
564
 
    write_length= azwrite_comment(create_stream.get(),
 
589
    write_length= azwrite_comment(&create_stream,
565
590
                                  proto.options().comment().c_str(),
566
591
                                  proto.options().comment().length());
567
592
 
568
593
    if (write_length < 0)
569
594
    {
570
595
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
 
596
      goto error2;
574
597
    }
575
598
  }
576
599
 
578
601
    Yes you need to do this, because the starting value
579
602
    for the autoincrement may not be zero.
580
603
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
 
604
  create_stream.auto_increment= auto_increment_value ?
582
605
    auto_increment_value - 1 : 0;
583
606
 
584
 
  if (azclose(create_stream.get()))
 
607
  if (azclose(&create_stream))
585
608
  {
586
609
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
610
    goto error2;
590
611
  }
591
612
 
592
613
  return(0);
 
614
 
 
615
error2:
 
616
  unlink(name_buff);
 
617
 
 
618
error:
 
619
  /* Return error number, if we got one */
 
620
  return(error ? error : -1);
593
621
}
594
622
 
595
623
/*
603
631
  /* We pack the row for writing */
604
632
  r_pack_length= pack_row(buf);
605
633
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
634
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
635
  if (written != r_pack_length)
608
636
  {
609
637
    return(-1);
623
651
 
624
652
uint32_t ha_archive::max_row_length(const unsigned char *)
625
653
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
654
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
655
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
656
 
629
657
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
658
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
659
       ptr != end ;
632
660
       ptr++)
633
661
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
662
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
663
  }
636
664
 
637
665
  return length;
646
674
    return(HA_ERR_OUT_OF_MEM);
647
675
 
648
676
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
677
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
678
  ptr= record_buffer->buffer + table->s->null_bytes;
651
679
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
680
  for (Field **field=table->field ; *field ; field++)
653
681
  {
654
682
    if (!((*field)->is_null()))
655
683
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
684
  }
657
685
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
686
  return((unsigned int) (ptr - record_buffer->buffer));
659
687
}
660
688
 
661
689
 
668
696
  for implementing start_bulk_insert() is that we could skip
669
697
  setting dirty to true each time.
670
698
*/
671
 
int ha_archive::doInsertRecord(unsigned char *buf)
 
699
int ha_archive::write_row(unsigned char *buf)
672
700
{
673
701
  int rc;
674
702
  unsigned char *read_buf= NULL;
675
703
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
704
  unsigned char *record=  table->record[0];
677
705
 
678
706
  if (share->crashed)
679
707
    return(HA_ERR_CRASHED_ON_USAGE);
680
708
 
681
 
  pthread_mutex_lock(&share->mutex());
 
709
  ha_statistic_increment(&system_status_var::ha_write_count);
 
710
  pthread_mutex_lock(&share->mutex);
682
711
 
683
712
  if (share->archive_write_open == false)
684
713
    if (init_archive_writer())
685
714
      return(HA_ERR_CRASHED_ON_USAGE);
686
715
 
687
716
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
717
  if (table->next_number_field && record == table->record[0])
689
718
  {
 
719
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
720
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
721
    temp_auto= table->next_number_field->val_int();
692
722
 
693
723
    /*
694
724
      We don't support decremening auto_increment. They make the performance
695
725
      just cry.
696
726
    */
697
727
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
728
        mkey->flags & HA_NOSAME)
699
729
    {
700
730
      rc= HA_ERR_FOUND_DUPP_KEY;
701
731
      goto error;
715
745
  share->rows_recorded++;
716
746
  rc= real_write_row(buf,  &(share->archive_write));
717
747
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
748
  pthread_mutex_unlock(&share->mutex);
719
749
  if (read_buf)
720
750
    free((unsigned char*) read_buf);
721
751
 
730
760
  *first_value= share->archive_write.auto_increment + 1;
731
761
}
732
762
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
763
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
764
int ha_archive::index_init(uint32_t keynr, bool)
735
765
{
736
766
  active_index= keynr;
737
767
  return(0);
743
773
  the optimizer that we have unique indexes, we scan
744
774
*/
745
775
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
746
 
                             uint32_t key_len, enum ha_rkey_function)
 
776
                             uint32_t key_len, enum ha_rkey_function find_flag)
 
777
{
 
778
  int rc;
 
779
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
780
  return(rc);
 
781
}
 
782
 
 
783
 
 
784
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
 
785
                               uint32_t key_len, enum ha_rkey_function)
747
786
{
748
787
  int rc;
749
788
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
789
  KEY *mkey= &table->s->key_info[index];
 
790
  current_k_offset= mkey->key_part->offset;
751
791
  current_key= key;
752
792
  current_key_len= key_len;
753
793
 
754
 
  rc= doStartTableScan(true);
 
794
  rc= rnd_init(true);
755
795
 
756
796
  if (rc)
757
797
    goto error;
795
835
  we assume the position will be set.
796
836
*/
797
837
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
838
int ha_archive::rnd_init(bool scan)
799
839
{
800
840
  if (share->crashed)
801
841
      return(HA_ERR_CRASHED_ON_USAGE);
832
872
/* Reallocate buffer if needed */
833
873
bool ha_archive::fix_rec_buff(unsigned int length)
834
874
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
875
  assert(record_buffer->buffer);
 
876
 
 
877
  if (length > record_buffer->length)
 
878
  {
 
879
    unsigned char *newptr;
 
880
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
881
      return(1);
 
882
    record_buffer->buffer= newptr;
 
883
    record_buffer->length= length;
 
884
  }
 
885
 
 
886
  assert(length <= record_buffer->length);
 
887
 
 
888
  return(0);
838
889
}
839
890
 
840
891
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
903
  }
853
904
 
854
905
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
906
  memcpy(record, ptr, table->getNullBytes());
 
907
  ptr+= table->getNullBytes();
 
908
  for (Field **field=table->field ; *field ; field++)
858
909
  {
859
910
    if (!((*field)->is_null()))
860
911
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
912
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
913
    }
863
914
  }
864
915
  return(0);
893
944
  current_position= aztell(&archive);
894
945
  rc= get_row(&archive, buf);
895
946
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
947
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
948
 
898
949
  return(rc);
899
950
}
951
1002
int ha_archive::optimize()
952
1003
{
953
1004
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1005
  azio_stream writer;
 
1006
  char writer_filename[FN_REFLEN];
955
1007
 
956
1008
  init_archive_reader();
957
1009
 
971
1023
  azread_frm(&archive, proto_string);
972
1024
 
973
1025
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1026
  internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1027
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1028
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1029
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
1030
  {
979
1031
    free(proto_string);
980
1032
    return(HA_ERR_CRASHED_ON_USAGE);
981
1033
  }
982
1034
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
1035
  azwrite_frm(&writer, proto_string, archive.frm_length);
984
1036
 
985
1037
  /*
986
1038
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1056
    */
1005
1057
    if (!rc)
1006
1058
    {
 
1059
      uint64_t x;
1007
1060
      uint64_t rows_restored;
1008
1061
      share->rows_recorded= 0;
1009
1062
      stats.auto_increment_value= 1;
1011
1064
 
1012
1065
      rows_restored= archive.rows;
1013
1066
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1067
      for (x= 0; x < rows_restored ; x++)
1015
1068
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1069
        rc= get_row(&archive, table->record[0]);
1017
1070
 
1018
1071
        if (rc != 0)
1019
1072
          break;
1020
1073
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1074
        real_write_row(table->record[0], &writer);
1022
1075
        /*
1023
1076
          Long term it should be possible to optimize this so that
1024
1077
          it is not called on each row.
1025
1078
        */
1026
 
        if (getTable()->found_next_number_field)
 
1079
        if (table->found_next_number_field)
1027
1080
        {
1028
 
          Field *field= getTable()->found_next_number_field;
 
1081
          Field *field= table->found_next_number_field;
1029
1082
 
1030
1083
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
1084
          field->setReadSet();
1032
1085
 
1033
1086
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1087
            (uint64_t) field->val_int(table->record[0] +
 
1088
                                       field->offset(table->record[0]));
1036
1089
          if (share->archive_write.auto_increment < auto_value)
1037
1090
            stats.auto_increment_value=
1038
1091
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1092
        }
1040
1093
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1094
      share->rows_recorded= (ha_rows)writer.rows;
1042
1095
    }
1043
1096
 
1044
1097
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1100
    }
1048
1101
  }
1049
1102
 
1050
 
  azclose(writer.get());
 
1103
  azclose(&writer);
1051
1104
  share->dirty= false;
1052
1105
 
1053
1106
  azclose(&archive);
1054
1107
 
1055
1108
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
 
1109
  rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1057
1110
 
1058
1111
  free(proto_string);
1059
1112
  return(rc);
1060
1113
error:
1061
1114
  free(proto_string);
1062
 
  azclose(writer.get());
 
1115
  azclose(&writer);
1063
1116
 
1064
1117
  return(rc);
1065
1118
}
1084
1137
 
1085
1138
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
1139
         lock_type <= TL_WRITE)
1087
 
        && ! session->doing_tablespace_operation())
 
1140
        && !session_tablespace_op(session))
1088
1141
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1142
 
1090
1143
    /*
1115
1168
    If dirty, we lock, and then reset/flush the data.
1116
1169
    I found that just calling azflush() doesn't always work.
1117
1170
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1171
  pthread_mutex_lock(&share->mutex);
1119
1172
  if (share->dirty == true)
1120
1173
  {
1121
1174
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1187
    cause the number to be inaccurate.
1135
1188
  */
1136
1189
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1190
  pthread_mutex_unlock(&share->mutex);
1138
1191
 
1139
1192
  scan_rows= stats.records;
1140
1193
  stats.deleted= 0;
1144
1197
  {
1145
1198
    struct stat file_stat;  // Stat information for the data file
1146
1199
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1200
    stat(share->data_file_name, &file_stat);
1148
1201
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1202
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1203
    stats.data_file_length= file_stat.st_size;
1151
1204
    stats.create_time= file_stat.st_ctime;
1152
1205
    stats.update_time= file_stat.st_mtime;
1158
1211
  if (flag & HA_STATUS_AUTO)
1159
1212
  {
1160
1213
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1214
    pthread_mutex_lock(&share->mutex);
1162
1215
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1216
    pthread_mutex_unlock(&share->mutex);
1164
1217
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1218
  }
1166
1219
 
1170
1223
 
1171
1224
/*
1172
1225
  This method tells us that a bulk insert operation is about to occur. We set
1173
 
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1226
  a flag which will keep write_row from saying that its data is dirty. This in
1174
1227
  turn will keep selects from causing a sync to occur.
1175
1228
  Basically, yet another optimizations to keep compression working well.
1176
1229
*/
1211
1264
{
1212
1265
  int rc= 0;
1213
1266
  const char *old_proc_info;
 
1267
  uint64_t x;
1214
1268
 
1215
 
  old_proc_info= session->get_proc_info();
1216
 
  session->set_proc_info("Checking table");
 
1269
  old_proc_info= get_session_proc_info(session);
 
1270
  set_session_proc_info(session, "Checking table");
1217
1271
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1272
  pthread_mutex_lock(&share->mutex);
1219
1273
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1274
  pthread_mutex_unlock(&share->mutex);
1221
1275
 
1222
1276
  /*
1223
1277
    Now we will rewind the archive file so that we are positioned at the
1226
1280
  init_archive_reader();
1227
1281
  azflush(&archive, Z_SYNC_FLUSH);
1228
1282
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1283
  for (x= 0; x < share->archive_write.rows; x++)
1230
1284
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1285
    rc= get_row(&archive, table->record[0]);
1232
1286
 
1233
1287
    if (rc != 0)
1234
1288
      break;
1235
1289
  }
1236
1290
 
1237
 
  session->set_proc_info(old_proc_info);
 
1291
  set_session_proc_info(session, old_proc_info);
1238
1292
 
1239
1293
  if ((rc && rc != HA_ERR_END_OF_FILE))
1240
1294
  {
1247
1301
  }
1248
1302
}
1249
1303
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
1251
 
{
1252
 
  int error= 0;
1253
 
 
1254
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1255
 
  {
1256
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1257
 
    {
1258
 
      if ((error=errno) != ENOENT)
1259
 
        break;
1260
 
      error= 0;
1261
 
    }
1262
 
  }
1263
 
 
1264
 
  return error;
1265
 
}
1266
 
 
1267
 
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const identifier::Table &identifier)
1269
 
{
1270
 
  string proto_path(identifier.getPath());
1271
 
  proto_path.append(ARZ);
1272
 
 
1273
 
  if (access(proto_path.c_str(), F_OK))
1274
 
  {
1275
 
    return false;
1276
 
  }
1277
 
 
1278
 
  return true;
1279
 
}
1280
 
 
1281
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::identifier::Schema &schema_identifier,
1283
 
                                          drizzled::identifier::Table::vector &set_of_identifiers)
1284
 
{
1285
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
 
 
1287
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1288
 
       entry_iter != entries.end(); ++entry_iter)
1289
 
  {
1290
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
 
    const string *filename= &entry->filename;
1292
 
 
1293
 
    assert(filename->size());
1294
 
 
1295
 
    const char *ext= strchr(filename->c_str(), '.');
1296
 
 
1297
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1299
 
    {  }
1300
 
    else
1301
 
    {
1302
 
      char uname[NAME_LEN + 1];
1303
 
      uint32_t file_name_len;
1304
 
 
1305
 
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
 
      // TODO: Remove need for memory copy here
1307
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
 
 
1309
 
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
1310
 
    }
1311
 
  }
 
1304
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1305
{
 
1306
  archive_record_buffer *r;
 
1307
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1308
  {
 
1309
    return(NULL);
 
1310
  }
 
1311
  r->length= (int)length;
 
1312
 
 
1313
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1314
  {
 
1315
    free((char*) r);
 
1316
    return(NULL);
 
1317
  }
 
1318
 
 
1319
  return(r);
 
1320
}
 
1321
 
 
1322
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1323
{
 
1324
  free((char*) r->buffer);
 
1325
  free((char*) r);
 
1326
  return;
1312
1327
}