~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: Monty Taylor
  • Date: 2008-10-16 09:12:23 UTC
  • mto: (511.1.6 codestyle)
  • mto: This revision was merged to the branch mainline in revision 521.
  • Revision ID: monty@inaugust.com-20081016091223-17ngih0qu9vssjs3
We pass -Wunused-macros now!

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
44
 
#include <drizzled/field.h>
45
 
#include <drizzled/field/blob.h>
46
 
#include <drizzled/field/timestamp.h>
47
 
#include <drizzled/error.h>
48
 
#include <drizzled/table.h>
49
 
#include <drizzled/session.h>
50
 
#include "drizzled/internal/my_sys.h"
51
 
 
 
43
#include <drizzled/common_includes.h>
52
44
#include "ha_tina.h"
53
45
 
54
 
#include <fcntl.h>
55
 
 
56
 
#include <algorithm>
57
 
#include <vector>
58
 
#include <string>
59
 
#include <map>
60
 
 
61
 
using namespace std;
62
 
using namespace drizzled;
63
46
 
64
47
/*
65
48
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
75
58
#define CSM_EXT ".CSM"               // Meta file
76
59
 
77
60
 
78
 
static int read_meta_file(int meta_file, ha_rows *rows);
79
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
61
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
62
static int free_share(TINA_SHARE *share);
 
63
static int read_meta_file(File meta_file, ha_rows *rows);
 
64
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
65
 
 
66
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
67
extern "C" void tina_update_status(void* param);
 
68
extern "C" bool tina_check_status(void* param);
80
69
 
81
70
/* Stuff for shares */
82
71
pthread_mutex_t tina_mutex;
 
72
static HASH tina_open_tables;
 
73
static handler *tina_create_handler(handlerton *hton,
 
74
                                    TABLE_SHARE *table, 
 
75
                                    MEM_ROOT *mem_root);
 
76
 
83
77
 
84
78
/*****************************************************************************
85
79
 ** TINA tables
86
80
 *****************************************************************************/
87
81
 
88
82
/*
89
 
  If frm_error() is called in table.cc this is called to find out what file
90
 
  extensions exist for this Cursor.
 
83
  Used for sorting chains with qsort().
91
84
*/
92
 
static const char *ha_tina_exts[] = {
93
 
  CSV_EXT,
94
 
  CSM_EXT,
95
 
  NULL
96
 
};
97
 
 
98
 
class Tina : public drizzled::plugin::StorageEngine
99
 
{
100
 
  typedef std::map<string, TinaShare*> TinaMap;
101
 
  TinaMap tina_open_tables;
102
 
public:
103
 
  Tina(const string& name_arg)
104
 
   : drizzled::plugin::StorageEngine(name_arg,
105
 
                                     HTON_TEMPORARY_ONLY |
106
 
                                     HTON_NO_AUTO_INCREMENT |
107
 
                                     HTON_SKIP_STORE_LOCK),
108
 
    tina_open_tables()
109
 
  {}
110
 
  virtual ~Tina()
111
 
  {
112
 
    pthread_mutex_destroy(&tina_mutex);
113
 
  }
114
 
 
115
 
  virtual Cursor *create(TableShare &table)
116
 
  {
117
 
    return new ha_tina(*this, table);
118
 
  }
119
 
 
120
 
  const char **bas_ext() const {
121
 
    return ha_tina_exts;
122
 
  }
123
 
 
124
 
  int doCreateTable(Session &,
125
 
                    Table &table_arg,
126
 
                    const drizzled::TableIdentifier &identifier,
127
 
                    drizzled::message::Table&);
128
 
 
129
 
  int doGetTableDefinition(Session& session,
130
 
                           const drizzled::TableIdentifier &identifier,
131
 
                           drizzled::message::Table &table_message);
132
 
 
133
 
  int doDropTable(Session&, const drizzled::TableIdentifier &identifier);
134
 
  TinaShare *findOpenTable(const string table_name);
135
 
  void addOpenTable(const string &table_name, TinaShare *);
136
 
  void deleteOpenTable(const string &table_name);
137
 
 
138
 
 
139
 
  uint32_t max_keys()          const { return 0; }
140
 
  uint32_t max_key_parts()     const { return 0; }
141
 
  uint32_t max_key_length()    const { return 0; }
142
 
  bool doDoesTableExist(Session& session, const drizzled::TableIdentifier &identifier);
143
 
  int doRenameTable(Session&, const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to);
144
 
 
145
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
146
 
                             const drizzled::SchemaIdentifier &schema_identifier,
147
 
                             drizzled::TableIdentifiers &set_of_identifiers);
148
 
};
149
 
 
150
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
151
 
                                 const drizzled::SchemaIdentifier&,
152
 
                                 drizzled::TableIdentifiers&)
153
 
{
154
 
}
155
 
 
156
 
int Tina::doRenameTable(Session &session,
157
 
                        const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to)
158
 
{
159
 
  int error= 0;
160
 
  for (const char **ext= bas_ext(); *ext ; ext++)
161
 
  {
162
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
 
    {
164
 
      if ((error=errno) != ENOENT)
165
 
        break;
166
 
      error= 0;
167
 
    }
168
 
  }
169
 
 
170
 
  session.renameTableMessage(from, to);
171
 
 
172
 
  return error;
173
 
}
174
 
 
175
 
bool Tina::doDoesTableExist(Session &session, const drizzled::TableIdentifier &identifier)
176
 
{
177
 
  return session.doesTableMessageExist(identifier);
178
 
}
179
 
 
180
 
 
181
 
int Tina::doDropTable(Session &session,
182
 
                      const drizzled::TableIdentifier &identifier)
183
 
{
184
 
  int error= 0;
185
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
186
 
 
187
 
  for (const char **ext= bas_ext(); *ext ; ext++)
188
 
  {
189
 
    std::string full_name= identifier.getPath();
190
 
    full_name.append(*ext);
191
 
 
192
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
193
 
    {
194
 
      if ((error= errno) != ENOENT)
195
 
        break;
196
 
    }
197
 
    else
198
 
    {
199
 
      enoent_or_zero= 0;                        // No error for ENOENT
200
 
    }
201
 
    error= enoent_or_zero;
202
 
  }
203
 
 
204
 
  session.removeTableMessage(identifier);
205
 
 
206
 
  return error;
207
 
}
208
 
 
209
 
TinaShare *Tina::findOpenTable(const string table_name)
210
 
{
211
 
  TinaMap::iterator find_iter=
212
 
    tina_open_tables.find(table_name);
213
 
 
214
 
  if (find_iter != tina_open_tables.end())
215
 
    return (*find_iter).second;
216
 
  else
217
 
    return NULL;
218
 
}
219
 
 
220
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
 
