~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/rpl_record.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright 2007 MySQL AB. All rights reserved.
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "mysql_priv.h"
 
17
#include "rpl_rli.h"
 
18
#include "rpl_record.h"
 
19
#include "slave.h"                  // Need to pull in slave_print_msg
 
20
#include "rpl_utility.h"
 
21
#include "rpl_rli.h"
 
22
 
 
23
/**
 
24
   Pack a record of data for a table into a format suitable for
 
25
   transfer via the binary log.
 
26
 
 
27
   The format for a row in transfer with N fields is the following:
 
28
 
 
29
   ceil(N/8) null bytes:
 
30
       One null bit for every column *regardless of whether it can be
 
31
       null or not*. This simplifies the decoding. Observe that the
 
32
       number of null bits is equal to the number of set bits in the
 
33
       @c cols bitmap. The number of null bytes is the smallest number
 
34
       of bytes necessary to store the null bits.
 
35
 
 
36
       Padding bits are 1.
 
37
 
 
38
   N packets:
 
39
       Each field is stored in packed format.
 
40
 
 
41
 
 
42
   @param table    Table describing the format of the record
 
43
 
 
44
   @param cols     Bitmap with a set bit for each column that should
 
45
                   be stored in the row
 
46
 
 
47
   @param row_data Pointer to memory where row will be written
 
48
 
 
49
   @param record   Pointer to record that should be packed. It is
 
50
                   assumed that the pointer refers to either @c
 
51
                   record[0] or @c record[1], but no such check is
 
52
                   made since the code does not rely on that.
 
53
 
 
54
   @return The number of bytes written at @c row_data.
 
55
 */
 
56
#if !defined(MYSQL_CLIENT)
 
57
size_t
 
58
pack_row(TABLE *table, MY_BITMAP const* cols,
 
59
         uchar *row_data, const uchar *record)
 
60
{
 
61
  Field **p_field= table->field, *field;
 
62
  int const null_byte_count= (bitmap_bits_set(cols) + 7) / 8;
 
63
  uchar *pack_ptr = row_data + null_byte_count;
 
64
  uchar *null_ptr = row_data;
 
65
  my_ptrdiff_t const rec_offset= record - table->record[0];
 
66
  my_ptrdiff_t const def_offset= table->s->default_values - table->record[0];
 
67
 
 
68
  DBUG_ENTER("pack_row");
 
69
 
 
70
  /*
 
71
    We write the null bits and the packed records using one pass
 
72
    through all the fields. The null bytes are written little-endian,
 
73
    i.e., the first fields are in the first byte.
 
74
   */
 
75
  unsigned int null_bits= (1U << 8) - 1;
 
76
  // Mask to mask out the correct but among the null bits
 
77
  unsigned int null_mask= 1U;
 
78
  DBUG_PRINT("debug", ("null ptr: 0x%lx; row start: %p; null bytes: %d",
 
79
                       (ulong) null_ptr, row_data, null_byte_count));
 
80
  DBUG_PRINT_BITSET("debug", "cols: %s", cols);
 
81
  for ( ; (field= *p_field) ; p_field++)
 
82
  {
 
83
    DBUG_PRINT("debug", ("field: %s; null mask: 0x%x",
 
84
                         field->field_name, null_mask));
 
85
    if (bitmap_is_set(cols, p_field - table->field))
 
86
    {
 
87
      my_ptrdiff_t offset;
 
88
      if (field->is_null(rec_offset))
 
89
      {
 
90
        DBUG_PRINT("debug", ("Is NULL; null_mask: 0x%x; null_bits: 0x%x",
 
91
                             null_mask, null_bits));
 
92
        offset= def_offset;
 
93
        null_bits |= null_mask;
 
94
      }
 
95
      else
 
96
      {
 
97
        offset= rec_offset;
 
98
        null_bits &= ~null_mask;
 
99
 
 
100
        /*
 
101
          We only store the data of the field if it is non-null
 
102
 
 
103
          For big-endian machines, we have to make sure that the
 
104
          length is stored in little-endian format, since this is the
 
105
          format used for the binlog.
 
106
        */
 
107
#ifndef DBUG_OFF
 
108
        const uchar *old_pack_ptr= pack_ptr;
 
109
#endif
 
110
        pack_ptr= field->pack(pack_ptr, field->ptr + offset,
 
111
                              field->max_data_length(), TRUE);
 
112
        DBUG_PRINT("debug", ("Packed into row; pack_ptr: 0x%lx;"
 
113
                             " pack_ptr':0x%lx; bytes: %d",
 
114
                             (ulong) old_pack_ptr,
 
115
                             (ulong) pack_ptr,
 
116
                             (int) (pack_ptr - old_pack_ptr)));
 
117
      }
 
118
 
 
119
      null_mask <<= 1;
 
120
      if ((null_mask & 0xFF) == 0)
 
121
      {
 
122
        DBUG_ASSERT(null_ptr < row_data + null_byte_count);
 
123
        null_mask = 1U;
 
124
        *null_ptr++ = null_bits;
 
125
        null_bits= (1U << 8) - 1;
 
126
      }
 
127
    }
 
128
#ifndef DBUG_OFF
 
129
    else
 
130
    {
 
131
      DBUG_PRINT("debug", ("Skipped"));
 
132
    }
 
133
#endif
 
134
  }
 
135
 
 
136
  /*
 
137
    Write the last (partial) byte, if there is one
 
138
  */
 
139
  if ((null_mask & 0xFF) > 1)
 
140
  {
 
141
    DBUG_ASSERT(null_ptr < row_data + null_byte_count);
 
142
    *null_ptr++ = null_bits;
 
143
  }
 
144
 
 
145
  /*
 
146
    The null pointer should now point to the first byte of the
 
147
    packed data. If it doesn't, something is very wrong.
 
148
  */
 
149
  DBUG_ASSERT(null_ptr == row_data + null_byte_count);
 
150
  DBUG_DUMP("row_data", row_data, pack_ptr - row_data);
 
151
  DBUG_RETURN(static_cast<size_t>(pack_ptr - row_data));
 
152
}
 
