~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/logging_stats/logging_stats.cc

  • Committer: Jay Pipes
  • Date: 2010-04-08 16:27:25 UTC
  • mfrom: (1405.6.10 replication-pairs)
  • mto: This revision was merged to the branch mainline in revision 1457.
  • Revision ID: jpipes@serialcoder-20100408162725-sugbgn38oxjqclq2
Merge trunk and replication-pairs with conflict resolution

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
/**
31
31
 * @details
32
32
 *
33
 
 * This tracks current user commands. The commands are logged using
34
 
 * the post() and postEnd() logging APIs. It uses a scoreboard
35
 
 * approach that initializes the scoreboard size to the value set
36
 
 * by logging_stats_scoreboard_size. Each ScoreBoardSlot wraps 
37
 
 * a UserCommand object containing the statistics for a particular
38
 
 * session. As other statistics are added they can then be added
39
 
 * to the ScoreBoardSlot object. 
 
33
 * This plugin tracks the current user commands for a session, and copies
 
34
 * them into a cumulative vector of all commands run by a user over time. 
 
35
 * The commands are logged using the post() and postEnd() logging APIs.
 
36
 * User commands are stored in a Scoreboard where each active session
 
37
 * owns a ScoreboardSlot.  
 
38
 *
 
39
 * Scoreboard
 
40
 *
 
41
 * The scoreboard is a pre-allocated vector of vectors of ScoreboardSlots. It
 
42
 * can be thought of as a vector of buckets where each bucket contains
 
43
 * pre-allocated ScoreboardSlots. To determine which bucket gets used for
 
44
 * recording statistics the modulus operator is used on the session_id. This
 
45
 * will result in a bucket to search for a unused ScoreboardSlot.
40
46
 *
41
47
 * Locking  
42
 
 *
43
 
 * A RW lock is taken to locate a open slot for a session or to locate the
44
 
 * slot that the current session has claimed. 
45
 
 * 
46
 
 * A read lock is taken when the table is queried in the data_dictionary.
 
48
 * 
 
49
 * Each vector in the Scoreboard has its own lock. This allows session 2 
 
50
 * to not have to wait for session 1 to locate a slot to use, as they
 
51
 * will be in different buckets.  A lock is taken to locate a open slot
 
52
 * in the scoreboard for a session or to locate the slot that the current
 
53
 * session has claimed. 
 
54
 *
 
55
 * A read lock is taken on the scoreboard vector when the table is queried 
 
56
 * in the data_dictionary.
 
57
 *
 
58
 * A lock is taken when a new user is added to the cumulative vector
 
59
 * repeat connections with a already used user will not use a lock. 
 
60
 * 
 
61
 * System Variables
 
62
 * 
 
63
 * logging_stats_scoreboard_size - the size of the scoreboard this corresponds
 
64
 *   to the maximum number of concurrent connections that can be tracked
 
65
 *
 
66
 * logging_stats_max_user_count - this is used for cumulative statistics it 
 
67
 *   represents the maximum users that can be tracked 
 
68
 * 
 
69
 * logging_stats_bucket_count - the number of buckets to have in the scoreboard
 
70
 *   this splits up locking across several buckets so the entire scoreboard is 
 
71
 *   not locked at a single point in time.
 
72
 * 
 
73
 * logging_stats_enabled - enable/disable plugin 
47
74
 * 
48
75
 * TODO 
49
76
 *
50
 
 * To improve as more statistics are added, a pointer to the scoreboard
51
 
 * slot should be added to the Session object. This will avoid the session
52
 
 * having to do multiple lookups in the scoreboard.  
 
77
 * A pointer to the scoreboard slot could be added to the Session object.
 
78
 * This will avoid the session having to do multiple lookups in the scoreboard,
 
79
 * this will also avoid having to take a lock to locate the scoreboard slot 
 
80
 * being used by a particular session. 
 
81
 *
 
82
 * Allow expansion of Scoreboard and cumulative vector 
53
83
 *  
54
 
 * Save the statistics off into a vector so you can query by user/ip and get
55
 
 * commands run based on those keys over time. 
56
 
 * 
57
84
 */
58
85
 
59
86
#include "config.h"
69
96
 
70
97
static uint32_t sysvar_logging_stats_scoreboard_size= 2000;
71
98
 
72
 
pthread_rwlock_t LOCK_scoreboard;
 
99
static uint32_t sysvar_logging_stats_max_user_count= 10000;
 
100
 
 
101
static uint32_t sysvar_logging_stats_bucket_count= 10;
 
