~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/handler.cc

  • Committer: Monty Taylor
  • Date: 2008-12-06 23:17:26 UTC
  • mfrom: (664 drizzle)
  • mto: This revision was merged to the branch mainline in revision 665.
  • Revision ID: monty@inaugust.com-20081206231726-ut9ntnazs3rega56
Merged with trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
35
35
#include CMATH_H
36
36
#include <drizzled/session.h>
37
37
#include <drizzled/sql_base.h>
 
38
#include <drizzled/replicator.h>
38
39
 
39
40
#if defined(CMATH_NAMESPACE)
40
41
using namespace CMATH_NAMESPACE;
4286
4287
  - table is not mysql.event
4287
4288
*/
4288
4289
 
4289
 
static bool check_table_binlog_row_based(Session *session, Table *table)
4290
 
{
4291
 
  if (table->s->cached_row_logging_check == -1)
4292
 
  {
4293
 
    int const check(table->s->tmp_table == NO_TMP_TABLE);
4294
 
    table->s->cached_row_logging_check= check;
4295
 
  }
4296
 
 
4297
 
  assert(table->s->cached_row_logging_check == 0 ||
4298
 
              table->s->cached_row_logging_check == 1);
4299
 
 
4300
 
  return (table->s->cached_row_logging_check &&
4301
 
          (session->options & OPTION_BIN_LOG) &&
4302
 
          drizzle_bin_log.is_open());
4303
 
}
4304
 
 
4305
 
 
4306
 
/**
4307
 
   Write table maps for all (manually or automatically) locked tables
4308
 
   to the binary log.
4309
 
 
4310
 
   This function will generate and write table maps for all tables
4311
 
   that are locked by the thread 'session'.  Either manually locked
4312
 
   (stored in Session::locked_tables) and automatically locked (stored
4313
 
   in Session::lock) are considered.
4314
 
 
4315
 
   @param session     Pointer to Session structure
4316
 
 
4317
 
   @retval 0   All OK
4318
 
   @retval 1   Failed to write all table maps
4319
 
 
4320
 
   @sa
4321
 
       Session::lock
4322
 
       Session::locked_tables
4323
 
*/
4324
 
 
4325
 
static int write_locked_table_maps(Session *session)
4326
 
