12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
21
20
#include <drizzled/sql_load.h>
22
21
#include <drizzled/error.h>
23
22
#include <drizzled/data_home.h>
24
23
#include <drizzled/session.h>
25
24
#include <drizzled/sql_base.h>
26
#include <drizzled/field/epoch.h>
27
#include <drizzled/internal/my_sys.h>
28
#include <drizzled/internal/iocache.h>
29
#include <drizzled/plugin/storage_engine.h>
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
31
30
#include <sys/stat.h>
33
32
#include <algorithm>
35
#include <boost/filesystem.hpp>
37
namespace fs=boost::filesystem;
38
35
using namespace std;
77
74
void end_io_cache()
76
internal::end_io_cache(&cache);
80
77
need_end_io_cache = 0;
84
81
Either this method, or we need to make cache public
85
Arg must be set from load() since constructor does not see
82
Arg must be set from mysql_load() since constructor does not see
86
83
either the table or Session value
88
85
/* Store the caller-supplied pointer in the `arg` member of `cache`. */
void set_io_cache_arg(void* arg) { cache.arg = arg; }
91
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
88
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
92
89
List<Item> &fields_vars, List<Item> &set_fields,
93
90
List<Item> &set_values, READ_INFO &read_info,
94
91
uint32_t skip_lines,
95
92
bool ignore_check_option_errors);
96
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
93
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
97
94
List<Item> &fields_vars, List<Item> &set_fields,
98
95
List<Item> &set_values, READ_INFO &read_info,
99
96
String &enclosed, uint32_t skip_lines,
120
117
true - error / false - success
123
int load(Session *session,file_exchange *ex,TableList *table_list,
120
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
124
121
List<Item> &fields_vars, List<Item> &set_fields,
125
122
List<Item> &set_values,
126
123
enum enum_duplicates handle_duplicates, bool ignore)
125
char name[FN_REFLEN];
129
127
Table *table= NULL;
131
129
String *field_term=ex->field_term,*escaped=ex->escaped;
132
130
String *enclosed=ex->enclosed;
135
assert(table_list->getSchemaName()); // This should never be null
132
char *db= table_list->db; // This is never null
138
135
If path for cursor is not defined, we will use the current database.
139
136
If this is not set, we will use the directory where the table to be
140
137
loaded is located
142
util::string::const_shared_ptr schema(session->schema());
143
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
139
const char *tdb= session->db.empty() ? db : session->db.c_str(); // Result is never null
145
141
uint32_t skip_lines= ex->skip_lines;
146
142
bool transactional_table;
147
Session::killed_state_t killed_status= Session::NOT_KILLED;
143
Session::killed_state killed_status= Session::NOT_KILLED;
149
145
/* Escape and enclosed character may be a utf8 4-byte character */
150
146
if (escaped->length() > 4 || enclosed->length() > 4)
261
fs::path to_file(ex->file_name);
262
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
263
if (not to_file.has_root_directory())
265
int count_elements= 0;
266
for (fs::path::iterator iter= to_file.begin();
267
iter != to_file.end();
268
++iter, ++count_elements)
271
if (count_elements == 1)
275
target_path /= to_file;
279
target_path= to_file;
282
if (not secure_file_priv.string().empty())
284
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
286
/* Read only allowed from within dir specified by secure_file_priv */
287
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
256
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
257
ex->file_name+=dirname_length(ex->file_name);
259
if (!internal::dirname_length(ex->file_name))
261
strcpy(name, data_home_real);
262
strncat(name, tdb, FN_REFLEN-strlen(data_home_real)-1);
263
(void) internal::fn_format(name, ex->file_name, name, "",
264
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
268
(void) internal::fn_format(name, ex->file_name, data_home_real, "",
269
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
271
if (opt_secure_file_priv &&
272
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
274
/* Read only allowed from within dir specified by secure_file_priv */
275
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
279
struct stat stat_info;
280
if (stat(name,&stat_info))
282
my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
286
// if we are not in slave thread, the cursor must be:
287
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
292
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
295
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
298
if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
300
my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
292
struct stat stat_info;
293
if (stat(target_path.file_string().c_str(), &stat_info))
295
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
299
// if we are not in slave thread, the cursor must be:
300
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
301
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
302
((stat_info.st_mode & S_IFREG) == S_IFREG ||
303
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
305
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
308
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
312
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
314
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
318
306
memset(&info, 0, sizeof(info));
319
307
info.ignore= ignore;
320
308
info.handle_duplicates=handle_duplicates;
321
309
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
323
identifier::Schema identifier(*schema);
311
SchemaIdentifier identifier(session->db);
324
312
READ_INFO read_info(file, tot_length,
325
313
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
326
314
*field_term, *ex->line_start, *ex->line_term, *enclosed,
389
377
internal::my_close(file,MYF(0));
390
378
free_blobs(table); /* if pack_blob was used */
391
379
table->copy_blobs=0;
392
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
380
session->count_cuted_fields= CHECK_FIELD_IGNORE;
394
382
simulated killing in the middle of per-row loop
395
383
must be effective for binlogging
397
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
385
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
400
388
error= -1; // Error on read
405
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
406
(info.records - info.copied), session->cuted_fields);
391
snprintf(name, sizeof(name), ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
392
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
408
394
if (session->transaction.stmt.hasModifiedNonTransData())
409
395
session->transaction.all.markModifiedNonTransData();
411
397
/* ok to client sent only after binlog write and engine commit */
412
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
398
session->my_ok(info.copied + info.deleted, 0, 0L, name);
414
400
assert(transactional_table || !(info.copied || info.deleted) ||
415
401
session->transaction.stmt.hasModifiedNonTransData());
416
402
table->cursor->ha_release_auto_increment();
417
403
table->auto_increment_field_not_null= false;
418
session->setAbortOnWarning(false);
404
session->abort_on_warning= 0;
426
411
****************************************************************************/
429
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
414
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
430
415
List<Item> &fields_vars, List<Item> &set_fields,
431
416
List<Item> &set_values, READ_INFO &read_info,
432
417
uint32_t skip_lines, bool ignore_check_option_errors)
434
List<Item>::iterator it(fields_vars.begin());
419
List_iterator_fast<Item> it(fields_vars);
435
420
Item_field *sql_field;
436
421
Table *table= table_list->table;
549
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
533
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
550
534
List<Item> &fields_vars, List<Item> &set_fields,
551
535
List<Item> &set_values, READ_INFO &read_info,
552
536
String &enclosed, uint32_t skip_lines,
553
537
bool ignore_check_option_errors)
555
List<Item>::iterator it(fields_vars.begin());
539
List_iterator_fast<Item> it(fields_vars);
557
541
Table *table= table_list->table;
558
542
uint32_t enclosed_length;
606
590
field->set_null();
607
if (not field->maybe_null())
591
if (!field->maybe_null())
609
if (field->is_timestamp())
611
((field::Epoch::pointer) field)->set_time();
593
if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
594
((Field_timestamp*) field)->set_time();
613
595
else if (field != table->next_number_field)
615
field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
596
field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
597
ER_WARN_NULL_TO_NOTNULL, 1);
619
600
else if (item->type() == Item::STRING_ITEM)
806
787
end_of_buff=buffer+buff_length;
807
if (cache.init_io_cache((false) ? -1 : cursor, 0,
808
(false) ? internal::READ_NET :
809
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
788
if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
789
(false) ? internal::READ_NET :
790
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
812
793
free((unsigned char*) buffer);