~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: Eric Day
  • Date: 2010-01-16 19:15:44 UTC
  • mto: This revision was merged to the branch mainline in revision 1271.
  • Revision ID: eday@oddments.org-20100116191544-pdpcz98ngewb0l56
Fixed test to not output verbose drizzleslap output.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
2
 
   Copyright (C) 2010 Brian Aker
3
2
 
4
3
  This program is free software; you can redistribute it and/or modify
5
4
  it under the terms of the GNU General Public License as published by
12
11
 
13
12
  You should have received a copy of the GNU General Public License
14
13
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
15
 
17
16
 
18
17
#include "config.h"
19
 
 
20
 
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
 
18
#include "drizzled/field.h"
 
19
#include "drizzled/field/blob.h"
 
20
#include "drizzled/field/timestamp.h"
 
21
#include "plugin/myisam/myisam.h"
 
22
#include "drizzled/table.h"
 
23
#include "drizzled/session.h"
 
24
 
 
25
#include "ha_archive.h"
 
26
 
 
27
#include <unistd.h>
 
28
#include <fcntl.h>
 
29
#include <sys/stat.h>
 
30
 
 
31
#include <cstdio>
 
32
#include <string>
 
33
#include <map>
23
34
 
24
35
using namespace std;
25
 
using namespace drizzled;
26
 
 
27
36
 
28
37
/*
29
38
  First, if you want to understand storage engines you should look at
93
102
    -Brian
94
103
*/
95
104
 
96
 
/* When the engine starts up set the first version */
97
 
static uint64_t global_version= 1;
98
 
 
99
 
// We use this to find out the state of the archive aio option.
100
 
extern bool archive_aio_state(void);
 
105
/* Variables for archive share methods */
 
106
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
 
107
 
 
108
static unsigned int global_version;
 
109
 
 
110
/* The file extension */
 
111
#define ARZ ".arz"               // The data file
 
112
#define ARN ".ARN"               // Files used during an optimize call
 
113
 
 
114
 
 
115
 
 
116
static bool archive_use_aio= false;
101
117
 
102
118
/*
103
119
  Number of rows that will force a bulk insert.
109
125
*/
110
126
#define ARCHIVE_ROW_HEADER_SIZE 4
111
127
 
 
128
/*
 
129
  We just implement one additional file extension.
 
130
*/
 
131
static const char *ha_archive_exts[] = {
 
132
  ARZ,
 
133
  NULL
 
134
};
 
135
 
 
136
class ArchiveEngine : public drizzled::plugin::StorageEngine
 
137
{
 
138
  typedef std::map<string, ArchiveShare*> ArchiveMap;
 
139
  ArchiveMap archive_open_tables;
 
140
 
 
141
public:
 
142
  ArchiveEngine(const string &name_arg)
 
143
   : drizzled::plugin::StorageEngine(name_arg,
 
144
                                     HTON_FILE_BASED |
 
145
                                     HTON_STATS_RECORDS_IS_EXACT |
 
146
                                     HTON_HAS_RECORDS |
 
147
                                     HTON_HAS_DATA_DICTIONARY),
 
148
     archive_open_tables()
 
149
  {
 
150
    table_definition_ext= ARZ;
 
151
  }
 
152
 
 
153
  virtual Cursor *create(TableShare &table,
 
154
                         drizzled::memory::Root *mem_root)
 
155
  {
 
156
    return new (mem_root) ha_archive(*this, table);
 
157
  }
 
158
 
 
159
  const char **bas_ext() const {
 
160
    return ha_archive_exts;
 
161
  }
 
162
 
 
163
  int doCreateTable(Session *session, const char *table_name,
 
164
                    Table& table_arg,
 
165
                    drizzled::message::Table& proto);
 
166
 
 
167
  int doGetTableDefinition(Session& session,
 
168
                           const char* path,
 
169
                           const char *db,
 
170
                           const char *table_name,
 
171
                           const bool is_tmp,
 
172
                           drizzled::message::Table *table_proto);
 
173
 
 
174
  void doGetTableNames(drizzled::CachedDirectory &directory, string& , set<string>& set_of_names);
 
175
 
 
176
  int doDropTable(Session&, const string table_path);
 
177
  ArchiveShare *findOpenTable(const string table_name);
 
178
  void addOpenTable(const string &table_name, ArchiveShare *);
 
179
  void deleteOpenTable(const string &table_name);
 
180
 
 
181
  uint32_t max_supported_keys()          const { return 1; }
 
182
  uint32_t max_supported_key_length()    const { return sizeof(uint64_t); }
 
183
  uint32_t max_supported_key_part_length() const { return sizeof(uint64_t); }
 
184
 
 
185
  uint32_t index_flags(enum  ha_key_alg) const
 
186
  {
 
187
    return HA_ONLY_WHOLE_INDEX;
 
188
  }
 
189
};
 
