~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: Stewart Smith
  • Date: 2008-11-21 16:06:07 UTC
  • mto: This revision was merged to the branch mainline in revision 593.
  • Revision ID: stewart@flamingspork.com-20081121160607-n6gdlt013spuo54r
remove mysql_frm_type
and fix engines to return correct value from delete_table when table doesn't exist.
(it should be ENOENT).

Also fix up some tests that manipulated frm files by hand. These tests are no longer valid and will need to be rewritten in the not too distant future.

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/common_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
46
#include <drizzled/field/timestamp.h>
 
47
#include <storage/csv/ha_tina.h>
47
48
#include <drizzled/error.h>
48
49
#include <drizzled/table.h>
49
50
#include <drizzled/session.h>
50
 
#include "drizzled/internal/my_sys.h"
51
 
 
52
 
#include "ha_tina.h"
53
 
 
54
 
#include <fcntl.h>
55
 
 
56
 
#include <algorithm>
57
 
#include <vector>
58
 
#include <string>
59
 
#include <map>
60
 
 
61
 
using namespace std;
62
 
using namespace drizzled;
63
51
 
64
52
/*
65
53
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
75
63
#define CSM_EXT ".CSM"               // Meta file
76
64
 
77
65
 
78
 
static int read_meta_file(int meta_file, ha_rows *rows);
79
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
66
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
67
static int free_share(TINA_SHARE *share);
 
68
static int read_meta_file(File meta_file, ha_rows *rows);
 
69
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
70
 
 
71
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
72
extern "C" void tina_update_status(void* param);
 
73
extern "C" bool tina_check_status(void* param);
80
74
 
81
75
/* Stuff for shares */
82
76
pthread_mutex_t tina_mutex;
 
77
static HASH tina_open_tables;
 
78
static handler *tina_create_handler(handlerton *hton,
 
79
                                    TABLE_SHARE *table, 
 
80
                                    MEM_ROOT *mem_root);
 
81
 
83
82
 
84
83
/*****************************************************************************
85
84
 ** TINA tables
86
85
 *****************************************************************************/
87
86
 
88
87
/*
89
 
  If frm_error() is called in table.cc this is called to find out what file
90
 
  extensions exist for this Cursor.
 
88
  Used for sorting chains with qsort().
91
89
*/
92
 
static const char *ha_tina_exts[] = {
93
 
  CSV_EXT,
94
 
  CSM_EXT,
95
 
  NULL
96
 
};
97
 
 
98
 
class Tina : public drizzled::plugin::StorageEngine
99
 
{
100
 
  typedef std::map<string, TinaShare*> TinaMap;
101
 
  TinaMap tina_open_tables;
102
 
public:
103
 
  Tina(const string& name_arg)
104
 
   : drizzled::plugin::StorageEngine(name_arg,
105
 
                                     HTON_TEMPORARY_ONLY |
106
 
                                     HTON_NO_AUTO_INCREMENT |
107
 
                                     HTON_SKIP_STORE_LOCK),
108
 
    tina_open_tables()
109
 
  {}
110
 
  virtual ~Tina()
111
 
  {
112
 
    pthread_mutex_destroy(&tina_mutex);
113
 
  }
114
 
 
115
 
  virtual Cursor *create(TableShare &table)
116
 
  {
117
 
    return new ha_tina(*this, table);
118
 
  }
119
 
 
120
 
  const char **bas_ext() const {
121
 
    return ha_tina_exts;
122
 
  }
123
 
 
124
 
  int doCreateTable(Session &,
125
 
                    Table &table_arg,
126
 
                    const drizzled::TableIdentifier &identifier,
127
 
                    drizzled::message::Table&);
128
 
 
129
 
  int doGetTableDefinition(Session& session,
130
 
                           const drizzled::TableIdentifier &identifier,
131
 
                           drizzled::message::Table &table_message);
132
 
 
133
 
  int doDropTable(Session&, const drizzled::TableIdentifier &identifier);
134
 
  TinaShare *findOpenTable(const string table_name);
135
 
  void addOpenTable(const string &table_name, TinaShare *);
136
 
  void deleteOpenTable(const string &table_name);
137
 
 
138
 
 
139
 
  uint32_t max_keys()          const { return 0; }
140
 
  uint32_t max_key_parts()     const { return 0; }
141
 
  uint32_t max_key_length()    const { return 0; }
142
 
  bool doDoesTableExist(Session& session, const drizzled::TableIdentifier &identifier);
143
 
  int doRenameTable(Session&, const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to);
144
 
 
145
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
146
 
                             const drizzled::SchemaIdentifier &schema_identifier,
147
 
                             drizzled::TableIdentifiers &set_of_identifiers);
148
 
};
149
 
 
150
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
151
 
                                 const drizzled::SchemaIdentifier&,
152
 
                                 drizzled::TableIdentifiers&)
153
 
{
154
 
}
155
 
 
156
 
int Tina::doRenameTable(Session &session,
157
 
                        const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to)
158
 
{
159
 
  int error= 0;
160
 
  for (const char **ext= bas_ext(); *ext ; ext++)
161
 
  {
162
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
 
    {
164
 
      if ((error=errno) != ENOENT)
165
 
        break;
166
 
      error= 0;
167
 
    }
168
 
  }
169
 
 
170
 
  session.renameTableMessage(from, to);
171
 
 
172
 
  return error;
173
 
}
174
 
 
175
 
bool Tina::doDoesTableExist(Session &session, const drizzled::TableIdentifier &identifier)
176
 
{
177
 
  return session.doesTableMessageExist(identifier);
178
 
}
179
 
 
180
 
 
181
 
int Tina::doDropTable(Session &session,
182
 
                      const drizzled::TableIdentifier &identifier)
183
 
{
184
 
  int error= 0;
185
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
186
 
 
187
 
  for (const char **ext= bas_ext(); *ext ; ext++)
188
 
  {
189
 
    std::string full_name= identifier.getPath();
190
 
    full_name.append(*ext);
191
 
 
192
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
193
 
    {
194
 
      if ((error= errno) != ENOENT)
195
 
        break;
196
 
    }
197
 
    else
198
 
    {
199
 
      enoent_or_zero= 0;                        // No error for ENOENT
200
 
    }
201
 
    error= enoent_or_zero;
202
 
  }
203
 
 
204
 
  session.removeTableMessage(identifier);
205
 
 
206
 
  return error;
207
 
}
208
 
 
209
 
TinaShare *Tina::findOpenTable(const string table_name)
210
 
