~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

Re-org'd the replication stuff into slots.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
46
#include <drizzled/field/timestamp.h>
47
47
#include <drizzled/error.h>
48
48
#include <drizzled/table.h>
49
49
#include <drizzled/session.h>
50
 
#include "drizzled/internal/my_sys.h"
51
50
 
52
51
#include "ha_tina.h"
53
52
 
54
 
#include <fcntl.h>
55
 
 
56
 
#include <algorithm>
57
 
#include <vector>
58
53
#include <string>
59
 
#include <map>
60
54
 
61
55
using namespace std;
62
 
using namespace drizzled;
 
56
 
 
57
static const string engine_name("CSV");
63
58
 
64
59
/*
65
60
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
75
70
#define CSM_EXT ".CSM"               // Meta file
76
71
 
77
72
 
78
 
static int read_meta_file(int meta_file, ha_rows *rows);
79
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
73
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
74
static int free_share(TINA_SHARE *share);
 
75
static int read_meta_file(File meta_file, ha_rows *rows);
 
76
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
77
 
 
78
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
79
extern "C" void tina_update_status(void* param);
 
80
extern "C" bool tina_check_status(void* param);
80
81
 
81
82
/* Stuff for shares */
82
83
pthread_mutex_t tina_mutex;
 
84
static HASH tina_open_tables;
83
85
 
84
86
/*****************************************************************************
85
87
 ** TINA tables
86
88
 *****************************************************************************/
87
89
 
88
90
/*
 
91
  Used for sorting chains with qsort().
 
92
*/
 
93
static int sort_set (tina_set *a, tina_set *b)
 
94
{
 
95
  /*
 
96
    We assume that intervals do not intersect. So, it is enought to compare
 
97
    any two points. Here we take start of intervals for comparison.
 
98
  */
 
99
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
100
}
 
101
 
 
102
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
103
{
 
104
  *length=share->table_name_length;
 
105
  return (unsigned char*) share->table_name;
 
106
}
 
107
 
 
108
 
 
109
/*
89
110
  If frm_error() is called in table.cc this is called to find out what file
90
 
  extensions exist for this Cursor.
 
111
  extensions exist for this handler.
91
112
*/
92
113
static const char *ha_tina_exts[] = {
93
114
  CSV_EXT,
95
116
  NULL
96
117
};
97
118
 
98
 
class Tina : public drizzled::plugin::StorageEngine
 
119
class Tina : public StorageEngine
99
120
{
100
 
  typedef std::map<string, TinaShare*> TinaMap;
101
 
  TinaMap tina_open_tables;
102
121
public:
103
122
  Tina(const string& name_arg)
104
 
   : drizzled::plugin::StorageEngine(name_arg,
105
 
                                     HTON_TEMPORARY_ONLY |
106
 
                                     HTON_NO_AUTO_INCREMENT |
107
 
                                     HTON_SKIP_STORE_LOCK),
108
 
    tina_open_tables()
109
 
  {}
110
 
  virtual ~Tina()
111
 
  {
112
 
    pthread_mutex_destroy(&tina_mutex);
113
 
  }
114
 
 
115
 
  virtual Cursor *create(Table &table)
116
 
  {
117
 
    return new ha_tina(*this, table);
 
123
   : StorageEngine(name_arg, HTON_CAN_RECREATE | HTON_TEMPORARY_ONLY | HTON_FILE_BASED) {}
 
124
  virtual handler *create(TableShare *table,
 
125
                          MEM_ROOT *mem_root)
 
126
  {
 
127
    return new (mem_root) ha_tina(this, table);
118
128
  }
119
129
 
120
130
  const char **bas_ext() const {
121
131
    return ha_tina_exts;
122
132
  }
123
133
 
124
 
  int doCreateTable(Session &,
125
 
                    Table &table_arg,
126
 
                    const drizzled::TableIdentifier &identifier,
127
 
                    drizzled::message::Table&);
128
 
 
129
 
  int doGetTableDefinition(Session& session,
130
 
                           const drizzled::TableIdentifier &identifier,
131
 
                           drizzled::message::Table &table_message);
132
 
 
133
 
  int doDropTable(Session&, const drizzled::TableIdentifier &identifier);
134
 
  TinaShare *findOpenTable(const string table_name);
135
 
  void addOpenTable(const string &table_name, TinaShare *);
136
 
  void deleteOpenTable(const string &table_name);
137
 
 
138
 
 
139
 
  uint32_t max_keys()          const { return 0; }
140
 
  uint32_t max_key_parts()     const { return 0; }
141
 
  uint32_t max_key_length()    const { return 0; }
142
 
  bool doDoesTableExist(Session& session, const drizzled::TableIdentifier &identifier);
143
 
  int doRenameTable(Session&, const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to);
144
 
 
145
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
146
 
                             const drizzled::SchemaIdentifier &schema_identifier,
147
 
                             drizzled::TableIdentifier::vector &set_of_identifiers);
 
134
  int createTableImplementation(Session *, const char *table_name,
 
135
                                Table *table_arg,
 
136
                                HA_CREATE_INFO *, drizzled::message::Table*);
 
137
 
148
138
};
149
139
 
150
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
151
 
                                 const drizzled::SchemaIdentifier&,
152
 
                                 drizzled::TableIdentifier::vector&)
153
 
{
154
 
}
155
 
 
156
 
int Tina::doRenameTable(Session &session,
157
 
                        const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to)
158
 
{
159
 
  int error= 0;
160
 
  for (const char **ext= bas_ext(); *ext ; ext++)
161
 
  {
162
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
 
    {
164
 
      if ((error=errno) != ENOENT)
165
 
        break;
166
 
      error= 0;
167
 
    }
168
 
  }
169
 
 
170
 
  session.getMessageCache().renameTableMessage(from, to);
171
 
 
172
 
  return error;
173
 
}
174
 
 
175
 
