~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/filesystem_engine/filesystem_engine.cc

  • Committer: Stewart Smith
  • Date: 2008-11-21 16:06:07 UTC
  • mto: This revision was merged to the branch mainline in revision 593.
  • Revision ID: stewart@flamingspork.com-20081121160607-n6gdlt013spuo54r
remove mysql_frm_type
and fix engines to return correct value from delete_table when table doesn't exist.
(it should be ENOENT).

Also fix up some tests that manipulated frm files by hand. These tests are no longer valid and will need to be rewritten in the not too distant future.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
  Copyright (C) 2010 Zimin
3
 
 
4
 
  This program is free software; you can redistribute it and/or
5
 
  modify it under the terms of the GNU General Public License
6
 
  as published by the Free Software Foundation; either version 2
7
 
  of the License, or (at your option) any later version.
8
 
 
9
 
  This program is distributed in the hope that it will be useful,
10
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
  GNU General Public License for more details.
13
 
 
14
 
  You should have received a copy of the GNU General Public License
15
 
  along with this program; if not, write to the Free Software
16
 
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17
 
*/
18
 
 
19
 
#include "config.h"
20
 
#include <drizzled/field.h>
21
 
#include <drizzled/field/blob.h>
22
 
#include <drizzled/field/timestamp.h>
23
 
#include <drizzled/error.h>
24
 
#include <drizzled/table.h>
25
 
#include <drizzled/session.h>
26
 
#include "drizzled/internal/my_sys.h"
27
 
#include <google/protobuf/io/zero_copy_stream.h>
28
 
#include <google/protobuf/io/zero_copy_stream_impl.h>
29
 
 
30
 
#include "filesystem_engine.h"
31
 
#include "utility.h"
32
 
 
33
 
#include <fcntl.h>
34
 
 
35
 
#include <string>
36
 
#include <map>
37
 
#include <fstream>
38
 
#include <sstream>
39
 
#include <iostream>
40
 
#include <boost/algorithm/string.hpp>
41
 
 
42
 
using namespace std;
43
 
using namespace drizzled;
44
 
 
45
 
#define FILESYSTEM_EXT ".FST"
46
 
 
47
 
/* Stuff for shares */
48
 
pthread_mutex_t filesystem_mutex;
49
 
 
50
 
static const char *ha_filesystem_exts[] = {
51
 
  FILESYSTEM_EXT,
52
 
  NULL
53
 
};
54
 
 
55
 
class FilesystemEngine : public drizzled::plugin::StorageEngine
56
 
{
57
 
private:
58
 
  typedef std::map<string, FilesystemTableShare*> FilesystemMap;
59
 
  FilesystemMap fs_open_tables;
60
 
public:
61
 
  FilesystemEngine(const string& name_arg)
62
 
   : drizzled::plugin::StorageEngine(name_arg,
63
 
                                     HTON_NULL_IN_KEY |
64
 
                                     HTON_SKIP_STORE_LOCK |
65
 
                                     HTON_CAN_INDEX_BLOBS |
66
 
                                     HTON_AUTO_PART_KEY),
67
 
     fs_open_tables()
68
 
  {
69
 
    table_definition_ext= FILESYSTEM_EXT;
70
 
    pthread_mutex_init(&filesystem_mutex, MY_MUTEX_INIT_FAST);
71
 
  }
72
 
  virtual ~FilesystemEngine()
73
 
  {
74
 
    pthread_mutex_destroy(&filesystem_mutex);
75
 
  }
76
 
 
77
 
  virtual Cursor *create(TableShare &table)
78
 
  {
79
 
    return new FilesystemCursor(*this, table);
80
 
  }
81
 
 
82
 
  const char **bas_ext() const {
83
 
    return ha_filesystem_exts;
84
 
  }
85
 
 
86
 
  bool validateCreateTableOption(const std::string &key, const std::string &state);
87
 
 
88
 
  int doCreateTable(Session &,
89
 
                    Table &table_arg,
90
 
                    const drizzled::TableIdentifier &identifier,
91
 
                    drizzled::message::Table&);
92
 
 
93
 
  int doGetTableDefinition(Session& ,
94
 
                           const drizzled::TableIdentifier &,
95
 
                           drizzled::message::Table &);
96
 
 
97
 
  int doDropTable(Session&, const TableIdentifier &);
98
 
 
99
 
  /* operations on FilesystemTableShare */
100
 
  FilesystemTableShare *findOpenTable(const string table_name);
101
 
  void addOpenTable(const string &table_name, FilesystemTableShare *);
102
 
  void deleteOpenTable(const string &table_name);
103
 
 
104
 
  uint32_t max_keys()          const { return 0; }
105
 
  uint32_t max_key_parts()     const { return 0; }
106
 
  uint32_t max_key_length()    const { return 0; }
107
 
  bool doDoesTableExist(Session& , const TableIdentifier &);
108
 
  int doRenameTable(Session&, const TableIdentifier &, const TableIdentifier &);
109
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
110
 
                             const drizzled::SchemaIdentifier &schema_identifier,
111
 
                             drizzled::TableIdentifiers &set_of_identifiers);
