~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

Monty fixes pluss a few from me for charset.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
 
#include "config.h"
 
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
25
#include <drizzled/field/timestamp.h>
26
 
#include "drizzled/internal/my_sys.h"
27
 
#include "drizzled/internal/iocache.h"
28
 
#include <drizzled/db.h>
29
26
 
30
 
#include <sys/stat.h>
31
 
#include <fcntl.h>
32
27
#include <algorithm>
33
 
#include <climits>
34
 
#include <boost/filesystem.hpp>
35
28
 
36
 
namespace fs=boost::filesystem;
37
29
using namespace std;
38
 
namespace drizzled
39
 
{
40
30
 
41
31
class READ_INFO {
42
 
  int   cursor;
 
32
  File  file;
43
33
  unsigned char *buffer;                /* Buffer for read text */
44
34
  unsigned char *end_of_buff;           /* Data in bufferts ends here */
45
35
  size_t buff_length;                   /* Length of buffert */
50
40
  int   *stack,*stack_pos;
51
41
  bool  found_end_of_line,start_of_line,eof;
52
42
  bool  need_end_io_cache;
53
 
  internal::IO_CACHE cache;
 
43
  IO_CACHE cache;
54
44
 
55
45
public:
56
46
  bool error,line_cuted,found_null,enclosed;
58
48
        *row_end;                       /* Found row ends here */
59
49
  const CHARSET_INFO *read_charset;
60
50
 
61
 
  READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
 
51
  READ_INFO(File file, size_t tot_length, const CHARSET_INFO * const cs,
62
52
            String &field_term,String &line_start,String &line_term,
63
53
            String &enclosed,int escape, bool is_fifo);
64
54
  ~READ_INFO();
75
65
  */
76
66
  void end_io_cache()
77
67
  {
78
 
    cache.end_io_cache();
 
68
    ::end_io_cache(&cache);
79
69
    need_end_io_cache = 0;
80
70
  }
81
71
 
87
77
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
88
78
};
89
79
 
90
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
80
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
81
                             List<Item> &fields_vars, List<Item> &set_fields,
92
82
                             List<Item> &set_values, READ_INFO &read_info,
93
83
                             uint32_t skip_lines,
94
84
                             bool ignore_check_option_errors);
95
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
85
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
86
                          List<Item> &fields_vars, List<Item> &set_fields,
97
87
                          List<Item> &set_values, READ_INFO &read_info,
98
88
                          String &enclosed, uint32_t skip_lines,
105
95
  SYNOPSYS
106
96
    mysql_load()
107
97
      session - current thread
108
 
      ex  - file_exchange object representing source cursor and its parsing rules
 
98
      ex  - file_exchange object representing source file and its parsing rules
109
99
      table_list  - list of tables to which we are loading data
110
100
      fields_vars - list of fields and variables to which we read
111
 
                    data from cursor
 
101
                    data from file
112
102
      set_fields  - list of fields mentioned in set clause
113
103
      set_values  - expressions to assign to fields in previous list
114
104
      handle_duplicates - indicates whenever we should emit error or
124
114
                List<Item> &set_values,
125
115
                enum enum_duplicates handle_duplicates, bool ignore)
126
116
{
127
 
  int file;
 
117
  char name[FN_REFLEN];
 
118
  File file;
128
119
  Table *table= NULL;
129
120
  int error;
130
121
  String *field_term=ex->field_term,*escaped=ex->escaped;
131
122
  String *enclosed=ex->enclosed;
132
123
  bool is_fifo=0;
133
 
 
134
 
  assert(table_list->getSchemaName()); // This should never be null
135
 
 
 
124
  char *db= table_list->db;                     // This is never null
 
125
  assert(db);
136
126
  /*
137
 
    If path for cursor is not defined, we will use the current database.
 
127
    If path for file is not defined, we will use the current database.
138
128
    If this is not set, we will use the directory where the table to be
139
129
    loaded is located
140
130
  */
141
 
  util::string::const_shared_ptr schema(session->schema());
142
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
131
  char *tdb= session->db ? session->db : db;            // Result is never null
143
132
  assert(tdb);
144
133
  uint32_t skip_lines= ex->skip_lines;
145
134
  bool transactional_table;
146
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
135
  Session::killed_state killed_status= Session::NOT_KILLED;
147
136
 
148
137
  /* Escape and enclosed character may be a utf8 4-byte character */
149
138
  if (escaped->length() > 4 || enclosed->length() > 4)
151
140
    my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
152
141
    return(true);
153
142
  }
