43
#include <drizzled/server_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
#include <drizzled/field/timestamp.h>
47
#include <storage/csv/ha_tina.h>
46
48
#include <drizzled/error.h>
47
49
#include <drizzled/table.h>
48
50
#include <drizzled/session.h>
49
#include <drizzled/internal/my_sys.h>
61
using namespace drizzled;
64
53
unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
66
static const int META_BUFFER_SIZE = sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t)
67
+ sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char);
68
static const int TINA_CHECK_HEADER = 254; // The number we use to determine corruption
69
static const int BLOB_MEMROOT_ALLOC_SIZE = 8192;
55
#define META_BUFFER_SIZE sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t) \
56
+ sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char)
57
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
58
#define BLOB_MEMROOT_ALLOC_SIZE 8192
71
60
/* The file extension */
72
static const char* CSV_EXT = ".CSV" // The data file
73
static const char* CSN_EXT = ".CSN" // Files used during repair and update
74
static const char* CSM_EXT = ".CSM" // Meta file
77
static int read_meta_file(int meta_file, ha_rows *rows);
78
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
61
#define CSV_EXT ".CSV" // The data file
62
#define CSN_EXT ".CSN" // Files used during repair and update
63
#define CSM_EXT ".CSM" // Meta file
66
static TINA_SHARE *get_share(const char *table_name, Table *table);
67
static int free_share(TINA_SHARE *share);
68
static int read_meta_file(File meta_file, ha_rows *rows);
69
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
71
extern "C" void tina_get_status(void* param, int concurrent_insert);
72
extern "C" void tina_update_status(void* param);
73
extern "C" bool tina_check_status(void* param);
80
75
/* Stuff for shares */
81
76
pthread_mutex_t tina_mutex;
77
static HASH tina_open_tables;
78
static handler *tina_create_handler(handlerton *hton,
83
83
/*****************************************************************************
85
85
*****************************************************************************/
88
If frm_error() is called in table.cc this is called to find out what file
89
extensions exist for this Cursor.
88
Used for sorting chains with qsort().
91
static const char *ha_tina_exts[] = {
97
class Tina : public drizzled::plugin::StorageEngine
99
typedef std::map<string, TinaShare*> TinaMap;
100
TinaMap tina_open_tables;
102
Tina(const string& name_arg)
103
: drizzled::plugin::StorageEngine(name_arg,
104
HTON_TEMPORARY_ONLY |
105
HTON_NO_AUTO_INCREMENT |
106
HTON_SKIP_STORE_LOCK),
111
pthread_mutex_destroy(&tina_mutex);
114
virtual Cursor *create(Table &table)
116
return new ha_tina(*this, table);
119
const char **bas_ext() const {
123
int doCreateTable(Session &,
125
const drizzled::identifier::Table &identifier,
126
drizzled::message::Table&);
128
int doGetTableDefinition(Session& session,
129
const drizzled::identifier::Table &identifier,
130
drizzled::message::Table &table_message);
132
int doDropTable(Session&, const drizzled::identifier::Table &identifier);
133
TinaShare *findOpenTable(const string table_name);
134
void addOpenTable(const string &table_name, TinaShare *);
135
void deleteOpenTable(const string &table_name);
138
/* Number of indexes allowed per table; this engine always reports 0. */
uint32_t max_keys() const
{
  return 0;
}
139
/* Number of parts allowed in a single key; this engine always reports 0. */
uint32_t max_key_parts() const
{
  return 0;
}
140
/* Maximum key length; this engine always reports 0. */
uint32_t max_key_length() const
{
  return 0;
}
141
bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
144
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
const drizzled::identifier::Schema &schema_identifier,
146
drizzled::identifier::Table::vector &set_of_identifiers);
149
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
const drizzled::identifier::Schema&,
151
drizzled::identifier::Table::vector&)
155
int Tina::doRenameTable(Session &session,
156
const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
159
for (const char **ext= bas_ext(); *ext ; ext++)
161
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
if ((error=errno) != ENOENT)
169
session.getMessageCache().renameTableMessage(from, to);
174
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
176
return session.getMessageCache().doesTableMessageExist(identifier);
180
int Tina::doDropTable(Session &session,
181
const drizzled::identifier::Table &identifier)
184
int enoent_or_zero= ENOENT; // Error if no file was deleted
186
for (const char **ext= bas_ext(); *ext ; ext++)
188
std::string full_name= identifier.getPath();
189
full_name.append(*ext);
191
if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
193
if ((error= errno) != ENOENT)
198
enoent_or_zero= 0; // No error for ENOENT
200
error= enoent_or_zero;
203
session.getMessageCache().removeTableMessage(identifier);
208
TinaShare *Tina::findOpenTable(const string table_name)
210
TinaMap::iterator find_iter=
211
tina_open_tables.find(table_name);
213
if (find_iter != tina_open_tables.end())
214
return (*find_iter).second;
219
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
tina_open_tables[table_name]= share;
224
void Tina::deleteOpenTable(const string &table_name)
226
tina_open_tables.erase(table_name);
230
int Tina::doGetTableDefinition(Session &session,
231
const drizzled::identifier::Table &identifier,
232
drizzled::message::Table &table_message)
234
if (session.getMessageCache().getTableMessage(identifier, table_message))
241
static Tina *tina_engine= NULL;
243
static int tina_init_func(drizzled::module::Context &context)
246
tina_engine= new Tina("CSV");
247
context.add(tina_engine);
90
int sort_set (tina_set *a, tina_set *b)
93
We assume that intervals do not intersect. So, it is enough to compare
94
any two points. Here we take start of intervals for comparison.
96
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
99
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
101
*length=share->table_name_length;
102
return (unsigned char*) share->table_name;
105
static int tina_init_func(void *p)
107
handlerton *tina_hton;
109
tina_hton= (handlerton *)p;
249
110
pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
255
TinaShare::TinaShare(const std::string &table_name_arg) :
256
table_name(table_name_arg),
257
data_file_name(table_name_arg),
259
saved_data_file_length(0),
260
update_file_opened(false),
261
tina_write_opened(false),
266
data_file_name.append(CSV_EXT);
269
TinaShare::~TinaShare()
271
pthread_mutex_destroy(&mutex);
111
(void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
112
(hash_get_key) tina_get_key,0,0);
113
tina_hton->state= SHOW_OPTION_YES;
114
tina_hton->create= tina_create_handler;
115
tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
120
static int tina_done_func(void *)
122
hash_free(&tina_open_tables);
123
pthread_mutex_destroy(&tina_mutex);
275
130
Simple lock controls.
277
TinaShare *ha_tina::get_share(const std::string &table_name)
132
static TINA_SHARE *get_share(const char *table_name, Table *)
279
pthread_mutex_lock(&tina_mutex);
281
Tina *a_tina= static_cast<Tina *>(getEngine());
282
share= a_tina->findOpenTable(table_name);
284
std::string meta_file_name;
135
char meta_file_name[FN_REFLEN];
285
136
struct stat file_stat;
140
pthread_mutex_lock(&tina_mutex);
141
length=(uint) strlen(table_name);
288
144
If share is not present in the hash, create a new share and
289
145
initialize its members.
147
if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
148
(unsigned char*) table_name,
293
share= new TinaShare(table_name);
297
pthread_mutex_unlock(&tina_mutex);
301
meta_file_name.assign(table_name);
302
meta_file_name.append(CSM_EXT);
304
if (stat(share->data_file_name.c_str(), &file_stat))
306
pthread_mutex_unlock(&tina_mutex);
151
if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
152
&share, sizeof(*share),
156
pthread_mutex_unlock(&tina_mutex);
161
share->table_name_length= length;
162
share->table_name= tmp_name;
163
share->crashed= false;
164
share->rows_recorded= 0;
165
share->update_file_opened= false;
166
share->tina_write_opened= false;
167
share->data_file_version= 0;
168
strcpy(share->table_name, table_name);
169
fn_format(share->data_file_name, table_name, "", CSV_EXT,
170
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
171
fn_format(meta_file_name, table_name, "", CSM_EXT,
172
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
174
if (stat(share->data_file_name, &file_stat))
311
176
share->saved_data_file_length= file_stat.st_size;
313
a_tina->addOpenTable(share->table_name, share);
178
if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
180
thr_lock_init(&share->lock);
315
181
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
696
If frm_error() is called in table.cc this is called to find out what file
697
extensions exist for this handler.
699
static const char *ha_tina_exts[] = {
705
const char **ha_tina::bas_ext() const
711
Three functions below are needed to enable concurrent insert functionality
712
for CSV engine. For more details see mysys/thr_lock.c
715
void tina_get_status(void* param, int)
717
ha_tina *tina= (ha_tina*) param;
721
void tina_update_status(void* param)
723
ha_tina *tina= (ha_tina*) param;
724
tina->update_status();
727
/* this should exist and return 0 for concurrent insert to work */
728
bool tina_check_status(void *)
734
Save the state of the table
740
This function is used to retrieve the file length during the lock
741
phase of concurrent insert. For more details see comment to
742
ha_tina::update_status below.
745
void ha_tina::get_status()
747
local_saved_data_file_length= share->saved_data_file_length;
752
Correct the state of the table. Called by unlock routines
753
before the write lock is released.
759
When we employ concurrent insert lock, we save current length of the file
760
during the lock phase. We do not read beyond that saved length, as we don't
761
want to interfere with undergoing concurrent insert. Writers update file
762
length info during unlock with update_status().
765
For log tables concurrent insert works differently. The reason is that
766
log tables are always opened and locked. And as they do not unlock
767
tables, the file length after writes should be updated in a different
771
void ha_tina::update_status()
773
/* correct local_saved_data_file_length for writers */
774
share->saved_data_file_length= local_saved_data_file_length;
780
779
Open a database file. Keep in mind that tables are cached, so
781
780
this will not be called for every request. Any sort of positions
782
781
that need to be reset should be kept in the ::extra() call.
784
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
783
int ha_tina::open(const char *name, int, uint32_t open_options)
786
if (not (share= get_share(identifier.getPath().c_str())))
785
if (!(share= get_share(name, table)))
788
if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
792
791
return(HA_ERR_CRASHED_ON_USAGE);
795
794
local_data_file_version= share->data_file_version;
796
if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
795
if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
800
Init locking. Pass Cursor object to the locking routines,
799
Init locking. Pass handler object to the locking routines,
801
800
so that they could save/update local_saved_data_file_length value
802
801
during locking. This is needed to enable concurrent inserts.
803
thr_lock_data_init(&share->lock, &lock, (void*) this);
804
804
ref_length=sizeof(off_t);
806
share->lock.get_status= tina_get_status;
807
share->lock.update_status= tina_update_status;
808
share->lock.check_status= tina_check_status;
810
815
Close a database file. We remove ourselves from the shared structure.
811
816
If it is empty we destroy it.
1231
internal::my_close(update_temp_file, MYF(0));
1251
my_close(update_temp_file, MYF(0));
1232
1252
share->update_file_opened= false;
1258
Repair a CSV table in case it has crashed.
1262
session The thread, performing repair
1263
check_opt The options for repair. We do not use it currently.
1266
If the file is empty, change # of rows in the file and complete recovery.
1267
Otherwise, scan the table looking for bad rows. If none were found,
1268
we mark file as a good one and return. If a bad row was encountered,
1269
we truncate the datafile up to the last good row.
1271
TODO: Make repair more clever - it should try to recover subsequent
1272
rows (after the first bad one) as well.
1275
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
1277
char repaired_fname[FN_REFLEN];
1281
ha_rows rows_repaired= 0;
1282
off_t write_begin= 0, write_end;
1285
if (!share->saved_data_file_length)
1287
share->rows_recorded= 0;
1291
/* Don't assert in field::val() functions */
1292
table->use_all_columns();
1293
if (!(buf= (unsigned char*) malloc(table->s->reclength)))
1294
return(HA_ERR_OUT_OF_MEM);
1296
/* position buffer to the start of the file */
1297
if (init_data_file())
1298
return(HA_ERR_CRASHED_ON_REPAIR);
1301
Local_saved_data_file_length is initialized during the lock phase.
1302
Sometimes this is not getting executed before ::repair (e.g. for
1303
the log tables). We set it manually here.
1305
local_saved_data_file_length= share->saved_data_file_length;
1306
/* set current position to the beginning of the file */
1307
current_position= next_position= 0;
1309
init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1311
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1312
while (!(rc= find_current_row(buf)))
1314
session_inc_row_count(session);
1316
current_position= next_position;
1319
free_root(&blobroot, MYF(0));
1323
if (rc == HA_ERR_END_OF_FILE)
1326
All rows were read ok until end of file, the file does not need repair.
1327
If rows_recorded != rows_repaired, we should update rows_recorded value
1328
to the current amount of rows.
1330
share->rows_recorded= rows_repaired;
1335
Otherwise we've encountered a bad row => repair is needed.
1336
Let us create a temporary file.
1338
if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
1340
MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1341
0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1342
return(HA_ERR_CRASHED_ON_REPAIR);
1344
file_buff->init_buff(data_file);
1347
/* we just truncated the file up to the first bad row. update rows count. */
1348
share->rows_recorded= rows_repaired;
1350
/* write repaired file */
1353
write_end= std::min(file_buff->end(), current_position);
1355
off_t write_length= write_end - write_begin;
1356
if ((uint64_t)write_length > SIZE_MAX)
1360
if ((write_length) &&
1361
(my_write(repair_file, (unsigned char*)file_buff->ptr(),
1362
(size_t)write_length, MYF_RW)))
1365
write_begin= write_end;
1366
if (write_end== current_position)
1369
file_buff->read_next(); /* shift the buffer */
1373
Close the files and rename repaired file to the datafile.
1374
We have to close the files, as on Windows one cannot rename
1375
a file whose descriptor is still open. EACCES will be returned
1376
when trying to delete the "to"-file in my_rename().
1378
if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
1379
my_rename(repaired_fname, share->data_file_name, MYF(0)))
1382
/* Open the file again, it should now be repaired */
1383
if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
1387
/* Set new file size. The file size will be updated by ::update_status() */
1388
local_saved_data_file_length= (size_t) current_position;
1391
share->crashed= false;
1392
return(HA_ADMIN_OK);
1238
1396
DELETE without WHERE calls this
1296
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1459
if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
1460
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1461
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1301
1464
write_meta_file(create_file, 0, false);
1302
internal::my_close(create_file, MYF(0));
1465
my_close(create_file, MYF(0));
1304
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1467
if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
1468
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1469
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1309
internal::my_close(create_file, MYF(0));
1311
session.getMessageCache().storeTableMessage(identifier, create_proto);
1317
DRIZZLE_DECLARE_PLUGIN
1472
my_close(create_file, MYF(0));
1477
int ha_tina::check(Session* session, HA_CHECK_OPT *)
1481
const char *old_proc_info;
1482
ha_rows count= share->rows_recorded;
1484
old_proc_info= get_session_proc_info(session);
1485
set_session_proc_info(session, "Checking table");
1486
if (!(buf= (unsigned char*) malloc(table->s->reclength)))
1487
return(HA_ERR_OUT_OF_MEM);
1489
/* position buffer to the start of the file */
1490
if (init_data_file())
1491
return(HA_ERR_CRASHED);
1494
Local_saved_data_file_length is initialized during the lock phase.
1495
Check does not use store_lock in certain cases. So, we set it
1498
local_saved_data_file_length= share->saved_data_file_length;
1499
/* set current position to the beginning of the file */
1500
current_position= next_position= 0;
1502
init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1504
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1505
while (!(rc= find_current_row(buf)))
1507
session_inc_row_count(session);
1509
current_position= next_position;
1512
free_root(&blobroot, MYF(0));
1515
set_session_proc_info(session, old_proc_info);
1517
if ((rc != HA_ERR_END_OF_FILE) || count)
1519
share->crashed= true;
1520
return(HA_ADMIN_CORRUPT);
1523
return(HA_ADMIN_OK);
1527
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *, uint32_t)
1529
return COMPATIBLE_DATA_YES;
1532
drizzle_declare_plugin(csv)
1534
DRIZZLE_STORAGE_ENGINE_PLUGIN,
1322
1537
"Brian Aker, MySQL AB",
1323
1538
"CSV storage engine",
1324
1539
PLUGIN_LICENSE_GPL,
1325
1540
tina_init_func, /* Plugin Init */
1541
tina_done_func, /* Plugin Deinit */
1542
NULL, /* status variables */
1543
NULL, /* system variables */
1327
1544
NULL /* config options */
1329
DRIZZLE_DECLARE_PLUGIN_END;
1546
drizzle_declare_plugin_end;