~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/ha_archive.cc

  • Committer: Jay Pipes
  • Date: 2008-07-17 17:54:00 UTC
  • mto: This revision was merged to the branch mainline in revision 182.
  • Revision ID: jay@mysql.com-20080717175400-xm2aazihjra8mdzq
Removal of DBUG from libdrizzle/ - Round 2

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2003 MySQL AB
2
 
   Copyright (C) 2010 Brian Aker
3
2
 
4
3
  This program is free software; you can redistribute it and/or modify
5
4
  it under the terms of the GNU General Public License as published by
12
11
 
13
12
  You should have received a copy of the GNU General Public License
14
13
  along with this program; if not, write to the Free Software
15
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
16
 
 
17
 
 
18
 
#include <config.h>
19
 
 
20
 
#include <plugin/archive/archive_engine.h>
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
 
 
24
 
using namespace std;
25
 
using namespace drizzled;
26
 
 
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#ifdef USE_PRAGMA_IMPLEMENTATION
 
17
#pragma implementation        // gcc: Class implementation
 
18
#endif
 
19
 
 
20
#include "mysql_priv.h"
 
21
#include <myisam.h>
 
22
 
 
23
#include "ha_archive.h"
 
24
#include <my_dir.h>
 
25
 
 
26
#include <mysql/plugin.h>
27
27
 
28
28
/*
29
 
  First, if you want to understand storage engines you should look at
30
 
  ha_example.cc and ha_example.h.
 
29
  First, if you want to understand storage engines you should look at 
 
30
  ha_example.cc and ha_example.h. 
31
31
 
32
32
  This example was written as a test case for a customer who needed
33
33
  a storage engine without indexes that could compress data very well.
34
34
  So, welcome to a completely compressed storage engine. This storage
35
 
  engine only does inserts. No replace, deletes, or updates. All reads are
 
35
  engine only does inserts. No replace, deletes, or updates. All reads are 
36
36
  complete table scans. Compression is done through a combination of packing
37
37
  and making use of the zlib library
38
 
 
 
38
  
39
39
  We keep a file pointer open for each instance of ha_archive for each read
40
40
  but for writes we keep one open file handle just for that. We flush it
41
41
  only if we have a read occur. azip handles compressing lots of records
45
45
  the same time since we would want to flush).
46
46
 
47
47
  A "meta" file is kept alongside the data file. This file serves two purpose.
48
 
  The first purpose is to track the number of rows in the table. The second
49
 
  purpose is to determine if the table was closed properly or not. When the
50
 
  meta file is first opened it is marked as dirty. It is opened when the table
51
 
  itself is opened for writing. When the table is closed the new count for rows
52
 
  is written to the meta file and the file is marked as clean. If the meta file
53
 
  is opened and it is marked as dirty, it is assumed that a crash occured. At
 
48
  The first purpose is to track the number of rows in the table. The second 
 
49
  purpose is to determine if the table was closed properly or not. When the 
 
50
  meta file is first opened it is marked as dirty. It is opened when the table 
 
51
  itself is opened for writing. When the table is closed the new count for rows 
 
52
  is written to the meta file and the file is marked as clean. If the meta file 
 
53
  is opened and it is marked as dirty, it is assumed that a crash occured. At 
54
54
  this point an error occurs and the user is told to rebuild the file.
55
55
  A rebuild scans the rows and rewrites the meta file. If corruption is found
56
56
  in the data file then the meta file is not repaired.
57
57
 
58
58
  At some point a recovery method for such a drastic case needs to be divised.
59
59
 
60
 
  Locks are row level, and you will get a consistant read.
 
60
  Locks are row level, and you will get a consistant read. 
61
61
 
62
62
  For performance as far as table scans go it is quite fast. I don't have
63
63
  good numbers but locally it has out performed both Innodb and MyISAM. For
64
64
  Innodb the question will be if the table can be fit into the buffer
65
65
  pool. For MyISAM its a question of how much the file system caches the
66
66
  MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
67
 
  doesn't have enough memory to cache entire table that archive turns out
68
 
  to be any faster.
 
67
  doesn't have enough memory to cache entire table that archive turns out 
 
68
  to be any faster. 
69
69
 
70
70
  Examples between MyISAM (packed) and Archive.
71
71
 
93
93
    -Brian
94
94
*/
95
95
 
96
 
/* When the engine starts up set the first version */
97
 
static uint64_t global_version= 1;
98
 
 
99
 
// We use this to find out the state of the archive aio option.
100
 
extern bool archive_aio_state(void);
 
96
/* Variables for archive share methods */
 
97
pthread_mutex_t archive_mutex;
 
98
static HASH archive_open_tables;
 
99
static unsigned int global_version;
 
100
 
 
101
/* The file extension */
 
102
#define ARZ ".ARZ"               // The data file
 
103
#define ARN ".ARN"               // Files used during an optimize call
 
104
#define ARM ".ARM"               // Meta file (deprecated)
 
105
 
 
106
/*
 
107
  uchar + uchar
 
108
*/
 
109
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
 
110
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
 
111
 
 
112
/* Static declarations for handerton */
 
113
static handler *archive_create_handler(handlerton *hton, 
 
114
                                       TABLE_SHARE *table, 
 
115
                                       MEM_ROOT *mem_root);
 
116
int archive_discover(handlerton *hton, THD* thd, const char *db, 
 
117
                     const char *name,
 
118
                     uchar **frmblob, 
 
119
                     size_t *frmlen);
 
120
 
 
121
static my_bool archive_use_aio= false;
101
122
 
102
123
/*
103
124
  Number of rows that will force a bulk insert.
109
130
*/
110
131
#define ARCHIVE_ROW_HEADER_SIZE 4
111
132
 
112
 
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
113
 
{
114
 
  ArchiveMap::iterator find_iter=
115
 
    archive_open_tables.find(table_name);
116
 
 
117
 
  if (find_iter != archive_open_tables.end())
118
 
    return (*find_iter).second;
119
 
  else
120
 
    return NULL;
121
 
}
122
 
 
123
 
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
124
 
{
125
 
  archive_open_tables[table_name]= share;
126
 
}
127
 
 
128
 
void ArchiveEngine::deleteOpenTable(const string &table_name)
129
 
{
130
 
  archive_open_tables.erase(table_name);
131
 
}
132
 
 
133
 
 
134
 
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
135
 
{
136
 
  string new_path(identifier.getPath());
137
 
 
138
 
  new_path+= ARZ;
139
 
 
140
 
  int error= unlink(new_path.c_str());
141
 
 
142
 
  if (error != 0)
143
 
  {
144
 
    error= errno= errno;
145
 
  }
146
 
 
147
 
  return error;
148
 
}
149
 
 
150
 
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const identifier::Table &identifier,
152
 
                                        drizzled::message::Table &table_proto)
153
 
{
154
 
  struct stat stat_info;
155
 
  int error= ENOENT;
156
 
  string proto_path;
157
 
 
158
 
  proto_path.reserve(FN_REFLEN);
159
 
  proto_path.assign(identifier.getPath());
160
 
 
161
 
  proto_path.append(ARZ);
162
 
 
163
 
  if (stat(proto_path.c_str(),&stat_info))
164
 
    return errno;
165
 
  else
166
 
    error= EEXIST;
167
 
 
168
 
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
170
 
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
 
      return HA_ERR_CRASHED_ON_USAGE;
173
 
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
175
 
    if (proto_string == NULL)
176
 
    {
177
 
      azclose(proto_stream.get());
178
 
      return ENOMEM;
179
 
    }
180
 
 
181
 
    azread_frm(proto_stream.get(), proto_string);
182
 
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
184
 
      error= HA_ERR_CRASHED_ON_USAGE;
185
 
 
186
 
    azclose(proto_stream.get());
187
 
    free(proto_string);
188
 
  }
189
 
 
190
 
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
 
     we do not rewrite the table proto (as it's wedged in the file header)
192
 
  */
193
 
  table_proto.set_schema(identifier.getSchemaName());
194
 
  table_proto.set_name(identifier.getTableName());
195
 
 
196
 
  return error;
197
 
}
198
 
 
199
 
 
200
 
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
202
 
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
133
static handler *archive_create_handler(handlerton *hton,
 
134
                                       TABLE_SHARE *table, 
 
135
                                       MEM_ROOT *mem_root)
 
136
{
 
137
  return new (mem_root) ha_archive(hton, table);
 
138
}
 
139
 
 
140
/*
 
141
  Used for hash table that tracks open tables.
 
142
*/
 
143
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
 
144
                             my_bool not_used __attribute__((unused)))
 
145
{
 
146
  *length=share->table_name_length;
 
147
  return (uchar*) share->table_name;
 
148
}
 
149
 
 
150
 
 
151
/*
 
152
  Initialize the archive handler.
 
153
 
 
154
  SYNOPSIS
 
155
    archive_db_init()
 
156
    void *
 
157
 
 
158
  RETURN
 
159
    false       OK
 
160
    true        Error
 
161
*/
 
