~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: Monty Taylor
  • Date: 2008-12-06 22:41:03 UTC
  • mto: (656.1.7 devel)
  • mto: This revision was merged to the branch mainline in revision 665.
  • Revision ID: monty@inaugust.com-20081206224103-jdouqwt9hb0f01y1
Moved non-working tests into broken suite for easier running of working tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include <config.h>
 
43
#include <drizzled/common_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
 
46
#include <drizzled/field/timestamp.h>
 
47
#include <storage/csv/ha_tina.h>
46
48
#include <drizzled/error.h>
47
49
#include <drizzled/table.h>
48
50
#include <drizzled/session.h>
49
 
#include <drizzled/internal/my_sys.h>
50
 
 
51
 
#include "ha_tina.h"
52
 
 
53
 
#include <fcntl.h>
54
 
 
55
 
#include <algorithm>
56
 
#include <vector>
57
 
#include <string>
58
 
#include <map>
59
 
 
60
 
using namespace std;
61
 
using namespace drizzled;
62
51
 
63
52
/*
64
53
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
65
54
*/
66
 
static const int META_BUFFER_SIZE = sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t)
67
 
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char);
68
 
static const int TINA_CHECK_HEADER = 254; // The number we use to determine corruption
69
 
static const int BLOB_MEMROOT_ALLOC_SIZE = 8192;
 
55
#define META_BUFFER_SIZE sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t) \
 
56
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char)
 
57
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
 
58
#define BLOB_MEMROOT_ALLOC_SIZE 8192
70
59
 
71
60
/* The file extension */
72
 
static const char* CSV_EXT = ".CSV"               // The data file
73
 
static const char* CSN_EXT = ".CSN"               // Files used during repair and update
74
 
static const char* CSM_EXT = ".CSM"               // Meta file
75
 
 
76
 
 
77
 
static int read_meta_file(int meta_file, ha_rows *rows);
78
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
61
#define CSV_EXT ".CSV"               // The data file
 
62
#define CSN_EXT ".CSN"               // Files used during repair and update
 
63
#define CSM_EXT ".CSM"               // Meta file
 
64
 
 
65
 
 
66
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
67
static int free_share(TINA_SHARE *share);
 
68
static int read_meta_file(File meta_file, ha_rows *rows);
 
69
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
70
 
 
71
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
72
extern "C" void tina_update_status(void* param);
 
73
extern "C" bool tina_check_status(void* param);
79
74
 
80
75
/* Stuff for shares */
81
76
pthread_mutex_t tina_mutex;
 
77
static HASH tina_open_tables;
 
78
static handler *tina_create_handler(handlerton *hton,
 
79
                                    TABLE_SHARE *table, 
 
80
                                    MEM_ROOT *mem_root);
 
81
 
82
82
 
83
83
/*****************************************************************************
84
84
 ** TINA tables
85
85
 *****************************************************************************/
86
86
 
87
87
/*
88
 
  If frm_error() is called in table.cc this is called to find out what file
89
 
  extensions exist for this Cursor.
 
88
  Used for sorting chains with qsort().
90
89
*/
91
 
static const char *ha_tina_exts[] = {
92
 
  CSV_EXT,
93
 
  CSM_EXT,
94
 
  NULL
95
 
};
96
 
 
97
 
class Tina : public drizzled::plugin::StorageEngine
98
 
{
99
 
  typedef std::map<string, TinaShare*> TinaMap;
100
 
  TinaMap tina_open_tables;
101
 
public:
102
 
  Tina(const string& name_arg)
103
 
   : drizzled::plugin::StorageEngine(name_arg,
104
 
                                     HTON_TEMPORARY_ONLY |
105
 
                                     HTON_NO_AUTO_INCREMENT |
106
 
                                     HTON_SKIP_STORE_LOCK),
107
 
    tina_open_tables()
108
 
  {}
109
 
  virtual ~Tina()
110
 
  {
111
 
    pthread_mutex_destroy(&tina_mutex);
112
 
  }
113
 
 
114
 
  virtual Cursor *create(Table &table)
115
 
  {
116
 
    return new ha_tina(*this, table);
117
 
  }
118
 
 
119
 
  const char **bas_ext() const {
120
 
    return ha_tina_exts;
121
 
  }
122
 
 
123
 
  int doCreateTable(Session &,
124
 
                    Table &table_arg,
125
 
                    const drizzled::identifier::Table &identifier,
126
 
                    drizzled::message::Table&);
127
 
 
128
 
  int doGetTableDefinition(Session& session,
129
 
                           const drizzled::identifier::Table &identifier,
130
 
                           drizzled::message::Table &table_message);
131
 
 
132
 
  int doDropTable(Session&, const drizzled::identifier::Table &identifier);
133
 
  TinaShare *findOpenTable(const string table_name);
134
 
  void addOpenTable(const string &table_name, TinaShare *);
135
 
  void deleteOpenTable(const string &table_name);
136
 
 
137
 
 
138
 
  uint32_t max_keys()          const { return 0; }
139
 
  uint32_t max_key_parts()     const { return 0; }
140
 
  uint32_t max_key_length()    const { return 0; }
141
 
  bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
 
  int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
143
 
 
144
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
 
                             const drizzled::identifier::Schema &schema_identifier,
146
 
                             drizzled::identifier::Table::vector &set_of_identifiers);
147
 
};
148
 
 
149
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
 
                                 const drizzled::identifier::Schema&,
151
 
                                 drizzled::identifier::Table::vector&)
152
 
{
153
 
}
154
 
 
155
 
