67
67
Either this method, or we need to make cache public
68
68
Arg must be set from mysql_load() since constructor does not see
69
either the table or Session value
69
either the table or THD value
71
71
void set_io_cache_arg(void* arg) { cache.arg = arg; }
74
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
75
75
List<Item> &fields_vars, List<Item> &set_fields,
76
76
List<Item> &set_values, READ_INFO &read_info,
77
77
uint32_t skip_lines,
78
78
bool ignore_check_option_errors);
79
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
79
static int read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
80
80
List<Item> &fields_vars, List<Item> &set_fields,
81
81
List<Item> &set_values, READ_INFO &read_info,
82
82
String &enclosed, uint32_t skip_lines,
83
83
bool ignore_check_option_errors);
85
static bool write_execute_load_query_log_event(Session *session,
85
static bool write_execute_load_query_log_event(THD *thd,
86
86
bool duplicates, bool ignore,
87
87
bool transactional_table,
88
Session::killed_state killed_status);
88
THD::killed_state killed_status);
91
91
Execute LOAD DATA query
95
session - current thread
96
96
ex - sql_exchange object representing source file and its parsing rules
97
97
table_list - list of tables to which we are loading data
98
98
fields_vars - list of fields and variables to which we read
108
108
true - error / false - success
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
111
int mysql_load(THD *thd,sql_exchange *ex,TableList *table_list,
112
112
List<Item> &fields_vars, List<Item> &set_fields,
113
113
List<Item> &set_values,
114
114
enum enum_duplicates handle_duplicates, bool ignore,
128
128
If this is not set, we will use the directory where the table to be
129
129
loaded is located
131
char *tdb= session->db ? session->db : db; // Result is never null
131
char *tdb= thd->db ? thd->db : db; // Result is never null
132
132
uint32_t skip_lines= ex->skip_lines;
133
133
bool transactional_table;
134
Session::killed_state killed_status= Session::NOT_KILLED;
134
THD::killed_state killed_status= THD::NOT_KILLED;
136
136
if (escaped->length() > 1 || enclosed->length() > 1)
142
if (open_and_lock_tables(session, table_list))
142
if (open_and_lock_tables(thd, table_list))
144
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
145
&session->lex->select_lex.top_join_list,
144
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
&thd->lex->select_lex.top_join_list,
147
&session->lex->select_lex.leaf_tables, true))
147
&thd->lex->select_lex.leaf_tables, true))
155
155
table is marked to be 'used for insert' in which case we should never
156
156
mark this table as 'const table' (ie, one that has only one row).
158
if (unique_table(session, table_list, table_list->next_global, 0))
158
if (unique_table(thd, table_list, table_list->next_global, 0))
160
160
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
175
Let us also prepare SET clause, altough it is probably empty
178
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
178
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
183
183
{ // Part field list
184
184
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(session, table, table_list))
185
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(thd, table, table_list))
190
190
Check whenever TIMESTAMP field with auto-set feature specified
204
204
/* Fix the expressions in SET clause */
205
if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
205
if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
260
260
if (!dirname_length(ex->file_name))
262
strcpy(name, mysql_real_data_home);
263
strncat(name, tdb, FN_REFLEN-strlen(mysql_real_data_home)-1);
262
strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NULL);
264
263
(void) fn_format(name, ex->file_name, name, "",
265
264
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
284
283
// if we are not in slave thread, the file must be:
285
if (!session->slave_thread &&
284
if (!thd->slave_thread &&
286
285
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
287
286
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
288
287
((stat_info.st_mode & S_IFREG) == S_IFREG ||
305
304
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
307
306
READ_INFO read_info(file,tot_length,
308
ex->cs ? ex->cs : session->variables.collation_database,
307
ex->cs ? ex->cs : thd->variables.collation_database,
309
308
*field_term,*ex->line_start, *ex->line_term, *enclosed,
310
309
info.escape_char, read_file_from_client, is_fifo);
311
310
if (read_info.error)
318
317
if (mysql_bin_log.is_open())
320
lf_info.session = session;
321
320
lf_info.wrote_create_file = 0;
322
321
lf_info.last_pos_in_file = HA_POS_ERROR;
323
322
lf_info.log_delayed= transactional_table;
324
323
read_info.set_io_cache_arg((void*) &lf_info);
327
session->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
328
session->cuted_fields=0L;
326
thd->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
327
thd->cuted_fields=0L;
329
328
/* Skip lines if there is a line terminator */
330
329
if (ex->line_term->length())
350
349
table->file->ha_start_bulk_insert((ha_rows) 0);
351
350
table->copy_blobs=1;
353
session->abort_on_warning= true;
352
thd->abort_on_warning= true;
355
354
if (!field_term->length() && !enclosed->length())
356
error= read_fixed_length(session, info, table_list, fields_vars,
355
error= read_fixed_length(thd, info, table_list, fields_vars,
357
356
set_fields, set_values, read_info,
358
357
skip_lines, ignore);
360
error= read_sep_field(session, info, table_list, fields_vars,
359
error= read_sep_field(thd, info, table_list, fields_vars,
361
360
set_fields, set_values, read_info,
362
361
*enclosed, skip_lines, ignore);
363
362
if (table->file->ha_end_bulk_insert() && !error)
373
372
my_close(file,MYF(0));
374
373
free_blobs(table); /* if pack_blob was used */
375
374
table->copy_blobs=0;
376
session->count_cuted_fields= CHECK_FIELD_IGNORE;
375
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
378
377
simulated killing in the middle of per-row loop
379
378
must be effective for binlogging
381
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
380
killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
384
383
if (read_file_from_client)
414
413
/* If the file was not empty, wrote_create_file is true */
415
414
if (lf_info.wrote_create_file)
417
if (session->transaction.stmt.modified_non_trans_table)
418
write_execute_load_query_log_event(session, handle_duplicates,
416
if (thd->transaction.stmt.modified_non_trans_table)
417
write_execute_load_query_log_event(thd, handle_duplicates,
419
418
ignore, transactional_table,
423
Delete_file_log_event d(session, db, transactional_table);
422
Delete_file_log_event d(thd, db, transactional_table);
424
423
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
425
424
mysql_bin_log.write(&d);
433
432
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
434
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
433
(uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
436
if (session->transaction.stmt.modified_non_trans_table)
437
session->transaction.all.modified_non_trans_table= true;
435
if (thd->transaction.stmt.modified_non_trans_table)
436
thd->transaction.all.modified_non_trans_table= true;
439
438
if (mysql_bin_log.is_open())
458
457
read_info.end_io_cache();
459
458
if (lf_info.wrote_create_file)
461
write_execute_load_query_log_event(session, handle_duplicates, ignore,
460
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
462
461
transactional_table,killed_status);
467
466
/* ok to client sent only after binlog write and engine commit */
468
my_ok(session, info.copied + info.deleted, 0L, name);
467
my_ok(thd, info.copied + info.deleted, 0L, name);
470
469
assert(transactional_table || !(info.copied || info.deleted) ||
471
session->transaction.stmt.modified_non_trans_table);
470
thd->transaction.stmt.modified_non_trans_table);
472
471
table->file->ha_release_auto_increment();
473
472
table->auto_increment_field_not_null= false;
474
session->abort_on_warning= 0;
473
thd->abort_on_warning= 0;
479
478
/* Not a very useful function; just to avoid duplication of code */
480
static bool write_execute_load_query_log_event(Session *session,
479
static bool write_execute_load_query_log_event(THD *thd,
481
480
bool duplicates, bool ignore,
482
481
bool transactional_table,
483
Session::killed_state killed_err_arg)
482
THD::killed_state killed_err_arg)
485
484
Execute_load_query_log_event
486
e(session, session->query, session->query_length,
487
(char*)session->lex->fname_start - (char*)session->query,
488
(char*)session->lex->fname_end - (char*)session->query,
485
e(thd, thd->query, thd->query_length,
486
(char*)thd->lex->fname_start - (char*)thd->query,
487
(char*)thd->lex->fname_end - (char*)thd->query,
489
488
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
490
489
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
491
490
transactional_table, false, killed_err_arg);
499
498
****************************************************************************/
502
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
501
read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
503
502
List<Item> &fields_vars, List<Item> &set_fields,
504
503
List<Item> &set_values, READ_INFO &read_info,
505
504
uint32_t skip_lines, bool ignore_check_option_errors)
556
555
if (pos == read_info.row_end)
558
session->cuted_fields++; /* Not enough fields */
559
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
557
thd->cuted_fields++; /* Not enough fields */
558
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
560
559
ER_WARN_TOO_FEW_RECORDS,
561
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
560
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
562
561
if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
563
562
((Field_timestamp*) field)->set_time();
579
578
if (pos != read_info.row_end)
581
session->cuted_fields++; /* To long row */
582
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
580
thd->cuted_fields++; /* To long row */
581
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
583
582
ER_WARN_TOO_MANY_RECORDS,
584
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
583
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
587
if (session->killed ||
588
fill_record(session, set_fields, set_values,
587
fill_record(thd, set_fields, set_values,
589
588
ignore_check_option_errors))
592
err= write_record(session, table, &info);
591
err= write_record(thd, table, &info);
593
592
table->auto_increment_field_not_null= false;
603
602
if (read_info.line_cuted)
605
session->cuted_fields++; /* To long row */
606
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
604
thd->cuted_fields++; /* To long row */
605
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
607
606
ER_WARN_TOO_MANY_RECORDS,
608
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
607
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
610
session->row_count++;
612
611
return(test(read_info.error));
618
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
617
read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
619
618
List<Item> &fields_vars, List<Item> &set_fields,
620
619
List<Item> &set_values, READ_INFO &read_info,
621
620
String &enclosed, uint32_t skip_lines,
746
745
QQ: We probably should not throw warning for each field.
747
746
But how about intention to always have the same number
748
of warnings in Session::cuted_fields (and get rid of cuted_fields
747
of warnings in THD::cuted_fields (and get rid of cuted_fields
751
session->cuted_fields++;
752
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
751
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
753
752
ER_WARN_TOO_FEW_RECORDS,
754
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
753
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
756
755
else if (item->type() == Item::STRING_ITEM)
769
if (session->killed ||
770
fill_record(session, set_fields, set_values,
769
fill_record(thd, set_fields, set_values,
771
770
ignore_check_option_errors))
774
err= write_record(session, table, &info);
773
err= write_record(thd, table, &info);
775
774
table->auto_increment_field_not_null= false;
784
783
if (read_info.line_cuted)
786
session->cuted_fields++; /* To long row */
787
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
785
thd->cuted_fields++; /* To long row */
786
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
788
787
ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
793
session->row_count++;
795
794
return(test(read_info.error));