12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
Make sure to look at ha_tina.h for more details.
78
76
static int read_meta_file(int meta_file, ha_rows *rows);
79
77
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
79
void tina_get_status(void* param, int concurrent_insert);
80
void tina_update_status(void* param);
81
bool tina_check_status(void* param);
81
83
/* Stuff for shares */
82
84
pthread_mutex_t tina_mutex;
86
88
*****************************************************************************/
91
Used for sorting chains with qsort().
93
static int sort_set (tina_set *a, tina_set *b)
96
We assume that intervals do not intersect. So, it is enough to compare
97
any two points. Here we take start of intervals for comparison.
99
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
89
104
If frm_error() is called in table.cc this is called to find out what file
90
105
extensions exist for this Cursor.
104
119
: drizzled::plugin::StorageEngine(name_arg,
105
120
HTON_TEMPORARY_ONLY |
106
121
HTON_NO_AUTO_INCREMENT |
107
HTON_SKIP_STORE_LOCK),
122
HTON_HAS_DATA_DICTIONARY |
123
HTON_SKIP_STORE_LOCK |
108
125
tina_open_tables()
112
pthread_mutex_destroy(&tina_mutex);
115
virtual Cursor *create(Table &table)
117
return new ha_tina(*this, table);
127
virtual Cursor *create(TableShare &table,
128
drizzled::memory::Root *mem_root)
130
return new (mem_root) ha_tina(*this, table);
120
133
const char **bas_ext() const {
121
134
return ha_tina_exts;
124
int doCreateTable(Session &,
126
const drizzled::TableIdentifier &identifier,
137
int doCreateTable(Session *,
138
const char *table_name,
127
140
drizzled::message::Table&);
129
142
int doGetTableDefinition(Session& session,
130
const drizzled::TableIdentifier &identifier,
131
drizzled::message::Table &table_message);
133
int doDropTable(Session&, const drizzled::TableIdentifier &identifier);
145
const char *table_name,
147
drizzled::message::Table *table_proto);
149
/* Temp only engine, so do not return values. */
150
/* Deliberate no-op: all three arguments are ignored and nothing is reported. */
void doGetTableNames(drizzled::CachedDirectory &, string &, set<string> &)
{
}
152
int doDropTable(Session&, const string table_path);
134
153
TinaShare *findOpenTable(const string table_name);
135
154
void addOpenTable(const string &table_name, TinaShare *);
136
155
void deleteOpenTable(const string &table_name);
139
158
/* Key-count limit reported by this engine: always zero. */
uint32_t max_keys() const
{
  return 0;
}
140
159
/* Key-part limit reported by this engine: always zero. */
uint32_t max_key_parts() const
{
  return 0;
}
141
160
/* Key-length limit reported by this engine: always zero. */
uint32_t max_key_length() const
{
  return 0;
}
142
bool doDoesTableExist(Session& session, const drizzled::TableIdentifier &identifier);
143
int doRenameTable(Session&, const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to);
145
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
146
const drizzled::SchemaIdentifier &schema_identifier,
147
drizzled::TableIdentifier::vector &set_of_identifiers);
150
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
151
const drizzled::SchemaIdentifier&,
152
drizzled::TableIdentifier::vector&)
156
int Tina::doRenameTable(Session &session,
157
const drizzled::TableIdentifier &from, const drizzled::TableIdentifier &to)
160
for (const char **ext= bas_ext(); *ext ; ext++)
162
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
164
if ((error=errno) != ENOENT)
170
session.getMessageCache().renameTableMessage(from, to);
175
bool Tina::doDoesTableExist(Session &session, const drizzled::TableIdentifier &identifier)
177
return session.getMessageCache().doesTableMessageExist(identifier);
181
int Tina::doDropTable(Session &session,
182
const drizzled::TableIdentifier &identifier)
163
int Tina::doDropTable(Session&,
164
const string table_path)
185
167
int enoent_or_zero= ENOENT; // Error if no file was deleted
168
char buff[FN_REFLEN];
169
ProtoCache::iterator iter;
187
171
for (const char **ext= bas_ext(); *ext ; ext++)
189
std::string full_name= identifier.getPath();
190
full_name.append(*ext);
192
if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
173
internal::fn_format(buff, table_path.c_str(), "", *ext,
174
MY_UNPACK_FILENAME|MY_APPEND_EXT);
175
if (internal::my_delete_with_symlink(buff, MYF(0)))
194
177
if ((error= errno) != ENOENT)
199
181
enoent_or_zero= 0; // No error for ENOENT
201
182
error= enoent_or_zero;
204
session.getMessageCache().removeTableMessage(identifier);
185
pthread_mutex_lock(&proto_cache_mutex);
186
iter= proto_cache.find(table_path.c_str());
188
if (iter!= proto_cache.end())
189
proto_cache.erase(iter);
190
pthread_mutex_unlock(&proto_cache_mutex);
231
int Tina::doGetTableDefinition(Session &session,
232
const drizzled::TableIdentifier &identifier,
233
drizzled::message::Table &table_message)
217
int Tina::doGetTableDefinition(Session&,
222
drizzled::message::Table *table_proto)
235
if (session.getMessageCache().getTableMessage(identifier, table_message))
225
ProtoCache::iterator iter;
227
pthread_mutex_lock(&proto_cache_mutex);
228
iter= proto_cache.find(path);
230
if (iter!= proto_cache.end())
233
table_proto->CopyFrom(((*iter).second));
236
pthread_mutex_unlock(&proto_cache_mutex);
242
242
static Tina *tina_engine= NULL;
244
static int tina_init_func(drizzled::module::Context &context)
244
static int tina_init_func(drizzled::plugin::Registry ®istry)
247
247
tina_engine= new Tina("CSV");
248
context.add(tina_engine);
248
registry.add(tina_engine);
250
250
pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
256
TinaShare::TinaShare(const std::string &table_name_arg) :
257
table_name(table_name_arg),
258
data_file_name(table_name_arg),
260
saved_data_file_length(0),
261
update_file_opened(false),
262
tina_write_opened(false),
267
data_file_name.append(CSV_EXT);
254
static int tina_done_func(drizzled::plugin::Registry ®istry)
256
registry.remove(tina_engine);
259
pthread_mutex_destroy(&tina_mutex);
265
TinaShare::TinaShare(const char *table_name_arg)
266
: table_name(table_name_arg), use_count(0), saved_data_file_length(0),
267
update_file_opened(false), tina_write_opened(false),
268
crashed(false), rows_recorded(0), data_file_version(0)
270
thr_lock_init(&lock);
271
internal::fn_format(data_file_name, table_name_arg, "", CSV_EXT,
272
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
270
275
TinaShare::~TinaShare()
277
thr_lock_delete(&lock);
272
278
pthread_mutex_destroy(&mutex);
276
282
Simple lock controls.
278
TinaShare *ha_tina::get_share(const std::string &table_name)
284
TinaShare *ha_tina::get_share(const char *table_name)
280
286
pthread_mutex_lock(&tina_mutex);
282
Tina *a_tina= static_cast<Tina *>(getEngine());
288
Tina *a_tina= static_cast<Tina *>(engine);
283
289
share= a_tina->findOpenTable(table_name);
285
std::string meta_file_name;
291
char meta_file_name[FN_REFLEN];
286
292
struct stat file_stat;
302
meta_file_name.assign(table_name);
303
meta_file_name.append(CSM_EXT);
308
internal::fn_format(meta_file_name, table_name, "", CSM_EXT,
309
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
305
if (stat(share->data_file_name.c_str(), &file_stat))
311
if (stat(share->data_file_name, &file_stat))
307
313
pthread_mutex_unlock(&tina_mutex);
533
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
539
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
534
540
:Cursor(engine_arg, table_arg),
536
542
These definitions are found in Cursor.h
537
543
They are probably not completely right.
539
545
current_position(0), next_position(0), local_saved_data_file_length(0),
540
file_buff(0), local_data_file_version(0), records_is_known(0)
546
file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
547
local_data_file_version(0), records_is_known(0)
542
549
/* Set our original buffers from pre-allocated memory */
543
550
buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
544
552
file_buff= new Transparent_file();
646
654
int ha_tina::chain_append()
648
if (chain.size() > 0 && chain.back().second == current_position)
649
chain.back().second= next_position;
656
if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
657
(chain_ptr -1)->end= next_position;
651
chain.push_back(make_pair(current_position, next_position));
660
/* We set up for the next position */
661
if ((off_t)(chain_ptr - chain) == (chain_size -1))
663
off_t location= chain_ptr - chain;
664
chain_size += DEFAULT_CHAIN_LENGTH;
667
if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
672
tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
675
memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
679
chain_ptr= chain + location;
681
chain_ptr->begin= current_position;
682
chain_ptr->end= next_position;
762
795
memcpy(&src, blob->ptr + packlength, sizeof(char*));
765
tgt= (unsigned char*) blobroot.alloc_root(length);
798
tgt= (unsigned char*) alloc_root(&blobroot, length);
766
799
memmove(tgt, src, length);
767
800
memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
814
Three functions below are needed to enable concurrent insert functionality
815
for CSV engine. For more details see mysys/thr_lock.c
818
void tina_get_status(void* param, int)
820
ha_tina *tina= (ha_tina*) param;
824
void tina_update_status(void* param)
826
ha_tina *tina= (ha_tina*) param;
827
tina->update_status();
830
/* this should exist and return 0 for concurrent insert to work */
831
bool tina_check_status(void *)
837
Save the state of the table
843
This function is used to retrieve the file length during the lock
844
phase of concurrent insert. For more details see comment to
845
ha_tina::update_status below.
848
void ha_tina::get_status()
850
local_saved_data_file_length= share->saved_data_file_length;
855
Correct the state of the table. Called by unlock routines
856
before the write lock is released.
862
When we employ concurrent insert lock, we save current length of the file
863
during the lock phase. We do not read further saved value, as we don't
864
want to interfere with undergoing concurrent insert. Writers update file
865
length info during unlock with update_status().
868
For log tables concurrent insert works differently. The reason is that
869
log tables are always opened and locked. And as they do not unlock
870
tables, the file length after writes should be updated in a different
874
void ha_tina::update_status()
876
/* correct local_saved_data_file_length for writers */
877
share->saved_data_file_length= local_saved_data_file_length;
781
882
Open a database file. Keep in mind that tables are cached, so
782
883
this will not be called for every request. Any sort of positions
783
884
that need to be reset should be kept in the ::extra() call.
785
int ha_tina::doOpen(const TableIdentifier &identifier, int , uint32_t )
886
int ha_tina::open(const char *name, int, uint32_t)
787
if (not (share= get_share(identifier.getPath().c_str())))
888
if (!(share= get_share(name)))
790
891
if (share->crashed)
802
903
so that they could save/update local_saved_data_file_length value
803
904
during locking. This is needed to enable concurrent inserts.
906
thr_lock_data_init(&share->lock, &lock, (void*) this);
805
907
ref_length=sizeof(off_t);
909
share->lock.get_status= tina_get_status;
910
share->lock.update_status= tina_update_status;
911
share->lock.check_status= tina_check_status;
811
918
Close a database file. We remove ourselves from the shared structure.
812
919
If it is empty we destroy it.
881
990
This will be called in a table scan right before the previous ::rnd_next()
884
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
993
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
998
ha_statistic_increment(&system_status_var::ha_update_count);
1000
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1001
table->timestamp_field->set_time();
889
1003
size= encode_quote(new_data);
892
1006
During update we mark each updating record as deleted
893
1007
(see the chain_append()) then write new one to the temporary data file.
894
At the end of the sequence in the doEndTableScan() we append all non-marked
1008
At the end of the sequence in the rnd_end() we append all non-marked
895
1009
records from the data file to the temporary data file then rename it.
896
1010
The temp_file_length is used to calculate new data file length.
958
1073
local_data_file_version= share->data_file_version;
959
1074
if (internal::my_close(data_file, MYF(0)) ||
960
(data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
1075
(data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
963
1078
file_buff->init_buff(data_file);
1091
1206
to the given "hole", stored in the buffer. "Valid" here means,
1092
1207
not listed in the chain of deleted records ("holes").
1094
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
1209
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1096
if (closest_hole == chain.end()) /* no more chains */
1211
if (closest_hole == chain_ptr) /* no more chains */
1097
1212
*end_pos= file_buff->end();
1099
1214
*end_pos= std::min(file_buff->end(),
1100
closest_hole->first);
1101
return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
1215
closest_hole->begin);
1216
return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1108
1223
slots to clean up all of the dead space we have collected while
1109
1224
performing deletes/updates.
1111
int ha_tina::doEndTableScan()
1226
int ha_tina::rnd_end()
1228
char updated_fname[FN_REFLEN];
1113
1229
off_t file_buffer_start= 0;
1115
blobroot.free_root(MYF(0));
1231
free_root(&blobroot, MYF(0));
1116
1232
records_is_known= 1;
1118
if (chain.size() > 0)
1234
if ((chain_ptr - chain) > 0)
1120
vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
1236
tina_set *ptr= chain;
1123
1239
Re-read the beginning of a file (as the buffer should point to the
1129
1245
The sort is needed when there were updates/deletes with random orders.
1130
1246
It sorts so that we move the first blocks to the beginning.
1132
sort(chain.begin(), chain.end());
1248
internal::my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1249
(qsort_cmp)sort_set);
1134
1251
off_t write_begin= 0, write_end;
1194
1311
Close opened file descriptors. Then move the updated file in place
1195
1312
of the old datafile.
1197
std::string rename_file= share->table_name;
1198
rename_file.append(CSN_EXT);
1199
1314
if (internal::my_close(data_file, MYF(0)) ||
1200
internal::my_rename(rename_file.c_str(),
1201
share->data_file_name.c_str(), MYF(0)))
1315
internal::my_rename(internal::fn_format(updated_fname,
1316
share->table_name.c_str(),
1318
MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1319
share->data_file_name, MYF(0)))
1204
1322
/* Open the file again */
1205
if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
1323
if (((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1208
1326
As we reopened the data file, increase share->data_file_version
1267
1385
this (the database will call ::open() if it needs to).
1270
int Tina::doCreateTable(Session &session,
1388
int Tina::doCreateTable(Session *, const char *table_name,
1271
1389
Table& table_arg,
1272
const drizzled::TableIdentifier &identifier,
1273
drizzled::message::Table &create_proto)
1390
drizzled::message::Table& create_proto)
1275
1392
char name_buff[FN_REFLEN];
1276
1393
int create_file;
1281
const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1282
for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1283
iter != fields.end();
1398
for (Field **field= table_arg.s->field; *field; field++)
1286
if (not *iter) // Historical legacy for NULL array end.
1289
if ((*iter)->real_maybe_null())
1400
if ((*field)->real_maybe_null())
1291
1402
my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1292
1403
return(HA_ERR_UNSUPPORTED);
1297
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1298
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1299
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1408
if ((create_file= internal::my_create(internal::fn_format(name_buff, table_name, "", CSM_EXT,
1409
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1410
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1302
1413
write_meta_file(create_file, 0, false);
1303
1414
internal::my_close(create_file, MYF(0));
1305
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1306
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1307
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1416
if ((create_file= internal::my_create(internal::fn_format(name_buff, table_name, "", CSV_EXT,
1417
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1418
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1310
1421
internal::my_close(create_file, MYF(0));
1312
session.getMessageCache().storeTableMessage(identifier, create_proto);
1423
pthread_mutex_lock(&proto_cache_mutex);
1424
proto_cache.insert(make_pair(table_name, create_proto));
1425
pthread_mutex_unlock(&proto_cache_mutex);