~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/ha_archive.cc

  • Committer: Brian Aker
  • Date: 2008-08-16 15:41:14 UTC
  • mto: This revision was merged to the branch mainline in revision 346.
  • Revision ID: brian@tangent.org-20080816154114-eufmwf31p6ie1nd6
Cleaned up depend in Proto utils. Modified int to bool. Put TmpTable class
into play.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
  along with this program; if not, write to the Free Software
14
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
 
16
#ifdef USE_PRAGMA_IMPLEMENTATION
 
17
#pragma implementation        // gcc: Class implementation
 
18
#endif
16
19
 
17
 
#include "drizzled/server_includes.h"
18
 
#include "drizzled/field.h"
19
 
#include "drizzled/field/blob.h"
20
 
#include "drizzled/field/timestamp.h"
21
 
#include "plugin/myisam/myisam.h"
22
 
#include "drizzled/table.h"
23
 
#include "drizzled/session.h"
24
 
#include <mysys/my_dir.h>
 
20
#include <drizzled/common_includes.h>
 
21
#include <storage/myisam/myisam.h>
25
22
 
26
23
#include "ha_archive.h"
27
24
 
28
 
#include <stdio.h>
29
 
#include <string>
30
 
#include <map>
31
 
 
32
 
using namespace std;
33
 
 
34
 
static const string engine_name("ARCHIVE");
35
 
 
36
25
/*
37
 
  First, if you want to understand storage engines you should look at
38
 
  ha_example.cc and ha_example.h.
 
26
  First, if you want to understand storage engines you should look at 
 
27
  ha_example.cc and ha_example.h. 
39
28
 
40
29
  This example was written as a test case for a customer who needed
41
30
  a storage engine without indexes that could compress data very well.
42
31
  So, welcome to a completely compressed storage engine. This storage
43
 
  engine only does inserts. No replace, deletes, or updates. All reads are
 
32
  engine only does inserts. No replace, deletes, or updates. All reads are 
44
33
  complete table scans. Compression is done through a combination of packing
45
34
  and making use of the zlib library
46
 
 
 
35
  
47
36
  We keep a file pointer open for each instance of ha_archive for each read
48
37
  but for writes we keep one open file handle just for that. We flush it
49
38
  only if we have a read occur. azip handles compressing lots of records
53
42
  the same time since we would want to flush).
54
43
 
55
44
  A "meta" file is kept alongside the data file. This file serves two purpose.
56
 
  The first purpose is to track the number of rows in the table. The second
57
 
  purpose is to determine if the table was closed properly or not. When the
58
 
  meta file is first opened it is marked as dirty. It is opened when the table
59
 
  itself is opened for writing. When the table is closed the new count for rows
60
 
  is written to the meta file and the file is marked as clean. If the meta file
61
 
  is opened and it is marked as dirty, it is assumed that a crash occured. At
 
45
  The first purpose is to track the number of rows in the table. The second 
 
46
  purpose is to determine if the table was closed properly or not. When the 
 
47
  meta file is first opened it is marked as dirty. It is opened when the table 
 
48
  itself is opened for writing. When the table is closed the new count for rows 
 
49
  is written to the meta file and the file is marked as clean. If the meta file 
 
50
  is opened and it is marked as dirty, it is assumed that a crash occured. At 
62
51
  this point an error occurs and the user is told to rebuild the file.
63
52
  A rebuild scans the rows and rewrites the meta file. If corruption is found
64
53
  in the data file then the meta file is not repaired.
65
54
 
66
55
  At some point a recovery method for such a drastic case needs to be divised.
67
56
 
68
 
  Locks are row level, and you will get a consistant read.
 
57
  Locks are row level, and you will get a consistant read. 
69
58
 
70
59
  For performance as far as table scans go it is quite fast. I don't have
71
60
  good numbers but locally it has out performed both Innodb and MyISAM. For
72
61
  Innodb the question will be if the table can be fit into the buffer
73
62
  pool. For MyISAM its a question of how much the file system caches the
74
63
  MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
75
 
  doesn't have enough memory to cache entire table that archive turns out
76
 
  to be any faster.
 
64
  doesn't have enough memory to cache entire table that archive turns out 
 
65
  to be any faster. 
77
66
 
78
67
  Examples between MyISAM (packed) and Archive.
79
68
 
102
91
*/
103
92
 
104
93
/* Variables for archive share methods */
105
 
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
106
 
 
 
94
pthread_mutex_t archive_mutex;
 
95
static HASH archive_open_tables;
107
96
static unsigned int global_version;
108
97
 
109
98
/* The file extension */
110
 
#define ARZ ".arz"               // The data file
 
99
#define ARZ ".ARZ"               // The data file
111
100
#define ARN ".ARN"               // Files used during an optimize call
112
 
 
113
 
 
 
101
#define ARM ".ARM"               // Meta file (deprecated)
 
102
 
 
103
/*
 
104
  uchar + uchar
 
105
*/
 
106
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
 
107
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
 
108
 
 
109
/* Static declarations for handerton */
 
110
static handler *archive_create_handler(handlerton *hton, 
 
111
                                       TABLE_SHARE *table, 
 
112
                                       MEM_ROOT *mem_root);
 
113
int archive_discover(handlerton *hton, THD* thd, const char *db, 
 
114
                     const char *name,
 
115
                     uchar **frmblob, 
 
116
                     size_t *frmlen);
114
117
 
115
118
static bool archive_use_aio= false;
116
119
 
124
127
*/
125
128
#define ARCHIVE_ROW_HEADER_SIZE 4
126
129
 
 
130
static handler *archive_create_handler(handlerton *hton,
 
131
                                       TABLE_SHARE *table, 
 
132
                                       MEM_ROOT *mem_root)
 
133
{
 
134
  return new (mem_root) ha_archive(hton, table);
 
135
}
 
136
 
127
137
/*
128
 
  We just implement one additional file extension.
 
138
  Used for hash table that tracks open tables.
129
139
*/
130
 
static const char *ha_archive_exts[] = {
131
 
  ARZ,
132
 
  NULL
133
 
};
134
 
 
135
 
class ArchiveEngine : public drizzled::plugin::StorageEngine
136
 
