~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

  • Committer: Monty Taylor
  • Date: 2010-06-02 22:35:45 UTC
  • mto: This revision was merged to the branch mainline in revision 1586.
  • Revision ID: mordred@inaugust.com-20100602223545-q8ekf9b40a85nwuf
Rearragned unittests into a single exe because of how we need to link it
(thanks lifeless)
Link with server symbols without needing to build a library.
Added an additional atomics test which tests whatever version of the atomics
lib the running platform would actually use.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
  You should have received a copy of the GNU General Public License
13
13
  along with this program; if not, write to the Free Software
14
 
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Make sure to look at ha_tina.h for more details.
53
53
 
54
54
#include <fcntl.h>
55
55
 
56
 
#include <algorithm>
57
 
#include <vector>
58
56
#include <string>
59
57
#include <map>
60
58
 
78
76
static int read_meta_file(int meta_file, ha_rows *rows);
79
77
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
80
78
 
 
79
void tina_get_status(void* param, int concurrent_insert);
 
80
void tina_update_status(void* param);
 
81
bool tina_check_status(void* param);
 
82
 
81
83
/* Stuff for shares */
82
84
pthread_mutex_t tina_mutex;
83
85
 
86
88
 *****************************************************************************/
87
89
 
88
90
/*
 
91
  Used for sorting chains with qsort().
 
92
*/
 
93
static int sort_set (tina_set *a, tina_set *b)
 
94
{
 
95
  /*
 
96
    We assume that intervals do not intersect. So, it is enought to compare
 
97
    any two points. Here we take start of intervals for comparison.
 
98
  */
 
99
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
100
}
 
101
 
 
102
 
 
103
/*
89
104
  If frm_error() is called in table.cc this is called to find out what file
90
105
  extensions exist for this Cursor.
91
106
*/
112
127
    pthread_mutex_destroy(&tina_mutex);
113
128
  }
114
129
 
115
 
  virtual Cursor *create(Table &table)
 
130
  virtual Cursor *create(TableShare &table,
 
131
                         drizzled::memory::Root *mem_root)
116
132
  {
117
 
    return new ha_tina(*this, table);
 
133
    return new (mem_root) ha_tina(*this, table);
118
134
  }
119
135
 
120
136
  const char **bas_ext() const {
123
139
 
124
140
  int doCreateTable(Session &,
125
141
                    Table &table_arg,
126
 
                    const drizzled::TableIdentifier &identifier,
 
142
                    drizzled::TableIdentifier &identifier,
127
143
                    drizzled::message::Table&);
128
144
 
129
145
  int doGetTableDefinition(Session& session,
130
 
                           const drizzled::TableIdentifier &identifier,
 
146
                           TableIdentifier &identifier,
131
147
                           drizzled::message::Table &table_message);
132
148
 
133
 
  int doDropTable(Session&, const drizzled::TableIdentifier &identifier);
 
149
  /* Temp only engine, so do not return values. */
 
150
  void doGetTableNames(drizzled::CachedDirectory &, SchemaIdentifier&, set<string>&) { };
 
151
 
 
152
  int doDropTable(Session&, TableIdentifier &identifier);
134
153
  TinaShare *findOpenTable(const string table_name);
135
154
  void addOpenTable(const string &table_name, TinaShare *);
136
155
  void deleteOpenTable(const string &table_name);
139
158
  uint32_t max_keys()          const { return 0; }
140
159
  uint32_t max_key_parts()     const { return 0; }
141
160
  uint32_t max_key_length()    const { return 0; }
142
 
  bool doDoesTableExist(Session& session, const drizzled::TableIdentifier &identifier);
143
 
  int doRenameTable(Session&, const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to);
 
161
  bool doDoesTableExist(Session& session, TableIdentifier &identifier);
 
162
  int doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to);
144
163
 
145
164
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
146
 
                             const drizzled::SchemaIdentifier &schema_identifier,
147
 
                             drizzled::TableIdentifier::vector &set_of_identifiers);
 
165
                             drizzled::SchemaIdentifier &schema_identifier,
 
