19
19
#include <drizzled/server_includes.h>
20
20
#include "sql_repl.h"
21
#include <drizzled/error.h>
22
#include <drizzled/data_home.h>
21
#include <drizzled/drizzled_error_messages.h>
27
unsigned char *buffer, /* Buffer for read text */
26
uchar *buffer, /* Buffer for read text */
28
27
*end_of_buff; /* Data in bufferts ends here */
29
28
uint buff_length, /* Length of buffert */
30
29
max_length; /* Max length of row */
41
40
bool error,line_cuted,found_null,enclosed;
42
unsigned char *row_start, /* Found row starts here */
41
uchar *row_start, /* Found row starts here */
43
42
*row_end; /* Found row ends here */
44
43
const CHARSET_INFO *read_charset;
46
READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
45
READ_INFO(File file,uint tot_length, const CHARSET_INFO * const cs,
47
46
String &field_term,String &line_start,String &line_term,
48
47
String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
68
67
Either this method, or we need to make cache public
69
68
Arg must be set from mysql_load() since constructor does not see
70
either the table or Session value
69
either the table or THD value
72
71
void set_io_cache_arg(void* arg) { cache.arg = arg; }
75
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
76
75
List<Item> &fields_vars, List<Item> &set_fields,
77
76
List<Item> &set_values, READ_INFO &read_info,
78
77
uint32_t skip_lines,
79
78
bool ignore_check_option_errors);
80
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
79
static int read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
81
80
List<Item> &fields_vars, List<Item> &set_fields,
82
81
List<Item> &set_values, READ_INFO &read_info,
83
82
String &enclosed, uint32_t skip_lines,
84
83
bool ignore_check_option_errors);
86
static bool write_execute_load_query_log_event(Session *session,
85
static bool write_execute_load_query_log_event(THD *thd,
87
86
bool duplicates, bool ignore,
88
87
bool transactional_table,
89
Session::killed_state killed_status);
88
THD::killed_state killed_status);
92
91
Execute LOAD DATA query
96
session - current thread
97
96
ex - sql_exchange object representing source file and its parsing rules
98
97
table_list - list of tables to which we are loading data
99
98
fields_vars - list of fields and variables to which we read
109
108
true - error / false - success
112
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
111
int mysql_load(THD *thd,sql_exchange *ex,TableList *table_list,
113
112
List<Item> &fields_vars, List<Item> &set_fields,
114
113
List<Item> &set_values,
115
114
enum enum_duplicates handle_duplicates, bool ignore,
129
128
If this is not set, we will use the directory where the table to be
130
129
loaded is located
132
char *tdb= session->db ? session->db : db; // Result is never null
131
char *tdb= thd->db ? thd->db : db; // Result is never null
133
132
uint32_t skip_lines= ex->skip_lines;
134
133
bool transactional_table;
135
Session::killed_state killed_status= Session::NOT_KILLED;
134
THD::killed_state killed_status= THD::NOT_KILLED;
137
136
if (escaped->length() > 1 || enclosed->length() > 1)
143
if (open_and_lock_tables(session, table_list))
142
if (open_and_lock_tables(thd, table_list))
145
if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
146
&session->lex->select_lex.top_join_list,
144
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
145
&thd->lex->select_lex.top_join_list,
148
&session->lex->select_lex.leaf_tables, true))
147
&thd->lex->select_lex.leaf_tables, true))
156
155
table is marked to be 'used for insert' in which case we should never
157
156
mark this table as 'const table' (ie, one that has only one row).
159
if (unique_table(session, table_list, table_list->next_global, 0))
158
if (unique_table(thd, table_list, table_list->next_global, 0))
161
160
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
176
175
Let us also prepare SET clause, although it is probably empty
179
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
180
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
178
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
179
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
184
183
{ // Part field list
185
184
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
186
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
187
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
188
check_that_all_fields_are_given_values(session, table, table_list))
185
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
186
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
187
check_that_all_fields_are_given_values(thd, table, table_list))
191
190
Check whether TIMESTAMP field with auto-set feature specified
205
204
/* Fix the expressions in SET clause */
206
if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
205
if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
210
209
table->mark_columns_needed_for_insert();
212
uint32_t tot_length=0;
213
212
bool use_blobs= 0, use_vars= 0;
214
213
List_iterator_fast<Item> it(fields_vars);
261
260
if (!dirname_length(ex->file_name))
263
strcpy(name, mysql_real_data_home);
264
strncat(name, tdb, FN_REFLEN-strlen(mysql_real_data_home)-1);
262
strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NullS);
265
263
(void) fn_format(name, ex->file_name, name, "",
266
264
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
285
283
// if we are not in slave thread, the file must be:
286
if (!session->slave_thread &&
284
if (!thd->slave_thread &&
287
285
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
288
286
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
287
((stat_info.st_mode & S_IFREG) == S_IFREG ||
306
304
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
308
306
READ_INFO read_info(file,tot_length,
309
ex->cs ? ex->cs : session->variables.collation_database,
307
ex->cs ? ex->cs : thd->variables.collation_database,
310
308
*field_term,*ex->line_start, *ex->line_term, *enclosed,
311
309
info.escape_char, read_file_from_client, is_fifo);
312
310
if (read_info.error)
319
317
if (mysql_bin_log.is_open())
321
lf_info.session = session;
322
320
lf_info.wrote_create_file = 0;
323
321
lf_info.last_pos_in_file = HA_POS_ERROR;
324
322
lf_info.log_delayed= transactional_table;
325
323
read_info.set_io_cache_arg((void*) &lf_info);
328
session->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
329
session->cuted_fields=0L;
326
thd->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
327
thd->cuted_fields=0L;
330
328
/* Skip lines if there is a line terminator */
331
329
if (ex->line_term->length())
351
349
table->file->ha_start_bulk_insert((ha_rows) 0);
352
350
table->copy_blobs=1;
354
session->abort_on_warning= true;
352
thd->abort_on_warning= true;
356
354
if (!field_term->length() && !enclosed->length())
357
error= read_fixed_length(session, info, table_list, fields_vars,
355
error= read_fixed_length(thd, info, table_list, fields_vars,
358
356
set_fields, set_values, read_info,
359
357
skip_lines, ignore);
361
error= read_sep_field(session, info, table_list, fields_vars,
359
error= read_sep_field(thd, info, table_list, fields_vars,
362
360
set_fields, set_values, read_info,
363
361
*enclosed, skip_lines, ignore);
364
362
if (table->file->ha_end_bulk_insert() && !error)
374
372
my_close(file,MYF(0));
375
373
free_blobs(table); /* if pack_blob was used */
376
374
table->copy_blobs=0;
377
session->count_cuted_fields= CHECK_FIELD_IGNORE;
375
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
379
377
simulated killing in the middle of per-row loop
380
378
must be effective for binlogging
382
killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
380
killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
385
383
if (read_file_from_client)
415
413
/* If the file was not empty, wrote_create_file is true */
416
414
if (lf_info.wrote_create_file)
418
if (session->transaction.stmt.modified_non_trans_table)
419
write_execute_load_query_log_event(session, handle_duplicates,
416
if (thd->transaction.stmt.modified_non_trans_table)
417
write_execute_load_query_log_event(thd, handle_duplicates,
420
418
ignore, transactional_table,
424
Delete_file_log_event d(session, db, transactional_table);
422
Delete_file_log_event d(thd, db, transactional_table);
425
423
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
426
424
mysql_bin_log.write(&d);
434
432
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
435
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
433
(uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
437
if (session->transaction.stmt.modified_non_trans_table)
438
session->transaction.all.modified_non_trans_table= true;
435
if (thd->transaction.stmt.modified_non_trans_table)
436
thd->transaction.all.modified_non_trans_table= true;
440
438
if (mysql_bin_log.is_open())
459
457
read_info.end_io_cache();
460
458
if (lf_info.wrote_create_file)
462
write_execute_load_query_log_event(session, handle_duplicates, ignore,
460
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
463
461
transactional_table,killed_status);
468
466
/* OK is sent to the client only after binlog write and engine commit */
469
my_ok(session, info.copied + info.deleted, 0L, name);
467
my_ok(thd, info.copied + info.deleted, 0L, name);
471
469
assert(transactional_table || !(info.copied || info.deleted) ||
472
session->transaction.stmt.modified_non_trans_table);
470
thd->transaction.stmt.modified_non_trans_table);
473
471
table->file->ha_release_auto_increment();
474
472
table->auto_increment_field_not_null= false;
475
session->abort_on_warning= 0;
473
thd->abort_on_warning= 0;
480
478
/* Not a very useful function; just to avoid duplication of code */
481
static bool write_execute_load_query_log_event(Session *session,
479
static bool write_execute_load_query_log_event(THD *thd,
482
480
bool duplicates, bool ignore,
483
481
bool transactional_table,
484
Session::killed_state killed_err_arg)
482
THD::killed_state killed_err_arg)
486
484
Execute_load_query_log_event
487
e(session, session->query, session->query_length,
488
(char*)session->lex->fname_start - (char*)session->query,
489
(char*)session->lex->fname_end - (char*)session->query,
485
e(thd, thd->query, thd->query_length,
486
(char*)thd->lex->fname_start - (char*)thd->query,
487
(char*)thd->lex->fname_end - (char*)thd->query,
490
488
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
491
489
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
492
490
transactional_table, false, killed_err_arg);
500
498
****************************************************************************/
503
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
501
read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
504
502
List<Item> &fields_vars, List<Item> &set_fields,
505
503
List<Item> &set_values, READ_INFO &read_info,
506
504
uint32_t skip_lines, bool ignore_check_option_errors)
557
555
if (pos == read_info.row_end)
559
session->cuted_fields++; /* Not enough fields */
560
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
557
thd->cuted_fields++; /* Not enough fields */
558
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
561
559
ER_WARN_TOO_FEW_RECORDS,
562
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
560
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
563
561
if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
564
562
((Field_timestamp*) field)->set_time();
569
unsigned char save_chr;
570
568
if ((length=(uint) (read_info.row_end-pos)) >
571
569
field->field_length)
572
570
length=field->field_length;
580
578
if (pos != read_info.row_end)
582
session->cuted_fields++; /* To long row */
583
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
580
thd->cuted_fields++; /* To long row */
581
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
584
582
ER_WARN_TOO_MANY_RECORDS,
585
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
583
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
588
if (session->killed ||
589
fill_record(session, set_fields, set_values,
587
fill_record(thd, set_fields, set_values,
590
588
ignore_check_option_errors))
593
err= write_record(session, table, &info);
591
err= write_record(thd, table, &info);
594
592
table->auto_increment_field_not_null= false;
604
602
if (read_info.line_cuted)
606
session->cuted_fields++; /* To long row */
607
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
604
thd->cuted_fields++; /* To long row */
605
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
608
606
ER_WARN_TOO_MANY_RECORDS,
609
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
607
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
611
session->row_count++;
613
611
return(test(read_info.error));
619
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
617
read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
620
618
List<Item> &fields_vars, List<Item> &set_fields,
621
619
List<Item> &set_values, READ_INFO &read_info,
622
620
String &enclosed, uint32_t skip_lines,
747
745
QQ: We probably should not throw warning for each field.
748
746
But how about intention to always have the same number
749
of warnings in Session::cuted_fields (and get rid of cuted_fields
747
of warnings in THD::cuted_fields (and get rid of cuted_fields
752
session->cuted_fields++;
753
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
751
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
754
752
ER_WARN_TOO_FEW_RECORDS,
755
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
753
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
757
755
else if (item->type() == Item::STRING_ITEM)
770
if (session->killed ||
771
fill_record(session, set_fields, set_values,
769
fill_record(thd, set_fields, set_values,
772
770
ignore_check_option_errors))
775
err= write_record(session, table, &info);
773
err= write_record(thd, table, &info);
776
774
table->auto_increment_field_not_null= false;
785
783
if (read_info.line_cuted)
787
session->cuted_fields++; /* To long row */
788
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
785
thd->cuted_fields++; /* To long row */
786
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
789
787
ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
794
session->row_count++;
796
794
return(test(read_info.error));
827
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
825
READ_INFO::READ_INFO(File file_par, uint tot_length, const CHARSET_INFO * const cs,
828
826
String &field_term, String &line_start, String &line_term,
829
827
String &enclosed_par, int escape, bool get_it_from_net,
854
852
line_term_ptr=(char*) "";
856
854
enclosed_char= (enclosed_length=enclosed_par.length()) ?
857
(unsigned char) enclosed_par[0] : INT_MAX;
858
field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
859
line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
855
(uchar) enclosed_par[0] : INT_MAX;
856
field_term_char= field_term_length ? (uchar) field_term_ptr[0] : INT_MAX;
857
line_term_char= line_term_length ? (uchar) line_term_ptr[0] : INT_MAX;
860
858
error=eof=found_end_of_line=found_null=line_cuted=0;
861
859
buff_length=tot_length;
864
862
/* Set up a stack for unget if long terminators are used */
865
uint32_t length=cmax(field_term_length,line_term_length)+1;
863
uint length=max(field_term_length,line_term_length)+1;
866
864
set_if_bigger(length,line_start.length());
867
865
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
869
if (!(buffer=(unsigned char*) my_malloc(buff_length+1,MYF(0))))
867
if (!(buffer=(uchar*) my_malloc(buff_length+1,MYF(0))))
870
868
error=1; /* purecov: inspected */
915
913
#define PUSH(A) *(stack_pos++)=(A)
918
inline int READ_INFO::terminator(char *ptr,uint32_t length)
916
inline int READ_INFO::terminator(char *ptr,uint length)
920
918
int chr=0; // Keep gcc happy
922
920
for (i=1 ; i < length ; i++)
924
922
if ((chr=GET) != *++ptr)
1082
*to++ = (unsigned char) chr;
1080
*to++ = (uchar) chr;
1085
1083
** We come here if buffer is too small. Enlarge it and continue
1087
if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
1085
if (!(new_buffer=(uchar*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
1089
1087
return (error=1);
1090
1088
to=new_buffer + (to-buffer);
1140
1138
if ((chr=GET) == my_b_EOF)
1142
*to++= (unsigned char) escape_char;
1140
*to++= (uchar) escape_char;
1143
1141
goto found_eof;
1145
*to++ =(unsigned char) unescape((char) chr);
1143
*to++ =(uchar) unescape((char) chr);
1148
1146
if (chr == line_term_char)