12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
#include <drizzled/replication/replication.h>
21
22
#include <drizzled/error.h>
22
23
#include <drizzled/data_home.h>
23
24
#include <drizzled/session.h>
24
25
#include <drizzled/sql_base.h>
25
26
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
34
#include <boost/filesystem.hpp>
36
namespace fs=boost::filesystem;
43
unsigned char *buffer; /* Buffer for read text */
44
unsigned char *end_of_buff; /* Data in bufferts ends here */
45
size_t buff_length; /* Length of buffert */
46
size_t max_length; /* Max length of row */
30
unsigned char *buffer, /* Buffer for read text */
31
*end_of_buff; /* Data in bufferts ends here */
32
uint buff_length, /* Length of buffert */
33
max_length; /* Max length of row */
47
34
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
48
35
uint field_term_length,line_term_length,enclosed_length;
49
36
int field_term_char,line_term_char,enclosed_char,escape_char;
50
37
int *stack,*stack_pos;
51
38
bool found_end_of_line,start_of_line,eof;
52
39
bool need_end_io_cache;
53
internal::IO_CACHE cache;
56
44
bool error,line_cuted,found_null,enclosed;
87
75
void set_io_cache_arg(void* arg) { cache.arg = arg; }
90
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
78
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
79
List<Item> &fields_vars, List<Item> &set_fields,
92
80
List<Item> &set_values, READ_INFO &read_info,
93
81
uint32_t skip_lines,
94
82
bool ignore_check_option_errors);
95
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
83
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
84
List<Item> &fields_vars, List<Item> &set_fields,
97
85
List<Item> &set_values, READ_INFO &read_info,
98
86
String &enclosed, uint32_t skip_lines,
107
95
session - current thread
108
ex - file_exchange object representing source cursor and its parsing rules
96
ex - sql_exchange object representing source file and its parsing rules
109
97
table_list - list of tables to which we are loading data
110
98
fields_vars - list of fields and variables to which we read
112
100
set_fields - list of fields mentioned in set clause
113
101
set_values - expressions to assign to fields in previous list
114
102
handle_duplicates - indicates whenever we should emit error or
115
103
replace row if we will meet duplicates.
116
104
ignore - - indicates whenever we should ignore duplicates
105
read_file_from_client - is this LOAD DATA LOCAL ?
119
108
true - error / false - success
122
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
123
112
List<Item> &fields_vars, List<Item> &set_fields,
124
113
List<Item> &set_values,
125
enum enum_duplicates handle_duplicates, bool ignore)
114
enum enum_duplicates handle_duplicates, bool ignore,
115
bool read_file_from_client)
117
char name[FN_REFLEN];
128
119
Table *table= NULL;
130
121
String *field_term=ex->field_term,*escaped=ex->escaped;
131
122
String *enclosed=ex->enclosed;
134
assert(table_list->getSchemaName()); // This should never be null
124
char *db= table_list->db; // This is never null
137
If path for cursor is not defined, we will use the current database.
127
If path for file is not defined, we will use the current database.
138
128
If this is not set, we will use the directory where the table to be
139
129
loaded is located
141
util::string::const_shared_ptr schema(session->schema());
142
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
131
char *tdb= session->db ? session->db : db; // Result is never null
144
133
uint32_t skip_lines= ex->skip_lines;
145
134
bool transactional_table;
146
Session::killed_state_t killed_status= Session::NOT_KILLED;
135
Session::killed_state killed_status= Session::NOT_KILLED;
148
/* Escape and enclosed character may be a utf8 4-byte character */
149
if (escaped->length() > 4 || enclosed->length() > 4)
137
if (escaped->length() > 1 || enclosed->length() > 1)
151
my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
139
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
155
if (session->openTablesLock(table_list))
143
if (open_and_lock_tables(session, table_list))
158
145
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
159
146
&session->lex->select_lex.top_join_list,
169
156
table is marked to be 'used for insert' in which case we should never
170
157
mark this table as 'const table' (ie, one that has only one row).
172
if (unique_table(table_list, table_list->next_global))
159
if (unique_table(session, table_list, table_list->next_global, 0))
174
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
161
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
178
165
table= table_list->table;
179
transactional_table= table->cursor->has_transactions();
166
transactional_table= table->file->has_transactions();
181
168
if (!fields_vars.elements)
184
for (field= table->getFields(); *field ; field++)
171
for (field=table->field; *field ; field++)
185
172
fields_vars.push_back(new Item_field(*field));
186
table->setWriteSet();
173
bitmap_set_all(table->write_set);
187
174
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
189
176
Let us also prepare SET clause, altough it is probably empty
260
fs::path to_file(ex->file_name);
261
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
if (not to_file.has_root_directory())
247
/* We can't give an error in the middle when using LOCAL files */
248
if (read_file_from_client && handle_duplicates == DUP_ERROR)
251
if (read_file_from_client)
264
int count_elements= 0;
265
for (fs::path::iterator iter= to_file.begin();
266
iter != to_file.end();
267
++iter, ++count_elements)
270
if (count_elements == 1)
274
target_path /= to_file;
253
(void)net_request_file(&session->net,ex->file_name);
278
target_path= to_file;
281
if (not secure_file_priv.string().empty())
283
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
285
/* Read only allowed from within dir specified by secure_file_priv */
286
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
258
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
259
ex->file_name+=dirname_length(ex->file_name);
261
if (!dirname_length(ex->file_name))
263
strcpy(name, drizzle_real_data_home);
264
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
265
(void) fn_format(name, ex->file_name, name, "",
266
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
270
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
271
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
273
if (opt_secure_file_priv &&
274
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
276
/* Read only allowed from within dir specified by secure_file_priv */
277
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
281
struct stat stat_info;
282
if (stat(name,&stat_info))
285
// if we are not in slave thread, the file must be:
286
if (!session->slave_thread &&
287
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
292
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
295
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
298
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
291
struct stat stat_info;
292
if (stat(target_path.file_string().c_str(), &stat_info))
294
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
298
// if we are not in slave thread, the cursor must be:
299
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
300
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
304
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
311
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
313
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
317
303
memset(&info, 0, sizeof(info));
318
304
info.ignore= ignore;
319
305
info.handle_duplicates=handle_duplicates;
320
306
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
322
SchemaIdentifier identifier(*schema);
323
READ_INFO read_info(file, tot_length,
324
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
*field_term, *ex->line_start, *ex->line_term, *enclosed,
326
info.escape_char, is_fifo);
308
READ_INFO read_info(file,tot_length,
309
ex->cs ? ex->cs : session->variables.collation_database,
310
*field_term,*ex->line_start, *ex->line_term, *enclosed,
311
info.escape_char, read_file_from_client, is_fifo);
327
312
if (read_info.error)
330
internal::my_close(file,MYF(0)); // no files in net reading
315
my_close(file,MYF(0)); // no files in net reading
331
316
return(true); // Can't allocate buffers
335
* Per the SQL standard, inserting NULL into a NOT NULL
336
* field requires an error to be thrown.
340
* NULL check and handling occurs in field_conv.cc
342
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
319
session->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
343
320
session->cuted_fields=0L;
344
321
/* Skip lines if there is a line terminator */
345
322
if (ex->line_term->length())
375
352
error= read_sep_field(session, info, table_list, fields_vars,
376
353
set_fields, set_values, read_info,
377
354
*enclosed, skip_lines, ignore);
378
if (table->cursor->ha_end_bulk_insert() && !error)
355
if (table->file->ha_end_bulk_insert() && !error)
380
table->print_error(errno, MYF(0));
357
table->file->print_error(my_errno, MYF(0));
383
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
384
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
360
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
361
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
385
362
table->next_number_field=0;
388
internal::my_close(file,MYF(0));
365
my_close(file,MYF(0));
389
366
free_blobs(table); /* if pack_blob was used */
390
367
table->copy_blobs=0;
391
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
368
session->count_cuted_fields= CHECK_FIELD_IGNORE;
393
370
simulated killing in the middle of per-row loop
394
371
must be effective for binlogging
396
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
373
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
376
if (read_file_from_client)
377
while (!read_info.next_line())
399
380
error= -1; // Error on read
404
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
(info.records - info.copied), session->cuted_fields);
407
if (session->transaction.stmt.hasModifiedNonTransData())
408
session->transaction.all.markModifiedNonTransData();
383
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
384
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
386
if (session->transaction.stmt.modified_non_trans_table)
387
session->transaction.all.modified_non_trans_table= true;
389
if (drizzle_bin_log.is_open())
390
session->binlog_flush_pending_rows_event(true);
410
392
/* ok to client sent only after binlog write and engine commit */
411
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
393
my_ok(session, info.copied + info.deleted, 0L, name);
413
395
assert(transactional_table || !(info.copied || info.deleted) ||
414
session->transaction.stmt.hasModifiedNonTransData());
415
table->cursor->ha_release_auto_increment();
396
session->transaction.stmt.modified_non_trans_table);
397
table->file->ha_release_auto_increment();
416
398
table->auto_increment_field_not_null= false;
417
399
session->abort_on_warning= 0;
791
770
/* Set of a stack for unget if long terminators */
792
size_t length= max(field_term_length,line_term_length)+1;
793
set_if_bigger(length, line_start.length());
794
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
771
uint32_t length=cmax(field_term_length,line_term_length)+1;
772
set_if_bigger(length,line_start.length());
773
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
796
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
775
if (!(buffer=(unsigned char*) malloc(buff_length+1)))
776
error=1; /* purecov: inspected */
800
779
end_of_buff=buffer+buff_length;
801
if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
(false) ? internal::READ_NET :
803
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
780
if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
781
(get_it_from_net) ? READ_NET :
782
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
806
free((unsigned char*) buffer);
785
free((unsigned char*) buffer); /* purecov: inspected */
894
880
while ( to < end_of_buff)
897
884
if ((my_mbcharlen(read_charset, chr) > 1) &&
898
885
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
900
unsigned char* p = (unsigned char*)to;
902
int ml = my_mbcharlen(read_charset, chr);
904
for (i=1; i<ml; i++) {
910
if (my_ismbchar(read_charset,
915
PUSH((unsigned char) *--to);
887
unsigned char* p = (unsigned char*)to;
889
int ml = my_mbcharlen(read_charset, chr);
891
for (i=1; i<ml; i++) {
897
if (my_ismbchar(read_charset,
902
PUSH((unsigned char) *--to);
918
906
if (chr == my_b_EOF)
920
908
if (chr == escape_char)
922
if ((chr=GET) == my_b_EOF)
924
*to++= (unsigned char) escape_char;
910
if ((chr=GET) == my_b_EOF)
912
*to++= (unsigned char) escape_char;
928
916
When escape_char == enclosed_char, we treat it like we do for
929
917
handling quotes in SQL parsing -- you can double-up the
942
930
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
943
931
if (chr == line_term_char)
945
if (chr == line_term_char && found_enclosed_char == INT_MAX)
933
if (chr == line_term_char && found_enclosed_char == INT_MAX)
948
if (terminator(line_term_ptr,line_term_length))
949
{ // Maybe unexpected linefeed
936
if (terminator(line_term_ptr,line_term_length))
937
{ // Maybe unexpected linefeed
957
945
if (chr == found_enclosed_char)
959
if ((chr=GET) == found_enclosed_char)
960
{ // Remove dupplicated
961
*to++ = (unsigned char) chr;
964
// End of enclosed field if followed by field_term or line_term
965
if (chr == my_b_EOF ||
966
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
967
{ // Maybe unexpected linefeed
974
if (chr == field_term_char &&
975
terminator(field_term_ptr,field_term_length))
983
The string didn't terminate yet.
984
Store back next character for the loop
987
/* copy the found term character to 'to' */
988
chr= found_enclosed_char;
947
if ((chr=GET) == found_enclosed_char)
948
{ // Remove dupplicated
949
*to++ = (unsigned char) chr;
952
// End of enclosed field if followed by field_term or line_term
953
if (chr == my_b_EOF ||
954
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
955
{ // Maybe unexpected linefeed
962
if (chr == field_term_char &&
963
terminator(field_term_ptr,field_term_length))
971
The string didn't terminate yet.
972
Store back next character for the loop
975
/* copy the found term character to 'to' */
976
chr= found_enclosed_char;
990
978
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
992
if (terminator(field_term_ptr,field_term_length))
980
if (terminator(field_term_ptr,field_term_length))
1000
988
*to++ = (unsigned char) chr;
1003
** We come here if buffer is too small. Enlarge it and continue
1005
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
991
** We come here if buffer is too small. Enlarge it and continue
993
if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
1006
995
return (error=1);
1007
996
to=new_buffer + (to-buffer);
1008
997
buffer=new_buffer;