2
* Copyright (C) 2010 Djellel Eddine Difallah
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* * Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* * Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* * Neither the name of Djellel Eddine Difallah nor the names of its contributors
14
* may be used to endorse or promote products derived from this software
15
* without specific prior written permission.
17
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
27
* THE POSSIBILITY OF SUCH DAMAGE.
34
#include <drizzled/plugin.h>
35
#include <drizzled/session.h>
36
#include <drizzled/select_send.h>
37
#include <drizzled/item/null.h>
44
#include "memcached_qc.h"
45
#include "query_cache_udf_tools.h"
46
#include "data_dictionary_schema.h"
47
#include "invalidator.h"
48
#include <boost/program_options.hpp>
49
#include <drizzled/module/option_map.h>
51
using namespace drizzled;
53
namespace po= boost::program_options;
55
// Expiry value handed to memcached when a resultset is stored (read in
// doSetResultset). Registered as the "expiry" system variable in init and
// given a default of 1000 in init_options.
static uint64_constraint expiry_time;
57
// Definition of the static memcached client member; doSetResultset calls
// set() on it.
memcache::Memcache* MemcachedQueryCache::client;
58
// Definition of the static member holding the memcached server list
// (presumably what getServers() returns -- confirm in the header).
std::string MemcachedQueryCache::memcached_servers;
59
// Plugin on/off switch. doIsCached, doPrepareResultset, doSetResultset and
// doInsertRecord all test it before doing any work. Bound to the "enable"
// system variable in init and defaulted to false in init_options.
bool sysvar_memcached_qc_enable;
61
// Tests whether a query text is a SELECT statement (see the original notes
// below). The query is taken by value.
// NOTE(review): this listing is missing lines -- the braces, the declaration
// of the index i, the remainder of the condition and the return statements
// are not visible, so only the visible fragment is described here.
bool MemcachedQueryCache::isSelect(string query)
65
Skip '(' characters in queries like following:
66
(select a from t1) union (select a from t1);
68
// Scan the raw C string; query is a local copy, so sql stays valid for the
// whole call.
const char* sql= query.c_str();
72
Test if the query is a SELECT
73
(pre-space is removed in dispatch_command).
74
First '/' looks like comment before command it is not
75
frequently appeared in real life, consequently we can
76
check all such queries, too.
78
// The parenthesised part is true when the three characters starting at
// sql[i], upper-cased with my_toupper, are not "SEL". The clause that
// follows the trailing && is not visible in this listing.
if ((my_toupper(system_charset_info, sql[i]) != 'S' ||
79
my_toupper(system_charset_info, sql[i + 1]) != 'E' ||
80
my_toupper(system_charset_info, sql[i + 2]) != 'L') &&
88
// Checks whether the current query of the session already has a cached
// resultset. Only runs when the plugin is enabled and isSelect accepts the
// query text. The lookup key is md5_key applied to the query text
// concatenated with the schema name; when queryCacheService reports the key
// as cached, the key is stored in session->query_cache_key.
// NOTE(review): braces and return statements are missing from this listing.
bool MemcachedQueryCache::doIsCached(Session *session)
90
if (sysvar_memcached_qc_enable && isSelect(session->query))
92
/* ToDo: Check against the cache content */
93
// Same key recipe as doPrepareResultset: query text followed by the schema.
string query= session->query + *session->schema();
94
// md5_key allocates its output with malloc; no matching free() is visible
// here -- NOTE(review): confirm the key is released on every path.
char* key= md5_key(query.c_str());
95
if(queryCacheService.isCached(key))
97
// Remember the key; doSendCachedResultset uses it to fetch the data.
session->query_cache_key.assign(key);
106
// Replays a resultset stored in memcached through the select_result of the
// session. Steps visible in this listing:
//   1. obtain the select_result from the lexer, or allocate a select_send;
//   2. fetch the raw bytes stored under session->query_cache_key;
//   3. parse them into a message::Resultset;
//   4. send the field aliases with send_fields;
//   5. send every record with send_data;
//   6. clear session->query_cache_key.
// NOTE(review): braces, return statements and possibly other statements are
// missing from this listing, so failure handling cannot be described.
bool MemcachedQueryCache::doSendCachedResultset(Session *session)
108
/** TODO: pay attention to the case where the cache value is empty
109
* ie: there is a session in the process of caching the query
110
* and didn't finish the work
113
LEX *lex= session->lex;
114
// NOTE(review): the register storage class was removed in C++17.
register Select_Lex *select_lex= &lex->select_lex;
115
select_result *result=lex->result;
116
// Fall back to a freshly allocated select_send when the lexer has no result
// object; the body taken when that allocation fails is not visible.
if (not result && not (result= new select_send()))
118
result->prepare(select_lex->item_list, select_lex->master_unit());
120
/* fetching the resultset from memcached */
121
vector<char> raw_resultset;
122
getClient()->get(session->query_cache_key, raw_resultset);
123
// An empty buffer means nothing usable came back for this key.
if(raw_resultset.empty())
125
message::Resultset resultset_message;
126
// The bytes are copied into a temporary string because ParseFromString
// takes a string argument.
if (not resultset_message.ParseFromString(string(raw_resultset.begin(),raw_resultset.end())))
128
List<Item> item_list;
130
/* Send the fields */
131
message::SelectHeader header= resultset_message.select_header();
132
size_t num_fields= header.field_meta_size();
133
// One Item_string per field, carrying the field alias.
for (size_t y= 0; y < num_fields; y++)
135
message::FieldMeta field= header.field_meta(y);
136
string value=field.field_alias();
137
// NOTE(review): value is a loop-local string; confirm Item_string copies the
// buffer rather than keeping the pointer returned by c_str().
item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
139
result->send_fields(item_list);
143
message::SelectData data= resultset_message.select_data();
144
session->limit_found_rows= 0;
145
// Rebuild and send one row per cached record.
// NOTE(review): item_list is reused for the header and for every row; the
// statement that empties it between uses is not visible here -- confirm.
for (int j= 0; j < data.record_size(); j++)
147
message::SelectRecord record= data.record(j);
148
for (size_t y= 0; y < num_fields; y++)
150
// NULL columns become Item_null, all others an Item_string with the value.
if(record.is_null(y))
152
item_list.push_back(new Item_null());
156
string value=record.record_value(y);
157
item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
160
result->send_data(item_list);
163
/* Send End of file */
165
/* reset the cache key at the session level */
166
session->query_cache_key= "";
170
/* Check if the tables in the query do not contain
173
// Walks the table list starting at in_table (following next_global) and
// marks the statement as not cacheable when the db name of a table equals
// "DATA_DICTIONARY", compared case-insensitively with strcasecmp.
// NOTE(review): braces are missing from this listing, so whether the loop
// stops at the first match cannot be seen.
void MemcachedQueryCache::checkTables(Session *session, TableList* in_table)
175
for (TableList* tmp_table= in_table; tmp_table; tmp_table= tmp_table->next_global)
177
if (strcasecmp(tmp_table->db, "DATA_DICTIONARY") == 0)
179
session->lex().setCacheable(false);
185
/* init the current resultset in the session
186
* set the header message (hashkey= sql + schema)
188
// Prepares the session for caching the result of the current statement.
// checkTables runs first and may mark the statement as not cacheable. When
// the plugin is enabled and the statement is cacheable, the key is computed
// from query text plus schema name. Under the mutex, if the key is not yet
// known to queryCacheService, the key is stored in the session and a new
// Resultset message is created and filled with key, schema and SQL text.
// NOTE(review): braces, the else branch and the return statements are
// missing from this listing.
bool MemcachedQueryCache::doPrepareResultset(Session *session)
190
checkTables(session, session->lex().query_tables);
191
if (sysvar_memcached_qc_enable && session->lex().isCacheable())
193
/* Prepare and set the key for the session */
194
string query= session->query + *session->schema();
195
// md5_key allocates its output with malloc; no matching free() is visible
// here -- NOTE(review): confirm the key is released on every path.
char* key= md5_key(query.c_str());
197
/* make sure only one thread will cache the query
198
* if executed concurrently
200
pthread_mutex_lock(&mutex);
202
if(not queryCacheService.isCached(key))
204
session->query_cache_key.assign(key);
207
/* create the Resultset */
208
message::Resultset *resultset= queryCacheService.setCurrentResultsetMessage(session);
210
/* setting the resultset infos */
211
resultset->set_key(session->query_cache_key);
212
resultset->set_schema(*session->schema());
213
resultset->set_sql(session->query);
214
// Two unlock calls are visible; presumably one per branch of the isCached
// test -- the branch structure itself is not visible here.
pthread_mutex_unlock(&mutex);
218
pthread_mutex_unlock(&mutex);
224
/* Send the current resultset to memcached
225
* Reset the current resultset of the session
227
// Stores the finished Resultset message of the session in memcached and
// keeps a copy without row data in queryCacheService.cache. Runs only when
// the plugin is enabled, the session has no error, a Resultset message
// exists and the statement is cacheable. The message is reset on the session
// afterwards.
// NOTE(review): braces, return statements and the declarations of output and
// flags are missing from this listing.
bool MemcachedQueryCache::doSetResultset(Session *session)
229
message::Resultset *resultset= session->getResultsetMessage();
230
if (sysvar_memcached_qc_enable && (not session->is_error()) && resultset != NULL && session->lex().isCacheable())
232
/* Generate the final Header */
233
queryCacheService.setResultsetHeader(*resultset, session, session->lex().query_tables);
234
/* serialize the Resultset Message */
236
resultset->SerializeToString(&output);
238
/* setting to memcached */
239
time_t expiry= expiry_time; // ToDo: add a user defined expiry
241
// The client set() call takes a vector of char, so the serialized string is
// copied into one.
std::vector<char> raw(output.size());
242
// NOTE(review): if output could ever be empty, raw[0] indexes an empty
// vector -- confirm the serialized message is never empty or add a guard.
memcpy(&raw[0], output.c_str(), output.size());
243
// When the store fails the pending message is dropped.
if(not client->set(session->query_cache_key, raw, expiry, flags))
246
session->resetResultsetMessage();
250
/* Clear the Selectdata from the Resultset to be locally cached
251
* Comment if Keeping the data in the header is needed
253
resultset->clear_select_data();
255
/* add the Resultset (including the header) to the hash
256
* This is done after the memcached set
258
// Stores a copy of the message in the cache map under the session key.
queryCacheService.cache[session->query_cache_key]= *resultset;
260
/* end the current statement */
262
session->resetResultsetMessage();
268
/* Adds a record (List<Item>) to the current Resultset.SelectData
270
// Forwards one row (list) to queryCacheService.addRecord for the given
// session, but only while the plugin is enabled.
// NOTE(review): braces and return statements are missing from this listing.
bool MemcachedQueryCache::doInsertRecord(Session *session, List<Item> &list)
272
if(sysvar_memcached_qc_enable)
274
queryCacheService.addRecord(session, list);
280
// Computes the MD5 digest of the NUL-terminated string str with libgcrypt
// and writes it as lowercase hex into a malloc'ed buffer of
// 2 * digest length + 1 bytes.
// NOTE(review): the declarations of i and p, the return statement and any
// free() of the binary hash buffer are missing from this listing. Presumably
// out is returned and owned by the caller -- confirm. Neither malloc result
// is checked in the visible lines.
char* MemcachedQueryCache::md5_key(const char *str)
282
int msg_len= strlen(str);
283
/* Length of the resulting MD5 digest - gcry_md_get_algo_dlen
284
* returns digest length for an algo */
285
int hash_len= gcry_md_get_algo_dlen( GCRY_MD_MD5 );
286
/* output MD5 hash - this will be binary data */
287
unsigned char* hash= (unsigned char*) malloc(hash_len);
288
/* output MD5 hash - converted to hex representation
289
* 2 hex digits for every byte + 1 for trailing \0 */
290
char *out= (char *) malloc( sizeof(char) * ((hash_len*2)+1) );
292
/* calculate the MD5 digest. This is a bit of a shortcut function
293
* most gcrypt operations require the creation of a handle, etc. */
294
gcry_md_hash_buffer( GCRY_MD_MD5, hash, str , msg_len );
295
/* Convert each byte to its 2 digit ascii
296
* hex representation and place in out */
298
for ( i = 0; i < hash_len; i++, p += 2 )
300
// Size 3 leaves room for two hex digits plus the terminating NUL.
snprintf ( p, 3, "%02x", hash[i] );
306
/** User Defined Function print_query_cache_meta **/
307
// Factory for the print_query_cache_meta UDF; declared extern here and
// assigned in init.
extern plugin::Create_function<PrintQueryCacheMetaFunction> *print_query_cache_meta_func_factory;
308
// Factory for the query_cache_flush UDF; created and registered in init.
plugin::Create_function<QueryCacheFlushFunction> *query_cache_flush_func= NULL;
310
/** DATA_DICTIONARY views */
311
// The three objects below are allocated in init and handed to the module
// context there.
static QueryCacheTool *query_cache_tool;
312
static QueryCacheStatusTool *query_cache_status;
313
static CachedTables *query_cached_tables;
315
// Plugin initialisation. Creates the MemcachedQueryCache from the "servers"
// option, creates the Invalidator and attaches it as an applier to the
// replicator named "default_replicator", registers the two UDF factories and
// the three DATA_DICTIONARY objects, and registers the "expiry", "servers"
// and "enable" system variables.
// NOTE(review): braces, the return statement and the statement that hands
// memc to the context are missing from this listing. Objects are created
// with raw new; presumably the context takes ownership -- confirm.
static int init(module::Context &context)
317
const module::option_map &vm= context.getOptions();
319
MemcachedQueryCache* memc= new MemcachedQueryCache("Memcached_Query_Cache", vm["servers"].as<string>());
322
Invalidator* invalidator= new Invalidator("Memcached_Query_Cache_Invalidator");
323
context.add(invalidator);
324
ReplicationServices &replication_services= ReplicationServices::singleton();
325
string replicator_name("default_replicator");
326
replication_services.attachApplier(invalidator, replicator_name);
328
/* Setup the module's UDFs */
329
print_query_cache_meta_func_factory=
330
new plugin::Create_function<PrintQueryCacheMetaFunction>("print_query_cache_meta");
331
context.add(print_query_cache_meta_func_factory);
333
query_cache_flush_func= new plugin::Create_function<QueryCacheFlushFunction>("query_cache_flush");
334
context.add(query_cache_flush_func);
336
/* Setup the module Data dict and status infos */
337
query_cache_tool= new QueryCacheTool();
338
context.add(query_cache_tool);
339
query_cache_status= new QueryCacheStatusTool();
340
context.add(query_cache_status);
341
query_cached_tables= new CachedTables();
342
context.add(query_cached_tables);
344
// System variables: expiry is backed by expiry_time, servers is a constant
// copy of the option value, enable points at sysvar_memcached_qc_enable.
context.registerVariable(new sys_var_constrained_value<uint64_t>("expiry", expiry_time));
345
context.registerVariable(new sys_var_const_string_val("servers", vm["servers"].as<string>()));
346
context.registerVariable(new sys_var_bool_ptr("enable", &sysvar_memcached_qc_enable));
350
// Passes fields to the TableFunction generator base and points
// status_var_ptr at vars (declared outside this listing), which populate
// then reads from.
// NOTE(review): braces are missing from this listing.
QueryCacheStatusTool::Generator::Generator(drizzled::Field **fields) :
351
plugin::TableFunction::Generator(fields)
353
status_var_ptr= vars;
356
// Emits the variable currently addressed by status_var_ptr: pushes its name,
// then selects the value text by name -- "enable" gives "ON" or "OFF",
// "servers" gives MemcachedQueryCache::getServers(), "expiry" gives
// expiry_time converted with boost::lexical_cast.
// NOTE(review): braces, the declaration of return_value, the body of the
// final if, the pointer advance and the return statements are missing from
// this listing.
bool QueryCacheStatusTool::Generator::populate()
363
push((*status_var_ptr)->name);
364
// The three tests are independent if statements in the visible code, not an
// else-if chain.
if (strcmp((**status_var_ptr).name, "enable") == 0)
365
return_value= sysvar_memcached_qc_enable ? "ON" : "OFF";
366
if (strcmp((**status_var_ptr).name, "servers") == 0)
367
return_value= MemcachedQueryCache::getServers();
368
if (strcmp((**status_var_ptr).name, "expiry") == 0)
369
return_value= boost::lexical_cast<std::string>(expiry_time);
372
if (return_value.length())
384
// Declares the configuration options of the plugin. Three options are
// visible: a server list string defaulting to "127.0.0.1:11211", an expiry
// bound to expiry_time defaulting to 1000, and a boolean bound to
// sysvar_memcached_qc_enable defaulting to false.
// NOTE(review): the lines carrying the context(...) calls and the option
// names are missing from this listing.
static void init_options(drizzled::module::option_context &context)
387
po::value<string>()->default_value("127.0.0.1:11211"),
388
_("List of memcached servers."));
390
po::value<uint64_constraint>(&expiry_time)->default_value(1000),
391
_("Expiry time of memcached entries"));
393
// zero_tokens() lets the flag be given without an explicit value.
po::value<bool>(&sysvar_memcached_qc_enable)->default_value(false)->zero_tokens(),
394
_("Enable Memcached Query Cache"));
397
// Plugin manifest. The visible entries are the author, the description, the
// init function and the option-declaration function.
// NOTE(review): the remaining entries and the enclosing braces are missing
// from this listing.
DRIZZLE_DECLARE_PLUGIN
402
"Djellel Eddine Difallah",
403
"Caches Select resultsets in Memcached",
405
init, /* Plugin Init */
407
init_options /* config options */
409
DRIZZLE_DECLARE_PLUGIN_END;