~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
   Copyright (C) 2010 Brian Aker
 
3
 
 
4
  This program is free software; you can redistribute it and/or modify
 
5
  it under the terms of the GNU General Public License as published by
 
6
  the Free Software Foundation; version 2 of the License.
 
7
 
 
8
  This program is distributed in the hope that it will be useful,
 
9
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
  GNU General Public License for more details.
 
12
 
 
13
  You should have received a copy of the GNU General Public License
 
14
  along with this program; if not, write to the Free Software
 
15
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
16
 
 
17
 
 
18
#include "config.h"
 
19
 
 
20
#include "plugin/archive/archive_engine.h"
 
21
#include <memory>
 
22
#include <boost/scoped_ptr.hpp>
 
23
 
 
24
using namespace std;
 
25
using namespace drizzled;
 
26
 
 
27
 
 
28
/*
 
29
  First, if you want to understand storage engines you should look at
 
30
  ha_example.cc and ha_example.h.
 
31
 
 
32
  This example was written as a test case for a customer who needed
 
33
  a storage engine without indexes that could compress data very well.
 
34
  So, welcome to a completely compressed storage engine. This storage
 
35
  engine only does inserts. No replace, deletes, or updates. All reads are
 
36
  complete table scans. Compression is done through a combination of packing
 
37
  and making use of the zlib library
 
38
 
 
39
  We keep a file pointer open for each instance of ha_archive for each read
 
40
  but for writes we keep one open file handle just for that. We flush it
 
41
  only if we have a read occur. azip handles compressing lots of records
 
42
  at once much better then doing lots of little records between writes.
 
43
  It is possible to not lock on writes but this would then mean we couldn't
 
44
  handle bulk inserts as well (that is if someone was trying to read at
 
45
  the same time since we would want to flush).
 
46
 
 
47
  A "meta" file is kept alongside the data file. This file serves two purpose.
 
48
  The first purpose is to track the number of rows in the table. The second
 
49
  purpose is to determine if the table was closed properly or not. When the
 
50
  meta file is first opened it is marked as dirty. It is opened when the table
 
51
  itself is opened for writing. When the table is closed the new count for rows
 
52
  is written to the meta file and the file is marked as clean. If the meta file
 
53
  is opened and it is marked as dirty, it is assumed that a crash occured. At
 
54
  this point an error occurs and the user is told to rebuild the file.
 
55
  A rebuild scans the rows and rewrites the meta file. If corruption is found
 
56
  in the data file then the meta file is not repaired.
 
57
 
 
58
  At some point a recovery method for such a drastic case needs to be divised.
 
59
 
 
60
  Locks are row level, and you will get a consistant read.
 
61
 
 
62
  For performance as far as table scans go it is quite fast. I don't have
 
63
  good numbers but locally it has out performed both Innodb and MyISAM. For
 
64
  Innodb the question will be if the table can be fit into the buffer
 
65
  pool. For MyISAM its a question of how much the file system caches the
 
66
  MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
 
67
  doesn't have enough memory to cache entire table that archive turns out
 
68
  to be any faster.
 
69
 
 
70
  Examples between MyISAM (packed) and Archive.
 
71
 
 
72
  Table with 76695844 identical rows:
 
73
  29680807 a_archive.ARZ
 
74
  920350317 a.MYD
 
75
 
 
76
 
 
77
  Table with 8991478 rows (all of Slashdot's comments):
 
78
  1922964506 comment_archive.ARZ
 
79
  2944970297 comment_text.MYD
 
80
 
 
81
 
 
82
  TODO:
 
83
   Allow users to set compression level.
 
84
   Allow adjustable block size.
 
85
   Implement versioning, should be easy.
 
86
   Allow for errors, find a way to mark bad rows.
 
87
   Add optional feature so that rows can be flushed at interval (which will cause less
 
88
     compression but may speed up ordered searches).
 
89
   Checkpoint the meta file to allow for faster rebuilds.
 
90
   Option to allow for dirty reads, this would lower the sync calls, which would make
 
91
     inserts a lot faster, but would mean highly arbitrary reads.
 
92
 
 
93
    -Brian
 
94
*/
 
95
 
 
96
/* When the engine starts up set the first version */
 
97
static uint64_t global_version= 1;
 
98
 
 
99
// We use this to find out the state of the archive aio option.
 
100
extern bool archive_aio_state(void);
 
101
 
 
102
/*
 
103
  Number of rows that will force a bulk insert.
 
104
*/
 
105
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
 
106
 
 
107
/*
 
108
  Size of header used for row
 
109
*/
 
110
#define ARCHIVE_ROW_HEADER_SIZE 4
 
111
 
 
112
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
 
113
{
 
114
  ArchiveMap::iterator find_iter=
 
115
    archive_open_tables.find(table_name);
 
116
 
 
117
  if (find_iter != archive_open_tables.end())
 
118
    return (*find_iter).second;
 
119
  else
 
120
    return NULL;
 
121
}
 
122
 
 
123
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
 
124
{
 
125
  archive_open_tables[table_name]= share;
 
126
}
 
127
 
 
128
void ArchiveEngine::deleteOpenTable(const string &table_name)
 
129
{
 
130
  archive_open_tables.erase(table_name);
 
131
}
 
132
 
 
133
 
 
134
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
 
