~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/log_event.cc

  • Committer: Brian Aker
  • Date: 2009-01-23 02:15:04 UTC
  • mfrom: (798.2.32 drizzle)
  • Revision ID: brian@tangent.org-20090123021504-2j99e6hxab1ew601
Merge for replication removal.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
#include <drizzled/server_includes.h>
21
 
#include <drizzled/log_event.h>
22
 
#include <drizzled/replication/rli.h>
23
 
#include <drizzled/replication/mi.h>
24
 
#include <libdrizzle/libdrizzle.h>
25
 
#include <mysys/hash.h>
26
 
#include <drizzled/replication/utility.h>
27
 
#include <drizzled/replication/record.h>
28
 
#include <mysys/my_dir.h>
29
 
#include <drizzled/error.h>
30
 
#include <libdrizzle/pack.h>
31
 
#include <drizzled/sql_parse.h>
32
 
#include <drizzled/sql_base.h>
33
 
#include <drizzled/sql_load.h>
34
 
#include <drizzled/item/return_int.h>
35
 
#include <drizzled/item/empty_string.h>
36
 
 
37
 
#include <algorithm>
38
 
#include <string>
39
 
 
40
 
#include <mysys/base64.h>
41
 
#include <mysys/my_bitmap.h>
42
 
 
43
 
#include <drizzled/gettext.h>
44
 
#include <libdrizzle/libdrizzle.h>
45
 
#include <drizzled/error.h>
46
 
#include <drizzled/query_id.h>
47
 
#include <drizzled/tztime.h>
48
 
#include <drizzled/slave.h>
49
 
#include <drizzled/lock.h>
50
 
 
51
 
using namespace std;
52
 
 
53
 
static const char *HA_ERR(int i)
54
 
{
55
 
  switch (i) {
56
 
  case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
57
 
  case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
58
 
  case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
59
 
  case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
60
 
  case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
61
 
  case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
62
 
  case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
63
 
  case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
64
 
  case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
65
 
  case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
66
 
  case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
67
 
  case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
68
 
  case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
69
 
  case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
70
 
  case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
71
 
  case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
72
 
  case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
73
 
  case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
74
 
  case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
75
 
  case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
76
 
  case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
77
 
  case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
78
 
  case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
79
 
  case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
80
 
  case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
81
 
  case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
82
 
  case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
83
 
  case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
84
 
  case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
85
 
  case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
86
 
  case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
87
 
  case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
88
 
  case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
89
 
  case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
90
 
  case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
91
 
  case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
92
 
  case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
93
 
  case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
94
 
  case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
95
 
  case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
96
 
  case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
97
 
  case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
98
 
  case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
99
 
  case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
100
 
  case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
101
 
  case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
102
 
  case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
103
 
  case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
104
 
  case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
105
 
  case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
106
 
  }
107
 
  return 0;
108
 
}
109
 
 
110
 
/**
111
 
   Error reporting facility for Rows_log_event::do_apply_event
112
 
 
113
 
   @param level     error, warning or info
114
 
   @param ha_error  HA_ERR_ code
115
 
   @param rli       pointer to the active Relay_log_info instance
116
 
   @param session       pointer to the slave thread's session
117
 
   @param table     pointer to the event's table object
118
 
   @param type      the type of the event
119
 
   @param log_name  the master binlog file name
120
 
   @param pos       the master binlog file pos (the next after the event)
121
 
 
122
 
*/
123
 
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
124
 
                                           Relay_log_info const *rli, Session *session,
125
 
                                           Table *table, const char * type,
126
 
                                           const char *log_name, ulong pos)
127
 
{
128
 
  const char *handler_error= HA_ERR(ha_error);
129
 
  char buff[MAX_SLAVE_ERRMSG], *slider;
130
 
  const char *buff_end= buff + sizeof(buff);
131
 
  uint32_t len;
132
 
  List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
133
 
  DRIZZLE_ERROR *err;
134
 
  buff[0]= 0;
135
 
 
136
 
  for (err= it++, slider= buff; err && slider < buff_end - 1;
137
 
       slider += len, err= it++)
138
 
  {
139
 
    len= snprintf(slider, buff_end - slider,
140
 
                  _(" %s, Error_code: %d;"), err->msg, err->code);
141
 
  }
142
 
 
143
 
  rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
144
 
              _("Could not execute %s event on table %s.%s;"
145
 
                "%s handler error %s; "
146
 
                "the event's master log %s, end_log_pos %lu"),
147
 
              type, table->s->db.str,
148
 
              table->s->table_name.str,
149
 
              buff,
150
 
              handler_error == NULL? _("<unknown>") : handler_error,
151
 
              log_name, pos);
152
 
}
153
 
 
154
 
 
155
 
/*
156
 
  Cache that will automatically be written to a dedicated file on
157
 
  destruction.
158
 
 
159
 
  DESCRIPTION
160
 
 
161
 
 */
162
 
class Write_on_release_cache
163
 
{
164
 
public:
165
 
  enum flag
166
 
  {
167
 
    FLUSH_F
168
 
  };
169
 
 
170
 
  typedef unsigned short flag_set;
171
 
 
172
 
  /*
173
 
    Constructor.
174
 
 
175
 
    SYNOPSIS
176
 
      Write_on_release_cache
177
 
      cache  Pointer to cache to use
178
 
      file   File to write cache to upon destruction
179
 
      flags  Flags for the cache
180
 
 
181
 
    DESCRIPTION
182
 
 
183
 
      Class used to guarantee copy of cache to file before exiting the
184
 
      current block.  On successful copy of the cache, the cache will
185
 
      be reinited as a WRITE_CACHE.
186
 
 
187
 
      Currently, a pointer to the cache is provided in the
188
 
      constructor, but it would be possible to create a subclass
189
 
      holding the IO_CACHE itself.
190
 
   */
191
 
  Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
192
 
    : m_cache(cache), m_file(file), m_flags(flags)
193
 
  {
194
 
    reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
195
 
  }
196
 
 
197
 
  ~Write_on_release_cache()
198
 
  {
199
 
    copy_event_cache_to_file_and_reinit(m_cache, m_file);
200
 
    if (m_flags | FLUSH_F)
201
 
      fflush(m_file);
202
 
  }
203
 
 
204
 
  /*
205
 
    Return a pointer to the internal IO_CACHE.
206
 
 
207
 
    SYNOPSIS
208
 
      operator&()
209
 
 
210
 
    DESCRIPTION
211
 
 
212
 
      Function to return a pointer to the internal cache, so that the
213
 
      object can be treated as a IO_CACHE and used with the my_b_*
214
 
      IO_CACHE functions
215
 
 
216
 
    RETURN VALUE
217
 
      A pointer to the internal IO_CACHE.
218
 
   */
219
 
  IO_CACHE *operator&()
220
 
  {
221
 
    return m_cache;
222
 
  }
223
 
 
224
 
private:
225
 
  // Hidden, to prevent usage.
226
 
  Write_on_release_cache(Write_on_release_cache const&);
227
 
 
228
 
  IO_CACHE *m_cache;
229
 
  FILE *m_file;
230
 
  flag_set m_flags;
231
 
};
232
 
 
233
 
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
234
 
 
235
 
/*
236
 
  pretty_print_str()
237
 
*/
238
 
 
239
 
static void clear_all_errors(Session *session, Relay_log_info *rli)
240
 
{
241
 
  session->is_slave_error = 0;
242
 
  session->clear_error();
243
 
  rli->clear_error();
244
 
}
245
 
 
246
 
 
247
 
/**
248
 
  Ignore error code specified on command line.
249
 
*/
250
 
 
251
 
inline int ignored_error_code(int err_code)
252
 
{
253
 
  return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
254
 
          (use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
255
 
}
256
 
 
257
 
 
258
 
/*
259
 
  pretty_print_str()
260
 
*/
261
 
 
262
 
static char *pretty_print_str(char *packet, const char *str, int len)
263
 
{
264
 
  const char *end= str + len;
265
 
  char *pos= packet;
266
 
  *pos++= '\'';
267
 
  while (str < end)
268
 
  {
269
 
    char c;
270
 
    switch ((c=*str++)) {
271
 
    case '\n': *pos++= '\\'; *pos++= 'n'; break;
272
 
    case '\r': *pos++= '\\'; *pos++= 'r'; break;
273
 
    case '\\': *pos++= '\\'; *pos++= '\\'; break;
274
 
    case '\b': *pos++= '\\'; *pos++= 'b'; break;
275
 
    case '\t': *pos++= '\\'; *pos++= 't'; break;
276
 
    case '\'': *pos++= '\\'; *pos++= '\''; break;
277
 
    case 0   : *pos++= '\\'; *pos++= '0'; break;
278
 
    default:
279
 
      *pos++= c;
280
 
      break;
281
 
    }
282
 
  }
283
 
  *pos++= '\'';
284
 
  return pos;
285
 
}
286
 
 
287
 
 
288
 
/**
289
 
  Creates a temporary name for load data infile:.
290
 
 
291
 
  @param buf                  Store new filename here
292
 
  @param file_id              File_id (part of file name)
293
 
  @param event_server_id     Event_id (part of file name)
294
 
  @param ext                  Extension for file name
295
 
 
296
 
  @return
297
 
    Pointer to start of extension
298
 
*/
299
 
 
300
 
static char *slave_load_file_stem(char *buf, uint32_t file_id,
301
 
                                  int event_server_id, const char *ext)
302
 
{
303
 
  char *res;
304
 
  fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
305
 
 
306
 
  buf= strchr(buf, '\0');
307
 
  buf= int10_to_str(::server_id, buf, 10);
308
 
  *buf++ = '-';
309
 
  buf= int10_to_str(event_server_id, buf, 10);
310
 
  *buf++ = '-';
311
 
  res= int10_to_str(file_id, buf, 10);
312
 
  strcpy(res, ext);                             // Add extension last
313
 
  return res;                                   // Pointer to extension
314
 
}
315
 
 
316
 
 
317
 
/**
318
 
  Delete all temporary files used for SQL_LOAD.
319
 
*/
320
 
 
321
 
static void cleanup_load_tmpdir()
322
 
{
323
 
  MY_DIR *dirp;
324
 
  FILEINFO *file;
325
 
  uint32_t i;
326
 
  char fname[FN_REFLEN], prefbuf[31], *p;
327
 
 
328
 
  if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
329
 
    return;
330
 
 
331
 
  /*
332
 
     When we are deleting temporary files, we should only remove
333
 
     the files associated with the server id of our server.
334
 
     We don't use event_server_id here because since we've disabled
335
 
     direct binlogging of Create_file/Append_file/Exec_load events
336
 
     we cannot meet Start_log event in the middle of events from one
337
 
     LOAD DATA.
338
 
  */
339
 
  p= strncpy(prefbuf, STRING_WITH_LEN("SQL_LOAD-")) + 9;
340
 
  p= int10_to_str(::server_id, p, 10);
341
 
  *(p++)= '-';
342
 
  *p= 0;
343
 
 
344
 
  for (i=0 ; i < (uint)dirp->number_off_files; i++)
345
 
  {
346
 
    file=dirp->dir_entry+i;
347
 
    if (is_prefix(file->name, prefbuf))
348
 
    {
349
 
      fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
350
 
      my_delete(fname, MYF(0));
351
 
    }
352
 
  }
353
 
 
354
 
  my_dirend(dirp);
355
 
}
356
 
 
357
 
 
358
 
/*
359
 
  write_str()
360
 
*/
361
 
 
362
 
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
363
 
{
364
 
  unsigned char tmp[1];
365
 
  tmp[0]= (unsigned char) length;
366
 
  return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
367
 
          my_b_safe_write(file, (unsigned char*) str, length));
368
 
}
369
 
 
370
 
 
371
 
/*
372
 
  read_str()
373
 
*/
374
 
 
375
 
static inline int read_str(const char **buf, const char *buf_end,
376
 
                           const char **str, uint8_t *len)
377
 
{
378
 
  if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
379
 
    return 1;
380
 
  *len= (uint8_t) **buf;
381
 
  *str= (*buf)+1;
382
 
  (*buf)+= (uint) *len+1;
383
 
  return 0;
384
 
}
385
 
 
386
 
 
387
 
/**
388
 
  Transforms a string into "" or its expression in 0x... form.
389
 
*/
390
 
 
391
 
char *str_to_hex(char *to, const char *from, uint32_t len)
392
 
{
393
 
  if (len)
394
 
  {
395
 
    *to++= '0';
396
 
    *to++= 'x';
397
 
    to= octet2hex(to, from, len);
398
 
  }
399
 
  else
400
 
    to= strcpy(to, "\"\"")+2;
401
 
  return to;                               // pointer to end 0 of 'to'
402
 
}
403
 
 
404
 
 
405
 
/**
406
 
  Append a version of the 'from' string suitable for use in a query to
407
 
  the 'to' string.  To generate a correct escaping, the character set
408
 
  information in 'csinfo' is used.
409
 
*/
410
 
 
411
 
int
412
 
append_query_string(const CHARSET_INFO * const csinfo,
413
 
                    String const *from, String *to)
414
 
{
415
 
  char *beg, *ptr;
416
 
  uint32_t const orig_len= to->length();
417
 
  if (to->reserve(orig_len + from->length()*2+3))
418
 
    return 1;
419
 
 
420
 
  beg= to->c_ptr_quick() + to->length();
421
 
  ptr= beg;
422
 
  if (csinfo->escape_with_backslash_is_dangerous)
423
 
    ptr= str_to_hex(ptr, from->ptr(), from->length());
424
 
  else
425
 
  {
426
 
    *ptr++= '\'';
427
 
    ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
428
 
    *ptr++='\'';
429
 
  }
430
 
  to->length(orig_len + ptr - beg);
431
 
  return 0;
432
 
}
433
 
 
434
 
 
435
 
/**************************************************************************
436
 
        Log_event methods (= the parent class of all events)
437
 
**************************************************************************/
438
 
 
439
 
/**
440
 
  @return
441
 
  returns the human readable name of the event's type
442
 
*/
443
 
 
444
 
const char* Log_event::get_type_str(Log_event_type type)
445
 
{
446
 
  switch(type) {
447
 
  case START_EVENT_V3:  return "Start_v3";
448
 
  case STOP_EVENT:   return "Stop";
449
 
  case QUERY_EVENT:  return "Query";
450
 
  case ROTATE_EVENT: return "Rotate";
451
 
  case LOAD_EVENT:   return "Load";
452
 
  case NEW_LOAD_EVENT:   return "New_load";
453
 
  case SLAVE_EVENT:  return "Slave";
454
 
  case CREATE_FILE_EVENT: return "Create_file";
455
 
  case APPEND_BLOCK_EVENT: return "Append_block";
456
 
  case DELETE_FILE_EVENT: return "Delete_file";
457
 
  case EXEC_LOAD_EVENT: return "Exec_load";
458
 
  case XID_EVENT: return "Xid";
459
 
  case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
460
 
  case TABLE_MAP_EVENT: return "Table_map";
461
 
  case WRITE_ROWS_EVENT: return "Write_rows";
462
 
  case UPDATE_ROWS_EVENT: return "Update_rows";
463
 
  case DELETE_ROWS_EVENT: return "Delete_rows";
464
 
  case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
465
 
  case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
466
 
  case INCIDENT_EVENT: return "Incident";
467
 
  default: return "Unknown";                            /* impossible */
468
 
  }
469
 
}
470
 
 
471
 
const char* Log_event::get_type_str()
472
 
{
473
 
  return get_type_str(get_type_code());
474
 
}
475
 
 
476
 
 
477
 
/*
478
 
  Log_event::Log_event()
479
 
*/
480
 
 
481
 
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
482
 
  :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), session(session_arg)
483
 
{
484
 
  server_id=    session->server_id;
485
 
  when=         session->start_time;
486
 
  cache_stmt=   using_trans;
487
 
}
488
 
 
489
 
 
490
 
/**
491
 
  This minimal constructor is for when you are not even sure that there
492
 
  is a valid Session. For example in the server when we are shutting down or
493
 
  flushing logs after receiving a SIGHUP (then we must write a Rotate to
494
 
  the binlog but we have no Session, so we need this minimal constructor).
495
 
*/
496
 
 
497
 
Log_event::Log_event()
498
 
  :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
499
 
   session(0)
500
 
{
501
 
  server_id= ::server_id;
502
 
  /*
503
 
    We can't call time() here as this would cause a call before
504
 
    my_init() is called
505
 
  */
506
 
  when=         0;
507
 
  log_pos=      0;
508
 
}
509
 
 
510
 
 
511
 
/*
512
 
  Log_event::Log_event()
513
 
*/
514
 
 
515
 
Log_event::Log_event(const char* buf,
516
 
                     const Format_description_log_event* description_event)
517
 
  :temp_buf(0), cache_stmt(0)
518
 
{
519
 
  session= 0;
520
 
  when= uint4korr(buf);
521
 
  server_id= uint4korr(buf + SERVER_ID_OFFSET);
522
 
  data_written= uint4korr(buf + EVENT_LEN_OFFSET);
523
 
  if (description_event->binlog_version==1)
524
 
  {
525
 
    log_pos= 0;
526
 
    flags= 0;
527
 
    return;
528
 
  }
529
 
  /* 4.0 or newer */
530
 
  log_pos= uint4korr(buf + LOG_POS_OFFSET);
531
 
  /*
532
 
    If the log is 4.0 (so here it can only be a 4.0 relay log read by
533
 
    the SQL thread or a 4.0 master binlog read by the I/O thread),
534
 
    log_pos is the beginning of the event: we transform it into the end
535
 
    of the event, which is more useful.
536
 
    But how do you know that the log is 4.0: you know it if
537
 
    description_event is version 3 *and* you are not reading a
538
 
    Format_desc (remember that mysqlbinlog starts by assuming that 5.0
539
 
    logs are in 4.0 format, until it finds a Format_desc).
540
 
  */
541
 
  if (description_event->binlog_version==3 &&
542
 
      buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
543
 
  {
544
 
      /*
545
 
        If log_pos=0, don't change it. log_pos==0 is a marker to mean
546
 
        "don't change rli->group_master_log_pos" (see
547
 
        inc_group_relay_log_pos()). As it is unreal log_pos, adding the
548
 
        event len's is nonsense. For example, a fake Rotate event should
549
 
        not have its log_pos (which is 0) changed or it will modify
550
 
        Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
551
 
        value of (a non-zero offset which does not exist in the master's
552
 
        binlog, so which will cause problems if the user uses this value
553
 
        in CHANGE MASTER).
554
 
      */
555
 
    log_pos+= data_written; /* purecov: inspected */
556
 
  }
557
 
 
558
 
  flags= uint2korr(buf + FLAGS_OFFSET);
559
 
  if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
560
 
      (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
561
 
  {
562
 
    /*
563
 
      These events always have a header which stops here (i.e. their
564
 
      header is FROZEN).
565
 
    */
566
 
    /*
567
 
      Initialization to zero of all other Log_event members as they're
568
 
      not specified. Currently there are no such members; in the future
569
 
      there will be an event UID (but Format_description and Rotate
570
 
      don't need this UID, as they are not propagated through
571
 
      --log-slave-updates (remember the UID is used to not play a query
572
 
      twice when you have two masters which are slaves of a 3rd master).
573
 
      Then we are done.
574
 
    */
575
 
    return;
576
 
  }
577
 
  /* otherwise, go on with reading the header from buf (nothing now) */
578
 
}
579
 
 
580
 
 
581
 
int Log_event::do_update_pos(Relay_log_info *rli)
582
 
{
583
 
  /*
584
 
    rli is null when (as far as I (Guilhem) know) the caller is
585
 
    Load_log_event::do_apply_event *and* that one is called from
586
 
    Execute_load_log_event::do_apply_event.  In this case, we don't
587
 
    do anything here ; Execute_load_log_event::do_apply_event will
588
 
    call Log_event::do_apply_event again later with the proper rli.
589
 
    Strictly speaking, if we were sure that rli is null only in the
590
 
    case discussed above, 'if (rli)' is useless here.  But as we are
591
 
    not 100% sure, keep it for now.
592
 
 
593
 
    Matz: I don't think we will need this check with this refactoring.
594
 
  */
595
 
  if (rli)
596
 
  {
597
 
    /*
598
 
      bug#29309 simulation: resetting the flag to force
599
 
      wrong behaviour of artificial event to update
600
 
      rli->last_master_timestamp for only one time -
601
 
      the first FLUSH LOGS in the test.
602
 
    */
603
 
    if (debug_not_change_ts_if_art_event == 1
604
 
        && is_artificial_event())
605
 
      debug_not_change_ts_if_art_event= 0;
606
 
    rli->stmt_done(log_pos,
607
 
                   is_artificial_event() &&
608
 
                   debug_not_change_ts_if_art_event > 0 ? 0 : when);
609
 
    if (debug_not_change_ts_if_art_event == 0)
610
 
      debug_not_change_ts_if_art_event= 2;
611
 
  }
612
 
  return 0;                                   // Cannot fail currently
613
 
}
614
 
 
615
 
 
616
 
Log_event::enum_skip_reason
617
 
Log_event::do_shall_skip(Relay_log_info *rli)
618
 
{
619
 
  if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()))
620
 
    return EVENT_SKIP_IGNORE;
621
 
  else if (rli->slave_skip_counter > 0)
622
 
    return EVENT_SKIP_COUNT;
623
 
  else
624
 
    return EVENT_SKIP_NOT;
625
 
}
626
 
 
627
 
 
628
 
/*
629
 
  Log_event::pack_info()
630
 
*/
631
 
 
632
 
void Log_event::pack_info(Protocol *protocol)
633
 
{
634
 
  protocol->store("", &my_charset_bin);
635
 
}
636
 
 
637
 
 
638
 
const char* Log_event::get_db()
639
 
{
640
 
  return session ? session->db : 0;
641
 
}
642
 
 
643
 
 
644
 
/**
645
 
  init_show_field_list() prepares the column names and types for the
646
 
  output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
647
 
  EVENTS.
648
 
*/
649
 
 
650
 
void Log_event::init_show_field_list(List<Item>* field_list)
651
 
{
652
 
  field_list->push_back(new Item_empty_string("Log_name", 20));
653
 
  field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
654
 
                                            DRIZZLE_TYPE_LONGLONG));
655
 
  field_list->push_back(new Item_empty_string("Event_type", 20));
656
 
  field_list->push_back(new Item_return_int("Server_id", 10,
657
 
                                            DRIZZLE_TYPE_LONG));
658
 
  field_list->push_back(new Item_return_int("End_log_pos",
659
 
                                            MY_INT32_NUM_DECIMAL_DIGITS,
660
 
                                            DRIZZLE_TYPE_LONGLONG));
661
 
  field_list->push_back(new Item_empty_string("Info", 20));
662
 
}
663
 
 
664
 
/*
665
 
  Log_event::write()
666
 
*/
667
 
 
668
 
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
669
 
{
670
 
  unsigned char header[LOG_EVENT_HEADER_LEN];
671
 
  ulong now;
672
 
 
673
 
  /* Store number of bytes that will be written by this event */
674
 
  data_written= event_data_length + sizeof(header);
675
 
 
676
 
  /*
677
 
    log_pos != 0 if this is relay-log event. In this case we should not
678
 
    change the position
679
 
  */
680
 
 
681
 
  if (is_artificial_event())
682
 
  {
683
 
    /*
684
 
      We should not do any cleanup on slave when reading this. We
685
 
      mark this by setting log_pos to 0.  Start_log_event_v3() will
686
 
      detect this on reading and set artificial_event=1 for the event.
687
 
    */
688
 
    log_pos= 0;
689
 
  }
690
 
  else  if (!log_pos)
691
 
  {
692
 
    /*
693
 
      Calculate position of end of event
694
 
 
695
 
      Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
696
 
      work well.  So this will give slightly wrong positions for the
697
 
      Format_desc/Rotate/Stop events which the slave writes to its
698
 
      relay log. For example, the initial Format_desc will have
699
 
      end_log_pos=91 instead of 95. Because after writing the first 4
700
 
      bytes of the relay log, my_b_tell() still reports 0. Because
701
 
      my_b_append() does not update the counter which my_b_tell()
702
 
      later uses (one should probably use my_b_append_tell() to work
703
 
      around this).  To get right positions even when writing to the
704
 
      relay log, we use the (new) my_b_safe_tell().
705
 
 
706
 
      Note that this raises a question on the correctness of all these
707
 
      assert(my_b_tell()=rli->event_relay_log_pos).
708
 
 
709
 
      If in a transaction, the log_pos which we calculate below is not
710
 
      very good (because then my_b_safe_tell() returns start position
711
 
      of the BEGIN, so it's like the statement was at the BEGIN's
712
 
      place), but it's not a very serious problem (as the slave, when
713
 
      it is in a transaction, does not take those end_log_pos into
714
 
      account (as it calls inc_event_relay_log_pos()). To be fixed
715
 
      later, so that it looks less strange. But not bug.
716
 
    */
717
 
 
718
 
    log_pos= my_b_safe_tell(file)+data_written;
719
 
  }
720
 
 
721
 
  now= (ulong) get_time();                              // Query start time
722
 
 
723
 
  /*
724
 
    Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
725
 
    FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
726
 
    LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
727
 
    because we read them before knowing the format).
728
 
  */
729
 
 
730
 
  int4store(header, now);              // timestamp
731
 
  header[EVENT_TYPE_OFFSET]= get_type_code();
732
 
  int4store(header+ SERVER_ID_OFFSET, server_id);
733
 
  int4store(header+ EVENT_LEN_OFFSET, data_written);
734
 
  int4store(header+ LOG_POS_OFFSET, log_pos);
735
 
  int2store(header+ FLAGS_OFFSET, flags);
736
 
 
737
 
  return(my_b_safe_write(file, header, sizeof(header)) != 0);
738
 
}
739
 
 
740
 
 
741
 
time_t Log_event::get_time()
742
 
{
743
 
  Session *tmp_session;
744
 
  if (when)
745
 
    return when;
746
 
  if (session)
747
 
    return session->start_time;
748
 
  if ((tmp_session= current_session))
749
 
    return tmp_session->start_time;
750
 
  return time(0);
751
 
}
752
 
 
753
 
 
754
 
/**
755
 
  This needn't be format-tolerant, because we only read
756
 
  LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
757
 
*/
758
 
 
759
 
int Log_event::read_log_event(IO_CACHE* file, String* packet,
760
 
                              pthread_mutex_t* log_lock)
761
 
{
762
 
  ulong data_len;
763
 
  int result=0;
764
 
  char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
765
 
 
766
 
  if (log_lock)
767
 
    pthread_mutex_lock(log_lock);
768
 
  if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
769
 
  {
770
 
    /*
771
 
      If the read hits eof, we must report it as eof so the caller
772
 
      will know it can go into cond_wait to be woken up on the next
773
 
      update to the log.
774
 
    */
775
 
    if (!file->error)
776
 
      result= LOG_READ_EOF;
777
 
    else
778
 
      result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
779
 
    goto end;
780
 
  }
781
 
  data_len= uint4korr(buf + EVENT_LEN_OFFSET);
782
 
  if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
783
 
      data_len > current_session->variables.max_allowed_packet)
784
 
  {
785
 
    result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
786
 
             LOG_READ_TOO_LARGE);
787
 
    goto end;
788
 
  }
789
 
 
790
 
  /* Append the log event header to packet */
791
 
  if (packet->append(buf, sizeof(buf)))
792
 
  {
793
 
    /* Failed to allocate packet */
794
 
    result= LOG_READ_MEM;
795
 
    goto end;
796
 
  }
797
 
  data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
798
 
  if (data_len)
799
 
  {
800
 
    /* Append rest of event, read directly from file into packet */
801
 
    if (packet->append(file, data_len))
802
 
    {
803
 
      /*
804
 
        Fatal error occured when appending rest of the event
805
 
        to packet, possible failures:
806
 
        1. EOF occured when reading from file, it's really an error
807
 
           as data_len is >=0 there's supposed to be more bytes available.
808
 
           file->error will have been set to number of bytes left to read
809
 
        2. Read was interrupted, file->error would normally be set to -1
810
 
        3. Failed to allocate memory for packet, my_errno
811
 
           will be ENOMEM(file->error shuold be 0, but since the
812
 
           memory allocation occurs before the call to read it might
813
 
           be uninitialized)
814
 
      */
815
 
      result= (my_errno == ENOMEM ? LOG_READ_MEM :
816
 
               (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
817
 
      /* Implicit goto end; */
818
 
    }
819
 
  }
820
 
 
821
 
end:
822
 
  if (log_lock)
823
 
    pthread_mutex_unlock(log_lock);
824
 
  return(result);
825
 
}
826
 
 
827
 
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
828
 
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
829
 
 
830
 
/**
831
 
  @note
832
 
    Allocates memory;  The caller is responsible for clean-up.
833
 
*/
834
 
Log_event* Log_event::read_log_event(IO_CACHE* file,
835
 
                                     pthread_mutex_t* log_lock,
836
 
                                     const Format_description_log_event
837
 
                                     *description_event)
838
 
{
839
 
  assert(description_event != 0);
840
 
  char head[LOG_EVENT_MINIMAL_HEADER_LEN];
841
 
  /*
842
 
    First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
843
 
    check the event for sanity and to know its length; no need to really parse
844
 
    it. We say "at most" because this could be a 3.23 master, which has header
845
 
    of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
846
 
    "minimal" over the set {MySQL >=4.0}).
847
 
  */
848
 
  uint32_t header_size= cmin(description_event->common_header_len,
849
 
                        LOG_EVENT_MINIMAL_HEADER_LEN);
850
 
 
851
 
  LOCK_MUTEX;
852
 
  if (my_b_read(file, (unsigned char *) head, header_size))
853
 
  {
854
 
    UNLOCK_MUTEX;
855
 
    /*
856
 
      No error here; it could be that we are at the file's end. However
857
 
      if the next my_b_read() fails (below), it will be an error as we
858
 
      were able to read the first bytes.
859
 
    */
860
 
    return(0);
861
 
  }
862
 
  uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
863
 
  char *buf= 0;
864
 
  const char *error= 0;
865
 
  Log_event *res=  0;
866
 
#ifndef max_allowed_packet
867
 
  Session *session=current_session;
868
 
  uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
869
 
#endif
870
 
 
871
 
  if (data_len > max_allowed_packet)
872
 
  {
873
 
    error = "Event too big";
874
 
    goto err;
875
 
  }
876
 
 
877
 
  if (data_len < header_size)
878
 
  {
879
 
    error = "Event too small";
880
 
    goto err;
881
 
  }
882
 
 
883
 
  // some events use the extra byte to null-terminate strings
884
 
  if (!(buf = (char*) malloc(data_len+1)))
885
 
  {
886
 
    error = "Out of memory";
887
 
    goto err;
888
 
  }
889
 
  buf[data_len] = 0;
890
 
  memcpy(buf, head, header_size);
891
 
  if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
892
 
  {
893
 
    error = "read error";
894
 
    goto err;
895
 
  }
896
 
  if ((res= read_log_event(buf, data_len, &error, description_event)))
897
 
    res->register_temp_buf(buf);
898
 
 
899
 
err:
900
 
  UNLOCK_MUTEX;
901
 
  if (!res)
902
 
  {
903
 
    assert(error != 0);
904
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Error in Log_event::read_log_event(): "
905
 
                    "'%s', data_len: %d, event_type: %d"),
906
 
                    error,data_len,head[EVENT_TYPE_OFFSET]);
907
 
    free(buf);
908
 
    /*
909
 
      The SQL slave thread will check if file->error<0 to know
910
 
      if there was an I/O error. Even if there is no "low-level" I/O errors
911
 
      with 'file', any of the high-level above errors is worrying
912
 
      enough to stop the SQL thread now ; as we are skipping the current event,
913
 
      going on with reading and successfully executing other events can
914
 
      only corrupt the slave's databases. So stop.
915
 
    */
916
 
    file->error= -1;
917
 
  }
