12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
34
#include <boost/filesystem.hpp>
36
namespace fs=boost::filesystem;
43
29
unsigned char *buffer; /* Buffer for read text */
44
30
unsigned char *end_of_buff; /* Data in buffer ends here */
45
31
size_t buff_length; /* Length of buffer */
87
73
/* Store the caller-supplied opaque pointer in the arg member of cache. */
void set_io_cache_arg(void* arg)
{
  cache.arg= arg;
}
90
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
76
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
77
List<Item> &fields_vars, List<Item> &set_fields,
92
78
List<Item> &set_values, READ_INFO &read_info,
93
79
uint32_t skip_lines,
94
80
bool ignore_check_option_errors);
95
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
81
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
82
List<Item> &fields_vars, List<Item> &set_fields,
97
83
List<Item> &set_values, READ_INFO &read_info,
98
84
String &enclosed, uint32_t skip_lines,
124
110
List<Item> &set_values,
125
111
enum enum_duplicates handle_duplicates, bool ignore)
113
char name[FN_REFLEN];
128
115
Table *table= NULL;
130
117
String *field_term=ex->field_term,*escaped=ex->escaped;
131
118
String *enclosed=ex->enclosed;
134
assert(table_list->getSchemaName()); // This should never be null
120
char *db= table_list->db; // This is never null
137
If path for file is not defined, we will use the current database.
123
If path for file is not defined, we will use the current database.
138
124
If this is not set, we will use the directory where the table to be
139
125
loaded is located
141
util::string::const_shared_ptr schema(session->schema());
142
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
127
char *tdb= session->db ? session->db : db; // Result is never null
144
129
uint32_t skip_lines= ex->skip_lines;
145
130
bool transactional_table;
146
Session::killed_state_t killed_status= Session::NOT_KILLED;
131
Session::killed_state killed_status= Session::NOT_KILLED;
148
133
/* Escape and enclosed character may be a utf8 4-byte character */
149
134
if (escaped->length() > 4 || enclosed->length() > 4)
169
152
table is marked to be 'used for insert' in which case we should never
170
153
mark this table as 'const table' (ie, one that has only one row).
172
if (unique_table(table_list, table_list->next_global))
155
if (unique_table(session, table_list, table_list->next_global, 0))
174
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
157
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
178
161
table= table_list->table;
179
transactional_table= table->cursor->has_transactions();
162
transactional_table= table->file->has_transactions();
181
164
if (!fields_vars.elements)
184
for (field= table->getFields(); *field ; field++)
167
for (field=table->field; *field ; field++)
185
168
fields_vars.push_back(new Item_field(*field));
186
169
table->setWriteSet();
187
170
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
260
fs::path to_file(ex->file_name);
261
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
if (not to_file.has_root_directory())
264
int count_elements= 0;
265
for (fs::path::iterator iter= to_file.begin();
266
iter != to_file.end();
267
++iter, ++count_elements)
270
if (count_elements == 1)
274
target_path /= to_file;
278
target_path= to_file;
281
if (not secure_file_priv.string().empty())
283
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
285
/* Read only allowed from within dir specified by secure_file_priv */
286
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
242
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
243
ex->file_name+=dirname_length(ex->file_name);
245
if (!dirname_length(ex->file_name))
247
strcpy(name, drizzle_real_data_home);
248
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
249
(void) fn_format(name, ex->file_name, name, "",
250
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
254
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
255
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
257
if (opt_secure_file_priv &&
258
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
260
/* Read only allowed from within dir specified by secure_file_priv */
261
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
265
struct stat stat_info;
266
if (stat(name,&stat_info))
268
my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
272
// if we are not in slave thread, the file must be:
273
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
274
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
275
((stat_info.st_mode & S_IFREG) == S_IFREG ||
276
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
278
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
281
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
284
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
286
my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
291
struct stat stat_info;
292
if (stat(target_path.file_string().c_str(), &stat_info))
294
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
298
// if we are not in slave thread, the file must be:
299
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
300
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
304
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
311
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
313
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
317
292
memset(&info, 0, sizeof(info));
318
293
info.ignore= ignore;
319
294
info.handle_duplicates=handle_duplicates;
320
295
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
322
SchemaIdentifier identifier(*schema);
323
297
READ_INFO read_info(file, tot_length,
324
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
*field_term, *ex->line_start, *ex->line_term, *enclosed,
298
ex->cs ? ex->cs : get_default_db_collation(session->db),
299
*field_term,*ex->line_start, *ex->line_term, *enclosed,
326
300
info.escape_char, is_fifo);
327
301
if (read_info.error)
330
internal::my_close(file,MYF(0)); // no files in net reading
304
my_close(file,MYF(0)); // no files in net reading
331
305
return(true); // Can't allocate buffers
375
349
error= read_sep_field(session, info, table_list, fields_vars,
376
350
set_fields, set_values, read_info,
377
351
*enclosed, skip_lines, ignore);
378
if (table->cursor->ha_end_bulk_insert() && !error)
352
if (table->file->ha_end_bulk_insert() && !error)
380
table->print_error(errno, MYF(0));
354
table->file->print_error(my_errno, MYF(0));
383
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
384
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
357
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
358
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
385
359
table->next_number_field=0;
388
internal::my_close(file,MYF(0));
362
my_close(file,MYF(0));
389
363
free_blobs(table); /* if pack_blob was used */
390
364
table->copy_blobs=0;
391
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
365
session->count_cuted_fields= CHECK_FIELD_IGNORE;
393
367
simulated killing in the middle of per-row loop
394
368
must be effective for binlogging
396
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
370
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
373
error= -1; // Error on read
404
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
(info.records - info.copied), session->cuted_fields);
407
if (session->transaction.stmt.hasModifiedNonTransData())
408
session->transaction.all.markModifiedNonTransData();
376
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
377
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
379
if (session->transaction.stmt.modified_non_trans_table)
380
session->transaction.all.modified_non_trans_table= true;
410
382
/* ok to client sent only after binlog write and engine commit */
411
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
383
session->my_ok(info.copied + info.deleted, 0L, name);
413
385
assert(transactional_table || !(info.copied || info.deleted) ||
414
session->transaction.stmt.hasModifiedNonTransData());
415
table->cursor->ha_release_auto_increment();
386
session->transaction.stmt.modified_non_trans_table);
387
table->file->ha_release_auto_increment();
416
388
table->auto_increment_field_not_null= false;
417
389
session->abort_on_warning= 0;
791
763
/* Set up a stack for unget in case of long terminators */
792
size_t length= max(field_term_length,line_term_length)+1;
793
set_if_bigger(length, line_start.length());
794
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
764
uint32_t length=cmax(field_term_length,line_term_length)+1;
765
set_if_bigger(length,line_start.length());
766
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
796
768
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
769
error=1; /* purecov: inspected */
800
772
end_of_buff=buffer+buff_length;
801
if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
(false) ? internal::READ_NET :
803
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
773
if (init_io_cache(&cache,(false) ? -1 : file, 0,
775
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
806
free((unsigned char*) buffer);
778
free((unsigned char*) buffer); /* purecov: inspected */
894
866
while ( to < end_of_buff)
897
870
if ((my_mbcharlen(read_charset, chr) > 1) &&
898
871
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
900
unsigned char* p = (unsigned char*)to;
902
int ml = my_mbcharlen(read_charset, chr);
904
for (i=1; i<ml; i++) {
910
if (my_ismbchar(read_charset,
915
PUSH((unsigned char) *--to);
873
unsigned char* p = (unsigned char*)to;
875
int ml = my_mbcharlen(read_charset, chr);
877
for (i=1; i<ml; i++) {
883
if (my_ismbchar(read_charset,
888
PUSH((unsigned char) *--to);
918
892
if (chr == my_b_EOF)
920
894
if (chr == escape_char)
922
if ((chr=GET) == my_b_EOF)
924
*to++= (unsigned char) escape_char;
896
if ((chr=GET) == my_b_EOF)
898
*to++= (unsigned char) escape_char;
928
902
When escape_char == enclosed_char, we treat it like we do for
929
903
handling quotes in SQL parsing -- you can double-up the
942
916
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
943
917
if (chr == line_term_char)
945
if (chr == line_term_char && found_enclosed_char == INT_MAX)
919
if (chr == line_term_char && found_enclosed_char == INT_MAX)
948
if (terminator(line_term_ptr,line_term_length))
949
{ // Maybe unexpected linefeed
922
if (terminator(line_term_ptr,line_term_length))
923
{ // Maybe unexpected linefeed
957
931
if (chr == found_enclosed_char)
959
if ((chr=GET) == found_enclosed_char)
960
{ // Remove dupplicated
961
*to++ = (unsigned char) chr;
964
// End of enclosed field if followed by field_term or line_term
965
if (chr == my_b_EOF ||
966
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
967
{ // Maybe unexpected linefeed
974
if (chr == field_term_char &&
975
terminator(field_term_ptr,field_term_length))
983
The string didn't terminate yet.
984
Store back next character for the loop
987
/* copy the found term character to 'to' */
988
chr= found_enclosed_char;
933
if ((chr=GET) == found_enclosed_char)
934
{ // Remove dupplicated
935
*to++ = (unsigned char) chr;
938
// End of enclosed field if followed by field_term or line_term
939
if (chr == my_b_EOF ||
940
(chr == line_term_char && terminator(line_term_ptr, line_term_length)))
941
{ // Maybe unexpected linefeed
948
if (chr == field_term_char &&
949
terminator(field_term_ptr,field_term_length))
957
The string didn't terminate yet.
958
Store back next character for the loop
961
/* copy the found term character to 'to' */
962
chr= found_enclosed_char;
990
964
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
992
if (terminator(field_term_ptr,field_term_length))
966
if (terminator(field_term_ptr,field_term_length))
1000
974
*to++ = (unsigned char) chr;
1003
** We come here if buffer is too small. Enlarge it and continue
977
** We come here if buffer is too small. Enlarge it and continue
1005
979
if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
1006
980
return (error=1);
1007
981
to=new_buffer + (to-buffer);