162
 
 
163
int archive_db_init(void *p)
 
164
{
 
165
  DBUG_ENTER("archive_db_init");
 
166
  handlerton *archive_hton;
 
167
 
 
168
  archive_hton= (handlerton *)p;
 
169
  archive_hton->state= SHOW_OPTION_YES;
 
170
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
 
171
  archive_hton->create= archive_create_handler;
 
172
  archive_hton->flags= HTON_NO_FLAGS;
 
173
  archive_hton->discover= archive_discover;
 
174
 
 
175
  /* When the engine starts up set the first version */
 
176
  global_version= 1;
 
177
 
 
178
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
 
179
    goto error;
 
180
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
 
181
                (hash_get_key) archive_get_key, 0, 0))
 
182
  {
 
183
    VOID(pthread_mutex_destroy(&archive_mutex));
 
184
  }
 
185
  else
 
186
  {
 
187
    DBUG_RETURN(false);
 
188
  }
 
189
error:
 
190
  DBUG_RETURN(true);
 
191
}
 
192
 
 
193
/*
 
194
  Release the archive handler.
 
195
 
 
196
  SYNOPSIS
 
197
    archive_db_done()
 
198
    void
 
199
 
 
200
  RETURN
 
201
    false       OK
 
202
*/
 
203
 
 
204
int archive_db_done(void *p __attribute__((__unused__)))
 
205
{
 
206
  hash_free(&archive_open_tables);
 
207
  VOID(pthread_mutex_destroy(&archive_mutex));
 
208
 
 
209
  return 0;
 
210
}
 
211
 
 
212
 
 
213
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
 
214
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
203
215
{
204
216
  /* Set our original buffer from pre-allocated memory */
205
217
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
206
218
 
207
219
  /* The size of the offset value we will use for position() */
208
 
  ref_length= sizeof(internal::my_off_t);
 
220
  ref_length= sizeof(my_off_t);
209
221
  archive_reader_open= false;
210
222
}
211
223
 
 
224
int archive_discover(handlerton *hton __attribute__((__unused__)),
 
225
                     THD* thd __attribute__((__unused__)),
 
226
                     const char *db,
 
227
                     const char *name,
 
228
                     uchar **frmblob,
 
229
                     size_t *frmlen)
 
230
{
 
231
  DBUG_ENTER("archive_discover");
 
232
  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name)); 
 
233
  azio_stream frm_stream;
 
234
  char az_file[FN_REFLEN];
 
235
  char *frm_ptr;
 
236
  struct stat file_stat; 
 
237
 
 
238
  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
239
 
 
240
  if (stat(az_file, &file_stat))
 
241
    goto err;
 
242
 
 
243
  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
 
244
  {
 
245
    if (errno == EROFS || errno == EACCES)
 
246
      DBUG_RETURN(my_errno= errno);
 
247
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
248
  }
 
249
 
 
250
  if (frm_stream.frm_length == 0)
 
251
    goto err;
 
252
 
 
253
  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
 
254
  azread_frm(&frm_stream, frm_ptr);
 
255
  azclose(&frm_stream);
 
256
 
 
257
  *frmlen= frm_stream.frm_length;
 
258
  *frmblob= (uchar*) frm_ptr;
 
259
 
 
260
  DBUG_RETURN(0);
 
261
err:
 
262
  my_errno= 0;
 
263
  DBUG_RETURN(1);
 
264
}
 
265
 
212
266
/*
213
267
  This method reads the header of a datafile and returns whether or not it was successful.
214
268
*/
215
269
int ha_archive::read_data_header(azio_stream *file_to_read)
216
270
{
 
271
  DBUG_ENTER("ha_archive::read_data_header");
 
272
 
217
273
  if (azread_init(file_to_read) == -1)
218
 
    return(HA_ERR_CRASHED_ON_USAGE);
 
274
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
219
275
 
220
276
  if (file_to_read->version >= 3)
221
 
    return(0);
222
 
 
223
 
  return(1);
224
 
}
225
 
 
226
 
ArchiveShare::ArchiveShare():
227
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
228
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
229
 
{
230
 
  assert(1);
231
 
}
232
 
 
233
 
ArchiveShare::ArchiveShare(const char *name):
234
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
235
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
236
 
{
237
 
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
238
 
  table_name.append(name);
239
 
  data_file_name.assign(table_name);
240
 
  data_file_name.append(ARZ);
241
 
  /*
242
 
    We will use this lock for rows.
243
 
  */
244
 
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
245
 
}
246
 
 
247
 
ArchiveShare::~ArchiveShare()
248
 
{
249
 
  _lock.deinit();
250
 
  pthread_mutex_destroy(&_mutex);
251
 
  /*
252
 
    We need to make sure we don't reset the crashed state.
253
 
    If we open a crashed file, wee need to close it as crashed unless
254
 
    it has been repaired.
255
 
    Since we will close the data down after this, we go on and count
256
 
    the flush on close;
257
 
  */
258
 
  if (archive_write_open == true)
259
 
    (void)azclose(&archive_write);
260
 
}
261
 
 
262
 
bool ArchiveShare::prime(uint64_t *auto_increment)
263
 
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
265
 
 
266
 
  /*
267
 
    We read the meta file, but do not mark it dirty. Since we are not
268
 
    doing a write we won't mark it dirty (and we won't open it for
269
 
    anything but reading... open it for write and we will generate null
270
 
    compression writes).
271
 
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
273
 
               AZ_METHOD_BLOCK)))
274
 
    return false;
275
 
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
279
 
  if (version < global_version)
280
 
  {
281
 
    version_rows= rows_recorded;
282
 
    version= global_version;
283
 
  }
284
 
  azclose(archive_tmp.get());
285
 
 
286
 
  return true;
 
277
    DBUG_RETURN(0);
 
278
 
 
279
  DBUG_RETURN(1);
287
280
}
288
281
 
289
282
 
290
283
/*
291
 
  We create the shared memory space that we will use for the open table.
 
284
  We create the shared memory space that we will use for the open table. 
292
285
  No matter what we try to get or create a share. This is so that a repair
293
 
  table operation can occur.
 
286
  table operation can occur. 
294
287
 
295
288
  See ha_example.cc for a longer description.
296
289
*/
297
 
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
 
290
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
298
291
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
300
 
 
301
 
  pthread_mutex_lock(&a_engine->mutex());
302
 
 
303
 
  share= a_engine->findOpenTable(table_name);
304
 
 
305
 
  if (!share)
 
292
  uint length;
 
293
  DBUG_ENTER("ha_archive::get_share");
 
294
 
 
295
  pthread_mutex_lock(&archive_mutex);
 
296
  length=(uint) strlen(table_name);
 
297
 
 
298
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
 
299
                                           (uchar*) table_name,
 
300
                                           length)))
306
301
  {
307
 
    share= new ArchiveShare(table_name);
 
302
    char *tmp_name;
 
303
    azio_stream archive_tmp;
308
304
 
309
 
    if (share == NULL)
 
305
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
306
                          &share, sizeof(*share),
 
307
                          &tmp_name, length+1,
 
308
                          NullS)) 
310
309
    {
311
 
      pthread_mutex_unlock(&a_engine->mutex());
 
310
      pthread_mutex_unlock(&archive_mutex);
312
311
      *rc= HA_ERR_OUT_OF_MEM;
313
 
      return(NULL);
 
312
      DBUG_RETURN(NULL);
314
313
    }
315
314
 
316
 
    if (share->prime(&stats.auto_increment_value) == false)
 
315
    share->use_count= 0;
 
316
    share->table_name_length= length;
 
317
    share->table_name= tmp_name;
 
318
    share->crashed= false;
 
319
    share->archive_write_open= false;
 
320
    fn_format(share->data_file_name, table_name, "",
 
321
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
322
    strmov(share->table_name, table_name);
 
323
    DBUG_PRINT("ha_archive", ("Data File %s", 
 
324
                        share->data_file_name));
 
325
    /*
 
326
      We will use this lock for rows.
 
327
    */
 
328
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
 
329
    
 
330
    /*
 
331
      We read the meta file, but do not mark it dirty. Since we are not
 
332
      doing a write we won't mark it dirty (and we won't open it for
 
333
      anything but reading... open it for write and we will generate null
 
334
      compression writes).
 
335
    */
 
336
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY,
 
337
                 AZ_METHOD_BLOCK)))
317
338
    {
318
 
      pthread_mutex_unlock(&a_engine->mutex());
 
339
      VOID(pthread_mutex_destroy(&share->mutex));
 
340
      free(share);
 
341
      pthread_mutex_unlock(&archive_mutex);
319
342
      *rc= HA_ERR_CRASHED_ON_REPAIR;
320
 
      delete share;
321
 
 
322
 
      return NULL;
323
 
    }
324
 
 
325
 
    a_engine->addOpenTable(share->table_name, share);