135
{
 
136
  string new_path(identifier.getPath());
 
137
 
 
138
  new_path+= ARZ;
 
139
 
 
140
  int error= unlink(new_path.c_str());
 
141
 
 
142
  if (error != 0)
 
143
  {
 
144
    error= errno= errno;
 
145
  }
 
146
 
 
147
  return error;
 
148
}
 
149
 
 
150
int ArchiveEngine::doGetTableDefinition(Session&,
 
151
                                        const identifier::Table &identifier,
 
152
                                        drizzled::message::Table &table_proto)
 
153
{
 
154
  struct stat stat_info;
 
155
  int error= ENOENT;
 
156
  string proto_path;
 
157
 
 
158
  proto_path.reserve(FN_REFLEN);
 
159
  proto_path.assign(identifier.getPath());
 
160
 
 
161
  proto_path.append(ARZ);
 
162
 
 
163
  if (stat(proto_path.c_str(),&stat_info))
 
164
    return errno;
 
165
  else
 
166
    error= EEXIST;
 
167
 
 
168
  {
 
169
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
170
    char* proto_string;
 
171
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
172
      return HA_ERR_CRASHED_ON_USAGE;
 
173
 
 
174
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
175
    if (proto_string == NULL)
 
176
    {
 
177
      azclose(proto_stream.get());
 
178
      return ENOMEM;
 
179
    }
 
180
 
 
181
    azread_frm(proto_stream.get(), proto_string);
 
182
 
 
183
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
184
      error= HA_ERR_CRASHED_ON_USAGE;
 
185
 
 
186
    azclose(proto_stream.get());
 
187
    free(proto_string);
 
188
  }
 
189
 
 
190
  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
 
191
     we do not rewrite the table proto (as it's wedged in the file header)
 
192
  */
 
193
  table_proto.set_schema(identifier.getSchemaName());
 
194
  table_proto.set_name(identifier.getTableName());
 
195
 
 
196
  return error;
 
197
}
 
198
 
 
199
 
 
200
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
 
201
                       Table &table_arg)
 
202
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
 
203
{
 
204
  /* Set our original buffer from pre-allocated memory */
 
205
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
 
206
 
 
207
  /* The size of the offset value we will use for position() */
 
208
  ref_length= sizeof(internal::my_off_t);
 
209
  archive_reader_open= false;
 
210
}
 
211
 
 
212
/*
 
213
  This method reads the header of a datafile and returns whether or not it was successful.
 
214
*/
 
215
int ha_archive::read_data_header(azio_stream *file_to_read)
 
216
{
 
217
  if (azread_init(file_to_read) == -1)
 
218
    return(HA_ERR_CRASHED_ON_USAGE);
 
219
 
 
220
  if (file_to_read->version >= 3)
 
221
    return(0);
 
222
 
 
223
  return(1);
 
224
}
 
225
 
 
226
ArchiveShare::ArchiveShare():
 
227
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
 
228
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
 
229
{
 
230
  assert(1);
 
231
}
 
232
 
 
233
ArchiveShare::ArchiveShare(const char *name):
 
234
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
 
235
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
 
236
{
 
237
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
 
238
  table_name.append(name);
 
239
  data_file_name.assign(table_name);
 
240
  data_file_name.append(ARZ);
 
241
  /*
 
242
    We will use this lock for rows.
 
243
  */
 
244
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
 
245
}
 
246
 
 
247
ArchiveShare::~ArchiveShare()
 
248
{
 
249
  _lock.deinit();
 
250
  pthread_mutex_destroy(&_mutex);
 
251
  /*
 
252
    We need to make sure we don't reset the crashed state.
 
253
    If we open a crashed file, wee need to close it as crashed unless
 
254
    it has been repaired.
 
255
    Since we will close the data down after this, we go on and count
 
256
    the flush on close;
 
257
  */
 
258
  if (archive_write_open == true)
 
259
    (void)azclose(&archive_write);
 
260
}
 
261
 
 
262
bool ArchiveShare::prime(uint64_t *auto_increment)
 
263
{
 
264
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
265
 
 
266
  /*
 
267
    We read the meta file, but do not mark it dirty. Since we are not
 
268
    doing a write we won't mark it dirty (and we won't open it for
 
269
    anything but reading... open it for write and we will generate null
 
270
    compression writes).
 
271
  */
 
272
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
273
               AZ_METHOD_BLOCK)))
 
274
    return false;
 
275
 
 
276
  *auto_increment= archive_tmp->auto_increment + 1;
 
277
  rows_recorded= (ha_rows)archive_tmp->rows;
 
278
  crashed= archive_tmp->dirty;
 
279
  if (version < global_version)
 
280
  {
 
281
    version_rows= rows_recorded;
 
282
    version= global_version;
 
283
  }
 
284
  azclose(archive_tmp.get());
 
285
 
 
286
  return true;
 
287
}
 
288
 
 
289
 
 
290
/*
 
291
  We create the shared memory space that we will use for the open table.
 
292
  No matter what we try to get or create a share. This is so that a repair
 
293
  table operation can occur.
 
294
 
 
295
  See ha_example.cc for a longer description.
 
296
*/
 
297
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
 