{
211
 
  TinaMap::iterator find_iter=
212
 
    tina_open_tables.find(table_name);
213
 
 
214
 
  if (find_iter != tina_open_tables.end())
215
 
    return (*find_iter).second;
216
 
  else
217
 
    return NULL;
218
 
}
219
 
 
220
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
 
{
222
 
  tina_open_tables[table_name]= share;
223
 
}
224
 
 
225
 
void Tina::deleteOpenTable(const string &table_name)
226
 
{
227
 
  tina_open_tables.erase(table_name);
228
 
}
229
 
 
230
 
 
231
 
int Tina::doGetTableDefinition(Session &session,
232
 
                               const drizzled::TableIdentifier &identifier,
233
 
                               drizzled::message::Table &table_message)
234
 
{
235
 
  if (session.getTableMessage(identifier, table_message))
236
 
    return EEXIST;
237
 
 
238
 
  return ENOENT;
239
 
}
240
 
 
241
 
 
242
 
static Tina *tina_engine= NULL;
243
 
 
244
 
static int tina_init_func(drizzled::module::Context &context)
245
 
{
246
 
 
247
 
  tina_engine= new Tina("CSV");
248
 
  context.add(tina_engine);
249
 
 
 
90
int sort_set (tina_set *a, tina_set *b)
 
91
{
 
92
  /*
 
93
    We assume that intervals do not intersect. So, it is enought to compare
 
94
    any two points. Here we take start of intervals for comparison.
 
95
  */
 
96
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
97
}
 
98
 
 
99
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length,
 
100
                          bool not_used __attribute__((unused)))
 
101
{
 
102
  *length=share->table_name_length;
 
103
  return (unsigned char*) share->table_name;
 
104
}
 
105
 
 
106
static int tina_init_func(void *p)
 
107
{
 
108
  handlerton *tina_hton;
 
109
 
 
110
  tina_hton= (handlerton *)p;
250
111
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
251
 
  return 0;
252
 
}
253
 
 
254
 
 
255
 
 
256
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
257
 
  table_name(table_name_arg),
258
 
  data_file_name(table_name_arg),
259
 
  use_count(0),
260
 
  saved_data_file_length(0),
261
 
  update_file_opened(false),
262
 
  tina_write_opened(false),
263
 
  crashed(false),
264
 
  rows_recorded(0),
265
 
  data_file_version(0)
266
 
{
267
 
  data_file_name.append(CSV_EXT);
268
 
}
269
 
 
270
 
TinaShare::~TinaShare()
271
 
{
272
 
  pthread_mutex_destroy(&mutex);
273
 
}
 
112
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
113
                   (hash_get_key) tina_get_key,0,0);
 
114
  tina_hton->state= SHOW_OPTION_YES;
 
115
  tina_hton->create= tina_create_handler;
 
116
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES | 
 
117
                     HTON_NO_PARTITION);
 
118
  return 0;
 
119
}
 
120
 
 
121
static int tina_done_func(void *p __attribute__((unused)))
 
122
{
 
123
  hash_free(&tina_open_tables);
 
124
  pthread_mutex_destroy(&tina_mutex);
 
125
 
 
126
  return 0;
 
127
}
 
128
 
274
129
 
275
130
/*
276
131
  Simple lock controls.
277
132
*/
278
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
133
static TINA_SHARE *get_share(const char *table_name,
 
134
                             Table *table __attribute__((unused)))
279
135
{
280
 
  pthread_mutex_lock(&tina_mutex);
281
 
 
282
 
  Tina *a_tina= static_cast<Tina *>(engine);
283
 
  share= a_tina->findOpenTable(table_name);
284
 
 
285
 
  std::string meta_file_name;
 
136
  TINA_SHARE *share;
 
137
  char meta_file_name[FN_REFLEN];
286
138
  struct stat file_stat;
 
139
  char *tmp_name;
 
140
  uint32_t length;
 
141
 
 
142
  pthread_mutex_lock(&tina_mutex);
 
143
  length=(uint) strlen(table_name);
287
144
 
288
145
  /*
289
146
    If share is not present in the hash, create a new share and
290
147
    initialize its members.
291
148
  */
292
 
  if (! share)
 
149
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
150
                                        (unsigned char*) table_name,
 
151
                                       length)))
293
152
  {
294
 
    share= new TinaShare(table_name);
295
 
 
296
 
    if (share == NULL)
297
 
    {
298
 
      pthread_mutex_unlock(&tina_mutex);
299
 
      return NULL;
300
 
    }
301
 
 
302
 
    meta_file_name.assign(table_name);
303
 
    meta_file_name.append(CSM_EXT);
304
 
 
305
 
    if (stat(share->data_file_name.c_str(), &file_stat))
306
 
    {
307
 
      pthread_mutex_unlock(&tina_mutex);
308
 
      delete share;
309
 
      return NULL;
310
 
    }
311
 
  
 
153
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
154
                         &share, sizeof(*share),
 
155
                         &tmp_name, length+1,
 
156
                         NULL))
 
157
    {
 
158
      pthread_mutex_unlock(&tina_mutex);
 
159
      return NULL;
 
160
    }
 
161
 
 
162
    share->use_count= 0;
 
163
    share->table_name_length= length;
 
164
    share->table_name= tmp_name;
 
165
    share->crashed= false;
 
166
    share->rows_recorded= 0;
 
167
    share->update_file_opened= false;
 
168
    share->tina_write_opened= false;
 
169
    share->data_file_version= 0;
 
170
    my_stpcpy(share->table_name, table_name);
 
171
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
172
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
173
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
174
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
175
 
 
176
    if (stat(share->data_file_name, &file_stat))
 
177
      goto error;
312
178
    share->saved_data_file_length= file_stat.st_size;
313
179
 
314
 
    a_tina->addOpenTable(share->table_name, share);
315
 
 
 
180
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
181
      goto error;
 
182
    thr_lock_init(&share->lock);
316
183
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
317
184
 
318
185
    /*
321
188
      Usually this will result in auto-repair, and we will get a good
322
189
      meta-file in the end.
323
190
    */
