~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/ha_archive.cc

  • Committer: Brian Aker
  • Date: 2009-01-09 22:07:54 UTC
  • Revision ID: brian@tangent.org-20090109220754-1y50h7lqi9i1ifcs
Dead test/wrong test.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
2
 
   Copyright (C) 2010 Brian Aker
3
2
 
4
3
  This program is free software; you can redistribute it and/or modify
5
4
  it under the terms of the GNU General Public License as published by
12
11
 
13
12
  You should have received a copy of the GNU General Public License
14
13
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
16
 
 
17
 
 
18
 
#include "config.h"
19
 
 
20
 
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
 
 
24
 
using namespace std;
25
 
using namespace drizzled;
26
 
 
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
#include <drizzled/server_includes.h>
 
18
#include <drizzled/field.h>
 
19
#include <drizzled/field/blob.h>
 
20
#include <drizzled/field/timestamp.h>
 
21
#include <storage/myisam/myisam.h>
 
22
#include <drizzled/table.h>
 
23
#include <drizzled/session.h>
 
24
 
 
25
#include <storage/archive/ha_archive.h>
27
26
 
28
27
/*
29
28
  First, if you want to understand storage engines you should look at
93
92
    -Brian
94
93
*/
95
94
 
96
 
/* When the engine starts up set the first version */
97
 
static uint64_t global_version= 1;
98
 
 
99
 
// We use this to find out the state of the archive aio option.
100
 
extern bool archive_aio_state(void);
 
95
/* Variables for archive share methods */
 
96
pthread_mutex_t archive_mutex;
 
97
static HASH archive_open_tables;
 
98
static unsigned int global_version;
 
99
 
 
100
/* The file extension */
 
101
#define ARZ ".ARZ"               // The data file
 
102
#define ARN ".ARN"               // Files used during an optimize call
 
103
 
 
104
 
 
105
/* Static declarations for handerton */
 
106
static handler *archive_create_handler(handlerton *hton,
 
107
                                       TABLE_SHARE *table,
 
108
                                       MEM_ROOT *mem_root);
 
109
 
 
110
static bool archive_use_aio= false;
101
111
 
102
112
/*
103
113
  Number of rows that will force a bulk insert.
109
119
*/
110
120
#define ARCHIVE_ROW_HEADER_SIZE 4
111
121
 
112
 
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
113
 
{
114
 
  ArchiveMap::iterator find_iter=
115
 
    archive_open_tables.find(table_name);
116
 
 
117
 
  if (find_iter != archive_open_tables.end())
118
 
    return (*find_iter).second;
119
 
  else
120
 
    return NULL;
121
 
}
122
 
 
123
 
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
124
 
{
125
 
  archive_open_tables[table_name]= share;
126
 
}
127
 
 
128
 
void ArchiveEngine::deleteOpenTable(const string &table_name)
129
 
{
130
 
  archive_open_tables.erase(table_name);
131
 
}
132
 
 
133
 
 
134
 
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
 
{
136
 
  string new_path(identifier.getPath());
137
 
 
138
 
  new_path+= ARZ;
139
 
 
140
 
  int error= unlink(new_path.c_str());
141
 
 
142
 
  if (error != 0)
143
 
  {
144
 
    error= errno= errno;
145
 
  }
146
 
 
147
 
  return error;
148
 
}
149
 
 
150
 
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const TableIdentifier &identifier,
152
 
                                        drizzled::message::Table &table_proto)
153
 
{
154
 
  struct stat stat_info;
155
 
  int error= ENOENT;
156
 
  string proto_path;
157
 
 
158
 
  proto_path.reserve(FN_REFLEN);
159
 
  proto_path.assign(identifier.getPath());
160
 
 
161
 
  proto_path.append(ARZ);
162
 
 
163
 
  if (stat(proto_path.c_str(),&stat_info))
164
 
    return errno;
165
 
  else
166
 
    error= EEXIST;
167
 
 
168
 
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
170
 
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
 
      return HA_ERR_CRASHED_ON_USAGE;
173
 
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
175
 
    if (proto_string == NULL)
176
 
    {
177
 
      azclose(proto_stream.get());
178
 
      return ENOMEM;
179
 
    }
180
 
 
181
 
    azread_frm(proto_stream.get(), proto_string);
182
 
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
184
 
      error= HA_ERR_CRASHED_ON_USAGE;
185
 
 
186
 
    azclose(proto_stream.get());
187
 
    free(proto_string);
188
 
  }
189
 
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
 
  return error;
197
 
}
198
 
 
199
 
 
200
 
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
202
 
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
122
static handler *archive_create_handler(handlerton *hton,
 
123
                                       TABLE_SHARE *table,
 
124
                                       MEM_ROOT *mem_root)
 
125
{
 
126
  return new (mem_root) ha_archive(hton, table);
 
127
}
 
128
 
 
129
/*
 
130
  Used for hash table that tracks open tables.
 
131
*/
 
132
static unsigned char* archive_get_key(ARCHIVE_SHARE *share, size_t *length, bool)
 
133
{
 
134
  *length=share->table_name_length;
 
135
  return (unsigned char*) share->table_name;
 
136
}
 