112
 
private:
113
 
  void getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
114
 
                                   const drizzled::SchemaIdentifier &schema_identifier,
115
 
                                   drizzled::plugin::TableNameList *set_of_names,
116
 
                                   drizzled::TableIdentifiers *set_of_identifiers);
117
 
};
118
 
 
119
 
void FilesystemEngine::getTableNamesFromFilesystem(drizzled::CachedDirectory &directory,
120
 
                                                   const drizzled::SchemaIdentifier &schema_identifier,
121
 
                                                   drizzled::plugin::TableNameList *set_of_names,
122
 
                                                   drizzled::TableIdentifiers *set_of_identifiers)
123
 
{
124
 
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
125
 
 
126
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
127
 
      entry_iter != entries.end();
128
 
      ++entry_iter)
129
 
  {
130
 
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
131
 
    const string *filename= &entry->filename;
132
 
 
133
 
    assert(not filename->empty());
134
 
 
135
 
    string::size_type suffix_pos= filename->rfind('.');
136
 
 
137
 
    if (suffix_pos != string::npos &&
138
 
        boost::iequals(filename->substr(suffix_pos), FILESYSTEM_EXT) &&
139
 
        filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX))
140
 
    {
141
 
      char uname[NAME_LEN + 1];
142
 
      uint32_t file_name_len;
143
 
 
144
 
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
145
 
      uname[file_name_len - sizeof(FILESYSTEM_EXT) + 1]= '\0';
146
 
      if (set_of_names)
147
 
        set_of_names->insert(uname);
148
 
      if (set_of_identifiers)
149
 
        set_of_identifiers->push_back(TableIdentifier(schema_identifier, uname));
150
 
    }
151
 
  }
152
 
}
153
 
 
154
 
void FilesystemEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
155
 
                                             const drizzled::SchemaIdentifier &schema_identifier,
156
 
                                             drizzled::TableIdentifiers &set_of_identifiers)
157
 
{
158
 
  getTableNamesFromFilesystem(directory, schema_identifier, NULL, &set_of_identifiers);
159
 
}
160
 
 
161
 
int FilesystemEngine::doDropTable(Session &, const TableIdentifier &identifier)
162
 
{
163
 
  string new_path(identifier.getPath());
164
 
  new_path+= FILESYSTEM_EXT;
165
 
  int err= unlink(new_path.c_str());
166
 
  if (err)
167
 
  {
168
 
    err= errno;
169
 
  }
170
 
  return err;
171
 
}
172
 
 
173
 
bool FilesystemEngine::doDoesTableExist(Session &, const TableIdentifier &identifier)
174
 
{
175
 
  string proto_path(identifier.getPath());
176
 
  proto_path.append(FILESYSTEM_EXT);
177
 
 
178
 
  if (access(proto_path.c_str(), F_OK))
179
 
  {
180
 
    return false;
181
 
  }
182
 
 
183
 
  return true;
184
 
}
185
 
 
186
 
FilesystemTableShare *FilesystemEngine::findOpenTable(const string table_name)
187
 
{
188
 
  FilesystemMap::iterator find_iter=
189
 
    fs_open_tables.find(table_name);
190
 
 
191
 
  if (find_iter != fs_open_tables.end())
192
 
    return (*find_iter).second;
193
 
  else
194
 
    return NULL;
195
 
}
196
 
 
197
 
void FilesystemEngine::addOpenTable(const string &table_name, FilesystemTableShare *share)
198
 
{
199
 
  fs_open_tables[table_name]= share;
200
 
}
201
 
 
202
 
void FilesystemEngine::deleteOpenTable(const string &table_name)
203
 
{
204
 
  fs_open_tables.erase(table_name);
205
 
}
206
 
 
207
 