298
{
 
299
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
300
 
 
301
  pthread_mutex_lock(&a_engine->mutex());
 
302
 
 
303
  share= a_engine->findOpenTable(table_name);
 
304
 
 
305
  if (!share)
 
306
  {
 
307
    share= new ArchiveShare(table_name);
 
308
 
 
309
    if (share == NULL)
 
310
    {
 
311
      pthread_mutex_unlock(&a_engine->mutex());
 
312
      *rc= HA_ERR_OUT_OF_MEM;
 
313
      return(NULL);
 
314
    }
 
315
 
 
316
    if (share->prime(&stats.auto_increment_value) == false)
 
317
    {
 
318
      pthread_mutex_unlock(&a_engine->mutex());
 
319
      *rc= HA_ERR_CRASHED_ON_REPAIR;
 
320
      delete share;
 
321
 
 
322
      return NULL;
 
323
    }
 
324
 
 
325
    a_engine->addOpenTable(share->table_name, share);
 
326
    thr_lock_init(&share->_lock);
 
327
  }
 
328
  share->use_count++;
 
329
 
 
330
  if (share->crashed)
 
331
    *rc= HA_ERR_CRASHED_ON_USAGE;
 
332
  pthread_mutex_unlock(&a_engine->mutex());
 
333
 
 
334
  return(share);
 
335
}
 
336
 
 
337
 
 
338
/*
 
339
  Free the share.
 
340
  See ha_example.cc for a description.
 
341
*/
 
342
int ha_archive::free_share()
 
343
{
 
344
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
345
 
 
346
  pthread_mutex_lock(&a_engine->mutex());
 
347
  if (!--share->use_count)
 
348
  {
 
349
    a_engine->deleteOpenTable(share->table_name);
 
350
    delete share;
 
351
  }
 
352
  pthread_mutex_unlock(&a_engine->mutex());
 
353
 
 
354
  return 0;
 
355
}
 
356
 
 
357
int ha_archive::init_archive_writer()
 
358
{
 
359
  /*
 
360
    It is expensive to open and close the data files and since you can't have
 
361
    a gzip file that can be both read and written we keep a writer open
 
362
    that is shared amoung all open tables.
 
363
  */
 
364
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
 
365
               O_RDWR, AZ_METHOD_BLOCK)))
 
366
  {
 
367
    share->crashed= true;
 
368
    return(1);
 
369
  }
 
370
  share->archive_write_open= true;
 
371
 
 
372
  return(0);
 
373
}
 
374
 
 
375
 
 
376
/*
 
377
  No locks are required because it is associated with just one Cursor instance
 
378
*/
 
379
int ha_archive::init_archive_reader()
 
380
{
 
381
  /*
 
382
    It is expensive to open and close the data files and since you can't have
 
383
    a gzip file that can be both read and written we keep a writer open
 
384
    that is shared amoung all open tables.
 
385
  */
 
386
  if (archive_reader_open == false)
 
387
  {
 
388
    az_method method;
 
389
 
 
390
    if (archive_aio_state())
 
391
    {
 
392
      method= AZ_METHOD_AIO;
 
393
    }
 
394
    else
 
395
    {
 
396
      method= AZ_METHOD_BLOCK;
 
397
    }
 
398
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
 
399
                 method)))
 
400
    {
 
401
      share->crashed= true;
 
402
      return(1);
 
403
    }
 
404
    archive_reader_open= true;
 
405
  }
 
406
 
 
407
  return(0);
 
408
}
 
409
 
 
410
/*
 
411
  When opening a file we:
 
412
  Create/get our shared structure.
 
413
  Init out lock.
 
414
  We open the file we will read from.
 
415
*/
 
416
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
 
417
{
 
418
  int rc= 0;
 
419
  share= get_share(identifier.getPath().c_str(), &rc);
 
420
 
 
421
  /** 
 
422
    We either fix it ourselves, or we just take it offline 
 
423
 
 
424
    @todo Create some documentation in the recovery tools shipped with the engine.
 
425
  */
 
426
  if (rc == HA_ERR_CRASHED_ON_USAGE)
 
427
  {
 
428
    free_share();
 
429
    rc= repair();
 
430
 
 
431
    return 0;
 
432
  }
 
433
  else if (rc == HA_ERR_OUT_OF_MEM)
 
434
  {
 
435
    return(rc);
 
436
  }
 
437
 
 
438
  assert(share);
 
439
 
 
440
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
 
441
 
 
442
  lock.init(&share->_lock);
 
443
 
 
444
  return(rc);
 
445
}
 
446
 
 
447
// Should never be called
 
448
int ha_archive::open(const char *, int, uint32_t)
 
449
{
 
450
  assert(0);
 
451
  return -1;
 
452
}
 
453
 
 
454
 
 
455
/*
 
456
  Closes the file.
 
457
 
 
458
  SYNOPSIS
 
459
    close();
 
460
 
 
461
  IMPLEMENTATION:
 
462
 
 
463
  We first close this storage engines file handle to the archive and
 
464
  then remove our reference count to the table (and possibly free it
 
465
  as well).
 
466
 
 
467
  RETURN
 
468
    0  ok
 
469
    1  Error
 
470
*/
 
471
 
 
472
int ha_archive::close(void)
 
473
{
 
474
  int rc= 0;
 
475
 
 
476
  record_buffer.clear();
 
477
 
 
478
  /* First close stream */
 
479
  if (archive_reader_open == true)
 
480
  {
 
481
    if (azclose(&archive))
 
482
      rc= 1;
 
483
  }
 
484
  /* then also close share */
 
485
  rc|= free_share();
 
486
 
 
487
  return(rc);
 
488
}
 