918
 
  return(res);
919
 
}
920
 
 
921
 
 
922
 
/**
923
 
  Binlog format tolerance is in (buf, event_len, description_event)
924
 
  constructors.
925
 
*/
926
 
 
927
 
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
928
 
                                     const char **error,
929
 
                                     const Format_description_log_event *description_event)
930
 
{
931
 
  Log_event* ev;
932
 
  assert(description_event != 0);
933
 
 
934
 
  /* Check the integrity */
935
 
  if (event_len < EVENT_LEN_OFFSET ||
936
 
      buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
937
 
      (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
938
 
  {
939
 
    *error="Sanity check failed";               // Needed to free buffer
940
 
    return(NULL); // general sanity check - will fail on a partial read
941
 
  }
942
 
 
943
 
  uint32_t event_type= buf[EVENT_TYPE_OFFSET];
944
 
  if (event_type > description_event->number_of_event_types &&
945
 
      event_type != FORMAT_DESCRIPTION_EVENT)
946
 
  {
947
 
    /*
948
 
      It is unsafe to use the description_event if its post_header_len
949
 
      array does not include the event type.
950
 
    */
951
 
    ev= NULL;
952
 
  }
953
 
  else
954
 
  {
955
 
    /*
956
 
      In some previuos versions (see comment in
957
 
      Format_description_log_event::Format_description_log_event(char*,...)),
958
 
      event types were assigned different id numbers than in the
959
 
      present version. In order to replicate from such versions to the
960
 
      present version, we must map those event type id's to our event
961
 
      type id's.  The mapping is done with the event_type_permutation
962
 
      array, which was set up when the Format_description_log_event
963
 
      was read.
964
 
    */
965
 
    if (description_event->event_type_permutation)
966
 
      event_type= description_event->event_type_permutation[event_type];
967
 
 
968
 
    switch(event_type) {
969
 
    case QUERY_EVENT:
970
 
      ev  = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
971
 
      break;
972
 
    case LOAD_EVENT:
973
 
      ev = new Load_log_event(buf, event_len, description_event);
974
 
      break;
975
 
    case NEW_LOAD_EVENT:
976
 
      ev = new Load_log_event(buf, event_len, description_event);
977
 
      break;
978
 
    case ROTATE_EVENT:
979
 
      ev = new Rotate_log_event(buf, event_len, description_event);
980
 
      break;
981
 
    case CREATE_FILE_EVENT:
982
 
      ev = new Create_file_log_event(buf, event_len, description_event);
983
 
      break;
984
 
    case APPEND_BLOCK_EVENT:
985
 
      ev = new Append_block_log_event(buf, event_len, description_event);
986
 
      break;
987
 
    case DELETE_FILE_EVENT:
988
 
      ev = new Delete_file_log_event(buf, event_len, description_event);
989
 
      break;
990
 
    case EXEC_LOAD_EVENT:
991
 
      ev = new Execute_load_log_event(buf, event_len, description_event);
992
 
      break;
993
 
    case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
994
 
      ev = new Start_log_event_v3(buf, description_event);
995
 
      break;
996
 
    case STOP_EVENT:
997
 
      ev = new Stop_log_event(buf, description_event);
998
 
      break;
999
 
    case XID_EVENT:
1000
 
      ev = new Xid_log_event(buf, description_event);
1001
 
      break;
1002
 
    case FORMAT_DESCRIPTION_EVENT:
1003
 
      ev = new Format_description_log_event(buf, event_len, description_event);
1004
 
      break;
1005
 
    case WRITE_ROWS_EVENT:
1006
 
      ev = new Write_rows_log_event(buf, event_len, description_event);
1007
 
      break;
1008
 
    case UPDATE_ROWS_EVENT:
1009
 
      ev = new Update_rows_log_event(buf, event_len, description_event);
1010
 
      break;
1011
 
    case DELETE_ROWS_EVENT:
1012
 
      ev = new Delete_rows_log_event(buf, event_len, description_event);
1013
 
      break;
1014
 
    case TABLE_MAP_EVENT:
1015
 
      ev = new Table_map_log_event(buf, event_len, description_event);
1016
 
      break;
1017
 
    case BEGIN_LOAD_QUERY_EVENT:
1018
 
      ev = new Begin_load_query_log_event(buf, event_len, description_event);
1019
 
      break;
1020
 
    case EXECUTE_LOAD_QUERY_EVENT:
1021
 
      ev= new Execute_load_query_log_event(buf, event_len, description_event);
1022
 
      break;
1023
 
    case INCIDENT_EVENT:
1024
 
      ev = new Incident_log_event(buf, event_len, description_event);
1025
 
      break;
1026
 
    default:
1027
 
      ev= NULL;
1028
 
      break;
1029
 
    }
1030
 
  }
1031
 
 
1032
 
  /*
1033
 
    is_valid() are small event-specific sanity tests which are
1034
 
    important; for example there are some malloc() in constructors
1035
 
    (e.g. Query_log_event::Query_log_event(char*...)); when these
1036
 
    malloc() fail we can't return an error out of the constructor
1037
 
    (because constructor is "void") ; so instead we leave the pointer we
1038
 
    wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1039
 
    Same for Format_description_log_event, member 'post_header_len'.
1040
 
  */
1041
 
  if (!ev || !ev->is_valid())
1042
 
  {
1043
 
    delete ev;
1044
 
    *error= "Found invalid event in binary log";
1045
 
    return(0);
1046
 
  }
1047
 
  return(ev);
1048
 
}
1049
 
 
1050
 
inline Log_event::enum_skip_reason
1051
 
Log_event::continue_group(Relay_log_info *rli)
1052
 
{
1053
 
  if (rli->slave_skip_counter == 1)
1054
 
    return Log_event::EVENT_SKIP_IGNORE;
1055
 
  return Log_event::do_shall_skip(rli);
1056
 
}
1057
 
 
1058
 
/**************************************************************************
1059
 
        Query_log_event methods
1060
 
**************************************************************************/
1061
 
 
1062
 
/**
1063
 
  This (which is used only for SHOW BINLOG EVENTS) could be updated to
1064
 
  print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1065
 
  only an information, it does not produce suitable queries to replay (for
1066
 
  example it does not print LOAD DATA INFILE).
1067
 
  @todo
1068
 
    show the catalog ??
1069
 
*/
1070
 
 
1071
 
void Query_log_event::pack_info(Protocol *protocol)
1072
 
{
1073
 
  // TODO: show the catalog ??
1074
 
  char *buf, *pos;
1075
 
  if (!(buf= (char*) malloc(9 + db_len + q_len)))
1076
 
    return;
1077
 
  pos= buf;
1078
 
  if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1079
 
      && db && db_len)
1080
 
  {
1081
 
    pos= strcpy(buf, "use `")+5;
1082
 
    memcpy(pos, db, db_len);
1083
 
    pos= strcpy(pos+db_len, "`; ")+3;
1084
 
  }
1085
 
  if (query && q_len)
1086
 
  {
1087
 
    memcpy(pos, query, q_len);
1088
 
    pos+= q_len;
1089
 
  }
1090
 
  protocol->store(buf, pos-buf, &my_charset_bin);
1091
 
  free(buf);
1092
 
}
1093
 
 
1094
 
 
1095
 
/**
1096
 
  Query_log_event::write().
1097
 
 
1098
 
  @note
1099
 
    In this event we have to modify the header to have the correct
1100
 
    EVENT_LEN_OFFSET as we don't yet know how many status variables we
1101
 
    will print!
1102
 
*/
1103
 
 
1104
 
bool Query_log_event::write(IO_CACHE* file)
1105
 
{
1106
 
  /**
1107
 
    @todo if catalog can be of length FN_REFLEN==512, then we are not
1108
 
    replicating it correctly, since the length is stored in a byte
1109
 
    /sven
1110
 
  */
1111
 
  unsigned char buf[QUERY_HEADER_LEN+
1112
 
            1+4+           // code of flags2 and flags2
1113
 
            1+8+           // code of sql_mode and sql_mode
1114
 
            1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1115
 
            1+4+           // code of autoinc and the 2 autoinc variables
1116
 
            1+6+           // code of charset and charset
1117
 
            1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1118
 
            1+2+           // code of lc_time_names and lc_time_names_number
1119
 
            1+2            // code of charset_database and charset_database_number
1120
 
            ], *start, *start_of_status;
1121
 
  ulong event_length;
1122
 
 
1123
 
  if (!query)
1124
 
    return 1;                                   // Something wrong with event
1125
 
 
1126
 
  /*
1127
 
    We want to store the thread id:
1128
 
    (- as an information for the user when he reads the binlog)
1129
 
    - if the query uses temporary table: for the slave SQL thread to know to
1130
 
    which master connection the temp table belongs.
1131
 
    Now imagine we (write()) are called by the slave SQL thread (we are
1132
 
    logging a query executed by this thread; the slave runs with
1133
 
    --log-slave-updates). Then this query will be logged with
1134
 
    thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1135
 
    the same name were created simultaneously on the master (in the master
1136
 
    binlog you have
1137
 
    CREATE TEMPORARY TABLE t; (thread 1)
1138
 
    CREATE TEMPORARY TABLE t; (thread 2)
1139
 
    ...)
1140
 
    then in the slave's binlog there will be
1141
 
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1142
 
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1143
 
    which is bad (same thread id!).
1144
 
 
1145
 
    To avoid this, we log the thread's thread id EXCEPT for the SQL
1146
 
    slave thread for which we log the original (master's) thread id.
1147
 
    Now this moves the bug: what happens if the thread id on the
1148
 
    master was 10 and when the slave replicates the query, a
1149
 
    connection number 10 is opened by a normal client on the slave,
1150
 
    and updates a temp table of the same name? We get a problem
1151
 
    again. To avoid this, in the handling of temp tables (sql_base.cc)
1152
 
    we use thread_id AND server_id.  TODO when this is merged into
1153
 
    4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1154
 
    and is a session variable: that's to make mysqlbinlog work with
1155
 
    temp tables. We probably need to introduce
1156
 
 
1157
 
    SET PSEUDO_SERVER_ID
1158
 
    for mysqlbinlog in 4.1. mysqlbinlog would print:
1159
 
    SET PSEUDO_SERVER_ID=
1160
 
    SET PSEUDO_THREAD_ID=
1161
 
    for each query using temp tables.
1162
 
  */
1163
 
  int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1164
 
  int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1165
 
  buf[Q_DB_LEN_OFFSET] = (char) db_len;
1166
 
  int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1167
 
 
1168
 
  /*
1169
 
    You MUST always write status vars in increasing order of code. This
1170
 
    guarantees that a slightly older slave will be able to parse those he
1171
 
    knows.
1172
 
  */
1173
 
  start_of_status= start= buf+QUERY_HEADER_LEN;
1174
 
  if (flags2_inited)
1175
 
  {
1176
 
    *start++= Q_FLAGS2_CODE;
1177
 
    int4store(start, flags2);
1178
 
    start+= 4;
1179
 
  }
1180
 
  if (lc_time_names_number)
1181
 
  {
1182
 
    assert(lc_time_names_number <= 0xFFFF);
1183
 
    *start++= Q_LC_TIME_NAMES_CODE;
1184
 
    int2store(start, lc_time_names_number);
1185
 
    start+= 2;
1186
 
  }
1187
 
  if (charset_database_number)
1188
 
  {
1189
 
    assert(charset_database_number <= 0xFFFF);
1190
 
    *start++= Q_CHARSET_DATABASE_CODE;
1191
 
    int2store(start, charset_database_number);
1192
 
    start+= 2;
1193
 
  }
1194
 
  /*
1195
 
    Here there could be code like
1196
 
    if (command-line-option-which-says-"log_this_variable" && inited)
1197
 
    {
1198
 
    *start++= Q_THIS_VARIABLE_CODE;
1199
 
    int4store(start, this_variable);
1200
 
    start+= 4;
1201
 
    }
1202
 
  */
1203
 
 
1204
 
  /* Store length of status variables */
1205
 
  status_vars_len= (uint) (start-start_of_status);
1206
 
  assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1207
 
  int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1208
 
 
1209
 
  /*
1210
 
    Calculate length of whole event
1211
 
    The "1" below is the \0 in the db's length
1212
 
  */
1213
 
  event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1214
 
 
1215
 
  return (write_header(file, event_length) ||
1216
 
          my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1217
 
          write_post_header_for_derived(file) ||
1218
 
          my_b_safe_write(file, (unsigned char*) start_of_status,
1219
 
                          (uint) (start-start_of_status)) ||
1220
 
          my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1221
 
          my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1222
 
}
1223
 
 
1224
 
/**
1225
 
  The simplest constructor that could possibly work.  This is used for
1226
 
  creating static objects that have a special meaning and are invisible
1227
 
  to the log.
1228
 
*/
1229
 
Query_log_event::Query_log_event()
1230
 
  :Log_event(), data_buf(0)
1231
 
{
1232
 
}
1233
 
 
1234
 
 
1235
 
/*
1236
 
  SYNOPSIS
1237
 
    Query_log_event::Query_log_event()
1238
 
      session_arg           - thread handle
1239
 
      query_arg         - array of char representing the query
1240
 
      query_length      - size of the  `query_arg' array
1241
 
      using_trans       - there is a modified transactional table
1242
 
      suppress_use      - suppress the generation of 'USE' statements
1243
 
      killed_status_arg - an optional with default to Session::KILLED_NO_VALUE
1244
 
                          if the value is different from the default, the arg
1245
 
                          is set to the current session->killed value.
1246
 
                          A caller might need to masquerade session->killed with
1247
 
                          Session::NOT_KILLED.
1248
 
  DESCRIPTION
1249
 
  Creates an event for binlogging
1250
 
  The value for local `killed_status' can be supplied by caller.
1251
 
*/
1252
 
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1253
 
                                 ulong query_length, bool using_trans,
1254
 
                                 bool suppress_use,
1255
 
                                 Session::killed_state killed_status_arg)
1256
 
:Log_event(session_arg,
1257
 
           (session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
1258
 
           (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1259
 
           using_trans),
1260
 
  data_buf(0), query(query_arg), catalog(session_arg->catalog),
1261
 
  db(session_arg->db), q_len((uint32_t) query_length),
1262
 
  thread_id(session_arg->thread_id),
1263
 
  /* save the original thread id; we already know the server id */
1264
 
  slave_proxy_id(session_arg->variables.pseudo_thread_id),
1265
 
  flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1266
 
  sql_mode(0),
1267
 
  auto_increment_increment(session_arg->variables.auto_increment_increment),
1268
 
  auto_increment_offset(session_arg->variables.auto_increment_offset),
1269
 
  lc_time_names_number(session_arg->variables.lc_time_names->number),
1270
 
   charset_database_number(0)
1271
 
{
1272
 
  time_t end_time;
1273
 
 
1274
 
  if (killed_status_arg == Session::KILLED_NO_VALUE)
1275
 
    killed_status_arg= session_arg->killed;
1276
 
 
1277
 
  error_code=
1278
 
    (killed_status_arg == Session::NOT_KILLED) ?
1279
 
    (session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
1280
 
    (session_arg->killed_errno());
1281
 
 
1282
 
  time(&end_time);
1283
 
  exec_time = (ulong) (end_time  - session_arg->start_time);
1284
 
  /**
1285
 
    @todo this means that if we have no catalog, then it is replicated
1286
 
    as an existing catalog of length zero. is that safe? /sven
1287
 
  */
1288
 
  catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1289
 
  /* status_vars_len is set just before writing the event */
1290
 
  db_len = (db) ? (uint32_t) strlen(db) : 0;
1291
 
  if (session_arg->variables.collation_database != session_arg->db_charset)
1292
 
    charset_database_number= session_arg->variables.collation_database->number;
1293
 
 
1294
 
  /*
1295
 
    If we don't use flags2 for anything else than options contained in
1296
 
    session_arg->options, it would be more efficient to flags2=session_arg->options
1297
 
    (OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1298
 
    But it's likely that we don't want to use 32 bits for 3 bits; in the future
1299
 
    we will probably want to reclaim the 29 bits. So we need the &.
1300
 
  */
1301
 
  flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1302
 
  assert(session_arg->variables.collation_server->number < 256*256);
1303
 
  int2store(charset+4, session_arg->variables.collation_server->number);
1304
 
  time_zone_len= 0;
1305
 
}
1306
 
 
1307
 
static void copy_str_and_move(const char **src,
1308
 
                              Log_event::Byte **dst,
1309
 
                              uint32_t len)
1310
 
{
1311
 
  memcpy(*dst, *src, len);
1312
 
  *src= (const char *)*dst;
1313
 
  (*dst)+= len;
1314
 
  *(*dst)++= 0;
1315
 
}
1316
 
 
1317
 
 
1318
 
/**
1319
 
   Macro to check that there is enough space to read from memory.
1320
 
 
1321
 
   @param PTR Pointer to memory
1322
 
   @param END End of memory
1323
 
   @param CNT Number of bytes that should be read.
1324
 
 */
1325
 
#define CHECK_SPACE(PTR,END,CNT)                      \
1326
 
  do {                                                \
1327
 
    assert((PTR) + (CNT) <= (END));                   \
1328
 
    if ((PTR) + (CNT) > (END)) {                      \
1329
 
      query= 0;                                       \
1330
 
      return;                                         \
1331
 
    }                                                 \
1332
 
  } while (0)
1333
 
 
1334
 
 
1335
 
/**
1336
 
  This is used by the SQL slave thread to prepare the event before execution.
1337
 
*/
1338
 
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1339
 
                                 const Format_description_log_event
1340
 
                                 *description_event,
1341
 
                                 Log_event_type event_type)
1342
 
  :Log_event(buf, description_event), data_buf(0), query(NULL),
1343
 
   db(NULL), catalog_len(0), status_vars_len(0),
1344
 
   flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1345
 
   auto_increment_increment(1), auto_increment_offset(1),
1346
 
   time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1347
 
{
1348
 
  uint32_t data_len;
1349
 
  uint32_t tmp;
1350
 
  uint8_t common_header_len, post_header_len;
1351
 
  Log_event::Byte *start;
1352
 
  const Log_event::Byte *end;
1353
 
  bool catalog_nz= 1;
1354
 
 
1355
 
  common_header_len= description_event->common_header_len;
1356
 
  post_header_len= description_event->post_header_len[event_type-1];
1357
 
 
1358
 
  /*
1359
 
    We test if the event's length is sensible, and if so we compute data_len.
1360
 
    We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1361
 
    We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1362
 
  */
1363
 
  if (event_len < (uint)(common_header_len + post_header_len))
1364
 
    return;
1365
 
  data_len = event_len - (common_header_len + post_header_len);
1366
 
  buf+= common_header_len;
1367
 
 
1368
 
  slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1369
 
  exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1370
 
  db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1371
 
  error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1372
 
 
1373
 
  /*
1374
 
    5.0 format starts here.
1375
 
    Depending on the format, we may or not have affected/warnings etc
1376
 
    The remnent post-header to be parsed has length:
1377
 
  */
1378
 
  tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1379
 
  if (tmp)
1380
 
  {
1381
 
    status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1382
 
    /*
1383
 
      Check if status variable length is corrupt and will lead to very
1384
 
      wrong data. We could be even more strict and require data_len to
1385
 
      be even bigger, but this will suffice to catch most corruption
1386
 
      errors that can lead to a crash.
1387
 
    */
1388
 
    if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1389
 
    {
1390
 
      query= 0;
1391
 
      return;
1392
 
    }
1393
 
    data_len-= status_vars_len;
1394
 
    tmp-= 2;
1395
 
  }
1396
 
  /*
1397
 
    We have parsed everything we know in the post header for QUERY_EVENT,
1398
 
    the rest of post header is either comes from older version MySQL or
1399
 
    dedicated to derived events (e.g. Execute_load_query...)
1400
 
  */
1401
 
 
1402
 
  /* variable-part: the status vars; only in MySQL 5.0  */
1403
 
 
1404
 
  start= (Log_event::Byte*) (buf+post_header_len);
1405
 
  end= (const Log_event::Byte*) (start+status_vars_len);
1406
 
  for (const Log_event::Byte* pos= start; pos < end;)
1407
 
  {
1408
 
    switch (*pos++) {
1409
 
    case Q_FLAGS2_CODE:
1410
 
      CHECK_SPACE(pos, end, 4);
1411
 
      flags2_inited= 1;
1412
 
      flags2= uint4korr(pos);
1413
 
      pos+= 4;
1414
 
      break;
1415
 
    case Q_LC_TIME_NAMES_CODE:
1416
 
      CHECK_SPACE(pos, end, 2);
1417
 
      lc_time_names_number= uint2korr(pos);
1418
 
      pos+= 2;
1419
 
      break;
1420
 
    case Q_CHARSET_DATABASE_CODE:
1421
 
      CHECK_SPACE(pos, end, 2);
1422
 
      charset_database_number= uint2korr(pos);
1423
 
      pos+= 2;
1424
 
      break;
1425
 
    default:
1426
 
      /* That's why you must write status vars in growing order of code */
1427
 
      pos= (const unsigned char*) end;                         // Break loop
1428
 
    }
1429
 
  }
1430
 
 
1431
 
  if (!(start= data_buf = (Log_event::Byte*) malloc(catalog_len + 1 +
1432
 
                                             time_zone_len + 1 +
1433
 
                                             data_len + 1)))
1434
 
      return;
1435
 
  if (catalog_len)                                  // If catalog is given
1436
 
  {
1437
 
    /**
1438
 
      @todo we should clean up and do only copy_str_and_move; it
1439
 
      works for both cases.  Then we can remove the catalog_nz
1440
 
      flag. /sven
1441
 
    */
1442
 
    if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1443
 
      copy_str_and_move(&catalog, &start, catalog_len);
1444
 
    else
1445
 
    {
1446
 
      memcpy(start, catalog, catalog_len+1); // copy end 0
1447
 
      catalog= (const char *)start;
1448
 
      start+= catalog_len+1;
1449
 
    }
1450
 
  }
1451
 
  if (time_zone_len)
1452
 
    copy_str_and_move(&time_zone_str, &start, time_zone_len);
1453
 
 
1454
 
  /**
1455
 
    if time_zone_len or catalog_len are 0, then time_zone and catalog
1456
 
    are uninitialized at this point.  shouldn't they point to the
1457
 
    zero-length null-terminated strings we allocated space for in the
1458
 
    my_alloc call above? /sven
1459
 
  */
1460
 
 
1461
 
  /* A 2nd variable part; this is common to all versions */
1462
 
  memcpy(start, end, data_len);          // Copy db and query
1463
 
  start[data_len]= '\0';              // End query with \0 (For safetly)
1464
 
  db= (char *)start;
1465
 
  query= (char *)(start + db_len + 1);
1466
 
  q_len= data_len - db_len -1;
1467
 
  return;
1468
 
}
1469
 
 
1470
 
 
1471
 
/*
1472
 
  Query_log_event::do_apply_event()
1473
 
*/
1474
 
int Query_log_event::do_apply_event(Relay_log_info const *rli)
1475
 
{
1476
 
  return do_apply_event(rli, query, q_len);
1477
 
}
1478
 
 
1479
 
 
1480
 
/**
1481
 
  @todo
1482
 
  Compare the values of "affected rows" around here. Something
1483
 
  like:
1484
 
  @code
1485
 
     if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1486
 
     {
1487
 
     errmsg_printf(ERRMSG_LVL_ERROR, "Slave: did not get the expected number of affected \
1488
 
     rows running query from master - expected %d, got %d (this numbers \
1489
 
     should have matched modulo 4294967296).", 0, ...);
1490
 
     session->query_error = 1;
1491
 
     }
1492
 
  @endcode
1493
 
  We may also want an option to tell the slave to ignore "affected"
1494
 
  mismatch. This mismatch could be implemented with a new ER_ code, and
1495
 
  to ignore it you would use --slave-skip-errors...
1496
 
*/
1497
 
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1498
 
                                      const char *query_arg, uint32_t q_len_arg)
1499
 
{
1500
 
  int expected_error,actual_error= 0;
1501
 
  Query_id &query_id= Query_id::get_query_id();
1502
 
  /*
1503
 
    Colleagues: please never free(session->catalog) in MySQL. This would
1504
 
    lead to bugs as here session->catalog is a part of an alloced block,
1505
 
    not an entire alloced block (see
1506
 
    Query_log_event::do_apply_event()). Same for session->db.  Thank
1507
 
    you.
1508
 
  */
1509
 
  session->catalog= catalog_len ? (char *) catalog : (char *)"";
1510
 
  session->set_db(db, strlen(db));       /* allocates a copy of 'db' */
1511
 
  session->variables.auto_increment_increment= auto_increment_increment;
1512
 
  session->variables.auto_increment_offset=    auto_increment_offset;
1513
 
 
1514
 
  /*
1515
 
    InnoDB internally stores the master log position it has executed so far,
1516
 
    i.e. the position just after the COMMIT event.
1517
 
    When InnoDB will want to store, the positions in rli won't have
1518
 
    been updated yet, so group_master_log_* will point to old BEGIN
1519
 
    and event_master_log* will point to the beginning of current COMMIT.
1520
 
    But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1521
 
    END of the current log event (COMMIT). We save it in rli so that InnoDB can
1522
 
    access it.
1523
 
  */
1524
 
  const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1525
 
 
1526
 
  clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1527
 
  const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1528
 
 
1529
 
  /*
1530
 
    Note:   We do not need to execute reset_one_shot_variables() if this
1531
 
            db_ok() test fails.
1532
 
    Reason: The db stored in binlog events is the same for SET and for
1533
 
            its companion query.  If the SET is ignored because of
1534
 
            db_ok(), the companion query will also be ignored, and if
1535
 
            the companion query is ignored in the db_ok() test of
1536
 
            ::do_apply_event(), then the companion SET also have so
1537
 
            we don't need to reset_one_shot_variables().
1538
 
  */
1539
 
  if (1)
1540
 
  {
1541
 
    session->set_time((time_t)when);
1542
 
    session->query_length= q_len_arg;
1543
 
    session->query= (char*)query_arg;
1544
 
    session->query_id= query_id.next();
1545
 
    session->variables.pseudo_thread_id= thread_id;             // for temp tables
1546
 
 
1547
 
    if (ignored_error_code((expected_error= error_code)) ||
1548
 
        !check_expected_error(session,rli,expected_error))
1549
 
    {
1550
 
      if (flags2_inited)
1551
 
        /*
1552
 
          all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1553
 
          must take their value from flags2.
1554
 
        */
1555
 
        session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1556
 
      if (time_zone_len)
1557
 
      {
1558
 
        String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1559
 
        if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1560
 
        {
1561
 
          my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1562
 
          session->variables.time_zone= global_system_variables.time_zone;
1563
 
          goto compare_errors;
1564
 
        }
1565
 
      }
1566
 
      if (lc_time_names_number)
1567
 
      {
1568
 
        if (!(session->variables.lc_time_names=
1569
 
              my_locale_by_number(lc_time_names_number)))
1570
 
        {
1571
 
          my_printf_error(ER_UNKNOWN_ERROR,
1572
 
                      "Unknown locale: '%d'", MYF(0), lc_time_names_number);
1573
 
          session->variables.lc_time_names= &my_locale_en_US;
1574
 
          goto compare_errors;
1575
 
        }
1576
 
      }
1577
 
      else
1578
 
        session->variables.lc_time_names= &my_locale_en_US;
1579
 
      if (charset_database_number)
1580
 
      {
1581
 
        const CHARSET_INFO *cs;
1582
 
        if (!(cs= get_charset(charset_database_number, MYF(0))))
1583
 
        {
1584
 
          char buf[20];
1585
 
          int10_to_str((int) charset_database_number, buf, -10);
1586
 
          my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1587
 
          goto compare_errors;
1588
 
        }
1589
 
        session->variables.collation_database= cs;
1590
 
      }
1591
 
      else
1592
 
        session->variables.collation_database= session->db_charset;
1593
 
 
1594
 
      /* Execute the query (note that we bypass dispatch_command()) */
1595
 
      const char* found_semicolon= NULL;
1596
 
      mysql_parse(session, session->query, session->query_length, &found_semicolon);
1597
 
      log_slow_statement(session);
1598
 
    }
1599
 
    else
1600
 
    {
1601
 
      /*
1602
 
        The query got a really bad error on the master (thread killed etc),
1603
 
        which could be inconsistent. Parse it to test the table names: if the
1604
 
        replicate-*-do|ignore-table rules say "this query must be ignored" then
1605
 
        we exit gracefully; otherwise we warn about the bad error and tell DBA
1606
 
        to check/fix it.
1607
 
      */
1608
 
      if (mysql_test_parse_for_slave(session, session->query, session->query_length))
1609
 
        clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1610
 
      else
1611
 
      {
1612
 
        rli->report(ERROR_LEVEL, expected_error,
1613
 
                    _("Query partially completed on the master "
1614
 
                      "(error on master: %d) and was aborted. There is a "
1615
 
                      "chance that your master is inconsistent at this "
1616
 
                      "point. If you are sure that your master is ok, run "
1617
 
                      "this query manually on the slave and then restart the "
1618
 
                      "slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1619
 
                      "START SLAVE; . Query: '%s'"),
1620
 
                    expected_error, session->query);
1621
 
        session->is_slave_error= 1;
1622
 
      }
1623
 
      goto end;
1624
 
    }
1625
 
 
1626
 
compare_errors:
1627
 
 
1628
 
     /*
1629
 
      If we expected a non-zero error code, and we don't get the same error
1630
 
      code, and none of them should be ignored.
1631
 
    */
1632
 
    actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1633
 
    if ((expected_error != actual_error) &&
1634
 
        expected_error &&
1635
 
        !ignored_error_code(actual_error) &&
1636
 
        !ignored_error_code(expected_error))
1637
 
    {
1638
 
      rli->report(ERROR_LEVEL, 0,
1639
 
                  _("Query caused differenxt errors on master and slave.\n"
1640
 
                    "Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1641
 
                    "Default database: '%s'. Query: '%s'"),
1642
 
                  ER(expected_error),
1643
 
                  expected_error,
1644
 
                  actual_error ? session->main_da.message() : _("no error"),
1645
 
                  actual_error,
1646
 
                  print_slave_db_safe(db), query_arg);
1647
 
      session->is_slave_error= 1;
1648
 
    }
1649
 
    /*
1650
 
      If we get the same error code as expected, or they should be ignored.
1651
 
    */
1652
 
    else if (expected_error == actual_error ||
1653
 
             ignored_error_code(actual_error))
1654
 
    {
1655
 
      clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1656
 
      session->killed= Session::NOT_KILLED;
1657
 
    }
1658
 
    /*
1659
 
      Other cases: mostly we expected no error and get one.
1660
 
    */
1661
 
    else if (session->is_slave_error || session->is_fatal_error)
1662
 
    {
1663
 
      rli->report(ERROR_LEVEL, actual_error,
1664
 
                  _("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1665
 
                  (actual_error ? session->main_da.message() :
1666
 
                   _("unexpected success or fatal error")),
1667
 
                      print_slave_db_safe(session->db), query_arg);
1668
 
      session->is_slave_error= 1;
1669
 
    }
1670
 
 
1671
 
    /*
1672
 
      TODO: compare the values of "affected rows" around here. Something
1673
 
      like:
1674
 
      if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1675
 
      {
1676
 
      errmsg_printf(ERRMSG_LVL_ERROR, "Slave: did not get the expected number of affected \
1677
 
      rows running query from master - expected %d, got %d (this numbers \
1678
 
      should have matched modulo 4294967296).", 0, ...);
1679
 
      session->is_slave_error = 1;
1680
 
      }
1681
 
      We may also want an option to tell the slave to ignore "affected"
1682
 
      mismatch. This mismatch could be implemented with a new ER_ code, and
1683
 
      to ignore it you would use --slave-skip-errors...
1684
 
 
1685
 
      To do the comparison we need to know the value of "affected" which the
1686
 
      above mysql_parse() computed. And we need to know the value of
1687
 
      "affected" in the master's binlog. Both will be implemented later. The
1688
 
      important thing is that we now have the format ready to log the values
1689
 
      of "affected" in the binlog. So we can release 5.0.0 before effectively
1690
 
      logging "affected" and effectively comparing it.
1691
 
    */
1692
 
  } /* End of if (db_ok(... */
1693
 
 
1694
 
end:
1695
 
  pthread_mutex_lock(&LOCK_thread_count);
1696
 
  /*
1697
 
    Probably we have set session->query, session->db, session->catalog to point to places
1698
 
    in the data_buf of this event. Now the event is going to be deleted
1699
 
    probably, so data_buf will be freed, so the session->... listed above will be
1700
 
    pointers to freed memory.
1701
 
    So we must set them to 0, so that those bad pointers values are not later
1702
 
    used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1703
 
    don't suffer from these assignments to 0 as DROP TEMPORARY
1704
 
    Table uses the db.table syntax.
1705
 
  */
1706
 
  session->catalog= 0;
1707
 
  session->set_db(NULL, 0);                 /* will free the current database */
1708
 
  session->query= 0;                    // just to be sure
1709
 
  session->query_length= 0;
1710
 
  pthread_mutex_unlock(&LOCK_thread_count);
1711
 
  close_thread_tables(session);
1712
 
  session->first_successful_insert_id_in_prev_stmt= 0;
1713
 
  free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
1714
 
  return session->is_slave_error;
1715
 
}
1716
 
 
1717
 
int Query_log_event::do_update_pos(Relay_log_info *rli)
1718
 
{
1719
 
  return Log_event::do_update_pos(rli);
1720
 
}
1721
 
 
1722
 
 
1723
 
Log_event::enum_skip_reason
1724
 
Query_log_event::do_shall_skip(Relay_log_info *rli)
1725
 
{
1726
 
  assert(query && q_len > 0);
1727
 
 
1728
 
  if (rli->slave_skip_counter > 0)
1729
 
  {
1730
 
    if (strcmp("BEGIN", query) == 0)
1731
 
    {
1732
 
      session->options|= OPTION_BEGIN;
1733
 
      return(Log_event::continue_group(rli));
1734
 
    }
1735
 
 
1736
 
    if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
1737
 
    {
1738
 
      session->options&= ~OPTION_BEGIN;
1739
 
      return(Log_event::EVENT_SKIP_COUNT);
1740
 
    }
1741
 
  }
1742
 
  return(Log_event::do_shall_skip(rli));
1743
 
}
1744
 
 
1745
 
 
1746
 
/**************************************************************************
1747
 
        Start_log_event_v3 methods
1748
 
**************************************************************************/
1749
 
 
1750
 
Start_log_event_v3::Start_log_event_v3()
1751
 
  :Log_event(), created(0), binlog_version(BINLOG_VERSION),
1752
 
   artificial_event(0), dont_set_created(0)
1753
 
{
1754
 
  memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1755
 
}
1756
 
 
1757
 
/*
1758
 
  Start_log_event_v3::pack_info()
1759
 
*/
1760
 
 
1761
 
void Start_log_event_v3::pack_info(Protocol *protocol)
1762
 
{
1763
 
  char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
1764
 
  pos= strcpy(buf, "Server ver: ")+12;
1765
 
  pos= strcpy(pos, server_version)+strlen(server_version);
1766
 
  pos= strcpy(pos, ", Binlog ver: ")+14;
1767
 
  pos= int10_to_str(binlog_version, pos, 10);
1768
 
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
1769
 
}
1770
 
 
1771
 
 
1772
 
/*
1773
 
  Start_log_event_v3::Start_log_event_v3()
1774
 
*/
1775
 
 
1776
 
Start_log_event_v3::Start_log_event_v3(const char* buf,
1777
 
                                       const Format_description_log_event
1778
 
                                       *description_event)
1779
 
  :Log_event(buf, description_event)
1780
 
{
1781
 
  buf+= description_event->common_header_len;
1782
 
  binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
1783
 
  memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
1784
 
         ST_SERVER_VER_LEN);
1785
 
  // prevent overrun if log is corrupted on disk
1786
 
  server_version[ST_SERVER_VER_LEN-1]= 0;
1787
 
  created= uint4korr(buf+ST_CREATED_OFFSET);
1788
 
  /* We use log_pos to mark if this was an artificial event or not */
1789
 
  artificial_event= (log_pos == 0);
1790
 
  dont_set_created= 1;
1791
 
}
1792
 
 
1793
 
 
1794
 
/*
1795
 
  Start_log_event_v3::write()
1796
 
*/
1797
 
 
1798
 
bool Start_log_event_v3::write(IO_CACHE* file)
1799
 
{
1800
 
  char buff[START_V3_HEADER_LEN];
1801
 
  int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
1802
 
  memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
1803
 
  if (!dont_set_created)
1804
 
    created= when= get_time();
1805
 
  int4store(buff + ST_CREATED_OFFSET,created);
1806
 
  return (write_header(file, sizeof(buff)) ||
1807
 
          my_b_safe_write(file, (unsigned char*) buff, sizeof(buff)));
1808
 
}
1809
 
 
1810
 
 
1811
 
/**
1812
 
  Start_log_event_v3::do_apply_event() .
1813
 
  The master started
1814
 
 
1815
 
    IMPLEMENTATION
1816
 
    - To handle the case where the master died without having time to write
1817
 
    DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
1818
 
    TODO), we clean up all temporary tables that we got, if we are sure we
1819
 
    can (see below).
1820
 
 
1821
 
  @todo
1822
 
    - Remove all active user locks.
1823
 
    Guilhem 2003-06: this is true but not urgent: the worst it can cause is
1824
 
    the use of a bit of memory for a user lock which will not be used
1825
 
    anymore. If the user lock is later used, the old one will be released. In
1826
 
    other words, no deadlock problem.
1827
 
*/
1828
 
 
1829
 
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
1830
 
{
1831
 
  switch (binlog_version)
1832
 
  {
1833
 
  case 3:
1834
 
  case 4:
1835
 
    /*
1836
 
      This can either be 4.x (then a Start_log_event_v3 is only at master
1837
 
      startup so we are sure the master has restarted and cleared his temp
1838
 
      tables; the event always has 'created'>0) or 5.0 (then we have to test
1839
 
      'created').
1840
 
    */
1841
 
    if (created)
1842
 
    {
1843
 
      session->close_temporary_tables();
1844
 
      cleanup_load_tmpdir();
1845
 
    }
1846
 
    break;
1847
 
 
1848
 
    /*
1849
 
       Now the older formats; in that case load_tmpdir is cleaned up by the I/O
1850
 
       thread.
1851
 
    */
1852
 
  case 1:
1853
 
    if (strncmp(rli->relay_log.description_event_for_exec->server_version,
1854
 
                "3.23.57",7) >= 0 && created)
1855
 
    {
1856
 
      /*
1857
 
        Can distinguish, based on the value of 'created': this event was
1858
 
        generated at master startup.
1859
 
      */
1860
 
      session->close_temporary_tables();
1861
 
    }
1862
 
    /*
1863
 
      Otherwise, can't distinguish a Start_log_event generated at
1864
 
      master startup and one generated by master FLUSH LOGS, so cannot
1865
 
      be sure temp tables have to be dropped. So do nothing.
1866
 
    */
1867
 
    break;
1868
 
  default:
1869
 
    /* this case is impossible */
1870
 
    return(1);
1871
 
  }
1872
 
  return(0);
1873
 
}
1874
 
 
1875
 
/***************************************************************************
1876
 
       Format_description_log_event methods
1877
 
****************************************************************************/
1878
 
 
1879
 
/**
1880
 
  Format_description_log_event 1st ctor.
1881
 
 
1882
 
    Ctor. Can be used to create the event to write to the binary log (when the
1883
 
    server starts or when FLUSH LOGS), or to create artificial events to parse
1884
 
    binlogs from MySQL 3.23 or 4.x.
1885
 
    When in a client, only the 2nd use is possible.
1886
 
 
1887
 
  @param binlog_version         the binlog version for which we want to build
1888
 
                                an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
1889
 
                                x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
1890
 
                                old 4.0 (binlog version 2) is not supported;
1891
 
                                it should not be used for replication with
1892
 
                                5.0.
1893
 
*/
1894
 
 
1895
 
Format_description_log_event::
1896
 
Format_description_log_event(uint8_t binlog_ver, const char*)
1897
 
  :Start_log_event_v3(), event_type_permutation(0)
1898
 
{
1899
 
  binlog_version= binlog_ver;
1900
 
  switch (binlog_ver) {
1901
 
  case 4: /* MySQL 5.0 */
1902
 
    memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1903
 
    common_header_len= LOG_EVENT_HEADER_LEN;
1904
 
    number_of_event_types= LOG_EVENT_TYPES;
1905
 
    /* we'll catch malloc() error in is_valid() */
1906
 
    post_header_len=(uint8_t*) malloc(number_of_event_types*sizeof(uint8_t));
1907
 
    memset(post_header_len, 0, number_of_event_types*sizeof(uint8_t));
1908
 
    /*
1909
 
      This long list of assignments is not beautiful, but I see no way to
1910
 
      make it nicer, as the right members are #defines, not array members, so
1911
 
      it's impossible to write a loop.
1912
 
    */
1913
 
    if (post_header_len)
1914
 
    {
1915
 
      post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
1916
 
      post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
1917
 
      post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
1918
 
      post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
1919
 
      post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
1920
 
      post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
1921
 
      post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
1922
 
      post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
1923
 
      post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
1924
 
      post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
1925
 
      post_header_len[TABLE_MAP_EVENT-1]=    TABLE_MAP_HEADER_LEN;
1926
 
      post_header_len[WRITE_ROWS_EVENT-1]=   ROWS_HEADER_LEN;
1927
 
      post_header_len[UPDATE_ROWS_EVENT-1]=  ROWS_HEADER_LEN;
1928
 
      post_header_len[DELETE_ROWS_EVENT-1]=  ROWS_HEADER_LEN;
1929
 
      post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
1930
 
      post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
1931
 
      post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
1932
 
      post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
1933
 
    }
1934
 
    break;
1935
 
 
1936
 
  default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
1937
 
    assert(0);
1938
 
  }
1939
 
  calc_server_version_split();
1940
 
}
1941
 
 
1942
 
 
1943
 
/**
1944
 
  The problem with this constructor is that the fixed header may have a
1945
 
  length different from this version, but we don't know this length as we
1946
 
  have not read the Format_description_log_event which says it, yet. This
1947
 
  length is in the post-header of the event, but we don't know where the
1948
 
  post-header starts.
1949
 
 
1950
 
  So this type of event HAS to:
1951
 
  - either have the header's length at the beginning (in the header, at a
1952
 
  fixed position which will never be changed), not in the post-header. That
1953
 
  would make the header be "shifted" compared to other events.
1954
 
  - or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
1955
 
  versions, so that we know for sure.
1956
 
 
1957
 
  I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
1958
 
  it is sent before Format_description_log_event).
1959
 
*/
1960
 
 
1961
 
Format_description_log_event::
1962
 
Format_description_log_event(const char* buf,
1963
 
                             uint32_t event_len,
1964
 
                             const
1965
 
                             Format_description_log_event*
1966
 
                             description_event)
1967
 
  :Start_log_event_v3(buf, description_event), event_type_permutation(0)
1968
 
{
1969
 
  buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
1970
 
  if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
1971
 
    return; /* sanity check */
1972
 
  number_of_event_types=
1973
 
    event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
1974
 
  post_header_len= (uint8_t*) malloc(number_of_event_types*
1975
 
                                     sizeof(*post_header_len));
1976
 
  /* If alloc fails, we'll detect it in is_valid() */
1977
 
  if (post_header_len != NULL)
1978
 
    memcpy(post_header_len, buf+ST_COMMON_HEADER_LEN_OFFSET+1,
1979
 
           number_of_event_types* sizeof(*post_header_len));
1980
 
  calc_server_version_split();
1981
 
 
1982
 
  /*
1983
 
    In some previous versions, the events were given other event type
1984
 
    id numbers than in the present version. When replicating from such
1985
 
    a version, we therefore set up an array that maps those id numbers
1986
 
    to the id numbers of the present server.
1987
 
 
1988
 
    If post_header_len is null, it means malloc failed, and is_valid
1989
 
    will fail, so there is no need to do anything.
1990
 
 
1991
 
    The trees in which events have wrong id's are:
1992
 
 
1993
 
    mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
1994
 
    mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
1995
 
    mysql-5.1-wl2325-no-dd
1996
 
 
1997
 
    (this was found by grepping for two lines in sequence where the
1998
 
    first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
1999
 
    "TABLE_MAP_EVENT," in log_event.h in all trees)
2000
 
 
2001
 
    In these trees, the following server_versions existed since
2002
 
    TABLE_MAP_EVENT was introduced:
2003
 
 
2004
 
    5.1.1-a_drop5p3   5.1.1-a_drop5p4        5.1.1-alpha
2005
 
    5.1.2-a_drop5p10  5.1.2-a_drop5p11       5.1.2-a_drop5p12
2006
 
    5.1.2-a_drop5p13  5.1.2-a_drop5p14       5.1.2-a_drop5p15
2007
 
    5.1.2-a_drop5p16  5.1.2-a_drop5p16b      5.1.2-a_drop5p16c
2008
 
    5.1.2-a_drop5p17  5.1.2-a_drop5p4        5.1.2-a_drop5p5
2009
 
    5.1.2-a_drop5p6   5.1.2-a_drop5p7        5.1.2-a_drop5p8
2010
 
    5.1.2-a_drop5p9   5.1.3-a_drop5p17       5.1.3-a_drop5p17b
2011
 
    5.1.3-a_drop5p17c 5.1.4-a_drop5p18       5.1.4-a_drop5p19
2012
 
    5.1.4-a_drop5p20  5.1.4-a_drop6p0        5.1.4-a_drop6p1
2013
 
    5.1.4-a_drop6p2   5.1.5-a_drop5p20       5.2.0-a_drop6p3
2014
 
    5.2.0-a_drop6p4   5.2.0-a_drop6p5        5.2.0-a_drop6p6
2015
 
    5.2.1-a_drop6p10  5.2.1-a_drop6p11       5.2.1-a_drop6p12
2016
 
    5.2.1-a_drop6p6   5.2.1-a_drop6p7        5.2.1-a_drop6p8
2017
 
    5.2.2-a_drop6p13  5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2018
 
    5.2.2-a_drop6p13c
2019
 
 
2020
 
    (this was found by grepping for "mysql," in all historical
2021
 
    versions of configure.in in the trees listed above).
2022
 
 
2023
 
    There are 5.1.1-alpha versions that use the new event id's, so we
2024
 
    do not test that version string.  So replication from 5.1.1-alpha
2025
 
    with the other event id's to a new version does not work.
2026
 
    Moreover, we can safely ignore the part after drop[56].  This
2027
 
    allows us to simplify the big list above to the following regexes:
2028
 
 
2029
 
    5\.1\.[1-5]-a_drop5.*
2030
 
    5\.1\.4-a_drop6.*
2031
 
    5\.2\.[0-2]-a_drop6.*
2032
 
 
2033
 
    This is what we test for in the 'if' below.
2034
 
  */
2035
 
  if (post_header_len &&
2036
 
      server_version[0] == '5' && server_version[1] == '.' &&
2037
 
      server_version[3] == '.' &&
2038
 
      strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2039
 
      ((server_version[2] == '1' &&
2040
 
        server_version[4] >= '1' && server_version[4] <= '5' &&
2041
 
        server_version[12] == '5') ||
2042
 
       (server_version[2] == '1' &&
2043
 
        server_version[4] == '4' &&
2044
 
        server_version[12] == '6') ||
2045
 
       (server_version[2] == '2' &&
2046
 
        server_version[4] >= '0' && server_version[4] <= '2' &&
2047
 
        server_version[12] == '6')))
2048
 
  {
2049
 
    if (number_of_event_types != 22)
2050
 
    {
2051
 
      /* this makes is_valid() return false. */
2052
 
      free(post_header_len);
2053
 
      post_header_len= NULL;
2054
 
      return;
2055
 
    }
2056
 
    static const uint8_t perm[23]=
2057
 
      {
2058
 
        UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2059
 
        LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2060
 
        APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2061
 
        NEW_LOAD_EVENT,
2062
 
        FORMAT_DESCRIPTION_EVENT,
2063
 
        TABLE_MAP_EVENT,
2064
 
        XID_EVENT,
2065
 
        BEGIN_LOAD_QUERY_EVENT,
2066
 
        EXECUTE_LOAD_QUERY_EVENT,
2067
 
      };
2068
 
    event_type_permutation= perm;
2069
 
    /*
2070
 
      Since we use (permuted) event id's to index the post_header_len
2071
 
      array, we need to permute the post_header_len array too.
2072
 
    */
2073
 
    uint8_t post_header_len_temp[23];
2074
 
    for (int i= 1; i < 23; i++)
2075
 
      post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2076
 
    for (int i= 0; i < 22; i++)
2077
 
      post_header_len[i] = post_header_len_temp[i];
2078
 
  }
2079
 
  return;
2080
 
}
2081
 
 
2082
 
bool Format_description_log_event::write(IO_CACHE* file)
2083
 
{
2084
 
  /*
2085
 
    We don't call Start_log_event_v3::write() because this would make 2
2086
 
    my_b_safe_write().
2087
 
  */
2088
 
  unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2089
 
  int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2090
 
  memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2091
 
  if (!dont_set_created)
2092
 
    created= when= get_time();
2093
 
  int4store(buff + ST_CREATED_OFFSET,created);
2094
 
  buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2095
 
  memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2096
 
         LOG_EVENT_TYPES);
2097
 
  return (write_header(file, sizeof(buff)) ||
2098
 
          my_b_safe_write(file, buff, sizeof(buff)));
2099
 
}
2100
 
 
2101
 
 
2102
 
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2103
 
{
2104
 
  /*
2105
 
    As a transaction NEVER spans on 2 or more binlogs:
2106
 
    if we have an active transaction at this point, the master died
2107
 
    while writing the transaction to the binary log, i.e. while
2108
 
    flushing the binlog cache to the binlog. XA guarantees that master has
2109
 
    rolled back. So we roll back.
2110
 
    Note: this event could be sent by the master to inform us of the
2111
 
    format of its binlog; in other words maybe it is not at its
2112
 
    original place when it comes to us; we'll know this by checking
2113
 
    log_pos ("artificial" events have log_pos == 0).
2114
 
  */
2115
 
  if (!artificial_event && created && session->transaction.all.ha_list)
2116
 
  {
2117
 
    /* This is not an error (XA is safe), just an information */
2118
 
    rli->report(INFORMATION_LEVEL, 0,
2119
 
                _("Rolling back unfinished transaction (no COMMIT "
2120
 
                  "or ROLLBACK in relay log). A probable cause is that "
2121
 
                  "the master died while writing the transaction to "
2122
 
                  "its binary log, thus rolled back too."));
2123
 
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
2124
 
  }
2125
 
  /*
2126
 
    If this event comes from ourselves, there is no cleaning task to
2127
 
    perform, we don't call Start_log_event_v3::do_apply_event()
2128
 
    (this was just to update the log's description event).
2129
 
  */
2130
 
  if (server_id != ::server_id)
2131
 
  {
2132
 
    /*
2133
 
      If the event was not requested by the slave i.e. the master sent
2134
 
      it while the slave asked for a position >4, the event will make
2135
 
      rli->group_master_log_pos advance. Say that the slave asked for
2136
 
      position 1000, and the Format_desc event's end is 96. Then in
2137
 
      the beginning of replication rli->group_master_log_pos will be
2138
 
      0, then 96, then jump to first really asked event (which is
2139
 
      >96). So this is ok.
2140
 
    */
2141
 
    return(Start_log_event_v3::do_apply_event(rli));
2142
 
  }
2143
 
  return(0);
2144
 
}
2145
 
 
2146
 
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2147
 
{
2148
 
  /* save the information describing this binlog */
2149
 
  delete rli->relay_log.description_event_for_exec;
2150
 
  rli->relay_log.description_event_for_exec= this;
2151
 
 
2152
 
  if (server_id == ::server_id)
2153
 
  {
2154
 
    /*
2155
 
      We only increase the relay log position if we are skipping
2156
 
      events and do not touch any group_* variables, nor flush the
2157
 
      relay log info.  If there is a crash, we will have to re-skip
2158
 
      the events again, but that is a minor issue.
2159
 
 
2160
 
      If we do not skip stepping the group log position (and the
2161
 
      server id was changed when restarting the server), it might well
2162
 
      be that we start executing at a position that is invalid, e.g.,
2163
 
      at a Rows_log_event or a Query_log_event preceeded by a
2164
 
      Intvar_log_event instead of starting at a Table_map_log_event or
2165
 
      the Intvar_log_event respectively.
2166
 
     */
2167
 
    rli->inc_event_relay_log_pos();
2168
 
    return 0;
2169
 
  }
2170
 
  else
2171
 
  {
2172
 
    return Log_event::do_update_pos(rli);
2173
 
  }
2174
 
}
2175
 
 
2176
 
Log_event::enum_skip_reason
2177
 
Format_description_log_event::do_shall_skip(Relay_log_info *)
2178
 
{
2179
 
  return Log_event::EVENT_SKIP_NOT;
2180
 
}
2181
 
 
2182
 
 
2183
 
/**
2184
 
   Splits the event's 'server_version' string into three numeric pieces stored
2185
 
   into 'server_version_split':
2186
 
   X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2187
 
   X.Yabc -> {X,Y,0}
2188
 
   Xabc -> {X,0,0}
2189
 
   'server_version_split' is then used for lookups to find if the server which
2190
 
   created this event has some known bug.
2191
 
*/
2192
 
void Format_description_log_event::calc_server_version_split()
2193
 
{
2194
 
  char *p= server_version, *r;
2195
 
  ulong number;
2196
 
  for (uint32_t i= 0; i<=2; i++)
2197
 
  {
2198
 
    number= strtoul(p, &r, 10);
2199
 
    server_version_split[i]= (unsigned char)number;
2200
 
    assert(number < 256); // fit in unsigned char
2201
 
    p= r;
2202
 
    assert(!((i == 0) && (*r != '.'))); // should be true in practice
2203
 
    if (*r == '.')
2204
 
      p++; // skip the dot
2205
 
  }
2206
 
}
2207
 
 
2208
 
 
2209
 
  /**************************************************************************
2210
 
        Load_log_event methods
2211
 
   General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2212
 
   going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2213
 
   However, the 5.0 slave could still have to read such events (from a 4.x
2214
 
   master), convert them (which just means maybe expand the header, when 5.0
2215
 
   servers have a UID in events) (remember that whatever is after the header
2216
 
   will be like in 4.x, as this event's format is not modified in 5.0 as we
2217
 
   will use new types of events to log the new LOAD DATA INFILE features).
2218
 
   To be able to read/convert, we just need to not assume that the common
2219
 
   header is of length LOG_EVENT_HEADER_LEN (we must use the description
2220
 
   event).
2221
 
   Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2222
 
   between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2223
 
   positions displayed in SHOW SLAVE STATUS then are fine too).
2224
 
  **************************************************************************/
2225
 
 
2226
 
/*
2227
 
  Load_log_event::pack_info()
2228
 
*/
2229
 
 
2230
 
uint32_t Load_log_event::get_query_buffer_length()
2231
 
{
2232
 
  return
2233
 
    5 + db_len + 3 +                        // "use DB; "
2234
 
    18 + fname_len + 2 +                    // "LOAD DATA INFILE 'file''"
2235
 
    7 +                                     // LOCAL
2236
 
    9 +                                     // " REPLACE or IGNORE "
2237
 
    13 + table_name_len*2 +                 // "INTO Table `table`"
2238
 
    21 + sql_ex.field_term_len*4 + 2 +      // " FIELDS TERMINATED BY 'str'"
2239
 
    23 + sql_ex.enclosed_len*4 + 2 +        // " OPTIONALLY ENCLOSED BY 'str'"
2240
 
    12 + sql_ex.escaped_len*4 + 2 +         // " ESCAPED BY 'str'"
2241
 
    21 + sql_ex.line_term_len*4 + 2 +       // " LINES TERMINATED BY 'str'"
2242
 
    19 + sql_ex.line_start_len*4 + 2 +      // " LINES STARTING BY 'str'"
2243
 
    15 + 22 +                               // " IGNORE xxx  LINES"
2244
 
    3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2245
 
}
2246
 
 
2247
 
 
2248
 
void Load_log_event::print_query(bool need_db, char *buf,
2249
 
                                 char **end, char **fn_start, char **fn_end)
2250
 
{
2251
 
  char *pos= buf;
2252
 
 
2253
 
  if (need_db && db && db_len)
2254
 
  {
2255
 
    pos= strcpy(pos, "use `")+5;
2256
 
    memcpy(pos, db, db_len);
2257
 
    pos= strcpy(pos+db_len, "`; ")+3;
2258
 
  }
2259
 
 
2260
 
  pos= strcpy(pos, "LOAD DATA ")+10;
2261
 
 
2262
 
  if (fn_start)
2263
 
    *fn_start= pos;
2264
 
 
2265
 
  if (check_fname_outside_temp_buf())
2266
 
    pos= strcpy(pos, "LOCAL ")+6;
2267
 
  pos= strcpy(pos, "INFILE '")+8;
2268
 
  memcpy(pos, fname, fname_len);
2269
 
  pos= strcpy(pos+fname_len, "' ")+2;
2270
 
 
2271
 
  if (sql_ex.opt_flags & REPLACE_FLAG)
2272
 
    pos= strcpy(pos, " REPLACE ")+9;
2273
 
  else if (sql_ex.opt_flags & IGNORE_FLAG)
2274
 
    pos= strcpy(pos, " IGNORE ")+8;
2275
 
 
2276
 
  pos= strcpy(pos ,"INTO")+4;
2277
 
 
2278
 
  if (fn_end)
2279
 
    *fn_end= pos;
2280
 
 
2281
 
  pos= strcpy(pos ," Table `")+8;
2282
 
  memcpy(pos, table_name, table_name_len);
2283
 
  pos+= table_name_len;
2284
 
 
2285
 
  /* We have to create all optinal fields as the default is not empty */
2286
 
  pos= strcpy(pos, "` FIELDS TERMINATED BY ")+23;
2287
 
  pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2288
 
  if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2289
 
    pos= strcpy(pos, " OPTIONALLY ")+12;
2290
 
  pos= strcpy(pos, " ENCLOSED BY ")+13;
2291
 
  pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2292
 
 
2293
 
  pos= strcpy(pos, " ESCAPED BY ")+12;
2294
 
  pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2295
 
 
2296
 
  pos= strcpy(pos, " LINES TERMINATED BY ")+21;
2297
 
  pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2298
 
  if (sql_ex.line_start_len)
2299
 
  {
2300
 
    pos= strcpy(pos, " STARTING BY ")+13;
2301
 
    pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2302
 
  }
2303
 
 
2304
 
  if ((long) skip_lines > 0)
2305
 
  {
2306
 
    pos= strcpy(pos, " IGNORE ")+8;
2307
 
    pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2308
 
    pos= strcpy(pos," LINES ")+7;
2309
 
  }
2310
 
 
2311
 
  if (num_fields)
2312
 
  {
2313
 
    uint32_t i;
2314
 
    const char *field= fields;
2315
 
    pos= strcpy(pos, " (")+2;
2316
 
    for (i = 0; i < num_fields; i++)
2317
 
    {
2318
 
      if (i)
2319
 
      {
2320
 
        *pos++= ' ';
2321
 
        *pos++= ',';
2322
 
      }
2323
 
      memcpy(pos, field, field_lens[i]);
2324
 
      pos+=   field_lens[i];
2325
 
      field+= field_lens[i]  + 1;
2326
 
    }
2327
 
    *pos++= ')';
2328
 
  }
2329
 
 
2330
 
  *end= pos;
2331
 
}
2332
 
 
2333
 
 
2334
 
void Load_log_event::pack_info(Protocol *protocol)
2335
 
{
2336
 
  char *buf, *end;
2337
 
 
2338
 
  if (!(buf= (char*) malloc(get_query_buffer_length())))
2339
 
    return;
2340
 
  print_query(true, buf, &end, 0, 0);
2341
 
  protocol->store(buf, end-buf, &my_charset_bin);
2342
 
  free(buf);
2343
 
}
2344
 
 
2345
 
 
2346
 
/*
2347
 
  Load_log_event::write_data_header()
2348
 
*/
2349
 
 
2350
 
bool Load_log_event::write_data_header(IO_CACHE* file)
2351
 
{
2352
 
  char buf[LOAD_HEADER_LEN];
2353
 
  int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
2354
 
  int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
2355
 
  int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
2356
 
  buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
2357
 
  buf[L_DB_LEN_OFFSET] = (char)db_len;
2358
 
  int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
2359
 
  return my_b_safe_write(file, (unsigned char*)buf, LOAD_HEADER_LEN) != 0;
2360
 
}
2361
 
 
2362
 
 
2363
 
/*
2364
 
  Load_log_event::write_data_body()
2365
 
*/
2366
 
 
2367
 
bool Load_log_event::write_data_body(IO_CACHE* file)
2368
 
{
2369
 
  if (sql_ex.write_data(file))
2370
 
    return 1;
2371
 
  if (num_fields && fields && field_lens)
2372
 
  {
2373
 
    if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
2374
 
        my_b_safe_write(file, (unsigned char*)fields, field_block_len))
2375
 
      return 1;
2376
 
  }
2377
 
  return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
2378
 
          my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
2379
 
          my_b_safe_write(file, (unsigned char*)fname, fname_len));
