~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: Brian Aker
  • Date: 2009-01-07 09:27:07 UTC
  • Revision ID: brian@tangent.org-20090107092707-bn67qpdllfcyh3j9
Removing dead field translator code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
46
#include <drizzled/field/timestamp.h>
 
47
#include <storage/csv/ha_tina.h>
47
48
#include <drizzled/error.h>
48
49
#include <drizzled/table.h>
49
50
#include <drizzled/session.h>
50
 
#include "drizzled/internal/my_sys.h"
51
 
 
52
 
#include "ha_tina.h"
53
 
 
54
 
#include <fcntl.h>
55
 
 
56
 
#include <algorithm>
57
 
#include <vector>
58
 
#include <string>
59
 
#include <map>
60
 
 
61
 
using namespace std;
62
 
using namespace drizzled;
63
51
 
64
52
/*
65
53
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
75
63
#define CSM_EXT ".CSM"               // Meta file
76
64
 
77
65
 
78
 
static int read_meta_file(int meta_file, ha_rows *rows);
79
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
66
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
67
static int free_share(TINA_SHARE *share);
 
68
static int read_meta_file(File meta_file, ha_rows *rows);
 
69
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
70
 
 
71
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
72
extern "C" void tina_update_status(void* param);
 
73
extern "C" bool tina_check_status(void* param);
80
74
 
81
75
/* Stuff for shares */
82
76
pthread_mutex_t tina_mutex;
 
77
static HASH tina_open_tables;
 
78
static handler *tina_create_handler(handlerton *hton,
 
79
                                    TABLE_SHARE *table,
 
80
                                    MEM_ROOT *mem_root);
 
81
 
83
82
 
84
83
/*****************************************************************************
85
84
 ** TINA tables
86
85
 *****************************************************************************/
87
86
 
88
87
/*
89
 
  If frm_error() is called in table.cc this is called to find out what file
90
 
  extensions exist for this Cursor.
 
88
  Used for sorting chains with qsort().
91
89
*/
92
 
static const char *ha_tina_exts[] = {
93
 
  CSV_EXT,
94
 
  CSM_EXT,
95
 
  NULL
96
 
};
97
 
 
98
 
class Tina : public drizzled::plugin::StorageEngine
99
 
{
100
 
  typedef std::map<string, TinaShare*> TinaMap;
101
 
  TinaMap tina_open_tables;
102
 
public:
103
 
  Tina(const string& name_arg)
104
 
   : drizzled::plugin::StorageEngine(name_arg,
105
 
                                     HTON_TEMPORARY_ONLY |
106
 
                                     HTON_NO_AUTO_INCREMENT |
107
 
                                     HTON_SKIP_STORE_LOCK),
108
 
    tina_open_tables()
109
 
  {}
110
 
  virtual ~Tina()
111
 
  {
112
 
    pthread_mutex_destroy(&tina_mutex);
113
 
  }
114
 
 
115
 
  virtual Cursor *create(Table &table)
116
 
  {
117
 
    return new ha_tina(*this, table);
118
 
  }
119
 
 
120
 
  const char **bas_ext() const {
121
 
    return ha_tina_exts;
122
 
  }
123
 
 
124
 
  int doCreateTable(Session &,
125
 
                    Table &table_arg,
126
 
                    const drizzled::TableIdentifier &identifier,
127
 
                    drizzled::message::Table&);
128
 
 
129
 
  int doGetTableDefinition(Session& session,
130
 
                           const drizzled::TableIdentifier &identifier,
131
 
                           drizzled::message::Table &table_message);
132
 
 
133
 
  int doDropTable(Session&, const drizzled::TableIdentifier &identifier);
134
 
  TinaShare *findOpenTable(const string table_name);
135
 
  void addOpenTable(const string &table_name, TinaShare *);
136
 
  void deleteOpenTable(const string &table_name);
137
 
 
138
 
 
139
 
  uint32_t max_keys()          const { return 0; }
140
 
  uint32_t max_key_parts()     const { return 0; }
141
 
  uint32_t max_key_length()    const { return 0; }
142
 
  bool doDoesTableExist(Session& session, const drizzled::TableIdentifier &identifier);
143
 
  int doRenameTable(Session&, const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to);
144
 
 
145
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
146
 
                             const drizzled::SchemaIdentifier &schema_identifier,
147
 
                             drizzled::TableIdentifier::vector &set_of_identifiers);
148
 
};
149
 
 
150
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
151
 
                                 const drizzled::SchemaIdentifier&,
152
 
                                 drizzled::TableIdentifier::vector&)
153
 
{
154
 
}
155
 
 
156
 
int Tina::doRenameTable(Session &session,
157
 
                        const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to)
158
 
{
159
 
  int error= 0;
160
 
  for (const char **ext= bas_ext(); *ext ; ext++)
161
 
  {
162
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
 
    {
164
 
      if ((error=errno) != ENOENT)
165
 
        break;
166
 
      error= 0;
167
 
    }
168
 
  }
169
 
 
170
 
  session.getMessageCache().renameTableMessage(from, to);
171
 
 
172
 
  return error;
173
 
}
174
 
 
175
 
bool Tina::doDoesTableExist(Session &session, const drizzled::TableIdentifier &identifier)
176
 
{
177
 
  return session.getMessageCache().doesTableMessageExist(identifier);
178
 
}
179
 
 
180
 
 
181
 
int Tina::doDropTable(Session &session,
182
 
                      const drizzled::TableIdentifier &identifier)
183
 
{
184
 
  int error= 0;
185
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
186
 
 
187
 
  for (const char **ext= bas_ext(); *ext ; ext++)
188
 
  {
189
 
    std::string full_name= identifier.getPath();
190
 
    full_name.append(*ext);
191
 
 
192
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
193
 
    {
194
 
      if ((error= errno) != ENOENT)
195
 
        break;
196
 
    }
197
 
    else
198
 
    {
199
 
      enoent_or_zero= 0;                        // No error for ENOENT
200
 
    }
201
 
    error= enoent_or_zero;
202
 
  }
203
 
 
204
 
  session.getMessageCache().removeTableMessage(identifier);
205
 
 
206
 
  return error;
207
 
}
208
 
 
209
 
