~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: Brian Aker
  • Date: 2009-02-10 00:14:40 UTC
  • Revision ID: brian@tangent.org-20090210001440-qjg8eofh3h93064b
Adding Multi-threaded Scheduler into the system.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include <config.h>
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
 
46
#include <drizzled/field/timestamp.h>
 
47
#include <storage/csv/ha_tina.h>
46
48
#include <drizzled/error.h>
47
49
#include <drizzled/table.h>
48
50
#include <drizzled/session.h>
49
 
#include <drizzled/internal/my_sys.h>
50
 
 
51
 
#include "ha_tina.h"
52
 
 
53
 
#include <fcntl.h>
54
 
 
55
 
#include <algorithm>
56
 
#include <vector>
57
 
#include <string>
58
 
#include <map>
59
 
 
60
 
using namespace std;
61
 
using namespace drizzled;
62
51
 
63
52
/*
64
53
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
65
54
*/
66
 
static const int META_BUFFER_SIZE = sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t)
67
 
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char);
68
 
static const int TINA_CHECK_HEADER = 254; // The number we use to determine corruption
69
 
static const int BLOB_MEMROOT_ALLOC_SIZE = 8192;
 
55
#define META_BUFFER_SIZE sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t) \
 
56
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char)
 
57
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
 
58
#define BLOB_MEMROOT_ALLOC_SIZE 8192
70
59
 
71
60
/* The file extension */
72
 
static const char* CSV_EXT = ".CSV"               // The data file
73
 
static const char* CSN_EXT = ".CSN"               // Files used during repair and update
74
 
static const char* CSM_EXT = ".CSM"               // Meta file
75
 
 
76
 
 
77
 
static int read_meta_file(int meta_file, ha_rows *rows);
78
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
61
#define CSV_EXT ".CSV"               // The data file
 
62
#define CSN_EXT ".CSN"               // Files used during repair and update
 
63
#define CSM_EXT ".CSM"               // Meta file
 
64
 
 
65
 
 
66
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
67
static int free_share(TINA_SHARE *share);
 
68
static int read_meta_file(File meta_file, ha_rows *rows);
 
69
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
70
 
 
71
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
72
extern "C" void tina_update_status(void* param);
 
73
extern "C" bool tina_check_status(void* param);
79
74
 
80
75
/* Stuff for shares */
81
76
pthread_mutex_t tina_mutex;
 
77
static HASH tina_open_tables;
 
78
static handler *tina_create_handler(handlerton *hton,
 
79
                                    TABLE_SHARE *table,
 
80
                                    MEM_ROOT *mem_root);
 
81
 
82
82
 
83
83
/*****************************************************************************
84
84
 ** TINA tables
85
85
 *****************************************************************************/
86
86
 
87
87
/*
88
 
  If frm_error() is called in table.cc this is called to find out what file
89
 
  extensions exist for this Cursor.
 
88
  Used for sorting chains with qsort().
90
89
*/
91
 
static const char *ha_tina_exts[] = {
92
 
  CSV_EXT,
93
 
  CSM_EXT,
94
 
  NULL
95
 
};
96
 
 
97
 
class Tina : public drizzled::plugin::StorageEngine
98
 
{
99
 
  typedef std::map<string, TinaShare*> TinaMap;
100
 
  TinaMap tina_open_tables;
101
 
public:
102
 
  Tina(const string& name_arg)
103
 
   : drizzled::plugin::StorageEngine(name_arg,
104
 
                                     HTON_TEMPORARY_ONLY |
105
 
                                     HTON_NO_AUTO_INCREMENT |
106
 
                                     HTON_SKIP_STORE_LOCK),
107
 
    tina_open_tables()
108
 
  {}
109
 
  virtual ~Tina()
110
 
  {
111
 
    pthread_mutex_destroy(&tina_mutex);
112
 
  }
113
 
 
114
 
  virtual Cursor *create(Table &table)
115
 
  {
116
 
    return new ha_tina(*this, table);
117
 
  }
118
 
 
119
 
  const char **bas_ext() const {
120
 
    return ha_tina_exts;
121
 
  }
122
 
 
123
 
  int doCreateTable(Session &,
124
 
                    Table &table_arg,
125
 
                    const drizzled::identifier::Table &identifier,
126
 
                    drizzled::message::Table&);
127
 
 
128
 
  int doGetTableDefinition(Session& session,
129
 
                           const drizzled::identifier::Table &identifier,
130
 
                           drizzled::message::Table &table_message);
131
 
 
132
 
  int doDropTable(Session&, const drizzled::identifier::Table &identifier);
133
 
  TinaShare *findOpenTable(const string table_name);
134
 
  void addOpenTable(const string &table_name, TinaShare *);
135
 
  void deleteOpenTable(const string &table_name);
136
 
 
137
 
 
138
 
  uint32_t max_keys()          const { return 0; }
139
 
  uint32_t max_key_parts()     const { return 0; }
140
 
  uint32_t max_key_length()    const { return 0; }
141
 
  bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
 
  int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
143
 
 
144
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
 
                             const drizzled::identifier::Schema &schema_identifier,
146
 
                             drizzled::identifier::Table::vector &set_of_identifiers);
147
 
};
148
 
 
149
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
 
                                 const drizzled::identifier::Schema&,
151
 
                                 drizzled::identifier::Table::vector&)
152
 
{
153
 
}
154
 
 
155
 
