17
17
/* Copy data from a textfile to table */
18
/* 2006-12 Erik Wetterberg : LOAD XML added */
19
#include <drizzled/server_includes.h>
20
#include "mysql_priv.h"
20
23
#include "sql_repl.h"
21
#include <drizzled/error.h>
22
#include <drizzled/data_home.h>
30
XML_TAG(int l, String f, String v);
34
XML_TAG::XML_TAG(int l, String f, String v)
27
unsigned char *buffer, /* Buffer for read text */
44
uchar *buffer, /* Buffer for read text */
28
45
*end_of_buff; /* Data in bufferts ends here */
29
46
uint buff_length, /* Length of buffert */
30
47
max_length; /* Max length of row */
36
53
bool need_end_io_cache;
56
int level; /* for load xml */
41
59
bool error,line_cuted,found_null,enclosed;
42
unsigned char *row_start, /* Found row starts here */
60
uchar *row_start, /* Found row starts here */
43
61
*row_end; /* Found row ends here */
44
const CHARSET_INFO *read_charset;
62
CHARSET_INFO *read_charset;
46
READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
64
READ_INFO(File file,uint tot_length,CHARSET_INFO *cs,
47
65
String &field_term,String &line_start,String &line_term,
48
66
String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
68
91
Either this method, or we need to make cache public
69
92
Arg must be set from mysql_load() since constructor does not see
70
either the table or Session value
93
either the table or THD value
72
95
// Record the caller-supplied opaque pointer in the arg slot of this
// object's I/O cache; the pointer is stored as given, without any checks.
void set_io_cache_arg(void* arg)
{
  cache.arg = arg;
}
75
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
98
static int read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
76
99
List<Item> &fields_vars, List<Item> &set_fields,
77
100
List<Item> &set_values, READ_INFO &read_info,
79
102
bool ignore_check_option_errors);
80
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
103
static int read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
81
104
List<Item> &fields_vars, List<Item> &set_fields,
82
105
List<Item> &set_values, READ_INFO &read_info,
83
String &enclosed, uint32_t skip_lines,
106
String &enclosed, ulong skip_lines,
84
107
bool ignore_check_option_errors);
86
static bool write_execute_load_query_log_event(Session *session,
109
static int read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
110
List<Item> &fields_vars, List<Item> &set_fields,
111
List<Item> &set_values, READ_INFO &read_info,
112
String &enclosed, ulong skip_lines,
113
bool ignore_check_option_errors);
115
static bool write_execute_load_query_log_event(THD *thd,
87
116
bool duplicates, bool ignore,
88
117
bool transactional_table,
89
Session::killed_state killed_status);
118
THD::killed_state killed_status);
92
121
Execute LOAD DATA query
96
session - current thread
97
126
ex - sql_exchange object representing source file and its parsing rules
98
127
table_list - list of tables to which we are loading data
99
128
fields_vars - list of fields and variables to which we read
176
205
Let us also prepare SET clause, although it is probably empty
179
if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
180
setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
208
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
209
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
184
213
{ // Part field list
185
214
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
186
if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
187
setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
188
check_that_all_fields_are_given_values(session, table, table_list))
215
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
216
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
217
check_that_all_fields_are_given_values(thd, table, table_list))
191
220
Check whether a TIMESTAMP field with auto-set feature is specified
303
memset(&info, 0, sizeof(info));
331
memset((char*) &info, 0, sizeof(info));
304
332
info.ignore= ignore;
305
333
info.handle_duplicates=handle_duplicates;
306
334
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
308
336
READ_INFO read_info(file,tot_length,
309
ex->cs ? ex->cs : session->variables.collation_database,
337
ex->cs ? ex->cs : thd->variables.collation_database,
310
338
*field_term,*ex->line_start, *ex->line_term, *enclosed,
311
339
info.escape_char, read_file_from_client, is_fifo);
312
340
if (read_info.error)
319
347
if (mysql_bin_log.is_open())
321
lf_info.session = session;
322
350
lf_info.wrote_create_file = 0;
323
351
lf_info.last_pos_in_file = HA_POS_ERROR;
324
352
lf_info.log_delayed= transactional_table;
325
353
read_info.set_io_cache_arg((void*) &lf_info);
328
session->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
329
session->cuted_fields=0L;
356
thd->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
357
thd->cuted_fields=0L;
330
358
/* Skip lines if there is a line terminator */
331
if (ex->line_term->length())
359
if (ex->line_term->length() && ex->filetype != FILETYPE_XML)
333
361
/* ex->skip_lines needs to be preserved for logging */
334
362
while (skip_lines > 0)
351
379
table->file->ha_start_bulk_insert((ha_rows) 0);
352
380
table->copy_blobs=1;
354
session->abort_on_warning= true;
382
thd->abort_on_warning= (!ignore &&
383
(thd->variables.sql_mode &
384
(MODE_STRICT_TRANS_TABLES |
385
MODE_STRICT_ALL_TABLES)));
356
if (!field_term->length() && !enclosed->length())
357
error= read_fixed_length(session, info, table_list, fields_vars,
387
if (ex->filetype == FILETYPE_XML) /* load xml */
388
error= read_xml_field(thd, info, table_list, fields_vars,
389
set_fields, set_values, read_info,
390
*(ex->line_term), skip_lines, ignore);
391
else if (!field_term->length() && !enclosed->length())
392
error= read_fixed_length(thd, info, table_list, fields_vars,
358
393
set_fields, set_values, read_info,
359
394
skip_lines, ignore);
361
error= read_sep_field(session, info, table_list, fields_vars,
396
error= read_sep_field(thd, info, table_list, fields_vars,
362
397
set_fields, set_values, read_info,
363
398
*enclosed, skip_lines, ignore);
364
399
if (table->file->ha_end_bulk_insert() && !error)
415
450
/* If the file was not empty, wrote_create_file is true */
416
451
if (lf_info.wrote_create_file)
418
if (session->transaction.stmt.modified_non_trans_table)
419
write_execute_load_query_log_event(session, handle_duplicates,
453
if (thd->transaction.stmt.modified_non_trans_table)
454
write_execute_load_query_log_event(thd, handle_duplicates,
420
455
ignore, transactional_table,
424
Delete_file_log_event d(session, db, transactional_table);
459
Delete_file_log_event d(thd, db, transactional_table);
425
460
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
426
461
mysql_bin_log.write(&d);
431
466
error= -1; // Error on read
434
sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
435
(uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
469
sprintf(name, ER(ER_LOAD_INFO), (ulong) info.records, (ulong) info.deleted,
470
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
437
if (session->transaction.stmt.modified_non_trans_table)
438
session->transaction.all.modified_non_trans_table= true;
472
if (thd->transaction.stmt.modified_non_trans_table)
473
thd->transaction.all.modified_non_trans_table= true;
440
475
if (mysql_bin_log.is_open())
459
494
read_info.end_io_cache();
460
495
if (lf_info.wrote_create_file)
462
write_execute_load_query_log_event(session, handle_duplicates, ignore,
497
write_execute_load_query_log_event(thd, handle_duplicates, ignore,
463
498
transactional_table,killed_status);
468
503
/* ok to client sent only after binlog write and engine commit */
469
my_ok(session, info.copied + info.deleted, 0L, name);
504
my_ok(thd, info.copied + info.deleted, 0L, name);
471
506
assert(transactional_table || !(info.copied || info.deleted) ||
472
session->transaction.stmt.modified_non_trans_table);
507
thd->transaction.stmt.modified_non_trans_table);
473
508
table->file->ha_release_auto_increment();
474
509
table->auto_increment_field_not_null= false;
475
session->abort_on_warning= 0;
510
thd->abort_on_warning= 0;
480
515
/* Not a very useful function; just to avoid duplication of code */
481
static bool write_execute_load_query_log_event(Session *session,
516
static bool write_execute_load_query_log_event(THD *thd,
482
517
bool duplicates, bool ignore,
483
518
bool transactional_table,
484
Session::killed_state killed_err_arg)
519
THD::killed_state killed_err_arg)
486
521
Execute_load_query_log_event
487
e(session, session->query, session->query_length,
488
(char*)session->lex->fname_start - (char*)session->query,
489
(char*)session->lex->fname_end - (char*)session->query,
522
e(thd, thd->query, thd->query_length,
523
(char*)thd->lex->fname_start - (char*)thd->query,
524
(char*)thd->lex->fname_end - (char*)thd->query,
490
525
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
491
526
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
492
527
transactional_table, false, killed_err_arg);
500
535
****************************************************************************/
503
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
538
read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
504
539
List<Item> &fields_vars, List<Item> &set_fields,
505
540
List<Item> &set_values, READ_INFO &read_info,
506
uint32_t skip_lines, bool ignore_check_option_errors)
541
ulong skip_lines, bool ignore_check_option_errors)
508
543
List_iterator_fast<Item> it(fields_vars);
509
544
Item_field *sql_field;
510
Table *table= table_list->table;
545
TABLE *table= table_list->table;
557
592
if (pos == read_info.row_end)
559
session->cuted_fields++; /* Not enough fields */
560
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
594
thd->cuted_fields++; /* Not enough fields */
595
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
561
596
ER_WARN_TOO_FEW_RECORDS,
562
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
563
if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
597
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
598
if (!field->maybe_null() && field->type() == FIELD_TYPE_TIMESTAMP)
564
599
((Field_timestamp*) field)->set_time();
569
unsigned char save_chr;
570
605
if ((length=(uint) (read_info.row_end-pos)) >
571
606
field->field_length)
572
607
length=field->field_length;
580
615
if (pos != read_info.row_end)
582
session->cuted_fields++; /* To long row */
583
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
617
thd->cuted_fields++; /* To long row */
618
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
584
619
ER_WARN_TOO_MANY_RECORDS,
585
ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
620
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
588
if (session->killed ||
589
fill_record(session, set_fields, set_values,
624
fill_record(thd, set_fields, set_values,
590
625
ignore_check_option_errors))
593
err= write_record(session, table, &info);
628
err= write_record(thd, table, &info);
594
629
table->auto_increment_field_not_null= false;
619
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
654
read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
620
655
List<Item> &fields_vars, List<Item> &set_fields,
621
656
List<Item> &set_values, READ_INFO &read_info,
622
String &enclosed, uint32_t skip_lines,
657
String &enclosed, ulong skip_lines,
623
658
bool ignore_check_option_errors)
625
660
List_iterator_fast<Item> it(fields_vars);
627
Table *table= table_list->table;
628
uint32_t enclosed_length;
662
TABLE *table= table_list->table;
663
uint enclosed_length;
738
773
if (field->reset())
740
775
my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
744
if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
779
if (!field->maybe_null() && field->type() == FIELD_TYPE_TIMESTAMP)
745
780
((Field_timestamp*) field)->set_time();
747
782
QQ: We probably should not throw warning for each field.
748
783
But how about intention to always have the same number
749
of warnings in Session::cuted_fields (and get rid of cuted_fields
784
of warnings in THD::cuted_fields (and get rid of cuted_fields
752
session->cuted_fields++;
753
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
788
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
754
789
ER_WARN_TOO_FEW_RECORDS,
755
ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
790
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
757
792
else if (item->type() == Item::STRING_ITEM)
785
820
if (read_info.line_cuted)
787
session->cuted_fields++; /* To long row */
788
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
822
thd->cuted_fields++; /* To long row */
823
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
789
824
ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
794
session->row_count++;
796
831
return(test(read_info.error));
835
/****************************************************************************
836
** Read rows in xml format
837
****************************************************************************/
839
read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
840
List<Item> &fields_vars, List<Item> &set_fields,
841
List<Item> &set_values, READ_INFO &read_info,
842
String &row_tag __attribute__((__unused__)),
844
bool ignore_check_option_errors)
846
List_iterator_fast<Item> it(fields_vars);
848
TABLE *table= table_list->table;
849
bool no_trans_update_stmt;
850
CHARSET_INFO *cs= read_info.read_charset;
852
no_trans_update_stmt= !table->file->has_transactions();
854
for ( ; ; it.rewind())
858
thd->send_kill_message();
862
// read row tag and save values into tag list
863
if (read_info.read_xml())
866
List_iterator_fast<XML_TAG> xmlit(read_info.taglist);
871
restore_record(table, s->default_values);
875
/* If this line is to be skipped we don't want to fill field or var */
879
/* find field in tag list */
883
while(tag && strcmp(tag->field.c_ptr(), item->name) != 0)
886
if (!tag) // found null
888
if (item->type() == Item::FIELD_ITEM)
890
Field *field= ((Item_field *) item)->field;
893
if (field == table->next_number_field)
894
table->auto_increment_field_not_null= true;
895
if (!field->maybe_null())
897
if (field->type() == FIELD_TYPE_TIMESTAMP)
898
((Field_timestamp *) field)->set_time();
899
else if (field != table->next_number_field)
900
field->set_warning(MYSQL_ERROR::WARN_LEVEL_WARN,
901
ER_WARN_NULL_TO_NOTNULL, 1);
905
((Item_user_var_as_out_param *) item)->set_null_value(cs);
909
if (item->type() == Item::FIELD_ITEM)
912
Field *field= ((Item_field *)item)->field;
913
field->set_notnull();
914
if (field == table->next_number_field)
915
table->auto_increment_field_not_null= true;
916
field->store((char *) tag->value.ptr(), tag->value.length(), cs);
919
((Item_user_var_as_out_param *) item)->set_value(
920
(char *) tag->value.ptr(),
921
tag->value.length(), cs);
935
/* Have not read any field, thus input file is simply ended */
936
if (item == fields_vars.head())
939
for ( ; item; item= it++)
941
if (item->type() == Item::FIELD_ITEM)
944
QQ: We probably should not throw warning for each field.
945
But how about intention to always have the same number
946
of warnings in THD::cuted_fields (and get rid of cuted_fields
950
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
951
ER_WARN_TOO_FEW_RECORDS,
952
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
955
((Item_user_var_as_out_param *)item)->set_null_value(cs);
959
if (thd->killed || fill_record(thd, set_fields, set_values,
960
ignore_check_option_errors))
963
if (write_record(thd, table, &info))
967
We don't need to reset auto-increment field since we are restoring
968
its default value at the beginning of each loop iteration.
970
thd->transaction.stmt.modified_non_trans_table= no_trans_update_stmt;
973
return(test(read_info.error));
800
977
/* Unescape all escape characters, mark \N as null */
854
1032
line_term_ptr=(char*) "";
856
1034
enclosed_char= (enclosed_length=enclosed_par.length()) ?
857
(unsigned char) enclosed_par[0] : INT_MAX;
858
field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
859
line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
1035
(uchar) enclosed_par[0] : INT_MAX;
1036
field_term_char= field_term_length ? (uchar) field_term_ptr[0] : INT_MAX;
1037
line_term_char= line_term_length ? (uchar) line_term_ptr[0] : INT_MAX;
860
1038
error=eof=found_end_of_line=found_null=line_cuted=0;
861
1039
buff_length=tot_length;
864
1042
/* Set up a stack for unget if long terminators */
865
uint32_t length=cmax(field_term_length,line_term_length)+1;
1043
uint length=max(field_term_length,line_term_length)+1;
866
1044
set_if_bigger(length,line_start.length());
867
1045
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
869
if (!(buffer=(unsigned char*) my_malloc(buff_length+1,MYF(0))))
1047
if (!(buffer=(uchar*) my_malloc(buff_length+1,MYF(0))))
870
1048
error=1; /* purecov: inspected */
1427
Clear taglist from tags whose level is greater than or equal to the specified level
1429
int READ_INFO::clear_level(int level)
1431
List_iterator<XML_TAG> xmlit(taglist);
1435
while ((tag= xmlit++))
1437
if(tag->level >= level)
1448
Convert an XML entity to Unicode value.
1452
my_xml_entity_to_char(const char *name, uint length)
1456
if (!memcmp(name, "gt", length))
1458
if (!memcmp(name, "lt", length))
1461
else if (length == 3)
1463
if (!memcmp(name, "amp", length))
1466
else if (length == 4)
1468
if (!memcmp(name, "quot", length))
1470
if (!memcmp(name, "apos", length))
1478
@brief Convert carriage return, line feed, tab to space
1480
@param chr character
1482
@details According to the "XML 1.0" standard,
1483
only space (#x20) characters, carriage returns,
1484
line feeds or tabs are considered as spaces.
1485
Convert all of them to space (#x20) for parsing simplicity.
1490
return (chr == '\t' || chr == '\r' || chr == '\n') ? ' ' : chr;
1495
Read an xml value: handle multibyte and xml escape
1497
int READ_INFO::read_value(int delim, String *val)
1502
for (chr= my_tospace(GET); chr != delim && chr != my_b_EOF; )
1505
if (my_mbcharlen(read_charset, chr) > 1)
1507
int i, ml= my_mbcharlen(read_charset, chr);
1508
for (i= 1; i < ml; i++)
1512
Don't use my_tospace() in the middle of a multi-byte character
1513
TODO: check that the multi-byte sequence is valid.
1516
if (chr == my_b_EOF)
1524
for (chr= my_tospace(GET) ; chr != ';' ; chr= my_tospace(GET))
1526
if (chr == my_b_EOF)
1530
if ((chr= my_xml_entity_to_char(tmp.ptr(), tmp.length())) >= 0)
1541
chr= my_tospace(GET);
1548
Read a record in xml format
1549
tags and attributes are stored in taglist
1550
when tag set in ROWS IDENTIFIED BY is closed, we are ready and return
1552
int READ_INFO::read_xml()
1554
int chr, chr2, chr3;
1556
String tag, attribute, value;
1560
attribute.length(0);
1563
for (chr= my_tospace(GET); chr != my_b_EOF ; )
1566
case '<': /* read tag */
1567
/* TODO: check if this is a comment <!-- comment --> */
1568
chr= my_tospace(GET);
1574
if(chr2 == '-' && chr3 == '-')
1578
chr= my_tospace(GET);
1580
while(chr != '>' || chr2 != '-' || chr3 != '-')
1587
else if (chr2 == '-')
1592
chr= my_tospace(GET);
1593
if (chr == my_b_EOF)
1601
while(chr != '>' && chr != ' ' && chr != '/' && chr != my_b_EOF)
1603
if(chr != delim) /* fix for the '<field name =' format */
1605
chr= my_tospace(GET);
1608
if(chr == ' ' || chr == '>')
1611
clear_level(level + 1);
1620
case ' ': /* read attribute */
1621
while(chr == ' ') /* skip blanks */
1622
chr= my_tospace(GET);
1627
while(chr != '=' && chr != '/' && chr != '>' && chr != my_b_EOF)
1629
attribute.append(chr);
1630
chr= my_tospace(GET);
1634
case '>': /* end tag - read tag value */
1636
chr= read_value('<', &value);
1640
/* save value to list */
1641
if(tag.length() > 0 && value.length() > 0)
1642
taglist.push_front( new XML_TAG(level, tag, value));
1646
attribute.length(0);
1649
case '/': /* close tag */
1651
chr= my_tospace(GET);
1652
if(chr != '>') /* if this is an empty tag <tag /> */
1653
tag.length(0); /* we should keep tag value */
1654
while(chr != '>' && chr != my_b_EOF)
1657
chr= my_tospace(GET);
1660
if((tag.length() == line_term_length -2) &&
1661
(strncmp(tag.c_ptr_safe(), line_term_ptr + 1, tag.length()) == 0))
1662
return(0); //normal return
1664
chr= my_tospace(GET);
1667
case '=': /* attribute name end - read the value */
1668
//check for tag field and attribute name
1669
if(!memcmp(tag.c_ptr_safe(), STRING_WITH_LEN("field")) &&
1670
!memcmp(attribute.c_ptr_safe(), STRING_WITH_LEN("name")))
1673
this is format <field name="xx">xx</field>
1674
where actual fieldname is in attribute
1676
delim= my_tospace(GET);
1678
attribute.length(0);
1679
chr= '<'; /* we pretend that it is a tag */
1686
if (chr == my_b_EOF)
1688
if(chr == '"' || chr == '\'')
1694
delim= ' '; /* no delimiter, use space */
1698
chr= read_value(delim, &value);
1699
if(attribute.length() > 0 && value.length() > 0)
1700
taglist.push_front(new XML_TAG(level + 1, attribute, value));
1702
attribute.length(0);
1705
chr= my_tospace(GET);
1709
chr= my_tospace(GET);