~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/filesystem_engine/filesystem_engine.cc

  • Committer: Monty Taylor
  • Date: 2010-12-24 02:13:05 UTC
  • mto: This revision was merged to the branch mainline in revision 2038.
  • Revision ID: mordred@inaugust.com-20101224021305-e3slv1cyjczqorij
Changed the bzrignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
  Copyright (C) 2010 Zimin
 
3
 
 
4
  This program is free software; you can redistribute it and/or
 
5
  modify it under the terms of the GNU General Public License
 
6
  as published by the Free Software Foundation; either version 2
 
7
  of the License, or (at your option) any later version.
 
8
 
 
9
  This program is distributed in the hope that it will be useful,
 
10
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
  GNU General Public License for more details.
 
13
 
 
14
  You should have received a copy of the GNU General Public License
 
15
  along with this program; if not, write to the Free Software
 
16
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
17
*/
 
18
 
 
19
#include "config.h"
 
20
#include <drizzled/field.h>
 
21
#include <drizzled/field/blob.h>
 
22
#include <drizzled/error.h>
 
23
#include <drizzled/table.h>
 
24
#include <drizzled/session.h>
 
25
#include "drizzled/internal/my_sys.h"
 
26
#include <google/protobuf/io/zero_copy_stream.h>
 
27
#include <google/protobuf/io/zero_copy_stream_impl.h>
 
28
 
 
29
#include "filesystem_engine.h"
 
30
#include "utility.h"
 
31
 
 
32
#include <fcntl.h>
 
33
 
 
34
#include <string>
 
35
#include <map>
 
36
#include <fstream>
 
37
#include <sstream>
 
38
#include <iostream>
 
39
#include <boost/algorithm/string.hpp>
 
40
 
 
41
using namespace std;
 
42
using namespace drizzled;
 
43
 
 
44
#define FILESYSTEM_EXT ".FST"
 
45
 
 
46
/* Stuff for shares */
 
47
pthread_mutex_t filesystem_mutex;
 
48
 
 
49
static const char *ha_filesystem_exts[] = {
 
50
  FILESYSTEM_EXT,
 
51
  NULL
 
52
};
 
53
 
 
54
class FilesystemEngine : public drizzled::plugin::StorageEngine
 
55
{
 
56
private:
 
57
  typedef std::map<string, FilesystemTableShare*> FilesystemMap;
 
58
  FilesystemMap fs_open_tables;
 
59
public:
 
60
  FilesystemEngine(const string& name_arg)
 
61
   : drizzled::plugin::StorageEngine(name_arg,
 
62
                                     HTON_NULL_IN_KEY |
 
63
                                     HTON_SKIP_STORE_LOCK |
 
64
                                     HTON_CAN_INDEX_BLOBS |
 
65
                                     HTON_AUTO_PART_KEY),
 
66
     fs_open_tables()
 
67
  {
 
68
    table_definition_ext= FILESYSTEM_EXT;
 
69
    pthread_mutex_init(&filesystem_mutex, MY_MUTEX_INIT_FAST);
 
70
  }
 
71
  virtual ~FilesystemEngine()
 
72
  {
 
73
    pthread_mutex_destroy(&filesystem_mutex);
 
74
  }
 
75
 
 
76
  virtual Cursor *create(Table &table)
 
77
  {
 
78
    return new FilesystemCursor(*this, table);
 
79
  }
 
80
 
 
81
  const char **bas_ext() const {
 
82
    return ha_filesystem_exts;
 
83
  }
 
84
 
 
85
  bool validateCreateTableOption(const std::string &key, const std::string &state);
 
86
 
 
87
  int doCreateTable(Session &,
 
88
                    Table &table_arg,
 
89
                    const drizzled::TableIdentifier &identifier,
 
90
                    drizzled::message::Table&);
 
91
 
 
92
  int doGetTableDefinition(Session& ,
 
93
                           const drizzled::TableIdentifier &,
 
94
                           drizzled::message::Table &);
 
95
 
 
96
  int doDropTable(Session&, const TableIdentifier &);
 
97
 
 
98
  /* operations on FilesystemTableShare */
 
99
  FilesystemTableShare *findOpenTable(const string table_name);
 
100
  void addOpenTable(const string &table_name, FilesystemTableShare *);
 
101
  void deleteOpenTable(const string &table_name);
 
102
 
 
103
  uint32_t max_keys()          const { return 0; }
 
104
  uint32_t max_key_parts()     const { return 0; }
 
105
  uint32_t max_key_length()    const { return 0; }
 
106
  bool doDoesTableExist(Session& , const TableIdentifier &);
 
107
  int doRenameTable(Session&, const TableIdentifier &, const TableIdentifier &);
 
108
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
109
                             const drizzled::SchemaIdentifier &schema_identifier,
 
110
                             drizzled::TableIdentifier::vector &set_of_identifiers);
 
