12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
Make sure to look at ha_tina.h for more details.
43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
#include <drizzled/field/timestamp.h>
46
47
#include <drizzled/error.h>
47
48
#include <drizzled/table.h>
48
49
#include <drizzled/session.h>
49
#include "drizzled/internal/my_sys.h"
51
51
#include "ha_tina.h"
60
56
using namespace std;
61
using namespace drizzled;
64
59
unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
74
69
#define CSM_EXT ".CSM" // Meta file
77
static int read_meta_file(int meta_file, ha_rows *rows);
78
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
72
static int read_meta_file(File meta_file, ha_rows *rows);
73
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
75
extern "C" void tina_get_status(void* param, int concurrent_insert);
76
extern "C" void tina_update_status(void* param);
77
extern "C" bool tina_check_status(void* param);
80
79
/* Stuff for shares */
81
80
pthread_mutex_t tina_mutex;
85
84
*****************************************************************************/
87
Used for sorting chains with qsort().
89
static int sort_set (tina_set *a, tina_set *b)
92
We assume that intervals do not intersect. So, it is enough to compare
93
any two points. Here we take start of intervals for comparison.
95
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
88
100
If frm_error() is called in table.cc this is called to find out what file
89
101
extensions exist for this Cursor.
103
115
: drizzled::plugin::StorageEngine(name_arg,
104
116
HTON_TEMPORARY_ONLY |
105
117
HTON_NO_AUTO_INCREMENT |
106
HTON_SKIP_STORE_LOCK),
118
HTON_HAS_DATA_DICTIONARY |
119
HTON_SKIP_STORE_LOCK |
107
121
tina_open_tables()
111
pthread_mutex_destroy(&tina_mutex);
114
virtual Cursor *create(Table &table)
116
return new ha_tina(*this, table);
123
virtual Cursor *create(TableShare &table,
126
return new (mem_root) ha_tina(*this, table);
119
129
const char **bas_ext() const {
120
130
return ha_tina_exts;
123
int doCreateTable(Session &,
125
const drizzled::identifier::Table &identifier,
133
int doCreateTable(Session *,
134
const char *table_name,
126
136
drizzled::message::Table&);
128
138
int doGetTableDefinition(Session& session,
129
const drizzled::identifier::Table &identifier,
130
drizzled::message::Table &table_message);
132
int doDropTable(Session&, const drizzled::identifier::Table &identifier);
141
const char *table_name,
143
drizzled::message::Table *table_proto);
145
/* Temp only engine, so do not return values. */
146
/* No-op: all three parameters are unnamed and unused, and the body is empty,
   so the set passed in is left untouched. */
void doGetTableNames(CachedDirectory &, string& , set<string>&) { };
148
int doDropTable(Session&, const string table_path);
133
149
TinaShare *findOpenTable(const string table_name);
134
150
void addOpenTable(const string &table_name, TinaShare *);
135
151
void deleteOpenTable(const string &table_name);
138
154
/* Always reports 0 as the maximum number of keys
   (presumably this engine has no index support -- TODO confirm). */
uint32_t max_keys() const { return 0; }
139
155
/* Always reports 0 as the maximum number of key parts
   (presumably this engine has no index support -- TODO confirm). */
uint32_t max_key_parts() const { return 0; }
140
156
/* Always reports 0 as the maximum key length
   (presumably this engine has no index support -- TODO confirm). */
uint32_t max_key_length() const { return 0; }
141
bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
144
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
const drizzled::identifier::Schema &schema_identifier,
146
drizzled::identifier::Table::vector &set_of_identifiers);
149
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
const drizzled::identifier::Schema&,
151
drizzled::identifier::Table::vector&)
155
int Tina::doRenameTable(Session &session,
156
const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
159
for (const char **ext= bas_ext(); *ext ; ext++)
161
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
if ((error=errno) != ENOENT)
169
session.getMessageCache().renameTableMessage(from, to);
174
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
176
return session.getMessageCache().doesTableMessageExist(identifier);
180
int Tina::doDropTable(Session &session,
181
const drizzled::identifier::Table &identifier)
159
int Tina::doDropTable(Session&,
160
const string table_path)
184
163
int enoent_or_zero= ENOENT; // Error if no file was deleted
164
char buff[FN_REFLEN];
165
ProtoCache::iterator iter;
186
167
for (const char **ext= bas_ext(); *ext ; ext++)
188
std::string full_name= identifier.getPath();
189
full_name.append(*ext);
191
if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
169
fn_format(buff, table_path.c_str(), "", *ext,
170
MY_UNPACK_FILENAME|MY_APPEND_EXT);
171
if (my_delete_with_symlink(buff, MYF(0)))
193
if ((error= errno) != ENOENT)
173
if ((error= my_errno) != ENOENT)
198
177
enoent_or_zero= 0; // No error for ENOENT
200
178
error= enoent_or_zero;
203
session.getMessageCache().removeTableMessage(identifier);
181
pthread_mutex_lock(&proto_cache_mutex);
182
iter= proto_cache.find(table_path.c_str());
184
if (iter!= proto_cache.end())
185
proto_cache.erase(iter);
186
pthread_mutex_unlock(&proto_cache_mutex);
230
int Tina::doGetTableDefinition(Session &session,
231
const drizzled::identifier::Table &identifier,
232
drizzled::message::Table &table_message)
213
int Tina::doGetTableDefinition(Session&,
218
drizzled::message::Table *table_proto)
234
if (session.getMessageCache().getTableMessage(identifier, table_message))
221
ProtoCache::iterator iter;
223
pthread_mutex_lock(&proto_cache_mutex);
224
iter= proto_cache.find(path);
226
if (iter!= proto_cache.end())
229
table_proto->CopyFrom(((*iter).second));
232
pthread_mutex_unlock(&proto_cache_mutex);
241
238
static Tina *tina_engine= NULL;
243
static int tina_init_func(drizzled::module::Context &context)
240
static int tina_init_func(drizzled::plugin::Registry ®istry)
246
243
tina_engine= new Tina("CSV");
247
context.add(tina_engine);
244
registry.add(tina_engine);
249
246
pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
255
TinaShare::TinaShare(const std::string &table_name_arg) :
256
table_name(table_name_arg),
257
data_file_name(table_name_arg),
259
saved_data_file_length(0),
260
update_file_opened(false),
261
tina_write_opened(false),
266
data_file_name.append(CSV_EXT);
250
static int tina_done_func(drizzled::plugin::Registry ®istry)
252
registry.remove(tina_engine);
255
pthread_mutex_destroy(&tina_mutex);
261
TinaShare::TinaShare(const char *table_name_arg)
262
: table_name(table_name_arg), use_count(0), saved_data_file_length(0),
263
update_file_opened(false), tina_write_opened(false),
264
crashed(false), rows_recorded(0), data_file_version(0)
266
thr_lock_init(&lock);
267
fn_format(data_file_name, table_name_arg, "", CSV_EXT,
268
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
269
271
TinaShare::~TinaShare()
273
thr_lock_delete(&lock);
271
274
pthread_mutex_destroy(&mutex);
275
278
Simple lock controls.
277
TinaShare *ha_tina::get_share(const std::string &table_name)
280
TinaShare *ha_tina::get_share(const char *table_name)
279
282
pthread_mutex_lock(&tina_mutex);
281
Tina *a_tina= static_cast<Tina *>(getEngine());
284
Tina *a_tina= static_cast<Tina *>(engine);
282
285
share= a_tina->findOpenTable(table_name);
284
std::string meta_file_name;
287
char meta_file_name[FN_REFLEN];
285
288
struct stat file_stat;
301
meta_file_name.assign(table_name);
302
meta_file_name.append(CSM_EXT);
304
fn_format(meta_file_name, table_name, "", CSM_EXT,
305
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
304
if (stat(share->data_file_name.c_str(), &file_stat))
307
if (stat(share->data_file_name, &file_stat))
306
309
pthread_mutex_unlock(&tina_mutex);
357
360
non-zero - error occurred
360
static int read_meta_file(int meta_file, ha_rows *rows)
363
static int read_meta_file(File meta_file, ha_rows *rows)
362
365
unsigned char meta_buffer[META_BUFFER_SIZE];
363
366
unsigned char *ptr= meta_buffer;
365
368
lseek(meta_file, 0, SEEK_SET);
366
if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
369
if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
367
370
!= META_BUFFER_SIZE)
368
371
return(HA_ERR_CRASHED_ON_USAGE);
430
433
*ptr= (unsigned char)dirty;
432
435
lseek(meta_file, 0, SEEK_SET);
433
if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
436
if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
434
437
!= META_BUFFER_SIZE)
437
internal::my_sync(meta_file, MYF(MY_WME));
440
my_sync(meta_file, MYF(MY_WME));
471
474
/* Write the meta file. Mark it as crashed if needed. */
472
475
(void)write_meta_file(share->meta_file, share->rows_recorded,
473
476
share->crashed ? true :false);
474
if (internal::my_close(share->meta_file, MYF(0)))
477
if (my_close(share->meta_file, MYF(0)))
476
479
if (share->tina_write_opened)
478
if (internal::my_close(share->tina_write_filedes, MYF(0)))
481
if (my_close(share->tina_write_filedes, MYF(0)))
480
483
share->tina_write_opened= false;
483
Tina *a_tina= static_cast<Tina *>(getEngine());
486
Tina *a_tina= static_cast<Tina *>(engine);
484
487
a_tina->deleteOpenTable(share->table_name);
532
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
535
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
533
536
:Cursor(engine_arg, table_arg),
535
538
These definitions are found in Cursor.h
536
539
They are probably not completely right.
538
541
current_position(0), next_position(0), local_saved_data_file_length(0),
539
file_buff(0), local_data_file_version(0), records_is_known(0)
542
file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
543
local_data_file_version(0), records_is_known(0)
541
545
/* Set our original buffers from pre-allocated memory */
542
546
buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
543
548
file_buff= new Transparent_file();
645
650
int ha_tina::chain_append()
647
if (chain.size() > 0 && chain.back().second == current_position)
648
chain.back().second= next_position;
652
if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
653
(chain_ptr -1)->end= next_position;
650
chain.push_back(make_pair(current_position, next_position));
656
/* We set up for the next position */
657
if ((off_t)(chain_ptr - chain) == (chain_size -1))
659
off_t location= chain_ptr - chain;
660
chain_size += DEFAULT_CHAIN_LENGTH;
663
if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
668
tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
671
memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
675
chain_ptr= chain + location;
677
chain_ptr->begin= current_position;
678
chain_ptr->end= next_position;
761
791
memcpy(&src, blob->ptr + packlength, sizeof(char*));
764
tgt= (unsigned char*) blobroot.alloc_root(length);
794
tgt= (unsigned char*) alloc_root(&blobroot, length);
765
795
memmove(tgt, src, length);
766
796
memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
810
Three functions below are needed to enable concurrent insert functionality
811
for CSV engine. For more details see mysys/thr_lock.c
814
void tina_get_status(void* param, int)
816
ha_tina *tina= (ha_tina*) param;
820
void tina_update_status(void* param)
822
ha_tina *tina= (ha_tina*) param;
823
tina->update_status();
826
/* this should exist and return 0 for concurrent insert to work */
827
bool tina_check_status(void *)
833
Save the state of the table
839
This function is used to retrieve the file length during the lock
840
phase of concurrent insert. For more details see comment to
841
ha_tina::update_status below.
844
void ha_tina::get_status()
846
local_saved_data_file_length= share->saved_data_file_length;
851
Correct the state of the table. Called by unlock routines
852
before the write lock is released.
858
When we employ concurrent insert lock, we save current length of the file
859
during the lock phase. We do not read further saved value, as we don't
860
want to interfere with undergoing concurrent insert. Writers update file
861
length info during unlock with update_status().
864
For log tables concurrent insert works differently. The reason is that
865
log tables are always opened and locked. And as they do not unlock
866
tables, the file length after writes should be updated in a different
870
void ha_tina::update_status()
872
/* correct local_saved_data_file_length for writers */
873
share->saved_data_file_length= local_saved_data_file_length;
780
878
Open a database file. Keep in mind that tables are cached, so
781
879
this will not be called for every request. Any sort of positions
782
880
that need to be reset should be kept in the ::extra() call.
784
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
882
int ha_tina::open(const char *name, int, uint32_t)
786
if (not (share= get_share(identifier.getPath().c_str())))
884
if (!(share= get_share(name)))
789
887
if (share->crashed)
801
899
so that they could save/update local_saved_data_file_length value
802
900
during locking. This is needed to enable concurrent inserts.
902
thr_lock_data_init(&share->lock, &lock, (void*) this);
804
903
ref_length=sizeof(off_t);
905
share->lock.get_status= tina_get_status;
906
share->lock.update_status= tina_update_status;
907
share->lock.check_status= tina_check_status;
810
914
Close a database file. We remove ourselves from the shared structure.
811
915
If it is empty we destroy it.
861
967
if (!share->update_file_opened)
863
969
if ((update_temp_file=
864
internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
970
my_create(fn_format(updated_fname, share->table_name.c_str(),
866
972
MY_REPLACE_EXT | MY_UNPACK_FILENAME),
867
973
0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
880
986
This will be called in a table scan right before the previous ::rnd_next()
883
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
989
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
994
ha_statistic_increment(&SSV::ha_update_count);
996
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
997
table->timestamp_field->set_time();
888
999
size= encode_quote(new_data);
891
1002
During update we mark each updating record as deleted
892
1003
(see the chain_append()) then write new one to the temporary data file.
893
At the end of the sequence in the doEndTableScan() we append all non-marked
1004
At the end of the sequence in the rnd_end() we append all non-marked
894
1005
records from the data file to the temporary data file then rename it.
895
1006
The temp_file_length is used to calculate new data file length.
955
1067
if (local_data_file_version != share->data_file_version)
957
1069
local_data_file_version= share->data_file_version;
958
if (internal::my_close(data_file, MYF(0)) ||
959
(data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
1070
if (my_close(data_file, MYF(0)) ||
1071
(data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
962
1074
file_buff->init_buff(data_file);
1056
1168
void ha_tina::position(const unsigned char *)
1058
internal::my_store_ptr(ref, ref_length, current_position);
1170
my_store_ptr(ref, ref_length, current_position);
1064
1176
Used to fetch a row from a position stored with ::position().
1065
internal::my_get_ptr() retrieves the data for you.
1177
my_get_ptr() retrieves the data for you.
1068
1180
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1070
ha_statistic_increment(&system_status_var::ha_read_rnd_count);
1071
current_position= (off_t)internal::my_get_ptr(pos,ref_length);
1182
ha_statistic_increment(&SSV::ha_read_rnd_count);
1183
current_position= (off_t)my_get_ptr(pos,ref_length);
1072
1184
return(find_current_row(buf));
1090
1202
to the given "hole", stored in the buffer. "Valid" here means,
1091
1203
not listed in the chain of deleted records ("holes").
1093
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
1205
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1095
if (closest_hole == chain.end()) /* no more chains */
1207
if (closest_hole == chain_ptr) /* no more chains */
1096
1208
*end_pos= file_buff->end();
1098
1210
*end_pos= std::min(file_buff->end(),
1099
closest_hole->first);
1100
return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
1211
closest_hole->begin);
1212
return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1107
1219
slots to clean up all of the dead space we have collected while
1108
1220
performing deletes/updates.
1110
int ha_tina::doEndTableScan()
1222
int ha_tina::rnd_end()
1224
char updated_fname[FN_REFLEN];
1112
1225
off_t file_buffer_start= 0;
1114
blobroot.free_root(MYF(0));
1227
free_root(&blobroot, MYF(0));
1115
1228
records_is_known= 1;
1117
if (chain.size() > 0)
1230
if ((chain_ptr - chain) > 0)
1119
vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
1232
tina_set *ptr= chain;
1122
1235
Re-read the beginning of a file (as the buffer should point to the
1128
1241
The sort is needed when there were updates/deletes with random orders.
1129
1242
It sorts so that we move the first blocks to the beginning.
1131
sort(chain.begin(), chain.end());
1244
my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1245
(qsort_cmp)sort_set);
1133
1247
off_t write_begin= 0, write_end;
1149
1263
/* if there is something to write, write it */
1150
1264
if (write_length)
1152
if (internal::my_write(update_temp_file,
1266
if (my_write(update_temp_file,
1153
1267
(unsigned char*) (file_buff->ptr() +
1154
1268
(write_begin - file_buff->start())),
1155
1269
(size_t)write_length, MYF_RW))
1175
if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
1176
internal::my_close(update_temp_file, MYF(0)))
1289
if (my_sync(update_temp_file, MYF(MY_WME)) ||
1290
my_close(update_temp_file, MYF(0)))
1179
1293
share->update_file_opened= false;
1181
1295
if (share->tina_write_opened)
1183
if (internal::my_close(share->tina_write_filedes, MYF(0)))
1297
if (my_close(share->tina_write_filedes, MYF(0)))
1186
1300
Mark that the writer fd is closed, so that init_tina_writer()
1193
1307
Close the opened file descriptors. Then move the updated file in place
1194
1308
of the old datafile.
1196
std::string rename_file= share->table_name;
1197
rename_file.append(CSN_EXT);
1198
if (internal::my_close(data_file, MYF(0)) ||
1199
internal::my_rename(rename_file.c_str(),
1200
share->data_file_name.c_str(), MYF(0)))
1310
if (my_close(data_file, MYF(0)) ||
1311
my_rename(fn_format(updated_fname, share->table_name.c_str(),
1313
MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1314
share->data_file_name, MYF(0)))
1203
1317
/* Open the file again */
1204
if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
1318
if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1207
1321
As we reopened the data file, increase share->data_file_version
1266
1380
this (the database will call ::open() if it needs to).
1269
int Tina::doCreateTable(Session &session,
1383
int Tina::doCreateTable(Session *, const char *table_name,
1270
1384
Table& table_arg,
1271
const drizzled::identifier::Table &identifier,
1272
drizzled::message::Table &create_proto)
1385
drizzled::message::Table& create_proto)
1274
1387
char name_buff[FN_REFLEN];
1280
const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
iter != fields.end();
1393
for (Field **field= table_arg.s->field; *field; field++)
1285
if (not *iter) // Historical legacy for NULL array end.
1288
if ((*iter)->real_maybe_null())
1395
if ((*field)->real_maybe_null())
1290
1397
my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1398
return(HA_ERR_UNSUPPORTED);
1296
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1403
if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
1404
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1405
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1301
1408
write_meta_file(create_file, 0, false);
1302
internal::my_close(create_file, MYF(0));
1409
my_close(create_file, MYF(0));
1304
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1411
if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
1412
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1413
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1309
internal::my_close(create_file, MYF(0));
1416
my_close(create_file, MYF(0));
1311
session.getMessageCache().storeTableMessage(identifier, create_proto);
1418
pthread_mutex_lock(&proto_cache_mutex);
1419
proto_cache.insert(make_pair(table_name, create_proto));
1420
pthread_mutex_unlock(&proto_cache_mutex);
1317
1426
DRIZZLE_DECLARE_PLUGIN
1322
1430
"Brian Aker, MySQL AB",
1323
1431
"CSV storage engine",
1324
1432
PLUGIN_LICENSE_GPL,
1325
1433
tina_init_func, /* Plugin Init */
1434
tina_done_func, /* Plugin Deinit */
1435
NULL, /* status variables */
1436
NULL, /* system variables */
1327
1437
NULL /* config options */
1329
1439
DRIZZLE_DECLARE_PLUGIN_END;