static int parseTaggedFile(const FormatInfo &fi, vector< map<string, string> > &v)
208
 
{
209
 
  int filedesc= ::open(fi.getFileName().c_str(), O_RDONLY);
210
 
  if (filedesc < 0)
211
 
    return errno;
212
 
 
213
 
  boost::scoped_ptr<TransparentFile> filebuffer(new TransparentFile);
214
 
  filebuffer->init_buff(filedesc);
215
 
 
216
 
  bool last_line_empty= false;
217
 
  map<string, string> kv;
218
 
  int pos= 0;
219
 
  string line;
220
 
  while (1)
221
 
  {
222
 
    char ch= filebuffer->get_value(pos);
223
 
    if (ch == '\0')
224
 
    {
225
 
      if (!last_line_empty)
226
 
      {
227
 
        v.push_back(kv);
228
 
        kv.clear();
229
 
      }
230
 
      break;
231
 
    }
232
 
    ++pos;
233
 
 
234
 
    if (!fi.isRowSeparator(ch))
235
 
    {
236
 
      line.push_back(ch);
237
 
      continue;
238
 
    }
239
 
 
240
 
    // if we have a new empty line,
241
 
    // it means we got the end of a section, push it to vector
242
 
    if (line.empty())
243
 
    {
244
 
      if (!last_line_empty)
245
 
      {
246
 
        v.push_back(kv);
247
 
        kv.clear();
248
 
      }
249
 
      last_line_empty= true;
250
 
      continue;
251
 
    }
252
 
 
253
 
    // parse the line
254
 
    vector<string> sv, svcopy;
255
 
    boost::split(sv, line, boost::is_any_of(fi.getColSeparator()));
256
 
    for (vector<string>::iterator iter= sv.begin();
257
 
         iter != sv.end();
258
 
         ++iter)
259
 
    {
260
 
      if (!iter->empty())
261
 
        svcopy.push_back(*iter);
262
 
    }
263
 
 
264
 
    // the first splitted string as key,
265
 
    // and the second splitted string as value.
266
 
    string key(svcopy[0]);
267
 
    boost::trim(key);
268
 
    if (svcopy.size() >= 2)
269
 
    {
270
 
      string value(svcopy[1]);
271
 
      boost::trim(value);
272
 
      kv[key]= value;
273
 
    }
274
 
    else if (svcopy.size() >= 1)
275
 
      kv[key]= "";
276
 
 
277
 
    last_line_empty= false;
278
 
    line.clear();
279
 
  }
280
 
  close(filedesc);
281
 
  return 0;
282
 
}
283
 
 
284
 
int FilesystemEngine::doGetTableDefinition(Session &,
285
 
                                           const drizzled::TableIdentifier &identifier,
286
 
                                           drizzled::message::Table &table_proto)
287
 
{
288
 
  string new_path(identifier.getPath());
289
 
  new_path.append(FILESYSTEM_EXT);
290
 
 
291
 
  int fd= ::open(new_path.c_str(), O_RDONLY);
292
 
  if (fd < 0)
293
 
    return ENOENT;
294
 
 
295
 
  google::protobuf::io::ZeroCopyInputStream* input=
296
 
    new google::protobuf::io::FileInputStream(fd);
297
 
 
298
 
  if (not input)
299
 
    return HA_ERR_CRASHED_ON_USAGE;
300
 
 
301
 
  if (not table_proto.ParseFromZeroCopyStream(input))
302
 
  {
303
 
    close(fd);
304
 
    delete input;
305
 
    if (not table_proto.IsInitialized())
306
 
    {
307
 
      my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
308
 
               table_proto.InitializationErrorString().c_str());
309
 
      return ER_CORRUPT_TABLE_DEFINITION;
310
 
    }
311
 
 
312
 
    return HA_ERR_CRASHED_ON_USAGE;
313
 
  }
314
 
  delete input;
315
 
 
316
 
  // if the file is a tagged file such as /proc/meminfo
317
 
  // then columns of this table are added dynamically here.
318
 
  FormatInfo format;
319
 
  format.parseFromTable(&table_proto);
320
 
  if (!format.isTagFormat() || !format.isFileGiven()) {
321
 
    close(fd);
322
 
    return EEXIST;
323
 
  }
324
 
 
325
 
  vector< map<string, string> > vm;
326
 
  if (parseTaggedFile(format, vm) != 0) {
327
 
    close(fd);
328
 
 
329
 
    return EEXIST;
330
 
  }