326
 
    thr_lock_init(&share->_lock);
 
343
      DBUG_RETURN(NULL);
 
344
    }
 
345
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
 
346
    share->rows_recorded= (ha_rows)archive_tmp.rows;
 
347
    share->crashed= archive_tmp.dirty;
 
348
    if (share->version < global_version)
 
349
    {
 
350
      share->version_rows= share->rows_recorded;
 
351
      share->version= global_version;
 
352
    }
 
353
    azclose(&archive_tmp);
 
354
 
 
355
    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
 
356
    thr_lock_init(&share->lock);
327
357
  }
328
358
  share->use_count++;
329
 
 
 
359
  DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now", 
 
360
                      share->table_name_length, share->table_name,
 
361
                      share->use_count));
330
362
  if (share->crashed)
331
363
    *rc= HA_ERR_CRASHED_ON_USAGE;
332
 
  pthread_mutex_unlock(&a_engine->mutex());
 
364
  pthread_mutex_unlock(&archive_mutex);
333
365
 
334
 
  return(share);
 
366
  DBUG_RETURN(share);
335
367
}
336
368
 
337
369
 
338
 
/*
 
370
/* 
339
371
  Free the share.
340
372
  See ha_example.cc for a description.
341
373
*/
342
374
int ha_archive::free_share()
343
375
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
376
  int rc= 0;
 
377
  DBUG_ENTER("ha_archive::free_share");
 
378
  DBUG_PRINT("ha_archive",
 
379
             ("archive table %.*s has %d open handles on entrance", 
 
380
              share->table_name_length, share->table_name,
 
381
              share->use_count));
345
382
 
346
 
  pthread_mutex_lock(&a_engine->mutex());
 
383
  pthread_mutex_lock(&archive_mutex);
347
384
  if (!--share->use_count)
348
385
  {
349
 
    a_engine->deleteOpenTable(share->table_name);
350
 
    delete share;
 
386
    hash_delete(&archive_open_tables, (uchar*) share);
 
387
    thr_lock_delete(&share->lock);
 
388
    VOID(pthread_mutex_destroy(&share->mutex));
 
389
    /* 
 
390
      We need to make sure we don't reset the crashed state.
 
391
      If we open a crashed file, wee need to close it as crashed unless
 
392
      it has been repaired.
 
393
      Since we will close the data down after this, we go on and count
 
394
      the flush on close;
 
395
    */
 
396
    if (share->archive_write_open == true)
 
397
    {
 
398
      if (azclose(&(share->archive_write)))
 
399
        rc= 1;
 
400
    }
 
401
    my_free((uchar*) share, MYF(0));
351
402
  }
352
 
  pthread_mutex_unlock(&a_engine->mutex());
 
403
  pthread_mutex_unlock(&archive_mutex);
353
404
 
354
 
  return 0;
 
405
  DBUG_RETURN(rc);
355
406
}
356
407
 
357
408
int ha_archive::init_archive_writer()
358
409
{
359
 
  /*
 
410
  DBUG_ENTER("ha_archive::init_archive_writer");
 
411
  /* 
360
412
    It is expensive to open and close the data files and since you can't have
361
413
    a gzip file that can be both read and written we keep a writer open
362
414
    that is shared amoung all open tables.
363
415
  */
364
 
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
365
 
               O_RDWR, AZ_METHOD_BLOCK)))
 
