~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/filesystem_engine/filesystem_engine.cc

  • Committer: Brian Aker
  • Date: 2010-11-08 18:54:26 UTC
  • mto: (1921.1.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 1916.
  • Revision ID: brian@tangent.org-20101108185426-fymkf2xnelupf11x
Rename lock methods to be style + well make sense.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
  Copyright (C) 2010 Zimin
 
3
 
 
4
  This program is free software; you can redistribute it and/or
 
5
  modify it under the terms of the GNU General Public License
 
6
  as published by the Free Software Foundation; either version 2
 
7
  of the License, or (at your option) any later version.
 
8
 
 
9
  This program is distributed in the hope that it will be useful,
 
10
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
  GNU General Public License for more details.
 
13
 
 
14
  You should have received a copy of the GNU General Public License
 
15
  along with this program; if not, write to the Free Software
 
16
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
17
*/
 
18
 
 
19
#include "config.h"
 
20
#include <drizzled/field.h>
 
21
#include <drizzled/field/blob.h>
 
22
#include <drizzled/field/timestamp.h>
 
23
#include <drizzled/error.h>
 
24
#include <drizzled/table.h>
 
25
#include <drizzled/session.h>
 
26
#include "drizzled/internal/my_sys.h"
 
27
#include <google/protobuf/io/zero_copy_stream.h>
 
28
#include <google/protobuf/io/zero_copy_stream_impl.h>
 
29
 
 
30
#include "filesystem_engine.h"
 
31
#include "utility.h"
 
32
 
 
33
#include <fcntl.h>
 
34
 
 
35
#include <string>
 
36
#include <map>
 
37
#include <fstream>
 
38
#include <sstream>
 
39
#include <iostream>
 
40
#include <boost/algorithm/string.hpp>
 
41
 
 
42
using namespace std;
 
43
using namespace drizzled;
 
44
 
 
45
#define FILESYSTEM_EXT ".FST"
 
46
 
 
47
/* Stuff for shares */
 
48
pthread_mutex_t filesystem_mutex;
 
49
 
 
50
static const char *ha_filesystem_exts[] = {
 
51
  FILESYSTEM_EXT,
 
52
  NULL
 
53
};
 
54
 
 
55
class FilesystemEngine : public drizzled::plugin::StorageEngine
 
56
{
 
57
private:
 
58
  typedef std::map<string, FilesystemTableShare*> FilesystemMap;
 
59
  FilesystemMap fs_open_tables;
 
60
public:
 
61
  FilesystemEngine(const string& name_arg)
 
62
   : drizzled::plugin::StorageEngine(name_arg,
 
63
                                     HTON_NULL_IN_KEY |
 
64
                                     HTON_SKIP_STORE_LOCK |
 
65
                                     HTON_CAN_INDEX_BLOBS |
 
66
                                     HTON_AUTO_PART_KEY),
 
67
     fs_open_tables()
 
68
  {
 
69
    table_definition_ext= FILESYSTEM_EXT;
 
70
    pthread_mutex_init(&filesystem_mutex, MY_MUTEX_INIT_FAST);
 
71
  }
 
72
  virtual ~FilesystemEngine()
 
73
  {
 
74
    pthread_mutex_destroy(&filesystem_mutex);
 
75
  }
 
76
 
 
77
  virtual Cursor *create(Table &table)
 
78
  {
 
79
    return new FilesystemCursor(*this, table);
 
80
  }
 
81
 
 
82
  const char **bas_ext() const {
 
83
    return ha_filesystem_exts;
 
84
  }
 
85
 
 
86
  bool validateCreateTableOption(const std::string &key, const std::string &state);
 
87
 
 
88
  int doCreateTable(Session &,
 
89
                    Table &table_arg,
 
90
                    const drizzled::TableIdentifier &identifier,
 
91
                    drizzled::message::Table&);
 
92
 
 
93
  int doGetTableDefinition(Session& ,
 
94
                           const drizzled::TableIdentifier &,
 
95
                           drizzled::message::Table &);
 
96
 
 
97
  int doDropTable(Session&, const TableIdentifier &);
 
98
 
 
99
  /* operations on FilesystemTableShare */
 
100
  FilesystemTableShare *findOpenTable(const string table_name);
 
101
  void addOpenTable(const string &table_name, FilesystemTableShare *);
 
102
  void deleteOpenTable(const string &table_name);
 
103
 
 
104
  uint32_t max_keys()          const { return 0; }
 
105
  uint32_t max_key_parts()     const { return 0; }
 
106
  uint32_t max_key_length()    const { return 0; }
 
107
  bool doDoesTableExist(Session& , const TableIdentifier &);
 
108
  int doRenameTable(Session&, const TableIdentifier &, const TableIdentifier &);
 
109
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
110
                             const drizzled::SchemaIdentifier &schema_identifier,
 
111
                             drizzled::TableIdentifiers &set_of_identifiers);
 
112
private:
 
113
  void getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
114
                                   const drizzled::SchemaIdentifier &schema_identifier,
 
115
                                   drizzled::plugin::TableNameList *set_of_names,
 
116
                                   drizzled::TableIdentifiers *set_of_identifiers);
 
