72
72
return plugin::UNKNOWN_ERROR;
75
string schema= getSchemaName(to_apply);
77
int rc= zmq_msg_init_size(&schemamsg, schema.length());
78
memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
75
80
to_apply.SerializeWithCachedSizesToArray(buffer);
77
int rc= zmq_msg_init_size (&msg, message_byte_length);
82
rc= zmq_msg_init_size(&msg, message_byte_length);
79
84
memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
86
// need a mutex around this since several threads can call this method at the same time
81
87
pthread_mutex_lock(&publishLock);
82
rc= zmq_send (_socket, &msg, 0);
88
rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
89
rc= zmq_send(_socket, &msg, 0);
83
90
pthread_mutex_unlock(&publishLock);
93
zmq_msg_close(&schemamsg);
87
95
return plugin::SUCCESS;
98
/// Returns the schema name associated with a transaction message.
///
/// Only the first statement of the transaction (index 0) is inspected; the
/// schema name is read from the header or sub-message matching that
/// statement's type.
///
/// @param txn  transaction message to inspect
/// @return the schema name of the first statement, or an empty string when
///         the transaction contains no statements
///
/// NOTE(review): no default branch is visible in this view for statement
/// types other than the eight listed below -- confirm what is returned for
/// them.
string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
99
// A transaction without statements has no schema to report.
if(txn.statement_size() == 0) return "";
101
// Later statements in the transaction are not consulted.
const message::Statement &statement= txn.statement(0);
103
// Each statement type keeps its schema name in a different sub-message.
switch(statement.type())
105
case message::Statement::INSERT:
106
return statement.insert_header().table_metadata().schema_name();
107
case message::Statement::UPDATE:
108
return statement.update_header().table_metadata().schema_name();
109
case message::Statement::DELETE:
110
return statement.delete_header().table_metadata().schema_name();
111
case message::Statement::CREATE_TABLE:
112
// CREATE_TABLE reads the schema from the table definition itself, not from table metadata.
return statement.create_table_statement().table().schema();
113
case message::Statement::TRUNCATE_TABLE:
114
return statement.truncate_table_statement().table_metadata().schema_name();
115
case message::Statement::DROP_TABLE:
116
return statement.drop_table_statement().table_metadata().schema_name();
117
case message::Statement::CREATE_SCHEMA:
118
// For schema-level statements the schema name is the object being created or dropped.
return statement.create_schema_statement().schema().name();
119
case message::Statement::DROP_SCHEMA:
120
return statement.drop_schema_statement().schema_name();
90
126
static ZeroMQLog *zeromqLogger; ///< the actual plugin