489
 
 
490
 
 
491
/*
 
492
  We create our data file here. The format is pretty simple.
 
493
  You can read about the format of the data file above.
 
494
  Unlike other storage engines we do not "pack" our data. Since we
 
495
  are about to do a general compression, packing would just be a waste of
 
496
  CPU time. If the table has blobs they are written after the row in the order
 
497
  of creation.
 
498
*/
 
499
 
 
500
int ArchiveEngine::doCreateTable(Session &,
 
501
                                 Table& table_arg,
 
502
                                 const drizzled::identifier::Table &identifier,
 
503
                                 drizzled::message::Table& proto)
 
504
{
 
505
  int error= 0;
 
506
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
507
  uint64_t auto_increment_value;
 
508
  string serialized_proto;
 
509
 
 
510
  auto_increment_value= proto.options().auto_increment_value();
 
511
 
 
512
  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
 
513
  {
 
514
    KeyInfo *pos= &table_arg.key_info[key];
 
515
    KeyPartInfo *key_part=     pos->key_part;
 
516
    KeyPartInfo *key_part_end= key_part + pos->key_parts;
 
517
 
 
518
    for (; key_part != key_part_end; key_part++)
 
519
    {
 
520
      Field *field= key_part->field;
 
521
 
 
522
      if (!(field->flags & AUTO_INCREMENT_FLAG))
 
523
      {
 
524
        return -1;
 
525
      }
 
526
    }
 
527
  }
 
528
 
 
529
  std::string named_file= identifier.getPath();
 
530
  named_file.append(ARZ);
 
531
 
 
532
  errno= 0;
 
533
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
534
             AZ_METHOD_BLOCK) == 0)
 
535
  {
 
536
    error= errno;
 
537
    unlink(named_file.c_str());
 
538
 
 
539
    return(error ? error : -1);
 
540
  }
 
541
 
 
542
  try {
 
543
    proto.SerializeToString(&serialized_proto);
 
544
  }
 
545
  catch (...)
 
546
  {
 
547
    unlink(named_file.c_str());
 
548
 
 
549
    return(error ? error : -1);
 
550
  }
 
551
 
 
552
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
553
                  serialized_proto.length()))
 
554
  {
 
555
    unlink(named_file.c_str());
 
556
 
 
557
    return(error ? error : -1);
 
558
  }
 
559
 
 
560
  if (proto.options().has_comment())
 
561
  {
 
562
    int write_length;
 
563
 
 
564
    write_length= azwrite_comment(create_stream.get(),
 
565
                                  proto.options().comment().c_str(),
 
566
                                  proto.options().comment().length());
 
567
 
 
568
    if (write_length < 0)
 
569
    {
 
570
      error= errno;
 
571
      unlink(named_file.c_str());
 
572
 
 
573
      return(error ? error : -1);
 
574
    }
 
575
  }
 
576
 
 
577
  /*
 
578
    Yes you need to do this, because the starting value
 
579
    for the autoincrement may not be zero.
 
580
  */
 
581
  create_stream->auto_increment= auto_increment_value ?
 
582
    auto_increment_value - 1 : 0;
 
583
 
 
584
  if (azclose(create_stream.get()))
 
585
  {
 
586
    error= errno;
 
587
    unlink(named_file.c_str());
 
588
 
 
589
    return(error ? error : -1);
 
590
  }
 
591
 
 
592
  return(0);
 
593
}
 
594
 
 
595
/*
 
596
  This is where the actual row is written out.
 
597
*/
 
598
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
 
599
{
 
600
  off_t written;
 
601
  unsigned int r_pack_length;
 
602
 
 
603
  /* We pack the row for writing */
 
604
  r_pack_length= pack_row(buf);
 
605
 
 
606
  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
 
607
  if (written != r_pack_length)
 
608
  {
 
609
    return(-1);
 
610
  }
 
611
 
 
612
  if (!delayed_insert || !bulk_insert)
 
613
    share->dirty= true;
 
614
 
 
615
  return(0);
 
616
}
 
617
 
 
618
 
 
619
/*
 
620
  Calculate max length needed for row. This includes
 
621
  the bytes required for the length in the header.
 
622
*/
 
623
 
 
624
uint32_t ha_archive::max_row_length(const unsigned char *)
 
625
{
 
626
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
627
  length+= ARCHIVE_ROW_HEADER_SIZE;
 
628
 
 
629
  uint32_t *ptr, *end;
 
630
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
631
       ptr != end ;
 
632
       ptr++)
 
633
  {
 
634
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
635
  }
 
636
 
 
637
  return length;
 
638
}
 
639
 
 
640
 
 
641
unsigned int ha_archive::pack_row(unsigned char *record)
 
642
{
 
643
  unsigned char *ptr;
 
644
 
 
645
  if (fix_rec_buff(max_row_length(record)))
 
646
    return(HA_ERR_OUT_OF_MEM);
 
647
 
 
648
  /* Copy null bits */
 
649
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
 
650
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
651
 
 
652
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
653
  {
 
654
    if (!((*field)->is_null()))
 
655
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
 
656
  }
 
657
 
 
658
  return((unsigned int) (ptr - &record_buffer[0]));
 
659
}
 