{
222
 
  tina_open_tables[table_name]= share;
223
 
}
224
 
 
225
 
void Tina::deleteOpenTable(const string &table_name)
226
 
{
227
 
  tina_open_tables.erase(table_name);
228
 
}
229
 
 
230
 
 
231
 
int Tina::doGetTableDefinition(Session &session,
232
 
                               const drizzled::TableIdentifier &identifier,
233
 
                               drizzled::message::Table &table_message)
234
 
{
235
 
  if (session.getTableMessage(identifier, table_message))
236
 
    return EEXIST;
237
 
 
238
 
  return ENOENT;
239
 
}
240
 
 
241
 
 
242
 
static Tina *tina_engine= NULL;
243
 
 
244
 
static int tina_init_func(drizzled::module::Context &context)
245
 
{
246
 
 
247
 
  tina_engine= new Tina("CSV");
248
 
  context.add(tina_engine);
249
 
 
 
85
int sort_set (tina_set *a, tina_set *b)
 
86
{
 
87
  /*
 
88
    We assume that intervals do not intersect. So, it is enought to compare
 
89
    any two points. Here we take start of intervals for comparison.
 
90
  */
 
91
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
92
}
 
93
 
 
94
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length,
 
95
                          bool not_used __attribute__((unused)))
 
96
{
 
97
  *length=share->table_name_length;
 
98
  return (unsigned char*) share->table_name;
 
99
}
 
100
 
 
101
static int tina_init_func(void *p)
 
102
{
 
103
  handlerton *tina_hton;
 
104
 
 
105
  tina_hton= (handlerton *)p;
250
106
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
251
 
  return 0;
252
 
}
253
 
 
254
 
 
255
 
 
256
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
257
 
  table_name(table_name_arg),
258
 
  data_file_name(table_name_arg),
259
 
  use_count(0),
260
 
  saved_data_file_length(0),
261
 
  update_file_opened(false),
262
 
  tina_write_opened(false),
263
 
  crashed(false),
264
 
  rows_recorded(0),
265
 
  data_file_version(0)
266
 
{
267
 
  data_file_name.append(CSV_EXT);
268
 
}
269
 
 
270
 
TinaShare::~TinaShare()
271
 
{
272
 
  pthread_mutex_destroy(&mutex);
273
 
}
 
107
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
108
                   (hash_get_key) tina_get_key,0,0);
 
109
  tina_hton->state= SHOW_OPTION_YES;
 
110
  tina_hton->create= tina_create_handler;
 
111
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES | 
 
112
                     HTON_NO_PARTITION);
 
113
  return 0;
 
114
}
 
115
 
 
116
static int tina_done_func(void *p __attribute__((unused)))
 
117
{
 
118
  hash_free(&tina_open_tables);
 
119
  pthread_mutex_destroy(&tina_mutex);
 
120
 
 
121
  return 0;
 
122
}
 
123
 
274
124
 
275
125
/*
276
126
  Simple lock controls.
277
127
*/
278
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
128
static TINA_SHARE *get_share(const char *table_name,
 
129
                             Table *table __attribute__((unused)))
279
130
{
280
 
  pthread_mutex_lock(&tina_mutex);
281
 
 
282
 
  Tina *a_tina= static_cast<Tina *>(engine);
283
 
  share= a_tina->findOpenTable(table_name);
284
 
 
285
 
  std::string meta_file_name;
 
131
  TINA_SHARE *share;
 
132
  char meta_file_name[FN_REFLEN];
286
133
  struct stat file_stat;
 
134
  char *tmp_name;
 
135
  uint32_t length;
 
136
 
 
137
  pthread_mutex_lock(&tina_mutex);
 
138
  length=(uint) strlen(table_name);
287
139
 
288
140
  /*
289
141
    If share is not present in the hash, create a new share and
290
142
    initialize its members.
291
143
  */
292
 
  if (! share)
 
144
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
145
                                        (unsigned char*) table_name,
 
146
                                       length)))
293
147
  {
294
 
    share= new TinaShare(table_name);
295
 
 
296
 
    if (share == NULL)
297
 
    {
298
 
      pthread_mutex_unlock(&tina_mutex);
299
 
      return NULL;
300
 
    }
301
 
 
302
 
    meta_file_name.assign(table_name);
303
 
    meta_file_name.append(CSM_EXT);
304
 
 
305
 
    if (stat(share->data_file_name.c_str(), &file_stat))
306
 
    {
307
 
      pthread_mutex_unlock(&tina_mutex);
308
 
      delete share;
309
 
      return NULL;
310
 
    }
311
 
  
 
148
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
149
                         &share, sizeof(*share),
 
150
                         &tmp_name, length+1,
 
151
                         NULL))
 
152
    {
 
153
      pthread_mutex_unlock(&tina_mutex);
 
154
      return NULL;
 
155
    }
 
156
 
 
157
    share->use_count= 0;
 
158
    share->table_name_length= length;
 
159
    share->table_name= tmp_name;
 
160
    share->crashed= false;
 
161
    share->rows_recorded= 0;
 
162
    share->update_file_opened= false;
 
163
    share->tina_write_opened= false;
 
164
    share->data_file_version= 0;
 
165
    my_stpcpy(share->table_name, table_name);
 
166
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
167
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
168
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
169
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
170
 
 
171
    if (stat(share->data_file_name, &file_stat))
 
172
      goto error;
312
173
    share->saved_data_file_length= file_stat.st_size;
313
174
 
314
 
    a_tina->addOpenTable(share->table_name, share);
315
 
 
 
175
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
176
      goto error;
 
177
    thr_lock_init(&share->lock);
316
178
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
317
179
 
318
180
    /*
321
183
      Usually this will result in auto-repair, and we will get a good
322
184
      meta-file in the end.
323
185
    */