137
 
 
138
 
 
139
/*
 
140
  Initialize the archive handler.
 
141
 
 
142
  SYNOPSIS
 
143
    archive_db_init()
 
144
    void *
 
145
 
 
146
  RETURN
 
147
    false       OK
 
148
    true        Error
 
149
*/
 
150
 
 
151
int archive_db_init(void *p)
 
152
{
 
153
  handlerton *archive_hton;
 
154
 
 
155
  archive_hton= (handlerton *)p;
 
156
  archive_hton->state= SHOW_OPTION_YES;
 
157
  archive_hton->create= archive_create_handler;
 
158
  archive_hton->flags= HTON_NO_FLAGS;
 
159
 
 
160
  /* When the engine starts up set the first version */
 
161
  global_version= 1;
 
162
 
 
163
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
 
164
    goto error;
 
165
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
 
166
                (hash_get_key) archive_get_key, 0, 0))
 
167
  {
 
168
    pthread_mutex_destroy(&archive_mutex);
 
169
  }
 
170
  else
 
171
  {
 
172
    return(false);
 
173
  }
 
174
error:
 
175
  return(true);
 
176
}
 
177
 
 
178
/*
 
179
  Release the archive handler.
 
180
 
 
181
  SYNOPSIS
 
182
    archive_db_done()
 
183
    void
 
184
 
 
185
  RETURN
 
186
    false       OK
 
187
*/
 
188
 
 
189
int archive_db_done(void *)
 
190
{
 
191
  hash_free(&archive_open_tables);
 
192
  pthread_mutex_destroy(&archive_mutex);
 
193
 
 
194
  return 0;
 
195
}
 
196
 
 
197
 
 
198
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
 
199
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
203
200
{
204
201
  /* Set our original buffer from pre-allocated memory */
205
202
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
206
203
 
207
204
  /* The size of the offset value we will use for position() */
208
 
  ref_length= sizeof(internal::my_off_t);
 
205
  ref_length= sizeof(my_off_t);
209
206
  archive_reader_open= false;
210
207
}
211
208
 
223
220
  return(1);
224
221
}
225
222
 
226
 
ArchiveShare::ArchiveShare():
227
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
228
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
229
 
{
230
 
  assert(1);
231
 
}
232
 
 
233
 
ArchiveShare::ArchiveShare(const char *name):
234
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
235
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
236
 
{
237
 
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
 
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
241
 
  /*
242
 
    We will use this lock for rows.
243
 
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
245
 
}
246
 
 
247
 
ArchiveShare::~ArchiveShare()
248
 
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
251
 
  /*
252
 
    We need to make sure we don't reset the crashed state.
253
 
    If we open a crashed file, wee need to close it as crashed unless
254
 
    it has been repaired.
255
 
    Since we will close the data down after this, we go on and count
256
 
    the flush on close;
257
 
  */
258
 
  if (archive_write_open == true)
259
 
    (void)azclose(&archive_write);
260
 
}
261
 
 
262
 
bool ArchiveShare::prime(uint64_t *auto_increment)
263
 
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
265
 
 
266
 
  /*
267
 
    We read the meta file, but do not mark it dirty. Since we are not
268
 
    doing a write we won't mark it dirty (and we won't open it for
269
 
    anything but reading... open it for write and we will generate null
270
 
    compression writes).
271
 
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
273
 
               AZ_METHOD_BLOCK)))
274
 
    return false;
275
 
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
279
 
  if (version < global_version)
280
 
  {
281
 
    version_rows= rows_recorded;
282
 
    version= global_version;
283
 
  }
284
 
  azclose(archive_tmp.get());
285
 
 
286
 
  return true;
287
 
}
288
 
 
289
223
 
290
224
/*
291
225
  We create the shared memory space that we will use for the open table.
294
228
 
295
229
  See ha_example.cc for a longer description.
296
230
*/
297
 
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
 
231
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
298
232
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
303
 
  share= a_engine->findOpenTable(table_name);
304
 
 
305
 
  if (!share)
 
233
  uint32_t length;
 
234
 
 
235
  pthread_mutex_lock(&archive_mutex);
 
236
  length=(uint) strlen(table_name);
 
237
 
 
238
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
 
239
                                           (unsigned char*) table_name,
 
240
                                           length)))
306
241
  {
307
 
    share= new ArchiveShare(table_name);
 
242
    char *tmp_name;
 
243
    azio_stream archive_tmp;
308
244
 
309
 
    if (share == NULL)
 
245
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
246
                          &share, sizeof(*share),
 
247
                          &tmp_name, length+1,
 
248
                          NULL))
310
249
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
250
      pthread_mutex_unlock(&archive_mutex);
312
251
      *rc= HA_ERR_OUT_OF_MEM;
313
252
      return(NULL);
314
253
    }
315
254
 
316
 
    if (share->prime(&stats.auto_increment_value) == false)
 
255
    share->use_count= 0;
 
256
    share->table_name_length= length;
 
257
    share->table_name= tmp_name;
 
258
    share->crashed= false;
 
259
    share->archive_write_open= false;
 