2380
 
}
2381
 
 
2382
 
 
2383
 
/*
2384
 
  Load_log_event::Load_log_event()
2385
 
*/
2386
 
 
2387
 
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2388
 
                               const char *db_arg, const char *table_name_arg,
2389
 
                               List<Item> &fields_arg,
2390
 
                               enum enum_duplicates handle_dup,
2391
 
                               bool ignore, bool using_trans)
2392
 
  :Log_event(session_arg,
2393
 
             session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2394
 
             using_trans),
2395
 
   thread_id(session_arg->thread_id),
2396
 
   slave_proxy_id(session_arg->variables.pseudo_thread_id),
2397
 
   num_fields(0),fields(0),
2398
 
   field_lens(0),field_block_len(0),
2399
 
   table_name(table_name_arg ? table_name_arg : ""),
2400
 
   db(db_arg), fname(ex->file_name), local_fname(false)
2401
 
{
2402
 
  time_t end_time;
2403
 
  time(&end_time);
2404
 
  exec_time = (ulong) (end_time  - session_arg->start_time);
2405
 
  /* db can never be a zero pointer in 4.0 */
2406
 
  db_len = (uint32_t) strlen(db);
2407
 
  table_name_len = (uint32_t) strlen(table_name);
2408
 
  fname_len = (fname) ? (uint) strlen(fname) : 0;
2409
 
  sql_ex.field_term = (char*) ex->field_term->ptr();
2410
 
  sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2411
 
  sql_ex.enclosed = (char*) ex->enclosed->ptr();
2412
 
  sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2413
 
  sql_ex.line_term = (char*) ex->line_term->ptr();
2414
 
  sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2415
 
  sql_ex.line_start = (char*) ex->line_start->ptr();
2416
 
  sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2417
 
  sql_ex.escaped = (char*) ex->escaped->ptr();
2418
 
  sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2419
 
  sql_ex.opt_flags = 0;
2420
 
  sql_ex.cached_new_format = -1;
2421
 
 
2422
 
  if (ex->dumpfile)
2423
 
    sql_ex.opt_flags|= DUMPFILE_FLAG;
2424
 
  if (ex->opt_enclosed)
2425
 
    sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2426
 
 
2427
 
  sql_ex.empty_flags= 0;
2428
 
 
2429
 
  switch (handle_dup) {
2430
 
  case DUP_REPLACE:
2431
 
    sql_ex.opt_flags|= REPLACE_FLAG;
2432
 
    break;
2433
 
  case DUP_UPDATE:                              // Impossible here
2434
 
  case DUP_ERROR:
2435
 
    break;
2436
 
  }
2437
 
  if (ignore)
2438
 
    sql_ex.opt_flags|= IGNORE_FLAG;
2439
 
 
2440
 
  if (!ex->field_term->length())
2441
 
    sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2442
 
  if (!ex->enclosed->length())
2443
 
    sql_ex.empty_flags |= ENCLOSED_EMPTY;
2444
 
  if (!ex->line_term->length())
2445
 
    sql_ex.empty_flags |= LINE_TERM_EMPTY;
2446
 
  if (!ex->line_start->length())
2447
 
    sql_ex.empty_flags |= LINE_START_EMPTY;
2448
 
  if (!ex->escaped->length())
2449
 
    sql_ex.empty_flags |= ESCAPED_EMPTY;
2450
 
 
2451
 
  skip_lines = ex->skip_lines;
2452
 
 
2453
 
  List_iterator<Item> li(fields_arg);
2454
 
  field_lens_buf.length(0);
2455
 
  fields_buf.length(0);
2456
 
  Item* item;
2457
 
  while ((item = li++))
2458
 
  {
2459
 
    num_fields++;
2460
 
    unsigned char len = (unsigned char) strlen(item->name);
2461
 
    field_block_len += len + 1;
2462
 
    fields_buf.append(item->name, len + 1);
2463
 
    field_lens_buf.append((char*)&len, 1);
2464
 
  }
2465
 
 
2466
 
  field_lens = (const unsigned char*)field_lens_buf.ptr();
2467
 
  fields = fields_buf.ptr();
2468
 
}
2469
 
 
2470
 
 
2471
 