324
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
325
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
191
    if ((share->meta_file= my_open(meta_file_name,
 
192
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
326
193
      share->crashed= true;
327
194
 
328
195
    /*
336
203
  pthread_mutex_unlock(&tina_mutex);
337
204
 
338
205
  return share;
 
206
 
 
207
error:
 
208
  pthread_mutex_unlock(&tina_mutex);
 
209
  free((unsigned char*) share);
 
210
 
 
211
  return NULL;
339
212
}
340
213
 
341
214
 
358
231
    non-zero - error occurred
359
232
*/
360
233
 
361
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
234
static int read_meta_file(File meta_file, ha_rows *rows)
362
235
{
363
236
  unsigned char meta_buffer[META_BUFFER_SIZE];
364
237
  unsigned char *ptr= meta_buffer;
365
238
 
366
 
  lseek(meta_file, 0, SEEK_SET);
367
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
239
  my_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
 
240
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
368
241
      != META_BUFFER_SIZE)
369
242
    return(HA_ERR_CRASHED_ON_USAGE);
370
243
 
386
259
      ((bool)(*ptr)== true))
387
260
    return(HA_ERR_CRASHED_ON_USAGE);
388
261
 
389
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
262
  my_sync(meta_file, MYF(MY_WME));
390
263
 
391
264
  return(0);
392
265
}
411
284
    non-zero - error occurred
412
285
*/
413
286
 
414
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
287
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
415
288
{
416
289
  unsigned char meta_buffer[META_BUFFER_SIZE];
417
290
  unsigned char *ptr= meta_buffer;
430
303
  ptr+= 3*sizeof(uint64_t);
431
304
  *ptr= (unsigned char)dirty;
432
305
 
433
 
  lseek(meta_file, 0, SEEK_SET);
434
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
306
  my_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
 
307
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
435
308
      != META_BUFFER_SIZE)
436
309
    return(-1);
437
310
 
438
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
311
  my_sync(meta_file, MYF(MY_WME));
439
312
 
440
313
  return(0);
441
314
}
442
315
 
 
316
bool ha_tina::check_and_repair(Session *session)
 
317
{
 
318
  HA_CHECK_OPT check_opt;
 
319
 
 
320
  check_opt.init();
 
321
 
 
322
  return(repair(session, &check_opt));
 
323
}
 
324
 
 
325
 
443
326
int ha_tina::init_tina_writer()
444
327
{
445
328
  /*
450
333
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
451
334
 
452
335
  if ((share->tina_write_filedes=
453
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
336
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
454
337
  {
455
338
    share->crashed= true;
456
339
    return(1);
461
344
}
462
345
 
463
346
 
 
347
bool ha_tina::is_crashed() const
 
348
{
 
349
  return(share->crashed);
 
350
}
 
351
 
464
352
/*
465
353
  Free lock controls.
466
354
*/
467
 
int ha_tina::free_share()
 
355
static int free_share(TINA_SHARE *share)
468
356
{
469
357
  pthread_mutex_lock(&tina_mutex);
470
358
  int result_code= 0;
472
360
    /* Write the meta file. Mark it as crashed if needed. */
473
361
    (void)write_meta_file(share->meta_file, share->rows_recorded,
474
362
                          share->crashed ? true :false);
475
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
363
    if (my_close(share->meta_file, MYF(0)))
476
364
      result_code= 1;
477
365
    if (share->tina_write_opened)
478
366
    {
479
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
367
      if (my_close(share->tina_write_filedes, MYF(0)))
480
368
        result_code= 1;
481
369
      share->tina_write_opened= false;
482
370
    }
483
371
 
484
 
    Tina *a_tina= static_cast<Tina *>(engine);
485
 
    a_tina->deleteOpenTable(share->table_name);
486
 
    delete share;
 
372
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
373
    thr_lock_delete(&share->lock);
 
374
    pthread_mutex_destroy(&share->mutex);
 
375
    free((unsigned char*) share);
487
376
  }
488
377
  pthread_mutex_unlock(&tina_mutex);
489
378
 
501
390
  '\r''\n' --  DOS\Windows line ending
502
391
*/
503
392
 
504
 
static off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
505
 
                            off_t end, int *eoln_len)
 
393
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
 
394
                     off_t end, int *eoln_len)
506
395
{
507
396
  *eoln_len= 0;
508
397
 
529
418
}
530
419
 
531
420
 
532
 
 
533
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
534
 
  :Cursor(engine_arg, table_arg),
 
421
static handler *tina_create_handler(handlerton *hton,
 
422
                                    TABLE_SHARE *table, 
 
423
                                    MEM_ROOT *mem_root)
 
424
{
 
425
  return new (mem_root) ha_tina(hton, table);
 
426
}
 
427
 
 
428
 
 
429
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
 
430
  :handler(hton, table_arg),
535
431
  /*
536
 
    These definitions are found in Cursor.h
 
432
    These definitions are found in handler.h
537
433
    They are not probably completely right.
538
434
  */
539
435
  current_position(0), next_position(0), local_saved_data_file_length(0),
540
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
436
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
437
  local_data_file_version(0), records_is_known(0)
541
438
{
542
439
  /* Set our original buffers from pre-allocated memory */
543
440
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
441
  chain= chain_buffer;
544
442
  file_buff= new Transparent_file();
545
443
}
546
444
 
549
447
  Encode a buffer into the quoted format.
550
448
*/
551
449
 
552
 
int ha_tina::encode_quote(unsigned char *)
 
