~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/filesystem_engine/filesystem_engine.cc

  • Committer: Brian Aker
  • Date: 2010-12-18 18:24:57 UTC
  • mfrom: (1999.6.3 trunk)
  • Revision ID: brian@tangent.org-20101218182457-yi1wd0so2hml1k1w
Merge in Lee's copyright header fix

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
  Copyright (C) 2010 Zimin
 
3
 
 
4
  This program is free software; you can redistribute it and/or
 
5
  modify it under the terms of the GNU General Public License
 
6
  as published by the Free Software Foundation; either version 2
 
7
  of the License, or (at your option) any later version.
 
8
 
 
9
  This program is distributed in the hope that it will be useful,
 
10
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
  GNU General Public License for more details.
 
13
 
 
14
  You should have received a copy of the GNU General Public License
 
15
  along with this program; if not, write to the Free Software
 
16
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
17
*/
 
18
 
 
19
#include "config.h"
 
20
#include <drizzled/field.h>
 
21
#include <drizzled/field/blob.h>
 
22
#include <drizzled/field/timestamp.h>
 
23
#include <drizzled/error.h>
 
24
#include <drizzled/table.h>
 
25
#include <drizzled/session.h>
 
26
#include "drizzled/internal/my_sys.h"
 
27
#include <google/protobuf/io/zero_copy_stream.h>
 
28
#include <google/protobuf/io/zero_copy_stream_impl.h>
 
29
 
 
30
#include "filesystem_engine.h"
 
31
#include "utility.h"
 
32
 
 
33
#include <fcntl.h>
 
34
 
 
35
#include <string>
 
36
#include <map>
 
37
#include <fstream>
 
38
#include <sstream>
 
39
#include <iostream>
 
40
#include <boost/algorithm/string.hpp>
 
41
 
 
42
using namespace std;
 
43
using namespace drizzled;
 
44
 
 
45
#define FILESYSTEM_EXT ".FST"
 
46
 
 
47
/* Stuff for shares */
 
48
pthread_mutex_t filesystem_mutex;
 
49
 
 
50
static const char *ha_filesystem_exts[] = {
 
51
  FILESYSTEM_EXT,
 
52
  NULL
 
53
};
 
54
 
 
55
class FilesystemEngine : public drizzled::plugin::StorageEngine
 
56
{
 
57
private:
 
58
  typedef std::map<string, FilesystemTableShare*> FilesystemMap;
 
59
  FilesystemMap fs_open_tables;
 
60
public:
 
61
  FilesystemEngine(const string& name_arg)
 
62
   : drizzled::plugin::StorageEngine(name_arg,
 
63
                                     HTON_NULL_IN_KEY |
 
64
                                     HTON_SKIP_STORE_LOCK |
 
65
                                     HTON_CAN_INDEX_BLOBS |
 
66
                                     HTON_AUTO_PART_KEY),
 
67
     fs_open_tables()
 
68
  {
 
69
    table_definition_ext= FILESYSTEM_EXT;
 
70
    pthread_mutex_init(&filesystem_mutex, MY_MUTEX_INIT_FAST);
 
71
  }
 
72
  virtual ~FilesystemEngine()
 
73
  {
 
74
    pthread_mutex_destroy(&filesystem_mutex);
 
75
  }
 
76
 
 
77
  virtual Cursor *create(Table &table)
 
78
  {
 
79
    return new FilesystemCursor(*this, table);
 
80
  }
 
81
 
 
82
  const char **bas_ext() const {
 
83
    return ha_filesystem_exts;
 
84
  }
 
85
 
 
86
  bool validateCreateTableOption(const std::string &key, const std::string &state);
 
87
 
 
88
  int doCreateTable(Session &,
 
89
                    Table &table_arg,
 
90
                    const drizzled::TableIdentifier &identifier,
 
91
                    drizzled::message::Table&);
 
92
 
 
93
  int doGetTableDefinition(Session& ,
 
94
                           const drizzled::TableIdentifier &,
 
95
                           drizzled::message::Table &);
 
96
 
 
97
  int doDropTable(Session&, const TableIdentifier &);
 
98
 
 
99
  /* operations on FilesystemTableShare */
 
100
  FilesystemTableShare *findOpenTable(const string table_name);
 
101
  void addOpenTable(const string &table_name, FilesystemTableShare *);
 
102
  void deleteOpenTable(const string &table_name);
 
103
 
 
104
  uint32_t max_keys()          const { return 0; }
 
105
  uint32_t max_key_parts()     const { return 0; }
 
106
  uint32_t max_key_length()    const { return 0; }
 
107
  bool doDoesTableExist(Session& , const TableIdentifier &);
 
108
  int doRenameTable(Session&, const TableIdentifier &, const TableIdentifier &);
 
109
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
110
                             const drizzled::SchemaIdentifier &schema_identifier,
 
111
                             drizzled::TableIdentifier::vector &set_of_identifiers);
 