324
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
325
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
186
    if ((share->meta_file= my_open(meta_file_name,
 
187
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
326
188
      share->crashed= true;
327
189
 
328
190
    /*
336
198
  pthread_mutex_unlock(&tina_mutex);
337
199
 
338
200
  return share;
 
201
 
 
202
error:
 
203
  pthread_mutex_unlock(&tina_mutex);
 
204
  free((unsigned char*) share);
 
205
 
 
206
  return NULL;
339
207
}
340
208
 
341
209
 
358
226
    non-zero - error occurred
359
227
*/
360
228
 
361
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
229
static int read_meta_file(File meta_file, ha_rows *rows)
362
230
{
363
231
  unsigned char meta_buffer[META_BUFFER_SIZE];
364
232
  unsigned char *ptr= meta_buffer;
365
233
 
366
 
  lseek(meta_file, 0, SEEK_SET);
367
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
234
  my_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
 
235
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
368
236
      != META_BUFFER_SIZE)
369
237
    return(HA_ERR_CRASHED_ON_USAGE);
370
238
 
386
254
      ((bool)(*ptr)== true))
387
255
    return(HA_ERR_CRASHED_ON_USAGE);
388
256
 
389
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
257
  my_sync(meta_file, MYF(MY_WME));
390
258
 
391
259
  return(0);
392
260
}
411
279
    non-zero - error occurred
412
280
*/
413
281
 
414
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
282
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
415
283
{
416
284
  unsigned char meta_buffer[META_BUFFER_SIZE];
417
285
  unsigned char *ptr= meta_buffer;
430
298
  ptr+= 3*sizeof(uint64_t);
431
299
  *ptr= (unsigned char)dirty;
432
300
 
433
 
  lseek(meta_file, 0, SEEK_SET);
434
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
301
  my_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
 
302
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
435
303
      != META_BUFFER_SIZE)
436
304
    return(-1);
437
305
 
438
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
306
  my_sync(meta_file, MYF(MY_WME));
439
307
 
440
308
  return(0);
441
309
}
442
310
 
 
311
bool ha_tina::check_and_repair(THD *thd)
 
312
{
 
313
  HA_CHECK_OPT check_opt;
 
314
 
 
315
  check_opt.init();
 
316
 
 
317
  return(repair(thd, &check_opt));
 
318
}
 
319
 
 
320
 
443
321
int ha_tina::init_tina_writer()
444
322
{
445
323
  /*
450
328
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
451
329
 
452
330
  if ((share->tina_write_filedes=
453
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
331
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
454
332
  {
455
333
    share->crashed= true;
456
334
    return(1);
461
339
}
462
340
 
463
341
 
 
342
bool ha_tina::is_crashed() const
 
343
{
 
344
  return(share->crashed);
 
345
}
 
346
 
464
347
/*
465
348
  Free lock controls.
466
349
*/
467
 
int ha_tina::free_share()
 
350
static int free_share(TINA_SHARE *share)
468
351
{
469
352
  pthread_mutex_lock(&tina_mutex);
470
353
  int result_code= 0;
472
355
    /* Write the meta file. Mark it as crashed if needed. */
473
356
    (void)write_meta_file(share->meta_file, share->rows_recorded,
474
357
                          share->crashed ? true :false);
475
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
358
    if (my_close(share->meta_file, MYF(0)))
476
359
      result_code= 1;
477
360
    if (share->tina_write_opened)
478
361
    {
479
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
362
      if (my_close(share->tina_write_filedes, MYF(0)))
480
363
        result_code= 1;
481
364
      share->tina_write_opened= false;
482
365
    }
483
366
 
484
 
    Tina *a_tina= static_cast<Tina *>(engine);
485
 
    a_tina->deleteOpenTable(share->table_name);
486
 
    delete share;
 
367
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
368
    thr_lock_delete(&share->lock);
 
369
    pthread_mutex_destroy(&share->mutex);
 
370
    free((unsigned char*) share);
487
371
  }
488
372
  pthread_mutex_unlock(&tina_mutex);
489
373
 
501
385
  '\r''\n' --  DOS\Windows line ending
502
386
*/
503
387
 
504
 
static off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
505
 
                            off_t end, int *eoln_len)
 
388
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
 
389
                     off_t end, int *eoln_len)
506
390
{
507
391
  *eoln_len= 0;
508
392
 
529
413
}
530
414
 
531
415
 
532
 
 
533
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
534
 
  :Cursor(engine_arg, table_arg),
 
416
static handler *tina_create_handler(handlerton *hton,
 
417
                                    TABLE_SHARE *table, 
 
418
                                    MEM_ROOT *mem_root)
 
419
{
 
420
  return new (mem_root) ha_tina(hton, table);
 
421
}
 
422
 
 
423
 
 
424
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
 
425
  :handler(hton, table_arg),
535
426
  /*
536
 
    These definitions are found in Cursor.h
 
427
    These definitions are found in handler.h
537
428
    They are not probably completely right.
538
429
  */
539
430
  current_position(0), next_position(0), local_saved_data_file_length(0),
540
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
431
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
432
  local_data_file_version(0), records_is_known(0)
541
433
{
542
434
  /* Set our original buffers from pre-allocated memory */
543
435
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
436
  chain= chain_buffer;
544
437
  file_buff= new Transparent_file();
545
438
}
546
439
 
549
442
  Encode a buffer into the quoted format.
550
443
*/
551
444
 
552
 
int ha_tina::encode_quote(unsigned char *)
 
