~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/memcached_query_cache/memcached_qc.cc

  • Committer: Monty Taylor
  • Date: 2011-04-07 16:51:38 UTC
  • mfrom: (2263.6.2 remove_memcached_qc)
  • Revision ID: mordred@inaugust.com-20110407165138-4mbpizlwlwt5hbl1
Merge David: Remove memcached query cache

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* 
2
 
 * Copyright (C) 2010 Djellel Eddine Difallah
3
 
 * All rights reserved.
4
 
 *
5
 
 * Redistribution and use in source and binary forms, with or without
6
 
 * modification, are permitted provided that the following conditions are met:
7
 
 *
8
 
 *   * Redistributions of source code must retain the above copyright notice,
9
 
 *     this list of conditions and the following disclaimer.
10
 
 *   * Redistributions in binary form must reproduce the above copyright notice,
11
 
 *     this list of conditions and the following disclaimer in the documentation
12
 
 *     and/or other materials provided with the distribution.
13
 
 *   * Neither the name of Djellel Eddine Difallah nor the names of its contributors
14
 
 *     may be used to endorse or promote products derived from this software
15
 
 *     without specific prior written permission.
16
 
 *
17
 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18
 
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21
 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22
 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23
 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24
 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25
 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26
 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
27
 
 * THE POSSIBILITY OF SUCH DAMAGE.
28
 
 */
29
 
 
30
 
 
31
 
 
32
 
#include <config.h>
33
 
 
34
 
#include <drizzled/plugin.h>
35
 
#include <drizzled/session.h>
36
 
#include <drizzled/select_send.h>
37
 
#include <drizzled/item/null.h>
38
 
 
39
 
#include <gcrypt.h>
40
 
#include <string>
41
 
#include <iostream>
42
 
#include <vector>
43
 
 
44
 
#include "memcached_qc.h"
45
 
#include "query_cache_udf_tools.h"
46
 
#include "data_dictionary_schema.h"
47
 
#include "invalidator.h"
48
 
#include <boost/program_options.hpp>
49
 
#include <drizzled/module/option_map.h>
50
 
 
51
 
using namespace drizzled;
52
 
using namespace std;
53
 
namespace po= boost::program_options;
54
 
 
55
 
static uint64_constraint expiry_time;
56
 
 
57
 
memcache::Memcache* MemcachedQueryCache::client;
58
 
std::string MemcachedQueryCache::memcached_servers;
59
 
bool sysvar_memcached_qc_enable;
60
 
 
61
 
bool MemcachedQueryCache::isSelect(string query)
62
 
{
63
 
  uint i= 0;
64
 
  /*
65
 
   Skip '(' characters in queries like following:
66
 
   (select a from t1) union (select a from t1);
67
 
  */
68
 
  const char* sql= query.c_str();
69
 
  while (sql[i] == '(')
70
 
    i++;
71
 
  /*
72
 
   Test if the query is a SELECT
73
 
   (pre-space is removed in dispatch_command).
74
 
    First '/' looks like comment before command it is not
75
 
    frequently appeared in real life, consequently we can
76
 
    check all such queries, too.
77
 
  */
78
 
  if ((my_toupper(system_charset_info, sql[i])     != 'S' ||
79
 
       my_toupper(system_charset_info, sql[i + 1]) != 'E' ||
80
 
       my_toupper(system_charset_info, sql[i + 2]) != 'L') &&
81
 
      sql[i] != '/')
82
 
  {
83
 
    return false;
84
 
  }
85
 
  return true;
86
 
}
87
 
 
88
 
bool MemcachedQueryCache::doIsCached(Session *session)
89
 
{
90
 
  if (sysvar_memcached_qc_enable && isSelect(session->query))
91
 
  {
92
 
    /* ToDo: Check against the cache content */
93
 
    string query= session->query + *session->schema();
94
 
    char* key= md5_key(query.c_str());
95
 
    if(queryCacheService.isCached(key))
96
 
    {
97
 
     session->query_cache_key.assign(key);
98
 
     free(key);
99
 
     return true;
100
 
    }
101
 
    free(key);
102
 
  }
103
 
  return false;
104
 
}
105
 
 
106
 
bool MemcachedQueryCache::doSendCachedResultset(Session *session)
107
 
