~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

New merge for TableShare

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
46
#include <drizzled/field/timestamp.h>
47
47
#include <drizzled/error.h>
48
48
#include <drizzled/table.h>
49
49
#include <drizzled/session.h>
50
 
#include "drizzled/internal/my_sys.h"
 
50
#include "drizzled/memory/multi_malloc.h"
51
51
 
52
52
#include "ha_tina.h"
53
53
 
54
 
#include <fcntl.h>
55
 
 
56
 
#include <algorithm>
57
 
#include <vector>
58
54
#include <string>
59
 
#include <map>
60
55
 
61
56
using namespace std;
62
 
using namespace drizzled;
 
57
 
 
58
static const string engine_name("CSV");
63
59
 
64
60
/*
65
61
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
75
71
#define CSM_EXT ".CSM"               // Meta file
76
72
 
77
73
 
78
 
static int read_meta_file(int meta_file, ha_rows *rows);
79
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
74
static TINA_SHARE *get_share(const char *table_name, Table *table);
 
75
static int free_share(TINA_SHARE *share);
 
76
static int read_meta_file(File meta_file, ha_rows *rows);
 
77
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
78
 
 
79
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
80
extern "C" void tina_update_status(void* param);
 
81
extern "C" bool tina_check_status(void* param);
80
82
 
81
83
/* Stuff for shares */
82
84
pthread_mutex_t tina_mutex;
 
85
static HASH tina_open_tables;
83
86
 
84
87
/*****************************************************************************
85
88
 ** TINA tables
86
89
 *****************************************************************************/
87
90
 
88
91
/*
 
92
  Used for sorting chains with qsort().
 
93
*/
 
94
static int sort_set (tina_set *a, tina_set *b)
 
95
{
 
96
  /*
 
97
    We assume that intervals do not intersect. So, it is enought to compare
 
98
    any two points. Here we take start of intervals for comparison.
 
99
  */
 
100
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
101
}
 
102
 
 
103
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
 
104
{
 
105
  *length=share->table_name_length;
 
106
  return (unsigned char*) share->table_name;
 
107
}
 
108
 
 
109
 
 
110
/*
89
111
  If frm_error() is called in table.cc this is called to find out what file
90
112
  extensions exist for this Cursor.
91
113
*/
97
119
 
98
120
class Tina : public drizzled::plugin::StorageEngine
99
121
{
100
 
  typedef std::map<string, TinaShare*> TinaMap;
101
 
  TinaMap tina_open_tables;
102
122
public:
103
123
  Tina(const string& name_arg)
104
 
   : drizzled::plugin::StorageEngine(name_arg,
105
 
                                     HTON_TEMPORARY_ONLY |
106
 
                                     HTON_NO_AUTO_INCREMENT |
107
 
                                     HTON_SKIP_STORE_LOCK),
108
 
    tina_open_tables()
109
 
  {}
110
 
  virtual ~Tina()
 
124
   : drizzled::plugin::StorageEngine(name_arg, HTON_TEMPORARY_ONLY | 
 
125
                                     HTON_HAS_DATA_DICTIONARY | HTON_FILE_BASED) {}
 
126
  virtual Cursor *create(TableShare &table,
 
127
                          MEM_ROOT *mem_root)
111
128
  {
112
 
    pthread_mutex_destroy(&tina_mutex);
 
129
    return new (mem_root) ha_tina(*this, table);
113
130
  }
114
131
 
115
 
  virtual Cursor *create(Table &table)
 
132
  uint64_t table_flags() const
116
133
  {
117
 
    return new ha_tina(*this, table);
 
134
    return (HA_NO_TRANSACTIONS | HA_NO_AUTO_INCREMENT);
118
135
  }
119
136
 
120
137
  const char **bas_ext() const {
121
138
    return ha_tina_exts;
122
139
  }
123
140
 
124
 
  int doCreateTable(Session &,
125
 
                    Table &table_arg,
126
 
                    const drizzled::TableIdentifier &identifier,
 
141
  int doCreateTable(Session *, 
 
142
                    const char *table_name,
 
143
                    Table& table_arg,
127
144
                    drizzled::message::Table&);
128
145
 
129
146
  int doGetTableDefinition(Session& session,
130
 
                           const drizzled::TableIdentifier &identifier,
131
 
                           drizzled::message::Table &table_message);
132
 
 
133
 
  int doDropTable(Session&, const drizzled::TableIdentifier &identifier);
134
 
  TinaShare *findOpenTable(const string table_name);
135
 
  void addOpenTable(const string &table_name, TinaShare *);
136
 
  void deleteOpenTable(const string &table_name);
137
 
 
138
 
 
139
 
  uint32_t max_keys()          const { return 0; }
140
 
  uint32_t max_key_parts()     const { return 0; }
141
 
  uint32_t max_key_length()    const { return 0; }
142
 
  bool doDoesTableExist(Session& session, const drizzled::TableIdentifier &identifier);
143
 
  int doRenameTable(Session&, const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to);
144
 
 
145
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
146
 
                             const drizzled::SchemaIdentifier &schema_identifier,
147
 
                             drizzled::TableIdentifier::vector &set_of_identifiers);
 
147
                           const char* path,
 
148
                           const char *db,
 
149
                           const char *table_name,
 
150
                           const bool is_tmp,
 
151
                           drizzled::message::Table *table_proto);
 
