~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
 
2
   Copyright (C) 2010 Brian Aker
2
3
 
3
4
  This program is free software; you can redistribute it and/or modify
4
5
  it under the terms of the GNU General Public License as published by
11
12
 
12
13
  You should have received a copy of the GNU General Public License
13
14
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
 
17
 
#include "drizzled/server_includes.h"
18
 
#include "drizzled/field.h"
19
 
#include "drizzled/field/blob.h"
20
 
#include "drizzled/field/timestamp.h"
21
 
#include "plugin/myisam/myisam.h"
22
 
#include "drizzled/table.h"
23
 
#include "drizzled/session.h"
24
 
#include <mysys/my_dir.h>
25
 
 
26
 
#include "ha_archive.h"
27
 
 
28
 
#include <stdio.h>
29
 
#include <string>
30
 
#include <map>
 
15
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
16
 
 
17
 
 
18
#include "config.h"
 
19
 
 
20
#include "plugin/archive/archive_engine.h"
 
21
#include <memory>
 
22
#include <boost/scoped_ptr.hpp>
31
23
 
32
24
using namespace std;
 
25
using namespace drizzled;
33
26
 
34
 
static const string engine_name("ARCHIVE");
35
27
 
36
28
/*
37
29
  First, if you want to understand storage engines you should look at
101
93
    -Brian
102
94
*/
103
95
 
104
 
/* Variables for archive share methods */
105
 
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
106
 
 
107
 
std::map<const char *, ArchiveShare *> archive_open_tables;
108
 
 
109
 
static unsigned int global_version;
110
 
 
111
 
/* The file extension */
112
 
#define ARZ ".ARZ"               // The data file
113
 
#define ARN ".ARN"               // Files used during an optimize call
114
 
 
115
 
 
116
 
 
117
 
static bool archive_use_aio= false;
 
96
/* When the engine starts up set the first version */
 
97
static uint64_t global_version= 1;
 
98
 
 
99
// We use this to find out the state of the archive aio option.
 
100
extern bool archive_aio_state(void);
118
101
 
119
102
/*
120
103
  Number of rows that will force a bulk insert.
126
109
*/
127
110
#define ARCHIVE_ROW_HEADER_SIZE 4
128
111
 
129
 
/*
130
 
  We just implement one additional file extension.
131
 
*/
132
 
static const char *ha_archive_exts[] = {
133
 
  ARZ,
134
 
  NULL
135
 
};
136
 
 
137
 
class ArchiveTableNameIterator: public TableNameIteratorImplementation
138
 
{
139
 
private:
140
 
  MY_DIR *dirp;
141
 
  uint32_t current_entry;
142
 
 
143
 
public:
144
 
  ArchiveTableNameIterator(const std::string &database)
145
 
    : TableNameIteratorImplementation(database), dirp(NULL), current_entry(-1)
146
 
    {};
147
 
 
148
 
  ~ArchiveTableNameIterator();
149
 
 
150
 
  int next(std::string *name);
151
 
 
152
 
};
153
 
 
154
 
ArchiveTableNameIterator::~ArchiveTableNameIterator()
155
 
{
156
 
  if (dirp)
157
 
    my_dirend(dirp);
158
 
}
159
 
 
160
 
int ArchiveTableNameIterator::next(string *name)
161
 
{
162
 
  char uname[NAME_LEN + 1];
163
 
  FILEINFO *file;
164
 
  char *ext;
165
 
  uint32_t file_name_len;
166
 
  const char *wild= NULL;
167
 
 
168
 
  if (dirp == NULL)
169
 
  {
170
 
    bool dir= false;
171
 
    char path[FN_REFLEN];
172
 
 
173
 
    build_table_filename(path, sizeof(path), db.c_str(), "", false);
174
 
    dirp = my_dir(path,MYF(dir ? MY_WANT_STAT : 0));
175
 
    if (dirp == NULL)
176
 
    {
177
 
      if (my_errno == ENOENT)
178
 
        my_error(ER_BAD_DB_ERROR, MYF(ME_BELL+ME_WAITTANG), db.c_str());
179
 
      else
180
 
        my_error(ER_CANT_READ_DIR, MYF(ME_BELL+ME_WAITTANG), path, my_errno);
181
 
      return(ENOENT);
182
 
    }
183
 
    current_entry= -1;
184
 
  }
185
 
 
186
 
  while(true)
187
 
  {
188
 
    current_entry++;
189
 
 
190
 
    if (current_entry == dirp->number_off_files)
191
 
    {
192
 
      my_dirend(dirp);
193
 
      dirp= NULL;
194
 
      return -1;
195
 
    }
196
 
 
197
 
    file= dirp->dir_entry + current_entry;
198
 
 
199
 
    if (my_strcasecmp(system_charset_info, ext=strchr(file->name,'.'), ARZ) ||
200
 
        is_prefix(file->name, TMP_FILE_PREFIX))
201
 
      continue;
202
 
    *ext=0;
203
 
 
204
 
    file_name_len= filename_to_tablename(file->name, uname, sizeof(uname));
205
 
 
206
 
    uname[file_name_len]= '\0';
207
 
 
208
 
    if (wild && wild_compare(uname, wild, 0))
209
 
      continue;
210
 
    if (name)
211
 
      name->assign(uname);
212
 
 
213
 
    return 0;
214
 
  }
215
 
}
216
 
 
217
 
class ArchiveEngine : public StorageEngine
218
 