102
 
 
103
pthread_rwlock_t LOCK_cumulative_scoreboard_index;
73
104
 
74
105
LoggingStats::LoggingStats(string name_arg) : Logging(name_arg)
75
106
{
76
 
  (void) pthread_rwlock_init(&LOCK_scoreboard, NULL);
77
 
  scoreboard_size= sysvar_logging_stats_scoreboard_size;
78
 
  score_board_slots= new ScoreBoardSlot[scoreboard_size];
79
 
  for (uint32_t j=0; j < scoreboard_size; j++)
80
 
  { 
81
 
    UserCommands *user_commands= new UserCommands();
82
 
    ScoreBoardSlot *score_board_slot= &score_board_slots[j];
83
 
    score_board_slot->setUserCommands(user_commands); 
84
 
  } 
 
107
  (void) pthread_rwlock_init(&LOCK_cumulative_scoreboard_index, NULL);
 
108
  cumulative_stats_by_user_index= 0;
 
109
 
 
110
  current_scoreboard= new Scoreboard(sysvar_logging_stats_scoreboard_size, 
 
111
                                     sysvar_logging_stats_bucket_count);
 
112
 
 
113
  cumulative_stats_by_user_max= sysvar_logging_stats_max_user_count;
 
114
 
 
115
  cumulative_stats_by_user_vector= new vector<ScoreboardSlot *>(cumulative_stats_by_user_max);
 
116
  preAllocateScoreboardSlotVector(sysvar_logging_stats_max_user_count, 
 
117
                                  cumulative_stats_by_user_vector);
85
118
}
86
119
 
87
120
LoggingStats::~LoggingStats()
88
121
{
89
 
  (void) pthread_rwlock_destroy(&LOCK_scoreboard);
90
 
  delete[] score_board_slots;
 
122
  (void) pthread_rwlock_destroy(&LOCK_cumulative_scoreboard_index); 
 
123
  deleteScoreboardSlotVector(cumulative_stats_by_user_vector);
 
124
  delete current_scoreboard;
 
125
}
 
126
 
 
127
void LoggingStats::preAllocateScoreboardSlotVector(uint32_t size, 
 
128
                                                   vector<ScoreboardSlot *> *scoreboard_slot_vector)
 
129
{
 
130
  vector<ScoreboardSlot *>::iterator it= scoreboard_slot_vector->begin();
 
131
  for (uint32_t j=0; j < size; ++j)
 
132
  {
 
133
    ScoreboardSlot *scoreboard_slot= new ScoreboardSlot();
 
134
    it= scoreboard_slot_vector->insert(it, scoreboard_slot);
 
135
  }
 
136
  scoreboard_slot_vector->resize(size);
 
137
}
 
138
 
 
139
void LoggingStats::deleteScoreboardSlotVector(vector<ScoreboardSlot *> *scoreboard_slot_vector)
 
140
{
 
141
  vector<ScoreboardSlot *>::iterator it= scoreboard_slot_vector->begin();
 
142
  for (; it < scoreboard_slot_vector->end(); ++it)
 
143
  {
 
144
    delete *it;
 
145
  }
 
146
  scoreboard_slot_vector->clear();
 
147
  delete scoreboard_slot_vector;
91
148
}
92
149
 
93
150
bool LoggingStats::isBeingLogged(Session *session)
111
168
  }
112
169
113
170
 
114
 
void LoggingStats::updateScoreBoard(ScoreBoardSlot *score_board_slot,
115
 
                                    Session *session)
 
171
void LoggingStats::updateCurrentScoreboard(ScoreboardSlot *scoreboard_slot,
 
172
                                           Session *session)
