~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

  • Committer: Monty Taylor
  • Date: 2009-08-17 18:46:08 UTC
  • mto: (1182.1.1 staging)
  • mto: This revision was merged to the branch mainline in revision 1183.
  • Revision ID: mordred@inaugust.com-20090817184608-0b2emowpjr9m6le7
"Fixed" the deadlock test. I'd still like someone to look at what's going on here.

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
46
#include <drizzled/field/timestamp.h>
47
47
#include <drizzled/error.h>
48
48
#include <drizzled/table.h>
49
49
#include <drizzled/session.h>
50
 
#include "drizzled/internal/my_sys.h"
51
50
 
52
51
#include "ha_tina.h"
53
52
 
54
 
#include <fcntl.h>
55
 
 
56
53
#include <string>
57
 
#include <map>
58
54
 
59
55
using namespace std;
60
 
using namespace drizzled;
 
56
 
 
57
static const string engine_name("CSV");
61
58
 
62
59
/*
63
60
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
73
70
#define CSM_EXT ".CSM"               // Meta file
74
71
 
75
72
 
76
 
static int read_meta_file(int meta_file, ha_rows *rows);
77
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
73
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
74
static int free_share(TINA_SHARE *share);
 
75
static int read_meta_file(File meta_file, ha_rows *rows);
 
76
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
78
77
 
79
 
void tina_get_status(void* param, int concurrent_insert);
80
 
void tina_update_status(void* param);
81
 
bool tina_check_status(void* param);
 
78
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
79
extern "C" void tina_update_status(void* param);
 
80
extern "C" bool tina_check_status(void* param);
82
81
 
83
82
/* Stuff for shares */
84
83
pthread_mutex_t tina_mutex;
 
84
static HASH tina_open_tables;
85
85
 
86
86
/*****************************************************************************
87
87
 ** TINA tables
99
99
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
100
100
}
101
101
 
 
102
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
103
{
 
104
  *length=share->table_name_length;
 
105
  return (unsigned char*) share->table_name;
 
106
}
 
107
 
102
108
 
103
109
/*
104
110
  If frm_error() is called in table.cc this is called to find out what file
105
 
  extensions exist for this Cursor.
 
111
  extensions exist for this handler.
106
112
*/
107
113
static const char *ha_tina_exts[] = {
108
114
  CSV_EXT,
110
116
  NULL
111
117
};
112
118
 
113
 
class Tina : public drizzled::plugin::StorageEngine
 
119
class Tina : public StorageEngine
114
120
{
115
 
  typedef std::map<string, TinaShare*> TinaMap;
116
 
  TinaMap tina_open_tables;
117
121
public:
118
122
  Tina(const string& name_arg)
119
 
   : drizzled::plugin::StorageEngine(name_arg,
120
 
                                     HTON_TEMPORARY_ONLY |
121
 
                                     HTON_NO_AUTO_INCREMENT |
122
 
                                     HTON_SKIP_STORE_LOCK |
123
 
                                     HTON_FILE_BASED),
124
 
    tina_open_tables()
125
 
  {}
126
 
  virtual ~Tina()
127
 
  {
128
 
    pthread_mutex_destroy(&tina_mutex);
129
 
  }
130
 
 
131
 
  virtual Cursor *create(TableShare &table,
132
 
                         drizzled::memory::Root *mem_root)
133
 
  {
134
 
    return new (mem_root) ha_tina(*this, table);
 
123
   : StorageEngine(name_arg, HTON_CAN_RECREATE | HTON_TEMPORARY_ONLY | HTON_FILE_BASED) {}
 
124
  virtual handler *create(TableShare *table,
 
125
                          MEM_ROOT *mem_root)
 
126
  {
 
127
    return new (mem_root) ha_tina(this, table);
135
128
  }
136
129
 
137
130
  const char **bas_ext() const {
138
131
    return ha_tina_exts;
139
132
  }
140
133
 
141
 
  int doCreateTable(Session &,
142
 
                    Table &table_arg,
143
 
                    drizzled::TableIdentifier &identifier,
144
 
                    drizzled::message::Table&);
145
 
 
146
 
  int doGetTableDefinition(Session& session,
147
 
                           TableIdentifier &identifier,
148
 
                           drizzled::message::Table &table_message);
149
 
 
150
 
  /* Temp only engine, so do not return values. */
151
 
  void doGetTableNames(drizzled::CachedDirectory &, SchemaIdentifier&, set<string>&) { };
152
 
 
153
 
  int doDropTable(Session&, TableIdentifier &identifier);
154
 
  TinaShare *findOpenTable(const string table_name);
155
 
  void addOpenTable(const string &table_name, TinaShare *);
156
 
  void deleteOpenTable(const string &table_name);
157
 
 
158
 
 
159
 
  uint32_t max_keys()          const { return 0; }
160
 
  uint32_t max_key_parts()     const { return 0; }
161
 
  uint32_t max_key_length()    const { return 0; }
162
 
  bool doDoesTableExist(Session& session, TableIdentifier &identifier);
163
 
  int doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to);
164
 
 
165
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
166
 
                             drizzled::SchemaIdentifier &schema_identifier,
167
 
                             drizzled::TableIdentifiers &set_of_identifiers);
 
