~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Jay Pipes
  • Date: 2009-05-31 21:21:44 UTC
  • mto: This revision was merged to the branch mainline in revision 1046.
  • Revision ID: jpipes@serialcoder-20090531212144-dn18058mx55azhms
Yet more indentation and style cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
/* Copy data from a textfile to table */
18
18
 
19
19
#include <drizzled/server_includes.h>
20
 
#include "sql_repl.h"
 
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
 
 
 
23
#include <drizzled/session.h>
 
24
#include <drizzled/sql_base.h>
 
25
#include <drizzled/field/timestamp.h>
24
26
 
25
27
class READ_INFO {
26
28
  File  file;
27
 
  unsigned char *buffer,                        /* Buffer for read text */
28
 
        *end_of_buff;                   /* Data in bufferts ends here */
29
 
  uint  buff_length,                    /* Length of buffert */
30
 
        max_length;                     /* Max length of row */
 
29
  unsigned char *buffer;                /* Buffer for read text */
 
30
  unsigned char *end_of_buff;           /* Data in bufferts ends here */
 
31
  size_t buff_length;                   /* Length of buffert */
 
32
  size_t max_length;                    /* Max length of row */
31
33
  char  *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
32
34
  uint  field_term_length,line_term_length,enclosed_length;
33
35
  int   field_term_char,line_term_char,enclosed_char,escape_char;
35
37
  bool  found_end_of_line,start_of_line,eof;
36
38
  bool  need_end_io_cache;
37
39
  IO_CACHE cache;
38
 
  NET *io_net;
39
40
 
40
41
public:
41
42
  bool error,line_cuted,found_null,enclosed;
43
44
        *row_end;                       /* Found row ends here */
44
45
  const CHARSET_INFO *read_charset;
45
46
 
46
 
  READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
 
47
  READ_INFO(File file, size_t tot_length, const CHARSET_INFO * const cs,
47
48
            String &field_term,String &line_start,String &line_term,
48
 
            String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
 
49
            String &enclosed,int escape, bool is_fifo);
49
50
  ~READ_INFO();
50
51
  int read_field();
51
52
  int read_fixed_length(void);
83
84
                          String &enclosed, uint32_t skip_lines,
84
85
                          bool ignore_check_option_errors);
85
86
 
86
 
static bool write_execute_load_query_log_event(Session *session,
87
 
                                               bool duplicates, bool ignore,
88
 
                                               bool transactional_table,
89
 
                                               Session::killed_state killed_status);
90
87
 
91
88
/*
92
89
  Execute LOAD DATA query
94
91
  SYNOPSIS
95
92
    mysql_load()
96
93
      session - current thread
97
 
      ex  - sql_exchange object representing source file and its parsing rules
 
94
      ex  - file_exchange object representing source file and its parsing rules
98
95
      table_list  - list of tables to which we are loading data
99
96
      fields_vars - list of fields and variables to which we read
100
97
                    data from file
103
100
      handle_duplicates - indicates whether we should emit an error or
104
101
                          replace row if we will meet duplicates.
105
102
      ignore            - indicates whether we should ignore duplicates
106
 
      read_file_from_client - is this LOAD DATA LOCAL ?
107
103
 
108
104
  RETURN VALUES
109
105
    true - error / false - success
110
106
*/
111
107
 
112
 
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
 
108
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
113
109
                List<Item> &fields_vars, List<Item> &set_fields,
114
110
                List<Item> &set_values,
115
 
                enum enum_duplicates handle_duplicates, bool ignore,
116
 
                bool read_file_from_client)
 
111
                enum enum_duplicates handle_duplicates, bool ignore)