660
 
 
661
 
 
662
/*
 
663
  Look at ha_archive::open() for an explanation of the row format.
 
664
  Here we just write out the row.
 
665
 
 
666
  Wondering about start_bulk_insert()? We don't implement it for
 
667
  archive since it optimizes for lots of writes. The only save
 
668
  for implementing start_bulk_insert() is that we could skip
 
669
  setting dirty to true each time.
 
670
*/
 
671
int ha_archive::doInsertRecord(unsigned char *buf)
 
672
{
 
673
  int rc;
 
674
  unsigned char *read_buf= NULL;
 
675
  uint64_t temp_auto;
 
676
  unsigned char *record=  getTable()->getInsertRecord();
 
677
 
 
678
  if (share->crashed)
 
679
    return(HA_ERR_CRASHED_ON_USAGE);
 
680
 
 
681
  pthread_mutex_lock(&share->mutex());
 
682
 
 
683
  if (share->archive_write_open == false)
 
684
    if (init_archive_writer())
 
685
      return(HA_ERR_CRASHED_ON_USAGE);
 
686
 
 
687
 
 
688
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
689
  {
 
690
    update_auto_increment();
 
691
    temp_auto= getTable()->next_number_field->val_int();
 
692
 
 
693
    /*
 
694
      We don't support decremening auto_increment. They make the performance
 
695
      just cry.
 
696
    */
 
697
    if (temp_auto <= share->archive_write.auto_increment &&
 
698
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
699
    {
 
700
      rc= HA_ERR_FOUND_DUPP_KEY;
 
701
      goto error;
 
702
    }
 
703
    else
 
704
    {
 
705
      if (temp_auto > share->archive_write.auto_increment)
 
706
        stats.auto_increment_value=
 
707
          (share->archive_write.auto_increment= temp_auto) + 1;
 
708
    }
 
709
  }
 
710
 
 
711
  /*
 
712
    Notice that the global auto_increment has been increased.
 
713
    In case of a failed row write, we will never try to reuse the value.
 
714
  */
 
715
  share->rows_recorded++;
 
716
  rc= real_write_row(buf,  &(share->archive_write));
 
717
error:
 
718
  pthread_mutex_unlock(&share->mutex());
 
719
  if (read_buf)
 
720
    free((unsigned char*) read_buf);
 
721
 
 
722
  return(rc);
 
723
}
 
724
 
 
725
 
 
726
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
 
727
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
 
728
{
 
729
  *nb_reserved_values= UINT64_MAX;
 
730
  *first_value= share->archive_write.auto_increment + 1;
 
731
}
 
732
 
 
733
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
 
734
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
 
735
{
 
736
  active_index= keynr;
 
737
  return(0);
 
738
}
 
739
 
 
740
 
 
741
/*
 
742
  No indexes, so if we get a request for an index search since we tell
 
743
  the optimizer that we have unique indexes, we scan
 
744
*/
 
745
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
 
746
                             uint32_t key_len, enum ha_rkey_function)
 
747
{
 
748
  int rc;
 
749
  bool found= 0;
 
750
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
751
  current_key= key;
 
752
  current_key_len= key_len;
 
753
 
 
754
  rc= doStartTableScan(true);
 
755
 
 
756
  if (rc)
 
757
    goto error;
 
758
 
 
759
  while (!(get_row(&archive, buf)))
 
760
  {
 
761
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
 
762
    {
 
763
      found= 1;
 
764
      break;
 
765
    }
 
766
  }
 
767
 
 
768
  if (found)
 
769
    return(0);
 
770
 
 
771
error:
 
772
  return(rc ? rc : HA_ERR_END_OF_FILE);
 
773
}
 
774
 
 
775
 
 
776
int ha_archive::index_next(unsigned char * buf)
 
777
{
 
778
  bool found= 0;
 
779
 
 
780
  while (!(get_row(&archive, buf)))
 
781
  {
 
782
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
 
783
    {
 
784
      found= 1;
 
785
      break;
 
786
    }
 
787
  }
 
788
 
 
789
  return(found ? 0 : HA_ERR_END_OF_FILE);
 
790
}
 
791
 
 
792
/*
 
793
  All calls that need to scan the table start with this method. If we are told
 
794
  that it is a table scan we rewind the file to the beginning, otherwise
 
795
  we assume the position will be set.
 
796
*/
 
797
 
 
798
int ha_archive::doStartTableScan(bool scan)
 
799
{
 
800
  if (share->crashed)
 
801
      return(HA_ERR_CRASHED_ON_USAGE);
 
802
 
 
803
  init_archive_reader();
 
804
 
 
805
  /* We rewind the file so that we can read from the beginning if scan */
 
806
  if (scan)
 
807
  {
 
808
    if (read_data_header(&archive))
 
809
      return(HA_ERR_CRASHED_ON_USAGE);
 
810
  }
 
811
 
 
812
  return(0);
 
813
}
 
814
 
 
815
 
 
816
/*
 
817
  This is the method that is used to read a row. It assumes that the row is
 
818
  positioned where you want it.
 
819
*/
 
820
int ha_archive::get_row(azio_stream *file_to_read, unsigned char *buf)
 
821
{
 
822
  int rc;
 
823
 
 
824
  if (file_to_read->version == ARCHIVE_VERSION)
 
825
    rc= get_row_version3(file_to_read, buf);
 
826
  else
 
827
    rc= -1;
 
828
 
 
829
  return(rc);
 
830
}
 