{
219
 
public:
220
 
  ArchiveEngine(const string &name_arg) : StorageEngine(name_arg,
221
 
                                      HTON_FILE_BASED
222
 
                                    | HTON_HAS_DATA_DICTIONARY
223
 
                                    | HTON_DATA_DIR) {}
224
 
 
225
 
  virtual handler *create(TableShare *table,
226
 
                          MEM_ROOT *mem_root)
227
 
  {
228
 
    return new (mem_root) ha_archive(this, table);
229
 
  }
230
 
 
231
 
  const char **bas_ext() const {
232
 
    return ha_archive_exts;
233
 
  }
234
 
 
235
 
  int createTableImplementation(Session *session, const char *table_name,
236
 
                                Table *table_arg, HA_CREATE_INFO *create_info,
237
 
                                drizzled::message::Table* proto);
238
 
 
239
 
  int getTableProtoImplementation(const char* path,
240
 
                                  drizzled::message::Table *table_proto);
241
 
 
242
 
  TableNameIteratorImplementation* tableNameIterator(const std::string &database)
243
 
  {
244
 
    return new ArchiveTableNameIterator(database);
245
 
  }
246
 
};
247
 
 
248
 
int ArchiveEngine::getTableProtoImplementation(const char* path,
249
 
                                         drizzled::message::Table *table_proto)
 
112
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
 
113
{
 
114
  ArchiveMap::iterator find_iter=
 
115
    archive_open_tables.find(table_name);
 
116
 
 
117
  if (find_iter != archive_open_tables.end())
 
118
    return (*find_iter).second;
 
119
  else
 
120
    return NULL;
 
121
}
 
122
 
 
123
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
 
124
{
 
125
  archive_open_tables[table_name]= share;
 
126
}
 
127
 
 
128
void ArchiveEngine::deleteOpenTable(const string &table_name)
 
129
{
 
130
  archive_open_tables.erase(table_name);
 
131
}
 
132
 
 
133
 
 
134
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
 
135
{
 
136
  string new_path(identifier.getPath());
 
137
 
 
138
  new_path+= ARZ;
 
139
 
 
140
  int error= unlink(new_path.c_str());
 
141
 
 
142
  if (error != 0)
 
143
  {
 
144
    error= errno= errno;
 
145
  }
 
146
 
 
147
  return error;
 
148
}
 
149
 
 
150
int ArchiveEngine::doGetTableDefinition(Session&,
 
151
                                        const identifier::Table &identifier,
 
152
                                        drizzled::message::Table &table_proto)
