~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

Blackhole, CSV, Pool of Threads,Single Thread, Multi Thread.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
2
 
   Copyright (C) 2010 Brian Aker
3
2
 
4
3
  This program is free software; you can redistribute it and/or modify
5
4
  it under the terms of the GNU General Public License as published by
12
11
 
13
12
  You should have received a copy of the GNU General Public License
14
13
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
16
 
 
17
 
 
18
 
#include <config.h>
19
 
 
20
 
#include <plugin/archive/archive_engine.h>
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
#include "drizzled/server_includes.h"
 
18
#include "drizzled/field.h"
 
19
#include "drizzled/field/blob.h"
 
20
#include "drizzled/field/timestamp.h"
 
21
#include "plugin/myisam/myisam.h"
 
22
#include "drizzled/table.h"
 
23
#include "drizzled/session.h"
 
24
#include <mysys/my_dir.h>
 
25
 
 
26
#include "ha_archive.h"
 
27
 
 
28
#include <stdio.h>
 
29
#include <string>
 
30
#include <map>
23
31
 
24
32
using namespace std;
25
 
using namespace drizzled;
26
 
 
27
33
 
28
34
/*
29
35
  First, if you want to understand storage engines you should look at
93
99
    -Brian
94
100
*/
95
101
 
96
 
/* When the engine starts up set the first version */
97
 
static uint64_t global_version= 1;
98
 
 
99
 
// We use this to find out the state of the archive aio option.
100
 
extern bool archive_aio_state(void);
 
102
/* Variables for archive share methods */
 
103
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
 
104
 
 
105
static unsigned int global_version;
 
106
 
 
107
/* The file extension */
 
108
#define ARZ ".arz"               // The data file
 
109
#define ARN ".ARN"               // Files used during an optimize call
 
110
 
 
111
 
 
112
 
 
113
static bool archive_use_aio= false;
101
114
 
102
115
/*
103
116
  Number of rows that will force a bulk insert.
109
122
*/
110
123
#define ARCHIVE_ROW_HEADER_SIZE 4
111
124
 
 
125
/*
 
126
  We just implement one additional file extension.
 
127
*/
 
128
static const char *ha_archive_exts[] = {
 
129
  ARZ,
 
130
  NULL
 
131
};
 
132
 
 
133
class ArchiveEngine : public drizzled::plugin::StorageEngine
 
134
{
 
135
  typedef std::map<string, ArchiveShare*> ArchiveMap;
 
136
  ArchiveMap archive_open_tables;
 
137
 
 
138
public:
 
139
  ArchiveEngine(const string &name_arg)
 
140
   : drizzled::plugin::StorageEngine(name_arg,
 
141
                                     HTON_FILE_BASED |
 
142
                                     HTON_STATS_RECORDS_IS_EXACT |
 
143
                                     HTON_HAS_RECORDS |
 
144
                                     HTON_HAS_DATA_DICTIONARY),
 
145
     archive_open_tables()
 
146
  {
 
147
    table_definition_ext= ARZ;
 
148
  }
 
149
 
 
150
  virtual Cursor *create(TableShare &table,
 
151
                          MEM_ROOT *mem_root)
 
152
  {
 
153
    return new (mem_root) ha_archive(*this, table);
 
154
  }
 
155
 
 
156
  const char **bas_ext() const {
 
157
    return ha_archive_exts;
 
158
  }
 
159
 
 
160
  int doCreateTable(Session *session, const char *table_name,
 
161
                    Table& table_arg,
 
162
                    drizzled::message::Table& proto);
 
163
 
 
164
  int doGetTableDefinition(Session& session,
 
165
                           const char* path,
 
166
                           const char *db,
 
167
                           const char *table_name,
 
168
                           const bool is_tmp,
 
169
                           drizzled::message::Table *table_proto);
 
170
 
 
171
  void doGetTableNames(CachedDirectory &directory, string& , set<string>& set_of_names);
 
172
 
 
173
  int doDropTable(Session&, const string table_path);
 
174
  ArchiveShare *findOpenTable(const string table_name);
 
175
  void addOpenTable(const string &table_name, ArchiveShare *);
 
176
  void deleteOpenTable(const string &table_name);
 
177
 
 
178
  uint32_t max_supported_keys()          const { return 1; }
 
179
  uint32_t max_supported_key_length()    const { return sizeof(uint64_t); }
 
180
  uint32_t max_supported_key_part_length() const { return sizeof(uint64_t); }
 
181
 
 
182
  uint32_t index_flags(enum  ha_key_alg) const
 
183
  {
 
184
    return HA_ONLY_WHOLE_INDEX;
 
185
  }
 
186
};
 