153
#endif
 
154
 
 
155
 
 
156
/**
 
157
   Unpack a row into @c table->record[0].
 
158
 
 
159
   The function will always unpack into the @c table->record[0]
 
160
   record.  This is because there are too many dependencies on where
 
161
   the various member functions of Field and subclasses expect to
 
162
   write.
 
163
 
 
164
   The row is assumed to only consist of the fields for which the corresponding
 
165
   bit in bitset @c cols is set; the other parts of the record are left alone.
 
166
 
 
167
   At most @c colcnt columns are read: if the table is larger than
 
168
   that, the remaining fields are not filled in.
 
169
 
 
170
   @param rli     Relay log info
 
171
   @param table   Table to unpack into
 
172
   @param colcnt  Number of columns to read from record
 
173
   @param row_data
 
174
                  Packed row data
 
175
   @param cols    Pointer to bitset describing columns to fill in
 
176
   @param row_end Pointer to variable that will hold the value of the
 
177
                  one-after-end position for the row
 
178
   @param master_reclength
 
179
                  Pointer to variable that will be set to the length of the
 
180
                  record on the master side
 
181
 
 
182
   @retval 0 No error
 
183
 
 
184
   @retval ER_NO_DEFAULT_FOR_FIELD
 
185
   Returned if one of the fields existing on the slave but not on the
 
186
   master does not have a default value (and isn't nullable)
 
187
 
 
188
 */
 
189
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 
190
int
 
191
unpack_row(Relay_log_info const *rli,
 
192
           TABLE *table, uint const colcnt,
 
193
           uchar const *const row_data, MY_BITMAP const *cols,
 
194
           uchar const **const row_end, ulong *const master_reclength)
 
195
{
 
196
  DBUG_ENTER("unpack_row");
 
197
  DBUG_ASSERT(row_data);
 
198
  size_t const master_null_byte_count= (bitmap_bits_set(cols) + 7) / 8;
 
199
  int error= 0;
 
200
 
 
201
  uchar const *null_ptr= row_data;
 
202
  uchar const *pack_ptr= row_data + master_null_byte_count;
 
203
 
 
204
  Field **const begin_ptr = table->field;
 
205
  Field **field_ptr;
 
206
  Field **const end_ptr= begin_ptr + colcnt;
 
207
 
 
208
  DBUG_ASSERT(null_ptr < row_data + master_null_byte_count);
 
209
 
 
210
  // Mask to mask out the correct bit among the null bits
 
211
  unsigned int null_mask= 1U;
 
212
  // The "current" null bits
 
213
  unsigned int null_bits= *null_ptr++;
 
214
  uint i= 0;
 
215
 
 
216
  /*
 
217
    Use the rli class to get the table's metadata. If tabledef is not NULL
 
218
    we are processing data from a master. If tabledef is NULL then it is
 
219
    assumed that the packed row comes from the table to which it is
 
220
    unpacked.
 
221
  */
 
222
  table_def *tabledef= rli ? ((Relay_log_info*)rli)->get_tabledef(table) : 0;
 
223
  for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr)
 