TinaShare *Tina::findOpenTable(const string table_name)
210
 
{
211
 
  TinaMap::iterator find_iter=
212
 
    tina_open_tables.find(table_name);
213
 
 
214
 
  if (find_iter != tina_open_tables.end())
215
 
    return (*find_iter).second;
216
 
  else
217
 
    return NULL;
218
 
}
219
 
 
220
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
 
{
222
 
  tina_open_tables[table_name]= share;
223
 
}
224
 
 
225
 
void Tina::deleteOpenTable(const string &table_name)
226
 
{
227
 
  tina_open_tables.erase(table_name);
228
 
}
229
 
 
230
 
 
231
 
int Tina::doGetTableDefinition(Session &session,
232
 
                               const drizzled::TableIdentifier &identifier,
233
 
                               drizzled::message::Table &table_message)
234
 
{
235
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
236
 
    return EEXIST;
237
 
 
238
 
  return ENOENT;
239
 
}
240
 
 
241
 
 
242
 
static Tina *tina_engine= NULL;
243
 
 
244
 
static int tina_init_func(drizzled::module::Context &context)
245
 
{
246
 
 
247
 
  tina_engine= new Tina("CSV");
248
 
  context.add(tina_engine);
249
 
 
 
90
int sort_set (tina_set *a, tina_set *b)
 
91
{
 
92
  /*
 
93
    We assume that intervals do not intersect. So, it is enought to compare
 
94
    any two points. Here we take start of intervals for comparison.
 
95
  */
 
96
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
97
}
 
98
 
 
99
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
100
{
 
101
  *length=share->table_name_length;
 
102
  return (unsigned char*) share->table_name;
 
103
}
 
104
 
 
105
static int tina_init_func(void *p)
 
106
{
 
107
  handlerton *tina_hton;
 
108
 
 
109
  tina_hton= (handlerton *)p;
250
110
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
251
 
  return 0;
252
 
}
253
 
 
254
 
 
255
 
 
256
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
257
 
  table_name(table_name_arg),
258
 
  data_file_name(table_name_arg),
259
 
  use_count(0),
260
 
  saved_data_file_length(0),
261
 
  update_file_opened(false),
262
 
  tina_write_opened(false),
263
 
  crashed(false),
264
 
  rows_recorded(0),
265
 
  data_file_version(0)
266
 
{
267
 
  data_file_name.append(CSV_EXT);
268
 
}
269
 
 
270
 
TinaShare::~TinaShare()
271
 
{
272
 
  pthread_mutex_destroy(&mutex);
273
 
}
 
111
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
112
                   (hash_get_key) tina_get_key,0,0);
 
113
  tina_hton->state= SHOW_OPTION_YES;
 
114
  tina_hton->create= tina_create_handler;
 
115
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
 
116
                     HTON_NO_PARTITION);
 
117
  return 0;
 
118
}
 
119
 
 
120
static int tina_done_func(void *)
 
121
{
 
122
  hash_free(&tina_open_tables);
 
123
  pthread_mutex_destroy(&tina_mutex);
 
124
 
 
125
  return 0;
 
126
}
 
127
 
274
128
 
275
129
/*
276
130
  Simple lock controls.
277
131
*/
278
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
132
static TINA_SHARE *get_share(const char *table_name, Table *)
279
133
{
280
 
  pthread_mutex_lock(&tina_mutex);
281
 
 
282
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
283
 
  share= a_tina->findOpenTable(table_name);
284
 
 
285
 
  std::string meta_file_name;
 
134
  TINA_SHARE *share;
 
135
  char meta_file_name[FN_REFLEN];
286
136
  struct stat file_stat;
 
137
  char *tmp_name;
 
138
  uint32_t length;
 
139
 
 
140
  pthread_mutex_lock(&tina_mutex);
 
141
  length=(uint) strlen(table_name);
287
142
 
288
143
  /*
289
144
    If share is not present in the hash, create a new share and
290
145
    initialize its members.
291
146
  */
292
 
  if (! share)
 
147
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
148
                                        (unsigned char*) table_name,
 
149
                                       length)))
293
150
  {
294
 
    share= new TinaShare(table_name);
295
 
 
296
 
    if (share == NULL)
297
 
    {
298
 
      pthread_mutex_unlock(&tina_mutex);
299
 
      return NULL;
300
 
    }
301
 
 
302
 
    meta_file_name.assign(table_name);
303
 
    meta_file_name.append(CSM_EXT);
304
 
 
305
 
    if (stat(share->data_file_name.c_str(), &file_stat))
306
 
    {
307
 
      pthread_mutex_unlock(&tina_mutex);
308
 
      delete share;
309
 
      return NULL;
310
 
    }
311
 
  
 
151
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
152
                         &share, sizeof(*share),
 
153
                         &tmp_name, length+1,
 
154
                         NULL))
 
155
    {
 
156
      pthread_mutex_unlock(&tina_mutex);
 
157
      return NULL;
 
158
    }
 
159
 
 
160
    share->use_count= 0;
 
161
    share->table_name_length= length;
 
162
    share->table_name= tmp_name;
 
163
    share->crashed= false;
 
164
    share->rows_recorded= 0;
 
165
    share->update_file_opened= false;
 
166
    share->tina_write_opened= false;
 
167
    share->data_file_version= 0;
 
168
    strcpy(share->table_name, table_name);
 
169
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
170
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
171
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
172
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
173
 
 
174
    if (stat(share->data_file_name, &file_stat))
 
175
      goto error;
312
176
    share->saved_data_file_length= file_stat.st_size;
313
177
 
314
 
    a_tina->addOpenTable(share->table_name, share);
315
 
 
 
178
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
179
      goto error;
 
180
    thr_lock_init(&share->lock);
316
181
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
317
182
 
318
183
    /*
321
186
      Usually this will result in auto-repair, and we will get a good
322
187
      meta-file in the end.
323
188
    */