/**
2472
 
  @note
2473
 
    The caller must do buf[event_len] = 0 before he starts using the
2474
 
    constructed event.
2475
 
*/
2476
 
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2477
 
                               const Format_description_log_event *description_event)
2478
 
  :Log_event(buf, description_event), num_fields(0), fields(0),
2479
 
   field_lens(0),field_block_len(0),
2480
 
   table_name(0), db(0), fname(0), local_fname(false)
2481
 
{
2482
 
  /*
2483
 
    I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2484
 
    4.0->5.0 and 5.0->5.0 and it works.
2485
 
  */
2486
 
  if (event_len)
2487
 
    copy_log_event(buf, event_len,
2488
 
                   ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2489
 
                    LOAD_HEADER_LEN +
2490
 
                    description_event->common_header_len :
2491
 
                    LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2492
 
                   description_event);
2493
 
  /* otherwise it's a derived class, will call copy_log_event() itself */
2494
 
  return;
2495
 
}
2496
 
 
2497
 
 
2498
 
/*
2499
 
  Load_log_event::copy_log_event()
2500
 
*/
2501
 
 
2502
 
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2503
 
                                   int body_offset,
2504
 
                                   const Format_description_log_event *description_event)
2505
 
{
2506
 
  uint32_t data_len;
2507
 
  char* buf_end = (char*)buf + event_len;
2508
 
  /* this is the beginning of the post-header */
2509
 
  const char* data_head = buf + description_event->common_header_len;
2510
 
  slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2511
 
  exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2512
 
  skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2513
 
  table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2514
 
  db_len = (uint)data_head[L_DB_LEN_OFFSET];
2515
 
  num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2516
 
 
2517
 
  if ((int) event_len < body_offset)
2518
 
    return(1);
2519
 
  /*
2520
 
    Sql_ex.init() on success returns the pointer to the first byte after
2521
 
    the sql_ex structure, which is the start of field lengths array.
2522
 
  */
2523
 
  if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2524
 
                                        buf_end,
2525
 
                                        buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2526
 
    return(1);
2527
 
 
2528
 
  data_len = event_len - body_offset;
2529
 
  if (num_fields > data_len) // simple sanity check against corruption
2530
 
    return(1);
2531
 
  for (uint32_t i = 0; i < num_fields; i++)
2532
 
    field_block_len += (uint)field_lens[i] + 1;
2533
 
 
2534
 
  fields = (char*)field_lens + num_fields;
2535
 
  table_name  = fields + field_block_len;
2536
 
  db = table_name + table_name_len + 1;
2537
 
  fname = db + db_len + 1;
2538
 
  fname_len = strlen(fname);
2539
 
  // null termination is accomplished by the caller doing buf[event_len]=0
2540
 
 
2541
 
  return(0);
2542
 
}
2543
 
 
2544
 
 
2545
 
/**
2546
 
  Load_log_event::set_fields()
2547
 
 
2548
 
  @note
2549
 
    This function can not use the member variable
2550
 
    for the database, since LOAD DATA INFILE on the slave
2551
 
    can be for a different database than the current one.
2552
 
    This is the reason for the affected_db argument to this method.
2553
 
*/
2554
 
 
2555
 
void Load_log_event::set_fields(const char* affected_db,
2556
 
                                List<Item> &field_list,
2557
 
                                Name_resolution_context *context)
2558
 
{
2559
 
  uint32_t i;
2560
 
  const char* field = fields;
2561
 
  for (i= 0; i < num_fields; i++)
2562
 
  {
2563
 
    field_list.push_back(new Item_field(context,
2564
 
                                        affected_db, table_name, field));
2565
 
    field+= field_lens[i]  + 1;
2566
 
  }
2567
 
}
2568
 
 
2569
 
 
2570
 
/**
2571
 
  Does the data loading job when executing a LOAD DATA on the slave.
2572
 
 
2573
 
  @param net
2574
 
  @param rli
2575
 
  @param use_rli_only_for_errors     If set to 1, rli is provided to
2576
 
                                     Load_log_event::exec_event only for this
2577
 
                                     function to have RPL_LOG_NAME and
2578
 
                                     rli->last_slave_error, both being used by
2579
 
                                     error reports. rli's position advancing
2580
 
                                     is skipped (done by the caller which is
2581
 
                                     Execute_load_log_event::exec_event).
2582
 
                                     If set to 0, rli is provided for full use,
2583
 
                                     i.e. for error reports and position
2584
 
                                     advancing.
2585
 
 
2586
 
  @todo
2587
 
    fix this; this can be done by testing rules in
2588
 
    Create_file_log_event::exec_event() and then discarding Append_block and
2589
 
    al.
2590
 
  @todo
2591
 
    this is a bug - this needs to be moved to the I/O thread
2592
 
 
2593
 
  @retval
2594
 
    0           Success
2595
 
  @retval
2596
 
    1           Failure
2597
 
*/
2598
 
 
2599
 
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2600
 
                                   bool use_rli_only_for_errors)
2601
 
{
2602
 
  Query_id &query_id= Query_id::get_query_id();
2603
 
  session->set_db(db, strlen(db));
2604
 
  assert(session->query == 0);
2605
 
  session->query_length= 0;                         // Should not be needed
2606
 
  session->is_slave_error= 0;
2607
 
  clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2608
 
 
2609
 
  /* see Query_log_event::do_apply_event() and BUG#13360 */
2610
 
  assert(!rli->m_table_map.count());
2611
 
  /*
2612
 
    Usually lex_start() is called by mysql_parse(), but we need it here
2613
 
    as the present method does not call mysql_parse().
2614
 
  */
2615
 
  lex_start(session);
2616
 
  session->reset_for_next_command();
2617
 
 
2618
 
  if (!use_rli_only_for_errors)
2619
 
  {
2620
 
    /*
2621
 
      Saved for InnoDB, see comment in
2622
 
      Query_log_event::do_apply_event()
2623
 
    */
2624
 
    const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2625
 
  }
2626
 
 
2627
 
   /*
2628
 
    We test replicate_*_db rules. Note that we have already prepared
2629
 
    the file to load, even if we are going to ignore and delete it
2630
 
    now. So it is possible that we did a lot of disk writes for
2631
 
    nothing. In other words, a big LOAD DATA INFILE on the master will
2632
 
    still consume a lot of space on the slave (space in the relay log
2633
 
    + space of temp files: twice the space of the file to load...)
2634
 
    even if it will finally be ignored.  TODO: fix this; this can be
2635
 
    done by testing rules in Create_file_log_event::do_apply_event()
2636
 
    and then discarding Append_block and al. Another way is do the
2637
 
    filtering in the I/O thread (more efficient: no disk writes at
2638
 
    all).
2639
 
 
2640
 
 
2641
 
    Note:   We do not need to execute reset_one_shot_variables() if this
2642
 
            db_ok() test fails.
2643
 
    Reason: The db stored in binlog events is the same for SET and for
2644
 
            its companion query.  If the SET is ignored because of
2645
 
            db_ok(), the companion query will also be ignored, and if
2646
 
            the companion query is ignored in the db_ok() test of
2647
 
            ::do_apply_event(), then the companion SET also have so
2648
 
            we don't need to reset_one_shot_variables().
2649
 
  */
2650
 
  if (1)
2651
 
  {
2652
 
    session->set_time((time_t)when);
2653
 
    session->query_id = query_id.next();
2654
 
    /*
2655
 
      Initing session->row_count is not necessary in theory as this variable has no
2656
 
      influence in the case of the slave SQL thread (it is used to generate a
2657
 
      "data truncated" warning but which is absorbed and never gets to the
2658
 
      error log); still we init it to avoid a Valgrind message.
2659
 
    */
2660
 
    drizzle_reset_errors(session, 0);
2661
 
 
2662
 
    TableList tables;
2663
 
    memset(&tables, 0, sizeof(tables));
2664
 
    tables.db= session->strmake(session->db, session->db_length);
2665
 
    tables.alias = tables.table_name = (char*) table_name;
2666
 
    tables.lock_type = TL_WRITE;
2667
 
    tables.updating= 1;
2668
 
 
2669
 
    // the table will be opened in mysql_load
2670
 
    {
2671
 
      char llbuff[22];
2672
 
      char *end;
2673
 
      enum enum_duplicates handle_dup;
2674
 
      bool ignore= 0;
2675
 
      char *load_data_query;
2676
 
 
2677
 
      /*
2678
 
        Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2679
 
        and written to slave's binlog if binlogging is on.
2680
 
      */
2681
 
      if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2682
 
      {
2683
 
        /*
2684
 
          This will set session->fatal_error in case of OOM. So we surely will notice
2685
 
          that something is wrong.
2686
 
        */
2687
 
        goto error;
2688
 
      }
2689
 
 
2690
 
      print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
2691
 
                  (char **)&session->lex->fname_end);
2692
 
      *end= 0;
2693
 
      session->query_length= end - load_data_query;
2694
 
      session->query= load_data_query;
2695
 
 
2696
 
      if (sql_ex.opt_flags & REPLACE_FLAG)
2697
 
      {
2698
 
        handle_dup= DUP_REPLACE;
2699
 
      }
2700
 
      else if (sql_ex.opt_flags & IGNORE_FLAG)
2701
 
      {
2702
 
        ignore= 1;
2703
 
        handle_dup= DUP_ERROR;
2704
 
      }
2705
 
      else
2706
 
      {
2707
 
        /*
2708
 
          When replication is running fine, if it was DUP_ERROR on the
2709
 
          master then we could choose IGNORE here, because if DUP_ERROR
2710
 
          suceeded on master, and data is identical on the master and slave,
2711
 
          then there should be no uniqueness errors on slave, so IGNORE is
2712
 
          the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2713
 
          (because the data on the master and slave happen to be different
2714
 
          (user error or bug), we want LOAD DATA to print an error message on
2715
 
          the slave to discover the problem.
2716
 
 
2717
 
          If reading from net (a 3.23 master), mysql_load() will change this
2718
 
          to IGNORE.
2719
 
        */
2720
 
        handle_dup= DUP_ERROR;
2721
 
      }
2722
 
      /*
2723
 
        We need to set session->lex->sql_command and session->lex->duplicates
2724
 
        since InnoDB tests these variables to decide if this is a LOAD
2725
 
        DATA ... REPLACE INTO ... statement even though mysql_parse()
2726
 
        is not called.  This is not needed in 5.0 since there the LOAD
2727
 
        DATA ... statement is replicated using mysql_parse(), which
2728
 
        sets the session->lex fields correctly.
2729
 
      */
2730
 
      session->lex->sql_command= SQLCOM_LOAD;
2731
 
      session->lex->duplicates= handle_dup;
2732
 
 
2733
 
      sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2734
 
      String field_term(sql_ex.field_term,sql_ex.field_term_len,&my_charset_utf8_general_ci);
2735
 
      String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,&my_charset_utf8_general_ci);
2736
 
      String line_term(sql_ex.line_term,sql_ex.line_term_len,&my_charset_utf8_general_ci);
2737
 
      String line_start(sql_ex.line_start,sql_ex.line_start_len,&my_charset_utf8_general_ci);
2738
 
      String escaped(sql_ex.escaped,sql_ex.escaped_len, &my_charset_utf8_general_ci);
2739
 
      ex.field_term= &field_term;
2740
 
      ex.enclosed= &enclosed;
2741
 
      ex.line_term= &line_term;
2742
 
      ex.line_start= &line_start;
2743
 
      ex.escaped= &escaped;
2744
 
 
2745
 
      ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2746
 
      if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2747
 
        ex.field_term->length(0);
2748
 
 
2749
 
      ex.skip_lines = skip_lines;
2750
 
      List<Item> field_list;
2751
 
      session->lex->select_lex.context.resolve_in_table_list_only(&tables);
2752
 
      set_fields(tables.db, field_list, &session->lex->select_lex.context);
2753
 
      session->variables.pseudo_thread_id= thread_id;
2754
 
      if (net)
2755
 
      {
2756
 
        // mysql_load will use session->net to read the file
2757
 
        session->net.vio = net->vio;
2758
 
        /*
2759
 
          Make sure the client does not get confused about the packet sequence
2760
 
        */
2761
 
        session->net.pkt_nr = net->pkt_nr;
2762
 
      }
2763
 
      /*
2764
 
        It is safe to use tmp_list twice because we are not going to
2765
 
        update it inside mysql_load().
2766
 
      */
2767
 
      List<Item> tmp_list;
2768
 
      if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
2769
 
                     handle_dup, ignore, net != 0))
2770
 
        session->is_slave_error= 1;
2771
 
      if (session->cuted_fields)
2772
 
      {
2773
 
        /* log_pos is the position of the LOAD event in the master log */
2774
 
        errmsg_printf(ERRMSG_LVL_WARN, _("Slave: load data infile on table '%s' at "
2775
 
                            "log position %s in log '%s' produced %ld "
2776
 
                            "warning(s). Default database: '%s'"),
2777
 
                          (char*) table_name,
2778
 
                          llstr(log_pos,llbuff), RPL_LOG_NAME,
2779
 
                          (ulong) session->cuted_fields,
2780
 
                          print_slave_db_safe(session->db));
2781
 
      }
2782
 
      if (net)
2783
 
        net->pkt_nr= session->net.pkt_nr;
2784
 
    }
2785
 
  }
2786
 
  else
2787
 
  {
2788
 
    /*
2789
 
      We will just ask the master to send us /dev/null if we do not
2790
 
      want to load the data.
2791
 
      TODO: this a bug - needs to be done in I/O thread
2792
 
    */
2793
 
    if (net)
2794
 
      skip_load_data_infile(net);
2795
 
  }
2796
 
 
2797
 
error:
2798
 
  session->net.vio = 0;
2799
 
  const char *remember_db= session->db;
2800
 
  pthread_mutex_lock(&LOCK_thread_count);
2801
 
  session->catalog= 0;
2802
 
  session->set_db(NULL, 0);                   /* will free the current database */
2803
 
  session->query= 0;
2804
 
  session->query_length= 0;
2805
 
  pthread_mutex_unlock(&LOCK_thread_count);
2806
 
  close_thread_tables(session);
2807
 
 
2808
 
  if (session->is_slave_error)
2809
 
  {
2810
 
    /* this err/sql_errno code is copy-paste from net_send_error() */
2811
 
    const char *err;
2812
 
    int sql_errno;
2813
 
    if (session->is_error())
2814
 
    {
2815
 
      err= session->main_da.message();
2816
 
      sql_errno= session->main_da.sql_errno();
2817
 
    }
2818
 
    else
2819
 
    {
2820
 
      sql_errno=ER_UNKNOWN_ERROR;
2821
 
      err=ER(sql_errno);
2822
 
    }
2823
 
    rli->report(ERROR_LEVEL, sql_errno,
2824
 
                _("Error '%s' running LOAD DATA INFILE on table '%s'. "
2825
 
                  "Default database: '%s'"),
2826
 
                err, (char*)table_name, print_slave_db_safe(remember_db));
2827
 
    free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2828
 
    return 1;
2829
 
  }
2830
 
  free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2831
 
 
2832
 
  if (session->is_fatal_error)
2833
 
  {
2834
 
    char buf[256];
2835
 
    snprintf(buf, sizeof(buf),
2836
 
             _("Running LOAD DATA INFILE on table '%-.64s'."
2837
 
               " Default database: '%-.64s'"),
2838
 
             (char*)table_name,
2839
 
             print_slave_db_safe(remember_db));
2840
 
 
2841
 
    rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2842
 
                ER(ER_SLAVE_FATAL_ERROR), buf);
2843
 
    return 1;
2844
 
  }
2845
 
 
2846
 
  return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
2847
 
}
2848
 
 
2849
 
 
2850
 
/**************************************************************************
2851
 
  Rotate_log_event methods
2852
 
**************************************************************************/
2853
 
 
2854
 
/*
2855
 
  Rotate_log_event::pack_info()
2856
 
*/
2857
 
 
2858
 
void Rotate_log_event::pack_info(Protocol *protocol)
2859
 
{
2860
 
  char buf1[256], buf[22];
2861
 
  String tmp(buf1, sizeof(buf1), &my_charset_utf8_general_ci);
2862
 
  tmp.length(0);
2863
 
  tmp.append(new_log_ident.c_str(), ident_len);
2864
 
  tmp.append(STRING_WITH_LEN(";pos="));
2865
 
  tmp.append(llstr(pos,buf));
2866
 
  protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
2867
 
}
2868
 
 
2869
 
 
2870
 
/*
2871
 
  Rotate_log_event::Rotate_log_event() (2 constructors)
2872
 
*/
2873
 
 
2874
 
 
2875
 
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
2876
 
                                   uint32_t ident_len_arg, uint64_t pos_arg,
2877
 
                                   uint32_t flags_arg)
2878
 
  :Log_event(), pos(pos_arg),
2879
 
   ident_len(ident_len_arg
2880
 
               ? ident_len_arg
2881
 
               : strlen(new_log_ident_arg)),
2882
 
   flags(flags_arg)
2883
 
{
2884
 
  new_log_ident.assign(new_log_ident_arg, ident_len);
2885
 
  return;
2886
 
}
2887
 
 
2888
 
 
2889
 
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
2890
 
                                   const Format_description_log_event* description_event)
2891
 
  :Log_event(buf, description_event), flags(DUP_NAME)
2892
 
{
2893
 
  // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
2894
 
  uint8_t header_size= description_event->common_header_len;
2895
 
  uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
2896
 
  uint32_t ident_offset;
2897
 
  if (event_len < header_size)
2898
 
    return;
2899
 
  buf += header_size;
2900
 
  pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
2901
 
  ident_len = (uint)(event_len -
2902
 
                     (header_size+post_header_len));
2903
 
  ident_offset = post_header_len;
2904
 
  set_if_smaller(ident_len,FN_REFLEN-1);
2905
 
  new_log_ident.assign(buf + ident_offset, ident_len);
2906
 
  return;
2907
 
}
2908
 
 
2909
 
 
2910
 
/*
2911
 
  Rotate_log_event::write()
2912
 
*/
2913
 
 
2914
 
bool Rotate_log_event::write(IO_CACHE* file)
2915
 
{
2916
 
  char buf[ROTATE_HEADER_LEN];
2917
 
  int8store(buf + R_POS_OFFSET, pos);
2918
 
  return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
2919
 
          my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
2920
 
          my_b_safe_write(file, (const unsigned char*)new_log_ident.c_str(),
2921
 
                          (uint) ident_len));
2922
 
}
2923
 
 
2924
 
 
2925
 
/*
2926
 
  Got a rotate log event from the master.
2927
 
 
2928
 
  This is mainly used so that we can later figure out the logname and
2929
 
  position for the master.
2930
 
 
2931
 
  We can't rotate the slave's BINlog as this will cause infinitive rotations
2932
 
  in a A -> B -> A setup.
2933
 
  The NOTES below is a wrong comment which will disappear when 4.1 is merged.
2934
 
 
2935
 
  @retval
2936
 
    0   ok
2937
 
*/
2938
 
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
2939
 
{
2940
 
  pthread_mutex_lock(&rli->data_lock);
2941
 
  rli->event_relay_log_pos= my_b_tell(rli->cur_log);
2942
 
  /*
2943
 
    If we are in a transaction or in a group: the only normal case is
2944
 
    when the I/O thread was copying a big transaction, then it was
2945
 
    stopped and restarted: we have this in the relay log:
2946
 
 
2947
 
    BEGIN
2948
 
    ...
2949
 
    ROTATE (a fake one)
2950
 
    ...
2951
 
    COMMIT or ROLLBACK
2952
 
 
2953
 
    In that case, we don't want to touch the coordinates which
2954
 
    correspond to the beginning of the transaction.  Starting from
2955
 
    5.0.0, there also are some rotates from the slave itself, in the
2956
 
    relay log, which shall not change the group positions.
2957
 
  */
2958
 
  if ((server_id != ::server_id || rli->replicate_same_server_id) &&
2959
 
      !rli->is_in_group())
2960
 
  {
2961
 
    rli->group_master_log_name.assign(new_log_ident);
2962
 
    rli->notify_group_master_log_name_update();
2963
 
    rli->group_master_log_pos= pos;
2964
 
    rli->group_relay_log_name.assign(rli->event_relay_log_name);
2965
 
    rli->notify_group_relay_log_name_update();
2966
 
    rli->group_relay_log_pos= rli->event_relay_log_pos;
2967
 
    /*
2968
 
      Reset session->options and sql_mode etc, because this could be the signal of
2969
 
      a master's downgrade from 5.0 to 4.0.
2970
 
      However, no need to reset description_event_for_exec: indeed, if the next
2971
 
      master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
2972
 
      master is 4.0 then the events are in the slave's format (conversion).
2973
 
    */
2974
 
    set_slave_thread_options(session);
2975
 
    session->variables.auto_increment_increment=
2976
 
      session->variables.auto_increment_offset= 1;
2977
 
  }
2978
 
  pthread_mutex_unlock(&rli->data_lock);
2979
 
  pthread_cond_broadcast(&rli->data_cond);
2980
 
  flush_relay_log_info(rli);
2981
 
 
2982
 
  return(0);
2983
 
}
2984
 
 
2985
 
 
2986
 
Log_event::enum_skip_reason
2987
 
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
2988
 
{
2989
 
  enum_skip_reason reason= Log_event::do_shall_skip(rli);
2990
 
 
2991
 
  switch (reason) {
2992
 
  case Log_event::EVENT_SKIP_NOT:
2993
 
  case Log_event::EVENT_SKIP_COUNT:
2994
 
    return Log_event::EVENT_SKIP_NOT;
2995
 
 
2996
 
  case Log_event::EVENT_SKIP_IGNORE:
2997
 
    return Log_event::EVENT_SKIP_IGNORE;
2998
 
  }
2999
 
  assert(0);
3000
 
  return Log_event::EVENT_SKIP_NOT;             // To keep compiler happy
3001
 
}
3002
 
 
3003
 
 
3004
 
/**************************************************************************
3005
 
  Xid_log_event methods
3006
 
**************************************************************************/
3007
 
 
3008
 
void Xid_log_event::pack_info(Protocol *protocol)
3009
 
{
3010
 
  char buf[128], *pos;
3011
 
  pos= strcpy(buf, "COMMIT /* xid=")+14;
3012
 
  pos= int64_t10_to_str(xid, pos, 10);
3013
 
  pos= strcpy(pos, " */")+3;
3014
 
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3015
 
}
3016
 
 
3017
 
