~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: Monty Taylor
  • Date: 2009-04-14 19:16:51 UTC
  • mto: (997.2.5 mordred)
  • mto: This revision was merged to the branch mainline in revision 994.
  • Revision ID: mordred@inaugust.com-20090414191651-ltbww6hpqks8k7qk
Clarified instructions in README.

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
46
#include <drizzled/field/timestamp.h>
 
47
#include <storage/csv/ha_tina.h>
47
48
#include <drizzled/error.h>
48
49
#include <drizzled/table.h>
49
50
#include <drizzled/session.h>
50
 
#include "drizzled/internal/my_sys.h"
51
 
 
52
 
#include "ha_tina.h"
53
 
 
54
 
#include <fcntl.h>
55
51
 
56
52
#include <string>
57
 
#include <map>
58
53
 
59
54
using namespace std;
60
 
using namespace drizzled;
 
55
 
 
56
static const string engine_name("CSV");
61
57
 
62
58
/*
63
59
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
73
69
#define CSM_EXT ".CSM"               // Meta file
74
70
 
75
71
 
76
 
static int read_meta_file(int meta_file, ha_rows *rows);
77
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
72
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
73
static int free_share(TINA_SHARE *share);
 
74
static int read_meta_file(File meta_file, ha_rows *rows);
 
75
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
78
76
 
79
 
void tina_get_status(void* param, int concurrent_insert);
80
 
void tina_update_status(void* param);
81
 
bool tina_check_status(void* param);
 
77
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
78
extern "C" void tina_update_status(void* param);
 
79
extern "C" bool tina_check_status(void* param);
82
80
 
83
81
/* Stuff for shares */
84
82
pthread_mutex_t tina_mutex;
 
83
static HASH tina_open_tables;
85
84
 
86
85
/*****************************************************************************
87
86
 ** TINA tables
90
89
/*
91
90
  Used for sorting chains with qsort().
92
91
*/
93
 
static int sort_set (tina_set *a, tina_set *b)
 
92
int sort_set (tina_set *a, tina_set *b)
94
93
{
95
94
  /*
96
95
    We assume that intervals do not intersect. So, it is enought to compare
99
98
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
100
99
}
101
100
 
102
 
 
103
 
/*
104
 
  If frm_error() is called in table.cc this is called to find out what file
105
 
  extensions exist for this Cursor.
106
 
*/
107
 
static const char *ha_tina_exts[] = {
108
 
  CSV_EXT,
109
 
  CSM_EXT,
110
 
  NULL
111
 
};
112
 
 
113
 
class Tina : public drizzled::plugin::StorageEngine
114
 
{
115
 
  typedef std::map<string, TinaShare*> TinaMap;
116
 
  TinaMap tina_open_tables;
 
101
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
102
{
 
103
  *length=share->table_name_length;
 
104
  return (unsigned char*) share->table_name;
 
105
}
 
106
 
 
107
class Tina : public StorageEngine
 
108
{
117
109
public:
118
110
  Tina(const string& name_arg)
119
 
   : drizzled::plugin::StorageEngine(name_arg,
120
 
                                     HTON_TEMPORARY_ONLY |
121
 
                                     HTON_NO_AUTO_INCREMENT |
122
 
                                     HTON_SKIP_STORE_LOCK |
123
 
                                     HTON_FILE_BASED),
124
 
    tina_open_tables()
125
 
  {}
126
 
  virtual ~Tina()
127
 
  {
128
 
    pthread_mutex_destroy(&tina_mutex);
129
 
  }
130
 
 
131
 
  virtual Cursor *create(TableShare &table,
132
 
                         drizzled::memory::Root *mem_root)
133
 
  {
134
 
    return new (mem_root) ha_tina(*this, table);
135
 
  }
136
 
 
137
 
  const char **bas_ext() const {
138
 
    return ha_tina_exts;
139
 
  }
140
 
 
141
 
  int doCreateTable(Session &,
142
 
                    Table &table_arg,
143
 
                    drizzled::TableIdentifier &identifier,
144
 
                    drizzled::message::Table&);
145
 
 
146
 
  int doGetTableDefinition(Session& session,
147
 
                           TableIdentifier &identifier,
148
 
                           drizzled::message::Table &table_message);
149
 
 
150
 
  /* Temp only engine, so do not return values. */
151
 
  void doGetTableNames(drizzled::CachedDirectory &, SchemaIdentifier&, set<string>&) { };
152
 
 
153
 
  int doDropTable(Session&, TableIdentifier &identifier);
154
 
  TinaShare *findOpenTable(const string table_name);
155
 
  void addOpenTable(const string &table_name, TinaShare *);
156
 
  void deleteOpenTable(const string &table_name);
157
 
 
158
 
 
159
 
  uint32_t max_keys()          const { return 0; }
160
 
  uint32_t max_key_parts()     const { return 0; }
161
 
  uint32_t max_key_length()    const { return 0; }
162
 
  bool doDoesTableExist(Session& session, TableIdentifier &identifier);
163
 
  int doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to);
164
 
 
165
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
166
 
                             drizzled::SchemaIdentifier &schema_identifier,
167
 
                             drizzled::TableIdentifiers &set_of_identifiers);
 
111
   : StorageEngine(name_arg, HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
 
112
                             HTON_NO_PARTITION) {}
 
113
  virtual handler *create(TABLE_SHARE *table,
 
114
                          MEM_ROOT *mem_root)
 
115
  {
 
116
    return new (mem_root) ha_tina(this, table);
 
117
  }
168
118
};
169
119
 