331
 
  if (vm.size() == 0) {
332
 
    close(fd);
333
 
    return EEXIST;
334
 
  }
335
 
 
336
 
  // we don't care what user provides, just clear them all
337
 
  table_proto.clear_field();
338
 
  // we take the first section as sample
339
 
  map<string, string> kv= vm[0];
340
 
  for (map<string, string>::iterator iter= kv.begin();
341
 
       iter != kv.end();
342
 
       ++iter)
343
 
  {
344
 
    // add columns to table proto
345
 
    message::Table::Field *field= table_proto.add_field();
346
 
    field->set_name(iter->first);
347
 
    field->set_type(drizzled::message::Table::Field::VARCHAR);
348
 
    message::Table::Field::StringFieldOptions *stringoption= field->mutable_string_options();
349
 
    stringoption->set_length(iter->second.length() + 1);
350
 
  }
351
 
 
352
 
  close(fd);
353
 
  return EEXIST;
354
 
}
355
 
 
356
 
FilesystemTableShare::FilesystemTableShare(const string table_name_arg)
357
 
  : use_count(0), table_name(table_name_arg),
358
 
  update_file_opened(false),
359
 
  needs_reopen(false)
360
 
{
361
 
}
362
 
 
363
 
FilesystemTableShare::~FilesystemTableShare()
364
 
{
365
 
  pthread_mutex_destroy(&mutex);
366
 
}
367
 
 
368
 
FilesystemTableShare *FilesystemCursor::get_share(const char *table_name)
369
 
{
370
 
  Guard g(filesystem_mutex);
371
 
 
372
 
  FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(engine);
373
 
  share= a_engine->findOpenTable(table_name);
374
 
 
375
 
  /*
376
 
    If share is not present in the hash, create a new share and
377
 
    initialize its members.
378
 
  */
379
 
  if (share == NULL)
380
 
  {
381
 
    share= new (nothrow) FilesystemTableShare(table_name);
382
 
    if (share == NULL)
383
 
    {
384
 
      return NULL;
385
 
    }
386
 
 
387
 
    share->format.parseFromTable(table->getShare()->getTableProto());
388
 
    if (!share->format.isFileGiven())
389
 
    {
390
 
      return NULL;
391
 
    }
392
 
    /*
393
 
     * for taggered file such as /proc/meminfo,
394
 
     * we pre-process it first, and store the parsing result in a map.
395
 
     */
396
 
    if (share->format.isTagFormat())
397
 
    {
398
 
      if (parseTaggedFile(share->format, share->vm) != 0)
399
 
      {
400
 
        return NULL;
401
 
      }
402
 
    }
403
 
    a_engine->addOpenTable(share->table_name, share);
404
 
 
405
 
    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
406
 
  }
407
 
  share->use_count++;
408
 
 
409
 
  return share;
410
 
}
411
 
 
412
 
void FilesystemCursor::free_share()
413
 
{
414
 
  Guard g(filesystem_mutex);
415
 
 
416
 
  if (!--share->use_count){
417
 
    FilesystemEngine *a_engine= static_cast<FilesystemEngine *>(engine);
418
 
    a_engine->deleteOpenTable(share->table_name);
419
 
    pthread_mutex_destroy(&share->mutex);
420
 
    delete share;
421
 
  }
422
 
}
423
 
 
424
 
void FilesystemCursor::critical_section_enter()
425
 
{
426
 
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
427
 
      sql_command_type == SQLCOM_UPDATE ||
428
 
      sql_command_type == SQLCOM_DELETE ||
429
 
      sql_command_type == SQLCOM_INSERT ||
430
 
      sql_command_type == SQLCOM_INSERT_SELECT ||
431
 
      sql_command_type == SQLCOM_REPLACE ||
432
 
      sql_command_type == SQLCOM_REPLACE_SELECT)
433
 
    share->filesystem_lock.scan_update_begin();
434
 
  else
435
 
    share->filesystem_lock.scan_begin();
436
 
 
437
 
  thread_locked = true;
438
 
}
439
 
 
440
 
void FilesystemCursor::critical_section_exit()
441
 