250
153
{
251
154
  struct stat stat_info;
252
 
  int error= 0;
 
155
  int error= ENOENT;
253
156
  string proto_path;
254
157
 
255
158
  proto_path.reserve(FN_REFLEN);
256
 
  proto_path.assign(path);
 
159
  proto_path.assign(identifier.getPath());
257
160
 
258
161
  proto_path.append(ARZ);
259
162
 
260
163
  if (stat(proto_path.c_str(),&stat_info))
261
164
    return errno;
 
165
  else
 
166
    error= EEXIST;
262
167
 
263
 
  if (table_proto)
264
168
  {
265
 
    azio_stream proto_stream;
 
169
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
266
170
    char* proto_string;
267
 
    if(azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
171
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
268
172
      return HA_ERR_CRASHED_ON_USAGE;
269
173
 
270
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
 
174
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
271
175
    if (proto_string == NULL)
272
176
    {
273
 
      azclose(&proto_stream);
 
177
      azclose(proto_stream.get());
274
178
      return ENOMEM;
275
179
    }
276
180
 
277
 
    azread_frm(&proto_stream, proto_string);
 
181
    azread_frm(proto_stream.get(), proto_string);
278
182
 
279
 
    if(table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
 
183
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
280
184
      error= HA_ERR_CRASHED_ON_USAGE;
281
185
 
282
 
    azclose(&proto_stream);
 
186
    azclose(proto_stream.get());
283
187
    free(proto_string);
284
188
  }
285
189
 
286
 
  return EEXIST;
287
 
}
288
 
 
289
 
static ArchiveEngine *archive_engine= NULL;
290
 
 
291
 
/*
292
 
  Initialize the archive handler.
293
 
 
294
 
  SYNOPSIS
295
 
    archive_db_init()
296
 
    void *
297
 
 
298
 
  RETURN
299
 
    false       OK
300
 
    true        Error
301
 
*/
302
 
 
303
 
static int archive_db_init(drizzled::plugin::Registry &registry)
304
 
{
305
 
 
306
 
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
307
 
  archive_engine= new ArchiveEngine(engine_name);
308
 
  registry.add(archive_engine);
309
 
 
310
 
  /* When the engine starts up set the first version */
311
 
  global_version= 1;
312
 
 
313
 
  return false;
314
 
}
315
 
 
316
 
/*
317
 
  Release the archive handler.
318
 
 
319
 
  SYNOPSIS
320
 
    archive_db_done()
321
 
    void
322
 
 
323
 
  RETURN
324
 
    false       OK
325
 
*/
326
 
 
327
 
static int archive_db_done(drizzled::plugin::Registry &registry)
328
 
{
329
 
  registry.remove(archive_engine);
330
 
  delete archive_engine;
331
 
 
332
 
  pthread_mutex_destroy(&archive_mutex);
333
 
 
334
 
  return 0;
335
 
}
336
 
 
337
 
 
338
 
ha_archive::ha_archive(StorageEngine *engine_arg, TableShare *table_arg)
339
 
  :handler(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
190
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
 
191
     we do not rewrite the table proto (as it's wedged in the file header)
 
192
  */
 
193
  table_proto.set_schema(identifier.getSchemaName());
 
194
  table_proto.set_name(identifier.getTableName());
 
195
 
 
196
  return error;
 
197
}
 
198
 
 
199
 
 
200
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
 
201
                       Table &table_arg)
 
202
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
340
203
{
341
204
  /* Set our original buffer from pre-allocated memory */
342
205
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
343
206
 
344
207
  /* The size of the offset value we will use for position() */
345
 
  ref_length= sizeof(my_off_t);
 
208
  ref_length= sizeof(internal::my_off_t);
346
209
  archive_reader_open= false;
347
210
}
348
211
 
373
236
{
374
237
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
375
238
  table_name.append(name);
376
 
  fn_format(data_file_name, table_name.c_str(), "",
377
 
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
239
  data_file_name.assign(table_name);
 
240
  data_file_name.append(ARZ);
378
241
  /*
379
242
    We will use this lock for rows.
380
243
  */
381
 
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
 
244
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
382
245
}
383
246
 
384
247
ArchiveShare::~ArchiveShare()
385
248
{
386
 
  thr_lock_delete(&lock);
387
 
  pthread_mutex_destroy(&mutex);
 
249
  _lock.deinit();
 
250
  pthread_mutex_destroy(&_mutex);
388
251
  /*
389
252
    We need to make sure we don't reset the crashed state.
390
253
    If we open a crashed file, wee need to close it as crashed unless
398
261
 
399
262
bool ArchiveShare::prime(uint64_t *auto_increment)
400
263
{
401
 
  azio_stream archive_tmp;
 
264
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
402
265
 
403
266
  /*
404
267
    We read the meta file, but do not mark it dirty. Since we are not
406
269
    anything but reading... open it for write and we will generate null
407
270
    compression writes).
408
271
  */
409
 
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
 
272
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
410
273
               AZ_METHOD_BLOCK)))
411
274
    return false;
412
275
 
413
 
  *auto_increment= archive_tmp.auto_increment + 1;
414
 
  rows_recorded= (ha_rows)archive_tmp.rows;
415
 
  crashed= archive_tmp.dirty;
 
276
  *auto_increment= archive_tmp->auto_increment + 1;
 
277
  rows_recorded= (ha_rows)archive_tmp->rows;
 
278
  crashed= archive_tmp->dirty;
416
279
  if (version < global_version)
417
280
  {
418
281
    version_rows= rows_recorded;
419
282
    version= global_version;
420
283
  }
421
 
  azclose(&archive_tmp);
 
284
  azclose(archive_tmp.get());
422
285
 
423
286
  return true;
424
287
}
433
296
*/
434
297
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
435
298
{
436
 
  uint32_t length;
437
 
  map<const char *, ArchiveShare *> ::iterator find_iter;
438
 
 
439
 
  pthread_mutex_lock(&archive_mutex);
440
 
  length=(uint) strlen(table_name);
441
 
 
442
 
  find_iter= archive_open_tables.find(table_name);
443
 
 
444
 
  if (find_iter != archive_open_tables.end())
445
 
    share= (*find_iter).second;
446
 
  else
447
 
    share= NULL;
 
299
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
300
 
 
301
  pthread_mutex_lock(&a_engine->mutex());
 
302
 
 
303
  share= a_engine->findOpenTable(table_name);
448
304
 
449
305
  if (!share)
450
306
  {
452
308
 
453
309
    if (share == NULL)
454
310
    {
455
 
      pthread_mutex_unlock(&archive_mutex);
 
311
      pthread_mutex_unlock(&a_engine->mutex());
456
312
      *rc= HA_ERR_OUT_OF_MEM;
457
313
      return(NULL);
458
314
    }
459
315
 
460
316
    if (share->prime(&stats.auto_increment_value) == false)
461
317
    {
462
 
      pthread_mutex_unlock(&archive_mutex);
 
318
      pthread_mutex_unlock(&a_engine->mutex());
463
319
      *rc= HA_ERR_CRASHED_ON_REPAIR;
464
320
      delete share;
465
321
 
466
322
      return NULL;
467
323
    }
468
324
 
469
 
    archive_open_tables[share->table_name.c_str()]= share; 
470
 
    thr_lock_init(&share->lock);
 
325
    a_engine->addOpenTable(share->table_name, share);
 
326
    thr_lock_init(&share->_lock);
471
327
  }
472
328
  share->use_count++;
 
329
 
473
330
  if (share->crashed)
474
331
    *rc= HA_ERR_CRASHED_ON_USAGE;
475
 
  pthread_mutex_unlock(&archive_mutex);
 
332
  pthread_mutex_unlock(&a_engine->mutex());
476
333
 
477
334
  return(share);
478
335
}
484
341
*/
485
342
int ha_archive::free_share()
486
343
{
487
 
  pthread_mutex_lock(&archive_mutex);
 
344
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
345
 
 
346
  pthread_mutex_lock(&a_engine->mutex());
488
347
  if (!--share->use_count)
489
348
  {
490
 
    archive_open_tables.erase(share->table_name.c_str());
 
349
    a_engine->deleteOpenTable(share->table_name);
491
350
    delete share;
492
351
  }
493
 
  pthread_mutex_unlock(&archive_mutex);
 
352
  pthread_mutex_unlock(&a_engine->mutex());
494
353
 
495
354
  return 0;
496
355
}
502
361
    a gzip file that can be both read and written we keep a writer open
503
362
    that is shared amoung all open tables.
504
363
  */
505
 
  if (!(azopen(&(share->archive_write), share->data_file_name,
 
364
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
506
365
               O_RDWR, AZ_METHOD_BLOCK)))
507
366
  {
508
367
    share->crashed= true;
515
374
 
516
375
 
517
376
/*
518
 
  No locks are required because it is associated with just one handler instance
 
377
  No locks are required because it is associated with just one Cursor instance
519
378
*/
520
379
int ha_archive::init_archive_reader()
521
380
{
528
387
  {
529
388
    az_method method;
530
389
 
531
 
    switch (archive_use_aio)
 
390
    if (archive_aio_state())
532
391
    {
533
 
    case false:
534
 
      method= AZ_METHOD_BLOCK;
535
 
      break;
536
 
    case true:
537
392
      method= AZ_METHOD_AIO;
538
 
      break;
539
 
    default:
 
393
    }
 
394
    else
 
395
    {
540
396
      method= AZ_METHOD_BLOCK;
541
397
    }
542
 
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
 
398
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
543
399
                 method)))