117
};
 
118
 
 
119
void FilesystemEngine::getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
 
120
                                                   const drizzled::SchemaIdentifier &schema_identifier,
 
121
                                                   drizzled::plugin::TableNameList *set_of_names,
 
122
                                                   drizzled::TableIdentifiers *set_of_identifiers)
 
123
{
 
124
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
125
 
 
126
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
 
127
      entry_iter != entries.end();
 
128
      ++entry_iter)
 
129
  {
 
130
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
131
    const string *filename= &entry->filename;
 
132
 
 
133
    assert(not filename->empty());
 
134
 
 
135
    string::size_type suffix_pos= filename->rfind('.');
 
136
 
 
137
    if (suffix_pos != string::npos &&
 
138
        boost::iequals(filename->substr(suffix_pos), FILESYSTEM_EXT) &&
 
139
        filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX))
 
140
    {
 
141
      char uname[NAME_LEN + 1];
 
142
      uint32_t file_name_len;
 
143
 
 
144
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
145
      uname[file_name_len - sizeof(FILESYSTEM_EXT) + 1]= '\0';
 
146
      if (set_of_names)
 
147
        set_of_names->insert(uname);
 
148
      if (set_of_identifiers)
 
149
        set_of_identifiers->push_back(TableIdentifier(schema_identifier, uname));
 
150
    }
 
151
  }
 
152
}
 
153
 
 
154
void FilesystemEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
155
                                             const drizzled::SchemaIdentifier &schema_identifier,
 
156
                                             drizzled::TableIdentifiers &set_of_identifiers)
 
157
{
 
158
  getTableNamesFromFilesystem(directory, schema_identifier, NULL, &set_of_identifiers);
 
159
}
 
160
 
 
161
int FilesystemEngine::doDropTable(Session &, const TableIdentifier &identifier)
 
162
{
 
163
  string new_path(identifier.getPath());
 
164
  new_path+= FILESYSTEM_EXT;
 
165
  int err= unlink(new_path.c_str());
 
166
  if (err)
 
167
  {
 
168
    err= errno;
 
169
  }
 
170
  return err;
 
171
}
 
172
 
 
173
bool FilesystemEngine::doDoesTableExist(Session &, const TableIdentifier &identifier)
 
174
{
 
175
  string proto_path(identifier.getPath());
 
176
  proto_path.append(FILESYSTEM_EXT);
 
177
 
 
178
  if (access(proto_path.c_str(), F_OK))
 
179
  {
 
180
    return false;
 
181
  }
 
182
 
 
183
  return true;
 
184
}
 
185
 
 
186
FilesystemTableShare *FilesystemEngine::findOpenTable(const string table_name)
 
187
{
 
188
  FilesystemMap::iterator find_iter=
 
189
    fs_open_tables.find(table_name);
 
190
 
 
191
  if (find_iter != fs_open_tables.end())
 
192
    return (*find_iter).second;
 
193
  else
 
194
    return NULL;
 
195
}
 
196
 
 
197
void FilesystemEngine::addOpenTable(const string &table_name, FilesystemTableShare *share)
 
198
{
 
199
  fs_open_tables[table_name]= share;
 
200
}
 
201
 
 
202
void FilesystemEngine::deleteOpenTable(const string &table_name)
 
203
{
 
204
  fs_open_tables.erase(table_name);
 
205
}
 
206
 
 
207
static int parseTaggedFile(const FormatInfo &fi, vector< map<string, string> > &v)
 