{
137
 
  typedef std::map<string, ArchiveShare*> ArchiveMap;
138
 
  ArchiveMap archive_open_tables;
139
 
 
140
 
public:
141
 
  ArchiveEngine(const string &name_arg)
142
 
   : drizzled::plugin::StorageEngine(name_arg,
143
 
                                     HTON_FILE_BASED |
144
 
                                     HTON_STATS_RECORDS_IS_EXACT |
145
 
                                     HTON_HAS_RECORDS |
146
 
                                     HTON_HAS_DATA_DICTIONARY)
147
 
  {
148
 
    table_definition_ext= ARZ;
149
 
  }
150
 
 
151
 
  virtual Cursor *create(TableShare &table,
152
 
                          MEM_ROOT *mem_root)
153
 
  {
154
 
    return new (mem_root) ha_archive(*this, table);
155
 
  }
156
 
 
157
 
  const char **bas_ext() const {
158
 
    return ha_archive_exts;
159
 
  }
160
 
 
161
 
  int doCreateTable(Session *session, const char *table_name,
162
 
                    Table& table_arg,
163
 
                    drizzled::message::Table& proto);
164
 
 
165
 
  int doGetTableDefinition(Session& session,
166
 
                           const char* path,
167
 
                           const char *db,
168
 
                           const char *table_name,
169
 
                           const bool is_tmp,
170
 
                           drizzled::message::Table *table_proto);
171
 
 
172
 
  void doGetTableNames(CachedDirectory &directory, string& , set<string>& set_of_names);
173
 
 
174
 
  int doDropTable(Session&, const string table_path);
175
 
  ArchiveShare *findOpenTable(const string table_name);
176
 
  void addOpenTable(const string &table_name, ArchiveShare *);
177
 
  void deleteOpenTable(const string &table_name);
178
 
 
179
 
  uint32_t max_supported_keys()          const { return 1; }
180
 
  uint32_t max_supported_key_length()    const { return sizeof(uint64_t); }
181
 
  uint32_t max_supported_key_part_length() const { return sizeof(uint64_t); }
182
 
};
183
 
 
184
 
 
185
 
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
186
 
{
187
 
  ArchiveMap::iterator find_iter=
188
 
    archive_open_tables.find(table_name);
189
 
 
190
 
  if (find_iter != archive_open_tables.end())
191
 
    return (*find_iter).second;
192
 
  else
193
 
    return NULL;
194
 
}
195
 
 
196
 
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
197
 
{
198
 
  archive_open_tables[table_name]= share;
199
 
}
200
 
 
201
 
void ArchiveEngine::deleteOpenTable(const string &table_name)
202
 
{
203
 
  archive_open_tables.erase(table_name);
204
 
}
205
 
 
206
 
 
207
 
void ArchiveEngine::doGetTableNames(CachedDirectory &directory, 
208
 
                                    string&, 
209
 
                                    set<string>& set_of_names)
210
 
{
211
 
  CachedDirectory::Entries entries= directory.getEntries();
212
 
 
213
 
  for (CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
214
 
       entry_iter != entries.end(); ++entry_iter)
215
 
  {
216
 
    CachedDirectory::Entry *entry= *entry_iter;
217
 
    string *filename= &entry->filename;
218
 
 
219
 
    assert(filename->size());
220
 
 
221
 
    const char *ext= strchr(filename->c_str(), '.');
222
 
 
223
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
224
 
        is_prefix(filename->c_str(), TMP_FILE_PREFIX))
225
 
    {  }
226
 
    else
227
 
    {
228
 
      char uname[NAME_LEN + 1];
229
 
      uint32_t file_name_len;
230
 
 
231
 
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
232
 
      // TODO: Remove need for memory copy here
233
 
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
234
 
      set_of_names.insert(uname);
235
 
    }
236
 
  }
237
 
}
238
 
 
239
 
 
240
 
int ArchiveEngine::doDropTable(Session&,
241
 
                               const string table_path)
242
 
{
243
 
  string new_path(table_path);
244
 
 
245
 
  new_path+= ARZ;
246
 
 
247
 
  int error= unlink(new_path.c_str());
248
 
 
249
 
  if (error != 0)
250
 
  {
251
 
    error= my_errno= errno;
252
 
  }
253
 
 
254
 
  return error;
255
 
}
256
 
 
257
 
int ArchiveEngine::doGetTableDefinition(Session&,
258
 
                                        const char* path,
259
 
                                        const char *,
260
 
                                        const char *,
261
 
                                        const bool,
262
 
                                        drizzled::message::Table *table_proto)
263
 
{
264
 
  struct stat stat_info;
265
 
  int error= ENOENT;
266
 
  string proto_path;
267
 
 
268
 
  proto_path.reserve(FN_REFLEN);
269
 
  proto_path.assign(path);
270
 
 
271
 
  proto_path.append(ARZ);
272
 
 
273
 
  if (stat(proto_path.c_str(),&stat_info))
274
 
    return errno;
275
 
  else
276
 
    error= EEXIST;
277
 
 
278
 
  if (table_proto)
279
 
  {
280
 
    azio_stream proto_stream;
281
 
    char* proto_string;
282
 
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
283
 
      return HA_ERR_CRASHED_ON_USAGE;
284
 
 
285
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
286
 
    if (proto_string == NULL)
287
 
    {
288
 
      azclose(&proto_stream);
289
 
      return ENOMEM;
290
 
    }
291
 
 
292
 
    azread_frm(&proto_stream, proto_string);
293
 
 
294
 
    if (table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
295
 
      error= HA_ERR_CRASHED_ON_USAGE;
296
 
 
297
 
    azclose(&proto_stream);
298
 
    free(proto_string);
299
 
  }
300
 
 
301
 
  return error;
302
 
}
303
 
 
304
 
static ArchiveEngine *archive_engine= NULL;
 
140
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
 
141
                             bool not_used __attribute__((unused)))
 
142
{
 
143
  *length=share->table_name_length;
 
144
  return (uchar*) share->table_name;
 
145
}
 
146
 
305
147
 
306
148
/*
307
 
  Initialize the archive Cursor.
 
149
  Initialize the archive handler.
308
150
 
309
151
  SYNOPSIS
310
152
    archive_db_init()
315
157
    true        Error
316
158
*/
317
159
 
318
 
static int archive_db_init(drizzled::plugin::Registry &registry)
 
160
int archive_db_init(void *p)
319
161
{
 
162
  handlerton *archive_hton;
320
163
 
321
 
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
322
 
  archive_engine= new ArchiveEngine(engine_name);
323
 
  registry.add(archive_engine);
 
164
  archive_hton= (handlerton *)p;
 
165
  archive_hton->state= SHOW_OPTION_YES;
 
166
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
 
167
  archive_hton->create= archive_create_handler;
 
168
  archive_hton->flags= HTON_NO_FLAGS;
 
169
  archive_hton->discover= archive_discover;
324
170
 
325
171
  /* When the engine starts up set the first version */
326
172
  global_version= 1;
327
173
 
328
 
  return false;
 
174
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
 
175
    goto error;
 
176
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
 
177
                (hash_get_key) archive_get_key, 0, 0))
 
178
  {
 
179
    VOID(pthread_mutex_destroy(&archive_mutex));
 
180
  }
 
181
  else
 
182
  {
 
183
    return(false);
 
184
  }
 
185
error:
 
186
  return(true);
329
187
}
330
188
 
331
189
/*
332
 
  Release the archive Cursor.
 
190
  Release the archive handler.
333
191
 
334
192
  SYNOPSIS
335
193
    archive_db_done()
339
197
    false       OK
340
198
*/
341
199
 
342
 