bool Tina::doDoesTableExist(Session &session, const drizzled::TableIdentifier &identifier)
176
 
{
177
 
  return session.getMessageCache().doesTableMessageExist(identifier);
178
 
}
179
 
 
180
 
 
181
 
int Tina::doDropTable(Session &session,
182
 
                      const drizzled::TableIdentifier &identifier)
183
 
{
184
 
  int error= 0;
185
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
186
 
 
187
 
  for (const char **ext= bas_ext(); *ext ; ext++)
188
 
  {
189
 
    std::string full_name= identifier.getPath();
190
 
    full_name.append(*ext);
191
 
 
192
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
193
 
    {
194
 
      if ((error= errno) != ENOENT)
195
 
        break;
196
 
    }
197
 
    else
198
 
    {
199
 
      enoent_or_zero= 0;                        // No error for ENOENT
200
 
    }
201
 
    error= enoent_or_zero;
202
 
  }
203
 
 
204
 
  session.getMessageCache().removeTableMessage(identifier);
205
 
 
206
 
  return error;
207
 
}
208
 
 
209
 
TinaShare *Tina::findOpenTable(const string table_name)
210
 
{
211
 
  TinaMap::iterator find_iter=
212
 
    tina_open_tables.find(table_name);
213
 
 
214
 
  if (find_iter != tina_open_tables.end())
215
 
    return (*find_iter).second;
216
 
  else
217
 
    return NULL;
218
 
}
219
 
 
220
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
 
{
222
 
  tina_open_tables[table_name]= share;
223
 
}
224
 
 
225
 
void Tina::deleteOpenTable(const string &table_name)
226
 
{
227
 
  tina_open_tables.erase(table_name);
228
 
}
229
 
 
230
 
 
231
 
int Tina::doGetTableDefinition(Session &session,
232
 
                               const drizzled::TableIdentifier &identifier,
233
 
                               drizzled::message::Table &table_message)
234
 
{
235
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
236
 
    return EEXIST;
237
 
 
238
 
  return ENOENT;
239
 
}
240
 
 
241
 
 
242
140
static Tina *tina_engine= NULL;
243
141
 
244
 
static int tina_init_func(drizzled::module::Context &context)
 
142
static int tina_init_func(drizzled::plugin::Registry &registry)
245
143
{
246
144
 
247
 
  tina_engine= new Tina("CSV");
248
 
  context.add(tina_engine);
 
145
  tina_engine= new Tina(engine_name);
 
146
  registry.add(tina_engine);
249
147
 
250
148
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
251
 
  return 0;
252
 
}
253
 
 
254
 
 
255
 
 
256
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
257
 
  table_name(table_name_arg),
258
 
  data_file_name(table_name_arg),
259
 
  use_count(0),
260
 
  saved_data_file_length(0),
261
 
  update_file_opened(false),
262
 
  tina_write_opened(false),
263
 
  crashed(false),
264
 
  rows_recorded(0),
265
 
  data_file_version(0)
266
 
{
267
 
  data_file_name.append(CSV_EXT);
268
 
}
269
 
 
270
 
TinaShare::~TinaShare()
271
 
{
272
 
  pthread_mutex_destroy(&mutex);
273
 
}
 
149
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
150
                   (hash_get_key) tina_get_key,0,0);
 
151
  return 0;
 
152
}
 
153
 
 
154
static int tina_done_func(drizzled::plugin::Registry &registry)
 
155
{
 
156
  registry.remove(tina_engine);
 
157
  delete tina_engine;
 
158
 
 
159
  hash_free(&tina_open_tables);
 
160
  pthread_mutex_destroy(&tina_mutex);
 
161
 
 
162
  return 0;
 
163
}
 
164
 
274
165
 
275
166
/*
276
167
  Simple lock controls.
277
168
*/
278
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
169
static TINA_SHARE *get_share(const char *table_name, Table *)
279
170
{
280
 
  pthread_mutex_lock(&tina_mutex);
281
 
 
282
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
283
 
  share= a_tina->findOpenTable(table_name);
284
 
 
285
 
  std::string meta_file_name;
 
171
  TINA_SHARE *share;
 
172
  char meta_file_name[FN_REFLEN];
286
173
  struct stat file_stat;
 
174
  char *tmp_name;
 
175
  uint32_t length;
 
176
 
 
177
  pthread_mutex_lock(&tina_mutex);
 
178
  length=(uint) strlen(table_name);
287
179
 
288
180
  /*
289
181
    If share is not present in the hash, create a new share and
290
182
    initialize its members.
291
183
  */
292
 
  if (! share)
 
184
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
185
                                        (unsigned char*) table_name,
 
186
                                       length)))
293
187
  {
294
 
    share= new TinaShare(table_name);
295
 
 
296
 
    if (share == NULL)
297
 
    {
298
 
      pthread_mutex_unlock(&tina_mutex);
299
 
      return NULL;
300
 
    }
301
 
 
302
 
    meta_file_name.assign(table_name);
303
 
    meta_file_name.append(CSM_EXT);
304
 
 
305
 
    if (stat(share->data_file_name.c_str(), &file_stat))
306
 
    {
307
 
      pthread_mutex_unlock(&tina_mutex);
308
 
      delete share;
309
 
      return NULL;
310
 
    }
311
 
  
 
188
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
189
                         &share, sizeof(*share),
 
190
                         &tmp_name, length+1,
 
191
                         NULL))
 
192
    {
 
193
      pthread_mutex_unlock(&tina_mutex);
 
194
      return NULL;
 
195
    }
 
196
 
 
197
    share->use_count= 0;
 
198
    share->table_name_length= length;
 
199
    share->table_name= tmp_name;
 
