~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/ha_archive.cc

  • Committer: Monty Taylor
  • Date: 2008-10-30 18:44:27 UTC
  • mto: (520.4.35 devel)
  • mto: This revision was merged to the branch mainline in revision 572.
  • Revision ID: monty@inaugust.com-20081030184427-7cr1v6r5cqnuqm3v
Removed global sql_array.h.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
2
 
   Copyright (C) 2010 Brian Aker
3
2
 
4
3
  This program is free software; you can redistribute it and/or modify
5
4
  it under the terms of the GNU General Public License as published by
12
11
 
13
12
  You should have received a copy of the GNU General Public License
14
13
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
16
 
 
17
 
 
18
 
#include "config.h"
19
 
 
20
 
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
 
 
24
 
using namespace std;
25
 
using namespace drizzled;
26
 
 
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
#include <drizzled/common_includes.h>
 
18
#include <storage/myisam/myisam.h>
 
19
 
 
20
#include "ha_archive.h"
27
21
 
28
22
/*
29
 
  First, if you want to understand storage engines you should look at
30
 
  ha_example.cc and ha_example.h.
 
23
  First, if you want to understand storage engines you should look at 
 
24
  ha_example.cc and ha_example.h. 
31
25
 
32
26
  This example was written as a test case for a customer who needed
33
27
  a storage engine without indexes that could compress data very well.
34
28
  So, welcome to a completely compressed storage engine. This storage
35
 
  engine only does inserts. No replace, deletes, or updates. All reads are
 
29
  engine only does inserts. No replace, deletes, or updates. All reads are 
36
30
  complete table scans. Compression is done through a combination of packing
37
31
  and making use of the zlib library
38
 
 
 
32
  
39
33
  We keep a file pointer open for each instance of ha_archive for each read
40
34
  but for writes we keep one open file handle just for that. We flush it
41
35
  only if we have a read occur. azip handles compressing lots of records
45
39
  the same time since we would want to flush).
46
40
 
47
41
  A "meta" file is kept alongside the data file. This file serves two purpose.
48
 
  The first purpose is to track the number of rows in the table. The second
49
 
  purpose is to determine if the table was closed properly or not. When the
50
 
  meta file is first opened it is marked as dirty. It is opened when the table
51
 
  itself is opened for writing. When the table is closed the new count for rows
52
 
  is written to the meta file and the file is marked as clean. If the meta file
53
 
  is opened and it is marked as dirty, it is assumed that a crash occured. At
 
42
  The first purpose is to track the number of rows in the table. The second 
 
43
  purpose is to determine if the table was closed properly or not. When the 
 
44
  meta file is first opened it is marked as dirty. It is opened when the table 
 
45
  itself is opened for writing. When the table is closed the new count for rows 
 
46
  is written to the meta file and the file is marked as clean. If the meta file 
 
47
  is opened and it is marked as dirty, it is assumed that a crash occured. At 
54
48
  this point an error occurs and the user is told to rebuild the file.
55
49
  A rebuild scans the rows and rewrites the meta file. If corruption is found
56
50
  in the data file then the meta file is not repaired.
57
51
 
58
52
  At some point a recovery method for such a drastic case needs to be divised.
59
53
 
60
 
  Locks are row level, and you will get a consistant read.
 
54
  Locks are row level, and you will get a consistant read. 
61
55
 
62
56
  For performance as far as table scans go it is quite fast. I don't have
63
57
  good numbers but locally it has out performed both Innodb and MyISAM. For
64
58
  Innodb the question will be if the table can be fit into the buffer
65
59
  pool. For MyISAM its a question of how much the file system caches the
66
60
  MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
67
 
  doesn't have enough memory to cache entire table that archive turns out
68
 
  to be any faster.
 
61
  doesn't have enough memory to cache entire table that archive turns out 
 
62
  to be any faster. 
69
63
 
70
64
  Examples between MyISAM (packed) and Archive.
71
65
 
93
87
    -Brian
94
88
*/
95
89
 
96
 
/* When the engine starts up set the first version */
97
 
static uint64_t global_version= 1;
98
 
 
99
 
// We use this to find out the state of the archive aio option.
100
 
extern bool archive_aio_state(void);
 
90
/* Variables for archive share methods */
 
91
pthread_mutex_t archive_mutex;
 
92
static HASH archive_open_tables;
 
93
static unsigned int global_version;
 
94
 
 
95
/* The file extension */
 
96
#define ARZ ".ARZ"               // The data file
 
97
#define ARN ".ARN"               // Files used during an optimize call
 
98
 
 
99
 
 
100
/* Static declarations for handerton */
 
101
static handler *archive_create_handler(handlerton *hton, 
 
102
                                       TABLE_SHARE *table, 
 
103
                                       MEM_ROOT *mem_root);
 
104
int archive_discover(handlerton *hton, Session* session, const char *db, 
 
105
                     const char *name,
 
106
                     unsigned char **frmblob, 
 
107
                     size_t *frmlen);
 
108
 
 
109
static bool archive_use_aio= false;
101
110
 
102
111
/*
103
112
  Number of rows that will force a bulk insert.
109
118
*/
110
119
#define ARCHIVE_ROW_HEADER_SIZE 4
111
120
 
112
 
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
113
 
{
114
 
  ArchiveMap::iterator find_iter=
115
 
    archive_open_tables.find(table_name);
116
 
 
117
 
  if (find_iter != archive_open_tables.end())
118
 
    return (*find_iter).second;
119
 
  else
120
 
    return NULL;
121
 
}
122
 
 
123
 
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
124
 
{
125
 
  archive_open_tables[table_name]= share;
126
 
}
127
 
 
128
 
void ArchiveEngine::deleteOpenTable(const string &table_name)
129
 
{
130
 
  archive_open_tables.erase(table_name);
131
 
}
132
 
 
133
 
 
134
 
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
 
{
136
 
  string new_path(identifier.getPath());
137
 
 
138
 
  new_path+= ARZ;
139
 
 
140
 
  int error= unlink(new_path.c_str());
141
 
 
142
 
  if (error != 0)
143
 
  {
144
 
    error= errno= errno;
145
 
  }
146
 
 
147
 
  return error;
148
 
}
149
 
 
150
 
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const TableIdentifier &identifier,
152
 
                                        drizzled::message::Table &table_proto)
153
 
{
154
 
  struct stat stat_info;
155
 
  int error= ENOENT;
156
 
  string proto_path;
157
 
 
158
 
  proto_path.reserve(FN_REFLEN);
159
 
  proto_path.assign(identifier.getPath());
160
 
 
161
 
  proto_path.append(ARZ);
162
 
 
163
 
  if (stat(proto_path.c_str(),&stat_info))
164
 
    return errno;
165
 
  else
166
 
    error= EEXIST;
167
 
 
168
 
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
170
 
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
 
      return HA_ERR_CRASHED_ON_USAGE;
173
 
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
175
 
    if (proto_string == NULL)
176
 
    {
177
 
      azclose(proto_stream.get());
178
 
      return ENOMEM;
179
 
    }
180
 
 
181
 
    azread_frm(proto_stream.get(), proto_string);
182
 
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
184
 
      error= HA_ERR_CRASHED_ON_USAGE;
185
 
 
186
 
    azclose(proto_stream.get());
187
 
    free(proto_string);
188
 
  }