450
int ha_tina::encode_quote(unsigned char *buf __attribute__((unused)))
553
451
{
554
452
  char attribute_buffer[1024];
555
453
  String attribute(attribute_buffer, sizeof(attribute_buffer),
557
455
 
558
456
  buffer.length(0);
559
457
 
560
 
  for (Field **field= table->getFields() ; *field ; field++)
 
458
  for (Field **field=table->field ; *field ; field++)
561
459
  {
562
460
    const char *ptr;
563
461
    const char *end_ptr;
564
462
    const bool was_null= (*field)->is_null();
565
 
 
 
463
    
566
464
    /*
567
465
      assistance for backwards compatibility in production builds.
568
466
      note: this will not work for ENUM columns.
573
471
      (*field)->set_notnull();
574
472
    }
575
473
 
576
 
    /* 
577
 
      Since we are needing to "translate" the type into a string we 
578
 
      will need to do a val_str(). This would cause an assert() to
579
 
      normally occur since we are onlying "writing" values.
580
 
    */
581
 
    (*field)->setReadSet();
582
474
    (*field)->val_str(&attribute,&attribute);
583
 
 
 
475
    
584
476
    if (was_null)
585
477
      (*field)->set_null();
586
478
 
591
483
 
592
484
      buffer.append('"');
593
485
 
594
 
      while (ptr < end_ptr)
 
486
      while (ptr < end_ptr) 
595
487
      {
596
488
        if (*ptr == '"')
597
489
        {
645
537
*/
646
538
int ha_tina::chain_append()
647
539
{
648
 
  if (chain.size() > 0 && chain.back().second == current_position)
649
 
    chain.back().second= next_position;
 
540
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
541
    (chain_ptr -1)->end= next_position;
650
542
  else
651
 
    chain.push_back(make_pair(current_position, next_position));
 
543
  {
 
544
    /* We set up for the next position */
 
545
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
546
    {
 
547
      off_t location= chain_ptr - chain;
 
548
      chain_size += DEFAULT_CHAIN_LENGTH;
 
549
      if (chain_alloced)
 
550
      {
 
551
        /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
 
552
        if ((chain= (tina_set *) my_realloc((unsigned char*)chain,
 
553
                                            chain_size, MYF(MY_WME))) == NULL)
 
554
          return -1;
 
555
      }
 
556
      else
 
557
      {
 
558
        tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
 
559
                                              MYF(MY_WME));
 
560
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
561
        chain= ptr;
 
562
        chain_alloced++;
 
563
      }
 
564
      chain_ptr= chain + location;
 
565
    }
 
566
    chain_ptr->begin= current_position;
 
567
    chain_ptr->end= next_position;
 
568
    chain_ptr++;
 
569
  }
 
570
 
652
571
  return 0;
653
572
}
654
573
 
661
580
  off_t end_offset, curr_offset= current_position;
662
581
  int eoln_len;
663
582
  int error;
 
583
  bool read_all;
664
584
 
665
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
585
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
666
586
 
667
587
  /*
668
588
    We do not read further then local_saved_data_file_length in order
673
593
                       local_saved_data_file_length, &eoln_len)) == 0)
674
594
    return(HA_ERR_END_OF_FILE);
675
595
 
 
596
  /* We must read all columns in case a table is opened for update */
 
597
  read_all= !bitmap_is_clear_all(table->write_set);
676
598
  error= HA_ERR_CRASHED_ON_USAGE;
677
599
 
678
 
  memset(buf, 0, table->getShare()->null_bytes);
 
600
  memset(buf, 0, table->s->null_bytes);
679
601
 
680
 
  for (Field **field=table->getFields() ; *field ; field++)
 
602
  for (Field **field=table->field ; *field ; field++)
681
603
  {
682
604
    char curr_char;
683
605
 
742
664
      }
743
665
    }
744
666
 
745
 
    if ((*field)->isReadSet() || (*field)->isWriteSet())
 
667
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
746
668
    {
747
 
      /* This masks a bug in the logic for a SELECT * */
748
 
      (*field)->setWriteSet();
749
669
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
750
670
                          CHECK_FIELD_WARN))
751
671
        goto err;
752
 
 
753
672
      if ((*field)->flags & BLOB_FLAG)
754
673
      {
755
674
        Field_blob *blob= *(Field_blob**) field;
761
680
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
762
681
        if (src)
763
682
        {
764
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
765
 
          memmove(tgt, src, length);
 
683
          tgt= (unsigned char*) alloc_root(&blobroot, length);
 
684
          memcpy(tgt, src, length);
766
685
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
767
686
        }
768
687
      }
777
696
}
778
697
 
779
698
/*
 
699
  If frm_error() is called in table.cc this is called to find out what file
 
700
  extensions exist for this handler.
 
701
*/
 
702
static const char *ha_tina_exts[] = {
 
703
  CSV_EXT,
 
704
  CSM_EXT,
 
705
  NULL
 
706
};
 
707
 
 
708
const char **ha_tina::bas_ext() const
 
709
{
 
710
  return ha_tina_exts;
 
711
}
 
712
 
 
713
/*
 
714
  Three functions below are needed to enable concurrent insert functionality
 
715
  for CSV engine. For more details see mysys/thr_lock.c
 
716
*/
 
717
 
 
718
void tina_get_status(void* param,
 
719
                     int concurrent_insert __attribute__((unused)))
 
720
{
 
721
  ha_tina *tina= (ha_tina*) param;
 
722
  tina->get_status();
 
723
}
 
724
 
 
725
void tina_update_status(void* param)
 
726
{
 
727
  ha_tina *tina= (ha_tina*) param;
 
728
  tina->update_status();
 
729
}
 
730
 
 
731
/* this should exist and return 0 for concurrent insert to work */
 
732
bool tina_check_status(void* param __attribute__((unused)))
 
733
{
 
734
  return 0;
 
735
}
 
736
 
 
737
/*
 
738
  Save the state of the table
 
739
 
 
740
  SYNOPSIS
 
741
    get_status()
 
742
 
 
743
  DESCRIPTION
 
744
    This function is used to retrieve the file length. During the lock
 
745
    phase of concurrent insert. For more details see comment to
 
746
    ha_tina::update_status below.
 
747
*/
 
748
 
 
749
void ha_tina::get_status()
 
750
{
 
751
  local_saved_data_file_length= share->saved_data_file_length;
 
752
}
 
753
 
 
754
 
 
755
/*
 
756
  Correct the state of the table. Called by unlock routines
 
757
  before the write lock is released.
 
758
 
 
759
  SYNOPSIS
 
760
    update_status()
 
761
 
 
762
  DESCRIPTION
 
763
    When we employ concurrent insert lock, we save current length of the file
 
764
    during the lock phase. We do not read further saved value, as we don't
 
765
    want to interfere with undergoing concurrent insert. Writers update file
 
766
    length info during unlock with update_status().
 
767
 
 
768
  NOTE
 
769
    For log tables concurrent insert works different. The reason is that
 
770
    log tables are always opened and locked. And as they do not unlock
 
771
    tables, the file length after writes should be updated in a different
 
772
    way. 
 
773
*/
 
774
 
 
775
void ha_tina::update_status()
 
776
{
 
777
  /* correct local_saved_data_file_length for writers */
 
778
  share->saved_data_file_length= local_saved_data_file_length;
 
779
}
 
780
 
 
781
 
 
782
/*
780
783
  Open a database file. Keep in mind that tables are caches, so
781
784
  this will not be called for every request. Any sort of positions
782
785
  that need to be reset should be kept in the ::extra() call.
783
786
*/
784
 
int ha_tina::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
787
int ha_tina::open(const char *name, int mode __attribute__((unused)),
 
788
                  uint32_t open_options)
785
789
{
786
 
  if (not (share= get_share(identifier.getPath().c_str())))
787
 
    return(ENOENT);
 
790
  if (!(share= get_share(name, table)))
 
791
    return(HA_ERR_OUT_OF_MEM);
788
792
 
789
 
  if (share->crashed)
 
793
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
790
794
  {
791
 
    free_share();
 
795
    free_share(share);
792
796
    return(HA_ERR_CRASHED_ON_USAGE);
793
797
  }
794
798
 
795
799
  local_data_file_version= share->data_file_version;
796
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
800
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
797
801
    return(0);
798
802
 
799
803
  /*
800
 
    Init locking. Pass Cursor object to the locking routines,
 
804
    Init locking. Pass handler object to the locking routines,
801
805
    so that they could save/update local_saved_data_file_length value
802
806
    during locking. This is needed to enable concurrent inserts.
803
807
  */
 
808
  thr_lock_data_init(&share->lock, &lock, (void*) this);
804
809
  ref_length=sizeof(off_t);
805
810
 
 
811
  share->lock.get_status= tina_get_status;
 
812
  share->lock.update_status= tina_update_status;
 
813
  share->lock.check_status= tina_check_status;
 
814
 
806
815
  return(0);
807
816
}
808
817
 
 
818
 
809
819
/*
810
820
  Close a database file. We remove ourselves from the shared strucutre.
811
821
  If it is empty we destroy it.
813
823
int ha_tina::close(void)
814
824
{
815
825
  int rc= 0;
816
 
  rc= internal::my_close(data_file, MYF(0));
817
 
  return(free_share() || rc);
 
826
  rc= my_close(data_file, MYF(0));
 
827
  return(free_share(share) || rc);
818
828
}
819
829
 
820
830
/*
821
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
831
  This is an INSERT. At the moment this handler just seeks to the end
822
832
  of the file and appends the data. In an error case it really should
823
833
  just truncate to the original position (this is not done yet).
824
834
*/
825
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
835
int ha_tina::write_row(unsigned char * buf)
826
836
{
827
837
  int size;
828
838
 
829
839
  if (share->crashed)
830
840
      return(HA_ERR_CRASHED_ON_USAGE);
831
841
 
 
842
  ha_statistic_increment(&SSV::ha_write_count);
 
843
 
 
844
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
845
    table->timestamp_field->set_time();
 
846
 
832
847
  size= encode_quote(buf);
833
848
 
834
849
  if (!share->tina_write_opened)
836
851
      return(-1);
837
852
 
838
853
   /* use pwrite, as concurrent reader could have changed the position */
839
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
854
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
840
855
               MYF(MY_WME | MY_NABP)))