int Tina::doRenameTable(Session &session,
156
 
                        const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
157
 
{
158
 
  int error= 0;
159
 
  for (const char **ext= bas_ext(); *ext ; ext++)
160
 
  {
161
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
162
 
    {
163
 
      if ((error=errno) != ENOENT)
164
 
        break;
165
 
      error= 0;
166
 
    }
167
 
  }
168
 
 
169
 
  session.getMessageCache().renameTableMessage(from, to);
170
 
 
171
 
  return error;
172
 
}
173
 
 
174
 
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
175
 
{
176
 
  return session.getMessageCache().doesTableMessageExist(identifier);
177
 
}
178
 
 
179
 
 
180
 
int Tina::doDropTable(Session &session,
181
 
                      const drizzled::identifier::Table &identifier)
182
 
{
183
 
  int error= 0;
184
 
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
185
 
 
186
 
  for (const char **ext= bas_ext(); *ext ; ext++)
187
 
  {
188
 
    std::string full_name= identifier.getPath();
189
 
    full_name.append(*ext);
190
 
 
191
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
192
 
    {
193
 
      if ((error= errno) != ENOENT)
194
 
        break;
195
 
    }
196
 
    else
197
 
    {
198
 
      enoent_or_zero= 0;                        // No error for ENOENT
199
 
    }
200
 
    error= enoent_or_zero;
201
 
  }
202
 
 
203
 
  session.getMessageCache().removeTableMessage(identifier);
204
 
 
205
 
  return error;
206
 
}
207
 
 
208
 
TinaShare *Tina::findOpenTable(const string table_name)
209
 
{
210
 
  TinaMap::iterator find_iter=
211
 
    tina_open_tables.find(table_name);
212
 
 
213
 
  if (find_iter != tina_open_tables.end())
214
 
    return (*find_iter).second;
215
 
  else
216
 
    return NULL;
217
 
}
218
 
 
219
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
220
 
{
221
 
  tina_open_tables[table_name]= share;
222
 
}
223
 
 
224
 
void Tina::deleteOpenTable(const string &table_name)
225
 
{
226
 
  tina_open_tables.erase(table_name);
227
 
}
228
 
 
229
 
 
230
 
int Tina::doGetTableDefinition(Session &session,
231
 
                               const drizzled::identifier::Table &identifier,
232
 
                               drizzled::message::Table &table_message)
233
 
{
234
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
235
 
    return EEXIST;
236
 
 
237
 
  return ENOENT;
238
 
}
239
 
 
240
 
 
241
 
static Tina *tina_engine= NULL;
242
 
 
243
 
static int tina_init_func(drizzled::module::Context &context)
244
 
{
245
 
 
246
 
  tina_engine= new Tina("CSV");
247
 
  context.add(tina_engine);
248
 
 
 
90
int sort_set (tina_set *a, tina_set *b)
 
91
{
 
92
  /*
 
93
    We assume that intervals do not intersect. So, it is enought to compare
 
94
    any two points. Here we take start of intervals for comparison.
 
95
  */
 
96
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
97
}
 
98
 
 
99
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
100
{
 
101
  *length=share->table_name_length;
 
102
  return (unsigned char*) share->table_name;
 
103
}
 
104
 
 
105
static int tina_init_func(void *p)
 
106
{
 
107
  handlerton *tina_hton;
 
108
 
 
109
  tina_hton= (handlerton *)p;
249
110
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
250
 
  return 0;
251
 
}
252
 
 
253
 
 
254
 
 
255
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
256
 
  table_name(table_name_arg),
257
 
  data_file_name(table_name_arg),
258
 
  use_count(0),
259
 
  saved_data_file_length(0),
260
 
  update_file_opened(false),
261
 
  tina_write_opened(false),
262
 
  crashed(false),
263
 
  rows_recorded(0),
264
 
  data_file_version(0)
265
 
{
266
 
  data_file_name.append(CSV_EXT);
267
 
}
268
 
 
269
 
TinaShare::~TinaShare()
270
 
{
271
 
  pthread_mutex_destroy(&mutex);
272
 
}
 
111
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
112
                   (hash_get_key) tina_get_key,0,0);
 
113
  tina_hton->state= SHOW_OPTION_YES;
 
114
  tina_hton->create= tina_create_handler;
 
115
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES | 
 
116
                     HTON_NO_PARTITION);
 
117
  return 0;
 
118
}
 
119
 
 
120
static int tina_done_func(void *)
 
121
{
 
122
  hash_free(&tina_open_tables);
 
123
  pthread_mutex_destroy(&tina_mutex);
 
124
 
 
125
  return 0;
 
126
}
 
127
 
273
128
 
274
129
/*
275
130
  Simple lock controls.
276
131
*/
277
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
132
static TINA_SHARE *get_share(const char *table_name, Table *)
278
133
{
279
 
  pthread_mutex_lock(&tina_mutex);
280
 
 
281
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
282
 
  share= a_tina->findOpenTable(table_name);
283
 
 
284
 
  std::string meta_file_name;
 
134
  TINA_SHARE *share;
 
135
  char meta_file_name[FN_REFLEN];
285
136
  struct stat file_stat;
 
137
  char *tmp_name;
 
138
  uint32_t length;
 
139
 
 
140
  pthread_mutex_lock(&tina_mutex);
 
141
  length=(uint) strlen(table_name);
286
142
 
287
143
  /*
288
144
    If share is not present in the hash, create a new share and
289
145
    initialize its members.
290
146
  */
291
 
  if (! share)
 
147
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
148
                                        (unsigned char*) table_name,
 
149
                                       length)))
292
150
  {
293
 
    share= new TinaShare(table_name);
294
 
 
295
 
    if (share == NULL)
296
 
    {
297
 
      pthread_mutex_unlock(&tina_mutex);
298
 
      return NULL;
299
 
    }
300
 
 
301
 
    meta_file_name.assign(table_name);
302
 
    meta_file_name.append(CSM_EXT);
303
 
 
304
 
    if (stat(share->data_file_name.c_str(), &file_stat))
305
 
    {
306
 
      pthread_mutex_unlock(&tina_mutex);
307
 
      delete share;
308
 
      return NULL;
309
 
    }
310
 
  
 
151
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
152
                         &share, sizeof(*share),
 
153
                         &tmp_name, length+1,
 
154
                         NULL))
 
155
    {
 
156
      pthread_mutex_unlock(&tina_mutex);
 
157
      return NULL;
 
158
    }
 
159
 
 
160
    share->use_count= 0;
 
161
    share->table_name_length= length;
 
162
    share->table_name= tmp_name;
 
163
    share->crashed= false;
 
164
    share->rows_recorded= 0;
 
165
    share->update_file_opened= false;
 
166
    share->tina_write_opened= false;
 
167
    share->data_file_version= 0;
 
168
    strcpy(share->table_name, table_name);
 
169
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
170
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
171
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
172
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
173
 
 
174
    if (stat(share->data_file_name, &file_stat))
 
175
      goto error;
311
176
    share->saved_data_file_length= file_stat.st_size;
312
177
 
313
 
    a_tina->addOpenTable(share->table_name, share);
314
 
 
 
178
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
179
      goto error;
 
180
    thr_lock_init(&share->lock);
315
181
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
316
182
 
317
183
    /*
320
186
      Usually this will result in auto-repair, and we will get a good
321
187
      meta-file in the end.
322
188
    */