int Tina::doRenameTable(Session &session,
156
 
                        const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
157
 
{
158
 
  int error= 0;
159
 
  for (const char **ext= bas_ext(); *ext ; ext++)
160
 
  {
161
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
162
 
    {
163
 
      if ((error=errno) != ENOENT)
164
 
        break;
165
 
      error= 0;
166
 
    }
167
 
  }
168
 
 
169
 
  session.getMessageCache().renameTableMessage(from, to);
170
 
 
171
 
  return error;
172
 
}
173
 
 
174
 
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
175
 
{
176
 
  return session.getMessageCache().doesTableMessageExist(identifier);
177
 
}
178
 
 
179
 
 
180
 
int Tina::doDropTable(Session &session,
181
 
                      const drizzled::identifier::Table &identifier)
182
 
{
183
 
  int error= 0;
184
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
185
 
 
186
 
  for (const char **ext= bas_ext(); *ext ; ext++)
187
 
  {
188
 
    std::string full_name= identifier.getPath();
189
 
    full_name.append(*ext);
190
 
 
191
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
192
 
    {
193
 
      if ((error= errno) != ENOENT)
194
 
        break;
195
 
    }
196
 
    else
197
 
    {
198
 
      enoent_or_zero= 0;                        // No error for ENOENT
199
 
    }
200
 
    error= enoent_or_zero;
201
 
  }
202
 
 
203
 
  session.getMessageCache().removeTableMessage(identifier);
204
 
 
205
 
  return error;
206
 
}
207
 
 
208
 
TinaShare *Tina::findOpenTable(const string table_name)
209
 
{
210
 
  TinaMap::iterator find_iter=
211
 
    tina_open_tables.find(table_name);
212
 
 
213
 
  if (find_iter != tina_open_tables.end())
214
 
    return (*find_iter).second;
215
 
  else
216
 
    return NULL;
217
 
}
218
 
 
219
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
220
 
{
221
 
  tina_open_tables[table_name]= share;
222
 
}
223
 
 
224
 
void Tina::deleteOpenTable(const string &table_name)
225
 
{
226
 
  tina_open_tables.erase(table_name);
227
 
}
228
 
 
229
 
 
230
 
int Tina::doGetTableDefinition(Session &session,
231
 
                               const drizzled::identifier::Table &identifier,
232
 
                               drizzled::message::Table &table_message)
233
 
{
234
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
235
 
    return EEXIST;
236
 
 
237
 
  return ENOENT;
238
 
}
239
 
 
240
 
 
241
 
static Tina *tina_engine= NULL;
242
 
 
243
 
static int tina_init_func(drizzled::module::Context &context)
244
 
{
245
 
 
246
 
  tina_engine= new Tina("CSV");
247
 
  context.add(tina_engine);
248
 
 
 
90
int sort_set (tina_set *a, tina_set *b)
 
91
{
 
92
  /*
 
93
    We assume that intervals do not intersect. So, it is enought to compare
 
94
    any two points. Here we take start of intervals for comparison.
 
95
  */
 
96
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
97
}
 
98
 
 
99
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
100
{
 
101
  *length=share->table_name_length;
 
102
  return (unsigned char*) share->table_name;
 
103
}
 
104
 
 
105
static int tina_init_func(void *p)
 
106
{
 
107
  handlerton *tina_hton;
 
108
 
 
109
  tina_hton= (handlerton *)p;
249
110
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
250
 
  return 0;
251
 
}
252
 
 
253
 
 
254
 
 
255
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
256
 
  table_name(table_name_arg),
257
 
  data_file_name(table_name_arg),
258
 
  use_count(0),
259
 
  saved_data_file_length(0),
260
 
  update_file_opened(false),
261
 
  tina_write_opened(false),
262
 
  crashed(false),
263
 
  rows_recorded(0),
264
 
  data_file_version(0)
265
 
{
266
 
  data_file_name.append(CSV_EXT);
267
 
}
268
 
 
269
 
TinaShare::~TinaShare()
270
 
{
271
 
  pthread_mutex_destroy(&mutex);
272
 
}
 
111
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
112
                   (hash_get_key) tina_get_key,0,0);
 
113
  tina_hton->state= SHOW_OPTION_YES;
 
114
  tina_hton->create= tina_create_handler;
 
115
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
 
116
                     HTON_NO_PARTITION);
 
117
  return 0;
 
118
}
 
119
 
 
120
static int tina_done_func(void *)
 
121
{
 
122
  hash_free(&tina_open_tables);
 
123
  pthread_mutex_destroy(&tina_mutex);
 
124
 
 
125
  return 0;
 
126
}
 
127
 
273
128
 
274
129
/*
275
130
  Simple lock controls.
276
131
*/
277
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
132
static TINA_SHARE *get_share(const char *table_name, Table *)
278
133
{
279
 
  pthread_mutex_lock(&tina_mutex);
280
 
 
281
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
282
 
  share= a_tina->findOpenTable(table_name);
283
 
 
284
 
  std::string meta_file_name;
 
134
  TINA_SHARE *share;
 
135
  char meta_file_name[FN_REFLEN];
285
136
  struct stat file_stat;
 
137
  char *tmp_name;
 
138
  uint32_t length;
 
139
 
 
140
  pthread_mutex_lock(&tina_mutex);
 
141
  length=(uint) strlen(table_name);
286
142
 
287
143
  /*
288
144
    If share is not present in the hash, create a new share and
289
145
    initialize its members.
290
146
  */
291
 
  if (! share)
 
147
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
148
                                        (unsigned char*) table_name,
 
149
                                       length)))
292
150
  {
293
 
    share= new TinaShare(table_name);
294
 
 
295
 
    if (share == NULL)
296
 
    {
297
 
      pthread_mutex_unlock(&tina_mutex);
298
 
      return NULL;
299
 
    }
300
 
 
301
 
    meta_file_name.assign(table_name);
302
 
    meta_file_name.append(CSM_EXT);
303
 
 
304
 
    if (stat(share->data_file_name.c_str(), &file_stat))
305
 
    {
306
 
      pthread_mutex_unlock(&tina_mutex);
307
 
      delete share;
308
 
      return NULL;
309
 
    }
310
 
  
 
151
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
152
                         &share, sizeof(*share),
 
153
                         &tmp_name, length+1,
 
154
                         NULL))
 
155
    {
 
156
      pthread_mutex_unlock(&tina_mutex);
 
157
      return NULL;
 
158
    }
 
159
 
 
160
    share->use_count= 0;
 
161
    share->table_name_length= length;
 
162
    share->table_name= tmp_name;
 
163
    share->crashed= false;
 
164
    share->rows_recorded= 0;
 
165
    share->update_file_opened= false;
 
166
    share->tina_write_opened= false;
 