134
  int createTableImplementation(Session *, const char *table_name,
 
135
                                Table *table_arg,
 
136
                                HA_CREATE_INFO *, drizzled::message::Table*);
 
137
 
168
138
};
169
139
 
170
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
171
 
                                 drizzled::SchemaIdentifier&,
172
 
                                 drizzled::TableIdentifiers&)
173
 
{
174
 
}
175
 
 
176
 
int Tina::doRenameTable(Session &session,
177
 
                        TableIdentifier &from, TableIdentifier &to)
178
 
{
179
 
  int error= 0;
180
 
  for (const char **ext= bas_ext(); *ext ; ext++)
181
 
  {
182
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
183
 
    {
184
 
      if ((error=errno) != ENOENT)
185
 
        break;
186
 
      error= 0;
187
 
    }
188
 
  }
189
 
 
190
 
  session.renameTableMessage(from, to);
191
 
 
192
 
  return error;
193
 
}
194
 
 
195
 
bool Tina::doDoesTableExist(Session &session, TableIdentifier &identifier)
196
 
{
197
 
  return session.doesTableMessageExist(identifier);
198
 
}
199
 
 
200
 
 
201
 
int Tina::doDropTable(Session &session,
202
 
                      TableIdentifier &identifier)
203
 
{
204
 
  int error= 0;
205
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
206
 
  char buff[FN_REFLEN];
207
 
 
208
 
  for (const char **ext= bas_ext(); *ext ; ext++)
209
 
  {
210
 
    internal::fn_format(buff, identifier.getPath().c_str(), "", *ext,
211
 
                        MY_UNPACK_FILENAME|MY_APPEND_EXT);
212
 
    if (internal::my_delete_with_symlink(buff, MYF(0)))
213
 
    {
214
 
      if ((error= errno) != ENOENT)
215
 
        break;
216
 
    }
217
 
    else
218
 
      enoent_or_zero= 0;                        // No error for ENOENT
219
 
    error= enoent_or_zero;
220
 
  }
221
 
 
222
 
  session.removeTableMessage(identifier);
223
 
 
224
 
  return error;
225
 
}
226
 
 
227
 
TinaShare *Tina::findOpenTable(const string table_name)
228
 
{
229
 
  TinaMap::iterator find_iter=
230
 
    tina_open_tables.find(table_name);
231
 
 
232
 
  if (find_iter != tina_open_tables.end())
233
 
    return (*find_iter).second;
234
 
  else
235
 
    return NULL;
236
 
}
237
 
 
238
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
239
 
{
240
 
  tina_open_tables[table_name]= share;
241
 
}
242
 
 
243
 
void Tina::deleteOpenTable(const string &table_name)
244
 
{
245
 
  tina_open_tables.erase(table_name);
246
 
}
247
 
 
248
 
 
249
 
int Tina::doGetTableDefinition(Session &session,
250
 
                               drizzled::TableIdentifier &identifier,
251
 
                               drizzled::message::Table &table_message)
252
 
{
253
 
  if (session.getTableMessage(identifier, table_message))
254
 
    return EEXIST;
255
 
 
256
 
  return ENOENT;
257
 
}
258
 
 
259
 
 
260
140
static Tina *tina_engine= NULL;
261
141
 
262
 
static int tina_init_func(drizzled::plugin::Context &context)
 
142
static int tina_init_func(PluginRegistry &registry)
263
143
{
264
144
 
265
 
  tina_engine= new Tina("CSV");
266
 
  context.add(tina_engine);
 
145
  tina_engine= new Tina(engine_name);
 
146
  registry.add(tina_engine);
267
147
 
268
148
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
269
 
  return 0;
270
 
}
271
 
 
272
 
 
273
 
 
274
 
TinaShare::TinaShare(const char *table_name_arg)
275
 
  : table_name(table_name_arg), use_count(0), saved_data_file_length(0),
276
 
    update_file_opened(false), tina_write_opened(false),
277
 
    crashed(false), rows_recorded(0), data_file_version(0)
278
 
{
279
 
  thr_lock_init(&lock);
280
 
  internal::fn_format(data_file_name, table_name_arg, "", CSV_EXT,
281
 
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);
282
 
}
283
 
 
284
 
TinaShare::~TinaShare()
285
 