{
442
 
  if (sql_command_type == SQLCOM_ALTER_TABLE ||
443
 
      sql_command_type == SQLCOM_UPDATE ||
444
 
      sql_command_type == SQLCOM_DELETE ||
445
 
      sql_command_type == SQLCOM_INSERT ||
446
 
      sql_command_type == SQLCOM_INSERT_SELECT ||
447
 
      sql_command_type == SQLCOM_REPLACE ||
448
 
      sql_command_type == SQLCOM_REPLACE_SELECT)
449
 
    share->filesystem_lock.scan_update_end();
450
 
  else
451
 
    share->filesystem_lock.scan_end();
452
 
 
453
 
  thread_locked = false;
454
 
}
455
 
 
456
 
FilesystemCursor::FilesystemCursor(drizzled::plugin::StorageEngine &engine_arg, TableShare &table_arg)
457
 
  : Cursor(engine_arg, table_arg),
458
 
    file_buff(new TransparentFile),
459
 
    thread_locked(false)
460
 
{
461
 
}
462
 
 
463
 
int FilesystemCursor::doOpen(const drizzled::TableIdentifier &identifier, int, uint32_t)
464
 
{
465
 
  if (!(share= get_share(identifier.getPath().c_str())))
466
 
    return ENOENT;
467
 
 
468
 
  file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
469
 
  if (file_desc < 0)
470
 
  {
471
 
    free_share();
472
 
    return ER_CANT_OPEN_FILE;
473
 
  }
474
 
 
475
 
  ref_length= sizeof(off_t);
476
 
  return 0;
477
 
}
478
 
 
479
 
int FilesystemCursor::close(void)
480
 
{
481
 
  int err= ::close(file_desc);
482
 
  if (err < 0)
483
 
    err= errno;
484
 
  free_share();
485
 
  return err;
486
 
}
487
 
 
488
 
int FilesystemCursor::doStartTableScan(bool)
489
 
{
490
 
  sql_command_type = session_sql_command(table->getSession());
491
 
 
492
 
  if (thread_locked)
493
 
    critical_section_exit();
494
 
  critical_section_enter();
495
 
 
496
 
  if (share->format.isTagFormat())
497
 
  {
498
 
    tag_depth= 0;
499
 
    return 0;
500
 
  }
501
 
 
502
 
  current_position= 0;
503
 
  next_position= 0;
504
 
  slots.clear();
505
 
  if (share->needs_reopen)
506
 
  {
507
 
    file_desc= ::open(share->format.getFileName().c_str(), O_RDONLY);
508
 
    if (file_desc < 0)
509
 
      return HA_ERR_CRASHED_ON_USAGE;
510
 
    share->needs_reopen= false;
511
 
  }
512
 
  file_buff->init_buff(file_desc);
513
 
  return 0;
514
 
}
515
 
 
516
 
int FilesystemCursor::find_current_row(unsigned char *buf)
517
 
{
518
 
  ptrdiff_t row_offset= buf - table->record[0];
519
 
 
520
 
  next_position= current_position;
521
 
 
522
 
  string content;
523
 
  bool line_done= false;
524
 
  bool line_blank= true;
525
 
  Field **field= table->getFields();
526
 
  for (; !line_done && *field; ++next_position)
527
 
  {
528
 
    char ch= file_buff->get_value(next_position);
529
 
    if (ch == '\0')
530
 
      return HA_ERR_END_OF_FILE;
531
 
 
532
 
    if (share->format.isEscapedChar(ch))
533
 
    {
534
 
      // read next character
535
 
      ch= file_buff->get_value(++next_position);
536
 
      if (ch == '\0')
537
 
        return HA_ERR_END_OF_FILE;
538
 
 
539
 
      content.push_back(FormatInfo::getEscapedChar(ch));
540
 
 
541
 
      continue;
542
 
    }
543
 
 
544
 
    // if we find separator
545
 
    bool is_row= share->format.isRowSeparator(ch);
546
 
    bool is_col= share->format.isColSeparator(ch);
547
 
    if (content.empty())
548
 
    {
549
 
      if (share->format.isSeparatorModeGeneral() && is_row && line_blank)
550
 
        continue;
551
 
      if (share->format.isSeparatorModeWeak() && is_col)
552
 
        continue;
553
 
    }
554
 
 
555
 
    if (is_row || is_col)
556
 
    {
557
 
      (*field)->move_field_offset(row_offset);
558
 
      if (!content.empty())
559
 
      {
560
 
        (*field)->set_notnull();
561
 
        if ((*field)->isReadSet() || (*field)->isWriteSet())
562
 
        {
563
 
          (*field)->setWriteSet();
564
 
          (*field)->store(content.c_str(),
565
 
                          (uint32_t)content.length(),
566
 
                          &my_charset_bin,
567
 
                          CHECK_FIELD_WARN);
568
 
        }
569
 
        else
570
 
          (*field)->set_default();
571
 
      }
572
 
      else
573
 
        (*field)->set_null();
574
 
      (*field)->move_field_offset(-row_offset);
575
 
 
576
 
      content.clear();
577
 
      ++field;
578
 
 
579
 
      line_blank= false;
580
 
      if (is_row)
581
 
        line_done= true;
582
 
 
583
 
      continue;
584
 
    }
585
 
    content.push_back(ch);
586
 
  }
587
 
  if (line_done)
588
 
  {
589
 
    for (; *field; ++field)
590
 
    {
591
 
      (*field)->move_field_offset(row_offset);
592
 
      (*field)->set_notnull();
593
 
      (*field)->set_default();
594
 
      (*field)->move_field_offset(-row_offset);
595
 
    }
596
 
  }
597
 
  else
598
 
  {
599
 
    // eat up characters when line_done
600
 
    while (!line_done)
601
 
    {
602
 
      char ch= file_buff->get_value(next_position);
603
 
      if (share->format.isRowSeparator(ch))
604
 
        line_done= true;
605
 
      ++next_position;
606
 
    }
607
 
  }
608
 
  return 0;
609
 
}
610
 
 
611
 
