~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/zeromq/zeromq_log.cc

  • Committer: Marcus Eriksson
  • Date: 2011-08-08 18:51:23 UTC
  • mto: This revision was merged to the branch mainline in revision 2391.
  • Revision ID: marcuse@marcuse-laptop-20110808185123-32136r30h10d0irj
add schema as first frame and add some initial docs

Show diffs side-by-side

added added

removed removed

Lines of Context:
72
72
    return plugin::UNKNOWN_ERROR;
73
73
  }
74
74
 
 
75
  string schema= getSchemaName(to_apply);
 
76
  zmq_msg_t schemamsg;
 
77
  int rc= zmq_msg_init_size(&schemamsg, schema.length());
 
78
  memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
 
79
 
75
80
  to_apply.SerializeWithCachedSizesToArray(buffer);
76
81
  zmq_msg_t msg;
77
 
  int rc= zmq_msg_init_size (&msg, message_byte_length);
 
82
  rc= zmq_msg_init_size(&msg, message_byte_length);
78
83
  assert (rc == 0);
79
84
  memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
80
85
 
 
86
  // need a mutex around this since several threads can call this method at the same time
81
87
  pthread_mutex_lock(&publishLock);
82
 
  rc= zmq_send (_socket, &msg, 0);
 
88
  rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
 
89
  rc= zmq_send(_socket, &msg, 0);
83
90
  pthread_mutex_unlock(&publishLock);
84
91
 
85
 
  zmq_msg_close (&msg);
 
92
  zmq_msg_close(&msg);
 
93
  zmq_msg_close(&schemamsg);
86
94
  delete[] buffer;
87
95
  return plugin::SUCCESS;
88
96
}
89
97
 
 
98
string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
 
99
  if(txn.statement_size() == 0) return "";
 
100
 
 
101
  const message::Statement &statement= txn.statement(0);
 
102
 
 
103
  switch(statement.type())
 
104
  {
 
105
        case message::Statement::INSERT:
 
106
          return statement.insert_header().table_metadata().schema_name();
 
107
        case message::Statement::UPDATE:
 
108
          return statement.update_header().table_metadata().schema_name();
 
109
        case message::Statement::DELETE:
 
110
          return statement.delete_header().table_metadata().schema_name();
 
111
        case message::Statement::CREATE_TABLE:
 
112
          return statement.create_table_statement().table().schema();
 
113
        case message::Statement::TRUNCATE_TABLE:
 
114
          return statement.truncate_table_statement().table_metadata().schema_name();
 
115
        case message::Statement::DROP_TABLE:
 
116
          return statement.drop_table_statement().table_metadata().schema_name();
 
117
        case message::Statement::CREATE_SCHEMA:
 
118
          return statement.create_schema_statement().schema().name();
 
119
        case message::Statement::DROP_SCHEMA:
 
120
          return statement.drop_schema_statement().schema_name();
 
121
    default:
 
122
          return "";
 
123
  }
 
124
}
 
125
 
90
126
static ZeroMQLog *zeromqLogger; ///< the actual plugin
91
127
 
92
128
/**