~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: Brian Aker
  • Date: 2010-01-22 00:53:13 UTC
  • Revision ID: brian@gaz-20100122005313-jmizcbcdi1lt4tcx
Revert db patch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
  along with this program; if not, write to the Free Software
14
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#ifdef USE_PRAGMA_IMPLEMENTATION
17
 
#pragma implementation        // gcc: Class implementation
18
 
#endif
19
16
 
20
 
#include <drizzled/common_includes.h>
21
 
#include <storage/myisam/myisam.h>
 
17
#include "config.h"
 
18
#include "drizzled/field.h"
 
19
#include "drizzled/field/blob.h"
 
20
#include "drizzled/field/timestamp.h"
 
21
#include "plugin/myisam/myisam.h"
 
22
#include "drizzled/table.h"
 
23
#include "drizzled/session.h"
22
24
 
23
25
#include "ha_archive.h"
24
26
 
 
27
#include <unistd.h>
 
28
#include <fcntl.h>
 
29
#include <sys/stat.h>
 
30
 
 
31
#include <cstdio>
 
32
#include <string>
 
33
#include <map>
 
34
 
 
35
using namespace std;
 
36
 
25
37
/*
26
 
  First, if you want to understand storage engines you should look at 
27
 
  ha_example.cc and ha_example.h. 
 
38
  First, if you want to understand storage engines you should look at
 
39
  ha_example.cc and ha_example.h.
28
40
 
29
41
  This example was written as a test case for a customer who needed
30
42
  a storage engine without indexes that could compress data very well.
31
43
  So, welcome to a completely compressed storage engine. This storage
32
 
  engine only does inserts. No replace, deletes, or updates. All reads are 
 
44
  engine only does inserts. No replace, deletes, or updates. All reads are
33
45
  complete table scans. Compression is done through a combination of packing
34
46
  and making use of the zlib library
35
 
  
 
47
 
36
48
  We keep a file pointer open for each instance of ha_archive for each read
37
49
  but for writes we keep one open file handle just for that. We flush it
38
50
  only if we have a read occur. azip handles compressing lots of records
42
54
  the same time since we would want to flush).
43
55
 
44
56
  A "meta" file is kept alongside the data file. This file serves two purpose.
45
 
  The first purpose is to track the number of rows in the table. The second 
46
 
  purpose is to determine if the table was closed properly or not. When the 
47
 
  meta file is first opened it is marked as dirty. It is opened when the table 
48
 
  itself is opened for writing. When the table is closed the new count for rows 
49
 
  is written to the meta file and the file is marked as clean. If the meta file 
50
 
  is opened and it is marked as dirty, it is assumed that a crash occured. At 
 
57
  The first purpose is to track the number of rows in the table. The second
 
58
  purpose is to determine if the table was closed properly or not. When the
 
59
  meta file is first opened it is marked as dirty. It is opened when the table
 
60
  itself is opened for writing. When the table is closed the new count for rows
 
61
  is written to the meta file and the file is marked as clean. If the meta file
 
62
  is opened and it is marked as dirty, it is assumed that a crash occured. At
51
63
  this point an error occurs and the user is told to rebuild the file.
52
64
  A rebuild scans the rows and rewrites the meta file. If corruption is found
53
65
  in the data file then the meta file is not repaired.
54
66
 
55
67
  At some point a recovery method for such a drastic case needs to be divised.
56
68
 
57
 
  Locks are row level, and you will get a consistant read. 
 
69
  Locks are row level, and you will get a consistant read.
58
70
 
59
71
  For performance as far as table scans go it is quite fast. I don't have
60
72
  good numbers but locally it has out performed both Innodb and MyISAM. For
61
73
  Innodb the question will be if the table can be fit into the buffer
62
74
  pool. For MyISAM its a question of how much the file system caches the
63
75
  MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
64
 
  doesn't have enough memory to cache entire table that archive turns out 
65
 
  to be any faster. 
 
76
  doesn't have enough memory to cache entire table that archive turns out
 
77
  to be any faster.
66
78
 
67
79
  Examples between MyISAM (packed) and Archive.
68
80
 
91
103
*/
92
104
 
93
105
/* Variables for archive share methods */
94
 
pthread_mutex_t archive_mutex;
95
 
static HASH archive_open_tables;
 
106
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
 
107
 
96
108
static unsigned int global_version;
97
109
 
98
110
/* The file extension */
99
 
#define ARZ ".ARZ"               // The data file
 
111
#define ARZ ".arz"               // The data file
100
112
#define ARN ".ARN"               // Files used during an optimize call
101
 
#define ARM ".ARM"               // Meta file (deprecated)
102
 
 
103
 
/*
104
 
  uchar + uchar
105
 
*/
106
 
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
107
 
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
108
 
 
109
 
/* Static declarations for handerton */
110
 
static handler *archive_create_handler(handlerton *hton, 
111
 
                                       TABLE_SHARE *table, 
112
 
                                       MEM_ROOT *mem_root);
113
 
int archive_discover(handlerton *hton, THD* thd, const char *db, 
114
 
                     const char *name,
115
 
                     uchar **frmblob, 
116
 
                     size_t *frmlen);
 
113
 
 
114
 
117
115
 
118
116
static bool archive_use_aio= false;
119
117
 
127
125
*/
128
126
#define ARCHIVE_ROW_HEADER_SIZE 4
129
127
 
130
 
static handler *archive_create_handler(handlerton *hton,
131
 
                                       TABLE_SHARE *table, 
132
 
                                       MEM_ROOT *mem_root)
133
 
{
134
 
  return new (mem_root) ha_archive(hton, table);
135
 
}
136
 
 
137
128
/*
138
 
  Used for hash table that tracks open tables.
 
129
  We just implement one additional file extension.
139
130
*/
140
 
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
141
 
                             bool not_used __attribute__((unused)))
142
 
{
143
 
  *length=share->table_name_length;
144
 
  return (uchar*) share->table_name;
145
 
}
146
 
 
 
131
static const char *ha_archive_exts[] = {
 
132
  ARZ,
 
133
  NULL
 
134
};
 
135
 
 
136
class ArchiveEngine : public drizzled::plugin::StorageEngine
 