190
 
112
191
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
113
192
{
114
193
  ArchiveMap::iterator find_iter=
131
210
}
132
211
 
133
212
 
134
 
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
 
{
136
 
  string new_path(identifier.getPath());
 
213
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory, 
 
214
                                    string&, 
 
215
                                    set<string>& set_of_names)
 
216
{
 
217
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
218
 
 
219
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
220
       entry_iter != entries.end(); ++entry_iter)
 
221
  {
 
222
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
223
    const string *filename= &entry->filename;
 
224
 
 
225
    assert(filename->size());
 
226
 
 
227
    const char *ext= strchr(filename->c_str(), '.');
 
228
 
 
229
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
230
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
231
    {  }
 
232
    else
 
233
    {
 
234
      char uname[NAME_LEN + 1];
 
235
      uint32_t file_name_len;
 
236
 
 
237
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
238
      // TODO: Remove need for memory copy here
 
239
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
240
      set_of_names.insert(uname);
 
241
    }
 
242
  }
 
243
}
 
244
 
 
245
 
 
246
int ArchiveEngine::doDropTable(Session&,
 
247
                               const string table_path)
 
248
{
 
249
  string new_path(table_path);
137
250
 
138
251
  new_path+= ARZ;
139
252
 
148
261
}
149
262
 
150
263
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const TableIdentifier &identifier,
152
 
                                        drizzled::message::Table &table_proto)
 
264
                                        const char* path,
 
265
                                        const char *,
 
266
                                        const char *,
 
267
                                        const bool,
 
268
                                        drizzled::message::Table *table_proto)
153
269
{
154
270
  struct stat stat_info;
155
271
  int error= ENOENT;
156
272
  string proto_path;
157
273
 
158
274
  proto_path.reserve(FN_REFLEN);
159
 
  proto_path.assign(identifier.getPath());
 
275
  proto_path.assign(path);
160
276
 
161
277
  proto_path.append(ARZ);
162
278
 
165
281
  else
166
282
    error= EEXIST;
167
283
 
 
284
  if (table_proto)
168
285
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
286
    azio_stream proto_stream;
170
287
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
288
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
289
      return HA_ERR_CRASHED_ON_USAGE;
173
290
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
291
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
292
    if (proto_string == NULL)
176
293
    {
177
 
      azclose(proto_stream.get());
 
294
      azclose(&proto_stream);
178
295
      return ENOMEM;
179
296
    }
180
297
 
181
 
    azread_frm(proto_stream.get(), proto_string);
 
298
    azread_frm(&proto_stream, proto_string);
182
299
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
300
    if (table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
301
      error= HA_ERR_CRASHED_ON_USAGE;
185
302
 
186
 
    azclose(proto_stream.get());
 
303
    azclose(&proto_stream);
187
304
    free(proto_string);
188
305
  }
189
306
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
307
  return error;
197
308
}
198
309
 
 
310
static ArchiveEngine *archive_engine= NULL;
 