208
{
 
209
  int filedesc= ::open(fi.getFileName().c_str(), O_RDONLY);
 
210
  if (filedesc < 0)
 
211
    return errno;
 
212
 
 
213
  boost::scoped_ptr<TransparentFile> filebuffer(new TransparentFile);
 
214
  filebuffer->init_buff(filedesc);
 
215
 
 
216
  bool last_line_empty= false;
 
217
  map<string, string> kv;
 
218
  int pos= 0;
 
219
  string line;
 
220
  while (1)
 
221
  {
 
222
    char ch= filebuffer->get_value(pos);
 
223
    if (ch == '\0')
 
224
    {
 
225
      if (!last_line_empty)
 
226
      {
 
227
        v.push_back(kv);
 
228
        kv.clear();
 
229
      }
 
230
      break;
 
231
    }
 
232
    ++pos;
 
233
 
 
234
    if (!fi.isRowSeparator(ch))
 
235
    {
 
236
      line.push_back(ch);
 
237
      continue;
 
238
    }
 
239
 
 
240
    // if we have a new empty line,
 
241
    // it means we got the end of a section, push it to vector
 
242
    if (line.empty())
 
243
    {
 
244
      if (!last_line_empty)
 
245
      {
 
246
        v.push_back(kv);
 
247
        kv.clear();
 
248
      }
 
249
      last_line_empty= true;
 
250
      continue;
 
251
    }
 
252
 
 
253
    // parse the line
 
254
    vector<string> sv, svcopy;
 
255
    boost::split(sv, line, boost::is_any_of(fi.getColSeparator()));
 
256
    for (vector<string>::iterator iter= sv.begin();
 
257
         iter != sv.end();
 
258
         ++iter)
 
259
    {
 
260
      if (!iter->empty())
 
261
        svcopy.push_back(*iter);
 
262
    }
 
263
 
 
264
    // the first splitted string as key,
 
265
    // and the second splitted string as value.
 
266
    string key(svcopy[0]);
 
267
    boost::trim(key);
 
268
    if (svcopy.size() >= 2)
 
269
    {
 
270
      string value(svcopy[1]);
 
271
      boost::trim(value);
 
272
      kv[key]= value;
 
273
    }
 
274
    else if (svcopy.size() >= 1)
 
275
      kv[key]= "";
 
276
 
 
277
    last_line_empty= false;
 
278
    line.clear();
 
279
  }
 
280
  close(filedesc);
 
281
  return 0;
 
282
}
 
283
 
 
284
int FilesystemEngine::doGetTableDefinition(Session &,
 
285
                                           const drizzled::TableIdentifier &identifier,
 
286
                                           drizzled::message::Table &table_proto)
 
287
{
 
288
  string new_path(identifier.getPath());
 
289
  new_path.append(FILESYSTEM_EXT);
 
290
 
 
291
  int fd= ::open(new_path.c_str(), O_RDONLY);
 
292
  if (fd < 0)
 
293
    return ENOENT;
 
294
 
 
295
  google::protobuf::io::ZeroCopyInputStream* input=
 
296
    new google::protobuf::io::FileInputStream(fd);
 
297
 
 
298
  if (not input)
 
299
    return HA_ERR_CRASHED_ON_USAGE;
 
300
 
 
301
  if (not table_proto.ParseFromZeroCopyStream(input))
 
302
  {
 
303
    close(fd);
 
304
    delete input;
 
305
    if (not table_proto.IsInitialized())
 
306
    {
 
307
      my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
 
308
               table_proto.InitializationErrorString().c_str());
 
309
      return ER_CORRUPT_TABLE_DEFINITION;
 
310
    }
 
311
 
 
312
    return HA_ERR_CRASHED_ON_USAGE;
 
313
  }
 
314
  delete input;
 
315
 
 
316
  // if the file is a tagged file such as /proc/meminfo
 
317
  // then columns of this table are added dynamically here.
 
318
  FormatInfo format;
 
319
  format.parseFromTable(&table_proto);
 
320
  if (!format.isTagFormat() || !format.isFileGiven()) {
 
321
    close(fd);
 
322
    return EEXIST;
 
323
  }
 
324
 
 
325
  vector< map<string, string> > vm;
 
326
  if (parseTaggedFile(format, vm) != 0) {
 
327
    close(fd);
 
328
 
 
329
    return EEXIST;
 
330
  }
 
