~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Brian Aker
  • Date: 2008-10-20 04:28:21 UTC
  • mto: (492.3.21 drizzle-clean-code)
  • mto: This revision was merged to the branch mainline in revision 530.
  • Revision ID: brian@tangent.org-20081020042821-rqqdrccuu8195k3y
Second pass of thd cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
71
71
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
72
72
};
73
73
 
74
 
static int read_fixed_length(Session *thd, COPY_INFO &info, TableList *table_list,
 
74
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
75
75
                             List<Item> &fields_vars, List<Item> &set_fields,
76
76
                             List<Item> &set_values, READ_INFO &read_info,
77
77
                             uint32_t skip_lines,
78
78
                             bool ignore_check_option_errors);
79
 
static int read_sep_field(Session *thd, COPY_INFO &info, TableList *table_list,
 
79
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
80
80
                          List<Item> &fields_vars, List<Item> &set_fields,
81
81
                          List<Item> &set_values, READ_INFO &read_info,
82
82
                          String &enclosed, uint32_t skip_lines,
83
83
                          bool ignore_check_option_errors);
84
84
 
85
 
static bool write_execute_load_query_log_event(Session *thd,
 
85
static bool write_execute_load_query_log_event(Session *session,
86
86
                                               bool duplicates, bool ignore,
87
87
                                               bool transactional_table,
88
88
                                               Session::killed_state killed_status);
92
92
 
93
93
  SYNOPSIS
94
94
    mysql_load()
95
 
      thd - current thread
 
95
      session - current thread
96
96
      ex  - sql_exchange object representing source file and its parsing rules
97
97
      table_list  - list of tables to which we are loading data
98
98
      fields_vars - list of fields and variables to which we read
108
108
    true - error / false - success
109
109
*/
110
110
 
111
 
int mysql_load(Session *thd,sql_exchange *ex,TableList *table_list,
 
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
112
112
                List<Item> &fields_vars, List<Item> &set_fields,
113
113
                List<Item> &set_values,
114
114
                enum enum_duplicates handle_duplicates, bool ignore,
128
128
    If this is not set, we will use the directory where the table to be
129
129
    loaded is located
130
130
  */
131
 
  char *tdb= thd->db ? thd->db : db;            // Result is never null
 
131
  char *tdb= session->db ? session->db : db;            // Result is never null
132
132
  uint32_t skip_lines= ex->skip_lines;
133
133
  bool transactional_table;
134
134
  Session::killed_state killed_status= Session::NOT_KILLED;
139
139
               MYF(0));
140
140
    return(true);
141
141
  }
142
 
  if (open_and_lock_tables(thd, table_list))
 
142
  if (open_and_lock_tables(session, table_list))
143
143
    return(true);
144
 
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
 
                                    &thd->lex->select_lex.top_join_list,
 
144
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
 
145
                                    &session->lex->select_lex.top_join_list,
146
146
                                    table_list,
147
 
                                    &thd->lex->select_lex.leaf_tables, true))
 
147
                                    &session->lex->select_lex.leaf_tables, true))
148
148
     return(-1);
149
149
 
150
150
  /*
155
155
    table is marked to be 'used for insert' in which case we should never
156
156
    mark this table as 'const table' (ie, one that has only one row).
157
157
  */
158
 
  if (unique_table(thd, table_list, table_list->next_global, 0))
 
158
  if (unique_table(session, table_list, table_list->next_global, 0))
159
159
  {
160
160
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
161
161
    return(true);
175
175
      Let us also prepare SET clause, although it is probably empty
176
176
      in this case.
177
177
    */
178
 
    if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
 
        setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
178
    if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
179
        setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
180
180
      return(true);
181
181
  }
182
182
  else
183
183
  {                                             // Part field list
184
184
    /* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
 
    if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
 
        setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
 
        check_that_all_fields_are_given_values(thd, table, table_list))
 
185
    if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
 
186
        setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
187
        check_that_all_fields_are_given_values(session, table, table_list))
188
188
      return(true);
189
189
    /*
190
190
      Check whether TIMESTAMP field with auto-set feature specified
202
202
      }
203
203
    }
204
204
    /* Fix the expressions in SET clause */
205
 
    if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
205
    if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
206
206
      return(true);
207
207
  }