111
private:
 
112
  void getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
113
                                   const drizzled::SchemaIdentifier &schema_identifier,
 
114
                                   drizzled::plugin::TableNameList *set_of_names,
 
115
                                   drizzled::TableIdentifier::vector *set_of_identifiers);
 
116
};
 
117
 
 
118
void FilesystemEngine::getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
119
                                                   const drizzled::SchemaIdentifier &schema_identifier,
 
120
                                                   drizzled::plugin::TableNameList *set_of_names,
 
121
                                                   drizzled::TableIdentifier::vector *set_of_identifiers)
 
122
{
 
123
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
124
 
 
125
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
 
126
      entry_iter != entries.end();
 
127
      ++entry_iter)
 
128
  {
 
129
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
130
    const string *filename= &entry->filename;
 
131
 
 
132
    assert(not filename->empty());
 
133
 
 
134
    string::size_type suffix_pos= filename->rfind('.');
 
135
 
 
136
    if (suffix_pos != string::npos &&
 
137
        boost::iequals(filename->substr(suffix_pos), FILESYSTEM_EXT) &&
 
138
        filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX))
 
139
    {
 
140
      char uname[NAME_LEN + 1];
 
141
      uint32_t file_name_len;
 
142
 
 
143
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
144
      uname[file_name_len - sizeof(FILESYSTEM_EXT) + 1]= '\0';
 
145
      if (set_of_names)
 
146
        set_of_names->insert(uname);
 
147
      if (set_of_identifiers)
 
148
        set_of_identifiers->push_back(TableIdentifier(schema_identifier, uname));
 
149
    }
 
150
  }
 
151
}
 
152
 
 
153
void FilesystemEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
154
                                             const drizzled::SchemaIdentifier &schema_identifier,
 
155
                                             drizzled::TableIdentifier::vector &set_of_identifiers)
 
156
{
 
157
  getTableNamesFromFilesystem(directory, schema_identifier, NULL, &set_of_identifiers);
 
158
}
 
159
 
 
160
int FilesystemEngine::doDropTable(Session &, const TableIdentifier &identifier)
 
161
{
 
162
  string new_path(identifier.getPath());
 
163
  new_path+= FILESYSTEM_EXT;
 
164
  int err= unlink(new_path.c_str());
 
165
  if (err)
 
166
  {
 
167
    err= errno;
 
168
  }
 
169
  return err;
 
170
}
 
171
 
 
172
bool FilesystemEngine::doDoesTableExist(Session &, const TableIdentifier &identifier)
 
173
{
 
174
  string proto_path(identifier.getPath());
 
175
  proto_path.append(FILESYSTEM_EXT);
 
176
 
 
177
  if (access(proto_path.c_str(), F_OK))
 
178
  {
 
179
    return false;
 
180
  }
 
181
 
 
182
  return true;
 
183
}
 
184
 
 
185
FilesystemTableShare *FilesystemEngine::findOpenTable(const string table_name)
 
186
{
 
187
  FilesystemMap::iterator find_iter=
 
188
    fs_open_tables.find(table_name);
 
189
 
 
190
  if (find_iter != fs_open_tables.end())
 
191
    return (*find_iter).second;
 
192
  else
 
193
    return NULL;
 
194
}
 
195
 
 
196
void FilesystemEngine::addOpenTable(const string &table_name, FilesystemTableShare *share)
 
197
{
 
198
  fs_open_tables[table_name]= share;
 
199
}
 
200
 
 
201
void FilesystemEngine::deleteOpenTable(const string &table_name)
 
202
{
 
203
  fs_open_tables.erase(table_name);
 
204
}
 
205
 
 
206
static int parseTaggedFile(const FormatInfo &fi, vector< map<string, string> > &v)
 