544
400
    {
545
401
      share->crashed= true;
557
413
  Init out lock.
558
414
  We open the file we will read from.
559
415
*/
560
 
int ha_archive::open(const char *name, int, uint32_t open_options)
 
416
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
561
417
{
562
418
  int rc= 0;
563
 
  share= get_share(name, &rc);
564
 
 
565
 
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
 
419
  share= get_share(identifier.getPath().c_str(), &rc);
 
420
 
 
421
  /** 
 
422
    We either fix it ourselves, or we just take it offline 
 
423
 
 
424
    @todo Create some documentation in the recovery tools shipped with the engine.
 
425
  */
 
426
  if (rc == HA_ERR_CRASHED_ON_USAGE)
566
427
  {
567
 
    /* purecov: begin inspected */
568
428
    free_share();
569
 
    return(rc);
570
 
    /* purecov: end */
 
429
    rc= repair();
 
430
 
 
431
    return 0;
571
432
  }
572
433
  else if (rc == HA_ERR_OUT_OF_MEM)
573
434
  {
576
437
 
577
438
  assert(share);
578
439
 
579
 
  record_buffer= create_record_buffer(table->s->reclength +
580
 
                                      ARCHIVE_ROW_HEADER_SIZE);
581
 
 
582
 
  if (!record_buffer)
583
 
  {
584
 
    free_share();
585
 
    return(HA_ERR_OUT_OF_MEM);
586
 
  }
587
 
 
588
 
  thr_lock_data_init(&share->lock, &lock, NULL);
589
 
 
590
 
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
591
 
  {
592
 
    return(0);
593
 
  }
594
 
  else
595
 
    return(rc);
 
440
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
 
441
 
 
442
  lock.init(&share->_lock);
 
443
 
 
444
  return(rc);
 
445
}
 
446
 
 
447
// Should never be called
 
448
int ha_archive::open(const char *, int, uint32_t)
 
449
{
 
450
  assert(0);
 
451
  return -1;
596
452
}
597
453
 
598
454
 
617
473
{
618
474
  int rc= 0;
619
475
 
620
 
  destroy_record_buffer(record_buffer);
 
476
  record_buffer.clear();
621
477
 
622
478
  /* First close stream */
623
479
  if (archive_reader_open == true)
641
497
  of creation.
642
498
*/
643
499
 
644
 
int ArchiveEngine::createTableImplementation(Session *session,
645
 
                                             const char *table_name,
646
 
                                             Table *table_arg,
647
 
                                             HA_CREATE_INFO *create_info,
648
 
                                             drizzled::message::Table *proto)
 
500
int ArchiveEngine::doCreateTable(Session &,
 
501
                                 Table& table_arg,
 
502
                                 const drizzled::identifier::Table &identifier,
 
503
                                 drizzled::message::Table& proto)
649
504
{
650
 
  char name_buff[FN_REFLEN];
651
 
  char linkname[FN_REFLEN];
652
505
  int error= 0;
653
 
  azio_stream create_stream;            /* Archive file we are working with */
 
506
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
654
507
  uint64_t auto_increment_value;
655
508
  string serialized_proto;
656
509
 
657
 
  auto_increment_value= create_info->auto_increment_value;
 
510
  auto_increment_value= proto.options().auto_increment_value();
658
511
 
659
 
  for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
 
512
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
660
513
  {
661
 
    KEY *pos= table_arg->key_info+key;
662
 
    KEY_PART_INFO *key_part=     pos->key_part;
663
 
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
 
514
    KeyInfo *pos= &table_arg.key_info[key];
 
515
    KeyPartInfo *key_part=     pos->key_part;
 
516
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
664
517
 
665
518
    for (; key_part != key_part_end; key_part++)
666
519
    {
668
521
 
669
522
      if (!(field->flags & AUTO_INCREMENT_FLAG))
670
523
      {
671
 
        error= -1;
672
 
        goto error;
 
524
        return -1;
673
525
      }
674
526
    }
675
527
  }
676
528
 
677
 
  /*
678
 
    We reuse name_buff since it is available.
679
 
  */
680
 
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
681
 
  {
682
 
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
683
 
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
684
 
    fn_format(linkname, table_name, "", ARZ,
685
 
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
686
 
  }
687
 
  else
688
 
  {
689
 
    fn_format(name_buff, table_name, "", ARZ,
690
 
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
691
 
    linkname[0]= 0;
692
 
  }
 
529
  std::string named_file= identifier.getPath();
 
530
  named_file.append(ARZ);
693
531
 
694
 
  my_errno= 0;
695
 
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
 
532
  errno= 0;
 
533
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
696
534
             AZ_METHOD_BLOCK) == 0)
697
535
  {
698
536
    error= errno;
699
 
    goto error2;
700
 
  }
701
 
 
702
 
  if (linkname[0])
703
 
    if(symlink(name_buff, linkname) != 0)
704
 
      goto error2;
705
 
 
706
 
  proto->SerializeToString(&serialized_proto);
707
 
 
708
 
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
 
537
    unlink(named_file.c_str());
 
538
 
 
539
    return(error ? error : -1);
 
540
  }
 
541
 
 
542
  try {
 
543
    proto.SerializeToString(&serialized_proto);
 
544
  }
 
545
  catch (...)
 
546
  {
 
547
    unlink(named_file.c_str());
 
548
 
 
549
    return(error ? error : -1);
 
550
  }
 
551
 
 
552
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
709
553
                  serialized_proto.length()))
710
 
    goto error2;
711
 
 
712
 
  if (create_info->comment.str)
713
 
  {
714
 
    size_t write_length;
715
 
 
716
 
    write_length= azwrite_comment(&create_stream, create_info->comment.str,
717
 
                                  (unsigned int)create_info->comment.length);
718
 
 
719
 
    if (write_length == (size_t)create_info->comment.length)
720
 
      goto error2;
 
554
  {
 
555
    unlink(named_file.c_str());
 
556
 
 
557
    return(error ? error : -1);
 
558
  }
 
559
 
 
560
  if (proto.options().has_comment())
 
561
  {
 
562
    int write_length;
 
563
 
 
564
    write_length= azwrite_comment(create_stream.get(),
 
565
                                  proto.options().comment().c_str(),
 
566
                                  proto.options().comment().length());
 
567
 
 
568
    if (write_length < 0)
 
569
    {
 
570
      error= errno;
 
571
      unlink(named_file.c_str());
 
572
 
 
573
      return(error ? error : -1);
 
574
    }
721
575
  }
722
576
 
723
577
  /*
724
578
    Yes you need to do this, because the starting value
725
579
    for the autoincrement may not be zero.
726
580
  */
727
 
  create_stream.auto_increment= auto_increment_value ?
 
581
  create_stream->auto_increment= auto_increment_value ?
728
582
    auto_increment_value - 1 : 0;
729
583
 
730
 
  if (azclose(&create_stream))
 
584
  if (azclose(create_stream.get()))
731
585
  {
732
586
    error= errno;
733
 
    goto error2;
 
587
    unlink(named_file.c_str());
 
588
 
 
589
    return(error ? error : -1);
734
590
  }
735
591
 
736
592
  return(0);
737
 
 
738
 
error2:
739
 
  deleteTable(session, table_name);
740
 
error:
741
 
  /* Return error number, if we got one */
742
 
  return(error ? error : -1);
743
593
}
744
594
 
745
595
/*
753
603
  /* We pack the row for writing */
754
604
  r_pack_length= pack_row(buf);
755
605
 
756
 
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
 
606
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
757
607
  if (written != r_pack_length)
758
608
  {
759
609
    return(-1);
773
623
 
774
624
uint32_t ha_archive::max_row_length(const unsigned char *)
775
625
{
776
 
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
 
626
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
777
627
  length+= ARCHIVE_ROW_HEADER_SIZE;
778
628
 
779
629
  uint32_t *ptr, *end;
780
 
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
 
630
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
781
631
       ptr != end ;
782
632
       ptr++)
783
633
  {
784
 
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
 
634
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
785
635
  }
786
636
 
787
637
  return length;
793
643
  unsigned char *ptr;
794
644
 
795
645
  if (fix_rec_buff(max_row_length(record)))
796
 
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
 
646
    return(HA_ERR_OUT_OF_MEM);
797
647
 
798
648
  /* Copy null bits */
799
 
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
800
 
  ptr= record_buffer->buffer + table->s->null_bytes;
 
649
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
 
650
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
801
651
 
802
 
  for (Field **field=table->field ; *field ; field++)
 
652
  for (Field **field=getTable()->getFields() ; *field ; field++)
803
653
  {
804
654
    if (!((*field)->is_null()))
805
655
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
806
656
  }
807
657
 
808
 
  return((unsigned int) (ptr - record_buffer->buffer));
 
658
  return((unsigned int) (ptr - &record_buffer[0]));
809
659
}
810
660
 
811
661
 
818
668
  for implementing start_bulk_insert() is that we could skip
819
669
  setting dirty to true each time.
820
670
*/
821
 
int ha_archive::write_row(unsigned char *buf)
 
671
int ha_archive::doInsertRecord(unsigned char *buf)
822
672
{
823
673
  int rc;
824
674
  unsigned char *read_buf= NULL;
825
675
  uint64_t temp_auto;
826
 
  unsigned char *record=  table->record[0];
 
676
  unsigned char *record=  getTable()->getInsertRecord();
827
677
 
828
678
  if (share->crashed)
829
679
    return(HA_ERR_CRASHED_ON_USAGE);
830
680
 
831
 
  ha_statistic_increment(&SSV::ha_write_count);
832
 
  pthread_mutex_lock(&share->mutex);
 
681
  pthread_mutex_lock(&share->mutex());
833
682
 
834
683
  if (share->archive_write_open == false)
835
684
    if (init_archive_writer())
836
685
      return(HA_ERR_CRASHED_ON_USAGE);
837
686
 
838
687
 
839
 
  if (table->next_number_field && record == table->record[0])
 
688
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
840
689
  {
841
 
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
842
690
    update_auto_increment();
843
 
    temp_auto= table->next_number_field->val_int();
 
691
    temp_auto= getTable()->next_number_field->val_int();
844
692
 
845
693
    /*
846
694
      We don't support decremening auto_increment. They make the performance
847
695
      just cry.
848
696
    */
849
697
    if (temp_auto <= share->archive_write.auto_increment &&
850
 
        mkey->flags & HA_NOSAME)
 
698
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
851
699
    {
852
700
      rc= HA_ERR_FOUND_DUPP_KEY;
853
701
      goto error;
867
715
  share->rows_recorded++;
868
716
  rc= real_write_row(buf,  &(share->archive_write));
869
717
error:
870
 
  pthread_mutex_unlock(&share->mutex);
 
718
  pthread_mutex_unlock(&share->mutex());
871
719
  if (read_buf)
872
720
    free((unsigned char*) read_buf);
873
721
 
882
730
  *first_value= share->archive_write.auto_increment + 1;
883
731
}
884
732
 
885
 
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
886
 
int ha_archive::index_init(uint32_t keynr, bool)
 
733
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
 
734
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
887
735
{
888
736
  active_index= keynr;
889
737
  return(0);
895
743
  the optimizer that we have unique indexes, we scan
896
744
*/
897
745
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
898
 
                             uint32_t key_len, enum ha_rkey_function find_flag)
899
 
{
900
 
  int rc;
901
 
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
902
 
  return(rc);
903
 
}
904
 
 
905
 
 
906
 
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
907
 
                               uint32_t key_len, enum ha_rkey_function)
 
746
                             uint32_t key_len, enum ha_rkey_function)
908
747
{
909
748
  int rc;
910
749
  bool found= 0;
911
 
  KEY *mkey= &table->s->key_info[index];
912
 
  current_k_offset= mkey->key_part->offset;
 
750
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
913
751
  current_key= key;
914
752
  current_key_len= key_len;
915
753
 
916
 
  rc= rnd_init(true);
 
754
  rc= doStartTableScan(true);
917
755
 
918
756
  if (rc)
919
757
    goto error;
957
795
  we assume the position will be set.
958
796
*/
959
797
 
960
 
int ha_archive::rnd_init(bool scan)
 
798
int ha_archive::doStartTableScan(bool scan)
961
799
{
962
800
  if (share->crashed)
963
801
      return(HA_ERR_CRASHED_ON_USAGE);
994
832
/* Reallocate buffer if needed */
995
833
bool ha_archive::fix_rec_buff(unsigned int length)
996
834
{
997
 
  assert(record_buffer->buffer);
998
 
 
999
 
  if (length > record_buffer->length)
1000
 
  {
1001
 
    unsigned char *newptr;
1002
 
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
1003
 
      return(1);
1004
 
    record_buffer->buffer= newptr;
1005
 
    record_buffer->length= length;
1006
 
  }
1007
 
 
1008
 
  assert(length <= record_buffer->length);
1009
 
 
1010
 
  return(0);
 
835
  record_buffer.resize(length);
 
836
 
 
837
  return false;
1011
838
}
1012
839
 
1013
840
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
1025
852
  }
1026
853
 
1027
854
  /* Copy null bits */
1028
 
  memcpy(record, ptr, table->getNullBytes());
1029
 
  ptr+= table->getNullBytes();
1030
 
  for (Field **field=table->field ; *field ; field++)
 
855
  memcpy(record, ptr, getTable()->getNullBytes());
 
856
  ptr+= getTable()->getNullBytes();
 
857
  for (Field **field= getTable()->getFields() ; *field ; field++)
1031
858
  {
1032
859
    if (!((*field)->is_null()))
1033
860
    {
1034
 
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
 
861
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
1035
862
    }
1036
863
  }
1037
864
  return(0);
1062
889
    return(HA_ERR_END_OF_FILE);
1063
890
  scan_rows--;
1064
891
 
1065
 
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
892
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
1066
893
  current_position= aztell(&archive);
1067
894
  rc= get_row(&archive, buf);
1068
895
 
1069
 
  table->status=rc ? STATUS_NOT_FOUND: 0;
 
896
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
1070
897
 
1071
898
  return(rc);
1072
899
}
1073
900
 
1074
901
 
1075
902
/*
1076
 
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
 
903
  Thanks to the table bool is_ordered this will be called after
1077
904
  each call to ha_archive::rnd_next() if an ordering of the rows is
1078
905
  needed.
1079
906
*/
1080
907
 
1081
908
void ha_archive::position(const unsigned char *)
1082
909
{
1083
 
  my_store_ptr(ref, ref_length, current_position);
 
910
  internal::my_store_ptr(ref, ref_length, current_position);
1084
911
  return;
1085
912
}
1086
913
 
1094
921
 
1095
922
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
1096
923
{
1097
 
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1098
 
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
 
924
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
925
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
1099
926
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
1100
927
    return(HA_ERR_CRASHED_ON_USAGE);
1101
928
  return(get_row(&archive, buf));
1106
933
  rewriting the meta file. Currently it does this by calling optimize with
1107
934
  the extended flag.
1108
935
*/
1109
 
int ha_archive::repair(Session* session, HA_CHECK_OPT* check_opt)
 
936
int ha_archive::repair()
1110
937
{
1111
 
  check_opt->flags= T_EXTEND;
1112
 
  int rc= optimize(session, check_opt);
 
938
  int rc= optimize();
1113
939
 
1114
940
  if (rc)
1115
941
    return(HA_ERR_CRASHED_ON_REPAIR);
1122
948
  The table can become fragmented if data was inserted, read, and then
1123
949
  inserted again. What we do is open up the file and recompress it completely.
1124
950
*/
1125
 
int ha_archive::optimize(Session *, HA_CHECK_OPT *)
 
951
int ha_archive::optimize()
1126
952
{
1127
953
  int rc= 0;
1128
 
  azio_stream writer;
1129
 
  char writer_filename[FN_REFLEN];
 
954
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
1130
955
 
1131
956
  init_archive_reader();
1132
957
 
1146
971
  azread_frm(&archive, proto_string);
1147
972
 
1148
973
  /* Lets create a file to contain the new data */
1149
 
  fn_format(writer_filename, share->table_name.c_str(), "", ARN,
1150
 
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
974
  std::string writer_filename= share->table_name;
 
975
  writer_filename.append(ARN);
1151
976
 
1152
 
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
977
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
1153
978
  {
1154
979
    free(proto_string);
1155
980
    return(HA_ERR_CRASHED_ON_USAGE);
1156
981
  }
1157
982
 
1158
 
  azwrite_frm(&writer, proto_string, archive.frm_length);
 
983
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
1159
984
 
1160
985
  /*
1161
986
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1179
1004
    */
1180
1005
    if (!rc)
1181
1006
    {
1182
 
      uint64_t x;
1183
1007
      uint64_t rows_restored;
1184
1008
      share->rows_recorded= 0;
1185
1009
      stats.auto_increment_value= 1;
1187
1011
 
1188
1012
      rows_restored= archive.rows;
1189
1013
 
1190
 
      for (x= 0; x < rows_restored ; x++)
 
1014
      for (uint64_t x= 0; x < rows_restored ; x++)
1191
1015
      {
1192
 
        rc= get_row(&archive, table->record[0]);
 
1016
        rc= get_row(&archive, getTable()->getInsertRecord());
1193
1017
 
1194
1018
        if (rc != 0)
1195
1019
          break;
1196
1020
 
1197
 
        real_write_row(table->record[0], &writer);
 
1021
        real_write_row(getTable()->getInsertRecord(), writer.get());
1198
1022
        /*
1199
1023
          Long term it should be possible to optimize this so that
1200
1024
          it is not called on each row.
1201
1025
        */
1202
 
        if (table->found_next_number_field)
 
1026
        if (getTable()->found_next_number_field)
1203
1027
        {
1204
 
          Field *field= table->found_next_number_field;
 
1028
          Field *field= getTable()->found_next_number_field;
1205
1029
 
1206
1030
          /* Since we will need to use field to translate, we need to flip its read bit */
1207
1031
          field->setReadSet();
1208
1032
 
1209
1033
          uint64_t auto_value=
1210
 
            (uint64_t) field->val_int(table->record[0] +
1211
 
                                       field->offset(table->record[0]));
 
1034
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
 
1035
                                               field->offset(getTable()->getInsertRecord()));
1212
1036
          if (share->archive_write.auto_increment < auto_value)
1213
1037
            stats.auto_increment_value=
1214
1038
              (share->archive_write.auto_increment= auto_value) + 1;
1215
1039
        }
1216
1040
      }
1217
 
      share->rows_recorded= (ha_rows)writer.rows;
 
1041
      share->rows_recorded= (ha_rows)writer->rows;
1218
1042
    }
1219
1043
 
1220
1044
    if (rc && rc != HA_ERR_END_OF_FILE)
1223
1047
    }
1224
1048
  }
1225
1049
 
1226
 
  azclose(&writer);
 
1050
  azclose(writer.get());
1227
1051
  share->dirty= false;
1228
1052
 
1229
1053
  azclose(&archive);
1230
1054
 
1231
1055
  // make the file we just wrote be our data file
1232
 
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
 
1056
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1233
1057
 
1234
1058
  free(proto_string);
1235
1059
  return(rc);
1236
1060
error:
1237
1061
  free(proto_string);
1238
 
  azclose(&writer);
 
1062
  azclose(writer.get());
1239
1063
 
1240
1064
  return(rc);
1241
1065
}
1260
1084
 
1261
1085
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1262
1086
         lock_type <= TL_WRITE)