323
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
324
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
189
    if ((share->meta_file= my_open(meta_file_name,
 
190
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
325
191
      share->crashed= true;
326
192
 
327
193
    /*
335
201
  pthread_mutex_unlock(&tina_mutex);
336
202
 
337
203
  return share;
 
204
 
 
205
error:
 
206
  pthread_mutex_unlock(&tina_mutex);
 
207
  free((unsigned char*) share);
 
208
 
 
209
  return NULL;
338
210
}
339
211
 
340
212
 
357
229
    non-zero - error occurred
358
230
*/
359
231
 
360
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
232
static int read_meta_file(File meta_file, ha_rows *rows)
361
233
{
362
234
  unsigned char meta_buffer[META_BUFFER_SIZE];
363
235
  unsigned char *ptr= meta_buffer;
364
236
 
365
 
  lseek(meta_file, 0, SEEK_SET);
366
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
237
  my_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
 
238
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
367
239
      != META_BUFFER_SIZE)
368
240
    return(HA_ERR_CRASHED_ON_USAGE);
369
241
 
385
257
      ((bool)(*ptr)== true))
386
258
    return(HA_ERR_CRASHED_ON_USAGE);
387
259
 
388
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
260
  my_sync(meta_file, MYF(MY_WME));
389
261
 
390
262
  return(0);
391
263
}
410
282
    non-zero - error occurred
411
283
*/
412
284
 
413
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
285
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
414
286
{
415
287
  unsigned char meta_buffer[META_BUFFER_SIZE];
416
288
  unsigned char *ptr= meta_buffer;
429
301
  ptr+= 3*sizeof(uint64_t);
430
302
  *ptr= (unsigned char)dirty;
431
303
 
432
 
  lseek(meta_file, 0, SEEK_SET);
433
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
304
  my_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
 
305
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
434
306
      != META_BUFFER_SIZE)
435
307
    return(-1);
436
308
 
437
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
309
  my_sync(meta_file, MYF(MY_WME));
438
310
 
439
311
  return(0);
440
312
}
441
313
 
 
314
bool ha_tina::check_and_repair(Session *session)
 
315
{
 
316
  HA_CHECK_OPT check_opt;
 
317
 
 
318
  check_opt.init();
 
319
 
 
320
  return(repair(session, &check_opt));
 
321
}
 
322
 
 
323
 
442
324
int ha_tina::init_tina_writer()
443
325
{
444
326
  /*
449
331
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
450
332
 
451
333
  if ((share->tina_write_filedes=
452
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
334
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
453
335
  {
454
336
    share->crashed= true;
455
337
    return(1);
460
342
}
461
343
 
462
344
 
 
345
bool ha_tina::is_crashed() const
 
346
{
 
347
  return(share->crashed);
 
348
}
 
349
 
463
350
/*
464
351
  Free lock controls.
465
352
*/
466
 
int ha_tina::free_share()
 
353
static int free_share(TINA_SHARE *share)
467
354
{
468
355
  pthread_mutex_lock(&tina_mutex);
469
356
  int result_code= 0;
471
358
    /* Write the meta file. Mark it as crashed if needed. */
472
359
    (void)write_meta_file(share->meta_file, share->rows_recorded,
473
360
                          share->crashed ? true :false);
474
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
361
    if (my_close(share->meta_file, MYF(0)))
475
362
      result_code= 1;
476
363
    if (share->tina_write_opened)
477
364
    {
478
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
365
      if (my_close(share->tina_write_filedes, MYF(0)))
479
366
        result_code= 1;
480
367
      share->tina_write_opened= false;
481
368
    }
482
369
 
483
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
484
 
    a_tina->deleteOpenTable(share->table_name);
485
 
    delete share;
 
370
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
371
    thr_lock_delete(&share->lock);
 
372
    pthread_mutex_destroy(&share->mutex);
 
373
    free((unsigned char*) share);
486
374
  }
487
375
  pthread_mutex_unlock(&tina_mutex);
488
376
 
500
388
  '\r''\n' --  DOS\Windows line ending
501
389
*/
502
390
 
503
 
static off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
504
 
                            off_t end, int *eoln_len)
 
391
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
 
392
                     off_t end, int *eoln_len)
505
393
{
506
394
  *eoln_len= 0;
507
395
 
528
416
}
529
417
 
530
418
 
531
 
 
532
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
533
 
  :Cursor(engine_arg, table_arg),
 
419
static handler *tina_create_handler(handlerton *hton,
 
420
                                    TABLE_SHARE *table, 
 
421
                                    MEM_ROOT *mem_root)
 
422
{
 
423
  return new (mem_root) ha_tina(hton, table);
 
424
}
 
425
 
 
426
 
 
427
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
 
428
  :handler(hton, table_arg),
534
429
  /*
535
 
    These definitions are found in Cursor.h
 
430
    These definitions are found in handler.h
536
431
    They are not probably completely right.
537
432
  */
538
433
  current_position(0), next_position(0), local_saved_data_file_length(0),
539
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
434
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
435
  local_data_file_version(0), records_is_known(0)
540
436
{
541
437
  /* Set our original buffers from pre-allocated memory */
542
438
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
439
  chain= chain_buffer;
543
440
  file_buff= new Transparent_file();
544
441
}
545
442
 
556
453
 
557
454
  buffer.length(0);
558
455
 
559
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
456
  for (Field **field=table->field ; *field ; field++)
