~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/archive/ha_archive.cc

  • Committer: patrick crews
  • Date: 2010-10-07 19:35:15 UTC
  • mto: (1819.2.4 drizzle)
  • mto: This revision was merged to the branch mainline in revision 1825.
  • Revision ID: gleebix@gmail.com-20101007193515-jr6y1uz710lzte1o
Initial work on lp bug#656423 - remove use of 'mysql' from test-run tool.  Removed / substituted mtr->dtr mysql->drizzle.  Removed perl errors, but server won't start due to boost error.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
#include "config.h"
19
19
 
20
20
#include "plugin/archive/archive_engine.h"
21
 
#include <memory>
22
 
#include <boost/scoped_ptr.hpp>
23
21
 
24
22
using namespace std;
25
23
using namespace drizzled;
131
129
}
132
130
 
133
131
 
134
 
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
 
132
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
135
133
{
136
134
  string new_path(identifier.getPath());
137
135
 
148
146
}
149
147
 
150
148
int ArchiveEngine::doGetTableDefinition(Session&,
151
 
                                        const identifier::Table &identifier,
 
149
                                        const TableIdentifier &identifier,
152
150
                                        drizzled::message::Table &table_proto)
153
151
{
154
152
  struct stat stat_info;
166
164
    error= EEXIST;
167
165
 
168
166
  {
169
 
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
 
167
    azio_stream proto_stream;
170
168
    char* proto_string;
171
 
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
 
169
    if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
170
      return HA_ERR_CRASHED_ON_USAGE;
173
171
 
174
 
    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
 
172
    proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
173
    if (proto_string == NULL)
176
174
    {
177
 
      azclose(proto_stream.get());
 
175
      azclose(&proto_stream);
178
176
      return ENOMEM;
179
177
    }
180
178
 
181
 
    azread_frm(proto_stream.get(), proto_string);
 
179
    azread_frm(&proto_stream, proto_string);
182
180
 
183
 
    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
 
181
    if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
182
      error= HA_ERR_CRASHED_ON_USAGE;
185
183
 
186
 
    azclose(proto_stream.get());
 
184
    azclose(&proto_stream);
187
185
    free(proto_string);
188
186
  }
189
187
 
198
196
 
199
197
 
200
198
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
201
 
                       Table &table_arg)
 
199
                       TableShare &table_arg)