260
    fn_format(share->data_file_name, table_name, "",
 
261
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
262
    strcpy(share->table_name, table_name);
 
263
    /*
 
264
      We will use this lock for rows.
 
265
    */
 
266
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
 
267
 
 
268
    /*
 
269
      We read the meta file, but do not mark it dirty. Since we are not
 
270
      doing a write we won't mark it dirty (and we won't open it for
 
271
      anything but reading... open it for write and we will generate null
 
272
      compression writes).
 
273
    */
 
274
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY,
 
275
                 AZ_METHOD_BLOCK)))
317
276
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
277
      pthread_mutex_destroy(&share->mutex);
 
278
      free(share);
 
279
      pthread_mutex_unlock(&archive_mutex);
319
280
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
 
      delete share;
321
 
 
322
 
      return NULL;
323
 
    }
324
 
 
325
 
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
281
      return(NULL);
 
282
    }
 
283
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
 
284
    share->rows_recorded= (ha_rows)archive_tmp.rows;
 
285
    share->crashed= archive_tmp.dirty;
 
286
    if (share->version < global_version)
 
287
    {
 
288
      share->version_rows= share->rows_recorded;
 
289
      share->version= global_version;
 
290
    }
 
291
    azclose(&archive_tmp);
 
292
 
 
293
    my_hash_insert(&archive_open_tables, (unsigned char*) share);
 
294
    thr_lock_init(&share->lock);
327
295
  }
328
296
  share->use_count++;
329
 
 
330
297
  if (share->crashed)
331
298
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
299
  pthread_mutex_unlock(&archive_mutex);
333
300
 
334
301
  return(share);
335
302
}
341
308
*/
342
309
int ha_archive::free_share()
343
310
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
311
  int rc= 0;
345
312
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
313
  pthread_mutex_lock(&archive_mutex);
347
314
  if (!--share->use_count)
348
315
  {
349
 
    a_engine->deleteOpenTable(share->table_name);
350
 
    delete share;
 
316
    hash_delete(&archive_open_tables, (unsigned char*) share);
 
317
    thr_lock_delete(&share->lock);
 
318
    pthread_mutex_destroy(&share->mutex);
 
319
    /*
 
320
      We need to make sure we don't reset the crashed state.
 
321
      If we open a crashed file, wee need to close it as crashed unless
 
322
      it has been repaired.
 
323
      Since we will close the data down after this, we go on and count
 
324
      the flush on close;
 
325
    */
 
326
    if (share->archive_write_open == true)
 
327
    {
 
328
      if (azclose(&(share->archive_write)))
 
329
        rc= 1;
 
330
    }
 
331
    free((unsigned char*) share);
351
332
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
333
  pthread_mutex_unlock(&archive_mutex);
353
334
 
354
 
  return 0;
 
335
  return(rc);
355
336
}
356
337
 
357
338
int ha_archive::init_archive_writer()
361
342
    a gzip file that can be both read and written we keep a writer open
362
343
    that is shared amoung all open tables.
363
344
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
345
  if (!(azopen(&(share->archive_write), share->data_file_name,
365
346
               O_RDWR, AZ_METHOD_BLOCK)))