{
286
 
  thr_lock_delete(&lock);
287
 
  pthread_mutex_destroy(&mutex);
288
 
}
 
149
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
150
                   (hash_get_key) tina_get_key,0,0);
 
151
  return 0;
 
152
}
 
153
 
 
154
static int tina_done_func(PluginRegistry &registry)
 
155
{
 
156
  registry.remove(tina_engine);
 
157
  delete tina_engine;
 
158
 
 
159
  hash_free(&tina_open_tables);
 
160
  pthread_mutex_destroy(&tina_mutex);
 
161
 
 
162
  return 0;
 
163
}
 
164
 
289
165
 
290
166
/*
291
167
  Simple lock controls.
292
168
*/
293
 
TinaShare *ha_tina::get_share(const char *table_name)
 
169
static TINA_SHARE *get_share(const char *table_name, Table *)
294
170
{
295
 
  pthread_mutex_lock(&tina_mutex);
296
 
 
297
 
  Tina *a_tina= static_cast<Tina *>(engine);
298
 
  share= a_tina->findOpenTable(table_name);
299
 
 
 
171
  TINA_SHARE *share;
300
172
  char meta_file_name[FN_REFLEN];
301
173
  struct stat file_stat;
 
174
  char *tmp_name;
 
175
  uint32_t length;
 
176
 
 
177
  pthread_mutex_lock(&tina_mutex);
 
178
  length=(uint) strlen(table_name);
302
179
 
303
180
  /*
304
181
    If share is not present in the hash, create a new share and
305
182
    initialize its members.
306
183
  */
307
 
  if (! share)
 
184
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
185
                                        (unsigned char*) table_name,
 
186
                                       length)))
308
187
  {
309
 
    share= new TinaShare(table_name);
310
 
 
311
 
    if (share == NULL)
 
188
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
189
                         &share, sizeof(*share),
 
190
                         &tmp_name, length+1,
 
191
                         NULL))
312
192
    {
313
193
      pthread_mutex_unlock(&tina_mutex);
314
194
      return NULL;
315
195
    }
316
196
 
