~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

  • Committer: Brian Aker
  • Date: 2009-12-08 23:37:35 UTC
  • mfrom: (1192.3.82 pandora-build)
  • mto: This revision was merged to the branch mainline in revision 1241.
  • Revision ID: brian@gaz-20091208233735-6zfhecyxizlw8bub
Merge range fix

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
40
40
 
41
41
 -Brian
42
42
*/
43
 
#include "config.h"
 
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
 
46
#include <drizzled/field/timestamp.h>
46
47
#include <drizzled/error.h>
47
48
#include <drizzled/table.h>
48
49
#include <drizzled/session.h>
49
 
#include "drizzled/internal/my_sys.h"
50
50
 
51
51
#include "ha_tina.h"
52
52
 
53
 
#include <fcntl.h>
54
 
 
55
 
#include <algorithm>
56
 
#include <vector>
57
53
#include <string>
58
54
#include <map>
59
55
 
60
56
using namespace std;
61
 
using namespace drizzled;
62
57
 
63
58
/*
64
59
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
74
69
#define CSM_EXT ".CSM"               // Meta file
75
70
 
76
71
 
77
 
static int read_meta_file(int meta_file, ha_rows *rows);
78
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
 
72
static int read_meta_file(File meta_file, ha_rows *rows);
 
73
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
74
 
 
75
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
76
extern "C" void tina_update_status(void* param);
 
77
extern "C" bool tina_check_status(void* param);
79
78
 
80
79
/* Stuff for shares */
81
80
pthread_mutex_t tina_mutex;
85
84
 *****************************************************************************/
86
85
 
87
86
/*
 
87
  Used for sorting chains with qsort().
 
88
*/
 
89
static int sort_set (tina_set *a, tina_set *b)
 
90
{
 
91
  /*
 
92
    We assume that intervals do not intersect. So, it is enought to compare
 
93
    any two points. Here we take start of intervals for comparison.
 
94
  */
 
95
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
96
}
 
97
 
 
98
 
 
99
/*
88
100
  If frm_error() is called in table.cc this is called to find out what file
89
101
  extensions exist for this Cursor.
90
102
*/
103
115
   : drizzled::plugin::StorageEngine(name_arg,
104
116
                                     HTON_TEMPORARY_ONLY |
105
117
                                     HTON_NO_AUTO_INCREMENT |
106
 
                                     HTON_SKIP_STORE_LOCK),
 
118
                                     HTON_HAS_DATA_DICTIONARY |
 
119
                                     HTON_SKIP_STORE_LOCK |
 
120
                                     HTON_FILE_BASED),
107
121
    tina_open_tables()
108
122
  {}
109
 
  virtual ~Tina()
110
 
  {
111
 
    pthread_mutex_destroy(&tina_mutex);
112
 
  }
113
 
 
114
 
  virtual Cursor *create(Table &table)
115
 
  {
116
 
    return new ha_tina(*this, table);
 
123
  virtual Cursor *create(TableShare &table,
 
124
                          MEM_ROOT *mem_root)
 
125
  {
 
126
    return new (mem_root) ha_tina(*this, table);
117
127
  }
118
128
 
119
129
  const char **bas_ext() const {
120
130
    return ha_tina_exts;
121
131
  }
122
132
 
123
 
  int doCreateTable(Session &,
124
 
                    Table &table_arg,
125
 
                    const drizzled::identifier::Table &identifier,
 
133
  int doCreateTable(Session *,
 
134
                    const char *table_name,
 
135
                    Table& table_arg,
126
136
                    drizzled::message::Table&);
127
137
 
128
138
  int doGetTableDefinition(Session& session,
129
 
                           const drizzled::identifier::Table &identifier,
130
 
                           drizzled::message::Table &table_message);
131
 
 
132
 
  int doDropTable(Session&, const drizzled::identifier::Table &identifier);
 
139
                           const char* path,
 
140
                           const char *db,
 
141
                           const char *table_name,
 
142
                           const bool is_tmp,
 
143
                           drizzled::message::Table *table_proto);
 
144
 
 
145
  /* Temp only engine, so do not return values. */
 
146
  void doGetTableNames(CachedDirectory &, string& , set<string>&) { };
 
147
 
 
148
  int doDropTable(Session&, const string table_path);
133
149
  TinaShare *findOpenTable(const string table_name);
134
150
  void addOpenTable(const string &table_name, TinaShare *);
135
151
  void deleteOpenTable(const string &table_name);
138
154
  uint32_t max_keys()          const { return 0; }
139
155
  uint32_t max_key_parts()     const { return 0; }
140
156
  uint32_t max_key_length()    const { return 0; }
141
 
  bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
 
  int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
143
 
 
144
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
 
                             const drizzled::identifier::Schema &schema_identifier,
146
 
                             drizzled::identifier::Table::vector &set_of_identifiers);