137
{
 
138
  typedef std::map<string, ArchiveShare*> ArchiveMap;
 
139
  ArchiveMap archive_open_tables;
 
140
 
 
141
public:
 
142
  ArchiveEngine(const string &name_arg)
 
143
   : drizzled::plugin::StorageEngine(name_arg,
 
144
                                     HTON_FILE_BASED |
 
145
                                     HTON_STATS_RECORDS_IS_EXACT |
 
146
                                     HTON_HAS_RECORDS |
 
147
                                     HTON_HAS_DATA_DICTIONARY),
 
148
     archive_open_tables()
 
149
  {
 
150
    table_definition_ext= ARZ;
 
151
  }
 
152
 
 
153
  virtual Cursor *create(TableShare &table,
 
154
                         drizzled::memory::Root *mem_root)
 
155
  {
 
156
    return new (mem_root) ha_archive(*this, table);
 
157
  }
 
158
 
 
159
  const char **bas_ext() const {
 
160
    return ha_archive_exts;
 
161
  }
 
162
 
 
163
  int doCreateTable(Session *session, const char *table_name,
 
164
                    Table& table_arg,
 
165
                    drizzled::message::Table& proto);
 
166
 
 
167
  int doGetTableDefinition(Session& session,
 
168
                           const char* path,
 
169
                           const char *db,
 
170
                           const char *table_name,
 
171
                           const bool is_tmp,
 
172
                           drizzled::message::Table *table_proto);
 
173
 
 
174
  void doGetTableNames(drizzled::CachedDirectory &directory, string& , set<string>& set_of_names);
 
175
 
 
176
  int doDropTable(Session&, const string table_path);
 
177
  ArchiveShare *findOpenTable(const string table_name);
 
178
  void addOpenTable(const string &table_name, ArchiveShare *);
 
179
  void deleteOpenTable(const string &table_name);
 
180
 
 
181
  uint32_t max_supported_keys()          const { return 1; }
 
182
  uint32_t max_supported_key_length()    const { return sizeof(uint64_t); }
 
183
  uint32_t max_supported_key_part_length() const { return sizeof(uint64_t); }
 
184
 
 
185
  uint32_t index_flags(enum  ha_key_alg) const
 
186
  {
 
187
    return HA_ONLY_WHOLE_INDEX;
 
188
  }
 
189
};
 
190
 
 
191
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
 
192
{
 
193
  ArchiveMap::iterator find_iter=
 
194
    archive_open_tables.find(table_name);
 
195
 
 
196
  if (find_iter != archive_open_tables.end())
 
197
    return (*find_iter).second;
 
198
  else
 
199
    return NULL;
 
200
}
 
201
 
 
202
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
 
203
{
 
204
  archive_open_tables[table_name]= share;
 
205
}
 
206
 
 
207
void ArchiveEngine::deleteOpenTable(const string &table_name)
 
208
{
 
209
  archive_open_tables.erase(table_name);
 
210
}
 
211
 
 
212
 
 
213
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory, 
 
214
                                    string&, 
 
215
                                    set<string>& set_of_names)
 
216
{
 
217
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
218
 
 
219
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
220
       entry_iter != entries.end(); ++entry_iter)
 
221
  {
 
222
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
223
    const string *filename= &entry->filename;
 
224
 
 
225
    assert(filename->size());
 
226
 
 
227
    const char *ext= strchr(filename->c_str(), '.');
 
228
 
 
229
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
230
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
231
    {  }
 
232
    else
 
233
    {
 
234
      char uname[NAME_LEN + 1];
 
235
      uint32_t file_name_len;
 
236
 
 
237
      file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
238
      // TODO: Remove need for memory copy here
 
239
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
240
      set_of_names.insert(uname);
 
241
    }
 
242
  }
 
243
}
 
244
 
 
245
 
 
246
int ArchiveEngine::doDropTable(Session&,
 
247
                               const string table_path)
 
248
{
 
249
  string new_path(table_path);
 
250
 
 
251
  new_path+= ARZ;
 
252
 
 
253
  int error= unlink(new_path.c_str());
 
254
 
 
255
  if (error != 0)
 
256
  {
 
257
    error= errno= errno;
 
258
  }
 
259
 
 
260
  return error;
 
261
}
 
262
 
 
263
int ArchiveEngine::doGetTableDefinition(Session&,
 
264
                                        const char* path,
 
265
                                        const char *,
 
266
                                        const char *,
 
267
                                        const bool,
 
268
                                        drizzled::message::Table *table_proto)
 
269
{
 
270
  struct stat stat_info;
 
271
  int error= ENOENT;
 
272
  string proto_path;
 
273
 
 
274
  proto_path.reserve(FN_REFLEN);
 
275
  proto_path.assign(path);
 
276
 
 
277
  proto_path.append(ARZ);
 
278
 
 
279
  if (stat(proto_path.c_str(),&stat_info))
 
280
    return errno;
 
281
  else
 
282
    error= EEXIST;
 
283
 
 
284
  if (table_proto)
 
285
  {
 
286
    azio_stream proto_stream;
 
287
    char* proto_string;
 
288
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
289
      return HA_ERR_CRASHED_ON_USAGE;
 
290
 
 
291
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
 
292
    if (proto_string == NULL)
 
293
    {
 
294
      azclose(&proto_stream);
 
295
      return ENOMEM;
 
296
    }
 
297
 
 
298
    azread_frm(&proto_stream, proto_string);
 
299
 
 
300
    if (table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
 
301
      error= HA_ERR_CRASHED_ON_USAGE;
 
302
 
 
303
    azclose(&proto_stream);
 
304
    free(proto_string);
 
305
  }
 
306
 
 
307
  return error;
 
308
}
 
309
 
 
310
static ArchiveEngine *archive_engine= NULL;
147
311
 
148
312
/*
149
 
  Initialize the archive handler.
 
313
  Initialize the archive Cursor.
150
314
 
151
315
  SYNOPSIS
152
316
    archive_db_init()
157
321
    true        Error
158
322
*/
159
323
 
160
 
int archive_db_init(void *p)
 
324
static int archive_db_init(drizzled::plugin::Registry &registry)
161
325
{
162
 
  handlerton *archive_hton;
163
326
 
164
 
  archive_hton= (handlerton *)p;
165
 
  archive_hton->state= SHOW_OPTION_YES;
166
 
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
167
 
  archive_hton->create= archive_create_handler;
168
 
  archive_hton->flags= HTON_NO_FLAGS;
169
 
  archive_hton->discover= archive_discover;
 
327
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
 
328
  archive_engine= new ArchiveEngine("ARCHIVE");
 
329
  registry.add(archive_engine);
170
330
 
171
331
  /* When the engine starts up set the first version */
172
332
  global_version= 1;
173
333
 
174
 
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
175
 
    goto error;
176
 
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
177
 
                (hash_get_key) archive_get_key, 0, 0))
178
 
  {
179
 
    VOID(pthread_mutex_destroy(&archive_mutex));
180
 
  }
181
 
  else
182
 
  {
183
 
    return(false);
184
 
  }
185
 
error:
186
 
  return(true);
 
334
  return false;
187
335
}
188
336
 
189
337
/*
190
 
  Release the archive handler.
 
338
  Release the archive Cursor.
191
339
 
192
340
  SYNOPSIS
193
341
    archive_db_done()
197
345
    false       OK
198
346
*/
199
347
 
200
 
int archive_db_done(void *p __attribute__((unused)))
 
348
static int archive_db_done(drizzled::plugin::Registry &registry)
201
349
{
202
 
  hash_free(&archive_open_tables);
203
 
  VOID(pthread_mutex_destroy(&archive_mutex));
 
350
  registry.remove(archive_engine);
 
351
  delete archive_engine;
 
352
 
 
353
  pthread_mutex_destroy(&archive_mutex);
204
354
 
205
355
  return 0;
206
356
}
207
357
 
208
358
 
209
 
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
210
 
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
 
359
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
 
360
                       TableShare &table_arg)
 
