~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/filesystem_engine/filesystem_engine.cc

  • Committer: Brian Aker
  • Date: 2011-02-07 23:29:10 UTC
  • mto: (2154.2.1 drizzle-build)
  • mto: This revision was merged to the branch mainline in revision 2161.
  • Revision ID: brian@tangent.org-20110207232910-lpkg95qal61supfh
Move ha_data out.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
  Copyright (C) 2010 Zimin
 
3
 
 
4
  This program is free software; you can redistribute it and/or
 
5
  modify it under the terms of the GNU General Public License
 
6
  as published by the Free Software Foundation; either version 2
 
7
  of the License, or (at your option) any later version.
 
8
 
 
9
  This program is distributed in the hope that it will be useful,
 
10
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
  GNU General Public License for more details.
 
13
 
 
14
  You should have received a copy of the GNU General Public License
 
15
  along with this program; if not, write to the Free Software
 
16
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
17
*/
 
18
 
 
19
#include "config.h"
 
20
#include <drizzled/field.h>
 
21
#include <drizzled/field/blob.h>
 
22
#include <drizzled/error.h>
 
23
#include <drizzled/table.h>
 
24
#include <drizzled/session.h>
 
25
#include "drizzled/internal/my_sys.h"
 
26
#include <google/protobuf/io/zero_copy_stream.h>
 
27
#include <google/protobuf/io/zero_copy_stream_impl.h>
 
28
 
 
29
#include "filesystem_engine.h"
 
30
#include "utility.h"
 
31
 
 
32
#include <fcntl.h>
 
33
 
 
34
#include <string>
 
35
#include <map>
 
36
#include <fstream>
 
37
#include <sstream>
 
38
#include <iostream>
 
39
#include <boost/algorithm/string.hpp>
 
40
 
 
41
using namespace std;
 
42
using namespace drizzled;
 
43
 
 
44
#define FILESYSTEM_EXT ".FST"
 
45
 
 
46
/* Stuff for shares */
 
47
pthread_mutex_t filesystem_mutex;
 
48
 
 
49
static const char *ha_filesystem_exts[] = {
 
50
  FILESYSTEM_EXT,
 
51
  NULL
 
52
};
 
53
 
 
54
class FilesystemEngine : public drizzled::plugin::StorageEngine
 
55
{
 
56
private:
 
57
  typedef std::map<string, FilesystemTableShare*> FilesystemMap;
 
58
  FilesystemMap fs_open_tables;
 
59
public:
 
60
  FilesystemEngine(const string& name_arg)
 
61
   : drizzled::plugin::StorageEngine(name_arg,
 
62
                                     HTON_NULL_IN_KEY |
 
63
                                     HTON_SKIP_STORE_LOCK |
 
64
                                     HTON_CAN_INDEX_BLOBS |
 
65
                                     HTON_AUTO_PART_KEY),
 
66
     fs_open_tables()
 
67
  {
 
68
    table_definition_ext= FILESYSTEM_EXT;
 
69
    pthread_mutex_init(&filesystem_mutex, MY_MUTEX_INIT_FAST);
 
70
  }
 
71
  virtual ~FilesystemEngine()
 
72
  {
 
73
    pthread_mutex_destroy(&filesystem_mutex);
 
74
  }
 
75
 
 
76
  virtual Cursor *create(Table &table)
 
77
  {
 
78
    return new FilesystemCursor(*this, table);
 
79
  }
 
80
 
 
81
  const char **bas_ext() const {
 
82
    return ha_filesystem_exts;
 
83
  }
 
84
 
 
85
  bool validateCreateTableOption(const std::string &key, const std::string &state);
 
86
 
 
87
  int doCreateTable(Session &,
 
88
                    Table &table_arg,
 
89
                    const drizzled::identifier::Table &identifier,
 
90
                    drizzled::message::Table&);
 
91
 
 
92
  int doGetTableDefinition(Session& ,
 
93
                           const drizzled::identifier::Table &,
 
94
                           drizzled::message::Table &);
 
95
 
 
96
  int doDropTable(Session&, const identifier::Table &);
 
97
 
 
98
  /* operations on FilesystemTableShare */
 
99
  FilesystemTableShare *findOpenTable(const string table_name);
 
100
  void addOpenTable(const string &table_name, FilesystemTableShare *);
 
101
  void deleteOpenTable(const string &table_name);
 
102
 
 
103
  uint32_t max_keys()          const { return 0; }
 
104
  uint32_t max_key_parts()     const { return 0; }
 
105
  uint32_t max_key_length()    const { return 0; }
 
106
  bool doDoesTableExist(Session& , const identifier::Table &);
 
107
  int doRenameTable(Session&, const identifier::Table &, const identifier::Table &);
 
108
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
109
                             const drizzled::identifier::Schema &schema_identifier,
 
110
                             drizzled::identifier::Table::vector &set_of_identifiers);
 