311
 
 
312
/*
 
313
  Initialize the archive Cursor.
 
314
 
 
315
  SYNOPSIS
 
316
    archive_db_init()
 
317
    void *
 
318
 
 
319
  RETURN
 
320
    false       OK
 
321
    true        Error
 
322
*/
 
323
 
 
324
static int archive_db_init(drizzled::plugin::Registry &registry)
 
325
{
 
326
 
 
327
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
 
328
  archive_engine= new ArchiveEngine("ARCHIVE");
 
329
  registry.add(archive_engine);
 
330
 
 
331
  /* When the engine starts up set the first version */
 
332
  global_version= 1;
 
333
 
 
334
  return false;
 
335
}
 
336
 
 
337
/*
 
338
  Release the archive Cursor.
 
339
 
 
340
  SYNOPSIS
 
341
    archive_db_done()
 
342
    void
 
343
 
 
344
  RETURN
 
345
    false       OK
 
346
*/
 
347
 
 
348
static int archive_db_done(drizzled::plugin::Registry &registry)
 
349
{
 
350
  registry.remove(archive_engine);
 
351
  delete archive_engine;
 
352
 
 
353
  pthread_mutex_destroy(&archive_mutex);
 
354
 
 
355
  return 0;
 
356
}
 
357
 
199
358
 
200
359
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
 
360
                       TableShare &table_arg)
202
361
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
203
362
{
204
363
  /* Set our original buffer from pre-allocated memory */
205
364
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
206
365
 
207
366
  /* The size of the offset value we will use for position() */
208
 
  ref_length= sizeof(internal::my_off_t);
 
367
  ref_length= sizeof(my_off_t);
209
368
  archive_reader_open= false;
210
369
}
211
370
 
236
395
{
237
396
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
397
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
 
398
  fn_format(data_file_name, table_name.c_str(), "",
 
399
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
241
400
  /*
242
401
    We will use this lock for rows.
243
402
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
 
403
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
245
404
}
246
405
 
247
406
ArchiveShare::~ArchiveShare()
248
407
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
 
408
  thr_lock_delete(&lock);
 
409
  pthread_mutex_destroy(&mutex);
251
410
  /*
252
411
    We need to make sure we don't reset the crashed state.
253
412
    If we open a crashed file, wee need to close it as crashed unless
261
420
 
262
421
bool ArchiveShare::prime(uint64_t *auto_increment)
263
422
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
423
  azio_stream archive_tmp;
265
424
 
266
425
  /*
267
426
    We read the meta file, but do not mark it dirty. Since we are not
269
428
    anything but reading... open it for write and we will generate null
270
429
    compression writes).
271
430
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
431
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
432
               AZ_METHOD_BLOCK)))
274
433
    return false;
275
434
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
 
435
  *auto_increment= archive_tmp.auto_increment + 1;
 
436
  rows_recorded= (ha_rows)archive_tmp.rows;
 
437
  crashed= archive_tmp.dirty;
279
438
  if (version < global_version)
280
439
  {
281
440
    version_rows= rows_recorded;
282
441
    version= global_version;
283
442
  }
284
 
  azclose(archive_tmp.get());
 
443
  azclose(&archive_tmp);
285
444
 
286
445
  return true;
287
446
}
296
455
*/
297
456
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
298
457
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
 
458
  pthread_mutex_lock(&archive_mutex);
 
459
 
 
460
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
461
  share= a_engine->findOpenTable(table_name);
304
462
 
305
463
  if (!share)
308
466
 
309
467
    if (share == NULL)
310
468
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
469
      pthread_mutex_unlock(&archive_mutex);
312
470
      *rc= HA_ERR_OUT_OF_MEM;
313
471
      return(NULL);
314
472
    }
315
473
 
316
474
    if (share->prime(&stats.auto_increment_value) == false)
317
475
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
476
      pthread_mutex_unlock(&archive_mutex);
319
477
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
478
      delete share;
321
479
 
323
481
    }
324
482
 
325
483
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
484
    thr_lock_init(&share->lock);
327
485
  }
328
486
  share->use_count++;
329
487
 