207
{
 
208
  int filedesc= ::open(fi.getFileName().c_str(), O_RDONLY);
 
209
  if (filedesc < 0)
 
210
    return errno;
 
211
 
 
212
  boost::scoped_ptr<TransparentFile> filebuffer(new TransparentFile);
 
213
  filebuffer->init_buff(filedesc);
 
214
 
 
215
  bool last_line_empty= false;
 
216
  map<string, string> kv;
 
217
  int pos= 0;
 
218
  string line;
 
219
  while (1)
 
220
  {
 
221
    char ch= filebuffer->get_value(pos);
 
222
    if (ch == '\0')
 
223
    {
 
224
      if (!last_line_empty)
 
225
      {
 
226
        v.push_back(kv);
 
227
        kv.clear();
 
228
      }
 
229
      break;
 
230
    }
 
231
    ++pos;
 
232
 
 
233
    if (!fi.isRowSeparator(ch))
 
234
    {
 
235
      line.push_back(ch);
 
236
      continue;
 
237
    }
 
238
 
 
239
    // if we have a new empty line,
 
240
    // it means we got the end of a section, push it to vector
 
241
    if (line.empty())
 
242
    {
 
243
      if (!last_line_empty)
 
244
      {
 
245
        v.push_back(kv);
 
246
        kv.clear();
 
247
      }
 
248
      last_line_empty= true;
 
249
      continue;
 
250
    }
 
251
 
 
252
    // parse the line
 
253
    vector<string> sv, svcopy;
 
254
    boost::split(sv, line, boost::is_any_of(fi.getColSeparator()));
 
255
    for (vector<string>::iterator iter= sv.begin();
 
256
         iter != sv.end();
 
257
         ++iter)
 
258
    {
 
259
      if (!iter->empty())
 
260
        svcopy.push_back(*iter);
 
261
    }
 
262
 
 
263
    // the first splitted string as key,
 
264
    // and the second splitted string as value.
 
265
    string key(svcopy[0]);
 
266
    boost::trim(key);
 
267
    if (svcopy.size() >= 2)
 
268
    {
 
269
      string value(svcopy[1]);
 
270
      boost::trim(value);
 
271
      kv[key]= value;
 
272
    }
 
273
    else if (svcopy.size() >= 1)
 
274
      kv[key]= "";
 
275
 
 
276
    last_line_empty= false;
 
277
    line.clear();
 
278
  }
 
279
  close(filedesc);
 
280
  return 0;
 
281
}
 
282
 
 
283
int FilesystemEngine::doGetTableDefinition(Session &,
 
284
                                           const drizzled::TableIdentifier &identifier,
 
285
                                           drizzled::message::Table &table_proto)
 
286
{
 
287
  string new_path(identifier.getPath());
 
288
  new_path.append(FILESYSTEM_EXT);
 
289
 
 
290
  int fd= ::open(new_path.c_str(), O_RDONLY);
 
291
  if (fd < 0)
 
292
    return ENOENT;
 
293
 
 
294
  google::protobuf::io::ZeroCopyInputStream* input=
 
295
    new google::protobuf::io::FileInputStream(fd);
 
296
 
 
297
  if (not input)
 
298
    return HA_ERR_CRASHED_ON_USAGE;
 
299
 
 
300
  if (not table_proto.ParseFromZeroCopyStream(input))
 
301
  {
 
302
    close(fd);
 
303
    delete input;
 
304
    if (not table_proto.IsInitialized())
 
305
    {
 
306
      my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
 
307
               table_proto.InitializationErrorString().c_str());
 
308
      return ER_CORRUPT_TABLE_DEFINITION;
 
309
    }
 
310
 
 
311
    return HA_ERR_CRASHED_ON_USAGE;
 
312
  }
 
313
  delete input;
 
314
 
 
315
  // if the file is a tagged file such as /proc/meminfo
 
316
  // then columns of this table are added dynamically here.
 
317
  FormatInfo format;
 
318
  format.parseFromTable(&table_proto);
 
319
  if (not format.isTagFormat() || not format.isFileGiven())
 
320
  {
 
321
    close(fd);
 
322
    return EEXIST;
 
323
  }
 
324
 
 
325
  std::vector< std::map<std::string, std::string> > vm;
 
326
  if (parseTaggedFile(format, vm) != 0)
 