361
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
211
362
{
212
363
  /* Set our original buffer from pre-allocated memory */
213
364
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
217
368
  archive_reader_open= false;
218
369
}
219
370
 
220
 
int archive_discover(handlerton *hton __attribute__((unused)),
221
 
                     THD* thd __attribute__((unused)),
222
 
                     const char *db,
223
 
                     const char *name,
224
 
                     uchar **frmblob,
225
 
                     size_t *frmlen)
226
 
{
227
 
  azio_stream frm_stream;
228
 
  char az_file[FN_REFLEN];
229
 
  char *frm_ptr;
230
 
  struct stat file_stat; 
231
 
 
232
 
  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
233
 
 
234
 
  if (stat(az_file, &file_stat))
235
 
    goto err;
236
 
 
237
 
  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
238
 
  {
239
 
    if (errno == EROFS || errno == EACCES)
240
 
      return(my_errno= errno);
241
 
    return(HA_ERR_CRASHED_ON_USAGE);
242
 
  }
243
 
 
244
 
  if (frm_stream.frm_length == 0)
245
 
    goto err;
246
 
 
247
 
  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
248
 
  azread_frm(&frm_stream, frm_ptr);
249
 
  azclose(&frm_stream);
250
 
 
251
 
  *frmlen= frm_stream.frm_length;
252
 
  *frmblob= (uchar*) frm_ptr;
253
 
 
254
 
  return(0);
255
 
err:
256
 
  my_errno= 0;
257
 
  return(1);
258
 
}
259
 
 
260
371
/*
261
372
  This method reads the header of a datafile and returns whether or not it was successful.
262
373
*/
271
382
  return(1);
272
383
}
273
384
 
 
385
ArchiveShare::ArchiveShare():
 
386
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
 
387
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
 
388
{
 
389
  assert(1);
 
390
}
 
391
 
 
392
ArchiveShare::ArchiveShare(const char *name):
 
393
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
 
394
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
 
395
{
 
396
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
 
397
  table_name.append(name);
 
398
  fn_format(data_file_name, table_name.c_str(), "",
 
399
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
400
  /*
 
401
    We will use this lock for rows.
 
402
  */
 
403
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
 
404
}
 
405
 
 
406
ArchiveShare::~ArchiveShare()
 
407
{
 
408
  thr_lock_delete(&lock);
 
409
  pthread_mutex_destroy(&mutex);
 
410
  /*
 
411
    We need to make sure we don't reset the crashed state.
 
412
    If we open a crashed file, wee need to close it as crashed unless
 
413
    it has been repaired.
 
414
    Since we will close the data down after this, we go on and count
 
415
    the flush on close;
 
416
  */
 
417
  if (archive_write_open == true)
 
418
    (void)azclose(&archive_write);
 
419
}
 
420
 
 
421
bool ArchiveShare::prime(uint64_t *auto_increment)
 
422
{
 
423
  azio_stream archive_tmp;
 
424
 
 
425
  /*
 
426
    We read the meta file, but do not mark it dirty. Since we are not
 
427
    doing a write we won't mark it dirty (and we won't open it for
 
428
    anything but reading... open it for write and we will generate null
 
429
    compression writes).
 
430
  */
 
431
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
 
432
               AZ_METHOD_BLOCK)))
 
433
    return false;
 
434
 
 
435
  *auto_increment= archive_tmp.auto_increment + 1;
 
436
  rows_recorded= (ha_rows)archive_tmp.rows;
 
437
  crashed= archive_tmp.dirty;
 
438
  if (version < global_version)
 
439
  {
 
440
    version_rows= rows_recorded;
 
441
    version= global_version;
 
442
  }
 
443
  azclose(&archive_tmp);
 
444
 
 
445
  return true;
 
446
}
 
447
 
274
448
 
275
449
/*
276
 
  We create the shared memory space that we will use for the open table. 
 
450
  We create the shared memory space that we will use for the open table.
277
451
  No matter what we try to get or create a share. This is so that a repair
278
 
  table operation can occur. 
 
452
  table operation can occur.
279
453
 
280
454
  See ha_example.cc for a longer description.
281
455
*/
282
 
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
 
456
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
283
457
{
284
 
  uint length;
285
 
 
286
458
  pthread_mutex_lock(&archive_mutex);
287
 
  length=(uint) strlen(table_name);
288
 
 
289
 
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
290
 
                                           (uchar*) table_name,
291
 
                                           length)))
 
459
 
 
460
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
 
461
  share= a_engine->findOpenTable(table_name);
 
462
 
 
463
  if (!share)
292
464
  {
293
 
    char *tmp_name;
294
 
    azio_stream archive_tmp;
 
465
    share= new ArchiveShare(table_name);
295
466
 
296
 
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
297
 
                          &share, sizeof(*share),
298
 
                          &tmp_name, length+1,
299
 
                          NullS)) 
 
467
    if (share == NULL)
300
468
    {
301
469
      pthread_mutex_unlock(&archive_mutex);
302
470
      *rc= HA_ERR_OUT_OF_MEM;
303
471
      return(NULL);
304
472
    }
305
473
 
306
 
    share->use_count= 0;
307
 
    share->table_name_length= length;
308
 
    share->table_name= tmp_name;
309
 
    share->crashed= false;
310
 
    share->archive_write_open= false;
311
 
    fn_format(share->data_file_name, table_name, "",
312
 
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
313
 
    stpcpy(share->table_name, table_name);
314
 
    /*
315
 
      We will use this lock for rows.
316
 
    */
317
 
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
318
 
    
319
 
    /*
320
 
      We read the meta file, but do not mark it dirty. Since we are not
321
 
      doing a write we won't mark it dirty (and we won't open it for
322
 
      anything but reading... open it for write and we will generate null
323
 
      compression writes).
324
 
    */
325
 
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY,
326
 
                 AZ_METHOD_BLOCK)))
 
474
    if (share->prime(&stats.auto_increment_value) == false)
327
475
    {
328
 
      VOID(pthread_mutex_destroy(&share->mutex));
329
 
      free(share);
330
476
      pthread_mutex_unlock(&archive_mutex);
331
477
      *rc= HA_ERR_CRASHED_ON_REPAIR;
332
 
      return(NULL);
333
 
    }
334
 
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
335
 
    share->rows_recorded= (ha_rows)archive_tmp.rows;
336
 
    share->crashed= archive_tmp.dirty;
337
 
    if (share->version < global_version)
338
 
    {
339
 
      share->version_rows= share->rows_recorded;
340
 
      share->version= global_version;
341
 
    }
342
 
    azclose(&archive_tmp);
343
 
 
344
 
    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
 
478
      delete share;
 
479
 
 
480
      return NULL;
 
481
    }
 
482
 
 
483
    a_engine->addOpenTable(share->table_name, share);
345
484
    thr_lock_init(&share->lock);
346
485
  }
347
486
  share->use_count++;
 
487
 
348
488
  if (share->crashed)
349
489
    *rc= HA_ERR_CRASHED_ON_USAGE;
350
490
  pthread_mutex_unlock(&archive_mutex);
353
493
}
354
494
 
355
495
 
