~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/ha_archive.cc

  • Committer: Brian Aker
  • Date: 2009-01-24 09:43:35 UTC
  • Revision ID: brian@gir-3.local-20090124094335-6qdtvc35gl5fvivz
Adding in an example singe thread scheduler

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
2
 
   Copyright (C) 2010 Brian Aker
3
2
 
4
3
  This program is free software; you can redistribute it and/or modify
5
4
  it under the terms of the GNU General Public License as published by
15
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
15
 
17
16
 
18
 
#include "config.h"
19
 
 
20
 
#include "plugin/archive/archive_engine.h"
21
 
 
22
 
using namespace std;
23
 
using namespace drizzled;
24
 
 
 
17
#include <drizzled/server_includes.h>
 
18
#include <drizzled/field.h>
 
19
#include <drizzled/field/blob.h>
 
20
#include <drizzled/field/timestamp.h>
 
21
#include <storage/myisam/myisam.h>
 
22
#include <drizzled/table.h>
 
23
#include <drizzled/session.h>
 
24
 
 
25
#include <storage/archive/ha_archive.h>
25
26
 
26
27
/*
27
28
  First, if you want to understand storage engines you should look at
92
93
*/
93
94
 
94
95
/* Variables for archive share methods */
95
 
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
96
 
 
97
 
/* When the engine starts up set the first version */
98
 
static uint64_t global_version= 1;
99
 
 
100
 
// We use this to find out the state of the archive aio option.
101
 
extern bool archive_aio_state(void);
 
96
pthread_mutex_t archive_mutex;
 
97
static HASH archive_open_tables;
 
98
static unsigned int global_version;
 
99
 
 
100
/* The file extension */
 
101
#define ARZ ".ARZ"               // The data file
 
102
#define ARN ".ARN"               // Files used during an optimize call
 
103
 
 
104
 
 
105
/* Static declarations for handerton */
 
106
static handler *archive_create_handler(handlerton *hton,
 
107
                                       TABLE_SHARE *table,
 
108
                                       MEM_ROOT *mem_root);
 
109
 
 
110
static bool archive_use_aio= false;
102
111
 
103
112
/*
104
113
  Number of rows that will force a bulk insert.
110
119
*/
111
120
#define ARCHIVE_ROW_HEADER_SIZE 4
112
121
 
113
 
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
114
 
{
115
 
  ArchiveMap::iterator find_iter=
116
 
    archive_open_tables.find(table_name);
117
 
 
118
 
  if (find_iter != archive_open_tables.end())
119
 
    return (*find_iter).second;
120
 
  else
121
 
    return NULL;
122
 
}
123
 
 
124
 
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
125
 
{
126
 
  archive_open_tables[table_name]= share;
127
 
}
128
 
 
129
 
void ArchiveEngine::deleteOpenTable(const string &table_name)
130
 
{
131
 
  archive_open_tables.erase(table_name);
132
 
}
133
 
 
134
 
 
135
 
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory, 
136
 
                                    SchemaIdentifier&,
137
 
                                    set<string>& set_of_names)
138
 
{
139
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
140
 
 
141
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
142
 
       entry_iter != entries.end(); ++entry_iter)
143
 
  {
144
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
145
 
    const string *filename= &entry->filename;
146
 
 
147
 
    assert(filename->size());
148
 
 
149
 
    const char *ext= strchr(filename->c_str(), '.');
150
 
 
151
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
152
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
153
 
    {  }
154
 
    else
155
 
    {
156
 
      char uname[NAME_LEN + 1];
157
 
      uint32_t file_name_len;
158
 
 
159
 
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
160
 
      // TODO: Remove need for memory copy here
161
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
162
 
      set_of_names.insert(uname);
163
 
    }
164
 
  }
165
 
}
166
 
 
167
 
 
168
 
int ArchiveEngine::doDropTable(Session&, TableIdentifier &identifier)
169
 
{
170
 
  string new_path(identifier.getPath());
171
 
 
172
 
  new_path+= ARZ;
173
 
 
174
 
  int error= unlink(new_path.c_str());
175
 
 
176
 
  if (error != 0)
177
 
  {
178
 
    error= errno= errno;
179
 
  }
180
 
 
181
 
  return error;
182
 
}
183
 
 
184
 
int ArchiveEngine::doGetTableDefinition(Session&,
185
 
                                        TableIdentifier &identifier,
186
 
                                        drizzled::message::Table &table_proto)