/**
3018
 
  @note
3019
 
  It's ok not to use int8store here,
3020
 
  as long as XID::set(uint64_t) and
3021
 
  XID::get_my_xid doesn't do it either.
3022
 
  We don't care about actual values of xids as long as
3023
 
  identical numbers compare identically
3024
 
*/
3025
 
 
3026
 
Xid_log_event::
3027
 
Xid_log_event(const char* buf,
3028
 
              const Format_description_log_event *description_event)
3029
 
  :Log_event(buf, description_event)
3030
 
{
3031
 
  buf+= description_event->common_header_len;
3032
 
  memcpy(&xid, buf, sizeof(xid));
3033
 
}
3034
 
 
3035
 
 
3036
 
bool Xid_log_event::write(IO_CACHE* file)
3037
 
{
3038
 
  return write_header(file, sizeof(xid)) ||
3039
 
         my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid));
3040
 
}
3041
 
 
3042
 
 
3043
 
int Xid_log_event::do_apply_event(const Relay_log_info *)
3044
 
{
3045
 
  return end_trans(session, COMMIT);
3046
 
}
3047
 
 
3048
 
Log_event::enum_skip_reason
3049
 
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3050
 
{
3051
 
  if (rli->slave_skip_counter > 0) {
3052
 
    session->options&= ~OPTION_BEGIN;
3053
 
    return(Log_event::EVENT_SKIP_COUNT);
3054
 
  }
3055
 
  return(Log_event::do_shall_skip(rli));
3056
 
}
3057
 
 
3058
 
 
3059
 
/**************************************************************************
3060
 
  Slave_log_event methods
3061
 
**************************************************************************/
3062
 
 
3063
 
void Slave_log_event::pack_info(Protocol *protocol)
3064
 
{
3065
 
  ostringstream stream;
3066
 
  stream << "host=" << master_host << ",port=" << master_port;
3067
 
  stream << ",log=" << master_log << ",pos=" << master_pos;
3068
 
 
3069
 
  protocol->store(stream.str().c_str(), stream.str().length(),
3070
 
                  &my_charset_bin);
3071
 
}
3072
 
 
3073
 
 
3074
 
/**
3075
 
  @todo
3076
 
  re-write this better without holding both locks at the same time
3077
 
*/
3078
 
Slave_log_event::Slave_log_event(Session* session_arg,
3079
 
                                 Relay_log_info* rli)
3080
 
  :Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3081
 
{
3082
 
  if (!rli->inited)                             // QQ When can this happen ?
3083
 
    return;
3084
 
 
3085
 
  Master_info* mi = rli->mi;
3086
 
  // TODO: re-write this better without holding both locks at the same time
3087
 
  pthread_mutex_lock(&mi->data_lock);
3088
 
  pthread_mutex_lock(&rli->data_lock);
3089
 
  // on OOM, just do not initialize the structure and print the error
3090
 
  if ((mem_pool = (char*)malloc(get_data_size() + 1)))
3091
 
  {
3092
 
    master_host.assign(mi->getHostname());
3093
 
    master_log.assign(rli->group_master_log_name);
3094
 
    master_port = mi->getPort();
3095
 
    master_pos = rli->group_master_log_pos;
3096
 
  }
3097
 
  else
3098
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Out of memory while recording slave event"));
3099
 
  pthread_mutex_unlock(&rli->data_lock);
3100
 
  pthread_mutex_unlock(&mi->data_lock);
3101
 
  return;
3102
 
}
3103
 
 
3104
 
 
3105
 
Slave_log_event::~Slave_log_event()
3106
 
{
3107
 
  free(mem_pool);
3108
 
}
3109
 
 
3110
 
 
3111
 
int Slave_log_event::get_data_size()
3112
 
{
3113
 
  return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3114
 
}
3115
 
 
3116
 
 
3117
 
bool Slave_log_event::write(IO_CACHE* file)
3118
 
{
3119
 
  ulong event_length= get_data_size();
3120
 
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
3121
 
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
3122
 
  // log and host are already there
3123
 
 
3124
 
  return (write_header(file, event_length) ||
3125
 
          my_b_safe_write(file, (unsigned char*) mem_pool, event_length));
3126
 
}
3127
 
 
3128
 
 
3129
 
void Slave_log_event::init_from_mem_pool()
3130
 
{
3131
 
  master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3132
 
  master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3133
 
#ifdef FIXME
3134
 
  /* Assign these correctly */
3135
 
  master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
3136
 
  master_log.assign();
3137
 
#endif
3138
 
}
3139
 
 
3140
 
 
3141
 
int Slave_log_event::do_apply_event(const Relay_log_info *)
3142
 
{
3143
 
  if (drizzle_bin_log.is_open())
3144
 
    drizzle_bin_log.write(this);
3145
 
  return 0;
3146
 
}
3147
 
 
3148
 
 
3149
 
/**************************************************************************
3150
 
        Stop_log_event methods
3151
 
**************************************************************************/
3152
 
 
3153
 
/*
3154
 
  The master stopped.  We used to clean up all temporary tables but
3155
 
  this is useless as, as the master has shut down properly, it has
3156
 
  written all DROP TEMPORARY Table (prepared statements' deletion is
3157
 
  TODO only when we binlog prep stmts).  We used to clean up
3158
 
  slave_load_tmpdir, but this is useless as it has been cleared at the
3159
 
  end of LOAD DATA INFILE.  So we have nothing to do here.  The place
3160
 
  were we must do this cleaning is in
3161
 
  Start_log_event_v3::do_apply_event(), not here. Because if we come
3162
 
  here, the master was sane.
3163
 
*/
3164
 
int Stop_log_event::do_update_pos(Relay_log_info *rli)
3165
 
{
3166
 
  /*
3167
 
    We do not want to update master_log pos because we get a rotate event
3168
 
    before stop, so by now group_master_log_name is set to the next log.
3169
 
    If we updated it, we will have incorrect master coordinates and this
3170
 
    could give false triggers in MASTER_POS_WAIT() that we have reached
3171
 
    the target position when in fact we have not.
3172
 
  */
3173
 
  if (session->options & OPTION_BEGIN)
3174
 
    rli->inc_event_relay_log_pos();
3175
 
  else
3176
 
  {
3177
 
    rli->inc_group_relay_log_pos(0);
3178
 
    flush_relay_log_info(rli);
3179
 
  }
3180
 
  return 0;
3181
 
}
3182
 
 
3183
 
 
3184
 
/**************************************************************************
3185
 
        Create_file_log_event methods
3186
 
**************************************************************************/
3187
 
 
3188
 
/*
3189
 
  Create_file_log_event ctor
3190
 
*/
3191
 
 
3192
 
Create_file_log_event::
3193
 
Create_file_log_event(Session* session_arg, sql_exchange* ex,
3194
 
                      const char* db_arg, const char* table_name_arg,
3195
 
                      List<Item>& fields_arg, enum enum_duplicates handle_dup,
3196
 
                      bool ignore,
3197
 
                      unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3198
 
  :Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3199
 
                  using_trans),
3200
 
   fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3201
 
   file_id(session_arg->file_id = drizzle_bin_log.next_file_id())
3202
 
{
3203
 
  sql_ex.force_new_format();
3204
 
  return;
3205
 
}
3206
 
 
3207
 
 
3208
 
/*
3209
 
  Create_file_log_event::write_data_body()
3210
 
*/
3211
 
 
3212
 
bool Create_file_log_event::write_data_body(IO_CACHE* file)
3213
 
{
3214
 
  bool res;
3215
 
  if ((res= Load_log_event::write_data_body(file)) || fake_base)
3216
 
    return res;
3217
 
  return (my_b_safe_write(file, (unsigned char*) "", 1) ||
3218
 
          my_b_safe_write(file, (unsigned char*) block, block_len));
3219
 
}
3220
 
 
3221
 
 
3222
 
/*
3223
 
  Create_file_log_event::write_data_header()
3224
 
*/
3225
 
 
3226
 
bool Create_file_log_event::write_data_header(IO_CACHE* file)
3227
 
{
3228
 
  bool res;
3229
 
  unsigned char buf[CREATE_FILE_HEADER_LEN];
3230
 
  if ((res= Load_log_event::write_data_header(file)) || fake_base)
3231
 
    return res;
3232
 
  int4store(buf + CF_FILE_ID_OFFSET, file_id);
3233
 
  return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
3234
 
}
3235
 
 
3236
 
 
3237
 
/*
3238
 
  Create_file_log_event::write_base()
3239
 
*/
3240
 
 
3241
 
bool Create_file_log_event::write_base(IO_CACHE* file)
3242
 
{
3243
 
  bool res;
3244
 
  fake_base= 1;                                 // pretend we are Load event
3245
 
  res= write(file);
3246
 
  fake_base= 0;
3247
 
  return res;
3248
 
}
3249
 
 
3250
 
/*
3251
 
  Create_file_log_event ctor
3252
 
*/
3253
 
 
3254
 
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
3255
 
                                             const Format_description_log_event* description_event)
3256
 
  :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
3257
 
{
3258
 
  uint32_t block_offset;
3259
 
  uint32_t header_len= description_event->common_header_len;
3260
 
  uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
3261
 
  uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
3262
 
  if (!(event_buf= (const char*)malloc(len)) ||
3263
 
      memcpy((char *)event_buf, buf, len) ||
3264
 
      copy_log_event(event_buf,len,
3265
 
                     ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
3266
 
                      load_header_len + header_len :
3267
 
                      (fake_base ? (header_len+load_header_len) :
3268
 
                       (header_len+load_header_len) +
3269
 
                       create_file_header_len)),
3270
 
                     description_event))
3271
 
    return;
3272
 
  if (description_event->binlog_version!=1)
3273
 
  {
3274
 
    file_id= uint4korr(buf +
3275
 
                       header_len +
3276
 
                       load_header_len + CF_FILE_ID_OFFSET);
3277
 
    /*
3278
 
      Note that it's ok to use get_data_size() below, because it is computed
3279
 
      with values we have already read from this event (because we called
3280
 
      copy_log_event()); we are not using slave's format info to decode
3281
 
      master's format, we are really using master's format info.
3282
 
      Anyway, both formats should be identical (except the common_header_len)
3283
 
      as these Load events are not changed between 4.0 and 5.0 (as logging of
3284
 
      LOAD DATA INFILE does not use Load_log_event in 5.0).
3285
 
 
3286
 
      The + 1 is for \0 terminating fname
3287
 
    */
3288
 
    block_offset= (description_event->common_header_len +
3289
 
                   Load_log_event::get_data_size() +
3290
 
                   create_file_header_len + 1);
3291
 
    if (len < block_offset)
3292
 
      return;
3293
 
    block = (unsigned char*)buf + block_offset;
3294
 
    block_len = len - block_offset;
3295
 
  }
3296
 
  else
3297
 
  {
3298
 
    sql_ex.force_new_format();
3299
 
    inited_from_old = 1;
3300
 
  }
3301
 
  return;
3302
 
}
3303
 
 
3304
 
 
3305
 
/*
3306
 
  Create_file_log_event::pack_info()
3307
 
*/
3308
 
 
3309
 
void Create_file_log_event::pack_info(Protocol *protocol)
3310
 
{
3311
 
  char buf[NAME_LEN*2 + 30 + 21*2], *pos;
3312
 
  pos= strcpy(buf, "db=")+3;
3313
 
  memcpy(pos, db, db_len);
3314
 
  pos= strcpy(pos + db_len, ";table=")+7;
3315
 
  memcpy(pos, table_name, table_name_len);
3316
 
  pos= strcpy(pos + table_name_len, ";file_id=")+9;
3317
 
  pos= int10_to_str((long) file_id, pos, 10);
3318
 
  pos= strcpy(pos, ";block_len=")+11;
3319
 
  pos= int10_to_str((long) block_len, pos, 10);
3320
 
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3321
 
}
3322
 
 
3323
 
 
3324
 
/*
3325
 
  Create_file_log_event::do_apply_event()
3326
 
*/
3327
 
 
3328
 
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
3329
 
{
3330
 
  char proc_info[17+FN_REFLEN+10], *fname_buf;
3331
 
  char *ext;
3332
 
  int fd = -1;
3333
 
  IO_CACHE file;
3334
 
  int error = 1;
3335
 
 
3336
 
  memset(&file, 0, sizeof(file));
3337
 
  fname_buf= strcpy(proc_info, "Making temp file ")+17;
3338
 
  ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
3339
 
  session->set_proc_info(proc_info);
3340
 
  my_delete(fname_buf, MYF(0)); // old copy may exist already
3341
 
  if ((fd= my_create(fname_buf, CREATE_MODE,
3342
 
                     O_WRONLY | O_EXCL,
3343
 
                     MYF(MY_WME))) < 0 ||
3344
 
      init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
3345
 
                    MYF(MY_WME|MY_NABP)))
3346
 
  {
3347
 
    rli->report(ERROR_LEVEL, my_errno,
3348
 
                _("Error in Create_file event: could not open file '%s'"),
3349
 
                fname_buf);
3350
 
    goto err;
3351
 
  }
3352
 
 
3353
 
  // a trick to avoid allocating another buffer
3354
 
  fname= fname_buf;
3355
 
  fname_len= (uint) ((strcpy(ext, ".data") + 5) - fname);
3356
 
  if (write_base(&file))
3357
 
  {
3358
 
    strcpy(ext, ".info"); // to have it right in the error message
3359
 
    rli->report(ERROR_LEVEL, my_errno,
3360
 
                _("Error in Create_file event: could not write to file '%s'"),
3361
 
                fname_buf);
3362
 
    goto err;
3363
 
  }
3364
 
  end_io_cache(&file);
3365
 
  my_close(fd, MYF(0));
3366
 
 
3367
 
  // fname_buf now already has .data, not .info, because we did our trick
3368
 
  my_delete(fname_buf, MYF(0)); // old copy may exist already
3369
 
  if ((fd= my_create(fname_buf, CREATE_MODE,
3370
 
                     O_WRONLY | O_EXCL,
3371
 
                     MYF(MY_WME))) < 0)
3372
 
  {
3373
 
    rli->report(ERROR_LEVEL, my_errno,
3374
 
                _("Error in Create_file event: could not open file '%s'"),
3375
 
                fname_buf);
3376
 
    goto err;
3377
 
  }
3378
 
  if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3379
 
  {
3380
 
    rli->report(ERROR_LEVEL, my_errno,
3381
 
                _("Error in Create_file event: write to '%s' failed"),
3382
 
                fname_buf);
3383
 
    goto err;
3384
 
  }
3385
 
  error=0;                                      // Everything is ok
3386
 
 
3387
 
err:
3388
 
  if (error)
3389
 
    end_io_cache(&file);
3390
 
  if (fd >= 0)
3391
 
    my_close(fd, MYF(0));
3392
 
  session->set_proc_info(0);
3393
 
  return error == 0;
3394
 
}
3395
 
 
3396
 
 
3397
 
/**************************************************************************
3398
 
        Append_block_log_event methods
3399
 
**************************************************************************/
3400
 
 
3401
 
/*
3402
 
  Append_block_log_event ctor
3403
 
*/
3404
 
 
3405
 
Append_block_log_event::Append_block_log_event(Session *session_arg,
3406
 
                                               const char *db_arg,
3407
 
                                               unsigned char *block_arg,
3408
 
                                               uint32_t block_len_arg,
3409
 
                                               bool using_trans)
3410
 
  :Log_event(session_arg,0, using_trans), block(block_arg),
3411
 
   block_len(block_len_arg), file_id(session_arg->file_id), db(db_arg)
3412
 
{
3413
 
}
3414
 
 
3415
 
 
3416
 
/*
3417
 
  Append_block_log_event ctor
3418
 
*/
3419
 
 
3420
 
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
3421
 
                                               const Format_description_log_event* description_event)
3422
 
  :Log_event(buf, description_event),block(0)
3423
 
{
3424
 
  uint8_t common_header_len= description_event->common_header_len;
3425
 
  uint8_t append_block_header_len=
3426
 
    description_event->post_header_len[APPEND_BLOCK_EVENT-1];
3427
 
  uint32_t total_header_len= common_header_len+append_block_header_len;
3428
 
  if (len < total_header_len)
3429
 
    return;
3430
 
  file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
3431
 
  block= (unsigned char*)buf + total_header_len;
3432
 
  block_len= len - total_header_len;
3433
 
  return;
3434
 
}
3435
 
 
3436
 
 
3437
 
/*
3438
 
  Append_block_log_event::write()
3439
 
*/
3440
 
 
3441
 
bool Append_block_log_event::write(IO_CACHE* file)
3442
 
{
3443
 
  unsigned char buf[APPEND_BLOCK_HEADER_LEN];
3444
 
  int4store(buf + AB_FILE_ID_OFFSET, file_id);
3445
 
  return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
3446
 
          my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
3447
 
          my_b_safe_write(file, (unsigned char*) block, block_len));
3448
 
}
3449
 
 
3450
 
 
3451
 
/*
3452
 
  Append_block_log_event::pack_info()
3453
 
*/
3454
 
 
3455
 
void Append_block_log_event::pack_info(Protocol *protocol)
3456
 
{
3457
 
  char buf[256];
3458
 
  uint32_t length;
3459
 
  length= (uint) sprintf(buf, ";file_id=%u;block_len=%u", file_id,
3460
 
                             block_len);
3461
 
  protocol->store(buf, length, &my_charset_bin);
3462
 
}
3463
 
 
3464
 
 
3465
 
/*
3466
 
  Append_block_log_event::get_create_or_append()
3467
 
*/
3468
 
 
3469
 
int Append_block_log_event::get_create_or_append() const
3470
 
{
3471
 
  return 0; /* append to the file, fail if not exists */
3472
 
}
3473
 
 
3474
 
/*
3475
 
  Append_block_log_event::do_apply_event()
3476
 
*/
3477
 
 
3478
 
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
3479
 
{
3480
 
  char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
3481
 
  int fd;
3482
 
  int error = 1;
3483
 
 
3484
 
  fname= strcpy(proc_info, "Making temp file ")+17;
3485
 
  slave_load_file_stem(fname, file_id, server_id, ".data");
3486
 
  session->set_proc_info(proc_info);
3487
 
  if (get_create_or_append())
3488
 
  {
3489
 
    my_delete(fname, MYF(0)); // old copy may exist already
3490
 
    if ((fd= my_create(fname, CREATE_MODE,
3491
 
                       O_WRONLY | O_EXCL,
3492
 
                       MYF(MY_WME))) < 0)
3493
 
    {
3494
 
      rli->report(ERROR_LEVEL, my_errno,
3495
 
                  _("Error in %s event: could not create file '%s'"),
3496
 
                  get_type_str(), fname);
3497
 
      goto err;
3498
 
    }
3499
 
  }
3500
 
  else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
3501
 
                         MYF(MY_WME))) < 0)
3502
 
  {
3503
 
    rli->report(ERROR_LEVEL, my_errno,
3504
 
                _("Error in %s event: could not open file '%s'"),
3505
 
                get_type_str(), fname);
3506
 
    goto err;
3507
 
  }
3508
 
  if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3509
 
  {
3510
 
    rli->report(ERROR_LEVEL, my_errno,
3511
 
                _("Error in %s event: write to '%s' failed"),
3512
 
                get_type_str(), fname);
3513
 
    goto err;
3514
 
  }
3515
 
  error=0;
3516
 
 
3517
 
err:
3518
 
  if (fd >= 0)
3519
 
    my_close(fd, MYF(0));
3520
 
  session->set_proc_info(0);
3521
 
  return(error);
3522
 
}
3523
 
 
3524
 
 
3525
 
/**************************************************************************
3526
 
        Delete_file_log_event methods
3527
 
**************************************************************************/
3528
 
 
3529
 
/*
3530
 
  Delete_file_log_event ctor
3531
 
*/
3532
 
 
3533
 
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
3534
 
                                             bool using_trans)
3535
 
  :Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
3536
 
{
3537
 
}
3538
 
 
3539
 
/*
3540
 
  Delete_file_log_event ctor
3541
 
*/
3542
 
 
3543
 
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
3544
 
                                             const Format_description_log_event* description_event)
3545
 
  :Log_event(buf, description_event),file_id(0)
3546
 
{
3547
 
  uint8_t common_header_len= description_event->common_header_len;
3548
 
  uint8_t delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
3549
 
  if (len < (uint)(common_header_len + delete_file_header_len))
3550
 
    return;
3551
 
  file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
3552
 
}
3553
 
 
3554
 
 
3555
 
/*
3556
 
  Delete_file_log_event::write()
3557
 
*/
3558
 
 
3559
 
bool Delete_file_log_event::write(IO_CACHE* file)
3560
 
{
3561
 
 unsigned char buf[DELETE_FILE_HEADER_LEN];
3562
 
 int4store(buf + DF_FILE_ID_OFFSET, file_id);
3563
 
 return (write_header(file, sizeof(buf)) ||
3564
 
         my_b_safe_write(file, buf, sizeof(buf)));
3565
 
}
3566
 
 
3567
 
 
3568
 
/*
3569
 
  Delete_file_log_event::pack_info()
3570
 
*/
3571
 
 
3572
 
void Delete_file_log_event::pack_info(Protocol *protocol)
3573
 
{
3574
 
  char buf[64];
3575
 
  uint32_t length;
3576
 
  length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
3577
 
  protocol->store(buf, (int32_t) length, &my_charset_bin);
3578
 
}
3579
 
 
3580
 
/*
3581
 
  Delete_file_log_event::do_apply_event()
3582
 
*/
3583
 
 
3584
 
int Delete_file_log_event::do_apply_event(const Relay_log_info *)
3585
 
{
3586
 
  char fname[FN_REFLEN+10];
3587
 
  char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
3588
 
  (void) my_delete(fname, MYF(MY_WME));
3589
 
  strcpy(ext, ".info");
3590
 
  (void) my_delete(fname, MYF(MY_WME));
3591
 
  return 0;
3592
 
}
3593
 
 
3594
 
 
3595
 
/**************************************************************************
3596
 
        Execute_load_log_event methods
3597
 
**************************************************************************/
3598
 
 
3599
 
/*
3600
 
  Execute_load_log_event ctor
3601
 
*/
3602
 
 
3603
 
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
3604
 
                                               const char* db_arg,
3605
 
                                               bool using_trans)
3606
 
  :Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
3607
 
{
3608
 
}
3609
 
 
3610
 
 
3611
 
/*
3612
 
  Execute_load_log_event ctor
3613
 
*/
3614
 
 
3615
 
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
3616
 
                                               const Format_description_log_event* description_event)
3617
 
  :Log_event(buf, description_event), file_id(0)
3618
 
{
3619
 
  uint8_t common_header_len= description_event->common_header_len;
3620
 
  uint8_t exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
3621
 
  if (len < (uint)(common_header_len+exec_load_header_len))
3622
 
    return;
3623
 
  file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
3624
 
}
3625
 
 
3626
 
 
3627
 
/*
3628
 
  Execute_load_log_event::write()
3629
 
*/
3630
 
 
3631
 
bool Execute_load_log_event::write(IO_CACHE* file)
3632
 
{
3633
 
  unsigned char buf[EXEC_LOAD_HEADER_LEN];
3634
 
  int4store(buf + EL_FILE_ID_OFFSET, file_id);
3635
 
  return (write_header(file, sizeof(buf)) ||
3636
 
          my_b_safe_write(file, buf, sizeof(buf)));
3637
 
}
3638
 
 
3639
 
 
3640
 
/*
3641
 
  Execute_load_log_event::pack_info()
3642
 
*/
3643
 
 
3644
 
void Execute_load_log_event::pack_info(Protocol *protocol)
3645
 
{
3646
 
  char buf[64];
3647
 
  uint32_t length;
3648
 
  length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
3649
 
  protocol->store(buf, (int32_t) length, &my_charset_bin);
3650
 
}
3651
 
 
3652
 
 
3653
 
/*
3654
 
  Execute_load_log_event::do_apply_event()
3655
 
*/
3656
 
 
3657
 
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
3658
 
{
3659
 
  char fname[FN_REFLEN+10];
3660
 
  char *ext;
3661
 
  int fd;
3662
 
  int error= 1;
3663
 
  IO_CACHE file;
3664
 
  Load_log_event *lev= 0;
3665
 
 
3666
 
  ext= slave_load_file_stem(fname, file_id, server_id, ".info");
3667
 
  if ((fd = my_open(fname, O_RDONLY,
3668
 
                    MYF(MY_WME))) < 0 ||
3669
 
      init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
3670
 
                    MYF(MY_WME|MY_NABP)))
3671
 
  {
3672
 
    rli->report(ERROR_LEVEL, my_errno,
3673
 
                _("Error in Exec_load event: could not open file '%s'"),
3674
 
                fname);
3675
 
    goto err;
3676
 
  }
3677
 
  if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
3678
 
                                                         (pthread_mutex_t*)0,
3679
 
                                                         rli->relay_log.description_event_for_exec)) ||
3680
 
      lev->get_type_code() != NEW_LOAD_EVENT)
3681
 
  {
3682
 
    rli->report(ERROR_LEVEL, 0,
3683
 
                _("Error in Exec_load event: "
3684
 
                  "file '%s' appears corrupted"),
3685
 
                fname);
3686
 
    goto err;
3687
 
  }
3688
 
 
3689
 
  lev->session = session;
3690
 
  /*
3691
 
    lev->do_apply_event should use rli only for errors i.e. should
3692
 
    not advance rli's position.
3693
 
 
3694
 
    lev->do_apply_event is the place where the table is loaded (it
3695
 
    calls mysql_load()).
3696
 
  */
3697
 
 
3698
 
  const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
3699
 
  if (lev->do_apply_event(0,rli,1))
3700
 
  {
3701
 
    /*
3702
 
      We want to indicate the name of the file that could not be loaded
3703
 
      (SQL_LOADxxx).
3704
 
      But as we are here we are sure the error is in rli->last_slave_error and
3705
 
      rli->last_slave_errno (example of error: duplicate entry for key), so we
3706
 
      don't want to overwrite it with the filename.
3707
 
      What we want instead is add the filename to the current error message.
3708
 
    */
3709
 
    char *tmp= strdup(rli->last_error().message);
3710
 
    if (tmp)
3711
 
    {
3712
 
      rli->report(ERROR_LEVEL, rli->last_error().number,
3713
 
                  _("%s. Failed executing load from '%s'"),
3714
 
                  tmp, fname);
3715
 
      free(tmp);
3716
 
    }
3717
 
    goto err;
3718
 
  }
3719
 
  /*
3720
 
    We have an open file descriptor to the .info file; we need to close it
3721
 
    or Windows will refuse to delete the file in my_delete().
3722
 
  */
3723
 
  if (fd >= 0)
3724
 
  {
3725
 
    my_close(fd, MYF(0));
3726
 
    end_io_cache(&file);
3727
 
    fd= -1;
3728
 
  }
3729
 
  (void) my_delete(fname, MYF(MY_WME));
3730
 
  memcpy(ext, ".data", 6);
3731
 
  (void) my_delete(fname, MYF(MY_WME));
3732
 
  error = 0;
3733
 
 
3734
 
err:
3735
 
  delete lev;
3736
 
  if (fd >= 0)
3737
 
  {
3738
 
    my_close(fd, MYF(0));
3739
 
    end_io_cache(&file);
3740
 
  }
3741
 
  return error;
3742
 
}
3743
 
 
3744
 
 
3745
 
/**************************************************************************
3746
 
        Begin_load_query_log_event methods
3747
 
**************************************************************************/
3748
 
 
3749
 
Begin_load_query_log_event::
3750
 
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
3751
 
                           uint32_t block_len_arg, bool using_trans)
3752
 
  :Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
3753
 
                          using_trans)
3754
 
{
3755
 
   file_id= session_arg->file_id= drizzle_bin_log.next_file_id();
3756
 
}
3757
 
 
3758
 
 
3759
 
Begin_load_query_log_event::
3760
 
Begin_load_query_log_event(const char* buf, uint32_t len,
3761
 
                           const Format_description_log_event* desc_event)
3762
 
  :Append_block_log_event(buf, len, desc_event)
3763
 
{
3764
 
}
3765
 
 
3766
 
 
3767
 
int Begin_load_query_log_event::get_create_or_append() const
3768
 
{
3769
 
  return 1; /* create the file */
3770
 
}
3771
 
 
3772
 
 
3773
 
Log_event::enum_skip_reason
3774
 
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
3775
 
{
3776
 
  /*
3777
 
    If the slave skip counter is 1, then we should not start executing
3778
 
    on the next event.
3779
 
  */
3780
 
  return continue_group(rli);
3781
 
}
3782
 
 
3783
 
 
3784
 
/**************************************************************************
3785
 
        Execute_load_query_log_event methods
3786
 
**************************************************************************/
3787
 
 
3788
 
 
3789
 
Execute_load_query_log_event::
3790
 
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
3791
 
                             ulong query_length_arg, uint32_t fn_pos_start_arg,
3792
 
                             uint32_t fn_pos_end_arg,
3793
 
                             enum_load_dup_handling dup_handling_arg,
3794
 
                             bool using_trans, bool suppress_use,
3795
 
                             Session::killed_state killed_err_arg):
3796
 
  Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
3797
 
                  suppress_use, killed_err_arg),
3798
 
  file_id(session_arg->file_id), fn_pos_start(fn_pos_start_arg),
3799
 
  fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
3800
 
{
3801
 
}
3802
 
 
3803
 
 
3804
 
Execute_load_query_log_event::
3805
 
Execute_load_query_log_event(const char* buf, uint32_t event_len,
3806
 
                             const Format_description_log_event* desc_event):
3807
 
  Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
3808
 
  file_id(0), fn_pos_start(0), fn_pos_end(0)
3809
 