324
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
325
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
189
    if ((share->meta_file= my_open(meta_file_name,
 
190
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
326
191
      share->crashed= true;
327
192
 
328
193
    /*
336
201
  pthread_mutex_unlock(&tina_mutex);
337
202
 
338
203
  return share;
 
204
 
 
205
error:
 
206
  pthread_mutex_unlock(&tina_mutex);
 
207
  free((unsigned char*) share);
 
208
 
 
209
  return NULL;
339
210
}
340
211
 
341
212
 
358
229
    non-zero - error occurred
359
230
*/
360
231
 
361
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
232
static int read_meta_file(File meta_file, ha_rows *rows)
362
233
{
363
234
  unsigned char meta_buffer[META_BUFFER_SIZE];
364
235
  unsigned char *ptr= meta_buffer;
365
236
 
366
237
  lseek(meta_file, 0, SEEK_SET);
367
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
238
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
368
239
      != META_BUFFER_SIZE)
369
240
    return(HA_ERR_CRASHED_ON_USAGE);
370
241
 
386
257
      ((bool)(*ptr)== true))
387
258
    return(HA_ERR_CRASHED_ON_USAGE);
388
259
 
389
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
260
  my_sync(meta_file, MYF(MY_WME));
390
261
 
391
262
  return(0);
392
263
}
411
282
    non-zero - error occurred
412
283
*/
413
284
 
414
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
285
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
415
286
{
416
287
  unsigned char meta_buffer[META_BUFFER_SIZE];
417
288
  unsigned char *ptr= meta_buffer;
431
302
  *ptr= (unsigned char)dirty;
432
303
 
433
304
  lseek(meta_file, 0, SEEK_SET);
434
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
305
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
435
306
      != META_BUFFER_SIZE)
436
307
    return(-1);
437
308
 
438
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
309
  my_sync(meta_file, MYF(MY_WME));
439
310
 
440
311
  return(0);
441
312
}
442
313
 
 
314
bool ha_tina::check_and_repair(Session *session)
 
315
{
 
316
  HA_CHECK_OPT check_opt;
 
317
 
 
318
  check_opt.init();
 
319
 
 
320
  return(repair(session, &check_opt));
 
321
}
 
322
 
 
323
 
443
324
int ha_tina::init_tina_writer()
444
325
{
445
326
  /*
450
331
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
451
332
 
452
333
  if ((share->tina_write_filedes=
453
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
334
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
454
335
  {
455
336
    share->crashed= true;
456
337
    return(1);
461
342
}
462
343
 
463
344
 
 
345
bool ha_tina::is_crashed() const
 
346
{
 
347
  return(share->crashed);
 
348
}
 
349
 
464
350
/*
465
351
  Free lock controls.
466
352
*/
467
 
int ha_tina::free_share()
 
353
static int free_share(TINA_SHARE *share)
468
354
{
469
355
  pthread_mutex_lock(&tina_mutex);
470
356
  int result_code= 0;
472
358
    /* Write the meta file. Mark it as crashed if needed. */
473
359
    (void)write_meta_file(share->meta_file, share->rows_recorded,
474
360
                          share->crashed ? true :false);
475
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
361
    if (my_close(share->meta_file, MYF(0)))
476
362
      result_code= 1;
477
363
    if (share->tina_write_opened)
478
364
    {
479
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
365
      if (my_close(share->tina_write_filedes, MYF(0)))
480
366
        result_code= 1;
481
367
      share->tina_write_opened= false;
482
368
    }
483
369
 
484
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
485
 
    a_tina->deleteOpenTable(share->table_name);
486
 
    delete share;
 
370
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
371
    thr_lock_delete(&share->lock);
 
372
    pthread_mutex_destroy(&share->mutex);
 
373
    free((unsigned char*) share);
487
374
  }
488
375
  pthread_mutex_unlock(&tina_mutex);
489
376
 
501
388
  '\r''\n' --  DOS\Windows line ending
502
389
*/
503
390
 
504
 
static off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
505
 
                            off_t end, int *eoln_len)
 
391
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
 
392
                     off_t end, int *eoln_len)
506
393
{
507
394
  *eoln_len= 0;
508
395
 
529
416
}
530
417
 
531
418
 
532
 
 
533
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
534
 
  :Cursor(engine_arg, table_arg),
 
419
static handler *tina_create_handler(handlerton *hton,
 
420
                                    TABLE_SHARE *table,
 
421
                                    MEM_ROOT *mem_root)
 
422
{
 
423
  return new (mem_root) ha_tina(hton, table);
 
424
}
 
425
 
 
426
 
 
427
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
 
428
  :handler(hton, table_arg),
535
429
  /*
536
 
    These definitions are found in Cursor.h
 
430
    These definitions are found in handler.h
537
431
    They are not probably completely right.
538
432
  */
539
433
  current_position(0), next_position(0), local_saved_data_file_length(0),
540
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
434
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
435
  local_data_file_version(0), records_is_known(0)
541
436
{
542
437
  /* Set our original buffers from pre-allocated memory */
543
438
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
439
  chain= chain_buffer;
544
440
  file_buff= new Transparent_file();
545
441
}
546
442
 
557
453
 
558
454
  buffer.length(0);
559
455
 
560
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
456
  for (Field **field=table->field ; *field ; field++)
561
457
  {
562
458
    const char *ptr;
563
459
    const char *end_ptr;
573
469
      (*field)->set_notnull();
574
470
    }
575
471
 
576
 
    /* 
577
 
      Since we are needing to "translate" the type into a string we 
578
 
      will need to do a val_str(). This would cause an assert() to
579
 
      normally occur since we are onlying "writing" values.
580
 
    */
581
 
    (*field)->setReadSet();
582
472
    (*field)->val_str(&attribute,&attribute);
583
473
 
584
474
    if (was_null)
597
487
        {
598
488
          buffer.append('\\');
599
489
          buffer.append('"');
600
 
          (void) *ptr++;
 
490
          *ptr++;
601
491
        }
602
492
        else if (*ptr == '\r')