112
private:
 
113
  void getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
114
                                   const drizzled::SchemaIdentifier &schema_identifier,
 
115
                                   drizzled::plugin::TableNameList *set_of_names,
 
116
                                   drizzled::TableIdentifier::vector *set_of_identifiers);
 
117
};
 
118
 
 
119
void FilesystemEngine::getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
120
                                                   const drizzled::SchemaIdentifier &schema_identifier,
 
121
                                                   drizzled::plugin::TableNameList *set_of_names,
 
122
                                                   drizzled::TableIdentifier::vector *set_of_identifiers)
 
123
{
 
124
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
125
 
 
126
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
 
127
      entry_iter != entries.end();
 
128
      ++entry_iter)
 
129
  {
 
130
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
131
    const string *filename= &entry->filename;
 
132
 
 
133
    assert(not filename->empty());
 
134
 
 
135
    string::size_type suffix_pos= filename->rfind('.');
 
136
 
 
137
    if (suffix_pos != string::npos &&
 
138
        boost::iequals(filename->substr(suffix_pos), FILESYSTEM_EXT) &&
 
139
        filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX))
 
140
    {
 
141
      char uname[NAME_LEN + 1];
 
142
      uint32_t file_name_len;
 
143
 
 
144
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
145
      uname[file_name_len - sizeof(FILESYSTEM_EXT) + 1]= '\0';
 
146
      if (set_of_names)
 
147
        set_of_names->insert(uname);
 
148
      if (set_of_identifiers)
 
149
        set_of_identifiers->push_back(TableIdentifier(schema_identifier, uname));
 
150
    }
 
151
  }
 
152
}
 
153
 
 
154
void FilesystemEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
155
                                             const drizzled::SchemaIdentifier &schema_identifier,
 
156
                                             drizzled::TableIdentifier::vector &set_of_identifiers)
 
157
{
 
158
  getTableNamesFromFilesystem(directory, schema_identifier, NULL, &set_of_identifiers);
 
159
}
 
160
 
 
161
int FilesystemEngine::doDropTable(Session &, const TableIdentifier &identifier)
 
162
{
 
163
  string new_path(identifier.getPath());
 
164
  new_path+= FILESYSTEM_EXT;
 
165
  int err= unlink(new_path.c_str());
 
166
  if (err)
 
167
  {
 
168
    err= errno;
 
169
  }
 
170
  return err;
 
171
}
 
172
 
 
173
bool FilesystemEngine::doDoesTableExist(Session &, const TableIdentifier &identifier)
 
174
{
 
175
  string proto_path(identifier.getPath());
 
176
  proto_path.append(FILESYSTEM_EXT);
 
177
 
 
178
  if (access(proto_path.c_str(), F_OK))
 
179
  {
 
180
    return false;
 
181
  }
 
182
 
 
183
  return true;
 
184
}
 
185
 
 
186
FilesystemTableShare *FilesystemEngine::findOpenTable(const string table_name)
 
187
{
 
188
  FilesystemMap::iterator find_iter=
 
189
    fs_open_tables.find(table_name);
 
190
 
 
191
  if (find_iter != fs_open_tables.end())
 
192
    return (*find_iter).second;
 
193
  else
 
194
    return NULL;
 
195
}
 
196
 
 
197
void FilesystemEngine::addOpenTable(const string &table_name, FilesystemTableShare *share)
 
198
{
 
199
  fs_open_tables[table_name]= share;
 
200
}
 
201
 
 
202
void FilesystemEngine::deleteOpenTable(const string &table_name)
 
203
{
 
204
  fs_open_tables.erase(table_name);
 
205
}
 
206
 
 
207
static int parseTaggedFile(const FormatInfo &fi, vector< map<string, string> > &v)
 
