~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/ha_archive.cc

  • Committer: Monty Taylor
  • Date: 2009-03-06 03:33:24 UTC
  • mfrom: (916.1.2 merge)
  • Revision ID: mordred@inaugust.com-20090306033324-dcedf80g9qzywbvu
Merged Brian's merge... re-rotate the tree.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
2
 
   Copyright (C) 2010 Brian Aker
3
2
 
4
3
  This program is free software; you can redistribute it and/or modify
5
4
  it under the terms of the GNU General Public License as published by
12
11
 
13
12
  You should have received a copy of the GNU General Public License
14
13
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
16
 
 
17
 
 
18
 
#include "config.h"
19
 
 
20
 
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
 
 
24
 
using namespace std;
25
 
using namespace drizzled;
26
 
 
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
#include <drizzled/server_includes.h>
 
18
#include <drizzled/field.h>
 
19
#include <drizzled/field/blob.h>
 
20
#include <drizzled/field/timestamp.h>
 
21
#include <storage/myisam/myisam.h>
 
22
#include <drizzled/table.h>
 
23
#include <drizzled/session.h>
 
24
#include <stdio.h>
 
25
 
 
26
#include <storage/archive/ha_archive.h>
27
27
 
28
28
/*
29
29
  First, if you want to understand storage engines you should look at
93
93
    -Brian
94
94
*/
95
95
 
96
 
/* When the engine starts up set the first version */
97
 
static uint64_t global_version= 1;
98
 
 
99
 
// We use this to find out the state of the archive aio option.
100
 
extern bool archive_aio_state(void);
 
96
/* Variables for archive share methods */
 
97
pthread_mutex_t archive_mutex;
 
98
static HASH archive_open_tables;
 
99
static unsigned int global_version;
 
100
 
 
101
/* The file extension */
 
102
#define ARZ ".ARZ"               // The data file
 
103
#define ARN ".ARN"               // Files used during an optimize call
 
104
 
 
105
 
 
106
/* Static declarations for handerton */
 
107
static handler *archive_create_handler(handlerton *hton,
 
108
                                       TABLE_SHARE *table,
 
109
                                       MEM_ROOT *mem_root);
 
110
 
 
111
static bool archive_use_aio= false;
101
112
 
102
113
/*
103
114
  Number of rows that will force a bulk insert.
109
120
*/
110
121
#define ARCHIVE_ROW_HEADER_SIZE 4
111
122
 
112
 
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
113
 
{
114
 
  ArchiveMap::iterator find_iter=
115
 
    archive_open_tables.find(table_name);
116
 
 
117
 
  if (find_iter != archive_open_tables.end())
118
 
    return (*find_iter).second;
119
 
  else
120
 
    return NULL;
121
 
}
122
 
 
123
 
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
124
 
{
125
 
  archive_open_tables[table_name]= share;
126
 
}
127
 
 
128
 
void ArchiveEngine::deleteOpenTable(const string &table_name)
129
 
{
130
 
  archive_open_tables.erase(table_name);
131
 
}
132
 
 
133
 
 
134
 
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
 
{
136
 
  string new_path(identifier.getPath());
137
 
 
138
 
  new_path+= ARZ;
139
 
 
140
 
  int error= unlink(new_path.c_str());
141
 
 
142
 
  if (error != 0)
143
 
  {
144
 
    error= errno= errno;
145
 
  }
146
 
 
147
 
  return error;
148
 
}
149
 
 
150
 
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const TableIdentifier &identifier,
152
 
                                        drizzled::message::Table &table_proto)
153
 
{
154
 
  struct stat stat_info;
155
 
  int error= ENOENT;
156
 
  string proto_path;
157
 
 
158
 
  proto_path.reserve(FN_REFLEN);
159
 
  proto_path.assign(identifier.getPath());
160
 
 
161
 
  proto_path.append(ARZ);
162
 
 
163
 
  if (stat(proto_path.c_str(),&stat_info))
164
 
    return errno;
165
 
  else
166
 
    error= EEXIST;
167
 
 
168
 
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
170
 
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
 
      return HA_ERR_CRASHED_ON_USAGE;
173
 
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
175
 
    if (proto_string == NULL)
176
 
    {
177
 
      azclose(proto_stream.get());
178
 
      return ENOMEM;
179
 
    }
180
 
 
181
 
    azread_frm(proto_stream.get(), proto_string);
182
 
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
184
 
      error= HA_ERR_CRASHED_ON_USAGE;
185
 
 
186
 
    azclose(proto_stream.get());
187
 
    free(proto_string);
188
 
  }
189
 
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
 
  return error;
197
 
}
198
 
 
199
 
 
200
 
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
202
 
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
123
static handler *archive_create_handler(handlerton *hton,
 
124
                                       TABLE_SHARE *table,
 
125
                                       MEM_ROOT *mem_root)
 
126
{
 
127
  return new (mem_root) ha_archive(hton, table);
 
128
}
 
129
 
 
130
/*
 
131
  Used for hash table that tracks open tables.
 
132
*/
 
133
static unsigned char* archive_get_key(ARCHIVE_SHARE *share, size_t *length, bool)
 
134
{
 
135
  *length=share->table_name_length;
 
136
  return (unsigned char*) share->table_name;
 
137
}
 