445
int ha_tina::encode_quote(unsigned char *buf __attribute__((unused)))
553
446
{
554
447
  char attribute_buffer[1024];
555
448
  String attribute(attribute_buffer, sizeof(attribute_buffer),
557
450
 
558
451
  buffer.length(0);
559
452
 
560
 
  for (Field **field= table->getFields() ; *field ; field++)
 
453
  for (Field **field=table->field ; *field ; field++)
561
454
  {
562
455
    const char *ptr;
563
456
    const char *end_ptr;
564
457
    const bool was_null= (*field)->is_null();
565
 
 
 
458
    
566
459
    /*
567
460
      assistance for backwards compatibility in production builds.
568
461
      note: this will not work for ENUM columns.
573
466
      (*field)->set_notnull();
574
467
    }
575
468
 
576
 
    /* 
577
 
      Since we are needing to "translate" the type into a string we 
578
 
      will need to do a val_str(). This would cause an assert() to
579
 
      normally occur since we are onlying "writing" values.
580
 
    */
581
 
    (*field)->setReadSet();
582
469
    (*field)->val_str(&attribute,&attribute);
583
 
 
 
470
    
584
471
    if (was_null)
585
472
      (*field)->set_null();
586
473
 
591
478
 
592
479
      buffer.append('"');
593
480
 
594
 
      while (ptr < end_ptr)
 
481
      while (ptr < end_ptr) 
595
482
      {
596
483
        if (*ptr == '"')
597
484
        {
645
532
*/
646
533
int ha_tina::chain_append()
647
534
{
648
 
  if (chain.size() > 0 && chain.back().second == current_position)
649
 
    chain.back().second= next_position;
 
535
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
536
    (chain_ptr -1)->end= next_position;
650
537
  else
651
 
    chain.push_back(make_pair(current_position, next_position));
 
538
  {
 
539
    /* We set up for the next position */
 
540
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
541
    {
 
542
      off_t location= chain_ptr - chain;
 
543
      chain_size += DEFAULT_CHAIN_LENGTH;
 
544
      if (chain_alloced)
 
545
      {
 
546
        /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
 
547
        if ((chain= (tina_set *) my_realloc((unsigned char*)chain,
 
548
                                            chain_size, MYF(MY_WME))) == NULL)
 
549
          return -1;
 
550
      }
 
551
      else
 
552
      {
 
553
        tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
 
554
                                              MYF(MY_WME));
 
555
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
556
        chain= ptr;
 
557
        chain_alloced++;
 
558
      }
 
559
      chain_ptr= chain + location;
 
560
    }
 
561
    chain_ptr->begin= current_position;
 
562
    chain_ptr->end= next_position;
 
563
    chain_ptr++;
 
564
  }
 
565
 
652
566
  return 0;
653
567
}
654
568
 
661
575
  off_t end_offset, curr_offset= current_position;
662
576
  int eoln_len;
663
577
  int error;
 
578
  bool read_all;
664
579
 
665
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
580
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
666
581
 
667
582
  /*
668
583
    We do not read further then local_saved_data_file_length in order
673
588
                       local_saved_data_file_length, &eoln_len)) == 0)
674
589
    return(HA_ERR_END_OF_FILE);
675
590
 
 
591
  /* We must read all columns in case a table is opened for update */
 
592
  read_all= !bitmap_is_clear_all(table->write_set);
676
593
  error= HA_ERR_CRASHED_ON_USAGE;
677
594
 
678
 
  memset(buf, 0, table->getShare()->null_bytes);
 
595
  memset(buf, 0, table->s->null_bytes);
679
596
 
680
 
  for (Field **field=table->getFields() ; *field ; field++)
 
597
  for (Field **field=table->field ; *field ; field++)
681
598
  {
682
599
    char curr_char;
683
 
 
 
600
    
684
601
    buffer.length(0);
685
602
    if (curr_offset >= end_offset)
686
603
      goto err;
728
645
        }
729
646
      }
730
647
    }
731
 
    else
 
648
    else 
732
649
    {
733
650
      for(; curr_offset < end_offset; curr_offset++)
734
651
      {
742
659
      }
743
660
    }
744
661
 
745
 
    if ((*field)->isReadSet() || (*field)->isWriteSet())
 
662
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
746
663
    {
747
 
      /* This masks a bug in the logic for a SELECT * */
748
 
      (*field)->setWriteSet();
749
664
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
750
665
                          CHECK_FIELD_WARN))
751
666
        goto err;
752
 
 
753
667
      if ((*field)->flags & BLOB_FLAG)
754
668
      {
755
669
        Field_blob *blob= *(Field_blob**) field;
756
670
        unsigned char *src, *tgt;
757
671
        uint32_t length, packlength;
758
 
 
 
672
        
759
673
        packlength= blob->pack_length_no_ptr();
760
674
        length= blob->get_length(blob->ptr);
761
675
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
762
676
        if (src)
763
677
        {
764
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
765
 
          memmove(tgt, src, length);
 
678
          tgt= (unsigned char*) alloc_root(&blobroot, length);
 
679
          memcpy(tgt, src, length);
766
680
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
767
681
        }
768
682
      }
777
691
}
778
692
 
779
693
/*
 
694
  If frm_error() is called in table.cc this is called to find out what file
 
695
  extensions exist for this handler.
 
696
*/
 
697
static const char *ha_tina_exts[] = {
 
698
  CSV_EXT,
 
699
  CSM_EXT,
 
700
  NULL
 
701
};
 
702
 
 
703
const char **ha_tina::bas_ext() const
 
704
{
 
705
  return ha_tina_exts;
 
706
}
 
707
 
 
708
/*
 
709
  Three functions below are needed to enable concurrent insert functionality
 
710
  for CSV engine. For more details see mysys/thr_lock.c
 
711
*/
 
712
 
 
713
void tina_get_status(void* param,
 
714
                     int concurrent_insert __attribute__((unused)))
 
715
{
 
716
  ha_tina *tina= (ha_tina*) param;
 
717
  tina->get_status();
 
718
}
 
719
 
 
720
void tina_update_status(void* param)
 
721
{
 
722
  ha_tina *tina= (ha_tina*) param;
 
723
  tina->update_status();
 
724
}
 
725
 
 
726
/* this should exist and return 0 for concurrent insert to work */
 
727
bool tina_check_status(void* param __attribute__((unused)))
 
728
{
 
729
  return 0;
 
730
}
 
731
 
 
732
/*
 
733
  Save the state of the table
 
734
 
 
735
  SYNOPSIS
 
736
    get_status()
 
737
 
 
738
  DESCRIPTION
 
739
    This function is used to retrieve the file length. During the lock
 
740
    phase of concurrent insert. For more details see comment to
 
741
    ha_tina::update_status below.
 
742
*/
 
743
 
 
744
void ha_tina::get_status()
 
745
{
 
746
  local_saved_data_file_length= share->saved_data_file_length;
 
747
}
 
748
 
 
749
 
 
750
/*
 
751
  Correct the state of the table. Called by unlock routines
 
752
  before the write lock is released.
 
753
 
 
754
  SYNOPSIS
 
755
    update_status()
 
756
 
 
757
  DESCRIPTION
 
758
    When we employ concurrent insert lock, we save current length of the file
 
759
    during the lock phase. We do not read further saved value, as we don't
 
760
    want to interfere with undergoing concurrent insert. Writers update file
 
761
    length info during unlock with update_status().
 
762
 
 
763
  NOTE
 
764
    For log tables concurrent insert works different. The reason is that
 
765
    log tables are always opened and locked. And as they do not unlock
 
766
    tables, the file length after writes should be updated in a different
 
767
    way. 
 
768
*/
 
769
 
 
770
void ha_tina::update_status()
 
771
{
 
772
  /* correct local_saved_data_file_length for writers */
 
773
  share->saved_data_file_length= local_saved_data_file_length;
 
774
}
 
775
 
 
776
 
 
777
/*
780
778
  Open a database file. Keep in mind that tables are caches, so
781
779
  this will not be called for every request. Any sort of positions
782
780
  that need to be reset should be kept in the ::extra() call.
783
781
*/
784
 
int ha_tina::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
782
int ha_tina::open(const char *name, int mode __attribute__((unused)),
 
783
                  uint32_t open_options)