603
493
        {
604
494
          buffer.append('\\');
605
495
          buffer.append('r');
606
 
          (void) *ptr++;
 
496
          *ptr++;
607
497
        }
608
498
        else if (*ptr == '\\')
609
499
        {
610
500
          buffer.append('\\');
611
501
          buffer.append('\\');
612
 
          (void) *ptr++;
 
502
          *ptr++;
613
503
        }
614
504
        else if (*ptr == '\n')
615
505
        {
616
506
          buffer.append('\\');
617
507
          buffer.append('n');
618
 
          (void) *ptr++;
 
508
          *ptr++;
619
509
        }
620
510
        else
621
511
          buffer.append(*ptr++);
645
535
*/
646
536
int ha_tina::chain_append()
647
537
{
648
 
  if (chain.size() > 0 && chain.back().second == current_position)
649
 
    chain.back().second= next_position;
 
538
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
539
    (chain_ptr -1)->end= next_position;
650
540
  else
651
 
    chain.push_back(make_pair(current_position, next_position));
 
541
  {
 
542
    /* We set up for the next position */
 
543
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
544
    {
 
545
      off_t location= chain_ptr - chain;
 
546
      chain_size += DEFAULT_CHAIN_LENGTH;
 
547
      if (chain_alloced)
 
548
      {
 
549
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
550
          return -1;
 
551
      }
 
552
      else
 
553
      {
 
554
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
555
        if (ptr == NULL)
 
556
          return -1;
 
557
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
558
        chain= ptr;
 
559
        chain_alloced++;
 
560
      }
 
561
      chain_ptr= chain + location;
 
562
    }
 
563
    chain_ptr->begin= current_position;
 
564
    chain_ptr->end= next_position;
 
565
    chain_ptr++;
 
566
  }
 
567
 
652
568
  return 0;
653
569
}
654
570
 
661
577
  off_t end_offset, curr_offset= current_position;
662
578
  int eoln_len;
663
579
  int error;
 
580
  bool read_all;
664
581
 
665
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
582
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
666
583
 
667
584
  /*
668
585
    We do not read further then local_saved_data_file_length in order
673
590
                       local_saved_data_file_length, &eoln_len)) == 0)
674
591
    return(HA_ERR_END_OF_FILE);
675
592
 
 
593
  /* We must read all columns in case a table is opened for update */
 
594
  read_all= !bitmap_is_clear_all(table->write_set);
676
595
  error= HA_ERR_CRASHED_ON_USAGE;
677
596
 
678
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
597
  memset(buf, 0, table->s->null_bytes);
679
598
 
680
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
599
  for (Field **field=table->field ; *field ; field++)
681
600
  {
682
601
    char curr_char;
683
602
 
742
661
      }
743
662
    }
744
663
 
745
 
    if ((*field)->isReadSet() || (*field)->isWriteSet())
 
664
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
746
665
    {
747
 
      /* This masks a bug in the logic for a SELECT * */
748
 
      (*field)->setWriteSet();
749
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
750
 
      {
 
666
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
667
                          CHECK_FIELD_WARN))
751
668
        goto err;
752
 
      }
753
 
 
754
669
      if ((*field)->flags & BLOB_FLAG)
755
670
      {
756
671
        Field_blob *blob= *(Field_blob**) field;
762
677
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
763
678
        if (src)
764
679
        {
765
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
680
          tgt= (unsigned char*) alloc_root(&blobroot, length);
766
681
          memmove(tgt, src, length);
767
682
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
768
683
        }
778
693
}
779
694
 
780
695
/*
 
696
  If frm_error() is called in table.cc this is called to find out what file
 
697
  extensions exist for this handler.
 
698
*/
 
699
static const char *ha_tina_exts[] = {
 
700
  CSV_EXT,
 
701
  CSM_EXT,
 
702
  NULL
 
703
};
 
704
 
 
705
const char **ha_tina::bas_ext() const
 
706
{
 
707
  return ha_tina_exts;
 
708
}
 
709
 
 
710
/*
 
711
  Three functions below are needed to enable concurrent insert functionality
 
712
  for CSV engine. For more details see mysys/thr_lock.c
 
713
*/
 
714
 
 
715
void tina_get_status(void* param, int)
 
716
{
 
717
  ha_tina *tina= (ha_tina*) param;
 
718
  tina->get_status();
 
719
}
 
720
 
 
721
void tina_update_status(void* param)
 
722
{
 
723
  ha_tina *tina= (ha_tina*) param;
 
724
  tina->update_status();
 
725
}
 
726
 
 
727
/* this should exist and return 0 for concurrent insert to work */
 
728
bool tina_check_status(void *)
 
729
{
 
730
  return 0;
 
731
}
 
732
 
 
733
/*
 
734
  Save the state of the table
 
735
 
 
736
  SYNOPSIS
 
737
    get_status()
 
738
 
 
739
  DESCRIPTION
 
740
    This function is used to retrieve the file length. During the lock
 
741
    phase of concurrent insert. For more details see comment to
 
742
    ha_tina::update_status below.
 
743
*/
 
744
 
 
745
void ha_tina::get_status()
 
746
{
 
747
  local_saved_data_file_length= share->saved_data_file_length;
 
748
}
 
749
 
 
750
 
 
751
/*
 
752
  Correct the state of the table. Called by unlock routines
 
753
  before the write lock is released.
 
754
 
 
755
  SYNOPSIS
 
756
    update_status()
 
757
 
 
758
  DESCRIPTION
 
759
    When we employ concurrent insert lock, we save current length of the file
 
760
    during the lock phase. We do not read further saved value, as we don't
 
761
    want to interfere with undergoing concurrent insert. Writers update file
 
762
    length info during unlock with update_status().
 
763
 
 
764
  NOTE
 
765
    For log tables concurrent insert works different. The reason is that
 
766
    log tables are always opened and locked. And as they do not unlock
 
767
    tables, the file length after writes should be updated in a different
 
768
    way.
 
769
*/
 
770
 
 
771
void ha_tina::update_status()
 