416
  if (!(azopen(&(share->archive_write), share->data_file_name, 
 
417
               O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
366
418
  {
 
419
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
367
420
    share->crashed= true;
368
 
    return(1);
 
421
    DBUG_RETURN(1);
369
422
  }
370
423
  share->archive_write_open= true;
371
424
 
372
 
  return(0);
 
425
  DBUG_RETURN(0);
373
426
}
374
427
 
375
428
 
376
 
/*
377
 
  No locks are required because it is associated with just one Cursor instance
 
429
/* 
 
430
  No locks are required because it is associated with just one handler instance
378
431
*/
379
432
int ha_archive::init_archive_reader()
380
433
{
381
 
  /*
 
434
  DBUG_ENTER("ha_archive::init_archive_reader");
 
435
  /* 
382
436
    It is expensive to open and close the data files and since you can't have
383
437
    a gzip file that can be both read and written we keep a writer open
384
438
    that is shared amoung all open tables.
387
441
  {
388
442
    az_method method;
389
443
 
390
 
    if (archive_aio_state())
 
444
    switch (archive_use_aio)
391
445
    {
 
446
    case false:
 
447
      method= AZ_METHOD_BLOCK;
 
448
      break;
 
449
    case true:
392
450
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
451
      break;
 
452
    default:
396
453
      method= AZ_METHOD_BLOCK;
397
454
    }
398
 
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
455
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY, 
399
456
                 method)))
400
457
    {
 
458
      DBUG_PRINT("ha_archive", ("Could not open archive read file"));
401
459
      share->crashed= true;
402
 
      return(1);
 
460
      DBUG_RETURN(1);
403
461
    }
404
462
    archive_reader_open= true;
405
463
  }
406
464
 
407
 
  return(0);
 
465
  DBUG_RETURN(0);
408
466
}
409
467
 
 
468
 
410
469
/*
 
470
  We just implement one additional file extension.
 
471
*/
 
472
static const char *ha_archive_exts[] = {
 
473
  ARZ,
 
474
  NullS
 
475
};
 
476
 
 
477
const char **ha_archive::bas_ext() const
 
478
{
 
479
  return ha_archive_exts;
 
480
}
 
481
 
 
482
 
 
483
/* 
411
484
  When opening a file we:
412
485
  Create/get our shared structure.
413
486
  Init out lock.
414
487
  We open the file we will read from.
415
488
*/
416
 
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
 
489
int ha_archive::open(const char *name,
 
490
                     int mode __attribute__((__unused__)),
 
491
                     uint open_options)
417
492
{
418
493
  int rc= 0;
419
 
  share= get_share(identifier.getPath().c_str(), &rc);
420
 
 
421
 
  /** 
422
 
    We either fix it ourselves, or we just take it offline 
423
 
 
424
 
    @todo Create some documentation in the recovery tools shipped with the engine.
425
 
  */
426
 
  if (rc == HA_ERR_CRASHED_ON_USAGE)
 
494
  DBUG_ENTER("ha_archive::open");
 
495
 
 
496
  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s", 
 
497
                      (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
 
498
  share= get_share(name, &rc);
 
499
 
 
500
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
427
501
  {
 
502
    /* purecov: begin inspected */
428
503
    free_share();
429
 
    rc= repair();
430
 
 
431
 
    return 0;
 
504
    DBUG_RETURN(rc);
 
505
    /* purecov: end */    
432
506
  }
433
507
  else if (rc == HA_ERR_OUT_OF_MEM)
434
508
  {
435
 
    return(rc);
436
 
  }
437
 
 
438
 
  assert(share);
439
 
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
 
 
442
 
  lock.init(&share->_lock);
443
 
 
444
 
  return(rc);
445
 
}
446
 
 
447
 
// Should never be called
448
 
int ha_archive::open(const char *, int, uint32_t)
449
 
{
450
 
  assert(0);
451
 
  return -1;
 
509
    DBUG_RETURN(rc);
 
510
  }
 
511
 
 
512
  DBUG_ASSERT(share);
 
513
 
 
514
  record_buffer= create_record_buffer(table->s->reclength + 
 
515
                                      ARCHIVE_ROW_HEADER_SIZE);
 
516
 
 
517
  if (!record_buffer)
 
518
  {
 
519
    free_share();
 
520
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
521
  }
 
522
 
 
523
  thr_lock_data_init(&share->lock, &lock, NULL);
 
524
 
 
525
  DBUG_PRINT("ha_archive", ("archive table was crashed %s", 
 
526
                      rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
 
527
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
 
528
  {
 
529
    DBUG_RETURN(0);
 
530
  }
 
531
  else
 
532
    DBUG_RETURN(rc);
452
533
}
453
534
 
454
535
 
457
538
 
458
539
  SYNOPSIS
459
540
    close();
460
 
 
 
541
  
461
542
  IMPLEMENTATION:
462
543
 
463
544
  We first close this storage engines file handle to the archive and
472
553
int ha_archive::close(void)
473
554
{
474
555
  int rc= 0;
 
556
  DBUG_ENTER("ha_archive::close");
475
557
 
476
 
  record_buffer.clear();
 
558
  destroy_record_buffer(record_buffer);
477
559
 
478
560
  /* First close stream */
479
561
  if (archive_reader_open == true)
484
566
  /* then also close share */
485
567
  rc|= free_share();
486
568
 
487
 
  return(rc);
 
569
  DBUG_RETURN(rc);
488
570
}
489
571
 
490
572
 
491
573
/*
492
 
  We create our data file here. The format is pretty simple.
 
574
  We create our data file here. The format is pretty simple. 
493
575
  You can read about the format of the data file above.
494
 
  Unlike other storage engines we do not "pack" our data. Since we
495
 
  are about to do a general compression, packing would just be a waste of
496
 
  CPU time. If the table has blobs they are written after the row in the order
 
576
  Unlike other storage engines we do not "pack" our data. Since we 
 
577
  are about to do a general compression, packing would just be a waste of 
 
578
  CPU time. If the table has blobs they are written after the row in the order 
497
579
  of creation.
498
580
*/
499
581
 
500
 
int ArchiveEngine::doCreateTable(Session &,
501
 
                                 Table& table_arg,
502
 
                                 const drizzled::identifier::Table &identifier,
503
 
                                 drizzled::message::Table& proto)
 
582
int ha_archive::create(const char *name, TABLE *table_arg,
 
583
                       HA_CREATE_INFO *create_info)
504
584
{
505
 
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
507
 
  uint64_t auto_increment_value;
508
 
  string serialized_proto;
509
 
 
510
 
  auto_increment_value= proto.options().auto_increment_value();
511
 
 
512
 
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
 
585
  char name_buff[FN_REFLEN];
 
586
  char linkname[FN_REFLEN];
 
587
  int error;
 
588
  azio_stream create_stream;            /* Archive file we are working with */
 
589
  File frm_file;                   /* File handler for readers */
 
590
  struct stat file_stat;
 
591
  uchar *frm_ptr;
 
592
 
 
593
  DBUG_ENTER("ha_archive::create");
 
594
 
 
595
  stats.auto_increment_value= create_info->auto_increment_value;
 
596
 
 
597
  for (uint key= 0; key < table_arg->s->keys; key++)
513
598
  {
514
 
    KeyInfo *pos= &table_arg.key_info[key];
515
 
    KeyPartInfo *key_part=     pos->key_part;
516
 
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
599
    KEY *pos= table_arg->key_info+key;
 
600
    KEY_PART_INFO *key_part=     pos->key_part;
 
601
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
517
602
 
518
603
    for (; key_part != key_part_end; key_part++)
519
604
    {
521
606
 
522
607
      if (!(field->flags & AUTO_INCREMENT_FLAG))
523
608
      {
524
 
        return -1;
 
609
        error= -1;
 
610
        DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
 
611
        goto error;
525
612
      }
526
613
    }
527
614
  }
528
615
 
529
 
  std::string named_file= identifier.getPath();
530
 
  named_file.append(ARZ);
531
 
 
532
 
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
534
 
             AZ_METHOD_BLOCK) == 0)
535
 
  {
536
 
    error= errno;
537
 
    unlink(named_file.c_str());
538
 
 
539
 
    return(error ? error : -1);
540
 
  }
541
 
 
542
 
  try {
543
 
    proto.SerializeToString(&serialized_proto);
544
 
  }
545
 
  catch (...)
546
 
  {
547
 
    unlink(named_file.c_str());
548
 
 
549
 
    return(error ? error : -1);
550
 
  }
551
 
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
553
 
                  serialized_proto.length()))
554
 
  {
555
 
    unlink(named_file.c_str());
556
 
 
557
 
    return(error ? error : -1);
558
 
  }
559
 
 
560
 
  if (proto.options().has_comment())
561
 
  {
562
 
    int write_length;
563
 
 
564
 
    write_length= azwrite_comment(create_stream.get(),
565
 
                                  proto.options().comment().c_str(),
566
 
                                  proto.options().comment().length());
567
 
 
568
 
    if (write_length < 0)
569
 
    {
570
 
      error= errno;
571
 
      unlink(named_file.c_str());
572
 
 
573
 
      return(error ? error : -1);
574
 
    }
 
616
  /* 
 
617
    We reuse name_buff since it is available.
 
618
  */
 
619
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
 
620
  {
 
621
    DBUG_PRINT("ha_archive", ("archive will create stream file %s", 
 
622
                        create_info->data_file_name));
 
623
                        
 
624
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
 
625
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
626
    fn_format(linkname, name, "", ARZ,
 
627
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
628
  }
 
629
  else
 
630
  {
 
631
    fn_format(name_buff, name, "", ARZ,
 
632
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
633
    linkname[0]= 0;
575
634
  }
576
635
 
577
636
  /*
578
 
    Yes you need to do this, because the starting value
579
 
    for the autoincrement may not be zero.
 
637
    There is a chance that the file was "discovered". In this case
 
638
    just use whatever file is there.
580
639
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
582
 
    auto_increment_value - 1 : 0;
583
 
 
584
 
  if (azclose(create_stream.get()))
 
640
  if (!stat(name_buff, &file_stat))
585
641
  {
586
 
    error= errno;
587
 
    unlink(named_file.c_str());
588
 
 
589
 
    return(error ? error : -1);
 
642
    my_errno= 0;
 
643
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY,
 
644
                 AZ_METHOD_BLOCK)))
 
645
    {
 
646
      error= errno;
 
647
      goto error2;
 
648
    }
 
649
 
 
650
    if (linkname[0])
 
651
      my_symlink(name_buff, linkname, MYF(0));
 
652
    fn_format(name_buff, name, "", ".frm",
 
653
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
654
 
 
655
    /*
 
656
      Here is where we open up the frm and pass it to archive to store 
 
657
    */
 
658
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
 
659
    {
 
660
      if (fstat(frm_file, &file_stat))
 
661
      {
 
662
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
 
663
        if (frm_ptr)
 
664
        {
 
665
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
 
666
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
 
667
          my_free((uchar*)frm_ptr, MYF(0));
 
668
        }
 
669
      }
 
670
      my_close(frm_file, MYF(0));
 
671
    }
 
672
 
 
673
    if (create_info->comment.str)
 
674
      azwrite_comment(&create_stream, create_info->comment.str, 
 
675
                      (unsigned int)create_info->comment.length);
 
676
 
 
677
    /* 
 
678
      Yes you need to do this, because the starting value 
 
679
      for the autoincrement may not be zero.
 
680
    */
 
681
    create_stream.auto_increment= stats.auto_increment_value ?
 
682
                                    stats.auto_increment_value - 1 : 0;
 
683
    if (azclose(&create_stream))
 
684
    {
 
685
      error= errno;
 
686
      goto error2;
 
687
    }
590
688
  }
591
 
 
592
 
  return(0);
 
689
  else
 
690
    my_errno= 0;
 
691
 
 
692
  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
 
693
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
 
694
 
 
695
 
 
696
  DBUG_RETURN(0);
 
697
 
 
698
error2:
 
699
  delete_table(name);
 
700
error:
 
701
  /* Return error number, if we got one */
 
702
  DBUG_RETURN(error ? error : -1);
593
703
}
594
704
 
595
705
/*
596
706
  This is where the actual row is written out.
597
707
*/
598
 
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
 
708
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
599
709
{
600
 
  off_t written;
 
710
  my_off_t written;
601
711
  unsigned int r_pack_length;
 
712
  DBUG_ENTER("ha_archive::real_write_row");
602
713
 
603
714
  /* We pack the row for writing */
604
715
  r_pack_length= pack_row(buf);
605
716
 
606
 
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
717
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
718
  if (written != r_pack_length)
608
719
  {
609
 
    return(-1);
 
720
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d", 
 
721
                                              (uint32) written, 
 
722
                                              (uint32)r_pack_length));
 
723
    DBUG_RETURN(-1);
610
724
  }
611
725
 
612
726
  if (!delayed_insert || !bulk_insert)
613
727
    share->dirty= true;
614
728
 
615
 
  return(0);
 
729
  DBUG_RETURN(0);
616
730
}
617
731
 
618
732
 
619
 
/*
 
733
/* 
620
734
  Calculate max length needed for row. This includes
621
735
  the bytes required for the length in the header.
622
736
*/
623
737
 
624
 
uint32_t ha_archive::max_row_length(const unsigned char *)
 
738
uint32 ha_archive::max_row_length(const uchar *buf __attribute__((__unused__)))
625
739
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
740
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
627
741
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
742
 
629
 
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
743
  uint *ptr, *end;
 
744
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
631
745
       ptr != end ;
632
746
       ptr++)
633
747
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
748
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
635
749
  }
636
750
 
637
751
  return length;
638
752
}
639
753
 
640
754
 
641
 
unsigned int ha_archive::pack_row(unsigned char *record)
 
755
unsigned int ha_archive::pack_row(uchar *record)
642
756
{
643
 
  unsigned char *ptr;
 
757
  uchar *ptr;
 
758
 
 
759
  DBUG_ENTER("ha_archive::pack_row");
 
760
 
644
761
 
645
762
  if (fix_rec_buff(max_row_length(record)))
646
 
    return(HA_ERR_OUT_OF_MEM);
 
763
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
647
764
 
648
765
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
766
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
767
  ptr= record_buffer->buffer + table->s->null_bytes;
651
768
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
769
  for (Field **field=table->field ; *field ; field++)
653
770
  {
654
771
    if (!((*field)->is_null()))
655
772
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
656
773
  }
657
774
 
658
 
  return((unsigned int) (ptr - &record_buffer[0]));
 
775
  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
 
776
                           (ptr - record_buffer->buffer - 
 
777
                             ARCHIVE_ROW_HEADER_SIZE)));
 