166
                             drizzled::TableIdentifiers &set_of_identifiers);
148
167
};
149
168
 
150
169
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
151
 
                                 const drizzled::SchemaIdentifier&,
152
 
                                 drizzled::TableIdentifier::vector&)
 
170
                                 drizzled::SchemaIdentifier&,
 
171
                                 drizzled::TableIdentifiers&)
153
172
{
154
173
}
155
174
 
156
175
int Tina::doRenameTable(Session &session,
157
 
                        const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to)
 
176
                        TableIdentifier &from, TableIdentifier &to)
158
177
{
159
178
  int error= 0;
160
179
  for (const char **ext= bas_ext(); *ext ; ext++)
167
186
    }
168
187
  }
169
188
 
170
 
  session.getMessageCache().renameTableMessage(from, to);
 
189
  session.renameTableMessage(from, to);
171
190
 
172
191
  return error;
173
192
}
174
193
 
175
 
bool Tina::doDoesTableExist(Session &session, const drizzled::TableIdentifier &identifier)
 
194
bool Tina::doDoesTableExist(Session &session, TableIdentifier &identifier)
176
195
{
177
 
  return session.getMessageCache().doesTableMessageExist(identifier);
 
196
  return session.doesTableMessageExist(identifier);
178
197
}
179
198
 
180
199
 
181
200
int Tina::doDropTable(Session &session,
182
 
                      const drizzled::TableIdentifier &identifier)
 
201
                      TableIdentifier &identifier)
183
202
{
184
203
  int error= 0;
185
204
  int enoent_or_zero= ENOENT;                   // Error if no file was deleted
 
205
  char buff[FN_REFLEN];
186
206
 
187
207
  for (const char **ext= bas_ext(); *ext ; ext++)
188
208
  {
189
 
    std::string full_name= identifier.getPath();
190
 
    full_name.append(*ext);
191
 
 
192
 
    if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
 
209
    internal::fn_format(buff, identifier.getPath().c_str(), "", *ext,
 
210
                        MY_UNPACK_FILENAME|MY_APPEND_EXT);
 
211
    if (internal::my_delete_with_symlink(buff, MYF(0)))
193
212
    {
194
213
      if ((error= errno) != ENOENT)
195
214
        break;
196
215
    }
197
216
    else
198
 
    {
199
217
      enoent_or_zero= 0;                        // No error for ENOENT
200
 
    }
201
218
    error= enoent_or_zero;
202
219
  }
203
220
 
204
 
  session.getMessageCache().removeTableMessage(identifier);
 
221
  session.removeTableMessage(identifier);
205
222
 
206
223
  return error;
207
224
}
229
246
 
230
247
 
231
248
int Tina::doGetTableDefinition(Session &session,
232
 
                               const drizzled::TableIdentifier &identifier,
 
249
                               drizzled::TableIdentifier &identifier,
233
250
                               drizzled::message::Table &table_message)