208
{
 
209
  int filedesc= ::open(fi.getFileName().c_str(), O_RDONLY);
 
210
  if (filedesc < 0)
 
211
    return errno;
 
212
 
 
213
  boost::scoped_ptr<TransparentFile> filebuffer(new TransparentFile);
 
214
  filebuffer->init_buff(filedesc);
 
215
 
 
216
  bool last_line_empty= false;
 
217
  map<string, string> kv;
 
218
  int pos= 0;
 
219
  string line;
 
220
  while (1)
 
221
  {
 
222
    char ch= filebuffer->get_value(pos);
 
223
    if (ch == '\0')
 
224
    {
 
225
      if (!last_line_empty)
 
226
      {
 
227
        v.push_back(kv);
 
228
        kv.clear();
 
229
      }
 
230
      break;
 
231
    }
 
232
    ++pos;
 
233
 
 
234
    if (!fi.isRowSeparator(ch))
 
235
    {
 
236
      line.push_back(ch);
 
237
      continue;
 
238
    }
 
239
 
 
240
    // if we have a new empty line,
 
241
    // it means we got the end of a section, push it to vector
 
242
    if (line.empty())
 
243
    {
 
244
      if (!last_line_empty)
 
245
      {
 
246
        v.push_back(kv);
 
247
        kv.clear();
 
248
      }
 
249
      last_line_empty= true;
 
250
      continue;
 
251
    }
 
252
 
 
253
    // parse the line
 
254
    vector<string> sv, svcopy;
 
255
    boost::split(sv, line, boost::is_any_of(fi.getColSeparator()));
 
256
    for (vector<string>::iterator iter= sv.begin();
 
257
         iter != sv.end();
 
258
         ++iter)
 
259
    {
 
260
      if (!iter->empty())
 
261
        svcopy.push_back(*iter);
 
262
    }
 
263
 
 
264
    // the first splitted string as key,
 
265
    // and the second splitted string as value.
 
266
    string key(svcopy[0]);
 
267
    boost::trim(key);
 
268
    if (svcopy.size() >= 2)
 
269
    {
 
270
      string value(svcopy[1]);
 
271
      boost::trim(value);
 
272
      kv[key]= value;
 
273
    }
 
274
    else if (svcopy.size() >= 1)
 
275
      kv[key]= "";
 
276
 
 
277
    last_line_empty= false;
 
278
    line.clear();
 
279
  }
 
280
  close(filedesc);
 
281
  return 0;
 
282
}
 
283
 
 
284
int FilesystemEngine::doGetTableDefinition(Session &,
 
285
                                           const drizzled::TableIdentifier &identifier,
 
286
                                           drizzled::message::Table &table_proto)
 
287
{
 
288
  string new_path(identifier.getPath());
 
289
  new_path.append(FILESYSTEM_EXT);
 
290
 
 
291
  int fd= ::open(new_path.c_str(), O_RDONLY);
 
292
  if (fd < 0)
 
293
    return ENOENT;
 
294
 
 
295
  google::protobuf::io::ZeroCopyInputStream* input=
 
296
    new google::protobuf::io::FileInputStream(fd);
 
297
 
 
298
  if (not input)
 
299
    return HA_ERR_CRASHED_ON_USAGE;
 
300
 
 
301
  if (not table_proto.ParseFromZeroCopyStream(input))
 
302
  {
 
303
    close(fd);
 
304
    delete input;
 
305
    if (not table_proto.IsInitialized())
 
306
    {
 
307
      my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
 
308
               table_proto.InitializationErrorString().c_str());
 
309
      return ER_CORRUPT_TABLE_DEFINITION;
 
310
    }
 
311
 
 
312
    return HA_ERR_CRASHED_ON_USAGE;
 
313
  }
 
314
  delete input;
 
315
 
 
316
  // if the file is a tagged file such as /proc/meminfo
 
317
  // then columns of this table are added dynamically here.
 
318
  FormatInfo format;
 
319
  format.parseFromTable(&table_proto);
 
320
  if (not format.isTagFormat() || not format.isFileGiven())
 
321
  {
 
322
    close(fd);
 
323
    return EEXIST;
 
324
  }
 
325
 
 
326
  std::vector< std::map<std::string, std::string> > vm;
 