189
 
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
 
  return error;
197
 
}
198
 
 
199
 
 
200
 
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
202
 
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
121
static handler *archive_create_handler(handlerton *hton,
 
122
                                       TABLE_SHARE *table, 
 
123
                                       MEM_ROOT *mem_root)
 
124
{
 
125
  return new (mem_root) ha_archive(hton, table);
 
126
}
 
127
 
 
128
/*
 
129
  Used for hash table that tracks open tables.
 
130
*/
 
131
static unsigned char* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
 
132
                             bool not_used __attribute__((unused)))
 
133
{
 
134
  *length=share->table_name_length;
 
135
  return (unsigned char*) share->table_name;
 
136
}
 
137
 
 
138
 
 
139
/*
 
140
  Initialize the archive handler.
 
141
 
 
142
  SYNOPSIS
 
143
    archive_db_init()
 
144
    void *
 
145
 
 
146
  RETURN
 
147
    false       OK
 
148
    true        Error
 
149
*/
 
150
 
 
151
int archive_db_init(void *p)
 
152
{
 
153
  handlerton *archive_hton;
 
154
 
 
155
  archive_hton= (handlerton *)p;
 
156
  archive_hton->state= SHOW_OPTION_YES;
 
157
  archive_hton->create= archive_create_handler;
 
158
  archive_hton->flags= HTON_NO_FLAGS;
 
159
  archive_hton->discover= archive_discover;
 
160
 
 
161
  /* When the engine starts up set the first version */
 
162
  global_version= 1;
 
163
 
 
164
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
 
165
    goto error;
 
166
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
 
167
                (hash_get_key) archive_get_key, 0, 0))
 
168
  {
 
169
    pthread_mutex_destroy(&archive_mutex);
 
170
  }
 
171
  else
 
172
  {
 
173
    return(false);
 
174
  }
 
175
error:
 
176
  return(true);
 
177
}
 
178
 
 
179
/*
 
180
  Release the archive handler.
 
181
 
 
182
  SYNOPSIS
 
183
    archive_db_done()
 
184
    void
 
185
 
 
186
  RETURN
 
187
    false       OK
 
188
*/
 
189
 
 
190
int archive_db_done(void *p __attribute__((unused)))
 
191
{
 
192
  hash_free(&archive_open_tables);
 
193
  pthread_mutex_destroy(&archive_mutex);
 
194
 
 
195
  return 0;
 
196
}
 
197
 
 
198
 
 
199
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
 
200
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
203
201
{
204
202
  /* Set our original buffer from pre-allocated memory */
205
203
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
206
204
 
207
205
  /* The size of the offset value we will use for position() */
208
 
  ref_length= sizeof(internal::my_off_t);
 
206
  ref_length= sizeof(my_off_t);
209
207
  archive_reader_open= false;
210
208
}
211
209
 
 
210
int archive_discover(handlerton *hton __attribute__((unused)),
 
211
                     Session* session __attribute__((unused)),
 
212
                     const char *db,
 
213
                     const char *name,
 
214
                     unsigned char **frmblob,
 
215
                     size_t *frmlen)
 
216
{
 
217
  azio_stream frm_stream;
 
218
  char az_file[FN_REFLEN];
 
219
  char *frm_ptr;
 
220
  struct stat file_stat; 
 
221
 
 
222
  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
223
 
 
224
  if (stat(az_file, &file_stat))
 
225
    goto err;
 
226
 
 
227
  if (!(azopen(&frm_stream, az_file, O_RDONLY, AZ_METHOD_BLOCK)))
 
228
  {
 
229
    if (errno == EROFS || errno == EACCES)
 
230
      return(my_errno= errno);
 
231
    return(HA_ERR_CRASHED_ON_USAGE);
 
232
  }
 
233
 
 
234
  if (frm_stream.frm_length == 0)
 
235
    goto err;
 
236
 
 
237
  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
 
238
  azread_frm(&frm_stream, frm_ptr);
 
239
  azclose(&frm_stream);
 
240
 
 
241
  *frmlen= frm_stream.frm_length;
 
242
  *frmblob= (unsigned char*) frm_ptr;
 
243
 
 
244
  return(0);
 
245
err:
 
246
  my_errno= 0;
 
247
  return(1);
 
248
}
 
249
 
212
250
/*
213
251
  This method reads the header of a datafile and returns whether or not it was successful.
214
252
*/
223
261
  return(1);
224
262
}
225
263
 
226
 
ArchiveShare::ArchiveShare():
227
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
228
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
229
 
{
230
 
  assert(1);
231
 
}
232
 
 
233
 
ArchiveShare::ArchiveShare(const char *name):
234
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
235
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
236
 
{
237
 
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
 
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
241
 
  /*
242
 
    We will use this lock for rows.
243
 
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
245
 
}
246
 
 
247
 
ArchiveShare::~ArchiveShare()
248
 
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
251
 
  /*
252
 
    We need to make sure we don't reset the crashed state.
253
 
    If we open a crashed file, wee need to close it as crashed unless
254
 
    it has been repaired.
255
 
    Since we will close the data down after this, we go on and count
256
 
    the flush on close;
257
 
  */
258
 
  if (archive_write_open == true)
259
 
    (void)azclose(&archive_write);
260
 
}
261
 
 
262
 
bool ArchiveShare::prime(uint64_t *auto_increment)
263
 
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
265
 
 
266
 
  /*
267
 
    We read the meta file, but do not mark it dirty. Since we are not
268
 
    doing a write we won't mark it dirty (and we won't open it for
269
 
    anything but reading... open it for write and we will generate null
270
 
    compression writes).
271
 
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
273
 
               AZ_METHOD_BLOCK)))
274
 
    return false;
275
 
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
279
 
  if (version < global_version)
280
 
  {
281
 
    version_rows= rows_recorded;
282
 
    version= global_version;
283
 
  }
284
 
  azclose(archive_tmp.get());
285
 
 
286
 
  return true;
287
 
}
288
 
 
289
264
 
290
265
/*
291
 
  We create the shared memory space that we will use for the open table.
 
266
  We create the shared memory space that we will use for the open table. 
292
267
  No matter what we try to get or create a share. This is so that a repair
293
 
  table operation can occur.
 
268
  table operation can occur. 
294
269
 
295
270
  See ha_example.cc for a longer description.
296
271
*/
297
 
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
 
