2
Copyright (C) 2010 Zimin
4
This program is free software; you can redistribute it and/or
5
modify it under the terms of the GNU General Public License
6
as published by the Free Software Foundation; either version 2
7
of the License, or (at your option) any later version.
9
This program is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU General Public License for more details.
14
You should have received a copy of the GNU General Public License
15
along with this program; if not, write to the Free Software
16
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20
#include <drizzled/field.h>
21
#include <drizzled/field/blob.h>
22
#include <drizzled/error.h>
23
#include <drizzled/table.h>
24
#include <drizzled/session.h>
25
#include "drizzled/internal/my_sys.h"
26
#include <google/protobuf/io/zero_copy_stream.h>
27
#include <google/protobuf/io/zero_copy_stream_impl.h>
29
#include "filesystem_engine.h"
39
#include <boost/algorithm/string.hpp>
42
using namespace drizzled;
44
#define FILESYSTEM_EXT ".FST"
46
/* Stuff for shares */
47
pthread_mutex_t filesystem_mutex;
49
static const char *ha_filesystem_exts[] = {
54
class FilesystemEngine : public drizzled::plugin::StorageEngine
57
typedef std::map<string, FilesystemTableShare*> FilesystemMap;
58
FilesystemMap fs_open_tables;
60
FilesystemEngine(const string& name_arg)
61
: drizzled::plugin::StorageEngine(name_arg,
63
HTON_SKIP_STORE_LOCK |
64
HTON_CAN_INDEX_BLOBS |
68
table_definition_ext= FILESYSTEM_EXT;
69
pthread_mutex_init(&filesystem_mutex, MY_MUTEX_INIT_FAST);
71
virtual ~FilesystemEngine()
73
pthread_mutex_destroy(&filesystem_mutex);
76
virtual Cursor *create(Table &table)
78
return new FilesystemCursor(*this, table);
81
const char **bas_ext() const {
82
return ha_filesystem_exts;
85
bool validateCreateTableOption(const std::string &key, const std::string &state);
87
int doCreateTable(Session &,
89
const drizzled::TableIdentifier &identifier,
90
drizzled::message::Table&);
92
int doGetTableDefinition(Session& ,
93
const drizzled::TableIdentifier &,
94
drizzled::message::Table &);
96
int doDropTable(Session&, const TableIdentifier &);
98
/* operations on FilesystemTableShare */
99
FilesystemTableShare *findOpenTable(const string table_name);
100
void addOpenTable(const string &table_name, FilesystemTableShare *);
101
void deleteOpenTable(const string &table_name);
103
uint32_t max_keys() const { return 0; }
104
uint32_t max_key_parts() const { return 0; }
105
uint32_t max_key_length() const { return 0; }
106
bool doDoesTableExist(Session& , const TableIdentifier &);
107
int doRenameTable(Session&, const TableIdentifier &, const TableIdentifier &);
108
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
109
const drizzled::SchemaIdentifier &schema_identifier,
110
drizzled::TableIdentifier::vector &set_of_identifiers);
112
void getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
113
const drizzled::SchemaIdentifier &schema_identifier,
114
drizzled::plugin::TableNameList *set_of_names,
115
drizzled::TableIdentifier::vector *set_of_identifiers);
118
void FilesystemEngine::getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
119
const drizzled::SchemaIdentifier &schema_identifier,
120
drizzled::plugin::TableNameList *set_of_names,
121
drizzled::TableIdentifier::vector *set_of_identifiers)
123
drizzled::CachedDirectory::Entries entries= directory.getEntries();
125
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
126
entry_iter != entries.end();
129
drizzled::CachedDirectory::Entry *entry= *entry_iter;
130
const string *filename= &entry->filename;
132
assert(not filename->empty());
134
string::size_type suffix_pos= filename->rfind('.');
136
if (suffix_pos != string::npos &&
137
boost::iequals(filename->substr(suffix_pos), FILESYSTEM_EXT) &&
138
filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX))
140
char uname[NAME_LEN + 1];
141
uint32_t file_name_len;
143
file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
144
uname[file_name_len - sizeof(FILESYSTEM_EXT) + 1]= '\0';
146
set_of_names->insert(uname);
147
if (set_of_identifiers)
148
set_of_identifiers->push_back(TableIdentifier(schema_identifier, uname));
153
void FilesystemEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
154
const drizzled::SchemaIdentifier &schema_identifier,
155
drizzled::TableIdentifier::vector &set_of_identifiers)
157
getTableNamesFromFilesystem(directory, schema_identifier, NULL, &set_of_identifiers);
160
int FilesystemEngine::doDropTable(Session &, const TableIdentifier &identifier)
162
string new_path(identifier.getPath());
163
new_path+= FILESYSTEM_EXT;
164
int err= unlink(new_path.c_str());
172
bool FilesystemEngine::doDoesTableExist(Session &, const TableIdentifier &identifier)
174
string proto_path(identifier.getPath());
175
proto_path.append(FILESYSTEM_EXT);
177
if (access(proto_path.c_str(), F_OK))
185
FilesystemTableShare *FilesystemEngine::findOpenTable(const string table_name)
187
FilesystemMap::iterator find_iter=
188
fs_open_tables.find(table_name);
190
if (find_iter != fs_open_tables.end())
191
return (*find_iter).second;
196
void FilesystemEngine::addOpenTable(const string &table_name, FilesystemTableShare *share)
198
fs_open_tables[table_name]= share;
201
void FilesystemEngine::deleteOpenTable(const string &table_name)
203
fs_open_tables.erase(table_name);
206
static int parseTaggedFile(const FormatInfo &fi, vector< map<string, string> > &v)
208
int filedesc= ::open(fi.getFileName().c_str(), O_RDONLY);
212
boost::scoped_ptr<TransparentFile> filebuffer(new TransparentFile);
213
filebuffer->init_buff(filedesc);
215
bool last_line_empty= false;
216
map<string, string> kv;
221
char ch= filebuffer->get_value(pos);
224
if (!last_line_empty)
233
if (!fi.isRowSeparator(ch))
239
// if we have a new empty line,
240
// it means we got the end of a section, push it to vector
243
if (!last_line_empty)
248
last_line_empty= true;
253
vector<string> sv, svcopy;
254
boost::split(sv, line, boost::is_any_of(fi.getColSeparator()));
255
for (vector<string>::iterator iter= sv.begin();
260
svcopy.push_back(*iter);
263
// the first splitted string as key,
264
// and the second splitted string as value.
265
string key(svcopy[0]);
267
if (svcopy.size() >= 2)
269
string value(svcopy[1]);
273
else if (svcopy.size() >= 1)
276
last_line_empty= false;
283
int FilesystemEngine::doGetTableDefinition(Session &,
284
const drizzled::TableIdentifier &identifier,
285
drizzled::message::Table &table_proto)
287
string new_path(identifier.getPath());
288
new_path.append(FILESYSTEM_EXT);
290
int fd= ::open(new_path.c_str(), O_RDONLY);
294
google::protobuf::io::ZeroCopyInputStream* input=
295
new google::protobuf::io::FileInputStream(fd);
298
return HA_ERR_CRASHED_ON_USAGE;
300
if (not table_proto.ParseFromZeroCopyStream(input))
304
if (not table_proto.IsInitialized())
306
my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
307
table_proto.InitializationErrorString().c_str());
308
return ER_CORRUPT_TABLE_DEFINITION;
311
return HA_ERR_CRASHED_ON_USAGE;
315
// if the file is a tagged file such as /proc/meminfo
316
// then columns of this table are added dynamically here.
318
format.parseFromTable(&table_proto);
319
if (not format.isTagFormat() || not format.isFileGiven())
325
std::vector< std::map<std::string, std::string> > vm;
326
if (parseTaggedFile(format, vm) != 0)
332
if (vm.size() == 0) {
337
// we don't care what user provides, just clear them all
338
table_proto.clear_field();
339
// we take the first section as sample
340
std::map<string, string> kv= vm[0];
341
for (std::map<string, string>::iterator iter= kv.begin();
345
// add columns to table proto
346
message::Table::Field *field= table_proto.add_field();
347
field->set_name(iter->first);
348
field->set_type(drizzled::message::Table::Field::VARCHAR);
349
message::Table::Field::StringFieldOptions *stringoption= field->mutable_string_options();
350
stringoption->set_length(iter->second.length() + 1);
357
FilesystemTableShare::FilesystemTableShare(const string table_name_arg)
358
: use_count(0), table_name(table_name_arg),
359
update_file_opened(false),
364
FilesystemTableShare::~FilesystemTableShare()
366
pthread_mutex_destroy(&mutex);
369
FilesystemTableShare *FilesystemCursor::get_share(const char *table_name)
371
Guard g(filesystem_mutex);
373
FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
374
share= a_engine->findOpenTable(table_name);
377
If share is not present in the hash, create a new share and
378
initialize its members.
382
share= new (nothrow) FilesystemTableShare(table_name);
388
share->format.parseFromTable(getTable()->getShare()->getTableProto());
389
if (!share->format.isFileGiven())
394
* for taggered file such as /proc/meminfo,
395
* we pre-process it first, and store the parsing result in a map.
397
if (share->format.isTagFormat())
399
if (parseTaggedFile(share->format, share->vm) != 0)
404
a_engine->addOpenTable(share->table_name, share);
406
pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
413
void FilesystemCursor::free_share()
415
Guard g(filesystem_mutex);
417
if (!--share->use_count){
418
FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
419
a_engine->deleteOpenTable(share->table_name);
420
pthread_mutex_destroy(&share->mutex);
425
void FilesystemCursor::critical_section_enter()
427
if (sql_command_type == SQLCOM_ALTER_TABLE ||
428
sql_command_type == SQLCOM_UPDATE ||
429
sql_command_type == SQLCOM_DELETE ||
430
sql_command_type == SQLCOM_INSERT ||
431
sql_command_type == SQLCOM_INSERT_SELECT ||
432
sql_command_type == SQLCOM_REPLACE ||
433
sql_command_type == SQLCOM_REPLACE_SELECT)
434
share->filesystem_lock.scan_update_begin();
436
share->filesystem_lock.scan_begin();
438
thread_locked = true;
441
void FilesystemCursor::critical_section_exit()
443
if (sql_command_type == SQLCOM_ALTER_TABLE ||
444
sql_command_type == SQLCOM_UPDATE ||
445
sql_command_type == SQLCOM_DELETE ||
446
sql_command_type == SQLCOM_INSERT ||
447
sql_command_type == SQLCOM_INSERT_SELECT ||
448
sql_command_type == SQLCOM_REPLACE ||
449
sql_command_type == SQLCOM_REPLACE_SELECT)
450
share->filesystem_lock.scan_update_end();
452
share->filesystem_lock.scan_end();
454
thread_locked = false;
457
FilesystemCursor::FilesystemCursor(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
458
: Cursor(engine_arg, table_arg),
459
file_buff(new TransparentFile),
464
int FilesystemCursor::doOpen(const drizzled::TableIdentifier &identifier, int, uint32_t)
466
if (!(share= get_share(identifier.getPath().c_str())))
469
file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
473
return ER_CANT_OPEN_FILE;
476
ref_length= sizeof(off_t);
480
int FilesystemCursor::close(void)
482
int err= ::close(file_desc);
489
int FilesystemCursor::doStartTableScan(bool)
491
sql_command_type = session_sql_command(getTable()->getSession());
494
critical_section_exit();
495
critical_section_enter();
497
if (share->format.isTagFormat())
506
if (share->needs_reopen)
508
file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
510
return HA_ERR_CRASHED_ON_USAGE;
511
share->needs_reopen= false;
513
file_buff->init_buff(file_desc);
517
int FilesystemCursor::find_current_row(unsigned char *buf)
519
ptrdiff_t row_offset= buf - getTable()->record[0];
521
next_position= current_position;
524
bool line_done= false;
525
bool line_blank= true;
526
Field **field= getTable()->getFields();
527
for (; !line_done && *field; ++next_position)
529
char ch= file_buff->get_value(next_position);
531
return HA_ERR_END_OF_FILE;
533
if (share->format.isEscapedChar(ch))
535
// read next character
536
ch= file_buff->get_value(++next_position);
538
return HA_ERR_END_OF_FILE;
540
content.push_back(FormatInfo::getEscapedChar(ch));
545
// if we find separator
546
bool is_row= share->format.isRowSeparator(ch);
547
bool is_col= share->format.isColSeparator(ch);
550
if (share->format.isSeparatorModeGeneral() && is_row && line_blank)
552
if (share->format.isSeparatorModeWeak() && is_col)
556
if (is_row || is_col)
558
(*field)->move_field_offset(row_offset);
559
if (!content.empty())
561
(*field)->set_notnull();
562
if ((*field)->isReadSet() || (*field)->isWriteSet())
564
(*field)->setWriteSet();
565
(*field)->store_and_check(CHECK_FIELD_WARN,
567
(uint32_t)content.length(),
572
(*field)->set_default();
576
(*field)->set_null();
577
(*field)->move_field_offset(-row_offset);
588
content.push_back(ch);
592
for (; *field; ++field)
594
(*field)->move_field_offset(row_offset);
595
(*field)->set_notnull();
596
(*field)->set_default();
597
(*field)->move_field_offset(-row_offset);
602
// eat up characters when line_done
605
char ch= file_buff->get_value(next_position);
606
if (share->format.isRowSeparator(ch))
614
int FilesystemCursor::rnd_next(unsigned char *buf)
616
ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
617
if (share->format.isTagFormat())
619
if (tag_depth >= share->vm.size())
620
return HA_ERR_END_OF_FILE;
622
ptrdiff_t row_offset= buf - getTable()->record[0];
623
for (Field **field= getTable()->getFields(); *field; field++)
625
string key((*field)->field_name);
626
string content= share->vm[tag_depth][key];
628
(*field)->move_field_offset(row_offset);
629
if (!content.empty())
631
(*field)->set_notnull();
632
if ((*field)->isReadSet() || (*field)->isWriteSet())
634
(*field)->setWriteSet();
635
(*field)->store_and_check(CHECK_FIELD_WARN,
637
(uint32_t)content.length(),
642
(*field)->set_default();
647
(*field)->set_null();
649
(*field)->move_field_offset(-row_offset);
655
current_position= next_position;
656
return find_current_row(buf);
659
void FilesystemCursor::position(const unsigned char *)
661
*reinterpret_cast<off_t *>(ref)= current_position;
664
int FilesystemCursor::rnd_pos(unsigned char * buf, unsigned char *pos)
666
ha_statistic_increment(&system_status_var::ha_read_rnd_count);
667
current_position= *reinterpret_cast<off_t *>(pos);
668
return find_current_row(buf);
671
int FilesystemCursor::info(uint32_t)
673
if (stats.records < 2)
678
int FilesystemCursor::openUpdateFile()
680
if (!share->update_file_opened)
683
if (stat(share->format.getFileName().c_str(), &st) < 0)
685
update_file_name= share->format.getFileName();
686
update_file_name.append(".UPDATE");
687
unlink(update_file_name.c_str());
688
update_file_desc= ::open(update_file_name.c_str(),
689
O_RDWR | O_CREAT | O_TRUNC,
691
if (update_file_desc < 0)
695
share->update_file_opened= true;
700
int FilesystemCursor::doEndTableScan()
702
sql_command_type = session_sql_command(getTable()->getSession());
704
if (share->format.isTagFormat())
707
critical_section_exit();
711
if (slots.size() == 0)
714
critical_section_exit();
719
sort(slots.begin(), slots.end());
720
vector< pair<off_t, off_t> >::iterator slot_iter= slots.begin();
721
off_t write_start= 0;
723
off_t file_buffer_start= 0;
725
pthread_mutex_lock(&share->mutex);
727
file_buff->init_buff(file_desc);
728
if (openUpdateFile() < 0)
731
while (file_buffer_start != -1)
735
write_end= file_buff->end();
736
if (slot_iter != slots.end() &&
737
write_end >= slot_iter->first)
739
write_end= slot_iter->first;
743
off_t write_length= write_end - write_start;
744
if (write_in_all(update_file_desc,
745
file_buff->ptr() + (write_start - file_buff->start()),
746
write_length) != write_length)
751
while (file_buff->end() <= slot_iter->second && file_buffer_start != -1)
752
file_buffer_start= file_buff->read_next();
753
write_start= slot_iter->second;
757
write_start= write_end;
759
if (write_end == file_buff->end())
760
file_buffer_start= file_buff->read_next();
763
if (::fsync(update_file_desc) ||
764
::close(update_file_desc))
766
share->update_file_opened= false;
768
// close current file
769
if (::close(file_desc))
771
if (::rename(update_file_name.c_str(), share->format.getFileName().c_str()))
774
share->needs_reopen= true;
778
pthread_mutex_unlock(&share->mutex);
781
critical_section_exit();
786
void FilesystemCursor::recordToString(string& output)
789
drizzled::String attribute;
790
for (Field **field= getTable()->getFields(); *field; ++field)
798
output.append(share->format.getColSeparatorHead());
801
if (not (*field)->is_null())
803
(*field)->setReadSet();
804
(*field)->val_str(&attribute, &attribute);
806
output.append(attribute.ptr(), attribute.length());
813
output.append(share->format.getRowSeparatorHead());
816
int FilesystemCursor::doInsertRecord(unsigned char * buf)
820
if (share->format.isTagFormat())
823
sql_command_type = session_sql_command(getTable()->getSession());
825
critical_section_enter();
831
recordToString(output_line);
833
int fd= ::open(share->format.getFileName().c_str(), O_WRONLY | O_APPEND);
836
critical_section_exit();
840
err_write= write_in_all(fd, output_line.c_str(), output_line.length());
846
err_close= ::close(fd);
850
critical_section_exit();
859
int FilesystemCursor::doUpdateRecord(const unsigned char *, unsigned char *)
861
if (share->format.isTagFormat())
863
if (openUpdateFile())
866
// get the update information
870
if (write_in_all(update_file_desc, str.c_str(), str.length()) < 0)
878
void FilesystemCursor::addSlot()
880
if (slots.size() > 0 && slots.back().second == current_position)
881
slots.back().second= next_position;
883
slots.push_back(make_pair(current_position, next_position));
886
int FilesystemCursor::doDeleteRecord(const unsigned char *)
888
if (share->format.isTagFormat())
894
int FilesystemEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
896
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), FILESYSTEM_EXT))
901
bool FilesystemEngine::validateCreateTableOption(const std::string &key,
902
const std::string &state)
904
return FormatInfo::validateOption(key, state);
907
int FilesystemEngine::doCreateTable(Session &,
909
const drizzled::TableIdentifier &identifier,
910
drizzled::message::Table &proto)
913
format.parseFromTable(&proto);
914
if (format.isFileGiven())
916
int err= ::open(format.getFileName().c_str(), O_RDONLY);
921
string new_path(identifier.getPath());
922
new_path+= FILESYSTEM_EXT;
923
fstream output(new_path.c_str(), ios::out | ios::binary);
928
if (! proto.SerializeToOstream(&output))
931
unlink(new_path.c_str());
938
static FilesystemEngine *filesystem_engine= NULL;
940
static int filesystem_init_func(drizzled::module::Context &context)
942
filesystem_engine= new FilesystemEngine("FILESYSTEM");
943
context.add(filesystem_engine);
948
DRIZZLE_DECLARE_PLUGIN
956
filesystem_init_func, /* Plugin Init */
957
NULL, /* system variables */
958
NULL /* config options */
960
DRIZZLE_DECLARE_PLUGIN_END;