366
347
  {
367
348
    share->crashed= true;
374
355
 
375
356
 
376
357
/*
377
 
  No locks are required because it is associated with just one Cursor instance
 
358
  No locks are required because it is associated with just one handler instance
378
359
*/
379
360
int ha_archive::init_archive_reader()
380
361
{
387
368
  {
388
369
    az_method method;
389
370
 
390
 
    if (archive_aio_state())
 
371
    switch (archive_use_aio)
391
372
    {
 
373
    case false:
 
374
      method= AZ_METHOD_BLOCK;
 
375
      break;
 
376
    case true:
392
377
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
378
      break;
 
379
    default:
396
380
      method= AZ_METHOD_BLOCK;
397
381
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
382
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
399
383
                 method)))
400
384
    {
401
385
      share->crashed= true;
407
391
  return(0);
408
392
}
409
393
 
 
394
 
 
395
/*
 
396
  We just implement one additional file extension.
 
397
*/
 
398
static const char *ha_archive_exts[] = {
 
399
  ARZ,
 
400
  NULL
 
401
};
 
402
 
 
403
const char **ha_archive::bas_ext() const
 
404
{
 
405
  return ha_archive_exts;
 
406
}
 
407
 
 
408
 
410
409
/*
411
410
  When opening a file we:
412
411
  Create/get our shared structure.
413
412
  Init out lock.
414
413
  We open the file we will read from.
415
414
*/
416
 
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
415
int ha_archive::open(const char *name, int, uint32_t open_options)
417
416
{
418
417
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
420
 
 
421
 
  /** 
422
 
    We either fix it ourselves, or we just take it offline 
423
 
 
424
 
    @todo Create some documentation in the recovery tools shipped with the engine.
425
 
  */
426
 
  if (rc == HA_ERR_CRASHED_ON_USAGE)
 
418
  share= get_share(name, &rc);
 
419
 
 
420
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
427
421
  {
 
422
    /* purecov: begin inspected */
428
423
    free_share();
429
 
    rc= repair();
430
 
 
431
 
    return 0;
 
424
    return(rc);
 
425
    /* purecov: end */
432
426
  }
433
427
  else if (rc == HA_ERR_OUT_OF_MEM)
434
428
  {
437
431
 
438
432
  assert(share);
439
433
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
443
 
 
444
 
  return(rc);
445
 
}
446
 
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
 
434
  record_buffer= create_record_buffer(table->s->reclength +
 
435
                                      ARCHIVE_ROW_HEADER_SIZE);
 
436
 
 
437
  if (!record_buffer)
 
438
  {
 
439
    free_share();
 
440
    return(HA_ERR_OUT_OF_MEM);
 
441
  }
 
442
 
 
443
  thr_lock_data_init(&share->lock, &lock, NULL);
 
444
 
 
445
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
 
446
  {
 
447
    return(0);
 
448
  }
 
449
  else
 
450
    return(rc);
452
451
}
453
452
 
454
453
 
473
472
{
474
473
  int rc= 0;
475
474
 
476
 
  record_buffer.clear();
 
475
  destroy_record_buffer(record_buffer);
477
476
 
478
477
  /* First close stream */
479
478
  if (archive_reader_open == true)
497
496
  of creation.
498
497
*/
499
498
 
500
 
int ArchiveEngine::doCreateTable(Session &,
501
 
                                 Table& table_arg,
502
 
                                 const drizzled::TableIdentifier &identifier,
503
 
                                 drizzled::message::Table& proto)
 
499
int ha_archive::create(const char *name, Table *table_arg,
 
500
                       HA_CREATE_INFO *create_info)
504
501
{
505
 
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
507
 
  uint64_t auto_increment_value;
508
 
  string serialized_proto;
509
 
 
510
 
  auto_increment_value= proto.options().auto_increment_value();
511
 
 
512
 
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
 
502
  char name_buff[FN_REFLEN];
 
503
  char linkname[FN_REFLEN];
 
504
  int error;
 
505
  azio_stream create_stream;            /* Archive file we are working with */
 
506
  File frm_file;                   /* File handler for readers */
 
507
  struct stat file_stat;
 
508
  unsigned char *frm_ptr;
 
509
 
 
510
  stats.auto_increment_value= create_info->auto_increment_value;
 
511
 
 
512
  for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
513
513
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
514
    KEY *pos= table_arg->key_info+key;
 
515
    KEY_PART_INFO *key_part=     pos->key_part;
 
516
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
517
 
518
518
    for (; key_part != key_part_end; key_part++)
519
519
    {
521
521
 
522
522
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
523
      {
524
 
        return -1;
525
 
      }
526
 
    }
527
 
  }
528
 
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
531
 
 
532
 
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
534
 
             AZ_METHOD_BLOCK) == 0)
535
 
  {
536
 
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
553
 
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
559
 
 
560
 
  if (proto.options().has_comment())
561
 
  {
562
 
    int write_length;
563
 
 
564
 
    write_length= azwrite_comment(create_stream.get(),
565
 
                                  proto.options().comment().c_str(),
566
 
                                  proto.options().comment().length());
567
 
 
568
 
    if (write_length < 0)
569
 
    {
570
 
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
574
 
    }
575
 
  }
576
 
 
577
 
  /*
578
 
    Yes you need to do this, because the starting value
579
 
    for the autoincrement may not be zero.
580
 
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
582
 
    auto_increment_value - 1 : 0;
583
 
 
584
 
  if (azclose(create_stream.get()))
585
 
  {
586
 
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
590
 
  }
 
524
        error= -1;
 
525
        goto error;
 
526
      }
 
527
    }
 
528
  }
 
529
 
 
530
  /*
 
531
    We reuse name_buff since it is available.
 
532
  */
 
533
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
 
534
  {
 
535
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
 
536
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
537
    fn_format(linkname, name, "", ARZ,
 
538
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
539
  }
 
540
  else
 
541
  {
 
542
    fn_format(name_buff, name, "", ARZ,
 
543
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
544
    linkname[0]= 0;
 
545
  }
 
546
 
 
547
  /*
 
548
    There is a chance that the file was "discovered". In this case
 
549
    just use whatever file is there.
 
550
  */
 
551
  if (!stat(name_buff, &file_stat))
 
552
  {
 
553
    my_errno= 0;
 
554
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
 
555
                 AZ_METHOD_BLOCK)))
 
556
    {
 
557
      error= errno;
 
558
      goto error2;
 
559
    }
 
560
 
 
561
    if (linkname[0])
 
562
      my_symlink(name_buff, linkname, MYF(0));
 
563
    fn_format(name_buff, name, "", ".frm",
 
564
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
565
 
 
566
    /*
 
567
      Here is where we open up the frm and pass it to archive to store
 
568
    */
 
569
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
 
570
    {
 
571
      if (fstat(frm_file, &file_stat))
 
572
      {
 
573
        frm_ptr= (unsigned char *)malloc(sizeof(unsigned char) *
 
574
                                         file_stat.st_size);
 
575
        if (frm_ptr)
 
576
        {
 
577
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
 
578
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
 
579
          free((unsigned char*)frm_ptr);
 
580
        }
 
581
      }
 
582
      my_close(frm_file, MYF(0));
 
583
    }
 
584
 
 
585
    if (create_info->comment.str)
 
586
      azwrite_comment(&create_stream, create_info->comment.str,
 
587
                      (unsigned int)create_info->comment.length);
 
588
 
 
589
    /*
 
590
      Yes you need to do this, because the starting value
 
591
      for the autoincrement may not be zero.
 
592
    */
 
593
    create_stream.auto_increment= stats.auto_increment_value ?
 
594
                                    stats.auto_increment_value - 1 : 0;
 
595
    if (azclose(&create_stream))
 
596
    {
 
597
      error= errno;
 
598
      goto error2;
 
599
    }
 
600
  }
 
601
  else
 
602
    my_errno= 0;
591
603
 
592
604
  return(0);
 
605
 
 
606
error2:
 
607
  delete_table(name);
 
608
error:
 
609
  /* Return error number, if we got one */
 
610
  return(error ? error : -1);
593
611
}
594
612
 