138
 
 
139
 
 
140
/*
 
141
  Initialize the archive handler.
 
142
 
 
143
  SYNOPSIS
 
144
    archive_db_init()
 
145
    void *
 
146
 
 
147
  RETURN
 
148
    false       OK
 
149
    true        Error
 
150
*/
 
151
 
 
152
int archive_db_init(void *p)
 
153
{
 
154
  handlerton *archive_hton;
 
155
 
 
156
  archive_hton= (handlerton *)p;
 
157
  archive_hton->state= SHOW_OPTION_YES;
 
158
  archive_hton->create= archive_create_handler;
 
159
  archive_hton->flags= HTON_NO_FLAGS;
 
160
 
 
161
  /* When the engine starts up set the first version */
 
162
  global_version= 1;
 
163
 
 
164
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
 
165
    goto error;
 
166
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
 
167
                (hash_get_key) archive_get_key, 0, 0))
 
168
  {
 
169
    pthread_mutex_destroy(&archive_mutex);
 
170
  }
 
171
  else
 
172
  {
 
173
    return(false);
 
174
  }
 
175
error:
 
176
  return(true);
 
177
}
 
178
 
 
179
/*
 
180
  Release the archive handler.
 
181
 
 
182
  SYNOPSIS
 
183
    archive_db_done()
 
184
    void
 
185
 
 
186
  RETURN
 
187
    false       OK
 
188
*/
 
189
 
 
190
int archive_db_done(void *)
 
191
{
 
192
  hash_free(&archive_open_tables);
 
193
  pthread_mutex_destroy(&archive_mutex);
 
194
 
 
195
  return 0;
 
196
}
 
197
 
 
198
 
 
199
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
 
200
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
203
201
{
204
202
  /* Set our original buffer from pre-allocated memory */
205
203
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
206
204
 
207
205
  /* The size of the offset value we will use for position() */
208
 
  ref_length= sizeof(internal::my_off_t);
 
206
  ref_length= sizeof(my_off_t);
209
207
  archive_reader_open= false;
210
208
}
211
209
 
223
221
  return(1);
224
222
}
225
223
 
226
 
ArchiveShare::ArchiveShare():
227
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
228
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
229
 
{
230
 
  assert(1);
231
 
}
232
 
 
233
 
ArchiveShare::ArchiveShare(const char *name):
234
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
235
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
236
 
{
237
 
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
 
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
241
 
  /*
242
 
    We will use this lock for rows.
243
 
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
245
 
}
246
 
 
247
 
ArchiveShare::~ArchiveShare()
248
 
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
251
 
  /*
252
 
    We need to make sure we don't reset the crashed state.
253
 
    If we open a crashed file, wee need to close it as crashed unless
254
 
    it has been repaired.
255
 
    Since we will close the data down after this, we go on and count
256
 
    the flush on close;
257
 
  */
258
 
  if (archive_write_open == true)
259
 
    (void)azclose(&archive_write);
260
 
}
261
 
 
262
 
bool ArchiveShare::prime(uint64_t *auto_increment)
263
 
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
265
 
 
266
 
  /*
267
 
    We read the meta file, but do not mark it dirty. Since we are not
268
 
    doing a write we won't mark it dirty (and we won't open it for
269
 
    anything but reading... open it for write and we will generate null
270
 
    compression writes).
271
 
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
273
 
               AZ_METHOD_BLOCK)))
274
 
    return false;
275
 
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
279
 
  if (version < global_version)
280
 
  {
281
 
    version_rows= rows_recorded;
282
 
    version= global_version;
283
 
  }
284
 
  azclose(archive_tmp.get());
285
 
 
286
 
  return true;
287
 
}
288
 
 
289
224
 
290
225
/*
291
226
  We create the shared memory space that we will use for the open table.
294
229
 
295
230
  See ha_example.cc for a longer description.
296
231
*/
297
 
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
 
232
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
298
233
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
303
 
  share= a_engine->findOpenTable(table_name);
304
 
 
305
 
  if (!share)
 
234
  uint32_t length;
 
235
 
 
236
  pthread_mutex_lock(&archive_mutex);
 
237
  length=(uint) strlen(table_name);
 
238
 
 
239
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
 
240
                                           (unsigned char*) table_name,
 
241
                                           length)))
306
242
  {
307
 
    share= new ArchiveShare(table_name);
 
243
    char *tmp_name;
 
244
    azio_stream archive_tmp;
308
245
 
309
 
    if (share == NULL)
 
246
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
247
                          &share, sizeof(*share),
 
248
                          &tmp_name, length+1,
 
249
                          NULL))
310
250
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
251
      pthread_mutex_unlock(&archive_mutex);
312
252
      *rc= HA_ERR_OUT_OF_MEM;
313
253
      return(NULL);
314
254
    }
315
255
 
316
 
    if (share->prime(&stats.auto_increment_value) == false)
 
256
    share->use_count= 0;
 
257
    share->table_name_length= length;
 
258
    share->table_name= tmp_name;
 
259
    share->crashed= false;
 
260
    share->archive_write_open= false;
 