331
  if (vm.size() == 0) {
 
332
    close(fd);
 
333
    return EEXIST;
 
334
  }
 
335
 
 
336
  // we don't care what user provides, just clear them all
 
337
  table_proto.clear_field();
 
338
  // we take the first section as sample
 
339
  map<string, string> kv= vm[0];
 
340
  for (map<string, string>::iterator iter= kv.begin();
 
341
       iter != kv.end();
 
342
       ++iter)
 
343
  {
 
344
    // add columns to table proto
 
345
    message::Table::Field *field= table_proto.add_field();
 
346
    field->set_name(iter->first);
 
347
    field->set_type(drizzled::message::Table::Field::VARCHAR);
 
348
    message::Table::Field::StringFieldOptions *stringoption= field->mutable_string_options();
 
349
    stringoption->set_length(iter->second.length() + 1);
 
350
  }
 
351
 
 
352
  close(fd);
 
353
  return EEXIST;
 
354
}
 
355
 
 
356
FilesystemTableShare::FilesystemTableShare(const string table_name_arg)
 
357
  : use_count(0), table_name(table_name_arg),
 
358
  update_file_opened(false),
 
359
  needs_reopen(false)
 
360
{
 
361
}
 
362
 
 
363
FilesystemTableShare::~FilesystemTableShare()
 
364
{
 
365
  pthread_mutex_destroy(&mutex);
 
366
}
 
367
 
 
368
FilesystemTableShare *FilesystemCursor::get_share(const char *table_name)
 
369
{
 
370
  Guard g(filesystem_mutex);
 
371
 
 
372
  FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
373
  share= a_engine->findOpenTable(table_name);
 
374
 
 
375
  /*
 
376
    If share is not present in the hash, create a new share and
 
377
    initialize its members.
 
378
  */
 
379
  if (share == NULL)
 
380
  {
 
381
    share= new (nothrow) FilesystemTableShare(table_name);
 
382
    if (share == NULL)
 
383
    {
 
384
      return NULL;
 
385
    }
 
386
 
 
387
    share->format.parseFromTable(getTable()->getShare()->getTableProto());
 
388
    if (!share->format.isFileGiven())
 
389
    {
 
390
      return NULL;
 
391
    }
 
392
    /*
 
393
     * for taggered file such as /proc/meminfo,
 
394
     * we pre-process it first, and store the parsing result in a map.
 
395
     */
 
396
    if (share->format.isTagFormat())
 
397
    {
 
398
      if (parseTaggedFile(share->format, share->vm) != 0)
 
399
      {
 
400
        return NULL;
 
401
      }
 
402
    }
 
403
    a_engine->addOpenTable(share->table_name, share);
 
404
 
 
405
    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
 
406
  }
 
407
  share->use_count++;
 
408
 
 
409
  return share;
 
410
}
 
411
 
 
412
void FilesystemCursor::free_share()
 
413
{
 
414
  Guard g(filesystem_mutex);
 
415
 
 
416
  if (!--share->use_count){
 
417
    FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(getEngine());
 
418
    a_engine->deleteOpenTable(share->table_name);
 
419
    pthread_mutex_destroy(&share->mutex);
 
420
    delete share;
 
421
  }
 
422
}
 
423
 
 
424
void FilesystemCursor::critical_section_enter()
 
425
{
 
426
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
427
      sql_command_type == SQLCOM_UPDATE ||
 
428
      sql_command_type == SQLCOM_DELETE ||
 
429
      sql_command_type == SQLCOM_INSERT ||
 
430
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
431
      sql_command_type == SQLCOM_REPLACE ||
 
432
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
433
    share->filesystem_lock.scan_update_begin();
 
434
  else
 
435
    share->filesystem_lock.scan_begin();
 
436
 
 
437
  thread_locked = true;
 
438
}
 
439
 
 
440
void FilesystemCursor::critical_section_exit()
 
441
{
 
442
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
 
443
      sql_command_type == SQLCOM_UPDATE ||
 
444
      sql_command_type == SQLCOM_DELETE ||
 
445
      sql_command_type == SQLCOM_INSERT ||
 
446
      sql_command_type == SQLCOM_INSERT_SELECT ||
 
447
      sql_command_type == SQLCOM_REPLACE ||
 
448
      sql_command_type == SQLCOM_REPLACE_SELECT)
 
