~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Monty Taylor
  • Date: 2008-10-23 23:53:49 UTC
  • mto: This revision was merged to the branch mainline in revision 557.
  • Revision ID: monty@inaugust.com-20081023235349-317wgwqwgccuacmq
Split out nested_join.h.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
#include <drizzled/server_includes.h>
20
20
#include "sql_repl.h"
21
 
#include <drizzled/drizzled_error_messages.h>
 
21
#include <drizzled/error.h>
22
22
 
23
23
 
24
24
class READ_INFO {
66
66
  /*
67
67
    Either this method, or we need to make cache public
68
68
    Arg must be set from mysql_load() since constructor does not see
69
 
    either the table or THD value
 
69
    either the table or Session value
70
70
  */
71
71
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
72
72
};
73
73
 
74
 
static int read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
 
74
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
75
75
                             List<Item> &fields_vars, List<Item> &set_fields,
76
76
                             List<Item> &set_values, READ_INFO &read_info,
77
77
                             uint32_t skip_lines,
78
78
                             bool ignore_check_option_errors);
79
 
static int read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
 
79
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
80
80
                          List<Item> &fields_vars, List<Item> &set_fields,
81
81
                          List<Item> &set_values, READ_INFO &read_info,
82
82
                          String &enclosed, uint32_t skip_lines,
83
83
                          bool ignore_check_option_errors);
84
84
 
85
 
static bool write_execute_load_query_log_event(THD *thd,
 
85
static bool write_execute_load_query_log_event(Session *session,
86
86
                                               bool duplicates, bool ignore,
87
87
                                               bool transactional_table,
88
 
                                               THD::killed_state killed_status);
 
88
                                               Session::killed_state killed_status);
89
89
 
90
90
/*
91
91
  Execute LOAD DATA query
92
92
 
93
93
  SYNOPSYS
94
94
    mysql_load()
95
 
      thd - current thread
 
95
      session - current thread
96
96
      ex  - sql_exchange object representing source file and its parsing rules
97
97
      table_list  - list of tables to which we are loading data
98
98
      fields_vars - list of fields and variables to which we read
108
108
    true - error / false - success
109
109
*/
110
110
 
111
 
int mysql_load(THD *thd,sql_exchange *ex,TableList *table_list,
 
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
112
112
                List<Item> &fields_vars, List<Item> &set_fields,
113
113
                List<Item> &set_values,
114
114
                enum enum_duplicates handle_duplicates, bool ignore,
128
128
    If this is not set, we will use the directory where the table to be
129
129
    loaded is located
130
130
  */
131
 
  char *tdb= thd->db ? thd->db : db;            // Result is never null
 
131
  char *tdb= session->db ? session->db : db;            // Result is never null
132
132
  uint32_t skip_lines= ex->skip_lines;
133
133
  bool transactional_table;
134
 
  THD::killed_state killed_status= THD::NOT_KILLED;
 
134
  Session::killed_state killed_status= Session::NOT_KILLED;
135
135
 
136
136
  if (escaped->length() > 1 || enclosed->length() > 1)
137
137
  {
139
139
               MYF(0));
140
140
    return(true);
141
141
  }
142
 
  if (open_and_lock_tables(thd, table_list))
 
142
  if (open_and_lock_tables(session, table_list))
143
143
    return(true);
144
 
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
 
                                    &thd->lex->select_lex.top_join_list,
 
144
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
 
145
                                    &session->lex->select_lex.top_join_list,
146
146
                                    table_list,
147
 
                                    &thd->lex->select_lex.leaf_tables, true))
 
147
                                    &session->lex->select_lex.leaf_tables, true))
148
148
     return(-1);
149
149
 
150
150
  /*
155
155
    table is marked to be 'used for insert' in which case we should never
156
156
    mark this table as 'const table' (ie, one that has only one row).
157
157
  */
158
 
  if (unique_table(thd, table_list, table_list->next_global, 0))
 
158
  if (unique_table(session, table_list, table_list->next_global, 0))
159
159
  {
160
160
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
161
161
    return(true);
175
175
      Let us also prepare SET clause, altough it is probably empty
176
176
      in this case.
177
177
    */
178
 
    if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
 
        setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
178
    if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
179
        setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
180
180
      return(true);
181
181
  }
182
182
  else
