~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/handler.cc

  • Committer: Brian Aker
  • Date: 2008-12-06 22:41:58 UTC
  • Revision ID: brian@tangent.org-20081206224158-oj4j95n7w0mtxxd1
Completing up replication API.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
*/
25
25
 
26
26
#include <drizzled/server_includes.h>
27
 
#include <drizzled/rpl_filter.h>
 
27
#include <libdrizzle/libdrizzle.h>
 
28
#include <mysys/hash.h>
28
29
#include <drizzled/error.h>
29
30
#include <drizzled/gettext.h>
30
31
#include <drizzled/data_home.h>
32
33
#include <drizzled/sql_parse.h>
33
34
#include <drizzled/cost_vect.h>
34
35
#include CMATH_H
 
36
#include <drizzled/session.h>
 
37
#include <drizzled/sql_base.h>
 
38
#include <drizzled/replicator.h>
35
39
 
36
40
#if defined(CMATH_NAMESPACE)
37
41
using namespace CMATH_NAMESPACE;
38
42
#endif
39
43
 
40
44
 
 
45
extern HASH open_cache;
 
46
 
41
47
KEY_CREATE_INFO default_key_create_info= { HA_KEY_ALG_UNDEF, 0, {NULL,0}, {NULL,0} };
42
48
 
43
49
/* number of entries in handlertons[] */
78
84
 
79
85
  /* Allocate a pointer array for the error message strings. */
80
86
  /* Zerofill it to avoid uninitialized gaps. */
81
 
  if (! (errmsgs= (const char**) my_malloc(HA_ERR_ERRORS * sizeof(char*),
82
 
                                           MYF(MY_WME | MY_ZEROFILL))))
 
87
  if (! (errmsgs= (const char**) malloc(HA_ERR_ERRORS * sizeof(char*))))
83
88
    return 1;
 
89
  memset(errmsgs, 0, HA_ERR_ERRORS * sizeof(char *));
84
90
 
85
91
  /* Set the dedicated error messages. */
86
92
  SETMSG(HA_ERR_KEY_NOT_FOUND,          ER(ER_KEY_NOT_FOUND));
149
155
  return 0;
150
156
}
151
157
 