167
    share->data_file_version= 0;
 
168
    strcpy(share->table_name, table_name);
 
169
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
170
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
171
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
172
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
173
 
 
174
    if (stat(share->data_file_name, &file_stat))
 
175
      goto error;
311
176
    share->saved_data_file_length= file_stat.st_size;
312
177
 
313
 
    a_tina->addOpenTable(share->table_name, share);
314
 
 
 
178
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
179
      goto error;
 
180
    thr_lock_init(&share->lock);
315
181
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
316
182
 
317
183
    /*
320
186
      Usually this will result in auto-repair, and we will get a good
321
187
      meta-file in the end.
322
188
    */
323
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
324
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
189
    if ((share->meta_file= my_open(meta_file_name,
 
190
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
325
191
      share->crashed= true;
326
192
 
327
193
    /*
335
201
  pthread_mutex_unlock(&tina_mutex);
336
202
 
337
203
  return share;
 
204
 
 
205
error:
 
206
  pthread_mutex_unlock(&tina_mutex);
 
207
  free((unsigned char*) share);
 
208
 
 
209
  return NULL;
338
210
}
339
211
 
340
212
 
357
229
    non-zero - error occurred
358
230
*/
359
231
 
360
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
232
static int read_meta_file(File meta_file, ha_rows *rows)
361
233
{
362
234
  unsigned char meta_buffer[META_BUFFER_SIZE];
363
235
  unsigned char *ptr= meta_buffer;
364
236
 
365
237
  lseek(meta_file, 0, SEEK_SET);
366
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
238
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
367
239
      != META_BUFFER_SIZE)
368
240
    return(HA_ERR_CRASHED_ON_USAGE);
369
241
 
385
257
      ((bool)(*ptr)== true))
386
258
    return(HA_ERR_CRASHED_ON_USAGE);
387
259
 
388
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
260
  my_sync(meta_file, MYF(MY_WME));
389
261
 
390
262
  return(0);
391
263
}
410
282
    non-zero - error occurred
411
283
*/
412
284
 
413
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
285
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
414
286
{
415
287
  unsigned char meta_buffer[META_BUFFER_SIZE];
416
288
  unsigned char *ptr= meta_buffer;
430
302
  *ptr= (unsigned char)dirty;
431
303
 
432
304
  lseek(meta_file, 0, SEEK_SET);
433
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
305
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
434
306
      != META_BUFFER_SIZE)
435
307
    return(-1);
436
308
 
437
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
309
  my_sync(meta_file, MYF(MY_WME));
438
310
 
439
311
  return(0);
440
312
}
441
313
 
 
314
bool ha_tina::check_and_repair(Session *session)
 
315
{
 
316
  HA_CHECK_OPT check_opt;
 
317
 
 
318
  check_opt.init();
 
319
 
 
320
  return(repair(session, &check_opt));
 
321
}
 
322
 
 
323
 
442
324
int ha_tina::init_tina_writer()
443
325
{
444
326
  /*
449
331
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
450
332
 
451
333
  if ((share->tina_write_filedes=
452
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
334
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
453
335
  {
454
336
    share->crashed= true;
455
337
    return(1);
460
342
}
461
343
 
462
344
 
 
345
bool ha_tina::is_crashed() const
 
346
{
 
347
  return(share->crashed);
 
348
}
 
349
 
463
350
/*
464
351
  Free lock controls.
465
352
*/
466
 
int ha_tina::free_share()
 
353
static int free_share(TINA_SHARE *share)
467
354
{
468
355
  pthread_mutex_lock(&tina_mutex);
469
356
  int result_code= 0;
471
358
    /* Write the meta file. Mark it as crashed if needed. */
472
359
    (void)write_meta_file(share->meta_file, share->rows_recorded,
473
360
                          share->crashed ? true :false);
474
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
361
    if (my_close(share->meta_file, MYF(0)))
475
362
      result_code= 1;
476
363
    if (share->tina_write_opened)
477
364
    {
478
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
365
      if (my_close(share->tina_write_filedes, MYF(0)))
479
366
        result_code= 1;
480
367
      share->tina_write_opened= false;
481
368
    }
482
369
 
483
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
484
 
    a_tina->deleteOpenTable(share->table_name);
485
 
    delete share;
 
370
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
371
    thr_lock_delete(&share->lock);
 
372
    pthread_mutex_destroy(&share->mutex);
 
373
    free((unsigned char*) share);
486
374
  }
487
375
  pthread_mutex_unlock(&tina_mutex);
488
376
 
500
388
  '\r''\n' --  DOS\Windows line ending
501
389
*/
502
390
 
503
 
static off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
504
 
                            off_t end, int *eoln_len)
 
391
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
 
392
                     off_t end, int *eoln_len)
505
393
{
506
394
  *eoln_len= 0;
507
395
 
528
416
}
529
417
 
530
418
 
531
 
 
532
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
533
 
  :Cursor(engine_arg, table_arg),
 
419
static handler *tina_create_handler(handlerton *hton,
 
420
                                    TABLE_SHARE *table,
 
421
                                    MEM_ROOT *mem_root)
 
422
{
 
423
  return new (mem_root) ha_tina(hton, table);
 
424
}
 
425
 
 
426
 
 
427
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
 
428
  :handler(hton, table_arg),
534
429
  /*
535
 
    These definitions are found in Cursor.h
 
430
    These definitions are found in handler.h
536
431
    They are not probably completely right.
537
432
  */
538
433
  current_position(0), next_position(0), local_saved_data_file_length(0),
539
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
434
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
435
  local_data_file_version(0), records_is_known(0)
540
436
{
541
437
  /* Set our original buffers from pre-allocated memory */
542
438
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
439
  chain= chain_buffer;
543
440
  file_buff= new Transparent_file();
544
441
}
545
442
 
556
453
 
557
454
  buffer.length(0);
558
455
 
559
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
456
  for (Field **field=table->field ; *field ; field++)
560
457
  {
561
458
    const char *ptr;
562
459
    const char *end_ptr;
572
469
      (*field)->set_notnull();
573
470
    }
574
471
 