327
  if (parseTaggedFile(format, vm) != 0)
 
328
  {
 
329
    close(fd);
 
330
 
 
331
    return EEXIST;
 
332
  }
 
333
  if (vm.size() == 0) {
 
334
    close(fd);
 
335
    return EEXIST;
 
336
  }
 
337
 
 
338
  // we don't care what user provides, just clear them all
 
339
  table_proto.clear_field();
 
340
  // we take the first section as sample
 
341
  std::map<string, string> kv= vm[0];
 
342
  for (std::map<string, string>::iterator iter= kv.begin();
 
343
       iter != kv.end();
 
344
       ++iter)
 
345
  {
 
346
    // add columns to table proto
 
347
    message::Table::Field *field= table_proto.add_field();
 
348
    field->set_name(iter->first);
 
349
    field->set_type(drizzled::message::Table::Field::VARCHAR);
 
350
    message::Table::Field::StringFieldOptions *stringoption= field->mutable_string_options();
 
351
    stringoption->set_length(iter->second.length() + 1);
 
352
  }
 
353
 
 
354
  close(fd);
 
355
  return EEXIST;
 
356
}
 
357
 
 
358
FilesystemTableShare::FilesystemTableShare(const string table_name_arg)
 
359
  : use_count(0), table_name(table_name_arg),
 
360
  update_file_opened(false),
 
361
  needs_reopen(false)
 
362
{
 
363
}
 
364
 
 
365
FilesystemTableShare::~FilesystemTableShare()
 
366
{
 
367
  pthread_mutex_destroy(&mutex);
 
368
}
 
369
 
 
370
FilesystemTableShare *FilesystemCursor::get_share(const char *table_name)
 
371
{
 
372
  Guard g(filesystem_mutex);
 
373
 
 
374
  FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
375
  share= a_engine->findOpenTable(table_name);
 
376
 
 
377
  /*
 
378
    If share is not present in the hash, create a new share and
 
379
    initialize its members.
 
380
  */
 
381
  if (share == NULL)
 
382
  {
 
383
    share= new (nothrow) FilesystemTableShare(table_name);
 
384
    if (share == NULL)
 
385
    {
 
386
      return NULL;
 
387
    }
 
388
 
 
389
    share->format.parseFromTable(getTable()->getShare()->getTableProto());
 
390
    if (!share->format.isFileGiven())
 
391
    {
 
392
      return NULL;
 
393
    }
 
394
    /*
 
395
     * for taggered file such as /proc/meminfo,
 
396
     * we pre-process it first, and store the parsing result in a map.
 
397
     */
 
398
    if (share->format.isTagFormat())
 
399
    {
 
400
      if (parseTaggedFile(share->format, share->vm) != 0)
 
401
      {
 
402
        return NULL;
 
403
      }
 
404
    }
 
405
    a_engine->addOpenTable(share->table_name, share);
 
406
 
 
407
    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
 
408
  }
 
409
  share->use_count++;
 
410
 
 
411
  return share;
 
412
}
 
413
 
 
414
void FilesystemCursor::free_share()
 
415
{
 
416
  Guard g(filesystem_mutex);
 
417
 
 
418
  if (!--share->use_count){
 
419
    FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
420
    a_engine->deleteOpenTable(share->table_name);
 
421
    pthread_mutex_destroy(&share->mutex);
 
422
    delete share;
 
423
  }
 
424
}
 
425
 
 
426
void FilesystemCursor::critical_section_enter()
 
427
{
 
428
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
429
      sql_command_type == SQLCOM_UPDATE ||
 
430
      sql_command_type == SQLCOM_DELETE ||
 
431
      sql_command_type == SQLCOM_INSERT ||
 
432
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
433
      sql_command_type == SQLCOM_REPLACE ||
 
434
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
435
    share->filesystem_lock.scan_update_begin();
 
436
  else
 
437
    share->filesystem_lock.scan_begin();
 
438
 
 
439
  thread_locked = true;
 
440
}
 
441
 
 
442
void FilesystemCursor::critical_section_exit()
 
443
{
 
444
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
445
      sql_command_type == SQLCOM_UPDATE ||
 
446
      sql_command_type == SQLCOM_DELETE ||
 
447
      sql_command_type == SQLCOM_INSERT ||
 
448
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
449
      sql_command_type == SQLCOM_REPLACE ||
 
450
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
451
    share->filesystem_lock.scan_update_end();
 
452
  else
 
453
    share->filesystem_lock.scan_end();
 
454
 
 
455
  thread_locked = false;
 
456
}
 