595
613
/*
597
615
*/
598
616
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
599
617
{
600
 
  off_t written;
 
618
  my_off_t written;
601
619
  unsigned int r_pack_length;
602
620
 
603
621
  /* We pack the row for writing */
604
622
  r_pack_length= pack_row(buf);
605
623
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
624
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
625
  if (written != r_pack_length)
608
626
  {
609
627
    return(-1);
623
641
 
624
642
uint32_t ha_archive::max_row_length(const unsigned char *)
625
643
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
644
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
645
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
646
 
629
647
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
648
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
649
       ptr != end ;
632
650
       ptr++)
633
651
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
652
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
653
  }
636
654
 
637
655
  return length;
643
661
  unsigned char *ptr;
644
662
 
645
663
  if (fix_rec_buff(max_row_length(record)))
646
 
    return(HA_ERR_OUT_OF_MEM);
 
664
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
647
665
 
648
666
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
667
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
668
  ptr= record_buffer->buffer + table->s->null_bytes;
651
669
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
670
  for (Field **field=table->field ; *field ; field++)
653
671
  {
654
672
    if (!((*field)->is_null()))
655
673
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
674
  }
657
675
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
676
  return((unsigned int) (ptr - record_buffer->buffer));
659
677
}
660
678
 
661
679
 
668
686
  for implementing start_bulk_insert() is that we could skip
669
687
  setting dirty to true each time.
670
688
*/
671
 
int ha_archive::doInsertRecord(unsigned char *buf)
 
689
int ha_archive::write_row(unsigned char *buf)
672
690
{
673
691
  int rc;
674
692
  unsigned char *read_buf= NULL;
675
693
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
694
  unsigned char *record=  table->record[0];
677
695
 
678
696
  if (share->crashed)
679
697
    return(HA_ERR_CRASHED_ON_USAGE);
680
698
 
681
 
  pthread_mutex_lock(&share->mutex());
 
699
  ha_statistic_increment(&SSV::ha_write_count);
 
700
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
701
    table->timestamp_field->set_time();
 
702
  pthread_mutex_lock(&share->mutex);
682
703
 
683
704
  if (share->archive_write_open == false)
684
705
    if (init_archive_writer())
685
706
      return(HA_ERR_CRASHED_ON_USAGE);
686
707
 
687
708
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
709
  if (table->next_number_field && record == table->record[0])
689
710
  {
 
711
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
712
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
713
    temp_auto= table->next_number_field->val_int();
692
714
 
693
715
    /*
694
716
      We don't support decremening auto_increment. They make the performance
695
717
      just cry.
696
718
    */
697
719
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
720
        mkey->flags & HA_NOSAME)
699
721
    {
700
722
      rc= HA_ERR_FOUND_DUPP_KEY;
701
723
      goto error;
702
724
    }
 
725
#ifdef DEAD_CODE
 
726
    /*
 
727
      Bad news, this will cause a search for the unique value which is very
 
728
      expensive since we will have to do a table scan which will lock up
 
729
      all other writers during this period. This could perhaps be optimized
 
730
      in the future.
 
731
    */
 
732
    {
 
733
      /*
 
734
        First we create a buffer that we can use for reading rows, and can pass
 
735
        to get_row().
 
736
      */
 
737
      if (!(read_buf= (unsigned char*) malloc(table->s->reclength)))
 
738
      {
 
739
        rc= HA_ERR_OUT_OF_MEM;
 
740
        goto error;
 
741
      }
 
742
       /*
 
743
         All of the buffer must be written out or we won't see all of the
 
744
         data
 
745
       */
 
746
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
747
      /*
 
748
        Set the position of the local read thread to the beginning postion.
 
749
      */
 
750
      if (read_data_header(&archive))
 
751
      {
 
752
        rc= HA_ERR_CRASHED_ON_USAGE;
 
753
        goto error;
 
754
      }
 
755
 
 
756
      Field *mfield= table->next_number_field;
 
757
 
 
758
      while (!(get_row(&archive, read_buf)))
 
759
      {
 
760
        if (!memcmp(read_buf + mfield->offset(record),
 
761
                    table->next_number_field->ptr,
 
762
                    mfield->max_display_length()))
 
763
        {
 
764
          rc= HA_ERR_FOUND_DUPP_KEY;
 
765
          goto error;
 
766
        }
 
767
      }
 
768
    }
 
769
#endif
703
770
    else
704
771
    {
705
772
      if (temp_auto > share->archive_write.auto_increment)
715
782
  share->rows_recorded++;
716
783
  rc= real_write_row(buf,  &(share->archive_write));
717
784
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
785
  pthread_mutex_unlock(&share->mutex);
719
786
  if (read_buf)
720
787
    free((unsigned char*) read_buf);
721
788
 
730
797
  *first_value= share->archive_write.auto_increment + 1;
731
798
}
732
799
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
800
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
801
int ha_archive::index_init(uint32_t keynr, bool)
735
802
{
736
803
  active_index= keynr;
737
804
  return(0);
743
810
  the optimizer that we have unique indexes, we scan
744
811
*/
745
812
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
746
 
                             uint32_t key_len, enum ha_rkey_function)
 
813
                             uint32_t key_len, enum ha_rkey_function find_flag)
 