{
3810
 
  if (!Query_log_event::is_valid())
3811
 
    return;
3812
 
 
3813
 
  buf+= desc_event->common_header_len;
3814
 
 
3815
 
  fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
3816
 
  fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
3817
 
  dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
3818
 
 
3819
 
  if (fn_pos_start > q_len || fn_pos_end > q_len ||
3820
 
      dup_handling > LOAD_DUP_REPLACE)
3821
 
    return;
3822
 
 
3823
 
  file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
3824
 
}
3825
 
 
3826
 
 
3827
 
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
3828
 
{
3829
 
  return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
3830
 
}
3831
 
 
3832
 
 
3833
 
bool
3834
 
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
3835
 
{
3836
 
  unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
3837
 
  int4store(buf, file_id);
3838
 
  int4store(buf + 4, fn_pos_start);
3839
 
  int4store(buf + 4 + 4, fn_pos_end);
3840
 
  *(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
3841
 
  return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
3842
 
}
3843
 
 
3844
 
 
3845
 
void Execute_load_query_log_event::pack_info(Protocol *protocol)
3846
 
{
3847
 
  char *buf, *pos;
3848
 
  if (!(buf= (char*) malloc(9 + db_len + q_len + 10 + 21)))
3849
 
    return;
3850
 
  pos= buf;
3851
 
  if (db && db_len)
3852
 
  {
3853
 
    pos= strcpy(buf, "use `")+5;
3854
 
    memcpy(pos, db, db_len);
3855
 
    pos= strcpy(pos+db_len, "`; ")+3;
3856
 
  }
3857
 
  if (query && q_len)
3858
 
  {
3859
 
    memcpy(pos, query, q_len);
3860
 
    pos+= q_len;
3861
 
  }
3862
 
  pos= strcpy(pos, " ;file_id=")+10;
3863
 
  pos= int10_to_str((long) file_id, pos, 10);
3864
 
  protocol->store(buf, pos-buf, &my_charset_bin);
3865
 
  free(buf);
3866
 
}
3867
 
 
3868
 
 
3869
 
int
3870
 
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
3871
 
{
3872
 
  char *p;
3873
 
  char *buf;
3874
 
  char *fname;
3875
 
  char *fname_end;
3876
 
  int error;
3877
 
 
3878
 
  buf= (char*) malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
3879
 
                      (FN_REFLEN + 10) + 10 + 8 + 5);
3880
 
 
3881
 
  /* Replace filename and LOCAL keyword in query before executing it */
3882
 
  if (buf == NULL)
3883
 
  {
3884
 
    rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3885
 
                ER(ER_SLAVE_FATAL_ERROR),
3886
 
                _("Not enough memory"));
3887
 
    return 1;
3888
 
  }
3889
 
 
3890
 
  p= buf;
3891
 
  memcpy(p, query, fn_pos_start);
3892
 
  p+= fn_pos_start;
3893
 
  fname= (p= strncpy(p, STRING_WITH_LEN(" INFILE \'")) + 9);
3894
 
  p= slave_load_file_stem(p, file_id, server_id, ".data");
3895
 
  fname_end= p= strchr(p, '\0');                      // Safer than p=p+5
3896
 
  *(p++)='\'';
3897
 
  switch (dup_handling) {
3898
 
  case LOAD_DUP_IGNORE:
3899
 
    p= strncpy(p, STRING_WITH_LEN(" IGNORE")) + 7;
3900
 
    break;
3901
 
  case LOAD_DUP_REPLACE:
3902
 
    p= strncpy(p, STRING_WITH_LEN(" REPLACE")) + 8;
3903
 
    break;
3904
 
  default:
3905
 
    /* Ordinary load data */
3906
 
    break;
3907
 
  }
3908
 
  size_t end_len = q_len-fn_pos_end;
3909
 
  p= strncpy(p, STRING_WITH_LEN(" INTO")) + 5;
3910
 
  p= strncpy(p, query+fn_pos_end, end_len);
3911
 
  p+= end_len;
3912
 
 
3913
 
  error= Query_log_event::do_apply_event(rli, buf, p-buf);
3914
 
 
3915
 
  /* Forging file name for deletion in same buffer */
3916
 
  *fname_end= 0;
3917
 
 
3918
 
  /*
3919
 
    If there was an error the slave is going to stop, leave the
3920
 
    file so that we can re-execute this event at START SLAVE.
3921
 
  */
3922
 
  if (!error)
3923
 
    (void) my_delete(fname, MYF(MY_WME));
3924
 
 
3925
 
  free(buf);
3926
 
  return error;
3927
 
}
3928
 
 
3929
 
 
3930
 
/**************************************************************************
3931
 
        sql_ex_info methods
3932
 
**************************************************************************/
3933
 
 
3934
 
/*
3935
 
  sql_ex_info::write_data()
3936
 
*/
3937
 
 
3938
 
bool sql_ex_info::write_data(IO_CACHE* file)
3939
 
{
3940
 
  if (new_format())
3941
 
  {
3942
 
    return (write_str(file, field_term, (uint) field_term_len) ||
3943
 
            write_str(file, enclosed,   (uint) enclosed_len) ||
3944
 
            write_str(file, line_term,  (uint) line_term_len) ||
3945
 
            write_str(file, line_start, (uint) line_start_len) ||
3946
 
            write_str(file, escaped,    (uint) escaped_len) ||
3947
 
            my_b_safe_write(file,(unsigned char*) &opt_flags,1));
3948
 
  }
3949
 
  else
3950
 
    assert(0);
3951
 
  return true;
3952
 
}
3953
 
 
3954
 
 
3955
 
/*
3956
 
  sql_ex_info::init()
3957
 
*/
3958
 
 
3959
 
const char *sql_ex_info::init(const char *buf, const char *buf_end,
3960
 
                              bool use_new_format)
3961
 
{
3962
 
  cached_new_format = use_new_format;
3963
 
  if (use_new_format)
3964
 
  {
3965
 
    empty_flags=0;
3966
 
    /*
3967
 
      The code below assumes that buf will not disappear from
3968
 
      under our feet during the lifetime of the event. This assumption
3969
 
      holds true in the slave thread if the log is in new format, but is not
3970
 
      the case when we have old format because we will be reusing net buffer
3971
 
      to read the actual file before we write out the Create_file event.
3972
 
    */
3973
 
    if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
3974
 
        read_str(&buf, buf_end, &enclosed,   &enclosed_len) ||
3975
 
        read_str(&buf, buf_end, &line_term,  &line_term_len) ||
3976
 
        read_str(&buf, buf_end, &line_start, &line_start_len) ||
3977
 
        read_str(&buf, buf_end, &escaped,    &escaped_len))
3978
 
      return 0;
3979
 
    opt_flags = *buf++;
3980
 
  }
3981
 
  else
3982
 
  {
3983
 
    field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
3984
 
    field_term = buf++;                 // Use first byte in string
3985
 
    enclosed=    buf++;
3986
 
    line_term=   buf++;
3987
 
    line_start=  buf++;
3988
 
    escaped=     buf++;
3989
 
    opt_flags =  *buf++;
3990
 
    empty_flags= *buf++;
3991
 
    if (empty_flags & FIELD_TERM_EMPTY)
3992
 
      field_term_len=0;
3993
 
    if (empty_flags & ENCLOSED_EMPTY)
3994
 
      enclosed_len=0;
3995
 
    if (empty_flags & LINE_TERM_EMPTY)
3996
 
      line_term_len=0;
3997
 
    if (empty_flags & LINE_START_EMPTY)
3998
 
      line_start_len=0;
3999
 
    if (empty_flags & ESCAPED_EMPTY)
4000
 
      escaped_len=0;
4001
 
  }
4002
 
  return buf;
4003
 
}
4004
 
 
4005
 
 
4006
 
/**************************************************************************
4007
 
        Rows_log_event member functions
4008
 
**************************************************************************/
4009
 
 
4010
 
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4011
 
                               MY_BITMAP const *cols, bool is_transactional)
4012
 
  : Log_event(session_arg, 0, is_transactional),
4013
 
    m_row_count(0),
4014
 
    m_table(tbl_arg),
4015
 
    m_table_id(tid),
4016
 
    m_width(tbl_arg ? tbl_arg->s->fields : 1),
4017
 
    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4018
 
    , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4019
 
{
4020
 
  /*
4021
 
    We allow a special form of dummy event when the table, and cols
4022
 
    are null and the table id is UINT32_MAX.  This is a temporary
4023
 
    solution, to be able to terminate a started statement in the
4024
 
    binary log: the extraneous events will be removed in the future.
4025
 
   */
4026
 
  assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4027
 
 
4028
 
  if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4029
 
      set_flags(NO_FOREIGN_KEY_CHECKS_F);
4030
 
  if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4031
 
      set_flags(RELAXED_UNIQUE_CHECKS_F);
4032
 
  /* if bitmap_init fails, caught in is_valid() */
4033
 
  if (likely(!bitmap_init(&m_cols,
4034
 
                          m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4035
 
                          m_width,
4036
 
                          false)))
4037
 
  {
4038
 
    /* Cols can be zero if this is a dummy binrows event */
4039
 
    if (likely(cols != NULL))
4040
 
    {
4041
 
      memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4042
 
      create_last_word_mask(&m_cols);
4043
 
    }
4044
 
  }
4045
 
  else
4046
 
  {
4047
 
    // Needed because bitmap_init() does not set it to null on failure
4048
 
    m_cols.bitmap= 0;
4049
 
  }
4050
 
}
4051
 
 
4052
 
 
4053
 
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4054
 
                               Log_event_type event_type,
4055
 
                               const Format_description_log_event
4056
 
                               *description_event)
4057
 
  : Log_event(buf, description_event),
4058
 
    m_row_count(0),
4059
 
    m_table(NULL),
4060
 
    m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4061
 
    , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4062
 
{
4063
 
  uint8_t const common_header_len= description_event->common_header_len;
4064
 
  uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4065
 
 
4066
 
  const char *post_start= buf + common_header_len;
4067
 
  post_start+= RW_MAPID_OFFSET;
4068
 
  if (post_header_len == 6)
4069
 
  {
4070
 
    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4071
 
    m_table_id= uint4korr(post_start);
4072
 
    post_start+= 4;
4073
 
  }
4074
 
  else
4075
 
  {
4076
 
    m_table_id= (ulong) uint6korr(post_start);
4077
 
    post_start+= RW_FLAGS_OFFSET;
4078
 
  }
4079
 
 
4080
 
  m_flags= uint2korr(post_start);
4081
 
 
4082
 
  unsigned char const *const var_start=
4083
 
    (const unsigned char *)buf + common_header_len + post_header_len;
4084
 
  unsigned char const *const ptr_width= var_start;
4085
 
  unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4086
 
  m_width = net_field_length(&ptr_after_width);
4087
 
  /* if bitmap_init fails, catched in is_valid() */
4088
 
  if (likely(!bitmap_init(&m_cols,
4089
 
                          m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4090
 
                          m_width,
4091
 
                          false)))
4092
 
  {
4093
 
    memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4094
 
    create_last_word_mask(&m_cols);
4095
 
    ptr_after_width+= (m_width + 7) / 8;
4096
 
  }
4097
 
  else
4098
 
  {
4099
 
    // Needed because bitmap_init() does not set it to null on failure
4100
 
    m_cols.bitmap= NULL;
4101
 
    return;
4102
 
  }
4103
 
 
4104
 
  m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4105
 
 
4106
 
  if (event_type == UPDATE_ROWS_EVENT)
4107
 
  {
4108
 
    /* if bitmap_init fails, caught in is_valid() */
4109
 
    if (likely(!bitmap_init(&m_cols_ai,
4110
 
                            m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4111
 
                            m_width,
4112
 
                            false)))
4113
 
    {
4114
 
      memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4115
 
      create_last_word_mask(&m_cols_ai);
4116
 
      ptr_after_width+= (m_width + 7) / 8;
4117
 
    }
4118
 
    else
4119
 
    {
4120
 
      // Needed because bitmap_init() does not set it to null on failure
4121
 
      m_cols_ai.bitmap= 0;
4122
 
      return;
4123
 
    }
4124
 
  }
4125
 
 
4126
 
  const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4127
 
 
4128
 
  size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4129
 
 
4130
 
  m_rows_buf= (unsigned char*) malloc(data_size);
4131
 
  if (likely((bool)m_rows_buf))
4132
 
  {
4133
 
    m_curr_row= m_rows_buf;
4134
 
    m_rows_end= m_rows_buf + data_size;
4135
 
    m_rows_cur= m_rows_end;
4136
 
    memcpy(m_rows_buf, ptr_rows_data, data_size);
4137
 
  }
4138
 
  else
4139
 
    m_cols.bitmap= 0; // to not free it
4140
 
 
4141
 
  return;
4142
 
}
4143
 
 
4144
 
Rows_log_event::~Rows_log_event()
4145
 
{
4146
 
  if (m_cols.bitmap == m_bitbuf) // no malloc happened
4147
 
    m_cols.bitmap= 0; // so no free in bitmap_free
4148
 
  bitmap_free(&m_cols); // To pair with bitmap_init().
4149
 
  free((unsigned char*)m_rows_buf);
4150
 
}
4151
 
 
4152
 
int Rows_log_event::get_data_size()
4153
 
{
4154
 
  int const type_code= get_type_code();
4155
 
 
4156
 
  unsigned char buf[sizeof(m_width)+1];
4157
 
  unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4158
 
 
4159
 
  int data_size= ROWS_HEADER_LEN;
4160
 
  data_size+= no_bytes_in_map(&m_cols);
4161
 
  data_size+= end - buf;
4162
 
 
4163
 
  if (type_code == UPDATE_ROWS_EVENT)
4164
 
    data_size+= no_bytes_in_map(&m_cols_ai);
4165
 
 
4166
 
  data_size+= (m_rows_cur - m_rows_buf);
4167
 
  return data_size;
4168
 
}
4169
 
 
4170
 
 
4171
 
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4172
 
{
4173
 
  /*
4174
 
    When the table has a primary key, we would probably want, by default, to
4175
 
    log only the primary key value instead of the entire "before image". This
4176
 
    would save binlog space. TODO
4177
 
  */
4178
 
 
4179
 
  /*
4180
 
    If length is zero, there is nothing to write, so we just
4181
 
    return. Note that this is not an optimization, since calling
4182
 
    realloc() with size 0 means free().
4183
 
   */
4184
 
  if (length == 0)
4185
 
  {
4186
 
    m_row_count++;
4187
 
    return(0);
4188
 
  }
4189
 
 
4190
 
  assert(m_rows_buf <= m_rows_cur);
4191
 
  assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4192
 
  assert(m_rows_cur <= m_rows_end);
4193
 
 
4194
 
  /* The cast will always work since m_rows_cur <= m_rows_end */
4195
 
  if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4196
 
  {
4197
 
    size_t const block_size= 1024;
4198
 
    const size_t cur_size= m_rows_cur - m_rows_buf;
4199
 
    const size_t new_alloc=
4200
 
        block_size * ((cur_size + length + block_size - 1) / block_size);
4201
 
 
4202
 
    unsigned char* new_buf= (unsigned char*)realloc(m_rows_buf, new_alloc);
4203
 
    if (unlikely(!new_buf))
4204
 
      return(HA_ERR_OUT_OF_MEM);
4205
 
 
4206
 
    /* If the memory moved, we need to move the pointers */
4207
 
    if (new_buf != m_rows_buf)
4208
 
    {
4209
 
      m_rows_buf= new_buf;
4210
 
      m_rows_cur= m_rows_buf + cur_size;
4211
 
    }
4212
 
 
4213
 
    /*
4214
 
       The end pointer should always be changed to point to the end of
4215
 
       the allocated memory.
4216
 
    */
4217
 
    m_rows_end= m_rows_buf + new_alloc;
4218
 
  }
4219
 
 
4220
 
  assert(m_rows_cur + length <= m_rows_end);
4221
 
  memcpy(m_rows_cur, row_data, length);
4222
 
  m_rows_cur+= length;
4223
 
  m_row_count++;
4224
 
  return(0);
4225
 
}
4226
 
 
4227
 
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4228
 
{
4229
 
  int error= 0;
4230
 
  /*
4231
 
    If m_table_id == UINT32_MAX, then we have a dummy event that does not
4232
 
    contain any data.  In that case, we just remove all tables in the
4233
 
    tables_to_lock list, close the thread tables, and return with
4234
 
    success.
4235
 
   */
4236
 
  if (m_table_id == UINT32_MAX)
4237
 
  {
4238
 
    /*
4239
 
       This one is supposed to be set: just an extra check so that
4240
 
       nothing strange has happened.
4241
 
     */
4242
 
    assert(get_flags(STMT_END_F));
4243
 
 
4244
 
    const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4245
 
    close_thread_tables(session);
4246
 
    session->clear_error();
4247
 
    return(0);
4248
 
  }
4249
 
 
4250
 
  /*
4251
 
    'session' has been set by exec_relay_log_event(), just before calling
4252
 
    do_apply_event(). We still check here to prevent future coding
4253
 
    errors.
4254
 
  */
4255
 
  assert(rli->sql_session == session);
4256
 
 
4257
 
  /*
4258
 
    If there is no locks taken, this is the first binrow event seen
4259
 
    after the table map events.  We should then lock all the tables
4260
 
    used in the transaction and proceed with execution of the actual
4261
 
    event.
4262
 
  */
4263
 
  if (!session->lock)
4264
 
  {
4265
 
    bool need_reopen= 1; /* To execute the first lap of the loop below */
4266
 
 
4267
 
    /*
4268
 
      lock_tables() reads the contents of session->lex, so they must be
4269
 
      initialized. Contrary to in
4270
 
      Table_map_log_event::do_apply_event() we don't call
4271
 
      mysql_init_query() as that may reset the binlog format.
4272
 
    */
4273
 
    lex_start(session);
4274
 
 
4275
 
    /*
4276
 
      There are a few flags that are replicated with each row event.
4277
 
      Make sure to set/clear them before executing the main body of
4278
 
      the event.
4279
 
    */
4280
 
    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4281
 
        session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4282
 
    else
4283
 
        session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4284
 
 
4285
 
    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4286
 
        session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4287
 
    else
4288
 
        session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4289
 
    /* A small test to verify that objects have consistent types */
4290
 
    assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4291
 
 
4292
 
 
4293
 
    while ((error= lock_tables(session, rli->tables_to_lock,
4294
 
                               rli->tables_to_lock_count, &need_reopen)))
4295
 
    {
4296
 
      if (!need_reopen)
4297
 
      {
4298
 
        if (session->is_slave_error || session->is_fatal_error)
4299
 
        {
4300
 
          /*
4301
 
            Error reporting borrowed from Query_log_event with many excessive
4302
 
            simplifications (we don't honour --slave-skip-errors)
4303
 
          */
4304
 
          uint32_t actual_error= session->main_da.sql_errno();
4305
 
          rli->report(ERROR_LEVEL, actual_error,
4306
 
                      _("Error '%s' in %s event: when locking tables"),
4307
 
                      (actual_error
4308
 
                       ? session->main_da.message()
4309
 
                       : _("unexpected success or fatal error")),
4310
 
                      get_type_str());
4311
 
          session->is_fatal_error= 1;
4312
 
        }
4313
 
        else
4314
 
        {
4315
 
          rli->report(ERROR_LEVEL, error,
4316
 
                      _("Error in %s event: when locking tables"),
4317
 
                      get_type_str());
4318
 
        }
4319
 
        const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4320
 
        return(error);
4321
 
      }
4322
 
 
4323
 
      TableList *tables= rli->tables_to_lock;
4324
 
      close_tables_for_reopen(session, &tables);
4325
 
 
4326
 
      uint32_t tables_count= rli->tables_to_lock_count;
4327
 
      if ((error= open_tables(session, &tables, &tables_count, 0)))
4328
 
      {
4329
 
        if (session->is_slave_error || session->is_fatal_error)
4330
 
        {
4331
 
          /*
4332
 
            Error reporting borrowed from Query_log_event with many excessive
4333
 
            simplifications (we don't honour --slave-skip-errors)
4334
 
          */
4335
 
          uint32_t actual_error= session->main_da.sql_errno();
4336
 
          rli->report(ERROR_LEVEL, actual_error,
4337
 
                      _("Error '%s' on reopening tables"),
4338
 
                      (actual_error
4339
 
                       ? session->main_da.message()
4340
 
                       : _("unexpected success or fatal error")));
4341
 
          session->is_slave_error= 1;
4342
 
        }
4343
 
        const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4344
 
        return(error);
4345
 
      }
4346
 
    }
4347
 
 
4348
 
    /*
4349
 
      When the open and locking succeeded, we check all tables to
4350
 
      ensure that they still have the correct type.
4351
 
 
4352
 
      We can use a down cast here since we know that every table added
4353
 
      to the tables_to_lock is a RPL_TableList.
4354
 
    */
4355
 
 
4356
 
    {
4357
 
      RPL_TableList *ptr= rli->tables_to_lock;
4358
 
      for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
4359
 
      {
4360
 
        if (ptr->m_tabledef.compatible_with(rli, ptr->table))
4361
 
        {
4362
 
          mysql_unlock_tables(session, session->lock);
4363
 
          session->lock= 0;
4364
 
          session->is_slave_error= 1;
4365
 
          const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4366
 
          return(ERR_BAD_TABLE_DEF);
4367
 
        }
4368
 
      }
4369
 
    }
4370
 
 
4371
 
    /*
4372
 
      ... and then we add all the tables to the table map and remove
4373
 
      them from tables to lock.
4374
 
 
4375
 
      We also invalidate the query cache for all the tables, since
4376
 
      they will now be changed.
4377
 
 
4378
 
      TODO [/Matz]: Maybe the query cache should not be invalidated
4379
 
      here? It might be that a table is not changed, even though it
4380
 
      was locked for the statement.  We do know that each
4381
 
      Rows_log_event contain at least one row, so after processing one
4382
 
      Rows_log_event, we can invalidate the query cache for the
4383
 
      associated table.
4384
 
     */
4385
 
    for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
4386
 
    {
4387
 
      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
4388
 
    }
4389
 
  }
4390
 
 
4391
 
  Table*
4392
 
    table=
4393
 
    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
4394
 
 
4395
 
  if (table)
4396
 
  {
4397
 
    /*
4398
 
      table == NULL means that this table should not be replicated
4399
 
      (this was set up by Table_map_log_event::do_apply_event()
4400
 
      which tested replicate-* rules).
4401
 
    */
4402
 
 
4403
 
    /*
4404
 
      It's not needed to set_time() but
4405
 
      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
4406
 
      much slave is behind
4407
 
      2) it will be needed when we allow replication from a table with no
4408
 
      TIMESTAMP column to a table with one.
4409
 
      So we call set_time(), like in SBR. Presently it changes nothing.
4410
 
    */
4411
 
    session->set_time((time_t)when);
4412
 
    /*
4413
 
      There are a few flags that are replicated with each row event.
4414
 
      Make sure to set/clear them before executing the main body of
4415
 
      the event.
4416
 
    */
4417
 
    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4418
 
        session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4419
 
    else
4420
 
        session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4421
 
 
4422
 
    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4423
 
        session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4424
 
    else
4425
 
        session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4426
 
 
4427
 
    if (slave_allow_batching)
4428
 
      session->options|= OPTION_ALLOW_BATCH;
4429
 
    else
4430
 
      session->options&= ~OPTION_ALLOW_BATCH;
4431
 
 
4432
 
    /* A small test to verify that objects have consistent types */
4433
 
    assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4434
 
 
4435
 
    /*
4436
 
      Now we are in a statement and will stay in a statement until we
4437
 
      see a STMT_END_F.
4438
 
 
4439
 
      We set this flag here, before actually applying any rows, in
4440
 
      case the SQL thread is stopped and we need to detect that we're
4441
 
      inside a statement and halting abruptly might cause problems
4442
 
      when restarting.
4443
 
     */
4444
 
    const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
4445
 
 
4446
 
     if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
4447
 
      set_flags(COMPLETE_ROWS_F);
4448
 
 
4449
 
    /*
4450
 
      Set tables write and read sets.
4451
 
 
4452
 
      Read_set contains all slave columns (in case we are going to fetch
4453
 
      a complete record from slave)
4454
 
 
4455
 
      Write_set equals the m_cols bitmap sent from master but it can be
4456
 
      longer if slave has extra columns.
4457
 
     */
4458
 
 
4459
 
    bitmap_set_all(table->read_set);
4460
 
    bitmap_set_all(table->write_set);
4461
 
    if (!get_flags(COMPLETE_ROWS_F))
4462
 
      bitmap_intersect(table->write_set,&m_cols);
4463
 
 
4464
 
    this->slave_exec_mode= slave_exec_mode_options; // fix the mode
4465
 
 
4466
 
    // Do event specific preparations
4467
 
    error= do_before_row_operations(rli);
4468
 
 
4469
 
    // row processing loop
4470
 
 
4471
 
    while (error == 0 && m_curr_row < m_rows_end)
4472
 
    {
4473
 
      /* in_use can have been set to NULL in close_tables_for_reopen */
4474
 
      Session* old_session= table->in_use;
4475
 
      if (!table->in_use)
4476
 
        table->in_use= session;
4477
 
 
4478
 
      error= do_exec_row(rli);
4479
 
 
4480
 
      table->in_use = old_session;
4481
 
      switch (error)
4482
 
      {
4483
 
      case 0:
4484
 
        break;
4485
 
      /*
4486
 
        The following list of "idempotent" errors
4487
 
        means that an error from the list might happen
4488
 
        because of idempotent (more than once)
4489
 
        applying of a binlog file.
4490
 
        Notice, that binlog has a  ddl operation its
4491
 
        second applying may cause
4492
 
 
4493
 
        case HA_ERR_TABLE_DEF_CHANGED:
4494
 
        case HA_ERR_CANNOT_ADD_FOREIGN:
4495
 
 
4496
 
        which are not included into to the list.
4497
 
      */
4498
 
      case HA_ERR_RECORD_CHANGED:
4499
 
      case HA_ERR_RECORD_DELETED:
4500
 
      case HA_ERR_KEY_NOT_FOUND:
4501
 
      case HA_ERR_END_OF_FILE:
4502
 
      case HA_ERR_FOUND_DUPP_KEY:
4503
 
      case HA_ERR_FOUND_DUPP_UNIQUE:
4504
 
      case HA_ERR_FOREIGN_DUPLICATE_KEY:
4505
 
      case HA_ERR_NO_REFERENCED_ROW:
4506
 
      case HA_ERR_ROW_IS_REFERENCED:
4507
 
        if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
4508
 
        {
4509
 
          if (global_system_variables.log_warnings)
4510
 
            slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
4511
 
                                    get_type_str(),
4512
 
                                    RPL_LOG_NAME, (ulong) log_pos);
4513
 
          error= 0;
4514
 
        }
4515
 
        break;
4516
 
 
4517
 
      default:
4518
 
        session->is_slave_error= 1;
4519
 
        break;
4520
 
      }
4521
 
 
4522
 
      /*
4523
 
       If m_curr_row_end  was not set during event execution (e.g., because
4524
 
       of errors) we can't proceed to the next row. If the error is transient
4525
 
       (i.e., error==0 at this point) we must call unpack_current_row() to set
4526
 
       m_curr_row_end.
4527
 
      */
4528
 
      if (!m_curr_row_end && !error)
4529
 
        unpack_current_row(rli, &m_cols);
4530
 
 
4531
 
      // at this moment m_curr_row_end should be set
4532
 
      assert(error || m_curr_row_end != NULL);
4533
 
      assert(error || m_curr_row < m_curr_row_end);
4534
 
      assert(error || m_curr_row_end <= m_rows_end);
4535
 
 
4536
 
      m_curr_row= m_curr_row_end;
4537
 
 
4538
 
    } // row processing loop
4539
 
 
4540
 
    error= do_after_row_operations(rli, error);
4541
 
    if (!cache_stmt)
4542
 
    {
4543
 
      session->options|= OPTION_KEEP_LOG;
4544
 
    }
4545
 
  } // if (table)
4546
 
 
4547
 
  /*
4548
 
    We need to delay this clear until here bacause unpack_current_row() uses
4549
 
    master-side table definitions stored in rli.
4550
 
  */
4551
 
  if (rli->tables_to_lock && get_flags(STMT_END_F))
4552
 
    const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4553
 
  /* reset OPTION_ALLOW_BATCH as not affect later events */
4554
 
  session->options&= ~OPTION_ALLOW_BATCH;
4555
 
 
4556
 
  if (error)
4557
 
  {                     /* error has occured during the transaction */
4558
 
    slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
4559
 
                            get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
4560
 
  }
4561
 
  if (error)
4562
 
  {
4563
 
    /*
4564
 
      If one day we honour --skip-slave-errors in row-based replication, and
4565
 
      the error should be skipped, then we would clear mappings, rollback,
4566
 
      close tables, but the slave SQL thread would not stop and then may
4567
 
      assume the mapping is still available, the tables are still open...
4568
 
      So then we should clear mappings/rollback/close here only if this is a
4569
 
      STMT_END_F.
4570
 
      For now we code, knowing that error is not skippable and so slave SQL
4571
 
      thread is certainly going to stop.
4572
 
      rollback at the caller along with sbr.
4573
 
    */
4574
 
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
4575
 
    session->is_slave_error= 1;
4576
 
    return(error);
4577
 
  }
4578
 
 
4579
 
  /*
4580
 
    This code would ideally be placed in do_update_pos() instead, but
4581
 
    since we have no access to table there, we do the setting of
4582
 
    last_event_start_time here instead.
4583
 
  */
