12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
21
#include <drizzled/sql_load.h>
22
#include <drizzled/error.h>
23
#include <drizzled/data_home.h>
24
#include <drizzled/session.h>
25
#include <drizzled/sql_base.h>
26
#include <drizzled/field/epoch.h>
27
#include <drizzled/internal/my_sys.h>
28
#include <drizzled/internal/iocache.h>
29
#include <drizzled/plugin/storage_engine.h>
35
#include <boost/filesystem.hpp>
37
namespace fs=boost::filesystem;
19
#include <drizzled/server_includes.h>
21
#include <drizzled/drizzled_error_messages.h>
44
unsigned char *buffer; /* Buffer for read text */
45
unsigned char *end_of_buff; /* Data in bufferts ends here */
46
size_t buff_length; /* Length of buffert */
47
size_t max_length; /* Max length of row */
26
unsigned char *buffer, /* Buffer for read text */
27
*end_of_buff; /* Data in bufferts ends here */
28
uint buff_length, /* Length of buffert */
29
max_length; /* Max length of row */
48
30
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
49
31
uint field_term_length,line_term_length,enclosed_length;
50
32
int field_term_char,line_term_char,enclosed_char,escape_char;
51
33
int *stack,*stack_pos;
52
34
bool found_end_of_line,start_of_line,eof;
53
35
bool need_end_io_cache;
54
internal::IO_CACHE cache;
57
40
bool error,line_cuted,found_null,enclosed;
77
60
void end_io_cache()
62
::end_io_cache(&cache);
80
63
need_end_io_cache = 0;
84
67
Either this method, or we need to make cache public
85
Arg must be set from load() since constructor does not see
86
either the table or Session value
68
Arg must be set from mysql_load() since constructor does not see
69
either the table or THD value
88
71
// Record the caller-supplied opaque pointer in the arg slot of this
// reader's IO cache. Exists so callers do not need direct access to
// the cache member itself.
void set_io_cache_arg(void* arg)
{
  cache.arg = arg;
}
91
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
92
75
List<Item> &fields_vars, List<Item> &set_fields,
93
76
List<Item> &set_values, READ_INFO &read_info,
94
77
uint32_t skip_lines,
95
78
bool ignore_check_option_errors);
96
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
79
static int read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
97
80
List<Item> &fields_vars, List<Item> &set_fields,
98
81
List<Item> &set_values, READ_INFO &read_info,
99
82
String &enclosed, uint32_t skip_lines,
100
83
bool ignore_check_option_errors);
85
static bool write_execute_load_query_log_event(THD *thd,
86
bool duplicates, bool ignore,
87
bool transactional_table,
88
THD::killed_state killed_status);
104
91
Execute LOAD DATA query
108
session - current thread
109
ex - file_exchange object representing source cursor and its parsing rules
96
ex - sql_exchange object representing source file and its parsing rules
110
97
table_list - list of tables to which we are loading data
111
98
fields_vars - list of fields and variables to which we read
113
100
set_fields - list of fields mentioned in set clause
114
101
set_values - expressions to assign to fields in previous list
115
102
handle_duplicates - indicates whether we should emit error or
116
103
replace row if we will meet duplicates.
117
104
ignore - indicates whether we should ignore duplicates
105
read_file_from_client - is this LOAD DATA LOCAL ?
120
108
true - error / false - success
123
int load(Session *session,file_exchange *ex,TableList *table_list,
111
int mysql_load(THD *thd,sql_exchange *ex,TableList *table_list,
124
112
List<Item> &fields_vars, List<Item> &set_fields,
125
113
List<Item> &set_values,
126
enum enum_duplicates handle_duplicates, bool ignore)
114
enum enum_duplicates handle_duplicates, bool ignore,
115
bool read_file_from_client)
117
char name[FN_REFLEN];
129
119
Table *table= NULL;
131
121
String *field_term=ex->field_term,*escaped=ex->escaped;
132
122
String *enclosed=ex->enclosed;
135
assert(table_list->getSchemaName()); // This should never be null
124
LOAD_FILE_INFO lf_info;
125
char *db = table_list->db; // This is never null
138
If path for cursor is not defined, we will use the current database.
127
If path for file is not defined, we will use the current database.
139
128
If this is not set, we will use the directory where the table to be
140
129
loaded is located
142
util::string::const_shared_ptr schema(session->schema());
143
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
131
char *tdb= thd->db ? thd->db : db; // Result is never null
145
132
uint32_t skip_lines= ex->skip_lines;
146
133
bool transactional_table;
147
Session::killed_state_t killed_status= Session::NOT_KILLED;
134
THD::killed_state killed_status= THD::NOT_KILLED;
149
/* Escape and enclosed character may be a utf8 4-byte character */
150
if (escaped->length() > 4 || enclosed->length() > 4)
136
if (escaped->length() > 1 || enclosed->length() > 1)
152
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
138
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
156
if (session->openTablesLock(table_list))
142
if (open_and_lock_tables(thd, table_list))
159
if (setup_tables_and_check_access(session, &session->getLex()->select_lex.context,
160
&session->getLex()->select_lex.top_join_list,
144
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
&thd->lex->select_lex.top_join_list,
162
&session->getLex()->select_lex.leaf_tables, true))
147
&thd->lex->select_lex.leaf_tables, true))
170
155
table is marked to be 'used for insert' in which case we should never
171
156
mark this table as 'const table' (ie, one that has only one row).
173
if (unique_table(table_list, table_list->next_global))
158
if (unique_table(thd, table_list, table_list->next_global, 0))
175
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
160
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
179
164
table= table_list->table;
180
transactional_table= table->cursor->has_transactions();
165
transactional_table= table->file->has_transactions();
182
167
if (!fields_vars.elements)
185
for (field= table->getFields(); *field ; field++)
170
for (field=table->field; *field ; field++)
186
171
fields_vars.push_back(new Item_field(*field));
187
table->setWriteSet();
172
bitmap_set_all(table->write_set);
188
173
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
190
175
Let us also prepare SET clause, although it is probably empty
193
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
194
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
178
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
198
183
{ // Part field list
199
184
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
200
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
201
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
202
check_that_all_fields_are_given_values(session, table, table_list))
185
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(thd, table, table_list))
205
190
Check whenever TIMESTAMP field with auto-set feature specified
261
fs::path to_file(ex->file_name);
262
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
263
if (not to_file.has_root_directory())
246
/* We can't give an error in the middle when using LOCAL files */
247
if (read_file_from_client && handle_duplicates == DUP_ERROR)
250
if (read_file_from_client)
265
int count_elements= 0;
266
for (fs::path::iterator iter= to_file.begin();
267
iter != to_file.end();
268
++iter, ++count_elements)
271
if (count_elements == 1)
275
target_path /= to_file;
252
(void)net_request_file(&thd->net,ex->file_name);
279
target_path= to_file;
282
if (not secure_file_priv.string().empty())
284
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
286
/* Read only allowed from within dir specified by secure_file_priv */
287
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
257
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
258
ex->file_name+=dirname_length(ex->file_name);
260
if (!dirname_length(ex->file_name))
262
strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NULL);
263
(void) fn_format(name, ex->file_name, name, "",
264
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
268
(void) fn_format(name, ex->file_name, mysql_real_data_home, "",
269
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
271
if (opt_secure_file_priv &&
272
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
274
/* Read only allowed from within dir specified by secure_file_priv */
275
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
279
struct stat stat_info;
280
if (stat(name,&stat_info))
283
// if we are not in slave thread, the file must be:
284
if (!thd->slave_thread &&
285
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
286
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
287
((stat_info.st_mode & S_IFREG) == S_IFREG ||
288
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
290
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
293
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
296
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
292
struct stat stat_info;
293
if (stat(target_path.file_string().c_str(), &stat_info))
295
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
299
// if we are not in slave thread, the cursor must be:
300
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
301
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
302
((stat_info.st_mode & S_IFREG) == S_IFREG ||
303
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
305
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
308
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
312
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
314
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
318
301
memset(&info, 0, sizeof(info));
319
302
info.ignore= ignore;
320
303
info.handle_duplicates=handle_duplicates;
321
304
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
323
identifier::Schema identifier(*schema);
324
READ_INFO read_info(file, tot_length,
325
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
326
*field_term, *ex->line_start, *ex->line_term, *enclosed,
327
info.escape_char, is_fifo);
306
READ_INFO read_info(file,tot_length,
307
ex->cs ? ex->cs : thd->variables.collation_database,
308
*field_term,*ex->line_start, *ex->line_term, *enclosed,
309
info.escape_char, read_file_from_client, is_fifo);
328
310
if (read_info.error)
331
internal::my_close(file,MYF(0)); // no files in net reading
313
my_close(file,MYF(0)); // no files in net reading
332
314
return(true); // Can't allocate buffers
336
* Per the SQL standard, inserting NULL into a NOT NULL
337
* field requires an error to be thrown.
341
* NULL check and handling occurs in field_conv.cc
343
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
344
session->cuted_fields=0L;
317
if (mysql_bin_log.is_open())
320
lf_info.wrote_create_file = 0;
321
lf_info.last_pos_in_file = HA_POS_ERROR;
322
lf_info.log_delayed= transactional_table;
323
read_info.set_io_cache_arg((void*) &lf_info);
326
thd->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
327
thd->cuted_fields=0L;
345
328
/* Skip lines if there is a line terminator */
346
329
if (ex->line_term->length())
360
343
table->next_number_field=table->found_next_number_field;
362
345
handle_duplicates == DUP_REPLACE)
363
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
346
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
364
347
if (handle_duplicates == DUP_REPLACE)
365
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
366
table->cursor->ha_start_bulk_insert((ha_rows) 0);
348
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
349
table->file->ha_start_bulk_insert((ha_rows) 0);
367
350
table->copy_blobs=1;
369
session->setAbortOnWarning(true);
352
thd->abort_on_warning= true;
371
354
if (!field_term->length() && !enclosed->length())
372
error= read_fixed_length(session, info, table_list, fields_vars,
355
error= read_fixed_length(thd, info, table_list, fields_vars,
373
356
set_fields, set_values, read_info,
374
357
skip_lines, ignore);
376
error= read_sep_field(session, info, table_list, fields_vars,
359
error= read_sep_field(thd, info, table_list, fields_vars,
377
360
set_fields, set_values, read_info,
378
361
*enclosed, skip_lines, ignore);
379
if (table->cursor->ha_end_bulk_insert() && !error)
362
if (table->file->ha_end_bulk_insert() && !error)
381
table->print_error(errno, MYF(0));
364
table->file->print_error(my_errno, MYF(0));
384
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
385
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
367
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
368
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
386
369
table->next_number_field=0;
389
internal::my_close(file,MYF(0));
372
my_close(file,MYF(0));
390
373
free_blobs(table); /* if pack_blob was used */
391
374
table->copy_blobs=0;
392
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
375
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
394
377
simulated killing in the middle of per-row loop
395
378
must be effective for binlogging
397
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
380
killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
383
if (read_file_from_client)
384
while (!read_info.next_line())
387
if (mysql_bin_log.is_open())
391
Make sure last block (the one which caused the error) gets
392
logged. This is needed because otherwise after write of (to
393
the binlog, not to read_info (which is a cache))
394
Delete_file_log_event the bad block will remain in read_info
395
(because pre_read is not called at the end of the last
396
block; remember pre_read is called whenever a new block is
397
read from disk). At the end of mysql_load(), the destructor
398
of read_info will call end_io_cache() which will flush
399
read_info, so we will finally have this in the binlog:
401
Append_block # The last successful block
403
Append_block # The failing block
405
Or could also be (for a small file)
406
Create_file # The failing block
407
which is nonsense (Delete_file is not written in this case, because:
408
Create_file has not been written, so Delete_file is not written, then
409
when read_info is destroyed end_io_cache() is called which writes
412
read_info.end_io_cache();
413
/* If the file was not empty, wrote_create_file is true */
414
if (lf_info.wrote_create_file)
416
if (thd->transaction.stmt.modified_non_trans_table)
417
write_execute_load_query_log_event(thd, handle_duplicates,
418
ignore, transactional_table,
422
Delete_file_log_event d(thd, db, transactional_table);
423
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
424
mysql_bin_log.write(&d);
400
429
error= -1; // Error on read
405
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
406
(info.records - info.copied), session->cuted_fields);
408
if (session->transaction.stmt.hasModifiedNonTransData())
409
session->transaction.all.markModifiedNonTransData();
432
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
433
(uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
435
if (thd->transaction.stmt.modified_non_trans_table)
436
thd->transaction.all.modified_non_trans_table= true;
438
if (mysql_bin_log.is_open())
441
We need to do the job that is normally done inside
442
binlog_query() here, which is to ensure that the pending event
443
is written before tables are unlocked and before any other
444
events are written. We also need to update the table map
445
version for the binary log to mark that table maps are invalid
448
if (thd->current_stmt_binlog_row_based)
449
thd->binlog_flush_pending_rows_event(true);
453
As already explained above, we need to call end_io_cache() or the last
454
block will be logged only after Execute_load_query_log_event (which is
455
wrong), when read_info is destroyed.
457
read_info.end_io_cache();
458
if (lf_info.wrote_create_file)
460
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
461
transactional_table,killed_status);
411
466
/* ok to client sent only after binlog write and engine commit */
412
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
467
my_ok(thd, info.copied + info.deleted, 0L, name);
414
469
assert(transactional_table || !(info.copied || info.deleted) ||
415
session->transaction.stmt.hasModifiedNonTransData());
416
table->cursor->ha_release_auto_increment();
470
thd->transaction.stmt.modified_non_trans_table);
471
table->file->ha_release_auto_increment();
417
472
table->auto_increment_field_not_null= false;
418
session->setAbortOnWarning(false);
473
thd->abort_on_warning= 0;
478
/* Not a very useful function; just to avoid duplication of code */
479
static bool write_execute_load_query_log_event(THD *thd,
480
bool duplicates, bool ignore,
481
bool transactional_table,
482
THD::killed_state killed_err_arg)
484
Execute_load_query_log_event
485
e(thd, thd->query, thd->query_length,
486
(char*)thd->lex->fname_start - (char*)thd->query,
487
(char*)thd->lex->fname_end - (char*)thd->query,
488
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
489
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
490
transactional_table, false, killed_err_arg);
491
e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
492
return mysql_bin_log.write(&e);
424
496
/****************************************************************************
425
497
** Read of rows of fixed size + optional garbage + optional newline
426
498
****************************************************************************/
429
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
501
read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
430
502
List<Item> &fields_vars, List<Item> &set_fields,
431
503
List<Item> &set_values, READ_INFO &read_info,
432
504
uint32_t skip_lines, bool ignore_check_option_errors)
434
List<Item>::iterator it(fields_vars.begin());
506
List_iterator_fast<Item> it(fields_vars);
435
507
Item_field *sql_field;
436
508
Table *table= table_list->table;
442
514
while (!read_info.read_fixed_length())
444
if (session->getKilled())
446
session->send_kill_message();
518
thd->send_kill_message();
948
1022
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
949
1023
if (chr == line_term_char)
951
if (chr == line_term_char && found_enclosed_char == INT_MAX)
1025
if (chr == line_term_char && found_enclosed_char == INT_MAX)
954
if (terminator(line_term_ptr,line_term_length))
955
{ // Maybe unexpected linefeed
1028
if (terminator(line_term_ptr,line_term_length))
1029
{ // Maybe unexpected linefeed
1031
found_end_of_line=1;
963
1037
if (chr == found_enclosed_char)
965
if ((chr=GET) == found_enclosed_char)
966
{ // Remove dupplicated
967
*to++ = (unsigned char) chr;
970
// End of enclosed field if followed by field_term or line_term
971
if (chr == my_b_EOF ||
972
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
973
{ // Maybe unexpected linefeed
980
if (chr == field_term_char &&
981
terminator(field_term_ptr,field_term_length))
989
The string didn't terminate yet.
990
Store back next character for the loop
993
/* copy the found term character to 'to' */
994
chr= found_enclosed_char;
1039
if ((chr=GET) == found_enclosed_char)
1040
{ // Remove dupplicated
1041
*to++ = (unsigned char) chr;
1044
// End of enclosed field if followed by field_term or line_term
1045
if (chr == my_b_EOF ||
1046
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
1047
{ // Maybe unexpected linefeed
1049
found_end_of_line=1;
1054
if (chr == field_term_char &&
1055
terminator(field_term_ptr,field_term_length))
1063
The string didn't terminate yet.
1064
Store back next character for the loop
1067
/* copy the found term character to 'to' */
1068
chr= found_enclosed_char;
996
1070
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
998
if (terminator(field_term_ptr,field_term_length))
1072
if (terminator(field_term_ptr,field_term_length))
1006
1080
*to++ = (unsigned char) chr;
1009
** We come here if buffer is too small. Enlarge it and continue
1011
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1083
** We come here if buffer is too small. Enlarge it and continue
1085
if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
1012
1087
return (error=1);
1013
1088
to=new_buffer + (to-buffer);
1014
1089
buffer=new_buffer;