~drizzle-trunk/drizzle/development

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 *
 *  Copyright (C) 2009 - 2010 Toru Maesaka
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; version 2 of the License.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */

#include "config.h"
#include "ha_blitz.h"

using namespace drizzled;

/* Given two native rows, this function checks all unique fields to
   find whether the value has been updated. If a unique field value
   is updated, it checks if the new key exists in the database already.
   If it already exists, then it is a unique constraint violation. */
int ha_blitz::compare_rows_for_unique_violation(const unsigned char *old_row,
                                                const unsigned char *new_row) {
  const unsigned char *old_pos, *new_pos;
  char *key, *fetched;
  int key_len, fetched_len;

  /* For now, we are only interested in supporting a PRIMARY KEY. In the
     next phase of BlitzDB, this should loop through the key array. */
  if (share->primary_key_exists) {
    KeyInfo *pk = &table->key_info[table->getMutableShare()->getPrimaryKey()];
    KeyPartInfo *key_part = pk->key_part;
    KeyPartInfo *key_part_end = key_part + pk->key_parts;
    int key_changed = 0;

    while (key_part != key_part_end) {  
      old_pos = old_row + key_part->offset;
      new_pos = new_row + key_part->offset;

      /* In VARCHAR, first 2 bytes is reserved to represent the length
         of the key. Rip it out and move forward the row pointers. */
      if (key_part->type == HA_KEYTYPE_VARTEXT2) {
        uint16_t old_key_len = uint2korr(old_pos);
        uint16_t new_key_len = uint2korr(new_pos);
        old_pos += sizeof(old_key_len);
        new_pos += sizeof(new_key_len);

        /* Compare the actual data by repecting it's collation. */
        key_changed = my_strnncoll(&my_charset_utf8_general_ci, old_pos,
                                   old_key_len, new_pos, new_key_len);
      } else if (key_part->type == HA_KEYTYPE_BINARY) {
        return HA_ERR_UNSUPPORTED;
      } else {
        key_changed = memcmp(old_pos, new_pos, key_part->length);
      }
      key_part++;
    }

    /* Key has changed. Look up the database to see if the new value
       would violate the unique contraint. */
    if (key_changed) {
      key = key_buffer;
      key_len = make_index_key(key, table->getMutableShare()->getPrimaryKey(), new_row);
      fetched = share->dict.get_row(key, key_len, &fetched_len);

      /* Key Exists. It's a violation. */
      if (fetched != NULL) {
        free(fetched);
        this->errkey_id = table->getMutableShare()->getPrimaryKey();
        return HA_ERR_FOUND_DUPP_KEY;
      }
    }
  }

  return 0;
}

/* 
 * Comparison Function for TC's B+Tree. This is the only function
 * in BlitzDB where you can feel free to write redundant code for
 * performance. Avoid function calls unless it is necessary. If
 * you reaaaally want to separate the code block for readability,
 * make sure to separate the code with C++ template for inlining.
 *                                               - Toru
 * Currently Handles:
 *   INT       (interpreted as LONG_INT)
 *   BIGINT    (interpreted as LONGLONG)
 *   TIMESTAMP (interpreted as ULONG_INT)
 *   DATETIME  (interpreted as ULONGLONG)
 *   DATE      (interpreted as UINT_24)
 *   DOUBLE    (interpreted as DOUBLE)
 *   VARCHAR   (VARTEXT1 or VARTEXT2)
 *
 * BlitzDB Key Format:
 *   [DRIZZLE_KEY][UINT16_T][BLITZDB_UNIQUE_KEY]
 *
 * Returns:
 *   -1 = a < b
 *    0 = a == b
 *    1 = a > b
 */
int blitz_keycmp_cb(const char *a, int,
                    const char *b, int, void *opaque) {
  BlitzTree *tree = (BlitzTree *)opaque;
  int a_compared_len, b_compared_len, rv;

  rv = packed_key_cmp(tree, a, b, &a_compared_len, &b_compared_len);

  if (rv != 0)
    return rv;

  /* Getting here means that the Drizzle part of the keys are
     identical. We now compare the BlitzDB part of the key. */
  if (tree->unique) {
    /* This is a special exception that allows Drizzle's NULL
       key to be inserted duplicately into a UNIQUE tree. If
       the keys aren't NULL then it is safe to conclude that
       the keys are identical which this condition does. */
    if (*a != 0 && *b != 0)
      return 0;
  }

  char *a_pos = (char *)a + a_compared_len;
  char *b_pos = (char *)b + b_compared_len;

  uint16_t a_pk_len = uint2korr(a_pos);
  uint16_t b_pk_len = uint2korr(b_pos);

  a_pos += sizeof(a_pk_len);
  b_pos += sizeof(b_pk_len);

  if (a_pk_len == b_pk_len)
    rv = memcmp(a_pos, b_pos, a_pk_len);
  else if (a_pk_len < b_pk_len)
    rv = -1;
  else
    rv = 1;

  return rv;
}

/* General purpose comparison function for BlitzDB. We cannot reuse
   blitz_keycmp_cb() for this purpose because the 'exact match' only
   applies to BlitzDB's unique B+Tree key format in blitz_keycmp_cb().
   Here we are comparing packed Drizzle keys. */