170
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
171
 
                                 drizzled::SchemaIdentifier&,
172
 
                                 drizzled::TableIdentifiers&)
173
 
{
174
 
}
175
 
 
176
 
int Tina::doRenameTable(Session &session,
177
 
                        TableIdentifier &from, TableIdentifier &to)
178
 
{
179
 
  int error= 0;
180
 
  for (const char **ext= bas_ext(); *ext ; ext++)
181
 
  {
182
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
183
 
    {
184
 
      if ((error=errno) != ENOENT)
185
 
        break;
186
 
      error= 0;
187
 
    }
188
 
  }
189
 
 
190
 
  session.renameTableMessage(from, to);
191
 
 
192
 
  return error;
193
 
}
194
 
 
195
 
bool Tina::doDoesTableExist(Session &session, TableIdentifier &identifier)
196
 
{
197
 
  return session.doesTableMessageExist(identifier);
198
 
}
199
 
 
200
 
 
201
 
int Tina::doDropTable(Session &session,
202
 
                      TableIdentifier &identifier)
203
 
{
204
 
  int error= 0;
205
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
206
 
  char buff[FN_REFLEN];
207
 
 
208
 
  for (const char **ext= bas_ext(); *ext ; ext++)
209
 
  {
210
 
    internal::fn_format(buff, identifier.getPath().c_str(), "", *ext,
211
 
                        MY_UNPACK_FILENAME|MY_APPEND_EXT);
212
 
    if (internal::my_delete_with_symlink(buff, MYF(0)))
213
 
    {
214
 
      if ((error= errno) != ENOENT)
215
 
        break;
216
 
    }
217
 
    else
218
 
      enoent_or_zero= 0;                        // No error for ENOENT
219
 
    error= enoent_or_zero;
220
 
  }
221
 
 
222
 
  session.removeTableMessage(identifier);
223
 
 
224
 
  return error;
225
 
}
226
 
 
227
 
TinaShare *Tina::findOpenTable(const string table_name)
228
 
{
229
 
  TinaMap::iterator find_iter=
230
 
    tina_open_tables.find(table_name);
231
 
 
232
 
  if (find_iter != tina_open_tables.end())
233
 
    return (*find_iter).second;
234
 
  else
235
 
    return NULL;
236
 
}
237
 
 
238
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
239
 
{
240
 
  tina_open_tables[table_name]= share;
241
 
}
242
 
 
243
 
void Tina::deleteOpenTable(const string &table_name)
244
 
{
245
 
  tina_open_tables.erase(table_name);
246
 
}
247
 
 
248
 
 
249
 
int Tina::doGetTableDefinition(Session &session,
250
 
                               drizzled::TableIdentifier &identifier,
251
 
                               drizzled::message::Table &table_message)
252
 
{
253
 
  if (session.getTableMessage(identifier, table_message))
254
 
    return EEXIST;
255
 
 
256
 
  return ENOENT;
257
 
}
258
 
 
259
 
 
260
120
static Tina *tina_engine= NULL;
261
121
 
262
 
static int tina_init_func(drizzled::plugin::Context &context)
 
122
static int tina_init_func(PluginRegistry &registry)
263
123
{
264
124
 
265
 
  tina_engine= new Tina("CSV");
266
 
  context.add(tina_engine);
 
125
  tina_engine= new Tina(engine_name);
 
126
  registry.add(tina_engine);
267
127
 
268
128
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
269
 
  return 0;
270
 
}
271
 
 
272
 
 
273
 
 
274
 
TinaShare::TinaShare(const char *table_name_arg)
275
 
  : table_name(table_name_arg), use_count(0), saved_data_file_length(0),
276
 
    update_file_opened(false), tina_write_opened(false),
277
 
    crashed(false), rows_recorded(0), data_file_version(0)
278
 
{
279
 
  thr_lock_init(&lock);
280
 
  internal::fn_format(data_file_name, table_name_arg, "", CSV_EXT,
281
 
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);
282
 
}
283
 
 
284
 
TinaShare::~TinaShare()
285
 
{
286
 
  thr_lock_delete(&lock);
287
 
  pthread_mutex_destroy(&mutex);
288
 
}
 
129
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
130
                   (hash_get_key) tina_get_key,0,0);
 
131
  return 0;
 
132
}
 
133
 
 
134
static int tina_done_func(PluginRegistry &registry)
 
135
{
 
136
  registry.remove(tina_engine);
 
137
  delete tina_engine;
 
138
 
 
139
  hash_free(&tina_open_tables);
 
140
  pthread_mutex_destroy(&tina_mutex);
 
141
 
 
142
  return 0;
 
143
}
 