330
488
  if (share->crashed)
331
489
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
490
  pthread_mutex_unlock(&archive_mutex);
333
491
 
334
492
  return(share);
335
493
}
341
499
*/
342
500
int ha_archive::free_share()
343
501
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
345
 
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
502
  pthread_mutex_lock(&archive_mutex);
347
503
  if (!--share->use_count)
348
504
  {
 
505
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
506
    a_engine->deleteOpenTable(share->table_name);
350
507
    delete share;
351
508
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
509
  pthread_mutex_unlock(&archive_mutex);
353
510
 
354
511
  return 0;
355
512
}
361
518
    a gzip file that can be both read and written we keep a writer open
362
519
    that is shared amoung all open tables.
363
520
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
521
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
522
               O_RDWR, AZ_METHOD_BLOCK)))
366
523
  {
367
524
    share->crashed= true;
387
544
  {
388
545
    az_method method;
389
546
 
390
 
    if (archive_aio_state())
 
547
    switch (archive_use_aio)
391
548
    {
 
549
    case false:
 
550
      method= AZ_METHOD_BLOCK;
 
551
      break;
 
552
    case true:
392
553
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
554
      break;
 
555
    default:
396
556
      method= AZ_METHOD_BLOCK;
397
557
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
558
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
559
                 method)))
400
560
    {
401
561
      share->crashed= true;
413
573
  Init out lock.
414
574
  We open the file we will read from.
415
575
*/
416
 
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
576
int ha_archive::open(const char *name, int, uint32_t)
417
577
{
418
578
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
 
579
  share= get_share(name, &rc);
420
580
 
421
581
  /** 
422
582
    We either fix it ourselves, or we just take it offline 
437
597
 
438
598
  assert(share);
439
599
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
 
600
  record_buffer= create_record_buffer(table->s->reclength +
 
601
                                      ARCHIVE_ROW_HEADER_SIZE);
 
602
 
 
603
  if (!record_buffer)
 
604
  {
 
605
    free_share();
 
606
    return(HA_ERR_OUT_OF_MEM);
 
607
  }
 
608
 
 
609
  thr_lock_data_init(&share->lock, &lock, NULL);
443
610
 
444
611
  return(rc);
445
612
}
446
613
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
452
 
}
453
 
 
454
614
 
455
615
/*
456
616
  Closes the file.
473
633
{
474
634
  int rc= 0;
475
635
 
476
 
  record_buffer.clear();
 
636
  destroy_record_buffer(record_buffer);
477
637
 
478
638
  /* First close stream */
479
639
  if (archive_reader_open == true)
497
657
  of creation.
498
658
*/
499
659
 
500
 
int ArchiveEngine::doCreateTable(Session &,
 
660
int ArchiveEngine::doCreateTable(Session *,
 
661
                                 const char *table_name,
501
662
                                 Table& table_arg,
502
 
                                 const drizzled::TableIdentifier &identifier,
503
663
                                 drizzled::message::Table& proto)
504
664
{
 
665
  char name_buff[FN_REFLEN];
505
666
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
667
  azio_stream create_stream;            /* Archive file we are working with */
507
668
  uint64_t auto_increment_value;
508
669
  string serialized_proto;
509
670
 
511
672
 
512
673
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
513
674
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
675
    KEY *pos= table_arg.key_info+key;
 
676
    KEY_PART_INFO *key_part=     pos->key_part;
 
677
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
678
 
518
679
    for (; key_part != key_part_end; key_part++)
519
680
    {
521
682
 
522
683
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
684
      {
524
 
        return -1;
 
685
        error= -1;
 
686
        goto error;
525
687
      }
526
688
    }
527
689
  }
528
690
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
 
691
  /*
 
692
    We reuse name_buff since it is available.
 
693
  */
 
694
  fn_format(name_buff, table_name, "", ARZ,
 
695
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
531
696
 
532
697
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
698
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
699
             AZ_METHOD_BLOCK) == 0)
535
700
  {
536
701
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
702
    goto error2;
 
703
  }
 
704
 
 
705
  proto.SerializeToString(&serialized_proto);
 
706
 
 
707
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
708
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
 
709
    goto error2;
559
710
 
560
711
  if (proto.options().has_comment())
561
712
  {
562
713
    int write_length;
563
714
 
564
 
    write_length= azwrite_comment(create_stream.get(),
 
715
    write_length= azwrite_comment(&create_stream,
565
716
                                  proto.options().comment().c_str(),
566
717
                                  proto.options().comment().length());
567
718
 
568
719
    if (write_length < 0)
569
720
    {
570
721
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
 
722
      goto error2;
574
723
    }
575
724
  }
576
725
 
578
727
    Yes you need to do this, because the starting value
579
728
    for the autoincrement may not be zero.
580
729
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
 
730
  create_stream.auto_increment= auto_increment_value ?
582
731
    auto_increment_value - 1 : 0;
583
732
 
584
 
  if (azclose(create_stream.get()))
 
733
  if (azclose(&create_stream))
585
734
  {
586
735
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
736
    goto error2;
590
737
  }
591
738
 
592
739
  return(0);
 
740
 
 
741
error2:
 
742
  unlink(name_buff);
 
743
 
 
744
error:
 
745
  /* Return error number, if we got one */
 
746
  return(error ? error : -1);
593
747
}
594
748
 
595
749
/*
603
757
  /* We pack the row for writing */
604
758
  r_pack_length= pack_row(buf);
605
759
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
760
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
761
  if (written != r_pack_length)
608
762
  {
609
763
    return(-1);
623
777
 
624
778
uint32_t ha_archive::max_row_length(const unsigned char *)
625
779
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
780
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
781
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
782
 
629
783
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
784
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
785
       ptr != end ;
632
786
       ptr++)
633
787
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
788
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
789
  }
636
790
 
637
791
  return length;
646
800
    return(HA_ERR_OUT_OF_MEM);
647
801
 
648
802
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
803
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
804
  ptr= record_buffer->buffer + table->s->null_bytes;
651
805
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
806
  for (Field **field=table->field ; *field ; field++)
653
807
  {
654
808
    if (!((*field)->is_null()))
655
809
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
810
  }
657
811
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
812
  return((unsigned int) (ptr - record_buffer->buffer));
659
813
}
660
814
 
