67
80
Either this method, or we need to make cache public
68
81
Arg must be set from mysql_load() since constructor does not see
69
either the table or THD value
82
either the table or Session value
71
84
void set_io_cache_arg(void* arg) { cache.arg = arg; }
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
87
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
75
88
List<Item> &fields_vars, List<Item> &set_fields,
76
89
List<Item> &set_values, READ_INFO &read_info,
77
90
uint32_t skip_lines,
78
91
bool ignore_check_option_errors);
79
static int read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
92
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
80
93
List<Item> &fields_vars, List<Item> &set_fields,
81
94
List<Item> &set_values, READ_INFO &read_info,
82
95
String &enclosed, uint32_t skip_lines,
83
96
bool ignore_check_option_errors);
85
static bool write_execute_load_query_log_event(THD *thd,
86
bool duplicates, bool ignore,
87
bool transactional_table,
88
THD::killed_state killed_status);
91
100
Execute LOAD DATA query
96
ex - sql_exchange object representing source file and its parsing rules
104
session - current thread
105
ex - file_exchange object representing source cursor and its parsing rules
97
106
table_list - list of tables to which we are loading data
98
107
fields_vars - list of fields and variables to which we read
100
109
set_fields - list of fields mentioned in set clause
101
110
set_values - expressions to assign to fields in previous list
102
111
handle_duplicates - indicates whenever we should emit error or
103
112
replace row if we will meet duplicates.
104
113
ignore - - indicates whenever we should ignore duplicates
105
read_file_from_client - is this LOAD DATA LOCAL ?
108
116
true - error / false - success
111
int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
119
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
112
120
List<Item> &fields_vars, List<Item> &set_fields,
113
121
List<Item> &set_values,
114
enum enum_duplicates handle_duplicates, bool ignore,
115
bool read_file_from_client)
122
enum enum_duplicates handle_duplicates, bool ignore)
117
124
char name[FN_REFLEN];
121
128
String *field_term=ex->field_term,*escaped=ex->escaped;
122
129
String *enclosed=ex->enclosed;
124
LOAD_FILE_INFO lf_info;
125
char *db = table_list->db; // This is never null
131
char *db= table_list->db; // This is never null
127
If path for file is not defined, we will use the current database.
134
If path for cursor is not defined, we will use the current database.
128
135
If this is not set, we will use the directory where the table to be
129
136
loaded is located
131
char *tdb= thd->db ? thd->db : db; // Result is never null
138
const char *tdb= session->db.empty() ? db : session->db.c_str(); // Result is never null
132
140
uint32_t skip_lines= ex->skip_lines;
133
141
bool transactional_table;
134
THD::killed_state killed_status= THD::NOT_KILLED;
142
Session::killed_state killed_status= Session::NOT_KILLED;
136
if (escaped->length() > 1 || enclosed->length() > 1)
144
/* Escape and enclosed character may be a utf8 4-byte character */
145
if (escaped->length() > 4 || enclosed->length() > 4)
138
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
147
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
142
if (open_and_lock_tables(thd, table_list))
151
if (session->openTablesLock(table_list))
144
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
&thd->lex->select_lex.top_join_list,
154
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
155
&session->lex->select_lex.top_join_list,
147
&thd->lex->select_lex.leaf_tables, true))
157
&session->lex->select_lex.leaf_tables, true))
155
165
table is marked to be 'used for insert' in which case we should never
156
166
mark this table as 'const table' (ie, one that has only one row).
158
if (unique_table(thd, table_list, table_list->next_global, 0))
168
if (unique_table(table_list, table_list->next_global))
160
170
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
164
174
table= table_list->table;
165
transactional_table= table->file->has_transactions();
175
transactional_table= table->cursor->has_transactions();
167
177
if (!fields_vars.elements)
170
180
for (field=table->field; *field ; field++)
171
181
fields_vars.push_back(new Item_field(*field));
172
bitmap_set_all(table->write_set);
182
table->setWriteSet();
173
183
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
175
185
Let us also prepare SET clause, altough it is probably empty
178
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
188
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
189
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
183
193
{ // Part field list
184
194
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(thd, table, table_list))
195
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
196
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
197
check_that_all_fields_are_given_values(session, table, table_list))
190
200
Check whenever TIMESTAMP field with auto-set feature specified
343
346
table->next_number_field=table->found_next_number_field;
345
348
handle_duplicates == DUP_REPLACE)
346
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
349
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
347
350
if (handle_duplicates == DUP_REPLACE)
348
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
349
table->file->ha_start_bulk_insert((ha_rows) 0);
351
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
352
table->cursor->ha_start_bulk_insert((ha_rows) 0);
350
353
table->copy_blobs=1;
352
thd->abort_on_warning= (!ignore &&
353
(thd->variables.sql_mode &
354
(MODE_STRICT_TRANS_TABLES |
355
MODE_STRICT_ALL_TABLES)));
355
session->abort_on_warning= true;
357
357
if (!field_term->length() && !enclosed->length())
358
error= read_fixed_length(thd, info, table_list, fields_vars,
358
error= read_fixed_length(session, info, table_list, fields_vars,
359
359
set_fields, set_values, read_info,
360
360
skip_lines, ignore);
362
error= read_sep_field(thd, info, table_list, fields_vars,
362
error= read_sep_field(session, info, table_list, fields_vars,
363
363
set_fields, set_values, read_info,
364
364
*enclosed, skip_lines, ignore);
365
if (table->file->ha_end_bulk_insert() && !error)
365
if (table->cursor->ha_end_bulk_insert() && !error)
367
table->file->print_error(my_errno, MYF(0));
367
table->print_error(errno, MYF(0));
370
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
371
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
370
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
371
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
372
372
table->next_number_field=0;
375
375
my_close(file,MYF(0));
376
376
free_blobs(table); /* if pack_blob was used */
377
377
table->copy_blobs=0;
378
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
378
session->count_cuted_fields= CHECK_FIELD_IGNORE;
380
380
simulated killing in the middle of per-row loop
381
381
must be effective for binlogging
383
killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
383
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
386
if (read_file_from_client)
387
while (!read_info.next_line())
390
if (mysql_bin_log.is_open())
394
Make sure last block (the one which caused the error) gets
395
logged. This is needed because otherwise after write of (to
396
the binlog, not to read_info (which is a cache))
397
Delete_file_log_event the bad block will remain in read_info
398
(because pre_read is not called at the end of the last
399
block; remember pre_read is called whenever a new block is
400
read from disk). At the end of mysql_load(), the destructor
401
of read_info will call end_io_cache() which will flush
402
read_info, so we will finally have this in the binlog:
404
Append_block # The last successfull block
406
Append_block # The failing block
408
Or could also be (for a small file)
409
Create_file # The failing block
410
which is nonsense (Delete_file is not written in this case, because:
411
Create_file has not been written, so Delete_file is not written, then
412
when read_info is destroyed end_io_cache() is called which writes
415
read_info.end_io_cache();
416
/* If the file was not empty, wrote_create_file is true */
417
if (lf_info.wrote_create_file)
419
if (thd->transaction.stmt.modified_non_trans_table)
420
write_execute_load_query_log_event(thd, handle_duplicates,
421
ignore, transactional_table,
425
Delete_file_log_event d(thd, db, transactional_table);
426
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
427
mysql_bin_log.write(&d);
432
386
error= -1; // Error on read
435
389
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
436
(uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
438
if (thd->transaction.stmt.modified_non_trans_table)
439
thd->transaction.all.modified_non_trans_table= true;
441
if (mysql_bin_log.is_open())
444
We need to do the job that is normally done inside
445
binlog_query() here, which is to ensure that the pending event
446
is written before tables are unlocked and before any other
447
events are written. We also need to update the table map
448
version for the binary log to mark that table maps are invalid
451
if (thd->current_stmt_binlog_row_based)
452
thd->binlog_flush_pending_rows_event(true);
456
As already explained above, we need to call end_io_cache() or the last
457
block will be logged only after Execute_load_query_log_event (which is
458
wrong), when read_info is destroyed.
460
read_info.end_io_cache();
461
if (lf_info.wrote_create_file)
463
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
464
transactional_table,killed_status);
390
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
392
if (session->transaction.stmt.modified_non_trans_table)
393
session->transaction.all.modified_non_trans_table= true;
469
395
/* ok to client sent only after binlog write and engine commit */
470
my_ok(thd, info.copied + info.deleted, 0L, name);
396
session->my_ok(info.copied + info.deleted, 0, 0L, name);
472
398
assert(transactional_table || !(info.copied || info.deleted) ||
473
thd->transaction.stmt.modified_non_trans_table);
474
table->file->ha_release_auto_increment();
399
session->transaction.stmt.modified_non_trans_table);
400
table->cursor->ha_release_auto_increment();
475
401
table->auto_increment_field_not_null= false;
476
thd->abort_on_warning= 0;
402
session->abort_on_warning= 0;
481
/* Not a very useful function; just to avoid duplication of code */
482
static bool write_execute_load_query_log_event(THD *thd,
483
bool duplicates, bool ignore,
484
bool transactional_table,
485
THD::killed_state killed_err_arg)
487
Execute_load_query_log_event
488
e(thd, thd->query, thd->query_length,
489
(char*)thd->lex->fname_start - (char*)thd->query,
490
(char*)thd->lex->fname_end - (char*)thd->query,
491
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
492
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
493
transactional_table, false, killed_err_arg);
494
e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
495
return mysql_bin_log.write(&e);
499
407
/****************************************************************************
500
408
** Read of rows of fixed size + optional garage + optonal newline
501
409
****************************************************************************/
504
read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
412
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
505
413
List<Item> &fields_vars, List<Item> &set_fields,
506
414
List<Item> &set_values, READ_INFO &read_info,
507
415
uint32_t skip_lines, bool ignore_check_option_errors)
509
417
List_iterator_fast<Item> it(fields_vars);
510
418
Item_field *sql_field;
511
TABLE *table= table_list->table;
419
Table *table= table_list->table;
517
425
while (!read_info.read_fixed_length())
521
thd->send_kill_message();
429
session->send_kill_message();
855
766
line_term_ptr=(char*) "";
857
768
enclosed_char= (enclosed_length=enclosed_par.length()) ?
858
(uchar) enclosed_par[0] : INT_MAX;
859
field_term_char= field_term_length ? (uchar) field_term_ptr[0] : INT_MAX;
860
line_term_char= line_term_length ? (uchar) line_term_ptr[0] : INT_MAX;
769
(unsigned char) enclosed_par[0] : INT_MAX;
770
field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
771
line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
861
772
error=eof=found_end_of_line=found_null=line_cuted=0;
862
773
buff_length=tot_length;
865
776
/* Set of a stack for unget if long terminators */
866
uint length=max(field_term_length,line_term_length)+1;
777
uint32_t length= max(field_term_length,line_term_length)+1;
867
778
set_if_bigger(length,line_start.length());
868
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
779
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
870
if (!(buffer=(uchar*) my_malloc(buff_length+1,MYF(0))))
871
error=1; /* purecov: inspected */
781
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
874
785
end_of_buff=buffer+buff_length;
875
if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
876
(get_it_from_net) ? READ_NET :
786
if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
877
788
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
880
my_free((uchar*) buffer,MYF(0)); /* purecov: inspected */
791
free((unsigned char*) buffer);
1025
927
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
1026
928
if (chr == line_term_char)
1028
if (chr == line_term_char && found_enclosed_char == INT_MAX)
930
if (chr == line_term_char && found_enclosed_char == INT_MAX)
1031
if (terminator(line_term_ptr,line_term_length))
1032
{ // Maybe unexpected linefeed
1034
found_end_of_line=1;
933
if (terminator(line_term_ptr,line_term_length))
934
{ // Maybe unexpected linefeed
1040
942
if (chr == found_enclosed_char)
1042
if ((chr=GET) == found_enclosed_char)
1043
{ // Remove dupplicated
1044
*to++ = (uchar) chr;
1047
// End of enclosed field if followed by field_term or line_term
1048
if (chr == my_b_EOF ||
1049
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
1050
{ // Maybe unexpected linefeed
1052
found_end_of_line=1;
1057
if (chr == field_term_char &&
1058
terminator(field_term_ptr,field_term_length))
1066
The string didn't terminate yet.
1067
Store back next character for the loop
1070
/* copy the found term character to 'to' */
1071
chr= found_enclosed_char;
944
if ((chr=GET) == found_enclosed_char)
945
{ // Remove dupplicated
946
*to++ = (unsigned char) chr;
949
// End of enclosed field if followed by field_term or line_term
950
if (chr == my_b_EOF ||
951
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
952
{ // Maybe unexpected linefeed
959
if (chr == field_term_char &&
960
terminator(field_term_ptr,field_term_length))
968
The string didn't terminate yet.
969
Store back next character for the loop
972
/* copy the found term character to 'to' */
973
chr= found_enclosed_char;
1073
975
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
1075
if (terminator(field_term_ptr,field_term_length))
977
if (terminator(field_term_ptr,field_term_length))
1083
*to++ = (uchar) chr;
985
*to++ = (unsigned char) chr;
1086
** We come here if buffer is too small. Enlarge it and continue
1088
if (!(new_buffer=(uchar*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
988
** We come here if buffer is too small. Enlarge it and continue
990
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1090
991
return (error=1);
1091
992
to=new_buffer + (to-buffer);
1092
993
buffer=new_buffer;