187
 
112
188
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
113
189
{
114
190
  ArchiveMap::iterator find_iter=
131
207
}
132
208
 
133
209
 
134
 
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
135
 
{
136
 
  string new_path(identifier.getPath());
 
210
void ArchiveEngine::doGetTableNames(CachedDirectory &directory, 
 
211
                                    string&, 
 
212
                                    set<string>& set_of_names)
 
213
{
 
214
  CachedDirectory::Entries entries= directory.getEntries();
 
215
 
 
216
  for (CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
217
       entry_iter != entries.end(); ++entry_iter)
 
218
  {
 
219
    CachedDirectory::Entry *entry= *entry_iter;
 
220
    string *filename= &entry->filename;
 
221
 
 
222
    assert(filename->size());
 
223
 
 
224
    const char *ext= strchr(filename->c_str(), '.');
 
225
 
 
226
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
227
        is_prefix(filename->c_str(), TMP_FILE_PREFIX))
 
228
    {  }
 
229
    else
 
230
    {
 
231
      char uname[NAME_LEN + 1];
 
232
      uint32_t file_name_len;
 
233
 
 
234
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
235
      // TODO: Remove need for memory copy here
 
236
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
237
      set_of_names.insert(uname);
 
238
    }
 
239
  }
 
240
}
 
241
 
 
242
 
 
243
int ArchiveEngine::doDropTable(Session&,
 
244
                               const string table_path)
 
245
{
 
246
  string new_path(table_path);
137
247
 
138
248
  new_path+= ARZ;
139
249
 
141
251
 
142
252
  if (error != 0)
143
253
  {
144
 
    error= errno= errno;
 
254
    error= my_errno= errno;
145
255
  }
146
256
 
147
257
  return error;
148
258
}
149
259
 
150
260
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const identifier::Table &identifier,
152
 
                                        drizzled::message::Table &table_proto)
 
261
                                        const char* path,
 
262
                                        const char *,
 
263
                                        const char *,
 
264
                                        const bool,
 
265
                                        drizzled::message::Table *table_proto)
153
266
{
154
267
  struct stat stat_info;
155
268
  int error= ENOENT;
156
269
  string proto_path;
157
270
 
158
271
  proto_path.reserve(FN_REFLEN);
159
 
  proto_path.assign(identifier.getPath());
 
272
  proto_path.assign(path);
160
273
 
161
274
  proto_path.append(ARZ);
162
275
 
165
278
  else
166
279
    error= EEXIST;
167
280
 
 
281
  if (table_proto)
168
282
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
283
    azio_stream proto_stream;
170
284
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
285
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
286
      return HA_ERR_CRASHED_ON_USAGE;
173
287
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
288
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
289
    if (proto_string == NULL)
176
290
    {
177
 
      azclose(proto_stream.get());
 
291
      azclose(&proto_stream);
178
292
      return ENOMEM;
179
293
    }
180
294
 
181
 
    azread_frm(proto_stream.get(), proto_string);
 
295
    azread_frm(&proto_stream, proto_string);
182
296
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
297
    if (table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
298
      error= HA_ERR_CRASHED_ON_USAGE;
185
299
 
186
 
    azclose(proto_stream.get());
 
300
    azclose(&proto_stream);
187
301
    free(proto_string);
188
302
  }
189
303
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
304
  return error;
197
305
}
198
306
 
 
307
static ArchiveEngine *archive_engine= NULL;
 