575
 
    /* 
576
 
      Since we are needing to "translate" the type into a string we 
577
 
      will need to do a val_str(). This would cause an assert() to
578
 
      normally occur since we are onlying "writing" values.
579
 
    */
580
 
    (*field)->setReadSet();
581
472
    (*field)->val_str(&attribute,&attribute);
582
473
 
583
474
    if (was_null)
596
487
        {
597
488
          buffer.append('\\');
598
489
          buffer.append('"');
599
 
          (void) *ptr++;
 
490
          *ptr++;
600
491
        }
601
492
        else if (*ptr == '\r')
602
493
        {
603
494
          buffer.append('\\');
604
495
          buffer.append('r');
605
 
          (void) *ptr++;
 
496
          *ptr++;
606
497
        }
607
498
        else if (*ptr == '\\')
608
499
        {
609
500
          buffer.append('\\');
610
501
          buffer.append('\\');
611
 
          (void) *ptr++;
 
502
          *ptr++;
612
503
        }
613
504
        else if (*ptr == '\n')
614
505
        {
615
506
          buffer.append('\\');
616
507
          buffer.append('n');
617
 
          (void) *ptr++;
 
508
          *ptr++;
618
509
        }
619
510
        else
620
511
          buffer.append(*ptr++);
644
535
*/
645
536
int ha_tina::chain_append()
646
537
{
647
 
  if (chain.size() > 0 && chain.back().second == current_position)
648
 
    chain.back().second= next_position;
 
538
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
539
    (chain_ptr -1)->end= next_position;
649
540
  else
650
 
    chain.push_back(make_pair(current_position, next_position));
 
541
  {
 
542
    /* We set up for the next position */
 
543
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
544
    {
 
545
      off_t location= chain_ptr - chain;
 
546
      chain_size += DEFAULT_CHAIN_LENGTH;
 
547
      if (chain_alloced)
 
548
      {
 
549
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
550
          return -1;
 
551
      }
 
552
      else
 
553
      {
 
554
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
555
        if (ptr == NULL)
 
556
          return -1;
 
557
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
558
        chain= ptr;
 
559
        chain_alloced++;
 
560
      }
 
561
      chain_ptr= chain + location;
 
562
    }
 
563
    chain_ptr->begin= current_position;
 
564
    chain_ptr->end= next_position;
 
565
    chain_ptr++;
 
566
  }
 
567
 
651
568
  return 0;
652
569
}
653
570
 
660
577
  off_t end_offset, curr_offset= current_position;
661
578
  int eoln_len;
662
579
  int error;
 
580
  bool read_all;
663
581
 
664
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
582
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
665
583
 
666
584
  /*
667
585
    We do not read further then local_saved_data_file_length in order
672
590
                       local_saved_data_file_length, &eoln_len)) == 0)
673
591
    return(HA_ERR_END_OF_FILE);
674
592
 
 
593
  /* We must read all columns in case a table is opened for update */
 
594
  read_all= !bitmap_is_clear_all(table->write_set);
675
595
  error= HA_ERR_CRASHED_ON_USAGE;
676
596
 
677
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
597
  memset(buf, 0, table->s->null_bytes);
678
598
 
679
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
599
  for (Field **field=table->field ; *field ; field++)
680
600
  {
681
601
    char curr_char;
682
602
 
741
661
      }
742
662
    }
743
663
 
744
 
    if ((*field)->isReadSet() || (*field)->isWriteSet())
 
664
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
745
665
    {
746
 
      /* This masks a bug in the logic for a SELECT * */
747
 
      (*field)->setWriteSet();
748
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
749
 
      {
 
666
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
667
                          CHECK_FIELD_WARN))
750
668
        goto err;
751
 
      }
752
 
 
753
669
      if ((*field)->flags & BLOB_FLAG)
754
670
      {
755
671
        Field_blob *blob= *(Field_blob**) field;
761
677
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
762
678
        if (src)
763
679
        {
764
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
680
          tgt= (unsigned char*) alloc_root(&blobroot, length);
765
681
          memmove(tgt, src, length);
766
682
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
767
683
        }
777
693
}
778
694
 
779
695
/*
 
696
  If frm_error() is called in table.cc this is called to find out what file
 
697
  extensions exist for this handler.
 
698
*/
 
699
static const char *ha_tina_exts[] = {
 
700
  CSV_EXT,
 
701
  CSM_EXT,
 
702
  NULL
 
703
};
 
704
 
 
705
const char **ha_tina::bas_ext() const
 
706
{
 
707
  return ha_tina_exts;
 
708
}
 
709
 
 
710
/*
 
711
  Three functions below are needed to enable concurrent insert functionality
 
712
  for CSV engine. For more details see mysys/thr_lock.c
 
713
*/
 
714
 
 
715
void tina_get_status(void* param, int)
 
716
{
 
717
  ha_tina *tina= (ha_tina*) param;
 
718
  tina->get_status();
 
719
}
 
720
 
 
721
void tina_update_status(void* param)
 
722
{
 
723
  ha_tina *tina= (ha_tina*) param;
 
724
  tina->update_status();
 
725
}
 
726
 
 
727
/* this should exist and return 0 for concurrent insert to work */
 
728
bool tina_check_status(void *)
 
729
{
 
730
  return 0;
 
731
}
 
732
 
 
733
/*
 
734
  Save the state of the table
 
735
 
 
736
  SYNOPSIS
 
737
    get_status()
 
738
 
 
739
  DESCRIPTION
 
740
    This function is used to retrieve the file length. During the lock
 
741
    phase of concurrent insert. For more details see comment to
 
742
    ha_tina::update_status below.
 
743
*/
 
744
 
 
745
void ha_tina::get_status()
 
746
{
 
747
  local_saved_data_file_length= share->saved_data_file_length;
 
748
}
 
