~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

Reverted 1103

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
 
46
#include <drizzled/field/timestamp.h>
46
47
#include <drizzled/error.h>
47
48
#include <drizzled/table.h>
48
49
#include <drizzled/session.h>
49
 
#include "drizzled/internal/my_sys.h"
50
50
 
51
51
#include "ha_tina.h"
52
52
 
53
 
#include <fcntl.h>
54
 
 
55
 
#include <algorithm>
56
 
#include <vector>
57
53
#include <string>
58
 
#include <map>
59
54
 
60
55
using namespace std;
61
 
using namespace drizzled;
 
56
 
 
57
static const string engine_name("CSV");
62
58
 
63
59
/*
64
60
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
74
70
#define CSM_EXT ".CSM"               // Meta file
75
71
 
76
72
 
77
 
static int read_meta_file(int meta_file, ha_rows *rows);
78
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
73
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
74
static int free_share(TINA_SHARE *share);
 
75
static int read_meta_file(File meta_file, ha_rows *rows);
 
76
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
77
 
 
78
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
79
extern "C" void tina_update_status(void* param);
 
80
extern "C" bool tina_check_status(void* param);
79
81
 
80
82
/* Stuff for shares */
81
83
pthread_mutex_t tina_mutex;
 
84
static HASH tina_open_tables;
82
85
 
83
86
/*****************************************************************************
84
87
 ** TINA tables
85
88
 *****************************************************************************/
86
89
 
87
90
/*
 
91
  Used for sorting chains with qsort().
 
92
*/
 
93
static int sort_set (tina_set *a, tina_set *b)
 
94
{
 
95
  /*
 
96
    We assume that intervals do not intersect. So, it is enought to compare
 
97
    any two points. Here we take start of intervals for comparison.
 
98
  */
 
99
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
100
}
 
101
 
 
102
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
103
{
 
104
  *length=share->table_name_length;
 
105
  return (unsigned char*) share->table_name;
 
106
}
 
107
 
 
108
 
 
109
/*
88
110
  If frm_error() is called in table.cc this is called to find out what file
89
 
  extensions exist for this Cursor.
 
111
  extensions exist for this handler.
90
112
*/
91
113
static const char *ha_tina_exts[] = {
92
114
  CSV_EXT,
94
116
  NULL
95
117
};
96
118
 
97
 
class Tina : public drizzled::plugin::StorageEngine
 
119
class Tina : public StorageEngine
98
120
{
99
 
  typedef std::map<string, TinaShare*> TinaMap;
100
 
  TinaMap tina_open_tables;
101
121
public:
102
122
  Tina(const string& name_arg)
103
 
   : drizzled::plugin::StorageEngine(name_arg,
104
 
                                     HTON_TEMPORARY_ONLY |
105
 
                                     HTON_NO_AUTO_INCREMENT |
106
 
                                     HTON_SKIP_STORE_LOCK),
107
 
    tina_open_tables()
108
 
  {}
109
 
  virtual ~Tina()
110
 
  {
111
 
    pthread_mutex_destroy(&tina_mutex);
112
 
  }
113
 
 
114
 
  virtual Cursor *create(Table &table)
115
 
  {
116
 
    return new ha_tina(*this, table);
 
123
   : StorageEngine(name_arg, HTON_CAN_RECREATE | HTON_TEMPORARY_ONLY | HTON_FILE_BASED) {}
 
124
  virtual handler *create(TableShare *table,
 
125
                          MEM_ROOT *mem_root)
 
126
  {
 
127
    return new (mem_root) ha_tina(this, table);
117
128
  }
118
129
 
119
130
  const char **bas_ext() const {
120
131
    return ha_tina_exts;
121
132
  }
122
133
 
123
 
  int doCreateTable(Session &,
124
 
                    Table &table_arg,
125
 
                    const drizzled::identifier::Table &identifier,
126
 
                    drizzled::message::Table&);
127
 
 
128
 
  int doGetTableDefinition(Session& session,
129
 
                           const drizzled::identifier::Table &identifier,
130
 
                           drizzled::message::Table &table_message);
131
 
 
132
 
  int doDropTable(Session&, const drizzled::identifier::Table &identifier);
133
 
  TinaShare *findOpenTable(const string table_name);
134
 
  void addOpenTable(const string &table_name, TinaShare *);
135
 
  void deleteOpenTable(const string &table_name);
136
 
 
137
 
 
138
 
  uint32_t max_keys()          const { return 0; }
139
 
  uint32_t max_key_parts()     const { return 0; }
140
 
  uint32_t max_key_length()    const { return 0; }
141
 
  bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
 
  int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
143
 
 
144
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
 
                             const drizzled::identifier::Schema &schema_identifier,
146
 
                             drizzled::identifier::Table::vector &set_of_identifiers);
 
134
  int createTableImplementation(Session *, const char *table_name,
 
135
                                Table *table_arg,
 
136
                                HA_CREATE_INFO *, drizzled::message::Table*);
 
137
 
147
138
};
148
139
 
149
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
 
                                 const drizzled::identifier::Schema&,
151
 
                                 drizzled::identifier::Table::vector&)
152
 
{
153
 
}
154
 
 
155
 