1263
 
        && !session_tablespace_op(session))
 
1087
        && ! session->doing_tablespace_operation())
1264
1088
      lock_type = TL_WRITE_ALLOW_WRITE;
1265
1089
 
1266
1090
    /*
1282
1106
  return to;
1283
1107
}
1284
1108
 
1285
 
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1286
 
{
1287
 
  ha_archive::info(HA_STATUS_AUTO);
1288
 
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1289
 
  {
1290
 
    create_info->auto_increment_value= stats.auto_increment_value;
1291
 
  }
1292
 
 
1293
 
  ssize_t sym_link_size= readlink(share->data_file_name,share->real_path,FN_REFLEN-1);
1294
 
  if (sym_link_size >= 0) {
1295
 
    share->real_path[sym_link_size]= '\0';
1296
 
    create_info->data_file_name= share->real_path;
1297
 
  }
1298
 
 
1299
 
  return;
1300
 
}
1301
 
 
1302
 
 
1303
1109
/*
1304
1110
  Hints for optimizer, see ha_tina for more information
1305
1111
*/
1309
1115
    If dirty, we lock, and then reset/flush the data.
1310
1116
    I found that just calling azflush() doesn't always work.
1311
1117
  */
1312
 
  pthread_mutex_lock(&share->mutex);
 