785
784
{
786
 
  if (not (share= get_share(identifier.getPath().c_str())))
787
 
    return(ENOENT);
 
785
  if (!(share= get_share(name, table)))
 
786
    return(HA_ERR_OUT_OF_MEM);
788
787
 
789
 
  if (share->crashed)
 
788
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
790
789
  {
791
 
    free_share();
 
790
    free_share(share);
792
791
    return(HA_ERR_CRASHED_ON_USAGE);
793
792
  }
794
793
 
795
794
  local_data_file_version= share->data_file_version;
796
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
795
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
797
796
    return(0);
798
797
 
799
798
  /*
800
 
    Init locking. Pass Cursor object to the locking routines,
 
799
    Init locking. Pass handler object to the locking routines,
801
800
    so that they could save/update local_saved_data_file_length value
802
801
    during locking. This is needed to enable concurrent inserts.
803
802
  */
 
803
  thr_lock_data_init(&share->lock, &lock, (void*) this);
804
804
  ref_length=sizeof(off_t);
805
805
 
 
806
  share->lock.get_status= tina_get_status;
 
807
  share->lock.update_status= tina_update_status;
 
808
  share->lock.check_status= tina_check_status;
 
809
 
806
810
  return(0);
807
811
}
808
812
 
 
813
 
809
814
/*
810
815
  Close a database file. We remove ourselves from the shared strucutre.
811
816
  If it is empty we destroy it.
813
818
int ha_tina::close(void)
814
819
{
815
820
  int rc= 0;
816
 
  rc= internal::my_close(data_file, MYF(0));
817
 
  return(free_share() || rc);
 
821
  rc= my_close(data_file, MYF(0));
 
822
  return(free_share(share) || rc);
818
823
}
819
824
 
820
825
/*
821
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
826
  This is an INSERT. At the moment this handler just seeks to the end
822
827
  of the file and appends the data. In an error case it really should
823
828
  just truncate to the original position (this is not done yet).
824
829
*/
825
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
830
int ha_tina::write_row(unsigned char * buf)
826
831
{
827
832
  int size;
828
833
 
829
834
  if (share->crashed)
830
835
      return(HA_ERR_CRASHED_ON_USAGE);
831
836
 
 
837
  ha_statistic_increment(&SSV::ha_write_count);
 
838
 
 
839
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
840
    table->timestamp_field->set_time();
 
841
 
832
842
  size= encode_quote(buf);
833
843
 
834
844
  if (!share->tina_write_opened)
836
846
      return(-1);
837
847
 
838
848
   /* use pwrite, as concurrent reader could have changed the position */
839
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
849
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
840
850
               MYF(MY_WME | MY_NABP)))
841
851
    return(-1);
842
852
 
861
871
  if (!share->update_file_opened)