327
  {
 
328
    close(fd);
 
329
 
 
330
    return EEXIST;
 
331
  }
 
332
  if (vm.size() == 0) {
 
333
    close(fd);
 
334
    return EEXIST;
 
335
  }
 
336
 
 
337
  // we don't care what user provides, just clear them all
 
338
  table_proto.clear_field();
 
339
  // we take the first section as sample
 
340
  std::map<string, string> kv= vm[0];
 
341
  for (std::map<string, string>::iterator iter= kv.begin();
 
342
       iter != kv.end();
 
343
       ++iter)
 
344
  {
 
345
    // add columns to table proto
 
346
    message::Table::Field *field= table_proto.add_field();
 
347
    field->set_name(iter->first);
 
348
    field->set_type(drizzled::message::Table::Field::VARCHAR);
 
349
    message::Table::Field::StringFieldOptions *stringoption= field->mutable_string_options();
 
350
    stringoption->set_length(iter->second.length() + 1);
 
351
  }
 
352
 
 
353
  close(fd);
 
354
  return EEXIST;
 
355
}
 
356
 
 
357
FilesystemTableShare::FilesystemTableShare(const string table_name_arg)
 
358
  : use_count(0), table_name(table_name_arg),
 
359
  update_file_opened(false),
 
360
  needs_reopen(false)
 
361
{
 
362
}
 
363
 
 
364
FilesystemTableShare::~FilesystemTableShare()
 
365
{
 
366
  pthread_mutex_destroy(&mutex);
 
367
}
 
368
 
 
369
FilesystemTableShare *FilesystemCursor::get_share(const char *table_name)
 
370
{
 
371
  Guard g(filesystem_mutex);
 
372
 
 
373
  FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
374
  share= a_engine->findOpenTable(table_name);
 
375
 
 
376
  /*
 
377
    If share is not present in the hash, create a new share and
 
378
    initialize its members.
 
379
  */
 
380
  if (share == NULL)
 
381
  {
 
382
    share= new (nothrow) FilesystemTableShare(table_name);
 
383
    if (share == NULL)
 
384
    {
 
385
      return NULL;
 
386
    }
 
387
 
 
388
    share->format.parseFromTable(getTable()->getShare()->getTableProto());
 
389
    if (!share->format.isFileGiven())
 
390
    {
 
391
      return NULL;
 
392
    }
 
393
    /*
 
394
     * for taggered file such as /proc/meminfo,
 
395
     * we pre-process it first, and store the parsing result in a map.
 
396
     */
 
397
    if (share->format.isTagFormat())
 
398
    {
 
399
      if (parseTaggedFile(share->format, share->vm) != 0)
 
400
      {
 
401
        return NULL;
 
402
      }
 
403
    }
 
404
    a_engine->addOpenTable(share->table_name, share);
 
405
 
 
406
    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
 
407
  }
 
408
  share->use_count++;
 
409
 
 
410
  return share;
 
411
}
 
412
 
 
413
void FilesystemCursor::free_share()
 
414
{
 
415
  Guard g(filesystem_mutex);
 
416
 
 
417
  if (!--share->use_count){
 
418
    FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
419
    a_engine->deleteOpenTable(share->table_name);
 
420
    pthread_mutex_destroy(&share->mutex);
 
421
    delete share;
 
422
  }
 
423
}
 
424
 
 
425
void FilesystemCursor::critical_section_enter()
 
426
{
 
427
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
428
      sql_command_type == SQLCOM_UPDATE ||
 
429
      sql_command_type == SQLCOM_DELETE ||
 
430
      sql_command_type == SQLCOM_INSERT ||
 
431
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
432
      sql_command_type == SQLCOM_REPLACE ||
 
433
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
434
    share->filesystem_lock.scan_update_begin();
 
435
  else
 
436
    share->filesystem_lock.scan_begin();
 
437
 
 
438
  thread_locked = true;
 
439
}
 
440
 
 
441
void FilesystemCursor::critical_section_exit()
 
442
{
 
443
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
444
      sql_command_type == SQLCOM_UPDATE ||
 
445
      sql_command_type == SQLCOM_DELETE ||
 
446
      sql_command_type == SQLCOM_INSERT ||
 
447
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
448
      sql_command_type == SQLCOM_REPLACE ||
 
449
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
450
    share->filesystem_lock.scan_update_end();
 
451
  else
 
452
    share->filesystem_lock.scan_end();
 
453
 
 
454
  thread_locked = false;
 
455
}
 