356
 
/* 
 
496
/*
357
497
  Free the share.
358
498
  See ha_example.cc for a description.
359
499
*/
360
500
int ha_archive::free_share()
361
501
{
362
 
  int rc= 0;
363
 
 
364
502
  pthread_mutex_lock(&archive_mutex);
365
503
  if (!--share->use_count)
366
504
  {
367
 
    hash_delete(&archive_open_tables, (uchar*) share);
368
 
    thr_lock_delete(&share->lock);
369
 
    VOID(pthread_mutex_destroy(&share->mutex));
370
 
    /* 
371
 
      We need to make sure we don't reset the crashed state.
372
 
      If we open a crashed file, wee need to close it as crashed unless
373
 
      it has been repaired.
374
 
      Since we will close the data down after this, we go on and count
375
 
      the flush on close;
376
 
    */
377
 
    if (share->archive_write_open == true)
378
 
    {
379
 
      if (azclose(&(share->archive_write)))
380
 
        rc= 1;
381
 
    }
382
 
    my_free((uchar*) share, MYF(0));
 
505
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
 
506
    a_engine->deleteOpenTable(share->table_name);
 
507
    delete share;
383
508
  }
384
509
  pthread_mutex_unlock(&archive_mutex);
385
510
 
386
 
  return(rc);
 
511
  return 0;
387
512
}
388
513
 
