~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/csv/ha_tina.cc

  • Committer: Monty Taylor
  • Date: 2010-02-05 08:11:15 UTC
  • mfrom: (1283 build)
  • mto: (1273.13.43 fix_is)
  • mto: This revision was merged to the branch mainline in revision 1300.
  • Revision ID: mordred@inaugust.com-20100205081115-dr82nvrwv4lvw7sd
Merged trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
57
57
#include <map>
58
58
 
59
59
using namespace std;
 
60
using namespace drizzled;
60
61
 
61
62
/*
62
63
  unsigned char + unsigned char + uint64_t + uint64_t + uint64_t + uint64_t + unsigned char
75
76
static int read_meta_file(int meta_file, ha_rows *rows);
76
77
static int write_meta_file(int meta_file, ha_rows rows, bool dirty);
77
78
 
78
 
extern "C" void tina_get_status(void* param, int concurrent_insert);
79
 
extern "C" void tina_update_status(void* param);
80
 
extern "C" bool tina_check_status(void* param);
 
79
void tina_get_status(void* param, int concurrent_insert);
 
80
void tina_update_status(void* param);
 
81
bool tina_check_status(void* param);
81
82
 
82
83
/* Stuff for shares */
83
84
pthread_mutex_t tina_mutex;
169
170
 
170
171
  for (const char **ext= bas_ext(); *ext ; ext++)