814
{
 
815
  int rc;
 
816
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
817
  return(rc);
 
818
}
 
819
 
 
820
 
 
821
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
 
822
                               uint32_t key_len, enum ha_rkey_function)
747
823
{
748
824
  int rc;
749
825
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
826
  KEY *mkey= &table->s->key_info[index];
 
827
  current_k_offset= mkey->key_part->offset;
751
828
  current_key= key;
752
829
  current_key_len= key_len;
753
830
 
754
 
  rc= doStartTableScan(true);
 
831
  rc= rnd_init(true);
755
832
 
756
833
  if (rc)
757
834
    goto error;
795
872
  we assume the position will be set.
796
873
*/
797
874
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
875
int ha_archive::rnd_init(bool scan)
799
876
{
800
877
  if (share->crashed)
801
878
      return(HA_ERR_CRASHED_ON_USAGE);
832
909
/* Reallocate buffer if needed */
833
910
bool ha_archive::fix_rec_buff(unsigned int length)
834
911
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
912
  assert(record_buffer->buffer);
 
913
 
 
914
  if (length > record_buffer->length)
 
915
  {
 
916
    unsigned char *newptr;
 
917
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
918
      return(1);
 
919
    record_buffer->buffer= newptr;
 
920
    record_buffer->length= length;
 
921
  }
 
922
 
 
923
  assert(length <= record_buffer->length);
 
924
 
 
925
  return(0);
838
926
}
839
927
 
840
928
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
940
  }
853
941
 
854
942
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
943
  memcpy(record, ptr, table->getNullBytes());
 
944
  ptr+= table->getNullBytes();
 
945
  for (Field **field=table->field ; *field ; field++)
858
946
  {
859
947
    if (!((*field)->is_null()))
860
948
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
949
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
950
    }
863
951
  }
864
952
  return(0);
889
977
    return(HA_ERR_END_OF_FILE);
890
978
  scan_rows--;
891
979
 
892
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
980
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
893
981
  current_position= aztell(&archive);
894
982
  rc= get_row(&archive, buf);
895
983
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
984
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
985
 
898
986
  return(rc);
899
987
}
900
988
 
901
989
 
902
990
/*
903
 
  Thanks to the table bool is_ordered this will be called after
 
991
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
904
992
  each call to ha_archive::rnd_next() if an ordering of the rows is
905
993
  needed.
906
994
*/
907
995
 
908
996
void ha_archive::position(const unsigned char *)
909
997
{
910
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
998
  my_store_ptr(ref, ref_length, current_position);
911
999
  return;
912
1000
}
913
1001
 
921
1009
 