152
 
 
153
  /* Temp only engine, so do not return values. */
 
154
  void doGetTableNames(CachedDirectory &, string& , set<string>&) { };
 
155
 
 
156
  int doDropTable(Session&, const string table_path);
148
157
};
149
158
 
150
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
151
 
                                 const drizzled::SchemaIdentifier&,
152
 
                                 drizzled::TableIdentifier::vector&)
153
 
{
154
 
}
155
 
 
156
 
int Tina::doRenameTable(Session &session,
157
 
                        const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to)
158
 
{
159
 
  int error= 0;
160
 
  for (const char **ext= bas_ext(); *ext ; ext++)
161
 
  {
162
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
 
    {
164
 
      if ((error=errno) != ENOENT)
165
 
        break;
166
 
      error= 0;
167
 
    }
168
 
  }
169
 
 
170
 
  session.getMessageCache().renameTableMessage(from, to);
171
 
 
172
 
  return error;
173
 
}
174
 
 
175
 
bool Tina::doDoesTableExist(Session &session, const drizzled::TableIdentifier &identifier)
176
 
{
177
 
  return session.getMessageCache().doesTableMessageExist(identifier);
178
 
}
179
 
 
180
 
 
181
 
int Tina::doDropTable(Session &session,
182
 
                      const drizzled::TableIdentifier &identifier)
 
159
int Tina::doDropTable(Session&,
 
160
                        const string table_path)
183
161
{
184
162
  int error= 0;
185
163
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
 
164
  char buff[FN_REFLEN];
 
165
  ProtoCache::iterator iter;
186
166
 
187
167
  for (const char **ext= bas_ext(); *ext ; ext++)
188
168
  {
189
 
    std::string full_name= identifier.getPath();
190
 
    full_name.append(*ext);
191
 
 
192
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
 
169
    fn_format(buff, table_path.c_str(), "", *ext,
 
170
              MY_UNPACK_FILENAME|MY_APPEND_EXT);
 
171
    if (my_delete_with_symlink(buff, MYF(0)))
193
172
    {
194
 
      if ((error= errno) != ENOENT)
 
173
      if ((error= my_errno) != ENOENT)
195
174
        break;
196
175
    }
197
176
    else
198
 
    {
199
177
      enoent_or_zero= 0;                        // No error for ENOENT
200
 
    }
201
178
    error= enoent_or_zero;
202
179
  }
203
180
 
204
 
  session.getMessageCache().removeTableMessage(identifier);
205
 
 
206
 
  return error;
207
 
}
208
 
 
209
 
TinaShare *Tina::findOpenTable(const string table_name)
210
 
{
211
 
  TinaMap::iterator find_iter=
212
 
    tina_open_tables.find(table_name);
213
 
 
214
 
  if (find_iter != tina_open_tables.end())
215
 
    return (*find_iter).second;
216
 
  else
217
 
    return NULL;
218
 
}
219
 
 
220
 
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
 
{
222
 
  tina_open_tables[table_name]= share;
223
 
}
224
 
 
225
 
void Tina::deleteOpenTable(const string &table_name)
226
 
{
227
 
  tina_open_tables.erase(table_name);
228
 
}
229
 
 
230
 
 
231
 
int Tina::doGetTableDefinition(Session &session,
232
 
                               const drizzled::TableIdentifier &identifier,
233
 
                               drizzled::message::Table &table_message)
234
 
{
235
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
236
 
    return EEXIST;
237
 
 
238
 
  return ENOENT;
 
181
  pthread_mutex_lock(&proto_cache_mutex);
 
182
  iter= proto_cache.find(table_path.c_str());
 
183
 
 
184
  if (iter!= proto_cache.end())
 
185
    proto_cache.erase(iter);
 
186
  pthread_mutex_unlock(&proto_cache_mutex);
 
187
 
 
188
  return error;
 
189
}
 
190
 
 
191
int Tina::doGetTableDefinition(Session&,
 
192
                               const char* path,
 
193
                               const char *,
 
194
                               const char *,
 
195
                               const bool,
 
196
                               drizzled::message::Table *table_proto)
 