749
 
 
750
 
 
751
/*
 
752
  Correct the state of the table. Called by unlock routines
 
753
  before the write lock is released.
 
754
 
 
755
  SYNOPSIS
 
756
    update_status()
 
757
 
 
758
  DESCRIPTION
 
759
    When we employ concurrent insert lock, we save current length of the file
 
760
    during the lock phase. We do not read further saved value, as we don't
 
761
    want to interfere with undergoing concurrent insert. Writers update file
 
762
    length info during unlock with update_status().
 
763
 
 
764
  NOTE
 
765
    For log tables concurrent insert works different. The reason is that
 
766
    log tables are always opened and locked. And as they do not unlock
 
767
    tables, the file length after writes should be updated in a different
 
768
    way.
 
769
*/
 
770
 
 
771
void ha_tina::update_status()
 
772
{
 
773
  /* correct local_saved_data_file_length for writers */
 
774
  share->saved_data_file_length= local_saved_data_file_length;
 
775
}
 
776
 
 
777
 
 
778
/*
780
779
  Open a database file. Keep in mind that tables are caches, so
781
780
  this will not be called for every request. Any sort of positions
782
781
  that need to be reset should be kept in the ::extra() call.
783
782
*/
784
 
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
 
783
int ha_tina::open(const char *name, int, uint32_t open_options)
785
784
{
786
 
  if (not (share= get_share(identifier.getPath().c_str())))
 
785
  if (!(share= get_share(name, table)))
787
786
    return(ENOENT);
788
787
 
789
 
  if (share->crashed)
 
788
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
790
789
  {
791
 
    free_share();
 
790
    free_share(share);
792
791
    return(HA_ERR_CRASHED_ON_USAGE);
793
792
  }
794
793
 
795
794
  local_data_file_version= share->data_file_version;
796
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
795
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
797
796
    return(0);
798
797
 
799
798
  /*
800
 
    Init locking. Pass Cursor object to the locking routines,
 
799
    Init locking. Pass handler object to the locking routines,
801
800
    so that they could save/update local_saved_data_file_length value
802
801
    during locking. This is needed to enable concurrent inserts.
803
802
  */
 
803
  thr_lock_data_init(&share->lock, &lock, (void*) this);
804
804
  ref_length=sizeof(off_t);
805
805
 
 
806
  share->lock.get_status= tina_get_status;
 
807
  share->lock.update_status= tina_update_status;
 
808
  share->lock.check_status= tina_check_status;
 
809
 
806
810
  return(0);
807
811
}
808
812
 
 
813
 
809
814
/*
810
815
  Close a database file. We remove ourselves from the shared strucutre.
811
816
  If it is empty we destroy it.
813
818
int ha_tina::close(void)
814
819
{
815
820
  int rc= 0;
816
 
  rc= internal::my_close(data_file, MYF(0));
817
 
  return(free_share() || rc);
 
821
  rc= my_close(data_file, MYF(0));
 
822
  return(free_share(share) || rc);
818
823
}
819
824
 
820
825
/*
821
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
826
  This is an INSERT. At the moment this handler just seeks to the end
822
827
  of the file and appends the data. In an error case it really should
823
828
  just truncate to the original position (this is not done yet).
824
829
*/
825
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
830
int ha_tina::write_row(unsigned char * buf)
826
831
{
827
832
  int size;
828
833
 
829
834
  if (share->crashed)
830
835
      return(HA_ERR_CRASHED_ON_USAGE);
831
836
 
 
837
  ha_statistic_increment(&SSV::ha_write_count);
 
838
 
 
839
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
840
    table->timestamp_field->set_time();
 
841
 
832
842
  size= encode_quote(buf);
833
843
 
834
844
  if (!share->tina_write_opened)
836
846
      return(-1);
837
847
 
838
848
   /* use pwrite, as concurrent reader could have changed the position */
839
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
849
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
840
850
               MYF(MY_WME | MY_NABP)))
841
851
    return(-1);
842
852
 
861
871
  if (!share->update_file_opened)