4584
 
  if (table && (table->s->primary_key == MAX_KEY) &&
4585
 
      !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
4586
 
  {
4587
 
    /*
4588
 
      ------------ Temporary fix until WL#2975 is implemented ---------
4589
 
 
4590
 
      This event is not the last one (no STMT_END_F). If we stop now
4591
 
      (in case of terminate_slave_thread()), how will we restart? We
4592
 
      have to restart from Table_map_log_event, but as this table is
4593
 
      not transactional, the rows already inserted will still be
4594
 
      present, and idempotency is not guaranteed (no PK) so we risk
4595
 
      that repeating leads to double insert. So we desperately try to
4596
 
      continue, hope we'll eventually leave this buggy situation (by
4597
 
      executing the final Rows_log_event). If we are in a hopeless
4598
 
      wait (reached end of last relay log and nothing gets appended
4599
 
      there), we timeout after one minute, and notify DBA about the
4600
 
      problem.  When WL#2975 is implemented, just remove the member
4601
 
      Relay_log_info::last_event_start_time and all its occurrences.
4602
 
    */
4603
 
    time_t t= time(0);
4604
 
 
4605
 
    /* don't trust time() all the time */
4606
 
    if (t == (time_t)-1)
4607
 
      return (-1);
4608
 
    const_cast<Relay_log_info*>(rli)->last_event_start_time= time(0);
4609
 
  }
4610
 
 
4611
 
  return(0);
4612
 
}
4613
 
 
4614
 
Log_event::enum_skip_reason
4615
 
Rows_log_event::do_shall_skip(Relay_log_info *rli)
4616
 
{
4617
 
  /*
4618
 
    If the slave skip counter is 1 and this event does not end a
4619
 
    statement, then we should not start executing on the next event.
4620
 
    Otherwise, we defer the decision to the normal skipping logic.
4621
 
  */
4622
 
  if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
4623
 
    return Log_event::EVENT_SKIP_IGNORE;
4624
 
  else
4625
 
    return Log_event::do_shall_skip(rli);
4626
 
}
4627
 
 
4628
 
int
4629
 
Rows_log_event::do_update_pos(Relay_log_info *rli)
4630
 
{
4631
 
  int error= 0;
4632
 
 
4633
 
  if (get_flags(STMT_END_F))
4634
 
  {
4635
 
    /*
4636
 
      If this event is not in a transaction, the call below will, if some
4637
 
      transactional storage engines are involved, commit the statement into
4638
 
      them and flush the pending event to binlog.
4639
 
      If this event is in a transaction, the call will do nothing, but a
4640
 
      Xid_log_event will come next which will, if some transactional engines
4641
 
      are involved, commit the transaction and flush the pending event to the
4642
 
      binlog.
4643
 
    */
4644
 
    error= ha_autocommit_or_rollback(session, 0);
4645
 
 
4646
 
    /*
4647
 
      Now what if this is not a transactional engine? we still need to
4648
 
      flush the pending event to the binlog; we did it with
4649
 
      session->binlog_flush_pending_rows_event(). Note that we imitate
4650
 
      what is done for real queries: a call to
4651
 
      ha_autocommit_or_rollback() (sometimes only if involves a
4652
 
      transactional engine), and a call to be sure to have the pending
4653
 
      event flushed.
4654
 
    */
4655
 
 
4656
 
    rli->cleanup_context(session, 0);
4657
 
    if (error == 0)
4658
 
    {
4659
 
      /*
4660
 
        Indicate that a statement is finished.
4661
 
        Step the group log position if we are not in a transaction,
4662
 
        otherwise increase the event log position.
4663
 
       */
4664
 
      rli->stmt_done(log_pos, when);
4665
 
 
4666
 
      /*
4667
 
        Clear any errors pushed in session->net.last_err* if for example "no key
4668
 
        found" (as this is allowed). This is a safety measure; apparently
4669
 
        those errors (e.g. when executing a Delete_rows_log_event of a
4670
 
        non-existing row, like in rpl_row_mystery22.test,
4671
 
        session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
4672
 
        do not become visible. We still prefer to wipe them out.
4673
 
      */
4674
 
      session->clear_error();
4675
 
    }
4676
 
    else
4677
 
      rli->report(ERROR_LEVEL, error,
4678
 
                  _("Error in %s event: commit of row events failed, "
4679
 
                    "table `%s`.`%s`"),
4680
 
                  get_type_str(), m_table->s->db.str,
4681
 
                  m_table->s->table_name.str);
4682
 
  }
4683
 
  else
4684
 
  {
4685
 
    rli->inc_event_relay_log_pos();
4686
 
  }
4687
 
 
4688
 
  return(error);
4689
 
}
4690
 
 
4691
 
bool Rows_log_event::write_data_header(IO_CACHE *file)
4692
 
{
4693
 
  unsigned char buf[ROWS_HEADER_LEN];   // No need to init the buffer
4694
 
  assert(m_table_id != UINT32_MAX);
4695
 
  int6store(buf + RW_MAPID_OFFSET, (uint64_t)m_table_id);
4696
 
  int2store(buf + RW_FLAGS_OFFSET, m_flags);
4697
 
  return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
4698
 
}
4699
 
 
4700
 
bool Rows_log_event::write_data_body(IO_CACHE*file)
4701
 
{
4702
 
  /*
4703
 
     Note that this should be the number of *bits*, not the number of
4704
 
     bytes.
4705
 
  */
4706
 
  unsigned char sbuf[sizeof(m_width)];
4707
 
  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
4708
 
  bool res= false;
4709
 
  unsigned char *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
4710
 
  assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
4711
 
 
4712
 
  res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
4713
 
 
4714
 
  res= res || my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
4715
 
                              no_bytes_in_map(&m_cols));
4716
 
  /*
4717
 
    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
4718
 
   */
4719
 
  if (get_type_code() == UPDATE_ROWS_EVENT)
4720
 
  {
4721
 
    res= res || my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
4722
 
                                no_bytes_in_map(&m_cols_ai));
4723
 
  }
4724
 
  res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
4725
 
 
4726
 
  return res;
4727
 
 
4728
 
}
4729
 
 
4730
 
 
4731
 
void Rows_log_event::pack_info(Protocol *protocol)
4732
 
{
4733
 
  char buf[256];
4734
 
  char const *const flagstr=
4735
 
    get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
4736
 
  size_t bytes= snprintf(buf, sizeof(buf),
4737
 
                         "table_id: %lu%s", m_table_id, flagstr);
4738
 
  protocol->store(buf, bytes, &my_charset_bin);
4739
 
}
4740
 
 
4741
 
 
4742
 
/**************************************************************************
4743
 
        Table_map_log_event member functions and support functions
4744
 
**************************************************************************/
4745
 
 
4746
 
/**
4747
 
  @page How replication of field metadata works.
4748
 
 
4749
 
  When a table map is created, the master first calls
4750
 
  Table_map_log_event::save_field_metadata() which calculates how many
4751
 
  values will be in the field metadata. Only those fields that require the
4752
 
  extra data are added. The method also loops through all of the fields in
4753
 
  the table calling the method Field::save_field_metadata() which returns the
4754
 
  values for the field that will be saved in the metadata and replicated to
4755
 
  the slave. Once all fields have been processed, the table map is written to
4756
 
  the binlog adding the size of the field metadata and the field metadata to
4757
 
  the end of the body of the table map.
4758
 
 
4759
 
  When a table map is read on the slave, the field metadata is read from the
4760
 
  table map and passed to the table_def class constructor which saves the
4761
 
  field metadata from the table map into an array based on the type of the
4762
 
  field. Field metadata values not present (those fields that do not use extra
4763
 
  data) in the table map are initialized as zero (0). The array size is the
4764
 
  same as the columns for the table on the slave.
4765
 
 
4766
 
  Additionally, values saved for field metadata on the master are saved as a
4767
 
  string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
4768
 
  to store the information. In cases where values require multiple bytes
4769
 
  (e.g. values > 255), the endian-safe methods are used to properly encode
4770
 
  the values on the master and decode them on the slave. When the field
4771
 
  metadata values are captured on the slave, they are stored in an array of
4772
 
  type uint16_t. This allows the least number of casts to prevent casting bugs
4773
 
  when the field metadata is used in comparisons of field attributes. When
4774
 
  the field metadata is used for calculating addresses in pointer math, the
4775
 
  type used is uint32_t.
4776
 
*/
4777
 
 
4778
 
/**
4779
 
  Save the field metadata based on the real_type of the field.
4780
 
  The metadata saved depends on the type of the field. Some fields
4781
 
  store a single byte for pack_length() while others store two bytes
4782
 
  for field_length (max length).
4783
 
 
4784
 
  @retval  0  Ok.
4785
 
 
4786
 
  @todo
4787
 
  We may want to consider changing the encoding of the information.
4788
 
  Currently, the code attempts to minimize the number of bytes written to
4789
 
  the tablemap. There are at least two other alternatives; 1) using
4790
 
  net_store_length() to store the data allowing it to choose the number of
4791
 
  bytes that are appropriate thereby making the code much easier to
4792
 
  maintain (only 1 place to change the encoding), or 2) use a fixed number
4793
 
  of bytes for each field. The problem with option 1 is that net_store_length()
4794
 
  will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
4795
 
  for fields like CHAR which can be no larger than 255 characters, the method
4796
 
  will use 3 bytes when the value is > 250. Further, every value that is
4797
 
  encoded using 2 parts (e.g., pack_length, field_length) will be numerically
4798
 
  > 250 therefore will use 3 bytes for eah value. The problem with option 2
4799
 
  is less wasteful for space but does waste 1 byte for every field that does
4800
 
  not encode 2 parts.
4801
 
*/
4802
 
int Table_map_log_event::save_field_metadata()
4803
 
{
4804
 
  int index= 0;
4805
 
  for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
4806
 
    index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
4807
 
  return(index);
4808
 
}
4809
 
 
4810
 
/*
4811
 
  Constructor used to build an event for writing to the binary log.
4812
 
  Mats says tbl->s lives longer than this event so it's ok to copy pointers
4813
 
  (tbl->s->db etc) and not pointer content.
4814
 
 */
4815
 
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl,
4816
 
                                         ulong tid, bool, uint16_t flags)
4817
 
  : Log_event(session, 0, true),
4818
 
    m_table(tbl),
4819
 
    m_dbnam(tbl->s->db.str),
4820
 
    m_dblen(m_dbnam ? tbl->s->db.length : 0),
4821
 
    m_tblnam(tbl->s->table_name.str),
4822
 
    m_tbllen(tbl->s->table_name.length),
4823
 
    m_colcnt(tbl->s->fields),
4824
 
    m_memory(NULL),
4825
 
    m_table_id(tid),
4826
 
    m_flags(flags),
4827
 
    m_data_size(0),
4828
 
    m_field_metadata(0),
4829
 
    m_field_metadata_size(0),
4830
 
    m_null_bits(0),
4831
 
    m_meta_memory(NULL)
4832
 
{
4833
 
  assert(m_table_id != UINT32_MAX);
4834
 
  /*
4835
 
    In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
4836
 
    table.cc / alloc_table_share():
4837
 
      Use the fact the key is db/0/table_name/0
4838
 
    As we rely on this let's assert it.
4839
 
  */
4840
 
  assert((tbl->s->db.str == 0) ||
4841
 
              (tbl->s->db.str[tbl->s->db.length] == 0));
4842
 
  assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
4843
 
 
4844
 
 
4845
 
  m_data_size=  TABLE_MAP_HEADER_LEN;
4846
 
  m_data_size+= m_dblen + 2;    // Include length and terminating \0
4847
 
  m_data_size+= m_tbllen + 2;   // Include length and terminating \0
4848
 
  m_data_size+= 1 + m_colcnt;   // COLCNT and column types
4849
 
 
4850
 
  /* If malloc fails, caught in is_valid() */
4851
 
  if ((m_memory= (unsigned char*) malloc(m_colcnt)))
4852
 
  {
4853
 
    m_coltype= reinterpret_cast<unsigned char*>(m_memory);
4854
 
    for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4855
 
      m_coltype[i]= m_table->field[i]->type();
4856
 
  }
4857
 
 
4858
 
  /*
4859
 
    Calculate a bitmap for the results of maybe_null() for all columns.
4860
 
    The bitmap is used to determine when there is a column from the master
4861
 
    that is not on the slave and is null and thus not in the row data during
4862
 
    replication.
4863
 
  */
4864
 
  uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
4865
 
  m_data_size+= num_null_bytes;
4866
 
  m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
4867
 
                                 &m_null_bits, num_null_bytes,
4868
 
                                 &m_field_metadata, (m_colcnt * 2),
4869
 
                                 NULL);
4870
 
 
4871
 
  memset(m_field_metadata, 0, (m_colcnt * 2));
4872
 
 
4873
 
  /*
4874
 
    Create an array for the field metadata and store it.
4875
 
  */
4876
 
  m_field_metadata_size= save_field_metadata();
4877
 
  assert(m_field_metadata_size <= (m_colcnt * 2));
4878
 
 
4879
 
  /*
4880
 
    Now set the size of the data to the size of the field metadata array
4881
 
    plus one or two bytes for number of elements in the field metadata array.
4882
 
  */
4883
 
  if (m_field_metadata_size > 255)
4884
 
    m_data_size+= m_field_metadata_size + 2;
4885
 
  else
4886
 
    m_data_size+= m_field_metadata_size + 1;
4887
 
 
4888
 
  memset(m_null_bits, 0, num_null_bytes);
4889
 
  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4890
 
    if (m_table->field[i]->maybe_null())
4891
 
      m_null_bits[(i / 8)]+= 1 << (i % 8);
4892
 
 
4893
 
}
4894
 
 
4895
 
 
4896
 
/*
4897
 
  Constructor used by slave to read the event from the binary log.
4898
 
 */
4899
 
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
4900
 
                                         const Format_description_log_event
4901
 
                                         *description_event)
4902
 
 
4903
 
  : Log_event(buf, description_event),
4904
 
    m_table(NULL),
4905
 
    m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
4906
 
    m_colcnt(0), m_coltype(0),
4907
 
    m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
4908
 
    m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
4909
 
    m_null_bits(0), m_meta_memory(NULL)
4910
 
{
4911
 
  unsigned int bytes_read= 0;
4912
 
 
4913
 
  uint8_t common_header_len= description_event->common_header_len;
4914
 
  uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
4915
 
 
4916
 
  /* Read the post-header */
4917
 
  const char *post_start= buf + common_header_len;
4918
 
 
4919
 
  post_start+= TM_MAPID_OFFSET;
4920
 
  if (post_header_len == 6)
4921
 
  {
4922
 
    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4923
 
    m_table_id= uint4korr(post_start);
4924
 
    post_start+= 4;
4925
 
  }
4926
 
  else
4927
 
  {
4928
 
    assert(post_header_len == TABLE_MAP_HEADER_LEN);
4929
 
    m_table_id= (ulong) uint6korr(post_start);
4930
 
    post_start+= TM_FLAGS_OFFSET;
4931
 
  }
4932
 
 
4933
 
  assert(m_table_id != UINT32_MAX);
4934
 
 
4935
 
  m_flags= uint2korr(post_start);
4936
 
 
4937
 
  /* Read the variable part of the event */
4938
 
  const char *const vpart= buf + common_header_len + post_header_len;
4939
 
 
4940
 
  /* Extract the length of the various parts from the buffer */
4941
 
  unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
4942
 
  m_dblen= *(unsigned char*) ptr_dblen;
4943
 
 
4944
 
  /* Length of database name + counter + terminating null */
4945
 
  unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
4946
 
  m_tbllen= *(unsigned char*) ptr_tbllen;
4947
 
 
4948
 
  /* Length of table name + counter + terminating null */
4949
 
  unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
4950
 
  unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
4951
 
  m_colcnt= net_field_length(&ptr_after_colcnt);
4952
 
 
4953
 
  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
4954
 
  m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
4955
 
                                     &m_dbnam, (uint) m_dblen + 1,
4956
 
                                     &m_tblnam, (uint) m_tbllen + 1,
4957
 
                                     &m_coltype, (uint) m_colcnt,
4958
 
                                     NULL);
4959
 
 
4960
 
  if (m_memory)
4961
 
  {
4962
 
    /* Copy the different parts into their memory */
4963
 
    strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen  + 1, m_dblen + 1);
4964
 
    strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
4965
 
    memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
4966
 
 
4967
 
    ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
4968
 
    bytes_read= ptr_after_colcnt - (unsigned char *)buf;
4969
 
    if (bytes_read < event_len)
4970
 
    {
4971
 
      m_field_metadata_size= net_field_length(&ptr_after_colcnt);
4972
 
      assert(m_field_metadata_size <= (m_colcnt * 2));
4973
 
      uint32_t num_null_bytes= (m_colcnt + 7) / 8;
4974
 
      m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
4975
 
                                     &m_null_bits, num_null_bytes,
4976
 
                                     &m_field_metadata, m_field_metadata_size,
4977
 
                                     NULL);
4978
 
      memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
4979
 
      ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
4980
 
      memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
4981
 
    }
4982
 
  }
4983
 
 
4984
 
  return;
4985
 
}
4986
 
 
4987
 
Table_map_log_event::~Table_map_log_event()
4988
 
{
4989
 
  free(m_meta_memory);
4990
 
  free(m_memory);
4991
 
}
4992
 
 
4993
 
/*
4994
 
  Return value is an error code, one of:
4995
 
 
4996
 
      -1     Failure to open table   [from open_tables()]
4997
 
       0     Success
4998
 
       1     No room for more tables [from set_table()]
4999
 
       2     Out of memory           [from set_table()]
5000
 
       3     Wrong table definition
5001
 
       4     Daisy-chaining RBR with SBR not possible
5002
 
 */
5003
 
 
5004
 
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5005
 
{
5006
 
  RPL_TableList *table_list;
5007
 
  char *db_mem, *tname_mem;
5008
 
  Query_id &query_id= Query_id::get_query_id();
5009
 
  void *memory;
5010
 
  assert(rli->sql_session == session);
5011
 
 
5012
 
  /* Step the query id to mark what columns that are actually used. */
5013
 
  session->query_id= query_id.next();
5014
 
 
5015
 
  if (!(memory= my_multi_malloc(MYF(MY_WME),
5016
 
                                &table_list, (uint) sizeof(RPL_TableList),
5017
 
                                &db_mem, (uint) NAME_LEN + 1,
5018
 
                                &tname_mem, (uint) NAME_LEN + 1,
5019
 
                                NULL)))
5020
 
    return(HA_ERR_OUT_OF_MEM);
5021
 
 
5022
 
  memset(table_list, 0, sizeof(*table_list));
5023
 
  table_list->db = db_mem;
5024
 
  table_list->alias= table_list->table_name = tname_mem;
5025
 
  table_list->lock_type= TL_WRITE;
5026
 
  table_list->next_global= table_list->next_local= 0;
5027
 
  table_list->table_id= m_table_id;
5028
 
  table_list->updating= 1;
5029
 
  strcpy(table_list->db, m_dbnam);
5030
 
  strcpy(table_list->table_name, m_tblnam);
5031
 
 
5032
 
  int error= 0;
5033
 
 
5034
 
  {
5035
 
    /*
5036
 
      open_tables() reads the contents of session->lex, so they must be
5037
 
      initialized, so we should call lex_start(); to be even safer, we
5038
 
      call mysql_init_query() which does a more complete set of inits.
5039
 
    */
5040
 
    lex_start(session);
5041
 
    session->reset_for_next_command();
5042
 
 
5043
 
    /*
5044
 
      Open the table if it is not already open and add the table to
5045
 
      table map.  Note that for any table that should not be
5046
 
      replicated, a filter is needed.
5047
 
 
5048
 
      The creation of a new TableList is used to up-cast the
5049
 
      table_list consisting of RPL_TableList items. This will work
5050
 
      since the only case where the argument to open_tables() is
5051
 
      changed, is when session->lex->query_tables == table_list, i.e.,
5052
 
      when the statement requires prelocking. Since this is not
5053
 
      executed when a statement is executed, this case will not occur.
5054
 
      As a precaution, an assertion is added to ensure that the bad
5055
 
      case is not a fact.
5056
 
 
5057
 
      Either way, the memory in the list is *never* released
5058
 
      internally in the open_tables() function, hence we take a copy
5059
 
      of the pointer to make sure that it's not lost.
5060
 
    */
5061
 
    uint32_t count;
5062
 
    assert(session->lex->query_tables != table_list);
5063
 
    TableList *tmp_table_list= table_list;
5064
 
    if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5065
 
    {
5066
 
      if (session->is_slave_error || session->is_fatal_error)
5067
 
      {
5068
 
        /*
5069
 
          Error reporting borrowed from Query_log_event with many excessive
5070
 
          simplifications (we don't honour --slave-skip-errors)
5071
 
        */
5072
 
        uint32_t actual_error= session->main_da.sql_errno();
5073
 
        rli->report(ERROR_LEVEL, actual_error,
5074
 
                    _("Error '%s' on opening table `%s`.`%s`"),
5075
 
                    (actual_error
5076
 
                     ? session->main_da.message()
5077
 
                     : _("unexpected success or fatal error")),
5078
 
                    table_list->db, table_list->table_name);
5079
 
        session->is_slave_error= 1;
5080
 
      }
5081
 
      goto err;
5082
 
    }
5083
 
 
5084
 
    m_table= table_list->table;
5085
 
 
5086
 
    /*
5087
 
      This will fail later otherwise, the 'in_use' field should be
5088
 
      set to the current thread.
5089
 
    */
5090
 
    assert(m_table->in_use);
5091
 
 
5092
 
    /*
5093
 
      Use placement new to construct the table_def instance in the
5094
 
      memory allocated for it inside table_list.
5095
 
 
5096
 
      The memory allocated by the table_def structure (i.e., not the
5097
 
      memory allocated *for* the table_def structure) is released
5098
 
      inside Relay_log_info::clear_tables_to_lock() by calling the
5099
 
      table_def destructor explicitly.
5100
 
    */
5101
 
    new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5102
 
         m_field_metadata, m_field_metadata_size, m_null_bits);
5103
 
    table_list->m_tabledef_valid= true;
5104
 
 
5105
 
    /*
5106
 
      We record in the slave's information that the table should be
5107
 
      locked by linking the table into the list of tables to lock.
5108
 
    */
5109
 
    table_list->next_global= table_list->next_local= rli->tables_to_lock;
5110
 
    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5111
 
    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5112
 
    /* 'memory' is freed in clear_tables_to_lock */
5113
 
  }
5114
 
 
5115
 
  return(error);
5116
 
 
5117
 
err:
5118
 
  free(memory);
5119
 
  return(error);
5120
 
}
5121
 
 
5122
 
Log_event::enum_skip_reason
5123
 
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
5124
 
{
5125
 
  /*
5126
 
    If the slave skip counter is 1, then we should not start executing
5127
 
    on the next event.
5128
 
  */
5129
 
  return continue_group(rli);
5130
 
}
5131
 
 
5132
 
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
5133
 
{
5134
 
  rli->inc_event_relay_log_pos();
5135
 
  return 0;
5136
 
}
5137
 
 
5138
 
 
5139
 
bool Table_map_log_event::write_data_header(IO_CACHE *file)
5140
 
{
5141
 
  assert(m_table_id != UINT32_MAX);
5142
 
  unsigned char buf[TABLE_MAP_HEADER_LEN];
5143
 
  int6store(buf + TM_MAPID_OFFSET, (uint64_t)m_table_id);
5144
 
  int2store(buf + TM_FLAGS_OFFSET, m_flags);
5145
 
  return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
5146
 
}
5147
 
 
5148
 
bool Table_map_log_event::write_data_body(IO_CACHE *file)
5149
 
{
5150
 
  assert(m_dbnam != NULL);
5151
 
  assert(m_tblnam != NULL);
5152
 
  /* We use only one byte per length for storage in event: */
5153
 
  assert(m_dblen < 128);
5154
 
  assert(m_tbllen < 128);
5155
 
 
5156
 
  unsigned char const dbuf[]= { (unsigned char) m_dblen };
5157
 
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };
5158
 
 
5159
 
  unsigned char cbuf[sizeof(m_colcnt)];
5160
 
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
5161
 
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
5162
 
 
5163
 
  /*
5164
 
    Store the size of the field metadata.
5165
 
  */
5166
 
  unsigned char mbuf[sizeof(m_field_metadata_size)];
5167
 
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
5168
 
 
5169
 
  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
5170
 
          my_b_safe_write(file, (const unsigned char*)m_dbnam,   m_dblen+1) ||
5171
 
          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
5172
 
          my_b_safe_write(file, (const unsigned char*)m_tblnam,  m_tbllen+1) ||
5173
 
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
5174
 
          my_b_safe_write(file, m_coltype, m_colcnt) ||
5175
 
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
5176
 
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
5177
 
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
5178
 
 }
5179
 
 
5180
 
 
5181
 
/*
5182
 
  Print some useful information for the SHOW BINARY LOG information
5183
 
  field.
5184
 
 */
5185
 
 
5186
 
void Table_map_log_event::pack_info(Protocol *protocol)
5187
 
{
5188
 
    char buf[256];
5189
 
    size_t bytes= snprintf(buf, sizeof(buf),
5190
 
                           "table_id: %lu (%s.%s)",
5191
 
                           m_table_id, m_dbnam, m_tblnam);
5192
 
    protocol->store(buf, bytes, &my_charset_bin);
5193
 
}
5194
 
 
5195
 
 
5196
 
/**************************************************************************
5197
 
        Write_rows_log_event member functions
5198
 
**************************************************************************/
5199
 
 
5200
 
/*
5201
 
  Constructor used to build an event for writing to the binary log.
5202
 
 */
5203
 
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
5204
 
                                           ulong tid_arg,
5205
 
                                           bool is_transactional)
5206
 
  : Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
5207
 
{
5208
 
}
5209
 
 
5210
 
/*
5211
 
  Constructor used by slave to read the event from the binary log.
5212
 
 */
5213
 
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
5214
 
                                           const Format_description_log_event
5215
 
                                           *description_event)
5216
 
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
5217
 
{
5218
 
}
5219
 
 
5220
 
int
5221
 
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5222
 
{
5223
 
  int error= 0;
5224
 
 
5225
 
  /**
5226
 
     todo: to introduce a property for the event (handler?) which forces
5227
 
     applying the event in the replace (idempotent) fashion.
5228
 
  */
5229
 
  if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5230
 
  {
5231
 
    /*
5232
 
      We are using REPLACE semantics and not INSERT IGNORE semantics
5233
 
      when writing rows, that is: new rows replace old rows.  We need to
5234
 
      inform the storage engine that it should use this behaviour.
5235
 
    */
5236
 
 
5237
 
    /* Tell the storage engine that we are using REPLACE semantics. */
5238
 
    session->lex->duplicates= DUP_REPLACE;
5239
 
 
5240
 
    /*
5241
 
      Pretend we're executing a REPLACE command: this is needed for
5242
 
      InnoDB since it is not (properly) checking the
5243
 
      lex->duplicates flag.
5244
 
    */
5245
 
    session->lex->sql_command= SQLCOM_REPLACE;
5246
 
    /*
5247
 
       Do not raise the error flag in case of hitting to an unique attribute
5248
 
    */
5249
 
    m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5250
 
  }
5251
 
 
5252
 
  m_table->file->ha_start_bulk_insert(0);
5253
 
  /*
5254
 
    We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
5255
 
    any TIMESTAMP column with data from the row but instead will use
5256
 
    the event's current time.
5257
 
    As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
5258
 
    columns, we know that all TIMESTAMP columns on slave will receive explicit
5259
 
    data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
5260
 
    When we allow a table without TIMESTAMP to be replicated to a table having
5261
 
    more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
5262
 
    column to be replicated into a BIGINT column and the slave's table has a
5263
 
    TIMESTAMP column, then the slave's TIMESTAMP column will take its value
5264
 
    from set_time() which we called earlier (consistent with SBR). And then in
5265
 
    some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
5266
 
    analyze if explicit data is provided for slave's TIMESTAMP columns).
5267
 
  */
5268
 
  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
5269
 
 
5270
 
  return error;
5271
 
}
5272
 
 
5273
 
int
5274
 
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5275
 
                                              int error)
5276
 
{
5277
 
  int local_error= 0;
5278
 
  if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5279
 
  {
5280
 
    m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5281
 
    m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5282
 
    /*
5283
 
      resetting the extra with
5284
 
      table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
5285
 
      fires bug#27077
5286
 
      explanation: file->reset() performs this duty
5287
 
      ultimately. Still todo: fix
5288
 
    */
5289
 
  }
5290
 
  if ((local_error= m_table->file->ha_end_bulk_insert()))
5291
 
  {
5292
 
    m_table->file->print_error(local_error, MYF(0));
5293
 
  }
5294
 
  return error? error : local_error;
5295
 
}
5296
 
 
5297
 
 
5298
 
/*
5299
 
  Check if there are more UNIQUE keys after the given key.
5300
 
*/
5301
 
static int
5302
 
last_uniq_key(Table *table, uint32_t keyno)
5303
 
{
5304
 
  while (++keyno < table->s->keys)
5305
 
    if (table->key_info[keyno].flags & HA_NOSAME)
5306
 
      return 0;
5307
 
  return 1;
5308
 
}
5309
 
 
5310
 
/**
5311
 
   Check if an error is a duplicate key error.
5312
 
 
5313
 
   This function is used to check if an error code is one of the
5314
 
   duplicate key error, i.e., and error code for which it is sensible
5315
 
   to do a <code>get_dup_key()</code> to retrieve the duplicate key.
5316
 
 
5317
 
   @param errcode The error code to check.
5318
 
 
5319
 
   @return <code>true</code> if the error code is such that
5320
 
   <code>get_dup_key()</code> will return true, <code>false</code>
5321
 
   otherwise.
5322
 
 */
5323
 
bool
5324
 
is_duplicate_key_error(int errcode)
5325
 
{
5326
 
  switch (errcode)
5327
 
  {
5328
 
  case HA_ERR_FOUND_DUPP_KEY:
5329
 
  case HA_ERR_FOUND_DUPP_UNIQUE:
5330
 
    return true;
5331
 
  }
5332
 
  return false;
5333
 
}
5334
 
 
5335
 
