~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/handler.cc

  • Committer: Brian Aker
  • Date: 2008-12-06 02:02:19 UTC
  • Revision ID: brian@tangent.org-20081206020219-61o4ij7tsms6qwna
First major pass through new replication.

Show diffs side-by-side

added added

removed removed

Lines of Context:
35
35
#include CMATH_H
36
36
#include <drizzled/session.h>
37
37
#include <drizzled/sql_base.h>
 
38
#include <drizzled/replicator.h>
38
39
 
39
40
#if defined(CMATH_NAMESPACE)
40
41
using namespace CMATH_NAMESPACE;
4285
4286
  - table is not mysql.event
4286
4287
*/
4287
4288
 
4288
 
static bool check_table_binlog_row_based(Session *session, Table *table)
4289
 
{
4290
 
  if (table->s->cached_row_logging_check == -1)
4291
 
  {
4292
 
    int const check(table->s->tmp_table == NO_TMP_TABLE);
4293
 
    table->s->cached_row_logging_check= check;
4294
 
  }
4295
 
 
4296
 
  assert(table->s->cached_row_logging_check == 0 ||
4297
 
              table->s->cached_row_logging_check == 1);
4298
 
 
4299
 
  return (table->s->cached_row_logging_check &&
4300
 
          (session->options & OPTION_BIN_LOG) &&
4301
 
          drizzle_bin_log.is_open());
4302
 
}
4303
 
 
4304
 
 
4305
 
/**
4306
 
   Write table maps for all (manually or automatically) locked tables
4307
 
   to the binary log.
4308
 
 
4309
 
   This function will generate and write table maps for all tables
4310
 
   that are locked by the thread 'session'.  Either manually locked
4311
 
   (stored in Session::locked_tables) and automatically locked (stored
4312
 
   in Session::lock) are considered.
4313
 
 
4314
 
   @param session     Pointer to Session structure
4315
 
 
4316
 
   @retval 0   All OK
4317
 
   @retval 1   Failed to write all table maps
4318
 
 
4319
 
   @sa
4320
 
       Session::lock
4321
 
       Session::locked_tables
4322
 
*/
4323
 
 
4324
 
static int write_locked_table_maps(Session *session)
4325
 
{
4326
 
  if (session->get_binlog_table_maps() == 0)
4327
 
  {
4328
 
    DRIZZLE_LOCK *locks[3];
4329
 
    locks[0]= session->extra_lock;
4330
 
    locks[1]= session->lock;
4331
 
    locks[2]= session->locked_tables;
4332
 
    for (uint32_t i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
4333
 
    {
4334
 
      DRIZZLE_LOCK const *const lock= locks[i];
4335
 
      if (lock == NULL)
4336
 
        continue;
4337
 
 
4338
 
      Table **const end_ptr= lock->table + lock->table_count;
4339
 
      for (Table **table_ptr= lock->table ;
4340
 
           table_ptr != end_ptr ;
4341
 
           ++table_ptr)
4342
 
      {
4343
 
        Table *const table= *table_ptr;
4344
 
        if (table->current_lock == F_WRLCK &&
4345
 
            check_table_binlog_row_based(session, table))
4346
 
        {
4347
 
          int const has_trans= table->file->has_transactions();
4348
 
          int const error= session->binlog_write_table_map(table, has_trans);
4349
 
          /*
4350
 
            If an error occurs, it is the responsibility of the caller to
4351
 
            roll back the transaction.
4352
 
          */
4353
 
          if (unlikely(error))
4354
 
            return(1);
4355
 
        }
4356
 
      }
4357
 
    }
4358
 
  }
4359
 
  return(0);
4360
 
}
4361
 
 
4362
 
 
4363
4289
typedef bool Log_func(Session*, Table*, bool, const unsigned char*, const unsigned char*);
4364
4290
 
4365
4291
static int binlog_log_row(Table* table,
4366
4292
                          const unsigned char *before_record,
4367
 
                          const unsigned char *after_record,
4368
 
                          Log_func *log_func)
 
4293
                          const unsigned char *after_record)