{
108
 
  /** TODO: pay attention to the case where the cache value is empty
109
 
  * ie: there is a session in the process of caching the query
110
 
  * and didn't finish the work
111
 
  */ 
112
 
  
113
 
  LEX *lex= session->lex;
114
 
  register Select_Lex *select_lex= &lex->select_lex;
115
 
  select_result *result=lex->result;
116
 
  if (not result && not (result= new select_send()))
117
 
    return true;
118
 
  result->prepare(select_lex->item_list, select_lex->master_unit());
119
 
 
120
 
  /* fetching the resultset from memcached */  
121
 
  vector<char> raw_resultset; 
122
 
  getClient()->get(session->query_cache_key, raw_resultset);
123
 
  if(raw_resultset.empty())
124
 
    return false;
125
 
  message::Resultset resultset_message;
126
 
  if (not resultset_message.ParseFromString(string(raw_resultset.begin(),raw_resultset.end())))
127
 
    return false;
128
 
  List<Item> item_list;
129
 
 
130
 
  /* Send the fields */
131
 
  message::SelectHeader header= resultset_message.select_header();
132
 
  size_t num_fields= header.field_meta_size();
133
 
  for (size_t y= 0; y < num_fields; y++)
134
 
  {
135
 
    message::FieldMeta field= header.field_meta(y);
136
 
    string value=field.field_alias();
137
 
    item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
138
 
  }
139
 
  result->send_fields(item_list);
140
 
  item_list.empty();
141
 
 
142
 
  /* Send the Data */
143
 
  message::SelectData data= resultset_message.select_data();
144
 
  session->limit_found_rows= 0; 
145
 
  for (int j= 0; j < data.record_size(); j++)
146
 
  {
147
 
    message::SelectRecord record= data.record(j);
148
 
    for (size_t y= 0; y < num_fields; y++)
149
 
    {
150
 
      if(record.is_null(y))
151
 
      {
152
 
        item_list.push_back(new Item_null());
153
 
      }
154
 
      else
155
 
      {
156
 
        string value=record.record_value(y);
157
 
        item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
158
 
      }
159
 
    }
160
 
    result->send_data(item_list);
161
 
    item_list.empty();
162
 
  }
163
 
  /* Send End of file */
164
 
  result->send_eof();
165
 
  /* reset the cache key at the session level */
166
 
  session->query_cache_key= "";
167
 
  return false;
168
 
}
169
 
 
170
 
/* Check if the tables in the query do not contain
171
 
 * Data_dictionary
172
 
 */
173
 
void MemcachedQueryCache::checkTables(Session *session, TableList* in_table)
174
 
{
175
 
  for (TableList* tmp_table= in_table; tmp_table; tmp_table= tmp_table->next_global)
176
 
  {
177
 
    if (strcasecmp(tmp_table->db, "DATA_DICTIONARY") == 0)
178
 
    {
179
 
      session->lex().setCacheable(false);
180
 
      break;
181
 
    }
182
 
  } 
183
 
}
184
 
 
185
 
/* init the current resultset in the session
186
 
 * set the header message (hashkey= sql + schema)
187
 
 */
188
 
bool MemcachedQueryCache::doPrepareResultset(Session *session)
189
 
{               
190
 
  checkTables(session, session->lex().query_tables);
191
 
  if (sysvar_memcached_qc_enable && session->lex().isCacheable())
192
 
  {
193
 
    /* Prepare and set the key for the session */
194
 
    string query= session->query + *session->schema();
195
 
    char* key= md5_key(query.c_str());
196
 
 
197
 
    /* make sure only one thread will cache the query 
198
 
     * if executed concurently
199
 
     */
200
 
    pthread_mutex_lock(&mutex);
201
 
 
202
 
    if(not queryCacheService.isCached(key))
203
 
    {
204
 
      session->query_cache_key.assign(key);
205
 
      free(key);
206
 
    
207
 
      /* create the Resultset */
208
 
      message::Resultset *resultset= queryCacheService.setCurrentResultsetMessage(session);
209
 
  
210
 
      /* setting the resultset infos */
211
 
      resultset->set_key(session->query_cache_key);
212
 
      resultset->set_schema(*session->schema());
213
 
      resultset->set_sql(session->query);
214
 
      pthread_mutex_unlock(&mutex);
215
 
      
216
 
      return true;
217
 
    }
218
 
    pthread_mutex_unlock(&mutex);
219
 
    free(key);
220
 
  }
221
 
  return false;
222
 
}
223
 
 
224
 