772
{
 
773
  /* correct local_saved_data_file_length for writers */
 
774
  share->saved_data_file_length= local_saved_data_file_length;
 
775
}
 
776
 
 
777
 
 
778
/*
781
779
  Open a database file. Keep in mind that tables are caches, so
782
780
  this will not be called for every request. Any sort of positions
783
781
  that need to be reset should be kept in the ::extra() call.
784
782
*/
785
 
int ha_tina::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
783
int ha_tina::open(const char *name, int, uint32_t open_options)
786
784
{
787
 
  if (not (share= get_share(identifier.getPath().c_str())))
788
 
    return(ENOENT);
 
785
  if (!(share= get_share(name, table)))
 
786
    return(HA_ERR_OUT_OF_MEM);
789
787
 
790
 
  if (share->crashed)
 
788
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
791
789
  {
792
 
    free_share();
 
790
    free_share(share);
793
791
    return(HA_ERR_CRASHED_ON_USAGE);
794
792
  }
795
793
 
796
794
  local_data_file_version= share->data_file_version;
797
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
795
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
798
796
    return(0);
799
797
 
800
798
  /*
801
 
    Init locking. Pass Cursor object to the locking routines,
 
799
    Init locking. Pass handler object to the locking routines,
802
800
    so that they could save/update local_saved_data_file_length value
803
801
    during locking. This is needed to enable concurrent inserts.
804
802
  */
 
803
  thr_lock_data_init(&share->lock, &lock, (void*) this);
805
804
  ref_length=sizeof(off_t);
806
805
 
 
806
  share->lock.get_status= tina_get_status;
 
807
  share->lock.update_status= tina_update_status;
 
808
  share->lock.check_status= tina_check_status;
 
809
 
807
810
  return(0);
808
811
}
809
812
 
 
813
 
810
814
/*
811
815
  Close a database file. We remove ourselves from the shared strucutre.
812
816
  If it is empty we destroy it.
814
818
int ha_tina::close(void)
815
819
{
816
820
  int rc= 0;
817
 
  rc= internal::my_close(data_file, MYF(0));
818
 
  return(free_share() || rc);
 
821
  rc= my_close(data_file, MYF(0));
 
822
  return(free_share(share) || rc);
819
823
}
820
824
 
821
825
/*
822
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
826
  This is an INSERT. At the moment this handler just seeks to the end
823
827
  of the file and appends the data. In an error case it really should
824
828
  just truncate to the original position (this is not done yet).
825
829
*/
826
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
830
int ha_tina::write_row(unsigned char * buf)
827
831
{
828
832
  int size;
829
833
 
830
834
  if (share->crashed)
831
835
      return(HA_ERR_CRASHED_ON_USAGE);
832
836
 
 
837
  ha_statistic_increment(&SSV::ha_write_count);
 
838
 
 
839
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
840
    table->timestamp_field->set_time();
 
841
 
833
842
  size= encode_quote(buf);
834
843
 
835
844
  if (!share->tina_write_opened)
837
846
      return(-1);
838
847
 
839
848
   /* use pwrite, as concurrent reader could have changed the position */
840
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
849
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
841
850
               MYF(MY_WME | MY_NABP)))
842
851
    return(-1);
843
852
 
862
871
  if (!share->update_file_opened)