200
    share->crashed= false;
 
201
    share->rows_recorded= 0;
 
202
    share->update_file_opened= false;
 
203
    share->tina_write_opened= false;
 
204
    share->data_file_version= 0;
 
205
    strcpy(share->table_name, table_name);
 
206
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
207
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
208
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
209
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
210
 
 
211
    if (stat(share->data_file_name, &file_stat))
 
212
      goto error;
312
213
    share->saved_data_file_length= file_stat.st_size;
313
214
 
314
 
    a_tina->addOpenTable(share->table_name, share);
315
 
 
 
215
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
216
      goto error;
 
217
    thr_lock_init(&share->lock);
316
218
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
317
219
 
318
220
    /*
321
223
      Usually this will result in auto-repair, and we will get a good
322
224
      meta-file in the end.
323
225
    */
324
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
325
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
226
    if ((share->meta_file= my_open(meta_file_name,
 
227
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
326
228
      share->crashed= true;
327
229
 
328
230
    /*
336
238
  pthread_mutex_unlock(&tina_mutex);
337
239
 
338
240
  return share;
 
241
 
 
242
error:
 
243
  pthread_mutex_unlock(&tina_mutex);
 
244
  free((unsigned char*) share);
 
245
 
 
246
  return NULL;
339
247
}
340
248
 
341
249
 
358
266
    non-zero - error occurred
359
267
*/
360
268
 
361
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
269
static int read_meta_file(File meta_file, ha_rows *rows)
362
270
{
363
271
  unsigned char meta_buffer[META_BUFFER_SIZE];
364
272
  unsigned char *ptr= meta_buffer;
365
273
 
366
274
  lseek(meta_file, 0, SEEK_SET);
367
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
275
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
368
276
      != META_BUFFER_SIZE)
369
277
    return(HA_ERR_CRASHED_ON_USAGE);
370
278
 
386
294
      ((bool)(*ptr)== true))
387
295
    return(HA_ERR_CRASHED_ON_USAGE);
388
296
 
389
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
297
  my_sync(meta_file, MYF(MY_WME));
390
298
 
391
299
  return(0);
392
300
}
411
319
    non-zero - error occurred
412
320
*/
413
321
 
414
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
322
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
415
323
{
416
324
  unsigned char meta_buffer[META_BUFFER_SIZE];
417
325
  unsigned char *ptr= meta_buffer;
431
339
  *ptr= (unsigned char)dirty;
432
340
 
433
341
  lseek(meta_file, 0, SEEK_SET);
434
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
342
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
435
343
      != META_BUFFER_SIZE)
436
344
    return(-1);
437
345
 
438
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
346
  my_sync(meta_file, MYF(MY_WME));
439
347
 
440
348
  return(0);
441
349
}
442
350
 
 
351
bool ha_tina::check_and_repair(Session *session)
 
352
{
 
353
  HA_CHECK_OPT check_opt;
 
354
 
 
355
  check_opt.init();
 
356
 
 
357
  return(repair(session, &check_opt));
 
358
}
 
359
 
 
360
 
443
361
int ha_tina::init_tina_writer()
444
362
{
445
363
  /*
450
368
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
451
369
 
452
370
  if ((share->tina_write_filedes=
453
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
371
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
454
372
  {
455
373
    share->crashed= true;
456
374
    return(1);
461
379
}
462
380
 
463
381
 
 
382
bool ha_tina::is_crashed() const
 
383
{
 
384
  return(share->crashed);
 
385
}
 
386
 
464
387
/*
465
388
  Free lock controls.
466
389
*/
467
 
int ha_tina::free_share()
 
390
static int free_share(TINA_SHARE *share)
468
391
{
469
392
  pthread_mutex_lock(&tina_mutex);
470
393
  int result_code= 0;
472
395
    /* Write the meta file. Mark it as crashed if needed. */
473
396
    (void)write_meta_file(share->meta_file, share->rows_recorded,
474
397
                          share->crashed ? true :false);
475
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
398
    if (my_close(share->meta_file, MYF(0)))
476
399
      result_code= 1;
477
400
    if (share->tina_write_opened)
478
401
    {
479
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
402
      if (my_close(share->tina_write_filedes, MYF(0)))
480
403
        result_code= 1;
481
404
      share->tina_write_opened= false;
482
405
    }
483
406
 
484
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
485
 
    a_tina->deleteOpenTable(share->table_name);
486
 
    delete share;
 
407
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
408
    thr_lock_delete(&share->lock);
 
409
    pthread_mutex_destroy(&share->mutex);
 
410
    free((unsigned char*) share);
487
411
  }
488
412
  pthread_mutex_unlock(&tina_mutex);
489
413
 
530
454
 
531
455
 
532
456
 
533
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
534
 
  :Cursor(engine_arg, table_arg),
 
457
ha_tina::ha_tina(StorageEngine *engine_arg, TableShare *table_arg)
 
458
  :handler(engine_arg, table_arg),
535
459
  /*
536
 
    These definitions are found in Cursor.h
 
460
    These definitions are found in handler.h
537
461
    They are not probably completely right.
538
462
  */
539
463
  current_position(0), next_position(0), local_saved_data_file_length(0),
540
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
464
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
465
  local_data_file_version(0), records_is_known(0)
541
466
{
542
467
  /* Set our original buffers from pre-allocated memory */
543
468
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
469
  chain= chain_buffer;
544
470
  file_buff= new Transparent_file();
545
471
}
546
472
 
557
483
 
558
484
  buffer.length(0);
559
485
 
560
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
486
  for (Field **field=table->field ; *field ; field++)
561
487
  {
562
488
    const char *ptr;
563
489
    const char *end_ptr;
597
523
        {
598
524
          buffer.append('\\');
599
525
          buffer.append('"');
600
 
          (void) *ptr++;
 
526
          *ptr++;
601
527
        }
602
528
        else if (*ptr == '\r')
603
529
        {
604
530
          buffer.append('\\');
605
531
          buffer.append('r');
606
 
          (void) *ptr++;
 
532
          *ptr++;
607
533
        }
608
534
        else if (*ptr == '\\')
609
535
        {
610
536
          buffer.append('\\');
611
537
          buffer.append('\\');
612
 
          (void) *ptr++;
 
538
          *ptr++;
613
539
        }
614
540
        else if (*ptr == '\n')
615
541
        {
616
542
          buffer.append('\\');
617
543
          buffer.append('n');
618
 
          (void) *ptr++;
 
544
          *ptr++;
619
545
        }
620
546
        else
621
547
          buffer.append(*ptr++);
645
571
*/
646
572
int ha_tina::chain_append()
647
573
{
648
 
  if (chain.size() > 0 && chain.back().second == current_position)
649
 
    chain.back().second= next_position;
 
574
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
575
    (chain_ptr -1)->end= next_position;
650
576
  else
651
 
    chain.push_back(make_pair(current_position, next_position));
 
577
  {
 
578
    /* We set up for the next position */
 
579
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
580
    {
 
581
      off_t location= chain_ptr - chain;
 
582
      chain_size += DEFAULT_CHAIN_LENGTH;
 
583
      if (chain_alloced)
 
584
      {
 
585
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
586
          return -1;
 
587
      }
 
588
      else
 
589
      {
 
590
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
591
        if (ptr == NULL)
 
592
          return -1;
 
593
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
594
        chain= ptr;
 
595
        chain_alloced++;
 
596
      }
 
597
      chain_ptr= chain + location;
 
598
    }
 
599
    chain_ptr->begin= current_position;
 
600
    chain_ptr->end= next_position;
 
601
    chain_ptr++;
 
602
  }
 
603
 
652
604
  return 0;
653
605
}
654
606
 
662
614
  int eoln_len;
663
615
  int error;
664
616
 
665
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
617
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
666
618
 
667
619
  /*
668
620
    We do not read further then local_saved_data_file_length in order
675
627
 
676
628
  error= HA_ERR_CRASHED_ON_USAGE;
677
629
 
678
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
630
  memset(buf, 0, table->s->null_bytes);
679
631
 
680
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
632
  for (Field **field=table->field ; *field ; field++)
681
633
  {
682
634
    char curr_char;
683
635
 
746
698
    {
747
699
      /* This masks a bug in the logic for a SELECT * */
748
700
      (*field)->setWriteSet();
749
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
750
 
      {
 
701
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
702
                          CHECK_FIELD_WARN))
751
703
        goto err;
752
 
      }