/* Send the current resultset to memcached
225
 
 * Reset the current resultset of the session
226
 
 */
227
 
bool MemcachedQueryCache::doSetResultset(Session *session)
228
 
{               
229
 
  message::Resultset *resultset= session->getResultsetMessage();
230
 
  if (sysvar_memcached_qc_enable && (not session->is_error()) && resultset != NULL && session->lex().isCacheable())
231
 
  {
232
 
    /* Generate the final Header */
233
 
    queryCacheService.setResultsetHeader(*resultset, session, session->lex().query_tables);
234
 
    /* serialize the Resultset Message */
235
 
    std::string output;
236
 
    resultset->SerializeToString(&output);
237
 
 
238
 
    /* setting to memecahced */
239
 
    time_t expiry= expiry_time;  // ToDo: add a user defined expiry
240
 
    uint32_t flags= 0;
241
 
    std::vector<char> raw(output.size());
242
 
    memcpy(&raw[0], output.c_str(), output.size());
243
 
    if(not client->set(session->query_cache_key, raw, expiry, flags))
244
 
    {
245
 
      delete resultset;
246
 
      session->resetResultsetMessage();
247
 
      return false;
248
 
    }
249
 
    
250
 
    /* Clear the Selectdata from the Resultset to be localy cached
251
 
     * Comment if Keeping the data in the header is needed
252
 
     */
253
 
    resultset->clear_select_data();
254
 
 
255
 
    /* add the Resultset (including the header) to the hash 
256
 
     * This is done after the memcached set
257
 
     */
258
 
    queryCacheService.cache[session->query_cache_key]= *resultset;
259
 
 
260
 
    /* endup the current statement */
261
 
    delete resultset;
262
 
    session->resetResultsetMessage();
263
 
    return true;
264
 
  }
265
 
  return false;
266
 
}
267
 
 
268
 
/* Adds a record (List<Item>) to the current Resultset.SelectData
269
 
 */
270
 
bool MemcachedQueryCache::doInsertRecord(Session *session, List<Item> &list)
271
 
{               
272
 
  if(sysvar_memcached_qc_enable)
273
 
  {
274
 
    queryCacheService.addRecord(session, list);
275
 
    return true;
276
 
  }
277
 
  return false;
278
 
}
279
 
 
280
 
char* MemcachedQueryCache::md5_key(const char *str)
281
 
{
282
 
  int msg_len= strlen(str);
283
 
  /* Length of resulting sha1 hash - gcry_md_get_algo_dlen
284
 
  * returns digest lenght for an algo */
285
 
  int hash_len= gcry_md_get_algo_dlen( GCRY_MD_MD5 );
286
 
  /* output sha1 hash - this will be binary data */
287
 
  unsigned char* hash= (unsigned char*) malloc(hash_len);
288
 
  /* output sha1 hash - converted to hex representation
289
 
  * 2 hex digits for every byte + 1 for trailing \0 */
290
 
  char *out= (char *) malloc( sizeof(char) * ((hash_len*2)+1) );
291
 
  char *p= out;
292
 
  /* calculate the SHA1 digest. This is a bit of a shortcut function
293
 
  * most gcrypt operations require the creation of a handle, etc. */
294
 
  gcry_md_hash_buffer( GCRY_MD_MD5, hash, str , msg_len );
295
 
  /* Convert each byte to its 2 digit ascii
296
 
  * hex representation and place in out */
297
 
  int i;
298
 
  for ( i = 0; i < hash_len; i++, p += 2 )
299
 
  {
300
 
    snprintf ( p, 3, "%02x", hash[i] );
301
 
  }
302
 
  free(hash);
303
 
  return out;
304
 
}
305
 
 
306
 
/** User Defined Function print_query_cache_meta **/
307
 
extern plugin::Create_function<PrintQueryCacheMetaFunction> *print_query_cache_meta_func_factory;
308
 
plugin::Create_function<QueryCacheFlushFunction> *query_cache_flush_func= NULL;
309
 
 
310
 
/** DATA_DICTIONARY views */
311
 
static QueryCacheTool *query_cache_tool;
312
 
static QueryCacheStatusTool *query_cache_status;
313
 
static CachedTables *query_cached_tables;
314
 
 
315
 
static int init(module::Context &context)
316
 