560
457
  {
561
458
    const char *ptr;
562
459
    const char *end_ptr;
563
460
    const bool was_null= (*field)->is_null();
564
 
 
 
461
    
565
462
    /*
566
463
      assistance for backwards compatibility in production builds.
567
464
      note: this will not work for ENUM columns.
572
469
      (*field)->set_notnull();
573
470
    }
574
471
 
575
 
    /* 
576
 
      Since we are needing to "translate" the type into a string we 
577
 
      will need to do a val_str(). This would cause an assert() to
578
 
      normally occur since we are onlying "writing" values.
579
 
    */
580
 
    (*field)->setReadSet();
581
472
    (*field)->val_str(&attribute,&attribute);
582
 
 
 
473
    
583
474
    if (was_null)
584
475
      (*field)->set_null();
585
476
 
590
481
 
591
482
      buffer.append('"');
592
483
 
593
 
      while (ptr < end_ptr)
 
484
      while (ptr < end_ptr) 
594
485
      {
595
486
        if (*ptr == '"')
596
487
        {
597
488
          buffer.append('\\');
598
489
          buffer.append('"');
599
 
          (void) *ptr++;
 
490
          *ptr++;
600
491
        }
601
492
        else if (*ptr == '\r')
602
493
        {
603
494
          buffer.append('\\');
604
495
          buffer.append('r');
605
 
          (void) *ptr++;
 
496
          *ptr++;
606
497
        }
607
498
        else if (*ptr == '\\')
608
499
        {
609
500
          buffer.append('\\');
610
501
          buffer.append('\\');
611
 
          (void) *ptr++;
 
502
          *ptr++;
612
503
        }
613
504
        else if (*ptr == '\n')
614
505
        {
615
506
          buffer.append('\\');
616
507
          buffer.append('n');
617
 
          (void) *ptr++;
 
508
          *ptr++;
618
509
        }
619
510
        else
620
511
          buffer.append(*ptr++);
644
535
*/
645
536
int ha_tina::chain_append()
646
537
{
647
 
  if (chain.size() > 0 && chain.back().second == current_position)
648
 
    chain.back().second= next_position;
 
538
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
539
    (chain_ptr -1)->end= next_position;
649
540
  else
650
 
    chain.push_back(make_pair(current_position, next_position));
 
541
  {
 
542
    /* We set up for the next position */
 
543
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
544
    {
 
545
      off_t location= chain_ptr - chain;
 
546
      chain_size += DEFAULT_CHAIN_LENGTH;
 
547
      if (chain_alloced)
 
548
      {
 
549
        /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
 
550
        if ((chain= (tina_set *) my_realloc((unsigned char*)chain,
 
551
                                            chain_size, MYF(MY_WME))) == NULL)
 
552
          return -1;
 
553
      }
 
554
      else
 
555
      {
 
556
        tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
 
557
                                              MYF(MY_WME));
 
558
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
559
        chain= ptr;
 
560
        chain_alloced++;
 
561
      }
 
562
      chain_ptr= chain + location;
 
563
    }
 
564
    chain_ptr->begin= current_position;
 
565
    chain_ptr->end= next_position;
 
566
    chain_ptr++;
 
567
  }
 
568
 
651
569
  return 0;
652
570
}
653
571
 
660
578
  off_t end_offset, curr_offset= current_position;
661
579
  int eoln_len;
662
580
  int error;
 
581
  bool read_all;
663
582
 
664
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
583
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
665
584
 
666
585
  /*
667
586
    We do not read further then local_saved_data_file_length in order
672
591
                       local_saved_data_file_length, &eoln_len)) == 0)
673
592
    return(HA_ERR_END_OF_FILE);
674
593
 
 
594
  /* We must read all columns in case a table is opened for update */
 
595
  read_all= !bitmap_is_clear_all(table->write_set);
675
596
  error= HA_ERR_CRASHED_ON_USAGE;
676
597
 
677
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
598
  memset(buf, 0, table->s->null_bytes);
678
599
 
679
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
600
  for (Field **field=table->field ; *field ; field++)
680
601
  {
681
602
    char curr_char;
682
603
 
741
662
      }
742
663
    }
743
664
 
744
 
    if ((*field)->isReadSet() || (*field)->isWriteSet())
 
665
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
745
666
    {
746
 
      /* This masks a bug in the logic for a SELECT * */
747
 
      (*field)->setWriteSet();
748
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
749
 
      {
 
667
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
668
                          CHECK_FIELD_WARN))
750
669
        goto err;
751
 
      }
752
 
 
753
670
      if ((*field)->flags & BLOB_FLAG)
754
671
      {
755
672
        Field_blob *blob= *(Field_blob**) field;
761
678
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
762
679
        if (src)
763
680
        {
764
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
681
          tgt= (unsigned char*) alloc_root(&blobroot, length);
765
682
          memmove(tgt, src, length);
766
683
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
767
684
        }
777
694
}
778
695
 
779
696
/*
 
697
  If frm_error() is called in table.cc this is called to find out what file
 
698
  extensions exist for this handler.
 
699
*/
 
700
static const char *ha_tina_exts[] = {
 
701
  CSV_EXT,
 
702
  CSM_EXT,
 
703
  NULL
 
704
};
 
705
 
 
706
const char **ha_tina::bas_ext() const
 
707
{
 
708
  return ha_tina_exts;
 
709
}
 
710
 
 
711
/*
 
712
  Three functions below are needed to enable concurrent insert functionality
 
713
  for CSV engine. For more details see mysys/thr_lock.c
 
714
*/
 
715
 
 
716
void tina_get_status(void* param, int)
 
717
{
 
718
  ha_tina *tina= (ha_tina*) param;
 
719
  tina->get_status();
 
720
}
 
721
 
 
722
void tina_update_status(void* param)
 
723
{
 
724
  ha_tina *tina= (ha_tina*) param;
 
725
  tina->update_status();
 
726
}
 
727
 
 
728
/* this should exist and return 0 for concurrent insert to work */
 
729
bool tina_check_status(void *)
 
730
{
 
731
  return 0;
 
732
}
 
733
 
 
734
/*
 
735
  Save the state of the table
 
736
 
 
737
  SYNOPSIS
 
738
    get_status()
 
739
 
 
740
  DESCRIPTION
 
741
    This function is used to retrieve the file length. During the lock
 
742
    phase of concurrent insert. For more details see comment to
 
743
    ha_tina::update_status below.
 
744
*/
 
745
 
 
746
void ha_tina::get_status()
 
747
{
 
748
  local_saved_data_file_length= share->saved_data_file_length;
 
749
}
 
750
 
 
751
 
 
752
/*
 
753
  Correct the state of the table. Called by unlock routines
 
754
  before the write lock is released.
 
755
 
 
756
  SYNOPSIS
 
757
    update_status()
 
758
 
 
759
  DESCRIPTION
 
760
    When we employ concurrent insert lock, we save current length of the file
 
761
    during the lock phase. We do not read further saved value, as we don't
 
762
    want to interfere with undergoing concurrent insert. Writers update file
 
763
    length info during unlock with update_status().
 
764
 
 
765
  NOTE
 
766
    For log tables concurrent insert works different. The reason is that
 
767
    log tables are always opened and locked. And as they do not unlock
 
768
    tables, the file length after writes should be updated in a different
 
769
    way. 
 
770
*/
 
771
 
 
772
void ha_tina::update_status()
 
773
{
 
774
  /* correct local_saved_data_file_length for writers */
 
775
  share->saved_data_file_length= local_saved_data_file_length;
 
776
}
 
777
 
 
778
 
 
779
/*
780
780
  Open a database file. Keep in mind that tables are caches, so
781
781
  this will not be called for every request. Any sort of positions
782
782
  that need to be reset should be kept in the ::extra() call.
783
783
*/
784
 
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
 