static int archive_db_done(drizzled::plugin::Registry &registry)
 
200
int archive_db_done(void *p __attribute__((unused)))
343
201
{
344
 
  registry.remove(archive_engine);
345
 
  delete archive_engine;
346
 
 
347
 
  pthread_mutex_destroy(&archive_mutex);
 
202
  hash_free(&archive_open_tables);
 
203
  VOID(pthread_mutex_destroy(&archive_mutex));
348
204
 
349
205
  return 0;
350
206
}
351
207
 
352
208
 
353
 
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
354
 
                       TableShare &table_arg)
355
 
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
209
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
 
210
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
356
211
{
357
212
  /* Set our original buffer from pre-allocated memory */
358
213
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
362
217
  archive_reader_open= false;
363
218
}
364
219
 
 
220
int archive_discover(handlerton *hton __attribute__((unused)),
 
221
                     THD* thd __attribute__((unused)),
 
222
                     const char *db,
 
223
                     const char *name,
 
224
                     uchar **frmblob,
 
225
                     size_t *frmlen)
 
226
{
 
227
  azio_stream frm_stream;
 
228
  char az_file[FN_REFLEN];
 
229
  char *frm_ptr;
 
230
  struct stat file_stat; 
 
231
 
 
232
  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
233
 
 
234
  if (stat(az_file, &file_stat))
 
235
    goto err;
 
236
 
 
237
  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
 
238
  {
 
239
    if (errno == EROFS || errno == EACCES)
 
240
      return(my_errno= errno);
 
241
    return(HA_ERR_CRASHED_ON_USAGE);
 
242
  }
 
243
 
 
244
  if (frm_stream.frm_length == 0)
 
245
    goto err;
 
246
 
 
247
  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
 
248
  azread_frm(&frm_stream, frm_ptr);
 
249
  azclose(&frm_stream);
 
250
 
 
251
  *frmlen= frm_stream.frm_length;
 
252
  *frmblob= (uchar*) frm_ptr;
 
253
 
 
254
  return(0);
 
255
err:
 
256
  my_errno= 0;
 
257
  return(1);
 
258
}
 
259
 
365
260
/*
366
261
  This method reads the header of a datafile and returns whether or not it was successful.
367
262
*/
376
271
  return(1);
377
272
}
378
273
 
379
 
ArchiveShare::ArchiveShare():
380
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
381
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
382
 
{
383
 
  assert(1);
384
 
}
385
 
 
386
 
ArchiveShare::ArchiveShare(const char *name):
387
 
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
388
 
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
389
 
{
390
 
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
391
 
  table_name.append(name);
392
 
  fn_format(data_file_name, table_name.c_str(), "",
393
 
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
394
 
  /*
395
 
    We will use this lock for rows.
396
 
  */
397
 
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
398
 
}
399
 
 
400
 
ArchiveShare::~ArchiveShare()
401
 
{
402
 
  thr_lock_delete(&lock);
403
 
  pthread_mutex_destroy(&mutex);
404
 
  /*
405
 
    We need to make sure we don't reset the crashed state.
406
 
    If we open a crashed file, wee need to close it as crashed unless
407
 
    it has been repaired.
408
 
    Since we will close the data down after this, we go on and count
409
 
    the flush on close;
410
 
  */
411
 
  if (archive_write_open == true)
412
 
    (void)azclose(&archive_write);
413
 
}
414
 
 
415
 
bool ArchiveShare::prime(uint64_t *auto_increment)
416
 
{
417
 
  azio_stream archive_tmp;
418
 
 
419
 
  /*
420
 
    We read the meta file, but do not mark it dirty. Since we are not
421
 
    doing a write we won't mark it dirty (and we won't open it for
422
 
    anything but reading... open it for write and we will generate null
423
 
    compression writes).
424
 
  */
425
 
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
426
 
               AZ_METHOD_BLOCK)))
427
 
    return false;
428
 
 
429
 
  *auto_increment= archive_tmp.auto_increment + 1;
430
 
  rows_recorded= (ha_rows)archive_tmp.rows;
431
 
  crashed= archive_tmp.dirty;
432
 
  if (version < global_version)
433
 
  {
434
 
    version_rows= rows_recorded;
435
 
    version= global_version;
436
 
  }
437
 
  azclose(&archive_tmp);
438
 
 
439
 
  return true;
440
 
}
441
 
 
442
274
 
443
275
/*
444
 
  We create the shared memory space that we will use for the open table.
 
276
  We create the shared memory space that we will use for the open table. 
445
277
  No matter what we try to get or create a share. This is so that a repair
446
 
  table operation can occur.
 
278
  table operation can occur. 
447
279
 
448
280
  See ha_example.cc for a longer description.
449
281
*/
450
 
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
 
282
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
451
283
{
 
284
  uint length;
 
285
 
452
286
  pthread_mutex_lock(&archive_mutex);
453
 
 
454
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
455
 
  share= a_engine->findOpenTable(table_name);
456
 
 
457
 
  if (!share)
 
287
  length=(uint) strlen(table_name);
 
288
 
 
289
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
 
290
                                           (uchar*) table_name,
 
291
                                           length)))
458
292
  {
459
 
    share= new ArchiveShare(table_name);
 
293
    char *tmp_name;
 
294
    azio_stream archive_tmp;
460
295
 
461
 
    if (share == NULL)
 
296
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
297
                          &share, sizeof(*share),
 
298
                          &tmp_name, length+1,
 
299
                          NullS)) 
462
300
    {
463
301
      pthread_mutex_unlock(&archive_mutex);
464
302
      *rc= HA_ERR_OUT_OF_MEM;
465
303
      return(NULL);
466
304
    }
467
305
 
468
 
    if (share->prime(&stats.auto_increment_value) == false)
 
306
    share->use_count= 0;
 
307
    share->table_name_length= length;
 
308
    share->table_name= tmp_name;
 
309
    share->crashed= false;
 
310
    share->archive_write_open= false;
 
311
    fn_format(share->data_file_name, table_name, "",
 
312
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
313
    stpcpy(share->table_name, table_name);
 
314
    /*
 
315
      We will use this lock for rows.
 
316
    */
 
317
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
 
318
    
 
319
    /*
 
320
      We read the meta file, but do not mark it dirty. Since we are not
 
321
      doing a write we won't mark it dirty (and we won't open it for
 
322
      anything but reading... open it for write and we will generate null
 
323
      compression writes).
 
324
    */
 
325
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY,
 
326
                 AZ_METHOD_BLOCK)))
469
327
    {
 
328
      VOID(pthread_mutex_destroy(&share->mutex));
 
329
      free(share);
470
330
      pthread_mutex_unlock(&archive_mutex);
471
331
      *rc= HA_ERR_CRASHED_ON_REPAIR;
472
 
      delete share;
473
 
 
474
 
      return NULL;
475
 
    }
476
 
 
477
 
    a_engine->addOpenTable(share->table_name, share);
 
332
      return(NULL);
 