862
872
  {
863
873
    if ((update_temp_file=
864
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
874
           my_create(fn_format(updated_fname, share->table_name,
865
875
                               "", CSN_EXT,
866
876
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
877
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
876
886
  This is called for an update.
877
887
  Make sure you put in code to increment the auto increment, also
878
888
  update any timestamp data. Currently auto increment is not being
879
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
889
  fixed since autoincrements have yet to be added to this table handler.
880
890
  This will be called in a table scan right before the previous ::rnd_next()
881
891
  call.
882
892
*/
883
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
893
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
884
894
{
885
895
  int size;
886
896
  int rc= -1;
887
897
 
 
898
  ha_statistic_increment(&SSV::ha_update_count);
 
899
 
 
900
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
901
    table->timestamp_field->set_time();
 
902
 
888
903
  size= encode_quote(new_data);
889
904
 
890
905
  /*
891
906
    During update we mark each updating record as deleted
892
907
    (see the chain_append()) then write new one to the temporary data file.
893
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
908
    At the end of the sequence in the rnd_end() we append all non-marked
894
909
    records from the data file to the temporary data file then rename it.
895
910
    The temp_file_length is used to calculate new data file length.
896
911
  */
900
915
  if (open_update_temp_file_if_needed())
901
916
    goto err;
902
917
 
903
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
918
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
904
919
               MYF(MY_WME | MY_NABP)))
905
920
    goto err;
906
921
  temp_file_length+= size;
915
930
  Deletes a row. First the database will find the row, and then call this
916
931
  method. In the case of a table scan, the previous call to this will be
917
932
  the ::rnd_next() that found this row.
918
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
933
  The exception to this is an ORDER BY. This will cause the table handler
919
934
  to walk the table noting the positions of all rows that match a query.
920
935
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
921
936
  DESC, ASC).
922
937
*/
923
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
938
int ha_tina::delete_row(const unsigned char *)
924
939
{
 
940
  ha_statistic_increment(&SSV::ha_delete_count);
925
941
 
926
942
  if (chain_append())
927
943
    return(-1);
955
971
  if (local_data_file_version != share->data_file_version)
956
972
  {
957
973
    local_data_file_version= share->data_file_version;
958
 
    if (internal::my_close(data_file, MYF(0)) ||
959
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
974
    if (my_close(data_file, MYF(0)) ||
 
975
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
960
976
      return 1;
961
977
  }
962
978
  file_buff->init_buff(data_file);
968
984
  All table scans call this first.
969
985
  The order of a table scan is:
970
986
 
 
987
  ha_tina::store_lock
 
988
  ha_tina::external_lock
971
989
  ha_tina::info
972
990
  ha_tina::rnd_init
973
991
  ha_tina::extra
983
1001
  ha_tina::rnd_next
984
1002
  ha_tina::extra
985
1003
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1004
  ha_tina::external_lock
986
1005
  ha_tina::extra
987
1006
  ENUM HA_EXTRA_RESET   Reset database to after open
988
1007
 
992
1011
 
993
1012
*/
994
1013
 
995
 
int ha_tina::doStartTableScan(bool)
 
1014
int ha_tina::rnd_init(bool)
996
1015
{
997
1016
  /* set buffer to the beginning of the file */
998
1017
  if (share->crashed || init_data_file())
1001
1020
  current_position= next_position= 0;
1002
1021
  stats.records= 0;
1003
1022
  records_is_known= 0;
1004
 
  chain.clear();
 
1023
  chain_ptr= chain;
1005
1024
 
1006
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1025
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1007
1026
 
1008
1027
  return(0);
1009
1028
}
1018
1037
  reserved for null count.
1019
1038
  Basically this works as a mask for which rows are nulled (compared to just
1020
1039
  empty).
1021
 
  This table Cursor doesn't do nulls and does not know the difference between
1022
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1040
  This table handler doesn't do nulls and does not know the difference between
 
1041
  NULL and "". This is ok since this table handler is for spreadsheets and
1023
1042
  they don't know about them either :)
1024
1043
*/
1025
1044
int ha_tina::rnd_next(unsigned char *buf)
1029
1048
  if (share->crashed)
1030
1049
      return(HA_ERR_CRASHED_ON_USAGE);
1031
1050
 
1032
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1051
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1033
1052
 
1034
1053
  current_position= next_position;
1035
1054
 
1055
1074
*/
1056
1075
void ha_tina::position(const unsigned char *)
1057
1076
{
1058
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1077
  my_store_ptr(ref, ref_length, current_position);
1059
1078
  return;
1060
1079
}
1061
1080
 
1062
1081
 
1063
1082
/*
1064
1083
  Used to fetch a row from a posiion stored with ::position().
1065
 
  internal::my_get_ptr() retrieves the data for you.
 
1084
  my_get_ptr() retrieves the data for you.
1066
1085
*/
1067
1086
 
1068
1087
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1088
{
1070
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1089
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1090
  current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1091
  return(find_current_row(buf));
1073
1092
}
1074
1093
 
1075
1094
/*
1076
1095
  ::info() is used to return information to the optimizer.
1077
 
  Currently this table Cursor doesn't implement most of the fields
 
1096
  Currently this table handler doesn't implement most of the fields
1078
1097
  really needed. SHOW also makes use of this data
1079
1098
*/
1080
1099
int ha_tina::info(uint32_t)
1090
1109
  to the given "hole", stored in the buffer. "Valid" here means,
1091
1110
  not listed in the chain of deleted records ("holes").
1092
1111
*/
1093
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1112
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1094
1113
{
1095
 
  if (closest_hole == chain.end()) /* no more chains */
 
1114
  if (closest_hole == chain_ptr) /* no more chains */
1096
1115
    *end_pos= file_buff->end();
1097
1116
  else
1098
1117
    *end_pos= std::min(file_buff->end(),
1099
 
                       closest_hole->first);
1100
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1118
                       closest_hole->begin);
 
1119
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1101
1120
}
1102
1121
 
1103
1122
 
1107
1126
  slots to clean up all of the dead space we have collected while
1108
1127
  performing deletes/updates.
1109
1128
*/
1110
 
int ha_tina::doEndTableScan()
 
1129
int ha_tina::rnd_end()
1111
1130
{
 
1131
  char updated_fname[FN_REFLEN];
1112
1132
  off_t file_buffer_start= 0;
1113
1133
 
1114
 
  blobroot.free_root(MYF(0));
 
1134
  free_root(&blobroot, MYF(0));
1115
1135
  records_is_known= 1;
1116
1136
 
1117
 
  if (chain.size() > 0)
 
1137
  if ((chain_ptr - chain)  > 0)
1118
1138
  {
1119
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1139
    tina_set *ptr= chain;
1120
1140
 
1121
1141
    /*
1122
1142
      Re-read the beginning of a file (as the buffer should point to the
1128
1148
      The sort is needed when there were updates/deletes with random orders.
1129
1149
      It sorts so that we move the firts blocks to the beginning.
1130
1150
    */
1131
 
    sort(chain.begin(), chain.end());
 
1151
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1152
             (qsort_cmp)sort_set);
1132
1153
 
1133
1154
    off_t write_begin= 0, write_end;
1134
1155
 
1149
1170
      /* if there is something to write, write it */
1150
1171
      if (write_length)
1151
1172
      {
1152
 
        if (internal::my_write(update_temp_file,
 
1173
        if (my_write(update_temp_file,
1153
1174
                     (unsigned char*) (file_buff->ptr() +
1154
1175
                               (write_begin - file_buff->start())),
1155
1176
                     (size_t)write_length, MYF_RW))
1159
1180
      if (in_hole)
1160
1181
      {
1161
1182
        /* skip hole */
1162
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1183
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1163
1184
          file_buffer_start= file_buff->read_next();
1164
 
        write_begin= ptr->second;
1165
 
        ++ptr;
 
1185
        write_begin= ptr->end;
 
1186
        ptr++;
1166
1187
      }
1167
1188
      else
1168
1189
        write_begin= write_end;
1172
1193
 
1173
1194
    }
1174
1195
 
1175
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
 
        internal::my_close(update_temp_file, MYF(0)))
 
1196
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1197
        my_close(update_temp_file, MYF(0)))
1177
1198
      return(-1);
1178
1199
 
1179
1200
    share->update_file_opened= false;
1180
1201
 
1181
1202
    if (share->tina_write_opened)
1182
1203
    {
1183
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1204
      if (my_close(share->tina_write_filedes, MYF(0)))
1184
1205
        return(-1);
1185
1206
      /*
1186
1207
        Mark that the writer fd is closed, so that init_tina_writer()
1193
1214
      Close opened fildes's. Then move updated file in place
1194
1215
      of the old datafile.
1195
1216
    */
1196
 
    std::string rename_file= share->table_name;
1197
 
    rename_file.append(CSN_EXT);
1198
 
    if (internal::my_close(data_file, MYF(0)) ||
1199
 
        internal::my_rename(rename_file.c_str(),
1200
 
                            share->data_file_name.c_str(), MYF(0)))
 
1217
    if (my_close(data_file, MYF(0)) ||
 
1218
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1219
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1220
                  share->data_file_name, MYF(0)))
1201
1221
      return(-1);
1202
1222
 
1203
1223
    /* Open the file again */
1204
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1224
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1205
1225
      return(-1);
1206
1226
    /*
1207
1227
      As we reopened the data file, increase share->data_file_version
1228
1248
 
1229
1249
  return(0);
1230
1250
error:
1231
 
  internal::my_close(update_temp_file, MYF(0));
 
1251
  my_close(update_temp_file, MYF(0));
1232
1252
  share->update_file_opened= false;
1233
1253
  return(-1);
1234
1254
}
1235
1255
 
1236
1256
 
1237
1257
/*
 
1258
  Repair CSV table in the case, it is crashed.
 
1259
 
 
1260
  SYNOPSIS
 
1261
    repair()
 
1262
    session         The thread, performing repair
 
1263
    check_opt   The options for repair. We do not use it currently.
 
1264
 
 
1265
  DESCRIPTION
 
1266
    If the file is empty, change # of rows in the file and complete recovery.
 
1267
    Otherwise, scan the table looking for bad rows. If none were found,
 
1268
    we mark file as a good one and return. If a bad row was encountered,
 
1269
    we truncate the datafile up to the last good row.
 
1270
 
 
1271
   TODO: Make repair more clever - it should try to recover subsequent
 
1272
         rows (after the first bad one) as well.
 
1273
*/
 
1274
 
 
1275
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
 
1276
{
 
1277
  char repaired_fname[FN_REFLEN];
 
1278
  unsigned char *buf;
 
1279
  File repair_file;
 
1280
  int rc;
 
1281
  ha_rows rows_repaired= 0;
 
1282
  off_t write_begin= 0, write_end;
 
1283
 
 
1284
  /* empty file */
 
1285
  if (!share->saved_data_file_length)
 
1286
  {
 
1287
    share->rows_recorded= 0;
 
1288
    goto end;
 
1289
  }
 
1290
 
 
1291
  /* Don't assert in field::val() functions */
 
1292
  table->use_all_columns();
 
1293
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1294
    return(HA_ERR_OUT_OF_MEM);
 
1295
 
 
1296
  /* position buffer to the start of the file */
 
1297
  if (init_data_file())
 
1298
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1299
 
 
1300
  /*
 
1301
    Local_saved_data_file_length is initialized during the lock phase.
 
1302
    Sometimes this is not getting executed before ::repair (e.g. for
 
1303
    the log tables). We set it manually here.
 
1304
  */
 
1305
  local_saved_data_file_length= share->saved_data_file_length;
 
1306
  /* set current position to the beginning of the file */
 
1307
  current_position= next_position= 0;
 
1308
 
 
1309
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1310
 
 
1311
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1312
  while (!(rc= find_current_row(buf)))
 
1313
  {
 
1314
    session_inc_row_count(session);
 
1315
    rows_repaired++;
 
1316
    current_position= next_position;
 
1317
  }
 
1318
 
 
1319
  free_root(&blobroot, MYF(0));
 
1320
 
 
1321
  free((char*)buf);
 
1322
 
 
1323
  if (rc == HA_ERR_END_OF_FILE)
 
1324
  {
 
1325
    /*
 
1326
      All rows were read ok until end of file, the file does not need repair.
 
1327
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1328
      to the current amount of rows.
 
1329
    */
 
1330
    share->rows_recorded= rows_repaired;
 
1331
    goto end;
 
1332
  }
 
1333
 
 
1334
  /*
 
1335
    Otherwise we've encountered a bad row => repair is needed.
 
1336
    Let us create a temporary file.
 
1337
  */
 
1338
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1339
                                        "", CSN_EXT,
 
1340
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1341
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1342
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1343
 
 
1344
  file_buff->init_buff(data_file);
 
1345
 
 
1346
 
 
1347
  /* we just truncated the file up to the first bad row. update rows count. */
 
1348
  share->rows_recorded= rows_repaired;
 
1349
 
 
1350
  /* write repaired file */
 
1351
  while (1)
 
1352
  {
 
1353
    write_end= std::min(file_buff->end(), current_position);
 
1354
 
 
1355
    off_t write_length= write_end - write_begin;
 
1356
    if ((uint64_t)write_length > SIZE_MAX)
 
1357
    {
 
1358
      return -1;
 
1359
    }
 
1360
    if ((write_length) &&
 
1361
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1362
                  (size_t)write_length, MYF_RW)))
 
1363
      return(-1);
 
1364
 
 
1365
    write_begin= write_end;
 
1366
    if (write_end== current_position)
 
1367
      break;
 
1368
    else
 
1369
      file_buff->read_next(); /* shift the buffer */
 
1370
  }
 
1371
 
 
1372
  /*
 
1373
    Close the files and rename repaired file to the datafile.
 
1374
    We have to close the files, as on Windows one cannot rename
 
1375
    a file, which descriptor is still open. EACCES will be returned
 
1376
    when trying to delete the "to"-file in my_rename().
 
1377
  */
 
1378
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1379
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1380
    return(-1);
 
1381
 
 
1382
  /* Open the file again, it should now be repaired */
 
1383
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1384
                          MYF(0))) == -1)
 
1385
     return(-1);
 
1386
 
 
1387
  /* Set new file size. The file size will be updated by ::update_status() */
 
1388
  local_saved_data_file_length= (size_t) current_position;
 
1389
 
 
1390
end:
 
1391
  share->crashed= false;
 
1392
  return(HA_ADMIN_OK);
 
1393
}
 
1394
 
 
1395
/*
1238
1396
  DELETE without WHERE calls this
1239
1397
*/
1240
1398
 
1243
1401
  int rc;
1244
1402
 
1245
1403
  if (!records_is_known)
1246
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1404
    return(my_errno=HA_ERR_WRONG_COMMAND);
1247
1405
 
1248
1406
  if (!share->tina_write_opened)
1249
1407
    if (init_tina_writer())
1262
1420
}
1263
1421
 
