~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

  • Committer: Monty Taylor
  • Date: 2010-01-12 21:34:24 UTC
  • mto: This revision was merged to the branch mainline in revision 1268.
  • Revision ID: mordred@inaugust.com-20100112213424-6mslywtlca49mvnk
Updated to pandora-buld v0.94

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include <config.h>
 
43
#include "config.h"
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
 
46
#include <drizzled/field/timestamp.h>
46
47
#include <drizzled/error.h>
47
48
#include <drizzled/table.h>
48
49
#include <drizzled/session.h>
49
 
#include <drizzled/internal/my_sys.h>
 
50
#include "drizzled/internal/my_sys.h"
50
51
 
51
52
#include "ha_tina.h"
52
53
 
53
54
#include <fcntl.h>
54
55
 
55
 
#include <algorithm>
56
 
#include <vector>
57
56
#include <string>
58
57
#include <map>
59
58
 
60
59
using namespace std;
61
 
using namespace drizzled;
62
60
 
63
61
/*
64
62
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
65
63
*/
66
 
static const int META_BUFFER_SIZE = sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t)
67
 
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char);
68
 
static const int TINA_CHECK_HEADER = 254; // The number we use to determine corruption
69
 
static const int BLOB_MEMROOT_ALLOC_SIZE = 8192;
 
64
#define META_BUFFER_SIZE sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t) \
 
65
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char)
 
66
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
 
67
#define BLOB_MEMROOT_ALLOC_SIZE 8192
70
68
 
71
69
/* The file extension */
72
 
static const char* CSV_EXT = ".CSV"               // The data file
73
 
static const char* CSN_EXT = ".CSN"               // Files used during repair and update
74
 
static const char* CSM_EXT = ".CSM"               // Meta file
 
70
#define CSV_EXT ".CSV"               // The data file
 
71
#define CSN_EXT ".CSN"               // Files used during repair and update
 
72
#define CSM_EXT ".CSM"               // Meta file
75
73
 
76
74
 
77
75
static int read_meta_file(int meta_file, ha_rows *rows);
78
76
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
79
77
 
 
78
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
79
extern "C" void tina_update_status(void* param);
 
80
extern "C" bool tina_check_status(void* param);
 
81
 
80
82
/* Stuff for shares */
81
83
pthread_mutex_t tina_mutex;
82
84
 
85
87
 *****************************************************************************/
86
88
 
87
89
/*
 
90
  Used for sorting chains with qsort().
 
91
*/
 
92
static int sort_set (tina_set *a, tina_set *b)
 
93
{
 
94
  /*
 
95
    We assume that intervals do not intersect. So, it is enought to compare
 
96
    any two points. Here we take start of intervals for comparison.
 
97
  */
 
98
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
99
}
 
100
 
 
101
 
 
102
/*
88
103
  If frm_error() is called in table.cc this is called to find out what file
89
104
  extensions exist for this Cursor.
90
105
*/
103
118
   : drizzled::plugin::StorageEngine(name_arg,
104
119
                                     HTON_TEMPORARY_ONLY |
105
120
                                     HTON_NO_AUTO_INCREMENT |
106
 
                                     HTON_SKIP_STORE_LOCK),
 
121
                                     HTON_HAS_DATA_DICTIONARY |
 
122
                                     HTON_SKIP_STORE_LOCK |
 
123
                                     HTON_FILE_BASED),
107
124
    tina_open_tables()
108
125
  {}
109
 
  virtual ~Tina()
110
 
  {
111
 
    pthread_mutex_destroy(&tina_mutex);
112
 
  }
113
 
 
114
 
  virtual Cursor *create(Table &table)