{
4327
 
  if (session->get_binlog_table_maps() == 0)
4328
 
  {
4329
 
    DRIZZLE_LOCK *locks[3];
4330
 
    locks[0]= session->extra_lock;
4331
 
    locks[1]= session->lock;
4332
 
    locks[2]= session->locked_tables;
4333
 
    for (uint32_t i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
4334
 
    {
4335
 
      DRIZZLE_LOCK const *const lock= locks[i];
4336
 
      if (lock == NULL)
4337
 
        continue;
4338
 
 
4339
 
      Table **const end_ptr= lock->table + lock->table_count;
4340
 
      for (Table **table_ptr= lock->table ;
4341
 
           table_ptr != end_ptr ;
4342
 
           ++table_ptr)
4343
 
      {
4344
 
        Table *const table= *table_ptr;
4345
 
        if (table->current_lock == F_WRLCK &&
4346
 
            check_table_binlog_row_based(session, table))
4347
 
        {
4348
 
          int const has_trans= table->file->has_transactions();
4349
 
          int const error= session->binlog_write_table_map(table, has_trans);
4350
 
          /*
4351
 
            If an error occurs, it is the responsibility of the caller to
4352
 
            roll back the transaction.
4353
 
          */
4354
 
          if (unlikely(error))
4355
 
            return(1);
4356
 
        }
4357
 
      }
4358
 
    }
4359
 
  }
4360
 
  return(0);
4361
 
}
4362
 
 
4363
 
 
4364
 
typedef bool Log_func(Session*, Table*, bool, const unsigned char*, const unsigned char*);
4365
 
 
4366
 
static int binlog_log_row(Table* table,
4367
 
                          const unsigned char *before_record,
4368
 
                          const unsigned char *after_record,
4369
 
                          Log_func *log_func)
4370
 
{
4371
 
  if (table->no_replicate)
4372
 
    return 0;
4373
 
 
 
4290
static bool  binlog_log_row(Table* table,
 
4291
                            const unsigned char *before_record,
 
4292
                            const unsigned char *after_record)
 
4293
{
4374
4294
  bool error= 0;
4375
4295
  Session *const session= table->in_use;
4376
4296
 
4377
 
  if (check_table_binlog_row_based(session, table))
4378
 
  {
4379
 
    /*
4380
 
      If there are no table maps written to the binary log, this is
4381
 
      the first row handled in this statement. In that case, we need
4382
 
      to write table maps for all locked tables to the binary log.
 
4297
  if (table->no_replicate)
 
4298
    return 0;
 
4299
 
 
4300
  if (session->getReplicationData() == NULL)
 
4301
  {
 
4302
    error= replicator_session_init(session);
 
4303
  }
 
4304
 
 
4305
  switch (session->lex->sql_command)
 
4306
  {
 
4307
  case SQLCOM_REPLACE:
 
4308
  case SQLCOM_INSERT:
 
4309
  case SQLCOM_REPLACE_SELECT:
 
4310
  case SQLCOM_INSERT_SELECT:
 
4311
    error= replicator_write_row(session, table);
 
4312
    break;
 
4313
 
 
4314
  case SQLCOM_UPDATE:
 
4315
  case SQLCOM_UPDATE_MULTI:
 
4316
    error= replicator_update_row(session, table, before_record, after_record);
 
4317
    break;
 
4318
 
 
4319
  case SQLCOM_DELETE:
 
4320
  case SQLCOM_DELETE_MULTI:
 
4321
    error= replicator_delete_row(session, table);
 
4322
    break;
 
4323
 
 
4324
    /* 
 
4325
      For everything else we ignore the event (since it just involves a temp table)
4383
4326
    */
4384
 
    if (likely(!(error= write_locked_table_maps(session))))
4385
 
    {
4386
 
      bool const has_trans= table->file->has_transactions();
4387
 
      error= (*log_func)(session, table, has_trans, before_record, after_record);
4388
 
    }
 
4327
  default:
 
4328
    break;
4389
4329
  }
4390
4330
 
4391
 
  return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
 
4331
  return error;
4392
4332
}
4393
4333
 
4394
4334
int handler::ha_external_lock(Session *session, int lock_type)
4437
4377
int handler::ha_write_row(unsigned char *buf)
4438
4378
{
4439
4379
  int error;
4440
 
  Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
4441
4380
  DRIZZLE_INSERT_ROW_START();
4442
4381
 
4443
4382
  mark_trx_read_write();
4444
4383
 
4445
4384
  if (unlikely(error= write_row(buf)))
4446
4385
    return(error);
4447
 
  if (unlikely(error= binlog_log_row(table, 0, buf, log_func)))
4448
 
    return(error); /* purecov: inspected */
 
4386
 
 
4387
  if (unlikely(binlog_log_row(table, 0, buf)))
 
4388
    return HA_ERR_RBR_LOGGING_FAILED; /* purecov: inspected */
 
4389
 
4449
4390
  DRIZZLE_INSERT_ROW_END();
4450
4391
  return(0);
4451
4392
}
4454
4395
int handler::ha_update_row(const unsigned char *old_data, unsigned char *new_data)
4455
4396
{
4456
4397
  int error;
4457
 
  Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
4458
4398
 
4459
4399
  /*
4460
4400
    Some storage engines require that the new record is in record[0]
4466
4406
 
4467
4407
  if (unlikely(error= update_row(old_data, new_data)))
4468
4408
    return error;
4469
 
  if (unlikely(error= binlog_log_row(table, old_data, new_data, log_func)))
4470
 
    return error;
 
4409
 
 
4410
  if (unlikely(binlog_log_row(table, old_data, new_data)))
 
4411
    return HA_ERR_RBR_LOGGING_FAILED;
 
4412
 
4471
4413
  return 0;
4472
4414
}
4473
4415
 
4474
4416
int handler::ha_delete_row(const unsigned char *buf)
4475
4417
{
4476
4418
  int error;
4477
 
  Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
4478
4419
 
4479
4420
  mark_trx_read_write();
4480
4421
 
4481
4422
  if (unlikely(error= delete_row(buf)))
4482
4423
    return error;
4483
 
  if (unlikely(error= binlog_log_row(table, buf, 0, log_func)))
4484
 
    return error;
 
4424
 
 
4425
  if (unlikely(binlog_log_row(table, buf, 0)))
 
4426
    return HA_ERR_RBR_LOGGING_FAILED;
 
4427
 
4485
4428
  return 0;
4486
4429
}
4487
4430