261
    fn_format(share->data_file_name, table_name, "",
 
262
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
263
    strcpy(share->table_name, table_name);
 
264
    /*
 
265
      We will use this lock for rows.
 
266
    */
 
267
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
 
268
 
 
269
    /*
 
270
      We read the meta file, but do not mark it dirty. Since we are not
 
271
      doing a write we won't mark it dirty (and we won't open it for
 
272
      anything but reading... open it for write and we will generate null
 
273
      compression writes).
 
274
    */
 
275
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY,
 
276
                 AZ_METHOD_BLOCK)))
317
277
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
278
      pthread_mutex_destroy(&share->mutex);
 
279
      free(share);
 
280
      pthread_mutex_unlock(&archive_mutex);
319
281
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
 
      delete share;
321
 
 
322
 
      return NULL;
323
 
    }
324
 
 
325
 
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
282
      return(NULL);
 
283
    }
 
284
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
 
285
    share->rows_recorded= (ha_rows)archive_tmp.rows;
 
286
    share->crashed= archive_tmp.dirty;
 
287
    if (share->version < global_version)
 
288
    {
 
289
      share->version_rows= share->rows_recorded;
 
290
      share->version= global_version;
 
291
    }
 
292
    azclose(&archive_tmp);
 
293
 
 
294
    my_hash_insert(&archive_open_tables, (unsigned char*) share);
 
295
    thr_lock_init(&share->lock);
327
296
  }
328
297
  share->use_count++;
329
 
 
330
298
  if (share->crashed)
331
299
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
300
  pthread_mutex_unlock(&archive_mutex);
333
301
 
334
302
  return(share);
335
303
}
341
309
*/
342
310
int ha_archive::free_share()
343
311
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
312
  int rc= 0;
345
313
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
314
  pthread_mutex_lock(&archive_mutex);
347
315
  if (!--share->use_count)
348
316
  {
349
 
    a_engine->deleteOpenTable(share->table_name);
350
 
    delete share;
 
317
    hash_delete(&archive_open_tables, (unsigned char*) share);
 
318
    thr_lock_delete(&share->lock);
 
319
    pthread_mutex_destroy(&share->mutex);
 
320
    /*
 
321
      We need to make sure we don't reset the crashed state.
 
322
      If we open a crashed file, wee need to close it as crashed unless
 
323
      it has been repaired.
 
324
      Since we will close the data down after this, we go on and count
 
325
      the flush on close;
 
326
    */
 
327
    if (share->archive_write_open == true)
 
328
    {
 
329
      if (azclose(&(share->archive_write)))
 
330
        rc= 1;
 
331
    }
 
332
    free((unsigned char*) share);
351
333
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
334
  pthread_mutex_unlock(&archive_mutex);
353
335
 
354
 
  return 0;
 
336
  return(rc);
355
337
}
356
338
 
357
339
int ha_archive::init_archive_writer()
361
343
    a gzip file that can be both read and written we keep a writer open
362
344
    that is shared amoung all open tables.
363
345
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
346
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
347
               O_RDWR, AZ_METHOD_BLOCK)))
366
348
  {
367
349
    share->crashed= true;
374
356
 
375
357
 
376
358
/*
377
 
  No locks are required because it is associated with just one Cursor instance
 
359
  No locks are required because it is associated with just one handler instance
378
360
*/
379
361
int ha_archive::init_archive_reader()
380
362
{
387
369
  {
388
370
    az_method method;
389
371
 
390
 
    if (archive_aio_state())
 
372
    switch (archive_use_aio)
391
373
    {
 
374
    case false:
 
375
      method= AZ_METHOD_BLOCK;
 
376
      break;
 
377
    case true:
392
378
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
379
      break;
 
380
    default:
396
381
      method= AZ_METHOD_BLOCK;
397
382
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
383
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
384
                 method)))
400
385
    {
401
386
      share->crashed= true;
407
392
  return(0);
408
393
}
409
394
 
 
395
 
 
396
/*
 
397
  We just implement one additional file extension.
 
398
*/
 
399
static const char *ha_archive_exts[] = {
 
400
  ARZ,
 
401
  NULL
 
402
};
 
403
 
 
404
const char **ha_archive::bas_ext() const
 
405
{
 
406
  return ha_archive_exts;
 
407
}
 
408
 
 
409
 
410
410
/*
411
411
  When opening a file we:
412
412
  Create/get our shared structure.
413
413
  Init out lock.
414
414
  We open the file we will read from.
415
415
*/
416
 
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
416
int ha_archive::open(const char *name, int, uint32_t open_options)
417
417
{
418
418
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
420
 
 
421
 
  /** 
422
 
    We either fix it ourselves, or we just take it offline 
423
 
 
424
 
    @todo Create some documentation in the recovery tools shipped with the engine.
425
 
  */
426
 
  if (rc == HA_ERR_CRASHED_ON_USAGE)
 
419
  share= get_share(name, &rc);
 
420
 
 
421
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
427
422
  {
 
423
    /* purecov: begin inspected */
428
424
    free_share();
429
 
    rc= repair();
430
 
 
431
 
    return 0;
 
425
    return(rc);
 
426
    /* purecov: end */
432
427
  }
433
428
  else if (rc == HA_ERR_OUT_OF_MEM)
434
429
  {
437
432
 
438
433
  assert(share);
439
434
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
443
 
 
444
 
  return(rc);
445
 
}
446
 
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
 
435
  record_buffer= create_record_buffer(table->s->reclength +
 
436
                                      ARCHIVE_ROW_HEADER_SIZE);
 
437
 
 
438
  if (!record_buffer)
 
439
  {
 
440
    free_share();
 
441
    return(HA_ERR_OUT_OF_MEM);
 
442
  }
 
443
 
 
444
  thr_lock_data_init(&share->lock, &lock, NULL);
 
445
 
 
446
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
 
447
  {
 
448
    return(0);
 
449
  }
 
450
  else
 
451
    return(rc);
452
452
}
453
453
 
454
454
 
473
473
{
474
474
  int rc= 0;
475
475
 
476
 
  record_buffer.clear();
 
476
  destroy_record_buffer(record_buffer);
477
477
 
478
478
  /* First close stream */
479
479
  if (archive_reader_open == true)
497
497
  of creation.
498
498
*/
499
499
 
500
 
int ArchiveEngine::doCreateTable(Session &,
501
 
                                 Table& table_arg,
502
 
                                 const drizzled::TableIdentifier &identifier,
503
 
                                 drizzled::message::Table& proto)
 
500
int ha_archive::create(const char *name, Table *table_arg,
 
501
                       HA_CREATE_INFO *create_info)
504
502
{
 
503
  char name_buff[FN_REFLEN];
 
504
  char linkname[FN_REFLEN];
505
505
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
507
 
  uint64_t auto_increment_value;
508
 
  string serialized_proto;
509
 
 
510
 
  auto_increment_value= proto.options().auto_increment_value();
511
 
 
512
 
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
 
506
  azio_stream create_stream;            /* Archive file we are working with */
 
507
  FILE *frm_file;                   /* File handler for readers */
 
508
  struct stat file_stat;
 
509
  unsigned char *frm_ptr;
 
510
  int r;
 
511
 
 
512
  stats.auto_increment_value= create_info->auto_increment_value;
 
513
 
 
514
  for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
513
515
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
516
    KEY *pos= table_arg->key_info+key;
 
517
    KEY_PART_INFO *key_part=     pos->key_part;
 
518
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
519
 
518
520
    for (; key_part != key_part_end; key_part++)
519
521
    {
521
523
 
522
524
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
525
      {
524
 
        return -1;
 
526
        error= -1;
 
527
        goto error;
525
528
      }
526
529
    }
527
530
  }
528
531
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
531
 
 
532
 
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
534
 
             AZ_METHOD_BLOCK) == 0)
 
