17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
41
unsigned char *buffer; /* Buffer for read text */
42
unsigned char *end_of_buff; /* Data in bufferts ends here */
43
size_t buff_length; /* Length of buffert */
44
size_t max_length; /* Max length of row */
29
unsigned char *buffer, /* Buffer for read text */
30
*end_of_buff; /* Data in bufferts ends here */
31
uint buff_length, /* Length of buffert */
32
max_length; /* Max length of row */
45
33
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
46
34
uint field_term_length,line_term_length,enclosed_length;
47
35
int field_term_char,line_term_char,enclosed_char,escape_char;
48
36
int *stack,*stack_pos;
49
37
bool found_end_of_line,start_of_line,eof;
50
38
bool need_end_io_cache;
51
internal::IO_CACHE cache;
54
42
bool error,line_cuted,found_null,enclosed;
56
44
*row_end; /* Found row ends here */
57
45
const CHARSET_INFO *read_charset;
59
READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
47
READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
60
48
String &field_term,String &line_start,String &line_term,
61
49
String &enclosed,int escape, bool is_fifo);
105
93
session - current thread
106
ex - file_exchange object representing source cursor and its parsing rules
94
ex - file_exchange object representing source file and its parsing rules
107
95
table_list - list of tables to which we are loading data
108
96
fields_vars - list of fields and variables to which we read
110
98
set_fields - list of fields mentioned in set clause
111
99
set_values - expressions to assign to fields in previous list
112
100
handle_duplicates - indicates whether we should emit error or
132
120
char *db= table_list->db; // This is never null
135
If path for cursor is not defined, we will use the current database.
123
If path for file is not defined, we will use the current database.
136
124
If this is not set, we will use the directory where the table to be
137
125
loaded is located
139
const char *tdb= session->db.empty() ? db : session->db.c_str(); // Result is never null
127
char *tdb= session->db ? session->db : db; // Result is never null
141
129
uint32_t skip_lines= ex->skip_lines;
142
130
bool transactional_table;
148
136
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
152
if (session->openTablesLock(table_list))
139
if (open_and_lock_tables(session, table_list))
155
141
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
156
142
&session->lex->select_lex.top_join_list,
166
152
table is marked to be 'used for insert' in which case we should never
167
153
mark this table as 'const table' (ie, one that has only one row).
169
if (unique_table(table_list, table_list->next_global))
155
if (unique_table(session, table_list, table_list->next_global, 0))
171
157
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
161
table= table_list->table;
176
transactional_table= table->cursor->has_transactions();
162
transactional_table= table->file->has_transactions();
178
164
if (!fields_vars.elements)
181
167
for (field=table->field; *field ; field++)
182
168
fields_vars.push_back(new Item_field(*field));
183
table->setWriteSet();
169
table->write_set->set();
184
170
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
186
172
Let us also prepare the SET clause, although it is probably empty
204
190
if (table->timestamp_field)
206
if (table->isWriteSet(table->timestamp_field->field_index))
192
if (table->write_set->test(table->timestamp_field->field_index))
207
193
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
210
table->setWriteSet(table->timestamp_field->field_index);
196
table->write_set->set(table->timestamp_field->field_index);
213
199
/* Fix the expressions in SET clause */
256
242
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
257
243
ex->file_name+=dirname_length(ex->file_name);
259
if (!internal::dirname_length(ex->file_name))
245
if (!dirname_length(ex->file_name))
261
247
strcpy(name, drizzle_real_data_home);
262
248
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
263
(void) internal::fn_format(name, ex->file_name, name, "",
249
(void) fn_format(name, ex->file_name, name, "",
264
250
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
268
(void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
254
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
269
255
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
271
257
if (opt_secure_file_priv &&
286
// if we are not in slave thread, the cursor must be:
272
// if we are not in slave thread, the file must be:
287
273
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
274
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
275
((stat_info.st_mode & S_IFREG) == S_IFREG ||
295
281
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
298
if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
284
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
300
my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
286
my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
308
294
info.handle_duplicates=handle_duplicates;
309
295
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
311
SchemaIdentifier identifier(session->db);
312
READ_INFO read_info(file, tot_length,
313
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
314
*field_term, *ex->line_start, *ex->line_term, *enclosed,
297
READ_INFO read_info(file,tot_length,
298
ex->cs ? ex->cs : session->variables.collation_database,
299
*field_term,*ex->line_start, *ex->line_term, *enclosed,
315
300
info.escape_char, is_fifo);
316
301
if (read_info.error)
319
internal::my_close(file,MYF(0)); // no files in net reading
304
my_close(file,MYF(0)); // no files in net reading
320
305
return(true); // Can't allocate buffers
348
333
table->next_number_field=table->found_next_number_field;
350
335
handle_duplicates == DUP_REPLACE)
351
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
336
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
352
337
if (handle_duplicates == DUP_REPLACE)
353
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
354
table->cursor->ha_start_bulk_insert((ha_rows) 0);
338
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
339
table->file->ha_start_bulk_insert((ha_rows) 0);
355
340
table->copy_blobs=1;
357
342
session->abort_on_warning= true;
364
349
error= read_sep_field(session, info, table_list, fields_vars,
365
350
set_fields, set_values, read_info,
366
351
*enclosed, skip_lines, ignore);
367
if (table->cursor->ha_end_bulk_insert() && !error)
352
if (table->file->ha_end_bulk_insert() && !error)
369
table->print_error(errno, MYF(0));
354
table->file->print_error(my_errno, MYF(0));
372
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
373
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
357
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
358
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
374
359
table->next_number_field=0;
377
internal::my_close(file,MYF(0));
362
my_close(file,MYF(0));
378
363
free_blobs(table); /* if pack_blob was used */
379
364
table->copy_blobs=0;
380
365
session->count_cuted_fields= CHECK_FIELD_IGNORE;
388
373
error= -1; // Error on read
391
snprintf(name, sizeof(name), ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
376
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
392
377
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
394
if (session->transaction.stmt.hasModifiedNonTransData())
395
session->transaction.all.markModifiedNonTransData();
379
if (session->transaction.stmt.modified_non_trans_table)
380
session->transaction.all.modified_non_trans_table= true;
397
382
/* ok to client sent only after binlog write and engine commit */
398
session->my_ok(info.copied + info.deleted, 0, 0L, name);
383
session->my_ok(info.copied + info.deleted, 0L, name);
400
385
assert(transactional_table || !(info.copied || info.deleted) ||
401
session->transaction.stmt.hasModifiedNonTransData());
402
table->cursor->ha_release_auto_increment();
386
session->transaction.stmt.modified_non_trans_table);
387
table->file->ha_release_auto_increment();
403
388
table->auto_increment_field_not_null= false;
404
389
session->abort_on_warning= 0;
480
465
unsigned char save_chr;
481
466
if ((length=(uint32_t) (read_info.row_end-pos)) >
482
467
field->field_length)
484
468
length=field->field_length;
486
save_chr=pos[length];
487
pos[length]='\0'; // Add temp null terminator for store()
469
save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
488
470
field->store((char*) pos,length,read_info.read_charset);
489
471
pos[length]=save_chr;
490
472
if ((pos+=length) > read_info.row_end)
741
READ_INFO::READ_INFO(int file_par, size_t tot_length,
742
const CHARSET_INFO * const cs,
723
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
743
724
String &field_term, String &line_start, String &line_term,
744
725
String &enclosed_par, int escape, bool is_fifo)
745
:cursor(file_par),escape_char(escape)
726
:file(file_par),escape_char(escape)
747
728
read_charset= cs;
748
729
field_term_ptr=(char*) field_term.ptr();
778
759
/* Set up a stack for unget if terminators are long */
779
uint32_t length= max(field_term_length,line_term_length)+1;
760
uint32_t length=cmax(field_term_length,line_term_length)+1;
780
761
set_if_bigger(length,line_start.length());
781
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
762
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
783
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
764
if (!(buffer=(unsigned char*) malloc(buff_length+1)))
765
error=1; /* purecov: inspected */
787
768
end_of_buff=buffer+buff_length;
788
if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
789
(false) ? internal::READ_NET :
790
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
769
if (init_io_cache(&cache,(false) ? -1 : file, 0,
771
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
793
free((unsigned char*) buffer);
774
free((unsigned char*) buffer); /* purecov: inspected */
881
862
while ( to < end_of_buff)
884
866
if ((my_mbcharlen(read_charset, chr) > 1) &&
885
867
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
887
unsigned char* p = (unsigned char*)to;
889
int ml = my_mbcharlen(read_charset, chr);
891
for (i=1; i<ml; i++) {
897
if (my_ismbchar(read_charset,
902
PUSH((unsigned char) *--to);
869
unsigned char* p = (unsigned char*)to;
871
int ml = my_mbcharlen(read_charset, chr);
873
for (i=1; i<ml; i++) {
879
if (my_ismbchar(read_charset,
884
PUSH((unsigned char) *--to);
905
888
if (chr == my_b_EOF)
907
890
if (chr == escape_char)
909
if ((chr=GET) == my_b_EOF)
911
*to++= (unsigned char) escape_char;
892
if ((chr=GET) == my_b_EOF)
894
*to++= (unsigned char) escape_char;
915
898
When escape_char == enclosed_char, we treat it like we do for
916
899
handling quotes in SQL parsing -- you can double-up the
929
912
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
930
913
if (chr == line_term_char)
932
if (chr == line_term_char && found_enclosed_char == INT_MAX)
915
if (chr == line_term_char && found_enclosed_char == INT_MAX)
935
if (terminator(line_term_ptr,line_term_length))
936
{ // Maybe unexpected linefeed
918
if (terminator(line_term_ptr,line_term_length))
919
{ // Maybe unexpected linefeed
944
927
if (chr == found_enclosed_char)
946
if ((chr=GET) == found_enclosed_char)
947
{ // Remove dupplicated
948
*to++ = (unsigned char) chr;
951
// End of enclosed field if followed by field_term or line_term
952
if (chr == my_b_EOF ||
953
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
954
{ // Maybe unexpected linefeed
961
if (chr == field_term_char &&
962
terminator(field_term_ptr,field_term_length))
970
The string didn't terminate yet.
971
Store back next character for the loop
974
/* copy the found term character to 'to' */
975
chr= found_enclosed_char;
929
if ((chr=GET) == found_enclosed_char)
930
{ // Remove dupplicated
931
*to++ = (unsigned char) chr;
934
// End of enclosed field if followed by field_term or line_term
935
if (chr == my_b_EOF ||
936
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
937
{ // Maybe unexpected linefeed
944
if (chr == field_term_char &&
945
terminator(field_term_ptr,field_term_length))
953
The string didn't terminate yet.
954
Store back next character for the loop
957
/* copy the found term character to 'to' */
958
chr= found_enclosed_char;
977
960
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
979
if (terminator(field_term_ptr,field_term_length))
962
if (terminator(field_term_ptr,field_term_length))
987
970
*to++ = (unsigned char) chr;
990
** We come here if buffer is too small. Enlarge it and continue
973
** We come here if buffer is too small. Enlarge it and continue
992
975
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
993
976
return (error=1);
994
977
to=new_buffer + (to-buffer);
1089
if (my_mbcharlen(read_charset, chr) > 1)
1092
chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1095
if (chr == escape_char)
1098
if (chr == my_b_EOF)
1073
if (my_mbcharlen(read_charset, chr) > 1)
1076
chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1079
if (chr == escape_char)
1083
if (chr == my_b_EOF)