457
 
 
458
FilesystemCursor::FilesystemCursor(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
459
  : Cursor(engine_arg, table_arg),
 
460
    file_buff(new TransparentFile),
 
461
    thread_locked(false)
 
462
{
 
463
}
 
464
 
 
465
int FilesystemCursor::doOpen(const drizzled::TableIdentifier &identifier, int, uint32_t)
 
466
{
 
467
  if (!(share= get_share(identifier.getPath().c_str())))
 
468
    return ENOENT;
 
469
 
 
470
  file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
471
  if (file_desc < 0)
 
472
  {
 
473
    free_share();
 
474
    return ER_CANT_OPEN_FILE;
 
475
  }
 
476
 
 
477
  ref_length= sizeof(off_t);
 
478
  return 0;
 
479
}
 
480
 
 
481
int FilesystemCursor::close(void)
 
482
{
 
483
  int err= ::close(file_desc);
 
484
  if (err < 0)
 
485
    err= errno;
 
486
  free_share();
 
487
  return err;
 
488
}
 
489
 
 
490
int FilesystemCursor::doStartTableScan(bool)
 
491
{
 
492
  sql_command_type = session_sql_command(getTable()->getSession());
 
493
 
 
494
  if (thread_locked)
 
495
    critical_section_exit();
 
496
  critical_section_enter();
 
497
 
 
498
  if (share->format.isTagFormat())
 
499
  {
 
500
    tag_depth= 0;
 
501
    return 0;
 
502
  }
 
503
 
 
504
  current_position= 0;
 
505
  next_position= 0;
 
506
  slots.clear();
 
507
  if (share->needs_reopen)
 
508
  {
 
509
    file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
510
    if (file_desc < 0)
 
511
      return HA_ERR_CRASHED_ON_USAGE;
 
512
    share->needs_reopen= false;
 
513
  }
 
514
  file_buff->init_buff(file_desc);
 
515
  return 0;
 
516
}
 
517
 
 
518
int FilesystemCursor::find_current_row(unsigned char *buf)
 
519
{
 
520
  ptrdiff_t row_offset= buf - getTable()->record[0];
 
521
 
 
522
  next_position= current_position;
 
523
 
 
524
  string content;
 
525
  bool line_done= false;
 
526
  bool line_blank= true;
 
527
  Field **field= getTable()->getFields();
 
528
  for (; !line_done && *field; ++next_position)
 
529
  {
 
530
    char ch= file_buff->get_value(next_position);
 
531
    if (ch == '\0')
 
532
      return HA_ERR_END_OF_FILE;
 
533
 
 
534
    if (share->format.isEscapedChar(ch))
 
535
    {
 
536
      // read next character
 
537
      ch= file_buff->get_value(++next_position);
 
538
      if (ch == '\0')
 
539
        return HA_ERR_END_OF_FILE;
 
540
 
 
541
      content.push_back(FormatInfo::getEscapedChar(ch));
 
542
 
 
543
      continue;
 
544
    }
 
545
 
 
546
    // if we find separator
 
547
    bool is_row= share->format.isRowSeparator(ch);
 
548
    bool is_col= share->format.isColSeparator(ch);
 
549
    if (content.empty())
 
550
    {
 
551
      if (share->format.isSeparatorModeGeneral() && is_row && line_blank)
 
552
        continue;
 
553
      if (share->format.isSeparatorModeWeak() && is_col)
 
554
        continue;
 
555
    }
 
556
 
 
557
    if (is_row || is_col)
 
558
    {
 
559
      (*field)->move_field_offset(row_offset);
 
560
      if (!content.empty())
 
561
      {
 
562
        (*field)->set_notnull();
 
563
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
564
        {
 
565
          (*field)->setWriteSet();
 
566
          (*field)->store_and_check(CHECK_FIELD_WARN,
 
567
                                    content.c_str(),
 
568
                                    (uint32_t)content.length(),
 
569
                                    &my_charset_bin);
 
570
        }
 
571
        else
 
572
        {
 
573
          (*field)->set_default();
 
574
        }
 
575
      }
 
576
      else
 
577
        (*field)->set_null();
 
578
      (*field)->move_field_offset(-row_offset);
 
579
 
 
580
      content.clear();
 
581
      ++field;
 
582
 
 
583
      line_blank= false;
 
584
      if (is_row)
 
585
        line_done= true;
 
586
 
 
587
      continue;
 
588
    }
 
589
    content.push_back(ch);
 
590
  }
 
591
  if (line_done)
 
592
  {
 
593
    for (; *field; ++field)
 
594
    {
 
595
      (*field)->move_field_offset(row_offset);
 
596
      (*field)->set_notnull();
 
597
      (*field)->set_default();
 
598
      (*field)->move_field_offset(-row_offset);
 
599
    }
 
600
  }
 
601
  else
 
602
  {
 
603
    // eat up characters when line_done
 
604
    while (!line_done)
 
605
    {
 
606
      char ch= file_buff->get_value(next_position);
 
607
      if (share->format.isRowSeparator(ch))
 
608
        line_done= true;
 
609
      ++next_position;
 
610
    }
 
611
  }
 
612
  return 0;
 
613
}
 
614
 
 
615
int FilesystemCursor::rnd_next(unsigned char *buf)
 
616
{
 
617
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
618
  if (share->format.isTagFormat())
 
619
  {
 
620
    if (tag_depth >= share->vm.size())
 
621
      return HA_ERR_END_OF_FILE;
 
622
 
 
623
    ptrdiff_t row_offset= buf - getTable()->record[0];
 
624
    for (Field **field= getTable()->getFields(); *field; field++)
 
625
    {
 
626
      string key((*field)->field_name);
 
627
      string content= share->vm[tag_depth][key];
 
628
 
 
629
      (*field)->move_field_offset(row_offset);
 
630
      if (!content.empty())
 
631
      {
 
632
        (*field)->set_notnull();
 
633
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
634
        {
 
635
          (*field)->setWriteSet();
 
636
          (*field)->store_and_check(CHECK_FIELD_WARN,
 
637
                                    content.c_str(),
 
638
                                    (uint32_t)content.length(),
 
639
                                    &my_charset_bin);
 
640
        }
 
641
        else
 
642
        {
 
643
          (*field)->set_default();
 
644
        }
 
645
      }
 
646
      else
 
647
      {
 
648
        (*field)->set_null();
 
649
      }
 
650
      (*field)->move_field_offset(-row_offset);
 
651
    }
 
652
    ++tag_depth;
 
653
    return 0;
 
654
  }
 
655
  // normal file
 
656
  current_position= next_position;
 
657
  return find_current_row(buf);
 
658
}
 
659
 
 
660
void FilesystemCursor::position(const unsigned char *)
 
661
{
 
662
  *reinterpret_cast<off_t *>(ref)= current_position;
 
663
}
 
664
 
 
665
int FilesystemCursor::rnd_pos(unsigned char * buf, unsigned char *pos)
 
666
{
 
667
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
 
668
  current_position= *reinterpret_cast<off_t *>(pos);
 
669
  return find_current_row(buf);
 
670
}
 
671
 
 
672
int FilesystemCursor::info(uint32_t)
 
673
{
 
674
  if (stats.records < 2)
 
675
    stats.records= 2;
 
676
  return 0;
 
677
}
 
678
 
 
679
int FilesystemCursor::openUpdateFile()
 
680
{
 
681
  if (!share->update_file_opened)
 
682
  {
 
683
    struct stat st;
 
684
    if (stat(share->format.getFileName().c_str(), &st) < 0)
 
685
      return -1;
 
686
    update_file_name= share->format.getFileName();
 
687
    update_file_name.append(".UPDATE");
 
688
    unlink(update_file_name.c_str());
 
689
    update_file_desc= ::open(update_file_name.c_str(),
 
690
                             O_RDWR | O_CREAT | O_TRUNC,
 
691
                             st.st_mode);
 
692
    if (update_file_desc < 0)
 
693
    {
 
694
      return -1;
 
695
    }
 
696
    share->update_file_opened= true;
 
697
  }
 
698
  return 0;
 
699
}
 
700
 
 
701
int FilesystemCursor::doEndTableScan()
 
702
{
 
703
  sql_command_type = session_sql_command(getTable()->getSession());
 
704
 
 
705
  if (share->format.isTagFormat())
 
706
  {
 
707
    if (thread_locked)
 
708
      critical_section_exit();
 
709
    return 0;
 
710
  }
 
711
 
 
712
  if (slots.size() == 0)
 
713
  {
 
714
    if (thread_locked)
 
715
      critical_section_exit();
 
716
    return 0;
 
717
  }
 
718
 
 
719
  int err= -1;
 
720
  sort(slots.begin(), slots.end());
 
721
  vector< pair<off_t, off_t> >::iterator slot_iter= slots.begin();
 
722
  off_t write_start= 0;
 
723
  off_t write_end= 0;
 
724
  off_t file_buffer_start= 0;
 
725
 
 
726
  pthread_mutex_lock(&share->mutex);
 
727
 
 
728
  file_buff->init_buff(file_desc);
 
729
  if (openUpdateFile() < 0)
 
730
    goto error;
 
731
 
 
732
  while (file_buffer_start != -1)
 
733
  {
 
734
    bool in_hole= false;
 
735
 
 
736
    write_end= file_buff->end();
 
737
    if (slot_iter != slots.end() &&
 
738
      write_end >= slot_iter->first)
 
739
    {
 
740
      write_end= slot_iter->first;
 
741
      in_hole= true;
 
742
    }
 
743
 
 
744
    off_t write_length= write_end - write_start;
 
745
    if (write_in_all(update_file_desc,
 
746
               file_buff->ptr() + (write_start - file_buff->start()),
 
747
               write_length) != write_length)
 
748
      goto error;
 
749
 
 
750
    if (in_hole)
 
751
    {
 
752
      while (file_buff->end() <= slot_iter->second && file_buffer_start != -1)
 
753
        file_buffer_start= file_buff->read_next();
 
754
      write_start= slot_iter->second;
 
755
      ++slot_iter;
 
756
    }
 
757
    else
 
758
      write_start= write_end;
 
759
 
 
760
    if (write_end == file_buff->end())
 
761
      file_buffer_start= file_buff->read_next();
 
762
  }
 
763
  // close update file
 
764
  if (::fsync(update_file_desc) || 
 
765
      ::close(update_file_desc))
 
766
    goto error;
 
767
  share->update_file_opened= false;
 
768
 
 
769
  // close current file
 
770
  if (::close(file_desc))
 
771
    goto error;
 
772
  if (::rename(update_file_name.c_str(), share->format.getFileName().c_str()))
 
773
    goto error;
 
774
 
 
775
  share->needs_reopen= true;
 
776
 
 
777
error:
 
778
  err= errno;
 
779
  pthread_mutex_unlock(&share->mutex);
 
780
 
 
781
  if (thread_locked)
 
782
    critical_section_exit();
 
783
 
 
784
  return err;
 
785
}
 
786
 
 
787
void FilesystemCursor::recordToString(string& output)
 
788
{
 
789
  bool first= true;
 
790
  drizzled::String attribute;
 
791
  for (Field **field= getTable()->getFields(); *field; ++field)
 
792
  {
 
793
    if (first == true)
 
794
    {
 
795
      first= false;
 
796
    }
 
797
    else
 
798
    {
 
799
      output.append(share->format.getColSeparatorHead());
 
800
    }
 
801
 
 
802
    if (not (*field)->is_null())
 
803
    {
 
804
      (*field)->setReadSet();
 
805
      (*field)->val_str(&attribute, &attribute);
 
806
 
 
807
      output.append(attribute.ptr(), attribute.length());
 
808
    }
 
809
    else
 
810
    {
 
811
      output.append("0");
 
812
    }
 
813
  }
 
814
  output.append(share->format.getRowSeparatorHead());
 
815
}
 
816
 
 
817
int FilesystemCursor::doInsertRecord(unsigned char * buf)
 
818
{
 
819
  (void)buf;
 
820
 
 
821
  if (share->format.isTagFormat())
 
822
    return 0;
 
823
 
 
824
  sql_command_type = session_sql_command(getTable()->getSession());
 
825
 
 
826
  critical_section_enter();
 
827
 
 
828
  int err_write= 0;
 
829
  int err_close= 0;
 
830
 
 
831
  string output_line;
 
832
  recordToString(output_line);
 
833
 
 
834
  int fd= ::open(share->format.getFileName().c_str(), O_WRONLY | O_APPEND);
 
835
  if (fd < 0)
 
836
  {
 
837
    critical_section_exit();
 
838
    return ENOENT;
 
839
  }
 
840
 
 
841
  err_write= write_in_all(fd, output_line.c_str(), output_line.length());
 
842
  if (err_write < 0)
 
843
    err_write= errno;
 
844
  else
 
845
    err_write= 0;
 
846
 
 
847
  err_close= ::close(fd);
 
848
  if (err_close < 0)
 
849
    err_close= errno;
 
850
 
 
851
  critical_section_exit();
 
852
 
 
853
  if (err_write)
 
854
    return err_write;
 
855
  if (err_close)
 
856
    return err_close;
 
857
  return 0;
 
858
}
 
859
 
 
860
int FilesystemCursor::doUpdateRecord(const unsigned char *, unsigned char *)
 
861
{
 
862
  if (share->format.isTagFormat())
 
863
    return 0;
 
864
  if (openUpdateFile())
 
865
    return errno;
 
866
 
 
867
  // get the update information
 
868
  string str;
 
869
  recordToString(str);
 
870
 
 
871
  if (write_in_all(update_file_desc, str.c_str(), str.length()) < 0)
 
872
    return errno;
 
873
 
 
874
  addSlot();
 
875
 
 
876
  return 0;
 
877
}
 
878
 
 
879
void FilesystemCursor::addSlot()
 
880
{
 
881
  if (slots.size() > 0 && slots.back().second == current_position)
 
882
    slots.back().second= next_position;
 
883
  else
 
884
    slots.push_back(make_pair(current_position, next_position));
 
885
}
 
886
 
 
887
int FilesystemCursor::doDeleteRecord(const unsigned char *)
 
888
{
 
889
  if (share->format.isTagFormat())
 
890
    return 0;
 
891
  addSlot();
 
892
  return 0;
 
893
}
 
894
 
 
895
int FilesystemEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
 
896
{
 
897
  if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), FILESYSTEM_EXT))
 