154
 
 
155
 
  if (session->openTablesLock(table_list))
 
143
  if (session->open_and_lock_tables(table_list))
156
144
    return(true);
157
 
 
158
145
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
159
146
                                    &session->lex->select_lex.top_join_list,
160
147
                                    table_list,
169
156
    table is marked to be 'used for insert' in which case we should never
170
157
    mark this table as 'const table' (ie, one that has only one row).
171
158
  */
172
 
  if (unique_table(table_list, table_list->next_global))
 
159
  if (unique_table(session, table_list, table_list->next_global, 0))
173
160
  {
174
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
161
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
162
    return(true);
176
163
  }
177
164
 
178
165
  table= table_list->table;
179
 
  transactional_table= table->cursor->has_transactions();
 
166
  transactional_table= table->file->has_transactions();
180
167
 
181
168
  if (!fields_vars.elements)
182
169
  {
183
170
    Field **field;
184
 
    for (field= table->getFields(); *field ; field++)
 
171
    for (field=table->field; *field ; field++)
185
172
      fields_vars.push_back(new Item_field(*field));
186
173
    table->setWriteSet();
187
174
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
206
193
    */
207
194
    if (table->timestamp_field)
208
195
    {
209
 
      if (table->isWriteSet(table->timestamp_field->position()))
210
 
      {
 
196
      if (table->isWriteSet(table->timestamp_field->field_index))
211
197
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
212
 
      }
213
198
      else
214
199
      {
215
 
        table->setWriteSet(table->timestamp_field->position());
 
200
        table->setWriteSet(table->timestamp_field->field_index);
216
201
      }
217
202
    }
218
203
    /* Fix the expressions in SET clause */
257
242
    return(true);
258
243
  }
259
244
 
260
 
  fs::path to_file(ex->file_name);
261
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
 
  if (not to_file.has_root_directory())
263
 
  {
264
 
    int count_elements= 0;
265
 
    for (fs::path::iterator iter= to_file.begin();
266
 
         iter != to_file.end();
267
 
         ++iter, ++count_elements)
268
 
    { }
269
 
 
270
 
    if (count_elements == 1)
271
 
    {
272
 
      target_path /= tdb;
273
 
    }
274
 
    target_path /= to_file;
275
 
  }
276
 
  else
277
 
  {
278
 
    target_path= to_file;
279
 
  }
280
 
 
281
 
  if (not secure_file_priv.string().empty())