308
 
 
309
/*
 
310
  Initialize the archive Cursor.
 
311
 
 
312
  SYNOPSIS
 
313
    archive_db_init()
 
314
    void *
 
315
 
 
316
  RETURN
 
317
    false       OK
 
318
    true        Error
 
319
*/
 
320
 
 
321
static int archive_db_init(drizzled::plugin::Registry &registry)
 
322
{
 
323
 
 
324
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
 
325
  archive_engine= new ArchiveEngine("ARCHIVE");
 
326
  registry.add(archive_engine);
 
327
 
 
328
  /* When the engine starts up set the first version */
 
329
  global_version= 1;
 
330
 
 
331
  return false;
 
332
}
 
333
 
 
334
/*
 
335
  Release the archive Cursor.
 
336
 
 
337
  SYNOPSIS
 
338
    archive_db_done()
 
339
    void
 
340
 
 
341
  RETURN
 
342
    false       OK
 
343
*/
 
344
 
 
345
static int archive_db_done(drizzled::plugin::Registry &registry)
 
346
{
 
347
  registry.remove(archive_engine);
 
348
  delete archive_engine;
 
349
 
 
350
  pthread_mutex_destroy(&archive_mutex);
 
351
 
 
352
  return 0;
 
353
}
 
354
 
199
355
 
200
356
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
 
357
                       TableShare &table_arg)
202
358
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
203
359
{
204
360
  /* Set our original buffer from pre-allocated memory */
205
361
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
206
362
 
207
363
  /* The size of the offset value we will use for position() */
208
 
  ref_length= sizeof(internal::my_off_t);
 
364
  ref_length= sizeof(my_off_t);
209
365
  archive_reader_open= false;
210
366
}
211
367
 
236
392
{
237
393
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
394
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
 
395
  fn_format(data_file_name, table_name.c_str(), "",
 
396
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
241
397
  /*
242
398
    We will use this lock for rows.
243
399
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
 
400
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
245
401
}
246
402
 
247
403
ArchiveShare::~ArchiveShare()
248
404
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
 
405
  thr_lock_delete(&lock);
 
406
  pthread_mutex_destroy(&mutex);
251
407
  /*
252
408
    We need to make sure we don't reset the crashed state.
253
409
    If we open a crashed file, wee need to close it as crashed unless
261
417
 
262
418
bool ArchiveShare::prime(uint64_t *auto_increment)
263
419
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
420
  azio_stream archive_tmp;
265
421
 
266
422
  /*
267
423
    We read the meta file, but do not mark it dirty. Since we are not
269
425
    anything but reading... open it for write and we will generate null
270
426
    compression writes).
271
427
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
428
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
429
               AZ_METHOD_BLOCK)))
274
430
    return false;
275
431
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
 
432
  *auto_increment= archive_tmp.auto_increment + 1;
 
433
  rows_recorded= (ha_rows)archive_tmp.rows;
 
434
  crashed= archive_tmp.dirty;
279
435
  if (version < global_version)
280
436
  {
281
437
    version_rows= rows_recorded;
282
438
    version= global_version;
283
439
  }
284
 
  azclose(archive_tmp.get());
 
440
  azclose(&archive_tmp);
285
441
 
286
442
  return true;
287
443
}
296
452
*/
297
453
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
298
454
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
 
455
  pthread_mutex_lock(&archive_mutex);
 
456
 
 
457
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
458
  share= a_engine->findOpenTable(table_name);
304
459
 
305
460
  if (!share)
308
463
 
309
464
    if (share == NULL)
310
465
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
466
      pthread_mutex_unlock(&archive_mutex);
312
467
      *rc= HA_ERR_OUT_OF_MEM;
313
468
      return(NULL);
314
469
    }
315
470
 
316
471
    if (share->prime(&stats.auto_increment_value) == false)
317
472
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
473
      pthread_mutex_unlock(&archive_mutex);
319
474
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
475
      delete share;
321
476
 
323
478
    }
324
479
 
325
480
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
481
    thr_lock_init(&share->lock);
327
482
  }
328
483
  share->use_count++;
329
484
 
330
485
  if (share->crashed)
331
486
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
487
  pthread_mutex_unlock(&archive_mutex);
