17
17
/* Copy data from a textfile to table */
19
19
#include <drizzled/server_includes.h>
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
#include <drizzled/session.h>
24
#include <drizzled/sql_base.h>
25
#include <drizzled/field/timestamp.h>
27
unsigned char *buffer, /* Buffer for read text */
28
*end_of_buff; /* Data in bufferts ends here */
29
uint buff_length, /* Length of buffert */
30
max_length; /* Max length of row */
29
unsigned char *buffer; /* Buffer for read text */
30
unsigned char *end_of_buff; /* Data in bufferts ends here */
31
size_t buff_length; /* Length of buffert */
32
size_t max_length; /* Max length of row */
31
33
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
32
34
uint field_term_length,line_term_length,enclosed_length;
33
35
int field_term_char,line_term_char,enclosed_char,escape_char;
43
44
*row_end; /* Found row ends here */
44
45
const CHARSET_INFO *read_charset;
46
READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
47
READ_INFO(File file, size_t tot_length, const CHARSET_INFO * const cs,
47
48
String &field_term,String &line_start,String &line_term,
48
String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
49
String &enclosed,int escape, bool is_fifo);
51
52
int read_fixed_length(void);
83
84
String &enclosed, uint32_t skip_lines,
84
85
bool ignore_check_option_errors);
86
static bool write_execute_load_query_log_event(Session *session,
87
bool duplicates, bool ignore,
88
bool transactional_table,
89
Session::killed_state killed_status);
92
89
Execute LOAD DATA query
103
100
handle_duplicates - indicates whether we should emit error or
104
101
replace row if we will meet duplicates.
105
102
ignore - indicates whether we should ignore duplicates
106
read_file_from_client - is this LOAD DATA LOCAL ?
109
105
true - error / false - success
112
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
108
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
113
109
List<Item> &fields_vars, List<Item> &set_fields,
114
110
List<Item> &set_values,
115
enum enum_duplicates handle_duplicates, bool ignore,
116
bool read_file_from_client)
111
enum enum_duplicates handle_duplicates, bool ignore)
118
113
char name[FN_REFLEN];
122
117
String *field_term=ex->field_term,*escaped=ex->escaped;
123
118
String *enclosed=ex->enclosed;
125
LOAD_FILE_INFO lf_info;
126
char *db = table_list->db; // This is never null
120
char *db= table_list->db; // This is never null
128
123
If path for file is not defined, we will use the current database.
129
124
If this is not set, we will use the directory where the table to be
130
125
loaded is located
132
127
char *tdb= session->db ? session->db : db; // Result is never null
133
129
uint32_t skip_lines= ex->skip_lines;
134
130
bool transactional_table;
135
131
Session::killed_state killed_status= Session::NOT_KILLED;
137
if (escaped->length() > 1 || enclosed->length() > 1)
133
/* Escape and enclosed character may be a utf8 4-byte character */
134
if (escaped->length() > 4 || enclosed->length() > 4)
139
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
136
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
143
139
if (open_and_lock_tables(session, table_list))
171
167
for (field=table->field; *field ; field++)
172
168
fields_vars.push_back(new Item_field(*field));
173
bitmap_set_all(table->write_set);
169
table->setWriteSet();
174
170
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
176
172
Let us also prepare SET clause, although it is probably empty
194
190
if (table->timestamp_field)
196
if (bitmap_is_set(table->write_set,
197
table->timestamp_field->field_index))
192
if (table->isWriteSet(table->timestamp_field->field_index))
198
193
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
201
bitmap_set_bit(table->write_set,
202
table->timestamp_field->field_index);
196
table->setWriteSet(table->timestamp_field->field_index);
205
199
/* Fix the expressions in SET clause */
247
/* We can't give an error in the middle when using LOCAL files */
248
if (read_file_from_client && handle_duplicates == DUP_ERROR)
251
if (read_file_from_client)
253
(void)net_request_file(&session->net,ex->file_name);
258
242
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
259
243
ex->file_name+=dirname_length(ex->file_name);
281
265
struct stat stat_info;
282
266
if (stat(name,&stat_info))
268
my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
285
272
// if we are not in slave thread, the file must be:
286
if (!session->slave_thread &&
287
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
273
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
274
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
275
((stat_info.st_mode & S_IFREG) == S_IFREG ||
276
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
292
278
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
305
294
info.handle_duplicates=handle_duplicates;
306
295
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
308
READ_INFO read_info(file,tot_length,
309
ex->cs ? ex->cs : session->variables.collation_database,
297
READ_INFO read_info(file, tot_length,
298
ex->cs ? ex->cs : get_default_db_collation(session->db),
310
299
*field_term,*ex->line_start, *ex->line_term, *enclosed,
311
info.escape_char, read_file_from_client, is_fifo);
300
info.escape_char, is_fifo);
312
301
if (read_info.error)
316
305
return(true); // Can't allocate buffers
319
if (mysql_bin_log.is_open())
321
lf_info.session = session;
322
lf_info.wrote_create_file = 0;
323
lf_info.last_pos_in_file = HA_POS_ERROR;
324
lf_info.log_delayed= transactional_table;
325
read_info.set_io_cache_arg((void*) &lf_info);
328
session->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
309
* Per the SQL standard, inserting NULL into a NOT NULL
310
* field requires an error to be thrown.
314
* NULL check and handling occurs in field_conv.cc
316
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
329
317
session->cuted_fields=0L;
330
318
/* Skip lines if there is a line terminator */
331
319
if (ex->line_term->length())
375
363
free_blobs(table); /* if pack_blob was used */
376
364
table->copy_blobs=0;
377
365
session->count_cuted_fields= CHECK_FIELD_IGNORE;
379
367
simulated killing in the middle of per-row loop
380
368
must be effective for binlogging
382
370
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
385
if (read_file_from_client)
386
while (!read_info.next_line())
389
if (mysql_bin_log.is_open())
393
Make sure last block (the one which caused the error) gets
394
logged. This is needed because otherwise after write of (to
395
the binlog, not to read_info (which is a cache))
396
Delete_file_log_event the bad block will remain in read_info
397
(because pre_read is not called at the end of the last
398
block; remember pre_read is called whenever a new block is
399
read from disk). At the end of mysql_load(), the destructor
400
of read_info will call end_io_cache() which will flush
401
read_info, so we will finally have this in the binlog:
403
Append_block # The last successful block
405
Append_block # The failing block
407
Or could also be (for a small file)
408
Create_file # The failing block
409
which is nonsense (Delete_file is not written in this case, because:
410
Create_file has not been written, so Delete_file is not written, then
411
when read_info is destroyed end_io_cache() is called which writes
414
read_info.end_io_cache();
415
/* If the file was not empty, wrote_create_file is true */
416
if (lf_info.wrote_create_file)
418
if (session->transaction.stmt.modified_non_trans_table)
419
write_execute_load_query_log_event(session, handle_duplicates,
420
ignore, transactional_table,
424
Delete_file_log_event d(session, db, transactional_table);
425
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
426
mysql_bin_log.write(&d);
431
373
error= -1; // Error on read
437
379
if (session->transaction.stmt.modified_non_trans_table)
438
380
session->transaction.all.modified_non_trans_table= true;
440
if (mysql_bin_log.is_open())
443
We need to do the job that is normally done inside
444
binlog_query() here, which is to ensure that the pending event
445
is written before tables are unlocked and before any other
446
events are written. We also need to update the table map
447
version for the binary log to mark that table maps are invalid
450
if (session->current_stmt_binlog_row_based)
451
session->binlog_flush_pending_rows_event(true);
455
As already explained above, we need to call end_io_cache() or the last
456
block will be logged only after Execute_load_query_log_event (which is
457
wrong), when read_info is destroyed.
459
read_info.end_io_cache();
460
if (lf_info.wrote_create_file)
462
write_execute_load_query_log_event(session, handle_duplicates, ignore,
463
transactional_table,killed_status);
468
382
/* ok to client sent only after binlog write and engine commit */
469
my_ok(session, info.copied + info.deleted, 0L, name);
383
session->my_ok(info.copied + info.deleted, 0L, name);
471
385
assert(transactional_table || !(info.copied || info.deleted) ||
472
386
session->transaction.stmt.modified_non_trans_table);
480
/* Not a very useful function; just to avoid duplication of code */
481
static bool write_execute_load_query_log_event(Session *session,
482
bool duplicates, bool ignore,
483
bool transactional_table,
484
Session::killed_state killed_err_arg)
486
Execute_load_query_log_event
487
e(session, session->query, session->query_length,
488
(char*)session->lex->fname_start - (char*)session->query,
489
(char*)session->lex->fname_end - (char*)session->query,
490
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
491
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
492
transactional_table, false, killed_err_arg);
493
e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
494
return mysql_bin_log.write(&e);
498
394
/****************************************************************************
499
395
** Read of rows of fixed size + optional garbage + optional newline
500
396
****************************************************************************/
537
433
read_info.row_end[0]=0;
540
restore_record(table, s->default_values);
436
table->restoreRecordAsDefault();
542
438
There are no variables in the fields_vars list in this format, so
543
439
this conversion is safe.
545
441
while ((sql_field= (Item_field*) it++))
547
Field *field= sql_field->field;
443
Field *field= sql_field->field;
548
444
if (field == table->next_number_field)
549
445
table->auto_increment_field_not_null= true;
557
453
if (pos == read_info.row_end)
559
455
session->cuted_fields++; /* Not enough fields */
560
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
561
ER_WARN_TOO_FEW_RECORDS,
456
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
457
ER_WARN_TOO_FEW_RECORDS,
562
458
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
563
459
if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
564
460
((Field_timestamp*) field)->set_time();
569
465
unsigned char save_chr;
570
if ((length=(uint) (read_info.row_end-pos)) >
466
if ((length=(uint32_t) (read_info.row_end-pos)) >
571
467
field->field_length)
572
469
length=field->field_length;
573
save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
471
save_chr=pos[length];
472
pos[length]='\0'; // Add temp null terminator for store()
574
473
field->store((char*) pos,length,read_info.read_charset);
575
474
pos[length]=save_chr;
576
475
if ((pos+=length) > read_info.row_end)
580
479
if (pos != read_info.row_end)
582
481
session->cuted_fields++; /* To long row */
583
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
584
ER_WARN_TOO_MANY_RECORDS,
585
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
482
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
483
ER_WARN_TOO_MANY_RECORDS,
484
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
588
487
if (session->killed ||
604
503
if (read_info.line_cuted)
606
505
session->cuted_fields++; /* To long row */
607
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
608
ER_WARN_TOO_MANY_RECORDS,
609
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
506
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
507
ER_WARN_TOO_MANY_RECORDS,
508
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
611
510
session->row_count++;
785
684
if (read_info.line_cuted)
787
686
session->cuted_fields++; /* To long row */
788
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
789
ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
687
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
688
ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
791
690
if (session->killed)
827
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
726
READ_INFO::READ_INFO(File file_par, size_t tot_length,
727
const CHARSET_INFO * const cs,
828
728
String &field_term, String &line_start, String &line_term,
829
String &enclosed_par, int escape, bool get_it_from_net,
729
String &enclosed_par, int escape, bool is_fifo)
831
730
:file(file_par),escape_char(escape)
833
732
read_charset= cs;
866
765
set_if_bigger(length,line_start.length());
867
766
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
869
if (!(buffer=(unsigned char*) my_malloc(buff_length+1,MYF(0))))
768
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
870
769
error=1; /* purecov: inspected */
873
772
end_of_buff=buffer+buff_length;
874
if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
875
(get_it_from_net) ? READ_NET :
773
if (init_io_cache(&cache,(false) ? -1 : file, 0,
876
775
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
1085
977
** We come here if buffer is too small. Enlarge it and continue
1087
if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
979
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1089
980
return (error=1);
1090
981
to=new_buffer + (to-buffer);
1091
982
buffer=new_buffer;