~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Brian Aker
  • Date: 2008-10-06 06:47:29 UTC
  • Revision ID: brian@tangent.org-20081006064729-2i9mhjkzyvow9xsm
RemoveĀ uint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
#include <drizzled/server_includes.h>
20
20
#include "sql_repl.h"
21
 
#include <drizzled/error.h>
 
21
#include <drizzled/drizzled_error_messages.h>
22
22
 
23
23
 
24
24
class READ_INFO {
66
66
  /*
67
67
    Either this method, or we need to make cache public
68
68
    Arg must be set from mysql_load() since constructor does not see
69
 
    either the table or Session value
 
69
    either the table or THD value
70
70
  */
71
71
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
72
72
};
73
73
 
74
 
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
75
75
                             List<Item> &fields_vars, List<Item> &set_fields,
76
76
                             List<Item> &set_values, READ_INFO &read_info,
77
77
                             uint32_t skip_lines,
78
78
                             bool ignore_check_option_errors);
79
 
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
79
static int read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
80
80
                          List<Item> &fields_vars, List<Item> &set_fields,
81
81
                          List<Item> &set_values, READ_INFO &read_info,
82
82
                          String &enclosed, uint32_t skip_lines,
83
83
                          bool ignore_check_option_errors);
84
84
 
85
 
static bool write_execute_load_query_log_event(Session *session,
 
85
static bool write_execute_load_query_log_event(THD *thd,
86
86
                                               bool duplicates, bool ignore,
87
87
                                               bool transactional_table,
88
 
                                               Session::killed_state killed_status);
 
88
                                               THD::killed_state killed_status);
89
89
 
90
90
/*
91
91
  Execute LOAD DATA query
92
92
 
93
93
  SYNOPSYS
94
94
    mysql_load()
95
 
      session - current thread
 
95
      thd - current thread
96
96
      ex  - sql_exchange object representing source file and its parsing rules
97
97
      table_list  - list of tables to which we are loading data
98
98
      fields_vars - list of fields and variables to which we read
108
108
    true - error / false - success
109
109
*/
110
110
 
111
 
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
 
111
int mysql_load(THD *thd,sql_exchange *ex,TableList *table_list,
112
112
                List<Item> &fields_vars, List<Item> &set_fields,
113
113
                List<Item> &set_values,
114
114
                enum enum_duplicates handle_duplicates, bool ignore,
128
128
    If this is not set, we will use the directory where the table to be
129
129
    loaded is located
130
130
  */
131
 
  char *tdb= session->db ? session->db : db;            // Result is never null
 
131
  char *tdb= thd->db ? thd->db : db;            // Result is never null
132
132
  uint32_t skip_lines= ex->skip_lines;
133
133
  bool transactional_table;
134
 
  Session::killed_state killed_status= Session::NOT_KILLED;
 
134
  THD::killed_state killed_status= THD::NOT_KILLED;
135
135
 
136
136
  if (escaped->length() > 1 || enclosed->length() > 1)
137
137
  {
139
139
               MYF(0));
140
140
    return(true);
141
141
  }
142
 
  if (open_and_lock_tables(session, table_list))
 
142
  if (open_and_lock_tables(thd, table_list))
143
143
    return(true);
144
 
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
145
 
                                    &session->lex->select_lex.top_join_list,
 
144
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
 
145
                                    &thd->lex->select_lex.top_join_list,
146
146
                                    table_list,
147
 
                                    &session->lex->select_lex.leaf_tables, true))
 
147
                                    &thd->lex->select_lex.leaf_tables, true))
148
148
     return(-1);
149
149
 
150
150
  /*
155
155
    table is marked to be 'used for insert' in which case we should never
156
156
    mark this table as 'const table' (ie, one that has only one row).
157
157
  */
158
 
  if (unique_table(session, table_list, table_list->next_global, 0))
 
158
  if (unique_table(thd, table_list, table_list->next_global, 0))
159
159
  {
160
160
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
161
161
    return(true);
175
175
      Let us also prepare SET clause, altough it is probably empty
176
176
      in this case.
177
177
    */
178
 
    if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
 
        setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
178
    if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
179
        setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
180
180
      return(true);
181
181
  }
182
182
  else