389
514
int ha_archive::init_archive_writer()
390
515
{
391
 
  /* 
 
516
  /*
392
517
    It is expensive to open and close the data files and since you can't have
393
518
    a gzip file that can be both read and written we keep a writer open
394
519
    that is shared amoung all open tables.
395
520
  */
396
 
  if (!(azopen(&(share->archive_write), share->data_file_name, 
397
 
               O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
 
521
  if (!(azopen(&(share->archive_write), share->data_file_name,
 
522
               O_RDWR, AZ_METHOD_BLOCK)))
398
523
  {
399
524
    share->crashed= true;
400
525
    return(1);
405
530
}
406
531
 
407
532
 
408
 
/* 
409
 
  No locks are required because it is associated with just one handler instance
 
533
/*
 
534
  No locks are required because it is associated with just one Cursor instance
410
535
*/
411
536
int ha_archive::init_archive_reader()
412
537
{
413
 
  /* 
 
538
  /*
414
539
    It is expensive to open and close the data files and since you can't have
415
540
    a gzip file that can be both read and written we keep a writer open
416
541
    that is shared amoung all open tables.
430
555
    default:
431
556
      method= AZ_METHOD_BLOCK;
432
557
    }
433
 
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY, 
 
558
    if (!(azopen(&archive, share->data_file_name, O_RDONLY,
434
559
                 method)))
435
560
    {
436
561
      share->crashed= true;
442
567
  return(0);
443
568
}
444
569
 
445
 
 
446
570
/*
447
 
  We just implement one additional file extension.
448
 
*/
449
 
static const char *ha_archive_exts[] = {
450
 
  ARZ,
451
 
  NullS
452
 
};
453
 
 
454
 
const char **ha_archive::bas_ext() const
455
 
{
456
 
  return ha_archive_exts;
457
 
}
458
 
 
459
 
 
460
 
/* 
461
571
  When opening a file we:
462
572
  Create/get our shared structure.
463
573
  Init out lock.
464
574
  We open the file we will read from.
465
575
*/
466
 
int ha_archive::open(const char *name,
467
 
                     int mode __attribute__((unused)),
468
 
                     uint open_options)
 
576
int ha_archive::open(const char *name, int, uint32_t)
469
577
{
470
578
  int rc= 0;
471
579
  share= get_share(name, &rc);
472
580
 
473
 
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
 
581
  /** 
 
582
    We either fix it ourselves, or we just take it offline 
 
583
 
 
584
    @todo Create some documentation in the recovery tools shipped with the engine.
 
585
  */
 
586
  if (rc == HA_ERR_CRASHED_ON_USAGE)
474
587
  {
475
 
    /* purecov: begin inspected */
476
588
    free_share();
477
 
    return(rc);
478
 
    /* purecov: end */    
 
589
    rc= repair();
 
590
 
 
591
    return 0;
479
592
  }
480
593
  else if (rc == HA_ERR_OUT_OF_MEM)
481
594
  {
484
597
 
485
598
  assert(share);
486
599
 
487
 
  record_buffer= create_record_buffer(table->s->reclength + 
 
600
  record_buffer= create_record_buffer(table->s->reclength +
488
601
                                      ARCHIVE_ROW_HEADER_SIZE);
489
602
 
490
603
  if (!record_buffer)
495
608
 
496
609
  thr_lock_data_init(&share->lock, &lock, NULL);
497
610
 
498
 
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
499
 
  {
500
 
    return(0);
501
 
  }
502
 
  else
503
 
    return(rc);
 
611
  return(rc);
504
612
}
505
613
 
506
614
 
509
617
 
510
618
  SYNOPSIS
511
619
    close();
512
 
  
 
620
 
513
621
  IMPLEMENTATION:
514
622
 
515
623
  We first close this storage engines file handle to the archive and
541
649
 
542
650
 
543
651
/*
544
 
  We create our data file here. The format is pretty simple. 
 
652
  We create our data file here. The format is pretty simple.
545
653
  You can read about the format of the data file above.
546
 
  Unlike other storage engines we do not "pack" our data. Since we 
547
 
  are about to do a general compression, packing would just be a waste of 
548
 
  CPU time. If the table has blobs they are written after the row in the order 
 
654
  Unlike other storage engines we do not "pack" our data. Since we
 
655
  are about to do a general compression, packing would just be a waste of
 
656
  CPU time. If the table has blobs they are written after the row in the order
549
657
  of creation.
550
658
*/
551
659
 
552
 
int ha_archive::create(const char *name, TABLE *table_arg,
553
 
                       HA_CREATE_INFO *create_info)
 
660
int ArchiveEngine::doCreateTable(Session *,
 
661
                                 const char *table_name,
 
662
                                 Table& table_arg,
 
663
                                 drizzled::message::Table& proto)
554
664
{
555
665
  char name_buff[FN_REFLEN];
556
 
  char linkname[FN_REFLEN];
557
 
  int error;
 
666
  int error= 0;
558
667
  azio_stream create_stream;            /* Archive file we are working with */
559
 
  File frm_file;                   /* File handler for readers */
560
 
  struct stat file_stat;
561
 
  uchar *frm_ptr;
562
 
 
563
 
  stats.auto_increment_value= create_info->auto_increment_value;
564
 
 
565
 
  for (uint key= 0; key < table_arg->s->keys; key++)
 
668
  uint64_t auto_increment_value;
 
669
  string serialized_proto;
 
670
 
 
671
  auto_increment_value= proto.options().auto_increment_value();
 
672
 
 
673
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
566
674
  {
567
 
    KEY *pos= table_arg->key_info+key;
 
675
    KEY *pos= table_arg.key_info+key;
568
676
    KEY_PART_INFO *key_part=     pos->key_part;
569
677
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
570
678
 
580
688
    }
581
689
  }
582
690
 
583
 
  /* 
 
691
  /*
584
692
    We reuse name_buff since it is available.
585
693
  */
586
 
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
 
694
  fn_format(name_buff, table_name, "", ARZ,
 
695
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
696
 
 
697
  errno= 0;
 
698
  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
 
699
             AZ_METHOD_BLOCK) == 0)
587
700
  {
588
 
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
589
 
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
590
 
    fn_format(linkname, name, "", ARZ,
591
 
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
701
    error= errno;
 
702
    goto error2;
592
703
  }
593
 
  else
 
704
 
 
705
  proto.SerializeToString(&serialized_proto);
 
706
 
 
707
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
 
708
                  serialized_proto.length()))
 
709
    goto error2;
 
710
 
 
711
  if (proto.options().has_comment())
594
712
  {
595
 
    fn_format(name_buff, name, "", ARZ,
596
 
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
597
 
    linkname[0]= 0;
 
713
    int write_length;
 
714
 
 
715
    write_length= azwrite_comment(&create_stream,
 
716
                                  proto.options().comment().c_str(),
 
717
                                  proto.options().comment().length());
 
718
 
 
719
    if (write_length < 0)
 
720
    {
 
721
      error= errno;
 
722
      goto error2;
 
723
    }
598
724
  }
599
725
 
600
726
  /*
601
 
    There is a chance that the file was "discovered". In this case
602
 
    just use whatever file is there.
 
727
    Yes you need to do this, because the starting value
 
728
    for the autoincrement may not be zero.
603
729
  */
604
 
  if (!stat(name_buff, &file_stat))
 
730
  create_stream.auto_increment= auto_increment_value ?
 
731
    auto_increment_value - 1 : 0;
 
732
 
 
733
  if (azclose(&create_stream))
605
734
  {
606
 
    my_errno= 0;
607
 
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY,
608
 
                 AZ_METHOD_BLOCK)))
609
 
    {
610
 
      error= errno;
611
 
      goto error2;
612
 
    }
613
 
 
614
 
    if (linkname[0])
615
 
      my_symlink(name_buff, linkname, MYF(0));
616
 
    fn_format(name_buff, name, "", ".frm",
617
 
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
618
 
 
619
 
    /*
620
 
      Here is where we open up the frm and pass it to archive to store 
621
 
    */
622
 
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
623
 
    {
624
 
      if (fstat(frm_file, &file_stat))
625
 
      {
626
 
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
627
 
        if (frm_ptr)
628
 
        {
629
 
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
630
 
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
631
 
          my_free((uchar*)frm_ptr, MYF(0));
632
 
        }
633
 
      }
634
 
      my_close(frm_file, MYF(0));
635
 
    }
636
 
 
637
 
    if (create_info->comment.str)
638
 
      azwrite_comment(&create_stream, create_info->comment.str, 
639
 
                      (unsigned int)create_info->comment.length);
640
 
 
641
 
    /* 
642
 
      Yes you need to do this, because the starting value 
643
 
      for the autoincrement may not be zero.
644
 
    */
645
 
    create_stream.auto_increment= stats.auto_increment_value ?
646
 
                                    stats.auto_increment_value - 1 : 0;
647
 
    if (azclose(&create_stream))
648
 
    {
649
 
      error= errno;
650
 
      goto error2;
651
 
    }
 
735
    error= errno;
 
736
    goto error2;
652
737
  }
653
 
  else
654
 
    my_errno= 0;
655
738
 
656
739
  return(0);
657
740
 
658
741
error2:
659
 
  delete_table(name);
 
742
  unlink(name_buff);
 
743
 
660
744
error:
661
745
  /* Return error number, if we got one */
662
746
  return(error ? error : -1);
665
749
/*
666
750
  This is where the actual row is written out.
667
751
*/
668
 
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
 
752
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
669
753
{
670
 
  my_off_t written;
 
754
  off_t written;
671
755
  unsigned int r_pack_length;
672
756
 
673
757
  /* We pack the row for writing */
686
770
}
687
771
 
688
772
 
689
 
/* 
 
773
/*
690
774
  Calculate max length needed for row. This includes
691
775
  the bytes required for the length in the header.
692
776
*/
693
777
 
694
 
uint32_t ha_archive::max_row_length(const uchar *buf __attribute__((unused)))
 
778
uint32_t ha_archive::max_row_length(const unsigned char *)
695
779
{
696
 
  uint32_t length= (uint32_t)(table->s->reclength + table->s->fields*2);
 
780
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
697
781
  length+= ARCHIVE_ROW_HEADER_SIZE;
698
782
 
699
 
  uint *ptr, *end;
700
 
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
 
783
  uint32_t *ptr, *end;
 
784
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
701
785
       ptr != end ;
702
786
       ptr++)
703
787
  {
708
792
}
709
793
 
710
794
 
711
 
unsigned int ha_archive::pack_row(uchar *record)
 
795
unsigned int ha_archive::pack_row(unsigned char *record)
712
796
{
713
 
  uchar *ptr;
 
797
  unsigned char *ptr;
714
798
 
715
799
  if (fix_rec_buff(max_row_length(record)))
716
 
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
 
800
    return(HA_ERR_OUT_OF_MEM);
717
801
 
718
802
  /* Copy null bits */
719
803
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
729
813
}
730
814
 
731
815
 
732
 
/* 
 
816
/*
733
817
  Look at ha_archive::open() for an explanation of the row format.
734
818
  Here we just write out the row.
735
819
 
736
820
  Wondering about start_bulk_insert()? We don't implement it for
737
821
  archive since it optimizes for lots of writes. The only save
738
 
  for implementing start_bulk_insert() is that we could skip 
 
822
  for implementing start_bulk_insert() is that we could skip
739
823
  setting dirty to true each time.
740
824
*/
741
 
int ha_archive::write_row(uchar *buf)
 
825
int ha_archive::write_row(unsigned char *buf)
742
826
{
743
827
  int rc;
744
 
  uchar *read_buf= NULL;
 
828
  unsigned char *read_buf= NULL;
745
829
  uint64_t temp_auto;
746
 
  uchar *record=  table->record[0];
 
830
  unsigned char *record=  table->record[0];
747
831
 
748
832
  if (share->crashed)
749
833
    return(HA_ERR_CRASHED_ON_USAGE);
750
834
 
751
835
  ha_statistic_increment(&SSV::ha_write_count);
752
 
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
753
 
    table->timestamp_field->set_time();
754
836
  pthread_mutex_lock(&share->mutex);
755
837
 
756
838
  if (share->archive_write_open == false)
768
850
      We don't support decremening auto_increment. They make the performance
769
851
      just cry.
770
852
    */
771
 
    if (temp_auto <= share->archive_write.auto_increment && 
 
853
    if (temp_auto <= share->archive_write.auto_increment &&
772
854
        mkey->flags & HA_NOSAME)
773
855
    {
774
856
      rc= HA_ERR_FOUND_DUPP_KEY;
775
857
      goto error;
776
858
    }
777
 
#ifdef DEAD_CODE
778
 
    /*
779
 
      Bad news, this will cause a search for the unique value which is very 
780
 
      expensive since we will have to do a table scan which will lock up 
781
 
      all other writers during this period. This could perhaps be optimized 
782
 
      in the future.
783
 
    */
784
 
    {
785
 
      /* 
786
 
        First we create a buffer that we can use for reading rows, and can pass
787
 
        to get_row().
788
 
      */
789
 
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
790
 
      {
791
 
        rc= HA_ERR_OUT_OF_MEM;
792
 
        goto error;
793
 
      }
794
 
       /* 
795
 
         All of the buffer must be written out or we won't see all of the
796
 
         data 
797
 
       */
798
 
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
799
 
      /*
800
 
        Set the position of the local read thread to the beginning postion.
801
 
      */
802
 
      if (read_data_header(&archive))
803
 
      {
804
 
        rc= HA_ERR_CRASHED_ON_USAGE;
805
 
        goto error;
806
 
      }
807
 
 
808
 
      Field *mfield= table->next_number_field;
809
 
 
810
 
      while (!(get_row(&archive, read_buf)))
811
 
      {
812
 
        if (!memcmp(read_buf + mfield->offset(record),
813
 
                    table->next_number_field->ptr,
814
 
                    mfield->max_display_length()))
815
 
        {
816
 
          rc= HA_ERR_FOUND_DUPP_KEY;
817
 
          goto error;
818
 
        }
819
 
      }
820
 
    }
821
 
#endif
822
859
    else
823
860
    {
824
861
      if (temp_auto > share->archive_write.auto_increment)
836
873
error:
837
874
  pthread_mutex_unlock(&share->mutex);
838
875
  if (read_buf)
839
 
    my_free((uchar*) read_buf, MYF(0));
 
876
    free((unsigned char*) read_buf);
840
877
 
841
878
  return(rc);
842
879
}
843
880
 
844
881
 
845
 
void ha_archive::get_auto_increment(uint64_t offset __attribute__((unused)),
846
 
                                    uint64_t increment __attribute__((unused)),
847
 
                                    uint64_t nb_desired_values __attribute__((unused)),
848
 
                                    uint64_t *first_value __attribute__((unused)),
849
 
                                    uint64_t *nb_reserved_values __attribute__((unused)))
 
882
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
 
883
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
850
884
{
851
885
  *nb_reserved_values= UINT64_MAX;
852
886
  *first_value= share->archive_write.auto_increment + 1;
853
887
}
854
888
 
855
889
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
856
 
int ha_archive::index_init(uint keynr, bool sorted __attribute__((unused)))
 
890
int ha_archive::index_init(uint32_t keynr, bool)
857
891
{
858
892
  active_index= keynr;
859
893
  return(0);
864
898
  No indexes, so if we get a request for an index search since we tell
865
899
  the optimizer that we have unique indexes, we scan
866
900
*/
867
 
int ha_archive::index_read(uchar *buf, const uchar *key,
868
 
                             uint key_len, enum ha_rkey_function find_flag)
 
901
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
 
902
                             uint32_t key_len, enum ha_rkey_function find_flag)
869
903
{
870
904
  int rc;
871
905
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
873
907
}
874
908
 
875
909
 
876
 
int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
877
 
                               uint key_len,
878
 
                               enum ha_rkey_function find_flag __attribute__((unused)))
 
910
int ha_archive::index_read_idx(unsigned char *buf, uint32_t index, const unsigned char *key,
 
911
                               uint32_t key_len, enum ha_rkey_function)
879
912
{
880
913
  int rc;
881
914
  bool found= 0;
906
939
}
907
940
 
908
941
 
909
 
int ha_archive::index_next(uchar * buf) 
910
 
 
942
int ha_archive::index_next(unsigned char * buf)
 
943
{
911
944
  bool found= 0;
912
945
 
913
946
  while (!(get_row(&archive, buf)))
919
952
    }
920
953
  }
921
954
 
922
 
  return(found ? 0 : HA_ERR_END_OF_FILE); 
 
955
  return(found ? 0 : HA_ERR_END_OF_FILE);
923
956
}
924
957
 
925
958
/*
947
980
 
948
981
 
949
982
/*
950
 
  This is the method that is used to read a row. It assumes that the row is 
 
983
  This is the method that is used to read a row. It assumes that the row is
951
984
  positioned where you want it.
952
985
*/
953
 
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
 
986
int ha_archive::get_row(azio_stream *file_to_read, unsigned char *buf)
954
987
{
955
988
  int rc;
956
989
 
969
1002
 
970
1003
  if (length > record_buffer->length)
971
1004
  {
972
 
    uchar *newptr;
973
 
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer, 
974
 
                                    length,
975
 
                                    MYF(MY_ALLOW_ZERO_PTR))))
 
1005
    unsigned char *newptr;
 
1006
    if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
976
1007
      return(1);
977
1008
    record_buffer->buffer= newptr;
978
1009
    record_buffer->length= length;
983
1014
  return(0);
984
1015
}
985
1016
 
986
 
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
 
1017
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
987
1018
{
988
1019
  unsigned int read;
989
1020
  int error;
990
 
  const uchar *ptr;
 
1021
  const unsigned char *ptr;
991
1022
 
992
1023
  read= azread_row(file_to_read, &error);
993
 
  ptr= (const uchar *)file_to_read->row_ptr;
 
1024
  ptr= (const unsigned char *)file_to_read->row_ptr;
994
1025
 
995
1026
  if (error || read == 0)
996
1027
  {
998
1029
  }
999
1030
 
1000
1031
  /* Copy null bits */
1001
 
  memcpy(record, ptr, table->s->null_bytes);
1002
 
  ptr+= table->s->null_bytes;
 
1032
  memcpy(record, ptr, table->getNullBytes());
 
1033
  ptr+= table->getNullBytes();
1003
1034
  for (Field **field=table->field ; *field ; field++)
1004
1035
  {
1005
1036
    if (!((*field)->is_null()))
1011
1042
}
1012
1043
 
1013
1044
 
1014
 
int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
 
1045
int ha_archive::get_row_version3(azio_stream *file_to_read, unsigned char *buf)
1015
1046
{
1016
1047
  int returnable= unpack_row(file_to_read, buf);
1017
1048
 
1019
1050
}
1020
1051
 
1021
1052
 
1022
 
/* 
 
1053
/*
1023
1054
  Called during ORDER BY. Its position is either from being called sequentially
1024
1055
  or by having had ha_archive::rnd_pos() called before it is called.
1025
1056
*/
1026
1057
 
1027
 
int ha_archive::rnd_next(uchar *buf)
 
1058
int ha_archive::rnd_next(unsigned char *buf)
1028
1059
{
1029
1060
  int rc;
1030
1061
 
1046
1077
 
1047
1078
 
1048
1079
/*
1049
 
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
 
1080
  Thanks to the table bool is_ordered this will be called after
1050
1081
  each call to ha_archive::rnd_next() if an ordering of the rows is
1051
1082
  needed.
1052
1083
*/
1053
1084
 
1054
 
void ha_archive::position(const uchar *record __attribute__((unused)))
 
1085
void ha_archive::position(const unsigned char *)
1055
1086
{
1056
1087
  my_store_ptr(ref, ref_length, current_position);
1057
1088
  return;
1065
1096
  correctly ordered row.
1066
1097
*/
1067
1098
 
1068
 
int ha_archive::rnd_pos(uchar * buf, uchar *pos)
 
1099
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1100
{
1070
1101
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1071
1102
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
1075
1106
}
1076
1107
 
1077
1108
/*
1078
 
  This method repairs the meta file. It does this by walking the datafile and 
 
1109
  This method repairs the meta file. It does this by walking the datafile and
1079
1110
  rewriting the meta file. Currently it does this by calling optimize with
1080
1111
  the extended flag.
1081
1112
*/
1082
 
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
 
1113
int ha_archive::repair()
1083
1114
{
1084
 
  check_opt->flags= T_EXTEND;
1085
 
  int rc= optimize(thd, check_opt);
 
1115
  int rc= optimize();
1086
1116
 
1087
1117
  if (rc)
1088
1118
    return(HA_ERR_CRASHED_ON_REPAIR);
1093
1123
 
1094
1124
/*
1095
1125
  The table can become fragmented if data was inserted, read, and then
1096
 
  inserted again. What we do is open up the file and recompress it completely. 
 
1126
  inserted again. What we do is open up the file and recompress it completely.
1097
1127
*/
1098
 
int ha_archive::optimize(THD* thd __attribute__((unused)),
1099
 
                         HA_CHECK_OPT* check_opt __attribute__((unused)))
 
1128
int ha_archive::optimize()
1100
1129
{
1101
1130
  int rc= 0;
1102
1131
  azio_stream writer;
1111
1140
    share->archive_write_open= false;
1112
1141
  }
1113
1142
 
 
1143
  char* proto_string;
 
1144
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
 
1145
  if (proto_string == NULL)
 
1146
  {
 
1147
    return ENOMEM;
 
1148
  }
 
1149
  azread_frm(&archive, proto_string);
 
1150
 
1114
1151
  /* Lets create a file to contain the new data */
1115
 
  fn_format(writer_filename, share->table_name, "", ARN, 
 
1152
  fn_format(writer_filename, share->table_name.c_str(), "", ARN,
1116
1153
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1117
1154
 
1118
 
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
1119
 
    return(HA_ERR_CRASHED_ON_USAGE); 
1120
 
 
1121
 
  /* 
1122
 
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
1123
 
    Any dead rows are removed (aka rows that may have been partially recorded). 
 
1155
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
1156
  {
 
1157
    free(proto_string);
 
1158
    return(HA_ERR_CRASHED_ON_USAGE);
 
1159
  }
 
1160
 
 
1161
  azwrite_frm(&writer, proto_string, archive.frm_length);
 
1162
 
 
1163
  /*
 
1164
    An extended rebuild is a lot more effort. We open up each row and re-record it.
 
1165
    Any dead rows are removed (aka rows that may have been partially recorded).
1124
1166
 
1125
1167
    As of Archive format 3, this is the only type that is performed, before this
1126
1168
    version it was just done on T_EXTEND
1128
1170
  if (1)
1129
1171
  {
1130
1172
    /*
1131
 
      Now we will rewind the archive file so that we are positioned at the 
 
1173
      Now we will rewind the archive file so that we are positioned at the
1132
1174
      start of the file.
1133
1175
    */
1134
1176
    azflush(&archive, Z_SYNC_FLUSH);
1135
1177
    rc= read_data_header(&archive);
1136
1178
 
1137
 
    /* 
 
1179
    /*
1138
1180
      On success of writing out the new header, we now fetch each row and
1139
 
      insert it into the new archive file. 
 
1181
      insert it into the new archive file.
1140
1182
    */
1141
1183
    if (!rc)
1142
1184
    {
1145
1187
      share->rows_recorded= 0;
1146
1188
      stats.auto_increment_value= 1;
1147
1189
      share->archive_write.auto_increment= 0;
1148
 
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
1149
1190
 
1150
1191
      rows_restored= archive.rows;
1151
1192
 
1164
1205
        if (table->found_next_number_field)
1165
1206
        {
1166
1207
          Field *field= table->found_next_number_field;
 
1208
 
 
1209
          /* Since we will need to use field to translate, we need to flip its read bit */
 
1210
          field->setReadSet();
 
1211
 
1167
1212
          uint64_t auto_value=
1168
1213
            (uint64_t) field->val_int(table->record[0] +
1169
1214
                                       field->offset(table->record[0]));
1172
1217
              (share->archive_write.auto_increment= auto_value) + 1;
1173
1218
        }
1174
1219
      }
1175
 
      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
1176
1220
      share->rows_recorded= (ha_rows)writer.rows;
1177
1221
    }
1178
1222
 
1180
1224
    {
1181
1225
      goto error;
1182
1226
    }
1183
 
  } 
 
1227
  }
1184
1228
 
1185
1229
  azclose(&writer);
1186
1230
  share->dirty= false;
1187
 
  
 
1231
 
1188
1232
  azclose(&archive);
1189
1233
 
1190
1234
  // make the file we just wrote be our data file
1191
1235
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
1192
1236
 
1193
 
 
 
1237
  free(proto_string);
1194
1238
  return(rc);
1195
1239
error:
 
1240
  free(proto_string);
1196
1241
  azclose(&writer);
1197
1242
 
1198
 
  return(rc); 
 
1243
  return(rc);
1199
1244
}
1200
1245
 
1201
 
/* 
 
1246
/*
1202
1247
  Below is an example of how to setup row level locking.
1203
1248
*/
1204
 
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
 
1249
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
1205
1250
                                       THR_LOCK_DATA **to,
1206
1251
                                       enum thr_lock_type lock_type)
1207
1252
{
1208
 
  if (lock_type == TL_WRITE_DELAYED)
1209
 
    delayed_insert= true;
1210
 
  else
1211
 
    delayed_insert= false;
 
1253
  delayed_insert= false;
1212
1254
 
1213
 
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
 
1255
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1214
1256
  {
1215
 
    /* 
 
1257
    /*
1216
1258
      Here is where we get into the guts of a row level lock.
1217
 
      If TL_UNLOCK is set 
1218
 
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
1219
 
      TABLESPACE, then allow multiple writers 
 
1259
      If TL_UNLOCK is set
 
1260
      If we are not doing a LOCK Table or DISCARD/IMPORT
 
1261
      TABLESPACE, then allow multiple writers
1220
1262
    */
1221
1263
 
1222
1264
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1223
 
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1224
 
        && !thd_tablespace_op(thd))
 
1265
         lock_type <= TL_WRITE)
 
1266
        && !session_tablespace_op(session))
1225
1267
      lock_type = TL_WRITE_ALLOW_WRITE;
1226
1268
 
1227
 
    /* 
 
1269
    /*
1228
1270
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1229
1271
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1230
1272
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1231
1273
      to t2. Convert the lock to a normal read lock to allow
1232
 
      concurrent inserts to t2. 
 
1274
      concurrent inserts to t2.
1233
1275
    */
1234
1276
 
1235
 
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) 
 