272
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
298
273
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
303
 
  share= a_engine->findOpenTable(table_name);
304
 
 
305
 
  if (!share)
 
274
  uint32_t length;
 
275
 
 
276
  pthread_mutex_lock(&archive_mutex);
 
277
  length=(uint) strlen(table_name);
 
278
 
 
279
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
 
280
                                           (unsigned char*) table_name,
 
281
                                           length)))
306
282
  {
307
 
    share= new ArchiveShare(table_name);
 
283
    char *tmp_name;
 
284
    azio_stream archive_tmp;
308
285
 
309
 
    if (share == NULL)
 
286
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
287
                          &share, sizeof(*share),
 
288
                          &tmp_name, length+1,
 
289
                          NULL)) 
310
290
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
291
      pthread_mutex_unlock(&archive_mutex);
312
292
      *rc= HA_ERR_OUT_OF_MEM;
313
293
      return(NULL);
314
294
    }
315
295
 
316
 
    if (share->prime(&stats.auto_increment_value) == false)
 
296
    share->use_count= 0;
 
297
    share->table_name_length= length;
 
298
    share->table_name= tmp_name;
 
299
    share->crashed= false;
 
300
    share->archive_write_open= false;
 
301
    fn_format(share->data_file_name, table_name, "",
 
302
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
303
    my_stpcpy(share->table_name, table_name);
 
304
    /*
 
305
      We will use this lock for rows.
 
306
    */
 
307
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
 
308
    
 
309
    /*
 
310
      We read the meta file, but do not mark it dirty. Since we are not
 
311
      doing a write we won't mark it dirty (and we won't open it for
 
312
      anything but reading... open it for write and we will generate null
 
313
      compression writes).
 
314
    */
 
315
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY,
 
316
                 AZ_METHOD_BLOCK)))
317
317
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
318
      pthread_mutex_destroy(&share->mutex);
 
319
      free(share);
 
320
      pthread_mutex_unlock(&archive_mutex);
319
321
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
 
      delete share;
321
 
 
322
 
      return NULL;
323
 
    }
324
 
 
325
 
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
322
      return(NULL);
 
323
    }
 
324
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
 
325
    share->rows_recorded= (ha_rows)archive_tmp.rows;
 
326
    share->crashed= archive_tmp.dirty;
 
327
    if (share->version < global_version)
 
328
    {
 
329
      share->version_rows= share->rows_recorded;
 
330
      share->version= global_version;
 
331
    }
 
332
    azclose(&archive_tmp);
 
333
 
 
334
    my_hash_insert(&archive_open_tables, (unsigned char*) share);
 
335
    thr_lock_init(&share->lock);
327
336
  }
328
337
  share->use_count++;
329
 
 
330
338
  if (share->crashed)
331
339
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
340
  pthread_mutex_unlock(&archive_mutex);
333
341
 
334
342
  return(share);
335
343
}
336
344
 
337
345
 
338
 
/*
 
346
/* 
339
347
  Free the share.
340
348
  See ha_example.cc for a description.
341
349
*/
342
350
int ha_archive::free_share()
343
351
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
352
  int rc= 0;
345
353
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
354
  pthread_mutex_lock(&archive_mutex);
347
355
  if (!--share->use_count)
348
356
  {
349
 
    a_engine->deleteOpenTable(share->table_name);
350
 
    delete share;
 
357
    hash_delete(&archive_open_tables, (unsigned char*) share);
 
358
    thr_lock_delete(&share->lock);
 
359
    pthread_mutex_destroy(&share->mutex);
 
360
    /* 
 
361
      We need to make sure we don't reset the crashed state.
 
362
      If we open a crashed file, wee need to close it as crashed unless
 
363
      it has been repaired.
 
364
      Since we will close the data down after this, we go on and count
 
365
      the flush on close;
 
366
    */
 
367
    if (share->archive_write_open == true)
 
368
    {
 
369
      if (azclose(&(share->archive_write)))
 
370
        rc= 1;
 
371
    }
 
372
    free((unsigned char*) share);
351
373
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
374
  pthread_mutex_unlock(&archive_mutex);
353
375
 
354
 
  return 0;
 
376
  return(rc);
355
377
}
356
378
 