863
872
  {
864
873
    if ((update_temp_file=
865
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
874
           my_create(fn_format(updated_fname, share->table_name,
866
875
                               "", CSN_EXT,
867
876
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
868
877
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
877
886
  This is called for an update.
878
887
  Make sure you put in code to increment the auto increment, also
879
888
  update any timestamp data. Currently auto increment is not being
880
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
889
  fixed since autoincrements have yet to be added to this table handler.
881
890
  This will be called in a table scan right before the previous ::rnd_next()
882
891
  call.
883
892
*/
884
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
893
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
885
894
{
886
895
  int size;
887
896
  int rc= -1;
888
897
 
 
898
  ha_statistic_increment(&SSV::ha_update_count);
 
899
 
 
900
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
901
    table->timestamp_field->set_time();
 
902
 
889
903
  size= encode_quote(new_data);
890
904
 
891
905
  /*
892
906
    During update we mark each updating record as deleted
893
907
    (see the chain_append()) then write new one to the temporary data file.
894
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
908
    At the end of the sequence in the rnd_end() we append all non-marked
895
909
    records from the data file to the temporary data file then rename it.
896
910
    The temp_file_length is used to calculate new data file length.
897
911
  */
901
915
  if (open_update_temp_file_if_needed())
902
916
    goto err;
903
917
 
904
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
918
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
905
919
               MYF(MY_WME | MY_NABP)))
906
920
    goto err;
907
921
  temp_file_length+= size;
916
930
  Deletes a row. First the database will find the row, and then call this
917
931
  method. In the case of a table scan, the previous call to this will be
918
932
  the ::rnd_next() that found this row.
919
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
933
  The exception to this is an ORDER BY. This will cause the table handler
920
934
  to walk the table noting the positions of all rows that match a query.
921
935
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
922
936
  DESC, ASC).
923
937
*/
924
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
938
int ha_tina::delete_row(const unsigned char *)
925
939
{
 
940
  ha_statistic_increment(&SSV::ha_delete_count);
926
941
 
927
942
  if (chain_append())
928
943
    return(-1);
956
971
  if (local_data_file_version != share->data_file_version)
957
972
  {
958
973
    local_data_file_version= share->data_file_version;
959
 
    if (internal::my_close(data_file, MYF(0)) ||
960
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
974
    if (my_close(data_file, MYF(0)) ||
 
975
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
961
976
      return 1;
962
977
  }
963
978
  file_buff->init_buff(data_file);
969
984
  All table scans call this first.
970
985
  The order of a table scan is:
971
986
 
 
987
  ha_tina::store_lock
 
988
  ha_tina::external_lock
972
989
  ha_tina::info
973
990
  ha_tina::rnd_init
974
991
  ha_tina::extra
984
1001
  ha_tina::rnd_next
985
1002
  ha_tina::extra
986
1003
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1004
  ha_tina::external_lock
987
1005
  ha_tina::extra
988
1006
  ENUM HA_EXTRA_RESET   Reset database to after open
989
1007
 
993
1011
 
994
1012
*/
995
1013
 
996
 
int ha_tina::doStartTableScan(bool)
 
1014
int ha_tina::rnd_init(bool)
997
1015
{
998
1016
  /* set buffer to the beginning of the file */
999
1017
  if (share->crashed || init_data_file())
1002
1020
  current_position= next_position= 0;
1003
1021
  stats.records= 0;
1004
1022
  records_is_known= 0;
1005
 
  chain.clear();
 
1023
  chain_ptr= chain;
1006
1024
 
1007
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1025
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1008
1026
 
1009
1027
  return(0);
1010
1028
}
1019
1037
  reserved for null count.
1020
1038
  Basically this works as a mask for which rows are nulled (compared to just
1021
1039
  empty).
1022
 
  This table Cursor doesn't do nulls and does not know the difference between
1023
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1040
  This table handler doesn't do nulls and does not know the difference between
 
1041
  NULL and "". This is ok since this table handler is for spreadsheets and
1024
1042
  they don't know about them either :)
1025
1043
*/
1026
1044
int ha_tina::rnd_next(unsigned char *buf)
1030
1048
  if (share->crashed)
1031
1049
      return(HA_ERR_CRASHED_ON_USAGE);
1032
1050
 
1033
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1051
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1034
1052
 
1035
1053
  current_position= next_position;
1036
1054
 
1056
1074
*/
1057
1075
void ha_tina::position(const unsigned char *)
1058
1076
{
1059
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1077
  my_store_ptr(ref, ref_length, current_position);
1060
1078
  return;
1061
1079
}
1062
1080
 
1063
1081
 
1064
1082
/*
1065
1083
  Used to fetch a row from a posiion stored with ::position().
1066
 
  internal::my_get_ptr() retrieves the data for you.
 
1084
  my_get_ptr() retrieves the data for you.
1067
1085
*/
1068
1086
 
1069
1087
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1070
1088
{
1071
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1072
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1089
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1090
  current_position= (off_t)my_get_ptr(pos,ref_length);
1073
1091
  return(find_current_row(buf));
1074
1092
}
1075
1093
 
1076
1094
/*
1077
1095
  ::info() is used to return information to the optimizer.
1078
 
  Currently this table Cursor doesn't implement most of the fields
 
1096
  Currently this table handler doesn't implement most of the fields
1079
1097
  really needed. SHOW also makes use of this data
1080
1098
*/
1081
1099
int ha_tina::info(uint32_t)
1091
1109
  to the given "hole", stored in the buffer. "Valid" here means,
1092
1110
  not listed in the chain of deleted records ("holes").
1093
1111
*/
1094
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1112
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1095
1113
{
1096
 
  if (closest_hole == chain.end()) /* no more chains */
 
1114
  if (closest_hole == chain_ptr) /* no more chains */
1097
1115
    *end_pos= file_buff->end();
1098
1116
  else
1099
1117
    *end_pos= std::min(file_buff->end(),
1100
 
                       closest_hole->first);
1101
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1118
                       closest_hole->begin);
 
1119
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1102
1120
}
1103
1121
 
1104
1122
 
1108
1126
  slots to clean up all of the dead space we have collected while
1109
1127
  performing deletes/updates.
1110
1128
*/
1111
 
int ha_tina::doEndTableScan()
 
1129
int ha_tina::rnd_end()
1112
1130
{
 
1131
  char updated_fname[FN_REFLEN];
1113
1132
  off_t file_buffer_start= 0;
1114
1133
 
1115
 
  blobroot.free_root(MYF(0));
 
1134
  free_root(&blobroot, MYF(0));
1116
1135
  records_is_known= 1;
1117
1136
 
1118
 
  if (chain.size() > 0)
 
1137
  if ((chain_ptr - chain)  > 0)
1119
1138
  {
1120
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1139
    tina_set *ptr= chain;
1121
1140
 
1122
1141
    /*
1123
1142
      Re-read the beginning of a file (as the buffer should point to the
1129
1148
      The sort is needed when there were updates/deletes with random orders.
1130
1149
      It sorts so that we move the firts blocks to the beginning.
1131
1150
    */
1132
 
    sort(chain.begin(), chain.end());
 
1151
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1152
             (qsort_cmp)sort_set);
1133
1153
 
1134
1154
    off_t write_begin= 0, write_end;
1135
1155
 
1142
1162
    {
1143
1163
      bool in_hole= get_write_pos(&write_end, ptr);
1144
1164
      off_t write_length= write_end - write_begin;
1145
 
      if ((uint64_t)write_length > SIZE_MAX)
1146
 
      {
1147
 
        goto error;
1148
 
      }
1149
1165
 
1150
1166
      /* if there is something to write, write it */
1151
1167
      if (write_length)
1152
1168
      {
1153
 
        if (internal::my_write(update_temp_file,
 
1169
        if (my_write(update_temp_file,
1154
1170
                     (unsigned char*) (file_buff->ptr() +
1155
1171
                               (write_begin - file_buff->start())),
1156
 
                     (size_t)write_length, MYF_RW))
 
1172
                     write_length, MYF_RW))
1157
1173
          goto error;
1158
1174
        temp_file_length+= write_length;
1159
1175
      }
1160
1176
      if (in_hole)
1161
1177
      {
1162
1178
        /* skip hole */
1163
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1179
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1164
1180
          file_buffer_start= file_buff->read_next();
1165
 
        write_begin= ptr->second;
1166
 
        ++ptr;
 
1181
        write_begin= ptr->end;
 
1182
        ptr++;
1167
1183
      }
1168
1184
      else
1169
1185
        write_begin= write_end;
1173
1189
 
1174
1190
    }
1175
1191
 
1176
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1177
 
        internal::my_close(update_temp_file, MYF(0)))
 
1192
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1193
        my_close(update_temp_file, MYF(0)))
1178
1194
      return(-1);
1179
1195
 
1180
1196
    share->update_file_opened= false;
1181
1197
 
1182
1198
    if (share->tina_write_opened)
1183
1199
    {
1184
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1200
      if (my_close(share->tina_write_filedes, MYF(0)))
1185
1201
        return(-1);
1186
1202
      /*
1187
1203
        Mark that the writer fd is closed, so that init_tina_writer()
1194
1210
      Close opened fildes's. Then move updated file in place
1195
1211
      of the old datafile.
1196
1212
    */
1197
 
    std::string rename_file= share->table_name;
1198
 
    rename_file.append(CSN_EXT);
1199
 
    if (internal::my_close(data_file, MYF(0)) ||
1200
 
        internal::my_rename(rename_file.c_str(),
1201
 
                            share->data_file_name.c_str(), MYF(0)))
 
1213
    if (my_close(data_file, MYF(0)) ||
 
1214
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1215
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1216
                  share->data_file_name, MYF(0)))
1202
1217
      return(-1);
1203
1218
 
1204
1219
    /* Open the file again */
1205
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1220
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1206
1221
      return(-1);
1207
1222
    /*
1208
1223
      As we reopened the data file, increase share->data_file_version
1229
1244
 
1230
1245
  return(0);
1231
1246
error:
1232
 
  internal::my_close(update_temp_file, MYF(0));
 
1247
  my_close(update_temp_file, MYF(0));
1233
1248
  share->update_file_opened= false;
1234
1249
  return(-1);
1235
1250
}
1236
1251
 
1237
1252
 
1238
1253
/*
 
1254
  Repair CSV table in the case, it is crashed.
 
1255
 
 
1256
  SYNOPSIS
 
1257
    repair()
 
1258
    session         The thread, performing repair
 
1259
    check_opt   The options for repair. We do not use it currently.
 
1260
 
 
1261
  DESCRIPTION
 
1262
    If the file is empty, change # of rows in the file and complete recovery.
 
1263
    Otherwise, scan the table looking for bad rows. If none were found,
 
1264
    we mark file as a good one and return. If a bad row was encountered,
 
1265
    we truncate the datafile up to the last good row.
 
1266
 
 
1267
   TODO: Make repair more clever - it should try to recover subsequent
 
1268
         rows (after the first bad one) as well.
 
1269
*/
 
1270
 
 
1271
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
 
1272
{
 
1273
  char repaired_fname[FN_REFLEN];
 
1274
  unsigned char *buf;
 
1275
  File repair_file;
 
1276
  int rc;
 
1277
  ha_rows rows_repaired= 0;
 
1278
  off_t write_begin= 0, write_end;
 
1279
 
 
1280
  /* empty file */
 
1281
  if (!share->saved_data_file_length)
 
1282
  {
 
1283
    share->rows_recorded= 0;
 
1284
    goto end;
 
1285
  }
 
1286
 
 
1287
  /* Don't assert in field::val() functions */
 
1288
  table->use_all_columns();
 
1289
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1290
    return(HA_ERR_OUT_OF_MEM);
 
1291
 
 
1292
  /* position buffer to the start of the file */
 
1293
  if (init_data_file())
 
1294
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1295
 
 
1296
  /*
 
1297
    Local_saved_data_file_length is initialized during the lock phase.
 
1298
    Sometimes this is not getting executed before ::repair (e.g. for
 
1299
    the log tables). We set it manually here.
 
1300
  */
 
1301
  local_saved_data_file_length= share->saved_data_file_length;
 
1302
  /* set current position to the beginning of the file */
 
1303
  current_position= next_position= 0;
 
1304
 
 
1305
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1306
 
 
1307
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1308
  while (!(rc= find_current_row(buf)))
 
1309
  {
 
1310
    session_inc_row_count(session);
 
1311
    rows_repaired++;
 
1312
    current_position= next_position;
 
1313
  }
 
1314
 
 
1315
  free_root(&blobroot, MYF(0));
 
1316
 
 
1317
  free((char*)buf);
 
1318
 
 
1319
  if (rc == HA_ERR_END_OF_FILE)
 
1320
  {
 
1321
    /*
 
1322
      All rows were read ok until end of file, the file does not need repair.
 
1323
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1324
      to the current amount of rows.
 
1325
    */
 
1326
    share->rows_recorded= rows_repaired;
 
1327
    goto end;
 
1328
  }
 
1329
 
 
1330
  /*
 
1331
    Otherwise we've encountered a bad row => repair is needed.
 
1332
    Let us create a temporary file.
 
1333
  */
 
1334
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1335
                                        "", CSN_EXT,
 
1336
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1337
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1338
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1339
 
 
1340
  file_buff->init_buff(data_file);
 
1341
 
 
1342
 
 
1343
  /* we just truncated the file up to the first bad row. update rows count. */
 
1344
  share->rows_recorded= rows_repaired;
 
1345
 
 
1346
  /* write repaired file */
 
1347
  while (1)
 
1348
  {
 
1349
    write_end= std::min(file_buff->end(), current_position);
 
1350
    if ((write_end - write_begin) &&
 
1351
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1352
                  write_end - write_begin, MYF_RW)))
 
1353
      return(-1);
 
1354
 
 
1355
    write_begin= write_end;
 
1356
    if (write_end== current_position)
 
1357
      break;
 
1358
    else
 
1359
      file_buff->read_next(); /* shift the buffer */
 
1360
  }
 
1361
 
 
1362
  /*
 
1363
    Close the files and rename repaired file to the datafile.
 
1364
    We have to close the files, as on Windows one cannot rename
 
1365
    a file, which descriptor is still open. EACCES will be returned
 
1366
    when trying to delete the "to"-file in my_rename().
 
1367
  */
 
1368
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1369
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1370
    return(-1);
 
1371
 
 
1372
  /* Open the file again, it should now be repaired */
 
1373
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1374
                          MYF(0))) == -1)
 