197
{
 
198
  int error= ENOENT;
 
199
  ProtoCache::iterator iter;
 
200
 
 
201
  pthread_mutex_lock(&proto_cache_mutex);
 
202
  iter= proto_cache.find(path);
 
203
 
 
204
  if (iter!= proto_cache.end())
 
205
  {
 
206
    if (table_proto)
 
207
      table_proto->CopyFrom(((*iter).second));
 
208
    error= EEXIST;
 
209
  }
 
210
  pthread_mutex_unlock(&proto_cache_mutex);
 
211
 
 
212
  return error;
239
213
}
240
214
 
241
215
 
242
216
static Tina *tina_engine= NULL;
243
217
 
244
 
static int tina_init_func(drizzled::module::Context &context)
 
218
static int tina_init_func(drizzled::plugin::Registry &registry)
245
219
{
246
220
 
247
 
  tina_engine= new Tina("CSV");
248
 
  context.add(tina_engine);
 
221
  tina_engine= new Tina(engine_name);
 
222
  registry.add(tina_engine);
249
223
 
250
224
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
251
 
  return 0;
252
 
}
253
 
 
254
 
 
255
 
 
256
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
257
 
  table_name(table_name_arg),
258
 
  data_file_name(table_name_arg),
259
 
  use_count(0),
260
 
  saved_data_file_length(0),
261
 
  update_file_opened(false),
262
 
  tina_write_opened(false),
263
 
  crashed(false),
264
 
  rows_recorded(0),
265
 
  data_file_version(0)
266
 
{
267
 
  data_file_name.append(CSV_EXT);
268
 
}
269
 
 
270
 
TinaShare::~TinaShare()
271
 
{
272
 
  pthread_mutex_destroy(&mutex);
273
 
}
 
225
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
226
                   (hash_get_key) tina_get_key,0,0);
 
227
  return 0;
 
228
}
 
229
 
 
230
static int tina_done_func(drizzled::plugin::Registry &registry)
 
231
{
 
232
  registry.remove(tina_engine);
 
233
  delete tina_engine;
 
234
 
 
235
  hash_free(&tina_open_tables);
 
236
  pthread_mutex_destroy(&tina_mutex);
 
237
 
 
238
  return 0;
 
239
}
 
240
 
274
241
 
275
242
/*
276
243
  Simple lock controls.
277
244
*/
278
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
245
static TINA_SHARE *get_share(const char *table_name, Table *)
279
246
{
280
 
  pthread_mutex_lock(&tina_mutex);
281
 
 
282
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
283
 
  share= a_tina->findOpenTable(table_name);
284
 
 
285
 
  std::string meta_file_name;
 
247
  TINA_SHARE *share;
 
248
  char meta_file_name[FN_REFLEN];
286
249
  struct stat file_stat;
 
250
  char *tmp_name;
 
251
  uint32_t length;
 
252
 
 
253
  pthread_mutex_lock(&tina_mutex);
 
254
  length=(uint) strlen(table_name);
287
255
 
288
256
  /*
289
257
    If share is not present in the hash, create a new share and
290
258
    initialize its members.
291
259
  */
292
 
  if (! share)
 
260
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
261
                                        (unsigned char*) table_name,
 
262
                                       length)))