171
172
  {
172
 
    fn_format(buff, table_path.c_str(), "", *ext,
 
173
    internal::fn_format(buff, table_path.c_str(), "", *ext,
173
174
              MY_UNPACK_FILENAME|MY_APPEND_EXT);
174
 
    if (my_delete_with_symlink(buff, MYF(0)))
 
175
    if (internal::my_delete_with_symlink(buff, MYF(0)))
175
176
    {
176
177
      if ((error= errno) != ENOENT)
177
178
        break;
267
268
    crashed(false), rows_recorded(0), data_file_version(0)
268
269
{
269
270
  thr_lock_init(&lock);
270
 
  fn_format(data_file_name, table_name_arg, "", CSV_EXT,
 
271
  internal::fn_format(data_file_name, table_name_arg, "", CSV_EXT,
271
272
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);
272
273
}
273
274
 
304
305
      return NULL;
305
306
    }
306
307
 
307
 
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
308
    internal::fn_format(meta_file_name, table_name, "", CSM_EXT,
308
309
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
309
310
 
310
311
    if (stat(share->data_file_name, &file_stat))
326
327
      Usually this will result in auto-repair, and we will get a good
327
328
      meta-file in the end.
328
329
    */
329
 
    if ((share->meta_file= my_open(meta_file_name,
 
330
    if ((share->meta_file= internal::my_open(meta_file_name,
330
331
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
331
332
      share->crashed= true;
332
333
 
369
370
  unsigned char *ptr= meta_buffer;
370
371
 
371
372
  lseek(meta_file, 0, SEEK_SET);
372
 
  if (my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
 
373
  if (internal::my_read(meta_file, (unsigned char*)meta_buffer, META_BUFFER_SIZE, 0)
373
374
      != META_BUFFER_SIZE)
374
375
    return(HA_ERR_CRASHED_ON_USAGE);
375
376
 
391
392
      ((bool)(*ptr)== true))
392
393
    return(HA_ERR_CRASHED_ON_USAGE);
393
394
 
394
 
  my_sync(meta_file, MYF(MY_WME));
 
395
  internal::my_sync(meta_file, MYF(MY_WME));
395
396
 
396
397
  return(0);
397
398
}
436
437
  *ptr= (unsigned char)dirty;
437
438
 
438
439
  lseek(meta_file, 0, SEEK_SET);
439
 
  if (my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
 
440
  if (internal::my_write(meta_file, (unsigned char *)meta_buffer, META_BUFFER_SIZE, 0)
440
441
      != META_BUFFER_SIZE)
441
442
    return(-1);
442
443
 
443
 
  my_sync(meta_file, MYF(MY_WME));
 
444
  internal::my_sync(meta_file, MYF(MY_WME));
444
445
 
445
446
  return(0);
446
447
}
455
456
  (void)write_meta_file(share->meta_file, share->rows_recorded, true);
456
457
 
457
458
  if ((share->tina_write_filedes=
458
 
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
 
459
        internal::my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
459
460
  {
460
461
    share->crashed= true;
461
462
    return(1);
477
478
    /* Write the meta file. Mark it as crashed if needed. */
478
479
    (void)write_meta_file(share->meta_file, share->rows_recorded,
479
480
                          share->crashed ? true :false);
480
 
    if (my_close(share->meta_file, MYF(0)))
 
481
    if (internal::my_close(share->meta_file, MYF(0)))
481
482
      result_code= 1;
482
483
    if (share->tina_write_opened)
483
484
    {
484
 
      if (my_close(share->tina_write_filedes, MYF(0)))
 
485
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
485
486
        result_code= 1;
486
487
      share->tina_write_opened= false;
487
488
    }
894
895
  }
895
896
 
896
897
  local_data_file_version= share->data_file_version;
897
 
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
898
  if ((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
898
899
    return(0);
899
900
 
900
901
  /*
920
921
int ha_tina::close(void)
921
922
{
922
923
  int rc= 0;
923
 
  rc= my_close(data_file, MYF(0));
 
924
  rc= internal::my_close(data_file, MYF(0));
924
925
  return(free_share() || rc);
925
926
}
926
927
 
945
946
      return(-1);
946
947
 
947
948
   /* use pwrite, as concurrent reader could have changed the position */
948
 
  if (my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
 
949
  if (internal::my_write(share->tina_write_filedes, (unsigned char*)buffer.ptr(), size,
949
950
               MYF(MY_WME | MY_NABP)))
950
951
    return(-1);
951
952
 
970
971
  if (!share->update_file_opened)
971
972
  {
972
973
    if ((update_temp_file=
973
 
           my_create(fn_format(updated_fname, share->table_name.c_str(),
 
974
           internal::my_create(internal::fn_format(updated_fname, share->table_name.c_str(),
974
975
                               "", CSN_EXT,
975
976
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
976
977
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1014
1015
  if (open_update_temp_file_if_needed())
1015
1016
    goto err;
1016
1017
 
1017
 
  if (my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
 
1018
  if (internal::my_write(update_temp_file, (unsigned char*)buffer.ptr(), size,
1018
1019
               MYF(MY_WME | MY_NABP)))
1019
1020
    goto err;
1020
1021
  temp_file_length+= size;
1070
1071
  if (local_data_file_version != share->data_file_version)
1071
1072
  {
1072
1073
    local_data_file_version= share->data_file_version;
1073
 
    if (my_close(data_file, MYF(0)) ||
1074
 
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
1074
    if (internal::my_close(data_file, MYF(0)) ||
 
1075
        (data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
1075
1076
      return 1;
1076
1077
  }
1077
1078
  file_buff->init_buff(data_file);
1170
1171
*/
1171
1172
void ha_tina::position(const unsigned char *)
1172
1173
{
1173
 
  my_store_ptr(ref, ref_length, current_position);
 
1174
  internal::my_store_ptr(ref, ref_length, current_position);
1174
1175
  return;
1175
1176
}
1176
1177
 
1177
1178
 
1178
1179
/*
1179
1180
  Used to fetch a row from a posiion stored with ::position().
1180
 
  my_get_ptr() retrieves the data for you.
 
1181
  internal::my_get_ptr() retrieves the data for you.
1181
1182
*/
1182
1183
 
1183
1184
int ha_tina::rnd_pos(unsigned char * buf, unsigned char *pos)
1184
1185
{
1185
1186
  ha_statistic_increment(&SSV::ha_read_rnd_count);
1186
 
  current_position= (off_t)my_get_ptr(pos,ref_length);
 
1187
  current_position= (off_t)internal::my_get_ptr(pos,ref_length);
1187
1188
  return(find_current_row(buf));
1188
1189
}
1189
1190
 
1244
1245
      The sort is needed when there were updates/deletes with random orders.
1245
1246
      It sorts so that we move the firts blocks to the beginning.
1246
1247
    */
1247
 
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1248
 
             (qsort_cmp)sort_set);
 
1248
    internal::my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1249
                       (qsort_cmp)sort_set);
1249
1250
 
1250
1251
    off_t write_begin= 0, write_end;
1251
1252
 
1266
1267
      /* if there is something to write, write it */
1267
1268
      if (write_length)
1268
1269
      {
1269
 
        if (my_write(update_temp_file,
 
1270
        if (internal::my_write(update_temp_file,
1270
1271
                     (unsigned char*) (file_buff->ptr() +
1271
1272
                               (write_begin - file_buff->start())),
1272
1273
                     (size_t)write_length, MYF_RW))
1289
1290
 
1290
1291
    }
1291
1292
 
1292
 
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
1293
 
        my_close(update_temp_file, MYF(0)))
 
1293
    if (internal::my_sync(update_temp_file, MYF(MY_WME)) ||
 
1294
        internal::my_close(update_temp_file, MYF(0)))
1294
1295
      return(-1);
1295
1296
 
1296
1297
    share->update_file_opened= false;
1297
1298
 
1298
1299
    if (share->tina_write_opened)
1299
1300
    {
1300
 
      if (my_close(share->tina_write_filedes, MYF(0)))
 
1301
      if (internal::my_close(share->tina_write_filedes, MYF(0)))
1301
1302
        return(-1);
1302
1303
      /*
1303
1304
        Mark that the writer fd is closed, so that init_tina_writer()
1310
1311
      Close opened fildes's. Then move updated file in place
1311
1312
      of the old datafile.
1312
1313
    */
1313
 
    if (my_close(data_file, MYF(0)) ||
1314
 
        my_rename(fn_format(updated_fname, share->table_name.c_str(),
1315
 
                            "", CSN_EXT,
1316
 
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1317
 
                  share->data_file_name, MYF(0)))
 
1314
    if (internal::my_close(data_file, MYF(0)) ||
 
1315
        internal::my_rename(internal::fn_format(updated_fname,
 
1316
                                                share->table_name.c_str(),
 
1317
                                                "", CSN_EXT,
 
1318
                                                MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1319
                            share->data_file_name, MYF(0)))
1318
1320
      return(-1);
1319
1321
 
1320
1322
    /* Open the file again */
1321
 
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
 
1323
    if (((data_file= internal::my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1322
1324
      return(-1);
1323
1325
    /*
1324
1326
      As we reopened the data file, increase share->data_file_version
1345
1347
 
1346
1348
  return(0);
1347
1349
error:
1348
 
  my_close(update_temp_file, MYF(0));
 
1350
  internal::my_close(update_temp_file, MYF(0));
1349
1351
  share->update_file_opened= false;
1350
1352
  return(-1);
1351
1353
}
1403
1405
  }
1404
1406
 
1405
1407
 
1406
 
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSM_EXT,
 
1408
  if ((create_file= internal::my_create(internal::fn_format(name_buff, table_name, "", CSM_EXT,
1407
1409
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
1408
1410
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1409
1411
    return(-1);
1410
1412
 
1411
1413
  write_meta_file(create_file, 0, false);
1412
 
  my_close(create_file, MYF(0));
 
1414
  internal::my_close(create_file, MYF(0));
1413
1415
 
1414
 
  if ((create_file= my_create(fn_format(name_buff, table_name, "", CSV_EXT,
 
1416
  if ((create_file= internal::my_create(internal::fn_format(name_buff, table_name, "", CSV_EXT,
1415
1417
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1416
1418
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1417
1419
    return(-1);
1418
1420
 
1419
 
  my_close(create_file, MYF(0));
 
1421
  internal::my_close(create_file, MYF(0));
1420
1422
 
1421
1423
  pthread_mutex_lock(&proto_cache_mutex);
1422
1424
  proto_cache.insert(make_pair(table_name, create_proto));