661
815
 
668
822
  for implementing start_bulk_insert() is that we could skip
669
823
  setting dirty to true each time.
670
824
*/
671
 
int ha_archive::doInsertRecord(unsigned char *buf)
 
825
int ha_archive::write_row(unsigned char *buf)
672
826
{
673
827
  int rc;
674
828
  unsigned char *read_buf= NULL;
675
829
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
830
  unsigned char *record=  table->record[0];
677
831
 
678
832
  if (share->crashed)
679
833
    return(HA_ERR_CRASHED_ON_USAGE);
680
834
 
681
 
  pthread_mutex_lock(&share->mutex());
 
835
  ha_statistic_increment(&SSV::ha_write_count);
 
836
  pthread_mutex_lock(&share->mutex);
682
837
 
683
838
  if (share->archive_write_open == false)
684
839
    if (init_archive_writer())
685
840
      return(HA_ERR_CRASHED_ON_USAGE);
686
841
 
687
842
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
843
  if (table->next_number_field && record == table->record[0])
689
844
  {
 
845
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
846
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
847
    temp_auto= table->next_number_field->val_int();
692
848
 
693
849
    /*
694
850
      We don't support decremening auto_increment. They make the performance
695
851
      just cry.
696
852
    */
697
853
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
854
        mkey->flags & HA_NOSAME)
699
855
    {
700
856
      rc= HA_ERR_FOUND_DUPP_KEY;
701
857
      goto error;
715
871
  share->rows_recorded++;
716
872
  rc= real_write_row(buf,  &(share->archive_write));
717
873
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
874
  pthread_mutex_unlock(&share->mutex);
719
875
  if (read_buf)
720
876
    free((unsigned char*) read_buf);
721
877
 
730
886
  *first_value= share->archive_write.auto_increment + 1;
731
887
}
732
888
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
889
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
890
int ha_archive::index_init(uint32_t keynr, bool)
735
891
{
736
892
  active_index= keynr;
737
893
  return(0);
743
899
  the optimizer that we have unique indexes, we scan
744
900
*/
745
901
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
746
 
                             uint32_t key_len, enum ha_rkey_function)
 
902
                             uint32_t key_len, enum ha_rkey_function find_flag)
 
