43
#include <drizzled/common_includes.h>
43
#include <drizzled/server_includes.h>
44
#include <drizzled/field.h>
45
#include <drizzled/field/blob.h>
46
#include <drizzled/field/timestamp.h>
47
#include <drizzled/error.h>
48
#include <drizzled/table.h>
49
#include <drizzled/session.h>
44
51
#include "ha_tina.h"
57
static const string engine_name("CSV");
48
uchar + uchar + uint64_t + uint64_t + uint64_t + uint64_t + uchar
60
unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
50
#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(uint64_t) \
51
+ sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uchar)
62
#define META_BUFFER_SIZE sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t) \
63
+ sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char)
52
64
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
53
65
#define BLOB_MEMROOT_ALLOC_SIZE 8192
58
70
#define CSM_EXT ".CSM" // Meta file
61
static TINA_SHARE *get_share(const char *table_name, TABLE *table);
73
static TINA_SHARE *get_share(const char *table_name, Table *table);
62
74
static int free_share(TINA_SHARE *share);
63
75
static int read_meta_file(File meta_file, ha_rows *rows);
64
76
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
91
99
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
94
static uchar* tina_get_key(TINA_SHARE *share, size_t *length,
95
bool not_used __attribute__((unused)))
102
static unsigned char* tina_get_key(TINA_SHARE *share, size_t *length, bool)
97
104
*length=share->table_name_length;
98
return (uchar*) share->table_name;
105
return (unsigned char*) share->table_name;
101
static int tina_init_func(void *p)
103
handlerton *tina_hton;
105
tina_hton= (handlerton *)p;
106
VOID(pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST));
110
If frm_error() is called in table.cc this is called to find out what file
111
extensions exist for this handler.
113
static const char *ha_tina_exts[] = {
119
class Tina : public StorageEngine
122
Tina(const string& name_arg)
123
: StorageEngine(name_arg, HTON_CAN_RECREATE | HTON_TEMPORARY_ONLY) {}
124
virtual handler *create(TableShare *table,
127
return new (mem_root) ha_tina(this, table);
130
const char **bas_ext() const {
134
int createTableImpl(Session *, const char *table_name, Table *table_arg,
139
static Tina *tina_engine= NULL;
141
static int tina_init_func(PluginRegistry ®istry)
144
tina_engine= new Tina(engine_name);
145
registry.add(tina_engine);
147
pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST);
107
148
(void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
108
149
(hash_get_key) tina_get_key,0,0);
109
tina_hton->state= SHOW_OPTION_YES;
110
tina_hton->db_type= DB_TYPE_CSV_DB;
111
tina_hton->create= tina_create_handler;
112
tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
117
static int tina_done_func(void *p __attribute__((unused)))
153
static int tina_done_func(PluginRegistry ®istry)
155
registry.remove(tina_engine);
119
158
hash_free(&tina_open_tables);
120
159
pthread_mutex_destroy(&tina_mutex);
127
166
Simple lock controls.
129
static TINA_SHARE *get_share(const char *table_name,
130
TABLE *table __attribute__((unused)))
168
static TINA_SHARE *get_share(const char *table_name, Table *)
132
170
TINA_SHARE *share;
133
171
char meta_file_name[FN_REFLEN];
134
172
struct stat file_stat;
138
176
pthread_mutex_lock(&tina_mutex);
139
177
length=(uint) strlen(table_name);
163
201
share->update_file_opened= false;
164
202
share->tina_write_opened= false;
165
203
share->data_file_version= 0;
166
stpcpy(share->table_name, table_name);
204
strcpy(share->table_name, table_name);
167
205
fn_format(share->data_file_name, table_name, "", CSV_EXT,
168
206
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
169
207
fn_format(meta_file_name, table_name, "", CSM_EXT,
174
212
share->saved_data_file_length= file_stat.st_size;
176
if (my_hash_insert(&tina_open_tables, (uchar*) share))
214
if (my_hash_insert(&tina_open_tables, (unsigned char*) share))
178
216
thr_lock_init(&share->lock);
179
217
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
230
268
static int read_meta_file(File meta_file, ha_rows *rows)
232
uchar meta_buffer[META_BUFFER_SIZE];
233
uchar *ptr= meta_buffer;
270
unsigned char meta_buffer[META_BUFFER_SIZE];
271
unsigned char *ptr= meta_buffer;
235
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
236
if (my_read(meta_file, (uchar*)meta_buffer, META_BUFFER_SIZE, 0)
273
lseek(meta_file, 0, SEEK_SET);
274
if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
237
275
!= META_BUFFER_SIZE)
238
276
return(HA_ERR_CRASHED_ON_USAGE);
283
321
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
285
uchar meta_buffer[META_BUFFER_SIZE];
286
uchar *ptr= meta_buffer;
323
unsigned char meta_buffer[META_BUFFER_SIZE];
324
unsigned char *ptr= meta_buffer;
288
*ptr= (uchar)TINA_CHECK_HEADER;
290
*ptr= (uchar)TINA_VERSION;
326
*ptr= (unsigned char)TINA_CHECK_HEADER;
327
ptr+= sizeof(unsigned char);
328
*ptr= (unsigned char)TINA_VERSION;
329
ptr+= sizeof(unsigned char);
292
330
int8store(ptr, (uint64_t)rows);
293
331
ptr+= sizeof(uint64_t);
294
332
memset(ptr, 0, 3*sizeof(uint64_t));
297
335
We'll need them later.
299
337
ptr+= 3*sizeof(uint64_t);
338
*ptr= (unsigned char)dirty;
302
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
303
if (my_write(meta_file, (uchar *)meta_buffer, META_BUFFER_SIZE, 0)
340
lseek(meta_file, 0, SEEK_SET);
341
if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
304
342
!= META_BUFFER_SIZE)
365
403
share->tina_write_opened= false;
368
hash_delete(&tina_open_tables, (uchar*) share);
406
hash_delete(&tina_open_tables, (unsigned char*) share);
369
407
thr_lock_delete(&share->lock);
370
408
pthread_mutex_destroy(&share->mutex);
371
my_free((uchar*) share, MYF(0));
409
free((unsigned char*) share);
373
411
pthread_mutex_unlock(&tina_mutex);
386
424
'\r''\n' -- DOS\Windows line ending
389
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
390
off_t end, int *eoln_len)
427
static off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
428
off_t end, int *eoln_len)
417
static handler *tina_create_handler(handlerton *hton,
421
return new (mem_root) ha_tina(hton, table);
425
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
426
:handler(hton, table_arg),
456
ha_tina::ha_tina(StorageEngine *engine_arg, TableShare *table_arg)
457
:handler(engine_arg, table_arg),
428
459
These definitions are found in handler.h
429
460
They are probably not completely right.
443
474
Encode a buffer into the quoted format.
446
int ha_tina::encode_quote(uchar *buf __attribute__((unused)))
477
int ha_tina::encode_quote(unsigned char *)
448
479
char attribute_buffer[1024];
449
480
String attribute(attribute_buffer, sizeof(attribute_buffer),
450
481
&my_charset_bin);
452
my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
453
483
buffer.length(0);
455
485
for (Field **field=table->field ; *field ; field++)
546
581
chain_size += DEFAULT_CHAIN_LENGTH;
547
582
if (chain_alloced)
549
/* Must cast since my_malloc unlike malloc doesn't have a void ptr */
550
if ((chain= (tina_set *) my_realloc((uchar*)chain,
551
chain_size, MYF(MY_WME))) == NULL)
584
if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
556
tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
589
tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
558
592
memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
576
int ha_tina::find_current_row(uchar *buf)
610
int ha_tina::find_current_row(unsigned char *buf)
578
612
off_t end_offset, curr_offset= current_position;
580
my_bitmap_map *org_bitmap;
584
616
free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
592
624
local_saved_data_file_length, &eoln_len)) == 0)
593
625
return(HA_ERR_END_OF_FILE);
595
/* We must read all columns in case a table is opened for update */
596
read_all= !bitmap_is_clear_all(table->write_set);
597
/* Avoid asserts in ::store() for columns that are not going to be updated */
598
org_bitmap= dbug_tmp_use_all_columns(table, table->write_set);
599
627
error= HA_ERR_CRASHED_ON_USAGE;
601
629
memset(buf, 0, table->s->null_bytes);
668
if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
696
if ((*field)->isReadSet() || (*field)->isWriteSet())
698
/* This masks a bug in the logic for a SELECT * */
699
(*field)->setWriteSet();
670
700
if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
671
701
CHECK_FIELD_WARN))
673
704
if ((*field)->flags & BLOB_FLAG)
675
706
Field_blob *blob= *(Field_blob**) field;
677
uint length, packlength;
707
unsigned char *src, *tgt;
708
uint32_t length, packlength;
679
710
packlength= blob->pack_length_no_ptr();
680
711
length= blob->get_length(blob->ptr);
681
712
memcpy(&src, blob->ptr + packlength, sizeof(char*));
684
tgt= (uchar*) alloc_root(&blobroot, length);
685
memcpy(tgt, src, length);
715
tgt= (unsigned char*) alloc_root(&blobroot, length);
716
memmove(tgt, src, length);
686
717
memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
695
dbug_tmp_restore_column_map(table->write_set, org_bitmap);
701
If frm_error() is called in table.cc this is called to find out what file
702
extensions exist for this handler.
704
static const char *ha_tina_exts[] = {
710
const char **ha_tina::bas_ext() const
716
731
Three functions below are needed to enable concurrent insert functionality
717
732
for CSV engine. For more details see mysys/thr_lock.c
720
void tina_get_status(void* param,
721
int concurrent_insert __attribute__((unused)))
735
void tina_get_status(void* param, int)
723
737
ha_tina *tina= (ha_tina*) param;
724
738
tina->get_status();
786
800
this will not be called for every request. Any sort of positions
787
801
that need to be reset should be kept in the ::extra() call.
789
int ha_tina::open(const char *name, int mode __attribute__((unused)),
803
int ha_tina::open(const char *name, int, uint32_t open_options)
792
805
if (!(share= get_share(name, table)))
793
return(HA_ERR_OUT_OF_MEM);
795
808
if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
855
865
/* use pwrite, as concurrent reader could have changed the position */
856
if (my_write(share->tina_write_filedes, (uchar*)buffer.ptr(), size,
866
if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
857
867
MYF(MY_WME | MY_NABP)))
911
920
size= encode_quote(new_data);
914
During update we mark each updating record as deleted
915
(see the chain_append()) then write new one to the temporary data file.
923
During update we mark each updating record as deleted
924
(see the chain_append()) then write new one to the temporary data file.
916
925
At the end of the sequence in the rnd_end() we append all non-marked
917
926
records from the data file to the temporary data file then rename it.
918
927
The temp_file_length is used to calculate new data file length.
923
932
if (open_update_temp_file_if_needed())
926
if (my_write(update_temp_file, (uchar*)buffer.ptr(), size,
935
if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
927
936
MYF(MY_WME | MY_NABP)))
929
938
temp_file_length+= size;
965
974
@brief Initialize the data file.
967
976
@details Compare the local version of the data file with the shared one.
968
977
If they differ, there are some changes behind and we have to reopen
969
978
the data file to make the changes visible.
970
Call @c file_buff->init_buff() at the end to read the beginning of the
979
Call @c file_buff->init_buff() at the end to read the beginning of the
971
980
data file into buffer.
974
983
@retval 1 There was an error.
1080
1089
it's just a position. Look at the bdb code if you want to see a case
1081
1090
where something other than a number is stored.
1083
void ha_tina::position(const uchar *record __attribute__((unused)))
1092
void ha_tina::position(const unsigned char *)
1085
1094
my_store_ptr(ref, ref_length, current_position);
1092
1101
my_get_ptr() retrieves the data for you.
1095
int ha_tina::rnd_pos(uchar * buf, uchar *pos)
1104
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1097
1106
ha_statistic_increment(&SSV::ha_read_rnd_count);
1098
1107
current_position= (off_t)my_get_ptr(pos,ref_length);
1104
1113
Currently this table handler doesn't implement most of the fields
1105
1114
really needed. SHOW also makes use of this data
1107
int ha_tina::info(uint flag __attribute__((unused)))
1116
int ha_tina::info(uint32_t)
1109
1118
/* This is a lie, but you don't want the optimizer to see zero or 1 */
1110
if (!records_is_known && stats.records < 2)
1119
if (!records_is_known && stats.records < 2)
1111
1120
stats.records= 2;
1122
1131
if (closest_hole == chain_ptr) /* no more chains */
1123
1132
*end_pos= file_buff->end();
1125
*end_pos= min(file_buff->end(),
1126
closest_hole->begin);
1134
*end_pos= std::min(file_buff->end(),
1135
closest_hole->begin);
1127
1136
return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1171
1180
bool in_hole= get_write_pos(&write_end, ptr);
1172
1181
off_t write_length= write_end - write_begin;
1182
if ((uint64_t)write_length > SIZE_MAX)
1174
1187
/* if there is something to write, write it */
1175
1188
if (write_length)
1177
if (my_write(update_temp_file,
1178
(uchar*) (file_buff->ptr() +
1190
if (my_write(update_temp_file,
1191
(unsigned char*) (file_buff->ptr() +
1179
1192
(write_begin - file_buff->start())),
1180
write_length, MYF_RW))
1193
(size_t)write_length, MYF_RW))
1182
1195
temp_file_length+= write_length;
1228
1241
if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1231
As we reopened the data file, increase share->data_file_version
1232
in order to force other threads waiting on a table lock and
1244
As we reopened the data file, increase share->data_file_version
1245
in order to force other threads waiting on a table lock and
1233
1246
have already opened the table to reopen the data file.
1234
1247
That makes the latest changes become visible to them.
1235
Update local_data_file_version as no need to reopen it in the
1248
Update local_data_file_version as no need to reopen it in the
1236
1249
current thread.
1238
1251
share->data_file_version++;
1243
1256
Here we record this fact to the meta-file.
1245
1258
(void)write_meta_file(share->meta_file, share->rows_recorded, false);
1247
Update local_saved_data_file_length with the real length of the
1260
Update local_saved_data_file_length with the real length of the
1250
1263
local_saved_data_file_length= temp_file_length;
1276
1289
rows (after the first bad one) as well.
1279
int ha_tina::repair(THD* thd,
1280
HA_CHECK_OPT* check_opt __attribute__((unused)))
1292
int ha_tina::repair(Session* session, HA_CHECK_OPT *)
1282
1294
char repaired_fname[FN_REFLEN];
1284
1296
File repair_file;
1286
1298
ha_rows rows_repaired= 0;
1296
1308
/* Don't assert in field::val() functions */
1297
1309
table->use_all_columns();
1298
if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1310
if (!(buf= (unsigned char*) malloc(table->s->reclength)))
1299
1311
return(HA_ERR_OUT_OF_MEM);
1301
1313
/* position buffer to the start of the file */
1316
1328
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1317
1329
while (!(rc= find_current_row(buf)))
1319
thd_inc_row_count(thd);
1331
session_inc_row_count(session);
1320
1332
rows_repaired++;
1321
1333
current_position= next_position;
1324
1336
free_root(&blobroot, MYF(0));
1326
my_free((char*)buf, MYF(0));
1328
1340
if (rc == HA_ERR_END_OF_FILE)
1355
1367
/* write repaired file */
1358
write_end= min(file_buff->end(), current_position);
1359
if ((write_end - write_begin) &&
1360
(my_write(repair_file, (uchar*)file_buff->ptr(),
1361
write_end - write_begin, MYF_RW)))
1370
write_end= std::min(file_buff->end(), current_position);
1372
off_t write_length= write_end - write_begin;
1373
if ((uint64_t)write_length > SIZE_MAX)
1377
if ((write_length) &&
1378
(my_write(repair_file, (unsigned char*)file_buff->ptr(),
1379
(size_t)write_length, MYF_RW)))
1364
1382
write_begin= write_end;
1436
1454
Create a table. You do not want to leave the table open after a call to
1437
1455
this (the database will call ::open() if it needs to).
1440
int ha_tina::create(const char *name, TABLE *table_arg,
1441
HA_CREATE_INFO *create_info __attribute__((unused)))
1458
int Tina::createTableImpl(Session *, const char *table_name, Table *table_arg,
1443
1461
char name_buff[FN_REFLEN];
1444
1462
File create_file;
1454
1472
return(HA_ERR_UNSUPPORTED);
1459
if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
1477
if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
1460
1478
MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1461
1479
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1464
1482
write_meta_file(create_file, 0, false);
1465
1483
my_close(create_file, MYF(0));
1467
if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
1485
if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
1468
1486
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1469
1487
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1477
int ha_tina::check(THD* thd,
1478
HA_CHECK_OPT* check_opt __attribute__((unused)))
1495
int ha_tina::check(Session* session, HA_CHECK_OPT *)
1482
1499
const char *old_proc_info;
1483
1500
ha_rows count= share->rows_recorded;
1485
old_proc_info= thd_proc_info(thd, "Checking table");
1486
if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1502
old_proc_info= get_session_proc_info(session);
1503
set_session_proc_info(session, "Checking table");
1504
if (!(buf= (unsigned char*) malloc(table->s->reclength)))
1487
1505
return(HA_ERR_OUT_OF_MEM);
1489
1507
/* position buffer to the start of the file */
1504
1522
/* Read the file row-by-row. If everything is ok, repair is not needed. */
1505
1523
while (!(rc= find_current_row(buf)))
1507
thd_inc_row_count(thd);
1525
session_inc_row_count(session);
1509
1527
current_position= next_position;
1512
1530
free_root(&blobroot, MYF(0));
1514
my_free((char*)buf, MYF(0));
1515
thd_proc_info(thd, old_proc_info);
1533
set_session_proc_info(session, old_proc_info);
1517
1535
if ((rc != HA_ERR_END_OF_FILE) || count)
1527
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info __attribute__((unused)),
1528
uint table_changes __attribute__((unused)))
1530
return COMPATIBLE_DATA_YES;
1533
mysql_declare_plugin(csv)
1535
DRIZZLE_STORAGE_ENGINE_PLUGIN,
1545
drizzle_declare_plugin(csv)
1538
1549
"Brian Aker, MySQL AB",