317
 
    internal::fn_format(meta_file_name, table_name, "", CSM_EXT,
 
197
    share->use_count= 0;
 
198
    share->table_name_length= length;
 
199
    share->table_name= tmp_name;
 
200
    share->crashed= false;
 
201
    share->rows_recorded= 0;
 
202
    share->update_file_opened= false;
 
203
    share->tina_write_opened= false;
 
204
    share->data_file_version= 0;
 
205
    strcpy(share->table_name, table_name);
 
206
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
207
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
208
    fn_format(meta_file_name, table_name, "", CSM_EXT,
318
209
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
319
210
 
320
211
    if (stat(share->data_file_name, &file_stat))
321
 
    {
322
 
      pthread_mutex_unlock(&tina_mutex);
323
 
      delete share;
324
 
      return NULL;
325
 
    }
326
 
  
 
212
      goto error;
327
213
    share->saved_data_file_length= file_stat.st_size;
328
214
 
329
 
    a_tina->addOpenTable(share->table_name, share);
330
 
 
 
215
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
216
      goto error;
 
217
    thr_lock_init(&share->lock);
331
218
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
332
219
 
333
220
    /*
336
223
      Usually this will result in auto-repair, and we will get a good
337
224
      meta-file in the end.
338
225
    */
339
 
    if ((share->meta_file= internal::my_open(meta_file_name,
340
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
226
    if ((share->meta_file= my_open(meta_file_name,
 
227
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
341
228
      share->crashed= true;
342
229
 
343
230
    /*
351
238
  pthread_mutex_unlock(&tina_mutex);
352
239
 
353
240
  return share;
 
241
 
 
242
error:
 
243
  pthread_mutex_unlock(&tina_mutex);
 
244
  free((unsigned char*) share);
 
245
 
 
246
  return NULL;
354
247
}
355
248
 
356
249
 
373
266
    non-zero - error occurred
374
267
*/
375
268
 
376
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
269
static int read_meta_file(File meta_file, ha_rows *rows)
377
270
{
378
271
  unsigned char meta_buffer[META_BUFFER_SIZE];
379
272
  unsigned char *ptr= meta_buffer;
380
273
 
381
274
  lseek(meta_file, 0, SEEK_SET);
382
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
275
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
383
276
      != META_BUFFER_SIZE)
384
277
    return(HA_ERR_CRASHED_ON_USAGE);
385
278
 
401
294
      ((bool)(*ptr)== true))
402
295
    return(HA_ERR_CRASHED_ON_USAGE);
403
296
 
404
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
297
  my_sync(meta_file, MYF(MY_WME));
405
298
 
406
299
  return(0);
407
300
}
426
319
    non-zero - error occurred
427
320
*/
428
321
 
429
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
322
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
430
323
{
431
324
  unsigned char meta_buffer[META_BUFFER_SIZE];
432
325
  unsigned char *ptr= meta_buffer;
446
339
  *ptr= (unsigned char)dirty;
447
340
 
448
341
  lseek(meta_file, 0, SEEK_SET);
449
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
342
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
450
343
      != META_BUFFER_SIZE)
451
344
    return(-1);
452
345
 
453
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
346
  my_sync(meta_file, MYF(MY_WME));
454
347
 
455
348
  return(0);
456
349
}
457
350
 
 
351
bool ha_tina::check_and_repair(Session *session)
 
352
{
 
353
  HA_CHECK_OPT check_opt;
 
354
 
 
355
  check_opt.init();
 
356
 
 
357
  return(repair(session, &check_opt));
 
358
}
 
359
 
 
360
 
458
361
int ha_tina::init_tina_writer()
459
362
{
460
363
  /*
465
368
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
466
369
 
467
370
  if ((share->tina_write_filedes=
468
 
        internal::my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
 
371
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
469
372
  {
470
373
    share->crashed= true;
471
374
    return(1);
476
379
}
477
380
 
478
381
 
 
382
bool ha_tina::is_crashed() const
 
383
{
 
384
  return(share->crashed);
 
385
}
 
386
 
479
387
/*
480
388
  Free lock controls.
481
389
*/
482
 
int ha_tina::free_share()
 
390
static int free_share(TINA_SHARE *share)
483
391
{
484
392
  pthread_mutex_lock(&tina_mutex);
485
393
  int result_code= 0;
487
395
    /* Write the meta file. Mark it as crashed if needed. */
488
396
    (void)write_meta_file(share->meta_file, share->rows_recorded,
489
397
                          share->crashed ? true :false);
490
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
398
    if (my_close(share->meta_file, MYF(0)))
491
399
      result_code= 1;
492
400
    if (share->tina_write_opened)
493
401
    {
494
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
402
      if (my_close(share->tina_write_filedes, MYF(0)))
495
403
        result_code= 1;
496
404
      share->tina_write_opened= false;
497
405
    }
498
406
 
499
 
    Tina *a_tina= static_cast<Tina *>(engine);
500
 
    a_tina->deleteOpenTable(share->table_name);
501
 
    delete share;
 
407
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
408
    thr_lock_delete(&share->lock);
 
409
    pthread_mutex_destroy(&share->mutex);
 
410
    free((unsigned char*) share);
502
411
  }
503
412
  pthread_mutex_unlock(&tina_mutex);
504
413
 
545
454
 
546
455
 
547
456
 
548
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
549
 
  :Cursor(engine_arg, table_arg),
 
457
ha_tina::ha_tina(StorageEngine *engine_arg, TableShare *table_arg)
 
458
  :handler(engine_arg, table_arg),
550
459
  /*
551
 
    These definitions are found in Cursor.h
 
460
    These definitions are found in handler.h
552
461
    They are not probably completely right.
553
462
  */
554
463
  current_position(0), next_position(0), local_saved_data_file_length(0),
705
614
  int eoln_len;
706
615
  int error;
707
616
 
708
 
  free_root(&blobroot, MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
617
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
709
618
 
710
619
  /*
711
620
    We do not read further then local_saved_data_file_length in order
892
801
  this will not be called for every request. Any sort of positions
893
802
  that need to be reset should be kept in the ::extra() call.
894
803
*/
895
 
int ha_tina::open(const char *name, int, uint32_t)
 
804
int ha_tina::open(const char *name, int, uint32_t open_options)
896
805
{
897
 
  if (!(share= get_share(name)))
 
806
  if (!(share= get_share(name, table)))
898
807
    return(ENOENT);
899
808
 
900
 
  if (share->crashed)
 
809
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
901
810
  {
902
 
    free_share();
 
811
    free_share(share);
903
812
    return(HA_ERR_CRASHED_ON_USAGE);
904
813
  }
905
814
 
906
815
  local_data_file_version= share->data_file_version;
907
 
  if ((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
816
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
908
817
    return(0);
909
818
 
910
819
  /*
911
 
    Init locking. Pass Cursor object to the locking routines,
 
820
    Init locking. Pass handler object to the locking routines,
912
821
    so that they could save/update local_saved_data_file_length value
913
822
    during locking. This is needed to enable concurrent inserts.
914
823
  */
930
839
int ha_tina::close(void)
931
840
{
932
841
  int rc= 0;
933
 
  rc= internal::my_close(data_file, MYF(0));
934
 
  return(free_share() || rc);
 
842
  rc= my_close(data_file, MYF(0));
 
843
  return(free_share(share) || rc);
935
844
}
936
845
 
937
846
/*
938
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
847
  This is an INSERT. At the moment this handler just seeks to the end
939
848
  of the file and appends the data. In an error case it really should
940
849
  just truncate to the original position (this is not done yet).
941
850
*/
946
855
  if (share->crashed)
947
856
      return(HA_ERR_CRASHED_ON_USAGE);
948
857
 
949
 
  ha_statistic_increment(&system_status_var::ha_write_count);
 
858
  ha_statistic_increment(&SSV::ha_write_count);
950
859
 
951
860
  size= encode_quote(buf);
952
861
 
955
864
      return(-1);
956
865
 
957
866
   /* use pwrite, as concurrent reader could have changed the position */
958
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
867
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
959
868
               MYF(MY_WME | MY_NABP)))
960
869
    return(-1);
961
870
 
980
889
  if (!share->update_file_opened)
981
890
  {
982
891
    if ((update_temp_file=
983
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
892
           my_create(fn_format(updated_fname, share->table_name,
984
893
                               "", CSN_EXT,
985
894
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
986
895
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
995
904
  This is called for an update.
996
905
  Make sure you put in code to increment the auto increment, also
997
906
  update any timestamp data. Currently auto increment is not being
998
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
907
  fixed since autoincrements have yet to be added to this table handler.
999
908
  This will be called in a table scan right before the previous ::rnd_next()
1000
909
  call.
1001
910
*/
1004
913
  int size;
1005
914
  int rc= -1;
1006
915
 
1007
 
  ha_statistic_increment(&system_status_var::ha_update_count);
 
916
  ha_statistic_increment(&SSV::ha_update_count);
1008
917
 
1009
918
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1010
919
    table->timestamp_field->set_time();
1024
933
  if (open_update_temp_file_if_needed())
1025
934
    goto err;
1026
935
 
1027
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
936
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
1028
937
               MYF(MY_WME | MY_NABP)))
1029
938
    goto err;
1030
939
  temp_file_length+= size;
1039
948
  Deletes a row. First the database will find the row, and then call this
1040
949
  method. In the case of a table scan, the previous call to this will be
1041
950
  the ::rnd_next() that found this row.
1042
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
951
  The exception to this is an ORDER BY. This will cause the table handler
1043
952
  to walk the table noting the positions of all rows that match a query.
1044
953
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
1045
954
  DESC, ASC).
1046
955
*/
1047
956
int ha_tina::delete_row(const unsigned char *)
1048
957
{
1049
 
  ha_statistic_increment(&system_status_var::ha_delete_count);
 
958
  ha_statistic_increment(&SSV::ha_delete_count);
1050
959
 
1051
960
  if (chain_append())
1052
961
    return(-1);
1080
989
  if (local_data_file_version != share->data_file_version)
1081
990
  {
1082
991
    local_data_file_version= share->data_file_version;
1083
 
    if (internal::my_close(data_file, MYF(0)) ||
1084
 
        (data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
992
    if (my_close(data_file, MYF(0)) ||
 
993
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
1085
994
      return 1;
1086
995
  }
1087
996
  file_buff->init_buff(data_file);
1093
1002
  All table scans call this first.
1094
1003
  The order of a table scan is:
1095
1004
 
 
1005
  ha_tina::store_lock
 
1006
  ha_tina::external_lock
1096
1007
  ha_tina::info
1097
1008
  ha_tina::rnd_init
1098
1009
  ha_tina::extra
1108
1019
  ha_tina::rnd_next
1109
1020
  ha_tina::extra
1110
1021
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1022
  ha_tina::external_lock
1111
1023
  ha_tina::extra
1112
1024
  ENUM HA_EXTRA_RESET   Reset database to after open
1113
1025
 
1128
1040
  records_is_known= 0;
1129
1041
  chain_ptr= chain;
1130
1042
 
1131
 
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE);
 
1043
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1132
1044
 
1133
1045
  return(0);
1134
1046
}
1143
1055
  reserved for null count.
1144
1056
  Basically this works as a mask for which rows are nulled (compared to just
1145
1057
  empty).
1146
 
  This table Cursor doesn't do nulls and does not know the difference between
1147
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1058
  This table handler doesn't do nulls and does not know the difference between
 
1059
  NULL and "". This is ok since this table handler is for spreadsheets and
1148
1060
  they don't know about them either :)
1149
1061
*/
1150
1062
int ha_tina::rnd_next(unsigned char *buf)
1154
1066
  if (share->crashed)
1155
1067
      return(HA_ERR_CRASHED_ON_USAGE);
1156
1068
 
1157
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1069
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1158
1070
 
1159
1071
  current_position= next_position;
1160
1072
 
1180
1092
*/
1181
1093
void ha_tina::position(const unsigned char *)
1182
1094
{
1183
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1095
  my_store_ptr(ref, ref_length, current_position);
1184
1096
  return;
1185
1097
}
1186
1098
 
1187
1099
 
1188
1100
/*
1189
1101
  Used to fetch a row from a posiion stored with ::position().
1190
 
  internal::my_get_ptr() retrieves the data for you.
 
1102
  my_get_ptr() retrieves the data for you.
1191
1103
*/
1192
1104
 
1193
1105
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1194
1106
{
1195
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1196
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1107
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1108
  current_position= (off_t)my_get_ptr(pos,ref_length);
1197
1109
  return(find_current_row(buf));
1198
1110
}
1199
1111
 
1200
1112
/*
1201
1113
  ::info() is used to return information to the optimizer.
1202
 
  Currently this table Cursor doesn't implement most of the fields
 
1114
  Currently this table handler doesn't implement most of the fields
1203
1115
  really needed. SHOW also makes use of this data
1204
1116
*/
1205
1117
int ha_tina::info(uint32_t)
1254
1166
      The sort is needed when there were updates/deletes with random orders.
1255
1167
      It sorts so that we move the firts blocks to the beginning.
1256
1168
    */
1257
 
    internal::my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1258
 
                       (qsort_cmp)sort_set);
 
1169
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1170
             (qsort_cmp)sort_set);
1259
1171
 
1260
1172
    off_t write_begin= 0, write_end;
1261
1173
 
1276
1188
      /* if there is something to write, write it */
1277
1189
      if (write_length)
1278
1190
      {
1279
 
        if (internal::my_write(update_temp_file,
 
1191
        if (my_write(update_temp_file,
1280
1192
                     (unsigned char*) (file_buff->ptr() +
1281
1193
                               (write_begin - file_buff->start())),
1282
1194
                     (size_t)write_length, MYF_RW))
1299
1211
 
1300
1212
    }
1301
1213
 
1302
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1303
 
        internal::my_close(update_temp_file, MYF(0)))
 
1214
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1215
        my_close(update_temp_file, MYF(0)))
1304
1216
      return(-1);
1305
1217
 
1306
1218
    share->update_file_opened= false;
1307
1219
 
1308
1220
    if (share->tina_write_opened)
1309
1221
    {
1310
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1222
      if (my_close(share->tina_write_filedes, MYF(0)))
1311
1223
        return(-1);
1312
1224
      /*
1313
1225
        Mark that the writer fd is closed, so that init_tina_writer()
1320
1232
      Close opened fildes's. Then move updated file in place
1321
1233
      of the old datafile.
1322
1234
    */
1323
 
    if (internal::my_close(data_file, MYF(0)) ||
1324
 
        internal::my_rename(internal::fn_format(updated_fname,
1325
 
                                                share->table_name.c_str(),
1326
 
                                                "", CSN_EXT,
1327
 
                                                MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1328
 
                            share->data_file_name, MYF(0)))
 
1235
    if (my_close(data_file, MYF(0)) ||
 
1236
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1237
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1238
                  share->data_file_name, MYF(0)))
1329
1239
      return(-1);
1330
1240
 
1331
1241
    /* Open the file again */
1332
 
    if (((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
 
1242
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1333
1243
      return(-1);
1334
1244
    /*
1335
1245
      As we reopened the data file, increase share->data_file_version
1356
1266
 
1357
1267
  return(0);
1358
1268
error:
1359
 
  internal::my_close(update_temp_file, MYF(0));
 
1269
  my_close(update_temp_file, MYF(0));
1360
1270
  share->update_file_opened= false;
1361
1271
  return(-1);
1362
1272
}
1363
1273
 
1364
1274
 
1365
1275
/*
 
1276
  Repair CSV table in the case, it is crashed.
 
1277
 
 
1278
  SYNOPSIS
 
1279
    repair()
 
1280
    session         The thread, performing repair
 
1281
    check_opt   The options for repair. We do not use it currently.
 
1282
 
 
1283
  DESCRIPTION
 
1284
    If the file is empty, change # of rows in the file and complete recovery.
 
1285
    Otherwise, scan the table looking for bad rows. If none were found,
 
1286
    we mark file as a good one and return. If a bad row was encountered,
 
1287
    we truncate the datafile up to the last good row.
 
1288
 
 
1289
   TODO: Make repair more clever - it should try to recover subsequent
 
1290
         rows (after the first bad one) as well.
 
1291
*/
 
1292
 
 
1293
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
 
1294
{
 
1295
  char repaired_fname[FN_REFLEN];
 
1296
  unsigned char *buf;
 
1297
  File repair_file;
 
1298
  int rc;
 
1299
  ha_rows rows_repaired= 0;
 
1300
  off_t write_begin= 0, write_end;
 
1301
 
 
1302
  /* empty file */
 
1303
  if (!share->saved_data_file_length)
 
1304
  {
 
1305
    share->rows_recorded= 0;
 
1306
    goto end;
 
1307
  }
 
1308
 
 
1309
  /* Don't assert in field::val() functions */
 
1310
  table->use_all_columns();
 
1311
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1312
    return(HA_ERR_OUT_OF_MEM);
 
1313
 
 
1314
  /* position buffer to the start of the file */
 
1315
  if (init_data_file())
 
1316
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1317
 
 
1318
  /*
 
1319
    Local_saved_data_file_length is initialized during the lock phase.
 
1320
    Sometimes this is not getting executed before ::repair (e.g. for
 
1321
    the log tables). We set it manually here.
 
1322
  */
 
1323
  local_saved_data_file_length= share->saved_data_file_length;
 
1324
  /* set current position to the beginning of the file */
 
1325
  current_position= next_position= 0;
 
1326
 
 
1327
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1328
 
 
1329
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1330
  while (!(rc= find_current_row(buf)))
 
1331
  {
 
1332
    session_inc_row_count(session);
 
1333
    rows_repaired++;
 
1334
    current_position= next_position;
 
1335
  }
 
1336
 
 
1337
  free_root(&blobroot, MYF(0));
 
1338
 
 
1339
  free((char*)buf);
 
1340
 
 
1341
  if (rc == HA_ERR_END_OF_FILE)
 
1342
  {
 
1343
    /*
 
1344
      All rows were read ok until end of file, the file does not need repair.
 
1345
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1346
      to the current amount of rows.
 
1347
    */
 
1348
    share->rows_recorded= rows_repaired;
 
1349
    goto end;
 
1350
  }
 
1351
 
 
1352
  /*
 
1353
    Otherwise we've encountered a bad row => repair is needed.
 
1354
    Let us create a temporary file.
 
1355
  */
 
1356
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1357
                                        "", CSN_EXT,
 
1358
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1359
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1360
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1361
 
 
1362
  file_buff->init_buff(data_file);
 
1363
 
 
1364
 
 
1365
  /* we just truncated the file up to the first bad row. update rows count. */
 
1366
  share->rows_recorded= rows_repaired;
 
1367
 
 
1368
  /* write repaired file */
 
1369
  while (1)
 
1370
  {
 
1371
    write_end= std::min(file_buff->end(), current_position);
 
1372
 
 
1373
    off_t write_length= write_end - write_begin;
 
1374
    if ((uint64_t)write_length > SIZE_MAX)
 
1375
    {
 
1376
      return -1;
 
1377
    }
 
1378
    if ((write_length) &&
 
1379
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1380
                  (size_t)write_length, MYF_RW)))
 
1381
      return(-1);
 
1382
 
 
1383
    write_begin= write_end;
 
1384
    if (write_end== current_position)
 
1385
      break;
 
1386
    else
 
1387
      file_buff->read_next(); /* shift the buffer */
 
1388
  }
 
1389
 
 
1390
  /*
 
1391
    Close the files and rename repaired file to the datafile.
 
1392
    We have to close the files, as on Windows one cannot rename
 
1393
    a file, which descriptor is still open. EACCES will be returned
 
1394
    when trying to delete the "to"-file in my_rename().
 
1395
  */
 
1396
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1397
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1398
    return(-1);
 
1399
 
 
1400
  /* Open the file again, it should now be repaired */
 
1401
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1402
                          MYF(0))) == -1)
 
1403
     return(-1);
 
1404
 
 
1405
  /* Set new file size. The file size will be updated by ::update_status() */
 
1406
  local_saved_data_file_length= (size_t) current_position;
 
1407
 
 
1408
end:
 
1409
  share->crashed= false;
 
1410
  return(HA_ADMIN_OK);
 
1411
}
 
1412
 
 
1413
/*
1366
1414
  DELETE without WHERE calls this
1367
1415
*/
1368
1416
 
1371
1419
  int rc;
1372
1420
 
1373
1421
  if (!records_is_known)
1374
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1422
    return(my_errno=HA_ERR_WRONG_COMMAND);
1375
1423
 
1376
1424
  if (!share->tina_write_opened)
1377
1425
    if (init_tina_writer())
1390
1438
}
1391
1439
 
1392
1440
/*
 
1441
  Called by the database to lock the table. Keep in mind that this
 
1442
  is an internal lock.
 
1443
*/
 
1444
THR_LOCK_DATA **ha_tina::store_lock(Session *,
 
1445
                                    THR_LOCK_DATA **to,
 
1446
                                    enum thr_lock_type lock_type)
 
1447
{
 
1448
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1449
    lock.type=lock_type;
 
1450
  *to++= &lock;
 
1451
  return to;
 
1452
}
 
1453
 
 
1454
/*
1393
1455
  Create a table. You do not want to leave the table open after a call to
1394
1456
  this (the database will call ::open() if it needs to).
1395
1457
*/
1396
1458
 
1397
 
int Tina::doCreateTable(Session &session,
1398
 
                        Table& table_arg,
1399
 
                        drizzled::TableIdentifier &identifier,
1400
 
                        drizzled::message::Table &create_proto)
 
1459
int Tina::createTableImplementation(Session *, const char *table_name,
 
1460
                                    Table *table_arg,
 
1461
                                    HA_CREATE_INFO *, drizzled::message::Table*)
1401
1462
{
1402
1463
  char name_buff[FN_REFLEN];
1403
 
  int create_file;
 
1464
  File create_file;
1404
1465
 
1405
1466
  /*
1406
1467
    check columns
1407
1468
  */
1408
 
  for (Field **field= table_arg.s->field; *field; field++)
 
1469
  for (Field **field= table_arg->s->field; *field; field++)
1409
1470
  {
1410
1471
    if ((*field)->real_maybe_null())
1411
1472
    {
1415
1476
  }
1416
1477
 
1417
1478
 
1418
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1419
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1420
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1479
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
 
1480
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1481
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1421
1482
    return(-1);
1422
1483
 
1423
1484
  write_meta_file(create_file, 0, false);
1424
 
  internal::my_close(create_file, MYF(0));
 
1485
  my_close(create_file, MYF(0));
1425
1486
 
1426
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1427
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1428
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1487
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
 
1488
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1489
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1429
1490
    return(-1);
1430
1491
 
1431
 
  internal::my_close(create_file, MYF(0));
1432
 
 
1433
 
  session.storeTableMessage(identifier, create_proto);
1434
 
 
1435
 
  return 0;
1436
 
}
1437
 
 
1438
 
 
1439
 
DRIZZLE_DECLARE_PLUGIN
1440
 
{
1441
 
  DRIZZLE_VERSION_ID,
 
1492
  my_close(create_file, MYF(0));
 
1493
 
 
1494
  return(0);
 
1495
}
 
1496
 
 
1497
int ha_tina::check(Session* session, HA_CHECK_OPT *)
 
1498
{
 
1499
  int rc= 0;
 
1500
  unsigned char *buf;
 
1501
  const char *old_proc_info;
 
1502
  ha_rows count= share->rows_recorded;
 
1503
 
 
1504
  old_proc_info= get_session_proc_info(session);
 
1505
  set_session_proc_info(session, "Checking table");
 
1506
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1507
    return(HA_ERR_OUT_OF_MEM);
 
1508
 
 
1509
  /* position buffer to the start of the file */
 
1510
   if (init_data_file())
 
1511
     return(HA_ERR_CRASHED);
 
1512
 
 
1513
  /*
 
1514
    Local_saved_data_file_length is initialized during the lock phase.
 
1515
    Check does not use store_lock in certain cases. So, we set it
 
1516
    manually here.
 
1517
  */
 
1518
  local_saved_data_file_length= share->saved_data_file_length;
 
1519
  /* set current position to the beginning of the file */
 
1520
  current_position= next_position= 0;
 
1521
 
 
1522
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1523
 
 
1524
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1525
  while (!(rc= find_current_row(buf)))
 
1526
  {
 
1527
    session_inc_row_count(session);
 
1528
    count--;
 
1529
    current_position= next_position;
 
1530
  }
 
1531
 
 
1532
  free_root(&blobroot, MYF(0));
 
1533
 
 
1534
  free((char*)buf);
 
1535
  set_session_proc_info(session, old_proc_info);
 
1536
 
 
1537
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1538
  {
 
1539
    share->crashed= true;
 
1540
    return(HA_ADMIN_CORRUPT);
 
1541
  }
 
1542
  else
 
1543
    return(HA_ADMIN_OK);
 
1544
}
 
1545
 
 
1546
 
 
1547
drizzle_declare_plugin(csv)
 
1548
{
1442
1549
  "CSV",
1443
1550
  "1.0",
1444
1551
  "Brian Aker, MySQL AB",
1445
1552
  "CSV storage engine",
1446
1553
  PLUGIN_LICENSE_GPL,
1447
1554
  tina_init_func, /* Plugin Init */
 
1555
  tina_done_func, /* Plugin Deinit */
 
1556
  NULL,                       /* status variables                */
1448
1557
  NULL,                       /* system variables                */
1449
1558
  NULL                        /* config options                  */
1450
1559
}
1451
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1560
drizzle_declare_plugin_end;
1452
1561