1375
     return(-1);
 
1376
 
 
1377
  /* Set new file size. The file size will be updated by ::update_status() */
 
1378
  local_saved_data_file_length= (size_t) current_position;
 
1379
 
 
1380
end:
 
1381
  share->crashed= false;
 
1382
  return(HA_ADMIN_OK);
 
1383
}
 
1384
 
 
1385
/*
1239
1386
  DELETE without WHERE calls this
1240
1387
*/
1241
1388
 
1244
1391
  int rc;
1245
1392
 
1246
1393
  if (!records_is_known)
1247
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1394
    return(my_errno=HA_ERR_WRONG_COMMAND);
1248
1395
 
1249
1396
  if (!share->tina_write_opened)
1250
1397
    if (init_tina_writer())
1263
1410
}
1264
1411
 
1265
1412
/*
 
1413
  Called by the database to lock the table. Keep in mind that this
 
1414
  is an internal lock.
 
1415
*/
 
1416
THR_LOCK_DATA **ha_tina::store_lock(Session *,
 
1417
                                    THR_LOCK_DATA **to,
 
1418
                                    enum thr_lock_type lock_type)
 
1419
{
 
1420
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1421
    lock.type=lock_type;
 
1422
  *to++= &lock;
 
1423
  return to;
 
1424
}
 