147
157
};
148
158
 
149
 
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
 
                                 const drizzled::identifier::Schema&,
151
 
                                 drizzled::identifier::Table::vector&)
152
 
{
153
 
}
154
 
 
155
 
int Tina::doRenameTable(Session &session,
156
 
                        const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
157
 
{
158
 
  int error= 0;
159
 
  for (const char **ext= bas_ext(); *ext ; ext++)
160
 
  {
161
 
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
162
 
    {
163
 
      if ((error=errno) != ENOENT)
164
 
        break;
165
 
      error= 0;
166
 
    }
167
 
  }
168
 
 
169
 
  session.getMessageCache().renameTableMessage(from, to);
170
 
 
171
 
  return error;
172
 
}
173
 
 
174
 
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
175
 
{
176
 
  return session.getMessageCache().doesTableMessageExist(identifier);
177
 
}
178
 
 
179
 
 
180
 
int Tina::doDropTable(Session &session,
181
 
                      const drizzled::identifier::Table &identifier)
 
159
int Tina::doDropTable(Session&,
 
160
                        const string table_path)
182
161
{
183
162
  int error= 0;
184
163
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
 
164
  char buff[FN_REFLEN];
 
165
  ProtoCache::iterator iter;
185
166
 
186
167
  for (const char **ext= bas_ext(); *ext ; ext++)
187
168
  {
188
 
    std::string full_name= identifier.getPath();
189
 
    full_name.append(*ext);
190
 
 
191
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
 
169
    fn_format(buff, table_path.c_str(), "", *ext,
 
170
              MY_UNPACK_FILENAME|MY_APPEND_EXT);
 
171
    if (my_delete_with_symlink(buff, MYF(0)))
192
172
    {
193
 
      if ((error= errno) != ENOENT)
 
173
      if ((error= my_errno) != ENOENT)
194
174
        break;
195
175
    }
196
176
    else
197
 
    {
198
177
      enoent_or_zero= 0;                        // No error for ENOENT
199
 
    }
200
178
    error= enoent_or_zero;
201
179
  }
202
180
 
203
 
  session.getMessageCache().removeTableMessage(identifier);
 
181
  pthread_mutex_lock(&proto_cache_mutex);
 
182
  iter= proto_cache.find(table_path.c_str());
 
183
 
 
184
  if (iter!= proto_cache.end())
 
185
    proto_cache.erase(iter);
 
186
  pthread_mutex_unlock(&proto_cache_mutex);
204
187
 
205
188
  return error;
206
189
}
227
210
}
228
211
 
229
212
 
230
 
int Tina::doGetTableDefinition(Session &session,
231
 
                               const drizzled::identifier::Table &identifier,
232
 
                               drizzled::message::Table &table_message)
 
213
int Tina::doGetTableDefinition(Session&,
 
214
                               const char* path,
 
215
                               const char *,
 
216
                               const char *,
 
217
                               const bool,
 
218
                               drizzled::message::Table *table_proto)
233
219
{
234
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
235
 
    return EEXIST;
236
 
 
237
 
  return ENOENT;
 
220
  int error= ENOENT;
 
221
  ProtoCache::iterator iter;
 
222
 
 
223
  pthread_mutex_lock(&proto_cache_mutex);
 
224
  iter= proto_cache.find(path);
 
225
 
 
226
  if (iter!= proto_cache.end())
 
227
  {
 
228
    if (table_proto)
 
229
      table_proto->CopyFrom(((*iter).second));
 
230
    error= EEXIST;
 
231
  }
 
232
  pthread_mutex_unlock(&proto_cache_mutex);
 
233
 
 
234
  return error;
238
235
}
239
236
 
240
237
 
241
238
static Tina *tina_engine= NULL;
242
239
 
243
 
static int tina_init_func(drizzled::module::Context &context)
 