111
private:
 
112
  void getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
113
                                   const drizzled::identifier::Schema &schema_identifier,
 
114
                                   drizzled::plugin::TableNameList *set_of_names,
 
115
                                   drizzled::identifier::Table::vector *set_of_identifiers);
 
116
};
 
117
 
 
118
void FilesystemEngine::getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
119
                                                   const drizzled::identifier::Schema &schema_identifier,
 
120
                                                   drizzled::plugin::TableNameList *set_of_names,
 
121
                                                   drizzled::identifier::Table::vector *set_of_identifiers)
 
122
{
 
123
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
124
 
 
125
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
 
126
      entry_iter != entries.end();
 
127
      ++entry_iter)
 
128
  {
 
129
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
130
    const string *filename= &entry->filename;
 
131
 
 
132
    assert(not filename->empty());
 
133
 
 
134
    string::size_type suffix_pos= filename->rfind('.');
 
135
 
 
136
    if (suffix_pos != string::npos &&
 
137
        boost::iequals(filename->substr(suffix_pos), FILESYSTEM_EXT) &&
 
138
        filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX))
 
139
    {
 
140
      char uname[NAME_LEN + 1];
 
141
      uint32_t file_name_len;
 
142
 
 
143
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
144
      uname[file_name_len - sizeof(FILESYSTEM_EXT) + 1]= '\0';
 
145
      if (set_of_names)
 
146
        set_of_names->insert(uname);
 
147
      if (set_of_identifiers)
 
148
        set_of_identifiers->push_back(identifier::Table(schema_identifier, uname));
 
149
    }
 
150
  }
 
151
}
 
152
 
 
153
void FilesystemEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
154
                                             const drizzled::identifier::Schema &schema_identifier,
 
155
                                             drizzled::identifier::Table::vector &set_of_identifiers)
 
156
{
 
157
  getTableNamesFromFilesystem(directory, schema_identifier, NULL, &set_of_identifiers);
 
158
}
 
159
 
 
160
int FilesystemEngine::doDropTable(Session &, const identifier::Table &identifier)
 
161
{
 
162
  string new_path(identifier.getPath());
 
163
  new_path+= FILESYSTEM_EXT;
 
164
  int err= unlink(new_path.c_str());
 
165
  if (err)
 
166
  {
 
167
    err= errno;
 
168
  }
 
169
  return err;
 
170
}
 
171
 
 
172
bool FilesystemEngine::doDoesTableExist(Session &, const identifier::Table &identifier)
 
173
{
 
174
  string proto_path(identifier.getPath());
 
175
  proto_path.append(FILESYSTEM_EXT);
 
176
 
 
177
  if (access(proto_path.c_str(), F_OK))
 
178
  {
 
179
    return false;
 
180
  }
 
181
 
 
182
  return true;
 
183
}
 
184
 
 
185
FilesystemTableShare *FilesystemEngine::findOpenTable(const string table_name)
 
186
{
 
187
  FilesystemMap::iterator find_iter=
 
188
    fs_open_tables.find(table_name);
 
189
 
 
190
  if (find_iter != fs_open_tables.end())
 
191
    return (*find_iter).second;
 
192
  else
 
193
    return NULL;
 
194
}
 
195
 
 
196
void FilesystemEngine::addOpenTable(const string &table_name, FilesystemTableShare *share)
 
197
{
 
198
  fs_open_tables[table_name]= share;
 
199
}
 
200
 
 
201
void FilesystemEngine::deleteOpenTable(const string &table_name)
 
202
{
 
203
  fs_open_tables.erase(table_name);
 
204
}
 
205
 
 
206
static int parseTaggedFile(const FormatInfo &fi, vector< map<string, string> > &v)
 