922
1010
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
923
1011
{
924
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
925
 
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
1012
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1013
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
926
1014
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
927
1015
    return(HA_ERR_CRASHED_ON_USAGE);
928
1016
  return(get_row(&archive, buf));
933
1021
  rewriting the meta file. Currently it does this by calling optimize with
934
1022
  the extended flag.
935
1023
*/
936
 
int ha_archive::repair()
 
1024
int ha_archive::repair(Session* session, HA_CHECK_OPT* check_opt)
937
1025
{
938
 
  int rc= optimize();
 
1026
  check_opt->flags= T_EXTEND;
 
1027
  int rc= optimize(session, check_opt);
939
1028
 
940
1029
  if (rc)
941
1030
    return(HA_ERR_CRASHED_ON_REPAIR);
948
1037
  The table can become fragmented if data was inserted, read, and then
949
1038
  inserted again. What we do is open up the file and recompress it completely.
950
1039
*/
951
 
int ha_archive::optimize()
 
1040
int ha_archive::optimize(Session *, HA_CHECK_OPT *)
952
1041
{
953
1042
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1043
  azio_stream writer;
 
1044
  char writer_filename[FN_REFLEN];
955
1045
 
956
1046
  init_archive_reader();
957
1047
 
962
1052
    share->archive_write_open= false;
963
1053
  }
964
1054
 
965
 
  char* proto_string;
966
 
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
967
 
  if (proto_string == NULL)
968
 
  {
969
 
    return ENOMEM;
970
 
  }
971
 
  azread_frm(&archive, proto_string);
972
 
 
973
1055
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
 
1056
  fn_format(writer_filename, share->table_name, "", ARN,
 
1057
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
976
1058
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
 
  {
979
 
    free(proto_string);
 
1059
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
980
1060
    return(HA_ERR_CRASHED_ON_USAGE);
981
 
  }
982
 
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
984
1061
 
985
1062
  /*
986
1063
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1004
1081
    */
1005
1082
    if (!rc)
1006
1083
    {
 
1084
      uint64_t x;
1007
1085
      uint64_t rows_restored;
1008
1086
      share->rows_recorded= 0;
1009
1087
      stats.auto_increment_value= 1;
1011
1089
 
1012
1090
      rows_restored= archive.rows;
1013
1091
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1092
      for (x= 0; x < rows_restored ; x++)
1015
1093
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1094
        rc= get_row(&archive, table->record[0]);
1017
1095
 
1018
1096
        if (rc != 0)
1019
1097
          break;
1020
1098
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1099
        real_write_row(table->record[0], &writer);
1022
1100
        /*
1023
1101
          Long term it should be possible to optimize this so that
1024
1102
          it is not called on each row.
1025
1103
        */
1026
 
        if (getTable()->found_next_number_field)
 
1104
        if (table->found_next_number_field)
1027
1105
        {
1028
 
          Field *field= getTable()->found_next_number_field;
1029
 
 
1030
 
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
 
          field->setReadSet();
1032
 
 
 
1106
          Field *field= table->found_next_number_field;
1033
1107
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1108
            (uint64_t) field->val_int(table->record[0] +
 
1109
                                       field->offset(table->record[0]));
1036
1110
          if (share->archive_write.auto_increment < auto_value)
1037
1111
            stats.auto_increment_value=
1038
1112
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1113
        }
1040
1114
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1115
      share->rows_recorded= (ha_rows)writer.rows;
1042
1116
    }
1043
1117
 
1044
1118
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1121
    }
1048
1122
  }
1049
1123
 
1050
 
  azclose(writer.get());
 
1124
  azclose(&writer);
1051
1125
  share->dirty= false;
1052
1126
 
1053
1127
  azclose(&archive);
1054
1128
 
1055
1129
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1057
 
 
1058
 
  free(proto_string);
 
1130
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
 
1131
 
 
1132
 
1059
1133
  return(rc);
1060
1134
error:
1061
 
  free(proto_string);
1062
 
  azclose(writer.get());
 
1135
  azclose(&writer);
1063
1136
 
1064
1137
  return(rc);
1065
1138
}
1071
1144
                                       THR_LOCK_DATA **to,
1072
1145
                                       enum thr_lock_type lock_type)
1073
1146
{
1074
 
  delayed_insert= false;
 
1147
  if (lock_type == TL_WRITE_DELAYED)
 
1148
    delayed_insert= true;
 
1149
  else
 
1150
    delayed_insert= false;
1075
1151
 
1076
1152
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1077
1153
  {
1083
1159
    */
1084
1160
 
1085
1161
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
 
         lock_type <= TL_WRITE)
 
1162
         lock_type <= TL_WRITE) && !session_in_lock_tables(session)
1087
1163
        && !session_tablespace_op(session))
1088
1164
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1165
 
1095
1171
      concurrent inserts to t2.
1096
1172
    */
1097
1173
 
1098
 
    if (lock_type == TL_READ_NO_INSERT)
 
1174
    if (lock_type == TL_READ_NO_INSERT && !session_in_lock_tables(session))
1099
1175
      lock_type = TL_READ;
1100
1176
 
1101
1177
    lock.type=lock_type;
1106
1182
  return to;
1107
1183
}
1108
1184
 
 
1185
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 
1186
{
 
1187
  ha_archive::info(HA_STATUS_AUTO);
 
1188
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
 
1189
  {
 
1190
    create_info->auto_increment_value= stats.auto_increment_value;
 
1191
  }
 
1192
 
 
1193
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
 
1194
    create_info->data_file_name= share->real_path;
 
1195
 
 
1196
  return;
 
1197
}
 
1198
 
 
1199
 
1109
1200
/*
1110
1201
  Hints for optimizer, see ha_tina for more information
1111
1202
*/
1115
1206
    If dirty, we lock, and then reset/flush the data.
1116
1207
    I found that just calling azflush() doesn't always work.
1117
1208
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1209
  pthread_mutex_lock(&share->mutex);
1119
1210
  if (share->dirty == true)