int FilesystemCursor::rnd_next(unsigned char *buf)
612
 
{
613
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
614
 
  if (share->format.isTagFormat())
615
 
  {
616
 
    if (tag_depth >= share->vm.size())
617
 
      return HA_ERR_END_OF_FILE;
618
 
 
619
 
    ptrdiff_t row_offset= buf - table->record[0];
620
 
    for (Field **field= table->getFields(); *field; field++)
621
 
    {
622
 
      string key((*field)->field_name);
623
 
      string content= share->vm[tag_depth][key];
624
 
 
625
 
      (*field)->move_field_offset(row_offset);
626
 
      if (!content.empty())
627
 
      {
628
 
        (*field)->set_notnull();
629
 
        if ((*field)->isReadSet() || (*field)->isWriteSet())
630
 
        {
631
 
          (*field)->setWriteSet();
632
 
          (*field)->store(content.c_str(),
633
 
                          (uint32_t)content.length(),
634
 
                          &my_charset_bin,
635
 
                          CHECK_FIELD_WARN);
636
 
        }
637
 
        else
638
 
        {
639
 
          (*field)->set_default();
640
 
        }
641
 
      }
642
 
      else
643
 
      {
644
 
        (*field)->set_null();
645
 
      }
646
 
      (*field)->move_field_offset(-row_offset);
647
 
    }
648
 
    ++tag_depth;
649
 
    return 0;
650
 
  }
651
 
  // normal file
652
 
  current_position= next_position;
653
 
  return find_current_row(buf);
654
 
}
655
 
 
656
 
void FilesystemCursor::position(const unsigned char *)
657
 
{
658
 
  *reinterpret_cast<off_t *>(ref)= current_position;
659
 
}
660
 
 
661
 
int FilesystemCursor::rnd_pos(unsigned char * buf, unsigned char *pos)
662
 
{
663
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_count);
664
 
  current_position= *reinterpret_cast<off_t *>(pos);
665
 
  return find_current_row(buf);
666
 
}
667
 
 
668
 
int FilesystemCursor::info(uint32_t)
669
 
{
670
 
  if (stats.records < 2)
671
 
    stats.records= 2;
672
 
  return 0;
673
 
}
674
 
 
675
 
int FilesystemCursor::openUpdateFile()
676
 
{
677
 
  if (!share->update_file_opened)
678
 
  {
679
 
    struct stat st;
680
 
    if (stat(share->format.getFileName().c_str(), &st) < 0)
681
 
      return -1;
682
 
    update_file_name= share->format.getFileName();
683
 
    update_file_name.append(".UPDATE");
684
 
    unlink(update_file_name.c_str());
685
 
    update_file_desc= ::open(update_file_name.c_str(),
686
 
                             O_RDWR | O_CREAT | O_TRUNC,
687
 
                             st.st_mode);
688
 
    if (update_file_desc < 0)
689
 
    {
690
 
      return -1;
691
 
    }
692
 
    share->update_file_opened= true;
693
 
  }
694
 
  return 0;
695
 
}
696
 
 
697
 