117
112
{
118
113
  char name[FN_REFLEN];
119
114
  File file;
122
117
  String *field_term=ex->field_term,*escaped=ex->escaped;
123
118
  String *enclosed=ex->enclosed;
124
119
  bool is_fifo=0;
125
 
  LOAD_FILE_INFO lf_info;
126
 
  char *db = table_list->db;                    // This is never null
 
120
  char *db= table_list->db;                     // This is never null
 
121
  assert(db);
127
122
  /*
128
123
    If path for file is not defined, we will use the current database.
129
124
    If this is not set, we will use the directory where the table to be
130
125
    loaded is located
131
126
  */
132
127
  char *tdb= session->db ? session->db : db;            // Result is never null
 
128
  assert(tdb);
133
129
  uint32_t skip_lines= ex->skip_lines;
134
130
  bool transactional_table;
135
131
  Session::killed_state killed_status= Session::NOT_KILLED;
136
132
 
137
 
  if (escaped->length() > 1 || enclosed->length() > 1)
 
133
  /* Escape and enclosed character may be a utf8 4-byte character */
 
134
  if (escaped->length() > 4 || enclosed->length() > 4)
138
135
  {
139
 
    my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
140
 
               MYF(0));
 
136
    my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
141
137
    return(true);
142
138
  }
143
139
  if (open_and_lock_tables(session, table_list))
170
166
    Field **field;
171
167
    for (field=table->field; *field ; field++)
172
168
      fields_vars.push_back(new Item_field(*field));
173
 
    bitmap_set_all(table->write_set);
 
169
    table->setWriteSet();
174
170
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
175
171
    /*
176
172
      Let us also prepare SET clause, although it is probably empty
193
189
    */
194
190
    if (table->timestamp_field)
195
191
    {
196
 
      if (bitmap_is_set(table->write_set,
197
 
                        table->timestamp_field->field_index))
 
192
      if (table->isWriteSet(table->timestamp_field->field_index))
198
193
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
199
194
      else
200
195
      {
201
 
        bitmap_set_bit(table->write_set,
202
 
                       table->timestamp_field->field_index);
 
196
        table->setWriteSet(table->timestamp_field->field_index);
203
197
      }
204
198
    }
205
199
    /* Fix the expressions in SET clause */
209
203
 
210
204
  table->mark_columns_needed_for_insert();
211
205
 
212
 
  uint32_t tot_length=0;
 
206
  size_t tot_length=0;
213
207
  bool use_blobs= 0, use_vars= 0;
214
208
  List_iterator_fast<Item> it(fields_vars);
215
209
  Item *item;
244
238
    return(true);
245
239
  }
246
240
 
247
 
  /* We can't give an error in the middle when using LOCAL files */
248
 
  if (read_file_from_client && handle_duplicates == DUP_ERROR)
249
 
    ignore= 1;
250
 
 
251
 
  if (read_file_from_client)
252
 
  {
253
 
    (void)net_request_file(&session->net,ex->file_name);
254
 
    file = -1;
255
 
  }
256
 
  else