778
 
 
779
  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
659
780
}
660
781
 
661
782
 
662
 
/*
 
783
/* 
663
784
  Look at ha_archive::open() for an explanation of the row format.
664
785
  Here we just write out the row.
665
786
 
666
787
  Wondering about start_bulk_insert()? We don't implement it for
667
788
  archive since it optimizes for lots of writes. The only save
668
 
  for implementing start_bulk_insert() is that we could skip
 
789
  for implementing start_bulk_insert() is that we could skip 
669
790
  setting dirty to true each time.
670
791
*/
671
 
int ha_archive::doInsertRecord(unsigned char *buf)
 
792
int ha_archive::write_row(uchar *buf)
672
793
{
673
794
  int rc;
674
 
  unsigned char *read_buf= NULL;
 
795
  uchar *read_buf= NULL;
675
796
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
797
  uchar *record=  table->record[0];
 
798
  DBUG_ENTER("ha_archive::write_row");
677
799
 
678
800
  if (share->crashed)
679
 
    return(HA_ERR_CRASHED_ON_USAGE);
 
801
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
680
802
 
681
 
  pthread_mutex_lock(&share->mutex());
 
803
  ha_statistic_increment(&SSV::ha_write_count);
 
804
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
805
    table->timestamp_field->set_time();
 
806
  pthread_mutex_lock(&share->mutex);
682
807
 
683
808
  if (share->archive_write_open == false)
684
809
    if (init_archive_writer())
685
 
      return(HA_ERR_CRASHED_ON_USAGE);
686
 
 
687
 
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
810
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
811
 
 
812
 
 
813
  if (table->next_number_field && record == table->record[0])
689
814
  {
 
815
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
816
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
817
    temp_auto= table->next_number_field->val_int();
692
818
 
693
819
    /*
694
820
      We don't support decremening auto_increment. They make the performance
695
821
      just cry.
696
822
    */
697
 
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
823
    if (temp_auto <= share->archive_write.auto_increment && 
 
824
        mkey->flags & HA_NOSAME)
699
825
    {
700
826
      rc= HA_ERR_FOUND_DUPP_KEY;
701
827
      goto error;
702
828
    }
 
829
#ifdef DEAD_CODE
 
830
    /*
 
831
      Bad news, this will cause a search for the unique value which is very 
 
832
      expensive since we will have to do a table scan which will lock up 
 
833
      all other writers during this period. This could perhaps be optimized 
 
834
      in the future.
 
835
    */
 
836
    {
 
837
      /* 
 
838
        First we create a buffer that we can use for reading rows, and can pass
 
839
        to get_row().
 
840
      */
 
841
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
842
      {
 
843
        rc= HA_ERR_OUT_OF_MEM;
 
844
        goto error;
 
845
      }
 
846
       /* 
 
847
         All of the buffer must be written out or we won't see all of the
 
848
         data 
 
849
       */
 
850
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
851
      /*
 
852
        Set the position of the local read thread to the beginning postion.
 
853
      */
 
854
      if (read_data_header(&archive))
 
855
      {
 
856
        rc= HA_ERR_CRASHED_ON_USAGE;
 
857
        goto error;
 
858
      }
 
859
 
 
860
      Field *mfield= table->next_number_field;
 
861
 
 
862
      while (!(get_row(&archive, read_buf)))
 
863
      {
 
864
        if (!memcmp(read_buf + mfield->offset(record),
 
865
                    table->next_number_field->ptr,
 
866
                    mfield->max_display_length()))
 
867
        {
 
868
          rc= HA_ERR_FOUND_DUPP_KEY;
 
869
          goto error;
 
870
        }
 
871
      }
 
872
    }
 
873
#endif
703
874
    else
704
875
    {
705
876
      if (temp_auto > share->archive_write.auto_increment)
715
886
  share->rows_recorded++;
716
887
  rc= real_write_row(buf,  &(share->archive_write));
717
888
error:
718
 
  pthread_mutex_unlock(&share->mutex());
 
889
  pthread_mutex_unlock(&share->mutex);
719
890
  if (read_buf)
720
 
    free((unsigned char*) read_buf);
 
891
    my_free((uchar*) read_buf, MYF(0));
721
892
 
722
 
  return(rc);
 
893
  DBUG_RETURN(rc);
723
894
}
724
895
 
725
896
 
726
 
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
727
 
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
 
897
void ha_archive::get_auto_increment(uint64_t offset __attribute__((__unused__)),
 
898
                                    uint64_t increment __attribute__((__unused__)),
 
899
                                    uint64_t nb_desired_values __attribute__((__unused__)),
 
900
                                    uint64_t *first_value __attribute__((__unused__)),
 
901
                                    uint64_t *nb_reserved_values __attribute__((__unused__)))
728
902
{
729
903
  *nb_reserved_values= UINT64_MAX;
730
904
  *first_value= share->archive_write.auto_increment + 1;
731
905
}
732
906
 
733
 
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
734
 
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
907
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
908
int ha_archive::index_init(uint keynr, bool sorted __attribute__((__unused__)))
735
909
{
 
910
  DBUG_ENTER("ha_archive::index_init");
736
911
  active_index= keynr;
737
 
  return(0);
 
912
  DBUG_RETURN(0);
738
913
}
739
914
 
740
915
 
742
917
  No indexes, so if we get a request for an index search since we tell
743
918
  the optimizer that we have unique indexes, we scan
744
919
*/
745
 
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
746
 
                             uint32_t key_len, enum ha_rkey_function)
 
920
int ha_archive::index_read(uchar *buf, const uchar *key,
 
921
                             uint key_len, enum ha_rkey_function find_flag)
 
922
{
 
923
  int rc;
 
924
  DBUG_ENTER("ha_archive::index_read");
 
925
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
926
  DBUG_RETURN(rc);
 
927
}
 
928
 
 
929
 
 
930
int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
 
931
                               uint key_len,
 
932
                               enum ha_rkey_function find_flag __attribute__((__unused__)))
747
933
{
748
934
  int rc;
749
935
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
936
  KEY *mkey= &table->s->key_info[index];
 
937
  current_k_offset= mkey->key_part->offset;
751
938
  current_key= key;
752
939
  current_key_len= key_len;
753
940
 
754
 
  rc= doStartTableScan(true);
 
941
 
 
942
  DBUG_ENTER("ha_archive::index_read_idx");
 
943
 
 
944
  rc= rnd_init(true);
755
945
 
756
946
  if (rc)
757
947
    goto error;
766
956
  }
767
957
 
768
958
  if (found)
769
 
    return(0);
 
959
    DBUG_RETURN(0);
770
960
 
771
961
error:
772
 
  return(rc ? rc : HA_ERR_END_OF_FILE);
 
962
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
773
963
}
774
964
 
775
965
 
776
 
int ha_archive::index_next(unsigned char * buf)
777
 
{
 
966
int ha_archive::index_next(uchar * buf) 
 
967
778
968
  bool found= 0;
779
969
 
 
970
  DBUG_ENTER("ha_archive::index_next");
 
971
 
780
972
  while (!(get_row(&archive, buf)))
781
973
  {
782
974
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
786
978
    }
787
979
  }
788
980
 
789
 
  return(found ? 0 : HA_ERR_END_OF_FILE);
 
981
  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 
790
982
}
791
983
 
792
984
/*
795
987
  we assume the position will be set.
796
988
*/
797
989
 
798
 
int ha_archive::doStartTableScan(bool scan)
 
990
int ha_archive::rnd_init(bool scan)
799
991
{
 
992
  DBUG_ENTER("ha_archive::rnd_init");
 
993
  
800
994
  if (share->crashed)
801
 
      return(HA_ERR_CRASHED_ON_USAGE);
 
995
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
802
996
 
803
997
  init_archive_reader();
804
998
 
805
999
  /* We rewind the file so that we can read from the beginning if scan */
806
1000
  if (scan)
807
1001
  {
 
1002
    DBUG_PRINT("info", ("archive will retrieve %llu rows", 
 
1003
                        (uint64_t) scan_rows));
 
1004
 
808
1005
    if (read_data_header(&archive))
809
 
      return(HA_ERR_CRASHED_ON_USAGE);
 
1006
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
810
1007
  }
811
1008
 
812
 
  return(0);
 
1009
  DBUG_RETURN(0);
813
1010
}
814
1011
 
815
1012
 
816
1013
/*
817
 
  This is the method that is used to read a row. It assumes that the row is
 
1014
  This is the method that is used to read a row. It assumes that the row is 
818
1015
  positioned where you want it.
819
1016
*/
820
 
