17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
41
unsigned char *buffer; /* Buffer for read text */
42
unsigned char *end_of_buff; /* Data in bufferts ends here */
43
size_t buff_length; /* Length of buffert */
44
size_t max_length; /* Max length of row */
29
unsigned char *buffer, /* Buffer for read text */
30
*end_of_buff; /* Data in bufferts ends here */
31
uint buff_length, /* Length of buffert */
32
max_length; /* Max length of row */
45
33
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
46
34
uint field_term_length,line_term_length,enclosed_length;
47
35
int field_term_char,line_term_char,enclosed_char,escape_char;
48
36
int *stack,*stack_pos;
49
37
bool found_end_of_line,start_of_line,eof;
50
38
bool need_end_io_cache;
51
internal::IO_CACHE cache;
54
42
bool error,line_cuted,found_null,enclosed;
56
44
*row_end; /* Found row ends here */
57
45
const CHARSET_INFO *read_charset;
59
READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
47
READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
60
48
String &field_term,String &line_start,String &line_term,
61
49
String &enclosed,int escape, bool is_fifo);
105
93
session - current thread
106
ex - file_exchange object representing source cursor and its parsing rules
94
ex - file_exchange object representing source file and its parsing rules
107
95
table_list - list of tables to which we are loading data
108
96
fields_vars - list of fields and variables to which we read
110
98
set_fields - list of fields mentioned in set clause
111
99
set_values - expressions to assign to fields in previous list
112
100
handle_duplicates - indicates whether we should emit error or
132
120
char *db= table_list->db; // This is never null
135
If path for cursor is not defined, we will use the current database.
123
If path for file is not defined, we will use the current database.
136
124
If this is not set, we will use the directory where the table to be
137
125
loaded is located
139
const char *tdb= session->db.empty() ? db : session->db.c_str(); // Result is never null
127
char *tdb= session->db ? session->db : db; // Result is never null
141
129
uint32_t skip_lines= ex->skip_lines;
142
130
bool transactional_table;
148
136
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
152
if (session->openTablesLock(table_list))
139
if (open_and_lock_tables(session, table_list))
155
141
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
156
142
&session->lex->select_lex.top_join_list,
166
152
table is marked to be 'used for insert' in which case we should never
167
153
mark this table as 'const table' (ie, one that has only one row).
169
if (unique_table(table_list, table_list->next_global))
155
if (unique_table(session, table_list, table_list->next_global, 0))
171
157
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
161
table= table_list->table;
176
transactional_table= table->cursor->has_transactions();
162
transactional_table= table->file->has_transactions();
178
164
if (!fields_vars.elements)
181
167
for (field=table->field; *field ; field++)
182
168
fields_vars.push_back(new Item_field(*field));
183
table->setWriteSet();
169
bitmap_set_all(table->write_set);
184
170
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
186
172
Let us also prepare SET clause, although it is probably empty
204
190
if (table->timestamp_field)
206
if (table->isWriteSet(table->timestamp_field->field_index))
192
if (bitmap_is_set(table->write_set,
193
table->timestamp_field->field_index))
207
194
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
210
table->setWriteSet(table->timestamp_field->field_index);
197
bitmap_set_bit(table->write_set,
198
table->timestamp_field->field_index);
213
201
/* Fix the expressions in SET clause */
256
244
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
257
245
ex->file_name+=dirname_length(ex->file_name);
259
if (!internal::dirname_length(ex->file_name))
247
if (!dirname_length(ex->file_name))
261
249
strcpy(name, drizzle_real_data_home);
262
250
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
263
(void) internal::fn_format(name, ex->file_name, name, "",
251
(void) fn_format(name, ex->file_name, name, "",
264
252
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
268
(void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
256
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
269
257
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
271
259
if (opt_secure_file_priv &&
286
// if we are not in slave thread, the cursor must be:
274
// if we are not in slave thread, the file must be:
287
275
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
276
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
277
((stat_info.st_mode & S_IFREG) == S_IFREG ||
295
283
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
298
if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
286
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
300
my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
288
my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
308
296
info.handle_duplicates=handle_duplicates;
309
297
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
311
SchemaIdentifier identifier(session->db);
312
READ_INFO read_info(file, tot_length,
313
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
314
*field_term, *ex->line_start, *ex->line_term, *enclosed,
299
READ_INFO read_info(file,tot_length,
300
ex->cs ? ex->cs : session->variables.collation_database,
301
*field_term,*ex->line_start, *ex->line_term, *enclosed,
315
302
info.escape_char, is_fifo);
316
303
if (read_info.error)
319
internal::my_close(file,MYF(0)); // no files in net reading
306
my_close(file,MYF(0)); // no files in net reading
320
307
return(true); // Can't allocate buffers
348
335
table->next_number_field=table->found_next_number_field;
350
337
handle_duplicates == DUP_REPLACE)
351
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
338
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
352
339
if (handle_duplicates == DUP_REPLACE)
353
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
354
table->cursor->ha_start_bulk_insert((ha_rows) 0);
340
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
341
table->file->ha_start_bulk_insert((ha_rows) 0);
355
342
table->copy_blobs=1;
357
344
session->abort_on_warning= true;
364
351
error= read_sep_field(session, info, table_list, fields_vars,
365
352
set_fields, set_values, read_info,
366
353
*enclosed, skip_lines, ignore);
367
if (table->cursor->ha_end_bulk_insert() && !error)
354
if (table->file->ha_end_bulk_insert() && !error)
369
table->print_error(errno, MYF(0));
356
table->file->print_error(my_errno, MYF(0));
372
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
373
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
359
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
360
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
374
361
table->next_number_field=0;
377
internal::my_close(file,MYF(0));
364
my_close(file,MYF(0));
378
365
free_blobs(table); /* if pack_blob was used */
379
366
table->copy_blobs=0;
380
367
session->count_cuted_fields= CHECK_FIELD_IGNORE;
388
375
error= -1; // Error on read
391
snprintf(name, sizeof(name), ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
378
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
392
379
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
394
if (session->transaction.stmt.hasModifiedNonTransData())
395
session->transaction.all.markModifiedNonTransData();
381
if (session->transaction.stmt.modified_non_trans_table)
382
session->transaction.all.modified_non_trans_table= true;
397
384
/* ok to client sent only after binlog write and engine commit */
398
session->my_ok(info.copied + info.deleted, 0, 0L, name);
385
session->my_ok(info.copied + info.deleted, 0L, name);
400
387
assert(transactional_table || !(info.copied || info.deleted) ||
401
session->transaction.stmt.hasModifiedNonTransData());
402
table->cursor->ha_release_auto_increment();
388
session->transaction.stmt.modified_non_trans_table);
389
table->file->ha_release_auto_increment();
403
390
table->auto_increment_field_not_null= false;
404
391
session->abort_on_warning= 0;
480
467
unsigned char save_chr;
481
468
if ((length=(uint32_t) (read_info.row_end-pos)) >
482
469
field->field_length)
484
470
length=field->field_length;
486
save_chr=pos[length];
487
pos[length]='\0'; // Add temp null terminator for store()
471
save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
488
472
field->store((char*) pos,length,read_info.read_charset);
489
473
pos[length]=save_chr;
490
474
if ((pos+=length) > read_info.row_end)
741
READ_INFO::READ_INFO(int file_par, size_t tot_length,
742
const CHARSET_INFO * const cs,
725
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
743
726
String &field_term, String &line_start, String &line_term,
744
727
String &enclosed_par, int escape, bool is_fifo)
745
:cursor(file_par),escape_char(escape)
728
:file(file_par),escape_char(escape)
747
730
read_charset= cs;
748
731
field_term_ptr=(char*) field_term.ptr();
778
761
/* Set of a stack for unget if long terminators */
779
uint32_t length= max(field_term_length,line_term_length)+1;
762
uint32_t length=cmax(field_term_length,line_term_length)+1;
780
763
set_if_bigger(length,line_start.length());
781
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
764
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
783
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
766
if (!(buffer=(unsigned char*) malloc(buff_length+1)))
767
error=1; /* purecov: inspected */
787
770
end_of_buff=buffer+buff_length;
788
if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
789
(false) ? internal::READ_NET :
790
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
771
if (init_io_cache(&cache,(false) ? -1 : file, 0,
773
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
793
free((unsigned char*) buffer);
776
free((unsigned char*) buffer); /* purecov: inspected */
881
864
while ( to < end_of_buff)
884
868
if ((my_mbcharlen(read_charset, chr) > 1) &&
885
869
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
887
unsigned char* p = (unsigned char*)to;
889
int ml = my_mbcharlen(read_charset, chr);
891
for (i=1; i<ml; i++) {
897
if (my_ismbchar(read_charset,
902
PUSH((unsigned char) *--to);
871
unsigned char* p = (unsigned char*)to;
873
int ml = my_mbcharlen(read_charset, chr);
875
for (i=1; i<ml; i++) {
881
if (my_ismbchar(read_charset,
886
PUSH((unsigned char) *--to);
905
890
if (chr == my_b_EOF)
907
892
if (chr == escape_char)
909
if ((chr=GET) == my_b_EOF)
911
*to++= (unsigned char) escape_char;
894
if ((chr=GET) == my_b_EOF)
896
*to++= (unsigned char) escape_char;
915
900
When escape_char == enclosed_char, we treat it like we do for
916
901
handling quotes in SQL parsing -- you can double-up the
929
914
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
930
915
if (chr == line_term_char)
932
if (chr == line_term_char && found_enclosed_char == INT_MAX)
917
if (chr == line_term_char && found_enclosed_char == INT_MAX)
935
if (terminator(line_term_ptr,line_term_length))
936
{ // Maybe unexpected linefeed
920
if (terminator(line_term_ptr,line_term_length))
921
{ // Maybe unexpected linefeed
944
929
if (chr == found_enclosed_char)
946
if ((chr=GET) == found_enclosed_char)
947
{ // Remove dupplicated
948
*to++ = (unsigned char) chr;
951
// End of enclosed field if followed by field_term or line_term
952
if (chr == my_b_EOF ||
953
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
954
{ // Maybe unexpected linefeed
961
if (chr == field_term_char &&
962
terminator(field_term_ptr,field_term_length))
970
The string didn't terminate yet.
971
Store back next character for the loop
974
/* copy the found term character to 'to' */
975
chr= found_enclosed_char;
931
if ((chr=GET) == found_enclosed_char)
932
{ // Remove dupplicated
933
*to++ = (unsigned char) chr;
936
// End of enclosed field if followed by field_term or line_term
937
if (chr == my_b_EOF ||
938
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
939
{ // Maybe unexpected linefeed
946
if (chr == field_term_char &&
947
terminator(field_term_ptr,field_term_length))
955
The string didn't terminate yet.
956
Store back next character for the loop
959
/* copy the found term character to 'to' */
960
chr= found_enclosed_char;
977
962
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
979
if (terminator(field_term_ptr,field_term_length))
964
if (terminator(field_term_ptr,field_term_length))
987
972
*to++ = (unsigned char) chr;
990
** We come here if buffer is too small. Enlarge it and continue
975
** We come here if buffer is too small. Enlarge it and continue
992
977
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
993
978
return (error=1);
994
979
to=new_buffer + (to-buffer);
1089
if (my_mbcharlen(read_charset, chr) > 1)
1092
chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1095
if (chr == escape_char)
1098
if (chr == my_b_EOF)
1075
if (my_mbcharlen(read_charset, chr) > 1)
1078
chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1081
if (chr == escape_char)
1085
if (chr == my_b_EOF)