357
379
int ha_archive::init_archive_writer()
358
380
{
359
 
  /*
 
381
  /* 
360
382
    It is expensive to open and close the data files and since you can't have
361
383
    a gzip file that can be both read and written we keep a writer open
362
384
    that is shared amoung all open tables.
363
385
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
386
  if (!(azopen(&(share->archive_write), share->data_file_name, 
365
387
               O_RDWR, AZ_METHOD_BLOCK)))
366
388
  {
367
389
    share->crashed= true;
373
395
}
374
396
 
375
397
 
376
 
/*
377
 
  No locks are required because it is associated with just one Cursor instance
 
398
/* 
 
399
  No locks are required because it is associated with just one handler instance
378
400
*/
379
401
int ha_archive::init_archive_reader()
380
402
{
381
 
  /*
 
403
  /* 
382
404
    It is expensive to open and close the data files and since you can't have
383
405
    a gzip file that can be both read and written we keep a writer open
384
406
    that is shared amoung all open tables.
387
409
  {
388
410
    az_method method;
389
411
 
390
 
    if (archive_aio_state())
 
412
    switch (archive_use_aio)
391
413
    {
 
414
    case false:
 
415
      method= AZ_METHOD_BLOCK;
 
416
      break;
 
417
    case true:
392
418
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
419
      break;
 
420
    default:
396
421
      method= AZ_METHOD_BLOCK;
397
422
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
423
    if (!(azopen(&archive, share->data_file_name, O_RDONLY, 
399
424
                 method)))
400
425
    {
401
426
      share->crashed= true;
407
432
  return(0);
408
433
}
409
434
 
 
435
 
410
436
/*
 
437
  We just implement one additional file extension.
 
438
*/
 
439
static const char *ha_archive_exts[] = {
 
440
  ARZ,
 
441
  NULL
 
442
};
 
443
 
 
444
const char **ha_archive::bas_ext() const
 
445
{
 
446
  return ha_archive_exts;
 
447
}
 
448
 
 
449
 
 
450
/* 
411
451
  When opening a file we:
412
452
  Create/get our shared structure.
413
453
  Init out lock.
414
454
  We open the file we will read from.
415
455
*/
416
 
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
456
int ha_archive::open(const char *name,
 
457
                     int mode __attribute__((unused)),
 
458
                     uint32_t open_options)
417
459
{
418
460
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
420
 
 
421
 
  /** 
422
 
    We either fix it ourselves, or we just take it offline 
423
 
 
424
 
    @todo Create some documentation in the recovery tools shipped with the engine.
425
 
  */
426
 
  if (rc == HA_ERR_CRASHED_ON_USAGE)
 
461
  share= get_share(name, &rc);
 
462
 
 
463
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
427
464
  {
 
465
    /* purecov: begin inspected */
428
466
    free_share();
429
 
    rc= repair();
430
 
 
431
 
    return 0;
 
467
    return(rc);
 
468
    /* purecov: end */    
432
469
  }
433
470
  else if (rc == HA_ERR_OUT_OF_MEM)
434
471
  {
437
474
 
438
475
  assert(share);
439
476
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
443
 
 
444
 
  return(rc);
445
 
}
446
 
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
 
477
  record_buffer= create_record_buffer(table->s->reclength + 
 
478
                                      ARCHIVE_ROW_HEADER_SIZE);
 
479
 
 
480
  if (!record_buffer)
 
481
  {
 
482
    free_share();
 
483
    return(HA_ERR_OUT_OF_MEM);
 
484
  }
 
485
 
 
486
  thr_lock_data_init(&share->lock, &lock, NULL);
 
487
 
 
488
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
 
489
  {
 
490
    return(0);
 
491
  }
 
492
  else
 
493
    return(rc);
452
494
}
453
495
 
454
496
 
457
499
 
458
500
  SYNOPSIS
459
501
    close();
460
 
 
 
502
  
461
503
  IMPLEMENTATION:
462
504
 
463
505
  We first close this storage engines file handle to the archive and
473
515
{
474
516
  int rc= 0;
475
517
 
476
 
  record_buffer.clear();
 
518
  destroy_record_buffer(record_buffer);
477
519
 
478
520
  /* First close stream */
479
521
  if (archive_reader_open == true)
489
531
 
490
532
 
491
533
/*
492
 
  We create our data file here. The format is pretty simple.
 
534
  We create our data file here. The format is pretty simple. 
493
535
  You can read about the format of the data file above.
494
 
  Unlike other storage engines we do not "pack" our data. Since we
495
 
  are about to do a general compression, packing would just be a waste of
496
 
  CPU time. If the table has blobs they are written after the row in the order
 
536
  Unlike other storage engines we do not "pack" our data. Since we 
 
537
  are about to do a general compression, packing would just be a waste of 
 
538
  CPU time. If the table has blobs they are written after the row in the order 
497
539
  of creation.
498
540
*/
499
541
 
500
 
int ArchiveEngine::doCreateTable(Session &,
501
 
                                 Table& table_arg,
502
 
                                 const drizzled::TableIdentifier &identifier,
503
 
                                 drizzled::message::Table& proto)
 
542
int ha_archive::create(const char *name, Table *table_arg,
 
543
                       HA_CREATE_INFO *create_info)
504
544
{
505
 
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
507
 
  uint64_t auto_increment_value;
508
 
  string serialized_proto;
509
 
 
510
 
  auto_increment_value= proto.options().auto_increment_value();
511
 
 
512
 
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
 
545
  char name_buff[FN_REFLEN];
 
546
  char linkname[FN_REFLEN];
 
547
  int error;
 
548
  azio_stream create_stream;            /* Archive file we are working with */
 
549
  File frm_file;                   /* File handler for readers */
 
550
  struct stat file_stat;
 
551
  unsigned char *frm_ptr;
 
552
 
 
553
  stats.auto_increment_value= create_info->auto_increment_value;
 
554
 
 
555
  for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
513
556
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
557
    KEY *pos= table_arg->key_info+key;
 
558
    KEY_PART_INFO *key_part=     pos->key_part;
 
559
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
560
 
518
561
    for (; key_part != key_part_end; key_part++)
519
562
    {
521
564
 
522
565
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
566
      {
524
 
        return -1;
 
567
        error= -1;
 
568
        goto error;
525
569
      }
526
570
    }
527
571
  }
528
572
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
531
 
 
532
 
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
534
 
             AZ_METHOD_BLOCK) == 0)
535
 
  {
536
 
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
553
 
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
559
 
 
560
 
  if (proto.options().has_comment())
561
 
  {
562
 
    int write_length;
563
 
 
564
 
    write_length= azwrite_comment(create_stream.get(),
565
 
                                  proto.options().comment().c_str(),
566
 
                                  proto.options().comment().length());
567
 
 
568
 
    if (write_length < 0)
569
 
    {
570
 
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
574
 
    }
 
573
  /* 
 
574
    We reuse name_buff since it is available.
 
575
  */
 
576
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
 
577
  {
 
578
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
 
579
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
580
    fn_format(linkname, name, "", ARZ,
 
581
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
582
  }
 
583
  else
 
584
  {
 
585
    fn_format(name_buff, name, "", ARZ,
 
586
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
587
    linkname[0]= 0;
575
588
  }
576
589
 
577
590
  /*
578
 
    Yes you need to do this, because the starting value
579
 
    for the autoincrement may not be zero.
 
591
    There is a chance that the file was "discovered". In this case
 
592
    just use whatever file is there.
580
593
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
582
 
    auto_increment_value - 1 : 0;
583
 
 
584
 
  if (azclose(create_stream.get()))
 
594
  if (!stat(name_buff, &file_stat))
585
595
  {
586
 
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
596
    my_errno= 0;
 
597
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
 
598
                 AZ_METHOD_BLOCK)))
 
599
    {
 
600
      error= errno;
 
601
      goto error2;
 
602
    }
 
603
 
 
604
    if (linkname[0])
 
605
      my_symlink(name_buff, linkname, MYF(0));
 
606
    fn_format(name_buff, name, "", ".frm",
 
607
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
608
 
 
609
    /*
 
610
      Here is where we open up the frm and pass it to archive to store 
 
611
    */
 
612
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
 
613
    {
 
614
      if (fstat(frm_file, &file_stat))
 
615
      {
 
616
        frm_ptr= (unsigned char *)my_malloc(sizeof(unsigned char) * file_stat.st_size, MYF(0));
 
617
        if (frm_ptr)
 
618
        {
 
619
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
 
620
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
 
621
          free((unsigned char*)frm_ptr);
 
622
        }
 
623
      }
 
624
      my_close(frm_file, MYF(0));
 
625
    }
 
626
 
 
627
    if (create_info->comment.str)
 
628
      azwrite_comment(&create_stream, create_info->comment.str, 
 
629
                      (unsigned int)create_info->comment.length);
 
630
 
 
631
    /* 
 
632
      Yes you need to do this, because the starting value 
 
633
      for the autoincrement may not be zero.
 
634
    */
 
635
    create_stream.auto_increment= stats.auto_increment_value ?
 
636
                                    stats.auto_increment_value - 1 : 0;
 
637
    if (azclose(&create_stream))
 
638
    {
 
639
      error= errno;
 
640
      goto error2;
 
641
    }
590
642
  }
 
643
  else
 
644
    my_errno= 0;
591
645
 
592
646
  return(0);
 
647
 
 
648
error2:
 
649
  delete_table(name);
 
650
error:
 
651
  /* Return error number, if we got one */
 
652
  return(error ? error : -1);
593
653
}
594
654
 
595
655
/*
597
657
*/
598
658
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
599
659
{
600
 
  off_t written;
 
660
  my_off_t written;
601
661
  unsigned int r_pack_length;
602
662
 
603
663
  /* We pack the row for writing */
604
664
  r_pack_length= pack_row(buf);
605
665
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
666
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
667
  if (written != r_pack_length)
608
668
  {
609
669
    return(-1);
616
676
}
617
677
 
618
678
 
619
 
/*
 
679
/* 
620
680
  Calculate max length needed for row. This includes
621
681
  the bytes required for the length in the header.
622
682
*/
623
683
 
624
 
uint32_t ha_archive::max_row_length(const unsigned char *)
 
684
uint32_t ha_archive::max_row_length(const unsigned char *buf __attribute__((unused)))
625
685
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
686
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
687
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
688
 
629
689
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
690
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
691
       ptr != end ;
632
692
       ptr++)
633
693
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
694
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
695
  }
636
696
 
637
697
  return length;
643
703
  unsigned char *ptr;
644
704
 
645
705
  if (fix_rec_buff(max_row_length(record)))
646
 
    return(HA_ERR_OUT_OF_MEM);
 
706
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
647
707
 
648
708
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
709
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
710
  ptr= record_buffer->buffer + table->s->null_bytes;
651
711
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
712
  for (Field **field=table->field ; *field ; field++)
653
713
  {
654
714
    if (!((*field)->is_null()))
655
715
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
716
  }
657
717
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
718
  return((unsigned int) (ptr - record_buffer->buffer));
659
719
}
660
720
 
661
721
 
662
 
/*
 
722
/* 
663
723
  Look at ha_archive::open() for an explanation of the row format.
664
724
  Here we just write out the row.
665
725
 
666
726
  Wondering about start_bulk_insert()? We don't implement it for
667
727
  archive since it optimizes for lots of writes. The only save
668
 
  for implementing start_bulk_insert() is that we could skip
 
728
  for implementing start_bulk_insert() is that we could skip 
669
729
  setting dirty to true each time.
670
730
*/
671
 
int ha_archive::doInsertRecord(unsigned char *buf)
 
731
int ha_archive::write_row(unsigned char *buf)
672
732
{
673
733
  int rc;
674
734
  unsigned char *read_buf= NULL;
675
735
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
736
  unsigned char *record=  table->record[0];
677
737
 
678
738
  if (share->crashed)
679
739
    return(HA_ERR_CRASHED_ON_USAGE);
680
740
 
681
 
  pthread_mutex_lock(&share->mutex());
 
741
  ha_statistic_increment(&SSV::ha_write_count);
 
742
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
743
    table->timestamp_field->set_time();
 
744
  pthread_mutex_lock(&share->mutex);
682
745
 
683
746
  if (share->archive_write_open == false)
684
747
    if (init_archive_writer())
685
748
      return(HA_ERR_CRASHED_ON_USAGE);
686
749
 
687
750
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
751
  if (table->next_number_field && record == table->record[0])
689
752
  {
 
753
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
754
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
755
    temp_auto= table->next_number_field->val_int();
692
756
 
693
757
    /*
694
758
      We don't support decremening auto_increment. They make the performance
695
759
      just cry.
696
760
    */
697
 
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
761
    if (temp_auto <= share->archive_write.auto_increment && 
 
762
        mkey->flags & HA_NOSAME)
699
763
    {
700
764
      rc= HA_ERR_FOUND_DUPP_KEY;
701
765
      goto error;
702
766
    }
 
767
#ifdef DEAD_CODE
 
768
    /*
 
769
      Bad news, this will cause a search for the unique value which is very 
 
770
      expensive since we will have to do a table scan which will lock up 
 
771
      all other writers during this period. This could perhaps be optimized 
 
772
      in the future.
 
773
    */
 
774
    {
 
775
      /* 
 
776
        First we create a buffer that we can use for reading rows, and can pass
 
777
        to get_row().
 
778
      */
 
779
      if (!(read_buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
780
      {
 
781
        rc= HA_ERR_OUT_OF_MEM;
 
782
        goto error;
 
783
      }
 
784
       /* 
 
785
         All of the buffer must be written out or we won't see all of the
 
786
         data 
 
787
       */
 
788
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
789
      /*
 
790
        Set the position of the local read thread to the beginning postion.
 
791
      */
 
792
      if (read_data_header(&archive))
 
793
      {
 
794
        rc= HA_ERR_CRASHED_ON_USAGE;
 
795
        goto error;
 
796
      }
 
797
 
 
798
      Field *mfield= table->next_number_field;
 
799
 
 
800
      while (!(get_row(&archive, read_buf)))
 
801
      {
 
802
        if (!memcmp(read_buf + mfield->offset(record),
 
803
                    table->next_number_field->ptr,
 
804
                    mfield->max_display_length()))
 
805
        {
 
806
          rc= HA_ERR_FOUND_DUPP_KEY;
 
807
          goto error;
 
808
        }
 
809
      }
 
810
    }
 
811
#endif
703
812
    else
704
813
    {
705
814
      if (temp_auto > share->archive_write.auto_increment)
715
824
  share->rows_recorded++;
716
825
  rc= real_write_row(buf,  &(share->archive_write));
717
826
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
827
  pthread_mutex_unlock(&share->mutex);
719
828
  if (read_buf)
720
829
    free((unsigned char*) read_buf);
721
830
 
723
832
}
724
833
 
725
834
 
726
 
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
727
 
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
 
835
void ha_archive::get_auto_increment(uint64_t offset __attribute__((unused)),
 
836
                                    uint64_t increment __attribute__((unused)),
 
837
                                    uint64_t nb_desired_values __attribute__((unused)),
 
838
                                    uint64_t *first_value __attribute__((unused)),
 
839
                                    uint64_t *nb_reserved_values __attribute__((unused)))
728
840
{
729
841
  *nb_reserved_values= UINT64_MAX;
730
842
  *first_value= share->archive_write.auto_increment + 1;
731
843
}
732
844
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
845
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
846
int ha_archive::index_init(uint32_t keynr, bool sorted __attribute__((unused)))
735
847
{
736
848
  active_index= keynr;
737
849
  return(0);
743
855
  the optimizer that we have unique indexes, we scan
744
856
*/
745
857
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
746
 
                             uint32_t key_len, enum ha_rkey_function)
 
858
                             uint32_t key_len, enum ha_rkey_function find_flag)
 
859
{
 
860
  int rc;
 
861
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
862
  return(rc);
 
863
}
 
864
 
 
865
 
 
866
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
 
867
                               uint32_t key_len,
 
868
                               enum ha_rkey_function find_flag __attribute__((unused)))
747
869
{
748
870
  int rc;
749
871
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
872
  KEY *mkey= &table->s->key_info[index];
 
873
  current_k_offset= mkey->key_part->offset;
751
874
  current_key= key;
752
875
  current_key_len= key_len;
753
876
 
754
 
  rc= doStartTableScan(true);
 
877
  rc= rnd_init(true);
755
878
 
756
879
  if (rc)
757
880
    goto error;
773
896
}
774
897
 
775
898
 
776
 
int ha_archive::index_next(unsigned char * buf)
777
 
{
 
899
int ha_archive::index_next(unsigned char * buf) 
 
900
778
901
  bool found= 0;
779
902
 
780
903
  while (!(get_row(&archive, buf)))
786
909
    }
787
910
  }
788
911
 
789
 
  return(found ? 0 : HA_ERR_END_OF_FILE);
 
912
  return(found ? 0 : HA_ERR_END_OF_FILE); 
790
913
}
791
914
 
792
915
/*
795
918
  we assume the position will be set.
796
919
*/
797
920
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
921
int ha_archive::rnd_init(bool scan)
799
922
{
800
923
  if (share->crashed)
801
924
      return(HA_ERR_CRASHED_ON_USAGE);
814
937
 
815
938
 
816
939
/*
817
 
  This is the method that is used to read a row. It assumes that the row is
 
940
  This is the method that is used to read a row. It assumes that the row is 
818
941
  positioned where you want it.
819
942
*/
820
943
int ha_archive::get_row(azio_stream *file_to_read, unsigned char *buf)
832
955
/* Reallocate buffer if needed */
833
956
bool ha_archive::fix_rec_buff(unsigned int length)
834
957
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
958
  assert(record_buffer->buffer);
 
959
 
 
960
  if (length > record_buffer->length)
 
961
  {
 
962
    unsigned char *newptr;
 
963
    if (!(newptr=(unsigned char*) my_realloc((unsigned char*) record_buffer->buffer, 
 
964
                                    length,
 
965
                                    MYF(MY_ALLOW_ZERO_PTR))))
 
966
      return(1);
 
967
    record_buffer->buffer= newptr;
 
968
    record_buffer->length= length;
 
969
  }
 
970
 
 
971
  assert(length <= record_buffer->length);
 
972
 
 
973
  return(0);
838
974
}
839
975
 
840
976
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
852
988
  }