207
{
 
208
  int filedesc= ::open(fi.getFileName().c_str(), O_RDONLY);
 
209
  if (filedesc < 0)
 
210
    return errno;
 
211
 
 
212
  boost::scoped_ptr<TransparentFile> filebuffer(new TransparentFile);
 
213
  filebuffer->init_buff(filedesc);
 
214
 
 
215
  bool last_line_empty= false;
 
216
  map<string, string> kv;
 
217
  int pos= 0;
 
218
  string line;
 
219
  while (1)
 
220
  {
 
221
    char ch= filebuffer->get_value(pos);
 
222
    if (ch == '\0')
 
223
    {
 
224
      if (!last_line_empty)
 
225
      {
 
226
        v.push_back(kv);
 
227
        kv.clear();
 
228
      }
 
229
      break;
 
230
    }
 
231
    ++pos;
 
232
 
 
233
    if (!fi.isRowSeparator(ch))
 
234
    {
 
235
      line.push_back(ch);
 
236
      continue;
 
237
    }
 
238
 
 
239
    // if we have a new empty line,
 
240
    // it means we got the end of a section, push it to vector
 
241
    if (line.empty())
 
242
    {
 
243
      if (!last_line_empty)
 
244
      {
 
245
        v.push_back(kv);
 
246
        kv.clear();
 
247
      }
 
248
      last_line_empty= true;
 
249
      continue;
 
250
    }
 
251
 
 
252
    // parse the line
 
253
    vector<string> sv, svcopy;
 
254
    boost::split(sv, line, boost::is_any_of(fi.getColSeparator()));
 
255
    for (vector<string>::iterator iter= sv.begin();
 
256
         iter != sv.end();
 
257
         ++iter)
 
258
    {
 
259
      if (!iter->empty())
 
260
        svcopy.push_back(*iter);
 
261
    }
 
262
 
 
263
    // the first splitted string as key,
 
264
    // and the second splitted string as value.
 
265
    string key(svcopy[0]);
 
266
    boost::trim(key);
 
267
    if (svcopy.size() >= 2)
 
268
    {
 
269
      string value(svcopy[1]);
 
270
      boost::trim(value);
 
271
      kv[key]= value;
 
272
    }
 
273
    else if (svcopy.size() >= 1)
 
274
      kv[key]= "";
 
275
 
 
276
    last_line_empty= false;
 
277
    line.clear();
 
278
  }
 
279
  close(filedesc);
 
280
  return 0;
 
281
}
 
282
 
 
283
int FilesystemEngine::doGetTableDefinition(Session &,
 
284
                                           const drizzled::identifier::Table &identifier,
 
285
                                           drizzled::message::Table &table_proto)
 
286
{
 
287
  string new_path(identifier.getPath());
 
288
  new_path.append(FILESYSTEM_EXT);
 
289
 
 
290
  int fd= ::open(new_path.c_str(), O_RDONLY);
 
291
  if (fd < 0)
 
292
    return ENOENT;
 
293
 
 
294
  google::protobuf::io::ZeroCopyInputStream* input=
 
295
    new google::protobuf::io::FileInputStream(fd);
 
296
 
 
297
  if (not input)
 
298
    return HA_ERR_CRASHED_ON_USAGE;
 
299
 
 
300
  if (not table_proto.ParseFromZeroCopyStream(input))
 
301
  {
 
302
    close(fd);
 
303
    delete input;
 
304
    if (not table_proto.IsInitialized())
 
305
    {
 
306
      my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
 
307
               table_proto.name().empty() ? " " : table_proto.name().c_str(),
 
308
               table_proto.InitializationErrorString().c_str());
 
309
 
 
310
      return ER_CORRUPT_TABLE_DEFINITION;
 
311
    }
 
312
 
 
313
    return HA_ERR_CRASHED_ON_USAGE;
 
314
  }
 
315
  delete input;
 
316
 
 
317
  // if the file is a tagged file such as /proc/meminfo
 
318
  // then columns of this table are added dynamically here.
 
319
  FormatInfo format;
 
320
  format.parseFromTable(&table_proto);
 
321
  if (not format.isTagFormat() || not format.isFileGiven())
 
322
  {
 
323
    close(fd);
 
324
    return EEXIST;
 
325
  }
 
326
 
 
327
  std::vector< std::map<std::string, std::string> > vm;
 