187
 
{
188
 
  struct stat stat_info;
189
 
  int error= ENOENT;
190
 
  string proto_path;
191
 
 
192
 
  proto_path.reserve(FN_REFLEN);
193
 
  proto_path.assign(identifier.getPath());
194
 
 
195
 
  proto_path.append(ARZ);
196
 
 
197
 
  if (stat(proto_path.c_str(),&stat_info))
198
 
    return errno;
199
 
  else
200
 
    error= EEXIST;
201
 
 
202
 
  {
203
 
    azio_stream proto_stream;
204
 
    char* proto_string;
205
 
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
206
 
      return HA_ERR_CRASHED_ON_USAGE;
207
 
 
208
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
209
 
    if (proto_string == NULL)
210
 
    {
211
 
      azclose(&proto_stream);
212
 
      return ENOMEM;
213
 
    }
214
 
 
215
 
    azread_frm(&proto_stream, proto_string);
216
 
 
217
 
    if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
218
 
      error= HA_ERR_CRASHED_ON_USAGE;
219
 
 
220
 
    azclose(&proto_stream);
221
 
    free(proto_string);
222
 
  }
223
 
 
224
 
  return error;
225
 
}
226
 
 
227
 
 
228
 
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
229
 
                       TableShare &table_arg)
230
 
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
122
static handler *archive_create_handler(handlerton *hton,
 
123
                                       TABLE_SHARE *table,
 
124
                                       MEM_ROOT *mem_root)
 
125
{
 
126
  return new (mem_root) ha_archive(hton, table);
 
127
}
 
128
 
 
129
/*
 
130
  Used for hash table that tracks open tables.
 
131
*/
 
132
static unsigned char* archive_get_key(ARCHIVE_SHARE *share, size_t *length, bool)
 
133
{
 
134
  *length=share->table_name_length;
 
135
  return (unsigned char*) share->table_name;
 
136
}
 
137
 
 
138
 
 
139
/*
 
140
  Initialize the archive handler.
 
141
 
 
142
  SYNOPSIS
 
143
    archive_db_init()
 
144
    void *
 
145
 
 
146
  RETURN
 
147
    false       OK
 
148
    true        Error
 
149
*/
 
150
 
 
151
int archive_db_init(void *p)
 
152
{
 
153
  handlerton *archive_hton;
 
154
 
 
155
  archive_hton= (handlerton *)p;
 
156
  archive_hton->state= SHOW_OPTION_YES;
 
157
  archive_hton->create= archive_create_handler;
 
158
  archive_hton->flags= HTON_NO_FLAGS;
 
159
 
 
160
  /* When the engine starts up set the first version */
 
161
  global_version= 1;
 
162
 
 
163
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
 
164
    goto error;
 
165
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
 
166
                (hash_get_key) archive_get_key, 0, 0))
 
167
  {
 
168
    pthread_mutex_destroy(&archive_mutex);
 
169
  }
 
170
  else
 
171
  {
 
172
    return(false);
 
173
  }
 
174
error:
 
175
  return(true);
 
176
}
 
177
 
 
178
/*
 
179
  Release the archive handler.
 
180
 
 
181
  SYNOPSIS
 
182
    archive_db_done()
 
183
    void
 
184
 
 
185
  RETURN
 
186
    false       OK
 
187
*/
 
188
 
 
189
int archive_db_done(void *)
 
190
{
 
191
  hash_free(&archive_open_tables);
 
192
  pthread_mutex_destroy(&archive_mutex);
 
193
 
 
194
  return 0;
 
195
}
 
196
 
 
197
 
 
198
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
 
199
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
231
200
{
232
201
  /* Set our original buffer from pre-allocated memory */
233
202
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
234
203
 
235
204
  /* The size of the offset value we will use for position() */
236
 
  ref_length= sizeof(internal::my_off_t);
 
205
  ref_length= sizeof(my_off_t);
237
206
  archive_reader_open= false;
238
207
}
239
208
 
251
220
  return(1);
252
221
}
253
222
 
254
 
ArchiveShare::ArchiveShare():
255
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
256
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
257
 
{
258
 
  assert(1);
259
 
}
260
 
 
261
 
ArchiveShare::ArchiveShare(const char *name):
262
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
263
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
264
 
