12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17
17
Make sure to look at ha_tina.h for more details.
43
43
#include "config.h"
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
#include <drizzled/field/timestamp.h>
47
46
#include <drizzled/error.h>
48
47
#include <drizzled/table.h>
49
48
#include <drizzled/session.h>
76
77
static int read_meta_file(int meta_file, ha_rows *rows);
77
78
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
79
void tina_get_status(void* param, int concurrent_insert);
80
void tina_update_status(void* param);
81
bool tina_check_status(void* param);
83
80
/* Stuff for shares */
84
81
pthread_mutex_t tina_mutex;
88
85
*****************************************************************************/
91
Used for sorting chains with qsort().
93
static int sort_set (tina_set *a, tina_set *b)
96
We assume that intervals do not intersect. So, it is enough to compare
97
any two points. Here we take start of intervals for comparison.
99
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
104
88
If frm_error() is called in table.cc this is called to find out what file
105
89
extensions exist for this Cursor.
128
111
pthread_mutex_destroy(&tina_mutex);
131
virtual Cursor *create(TableShare &table,
132
drizzled::memory::Root *mem_root)
114
virtual Cursor *create(Table &table)
134
return new (mem_root) ha_tina(*this, table);
116
return new ha_tina(*this, table);
137
119
const char **bas_ext() const {
141
123
int doCreateTable(Session &,
142
124
Table &table_arg,
143
drizzled::TableIdentifier &identifier,
125
const drizzled::identifier::Table &identifier,
144
126
drizzled::message::Table&);
146
128
int doGetTableDefinition(Session& session,
147
TableIdentifier &identifier,
129
const drizzled::identifier::Table &identifier,
148
130
drizzled::message::Table &table_message);
150
/* Temp only engine, so do not return values. */
151
/* Intentionally a no-op: all three parameters are unnamed and the body is empty, so the caller's set is left untouched. */
void doGetTableNames(drizzled::CachedDirectory &, SchemaIdentifier&, set<string>&) { };
153
int doDropTable(Session&, TableIdentifier &identifier);
132
int doDropTable(Session&, const drizzled::identifier::Table &identifier);
154
133
TinaShare *findOpenTable(const string table_name);
155
134
void addOpenTable(const string &table_name, TinaShare *);
156
135
void deleteOpenTable(const string &table_name);
159
138
/* Always returns 0. NOTE(review): presumably tells the server this engine supports no indexes -- confirm against the base class. */
uint32_t max_keys() const { return 0; }
160
139
/* Always returns 0. NOTE(review): presumably the maximum number of parts per key (none supported) -- confirm. */
uint32_t max_key_parts() const { return 0; }
161
140
/* Always returns 0. NOTE(review): presumably the maximum key length (no keys supported) -- confirm. */
uint32_t max_key_length() const { return 0; }
162
bool doDoesTableExist(Session& session, TableIdentifier &identifier);
163
int doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to);
141
bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
165
144
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
166
drizzled::SchemaIdentifier &schema_identifier,
167
drizzled::TableIdentifiers &set_of_identifiers);
145
const drizzled::identifier::Schema &schema_identifier,
146
drizzled::identifier::Table::vector &set_of_identifiers);
170
149
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
171
drizzled::SchemaIdentifier&,
172
drizzled::TableIdentifiers&)
150
const drizzled::identifier::Schema&,
151
drizzled::identifier::Table::vector&)
176
155
int Tina::doRenameTable(Session &session,
177
TableIdentifier &from, TableIdentifier &to)
156
const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
180
159
for (const char **ext= bas_ext(); *ext ; ext++)
190
session.renameTableMessage(from, to);
169
session.getMessageCache().renameTableMessage(from, to);
195
bool Tina::doDoesTableExist(Session &session, TableIdentifier &identifier)
174
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
197
return session.doesTableMessageExist(identifier);
176
return session.getMessageCache().doesTableMessageExist(identifier);
201
180
int Tina::doDropTable(Session &session,
202
TableIdentifier &identifier)
181
const drizzled::identifier::Table &identifier)
205
184
int enoent_or_zero= ENOENT; // Error if no file was deleted
206
char buff[FN_REFLEN];
208
186
for (const char **ext= bas_ext(); *ext ; ext++)
210
internal::fn_format(buff, identifier.getPath().c_str(), "", *ext,
211
MY_UNPACK_FILENAME|MY_APPEND_EXT);
212
if (internal::my_delete_with_symlink(buff, MYF(0)))
188
std::string full_name= identifier.getPath();
189
full_name.append(*ext);
191
if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
214
193
if ((error= errno) != ENOENT)
218
198
enoent_or_zero= 0; // No error for ENOENT
219
200
error= enoent_or_zero;
222
session.removeTableMessage(identifier);
203
session.getMessageCache().removeTableMessage(identifier);
249
230
int Tina::doGetTableDefinition(Session &session,
250
drizzled::TableIdentifier &identifier,
231
const drizzled::identifier::Table &identifier,
251
232
drizzled::message::Table &table_message)
253
if (session.getTableMessage(identifier, table_message))
234
if (session.getMessageCache().getTableMessage(identifier, table_message))
274
TinaShare::TinaShare(const char *table_name_arg)
275
: table_name(table_name_arg), use_count(0), saved_data_file_length(0),
276
update_file_opened(false), tina_write_opened(false),
277
crashed(false), rows_recorded(0), data_file_version(0)
255
TinaShare::TinaShare(const std::string &table_name_arg) :
256
table_name(table_name_arg),
257
data_file_name(table_name_arg),
259
saved_data_file_length(0),
260
update_file_opened(false),
261
tina_write_opened(false),
279
thr_lock_init(&lock);
280
internal::fn_format(data_file_name, table_name_arg, "", CSV_EXT,
281
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
266
data_file_name.append(CSV_EXT);
284
269
TinaShare::~TinaShare()
286
thr_lock_delete(&lock);
287
271
pthread_mutex_destroy(&mutex);
291
275
Simple lock controls.
293
TinaShare *ha_tina::get_share(const char *table_name)
277
TinaShare *ha_tina::get_share(const std::string &table_name)
295
279
pthread_mutex_lock(&tina_mutex);
297
Tina *a_tina= static_cast<Tina *>(engine);
281
Tina *a_tina= static_cast<Tina *>(getEngine());
298
282
share= a_tina->findOpenTable(table_name);
300
char meta_file_name[FN_REFLEN];
284
std::string meta_file_name;
301
285
struct stat file_stat;
317
internal::fn_format(meta_file_name, table_name, "", CSM_EXT,
318
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
301
meta_file_name.assign(table_name);
302
meta_file_name.append(CSM_EXT);
320
if (stat(share->data_file_name, &file_stat))
304
if (stat(share->data_file_name.c_str(), &file_stat))
322
306
pthread_mutex_unlock(&tina_mutex);
336
320
Usually this will result in auto-repair, and we will get a good
337
321
meta-file in the end.
339
if ((share->meta_file= internal::my_open(meta_file_name,
323
if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
340
324
O_RDWR|O_CREAT, MYF(0))) == -1)
341
325
share->crashed= true;
465
449
(void)write_meta_file(share->meta_file, share->rows_recorded, true);
467
451
if ((share->tina_write_filedes=
468
internal::my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
452
internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
470
454
share->crashed= true;
548
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
532
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
549
533
:Cursor(engine_arg, table_arg),
551
535
These definitions are found in Cursor.h
552
536
They are probably not completely right.
554
538
current_position(0), next_position(0), local_saved_data_file_length(0),
555
file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
556
local_data_file_version(0), records_is_known(0)
539
file_buff(0), local_data_file_version(0), records_is_known(0)
558
541
/* Set our original buffers from pre-allocated memory */
559
542
buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
561
543
file_buff= new Transparent_file();
663
645
int ha_tina::chain_append()
665
if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
666
(chain_ptr -1)->end= next_position;
647
if (chain.size() > 0 && chain.back().second == current_position)
648
chain.back().second= next_position;
669
/* We set up for the next position */
670
if ((off_t)(chain_ptr - chain) == (chain_size -1))
672
off_t location= chain_ptr - chain;
673
chain_size += DEFAULT_CHAIN_LENGTH;
676
if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
681
tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
684
memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
688
chain_ptr= chain + location;
690
chain_ptr->begin= current_position;
691
chain_ptr->end= next_position;
650
chain.push_back(make_pair(current_position, next_position));
719
675
error= HA_ERR_CRASHED_ON_USAGE;
721
memset(buf, 0, table->s->null_bytes);
677
memset(buf, 0, getTable()->getShare()->null_bytes);
723
for (Field **field=table->field ; *field ; field++)
679
for (Field **field= getTable()->getFields() ; *field ; field++)
790
746
/* This masks a bug in the logic for a SELECT * */
791
747
(*field)->setWriteSet();
792
if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
748
if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
796
753
if ((*field)->flags & BLOB_FLAG)
804
761
memcpy(&src, blob->ptr + packlength, sizeof(char*));
807
tgt= (unsigned char*) alloc_root(&blobroot, length);
764
tgt= (unsigned char*) blobroot.alloc_root(length);
808
765
memmove(tgt, src, length);
809
766
memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
823
Three functions below are needed to enable concurrent insert functionality
824
for CSV engine. For more details see mysys/thr_lock.c
827
void tina_get_status(void* param, int)
829
ha_tina *tina= (ha_tina*) param;
833
void tina_update_status(void* param)
835
ha_tina *tina= (ha_tina*) param;
836
tina->update_status();
839
/* this should exist and return 0 for concurrent insert to work */
840
bool tina_check_status(void *)
846
Save the state of the table
852
This function is used to retrieve the file length during the lock
853
phase of concurrent insert. For more details see comment to
854
ha_tina::update_status below.
857
void ha_tina::get_status()
859
local_saved_data_file_length= share->saved_data_file_length;
864
Correct the state of the table. Called by unlock routines
865
before the write lock is released.
871
When we employ concurrent insert lock, we save current length of the file
872
during the lock phase. We do not read further saved value, as we don't
873
want to interfere with undergoing concurrent insert. Writers update file
874
length info during unlock with update_status().
877
For log tables concurrent insert works differently. The reason is that
878
log tables are always opened and locked. And as they do not unlock
879
tables, the file length after writes should be updated in a different
883
void ha_tina::update_status()
885
/* correct local_saved_data_file_length for writers */
886
share->saved_data_file_length= local_saved_data_file_length;
891
780
Open a database file. Keep in mind that tables are cached, so
892
781
this will not be called for every request. Any sort of positions
893
782
that need to be reset should be kept in the ::extra() call.
895
int ha_tina::open(const char *name, int, uint32_t)
784
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
897
if (!(share= get_share(name)))
786
if (not (share= get_share(identifier.getPath().c_str())))
900
789
if (share->crashed)
906
795
local_data_file_version= share->data_file_version;
907
if ((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
796
if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
912
801
so that they could save/update local_saved_data_file_length value
913
802
during locking. This is needed to enable concurrent inserts.
915
thr_lock_data_init(&share->lock, &lock, (void*) this);
916
804
ref_length=sizeof(off_t);
918
share->lock.get_status= tina_get_status;
919
share->lock.update_status= tina_update_status;
920
share->lock.check_status= tina_check_status;
927
810
Close a database file. We remove ourselves from the shared structure.
928
811
If it is empty we destroy it.
939
822
of the file and appends the data. In an error case it really should
940
823
just truncate to the original position (this is not done yet).
942
int ha_tina::write_row(unsigned char * buf)
825
int ha_tina::doInsertRecord(unsigned char * buf)
946
829
if (share->crashed)
947
830
return(HA_ERR_CRASHED_ON_USAGE);
949
ha_statistic_increment(&system_status_var::ha_write_count);
951
832
size= encode_quote(buf);
953
834
if (!share->tina_write_opened)
999
880
This will be called in a table scan right before the previous ::rnd_next()
1002
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
883
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
1007
ha_statistic_increment(&system_status_var::ha_update_count);
1009
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1010
table->timestamp_field->set_time();
1012
888
size= encode_quote(new_data);
1015
891
During update we mark each updating record as deleted
1016
892
(see the chain_append()) then write new one to the temporary data file.
1017
At the end of the sequence in the rnd_end() we append all non-marked
893
At the end of the sequence in the doEndTableScan() we append all non-marked
1018
894
records from the data file to the temporary data file then rename it.
1019
895
The temp_file_length is used to calculate new data file length.
1044
920
The table will then be deleted/positioned based on the ORDER (so RANDOM,
1047
int ha_tina::delete_row(const unsigned char *)
923
int ha_tina::doDeleteRecord(const unsigned char *)
1049
ha_statistic_increment(&system_status_var::ha_delete_count);
1051
926
if (chain_append())
1082
957
local_data_file_version= share->data_file_version;
1083
958
if (internal::my_close(data_file, MYF(0)) ||
1084
(data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
959
(data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
1087
962
file_buff->init_buff(data_file);
1215
1090
to the given "hole", stored in the buffer. "Valid" here means,
1216
1091
not listed in the chain of deleted records ("holes").
1218
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1093
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
1220
if (closest_hole == chain_ptr) /* no more chains */
1095
if (closest_hole == chain.end()) /* no more chains */
1221
1096
*end_pos= file_buff->end();
1223
1098
*end_pos= std::min(file_buff->end(),
1224
closest_hole->begin);
1225
return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1099
closest_hole->first);
1100
return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
1232
1107
slots to clean up all of the dead space we have collected while
1233
1108
performing deletes/updates.
1235
int ha_tina::rnd_end()
1110
int ha_tina::doEndTableScan()
1237
char updated_fname[FN_REFLEN];
1238
1112
off_t file_buffer_start= 0;
1240
free_root(&blobroot, MYF(0));
1114
blobroot.free_root(MYF(0));
1241
1115
records_is_known= 1;
1243
if ((chain_ptr - chain) > 0)
1117
if (chain.size() > 0)
1245
tina_set *ptr= chain;
1119
vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
1248
1122
Re-read the beginning of a file (as the buffer should point to the
1254
1128
The sort is needed when there were updates/deletes with random orders.
1255
1129
It sorts so that we move the first blocks to the beginning.
1257
internal::my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1258
(qsort_cmp)sort_set);
1131
sort(chain.begin(), chain.end());
1260
1133
off_t write_begin= 0, write_end;
1320
1193
Close the opened file descriptors. Then move the updated file in place
1321
1194
of the old datafile.
1196
std::string rename_file= share->table_name;
1197
rename_file.append(CSN_EXT);
1323
1198
if (internal::my_close(data_file, MYF(0)) ||
1324
internal::my_rename(internal::fn_format(updated_fname,
1325
share->table_name.c_str(),
1327
MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1328
share->data_file_name, MYF(0)))
1199
internal::my_rename(rename_file.c_str(),
1200
share->data_file_name.c_str(), MYF(0)))
1331
1203
/* Open the file again */
1332
if (((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1204
if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
1335
1207
As we reopened the data file, increase share->data_file_version
1397
1269
int Tina::doCreateTable(Session &session,
1398
1270
Table& table_arg,
1399
drizzled::TableIdentifier &identifier,
1271
const drizzled::identifier::Table &identifier,
1400
1272
drizzled::message::Table &create_proto)
1402
1274
char name_buff[FN_REFLEN];
1408
for (Field **field= table_arg.s->field; *field; field++)
1280
const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
iter != fields.end();
1410
if ((*field)->real_maybe_null())
1285
if (not *iter) // Historical legacy for NULL array end.
1288
if ((*iter)->real_maybe_null())
1412
1290
my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1413
1291
return(HA_ERR_UNSUPPORTED);
1431
1309
internal::my_close(create_file, MYF(0));
1433
session.storeTableMessage(identifier, create_proto);
1311
session.getMessageCache().storeTableMessage(identifier, create_proto);
1445
1323
"CSV storage engine",
1446
1324
PLUGIN_LICENSE_GPL,
1447
1325
tina_init_func, /* Plugin Init */
1448
NULL, /* system variables */
1449
1327
NULL /* config options */
1451
1329
DRIZZLE_DECLARE_PLUGIN_END;