17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
21
#include <drizzled/drizzled_error_messages.h>
20
#include <drizzled/sql_load.h>
21
#include <drizzled/error.h>
22
#include <drizzled/data_home.h>
23
#include <drizzled/session.h>
24
#include <drizzled/sql_base.h>
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
26
uchar *buffer, /* Buffer for read text */
27
*end_of_buff; /* Data in bufferts ends here */
28
uint buff_length, /* Length of buffert */
29
max_length; /* Max length of row */
41
unsigned char *buffer; /* Buffer for read text */
42
unsigned char *end_of_buff; /* Data in bufferts ends here */
43
size_t buff_length; /* Length of buffert */
44
size_t max_length; /* Max length of row */
30
45
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
31
46
uint field_term_length,line_term_length,enclosed_length;
32
47
int field_term_char,line_term_char,enclosed_char,escape_char;
33
48
int *stack,*stack_pos;
34
49
bool found_end_of_line,start_of_line,eof;
35
50
bool need_end_io_cache;
51
internal::IO_CACHE cache;
40
54
bool error,line_cuted,found_null,enclosed;
41
uchar *row_start, /* Found row starts here */
55
unsigned char *row_start, /* Found row starts here */
42
56
*row_end; /* Found row ends here */
43
57
const CHARSET_INFO *read_charset;
45
READ_INFO(File file,uint tot_length, const CHARSET_INFO * const cs,
59
READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
46
60
String &field_term,String &line_start,String &line_term,
47
String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
61
String &enclosed,int escape, bool is_fifo);
50
64
int read_fixed_length(void);
51
65
int next_line(void);
52
66
char unescape(char chr);
53
int terminator(char *ptr,uint length);
67
int terminator(char *ptr,uint32_t length);
54
68
bool find_start_of_fields();
60
74
void end_io_cache()
62
::end_io_cache(&cache);
76
internal::end_io_cache(&cache);
63
77
need_end_io_cache = 0;
67
81
Either this method, or we need to make cache public
68
82
Arg must be set from mysql_load() since constructor does not see
69
either the table or THD value
83
either the table or Session value
71
85
void set_io_cache_arg(void* arg) { cache.arg = arg; }
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
88
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
75
89
List<Item> &fields_vars, List<Item> &set_fields,
76
90
List<Item> &set_values, READ_INFO &read_info,
77
91
uint32_t skip_lines,
78
92
bool ignore_check_option_errors);
79
static int read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
93
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
80
94
List<Item> &fields_vars, List<Item> &set_fields,
81
95
List<Item> &set_values, READ_INFO &read_info,
82
96
String &enclosed, uint32_t skip_lines,
83
97
bool ignore_check_option_errors);
85
static bool write_execute_load_query_log_event(THD *thd,
86
bool duplicates, bool ignore,
87
bool transactional_table,
88
THD::killed_state killed_status);
91
101
Execute LOAD DATA query
96
ex - sql_exchange object representing source file and its parsing rules
105
session - current thread
106
ex - file_exchange object representing source cursor and its parsing rules
97
107
table_list - list of tables to which we are loading data
98
108
fields_vars - list of fields and variables to which we read
100
110
set_fields - list of fields mentioned in set clause
101
111
set_values - expressions to assign to fields in previous list
102
112
handle_duplicates - indicates whether we should emit error or
103
113
replace row if we will meet duplicates.
104
114
ignore - indicates whether we should ignore duplicates
105
read_file_from_client - is this LOAD DATA LOCAL ?
108
117
true - error / false - success
111
int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
120
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
112
121
List<Item> &fields_vars, List<Item> &set_fields,
113
122
List<Item> &set_values,
114
enum enum_duplicates handle_duplicates, bool ignore,
115
bool read_file_from_client)
123
enum enum_duplicates handle_duplicates, bool ignore)
117
125
char name[FN_REFLEN];
121
129
String *field_term=ex->field_term,*escaped=ex->escaped;
122
130
String *enclosed=ex->enclosed;
124
LOAD_FILE_INFO lf_info;
125
char *db = table_list->db; // This is never null
132
char *db= table_list->db; // This is never null
127
If path for file is not defined, we will use the current database.
135
If path for cursor is not defined, we will use the current database.
128
136
If this is not set, we will use the directory where the table to be
129
137
loaded is located
131
char *tdb= thd->db ? thd->db : db; // Result is never null
139
const char *tdb= session->db.empty() ? db : session->db.c_str(); // Result is never null
132
141
uint32_t skip_lines= ex->skip_lines;
133
142
bool transactional_table;
134
THD::killed_state killed_status= THD::NOT_KILLED;
143
Session::killed_state killed_status= Session::NOT_KILLED;
136
if (escaped->length() > 1 || enclosed->length() > 1)
145
/* Escape and enclosed character may be a utf8 4-byte character */
146
if (escaped->length() > 4 || enclosed->length() > 4)
138
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
148
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
142
if (open_and_lock_tables(thd, table_list))
152
if (session->openTablesLock(table_list))
144
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
&thd->lex->select_lex.top_join_list,
155
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
156
&session->lex->select_lex.top_join_list,
147
&thd->lex->select_lex.leaf_tables, true))
158
&session->lex->select_lex.leaf_tables, true))
155
166
table is marked to be 'used for insert' in which case we should never
156
167
mark this table as 'const table' (ie, one that has only one row).
158
if (unique_table(thd, table_list, table_list->next_global, 0))
169
if (unique_table(table_list, table_list->next_global))
160
171
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
164
175
table= table_list->table;
165
transactional_table= table->file->has_transactions();
176
transactional_table= table->cursor->has_transactions();
167
178
if (!fields_vars.elements)
170
181
for (field=table->field; *field ; field++)
171
182
fields_vars.push_back(new Item_field(*field));
172
bitmap_set_all(table->write_set);
183
table->setWriteSet();
173
184
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
175
186
Let us also prepare SET clause, although it is probably empty
178
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
189
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
190
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
183
194
{ // Part field list
184
195
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
185
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(thd, table, table_list))
196
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
197
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
198
check_that_all_fields_are_given_values(session, table, table_list))
190
201
Check whether TIMESTAMP field with auto-set feature specified
303
308
info.handle_duplicates=handle_duplicates;
304
309
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
306
READ_INFO read_info(file,tot_length,
307
ex->cs ? ex->cs : thd->variables.collation_database,
311
READ_INFO read_info(file, tot_length,
312
ex->cs ? ex->cs : get_default_db_collation(session->db.c_str()),
308
313
*field_term,*ex->line_start, *ex->line_term, *enclosed,
309
info.escape_char, read_file_from_client, is_fifo);
314
info.escape_char, is_fifo);
310
315
if (read_info.error)
313
my_close(file,MYF(0)); // no files in net reading
318
internal::my_close(file,MYF(0)); // no files in net reading
314
319
return(true); // Can't allocate buffers
317
if (mysql_bin_log.is_open())
320
lf_info.wrote_create_file = 0;
321
lf_info.last_pos_in_file = HA_POS_ERROR;
322
lf_info.log_delayed= transactional_table;
323
read_info.set_io_cache_arg((void*) &lf_info);
326
thd->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
327
thd->cuted_fields=0L;
323
* Per the SQL standard, inserting NULL into a NOT NULL
324
* field requires an error to be thrown.
328
* NULL check and handling occurs in field_conv.cc
330
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
331
session->cuted_fields=0L;
328
332
/* Skip lines if there is a line terminator */
329
333
if (ex->line_term->length())
343
347
table->next_number_field=table->found_next_number_field;
345
349
handle_duplicates == DUP_REPLACE)
346
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
350
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
347
351
if (handle_duplicates == DUP_REPLACE)
348
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
349
table->file->ha_start_bulk_insert((ha_rows) 0);
352
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
353
table->cursor->ha_start_bulk_insert((ha_rows) 0);
350
354
table->copy_blobs=1;
352
thd->abort_on_warning= (!ignore &&
353
(thd->variables.sql_mode &
354
(MODE_STRICT_TRANS_TABLES |
355
MODE_STRICT_ALL_TABLES)));
356
session->abort_on_warning= true;
357
358
if (!field_term->length() && !enclosed->length())
358
error= read_fixed_length(thd, info, table_list, fields_vars,
359
error= read_fixed_length(session, info, table_list, fields_vars,
359
360
set_fields, set_values, read_info,
360
361
skip_lines, ignore);
362
error= read_sep_field(thd, info, table_list, fields_vars,
363
error= read_sep_field(session, info, table_list, fields_vars,
363
364
set_fields, set_values, read_info,
364
365
*enclosed, skip_lines, ignore);
365
if (table->file->ha_end_bulk_insert() && !error)
366
if (table->cursor->ha_end_bulk_insert() && !error)
367
table->file->print_error(my_errno, MYF(0));
368
table->print_error(errno, MYF(0));
370
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
371
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
371
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
372
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
372
373
table->next_number_field=0;
375
my_close(file,MYF(0));
376
internal::my_close(file,MYF(0));
376
377
free_blobs(table); /* if pack_blob was used */
377
378
table->copy_blobs=0;
378
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
379
session->count_cuted_fields= CHECK_FIELD_IGNORE;
380
381
simulated killing in the middle of per-row loop
381
382
must be effective for binlogging
383
killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
384
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
386
if (read_file_from_client)
387
while (!read_info.next_line())
390
if (mysql_bin_log.is_open())
394
Make sure last block (the one which caused the error) gets
395
logged. This is needed because otherwise after write of (to
396
the binlog, not to read_info (which is a cache))
397
Delete_file_log_event the bad block will remain in read_info
398
(because pre_read is not called at the end of the last
399
block; remember pre_read is called whenever a new block is
400
read from disk). At the end of mysql_load(), the destructor
401
of read_info will call end_io_cache() which will flush
402
read_info, so we will finally have this in the binlog:
404
Append_block # The last successful block
406
Append_block # The failing block
408
Or could also be (for a small file)
409
Create_file # The failing block
410
which is nonsense (Delete_file is not written in this case, because:
411
Create_file has not been written, so Delete_file is not written, then
412
when read_info is destroyed end_io_cache() is called which writes
415
read_info.end_io_cache();
416
/* If the file was not empty, wrote_create_file is true */
417
if (lf_info.wrote_create_file)
419
if (thd->transaction.stmt.modified_non_trans_table)
420
write_execute_load_query_log_event(thd, handle_duplicates,
421
ignore, transactional_table,
425
Delete_file_log_event d(thd, db, transactional_table);
426
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
427
mysql_bin_log.write(&d);
432
387
error= -1; // Error on read
435
390
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
436
(uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
438
if (thd->transaction.stmt.modified_non_trans_table)
439
thd->transaction.all.modified_non_trans_table= true;
441
if (mysql_bin_log.is_open())
444
We need to do the job that is normally done inside
445
binlog_query() here, which is to ensure that the pending event
446
is written before tables are unlocked and before any other
447
events are written. We also need to update the table map
448
version for the binary log to mark that table maps are invalid
451
if (thd->current_stmt_binlog_row_based)
452
thd->binlog_flush_pending_rows_event(true);
456
As already explained above, we need to call end_io_cache() or the last
457
block will be logged only after Execute_load_query_log_event (which is
458
wrong), when read_info is destroyed.
460
read_info.end_io_cache();
461
if (lf_info.wrote_create_file)
463
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
464
transactional_table,killed_status);
391
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
393
if (session->transaction.stmt.modified_non_trans_table)
394
session->transaction.all.modified_non_trans_table= true;
469
396
/* ok to client sent only after binlog write and engine commit */
470
my_ok(thd, info.copied + info.deleted, 0L, name);
397
session->my_ok(info.copied + info.deleted, 0, 0L, name);
472
399
assert(transactional_table || !(info.copied || info.deleted) ||
473
thd->transaction.stmt.modified_non_trans_table);
474
table->file->ha_release_auto_increment();
400
session->transaction.stmt.modified_non_trans_table);
401
table->cursor->ha_release_auto_increment();
475
402
table->auto_increment_field_not_null= false;
476
thd->abort_on_warning= 0;
403
session->abort_on_warning= 0;
481
/* Not a very useful function; just to avoid duplication of code */
482
static bool write_execute_load_query_log_event(THD *thd,
483
bool duplicates, bool ignore,
484
bool transactional_table,
485
THD::killed_state killed_err_arg)
487
Execute_load_query_log_event
488
e(thd, thd->query, thd->query_length,
489
(char*)thd->lex->fname_start - (char*)thd->query,
490
(char*)thd->lex->fname_end - (char*)thd->query,
491
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
492
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
493
transactional_table, false, killed_err_arg);
494
e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
495
return mysql_bin_log.write(&e);
499
408
/****************************************************************************
500
409
** Read of rows of fixed size + optional garbage + optional newline
501
410
****************************************************************************/
504
read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
413
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
505
414
List<Item> &fields_vars, List<Item> &set_fields,
506
415
List<Item> &set_values, READ_INFO &read_info,
507
416
uint32_t skip_lines, bool ignore_check_option_errors)
509
418
List_iterator_fast<Item> it(fields_vars);
510
419
Item_field *sql_field;
511
TABLE *table= table_list->table;
420
Table *table= table_list->table;
517
426
while (!read_info.read_fixed_length())
521
thd->send_kill_message();
430
session->send_kill_message();
855
767
line_term_ptr=(char*) "";
857
769
enclosed_char= (enclosed_length=enclosed_par.length()) ?
858
(uchar) enclosed_par[0] : INT_MAX;
859
field_term_char= field_term_length ? (uchar) field_term_ptr[0] : INT_MAX;
860
line_term_char= line_term_length ? (uchar) line_term_ptr[0] : INT_MAX;
770
(unsigned char) enclosed_par[0] : INT_MAX;
771
field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
772
line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
861
773
error=eof=found_end_of_line=found_null=line_cuted=0;
862
774
buff_length=tot_length;
865
777
/* Set up a stack for unget if long terminators */
866
uint length=max(field_term_length,line_term_length)+1;
778
uint32_t length= max(field_term_length,line_term_length)+1;
867
779
set_if_bigger(length,line_start.length());
868
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
780
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
870
if (!(buffer=(uchar*) my_malloc(buff_length+1,MYF(0))))
871
error=1; /* purecov: inspected */
782
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
874
786
end_of_buff=buffer+buff_length;
875
if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
876
(get_it_from_net) ? READ_NET :
877
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
787
if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
788
(false) ? internal::READ_NET :
789
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
880
my_free((uchar*) buffer,MYF(0)); /* purecov: inspected */
792
free((unsigned char*) buffer);
1025
928
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
1026
929
if (chr == line_term_char)
1028
if (chr == line_term_char && found_enclosed_char == INT_MAX)
931
if (chr == line_term_char && found_enclosed_char == INT_MAX)
1031
if (terminator(line_term_ptr,line_term_length))
1032
{ // Maybe unexpected linefeed
1034
found_end_of_line=1;
934
if (terminator(line_term_ptr,line_term_length))
935
{ // Maybe unexpected linefeed
1040
943
if (chr == found_enclosed_char)
1042
if ((chr=GET) == found_enclosed_char)
1043
{ // Remove dupplicated
1044
*to++ = (uchar) chr;
1047
// End of enclosed field if followed by field_term or line_term
1048
if (chr == my_b_EOF ||
1049
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
1050
{ // Maybe unexpected linefeed
1052
found_end_of_line=1;
1057
if (chr == field_term_char &&
1058
terminator(field_term_ptr,field_term_length))
1066
The string didn't terminate yet.
1067
Store back next character for the loop
1070
/* copy the found term character to 'to' */
1071
chr= found_enclosed_char;
945
if ((chr=GET) == found_enclosed_char)
946
{ // Remove dupplicated
947
*to++ = (unsigned char) chr;
950
// End of enclosed field if followed by field_term or line_term
951
if (chr == my_b_EOF ||
952
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
953
{ // Maybe unexpected linefeed
960
if (chr == field_term_char &&
961
terminator(field_term_ptr,field_term_length))
969
The string didn't terminate yet.
970
Store back next character for the loop
973
/* copy the found term character to 'to' */
974
chr= found_enclosed_char;
1073
976
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
1075
if (terminator(field_term_ptr,field_term_length))
978
if (terminator(field_term_ptr,field_term_length))
1083
*to++ = (uchar) chr;
986
*to++ = (unsigned char) chr;
1086
** We come here if buffer is too small. Enlarge it and continue
1088
if (!(new_buffer=(uchar*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
989
** We come here if buffer is too small. Enlarge it and continue
991
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1090
992
return (error=1);
1091
993
to=new_buffer + (to-buffer);
1092
994
buffer=new_buffer;