293
263
  {
294
 
    share= new TinaShare(table_name);
295
 
 
296
 
    if (share == NULL)
297
 
    {
298
 
      pthread_mutex_unlock(&tina_mutex);
299
 
      return NULL;
300
 
    }
301
 
 
302
 
    meta_file_name.assign(table_name);
303
 
    meta_file_name.append(CSM_EXT);
304
 
 
305
 
    if (stat(share->data_file_name.c_str(), &file_stat))
306
 
    {
307
 
      pthread_mutex_unlock(&tina_mutex);
308
 
      delete share;
309
 
      return NULL;
310
 
    }
311
 
  
 
264
    if (!drizzled::memory::multi_malloc(true,
 
265
                         &share, sizeof(*share),
 
266
                         &tmp_name, length+1,
 
267
                         NULL))
 
268
    {
 
269
      pthread_mutex_unlock(&tina_mutex);
 
270
      return NULL;
 
271
    }
 
272
 
 
273
    share->use_count= 0;
 
274
    share->table_name_length= length;
 
275
    share->table_name= tmp_name;
 
276
    share->crashed= false;
 
277
    share->rows_recorded= 0;
 
278
    share->update_file_opened= false;
 
279
    share->tina_write_opened= false;
 
280
    share->data_file_version= 0;
 
281
    strcpy(share->table_name, table_name);
 
282
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
283
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
284
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
285
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
286
 
 
287
    if (stat(share->data_file_name, &file_stat))
 
288
      goto error;
312
289
    share->saved_data_file_length= file_stat.st_size;
313
290
 
314
 
    a_tina->addOpenTable(share->table_name, share);
315
 
 
 
291
    if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
 
292
      goto error;
 
293
    thr_lock_init(&share->lock);
316
294
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
317
295
 
318
296
    /*
321
299
      Usually this will result in auto-repair, and we will get a good
322
300
      meta-file in the end.
323
301
    */
324
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
325
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
302
    if ((share->meta_file= my_open(meta_file_name,
 
303
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
326
304
      share->crashed= true;
327
305
 
328
306
    /*
336
314
  pthread_mutex_unlock(&tina_mutex);
337
315
 
338
316
  return share;
 
317
 
 
318
error:
 
319
  pthread_mutex_unlock(&tina_mutex);
 
320
  free((unsigned char*) share);
 
321
 
 
322
  return NULL;
339
323
}
340
324
 
341
325
 
358
342
    non-zero - error occurred
359
343
*/
360
344
 
361
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
345
static int read_meta_file(File meta_file, ha_rows *rows)
362
346
{
363
347
  unsigned char meta_buffer[META_BUFFER_SIZE];
364
348
  unsigned char *ptr= meta_buffer;
365
349
 
366
350
  lseek(meta_file, 0, SEEK_SET);
367
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
351
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
368
352
      != META_BUFFER_SIZE)
369
353
    return(HA_ERR_CRASHED_ON_USAGE);
370
354
 
386
370
      ((bool)(*ptr)== true))
387
371
    return(HA_ERR_CRASHED_ON_USAGE);
388
372
 
389
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
373
  my_sync(meta_file, MYF(MY_WME));
390
374
 
391
375
  return(0);
392
376
}
411
395
    non-zero - error occurred
412
396
*/
413
397
 
414
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
398
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
415
399
{
416
400
  unsigned char meta_buffer[META_BUFFER_SIZE];
417
401
  unsigned char *ptr= meta_buffer;
431
415
  *ptr= (unsigned char)dirty;
432
416
 
433
417
  lseek(meta_file, 0, SEEK_SET);
434
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
418
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
435
419
      != META_BUFFER_SIZE)
436
420
    return(-1);
437
421
 
438
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
422
  my_sync(meta_file, MYF(MY_WME));
439
423
 
440
424
  return(0);
441
425
}
450
434
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
451
435
 
452
436
  if ((share->tina_write_filedes=
453
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
437
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
454
438
  {
455
439
    share->crashed= true;
456
440
    return(1);
464
448
/*
465
449
  Free lock controls.
466
450
*/
467
 
int ha_tina::free_share()
 
451
static int free_share(TINA_SHARE *share)
468
452
{
469
453
  pthread_mutex_lock(&tina_mutex);
470
454
  int result_code= 0;
472
456
    /* Write the meta file. Mark it as crashed if needed. */
473
457
    (void)write_meta_file(share->meta_file, share->rows_recorded,
474
458
                          share->crashed ? true :false);
475
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
459
    if (my_close(share->meta_file, MYF(0)))
476
460
      result_code= 1;
477
461
    if (share->tina_write_opened)
478
462
    {
479
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
463
      if (my_close(share->tina_write_filedes, MYF(0)))
480
464
        result_code= 1;
481
465
      share->tina_write_opened= false;
482
466
    }
483
467
 
484
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
485
 
    a_tina->deleteOpenTable(share->table_name);
486
 
    delete share;
 
468
    hash_delete(&tina_open_tables, (unsigned char*) share);
 
469
    thr_lock_delete(&share->lock);
 
470
    pthread_mutex_destroy(&share->mutex);
 
471
    free((unsigned char*) share);
487
472
  }
488
473
  pthread_mutex_unlock(&tina_mutex);
489
474
 
530
515
 
531
516
 
532
517
 
533
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
518
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
534
519
  :Cursor(engine_arg, table_arg),
535
520
  /*
536
521
    These definitions are found in Cursor.h
537
522
    They are not probably completely right.
538
523
  */
539
524
  current_position(0), next_position(0), local_saved_data_file_length(0),
540
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
525
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
526
  local_data_file_version(0), records_is_known(0)
541
527
{
542
528
  /* Set our original buffers from pre-allocated memory */
543
529
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
530
  chain= chain_buffer;
544
531
  file_buff= new Transparent_file();
545
532
}
546
533
 
557
544
 
558
545
  buffer.length(0);
559
546
 
560
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
547
  for (Field **field=table->field ; *field ; field++)
561
548
  {
562
549
    const char *ptr;
563
550
    const char *end_ptr;
597
584
        {
598
585
          buffer.append('\\');
599
586
          buffer.append('"');
600
 
          (void) *ptr++;
 
587
          *ptr++;
601
588
        }
602
589
        else if (*ptr == '\r')
603
590
        {
604
591
          buffer.append('\\');
605
592
          buffer.append('r');
606
 
          (void) *ptr++;
 
593
          *ptr++;
607
594
        }
608
595
        else if (*ptr == '\\')
609
596
        {
610
597
          buffer.append('\\');
611
598
          buffer.append('\\');
612
 
          (void) *ptr++;
 
599
          *ptr++;
613
600
        }
614
601
        else if (*ptr == '\n')
615
602
        {
616
603
          buffer.append('\\');
617
604
          buffer.append('n');
618
 
          (void) *ptr++;
 
605
          *ptr++;
619
606
        }
620
607
        else
621
608
          buffer.append(*ptr++);
645
632
*/
646
633
int ha_tina::chain_append()
647
634
{
648
 
  if (chain.size() > 0 && chain.back().second == current_position)
649
 
    chain.back().second= next_position;
 
635
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
636
    (chain_ptr -1)->end= next_position;
650
637
  else
651
 
    chain.push_back(make_pair(current_position, next_position));
 
638
  {
 
639
    /* We set up for the next position */
 
640
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
641
    {
 
642
      off_t location= chain_ptr - chain;
 
643
      chain_size += DEFAULT_CHAIN_LENGTH;
 
644
      if (chain_alloced)
 
645
      {
 
646
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
647
          return -1;
 
648
      }
 
649
      else
 
650
      {
 
651
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
652
        if (ptr == NULL)
 
653
          return -1;
 
654
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
655
        chain= ptr;
 
656
        chain_alloced++;
 
657
      }
 
658
      chain_ptr= chain + location;
 
659
    }
 
660
    chain_ptr->begin= current_position;
 
661
    chain_ptr->end= next_position;
 
662
    chain_ptr++;
 
663
  }
 
664
 
652
665
  return 0;
653
666
}
654
667
 
662
675
  int eoln_len;
663
676
  int error;
664
677
 
665
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
678
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
666
679
 
667
680
  /*
668
681
    We do not read further then local_saved_data_file_length in order
675
688
 
676
689
  error= HA_ERR_CRASHED_ON_USAGE;
677
690
 
678
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
691
  memset(buf, 0, table->s->null_bytes);
679
692
 
680
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
693
  for (Field **field=table->field ; *field ; field++)
681
694
  {
682
695
    char curr_char;
683
696
 
746
759
    {
747
760
      /* This masks a bug in the logic for a SELECT * */
748
761
      (*field)->setWriteSet();
749
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
750
 
      {
 
762
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
763
                          CHECK_FIELD_WARN))
751
764
        goto err;
752
 
      }
753
765
 
754
766
      if ((*field)->flags & BLOB_FLAG)
755
767
      {
762
774
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
763
775
        if (src)
764
776
        {
765
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
777
          tgt= (unsigned char*) alloc_root(&blobroot, length);
766
778
          memmove(tgt, src, length);
767
779
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
768
780
        }
778
790
}
779
791
 
780
792
/*
 
793
  Three functions below are needed to enable concurrent insert functionality
 
794
  for CSV engine. For more details see mysys/thr_lock.c
 
795
*/
 
796
 
 
797
void tina_get_status(void* param, int)
 
798
{
 
799
  ha_tina *tina= (ha_tina*) param;
 
800
  tina->get_status();
 
801
}
 
802
 
 
803
void tina_update_status(void* param)
 
804
{
 
805
  ha_tina *tina= (ha_tina*) param;
 
806
  tina->update_status();
 
807
}
 
808
 
 
809
/* this should exist and return 0 for concurrent insert to work */
 
810
bool tina_check_status(void *)
 
811
{
 
812
  return 0;
 
813
}
 
814
 
 
815
/*
 
816
  Save the state of the table
 
817
 
 
818
  SYNOPSIS
 
819
    get_status()
 
820
 
 
821
  DESCRIPTION
 
822
    This function is used to retrieve the file length. During the lock
 
823
    phase of concurrent insert. For more details see comment to
 
824
    ha_tina::update_status below.
 
825
*/
 
826
 
 
827
void ha_tina::get_status()
 
828
{
 
829
  local_saved_data_file_length= share->saved_data_file_length;
 
830
}
 
831
 
 
832
 
 
833
/*
 
834
  Correct the state of the table. Called by unlock routines
 
835
  before the write lock is released.
 
836
 
 
837
  SYNOPSIS
 
838
    update_status()
 
839
 
 
840
  DESCRIPTION
 
841
    When we employ concurrent insert lock, we save current length of the file
 
842
    during the lock phase. We do not read further saved value, as we don't
 
843
    want to interfere with undergoing concurrent insert. Writers update file
 
844
    length info during unlock with update_status().
 
845
 
 
846
  NOTE
 
847
    For log tables concurrent insert works different. The reason is that
 
848
    log tables are always opened and locked. And as they do not unlock
 
849
    tables, the file length after writes should be updated in a different
 
850
    way.
 
851
*/
 
852
 
 
853
void ha_tina::update_status()
 
854
{
 
855
  /* correct local_saved_data_file_length for writers */
 
856
  share->saved_data_file_length= local_saved_data_file_length;
 
857
}
 
858
 
 
859
 
 
860
/*
781
861
  Open a database file. Keep in mind that tables are caches, so
782
862
  this will not be called for every request. Any sort of positions
783
863
  that need to be reset should be kept in the ::extra() call.
784
864
*/
785
 
int ha_tina::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
865
int ha_tina::open(const char *name, int, uint32_t open_options)
786
866
{
787
 
  if (not (share= get_share(identifier.getPath().c_str())))
 
867
  if (!(share= get_share(name, table)))
788
868
    return(ENOENT);
789
869
 
790
 
  if (share->crashed)
 
870
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
791
871
  {
792
 
    free_share();
 
872
    free_share(share);
793
873
    return(HA_ERR_CRASHED_ON_USAGE);
794
874
  }
795
875
 
796
876
  local_data_file_version= share->data_file_version;
797
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
877
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
798
878
    return(0);
799
879
 
800
880
  /*
802
882
    so that they could save/update local_saved_data_file_length value
803
883
    during locking. This is needed to enable concurrent inserts.
804
884
  */
 
885
  thr_lock_data_init(&share->lock, &lock, (void*) this);
805
886
  ref_length=sizeof(off_t);
806
887
 
 
888
  share->lock.get_status= tina_get_status;
 
889
  share->lock.update_status= tina_update_status;
 
890
  share->lock.check_status= tina_check_status;
 
891
 
807
892
  return(0);
808
893
}
809
894
 
 
895
 
810
896
/*
811
897
  Close a database file. We remove ourselves from the shared strucutre.
812
898
  If it is empty we destroy it.
814
900
int ha_tina::close(void)
815
901
{
816
902
  int rc= 0;
817
 
  rc= internal::my_close(data_file, MYF(0));
818
 
  return(free_share() || rc);
 
903
  rc= my_close(data_file, MYF(0));
 
904
  return(free_share(share) || rc);
819
905
}
820
906
 
821
907
/*
823
909
  of the file and appends the data. In an error case it really should
824
910
  just truncate to the original position (this is not done yet).
825
911
*/
826
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
912
int ha_tina::write_row(unsigned char * buf)
827
913
{
828
914
  int size;
829
915
 
830
916
  if (share->crashed)
831
917
      return(HA_ERR_CRASHED_ON_USAGE);
832
918
 
 
919
  ha_statistic_increment(&SSV::ha_write_count);
 
920
 
833
921
  size= encode_quote(buf);
834
922
 
835
923
  if (!share->tina_write_opened)
837
925
      return(-1);
838
926
 
839
927
   /* use pwrite, as concurrent reader could have changed the position */
840
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
928
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
841
929
               MYF(MY_WME | MY_NABP)))
842
930
    return(-1);
843
931
 
862
950
  if (!share->update_file_opened)
863
951
  {
864
952
    if ((update_temp_file=
865
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
953
           my_create(fn_format(updated_fname, share->table_name,
866
954
                               "", CSN_EXT,
867
955
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
868
956
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
881
969
  This will be called in a table scan right before the previous ::rnd_next()
882
970
  call.
883
971
*/
884
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
972
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
885
973
{
886
974
  int size;
887
975
  int rc= -1;
888
976
 
 
977
  ha_statistic_increment(&SSV::ha_update_count);
 
978
 
 
979
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
980
    table->timestamp_field->set_time();
 
981
 
889
982
  size= encode_quote(new_data);
890
983
 
891
984
  /*
892
985
    During update we mark each updating record as deleted
893
986
    (see the chain_append()) then write new one to the temporary data file.
894
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
987
    At the end of the sequence in the rnd_end() we append all non-marked
895
988
    records from the data file to the temporary data file then rename it.
896
989
    The temp_file_length is used to calculate new data file length.
897
990
  */
901
994
  if (open_update_temp_file_if_needed())
902
995
    goto err;
903
996
 
904
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
997
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
905
998
               MYF(MY_WME | MY_NABP)))
906
999
    goto err;
907
1000
  temp_file_length+= size;
921
1014
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
922
1015
  DESC, ASC).
923
1016
*/
924
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
1017
int ha_tina::delete_row(const unsigned char *)
925
1018
{
 
1019
  ha_statistic_increment(&SSV::ha_delete_count);
926
1020
 
927
1021
  if (chain_append())
928
1022
    return(-1);
956
1050
  if (local_data_file_version != share->data_file_version)
957
1051
  {
958
1052
    local_data_file_version= share->data_file_version;
959
 
    if (internal::my_close(data_file, MYF(0)) ||
960
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
1053
    if (my_close(data_file, MYF(0)) ||
 
1054
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
961
1055
      return 1;
962
1056
  }
963
1057
  file_buff->init_buff(data_file);
993
1087
 
994
1088
*/
995
1089
 
996
 
int ha_tina::doStartTableScan(bool)
 
1090
int ha_tina::rnd_init(bool)
997
1091
{
998
1092
  /* set buffer to the beginning of the file */
999
1093
  if (share->crashed || init_data_file())
1002
1096
  current_position= next_position= 0;
1003
1097
  stats.records= 0;
1004
1098
  records_is_known= 0;
1005
 
  chain.clear();
 
1099
  chain_ptr= chain;
1006
1100
 
1007
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1101
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1008
1102
 
1009
1103
  return(0);
1010
1104
}
1030
1124
  if (share->crashed)
1031
1125
      return(HA_ERR_CRASHED_ON_USAGE);
1032
1126
 
1033
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1127
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1034
1128
 
1035
1129
  current_position= next_position;
1036
1130
 
1056
1150
*/
1057
1151
void ha_tina::position(const unsigned char *)
1058
1152
{
1059
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1153
  my_store_ptr(ref, ref_length, current_position);
1060
1154
  return;
1061
1155
}
1062
1156
 
1063
1157
 
1064
1158
/*
1065
1159
  Used to fetch a row from a posiion stored with ::position().
1066
 
  internal::my_get_ptr() retrieves the data for you.
 
1160
  my_get_ptr() retrieves the data for you.
1067
1161
*/
1068
1162
 
1069
1163
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1070
1164
{
1071
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1072
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1165
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1166
  current_position= (off_t)my_get_ptr(pos,ref_length);
1073
1167
  return(find_current_row(buf));
1074
1168
}
1075
1169
 
1091
1185
  to the given "hole", stored in the buffer. "Valid" here means,
1092
1186
  not listed in the chain of deleted records ("holes").
1093
1187
*/
1094
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1188
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1095
1189
{
1096
 
  if (closest_hole == chain.end()) /* no more chains */
 
1190
  if (closest_hole == chain_ptr) /* no more chains */
1097
1191
    *end_pos= file_buff->end();
1098
1192
  else
1099
1193
    *end_pos= std::min(file_buff->end(),
1100
 
                       closest_hole->first);
1101
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1194
                       closest_hole->begin);
 
1195
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1102
1196
}
1103
1197
 
1104
1198
 
1108
1202
  slots to clean up all of the dead space we have collected while
1109
1203
  performing deletes/updates.
1110
1204
*/
1111
 
int ha_tina::doEndTableScan()
 
1205
int ha_tina::rnd_end()
1112
1206
{
 
1207
  char updated_fname[FN_REFLEN];
1113
1208
  off_t file_buffer_start= 0;
1114
1209
 
1115
 
  blobroot.free_root(MYF(0));
 
1210
  free_root(&blobroot, MYF(0));
1116
1211
  records_is_known= 1;
1117
1212
 
1118
 
  if (chain.size() > 0)
 
1213
  if ((chain_ptr - chain)  > 0)
1119
1214
  {
1120
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1215
    tina_set *ptr= chain;
1121
1216
 
1122
1217
    /*
1123
1218
      Re-read the beginning of a file (as the buffer should point to the
1129
1224
      The sort is needed when there were updates/deletes with random orders.
1130
1225
      It sorts so that we move the firts blocks to the beginning.
1131
1226
    */
1132
 
    sort(chain.begin(), chain.end());
 
1227
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1228
             (qsort_cmp)sort_set);
1133
1229
 
1134
1230
    off_t write_begin= 0, write_end;
1135
1231
 
1150
1246
      /* if there is something to write, write it */
1151
1247
      if (write_length)
1152
1248
      {
1153
 
        if (internal::my_write(update_temp_file,
 
1249
        if (my_write(update_temp_file,
1154
1250
                     (unsigned char*) (file_buff->ptr() +
1155
1251
                               (write_begin - file_buff->start())),
1156
1252
                     (size_t)write_length, MYF_RW))
1160
1256
      if (in_hole)
1161
1257
      {
1162
1258
        /* skip hole */
1163
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1259
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1164
1260
          file_buffer_start= file_buff->read_next();
1165
 
        write_begin= ptr->second;
1166
 
        ++ptr;
 
1261
        write_begin= ptr->end;
 
1262
        ptr++;
1167
1263
      }
1168
1264
      else
1169
1265
        write_begin= write_end;
1173
1269
 
1174
1270
    }
1175
1271
 
1176
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1177
 
        internal::my_close(update_temp_file, MYF(0)))
 
1272
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1273
        my_close(update_temp_file, MYF(0)))
1178
1274
      return(-1);
1179
1275
 
1180
1276
    share->update_file_opened= false;
1181
1277
 
1182
1278
    if (share->tina_write_opened)
1183
1279
    {
1184
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1280
      if (my_close(share->tina_write_filedes, MYF(0)))
1185
1281
        return(-1);
1186
1282
      /*
1187
1283
        Mark that the writer fd is closed, so that init_tina_writer()
1194
1290
      Close opened fildes's. Then move updated file in place
1195
1291
      of the old datafile.
1196
1292
    */
1197
 
    std::string rename_file= share->table_name;
1198
 
    rename_file.append(CSN_EXT);
1199
 
    if (internal::my_close(data_file, MYF(0)) ||
1200
 
        internal::my_rename(rename_file.c_str(),
1201
 
                            share->data_file_name.c_str(), MYF(0)))
 
1293
    if (my_close(data_file, MYF(0)) ||
 
1294
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1295
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1296
                  share->data_file_name, MYF(0)))
1202
1297
      return(-1);
1203
1298
 
1204
1299
    /* Open the file again */
1205
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1300
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1206
1301
      return(-1);
1207
1302
    /*
1208
1303
      As we reopened the data file, increase share->data_file_version
1229
1324
 
1230
1325
  return(0);
1231
1326
error:
1232
 
  internal::my_close(update_temp_file, MYF(0));
 
1327
  my_close(update_temp_file, MYF(0));
1233
1328
  share->update_file_opened= false;
1234
1329
  return(-1);
1235
1330
}
1244
1339
  int rc;
1245
1340
 
1246
1341
  if (!records_is_known)
1247
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1342
    return(my_errno=HA_ERR_WRONG_COMMAND);
1248
1343
 
1249
1344
  if (!share->tina_write_opened)
1250
1345
    if (init_tina_writer())
1267
1362
  this (the database will call ::open() if it needs to).
1268
1363
*/
1269
1364
 
1270
 
int Tina::doCreateTable(Session &session,
 
1365
int Tina::doCreateTable(Session *, const char *table_name,
1271
1366
                        Table& table_arg,
1272
 
                        const drizzled::TableIdentifier &identifier,
1273
 
                        drizzled::message::Table &create_proto)
 
1367
                        drizzled::message::Table& create_proto)
1274
1368
{
1275
1369
  char name_buff[FN_REFLEN];
1276
 
  int create_file;
 
1370
  File create_file;
1277
1371
 
1278
1372
  /*
1279
1373
    check columns
1280
1374
  */
1281
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1282
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1283
 
       iter != fields.end();
1284
 
       iter++)
 
1375
  for (Field **field= table_arg.s->field; *field; field++)
1285
1376
  {
1286
 
    if (not *iter) // Historical legacy for NULL array end.
1287
 
      continue;
1288
 
 
1289
 
    if ((*iter)->real_maybe_null())
 
1377
    if ((*field)->real_maybe_null())
1290
1378
    {
1291
1379
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1292
1380
      return(HA_ERR_UNSUPPORTED);
1294
1382
  }
1295
1383
 
1296
1384
 
1297
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1298
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1299
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1385
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
 
1386
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1387
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1300
1388
    return(-1);
1301
1389
 
1302
1390
  write_meta_file(create_file, 0, false);
1303
 
  internal::my_close(create_file, MYF(0));
 
1391
  my_close(create_file, MYF(0));
1304
1392
 
1305
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1306
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1307
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1393
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
 
1394
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1395
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1308
1396
    return(-1);
1309
1397
 
1310
 
  internal::my_close(create_file, MYF(0));
 
1398
  my_close(create_file, MYF(0));
1311
1399
 
1312
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
 
1400
  pthread_mutex_lock(&proto_cache_mutex);
 
1401
  proto_cache.insert(make_pair(table_name, create_proto));
 
1402
  pthread_mutex_unlock(&proto_cache_mutex);
1313
1403
 
1314
1404
  return 0;
1315
1405
}
1316
1406
 
1317
1407
 
1318
 
DRIZZLE_DECLARE_PLUGIN
 
1408
drizzle_declare_plugin
1319
1409
{
1320
 
  DRIZZLE_VERSION_ID,
1321
1410
  "CSV",
1322
1411
  "1.0",
1323
1412
  "Brian Aker, MySQL AB",
1324
1413
  "CSV storage engine",
1325
1414
  PLUGIN_LICENSE_GPL,
1326
1415
  tina_init_func, /* Plugin Init */
 
1416
  tina_done_func, /* Plugin Deinit */
 
1417
  NULL,                       /* status variables                */
1327
1418
  NULL,                       /* system variables                */
1328
1419
  NULL                        /* config options                  */
1329
1420
}
1330
 
DRIZZLE_DECLARE_PLUGIN_END;
 
1421
drizzle_declare_plugin_end;
1331
1422