903
{
 
904
  int rc;
 
905
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
906
  return(rc);
 
907
}
 
908
 
 
909
 
 
910
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
 
911
                               uint32_t key_len, enum ha_rkey_function)
747
912
{
748
913
  int rc;
749
914
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
915
  KEY *mkey= &table->s->key_info[index];
 
916
  current_k_offset= mkey->key_part->offset;
751
917
  current_key= key;
752
918
  current_key_len= key_len;
753
919
 
754
 
  rc= doStartTableScan(true);
 
920
  rc= rnd_init(true);
755
921
 
756
922
  if (rc)
757
923
    goto error;
795
961
  we assume the position will be set.
796
962
*/
797
963
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
964
int ha_archive::rnd_init(bool scan)
799
965
{
800
966
  if (share->crashed)
801
967
      return(HA_ERR_CRASHED_ON_USAGE);
832
998
/* Reallocate buffer if needed */
833
999
bool ha_archive::fix_rec_buff(unsigned int length)
834
1000
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
1001
  assert(record_buffer->buffer);
 
1002
 
 
1003
  if (length > record_buffer->length)
 
1004
  {
 
1005
    unsigned char *newptr;
 
1006
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
1007
      return(1);
 
1008
    record_buffer->buffer= newptr;
 
1009
    record_buffer->length= length;
 
1010
  }
 
1011
 
 
1012
  assert(length <= record_buffer->length);
 
1013
 
 
1014
  return(0);
838
1015
}
839
1016
 
840
1017
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
1029
  }
853
1030
 
854
1031
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
1032
  memcpy(record, ptr, table->getNullBytes());
 
1033
  ptr+= table->getNullBytes();
 
1034
  for (Field **field=table->field ; *field ; field++)
858
1035
  {
859
1036
    if (!((*field)->is_null()))
860
1037
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
1038
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
1039
    }
863
1040
  }
864
1041
  return(0);
889
1066
    return(HA_ERR_END_OF_FILE);
890
1067
  scan_rows--;
891
1068
 
892
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1069
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
893
1070
  current_position= aztell(&archive);
894
1071
  rc= get_row(&archive, buf);
895
1072
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
1073
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
1074
 
898
1075
  return(rc);
899
1076
}
907
1084
 
908
1085
void ha_archive::position(const unsigned char *)
909
1086
{
910
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1087
  my_store_ptr(ref, ref_length, current_position);
911
1088
  return;
912
1089
}
913
1090
 
921
1098
 