333
488
 
334
489
  return(share);
335
490
}
341
496
*/
342
497
int ha_archive::free_share()
343
498
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
345
 
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
499
  pthread_mutex_lock(&archive_mutex);
347
500
  if (!--share->use_count)
348
501
  {
 
502
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
503
    a_engine->deleteOpenTable(share->table_name);
350
504
    delete share;
351
505
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
506
  pthread_mutex_unlock(&archive_mutex);
353
507
 
354
508
  return 0;
355
509
}
361
515
    a gzip file that can be both read and written we keep a writer open
362
516
    that is shared amoung all open tables.
363
517
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
518
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
519
               O_RDWR, AZ_METHOD_BLOCK)))
366
520
  {
367
521
    share->crashed= true;
387
541
  {
388
542
    az_method method;
389
543
 
390
 
    if (archive_aio_state())
 
544
    switch (archive_use_aio)
391
545
    {
 
546
    case false:
 
547
      method= AZ_METHOD_BLOCK;
 
548
      break;
 
549
    case true:
392
550
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
551
      break;
 
552
    default:
396
553
      method= AZ_METHOD_BLOCK;
397
554
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
555
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
556
                 method)))
400
557
    {
401
558
      share->crashed= true;
413
570
  Init out lock.
414
571
  We open the file we will read from.
415
572
*/
416
 
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
 
573
int ha_archive::open(const char *name, int, uint32_t)
417
574
{
418
575
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
 
576
  share= get_share(name, &rc);
420
577
 
421
578
  /** 
422
579
    We either fix it ourselves, or we just take it offline 
437
594
 
438
595
  assert(share);
439
596
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
 
597
  record_buffer= create_record_buffer(table->s->reclength +
 
598
                                      ARCHIVE_ROW_HEADER_SIZE);
 
599
 
 
600
  if (!record_buffer)
 
601
  {
 
602
    free_share();
 
603
    return(HA_ERR_OUT_OF_MEM);
 
604
  }
 
605
 
 
606
  thr_lock_data_init(&share->lock, &lock, NULL);
443
607
 
444
608
  return(rc);
445
609
}
446
610
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
452
 
}
453
 
 
454
611
 
455
612
/*
456
613
  Closes the file.
473
630
{
474
631
  int rc= 0;
475
632
 
476
 
  record_buffer.clear();
 
633
  destroy_record_buffer(record_buffer);
477
634
 
478
635
  /* First close stream */
479
636
  if (archive_reader_open == true)
497
654
  of creation.
498
655
*/
499
656
 
500
 
int ArchiveEngine::doCreateTable(Session &,
 
657
int ArchiveEngine::doCreateTable(Session *,
 
658
                                 const char *table_name,
501
659
                                 Table& table_arg,
502
 
                                 const drizzled::identifier::Table &identifier,
503
660
                                 drizzled::message::Table& proto)
504
661
{
 
662
  char name_buff[FN_REFLEN];
505
663
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
664
  azio_stream create_stream;            /* Archive file we are working with */
507
665
  uint64_t auto_increment_value;
508
666
  string serialized_proto;
509
667
 
511
669
 
512
670
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
513
671
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
672
    KEY *pos= table_arg.key_info+key;
 
673
    KEY_PART_INFO *key_part=     pos->key_part;
 
674
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
675
 
518
676
    for (; key_part != key_part_end; key_part++)
519
677
    {
521
679
 
522
680
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
681
      {
524
 
        return -1;
 
682
        error= -1;
 
683
        goto error;
525
684
      }
526
685
    }
527
686
  }
528
687
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
 
688
  /*
 
689
    We reuse name_buff since it is available.
 
690
  */
 
691
  fn_format(name_buff, table_name, "", ARZ,
 
692
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
531
693
 
532
 
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
694
  my_errno= 0;
 
695
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
696
             AZ_METHOD_BLOCK) == 0)
535
697
  {
536
698
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
699
    goto error2;
 
700
  }
 
701
 
 
702
  proto.SerializeToString(&serialized_proto);
 
703
 
 
704
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
705
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
 
706
    goto error2;
559
707
 
560
708
  if (proto.options().has_comment())
561
709
  {
562
710
    int write_length;
563
711
 
564
 
    write_length= azwrite_comment(create_stream.get(),
 
712
    write_length= azwrite_comment(&create_stream,
565
713
                                  proto.options().comment().c_str(),
566
714
                                  proto.options().comment().length());
567
715
 
568
716
    if (write_length < 0)
569
717
    {
570
718
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
 
719
      goto error2;
574
720
    }
575
721
  }
576
722
 
578
724
    Yes you need to do this, because the starting value
579
725
    for the autoincrement may not be zero.
580
726
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
 
727
  create_stream.auto_increment= auto_increment_value ?
582
728
    auto_increment_value - 1 : 0;
583
729
 
584
 
  if (azclose(create_stream.get()))
 
730
  if (azclose(&create_stream))
585
731
  {
586
732
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
733
    goto error2;
590
734
  }
591
735
 
592
736
  return(0);
 
737
 
 
738
error2:
 
739
  unlink(name_buff);
 
740
 
 
741
error:
 
742
  /* Return error number, if we got one */
 
743
  return(error ? error : -1);
593
744
}
594
745
 
595
746
/*
603
754
  /* We pack the row for writing */
604
755
  r_pack_length= pack_row(buf);
605
756
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
757
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
758
  if (written != r_pack_length)
608
759
  {
609
760
    return(-1);
623
774
 
624
775
uint32_t ha_archive::max_row_length(const unsigned char *)
625
776
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
777
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
778
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
779
 
629
780
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
781
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
782
       ptr != end ;
632
783
       ptr++)
633
784
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
785
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
786
  }
636
787
 
637
788
  return length;
646
797
    return(HA_ERR_OUT_OF_MEM);
647
798
 
648
799
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
800
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
801
  ptr= record_buffer->buffer + table->s->null_bytes;
651
802
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
803
  for (Field **field=table->field ; *field ; field++)
653
804
  {
654
805
    if (!((*field)->is_null()))
655
806
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
807
  }
657
808
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
809
  return((unsigned int) (ptr - record_buffer->buffer));
659
810
}
660
811
 