898
    return errno;
 
899
  return 0;
 
900
}
 
901
 
 
902
bool FilesystemEngine::validateCreateTableOption(const std::string &key,
 
903
                                                 const std::string &state)
 
904
{
 
905
  return FormatInfo::validateOption(key, state);
 
906
}
 
907
 
 
908
int FilesystemEngine::doCreateTable(Session &,
 
909
                        Table&,
 
910
                        const drizzled::TableIdentifier &identifier,
 
911
                        drizzled::message::Table &proto)
 
912
{
 
913
  FormatInfo format;
 
914
  format.parseFromTable(&proto);
 
915
  if (format.isFileGiven())
 
916
  {
 
917
    int err= ::open(format.getFileName().c_str(), O_RDONLY);
 
918
    if (err < 0)
 
919
      return errno;
 
920
  }
 
921
 
 
922
  string new_path(identifier.getPath());
 
923
  new_path+= FILESYSTEM_EXT;
 
924
  fstream output(new_path.c_str(), ios::out | ios::binary);
 
925
 
 
926
  if (! output)
 
927
    return 1;
 
928
 
 
929
  if (! proto.SerializeToOstream(&output))
 
930
  {
 
931
    output.close();
 
932
    unlink(new_path.c_str());
 
933
    return 1;
 
934
  }
 
935
 
 
936
  return 0;
 
937
}
 
938
 
 
939
static FilesystemEngine *filesystem_engine= NULL;
 
940
 
 
941
static int filesystem_init_func(drizzled::module::Context &context)
 
942
{
 
943
  filesystem_engine= new FilesystemEngine("FILESYSTEM");
 
944
  context.add(filesystem_engine);
 
945
 
 
946
  return 0;
 
947
}
 
948
 
 
949
DRIZZLE_DECLARE_PLUGIN
 
950
{
 
951
  DRIZZLE_VERSION_ID,
 
952
  "FILESYSTEM",
 
953
  "1.0",
 
954
  "Zimin",
 
955
  "Filesystem Engine",
 
956
  PLUGIN_LICENSE_GPL,
 
957
  filesystem_init_func, /* Plugin Init */
 
958
  NULL,                       /* system variables                */
 
959
  NULL                        /* config options                  */
 
960
}
 
961
DRIZZLE_DECLARE_PLUGIN_END;