841
856
    return(-1);
842
857
 
861
876
  if (!share->update_file_opened)
862
877
  {
863
878
    if ((update_temp_file=
864
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
879
           my_create(fn_format(updated_fname, share->table_name,
865
880
                               "", CSN_EXT,
866
881
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
882
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
876
891
  This is called for an update.
877
892
  Make sure you put in code to increment the auto increment, also
878
893
  update any timestamp data. Currently auto increment is not being
879
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
894
  fixed since autoincrements have yet to be added to this table handler.
880
895
  This will be called in a table scan right before the previous ::rnd_next()
881
896
  call.
882
897
*/
883
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
898
int ha_tina::update_row(const unsigned char * old_data __attribute__((unused)),
 
899
                        unsigned char * new_data)
884
900
{
885
901
  int size;
886
902
  int rc= -1;
887
903
 
 
904
  ha_statistic_increment(&SSV::ha_update_count);
 
905
 
 
906
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
907
    table->timestamp_field->set_time();
 
908
 
888
909
  size= encode_quote(new_data);
889
910
 
890
911
  /*
891
 
    During update we mark each updating record as deleted
892
 
    (see the chain_append()) then write new one to the temporary data file.
893
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
912
    During update we mark each updating record as deleted 
 
913
    (see the chain_append()) then write new one to the temporary data file. 
 
914
    At the end of the sequence in the rnd_end() we append all non-marked
894
915
    records from the data file to the temporary data file then rename it.
895
916
    The temp_file_length is used to calculate new data file length.
896
917
  */
900
921
  if (open_update_temp_file_if_needed())
901
922
    goto err;
902
923
 
903
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
924
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
904
925
               MYF(MY_WME | MY_NABP)))
905
926
    goto err;
906
927
  temp_file_length+= size;
915
936
  Deletes a row. First the database will find the row, and then call this
916
937
  method. In the case of a table scan, the previous call to this will be
917
938
  the ::rnd_next() that found this row.
918
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
939
  The exception to this is an ORDER BY. This will cause the table handler
919
940
  to walk the table noting the positions of all rows that match a query.
920
941
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
921
942
  DESC, ASC).
922
943
*/
923
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
944
int ha_tina::delete_row(const unsigned char * buf __attribute__((unused)))
924
945
{
 
946
  ha_statistic_increment(&SSV::ha_delete_count);
925
947
 
926
948
  if (chain_append())
927
949
    return(-1);
939
961
 
940
962
/**
941
963
  @brief Initialize the data file.
942
 
 
 
964
  
943
965
  @details Compare the local version of the data file with the shared one.
944
966
  If they differ, there are some changes behind and we have to reopen
945
967
  the data file to make the changes visible.
946
 
  Call @c file_buff->init_buff() at the end to read the beginning of the
 
968
  Call @c file_buff->init_buff() at the end to read the beginning of the 
947
969
  data file into buffer.
948
 
 
 
970
  
949
971
  @retval  0  OK.
950
972
  @retval  1  There was an error.
951
973
*/
955
977
  if (local_data_file_version != share->data_file_version)
956
978
  {
957
979
    local_data_file_version= share->data_file_version;
958
 
    if (internal::my_close(data_file, MYF(0)) ||
959
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
980
    if (my_close(data_file, MYF(0)) ||
 
981
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
960
982
      return 1;
961
983
  }
962
984
  file_buff->init_buff(data_file);
968
990
  All table scans call this first.
969
991
  The order of a table scan is:
970
992
 
 
993
  ha_tina::store_lock
 
994
  ha_tina::external_lock
971
995
  ha_tina::info
972
996
  ha_tina::rnd_init
973
997
  ha_tina::extra
983
1007
  ha_tina::rnd_next
984
1008
  ha_tina::extra
985
1009
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1010
  ha_tina::external_lock
986
1011
  ha_tina::extra
987
1012
  ENUM HA_EXTRA_RESET   Reset database to after open
988
1013
 
992
1017
 
993
1018
*/
994
1019
 
995
 
int ha_tina::doStartTableScan(bool)
 
1020
int ha_tina::rnd_init(bool scan __attribute__((unused)))
996
1021
{
997
1022
  /* set buffer to the beginning of the file */
998
1023
  if (share->crashed || init_data_file())
1001
1026
  current_position= next_position= 0;
1002
1027
  stats.records= 0;
1003
1028
  records_is_known= 0;
1004
 
  chain.clear();
 
1029
  chain_ptr= chain;
1005
1030
 
1006
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1031
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1007
1032
 
1008
1033
  return(0);
1009
1034
}
1018
1043
  reserved for null count.
1019
1044
  Basically this works as a mask for which rows are nulled (compared to just
1020
1045
  empty).
1021
 
  This table Cursor doesn't do nulls and does not know the difference between
1022
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1046
  This table handler doesn't do nulls and does not know the difference between
 
1047
  NULL and "". This is ok since this table handler is for spreadsheets and
1023
1048
  they don't know about them either :)
1024
1049
*/
1025
1050
int ha_tina::rnd_next(unsigned char *buf)
1029
1054
  if (share->crashed)
1030
1055
      return(HA_ERR_CRASHED_ON_USAGE);
1031
1056
 
1032
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1057
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1033
1058
 
1034
1059
  current_position= next_position;
1035
1060
 
1053
1078
  its just a position. Look at the bdb code if you want to see a case
1054
1079
  where something other then a number is stored.
1055
1080
*/
1056
 
void ha_tina::position(const unsigned char *)
 
1081
void ha_tina::position(const unsigned char *record __attribute__((unused)))
1057
1082
{
1058
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1083
  my_store_ptr(ref, ref_length, current_position);
1059
1084
  return;
1060
1085
}
1061
1086
 
1062
1087
 
1063
1088
/*
1064
1089
  Used to fetch a row from a posiion stored with ::position().
1065
 
  internal::my_get_ptr() retrieves the data for you.
 
1090
  my_get_ptr() retrieves the data for you.
1066
1091
*/
1067
1092
 
1068
1093
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1094
{
1070
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1095
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1096
  current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1097
  return(find_current_row(buf));
1073
1098
}
1074
1099
 
1075
1100
/*
1076
1101
  ::info() is used to return information to the optimizer.
1077
 
  Currently this table Cursor doesn't implement most of the fields
 
1102
  Currently this table handler doesn't implement most of the fields
1078
1103
  really needed. SHOW also makes use of this data
1079
1104
*/
1080
 
int ha_tina::info(uint32_t)
 
1105
int ha_tina::info(uint32_t flag __attribute__((unused)))
1081
1106
{
1082
1107
  /* This is a lie, but you don't want the optimizer to see zero or 1 */
1083
 
  if (!records_is_known && stats.records < 2)
 
1108
  if (!records_is_known && stats.records < 2) 
1084
1109
    stats.records= 2;
1085
1110
  return(0);
1086
1111
}
1090
1115
  to the given "hole", stored in the buffer. "Valid" here means,
1091
1116
  not listed in the chain of deleted records ("holes").
1092
1117
*/
1093
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1118
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1094
1119
{
1095
 
  if (closest_hole == chain.end()) /* no more chains */
 
1120
  if (closest_hole == chain_ptr) /* no more chains */
1096
1121
    *end_pos= file_buff->end();
1097
1122
  else
1098
1123
    *end_pos= std::min(file_buff->end(),
1099
 
                       closest_hole->first);
1100
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1124
                       closest_hole->begin);
 
1125
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1101
1126
}
1102
1127
 
1103
1128
 
1107
1132
  slots to clean up all of the dead space we have collected while
1108
1133
  performing deletes/updates.
1109
1134
*/
1110
 
int ha_tina::doEndTableScan()
 
1135
int ha_tina::rnd_end()
1111
1136
{
 
1137
  char updated_fname[FN_REFLEN];
1112
1138
  off_t file_buffer_start= 0;
1113
1139
 
1114
 
  blobroot.free_root(MYF(0));
 
1140
  free_root(&blobroot, MYF(0));
1115
1141
  records_is_known= 1;
1116
1142
 
1117
 
  if (chain.size() > 0)
 
1143
  if ((chain_ptr - chain)  > 0)
1118
1144
  {
1119
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1145
    tina_set *ptr= chain;
1120
1146
 
1121
1147
    /*
1122
1148
      Re-read the beginning of a file (as the buffer should point to the
1128
1154
      The sort is needed when there were updates/deletes with random orders.
1129
1155
      It sorts so that we move the firts blocks to the beginning.
1130
1156
    */
1131
 
    sort(chain.begin(), chain.end());
 
1157
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1158
             (qsort_cmp)sort_set);
1132
1159
 
1133
1160
    off_t write_begin= 0, write_end;
1134
1161
 
1141
1168
    {
1142
1169
      bool in_hole= get_write_pos(&write_end, ptr);
1143
1170
      off_t write_length= write_end - write_begin;
1144
 
      if ((uint64_t)write_length > SIZE_MAX)
1145
 
      {
1146
 
        goto error;
1147
 
      }
1148
1171
 
1149
1172
      /* if there is something to write, write it */
1150
1173
      if (write_length)
1151
1174
      {
1152
 
        if (internal::my_write(update_temp_file,
 
1175
        if (my_write(update_temp_file, 
1153
1176
                     (unsigned char*) (file_buff->ptr() +
1154
1177
                               (write_begin - file_buff->start())),
1155
 
                     (size_t)write_length, MYF_RW))
 
1178
                     write_length, MYF_RW))
1156
1179
          goto error;
1157
1180
        temp_file_length+= write_length;
1158
1181
      }
1159
1182
      if (in_hole)
1160
1183
      {
1161
1184
        /* skip hole */
1162
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1185
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1163
1186
          file_buffer_start= file_buff->read_next();
1164
 
        write_begin= ptr->second;
1165
 
        ++ptr;
 
1187
        write_begin= ptr->end;
 
1188
        ptr++;
1166
1189
      }
1167
1190
      else
1168
1191
        write_begin= write_end;
1172
1195
 
1173
1196
    }
1174
1197
 
1175
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
 
        internal::my_close(update_temp_file, MYF(0)))
 
1198
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1199
        my_close(update_temp_file, MYF(0)))
1177
1200
      return(-1);
1178
1201
 
1179
1202
    share->update_file_opened= false;
1180
1203
 
1181
1204
    if (share->tina_write_opened)
1182
1205
    {
1183
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1206
      if (my_close(share->tina_write_filedes, MYF(0)))
1184
1207
        return(-1);
1185
1208
      /*
1186
1209
        Mark that the writer fd is closed, so that init_tina_writer()
1193
1216
      Close opened fildes's. Then move updated file in place
1194
1217
      of the old datafile.
1195
1218
    */
1196
 
    std::string rename_file= share->table_name;
1197
 
    rename_file.append(CSN_EXT);
1198
 
    if (internal::my_close(data_file, MYF(0)) ||
1199
 
        internal::my_rename(rename_file.c_str(),
1200
 
                            share->data_file_name.c_str(), MYF(0)))
 
1219
    if (my_close(data_file, MYF(0)) ||
 
1220
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1221
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1222
                  share->data_file_name, MYF(0)))
1201
1223
      return(-1);
1202
1224
 
1203
1225
    /* Open the file again */
1204
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1226
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1205
1227
      return(-1);
1206
1228
    /*
1207
 
      As we reopened the data file, increase share->data_file_version
1208
 
      in order to force other threads waiting on a table lock and
 
1229
      As we reopened the data file, increase share->data_file_version 
 
1230
      in order to force other threads waiting on a table lock and  
1209
1231
      have already opened the table to reopen the data file.
1210
1232
      That makes the latest changes become visible to them.
1211
 
      Update local_data_file_version as no need to reopen it in the
 
1233
      Update local_data_file_version as no need to reopen it in the 
1212
1234
      current thread.
1213
1235
    */
1214
1236
    share->data_file_version++;
1219
1241
      Here we record this fact to the meta-file.
1220
1242
    */
1221
1243
    (void)write_meta_file(share->meta_file, share->rows_recorded, false);
1222
 
    /*
1223
 
      Update local_saved_data_file_length with the real length of the
 
1244
    /* 
 
1245
      Update local_saved_data_file_length with the real length of the 
1224
1246
      data file.
1225
1247
    */
1226
1248
    local_saved_data_file_length= temp_file_length;
1228
1250
 
1229
1251
  return(0);
1230
1252
error:
1231
 
  internal::my_close(update_temp_file, MYF(0));
 
1253
  my_close(update_temp_file, MYF(0));
1232
1254
  share->update_file_opened= false;
1233
1255
  return(-1);
1234
1256
}
1235
1257
 
1236
1258
 
1237
1259
/*
 
1260
  Repair CSV table in the case, it is crashed.
 
1261
 
 
1262
  SYNOPSIS
 
1263
    repair()
 
1264
    session         The thread, performing repair
 
1265
    check_opt   The options for repair. We do not use it currently.
 
1266
 
 
1267
  DESCRIPTION
 
1268
    If the file is empty, change # of rows in the file and complete recovery.
 
1269
    Otherwise, scan the table looking for bad rows. If none were found,
 
1270
    we mark file as a good one and return. If a bad row was encountered,
 
1271
    we truncate the datafile up to the last good row.
 
1272
 
 
1273
   TODO: Make repair more clever - it should try to recover subsequent
 
1274
         rows (after the first bad one) as well.
 
1275
*/
 
1276
 
 
1277
int ha_tina::repair(Session* session,
 
1278
                    HA_CHECK_OPT* check_opt __attribute__((unused)))
 
1279
{
 
1280
  char repaired_fname[FN_REFLEN];
 
1281
  unsigned char *buf;
 
1282
  File repair_file;
 
1283
  int rc;
 
1284
  ha_rows rows_repaired= 0;
 
1285
  off_t write_begin= 0, write_end;
 
1286
 
 
1287
  /* empty file */
 
1288
  if (!share->saved_data_file_length)
 
1289
  {
 
1290
    share->rows_recorded= 0;
 
1291
    goto end;
 
1292
  }
 
1293
 
 
1294
  /* Don't assert in field::val() functions */
 
1295
  table->use_all_columns();
 
1296
  if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1297
    return(HA_ERR_OUT_OF_MEM);
 
1298
 
 
1299
  /* position buffer to the start of the file */
 
1300
  if (init_data_file())
 
1301
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1302
 
 
1303
  /*
 
1304
    Local_saved_data_file_length is initialized during the lock phase.
 
1305
    Sometimes this is not getting executed before ::repair (e.g. for
 
1306
    the log tables). We set it manually here.
 
1307
  */
 
1308
  local_saved_data_file_length= share->saved_data_file_length;
 
1309
  /* set current position to the beginning of the file */
 
1310
  current_position= next_position= 0;
 
1311
 
 
1312
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1313
 
 
1314
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1315
  while (!(rc= find_current_row(buf)))
 
1316
  {
 
1317
    session_inc_row_count(session);
 
1318
    rows_repaired++;
 
1319
    current_position= next_position;
 
1320
  }
 
1321
 
 
1322
  free_root(&blobroot, MYF(0));
 
1323
 
 
1324
  free((char*)buf);
 
1325
 
 
1326
  if (rc == HA_ERR_END_OF_FILE)
 
1327
  {
 
1328
    /*
 
1329
      All rows were read ok until end of file, the file does not need repair.
 
1330
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1331
      to the current amount of rows.
 
1332
    */
 
1333
    share->rows_recorded= rows_repaired;
 
1334
    goto end;
 
1335
  }
 
1336
 
 
1337
  /*
 
1338
    Otherwise we've encountered a bad row => repair is needed.
 
1339
    Let us create a temporary file.
 
1340
  */
 
1341
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1342
                                        "", CSN_EXT,
 
1343
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1344
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1345
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1346
 
 
1347
  file_buff->init_buff(data_file);
 
1348
 
 
1349
 
 
1350
  /* we just truncated the file up to the first bad row. update rows count. */
 
1351
  share->rows_recorded= rows_repaired;
 
1352
 
 
1353
  /* write repaired file */
 
1354
  while (1)
 
1355
  {
 
1356
    write_end= std::min(file_buff->end(), current_position);
 
1357
    if ((write_end - write_begin) &&
 
1358
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1359
                  write_end - write_begin, MYF_RW)))
 
1360
      return(-1);
 
1361
 
 
1362
    write_begin= write_end;
 
1363
    if (write_end== current_position)
 
1364
      break;
 
1365
    else
 
1366
      file_buff->read_next(); /* shift the buffer */
 
1367
  }
 
1368
 
 
1369
  /*
 
1370
    Close the files and rename repaired file to the datafile.
 
1371
    We have to close the files, as on Windows one cannot rename
 
1372
    a file, which descriptor is still open. EACCES will be returned
 
1373
    when trying to delete the "to"-file in my_rename().
 
1374
  */
 
1375
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1376
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1377
    return(-1);
 
1378
 
 
1379
  /* Open the file again, it should now be repaired */
 
1380
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1381
                          MYF(0))) == -1)
 