202
200
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
203
201
{
204
202
  /* Set our original buffer from pre-allocated memory */
261
259
 
262
260
bool ArchiveShare::prime(uint64_t *auto_increment)
263
261
{
264
 
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
 
262
  azio_stream archive_tmp;
265
263
 
266
264
  /*
267
265
    We read the meta file, but do not mark it dirty. Since we are not
269
267
    anything but reading... open it for write and we will generate null
270
268
    compression writes).
271
269
  */
272
 
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
 
270
  if (!(azopen(&archive_tmp, data_file_name.c_str(), O_RDONLY,
273
271
               AZ_METHOD_BLOCK)))
274
272
    return false;
275
273
 
276
 
  *auto_increment= archive_tmp->auto_increment + 1;
277
 
  rows_recorded= (ha_rows)archive_tmp->rows;
278
 
  crashed= archive_tmp->dirty;
 
274
  *auto_increment= archive_tmp.auto_increment + 1;
 
275
  rows_recorded= (ha_rows)archive_tmp.rows;
 
276
  crashed= archive_tmp.dirty;
279
277
  if (version < global_version)
280
278
  {
281
279
    version_rows= rows_recorded;
282
280
    version= global_version;
283
281
  }
284
 
  azclose(archive_tmp.get());
 
282
  azclose(&archive_tmp);
285
283
 
286
284
  return true;
287
285
}
296
294
*/
297
295
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
298
296
{
299
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
297
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
300
298
 
301
299
  pthread_mutex_lock(&a_engine->mutex());
302
300
 
341
339
*/
342
340
int ha_archive::free_share()
343
341
{
344
 
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
 
342
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
345
343
 
346
344
  pthread_mutex_lock(&a_engine->mutex());
347
345
  if (!--share->use_count)
387
385
  {
388
386
    az_method method;
389
387
 
390
 
    if (archive_aio_state())
 
388
    switch (archive_aio_state())
391
389
    {
 
390
    case false:
 
391
      method= AZ_METHOD_BLOCK;
 
392
      break;
 
393
    case true:
392
394
      method= AZ_METHOD_AIO;
393
 
    }
394
 
    else
395
 
    {
 
395
      break;
 
396
    default:
396
397
      method= AZ_METHOD_BLOCK;
397
398
    }
398
399
    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,
413
414
  Init out lock.
414
415
  We open the file we will read from.
415
416
*/
416
 
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
 
417
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
417
418
{
418
419
  int rc= 0;
419
420
  share= get_share(identifier.getPath().c_str(), &rc);
437
438
 
438
439
  assert(share);
439
440
 
440
 
  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
 
441
  record_buffer.resize(table->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);
441
442
 
442
443
  lock.init(&share->_lock);
443
444
 
499
500
 
500
501
int ArchiveEngine::doCreateTable(Session &,
501
502
                                 Table& table_arg,
502
 
                                 const drizzled::identifier::Table &identifier,
 
503
                                 const drizzled::TableIdentifier &identifier,
503
504
                                 drizzled::message::Table& proto)
504
505
{
505
506
  int error= 0;
506
 
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
 
507
  azio_stream create_stream;            /* Archive file we are working with */
507
508
  uint64_t auto_increment_value;
508
509
  string serialized_proto;
509
510
 
530
531
  named_file.append(ARZ);
531
532
 
532
533
  errno= 0;
533
 
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
 
534
  if (azopen(&create_stream, named_file.c_str(), O_CREAT|O_RDWR,
534
535
             AZ_METHOD_BLOCK) == 0)
535
536
  {
536
537
    error= errno;
549
550
    return(error ? error : -1);
550
551
  }
551
552
 
552
 
  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
 
553
  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
554
                  serialized_proto.length()))
554
555
  {
555
556
    unlink(named_file.c_str());
561
562
  {
562
563
    int write_length;
563
564
 
564
 
    write_length= azwrite_comment(create_stream.get(),
 
565
    write_length= azwrite_comment(&create_stream,
565
566
                                  proto.options().comment().c_str(),
566
567
                                  proto.options().comment().length());
567
568
 
578
579
    Yes you need to do this, because the starting value
579
580
    for the autoincrement may not be zero.
580
581
  */
581
 
  create_stream->auto_increment= auto_increment_value ?
 
582
  create_stream.auto_increment= auto_increment_value ?
582
583
    auto_increment_value - 1 : 0;
583
584
 
584
 
  if (azclose(create_stream.get()))
 
585
  if (azclose(&create_stream))
585
586
  {
586
587
    error= errno;
587
588
    unlink(named_file.c_str());
623
624
 
624
625
uint32_t ha_archive::max_row_length(const unsigned char *)
625
626
{
626
 
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
 
627
  uint32_t length= (uint32_t)(table->getRecordLength() + table->sizeFields()*2);
627
628
  length+= ARCHIVE_ROW_HEADER_SIZE;
628
629
 
629
630
  uint32_t *ptr, *end;
630
 
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
 
631
  for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
631
632
       ptr != end ;
632
633
       ptr++)
633
634
  {
634
 
      length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
 
635
      length += 2 + ((Field_blob*)table->getField(*ptr))->get_length();
635
636
  }
636
637
 
637
638
  return length;
646
647
    return(HA_ERR_OUT_OF_MEM);
647
648
 
648
649
  /* Copy null bits */
649
 
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
650
 
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
 
650
  memcpy(&record_buffer[0], record, table->getShare()->null_bytes);
 
651
  ptr= &record_buffer[0] + table->getShare()->null_bytes;
651
652
 
652
 
  for (Field **field=getTable()->getFields() ; *field ; field++)
 
653
  for (Field **field=table->getFields() ; *field ; field++)
653
654
  {
654
655
    if (!((*field)->is_null()))
655
656
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
673
674
  int rc;
674
675
  unsigned char *read_buf= NULL;
675
676
  uint64_t temp_auto;
676
 
  unsigned char *record=  getTable()->getInsertRecord();
 
677
  unsigned char *record=  table->getInsertRecord();
677
678
 
678
679
  if (share->crashed)
679
680
    return(HA_ERR_CRASHED_ON_USAGE);
685
686
      return(HA_ERR_CRASHED_ON_USAGE);
686
687
 
687
688
 
688
 
  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
 
689
  if (table->next_number_field && record == table->getInsertRecord())
689
690
  {
690
691
    update_auto_increment();
691
 
    temp_auto= getTable()->next_number_field->val_int();
 
692
    temp_auto= table->next_number_field->val_int();
692
693
 
693
694
    /*
694
695
      We don't support decremening auto_increment. They make the performance
695
696
      just cry.
696
697
    */
697
698
    if (temp_auto <= share->archive_write.auto_increment &&
698
 
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
 
699
        table->getShare()->getKeyInfo(0).flags & HA_NOSAME)
699
700
    {
700
701
      rc= HA_ERR_FOUND_DUPP_KEY;
701
702
      goto error;
747
748
{
748
749
  int rc;
749
750
  bool found= 0;
750
 
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
 
751
  current_k_offset= table->getShare()->getKeyInfo(0).key_part->offset;
751
752
  current_key= key;
752
753
  current_key_len= key_len;
753
754
 
852
853
  }
853
854
 
854
855
  /* Copy null bits */
855
 
  memcpy(record, ptr, getTable()->getNullBytes());
856
 
  ptr+= getTable()->getNullBytes();
857
 
  for (Field **field= getTable()->getFields() ; *field ; field++)
 
856
  memcpy(record, ptr, table->getNullBytes());
 
857
  ptr+= table->getNullBytes();
 
858
  for (Field **field= table->getFields() ; *field ; field++)
858
859
  {
859
860
    if (!((*field)->is_null()))
860
861
    {
861
 
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
 
862
      ptr= (*field)->unpack(record + (*field)->offset(table->getInsertRecord()), ptr);
862
863
    }
863
864
  }
864
865
  return(0);
893
894
  current_position= aztell(&archive);
894
895
  rc= get_row(&archive, buf);
895
896
 
896
 
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
 
897
  table->status=rc ? STATUS_NOT_FOUND: 0;
897
898
 
898
899
  return(rc);
899
900
}
951
952
int ha_archive::optimize()
952
953
{
953
954
  int rc= 0;
954
 
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
 
955
  azio_stream writer;
955
956
 
956
957
  init_archive_reader();
957
958
 
974
975
  std::string writer_filename= share->table_name;
975
976
  writer_filename.append(ARN);
976
977
 
977
 
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
 
978
  if (!(azopen(&writer, writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
978
979
  {
979
980
    free(proto_string);
980
981
    return(HA_ERR_CRASHED_ON_USAGE);
981
982
  }
982
983
 
983
 
  azwrite_frm(writer.get(), proto_string, archive.frm_length);
 
984
  azwrite_frm(&writer, proto_string, archive.frm_length);
984
985
 
985
986
  /*
986
987
    An extended rebuild is a lot more effort. We open up each row and re-record it.
1013
1014
 
1014
1015
      for (uint64_t x= 0; x < rows_restored ; x++)
1015
1016
      {
1016
 
        rc= get_row(&archive, getTable()->getInsertRecord());
 
1017
        rc= get_row(&archive, table->getInsertRecord());
1017
1018
 
1018
1019
        if (rc != 0)
1019
1020
          break;
1020
1021
 
1021
 
        real_write_row(getTable()->getInsertRecord(), writer.get());
 
1022
        real_write_row(table->getInsertRecord(), &writer);
1022
1023
        /*
1023
1024
          Long term it should be possible to optimize this so that
1024
1025
          it is not called on each row.
1025
1026
        */
1026
 
        if (getTable()->found_next_number_field)
 
1027
        if (table->found_next_number_field)
1027
1028
        {
1028
 
          Field *field= getTable()->found_next_number_field;
 
1029
          Field *field= table->found_next_number_field;
1029
1030
 
1030
1031
          /* Since we will need to use field to translate, we need to flip its read bit */
1031
1032
          field->setReadSet();
1032
1033
 
1033
1034
          uint64_t auto_value=
1034
 
            (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
 
                                               field->offset(getTable()->getInsertRecord()));
 
1035
            (uint64_t) field->val_int(table->getInsertRecord() +
 
1036
                                       field->offset(table->getInsertRecord()));
1036
1037
          if (share->archive_write.auto_increment < auto_value)
1037
1038
            stats.auto_increment_value=
1038
1039
              (share->archive_write.auto_increment= auto_value) + 1;
1039
1040
        }
1040
1041
      }
1041
 
      share->rows_recorded= (ha_rows)writer->rows;
 
1042
      share->rows_recorded= (ha_rows)writer.rows;
1042
1043
    }
1043
1044
 
1044
1045
    if (rc && rc != HA_ERR_END_OF_FILE)
1047
1048
    }
1048
1049
  }
1049
1050
 
1050
 
  azclose(writer.get());
 
1051
  azclose(&writer);
1051
1052
  share->dirty= false;
1052
1053
 
1053
1054
  azclose(&archive);
1059
1060
  return(rc);
1060
1061
error:
1061
1062
  free(proto_string);
1062
 
  azclose(writer.get());
 
1063
  azclose(&writer);
1063
1064
 
1064
1065
  return(rc);
1065
1066
}
1084
1085
 
1085
1086
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
1087
         lock_type <= TL_WRITE)
1087
 
        && ! session->doing_tablespace_operation())
 
1088
        && !session_tablespace_op(session))
1088
1089
      lock_type = TL_WRITE_ALLOW_WRITE;
1089
1090
 
1090
1091
    /*
1146
1147
 
1147
1148
    stat(share->data_file_name.c_str(), &file_stat);
1148
1149
 
1149
 
    stats.mean_rec_length= getTable()->getRecordLength()+ buffer.alloced_length();
 
1150
    stats.mean_rec_length= table->getRecordLength()+ buffer.alloced_length();
1150
1151
    stats.data_file_length= file_stat.st_size;
1151
1152
    stats.create_time= file_stat.st_ctime;
1152
1153
    stats.update_time= file_stat.st_mtime;
1212
1213
  int rc= 0;
1213
1214
  const char *old_proc_info;
1214
1215
 
1215
 
  old_proc_info= session->get_proc_info();
1216
 
  session->set_proc_info("Checking table");
 
1216
  old_proc_info= get_session_proc_info(session);
 
1217
  set_session_proc_info(session, "Checking table");
1217
1218
  /* Flush any waiting data */
1218
1219
  pthread_mutex_lock(&share->mutex());
1219
1220
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1228
1229
  read_data_header(&archive);
1229
1230
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
1230
1231
  {
1231
 
    rc= get_row(&archive, getTable()->getInsertRecord());
 
1232
    rc= get_row(&archive, table->getInsertRecord());
1232
1233
 
1233
1234
    if (rc != 0)
1234
1235
      break;
1235
1236
  }
1236
1237
 
1237
 
  session->set_proc_info(old_proc_info);
 
1238
  set_session_proc_info(session, old_proc_info);
1238
1239
 
1239
1240
  if ((rc && rc != HA_ERR_END_OF_FILE))
1240
1241
  {
1247
1248
  }
1248
1249
}
1249
1250
 
1250
 
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
 
1251
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1251
1252
{
1252
1253
  int error= 0;
1253
1254
 
1265
1266
}
1266
1267
 
1267
1268
bool ArchiveEngine::doDoesTableExist(Session&,
1268
 
                                     const identifier::Table &identifier)
 
1269
                                     const TableIdentifier &identifier)
1269
1270
{
1270
1271
  string proto_path(identifier.getPath());
1271
1272
  proto_path.append(ARZ);
1279
1280
}
1280
1281
 
1281
1282
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
 
                                          const drizzled::identifier::Schema &schema_identifier,
1283
 
                                          drizzled::identifier::Table::vector &set_of_identifiers)
 
1283
                                          const drizzled::SchemaIdentifier &schema_identifier,
 
1284
                                          drizzled::TableIdentifiers &set_of_identifiers)
1284
1285
{
1285
1286
  drizzled::CachedDirectory::Entries entries= directory.getEntries();
1286
1287
 
1302
1303
      char uname[NAME_LEN + 1];
1303
1304
      uint32_t file_name_len;
1304
1305
 
1305
 
      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
 
1306
      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
1307
      // TODO: Remove need for memory copy here
1307
1308
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL 
1308
1309
 
1309
 
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
 
1310
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1310
1311
    }
1311
1312
  }
1312
1313
}