13
13
You should have received a copy of the GNU General Public License
14
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
18
#include "config.h"
20
20
#include "plugin/archive/archive_engine.h"
22
#include <boost/scoped_ptr.hpp>
24
22
using namespace std;
25
23
using namespace drizzled;
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory,
136
const SchemaIdentifier&,
137
set<string>& set_of_names)
139
drizzled::CachedDirectory::Entries entries= directory.getEntries();
141
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
142
entry_iter != entries.end(); ++entry_iter)
144
drizzled::CachedDirectory::Entry *entry= *entry_iter;
145
const string *filename= &entry->filename;
147
assert(filename->size());
149
const char *ext= strchr(filename->c_str(), '.');
151
if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
152
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
156
char uname[NAME_LEN + 1];
157
uint32_t file_name_len;
159
file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
160
// TODO: Remove need for memory copy here
161
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
162
set_of_names.insert(uname);
134
168
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
136
170
string new_path(identifier.getPath());
169
boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
203
azio_stream proto_stream;
170
204
char* proto_string;
171
if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
205
if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
206
return HA_ERR_CRASHED_ON_USAGE;
174
proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
208
proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
209
if (proto_string == NULL)
177
azclose(proto_stream.get());
211
azclose(&proto_stream);
181
azread_frm(proto_stream.get(), proto_string);
215
azread_frm(&proto_stream, proto_string);
183
if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
217
if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
218
error= HA_ERR_CRASHED_ON_USAGE;
186
azclose(proto_stream.get());
220
azclose(&proto_stream);
187
221
free(proto_string);
200
234
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
235
TableShare &table_arg)
202
236
:Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
204
238
/* Set our original buffer from pre-allocated memory */
237
271
memset(&archive_write, 0, sizeof(azio_stream)); /* Archive file we are working with */
238
272
table_name.append(name);
239
data_file_name.assign(table_name);
240
data_file_name.append(ARZ);
273
internal::fn_format(data_file_name, table_name.c_str(), "",
274
ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
242
276
We will use this lock for rows.
244
pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
278
pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
247
281
ArchiveShare::~ArchiveShare()
250
pthread_mutex_destroy(&_mutex);
283
thr_lock_delete(&lock);
284
pthread_mutex_destroy(&mutex);
252
286
We need to make sure we don't reset the crashed state.
253
287
If we open a crashed file, we need to close it as crashed unless
258
292
if (archive_write_open == true)
259
293
(void)azclose(&archive_write);
294
pthread_mutex_destroy(&archive_mutex);
262
297
bool ArchiveShare::prime(uint64_t *auto_increment)
264
boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
299
azio_stream archive_tmp;
267
302
We read the meta file, but do not mark it dirty. Since we are not
269
304
anything but reading... open it for write and we will generate null
270
305
compression writes).
272
if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
307
if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
308
AZ_METHOD_BLOCK)))
276
*auto_increment= archive_tmp->auto_increment + 1;
277
rows_recorded= (ha_rows)archive_tmp->rows;
278
crashed= archive_tmp->dirty;
311
*auto_increment= archive_tmp.auto_increment + 1;
312
rows_recorded= (ha_rows)archive_tmp.rows;
313
crashed= archive_tmp.dirty;
279
314
if (version < global_version)
281
316
version_rows= rows_recorded;
282
317
version= global_version;
284
azclose(archive_tmp.get());
319
azclose(&archive_tmp);
297
332
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
299
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
301
pthread_mutex_lock(&a_engine->mutex());
334
pthread_mutex_lock(&archive_mutex);
336
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
337
share= a_engine->findOpenTable(table_name);
309
343
if (share == NULL)
311
pthread_mutex_unlock(&a_engine->mutex());
345
pthread_mutex_unlock(&archive_mutex);
312
346
*rc= HA_ERR_OUT_OF_MEM;
316
350
if (share->prime(&stats.auto_increment_value) == false)
318
pthread_mutex_unlock(&a_engine->mutex());
352
pthread_mutex_unlock(&archive_mutex);
319
353
*rc= HA_ERR_CRASHED_ON_REPAIR;
325
359
a_engine->addOpenTable(share->table_name, share);
326
thr_lock_init(&share->_lock);
360
thr_lock_init(&share->lock);
328
362
share->use_count++;
330
364
if (share->crashed)
331
365
*rc= HA_ERR_CRASHED_ON_USAGE;
332
pthread_mutex_unlock(&a_engine->mutex());
366
pthread_mutex_unlock(&archive_mutex);
342
376
int ha_archive::free_share()
344
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
346
pthread_mutex_lock(&a_engine->mutex());
378
pthread_mutex_lock(&archive_mutex);
347
379
if (!--share->use_count)
381
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
382
a_engine->deleteOpenTable(share->table_name);
352
pthread_mutex_unlock(&a_engine->mutex());
385
pthread_mutex_unlock(&archive_mutex);
361
394
a gzip file that can be both read and written we keep a writer open
362
395
that is shared among all open tables.
364
if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
397
if (!(azopen(&(share->archive_write), share->data_file_name,
365
398
O_RDWR, AZ_METHOD_BLOCK)))
367
400
share->crashed= true;
388
421
az_method method;
390
if (archive_aio_state())
423
switch (archive_aio_state())
426
method= AZ_METHOD_BLOCK;
392
429
method= AZ_METHOD_AIO;
396
432
method= AZ_METHOD_BLOCK;
398
if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
434
if (!(azopen(&archive, share->data_file_name, O_RDONLY,
401
437
share->crashed= true;
440
record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
442
lock.init(&share->_lock);
476
record_buffer= create_record_buffer(table->getShare()->getRecordLength() +
477
ARCHIVE_ROW_HEADER_SIZE);
482
return(HA_ERR_OUT_OF_MEM);
485
thr_lock_data_init(&share->lock, &lock, NULL);
502
545
const drizzled::TableIdentifier &identifier,
503
546
drizzled::message::Table& proto)
548
char name_buff[FN_REFLEN];
506
boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
550
azio_stream create_stream; /* Archive file we are working with */
507
551
uint64_t auto_increment_value;
508
552
string serialized_proto;
522
566
if (!(field->flags & AUTO_INCREMENT_FLAG))
529
std::string named_file= identifier.getPath();
530
named_file.append(ARZ);
575
We reuse name_buff since it is available.
577
internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
578
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
533
if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
581
if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
582
AZ_METHOD_BLOCK) == 0)
537
unlink(named_file.c_str());
539
return(error ? error : -1);
547
unlink(named_file.c_str());
549
return(error ? error : -1);
552
if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
596
if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
597
serialized_proto.length()))
555
unlink(named_file.c_str());
557
return(error ? error : -1);
560
602
if (proto.options().has_comment())
562
604
int write_length;
564
write_length= azwrite_comment(create_stream.get(),
606
write_length= azwrite_comment(&create_stream,
565
607
proto.options().comment().c_str(),
566
608
proto.options().comment().length());
568
610
if (write_length < 0)
571
unlink(named_file.c_str());
573
return(error ? error : -1);
578
618
Yes you need to do this, because the starting value
579
619
for the autoincrement may not be zero.
581
create_stream->auto_increment= auto_increment_value ?
621
create_stream.auto_increment= auto_increment_value ?
582
622
auto_increment_value - 1 : 0;
584
if (azclose(create_stream.get()))
624
if (azclose(&create_stream))
587
unlink(named_file.c_str());
589
return(error ? error : -1);
636
/* Return error number, if we got one */
637
return(error ? error : -1);
603
648
/* We pack the row for writing */
604
649
r_pack_length= pack_row(buf);
606
written= azwrite_row(writer, &record_buffer[0], r_pack_length);
651
written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
652
if (written != r_pack_length)
624
669
uint32_t ha_archive::max_row_length(const unsigned char *)
626
uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
671
uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
672
length+= ARCHIVE_ROW_HEADER_SIZE;
629
674
uint32_t *ptr, *end;
630
for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
675
for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
634
length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
679
length += 2 + ((Field_blob*)table->getField(*ptr))->get_length();
646
691
return(HA_ERR_OUT_OF_MEM);
648
693
/* Copy null bits */
649
memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
694
memcpy(record_buffer->buffer, record, table->getShare()->null_bytes);
695
ptr= record_buffer->buffer + table->getShare()->null_bytes;
652
for (Field **field=getTable()->getFields() ; *field ; field++)
697
for (Field **field=table->getFields() ; *field ; field++)
654
699
if (!((*field)->is_null()))
655
700
ptr= (*field)->pack(ptr, record + (*field)->offset(record));
658
return((unsigned int) (ptr - &record_buffer[0]));
703
return((unsigned int) (ptr - record_buffer->buffer));
674
719
unsigned char *read_buf= NULL;
675
720
uint64_t temp_auto;
676
unsigned char *record= getTable()->getInsertRecord();
721
unsigned char *record= table->record[0];
678
723
if (share->crashed)
679
724
return(HA_ERR_CRASHED_ON_USAGE);
681
pthread_mutex_lock(&share->mutex());
726
pthread_mutex_lock(&share->mutex);
683
728
if (share->archive_write_open == false)
684
729
if (init_archive_writer())
685
730
return(HA_ERR_CRASHED_ON_USAGE);
688
if (getTable()->next_number_field && record == getTable()->getInsertRecord())
733
if (table->next_number_field && record == table->record[0])
690
735
update_auto_increment();
691
temp_auto= getTable()->next_number_field->val_int();
736
temp_auto= table->next_number_field->val_int();
694
739
We don't support decrementing auto_increment. They make the performance
697
742
if (temp_auto <= share->archive_write.auto_increment &&
698
getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
743
table->getShare()->getKeyInfo(0).flags & HA_NOSAME)
700
745
rc= HA_ERR_FOUND_DUPP_KEY;
715
760
share->rows_recorded++;
716
761
rc= real_write_row(buf, &(share->archive_write));
718
pthread_mutex_unlock(&share->mutex());
763
pthread_mutex_unlock(&share->mutex);
720
765
free((unsigned char*) read_buf);
832
877
/* Reallocate buffer if needed */
833
878
bool ha_archive::fix_rec_buff(unsigned int length)
835
record_buffer.resize(length);
880
assert(record_buffer->buffer);
882
if (length > record_buffer->length)
884
unsigned char *newptr;
885
if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
887
record_buffer->buffer= newptr;
888
record_buffer->length= length;
891
assert(length <= record_buffer->length);
840
896
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
854
910
/* Copy null bits */
855
memcpy(record, ptr, getTable()->getNullBytes());
856
ptr+= getTable()->getNullBytes();
857
for (Field **field= getTable()->getFields() ; *field ; field++)
911
memcpy(record, ptr, table->getNullBytes());
912
ptr+= table->getNullBytes();
913
for (Field **field= table->getFields() ; *field ; field++)
859
915
if (!((*field)->is_null()))
861
ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
917
ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
893
949
current_position= aztell(&archive);
894
950
rc= get_row(&archive, buf);
896
getTable()->status=rc ? STATUS_NOT_FOUND: 0;
952
table->status=rc ? STATUS_NOT_FOUND: 0;
971
1028
azread_frm(&archive, proto_string);
973
1030
/* Let's create a file to contain the new data */
974
std::string writer_filename= share->table_name;
975
writer_filename.append(ARN);
1031
internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
1032
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
977
if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
1034
if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
979
1036
free(proto_string);
980
1037
return(HA_ERR_CRASHED_ON_USAGE);
983
azwrite_frm(writer.get(), proto_string, archive.frm_length);
1040
azwrite_frm(&writer, proto_string, archive.frm_length);
986
1043
An extended rebuild is a lot more effort. We open up each row and re-record it.
1012
1070
rows_restored= archive.rows;
1014
for (uint64_t x= 0; x < rows_restored ; x++)
1072
for (x= 0; x < rows_restored ; x++)
1016
rc= get_row(&archive, getTable()->getInsertRecord());
1074
rc= get_row(&archive, table->record[0]);
1021
real_write_row(getTable()->getInsertRecord(), writer.get());
1079
real_write_row(table->record[0], &writer);
1023
1081
Long term it should be possible to optimize this so that
1024
1082
it is not called on each row.
1026
if (getTable()->found_next_number_field)
1084
if (table->found_next_number_field)
1028
Field *field= getTable()->found_next_number_field;
1086
Field *field= table->found_next_number_field;
1030
1088
/* Since we will need to use field to translate, we need to flip its read bit */
1031
1089
field->setReadSet();
1033
1091
uint64_t auto_value=
1034
(uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
field->offset(getTable()->getInsertRecord()));
1092
(uint64_t) field->val_int(table->record[0] +
1093
field->offset(table->record[0]));
1036
1094
if (share->archive_write.auto_increment < auto_value)
1037
1095
stats.auto_increment_value=
1038
1096
(share->archive_write.auto_increment= auto_value) + 1;
1041
share->rows_recorded= (ha_rows)writer->rows;
1099
share->rows_recorded= (ha_rows)writer.rows;
1044
1102
if (rc && rc != HA_ERR_END_OF_FILE)
1050
azclose(writer.get());
1051
1109
share->dirty= false;
1053
1111
azclose(&archive);
1055
1113
// make the file we just wrote be our data file
1056
rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1114
rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1058
1116
free(proto_string);
1061
1119
free(proto_string);
1062
azclose(writer.get());
1115
1173
If dirty, we lock, and then reset/flush the data.
1116
1174
I found that just calling azflush() doesn't always work.
1118
pthread_mutex_lock(&share->mutex());
1176
pthread_mutex_lock(&share->mutex);
1119
1177
if (share->dirty == true)
1121
1179
azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1192
cause the number to be inaccurate.
1136
1194
stats.records= share->rows_recorded;
1137
pthread_mutex_unlock(&share->mutex());
1195
pthread_mutex_unlock(&share->mutex);
1139
1197
scan_rows= stats.records;
1140
1198
stats.deleted= 0;
1145
1203
struct stat file_stat; // Stat information for the data file
1147
stat(share->data_file_name.c_str(), &file_stat);
1205
stat(share->data_file_name, &file_stat);
1149
stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
1207
stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1208
stats.data_file_length= file_stat.st_size;
1151
1209
stats.create_time= file_stat.st_ctime;
1152
1210
stats.update_time= file_stat.st_mtime;
1158
1216
if (flag & HA_STATUS_AUTO)
1160
1218
init_archive_reader();
1161
pthread_mutex_lock(&share->mutex());
1219
pthread_mutex_lock(&share->mutex);
1162
1220
azflush(&archive, Z_SYNC_FLUSH);
1163
pthread_mutex_unlock(&share->mutex());
1221
pthread_mutex_unlock(&share->mutex);
1164
1222
stats.auto_increment_value= archive.auto_increment + 1;
1213
1271
const char *old_proc_info;
1215
1274
old_proc_info= get_session_proc_info(session);
1216
1275
set_session_proc_info(session, "Checking table");
1217
1276
/* Flush any waiting data */
1218
pthread_mutex_lock(&share->mutex());
1277
pthread_mutex_lock(&share->mutex);
1219
1278
azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
pthread_mutex_unlock(&share->mutex());
1279
pthread_mutex_unlock(&share->mutex);
1223
1282
Now we will rewind the archive file so that we are positioned at the
1226
1285
init_archive_reader();
1227
1286
azflush(&archive, Z_SYNC_FLUSH);
1228
1287
read_data_header(&archive);
1229
for (uint64_t x= 0; x < share->archive_write.rows; x++)
1288
for (x= 0; x < share->archive_write.rows; x++)
1231
rc= get_row(&archive, getTable()->getInsertRecord());
1290
rc= get_row(&archive, table->record[0]);
1309
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1311
archive_record_buffer *r;
1312
if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1316
r->length= (int)length;
1318
if (!(r->buffer= (unsigned char*) malloc(r->length)))
1327
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1329
free((char*) r->buffer);
1250
1334
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1281
1365
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
1366
const drizzled::SchemaIdentifier &schema_identifier,
1283
drizzled::TableIdentifier::vector &set_of_identifiers)
1367
drizzled::TableIdentifiers &set_of_identifiers)
1285
1369
drizzled::CachedDirectory::Entries entries= directory.getEntries();