183
183
  {                                             // Part field list
184
184
    /* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
 
    if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
 
        setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
 
        check_that_all_fields_are_given_values(thd, table, table_list))
 
185
    if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
 
186
        setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
187
        check_that_all_fields_are_given_values(session, table, table_list))
188
188
      return(true);
189
189
    /*
190
190
      Check whenever TIMESTAMP field with auto-set feature specified
202
202
      }
203
203
    }
204
204
    /* Fix the expressions in SET clause */
205
 
    if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
205
    if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
206
206
      return(true);
207
207
  }
208
208
 
249
249
 
250
250
  if (read_file_from_client)
251
251
  {
252
 
    (void)net_request_file(&thd->net,ex->file_name);
 
252
    (void)net_request_file(&session->net,ex->file_name);
253
253
    file = -1;
254
254
  }
255
255
  else
259
259
#endif
260
260
    if (!dirname_length(ex->file_name))
261
261
    {
262
 
      strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NULL);
 
262
      strcpy(name, mysql_real_data_home);
 
263
      strncat(name, tdb, FN_REFLEN-strlen(mysql_real_data_home)-1);
263
264
      (void) fn_format(name, ex->file_name, name, "",
264
265
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
265
266
    }
281
282
        return(true);
282
283
 
283
284
      // if we are not in slave thread, the file must be:
284
 
      if (!thd->slave_thread &&
 
285
      if (!session->slave_thread &&
285
286
          !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
286
287
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
287
288
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
305
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
305
306
 
306
307
  READ_INFO read_info(file,tot_length,
307
 
                      ex->cs ? ex->cs : thd->variables.collation_database,
 
308
                      ex->cs ? ex->cs : session->variables.collation_database,
308
309
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
309
310
                      info.escape_char, read_file_from_client, is_fifo);
310
311
  if (read_info.error)
316
317
 
317
318
  if (mysql_bin_log.is_open())
318
319
  {
319
 
    lf_info.thd = thd;
 
320
    lf_info.session = session;
320
321
    lf_info.wrote_create_file = 0;
321
322
    lf_info.last_pos_in_file = HA_POS_ERROR;
322
323
    lf_info.log_delayed= transactional_table;
323
324
    read_info.set_io_cache_arg((void*) &lf_info);
324
325
  }
325
326
 
326
 
  thd->count_cuted_fields= CHECK_FIELD_WARN;            /* calc cuted fields */
327
 
  thd->cuted_fields=0L;
 
327
  session->count_cuted_fields= CHECK_FIELD_WARN;                /* calc cuted fields */
 
328
  session->cuted_fields=0L;
328
329
  /* Skip lines if there is a line terminator */
329
330
  if (ex->line_term->length())
330
331
  {
349
350
    table->file->ha_start_bulk_insert((ha_rows) 0);
350
351
    table->copy_blobs=1;
351
352
 
352
 
    thd->abort_on_warning= true;
 
353
    session->abort_on_warning= true;
353
354
 
354
355
    if (!field_term->length() && !enclosed->length())
355
 
      error= read_fixed_length(thd, info, table_list, fields_vars,
 
356
      error= read_fixed_length(session, info, table_list, fields_vars,
356
357
                               set_fields, set_values, read_info,
357
358
                               skip_lines, ignore);
358
359
    else
359
 
      error= read_sep_field(thd, info, table_list, fields_vars,
 
360
      error= read_sep_field(session, info, table_list, fields_vars,
360
361
                            set_fields, set_values, read_info,
361
362
                            *enclosed, skip_lines, ignore);
362
363
    if (table->file->ha_end_bulk_insert() && !error)
372
373
    my_close(file,MYF(0));
373
374
  free_blobs(table);                            /* if pack_blob was used */
374
375
  table->copy_blobs=0;
375
 
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
 
376
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
376
377
  /* 
377
378
     simulated killing in the middle of per-row loop
378
379
     must be effective for binlogging
379
380
  */
380
 
  killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
 
381
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
381
382
  if (error)
382
383
  {
383
384
    if (read_file_from_client)
413
414
        /* If the file was not empty, wrote_create_file is true */
414
415
        if (lf_info.wrote_create_file)
415
416
        {
416
 
          if (thd->transaction.stmt.modified_non_trans_table)
417
 
            write_execute_load_query_log_event(thd, handle_duplicates,
 
417
          if (session->transaction.stmt.modified_non_trans_table)
 
418
            write_execute_load_query_log_event(session, handle_duplicates,
418
419
                                               ignore, transactional_table,
419
420
                                               killed_status);
420
421
          else
421
422
          {
422
 
            Delete_file_log_event d(thd, db, transactional_table);
 
423
            Delete_file_log_event d(session, db, transactional_table);
423
424
            d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
424
425
            mysql_bin_log.write(&d);
425
426
          }
430
431
    goto err;
431
432
  }
432
433
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
433
 
          (uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
 
434
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
434
435
 
435
 
  if (thd->transaction.stmt.modified_non_trans_table)
436
 
    thd->transaction.all.modified_non_trans_table= true;
 
436
  if (session->transaction.stmt.modified_non_trans_table)
 
437
    session->transaction.all.modified_non_trans_table= true;
437
438
 
438
439
  if (mysql_bin_log.is_open())
439
440
  {
445
446
      version for the binary log to mark that table maps are invalid
446
447
      after this point.
447
448
     */
448
 
    if (thd->current_stmt_binlog_row_based)
449
 
      thd->binlog_flush_pending_rows_event(true);
 
449
    if (session->current_stmt_binlog_row_based)
 
450
      session->binlog_flush_pending_rows_event(true);
450
451
    else
451
452
    {
452
453
      /*
457
458
      read_info.end_io_cache();
458
459
      if (lf_info.wrote_create_file)
459
460
      {
460
 
        write_execute_load_query_log_event(thd, handle_duplicates, ignore,
 
461
        write_execute_load_query_log_event(session, handle_duplicates, ignore,
461
462
                                           transactional_table,killed_status);
462
463
      }
463
464
    }
464
465
  }
465
466
 
466
467
  /* ok to client sent only after binlog write and engine commit */
467
 
  my_ok(thd, info.copied + info.deleted, 0L, name);
 
468
  my_ok(session, info.copied + info.deleted, 0L, name);
468
469
err:
469
470
  assert(transactional_table || !(info.copied || info.deleted) ||
470
 
              thd->transaction.stmt.modified_non_trans_table);
 
471
              session->transaction.stmt.modified_non_trans_table);
471
472
  table->file->ha_release_auto_increment();
472
473
  table->auto_increment_field_not_null= false;
473
 
  thd->abort_on_warning= 0;
 
474
  session->abort_on_warning= 0;
474
475
  return(error);
475
476
}
476
477
 
477
478
 
478
479
/* Not a very useful function; just to avoid duplication of code */
479
 
static bool write_execute_load_query_log_event(THD *thd,
 
480
static bool write_execute_load_query_log_event(Session *session,
480
481
                                               bool duplicates, bool ignore,
481
482
                                               bool transactional_table,
482
 
                                               THD::killed_state killed_err_arg)
 
483
                                               Session::killed_state killed_err_arg)
483
484
{
484
485
  Execute_load_query_log_event
485
 
    e(thd, thd->query, thd->query_length,
486
 
      (char*)thd->lex->fname_start - (char*)thd->query,
487
 
      (char*)thd->lex->fname_end - (char*)thd->query,
 
486
    e(session, session->query, session->query_length,
 
487
      (char*)session->lex->fname_start - (char*)session->query,
 
488
      (char*)session->lex->fname_end - (char*)session->query,
488
489
      (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
489
490
      (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
490
491
      transactional_table, false, killed_err_arg);
498
499
****************************************************************************/
499
500
 
500
501
static int
501
 
read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
 
502
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
502
503
                  List<Item> &fields_vars, List<Item> &set_fields,
503
504
                  List<Item> &set_values, READ_INFO &read_info,
504
505
                  uint32_t skip_lines, bool ignore_check_option_errors)
513
514
 
514
515
  while (!read_info.read_fixed_length())
515
516
  {
516
 
    if (thd->killed)
 
517
    if (session->killed)
517
518
    {
518
 
      thd->send_kill_message();
 
519
      session->send_kill_message();
519
520
      return(1);
520
521
    }
521
522
    if (skip_lines)
554
555
 
555
556
      if (pos == read_info.row_end)
556
557
      {
557
 
        thd->cuted_fields++;                    /* Not enough fields */
558
 
        push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
558
        session->cuted_fields++;                        /* Not enough fields */
 
559
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
559
560
                            ER_WARN_TOO_FEW_RECORDS, 
560
 
                            ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
 
561
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
561
562
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
562
563
            ((Field_timestamp*) field)->set_time();
563
564
      }
577
578
    }
578
579
    if (pos != read_info.row_end)
579
580
    {
580
 
      thd->cuted_fields++;                      /* To long row */
581
 
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
581
      session->cuted_fields++;                  /* To long row */
 
582
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
582
583
                          ER_WARN_TOO_MANY_RECORDS, 
583
 
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
 
584
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
584
585
    }
585
586
 
586
 
    if (thd->killed ||
587
 
        fill_record(thd, set_fields, set_values,
 
587
    if (session->killed ||
 
588
        fill_record(session, set_fields, set_values,
588
589
                    ignore_check_option_errors))
589
590
      return(1);
590
591
 
591
 
    err= write_record(thd, table, &info);
 
592
    err= write_record(session, table, &info);
592
593
    table->auto_increment_field_not_null= false;
593
594
    if (err)
594
595
      return(1);
601
602
      break;
602
603
    if (read_info.line_cuted)
603
604
    {
604
 
      thd->cuted_fields++;                      /* To long row */
605
 
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
605
      session->cuted_fields++;                  /* To long row */
 
606
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
606
607
                          ER_WARN_TOO_MANY_RECORDS, 
607
 
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
 
608
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
608
609
    }
609
 
    thd->row_count++;
 
610
    session->row_count++;
610
611
  }
611
612
  return(test(read_info.error));
612
613
}
614
615
 
615
616
 
616
617
static int
617
 
read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
 
618
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
618
619
               List<Item> &fields_vars, List<Item> &set_fields,
619
620
               List<Item> &set_values, READ_INFO &read_info,
620
621
               String &enclosed, uint32_t skip_lines,
632
633
 
633
634
  for (;;it.rewind())
634
635
  {
635
 
    if (thd->killed)
 
636
    if (session->killed)
636
637
    {
637
 
      thd->send_kill_message();
 
638
      session->send_kill_message();
638
639
      return(1);
639
640
    }
640
641
 
668
669
          if (field->reset())
669
670
          {
670
671
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
671
 
                     thd->row_count);
 
672
                     session->row_count);
672
673
            return(1);
673
674
          }
674
675
          field->set_null();
736
737
          if (field->reset())
737
738
          {
738
739
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
739
 
                     thd->row_count);
 
740
                     session->row_count);
740
741
            return(1);
741
742
          }
742
743
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
744
745
          /*
745
746
            QQ: We probably should not throw warning for each field.
746
747
            But how about intention to always have the same number
747
 
            of warnings in THD::cuted_fields (and get rid of cuted_fields
 
748
            of warnings in Session::cuted_fields (and get rid of cuted_fields
748
749
            in the end ?)
749
750
          */
750
 
          thd->cuted_fields++;
751
 
          push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
751
          session->cuted_fields++;
 
752
          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
752
753
                              ER_WARN_TOO_FEW_RECORDS,
753
 
                              ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
 
754
                              ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
754
755
        }
755
756
        else if (item->type() == Item::STRING_ITEM)
756
757
        {
765
766
      }
766
767
    }
767
768
 
768
 
    if (thd->killed ||
769
 
        fill_record(thd, set_fields, set_values,
 
769
    if (session->killed ||
 
770
        fill_record(session, set_fields, set_values,
770
771
                    ignore_check_option_errors))
771
772
      return(1);
772
773
 
773
 
    err= write_record(thd, table, &info);
 
774
    err= write_record(session, table, &info);
774
775
    table->auto_increment_field_not_null= false;
775
776
    if (err)
776
777
      return(1);
782
783
      break;
783
784
    if (read_info.line_cuted)
784
785
    {
785
 
      thd->cuted_fields++;                      /* To long row */
786
 
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
786
      session->cuted_fields++;                  /* To long row */
 
787
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
787
788
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), 
788
 
                          thd->row_count);   
789
 
      if (thd->killed)
 
789
                          session->row_count);   
 
790
      if (session->killed)
790
791
        return(1);
791
792
    }
792
 
    thd->row_count++;
 
793
    session->row_count++;
793
794
  }
794
795
  return(test(read_info.error));
795
796
}