/**
5336
 
  Write the current row into event's table.
5337
 
 
5338
 
  The row is located in the row buffer, pointed by @c m_curr_row member.
5339
 
  Number of columns of the row is stored in @c m_width member (it can be
5340
 
  different from the number of columns in the table to which we insert).
5341
 
  Bitmap @c m_cols indicates which columns are present in the row. It is assumed
5342
 
  that event's table is already open and pointed by @c m_table.
5343
 
 
5344
 
  If the same record already exists in the table it can be either overwritten
5345
 
  or an error is reported depending on the value of @c overwrite flag
5346
 
  (error reporting not yet implemented). Note that the matching record can be
5347
 
  different from the row we insert if we use primary keys to identify records in
5348
 
  the table.
5349
 
 
5350
 
  The row to be inserted can contain values only for selected columns. The
5351
 
  missing columns are filled with default values using @c prepare_record()
5352
 
  function. If a matching record is found in the table and @c overwritte is
5353
 
  true, the missing columns are taken from it.
5354
 
 
5355
 
  @param  rli   Relay log info (needed for row unpacking).
5356
 
  @param  overwrite
5357
 
                Shall we overwrite if the row already exists or signal
5358
 
                error (currently ignored).
5359
 
 
5360
 
  @returns Error code on failure, 0 on success.
5361
 
 
5362
 
  This method, if successful, sets @c m_curr_row_end pointer to point at the
5363
 
  next row in the rows buffer. This is done when unpacking the row to be
5364
 
  inserted.
5365
 
 
5366
 
  @note If a matching record is found, it is either updated using
5367
 
  @c ha_update_row() or first deleted and then new record written.
5368
 
*/
5369
 
 
5370
 
int
5371
 
Rows_log_event::write_row(const Relay_log_info *const rli,
5372
 
                          const bool overwrite)
5373
 
{
5374
 
  assert(m_table != NULL && session != NULL);
5375
 
 
5376
 
  Table *table= m_table;  // pointer to event's table
5377
 
  int error;
5378
 
  int keynum;
5379
 
  basic_string<unsigned char> key;
5380
 
 
5381
 
  /* fill table->record[0] with default values */
5382
 
 
5383
 
  /*
5384
 
     We only check if the columns have default values for non-NDB
5385
 
     engines, for NDB we ignore the check since updates are sent as
5386
 
     writes, causing errors when trying to prepare the record.
5387
 
 
5388
 
     TODO[ndb]: Elimiate this hard-coded dependency on NDB. Ideally,
5389
 
     the engine should be able to set a flag that it want the default
5390
 
     values filled in and one flag to handle the case that the default
5391
 
     values should be checked. Maybe these two flags can be combined.
5392
 
  */
5393
 
  if ((error= prepare_record(table, &m_cols, m_width, true)))
5394
 
    return(error);
5395
 
 
5396
 
  /* unpack row into table->record[0] */
5397
 
  error= unpack_current_row(rli, &m_cols);
5398
 
 
5399
 
  // Temporary fix to find out why it fails [/Matz]
5400
 
  memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
5401
 
 
5402
 
  /*
5403
 
    Try to write record. If a corresponding record already exists in the table,
5404
 
    we try to change it using ha_update_row() if possible. Otherwise we delete
5405
 
    it and repeat the whole process again.
5406
 
 
5407
 
    TODO: Add safety measures against infinite looping.
5408
 
   */
5409
 
 
5410
 
  while ((error= table->file->ha_write_row(table->record[0])))
5411
 
  {
5412
 
    if (error == HA_ERR_LOCK_DEADLOCK ||
5413
 
        error == HA_ERR_LOCK_WAIT_TIMEOUT ||
5414
 
        (keynum= table->file->get_dup_key(error)) < 0 ||
5415
 
        !overwrite)
5416
 
    {
5417
 
      /*
5418
 
        Deadlock, waiting for lock or just an error from the handler
5419
 
        such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
5420
 
        Retrieval of the duplicate key number may fail
5421
 
        - either because the error was not "duplicate key" error
5422
 
        - or because the information which key is not available
5423
 
      */
5424
 
      table->file->print_error(error, MYF(0));
5425
 
      return(error);
5426
 
    }
5427
 
    /*
5428
 
       We need to retrieve the old row into record[1] to be able to
5429
 
       either update or delete the offending record.  We either:
5430
 
 
5431
 
       - use rnd_pos() with a row-id (available as dupp_row) to the
5432
 
         offending row, if that is possible (MyISAM and Blackhole), or else
5433
 
 
5434
 
       - use index_read_idx() with the key that is duplicated, to
5435
 
         retrieve the offending row.
5436
 
     */
5437
 
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
5438
 
    {
5439
 
      if (table->file->inited && (error= table->file->ha_index_end()))
5440
 
        return(error);
5441
 
      if ((error= table->file->ha_rnd_init(false)))
5442
 
        return(error);
5443
 
 
5444
 
      error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
5445
 
      table->file->ha_rnd_end();
5446
 
      if (error)
5447
 
      {
5448
 
        table->file->print_error(error, MYF(0));
5449
 
        return(error);
5450
 
      }
5451
 
    }
5452
 
    else
5453
 
    {
5454
 
      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
5455
 
      {
5456
 
        return(my_errno);
5457
 
      }
5458
 
 
5459
 
      key.reserve(table->s->max_unique_length);
5460
 
 
5461
 
      key_copy(key, table->record[0], table->key_info + keynum, 0);
5462
 
      error= table->file->index_read_idx_map(table->record[1], keynum,
5463
 
                                             key.data(),
5464
 
                                             HA_WHOLE_KEY,
5465
 
                                             HA_READ_KEY_EXACT);
5466
 
      if (error)
5467
 
      {
5468
 
        table->file->print_error(error, MYF(0));
5469
 
        return(error);
5470
 
      }
5471
 
    }
5472
 
 
5473
 
    /*
5474
 
       Now, record[1] should contain the offending row.  That
5475
 
       will enable us to update it or, alternatively, delete it (so
5476
 
       that we can insert the new row afterwards).
5477
 
     */
5478
 
 
5479
 
    /*
5480
 
      If row is incomplete we will use the record found to fill
5481
 
      missing columns.
5482
 
    */
5483
 
    if (!get_flags(COMPLETE_ROWS_F))
5484
 
    {
5485
 
      restore_record(table,record[1]);
5486
 
      error= unpack_current_row(rli, &m_cols);
5487
 
    }
5488
 
 
5489
 
    /*
5490
 
       REPLACE is defined as either INSERT or DELETE + INSERT.  If
5491
 
       possible, we can replace it with an UPDATE, but that will not
5492
 
       work on InnoDB if FOREIGN KEY checks are necessary.
5493
 
 
5494
 
       I (Matz) am not sure of the reason for the last_uniq_key()
5495
 
       check as, but I'm guessing that it's something along the
5496
 
       following lines.
5497
 
 
5498
 
       Suppose that we got the duplicate key to be a key that is not
5499
 
       the last unique key for the table and we perform an update:
5500
 
       then there might be another key for which the unique check will
5501
 
       fail, so we're better off just deleting the row and inserting
5502
 
       the correct row.
5503
 
     */
5504
 
    if (last_uniq_key(table, keynum) &&
5505
 
        !table->file->referenced_by_foreign_key())
5506
 
    {
5507
 
      error=table->file->ha_update_row(table->record[1],
5508
 
                                       table->record[0]);
5509
 
      switch (error) {
5510
 
 
5511
 
      case HA_ERR_RECORD_IS_THE_SAME:
5512
 
        error= 0;
5513
 
 
5514
 
      case 0:
5515
 
        break;
5516
 
 
5517
 
      default:
5518
 
        table->file->print_error(error, MYF(0));
5519
 
      }
5520
 
 
5521
 
      return(error);
5522
 
    }
5523
 
    else
5524
 
    {
5525
 
      if ((error= table->file->ha_delete_row(table->record[1])))
5526
 
      {
5527
 
        table->file->print_error(error, MYF(0));
5528
 
        return(error);
5529
 
      }
5530
 
      /* Will retry ha_write_row() with the offending row removed. */
5531
 
    }
5532
 
  }
5533
 
 
5534
 
  return(error);
5535
 
}
5536
 
 
5537
 
 
5538
 
int Rows_log_event::unpack_current_row(const Relay_log_info *const rli,
5539
 
                                         MY_BITMAP const *cols)
5540
 
{
5541
 
  assert(m_table);
5542
 
  ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
5543
 
  int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
5544
 
                                 &m_curr_row_end, &m_master_reclength);
5545
 
  if (m_curr_row_end > m_rows_end)
5546
 
    my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
5547
 
  ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
5548
 
  return result;
5549
 
}
5550
 
 
5551
 
 
5552
 
int
5553
 
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5554
 
{
5555
 
  assert(m_table != NULL);
5556
 
  int error=
5557
 
    write_row(rli,        /* if 1 then overwrite */
5558
 
              bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
5559
 
 
5560
 
  if (error && !session->is_error())
5561
 
  {
5562
 
    assert(0);
5563
 
    my_error(ER_UNKNOWN_ERROR, MYF(0));
5564
 
  }
5565
 
 
5566
 
  return error;
5567
 
}
5568
 
 
5569
 
 
5570
 
/**************************************************************************
5571
 
        Delete_rows_log_event member functions
5572
 
**************************************************************************/
5573
 
 
5574
 
/*
5575
 
  Compares table->record[0] and table->record[1]
5576
 
 
5577
 
  Returns TRUE if different.
5578
 
*/
5579
 
static bool record_compare(Table *table)
5580
 
{
5581
 
  /*
5582
 
    Need to set the X bit and the filler bits in both records since
5583
 
    there are engines that do not set it correctly.
5584
 
 
5585
 
    In addition, since MyISAM checks that one hasn't tampered with the
5586
 
    record, it is necessary to restore the old bytes into the record
5587
 
    after doing the comparison.
5588
 
 
5589
 
    TODO[record format ndb]: Remove it once NDB returns correct
5590
 
    records. Check that the other engines also return correct records.
5591
 
   */
5592
 
  bool result= false;
5593
 
  unsigned char saved_x[2], saved_filler[2];
5594
 
 
5595
 
  if (table->s->null_bytes > 0)
5596
 
  {
5597
 
    for (int i = 0 ; i < 2 ; ++i)
5598
 
    {
5599
 
      saved_x[i]= table->record[i][0];
5600
 
      saved_filler[i]= table->record[i][table->s->null_bytes - 1];
5601
 
      table->record[i][0]|= 1U;
5602
 
      table->record[i][table->s->null_bytes - 1]|=
5603
 
        256U - (1U << table->s->last_null_bit_pos);
5604
 
    }
5605
 
  }
5606
 
 
5607
 
  if (table->s->blob_fields + table->s->varchar_fields == 0)
5608
 
  {
5609
 
    result= cmp_record(table,record[1]);
5610
 
    goto record_compare_exit;
5611
 
  }
5612
 
 
5613
 
  /* Compare null bits */
5614
 
  if (memcmp(table->null_flags,
5615
 
             table->null_flags+table->s->rec_buff_length,
5616
 
             table->s->null_bytes))
5617
 
  {
5618
 
    result= true;                               // Diff in NULL value
5619
 
    goto record_compare_exit;
5620
 
  }
5621
 
 
5622
 
  /* Compare updated fields */
5623
 
  for (Field **ptr=table->field ; *ptr ; ptr++)
5624
 
  {
5625
 
    if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
5626
 
    {
5627
 
      result= true;
5628
 
      goto record_compare_exit;
5629
 
    }
5630
 
  }
5631
 
 
5632
 
record_compare_exit:
5633
 
  /*
5634
 
    Restore the saved bytes.
5635
 
 
5636
 
    TODO[record format ndb]: Remove this code once NDB returns the
5637
 
    correct record format.
5638
 
  */
5639
 
  if (table->s->null_bytes > 0)
5640
 
  {
5641
 
    for (int i = 0 ; i < 2 ; ++i)
5642
 
    {
5643
 
      table->record[i][0]= saved_x[i];
5644
 
      table->record[i][table->s->null_bytes - 1]= saved_filler[i];
5645
 
    }
5646
 
  }
5647
 
 
5648
 
  return result;
5649
 
}
5650
 
 
5651
 
/**
5652
 
  Locate the current row in event's table.
5653
 
 
5654
 
  The current row is pointed by @c m_curr_row. Member @c m_width tells how many
5655
 
  columns are there in the row (this can be differnet from the number of columns
5656
 
  in the table). It is assumed that event's table is already open and pointed
5657
 
  by @c m_table.
5658
 
 
5659
 
  If a corresponding record is found in the table it is stored in
5660
 
  @c m_table->record[0]. Note that when record is located based on a primary
5661
 
  key, it is possible that the record found differs from the row being located.
5662
 
 
5663
 
  If no key is specified or table does not have keys, a table scan is used to
5664
 
  find the row. In that case the row should be complete and contain values for
5665
 
  all columns. However, it can still be shorter than the table, i.e. the table
5666
 
  can contain extra columns not present in the row. It is also possible that
5667
 
  the table has fewer columns than the row being located.
5668
 
 
5669
 
  @returns Error code on failure, 0 on success.
5670
 
 
5671
 
  @post In case of success @c m_table->record[0] contains the record found.
5672
 
  Also, the internal "cursor" of the table is positioned at the record found.
5673
 
 
5674
 
  @note If the engine allows random access of the records, a combination of
5675
 
  @c position() and @c rnd_pos() will be used.
5676
 
 */
5677
 
 
5678
 
int Rows_log_event::find_row(const Relay_log_info *rli)
5679
 
{
5680
 
  assert(m_table && m_table->in_use != NULL);
5681
 
 
5682
 
  Table *table= m_table;
5683
 
  int error;
5684
 
 
5685
 
  /* unpack row - missing fields get default values */
5686
 
  prepare_record(table, &m_cols, m_width, false/* don't check errors */);
5687
 
  error= unpack_current_row(rli, &m_cols);
5688
 
 
5689
 
  // Temporary fix to find out why it fails [/Matz]
5690
 
  memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
5691
 
 
5692
 
  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5693
 
      table->s->primary_key < MAX_KEY)
5694
 
  {
5695
 
    /*
5696
 
      Use a more efficient method to fetch the record given by
5697
 
      table->record[0] if the engine allows it.  We first compute a
5698
 
      row reference using the position() member function (it will be
5699
 
      stored in table->file->ref) and the use rnd_pos() to position
5700
 
      the "cursor" (i.e., record[0] in this case) at the correct row.
5701
 
 
5702
 
      TODO: Add a check that the correct record has been fetched by
5703
 
      comparing with the original record. Take into account that the
5704
 
      record on the master and slave can be of different
5705
 
      length. Something along these lines should work:
5706
 
 
5707
 
      ADD>>>  store_record(table,record[1]);
5708
 
              int error= table->file->rnd_pos(table->record[0], table->file->ref);
5709
 
      ADD>>>  assert(memcmp(table->record[1], table->record[0],
5710
 
                                 table->s->reclength) == 0);
5711
 
    */
5712
 
 
5713
 
    int error= table->file->rnd_pos_by_record(table->record[0]);
5714
 
    table->file->ha_rnd_end();
5715
 
    if (error)
5716
 
    {
5717
 
      table->file->print_error(error, MYF(0));
5718
 
    }
5719
 
    return(error);
5720
 
  }
5721
 
 
5722
 
  // We can't use position() - try other methods.
5723
 
 
5724
 
  /*
5725
 
    Save copy of the record in table->record[1]. It might be needed
5726
 
    later if linear search is used to find exact match.
5727
 
   */
5728
 
  store_record(table,record[1]);
5729
 
 
5730
 
  if (table->s->keys > 0)
5731
 
  {
5732
 
    /* We have a key: search the table using the index */
5733
 
    if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
5734
 
    {
5735
 
      table->file->print_error(error, MYF(0));
5736
 
      goto err;
5737
 
    }
5738
 
 
5739
 
    /* Fill key data for the row */
5740
 
 
5741
 
    assert(m_key);
5742
 
    key_copy(m_key, table->record[0], table->key_info, 0);
5743
 
 
5744
 
    /*
5745
 
      We need to set the null bytes to ensure that the filler bit are
5746
 
      all set when returning.  There are storage engines that just set
5747
 
      the necessary bits on the bytes and don't set the filler bits
5748
 
      correctly.
5749
 
    */
5750
 
    my_ptrdiff_t const pos=
5751
 
      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
5752
 
    table->record[0][pos]= 0xFF;
5753
 
 
5754
 
    if ((error= table->file->index_read_map(table->record[0], m_key,
5755
 
                                            HA_WHOLE_KEY,
5756
 
                                            HA_READ_KEY_EXACT)))
5757
 
    {
5758
 
      table->file->print_error(error, MYF(0));
5759
 
      table->file->ha_index_end();
5760
 
      goto err;
5761
 
    }
5762
 
 
5763
 
    /*
5764
 
      Below is a minor "optimization".  If the key (i.e., key number
5765
 
      0) has the HA_NOSAME flag set, we know that we have found the
5766
 
      correct record (since there can be no duplicates); otherwise, we
5767
 
      have to compare the record with the one found to see if it is
5768
 
      the correct one.
5769
 
 
5770
 
      CAVEAT! This behaviour is essential for the replication of,
5771
 
      e.g., the mysql.proc table since the correct record *shall* be
5772
 
      found using the primary key *only*.  There shall be no
5773
 
      comparison of non-PK columns to decide if the correct record is
5774
 
      found.  I can see no scenario where it would be incorrect to
5775
 
      chose the row to change only using a PK or an UNNI.
5776
 
    */
5777
 
    if (table->key_info->flags & HA_NOSAME)
5778
 
    {
5779
 
      table->file->ha_index_end();
5780
 
      goto ok;
5781
 
    }
5782
 
 
5783
 
    /*
5784
 
      In case key is not unique, we still have to iterate over records found
5785
 
      and find the one which is identical to the row given. A copy of the
5786
 
      record we are looking for is stored in record[1].
5787
 
     */
5788
 
    while (record_compare(table))
5789
 
    {
5790
 
      /*
5791
 
        We need to set the null bytes to ensure that the filler bit
5792
 
        are all set when returning.  There are storage engines that
5793
 
        just set the necessary bits on the bytes and don't set the
5794
 
        filler bits correctly.
5795
 
 
5796
 
        TODO[record format ndb]: Remove this code once NDB returns the
5797
 
        correct record format.
5798
 
      */
5799
 
      if (table->s->null_bytes > 0)
5800
 
      {
5801
 
        table->record[0][table->s->null_bytes - 1]|=
5802
 
          256U - (1U << table->s->last_null_bit_pos);
5803
 
      }
5804
 
 
5805
 
      if ((error= table->file->index_next(table->record[0])))
5806
 
      {
5807
 
        table->file->print_error(error, MYF(0));
5808
 
        table->file->ha_index_end();
5809
 
        goto err;
5810
 
      }
5811
 
    }
5812
 
 
5813
 
    /*
5814
 
      Have to restart the scan to be able to fetch the next row.
5815
 
    */
5816
 
    table->file->ha_index_end();
5817
 
  }
5818
 
  else
5819
 
  {
5820
 
    int restart_count= 0; // Number of times scanning has restarted from top
5821
 
 
5822
 
    /* We don't have a key: search the table using rnd_next() */
5823
 
    if ((error= table->file->ha_rnd_init(1)))
5824
 
    {
5825
 
      table->file->print_error(error, MYF(0));
5826
 
      goto err;
5827
 
    }
5828
 
 
5829
 
    /* Continue until we find the right record or have made a full loop */
5830
 
    do
5831
 
    {
5832
 
      error= table->file->rnd_next(table->record[0]);
5833
 
 
5834
 
      switch (error) {
5835
 
 
5836
 
      case 0:
5837
 
      case HA_ERR_RECORD_DELETED:
5838
 
        break;
5839
 
 
5840
 
      case HA_ERR_END_OF_FILE:
5841
 
        if (++restart_count < 2)
5842
 
          table->file->ha_rnd_init(1);
5843
 
        break;
5844
 
 
5845
 
      default:
5846
 
        table->file->print_error(error, MYF(0));
5847
 
        table->file->ha_rnd_end();
5848
 
        goto err;
5849
 
      }
5850
 
    }
5851
 
    while (restart_count < 2 && record_compare(table));
5852
 
 
5853
 
    /*
5854
 
      Note: above record_compare will take into accout all record fields
5855
 
      which might be incorrect in case a partial row was given in the event
5856
 
     */
5857
 
    table->file->ha_rnd_end();
5858
 
 
5859
 
    assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
5860
 
    goto err;
5861
 
  }
5862
 
ok:
5863
 
  table->default_column_bitmaps();
5864
 
  return(0);
5865
 
err:
5866
 
  table->default_column_bitmaps();
5867
 
  return(error);
5868
 
}
5869
 
 
5870
 
 
5871
 
/*
5872
 
  Constructor used to build an event for writing to the binary log.
5873
 
 */
5874
 
 
5875
 
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
5876
 
                                             ulong tid,
5877
 
                                             bool is_transactional)
5878
 
  : Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5879
 
{
5880
 
}
5881
 
 
5882
 
/*
5883
 
  Constructor used by slave to read the event from the binary log.
5884
 
 */
5885
 
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
5886
 
                                             const Format_description_log_event
5887
 
                                             *description_event)
5888
 
  : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
5889
 
{
5890
 
}
5891
 
 
5892
 
 
5893
 
int
5894
 
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5895
 
{
5896
 
  if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5897
 
      m_table->s->primary_key < MAX_KEY)
5898
 
  {
5899
 
    /*
5900
 
      We don't need to allocate any memory for m_key since it is not used.
5901
 
    */
5902
 
    return 0;
5903
 
  }
5904
 
 
5905
 
  if (m_table->s->keys > 0)
5906
 
  {
5907
 
    // Allocate buffer for key searches
5908
 
    m_key= (unsigned char*)malloc(m_table->key_info->key_length);
5909
 
    if (!m_key)
5910
 
      return HA_ERR_OUT_OF_MEM;
5911
 
  }
5912
 
 
5913
 
  return 0;
5914
 
}
5915
 
 
5916
 
int
5917
 
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5918
 
                                               int error)
5919
 
{
5920
 
  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
5921
 
  m_table->file->ha_index_or_rnd_end();
5922
 
  free(m_key);
5923
 
  m_key= NULL;
5924
 
 
5925
 
  return error;
5926
 
}
5927
 
 
5928
 
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5929
 
{
5930
 
  int error;
5931
 
  assert(m_table != NULL);
5932
 
 
5933
 
  if (!(error= find_row(rli)))
5934
 
  {
5935
 
    /*
5936
 
      Delete the record found, located in record[0]
5937
 
    */
5938
 
    error= m_table->file->ha_delete_row(m_table->record[0]);
5939
 
  }
5940
 
  return error;
5941
 
}
5942
 
 
5943
 
 
5944
 
/**************************************************************************
5945
 
        Update_rows_log_event member functions
5946
 
**************************************************************************/
5947
 
 
5948
 
/*
5949
 
  Constructor used to build an event for writing to the binary log.
5950
 
 */
5951
 
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
5952
 
                                             ulong tid,
5953
 
                                             bool is_transactional)
5954
 
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5955
 
{
5956
 
  init(tbl_arg->write_set);
5957
 
}
5958
 
 
5959
 
void Update_rows_log_event::init(MY_BITMAP const *cols)
5960
 
{
5961
 
  /* if bitmap_init fails, caught in is_valid() */
5962
 
  if (likely(!bitmap_init(&m_cols_ai,
5963
 
                          m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
5964
 
                          m_width,
5965
 
                          false)))
5966
 
  {
5967
 
    /* Cols can be zero if this is a dummy binrows event */
5968
 
    if (likely(cols != NULL))
5969
 
    {
5970
 
      memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
5971
 
      create_last_word_mask(&m_cols_ai);
5972
 
    }
5973
 
  }
5974
 
}
5975
 
 
5976
 
 
5977
 
Update_rows_log_event::~Update_rows_log_event()
5978
 
{
5979
 
  if (m_cols_ai.bitmap == m_bitbuf_ai) // no malloc happened
5980
 
    m_cols_ai.bitmap= 0; // so no free in bitmap_free
5981
 
  bitmap_free(&m_cols_ai); // To pair with bitmap_init().
5982
 
}
5983
 
 
5984
 
 
5985
 
/*
5986
 
  Constructor used by slave to read the event from the binary log.
5987
 
 */
5988
 
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
5989
 
                                             const
5990
 
                                             Format_description_log_event
5991
 
                                             *description_event)
5992
 
  : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
5993
 
{
5994
 
}
5995
 
 
5996
 
 
5997
 
int
5998
 
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5999
 
{
6000
 
  if (m_table->s->keys > 0)
6001
 
  {
6002
 
    // Allocate buffer for key searches
6003
 
    m_key= (unsigned char*)malloc(m_table->key_info->key_length);
6004
 
    if (!m_key)
6005
 
      return HA_ERR_OUT_OF_MEM;
6006
 
  }
6007
 
 
6008
 
  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6009
 
 
6010
 
  return 0;
6011
 
}
6012
 
 
6013
 
int
6014
 
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6015
 
                                               int error)
6016
 
{
6017
 
  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6018
 
  m_table->file->ha_index_or_rnd_end();
6019
 
  free(m_key); // Free for multi_malloc
6020
 
  m_key= NULL;
6021
 
 
6022
 
  return error;
6023
 
}
6024
 
 
6025
 
int
6026
 
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6027
 
{
6028
 
  assert(m_table != NULL);
6029
 
 
6030
 
  int error= find_row(rli);
6031
 
  if (error)
6032
 
  {
6033
 
    /*
6034
 
      We need to read the second image in the event of error to be
6035
 
      able to skip to the next pair of updates
6036
 
    */
6037
 
    m_curr_row= m_curr_row_end;
6038
 
    unpack_current_row(rli, &m_cols_ai);
6039
 
    return error;
6040
 
  }
6041
 
 
6042
 
  /*
6043
 
    This is the situation after locating BI:
6044
 
 
6045
 
    ===|=== before image ====|=== after image ===|===
6046
 
       ^                     ^
6047
 
       m_curr_row            m_curr_row_end
6048
 
 
6049
 
    BI found in the table is stored in record[0]. We copy it to record[1]
6050
 
    and unpack AI to record[0].
6051
 
   */
6052
 
 
6053
 
  store_record(m_table,record[1]);
6054
 
 
6055
 
  m_curr_row= m_curr_row_end;
6056
 
  error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6057
 
 
6058
 
  /*
6059
 
    Now we have the right row to update.  The old row (the one we're
6060
 
    looking for) is in record[1] and the new row is in record[0].
6061
 
  */
6062
 
 
6063
 
  // Temporary fix to find out why it fails [/Matz]
6064
 
  memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6065
 
  memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6066
 
 
6067
 
  error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6068
 
  if (error == HA_ERR_RECORD_IS_THE_SAME)
6069
 
    error= 0;
6070
 
 
6071
 
  return error;
6072
 
}
6073
 
 
6074
 
 
6075
 
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
6076
 
                                       const Format_description_log_event *descr_event)
6077
 
  : Log_event(buf, descr_event)
6078
 
{
6079
 
  uint8_t const common_header_len=
6080
 
    descr_event->common_header_len;
6081
 
  uint8_t const post_header_len=
6082
 
    descr_event->post_header_len[INCIDENT_EVENT-1];
6083
 
 
6084
 
  m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
6085
 
  char const *ptr= buf + common_header_len + post_header_len;
6086
 
  char const *const str_end= buf + event_len;
6087
 
  uint8_t len= 0;                   // Assignment to keep compiler happy
6088
 
  const char *str= NULL;          // Assignment to keep compiler happy
6089
 
  read_str(&ptr, str_end, &str, &len);
6090
 
  m_message.str= const_cast<char*>(str);
6091
 
  m_message.length= len;
6092
 
  return;
6093
 
}
6094
 
 
6095
 
 
6096
 
Incident_log_event::~Incident_log_event()
6097
 
{
6098
 
}
6099
 
 
6100
 
 
6101
 
const char *
6102
 
Incident_log_event::description() const
6103
 
{
6104
 
  static const char *const description[]= {
6105
 
    "NOTHING",                                  // Not used
6106
 
    "LOST_EVENTS"
6107
 
  };
6108
 
 
6109
 
  assert(0 <= m_incident);
6110
 
  assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
6111
 
 
6112
 
  return description[m_incident];
6113
 
}
6114
 
 
6115
 
 
6116
 
void Incident_log_event::pack_info(Protocol *protocol)
6117
 
{
6118
 
  char buf[256];
6119
 
  size_t bytes;
6120
 
  if (m_message.length > 0)
6121
 
    bytes= snprintf(buf, sizeof(buf), "#%d (%s)",
6122
 
                    m_incident, description());
6123
 
  else
6124
 
    bytes= snprintf(buf, sizeof(buf), "#%d (%s): %s",
6125
 
                    m_incident, description(), m_message.str);
6126
 
  protocol->store(buf, bytes, &my_charset_bin);
6127
 
}
6128
 
 
6129
 
 
6130
 
int
6131
 
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6132
 
{
6133
 
  rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6134
 
              ER(ER_SLAVE_INCIDENT),
6135
 
              description(),
6136
 
              m_message.length > 0 ? m_message.str : "<none>");
6137
 
  return(1);
6138
 
}
6139
 
 
6140
 
 
6141
 
bool
6142
 
Incident_log_event::write_data_header(IO_CACHE *file)
6143
 
{
6144
 
  unsigned char buf[sizeof(int16_t)];
6145
 
  int2store(buf, (int16_t) m_incident);
6146
 
  return(my_b_safe_write(file, buf, sizeof(buf)));
6147
 
}
6148
 
 
6149
 
bool
6150
 
Incident_log_event::write_data_body(IO_CACHE *file)
6151
 
{
6152
 
  return(write_str(file, m_message.str, m_message.length));
6153
 
}
6154
 
 
6155
 
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6156
 
                    const Format_description_log_event* description_event)
6157
 
  :Log_event(buf, description_event)
6158
 
{
6159
 
  uint8_t header_size= description_event->common_header_len;
6160
 
  ident_len = event_len - header_size;
6161
 
  set_if_smaller(ident_len,FN_REFLEN-1);
6162
 
  log_ident= buf + header_size;
6163
 
}