116
173
{
117
174
  enum_sql_command sql_command= session->lex->sql_command;
118
175
 
119
 
  UserCommands *user_commands= score_board_slot->getUserCommands();
 
176
  UserCommands *user_commands= scoreboard_slot->getUserCommands();
120
177
 
121
178
  switch(sql_command)
122
179
  {
165
222
    return false;
166
223
  }
167
224
 
168
 
  /* Find a slot that is unused */
169
 
 
170
 
  pthread_rwlock_wrlock(&LOCK_scoreboard);
171
 
  ScoreBoardSlot *score_board_slot;
172
 
  int our_slot= UNINITIALIZED; 
173
 
  int open_slot= UNINITIALIZED;
174
 
 
175
 
  for (uint32_t j=0; j < scoreboard_size; j++)
176
 
  {
177
 
    score_board_slot= &score_board_slots[j];
178
 
 
179
 
    if (score_board_slot->isInUse() == true)
 
225
  ScoreboardSlot *scoreboard_slot= current_scoreboard->findScoreboardSlotToLog(session);
 
226
 
 
227
  /* Its possible that the scoreboard is full with active sessions in which case 
 
228
     this could be null */
 
229
  if (scoreboard_slot)
 
230
  {
 
231
    updateCurrentScoreboard(scoreboard_slot, session);
 
232
  }
 
233
  return false;
 
234
}
 
235
 
 
236
bool LoggingStats::postEnd(Session *session)
 
237
{
 
238
  if (! isEnabled() || (session->getSessionId() == 0))
 
239
  {
 
240
    return false;
 
241
  }
 
242
 
 
243
  ScoreboardSlot *scoreboard_slot= current_scoreboard->findAndResetScoreboardSlot(session);
 
244
 
 
245
  if (scoreboard_slot)
 
246
  {
 
247
    vector<ScoreboardSlot *>::iterator cumulative_it= cumulative_stats_by_user_vector->begin();
 
248
    bool found= false;
 
249
 
 
250
    /* Search if this is a pre-existing user */
 
251
    for (uint32_t h= 0; h < cumulative_stats_by_user_index; ++h)
180
252
    {
181
 
      /* Check if this session is the one using this slot */
182
 
      if (score_board_slot->getSessionId() == session->getSessionId())
183
 
      {
184
 
        our_slot= j;
185
 
        break; 
186
 
      } 
187
 
      else 
188
 
      {
189
 
        continue; 
 
253
      ScoreboardSlot *cumulative_scoreboard_slot= *cumulative_it;
 
254
      string user= cumulative_scoreboard_slot->getUser();
 
255
      if (user.compare(scoreboard_slot->getUser()) == 0)
 
256
      {
 
257
        found= true;
 
258
        cumulative_scoreboard_slot->merge(scoreboard_slot);
 
259
        break;
190
260
      }
 
261
      ++cumulative_it;
191
262
    }
192
 
    else 
 
263
 
 
264
    /* this will add a new user */
 
265
    if (! found)
193
266
    {
194
 
      /* save off the open slot */ 
195
 
      if (open_slot == -1)
196
 
      {
197
 
        open_slot= j;
198
 
      } 
199
 
      continue;
 
267
      updateCumulativeStatsByUserVector(scoreboard_slot);
200
268
    }
201
 
  }
202
 
 
203
 
  if (our_slot != UNINITIALIZED)
204
 
  {
205
 
    pthread_rwlock_unlock(&LOCK_scoreboard); 
206
 
  }
207
 
  else if (open_slot != UNINITIALIZED)
208
 
  {
209
 
    score_board_slot= &score_board_slots[open_slot];
210
 
    score_board_slot->setInUse(true);
211
 
    score_board_slot->setSessionId(session->getSessionId());
212
 
    score_board_slot->setUser(session->getSecurityContext().getUser());
213
 
    score_board_slot->setIp(session->getSecurityContext().getIp());
214
 
    pthread_rwlock_unlock(&LOCK_scoreboard);
215
 
  }
216
 
  else 
217
 
  {
218
 
    pthread_rwlock_unlock(&LOCK_scoreboard);
219
 
    /* there was no available slot for this session */
220
 
    return false;
221
 
  }
222
 
 
223
 
  updateScoreBoard(score_board_slot, session);
 
269
    delete scoreboard_slot;
 
270
  }
224
271
 
225
272
  return false;
226
273
}
227
274
 
228
 
bool LoggingStats::postEnd(Session *session)
 
275
void LoggingStats::updateCumulativeStatsByUserVector(ScoreboardSlot *current_scoreboard_slot)
229
276
{
230
 
  if (! isEnabled() || (session->getSessionId() == 0)) 
231
 
  {
232
 
    return false;
233
 
  }
234
 
 
235
 
  ScoreBoardSlot *score_board_slot;
236
 
 
237
 
  pthread_rwlock_wrlock(&LOCK_scoreboard);
238
 
 
239
 
  for (uint32_t j=0; j < scoreboard_size; j++)
240
 
  {
241
 
    score_board_slot= &score_board_slots[j];
242
 
 
243
 
    if (score_board_slot->getSessionId() == session->getSessionId())
244
 
    {
245
 
      score_board_slot->reset();
246
 
      break;
 
277
  /* Check twice if the user table is full, do not grab a lock each time */
 
278
  if (cumulative_stats_by_user_max > cumulative_stats_by_user_index)
 
279
  {
 
280
    pthread_rwlock_wrlock(&LOCK_cumulative_scoreboard_index);
 
281
 
 
282
    if (cumulative_stats_by_user_max > cumulative_stats_by_user_index)
 
283
    { 
 
284
      ScoreboardSlot *cumulative_scoreboard_slot=
 
285
        cumulative_stats_by_user_vector->at(cumulative_stats_by_user_index);
 
286
      string cumulative_scoreboard_user(current_scoreboard_slot->getUser());
 
287
      cumulative_scoreboard_slot->setUser(cumulative_scoreboard_user);
 
288
      cumulative_scoreboard_slot->merge(current_scoreboard_slot);
 
289
      ++cumulative_stats_by_user_index;
247
290
    }
 
291
 
 
292
    pthread_rwlock_unlock(&LOCK_cumulative_scoreboard_index);
248
293
  }
249
 
 
250
 
  pthread_rwlock_unlock(&LOCK_scoreboard);
251
 
 
252
 
  return false;
253
294
}
254
295
 
 
296
 
255
297
/* Plugin initialization and system variables */
256
298
 
257
299
static LoggingStats *logging_stats= NULL;
258
300
 
259
 
static CommandsTool *commands_tool= NULL;
 
301
static CurrentCommandsTool *current_commands_tool= NULL;
 
302
 
 
303
static CumulativeCommandsTool *cumulative_commands_tool= NULL;
260
304
 
261
305
static void enable(Session *,
262
306
                   drizzle_sys_var *,
280
324
 
281
325
static bool initTable()
282
326
{
283
 
  commands_tool= new(nothrow)CommandsTool(logging_stats);
284
 
 
285
 
  if (! commands_tool)
 
327
  current_commands_tool= new(nothrow)CurrentCommandsTool(logging_stats);
 
328
 
 
329
  if (! current_commands_tool)
 
330
  {
 
331
    return true;
 
332
  }
 
333
 
 
334
  cumulative_commands_tool= new(nothrow)CumulativeCommandsTool(logging_stats);
 
335
 
 
336
  if (! cumulative_commands_tool)
286
337
  {
287
338
    return true;
288
339
  }
290
341
  return false;
291
342
}
292
343
 
293
 
static int init(drizzled::plugin::Context &context)
 
344
static int init(Context &context)
294
345
{
295
346
  logging_stats= new LoggingStats("logging_stats");
296
347
 
300
351
  }
301
352
 
302
353
  context.add(logging_stats);
303
 
  context.add(commands_tool);
 
354
  context.add(current_commands_tool);
 
355
  context.add(cumulative_commands_tool);
304
356
 
305
357
  if (sysvar_logging_stats_enabled)
306
358
  {
310
362
  return 0;
311
363
}
312
364
 
 
365
static DRIZZLE_SYSVAR_UINT(max_user_count,
 
366
                           sysvar_logging_stats_max_user_count,
 
367
                           PLUGIN_VAR_RQCMDARG,
 
368
                           N_("Max number of users that will be logged"),
 
369
                           NULL, /* check func */
 
370
                           NULL, /* update func */
 
371
                           10000, /* default */
 
372
                           500, /* minimum */
 
373
                           50000,
 
374
                           0);
 
375
 
 
376
static DRIZZLE_SYSVAR_UINT(bucket_count,
 
377
                           sysvar_logging_stats_bucket_count,
 
378
                           PLUGIN_VAR_RQCMDARG,
 
379
                           N_("Max number of vector buckets to construct for logging"),
 
380
                           NULL, /* check func */
 
381
                           NULL, /* update func */
 
382
                           10, /* default */
 
383
                           5, /* minimum */
 
384
                           100,
 
385
                           0);
 
386
 
313
387
static DRIZZLE_SYSVAR_UINT(scoreboard_size,
314
388
                           sysvar_logging_stats_scoreboard_size,
315
389
                           PLUGIN_VAR_RQCMDARG,
317
391
                           NULL, /* check func */
318
392
                           NULL, /* update func */
319
393
                           2000, /* default */
320
 
                           1000, /* minimum */
 
394
                           10, /* minimum */
321
395
                           50000, 
322
396
                           0);
323
397
 
330
404
                           false /* default */);
331
405
 
332
406
static drizzle_sys_var* system_var[]= {
 
407
  DRIZZLE_SYSVAR(max_user_count),
 
408
  DRIZZLE_SYSVAR(bucket_count),
333
409
  DRIZZLE_SYSVAR(scoreboard_size),
334
410
  DRIZZLE_SYSVAR(enable),
335
411
  NULL