328
  if (parseTaggedFile(format, vm) != 0)
 
329
  {
 
330
    close(fd);
 
331
 
 
332
    return EEXIST;
 
333
  }
 
334
  if (vm.size() == 0) {
 
335
    close(fd);
 
336
    return EEXIST;
 
337
  }
 
338
 
 
339
  // we don't care what user provides, just clear them all
 
340
  table_proto.clear_field();
 
341
  // we take the first section as sample
 
342
  std::map<string, string> kv= vm[0];
 
343
  for (std::map<string, string>::iterator iter= kv.begin();
 
344
       iter != kv.end();
 
345
       ++iter)
 
346
  {
 
347
    // add columns to table proto
 
348
    message::Table::Field *field= table_proto.add_field();
 
349
    field->set_name(iter->first);
 
350
    field->set_type(drizzled::message::Table::Field::VARCHAR);
 
351
    message::Table::Field::StringFieldOptions *stringoption= field->mutable_string_options();
 
352
    stringoption->set_length(iter->second.length() + 1);
 
353
  }
 
354
 
 
355
  close(fd);
 
356
  return EEXIST;
 
357
}
 
358
 
 
359
FilesystemTableShare::FilesystemTableShare(const string table_name_arg)
 
360
  : use_count(0), table_name(table_name_arg),
 
361
  update_file_opened(false),
 
362
  needs_reopen(false)
 
363
{
 
364
}
 
365
 
 
366
FilesystemTableShare::~FilesystemTableShare()
 
367
{
 
368
  pthread_mutex_destroy(&mutex);
 
369
}
 
370
 
 
371
FilesystemTableShare *FilesystemCursor::get_share(const char *table_name)
 
372
{
 
373
  Guard g(filesystem_mutex);
 
374
 
 
375
  FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
376
  share= a_engine->findOpenTable(table_name);
 
377
 
 
378
  /*
 
379
    If share is not present in the hash, create a new share and
 
380
    initialize its members.
 
381
  */
 
382
  if (share == NULL)
 
383
  {
 
384
    share= new (nothrow) FilesystemTableShare(table_name);
 
385
    if (share == NULL)
 
386
    {
 
387
      return NULL;
 
388
    }
 
389
 
 
390
    share->format.parseFromTable(getTable()->getShare()->getTableProto());
 
391
    if (!share->format.isFileGiven())
 
392
    {
 
393
      return NULL;
 
394
    }
 
395
    /*
 
396
     * for taggered file such as /proc/meminfo,
 
397
     * we pre-process it first, and store the parsing result in a map.
 
398
     */
 
399
    if (share->format.isTagFormat())
 
400
    {
 
401
      if (parseTaggedFile(share->format, share->vm) != 0)
 
402
      {
 
403
        return NULL;
 
404
      }
 
405
    }
 
406
    a_engine->addOpenTable(share->table_name, share);
 
407
 
 
408
    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
 
409
  }
 
410
  share->use_count++;
 
411
 
 
412
  return share;
 
413
}
 
414
 
 
415
void FilesystemCursor::free_share()
 
416
{
 
417
  Guard g(filesystem_mutex);
 
418
 
 
419
  if (!--share->use_count){
 
420
    FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
421
    a_engine->deleteOpenTable(share->table_name);
 
422
    pthread_mutex_destroy(&share->mutex);
 
423
    delete share;
 
424
  }
 
425
}
 
426
 
 
427
void FilesystemCursor::critical_section_enter()
 
428
{
 
429
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
430
      sql_command_type == SQLCOM_UPDATE ||
 
431
      sql_command_type == SQLCOM_DELETE ||
 
432
      sql_command_type == SQLCOM_INSERT ||
 
433
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
434
      sql_command_type == SQLCOM_REPLACE ||
 
435
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
436
    share->filesystem_lock.scan_update_begin();
 
437
  else
 
438
    share->filesystem_lock.scan_begin();
 
439
 
 
440
  thread_locked = true;
 
441
}
 
442
 
 
443
void FilesystemCursor::critical_section_exit()
 
444
{
 
445
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
446
      sql_command_type == SQLCOM_UPDATE ||
 
447
      sql_command_type == SQLCOM_DELETE ||
 
448
      sql_command_type == SQLCOM_INSERT ||
 
449
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
450
      sql_command_type == SQLCOM_REPLACE ||
 
451
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
452
    share->filesystem_lock.scan_update_end();
 
453
  else
 
454
    share->filesystem_lock.scan_end();
 
455
 
 
456
  thread_locked = false;
 
457
}
 