115
 
  {
116
 
    return new ha_tina(*this, table);
 
126
  virtual Cursor *create(TableShare &table,
 
127
                         drizzled::memory::Root *mem_root)
 
128
  {
 
129
    return new (mem_root) ha_tina(*this, table);
117
130
  }
118
131
 
119
132
  const char **bas_ext() const {
120
133
    return ha_tina_exts;
121
134
  }
122
135
 
123
 
  int doCreateTable(Session &,
124
 
                    Table &table_arg,
125
 
                    const drizzled::identifier::Table &identifier,
 
136
  int doCreateTable(Session *,
 
137
                    const char *table_name,
 
138
                    Table& table_arg,
126
139
                    drizzled::message::Table&);
127
140
 
128
141
  int doGetTableDefinition(Session& session,
129
 
                           const drizzled::identifier::Table &identifier,
130
 
                           drizzled::message::Table &table_message);
131
 
 
132
 
  int doDropTable(Session&, const drizzled::identifier::Table &identifier);
 
142
                           const char* path,
 
143
                           const char *db,
 
144
                           const char *table_name,
 
145
                           const bool is_tmp,
 
146
                           drizzled::message::Table *table_proto);
 
147
 
 
148
  /* Temp only engine, so do not return values. */
 
149
  void doGetTableNames(drizzled::CachedDirectory &, string& , set<string>&) { };
 
150
 
 
151
  int doDropTable(Session&, const string table_path);
133
152
  TinaShare *findOpenTable(const string table_name);
134
153
  void addOpenTable(const string &table_name, TinaShare *);
135
154
  void deleteOpenTable(const string &table_name);
138
157
  uint32_t max_keys()          const { return 0; }
139
158
  uint32_t max_key_parts()     const { return 0; }
140
159
  uint32_t max_key_length()    const { return 0; }
141
 
  bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
 
  int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
143
 
 
144
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
 
                             const drizzled::identifier::Schema &schema_identifier,
146
 
                             drizzled::identifier::Table::vector &set_of_identifiers);
147
160
};
148
161
 
149
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
 
                                 const drizzled::identifier::Schema&,
151
 
                                 drizzled::identifier::Table::vector&)
152
 
{
153
 
}
154
 
 
155
 
int Tina::doRenameTable(Session &session,
156
 
                        const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
157
 
{
158
 
  int error= 0;
159
 
  for (const char **ext= bas_ext(); *ext ; ext++)
160
 
  {
161
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
162
 
    {
163
 
      if ((error=errno) != ENOENT)
164
 
        break;
165
 
      error= 0;
166
 
    }
167
 
  }
168
 
 
169
 
  session.getMessageCache().renameTableMessage(from, to);
170
 
 
171
 
  return error;
172
 
}
173
 
 
174
 
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
175
 
{
176
 
  return session.getMessageCache().doesTableMessageExist(identifier);
177
 
}
178
 
 
179
 
 
180
 
int Tina::doDropTable(Session &session,
181
 
                      const drizzled::identifier::Table &identifier)
 
162
int Tina::doDropTable(Session&,
 
163
                        const string table_path)
182
164
{
183
165
  int error= 0;
184
166
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
 
167
  char buff[FN_REFLEN];
 
168
  ProtoCache::iterator iter;
185
169
 
186
170
  for (const char **ext= bas_ext(); *ext ; ext++)
187
171
  {
188
 
    std::string full_name= identifier.getPath();
189
 
    full_name.append(*ext);
190
 
 
191
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
 
172
    fn_format(buff, table_path.c_str(), "", *ext,
 
173
              MY_UNPACK_FILENAME|MY_APPEND_EXT);
 
174
    if (my_delete_with_symlink(buff, MYF(0)))
192
175
    {
193
176
      if ((error= errno) != ENOENT)
194
177
        break;
195
178
    }
196
179
    else
197
 
    {
198
180
      enoent_or_zero= 0;                        // No error for ENOENT
199
 
    }
200
181
    error= enoent_or_zero;
201
182
  }
202
183
 
203
 
  session.getMessageCache().removeTableMessage(identifier);
 
184
  pthread_mutex_lock(&proto_cache_mutex);
 
185
  iter= proto_cache.find(table_path.c_str());
 
186
 
 
187
  if (iter!= proto_cache.end())
 
188
    proto_cache.erase(iter);
 
189
  pthread_mutex_unlock(&proto_cache_mutex);
204
190
 
205
191
  return error;
206
192
}
227
213
}
228
214
 
229
215
 
230
 
int Tina::doGetTableDefinition(Session &session,
231
 
                               const drizzled::identifier::Table &identifier,
232
 
                               drizzled::message::Table &table_message)
 
216
int Tina::doGetTableDefinition(Session&,
 
217
                               const char* path,
 
218
                               const char *,
 
219
                               const char *,
 
220
                               const bool,
 
221
                               drizzled::message::Table *table_proto)