862
872
  {
863
873
    if ((update_temp_file=
864
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
874
           my_create(fn_format(updated_fname, share->table_name,
865
875
                               "", CSN_EXT,
866
876
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
877
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
876
886
  This is called for an update.
877
887
  Make sure you put in code to increment the auto increment, also
878
888
  update any timestamp data. Currently auto increment is not being
879
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
889
  fixed since autoincrements have yet to be added to this table handler.
880
890
  This will be called in a table scan right before the previous ::rnd_next()
881
891
  call.
882
892
*/
883
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
893
int ha_tina::update_row(const unsigned char * old_data __attribute__((unused)),
 
894
                        unsigned char * new_data)
884
895
{
885
896
  int size;
886
897
  int rc= -1;
887
898
 
 
899
  ha_statistic_increment(&SSV::ha_update_count);
 
900
 
 
901
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
902
    table->timestamp_field->set_time();
 
903
 
888
904
  size= encode_quote(new_data);
889
905
 
890
906
  /*
891
 
    During update we mark each updating record as deleted
892
 
    (see the chain_append()) then write new one to the temporary data file.
893
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
907
    During update we mark each updating record as deleted 
 
908
    (see the chain_append()) then write new one to the temporary data file. 
 
909
    At the end of the sequence in the rnd_end() we append all non-marked
894
910
    records from the data file to the temporary data file then rename it.
895
911
    The temp_file_length is used to calculate new data file length.
896
912
  */
900
916
  if (open_update_temp_file_if_needed())
901
917
    goto err;
902
918
 
903
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
919
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
904
920
               MYF(MY_WME | MY_NABP)))
905
921
    goto err;
906
922
  temp_file_length+= size;
915
931
  Deletes a row. First the database will find the row, and then call this
916
932
  method. In the case of a table scan, the previous call to this will be
917
933
  the ::rnd_next() that found this row.
918
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
934
  The exception to this is an ORDER BY. This will cause the table handler
919
935
  to walk the table noting the positions of all rows that match a query.
920
936
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
921
937
  DESC, ASC).
922
938
*/
923
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
939
int ha_tina::delete_row(const unsigned char * buf __attribute__((unused)))
924
940
{
 
941
  ha_statistic_increment(&SSV::ha_delete_count);
925
942
 
926
943
  if (chain_append())
927
944
    return(-1);
939
956
 
940
957
/**
941
958
  @brief Initialize the data file.
942
 
 
 
959
  
943
960
  @details Compare the local version of the data file with the shared one.
944
961
  If they differ, there are some changes behind and we have to reopen
945
962
  the data file to make the changes visible.
946
 
  Call @c file_buff->init_buff() at the end to read the beginning of the
 
963
  Call @c file_buff->init_buff() at the end to read the beginning of the 
947
964
  data file into buffer.
948
 
 
 
965
  
949
966
  @retval  0  OK.
950
967
  @retval  1  There was an error.
951
968
*/
955
972
  if (local_data_file_version != share->data_file_version)
956
973
  {
957
974
    local_data_file_version= share->data_file_version;
958
 
    if (internal::my_close(data_file, MYF(0)) ||
959
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
975
    if (my_close(data_file, MYF(0)) ||
 
976
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
960
977
      return 1;
961
978
  }
962
979
  file_buff->init_buff(data_file);
968
985
  All table scans call this first.
969
986
  The order of a table scan is:
970
987
 
 
988
  ha_tina::store_lock
 
989
  ha_tina::external_lock
971
990
  ha_tina::info
972
991
  ha_tina::rnd_init
973
992
  ha_tina::extra
983
1002
  ha_tina::rnd_next
984
1003
  ha_tina::extra
985
1004
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1005
  ha_tina::external_lock
986
1006
  ha_tina::extra
987
1007
  ENUM HA_EXTRA_RESET   Reset database to after open
988
1008
 
992
1012
 
993
1013
*/
994
1014
 
995
 
int ha_tina::doStartTableScan(bool)
 
1015
int ha_tina::rnd_init(bool scan __attribute__((unused)))
996
1016
{
997
1017
  /* set buffer to the beginning of the file */
998
1018
  if (share->crashed || init_data_file())
1001
1021
  current_position= next_position= 0;
1002
1022
  stats.records= 0;
1003
1023
  records_is_known= 0;
1004
 
  chain.clear();
 
1024
  chain_ptr= chain;
1005
1025
 
1006
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1026
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1007
1027
 
1008
1028
  return(0);
1009
1029
}
1018
1038
  reserved for null count.
1019
1039
  Basically this works as a mask for which rows are nulled (compared to just
1020
1040
  empty).
1021
 
  This table Cursor doesn't do nulls and does not know the difference between
1022
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1041
  This table handler doesn't do nulls and does not know the difference between
 
1042
  NULL and "". This is ok since this table handler is for spreadsheets and
1023
1043
  they don't know about them either :)
1024
1044
*/
1025
1045
int ha_tina::rnd_next(unsigned char *buf)
1029
1049
  if (share->crashed)
1030
1050
      return(HA_ERR_CRASHED_ON_USAGE);
1031
1051
 
1032
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1052
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1033
1053
 
1034
1054
  current_position= next_position;
1035
1055
 
1053
1073
  its just a position. Look at the bdb code if you want to see a case
1054
1074
  where something other then a number is stored.
1055
1075
*/
1056
 
void ha_tina::position(const unsigned char *)
 
1076
void ha_tina::position(const unsigned char *record __attribute__((unused)))
1057
1077
{
1058
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1078
  my_store_ptr(ref, ref_length, current_position);
1059
1079
  return;
1060
1080
}
1061
1081
 
1062
1082
 
1063
1083
/*
1064
1084
  Used to fetch a row from a posiion stored with ::position().
1065
 
  internal::my_get_ptr() retrieves the data for you.
 
1085
  my_get_ptr() retrieves the data for you.
1066
1086
*/
1067
1087
 
1068
1088
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1089
{
1070
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1090
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1091
  current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1092
  return(find_current_row(buf));
1073
1093
}
1074
1094
 
1075
1095
/*
1076
1096
  ::info() is used to return information to the optimizer.
1077
 
  Currently this table Cursor doesn't implement most of the fields
 
1097
  Currently this table handler doesn't implement most of the fields
1078
1098
  really needed. SHOW also makes use of this data
1079
1099
*/
1080
 
int ha_tina::info(uint32_t)
 
1100
int ha_tina::info(uint32_t flag __attribute__((unused)))
1081
1101
{
1082
1102
  /* This is a lie, but you don't want the optimizer to see zero or 1 */
1083
 
  if (!records_is_known && stats.records < 2)
 
1103
  if (!records_is_known && stats.records < 2) 
1084
1104
    stats.records= 2;
1085
1105
  return(0);
1086
1106
}
1090
1110
  to the given "hole", stored in the buffer. "Valid" here means,
1091
1111
  not listed in the chain of deleted records ("holes").
1092
1112
*/
1093
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1113
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1094
1114
{
1095
 
  if (closest_hole == chain.end()) /* no more chains */
 
1115
  if (closest_hole == chain_ptr) /* no more chains */
1096
1116
    *end_pos= file_buff->end();
1097
1117
  else
1098
1118
    *end_pos= std::min(file_buff->end(),
1099
 
                       closest_hole->first);
1100
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1119
                       closest_hole->begin);
 
1120
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1101
1121
}
1102
1122
 
1103
1123
 
1107
1127
  slots to clean up all of the dead space we have collected while
1108
1128
  performing deletes/updates.
1109
1129
*/
1110
 
int ha_tina::doEndTableScan()
 
1130
int ha_tina::rnd_end()
1111
1131
{
 
1132
  char updated_fname[FN_REFLEN];
1112
1133
  off_t file_buffer_start= 0;
1113
1134
 
1114
 
  blobroot.free_root(MYF(0));
 
1135
  free_root(&blobroot, MYF(0));
1115
1136
  records_is_known= 1;
1116
1137
 
1117
 
  if (chain.size() > 0)
 
1138
  if ((chain_ptr - chain)  > 0)
1118
1139
  {
1119
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1140
    tina_set *ptr= chain;
1120
1141
 
1121
1142
    /*
1122
1143
      Re-read the beginning of a file (as the buffer should point to the
1128
1149
      The sort is needed when there were updates/deletes with random orders.
1129
1150
      It sorts so that we move the firts blocks to the beginning.
1130
1151
    */
1131
 
    sort(chain.begin(), chain.end());
 
1152
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1153
             (qsort_cmp)sort_set);
1132
1154
 
1133
1155
    off_t write_begin= 0, write_end;
1134
1156
 
1141
1163
    {
1142
1164
      bool in_hole= get_write_pos(&write_end, ptr);
1143
1165
      off_t write_length= write_end - write_begin;
1144
 
      if ((uint64_t)write_length > SIZE_MAX)
1145
 
      {
1146
 
        goto error;
1147
 
      }
1148
1166
 
1149
1167
      /* if there is something to write, write it */
1150
1168
      if (write_length)
1151
1169
      {
1152
 
        if (internal::my_write(update_temp_file,
 
1170
        if (my_write(update_temp_file, 
1153
1171
                     (unsigned char*) (file_buff->ptr() +
1154
1172
                               (write_begin - file_buff->start())),
1155
 
                     (size_t)write_length, MYF_RW))
 
1173
                     write_length, MYF_RW))
1156
1174
          goto error;
1157
1175
        temp_file_length+= write_length;
1158
1176
      }
1159
1177
      if (in_hole)
1160
1178
      {
1161
1179
        /* skip hole */
1162
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1180
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1163
1181
          file_buffer_start= file_buff->read_next();
1164
 
        write_begin= ptr->second;
1165
 
        ++ptr;
 
1182
        write_begin= ptr->end;
 
1183
        ptr++;
1166
1184
      }
1167
1185
      else
1168
1186
        write_begin= write_end;
1172
1190
 
1173
1191
    }
1174
1192
 
1175
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
 
        internal::my_close(update_temp_file, MYF(0)))
 
1193
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1194
        my_close(update_temp_file, MYF(0)))
1177
1195
      return(-1);
1178
1196
 
1179
1197
    share->update_file_opened= false;
1180
1198
 
1181
1199
    if (share->tina_write_opened)
1182
1200
    {
1183
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1201
      if (my_close(share->tina_write_filedes, MYF(0)))
1184
1202
        return(-1);
1185
1203
      /*
1186
1204
        Mark that the writer fd is closed, so that init_tina_writer()
1193
1211
      Close opened fildes's. Then move updated file in place
1194
1212
      of the old datafile.
1195
1213
    */
1196
 
    std::string rename_file= share->table_name;
1197
 
    rename_file.append(CSN_EXT);
1198
 
    if (internal::my_close(data_file, MYF(0)) ||
1199
 
        internal::my_rename(rename_file.c_str(),
1200
 
                            share->data_file_name.c_str(), MYF(0)))
 
1214
    if (my_close(data_file, MYF(0)) ||
 
1215
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1216
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1217
                  share->data_file_name, MYF(0)))
1201
1218
      return(-1);
1202
1219
 
1203
1220
    /* Open the file again */
1204
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1221
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1205
1222
      return(-1);
1206
1223
    /*
1207
 
      As we reopened the data file, increase share->data_file_version
1208
 
      in order to force other threads waiting on a table lock and
 
1224
      As we reopened the data file, increase share->data_file_version 
 
1225
      in order to force other threads waiting on a table lock and  
1209
1226
      have already opened the table to reopen the data file.
1210
1227
      That makes the latest changes become visible to them.
1211
 
      Update local_data_file_version as no need to reopen it in the
 
1228
      Update local_data_file_version as no need to reopen it in the 
1212
1229
      current thread.
1213
1230
    */
1214
1231
    share->data_file_version++;
1219
1236
      Here we record this fact to the meta-file.
1220
1237
    */
1221
1238
    (void)write_meta_file(share->meta_file, share->rows_recorded, false);
1222
 
    /*
1223
 
      Update local_saved_data_file_length with the real length of the
 
1239
    /* 
 
1240
      Update local_saved_data_file_length with the real length of the 
1224
1241
      data file.
1225
1242
    */
1226
1243
    local_saved_data_file_length= temp_file_length;
1228
1245
 
1229
1246
  return(0);
1230
1247
error:
1231
 
  internal::my_close(update_temp_file, MYF(0));
 
1248
  my_close(update_temp_file, MYF(0));
1232
1249
  share->update_file_opened= false;
1233
1250
  return(-1);
1234
1251
}
1235
1252
 
1236
1253
 
1237
1254
/*
 
1255
  Repair CSV table in the case, it is crashed.
 
1256
 
 
1257
  SYNOPSIS
 
1258
    repair()
 
1259
    thd         The thread, performing repair
 
1260
    check_opt   The options for repair. We do not use it currently.
 
1261
 
 
1262
  DESCRIPTION
 
1263
    If the file is empty, change # of rows in the file and complete recovery.
 
1264
    Otherwise, scan the table looking for bad rows. If none were found,
 
1265
    we mark file as a good one and return. If a bad row was encountered,
 
1266
    we truncate the datafile up to the last good row.
 
1267
 
 
1268
   TODO: Make repair more clever - it should try to recover subsequent
 
1269
         rows (after the first bad one) as well.
 
1270
*/
 
1271
 
 
1272
int ha_tina::repair(THD* thd,
 
1273
                    HA_CHECK_OPT* check_opt __attribute__((unused)))
 
1274
{
 
1275
  char repaired_fname[FN_REFLEN];
 
1276
  unsigned char *buf;
 
1277
  File repair_file;
 
1278
  int rc;
 
1279
  ha_rows rows_repaired= 0;
 
1280
  off_t write_begin= 0, write_end;
 
1281
 
 
1282
  /* empty file */
 
1283
  if (!share->saved_data_file_length)
 
1284
  {
 
1285
    share->rows_recorded= 0;
 
1286
    goto end;
 
1287
  }
 
1288
 
 
1289
  /* Don't assert in field::val() functions */
 
1290
  table->use_all_columns();
 
1291
  if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1292
    return(HA_ERR_OUT_OF_MEM);
 
1293
 
 
1294
  /* position buffer to the start of the file */
 
1295
  if (init_data_file())
 
1296
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1297
 
 
1298
  /*
 
1299
    Local_saved_data_file_length is initialized during the lock phase.
 
1300
    Sometimes this is not getting executed before ::repair (e.g. for
 
1301
    the log tables). We set it manually here.
 
1302
  */
 
1303
  local_saved_data_file_length= share->saved_data_file_length;
 
1304
  /* set current position to the beginning of the file */
 
1305
  current_position= next_position= 0;
 
1306
 
 
1307
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1308
 
 
1309
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1310
  while (!(rc= find_current_row(buf)))
 
1311
  {
 
1312
    thd_inc_row_count(thd);
 
1313
    rows_repaired++;
 
1314
    current_position= next_position;
 
1315
  }
 
1316
 
 
1317
  free_root(&blobroot, MYF(0));
 
1318
 
 
1319
  free((char*)buf);
 
1320
 
 
1321
  if (rc == HA_ERR_END_OF_FILE)
 
1322
  {
 
1323
    /*
 
1324
      All rows were read ok until end of file, the file does not need repair.
 
1325
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1326
      to the current amount of rows.
 
1327
    */
 
1328
    share->rows_recorded= rows_repaired;
 
1329
    goto end;
 
1330
  }
 
1331
 
 
1332
  /*
 
1333
    Otherwise we've encountered a bad row => repair is needed.
 
1334
    Let us create a temporary file.
 
1335
  */
 
1336
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1337
                                        "", CSN_EXT,
 
1338
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1339
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1340
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1341
 
 
1342
  file_buff->init_buff(data_file);
 
1343
 
 
1344
 
 
1345
  /* we just truncated the file up to the first bad row. update rows count. */
 
1346
  share->rows_recorded= rows_repaired;
 
1347
 
 
1348
  /* write repaired file */
 
1349
  while (1)
 
1350
  {
 
1351
    write_end= std::min(file_buff->end(), current_position);
 
1352
    if ((write_end - write_begin) &&
 
1353
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1354
                  write_end - write_begin, MYF_RW)))
 
1355
      return(-1);
 
1356
 
 
1357
    write_begin= write_end;
 
1358
    if (write_end== current_position)
 
1359
      break;
 
1360
    else
 
1361
      file_buff->read_next(); /* shift the buffer */
 
1362
  }
 
1363
 
 
1364
  /*
 
1365
    Close the files and rename repaired file to the datafile.
 
1366
    We have to close the files, as on Windows one cannot rename
 
1367
    a file, which descriptor is still open. EACCES will be returned
 
1368
    when trying to delete the "to"-file in my_rename().
 
1369
  */
 
1370
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1371
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1372
    return(-1);
 
1373
 
 
1374
  /* Open the file again, it should now be repaired */
 
1375
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1376
                          MYF(0))) == -1)
 