449
    share->filesystem_lock.scan_update_end();
 
450
  else
 
451
    share->filesystem_lock.scan_end();
 
452
 
 
453
  thread_locked = false;
 
454
}
 
455
 
 
456
FilesystemCursor::FilesystemCursor(drizzled::plugin::StorageEngine &engine_arg, Table &table_arg)
 
457
  : Cursor(engine_arg, table_arg),
 
458
    file_buff(new TransparentFile),
 
459
    thread_locked(false)
 
460
{
 
461
}
 
462
 
 
463
int FilesystemCursor::doOpen(const drizzled::TableIdentifier &identifier, int, uint32_t)
 
464
{
 
465
  if (!(share= get_share(identifier.getPath().c_str())))
 
466
    return ENOENT;
 
467
 
 
468
  file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
469
  if (file_desc < 0)
 
470
  {
 
471
    free_share();
 
472
    return ER_CANT_OPEN_FILE;
 
473
  }
 
474
 
 
475
  ref_length= sizeof(off_t);
 
476
  return 0;
 
477
}
 
478
 
 
479
int FilesystemCursor::close(void)
 
480
{
 
481
  int err= ::close(file_desc);
 
482
  if (err < 0)
 
483
    err= errno;
 
484
  free_share();
 
485
  return err;
 
486
}
 
487
 
 
488
int FilesystemCursor::doStartTableScan(bool)
 
489
{
 
490
  sql_command_type = session_sql_command(getTable()->getSession());
 
491
 
 
492
  if (thread_locked)
 
493
    critical_section_exit();
 
494
  critical_section_enter();
 
495
 
 
496
  if (share->format.isTagFormat())
 
497
  {
 
498
    tag_depth= 0;
 
499
    return 0;
 
500
  }
 
501
 
 
502
  current_position= 0;
 
503
  next_position= 0;
 
504
  slots.clear();
 
505
  if (share->needs_reopen)
 
506
  {
 
507
    file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
 
508
    if (file_desc < 0)
 
509
      return HA_ERR_CRASHED_ON_USAGE;
 
510
    share->needs_reopen= false;
 
511
  }
 
512
  file_buff->init_buff(file_desc);
 
513
  return 0;
 
514
}
 
515
 
 
516
int FilesystemCursor::find_current_row(unsigned char *buf)
 
517
{
 
518
  ptrdiff_t row_offset= buf - getTable()->record[0];
 
519
 
 
520
  next_position= current_position;
 
521
 
 
522
  string content;
 
523
  bool line_done= false;
 
524
  bool line_blank= true;
 
525
  Field **field= getTable()->getFields();
 
526
  for (; !line_done && *field; ++next_position)
 
527
  {
 
528
    char ch= file_buff->get_value(next_position);
 
529
    if (ch == '\0')
 
530
      return HA_ERR_END_OF_FILE;
 
531
 
 
532
    if (share->format.isEscapedChar(ch))
 
533
    {
 
534
      // read next character
 
535
      ch= file_buff->get_value(++next_position);
 
536
      if (ch == '\0')
 
537
        return HA_ERR_END_OF_FILE;
 
538
 
 
539
      content.push_back(FormatInfo::getEscapedChar(ch));
 
540
 
 
541
      continue;
 
542
    }
 
543
 
 
544
    // if we find separator
 
545
    bool is_row= share->format.isRowSeparator(ch);
 
546
    bool is_col= share->format.isColSeparator(ch);
 
547
    if (content.empty())
 
548
    {
 
549
      if (share->format.isSeparatorModeGeneral() && is_row && line_blank)
 
550
        continue;
 
551
      if (share->format.isSeparatorModeWeak() && is_col)
 
552
        continue;
 
553
    }
 
554
 
 
555
    if (is_row || is_col)
 
556
    {
 
557
      (*field)->move_field_offset(row_offset);
 
558
      if (!content.empty())
 
559
      {
 
560
        (*field)->set_notnull();
 
561
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
562
        {
 
563
          (*field)->setWriteSet();
 
564
          (*field)->store(content.c_str(),
 
565
                          (uint32_t)content.length(),
 
566
                          &my_charset_bin,
 
567
                          CHECK_FIELD_WARN);
 
568
        }
 
569
        else
 
570
          (*field)->set_default();
 
571
      }
 
572
      else
 
573
        (*field)->set_null();
 
574
      (*field)->move_field_offset(-row_offset);
 
575
 
 
576
      content.clear();
 
577
      ++field;
 
578
 
 
579
      line_blank= false;
 
580
      if (is_row)
 
581
        line_done= true;
 
582
 
 
583
      continue;
 
584
    }
 
585
    content.push_back(ch);
 
586
  }
 
587
  if (line_done)
 
588
  {
 
589
    for (; *field; ++field)
 
590
    {
 
591
      (*field)->move_field_offset(row_offset);
 
592
      (*field)->set_notnull();
 
593
      (*field)->set_default();
 
594
      (*field)->move_field_offset(-row_offset);
 
595
    }
 
596
  }
 
597
  else
 
598
  {
 
599
    // eat up characters when line_done
 
600
    while (!line_done)
 
601
    {
 
602
      char ch= file_buff->get_value(next_position);
 
603
      if (share->format.isRowSeparator(ch))
 
604
        line_done= true;
 
605
      ++next_position;
 
606
    }
 
607
  }
 
608
  return 0;
 
609
}
 