853
989
 
854
990
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
991
  memcpy(record, ptr, table->getNullBytes());
 
992
  ptr+= table->getNullBytes();
 
993
  for (Field **field=table->field ; *field ; field++)
858
994
  {
859
995
    if (!((*field)->is_null()))
860
996
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
997
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
998
    }
863
999
  }
864
1000
  return(0);
873
1009
}
874
1010
 
875
1011
 
876
 
/*
 
1012
/* 
877
1013
  Called during ORDER BY. Its position is either from being called sequentially
878
1014
  or by having had ha_archive::rnd_pos() called before it is called.
879
1015
*/
889
1025
    return(HA_ERR_END_OF_FILE);
890
1026
  scan_rows--;
891
1027
 
892
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1028
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
893
1029
  current_position= aztell(&archive);
894
1030
  rc= get_row(&archive, buf);
895
1031
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
1032
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
1033
 
898
1034
  return(rc);
899
1035
}
900
1036
 
901
1037
 
902
1038
/*
903
 
  Thanks to the table bool is_ordered this will be called after
 
1039
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
904
1040
  each call to ha_archive::rnd_next() if an ordering of the rows is
905
1041
  needed.
906
1042
*/
907
1043
 
908
 
void ha_archive::position(const unsigned char *)
 
1044
void ha_archive::position(const unsigned char *record __attribute__((unused)))
909
1045
{
910
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1046
  my_store_ptr(ref, ref_length, current_position);
911
1047
  return;
912
1048
}
913
1049
 