456
 
 
457
FilesystemCursor::FilesystemCursor(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
458
  : Cursor(engine_arg, table_arg),
 
459
    file_buff(new TransparentFile),
 
460
    thread_locked(false)
 
461
{
 
462
}
 
463
 
 
464
int FilesystemCursor::doOpen(const drizzled::TableIdentifier &identifier, int, uint32_t)
 
465
{
 
466
  if (!(share= get_share(identifier.getPath().c_str())))
 
467
    return ENOENT;
 
468
 
 
469
  file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
470
  if (file_desc < 0)
 
471
  {
 
472
    free_share();
 
473
    return ER_CANT_OPEN_FILE;
 
474
  }
 
475
 
 
476
  ref_length= sizeof(off_t);
 
477
  return 0;
 
478
}
 
479
 
 
480
int FilesystemCursor::close(void)
 
481
{
 
482
  int err= ::close(file_desc);
 
483
  if (err < 0)
 
484
    err= errno;
 
485
  free_share();
 
486
  return err;
 
487
}
 
488
 
 
489
int FilesystemCursor::doStartTableScan(bool)
 
490
{
 
491
  sql_command_type = session_sql_command(getTable()->getSession());
 
492
 
 
493
  if (thread_locked)
 
494
    critical_section_exit();
 
495
  critical_section_enter();
 
496
 
 
497
  if (share->format.isTagFormat())
 
498
  {
 
499
    tag_depth= 0;
 
500
    return 0;
 
501
  }
 
502
 
 
503
  current_position= 0;
 
504
  next_position= 0;
 
505
  slots.clear();
 
506
  if (share->needs_reopen)
 
507
  {
 
508
    file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
509
    if (file_desc < 0)
 
510
      return HA_ERR_CRASHED_ON_USAGE;
 
511
    share->needs_reopen= false;
 
512
  }
 
513
  file_buff->init_buff(file_desc);
 
514
  return 0;
 
515
}
 
516
 
 
517
int FilesystemCursor::find_current_row(unsigned char *buf)
 
518
{
 
519
  ptrdiff_t row_offset= buf - getTable()->record[0];
 
520
 
 
521
  next_position= current_position;
 
522
 
 
523
  string content;
 
524
  bool line_done= false;
 
525
  bool line_blank= true;
 
526
  Field **field= getTable()->getFields();
 
527
  for (; !line_done && *field; ++next_position)
 
528
  {
 
529
    char ch= file_buff->get_value(next_position);
 
530
    if (ch == '\0')
 
531
      return HA_ERR_END_OF_FILE;
 
532
 
 
533
    if (share->format.isEscapedChar(ch))
 
534
    {
 
535
      // read next character
 
536
      ch= file_buff->get_value(++next_position);
 
537
      if (ch == '\0')
 
538
        return HA_ERR_END_OF_FILE;
 
539
 
 
540
      content.push_back(FormatInfo::getEscapedChar(ch));
 
541
 
 
542
      continue;
 
543
    }
 
544
 
 
545
    // if we find separator
 
546
    bool is_row= share->format.isRowSeparator(ch);
 
547
    bool is_col= share->format.isColSeparator(ch);
 
548
    if (content.empty())
 
549
    {
 
550
      if (share->format.isSeparatorModeGeneral() && is_row && line_blank)
 
551
        continue;
 
552
      if (share->format.isSeparatorModeWeak() && is_col)
 
553
        continue;
 
554
    }
 
555
 
 
556
    if (is_row || is_col)
 
557
    {
 
558
      (*field)->move_field_offset(row_offset);
 
559
      if (!content.empty())
 
560
      {
 
561
        (*field)->set_notnull();
 
562
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
563
        {
 
564
          (*field)->setWriteSet();
 
565
          (*field)->store_and_check(CHECK_FIELD_WARN,
 
566
                                    content.c_str(),
 
567
                                    (uint32_t)content.length(),
 
568
                                    &my_charset_bin);
 
569
        }
 
570
        else
 
571
        {
 
572
          (*field)->set_default();
 
573
        }
 
574
      }
 
575
      else
 
576
        (*field)->set_null();
 
577
      (*field)->move_field_offset(-row_offset);
 
578
 
 
579
      content.clear();
 
580
      ++field;
 
581
 
 
582
      line_blank= false;
 
583
      if (is_row)
 
584
        line_done= true;
 
585
 
 
586
      continue;
 
587
    }
 
588
    content.push_back(ch);
 
589
  }
 
590
  if (line_done)
 
591
  {
 
592
    for (; *field; ++field)
 
593
    {
 
594
      (*field)->move_field_offset(row_offset);
 
595
      (*field)->set_notnull();
 
596
      (*field)->set_default();
 
597
      (*field)->move_field_offset(-row_offset);
 
598
    }
 
599
  }
 
600
  else
 
601
  {
 
602
    // eat up characters when line_done
 
603
    while (!line_done)
 
604
    {
 
605
      char ch= file_buff->get_value(next_position);
 
606
      if (share->format.isRowSeparator(ch))
 
607
        line_done= true;
 
608
      ++next_position;
 
609
    }
 
610
  }
 
611
  return 0;
 
612
}
 
613
 
 
614
int FilesystemCursor::rnd_next(unsigned char *buf)
 
615
{
 
616
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
617
  if (share->format.isTagFormat())
 
618
  {
 
619
    if (tag_depth >= share->vm.size())
 
620
      return HA_ERR_END_OF_FILE;
 
621
 
 
622
    ptrdiff_t row_offset= buf - getTable()->record[0];
 
623
    for (Field **field= getTable()->getFields(); *field; field++)
 
624
    {
 
625
      string key((*field)->field_name);
 
626
      string content= share->vm[tag_depth][key];
 
627
 
 
628
      (*field)->move_field_offset(row_offset);
 
629
      if (!content.empty())
 
630
      {
 
631
        (*field)->set_notnull();
 
632
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
633
        {
 
634
          (*field)->setWriteSet();
 
635
          (*field)->store_and_check(CHECK_FIELD_WARN,
 
636
                                    content.c_str(),
 
637
                                    (uint32_t)content.length(),
 
638
                                    &my_charset_bin);
 
639
        }
 
640
        else
 
641
        {
 
642
          (*field)->set_default();
 
643
        }
 
644
      }
 
645
      else
 
646
      {
 
647
        (*field)->set_null();
 
648
      }
 
649
      (*field)->move_field_offset(-row_offset);
 
650
    }
 
651
    ++tag_depth;
 
652
    return 0;
 
653
  }
 
654
  // normal file
 
655
  current_position= next_position;
 
656
  return find_current_row(buf);
 
657
}
 
658
 
 
659
void FilesystemCursor::position(const unsigned char *)
 
660
{
 
661
  *reinterpret_cast<off_t *>(ref)= current_position;
 
662
}
 
663
 
 
664
int FilesystemCursor::rnd_pos(unsigned char * buf, unsigned char *pos)
 
665
{
 
666
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
 
667
  current_position= *reinterpret_cast<off_t *>(pos);
 
668
  return find_current_row(buf);
 
669
}
 
670
 
 
671
int FilesystemCursor::info(uint32_t)
 
672
{
 
673
  if (stats.records < 2)
 
674
    stats.records= 2;
 
675
  return 0;
 
676
}
 
677
 
 
678
int FilesystemCursor::openUpdateFile()
 
679
{
 
680
  if (!share->update_file_opened)
 
681
  {
 
682
    struct stat st;
 
683
    if (stat(share->format.getFileName().c_str(), &st) < 0)
 
684
      return -1;
 
685
    update_file_name= share->format.getFileName();
 
686
    update_file_name.append(".UPDATE");
 
687
    unlink(update_file_name.c_str());
 
688
    update_file_desc= ::open(update_file_name.c_str(),
 
689
                             O_RDWR | O_CREAT | O_TRUNC,
 
690
                             st.st_mode);
 
691
    if (update_file_desc < 0)
 
692
    {
 
693
      return -1;
 
694
    }
 
695
    share->update_file_opened= true;
 
696
  }
 
697
  return 0;
 
698
}
 
699
 
 
700
int FilesystemCursor::doEndTableScan()
 
701
{
 
702
  sql_command_type = session_sql_command(getTable()->getSession());
 
703
 
 
704
  if (share->format.isTagFormat())
 
705
  {
 
706
    if (thread_locked)
 
707
      critical_section_exit();
 
708
    return 0;
 
709
  }
 
710
 
 
711
  if (slots.size() == 0)
 
712
  {
 
713
    if (thread_locked)
 
714
      critical_section_exit();
 
715
    return 0;
 
716
  }
 
717
 
 
718
  int err= -1;
 
719
  sort(slots.begin(), slots.end());
 
720
  vector< pair<off_t, off_t> >::iterator slot_iter= slots.begin();
 
721
  off_t write_start= 0;
 
722
  off_t write_end= 0;
 
723
  off_t file_buffer_start= 0;
 
724
 
 
725
  pthread_mutex_lock(&share->mutex);
 
726
 
 
727
  file_buff->init_buff(file_desc);
 
728
  if (openUpdateFile() < 0)
 
729
    goto error;
 
730
 
 
731
  while (file_buffer_start != -1)
 
732
  {
 
733
    bool in_hole= false;
 
734
 
 
735
    write_end= file_buff->end();
 
736
    if (slot_iter != slots.end() &&
 
737
      write_end >= slot_iter->first)
 
738
    {
 
739
      write_end= slot_iter->first;
 
740
      in_hole= true;
 
741
    }
 
742
 
 
743
    off_t write_length= write_end - write_start;
 
744
    if (write_in_all(update_file_desc,
 
745
               file_buff->ptr() + (write_start - file_buff->start()),
 
746
               write_length) != write_length)
 
747
      goto error;
 
748
 
 
749
    if (in_hole)
 
750
    {
 
751
      while (file_buff->end() <= slot_iter->second && file_buffer_start != -1)
 
752
        file_buffer_start= file_buff->read_next();
 
753
      write_start= slot_iter->second;
 
754
      ++slot_iter;
 
755
    }
 
756
    else
 
757
      write_start= write_end;
 
758
 
 
759
    if (write_end == file_buff->end())
 
760
      file_buffer_start= file_buff->read_next();
 
761
  }
 
762
  // close update file
 
763
  if (::fsync(update_file_desc) || 
 
764
      ::close(update_file_desc))
 
765
    goto error;
 
766
  share->update_file_opened= false;
 
767
 
 
768
  // close current file
 
769
  if (::close(file_desc))
 
770
    goto error;
 
771
  if (::rename(update_file_name.c_str(), share->format.getFileName().c_str()))
 
772
    goto error;
 
773
 
 
774
  share->needs_reopen= true;
 
775
 
 
776
error:
 
777
  err= errno;
 
778
  pthread_mutex_unlock(&share->mutex);
 
779
 
 
780
  if (thread_locked)
 
781
    critical_section_exit();
 
782
 
 
783
  return err;
 
784
}
 
785
 
 
786
void FilesystemCursor::recordToString(string& output)
 
787
{
 
788
  bool first= true;
 
789
  drizzled::String attribute;
 
790
  for (Field **field= getTable()->getFields(); *field; ++field)
 
791
  {
 
792
    if (first == true)
 
793
    {
 
794
      first= false;
 
795
    }
 
796
    else
 
797
    {
 
798
      output.append(share->format.getColSeparatorHead());
 
799
    }
 
800
 
 
801
    if (not (*field)->is_null())
 
802
    {
 
803
      (*field)->setReadSet();
 
804
      (*field)->val_str(&attribute, &attribute);
 
805
 
 
806
      output.append(attribute.ptr(), attribute.length());
 
807
    }
 
808
    else
 
809
    {
 
810
      output.append("0");
 
811
    }
 
812
  }
 
813
  output.append(share->format.getRowSeparatorHead());
 
814
}
 
815
 
 
816
int FilesystemCursor::doInsertRecord(unsigned char * buf)
 
817
{
 
818
  (void)buf;
 
819
 
 
820
  if (share->format.isTagFormat())
 
821
    return 0;
 
822
 
 
823
  sql_command_type = session_sql_command(getTable()->getSession());
 
824
 
 
825
  critical_section_enter();
 
826
 
 
827
  int err_write= 0;
 
828
  int err_close= 0;
 
829
 
 
830
  string output_line;
 
831
  recordToString(output_line);
 
832
 
 
833
  int fd= ::open(share->format.getFileName().c_str(), O_WRONLY | O_APPEND);
 
834
  if (fd < 0)
 
835
  {
 
836
    critical_section_exit();
 
837
    return ENOENT;
 
838
  }
 
839
 
 
840
  err_write= write_in_all(fd, output_line.c_str(), output_line.length());
 
841
  if (err_write < 0)
 
842
    err_write= errno;
 
843
  else
 
844
    err_write= 0;
 
845
 
 
846
  err_close= ::close(fd);
 
847
  if (err_close < 0)
 
848
    err_close= errno;
 
849
 
 
850
  critical_section_exit();
 
851
 
 
852
  if (err_write)
 
853
    return err_write;
 
854
  if (err_close)
 
855
    return err_close;
 
856
  return 0;
 
857
}
 
858
 
 
859
int FilesystemCursor::doUpdateRecord(const unsigned char *, unsigned char *)
 
860
{
 
861
  if (share->format.isTagFormat())
 
862
    return 0;
 
863
  if (openUpdateFile())
 
864
    return errno;
 
865
 
 
866
  // get the update information
 
867
  string str;
 
868
  recordToString(str);
 
869
 
 
870
  if (write_in_all(update_file_desc, str.c_str(), str.length()) < 0)
 
871
    return errno;
 
872
 
 
873
  addSlot();
 
874
 
 
875
  return 0;
 
876
}
 
877
 
 
878
void FilesystemCursor::addSlot()
 
879
{
 
880
  if (slots.size() > 0 && slots.back().second == current_position)
 
881
    slots.back().second= next_position;
 
882
  else
 
883
    slots.push_back(make_pair(current_position, next_position));
 
884
}
 
885
 
 
886
int FilesystemCursor::doDeleteRecord(const unsigned char *)
 
887
{
 
888
  if (share->format.isTagFormat())
 
889
    return 0;
 
890
  addSlot();
 
891
  return 0;
 
892
}
 
893
 
 
894
int FilesystemEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
 
895
{
 
896
  if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), FILESYSTEM_EXT))
 