610
 
 
611
int FilesystemCursor::rnd_next(unsigned char *buf)
 
612
{
 
613
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
614
  if (share->format.isTagFormat())
 
615
  {
 
616
    if (tag_depth >= share->vm.size())
 
617
      return HA_ERR_END_OF_FILE;
 
618
 
 
619
    ptrdiff_t row_offset= buf - getTable()->record[0];
 
620
    for (Field **field= getTable()->getFields(); *field; field++)
 
621
    {
 
622
      string key((*field)->field_name);
 
623
      string content= share->vm[tag_depth][key];
 
624
 
 
625
      (*field)->move_field_offset(row_offset);
 
626
      if (!content.empty())
 
627
      {
 
628
        (*field)->set_notnull();
 
629
        if ((*field)->isReadSet() || (*field)->isWriteSet())
 
630
        {
 
631
          (*field)->setWriteSet();
 
632
          (*field)->store(content.c_str(),
 
633
                          (uint32_t)content.length(),
 
634
                          &my_charset_bin,
 
635
                          CHECK_FIELD_WARN);
 
636
        }
 
637
        else
 
638
        {
 
639
          (*field)->set_default();
 
640
        }
 
641
      }
 
642
      else
 
643
      {
 
644
        (*field)->set_null();
 
645
      }
 
646
      (*field)->move_field_offset(-row_offset);
 
647
    }
 
648
    ++tag_depth;
 
649
    return 0;
 
650
  }
 
651
  // normal file
 
652
  current_position= next_position;
 
653
  return find_current_row(buf);
 
654
}
 
655
 
 
656
void FilesystemCursor::position(const unsigned char *)
 
657
{
 
658
  *reinterpret_cast<off_t *>(ref)= current_position;
 
659
}
 
660
 
 
661
int FilesystemCursor::rnd_pos(unsigned char * buf, unsigned char *pos)
 
662
{
 
663
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
 
664
  current_position= *reinterpret_cast<off_t *>(pos);
 
665
  return find_current_row(buf);
 
666
}
 
667
 
 
668
int FilesystemCursor::info(uint32_t)
 
669
{
 
670
  if (stats.records < 2)
 
671
    stats.records= 2;
 
672
  return 0;
 
673
}
 
674
 
 
675
int FilesystemCursor::openUpdateFile()
 
676
{
 
677
  if (!share->update_file_opened)
 
678
  {
 
679
    struct stat st;
 
680
    if (stat(share->format.getFileName().c_str(), &st) < 0)
 
681
      return -1;
 
682
    update_file_name= share->format.getFileName();
 
683
    update_file_name.append(".UPDATE");
 
684
    unlink(update_file_name.c_str());
 
685
    update_file_desc= ::open(update_file_name.c_str(),
 
686
                             O_RDWR | O_CREAT | O_TRUNC,
 
687
                             st.st_mode);
 
688
    if (update_file_desc < 0)
 
689
    {
 
690
      return -1;
 
691
    }
 
692
    share->update_file_opened= true;
 
693
  }
 
694
  return 0;
 
695
}
 
696
 
 
697
int FilesystemCursor::doEndTableScan()
 