458
 
 
459
FilesystemCursor::FilesystemCursor(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
460
  : Cursor(engine_arg, table_arg),
 
461
    file_buff(new TransparentFile),
 
462
    thread_locked(false)
 
463
{
 
464
}
 
465
 
 
466
int FilesystemCursor::doOpen(const drizzled::identifier::Table &identifier, int, uint32_t)
 
467
{
 
468
  if (!(share= get_share(identifier.getPath().c_str())))
 
469
    return ENOENT;
 
470
 
 
471
  file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
472
  if (file_desc < 0)
 
473
  {
 
474
    free_share();
 
475
    return ER_CANT_OPEN_FILE;
 
476
  }
 
477
 
 
478
  ref_length= sizeof(off_t);
 
479
  return 0;
 
480
}
 
481
 
 
482
int FilesystemCursor::close(void)
 
483
{
 
484
  int err= ::close(file_desc);
 
485
  if (err < 0)
 
486
    err= errno;
 
487
  free_share();
 
488
  return err;
 
489
}
 
490
 
 
491
int FilesystemCursor::doStartTableScan(bool)
 
492
{
 
493
  sql_command_type = session_sql_command(getTable()->getSession());
 
494
 
 
495
  if (thread_locked)
 
496
    critical_section_exit();
 
497
  critical_section_enter();
 
498
 
 
499
  if (share->format.isTagFormat())
 
500
  {
 
501
    tag_depth= 0;
 
502
    return 0;
 
503
  }
 
504
 
 
505
  current_position= 0;
 
506
  next_position= 0;
 
507
  slots.clear();
 
508
  if (share->needs_reopen)
 
509
  {
 
510
    file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
511
    if (file_desc < 0)
 
512
      return HA_ERR_CRASHED_ON_USAGE;
 
513
    share->needs_reopen= false;
 
514
  }
 
515
  file_buff->init_buff(file_desc);
 
516
  return 0;
 
517
}
 
518
 
 
519
int FilesystemCursor::find_current_row(unsigned char *buf)
 
520
{
 
521
  ptrdiff_t row_offset= buf - getTable()->record[0];
 
522
 
 
523
  next_position= current_position;
 
524
 
 
525
  string content;
 
526
  bool line_done= false;
 
527
  bool line_blank= true;
 
528
  Field **field= getTable()->getFields();
 
529
  for (; !line_done && *field; ++next_position)
 
530
  {
 
531
    char ch= file_buff->get_value(next_position);
 
532
    if (ch == '\0')
 
533
      return HA_ERR_END_OF_FILE;
 
534
 
 
535
    if (share->format.isEscapedChar(ch))
 
536
    {
 
537
      // read next character
 
538
      ch= file_buff->get_value(++next_position);
 
539
      if (ch == '\0')
 
540
        return HA_ERR_END_OF_FILE;
 
541
 
 
542
      content.push_back(FormatInfo::getEscapedChar(ch));
 
543
 
 
544
      continue;
 
545
    }
 
546
 
 
547
    // if we find separator
 
548
    bool is_row= share->format.isRowSeparator(ch);
 
549
    bool is_col= share->format.isColSeparator(ch);
 
550
    if (content.empty())
 
551
    {
 
552
      if (share->format.isSeparatorModeGeneral() && is_row && line_blank)
 
553
        continue;
 
554
      if (share->format.isSeparatorModeWeak() && is_col)
 
555
        continue;
 
556
    }
 
557
 
 
558
    if (is_row || is_col)
 
559
    {
 
560
      (*field)->move_field_offset(row_offset);
 
561
      if (!content.empty())
 
562
      {
 
563
        (*field)->set_notnull();
 
564
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
565
        {
 
566
          (*field)->setWriteSet();
 
567
          (*field)->store_and_check(CHECK_FIELD_WARN,
 
568
                                    content.c_str(),
 
569
                                    (uint32_t)content.length(),
 
570
                                    &my_charset_bin);
 
571
        }
 
572
        else
 
573
        {
 
574
          (*field)->set_default();
 
575
        }
 
576
      }
 
577
      else
 
578
        (*field)->set_null();
 
579
      (*field)->move_field_offset(-row_offset);
 
580
 
 
581
      content.clear();
 
582
      ++field;
 
583
 
 
584
      line_blank= false;
 
585
      if (is_row)
 
586
        line_done= true;
 
587
 
 
588
      continue;
 
589
    }
 
590
    content.push_back(ch);
 
591
  }
 
592
  if (line_done)
 
593
  {
 
594
    for (; *field; ++field)
 
595
    {
 
596
      (*field)->move_field_offset(row_offset);
 
597
      (*field)->set_notnull();
 
598
      (*field)->set_default();
 
599
      (*field)->move_field_offset(-row_offset);
 
600
    }
 
601
  }
 
602
  else
 
603
  {
 
604
    // eat up characters when line_done
 
605
    while (!line_done)
 
606
    {
 
607
      char ch= file_buff->get_value(next_position);
 
608
      if (share->format.isRowSeparator(ch))
 
609
        line_done= true;
 
610
      ++next_position;
 
611
    }
 
612
  }
 
613
  return 0;
 
614
}
 
615
 
 
616
int FilesystemCursor::rnd_next(unsigned char *buf)
 
617
{
 
618
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
619
  if (share->format.isTagFormat())
 
620
  {
 
621
    if (tag_depth >= share->vm.size())
 
622
      return HA_ERR_END_OF_FILE;
 
623
 
 
624
    ptrdiff_t row_offset= buf - getTable()->record[0];
 
625
    for (Field **field= getTable()->getFields(); *field; field++)
 
626
    {
 
627
      string key((*field)->field_name);
 
628
      string content= share->vm[tag_depth][key];
 
629
 
 
630
      (*field)->move_field_offset(row_offset);
 
631
      if (!content.empty())
 
632
      {
 
633
        (*field)->set_notnull();
 
634
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
635
        {
 
636
          (*field)->setWriteSet();
 
637
          (*field)->store_and_check(CHECK_FIELD_WARN,
 
638
                                    content.c_str(),
 
639
                                    (uint32_t)content.length(),
 
640
                                    &my_charset_bin);
 
641
        }
 
642
        else
 
643
        {
 
644
          (*field)->set_default();
 
645
        }
 
646
      }
 
647
      else
 
648
      {
 
649
        (*field)->set_null();
 
650
      }
 
651
      (*field)->move_field_offset(-row_offset);
 
652
    }
 
653
    ++tag_depth;
 
654
    return 0;
 
655
  }
 
656
  // normal file
 
657
  current_position= next_position;
 
658
  return find_current_row(buf);
 
659
}
 
660
 
 
661
void FilesystemCursor::position(const unsigned char *)
 
662
{
 
663
  *reinterpret_cast<off_t *>(ref)= current_position;
 
664
}
 
665
 
 
666
int FilesystemCursor::rnd_pos(unsigned char * buf, unsigned char *pos)
 
667
{
 
668
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
 
669
  current_position= *reinterpret_cast<off_t *>(pos);
 
670
  return find_current_row(buf);
 
671
}
 
672
 
 
673
int FilesystemCursor::info(uint32_t)
 
674
{
 
675
  if (stats.records < 2)
 
676
    stats.records= 2;
 
677
  return 0;
 
678
}
 
679
 
 
680
int FilesystemCursor::openUpdateFile()
 
681
{
 
682
  if (!share->update_file_opened)
 
683
  {
 
684
    struct stat st;
 
685
    if (stat(share->format.getFileName().c_str(), &st) < 0)
 
686
      return -1;
 
687
    update_file_name= share->format.getFileName();
 
688
    update_file_name.append(".UPDATE");
 
689
    unlink(update_file_name.c_str());
 
690
    update_file_desc= ::open(update_file_name.c_str(),
 
691
                             O_RDWR | O_CREAT | O_TRUNC,
 
692
                             st.st_mode);
 
693
    if (update_file_desc < 0)
 
694
    {
 
695
      return -1;
 
696
    }
 
697
    share->update_file_opened= true;
 
698
  }
 
699
  return 0;
 
700
}
 
701
 
 
702
int FilesystemCursor::doEndTableScan()
 
703
{
 
704
  sql_command_type = session_sql_command(getTable()->getSession());
 
705
 
 
706
  if (share->format.isTagFormat())
 
707
  {
 
708
    if (thread_locked)
 
709
      critical_section_exit();
 
710
    return 0;
 
711
  }
 
712
 
 
713
  if (slots.size() == 0)
 
714
  {
 
715
    if (thread_locked)
 
716
      critical_section_exit();
 
717
    return 0;
 
718
  }
 
719
 
 
720
  int err= -1;
 
721
  sort(slots.begin(), slots.end());
 
722
  vector< pair<off_t, off_t> >::iterator slot_iter= slots.begin();
 
723
  off_t write_start= 0;
 
724
  off_t write_end= 0;
 
725
  off_t file_buffer_start= 0;
 
726
 
 
727
  pthread_mutex_lock(&share->mutex);
 
728
 
 
729
  file_buff->init_buff(file_desc);
 
730
  if (openUpdateFile() < 0)
 
731
    goto error;
 
732
 
 
733
  while (file_buffer_start != -1)
 
734
  {
 
735
    bool in_hole= false;
 
736
 
 
737
    write_end= file_buff->end();
 
738
    if (slot_iter != slots.end() &&
 
739
      write_end >= slot_iter->first)
 
740
    {
 
741
      write_end= slot_iter->first;
 
742
      in_hole= true;
 
743
    }
 
744
 
 
745
    off_t write_length= write_end - write_start;
 
746
    if (write_in_all(update_file_desc,
 
747
               file_buff->ptr() + (write_start - file_buff->start()),
 
748
               write_length) != write_length)
 
749
      goto error;
 
750
 
 
751
    if (in_hole)
 
752
    {
 
753
      while (file_buff->end() <= slot_iter->second && file_buffer_start != -1)
 
754
        file_buffer_start= file_buff->read_next();
 
755
      write_start= slot_iter->second;
 
756
      ++slot_iter;
 
757
    }
 
758
    else
 
759
      write_start= write_end;
 
760
 
 
761
    if (write_end == file_buff->end())
 
762
      file_buffer_start= file_buff->read_next();
 
763
  }
 
764
  // close update file
 
765
  if (::fsync(update_file_desc) || 
 
766
      ::close(update_file_desc))
 
767
    goto error;
 
768
  share->update_file_opened= false;
 
769
 
 
770
  // close current file
 
771
  if (::close(file_desc))
 
772
    goto error;
 
773
  if (::rename(update_file_name.c_str(), share->format.getFileName().c_str()))
 
774
    goto error;
 
775
 
 
776
  share->needs_reopen= true;
 
777
 
 
778
error:
 
779
  err= errno;
 
780
  pthread_mutex_unlock(&share->mutex);
 
781
 
 
782
  if (thread_locked)
 
783
    critical_section_exit();
 
784
 
 
785
  return err;
 
786
}
 
787
 
 
788
void FilesystemCursor::recordToString(string& output)
 
789
{
 
790
  bool first= true;
 
791
  drizzled::String attribute;
 
792
  for (Field **field= getTable()->getFields(); *field; ++field)
 
793
  {
 
794
    if (first == true)
 
795
    {
 
796
      first= false;
 
797
    }
 
798
    else
 
799
    {
 
800
      output.append(share->format.getColSeparatorHead());
 
801
    }
 
802
 
 
803
    if (not (*field)->is_null())
 
804
    {
 
805
      (*field)->setReadSet();
 
806
      (*field)->val_str(&attribute, &attribute);
 
807
 
 
808
      output.append(attribute.ptr(), attribute.length());
 
809
    }
 
810
    else
 
811
    {
 
812
      output.append("0");
 
813
    }
 
814
  }
 
815
  output.append(share->format.getRowSeparatorHead());
 
816
}
 
817
 
 
818
int FilesystemCursor::doInsertRecord(unsigned char * buf)
 
819
{
 
820
  (void)buf;
 
821
 
 
822
  if (share->format.isTagFormat())
 
823
    return 0;
 
824
 
 
825
  sql_command_type = session_sql_command(getTable()->getSession());
 
826
 
 
827
  critical_section_enter();
 
828
 
 
829
  int err_write= 0;
 
830
  int err_close= 0;
 
831
 
 
832
  string output_line;
 
833
  recordToString(output_line);
 
834
 
 
835
  int fd= ::open(share->format.getFileName().c_str(), O_WRONLY | O_APPEND);
 
836
  if (fd < 0)
 
837
  {
 
838
    critical_section_exit();
 
839
    return ENOENT;
 
840
  }
 
841
 
 
842
  err_write= write_in_all(fd, output_line.c_str(), output_line.length());
 
843
  if (err_write < 0)
 
844
    err_write= errno;
 
845
  else
 
846
    err_write= 0;
 
847
 
 
848
  err_close= ::close(fd);
 
849
  if (err_close < 0)
 
850
    err_close= errno;
 
851
 
 
852
  critical_section_exit();
 
853
 
 
854
  if (err_write)
 
855
    return err_write;
 
856
  if (err_close)
 
857
    return err_close;
 
858
  return 0;
 
859
}
 
860
 
 
861
int FilesystemCursor::doUpdateRecord(const unsigned char *, unsigned char *)
 
862
{
 
863
  if (share->format.isTagFormat())
 
864
    return 0;
 
865
  if (openUpdateFile())
 
866
    return errno;
 
867
 
 
868
  // get the update information
 
869
  string str;
 
870
  recordToString(str);
 
871
 
 
872
  if (write_in_all(update_file_desc, str.c_str(), str.length()) < 0)
 
873
    return errno;
 
874
 
 
875
  addSlot();
 
876
 
 
877
  return 0;
 
878
}
 
879
 
 
880
void FilesystemCursor::addSlot()
 
881
{
 
882
  if (slots.size() > 0 && slots.back().second == current_position)
 
883
    slots.back().second= next_position;
 
884
  else
 
885
    slots.push_back(make_pair(current_position, next_position));
 
886
}
 
887
 
 
888
int FilesystemCursor::doDeleteRecord(const unsigned char *)
 
889
{
 
890
  if (share->format.isTagFormat())
 
891
    return 0;
 
892
  addSlot();
 
893
  return 0;
 
894
}
 
895
 
 
896
int FilesystemEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
 
897
{
 
898
  if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), FILESYSTEM_EXT))
 