1118
  pthread_mutex_lock(&share->mutex());
1313
1119
  if (share->dirty == true)
1314
1120
  {
1315
1121
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1328
1134
    cause the number to be inaccurate.
1329
1135
  */
1330
1136
  stats.records= share->rows_recorded;
1331
 
  pthread_mutex_unlock(&share->mutex);
 
1137
  pthread_mutex_unlock(&share->mutex());
1332
1138
 
1333
1139
  scan_rows= stats.records;
1334
1140
  stats.deleted= 0;
1338
1144
  {
1339
1145
    struct stat file_stat;  // Stat information for the data file
1340
1146
 
1341
 
    stat(share->data_file_name, &file_stat);
 
1147
    stat(share->data_file_name.c_str(), &file_stat);
1342
1148
 
1343
 
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
 
1149
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
1344
1150
    stats.data_file_length= file_stat.st_size;
1345
1151
    stats.create_time= file_stat.st_ctime;
1346
1152
    stats.update_time= file_stat.st_mtime;
1352
1158
  if (flag & HA_STATUS_AUTO)
1353
1159
  {
1354
1160
    init_archive_reader();
1355
 
    pthread_mutex_lock(&share->mutex);
 
1161
    pthread_mutex_lock(&share->mutex());
1356
1162
    azflush(&archive, Z_SYNC_FLUSH);
1357
 
    pthread_mutex_unlock(&share->mutex);
 
1163
    pthread_mutex_unlock(&share->mutex());
1358
1164
    stats.auto_increment_value= archive.auto_increment + 1;
1359
1165
  }
1360
1166
 
1364
1170
 
1365
1171
/*
1366
1172
  This method tells us that a bulk insert operation is about to occur. We set
1367
 
  a flag which will keep write_row from saying that its data is dirty. This in
 
1173
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
1368
1174
  turn will keep selects from causing a sync to occur.
1369
1175
  Basically, yet another optimizations to keep compression working well.
1370
1176
*/
1398
1204
}
1399
1205
 
1400
1206
/*
1401
 
  We just return state if asked.
1402
 
*/
1403
 
bool ha_archive::is_crashed() const
1404
 
{
1405
 
  return(share->crashed);
1406
 
}
1407
 
 
1408
 
/*
1409
1207
  Simple scan of the tables to make sure everything is ok.
1410
1208
*/
1411
1209
 
1412
 
int ha_archive::check(Session* session, HA_CHECK_OPT *)
 
1210
int ha_archive::check(Session* session)
1413
1211
{
1414
1212
  int rc= 0;
1415
1213
  const char *old_proc_info;
1416
 
  uint64_t x;
1417
1214
 
1418
 
  old_proc_info= get_session_proc_info(session);
1419
 
  set_session_proc_info(session, "Checking table");
 
1215
  old_proc_info= session->get_proc_info();
 
1216
  session->set_proc_info("Checking table");
1420
1217
  /* Flush any waiting data */
1421
 
  pthread_mutex_lock(&share->mutex);
 
1218
  pthread_mutex_lock(&share->mutex());
1422
1219
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1423
 
  pthread_mutex_unlock(&share->mutex);
 
1220
  pthread_mutex_unlock(&share->mutex());
1424
1221
 
1425
1222
  /*
1426
1223
    Now we will rewind the archive file so that we are positioned at the
1429
1226
  init_archive_reader();
1430
1227
  azflush(&archive, Z_SYNC_FLUSH);
1431
1228
  read_data_header(&archive);
1432
 
  for (x= 0; x < share->archive_write.rows; x++)
 
1229
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
1433
1230
  {
1434
 
    rc= get_row(&archive, table->record[0]);
 
1231
    rc= get_row(&archive, getTable()->getInsertRecord());
1435
1232
 
1436
1233
    if (rc != 0)
1437
1234
      break;
1438
1235
  }
1439
1236
 
1440
 
  set_session_proc_info(session, old_proc_info);
 
1237
  session->set_proc_info(old_proc_info);
1441
1238
 
1442
1239
  if ((rc && rc != HA_ERR_END_OF_FILE))
1443
1240
  {
1450
1247
  }
1451
1248
}
1452
1249
 
1453
 
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1454
 
{
1455
 
  archive_record_buffer *r;
1456
 
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1457
 
  {
1458
 
    return(NULL); /* purecov: inspected */
1459
 
  }
1460
 
  r->length= (int)length;
1461
 
 
1462
 
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
1463
 
  {
1464
 
    free((char*) r);
1465
 
    return(NULL); /* purecov: inspected */
1466
 
  }
1467
 
 
1468
 
  return(r);
1469
 
}
1470
 
 
1471
 
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1472
 