661
812
 
668
819
  for implementing start_bulk_insert() is that we could skip
669
820
  setting dirty to true each time.
670
821
*/
671
 
int ha_archive::doInsertRecord(unsigned char *buf)
 
822
int ha_archive::write_row(unsigned char *buf)
672
823
{
673
824
  int rc;
674
825
  unsigned char *read_buf= NULL;
675
826
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
827
  unsigned char *record=  table->record[0];
677
828
 
678
829
  if (share->crashed)
679
830
    return(HA_ERR_CRASHED_ON_USAGE);
680
831
 
681
 
  pthread_mutex_lock(&share->mutex());
 
832
  ha_statistic_increment(&SSV::ha_write_count);
 
833
  pthread_mutex_lock(&share->mutex);
682
834
 
683
835
  if (share->archive_write_open == false)
684
836
    if (init_archive_writer())
685
837
      return(HA_ERR_CRASHED_ON_USAGE);
686
838
 
687
839
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
840
  if (table->next_number_field && record == table->record[0])
689
841
  {
 
842
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
843
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
844
    temp_auto= table->next_number_field->val_int();
692
845
 
693
846
    /*
694
847
      We don't support decremening auto_increment. They make the performance
695
848
      just cry.
696
849
    */
697
850
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
851
        mkey->flags & HA_NOSAME)
699
852
    {
700
853
      rc= HA_ERR_FOUND_DUPP_KEY;
701
854
      goto error;
715
868
  share->rows_recorded++;
716
869
  rc= real_write_row(buf,  &(share->archive_write));
717
870
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
871
  pthread_mutex_unlock(&share->mutex);
719
872
  if (read_buf)
720
873
    free((unsigned char*) read_buf);
721
874
 
730
883
  *first_value= share->archive_write.auto_increment + 1;
731
884
}
732
885
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
886
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
887
int ha_archive::index_init(uint32_t keynr, bool)
735
888
{
736
889
  active_index= keynr;
737
890
  return(0);
743
896
  the optimizer that we have unique indexes, we scan
744
897
*/
745
898
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
746
 
                             uint32_t key_len, enum ha_rkey_function)
 
899
                             uint32_t key_len, enum ha_rkey_function find_flag)
 