1277
    if (lock_type == TL_READ_NO_INSERT)
1236
1278
      lock_type = TL_READ;
1237
1279
 
1238
1280
    lock.type=lock_type;
1243
1285
  return to;
1244
1286
}
1245
1287
 
1246
 
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1247
 
{
1248
 
  ha_archive::info(HA_STATUS_AUTO);
1249
 
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1250
 
  {
1251
 
    create_info->auto_increment_value= stats.auto_increment_value;
1252
 
  }
1253
 
 
1254
 
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
1255
 
    create_info->data_file_name= share->real_path;
1256
 
 
1257
 
  return;
1258
 
}
1259
 
 
1260
 
 
1261
1288
/*
1262
1289
  Hints for optimizer, see ha_tina for more information
1263
1290
*/
1264
 
int ha_archive::info(uint flag)
 
1291
int ha_archive::info(uint32_t flag)
1265
1292
{
1266
 
  /* 
 
1293
  /*
1267
1294
    If dirty, we lock, and then reset/flush the data.
1268
1295
    I found that just calling azflush() doesn't always work.
1269
1296
  */
1281
1308
 
1282
1309
  }
1283
1310
 
1284
 
  /* 
 
1311
  /*
1285
1312
    This should be an accurate number now, though bulk and delayed inserts can
1286
1313
    cause the number to be inaccurate.
1287
1314
  */