183
183
  {                                             // Part field list
184
184
    /* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
 
    if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
 
        setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
 
        check_that_all_fields_are_given_values(session, table, table_list))
 
185
    if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
 
186
        setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
187
        check_that_all_fields_are_given_values(thd, table, table_list))
188
188
      return(true);
189
189
    /*
190
190
      Check whenever TIMESTAMP field with auto-set feature specified
202
202
      }
203
203
    }
204
204
    /* Fix the expressions in SET clause */
205
 
    if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
205
    if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
206
206
      return(true);
207
207
  }
208
208
 
249
249
 
250
250
  if (read_file_from_client)
251
251
  {
252
 
    (void)net_request_file(&session->net,ex->file_name);
 
252
    (void)net_request_file(&thd->net,ex->file_name);
253
253
    file = -1;
254
254
  }
255
255
  else
259
259
#endif
260
260
    if (!dirname_length(ex->file_name))
261
261
    {
262
 
      strcpy(name, mysql_real_data_home);
263
 
      strncat(name, tdb, FN_REFLEN-strlen(mysql_real_data_home)-1);
 
262
      strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NULL);
264
263
      (void) fn_format(name, ex->file_name, name, "",
265
264
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
266
265
    }
282
281
        return(true);
283
282
 
284
283
      // if we are not in slave thread, the file must be:
