2
Copyright (C) 2010 Zimin
4
This program is free software; you can redistribute it and/or
5
modify it under the terms of the GNU General Public License
6
as published by the Free Software Foundation; either version 2
7
of the License, or (at your option) any later version.
9
This program is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU General Public License for more details.
14
You should have received a copy of the GNU General Public License
15
along with this program; if not, write to the Free Software
16
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20
#include <drizzled/field.h>
21
#include <drizzled/field/blob.h>
22
#include <drizzled/error.h>
23
#include <drizzled/table.h>
24
#include <drizzled/session.h>
25
#include "drizzled/internal/my_sys.h"
26
#include <google/protobuf/io/zero_copy_stream.h>
27
#include <google/protobuf/io/zero_copy_stream_impl.h>
29
#include "filesystem_engine.h"
39
#include <boost/algorithm/string.hpp>
42
using namespace drizzled;
44
// Suffix appended to a table's path to name the file that holds its
// serialized table definition.
#define FILESYSTEM_EXT ".FST"
46
/* Stuff for shares */
47
// Initialized and destroyed by FilesystemEngine's constructor/destructor and
// held (through Guard) by FilesystemCursor::get_share() and free_share()
// while they touch the engine's open-table map.
pthread_mutex_t filesystem_mutex;
49
static const char *ha_filesystem_exts[] = {
54
// Storage engine plugin class.  It creates FilesystemCursor objects, manages
// the on-disk table definition files (FILESYSTEM_EXT) and keeps a map from
// table name to the FilesystemTableShare of every open table.
class FilesystemEngine : public drizzled::plugin::StorageEngine
57
// Open shares keyed by table name; accessed through findOpenTable(),
// addOpenTable() and deleteOpenTable().
typedef std::map<string, FilesystemTableShare*> FilesystemMap;
58
FilesystemMap fs_open_tables;
60
// Passes name_arg and the engine flags to the base class, sets the table
// definition extension and initializes the global filesystem_mutex.
FilesystemEngine(const string& name_arg)
61
: drizzled::plugin::StorageEngine(name_arg,
63
HTON_SKIP_STORE_LOCK |
64
HTON_CAN_INDEX_BLOBS |
68
table_definition_ext= FILESYSTEM_EXT;
69
pthread_mutex_init(&filesystem_mutex, MY_MUTEX_INIT_FAST);
71
// Destroys the global filesystem_mutex initialized by the constructor.
virtual ~FilesystemEngine()
73
pthread_mutex_destroy(&filesystem_mutex);
76
// Returns a newly allocated FilesystemCursor bound to this engine and table.
virtual Cursor *create(Table &table)
78
return new FilesystemCursor(*this, table);
81
// Returns the engine's file-extension list (ha_filesystem_exts).
const char **bas_ext() const {
82
return ha_filesystem_exts;
85
// Delegates to FormatInfo::validateOption() (see the definition below).
bool validateCreateTableOption(const std::string &key, const std::string &state);
87
int doCreateTable(Session &,
89
const drizzled::identifier::Table &identifier,
90
drizzled::message::Table&);
92
int doGetTableDefinition(Session& ,
93
const drizzled::identifier::Table &,
94
drizzled::message::Table &);
96
int doDropTable(Session&, const identifier::Table &);
98
/* operations on FilesystemTableShare */
99
FilesystemTableShare *findOpenTable(const string table_name);
100
void addOpenTable(const string &table_name, FilesystemTableShare *);
101
void deleteOpenTable(const string &table_name);
103
// All three key limits are reported as 0.
uint32_t max_keys() const { return 0; }
104
uint32_t max_key_parts() const { return 0; }
105
uint32_t max_key_length() const { return 0; }
106
bool doDoesTableExist(Session& , const identifier::Table &);
107
int doRenameTable(Session&, const identifier::Table &, const identifier::Table &);
108
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
109
const drizzled::identifier::Schema &schema_identifier,
110
drizzled::identifier::Table::vector &set_of_identifiers);
112
// Shared directory scan used by doGetTableIdentifiers(); either output
// container is passed by pointer.
void getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
113
const drizzled::identifier::Schema &schema_identifier,
114
drizzled::plugin::TableNameList *set_of_names,
115
drizzled::identifier::Table::vector *set_of_identifiers);
118
// Walks the entries of `directory` and, for every file whose suffix equals
// FILESYSTEM_EXT (case-insensitively) and whose name does not start with
// TMP_FILE_PREFIX, converts the file name to a table name and records it in
// set_of_names and, when non-NULL, in set_of_identifiers (paired with
// schema_identifier).
void FilesystemEngine::getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
119
const drizzled::identifier::Schema &schema_identifier,
120
drizzled::plugin::TableNameList *set_of_names,
121
drizzled::identifier::Table::vector *set_of_identifiers)
123
drizzled::CachedDirectory::Entries entries= directory.getEntries();
125
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
126
entry_iter != entries.end();
129
drizzled::CachedDirectory::Entry *entry= *entry_iter;
130
const string *filename= &entry->filename;
132
assert(not filename->empty());
134
string::size_type suffix_pos= filename->rfind('.');
136
// compare() yields non-zero when the name does NOT begin with
// TMP_FILE_PREFIX, so files carrying that prefix are skipped.
if (suffix_pos != string::npos &&
137
boost::iequals(filename->substr(suffix_pos), FILESYSTEM_EXT) &&
138
filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX))
140
char uname[NAME_LEN + 1];
141
uint32_t file_name_len;
143
file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
// sizeof(FILESYSTEM_EXT) counts the terminating NUL, so this index is
// file_name_len minus the extension length: the extension is cut off.
144
uname[file_name_len - sizeof(FILESYSTEM_EXT) + 1]= '\0';
// NOTE(review): doGetTableIdentifiers() passes NULL for set_of_names; a NULL
// guard may sit on a line not visible here -- confirm before relying on it.
146
set_of_names->insert(uname);
147
if (set_of_identifiers)
148
set_of_identifiers->push_back(identifier::Table(schema_identifier, uname));
153
// Collects the identifiers of this engine's tables found in `directory` by
// delegating to getTableNamesFromFilesystem() with no name set (NULL).
void FilesystemEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
154
const drizzled::identifier::Schema &schema_identifier,
155
drizzled::identifier::Table::vector &set_of_identifiers)
157
getTableNamesFromFilesystem(directory, schema_identifier, NULL, &set_of_identifiers);
160
// Drops a table by unlinking its definition file, i.e. the identifier's path
// with FILESYSTEM_EXT appended.  `err` receives unlink()'s result.
int FilesystemEngine::doDropTable(Session &, const identifier::Table &identifier)
162
string new_path(identifier.getPath());
163
new_path+= FILESYSTEM_EXT;
164
int err= unlink(new_path.c_str());
172
// Tests for the table's definition file (identifier path + FILESYSTEM_EXT)
// with access(F_OK); a non-zero result means the file could not be found.
bool FilesystemEngine::doDoesTableExist(Session &, const identifier::Table &identifier)
174
string proto_path(identifier.getPath());
175
proto_path.append(FILESYSTEM_EXT);
177
if (access(proto_path.c_str(), F_OK))
185
// Looks table_name up in fs_open_tables and returns the mapped share when an
// entry exists.
FilesystemTableShare *FilesystemEngine::findOpenTable(const string table_name)
187
FilesystemMap::iterator find_iter=
188
fs_open_tables.find(table_name);
190
if (find_iter != fs_open_tables.end())
191
return (*find_iter).second;
196
// Registers `share` under table_name, replacing any entry already stored
// for that name.
void FilesystemEngine::addOpenTable(const string &table_name, FilesystemTableShare *share)
198
fs_open_tables[table_name]= share;
201
// Removes the map entry for table_name; the share object itself is not
// touched here.
void FilesystemEngine::deleteOpenTable(const string &table_name)
203
fs_open_tables.erase(table_name);
206
// Parses the file named by `fi` as a "tagged" file: each line is split on the
// column-separator characters into a key (first piece) and a value (second
// piece), and an empty line closes a section whose key/value map is pushed
// onto `v`.  Callers in this file treat a non-zero return as failure.
static int parseTaggedFile(const FormatInfo &fi, vector< map<string, string> > &v)
208
int filedesc= ::open(fi.getFileName().c_str(), O_RDONLY);
212
// The file is read character by character through a TransparentFile buffer.
boost::scoped_ptr<TransparentFile> filebuffer(new TransparentFile);
213
filebuffer->init_buff(filedesc);
215
// last_line_empty tracks whether the previous line was blank.
bool last_line_empty= false;
216
map<string, string> kv;
221
char ch= filebuffer->get_value(pos);
224
if (!last_line_empty)
233
if (!fi.isRowSeparator(ch))
239
// if we have a new empty line,
240
// it means we got the end of a section, push it to vector
243
if (!last_line_empty)
248
last_line_empty= true;
253
// sv holds the raw split result; svcopy receives pieces copied from it
// (the loop's condition is on lines not visible here).
vector<string> sv, svcopy;
254
boost::split(sv, line, boost::is_any_of(fi.getColSeparator()));
255
for (vector<string>::iterator iter= sv.begin();
260
svcopy.push_back(*iter);
263
// the first split string is the key,
264
// and the second split string is the value.
265
string key(svcopy[0]);
267
if (svcopy.size() >= 2)
269
string value(svcopy[1]);
273
else if (svcopy.size() >= 1)
276
last_line_empty= false;
283
// Loads the table definition stored in <identifier path> + FILESYSTEM_EXT
// into table_proto.  For tables in tag format with a data file given, the
// user-declared fields are discarded and replaced by VARCHAR fields derived
// from the first section parsed out of the data file.
int FilesystemEngine::doGetTableDefinition(Session &,
284
const drizzled::identifier::Table &identifier,
285
drizzled::message::Table &table_proto)
287
string new_path(identifier.getPath());
288
new_path.append(FILESYSTEM_EXT);
290
int fd= ::open(new_path.c_str(), O_RDONLY);
294
// The protobuf message is parsed straight from the file descriptor.
google::protobuf::io::ZeroCopyInputStream* input=
295
new google::protobuf::io::FileInputStream(fd);
298
return HA_ERR_CRASHED_ON_USAGE;
300
if (not table_proto.ParseFromZeroCopyStream(input))
304
// A message with missing required parts is reported as a corrupt definition.
if (not table_proto.IsInitialized())
306
my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
307
table_proto.name().empty() ? " " : table_proto.name().c_str(),
308
table_proto.InitializationErrorString().c_str());
310
return ER_CORRUPT_TABLE_DEFINITION;
313
return HA_ERR_CRASHED_ON_USAGE;
317
// if the file is a tagged file such as /proc/meminfo
318
// then columns of this table are added dynamically here.
320
format.parseFromTable(&table_proto);
321
if (not format.isTagFormat() || not format.isFileGiven())
327
std::vector< std::map<std::string, std::string> > vm;
328
if (parseTaggedFile(format, vm) != 0)
334
if (vm.size() == 0) {
339
// we don't care what user provides, just clear them all
340
table_proto.clear_field();
341
// we take the first section as sample
342
std::map<string, string> kv= vm[0];
343
for (std::map<string, string>::iterator iter= kv.begin();
347
// add columns to table proto
348
message::Table::Field *field= table_proto.add_field();
349
field->set_name(iter->first);
350
field->set_type(drizzled::message::Table::Field::VARCHAR);
351
message::Table::Field::StringFieldOptions *stringoption= field->mutable_string_options();
// Column length is the sample value's length plus one.
352
stringoption->set_length(iter->second.length() + 1);
359
// Creates a share for table_name_arg with a zero use count and no update
// file open (remaining initializers are on lines not visible here).
FilesystemTableShare::FilesystemTableShare(const string table_name_arg)
360
: use_count(0), table_name(table_name_arg),
361
update_file_opened(false),
366
// Destroys the share's mutex.
// NOTE(review): FilesystemCursor::free_share() also calls
// pthread_mutex_destroy() on this mutex -- confirm it is not destroyed twice.
FilesystemTableShare::~FilesystemTableShare()
368
pthread_mutex_destroy(&mutex);
371
// Obtains the share for table_name while holding filesystem_mutex: an
// existing share is looked up in the engine's open-table map; otherwise a new
// one is allocated, its format is parsed from the table definition, tagged
// files are pre-parsed into share->vm, and the share is registered with the
// engine and given an initialized mutex.
FilesystemTableShare *FilesystemCursor::get_share(const char *table_name)
373
Guard g(filesystem_mutex);
375
FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
376
share= a_engine->findOpenTable(table_name);
379
If share is not present in the hash, create a new share and
380
initialize its members.
384
// nothrow: allocation failure yields a null pointer instead of an exception.
share= new (nothrow) FilesystemTableShare(table_name);
390
share->format.parseFromTable(getTable()->getShare()->getTableProto());
391
if (!share->format.isFileGiven())
396
* for taggered file such as /proc/meminfo,
397
* we pre-process it first, and store the parsing result in a map.
399
if (share->format.isTagFormat())
401
if (parseTaggedFile(share->format, share->vm) != 0)
406
a_engine->addOpenTable(share->table_name, share);
408
pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
415
// Drops one reference to the cursor's share while holding filesystem_mutex.
// When the use count reaches zero the share is unregistered from the engine
// and its mutex is destroyed.
void FilesystemCursor::free_share()
417
Guard g(filesystem_mutex);
419
if (!--share->use_count){
420
FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
421
a_engine->deleteOpenTable(share->table_name);
422
pthread_mutex_destroy(&share->mutex);
427
// Takes the share's filesystem_lock for the current statement and records
// that this cursor holds it.  Data-modifying commands (ALTER TABLE, UPDATE,
// DELETE, INSERT[..SELECT], REPLACE[..SELECT]) use scan_update_begin().
// NOTE(review): scan_begin() is presumably the `else` branch for all other
// commands; the `else` itself is not visible here -- confirm.
void FilesystemCursor::critical_section_enter()
429
if (sql_command_type == SQLCOM_ALTER_TABLE ||
430
sql_command_type == SQLCOM_UPDATE ||
431
sql_command_type == SQLCOM_DELETE ||
432
sql_command_type == SQLCOM_INSERT ||
433
sql_command_type == SQLCOM_INSERT_SELECT ||
434
sql_command_type == SQLCOM_REPLACE ||
435
sql_command_type == SQLCOM_REPLACE_SELECT)
436
share->filesystem_lock.scan_update_begin();
438
share->filesystem_lock.scan_begin();
440
thread_locked = true;
443
// Counterpart of critical_section_enter(): releases the share's
// filesystem_lock using the same command classification and clears
// thread_locked.
// NOTE(review): scan_end() is presumably the `else` branch; the `else` itself
// is not visible here -- confirm.
void FilesystemCursor::critical_section_exit()
445
if (sql_command_type == SQLCOM_ALTER_TABLE ||
446
sql_command_type == SQLCOM_UPDATE ||
447
sql_command_type == SQLCOM_DELETE ||
448
sql_command_type == SQLCOM_INSERT ||
449
sql_command_type == SQLCOM_INSERT_SELECT ||
450
sql_command_type == SQLCOM_REPLACE ||
451
sql_command_type == SQLCOM_REPLACE_SELECT)
452
share->filesystem_lock.scan_update_end();
454
share->filesystem_lock.scan_end();
456
thread_locked = false;
459
// Builds a cursor for table_arg on engine_arg and allocates the
// TransparentFile used as its read buffer (further initializers are on lines
// not visible here).
FilesystemCursor::FilesystemCursor(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
460
: Cursor(engine_arg, table_arg),
461
file_buff(new TransparentFile),
466
// Opens the table: acquires the share for the identifier's path and opens
// the data file named by the share's format read-only.  ER_CANT_OPEN_FILE is
// returned on a failure path whose condition is not visible here.
int FilesystemCursor::doOpen(const drizzled::identifier::Table &identifier, int, uint32_t)
468
if (!(share= get_share(identifier.getPath().c_str())))
471
file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
475
return ER_CANT_OPEN_FILE;
478
// Row references are file offsets; position()/rnd_pos() store an off_t in ref.
ref_length= sizeof(off_t);
482
// Closes the cursor's data file descriptor; `err` holds ::close()'s result.
int FilesystemCursor::close(void)
484
int err= ::close(file_desc);
491
// Prepares a full table scan: caches the session's SQL command type, takes
// the share lock, reopens the data file when the share is flagged
// needs_reopen, and (re)initializes the read buffer on the file descriptor.
int FilesystemCursor::doStartTableScan(bool)
493
sql_command_type = getTable()->getSession()->getSqlCommand();
496
// NOTE(review): the exit call is presumably conditional on a line not
// visible here (e.g. on thread_locked) -- confirm.
critical_section_exit();
497
critical_section_enter();
499
if (share->format.isTagFormat())
508
if (share->needs_reopen)
510
file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
512
return HA_ERR_CRASHED_ON_USAGE;
513
share->needs_reopen= false;
515
file_buff->init_buff(file_desc);
519
// Parses the row that starts at current_position into `buf`.  Characters are
// read from file_buff, escaped characters are translated, and each column or
// row separator ends the current field, whose accumulated text is stored
// into the corresponding Field.  next_position advances past the consumed
// characters.  Returns HA_ERR_END_OF_FILE on paths whose conditions are not
// visible here.
int FilesystemCursor::find_current_row(unsigned char *buf)
521
// Byte distance between buf and record[0]; each field is shifted by it
// around a store -- presumably so the field writes into buf (confirm).
ptrdiff_t row_offset= buf - getTable()->record[0];
523
next_position= current_position;
526
bool line_done= false;
527
bool line_blank= true;
528
Field **field= getTable()->getFields();
529
// Runs until the line is finished or the NULL-terminated field list ends.
for (; !line_done && *field; ++next_position)
531
char ch= file_buff->get_value(next_position);
533
return HA_ERR_END_OF_FILE;
535
if (share->format.isEscapedChar(ch))
537
// read next character
538
ch= file_buff->get_value(++next_position);
540
return HA_ERR_END_OF_FILE;
542
content.push_back(FormatInfo::getEscapedChar(ch));
547
// if we find separator
548
bool is_row= share->format.isRowSeparator(ch);
549
bool is_col= share->format.isColSeparator(ch);
552
if (share->format.isSeparatorModeGeneral() && is_row && line_blank)
554
if (share->format.isSeparatorModeWeak() && is_col)
558
if (is_row || is_col)
560
(*field)->move_field_offset(row_offset);
561
// Non-empty text is stored as a value; other paths below set the default
// or NULL.
if (!content.empty())
563
(*field)->set_notnull();
564
if ((*field)->isReadSet() || (*field)->isWriteSet())
566
(*field)->setWriteSet();
567
(*field)->store_and_check(CHECK_FIELD_WARN,
569
(uint32_t)content.length(),
574
(*field)->set_default();
578
(*field)->set_null();
579
(*field)->move_field_offset(-row_offset);
590
content.push_back(ch);
594
// Fields for which the line supplied no text get their default value.
for (; *field; ++field)
596
(*field)->move_field_offset(row_offset);
597
(*field)->set_notnull();
598
(*field)->set_default();
599
(*field)->move_field_offset(-row_offset);
604
// eat up characters when line_done
607
char ch= file_buff->get_value(next_position);
608
if (share->format.isRowSeparator(ch))
616
// Reads the next row of a scan into `buf`.  For tag-format tables the row is
// section number tag_depth of the pre-parsed share->vm, with each field
// filled from the value stored under the field's name; HA_ERR_END_OF_FILE is
// returned once tag_depth passes the last section.  Otherwise the row that
// begins where the previous one ended is parsed by find_current_row().
int FilesystemCursor::rnd_next(unsigned char *buf)
618
ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
619
if (share->format.isTagFormat())
621
if (tag_depth >= share->vm.size())
622
return HA_ERR_END_OF_FILE;
624
ptrdiff_t row_offset= buf - getTable()->record[0];
625
for (Field **field= getTable()->getFields(); *field; field++)
627
string key((*field)->field_name);
// std::map::operator[] default-inserts an empty string for a missing key.
628
string content= share->vm[tag_depth][key];
630
(*field)->move_field_offset(row_offset);
631
if (!content.empty())
633
(*field)->set_notnull();
634
if ((*field)->isReadSet() || (*field)->isWriteSet())
636
(*field)->setWriteSet();
637
(*field)->store_and_check(CHECK_FIELD_WARN,
639
(uint32_t)content.length(),
644
(*field)->set_default();
649
(*field)->set_null();
651
(*field)->move_field_offset(-row_offset);
657
// Continue right after the previously parsed row.
current_position= next_position;
658
return find_current_row(buf);
661
// Saves the current row's file offset (current_position) into `ref` as an
// off_t; rnd_pos() reads it back.
void FilesystemCursor::position(const unsigned char *)
663
*reinterpret_cast<off_t *>(ref)= current_position;
666
// Re-reads the row whose file offset was stored in `pos` (as an off_t, see
// position()) and parses it into `buf` through find_current_row().
int FilesystemCursor::rnd_pos(unsigned char * buf, unsigned char *pos)
668
ha_statistic_increment(&system_status_var::ha_read_rnd_count);
669
current_position= *reinterpret_cast<off_t *>(pos);
670
return find_current_row(buf);
673
// Statistics hook.  The visible logic only tests whether stats.records is
// below 2; the action taken in that case is on a line not visible here.
int FilesystemCursor::info(uint32_t)
675
if (stats.records < 2)
680
// Ensures the temporary update file is open.  Unless the share already marks
// it as opened, the data file is stat()ed, any stale "<data file>.UPDATE" is
// unlinked, and a fresh one is created/truncated for read-write; the share
// is then flagged update_file_opened.
int FilesystemCursor::openUpdateFile()
682
if (!share->update_file_opened)
685
if (stat(share->format.getFileName().c_str(), &st) < 0)
687
update_file_name= share->format.getFileName();
688
update_file_name.append(".UPDATE");
689
unlink(update_file_name.c_str());
690
update_file_desc= ::open(update_file_name.c_str(),
691
O_RDWR | O_CREAT | O_TRUNC,
693
if (update_file_desc < 0)
697
share->update_file_opened= true;
702
// Finishes a table scan.  Tag-format tables and scans that recorded no slots
// only release the share lock.  Otherwise the data file is rewritten without
// the byte ranges recorded in `slots` (see addSlot()): every range outside a
// slot is copied into the update file, which is then fsync'ed, closed and
// renamed over the data file; the share is flagged needs_reopen so that the
// next scan reopens the file.
int FilesystemCursor::doEndTableScan()
704
// getTable is a member function and has to be called; the previous
// `getTable->getSession()` did not compile.
sql_command_type = getTable()->getSession()->getSqlCommand();
706
if (share->format.isTagFormat())
709
critical_section_exit();
713
if (slots.size() == 0)
716
critical_section_exit();
721
// Process slots in ascending file-offset order.
sort(slots.begin(), slots.end());
722
vector< pair<off_t, off_t> >::iterator slot_iter= slots.begin();
723
off_t write_start= 0;
725
off_t file_buffer_start= 0;
727
pthread_mutex_lock(&share->mutex);
729
file_buff->init_buff(file_desc);
730
if (openUpdateFile() < 0)
733
// read_next() result -1 ends the copy loop.
while (file_buffer_start != -1)
737
write_end= file_buff->end();
738
// Clip the range to copy at the start of the next slot, if it lies within
// the current buffer.
if (slot_iter != slots.end() &&
739
write_end >= slot_iter->first)
741
write_end= slot_iter->first;
745
off_t write_length= write_end - write_start;
746
if (write_in_all(update_file_desc,
747
file_buff->ptr() + (write_start - file_buff->start()),
748
write_length) != write_length)
753
// Advance the buffer past the end of the slot, then resume copying there.
while (file_buff->end() <= slot_iter->second && file_buffer_start != -1)
754
file_buffer_start= file_buff->read_next();
755
write_start= slot_iter->second;
759
write_start= write_end;
761
if (write_end == file_buff->end())
762
file_buffer_start= file_buff->read_next();
765
if (::fsync(update_file_desc) ||
766
::close(update_file_desc))
768
share->update_file_opened= false;
770
// close current file
771
if (::close(file_desc))
773
// Replace the data file with the rewritten copy.
if (::rename(update_file_name.c_str(), share->format.getFileName().c_str()))
776
share->needs_reopen= true;
780
pthread_mutex_unlock(&share->mutex);
783
critical_section_exit();
788
// Serializes the current record into `output`: the text of each non-NULL
// field (obtained through val_str) is appended, with the format's column
// separator head between columns and its row separator head at the end.
void FilesystemCursor::recordToString(string& output)
791
drizzled::String attribute;
792
for (Field **field= getTable()->getFields(); *field; ++field)
800
output.append(share->format.getColSeparatorHead());
803
if (not (*field)->is_null())
805
// The field is flagged in the read set before its value is read.
(*field)->setReadSet();
806
(*field)->val_str(&attribute, &attribute);
808
output.append(attribute.ptr(), attribute.length());
815
output.append(share->format.getRowSeparatorHead());
818
// Inserts the current record by appending its textual form to the data
// file.  The share lock is taken for the statement, the row is rendered with
// recordToString(), the file is opened write-only in append mode, written and
// closed, and the lock is released again (also on the visible failure path).
int FilesystemCursor::doInsertRecord(unsigned char * buf)
822
if (share->format.isTagFormat())
825
sql_command_type = getTable()->getSession()->getSqlCommand();
827
critical_section_enter();
833
recordToString(output_line);
835
int fd= ::open(share->format.getFileName().c_str(), O_WRONLY | O_APPEND);
838
critical_section_exit();
842
err_write= write_in_all(fd, output_line.c_str(), output_line.length());
848
err_close= ::close(fd);
852
critical_section_exit();
861
// Updates the current row: after the tag-format check, the update file is
// opened if necessary and the row's new text (`str`, built on lines not
// visible here) is written to it.
int FilesystemCursor::doUpdateRecord(const unsigned char *, unsigned char *)
863
if (share->format.isTagFormat())
865
if (openUpdateFile())
868
// get the update information
872
if (write_in_all(update_file_desc, str.c_str(), str.length()) < 0)
880
// Records the byte range [current_position, next_position) of the current
// row in `slots`.  A range that starts exactly where the last slot ends
// extends that slot instead.
// NOTE(review): push_back is presumably the `else` branch; the `else` itself
// is not visible here -- confirm.
void FilesystemCursor::addSlot()
882
if (slots.size() > 0 && slots.back().second == current_position)
883
slots.back().second= next_position;
885
slots.push_back(make_pair(current_position, next_position));
888
// Deletes the current row.  Only the tag-format check is visible here; the
// handling of either case is on lines not shown.
int FilesystemCursor::doDeleteRecord(const unsigned char *)
890
if (share->format.isTagFormat())
896
// Renames a table by renaming its definition file (FILESYSTEM_EXT) from the
// `from` path to the `to` path; a non-zero rename_file_ext() result takes the
// error branch.
int FilesystemEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
898
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), FILESYSTEM_EXT))
903
// Validates a CREATE TABLE option key/value pair by forwarding it to
// FormatInfo::validateOption() and returning its verdict.
bool FilesystemEngine::validateCreateTableOption(const std::string &key,
904
const std::string &state)
906
return FormatInfo::validateOption(key, state);
909
// Creates a table: parses the format options from `proto`, probes the data
// file (when one is given) by opening it read-only, and serializes `proto`
// into <identifier path> + FILESYSTEM_EXT.  If serialization fails the
// partially written definition file is unlinked.
int FilesystemEngine::doCreateTable(Session &,
911
const drizzled::identifier::Table &identifier,
912
drizzled::message::Table &proto)
915
format.parseFromTable(&proto);
916
if (format.isFileGiven())
918
// `err` receives the descriptor (or -1) returned by ::open().
// NOTE(review): no matching ::close() is visible here -- confirm the
// descriptor is released.
int err= ::open(format.getFileName().c_str(), O_RDONLY);
923
string new_path(identifier.getPath());
924
new_path+= FILESYSTEM_EXT;
925
fstream output(new_path.c_str(), ios::out | ios::binary);
930
if (! proto.SerializeToOstream(&output))
933
unlink(new_path.c_str());
940
// The single engine instance created at plugin initialization.
static FilesystemEngine *filesystem_engine= NULL;
942
// Plugin init hook: allocates the engine under the name "FILESYSTEM" and
// registers it with the module context.
static int filesystem_init_func(drizzled::module::Context &context)
944
filesystem_engine= new FilesystemEngine("FILESYSTEM");
945
context.add(filesystem_engine);
950
// Plugin descriptor; filesystem_init_func is registered as the init hook and
// no configuration options are declared.
DRIZZLE_DECLARE_PLUGIN
958
filesystem_init_func, /* Plugin Init */
960
NULL /* config options */
962
DRIZZLE_DECLARE_PLUGIN_END;