1382
     return(-1);
 
1383
 
 
1384
  /* Set new file size. The file size will be updated by ::update_status() */
 
1385
  local_saved_data_file_length= (size_t) current_position;
 
1386
 
 
1387
end:
 
1388
  share->crashed= false;
 
1389
  return(HA_ADMIN_OK);
 
1390
}
 
1391
 
 
1392
/*
1238
1393
  DELETE without WHERE calls this
1239
1394
*/
1240
1395
 
1243
1398
  int rc;
1244
1399
 
1245
1400
  if (!records_is_known)
1246
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1401
    return(my_errno=HA_ERR_WRONG_COMMAND);
1247
1402
 
1248
1403
  if (!share->tina_write_opened)
1249
1404
    if (init_tina_writer())
1262
1417
}
1263
1418
 
1264
1419
/*
 
1420
  Called by the database to lock the table. Keep in mind that this
 
1421
  is an internal lock.
 
1422
*/
 
1423
THR_LOCK_DATA **ha_tina::store_lock(Session *session __attribute__((unused)),
 
1424
                                    THR_LOCK_DATA **to,
 
1425
                                    enum thr_lock_type lock_type)
 
1426
{
 
1427
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1428
    lock.type=lock_type;
 
1429
  *to++= &lock;
 
1430
  return to;
 
1431
}
 