{
265
 
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
266
 
  table_name.append(name);
267
 
  internal::fn_format(data_file_name, table_name.c_str(), "",
268
 
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
269
 
  /*
270
 
    We will use this lock for rows.
271
 
  */
272
 
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
273
 
}
274
 
 
275
 
ArchiveShare::~ArchiveShare()
276
 
{
277
 
  thr_lock_delete(&lock);
278
 
  pthread_mutex_destroy(&mutex);
279
 
  /*
280
 
    We need to make sure we don't reset the crashed state.
281
 
    If we open a crashed file, wee need to close it as crashed unless
282
 
    it has been repaired.
283
 
    Since we will close the data down after this, we go on and count
284
 
    the flush on close;
285
 
  */
286
 
  if (archive_write_open == true)
287
 
    (void)azclose(&archive_write);
288
 
  pthread_mutex_destroy(&archive_mutex);
289
 
}
290
 
 
291
 
bool ArchiveShare::prime(uint64_t *auto_increment)
292
 
{
293
 
  azio_stream archive_tmp;
294
 
 
295
 
  /*
296
 
    We read the meta file, but do not mark it dirty. Since we are not
297
 
    doing a write we won't mark it dirty (and we won't open it for
298
 
    anything but reading... open it for write and we will generate null
299
 
    compression writes).
300
 
  */
301
 
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
302
 
               AZ_METHOD_BLOCK)))
303
 
    return false;
304
 
 
305
 
  *auto_increment= archive_tmp.auto_increment + 1;
306
 
  rows_recorded= (ha_rows)archive_tmp.rows;
307
 
  crashed= archive_tmp.dirty;
308
 
  if (version < global_version)
309
 
  {
310
 
    version_rows= rows_recorded;
311
 
    version= global_version;
312
 
  }
313
 
  azclose(&archive_tmp);
314
 
 
315
 
  return true;
316
 
}
317
 
 
318
223
 
319
224
/*
320
225
  We create the shared memory space that we will use for the open table.
323
228
 
324
229
  See ha_example.cc for a longer description.
325
230
*/
326
 
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
 
231
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
327
232
{
 
233
  uint32_t length;
 
234
 
328
235
  pthread_mutex_lock(&archive_mutex);
329
 
 
330
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
331
 
  share= a_engine->findOpenTable(table_name);
332
 
 
333
 
  if (!share)
 
236
  length=(uint) strlen(table_name);
 
237
 
 
238
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
 
239
                                           (unsigned char*) table_name,
 
240
                                           length)))
334
241
  {
335
 
    share= new ArchiveShare(table_name);
 
242
    char *tmp_name;
 
243
    azio_stream archive_tmp;
336
244
 
337
 
    if (share == NULL)
 
245
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
246
                          &share, sizeof(*share),
 
247
                          &tmp_name, length+1,
 
248
                          NULL))
338
249
    {
339
250
      pthread_mutex_unlock(&archive_mutex);
340
251
      *rc= HA_ERR_OUT_OF_MEM;
341
252
      return(NULL);
342
253
    }
343
254
 
344
 
    if (share->prime(&stats.auto_increment_value) == false)
 
255
    share->use_count= 0;
 
256
    share->table_name_length= length;
 
257
    share->table_name= tmp_name;
 
258
    share->crashed= false;
 
259
    share->archive_write_open= false;
 
260
    fn_format(share->data_file_name, table_name, "",
 
261
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
262
    strcpy(share->table_name, table_name);
 
263
    /*
 
264
      We will use this lock for rows.
 
265
    */
 
266
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
 
267
 
 
268
    /*
 
269
      We read the meta file, but do not mark it dirty. Since we are not
 
270
      doing a write we won't mark it dirty (and we won't open it for
 
271
      anything but reading... open it for write and we will generate null
 
272
      compression writes).
 
273
    */
 
274
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY,
 
275
                 AZ_METHOD_BLOCK)))
345
276
    {
 
277
      pthread_mutex_destroy(&share->mutex);
 
278
      free(share);
346
279
      pthread_mutex_unlock(&archive_mutex);
347
280
      *rc= HA_ERR_CRASHED_ON_REPAIR;
348
 
      delete share;
349
 
 
350
 
      return NULL;
351
 
    }
352
 
 
353
 
    a_engine->addOpenTable(share->table_name, share);
 
281
      return(NULL);
 
282
    }
 
283
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
 