921
1057
 
922
1058
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
923
1059
{
924
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
925
 
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
1060
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1061
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
926
1062
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
927
1063
    return(HA_ERR_CRASHED_ON_USAGE);
928
1064
  return(get_row(&archive, buf));
929
1065
}
930
1066
 
931
1067
/*
932
 
  This method repairs the meta file. It does this by walking the datafile and
 
1068
  This method repairs the meta file. It does this by walking the datafile and 
933
1069
  rewriting the meta file. Currently it does this by calling optimize with
934
1070
  the extended flag.
935
1071
*/
936
 
int ha_archive::repair()
 
1072
int ha_archive::repair(Session* session, HA_CHECK_OPT* check_opt)
937
1073
{
938
 
  int rc= optimize();
 
1074
  check_opt->flags= T_EXTEND;
 
1075
  int rc= optimize(session, check_opt);
939
1076
 
940
1077
  if (rc)
941
1078
    return(HA_ERR_CRASHED_ON_REPAIR);
946
1083
 
947
1084
/*
948
1085
  The table can become fragmented if data was inserted, read, and then
949
 
  inserted again. What we do is open up the file and recompress it completely.
 
1086
  inserted again. What we do is open up the file and recompress it completely. 
950
1087
*/
951
 
int ha_archive::optimize()
 
1088
int ha_archive::optimize(Session* session __attribute__((unused)),
 
1089
                         HA_CHECK_OPT* check_opt __attribute__((unused)))
952
1090
{
953
1091
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1092
  azio_stream writer;
 
1093
  char writer_filename[FN_REFLEN];
955
1094
 
956
1095
  init_archive_reader();
957
1096
 
962
1101
    share->archive_write_open= false;
963
1102
  }
964
1103
 
965
 
  char* proto_string;
966
 
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
967
 
  if (proto_string == NULL)
968
 
  {
969
 
    return ENOMEM;
970
 
  }
971
 
  azread_frm(&archive, proto_string);
972
 
 
973
1104
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
976
 
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
 
  {
979
 
    free(proto_string);
980
 
    return(HA_ERR_CRASHED_ON_USAGE);
981
 
  }
982
 
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
984
 
 
985
 
  /*
986
 
    An extended rebuild is a lot more effort. We open up each row and re-record it.
987
 
    Any dead rows are removed (aka rows that may have been partially recorded).
 
1105
  fn_format(writer_filename, share->table_name, "", ARN, 
 
1106
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
1107
 
 
1108
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1109
    return(HA_ERR_CRASHED_ON_USAGE); 
 
1110
 
 
1111
  /* 
 
1112
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
 
1113
    Any dead rows are removed (aka rows that may have been partially recorded). 
988
1114
 
989
1115
    As of Archive format 3, this is the only type that is performed, before this
990
1116
    version it was just done on T_EXTEND
992
1118
  if (1)
993
1119
  {
994
1120
    /*
995
 
      Now we will rewind the archive file so that we are positioned at the
 
1121
      Now we will rewind the archive file so that we are positioned at the 
996
1122
      start of the file.
997
1123
    */
998
1124
    azflush(&archive, Z_SYNC_FLUSH);
999
1125
    rc= read_data_header(&archive);
1000
1126
 
1001
 
    /*
 
1127
    /* 
1002
1128
      On success of writing out the new header, we now fetch each row and
1003
 
      insert it into the new archive file.
 
1129
      insert it into the new archive file. 
1004
1130
    */
1005
1131
    if (!rc)
1006
1132
    {
 
1133
      uint64_t x;
1007
1134
      uint64_t rows_restored;
1008
1135
      share->rows_recorded= 0;
1009
1136
      stats.auto_increment_value= 1;
1011
1138
 
1012
1139
      rows_restored= archive.rows;
1013
1140
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1141
      for (x= 0; x < rows_restored ; x++)
1015
1142
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1143
        rc= get_row(&archive, table->record[0]);
1017
1144
 
1018
1145
        if (rc != 0)
1019
1146
          break;
1020
1147
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1148
        real_write_row(table->record[0], &writer);
1022
1149
        /*
1023
1150
          Long term it should be possible to optimize this so that
1024
1151
          it is not called on each row.
1025
1152
        */
1026
 
        if (getTable()->found_next_number_field)
 
1153
        if (table->found_next_number_field)
1027
1154
        {
1028
 
          Field *field= getTable()->found_next_number_field;
1029
 
 
1030
 
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
 
          field->setReadSet();
1032
 
 
 
1155
          Field *field= table->found_next_number_field;
1033
1156
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1157
            (uint64_t) field->val_int(table->record[0] +
 
1158
                                       field->offset(table->record[0]));
1036
1159
          if (share->archive_write.auto_increment < auto_value)
1037
1160
            stats.auto_increment_value=
1038
1161
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1162
        }
1040
1163
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1164
      share->rows_recorded= (ha_rows)writer.rows;
1042
1165
    }
1043
1166
 
1044
1167
    if (rc && rc != HA_ERR_END_OF_FILE)
1045
1168
    {
1046
1169
      goto error;
1047
1170
    }
1048
 
  }
 