900
{
 
901
  int rc;
 
902
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
903
  return(rc);
 
904
}
 
905
 
 
906
 
 
907
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
 
908
                               uint32_t key_len, enum ha_rkey_function)
747
909
{
748
910
  int rc;
749
911
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
912
  KEY *mkey= &table->s->key_info[index];
 
913
  current_k_offset= mkey->key_part->offset;
751
914
  current_key= key;
752
915
  current_key_len= key_len;
753
916
 
754
 
  rc= doStartTableScan(true);
 
917
  rc= rnd_init(true);
755
918
 
756
919
  if (rc)
757
920
    goto error;
795
958
  we assume the position will be set.
796
959
*/
797
960
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
961
int ha_archive::rnd_init(bool scan)
799
962
{
800
963
  if (share->crashed)
801
964
      return(HA_ERR_CRASHED_ON_USAGE);
832
995
/* Reallocate buffer if needed */
833
996
bool ha_archive::fix_rec_buff(unsigned int length)
834
997
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
998
  assert(record_buffer->buffer);
 
999
 
 
1000
  if (length > record_buffer->length)
 
1001
  {
 
1002
    unsigned char *newptr;
 
1003
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
1004
      return(1);
 
1005
    record_buffer->buffer= newptr;
 
1006
    record_buffer->length= length;
 
1007
  }
 
1008
 
 
1009
  assert(length <= record_buffer->length);
 
1010
 
 
1011
  return(0);
838
1012
}
839
1013
 
840
1014
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
1026
  }
853
1027
 
854
1028
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
1029
  memcpy(record, ptr, table->getNullBytes());
 
1030
  ptr+= table->getNullBytes();
 
1031
  for (Field **field=table->field ; *field ; field++)
858
1032
  {
859
1033
    if (!((*field)->is_null()))
860
1034
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
1035
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
1036
    }
863
1037
  }
864
1038
  return(0);
889
1063
    return(HA_ERR_END_OF_FILE);
890
1064
  scan_rows--;
891
1065
 
892
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1066
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
893
1067
  current_position= aztell(&archive);
894
1068
  rc= get_row(&archive, buf);
895
1069
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
1070
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
1071
 
898
1072
  return(rc);
899
1073
}
907
1081
 
908
1082
void ha_archive::position(const unsigned char *)
909
1083
{
910
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1084
  my_store_ptr(ref, ref_length, current_position);
911
1085
  return;
912
1086
}
913
1087
 
921
1095
 
922
1096
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
923
1097
{
924
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
925
 
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
1098
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1099
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
926
1100
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
927
1101
    return(HA_ERR_CRASHED_ON_USAGE);
928
1102
  return(get_row(&archive, buf));
951
1125
int ha_archive::optimize()
952
1126
{
953
1127
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1128
  azio_stream writer;
 
1129
  char writer_filename[FN_REFLEN];
955
1130
 
956
1131
  init_archive_reader();
957
1132
 
971
1146
  azread_frm(&archive, proto_string);
972
1147
 
973
1148
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1149
  fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1150
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1151
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1152
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
1153
  {
979
1154
    free(proto_string);
980
1155
    return(HA_ERR_CRASHED_ON_USAGE);
981
1156
  }
982
1157
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
1158
  azwrite_frm(&writer, proto_string, archive.frm_length);
984
1159
 
985
1160
  /*
986
1161
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1179
    */
1005
1180
    if (!rc)
1006
1181
    {
 
1182
      uint64_t x;
1007
1183
      uint64_t rows_restored;
1008
1184
      share->rows_recorded= 0;
1009
1185
      stats.auto_increment_value= 1;
1011
1187
 
1012
1188
      rows_restored= archive.rows;
1013
1189
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1190
      for (x= 0; x < rows_restored ; x++)
1015
1191
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1192
        rc= get_row(&archive, table->record[0]);
1017
1193
 
1018
1194
        if (rc != 0)
1019
1195
          break;
1020
1196
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1197
        real_write_row(table->record[0], &writer);
1022
1198
        /*
1023
1199
          Long term it should be possible to optimize this so that
1024
1200
          it is not called on each row.
1025
1201
        */
1026
 
        if (getTable()->found_next_number_field)
 
1202
        if (table->found_next_number_field)
1027
1203
        {
1028
 
          Field *field= getTable()->found_next_number_field;
 
1204
          Field *field= table->found_next_number_field;
1029
1205
 
1030
1206
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
1207
          field->setReadSet();
1032
1208
 
1033
1209
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1210
            (uint64_t) field->val_int(table->record[0] +
 
1211
                                       field->offset(table->record[0]));
1036
1212
          if (share->archive_write.auto_increment < auto_value)
1037
1213
            stats.auto_increment_value=
1038
1214
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1215
        }
1040
1216
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1217
      share->rows_recorded= (ha_rows)writer.rows;
1042
1218
    }
1043
1219
 
1044
1220
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1223
    }
1048
1224
  }