284
    share->rows_recorded= (ha_rows)archive_tmp.rows;
 
285
    share->crashed= archive_tmp.dirty;
 
286
    if (share->version < global_version)
 
287
    {
 
288
      share->version_rows= share->rows_recorded;
 
289
      share->version= global_version;
 
290
    }
 
291
    azclose(&archive_tmp);
 
292
 
 
293
    my_hash_insert(&archive_open_tables, (unsigned char*) share);
354
294
    thr_lock_init(&share->lock);
355
295
  }
356
296
  share->use_count++;
357
 
 
358
297
  if (share->crashed)
359
298
    *rc= HA_ERR_CRASHED_ON_USAGE;
360
299
  pthread_mutex_unlock(&archive_mutex);
369
308
*/
370
309
int ha_archive::free_share()
371
310
{
 
311
  int rc= 0;
 
312
 
372
313
  pthread_mutex_lock(&archive_mutex);
373
314
  if (!--share->use_count)
374
315
  {
375
 
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
376
 
    a_engine->deleteOpenTable(share->table_name);
377
 
    delete share;
 
316
    hash_delete(&archive_open_tables, (unsigned char*) share);
 
317
    thr_lock_delete(&share->lock);
 
318
    pthread_mutex_destroy(&share->mutex);
 
319
    /*
 
320
      We need to make sure we don't reset the crashed state.
 
321
      If we open a crashed file, wee need to close it as crashed unless
 
322
      it has been repaired.
 
323
      Since we will close the data down after this, we go on and count
 
324
      the flush on close;
 
325
    */
 
326
    if (share->archive_write_open == true)
 
327
    {
 
328
      if (azclose(&(share->archive_write)))
 
329
        rc= 1;
 
330
    }
 
331
    free((unsigned char*) share);
378
332
  }
379
333
  pthread_mutex_unlock(&archive_mutex);
380
334
 
381
 
  return 0;
 
335
  return(rc);
382
336
}
383
337
 
384
338
int ha_archive::init_archive_writer()
401
355
 
402
356
 