{
1473
 
  free((char*) r->buffer);
1474
 
  free((char*) r);
1475
 
  return;
1476
 
}
1477
 
 
1478
 
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
1479
 
  PLUGIN_VAR_NOCMDOPT,
1480
 
  "Whether or not to use asynchronous IO.",
1481
 
  NULL, NULL, true);
1482
 
 
1483
 
static struct st_mysql_sys_var* archive_system_variables[]= {
1484
 
  DRIZZLE_SYSVAR(aio),
1485
 
  NULL
1486
 
};
1487
 
 
1488
 
drizzle_declare_plugin(archive)
1489
 
{
1490
 
  "ARCHIVE",
1491
 
  "3.5",
1492
 
  "Brian Aker, MySQL AB",
1493
 
  "Archive storage engine",
1494
 
  PLUGIN_LICENSE_GPL,
1495
 
  archive_db_init, /* Plugin Init */
1496
 
  archive_db_done, /* Plugin Deinit */
1497
 
  NULL,                       /* status variables                */
1498
 
  archive_system_variables,   /* system variables                */
1499
 
  NULL                        /* config options                  */
1500
 
}
1501
 
drizzle_declare_plugin_end;
1502
 
 
 
1250
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
 
1251
{
 
1252
  int error= 0;
 
1253
 
 
1254
  for (const char **ext= bas_ext(); *ext ; ext++)
 
1255
  {
 
1256
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
 
1257
    {
 
1258
      if ((error=errno) != ENOENT)
 
1259
        break;
 
1260
      error= 0;
 
1261
    }
 
1262
  }
 
1263
 
 
1264
  return error;
 
1265
}
 