333
    }
 
334
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
 
335
    share->rows_recorded= (ha_rows)archive_tmp.rows;
 
336
    share->crashed= archive_tmp.dirty;
 
337
    if (share->version < global_version)
 
338
    {
 
339
      share->version_rows= share->rows_recorded;
 
340
      share->version= global_version;
 
341
    }
 
342
    azclose(&archive_tmp);
 
343
 
 
344
    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
478
345
    thr_lock_init(&share->lock);
479
346
  }
480
347
  share->use_count++;
481
 
 
482
348
  if (share->crashed)
483
349
    *rc= HA_ERR_CRASHED_ON_USAGE;
484
350
  pthread_mutex_unlock(&archive_mutex);
487
353
}
488
354
 
489
355
 
490
 
/*
 
356
/* 
491
357
  Free the share.
492
358
  See ha_example.cc for a description.
493
359
*/
494
360
int ha_archive::free_share()
495
361
{
 
362
  int rc= 0;
 
363
 
496
364
  pthread_mutex_lock(&archive_mutex);
497
365
  if (!--share->use_count)
498
366
  {
499
 
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
500
 
    a_engine->deleteOpenTable(share->table_name);
501
 
     delete share;
 
367
    hash_delete(&archive_open_tables, (uchar*) share);
 
368
    thr_lock_delete(&share->lock);
 
369
    VOID(pthread_mutex_destroy(&share->mutex));
 
370
    /* 
 
371
      We need to make sure we don't reset the crashed state.
 
372
      If we open a crashed file, wee need to close it as crashed unless
 
373
      it has been repaired.
 
374
      Since we will close the data down after this, we go on and count
 
375
      the flush on close;
 
376
    */
 
377
    if (share->archive_write_open == true)
 
378
    {
 
379
      if (azclose(&(share->archive_write)))
 
380
        rc= 1;
 
381
    }
 
382
    my_free((uchar*) share, MYF(0));
502
383
  }
503
384
  pthread_mutex_unlock(&archive_mutex);
504
385
 
505
 
  return 0;
 
386
  return(rc);
506
387
}
507
388
 