403
357
/*
404
 
  No locks are required because it is associated with just one Cursor instance
 
358
  No locks are required because it is associated with just one handler instance
405
359
*/
406
360
int ha_archive::init_archive_reader()
407
361
{
414
368
  {
415
369
    az_method method;
416
370
 
417
 
    switch (archive_aio_state())
 
371
    switch (archive_use_aio)
418
372
    {
419
373
    case false:
420
374
      method= AZ_METHOD_BLOCK;
437
391
  return(0);
438
392
}
439
393
 
 
394
 
 
395
/*
 
396
  We just implement one additional file extension.
 
397
*/
 
398
static const char *ha_archive_exts[] = {
 
399
  ARZ,
 
400
  NULL
 
401
};
 
402
 
 
403
const char **ha_archive::bas_ext() const
 
404
{
 
405
  return ha_archive_exts;
 
406
}
 
407
 
 
408
 
440
409
/*
441
410
  When opening a file we:
442
411
  Create/get our shared structure.
443
412
  Init out lock.
444
413
  We open the file we will read from.
445
414
*/
446
 
int ha_archive::open(const char *name, int, uint32_t)
 
415
int ha_archive::open(const char *name, int, uint32_t open_options)
447
416
{
448
417
  int rc= 0;
449
418
  share= get_share(name, &rc);
450
419
 
451
 
  /** 
452
 
    We either fix it ourselves, or we just take it offline 
453
 
 
454
 
    @todo Create some documentation in the recovery tools shipped with the engine.
455
 
  */
456
 
  if (rc == HA_ERR_CRASHED_ON_USAGE)
 
420
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
457
421
  {
 
422
    /* purecov: begin inspected */
458
423
    free_share();
459
 
    rc= repair();
460
 
 
461
 
    return 0;
 
424
    return(rc);
 
425
    /* purecov: end */
462
426
  }
463
427
  else if (rc == HA_ERR_OUT_OF_MEM)
464
428
  {
478
442
 
479
443
  thr_lock_data_init(&share->lock, &lock, NULL);
480
444
 
481
 
  return(rc);
 
445
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
 
446
  {
 
447
    return(0);
 
448
  }
 
449
  else
 
450
    return(rc);
482
451
}
483
452
 
484
453
 
527
496
  of creation.
528
497
*/
529
498
 
530
 
int ArchiveEngine::doCreateTable(Session &,
531
 
                                 Table& table_arg,
532
 
                                 drizzled::TableIdentifier &identifier,
533
 
                                 drizzled::message::Table& proto)
 
499
int ha_archive::create(const char *name, Table *table_arg,
 
500
                       HA_CREATE_INFO *create_info)
534
501
{
535
502
  char name_buff[FN_REFLEN];
536
 
  int error= 0;
 
503
  char linkname[FN_REFLEN];
 
504
  int error;
537
505
  azio_stream create_stream;            /* Archive file we are working with */
538
 
  uint64_t auto_increment_value;
539
 
  string serialized_proto;
540
 
 
541
 
  auto_increment_value= proto.options().auto_increment_value();
542
 
 
543
 
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
 
506
  File frm_file;                   /* File handler for readers */
 
507
  struct stat file_stat;
 
508
  unsigned char *frm_ptr;
 
509
  int r;
 
510
 
 
511
  stats.auto_increment_value= create_info->auto_increment_value;
 
512
 
 
513
  for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
544
514
  {
545
 
    KEY *pos= table_arg.key_info+key;
 
515
    KEY *pos= table_arg->key_info+key;
546
516
    KEY_PART_INFO *key_part=     pos->key_part;
547
517
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
548
518
 
561
531
  /*
562
532
    We reuse name_buff since it is available.
563
533
  */
564
 
  internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
565
 
                      MY_REPLACE_EXT | MY_UNPACK_FILENAME);
566
 
 
567
 
  errno= 0;
568
 
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
569
 
             AZ_METHOD_BLOCK) == 0)
 
534
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
 
535
  {
 
536
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
 
537
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
538
    fn_format(linkname, name, "", ARZ,
 
539
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
540
  }
 
541
  else
 
542
  {
 
543
    fn_format(name_buff, name, "", ARZ,
 
544
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
545
    linkname[0]= 0;
 
546
  }
 
547
 
 
548
  /*
 
549
    There is a chance that the file was "discovered". In this case
 
550
    just use whatever file is there.
 
551
  */
 
552
  r= stat(name_buff, &file_stat);
 
553
  if (r == -1 && errno!=ENOENT)
 
554
  {
 
555
    return errno;
 
556
  }
 
557
  if (!r)
 
558
    return HA_ERR_TABLE_EXIST;
 
559
 
 
560
  my_errno= 0;
 
561
  if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
 
562
               AZ_METHOD_BLOCK)))
570
563
  {
571
564
    error= errno;
572
565
    goto error2;
573
566
  }
574
567
 
575
 
  proto.SerializeToString(&serialized_proto);
576
 
 
577
 
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
578
 
                  serialized_proto.length()))
579
 
    goto error2;
580
 
 
581
 
  if (proto.options().has_comment())
 
568
  if (linkname[0])
 
569
    my_symlink(name_buff, linkname, MYF(0));
 