257
241
  {
258
242
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
259
243
    ex->file_name+=dirname_length(ex->file_name);
280
264
 
281
265
      struct stat stat_info;
282
266
      if (stat(name,&stat_info))
 
267
      {
 
268
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
283
269
        return(true);
 
270
      }
284
271
 
285
272
      // if we are not in slave thread, the file must be:
286
 
      if (!session->slave_thread &&
287
 
          !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
288
 
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
 
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
 
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
273
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
274
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
275
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
276
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
291
277
      {
292
278
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
293
279
        return(true);
296
282
        is_fifo = 1;
297
283
    }
298
284
    if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
285
    {
 
286
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
299
287
      return(true);
 
288
    }
300
289
  }
301
290
 
302
291
  COPY_INFO info;
305
294
  info.handle_duplicates=handle_duplicates;
306
295
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
307
296
 
308
 
  READ_INFO read_info(file,tot_length,
309
 
                      ex->cs ? ex->cs : session->variables.collation_database,
 
297
  READ_INFO read_info(file, tot_length,
 
298
                      ex->cs ? ex->cs : get_default_db_collation(session->db),
310
299
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
311
 
                      info.escape_char, read_file_from_client, is_fifo);
 
300
                      info.escape_char, is_fifo);
312
301
  if (read_info.error)
313
302
  {
314
303
    if  (file >= 0)
316
305
    return(true);                               // Can't allocate buffers
317
306
  }
318
307
 
319
 
  if (mysql_bin_log.is_open())
320
 
  {
321
 
    lf_info.session = session;
322
 
    lf_info.wrote_create_file = 0;
323
 
    lf_info.last_pos_in_file = HA_POS_ERROR;
324
 
    lf_info.log_delayed= transactional_table;
325
 
    read_info.set_io_cache_arg((void*) &lf_info);
326
 
  }
327
 
 
328
 
  session->count_cuted_fields= CHECK_FIELD_WARN;                /* calc cuted fields */
 
308
  /*
 
309
   * Per the SQL standard, inserting NULL into a NOT NULL
 
310
   * field requires an error to be thrown.
 
311
   *
 
312
   * @NOTE
 
313
   *
 
314
   * NULL check and handling occurs in field_conv.cc
 
315
   */
 
316
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
329
317
  session->cuted_fields=0L;
330
318
  /* Skip lines if there is a line terminator */
331
319
  if (ex->line_term->length())
375
363
  free_blobs(table);                            /* if pack_blob was used */
376
364
  table->copy_blobs=0;
377
365
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
378
 
  /* 
 
366
  /*
379
367
     simulated killing in the middle of per-row loop
380
368
     must be effective for binlogging
381
369
  */
382
370
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
383
371
  if (error)
384
372
  {
385
 
    if (read_file_from_client)
386
 
      while (!read_info.next_line())
387
 
        ;
388
 
 
389
 
    if (mysql_bin_log.is_open())
390
 
    {
391
 
      {
392
 
        /*
393
 
          Make sure last block (the one which caused the error) gets
394
 
          logged.  This is needed because otherwise after write of (to
395
 
          the binlog, not to read_info (which is a cache))
396
 
          Delete_file_log_event the bad block will remain in read_info
397
 
          (because pre_read is not called at the end of the last
398
 
          block; remember pre_read is called whenever a new block is
399
 
          read from disk).  At the end of mysql_load(), the destructor
400
 
          of read_info will call end_io_cache() which will flush
401
 
          read_info, so we will finally have this in the binlog:
402
 
 
403
 
          Append_block # The last successfull block
404
 
          Delete_file
405
 
          Append_block # The failing block
406
 
          which is nonsense.
407
 
          Or could also be (for a small file)
408
 
          Create_file  # The failing block
409
 
          which is nonsense (Delete_file is not written in this case, because:
410
 
          Create_file has not been written, so Delete_file is not written, then
411
 
          when read_info is destroyed end_io_cache() is called which writes
412
 
          Create_file.
413
 
        */
414
 
        read_info.end_io_cache();
415
 
        /* If the file was not empty, wrote_create_file is true */
416
 
        if (lf_info.wrote_create_file)
417
 
        {
418
 
          if (session->transaction.stmt.modified_non_trans_table)
419
 
            write_execute_load_query_log_event(session, handle_duplicates,
420
 
                                               ignore, transactional_table,
421
 
                                               killed_status);
422
 
          else
423
 
          {
424
 
            Delete_file_log_event d(session, db, transactional_table);
425
 
            d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
426
 
            mysql_bin_log.write(&d);
427
 
          }
428
 
        }
429
 
      }
430
 
    }
431
373
    error= -1;                          // Error on read
432
374
    goto err;
433
375
  }
437
379
  if (session->transaction.stmt.modified_non_trans_table)
438
380
    session->transaction.all.modified_non_trans_table= true;
439
381
 
440
 
  if (mysql_bin_log.is_open())
441
 
  {
442
 
    /*
443
 
      We need to do the job that is normally done inside
444
 
      binlog_query() here, which is to ensure that the pending event
445
 
      is written before tables are unlocked and before any other
446
 
      events are written.  We also need to update the table map
447
 
      version for the binary log to mark that table maps are invalid
448
 
      after this point.
449
 
     */
450
 
    if (session->current_stmt_binlog_row_based)
451
 
      session->binlog_flush_pending_rows_event(true);
452
 
    else
453
 
    {
454
 
      /*
455
 
        As already explained above, we need to call end_io_cache() or the last
456
 
        block will be logged only after Execute_load_query_log_event (which is
457
 
        wrong), when read_info is destroyed.
458
 
      */
459
 
      read_info.end_io_cache();
460
 
      if (lf_info.wrote_create_file)
461
 
      {
462
 
        write_execute_load_query_log_event(session, handle_duplicates, ignore,
463
 
                                           transactional_table,killed_status);
464
 
      }
465
 
    }
466
 
  }
467
 
 
468
382
  /* ok to client sent only after binlog write and engine commit */
469
 
  my_ok(session, info.copied + info.deleted, 0L, name);
 
383
  session->my_ok(info.copied + info.deleted, 0L, name);
470
384
err:
471
385
  assert(transactional_table || !(info.copied || info.deleted) ||
472
386
              session->transaction.stmt.modified_non_trans_table);
477
391
}
478
392
 