152
 
 
153
158
int ha_init()
154
159
{
155
160
  int error= 0;
1031
1036
  for (info.len= MAX_XID_LIST_SIZE ;
1032
1037
       info.list==0 && info.len > MIN_XID_LIST_SIZE; info.len/=2)
1033
1038
  {
1034
 
    info.list=(XID *)my_malloc(info.len*sizeof(XID), MYF(0));
 
1039
    info.list=(XID *)malloc(info.len*sizeof(XID));
1035
1040
  }
1036
1041
  if (!info.list)
1037
1042
  {
1360
1365
             Session *session __attribute__((unused)))
1361
1366
{
1362
1367
  /* Grab the error message */
1363
 
  strmake(buff, message, sizeof(buff)-1);
 
1368
  strncpy(buff, message, sizeof(buff)-1);
1364
1369
  return true;
1365
1370
}
1366
1371
 
1367
1372
 
 
1373
struct handlerton_delete_table_args {
 
1374
  Session *session;
 
1375
  const char *path;
 
1376
  handler *file;
 
1377
  int error;
 
1378
};
 
1379
 
 
1380
static bool deletetable_handlerton(Session *unused1 __attribute__((unused)),
 
1381
                                   plugin_ref plugin,
 
1382
                                   void *args)
 
1383
{
 
1384
  struct handlerton_delete_table_args *dtargs= (struct handlerton_delete_table_args *) args;
 
1385
 
 
1386
  Session *session= dtargs->session;
 
1387
  const char *path= dtargs->path;
 
1388
 
 
1389
  handler *file;
 
1390
  char tmp_path[FN_REFLEN];
 
1391
 
 
1392
  if(dtargs->error!=ENOENT) /* already deleted table */
 
1393
    return false;
 
1394
 
 
1395
  handlerton *table_type= plugin_data(plugin, handlerton *);
 
1396
 
 
1397
  if(!table_type)
 
1398
    return false;
 
1399
 
 
1400
  if(!(table_type->state == SHOW_OPTION_YES && table_type->create))
 
1401
    return false;
 
1402
 
 
1403
  if ((file= table_type->create(table_type, NULL, session->mem_root)))
 
1404
    file->init();
 
1405
  else
 
1406
    return false;
 
1407
 
 
1408
  path= check_lowercase_names(file, path, tmp_path);
 
1409
  int error= file->ha_delete_table(path);
 
1410
 
 
1411
  if(error!=ENOENT)
 
1412
  {
 
1413
    dtargs->error= error;
 
1414
    if(dtargs->file)
 
1415
      delete dtargs->file;
 
1416
    dtargs->file= file;
 
1417
    return true;
 
1418
  }
 
1419
 
 
1420
  return false;
 
1421
}
 
1422
 
1368
1423
/**
1369
1424
  This should return ENOENT if the file doesn't exists.
1370
1425
  The .frm file will be deleted only if we return 0 or ENOENT
1371
1426
*/
1372
 
int ha_delete_table(Session *session, handlerton *table_type, const char *path,
 
1427
int ha_delete_table(Session *session, const char *path,
1373
1428
                    const char *db, const char *alias, bool generate_warning)
1374
1429
{
1375
 
  handler *file;
1376
 
  char tmp_path[FN_REFLEN];
1377
 
  int error;
 
1430
  TABLE_SHARE dummy_share;
1378
1431
  Table dummy_table;
1379
 
  TABLE_SHARE dummy_share;
 
1432
 
 
1433
  struct handlerton_delete_table_args dtargs;
 
1434
  dtargs.error= ENOENT;
 
1435
  dtargs.session= session;
 
1436
  dtargs.path= path;
 
1437
  dtargs.file= NULL;
 
1438
 
 
1439
  plugin_foreach(NULL, deletetable_handlerton, DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1440
                 &dtargs);
1380
1441
 
1381
1442
  memset(&dummy_table, 0, sizeof(dummy_table));
1382
1443
  memset(&dummy_share, 0, sizeof(dummy_share));
1383
1444
  dummy_table.s= &dummy_share;
1384
1445
 
1385
 
  /* DB_TYPE_UNKNOWN is used in ALTER Table when renaming only .frm files */
1386
 
  if (table_type == NULL ||
1387
 
      ! (file=get_new_handler((TABLE_SHARE*)0, session->mem_root, table_type)))
1388
 
    return(ENOENT);
1389
 
 
1390
 
  path= check_lowercase_names(file, path, tmp_path);
1391
 
  if ((error= file->ha_delete_table(path)) && generate_warning)
 
1446
  if (dtargs.error && generate_warning)
1392
1447
  {
1393
1448
    /*
1394
1449
      Because file->print_error() use my_error() to generate the error message
1407
1462
    dummy_share.table_name.length= strlen(alias);
1408
1463
    dummy_table.alias= alias;
1409
1464
 
 
1465
    handler *file= dtargs.file;
1410
1466
    file->change_table_ptr(&dummy_table, &dummy_share);
1411
1467
 
1412
1468
    session->push_internal_handler(&ha_delete_table_error_handler);
1413
 
    file->print_error(error, 0);
 
1469
    file->print_error(dtargs.error, 0);
1414
1470
 
1415
1471
    session->pop_internal_handler();
1416
1472
 
1418
1474
      XXX: should we convert *all* errors to warnings here?
1419
1475
      What if the error is fatal?
1420
1476
    */
1421
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_ERROR, error,
 
1477
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_ERROR, dtargs.error,
1422
1478
                ha_delete_table_error_handler.buff);
1423
1479
  }
1424
 
  delete file;
1425
 
  return(error);
 
1480
 
 
1481
  if(dtargs.file)
 
1482
    delete dtargs.file;
 
1483
 
 
1484
  return dtargs.error;
1426
1485
}
1427
1486
 
1428
1487
/****************************************************************************
1916
1975
  {
1917
1976
    auto_inc_interval_for_cur_row.replace(nr, nb_reserved_values,
1918
1977
                                          variables->auto_increment_increment);
1919
 
    /* Row-based replication does not need to store intervals in binlog */
1920
 
    if (!session->current_stmt_binlog_row_based)
1921
 
        session->auto_inc_intervals_in_cur_stmt_for_binlog.append(auto_inc_interval_for_cur_row.minimum(),
1922
 
                                                              auto_inc_interval_for_cur_row.values(),
1923
 
                                                              variables->auto_increment_increment);
1924
1978
  }