int Tina::doRenameTable(Session &session,
156
 
                        const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
157
 
{
158
 
  int error= 0;
159
 
  for (const char **ext= bas_ext(); *ext ; ext++)
160
 
  {
161
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
162
 
    {
163
 
      if ((error=errno) != ENOENT)
164
 
        break;
165
 
      error= 0;
166
 
    }
167
 
  }
168
 
 
169
 
  session.getMessageCache().renameTableMessage(from, to);
170
 
 
171
 
  return error;
172
 
}
173
 
 
174
 
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
175
 
{
176
 
  return session.getMessageCache().doesTableMessageExist(identifier);
177
 
}
178
 
 
179
 
 
180
 
int Tina::doDropTable(Session &session,
181
 
                      const drizzled::identifier::Table &identifier)
182
 
{
183
 
  int error= 0;
184
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
185
 
 
186
 
  for (const char **ext= bas_ext(); *ext ; ext++)
187
 
  {
188
 
    std::string full_name= identifier.getPath();
189
 
    full_name.append(*ext);
190
 
 
191
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
192
 
    {
193
 
      if ((error= errno) != ENOENT)
194
 
        break;
195
 
    }
196
 
    else
197
 
    {
198
 
      enoent_or_zero= 0;                        // No error for ENOENT
199
 
    }
200
 
    error= enoent_or_zero;
201
 
  }
202
 
 
203
 
  session.getMessageCache().removeTableMessage(identifier);
204
 
 
205
 
  return error;
206
 
}
207
 
 
208
 
TinaShare *Tina::findOpenTable(const string table_name)
209
 
{
210
 
  TinaMap::iterator find_iter=
211
 
    tina_open_tables.find(table_name);
212
 
 
213
 
  if (find_iter != tina_open_tables.end())
214
 
    return (*find_iter).second;
215
 
  else
216
 
    return NULL;
217
 
}
218
 
 
219
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
220
 
{
221
 
  tina_open_tables[table_name]= share;
222
 
}
223
 
 
224
 
void Tina::deleteOpenTable(const string &table_name)
225
 
{
226
 
  tina_open_tables.erase(table_name);
227
 
}
228
 
 
229
 
 
230
 
int Tina::doGetTableDefinition(Session &session,
231
 
                               const drizzled::identifier::Table &identifier,
232
 
                               drizzled::message::Table &table_message)
233
 
{
234
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
235
 
    return EEXIST;
236
 
 
237
 
  return ENOENT;
238
 
}
239
 
 
240
 
 
241
140
static Tina *tina_engine= NULL;
242
141
 
243
 
static int tina_init_func(drizzled::module::Context &context)
 
142
static int tina_init_func(PluginRegistry &registry)
244
143
{
245
144
 
246
 
  tina_engine= new Tina("CSV");
247
 
  context.add(tina_engine);
 
145
  tina_engine= new Tina(engine_name);
 
146
  registry.add(tina_engine);
248
147
 
249
148
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
250
 
  return 0;
251
 
}
252
 
 
253
 
 
254
 
 
255
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
256
 
  table_name(table_name_arg),
257
 
  data_file_name(table_name_arg),
258
 
  use_count(0),
259
 
  saved_data_file_length(0),
260
 
  update_file_opened(false),
261
 
  tina_write_opened(false),
262
 
  crashed(false),
263
 
  rows_recorded(0),
264
 
  data_file_version(0)
265
 
{
266
 
  data_file_name.append(CSV_EXT);
267
 
}
268
 
 
269
 
TinaShare::~TinaShare()
270
 
{
271
 
  pthread_mutex_destroy(&mutex);
272
 
}
 
149
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
150
                   (hash_get_key) tina_get_key,0,0);
 
151
  return 0;
 
152
}
 
153
 
 
154
static int tina_done_func(PluginRegistry &registry)
 
155
{
 
156
  registry.remove(tina_engine);
 
157
  delete tina_engine;
 
158
 
 
159
  hash_free(&tina_open_tables);
 
160
  pthread_mutex_destroy(&tina_mutex);
 
161
 
 
162
  return 0;
 
163
}
 
164
 
273
165
 
274
166
/*
275
167
  Simple lock controls.
276
168
*/
277
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
169
static TINA_SHARE *get_share(const char *table_name, Table *)
278
170
{
279
 
  pthread_mutex_lock(&tina_mutex);
280
 
 
281
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
282
 
  share= a_tina->findOpenTable(table_name);
283
 
 
284
 
  std::string meta_file_name;
 
171
  TINA_SHARE *share;
 
172
  char meta_file_name[FN_REFLEN];
285
173
  struct stat file_stat;
 
174
  char *tmp_name;
 
175
  uint32_t length;
 
176
 
 
177
  pthread_mutex_lock(&tina_mutex);
 
178
  length=(uint) strlen(table_name);
286
179
 
287
180
  /*
288
181
    If share is not present in the hash, create a new share and
289
182
    initialize its members.
290
183
  */
291
 
  if (! share)
 
184
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
185
                                        (unsigned char*) table_name,
 
186
                                       length)))
292
187
  {
293
 
    share= new TinaShare(table_name);
294
 
 
295
 
    if (share == NULL)
296
 
    {
297
 
      pthread_mutex_unlock(&tina_mutex);
298
 
      return NULL;
299
 
    }
300
 
 
301
 
    meta_file_name.assign(table_name);
302
 
    meta_file_name.append(CSM_EXT);
303
 
 
304
 
    if (stat(share->data_file_name.c_str(), &file_stat))
305
 
    {
306
 
      pthread_mutex_unlock(&tina_mutex);
307
 
      delete share;
308
 
      return NULL;
309
 
    }
310
 
  
 
188
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
189
                         &share, sizeof(*share),
 
190
                         &tmp_name, length+1,
 
191
                         NULL))
 
192
    {
 
193
      pthread_mutex_unlock(&tina_mutex);
 
194
      return NULL;
 
195
    }
 
196
 
 
197
    share->use_count= 0;
 
198
    share->table_name_length= length;
 
199
    share->table_name= tmp_name;
 
200
    share->crashed= false;
 
201
    share->rows_recorded= 0;
 
202
    share->update_file_opened= false;
 
203
    share->tina_write_opened= false;
 