1432
 
 
1433
/* 
1265
1434
  Create a table. You do not want to leave the table open after a call to
1266
1435
  this (the database will call ::open() if it needs to).
1267
1436
*/
1268
1437
 
1269
 
int Tina::doCreateTable(Session &session,
1270
 
                        Table& table_arg,
1271
 
                        const drizzled::TableIdentifier &identifier,
1272
 
                        drizzled::message::Table &create_proto)
 
1438
int ha_tina::create(const char *name, Table *table_arg,
 
1439
                    HA_CREATE_INFO *create_info __attribute__((unused)))
1273
1440
{
1274
1441
  char name_buff[FN_REFLEN];
1275
 
  int create_file;
 
1442
  File create_file;
1276
1443
 
1277
1444
  /*
1278
1445
    check columns
1279
1446
  */
1280
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
 
       iter != fields.end();
1283
 
       iter++)
 
1447
  for (Field **field= table_arg->s->field; *field; field++)
1284
1448
  {
1285
 
    if (not *iter) // Historical legacy for NULL array end.
1286
 
      continue;
1287
 
 
1288
 
    if ((*iter)->real_maybe_null())
 
1449
    if ((*field)->real_maybe_null())
1289
1450
    {
1290
1451
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1452
      return(HA_ERR_UNSUPPORTED);
1292
1453
    }
1293
1454
  }