4369
4294
{
4370
 
  if (table->no_replicate)
4371
 
    return 0;
4372
 
 
4373
4295
  bool error= 0;
4374
4296
  Session *const session= table->in_use;
4375
4297
 
4376
 
  if (check_table_binlog_row_based(session, table))
4377
 
  {
4378
 
    /*
4379
 
      If there are no table maps written to the binary log, this is
4380
 
      the first row handled in this statement. In that case, we need
4381
 
      to write table maps for all locked tables to the binary log.
4382
 
    */
4383
 
    if (likely(!(error= write_locked_table_maps(session))))
4384
 
    {
4385
 
      bool const has_trans= table->file->has_transactions();
4386
 
      error= (*log_func)(session, table, has_trans, before_record, after_record);
4387
 
    }
 
4298
  if (table->no_replicate)
 
4299
    return 0;
 
4300
 
 
4301
  if (session->getReplicationData() == NULL)
 
4302
  {
 
4303
    error= replicator_session_init(session);
 
4304
  }
 
4305
 
 
4306
  switch (session->lex->sql_command)
 
4307
  {
 
4308
  case SQLCOM_REPLACE:
 
4309
  case SQLCOM_INSERT:
 
4310
  case SQLCOM_REPLACE_SELECT:
 
4311
  case SQLCOM_INSERT_SELECT:
 
4312
    error= replicator_write_row(session, table);
 
4313
    break;
 
4314
 
 
4315
  case SQLCOM_UPDATE:
 
4316
  case SQLCOM_UPDATE_MULTI:
 
4317
    error= replicator_update_row(session, table, before_record, after_record);
 
4318
    break;
 
4319
 
 
4320
  case SQLCOM_DELETE:
 
4321
  case SQLCOM_DELETE_MULTI:
 
4322
    error= replicator_delete_row(session, table);
 
4323
    break;
 
4324
 
 
4325
  default:
 
4326
    break;
4388
4327
  }
4389
4328
 
4390
4329
  return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
4436
4375
int handler::ha_write_row(unsigned char *buf)
4437
4376
{
4438
4377
  int error;
4439
 
  Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
4440
4378
  DRIZZLE_INSERT_ROW_START();
4441
4379
 
4442
4380
  mark_trx_read_write();
4443
4381
 
4444
4382
  if (unlikely(error= write_row(buf)))
4445
4383
    return(error);
4446
 
  if (unlikely(error= binlog_log_row(table, 0, buf, log_func)))
 
4384
  if (unlikely(error= binlog_log_row(table, 0, buf)))
4447
4385
    return(error); /* purecov: inspected */
4448
4386
  DRIZZLE_INSERT_ROW_END();
4449
4387
  return(0);
4453
4391
int handler::ha_update_row(const unsigned char *old_data, unsigned char *new_data)
4454
4392
{
4455
4393
  int error;
4456
 
  Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
4457
4394
 
4458
4395
  /*
4459
4396
    Some storage engines require that the new record is in record[0]
4465
4402
 
4466
4403
  if (unlikely(error= update_row(old_data, new_data)))
4467
4404
    return error;
4468
 
  if (unlikely(error= binlog_log_row(table, old_data, new_data, log_func)))
 
4405
  if (unlikely(error= binlog_log_row(table, old_data, new_data)))
4469
4406
    return error;
4470
4407
  return 0;
4471
4408
}
4473
4410
int handler::ha_delete_row(const unsigned char *buf)
4474
4411
{
4475
4412
  int error;
4476
 
  Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
4477
4413
 
4478
4414
  mark_trx_read_write();
4479
4415
 
4480
4416
  if (unlikely(error= delete_row(buf)))
4481
4417
    return error;
4482
 
  if (unlikely(error= binlog_log_row(table, buf, 0, log_func)))
 
4418
  if (unlikely(error= binlog_log_row(table, buf, 0)))
4483
4419
    return error;
4484
4420
  return 0;
4485
4421
}