532
  /*
 
533
    We reuse name_buff since it is available.
 
534
  */
 
535
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
 
536
  {
 
537
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
 
538
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
539
    fn_format(linkname, name, "", ARZ,
 
540
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
541
  }
 
542
  else
 
543
  {
 
544
    fn_format(name_buff, name, "", ARZ,
 
545
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
546
    linkname[0]= 0;
 
547
  }
 
548
 
 
549
  /*
 
550
    There is a chance that the file was "discovered". In this case
 
551
    just use whatever file is there.
 
552
  */
 
553
  r= stat(name_buff, &file_stat);
 
554
  if (r == -1 && errno!=ENOENT)
 
555
  {
 
556
    return errno;
 
557
  }
 
558
  if (!r)
 
559
    return HA_ERR_TABLE_EXIST;
 
560
 
 
561
  my_errno= 0;
 
562
  if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
 
563
               AZ_METHOD_BLOCK)))
535
564
  {
536
565
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
553
 
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
559
 
 
560
 
  if (proto.options().has_comment())
561
 
  {
562
 
    int write_length;
563
 
 
564
 
    write_length= azwrite_comment(create_stream.get(),
565
 
                                  proto.options().comment().c_str(),
566
 
                                  proto.options().comment().length());
567
 
 
568
 
    if (write_length < 0)
 
566
    goto error2;
 
567
  }
 
568
 
 
569
  if (linkname[0])
 
570
    my_symlink(name_buff, linkname, MYF(0));
 
571
  fn_format(name_buff, name, "", ".frm",
 
572
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
573
 
 
574
  /*
 
575
    Here is where we open up the frm and pass it to archive to store
 
576
  */
 
577
  if ((frm_file= fopen(name_buff, "r")) > 0)
 
578
  {
 
579
    if (fstat(fileno(frm_file), &file_stat))
569
580
    {
570
 
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
 
581
      if ((uint64_t)file_stat.st_size > SIZE_MAX)
 
582
      {
 
583
        error= ENOMEM;
 
584
        goto error2;
 
585
      }
 
586
      frm_ptr= (unsigned char *)malloc((size_t)file_stat.st_size);
 
587
      if (frm_ptr)
 
588
      {
 
589
        size_t length_io;
 
590
        length_io= read(fileno(frm_file), frm_ptr, (size_t)file_stat.st_size);
 
591
 
 
592
        if (length_io != (size_t)file_stat.st_size)
 
593
        {
 
594
          free(frm_ptr);
 
595
          goto error2;
 
596
        }
 
597
 
 
598
        length_io= azwrite_frm(&create_stream, (char *)frm_ptr, (size_t)file_stat.st_size);
 
599
 
 
600
        if (length_io != (size_t)file_stat.st_size)
 
601
        {
 
602
          free(frm_ptr);
 
603
          goto error2;
 
604
        }
 
605
 
 
606
        free(frm_ptr);
 
607
      }
574
608
    }
 
609
    fclose(frm_file);
 
610
  }
 
611
 
 
612
  if (create_info->comment.str)
 
613
  {
 
614
    size_t write_length;
 
615
 
 
616
    write_length= azwrite_comment(&create_stream, create_info->comment.str,
 
617
                                  (unsigned int)create_info->comment.length);
 
618
 
 
619
    if (write_length == (size_t)create_info->comment.length)
 
620
      goto error2;
575
621
  }
576
622
 
577
623
  /*
578
624
    Yes you need to do this, because the starting value
579
625
    for the autoincrement may not be zero.
580
626
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
582
 
    auto_increment_value - 1 : 0;
583
 
 
584
 
  if (azclose(create_stream.get()))
 
627
  create_stream.auto_increment= stats.auto_increment_value ?
 
628
    stats.auto_increment_value - 1 : 0;
 
629
  if (azclose(&create_stream))
585
630
  {
586
631
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
632
    goto error2;
590
633
  }
591
634
 
592
635
  return(0);
 
636
 
 
637
error2:
 
638
  delete_table(name);
 
639
error:
 
640
  /* Return error number, if we got one */
 
641
  return(error ? error : -1);
593
642
}
594
643
 
595
644
/*
603
652
  /* We pack the row for writing */
604
653
  r_pack_length= pack_row(buf);
605
654
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
655
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
656
  if (written != r_pack_length)
608
657
  {
609
658
    return(-1);
623
672
 
624
673
uint32_t ha_archive::max_row_length(const unsigned char *)
625
674
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
675
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
676
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
677
 
629
678
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
679
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
680
       ptr != end ;
632
681
       ptr++)
633
682
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
683
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
684
  }