1925
1979
 
1926
1980
  /*
2319
2373
  return 0;
2320
2374
}
2321
2375
 
2322
 
 
2323
 
static bool update_frm_version(Table *table)
2324
 
{
2325
 
  char path[FN_REFLEN];
2326
 
  File file;
2327
 
  bool result= true;
2328
 
 
2329
 
  /*
2330
 
    No need to update frm version in case table was created or checked
2331
 
    by server with the same version. This also ensures that we do not
2332
 
    update frm version for temporary tables as this code doesn't support
2333
 
    temporary tables.
2334
 
  */
2335
 
  if (table->s->mysql_version == DRIZZLE_VERSION_ID)
2336
 
    return(0);
2337
 
 
2338
 
  strxmov(path, table->s->normalized_path.str, reg_ext, NULL);
2339
 
 
2340
 
  if ((file= my_open(path, O_RDWR, MYF(MY_WME))) >= 0)
2341
 
  {
2342
 
    unsigned char version[4];
2343
 
    char *key= table->s->table_cache_key.str;
2344
 
    uint32_t key_length= table->s->table_cache_key.length;
2345
 
    Table *entry;
2346
 
    HASH_SEARCH_STATE state;
2347
 
 
2348
 
    int4store(version, DRIZZLE_VERSION_ID);
2349
 
 
2350
 
    if (pwrite(file, (unsigned char*)version, 4, 51L) == 0)
2351
 
    {
2352
 
      result= false;
2353
 
      goto err;
2354
 
    }
2355
 
 
2356
 
    for (entry=(Table*) hash_first(&open_cache,(unsigned char*) key,key_length, &state);
2357
 
         entry;
2358
 
         entry= (Table*) hash_next(&open_cache,(unsigned char*) key,key_length, &state))
2359
 
      entry->s->mysql_version= DRIZZLE_VERSION_ID;
2360
 
  }
2361
 
err:
2362
 
  if (file >= 0)
2363
 
    my_close(file,MYF(MY_WME));
2364
 
  return(result);
2365
 
}
2366
 
 
2367
 
 
2368
 
 
2369
2376
/**
2370
2377
  @return
2371
2378
    key if error because of duplicated keys
2476
2483
  }
2477
2484
  if ((error= check(session, check_opt)))
2478
2485
    return error;
2479
 
  return update_frm_version(table);
 
2486
  return HA_ADMIN_OK;
2480
2487
}
2481
2488
 
2482
2489
/**
2524
2531
 
2525
2532
  if ((result= repair(session, check_opt)))
2526
2533
    return result;
2527
 
  return update_frm_version(table);
 
2534
  return HA_ADMIN_OK;
2528
2535
}
2529
2536
 
2530
2537
 
3078
3085
  int error= -1; // Table does not exist in any handler
3079
3086
  st_discover_args args= {db, name, frmblob, frmlen};
3080
3087
 
3081
 
  if (is_prefix(name,tmp_file_prefix)) /* skip temporary tables */
 
3088
  if (is_prefix(name, TMP_FILE_PREFIX)) /* skip temporary tables */
3082
3089
    return(error);
3083
3090
 