224
  {
 
225
    Field *const f= *field_ptr;
 
226
 
 
227
    DBUG_PRINT("debug", ("%sfield: %s; null mask: 0x%x; null bits: 0x%lx;"
 
228
                         " row start: %p; null bytes: %ld",
 
229
                         bitmap_is_set(cols, field_ptr -  begin_ptr) ? "+" : "-",
 
230
                         f->field_name, null_mask, (ulong) null_bits,
 
231
                         pack_ptr, (ulong) master_null_byte_count));
 
232
 
 
233
    /*
 
234
      No need to bother about columns that does not exist: they have
 
235
      gotten default values when being emptied above.
 
236
     */
 
237
    if (bitmap_is_set(cols, field_ptr -  begin_ptr))
 
238
    {
 
239
      if ((null_mask & 0xFF) == 0)
 
240
      {
 
241
        DBUG_ASSERT(null_ptr < row_data + master_null_byte_count);
 
242
        null_mask= 1U;
 
243
        null_bits= *null_ptr++;
 
244
      }
 
245
 
 
246
      DBUG_ASSERT(null_mask & 0xFF); // One of the 8 LSB should be set
 
247
 
 
248
      /* Field...::unpack() cannot return 0 */
 
249
      DBUG_ASSERT(pack_ptr != NULL);
 
250
 
 
251
      if ((null_bits & null_mask) && f->maybe_null())
 
252
      {
 
253
        DBUG_PRINT("debug", ("Was NULL; null mask: 0x%x; null bits: 0x%x",
 
254
                             null_mask, null_bits));
 
255
 
 
256
        f->set_null();
 
257
      }
 
258
      else
 
259
      {
 
260
        f->set_notnull();
 
261
 
 
262
        /*
 
263
          We only unpack the field if it was non-null.
 
264
          Use the master's size information if available else call
 
265
          normal unpack operation.
 
266
        */
 
267
        /*
 
268
          Use the master's metadata if we are processing data from a slave
 
269
          (tabledef not NULL). If tabledef is NULL then it is assumed that
 
270
          the packed row comes from the table to which it is unpacked.
 
271
        */
 
272
        uint16 metadata= tabledef ? tabledef->field_metadata(i) : 0;
 
273
#ifndef DBUG_OFF
 
274
        uchar const *const old_pack_ptr= pack_ptr;
 
275
#endif
 
276
        pack_ptr= f->unpack(f->ptr, pack_ptr, metadata, TRUE);
 
277
        DBUG_PRINT("debug", ("Unpacked; metadata: 0x%x;"
 
278
                             " pack_ptr: 0x%lx; pack_ptr': 0x%lx; bytes: %d",
 
279
                             metadata, (ulong) old_pack_ptr, (ulong) pack_ptr,
 
280
                             (int) (pack_ptr - old_pack_ptr)));
 
281
      }
 
282
 
 
283
      null_mask <<= 1;
 
284
    }
 
285
#ifndef DBUG_OFF
 
286
    else
 
287
    {
 
288
      DBUG_PRINT("debug", ("Non-existent: skipped"));
 
289
    }
 
290
#endif
 
291
    i++;
 
292
  }
 
293
 
 
294
  /*
 
295
    throw away master's extra fields
 
296
 
 
297
    Use the master's max_cols if we are processing data from a slave
 
298
    (tabledef not NULL). If tabledef is NULL then it is assumed that
 
299
    there are no extra columns.
 
300
  */
 
301
  uint max_cols= tabledef ? min(tabledef->size(), cols->n_bits) : 0;
 
302
  for (; i < max_cols; i++)
 