1377
     return(-1);
 
1378
 
 
1379
  /* Set new file size. The file size will be updated by ::update_status() */
 
1380
  local_saved_data_file_length= (size_t) current_position;
 
1381
 
 
1382
end:
 
1383
  share->crashed= false;
 
1384
  return(HA_ADMIN_OK);
 
1385
}
 
1386
 
 
1387
/*
1238
1388
  DELETE without WHERE calls this
1239
1389
*/
1240
1390
 
1243
1393
  int rc;
1244
1394
 
1245
1395
  if (!records_is_known)
1246
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1396
    return(my_errno=HA_ERR_WRONG_COMMAND);
1247
1397
 
1248
1398
  if (!share->tina_write_opened)
1249
1399
    if (init_tina_writer())
1262
1412
}
1263
1413
 
1264
1414
/*
 
1415
  Called by the database to lock the table. Keep in mind that this
 
1416
  is an internal lock.
 
1417
*/
 
1418
THR_LOCK_DATA **ha_tina::store_lock(THD *thd __attribute__((unused)),
 
1419
                                    THR_LOCK_DATA **to,
 
1420
                                    enum thr_lock_type lock_type)
 
1421
{
 
1422
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1423
    lock.type=lock_type;
 
1424
  *to++= &lock;
 
1425
  return to;
 
1426
}
 
