12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
21
20
#include <drizzled/sql_load.h>
22
21
#include <drizzled/error.h>
23
22
#include <drizzled/data_home.h>
24
23
#include <drizzled/session.h>
25
24
#include <drizzled/sql_base.h>
26
#include <drizzled/field/epoch.h>
27
#include "drizzled/internal/my_sys.h"
28
#include "drizzled/internal/iocache.h"
29
#include <drizzled/db.h>
30
#include "drizzled/plugin/storage_engine.h"
25
#include <drizzled/field/timestamp.h>
34
27
#include <algorithm>
36
#include <boost/filesystem.hpp>
38
namespace fs=boost::filesystem;
39
29
using namespace std;
45
33
unsigned char *buffer; /* Buffer for read text */
46
34
unsigned char *end_of_buff; /* Data in bufferts ends here */
47
35
size_t buff_length; /* Length of buffert */
78
66
void end_io_cache()
68
::end_io_cache(&cache);
81
69
need_end_io_cache = 0;
85
73
Either this method, or we need to make cache public
86
Arg must be set from load() since constructor does not see
74
Arg must be set from mysql_load() since constructor does not see
87
75
either the table or Session value
89
77
void set_io_cache_arg(void* arg) { cache.arg = arg; }
92
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
80
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
93
81
List<Item> &fields_vars, List<Item> &set_fields,
94
82
List<Item> &set_values, READ_INFO &read_info,
95
83
uint32_t skip_lines,
96
84
bool ignore_check_option_errors);
97
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
85
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
98
86
List<Item> &fields_vars, List<Item> &set_fields,
99
87
List<Item> &set_values, READ_INFO &read_info,
100
88
String &enclosed, uint32_t skip_lines,
121
109
true - error / false - success
124
int load(Session *session,file_exchange *ex,TableList *table_list,
112
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
125
113
List<Item> &fields_vars, List<Item> &set_fields,
126
114
List<Item> &set_values,
127
115
enum enum_duplicates handle_duplicates, bool ignore)
117
char name[FN_REFLEN];
130
119
Table *table= NULL;
132
121
String *field_term=ex->field_term,*escaped=ex->escaped;
133
122
String *enclosed=ex->enclosed;
136
assert(table_list->getSchemaName()); // This should never be null
124
char *db= table_list->db; // This is never null
139
If path for cursor is not defined, we will use the current database.
127
If path for file is not defined, we will use the current database.
140
128
If this is not set, we will use the directory where the table to be
141
129
loaded is located
143
util::string::const_shared_ptr schema(session->schema());
144
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
131
char *tdb= session->db ? session->db : db; // Result is never null
146
133
uint32_t skip_lines= ex->skip_lines;
147
134
bool transactional_table;
148
Session::killed_state_t killed_status= Session::NOT_KILLED;
135
Session::killed_state killed_status= Session::NOT_KILLED;
150
137
/* Escape and enclosed character may be a utf8 4-byte character */
151
138
if (escaped->length() > 4 || enclosed->length() > 4)
171
158
table is marked to be 'used for insert' in which case we should never
172
159
mark this table as 'const table' (ie, one that has only one row).
174
if (unique_table(table_list, table_list->next_global))
161
if (unique_table(session, table_list, table_list->next_global, 0))
176
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
163
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
180
167
table= table_list->table;
181
transactional_table= table->cursor->has_transactions();
168
transactional_table= table->file->has_transactions();
183
170
if (!fields_vars.elements)
186
for (field= table->getFields(); *field ; field++)
173
for (field=table->field; *field ; field++)
187
174
fields_vars.push_back(new Item_field(*field));
188
175
table->setWriteSet();
189
176
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
262
fs::path to_file(ex->file_name);
263
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
264
if (not to_file.has_root_directory())
266
int count_elements= 0;
267
for (fs::path::iterator iter= to_file.begin();
268
iter != to_file.end();
269
++iter, ++count_elements)
272
if (count_elements == 1)
276
target_path /= to_file;
280
target_path= to_file;
283
if (not secure_file_priv.string().empty())
285
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
287
/* Read only allowed from within dir specified by secure_file_priv */
288
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
248
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
249
ex->file_name+=dirname_length(ex->file_name);
251
if (!dirname_length(ex->file_name))
253
strcpy(name, drizzle_real_data_home);
254
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
255
(void) fn_format(name, ex->file_name, name, "",
256
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
260
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
261
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
263
if (opt_secure_file_priv &&
264
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
266
/* Read only allowed from within dir specified by secure_file_priv */
267
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
271
struct stat stat_info;
272
if (stat(name,&stat_info))
274
my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
278
// if we are not in slave thread, the file must be:
279
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
280
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
281
((stat_info.st_mode & S_IFREG) == S_IFREG ||
282
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
284
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
287
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
290
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
292
my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
293
struct stat stat_info;
294
if (stat(target_path.file_string().c_str(), &stat_info))
296
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
300
// if we are not in slave thread, the cursor must be:
301
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
302
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
303
((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
306
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
309
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
313
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
315
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
319
298
memset(&info, 0, sizeof(info));
320
299
info.ignore= ignore;
321
300
info.handle_duplicates=handle_duplicates;
322
301
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
324
identifier::Schema identifier(*schema);
325
303
READ_INFO read_info(file, tot_length,
326
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
327
*field_term, *ex->line_start, *ex->line_term, *enclosed,
304
ex->cs ? ex->cs : get_default_db_collation(session->db),
305
*field_term,*ex->line_start, *ex->line_term, *enclosed,
328
306
info.escape_char, is_fifo);
329
307
if (read_info.error)
332
internal::my_close(file,MYF(0)); // no files in net reading
310
my_close(file,MYF(0)); // no files in net reading
333
311
return(true); // Can't allocate buffers
361
339
table->next_number_field=table->found_next_number_field;
363
341
handle_duplicates == DUP_REPLACE)
364
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
342
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
365
343
if (handle_duplicates == DUP_REPLACE)
366
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
367
table->cursor->ha_start_bulk_insert((ha_rows) 0);
344
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
345
table->file->ha_start_bulk_insert((ha_rows) 0);
368
346
table->copy_blobs=1;
370
session->setAbortOnWarning(true);
348
session->abort_on_warning= true;
372
350
if (!field_term->length() && !enclosed->length())
373
351
error= read_fixed_length(session, info, table_list, fields_vars,
377
355
error= read_sep_field(session, info, table_list, fields_vars,
378
356
set_fields, set_values, read_info,
379
357
*enclosed, skip_lines, ignore);
380
if (table->cursor->ha_end_bulk_insert() && !error)
358
if (table->file->ha_end_bulk_insert() && !error)
382
table->print_error(errno, MYF(0));
360
table->file->print_error(my_errno, MYF(0));
385
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
386
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
363
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
364
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
387
365
table->next_number_field=0;
390
internal::my_close(file,MYF(0));
368
my_close(file,MYF(0));
391
369
free_blobs(table); /* if pack_blob was used */
392
370
table->copy_blobs=0;
393
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
371
session->count_cuted_fields= CHECK_FIELD_IGNORE;
395
373
simulated killing in the middle of per-row loop
396
374
must be effective for binlogging
398
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
376
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
401
379
error= -1; // Error on read
406
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
407
(info.records - info.copied), session->cuted_fields);
409
if (session->transaction.stmt.hasModifiedNonTransData())
410
session->transaction.all.markModifiedNonTransData();
382
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
383
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
385
if (session->transaction.stmt.modified_non_trans_table)
386
session->transaction.all.modified_non_trans_table= true;
412
388
/* ok to client sent only after binlog write and engine commit */
413
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
389
session->my_ok(info.copied + info.deleted, 0, 0L, name);
415
391
assert(transactional_table || !(info.copied || info.deleted) ||
416
session->transaction.stmt.hasModifiedNonTransData());
417
table->cursor->ha_release_auto_increment();
392
session->transaction.stmt.modified_non_trans_table);
393
table->file->ha_release_auto_increment();
418
394
table->auto_increment_field_not_null= false;
419
session->setAbortOnWarning(false);
395
session->abort_on_warning= 0;
798
769
/* Set of a stack for unget if long terminators */
799
size_t length= max(field_term_length,line_term_length)+1;
800
set_if_bigger(length, line_start.length());
801
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
770
uint32_t length= max(field_term_length,line_term_length)+1;
771
set_if_bigger(length,line_start.length());
772
stack= stack_pos= (int*) sql_alloc(sizeof(int)*length);
803
774
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
775
error=1; /* purecov: inspected */
807
778
end_of_buff=buffer+buff_length;
808
if (cache.init_io_cache((false) ? -1 : cursor, 0,
809
(false) ? internal::READ_NET :
810
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
779
if (init_io_cache(&cache,(false) ? -1 : file, 0,
781
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
813
free((unsigned char*) buffer);
784
free((unsigned char*) buffer); /* purecov: inspected */