784
int ha_tina::open(const char *name, int, uint32_t open_options)
785
785
{
786
 
  if (not (share= get_share(identifier.getPath().c_str())))
787
 
    return(ENOENT);
 
786
  if (!(share= get_share(name, table)))
 
787
    return(HA_ERR_OUT_OF_MEM);
788
788
 
789
 
  if (share->crashed)
 
789
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
790
790
  {
791
 
    free_share();
 
791
    free_share(share);
792
792
    return(HA_ERR_CRASHED_ON_USAGE);
793
793
  }
794
794
 
795
795
  local_data_file_version= share->data_file_version;
796
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
796
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
797
797
    return(0);
798
798
 
799
799
  /*
800
 
    Init locking. Pass Cursor object to the locking routines,
 
800
    Init locking. Pass handler object to the locking routines,
801
801
    so that they could save/update local_saved_data_file_length value
802
802
    during locking. This is needed to enable concurrent inserts.
803
803
  */
 
804
  thr_lock_data_init(&share->lock, &lock, (void*) this);
804
805
  ref_length=sizeof(off_t);
805
806
 
 
807
  share->lock.get_status= tina_get_status;
 
808
  share->lock.update_status= tina_update_status;
 
809
  share->lock.check_status= tina_check_status;
 
810
 
806
811
  return(0);
807
812
}
808
813
 
 
814
 
809
815
/*
810
816
  Close a database file. We remove ourselves from the shared strucutre.
811
817
  If it is empty we destroy it.
813
819
int ha_tina::close(void)
814
820
{
815
821
  int rc= 0;
816
 
  rc= internal::my_close(data_file, MYF(0));
817
 
  return(free_share() || rc);
 
822
  rc= my_close(data_file, MYF(0));
 
823
  return(free_share(share) || rc);
818
824
}
819
825
 
820
826
/*
821
 
  This is an INSERT. At the moment this Cursor just seeks to the end
 
827
  This is an INSERT. At the moment this handler just seeks to the end
822
828
  of the file and appends the data. In an error case it really should
823
829
  just truncate to the original position (this is not done yet).
824
830
*/
825
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
831
int ha_tina::write_row(unsigned char * buf)
826
832
{
827
833
  int size;
828
834
 
829
835
  if (share->crashed)
830
836
      return(HA_ERR_CRASHED_ON_USAGE);
831
837
 
 
838
  ha_statistic_increment(&SSV::ha_write_count);
 
839
 
 
840
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
841
    table->timestamp_field->set_time();
 
842
 
832
843
  size= encode_quote(buf);
833
844
 
834
845
  if (!share->tina_write_opened)
836
847
      return(-1);
837
848
 
838
849
   /* use pwrite, as concurrent reader could have changed the position */
839
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
850
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
840
851
               MYF(MY_WME | MY_NABP)))
841
852
    return(-1);
842
853
 
861
872
  if (!share->update_file_opened)