897
    return errno;
 
898
  return 0;
 
899
}
 
900
 
 
901
bool FilesystemEngine::validateCreateTableOption(const std::string &key,
 
902
                                                 const std::string &state)
 
903
{
 
904
  return FormatInfo::validateOption(key, state);
 
905
}
 
906
 
 
907
int FilesystemEngine::doCreateTable(Session &,
 
908
                        Table&,
 
909
                        const drizzled::TableIdentifier &identifier,
 
910
                        drizzled::message::Table &proto)
 
911
{
 
912
  FormatInfo format;
 
913
  format.parseFromTable(&proto);
 
914
  if (format.isFileGiven())
 
915
  {
 
916
    int err= ::open(format.getFileName().c_str(), O_RDONLY);
 
917
    if (err < 0)
 
918
      return errno;
 
919
  }
 
920
 
 
921
  string new_path(identifier.getPath());
 
922
  new_path+= FILESYSTEM_EXT;
 
923
  fstream output(new_path.c_str(), ios::out | ios::binary);
 
924
 
 
925
  if (! output)
 
926
    return 1;
 
927
 
 
928
  if (! proto.SerializeToOstream(&output))
 
929
  {
 
930
    output.close();
 
931
    unlink(new_path.c_str());
 
932
    return 1;
 
933
  }
 
934
 
 
935
  return 0;
 
936
}
 
937
 
 
938
static FilesystemEngine *filesystem_engine= NULL;
 
939
 
 
940
static int filesystem_init_func(drizzled::module::Context &context)
 
941
{
 
942
  filesystem_engine= new FilesystemEngine("FILESYSTEM");
 
943
  context.add(filesystem_engine);
 
944
 
 
945
  return 0;
 
946
}
 
947
 
 
948
DRIZZLE_DECLARE_PLUGIN
 
949
{
 
950
  DRIZZLE_VERSION_ID,
 
951
  "FILESYSTEM",
 
952
  "1.0",
 
953
  "Zimin",
 
954
  "Filesystem Engine",
 
955
  PLUGIN_LICENSE_GPL,
 
956
  filesystem_init_func, /* Plugin Init */
 
957
  NULL,                       /* system variables                */
 
958
  NULL                        /* config options                  */
 
959
}
 
960
DRIZZLE_DECLARE_PLUGIN_END;