636
685
 
637
686
  return length;
643
692
  unsigned char *ptr;
644
693
 
645
694
  if (fix_rec_buff(max_row_length(record)))
646
 
    return(HA_ERR_OUT_OF_MEM);
 
695
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
647
696
 
648
697
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
698
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
699
  ptr= record_buffer->buffer + table->s->null_bytes;
651
700
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
701
  for (Field **field=table->field ; *field ; field++)
653
702
  {
654
703
    if (!((*field)->is_null()))
655
704
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
705
  }
657
706
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
707
  return((unsigned int) (ptr - record_buffer->buffer));
659
708
}
660
709
 
661
710
 
668
717
  for implementing start_bulk_insert() is that we could skip
669
718
  setting dirty to true each time.
670
719
*/
671
 
int ha_archive::doInsertRecord(unsigned char *buf)
 
720
int ha_archive::write_row(unsigned char *buf)
672
721
{
673
722
  int rc;
674
723
  unsigned char *read_buf= NULL;
675
724
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
725
  unsigned char *record=  table->record[0];
677
726
 
678
727
  if (share->crashed)
679
728
    return(HA_ERR_CRASHED_ON_USAGE);
680
729
 
681
 
  pthread_mutex_lock(&share->mutex());
 
730
  ha_statistic_increment(&SSV::ha_write_count);
 
731
  pthread_mutex_lock(&share->mutex);
682
732
 
683
733
  if (share->archive_write_open == false)
684
734
    if (init_archive_writer())
685
735
      return(HA_ERR_CRASHED_ON_USAGE);
686
736
 
687
737
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
738
  if (table->next_number_field && record == table->record[0])
689
739
  {
 
740
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
741
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
742
    temp_auto= table->next_number_field->val_int();
692
743
 
693
744
    /*
694
745
      We don't support decremening auto_increment. They make the performance
695
746
      just cry.
696
747
    */
697
748
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
749
        mkey->flags & HA_NOSAME)
699
750
    {
700
751
      rc= HA_ERR_FOUND_DUPP_KEY;
701
752
      goto error;
702
753
    }
 
754
#ifdef DEAD_CODE
 
755
    /*
 
756
      Bad news, this will cause a search for the unique value which is very
 
757
      expensive since we will have to do a table scan which will lock up
 
758
      all other writers during this period. This could perhaps be optimized
 
759
      in the future.
 
760
    */
 
761
    {
 
762
      /*
 
763
        First we create a buffer that we can use for reading rows, and can pass
 
764
        to get_row().
 
765
      */
 
766
      if (!(read_buf= (unsigned char*) malloc(table->s->reclength)))
 
767
      {
 
768
        rc= HA_ERR_OUT_OF_MEM;
 
769
        goto error;
 
770
      }
 
771
       /*
 
772
         All of the buffer must be written out or we won't see all of the
 
773
         data
 
774
       */
 
775
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
776
      /*
 
777
        Set the position of the local read thread to the beginning postion.
 
778
      */
 
779
      if (read_data_header(&archive))
 
780
      {
 
781
        rc= HA_ERR_CRASHED_ON_USAGE;
 
782
        goto error;
 
783
      }
 
784
 
 
785
      Field *mfield= table->next_number_field;
 
786
 
 
787
      while (!(get_row(&archive, read_buf)))
 
788
      {
 
789
        if (!memcmp(read_buf + mfield->offset(record),
 
790
                    table->next_number_field->ptr,
 
791
                    mfield->max_display_length()))
 
792
        {
 
793
          rc= HA_ERR_FOUND_DUPP_KEY;
 
794
          goto error;
 
795
        }
 
796
      }
 
797
    }
 
798
#endif
703
799
    else
704
800
    {
705
801
      if (temp_auto > share->archive_write.auto_increment)
715
811
  share->rows_recorded++;
716
812
  rc= real_write_row(buf,  &(share->archive_write));
717
813
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
814
  pthread_mutex_unlock(&share->mutex);
719
815
  if (read_buf)
720
816
    free((unsigned char*) read_buf);
721
817
 
730
826
  *first_value= share->archive_write.auto_increment + 1;
731
827
}
732
828
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
829
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
830
int ha_archive::index_init(uint32_t keynr, bool)
735
831
{
736
832
  active_index= keynr;
737
833
  return(0);
743
839
  the optimizer that we have unique indexes, we scan
744
840
*/
745
841
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
746
 
                             uint32_t key_len, enum ha_rkey_function)
 
842
                             uint32_t key_len, enum ha_rkey_function find_flag)
 