570
  fn_format(name_buff, name, "", ".frm",
 
571
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
572
 
 
573
  /*
 
574
    Here is where we open up the frm and pass it to archive to store
 
575
  */
 
576
  if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
582
577
  {
583
 
    int write_length;
584
 
 
585
 
    write_length= azwrite_comment(&create_stream,
586
 
                                  proto.options().comment().c_str(),
587
 
                                  proto.options().comment().length());
588
 
 
589
 
    if (write_length < 0)
 
578
    if (fstat(frm_file, &file_stat))
590
579
    {
591
 
      error= errno;
592
 
      goto error2;
 
580
      frm_ptr= (unsigned char *)malloc(sizeof(unsigned char) *
 
581
                                       file_stat.st_size);
 
582
      if (frm_ptr)
 
583
      {
 
584
        my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
 
585
        azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
 
586
        free((unsigned char*)frm_ptr);
 
587
      }
593
588
    }
 
589
    my_close(frm_file, MYF(0));
594
590
  }
595
591
 
 
592
  if (create_info->comment.str)
 
593
    azwrite_comment(&create_stream, create_info->comment.str,
 
594
                    (unsigned int)create_info->comment.length);
 
595
 
596
596
  /*
597
597
    Yes you need to do this, because the starting value
598
598
    for the autoincrement may not be zero.
599
599
  */
600
 
  create_stream.auto_increment= auto_increment_value ?
601
 
    auto_increment_value - 1 : 0;
602
 
 
 
600
  create_stream.auto_increment= stats.auto_increment_value ?
 
601
    stats.auto_increment_value - 1 : 0;
603
602
  if (azclose(&create_stream))
604
603
  {
605
604
    error= errno;
609
608
  return(0);
610
609
 
611
610
error2:
612
 
  unlink(name_buff);
613
 
 
 
611
  delete_table(name);
614
612
error:
615
613
  /* Return error number, if we got one */
616
614
  return(error ? error : -1);
621
619
*/
622
620
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
623
621
{
624
 
  off_t written;
 
622
  my_off_t written;
625
623
  unsigned int r_pack_length;
626
624
 
627
625
  /* We pack the row for writing */
667
665
  unsigned char *ptr;
668
666
 
669
667
  if (fix_rec_buff(max_row_length(record)))
670
 
    return(HA_ERR_OUT_OF_MEM);
 
668
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
671
669
 
672
670
  /* Copy null bits */
673
671
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
702
700
  if (share->crashed)
703
701
    return(HA_ERR_CRASHED_ON_USAGE);
704
702
 
705
 
  ha_statistic_increment(&system_status_var::ha_write_count);
 
703
  ha_statistic_increment(&SSV::ha_write_count);
 
704
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
705
    table->timestamp_field->set_time();
706
706
  pthread_mutex_lock(&share->mutex);
707
707
 
708
708
  if (share->archive_write_open == false)
726
726
      rc= HA_ERR_FOUND_DUPP_KEY;
727
727
      goto error;
728
728
    }
 
729
#ifdef DEAD_CODE
 
730
    /*
 
731
      Bad news, this will cause a search for the unique value which is very
 
732
      expensive since we will have to do a table scan which will lock up
 
733
      all other writers during this period. This could perhaps be optimized
 
734
      in the future.
 
735
    */
 
736
    {
 
737
      /*
 
738
        First we create a buffer that we can use for reading rows, and can pass
 
739
        to get_row().
 
740
      */
 
741
      if (!(read_buf= (unsigned char*) malloc(table->s->reclength)))
 
742
      {
 
743
        rc= HA_ERR_OUT_OF_MEM;
 
744
        goto error;
 
745
      }
 
746
       /*
 
747
         All of the buffer must be written out or we won't see all of the
 
748
         data
 
749
       */
 
750
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
751
      /*
 
752
        Set the position of the local read thread to the beginning postion.
 
753
      */
 
754
      if (read_data_header(&archive))
 
755
      {
 
756
        rc= HA_ERR_CRASHED_ON_USAGE;
 
757
        goto error;
 
758
      }
 
759
 
 
760
      Field *mfield= table->next_number_field;
 
761
 
 
762
      while (!(get_row(&archive, read_buf)))
 
763
      {
 
764
        if (!memcmp(read_buf + mfield->offset(record),
 
765
                    table->next_number_field->ptr,
 
766
                    mfield->max_display_length()))
 
767
        {
 
768
          rc= HA_ERR_FOUND_DUPP_KEY;
 
769
          goto error;
 
770
        }
 
771
      }
 
772
    }
 
773
#endif
729
774
    else
730
775
    {
731
776
      if (temp_auto > share->archive_write.auto_increment)
936
981
    return(HA_ERR_END_OF_FILE);
937
982
  scan_rows--;
938
983
 
939
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
984
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
940
985
  current_position= aztell(&archive);
941
986
  rc= get_row(&archive, buf);
942
987
 
947
992
 
948
993
 
949
994
/*
950
 
  Thanks to the table bool is_ordered this will be called after
 
995
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
951
996
  each call to ha_archive::rnd_next() if an ordering of the rows is
952
997
  needed.
953
998
*/
954
999
 
955
1000
void ha_archive::position(const unsigned char *)
956
1001
{
957
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1002
  my_store_ptr(ref, ref_length, current_position);
958
1003
  return;
959
1004
}
960
1005
 
968
1013
 
969
1014
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
970
1015
{
971
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
972
 
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
1016
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1017
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
973
1018
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
974
1019
    return(HA_ERR_CRASHED_ON_USAGE);
975
1020
  return(get_row(&archive, buf));
980
1025
  rewriting the meta file. Currently it does this by calling optimize with
981
1026
  the extended flag.
982
1027
*/
983
 
int ha_archive::repair()
 
1028
int ha_archive::repair(Session* session, HA_CHECK_OPT* check_opt)
984
1029
{
985
 
  int rc= optimize();
 
1030
  check_opt->flags= T_EXTEND;
 
1031
  int rc= optimize(session, check_opt);
986
1032
 
987
1033
  if (rc)
988
1034
    return(HA_ERR_CRASHED_ON_REPAIR);
995
1041
  The table can become fragmented if data was inserted, read, and then
996
1042
  inserted again. What we do is open up the file and recompress it completely.
997
1043
*/
998
 
int ha_archive::optimize()
 
1044
int ha_archive::optimize(Session *, HA_CHECK_OPT *)
999
1045
{
1000
1046
  int rc= 0;
1001
1047
  azio_stream writer;
1010
1056
    share->archive_write_open= false;
1011
1057
  }
1012
1058
 
1013
 
  char* proto_string;
1014
 
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
1015
 
  if (proto_string == NULL)
1016
 
  {
1017
 
    return ENOMEM;
1018
 
  }
1019
 
  azread_frm(&archive, proto_string);
1020
 
 
1021
1059
  /* Lets create a file to contain the new data */
1022
 
  internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1060
  fn_format(writer_filename, share->table_name, "", ARN,
1023
1061
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1024
1062
 
1025
1063
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
1026
 
  {
1027
 
    free(proto_string);
1028
1064
    return(HA_ERR_CRASHED_ON_USAGE);
1029
 
  }
1030
 
 
1031
 
  azwrite_frm(&writer, proto_string, archive.frm_length);
1032
1065
 
1033
1066
  /*
1034
1067
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1075
1108
        if (table->found_next_number_field)
1076
1109
        {
1077
1110
          Field *field= table->found_next_number_field;
1078
 
 
1079
 
          /* Since we will need to use field to translate, we need to flip its read bit */
1080
 
          field->setReadSet();
1081
 
 
1082
1111
          uint64_t auto_value=
1083
1112
            (uint64_t) field->val_int(table->record[0] +
1084
1113
                                       field->offset(table->record[0]));
1102
1131
  azclose(&archive);
1103
1132
 
1104
1133
  // make the file we just wrote be our data file
1105
 
  rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1106
 
 
1107
 
  free(proto_string);
 
1134
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
 
1135
 
 
1136
 
1108
1137
  return(rc);
1109
1138
error:
1110
 
  free(proto_string);
1111
1139
  azclose(&writer);
1112
1140
 
1113
1141
  return(rc);
1120
1148
                                       THR_LOCK_DATA **to,
1121
1149
                                       enum thr_lock_type lock_type)
1122
1150
{
1123
 
  delayed_insert= false;
 
1151
  if (lock_type == TL_WRITE_DELAYED)
 
1152
    delayed_insert= true;
 
1153
  else
 
1154
    delayed_insert= false;
1124
1155
 
1125
1156
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1126
1157
  {
1132
1163
    */
1133
1164
 
1134
1165
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1135
 
         lock_type <= TL_WRITE)
 
1166
         lock_type <= TL_WRITE) && !session_in_lock_tables(session)
1136
1167
        && !session_tablespace_op(session))
1137
1168
      lock_type = TL_WRITE_ALLOW_WRITE;
1138
1169
 
1144
1175
      concurrent inserts to t2.
1145
1176
    */
1146
1177
 
1147
 
    if (lock_type == TL_READ_NO_INSERT)
 
1178
    if (lock_type == TL_READ_NO_INSERT && !session_in_lock_tables(session))
1148
1179
      lock_type = TL_READ;
1149
1180
 
1150
1181
    lock.type=lock_type;
1155
1186
  return to;
1156
1187
}
1157
1188
 
 
1189
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 
1190
{
 
1191
  ha_archive::info(HA_STATUS_AUTO);
 
1192
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
 
1193
  {
 
1194
    create_info->auto_increment_value= stats.auto_increment_value;
 
1195
  }
 
1196
 
 
1197
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
 
1198
    create_info->data_file_name= share->real_path;
 
1199
 
 
1200
  return;
 
1201
}
 
1202
 
 
1203
 
1158
1204
/*
1159
1205
  Hints for optimizer, see ha_tina for more information
1160
1206
*/
1253
1299
}
1254
1300
 
1255
1301
/*
 
1302
  We just return state if asked.
 
1303
*/
 
1304
bool ha_archive::is_crashed() const
 
1305
{
 
1306
  return(share->crashed);
 
1307
}
 
1308
 
 
1309
/*
1256
1310
  Simple scan of the tables to make sure everything is ok.
1257
1311
*/
1258
1312
 
1259
 
int ha_archive::check(Session* session)
 
1313
int ha_archive::check(Session* session, HA_CHECK_OPT *)
1260
1314
{
1261
1315
  int rc= 0;
1262
1316
  const char *old_proc_info;
1297
1351
  }
1298
1352
}
1299
1353
 
 
1354
/*
 
1355
  Check and repair the table if needed.
 
1356
*/
 
1357
bool ha_archive::check_and_repair(Session *session)
 
1358
{
 
1359
  HA_CHECK_OPT check_opt;
 
1360
 
 
1361
  check_opt.init();
 
1362
 
 
1363
  return(repair(session, &check_opt));
 
1364
}
 
1365
 
1300
1366
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1301
1367
{
1302
1368
  archive_record_buffer *r;
1303
1369
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1304
1370
  {
1305
 
    return(NULL);
 
1371
    return(NULL); /* purecov: inspected */
1306
1372
  }
1307
1373
  r->length= (int)length;
1308
1374
 
1309
1375
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
1310
1376
  {
1311
1377
    free((char*) r);
1312
 
    return(NULL);
 
1378
    return(NULL); /* purecov: inspected */
1313
1379
  }
1314
1380
 
1315
1381
  return(r);
1322
1388
  return;
1323
1389
}
1324
1390
 
1325
 
int ArchiveEngine::doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to)
1326
 