922
1099
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
923
1100
{
924
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
925
 
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
1101
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1102
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
926
1103
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
927
1104
    return(HA_ERR_CRASHED_ON_USAGE);
928
1105
  return(get_row(&archive, buf));
951
1128
int ha_archive::optimize()
952
1129
{
953
1130
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1131
  azio_stream writer;
 
1132
  char writer_filename[FN_REFLEN];
955
1133
 
956
1134
  init_archive_reader();
957
1135
 
971
1149
  azread_frm(&archive, proto_string);
972
1150
 
973
1151
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1152
  fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1153
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1154
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1155
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
1156
  {
979
1157
    free(proto_string);
980
1158
    return(HA_ERR_CRASHED_ON_USAGE);
981
1159
  }
982
1160
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
1161
  azwrite_frm(&writer, proto_string, archive.frm_length);
984
1162
 
985
1163
  /*
986
1164
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1182
    */
1005
1183
    if (!rc)
1006
1184
    {
 
1185
      uint64_t x;
1007
1186
      uint64_t rows_restored;
1008
1187
      share->rows_recorded= 0;
1009
1188
      stats.auto_increment_value= 1;
1011
1190
 
1012
1191
      rows_restored= archive.rows;
1013
1192
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1193
      for (x= 0; x < rows_restored ; x++)
1015
1194
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1195
        rc= get_row(&archive, table->record[0]);
1017
1196
 
1018
1197
        if (rc != 0)
1019
1198
          break;
1020
1199
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1200
        real_write_row(table->record[0], &writer);
1022
1201
        /*
1023
1202
          Long term it should be possible to optimize this so that
1024
1203
          it is not called on each row.
1025
1204
        */
1026
 
        if (getTable()->found_next_number_field)
 
1205
        if (table->found_next_number_field)
1027
1206
        {
1028
 
          Field *field= getTable()->found_next_number_field;
 
1207
          Field *field= table->found_next_number_field;
1029
1208
 
1030
1209
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
1210
          field->setReadSet();
1032
1211
 
1033
1212
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1213
            (uint64_t) field->val_int(table->record[0] +
 
1214
                                       field->offset(table->record[0]));
1036
1215
          if (share->archive_write.auto_increment < auto_value)
1037
1216
            stats.auto_increment_value=
1038
1217
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1218
        }
1040
1219
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1220
      share->rows_recorded= (ha_rows)writer.rows;
1042
1221
    }
1043
1222
 
1044
1223
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1226
    }
1048
1227
  }
1049
1228
 
1050
 
  azclose(writer.get());
 
1229
  azclose(&writer);
1051
1230
  share->dirty= false;
1052
1231
 
1053
1232
  azclose(&archive);
1054
1233
 
1055
1234
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
 
1235
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
1057
1236
 
1058
1237
  free(proto_string);
1059
1238
  return(rc);
1060
1239
error:
1061
1240
  free(proto_string);
1062
 
  azclose(writer.get());
 
1241
  azclose(&writer);
1063
1242
 
1064
1243
  return(rc);
1065
1244
}
1115
1294
    If dirty, we lock, and then reset/flush the data.
1116
1295
    I found that just calling azflush() doesn't always work.
1117
1296
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1297
  pthread_mutex_lock(&share->mutex);
1119
1298
  if (share->dirty == true)