int ha_archive::get_row(azio_stream *file_to_read, unsigned char *buf)
 
1017
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
821
1018
{
822
1019
  int rc;
823
 
 
 
1020
  DBUG_ENTER("ha_archive::get_row");
 
1021
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d", 
 
1022
                            (uchar)file_to_read->version, 
 
1023
                            ARCHIVE_VERSION));
824
1024
  if (file_to_read->version == ARCHIVE_VERSION)
825
1025
    rc= get_row_version3(file_to_read, buf);
826
1026
  else
827
1027
    rc= -1;
828
1028
 
829
 
  return(rc);
 
1029
  DBUG_PRINT("ha_archive", ("Return %d\n", rc));
 
1030
 
 
1031
  DBUG_RETURN(rc);
830
1032
}
831
1033
 
832
1034
/* Reallocate buffer if needed */
833
1035
bool ha_archive::fix_rec_buff(unsigned int length)
834
1036
{
835
 
  record_buffer.resize(length);
836
 
 
837
 
  return false;
 
1037
  DBUG_ENTER("ha_archive::fix_rec_buff");
 
1038
  DBUG_PRINT("ha_archive", ("Fixing %u for %u", 
 
1039
                            length, record_buffer->length));
 
1040
  DBUG_ASSERT(record_buffer->buffer);
 
1041
 
 
1042
  if (length > record_buffer->length)
 
1043
  {
 
1044
    uchar *newptr;
 
1045
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer, 
 
1046
                                    length,
 
1047
                                    MYF(MY_ALLOW_ZERO_PTR))))
 
1048
      DBUG_RETURN(1);
 
1049
    record_buffer->buffer= newptr;
 
1050
    record_buffer->length= length;
 
1051
  }
 
1052
 
 
1053
  DBUG_ASSERT(length <= record_buffer->length);
 
1054
 
 
1055
  DBUG_RETURN(0);
838
1056
}
839
1057
 
840
 
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
 
1058
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
841
1059
{
 
1060
  DBUG_ENTER("ha_archive::unpack_row");
 
1061
 
842
1062
  unsigned int read;
843
1063
  int error;
844
 
  const unsigned char *ptr;
 
1064
  const uchar *ptr;
845
1065
 
846
1066
  read= azread_row(file_to_read, &error);
847
 
  ptr= (const unsigned char *)file_to_read->row_ptr;
 
1067
  ptr= (const uchar *)file_to_read->row_ptr;
848
1068
 
849
1069
  if (error || read == 0)
850
1070
  {
851
 
    return(-1);
 
1071
    DBUG_RETURN(-1);
852
1072
  }
853
1073
 
854
1074
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
1075
  memcpy(record, ptr, table->s->null_bytes);
 
1076
  ptr+= table->s->null_bytes;
 
1077
  for (Field **field=table->field ; *field ; field++)
858
1078
  {
859
1079
    if (!((*field)->is_null()))
860
1080
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
1081
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
862
1082
    }
863
1083
  }
864
 
  return(0);
 
1084
  DBUG_RETURN(0);
865
1085
}
866
1086
 
867
1087
 
868
 
int ha_archive::get_row_version3(azio_stream *file_to_read, unsigned char *buf)
 
1088
int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
869
1089
{
 
1090
  DBUG_ENTER("ha_archive::get_row_version3");
 
1091
 
870
1092
  int returnable= unpack_row(file_to_read, buf);
871
1093
 
872
 
  return(returnable);
 
1094
  DBUG_RETURN(returnable);
873
1095
}
874
1096
 
875
1097
 
876
 
/*
 
1098
/* 
877
1099
  Called during ORDER BY. Its position is either from being called sequentially
878
1100
  or by having had ha_archive::rnd_pos() called before it is called.
879
1101
*/
880
1102
 
881
 
int ha_archive::rnd_next(unsigned char *buf)
 
1103
int ha_archive::rnd_next(uchar *buf)
882
1104
{
883
1105
  int rc;
 
1106
  DBUG_ENTER("ha_archive::rnd_next");
884
1107
 
885
1108
  if (share->crashed)
886
 
      return(HA_ERR_CRASHED_ON_USAGE);
 
1109
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
887
1110
 
888
1111
  if (!scan_rows)
889
 
    return(HA_ERR_END_OF_FILE);
 
1112
    DBUG_RETURN(HA_ERR_END_OF_FILE);
890
1113
  scan_rows--;
891
1114
 
892
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1115
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
893
1116
  current_position= aztell(&archive);
894
1117
  rc= get_row(&archive, buf);
895
1118
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
1119
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
1120
 
898
 
  return(rc);
 
1121
  DBUG_RETURN(rc);
899
1122
}
900
1123
 
901
1124
 
902
1125
/*
903
 
  Thanks to the table bool is_ordered this will be called after
 
1126
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
904
1127
  each call to ha_archive::rnd_next() if an ordering of the rows is
905
1128
  needed.
906
1129
*/
907
1130
 
908
 
void ha_archive::position(const unsigned char *)
 
1131
void ha_archive::position(const uchar *record __attribute__((__unused__)))
909
1132
{
910
 
  internal::my_store_ptr(ref, ref_length, current_position);
911
 
  return;
 
1133
  DBUG_ENTER("ha_archive::position");
 
1134
  my_store_ptr(ref, ref_length, current_position);
 
1135
  DBUG_VOID_RETURN;
912
1136
}
913
1137
 
914
1138
 
919
1143
  correctly ordered row.
920
1144
*/
921
1145
 
922
 
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
 
1146
int ha_archive::rnd_pos(uchar * buf, uchar *pos)
923
1147
{
924
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
925
 
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
1148
  DBUG_ENTER("ha_archive::rnd_pos");
 
1149
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1150
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
926
1151
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
927
 
    return(HA_ERR_CRASHED_ON_USAGE);
928
 
  return(get_row(&archive, buf));
 
1152
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1153
  DBUG_RETURN(get_row(&archive, buf));
929
1154
}
930
1155
 
931
1156
/*
932
 
  This method repairs the meta file. It does this by walking the datafile and
 
1157
  This method repairs the meta file. It does this by walking the datafile and 
933
1158
  rewriting the meta file. Currently it does this by calling optimize with
934
1159
  the extended flag.
935
1160
*/
936
 
int ha_archive::repair()
 
1161
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
937
1162
{
938
 
  int rc= optimize();
 
1163
  DBUG_ENTER("ha_archive::repair");
 
1164
  check_opt->flags= T_EXTEND;
 
1165
  int rc= optimize(thd, check_opt);
939
1166
 
940
1167
  if (rc)
941
 
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1168
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
942
1169
 
943
1170
  share->crashed= false;
944
 
  return(0);
 
1171
  DBUG_RETURN(0);
945
1172
}
946
1173
 
947
1174
/*
948
1175
  The table can become fragmented if data was inserted, read, and then
949
 
  inserted again. What we do is open up the file and recompress it completely.
 
1176
  inserted again. What we do is open up the file and recompress it completely. 
950
1177
*/
951
 
int ha_archive::optimize()
 
1178
int ha_archive::optimize(THD* thd __attribute__((__unused__)),
 
1179
                         HA_CHECK_OPT* check_opt __attribute__((__unused__)))
952
1180
{
 
1181
  DBUG_ENTER("ha_archive::optimize");
953
1182
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
1183
  azio_stream writer;
 
1184
  char writer_filename[FN_REFLEN];
955
1185
 
956
1186
  init_archive_reader();
957
1187
 
962
1192
    share->archive_write_open= false;
963
1193
  }
964
1194
 
965
 
  char* proto_string;
966
 
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
967
 
  if (proto_string == NULL)
968
 
  {
969
 
    return ENOMEM;
970
 
  }
971
 
  azread_frm(&archive, proto_string);
972
 
 
973
1195
  /* Lets create a file to contain the new data */
974
 
  std::string writer_filename= share->table_name;
975
 
  writer_filename.append(ARN);
976
 
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
 
  {
979
 
    free(proto_string);
980
 
    return(HA_ERR_CRASHED_ON_USAGE);
981
 
  }
982
 
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
984
 
 
985
 
  /*
986
 
    An extended rebuild is a lot more effort. We open up each row and re-record it.
987
 
    Any dead rows are removed (aka rows that may have been partially recorded).
 
1196
  fn_format(writer_filename, share->table_name, "", ARN, 
 
1197
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
1198
 
 
1199
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
 
1200
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 
 
1201
 
 
1202
  /* 
 
1203
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
 
1204
    Any dead rows are removed (aka rows that may have been partially recorded). 
988
1205
 
989
1206
    As of Archive format 3, this is the only type that is performed, before this
990
1207
    version it was just done on T_EXTEND
991
1208
  */
992
1209
  if (1)