899
    return errno;
 
900
  return 0;
 
901
}
 
902
 
 
903
bool FilesystemEngine::validateCreateTableOption(const std::string &key,
 
904
                                                 const std::string &state)
 
905
{
 
906
  return FormatInfo::validateOption(key, state);
 
907
}
 
908
 
 
909
int FilesystemEngine::doCreateTable(Session &,
 
910
                                    Table&,
 
911
                                    const drizzled::identifier::Table &identifier,
 
912
                                    drizzled::message::Table &proto)
 
913
{
 
914
  FormatInfo format;
 
915
  format.parseFromTable(&proto);
 
916
  if (format.isFileGiven())
 
917
  {
 
918
    int err= ::open(format.getFileName().c_str(), O_RDONLY);
 
919
    if (err < 0)
 
920
      return errno;
 
921
  }
 
922
 
 
923
  string new_path(identifier.getPath());
 
924
  new_path+= FILESYSTEM_EXT;
 
925
  fstream output(new_path.c_str(), ios::out | ios::binary);
 
926
 
 
927
  if (! output)
 
928
    return 1;
 
929
 
 
930
  if (! proto.SerializeToOstream(&output))
 
931
  {
 
932
    output.close();
 
933
    unlink(new_path.c_str());
 
934
    return 1;
 
935
  }
 
936
 
 
937
  return 0;
 
938
}
 
939
 
 
940
static FilesystemEngine *filesystem_engine= NULL;
 
941
 
 
942
static int filesystem_init_func(drizzled::module::Context &context)
 
943
{
 
944
  filesystem_engine= new FilesystemEngine("FILESYSTEM");
 
945
  context.add(filesystem_engine);
 
946
 
 
947
  return 0;
 
948
}
 
949
 
 
950
DRIZZLE_DECLARE_PLUGIN
 
951
{
 
952
  DRIZZLE_VERSION_ID,
 
953
  "FILESYSTEM",
 
954
  "1.0",
 
955
  "Zimin",
 
956
  "Filesystem Engine",
 
957
  PLUGIN_LICENSE_GPL,
 
958
  filesystem_init_func, /* Plugin Init */
 
959
  NULL,                       /* depends */
 
960
  NULL                        /* config options                  */
 
961
}
 
962
DRIZZLE_DECLARE_PLUGIN_END;