1171
  } 
1049
1172
 
1050
 
  azclose(writer.get());
 
1173
  azclose(&writer);
1051
1174
  share->dirty= false;
1052
 
 
 
1175
  
1053
1176
  azclose(&archive);
1054
1177
 
1055
1178
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1057
 
 
1058
 
  free(proto_string);
 
1179
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
 
1180
 
 
1181
 
1059
1182
  return(rc);
1060
1183
error:
1061
 
  free(proto_string);
1062
 
  azclose(writer.get());
 
1184
  azclose(&writer);
1063
1185
 
1064
 
  return(rc);
 
1186
  return(rc); 
1065
1187
}
1066
1188
 
1067
 
/*
 
1189
/* 
1068
1190
  Below is an example of how to setup row level locking.
1069
1191
*/
1070
1192
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
1071
1193
                                       THR_LOCK_DATA **to,
1072
1194
                                       enum thr_lock_type lock_type)
1073
1195
{
1074
 
  delayed_insert= false;
 
1196
  if (lock_type == TL_WRITE_DELAYED)
 
1197
    delayed_insert= true;
 
1198
  else
 
1199
    delayed_insert= false;
1075
1200
 
1076
 
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1201
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
1077
1202
  {
1078
 
    /*
 
1203
    /* 
1079
1204
      Here is where we get into the guts of a row level lock.
1080
 
      If TL_UNLOCK is set
 
1205
      If TL_UNLOCK is set 
1081
1206
      If we are not doing a LOCK Table or DISCARD/IMPORT
1082
 
      TABLESPACE, then allow multiple writers
 
1207
      TABLESPACE, then allow multiple writers 
1083
1208
    */
1084
1209
 
1085
1210
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
 
         lock_type <= TL_WRITE)
 
1211
         lock_type <= TL_WRITE) && !session_in_lock_tables(session)
1087
1212
        && !session_tablespace_op(session))
1088
1213
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1214
 
1090
 
    /*
 
1215
    /* 
1091
1216
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1092
1217
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1093
1218
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1094
1219
      to t2. Convert the lock to a normal read lock to allow
1095
 
      concurrent inserts to t2.
 
1220
      concurrent inserts to t2. 
1096
1221
    */
1097
1222
 
1098
 
    if (lock_type == TL_READ_NO_INSERT)
 
1223
    if (lock_type == TL_READ_NO_INSERT && !session_in_lock_tables(session)) 
1099
1224
      lock_type = TL_READ;
1100
1225
 
1101
1226
    lock.type=lock_type;
1106
1231
  return to;
1107
1232
}
1108
1233
 
 
1234
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 
1235
{
 
1236
  ha_archive::info(HA_STATUS_AUTO);
 
1237
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
 
1238
  {
 
1239
    create_info->auto_increment_value= stats.auto_increment_value;
 
1240
  }
 
1241
 
 
1242
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
 
1243
    create_info->data_file_name= share->real_path;
 
1244
 
 
1245
  return;
 
1246
}
 
1247
 
 
1248
 
1109
1249
/*
1110
1250
  Hints for optimizer, see ha_tina for more information
1111
1251
*/
1112
1252
int ha_archive::info(uint32_t flag)
1113
1253
{
1114
 
  /*
 
1254
  /* 
1115
1255
    If dirty, we lock, and then reset/flush the data.
1116
1256
    I found that just calling azflush() doesn't always work.
1117
1257
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1258
  pthread_mutex_lock(&share->mutex);
1119
1259
  if (share->dirty == true)
1120
1260
  {
1121
1261
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1129
1269
 
1130
1270
  }
1131
1271
 
1132
 
  /*
 
1272
  /* 
1133
1273
    This should be an accurate number now, though bulk and delayed inserts can
1134
1274
    cause the number to be inaccurate.
1135
1275
  */
1136
1276
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1277
  pthread_mutex_unlock(&share->mutex);
1138
1278
 
1139
1279
  scan_rows= stats.records;
1140
1280
  stats.deleted= 0;
1144
1284
  {
1145
1285
    struct stat file_stat;  // Stat information for the data file
1146
1286
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1287
    stat(share->data_file_name, &file_stat);
1148
1288
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1289
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1290
    stats.data_file_length= file_stat.st_size;
1151
1291
    stats.create_time= file_stat.st_ctime;
1152
1292
    stats.update_time= file_stat.st_mtime;
1158
1298
  if (flag & HA_STATUS_AUTO)
1159
1299
  {
1160
1300
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1301
    pthread_mutex_lock(&share->mutex);
1162
1302
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1303
    pthread_mutex_unlock(&share->mutex);
1164
1304
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1305
  }
1166
1306
 
1170
1310
 
1171
1311
/*
1172
1312
  This method tells us that a bulk insert operation is about to occur. We set
1173
 
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1313
  a flag which will keep write_row from saying that its data is dirty. This in
1174
1314
  turn will keep selects from causing a sync to occur.
1175
1315
  Basically, yet another optimizations to keep compression working well.
1176
1316
*/
1182
1322
}
1183
1323
 
1184
1324
 
1185
 
/*
 
1325
/* 
1186
1326
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
1187
1327
  flag, and set the share dirty so that the next select will call sync for us.
1188
1328
*/
1195
1335
 
1196
1336
/*
1197
1337
  We cancel a truncate command. The only way to delete an archive table is to drop it.
1198
 
  This is done for security reasons. In a later version we will enable this by
 
1338
  This is done for security reasons. In a later version we will enable this by 
1199
1339
  allowing the user to select a different row format.
1200
1340
*/
1201
1341
int ha_archive::delete_all_rows()
1204
1344
}
1205
1345
 