282
 
  {
283
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
284
 
    {
285
 
      /* Read only allowed from within dir specified by secure_file_priv */
286
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
245
  {
 
246
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
247
    ex->file_name+=dirname_length(ex->file_name);
 
248
#endif
 
249
    if (!dirname_length(ex->file_name))
 
250
    {
 
251
      strcpy(name, drizzle_real_data_home);
 
252
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
253
      (void) fn_format(name, ex->file_name, name, "",
 
254
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
255
    }
 
256
    else
 
257
    {
 
258
      (void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
 
259
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
260
 
 
261
      if (opt_secure_file_priv &&
 
262
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
263
      {
 
264
        /* Read only allowed from within dir specified by secure_file_priv */
 
265
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
266
        return(true);
 
267
      }
 
268
 
 
269
      struct stat stat_info;
 
270
      if (stat(name,&stat_info))
 
271
      {
 
272
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
273
        return(true);
 
274
      }
 
275
 
 
276
      // if we are not in slave thread, the file must be:
 
277
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
278
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
279
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
280
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
281
      {
 
282
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
283
        return(true);
 
284
      }
 
285
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
286
        is_fifo = 1;
 
287
    }
 
288
    if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
289
    {
 
290
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
287
291
      return(true);
288
292
    }
289
293
  }
290
294
 
291
 
  struct stat stat_info;
292
 
  if (stat(target_path.file_string().c_str(), &stat_info))
293
 
  {
294
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
295
 
    return(true);
296
 
  }
297
 
 
298
 
  // if we are not in slave thread, the cursor must be:
299
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
300
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
303
 
  {
304
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
305
 
    return(true);
306
 
  }
307
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
308
 
    is_fifo = 1;
309
 
 
310
 
 
311
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
312
 
  {
313
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
314
 
    return(true);
315
 
  }
316
 
  CopyInfo info;
 
295
  COPY_INFO info;
317
296
  memset(&info, 0, sizeof(info));
318
297
  info.ignore= ignore;
319
298
  info.handle_duplicates=handle_duplicates;
320
299
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
321
300
 
322
 
  SchemaIdentifier identifier(*schema);
323
301
  READ_INFO read_info(file, tot_length,
324
 
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
 
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
 
302
                      ex->cs ? ex->cs : get_default_db_collation(session->db),
 
303
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
326
304
                      info.escape_char, is_fifo);
327
305
  if (read_info.error)
328
306
  {
329
307
    if  (file >= 0)
330
 
      internal::my_close(file,MYF(0));                  // no files in net reading
 
308
      my_close(file,MYF(0));                    // no files in net reading
331
309
    return(true);                               // Can't allocate buffers
332
310
  }
333
311
 
359
337
    table->next_number_field=table->found_next_number_field;
360
338
    if (ignore ||
361
339
        handle_duplicates == DUP_REPLACE)
362
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
340
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
363
341
    if (handle_duplicates == DUP_REPLACE)
364
 
        table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
365
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
342
        table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
343
    table->file->ha_start_bulk_insert((ha_rows) 0);
366
344
    table->copy_blobs=1;
367
345
 
368
346
    session->abort_on_warning= true;
375
353
      error= read_sep_field(session, info, table_list, fields_vars,
376
354
                            set_fields, set_values, read_info,
377
355
                            *enclosed, skip_lines, ignore);
378
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
356
    if (table->file->ha_end_bulk_insert() && !error)
379
357
    {
380
 
      table->print_error(errno, MYF(0));
 
358
      table->file->print_error(my_errno, MYF(0));
381
359
      error= 1;
382
360
    }
383
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
384
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
361
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
362
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
385
363
    table->next_number_field=0;
386
364
  }
387
365
  if (file >= 0)
388
 
    internal::my_close(file,MYF(0));
 
366
    my_close(file,MYF(0));
389
367
  free_blobs(table);                            /* if pack_blob was used */
390
368
  table->copy_blobs=0;
391
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
369
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
392
370
  /*
393
371
     simulated killing in the middle of per-row loop
394
372
     must be effective for binlogging
395
373
  */
396
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
374
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
397
375
  if (error)
398
376
  {
399
377
    error= -1;                          // Error on read
400
378
    goto err;
401
379
  }
402
 
 
403
 
  char msg[FN_REFLEN];
404
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
 
           (info.records - info.copied), session->cuted_fields);
406
 
 
407
 
  if (session->transaction.stmt.hasModifiedNonTransData())
408
 
    session->transaction.all.markModifiedNonTransData();
 
380
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
381
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
382
 
 
383
  if (session->transaction.stmt.modified_non_trans_table)
 
384
    session->transaction.all.modified_non_trans_table= true;
409
385
 
410
386
  /* ok to client sent only after binlog write and engine commit */
411
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
387
  session->my_ok(info.copied + info.deleted, 0L, name);
412
388
err:
413
389
  assert(transactional_table || !(info.copied || info.deleted) ||
414
 
              session->transaction.stmt.hasModifiedNonTransData());
415
 
  table->cursor->ha_release_auto_increment();
 
390
              session->transaction.stmt.modified_non_trans_table);
 
391
  table->file->ha_release_auto_increment();
416
392
  table->auto_increment_field_not_null= false;
417
393
  session->abort_on_warning= 0;
418
394
  return(error);
424
400
****************************************************************************/
425
401
 
426
402
static int
427
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
403
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
428
404
                  List<Item> &fields_vars, List<Item> &set_fields,
429
405
                  List<Item> &set_values, READ_INFO &read_info,
430
406
                  uint32_t skip_lines, bool ignore_check_option_errors)
439
415
 
440
416
  while (!read_info.read_fixed_length())
441
417
  {
442
 
    if (session->getKilled())
 
418
    if (session->killed)
443
419
    {
444
420
      session->send_kill_message();
445
421
      return(1);
457
433
    }
458
434
    it.rewind();
459
435
    unsigned char *pos=read_info.row_start;
460
 
#ifdef HAVE_VALGRIND
 
436
#ifdef HAVE_purify
461
437
    read_info.row_end[0]=0;
462
438
#endif
463
439
 
512
488
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
513
489
    }
514
490
 
515
 
    if (session->getKilled() ||
 
491
    if (session->killed ||
516
492
        fill_record(session, set_fields, set_values,
517
493
                    ignore_check_option_errors))
518
494
      return(1);
543
519
 
544
520
 
545
521
static int
546
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
522
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
547
523
               List<Item> &fields_vars, List<Item> &set_fields,
548
524
               List<Item> &set_values, READ_INFO &read_info,
549
525
               String &enclosed, uint32_t skip_lines,
561
537
 
562
538
  for (;;it.rewind())
563
539
  {
564
 
    if (session->getKilled())
 
540
    if (session->killed)
565
541
    {
566
542
      session->send_kill_message();
567
543
      return(1);
653
629
    }
654
630
    if (item)
655
631
    {
656
 
      /* Have not read any field, thus input cursor is simply ended */
 
632
      /* Have not read any field, thus input file is simply ended */
657
633
      if (item == fields_vars.head())
658
634
        break;
659
635
      for (; item ; item= it++)
694
670
      }
695
671
    }
696
672
 
697
 
    if (session->getKilled() ||
 
673
    if (session->killed ||
698
674
        fill_record(session, set_fields, set_values,
699
675
                    ignore_check_option_errors))
700
676
      return(1);
715
691
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
716
692
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
717
693
                          session->row_count);
718
 
      if (session->getKilled())
 
694
      if (session->killed)
719
695
        return(1);
720
696
    }
721
697
    session->row_count++;
736
712
  case 'r': return '\r';
737
713
  case 'b': return '\b';
738
714
  case '0': return 0;                           // Ascii null
739
 
  case 'Z': return '\032';                      // Win32 end of cursor
 
715
  case 'Z': return '\032';                      // Win32 end of file
740
716
  case 'N': found_null=1;
741
717
 
742
718
    /* fall through */
751
727
*/
752
728
 
753
729
 
754
 
READ_INFO::READ_INFO(int file_par, size_t tot_length,
 
730
READ_INFO::READ_INFO(File file_par, size_t tot_length,
755
731
                     const CHARSET_INFO * const cs,
756
732
                     String &field_term, String &line_start, String &line_term,
757
733
                     String &enclosed_par, int escape, bool is_fifo)
758
 
  :cursor(file_par),escape_char(escape)
 
734
  :file(file_par),escape_char(escape)
759
735
{
760
736
  read_charset= cs;
761
737
  field_term_ptr=(char*) field_term.ptr();
789
765
 
790
766
 
791
767
  /* Set of a stack for unget if long terminators */
792
 
  size_t length= max(field_term_length,line_term_length)+1;
793
 
  set_if_bigger(length, line_start.length());
794
 
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
 
768
  uint32_t length= max(field_term_length,line_term_length)+1;
 
769
  set_if_bigger(length,line_start.length());
 
770
  stack= stack_pos= (int*) sql_alloc(sizeof(int)*length);
795
771
 
796
772
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
797
 
    error=1;
 
773
    error=1; /* purecov: inspected */
798
774
  else
799
775
  {
800
776
    end_of_buff=buffer+buff_length;
801
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
 
                            (false) ? internal::READ_NET :
803
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
804
 
                            MYF(MY_WME)))
 
777
    if (init_io_cache(&cache,(false) ? -1 : file, 0,
 
778
                      (false) ? READ_NET :
 
779
                      (is_fifo ? READ_FIFO : READ_CACHE),0L,1,
 
780
                      MYF(MY_WME)))
805
781
    {
806
 
      free((unsigned char*) buffer);
 
782
      free((unsigned char*) buffer); /* purecov: inspected */
807
783
      error=1;
808
784
    }
809
785
    else
824
800
  if (!error)
825
801
  {
826
802
    if (need_end_io_cache)
827
 
      cache.end_io_cache();
 
803
      ::end_io_cache(&cache);
828
804
    free(buffer);
829
805
    error=1;
830
806
  }
1023
999
 
1024
1000
  NOTES
1025
1001
    The row may not be fixed size on disk if there are escape
1026
 
    characters in the cursor.
 
1002
    characters in the file.
1027
1003
 
1028
1004
  IMPLEMENTATION NOTE
1029
1005
    One can't use fixed length with multi-byte charset **
1156
1132
}
1157
1133
 
1158
1134
 
1159
 
} /* namespace drizzled */