862
873
  {
863
874
    if ((update_temp_file=
864
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
875
           my_create(fn_format(updated_fname, share->table_name,
865
876
                               "", CSN_EXT,
866
877
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
878
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
876
887
  This is called for an update.
877
888
  Make sure you put in code to increment the auto increment, also
878
889
  update any timestamp data. Currently auto increment is not being
879
 
  fixed since autoincrements have yet to be added to this table Cursor.
 
890
  fixed since autoincrements have yet to be added to this table handler.
880
891
  This will be called in a table scan right before the previous ::rnd_next()
881
892
  call.
882
893
*/
883
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
894
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
884
895
{
885
896
  int size;
886
897
  int rc= -1;
887
898
 
 
899
  ha_statistic_increment(&SSV::ha_update_count);
 
900
 
 
901
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
902
    table->timestamp_field->set_time();
 
903
 
888
904
  size= encode_quote(new_data);
889
905
 
890
906
  /*
891
 
    During update we mark each updating record as deleted
892
 
    (see the chain_append()) then write new one to the temporary data file.
893
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
907
    During update we mark each updating record as deleted 
 
908
    (see the chain_append()) then write new one to the temporary data file. 
 
909
    At the end of the sequence in the rnd_end() we append all non-marked
894
910
    records from the data file to the temporary data file then rename it.
895
911
    The temp_file_length is used to calculate new data file length.
896
912
  */
900
916
  if (open_update_temp_file_if_needed())
901
917
    goto err;
902
918
 
903
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
919
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
904
920
               MYF(MY_WME | MY_NABP)))
905
921
    goto err;
906
922
  temp_file_length+= size;
915
931
  Deletes a row. First the database will find the row, and then call this
916
932
  method. In the case of a table scan, the previous call to this will be
917
933
  the ::rnd_next() that found this row.
918
 
  The exception to this is an ORDER BY. This will cause the table Cursor
 
934
  The exception to this is an ORDER BY. This will cause the table handler
919
935
  to walk the table noting the positions of all rows that match a query.
920
936
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
921
937
  DESC, ASC).
922
938
*/
923
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
939
int ha_tina::delete_row(const unsigned char *)
924
940
{
 
941
  ha_statistic_increment(&SSV::ha_delete_count);
925
942
 
926
943
  if (chain_append())
927
944
    return(-1);
939
956
 
940
957
/**
941
958
  @brief Initialize the data file.
942
 
 
 
959
  
943
960
  @details Compare the local version of the data file with the shared one.
944
961
  If they differ, there are some changes behind and we have to reopen
945
962
  the data file to make the changes visible.
946
 
  Call @c file_buff->init_buff() at the end to read the beginning of the
 
963
  Call @c file_buff->init_buff() at the end to read the beginning of the 
947
964
  data file into buffer.
948
 
 
 
965
  
949
966
  @retval  0  OK.
950
967
  @retval  1  There was an error.
951
968
*/
955
972
  if (local_data_file_version != share->data_file_version)
956
973
  {
957
974
    local_data_file_version= share->data_file_version;
958
 
    if (internal::my_close(data_file, MYF(0)) ||
959
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
975
    if (my_close(data_file, MYF(0)) ||
 
976
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
960
977
      return 1;
961
978
  }
962
979
  file_buff->init_buff(data_file);
968
985
  All table scans call this first.
969
986
  The order of a table scan is:
970
987
 
 
988
  ha_tina::store_lock
 
989
  ha_tina::external_lock
971
990
  ha_tina::info
972
991
  ha_tina::rnd_init
973
992
  ha_tina::extra
983
1002
  ha_tina::rnd_next
984
1003
  ha_tina::extra
985
1004
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1005
  ha_tina::external_lock
986
1006
  ha_tina::extra
987
1007
  ENUM HA_EXTRA_RESET   Reset database to after open
988
1008
 
992
1012
 
993
1013
*/
994
1014
 
995
 
int ha_tina::doStartTableScan(bool)
 
1015
int ha_tina::rnd_init(bool)
996
1016
{
997
1017
  /* set buffer to the beginning of the file */
998
1018
  if (share->crashed || init_data_file())
1001
1021
  current_position= next_position= 0;
1002
1022
  stats.records= 0;
1003
1023
  records_is_known= 0;
1004
 
  chain.clear();
 
1024
  chain_ptr= chain;
1005
1025
 
1006
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1026
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1007
1027
 
1008
1028
  return(0);
1009
1029
}
1018
1038
  reserved for null count.
1019
1039
  Basically this works as a mask for which rows are nulled (compared to just
1020
1040
  empty).
1021
 
  This table Cursor doesn't do nulls and does not know the difference between
1022
 
  NULL and "". This is ok since this table Cursor is for spreadsheets and
 
1041
  This table handler doesn't do nulls and does not know the difference between
 
1042
  NULL and "". This is ok since this table handler is for spreadsheets and
1023
1043
  they don't know about them either :)
1024
1044
*/
1025
1045
int ha_tina::rnd_next(unsigned char *buf)
1029
1049
  if (share->crashed)
1030
1050
      return(HA_ERR_CRASHED_ON_USAGE);
1031
1051
 
1032
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1052
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1033
1053
 
1034
1054
  current_position= next_position;
1035
1055
 
1055
1075
*/
1056
1076
void ha_tina::position(const unsigned char *)
1057
1077
{
1058
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1078
  my_store_ptr(ref, ref_length, current_position);
1059
1079
  return;
1060
1080
}
1061
1081
 
1062
1082
 
1063
1083
/*
1064
1084
  Used to fetch a row from a posiion stored with ::position().
1065
 
  internal::my_get_ptr() retrieves the data for you.
 
1085
  my_get_ptr() retrieves the data for you.
1066
1086
*/
1067
1087
 
1068
1088
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1089
{
1070
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1090
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1091
  current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1092
  return(find_current_row(buf));
1073
1093
}
1074
1094
 
1075
1095
/*
1076
1096
  ::info() is used to return information to the optimizer.
1077
 
  Currently this table Cursor doesn't implement most of the fields
 
1097
  Currently this table handler doesn't implement most of the fields
1078
1098
  really needed. SHOW also makes use of this data
1079
1099
*/
1080
1100
int ha_tina::info(uint32_t)
1081
1101
{
1082
1102
  /* This is a lie, but you don't want the optimizer to see zero or 1 */
1083
 
  if (!records_is_known && stats.records < 2)
 
1103
  if (!records_is_known && stats.records < 2) 
1084
1104
    stats.records= 2;
1085
1105
  return(0);
1086
1106
}
1090
1110
  to the given "hole", stored in the buffer. "Valid" here means,
1091
1111
  not listed in the chain of deleted records ("holes").
1092
1112
*/
1093
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1113
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1094
1114
{
1095
 
  if (closest_hole == chain.end()) /* no more chains */
 
1115
  if (closest_hole == chain_ptr) /* no more chains */
1096
1116
    *end_pos= file_buff->end();
1097
1117
  else
1098
1118
    *end_pos= std::min(file_buff->end(),
1099
 
                       closest_hole->first);
1100
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1119
                       closest_hole->begin);
 
1120
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1101
1121
}
1102
1122
 
1103
1123
 
1107
1127
  slots to clean up all of the dead space we have collected while
1108
1128
  performing deletes/updates.
1109
1129
*/
1110
 
int ha_tina::doEndTableScan()
 
1130
int ha_tina::rnd_end()
1111
1131
{
 
1132
  char updated_fname[FN_REFLEN];
1112
1133
  off_t file_buffer_start= 0;
1113
1134
 
1114
 
  blobroot.free_root(MYF(0));
 
1135
  free_root(&blobroot, MYF(0));
1115
1136
  records_is_known= 1;
1116
1137
 
1117
 
  if (chain.size() > 0)
 
1138
  if ((chain_ptr - chain)  > 0)
1118
1139
  {
1119
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1140
    tina_set *ptr= chain;
1120
1141
 
1121
1142
    /*
1122
1143
      Re-read the beginning of a file (as the buffer should point to the
1128
1149
      The sort is needed when there were updates/deletes with random orders.
1129
1150
      It sorts so that we move the firts blocks to the beginning.
1130
1151
    */
1131
 
    sort(chain.begin(), chain.end());
 
1152
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1153
             (qsort_cmp)sort_set);
1132
1154
 
1133
1155
    off_t write_begin= 0, write_end;
1134
1156
 
1141
1163
    {
1142
1164
      bool in_hole= get_write_pos(&write_end, ptr);
1143
1165
      off_t write_length= write_end - write_begin;
1144
 
      if ((uint64_t)write_length > SIZE_MAX)
1145
 
      {
1146
 
        goto error;
1147
 
      }
1148
1166
 
1149
1167
      /* if there is something to write, write it */
1150
1168
      if (write_length)
1151
1169
      {
1152
 
        if (internal::my_write(update_temp_file,
 
1170
        if (my_write(update_temp_file, 
1153
1171
                     (unsigned char*) (file_buff->ptr() +
1154
1172
                               (write_begin - file_buff->start())),
1155
 
                     (size_t)write_length, MYF_RW))
 
1173
                     write_length, MYF_RW))
1156
1174
          goto error;
1157
1175
        temp_file_length+= write_length;
1158
1176
      }
1159
1177
      if (in_hole)
1160
1178
      {
1161
1179
        /* skip hole */
1162
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1180
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1163
1181
          file_buffer_start= file_buff->read_next();
1164
 
        write_begin= ptr->second;
1165
 
        ++ptr;
 
1182
        write_begin= ptr->end;
 
1183
        ptr++;
1166
1184
      }
1167
1185
      else
1168
1186
        write_begin= write_end;
1172
1190
 
1173
1191
    }
1174
1192
 
1175
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
 
        internal::my_close(update_temp_file, MYF(0)))
 
1193
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1194
        my_close(update_temp_file, MYF(0)))
1177
1195
      return(-1);
1178
1196
 
1179
1197
    share->update_file_opened= false;
1180
1198
 
1181
1199
    if (share->tina_write_opened)
1182
1200
    {
1183
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1201
      if (my_close(share->tina_write_filedes, MYF(0)))
1184
1202
        return(-1);
1185
1203
      /*
1186
1204
        Mark that the writer fd is closed, so that init_tina_writer()
1193
1211
      Close opened fildes's. Then move updated file in place
1194
1212
      of the old datafile.
1195
1213
    */
1196
 
    std::string rename_file= share->table_name;
1197
 
    rename_file.append(CSN_EXT);
1198
 
    if (internal::my_close(data_file, MYF(0)) ||
1199
 
        internal::my_rename(rename_file.c_str(),
1200
 
                            share->data_file_name.c_str(), MYF(0)))
 
1214
    if (my_close(data_file, MYF(0)) ||
 
1215
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1216
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1217
                  share->data_file_name, MYF(0)))
1201
1218
      return(-1);
1202
1219
 
1203
1220
    /* Open the file again */
1204
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1221
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1205
1222
      return(-1);
1206
1223
    /*
1207
 
      As we reopened the data file, increase share->data_file_version
1208
 
      in order to force other threads waiting on a table lock and
 
1224
      As we reopened the data file, increase share->data_file_version 
 
1225
      in order to force other threads waiting on a table lock and  
1209
1226
      have already opened the table to reopen the data file.
1210
1227
      That makes the latest changes become visible to them.
1211
 
      Update local_data_file_version as no need to reopen it in the
 
1228
      Update local_data_file_version as no need to reopen it in the 
1212
1229
      current thread.
1213
1230
    */
1214
1231
    share->data_file_version++;
1219
1236
      Here we record this fact to the meta-file.
1220
1237
    */
1221
1238
    (void)write_meta_file(share->meta_file, share->rows_recorded, false);
1222
 
    /*
1223
 
      Update local_saved_data_file_length with the real length of the
 
1239
    /* 
 
1240
      Update local_saved_data_file_length with the real length of the 
1224
1241
      data file.
1225
1242
    */
1226
1243
    local_saved_data_file_length= temp_file_length;
1228
1245
 
1229
1246
  return(0);
1230
1247
error:
1231
 
  internal::my_close(update_temp_file, MYF(0));
 
1248
  my_close(update_temp_file, MYF(0));
1232
1249
  share->update_file_opened= false;
1233
1250
  return(-1);
1234
1251
}
1235
1252
 
1236
1253
 
1237
1254
/*
 
1255
  Repair CSV table in the case, it is crashed.
 
1256
 
 
1257
  SYNOPSIS
 
1258
    repair()
 
1259
    session         The thread, performing repair
 
1260
    check_opt   The options for repair. We do not use it currently.
 
1261
 
 
1262
  DESCRIPTION
 
1263
    If the file is empty, change # of rows in the file and complete recovery.
 
1264
    Otherwise, scan the table looking for bad rows. If none were found,
 
1265
    we mark file as a good one and return. If a bad row was encountered,
 
1266
    we truncate the datafile up to the last good row.
 
1267
 
 
1268
   TODO: Make repair more clever - it should try to recover subsequent
 
1269
         rows (after the first bad one) as well.
 
1270
*/
 
1271
 
 
1272
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
 
1273
{
 
1274
  char repaired_fname[FN_REFLEN];
 
1275
  unsigned char *buf;
 
1276
  File repair_file;
 
1277
  int rc;
 
1278
  ha_rows rows_repaired= 0;
 
1279
  off_t write_begin= 0, write_end;
 
1280
 
 
1281
  /* empty file */
 
1282
  if (!share->saved_data_file_length)
 
1283
  {
 
1284
    share->rows_recorded= 0;
 
1285
    goto end;
 
1286
  }
 
1287
 
 
1288
  /* Don't assert in field::val() functions */
 
1289
  table->use_all_columns();
 
1290
  if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1291
    return(HA_ERR_OUT_OF_MEM);
 
1292
 
 
1293
  /* position buffer to the start of the file */
 
1294
  if (init_data_file())
 
1295
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1296
 
 
1297
  /*
 
1298
    Local_saved_data_file_length is initialized during the lock phase.
 
1299
    Sometimes this is not getting executed before ::repair (e.g. for
 
1300
    the log tables). We set it manually here.
 
1301
  */
 
1302
  local_saved_data_file_length= share->saved_data_file_length;
 
1303
  /* set current position to the beginning of the file */
 
1304
  current_position= next_position= 0;
 
1305
 
 
1306
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1307
 
 
1308
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1309
  while (!(rc= find_current_row(buf)))
 
1310
  {
 
1311
    session_inc_row_count(session);
 
1312
    rows_repaired++;
 
1313
    current_position= next_position;
 
1314
  }
 
1315
 
 
1316
  free_root(&blobroot, MYF(0));
 
1317
 
 
1318
  free((char*)buf);
 
1319
 
 
1320
  if (rc == HA_ERR_END_OF_FILE)
 
1321
  {
 
1322
    /*
 
1323
      All rows were read ok until end of file, the file does not need repair.
 
1324
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1325
      to the current amount of rows.
 
1326
    */
 
1327
    share->rows_recorded= rows_repaired;
 
1328
    goto end;
 
1329
  }
 
1330
 
 
1331
  /*
 
1332
    Otherwise we've encountered a bad row => repair is needed.
 
1333
    Let us create a temporary file.
 
1334
  */
 
1335
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1336
                                        "", CSN_EXT,
 
1337
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1338
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1339
    return(HA_ERR_CRASHED_ON_REPAIR);
 
1340
 
 
1341
  file_buff->init_buff(data_file);
 
1342
 
 
1343
 
 
1344
  /* we just truncated the file up to the first bad row. update rows count. */
 
1345
  share->rows_recorded= rows_repaired;
 
1346
 
 
1347
  /* write repaired file */
 
1348
  while (1)
 
1349
  {
 
1350
    write_end= std::min(file_buff->end(), current_position);
 
1351
    if ((write_end - write_begin) &&
 
1352
        (my_write(repair_file, (unsigned char*)file_buff->ptr(),
 
1353
                  write_end - write_begin, MYF_RW)))
 
1354
      return(-1);
 
1355
 
 
1356
    write_begin= write_end;
 
1357
    if (write_end== current_position)
 
1358
      break;
 
1359
    else
 
1360
      file_buff->read_next(); /* shift the buffer */
 
1361
  }
 
1362
 
 
1363
  /*
 
1364
    Close the files and rename repaired file to the datafile.
 
1365
    We have to close the files, as on Windows one cannot rename
 
1366
    a file, which descriptor is still open. EACCES will be returned
 
1367
    when trying to delete the "to"-file in my_rename().
 
1368
  */
 
1369
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1370
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1371
    return(-1);
 
1372
 
 
1373
  /* Open the file again, it should now be repaired */
 
1374
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1375
                          MYF(0))) == -1)
 
1376
     return(-1);
 
1377
 
 
1378
  /* Set new file size. The file size will be updated by ::update_status() */
 
1379
  local_saved_data_file_length= (size_t) current_position;
 
1380
 
 
1381
end:
 
1382
  share->crashed= false;
 
1383
  return(HA_ADMIN_OK);
 
1384
}
 