1120
1299
  {
1121
1300
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1313
    cause the number to be inaccurate.
1135
1314
  */
1136
1315
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1316
  pthread_mutex_unlock(&share->mutex);
1138
1317
 
1139
1318
  scan_rows= stats.records;
1140
1319
  stats.deleted= 0;
1144
1323
  {
1145
1324
    struct stat file_stat;  // Stat information for the data file
1146
1325
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1326
    stat(share->data_file_name, &file_stat);
1148
1327
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1328
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1329
    stats.data_file_length= file_stat.st_size;
1151
1330
    stats.create_time= file_stat.st_ctime;
1152
1331
    stats.update_time= file_stat.st_mtime;
1158
1337
  if (flag & HA_STATUS_AUTO)
1159
1338
  {
1160
1339
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1340
    pthread_mutex_lock(&share->mutex);
1162
1341
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1342
    pthread_mutex_unlock(&share->mutex);
1164
1343
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1344
  }
1166
1345
 
1170
1349
 
1171
1350
/*
1172
1351
  This method tells us that a bulk insert operation is about to occur. We set
1173
 
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1352
  a flag which will keep write_row from saying that its data is dirty. This in
1174
1353
  turn will keep selects from causing a sync to occur.
1175
1354
  Basically, yet another optimizations to keep compression working well.
1176
1355
*/
1211
1390
{
1212
1391
  int rc= 0;
1213
1392
  const char *old_proc_info;
 
1393
  uint64_t x;
1214
1394
 
1215
1395
  old_proc_info= get_session_proc_info(session);
1216
1396
  set_session_proc_info(session, "Checking table");
1217
1397
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1398
  pthread_mutex_lock(&share->mutex);
1219
1399
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1400
  pthread_mutex_unlock(&share->mutex);
1221
1401
 
1222
1402
  /*
1223
1403
    Now we will rewind the archive file so that we are positioned at the
1226
1406
  init_archive_reader();
1227
1407
  azflush(&archive, Z_SYNC_FLUSH);
1228
1408
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1409
  for (x= 0; x < share->archive_write.rows; x++)
1230
1410
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1411
    rc= get_row(&archive, table->record[0]);
1232
1412
 
1233
1413
    if (rc != 0)
1234
1414
      break;
1247
1427
  }
1248
1428
}
1249
1429
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1251
 
{
1252
 
  int error= 0;
1253
 
 
1254
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1255
 
  {
1256
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1257
 
    {
1258
 
      if ((error=errno) != ENOENT)
1259
 
        break;
1260
 
      error= 0;
1261
 
    }
1262
 
  }
1263
 
 
1264
 
  return error;
1265
 
}
1266
 
 
1267
 
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const TableIdentifier &identifier)
1269
 
{
1270
 
  string proto_path(identifier.getPath());
1271
 
  proto_path.append(ARZ);
1272
 
 
1273
 
  if (access(proto_path.c_str(), F_OK))
1274
 
  {
1275
 
    return false;
1276
 
  }
1277
 
 
1278
 
  return true;
1279
 
}
1280
 
 
1281
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::SchemaIdentifier &schema_identifier,
1283
 
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
1284
 
{
1285
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
 
 
1287
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1288
 
       entry_iter != entries.end(); ++entry_iter)
1289
 
  {
1290
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
 
    const string *filename= &entry->filename;
1292
 
 
1293
 
    assert(filename->size());
1294
 
 
1295
 
    const char *ext= strchr(filename->c_str(), '.');
1296
 
 
1297
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1299
 
    {  }
1300
 
    else
1301
 
    {
1302
 
      char uname[NAME_LEN + 1];
1303
 
      uint32_t file_name_len;
1304
 
 
1305
 
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
 
      // TODO: Remove need for memory copy here
1307
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
 
 
1309
 
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1310
 
    }
1311
 
  }
1312
 
}
 
1430
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1431
{
 
1432
  archive_record_buffer *r;
 
1433
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1434
  {
 
1435
    return(NULL);
 
1436
  }
 
1437
  r->length= (int)length;
 
1438
 
 
1439
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1440
  {
 
1441
    free((char*) r);
 
1442
    return(NULL);
 
1443
  }
 
1444
 
 
1445
  return(r);
 
1446
}
 
1447
 
 
1448
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1449
{
 
1450
  free((char*) r->buffer);
 
1451
  free((char*) r);
 
1452
  return;
 
1453
}
 
1454
 
 
1455
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
 
1456
  PLUGIN_VAR_NOCMDOPT,
 
1457
  "Whether or not to use asynchronous IO.",
 
1458
  NULL, NULL, true);
 
1459
 
 
1460
static drizzle_sys_var* archive_system_variables[]= {
 
1461
  DRIZZLE_SYSVAR(aio),
 
1462
  NULL
 
1463
};
 
1464
 
 
1465
DRIZZLE_DECLARE_PLUGIN
 
1466
{
 
1467
  DRIZZLE_VERSION_ID,
 
1468
  "ARCHIVE",
 
1469
  "3.5",
 
1470
  "Brian Aker, MySQL AB",
 
1471
  "Archive storage engine",
 
1472
  PLUGIN_LICENSE_GPL,
 
1473
  archive_db_init, /* Plugin Init */
 
1474
  archive_db_done, /* Plugin Deinit */
 
1475
  NULL,                       /* status variables                */
 
1476
  archive_system_variables,   /* system variables                */
 
1477
  NULL                        /* config options                  */
 
1478
}
 
1479
DRIZZLE_DECLARE_PLUGIN_END;
 
1480