144
 
289
145
 
290
146
/*
291
147
  Simple lock controls.
292
148
*/
293
 
TinaShare *ha_tina::get_share(const char *table_name)
 
149
static TINA_SHARE *get_share(const char *table_name, Table *)
294
150
{
295
 
  pthread_mutex_lock(&tina_mutex);
296
 
 
297
 
  Tina *a_tina= static_cast<Tina *>(engine);
298
 
  share= a_tina->findOpenTable(table_name);
299
 
 
 
151
  TINA_SHARE *share;
300
152
  char meta_file_name[FN_REFLEN];
301
153
  struct stat file_stat;
 
154
  char *tmp_name;
 
155
  uint32_t length;
 
156
 
 
157
  pthread_mutex_lock(&tina_mutex);
 
158
  length=(uint) strlen(table_name);
302
159
 
303
160
  /*
304
161
    If share is not present in the hash, create a new share and
305
162
    initialize its members.
306
163
  */
307
 
  if (! share)
 
164
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
165
                                        (unsigned char*) table_name,
 
166
                                       length)))
308
167
  {
309
 
    share= new TinaShare(table_name);
310
 
 
311
 
    if (share == NULL)
 
168
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
169
                         &share, sizeof(*share),
 
170
                         &tmp_name, length+1,
 
171
                         NULL))
312
172
    {
313
173
      pthread_mutex_unlock(&tina_mutex);
314
174
      return NULL;
315
175
    }
316
176
 