831
 
 
832
/* Reallocate buffer if needed */
 
833
bool ha_archive::fix_rec_buff(unsigned int length)
 
834
{
 
835
  record_buffer.resize(length);
 
836
 
 
837
  return false;
 
838
}
 
839
 
 
840
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
 
841
{
 
842
  unsigned int read;
 
843
  int error;
 
844
  const unsigned char *ptr;
 
845
 
 
846
  read= azread_row(file_to_read, &error);
 
847
  ptr= (const unsigned char *)file_to_read->row_ptr;
 
848
 
 
849
  if (error || read == 0)
 
850
  {
 
851
    return(-1);
 
852
  }
 
853
 
 
854
  /* Copy null bits */
 
855
  memcpy(record, ptr, getTable()->getNullBytes());
 
856
  ptr+= getTable()->getNullBytes();
 
857
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
858
  {
 
859
    if (!((*field)->is_null()))
 
860
    {
 
861
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
862
    }
 
863
  }
 
864
  return(0);
 
865
}
 
866
 
 
867
 
 
868
int ha_archive::get_row_version3(azio_stream *file_to_read, unsigned char *buf)
 
869
{
 
870
  int returnable= unpack_row(file_to_read, buf);
 
871
 
 
872
  return(returnable);
 
873
}
 
874
 
 
875
 
 
876
/*
 
877
  Called during ORDER BY. Its position is either from being called sequentially
 
878
  or by having had ha_archive::rnd_pos() called before it is called.
 
879
*/
 
880
 
 
881
int ha_archive::rnd_next(unsigned char *buf)
 
882
{
 
883
  int rc;
 
884
 
 
885
  if (share->crashed)
 
886
      return(HA_ERR_CRASHED_ON_USAGE);
 
887
 
 
888
  if (!scan_rows)
 
889
    return(HA_ERR_END_OF_FILE);
 
890
  scan_rows--;
 
891
 
 
892
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
893
  current_position= aztell(&archive);
 
894
  rc= get_row(&archive, buf);
 
895
 
 
896
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
897
 
 
898
  return(rc);
 
899
}
 
900
 
 
901
 
 
902
/*
 
903
  Thanks to the table bool is_ordered this will be called after
 
904
  each call to ha_archive::rnd_next() if an ordering of the rows is
 
905
  needed.
 
906
*/
 
907
 
 
908
void ha_archive::position(const unsigned char *)
 
909
{
 
910
  internal::my_store_ptr(ref, ref_length, current_position);
 
911
  return;
 
912
}
 
913
 
 
914
 
 
915
/*
 
916
  This is called after a table scan for each row if the results of the
 
917
  scan need to be ordered. It will take *pos and use it to move the
 
918
  cursor in the file so that the next row that is called is the
 
919
  correctly ordered row.
 
920
*/
 
921
 
 
922
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
 
923
{
 
924
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
925
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
 
926
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
 
927
    return(HA_ERR_CRASHED_ON_USAGE);
 
928
  return(get_row(&archive, buf));
 
929
}
 
930
 
 
931
/*
 
932
  This method repairs the meta file. It does this by walking the datafile and
 
933
  rewriting the meta file. Currently it does this by calling optimize with
 
934
  the extended flag.
 
935
*/
 
936
int ha_archive::repair()
 
937
{
 
938
  int rc= optimize();
 
939
 
 
940
  if (rc)
 
941
    return(HA_ERR_CRASHED_ON_REPAIR);
 
942
 
 
943
  share->crashed= false;
 
944
  return(0);
 
945
}
 
946
 
 
947
/*
 
948
  The table can become fragmented if data was inserted, read, and then
 
949
  inserted again. What we do is open up the file and recompress it completely.
 
950
*/
 
951
int ha_archive::optimize()
 