993
1210
  {
 
1211
    DBUG_PRINT("ha_archive", ("archive extended rebuild"));
 
1212
 
994
1213
    /*
995
 
      Now we will rewind the archive file so that we are positioned at the
 
1214
      Now we will rewind the archive file so that we are positioned at the 
996
1215
      start of the file.
997
1216
    */
998
1217
    azflush(&archive, Z_SYNC_FLUSH);
999
1218
    rc= read_data_header(&archive);
1000
1219
 
1001
 
    /*
 
1220
    /* 
1002
1221
      On success of writing out the new header, we now fetch each row and
1003
 
      insert it into the new archive file.
 
1222
      insert it into the new archive file. 
1004
1223
    */
1005
1224
    if (!rc)
1006
1225
    {
 
1226
      uint64_t x;
1007
1227
      uint64_t rows_restored;
1008
1228
      share->rows_recorded= 0;
1009
1229
      stats.auto_increment_value= 1;
1010
1230
      share->archive_write.auto_increment= 0;
 
1231
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
1011
1232
 
1012
1233
      rows_restored= archive.rows;
1013
1234
 
1014
 
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1235
      for (x= 0; x < rows_restored ; x++)
1015
1236
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1237
        rc= get_row(&archive, table->record[0]);
1017
1238
 
1018
1239
        if (rc != 0)
1019
1240
          break;
1020
1241
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1242
        real_write_row(table->record[0], &writer);
1022
1243
        /*
1023
1244
          Long term it should be possible to optimize this so that
1024
1245
          it is not called on each row.
1025
1246
        */
1026
 
        if (getTable()->found_next_number_field)
 
1247
        if (table->found_next_number_field)
1027
1248
        {
1028
 
          Field *field= getTable()->found_next_number_field;
1029
 
 
1030
 
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
 
          field->setReadSet();
1032
 
 
 
1249
          Field *field= table->found_next_number_field;
1033
1250
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1251
            (uint64_t) field->val_int(table->record[0] +
 
1252
                                       field->offset(table->record[0]));
1036
1253
          if (share->archive_write.auto_increment < auto_value)
1037
1254
            stats.auto_increment_value=
1038
1255
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1256
        }
1040
1257
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1258
      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
 
1259
      share->rows_recorded= (ha_rows)writer.rows;
1042
1260
    }
1043
1261
 
 
1262
    DBUG_PRINT("info", ("recovered %llu archive rows", 
 
1263
                        (uint64_t)share->rows_recorded));
 
1264
 
 
1265
    DBUG_PRINT("ha_archive", ("recovered %llu archive rows", 
 
1266
                        (uint64_t)share->rows_recorded));
 
1267
 
1044
1268
    if (rc && rc != HA_ERR_END_OF_FILE)
1045
1269
    {
1046
1270
      goto error;
1047
1271
    }
1048
 
  }
 
1272
  } 
1049
1273
 
1050
 
  azclose(writer.get());
 
1274
  azclose(&writer);
1051
1275
  share->dirty= false;
1052
 
 
 
1276
  
1053
1277
  azclose(&archive);
1054
1278
 
1055
1279
  // make the file we just wrote be our data file
1056
 
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1057
 
 
1058
 
  free(proto_string);
1059
 
  return(rc);
 
1280
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
 
1281
 
 
1282
 
 
1283
  DBUG_RETURN(rc);
1060
1284
error:
1061
 
  free(proto_string);
1062
 
  azclose(writer.get());
 
1285
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
 
1286
  azclose(&writer);
1063
1287
 
1064
 
  return(rc);
 
1288
  DBUG_RETURN(rc); 
1065
1289
}
1066
1290
 
1067
 
/*
 
1291
/* 
1068
1292
  Below is an example of how to setup row level locking.
1069
1293
*/
1070
 
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
 
1294
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1071
1295
                                       THR_LOCK_DATA **to,
1072
1296
                                       enum thr_lock_type lock_type)
1073
1297
{
1074
 
  delayed_insert= false;
 
1298
  if (lock_type == TL_WRITE_DELAYED)
 
1299
    delayed_insert= true;
 
1300
  else
 
1301
    delayed_insert= false;
1075
1302
 
1076
 
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1303
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
1077
1304
  {
1078
 
    /*
 
1305
    /* 
1079
1306
      Here is where we get into the guts of a row level lock.
1080
 
      If TL_UNLOCK is set
1081
 
      If we are not doing a LOCK Table or DISCARD/IMPORT
1082
 
      TABLESPACE, then allow multiple writers
 
1307
      If TL_UNLOCK is set 
 
1308
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
 
1309
      TABLESPACE, then allow multiple writers 
1083
1310
    */
1084
1311
 
1085
1312
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
 
         lock_type <= TL_WRITE)
1087
 
        && ! session->doing_tablespace_operation())
 
1313
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
 
1314
        && !thd_tablespace_op(thd))
1088
1315
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1316
 
1090
 
    /*
 
1317
    /* 
1091
1318
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1092
1319
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1093
1320
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1094
1321
      to t2. Convert the lock to a normal read lock to allow
1095
 
      concurrent inserts to t2.
 
1322
      concurrent inserts to t2. 
1096
1323
    */
1097
1324
 
1098
 
    if (lock_type == TL_READ_NO_INSERT)
 
1325
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) 
1099
1326
      lock_type = TL_READ;
1100
1327
 
1101
1328
    lock.type=lock_type;
1106
1333
  return to;
1107
1334
}
1108
1335
 
 
1336
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 
1337
{
 
1338
  DBUG_ENTER("ha_archive::update_create_info");
 
1339
 
 
1340
  ha_archive::info(HA_STATUS_AUTO);
 
1341
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
 
1342
  {
 
1343
    create_info->auto_increment_value= stats.auto_increment_value;
 
1344
  }
 
1345
 
 
1346
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
 
1347
    create_info->data_file_name= share->real_path;
 
1348
 
 
1349
  DBUG_VOID_RETURN;
 
1350
}
 
1351
 
 
1352
 
1109
1353
/*
1110
1354
  Hints for optimizer, see ha_tina for more information
1111
1355
*/
1112
 
int ha_archive::info(uint32_t flag)
 