698
{
 
699
  sql_command_type = session_sql_command(getTable()->getSession());
 
700
 
 
701
  if (share->format.isTagFormat())
 
702
  {
 
703
    if (thread_locked)
 
704
      critical_section_exit();
 
705
    return 0;
 
706
  }
 
707
 
 
708
  if (slots.size() == 0)
 
709
  {
 
710
    if (thread_locked)
 
711
      critical_section_exit();
 
712
    return 0;
 
713
  }
 
714
 
 
715
  int err= -1;
 
716
  sort(slots.begin(), slots.end());
 
717
  vector< pair<off_t, off_t> >::iterator slot_iter= slots.begin();
 
718
  off_t write_start= 0;
 
719
  off_t write_end= 0;
 
720
  off_t file_buffer_start= 0;
 
721
 
 
722
  pthread_mutex_lock(&share->mutex);
 
723
 
 
724
  file_buff->init_buff(file_desc);
 
725
  if (openUpdateFile() < 0)
 
726
    goto error;
 
727
 
 
728
  while (file_buffer_start != -1)
 
729
  {
 
730
    bool in_hole= false;
 
731
 
 
732
    write_end= file_buff->end();
 
733
    if (slot_iter != slots.end() &&
 
734
      write_end >= slot_iter->first)
 
735
    {
 
736
      write_end= slot_iter->first;
 
737
      in_hole= true;
 
738
    }
 
739
 
 
740
    off_t write_length= write_end - write_start;
 
741
    if (write_in_all(update_file_desc,
 
742
               file_buff->ptr() + (write_start - file_buff->start()),
 
743
               write_length) != write_length)
 
744
      goto error;
 
745
 
 
746
    if (in_hole)
 
747
    {
 
748
      while (file_buff->end() <= slot_iter->second && file_buffer_start != -1)
 
749
        file_buffer_start= file_buff->read_next();
 
750
      write_start= slot_iter->second;
 
751
      ++slot_iter;
 
752
    }
 
753
    else
 
754
      write_start= write_end;
 
755
 
 
756
    if (write_end == file_buff->end())
 
757
      file_buffer_start= file_buff->read_next();
 
758
  }
 
759
  // close update file
 
760
  if (::fsync(update_file_desc) || 
 
761
      ::close(update_file_desc))
 
762
    goto error;
 
763
  share->update_file_opened= false;
 
764
 
 
765
  // close current file
 
766
  if (::close(file_desc))
 
767
    goto error;
 
768
  if (::rename(update_file_name.c_str(), share->format.getFileName().c_str()))
 
769
    goto error;
 
770
 
 
771
  share->needs_reopen= true;
 
772
 
 
773
error:
 
774
  err= errno;
 
775
  pthread_mutex_unlock(&share->mutex);
 
776
 
 
777
  if (thread_locked)
 
778
    critical_section_exit();
 
779
 
 
780
  return err;
 
781
}
 
782
 
 
783
void FilesystemCursor::recordToString(string& output)
 
784
{
 
785
  bool first= true;
 
786
  drizzled::String attribute;
 
787
  for (Field **field= getTable()->getFields(); *field; ++field)
 
788
  {
 
789
    if (first == true)
 
790
    {
 
791
      first= false;
 
792
    }
 
793
    else
 
794
    {
 
795
      output.append(share->format.getColSeparatorHead());
 
796
    }
 
797
 
 
798
    if (not (*field)->is_null())
 
799
    {
 
800
      (*field)->setReadSet();
 
801
      (*field)->val_str(&attribute, &attribute);
 
802
 
 
803
      output.append(attribute.ptr(), attribute.length());
 
804
    }
 
805
    else
 
806
    {
 
807
      output.append("0");
 
808
    }
 
809
  }
 
810
  output.append(share->format.getRowSeparatorHead());
 
811
}
 
812
 
 
813
int FilesystemCursor::doInsertRecord(unsigned char * buf)
 