int FilesystemCursor::doEndTableScan()
698
 
{
699
 
  sql_command_type = session_sql_command(table->getSession());
700
 
 
701
 
  if (share->format.isTagFormat())
702
 
  {
703
 
    if (thread_locked)
704
 
      critical_section_exit();
705
 
    return 0;
706
 
  }
707
 
 
708
 
  if (slots.size() == 0)
709
 
  {
710
 
    if (thread_locked)
711
 
      critical_section_exit();
712
 
    return 0;
713
 
  }
714
 
 
715
 
  int err= -1;
716
 
  sort(slots.begin(), slots.end());
717
 
  vector< pair<off_t, off_t> >::iterator slot_iter= slots.begin();
718
 
  off_t write_start= 0;
719
 
  off_t write_end= 0;
720
 
  off_t file_buffer_start= 0;
721
 
 
722
 
  pthread_mutex_lock(&share->mutex);
723
 
 
724
 
  file_buff->init_buff(file_desc);
725
 
  if (openUpdateFile() < 0)
726
 
    goto error;
727
 
 
728
 
  while (file_buffer_start != -1)
729
 
  {
730
 
    bool in_hole= false;
731
 
 
732
 
    write_end= file_buff->end();
733
 
    if (slot_iter != slots.end() &&
734
 
      write_end >= slot_iter->first)
735
 
    {
736
 
      write_end= slot_iter->first;
737
 
      in_hole= true;
738
 
    }
739
 
 
740
 
    off_t write_length= write_end - write_start;
741
 
    if (write_in_all(update_file_desc,
742
 
               file_buff->ptr() + (write_start - file_buff->start()),
743
 
               write_length) != write_length)
744
 
      goto error;
745
 
 
746
 
    if (in_hole)
747
 
    {
748
 
      while (file_buff->end() <= slot_iter->second && file_buffer_start != -1)
749
 
        file_buffer_start= file_buff->read_next();
750
 
      write_start= slot_iter->second;
751
 
      ++slot_iter;
752
 
    }
753
 
    else
754
 
      write_start= write_end;
755
 
 
756
 
    if (write_end == file_buff->end())
757
 
      file_buffer_start= file_buff->read_next();
758
 
  }
759
 
  // close update file
760
 
  if (::fsync(update_file_desc) || 
761
 
      ::close(update_file_desc))
762
 
    goto error;
763
 
  share->update_file_opened= false;
764
 
 
765
 
  // close current file
766
 
  if (::close(file_desc))
767
 
    goto error;
768
 
  if (::rename(update_file_name.c_str(), share->format.getFileName().c_str()))
769
 
    goto error;
770
 
 
771
 
  share->needs_reopen= true;
772
 
 
773
 
error:
774
 
  err= errno;
775
 
  pthread_mutex_unlock(&share->mutex);
776
 
 
777
 
  if (thread_locked)
778
 
    critical_section_exit();
779
 
 
780
 
  return err;
781
 
}
782
 
 
783
 
void FilesystemCursor::recordToString(string& output)
784
 
{
785
 
  bool first= true;
786
 
  drizzled::String attribute;
787
 
  for (Field **field= table->getFields(); *field; ++field)
788
 
  {
789
 
    if (first == true)
790
 
    {
791
 
      first= false;
792
 
    }
793
 
    else
794
 
    {
795
 
      output.append(share->format.getColSeparatorHead());
796
 
    }
797
 
 
798
 
    if (not (*field)->is_null())
799
 
    {
800
 
      (*field)->setReadSet();
801
 
      (*field)->val_str(&attribute, &attribute);
802
 
 
803
 
      output.append(attribute.ptr(), attribute.length());
804
 
    }
805
 
    else
806
 
    {
807
 
      output.append("0");
808
 
    }
809
 
  }
810
 
  output.append(share->format.getRowSeparatorHead());
811
 
}
812
 
 
813
 
int FilesystemCursor::doInsertRecord(unsigned char * buf)
814
 