204
    share->data_file_version= 0;
 
205
    strcpy(share->table_name, table_name);
 
206
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
207
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
208
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
209
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
210
 
 
211
    if (stat(share->data_file_name, &file_stat))
 
212
      goto error;
311
213
    share->saved_data_file_length= file_stat.st_size;
312
214
 
313
 
    a_tina->addOpenTable(share->table_name, share);
314
 
 
 
215
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
216
      goto error;
 
217
    thr_lock_init(&share->lock);
315
218
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
316
219
 
317
220
    /*
320
223
      Usually this will result in auto-repair, and we will get a good
321
224
      meta-file in the end.
322
225
    */
323
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
324
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
226
    if ((share->meta_file= my_open(meta_file_name,
 
227
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
325
228
      share->crashed= true;
326
229
 
327
230
    /*
335
238
  pthread_mutex_unlock(&tina_mutex);
336
239
 
337
240
  return share;
 
241
 
 
242
error:
 
243
  pthread_mutex_unlock(&tina_mutex);
 
244
  free((unsigned char*) share);
 
245
 
 
246
  return NULL;
338
247
}
339
248
 
340
249
 
357
266
    non-zero - error occurred
358
267
*/
359
268
 
360
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
269
static int read_meta_file(File meta_file, ha_rows *rows)
361
270
{
362
271
  unsigned char meta_buffer[META_BUFFER_SIZE];
363
272
  unsigned char *ptr= meta_buffer;
364
273
 
365
274
  lseek(meta_file, 0, SEEK_SET);
366
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
275
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
367
276
      != META_BUFFER_SIZE)
368
277
    return(HA_ERR_CRASHED_ON_USAGE);
369
278
 
385
294
      ((bool)(*ptr)== true))
386
295
    return(HA_ERR_CRASHED_ON_USAGE);
387
296
 
388
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
297
  my_sync(meta_file, MYF(MY_WME));
389
298
 
390
299
  return(0);
391
300
}
410
319
    non-zero - error occurred
411
320
*/
412
321
 
413
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
322
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
414
323
{
415
324
  unsigned char meta_buffer[META_BUFFER_SIZE];
416
325
  unsigned char *ptr= meta_buffer;
430
339
  *ptr= (unsigned char)dirty;
431
340
 
432
341
  lseek(meta_file, 0, SEEK_SET);
433
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
342
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
434
343
      != META_BUFFER_SIZE)
435
344
    return(-1);
436
345
 
437
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
346
  my_sync(meta_file, MYF(MY_WME));
438
347
 
439
348
  return(0);
440
349
}
441
350
 
 
351
bool ha_tina::check_and_repair(Session *session)
 
352
{
 
353
  HA_CHECK_OPT check_opt;
 
354
 
 
355
  check_opt.init();
 
356
 
 
357
  return(repair(session, &check_opt));
 
358
}
 
359
 
 
360
 
442
361
int ha_tina::init_tina_writer()
443
362
{
444
363
  /*
449
368
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
450
369
 
451
370
  if ((share->tina_write_filedes=
452
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
371
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
453
372
  {
454
373
    share->crashed= true;
455
374
    return(1);
460
379
}
461
380
 
462
381
 
 
382
bool ha_tina::is_crashed() const
 
383
{
 
384
  return(share->crashed);
 
385
}
 
386
 
463
387
/*
464
388
  Free lock controls.
465
389
*/
466
 
int ha_tina::free_share()
 
390
static int free_share(TINA_SHARE *share)
467
391
{
468
392
  pthread_mutex_lock(&tina_mutex);
469
393
  int result_code= 0;
471
395
    /* Write the meta file. Mark it as crashed if needed. */
472
396
    (void)write_meta_file(share->meta_file, share->rows_recorded,
473
397
                          share->crashed ? true :false);
474
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
398
    if (my_close(share->meta_file, MYF(0)))
475
399
      result_code= 1;
476
400
    if (share->tina_write_opened)
477
401
    {
478
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
402
      if (my_close(share->tina_write_filedes, MYF(0)))
479
403
        result_code= 1;
480
404
      share->tina_write_opened= false;
481
405
    }
482
406
 
483
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
484
 
    a_tina->deleteOpenTable(share->table_name);
485
 
    delete share;
 
407
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
408
    thr_lock_delete(&share->lock);
 
409
    pthread_mutex_destroy(&share->mutex);
 
410
    free((unsigned char*) share);
486
411
  }
487
412
  pthread_mutex_unlock(&tina_mutex);
488
413
 
529
454
 
530
455
 
531
456
 
532
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
533
 
  :Cursor(engine_arg, table_arg),
 
457
ha_tina::ha_tina(StorageEngine *engine_arg, TableShare *table_arg)
 
458
  :handler(engine_arg, table_arg),
534
459
  /*
535
 
    These definitions are found in Cursor.h
 
460
    These definitions are found in handler.h
536
461
    They are not probably completely right.
537
462
  */
538
463
  current_position(0), next_position(0), local_saved_data_file_length(0),
539
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
464
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
465
  local_data_file_version(0), records_is_known(0)
540
466
{
541
467
  /* Set our original buffers from pre-allocated memory */
542
468
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
469
  chain= chain_buffer;
543
470
  file_buff= new Transparent_file();
544
471
}
545
472
 
556
483
 
557
484
  buffer.length(0);
558
485
 
559
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
486
  for (Field **field=table->field ; *field ; field++)
560
487
  {
561
488
    const char *ptr;
562
489
    const char *end_ptr;
596
523
        {
597
524
          buffer.append('\\');
598
525
          buffer.append('"');
599
 
          (void) *ptr++;
 
526
          *ptr++;
600
527
        }
601
528
        else if (*ptr == '\r')
602
529
        {
603
530
          buffer.append('\\');
604
531
          buffer.append('r');
605
 
          (void) *ptr++;
 
532
          *ptr++;
606
533
        }
607
534
        else if (*ptr == '\\')
608
535
        {
609
536
          buffer.append('\\');
610
537
          buffer.append('\\');
611
 
          (void) *ptr++;
 
538
          *ptr++;
612
539
        }
613
540
        else if (*ptr == '\n')
614
541
        {
615
542
          buffer.append('\\');
616
543
          buffer.append('n');
617
 
          (void) *ptr++;
 
544
          *ptr++;
618
545
        }
619
546
        else
620
547
          buffer.append(*ptr++);
644
571
*/
645
572
int ha_tina::chain_append()
646
573
{
647
 
  if (chain.size() > 0 && chain.back().second == current_position)
648
 
    chain.back().second= next_position;
 
574
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
575
    (chain_ptr -1)->end= next_position;
649
576
  else
650
 
    chain.push_back(make_pair(current_position, next_position));
 
577
  {
 
578
    /* We set up for the next position */
 
579
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
580
    {
 
581
      off_t location= chain_ptr - chain;
 
582
      chain_size += DEFAULT_CHAIN_LENGTH;
 
583
      if (chain_alloced)
 
584
      {
 
585
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
586
          return -1;
 
587
      }
 
588
      else
 
589
      {
 
590
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
591
        if (ptr == NULL)
 
592
          return -1;
 
593
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
594
        chain= ptr;
 
595
        chain_alloced++;
 
596
      }
 
597
      chain_ptr= chain + location;
 
598
    }
 
599
    chain_ptr->begin= current_position;
 
600
    chain_ptr->end= next_position;
 
601
    chain_ptr++;
 
602
  }
 
603
 
651
604
  return 0;
652
605
}
653
606
 
661
614
  int eoln_len;
662
615
  int error;
663
616
 
664
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
617
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
665
618
 
666
619
  /*
667
620
    We do not read further then local_saved_data_file_length in order
674
627
 
675
628
  error= HA_ERR_CRASHED_ON_USAGE;
676
629
 
677
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
630
  memset(buf, 0, table->s->null_bytes);
678
631
 
679
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
632
  for (Field **field=table->field ; *field ; field++)
680
633
  {
681
634
    char curr_char;
682
635
 
745
698
    {
746
699
      /* This masks a bug in the logic for a SELECT * */
747
700
      (*field)->setWriteSet();
748
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
749
 
      {
 
701
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
702
                          CHECK_FIELD_WARN))
750
703
        goto err;
751
 
      }