1120
1211
  {
1121
1212
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1225
    cause the number to be inaccurate.
1135
1226
  */
1136
1227
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1228
  pthread_mutex_unlock(&share->mutex);
1138
1229
 
1139
1230
  scan_rows= stats.records;
1140
1231
  stats.deleted= 0;
1144
1235
  {
1145
1236
    struct stat file_stat;  // Stat information for the data file
1146
1237
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1238
    stat(share->data_file_name, &file_stat);
1148
1239
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1240
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1241
    stats.data_file_length= file_stat.st_size;
1151
1242
    stats.create_time= file_stat.st_ctime;
1152
1243
    stats.update_time= file_stat.st_mtime;
1158
1249
  if (flag & HA_STATUS_AUTO)
1159
1250
  {
1160
1251
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1252
    pthread_mutex_lock(&share->mutex);
1162
1253
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1254
    pthread_mutex_unlock(&share->mutex);
1164
1255
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1256
  }
1166
1257
 
1170
1261
 
1171
1262
/*
1172
1263
  This method tells us that a bulk insert operation is about to occur. We set
1173
 
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1264
  a flag which will keep write_row from saying that its data is dirty. This in
1174
1265
  turn will keep selects from causing a sync to occur.
1175
1266
  Basically, yet another optimizations to keep compression working well.
1176
1267
*/
1204
1295
}
1205
1296
 
1206
1297
/*
 
1298
  We just return state if asked.
 
1299
*/
 
1300
bool ha_archive::is_crashed() const
 
1301
{
 
1302
  return(share->crashed);
 
1303
}
 
1304
 
 
1305
/*
1207
1306
  Simple scan of the tables to make sure everything is ok.
1208
1307
*/
1209
1308
 
1210
 
int ha_archive::check(Session* session)
 
1309
int ha_archive::check(Session* session, HA_CHECK_OPT *)
1211
1310
{
1212
1311
  int rc= 0;
1213
1312
  const char *old_proc_info;
 
1313
  uint64_t x;
1214
1314
 
1215
1315
  old_proc_info= get_session_proc_info(session);
1216
1316
  set_session_proc_info(session, "Checking table");
1217
1317
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1318
  pthread_mutex_lock(&share->mutex);
1219
1319
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1320
  pthread_mutex_unlock(&share->mutex);
1221
1321
 
1222
1322
  /*
1223
1323
    Now we will rewind the archive file so that we are positioned at the
1226
1326
  init_archive_reader();
1227
1327
  azflush(&archive, Z_SYNC_FLUSH);
1228
1328
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1329
  for (x= 0; x < share->archive_write.rows; x++)
1230
1330
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1331
    rc= get_row(&archive, table->record[0]);
1232
1332
 
1233
1333
    if (rc != 0)
1234
1334
      break;
1247
1347
  }
1248
1348
}
1249
1349
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1251
 
{
1252
 
  int error= 0;
1253
 
 
1254
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1255
 
  {
1256
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1257
 
    {
1258
 
      if ((error=errno) != ENOENT)
1259
 
        break;
1260
 
      error= 0;
1261
 
    }
1262
 
  }
1263
 
 
1264
 
  return error;
1265
 
}
1266
 
 
1267
 
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const TableIdentifier &identifier)
1269
 
{
1270
 
  string proto_path(identifier.getPath());
1271
 
  proto_path.append(ARZ);
1272
 
 
1273
 
  if (access(proto_path.c_str(), F_OK))
1274
 
  {
1275
 
    return false;
1276
 
  }
1277
 
 
1278
 
  return true;
1279
 
}
1280
 
 
1281
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::SchemaIdentifier &schema_identifier,
1283
 
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
1284
 
{
1285
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
 
 
1287
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1288
 
       entry_iter != entries.end(); ++entry_iter)
1289
 
  {
1290
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
 
    const string *filename= &entry->filename;
1292
 
 
1293
 
    assert(filename->size());
1294
 
 
1295
 
    const char *ext= strchr(filename->c_str(), '.');
1296
 
 
1297
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1299
 
    {  }
1300
 
    else
1301
 
    {
1302
 
      char uname[NAME_LEN + 1];
1303
 
      uint32_t file_name_len;
1304
 
 
1305
 
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
 
      // TODO: Remove need for memory copy here
1307
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
 
 
1309
 
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1310
 
    }
1311
 
  }
1312
 
}
 
1350
/*
 
1351
  Check and repair the table if needed.
 
1352
*/
 
1353
bool ha_archive::check_and_repair(Session *session)
 
1354
{
 
1355
  HA_CHECK_OPT check_opt;
 
1356
 
 
1357
  check_opt.init();
 
1358
 
 
1359
  return(repair(session, &check_opt));
 
1360
}
 
1361
 
 
1362
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1363
{
 
1364
  archive_record_buffer *r;
 
1365
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1366
  {
 
1367
    return(NULL); /* purecov: inspected */
 
1368
  }
 
1369
  r->length= (int)length;
 
1370
 
 
1371
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1372
  {
 
1373
    free((char*) r);
 
1374
    return(NULL); /* purecov: inspected */
 
1375
  }
 
1376
 
 
1377
  return(r);
 
1378
}
 
1379
 
 
1380
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1381
{
 
1382
  free((char*) r->buffer);
 
1383
  free((char*) r);
 
1384
  return;
 
1385
}
 
1386
 
 
1387
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
 
1388
  PLUGIN_VAR_NOCMDOPT,
 
1389
  "Whether or not to use asynchronous IO.",
 
1390
  NULL, NULL, true);
 
1391
 
 
1392
static struct st_mysql_sys_var* archive_system_variables[]= {
 
1393
  DRIZZLE_SYSVAR(aio),
 
1394
  NULL
 
1395
};
 
1396
 
 
1397
mysql_declare_plugin(archive)
 
1398
{
 
1399
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1400
  "ARCHIVE",
 
1401
  "3.5",
 
1402
  "Brian Aker, MySQL AB",
 
1403
  "Archive storage engine",
 
1404
  PLUGIN_LICENSE_GPL,
 
1405
  archive_db_init, /* Plugin Init */
 
1406
  archive_db_done, /* Plugin Deinit */
 
1407
  NULL,                       /* status variables                */
 
1408
  archive_system_variables,   /* system variables                */
 
1409
  NULL                        /* config options                  */
 
1410
}
 
1411
mysql_declare_plugin_end;
 
1412