1206
1346
/*
 
1347
  We just return state if asked.
 
1348
*/
 
1349
bool ha_archive::is_crashed() const 
 
1350
{
 
1351
  return(share->crashed); 
 
1352
}
 
1353
 
 
1354
/*
1207
1355
  Simple scan of the tables to make sure everything is ok.
1208
1356
*/
1209
1357
 
1210
 
int ha_archive::check(Session* session)
 
1358
int ha_archive::check(Session* session,
 
1359
                      HA_CHECK_OPT* check_opt __attribute__((unused)))
1211
1360
{
1212
1361
  int rc= 0;
1213
1362
  const char *old_proc_info;
 
1363
  uint64_t x;
1214
1364
 
1215
1365
  old_proc_info= get_session_proc_info(session);
1216
1366
  set_session_proc_info(session, "Checking table");
1217
1367
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1368
  pthread_mutex_lock(&share->mutex);
1219
1369
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1370
  pthread_mutex_unlock(&share->mutex);
1221
1371
 
1222
1372
  /*
1223
 
    Now we will rewind the archive file so that we are positioned at the
 
1373
    Now we will rewind the archive file so that we are positioned at the 
1224
1374
    start of the file.
1225
1375
  */
1226
1376
  init_archive_reader();
1227
1377
  azflush(&archive, Z_SYNC_FLUSH);
1228
1378
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1379
  for (x= 0; x < share->archive_write.rows; x++)
1230
1380
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1381
    rc= get_row(&archive, table->record[0]);
1232
1382
 
1233
1383
    if (rc != 0)
1234
1384
      break;
1236
1386
 
1237
1387
  set_session_proc_info(session, old_proc_info);
1238
1388
 
1239
 
  if ((rc && rc != HA_ERR_END_OF_FILE))
 
1389
  if ((rc && rc != HA_ERR_END_OF_FILE))  
1240
1390
  {
1241
1391
    share->crashed= false;
1242
1392
    return(HA_ADMIN_CORRUPT);
1247
1397
  }
1248
1398
}
1249
1399
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1251
 
{
1252
 
  int error= 0;
1253
 
 
1254
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1255
 
  {
1256
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1257
 
    {
1258
 
      if ((error=errno) != ENOENT)
1259
 
        break;
1260
 
      error= 0;
1261
 
    }
1262
 
  }
1263
 
 
1264
 
  return error;
1265
 
}
1266
 
 
1267
 
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const TableIdentifier &identifier)
1269
 
{
1270
 
  string proto_path(identifier.getPath());
1271
 
  proto_path.append(ARZ);
1272
 
 
1273
 
  if (access(proto_path.c_str(), F_OK))
1274
 
  {
1275
 
    return false;
1276
 
  }
1277
 
 
1278
 
  return true;
1279
 
}
1280
 
 
1281
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::SchemaIdentifier &schema_identifier,
1283
 
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
1284
 
{
1285
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
 
 
1287
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1288
 
       entry_iter != entries.end(); ++entry_iter)
1289
 
  {
1290
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
 
    const string *filename= &entry->filename;
1292
 
 
1293
 
    assert(filename->size());
1294
 
 
1295
 
    const char *ext= strchr(filename->c_str(), '.');
1296
 
 
1297
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1299
 
    {  }
1300
 
    else
1301
 
    {
1302
 
      char uname[NAME_LEN + 1];
1303
 
      uint32_t file_name_len;
1304
 
 
1305
 
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
 
      // TODO: Remove need for memory copy here
1307
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
 
 
1309
 
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1310
 
    }
1311
 
  }
1312
 
}
 
1400
/*
 
1401
  Check and repair the table if needed.
 
1402
*/
 
1403
bool ha_archive::check_and_repair(Session *session) 
 
1404
{
 
1405
  HA_CHECK_OPT check_opt;
 
1406
 
 
1407
  check_opt.init();
 
1408
 
 
1409
  return(repair(session, &check_opt));
 
1410
}
 
1411
 
 
1412
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
 
1413
{
 
1414
  archive_record_buffer *r;
 
1415
  if (!(r= 
 
1416
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
 
1417
                                           MYF(MY_WME))))
 
1418
  {
 
1419
    return(NULL); /* purecov: inspected */
 
1420
  }
 
1421
  r->length= (int)length;
 
1422
 
 
1423
  if (!(r->buffer= (unsigned char*) my_malloc(r->length,
 
1424
                                    MYF(MY_WME))))
 
1425
  {
 
1426
    free((char*) r);
 
1427
    return(NULL); /* purecov: inspected */
 
1428
  }
 
1429
 
 
1430
  return(r);
 
1431
}
 
1432
 
 
1433
void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
 
1434
{
 
1435
  free((char*) r->buffer);
 
1436
  free((char*) r);
 
1437
  return;
 
1438
}
 
1439
 
 
1440
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
 
1441
  PLUGIN_VAR_NOCMDOPT,
 
1442
  "Whether or not to use asynchronous IO.",
 
1443
  NULL, NULL, true);
 
1444
 
 
1445
static struct st_mysql_sys_var* archive_system_variables[]= {
 
1446
  DRIZZLE_SYSVAR(aio),
 
1447
  NULL
 
1448
};
 
1449
 
 
1450
mysql_declare_plugin(archive)
 
1451
{
 
1452
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1453
  "ARCHIVE",
 
1454
  "3.5",
 
1455
  "Brian Aker, MySQL AB",
 
1456
  "Archive storage engine",
 
1457
  PLUGIN_LICENSE_GPL,
 
1458
  archive_db_init, /* Plugin Init */
 
1459
  archive_db_done, /* Plugin Deinit */
 
1460
  NULL,                       /* status variables                */
 
1461
  archive_system_variables,   /* system variables                */
 
1462
  NULL                        /* config options                  */
 
1463
}
 
1464
mysql_declare_plugin_end;
 
1465