1266
 
 
1267
bool ArchiveEngine::doDoesTableExist(Session&,
 
1268
                                     const identifier::Table &identifier)
 
1269
{
 
1270
  string proto_path(identifier.getPath());
 
1271
  proto_path.append(ARZ);
 
1272
 
 
1273
  if (access(proto_path.c_str(), F_OK))
 
1274
  {
 
1275
    return false;
 
1276
  }
 
1277
 
 
1278
  return true;
 
1279
}
 
1280
 
 
1281
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
1282
                                          const drizzled::identifier::Schema &schema_identifier,
 
1283
                                          drizzled::identifier::Table::vector &set_of_identifiers)
 
1284
{
 
1285
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
1286
 
 
1287
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
1288
       entry_iter != entries.end(); ++entry_iter)
 
1289
  {
 
1290
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
1291
    const string *filename= &entry->filename;
 
1292
 
 
1293
    assert(filename->size());
 
1294
 
 
1295
    const char *ext= strchr(filename->c_str(), '.');
 
1296
 
 
1297
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
1298
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
1299
    {  }
 
1300
    else
 
1301
    {
 
1302
      char uname[NAME_LEN + 1];
 
1303
      uint32_t file_name_len;
 
1304
 
 
1305
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
1306
      // TODO: Remove need for memory copy here
 
1307
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
1308
 
 
1309
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
 
1310
    }
 
1311
  }
 
1312
}