17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
#include <drizzled/replication/replication.h>
21
22
#include <drizzled/error.h>
22
23
#include <drizzled/data_home.h>
23
24
#include <drizzled/session.h>
24
25
#include <drizzled/sql_base.h>
25
26
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
41
unsigned char *buffer; /* Buffer for read text */
42
unsigned char *end_of_buff; /* Data in bufferts ends here */
43
size_t buff_length; /* Length of buffert */
44
size_t max_length; /* Max length of row */
30
unsigned char *buffer, /* Buffer for read text */
31
*end_of_buff; /* Data in bufferts ends here */
32
uint buff_length, /* Length of buffert */
33
max_length; /* Max length of row */
45
34
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
46
35
uint field_term_length,line_term_length,enclosed_length;
47
36
int field_term_char,line_term_char,enclosed_char,escape_char;
48
37
int *stack,*stack_pos;
49
38
bool found_end_of_line,start_of_line,eof;
50
39
bool need_end_io_cache;
51
internal::IO_CACHE cache;
54
44
bool error,line_cuted,found_null,enclosed;
56
46
*row_end; /* Found row ends here */
57
47
const CHARSET_INFO *read_charset;
59
READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
49
READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
60
50
String &field_term,String &line_start,String &line_term,
61
String &enclosed,int escape, bool is_fifo);
51
String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
64
54
int read_fixed_length(void);
105
95
session - current thread
106
ex - file_exchange object representing source cursor and its parsing rules
96
ex - sql_exchange object representing source file and its parsing rules
107
97
table_list - list of tables to which we are loading data
108
98
fields_vars - list of fields and variables to which we read
110
100
set_fields - list of fields mentioned in set clause
111
101
set_values - expressions to assign to fields in previous list
112
102
handle_duplicates - indicates whether we should emit an error or
113
103
replace the row if we encounter duplicates.
114
104
ignore - indicates whether we should ignore duplicates
105
read_file_from_client - is this LOAD DATA LOCAL ?
117
108
true - error / false - success
120
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
121
112
List<Item> &fields_vars, List<Item> &set_fields,
122
113
List<Item> &set_values,
123
enum enum_duplicates handle_duplicates, bool ignore)
114
enum enum_duplicates handle_duplicates, bool ignore,
115
bool read_file_from_client)
125
117
char name[FN_REFLEN];
127
119
Table *table= NULL;
129
121
String *field_term=ex->field_term,*escaped=ex->escaped;
132
124
char *db= table_list->db; // This is never null
135
If path for cursor is not defined, we will use the current database.
127
If path for file is not defined, we will use the current database.
136
128
If this is not set, we will use the directory where the table to be
137
129
loaded is located
139
const char *tdb= session->db.empty() ? db : session->db.c_str(); // Result is never null
131
char *tdb= session->db ? session->db : db; // Result is never null
141
133
uint32_t skip_lines= ex->skip_lines;
142
134
bool transactional_table;
148
140
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
152
if (session->openTablesLock(table_list))
143
if (open_and_lock_tables(session, table_list))
155
145
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
156
146
&session->lex->select_lex.top_join_list,
166
156
table is marked to be 'used for insert' in which case we should never
167
157
mark this table as 'const table' (ie, one that has only one row).
169
if (unique_table(table_list, table_list->next_global))
159
if (unique_table(session, table_list, table_list->next_global, 0))
171
161
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
165
table= table_list->table;
176
transactional_table= table->cursor->has_transactions();
166
transactional_table= table->file->has_transactions();
178
168
if (!fields_vars.elements)
181
171
for (field=table->field; *field ; field++)
182
172
fields_vars.push_back(new Item_field(*field));
183
table->setWriteSet();
173
bitmap_set_all(table->write_set);
184
174
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
186
176
Let us also prepare the SET clause, although it is probably empty
204
194
if (table->timestamp_field)
206
if (table->isWriteSet(table->timestamp_field->field_index))
196
if (bitmap_is_set(table->write_set,
197
table->timestamp_field->field_index))
207
198
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
210
table->setWriteSet(table->timestamp_field->field_index);
201
bitmap_set_bit(table->write_set,
202
table->timestamp_field->field_index);
213
205
/* Fix the expressions in SET clause */
247
/* We can't give an error in the middle when using LOCAL files */
248
if (read_file_from_client && handle_duplicates == DUP_ERROR)
251
if (read_file_from_client)
253
(void)net_request_file(&session->net,ex->file_name);
256
258
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
257
259
ex->file_name+=dirname_length(ex->file_name);
259
if (!internal::dirname_length(ex->file_name))
261
if (!dirname_length(ex->file_name))
261
263
strcpy(name, drizzle_real_data_home);
262
264
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
263
(void) internal::fn_format(name, ex->file_name, name, "",
265
(void) fn_format(name, ex->file_name, name, "",
264
266
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
268
(void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
270
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
269
271
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
271
273
if (opt_secure_file_priv &&
286
// if we are not in slave thread, the cursor must be:
287
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
288
// if we are not in slave thread, the file must be:
289
if (!session->slave_thread &&
290
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
291
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
292
((stat_info.st_mode & S_IFREG) == S_IFREG ||
293
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
292
295
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
295
298
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
298
if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
301
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
300
my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
303
my_error(ER_CANT_OPEN_FILE, MYF(0), my_errno);
308
311
info.handle_duplicates=handle_duplicates;
309
312
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
311
SchemaIdentifier identifier(session->db);
312
READ_INFO read_info(file, tot_length,
313
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
314
*field_term, *ex->line_start, *ex->line_term, *enclosed,
315
info.escape_char, is_fifo);
314
READ_INFO read_info(file,tot_length,
315
ex->cs ? ex->cs : session->variables.collation_database,
316
*field_term,*ex->line_start, *ex->line_term, *enclosed,
317
info.escape_char, read_file_from_client, is_fifo);
316
318
if (read_info.error)
319
internal::my_close(file,MYF(0)); // no files in net reading
321
my_close(file,MYF(0)); // no files in net reading
320
322
return(true); // Can't allocate buffers
348
350
table->next_number_field=table->found_next_number_field;
350
352
handle_duplicates == DUP_REPLACE)
351
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
353
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
352
354
if (handle_duplicates == DUP_REPLACE)
353
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
354
table->cursor->ha_start_bulk_insert((ha_rows) 0);
355
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
356
table->file->ha_start_bulk_insert((ha_rows) 0);
355
357
table->copy_blobs=1;
357
359
session->abort_on_warning= true;
364
366
error= read_sep_field(session, info, table_list, fields_vars,
365
367
set_fields, set_values, read_info,
366
368
*enclosed, skip_lines, ignore);
367
if (table->cursor->ha_end_bulk_insert() && !error)
369
if (table->file->ha_end_bulk_insert() && !error)
369
table->print_error(errno, MYF(0));
371
table->file->print_error(my_errno, MYF(0));
372
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
373
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
374
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
375
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
374
376
table->next_number_field=0;
377
internal::my_close(file,MYF(0));
379
my_close(file,MYF(0));
378
380
free_blobs(table); /* if pack_blob was used */
379
381
table->copy_blobs=0;
380
382
session->count_cuted_fields= CHECK_FIELD_IGNORE;
385
387
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
390
if (read_file_from_client)
391
while (!read_info.next_line())
388
394
error= -1; // Error on read
391
snprintf(name, sizeof(name), ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
397
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
392
398
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
394
if (session->transaction.stmt.hasModifiedNonTransData())
395
session->transaction.all.markModifiedNonTransData();
400
if (session->transaction.stmt.modified_non_trans_table)
401
session->transaction.all.modified_non_trans_table= true;
397
403
/* ok to client sent only after binlog write and engine commit */
398
session->my_ok(info.copied + info.deleted, 0, 0L, name);
404
my_ok(session, info.copied + info.deleted, 0L, name);
400
406
assert(transactional_table || !(info.copied || info.deleted) ||
401
session->transaction.stmt.hasModifiedNonTransData());
402
table->cursor->ha_release_auto_increment();
407
session->transaction.stmt.modified_non_trans_table);
408
table->file->ha_release_auto_increment();
403
409
table->auto_increment_field_not_null= false;
404
410
session->abort_on_warning= 0;
480
486
unsigned char save_chr;
481
if ((length=(uint32_t) (read_info.row_end-pos)) >
487
if ((length=(uint) (read_info.row_end-pos)) >
482
488
field->field_length)
484
489
length=field->field_length;
486
save_chr=pos[length];
487
pos[length]='\0'; // Add temp null terminator for store()
490
save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
488
491
field->store((char*) pos,length,read_info.read_charset);
489
492
pos[length]=save_chr;
490
493
if ((pos+=length) > read_info.row_end)
741
READ_INFO::READ_INFO(int file_par, size_t tot_length,
742
const CHARSET_INFO * const cs,
744
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
743
745
String &field_term, String &line_start, String &line_term,
744
String &enclosed_par, int escape, bool is_fifo)
745
:cursor(file_par),escape_char(escape)
746
String &enclosed_par, int escape, bool get_it_from_net,
748
:file(file_par),escape_char(escape)
747
750
read_charset= cs;
748
751
field_term_ptr=(char*) field_term.ptr();
778
781
/* Set of a stack for unget if long terminators */
779
uint32_t length= max(field_term_length,line_term_length)+1;
782
uint32_t length=cmax(field_term_length,line_term_length)+1;
780
783
set_if_bigger(length,line_start.length());
781
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
784
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
783
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
786
if (!(buffer=(unsigned char*) malloc(buff_length+1)))
787
error=1; /* purecov: inspected */
787
790
end_of_buff=buffer+buff_length;
788
if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
789
(false) ? internal::READ_NET :
790
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
791
if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
792
(get_it_from_net) ? READ_NET :
793
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
793
free((unsigned char*) buffer);
796
free((unsigned char*) buffer); /* purecov: inspected */
881
891
while ( to < end_of_buff)
884
895
if ((my_mbcharlen(read_charset, chr) > 1) &&
885
896
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
887
unsigned char* p = (unsigned char*)to;
889
int ml = my_mbcharlen(read_charset, chr);
891
for (i=1; i<ml; i++) {
897
if (my_ismbchar(read_charset,
902
PUSH((unsigned char) *--to);
898
unsigned char* p = (unsigned char*)to;
900
int ml = my_mbcharlen(read_charset, chr);
902
for (i=1; i<ml; i++) {
908
if (my_ismbchar(read_charset,
913
PUSH((unsigned char) *--to);
905
917
if (chr == my_b_EOF)
907
919
if (chr == escape_char)
909
if ((chr=GET) == my_b_EOF)
911
*to++= (unsigned char) escape_char;
921
if ((chr=GET) == my_b_EOF)
923
*to++= (unsigned char) escape_char;
915
927
When escape_char == enclosed_char, we treat it like we do for
916
928
handling quotes in SQL parsing -- you can double-up the
929
941
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
930
942
if (chr == line_term_char)
932
if (chr == line_term_char && found_enclosed_char == INT_MAX)
944
if (chr == line_term_char && found_enclosed_char == INT_MAX)
935
if (terminator(line_term_ptr,line_term_length))
936
{ // Maybe unexpected linefeed
947
if (terminator(line_term_ptr,line_term_length))
948
{ // Maybe unexpected linefeed
944
956
if (chr == found_enclosed_char)
946
if ((chr=GET) == found_enclosed_char)
947
{ // Remove dupplicated
948
*to++ = (unsigned char) chr;
951
// End of enclosed field if followed by field_term or line_term
952
if (chr == my_b_EOF ||
953
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
954
{ // Maybe unexpected linefeed
961
if (chr == field_term_char &&
962
terminator(field_term_ptr,field_term_length))
970
The string didn't terminate yet.
971
Store back next character for the loop
974
/* copy the found term character to 'to' */
975
chr= found_enclosed_char;
958
if ((chr=GET) == found_enclosed_char)
959
{ // Remove dupplicated
960
*to++ = (unsigned char) chr;
963
// End of enclosed field if followed by field_term or line_term
964
if (chr == my_b_EOF ||
965
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
966
{ // Maybe unexpected linefeed
973
if (chr == field_term_char &&
974
terminator(field_term_ptr,field_term_length))
982
The string didn't terminate yet.
983
Store back next character for the loop
986
/* copy the found term character to 'to' */
987
chr= found_enclosed_char;
977
989
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
979
if (terminator(field_term_ptr,field_term_length))
991
if (terminator(field_term_ptr,field_term_length))
987
999
*to++ = (unsigned char) chr;
990
** We come here if buffer is too small. Enlarge it and continue
1002
** We come here if buffer is too small. Enlarge it and continue
992
1004
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
993
1005
return (error=1);
994
1006
to=new_buffer + (to-buffer);
1089
if (my_mbcharlen(read_charset, chr) > 1)
1092
chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1095
if (chr == escape_char)
1098
if (chr == my_b_EOF)
1102
if (my_mbcharlen(read_charset, chr) > 1)
1105
chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1108
if (chr == escape_char)
1112
if (chr == my_b_EOF)