753
704
 
754
705
      if ((*field)->flags & BLOB_FLAG)
755
706
      {
762
713
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
763
714
        if (src)
764
715
        {
765
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
716
          tgt= (unsigned char*) alloc_root(&blobroot, length);
766
717
          memmove(tgt, src, length);
767
718
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
768
719
        }
778
729
}
779
730
 
780
731
/*
 
732
  Three functions below are needed to enable concurrent insert functionality
 
733
  for CSV engine. For more details see mysys/thr_lock.c
 
734
*/
 
735
 
 
736
void tina_get_status(void* param, int)
 
737
{
 
738
  ha_tina *tina= (ha_tina*) param;
 
739
  tina->get_status();
 
740
}
 
741
 
 
742
void tina_update_status(void* param)
 
743
{
 
744
  ha_tina *tina= (ha_tina*) param;
 
745
  tina->update_status();
 
746
}
 
747
 
 
748
/* this should exist and return 0 for concurrent insert to work */
 
749
bool tina_check_status(void *)
 
750
{
 
751
  return 0;
 
752
}
 
753
 
 
754
/*
 
755
  Save the state of the table
 
756
 
 
757
  SYNOPSIS
 
758
    get_status()
 
759
 
 
760
  DESCRIPTION
 
761
    This function is used to retrieve the file length. During the lock
 
762
    phase of concurrent insert. For more details see comment to
 
763
    ha_tina::update_status below.
 
764
*/
 
765
 
 
766
void ha_tina::get_status()
 
767
{
 
768
  local_saved_data_file_length= share->saved_data_file_length;
 
769
}
 
770
 
 
771
 
 
772
/*
 
773
  Correct the state of the table. Called by unlock routines
 
774
  before the write lock is released.
 
775
 
 
776
  SYNOPSIS
 
777
    update_status()
 
778
 
 
779
  DESCRIPTION
 
780
    When we employ concurrent insert lock, we save current length of the file
 
781
    during the lock phase. We do not read further saved value, as we don't
 
782
    want to interfere with undergoing concurrent insert. Writers update file
 
783
    length info during unlock with update_status().
 
784
 
 
785
  NOTE
 
786
    For log tables concurrent insert works different. The reason is that
 
787
    log tables are always opened and locked. And as they do not unlock
 
788
    tables, the file length after writes should be updated in a different
 
789
    way.
 
790
*/
 
791
 
 
792
void ha_tina::update_status()
 
793
{
 
794
  /* correct local_saved_data_file_length for writers */
 
795
  share->saved_data_file_length= local_saved_data_file_length;
 
796
}
 
797
 
 
798
 
 
799
/*
781
800
  Open a database file. Keep in mind that tables are caches, so
782
801
  this will not be called for every request. Any sort of positions
783
802
  that need to be reset should be kept in the ::extra() call.
784
803
*/
785
 