952
{
 
953
  int rc= 0;
 
954
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
955
 
 
956
  init_archive_reader();
 
957
 
 
958
  // now we close both our writer and our reader for the rename
 
959
  if (share->archive_write_open)
 
960
  {
 
961
    azclose(&(share->archive_write));
 
962
    share->archive_write_open= false;
 
963
  }
 
964
 
 
965
  char* proto_string;
 
966
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
 
967
  if (proto_string == NULL)
 
968
  {
 
969
    return ENOMEM;
 
970
  }
 
971
  azread_frm(&archive, proto_string);
 
972
 
 
973
  /* Lets create a file to contain the new data */
 
974
  std::string writer_filename= share->table_name;
 
975
  writer_filename.append(ARN);
 
976
 
 
977
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
978
  {
 
979
    free(proto_string);
 
980
    return(HA_ERR_CRASHED_ON_USAGE);
 
981
  }
 
982
 
 
983
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
984
 
 
985
  /*
 
986
    An extended rebuild is a lot more effort. We open up each row and re-record it.
 
987
    Any dead rows are removed (aka rows that may have been partially recorded).
 
988
 
 
989
    As of Archive format 3, this is the only type that is performed, before this
 
990
    version it was just done on T_EXTEND
 
991
  */
 
992
  if (1)
 
993
  {
 
994
    /*
 
995
      Now we will rewind the archive file so that we are positioned at the
 
996
      start of the file.
 
997
    */
 
998
    azflush(&archive, Z_SYNC_FLUSH);
 
999
    rc= read_data_header(&archive);
 
1000
 
 
1001
    /*
 
1002
      On success of writing out the new header, we now fetch each row and
 
1003
      insert it into the new archive file.
 
1004
    */
 
1005
    if (!rc)
 
1006
    {
 
1007
      uint64_t rows_restored;
 
1008
      share->rows_recorded= 0;
 
1009
      stats.auto_increment_value= 1;
 
1010
      share->archive_write.auto_increment= 0;
 
1011
 
 
1012
      rows_restored= archive.rows;
 
1013
 
 
1014
      for (uint64_t x= 0; x < rows_restored ; x++)
 
1015
      {
 
1016
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1017
 
 
1018
        if (rc != 0)
 
1019
          break;
 
1020
 
 
1021
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1022
        /*
 
1023
          Long term it should be possible to optimize this so that
 
1024
          it is not called on each row.
 
1025
        */
 
1026
        if (getTable()->found_next_number_field)
 
1027
        {
 
1028
          Field *field= getTable()->found_next_number_field;
 
1029
 
 
1030
          /* Since we will need to use field to translate, we need to flip its read bit */
 
1031
          field->setReadSet();
 
1032
 
 
1033
          uint64_t auto_value=
 
1034
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
 
1035
                                               field->offset(getTable()->getInsertRecord()));
 
1036
          if (share->archive_write.auto_increment < auto_value)
 
1037
            stats.auto_increment_value=
 
1038
              (share->archive_write.auto_increment= auto_value) + 1;
 
1039
        }
 
1040
      }
 
1041
      share->rows_recorded= (ha_rows)writer->rows;
 
1042
    }
 
1043
 
 
1044
    if (rc && rc != HA_ERR_END_OF_FILE)
 
1045
    {
 
1046
      goto error;
 
1047
    }
 
1048
  }
 
1049
 
 
1050
  azclose(writer.get());
 
1051
  share->dirty= false;
 
1052
 
 
1053
  azclose(&archive);
 
1054
 
 
1055
  // make the file we just wrote be our data file
 
1056
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
 
1057
 
 
1058
  free(proto_string);
 
1059
  return(rc);
 
1060
error:
 
1061
  free(proto_string);
 
1062
  azclose(writer.get());
 
1063
 
 
1064
  return(rc);
 
1065
}
 
1066
 
 
1067
/*
 
1068
  Below is an example of how to setup row level locking.
 
1069
*/
 
1070
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
 
1071
                                       THR_LOCK_DATA **to,
 
1072
                                       enum thr_lock_type lock_type)
 
1073
{
 
1074
  delayed_insert= false;
 
1075
 
 
1076
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1077
  {
 
1078
    /*
 
1079
      Here is where we get into the guts of a row level lock.
 
1080
      If TL_UNLOCK is set
 
1081
      If we are not doing a LOCK Table or DISCARD/IMPORT
 
1082
      TABLESPACE, then allow multiple writers
 
1083
    */
 
1084
 
 
1085
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
 
1086
         lock_type <= TL_WRITE)
 
1087
        && ! session->doing_tablespace_operation())
 
1088
      lock_type = TL_WRITE_ALLOW_WRITE;
 
1089
 
 
1090
    /*
 
1091
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
 
1092
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
 
1093
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
 
1094
      to t2. Convert the lock to a normal read lock to allow
 
1095
      concurrent inserts to t2.
 
1096
    */
 
1097
 
 
1098
    if (lock_type == TL_READ_NO_INSERT)
 
1099
      lock_type = TL_READ;
 
1100
 
 
1101
    lock.type=lock_type;
 
1102
  }
 
1103
 
 
1104
  *to++= &lock;
 
1105
 
 
1106
  return to;
 
1107
}
 
1108
 
 
1109
/*
 
1110
  Hints for optimizer, see ha_tina for more information
 
1111
*/
 
1112
int ha_archive::info(uint32_t flag)
 
1113
{
 
1114
  /*
 
1115
    If dirty, we lock, and then reset/flush the data.
 
1116
    I found that just calling azflush() doesn't always work.
 
1117
  */
 
1118
  pthread_mutex_lock(&share->mutex());
 
1119
  if (share->dirty == true)
 
1120
  {
 
1121
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
1122
    share->rows_recorded= share->archive_write.rows;
 
1123
    share->dirty= false;
 
1124
    if (share->version < global_version)
 
1125
    {
 
1126
      share->version_rows= share->rows_recorded;
 
1127
      share->version= global_version;
 
1128
    }
 
1129
 
 
1130
  }
 
1131
 
 
1132
  /*
 
1133
    This should be an accurate number now, though bulk and delayed inserts can
 
1134
    cause the number to be inaccurate.
 
1135
  */
 
1136
  stats.records= share->rows_recorded;
 
1137
  pthread_mutex_unlock(&share->mutex());
 
1138
 
 
1139
  scan_rows= stats.records;
 
1140
  stats.deleted= 0;
 
1141
 
 
1142
  /* Costs quite a bit more to get all information */
 
1143
  if (flag & HA_STATUS_TIME)
 
1144
  {
 
1145
    struct stat file_stat;  // Stat information for the data file
 
1146
 
 
1147
    stat(share->data_file_name.c_str(), &file_stat);
 
1148
 
 
1149
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1150
    stats.data_file_length= file_stat.st_size;
 
1151
    stats.create_time= file_stat.st_ctime;
 
1152
    stats.update_time= file_stat.st_mtime;
 
1153
    stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
 
1154
  }
 
1155
  stats.delete_length= 0;
 
1156
  stats.index_file_length=0;
 
1157
 
 
1158
  if (flag & HA_STATUS_AUTO)
 
1159
  {
 
1160
    init_archive_reader();
 
1161
    pthread_mutex_lock(&share->mutex());
 
1162
    azflush(&archive, Z_SYNC_FLUSH);
 
1163
    pthread_mutex_unlock(&share->mutex());
 
1164
    stats.auto_increment_value= archive.auto_increment + 1;
 
1165
  }
 
1166
 
 
1167
  return(0);
 
1168
}
 