843
{
 
844
  int rc;
 
845
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
846
  return(rc);
 
847
}
 
848
 
 
849
 
 
850
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
 
851
                               uint32_t key_len, enum ha_rkey_function)
747
852
{
748
853
  int rc;
749
854
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
855
  KEY *mkey= &table->s->key_info[index];
 
856
  current_k_offset= mkey->key_part->offset;
751
857
  current_key= key;
752
858
  current_key_len= key_len;
753
859
 
754
 
  rc= doStartTableScan(true);
 
860
  rc= rnd_init(true);
755
861
 
756
862
  if (rc)
757
863
    goto error;
795
901
  we assume the position will be set.
796
902
*/
797
903
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
904
int ha_archive::rnd_init(bool scan)
799
905
{
800
906
  if (share->crashed)
801
907
      return(HA_ERR_CRASHED_ON_USAGE);
832
938
/* Reallocate buffer if needed */
833
939
bool ha_archive::fix_rec_buff(unsigned int length)
834
940
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
941
  assert(record_buffer->buffer);
 
942
 
 
943
  if (length > record_buffer->length)
 
944
  {
 
945
    unsigned char *newptr;
 
946
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
947
      return(1);
 
948
    record_buffer->buffer= newptr;
 
949
    record_buffer->length= length;
 
950
  }
 
951
 
 
952
  assert(length <= record_buffer->length);
 
953
 
 
954
  return(0);
838
955
}
839
956
 
840
957
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
969
  }
853
970
 
854
971
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
972
  memcpy(record, ptr, table->getNullBytes());
 
973
  ptr+= table->getNullBytes();
 
974
  for (Field **field=table->field ; *field ; field++)
858
975
  {
859
976
    if (!((*field)->is_null()))
860
977
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
978
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
979
    }
863
980
  }
864
981
  return(0);
889
1006
    return(HA_ERR_END_OF_FILE);
890
1007
  scan_rows--;
891
1008
 
892
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1009
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
893
1010
  current_position= aztell(&archive);
894
1011
  rc= get_row(&archive, buf);
895
1012
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
1013
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
1014
 
898
1015
  return(rc);
899
1016
}
900
1017
 
901
1018
 
902
1019
/*
903
 
  Thanks to the table bool is_ordered this will be called after
 
1020
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
904
1021
  each call to ha_archive::rnd_next() if an ordering of the rows is
905
1022
  needed.
906
1023
*/
907
1024
 
908
1025
void ha_archive::position(const unsigned char *)
909
1026
{
910
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1027
  my_store_ptr(ref, ref_length, current_position);
911
1028
  return;
912
1029
}
913
1030
 
921
1038
 
