13
13
You should have received a copy of the GNU General Public License
14
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
18
#include "config.h"
20
20
#include "plugin/archive/archive_engine.h"
22
#include <boost/scoped_ptr.hpp>
24
22
using namespace std;
25
23
using namespace drizzled;
134
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory,
137
set<string>& set_of_names)
139
drizzled::CachedDirectory::Entries entries= directory.getEntries();
141
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
142
entry_iter != entries.end(); ++entry_iter)
144
drizzled::CachedDirectory::Entry *entry= *entry_iter;
145
const string *filename= &entry->filename;
147
assert(filename->size());
149
const char *ext= strchr(filename->c_str(), '.');
151
if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
152
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
156
char uname[NAME_LEN + 1];
157
uint32_t file_name_len;
159
file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
160
// TODO: Remove need for memory copy here
161
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
162
set_of_names.insert(uname);
168
int ArchiveEngine::doDropTable(Session&, TableIdentifier &identifier)
136
170
string new_path(identifier.getPath());
150
184
int ArchiveEngine::doGetTableDefinition(Session&,
151
const TableIdentifier &identifier,
185
TableIdentifier &identifier,
152
186
drizzled::message::Table &table_proto)
154
188
struct stat stat_info;
169
boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
203
azio_stream proto_stream;
170
204
char* proto_string;
171
if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
205
if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
206
return HA_ERR_CRASHED_ON_USAGE;
174
proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
208
proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
209
if (proto_string == NULL)
177
azclose(proto_stream.get());
211
azclose(&proto_stream);
181
azread_frm(proto_stream.get(), proto_string);
215
azread_frm(&proto_stream, proto_string);
183
if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
217
if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
218
error= HA_ERR_CRASHED_ON_USAGE;
186
azclose(proto_stream.get());
220
azclose(&proto_stream);
187
221
free(proto_string);
190
/* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
we do not rewrite the table proto (as it's wedged in the file header)
193
table_proto.set_schema(identifier.getSchemaName());
194
table_proto.set_name(identifier.getTableName());
200
228
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
229
TableShare &table_arg)
202
230
:Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
204
232
/* Set our original buffer from pre-allocated memory */
237
265
memset(&archive_write, 0, sizeof(azio_stream)); /* Archive file we are working with */
238
266
table_name.append(name);
239
data_file_name.assign(table_name);
240
data_file_name.append(ARZ);
267
internal::fn_format(data_file_name, table_name.c_str(), "",
268
ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
242
270
We will use this lock for rows.
244
pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
272
pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
247
275
ArchiveShare::~ArchiveShare()
250
pthread_mutex_destroy(&_mutex);
277
thr_lock_delete(&lock);
278
pthread_mutex_destroy(&mutex);
252
280
We need to make sure we don't reset the crashed state.
253
281
If we open a crashed file, wee need to close it as crashed unless
258
286
if (archive_write_open == true)
259
287
(void)azclose(&archive_write);
288
pthread_mutex_destroy(&archive_mutex);
262
291
bool ArchiveShare::prime(uint64_t *auto_increment)
264
boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
293
azio_stream archive_tmp;
267
296
We read the meta file, but do not mark it dirty. Since we are not
269
298
anything but reading... open it for write and we will generate null
270
299
compression writes).
272
if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
301
if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
302
AZ_METHOD_BLOCK)))
276
*auto_increment= archive_tmp->auto_increment + 1;
277
rows_recorded= (ha_rows)archive_tmp->rows;
278
crashed= archive_tmp->dirty;
305
*auto_increment= archive_tmp.auto_increment + 1;
306
rows_recorded= (ha_rows)archive_tmp.rows;
307
crashed= archive_tmp.dirty;
279
308
if (version < global_version)
281
310
version_rows= rows_recorded;
282
311
version= global_version;
284
azclose(archive_tmp.get());
313
azclose(&archive_tmp);
297
326
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
299
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
301
pthread_mutex_lock(&a_engine->mutex());
328
pthread_mutex_lock(&archive_mutex);
330
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
331
share= a_engine->findOpenTable(table_name);
309
337
if (share == NULL)
311
pthread_mutex_unlock(&a_engine->mutex());
339
pthread_mutex_unlock(&archive_mutex);
312
340
*rc= HA_ERR_OUT_OF_MEM;
316
344
if (share->prime(&stats.auto_increment_value) == false)
318
pthread_mutex_unlock(&a_engine->mutex());
346
pthread_mutex_unlock(&archive_mutex);
319
347
*rc= HA_ERR_CRASHED_ON_REPAIR;
325
353
a_engine->addOpenTable(share->table_name, share);
326
thr_lock_init(&share->_lock);
354
thr_lock_init(&share->lock);
328
356
share->use_count++;
330
358
if (share->crashed)
331
359
*rc= HA_ERR_CRASHED_ON_USAGE;
332
pthread_mutex_unlock(&a_engine->mutex());
360
pthread_mutex_unlock(&archive_mutex);
342
370
int ha_archive::free_share()
344
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
346
pthread_mutex_lock(&a_engine->mutex());
372
pthread_mutex_lock(&archive_mutex);
347
373
if (!--share->use_count)
375
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
376
a_engine->deleteOpenTable(share->table_name);
352
pthread_mutex_unlock(&a_engine->mutex());
379
pthread_mutex_unlock(&archive_mutex);
361
388
a gzip file that can be both read and written we keep a writer open
362
389
that is shared amoung all open tables.
364
if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
391
if (!(azopen(&(share->archive_write), share->data_file_name,
365
392
O_RDWR, AZ_METHOD_BLOCK)))
367
394
share->crashed= true;
388
415
az_method method;
390
if (archive_aio_state())
417
switch (archive_aio_state())
420
method= AZ_METHOD_BLOCK;
392
423
method= AZ_METHOD_AIO;
396
426
method= AZ_METHOD_BLOCK;
398
if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
428
if (!(azopen(&archive, share->data_file_name, O_RDONLY,
401
431
share->crashed= true;
414
444
We open the file we will read from.
416
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
446
int ha_archive::open(const char *name, int, uint32_t)
419
share= get_share(identifier.getPath().c_str(), &rc);
449
share= get_share(name, &rc);
422
452
We either fix it ourselves, or we just take it offline
440
record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
442
lock.init(&share->_lock);
470
record_buffer= create_record_buffer(table->getShare()->reclength +
471
ARCHIVE_ROW_HEADER_SIZE);
476
return(HA_ERR_OUT_OF_MEM);
479
thr_lock_data_init(&share->lock, &lock, NULL);
447
// Should never be called
448
int ha_archive::open(const char *, int, uint32_t)
500
530
int ArchiveEngine::doCreateTable(Session &,
501
531
Table& table_arg,
502
const drizzled::TableIdentifier &identifier,
532
drizzled::TableIdentifier &identifier,
503
533
drizzled::message::Table& proto)
535
char name_buff[FN_REFLEN];
506
boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
537
azio_stream create_stream; /* Archive file we are working with */
507
538
uint64_t auto_increment_value;
508
539
string serialized_proto;
522
553
if (!(field->flags & AUTO_INCREMENT_FLAG))
529
std::string named_file= identifier.getPath();
530
named_file.append(ARZ);
562
We reuse name_buff since it is available.
564
internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
565
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
533
if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
568
if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
569
AZ_METHOD_BLOCK) == 0)
537
unlink(named_file.c_str());
539
return(error ? error : -1);
543
proto.SerializeToString(&serialized_proto);
547
unlink(named_file.c_str());
549
return(error ? error : -1);
552
if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
575
proto.SerializeToString(&serialized_proto);
577
if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
578
serialized_proto.length()))
555
unlink(named_file.c_str());
557
return(error ? error : -1);
560
581
if (proto.options().has_comment())
562
583
int write_length;
564
write_length= azwrite_comment(create_stream.get(),
585
write_length= azwrite_comment(&create_stream,
565
586
proto.options().comment().c_str(),
566
587
proto.options().comment().length());
568
589
if (write_length < 0)
571
unlink(named_file.c_str());
573
return(error ? error : -1);
578
597
Yes you need to do this, because the starting value
579
598
for the autoincrement may not be zero.
581
create_stream->auto_increment= auto_increment_value ?
600
create_stream.auto_increment= auto_increment_value ?
582
601
auto_increment_value - 1 : 0;
584
if (azclose(create_stream.get()))
603
if (azclose(&create_stream))
587
unlink(named_file.c_str());
589
return(error ? error : -1);
615
/* Return error number, if we got one */
616
return(error ? error : -1);
603
627
/* We pack the row for writing */
604
628
r_pack_length= pack_row(buf);
606
written= azwrite_row(writer, &record_buffer[0], r_pack_length);
630
written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
631
if (written != r_pack_length)
624
648
uint32_t ha_archive::max_row_length(const unsigned char *)
626
uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
650
uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
651
length+= ARCHIVE_ROW_HEADER_SIZE;
629
653
uint32_t *ptr, *end;
630
for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
654
for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
634
length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
658
length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
646
670
return(HA_ERR_OUT_OF_MEM);
648
672
/* Copy null bits */
649
memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
673
memcpy(record_buffer->buffer, record, table->getShare()->null_bytes);
674
ptr= record_buffer->buffer + table->getShare()->null_bytes;
652
for (Field **field=getTable()->getFields() ; *field ; field++)
676
for (Field **field=table->field ; *field ; field++)
654
678
if (!((*field)->is_null()))
655
679
ptr= (*field)->pack(ptr, record + (*field)->offset(record));
658
return((unsigned int) (ptr - &record_buffer[0]));
682
return((unsigned int) (ptr - record_buffer->buffer));
674
698
unsigned char *read_buf= NULL;
675
699
uint64_t temp_auto;
676
unsigned char *record= getTable()->getInsertRecord();
700
unsigned char *record= table->record[0];
678
702
if (share->crashed)
679
703
return(HA_ERR_CRASHED_ON_USAGE);
681
pthread_mutex_lock(&share->mutex());
705
ha_statistic_increment(&system_status_var::ha_write_count);
706
pthread_mutex_lock(&share->mutex);
683
708
if (share->archive_write_open == false)
684
709
if (init_archive_writer())
685
710
return(HA_ERR_CRASHED_ON_USAGE);
688
if (getTable()->next_number_field && record == getTable()->getInsertRecord())
713
if (table->next_number_field && record == table->record[0])
690
715
update_auto_increment();
691
temp_auto= getTable()->next_number_field->val_int();
716
temp_auto= table->next_number_field->val_int();
694
719
We don't support decremening auto_increment. They make the performance
697
722
if (temp_auto <= share->archive_write.auto_increment &&
698
getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
723
table->getShare()->getKeyInfo(0).flags & HA_NOSAME)
700
725
rc= HA_ERR_FOUND_DUPP_KEY;
715
740
share->rows_recorded++;
716
741
rc= real_write_row(buf, &(share->archive_write));
718
pthread_mutex_unlock(&share->mutex());
743
pthread_mutex_unlock(&share->mutex);
720
745
free((unsigned char*) read_buf);
832
857
/* Reallocate buffer if needed */
833
858
bool ha_archive::fix_rec_buff(unsigned int length)
835
record_buffer.resize(length);
860
assert(record_buffer->buffer);
862
if (length > record_buffer->length)
864
unsigned char *newptr;
865
if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
867
record_buffer->buffer= newptr;
868
record_buffer->length= length;
871
assert(length <= record_buffer->length);
840
876
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
854
890
/* Copy null bits */
855
memcpy(record, ptr, getTable()->getNullBytes());
856
ptr+= getTable()->getNullBytes();
857
for (Field **field= getTable()->getFields() ; *field ; field++)
891
memcpy(record, ptr, table->getNullBytes());
892
ptr+= table->getNullBytes();
893
for (Field **field=table->field ; *field ; field++)
859
895
if (!((*field)->is_null()))
861
ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
897
ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
893
929
current_position= aztell(&archive);
894
930
rc= get_row(&archive, buf);
896
getTable()->status=rc ? STATUS_NOT_FOUND: 0;
932
table->status=rc ? STATUS_NOT_FOUND: 0;
971
1008
azread_frm(&archive, proto_string);
973
1010
/* Lets create a file to contain the new data */
974
std::string writer_filename= share->table_name;
975
writer_filename.append(ARN);
1011
internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
1012
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
977
if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
1014
if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
979
1016
free(proto_string);
980
1017
return(HA_ERR_CRASHED_ON_USAGE);
983
azwrite_frm(writer.get(), proto_string, archive.frm_length);
1020
azwrite_frm(&writer, proto_string, archive.frm_length);
986
1023
An extended rebuild is a lot more effort. We open up each row and re-record it.
1012
1050
rows_restored= archive.rows;
1014
for (uint64_t x= 0; x < rows_restored ; x++)
1052
for (x= 0; x < rows_restored ; x++)
1016
rc= get_row(&archive, getTable()->getInsertRecord());
1054
rc= get_row(&archive, table->record[0]);
1021
real_write_row(getTable()->getInsertRecord(), writer.get());
1059
real_write_row(table->record[0], &writer);
1023
1061
Long term it should be possible to optimize this so that
1024
1062
it is not called on each row.
1026
if (getTable()->found_next_number_field)
1064
if (table->found_next_number_field)
1028
Field *field= getTable()->found_next_number_field;
1066
Field *field= table->found_next_number_field;
1030
1068
/* Since we will need to use field to translate, we need to flip its read bit */
1031
1069
field->setReadSet();
1033
1071
uint64_t auto_value=
1034
(uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
field->offset(getTable()->getInsertRecord()));
1072
(uint64_t) field->val_int(table->record[0] +
1073
field->offset(table->record[0]));
1036
1074
if (share->archive_write.auto_increment < auto_value)
1037
1075
stats.auto_increment_value=
1038
1076
(share->archive_write.auto_increment= auto_value) + 1;
1041
share->rows_recorded= (ha_rows)writer->rows;
1079
share->rows_recorded= (ha_rows)writer.rows;
1044
1082
if (rc && rc != HA_ERR_END_OF_FILE)
1050
azclose(writer.get());
1051
1089
share->dirty= false;
1053
1091
azclose(&archive);
1055
1093
// make the file we just wrote be our data file
1056
rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1094
rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1058
1096
free(proto_string);
1061
1099
free(proto_string);
1062
azclose(writer.get());
1115
1153
If dirty, we lock, and then reset/flush the data.
1116
1154
I found that just calling azflush() doesn't always work.
1118
pthread_mutex_lock(&share->mutex());
1156
pthread_mutex_lock(&share->mutex);
1119
1157
if (share->dirty == true)
1121
1159
azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1172
cause the number to be inaccurate.
1136
1174
stats.records= share->rows_recorded;
1137
pthread_mutex_unlock(&share->mutex());
1175
pthread_mutex_unlock(&share->mutex);
1139
1177
scan_rows= stats.records;
1140
1178
stats.deleted= 0;
1145
1183
struct stat file_stat; // Stat information for the data file
1147
stat(share->data_file_name.c_str(), &file_stat);
1185
stat(share->data_file_name, &file_stat);
1149
stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
1187
stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1188
stats.data_file_length= file_stat.st_size;
1151
1189
stats.create_time= file_stat.st_ctime;
1152
1190
stats.update_time= file_stat.st_mtime;
1158
1196
if (flag & HA_STATUS_AUTO)
1160
1198
init_archive_reader();
1161
pthread_mutex_lock(&share->mutex());
1199
pthread_mutex_lock(&share->mutex);
1162
1200
azflush(&archive, Z_SYNC_FLUSH);
1163
pthread_mutex_unlock(&share->mutex());
1201
pthread_mutex_unlock(&share->mutex);
1164
1202
stats.auto_increment_value= archive.auto_increment + 1;
1213
1251
const char *old_proc_info;
1215
1254
old_proc_info= get_session_proc_info(session);
1216
1255
set_session_proc_info(session, "Checking table");
1217
1256
/* Flush any waiting data */
1218
pthread_mutex_lock(&share->mutex());
1257
pthread_mutex_lock(&share->mutex);
1219
1258
azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
pthread_mutex_unlock(&share->mutex());
1259
pthread_mutex_unlock(&share->mutex);
1223
1262
Now we will rewind the archive file so that we are positioned at the
1226
1265
init_archive_reader();
1227
1266
azflush(&archive, Z_SYNC_FLUSH);
1228
1267
read_data_header(&archive);
1229
for (uint64_t x= 0; x < share->archive_write.rows; x++)
1268
for (x= 0; x < share->archive_write.rows; x++)
1231
rc= get_row(&archive, getTable()->getInsertRecord());
1270
rc= get_row(&archive, table->record[0]);
1250
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1289
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1291
archive_record_buffer *r;
1292
if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1296
r->length= (int)length;
1298
if (!(r->buffer= (unsigned char*) malloc(r->length)))
1307
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1309
free((char*) r->buffer);
1314
int ArchiveEngine::doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to)
1267
1331
bool ArchiveEngine::doDoesTableExist(Session&,
1268
const TableIdentifier &identifier)
1332
TableIdentifier &identifier)
1270
1334
string proto_path(identifier.getPath());
1271
1335
proto_path.append(ARZ);
1281
1345
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
const drizzled::SchemaIdentifier &schema_identifier,
1283
drizzled::TableIdentifier::vector &set_of_identifiers)
1346
drizzled::SchemaIdentifier &schema_identifier,
1347
drizzled::TableIdentifiers &set_of_identifiers)
1285
1349
drizzled::CachedDirectory::Entries entries= directory.getEntries();
1302
1366
char uname[NAME_LEN + 1];
1303
1367
uint32_t file_name_len;
1305
file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1369
file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
1370
// TODO: Remove need for memory copy here
1307
1371
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL