2
* Copyright (c) 2010, Djellel Eddine Difallah
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* * Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* * Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* * Neither the name of Djellel Eddine Difallah nor the names of its contributors
14
* may be used to endorse or promote products derived from this software
15
* without specific prior written permission.
17
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
27
* THE POSSIBILITY OF SUCH DAMAGE.
34
#include "drizzled/plugin.h"
35
#include "drizzled/session.h"
36
#include "drizzled/select_send.h"
37
#include "drizzled/item/null.h"
44
#include "memcached_qc.h"
45
#include "query_cache_udf_tools.h"
46
#include "data_dictionary_schema.h"
47
#include "invalidator.h"
48
#include <boost/program_options.hpp>
49
#include <drizzled/module/option_map.h>
51
using namespace drizzled;
53
namespace po= boost::program_options;
55
static char* sysvar_memcached_servers= NULL;
56
static ulong expiry_time;
58
memcache::Memcache* MemcachedQueryCache::client;
59
std::string MemcachedQueryCache::memcached_servers;
61
static DRIZZLE_SessionVAR_BOOL(enable,
62
PLUGIN_VAR_SessionLOCAL,
63
"Enable Memcached Query Cache",
64
/* check_func */ NULL,
65
/* update_func */ NULL,
68
static int check_memc_servers(Session *,
73
char buff[STRING_BUFFER_USUAL_SIZE];
74
int len= sizeof(buff);
75
const char *input= value->val_str(value, buff, &len);
79
MemcachedQueryCache::setServers(input);
80
*(bool *) save= (bool) true;
84
*(bool *) save= (bool) false;
88
static void set_memc_servers(Session *,
93
if (*(bool *) save != false)
95
*(const char **) var_ptr= MemcachedQueryCache::getServers();
99
static DRIZZLE_SYSVAR_STR(servers,
100
sysvar_memcached_servers,
102
N_("List of memcached servers."),
103
check_memc_servers, /* check func */
104
set_memc_servers, /* update func */
105
"127.0.0.1:11211"); /* default value */
107
static DRIZZLE_SYSVAR_ULONG(expiry,
110
"Expiry time of memcached entries",
111
NULL, NULL, 1000, 0, ~0L, 0);
113
bool MemcachedQueryCache::isSelect(string query)
117
Skip '(' characters in queries like following:
118
(select a from t1) union (select a from t1);
120
const char* sql= query.c_str();
121
while (sql[i] == '(')
124
Test if the query is a SELECT
125
(pre-space is removed in dispatch_command).
126
First '/' looks like comment before command it is not
127
frequently appeared in real life, consequently we can
128
check all such queries, too.
130
if ((my_toupper(system_charset_info, sql[i]) != 'S' ||
131
my_toupper(system_charset_info, sql[i + 1]) != 'E' ||
132
my_toupper(system_charset_info, sql[i + 2]) != 'L') &&
140
bool MemcachedQueryCache::doIsCached(Session *session)
142
if (SessionVAR(session, enable) && isSelect(session->query))
144
/* ToDo: Check against the cache content */
145
string query= session->query+session->db;
146
char* key= md5_key(query.c_str());
147
if(queryCacheService.isCached(key))
149
session->query_cache_key.assign(key);
158
bool MemcachedQueryCache::doSendCachedResultset(Session *session)
160
/** TODO: pay attention to the case where the cache value is empty
161
* ie: there is a session in the process of caching the query
162
* and didn't finish the work
165
LEX *lex= session->lex;
166
register Select_Lex *select_lex= &lex->select_lex;
167
select_result *result=lex->result;
168
if (not result && not (result= new select_send()))
170
result->prepare(select_lex->item_list, select_lex->master_unit());
172
/* fetching the resultset from memcached */
173
vector<char> raw_resultset;
174
getClient()->get(session->query_cache_key, raw_resultset);
175
if(raw_resultset.empty())
177
message::Resultset resultset_message;
178
if (not resultset_message.ParseFromString(string(raw_resultset.begin(),raw_resultset.end())))
180
List<Item> item_list;
182
/* Send the fields */
183
message::SelectHeader header= resultset_message.select_header();
184
size_t num_fields= header.field_meta_size();
185
for (size_t y= 0; y < num_fields; y++)
187
message::FieldMeta field= header.field_meta(y);
188
string value=field.field_alias();
189
item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
191
result->send_fields(item_list);
195
message::SelectData data= resultset_message.select_data();
196
session->limit_found_rows= 0;
197
for (int j= 0; j < data.record_size(); j++)
199
message::SelectRecord record= data.record(j);
200
for (size_t y= 0; y < num_fields; y++)
202
if(record.is_null(y))
204
item_list.push_back(new Item_null());
208
string value=record.record_value(y);
209
item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
212
result->send_data(item_list);
215
/* Send End of file */
217
/* reset the cache key at the session level */
218
session->query_cache_key= "";
222
/* Check if the tables in the query do not contain
225
void MemcachedQueryCache::checkTables(Session *session, TableList* in_table)
227
for (TableList* tmp_table= in_table; tmp_table; tmp_table= tmp_table->next_global)
229
if (strcasecmp(tmp_table->db, "DATA_DICTIONARY") == 0)
231
session->lex->setCacheable(false);
237
/* init the current resultset in the session
238
* set the header message (hashkey= sql + schema)
240
bool MemcachedQueryCache::doPrepareResultset(Session *session)
242
checkTables(session, session->lex->query_tables);
243
if (SessionVAR(session, enable) && session->lex->isCacheable())
245
/* Prepare and set the key for the session */
246
string query= session->query+session->db;
247
char* key= md5_key(query.c_str());
249
/* make sure only one thread will cache the query
250
* if executed concurently
252
pthread_mutex_lock(&mutex);
254
if(not queryCacheService.isCached(key))
256
session->query_cache_key.assign(key);
259
/* create the Resultset */
260
message::Resultset *resultset= queryCacheService.setCurrentResultsetMessage(session);
262
/* setting the resultset infos */
263
resultset->set_key(session->query_cache_key);
264
resultset->set_schema(session->db);
265
resultset->set_sql(session->query);
266
pthread_mutex_unlock(&mutex);
270
pthread_mutex_unlock(&mutex);
276
/* Send the current resultset to memcached
277
* Reset the current resultset of the session
279
bool MemcachedQueryCache::doSetResultset(Session *session)
281
message::Resultset *resultset= session->getResultsetMessage();
282
if (SessionVAR(session, enable) && (not session->is_error()) && resultset != NULL && session->lex->isCacheable())
284
/* Generate the final Header */
285
queryCacheService.setResultsetHeader(*resultset, session, session->lex->query_tables);
286
/* serialize the Resultset Message */
288
resultset->SerializeToString(&output);
290
/* setting to memecahced */
291
time_t expiry= expiry_time; // ToDo: add a user defined expiry
293
std::vector<char> raw(output.size());
294
memcpy(&raw[0], output.c_str(), output.size());
295
if(not client->set(session->query_cache_key, raw, expiry, flags))
298
session->resetResultsetMessage();
302
/* Clear the Selectdata from the Resultset to be localy cached
303
* Comment if Keeping the data in the header is needed
305
resultset->clear_select_data();
307
/* add the Resultset (including the header) to the hash
308
* This is done after the memcached set
310
queryCacheService.cache[session->query_cache_key]= *resultset;
312
/* endup the current statement */
314
session->resetResultsetMessage();
320
/* Adds a record (List<Item>) to the current Resultset.SelectData
322
bool MemcachedQueryCache::doInsertRecord(Session *session, List<Item> &list)
324
if(SessionVAR(session, enable))
326
queryCacheService.addRecord(session, list);
332
char* MemcachedQueryCache::md5_key(const char *str)
334
int msg_len= strlen(str);
335
/* Length of resulting sha1 hash - gcry_md_get_algo_dlen
336
* returns digest lenght for an algo */
337
int hash_len= gcry_md_get_algo_dlen( GCRY_MD_MD5 );
338
/* output sha1 hash - this will be binary data */
339
unsigned char* hash= (unsigned char*) malloc(hash_len);
340
/* output sha1 hash - converted to hex representation
341
* 2 hex digits for every byte + 1 for trailing \0 */
342
char *out= (char *) malloc( sizeof(char) * ((hash_len*2)+1) );
344
/* calculate the SHA1 digest. This is a bit of a shortcut function
345
* most gcrypt operations require the creation of a handle, etc. */
346
gcry_md_hash_buffer( GCRY_MD_MD5, hash, str , msg_len );
347
/* Convert each byte to its 2 digit ascii
348
* hex representation and place in out */
350
for ( i = 0; i < hash_len; i++, p += 2 )
352
snprintf ( p, 3, "%02x", hash[i] );
358
/** User Defined Function print_query_cache_meta **/
359
extern plugin::Create_function<PrintQueryCacheMetaFunction> *print_query_cache_meta_func_factory;
360
plugin::Create_function<QueryCacheFlushFunction> *query_cache_flush_func= NULL;
362
/** DATA_DICTIONARY views */
363
static QueryCacheTool *query_cache_tool;
364
static QueryCacheStatusTool *query_cache_status;
365
static CachedTables *query_cached_tables;
367
static int init(module::Context &context)
369
const module::option_map &vm= context.getOptions();
371
if (vm.count("expiry"))
373
if (expiry_time > (ulong)~0L)
375
errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value of expiry\n"));
380
if (vm.count("servers"))
382
sysvar_memcached_servers= const_cast<char *>(vm["servers"].as<string>().c_str());
385
if (vm.count("enable"))
387
(SessionVAR(NULL,enable))= vm["enable"].as<bool>();
390
MemcachedQueryCache* memc= new MemcachedQueryCache("Memcached_Query_Cache", sysvar_memcached_servers);
393
Invalidator* invalidator= new Invalidator("Memcached_Query_Cache_Invalidator");
394
context.add(invalidator);
395
ReplicationServices &replication_services= ReplicationServices::singleton();
396
string replicator_name("default_replicator");
397
replication_services.attachApplier(invalidator, replicator_name);
399
/* Setup the module's UDFs */
400
print_query_cache_meta_func_factory=
401
new plugin::Create_function<PrintQueryCacheMetaFunction>("print_query_cache_meta");
402
context.add(print_query_cache_meta_func_factory);
404
query_cache_flush_func= new plugin::Create_function<QueryCacheFlushFunction>("query_cache_flush");
405
context.add(query_cache_flush_func);
407
/* Setup the module Data dict and status infos */
408
query_cache_tool= new (nothrow) QueryCacheTool();
409
context.add(query_cache_tool);
410
query_cache_status= new (nothrow) QueryCacheStatusTool();
411
context.add(query_cache_status);
412
query_cached_tables= new (nothrow) CachedTables();
413
context.add(query_cached_tables);
418
static drizzle_sys_var* vars[]= {
419
DRIZZLE_SYSVAR(enable),
420
DRIZZLE_SYSVAR(servers),
421
DRIZZLE_SYSVAR(expiry),
425
QueryCacheStatusTool::Generator::Generator(drizzled::Field **fields) :
426
plugin::TableFunction::Generator(fields)
428
status_var_ptr= vars;
431
bool QueryCacheStatusTool::Generator::populate()
435
std::ostringstream oss;
439
push((*status_var_ptr)->name);
440
if (strcmp((**status_var_ptr).name, "enable") == 0)
441
return_value= SessionVAR(&(getSession()), enable) ? "ON" : "OFF";
442
if (strcmp((**status_var_ptr).name, "servers") == 0)
443
return_value= MemcachedQueryCache::getServers();
444
if (strcmp((**status_var_ptr).name, "expiry") == 0)
447
return_value= oss.str();
450
if (return_value.length())
462
static void init_options(drizzled::module::option_context &context)
465
po::value<string>()->default_value("127.0.0.1:11211"),
466
N_("List of memcached servers."));
468
po::value<ulong>(&expiry_time)->default_value(1000),
469
N_("Expiry time of memcached entries"));
471
po::value<bool>()->default_value(false)->zero_tokens(),
472
N_("Enable Memcached Query Cache"));
475
DRIZZLE_DECLARE_PLUGIN
480
"Djellel Eddine Difallah",
481
"Caches Select resultsets in Memcached",
483
init, /* Plugin Init */
484
vars, /* system variables */
485
init_options /* config options */
487
DRIZZLE_DECLARE_PLUGIN_END;