int ha_tina::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
804
int ha_tina::open(const char *name, int, uint32_t open_options)
786
805
{
787
 
  if (not (share= get_share(identifier.getPath().c_str())))
 
806
  if (!(share= get_share(name, table)))
788
807
    return(ENOENT);
789
808
 
790
 
  if (share->crashed)
 
809
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
791
810
  {
792
 
    free_share();
 
811
    free_share(share);
793
812
    return(HA_ERR_CRASHED_ON_USAGE);
794
813
  }
795
814
 
796
815
  local_data_file_version= share->data_file_version;
797
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
816
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
798
817
    return(0);
799
818
 
800
819
  /*
801
 
    Init locking. Pass Cursor object to the locking routines,
 
820
    Init locking. Pass handler object to the locking routines,
802
821
    so that they could save/update local_saved_data_file_length value
803
822
    during locking. This is needed to enable concurrent inserts.
804
823
  */
 
824
  thr_lock_data_init(&share->lock, &lock, (void*) this);
805
825
  ref_length=sizeof(off_t);
806
826
 
 
827
  share->lock.get_status= tina_get_status;
 
828
  share->lock.update_status= tina_update_status;
 
829
  share->lock.check_status= tina_check_status;
 
830
 
807
831
  return(0);
808
832
}
809
833
 
 
834
 
810
835
/*
811
836
  Close a database file. We remove ourselves from the shared strucutre.
812
837
  If it is empty we destroy it.
814
839
int ha_tina::close(void)
815
840
{
816
841
  int rc= 0;
817
 
  rc= internal::my_close(data_file, MYF(0));
818
 
  return(free_share() || rc);
 
842
  rc= my_close(data_file, MYF(0));
 
843
  return(free_share(share) || rc);
819
844
}
820
845
 
821
846
/*
822
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
847
  This is an INSERT. At the moment this handler just seeks to the end
823
848
  of the file and appends the data. In an error case it really should
824
849
  just truncate to the original position (this is not done yet).
825
850
*/
826
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
851
int ha_tina::write_row(unsigned char * buf)
827
852
{
828
853
  int size;
829
854
 
830
855
  if (share->crashed)
831
856
      return(HA_ERR_CRASHED_ON_USAGE);
832
857
 
 
858
  ha_statistic_increment(&SSV::ha_write_count);
 
859
 
833
860
  size= encode_quote(buf);
834
861
 
835
862
  if (!share->tina_write_opened)
837
864
      return(-1);
838
865
 
839
866
   /* use pwrite, as concurrent reader could have changed the position */
840
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
867
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
841
868
               MYF(MY_WME | MY_NABP)))
842
869
    return(-1);
843
870
 
862
889
  if (!share->update_file_opened)