303
  {
 
304
    if (bitmap_is_set(cols, i))
 
305
    {
 
306
      if ((null_mask & 0xFF) == 0)
 
307
      {
 
308
        DBUG_ASSERT(null_ptr < row_data + master_null_byte_count);
 
309
        null_mask= 1U;
 
310
        null_bits= *null_ptr++;
 
311
      }
 
312
      DBUG_ASSERT(null_mask & 0xFF); // One of the 8 LSB should be set
 
313
 
 
314
      if (!((null_bits & null_mask) && tabledef->maybe_null(i)))
 
315
        pack_ptr+= tabledef->calc_field_size(i, (uchar *) pack_ptr);
 
316
      null_mask <<= 1;
 
317
    }
 
318
  }
 
319
 
 
320
  /*
 
321
    We should now have read all the null bytes, otherwise something is
 
322
    really wrong.
 
323
   */
 
324
  DBUG_ASSERT(null_ptr == row_data + master_null_byte_count);
 
325
 
 
326
  DBUG_DUMP("row_data", row_data, pack_ptr - row_data);
 
327
 
 
328
  *row_end = pack_ptr;
 
329
  if (master_reclength)
 
330
  {
 
331
    if (*field_ptr)
 
332
      *master_reclength = (*field_ptr)->ptr - table->record[0];
 
333
    else
 
334
      *master_reclength = table->s->reclength;
 
335
  }
 
336
  
 
337
  DBUG_RETURN(error);
 
338
}
 
339
 
 
340
/**
 
341
  Fills @c table->record[0] with default values.
 
342
 
 
343
  First @c empty_record() is called and then, additionally, fields are
 
344
  initialized explicitly with a call to @c set_default().
 
345
 
 
346
  For optimization reasons, the explicit initialization can be skipped
 
347
  for fields that are not marked in the @c cols vector. These fields
 
348
  will be set later, and filling them with default values is
 
349
  unnecessary.
 
350
 
 
351
  If @c check is true, fields are explicitly initialized only if they
 
352
  have default value or can be NULL. Otherwise error is reported. If
 
353
  @c check is false, no error is reported and the field is not set to
 
354
  any value.
 
355
 
 
356
  @todo When flag is added to allow engine to handle default values
 
357
  itself, the record should not be emptied and default values not set.
 
358
 
 
359
  @param table[in,out] Table whose record[0] buffer is prepared. 
 
360
  @param cols[in]      Vector of bits denoting columns that will be set
 
361
                       elsewhere
 
362
  @param check[in]     Indicates if errors should be checked when setting default
 
363
                       values.
 
364
 
 
365
  @retval 0                       Success
 
366
  @retval ER_NO_DEFAULT_FOR_FIELD Default value could not be set for a field
 
367
 */ 
 
368
int prepare_record(TABLE *const table, 
 
369
                   const MY_BITMAP *cols, uint width, const bool check)
 
370
{
 
371
  DBUG_ENTER("prepare_record");
 
372
 
 
373
  int error= 0;
 
374
  empty_record(table);
 
375
 
 
376
  /*
 
377
    Explicit initialization of fields. For fields that are not in the
 
378
    cols for the row, we set them to default. If the fields are in
 
379
    addition to what exists on the master, we give an error if the
 
380
    have no sensible default.
 
381
  */
 
382
 
 
383
  DBUG_PRINT_BITSET("debug", "cols: %s", cols);
 
384
  for (Field **field_ptr= table->field ; *field_ptr ; ++field_ptr)
 
385
  {
 
386
    if ((uint) (field_ptr - table->field) >= cols->n_bits ||
 
387
        !bitmap_is_set(cols, field_ptr - table->field))
 
388
    {
 
389
      uint32 const mask= NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG;
 
390
      Field *const f= *field_ptr;
 
391
 
 
392
      if (check && ((f->flags & mask) == mask))
 
393
      {
 
394
        my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), f->field_name);
 
395
        error = HA_ERR_ROWS_EVENT_APPLY;
 
396
      }
 
397
      else
 
398
      {
 
399
        DBUG_PRINT("debug", ("Set default; field: %s", f->field_name));
 
400
        f->set_default();
 
401
      }
 
402
    }
 
403
  }
 
404
 
 
405
  DBUG_RETURN(error);
 
406
}
 
407
 
 
408
#endif // HAVE_REPLICATION