1264
1422
/*
 
1423
  Called by the database to lock the table. Keep in mind that this
 
1424
  is an internal lock.
 
1425
*/
 
1426
THR_LOCK_DATA **ha_tina::store_lock(Session *,
 
1427
                                    THR_LOCK_DATA **to,
 
1428
                                    enum thr_lock_type lock_type)
 
1429
{
 
1430
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1431
    lock.type=lock_type;
 
1432
  *to++= &lock;
 
1433
  return to;
 
1434
}
 
1435
 
 
1436
/*
1265
1437
  Create a table. You do not want to leave the table open after a call to
1266
1438
  this (the database will call ::open() if it needs to).
1267
1439
*/
1268
1440
 
1269
 
int Tina::doCreateTable(Session &session,
1270
 
                        Table& table_arg,
1271
 
                        const drizzled::identifier::Table &identifier,
1272
 
                        drizzled::message::Table &create_proto)
 
1441
int ha_tina::create(const char *name, Table *table_arg, HA_CREATE_INFO *)
1273
1442
{
1274
1443
  char name_buff[FN_REFLEN];
1275
 
  int create_file;
 
1444
  File create_file;
1276
1445
 
1277
1446
  /*
1278
1447
    check columns
1279
1448
  */
1280
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
 
       iter != fields.end();
1283
 
       iter++)
 