752
704
 
753
705
      if ((*field)->flags & BLOB_FLAG)
754
706
      {
761
713
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
762
714
        if (src)
763
715
        {
764
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
716
          tgt= (unsigned char*) alloc_root(&blobroot, length);
765
717
          memmove(tgt, src, length);
766
718
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
767
719
        }
777
729
}
778
730
 
779
731
/*
 
732
  Three functions below are needed to enable concurrent insert functionality
 
733
  for CSV engine. For more details see mysys/thr_lock.c
 
734
*/
 
735
 
 
736
void tina_get_status(void* param, int)
 
737
{
 
738
  ha_tina *tina= (ha_tina*) param;
 
739
  tina->get_status();
 
740
}
 
741
 
 
742
void tina_update_status(void* param)
 
743
{
 
744
  ha_tina *tina= (ha_tina*) param;
 
745
  tina->update_status();
 
746
}
 
747
 
 
748
/* this should exist and return 0 for concurrent insert to work */
 
749
bool tina_check_status(void *)
 
750
{
 
751
  return 0;
 
752
}
 
753
 
 
754
/*
 
755
  Save the state of the table
 
756
 
 
757
  SYNOPSIS
 
758
    get_status()
 
759
 
 
760
  DESCRIPTION
 
761
    This function is used to retrieve the file length. During the lock
 
762
    phase of concurrent insert. For more details see comment to
 
763
    ha_tina::update_status below.
 
764
*/
 
765
 
 
766
void ha_tina::get_status()
 
767
{
 
768
  local_saved_data_file_length= share->saved_data_file_length;
 
769
}
 
770
 
 
771
 
 
772
/*
 
773
  Correct the state of the table. Called by unlock routines
 
774
  before the write lock is released.
 
775
 
 
776
  SYNOPSIS
 
777
    update_status()
 
778
 
 
779
  DESCRIPTION
 
780
    When we employ concurrent insert lock, we save current length of the file
 
781
    during the lock phase. We do not read further saved value, as we don't
 
782
    want to interfere with undergoing concurrent insert. Writers update file
 
783
    length info during unlock with update_status().
 
784
 
 
785
  NOTE
 
786
    For log tables concurrent insert works different. The reason is that
 
787
    log tables are always opened and locked. And as they do not unlock
 
788
    tables, the file length after writes should be updated in a different
 
789
    way.
 
790
*/
 
791
 
 
792
void ha_tina::update_status()
 
793
{
 
794
  /* correct local_saved_data_file_length for writers */
 
795
  share->saved_data_file_length= local_saved_data_file_length;
 
796
}
 
797
 
 
798
 
 
799
/*
780
800
  Open a database file. Keep in mind that tables are caches, so
781
801
  this will not be called for every request. Any sort of positions
782
802
  that need to be reset should be kept in the ::extra() call.
783
803
*/
784
 
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
 