233
222
{
234
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
235
 
    return EEXIST;
236
 
 
237
 
  return ENOENT;
 
223
  int error= ENOENT;
 
224
  ProtoCache::iterator iter;
 
225
 
 
226
  pthread_mutex_lock(&proto_cache_mutex);
 
227
  iter= proto_cache.find(path);
 
228
 
 
229
  if (iter!= proto_cache.end())
 
230
  {
 
231
    if (table_proto)
 
232
      table_proto->CopyFrom(((*iter).second));
 
233
    error= EEXIST;
 
234
  }
 
235
  pthread_mutex_unlock(&proto_cache_mutex);
 
236
 
 
237
  return error;
238
238
}
239
239
 
240
240
 
241
241
static Tina *tina_engine= NULL;
242
242
 
243
 
static int tina_init_func(drizzled::module::Context &context)
 
243
static int tina_init_func(drizzled::plugin::Registry &registry)
244
244
{
245
245
 
246
246
  tina_engine= new Tina("CSV");
247
 
  context.add(tina_engine);
 
247
  registry.add(tina_engine);
248
248
 
249
249
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
250
250
  return 0;
251
251
}
252
252
 
253
 
 
254
 
 
255
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
256
 
  table_name(table_name_arg),
257
 
  data_file_name(table_name_arg),
258
 
  use_count(0),
259
 
  saved_data_file_length(0),
260
 
  update_file_opened(false),
261
 
  tina_write_opened(false),
262
 
  crashed(false),
263
 
  rows_recorded(0),
264
 
  data_file_version(0)
265
 