508
389
int ha_archive::init_archive_writer()
509
390
{
510
 
  /*
 
391
  /* 
511
392
    It is expensive to open and close the data files and since you can't have
512
393
    a gzip file that can be both read and written we keep a writer open
513
394
    that is shared amoung all open tables.
514
395
  */
515
 
  if (!(azopen(&(share->archive_write), share->data_file_name,
516
 
               O_RDWR, AZ_METHOD_BLOCK)))
 
396
  if (!(azopen(&(share->archive_write), share->data_file_name, 
 
397
               O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
517
398
  {
518
399
    share->crashed= true;
519
400
    return(1);
524
405
}
525
406
 
526
407
 
527
 
/*
528
 
  No locks are required because it is associated with just one Cursor instance
 
408
/* 
 
409
  No locks are required because it is associated with just one handler instance
529
410
*/
530
411
int ha_archive::init_archive_reader()
531
412
{
532
 
  /*
 
413
  /* 
533
414
    It is expensive to open and close the data files and since you can't have
534
415
    a gzip file that can be both read and written we keep a writer open
535
416
    that is shared amoung all open tables.
549
430
    default:
550
431
      method= AZ_METHOD_BLOCK;
551
432
    }
552
 
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
 
433
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY, 
553
434
                 method)))
554
435
    {
555
436
      share->crashed= true;
561
442
  return(0);
562
443
}
563
444
 
 
445
 
564
446
/*
 
447
  We just implement one additional file extension.
 
448
*/
 
449
static const char *ha_archive_exts[] = {
 
450
  ARZ,
 
451
  NullS
 
452
};
 
453
 
 
454
const char **ha_archive::bas_ext() const
 
455
{
 
456
  return ha_archive_exts;
 
457
}
 
458
 
 
459
 
 
460
/* 
565
461
  When opening a file we:
566
462
  Create/get our shared structure.
567
463
  Init out lock.
568
464
  We open the file we will read from.
569
465
*/
570
 
int ha_archive::open(const char *name, int, uint32_t)
 
466
int ha_archive::open(const char *name,
 
467
                     int mode __attribute__((unused)),
 
468
                     uint open_options)
571
469
{
572
470
  int rc= 0;
573
471
  share= get_share(name, &rc);
574
472
 
575
 
  /** 
576
 
    We either fix it ourselves, or we just take it offline 
577
 
 
578
 
    @todo Create some documentation in the recovery tools shipped with the engine.
579
 
  */
580
 
  if (rc == HA_ERR_CRASHED_ON_USAGE)
 
473
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
581
474
  {
 
475
    /* purecov: begin inspected */
582
476
    free_share();
583
 
    rc= repair();
584
 
 
585
 
    return 0;
 
477
    return(rc);
 
478
    /* purecov: end */    
586
479
  }
587
480
  else if (rc == HA_ERR_OUT_OF_MEM)
588
481
  {
591
484
 
592
485
  assert(share);
593
486
 
594
 
  record_buffer= create_record_buffer(table->s->reclength +
 
487
  record_buffer= create_record_buffer(table->s->reclength + 
595
488
                                      ARCHIVE_ROW_HEADER_SIZE);
596
489
 
597
490
  if (!record_buffer)
602
495
 
603
496
  thr_lock_data_init(&share->lock, &lock, NULL);
604
497
 
605
 
  return(rc);
 
498
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
 
499
  {
 
500
    return(0);
 
501
  }
 
502
  else
 
503
    return(rc);
606
504
}
607
505
 
608
506
 
611
509
 
612
510
  SYNOPSIS
613
511
    close();
614
 
 
 
512
  
615
513
  IMPLEMENTATION:
616
514
 
617
515
  We first close this storage engines file handle to the archive and
643
541
 
644
542
 
645
543
/*
646
 
  We create our data file here. The format is pretty simple.
 
544
  We create our data file here. The format is pretty simple. 
647
545
  You can read about the format of the data file above.
648
 
  Unlike other storage engines we do not "pack" our data. Since we
649
 
  are about to do a general compression, packing would just be a waste of
650
 
  CPU time. If the table has blobs they are written after the row in the order
 
546
  Unlike other storage engines we do not "pack" our data. Since we 
 
547
  are about to do a general compression, packing would just be a waste of 
 
548
  CPU time. If the table has blobs they are written after the row in the order 
651
549
  of creation.
652
550
*/
653
551
 
654
 
int ArchiveEngine::doCreateTable(Session *,
655
 
                                 const char *table_name,
656
 
                                 Table& table_arg,
657
 
                                 drizzled::message::Table& proto)
 
552
int ha_archive::create(const char *name, TABLE *table_arg,
 
553
                       HA_CREATE_INFO *create_info)
658
554
{
659
555
  char name_buff[FN_REFLEN];
660
 
  int error= 0;
 
556
  char linkname[FN_REFLEN];
 
557
  int error;
661
558
  azio_stream create_stream;            /* Archive file we are working with */
662
 
  uint64_t auto_increment_value;
663
 
  string serialized_proto;
664
 
 
665
 
  auto_increment_value= proto.options().auto_increment_value();
666
 
 
667
 
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
 
559
  File frm_file;                   /* File handler for readers */
 
560
  struct stat file_stat;
 
561
  uchar *frm_ptr;
 
562
 
 
563
  stats.auto_increment_value= create_info->auto_increment_value;
 
564
 
 
565
  for (uint key= 0; key < table_arg->sizeKeys(); key++)
668
566
  {
669
 
    KEY *pos= table_arg.key_info+key;
 
567
    KEY *pos= table_arg->key_info+key;
670
568
    KEY_PART_INFO *key_part=     pos->key_part;
671
569
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
672
570
 
682
580
    }
683
581
  }
684
582
 
685
 
  /*
 
583
  /* 
686
584
    We reuse name_buff since it is available.
687
585
  */
688
 
  fn_format(name_buff, table_name, "", ARZ,
689
 
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
690
 
 
691
 
  my_errno= 0;
692
 
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
693
 
             AZ_METHOD_BLOCK) == 0)
 
586
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
694
587
  {
695
 
    error= errno;
696
 
    goto error2;
 
588
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
 
589
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
590
    fn_format(linkname, name, "", ARZ,
 
591
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
697
592
  }
698
 
 
699
 
  proto.SerializeToString(&serialized_proto);
700
 
 
701
 
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
702
 
                  serialized_proto.length()))
703
 
    goto error2;
704
 
 
705
 
  if (proto.options().has_comment())
 
593
  else
706
594
  {
707
 
    int write_length;
708
 
 
709
 
    write_length= azwrite_comment(&create_stream,
710
 
                                  proto.options().comment().c_str(),
711
 
                                  proto.options().comment().length());
712
 
 
713
 
    if (write_length < 0)
714
 
    {
715
 
      error= errno;
716
 
      goto error2;
717
 
    }
 
595
    fn_format(name_buff, name, "", ARZ,
 
596
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
597
    linkname[0]= 0;
718
598
  }
719
599
 
720
600
  /*
721
 
    Yes you need to do this, because the starting value
722
 
    for the autoincrement may not be zero.
 
601
    There is a chance that the file was "discovered". In this case
 
602
    just use whatever file is there.
723
603
  */
724
 
  create_stream.auto_increment= auto_increment_value ?
725
 
    auto_increment_value - 1 : 0;
726
 
 
727
 
  if (azclose(&create_stream))
 
604
  if (!stat(name_buff, &file_stat))
728
605
  {
729
 
    error= errno;
730
 
    goto error2;
 
606
    my_errno= 0;
 
607
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY,
 
608
                 AZ_METHOD_BLOCK)))
 
609
    {
 
610
      error= errno;
 
611
      goto error2;
 
612
    }
 
613
 
 
614
    if (linkname[0])
 
615
      my_symlink(name_buff, linkname, MYF(0));
 
616
    fn_format(name_buff, name, "", ".frm",
 
617
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
618
 
 
619
    /*
 
620
      Here is where we open up the frm and pass it to archive to store 
 
621
    */
 
622
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
 
623
    {
 
624
      if (fstat(frm_file, &file_stat))
 
625
      {
 
626
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
 
627
        if (frm_ptr)
 
628
        {
 
629
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
 
630
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
 
631
          my_free((uchar*)frm_ptr, MYF(0));
 
632
        }
 
633
      }
 
634
      my_close(frm_file, MYF(0));
 
635
    }
 
636
 
 
637
    if (create_info->comment.str)
 
638
      azwrite_comment(&create_stream, create_info->comment.str, 
 
639
                      (unsigned int)create_info->comment.length);
 
640
 
 
641
    /* 
 
642
      Yes you need to do this, because the starting value 
 
643
      for the autoincrement may not be zero.
 
644
    */
 
645
    create_stream.auto_increment= stats.auto_increment_value ?
 
646
                                    stats.auto_increment_value - 1 : 0;
 
647
    if (azclose(&create_stream))
 
648
    {
 
649
      error= errno;
 
650
      goto error2;
 
651
    }
731
652
  }
 
653
  else
 
654
    my_errno= 0;
732
655
 
733
656
  return(0);
734
657
 
735
658
error2:
736
 
  unlink(name_buff);
737
 
 
 
659
  delete_table(name);
738
660
error:
739
661
  /* Return error number, if we got one */
740
662
  return(error ? error : -1);
743
665
/*
744
666
  This is where the actual row is written out.
745
667
*/
746
 
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
 
668
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
747
669
{
748
 
  off_t written;
 
670
  my_off_t written;
749
671
  unsigned int r_pack_length;
750
672
 
751
673
  /* We pack the row for writing */
764
686
}
765
687
 
766
688
 
767
 
/*
 
689
/* 
768
690
  Calculate max length needed for row. This includes
769
691
  the bytes required for the length in the header.
770
692
*/
771
693
 
772
 
uint32_t ha_archive::max_row_length(const unsigned char *)
 
694
uint32_t ha_archive::max_row_length(const uchar *buf __attribute__((unused)))
773
695
{
774
696
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
775
697
  length+= ARCHIVE_ROW_HEADER_SIZE;
776
698
 
777
 
  uint32_t *ptr, *end;
 
699
  uint *ptr, *end;
778
700
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
779
701
       ptr != end ;
780
702
       ptr++)
786
708
}
787
709
 
788
710
 
789
 
unsigned int ha_archive::pack_row(unsigned char *record)
 
711
unsigned int ha_archive::pack_row(uchar *record)
790
712
{
791
 
  unsigned char *ptr;
 
713
  uchar *ptr;
792
714
 
793
715
  if (fix_rec_buff(max_row_length(record)))
794
 
    return(HA_ERR_OUT_OF_MEM);
 
716
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
795
717
 
796
718
  /* Copy null bits */
797
719
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
807
729
}
808
730
 
809
731
 
810
 
/*
 
732
/* 
811
733
  Look at ha_archive::open() for an explanation of the row format.
812
734
  Here we just write out the row.
813
735
 
814
736
  Wondering about start_bulk_insert()? We don't implement it for
815
737
  archive since it optimizes for lots of writes. The only save
816
 
  for implementing start_bulk_insert() is that we could skip
 
738
  for implementing start_bulk_insert() is that we could skip 
817
739
  setting dirty to true each time.
818
740
*/
819
 
int ha_archive::write_row(unsigned char *buf)
 
741
int ha_archive::write_row(uchar *buf)
820
742
{
821
743
  int rc;
822
 
  unsigned char *read_buf= NULL;
 
744
  uchar *read_buf= NULL;
823
745
  uint64_t temp_auto;
824
 
  unsigned char *record=  table->record[0];
 
746
  uchar *record=  table->record[0];
825
747
 
826
748
  if (share->crashed)
827
749
    return(HA_ERR_CRASHED_ON_USAGE);
828
750
 
829
751
  ha_statistic_increment(&SSV::ha_write_count);
 
752
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
753
    table->timestamp_field->set_time();
830
754
  pthread_mutex_lock(&share->mutex);
831
755
 
832
756
  if (share->archive_write_open == false)
844
768
      We don't support decremening auto_increment. They make the performance
845
769
      just cry.
846
770
    */
847
 
    if (temp_auto <= share->archive_write.auto_increment &&
 
771
    if (temp_auto <= share->archive_write.auto_increment && 
848
772
        mkey->flags & HA_NOSAME)
849
773
    {
850
774
      rc= HA_ERR_FOUND_DUPP_KEY;
851
775
      goto error;
852
776
    }
 
777
#ifdef DEAD_CODE
 
778
    /*
 
779
      Bad news, this will cause a search for the unique value which is very 
 
780
      expensive since we will have to do a table scan which will lock up 
 
781
      all other writers during this period. This could perhaps be optimized 
 
782
      in the future.
 
783
    */
 
784
    {
 
785
      /* 
 
786
        First we create a buffer that we can use for reading rows, and can pass
 
787
        to get_row().
 
788
      */
 
789
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
790
      {
 
791
        rc= HA_ERR_OUT_OF_MEM;
 
792
        goto error;
 
793
      }
 
794
       /* 
 
795
         All of the buffer must be written out or we won't see all of the
 
796
         data 
 
797
       */
 
798
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
799
      /*
 
800
        Set the position of the local read thread to the beginning postion.
 
801
      */
 
802
      if (read_data_header(&archive))
 
803
      {
 
804
        rc= HA_ERR_CRASHED_ON_USAGE;
 
805
        goto error;
 
806
      }
 
807
 
 
808
      Field *mfield= table->next_number_field;
 
809
 
 
810
      while (!(get_row(&archive, read_buf)))
 
811
      {
 
812
        if (!memcmp(read_buf + mfield->offset(record),
 
813
                    table->next_number_field->ptr,
 
814
                    mfield->max_display_length()))
 
815
        {
 
816
          rc= HA_ERR_FOUND_DUPP_KEY;
 
817
          goto error;
 
818
        }
 
819
      }
 
820
    }
 
821
#endif
853
822
    else
854
823
    {
855
824
      if (temp_auto > share->archive_write.auto_increment)
867
836
error:
868
837
  pthread_mutex_unlock(&share->mutex);
869
838
  if (read_buf)
870
 
    free((unsigned char*) read_buf);
 
839
    my_free((uchar*) read_buf, MYF(0));
871
840
 
872
841
  return(rc);
873
842
}
874
843
 
875
844
 
876
 
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
877
 
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
 
845
void ha_archive::get_auto_increment(uint64_t offset __attribute__((unused)),
 
846
                                    uint64_t increment __attribute__((unused)),
 
847
                                    uint64_t nb_desired_values __attribute__((unused)),
 
848
                                    uint64_t *first_value __attribute__((unused)),
 
849
                                    uint64_t *nb_reserved_values __attribute__((unused)))
878
850
{
879
851
  *nb_reserved_values= UINT64_MAX;
880
852
  *first_value= share->archive_write.auto_increment + 1;
881
853
}
882
854
 
883
855
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
884
 
int ha_archive::index_init(uint32_t keynr, bool)
 
856
int ha_archive::index_init(uint keynr, bool sorted __attribute__((unused)))
885
857
{
886
858
  active_index= keynr;
887
859
  return(0);
892
864
  No indexes, so if we get a request for an index search since we tell
893
865
  the optimizer that we have unique indexes, we scan
894
866
*/
895
 
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
896
 
                             uint32_t key_len, enum ha_rkey_function find_flag)
 
867
int ha_archive::index_read(uchar *buf, const uchar *key,
 
868
                             uint key_len, enum ha_rkey_function find_flag)
897
869
{
898
870
  int rc;
899
871
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
901
873
}
902
874
 
903
875
 
904
 
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
905
 
                               uint32_t key_len, enum ha_rkey_function)
 
876
int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
 
877
                               uint key_len,
 
878
                               enum ha_rkey_function find_flag __attribute__((unused)))
906
879
{
907
880
  int rc;
908
881
  bool found= 0;
933
906
}
934
907
 
935
908
 
936
 
int ha_archive::index_next(unsigned char * buf)
937
 
{
 
909
int ha_archive::index_next(uchar * buf) 
 
910
938
911
  bool found= 0;
939
912
 
940
913
  while (!(get_row(&archive, buf)))
946
919
    }
947
920
  }
948
921
 
949
 
  return(found ? 0 : HA_ERR_END_OF_FILE);
 
922
  return(found ? 0 : HA_ERR_END_OF_FILE); 
950
923
}
951
924
 
952
925
/*
974
947
 
975
948
 
976
949
/*
977
 
  This is the method that is used to read a row. It assumes that the row is
 
950
  This is the method that is used to read a row. It assumes that the row is 
978
951
  positioned where you want it.
979
952
*/
980
 
int ha_archive::get_row(azio_stream *file_to_read, unsigned char *buf)
 
953
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
981
954
{
982
955
  int rc;
983
956
 
996
969
 
997
970
  if (length > record_buffer->length)
998
971
  {
999
 
    unsigned char *newptr;
1000
 
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
 
972
    uchar *newptr;
 
973
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer, 
 
974
                                    length,
 
975
                                    MYF(MY_ALLOW_ZERO_PTR))))