922
1039
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
923
1040
{
924
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
925
 
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
1041
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1042
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
926
1043
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
927
1044
    return(HA_ERR_CRASHED_ON_USAGE);
928
1045
  return(get_row(&archive, buf));
933
1050
  rewriting the meta file. Currently it does this by calling optimize with
934
1051
  the extended flag.
935
1052
*/
936
 
int ha_archive::repair()
 
1053
int ha_archive::repair(Session* session, HA_CHECK_OPT* check_opt)
937
1054
{
938
 
  int rc= optimize();
 
1055
  check_opt->flags= T_EXTEND;
 
1056
  int rc= optimize(session, check_opt);
939
1057
 
940
1058
  if (rc)
941
1059
    return(HA_ERR_CRASHED_ON_REPAIR);
948
1066
  The table can become fragmented if data was inserted, read, and then
949
1067
  inserted again. What we do is open up the file and recompress it completely.
950
1068
*/
951
 
int ha_archive::optimize()
 
1069
int ha_archive::optimize(Session *, HA_CHECK_OPT *)
952
1070
{
953
1071
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1072
  azio_stream writer;
 
1073
  char writer_filename[FN_REFLEN];
955
1074
 
956
1075
  init_archive_reader();
957
1076
 
962
1081
    share->archive_write_open= false;
963
1082
  }
964
1083
 
965
 
  char* proto_string;
966
 
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
967
 
  if (proto_string == NULL)
968
 
  {
969
 
    return ENOMEM;
970
 
  }
971
 
  azread_frm(&archive, proto_string);
972
 
 
973
1084
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1085
  fn_format(writer_filename, share->table_name, "", ARN,
 
1086
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1087
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
 
  {
979
 
    free(proto_string);
 
1088
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
980
1089
    return(HA_ERR_CRASHED_ON_USAGE);
981
 
  }
982
 
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
984
1090
 
985
1091
  /*
986
1092
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1110
    */
1005
1111
    if (!rc)
1006
1112
    {
 
1113
      uint64_t x;
1007
1114
      uint64_t rows_restored;
1008
1115
      share->rows_recorded= 0;
1009
1116
      stats.auto_increment_value= 1;
1011
1118
 
1012
1119
      rows_restored= archive.rows;
1013
1120
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1121
      for (x= 0; x < rows_restored ; x++)
1015
1122
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1123
        rc= get_row(&archive, table->record[0]);
1017
1124
 
1018
1125
        if (rc != 0)
1019
1126
          break;
1020
1127
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1128
        real_write_row(table->record[0], &writer);
1022
1129
        /*
1023
1130
          Long term it should be possible to optimize this so that
1024
1131
          it is not called on each row.
1025
1132
        */
1026
 
        if (getTable()->found_next_number_field)
 
1133
        if (table->found_next_number_field)
1027
1134
        {
1028
 
          Field *field= getTable()->found_next_number_field;
1029
 
 
1030
 
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
 
          field->setReadSet();
1032
 
 
 
1135
          Field *field= table->found_next_number_field;
1033
1136
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1137
            (uint64_t) field->val_int(table->record[0] +
 
1138
                                       field->offset(table->record[0]));
1036
1139
          if (share->archive_write.auto_increment < auto_value)
1037
1140
            stats.auto_increment_value=
1038
1141
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1142
        }
1040
1143
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1144
      share->rows_recorded= (ha_rows)writer.rows;
1042
1145
    }
1043
1146
 
1044
1147
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1150
    }
1048
1151
  }
1049
1152
 
1050
 
  azclose(writer.get());
 
1153
  azclose(&writer);
1051
1154
  share->dirty= false;
1052
1155
 
1053
1156
  azclose(&archive);
1054
1157
 
1055
1158
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1057
 
 
1058
 
  free(proto_string);
 
1159
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
 
1160
 
 
1161
 
1059
1162
  return(rc);
1060
1163
error:
1061
 
  free(proto_string);
1062
 
  azclose(writer.get());
 
1164
  azclose(&writer);
1063
1165
 
1064
1166
  return(rc);
1065
1167
}
1071
1173
                                       THR_LOCK_DATA **to,
1072
1174
                                       enum thr_lock_type lock_type)
1073
1175
{
1074
 
  delayed_insert= false;
 
1176
  if (lock_type == TL_WRITE_DELAYED)
 
1177
    delayed_insert= true;
 
1178
  else
 
1179
    delayed_insert= false;
1075
1180
 
1076
1181
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1077
1182
  {
1083
1188
    */
1084
1189
 
1085
1190
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
 
         lock_type <= TL_WRITE)
 
1191
         lock_type <= TL_WRITE) && !session_in_lock_tables(session)
1087
1192
        && !session_tablespace_op(session))
1088
1193
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1194
 
1095
1200
      concurrent inserts to t2.
1096
1201
    */
1097
1202
 
1098
 
    if (lock_type == TL_READ_NO_INSERT)
 
1203
    if (lock_type == TL_READ_NO_INSERT && !session_in_lock_tables(session))
1099
1204
      lock_type = TL_READ;
1100
1205
 
1101
1206
    lock.type=lock_type;
1106
1211
  return to;
1107
1212
}
1108
1213
 
 
1214
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 
1215
{
 
1216
  ha_archive::info(HA_STATUS_AUTO);
 
1217
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
 
1218
  {
 
1219
    create_info->auto_increment_value= stats.auto_increment_value;
 
1220
  }
 
1221
 
 
1222
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
 
1223
    create_info->data_file_name= share->real_path;
 
1224
 
 
1225
  return;
 
1226
}
 
1227
 
 
1228
 
1109
1229
/*
1110
1230
  Hints for optimizer, see ha_tina for more information
1111
1231
*/
1115
1235
    If dirty, we lock, and then reset/flush the data.
1116
1236
    I found that just calling azflush() doesn't always work.
1117
1237
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1238
  pthread_mutex_lock(&share->mutex);
1119
1239
  if (share->dirty == true)
