67
67
Either this method, or we need to make cache public
68
68
Arg must be set from mysql_load() since constructor does not see
69
either the table or THD value
69
either the table or Session value
71
71
/* Store the caller-supplied opaque pointer in the arg slot of the IO cache. */
void set_io_cache_arg(void* arg)
{
  cache.arg = arg;
}
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
74
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
75
75
List<Item> &fields_vars, List<Item> &set_fields,
76
76
List<Item> &set_values, READ_INFO &read_info,
77
77
uint32_t skip_lines,
78
78
bool ignore_check_option_errors);
79
static int read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
79
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
80
80
List<Item> &fields_vars, List<Item> &set_fields,
81
81
List<Item> &set_values, READ_INFO &read_info,
82
82
String &enclosed, uint32_t skip_lines,
83
83
bool ignore_check_option_errors);
85
static bool write_execute_load_query_log_event(THD *thd,
85
static bool write_execute_load_query_log_event(Session *session,
86
86
bool duplicates, bool ignore,
87
87
bool transactional_table,
88
THD::killed_state killed_status);
88
Session::killed_state killed_status);
91
91
Execute LOAD DATA query
95
session - current thread
96
96
ex - sql_exchange object representing source file and its parsing rules
97
97
table_list - list of tables to which we are loading data
98
98
fields_vars - list of fields and variables to which we read
108
108
true - error / false - success
111
int mysql_load(THD *thd,sql_exchange *ex,TableList *table_list,
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
112
112
List<Item> &fields_vars, List<Item> &set_fields,
113
113
List<Item> &set_values,
114
114
enum enum_duplicates handle_duplicates, bool ignore,
128
128
If this is not set, we will use the directory where the table to be
129
129
loaded is located
131
char *tdb= thd->db ? thd->db : db; // Result is never null
131
char *tdb= session->db ? session->db : db; // Result is never null
132
132
uint32_t skip_lines= ex->skip_lines;
133
133
bool transactional_table;
134
THD::killed_state killed_status= THD::NOT_KILLED;
134
Session::killed_state killed_status= Session::NOT_KILLED;
136
136
if (escaped->length() > 1 || enclosed->length() > 1)
142
if (open_and_lock_tables(thd, table_list))
142
if (open_and_lock_tables(session, table_list))
144
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
&thd->lex->select_lex.top_join_list,
144
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
145
&session->lex->select_lex.top_join_list,
147
&thd->lex->select_lex.leaf_tables, true))
147
&session->lex->select_lex.leaf_tables, true))
155
155
table is marked to be 'used for insert' in which case we should never
156
156
mark this table as 'const table' (ie, one that has only one row).
158
if (unique_table(thd, table_list, table_list->next_global, 0))
158
if (unique_table(session, table_list, table_list->next_global, 0))
160
160
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
175
Let us also prepare SET clause, although it is probably empty
178
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
178
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
183
183
{ // Part field list
184
184
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(thd, table, table_list))
185
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(session, table, table_list))
190
190
Check whether TIMESTAMP field with auto-set feature specified
204
204
/* Fix the expressions in SET clause */
205
if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
205
if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
260
260
if (!dirname_length(ex->file_name))
262
strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NULL);
262
strcpy(name, mysql_real_data_home);
263
strncat(name, tdb, FN_REFLEN-strlen(mysql_real_data_home)-1);
263
264
(void) fn_format(name, ex->file_name, name, "",
264
265
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
283
284
// if we are not in slave thread, the file must be:
284
if (!thd->slave_thread &&
285
if (!session->slave_thread &&
285
286
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
286
287
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
287
288
((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
305
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
306
307
READ_INFO read_info(file,tot_length,
307
ex->cs ? ex->cs : thd->variables.collation_database,
308
ex->cs ? ex->cs : session->variables.collation_database,
308
309
*field_term,*ex->line_start, *ex->line_term, *enclosed,
309
310
info.escape_char, read_file_from_client, is_fifo);
310
311
if (read_info.error)
317
318
if (mysql_bin_log.is_open())
320
lf_info.session = session;
320
321
lf_info.wrote_create_file = 0;
321
322
lf_info.last_pos_in_file = HA_POS_ERROR;
322
323
lf_info.log_delayed= transactional_table;
323
324
read_info.set_io_cache_arg((void*) &lf_info);
326
thd->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
327
thd->cuted_fields=0L;
327
session->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
328
session->cuted_fields=0L;
328
329
/* Skip lines if there is a line terminator */
329
330
if (ex->line_term->length())
349
350
table->file->ha_start_bulk_insert((ha_rows) 0);
350
351
table->copy_blobs=1;
352
thd->abort_on_warning= true;
353
session->abort_on_warning= true;
354
355
if (!field_term->length() && !enclosed->length())
355
error= read_fixed_length(thd, info, table_list, fields_vars,
356
error= read_fixed_length(session, info, table_list, fields_vars,
356
357
set_fields, set_values, read_info,
357
358
skip_lines, ignore);
359
error= read_sep_field(thd, info, table_list, fields_vars,
360
error= read_sep_field(session, info, table_list, fields_vars,
360
361
set_fields, set_values, read_info,
361
362
*enclosed, skip_lines, ignore);
362
363
if (table->file->ha_end_bulk_insert() && !error)
372
373
my_close(file,MYF(0));
373
374
free_blobs(table); /* if pack_blob was used */
374
375
table->copy_blobs=0;
375
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
376
session->count_cuted_fields= CHECK_FIELD_IGNORE;
377
378
simulated killing in the middle of per-row loop
378
379
must be effective for binlogging
380
killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
381
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
383
384
if (read_file_from_client)
413
414
/* If the file was not empty, wrote_create_file is true */
414
415
if (lf_info.wrote_create_file)
416
if (thd->transaction.stmt.modified_non_trans_table)
417
write_execute_load_query_log_event(thd, handle_duplicates,
417
if (session->transaction.stmt.modified_non_trans_table)
418
write_execute_load_query_log_event(session, handle_duplicates,
418
419
ignore, transactional_table,
422
Delete_file_log_event d(thd, db, transactional_table);
423
Delete_file_log_event d(session, db, transactional_table);
423
424
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
424
425
mysql_bin_log.write(&d);
432
433
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
433
(uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
434
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
435
if (thd->transaction.stmt.modified_non_trans_table)
436
thd->transaction.all.modified_non_trans_table= true;
436
if (session->transaction.stmt.modified_non_trans_table)
437
session->transaction.all.modified_non_trans_table= true;
438
439
if (mysql_bin_log.is_open())
457
458
read_info.end_io_cache();
458
459
if (lf_info.wrote_create_file)
460
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
461
write_execute_load_query_log_event(session, handle_duplicates, ignore,
461
462
transactional_table,killed_status);
466
467
/* ok to client sent only after binlog write and engine commit */
467
my_ok(thd, info.copied + info.deleted, 0L, name);
468
my_ok(session, info.copied + info.deleted, 0L, name);
469
470
assert(transactional_table || !(info.copied || info.deleted) ||
470
thd->transaction.stmt.modified_non_trans_table);
471
session->transaction.stmt.modified_non_trans_table);
471
472
table->file->ha_release_auto_increment();
472
473
table->auto_increment_field_not_null= false;
473
thd->abort_on_warning= 0;
474
session->abort_on_warning= 0;
478
479
/* Not a very useful function; just to avoid duplication of code */
479
static bool write_execute_load_query_log_event(THD *thd,
480
static bool write_execute_load_query_log_event(Session *session,
480
481
bool duplicates, bool ignore,
481
482
bool transactional_table,
482
THD::killed_state killed_err_arg)
483
Session::killed_state killed_err_arg)
484
485
Execute_load_query_log_event
485
e(thd, thd->query, thd->query_length,
486
(char*)thd->lex->fname_start - (char*)thd->query,
487
(char*)thd->lex->fname_end - (char*)thd->query,
486
e(session, session->query, session->query_length,
487
(char*)session->lex->fname_start - (char*)session->query,
488
(char*)session->lex->fname_end - (char*)session->query,
488
489
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
489
490
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
490
491
transactional_table, false, killed_err_arg);
498
499
****************************************************************************/
501
read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
502
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
502
503
List<Item> &fields_vars, List<Item> &set_fields,
503
504
List<Item> &set_values, READ_INFO &read_info,
504
505
uint32_t skip_lines, bool ignore_check_option_errors)
555
556
if (pos == read_info.row_end)
557
thd->cuted_fields++; /* Not enough fields */
558
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
558
session->cuted_fields++; /* Not enough fields */
559
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
559
560
ER_WARN_TOO_FEW_RECORDS,
560
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
561
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
561
562
if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
562
563
((Field_timestamp*) field)->set_time();
578
579
if (pos != read_info.row_end)
580
thd->cuted_fields++; /* To long row */
581
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
581
session->cuted_fields++; /* To long row */
582
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
582
583
ER_WARN_TOO_MANY_RECORDS,
583
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
584
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
587
fill_record(thd, set_fields, set_values,
587
if (session->killed ||
588
fill_record(session, set_fields, set_values,
588
589
ignore_check_option_errors))
591
err= write_record(thd, table, &info);
592
err= write_record(session, table, &info);
592
593
table->auto_increment_field_not_null= false;
602
603
if (read_info.line_cuted)
604
thd->cuted_fields++; /* To long row */
605
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
605
session->cuted_fields++; /* To long row */
606
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
606
607
ER_WARN_TOO_MANY_RECORDS,
607
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
608
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
610
session->row_count++;
611
612
return(test(read_info.error));
617
read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
618
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
618
619
List<Item> &fields_vars, List<Item> &set_fields,
619
620
List<Item> &set_values, READ_INFO &read_info,
620
621
String &enclosed, uint32_t skip_lines,
745
746
QQ: We probably should not throw warning for each field.
746
747
But how about intention to always have the same number
747
of warnings in THD::cuted_fields (and get rid of cuted_fields
748
of warnings in Session::cuted_fields (and get rid of cuted_fields
751
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
751
session->cuted_fields++;
752
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
752
753
ER_WARN_TOO_FEW_RECORDS,
753
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
754
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
755
756
else if (item->type() == Item::STRING_ITEM)
769
fill_record(thd, set_fields, set_values,
769
if (session->killed ||
770
fill_record(session, set_fields, set_values,
770
771
ignore_check_option_errors))
773
err= write_record(thd, table, &info);
774
err= write_record(session, table, &info);
774
775
table->auto_increment_field_not_null= false;
783
784
if (read_info.line_cuted)
785
thd->cuted_fields++; /* To long row */
786
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
786
session->cuted_fields++; /* To long row */
787
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
787
788
ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
793
session->row_count++;
794
795
return(test(read_info.error));