{
1327
 
  int error= 0;
1328
 
 
1329
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1330
 
  {
1331
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1332
 
    {
1333
 
      if ((error=errno) != ENOENT)
1334
 
        break;
1335
 
      error= 0;
1336
 
    }
1337
 
  }
1338
 
 
1339
 
  return error;
1340
 
}
1341
 
 
1342
 
bool ArchiveEngine::doDoesTableExist(Session&,
1343
 
                                     TableIdentifier &identifier)
1344
 
{
1345
 
  string proto_path(identifier.getPath());
1346
 
  proto_path.append(ARZ);
1347
 
 
1348
 
  if (access(proto_path.c_str(), F_OK))
1349
 
  {
1350
 
    return false;
1351
 
  }
1352
 
 
1353
 
  return true;
1354
 
}
1355
 
 
1356
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1357
 
                                          drizzled::SchemaIdentifier &schema_identifier,
1358
 
                                          drizzled::TableIdentifiers &set_of_identifiers)
1359
 
{
1360
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1361
 
 
1362
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1363
 
       entry_iter != entries.end(); ++entry_iter)
1364
 
  {
1365
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1366
 
    const string *filename= &entry->filename;
1367
 
 
1368
 
    assert(filename->size());
1369
 
 
1370
 
    const char *ext= strchr(filename->c_str(), '.');
1371
 
 
1372
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1373
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1374
 
    {  }
1375
 
    else
1376
 
    {
1377
 
      char uname[NAME_LEN + 1];
1378
 
      uint32_t file_name_len;
1379
 
 
1380
 
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1381
 
      // TODO: Remove need for memory copy here
1382
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1383
 
 
1384
 
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1385
 
    }
1386
 
  }
1387
 
}
 
1391
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
 
1392
  PLUGIN_VAR_NOCMDOPT,
 
1393
  "Whether or not to use asynchronous IO.",
 
1394
  NULL, NULL, true);
 
1395
 
 
1396
static struct st_mysql_sys_var* archive_system_variables[]= {
 
1397
  DRIZZLE_SYSVAR(aio),
 
1398
  NULL
 
1399
};
 
1400
 
 
1401
mysql_declare_plugin(archive)
 
1402
{
 
1403
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1404
  "ARCHIVE",
 
1405
  "3.5",
 
1406
  "Brian Aker, MySQL AB",
 
1407
  "Archive storage engine",
 
1408
  PLUGIN_LICENSE_GPL,
 
1409
  archive_db_init, /* Plugin Init */
 
1410
  archive_db_done, /* Plugin Deinit */
 
1411
  NULL,                       /* status variables                */
 
1412
  archive_system_variables,   /* system variables                */
 
1413
  NULL                        /* config options                  */
 
1414
}
 
1415
mysql_declare_plugin_end;
1388
1416