208
208
 
249
249
 
250
250
  if (read_file_from_client)
251
251
  {
252
 
    (void)net_request_file(&thd->net,ex->file_name);
 
252
    (void)net_request_file(&session->net,ex->file_name);
253
253
    file = -1;
254
254
  }
255
255
  else
281
281
        return(true);
282
282
 
283
283
      // if we are not in slave thread, the file must be:
284
 
      if (!thd->slave_thread &&
 
284
      if (!session->slave_thread &&
285
285
          !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
286
286
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
287
287
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
304
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
305
305
 
306
306
  READ_INFO read_info(file,tot_length,
307
 
                      ex->cs ? ex->cs : thd->variables.collation_database,
 
307
                      ex->cs ? ex->cs : session->variables.collation_database,
308
308
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
309
309
                      info.escape_char, read_file_from_client, is_fifo);
310
310
  if (read_info.error)
316
316
 
317
317
  if (mysql_bin_log.is_open())
318
318
  {
319
 
    lf_info.thd = thd;
 
319
    lf_info.session = session;
320
320
    lf_info.wrote_create_file = 0;
321
321
    lf_info.last_pos_in_file = HA_POS_ERROR;
322
322
    lf_info.log_delayed= transactional_table;
323
323
    read_info.set_io_cache_arg((void*) &lf_info);
324
324
  }
325
325
 
326
 
  thd->count_cuted_fields= CHECK_FIELD_WARN;            /* calc cuted fields */
327
 
  thd->cuted_fields=0L;
 
326
  session->count_cuted_fields= CHECK_FIELD_WARN;                /* calc cuted fields */
 
327
  session->cuted_fields=0L;
328
328
  /* Skip lines if there is a line terminator */
329
329
  if (ex->line_term->length())
330
330
  {
349
349
    table->file->ha_start_bulk_insert((ha_rows) 0);
350
350
    table->copy_blobs=1;
351
351
 
352
 
    thd->abort_on_warning= true;
 
352
    session->abort_on_warning= true;
353
353
 
354
354
    if (!field_term->length() && !enclosed->length())
355
 
      error= read_fixed_length(thd, info, table_list, fields_vars,
 
355
      error= read_fixed_length(session, info, table_list, fields_vars,
356
356
                               set_fields, set_values, read_info,
357
357
                               skip_lines, ignore);
358
358
    else
359
 
      error= read_sep_field(thd, info, table_list, fields_vars,
 
359
      error= read_sep_field(session, info, table_list, fields_vars,
360
360
                            set_fields, set_values, read_info,
361
361
                            *enclosed, skip_lines, ignore);
362
362
    if (table->file->ha_end_bulk_insert() && !error)
372
372
    my_close(file,MYF(0));
373
373
  free_blobs(table);                            /* if pack_blob was used */
374
374
  table->copy_blobs=0;
375
 
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
 
375
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
376
376
  /* 
377
377
     simulated killing in the middle of per-row loop
378
378
     must be effective for binlogging
379
379
  */
380
 
  killed_status= (error == 0)? Session::NOT_KILLED : thd->killed;
 
380
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
381
381
  if (error)
382
382
  {
383
383
    if (read_file_from_client)
413
413
        /* If the file was not empty, wrote_create_file is true */
414
414
        if (lf_info.wrote_create_file)
415
415
        {
416
 
          if (thd->transaction.stmt.modified_non_trans_table)
417
 
            write_execute_load_query_log_event(thd, handle_duplicates,
 
416
          if (session->transaction.stmt.modified_non_trans_table)
 
417
            write_execute_load_query_log_event(session, handle_duplicates,
418
418
                                               ignore, transactional_table,
419
419
                                               killed_status);
420
420
          else
421
421
          {
422
 
            Delete_file_log_event d(thd, db, transactional_table);
 
422
            Delete_file_log_event d(session, db, transactional_table);
423
423
            d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
424
424
            mysql_bin_log.write(&d);
425
425
          }
430
430
    goto err;
431
431
  }
432
432
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
433
 
          (uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
 
433
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
434
434
 
435
 
  if (thd->transaction.stmt.modified_non_trans_table)
436
 
    thd->transaction.all.modified_non_trans_table= true;
 
435
  if (session->transaction.stmt.modified_non_trans_table)
 
436
    session->transaction.all.modified_non_trans_table= true;
437
437
 
438
438
  if (mysql_bin_log.is_open())
439
439
  {
445
445
      version for the binary log to mark that table maps are invalid
446
446
      after this point.
447
447
     */
448
 
    if (thd->current_stmt_binlog_row_based)
449
 
      thd->binlog_flush_pending_rows_event(true);
 
448
    if (session->current_stmt_binlog_row_based)
 
449
      session->binlog_flush_pending_rows_event(true);
450
450
    else
451
451
    {
452
452
      /*
457
457
      read_info.end_io_cache();
458
458
      if (lf_info.wrote_create_file)
459
459
      {
460
 
        write_execute_load_query_log_event(thd, handle_duplicates, ignore,
 
460
        write_execute_load_query_log_event(session, handle_duplicates, ignore,
461
461
                                           transactional_table,killed_status);
462
462
      }
463
463
    }
464
464
  }
465
465
 
466
466
  /* ok to client sent only after binlog write and engine commit */
467
 
  my_ok(thd, info.copied + info.deleted, 0L, name);
 
467
  my_ok(session, info.copied + info.deleted, 0L, name);
468
468
err:
469
469
  assert(transactional_table || !(info.copied || info.deleted) ||
470
 
              thd->transaction.stmt.modified_non_trans_table);
 
470
              session->transaction.stmt.modified_non_trans_table);
471
471
  table->file->ha_release_auto_increment();
472
472
  table->auto_increment_field_not_null= false;
473
 
  thd->abort_on_warning= 0;
 
473
  session->abort_on_warning= 0;
474
474
  return(error);
475
475
}
476
476
 
477
477
 
478
478
/* Not a very useful function; just to avoid duplication of code */
479
 
static bool write_execute_load_query_log_event(Session *thd,
 
479
static bool write_execute_load_query_log_event(Session *session,
480
480
                                               bool duplicates, bool ignore,
481
481
                                               bool transactional_table,
482
482
                                               Session::killed_state killed_err_arg)
483
483
{
484
484
  Execute_load_query_log_event
485
 
    e(thd, thd->query, thd->query_length,
486
 
      (char*)thd->lex->fname_start - (char*)thd->query,
487
 
      (char*)thd->lex->fname_end - (char*)thd->query,
 
485
    e(session, session->query, session->query_length,
 
486
      (char*)session->lex->fname_start - (char*)session->query,
 
487
      (char*)session->lex->fname_end - (char*)session->query,
488
488
      (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
489
489
      (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
490
490
      transactional_table, false, killed_err_arg);
498
498
****************************************************************************/
499
499
 
500
500
static int
501
 
read_fixed_length(Session *thd, COPY_INFO &info, TableList *table_list,
 
501
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
502
502
                  List<Item> &fields_vars, List<Item> &set_fields,
503
503
                  List<Item> &set_values, READ_INFO &read_info,
504
504
                  uint32_t skip_lines, bool ignore_check_option_errors)
513
513
 
514
514
  while (!read_info.read_fixed_length())
515
515
  {
516
 
    if (thd->killed)
 
516
    if (session->killed)
517
517
    {
518
 
      thd->send_kill_message();
 
518
      session->send_kill_message();
519
519
      return(1);
520
520
    }
521
521
    if (skip_lines)
554
554
 
555
555
      if (pos == read_info.row_end)
556
556
      {
557
 
        thd->cuted_fields++;                    /* Not enough fields */
558
 
        push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
557
        session->cuted_fields++;                        /* Not enough fields */
 
558
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
559
559
                            ER_WARN_TOO_FEW_RECORDS, 
560
 
                            ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
 
560
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
561
561
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
562
562
            ((Field_timestamp*) field)->set_time();
563
563
      }
577
577
    }
578
578
    if (pos != read_info.row_end)
579
579
    {
580
 
      thd->cuted_fields++;                      /* To long row */
581
 
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
580
      session->cuted_fields++;                  /* To long row */
 
581
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
582
582
                          ER_WARN_TOO_MANY_RECORDS, 
583
 
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
 
583
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
584
584
    }
585
585
 
586
 
    if (thd->killed ||
587
 
        fill_record(thd, set_fields, set_values,
 
586
    if (session->killed ||
 
587
        fill_record(session, set_fields, set_values,
588
588
                    ignore_check_option_errors))
589
589
      return(1);
590
590
 
591
 
    err= write_record(thd, table, &info);
 
591
    err= write_record(session, table, &info);
592
592
    table->auto_increment_field_not_null= false;
593
593
    if (err)
594
594
      return(1);
601
601
      break;
602
602
    if (read_info.line_cuted)
603
603
    {
604
 
      thd->cuted_fields++;                      /* To long row */
605
 
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
604
      session->cuted_fields++;                  /* To long row */
 
605
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
606
606
                          ER_WARN_TOO_MANY_RECORDS, 
607
 
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
 
607
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
608
608
    }
609
 
    thd->row_count++;
 
609
    session->row_count++;
610
610
  }
611
611
  return(test(read_info.error));
612
612
}
614
614
 
615
615
 
616
616
static int
617
 
read_sep_field(Session *thd, COPY_INFO &info, TableList *table_list,
 
617
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
618
618
               List<Item> &fields_vars, List<Item> &set_fields,
619
619
               List<Item> &set_values, READ_INFO &read_info,
620
620
               String &enclosed, uint32_t skip_lines,
632
632
 
633
633
  for (;;it.rewind())
634
634
  {
635
 
    if (thd->killed)
 
635
    if (session->killed)
636
636
    {
637
 
      thd->send_kill_message();
 
637
      session->send_kill_message();
638
638
      return(1);
639
639
    }
640
640
 
668
668
          if (field->reset())
669
669
          {
670
670
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
671
 
                     thd->row_count);
 
671
                     session->row_count);
672
672
            return(1);
673
673
          }
674
674
          field->set_null();
736
736
          if (field->reset())
737
737
          {
738
738
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
739
 
                     thd->row_count);
 
739
                     session->row_count);
740
740
            return(1);
741
741
          }
742
742
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
747
747
            of warnings in Session::cuted_fields (and get rid of cuted_fields
748
748
            in the end ?)
749
749
          */
750
 
          thd->cuted_fields++;
751
 
          push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
750
          session->cuted_fields++;
 
751
          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
752
752
                              ER_WARN_TOO_FEW_RECORDS,
753
 
                              ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
 
753
                              ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
754
754
        }
755
755
        else if (item->type() == Item::STRING_ITEM)
756
756
        {
765
765
      }
766
766
    }
767
767
 
768
 
    if (thd->killed ||
769
 
        fill_record(thd, set_fields, set_values,
 
768
    if (session->killed ||
 
769
        fill_record(session, set_fields, set_values,
770
770
                    ignore_check_option_errors))
771
771
      return(1);
772
772
 
773
 
    err= write_record(thd, table, &info);
 
773
    err= write_record(session, table, &info);
774
774
    table->auto_increment_field_not_null= false;
775
775
    if (err)
776
776
      return(1);
782
782
      break;
783
783
    if (read_info.line_cuted)
784
784
    {
785
 
      thd->cuted_fields++;                      /* To long row */
786
 
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
785
      session->cuted_fields++;                  /* To long row */
 
786
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
787
787
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), 
788
 
                          thd->row_count);   
789
 
      if (thd->killed)
 
788
                          session->row_count);   
 
789
      if (session->killed)
790
790
        return(1);
791
791
    }
792
 
    thd->row_count++;
 
792
    session->row_count++;
793
793
  }
794
794
  return(test(read_info.error));
795
795
}