863
890
  {
864
891
    if ((update_temp_file=
865
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
892
           my_create(fn_format(updated_fname, share->table_name,
866
893
                               "", CSN_EXT,
867
894
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
868
895
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
877
904
  This is called for an update.
878
905
  Make sure you put in code to increment the auto increment, also
879
906
  update any timestamp data. Currently auto increment is not being
880
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
907
  fixed since autoincrements have yet to be added to this table handler.
881
908
  This will be called in a table scan right before the previous ::rnd_next()
882
909
  call.
883
910
*/
884
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
911
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
885
912
{
886
913
  int size;
887
914
  int rc= -1;
888
915
 
 
916
  ha_statistic_increment(&SSV::ha_update_count);
 
917
 
 
918
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
919
    table->timestamp_field->set_time();
 
920
 
889
921
  size= encode_quote(new_data);
890
922
 
891
923
  /*
892
924
    During update we mark each updating record as deleted
893
925
    (see the chain_append()) then write new one to the temporary data file.
894
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
926
    At the end of the sequence in the rnd_end() we append all non-marked
895
927
    records from the data file to the temporary data file then rename it.
896
928
    The temp_file_length is used to calculate new data file length.
897
929
  */
901
933
  if (open_update_temp_file_if_needed())
902
934
    goto err;
903
935
 
904
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
936
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
905
937
               MYF(MY_WME | MY_NABP)))
906
938
    goto err;
907
939
  temp_file_length+= size;
916
948
  Deletes a row. First the database will find the row, and then call this
917
949
  method. In the case of a table scan, the previous call to this will be
918
950
  the ::rnd_next() that found this row.
919
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
951
  The exception to this is an ORDER BY. This will cause the table handler
920
952
  to walk the table noting the positions of all rows that match a query.
921
953
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
922
954
  DESC, ASC).
923
955
*/
924
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
956
int ha_tina::delete_row(const unsigned char *)
925
957
{
 
958
  ha_statistic_increment(&SSV::ha_delete_count);
926
959
 
927
960
  if (chain_append())
928
961
    return(-1);
956
989
  if (local_data_file_version != share->data_file_version)
957
990
  {
958
991
    local_data_file_version= share->data_file_version;
959
 
    if (internal::my_close(data_file, MYF(0)) ||
960
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
992
    if (my_close(data_file, MYF(0)) ||
 
993
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
961
994
      return 1;
962
995
  }
963
996
  file_buff->init_buff(data_file);
969
1002
  All table scans call this first.
970
1003
  The order of a table scan is:
971
1004
 
 
1005
  ha_tina::store_lock
 
1006
  ha_tina::external_lock
972
1007
  ha_tina::info
973
1008
  ha_tina::rnd_init
974
1009
  ha_tina::extra
984
1019
  ha_tina::rnd_next
985
1020
  ha_tina::extra
986
1021
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1022
  ha_tina::external_lock
987
1023
  ha_tina::extra
988
1024
  ENUM HA_EXTRA_RESET   Reset database to after open
989
1025
 
993
1029
 
994
1030
*/
995
1031
 
996
 
int ha_tina::doStartTableScan(bool)
 
1032
int ha_tina::rnd_init(bool)
997
1033
{
998
1034
  /* set buffer to the beginning of the file */
999
1035
  if (share->crashed || init_data_file())
1002
1038
  current_position= next_position= 0;
1003
1039
  stats.records= 0;
1004
1040
  records_is_known= 0;
1005
 
  chain.clear();
 
1041
  chain_ptr= chain;
1006
1042
 
1007
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1043
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1008
1044
 
1009
1045
  return(0);
1010
1046
}
1019
1055
  reserved for null count.
1020
1056
  Basically this works as a mask for which rows are nulled (compared to just
1021
1057
  empty).
1022
 
  This table Cursor doesn't do nulls and does not know the difference between
1023
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1058
  This table handler doesn't do nulls and does not know the difference between
 
1059
  NULL and "". This is ok since this table handler is for spreadsheets and
1024
1060
  they don't know about them either :)
1025
1061
*/
1026
1062
int ha_tina::rnd_next(unsigned char *buf)
1030
1066
  if (share->crashed)
1031
1067
      return(HA_ERR_CRASHED_ON_USAGE);
1032
1068
 
1033
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1069
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1034
1070
 
1035
1071
  current_position= next_position;
1036
1072
 
1056
1092
*/
1057
1093
void ha_tina::position(const unsigned char *)
1058
1094
{
1059
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1095
  my_store_ptr(ref, ref_length, current_position);
1060
1096
  return;
1061
1097
}
1062
1098
 
1063
1099
 
1064
1100
/*
1065
1101
  Used to fetch a row from a posiion stored with ::position().
1066
 
  internal::my_get_ptr() retrieves the data for you.
 
1102
  my_get_ptr() retrieves the data for you.
1067
1103
*/
1068
1104
 
1069
1105
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1070
1106
{
1071
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1072
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1107
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1108
  current_position= (off_t)my_get_ptr(pos,ref_length);
1073
1109
  return(find_current_row(buf));
1074
1110
}
1075
1111
 
1076
1112
/*
1077
1113
  ::info() is used to return information to the optimizer.
1078
 
  Currently this table Cursor doesn't implement most of the fields
 
1114
  Currently this table handler doesn't implement most of the fields
1079
1115
  really needed. SHOW also makes use of this data
1080
1116
*/
1081
1117
int ha_tina::info(uint32_t)
1091
1127
  to the given "hole", stored in the buffer. "Valid" here means,
1092
1128
  not listed in the chain of deleted records ("holes").
1093
1129
*/
1094
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1130
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1095
1131
{
1096
 
  if (closest_hole == chain.end()) /* no more chains */
 
1132
  if (closest_hole == chain_ptr) /* no more chains */
1097
1133
    *end_pos= file_buff->end();
1098
1134
  else
1099
1135
    *end_pos= std::min(file_buff->end(),
1100
 
                       closest_hole->first);
1101
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1136
                       closest_hole->begin);
 
1137
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1102
1138
}
1103
1139
 
1104
1140
 
1108
1144
  slots to clean up all of the dead space we have collected while
1109
1145
  performing deletes/updates.
1110
1146
*/
1111
 
int ha_tina::doEndTableScan()
 
1147
int ha_tina::rnd_end()
1112
1148
{
 
1149
  char updated_fname[FN_REFLEN];
1113
1150
  off_t file_buffer_start= 0;
1114
1151
 
1115
 
  blobroot.free_root(MYF(0));
 
1152
  free_root(&blobroot, MYF(0));
1116
1153
  records_is_known= 1;
1117
1154
 
1118
 
  if (chain.size() > 0)
 
1155
  if ((chain_ptr - chain)  > 0)
1119
1156
  {
1120
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1157
    tina_set *ptr= chain;
1121
1158
 
1122
1159
    /*
1123
1160
      Re-read the beginning of a file (as the buffer should point to the
1129
1166
      The sort is needed when there were updates/deletes with random orders.
1130
1167
      It sorts so that we move the firts blocks to the beginning.
1131
1168
    */
1132
 
    sort(chain.begin(), chain.end());
 
1169
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1170
             (qsort_cmp)sort_set);
1133
1171
 
1134
1172
    off_t write_begin= 0, write_end;
1135
1173
 
1150
1188
      /* if there is something to write, write it */
1151
1189
      if (write_length)
1152
1190
      {
1153
 
        if (internal::my_write(update_temp_file,
 
1191
        if (my_write(update_temp_file,
1154
1192
                     (unsigned char*) (file_buff->ptr() +
1155
1193
                               (write_begin - file_buff->start())),
1156
1194
                     (size_t)write_length, MYF_RW))
1160
1198
      if (in_hole)
1161
1199
      {
1162
1200
        /* skip hole */
1163
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1201
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1164
1202
          file_buffer_start= file_buff->read_next();
1165
 
        write_begin= ptr->second;
1166
 
        ++ptr;
 
1203
        write_begin= ptr->end;
 
1204
        ptr++;
1167
1205
      }
1168
1206
      else
1169
1207
        write_begin= write_end;
1173
1211
 
1174
1212
    }
1175
1213
 
1176
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1177
 
        internal::my_close(update_temp_file, MYF(0)))
 
1214
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1215
        my_close(update_temp_file, MYF(0)))
1178
1216
      return(-1);
1179
1217
 
1180
1218
    share->update_file_opened= false;
1181
1219
 
1182
1220
    if (share->tina_write_opened)
1183
1221
    {
1184
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1222
      if (my_close(share->tina_write_filedes, MYF(0)))
1185
1223
        return(-1);
1186
1224
      /*
1187
1225
        Mark that the writer fd is closed, so that init_tina_writer()
1194
1232
      Close opened fildes's. Then move updated file in place
1195
1233
      of the old datafile.
1196
1234
    */
1197
 
    std::string rename_file= share->table_name;
1198
 
    rename_file.append(CSN_EXT);
1199
 
    if (internal::my_close(data_file, MYF(0)) ||
1200
 
        internal::my_rename(rename_file.c_str(),
1201
 
                            share->data_file_name.c_str(), MYF(0)))
 
1235
    if (my_close(data_file, MYF(0)) ||
 
1236
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1237
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1238
                  share->data_file_name, MYF(0)))
1202
1239
      return(-1);
1203
1240
 
1204
1241
    /* Open the file again */
1205
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1242
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1206
1243
      return(-1);
1207
1244
    /*
1208
1245
      As we reopened the data file, increase share->data_file_version
1229
1266
 
1230
1267
  return(0);
1231
1268
error:
1232
 
  internal::my_close(update_temp_file, MYF(0));
 
1269
  my_close(update_temp_file, MYF(0));
1233
1270
  share->update_file_opened= false;
1234
1271
  return(-1);
1235
1272
}
1236
1273
 
1237
1274
 
1238
1275
/*
 
1276
  Repair CSV table in the case, it is crashed.
 
1277
 
 
1278
  SYNOPSIS
 
1279
    repair()
 
1280
    session         The thread, performing repair
 
1281
    check_opt   The options for repair. We do not use it currently.
 
1282
 
 
1283
  DESCRIPTION
 
1284
    If the file is empty, change # of rows in the file and complete recovery.
 
1285
    Otherwise, scan the table looking for bad rows. If none were found,
 
1286
    we mark file as a good one and return. If a bad row was encountered,
 
1287
    we truncate the datafile up to the last good row.
 
1288
 
 
1289
   TODO: Make repair more clever - it should try to recover subsequent
 
1290
         rows (after the first bad one) as well.
 
1291
*/
 
1292
 
 
1293
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
 
1294
{
 
1295
  char repaired_fname[FN_REFLEN];
 
1296
  unsigned char *buf;
 
1297
  File repair_file;
 
1298
  int rc;
 
1299
  ha_rows rows_repaired= 0;
 
1300
  off_t write_begin= 0, write_end;
 
1301
 
 
1302
  /* empty file */
 
1303
  if (!share->saved_data_file_length)
 
1304
  {
 
1305
    share->rows_recorded= 0;
 
1306
    goto end;
 
1307
  }
 
1308
 
 
1309
  /* Don't assert in field::val() functions */
 
1310
  table->use_all_columns();
 
1311
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1312
    return(HA_ERR_OUT_OF_MEM);
 
1313
 
 
1314
  /* position buffer to the start of the file */
 
1315
  if (init_data_file())
 
1316
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1317
 
 
1318
  /*
 
1319
    Local_saved_data_file_length is initialized during the lock phase.
 
1320
    Sometimes this is not getting executed before ::repair (e.g. for
 
1321
    the log tables). We set it manually here.
 
1322
  */
 
1323
  local_saved_data_file_length= share->saved_data_file_length;
 
1324
  /* set current position to the beginning of the file */
 
1325
  current_position= next_position= 0;
 
1326
 
 
1327
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1328
 
 
1329
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1330
  while (!(rc= find_current_row(buf)))
 
1331
  {
 
1332
    session_inc_row_count(session);
 
1333
    rows_repaired++;
 
1334
    current_position= next_position;
 
1335
  }
 
1336
 
 
1337
  free_root(&blobroot, MYF(0));
 
1338
 
 
1339
  free((char*)buf);
 
1340
 
 
1341
  if (rc == HA_ERR_END_OF_FILE)
 
1342
  {
 
1343
    /*
 
1344
      All rows were read ok until end of file, the file does not need repair.
 
1345
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1346
      to the current amount of rows.
 
1347
    */
 
1348
    share->rows_recorded= rows_repaired;
 
1349
    goto end;
 
1350
  }
 
1351
 
 
1352
  /*
 
1353
    Otherwise we've encountered a bad row => repair is needed.
 
1354
    Let us create a temporary file.
 
1355
  */
 
1356
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1357
                                        "", CSN_EXT,
 
1358
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1359
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1360
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1361
 
 
1362
  file_buff->init_buff(data_file);
 
1363
 
 
1364
 
 
1365
  /* we just truncated the file up to the first bad row. update rows count. */
 
1366
  share->rows_recorded= rows_repaired;
 
1367
 
 
1368
  /* write repaired file */
 
1369
  while (1)
 
1370
  {
 
1371
    write_end= std::min(file_buff->end(), current_position);
 
1372
 
 
1373
    off_t write_length= write_end - write_begin;
 
1374
    if ((uint64_t)write_length > SIZE_MAX)
 
1375
    {
 
1376
      return -1;
 
1377
    }
 
1378
    if ((write_length) &&
 
1379
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1380
                  (size_t)write_length, MYF_RW)))
 
1381
      return(-1);
 
1382
 
 
1383
    write_begin= write_end;
 
1384
    if (write_end== current_position)
 
1385
      break;
 
1386
    else
 
1387
      file_buff->read_next(); /* shift the buffer */
 
1388
  }
 
1389
 
 
1390
  /*
 
1391
    Close the files and rename repaired file to the datafile.
 
1392
    We have to close the files, as on Windows one cannot rename
 
1393
    a file, which descriptor is still open. EACCES will be returned
 
1394
    when trying to delete the "to"-file in my_rename().
 
1395
  */
 
1396
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1397
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1398
    return(-1);
 
1399
 
 
1400
  /* Open the file again, it should now be repaired */
 
1401
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1402
                          MYF(0))) == -1)
 
1403
     return(-1);
 
1404
 
 
1405
  /* Set new file size. The file size will be updated by ::update_status() */
 
1406
  local_saved_data_file_length= (size_t) current_position;
 
1407
 
 
1408
end:
 
1409
  share->crashed= false;
 
1410
  return(HA_ADMIN_OK);
 
1411
}
 
1412
 
 
1413
/*
1239
1414
  DELETE without WHERE calls this
1240
1415
*/
1241
1416
 
1244
1419
  int rc;
1245
1420
 
1246
1421
  if (!records_is_known)
1247
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1422
    return(my_errno=HA_ERR_WRONG_COMMAND);
1248
1423
 
1249
1424
  if (!share->tina_write_opened)
1250
1425
    if (init_tina_writer())
1263
1438
}
1264
1439
 
