13
13
You should have received a copy of the GNU General Public License
14
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
18
#include "config.h"
20
20
#include "plugin/archive/archive_engine.h"
22
#include <boost/scoped_ptr.hpp>
24
22
using namespace std;
25
23
using namespace drizzled;
134
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory,
136
const SchemaIdentifier&,
137
set<string>& set_of_names)
139
drizzled::CachedDirectory::Entries entries= directory.getEntries();
141
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
142
entry_iter != entries.end(); ++entry_iter)
144
drizzled::CachedDirectory::Entry *entry= *entry_iter;
145
const string *filename= &entry->filename;
147
assert(filename->size());
149
const char *ext= strchr(filename->c_str(), '.');
151
if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
152
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
156
char uname[NAME_LEN + 1];
157
uint32_t file_name_len;
159
file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
160
// TODO: Remove need for memory copy here
161
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
162
set_of_names.insert(uname);
168
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
136
170
string new_path(identifier.getPath());
150
184
int ArchiveEngine::doGetTableDefinition(Session&,
151
const identifier::Table &identifier,
185
const TableIdentifier &identifier,
152
186
drizzled::message::Table &table_proto)
154
188
struct stat stat_info;
169
boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
203
azio_stream proto_stream;
170
204
char* proto_string;
171
if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
205
if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
206
return HA_ERR_CRASHED_ON_USAGE;
174
proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
208
proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
209
if (proto_string == NULL)
177
azclose(proto_stream.get());
211
azclose(&proto_stream);
181
azread_frm(proto_stream.get(), proto_string);
215
azread_frm(&proto_stream, proto_string);
183
if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
217
if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
218
error= HA_ERR_CRASHED_ON_USAGE;
186
azclose(proto_stream.get());
220
azclose(&proto_stream);
187
221
free(proto_string);
190
/* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
we do not rewrite the table proto (as it's wedged in the file header)
193
table_proto.set_schema(identifier.getSchemaName());
194
table_proto.set_name(identifier.getTableName());
200
228
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
229
TableShare &table_arg)
202
230
:Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
204
232
/* Set our original buffer from pre-allocated memory */
237
265
memset(&archive_write, 0, sizeof(azio_stream)); /* Archive file we are working with */
238
266
table_name.append(name);
239
data_file_name.assign(table_name);
240
data_file_name.append(ARZ);
267
internal::fn_format(data_file_name, table_name.c_str(), "",
268
ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
242
270
We will use this lock for rows.
244
pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
272
pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
247
275
ArchiveShare::~ArchiveShare()
250
pthread_mutex_destroy(&_mutex);
277
thr_lock_delete(&lock);
278
pthread_mutex_destroy(&mutex);
252
280
We need to make sure we don't reset the crashed state.
253
281
If we open a crashed file, we need to close it as crashed unless
258
286
if (archive_write_open == true)
259
287
(void)azclose(&archive_write);
288
pthread_mutex_destroy(&archive_mutex);
262
291
bool ArchiveShare::prime(uint64_t *auto_increment)
264
boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
293
azio_stream archive_tmp;
267
296
We read the meta file, but do not mark it dirty. Since we are not
269
298
anything but reading... open it for write and we will generate null
270
299
compression writes).
272
if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
301
if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
273
302
AZ_METHOD_BLOCK)))
276
*auto_increment= archive_tmp->auto_increment + 1;
277
rows_recorded= (ha_rows)archive_tmp->rows;
278
crashed= archive_tmp->dirty;
305
*auto_increment= archive_tmp.auto_increment + 1;
306
rows_recorded= (ha_rows)archive_tmp.rows;
307
crashed= archive_tmp.dirty;
279
308
if (version < global_version)
281
310
version_rows= rows_recorded;
282
311
version= global_version;
284
azclose(archive_tmp.get());
313
azclose(&archive_tmp);
297
326
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
299
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
301
pthread_mutex_lock(&a_engine->mutex());
328
pthread_mutex_lock(&archive_mutex);
330
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
303
331
share= a_engine->findOpenTable(table_name);
309
337
if (share == NULL)
311
pthread_mutex_unlock(&a_engine->mutex());
339
pthread_mutex_unlock(&archive_mutex);
312
340
*rc= HA_ERR_OUT_OF_MEM;
316
344
if (share->prime(&stats.auto_increment_value) == false)
318
pthread_mutex_unlock(&a_engine->mutex());
346
pthread_mutex_unlock(&archive_mutex);
319
347
*rc= HA_ERR_CRASHED_ON_REPAIR;
325
353
a_engine->addOpenTable(share->table_name, share);
326
thr_lock_init(&share->_lock);
354
thr_lock_init(&share->lock);
328
356
share->use_count++;
330
358
if (share->crashed)
331
359
*rc= HA_ERR_CRASHED_ON_USAGE;
332
pthread_mutex_unlock(&a_engine->mutex());
360
pthread_mutex_unlock(&archive_mutex);
342
370
int ha_archive::free_share()
344
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
346
pthread_mutex_lock(&a_engine->mutex());
372
pthread_mutex_lock(&archive_mutex);
347
373
if (!--share->use_count)
375
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
349
376
a_engine->deleteOpenTable(share->table_name);
352
pthread_mutex_unlock(&a_engine->mutex());
379
pthread_mutex_unlock(&archive_mutex);
361
388
a gzip file that can be both read and written we keep a writer open
362
389
that is shared among all open tables.
364
if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
391
if (!(azopen(&(share->archive_write), share->data_file_name,
365
392
O_RDWR, AZ_METHOD_BLOCK)))
367
394
share->crashed= true;
388
415
az_method method;
390
if (archive_aio_state())
417
switch (archive_aio_state())
420
method= AZ_METHOD_BLOCK;
392
423
method= AZ_METHOD_AIO;
396
426
method= AZ_METHOD_BLOCK;
398
if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
428
if (!(azopen(&archive, share->data_file_name, O_RDONLY,
401
431
share->crashed= true;
414
444
We open the file we will read from.
416
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
446
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
419
449
share= get_share(identifier.getPath().c_str(), &rc);
440
record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
442
lock.init(&share->_lock);
470
record_buffer= create_record_buffer(table->getShare()->getRecordLength() +
471
ARCHIVE_ROW_HEADER_SIZE);
476
return(HA_ERR_OUT_OF_MEM);
479
thr_lock_data_init(&share->lock, &lock, NULL);
500
537
int ArchiveEngine::doCreateTable(Session &,
501
538
Table& table_arg,
502
const drizzled::identifier::Table &identifier,
539
const drizzled::TableIdentifier &identifier,
503
540
drizzled::message::Table& proto)
542
char name_buff[FN_REFLEN];
506
boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
544
azio_stream create_stream; /* Archive file we are working with */
507
545
uint64_t auto_increment_value;
508
546
string serialized_proto;
522
560
if (!(field->flags & AUTO_INCREMENT_FLAG))
529
std::string named_file= identifier.getPath();
530
named_file.append(ARZ);
569
We reuse name_buff since it is available.
571
internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
572
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
533
if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
575
if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
576
AZ_METHOD_BLOCK) == 0)
537
unlink(named_file.c_str());
539
return(error ? error : -1);
547
unlink(named_file.c_str());
549
return(error ? error : -1);
552
if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
590
if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
591
serialized_proto.length()))
555
unlink(named_file.c_str());
557
return(error ? error : -1);
560
596
if (proto.options().has_comment())
562
598
int write_length;
564
write_length= azwrite_comment(create_stream.get(),
600
write_length= azwrite_comment(&create_stream,
565
601
proto.options().comment().c_str(),
566
602
proto.options().comment().length());
568
604
if (write_length < 0)
571
unlink(named_file.c_str());
573
return(error ? error : -1);
578
612
Yes you need to do this, because the starting value
579
613
for the autoincrement may not be zero.
581
create_stream->auto_increment= auto_increment_value ?
615
create_stream.auto_increment= auto_increment_value ?
582
616
auto_increment_value - 1 : 0;
584
if (azclose(create_stream.get()))
618
if (azclose(&create_stream))
587
unlink(named_file.c_str());
589
return(error ? error : -1);
630
/* Return error number, if we got one */
631
return(error ? error : -1);
603
642
/* We pack the row for writing */
604
643
r_pack_length= pack_row(buf);
606
written= azwrite_row(writer, &record_buffer[0], r_pack_length);
645
written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
607
646
if (written != r_pack_length)
624
663
uint32_t ha_archive::max_row_length(const unsigned char *)
626
uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
665
uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
666
length+= ARCHIVE_ROW_HEADER_SIZE;
629
668
uint32_t *ptr, *end;
630
for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
669
for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
634
length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
673
length += 2 + ((Field_blob*)table->getField(*ptr))->get_length();
646
685
return(HA_ERR_OUT_OF_MEM);
648
687
/* Copy null bits */
649
memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
688
memcpy(record_buffer->buffer, record, table->getShare()->null_bytes);
689
ptr= record_buffer->buffer + table->getShare()->null_bytes;
652
for (Field **field=getTable()->getFields() ; *field ; field++)
691
for (Field **field=table->getFields() ; *field ; field++)
654
693
if (!((*field)->is_null()))
655
694
ptr= (*field)->pack(ptr, record + (*field)->offset(record));
658
return((unsigned int) (ptr - &record_buffer[0]));
697
return((unsigned int) (ptr - record_buffer->buffer));
674
713
unsigned char *read_buf= NULL;
675
714
uint64_t temp_auto;
676
unsigned char *record= getTable()->getInsertRecord();
715
unsigned char *record= table->record[0];
678
717
if (share->crashed)
679
718
return(HA_ERR_CRASHED_ON_USAGE);
681
pthread_mutex_lock(&share->mutex());
720
pthread_mutex_lock(&share->mutex);
683
722
if (share->archive_write_open == false)
684
723
if (init_archive_writer())
685
724
return(HA_ERR_CRASHED_ON_USAGE);
688
if (getTable()->next_number_field && record == getTable()->getInsertRecord())
727
if (table->next_number_field && record == table->record[0])
690
729
update_auto_increment();
691
temp_auto= getTable()->next_number_field->val_int();
730
temp_auto= table->next_number_field->val_int();
694
733
We don't support decrementing auto_increment. They make the performance
697
736
if (temp_auto <= share->archive_write.auto_increment &&
698
getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
737
table->getShare()->getKeyInfo(0).flags & HA_NOSAME)
700
739
rc= HA_ERR_FOUND_DUPP_KEY;
715
754
share->rows_recorded++;
716
755
rc= real_write_row(buf, &(share->archive_write));
718
pthread_mutex_unlock(&share->mutex());
757
pthread_mutex_unlock(&share->mutex);
720
759
free((unsigned char*) read_buf);
832
871
/* Reallocate buffer if needed */
833
872
bool ha_archive::fix_rec_buff(unsigned int length)
835
record_buffer.resize(length);
874
assert(record_buffer->buffer);
876
if (length > record_buffer->length)
878
unsigned char *newptr;
879
if (!(newptr= (unsigned char *)realloc(record_buffer->buffer, length)))
881
record_buffer->buffer= newptr;
882
record_buffer->length= length;
885
assert(length <= record_buffer->length);
840
890
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
854
904
/* Copy null bits */
855
memcpy(record, ptr, getTable()->getNullBytes());
856
ptr+= getTable()->getNullBytes();
857
for (Field **field= getTable()->getFields() ; *field ; field++)
905
memcpy(record, ptr, table->getNullBytes());
906
ptr+= table->getNullBytes();
907
for (Field **field= table->getFields() ; *field ; field++)
859
909
if (!((*field)->is_null()))
861
ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
911
ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
893
943
current_position= aztell(&archive);
894
944
rc= get_row(&archive, buf);
896
getTable()->status=rc ? STATUS_NOT_FOUND: 0;
946
table->status=rc ? STATUS_NOT_FOUND: 0;
971
1022
azread_frm(&archive, proto_string);
973
1024
/* Let's create a file to contain the new data */
974
std::string writer_filename= share->table_name;
975
writer_filename.append(ARN);
1025
internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
1026
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
977
if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
1028
if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
979
1030
free(proto_string);
980
1031
return(HA_ERR_CRASHED_ON_USAGE);
983
azwrite_frm(writer.get(), proto_string, archive.frm_length);
1034
azwrite_frm(&writer, proto_string, archive.frm_length);
986
1037
An extended rebuild is a lot more effort. We open up each row and re-record it.
1012
1064
rows_restored= archive.rows;
1014
for (uint64_t x= 0; x < rows_restored ; x++)
1066
for (x= 0; x < rows_restored ; x++)
1016
rc= get_row(&archive, getTable()->getInsertRecord());
1068
rc= get_row(&archive, table->record[0]);
1021
real_write_row(getTable()->getInsertRecord(), writer.get());
1073
real_write_row(table->record[0], &writer);
1023
1075
Long term it should be possible to optimize this so that
1024
1076
it is not called on each row.
1026
if (getTable()->found_next_number_field)
1078
if (table->found_next_number_field)
1028
Field *field= getTable()->found_next_number_field;
1080
Field *field= table->found_next_number_field;
1030
1082
/* Since we will need to use field to translate, we need to flip its read bit */
1031
1083
field->setReadSet();
1033
1085
uint64_t auto_value=
1034
(uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
field->offset(getTable()->getInsertRecord()));
1086
(uint64_t) field->val_int(table->record[0] +
1087
field->offset(table->record[0]));
1036
1088
if (share->archive_write.auto_increment < auto_value)
1037
1089
stats.auto_increment_value=
1038
1090
(share->archive_write.auto_increment= auto_value) + 1;
1041
share->rows_recorded= (ha_rows)writer->rows;
1093
share->rows_recorded= (ha_rows)writer.rows;
1044
1096
if (rc && rc != HA_ERR_END_OF_FILE)
1050
azclose(writer.get());
1051
1103
share->dirty= false;
1053
1105
azclose(&archive);
1055
1107
// make the file we just wrote be our data file
1056
rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
1108
rc = internal::my_rename(writer_filename,share->data_file_name,MYF(0));
1058
1110
free(proto_string);
1061
1113
free(proto_string);
1062
azclose(writer.get());
1115
1167
If dirty, we lock, and then reset/flush the data.
1116
1168
I found that just calling azflush() doesn't always work.
1118
pthread_mutex_lock(&share->mutex());
1170
pthread_mutex_lock(&share->mutex);
1119
1171
if (share->dirty == true)
1121
1173
azflush(&(share->archive_write), Z_SYNC_FLUSH);
1134
1186
cause the number to be inaccurate.
1136
1188
stats.records= share->rows_recorded;
1137
pthread_mutex_unlock(&share->mutex());
1189
pthread_mutex_unlock(&share->mutex);
1139
1191
scan_rows= stats.records;
1140
1192
stats.deleted= 0;
1145
1197
struct stat file_stat; // Stat information for the data file
1147
stat(share->data_file_name.c_str(), &file_stat);
1199
stat(share->data_file_name, &file_stat);
1149
stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
1201
stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1202
stats.data_file_length= file_stat.st_size;
1151
1203
stats.create_time= file_stat.st_ctime;
1152
1204
stats.update_time= file_stat.st_mtime;
1158
1210
if (flag & HA_STATUS_AUTO)
1160
1212
init_archive_reader();
1161
pthread_mutex_lock(&share->mutex());
1213
pthread_mutex_lock(&share->mutex);
1162
1214
azflush(&archive, Z_SYNC_FLUSH);
1163
pthread_mutex_unlock(&share->mutex());
1215
pthread_mutex_unlock(&share->mutex);
1164
1216
stats.auto_increment_value= archive.auto_increment + 1;
1213
1265
const char *old_proc_info;
1215
old_proc_info= session->get_proc_info();
1216
session->set_proc_info("Checking table");
1268
old_proc_info= get_session_proc_info(session);
1269
set_session_proc_info(session, "Checking table");
1217
1270
/* Flush any waiting data */
1218
pthread_mutex_lock(&share->mutex());
1271
pthread_mutex_lock(&share->mutex);
1219
1272
azflush(&(share->archive_write), Z_SYNC_FLUSH);
1220
pthread_mutex_unlock(&share->mutex());
1273
pthread_mutex_unlock(&share->mutex);
1223
1276
Now we will rewind the archive file so that we are positioned at the
1226
1279
init_archive_reader();
1227
1280
azflush(&archive, Z_SYNC_FLUSH);
1228
1281
read_data_header(&archive);
1229
for (uint64_t x= 0; x < share->archive_write.rows; x++)
1282
for (x= 0; x < share->archive_write.rows; x++)
1231
rc= get_row(&archive, getTable()->getInsertRecord());
1284
rc= get_row(&archive, table->record[0]);
1237
session->set_proc_info(old_proc_info);
1290
set_session_proc_info(session, old_proc_info);
1239
1292
if ((rc && rc != HA_ERR_END_OF_FILE))
1250
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
1303
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1305
archive_record_buffer *r;
1306
if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1310
r->length= (int)length;
1312
if (!(r->buffer= (unsigned char*) malloc(r->length)))
1321
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1323
free((char*) r->buffer);
1328
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1267
1345
bool ArchiveEngine::doDoesTableExist(Session&,
1268
const identifier::Table &identifier)
1346
const TableIdentifier &identifier)
1270
1348
string proto_path(identifier.getPath());
1271
1349
proto_path.append(ARZ);
1281
1359
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
const drizzled::identifier::Schema &schema_identifier,
1283
drizzled::identifier::Table::vector &set_of_identifiers)
1360
const drizzled::SchemaIdentifier &schema_identifier,
1361
drizzled::TableIdentifiers &set_of_identifiers)
1285
1363
drizzled::CachedDirectory::Entries entries= directory.getEntries();
1302
1380
char uname[NAME_LEN + 1];
1303
1381
uint32_t file_name_len;
1305
file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1383
file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
1384
// TODO: Remove need for memory copy here
1307
1385
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
1309
set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
1387
set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));