1385
 
 
1386
/*
1238
1387
  DELETE without WHERE calls this
1239
1388
*/
1240
1389
 
1243
1392
  int rc;
1244
1393
 
1245
1394
  if (!records_is_known)
1246
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1395
    return(my_errno=HA_ERR_WRONG_COMMAND);
1247
1396
 
1248
1397
  if (!share->tina_write_opened)
1249
1398
    if (init_tina_writer())
1262
1411
}
1263
1412
 
1264
1413
/*
 
1414
  Called by the database to lock the table. Keep in mind that this
 
1415
  is an internal lock.
 
1416
*/
 
1417
THR_LOCK_DATA **ha_tina::store_lock(Session *,
 
1418
                                    THR_LOCK_DATA **to,
 
1419
                                    enum thr_lock_type lock_type)
 
1420
{
 
1421
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1422
    lock.type=lock_type;
 
1423
  *to++= &lock;
 
1424
  return to;
 
1425
}
 
1426
 
 
1427
/* 
1265
1428
  Create a table. You do not want to leave the table open after a call to
1266
1429
  this (the database will call ::open() if it needs to).
1267
1430
*/
1268
1431
 
1269
 
int Tina::doCreateTable(Session &session,
1270
 
                        Table& table_arg,
1271
 
                        const drizzled::identifier::Table &identifier,
1272
 
                        drizzled::message::Table &create_proto)
 
