74
70
#define CSM_EXT ".CSM" // Meta file
77
static int read_meta_file(int meta_file, ha_rows *rows);
78
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
73
static TINA_SHARE *get_share(const char *table_name, Table *table);
74
static int free_share(TINA_SHARE *share);
75
static int read_meta_file(File meta_file, ha_rows *rows);
76
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
78
extern "C" void tina_get_status(void* param, int concurrent_insert);
79
extern "C" void tina_update_status(void* param);
80
extern "C" bool tina_check_status(void* param);
80
82
/* Stuff for shares */
81
83
pthread_mutex_t tina_mutex;
84
static HASH tina_open_tables;
83
86
/*****************************************************************************
85
88
*****************************************************************************/
91
Used for sorting chains with qsort().
93
static int sort_set (tina_set *a, tina_set *b)
96
We assume that intervals do not intersect. So, it is enough to compare
97
any two points. Here we take start of intervals for comparison.
99
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
102
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
104
*length=share->table_name_length;
105
return (unsigned char*) share->table_name;
88
110
If frm_error() is called in table.cc this is called to find out what file
89
extensions exist for this Cursor.
111
extensions exist for this handler.
91
113
static const char *ha_tina_exts[] = {
97
class Tina : public drizzled::plugin::StorageEngine
119
class Tina : public StorageEngine
99
typedef std::map<string, TinaShare*> TinaMap;
100
TinaMap tina_open_tables;
102
122
Tina(const string& name_arg)
103
: drizzled::plugin::StorageEngine(name_arg,
104
HTON_TEMPORARY_ONLY |
105
HTON_NO_AUTO_INCREMENT |
106
HTON_SKIP_STORE_LOCK),
111
pthread_mutex_destroy(&tina_mutex);
114
virtual Cursor *create(Table &table)
116
return new ha_tina(*this, table);
123
: StorageEngine(name_arg, HTON_CAN_RECREATE | HTON_TEMPORARY_ONLY | HTON_FILE_BASED) {}
124
virtual handler *create(TableShare *table,
127
return new (mem_root) ha_tina(this, table);
119
130
const char **bas_ext() const {
120
131
return ha_tina_exts;
123
int doCreateTable(Session &,
125
const drizzled::identifier::Table &identifier,
126
drizzled::message::Table&);
128
int doGetTableDefinition(Session& session,
129
const drizzled::identifier::Table &identifier,
130
drizzled::message::Table &table_message);
132
int doDropTable(Session&, const drizzled::identifier::Table &identifier);
133
TinaShare *findOpenTable(const string table_name);
134
void addOpenTable(const string &table_name, TinaShare *);
135
void deleteOpenTable(const string &table_name);
138
/* Upper bound on the number of keys per table: this engine reports zero. */
uint32_t max_keys() const
{
  return 0;
}
139
/* Upper bound on the number of parts in a key: this engine reports zero. */
uint32_t max_key_parts() const
{
  return 0;
}
140
/* Upper bound on the length of a key: this engine reports zero. */
uint32_t max_key_length() const
{
  return 0;
}
141
bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
144
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
const drizzled::identifier::Schema &schema_identifier,
146
drizzled::identifier::Table::vector &set_of_identifiers);
134
int createTableImplementation(Session *, const char *table_name,
136
HA_CREATE_INFO *, drizzled::message::Table*);
149
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
const drizzled::identifier::Schema&,
151
drizzled::identifier::Table::vector&)
155
int Tina::doRenameTable(Session &session,
156
const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
159
for (const char **ext= bas_ext(); *ext ; ext++)
161
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
163
if ((error=errno) != ENOENT)
169
session.getMessageCache().renameTableMessage(from, to);
174
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
176
return session.getMessageCache().doesTableMessageExist(identifier);
180
int Tina::doDropTable(Session &session,
181
const drizzled::identifier::Table &identifier)
184
int enoent_or_zero= ENOENT; // Error if no file was deleted
186
for (const char **ext= bas_ext(); *ext ; ext++)
188
std::string full_name= identifier.getPath();
189
full_name.append(*ext);
191
if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
193
if ((error= errno) != ENOENT)
198
enoent_or_zero= 0; // No error for ENOENT
200
error= enoent_or_zero;
203
session.getMessageCache().removeTableMessage(identifier);
208
TinaShare *Tina::findOpenTable(const string table_name)
210
TinaMap::iterator find_iter=
211
tina_open_tables.find(table_name);
213
if (find_iter != tina_open_tables.end())
214
return (*find_iter).second;
219
void Tina::addOpenTable(const string &table_name, TinaShare *share)
221
tina_open_tables[table_name]= share;
224
void Tina::deleteOpenTable(const string &table_name)
226
tina_open_tables.erase(table_name);
230
int Tina::doGetTableDefinition(Session &session,
231
const drizzled::identifier::Table &identifier,
232
drizzled::message::Table &table_message)
234
if (session.getMessageCache().getTableMessage(identifier, table_message))
241
140
static Tina *tina_engine= NULL;
243
static int tina_init_func(drizzled::module::Context &context)
142
static int tina_init_func(PluginRegistry ®istry)
246
tina_engine= new Tina("CSV");
247
context.add(tina_engine);
145
tina_engine= new Tina(engine_name);
146
registry.add(tina_engine);
249
148
pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
255
TinaShare::TinaShare(const std::string &table_name_arg) :
256
table_name(table_name_arg),
257
data_file_name(table_name_arg),
259
saved_data_file_length(0),
260
update_file_opened(false),
261
tina_write_opened(false),
266
data_file_name.append(CSV_EXT);
269
TinaShare::~TinaShare()
271
pthread_mutex_destroy(&mutex);
149
(void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
150
(hash_get_key) tina_get_key,0,0);
154
static int tina_done_func(PluginRegistry ®istry)
156
registry.remove(tina_engine);
159
hash_free(&tina_open_tables);
160
pthread_mutex_destroy(&tina_mutex);
275
167
Simple lock controls.
277
TinaShare *ha_tina::get_share(const std::string &table_name)
169
static TINA_SHARE *get_share(const char *table_name, Table *)
279
pthread_mutex_lock(&tina_mutex);
281
Tina *a_tina= static_cast<Tina *>(getEngine());
282
share= a_tina->findOpenTable(table_name);
284
std::string meta_file_name;
172
char meta_file_name[FN_REFLEN];
285
173
struct stat file_stat;
177
pthread_mutex_lock(&tina_mutex);
178
length=(uint) strlen(table_name);
288
181
If share is not present in the hash, create a new share and
289
182
initialize its members.
184
if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
185
(unsigned char*) table_name,
293
share= new TinaShare(table_name);
297
pthread_mutex_unlock(&tina_mutex);
301
meta_file_name.assign(table_name);
302
meta_file_name.append(CSM_EXT);
304
if (stat(share->data_file_name.c_str(), &file_stat))
306
pthread_mutex_unlock(&tina_mutex);
188
if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
189
&share, sizeof(*share),
193
pthread_mutex_unlock(&tina_mutex);
198
share->table_name_length= length;
199
share->table_name= tmp_name;
200
share->crashed= false;
201
share->rows_recorded= 0;
202
share->update_file_opened= false;
203
share->tina_write_opened= false;
204
share->data_file_version= 0;
205
strcpy(share->table_name, table_name);
206
fn_format(share->data_file_name, table_name, "", CSV_EXT,
207
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
208
fn_format(meta_file_name, table_name, "", CSM_EXT,
209
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
211
if (stat(share->data_file_name, &file_stat))
311
213
share->saved_data_file_length= file_stat.st_size;
313
a_tina->addOpenTable(share->table_name, share);
215
if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
217
thr_lock_init(&share->lock);
315
218
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
732
Three functions below are needed to enable concurrent insert functionality
733
for CSV engine. For more details see mysys/thr_lock.c
736
void tina_get_status(void* param, int)
738
ha_tina *tina= (ha_tina*) param;
742
void tina_update_status(void* param)
744
ha_tina *tina= (ha_tina*) param;
745
tina->update_status();
748
/* this should exist and return 0 for concurrent insert to work */
749
bool tina_check_status(void *)
755
Save the state of the table
761
This function is used to retrieve the file length during the lock
762
phase of concurrent insert. For more details see comment to
763
ha_tina::update_status below.
766
void ha_tina::get_status()
768
local_saved_data_file_length= share->saved_data_file_length;
773
Correct the state of the table. Called by unlock routines
774
before the write lock is released.
780
When we employ concurrent insert lock, we save current length of the file
781
during the lock phase. We do not read further saved value, as we don't
782
want to interfere with undergoing concurrent insert. Writers update file
783
length info during unlock with update_status().
786
For log tables concurrent insert works differently. The reason is that
787
log tables are always opened and locked. And as they do not unlock
788
tables, the file length after writes should be updated in a different
792
void ha_tina::update_status()
794
/* correct local_saved_data_file_length for writers */
795
share->saved_data_file_length= local_saved_data_file_length;
780
800
Open a database file. Keep in mind that tables are cached, so
781
801
this will not be called for every request. Any sort of positions
782
802
that need to be reset should be kept in the ::extra() call.
784
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
804
int ha_tina::open(const char *name, int, uint32_t open_options)
786
if (not (share= get_share(identifier.getPath().c_str())))
806
if (!(share= get_share(name, table)))
809
if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
792
812
return(HA_ERR_CRASHED_ON_USAGE);
795
815
local_data_file_version= share->data_file_version;
796
if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
816
if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
800
Init locking. Pass Cursor object to the locking routines,
820
Init locking. Pass handler object to the locking routines,
801
821
so that they could save/update local_saved_data_file_length value
802
822
during locking. This is needed to enable concurrent inserts.
824
thr_lock_data_init(&share->lock, &lock, (void*) this);
804
825
ref_length=sizeof(off_t);
827
share->lock.get_status= tina_get_status;
828
share->lock.update_status= tina_update_status;
829
share->lock.check_status= tina_check_status;
810
836
Close a database file. We remove ourselves from the shared structure.
811
837
If it is empty we destroy it.
1231
internal::my_close(update_temp_file, MYF(0));
1269
my_close(update_temp_file, MYF(0));
1232
1270
share->update_file_opened= false;
1276
Repair a CSV table in case it has crashed.
1280
session The thread, performing repair
1281
check_opt The options for repair. We do not use it currently.
1284
If the file is empty, change # of rows in the file and complete recovery.
1285
Otherwise, scan the table looking for bad rows. If none were found,
1286
we mark file as a good one and return. If a bad row was encountered,
1287
we truncate the datafile up to the last good row.
1289
TODO: Make repair more clever - it should try to recover subsequent
1290
rows (after the first bad one) as well.
1293
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
1295
char repaired_fname[FN_REFLEN];
1299
ha_rows rows_repaired= 0;
1300
off_t write_begin= 0, write_end;
1303
if (!share->saved_data_file_length)
1305
share->rows_recorded= 0;
1309
/* Don't assert in field::val() functions */
1310
table->use_all_columns();
1311
if (!(buf= (unsigned char*) malloc(table->s->reclength)))
1312
return(HA_ERR_OUT_OF_MEM);
1314
/* position buffer to the start of the file */
1315
if (init_data_file())
1316
return(HA_ERR_CRASHED_ON_REPAIR);
1319
Local_saved_data_file_length is initialized during the lock phase.
1320
Sometimes this is not getting executed before ::repair (e.g. for
1321
the log tables). We set it manually here.
1323
local_saved_data_file_length= share->saved_data_file_length;
1324
/* set current position to the beginning of the file */
1325
current_position= next_position= 0;
1327
init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1329
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1330
while (!(rc= find_current_row(buf)))
1332
session_inc_row_count(session);
1334
current_position= next_position;
1337
free_root(&blobroot, MYF(0));
1341
if (rc == HA_ERR_END_OF_FILE)
1344
All rows were read ok until end of file, the file does not need repair.
1345
If rows_recorded != rows_repaired, we should update rows_recorded value
1346
to the current amount of rows.
1348
share->rows_recorded= rows_repaired;
1353
Otherwise we've encountered a bad row => repair is needed.
1354
Let us create a temporary file.
1356
if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
1358
MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1359
0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1360
return(HA_ERR_CRASHED_ON_REPAIR);
1362
file_buff->init_buff(data_file);
1365
/* we just truncated the file up to the first bad row. update rows count. */
1366
share->rows_recorded= rows_repaired;
1368
/* write repaired file */
1371
write_end= std::min(file_buff->end(), current_position);
1373
off_t write_length= write_end - write_begin;
1374
if ((uint64_t)write_length > SIZE_MAX)
1378
if ((write_length) &&
1379
(my_write(repair_file, (unsigned char*)file_buff->ptr(),
1380
(size_t)write_length, MYF_RW)))
1383
write_begin= write_end;
1384
if (write_end== current_position)
1387
file_buff->read_next(); /* shift the buffer */
1391
Close the files and rename repaired file to the datafile.
1392
We have to close the files, as on Windows one cannot rename
1393
a file whose descriptor is still open. EACCES will be returned
1394
when trying to delete the "to"-file in my_rename().
1396
if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
1397
my_rename(repaired_fname, share->data_file_name, MYF(0)))
1400
/* Open the file again, it should now be repaired */
1401
if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
1405
/* Set new file size. The file size will be updated by ::update_status() */
1406
local_saved_data_file_length= (size_t) current_position;
1409
share->crashed= false;
1410
return(HA_ADMIN_OK);
1238
1414
DELETE without WHERE calls this
1296
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSM_EXT,
1297
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1298
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1479
if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
1480
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1481
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1301
1484
write_meta_file(create_file, 0, false);
1302
internal::my_close(create_file, MYF(0));
1485
my_close(create_file, MYF(0));
1304
if ((create_file= internal::my_create(internal::fn_format(name_buff, identifier.getPath().c_str(), "", CSV_EXT,
1305
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1306
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1487
if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
1488
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1489
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1309
internal::my_close(create_file, MYF(0));
1311
session.getMessageCache().storeTableMessage(identifier, create_proto);
1317
DRIZZLE_DECLARE_PLUGIN
1492
my_close(create_file, MYF(0));
1497
int ha_tina::check(Session* session, HA_CHECK_OPT *)
1501
const char *old_proc_info;
1502
ha_rows count= share->rows_recorded;
1504
old_proc_info= get_session_proc_info(session);
1505
set_session_proc_info(session, "Checking table");
1506
if (!(buf= (unsigned char*) malloc(table->s->reclength)))
1507
return(HA_ERR_OUT_OF_MEM);
1509
/* position buffer to the start of the file */
1510
if (init_data_file())
1511
return(HA_ERR_CRASHED);
1514
Local_saved_data_file_length is initialized during the lock phase.
1515
Check does not use store_lock in certain cases. So, we set it
1518
local_saved_data_file_length= share->saved_data_file_length;
1519
/* set current position to the beginning of the file */
1520
current_position= next_position= 0;
1522
init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
1524
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1525
while (!(rc= find_current_row(buf)))
1527
session_inc_row_count(session);
1529
current_position= next_position;
1532
free_root(&blobroot, MYF(0));
1535
set_session_proc_info(session, old_proc_info);
1537
if ((rc != HA_ERR_END_OF_FILE) || count)
1539
share->crashed= true;
1540
return(HA_ADMIN_CORRUPT);
1543
return(HA_ADMIN_OK);
1547
drizzle_declare_plugin(csv)
1322
1551
"Brian Aker, MySQL AB",
1323
1552
"CSV storage engine",
1324
1553
PLUGIN_LICENSE_GPL,
1325
1554
tina_init_func, /* Plugin Init */
1555
tina_done_func, /* Plugin Deinit */
1556
NULL, /* status variables */
1557
NULL, /* system variables */
1327
1558
NULL /* config options */
1329
DRIZZLE_DECLARE_PLUGIN_END;
1560
drizzle_declare_plugin_end;