12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
34
#include <boost/filesystem.hpp>
36
namespace fs=boost::filesystem;
43
unsigned char *buffer; /* Buffer for read text */
44
unsigned char *end_of_buff; /* Data in bufferts ends here */
45
size_t buff_length; /* Length of buffert */
46
size_t max_length; /* Max length of row */
29
unsigned char *buffer, /* Buffer for read text */
30
*end_of_buff; /* Data in bufferts ends here */
31
uint buff_length, /* Length of buffert */
32
max_length; /* Max length of row */
47
33
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
48
34
uint field_term_length,line_term_length,enclosed_length;
49
35
int field_term_char,line_term_char,enclosed_char,escape_char;
50
36
int *stack,*stack_pos;
51
37
bool found_end_of_line,start_of_line,eof;
52
38
bool need_end_io_cache;
53
internal::IO_CACHE cache;
56
43
bool error,line_cuted,found_null,enclosed;
87
74
void set_io_cache_arg(void* arg) { cache.arg = arg; }
90
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
77
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
78
List<Item> &fields_vars, List<Item> &set_fields,
92
79
List<Item> &set_values, READ_INFO &read_info,
93
80
uint32_t skip_lines,
94
81
bool ignore_check_option_errors);
95
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
82
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
83
List<Item> &fields_vars, List<Item> &set_fields,
97
84
List<Item> &set_values, READ_INFO &read_info,
98
85
String &enclosed, uint32_t skip_lines,
124
111
List<Item> &set_values,
125
112
enum enum_duplicates handle_duplicates, bool ignore)
114
char name[FN_REFLEN];
128
116
Table *table= NULL;
130
118
String *field_term=ex->field_term,*escaped=ex->escaped;
131
119
String *enclosed=ex->enclosed;
134
assert(table_list->getSchemaName()); // This should never be null
121
char *db= table_list->db; // This is never null
137
If path for cursor is not defined, we will use the current database.
124
If path for file is not defined, we will use the current database.
138
125
If this is not set, we will use the directory where the table to be
139
126
loaded is located
141
util::string::const_shared_ptr schema(session->schema());
142
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
128
char *tdb= session->db ? session->db : db; // Result is never null
144
130
uint32_t skip_lines= ex->skip_lines;
145
131
bool transactional_table;
146
Session::killed_state_t killed_status= Session::NOT_KILLED;
132
Session::killed_state killed_status= Session::NOT_KILLED;
148
134
/* Escape and enclosed character may be a utf8 4-byte character */
149
135
if (escaped->length() > 4 || enclosed->length() > 4)
169
153
table is marked to be 'used for insert' in which case we should never
170
154
mark this table as 'const table' (ie, one that has only one row).
172
if (unique_table(table_list, table_list->next_global))
156
if (unique_table(session, table_list, table_list->next_global, 0))
174
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
158
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
178
162
table= table_list->table;
179
transactional_table= table->cursor->has_transactions();
163
transactional_table= table->file->has_transactions();
181
165
if (!fields_vars.elements)
184
for (field= table->getFields(); *field ; field++)
168
for (field=table->field; *field ; field++)
185
169
fields_vars.push_back(new Item_field(*field));
186
table->setWriteSet();
170
bitmap_set_all(table->write_set);
187
171
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
189
173
Let us also prepare SET clause, altough it is probably empty
260
fs::path to_file(ex->file_name);
261
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
if (not to_file.has_root_directory())
264
int count_elements= 0;
265
for (fs::path::iterator iter= to_file.begin();
266
iter != to_file.end();
267
++iter, ++count_elements)
270
if (count_elements == 1)
274
target_path /= to_file;
278
target_path= to_file;
281
if (not secure_file_priv.string().empty())
283
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
285
/* Read only allowed from within dir specified by secure_file_priv */
286
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
245
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
246
ex->file_name+=dirname_length(ex->file_name);
248
if (!dirname_length(ex->file_name))
250
strcpy(name, drizzle_real_data_home);
251
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
252
(void) fn_format(name, ex->file_name, name, "",
253
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
257
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
258
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
260
if (opt_secure_file_priv &&
261
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
263
/* Read only allowed from within dir specified by secure_file_priv */
264
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
268
struct stat stat_info;
269
if (stat(name,&stat_info))
271
my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
275
// if we are not in slave thread, the file must be:
276
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
277
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
278
((stat_info.st_mode & S_IFREG) == S_IFREG ||
279
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
281
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
284
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
287
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
289
my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
291
struct stat stat_info;
292
if (stat(target_path.file_string().c_str(), &stat_info))
294
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
298
// if we are not in slave thread, the cursor must be:
299
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
300
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
304
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
311
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
313
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
317
295
memset(&info, 0, sizeof(info));
318
296
info.ignore= ignore;
319
297
info.handle_duplicates=handle_duplicates;
320
298
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
322
SchemaIdentifier identifier(*schema);
323
READ_INFO read_info(file, tot_length,
324
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
*field_term, *ex->line_start, *ex->line_term, *enclosed,
300
READ_INFO read_info(file,tot_length,
301
ex->cs ? ex->cs : session->variables.collation_database,
302
*field_term,*ex->line_start, *ex->line_term, *enclosed,
326
303
info.escape_char, is_fifo);
327
304
if (read_info.error)
330
internal::my_close(file,MYF(0)); // no files in net reading
307
my_close(file,MYF(0)); // no files in net reading
331
308
return(true); // Can't allocate buffers
375
352
error= read_sep_field(session, info, table_list, fields_vars,
376
353
set_fields, set_values, read_info,
377
354
*enclosed, skip_lines, ignore);
378
if (table->cursor->ha_end_bulk_insert() && !error)
355
if (table->file->ha_end_bulk_insert() && !error)
380
table->print_error(errno, MYF(0));
357
table->file->print_error(my_errno, MYF(0));
383
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
384
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
360
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
361
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
385
362
table->next_number_field=0;
388
internal::my_close(file,MYF(0));
365
my_close(file,MYF(0));
389
366
free_blobs(table); /* if pack_blob was used */
390
367
table->copy_blobs=0;
391
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
368
session->count_cuted_fields= CHECK_FIELD_IGNORE;
393
370
simulated killing in the middle of per-row loop
394
371
must be effective for binlogging
396
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
373
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
376
error= -1; // Error on read
404
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
(info.records - info.copied), session->cuted_fields);
407
if (session->transaction.stmt.hasModifiedNonTransData())
408
session->transaction.all.markModifiedNonTransData();
379
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
380
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
382
if (session->transaction.stmt.modified_non_trans_table)
383
session->transaction.all.modified_non_trans_table= true;
410
385
/* ok to client sent only after binlog write and engine commit */
411
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
386
session->my_ok(info.copied + info.deleted, 0L, name);
413
388
assert(transactional_table || !(info.copied || info.deleted) ||
414
session->transaction.stmt.hasModifiedNonTransData());
415
table->cursor->ha_release_auto_increment();
389
session->transaction.stmt.modified_non_trans_table);
390
table->file->ha_release_auto_increment();
416
391
table->auto_increment_field_not_null= false;
417
392
session->abort_on_warning= 0;
791
762
/* Set of a stack for unget if long terminators */
792
size_t length= max(field_term_length,line_term_length)+1;
793
set_if_bigger(length, line_start.length());
794
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
763
uint32_t length=cmax(field_term_length,line_term_length)+1;
764
set_if_bigger(length,line_start.length());
765
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
796
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
767
if (!(buffer=(unsigned char*) malloc(buff_length+1)))
768
error=1; /* purecov: inspected */
800
771
end_of_buff=buffer+buff_length;
801
if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
(false) ? internal::READ_NET :
803
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
772
if (init_io_cache(&cache,(false) ? -1 : file, 0,
774
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
806
free((unsigned char*) buffer);
777
free((unsigned char*) buffer); /* purecov: inspected */
894
865
while ( to < end_of_buff)
897
869
if ((my_mbcharlen(read_charset, chr) > 1) &&
898
870
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
900
unsigned char* p = (unsigned char*)to;
902
int ml = my_mbcharlen(read_charset, chr);
904
for (i=1; i<ml; i++) {
910
if (my_ismbchar(read_charset,
915
PUSH((unsigned char) *--to);
872
unsigned char* p = (unsigned char*)to;
874
int ml = my_mbcharlen(read_charset, chr);
876
for (i=1; i<ml; i++) {
882
if (my_ismbchar(read_charset,
887
PUSH((unsigned char) *--to);
918
891
if (chr == my_b_EOF)
920
893
if (chr == escape_char)
922
if ((chr=GET) == my_b_EOF)
924
*to++= (unsigned char) escape_char;
895
if ((chr=GET) == my_b_EOF)
897
*to++= (unsigned char) escape_char;
928
901
When escape_char == enclosed_char, we treat it like we do for
929
902
handling quotes in SQL parsing -- you can double-up the
942
915
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
943
916
if (chr == line_term_char)
945
if (chr == line_term_char && found_enclosed_char == INT_MAX)
918
if (chr == line_term_char && found_enclosed_char == INT_MAX)
948
if (terminator(line_term_ptr,line_term_length))
949
{ // Maybe unexpected linefeed
921
if (terminator(line_term_ptr,line_term_length))
922
{ // Maybe unexpected linefeed
957
930
if (chr == found_enclosed_char)
959
if ((chr=GET) == found_enclosed_char)
960
{ // Remove dupplicated
961
*to++ = (unsigned char) chr;
964
// End of enclosed field if followed by field_term or line_term
965
if (chr == my_b_EOF ||
966
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
967
{ // Maybe unexpected linefeed
974
if (chr == field_term_char &&
975
terminator(field_term_ptr,field_term_length))
983
The string didn't terminate yet.
984
Store back next character for the loop
987
/* copy the found term character to 'to' */
988
chr= found_enclosed_char;
932
if ((chr=GET) == found_enclosed_char)
933
{ // Remove dupplicated
934
*to++ = (unsigned char) chr;
937
// End of enclosed field if followed by field_term or line_term
938
if (chr == my_b_EOF ||
939
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
940
{ // Maybe unexpected linefeed
947
if (chr == field_term_char &&
948
terminator(field_term_ptr,field_term_length))
956
The string didn't terminate yet.
957
Store back next character for the loop
960
/* copy the found term character to 'to' */
961
chr= found_enclosed_char;
990
963
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
992
if (terminator(field_term_ptr,field_term_length))
965
if (terminator(field_term_ptr,field_term_length))
1000
973
*to++ = (unsigned char) chr;
1003
** We come here if buffer is too small. Enlarge it and continue
976
** We come here if buffer is too small. Enlarge it and continue
1005
978
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1006
979
return (error=1);
1007
980
to=new_buffer + (to-buffer);