285
 
      if (!session->slave_thread &&
 
284
      if (!thd->slave_thread &&
286
285
          !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
287
286
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
288
287
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
305
304
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
306
305
 
307
306
  READ_INFO read_info(file,tot_length,
308
 
                      ex->cs ? ex->cs : session->variables.collation_database,
 
307
                      ex->cs ? ex->cs : thd->variables.collation_database,
309
308
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
310
309
                      info.escape_char, read_file_from_client, is_fifo);
311
310
  if (read_info.error)
317
316
 
318
317
  if (mysql_bin_log.is_open())
319
318
  {
320
 
    lf_info.session = session;
 
319
    lf_info.thd = thd;
321
320
    lf_info.wrote_create_file = 0;
322
321
    lf_info.last_pos_in_file = HA_POS_ERROR;
323
322
    lf_info.log_delayed= transactional_table;
324
323
    read_info.set_io_cache_arg((void*) &lf_info);
325
324
  }
326
325
 
327
 
  session->count_cuted_fields= CHECK_FIELD_WARN;                /* calc cuted fields */
328
 
  session->cuted_fields=0L;
 
326
  thd->count_cuted_fields= CHECK_FIELD_WARN;            /* calc cuted fields */
 
327
  thd->cuted_fields=0L;
329
328
  /* Skip lines if there is a line terminator */
330
329
  if (ex->line_term->length())
331
330
  {
350
349
    table->file->ha_start_bulk_insert((ha_rows) 0);
351
350
    table->copy_blobs=1;
352
351
 
353
 
    session->abort_on_warning= true;
 
352
    thd->abort_on_warning= true;
354
353
 
355
354
    if (!field_term->length() && !enclosed->length())
356
 
      error= read_fixed_length(session, info, table_list, fields_vars,
 
355
      error= read_fixed_length(thd, info, table_list, fields_vars,
357
356
                               set_fields, set_values, read_info,
358
357
                               skip_lines, ignore);
359
358
    else
360
 
      error= read_sep_field(session, info, table_list, fields_vars,
 
359
      error= read_sep_field(thd, info, table_list, fields_vars,
361
360
                            set_fields, set_values, read_info,
362
361
                            *enclosed, skip_lines, ignore);
363
362
    if (table->file->ha_end_bulk_insert() && !error)
373
372
    my_close(file,MYF(0));
374
373
  free_blobs(table);                            /* if pack_blob was used */
375
374
  table->copy_blobs=0;
376
 
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
 
375
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
377
376
  /* 
378
377
     simulated killing in the middle of per-row loop
379
378
     must be effective for binlogging
380
379
  */
381
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
 
380
  killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
382
381
  if (error)
383
382
  {
384
383
    if (read_file_from_client)
414
413
        /* If the file was not empty, wrote_create_file is true */
415
414
        if (lf_info.wrote_create_file)
416
415
        {
417
 
          if (session->transaction.stmt.modified_non_trans_table)
418
 
            write_execute_load_query_log_event(session, handle_duplicates,
 
416
          if (thd->transaction.stmt.modified_non_trans_table)
 
417
            write_execute_load_query_log_event(thd, handle_duplicates,
419
418
                                               ignore, transactional_table,
420
419
                                               killed_status);
421
420
          else
422
421
          {
423
 
            Delete_file_log_event d(session, db, transactional_table);
 
422
            Delete_file_log_event d(thd, db, transactional_table);
424
423
            d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
425
424
            mysql_bin_log.write(&d);
426
425
          }
431
430
    goto err;
432
431
  }
433
432
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
434
 
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
433
          (uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
435
434
 
436
 
  if (session->transaction.stmt.modified_non_trans_table)
437
 
    session->transaction.all.modified_non_trans_table= true;
 
435
  if (thd->transaction.stmt.modified_non_trans_table)
 
436
    thd->transaction.all.modified_non_trans_table= true;
438
437
 
439
438
  if (mysql_bin_log.is_open())
440
439
  {
446
445
      version for the binary log to mark that table maps are invalid
447
446
      after this point.
448
447
     */
449
 
    if (session->current_stmt_binlog_row_based)
450
 
      session->binlog_flush_pending_rows_event(true);
 
448
    if (thd->current_stmt_binlog_row_based)
 
449
      thd->binlog_flush_pending_rows_event(true);
451
450
    else
452
451
    {
453
452
      /*
458
457
      read_info.end_io_cache();
459
458
      if (lf_info.wrote_create_file)
460
459
      {
461
 
        write_execute_load_query_log_event(session, handle_duplicates, ignore,
 
460
        write_execute_load_query_log_event(thd, handle_duplicates, ignore,
462
461
                                           transactional_table,killed_status);
463
462
      }
464
463
    }
465
464
  }
466
465
 
467
466
  /* ok to client sent only after binlog write and engine commit */
468
 
  my_ok(session, info.copied + info.deleted, 0L, name);
 
467
  my_ok(thd, info.copied + info.deleted, 0L, name);
469
468
err:
470
469
  assert(transactional_table || !(info.copied || info.deleted) ||
471
 
              session->transaction.stmt.modified_non_trans_table);
 
470
              thd->transaction.stmt.modified_non_trans_table);
472
471
  table->file->ha_release_auto_increment();
473
472
  table->auto_increment_field_not_null= false;
474
 
  session->abort_on_warning= 0;
 
473
  thd->abort_on_warning= 0;
475
474
  return(error);
476
475
}
477
476
 
478
477
 
479
478
/* Not a very useful function; just to avoid duplication of code */
480
 
static bool write_execute_load_query_log_event(Session *session,
 
479
static bool write_execute_load_query_log_event(THD *thd,
481
480
                                               bool duplicates, bool ignore,
482
481
                                               bool transactional_table,
483
 
                                               Session::killed_state killed_err_arg)
 
482
                                               THD::killed_state killed_err_arg)
484
483
{
485
484
  Execute_load_query_log_event
486
 
    e(session, session->query, session->query_length,
487
 
      (char*)session->lex->fname_start - (char*)session->query,
488
 
      (char*)session->lex->fname_end - (char*)session->query,
 
485
    e(thd, thd->query, thd->query_length,
 
486
      (char*)thd->lex->fname_start - (char*)thd->query,
 
487
      (char*)thd->lex->fname_end - (char*)thd->query,
489
488
      (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
490
489
      (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
491
490
      transactional_table, false, killed_err_arg);
499
498
****************************************************************************/
500
499
 
501
500
static int
502
 
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
501
read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
503
502
                  List<Item> &fields_vars, List<Item> &set_fields,
504
503
                  List<Item> &set_values, READ_INFO &read_info,
505
504
                  uint32_t skip_lines, bool ignore_check_option_errors)
514
513
 
515
514
  while (!read_info.read_fixed_length())
516
515
  {
517
 
    if (session->killed)
 
516
    if (thd->killed)
518
517
    {
519
 
      session->send_kill_message();
 
518
      thd->send_kill_message();
520
519
      return(1);
521
520
    }
522
521
    if (skip_lines)
555
554
 
556
555
      if (pos == read_info.row_end)
557
556
      {
558
 
        session->cuted_fields++;                        /* Not enough fields */
559
 
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
557
        thd->cuted_fields++;                    /* Not enough fields */
 
558
        push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
560
559
                            ER_WARN_TOO_FEW_RECORDS, 
561
 
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
 
560
                            ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
562
561
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
563
562
            ((Field_timestamp*) field)->set_time();
564
563
      }
578
577
    }
579
578
    if (pos != read_info.row_end)
580
579
    {
581
 
      session->cuted_fields++;                  /* To long row */
582
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
580
      thd->cuted_fields++;                      /* To long row */
 
581
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
583
582
                          ER_WARN_TOO_MANY_RECORDS, 
584
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
 
583
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
585
584
    }
586
585
 
587
 
    if (session->killed ||
588
 
        fill_record(session, set_fields, set_values,
 
586
    if (thd->killed ||
 
587
        fill_record(thd, set_fields, set_values,
589
588
                    ignore_check_option_errors))
590
589
      return(1);
591
590
 
592
 
    err= write_record(session, table, &info);
 
591
    err= write_record(thd, table, &info);
593
592
    table->auto_increment_field_not_null= false;
594
593
    if (err)
595
594
      return(1);
602
601
      break;
603
602
    if (read_info.line_cuted)
604
603
    {
605
 
      session->cuted_fields++;                  /* To long row */
606
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
604
      thd->cuted_fields++;                      /* To long row */
 
605
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
607
606
                          ER_WARN_TOO_MANY_RECORDS, 
608
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
 
607
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
609
608
    }
610
 
    session->row_count++;
 
609
    thd->row_count++;
611
610
  }
612
611
  return(test(read_info.error));
613
612
}
615
614
 
616
615
 
617
616
static int
618
 
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
617
read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
619
618
               List<Item> &fields_vars, List<Item> &set_fields,
620
619
               List<Item> &set_values, READ_INFO &read_info,
621
620
               String &enclosed, uint32_t skip_lines,
633
632
 
634
633
  for (;;it.rewind())
635
634
  {
636
 
    if (session->killed)
 
635
    if (thd->killed)
637
636
    {
638
 
      session->send_kill_message();
 
637
      thd->send_kill_message();
639
638
      return(1);
640
639
    }
641
640
 
669
668
          if (field->reset())
670
669
          {
671
670
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
672
 
                     session->row_count);
 
671
                     thd->row_count);
673
672
            return(1);
674
673
          }
675
674
          field->set_null();
737
736
          if (field->reset())
738
737
          {
739
738
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
740
 
                     session->row_count);
 
739
                     thd->row_count);
741
740
            return(1);
742
741
          }
743
742
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
745
744
          /*
746
745
            QQ: We probably should not throw warning for each field.
747
746
            But how about intention to always have the same number
748
 
            of warnings in Session::cuted_fields (and get rid of cuted_fields
 
747
            of warnings in THD::cuted_fields (and get rid of cuted_fields
749
748
            in the end ?)
750
749
          */
751
 
          session->cuted_fields++;
752
 
          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
750
          thd->cuted_fields++;
 
751
          push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
753
752
                              ER_WARN_TOO_FEW_RECORDS,
754
 
                              ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
 
753
                              ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
755
754
        }
756
755
        else if (item->type() == Item::STRING_ITEM)
757
756
        {
766
765
      }
767
766
    }
768
767
 
769
 
    if (session->killed ||
770
 
        fill_record(session, set_fields, set_values,
 
768
    if (thd->killed ||
 
769
        fill_record(thd, set_fields, set_values,
771
770
                    ignore_check_option_errors))
772
771
      return(1);
773
772
 
774
 
    err= write_record(session, table, &info);
 
773
    err= write_record(thd, table, &info);
775
774
    table->auto_increment_field_not_null= false;
776
775
    if (err)
777
776
      return(1);
783
782
      break;
784
783
    if (read_info.line_cuted)
785
784
    {
786
 
      session->cuted_fields++;                  /* To long row */
787
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
785
      thd->cuted_fields++;                      /* To long row */
 
786
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
788
787
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), 
789
 
                          session->row_count);   
790
 
      if (session->killed)
 
788
                          thd->row_count);   
 
789
      if (thd->killed)
791
790
        return(1);
792
791
    }
793
 
    session->row_count++;
 
792
    thd->row_count++;
794
793
  }
795
794
  return(test(read_info.error));
796
795
}