1296
1323
  {
1297
1324
    struct stat file_stat;  // Stat information for the data file
1298
1325
 
1299
 
    VOID(stat(share->data_file_name, &file_stat));
 
1326
    stat(share->data_file_name, &file_stat);
1300
1327
 
1301
 
    stats.mean_rec_length= table->s->reclength + buffer.alloced_length();
 
1328
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1302
1329
    stats.data_file_length= file_stat.st_size;
1303
1330
    stats.create_time= file_stat.st_ctime;
1304
1331
    stats.update_time= file_stat.st_mtime;
1334
1361
}
1335
1362
 
1336
1363
 
1337
 
/* 
 
1364
/*
1338
1365
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
1339
1366
  flag, and set the share dirty so that the next select will call sync for us.
1340
1367
*/
1347
1374
 
1348
1375
/*
1349
1376
  We cancel a truncate command. The only way to delete an archive table is to drop it.
1350
 
  This is done for security reasons. In a later version we will enable this by 
 
1377
  This is done for security reasons. In a later version we will enable this by
1351
1378
  allowing the user to select a different row format.
1352
1379
*/
1353
1380
int ha_archive::delete_all_rows()
1356
1383
}
1357
1384
 
1358
1385
/*
1359
 
  We just return state if asked.
1360
 
*/
1361
 