240
static int tina_init_func(drizzled::plugin::Registry &registry)
244
241
{
245
242
 
246
243
  tina_engine= new Tina("CSV");
247
 
  context.add(tina_engine);
 
244
  registry.add(tina_engine);
248
245
 
249
246
  pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
250
247
  return 0;
251
248
}
252
249
 
253
 
 
254
 
 
255
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
256
 
  table_name(table_name_arg),
257
 
  data_file_name(table_name_arg),
258
 
  use_count(0),
259
 
  saved_data_file_length(0),
260
 
  update_file_opened(false),
261
 
  tina_write_opened(false),
262
 
  crashed(false),
263
 
  rows_recorded(0),
264
 
  data_file_version(0)
265
 
{
266
 
  data_file_name.append(CSV_EXT);
 
250
static int tina_done_func(drizzled::plugin::Registry &registry)
 
251
{
 
252
  registry.remove(tina_engine);
 
253
  delete tina_engine;
 
254
 
 
255
  pthread_mutex_destroy(&tina_mutex);
 
256
 
 
257
  return 0;
 
258
}
 
259
 
 
260
 
 
261
TinaShare::TinaShare(const char *table_name_arg)
 
262
  : table_name(table_name_arg), use_count(0), saved_data_file_length(0),
 
263
    update_file_opened(false), tina_write_opened(false),
 
264
    crashed(false), rows_recorded(0), data_file_version(0)
 
265
{
 
266
  thr_lock_init(&lock);
 
267
  fn_format(data_file_name, table_name_arg, "", CSV_EXT,
 
268
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);
267
269
}
268
270
 
269
271
TinaShare::~TinaShare()
270
272
{
 
273
  thr_lock_delete(&lock);
271
274
  pthread_mutex_destroy(&mutex);
272
275
}
273
276
 
274
277
/*
275
278
  Simple lock controls.
276
279
*/
277
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
280
TinaShare *ha_tina::get_share(const char *table_name)
278
281
{
279
282
  pthread_mutex_lock(&tina_mutex);
280
283
 
281
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
 
284
  Tina *a_tina= static_cast<Tina *>(engine);
282
285
  share= a_tina->findOpenTable(table_name);
283
286
 
284
 
  std::string meta_file_name;
 
287
  char meta_file_name[FN_REFLEN];
285
288
  struct stat file_stat;
286
289
 
287
290
  /*
298
301
      return NULL;
299
302
    }
300
303
 
301
 
    meta_file_name.assign(table_name);
302
 
    meta_file_name.append(CSM_EXT);
 
304
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
305
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
303
306
 
304
 
    if (stat(share->data_file_name.c_str(), &file_stat))
 
307
    if (stat(share->data_file_name, &file_stat))
305
308
    {
306
309
      pthread_mutex_unlock(&tina_mutex);
307
310
      delete share;
320
323
      Usually this will result in auto-repair, and we will get a good
321
324
      meta-file in the end.
322
325
    */
323
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
324
 
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
 
326
    if ((share->meta_file= my_open(meta_file_name,
 
327
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
325
328
      share->crashed= true;
326
329
 
327
330
    /*
357
360
    non-zero - error occurred
358
361
*/
359
362
 
360
 
static int read_meta_file(int meta_file, ha_rows *rows)
 
363
static int read_meta_file(File meta_file, ha_rows *rows)
361
364
{
362
365
  unsigned char meta_buffer[META_BUFFER_SIZE];
363
366
  unsigned char *ptr= meta_buffer;
364
367
 
365
368
  lseek(meta_file, 0, SEEK_SET);
366
 
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
369
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
367
370
      != META_BUFFER_SIZE)
368
371
    return(HA_ERR_CRASHED_ON_USAGE);
369
372
 
385
388
      ((bool)(*ptr)== true))
386
389
    return(HA_ERR_CRASHED_ON_USAGE);
387
390
 
388
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
391
  my_sync(meta_file, MYF(MY_WME));
389
392
 
390
393
  return(0);
391
394
}
410
413
    non-zero - error occurred
411
414
*/
412
415
 
413
 
static int write_meta_file(int meta_file, ha_rows rows, bool dirty)
 
416
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
414
417
{
415
418
  unsigned char meta_buffer[META_BUFFER_SIZE];
416
419
  unsigned char *ptr= meta_buffer;
430
433
  *ptr= (unsigned char)dirty;
431
434
 
432
435
  lseek(meta_file, 0, SEEK_SET);
433
 
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
436
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
434
437
      != META_BUFFER_SIZE)
435
438
    return(-1);
436
439
 
437
 
  internal::my_sync(meta_file, MYF(MY_WME));
 
440
  my_sync(meta_file, MYF(MY_WME));
438
441
 
439
442
  return(0);
440
443
}
449
452
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
450
453
 
451
454
  if ((share->tina_write_filedes=
452
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
455
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
453
456
  {
454
457
    share->crashed= true;
455
458
    return(1);
471
474
    /* Write the meta file. Mark it as crashed if needed. */
472
475
    (void)write_meta_file(share->meta_file, share->rows_recorded,
473
476
                          share->crashed ? true :false);
474
 
    if (internal::my_close(share->meta_file, MYF(0)))
 
477
    if (my_close(share->meta_file, MYF(0)))
475
478
      result_code= 1;
476
479
    if (share->tina_write_opened)
477
480
    {
478
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
481
      if (my_close(share->tina_write_filedes, MYF(0)))
479
482
        result_code= 1;
480
483
      share->tina_write_opened= false;
481
484
    }
482
485
 
483
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
 
486
    Tina *a_tina= static_cast<Tina *>(engine);
484
487
    a_tina->deleteOpenTable(share->table_name);
485
488
    delete share;
486
489
  }
529
532
 
530
533
 
531
534
 
532
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
535
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
533
536
  :Cursor(engine_arg, table_arg),
534
537
  /*
535
538
    These definitions are found in Cursor.h
536
539
    They are not probably completely right.
537
540
  */
538
541
  current_position(0), next_position(0), local_saved_data_file_length(0),
539
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
542
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
543
  local_data_file_version(0), records_is_known(0)
540
544
{
541
545
  /* Set our original buffers from pre-allocated memory */
542
546
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
547
  chain= chain_buffer;
543
548
  file_buff= new Transparent_file();
544
549
}
545
550
 
556
561
 
557
562
  buffer.length(0);
558
563
 
559
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
564
  for (Field **field=table->field ; *field ; field++)
560
565
  {
561
566
    const char *ptr;
562
567
    const char *end_ptr;
596
601
        {
597
602
          buffer.append('\\');
598
603
          buffer.append('"');
599
 
          (void) *ptr++;
 
604
          *ptr++;
600
605
        }
601
606
        else if (*ptr == '\r')
602
607
        {
603
608
          buffer.append('\\');
604
609
          buffer.append('r');
605
 
          (void) *ptr++;
 
610
          *ptr++;
606
611
        }
607
612
        else if (*ptr == '\\')
608
613
        {
609
614
          buffer.append('\\');
610
615
          buffer.append('\\');
611
 
          (void) *ptr++;
 
616
          *ptr++;
612
617
        }
613
618
        else if (*ptr == '\n')
614
619
        {
615
620
          buffer.append('\\');
616
621
          buffer.append('n');
617
 
          (void) *ptr++;
 
622
          *ptr++;
618
623
        }
619
624
        else
620
625
          buffer.append(*ptr++);
644
649
*/
645
650
int ha_tina::chain_append()
646
651
{
647
 
  if (chain.size() > 0 && chain.back().second == current_position)
648
 
    chain.back().second= next_position;
 
652
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
653
    (chain_ptr -1)->end= next_position;
649
654
  else
650
 
    chain.push_back(make_pair(current_position, next_position));
 
655
  {
 
656
    /* We set up for the next position */
 
657
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
658
    {
 
659
      off_t location= chain_ptr - chain;
 
660
      chain_size += DEFAULT_CHAIN_LENGTH;
 
661
      if (chain_alloced)
 
662
      {
 
663
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
664
          return -1;
 
665
      }
 
666
      else
 
667
      {
 
668
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
669
        if (ptr == NULL)
 
670
          return -1;
 
671
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
672
        chain= ptr;
 
673
        chain_alloced++;
 
674
      }
 
675
      chain_ptr= chain + location;
 
676
    }
 
677
    chain_ptr->begin= current_position;
 
678
    chain_ptr->end= next_position;
 
679
    chain_ptr++;
 
680
  }
 
681
 
651
682
  return 0;
652
683
}
653
684
 
661
692
  int eoln_len;
662
693
  int error;
663
694
 
664
 
  blobroot.free_root(MYF(drizzled::memory::MARK_BLOCKS_FREE));
 
695
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
665
696
 
666
697
  /*
667
698
    We do not read further then local_saved_data_file_length in order
674
705
 
675
706
  error= HA_ERR_CRASHED_ON_USAGE;
676
707
 
677
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
708
  memset(buf, 0, table->s->null_bytes);
678
709
 
679
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
710
  for (Field **field=table->field ; *field ; field++)
680
711
  {
681
712
    char curr_char;
682
713
 
745
776
    {
746
777
      /* This masks a bug in the logic for a SELECT * */
747
778
      (*field)->setWriteSet();
748
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
749
 
      {
 
779
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
780
                          CHECK_FIELD_WARN))
750
781
        goto err;
751
 
      }
752
782
 
753
783
      if ((*field)->flags & BLOB_FLAG)
754
784
      {
761
791
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
762
792
        if (src)
763
793
        {
764
 
          tgt= (unsigned char*) blobroot.alloc_root(length);
 
794
          tgt= (unsigned char*) alloc_root(&blobroot, length);
765
795
          memmove(tgt, src, length);
766
796
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
767
797
        }
777
807
}
778
808
 
779
809
/*
 
810
  Three functions below are needed to enable concurrent insert functionality
 
811
  for CSV engine. For more details see mysys/thr_lock.c
 
812
*/
 
813
 
 
814
void tina_get_status(void* param, int)
 
815
{
 
816
  ha_tina *tina= (ha_tina*) param;
 
817
  tina->get_status();
 
818
}
 
819
 
 
820
void tina_update_status(void* param)
 
821
{
 
822
  ha_tina *tina= (ha_tina*) param;
 
823
  tina->update_status();
 
824
}
 
825
 
 
826
/* this should exist and return 0 for concurrent insert to work */
 
827
bool tina_check_status(void *)
 
828
{
 
829
  return 0;
 
830
}
 
831
 
 
832
/*
 
833
  Save the state of the table
 
834
 
 
835
  SYNOPSIS
 
836
    get_status()
 
837
 
 
838
  DESCRIPTION
 
839
    This function is used to retrieve the file length. During the lock
 
840
    phase of concurrent insert. For more details see comment to
 
841
    ha_tina::update_status below.
 
842
*/
 
843
 
 
844
void ha_tina::get_status()
 
845
{
 
846
  local_saved_data_file_length= share->saved_data_file_length;
 
847
}
 
848
 
 
849
 
 
850
/*
 
851
  Correct the state of the table. Called by unlock routines
 
852
  before the write lock is released.
 
853
 
 
854
  SYNOPSIS
 
855
    update_status()
 
856
 
 
857
  DESCRIPTION
 
858
    When we employ concurrent insert lock, we save current length of the file
 
859
    during the lock phase. We do not read further saved value, as we don't
 
860
    want to interfere with undergoing concurrent insert. Writers update file
 
861
    length info during unlock with update_status().
 
862
 
 
863
  NOTE
 
864
    For log tables concurrent insert works different. The reason is that
 
865
    log tables are always opened and locked. And as they do not unlock
 
866
    tables, the file length after writes should be updated in a different
 
867
    way.
 
868
*/
 
869
 
 
870
void ha_tina::update_status()
 
871
{
 
872
  /* correct local_saved_data_file_length for writers */
 
873
  share->saved_data_file_length= local_saved_data_file_length;
 
874
}
 
875
 
 
876
 
 
877
/*
780
878
  Open a database file. Keep in mind that tables are caches, so
781
879
  this will not be called for every request. Any sort of positions
782
880
  that need to be reset should be kept in the ::extra() call.
783
881
*/
784
 
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
 
882
int ha_tina::open(const char *name, int, uint32_t)
785
883
{
786
 
  if (not (share= get_share(identifier.getPath().c_str())))
 
884
  if (!(share= get_share(name)))
787
885
    return(ENOENT);
788
886
 
789
887
  if (share->crashed)
793
891
  }
794
892
 
795
893
  local_data_file_version= share->data_file_version;
796
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
894
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
797
895
    return(0);
798
896
 
799
897
  /*
801
899
    so that they could save/update local_saved_data_file_length value
802
900
    during locking. This is needed to enable concurrent inserts.
803
901
  */
 
902
  thr_lock_data_init(&share->lock, &lock, (void*) this);
804
903
  ref_length=sizeof(off_t);
805
904
 
 
905
  share->lock.get_status= tina_get_status;
 
906
  share->lock.update_status= tina_update_status;
 
907
  share->lock.check_status= tina_check_status;
 
908
 
806
909
  return(0);
807
910
}
808
911
 
 
912
 
809
913
/*
810
914
  Close a database file. We remove ourselves from the shared strucutre.
811
915
  If it is empty we destroy it.
813
917
int ha_tina::close(void)
814
918
{
815
919
  int rc= 0;
816
 
  rc= internal::my_close(data_file, MYF(0));
 
920
  rc= my_close(data_file, MYF(0));
817
921
  return(free_share() || rc);
818
922
}
819
923
 
822
926
  of the file and appends the data. In an error case it really should
823
927
  just truncate to the original position (this is not done yet).
824
928
*/
825
 
int ha_tina::doInsertRecord(unsigned char * buf)
 
929
int ha_tina::write_row(unsigned char * buf)
826
930
{
827
931
  int size;
828
932
 
829
933
  if (share->crashed)
830
934
      return(HA_ERR_CRASHED_ON_USAGE);
831
935
 
 
936
  ha_statistic_increment(&SSV::ha_write_count);
 
937
 
832
938
  size= encode_quote(buf);
833
939
 
834
940
  if (!share->tina_write_opened)
836
942
      return(-1);
837
943
 
838
944
   /* use pwrite, as concurrent reader could have changed the position */
839
 
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
945
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
840
946
               MYF(MY_WME | MY_NABP)))
841
947
    return(-1);
842
948
 
861
967
  if (!share->update_file_opened)
862
968
  {
863
969
    if ((update_temp_file=
864
 
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
 
970
           my_create(fn_format(updated_fname, share->table_name.c_str(),
865
971
                               "", CSN_EXT,
866
972
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
973
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
880
986
  This will be called in a table scan right before the previous ::rnd_next()
881
987
  call.
882
988
*/
883
 
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
 
989
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
884
990
{
885
991
  int size;
886
992
  int rc= -1;
887
993
 
 
994
  ha_statistic_increment(&SSV::ha_update_count);
 
995
 
 
996
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
997
    table->timestamp_field->set_time();
 
998
 
888
999
  size= encode_quote(new_data);
889
1000
 
890
1001
  /*
891
1002
    During update we mark each updating record as deleted
892
1003
    (see the chain_append()) then write new one to the temporary data file.
893
 
    At the end of the sequence in the doEndTableScan() we append all non-marked
 
1004
    At the end of the sequence in the rnd_end() we append all non-marked
894
1005
    records from the data file to the temporary data file then rename it.
895
1006
    The temp_file_length is used to calculate new data file length.
896
1007
  */
900
1011
  if (open_update_temp_file_if_needed())
901
1012
    goto err;
902
1013
 
903
 
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
1014
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
904
1015
               MYF(MY_WME | MY_NABP)))
905
1016
    goto err;
906
1017
  temp_file_length+= size;
920
1031
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
921
1032
  DESC, ASC).
922
1033
*/
923
 
int ha_tina::doDeleteRecord(const unsigned char *)
 
1034
int ha_tina::delete_row(const unsigned char *)
924
1035
{
 
1036
  ha_statistic_increment(&SSV::ha_delete_count);
925
1037
 
926
1038
  if (chain_append())
927
1039
    return(-1);
955
1067
  if (local_data_file_version != share->data_file_version)
956
1068
  {
957
1069
    local_data_file_version= share->data_file_version;
958
 
    if (internal::my_close(data_file, MYF(0)) ||
959
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
1070
    if (my_close(data_file, MYF(0)) ||
 
1071
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
960
1072
      return 1;
961
1073
  }
962
1074
  file_buff->init_buff(data_file);
992
1104
 
993
1105
*/
994
1106
 
995
 
int ha_tina::doStartTableScan(bool)
 
1107
int ha_tina::rnd_init(bool)
996
1108
{
997
1109
  /* set buffer to the beginning of the file */
998
1110
  if (share->crashed || init_data_file())
1001
1113
  current_position= next_position= 0;
1002
1114
  stats.records= 0;
1003
1115
  records_is_known= 0;
1004
 
  chain.clear();
 
1116
  chain_ptr= chain;
1005
1117
 
1006
 
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
 
1118
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1007
1119
 
1008
1120
  return(0);
1009
1121
}
1029
1141
  if (share->crashed)
1030
1142
      return(HA_ERR_CRASHED_ON_USAGE);
1031
1143
 
1032
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
1144
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1033
1145
 
1034
1146
  current_position= next_position;
1035
1147
 
1055
1167
*/
1056
1168
void ha_tina::position(const unsigned char *)
1057
1169
{
1058
 
  internal::my_store_ptr(ref, ref_length, current_position);
 
1170
  my_store_ptr(ref, ref_length, current_position);
1059
1171
  return;
1060
1172
}
1061
1173
 
1062
1174
 
1063
1175
/*
1064
1176
  Used to fetch a row from a posiion stored with ::position().
1065
 
  internal::my_get_ptr() retrieves the data for you.
 
1177
  my_get_ptr() retrieves the data for you.
1066
1178
*/
1067
1179
 
1068
1180
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1069
1181
{
1070
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
 
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
 
1182
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1183
  current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1184
  return(find_current_row(buf));
1073
1185
}
1074
1186
 
1090
1202
  to the given "hole", stored in the buffer. "Valid" here means,
1091
1203
  not listed in the chain of deleted records ("holes").
1092
1204
*/
1093
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1205
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1094
1206
{
1095
 
  if (closest_hole == chain.end()) /* no more chains */
 
1207
  if (closest_hole == chain_ptr) /* no more chains */
1096
1208
    *end_pos= file_buff->end();
1097
1209
  else
1098
1210
    *end_pos= std::min(file_buff->end(),
1099
 
                       closest_hole->first);
1100
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1211
                       closest_hole->begin);
 
1212
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1101
1213
}
1102
1214
 
1103
1215
 
1107
1219
  slots to clean up all of the dead space we have collected while
1108
1220
  performing deletes/updates.
1109
1221
*/
1110
 
int ha_tina::doEndTableScan()
 
1222
int ha_tina::rnd_end()
1111
1223
{
 
1224
  char updated_fname[FN_REFLEN];
1112
1225
  off_t file_buffer_start= 0;
1113
1226
 
1114
 
  blobroot.free_root(MYF(0));
 
1227
  free_root(&blobroot, MYF(0));
1115
1228
  records_is_known= 1;
1116
1229
 
1117
 
  if (chain.size() > 0)
 
1230
  if ((chain_ptr - chain)  > 0)
1118
1231
  {
1119
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1232
    tina_set *ptr= chain;
1120
1233
 
1121
1234
    /*
1122
1235
      Re-read the beginning of a file (as the buffer should point to the
1128
1241
      The sort is needed when there were updates/deletes with random orders.
1129
1242
      It sorts so that we move the firts blocks to the beginning.
1130
1243
    */
1131
 
    sort(chain.begin(), chain.end());
 
1244
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1245
             (qsort_cmp)sort_set);
1132
1246
 
1133
1247
    off_t write_begin= 0, write_end;
1134
1248
 
1149
1263
      /* if there is something to write, write it */
1150
1264
      if (write_length)
1151
1265
      {
1152
 
        if (internal::my_write(update_temp_file,
 
1266
        if (my_write(update_temp_file,
1153
1267
                     (unsigned char*) (file_buff->ptr() +
1154
1268
                               (write_begin - file_buff->start())),
1155
1269
                     (size_t)write_length, MYF_RW))
1159
1273
      if (in_hole)
1160
1274
      {
1161
1275
        /* skip hole */
1162
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1276
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1163
1277
          file_buffer_start= file_buff->read_next();
1164
 
        write_begin= ptr->second;
1165
 
        ++ptr;
 
1278
        write_begin= ptr->end;
 
1279
        ptr++;
1166
1280
      }
1167
1281
      else
1168
1282
        write_begin= write_end;
1172
1286
 
1173
1287
    }
1174
1288
 
1175
 
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
 
        internal::my_close(update_temp_file, MYF(0)))
 
1289
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1290
        my_close(update_temp_file, MYF(0)))
1177
1291
      return(-1);
1178
1292
 
1179
1293
    share->update_file_opened= false;
1180
1294
 
1181
1295
    if (share->tina_write_opened)
1182
1296
    {
1183
 
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
 
1297
      if (my_close(share->tina_write_filedes, MYF(0)))
1184
1298
        return(-1);
1185
1299
      /*
1186
1300
        Mark that the writer fd is closed, so that init_tina_writer()
1193
1307
      Close opened fildes's. Then move updated file in place
1194
1308
      of the old datafile.
1195
1309
    */
1196
 
    std::string rename_file= share->table_name;
1197
 
    rename_file.append(CSN_EXT);
1198
 
    if (internal::my_close(data_file, MYF(0)) ||
1199
 
        internal::my_rename(rename_file.c_str(),
1200
 
                            share->data_file_name.c_str(), MYF(0)))
 
1310
    if (my_close(data_file, MYF(0)) ||
 
1311
        my_rename(fn_format(updated_fname, share->table_name.c_str(),
 
1312
                            "", CSN_EXT,
 
1313
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1314
                  share->data_file_name, MYF(0)))
1201
1315
      return(-1);
1202
1316
 
1203
1317
    /* Open the file again */
1204
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1318
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1205
1319
      return(-1);
1206
1320
    /*
1207
1321
      As we reopened the data file, increase share->data_file_version
1228
1342
 
1229
1343
  return(0);
1230
1344
error:
1231
 
  internal::my_close(update_temp_file, MYF(0));
 
1345
  my_close(update_temp_file, MYF(0));
1232
1346
  share->update_file_opened= false;
1233
1347
  return(-1);
1234
1348
}
1243
1357
  int rc;
1244
1358
 
1245
1359
  if (!records_is_known)
1246
 
    return(errno=HA_ERR_WRONG_COMMAND);
 
1360
    return(my_errno=HA_ERR_WRONG_COMMAND);
1247
1361
 
1248
1362
  if (!share->tina_write_opened)
1249
1363
    if (init_tina_writer())
1266
1380
  this (the database will call ::open() if it needs to).
1267
1381
*/
1268
1382
 
1269
 
int Tina::doCreateTable(Session &session,
 
1383
int Tina::doCreateTable(Session *, const char *table_name,
1270
1384
                        Table& table_arg,
1271
 
                        const drizzled::identifier::Table &identifier,
1272
 
                        drizzled::message::Table &create_proto)
 
1385
                        drizzled::message::Table& create_proto)
1273
1386
{
1274
1387
  char name_buff[FN_REFLEN];
1275
 
  int create_file;
 
1388
  File create_file;
1276
1389
 
1277
1390
  /*
1278
1391
    check columns
1279
1392
  */
1280
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
 
       iter != fields.end();
1283
 
       iter++)
 
1393
  for (Field **field= table_arg.s->field; *field; field++)
1284
1394
  {
1285
 
    if (not *iter) // Historical legacy for NULL array end.
1286
 
      continue;
1287
 
 
1288
 
    if ((*iter)->real_maybe_null())
 
1395
    if ((*field)->real_maybe_null())
1289
1396
    {
1290
1397
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1398
      return(HA_ERR_UNSUPPORTED);
1293
1400
  }
1294
1401
 
1295
1402
 
1296
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1403
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
 
1404
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1405
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1299
1406
    return(-1);
1300
1407
 
1301
1408
  write_meta_file(create_file, 0, false);
1302
 
  internal::my_close(create_file, MYF(0));
 
1409
  my_close(create_file, MYF(0));
1303
1410
 
1304
 
  if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
 
                                                            MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
 
                                        O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1411
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
 
1412
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1413
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1307
1414
    return(-1);
1308
1415
 
1309
 
  internal::my_close(create_file, MYF(0));
 
1416
  my_close(create_file, MYF(0));
1310
1417
 
1311
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
 
1418
  pthread_mutex_lock(&proto_cache_mutex);
 
1419
  proto_cache.insert(make_pair(table_name, create_proto));
 
1420
  pthread_mutex_unlock(&proto_cache_mutex);
1312
1421
 
1313
1422
  return 0;
1314
1423
}
1316
1425
 
1317
1426
DRIZZLE_DECLARE_PLUGIN
1318
1427
{
1319
 
  DRIZZLE_VERSION_ID,
1320
1428
  "CSV",
1321
1429
  "1.0",
1322
1430
  "Brian Aker, MySQL AB",
1323
1431
  "CSV storage engine",
1324
1432
  PLUGIN_LICENSE_GPL,
1325
1433
  tina_init_func, /* Plugin Init */
1326
 
  NULL,                       /* depends */
 
1434
  tina_done_func, /* Plugin Deinit */
 
1435
  NULL,                       /* status variables                */
 
1436
  NULL,                       /* system variables                */
1327
1437
  NULL                        /* config options                  */
1328
1438
}
1329
1439
DRIZZLE_DECLARE_PLUGIN_END;