234
251
{
235
 
  if (session.getMessageCache().getTableMessage(identifier, table_message))
 
252
  if (session.getTableMessage(identifier, table_message))
236
253
    return EEXIST;
237
254
 
238
255
  return ENOENT;
253
270
 
254
271
 
255
272
 
256
 
TinaShare::TinaShare(const std::string &table_name_arg) : 
257
 
  table_name(table_name_arg),
258
 
  data_file_name(table_name_arg),
259
 
  use_count(0),
260
 
  saved_data_file_length(0),
261
 
  update_file_opened(false),
262
 
  tina_write_opened(false),
263
 
  crashed(false),
264
 
  rows_recorded(0),
265
 
  data_file_version(0)
 
273
TinaShare::TinaShare(const char *table_name_arg)
 
274
  : table_name(table_name_arg), use_count(0), saved_data_file_length(0),
 
275
    update_file_opened(false), tina_write_opened(false),
 
276
    crashed(false), rows_recorded(0), data_file_version(0)
266
277
{
267
 
  data_file_name.append(CSV_EXT);
 
278
  thr_lock_init(&lock);
 
279
  internal::fn_format(data_file_name, table_name_arg, "", CSV_EXT,
 
280
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);
268
281
}
269
282
 
270
283
TinaShare::~TinaShare()
271
284
{
 
285
  thr_lock_delete(&lock);
272
286
  pthread_mutex_destroy(&mutex);
273
287
}
274
288
 
275
289
/*
276
290
  Simple lock controls.
277
291
*/
278
 
TinaShare *ha_tina::get_share(const std::string &table_name)
 
292
TinaShare *ha_tina::get_share(const char *table_name)
279
293
{
280
294
  pthread_mutex_lock(&tina_mutex);
281
295
 
282
 
  Tina *a_tina= static_cast<Tina *>(getEngine());
 
296
  Tina *a_tina= static_cast<Tina *>(engine);
283
297
  share= a_tina->findOpenTable(table_name);
284
298
 
285
 
  std::string meta_file_name;
 
299
  char meta_file_name[FN_REFLEN];
286
300
  struct stat file_stat;
287
301
 
288
302
  /*
299
313
      return NULL;
300
314
    }
301
315
 
302
 
    meta_file_name.assign(table_name);
303
 
    meta_file_name.append(CSM_EXT);
 
316
    internal::fn_format(meta_file_name, table_name, "", CSM_EXT,
 
317
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
304
318
 
305
 
    if (stat(share->data_file_name.c_str(), &file_stat))
 
319
    if (stat(share->data_file_name, &file_stat))
306
320
    {
307
321
      pthread_mutex_unlock(&tina_mutex);
308
322
      delete share;
321
335
      Usually this will result in auto-repair, and we will get a good
322
336
      meta-file in the end.
323
337
    */
324
 
    if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
 
338
    if ((share->meta_file= internal::my_open(meta_file_name,
325
339
                                             O_RDWR|O_CREAT, MYF(0))) == -1)
326
340
      share->crashed= true;
327
341
 
450
464
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
451
465
 
452
466
  if ((share->tina_write_filedes=
453
 
        internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
 
467
        internal::my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
454
468
  {
455
469
    share->crashed= true;
456
470
    return(1);
481
495
      share->tina_write_opened= false;
482
496
    }
483
497
 
484
 
    Tina *a_tina= static_cast<Tina *>(getEngine());
 
498
    Tina *a_tina= static_cast<Tina *>(engine);
485
499
    a_tina->deleteOpenTable(share->table_name);
486
500
    delete share;
487
501
  }
530
544
 
531
545
 
532
546
 
533
 
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
547
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
534
548
  :Cursor(engine_arg, table_arg),
535
549
  /*
536
550
    These definitions are found in Cursor.h
537
551
    They are not probably completely right.
538
552
  */
539
553
  current_position(0), next_position(0), local_saved_data_file_length(0),
540
 
  file_buff(0), local_data_file_version(0), records_is_known(0)
 
554
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
555
  local_data_file_version(0), records_is_known(0)
541
556
{
542
557
  /* Set our original buffers from pre-allocated memory */
543
558
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
559
  chain= chain_buffer;
544
560
  file_buff= new Transparent_file();
545
561
}
546
562
 
557
573
 
558
574
  buffer.length(0);
559
575
 
560
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
576
  for (Field **field=table->field ; *field ; field++)
561
577
  {
562
578
    const char *ptr;
563
579
    const char *end_ptr;
597
613
        {
598
614
          buffer.append('\\');
599
615
          buffer.append('"');
600
 
          (void) *ptr++;
 
616
          *ptr++;
601
617
        }
602
618
        else if (*ptr == '\r')
603
619
        {
604
620
          buffer.append('\\');
605
621
          buffer.append('r');
606
 
          (void) *ptr++;
 
622
          *ptr++;
607
623
        }
608
624
        else if (*ptr == '\\')
609
625
        {
610
626
          buffer.append('\\');
611
627
          buffer.append('\\');
612
 
          (void) *ptr++;
 
628
          *ptr++;
613
629
        }
614
630
        else if (*ptr == '\n')
615
631
        {
616
632
          buffer.append('\\');
617
633
          buffer.append('n');
618
 
          (void) *ptr++;
 
634
          *ptr++;
619
635
        }
620
636
        else
621
637
          buffer.append(*ptr++);
645
661
*/
646
662
int ha_tina::chain_append()
647
663
{
648
 
  if (chain.size() > 0 && chain.back().second == current_position)
649
 
    chain.back().second= next_position;
 
664
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
665
    (chain_ptr -1)->end= next_position;
650
666
  else
651
 
    chain.push_back(make_pair(current_position, next_position));
 
667
  {
 
668
    /* We set up for the next position */
 
669
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
670
    {
 
671
      off_t location= chain_ptr - chain;
 
672
      chain_size += DEFAULT_CHAIN_LENGTH;
 
673
      if (chain_alloced)
 
674
      {
 
675
        if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
 
676
          return -1;
 
677
      }
 
678
      else
 
679
      {
 
680
        tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
 
681
        if (ptr == NULL)
 
682
          return -1;
 
683
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
684
        chain= ptr;
 
685
        chain_alloced++;
 
686
      }
 
687
      chain_ptr= chain + location;
 
688
    }
 
689
    chain_ptr->begin= current_position;
 
690
    chain_ptr->end= next_position;
 
691
    chain_ptr++;
 
692
  }
 
693
 
652
694
  return 0;
653
695
}
654
696
 
675
717
 
676
718
  error= HA_ERR_CRASHED_ON_USAGE;
677
719
 
678
 
  memset(buf, 0, getTable()->getShare()->null_bytes);
 
720
  memset(buf, 0, table->getShare()->null_bytes);
679
721
 
680
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
722
  for (Field **field=table->field ; *field ; field++)
681
723
  {
682
724
    char curr_char;
683
725
 
746
788
    {
747
789
      /* This masks a bug in the logic for a SELECT * */
748
790
      (*field)->setWriteSet();
749
 
      if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
750
 
      {
 
791
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
792
                          CHECK_FIELD_WARN))
751
793
        goto err;
752
 
      }
753
794
 
754
795
      if ((*field)->flags & BLOB_FLAG)
755
796
      {
778
819
}
779
820
 
780
821
/*
 
822
  Three functions below are needed to enable concurrent insert functionality
 
823
  for CSV engine. For more details see mysys/thr_lock.c
 
824
*/
 
825
 
 
826
void tina_get_status(void* param, int)
 
827
{
 
828
  ha_tina *tina= (ha_tina*) param;
 
829
  tina->get_status();
 
830
}
 
831
 
 
832
void tina_update_status(void* param)
 
833
{
 
834
  ha_tina *tina= (ha_tina*) param;
 
835
  tina->update_status();
 
836
}
 
837
 
 
838
/* this should exist and return 0 for concurrent insert to work */
 
839
bool tina_check_status(void *)
 
840
{
 
841
  return 0;
 
842
}
 
843
 
 
844
/*
 
845
  Save the state of the table
 
846
 
 
847
  SYNOPSIS
 
848
    get_status()
 
849
 
 
850
  DESCRIPTION
 
851
    This function is used to retrieve the file length. During the lock
 
852
    phase of concurrent insert. For more details see comment to
 
853
    ha_tina::update_status below.
 
854
*/
 
855
 
 
856
void ha_tina::get_status()
 
857
{
 
858
  local_saved_data_file_length= share->saved_data_file_length;
 
859
}
 
860
 
 
861
 
 
862
/*
 
863
  Correct the state of the table. Called by unlock routines
 
864
  before the write lock is released.
 
865
 
 
866
  SYNOPSIS
 
867
    update_status()
 
868
 
 
869
  DESCRIPTION
 
870
    When we employ concurrent insert lock, we save current length of the file
 
871
    during the lock phase. We do not read further saved value, as we don't
 
872
    want to interfere with undergoing concurrent insert. Writers update file
 
873
    length info during unlock with update_status().
 
874
 
 
875
  NOTE
 
876
    For log tables concurrent insert works different. The reason is that
 
877
    log tables are always opened and locked. And as they do not unlock
 
878
    tables, the file length after writes should be updated in a different
 
879
    way.
 
880
*/
 
881
 
 
882
void ha_tina::update_status()
 
883
{
 
884
  /* correct local_saved_data_file_length for writers */
 
885
  share->saved_data_file_length= local_saved_data_file_length;
 
886
}
 
887
 
 
888
 
 
889
/*
781
890
  Open a database file. Keep in mind that tables are caches, so
782
891
  this will not be called for every request. Any sort of positions
783
892
  that need to be reset should be kept in the ::extra() call.
784
893
*/
785
 
int ha_tina::doOpen(const TableIdentifier &identifier, int , uint32_t )
 
894
int ha_tina::open(const char *name, int, uint32_t)
786
895
{
787
 
  if (not (share= get_share(identifier.getPath().c_str())))
 
896
  if (!(share= get_share(name)))
788
897
    return(ENOENT);
789
898
 
790
899
  if (share->crashed)
794
903
  }
795
904
 
796
905
  local_data_file_version= share->data_file_version;
797
 
  if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
906
  if ((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
798
907
    return(0);
799
908
 
800
909
  /*
802
911
    so that they could save/update local_saved_data_file_length value
803
912
    during locking. This is needed to enable concurrent inserts.
804
913
  */
 
914
  thr_lock_data_init(&share->lock, &lock, (void*) this);
805
915
  ref_length=sizeof(off_t);
806
916
 
 
917
  share->lock.get_status= tina_get_status;
 
918
  share->lock.update_status= tina_update_status;
 
919
  share->lock.check_status= tina_check_status;
 
920
 
807
921
  return(0);
808
922
}
809
923
 
 
924
 
810
925
/*
811
926
  Close a database file. We remove ourselves from the shared strucutre.
812
927
  If it is empty we destroy it.
830
945
  if (share->crashed)
831
946
      return(HA_ERR_CRASHED_ON_USAGE);
832
947
 
 
948
  ha_statistic_increment(&system_status_var::ha_write_count);
 
949
 
833
950
  size= encode_quote(buf);
834
951
 
835
952
  if (!share->tina_write_opened)
886
1003
  int size;
887
1004
  int rc= -1;
888
1005
 
 
1006
  ha_statistic_increment(&system_status_var::ha_update_count);
 
1007
 
 
1008
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
1009
    table->timestamp_field->set_time();
 
1010
 
889
1011
  size= encode_quote(new_data);
890
1012
 
891
1013
  /*
923
1045
*/
924
1046
int ha_tina::doDeleteRecord(const unsigned char *)
925
1047
{
 
1048
  ha_statistic_increment(&system_status_var::ha_delete_count);
926
1049
 
927
1050
  if (chain_append())
928
1051
    return(-1);
957
1080
  {
958
1081
    local_data_file_version= share->data_file_version;
959
1082
    if (internal::my_close(data_file, MYF(0)) ||
960
 
        (data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
 
1083
        (data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
961
1084
      return 1;
962
1085
  }
963
1086
  file_buff->init_buff(data_file);
1002
1125
  current_position= next_position= 0;
1003
1126
  stats.records= 0;
1004
1127
  records_is_known= 0;
1005
 
  chain.clear();
 
1128
  chain_ptr= chain;
1006
1129
 
1007
1130
  blobroot.init_alloc_root(BLOB_MEMROOT_ALLOC_SIZE);
1008
1131
 
1091
1214
  to the given "hole", stored in the buffer. "Valid" here means,
1092
1215
  not listed in the chain of deleted records ("holes").
1093
1216
*/
1094
 
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
 
1217
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1095
1218
{
1096
 
  if (closest_hole == chain.end()) /* no more chains */
 
1219
  if (closest_hole == chain_ptr) /* no more chains */
1097
1220
    *end_pos= file_buff->end();
1098
1221
  else
1099
1222
    *end_pos= std::min(file_buff->end(),
1100
 
                       closest_hole->first);
1101
 
  return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
 
1223
                       closest_hole->begin);
 
1224
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1102
1225
}
1103
1226
 
1104
1227
 
1110
1233
*/
1111
1234
int ha_tina::doEndTableScan()
1112
1235
{
 
1236
  char updated_fname[FN_REFLEN];
1113
1237
  off_t file_buffer_start= 0;
1114
1238
 
1115
1239
  blobroot.free_root(MYF(0));
1116
1240
  records_is_known= 1;
1117
1241
 
1118
 
  if (chain.size() > 0)
 
1242
  if ((chain_ptr - chain)  > 0)
1119
1243
  {
1120
 
    vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
 
1244
    tina_set *ptr= chain;
1121
1245
 
1122
1246
    /*
1123
1247
      Re-read the beginning of a file (as the buffer should point to the
1129
1253
      The sort is needed when there were updates/deletes with random orders.
1130
1254
      It sorts so that we move the firts blocks to the beginning.
1131
1255
    */
1132
 
    sort(chain.begin(), chain.end());
 
1256
    internal::my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1257
                       (qsort_cmp)sort_set);
1133
1258
 
1134
1259
    off_t write_begin= 0, write_end;
1135
1260
 
1160
1285
      if (in_hole)
1161
1286
      {
1162
1287
        /* skip hole */
1163
 
        while (file_buff->end() <= ptr->second && file_buffer_start != -1)
 
1288
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
1164
1289
          file_buffer_start= file_buff->read_next();
1165
 
        write_begin= ptr->second;
1166
 
        ++ptr;
 
1290
        write_begin= ptr->end;
 
1291
        ptr++;
1167
1292
      }
1168
1293
      else
1169
1294
        write_begin= write_end;
1194
1319
      Close opened fildes's. Then move updated file in place
1195
1320
      of the old datafile.
1196
1321
    */
1197
 
    std::string rename_file= share->table_name;
1198
 
    rename_file.append(CSN_EXT);
1199
1322
    if (internal::my_close(data_file, MYF(0)) ||
1200
 
        internal::my_rename(rename_file.c_str(),
1201
 
                            share->data_file_name.c_str(), MYF(0)))
 
1323
        internal::my_rename(internal::fn_format(updated_fname,
 
1324
                                                share->table_name.c_str(),
 
1325
                                                "", CSN_EXT,
 
1326
                                                MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1327
                            share->data_file_name, MYF(0)))
1202
1328
      return(-1);
1203
1329
 
1204
1330
    /* Open the file again */
1205
 
    if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
 
1331
    if (((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1206
1332
      return(-1);
1207
1333
    /*
1208
1334
      As we reopened the data file, increase share->data_file_version
1269
1395
 
1270
1396
int Tina::doCreateTable(Session &session,
1271
1397
                        Table& table_arg,
1272
 
                        const drizzled::TableIdentifier &identifier,
 
1398
                        drizzled::TableIdentifier &identifier,
1273
1399
                        drizzled::message::Table &create_proto)
1274
1400
{
1275
1401
  char name_buff[FN_REFLEN];
1278
1404
  /*
1279
1405
    check columns
1280
1406
  */
1281
 
  const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1282
 
  for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1283
 
       iter != fields.end();
1284
 
       iter++)
 
1407
  for (Field **field= table_arg.getMutableShare()->getFields(); *field; field++)
1285
1408
  {
1286
 
    if (not *iter) // Historical legacy for NULL array end.
1287
 
      continue;
1288
 
 
1289
 
    if ((*iter)->real_maybe_null())
 
1409
    if ((*field)->real_maybe_null())
1290
1410
    {
1291
1411
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1292
1412
      return(HA_ERR_UNSUPPORTED);
1309
1429
 
1310
1430
  internal::my_close(create_file, MYF(0));
1311
1431
 
1312
 
  session.getMessageCache().storeTableMessage(identifier, create_proto);
 
1432
  session.storeTableMessage(identifier, create_proto);
1313
1433
 
1314
1434
  return 0;
1315
1435
}