1265
1440
/*
 
1441
  Called by the database to lock the table. Keep in mind that this
 
1442
  is an internal lock.
 
1443
*/
 
1444
THR_LOCK_DATA **ha_tina::store_lock(Session *,
 
1445
                                    THR_LOCK_DATA **to,
 
1446
                                    enum thr_lock_type lock_type)
 
1447
{
 
1448
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1449
    lock.type=lock_type;
 
1450
  *to++= &lock;
 
1451
  return to;
 
1452
}
 
1453
 
 
1454
/*
1266
1455
  Create a table. You do not want to leave the table open after a call to
1267
1456
  this (the database will call ::open() if it needs to).
1268
1457
*/
1269
1458
 
1270
 
int Tina::doCreateTable(Session &session,
1271
 
                        Table& table_arg,
1272
 
                        const drizzled::TableIdentifier &identifier,
1273
 
                        drizzled::message::Table &create_proto)
 
1459
int Tina::createTableImplementation(Session *, const char *table_name,
 
1460
                                    Table *table_arg,
 
1461
                                    HA_CREATE_INFO *, drizzled::message::Table*)
1274
1462
{
1275
1463
  char name_buff[FN_REFLEN];
1276
 
  int create_file;
 
1464
  File create_file;
1277
1465
 
1278
1466
  /*
1279
1467
    check columns
1280
1468
  */
1281
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1282
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1283
 
       iter != fields.end();
1284
 
       iter++)
 