1449
  for (Field **field= table_arg->s->field; *field; field++)
1284
1450
  {
1285
 
    if (not *iter) // Historical legacy for NULL array end.
1286
 
      continue;
1287
 
 
1288
 
    if ((*iter)->real_maybe_null())
 
1451
    if ((*field)->real_maybe_null())
1289
1452
    {
1290
1453
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1454
      return(HA_ERR_UNSUPPORTED);
1293
1456
  }
1294
1457
 
1295
1458
 
1296
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1459
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1460
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1461
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1299
1462
    return(-1);
1300
1463
 
1301
1464
  write_meta_file(create_file, 0, false);
1302
 
  internal::my_close(create_file, MYF(0));
 
1465
  my_close(create_file, MYF(0));
1303
1466
 
1304
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1467
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1468
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1469
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1307
1470
    return(-1);
1308
1471
 
1309
 
  internal::my_close(create_file, MYF(0));
1310
 
 
1311
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
1312
 
 
1313
 
  return 0;
1314
 
}
1315
 
 
1316
 
 
1317
 
DRIZZLE_DECLARE_PLUGIN
1318
 
{
1319
 
  DRIZZLE_VERSION_ID,
 
1472
  my_close(create_file, MYF(0));
 
1473
 
 
1474
  return(0);
 
1475
}
 
1476
 
 
1477
int ha_tina::check(Session* session, HA_CHECK_OPT *)
 
1478
{
 
1479
  int rc= 0;
 
1480
  unsigned char *buf;
 
1481
  const char *old_proc_info;
 
1482
  ha_rows count= share->rows_recorded;
 
1483
 
 
1484
  old_proc_info= get_session_proc_info(session);
 
1485
  set_session_proc_info(session, "Checking table");
 
1486
  if (!(buf= (unsigned char*) malloc(table->s->reclength)))
 
1487
    return(HA_ERR_OUT_OF_MEM);
 
1488
 
 
1489
  /* position buffer to the start of the file */
 
1490
   if (init_data_file())
 
1491
     return(HA_ERR_CRASHED);
 
1492
 
 
1493
  /*
 
1494
    Local_saved_data_file_length is initialized during the lock phase.
 
1495
    Check does not use store_lock in certain cases. So, we set it
 
1496
    manually here.
 
1497
  */
 
1498
  local_saved_data_file_length= share->saved_data_file_length;
 
1499
  /* set current position to the beginning of the file */
 
1500
  current_position= next_position= 0;
 
1501
 
 
1502
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1503
 
 
1504
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1505
  while (!(rc= find_current_row(buf)))
 
1506
  {
 
1507
    session_inc_row_count(session);
 
1508
    count--;
 
1509
    current_position= next_position;
 
1510
  }
 
1511
 
 
1512
  free_root(&blobroot, MYF(0));
 
1513
 
 
1514
  free((char*)buf);
 
1515
  set_session_proc_info(session, old_proc_info);
 
1516
 
 
1517
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1518
  {
 
1519
    share->crashed= true;
 
1520
    return(HA_ADMIN_CORRUPT);
 
1521
  }
 
1522
  else
 
1523
    return(HA_ADMIN_OK);
 
1524
}
 
1525
 
 
1526
 
 
1527
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *, uint32_t)
 
1528
{
 
1529
  return COMPATIBLE_DATA_YES;
 
1530
}
 
1531
 
 
1532
drizzle_declare_plugin(csv)
 
1533
{
 
1534
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
1320
1535
  "CSV",
1321
1536
  "1.0",
1322
1537
  "Brian Aker, MySQL AB",
1323
1538
  "CSV storage engine",
1324
1539
  PLUGIN_LICENSE_GPL,
1325
1540
  tina_init_func, /* Plugin Init */
1326
 
  NULL,                       /* depends */
 
1541
  tina_done_func, /* Plugin Deinit */
 
1542
  NULL,                       /* status variables                */
 
1543
  NULL,                       /* system variables                */
1327
1544
  NULL                        /* config options                  */
1328
1545
}
1329
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1546
drizzle_declare_plugin_end;
1330
1547