int packed_key_cmp(BlitzTree *tree, const char *a, const char *b,
                   int *a_compared_len, int *b_compared_len) {
  BlitzKeyPart *curr_part;
  char *a_pos = (char *)a;
  char *b_pos = (char *)b;
  int a_next_offset = 0;
  int b_next_offset = 0;

  *a_compared_len = 0;
  *b_compared_len = 0;

  for (int i = 0; i < tree->nparts; i++) {
    curr_part = &tree->parts[i];

    if (curr_part->null_bitmask) {
      *a_compared_len += 1;
      *b_compared_len += 1;

      if (*a_pos != *b_pos)
        return ((int)*a_pos - (int)*b_pos);

      b_pos++;

      if (!*a_pos++) {
        curr_part++;
        continue;
      }
    }

    switch ((enum ha_base_keytype)curr_part->type) {
    case HA_KEYTYPE_LONG_INT: {
      int32_t a_int_val = sint4korr(a_pos);
      int32_t b_int_val = sint4korr(b_pos);

      *a_compared_len += curr_part->length;
      *b_compared_len += curr_part->length;

      if (a_int_val < b_int_val)
        return -1;
      else if (a_int_val > b_int_val)
        return 1;

      a_next_offset = b_next_offset = curr_part->length;
      break; 
    }
    case HA_KEYTYPE_ULONG_INT: {
      uint32_t a_int_val = uint4korr(a_pos);
      uint32_t b_int_val = uint4korr(b_pos);

      *a_compared_len += curr_part->length;
      *b_compared_len += curr_part->length;

      if (a_int_val < b_int_val)
        return -1;
      else if (a_int_val > b_int_val)
        return 1;

      a_next_offset = b_next_offset = curr_part->length;
      break;
    }
    case HA_KEYTYPE_LONGLONG: {
      int64_t a_int_val = sint8korr(a_pos);
      int64_t b_int_val = sint8korr(b_pos);

      *a_compared_len += curr_part->length;
      *b_compared_len += curr_part->length;

      if (a_int_val < b_int_val)
        return -1;
      else if (a_int_val > b_int_val)
        return 1;

      a_next_offset = b_next_offset = curr_part->length;
      break;
    }
    case HA_KEYTYPE_ULONGLONG: {
      uint64_t a_int_val = uint8korr(a_pos);
      uint64_t b_int_val = uint8korr(b_pos);

      *a_compared_len += curr_part->length;
      *b_compared_len += curr_part->length;

      if (a_int_val < b_int_val)
        return -1;
      else if (a_int_val > b_int_val)
        return 1;

      a_next_offset = b_next_offset = curr_part->length;
      break;
    }
    case HA_KEYTYPE_DOUBLE: {
      double a_double_val, b_double_val;
      float8get(a_double_val, a_pos);
      float8get(b_double_val, b_pos);

      *a_compared_len += curr_part->length;
      *b_compared_len += curr_part->length;

      if (a_double_val < b_double_val)
        return -1;
      else if (a_double_val > b_double_val)
        return 1;

      a_next_offset = b_next_offset = curr_part->length;
      break;
    }
    case HA_KEYTYPE_VARTEXT1: {
      uint8_t a_varchar_len = *(uint8_t *)a_pos;
      uint8_t b_varchar_len = *(uint8_t *)b_pos;
      int key_changed;

      a_pos++;
      b_pos++;

      *a_compared_len += a_varchar_len + sizeof(a_varchar_len);
      *b_compared_len += b_varchar_len + sizeof(b_varchar_len);

      /* Compare the texts by respecting collation. */
      key_changed = my_strnncoll(&my_charset_utf8_general_ci,
                                 (unsigned char *)a_pos, a_varchar_len,
                                 (unsigned char *)b_pos, b_varchar_len);
      if (key_changed < 0)
        return -1;
      else if (key_changed > 0)
        return 1;

      a_next_offset = a_varchar_len;
      b_next_offset = b_varchar_len;
      break;
    }
    case HA_KEYTYPE_VARTEXT2: {
      uint16_t a_varchar_len = uint2korr(a_pos);
      uint16_t b_varchar_len = uint2korr(b_pos);
      int key_changed;

      a_pos += sizeof(a_varchar_len);
      b_pos += sizeof(b_varchar_len);

      *a_compared_len += a_varchar_len + sizeof(a_varchar_len);
      *b_compared_len += b_varchar_len + sizeof(b_varchar_len);

      /* Compare the texts by respecting collation. */
      key_changed = my_strnncoll(&my_charset_utf8_general_ci,
                                 (unsigned char *)a_pos, a_varchar_len,
                                 (unsigned char *)b_pos, b_varchar_len);
      if (key_changed < 0)
        return -1;
      else if (key_changed > 0)
        return 1;

      a_next_offset = a_varchar_len;
      b_next_offset = b_varchar_len;
      break; 
    }
    default:
      break;
    }

    a_pos += a_next_offset;
    b_pos += b_next_offset;
    curr_part++;
  }

  return 0;
}