1001
976
      return(1);
1002
977
    record_buffer->buffer= newptr;
1003
978
    record_buffer->length= length;
1008
983
  return(0);
1009
984
}
1010
985
 
1011
 
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
 
986
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1012
987
{
1013
988
  unsigned int read;
1014
989
  int error;
1015
 
  const unsigned char *ptr;
 
990
  const uchar *ptr;
1016
991
 
1017
992
  read= azread_row(file_to_read, &error);
1018
 
  ptr= (const unsigned char *)file_to_read->row_ptr;
 
993
  ptr= (const uchar *)file_to_read->row_ptr;
1019
994
 
1020
995
  if (error || read == 0)
1021
996
  {
1036
1011
}
1037
1012
 
1038
1013
 
1039
 
int ha_archive::get_row_version3(azio_stream *file_to_read, unsigned char *buf)
 
1014
int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1040
1015
{
1041
1016
  int returnable= unpack_row(file_to_read, buf);
1042
1017
 
1044
1019
}
1045
1020
 
1046
1021
 
1047
 
/*
 
1022
/* 
1048
1023
  Called during ORDER BY. Its position is either from being called sequentially
1049
1024
  or by having had ha_archive::rnd_pos() called before it is called.
1050
1025
*/
1051
1026
 
1052
 
int ha_archive::rnd_next(unsigned char *buf)
 
1027
int ha_archive::rnd_next(uchar *buf)
1053
1028
{
1054
1029
  int rc;
1055
1030
 
1071
1046
 
1072
1047
 
1073
1048
/*
1074
 
  Thanks to the table bool is_ordered this will be called after
 
1049
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1075
1050
  each call to ha_archive::rnd_next() if an ordering of the rows is
1076
1051
  needed.
1077
1052
*/
1078
1053
 
1079
 
void ha_archive::position(const unsigned char *)
 
1054
void ha_archive::position(const uchar *record __attribute__((unused)))
1080
1055
{
1081
1056
  my_store_ptr(ref, ref_length, current_position);
1082
1057
  return;
1090
1065
  correctly ordered row.
1091
1066
*/
1092
1067
 
1093
 
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
 
1068
int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1094
1069
{
1095
1070
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1096
1071
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
1100
1075
}
1101
1076
 
1102
1077
/*
1103
 
  This method repairs the meta file. It does this by walking the datafile and
 
1078
  This method repairs the meta file. It does this by walking the datafile and 
1104
1079
  rewriting the meta file. Currently it does this by calling optimize with
1105
1080
  the extended flag.
1106
1081
*/
1107
 
int ha_archive::repair()
 
1082
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1108
1083
{
1109
 
  int rc= optimize();
 
1084
  check_opt->flags= T_EXTEND;
 
1085
  int rc= optimize(thd, check_opt);
1110
1086
 
1111
1087
  if (rc)
1112
1088
    return(HA_ERR_CRASHED_ON_REPAIR);
1117
1093
 
1118
1094
/*
1119
1095
  The table can become fragmented if data was inserted, read, and then
1120
 
  inserted again. What we do is open up the file and recompress it completely.
 
1096
  inserted again. What we do is open up the file and recompress it completely. 
1121
1097
*/
1122
 
int ha_archive::optimize()
 
1098
int ha_archive::optimize(THD* thd __attribute__((unused)),
 
1099
                         HA_CHECK_OPT* check_opt __attribute__((unused)))
1123
1100
{
1124
1101
  int rc= 0;
1125
1102
  azio_stream writer;
1134
1111
    share->archive_write_open= false;
1135
1112
  }
1136
1113
 
1137
 
  char* proto_string;
1138
 
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
1139
 
  if (proto_string == NULL)
1140
 
  {
1141
 
    return ENOMEM;
1142
 
  }
1143
 
  azread_frm(&archive, proto_string);
1144
 
 
1145
1114
  /* Lets create a file to contain the new data */
1146
 
  fn_format(writer_filename, share->table_name.c_str(), "", ARN,
 
1115
  fn_format(writer_filename, share->table_name, "", ARN, 
1147
1116
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1148
1117
 
1149
 
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
1150
 
  {
1151
 
    free(proto_string);
1152
 
    return(HA_ERR_CRASHED_ON_USAGE);
1153
 
  }
1154
 
 
1155
 
  azwrite_frm(&writer, proto_string, archive.frm_length);
1156
 
 
1157
 
  /*
1158
 
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1159
 
    Any dead rows are removed (aka rows that may have been partially recorded).
 
1118
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
 
1119
    return(HA_ERR_CRASHED_ON_USAGE); 
 
1120
 
 
1121
  /* 
 
1122
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
 
1123
    Any dead rows are removed (aka rows that may have been partially recorded). 
1160
1124
 
1161
1125
    As of Archive format 3, this is the only type that is performed, before this
1162
1126
    version it was just done on T_EXTEND
1164
1128
  if (1)
1165
1129
  {
1166
1130
    /*
1167
 
      Now we will rewind the archive file so that we are positioned at the
 
1131
      Now we will rewind the archive file so that we are positioned at the 
1168
1132
      start of the file.
1169
1133
    */
1170
1134
    azflush(&archive, Z_SYNC_FLUSH);
1171
1135
    rc= read_data_header(&archive);
1172
1136
 
1173
 
    /*
 
1137
    /* 
1174
1138
      On success of writing out the new header, we now fetch each row and
1175
 
      insert it into the new archive file.
 
1139
      insert it into the new archive file. 
1176
1140
    */
1177
1141
    if (!rc)
1178
1142
    {
1181
1145
      share->rows_recorded= 0;
1182
1146
      stats.auto_increment_value= 1;
1183
1147
      share->archive_write.auto_increment= 0;
 
1148
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
1184
1149
 
1185
1150
      rows_restored= archive.rows;
1186
1151
 
1199
1164
        if (table->found_next_number_field)
1200
1165
        {
1201
1166
          Field *field= table->found_next_number_field;
1202
 
 
1203
 
          /* Since we will need to use field to translate, we need to flip its read bit */
1204
 
          field->setReadSet();
1205
 
 
1206
1167
          uint64_t auto_value=
1207
1168
            (uint64_t) field->val_int(table->record[0] +
1208
1169
                                       field->offset(table->record[0]));
1211
1172
              (share->archive_write.auto_increment= auto_value) + 1;
1212
1173
        }
1213
1174
      }
 
1175
      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
1214
1176
      share->rows_recorded= (ha_rows)writer.rows;
1215
1177
    }
1216
1178
 
1218
1180
    {
1219
1181
      goto error;
1220
1182
    }
1221
 
  }
 
1183
  } 
1222
1184
 
1223
1185
  azclose(&writer);
1224
1186
  share->dirty= false;
1225
 
 
 
1187
  
1226
1188
  azclose(&archive);
1227
1189
 
1228
1190
  // make the file we just wrote be our data file
1229
1191
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
1230
1192
 
1231
 
  free(proto_string);
 
1193
 
1232
1194
  return(rc);
1233
1195
error:
1234
 
  free(proto_string);
1235
1196
  azclose(&writer);
1236
1197
 
1237
 
  return(rc);
 
1198
  return(rc); 
1238
1199
}
1239
1200
 
1240
 
/*
 
1201
/* 
1241
1202
  Below is an example of how to setup row level locking.
1242
1203
*/
1243
 
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
 
1204
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1244
1205
                                       THR_LOCK_DATA **to,
1245
1206
                                       enum thr_lock_type lock_type)
1246
1207
{
1247
 
  delayed_insert= false;
 
1208
  if (lock_type == TL_WRITE_DELAYED)
 
1209
    delayed_insert= true;
 
1210
  else
 
1211
    delayed_insert= false;
1248
1212
 
1249
 
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1213
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
1250
1214
  {
1251
 
    /*
 
1215
    /* 
1252
1216
      Here is where we get into the guts of a row level lock.
1253
 
      If TL_UNLOCK is set
1254
 
      If we are not doing a LOCK Table or DISCARD/IMPORT
1255
 
      TABLESPACE, then allow multiple writers
 
1217
      If TL_UNLOCK is set 
 
1218
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
 
1219
      TABLESPACE, then allow multiple writers 
1256
1220
    */
1257
1221
 
1258
1222
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1259
 
         lock_type <= TL_WRITE)
1260
 
        && !session_tablespace_op(session))
 
1223
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
 
1224
        && !thd_tablespace_op(thd))