804
int ha_tina::open(const char *name, int, uint32_t open_options)
785
805
{
786
 
  if (not (share= get_share(identifier.getPath().c_str())))
 
806
  if (!(share= get_share(name, table)))
787
807
    return(ENOENT);
788
808
 
789
 
  if (share->crashed)
 
809
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
790
810
  {
791
 
    free_share();
 
811
    free_share(share);
792
812
    return(HA_ERR_CRASHED_ON_USAGE);
793
813
  }
794
814
 
795
815
  local_data_file_version= share->data_file_version;
796
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
816
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
797
817
    return(0);
798
818
 
799
819
  /*
800
 
    Init locking. Pass Cursor object to the locking routines,
 
820
    Init locking. Pass handler object to the locking routines,
801
821
    so that they could save/update local_saved_data_file_length value
802
822
    during locking. This is needed to enable concurrent inserts.
803
823
  */
 
824
  thr_lock_data_init(&share->lock, &lock, (void*) this);
804
825
  ref_length=sizeof(off_t);
805
826
 
 
827
  share->lock.get_status= tina_get_status;
 
828
  share->lock.update_status= tina_update_status;
 
829
  share->lock.check_status= tina_check_status;
 
830
 
806
831
  return(0);
807
832
}
808
833
 
 
834
 
809
835
/*
810
836
  Close a database file. We remove ourselves from the shared strucutre.
811
837
  If it is empty we destroy it.
813
839
int ha_tina::close(void)
814
840
{
815
841
  int rc= 0;
816
 
  rc= internal::my_close(data_file, MYF(0));
817
 
  return(free_share() || rc);
 
842
  rc= my_close(data_file, MYF(0));
 
843
  return(free_share(share) || rc);
818
844
}
819
845
 
820
846
/*
821
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
847
  This is an INSERT. At the moment this handler just seeks to the end
822
848
  of the file and appends the data. In an error case it really should
823
849
  just truncate to the original position (this is not done yet).
824
850
*/
825
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
851
int ha_tina::write_row(unsigned char * buf)
826
852
{
827
853
  int size;
828
854
 
829
855
  if (share->crashed)
830
856
      return(HA_ERR_CRASHED_ON_USAGE);
831
857
 
 
858
  ha_statistic_increment(&SSV::ha_write_count);
 
859
 
832
860
  size= encode_quote(buf);
833
861
 
834
862
  if (!share->tina_write_opened)
836
864
      return(-1);
837
865
 
838
866
   /* use pwrite, as concurrent reader could have changed the position */
839
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
867
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
840
868
               MYF(MY_WME | MY_NABP)))
841
869
    return(-1);
842
870
 
861
889
  if (!share->update_file_opened)