1049
1225
 
1050
 
  azclose(writer.get());
 
1226
  azclose(&writer);
1051
1227
  share->dirty= false;
1052
1228
 
1053
1229
  azclose(&archive);
1054
1230
 
1055
1231
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
 
1232
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
1057
1233
 
1058
1234
  free(proto_string);
1059
1235
  return(rc);
1060
1236
error:
1061
1237
  free(proto_string);
1062
 
  azclose(writer.get());
 
1238
  azclose(&writer);
1063
1239
 
1064
1240
  return(rc);
1065
1241
}
1084
1260
 
1085
1261
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
1262
         lock_type <= TL_WRITE)
1087
 
        && ! session->doing_tablespace_operation())
 
1263
        && !session_tablespace_op(session))
1088
1264
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1265
 
1090
1266
    /*
1115
1291
    If dirty, we lock, and then reset/flush the data.
1116
1292
    I found that just calling azflush() doesn't always work.
1117
1293
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1294
  pthread_mutex_lock(&share->mutex);
1119
1295
  if (share->dirty == true)
1120
1296
  {
1121
1297
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1310
    cause the number to be inaccurate.
1135
1311
  */
1136
1312
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1313
  pthread_mutex_unlock(&share->mutex);
1138
1314
 
1139
1315
  scan_rows= stats.records;
1140
1316
  stats.deleted= 0;