1425
 
 
1426
/*
1266
1427
  Create a table. You do not want to leave the table open after a call to
1267
1428
  this (the database will call ::open() if it needs to).
1268
1429
*/
1269
1430
 
1270
 
int Tina::doCreateTable(Session &session,
1271
 
                        Table& table_arg,
1272
 
                        const drizzled::TableIdentifier &identifier,
1273
 
                        drizzled::message::Table &create_proto)
 
1431
int ha_tina::create(const char *name, Table *table_arg, HA_CREATE_INFO *)
1274
1432
{
1275
1433
  char name_buff[FN_REFLEN];
1276
 
  int create_file;
 
1434
  File create_file;
1277
1435
 
1278
1436
  /*
1279
1437
    check columns
1280
1438
  */
1281
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1282
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1283
 
       iter != fields.end();
1284
 
       iter++)
 
1439
  for (Field **field= table_arg->s->field; *field; field++)
1285
1440
  {
1286
 
    if (not *iter) // Historical legacy for NULL array end.
1287
 
      continue;
1288
 
 
1289
 
    if ((*iter)->real_maybe_null())
 
1441
    if ((*field)->real_maybe_null())
1290
1442
    {
1291
1443
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1292
1444
      return(HA_ERR_UNSUPPORTED);
1294
1446
  }
1295
1447
 
1296
1448
 
1297
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1298
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1299
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1449
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1450
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1451
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1300
1452
    return(-1);
1301
1453
 
1302
1454
  write_meta_file(create_file, 0, false);
1303
 
  internal::my_close(create_file, MYF(0));
 
1455
  my_close(create_file, MYF(0));
1304
1456
 
1305
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1306
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1307
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1457
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1458
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1459
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1308
1460
    return(-1);
1309
1461
 
1310
 
  internal::my_close(create_file, MYF(0));
1311
 
 
1312
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
1313
 
 
1314
 
  return 0;
1315
 
}
1316
 
 
1317
 
 
1318
 
DRIZZLE_DECLARE_PLUGIN
1319
 
{
1320
 
  DRIZZLE_VERSION_ID,
 
1462
  my_close(create_file, MYF(0));
 
1463
 
 
1464
  return(0);
 
1465
}
 
1466
 
 
1467
int ha_tina::check(Session* session, HA_CHECK_OPT *)
 
1468
{
 
1469
  int rc= 0;
 
1470
  unsigned char *buf;
 
1471
  const char *old_proc_info;
 
1472
  ha_rows count= share->rows_recorded;
 
1473
 
 
1474
  old_proc_info= get_session_proc_info(session);
 
1475
  set_session_proc_info(session, "Checking table");
 
1476
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1477
    return(HA_ERR_OUT_OF_MEM);
 
1478
 
 
1479
  /* position buffer to the start of the file */
 
1480
   if (init_data_file())
 
1481
     return(HA_ERR_CRASHED);
 
1482
 
 
1483
  /*
 
1484
    Local_saved_data_file_length is initialized during the lock phase.
 
1485
    Check does not use store_lock in certain cases. So, we set it
 
1486
    manually here.
 
1487
  */
 
1488
  local_saved_data_file_length= share->saved_data_file_length;
 
1489
  /* set current position to the beginning of the file */
 
1490
  current_position= next_position= 0;
 
1491
 
 
1492
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1493
 
 
1494
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1495
  while (!(rc= find_current_row(buf)))
 
1496
  {
 
1497
    session_inc_row_count(session);
 
1498
    count--;
 
1499
    current_position= next_position;
 
1500
  }
 
1501
 
 
1502
  free_root(&blobroot, MYF(0));
 
1503
 
 
1504
  free((char*)buf);
 
1505
  set_session_proc_info(session, old_proc_info);
 
1506
 
 
1507
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1508
  {
 
1509
    share->crashed= true;
 
1510
    return(HA_ADMIN_CORRUPT);
 
1511
  }
 
1512
  else
 
1513
    return(HA_ADMIN_OK);
 
1514
}
 
1515
 
 
1516
 
 
1517
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *, uint32_t)
 
1518
{
 
1519
  return COMPATIBLE_DATA_YES;
 
1520
}
 
1521
 
 
1522
mysql_declare_plugin(csv)
 
1523
{
 
1524
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
1321
1525
  "CSV",
1322
1526
  "1.0",
1323
1527
  "Brian Aker, MySQL AB",
1324
1528
  "CSV storage engine",
1325
1529
  PLUGIN_LICENSE_GPL,
1326
1530
  tina_init_func, /* Plugin Init */
 
1531
  tina_done_func, /* Plugin Deinit */
 
1532
  NULL,                       /* status variables                */
1327
1533
  NULL,                       /* system variables                */
1328
1534
  NULL                        /* config options                  */
1329
1535
}
1330
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1536
mysql_declare_plugin_end;
1331
1537