862
890
  {
863
891
    if ((update_temp_file=
864
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
892
           my_create(fn_format(updated_fname, share->table_name,
865
893
                               "", CSN_EXT,
866
894
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
895
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
876
904
  This is called for an update.
877
905
  Make sure you put in code to increment the auto increment, also
878
906
  update any timestamp data. Currently auto increment is not being
879
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
907
  fixed since autoincrements have yet to be added to this table handler.
880
908
  This will be called in a table scan right before the previous ::rnd_next()
881
909
  call.
882
910
*/
883
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
911
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
884
912
{
885
913
  int size;
886
914
  int rc= -1;
887
915
 
 
916
  ha_statistic_increment(&SSV::ha_update_count);
 
917
 
 
918
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
919
    table->timestamp_field->set_time();
 
920
 
888
921
  size= encode_quote(new_data);
889
922
 
890
923
  /*
891
924
    During update we mark each updating record as deleted
892
925
    (see the chain_append()) then write new one to the temporary data file.
893
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
926
    At the end of the sequence in the rnd_end() we append all non-marked
894
927
    records from the data file to the temporary data file then rename it.
895
928
    The temp_file_length is used to calculate new data file length.
896
929
  */
900
933
  if (open_update_temp_file_if_needed())
901
934
    goto err;
902
935
 
903
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
936
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
904
937
               MYF(MY_WME | MY_NABP)))
905
938
    goto err;
906
939
  temp_file_length+= size;
915
948
  Deletes a row. First the database will find the row, and then call this
916
949
  method. In the case of a table scan, the previous call to this will be
917
950
  the ::rnd_next() that found this row.
918
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
951
  The exception to this is an ORDER BY. This will cause the table handler
919
952
  to walk the table noting the positions of all rows that match a query.
920
953
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
921
954
  DESC, ASC).
922
955
*/
923
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
956
int ha_tina::delete_row(const unsigned char *)
924
957
{
 
958
  ha_statistic_increment(&SSV::ha_delete_count);
925
959
 
926
960
  if (chain_append())
927
961
    return(-1);
955
989
  if (local_data_file_version != share->data_file_version)
956
990
  {
957
991
    local_data_file_version= share->data_file_version;
958
 
    if (internal::my_close(data_file, MYF(0)) ||
959
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
992
    if (my_close(data_file, MYF(0)) ||
 
993
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
960
994
      return 1;
961
995
  }
962
996
  file_buff->init_buff(data_file);
968
1002
  All table scans call this first.
969
1003
  The order of a table scan is:
970
1004
 
 
1005
  ha_tina::store_lock
 
1006
  ha_tina::external_lock
971
1007
  ha_tina::info
972
1008
  ha_tina::rnd_init
973
1009
  ha_tina::extra
983
1019
  ha_tina::rnd_next
984
1020
  ha_tina::extra
985
1021
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1022
  ha_tina::external_lock
986
1023
  ha_tina::extra
987
1024
  ENUM HA_EXTRA_RESET   Reset database to after open
988
1025
 
992
1029
 
993
1030
*/
994
1031
 
995
 
int ha_tina::doStartTableScan(bool)
 
1032
int ha_tina::rnd_init(bool)
996
1033
{
997
1034
  /* set buffer to the beginning of the file */
998
1035
  if (share->crashed || init_data_file())
1001
1038
  current_position= next_position= 0;
1002
1039
  stats.records= 0;
1003
1040
  records_is_known= 0;
1004
 
  chain.clear();
 
1041
  chain_ptr= chain;
1005
1042
 
1006
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1043
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1007
1044
 
1008
1045
  return(0);
1009
1046
}
1018
1055
  reserved for null count.
1019
1056
  Basically this works as a mask for which rows are nulled (compared to just
1020
1057
  empty).
1021
 
  This table Cursor doesn't do nulls and does not know the difference between
1022
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1058
  This table handler doesn't do nulls and does not know the difference between
 
1059
  NULL and "". This is ok since this table handler is for spreadsheets and
1023
1060
  they don't know about them either :)
1024
1061
*/
1025
1062
int ha_tina::rnd_next(unsigned char *buf)
1029
1066
  if (share->crashed)
1030
1067
      return(HA_ERR_CRASHED_ON_USAGE);
1031
1068
 
1032
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1069
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1033
1070
 
1034
1071
  current_position= next_position;
1035
1072
 
1055
1092
*/
1056
1093
void ha_tina::position(const unsigned char *)
1057
1094
{
1058
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1095
  my_store_ptr(ref, ref_length, current_position);
1059
1096
  return;
1060
1097
}
1061
1098
 
1062
1099
 
1063
1100
/*
1064
1101
  Used to fetch a row from a posiion stored with ::position().
1065
 
  internal::my_get_ptr() retrieves the data for you.
 
1102
  my_get_ptr() retrieves the data for you.
1066
1103
*/
1067
1104
 
1068
1105
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1106
{
1070
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1107
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1108
  current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1109
  return(find_current_row(buf));
1073
1110
}
1074
1111
 
1075
1112
/*
1076
1113
  ::info() is used to return information to the optimizer.
1077
 
  Currently this table Cursor doesn't implement most of the fields
 
1114
  Currently this table handler doesn't implement most of the fields
1078
1115
  really needed. SHOW also makes use of this data
1079
1116
*/
1080
1117
int ha_tina::info(uint32_t)
1090
1127
  to the given "hole", stored in the buffer. "Valid" here means,
1091
1128
  not listed in the chain of deleted records ("holes").
1092
1129
*/
1093
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1130
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1094
1131
{
1095
 
  if (closest_hole == chain.end()) /* no more chains */
 
1132
  if (closest_hole == chain_ptr) /* no more chains */
1096
1133
    *end_pos= file_buff->end();
1097
1134
  else
1098
1135
    *end_pos= std::min(file_buff->end(),
1099
 
                       closest_hole->first);
1100
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1136
                       closest_hole->begin);
 
1137
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1101
1138
}
1102
1139
 
1103
1140
 
1107
1144
  slots to clean up all of the dead space we have collected while
1108
1145
  performing deletes/updates.
1109
1146
*/
1110
 
int ha_tina::doEndTableScan()
 
1147
int ha_tina::rnd_end()
1111
1148
{
 
1149
  char updated_fname[FN_REFLEN];
1112
1150
  off_t file_buffer_start= 0;
1113
1151
 
1114
 
  blobroot.free_root(MYF(0));
 
1152
  free_root(&blobroot, MYF(0));
1115
1153
  records_is_known= 1;
1116
1154
 
1117
 
  if (chain.size() > 0)
 
1155
  if ((chain_ptr - chain)  > 0)
1118
1156
  {
1119
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1157
    tina_set *ptr= chain;
1120
1158
 
1121
1159
    /*
1122
1160
      Re-read the beginning of a file (as the buffer should point to the
1128
1166
      The sort is needed when there were updates/deletes with random orders.
1129
1167
      It sorts so that we move the firts blocks to the beginning.
1130
1168
    */
1131
 
    sort(chain.begin(), chain.end());
 
1169
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1170
             (qsort_cmp)sort_set);
1132
1171
 
1133
1172
    off_t write_begin= 0, write_end;
1134
1173
 
1149
1188
      /* if there is something to write, write it */
1150
1189
      if (write_length)
1151
1190
      {
1152
 
        if (internal::my_write(update_temp_file,
 
1191
        if (my_write(update_temp_file,
1153
1192
                     (unsigned char*) (file_buff->ptr() +
1154
1193
                               (write_begin - file_buff->start())),
1155
1194
                     (size_t)write_length, MYF_RW))
1159
1198
      if (in_hole)
1160
1199
      {
1161
1200
        /* skip hole */
1162
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1201
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1163
1202
          file_buffer_start= file_buff->read_next();
1164
 
        write_begin= ptr->second;
1165
 
        ++ptr;
 
1203
        write_begin= ptr->end;
 
1204
        ptr++;
1166
1205
      }
1167
1206
      else
1168
1207
        write_begin= write_end;
1172
1211
 
1173
1212
    }
1174
1213
 
1175
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
 
        internal::my_close(update_temp_file, MYF(0)))
 
1214
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1215
        my_close(update_temp_file, MYF(0)))
1177
1216
      return(-1);
1178
1217
 
1179
1218
    share->update_file_opened= false;
1180
1219
 
1181
1220
    if (share->tina_write_opened)
1182
1221
    {
1183
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1222
      if (my_close(share->tina_write_filedes, MYF(0)))
1184
1223
        return(-1);
1185
1224
      /*
1186
1225
        Mark that the writer fd is closed, so that init_tina_writer()
1193
1232
      Close opened fildes's. Then move updated file in place
1194
1233
      of the old datafile.
1195
1234
    */
1196
 
    std::string rename_file= share->table_name;
1197
 
    rename_file.append(CSN_EXT);
1198
 
    if (internal::my_close(data_file, MYF(0)) ||
1199
 
        internal::my_rename(rename_file.c_str(),
1200
 
                            share->data_file_name.c_str(), MYF(0)))
 
1235
    if (my_close(data_file, MYF(0)) ||
 
1236
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1237
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1238
                  share->data_file_name, MYF(0)))
1201
1239
      return(-1);
1202
1240
 
1203
1241
    /* Open the file again */
1204
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1242
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1205
1243
      return(-1);
1206
1244
    /*
1207
1245
      As we reopened the data file, increase share->data_file_version
1228
1266
 
1229
1267
  return(0);
1230
1268
error:
1231
 
  internal::my_close(update_temp_file, MYF(0));
 
1269
  my_close(update_temp_file, MYF(0));
1232
1270
  share->update_file_opened= false;
1233
1271
  return(-1);
1234
1272
}
1235
1273
 
1236
1274
 
1237
1275
/*
 
1276
  Repair CSV table in the case, it is crashed.
 
1277
 
 
1278
  SYNOPSIS
 
1279
    repair()
 
1280
    session         The thread, performing repair
 
1281
    check_opt   The options for repair. We do not use it currently.
 
1282
 
 
1283
  DESCRIPTION
 
1284
    If the file is empty, change # of rows in the file and complete recovery.
 
1285
    Otherwise, scan the table looking for bad rows. If none were found,
 
1286
    we mark file as a good one and return. If a bad row was encountered,
 
1287
    we truncate the datafile up to the last good row.
 
1288
 
 
1289
   TODO: Make repair more clever - it should try to recover subsequent
 
1290
         rows (after the first bad one) as well.
 
1291
*/
 
1292
 
 
1293
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
 
1294
{
 
1295
  char repaired_fname[FN_REFLEN];
 
1296
  unsigned char *buf;
 
1297
  File repair_file;
 
1298
  int rc;
 
1299
  ha_rows rows_repaired= 0;
 
1300
  off_t write_begin= 0, write_end;
 
1301
 
 
1302
  /* empty file */
 
1303
  if (!share->saved_data_file_length)
 
1304
  {
 
1305
    share->rows_recorded= 0;
 
1306
    goto end;
 
1307
  }
 
1308
 
 
1309
  /* Don't assert in field::val() functions */
 
1310
  table->use_all_columns();
 
1311
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1312
    return(HA_ERR_OUT_OF_MEM);
 
1313
 
 
1314
  /* position buffer to the start of the file */
 
1315
  if (init_data_file())
 
1316
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1317
 
 
1318
  /*
 
1319
    Local_saved_data_file_length is initialized during the lock phase.
 
1320
    Sometimes this is not getting executed before ::repair (e.g. for
 
1321
    the log tables). We set it manually here.
 
1322
  */
 
1323
  local_saved_data_file_length= share->saved_data_file_length;
 
1324
  /* set current position to the beginning of the file */
 
1325
  current_position= next_position= 0;
 
1326
 
 
1327
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1328
 
 
1329
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1330
  while (!(rc= find_current_row(buf)))
 
1331
  {
 
1332
    session_inc_row_count(session);
 
1333
    rows_repaired++;
 
1334
    current_position= next_position;
 
1335
  }
 
1336
 
 
1337
  free_root(&blobroot, MYF(0));
 
1338
 
 
1339
  free((char*)buf);
 
1340
 
 
1341
  if (rc == HA_ERR_END_OF_FILE)
 
1342
  {
 
1343
    /*
 
1344
      All rows were read ok until end of file, the file does not need repair.
 
1345
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1346
      to the current amount of rows.
 
1347
    */
 
1348
    share->rows_recorded= rows_repaired;
 
1349
    goto end;
 
1350
  }
 
1351
 
 
1352
  /*
 
1353
    Otherwise we've encountered a bad row => repair is needed.
 
1354
    Let us create a temporary file.
 
1355
  */
 
1356
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1357
                                        "", CSN_EXT,
 
1358
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1359
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1360
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1361
 
 
1362
  file_buff->init_buff(data_file);
 
1363
 
 
1364
 
 
1365
  /* we just truncated the file up to the first bad row. update rows count. */
 
1366
  share->rows_recorded= rows_repaired;
 
1367
 
 
1368
  /* write repaired file */
 
1369
  while (1)
 
1370
  {
 
1371
    write_end= std::min(file_buff->end(), current_position);
 
1372
 
 
1373
    off_t write_length= write_end - write_begin;
 
1374
    if ((uint64_t)write_length > SIZE_MAX)
 
1375
    {
 
1376
      return -1;
 
1377
    }
 
1378
    if ((write_length) &&
 
1379
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1380
                  (size_t)write_length, MYF_RW)))
 
1381
      return(-1);
 
1382
 
 
1383
    write_begin= write_end;
 
1384
    if (write_end== current_position)
 
1385
      break;
 
1386
    else
 
1387
      file_buff->read_next(); /* shift the buffer */
 
1388
  }
 
1389
 
 
1390
  /*
 
1391
    Close the files and rename repaired file to the datafile.
 
1392
    We have to close the files, as on Windows one cannot rename
 
1393
    a file, which descriptor is still open. EACCES will be returned
 
1394
    when trying to delete the "to"-file in my_rename().
 
1395
  */
 
1396
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1397
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1398
    return(-1);
 
1399
 
 
1400
  /* Open the file again, it should now be repaired */
 
1401
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1402
                          MYF(0))) == -1)
 
1403
     return(-1);
 
1404
 
 
1405
  /* Set new file size. The file size will be updated by ::update_status() */
 
1406
  local_saved_data_file_length= (size_t) current_position;
 
1407
 
 
1408
end:
 
1409
  share->crashed= false;
 
1410
  return(HA_ADMIN_OK);
 
1411
}
 
1412
 
 
1413
/*
1238
1414
  DELETE without WHERE calls this
1239
1415
*/
1240
1416
 
1243
1419
  int rc;
1244
1420
 
1245
1421
  if (!records_is_known)
1246
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1422
    return(my_errno=HA_ERR_WRONG_COMMAND);
1247
1423
 
1248
1424
  if (!share->tina_write_opened)
1249
1425
    if (init_tina_writer())
1262
1438
}
1263
1439
 