1432
int ha_tina::create(const char *name, Table *table_arg, HA_CREATE_INFO *)
1273
1433
{
1274
1434
  char name_buff[FN_REFLEN];
1275
 
  int create_file;
 
1435
  File create_file;
1276
1436
 
1277
1437
  /*
1278
1438
    check columns
1279
1439
  */
1280
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
 
       iter != fields.end();
1283
 
       iter++)
 
1440
  for (Field **field= table_arg->s->field; *field; field++)
1284
1441
  {
1285
 
    if (not *iter) // Historical legacy for NULL array end.
1286
 
      continue;
1287
 
 
1288
 
    if ((*iter)->real_maybe_null())
 
1442
    if ((*field)->real_maybe_null())
1289
1443
    {
1290
1444
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1445
      return(HA_ERR_UNSUPPORTED);
1292
1446
    }
1293
1447
  }
1294
 
 
1295
 
 
1296
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1448
  
 
1449
 
 
1450
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1451
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1452
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1299
1453
    return(-1);
1300
1454
 
1301
1455
  write_meta_file(create_file, 0, false);
1302
 
  internal::my_close(create_file, MYF(0));
 
1456
  my_close(create_file, MYF(0));
1303
1457
 
1304
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1458
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1459
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1460
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1307
1461
    return(-1);
1308
1462
 
1309
 
  internal::my_close(create_file, MYF(0));
1310
 
 
1311
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
1312
 
 
1313
 
  return 0;
1314
 
}
1315
 
 
1316
 
 
1317
 
DRIZZLE_DECLARE_PLUGIN
1318
 
{
1319
 
  DRIZZLE_VERSION_ID,
 
1463
  my_close(create_file, MYF(0));
 
1464
 
 
1465
  return(0);
 
1466
}
 
1467
 
 
1468
int ha_tina::check(Session* session, HA_CHECK_OPT *)
 
1469
{
 
1470
  int rc= 0;
 
1471
  unsigned char *buf;
 
1472
  const char *old_proc_info;
 
1473
  ha_rows count= share->rows_recorded;
 
1474
 
 
1475
  old_proc_info= get_session_proc_info(session);
 
1476
  set_session_proc_info(session, "Checking table");
 
1477
  if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1478
    return(HA_ERR_OUT_OF_MEM);
 
1479
 
 
1480
  /* position buffer to the start of the file */
 
1481
   if (init_data_file())
 
1482
     return(HA_ERR_CRASHED);
 
1483
 
 
1484
  /*
 
1485
    Local_saved_data_file_length is initialized during the lock phase.
 
1486
    Check does not use store_lock in certain cases. So, we set it
 
1487
    manually here.
 
1488
  */
 
1489
  local_saved_data_file_length= share->saved_data_file_length;
 
1490
  /* set current position to the beginning of the file */
 
1491
  current_position= next_position= 0;
 
1492
 
 
1493
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1494
 
 
1495
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1496
  while (!(rc= find_current_row(buf)))
 
1497
  {
 
1498
    session_inc_row_count(session);
 
1499
    count--;
 
1500
    current_position= next_position;
 
1501
  }
 
1502
  
 
1503
  free_root(&blobroot, MYF(0));
 
1504
 
 
1505
  free((char*)buf);
 
1506
  set_session_proc_info(session, old_proc_info);
 
1507
 
 
1508
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1509
  {
 
1510
    share->crashed= true;
 
1511
    return(HA_ADMIN_CORRUPT);
 
1512
  }
 
1513
  else
 
1514
    return(HA_ADMIN_OK);
 
1515
}
 
1516
 
 
1517
 
 
1518
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *, uint32_t)
 
1519
{
 
1520
  return COMPATIBLE_DATA_YES;
 
1521
}
 
1522
 
 
1523
mysql_declare_plugin(csv)
 
1524
{
 
1525
  DRIZZLE_STORAGE_ENGINE_PLUGIN,
1320
1526
  "CSV",
1321
1527
  "1.0",
1322
1528
  "Brian Aker, MySQL AB",
1323
1529
  "CSV storage engine",
1324
1530
  PLUGIN_LICENSE_GPL,
1325
1531
  tina_init_func, /* Plugin Init */
1326
 
  NULL,                       /* depends */
 
1532
  tina_done_func, /* Plugin Deinit */
 
1533
  NULL,                       /* status variables                */
 
1534
  NULL,                       /* system variables                */
1327
1535
  NULL                        /* config options                  */
1328
1536
}
1329
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1537
mysql_declare_plugin_end;
1330
1538