479
393
 
480
 
/* Not a very useful function; just to avoid duplication of code */
481
 
static bool write_execute_load_query_log_event(Session *session,
482
 
                                               bool duplicates, bool ignore,
483
 
                                               bool transactional_table,
484
 
                                               Session::killed_state killed_err_arg)
485
 
{
486
 
  Execute_load_query_log_event
487
 
    e(session, session->query, session->query_length,
488
 
      (char*)session->lex->fname_start - (char*)session->query,
489
 
      (char*)session->lex->fname_end - (char*)session->query,
490
 
      (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
491
 
      (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
492
 
      transactional_table, false, killed_err_arg);
493
 
  e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
494
 
  return mysql_bin_log.write(&e);
495
 
}
496
 
 
497
 
 
498
394
/****************************************************************************
499
395
** Read of rows of fixed size + optional garbage + optional newline
500
396
****************************************************************************/
512
408
  bool err;
513
409
 
514
410
  id= 0;
515
 
 
 
411
 
516
412
  while (!read_info.read_fixed_length())
517
413
  {
518
414
    if (session->killed)
537
433
    read_info.row_end[0]=0;
538
434
#endif
539
435
 
540
 
    restore_record(table, s->default_values);
 
436
    table->restoreRecordAsDefault();
541
437
    /*
542
438
      There are no variables in the fields_vars list in this format, so
543
439
      this conversion is safe.
544
440
    */
545
441
    while ((sql_field= (Item_field*) it++))
546
442
    {
547
 
      Field *field= sql_field->field;                  
 
443
      Field *field= sql_field->field;
548
444
      if (field == table->next_number_field)
549
445
        table->auto_increment_field_not_null= true;
550
446
      /*
557
453
      if (pos == read_info.row_end)
558
454
      {
559
455
        session->cuted_fields++;                        /* Not enough fields */
560
 
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
561
 
                            ER_WARN_TOO_FEW_RECORDS, 
 
456
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
457
                            ER_WARN_TOO_FEW_RECORDS,
562
458
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
563
459
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
564
460
            ((Field_timestamp*) field)->set_time();
567
463
      {
568
464
        uint32_t length;
569
465
        unsigned char save_chr;
570
 
        if ((length=(uint) (read_info.row_end-pos)) >
 
466
        if ((length=(uint32_t) (read_info.row_end-pos)) >
571
467
            field->field_length)
 
468
        {
572
469
          length=field->field_length;
573
 
        save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
 
470
        }
 
471
        save_chr=pos[length];
 
472
        pos[length]='\0'; // Add temp null terminator for store()
574
473
        field->store((char*) pos,length,read_info.read_charset);
575
474
        pos[length]=save_chr;
576
475
        if ((pos+=length) > read_info.row_end)
580
479
    if (pos != read_info.row_end)
581
480
    {
582
481
      session->cuted_fields++;                  /* To long row */
583
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
584
 
                          ER_WARN_TOO_MANY_RECORDS, 
585
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
 
482
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
483
                          ER_WARN_TOO_MANY_RECORDS,
 
484
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
586
485
    }
587
486
 
588
487
    if (session->killed ||
594
493
    table->auto_increment_field_not_null= false;
595
494
    if (err)
596
495
      return(1);
597
 
   
 
496
 
598
497
    /*
599
498
      We don't need to reset auto-increment field since we are restoring
600
499
      its default value at the beginning of each loop iteration.
604
503
    if (read_info.line_cuted)
605
504
    {
606
505
      session->cuted_fields++;                  /* To long row */
607
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
608
 
                          ER_WARN_TOO_MANY_RECORDS, 
609
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
 
506
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
507
                          ER_WARN_TOO_MANY_RECORDS,
 
508
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
610
509
    }
611
510
    session->row_count++;
612
511
  }
640
539
      return(1);
641
540
    }
642
541
 
643
 
    restore_record(table, s->default_values);
 
542
    table->restoreRecordAsDefault();
644
543
 
645
544
    while ((item= it++))
646
545
    {
656
555
        continue;
657
556
 
658
557
      pos=read_info.row_start;
659
 
      length=(uint) (read_info.row_end-pos);
 
558
      length=(uint32_t) (read_info.row_end-pos);
660
559
 
661
560
      real_item= item->real_item();
662
561
 
785
684
    if (read_info.line_cuted)
786
685
    {
787
686
      session->cuted_fields++;                  /* To long row */
788
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
789
 
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), 
790
 
                          session->row_count);   
 
687
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
688
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
 
689
                          session->row_count);
791
690
      if (session->killed)
792
691
        return(1);
793
692
    }
824
723
*/
825
724
 
826
725
 
827
 
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
 
726
READ_INFO::READ_INFO(File file_par, size_t tot_length,
 
727
                     const CHARSET_INFO * const cs,
828
728
                     String &field_term, String &line_start, String &line_term,
829
 
                     String &enclosed_par, int escape, bool get_it_from_net,
830
 
                     bool is_fifo)
 
729
                     String &enclosed_par, int escape, bool is_fifo)
831
730
  :file(file_par),escape_char(escape)
832
731
{
833
732
  read_charset= cs;
866
765
  set_if_bigger(length,line_start.length());
867
766
  stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
868
767
 
869
 
  if (!(buffer=(unsigned char*) my_malloc(buff_length+1,MYF(0))))
 
768
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
870
769
    error=1; /* purecov: inspected */
871
770
  else
872
771
  {
873
772
    end_of_buff=buffer+buff_length;
874
 
    if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
875
 
                      (get_it_from_net) ? READ_NET :
 
773
    if (init_io_cache(&cache,(false) ? -1 : file, 0,
 
774
                      (false) ? READ_NET :
876
775
                      (is_fifo ? READ_FIFO : READ_CACHE),0L,1,
877
776
                      MYF(MY_WME)))
878
777
    {
887
786
        manual assignment
888
787
      */
889
788
      need_end_io_cache = 1;
890
 
 
891
 
      if (get_it_from_net)
892
 
        cache.read_function = _my_b_net_read;
893
 
 
894
 
      if (mysql_bin_log.is_open())
895
 
        cache.pre_read = cache.pre_close =
896
 
          (IO_CACHE_CALLBACK) log_loaded_block;
897
789
    }
898
790
  }
899
791
}
905
797
  {
906
798
    if (need_end_io_cache)
907
799
      ::end_io_cache(&cache);
908
 
    free((unsigned char*) buffer);
 
800
    free(buffer);
909
801
    error=1;
910
802
  }
911
803
}
1084
976
    /*
1085
977
    ** We come here if buffer is too small. Enlarge it and continue
1086
978
    */
1087
 
    if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
1088
 
                                        MYF(MY_WME))))
 
979
    if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1089
980
      return (error=1);
1090
981
    to=new_buffer + (to-buffer);
1091
982
    buffer=new_buffer;