1264
1440
/*
 
1441
  Called by the database to lock the table. Keep in mind that this
 
1442
  is an internal lock.
 
1443
*/
 
1444
THR_LOCK_DATA **ha_tina::store_lock(Session *,
 
1445
                                    THR_LOCK_DATA **to,
 
1446
                                    enum thr_lock_type lock_type)
 
1447
{
 
1448
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1449
    lock.type=lock_type;
 
1450
  *to++= &lock;
 
1451
  return to;
 
1452
}
 
1453
 
 
1454
/*
1265
1455
  Create a table. You do not want to leave the table open after a call to
1266
1456
  this (the database will call ::open() if it needs to).
1267
1457
*/
1268
1458
 
1269
 
int Tina::doCreateTable(Session &session,
1270
 
                        Table& table_arg,
1271
 
                        const drizzled::identifier::Table &identifier,
1272
 
                        drizzled::message::Table &create_proto)
 
1459
int Tina::createTableImplementation(Session *, const char *table_name,
 
1460
                                    Table *table_arg,
 
1461
                                    HA_CREATE_INFO *, drizzled::message::Table*)
1273
1462
{
1274
1463
  char name_buff[FN_REFLEN];
1275
 
  int create_file;
 
1464
  File create_file;
1276
1465
 
1277
1466
  /*
1278
1467
    check columns
1279
1468
  */
1280
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
 
       iter != fields.end();
1283
 
       iter++)
 