{
815
 
  (void)buf;
816
 
 
817
 
  if (share->format.isTagFormat())
818
 
    return 0;
819
 
 
820
 
  sql_command_type = session_sql_command(table->getSession());
821
 
 
822
 
  critical_section_enter();
823
 
 
824
 
  int err_write= 0;
825
 
  int err_close= 0;
826
 
 
827
 
  string output_line;
828
 
  recordToString(output_line);
829
 
 
830
 
  int fd= ::open(share->format.getFileName().c_str(), O_WRONLY | O_APPEND);
831
 
  if (fd < 0)
832
 
  {
833
 
    critical_section_exit();
834
 
    return ENOENT;
835
 
  }
836
 
 
837
 
  err_write= write_in_all(fd, output_line.c_str(), output_line.length());
838
 
  if (err_write < 0)
839
 
    err_write= errno;
840
 
  else
841
 
    err_write= 0;
842
 
 
843
 
  err_close= ::close(fd);
844
 
  if (err_close < 0)
845
 
    err_close= errno;
846
 
 
847
 
  critical_section_exit();
848
 
 
849
 
  if (err_write)
850
 
    return err_write;
851
 
  if (err_close)
852
 
    return err_close;
853
 
  return 0;
854
 
}
855
 
 
856
 
int FilesystemCursor::doUpdateRecord(const unsigned char *, unsigned char *)
857
 
{
858
 
  if (share->format.isTagFormat())
859
 
    return 0;
860
 
  if (openUpdateFile())
861
 
    return errno;
862
 
 
863
 
  // get the update information
864
 
  string str;
865
 
  recordToString(str);
866
 
 
867
 
  if (write_in_all(update_file_desc, str.c_str(), str.length()) < 0)
868
 
    return errno;
869
 
 
870
 
  addSlot();
871
 
 
872
 
  return 0;
873
 
}
874
 
 
875
 
void FilesystemCursor::addSlot()
876
 
{
877
 
  if (slots.size() > 0 && slots.back().second == current_position)
878
 
    slots.back().second= next_position;
879
 
  else
880
 
    slots.push_back(make_pair(current_position, next_position));
881
 
}
882
 
 
883
 
int FilesystemCursor::doDeleteRecord(const unsigned char *)
884
 
{
885
 
  if (share->format.isTagFormat())
886
 
    return 0;
887
 
  addSlot();
888
 
  return 0;
889
 
}
890
 
 
891
 
int FilesystemEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
892
 
{
893
 
  if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), FILESYSTEM_EXT))
894
 
    return errno;
895
 
  return 0;
896
 
}
897
 
 
898
 
bool FilesystemEngine::validateCreateTableOption(const std::string &key,
899
 
                                                 const std::string &state)
900
 
{
901
 
  return FormatInfo::validateOption(key, state);
902
 
}
903
 
 
904
 
int FilesystemEngine::doCreateTable(Session &,
905
 
                        Table&,
906
 
                        const drizzled::TableIdentifier &identifier,
907
 
                        drizzled::message::Table &proto)
908
 
{
909
 
  FormatInfo format;
910
 
  format.parseFromTable(&proto);
911
 
  if (format.isFileGiven())
912
 
  {
913
 
    int err= ::open(format.getFileName().c_str(), O_RDONLY);
914
 
    if (err < 0)
915
 
      return errno;
916
 
  }
917
 
 
918
 
  string new_path(identifier.getPath());
919
 
  new_path+= FILESYSTEM_EXT;
920
 
  fstream output(new_path.c_str(), ios::out | ios::binary);
921
 
 
922
 
  if (! output)
923
 
    return 1;
924
 
 
925
 
  if (! proto.SerializeToOstream(&output))
926
 
  {
927
 
    output.close();
928
 
    unlink(new_path.c_str());
929
 
    return 1;
930
 
  }
931
 
 
932
 
  return 0;
933
 
}
934
 
 
935
 
static FilesystemEngine *filesystem_engine= NULL;
936
 
 
937
 
static int filesystem_init_func(drizzled::module::Context &context)
938
 
{
939
 
  filesystem_engine= new FilesystemEngine("FILESYSTEM");
940
 
  context.add(filesystem_engine);
941
 
 
942
 
  return 0;
943
 
}
944
 
 
945
 
DRIZZLE_DECLARE_PLUGIN
946
 
{
947
 
  DRIZZLE_VERSION_ID,
948
 
  "FILESYSTEM",
949
 
  "1.0",
950
 
  "Zimin",
951
 
  "Filesystem Engine",
952
 
  PLUGIN_LICENSE_GPL,
953
 
  filesystem_init_func, /* Plugin Init */
954
 
  NULL,                       /* system variables                */
955
 
  NULL                        /* config options                  */
956
 
}
957
 
DRIZZLE_DECLARE_PLUGIN_END;