1120
1240
  {
1121
1241
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1254
    cause the number to be inaccurate.
1135
1255
  */
1136
1256
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1257
  pthread_mutex_unlock(&share->mutex);
1138
1258
 
1139
1259
  scan_rows= stats.records;
1140
1260
  stats.deleted= 0;
1144
1264
  {
1145
1265
    struct stat file_stat;  // Stat information for the data file
1146
1266
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1267
    stat(share->data_file_name, &file_stat);
1148
1268
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1269
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1270
    stats.data_file_length= file_stat.st_size;
1151
1271
    stats.create_time= file_stat.st_ctime;
1152
1272
    stats.update_time= file_stat.st_mtime;
1158
1278
  if (flag & HA_STATUS_AUTO)
1159
1279
  {
1160
1280
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1281
    pthread_mutex_lock(&share->mutex);
1162
1282
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1283
    pthread_mutex_unlock(&share->mutex);
1164
1284
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1285
  }
1166
1286
 
1170
1290
 
1171
1291
/*
1172
1292
  This method tells us that a bulk insert operation is about to occur. We set
1173
 
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1293
  a flag which will keep write_row from saying that its data is dirty. This in
1174
1294
  turn will keep selects from causing a sync to occur.
1175
1295
  Basically, yet another optimizations to keep compression working well.
1176
1296
*/
1204
1324
}
1205
1325
 
1206
1326
/*
 
1327
  We just return state if asked.
 
1328
*/
 
1329
bool ha_archive::is_crashed() const
 
1330
{
 
1331
  return(share->crashed);
 
1332
}
 
1333
 
 
1334
/*
1207
1335
  Simple scan of the tables to make sure everything is ok.
1208
1336
*/
1209
1337
 
1210
 
int ha_archive::check(Session* session)
 
1338
int ha_archive::check(Session* session, HA_CHECK_OPT *)
1211
1339
{
1212
1340
  int rc= 0;
1213
1341
  const char *old_proc_info;
 
1342
  uint64_t x;
1214
1343
 
1215
1344
  old_proc_info= get_session_proc_info(session);
1216
1345
  set_session_proc_info(session, "Checking table");
1217
1346
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1347
  pthread_mutex_lock(&share->mutex);
1219
1348
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1349
  pthread_mutex_unlock(&share->mutex);
1221
1350
 
1222
1351
  /*
1223
1352
    Now we will rewind the archive file so that we are positioned at the
1226
1355
  init_archive_reader();
1227
1356
  azflush(&archive, Z_SYNC_FLUSH);
1228
1357
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1358
  for (x= 0; x < share->archive_write.rows; x++)
1230
1359
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1360
    rc= get_row(&archive, table->record[0]);
1232
1361
 
1233
1362
    if (rc != 0)
1234
1363
      break;
1247
1376
  }
1248
1377
}
1249
1378
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1251
 
{
1252
 
  int error= 0;
1253
 
 
1254
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1255
 
  {
1256
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1257
 
    {
1258
 
      if ((error=errno) != ENOENT)
1259
 
        break;
1260
 
      error= 0;
1261
 
    }
1262
 
  }
1263
 
 
1264
 
  return error;
1265
 
}
1266
 
 
1267
 
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const TableIdentifier &identifier)
1269
 
{
1270
 
  string proto_path(identifier.getPath());
1271
 
  proto_path.append(ARZ);
1272
 
 
1273
 
  if (access(proto_path.c_str(), F_OK))
1274
 
  {
1275
 
    return false;
1276
 
  }
1277
 
 
1278
 
  return true;
1279
 
}
1280
 
 
1281
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::SchemaIdentifier &schema_identifier,
1283
 
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
1284
 
{
1285
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
 
 
1287
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1288
 
       entry_iter != entries.end(); ++entry_iter)
1289
 
  {
1290
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
 
    const string *filename= &entry->filename;
1292
 
 
1293
 
    assert(filename->size());
1294
 
 
1295
 
    const char *ext= strchr(filename->c_str(), '.');
1296
 
 
1297
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1299
 
    {  }
1300
 
    else
1301
 
    {
1302
 
      char uname[NAME_LEN + 1];
1303
 
      uint32_t file_name_len;
1304
 
 
1305
 
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
 
      // TODO: Remove need for memory copy here
1307
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
 
 
1309
 
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1310
 
    }
1311
 
  }
1312
 
}
 
1379
/*
 
1380
  Check and repair the table if needed.
 
1381
*/
 
1382
bool ha_archive::check_and_repair(Session *session)
 
1383
{
 
1384
  HA_CHECK_OPT check_opt;
 
1385
 
 
1386
  check_opt.init();
 
1387
 
 
1388
  return(repair(session, &check_opt));
 
1389
}
 
1390
 
 
1391
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1392
{
 
1393
  archive_record_buffer *r;
 
1394
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1395
  {
 
1396
    return(NULL); /* purecov: inspected */
 
1397
  }
 
1398
  r->length= (int)length;
 
1399
 
 
1400
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1401
  {
 
1402
    free((char*) r);
 
1403
    return(NULL); /* purecov: inspected */
 
1404
  }
 
1405
 
 
1406
  return(r);
 
1407
}
 
1408
 
 
1409
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1410
{
 
1411
  free((char*) r->buffer);
 
1412
  free((char*) r);
 
1413
  return;
 
1414
}
 
1415
 
 
1416
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
 
1417
  PLUGIN_VAR_NOCMDOPT,
 
1418
  "Whether or not to use asynchronous IO.",
 
1419
  NULL, NULL, true);
 
1420
 
 
1421
static struct st_mysql_sys_var* archive_system_variables[]= {
 
1422
  DRIZZLE_SYSVAR(aio),
 
1423
  NULL
 
1424
};
 
1425
 
 
1426
drizzle_declare_plugin(archive)
 
1427
{
 
1428
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1429
  "ARCHIVE",
 
1430
  "3.5",
 
1431
  "Brian Aker, MySQL AB",
 
1432
  "Archive storage engine",
 
1433
  PLUGIN_LICENSE_GPL,
 
1434
  archive_db_init, /* Plugin Init */
 
1435
  archive_db_done, /* Plugin Deinit */
 
1436
  NULL,                       /* status variables                */
 
1437
  archive_system_variables,   /* system variables                */
 
1438
  NULL                        /* config options                  */
 
1439
}
 
1440
drizzle_declare_plugin_end;
 
1441