12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
21
20
#include <drizzled/sql_load.h>
21
#include <drizzled/replication/replication.h>
22
22
#include <drizzled/error.h>
23
23
#include <drizzled/data_home.h>
24
24
#include <drizzled/session.h>
25
25
#include <drizzled/sql_base.h>
26
#include <drizzled/field/epoch.h>
27
#include "drizzled/internal/my_sys.h"
28
#include "drizzled/internal/iocache.h"
29
#include <drizzled/db.h>
30
#include "drizzled/plugin/storage_engine.h"
36
#include <boost/filesystem.hpp>
38
namespace fs=boost::filesystem;
26
#include <drizzled/field/timestamp.h>
45
unsigned char *buffer; /* Buffer for read text */
46
unsigned char *end_of_buff; /* Data in bufferts ends here */
47
size_t buff_length; /* Length of buffert */
48
size_t max_length; /* Max length of row */
30
unsigned char *buffer, /* Buffer for read text */
31
*end_of_buff; /* Data in bufferts ends here */
32
uint buff_length, /* Length of buffert */
33
max_length; /* Max length of row */
49
34
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
50
35
uint field_term_length,line_term_length,enclosed_length;
51
36
int field_term_char,line_term_char,enclosed_char,escape_char;
52
37
int *stack,*stack_pos;
53
38
bool found_end_of_line,start_of_line,eof;
54
39
bool need_end_io_cache;
55
internal::IO_CACHE cache;
58
44
bool error,line_cuted,found_null,enclosed;
78
64
void end_io_cache()
66
::end_io_cache(&cache);
81
67
need_end_io_cache = 0;
85
71
Either this method, or we need to make cache public
86
Arg must be set from load() since constructor does not see
72
Arg must be set from mysql_load() since constructor does not see
87
73
either the table or Session value
89
75
void set_io_cache_arg(void* arg) { cache.arg = arg; }
92
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
78
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
93
79
List<Item> &fields_vars, List<Item> &set_fields,
94
80
List<Item> &set_values, READ_INFO &read_info,
95
81
uint32_t skip_lines,
96
82
bool ignore_check_option_errors);
97
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
83
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
98
84
List<Item> &fields_vars, List<Item> &set_fields,
99
85
List<Item> &set_values, READ_INFO &read_info,
100
86
String &enclosed, uint32_t skip_lines,
105
91
Execute LOAD DATA query
109
95
session - current thread
110
ex - file_exchange object representing source file and its parsing rules
96
ex - sql_exchange object representing source file and its parsing rules
111
97
table_list - list of tables to which we are loading data
112
98
fields_vars - list of fields and variables to which we read
114
100
set_fields - list of fields mentioned in set clause
115
101
set_values - expressions to assign to fields in previous list
116
102
handle_duplicates - indicates whether we should emit an error or
117
103
replace row if we will meet duplicates.
118
104
ignore - indicates whether we should ignore duplicates
105
read_file_from_client - is this LOAD DATA LOCAL ?
121
108
true - error / false - success
124
int load(Session *session,file_exchange *ex,TableList *table_list,
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
125
112
List<Item> &fields_vars, List<Item> &set_fields,
126
113
List<Item> &set_values,
127
enum enum_duplicates handle_duplicates, bool ignore)
114
enum enum_duplicates handle_duplicates, bool ignore,
115
bool read_file_from_client)
117
char name[FN_REFLEN];
130
119
Table *table= NULL;
132
121
String *field_term=ex->field_term,*escaped=ex->escaped;
133
122
String *enclosed=ex->enclosed;
136
assert(table_list->getSchemaName()); // This should never be null
124
char *db= table_list->db; // This is never null
139
If path for file is not defined, we will use the current database.
127
If path for file is not defined, we will use the current database.
140
128
If this is not set, we will use the directory where the table to be
141
129
loaded is located
143
util::string::const_shared_ptr schema(session->schema());
144
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
131
char *tdb= session->db ? session->db : db; // Result is never null
146
133
uint32_t skip_lines= ex->skip_lines;
147
134
bool transactional_table;
148
Session::killed_state_t killed_status= Session::NOT_KILLED;
135
Session::killed_state killed_status= Session::NOT_KILLED;
150
/* Escape and enclosed character may be a utf8 4-byte character */
151
if (escaped->length() > 4 || enclosed->length() > 4)
137
if (escaped->length() > 1 || enclosed->length() > 1)
153
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
139
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
157
if (session->openTablesLock(table_list))
143
if (open_and_lock_tables(session, table_list))
160
145
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
161
146
&session->lex->select_lex.top_join_list,
171
156
table is marked to be 'used for insert' in which case we should never
172
157
mark this table as 'const table' (ie, one that has only one row).
174
if (unique_table(table_list, table_list->next_global))
159
if (unique_table(session, table_list, table_list->next_global, 0))
176
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
161
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
180
165
table= table_list->table;
181
transactional_table= table->cursor->has_transactions();
166
transactional_table= table->file->has_transactions();
183
168
if (!fields_vars.elements)
186
for (field= table->getFields(); *field ; field++)
171
for (field=table->field; *field ; field++)
187
172
fields_vars.push_back(new Item_field(*field));
188
table->setWriteSet();
173
bitmap_set_all(table->write_set);
189
174
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
191
176
Let us also prepare SET clause, although it is probably empty
262
fs::path to_file(ex->file_name);
263
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
264
if (not to_file.has_root_directory())
247
/* We can't give an error in the middle when using LOCAL files */
248
if (read_file_from_client && handle_duplicates == DUP_ERROR)
251
if (read_file_from_client)
266
int count_elements= 0;
267
for (fs::path::iterator iter= to_file.begin();
268
iter != to_file.end();
269
++iter, ++count_elements)
272
if (count_elements == 1)
276
target_path /= to_file;
253
(void)net_request_file(&session->net,ex->file_name);
280
target_path= to_file;
283
if (not secure_file_priv.string().empty())
285
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
287
/* Read only allowed from within dir specified by secure_file_priv */
288
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
258
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
259
ex->file_name+=dirname_length(ex->file_name);
261
if (!dirname_length(ex->file_name))
263
strcpy(name, drizzle_real_data_home);
264
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
265
(void) fn_format(name, ex->file_name, name, "",
266
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
270
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
271
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
273
if (opt_secure_file_priv &&
274
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
276
/* Read only allowed from within dir specified by secure_file_priv */
277
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
281
struct stat stat_info;
282
if (stat(name,&stat_info))
285
// if we are not in slave thread, the file must be:
286
if (!session->slave_thread &&
287
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
292
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
295
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
298
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
293
struct stat stat_info;
294
if (stat(target_path.file_string().c_str(), &stat_info))
296
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
300
// the file must be:
301
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
302
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
303
((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
306
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
309
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
313
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
315
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
319
303
memset(&info, 0, sizeof(info));
320
304
info.ignore= ignore;
321
305
info.handle_duplicates=handle_duplicates;
322
306
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
324
identifier::Schema identifier(*schema);
325
READ_INFO read_info(file, tot_length,
326
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
327
*field_term, *ex->line_start, *ex->line_term, *enclosed,
328
info.escape_char, is_fifo);
308
READ_INFO read_info(file,tot_length,
309
ex->cs ? ex->cs : session->variables.collation_database,
310
*field_term,*ex->line_start, *ex->line_term, *enclosed,
311
info.escape_char, read_file_from_client, is_fifo);
329
312
if (read_info.error)
332
internal::my_close(file,MYF(0)); // no files in net reading
315
my_close(file,MYF(0)); // no files in net reading
333
316
return(true); // Can't allocate buffers
337
* Per the SQL standard, inserting NULL into a NOT NULL
338
* field requires an error to be thrown.
342
* NULL check and handling occurs in field_conv.cc
344
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
319
session->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
345
320
session->cuted_fields=0L;
346
321
/* Skip lines if there is a line terminator */
347
322
if (ex->line_term->length())
377
352
error= read_sep_field(session, info, table_list, fields_vars,
378
353
set_fields, set_values, read_info,
379
354
*enclosed, skip_lines, ignore);
380
if (table->cursor->ha_end_bulk_insert() && !error)
355
if (table->file->ha_end_bulk_insert() && !error)
382
table->print_error(errno, MYF(0));
357
table->file->print_error(my_errno, MYF(0));
385
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
386
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
360
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
361
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
387
362
table->next_number_field=0;
390
internal::my_close(file,MYF(0));
365
my_close(file,MYF(0));
391
366
free_blobs(table); /* if pack_blob was used */
392
367
table->copy_blobs=0;
393
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
368
session->count_cuted_fields= CHECK_FIELD_IGNORE;
395
370
simulated killing in the middle of per-row loop
396
371
must be effective for binlogging
398
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
373
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
376
if (read_file_from_client)
377
while (!read_info.next_line())
401
380
error= -1; // Error on read
406
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
407
(info.records - info.copied), session->cuted_fields);
409
if (session->transaction.stmt.hasModifiedNonTransData())
410
session->transaction.all.markModifiedNonTransData();
383
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
384
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
386
if (session->transaction.stmt.modified_non_trans_table)
387
session->transaction.all.modified_non_trans_table= true;
389
if (drizzle_bin_log.is_open())
390
session->binlog_flush_pending_rows_event(true);
412
392
/* ok to client sent only after binlog write and engine commit */
413
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
393
my_ok(session, info.copied + info.deleted, 0L, name);
415
395
assert(transactional_table || !(info.copied || info.deleted) ||
416
session->transaction.stmt.hasModifiedNonTransData());
417
table->cursor->ha_release_auto_increment();
396
session->transaction.stmt.modified_non_trans_table);
397
table->file->ha_release_auto_increment();
418
398
table->auto_increment_field_not_null= false;
419
session->setAbortOnWarning(false);
399
session->abort_on_warning= 0;
484
463
if (pos == read_info.row_end)
486
465
session->cuted_fields++; /* Not enough fields */
487
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
488
ER_WARN_TOO_FEW_RECORDS,
466
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
467
ER_WARN_TOO_FEW_RECORDS,
489
468
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
491
if (not field->maybe_null() and field->is_timestamp())
492
((field::Epoch::pointer) field)->set_time();
469
if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
470
((Field_timestamp*) field)->set_time();
497
475
unsigned char save_chr;
498
if ((length=(uint32_t) (read_info.row_end-pos)) >
476
if ((length=(uint) (read_info.row_end-pos)) >
499
477
field->field_length)
501
478
length=field->field_length;
503
save_chr=pos[length];
504
pos[length]='\0'; // Add temp null terminator for store()
479
save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
505
480
field->store((char*) pos,length,read_info.read_charset);
506
481
pos[length]=save_chr;
507
482
if ((pos+=length) > read_info.row_end)
798
770
/* Set up a stack for unget in case of long terminators */
799
size_t length= max(field_term_length,line_term_length)+1;
800
set_if_bigger(length, line_start.length());
801
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
771
uint32_t length=cmax(field_term_length,line_term_length)+1;
772
set_if_bigger(length,line_start.length());
773
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
803
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
775
if (!(buffer=(unsigned char*) my_malloc(buff_length+1,MYF(0))))
776
error=1; /* purecov: inspected */
807
779
end_of_buff=buffer+buff_length;
808
if (cache.init_io_cache((false) ? -1 : cursor, 0,
809
(false) ? internal::READ_NET :
810
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
780
if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
781
(get_it_from_net) ? READ_NET :
782
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
813
free((unsigned char*) buffer);
785
free((unsigned char*) buffer); /* purecov: inspected */
901
880
while ( to < end_of_buff)
904
884
if ((my_mbcharlen(read_charset, chr) > 1) &&
905
885
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
907
unsigned char* p = (unsigned char*)to;
909
int ml = my_mbcharlen(read_charset, chr);
911
for (i=1; i<ml; i++) {
917
if (my_ismbchar(read_charset,
922
PUSH((unsigned char) *--to);
887
unsigned char* p = (unsigned char*)to;
889
int ml = my_mbcharlen(read_charset, chr);
891
for (i=1; i<ml; i++) {
897
if (my_ismbchar(read_charset,
902
PUSH((unsigned char) *--to);
925
906
if (chr == my_b_EOF)
927
908
if (chr == escape_char)
929
if ((chr=GET) == my_b_EOF)
931
*to++= (unsigned char) escape_char;
910
if ((chr=GET) == my_b_EOF)
912
*to++= (unsigned char) escape_char;
935
916
When escape_char == enclosed_char, we treat it like we do for
936
917
handling quotes in SQL parsing -- you can double-up the
949
930
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
950
931
if (chr == line_term_char)
952
if (chr == line_term_char && found_enclosed_char == INT_MAX)
933
if (chr == line_term_char && found_enclosed_char == INT_MAX)
955
if (terminator(line_term_ptr,line_term_length))
956
{ // Maybe unexpected linefeed
936
if (terminator(line_term_ptr,line_term_length))
937
{ // Maybe unexpected linefeed
964
945
if (chr == found_enclosed_char)
966
if ((chr=GET) == found_enclosed_char)
967
{ // Remove dupplicated
968
*to++ = (unsigned char) chr;
971
// End of enclosed field if followed by field_term or line_term
972
if (chr == my_b_EOF ||
973
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
974
{ // Maybe unexpected linefeed
981
if (chr == field_term_char &&
982
terminator(field_term_ptr,field_term_length))
990
The string didn't terminate yet.
991
Store back next character for the loop
994
/* copy the found term character to 'to' */
995
chr= found_enclosed_char;
947
if ((chr=GET) == found_enclosed_char)
948
{ // Remove dupplicated
949
*to++ = (unsigned char) chr;
952
// End of enclosed field if followed by field_term or line_term
953
if (chr == my_b_EOF ||
954
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
955
{ // Maybe unexpected linefeed
962
if (chr == field_term_char &&
963
terminator(field_term_ptr,field_term_length))
971
The string didn't terminate yet.
972
Store back next character for the loop
975
/* copy the found term character to 'to' */
976
chr= found_enclosed_char;
997
978
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
999
if (terminator(field_term_ptr,field_term_length))
980
if (terminator(field_term_ptr,field_term_length))
1007
988
*to++ = (unsigned char) chr;
1010
** We come here if buffer is too small. Enlarge it and continue
1012
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
991
** We come here if buffer is too small. Enlarge it and continue
993
if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
1013
995
return (error=1);
1014
996
to=new_buffer + (to-buffer);
1015
997
buffer=new_buffer;