1469
  for (Field **field= table_arg->s->field; *field; field++)
1284
1470
  {
1285
 
    if (not *iter) // Historical legacy for NULL array end.
1286
 
      continue;
1287
 
 
1288
 
    if ((*iter)->real_maybe_null())
 
1471
    if ((*field)->real_maybe_null())
1289
1472
    {
1290
1473
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1474
      return(HA_ERR_UNSUPPORTED);
1293
1476
  }
1294
1477
 
1295
1478
 
1296
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1479
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
 
1480
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1481
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1299
1482
    return(-1);
1300
1483
 
1301
1484
  write_meta_file(create_file, 0, false);
1302
 
  internal::my_close(create_file, MYF(0));
 
1485
  my_close(create_file, MYF(0));
1303
1486
 
1304
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1487
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
 
1488
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1489
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1307
1490
    return(-1);
1308
1491
 
1309
 
  internal::my_close(create_file, MYF(0));
1310
 
 
1311
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
1312
 
 
1313
 
  return 0;
1314
 
}
1315
 
 
1316
 
 
1317
 
DRIZZLE_DECLARE_PLUGIN
1318
 
{
1319
 
  DRIZZLE_VERSION_ID,
 
1492
  my_close(create_file, MYF(0));
 
1493
 
 
1494
  return(0);
 
1495
}
 
1496
 
 
1497
int ha_tina::check(Session* session, HA_CHECK_OPT *)
 
1498
{
 
1499
  int rc= 0;
 
1500
  unsigned char *buf;
 
1501
  const char *old_proc_info;
 
1502
  ha_rows count= share->rows_recorded;
 
1503
 
 
1504
  old_proc_info= get_session_proc_info(session);
 
1505
  set_session_proc_info(session, "Checking table");
 
1506
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1507
    return(HA_ERR_OUT_OF_MEM);
 
1508
 
 
1509
  /* position buffer to the start of the file */
 
1510
   if (init_data_file())
 
1511
     return(HA_ERR_CRASHED);
 
1512
 
 
1513
  /*
 
1514
    Local_saved_data_file_length is initialized during the lock phase.
 
1515
    Check does not use store_lock in certain cases. So, we set it
 
1516
    manually here.
 
1517
  */
 
1518
  local_saved_data_file_length= share->saved_data_file_length;
 
1519
  /* set current position to the beginning of the file */
 
1520
  current_position= next_position= 0;
 
1521
 
 
1522
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1523
 
 
1524
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1525
  while (!(rc= find_current_row(buf)))
 
1526
  {
 
1527
    session_inc_row_count(session);
 
1528
    count--;
 
1529
    current_position= next_position;
 
1530
  }
 
1531
 
 
1532
  free_root(&blobroot, MYF(0));
 
1533
 
 
1534
  free((char*)buf);
 
1535
  set_session_proc_info(session, old_proc_info);
 
1536
 
 
1537
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1538
  {
 
1539
    share->crashed= true;
 
1540
    return(HA_ADMIN_CORRUPT);
 
1541
  }
 
1542
  else
 
1543
    return(HA_ADMIN_OK);
 
1544
}
 
1545
 
 
1546
 
 
1547
drizzle_declare_plugin(csv)
 
1548
{
1320
1549
  "CSV",
1321
1550
  "1.0",
1322
1551
  "Brian Aker, MySQL AB",
1323
1552
  "CSV storage engine",
1324
1553
  PLUGIN_LICENSE_GPL,
1325
1554
  tina_init_func, /* Plugin Init */
1326
 
  NULL,                       /* depends */
 
1555
  tina_done_func, /* Plugin Deinit */
 
1556
  NULL,                       /* status variables                */
 
1557
  NULL,                       /* system variables                */
1327
1558
  NULL                        /* config options                  */
1328
1559
}
1329
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1560
drizzle_declare_plugin_end;
1330
1561