1261
1225
      lock_type = TL_WRITE_ALLOW_WRITE;
1262
1226
 
1263
 
    /*
 
1227
    /* 
1264
1228
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1265
1229
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1266
1230
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1267
1231
      to t2. Convert the lock to a normal read lock to allow
1268
 
      concurrent inserts to t2.
 
1232
      concurrent inserts to t2. 
1269
1233
    */
1270
1234
 
1271
 
    if (lock_type == TL_READ_NO_INSERT)
 
1235
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) 
1272
1236
      lock_type = TL_READ;
1273
1237
 
1274
1238
    lock.type=lock_type;
1279
1243
  return to;
1280
1244
}
1281
1245
 
 
1246
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 
1247
{
 
1248
  ha_archive::info(HA_STATUS_AUTO);
 
1249
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
 
1250
  {
 
1251
    create_info->auto_increment_value= stats.auto_increment_value;
 
1252
  }
 
1253
 
 
1254
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
 
1255
    create_info->data_file_name= share->real_path;
 
1256
 
 
1257
  return;
 
1258
}
 
1259
 
 
1260
 
1282
1261
/*
1283
1262
  Hints for optimizer, see ha_tina for more information
1284
1263
*/
1285
 
int ha_archive::info(uint32_t flag)
 
1264
int ha_archive::info(uint flag)
1286
1265
{
1287
 
  /*
 
1266
  /* 
1288
1267
    If dirty, we lock, and then reset/flush the data.
1289
1268
    I found that just calling azflush() doesn't always work.
1290
1269
  */
1302
1281
 
1303
1282
  }
