12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
Make sure to look at ha_tina.h for more details.
44
44
#include <drizzled/field.h>
45
45
#include <drizzled/field/blob.h>
46
#include <drizzled/field/timestamp.h>
46
47
#include <drizzled/error.h>
47
48
#include <drizzled/table.h>
48
49
#include <drizzled/session.h>
49
#include <drizzled/internal/my_sys.h>
50
#include "drizzled/internal/my_sys.h"
51
52
#include "ha_tina.h"
64
63
unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
66
static const int META_BUFFER_SIZE = sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t)
67
+ sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char);
68
static const int TINA_CHECK_HEADER = 254; // The number we use to determine corruption
69
static const int BLOB_MEMROOT_ALLOC_SIZE = 8192;
65
/*
  Sum of the sizes of: unsigned char + unsigned char + four uint64_t +
  unsigned char. The whole expression is parenthesized so the macro behaves
  as a single operand wherever it is expanded (e.g. META_BUFFER_SIZE * 2).
*/
#define META_BUFFER_SIZE (sizeof(unsigned char) + sizeof(unsigned char) + sizeof(uint64_t) \
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(unsigned char))
67
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
68
#define BLOB_MEMROOT_ALLOC_SIZE 8192
71
70
/* The file extension */
72
static const char* CSV_EXT = ".CSV"; // The data file
73
static const char* CSN_EXT = ".CSN"; // Files used during repair and update
74
static const char* CSM_EXT = ".CSM"; // Meta file
71
#define CSV_EXT ".CSV" // The data file
72
#define CSN_EXT ".CSN" // Files used during repair and update
73
#define CSM_EXT ".CSM" // Meta file
77
76
static int read_meta_file(int meta_file, ha_rows *rows);
78
77
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
79
void tina_get_status(void* param, int concurrent_insert);
80
void tina_update_status(void* param);
81
bool tina_check_status(void* param);
80
83
/* Stuff for shares */
81
84
pthread_mutex_t tina_mutex;
85
88
*****************************************************************************/
91
Used for sorting chains with qsort().
93
static int sort_set (tina_set *a, tina_set *b)
96
We assume that intervals do not intersect. So, it is enough to compare
97
any two points. Here we take start of intervals for comparison.
99
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
88
104
If frm_error() is called in table.cc this is called to find out what file
89
105
extensions exist for this Cursor.
111
129
pthread_mutex_destroy(&tina_mutex);
114
virtual Cursor *create(Table &table)
132
virtual Cursor *create(TableShare &table,
133
drizzled::memory::Root *mem_root)
116
return new ha_tina(*this, table);
135
return new (mem_root) ha_tina(*this, table);
119
138
const char **bas_ext() const {
120
139
return ha_tina_exts;
123
int doCreateTable(Session &,
125
const drizzled::identifier::Table &identifier,
142
int doCreateTable(Session *,
144
drizzled::TableIdentifier &identifier,
126
145
drizzled::message::Table&);
128
147
int doGetTableDefinition(Session& session,
129
const drizzled::identifier::Table &identifier,
148
TableIdentifier &identifier,
130
149
drizzled::message::Table &table_message);
132
int doDropTable(Session&, const drizzled::identifier::Table &identifier);
151
/* Temp only engine, so do not return values. */
152
/* Intentionally a no-op: the arguments are ignored and nothing is added to the output set. */
void doGetTableNames(drizzled::CachedDirectory &, string& , set<string>&)
{
}
154
int doDropTable(Session&, TableIdentifier &identifier);
133
155
TinaShare *findOpenTable(const string table_name);
134
156
void addOpenTable(const string &table_name, TinaShare *);
135
157
void deleteOpenTable(const string &table_name);
138
160
/* Maximum number of keys reported by this engine: always zero. */
uint32_t max_keys() const
{
  return 0;
}
139
161
/* Maximum number of key parts reported by this engine: always zero. */
uint32_t max_key_parts() const
{
  return 0;
}
140
162
/* Maximum key length reported by this engine: always zero. */
uint32_t max_key_length() const
{
  return 0;
}
141
bool doDoesTableExist(Session& session, const drizzled::identifier::Table &identifier);
142
int doRenameTable(Session&, const drizzled::identifier::Table &from, const drizzled::identifier::Table &to);
144
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
145
const drizzled::identifier::Schema &schema_identifier,
146
drizzled::identifier::Table::vector &set_of_identifiers);
163
bool doDoesTableExist(Session& session, TableIdentifier &identifier);
164
int doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to);
149
void Tina::doGetTableIdentifiers(drizzled::CachedDirectory&,
150
const drizzled::identifier::Schema&,
151
drizzled::identifier::Table::vector&)
155
168
int Tina::doRenameTable(Session &session,
156
const drizzled::identifier::Table &from, const drizzled::identifier::Table &to)
169
TableIdentifier &from, TableIdentifier &to)
159
172
for (const char **ext= bas_ext(); *ext ; ext++)
169
session.getMessageCache().renameTableMessage(from, to);
182
session.renameTableMessage(from, to);
174
bool Tina::doDoesTableExist(Session &session, const drizzled::identifier::Table &identifier)
187
bool Tina::doDoesTableExist(Session &session, TableIdentifier &identifier)
176
return session.getMessageCache().doesTableMessageExist(identifier);
189
return session.doesTableMessageExist(identifier);
180
193
int Tina::doDropTable(Session &session,
181
const drizzled::identifier::Table &identifier)
194
TableIdentifier &identifier)
184
197
int enoent_or_zero= ENOENT; // Error if no file was deleted
198
char buff[FN_REFLEN];
186
200
for (const char **ext= bas_ext(); *ext ; ext++)
188
std::string full_name= identifier.getPath();
189
full_name.append(*ext);
191
if (internal::my_delete_with_symlink(full_name.c_str(), MYF(0)))
202
internal::fn_format(buff, identifier.getPath().c_str(), "", *ext,
203
MY_UNPACK_FILENAME|MY_APPEND_EXT);
204
if (internal::my_delete_with_symlink(buff, MYF(0)))
193
206
if ((error= errno) != ENOENT)
198
210
enoent_or_zero= 0; // No error for ENOENT
200
211
error= enoent_or_zero;
203
session.getMessageCache().removeTableMessage(identifier);
214
session.removeTableMessage(identifier);
230
241
int Tina::doGetTableDefinition(Session &session,
231
const drizzled::identifier::Table &identifier,
242
drizzled::TableIdentifier &identifier,
232
243
drizzled::message::Table &table_message)
234
if (session.getMessageCache().getTableMessage(identifier, table_message))
245
if (session.getTableMessage(identifier, table_message))
255
TinaShare::TinaShare(const std::string &table_name_arg) :
256
table_name(table_name_arg),
257
data_file_name(table_name_arg),
259
saved_data_file_length(0),
260
update_file_opened(false),
261
tina_write_opened(false),
266
TinaShare::TinaShare(const char *table_name_arg)
267
: table_name(table_name_arg), use_count(0), saved_data_file_length(0),
268
update_file_opened(false), tina_write_opened(false),
269
crashed(false), rows_recorded(0), data_file_version(0)
266
data_file_name.append(CSV_EXT);
271
thr_lock_init(&lock);
272
internal::fn_format(data_file_name, table_name_arg, "", CSV_EXT,
273
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
269
276
TinaShare::~TinaShare()
278
thr_lock_delete(&lock);
271
279
pthread_mutex_destroy(&mutex);
275
283
Simple lock controls.
277
TinaShare *ha_tina::get_share(const std::string &table_name)
285
TinaShare *ha_tina::get_share(const char *table_name)
279
287
pthread_mutex_lock(&tina_mutex);
281
Tina *a_tina= static_cast<Tina *>(getEngine());
289
Tina *a_tina= static_cast<Tina *>(engine);
282
290
share= a_tina->findOpenTable(table_name);
284
std::string meta_file_name;
292
char meta_file_name[FN_REFLEN];
285
293
struct stat file_stat;
301
meta_file_name.assign(table_name);
302
meta_file_name.append(CSM_EXT);
309
internal::fn_format(meta_file_name, table_name, "", CSM_EXT,
310
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
304
if (stat(share->data_file_name.c_str(), &file_stat))
312
if (stat(share->data_file_name, &file_stat))
306
314
pthread_mutex_unlock(&tina_mutex);
320
328
Usually this will result in auto-repair, and we will get a good
321
329
meta-file in the end.
323
if ((share->meta_file= internal::my_open(meta_file_name.c_str(),
331
if ((share->meta_file= internal::my_open(meta_file_name,
324
332
O_RDWR|O_CREAT, MYF(0))) == -1)
325
333
share->crashed= true;
449
457
(void)write_meta_file(share->meta_file, share->rows_recorded, true);
451
459
if ((share->tina_write_filedes=
452
internal::my_open(share->data_file_name.c_str(), O_RDWR|O_APPEND, MYF(0))) == -1)
460
internal::my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
454
462
share->crashed= true;
532
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
540
ha_tina::ha_tina(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
533
541
:Cursor(engine_arg, table_arg),
535
543
These definitions are found in Cursor.h
536
544
They are probably not completely right.
538
546
current_position(0), next_position(0), local_saved_data_file_length(0),
539
file_buff(0), local_data_file_version(0), records_is_known(0)
547
file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
548
local_data_file_version(0), records_is_known(0)
541
550
/* Set our original buffers from pre-allocated memory */
542
551
buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
543
553
file_buff= new Transparent_file();
645
655
int ha_tina::chain_append()
647
if (chain.size() > 0 && chain.back().second == current_position)
648
chain.back().second= next_position;
657
if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
658
(chain_ptr -1)->end= next_position;
650
chain.push_back(make_pair(current_position, next_position));
661
/* We set up for the next position */
662
if ((off_t)(chain_ptr - chain) == (chain_size -1))
664
off_t location= chain_ptr - chain;
665
chain_size += DEFAULT_CHAIN_LENGTH;
668
if ((chain= (tina_set *) realloc(chain, chain_size)) == NULL)
673
tina_set *ptr= (tina_set *) malloc(chain_size * sizeof(tina_set));
676
memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
680
chain_ptr= chain + location;
682
chain_ptr->begin= current_position;
683
chain_ptr->end= next_position;
675
711
error= HA_ERR_CRASHED_ON_USAGE;
677
memset(buf, 0, getTable()->getShare()->null_bytes);
713
memset(buf, 0, table->s->null_bytes);
679
for (Field **field= getTable()->getFields() ; *field ; field++)
715
for (Field **field=table->field ; *field ; field++)
746
782
/* This masks a bug in the logic for a SELECT * */
747
783
(*field)->setWriteSet();
748
if ((*field)->store_and_check(CHECK_FIELD_WARN, buffer.c_ptr(), buffer.length(), buffer.charset()))
784
if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
753
788
if ((*field)->flags & BLOB_FLAG)
761
796
memcpy(&src, blob->ptr + packlength, sizeof(char*));
764
tgt= (unsigned char*) blobroot.alloc_root(length);
799
tgt= (unsigned char*) alloc_root(&blobroot, length);
765
800
memmove(tgt, src, length);
766
801
memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
815
Three functions below are needed to enable concurrent insert functionality
816
for CSV engine. For more details see mysys/thr_lock.c
819
void tina_get_status(void* param, int)
821
ha_tina *tina= (ha_tina*) param;
825
void tina_update_status(void* param)
827
ha_tina *tina= (ha_tina*) param;
828
tina->update_status();
831
/* this should exist and return 0 for concurrent insert to work */
832
bool tina_check_status(void *)
838
Save the state of the table
844
This function is used to retrieve the file length during the lock
845
phase of concurrent insert. For more details see comment to
846
ha_tina::update_status below.
849
void ha_tina::get_status()
851
local_saved_data_file_length= share->saved_data_file_length;
856
Correct the state of the table. Called by unlock routines
857
before the write lock is released.
863
When we employ concurrent insert lock, we save current length of the file
864
during the lock phase. We do not read further saved value, as we don't
865
want to interfere with undergoing concurrent insert. Writers update file
866
length info during unlock with update_status().
869
For log tables concurrent insert works differently. The reason is that
870
log tables are always opened and locked. And as they do not unlock
871
tables, the file length after writes should be updated in a different
875
void ha_tina::update_status()
877
/* correct local_saved_data_file_length for writers */
878
share->saved_data_file_length= local_saved_data_file_length;
780
883
Open a database file. Keep in mind that tables are caches, so
781
884
this will not be called for every request. Any sort of positions
782
885
that need to be reset should be kept in the ::extra() call.
784
int ha_tina::doOpen(const identifier::Table &identifier, int , uint32_t )
887
int ha_tina::open(const char *name, int, uint32_t)
786
if (not (share= get_share(identifier.getPath().c_str())))
889
if (!(share= get_share(name)))
789
892
if (share->crashed)
795
898
local_data_file_version= share->data_file_version;
796
if ((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
899
if ((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
801
904
so that they could save/update local_saved_data_file_length value
802
905
during locking. This is needed to enable concurrent inserts.
907
thr_lock_data_init(&share->lock, &lock, (void*) this);
804
908
ref_length=sizeof(off_t);
910
share->lock.get_status= tina_get_status;
911
share->lock.update_status= tina_update_status;
912
share->lock.check_status= tina_check_status;
810
919
Close a database file. We remove ourselves from the shared structure.
811
920
If it is empty we destroy it.
822
931
of the file and appends the data. In an error case it really should
823
932
just truncate to the original position (this is not done yet).
825
int ha_tina::doInsertRecord(unsigned char * buf)
934
int ha_tina::write_row(unsigned char * buf)
829
938
if (share->crashed)
830
939
return(HA_ERR_CRASHED_ON_USAGE);
941
ha_statistic_increment(&system_status_var::ha_write_count);
832
943
size= encode_quote(buf);
834
945
if (!share->tina_write_opened)
880
991
This will be called in a table scan right before the previous ::rnd_next()
883
int ha_tina::doUpdateRecord(const unsigned char *, unsigned char * new_data)
994
int ha_tina::update_row(const unsigned char *, unsigned char * new_data)
999
ha_statistic_increment(&system_status_var::ha_update_count);
1001
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1002
table->timestamp_field->set_time();
888
1004
size= encode_quote(new_data);
891
1007
During update we mark each updating record as deleted
892
1008
(see the chain_append()) then write new one to the temporary data file.
893
At the end of the sequence in the doEndTableScan() we append all non-marked
1009
At the end of the sequence in the rnd_end() we append all non-marked
894
1010
records from the data file to the temporary data file then rename it.
895
1011
The temp_file_length is used to calculate new data file length.
920
1036
The table will then be deleted/positioned based on the ORDER (so RANDOM,
923
int ha_tina::doDeleteRecord(const unsigned char *)
1039
int ha_tina::delete_row(const unsigned char *)
1041
ha_statistic_increment(&system_status_var::ha_delete_count);
926
1043
if (chain_append())
957
1074
local_data_file_version= share->data_file_version;
958
1075
if (internal::my_close(data_file, MYF(0)) ||
959
(data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1)
1076
(data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
962
1079
file_buff->init_buff(data_file);
1090
1207
to the given "hole", stored in the buffer. "Valid" here means,
1091
1208
not listed in the chain of deleted records ("holes").
1093
bool ha_tina::get_write_pos(off_t *end_pos, vector< pair<off_t, off_t> >::iterator &closest_hole)
1210
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
1095
if (closest_hole == chain.end()) /* no more chains */
1212
if (closest_hole == chain_ptr) /* no more chains */
1096
1213
*end_pos= file_buff->end();
1098
1215
*end_pos= std::min(file_buff->end(),
1099
closest_hole->first);
1100
return (closest_hole != chain.end()) && (*end_pos == closest_hole->first);
1216
closest_hole->begin);
1217
return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1107
1224
slots to clean up all of the dead space we have collected while
1108
1225
performing deletes/updates.
1110
int ha_tina::doEndTableScan()
1227
int ha_tina::rnd_end()
1229
char updated_fname[FN_REFLEN];
1112
1230
off_t file_buffer_start= 0;
1114
blobroot.free_root(MYF(0));
1232
free_root(&blobroot, MYF(0));
1115
1233
records_is_known= 1;
1117
if (chain.size() > 0)
1235
if ((chain_ptr - chain) > 0)
1119
vector< pair<off_t, off_t> >::iterator ptr= chain.begin();
1237
tina_set *ptr= chain;
1122
1240
Re-read the beginning of a file (as the buffer should point to the
1128
1246
The sort is needed when there were updates/deletes with random orders.
1129
1247
It sorts so that we move the first blocks to the beginning.
1131
sort(chain.begin(), chain.end());
1249
internal::my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1250
(qsort_cmp)sort_set);
1133
1252
off_t write_begin= 0, write_end;
1193
1312
Close the opened file descriptors. Then move the updated file in place
1194
1313
of the old datafile.
1196
std::string rename_file= share->table_name;
1197
rename_file.append(CSN_EXT);
1198
1315
if (internal::my_close(data_file, MYF(0)) ||
1199
internal::my_rename(rename_file.c_str(),
1200
share->data_file_name.c_str(), MYF(0)))
1316
internal::my_rename(internal::fn_format(updated_fname,
1317
share->table_name.c_str(),
1319
MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1320
share->data_file_name, MYF(0)))
1203
1323
/* Open the file again */
1204
if (((data_file= internal::my_open(share->data_file_name.c_str(), O_RDONLY, MYF(0))) == -1))
1324
if (((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1207
1327
As we reopened the data file, increase share->data_file_version
1266
1386
this (the database will call ::open() if it needs to).
1269
int Tina::doCreateTable(Session &session,
1389
int Tina::doCreateTable(Session *session,
1270
1390
Table& table_arg,
1271
const drizzled::identifier::Table &identifier,
1391
drizzled::TableIdentifier &identifier,
1272
1392
drizzled::message::Table &create_proto)
1274
1394
char name_buff[FN_REFLEN];
1280
const drizzled::TableShare::Fields fields(table_arg.getShare()->getFields());
1281
for (drizzled::TableShare::Fields::const_iterator iter= fields.begin();
1282
iter != fields.end();
1400
for (Field **field= table_arg.s->field; *field; field++)
1285
if (not *iter) // Historical legacy for NULL array end.
1288
if ((*iter)->real_maybe_null())
1402
if ((*field)->real_maybe_null())
1290
1404
my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1291
1405
return(HA_ERR_UNSUPPORTED);
1309
1423
internal::my_close(create_file, MYF(0));
1311
session.getMessageCache().storeTableMessage(identifier, create_proto);
1425
session->storeTableMessage(identifier, create_proto);
1323
1437
"CSV storage engine",
1324
1438
PLUGIN_LICENSE_GPL,
1325
1439
tina_init_func, /* Plugin Init */
1440
NULL, /* system variables */
1327
1441
NULL /* config options */
1329
1443
DRIZZLE_DECLARE_PLUGIN_END;