1469
  for (Field **field= table_arg->s->field; *field; field++)
1285
1470
  {
1286
 
    if (not *iter) // Historical legacy for NULL array end.
1287
 
      continue;
1288
 
 
1289
 
    if ((*iter)->real_maybe_null())
 
1471
    if ((*field)->real_maybe_null())
1290
1472
    {
1291
1473
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1292
1474
      return(HA_ERR_UNSUPPORTED);
1294
1476
  }
1295
1477
 
1296
1478
 
1297
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1298
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1299
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1479
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
 
1480
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1481
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1300
1482
    return(-1);
1301
1483
 
1302
1484
  write_meta_file(create_file, 0, false);
1303
 
  internal::my_close(create_file, MYF(0));
 
1485
  my_close(create_file, MYF(0));
1304
1486
 
1305
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1306
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1307
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1487
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
 
1488
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1489
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1308
1490
    return(-1);
1309
1491
 
1310
 
  internal::my_close(create_file, MYF(0));
1311
 
 
1312
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
1313
 
 
1314
 
  return 0;
1315
 
}
1316
 
 
1317
 
 
1318
 
DRIZZLE_DECLARE_PLUGIN
1319
 
{
1320
 
  DRIZZLE_VERSION_ID,
 
1492
  my_close(create_file, MYF(0));
 
1493
 
 
1494
  return(0);
 
1495
}
 
1496
 
 
1497
int ha_tina::check(Session* session, HA_CHECK_OPT *)
 
1498
{
 
1499
  int rc= 0;
 
1500
  unsigned char *buf;
 
1501
  const char *old_proc_info;
 
1502
  ha_rows count= share->rows_recorded;
 
1503
 
 
1504
  old_proc_info= get_session_proc_info(session);
 
1505
  set_session_proc_info(session, "Checking table");
 
1506
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1507
    return(HA_ERR_OUT_OF_MEM);
 
1508
 
 
1509
  /* position buffer to the start of the file */
 
1510
   if (init_data_file())
 
1511
     return(HA_ERR_CRASHED);
 
1512
 
 
1513
  /*
 
1514
    Local_saved_data_file_length is initialized during the lock phase.
 
1515
    Check does not use store_lock in certain cases. So, we set it
 
1516
    manually here.
 
1517
  */
 
1518
  local_saved_data_file_length= share->saved_data_file_length;
 
1519
  /* set current position to the beginning of the file */
 
1520
  current_position= next_position= 0;
 
1521
 
 
1522
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1523
 
 
1524
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1525
  while (!(rc= find_current_row(buf)))
 
1526
  {
 
1527
    session_inc_row_count(session);
 
1528
    count--;
 
1529
    current_position= next_position;
 
1530
  }
 
1531
 
 
1532
  free_root(&blobroot, MYF(0));
 
1533
 
 
1534
  free((char*)buf);
 
1535
  set_session_proc_info(session, old_proc_info);
 
1536
 
 
1537
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1538
  {
 
1539
    share->crashed= true;
 
1540
    return(HA_ADMIN_CORRUPT);
 
1541
  }
 
1542
  else
 
1543
    return(HA_ADMIN_OK);
 
1544
}
 
1545
 
 
1546
 
 
1547
drizzle_declare_plugin(csv)
 
1548
{
1321
1549
  "CSV",
1322
1550
  "1.0",
1323
1551
  "Brian Aker, MySQL AB",
1324
1552
  "CSV storage engine",
1325
1553
  PLUGIN_LICENSE_GPL,
1326
1554
  tina_init_func, /* Plugin Init */
 
1555
  tina_done_func, /* Plugin Deinit */
 
1556
  NULL,                       /* status variables                */
1327
1557
  NULL,                       /* system variables                */
1328
1558
  NULL                        /* config options                  */
1329
1559
}
1330
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1560
drizzle_declare_plugin_end;
1331
1561