67
70
Either this method, or we need to make cache public
68
71
Arg must be set from mysql_load() since constructor does not see
69
either the table or THD value
72
either the table or Session value
71
74
void set_io_cache_arg(void* arg) { cache.arg = arg; }
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
77
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
75
78
List<Item> &fields_vars, List<Item> &set_fields,
76
79
List<Item> &set_values, READ_INFO &read_info,
77
80
uint32_t skip_lines,
78
81
bool ignore_check_option_errors);
79
static int read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
82
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
80
83
List<Item> &fields_vars, List<Item> &set_fields,
81
84
List<Item> &set_values, READ_INFO &read_info,
82
85
String &enclosed, uint32_t skip_lines,
83
86
bool ignore_check_option_errors);
85
static bool write_execute_load_query_log_event(THD *thd,
86
bool duplicates, bool ignore,
87
bool transactional_table,
88
THD::killed_state killed_status);
91
90
Execute LOAD DATA query
96
ex - sql_exchange object representing source file and its parsing rules
94
session - current thread
95
ex - file_exchange object representing source file and its parsing rules
97
96
table_list - list of tables to which we are loading data
98
97
fields_vars - list of fields and variables to which we read
117
116
char name[FN_REFLEN];
121
120
String *field_term=ex->field_term,*escaped=ex->escaped;
122
121
String *enclosed=ex->enclosed;
124
LOAD_FILE_INFO lf_info;
125
char *db = table_list->db; // This is never null
123
char *db= table_list->db; // This is never null
127
126
If path for file is not defined, we will use the current database.
128
127
If this is not set, we will use the directory where the table to be
129
128
loaded is located
131
char *tdb= thd->db ? thd->db : db; // Result is never null
130
char *tdb= session->db ? session->db : db; // Result is never null
132
132
uint32_t skip_lines= ex->skip_lines;
133
133
bool transactional_table;
134
THD::killed_state killed_status= THD::NOT_KILLED;
134
Session::killed_state killed_status= Session::NOT_KILLED;
136
if (escaped->length() > 1 || enclosed->length() > 1)
136
/* Escape and enclosed character may be a utf8 4-byte character */
137
if (escaped->length() > 4 || enclosed->length() > 4)
138
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
139
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
142
if (open_and_lock_tables(thd, table_list))
142
if (open_and_lock_tables(session, table_list))
144
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
&thd->lex->select_lex.top_join_list,
144
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
145
&session->lex->select_lex.top_join_list,
147
&thd->lex->select_lex.leaf_tables, true))
147
&session->lex->select_lex.leaf_tables, true))
175
175
Let us also prepare the SET clause, although it is probably empty
178
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
178
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
183
183
{ // Part field list
184
184
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(thd, table, table_list))
185
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(session, table, table_list))
190
190
Check whether a TIMESTAMP field with the auto-set feature is specified
349
354
table->file->ha_start_bulk_insert((ha_rows) 0);
350
355
table->copy_blobs=1;
352
thd->abort_on_warning= (!ignore &&
353
(thd->variables.sql_mode &
354
(MODE_STRICT_TRANS_TABLES |
355
MODE_STRICT_ALL_TABLES)));
357
session->abort_on_warning= true;
357
359
if (!field_term->length() && !enclosed->length())
358
error= read_fixed_length(thd, info, table_list, fields_vars,
360
error= read_fixed_length(session, info, table_list, fields_vars,
359
361
set_fields, set_values, read_info,
360
362
skip_lines, ignore);
362
error= read_sep_field(thd, info, table_list, fields_vars,
364
error= read_sep_field(session, info, table_list, fields_vars,
363
365
set_fields, set_values, read_info,
364
366
*enclosed, skip_lines, ignore);
365
367
if (table->file->ha_end_bulk_insert() && !error)
375
377
my_close(file,MYF(0));
376
378
free_blobs(table); /* if pack_blob was used */
377
379
table->copy_blobs=0;
378
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
380
session->count_cuted_fields= CHECK_FIELD_IGNORE;
380
382
simulated killing in the middle of per-row loop
381
383
must be effective for binlogging
383
killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
385
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
386
388
if (read_file_from_client)
387
389
while (!read_info.next_line())
390
if (mysql_bin_log.is_open())
394
Make sure last block (the one which caused the error) gets
395
logged. This is needed because otherwise after write of (to
396
the binlog, not to read_info (which is a cache))
397
Delete_file_log_event the bad block will remain in read_info
398
(because pre_read is not called at the end of the last
399
block; remember pre_read is called whenever a new block is
400
read from disk). At the end of mysql_load(), the destructor
401
of read_info will call end_io_cache() which will flush
402
read_info, so we will finally have this in the binlog:
404
Append_block # The last successful block
406
Append_block # The failing block
408
Or could also be (for a small file)
409
Create_file # The failing block
410
which is nonsense (Delete_file is not written in this case, because:
411
Create_file has not been written, so Delete_file is not written, then
412
when read_info is destroyed end_io_cache() is called which writes
415
read_info.end_io_cache();
416
/* If the file was not empty, wrote_create_file is true */
417
if (lf_info.wrote_create_file)
419
if (thd->transaction.stmt.modified_non_trans_table)
420
write_execute_load_query_log_event(thd, handle_duplicates,
421
ignore, transactional_table,
425
Delete_file_log_event d(thd, db, transactional_table);
426
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
427
mysql_bin_log.write(&d);
432
392
error= -1; // Error on read
435
395
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
436
(uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
438
if (thd->transaction.stmt.modified_non_trans_table)
439
thd->transaction.all.modified_non_trans_table= true;
441
if (mysql_bin_log.is_open())
444
We need to do the job that is normally done inside
445
binlog_query() here, which is to ensure that the pending event
446
is written before tables are unlocked and before any other
447
events are written. We also need to update the table map
448
version for the binary log to mark that table maps are invalid
451
if (thd->current_stmt_binlog_row_based)
452
thd->binlog_flush_pending_rows_event(true);
456
As already explained above, we need to call end_io_cache() or the last
457
block will be logged only after Execute_load_query_log_event (which is
458
wrong), when read_info is destroyed.
460
read_info.end_io_cache();
461
if (lf_info.wrote_create_file)
463
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
464
transactional_table,killed_status);
396
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
398
if (session->transaction.stmt.modified_non_trans_table)
399
session->transaction.all.modified_non_trans_table= true;
469
401
/* ok to client sent only after binlog write and engine commit */
470
my_ok(thd, info.copied + info.deleted, 0L, name);
402
session->my_ok(info.copied + info.deleted, 0L, name);
472
404
assert(transactional_table || !(info.copied || info.deleted) ||
473
thd->transaction.stmt.modified_non_trans_table);
405
session->transaction.stmt.modified_non_trans_table);
474
406
table->file->ha_release_auto_increment();
475
407
table->auto_increment_field_not_null= false;
476
thd->abort_on_warning= 0;
408
session->abort_on_warning= 0;
481
/* Not a very useful function; just to avoid duplication of code */
482
static bool write_execute_load_query_log_event(THD *thd,
483
bool duplicates, bool ignore,
484
bool transactional_table,
485
THD::killed_state killed_err_arg)
487
Execute_load_query_log_event
488
e(thd, thd->query, thd->query_length,
489
(char*)thd->lex->fname_start - (char*)thd->query,
490
(char*)thd->lex->fname_end - (char*)thd->query,
491
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
492
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
493
transactional_table, false, killed_err_arg);
494
e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
495
return mysql_bin_log.write(&e);
499
413
/****************************************************************************
500
414
** Read of rows of fixed size + optional garbage + optional newline
501
415
****************************************************************************/
504
read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
418
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
505
419
List<Item> &fields_vars, List<Item> &set_fields,
506
420
List<Item> &set_values, READ_INFO &read_info,
507
421
uint32_t skip_lines, bool ignore_check_option_errors)
509
423
List_iterator_fast<Item> it(fields_vars);
510
424
Item_field *sql_field;
511
TABLE *table= table_list->table;
425
Table *table= table_list->table;
517
431
while (!read_info.read_fixed_length())
521
thd->send_kill_message();
435
session->send_kill_message();
581
495
if (pos != read_info.row_end)
583
thd->cuted_fields++; /* To long row */
584
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
585
ER_WARN_TOO_MANY_RECORDS,
586
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
497
session->cuted_fields++; /* To long row */
498
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
499
ER_WARN_TOO_MANY_RECORDS,
500
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
590
fill_record(thd, set_fields, set_values,
503
if (session->killed ||
504
fill_record(session, set_fields, set_values,
591
505
ignore_check_option_errors))
594
err= write_record(thd, table, &info);
508
err= write_record(session, table, &info);
595
509
table->auto_increment_field_not_null= false;
600
514
We don't need to reset auto-increment field since we are restoring
601
515
its default value at the beginning of each loop iteration.