1304
1283
 
1305
 
  /*
 
1284
  /* 
1306
1285
    This should be an accurate number now, though bulk and delayed inserts can
1307
1286
    cause the number to be inaccurate.
1308
1287
  */
1317
1296
  {
1318
1297
    struct stat file_stat;  // Stat information for the data file
1319
1298
 
1320
 
    stat(share->data_file_name, &file_stat);
 
1299
    VOID(stat(share->data_file_name, &file_stat));
1321
1300
 
1322
1301
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1323
1302
    stats.data_file_length= file_stat.st_size;
1355
1334
}
1356
1335
 
1357
1336
 
1358
 
/*
 
1337
/* 
1359
1338
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
1360
1339
  flag, and set the share dirty so that the next select will call sync for us.
1361
1340
*/
1368
1347
 
1369
1348
/*
1370
1349
  We cancel a truncate command. The only way to delete an archive table is to drop it.
1371
 
  This is done for security reasons. In a later version we will enable this by
 
1350
  This is done for security reasons. In a later version we will enable this by 
1372
1351
  allowing the user to select a different row format.
1373
1352
*/
1374
1353
int ha_archive::delete_all_rows()
1377
1356
}
1378
1357
 
1379
1358
/*
 
1359
  We just return state if asked.
 
1360
*/
 
1361
bool ha_archive::is_crashed() const 
 
1362
{
 
1363
  return(share->crashed); 
 
1364
}
 
1365
 
 
1366
/*
1380
1367
  Simple scan of the tables to make sure everything is ok.
1381
1368
*/
1382
1369
 
1383
 
int ha_archive::check(Session* session)
 
1370
int ha_archive::check(THD* thd,
 
1371
                      HA_CHECK_OPT* check_opt __attribute__((unused)))
1384
1372
{
1385
1373
  int rc= 0;
1386
1374
  const char *old_proc_info;
1387
1375
  uint64_t x;
1388
1376
 
1389
 
  old_proc_info= get_session_proc_info(session);
1390
 
  set_session_proc_info(session, "Checking table");
 
1377
  old_proc_info= thd_proc_info(thd, "Checking table");
1391
1378
  /* Flush any waiting data */
1392
1379
  pthread_mutex_lock(&share->mutex);
1393
1380
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1394
1381
  pthread_mutex_unlock(&share->mutex);
1395
1382
 
1396
1383
  /*
1397
 
    Now we will rewind the archive file so that we are positioned at the
 
1384
    Now we will rewind the archive file so that we are positioned at the 
1398
1385
    start of the file.
1399
1386
  */
1400
1387
  init_archive_reader();
1408
1395
      break;
1409
1396
  }
1410
1397
 
1411
 
  set_session_proc_info(session, old_proc_info);
 
1398
  thd_proc_info(thd, old_proc_info);
1412
1399
 
1413
 
  if ((rc && rc != HA_ERR_END_OF_FILE))
 
1400
  if ((rc && rc != HA_ERR_END_OF_FILE))  
1414
1401
  {
1415
1402
    share->crashed= false;
1416
1403
    return(HA_ADMIN_CORRUPT);
1421
1408
  }
1422
1409
}
1423
1410
 
1424
 
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
 
1411
/*
 
1412
  Check and repair the table if needed.
 
1413
*/
 
1414
bool ha_archive::check_and_repair(THD *thd) 
 
1415
{
 
1416
  HA_CHECK_OPT check_opt;
 
1417
 
 
1418
  check_opt.init();
 
1419
 
 
1420
  return(repair(thd, &check_opt));
 
1421
}
 
1422
 
 
1423
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
1425
1424
{
1426
1425
  archive_record_buffer *r;
1427
 
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
 
1426
  if (!(r= 
 
1427
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
 
1428
                                           MYF(MY_WME))))
1428
1429
  {
1429
 
    return(NULL);
 
1430
    return(NULL); /* purecov: inspected */
1430
1431
  }
1431
1432
  r->length= (int)length;
1432
1433
 
1433
 
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
 
1434
  if (!(r->buffer= (uchar*) my_malloc(r->length,
 
1435
                                    MYF(MY_WME))))
1434
1436
  {
1435
 
    free((char*) r);
1436
 
    return(NULL);
 
1437
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
 
1438
    return(NULL); /* purecov: inspected */
1437
1439
  }
1438
1440
 
1439
1441
  return(r);
1440
1442
}
1441
1443
 
1442
 
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
 
1444
void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
1443
1445
{
1444
 
  free((char*) r->buffer);
1445
 
  free((char*) r);
 
1446
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
 
1447
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
1446
1448
  return;
1447
1449
}
1448
1450
 
1456
1458
  NULL
1457
1459
};
1458
1460
 
1459
 
drizzle_declare_plugin
 
1461
mysql_declare_plugin(archive)
1460
1462
{
 
1463
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
1461
1464
  "ARCHIVE",
1462
1465
  "3.5",
1463
1466
  "Brian Aker, MySQL AB",
1469
1472
  archive_system_variables,   /* system variables                */
1470
1473
  NULL                        /* config options                  */
1471
1474
}
1472
 
drizzle_declare_plugin_end;
 
1475
mysql_declare_plugin_end;
1473
1476