43
#include <drizzled/common_includes.h>
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
#include <drizzled/field/timestamp.h>
47
#include <storage/csv/ha_tina.h>
46
48
#include <drizzled/error.h>
47
49
#include <drizzled/table.h>
48
50
#include <drizzled/session.h>
49
#include <drizzled/internal/my_sys.h>
61
using namespace drizzled;
64
53
unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
66
static const int META_BUFFER_SIZE = sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t)
67
+ sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char);
68
static const int TINA_CHECK_HEADER = 254; // The number we use to determine corruption
69
static const int BLOB_MEMROOT_ALLOC_SIZE = 8192;
55
#define META_BUFFER_SIZE sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t) \
56
+ sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char)
57
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
58
#define BLOB_MEMROOT_ALLOC_SIZE 8192
71
60
/* The file extension */
72
static const char* CSV_EXT = ".CSV" // The data file
73
static const char* CSN_EXT = ".CSN" // Files used during repair and update
74
static const char* CSM_EXT = ".CSM" // Meta file
77
static int read_meta_file(int meta_file, ha_rows *rows);
78
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
61
#define CSV_EXT ".CSV" // The data file
62
#define CSN_EXT ".CSN" // Files used during repair and update
63
#define CSM_EXT ".CSM" // Meta file
66
static TINA_SHARE *get_share(const char *table_name, Table *table);
67
static int free_share(TINA_SHARE *share);
68
static int read_meta_file(File meta_file, ha_rows *rows);
69
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
71
extern "C" void tina_get_status(void* param, int concurrent_insert);
72
extern "C" void tina_update_status(void* param);
73
extern "C" bool tina_check_status(void* param);
80
75
/* Stuff for shares */
81
76
pthread_mutex_t tina_mutex;
77
static HASH tina_open_tables;
78
static handler *tina_create_handler(handlerton *hton,
83
83
/*****************************************************************************
85
85
*****************************************************************************/
88
If frm_error() is called in table.cc this is called to find out what file
89
extensions exist for this Cursor.
88
Used for sorting chains with qsort().
91
static const char *ha_tina_exts[] = {
97
class Tina : public drizzled::plugin::StorageEngine
99
typedef std::map<string, TinaShare*> TinaMap;
100
TinaMap tina_open_tables;
102
Tina(const string& name_arg)
103
: drizzled::plugin::StorageEngine(name_arg,
104
HTON_TEMPORARY_ONLY |
105
HTON_NO_AUTO_INCREMENT |
106
HTON_SKIP_STORE_LOCK),
111
pthread_mutex_destroy(&tina_mutex);
114
virtual Cursor *create(Table &table)
116
return new ha_tina(*this, table);
119
const char **bas_ext() const {
123
int doCreateTable(Session &,
125
const drizzled::identifier::Table &identifier,
126
drizzled::message::Table&);
128
int doGetTableDefinition(Session& session,
129
const drizzled::identifier::Table &identifier,
130
drizzled::message::Table &table_message);
132
int doDropTable(Session&, const drizzled::identifier::Table &identifier);
133
TinaShare *findOpenTable(const string table_name);
134
void addOpenTable(const string &table_name, TinaShare *);
135
void deleteOpenTable(const string &table_name);
138
uint32_t max_keys() const { return 0; }
139
uint32_t max_key_parts() const { return 0; }
140
uint32_t max_key_length() const { return 0; }
141
bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
144
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
const drizzled::identifier::Schema &schema_identifier,
146
drizzled::identifier::Table::vector &set_of_identifiers);
149
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
const drizzled::identifier::Schema&,
151
drizzled::identifier::Table::vector&)
155
int Tina::doRenameTable(Session &session,
156
const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
159
for (const char **ext= bas_ext(); *ext ; ext++)
161
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
if ((error=errno) != ENOENT)
169
session.getMessageCache().renameTableMessage(from, to);
174
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
176
return session.getMessageCache().doesTableMessageExist(identifier);
180
int Tina::doDropTable(Session &session,
181
const drizzled::identifier::Table &identifier)
184
int enoent_or_zero= ENOENT; // Error if no file was deleted
186
for (const char **ext= bas_ext(); *ext ; ext++)
188
std::string full_name= identifier.getPath();
189
full_name.append(*ext);
191
if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
193
if ((error= errno) != ENOENT)
198
enoent_or_zero= 0; // No error for ENOENT
200
error= enoent_or_zero;
203
session.getMessageCache().removeTableMessage(identifier);
208
TinaShare *Tina::findOpenTable(const string table_name)
210
TinaMap::iterator find_iter=
211
tina_open_tables.find(table_name);
213
if (find_iter != tina_open_tables.end())
214
return (*find_iter).second;
219
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
tina_open_tables[table_name]= share;
224
void Tina::deleteOpenTable(const string &table_name)
226
tina_open_tables.erase(table_name);
230
int Tina::doGetTableDefinition(Session &session,
231
const drizzled::identifier::Table &identifier,
232
drizzled::message::Table &table_message)
234
if (session.getMessageCache().getTableMessage(identifier, table_message))
241
static Tina *tina_engine= NULL;
243
static int tina_init_func(drizzled::module::Context &context)
246
tina_engine= new Tina("CSV");
247
context.add(tina_engine);
90
int sort_set (tina_set *a, tina_set *b)
93
We assume that intervals do not intersect. So, it is enough to compare
94
any two points. Here we take start of intervals for comparison.
96
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
99
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
101
*length=share->table_name_length;
102
return (unsigned char*) share->table_name;
105
static int tina_init_func(void *p)
107
handlerton *tina_hton;
109
tina_hton= (handlerton *)p;
249
110
pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
255
TinaShare::TinaShare(const std::string &table_name_arg) :
256
table_name(table_name_arg),
257
data_file_name(table_name_arg),
259
saved_data_file_length(0),
260
update_file_opened(false),
261
tina_write_opened(false),
266
data_file_name.append(CSV_EXT);
269
TinaShare::~TinaShare()
271
pthread_mutex_destroy(&mutex);
111
(void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
112
(hash_get_key) tina_get_key,0,0);
113
tina_hton->state= SHOW_OPTION_YES;
114
tina_hton->create= tina_create_handler;
115
tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
120
static int tina_done_func(void *)
122
hash_free(&tina_open_tables);
123
pthread_mutex_destroy(&tina_mutex);
275
130
Simple lock controls.
277
TinaShare *ha_tina::get_share(const std::string &table_name)
132
static TINA_SHARE *get_share(const char *table_name, Table *)
279
pthread_mutex_lock(&tina_mutex);
281
Tina *a_tina= static_cast<Tina *>(getEngine());
282
share= a_tina->findOpenTable(table_name);
284
std::string meta_file_name;
135
char meta_file_name[FN_REFLEN];
285
136
struct stat file_stat;
140
pthread_mutex_lock(&tina_mutex);
141
length=(uint) strlen(table_name);
288
144
If share is not present in the hash, create a new share and
289
145
initialize its members.
147
if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
148
(unsigned char*) table_name,
293
share= new TinaShare(table_name);
297
pthread_mutex_unlock(&tina_mutex);
301
meta_file_name.assign(table_name);
302
meta_file_name.append(CSM_EXT);
304
if (stat(share->data_file_name.c_str(), &file_stat))
306
pthread_mutex_unlock(&tina_mutex);
151
if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
152
&share, sizeof(*share),
156
pthread_mutex_unlock(&tina_mutex);
161
share->table_name_length= length;
162
share->table_name= tmp_name;
163
share->crashed= false;
164
share->rows_recorded= 0;
165
share->update_file_opened= false;
166
share->tina_write_opened= false;
167
share->data_file_version= 0;
168
strcpy(share->table_name, table_name);
169
fn_format(share->data_file_name, table_name, "", CSV_EXT,
170
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
171
fn_format(meta_file_name, table_name, "", CSM_EXT,
172
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
174
if (stat(share->data_file_name, &file_stat))
311
176
share->saved_data_file_length= file_stat.st_size;
313
a_tina->addOpenTable(share->table_name, share);
178
if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
180
thr_lock_init(&share->lock);
315
181
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
697
If frm_error() is called in table.cc this is called to find out what file
698
extensions exist for this handler.
700
static const char *ha_tina_exts[] = {
706
const char **ha_tina::bas_ext() const
712
Three functions below are needed to enable concurrent insert functionality
713
for CSV engine. For more details see mysys/thr_lock.c
716
void tina_get_status(void* param, int)
718
ha_tina *tina= (ha_tina*) param;
722
void tina_update_status(void* param)
724
ha_tina *tina= (ha_tina*) param;
725
tina->update_status();
728
/* this should exist and return 0 for concurrent insert to work */
729
bool tina_check_status(void *)
735
Save the state of the table
741
This function is used to retrieve the file length during the lock
742
phase of concurrent insert. For more details see comment to
743
ha_tina::update_status below.
746
void ha_tina::get_status()
748
local_saved_data_file_length= share->saved_data_file_length;
753
Correct the state of the table. Called by unlock routines
754
before the write lock is released.
760
When we employ concurrent insert lock, we save current length of the file
761
during the lock phase. We do not read further saved value, as we don't
762
want to interfere with undergoing concurrent insert. Writers update file
763
length info during unlock with update_status().
766
For log tables concurrent insert works differently. The reason is that
767
log tables are always opened and locked. And as they do not unlock
768
tables, the file length after writes should be updated in a different
772
void ha_tina::update_status()
774
/* correct local_saved_data_file_length for writers */
775
share->saved_data_file_length= local_saved_data_file_length;
780
780
Open a database file. Keep in mind that tables are caches, so
781
781
this will not be called for every request. Any sort of positions
782
782
that need to be reset should be kept in the ::extra() call.
784
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
784
int ha_tina::open(const char *name, int, uint32_t open_options)
786
if (not (share= get_share(identifier.getPath().c_str())))
786
if (!(share= get_share(name, table)))
787
return(HA_ERR_OUT_OF_MEM);
789
if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
792
792
return(HA_ERR_CRASHED_ON_USAGE);
795
795
local_data_file_version= share->data_file_version;
796
if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
796
if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
800
Init locking. Pass Cursor object to the locking routines,
800
Init locking. Pass handler object to the locking routines,
801
801
so that they could save/update local_saved_data_file_length value
802
802
during locking. This is needed to enable concurrent inserts.
804
thr_lock_data_init(&share->lock, &lock, (void*) this);
804
805
ref_length=sizeof(off_t);
807
share->lock.get_status= tina_get_status;
808
share->lock.update_status= tina_update_status;
809
share->lock.check_status= tina_check_status;
810
816
Close a database file. We remove ourselves from the shared structure.
811
817
If it is empty we destroy it.
1231
internal::my_close(update_temp_file, MYF(0));
1248
my_close(update_temp_file, MYF(0));
1232
1249
share->update_file_opened= false;
1255
Repair a CSV table in case it has crashed.
1259
session The thread, performing repair
1260
check_opt The options for repair. We do not use it currently.
1263
If the file is empty, change # of rows in the file and complete recovery.
1264
Otherwise, scan the table looking for bad rows. If none were found,
1265
we mark file as a good one and return. If a bad row was encountered,
1266
we truncate the datafile up to the last good row.
1268
TODO: Make repair more clever - it should try to recover subsequent
1269
rows (after the first bad one) as well.
1272
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
1274
char repaired_fname[FN_REFLEN];
1278
ha_rows rows_repaired= 0;
1279
off_t write_begin= 0, write_end;
1282
if (!share->saved_data_file_length)
1284
share->rows_recorded= 0;
1288
/* Don't assert in field::val() functions */
1289
table->use_all_columns();
1290
if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
1291
return(HA_ERR_OUT_OF_MEM);
1293
/* position buffer to the start of the file */
1294
if (init_data_file())
1295
return(HA_ERR_CRASHED_ON_REPAIR);
1298
Local_saved_data_file_length is initialized during the lock phase.
1299
Sometimes this is not getting executed before ::repair (e.g. for
1300
the log tables). We set it manually here.
1302
local_saved_data_file_length= share->saved_data_file_length;
1303
/* set current position to the beginning of the file */
1304
current_position= next_position= 0;
1306
init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1308
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1309
while (!(rc= find_current_row(buf)))
1311
session_inc_row_count(session);
1313
current_position= next_position;
1316
free_root(&blobroot, MYF(0));
1320
if (rc == HA_ERR_END_OF_FILE)
1323
All rows were read ok until end of file, the file does not need repair.
1324
If rows_recorded != rows_repaired, we should update rows_recorded value
1325
to the current amount of rows.
1327
share->rows_recorded= rows_repaired;
1332
Otherwise we've encountered a bad row => repair is needed.
1333
Let us create a temporary file.
1335
if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
1337
MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1338
0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1339
return(HA_ERR_CRASHED_ON_REPAIR);
1341
file_buff->init_buff(data_file);
1344
/* we just truncated the file up to the first bad row. update rows count. */
1345
share->rows_recorded= rows_repaired;
1347
/* write repaired file */
1350
write_end= std::min(file_buff->end(), current_position);
1351
if ((write_end - write_begin) &&
1352
(my_write(repair_file, (unsigned char*)file_buff->ptr(),
1353
write_end - write_begin, MYF_RW)))
1356
write_begin= write_end;
1357
if (write_end== current_position)
1360
file_buff->read_next(); /* shift the buffer */
1364
Close the files and rename repaired file to the datafile.
1365
We have to close the files, as on Windows one cannot rename
1366
a file whose descriptor is still open. EACCES will be returned
1367
when trying to delete the "to"-file in my_rename().
1369
if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
1370
my_rename(repaired_fname, share->data_file_name, MYF(0)))
1373
/* Open the file again, it should now be repaired */
1374
if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
1378
/* Set new file size. The file size will be updated by ::update_status() */
1379
local_saved_data_file_length= (size_t) current_position;
1382
share->crashed= false;
1383
return(HA_ADMIN_OK);
1238
1387
DELETE without WHERE calls this
1414
Called by the database to lock the table. Keep in mind that this
1415
is an internal lock.
1417
THR_LOCK_DATA **ha_tina::store_lock(Session *,
1419
enum thr_lock_type lock_type)
1421
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1422
lock.type=lock_type;
1265
1428
Create a table. You do not want to leave the table open after a call to
1266
1429
this (the database will call ::open() if it needs to).
1269
int Tina::doCreateTable(Session &session,
1271
const drizzled::identifier::Table &identifier,
1272
drizzled::message::Table &create_proto)
1432
int ha_tina::create(const char *name, Table *table_arg, HA_CREATE_INFO *)
1274
1434
char name_buff[FN_REFLEN];
1280
const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
iter != fields.end();
1440
for (Field **field= table_arg->s->field; *field; field++)
1285
if (not *iter) // Historical legacy for NULL array end.
1288
if ((*iter)->real_maybe_null())
1442
if ((*field)->real_maybe_null())
1290
1444
my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1445
return(HA_ERR_UNSUPPORTED);
1296
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1450
if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
1451
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1452
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1301
1455
write_meta_file(create_file, 0, false);
1302
internal::my_close(create_file, MYF(0));
1456
my_close(create_file, MYF(0));
1304
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1458
if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
1459
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1460
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1309
internal::my_close(create_file, MYF(0));
1311
session.getMessageCache().storeTableMessage(identifier, create_proto);
1317
DRIZZLE_DECLARE_PLUGIN
1463
my_close(create_file, MYF(0));
1468
int ha_tina::check(Session* session, HA_CHECK_OPT *)
1472
const char *old_proc_info;
1473
ha_rows count= share->rows_recorded;
1475
old_proc_info= get_session_proc_info(session);
1476
set_session_proc_info(session, "Checking table");
1477
if (!(buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
1478
return(HA_ERR_OUT_OF_MEM);
1480
/* position buffer to the start of the file */
1481
if (init_data_file())
1482
return(HA_ERR_CRASHED);
1485
Local_saved_data_file_length is initialized during the lock phase.
1486
Check does not use store_lock in certain cases. So, we set it
1489
local_saved_data_file_length= share->saved_data_file_length;
1490
/* set current position to the beginning of the file */
1491
current_position= next_position= 0;
1493
init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1495
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1496
while (!(rc= find_current_row(buf)))
1498
session_inc_row_count(session);
1500
current_position= next_position;
1503
free_root(&blobroot, MYF(0));
1506
set_session_proc_info(session, old_proc_info);
1508
if ((rc != HA_ERR_END_OF_FILE) || count)
1510
share->crashed= true;
1511
return(HA_ADMIN_CORRUPT);
1514
return(HA_ADMIN_OK);
1518
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *, uint32_t)
1520
return COMPATIBLE_DATA_YES;
1523
mysql_declare_plugin(csv)
1525
DRIZZLE_STORAGE_ENGINE_PLUGIN,
1322
1528
"Brian Aker, MySQL AB",
1323
1529
"CSV storage engine",
1324
1530
PLUGIN_LICENSE_GPL,
1325
1531
tina_init_func, /* Plugin Init */
1532
tina_done_func, /* Plugin Deinit */
1533
NULL, /* status variables */
1534
NULL, /* system variables */
1327
1535
NULL /* config options */
1329
DRIZZLE_DECLARE_PLUGIN_END;
1537
mysql_declare_plugin_end;