1169
 
 
1170
 
 
1171
/*
 
1172
  This method tells us that a bulk insert operation is about to occur. We set
 
1173
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
 
1174
  turn will keep selects from causing a sync to occur.
 
1175
  Basically, yet another optimizations to keep compression working well.
 
1176
*/
 
1177
void ha_archive::start_bulk_insert(ha_rows rows)
 
1178
{
 
1179
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
 
1180
    bulk_insert= true;
 
1181
  return;
 
1182
}
 
1183
 
 
1184
 
 
1185
/*
 
1186
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
 
1187
  flag, and set the share dirty so that the next select will call sync for us.
 
1188
*/
 
1189
int ha_archive::end_bulk_insert()
 
1190
{
 
1191
  bulk_insert= false;
 
1192
  share->dirty= true;
 
1193
  return(0);
 
1194
}
 
1195
 
 
1196
/*
 
1197
  We cancel a truncate command. The only way to delete an archive table is to drop it.
 
1198
  This is done for security reasons. In a later version we will enable this by
 
1199
  allowing the user to select a different row format.
 
1200
*/
 
1201
int ha_archive::delete_all_rows()
 
1202
{
 
1203
  return(HA_ERR_WRONG_COMMAND);
 
1204
}
 
1205
 
 
1206
/*
 
1207
  Simple scan of the tables to make sure everything is ok.
 
1208
*/
 
1209
 
 
1210
int ha_archive::check(Session* session)
 
1211
{
 
1212
  int rc= 0;
 
1213
  const char *old_proc_info;
 
1214
 
 
1215
  old_proc_info= session->get_proc_info();
 
1216
  session->set_proc_info("Checking table");
 
1217
  /* Flush any waiting data */
 
1218
  pthread_mutex_lock(&share->mutex());
 
1219
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
1220
  pthread_mutex_unlock(&share->mutex());
 
1221
 
 
1222
  /*
 
1223
    Now we will rewind the archive file so that we are positioned at the
 
1224
    start of the file.
 
1225
  */
 
1226
  init_archive_reader();
 
1227
  azflush(&archive, Z_SYNC_FLUSH);
 
1228
  read_data_header(&archive);
 
1229
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
 
1230
  {
 
1231
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1232
 
 
1233
    if (rc != 0)
 
1234
      break;
 
1235
  }
 
1236
 
 
1237
  session->set_proc_info(old_proc_info);
 
1238
 
 
1239
  if ((rc && rc != HA_ERR_END_OF_FILE))
 
1240
  {
 
1241
    share->crashed= false;
 
1242
    return(HA_ADMIN_CORRUPT);
 
1243
  }
 
1244
  else
 
1245
  {
 
1246
    return(HA_ADMIN_OK);
 
1247
  }
 
1248
}
 
1249
 
 
1250
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
 
1251
{
 
1252
  int error= 0;
 
1253
 
 
1254
  for (const char **ext= bas_ext(); *ext ; ext++)
 
1255
  {
 
1256
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
 
1257
    {
 
1258
      if ((error=errno) != ENOENT)
 
1259
        break;
 
1260
      error= 0;
 
1261
    }
 
1262
  }
 
1263
 
 
1264
  return error;
 
1265
}
 
1266
 
 
1267
bool ArchiveEngine::doDoesTableExist(Session&,
 
1268
                                     const identifier::Table &identifier)
 
1269
{
 
1270
  string proto_path(identifier.getPath());
 
1271
  proto_path.append(ARZ);
 
1272
 
 
1273
  if (access(proto_path.c_str(), F_OK))
 
1274
  {
 
1275
    return false;
 
1276
  }
 
1277
 
 
1278
  return true;
 
1279
}
 
1280
 
 
1281
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
1282
                                          const drizzled::identifier::Schema &schema_identifier,
 
1283
                                          drizzled::identifier::Table::vector &set_of_identifiers)
 
1284
{
 
1285
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
 
1286
 
 
1287
  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin(); 
 
1288
       entry_iter != entries.end(); ++entry_iter)
 
1289
  {
 
1290
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
 
1291
    const string *filename= &entry->filename;
 
1292
 
 
1293
    assert(filename->size());
 
1294
 
 
1295
    const char *ext= strchr(filename->c_str(), '.');
 
1296
 
 
1297
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
 
1298
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
 
1299
    {  }
 
1300
    else
 
1301
    {
 
1302
      char uname[NAME_LEN + 1];
 
1303
      uint32_t file_name_len;
 
1304
 
 
1305
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
1306
      // TODO: Remove need for memory copy here
 
1307
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
 
1308
 
 
1309
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
 
1310
    }
 
1311
  }
 
1312
}