1427
 
 
1428
/* 
1265
1429
  Create a table. You do not want to leave the table open after a call to
1266
1430
  this (the database will call ::open() if it needs to).
1267
1431
*/
1268
1432
 
1269
 
int Tina::doCreateTable(Session &session,
1270
 
                        Table& table_arg,
1271
 
                        const drizzled::TableIdentifier &identifier,
1272
 
                        drizzled::message::Table &create_proto)
 
1433
int ha_tina::create(const char *name, Table *table_arg,
 
1434
                    HA_CREATE_INFO *create_info __attribute__((unused)))
1273
1435
{
1274
1436
  char name_buff[FN_REFLEN];
1275
 
  int create_file;
 
1437
  File create_file;
1276
1438
 
1277
1439
  /*
1278
1440
    check columns
1279
1441
  */
1280
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
 
       iter != fields.end();
1283
 
       iter++)
 
1442
  for (Field **field= table_arg->s->field; *field; field++)
1284
1443
  {
1285
 
    if (not *iter) // Historical legacy for NULL array end.
1286
 
      continue;
1287
 
 
1288
 
    if ((*iter)->real_maybe_null())
 
1444
    if ((*field)->real_maybe_null())
1289
1445
    {
1290
1446
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1447
      return(HA_ERR_UNSUPPORTED);
1292
1448
    }
1293
1449
  }
1294
 
 
1295
 
 
1296
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1450
  
 
1451
 
 
1452
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1453
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1454
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1299
1455
    return(-1);
1300
1456
 
1301
1457
  write_meta_file(create_file, 0, false);
1302
 
  internal::my_close(create_file, MYF(0));
 
1458
  my_close(create_file, MYF(0));
1303
1459
 
1304
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1460
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1461
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1462
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1307
1463
    return(-1);
1308
1464
 
1309
 
  internal::my_close(create_file, MYF(0));
1310
 
 
1311
 
  session.storeTableMessage(identifier, create_proto);
1312
 
 
1313
 
  return 0;
1314
 
}
1315
 
 
1316
 
 
1317
 
DRIZZLE_DECLARE_PLUGIN
1318
 
{
1319
 
  DRIZZLE_VERSION_ID,
 
1465
  my_close(create_file, MYF(0));
 
1466
 
 
1467
  return(0);
 
1468
}
 
1469
 
 
1470
int ha_tina::check(THD* thd,
 
1471
                   HA_CHECK_OPT* check_opt __attribute__((unused)))
 
1472
{
 
1473
  int rc= 0;
 
1474
  unsigned char *buf;
 
1475
  const char *old_proc_info;
 
1476
  ha_rows count= share->rows_recorded;
 
1477
 
 
1478
  old_proc_info= get_thd_proc_info(thd);
 
1479
  set_thd_proc_info(thd, "Checking table");
 
1480
  if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1481
    return(HA_ERR_OUT_OF_MEM);
 
1482
 
 
1483
  /* position buffer to the start of the file */
 
1484
   if (init_data_file())
 
1485
     return(HA_ERR_CRASHED);
 
1486
 
 
1487
  /*
 
1488
    Local_saved_data_file_length is initialized during the lock phase.
 
1489
    Check does not use store_lock in certain cases. So, we set it
 
1490
    manually here.
 
1491
  */
 
1492
  local_saved_data_file_length= share->saved_data_file_length;
 
1493
  /* set current position to the beginning of the file */
 
1494
  current_position= next_position= 0;
 
1495
 
 
1496
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1497
 
 
1498
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1499
  while (!(rc= find_current_row(buf)))
 
1500
  {
 
1501
    thd_inc_row_count(thd);
 
1502
    count--;
 
1503
    current_position= next_position;
 
1504
  }
 
1505
  
 
1506
  free_root(&blobroot, MYF(0));
 
1507
 
 
1508
  free((char*)buf);
 
1509
  set_thd_proc_info(thd, old_proc_info);
 
1510
 
 
1511
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1512
  {
 
1513
    share->crashed= true;
 
1514
    return(HA_ADMIN_CORRUPT);
 
1515
  }
 
1516
  else
 
1517
    return(HA_ADMIN_OK);
 
1518
}
 
1519
 
 
1520
 
 
1521
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info __attribute__((unused)),
 
1522
                                         uint32_t table_changes __attribute__((unused)))
 
1523
{
 
1524
  return COMPATIBLE_DATA_YES;
 
1525
}
 
1526
 
 
1527
mysql_declare_plugin(csv)
 
1528
{
 
1529
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
1320
1530
  "CSV",
1321
1531
  "1.0",
1322
1532
  "Brian Aker, MySQL AB",
1323
1533
  "CSV storage engine",
1324
1534
  PLUGIN_LICENSE_GPL,
1325
1535
  tina_init_func, /* Plugin Init */
 
1536
  tina_done_func, /* Plugin Deinit */
 
1537
  NULL,                       /* status variables                */
1326
1538
  NULL,                       /* system variables                */
1327
1539
  NULL                        /* config options                  */
1328
1540
}
1329
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1541
mysql_declare_plugin_end;
1330
1542