{
317
 
  const module::option_map &vm= context.getOptions();
318
 
 
319
 
  MemcachedQueryCache* memc= new MemcachedQueryCache("Memcached_Query_Cache", vm["servers"].as<string>());
320
 
  context.add(memc);
321
 
 
322
 
  Invalidator* invalidator= new Invalidator("Memcached_Query_Cache_Invalidator");
323
 
  context.add(invalidator);
324
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
325
 
  string replicator_name("default_replicator");
326
 
  replication_services.attachApplier(invalidator, replicator_name);
327
 
  
328
 
  /* Setup the module's UDFs */
329
 
  print_query_cache_meta_func_factory=
330
 
    new plugin::Create_function<PrintQueryCacheMetaFunction>("print_query_cache_meta");
331
 
  context.add(print_query_cache_meta_func_factory);
332
 
  
333
 
  query_cache_flush_func= new plugin::Create_function<QueryCacheFlushFunction>("query_cache_flush");
334
 
  context.add(query_cache_flush_func);
335
 
 
336
 
  /* Setup the module Data dict and status infos */
337
 
  query_cache_tool= new QueryCacheTool();
338
 
  context.add(query_cache_tool);
339
 
  query_cache_status= new QueryCacheStatusTool();
340
 
  context.add(query_cache_status);
341
 
  query_cached_tables= new CachedTables();
342
 
  context.add(query_cached_tables);
343
 
  
344
 
  context.registerVariable(new sys_var_constrained_value<uint64_t>("expiry", expiry_time));
345
 
  context.registerVariable(new sys_var_const_string_val("servers", vm["servers"].as<string>()));
346
 
  context.registerVariable(new sys_var_bool_ptr("enable", &sysvar_memcached_qc_enable));
347
 
  return 0;
348
 
}
349
 
 
350
 
QueryCacheStatusTool::Generator::Generator(drizzled::Field **fields) :
351
 
  plugin::TableFunction::Generator(fields)
352
 
353
 
  status_var_ptr= vars;
354
 
}
355
 
 
356
 
bool QueryCacheStatusTool::Generator::populate()
357
 
{
358
 
  if (*status_var_ptr)
359
 
  {
360
 
    string return_value;
361
 
 
362
 
    /* VARIABLE_NAME */
363
 
    push((*status_var_ptr)->name);
364
 
    if (strcmp((**status_var_ptr).name, "enable") == 0)
365
 
      return_value= sysvar_memcached_qc_enable ? "ON" : "OFF";
366
 
    if (strcmp((**status_var_ptr).name, "servers") == 0) 
367
 
      return_value= MemcachedQueryCache::getServers();
368
 
    if (strcmp((**status_var_ptr).name, "expiry") == 0)
369
 
      return_value= boost::lexical_cast<std::string>(expiry_time);
370
 
 
371
 
    /* VARIABLE_VALUE */
372
 
    if (return_value.length())
373
 
      push(return_value);
374
 
    else 
375
 
      push(" ");
376
 
 
377
 
    status_var_ptr++;
378
 
 
379
 
    return true;
380
 
  }
381
 
  return false;
382
 
}
383
 
 
384
 
static void init_options(drizzled::module::option_context &context)
385
 
{
386
 
  context("servers",
387
 
          po::value<string>()->default_value("127.0.0.1:11211"),
388
 
          _("List of memcached servers."));
389
 
  context("expiry",
390
 
          po::value<uint64_constraint>(&expiry_time)->default_value(1000),
391
 
          _("Expiry time of memcached entries"));
392
 
  context("enable",
393
 
          po::value<bool>(&sysvar_memcached_qc_enable)->default_value(false)->zero_tokens(),
394
 
          _("Enable Memcached Query Cache"));
395
 
}
396
 
 
397
 
DRIZZLE_DECLARE_PLUGIN
398
 
{
399
 
  DRIZZLE_VERSION_ID,
400
 
  "Query_Cache",
401
 
  "0.3",
402
 
  "Djellel Eddine Difallah",
403
 
  "Caches Select resultsets in Memcached",
404
 
  PLUGIN_LICENSE_BSD,
405
 
  init,   /* Plugin Init      */
406
 
  NULL, /* depends */
407
 
  init_options    /* config options   */
408
 
}
409
 
DRIZZLE_DECLARE_PLUGIN_END;