1294
 
 
1295
 
 
1296
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1455
  
 
1456
 
 
1457
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1458
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1459
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1299
1460
    return(-1);
1300
1461
 
1301
1462
  write_meta_file(create_file, 0, false);
1302
 
  internal::my_close(create_file, MYF(0));
 
1463
  my_close(create_file, MYF(0));
1303
1464
 
1304
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1465
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1466
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1467
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1307
1468
    return(-1);
1308
1469
 
1309
 
  internal::my_close(create_file, MYF(0));
1310
 
 
1311
 
  session.storeTableMessage(identifier, create_proto);
1312
 
 
1313
 
  return 0;
1314
 
}
1315
 
 
1316
 
 
1317
 
DRIZZLE_DECLARE_PLUGIN
1318
 
{
1319
 
  DRIZZLE_VERSION_ID,
 
1470
  my_close(create_file, MYF(0));
 
1471
 
 
1472
  return(0);
 
1473
}
 
1474
 
 
1475
int ha_tina::check(Session* session,
 
1476
                   HA_CHECK_OPT* check_opt __attribute__((unused)))
 
1477
{
 
1478
  int rc= 0;
 
1479
  unsigned char *buf;
 
1480
  const char *old_proc_info;
 
1481
  ha_rows count= share->rows_recorded;
 
1482
 
 
1483
  old_proc_info= get_session_proc_info(session);
 
1484
  set_session_proc_info(session, "Checking table");
 
1485
  if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1486
    return(HA_ERR_OUT_OF_MEM);
 
1487
 
 
1488
  /* position buffer to the start of the file */
 
1489
   if (init_data_file())
 
1490
     return(HA_ERR_CRASHED);
 
1491
 
 
1492
  /*
 
1493
    Local_saved_data_file_length is initialized during the lock phase.
 
1494
    Check does not use store_lock in certain cases. So, we set it
 
1495
    manually here.
 
1496
  */
 
1497
  local_saved_data_file_length= share->saved_data_file_length;
 
1498
  /* set current position to the beginning of the file */
 
1499
  current_position= next_position= 0;
 
1500
 
 
1501
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1502
 
 
1503
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1504
  while (!(rc= find_current_row(buf)))
 
1505
  {
 
1506
    session_inc_row_count(session);
 
1507
    count--;
 
1508
    current_position= next_position;
 
1509
  }
 
1510
  
 
1511
  free_root(&blobroot, MYF(0));
 
1512
 
 
1513
  free((char*)buf);
 
1514
  set_session_proc_info(session, old_proc_info);
 
1515
 
 
1516
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1517
  {
 
1518
    share->crashed= true;
 
1519
    return(HA_ADMIN_CORRUPT);
 
1520
  }
 
1521
  else
 
1522
    return(HA_ADMIN_OK);
 
1523
}
 
1524
 
 
1525
 
 
1526
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info __attribute__((unused)),
 
1527
                                         uint32_t table_changes __attribute__((unused)))
 
1528
{
 
1529
  return COMPATIBLE_DATA_YES;
 
1530
}
 
1531
 
 
1532
mysql_declare_plugin(csv)
 
1533
{
 
1534
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
1320
1535
  "CSV",
1321
1536
  "1.0",
1322
1537
  "Brian Aker, MySQL AB",
1323
1538
  "CSV storage engine",
1324
1539
  PLUGIN_LICENSE_GPL,
1325
1540
  tina_init_func, /* Plugin Init */
 
1541
  tina_done_func, /* Plugin Deinit */
 
1542
  NULL,                       /* status variables                */
1326
1543
  NULL,                       /* system variables                */
1327
1544
  NULL                        /* config options                  */
1328
1545
}
1329
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1546
mysql_declare_plugin_end;
1330
1547