{
266
 
  data_file_name.append(CSV_EXT);
 
253
static int tina_done_func(drizzled::plugin::Registry &registry)
 
254
{
 
255
  registry.remove(tina_engine);
 
256
  delete tina_engine;
 
257
 
 
258
  pthread_mutex_destroy(&tina_mutex);
 
259
 
 
260
  return 0;
 
261
}
 
262
 
 
263
 
 
264
TinaShare::TinaShare(const char *table_name_arg)
 
265
  : table_name(table_name_arg), use_count(0), saved_data_file_length(0),
 
266
    update_file_opened(false), tina_write_opened(false),
 
267
    crashed(false), rows_recorded(0), data_file_version(0)
 
268
{
 
269
  thr_lock_init(&lock);
 
270
  fn_format(data_file_name, table_name_arg, "", CSV_EXT,
 
271
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);
267
272
}
268
273
 
269
274
TinaShare::~TinaShare()
270
275
{
 
276
  thr_lock_delete(&lock);
271
277
  pthread_mutex_destroy(&mutex);
272
278
}
273
279
 
274
280
/*
275
281
  Simple lock controls.
276
282
*/
277
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
283
TinaShare *ha_tina::get_share(const char *table_name)
278
284
{
279
285
  pthread_mutex_lock(&tina_mutex);
280
286
 
281
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
 
287
  Tina *a_tina= static_cast<Tina *>(engine);
282
288
  share= a_tina->findOpenTable(table_name);
283
289
 
284
 
  std::string meta_file_name;
 
290
  char meta_file_name[FN_REFLEN];
285
291
  struct stat file_stat;
286
292
 
287
293
  /*
298
304
      return NULL;
299
305
    }
300
306
 
301
 
    meta_file_name.assign(table_name);
302
 
    meta_file_name.append(CSM_EXT);
 
307
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
308
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
303
309
 
304
 
    if (stat(share->data_file_name.c_str(), &file_stat))
 
310
    if (stat(share->data_file_name, &file_stat))
305
311
    {
306
312
      pthread_mutex_unlock(&tina_mutex);
307
313
      delete share;
320
326
      Usually this will result in auto-repair, and we will get a good
321
327
      meta-file in the end.
322
328
    */
323
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
324
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
329
    if ((share->meta_file= my_open(meta_file_name,
 
330
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
325
331
      share->crashed= true;
326
332
 
327
333
    /*
363
369
  unsigned char *ptr= meta_buffer;
364
370
 
365
371
  lseek(meta_file, 0, SEEK_SET);
366
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
372
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
367
373
      != META_BUFFER_SIZE)
368
374
    return(HA_ERR_CRASHED_ON_USAGE);
369
375
 
385
391
      ((bool)(*ptr)== true))
386
392
    return(HA_ERR_CRASHED_ON_USAGE);
387
393
 
388
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
394
  my_sync(meta_file, MYF(MY_WME));
389
395
 
390
396
  return(0);
391
397
}
430
436
  *ptr= (unsigned char)dirty;
431
437
 
432
438
  lseek(meta_file, 0, SEEK_SET);
433
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
439
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
434
440
      != META_BUFFER_SIZE)
435
441
    return(-1);
436
442
 
437
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
443
  my_sync(meta_file, MYF(MY_WME));
438
444
 
439
445
  return(0);
440
446
}
449
455
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
450
456
 
451
457
  if ((share->tina_write_filedes=
452
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
458
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
453
459
  {
454
460
    share->crashed= true;
455
461
    return(1);
471
477
    /* Write the meta file. Mark it as crashed if needed. */
472
478
    (void)write_meta_file(share->meta_file, share->rows_recorded,
473
479
                          share->crashed ? true :false);
474
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
480
    if (my_close(share->meta_file, MYF(0)))
475
481
      result_code= 1;
476
482
    if (share->tina_write_opened)
477
483
    {
478
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
484
      if (my_close(share->tina_write_filedes, MYF(0)))
479
485
        result_code= 1;
480
486
      share->tina_write_opened= false;
481
487
    }
482
488
 
483
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
 
489
    Tina *a_tina= static_cast<Tina *>(engine);
484
490
    a_tina->deleteOpenTable(share->table_name);
485
491
    delete share;
486
492
  }
529
535
 
530
536
 
531
537
 
532
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
538
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
533
539
  :Cursor(engine_arg, table_arg),
534
540
  /*
535
541
    These definitions are found in Cursor.h
536
542
    They are not probably completely right.
537
543
  */
538
544
  current_position(0), next_position(0), local_saved_data_file_length(0),
539
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
545
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
546
  local_data_file_version(0), records_is_known(0)
540
547
{
541
548
  /* Set our original buffers from pre-allocated memory */
542
549
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
550
  chain= chain_buffer;
543
551
  file_buff= new Transparent_file();
544
552
}
545
553
 
556
564
 
557
565
  buffer.length(0);
558
566
 
559
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
567
  for (Field **field=table->field ; *field ; field++)
560
568
  {
561
569
    const char *ptr;
562
570
    const char *end_ptr;
596
604
        {
597
605
          buffer.append('\\');
598
606
          buffer.append('"');
599
 
          (void) *ptr++;
 
607
          *ptr++;
600
608
        }
601
609
        else if (*ptr == '\r')
602
610
        {
603
611
          buffer.append('\\');
604
612
          buffer.append('r');
605
 
          (void) *ptr++;
 
613
          *ptr++;
606
614
        }
607
615
        else if (*ptr == '\\')
608
616
        {
609
617
          buffer.append('\\');
610
618
          buffer.append('\\');
611
 
          (void) *ptr++;
 
619
          *ptr++;
612
620
        }
613
621
        else if (*ptr == '\n')
614
622
        {
615
623
          buffer.append('\\');
616
624
          buffer.append('n');
617
 
          (void) *ptr++;
 
625
          *ptr++;
618
626
        }
619
627
        else
620
628
          buffer.append(*ptr++);
644
652
*/
645
653
int ha_tina::chain_append()
646
654
{
647
 
  if (chain.size() > 0 && chain.back().second == current_position)
648
 
    chain.back().second= next_position;
 
655
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
656
    (chain_ptr -1)->end= next_position;
649
657
  else
650
 
    chain.push_back(make_pair(current_position, next_position));
 
658
  {
 
659
    /* We set up for the next position */
 
660
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
661
    {
 
662
      off_t location= chain_ptr - chain;
 
663
      chain_size += DEFAULT_CHAIN_LENGTH;
 
664
      if (chain_alloced)
 
665
      {
 
666
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
667
          return -1;
 
668
      }
 
669
      else
 
670
      {
 
671
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
672
        if (ptr == NULL)
 
673
          return -1;
 
674
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
675
        chain= ptr;
 
676
        chain_alloced++;
 
677
      }
 
678
      chain_ptr= chain + location;
 
679
    }
 
680
    chain_ptr->begin= current_position;
 
681
    chain_ptr->end= next_position;
 
682
    chain_ptr++;
 
683
  }
 
684
 
651
685
  return 0;
652
686
}
653
687
 
661
695
  int eoln_len;
662
696
  int error;
663
697
 
664
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
698
  free_root(&blobroot, MYF(drizzled::memory::MARK_BLOCKS_FREE));
665
699
 
666
700
  /*
667
701
    We do not read further then local_saved_data_file_length in order
674
708
 
675
709
  error= HA_ERR_CRASHED_ON_USAGE;
676
710
 
677
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
711
  memset(buf, 0, table->s->null_bytes);
678
712
 
679
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
713
  for (Field **field=table->field ; *field ; field++)
680
714
  {
681
715
    char curr_char;
682
716
 
745
779
    {
746
780
      /* This masks a bug in the logic for a SELECT * */
747
781
      (*field)->setWriteSet();
748
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
749
 
      {
 
782
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
783
                          CHECK_FIELD_WARN))
750
784
        goto err;
751
 
      }
752
785
 
753
786
      if ((*field)->flags & BLOB_FLAG)
754
787
      {
761
794
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
762
795
        if (src)
763
796
        {
764
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
797
          tgt= (unsigned char*) alloc_root(&blobroot, length);
765
798
          memmove(tgt, src, length);
766
799
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
767
800
        }
777
810
}
778
811
 
779
812
/*
 
813
  Three functions below are needed to enable concurrent insert functionality
 
814
  for CSV engine. For more details see mysys/thr_lock.c
 
815
*/
 
816
 
 
817
void tina_get_status(void* param, int)
 
818
{
 
819
  ha_tina *tina= (ha_tina*) param;
 
820
  tina->get_status();
 
821
}
 
822
 
 
823
void tina_update_status(void* param)
 
824
{
 
825
  ha_tina *tina= (ha_tina*) param;
 
826
  tina->update_status();
 
827
}
 
828
 
 
829
/* this should exist and return 0 for concurrent insert to work */
 
830
bool tina_check_status(void *)
 
831
{
 
832
  return 0;
 
833
}
 
834
 
 
835
/*
 
836
  Save the state of the table
 
837
 
 
838
  SYNOPSIS
 
839
    get_status()
 
840
 
 
841
  DESCRIPTION
 
842
    This function is used to retrieve the file length. During the lock
 
843
    phase of concurrent insert. For more details see comment to
 
844
    ha_tina::update_status below.
 
845
*/
 
846
 
 
847
void ha_tina::get_status()
 
848
{
 
849
  local_saved_data_file_length= share->saved_data_file_length;
 
850
}
 
851
 
 
852
 
 
853
/*
 
854
  Correct the state of the table. Called by unlock routines
 
855
  before the write lock is released.
 
856
 
 
857
  SYNOPSIS
 
858
    update_status()
 
859
 
 
860
  DESCRIPTION
 
861
    When we employ concurrent insert lock, we save current length of the file
 
862
    during the lock phase. We do not read further saved value, as we don't
 
863
    want to interfere with undergoing concurrent insert. Writers update file
 
864
    length info during unlock with update_status().
 
865
 
 
866
  NOTE
 
867
    For log tables concurrent insert works different. The reason is that
 
868
    log tables are always opened and locked. And as they do not unlock
 
869
    tables, the file length after writes should be updated in a different
 
870
    way.
 
871
*/
 
872
 
 
873
void ha_tina::update_status()
 
874
{
 
875
  /* correct local_saved_data_file_length for writers */
 
876
  share->saved_data_file_length= local_saved_data_file_length;
 
877
}
 
878
 
 
879
 
 
880
/*
780
881
  Open a database file. Keep in mind that tables are caches, so
781
882
  this will not be called for every request. Any sort of positions
782
883
  that need to be reset should be kept in the ::extra() call.
783
884
*/
784
 
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
 
885
int ha_tina::open(const char *name, int, uint32_t)
785
886
{
786
 
  if (not (share= get_share(identifier.getPath().c_str())))
 
887
  if (!(share= get_share(name)))
787
888
    return(ENOENT);
788
889
 
789
890
  if (share->crashed)
793
894
  }
794
895
 
795
896
  local_data_file_version= share->data_file_version;
796
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
897
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
797
898
    return(0);
798
899
 
799
900
  /*
801
902
    so that they could save/update local_saved_data_file_length value
802
903
    during locking. This is needed to enable concurrent inserts.
803
904
  */
 
905
  thr_lock_data_init(&share->lock, &lock, (void*) this);
804
906
  ref_length=sizeof(off_t);
805
907
 
 
908
  share->lock.get_status= tina_get_status;
 
909
  share->lock.update_status= tina_update_status;
 
910
  share->lock.check_status= tina_check_status;
 
911
 
806
912
  return(0);
807
913
}
808
914
 
 
915
 
809
916
/*
810
917
  Close a database file. We remove ourselves from the shared strucutre.
811
918
  If it is empty we destroy it.
813
920
int ha_tina::close(void)
814
921
{
815
922
  int rc= 0;
816
 
  rc= internal::my_close(data_file, MYF(0));
 
923
  rc= my_close(data_file, MYF(0));
817
924
  return(free_share() || rc);
818
925
}
819
926
 
822
929
  of the file and appends the data. In an error case it really should
823
930
  just truncate to the original position (this is not done yet).
824
931
*/
825
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
932
int ha_tina::write_row(unsigned char * buf)
826
933
{
827
934
  int size;
828
935
 
829
936
  if (share->crashed)
830
937
      return(HA_ERR_CRASHED_ON_USAGE);
831
938
 
 
939
  ha_statistic_increment(&SSV::ha_write_count);
 
940
 
832
941
  size= encode_quote(buf);
833
942
 
834
943
  if (!share->tina_write_opened)
836
945
      return(-1);
837
946
 
838
947
   /* use pwrite, as concurrent reader could have changed the position */
839
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
948
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
840
949
               MYF(MY_WME | MY_NABP)))
841
950
    return(-1);
842
951
 
861
970
  if (!share->update_file_opened)
862
971
  {
863
972
    if ((update_temp_file=
864
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
973
           my_create(fn_format(updated_fname, share->table_name.c_str(),
865
974
                               "", CSN_EXT,
866
975
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
976
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
880
989
  This will be called in a table scan right before the previous ::rnd_next()
881
990
  call.
882
991
*/
883
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
992
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
884
993
{
885
994
  int size;
886
995
  int rc= -1;
887
996
 
 
997
  ha_statistic_increment(&SSV::ha_update_count);
 
998
 
 
999
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
1000
    table->timestamp_field->set_time();
 
1001
 
888
1002
  size= encode_quote(new_data);
889
1003
 
890
1004
  /*
891
1005
    During update we mark each updating record as deleted
892
1006
    (see the chain_append()) then write new one to the temporary data file.
893
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
1007
    At the end of the sequence in the rnd_end() we append all non-marked
894
1008
    records from the data file to the temporary data file then rename it.
895
1009
    The temp_file_length is used to calculate new data file length.
896
1010
  */
900
1014
  if (open_update_temp_file_if_needed())
901
1015
    goto err;
902
1016
 
903
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
1017
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
904
1018
               MYF(MY_WME | MY_NABP)))
905
1019
    goto err;
906
1020
  temp_file_length+= size;
920
1034
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
921
1035
  DESC, ASC).
922
1036
*/
923
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
1037
int ha_tina::delete_row(const unsigned char *)
924
1038
{
 
1039
  ha_statistic_increment(&SSV::ha_delete_count);
925
1040
 
926
1041
  if (chain_append())
927
1042
    return(-1);
955
1070
  if (local_data_file_version != share->data_file_version)
956
1071
  {
957
1072
    local_data_file_version= share->data_file_version;
958
 
    if (internal::my_close(data_file, MYF(0)) ||
959
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
1073
    if (my_close(data_file, MYF(0)) ||
 
1074
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
960
1075
      return 1;
961
1076
  }
962
1077
  file_buff->init_buff(data_file);
992
1107
 
993
1108
*/
994
1109
 
995
 
int ha_tina::doStartTableScan(bool)
 
1110
int ha_tina::rnd_init(bool)
996
1111
{
997
1112
  /* set buffer to the beginning of the file */
998
1113
  if (share->crashed || init_data_file())
1001
1116
  current_position= next_position= 0;
1002
1117
  stats.records= 0;
1003
1118
  records_is_known= 0;
1004
 
  chain.clear();
 
1119
  chain_ptr= chain;
1005
1120
 
1006
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1121
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE);
1007
1122
 
1008
1123
  return(0);
1009
1124
}
1029
1144
  if (share->crashed)
1030
1145
      return(HA_ERR_CRASHED_ON_USAGE);
1031
1146
 
1032
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1147
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1033
1148
 
1034
1149
  current_position= next_position;
1035
1150
 
1055
1170
*/
1056
1171
void ha_tina::position(const unsigned char *)
1057
1172
{
1058
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1173
  my_store_ptr(ref, ref_length, current_position);
1059
1174
  return;
1060
1175
}
1061
1176
 
1062
1177
 
1063
1178
/*
1064
1179
  Used to fetch a row from a posiion stored with ::position().
1065
 
  internal::my_get_ptr() retrieves the data for you.
 
1180
  my_get_ptr() retrieves the data for you.
1066
1181
*/
1067
1182
 
1068
1183
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1184
{
1070
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1185
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1186
  current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1187
  return(find_current_row(buf));
1073
1188
}
1074
1189
 
1090
1205
  to the given "hole", stored in the buffer. "Valid" here means,
1091
1206
  not listed in the chain of deleted records ("holes").
1092
1207
*/
1093
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1208
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1094
1209
{
1095
 
  if (closest_hole == chain.end()) /* no more chains */
 
1210
  if (closest_hole == chain_ptr) /* no more chains */
1096
1211
    *end_pos= file_buff->end();
1097
1212
  else
1098
1213
    *end_pos= std::min(file_buff->end(),
1099
 
                       closest_hole->first);
1100
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1214
                       closest_hole->begin);
 
1215
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1101
1216
}
1102
1217
 
1103
1218
 
1107
1222
  slots to clean up all of the dead space we have collected while
1108
1223
  performing deletes/updates.
1109
1224
*/
1110
 
int ha_tina::doEndTableScan()
 
1225
int ha_tina::rnd_end()
1111
1226
{
 
1227
  char updated_fname[FN_REFLEN];
1112
1228
  off_t file_buffer_start= 0;
1113
1229
 
1114
 
  blobroot.free_root(MYF(0));
 
1230
  free_root(&blobroot, MYF(0));
1115
1231
  records_is_known= 1;
1116
1232
 
1117
 
  if (chain.size() > 0)
 
1233
  if ((chain_ptr - chain)  > 0)
1118
1234
  {
1119
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1235
    tina_set *ptr= chain;
1120
1236
 
1121
1237
    /*
1122
1238
      Re-read the beginning of a file (as the buffer should point to the
1128
1244
      The sort is needed when there were updates/deletes with random orders.
1129
1245
      It sorts so that we move the firts blocks to the beginning.
1130
1246
    */
1131
 
    sort(chain.begin(), chain.end());
 
1247
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1248
             (qsort_cmp)sort_set);
1132
1249
 
1133
1250
    off_t write_begin= 0, write_end;
1134
1251
 
1149
1266
      /* if there is something to write, write it */
1150
1267
      if (write_length)
1151
1268
      {
1152
 
        if (internal::my_write(update_temp_file,
 
1269
        if (my_write(update_temp_file,
1153
1270
                     (unsigned char*) (file_buff->ptr() +
1154
1271
                               (write_begin - file_buff->start())),
1155
1272
                     (size_t)write_length, MYF_RW))
1159
1276
      if (in_hole)
1160
1277
      {
1161
1278
        /* skip hole */
1162
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1279
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1163
1280
          file_buffer_start= file_buff->read_next();
1164
 
        write_begin= ptr->second;
1165
 
        ++ptr;
 
1281
        write_begin= ptr->end;
 
1282
        ptr++;
1166
1283
      }
1167
1284
      else
1168
1285
        write_begin= write_end;
1172
1289
 
1173
1290
    }
1174
1291
 
1175
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
 
        internal::my_close(update_temp_file, MYF(0)))
 
1292
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1293
        my_close(update_temp_file, MYF(0)))
1177
1294
      return(-1);
1178
1295
 
1179
1296
    share->update_file_opened= false;
1180
1297
 
1181
1298
    if (share->tina_write_opened)
1182
1299
    {
1183
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1300
      if (my_close(share->tina_write_filedes, MYF(0)))
1184
1301
        return(-1);
1185
1302
      /*
1186
1303
        Mark that the writer fd is closed, so that init_tina_writer()
1193
1310
      Close opened fildes's. Then move updated file in place
1194
1311
      of the old datafile.
1195
1312
    */
1196
 
    std::string rename_file= share->table_name;
1197
 
    rename_file.append(CSN_EXT);
1198
 
    if (internal::my_close(data_file, MYF(0)) ||
1199
 
        internal::my_rename(rename_file.c_str(),
1200
 
                            share->data_file_name.c_str(), MYF(0)))
 
1313
    if (my_close(data_file, MYF(0)) ||
 
1314
        my_rename(fn_format(updated_fname, share->table_name.c_str(),
 
1315
                            "", CSN_EXT,
 
1316
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1317
                  share->data_file_name, MYF(0)))
1201
1318
      return(-1);
1202
1319
 
1203
1320
    /* Open the file again */
1204
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1321
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1205
1322
      return(-1);
1206
1323
    /*
1207
1324
      As we reopened the data file, increase share->data_file_version
1228
1345
 
1229
1346
  return(0);
1230
1347
error:
1231
 
  internal::my_close(update_temp_file, MYF(0));
 
1348
  my_close(update_temp_file, MYF(0));
1232
1349
  share->update_file_opened= false;
1233
1350
  return(-1);
1234
1351
}
1266
1383
  this (the database will call ::open() if it needs to).
1267
1384
*/
1268
1385
 
1269
 
int Tina::doCreateTable(Session &session,
 
1386
int Tina::doCreateTable(Session *, const char *table_name,
1270
1387
                        Table& table_arg,
1271
 
                        const drizzled::identifier::Table &identifier,
1272
 
                        drizzled::message::Table &create_proto)
 
1388
                        drizzled::message::Table& create_proto)
1273
1389
{
1274
1390
  char name_buff[FN_REFLEN];
1275
1391
  int create_file;
1277
1393
  /*
1278
1394
    check columns
1279
1395
  */
1280
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
 
       iter != fields.end();
1283
 
       iter++)
 
1396
  for (Field **field= table_arg.s->field; *field; field++)
1284
1397
  {
1285
 
    if (not *iter) // Historical legacy for NULL array end.
1286
 
      continue;
1287
 
 
1288
 
    if ((*iter)->real_maybe_null())
 
1398
    if ((*field)->real_maybe_null())
1289
1399
    {
1290
1400
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1401
      return(HA_ERR_UNSUPPORTED);
1293
1403
  }
1294
1404
 
1295
1405
 
1296
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1406
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
 
1407
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1408
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1299
1409
    return(-1);
1300
1410
 
1301
1411
  write_meta_file(create_file, 0, false);
1302
 
  internal::my_close(create_file, MYF(0));
 
1412
  my_close(create_file, MYF(0));
1303
1413
 
1304
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1414
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
 
1415
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1416
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1307
1417
    return(-1);
1308
1418
 
1309
 
  internal::my_close(create_file, MYF(0));
 
1419
  my_close(create_file, MYF(0));
1310
1420
 
1311
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
 
1421
  pthread_mutex_lock(&proto_cache_mutex);
 
1422
  proto_cache.insert(make_pair(table_name, create_proto));
 
1423
  pthread_mutex_unlock(&proto_cache_mutex);
1312
1424
 
1313
1425
  return 0;
1314
1426
}
1323
1435
  "CSV storage engine",
1324
1436
  PLUGIN_LICENSE_GPL,
1325
1437
  tina_init_func, /* Plugin Init */
1326
 
  NULL,                       /* depends */
 
1438
  tina_done_func, /* Plugin Deinit */
 
1439
  NULL,                       /* status variables                */
 
1440
  NULL,                       /* system variables                */
1327
1441
  NULL                        /* config options                  */
1328
1442
}
1329
1443
DRIZZLE_DECLARE_PLUGIN_END;