3084
3091
  if (plugin_foreach(session, discover_handlerton,
4279
4286
  - table is not mysql.event
4280
4287
*/
4281
4288
 
4282
 
static bool check_table_binlog_row_based(Session *session, Table *table)
4283
 
{
4284
 
  if (table->s->cached_row_logging_check == -1)
4285
 
  {
4286
 
    int const check(table->s->tmp_table == NO_TMP_TABLE &&
4287
 
                    binlog_filter->db_ok(table->s->db.str));
4288
 
    table->s->cached_row_logging_check= check;
4289
 
  }
4290
 
 
4291
 
  assert(table->s->cached_row_logging_check == 0 ||
4292
 
              table->s->cached_row_logging_check == 1);
4293
 
 
4294
 
  return (session->current_stmt_binlog_row_based &&
4295
 
          table->s->cached_row_logging_check &&
4296
 
          (session->options & OPTION_BIN_LOG) &&
4297
 
          mysql_bin_log.is_open());
4298
 
}
4299
 
 
4300
 
 
4301
 
/**
4302
 
   Write table maps for all (manually or automatically) locked tables
4303
 
   to the binary log.
4304
 
 
4305
 
   This function will generate and write table maps for all tables
4306
 
   that are locked by the thread 'session'.  Either manually locked
4307
 
   (stored in Session::locked_tables) and automatically locked (stored
4308
 
   in Session::lock) are considered.
4309
 
 
4310
 
   @param session     Pointer to Session structure
4311
 
 
4312
 
   @retval 0   All OK
4313
 
   @retval 1   Failed to write all table maps
4314
 
 
4315
 
   @sa
4316
 
       Session::lock
4317
 
       Session::locked_tables
4318
 
*/
4319
 
 
4320
 
static int write_locked_table_maps(Session *session)
4321
 
{
4322
 
  if (session->get_binlog_table_maps() == 0)
4323
 
  {
4324
 
    DRIZZLE_LOCK *locks[3];
4325
 
    locks[0]= session->extra_lock;
4326
 
    locks[1]= session->lock;
4327
 
    locks[2]= session->locked_tables;
4328
 
    for (uint32_t i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
4329
 
    {
4330
 
      DRIZZLE_LOCK const *const lock= locks[i];
4331
 
      if (lock == NULL)
4332
 
        continue;
4333
 
 
4334
 
      Table **const end_ptr= lock->table + lock->table_count;
4335
 
      for (Table **table_ptr= lock->table ;
4336
 
           table_ptr != end_ptr ;
4337
 
           ++table_ptr)
4338
 
      {
4339
 
        Table *const table= *table_ptr;
4340
 
        if (table->current_lock == F_WRLCK &&
4341
 
            check_table_binlog_row_based(session, table))
4342
 
        {
4343
 
          int const has_trans= table->file->has_transactions();
4344
 
          int const error= session->binlog_write_table_map(table, has_trans);
4345
 
          /*
4346
 
            If an error occurs, it is the responsibility of the caller to
4347
 
            roll back the transaction.
4348
 
          */
4349
 
          if (unlikely(error))
4350
 
            return(1);
4351
 
        }
4352
 
      }
4353
 
    }
4354
 
  }
4355
 
  return(0);
4356
 
}
4357
 
 
4358
 
 
4359
 
typedef bool Log_func(Session*, Table*, bool, const unsigned char*, const unsigned char*);
4360
 
 
4361
 
static int binlog_log_row(Table* table,
4362
 
                          const unsigned char *before_record,
4363
 
                          const unsigned char *after_record,
4364
 
                          Log_func *log_func)
4365
 
{
4366
 
  if (table->no_replicate)
4367
 
    return 0;
 
4289
static bool  binlog_log_row(Table* table,
 
4290
                            const unsigned char *before_record,
 
4291
                            const unsigned char *after_record)
 
4292
{
4368
4293
  bool error= 0;
4369
4294
  Session *const session= table->in_use;
4370
4295
 
4371
 
  if (check_table_binlog_row_based(session, table))
4372
 
  {
4373
 
    /*
4374
 
      If there are no table maps written to the binary log, this is
4375
 
      the first row handled in this statement. In that case, we need
4376
 
      to write table maps for all locked tables to the binary log.
 
4296
  if (table->no_replicate)
 
4297
    return 0;
 
4298
 
 
4299
  if (session->getReplicationData() == NULL)
 
4300
  {
 
4301
    error= replicator_session_init(session);
 
4302
  }
 
4303
 
 
4304
  switch (session->lex->sql_command)
 
4305
  {
 
4306
  case SQLCOM_REPLACE:
 
4307
  case SQLCOM_INSERT:
 
4308
  case SQLCOM_REPLACE_SELECT:
 
4309
  case SQLCOM_INSERT_SELECT:
 
4310
    error= replicator_write_row(session, table);
 
4311
    break;
 
4312
 
 
4313
  case SQLCOM_UPDATE:
 
4314
  case SQLCOM_UPDATE_MULTI:
 
4315
    error= replicator_update_row(session, table, before_record, after_record);
 
4316
    break;
 
4317
 
 
4318
  case SQLCOM_DELETE:
 
4319
  case SQLCOM_DELETE_MULTI:
 
4320
    error= replicator_delete_row(session, table);
 
4321
    break;
 
4322
 
 
4323
    /* 
 
4324
      For everything else we ignore the event (since it just involves a temp table)
4377
4325
    */
4378
 
    if (likely(!(error= write_locked_table_maps(session))))
4379
 
    {
4380
 
      bool const has_trans= table->file->has_transactions();
4381
 
      error= (*log_func)(session, table, has_trans, before_record, after_record);
4382
 
    }
 
4326
  default:
 
4327
    break;
4383
4328
  }
4384
 
  return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
 
4329
 
 
4330
  return error;
4385
4331
}
4386
4332
 
4387
4333
int handler::ha_external_lock(Session *session, int lock_type)
4430
4376
int handler::ha_write_row(unsigned char *buf)
4431
4377
{
4432
4378
  int error;
4433
 
  Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
4434
4379
  DRIZZLE_INSERT_ROW_START();
4435
4380
 
4436
4381
  mark_trx_read_write();
4437
4382
 
4438
4383
  if (unlikely(error= write_row(buf)))
4439
4384
    return(error);
4440
 
  if (unlikely(error= binlog_log_row(table, 0, buf, log_func)))
4441
 
    return(error); /* purecov: inspected */
 
4385
 
 
4386
  if (unlikely(binlog_log_row(table, 0, buf)))
 
4387
    return HA_ERR_RBR_LOGGING_FAILED; /* purecov: inspected */
 
4388
 
4442
4389
  DRIZZLE_INSERT_ROW_END();
4443
4390
  return(0);
4444
4391
}
4447
4394
int handler::ha_update_row(const unsigned char *old_data, unsigned char *new_data)
4448
4395
{
4449
4396
  int error;
4450
 
  Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
4451
4397
 
4452
4398
  /*
4453
4399
    Some storage engines require that the new record is in record[0]
4459
4405
 
4460
4406
  if (unlikely(error= update_row(old_data, new_data)))
4461
4407
    return error;
4462
 
  if (unlikely(error= binlog_log_row(table, old_data, new_data, log_func)))
4463
 
    return error;
 
4408
 
 
4409
  if (unlikely(binlog_log_row(table, old_data, new_data)))
 
4410
    return HA_ERR_RBR_LOGGING_FAILED;
 
4411
 
4464
4412
  return 0;
4465
4413
}
4466
4414
 
4467
4415
int handler::ha_delete_row(const unsigned char *buf)
4468
4416
{
4469
4417
  int error;
4470
 
  Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
4471
4418
 
4472
4419
  mark_trx_read_write();
4473
4420
 
4474
4421
  if (unlikely(error= delete_row(buf)))
4475
4422
    return error;
4476
 
  if (unlikely(error= binlog_log_row(table, buf, 0, log_func)))
4477
 
    return error;
 
4423
 
 
4424
  if (unlikely(binlog_log_row(table, buf, 0)))
 
4425
    return HA_ERR_RBR_LOGGING_FAILED;
 
4426
 
4478
4427
  return 0;
4479
4428
}
4480
4429