1144
1320
  {
1145
1321
    struct stat file_stat;  // Stat information for the data file
1146
1322
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1323
    stat(share->data_file_name, &file_stat);
1148
1324
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1325
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1326
    stats.data_file_length= file_stat.st_size;
1151
1327
    stats.create_time= file_stat.st_ctime;
1152
1328
    stats.update_time= file_stat.st_mtime;
1158
1334
  if (flag & HA_STATUS_AUTO)
1159
1335
  {
1160
1336
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1337
    pthread_mutex_lock(&share->mutex);
1162
1338
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1339
    pthread_mutex_unlock(&share->mutex);
1164
1340
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1341
  }
1166
1342
 
1170
1346
 
1171
1347
/*
1172
1348
  This method tells us that a bulk insert operation is about to occur. We set
1173
 
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1349
  a flag which will keep write_row from saying that its data is dirty. This in
1174
1350
  turn will keep selects from causing a sync to occur.
1175
1351
  Basically, yet another optimizations to keep compression working well.
1176
1352
*/
1211
1387
{
1212
1388
  int rc= 0;
1213
1389
  const char *old_proc_info;
 
1390
  uint64_t x;
1214
1391
 
1215
 
  old_proc_info= session->get_proc_info();
1216
 
  session->set_proc_info("Checking table");
 
1392
  old_proc_info= get_session_proc_info(session);
 
1393
  set_session_proc_info(session, "Checking table");
1217
1394
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1395
  pthread_mutex_lock(&share->mutex);
1219
1396
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1397
  pthread_mutex_unlock(&share->mutex);
1221
1398
 
1222
1399
  /*
1223
1400
    Now we will rewind the archive file so that we are positioned at the
1226
1403
  init_archive_reader();
1227
1404
  azflush(&archive, Z_SYNC_FLUSH);
1228
1405
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1406
  for (x= 0; x < share->archive_write.rows; x++)
1230
1407
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1408
    rc= get_row(&archive, table->record[0]);
1232
1409
 
1233
1410
    if (rc != 0)
1234
1411
      break;
1235
1412
  }
1236
1413
 
1237
 
  session->set_proc_info(old_proc_info);
 
1414
  set_session_proc_info(session, old_proc_info);
1238
1415
 
1239
1416
  if ((rc && rc != HA_ERR_END_OF_FILE))
1240
1417
  {
1247
1424
  }
1248
1425
}
1249
1426
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
1251
 
{
1252
 
  int error= 0;
1253
 
 
1254
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1255
 
  {
1256
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1257
 
    {
1258
 
      if ((error=errno) != ENOENT)
1259
 
        break;
1260
 
      error= 0;
1261
 
    }
1262
 
  }
1263
 
 
1264
 
  return error;
1265
 
}
1266
 
 
1267
 
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const identifier::Table &identifier)
1269
 
{
1270
 
  string proto_path(identifier.getPath());
1271
 
  proto_path.append(ARZ);
1272
 
 
1273
 
  if (access(proto_path.c_str(), F_OK))
1274
 
  {
1275
 
    return false;
1276
 
  }
1277
 
 
1278
 
  return true;
1279
 
}
1280
 
 
1281
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::identifier::Schema &schema_identifier,
1283
 
                                          drizzled::identifier::Table::vector &set_of_identifiers)
1284
 
{
1285
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
 
 
1287
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1288
 
       entry_iter != entries.end(); ++entry_iter)
1289
 
  {
1290
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
 
    const string *filename= &entry->filename;
1292
 
 
1293
 
    assert(filename->size());
1294
 
 
1295
 
    const char *ext= strchr(filename->c_str(), '.');
1296
 
 
1297
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1299
 
    {  }
1300
 
    else
1301
 
    {
1302
 
      char uname[NAME_LEN + 1];
1303
 
      uint32_t file_name_len;
1304
 
 
1305
 
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
 
      // TODO: Remove need for memory copy here
1307
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
 
 
1309
 
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
1310
 
    }
1311
 
  }
1312
 
}
 
1427
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1428
{
 
1429
  archive_record_buffer *r;
 
1430
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1431
  {
 
1432
    return(NULL);
 
1433
  }
 
1434
  r->length= (int)length;
 
1435
 
 
1436
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1437
  {
 
1438
    free((char*) r);
 
1439
    return(NULL);
 
1440
  }
 
1441
 
 
1442
  return(r);
 
1443
}
 
1444
 
 
1445
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1446
{
 
1447
  free((char*) r->buffer);
 
1448
  free((char*) r);
 
1449
  return;
 
1450
}
 
1451
 
 
1452
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
 
1453
  PLUGIN_VAR_NOCMDOPT,
 
1454
  "Whether or not to use asynchronous IO.",
 
1455
  NULL, NULL, true);
 
1456
 
 
1457
static drizzle_sys_var* archive_system_variables[]= {
 
1458
  DRIZZLE_SYSVAR(aio),
 
1459
  NULL
 
1460
};
 
1461
 
 
1462
DRIZZLE_DECLARE_PLUGIN
 
1463
{
 
1464
  "ARCHIVE",
 
1465
  "3.5",
 
1466
  "Brian Aker, MySQL AB",
 
1467
  "Archive storage engine",
 
1468
  PLUGIN_LICENSE_GPL,
 
1469
  archive_db_init, /* Plugin Init */
 
1470
  archive_db_done, /* Plugin Deinit */
 
1471
  NULL,                       /* status variables                */
 
1472
  archive_system_variables,   /* system variables                */
 
1473
  NULL                        /* config options                  */
 
1474
}
 
1475
DRIZZLE_DECLARE_PLUGIN_END;
 
1476