814
{
 
815
  (void)buf;
 
816
 
 
817
  if (share->format.isTagFormat())
 
818
    return 0;
 
819
 
 
820
  sql_command_type = session_sql_command(getTable()->getSession());
 
821
 
 
822
  critical_section_enter();
 
823
 
 
824
  int err_write= 0;
 
825
  int err_close= 0;
 
826
 
 
827
  string output_line;
 
828
  recordToString(output_line);
 
829
 
 
830
  int fd= ::open(share->format.getFileName().c_str(), O_WRONLY | O_APPEND);
 
831
  if (fd < 0)
 
832
  {
 
833
    critical_section_exit();
 
834
    return ENOENT;
 
835
  }
 
836
 
 
837
  err_write= write_in_all(fd, output_line.c_str(), output_line.length());
 
838
  if (err_write < 0)
 
839
    err_write= errno;
 
840
  else
 
841
    err_write= 0;
 
842
 
 
843
  err_close= ::close(fd);
 
844
  if (err_close < 0)
 
845
    err_close= errno;
 
846
 
 
847
  critical_section_exit();
 
848
 
 
849
  if (err_write)
 
850
    return err_write;
 
851
  if (err_close)
 
852
    return err_close;
 
853
  return 0;
 
854
}
 
855
 
 
856
int FilesystemCursor::doUpdateRecord(const unsigned char *, unsigned char *)
 
857
{
 
858
  if (share->format.isTagFormat())
 
859
    return 0;
 
860
  if (openUpdateFile())
 
861
    return errno;
 
862
 
 
863
  // get the update information
 
864
  string str;
 
865
  recordToString(str);
 
866
 
 
867
  if (write_in_all(update_file_desc, str.c_str(), str.length()) < 0)
 
868
    return errno;
 
869
 
 
870
  addSlot();
 
871
 
 
872
  return 0;
 
873
}
 
874
 
 
875
void FilesystemCursor::addSlot()
 
876
{
 
877
  if (slots.size() > 0 && slots.back().second == current_position)
 
878
    slots.back().second= next_position;
 
879
  else
 
880
    slots.push_back(make_pair(current_position, next_position));
 
881
}
 
882
 
 
883
int FilesystemCursor::doDeleteRecord(const unsigned char *)
 
884
{
 
885
  if (share->format.isTagFormat())
 
886
    return 0;
 
887
  addSlot();
 
888
  return 0;
 
889
}
 
890
 
 
891
int FilesystemEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
 
892
{
 
893
  if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), FILESYSTEM_EXT))
 
894
    return errno;
 
895
  return 0;
 
896
}
 
897
 
 
898
bool FilesystemEngine::validateCreateTableOption(const std::string &key,
 
899
                                                 const std::string &state)
 
900
{
 
901
  return FormatInfo::validateOption(key, state);
 
902
}
 
903
 
 
904
int FilesystemEngine::doCreateTable(Session &,
 
905
                        Table&,
 
906
                        const drizzled::TableIdentifier &identifier,
 
907
                        drizzled::message::Table &proto)
 
908
{
 
909
  FormatInfo format;
 
910
  format.parseFromTable(&proto);
 
911
  if (format.isFileGiven())
 
912
  {
 
913
    int err= ::open(format.getFileName().c_str(), O_RDONLY);
 
914
    if (err < 0)
 
915
      return errno;
 
916
  }
 
917
 
 
918
  string new_path(identifier.getPath());
 
919
  new_path+= FILESYSTEM_EXT;
 
920
  fstream output(new_path.c_str(), ios::out | ios::binary);
 
921
 
 
922
  if (! output)
 
923
    return 1;
 
924
 
 
925
  if (! proto.SerializeToOstream(&output))
 
926
  {
 
927
    output.close();
 
928
    unlink(new_path.c_str());
 
929
    return 1;
 
930
  }
 
931
 
 
932
  return 0;
 
933
}
 
934
 
 
935
static FilesystemEngine *filesystem_engine= NULL;
 
936
 
 
937
static int filesystem_init_func(drizzled::module::Context &context)
 
938
{
 
939
  filesystem_engine= new FilesystemEngine("FILESYSTEM");
 
940
  context.add(filesystem_engine);
 
941
 
 
942
  return 0;
 
943
}
 
944
 
 
945
DRIZZLE_DECLARE_PLUGIN
 
946
{
 
947
  DRIZZLE_VERSION_ID,
 
948
  "FILESYSTEM",
 
949
  "1.0",
 
950
  "Zimin",
 
951
  "Filesystem Engine",
 
952
  PLUGIN_LICENSE_GPL,
 
953
  filesystem_init_func, /* Plugin Init */
 
954
  NULL,                       /* system variables                */
 
955
  NULL                        /* config options                  */
 
956
}
 
957
DRIZZLE_DECLARE_PLUGIN_END;