1356
int ha_archive::info(uint flag)
1113
1357
{
1114
 
  /*
 
1358
  DBUG_ENTER("ha_archive::info");
 
1359
 
 
1360
  /* 
1115
1361
    If dirty, we lock, and then reset/flush the data.
1116
1362
    I found that just calling azflush() doesn't always work.
1117
1363
  */
1118
 
  pthread_mutex_lock(&share->mutex());
 
1364
  pthread_mutex_lock(&share->mutex);
1119
1365
  if (share->dirty == true)
1120
1366
  {
 
1367
    DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1121
1368
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1122
1369
    share->rows_recorded= share->archive_write.rows;
1123
1370
    share->dirty= false;
1129
1376
 
1130
1377
  }
1131
1378
 
1132
 
  /*
 
1379
  /* 
1133
1380
    This should be an accurate number now, though bulk and delayed inserts can
1134
1381
    cause the number to be inaccurate.
1135
1382
  */
1136
1383
  stats.records= share->rows_recorded;
1137
 
  pthread_mutex_unlock(&share->mutex());
 
1384
  pthread_mutex_unlock(&share->mutex);
1138
1385
 
1139
1386
  scan_rows= stats.records;
1140
1387
  stats.deleted= 0;
1141
1388
 
 
1389
  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1142
1390
  /* Costs quite a bit more to get all information */
1143
1391
  if (flag & HA_STATUS_TIME)
1144
1392
  {
1145
1393
    struct stat file_stat;  // Stat information for the data file
1146
1394
 
1147
 
    stat(share->data_file_name.c_str(), &file_stat);
 
1395
    VOID(stat(share->data_file_name, &file_stat));
1148
1396
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1397
    stats.mean_rec_length= table->s->reclength + buffer.alloced_length();
1150
1398
    stats.data_file_length= file_stat.st_size;
1151
1399
    stats.create_time= file_stat.st_ctime;
1152
1400
    stats.update_time= file_stat.st_mtime;
1158
1406
  if (flag & HA_STATUS_AUTO)
1159
1407
  {
1160
1408
    init_archive_reader();
1161
 
    pthread_mutex_lock(&share->mutex());
 
1409
    pthread_mutex_lock(&share->mutex);
1162
1410
    azflush(&archive, Z_SYNC_FLUSH);
1163
 
    pthread_mutex_unlock(&share->mutex());
 
1411
    pthread_mutex_unlock(&share->mutex);
1164
1412
    stats.auto_increment_value= archive.auto_increment + 1;
1165
1413
  }
1166
1414
 
1167
 
  return(0);
 
1415
  DBUG_RETURN(0);
1168
1416
}
1169
1417
 
1170
1418
 
1171
1419
/*
1172
1420
  This method tells us that a bulk insert operation is about to occur. We set
1173
 
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1421
  a flag which will keep write_row from saying that its data is dirty. This in
1174
1422
  turn will keep selects from causing a sync to occur.
1175
1423
  Basically, yet another optimizations to keep compression working well.
1176
1424
*/
1177
1425
void ha_archive::start_bulk_insert(ha_rows rows)
1178
1426
{
 
1427
  DBUG_ENTER("ha_archive::start_bulk_insert");
1179
1428
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
1180
1429
    bulk_insert= true;
1181
 
  return;
 
1430
  DBUG_VOID_RETURN;
1182
1431
}
1183
1432
 
1184
1433
 
1185
 
/*
 
1434
/* 
1186
1435
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
1187
1436
  flag, and set the share dirty so that the next select will call sync for us.
1188
1437
*/
1189
1438
int ha_archive::end_bulk_insert()
1190
1439
{
 
1440
  DBUG_ENTER("ha_archive::end_bulk_insert");
1191
1441
  bulk_insert= false;
1192
1442
  share->dirty= true;
1193
 
  return(0);
 
1443
  DBUG_RETURN(0);
1194
1444
}
1195
1445
 
1196
1446
/*
1197
1447
  We cancel a truncate command. The only way to delete an archive table is to drop it.
1198
 
  This is done for security reasons. In a later version we will enable this by
 
1448
  This is done for security reasons. In a later version we will enable this by 
1199
1449
  allowing the user to select a different row format.
1200
1450
*/
1201
1451
int ha_archive::delete_all_rows()
1202
1452
{
1203
 
  return(HA_ERR_WRONG_COMMAND);
 
1453
  DBUG_ENTER("ha_archive::delete_all_rows");
 
1454
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 
1455
}
 
1456
 
 
1457
/*
 
1458
  We just return state if asked.
 
1459
*/
 
1460
bool ha_archive::is_crashed() const 
 
1461
{
 
1462
  DBUG_ENTER("ha_archive::is_crashed");
 
1463
  DBUG_RETURN(share->crashed); 
1204
1464
}
1205
1465
 
1206
1466
/*
1207
1467
  Simple scan of the tables to make sure everything is ok.
1208
1468
*/
1209
1469
 
1210
 
int ha_archive::check(Session* session)
 
1470
int ha_archive::check(THD* thd,
 
1471
                      HA_CHECK_OPT* check_opt __attribute__((__unused__)))
1211
1472
{
1212
1473
  int rc= 0;
1213
1474
  const char *old_proc_info;
 
1475
  uint64_t x;
 
1476
  DBUG_ENTER("ha_archive::check");
1214
1477
 
1215
 
  old_proc_info= session->get_proc_info();
1216
 
  session->set_proc_info("Checking table");
 
1478
  old_proc_info= thd_proc_info(thd, "Checking table");
1217
1479
  /* Flush any waiting data */
1218
 
  pthread_mutex_lock(&share->mutex());
 
1480
  pthread_mutex_lock(&share->mutex);
1219
1481
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
 
  pthread_mutex_unlock(&share->mutex());
 
1482
  pthread_mutex_unlock(&share->mutex);
1221
1483
 
1222
1484
  /*
1223
 
    Now we will rewind the archive file so that we are positioned at the
 
1485
    Now we will rewind the archive file so that we are positioned at the 
1224
1486
    start of the file.
1225
1487
  */
1226
1488
  init_archive_reader();
1227
1489
  azflush(&archive, Z_SYNC_FLUSH);
1228
1490
  read_data_header(&archive);
1229
 
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1491
  for (x= 0; x < share->archive_write.rows; x++)
1230
1492
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1493
    rc= get_row(&archive, table->record[0]);
1232
1494
 
1233
1495
    if (rc != 0)
1234
1496
      break;
1235
1497
  }
1236
1498
 
1237
 
  session->set_proc_info(old_proc_info);
 
1499
  thd_proc_info(thd, old_proc_info);
1238
1500
 
1239
 
  if ((rc && rc != HA_ERR_END_OF_FILE))
 
1501
  if ((rc && rc != HA_ERR_END_OF_FILE))  
1240
1502
  {
1241
1503
    share->crashed= false;
1242
 
    return(HA_ADMIN_CORRUPT);
 
1504
    DBUG_RETURN(HA_ADMIN_CORRUPT);
1243
1505
  }
1244
1506
  else
1245
1507
  {
1246
 
    return(HA_ADMIN_OK);
1247
 
  }
1248
 
}
1249
 
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
1251
 
{
1252
 
  int error= 0;
1253
 
 
1254
 
  for (const char **ext= bas_ext(); *ext ; ext++)
1255
 
  {
1256
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1257
 
    {
1258
 
      if ((error=errno) != ENOENT)
1259
 
        break;
1260
 
      error= 0;
1261
 
    }
1262
 
  }
1263
 
 
1264
 
  return error;
1265
 
}
1266
 
 
1267
 
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const identifier::Table &identifier)
1269
 
{
1270
 
  string proto_path(identifier.getPath());
1271
 
  proto_path.append(ARZ);
1272
 
 
1273
 
  if (access(proto_path.c_str(), F_OK))
1274
 
  {
1275
 
    return false;
1276
 
  }
1277
 
 
1278
 
  return true;
1279
 
}
1280
 
 
1281
 
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::identifier::Schema &schema_identifier,
1283
 
                                          drizzled::identifier::Table::vector &set_of_identifiers)
1284
 
{
1285
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
 
 
1287
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
1288
 
       entry_iter != entries.end(); ++entry_iter)
1289
 
  {
1290
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
 
    const string *filename= &entry->filename;
1292
 
 
1293
 
    assert(filename->size());
1294
 
 
1295
 
    const char *ext= strchr(filename->c_str(), '.');
1296
 
 
1297
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1299
 
    {  }
1300
 
    else
1301
 
    {
1302
 
      char uname[NAME_LEN + 1];
1303
 
      uint32_t file_name_len;
1304
 
 
1305
 
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
 
      // TODO: Remove need for memory copy here
1307
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
 
 
1309
 
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
1310
 
    }
1311
 
  }
1312
 
}
 
1508
    DBUG_RETURN(HA_ADMIN_OK);
 
1509
  }
 
1510
}
 
1511
 
 
1512
/*
 
1513
  Check and repair the table if needed.
 
1514
*/
 
1515
bool ha_archive::check_and_repair(THD *thd) 
 
1516
{
 
1517
  HA_CHECK_OPT check_opt;
 
1518
  DBUG_ENTER("ha_archive::check_and_repair");
 
1519
 
 
1520
  check_opt.init();
 
1521
 
 
1522
  DBUG_RETURN(repair(thd, &check_opt));
 
1523
}
 
1524
 
 
1525
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
 
1526
{
 
1527
  DBUG_ENTER("ha_archive::create_record_buffer");
 
1528
  archive_record_buffer *r;
 
1529
  if (!(r= 
 
1530
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
 
1531
                                           MYF(MY_WME))))
 
1532
  {
 
1533
    DBUG_RETURN(NULL); /* purecov: inspected */
 
1534
  }
 
1535
  r->length= (int)length;
 
1536
 
 
1537
  if (!(r->buffer= (uchar*) my_malloc(r->length,
 
1538
                                    MYF(MY_WME))))
 
1539
  {
 
1540
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
 
1541
    DBUG_RETURN(NULL); /* purecov: inspected */
 
1542
  }
 
1543
 
 
1544
  DBUG_RETURN(r);
 
1545
}
 
1546
 
 
1547
void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
 
1548
{
 
1549
  DBUG_ENTER("ha_archive::destroy_record_buffer");
 
1550
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
 
1551
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
 
1552
  DBUG_VOID_RETURN;
 
1553
}
 
1554
 
 
1555
static MYSQL_SYSVAR_BOOL(aio, archive_use_aio,
 
1556
  PLUGIN_VAR_NOCMDOPT,
 
1557
  "Whether or not to use asynchronous IO.",
 
1558
  NULL, NULL, true);
 
1559
 
 
1560
static struct st_mysql_sys_var* archive_system_variables[]= {
 
1561
  MYSQL_SYSVAR(aio),
 
1562
  NULL
 
1563
};
 
1564
 
 
1565
struct st_mysql_storage_engine archive_storage_engine=
 
1566
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
 
1567
 
 
1568
mysql_declare_plugin(archive)
 
1569
{
 
1570
  MYSQL_STORAGE_ENGINE_PLUGIN,
 
1571
  &archive_storage_engine,
 
1572
  "ARCHIVE",
 
1573
  "Brian Aker, MySQL AB",
 
1574
  "Archive storage engine",
 
1575
  PLUGIN_LICENSE_GPL,
 
1576
  archive_db_init, /* Plugin Init */
 
1577
  archive_db_done, /* Plugin Deinit */
 
1578
  0x0350 /* 3.0 */,
 
1579
  NULL,                       /* status variables                */
 
1580
  archive_system_variables,   /* system variables                */
 
1581
  NULL                        /* config options                  */
 
1582
}
 
1583
mysql_declare_plugin_end;
 
1584