12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
34
#include <boost/filesystem.hpp>
36
namespace fs=boost::filesystem;
43
unsigned char *buffer; /* Buffer for read text */
44
unsigned char *end_of_buff; /* Data in bufferts ends here */
45
size_t buff_length; /* Length of buffert */
46
size_t max_length; /* Max length of row */
29
unsigned char *buffer, /* Buffer for read text */
30
*end_of_buff; /* Data in bufferts ends here */
31
uint buff_length, /* Length of buffert */
32
max_length; /* Max length of row */
47
33
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
48
34
uint field_term_length,line_term_length,enclosed_length;
49
35
int field_term_char,line_term_char,enclosed_char,escape_char;
50
36
int *stack,*stack_pos;
51
37
bool found_end_of_line,start_of_line,eof;
52
38
bool need_end_io_cache;
53
internal::IO_CACHE cache;
56
42
bool error,line_cuted,found_null,enclosed;
87
73
void set_io_cache_arg(void* arg) { cache.arg = arg; }
90
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
76
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
77
List<Item> &fields_vars, List<Item> &set_fields,
92
78
List<Item> &set_values, READ_INFO &read_info,
93
79
uint32_t skip_lines,
94
80
bool ignore_check_option_errors);
95
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
81
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
82
List<Item> &fields_vars, List<Item> &set_fields,
97
83
List<Item> &set_values, READ_INFO &read_info,
98
84
String &enclosed, uint32_t skip_lines,
124
110
List<Item> &set_values,
125
111
enum enum_duplicates handle_duplicates, bool ignore)
113
char name[FN_REFLEN];
128
115
Table *table= NULL;
130
117
String *field_term=ex->field_term,*escaped=ex->escaped;
131
118
String *enclosed=ex->enclosed;
134
assert(table_list->getSchemaName()); // This should never be null
120
char *db= table_list->db; // This is never null
137
If path for cursor is not defined, we will use the current database.
123
If path for file is not defined, we will use the current database.
138
124
If this is not set, we will use the directory where the table to be
139
125
loaded is located
141
util::string::const_shared_ptr schema(session->schema());
142
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
127
char *tdb= session->db ? session->db : db; // Result is never null
144
129
uint32_t skip_lines= ex->skip_lines;
145
130
bool transactional_table;
146
Session::killed_state_t killed_status= Session::NOT_KILLED;
131
Session::killed_state killed_status= Session::NOT_KILLED;
148
133
/* Escape and enclosed character may be a utf8 4-byte character */
149
134
if (escaped->length() > 4 || enclosed->length() > 4)
169
152
table is marked to be 'used for insert' in which case we should never
170
153
mark this table as 'const table' (ie, one that has only one row).
172
if (unique_table(table_list, table_list->next_global))
155
if (unique_table(session, table_list, table_list->next_global, 0))
174
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
157
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
178
161
table= table_list->table;
179
transactional_table= table->cursor->has_transactions();
162
transactional_table= table->file->has_transactions();
181
164
if (!fields_vars.elements)
184
for (field= table->getFields(); *field ; field++)
167
for (field=table->field; *field ; field++)
185
168
fields_vars.push_back(new Item_field(*field));
186
table->setWriteSet();
169
table->write_set->set();
187
170
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
189
172
Let us also prepare SET clause, altough it is probably empty
260
fs::path to_file(ex->file_name);
261
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
if (not to_file.has_root_directory())
264
int count_elements= 0;
265
for (fs::path::iterator iter= to_file.begin();
266
iter != to_file.end();
267
++iter, ++count_elements)
270
if (count_elements == 1)
274
target_path /= to_file;
278
target_path= to_file;
281
if (not secure_file_priv.string().empty())
283
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
285
/* Read only allowed from within dir specified by secure_file_priv */
286
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
242
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
243
ex->file_name+=dirname_length(ex->file_name);
245
if (!dirname_length(ex->file_name))
247
strcpy(name, drizzle_real_data_home);
248
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
249
(void) fn_format(name, ex->file_name, name, "",
250
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
254
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
255
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
257
if (opt_secure_file_priv &&
258
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
260
/* Read only allowed from within dir specified by secure_file_priv */
261
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
265
struct stat stat_info;
266
if (stat(name,&stat_info))
268
my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
272
// if we are not in slave thread, the file must be:
273
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
274
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
275
((stat_info.st_mode & S_IFREG) == S_IFREG ||
276
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
278
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
281
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
284
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
286
my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
291
struct stat stat_info;
292
if (stat(target_path.file_string().c_str(), &stat_info))
294
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
298
// if we are not in slave thread, the cursor must be:
299
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
300
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
304
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
311
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
313
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
317
292
memset(&info, 0, sizeof(info));
318
293
info.ignore= ignore;
319
294
info.handle_duplicates=handle_duplicates;
320
295
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
322
SchemaIdentifier identifier(*schema);
323
READ_INFO read_info(file, tot_length,
324
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
*field_term, *ex->line_start, *ex->line_term, *enclosed,
297
READ_INFO read_info(file,tot_length,
298
ex->cs ? ex->cs : session->variables.collation_database,
299
*field_term,*ex->line_start, *ex->line_term, *enclosed,
326
300
info.escape_char, is_fifo);
327
301
if (read_info.error)
330
internal::my_close(file,MYF(0)); // no files in net reading
304
my_close(file,MYF(0)); // no files in net reading
331
305
return(true); // Can't allocate buffers
375
349
error= read_sep_field(session, info, table_list, fields_vars,
376
350
set_fields, set_values, read_info,
377
351
*enclosed, skip_lines, ignore);
378
if (table->cursor->ha_end_bulk_insert() && !error)
352
if (table->file->ha_end_bulk_insert() && !error)
380
table->print_error(errno, MYF(0));
354
table->file->print_error(my_errno, MYF(0));
383
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
384
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
357
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
358
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
385
359
table->next_number_field=0;
388
internal::my_close(file,MYF(0));
362
my_close(file,MYF(0));
389
363
free_blobs(table); /* if pack_blob was used */
390
364
table->copy_blobs=0;
391
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
365
session->count_cuted_fields= CHECK_FIELD_IGNORE;
393
367
simulated killing in the middle of per-row loop
394
368
must be effective for binlogging
396
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
370
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
373
error= -1; // Error on read
404
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
(info.records - info.copied), session->cuted_fields);
407
if (session->transaction.stmt.hasModifiedNonTransData())
408
session->transaction.all.markModifiedNonTransData();
376
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
377
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
379
if (session->transaction.stmt.modified_non_trans_table)
380
session->transaction.all.modified_non_trans_table= true;
410
382
/* ok to client sent only after binlog write and engine commit */
411
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
383
session->my_ok(info.copied + info.deleted, 0L, name);
413
385
assert(transactional_table || !(info.copied || info.deleted) ||
414
session->transaction.stmt.hasModifiedNonTransData());
415
table->cursor->ha_release_auto_increment();
386
session->transaction.stmt.modified_non_trans_table);
387
table->file->ha_release_auto_increment();
416
388
table->auto_increment_field_not_null= false;
417
389
session->abort_on_warning= 0;
791
759
/* Set of a stack for unget if long terminators */
792
size_t length= max(field_term_length,line_term_length)+1;
793
set_if_bigger(length, line_start.length());
794
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
760
uint32_t length=cmax(field_term_length,line_term_length)+1;
761
set_if_bigger(length,line_start.length());
762
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
796
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
764
if (!(buffer=(unsigned char*) malloc(buff_length+1)))
765
error=1; /* purecov: inspected */
800
768
end_of_buff=buffer+buff_length;
801
if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
(false) ? internal::READ_NET :
803
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
769
if (init_io_cache(&cache,(false) ? -1 : file, 0,
771
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
806
free((unsigned char*) buffer);
774
free((unsigned char*) buffer); /* purecov: inspected */
894
862
while ( to < end_of_buff)
897
866
if ((my_mbcharlen(read_charset, chr) > 1) &&
898
867
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
900
unsigned char* p = (unsigned char*)to;
902
int ml = my_mbcharlen(read_charset, chr);
904
for (i=1; i<ml; i++) {
910
if (my_ismbchar(read_charset,
915
PUSH((unsigned char) *--to);
869
unsigned char* p = (unsigned char*)to;
871
int ml = my_mbcharlen(read_charset, chr);
873
for (i=1; i<ml; i++) {
879
if (my_ismbchar(read_charset,
884
PUSH((unsigned char) *--to);
918
888
if (chr == my_b_EOF)
920
890
if (chr == escape_char)
922
if ((chr=GET) == my_b_EOF)
924
*to++= (unsigned char) escape_char;
892
if ((chr=GET) == my_b_EOF)
894
*to++= (unsigned char) escape_char;
928
898
When escape_char == enclosed_char, we treat it like we do for
929
899
handling quotes in SQL parsing -- you can double-up the
942
912
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
943
913
if (chr == line_term_char)
945
if (chr == line_term_char && found_enclosed_char == INT_MAX)
915
if (chr == line_term_char && found_enclosed_char == INT_MAX)
948
if (terminator(line_term_ptr,line_term_length))
949
{ // Maybe unexpected linefeed
918
if (terminator(line_term_ptr,line_term_length))
919
{ // Maybe unexpected linefeed
957
927
if (chr == found_enclosed_char)
959
if ((chr=GET) == found_enclosed_char)
960
{ // Remove dupplicated
961
*to++ = (unsigned char) chr;
964
// End of enclosed field if followed by field_term or line_term
965
if (chr == my_b_EOF ||
966
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
967
{ // Maybe unexpected linefeed
974
if (chr == field_term_char &&
975
terminator(field_term_ptr,field_term_length))
983
The string didn't terminate yet.
984
Store back next character for the loop
987
/* copy the found term character to 'to' */
988
chr= found_enclosed_char;
929
if ((chr=GET) == found_enclosed_char)
930
{ // Remove dupplicated
931
*to++ = (unsigned char) chr;
934
// End of enclosed field if followed by field_term or line_term
935
if (chr == my_b_EOF ||
936
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
937
{ // Maybe unexpected linefeed
944
if (chr == field_term_char &&
945
terminator(field_term_ptr,field_term_length))
953
The string didn't terminate yet.
954
Store back next character for the loop
957
/* copy the found term character to 'to' */
958
chr= found_enclosed_char;
990
960
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
992
if (terminator(field_term_ptr,field_term_length))
962
if (terminator(field_term_ptr,field_term_length))
1000
970
*to++ = (unsigned char) chr;
1003
** We come here if buffer is too small. Enlarge it and continue
973
** We come here if buffer is too small. Enlarge it and continue
1005
975
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1006
976
return (error=1);
1007
977
to=new_buffer + (to-buffer);