bool ha_archive::is_crashed() const 
1362
 
{
1363
 
  return(share->crashed); 
1364
 
}
1365
 
 
1366
 
/*
1367
1386
  Simple scan of the tables to make sure everything is ok.
1368
1387
*/
1369
1388
 
1370
 
int ha_archive::check(THD* thd,
1371
 
                      HA_CHECK_OPT* check_opt __attribute__((unused)))
 
1389
int ha_archive::check(Session* session)
1372
1390
{
1373
1391
  int rc= 0;
1374
1392
  const char *old_proc_info;
1375
1393
  uint64_t x;
1376
1394
 
1377
 
  old_proc_info= thd_proc_info(thd, "Checking table");
 
1395
  old_proc_info= get_session_proc_info(session);
 
1396
  set_session_proc_info(session, "Checking table");
1378
1397
  /* Flush any waiting data */
1379
1398
  pthread_mutex_lock(&share->mutex);
1380
1399
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1381
1400
  pthread_mutex_unlock(&share->mutex);
1382
1401
 
1383
1402
  /*
1384
 
    Now we will rewind the archive file so that we are positioned at the 
 
1403
    Now we will rewind the archive file so that we are positioned at the
1385
1404
    start of the file.
1386
1405
  */
1387
1406
  init_archive_reader();
1395
1414
      break;
1396
1415
  }
1397
1416
 
1398
 
  thd_proc_info(thd, old_proc_info);
 
1417
  set_session_proc_info(session, old_proc_info);
1399
1418
 
1400
 
  if ((rc && rc != HA_ERR_END_OF_FILE))  
 
1419
  if ((rc && rc != HA_ERR_END_OF_FILE))
1401
1420
  {
1402
1421
    share->crashed= false;
1403
1422
    return(HA_ADMIN_CORRUPT);
1408
1427
  }
1409
1428
}
1410
1429
 
1411
 
/*
1412
 
  Check and repair the table if needed.
1413
 
*/
1414
 
bool ha_archive::check_and_repair(THD *thd) 
1415
 
{
1416
 
  HA_CHECK_OPT check_opt;
1417
 
 
1418
 
  check_opt.init();
1419
 
 
1420
 
  return(repair(thd, &check_opt));
1421
 
}
1422
 
 
1423
 
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
 
1430
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1424
1431
{
1425
1432
  archive_record_buffer *r;
1426
 
  if (!(r= 
1427
 
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
1428
 
                                           MYF(MY_WME))))
 
1433
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1429
1434
  {
1430
 
    return(NULL); /* purecov: inspected */
 
1435
    return(NULL);
1431
1436
  }
1432
1437
  r->length= (int)length;
1433
1438
 
1434
 
  if (!(r->buffer= (uchar*) my_malloc(r->length,
1435
 
                                    MYF(MY_WME))))
 
1439
  if (!(r->buffer= (unsigned char*) malloc(r->length)))
1436
1440
  {
1437
 
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
1438
 
    return(NULL); /* purecov: inspected */
 
1441
    free((char*) r);
 
1442
    return(NULL);
1439
1443
  }
1440
1444
 
1441
1445
  return(r);
1442
1446
}
1443
1447
 
1444
 
void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
 
1448
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1445
1449
{
1446
 
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
1447
 
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
 
1450
  free((char*) r->buffer);
 
1451
  free((char*) r);
1448
1452
  return;
1449
1453
}
1450
1454
 
1453
1457
  "Whether or not to use asynchronous IO.",
1454
1458
  NULL, NULL, true);
1455
1459
 
1456
 
static struct st_mysql_sys_var* archive_system_variables[]= {
 
1460
static drizzle_sys_var* archive_system_variables[]= {
1457
1461
  DRIZZLE_SYSVAR(aio),
1458
1462
  NULL
1459
1463
};
1460
1464
 
1461
 
mysql_declare_plugin(archive)
 
1465
DRIZZLE_DECLARE_PLUGIN
1462
1466
{
1463
 
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1467
  DRIZZLE_VERSION_ID,
1464
1468
  "ARCHIVE",
1465
1469
  "3.5",
1466
1470
  "Brian Aker, MySQL AB",
1472
1476
  archive_system_variables,   /* system variables                */
1473
1477
  NULL                        /* config options                  */
1474
1478
}
1475
 
mysql_declare_plugin_end;
 
1479
DRIZZLE_DECLARE_PLUGIN_END;
1476
1480