317
 
    internal::fn_format(meta_file_name, table_name, "", CSM_EXT,
 
177
    share->use_count= 0;
 
178
    share->table_name_length= length;
 
179
    share->table_name= tmp_name;
 
180
    share->crashed= false;
 
181
    share->rows_recorded= 0;
 
182
    share->update_file_opened= false;
 
183
    share->tina_write_opened= false;
 
184
    share->data_file_version= 0;
 
185
    strcpy(share->table_name, table_name);
 
186
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
187
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
188
    fn_format(meta_file_name, table_name, "", CSM_EXT,
318
189
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
319
190
 
320
191
    if (stat(share->data_file_name, &file_stat))
321
 
    {
322
 
      pthread_mutex_unlock(&tina_mutex);
323
 
      delete share;
324
 
      return NULL;
325
 
    }
326
 
  
 
192
      goto error;
327
193
    share->saved_data_file_length= file_stat.st_size;
328
194
 
329
 
    a_tina->addOpenTable(share->table_name, share);
330
 
 
 
195
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
196
      goto error;
 
197
    thr_lock_init(&share->lock);
331
198
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
332
199
 
333
200
    /*
336
203
      Usually this will result in auto-repair, and we will get a good
337
204
      meta-file in the end.
338
205
    */
339
 
    if ((share->meta_file= internal::my_open(meta_file_name,
340
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
206
    if ((share->meta_file= my_open(meta_file_name,
 
207
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
341
208
      share->crashed= true;
342
209
 
343
210
    /*
351
218
  pthread_mutex_unlock(&tina_mutex);
352
219
 
353
220
  return share;
 
221
 
 
222
error:
 
223
  pthread_mutex_unlock(&tina_mutex);
 
224
  free((unsigned char*) share);
 
225
 
 
226
  return NULL;
354
227
}
355
228
 
356
229
 
373
246
    non-zero - error occurred
374
247
*/
375
248
 
376
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
249
static int read_meta_file(File meta_file, ha_rows *rows)
377
250
{
378
251
  unsigned char meta_buffer[META_BUFFER_SIZE];
379
252
  unsigned char *ptr= meta_buffer;
380
253
 
381
254
  lseek(meta_file, 0, SEEK_SET);
382
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
255
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
383
256
      != META_BUFFER_SIZE)
384
257
    return(HA_ERR_CRASHED_ON_USAGE);
385
258
 
401
274
      ((bool)(*ptr)== true))
402
275
    return(HA_ERR_CRASHED_ON_USAGE);
403
276
 
404
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
277
  my_sync(meta_file, MYF(MY_WME));
405
278
 
406
279
  return(0);
407
280
}
426
299
    non-zero - error occurred
427
300
*/
428
301
 
429
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
302
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
430
303
{
431
304
  unsigned char meta_buffer[META_BUFFER_SIZE];
432
305
  unsigned char *ptr= meta_buffer;
446
319
  *ptr= (unsigned char)dirty;
447
320
 
448
321
  lseek(meta_file, 0, SEEK_SET);
449
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
322
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
450
323
      != META_BUFFER_SIZE)
451
324
    return(-1);
452
325
 
453
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
326
  my_sync(meta_file, MYF(MY_WME));
454
327
 
455
328
  return(0);
456
329
}
457
330
 
 
331
bool ha_tina::check_and_repair(Session *session)
 
332
{
 
333
  HA_CHECK_OPT check_opt;
 
334
 
 
335
  check_opt.init();
 
336
 
 
337
  return(repair(session, &check_opt));
 
338
}
 
339
 
 
340
 
458
341
int ha_tina::init_tina_writer()
459
342
{
460
343
  /*
465
348
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
466
349
 
467
350
  if ((share->tina_write_filedes=
468
 
        internal::my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
 
351
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
469
352
  {
470
353
    share->crashed= true;
471
354
    return(1);
476
359
}
477
360
 
478
361
 
 
362
bool ha_tina::is_crashed() const
 
363
{
 
364
  return(share->crashed);
 
365
}
 
366
 
479
367
/*
480
368
  Free lock controls.
481
369
*/
482
 
int ha_tina::free_share()
 
370
static int free_share(TINA_SHARE *share)
483
371
{
484
372
  pthread_mutex_lock(&tina_mutex);
485
373
  int result_code= 0;
487
375
    /* Write the meta file. Mark it as crashed if needed. */
488
376
    (void)write_meta_file(share->meta_file, share->rows_recorded,
489
377
                          share->crashed ? true :false);
490
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
378
    if (my_close(share->meta_file, MYF(0)))
491
379
      result_code= 1;
492
380
    if (share->tina_write_opened)
493
381
    {
494
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
382
      if (my_close(share->tina_write_filedes, MYF(0)))
495
383
        result_code= 1;
496
384
      share->tina_write_opened= false;
497
385
    }
498
386
 
499
 
    Tina *a_tina= static_cast<Tina *>(engine);
500
 
    a_tina->deleteOpenTable(share->table_name);
501
 
    delete share;
 
387
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
388
    thr_lock_delete(&share->lock);
 
389
    pthread_mutex_destroy(&share->mutex);
 
390
    free((unsigned char*) share);
502
391
  }
503
392
  pthread_mutex_unlock(&tina_mutex);
504
393
 
516
405
  '\r''\n' --  DOS\Windows line ending
517
406
*/
518
407
 
519
 
static off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
520
 
                            off_t end, int *eoln_len)
 
408
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
 
409
                     off_t end, int *eoln_len)
521
410
{
522
411
  *eoln_len= 0;
523
412
 
545
434
 
546
435
 
547
436
 
548
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
549
 
  :Cursor(engine_arg, table_arg),
 
437
ha_tina::ha_tina(StorageEngine *engine_arg, TABLE_SHARE *table_arg)
 
438
  :handler(engine_arg, table_arg),
550
439
  /*
551
 
    These definitions are found in Cursor.h
 
440
    These definitions are found in handler.h
552
441
    They are not probably completely right.
553
442
  */
554
443
  current_position(0), next_position(0), local_saved_data_file_length(0),
590
479
      (*field)->set_notnull();
591
480
    }
592
481
 
593
 
    /* 
594
 
      Since we are needing to "translate" the type into a string we 
595
 
      will need to do a val_str(). This would cause an assert() to
596
 
      normally occur since we are onlying "writing" values.
597
 
    */
598
 
    (*field)->setReadSet();
599
482
    (*field)->val_str(&attribute,&attribute);
600
483
 
601
484
    if (was_null)
704
587
  off_t end_offset, curr_offset= current_position;
705
588
  int eoln_len;
706
589
  int error;
 
590
  bool read_all;
707
591
 
708
 
  free_root(&blobroot, MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
592
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
709
593
 
710
594
  /*
711
595
    We do not read further then local_saved_data_file_length in order
716
600
                       local_saved_data_file_length, &eoln_len)) == 0)
717
601
    return(HA_ERR_END_OF_FILE);
718
602
 
 
603
  /* We must read all columns in case a table is opened for update */
 
604
  read_all= !table->write_set->none();
719
605
  error= HA_ERR_CRASHED_ON_USAGE;
720
606
 
721
607
  memset(buf, 0, table->s->null_bytes);
785
671
      }
786
672
    }
787
673
 
788
 
    if ((*field)->isReadSet() || (*field)->isWriteSet())
 
674
    if (read_all || table->read_set->test((*field)->field_index))
789
675
    {
790
 
      /* This masks a bug in the logic for a SELECT * */
791
 
      (*field)->setWriteSet();
792
676
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
793
677
                          CHECK_FIELD_WARN))
794
678
        goto err;
795
 
 
796
679
      if ((*field)->flags & BLOB_FLAG)
797
680
      {
798
681
        Field_blob *blob= *(Field_blob**) field;
820
703
}
821
704
 
822
705
/*
 
706
  If frm_error() is called in table.cc this is called to find out what file
 
707
  extensions exist for this handler.
 
708
*/
 
709
static const char *ha_tina_exts[] = {
 
710
  CSV_EXT,
 
711
  CSM_EXT,
 
712
  NULL
 
713
};
 
714
 
 
715
const char **ha_tina::bas_ext() const
 
716
{
 
717
  return ha_tina_exts;
 
718
}
 
719
 
 
720
/*
823
721
  Three functions below are needed to enable concurrent insert functionality
824
722
  for CSV engine. For more details see mysys/thr_lock.c
825
723
*/
892
790
  this will not be called for every request. Any sort of positions
893
791
  that need to be reset should be kept in the ::extra() call.
894
792
*/
895
 
int ha_tina::open(const char *name, int, uint32_t)
 
793
int ha_tina::open(const char *name, int, uint32_t open_options)
896
794
{
897
 
  if (!(share= get_share(name)))
 
795
  if (!(share= get_share(name, table)))
898
796
    return(ENOENT);
899
797
 
900
 
  if (share->crashed)
 
798
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
901
799
  {
902
 
    free_share();
 
800
    free_share(share);
903
801
    return(HA_ERR_CRASHED_ON_USAGE);
904
802
  }
905
803
 
906
804
  local_data_file_version= share->data_file_version;
907
 
  if ((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
805
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
908
806
    return(0);
909
807
 
910
808
  /*
911
 
    Init locking. Pass Cursor object to the locking routines,
 
809
    Init locking. Pass handler object to the locking routines,
912
810
    so that they could save/update local_saved_data_file_length value
913
811
    during locking. This is needed to enable concurrent inserts.
914
812
  */
930
828
int ha_tina::close(void)
931
829
{
932
830
  int rc= 0;
933
 
  rc= internal::my_close(data_file, MYF(0));
934
 
  return(free_share() || rc);
 
831
  rc= my_close(data_file, MYF(0));
 
832
  return(free_share(share) || rc);
935
833
}
936
834
 
937
835
/*
938
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
836
  This is an INSERT. At the moment this handler just seeks to the end
939
837
  of the file and appends the data. In an error case it really should
940
838
  just truncate to the original position (this is not done yet).
941
839
*/
946
844
  if (share->crashed)
947
845
      return(HA_ERR_CRASHED_ON_USAGE);
948
846
 
949
 
  ha_statistic_increment(&system_status_var::ha_write_count);
 
847
  ha_statistic_increment(&SSV::ha_write_count);
950
848
 
951
849
  size= encode_quote(buf);
952
850
 
955
853
      return(-1);
956
854
 
957
855
   /* use pwrite, as concurrent reader could have changed the position */
958
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
856
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
959
857
               MYF(MY_WME | MY_NABP)))
960
858
    return(-1);
961
859
 
980
878
  if (!share->update_file_opened)
981
879
  {
982
880
    if ((update_temp_file=
983
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
881
           my_create(fn_format(updated_fname, share->table_name,
984
882
                               "", CSN_EXT,
985
883
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
986
884
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
995
893
  This is called for an update.
996
894
  Make sure you put in code to increment the auto increment, also
997
895
  update any timestamp data. Currently auto increment is not being
998
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
896
  fixed since autoincrements have yet to be added to this table handler.
999
897
  This will be called in a table scan right before the previous ::rnd_next()
1000
898
  call.
1001
899
*/
1004
902
  int size;
1005
903
  int rc= -1;
1006
904
 
1007
 
  ha_statistic_increment(&system_status_var::ha_update_count);
 
905
  ha_statistic_increment(&SSV::ha_update_count);
1008
906
 
1009
907
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1010
908
    table->timestamp_field->set_time();
1024
922
  if (open_update_temp_file_if_needed())
1025
923
    goto err;
1026
924
 
1027
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
925
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
1028
926
               MYF(MY_WME | MY_NABP)))
1029
927
    goto err;
1030
928
  temp_file_length+= size;
1039
937
  Deletes a row. First the database will find the row, and then call this
1040
938
  method. In the case of a table scan, the previous call to this will be
1041
939
  the ::rnd_next() that found this row.
1042
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
940
  The exception to this is an ORDER BY. This will cause the table handler
1043
941
  to walk the table noting the positions of all rows that match a query.
1044
942
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
1045
943
  DESC, ASC).
1046
944
*/
1047
945
int ha_tina::delete_row(const unsigned char *)
1048
946
{
1049
 
  ha_statistic_increment(&system_status_var::ha_delete_count);
 
947
  ha_statistic_increment(&SSV::ha_delete_count);
1050
948
 
1051
949
  if (chain_append())
1052
950
    return(-1);
1080
978
  if (local_data_file_version != share->data_file_version)
1081
979
  {
1082
980
    local_data_file_version= share->data_file_version;
1083
 
    if (internal::my_close(data_file, MYF(0)) ||
1084
 
        (data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
981
    if (my_close(data_file, MYF(0)) ||
 
982
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
1085
983
      return 1;
1086
984
  }
1087
985
  file_buff->init_buff(data_file);
1093
991
  All table scans call this first.
1094
992
  The order of a table scan is:
1095
993
 
 
994
  ha_tina::store_lock
 
995
  ha_tina::external_lock
1096
996
  ha_tina::info
1097
997
  ha_tina::rnd_init
1098
998
  ha_tina::extra
1108
1008
  ha_tina::rnd_next
1109
1009
  ha_tina::extra
1110
1010
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1011
  ha_tina::external_lock
1111
1012
  ha_tina::extra
1112
1013
  ENUM HA_EXTRA_RESET   Reset database to after open
1113
1014
 
1128
1029
  records_is_known= 0;
1129
1030
  chain_ptr= chain;
1130
1031
 
1131
 
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE);
 
1032
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1132
1033
 
1133
1034
  return(0);
1134
1035
}
1143
1044
  reserved for null count.
1144
1045
  Basically this works as a mask for which rows are nulled (compared to just
1145
1046
  empty).
1146
 
  This table Cursor doesn't do nulls and does not know the difference between
1147
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1047
  This table handler doesn't do nulls and does not know the difference between
 
1048
  NULL and "". This is ok since this table handler is for spreadsheets and
1148
1049
  they don't know about them either :)
1149
1050
*/
1150
1051
int ha_tina::rnd_next(unsigned char *buf)
1154
1055
  if (share->crashed)
1155
1056
      return(HA_ERR_CRASHED_ON_USAGE);
1156
1057
 
1157
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1058
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1158
1059
 
1159
1060
  current_position= next_position;
1160
1061
 
1180
1081
*/
1181
1082
void ha_tina::position(const unsigned char *)
1182
1083
{
1183
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1084
  my_store_ptr(ref, ref_length, current_position);
1184
1085
  return;
1185
1086
}
1186
1087
 
1187
1088
 
1188
1089
/*
1189
1090
  Used to fetch a row from a posiion stored with ::position().
1190
 
  internal::my_get_ptr() retrieves the data for you.
 
1091
  my_get_ptr() retrieves the data for you.
1191
1092
*/
1192
1093
 
1193
1094
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1194
1095
{
1195
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1196
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1096
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1097
  current_position= (off_t)my_get_ptr(pos,ref_length);
1197
1098
  return(find_current_row(buf));
1198
1099
}
1199
1100
 
1200
1101
/*
1201
1102
  ::info() is used to return information to the optimizer.
1202
 
  Currently this table Cursor doesn't implement most of the fields
 
1103
  Currently this table handler doesn't implement most of the fields
1203
1104
  really needed. SHOW also makes use of this data
1204
1105
*/
1205
1106
int ha_tina::info(uint32_t)
1254
1155
      The sort is needed when there were updates/deletes with random orders.
1255
1156
      It sorts so that we move the firts blocks to the beginning.
1256
1157
    */
1257
 
    internal::my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1258
 
                       (qsort_cmp)sort_set);
 
1158
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1159
             (qsort_cmp)sort_set);
1259
1160
 
1260
1161
    off_t write_begin= 0, write_end;
1261
1162
 
1276
1177
      /* if there is something to write, write it */
1277
1178
      if (write_length)
1278
1179
      {
1279
 
        if (internal::my_write(update_temp_file,
 
1180
        if (my_write(update_temp_file,
1280
1181
                     (unsigned char*) (file_buff->ptr() +
1281
1182
                               (write_begin - file_buff->start())),
1282
1183
                     (size_t)write_length, MYF_RW))
1299
1200
 
1300
1201
    }
1301
1202
 
1302
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1303
 
        internal::my_close(update_temp_file, MYF(0)))
 
1203
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1204
        my_close(update_temp_file, MYF(0)))
1304
1205
      return(-1);
1305
1206
 
1306
1207
    share->update_file_opened= false;
1307
1208
 
1308
1209
    if (share->tina_write_opened)
1309
1210
    {
1310
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1211
      if (my_close(share->tina_write_filedes, MYF(0)))
1311
1212
        return(-1);
1312
1213
      /*
1313
1214
        Mark that the writer fd is closed, so that init_tina_writer()
1320
1221
      Close opened fildes's. Then move updated file in place
1321
1222
      of the old datafile.
1322
1223
    */
1323
 
    if (internal::my_close(data_file, MYF(0)) ||
1324
 
        internal::my_rename(internal::fn_format(updated_fname,
1325
 
                                                share->table_name.c_str(),
1326
 
                                                "", CSN_EXT,
1327
 
                                                MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1328
 
                            share->data_file_name, MYF(0)))
 
1224
    if (my_close(data_file, MYF(0)) ||
 
1225
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1226
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1227
                  share->data_file_name, MYF(0)))
1329
1228
      return(-1);
1330
1229
 
1331
1230
    /* Open the file again */
1332
 
    if (((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
 
1231
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1333
1232
      return(-1);
1334
1233
    /*
1335
1234
      As we reopened the data file, increase share->data_file_version
1356
1255
 
1357
1256
  return(0);
1358
1257
error:
1359
 
  internal::my_close(update_temp_file, MYF(0));
 
1258
  my_close(update_temp_file, MYF(0));
1360
1259
  share->update_file_opened= false;
1361
1260
  return(-1);
1362
1261
}
1363
1262
 
1364
1263
 
1365
1264
/*
 
1265
  Repair CSV table in the case, it is crashed.
 
1266
 
 
1267
  SYNOPSIS
 
1268
    repair()
 
1269
    session         The thread, performing repair
 
1270
    check_opt   The options for repair. We do not use it currently.
 
1271
 
 
1272
  DESCRIPTION
 
1273
    If the file is empty, change # of rows in the file and complete recovery.
 
1274
    Otherwise, scan the table looking for bad rows. If none were found,
 
1275
    we mark file as a good one and return. If a bad row was encountered,
 
1276
    we truncate the datafile up to the last good row.
 
1277
 
 
1278
   TODO: Make repair more clever - it should try to recover subsequent
 
1279
         rows (after the first bad one) as well.
 
1280
*/
 
1281
 
 
1282
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
 
1283
{
 
1284
  char repaired_fname[FN_REFLEN];
 
1285
  unsigned char *buf;
 
1286
  File repair_file;
 
1287
  int rc;
 
1288
  ha_rows rows_repaired= 0;
 
1289
  off_t write_begin= 0, write_end;
 
1290
 
 
1291
  /* empty file */
 
1292
  if (!share->saved_data_file_length)
 
1293
  {
 
1294
    share->rows_recorded= 0;
 
1295
    goto end;
 
1296
  }
 
1297
 
 
1298
  /* Don't assert in field::val() functions */
 
1299
  table->use_all_columns();
 
1300
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1301
    return(HA_ERR_OUT_OF_MEM);
 
1302
 
 
1303
  /* position buffer to the start of the file */
 
1304
  if (init_data_file())
 
1305
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1306
 
 
1307
  /*
 
1308
    Local_saved_data_file_length is initialized during the lock phase.
 
1309
    Sometimes this is not getting executed before ::repair (e.g. for
 
1310
    the log tables). We set it manually here.
 
1311
  */
 
1312
  local_saved_data_file_length= share->saved_data_file_length;
 
1313
  /* set current position to the beginning of the file */
 
1314
  current_position= next_position= 0;
 
1315
 
 
1316
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1317
 
 
1318
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1319
  while (!(rc= find_current_row(buf)))
 
1320
  {
 
1321
    session_inc_row_count(session);
 
1322
    rows_repaired++;
 
1323
    current_position= next_position;
 
1324
  }
 
1325
 
 
1326
  free_root(&blobroot, MYF(0));
 
1327
 
 
1328
  free((char*)buf);
 
1329
 
 
1330
  if (rc == HA_ERR_END_OF_FILE)
 
1331
  {
 
1332
    /*
 
1333
      All rows were read ok until end of file, the file does not need repair.
 
1334
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1335
      to the current amount of rows.
 
1336
    */
 
1337
    share->rows_recorded= rows_repaired;
 
1338
    goto end;
 
1339
  }
 
1340
 
 
1341
  /*
 
1342
    Otherwise we've encountered a bad row => repair is needed.
 
1343
    Let us create a temporary file.
 
1344
  */
 
1345
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1346
                                        "", CSN_EXT,
 
1347
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1348
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1349
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1350
 
 
1351
  file_buff->init_buff(data_file);
 
1352
 
 
1353
 
 
1354
  /* we just truncated the file up to the first bad row. update rows count. */
 
1355
  share->rows_recorded= rows_repaired;
 
1356
 
 
1357
  /* write repaired file */
 
1358
  while (1)
 
1359
  {
 
1360
    write_end= std::min(file_buff->end(), current_position);
 
1361
 
 
1362
    off_t write_length= write_end - write_begin;
 
1363
    if ((uint64_t)write_length > SIZE_MAX)
 
1364
    {
 
1365
      return -1;
 
1366
    }
 
1367
    if ((write_length) &&
 
1368
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1369
                  (size_t)write_length, MYF_RW)))
 
1370
      return(-1);
 
1371
 
 
1372
    write_begin= write_end;
 
1373
    if (write_end== current_position)
 
1374
      break;
 
1375
    else
 
1376
      file_buff->read_next(); /* shift the buffer */
 
1377
  }
 
1378
 
 
1379
  /*
 
1380
    Close the files and rename repaired file to the datafile.
 
1381
    We have to close the files, as on Windows one cannot rename
 
1382
    a file, which descriptor is still open. EACCES will be returned
 
1383
    when trying to delete the "to"-file in my_rename().
 
1384
  */
 
1385
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1386
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1387
    return(-1);
 
1388
 
 
1389
  /* Open the file again, it should now be repaired */
 
1390
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1391
                          MYF(0))) == -1)
 
1392
     return(-1);
 
1393
 
 
1394
  /* Set new file size. The file size will be updated by ::update_status() */
 
1395
  local_saved_data_file_length= (size_t) current_position;
 
1396
 
 
1397
end:
 
1398
  share->crashed= false;
 
1399
  return(HA_ADMIN_OK);
 
1400
}
 
1401
 
 
1402
/*
1366
1403
  DELETE without WHERE calls this
1367
1404
*/
1368
1405
 
1371
1408
  int rc;
1372
1409
 
1373
1410
  if (!records_is_known)
1374
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1411
    return(my_errno=HA_ERR_WRONG_COMMAND);
1375
1412
 
1376
1413
  if (!share->tina_write_opened)
1377
1414
    if (init_tina_writer())
1390
1427
}
1391
1428
 
1392
1429
/*
 
1430
  Called by the database to lock the table. Keep in mind that this
 
1431
  is an internal lock.
 
1432
*/
 
1433
THR_LOCK_DATA **ha_tina::store_lock(Session *,
 
1434
                                    THR_LOCK_DATA **to,
 
1435
                                    enum thr_lock_type lock_type)
 
1436
{
 
1437
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1438
    lock.type=lock_type;
 
1439
  *to++= &lock;
 
1440
  return to;
 
1441
}
 
1442
 
 
1443
/*
1393
1444
  Create a table. You do not want to leave the table open after a call to
1394
1445
  this (the database will call ::open() if it needs to).
1395
1446
*/
1396
1447
 
1397
 
int Tina::doCreateTable(Session &session,
1398
 
                        Table& table_arg,
1399
 
                        drizzled::TableIdentifier &identifier,
1400
 
                        drizzled::message::Table &create_proto)
 
1448
int ha_tina::create(const char *name, Table *table_arg, HA_CREATE_INFO *)
1401
1449
{
1402
1450
  char name_buff[FN_REFLEN];
1403
 
  int create_file;
 
1451
  File create_file;
1404
1452
 
1405
1453
  /*
1406
1454
    check columns
1407
1455
  */
1408
 
  for (Field **field= table_arg.s->field; *field; field++)
 
1456
  for (Field **field= table_arg->s->field; *field; field++)
1409
1457
  {
1410
1458
    if ((*field)->real_maybe_null())
1411
1459
    {
1415
1463
  }
1416
1464
 
1417
1465
 
1418
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1419
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1420
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1466
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1467
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1468
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1421
1469
    return(-1);
1422
1470
 
1423
1471
  write_meta_file(create_file, 0, false);
1424
 
  internal::my_close(create_file, MYF(0));
 
1472
  my_close(create_file, MYF(0));
1425
1473
 
1426
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1427
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1428
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1474
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1475
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1476
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1429
1477
    return(-1);
1430
1478
 
1431
 
  internal::my_close(create_file, MYF(0));
1432
 
 
1433
 
  session.storeTableMessage(identifier, create_proto);
1434
 
 
1435
 
  return 0;
1436
 
}
1437
 
 
1438
 
 
1439
 
DRIZZLE_DECLARE_PLUGIN
1440
 
{
1441
 
  DRIZZLE_VERSION_ID,
 
1479
  my_close(create_file, MYF(0));
 
1480
 
 
1481
  return(0);
 
1482
}
 
1483
 
 
1484
int ha_tina::check(Session* session, HA_CHECK_OPT *)
 
1485
{
 
1486
  int rc= 0;
 
1487
  unsigned char *buf;
 
1488
  const char *old_proc_info;
 
1489
  ha_rows count= share->rows_recorded;
 
1490
 
 
1491
  old_proc_info= get_session_proc_info(session);
 
1492
  set_session_proc_info(session, "Checking table");
 
1493
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1494
    return(HA_ERR_OUT_OF_MEM);
 
1495
 
 
1496
  /* position buffer to the start of the file */
 
1497
   if (init_data_file())
 
1498
     return(HA_ERR_CRASHED);
 
1499
 
 
1500
  /*
 
1501
    Local_saved_data_file_length is initialized during the lock phase.
 
1502
    Check does not use store_lock in certain cases. So, we set it
 
1503
    manually here.
 
1504
  */
 
1505
  local_saved_data_file_length= share->saved_data_file_length;
 
1506
  /* set current position to the beginning of the file */
 
1507
  current_position= next_position= 0;
 
1508
 
 
1509
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1510
 
 
1511
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1512
  while (!(rc= find_current_row(buf)))
 
1513
  {
 
1514
    session_inc_row_count(session);
 
1515
    count--;
 
1516
    current_position= next_position;
 
1517
  }
 
1518
 
 
1519
  free_root(&blobroot, MYF(0));
 
1520
 
 
1521
  free((char*)buf);
 
1522
  set_session_proc_info(session, old_proc_info);
 
1523
 
 
1524
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1525
  {
 
1526
    share->crashed= true;
 
1527
    return(HA_ADMIN_CORRUPT);
 
1528
  }
 
1529
  else
 
1530
    return(HA_ADMIN_OK);
 
1531
}
 
1532
 
 
1533
 
 
1534
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *, uint32_t)
 
1535
{
 
1536
  return COMPATIBLE_DATA_YES;
 
1537
}
 
1538
 
 
1539
drizzle_declare_plugin(csv)
 
1540
{
1442
1541
  "CSV",
1443
1542
  "1.0",
1444
1543
  "Brian Aker, MySQL AB",
1445
1544
  "CSV storage engine",
1446
1545
  PLUGIN_LICENSE_GPL,
1447
1546
  tina_init_func, /* Plugin Init */
 
1547
  tina_done_func, /* Plugin Deinit */
 
1548
  NULL,                       /* status variables                */
1448
1549
  NULL,                       /* system variables                */
1449
1550
  NULL                        /* config options                  */
1450
1551
}
1451
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1552
drizzle_declare_plugin_end;
1452
1553