12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Copy data from a textfile to table */
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
21
21
#include <drizzled/error.h>
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
25
#include <drizzled/field/timestamp.h>
26
#include "drizzled/internal/my_sys.h"
27
#include "drizzled/internal/iocache.h"
28
#include <drizzled/db.h>
32
27
#include <algorithm>
34
#include <boost/filesystem.hpp>
36
namespace fs=boost::filesystem;
37
29
using namespace std;
43
33
unsigned char *buffer; /* Buffer for read text */
44
34
unsigned char *end_of_buff; /* Data in bufferts ends here */
45
35
size_t buff_length; /* Length of buffert */
87
77
void set_io_cache_arg(void* arg) { cache.arg = arg; }
90
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
80
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
81
List<Item> &fields_vars, List<Item> &set_fields,
92
82
List<Item> &set_values, READ_INFO &read_info,
93
83
uint32_t skip_lines,
94
84
bool ignore_check_option_errors);
95
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
85
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
86
List<Item> &fields_vars, List<Item> &set_fields,
97
87
List<Item> &set_values, READ_INFO &read_info,
98
88
String &enclosed, uint32_t skip_lines,
124
114
List<Item> &set_values,
125
115
enum enum_duplicates handle_duplicates, bool ignore)
117
char name[FN_REFLEN];
128
119
Table *table= NULL;
130
121
String *field_term=ex->field_term,*escaped=ex->escaped;
131
122
String *enclosed=ex->enclosed;
134
assert(table_list->getSchemaName()); // This should never be null
124
char *db= table_list->db; // This is never null
137
If path for cursor is not defined, we will use the current database.
127
If path for file is not defined, we will use the current database.
138
128
If this is not set, we will use the directory where the table to be
139
129
loaded is located
141
util::string::const_shared_ptr schema(session->schema());
142
const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
131
char *tdb= session->db ? session->db : db; // Result is never null
144
133
uint32_t skip_lines= ex->skip_lines;
145
134
bool transactional_table;
146
Session::killed_state_t killed_status= Session::NOT_KILLED;
135
Session::killed_state killed_status= Session::NOT_KILLED;
148
137
/* Escape and enclosed character may be a utf8 4-byte character */
149
138
if (escaped->length() > 4 || enclosed->length() > 4)
169
156
table is marked to be 'used for insert' in which case we should never
170
157
mark this table as 'const table' (ie, one that has only one row).
172
if (unique_table(table_list, table_list->next_global))
159
if (unique_table(session, table_list, table_list->next_global, 0))
174
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
161
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
178
165
table= table_list->table;
179
transactional_table= table->cursor->has_transactions();
166
transactional_table= table->file->has_transactions();
181
168
if (!fields_vars.elements)
184
for (field= table->getFields(); *field ; field++)
171
for (field=table->field; *field ; field++)
185
172
fields_vars.push_back(new Item_field(*field));
186
173
table->setWriteSet();
187
174
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
260
fs::path to_file(ex->file_name);
261
fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
if (not to_file.has_root_directory())
264
int count_elements= 0;
265
for (fs::path::iterator iter= to_file.begin();
266
iter != to_file.end();
267
++iter, ++count_elements)
270
if (count_elements == 1)
274
target_path /= to_file;
278
target_path= to_file;
281
if (not secure_file_priv.string().empty())
283
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
285
/* Read only allowed from within dir specified by secure_file_priv */
286
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
246
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
247
ex->file_name+=dirname_length(ex->file_name);
249
if (!dirname_length(ex->file_name))
251
strcpy(name, drizzle_real_data_home);
252
strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
253
(void) fn_format(name, ex->file_name, name, "",
254
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
258
(void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
259
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
261
if (opt_secure_file_priv &&
262
strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
264
/* Read only allowed from within dir specified by secure_file_priv */
265
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
269
struct stat stat_info;
270
if (stat(name,&stat_info))
272
my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
276
// if we are not in slave thread, the file must be:
277
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
278
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
279
((stat_info.st_mode & S_IFREG) == S_IFREG ||
280
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
282
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
285
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
288
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
290
my_error(ER_CANT_OPEN_FILE, MYF(0), name, my_errno);
291
struct stat stat_info;
292
if (stat(target_path.file_string().c_str(), &stat_info))
294
my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
298
// if we are not in slave thread, the cursor must be:
299
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
300
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
304
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
311
if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
313
my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
317
296
memset(&info, 0, sizeof(info));
318
297
info.ignore= ignore;
319
298
info.handle_duplicates=handle_duplicates;
320
299
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
322
SchemaIdentifier identifier(*schema);
323
301
READ_INFO read_info(file, tot_length,
324
ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
*field_term, *ex->line_start, *ex->line_term, *enclosed,
302
ex->cs ? ex->cs : get_default_db_collation(session->db),
303
*field_term,*ex->line_start, *ex->line_term, *enclosed,
326
304
info.escape_char, is_fifo);
327
305
if (read_info.error)
330
internal::my_close(file,MYF(0)); // no files in net reading
308
my_close(file,MYF(0)); // no files in net reading
331
309
return(true); // Can't allocate buffers
375
353
error= read_sep_field(session, info, table_list, fields_vars,
376
354
set_fields, set_values, read_info,
377
355
*enclosed, skip_lines, ignore);
378
if (table->cursor->ha_end_bulk_insert() && !error)
356
if (table->file->ha_end_bulk_insert() && !error)
380
table->print_error(errno, MYF(0));
358
table->file->print_error(my_errno, MYF(0));
383
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
384
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
361
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
362
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
385
363
table->next_number_field=0;
388
internal::my_close(file,MYF(0));
366
my_close(file,MYF(0));
389
367
free_blobs(table); /* if pack_blob was used */
390
368
table->copy_blobs=0;
391
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
369
session->count_cuted_fields= CHECK_FIELD_IGNORE;
393
371
simulated killing in the middle of per-row loop
394
372
must be effective for binlogging
396
killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
374
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
377
error= -1; // Error on read
404
snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
(info.records - info.copied), session->cuted_fields);
407
if (session->transaction.stmt.hasModifiedNonTransData())
408
session->transaction.all.markModifiedNonTransData();
380
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
381
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
383
if (session->transaction.stmt.modified_non_trans_table)
384
session->transaction.all.modified_non_trans_table= true;
410
386
/* ok to client sent only after binlog write and engine commit */
411
session->my_ok(info.copied + info.deleted, 0, 0L, msg);
387
session->my_ok(info.copied + info.deleted, 0L, name);
413
389
assert(transactional_table || !(info.copied || info.deleted) ||
414
session->transaction.stmt.hasModifiedNonTransData());
415
table->cursor->ha_release_auto_increment();
390
session->transaction.stmt.modified_non_trans_table);
391
table->file->ha_release_auto_increment();
416
392
table->auto_increment_field_not_null= false;
417
393
session->abort_on_warning= 0;
791
767
/* Set of a stack for unget if long terminators */
792
size_t length= max(field_term_length,line_term_length)+1;
793
set_if_bigger(length, line_start.length());
794
stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
768
uint32_t length= max(field_term_length,line_term_length)+1;
769
set_if_bigger(length,line_start.length());
770
stack= stack_pos= (int*) sql_alloc(sizeof(int)*length);
796
772
if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
773
error=1; /* purecov: inspected */
800
776
end_of_buff=buffer+buff_length;
801
if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
(false) ? internal::READ_NET :
803
(is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
777
if (init_io_cache(&cache,(false) ? -1 : file, 0,
779
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
806
free((unsigned char*) buffer);
782
free((unsigned char*) buffer); /* purecov: inspected */