~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Mark Atwood
  • Date: 2008-10-16 11:33:16 UTC
  • mto: (520.1.13 drizzle)
  • mto: This revision was merged to the branch mainline in revision 530.
  • Revision ID: mark@fallenpegasus.com-20081016113316-ff6jdt31ck90sjdh
an implemention of the errmsg plugin

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
 
#include <config.h>
20
 
 
21
 
#include <drizzled/sql_load.h>
22
 
#include <drizzled/error.h>
23
 
#include <drizzled/data_home.h>
24
 
#include <drizzled/session.h>
25
 
#include <drizzled/sql_base.h>
26
 
#include <drizzled/field/epoch.h>
27
 
#include <drizzled/internal/my_sys.h>
28
 
#include <drizzled/internal/iocache.h>
29
 
#include <drizzled/plugin/storage_engine.h>
30
 
 
31
 
#include <sys/stat.h>
32
 
#include <fcntl.h>
33
 
#include <algorithm>
34
 
#include <climits>
35
 
#include <boost/filesystem.hpp>
36
 
 
37
 
namespace fs=boost::filesystem;
38
 
using namespace std;
39
 
namespace drizzled
40
 
{
 
19
#include <drizzled/server_includes.h>
 
20
#include "sql_repl.h"
 
21
#include <drizzled/drizzled_error_messages.h>
 
22
 
41
23
 
42
24
class READ_INFO {
43
 
  int   cursor;
44
 
  unsigned char *buffer;                /* Buffer for read text */
45
 
  unsigned char *end_of_buff;           /* Data in bufferts ends here */
46
 
  size_t buff_length;                   /* Length of buffert */
47
 
  size_t max_length;                    /* Max length of row */
 
25
  File  file;
 
26
  unsigned char *buffer,                        /* Buffer for read text */
 
27
        *end_of_buff;                   /* Data in bufferts ends here */
 
28
  uint  buff_length,                    /* Length of buffert */
 
29
        max_length;                     /* Max length of row */
48
30
  char  *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
49
31
  uint  field_term_length,line_term_length,enclosed_length;
50
32
  int   field_term_char,line_term_char,enclosed_char,escape_char;
51
33
  int   *stack,*stack_pos;
52
34
  bool  found_end_of_line,start_of_line,eof;
53
35
  bool  need_end_io_cache;
54
 
  internal::IO_CACHE cache;
 
36
  IO_CACHE cache;
 
37
  NET *io_net;
55
38
 
56
39
public:
57
40
  bool error,line_cuted,found_null,enclosed;
59
42
        *row_end;                       /* Found row ends here */
60
43
  const CHARSET_INFO *read_charset;
61
44
 
62
 
  READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
 
45
  READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
63
46
            String &field_term,String &line_start,String &line_term,
64
 
            String &enclosed,int escape, bool is_fifo);
 
47
            String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
65
48
  ~READ_INFO();
66
49
  int read_field();
67
50
  int read_fixed_length(void);
76
59
  */
77
60
  void end_io_cache()
78
61
  {
79
 
    cache.end_io_cache();
 
62
    ::end_io_cache(&cache);
80
63
    need_end_io_cache = 0;
81
64
  }
82
65
 
83
66
  /*
84
67
    Either this method, or we need to make cache public
85
 
    Arg must be set from load() since constructor does not see
86
 
    either the table or Session value
 
68
    Arg must be set from mysql_load() since constructor does not see
 
69
    either the table or THD value
87
70
  */
88
71
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
89
72
};
90
73
 
91
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
74
static int read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
92
75
                             List<Item> &fields_vars, List<Item> &set_fields,
93
76
                             List<Item> &set_values, READ_INFO &read_info,
94
77
                             uint32_t skip_lines,
95
78
                             bool ignore_check_option_errors);
96
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
79
static int read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
97
80
                          List<Item> &fields_vars, List<Item> &set_fields,
98
81
                          List<Item> &set_values, READ_INFO &read_info,
99
82
                          String &enclosed, uint32_t skip_lines,
100
83
                          bool ignore_check_option_errors);
101
84
 
 
85
static bool write_execute_load_query_log_event(THD *thd,
 
86
                                               bool duplicates, bool ignore,
 
87
                                               bool transactional_table,
 
88
                                               THD::killed_state killed_status);
102
89
 
103
90
/*
104
91
  Execute LOAD DATA query
105
92
 
106
93
  SYNOPSYS
107
 
    load()
108
 
      session - current thread
109
 
      ex  - file_exchange object representing source cursor and its parsing rules
 
94
    mysql_load()
 
95
      thd - current thread
 
96
      ex  - sql_exchange object representing source file and its parsing rules
110
97
      table_list  - list of tables to which we are loading data
111
98
      fields_vars - list of fields and variables to which we read
112
 
                    data from cursor
 
99
                    data from file
113
100
      set_fields  - list of fields mentioned in set clause
114
101
      set_values  - expressions to assign to fields in previous list
115
102
      handle_duplicates - indicates whenever we should emit error or
116
103
                          replace row if we will meet duplicates.
117
104
      ignore -          - indicates whenever we should ignore duplicates
 
105
      read_file_from_client - is this LOAD DATA LOCAL ?
118
106
 
119
107
  RETURN VALUES
120
108
    true - error / false - success
121
109
*/
122
110
 
123
 
int load(Session *session,file_exchange *ex,TableList *table_list,
 
111
int mysql_load(THD *thd,sql_exchange *ex,TableList *table_list,
124
112
                List<Item> &fields_vars, List<Item> &set_fields,
125
113
                List<Item> &set_values,
126
 
                enum enum_duplicates handle_duplicates, bool ignore)
 
114
                enum enum_duplicates handle_duplicates, bool ignore,
 
115
                bool read_file_from_client)
127
116
{
128
 
  int file;
 
117
  char name[FN_REFLEN];
 
118
  File file;
129
119
  Table *table= NULL;
130
120
  int error;
131
121
  String *field_term=ex->field_term,*escaped=ex->escaped;
132
122
  String *enclosed=ex->enclosed;
133
123
  bool is_fifo=0;
134
 
 
135
 
  assert(table_list->getSchemaName()); // This should never be null
136
 
 
 
124
  LOAD_FILE_INFO lf_info;
 
125
  char *db = table_list->db;                    // This is never null
137
126
  /*
138
 
    If path for cursor is not defined, we will use the current database.
 
127
    If path for file is not defined, we will use the current database.
139
128
    If this is not set, we will use the directory where the table to be
140
129
    loaded is located
141
130
  */
142
 
  util::string::const_shared_ptr schema(session->schema());
143
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
144
 
  assert(tdb);
 
131
  char *tdb= thd->db ? thd->db : db;            // Result is never null
145
132
  uint32_t skip_lines= ex->skip_lines;
146
133
  bool transactional_table;
147
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
134
  THD::killed_state killed_status= THD::NOT_KILLED;
148
135
 
149
 
  /* Escape and enclosed character may be a utf8 4-byte character */
150
 
  if (escaped->length() > 4 || enclosed->length() > 4)
 
136
  if (escaped->length() > 1 || enclosed->length() > 1)
151
137
  {
152
 
    my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
 
138
    my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
 
139
               MYF(0));
153
140
    return(true);
154
141
  }
155
 
 
156
 
  if (session->openTablesLock(table_list))
 
142
  if (open_and_lock_tables(thd, table_list))
157
143
    return(true);
158
 
 
159
 
  if (setup_tables_and_check_access(session, &session->getLex()->select_lex.context,
160
 
                                    &session->getLex()->select_lex.top_join_list,
 
144
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
 
145
                                    &thd->lex->select_lex.top_join_list,
161
146
                                    table_list,
162
 
                                    &session->getLex()->select_lex.leaf_tables, true))
 
147
                                    &thd->lex->select_lex.leaf_tables, true))
163
148
     return(-1);
164
149
 
165
150
  /*
170
155
    table is marked to be 'used for insert' in which case we should never
171
156
    mark this table as 'const table' (ie, one that has only one row).
172
157
  */
173
 
  if (unique_table(table_list, table_list->next_global))
 
158
  if (unique_table(thd, table_list, table_list->next_global, 0))
174
159
  {
175
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
160
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
176
161
    return(true);
177
162
  }
178
163
 
179
164
  table= table_list->table;
180
 
  transactional_table= table->cursor->has_transactions();
 
165
  transactional_table= table->file->has_transactions();
181
166
 
182
167
  if (!fields_vars.elements)
183
168
  {
184
169
    Field **field;
185
 
    for (field= table->getFields(); *field ; field++)
 
170
    for (field=table->field; *field ; field++)
186
171
      fields_vars.push_back(new Item_field(*field));
187
 
    table->setWriteSet();
 
172
    bitmap_set_all(table->write_set);
188
173
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
189
174
    /*
190
175
      Let us also prepare SET clause, altough it is probably empty
191
176
      in this case.
192
177
    */
193
 
    if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
194
 
        setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
178
    if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
179
        setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
195
180
      return(true);
196
181
  }
197
182
  else
198
183
  {                                             // Part field list
199
184
    /* TODO: use this conds for 'WITH CHECK OPTIONS' */
200
 
    if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
201
 
        setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
202
 
        check_that_all_fields_are_given_values(session, table, table_list))
 
185
    if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
 
186
        setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
187
        check_that_all_fields_are_given_values(thd, table, table_list))
203
188
      return(true);
204
189
    /*
205
190
      Check whenever TIMESTAMP field with auto-set feature specified
207
192
    */
208
193
    if (table->timestamp_field)
209
194
    {
210
 
      if (table->isWriteSet(table->timestamp_field->position()))
211
 
      {
 
195
      if (bitmap_is_set(table->write_set,
 
196
                        table->timestamp_field->field_index))
212
197
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
213
 
      }
214
198
      else
215
199
      {
216
 
        table->setWriteSet(table->timestamp_field->position());
 
200
        bitmap_set_bit(table->write_set,
 
201
                       table->timestamp_field->field_index);
217
202
      }
218
203
    }
219
204
    /* Fix the expressions in SET clause */
220
 
    if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
205
    if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
221
206
      return(true);
222
207
  }
223
208
 
224
209
  table->mark_columns_needed_for_insert();
225
210
 
226
 
  size_t tot_length=0;
 
211
  uint32_t tot_length=0;
227
212
  bool use_blobs= 0, use_vars= 0;
228
 
  List<Item>::iterator it(fields_vars.begin());
 
213
  List_iterator_fast<Item> it(fields_vars);
229
214
  Item *item;
230
215
 
231
216
  while ((item= it++))
258
243
    return(true);
259
244
  }
260
245
 
261
 
  fs::path to_file(ex->file_name);
262
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
263
 
  if (not to_file.has_root_directory())
 
246
  /* We can't give an error in the middle when using LOCAL files */
 
247
  if (read_file_from_client && handle_duplicates == DUP_ERROR)
 
248
    ignore= 1;
 
249
 
 
250
  if (read_file_from_client)
264
251
  {
265
 
    int count_elements= 0;
266
 
    for (fs::path::iterator iter= to_file.begin();
267
 
         iter != to_file.end();
268
 
         ++iter, ++count_elements)
269
 
    { }
270
 
 
271
 
    if (count_elements == 1)
272
 
    {
273
 
      target_path /= tdb;
274
 
    }
275
 
    target_path /= to_file;
 
252
    (void)net_request_file(&thd->net,ex->file_name);
 
253
    file = -1;
276
254
  }
277
255
  else
278
256
  {
279
 
    target_path= to_file;
280
 
  }
281
 
 
282
 
  if (not secure_file_priv.string().empty())
283
 
  {
284
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
285
 
    {
286
 
      /* Read only allowed from within dir specified by secure_file_priv */
287
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
257
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
258
    ex->file_name+=dirname_length(ex->file_name);
 
259
#endif
 
260
    if (!dirname_length(ex->file_name))
 
261
    {
 
262
      strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NULL);
 
263
      (void) fn_format(name, ex->file_name, name, "",
 
264
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
265
    }
 
266
    else
 
267
    {
 
268
      (void) fn_format(name, ex->file_name, mysql_real_data_home, "",
 
269
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
270
 
 
271
      if (opt_secure_file_priv &&
 
272
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
273
      {
 
274
        /* Read only allowed from within dir specified by secure_file_priv */
 
275
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
276
        return(true);
 
277
      }
 
278
 
 
279
      struct stat stat_info;
 
280
      if (stat(name,&stat_info))
 
281
        return(true);
 
282
 
 
283
      // if we are not in slave thread, the file must be:
 
284
      if (!thd->slave_thread &&
 
285
          !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
286
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
287
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
288
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
289
      {
 
290
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
291
        return(true);
 
292
      }
 
293
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
294
        is_fifo = 1;
 
295
    }
 
296
    if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
288
297
      return(true);
289
 
    }
290
 
  }
291
 
 
292
 
  struct stat stat_info;
293
 
  if (stat(target_path.file_string().c_str(), &stat_info))
294
 
  {
295
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
296
 
    return(true);
297
 
  }
298
 
 
299
 
  // if we are not in slave thread, the cursor must be:
300
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
301
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
302
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
303
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
304
 
  {
305
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
306
 
    return(true);
307
 
  }
308
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
309
 
    is_fifo = 1;
310
 
 
311
 
 
312
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
313
 
  {
314
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
315
 
    return(true);
316
 
  }
317
 
  CopyInfo info;
 
298
  }
 
299
 
 
300
  COPY_INFO info;
318
301
  memset(&info, 0, sizeof(info));
319
302
  info.ignore= ignore;
320
303
  info.handle_duplicates=handle_duplicates;
321
304
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
322
305
 
323
 
  identifier::Schema identifier(*schema);
324
 
  READ_INFO read_info(file, tot_length,
325
 
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
326
 
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
327
 
                      info.escape_char, is_fifo);
 
306
  READ_INFO read_info(file,tot_length,
 
307
                      ex->cs ? ex->cs : thd->variables.collation_database,
 
308
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
 
309
                      info.escape_char, read_file_from_client, is_fifo);
328
310
  if (read_info.error)
329
311
  {
330
312
    if  (file >= 0)
331
 
      internal::my_close(file,MYF(0));                  // no files in net reading
 
313
      my_close(file,MYF(0));                    // no files in net reading
332
314
    return(true);                               // Can't allocate buffers
333
315
  }
334
316
 
335
 
  /*
336
 
   * Per the SQL standard, inserting NULL into a NOT NULL
337
 
   * field requires an error to be thrown.
338
 
   *
339
 
   * @NOTE
340
 
   *
341
 
   * NULL check and handling occurs in field_conv.cc
342
 
   */
343
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
344
 
  session->cuted_fields=0L;
 
317
  if (mysql_bin_log.is_open())
 
318
  {
 
319
    lf_info.thd = thd;
 
320
    lf_info.wrote_create_file = 0;
 
321
    lf_info.last_pos_in_file = HA_POS_ERROR;
 
322
    lf_info.log_delayed= transactional_table;
 
323
    read_info.set_io_cache_arg((void*) &lf_info);
 
324
  }
 
325
 
 
326
  thd->count_cuted_fields= CHECK_FIELD_WARN;            /* calc cuted fields */
 
327
  thd->cuted_fields=0L;
345
328
  /* Skip lines if there is a line terminator */
346
329
  if (ex->line_term->length())
347
330
  {
360
343
    table->next_number_field=table->found_next_number_field;
361
344
    if (ignore ||
362
345
        handle_duplicates == DUP_REPLACE)
363
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
346
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
364
347
    if (handle_duplicates == DUP_REPLACE)
365
 
        table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
366
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
348
        table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
349
    table->file->ha_start_bulk_insert((ha_rows) 0);
367
350
    table->copy_blobs=1;
368
351
 
369
 
    session->setAbortOnWarning(true);
 
352
    thd->abort_on_warning= true;
370
353
 
371
354
    if (!field_term->length() && !enclosed->length())
372
 
      error= read_fixed_length(session, info, table_list, fields_vars,
 
355
      error= read_fixed_length(thd, info, table_list, fields_vars,
373
356
                               set_fields, set_values, read_info,
374
357
                               skip_lines, ignore);
375
358
    else
376
 
      error= read_sep_field(session, info, table_list, fields_vars,
 
359
      error= read_sep_field(thd, info, table_list, fields_vars,
377
360
                            set_fields, set_values, read_info,
378
361
                            *enclosed, skip_lines, ignore);
379
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
362
    if (table->file->ha_end_bulk_insert() && !error)
380
363
    {
381
 
      table->print_error(errno, MYF(0));
 
364
      table->file->print_error(my_errno, MYF(0));
382
365
      error= 1;
383
366
    }
384
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
385
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
367
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
368
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
386
369
    table->next_number_field=0;
387
370
  }
388
371
  if (file >= 0)
389
 
    internal::my_close(file,MYF(0));
 
372
    my_close(file,MYF(0));
390
373
  free_blobs(table);                            /* if pack_blob was used */
391
374
  table->copy_blobs=0;
392
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
393
 
  /*
 
375
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
 
376
  /* 
394
377
     simulated killing in the middle of per-row loop
395
378
     must be effective for binlogging
396
379
  */
397
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
380
  killed_status= (error == 0)? THD::NOT_KILLED : thd->killed;
398
381
  if (error)
399
382
  {
 
383
    if (read_file_from_client)
 
384
      while (!read_info.next_line())
 
385
        ;
 
386
 
 
387
    if (mysql_bin_log.is_open())
 
388
    {
 
389
      {
 
390
        /*
 
391
          Make sure last block (the one which caused the error) gets
 
392
          logged.  This is needed because otherwise after write of (to
 
393
          the binlog, not to read_info (which is a cache))
 
394
          Delete_file_log_event the bad block will remain in read_info
 
395
          (because pre_read is not called at the end of the last
 
396
          block; remember pre_read is called whenever a new block is
 
397
          read from disk).  At the end of mysql_load(), the destructor
 
398
          of read_info will call end_io_cache() which will flush
 
399
          read_info, so we will finally have this in the binlog:
 
400
 
 
401
          Append_block # The last successfull block
 
402
          Delete_file
 
403
          Append_block # The failing block
 
404
          which is nonsense.
 
405
          Or could also be (for a small file)
 
406
          Create_file  # The failing block
 
407
          which is nonsense (Delete_file is not written in this case, because:
 
408
          Create_file has not been written, so Delete_file is not written, then
 
409
          when read_info is destroyed end_io_cache() is called which writes
 
410
          Create_file.
 
411
        */
 
412
        read_info.end_io_cache();
 
413
        /* If the file was not empty, wrote_create_file is true */
 
414
        if (lf_info.wrote_create_file)
 
415
        {
 
416
          if (thd->transaction.stmt.modified_non_trans_table)
 
417
            write_execute_load_query_log_event(thd, handle_duplicates,
 
418
                                               ignore, transactional_table,
 
419
                                               killed_status);
 
420
          else
 
421
          {
 
422
            Delete_file_log_event d(thd, db, transactional_table);
 
423
            d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
424
            mysql_bin_log.write(&d);
 
425
          }
 
426
        }
 
427
      }
 
428
    }
400
429
    error= -1;                          // Error on read
401
430
    goto err;
402
431
  }
403
 
 
404
 
  char msg[FN_REFLEN];
405
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
406
 
           (info.records - info.copied), session->cuted_fields);
407
 
 
408
 
  if (session->transaction.stmt.hasModifiedNonTransData())
409
 
    session->transaction.all.markModifiedNonTransData();
 
432
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
433
          (uint32_t) (info.records - info.copied), (uint32_t) thd->cuted_fields);
 
434
 
 
435
  if (thd->transaction.stmt.modified_non_trans_table)
 
436
    thd->transaction.all.modified_non_trans_table= true;
 
437
 
 
438
  if (mysql_bin_log.is_open())
 
439
  {
 
440
    /*
 
441
      We need to do the job that is normally done inside
 
442
      binlog_query() here, which is to ensure that the pending event
 
443
      is written before tables are unlocked and before any other
 
444
      events are written.  We also need to update the table map
 
445
      version for the binary log to mark that table maps are invalid
 
446
      after this point.
 
447
     */
 
448
    if (thd->current_stmt_binlog_row_based)
 
449
      thd->binlog_flush_pending_rows_event(true);
 
450
    else
 
451
    {
 
452
      /*
 
453
        As already explained above, we need to call end_io_cache() or the last
 
454
        block will be logged only after Execute_load_query_log_event (which is
 
455
        wrong), when read_info is destroyed.
 
456
      */
 
457
      read_info.end_io_cache();
 
458
      if (lf_info.wrote_create_file)
 
459
      {
 
460
        write_execute_load_query_log_event(thd, handle_duplicates, ignore,
 
461
                                           transactional_table,killed_status);
 
462
      }
 
463
    }
 
464
  }
410
465
 
411
466
  /* ok to client sent only after binlog write and engine commit */
412
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
467
  my_ok(thd, info.copied + info.deleted, 0L, name);
413
468
err:
414
469
  assert(transactional_table || !(info.copied || info.deleted) ||
415
 
              session->transaction.stmt.hasModifiedNonTransData());
416
 
  table->cursor->ha_release_auto_increment();
 
470
              thd->transaction.stmt.modified_non_trans_table);
 
471
  table->file->ha_release_auto_increment();
417
472
  table->auto_increment_field_not_null= false;
418
 
  session->setAbortOnWarning(false);
419
 
 
 
473
  thd->abort_on_warning= 0;
420
474
  return(error);
421
475
}
422
476
 
423
477
 
 
478
/* Not a very useful function; just to avoid duplication of code */
 
479
static bool write_execute_load_query_log_event(THD *thd,
 
480
                                               bool duplicates, bool ignore,
 
481
                                               bool transactional_table,
 
482
                                               THD::killed_state killed_err_arg)
 
483
{
 
484
  Execute_load_query_log_event
 
485
    e(thd, thd->query, thd->query_length,
 
486
      (char*)thd->lex->fname_start - (char*)thd->query,
 
487
      (char*)thd->lex->fname_end - (char*)thd->query,
 
488
      (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
 
489
      (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
 
490
      transactional_table, false, killed_err_arg);
 
491
  e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
492
  return mysql_bin_log.write(&e);
 
493
}
 
494
 
 
495
 
424
496
/****************************************************************************
425
497
** Read of rows of fixed size + optional garage + optonal newline
426
498
****************************************************************************/
427
499
 
428
500
static int
429
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
501
read_fixed_length(THD *thd, COPY_INFO &info, TableList *table_list,
430
502
                  List<Item> &fields_vars, List<Item> &set_fields,
431
503
                  List<Item> &set_values, READ_INFO &read_info,
432
504
                  uint32_t skip_lines, bool ignore_check_option_errors)
433
505
{
434
 
  List<Item>::iterator it(fields_vars.begin());
 
506
  List_iterator_fast<Item> it(fields_vars);
435
507
  Item_field *sql_field;
436
508
  Table *table= table_list->table;
437
509
  uint64_t id;
438
510
  bool err;
439
511
 
440
512
  id= 0;
441
 
 
 
513
 
442
514
  while (!read_info.read_fixed_length())
443
515
  {
444
 
    if (session->getKilled())
 
516
    if (thd->killed)
445
517
    {
446
 
      session->send_kill_message();
 
518
      thd->send_kill_message();
447
519
      return(1);
448
520
    }
449
521
    if (skip_lines)
457
529
      skip_lines--;
458
530
      continue;
459
531
    }
460
 
    it= fields_vars.begin();
 
532
    it.rewind();
461
533
    unsigned char *pos=read_info.row_start;
462
 
#ifdef HAVE_VALGRIND
 
534
#ifdef HAVE_purify
463
535
    read_info.row_end[0]=0;
464
536
#endif
465
537
 
466
 
    table->restoreRecordAsDefault();
 
538
    restore_record(table, s->default_values);
467
539
    /*
468
540
      There is no variables in fields_vars list in this format so
469
541
      this conversion is safe.
470
542
    */
471
543
    while ((sql_field= (Item_field*) it++))
472
544
    {
473
 
      Field *field= sql_field->field;
 
545
      Field *field= sql_field->field;                  
474
546
      if (field == table->next_number_field)
475
547
        table->auto_increment_field_not_null= true;
476
548
      /*
482
554
 
483
555
      if (pos == read_info.row_end)
484
556
      {
485
 
        session->cuted_fields++;                        /* Not enough fields */
486
 
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
487
 
                            ER_WARN_TOO_FEW_RECORDS,
488
 
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
489
 
 
490
 
        if (not field->maybe_null() and field->is_timestamp())
491
 
            ((field::Epoch::pointer) field)->set_time();
 
557
        thd->cuted_fields++;                    /* Not enough fields */
 
558
        push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
559
                            ER_WARN_TOO_FEW_RECORDS, 
 
560
                            ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
 
561
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
562
            ((Field_timestamp*) field)->set_time();
492
563
      }
493
564
      else
494
565
      {
495
566
        uint32_t length;
496
567
        unsigned char save_chr;
497
 
        if ((length=(uint32_t) (read_info.row_end-pos)) >
 
568
        if ((length=(uint) (read_info.row_end-pos)) >
498
569
            field->field_length)
499
 
        {
500
570
          length=field->field_length;
501
 
        }
502
 
        save_chr=pos[length];
503
 
        pos[length]='\0'; // Add temp null terminator for store()
 
571
        save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
504
572
        field->store((char*) pos,length,read_info.read_charset);
505
573
        pos[length]=save_chr;
506
574
        if ((pos+=length) > read_info.row_end)
509
577
    }
510
578
    if (pos != read_info.row_end)
511
579
    {
512
 
      session->cuted_fields++;                  /* To long row */
513
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
514
 
                          ER_WARN_TOO_MANY_RECORDS,
515
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
 
580
      thd->cuted_fields++;                      /* To long row */
 
581
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
582
                          ER_WARN_TOO_MANY_RECORDS, 
 
583
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
516
584
    }
517
585
 
518
 
    if (session->getKilled() ||
519
 
        fill_record(session, set_fields, set_values,
 
586
    if (thd->killed ||
 
587
        fill_record(thd, set_fields, set_values,
520
588
                    ignore_check_option_errors))
521
589
      return(1);
522
590
 
523
 
    err= write_record(session, table, &info);
 
591
    err= write_record(thd, table, &info);
524
592
    table->auto_increment_field_not_null= false;
525
593
    if (err)
526
594
      return(1);
527
 
 
 
595
   
528
596
    /*
529
597
      We don't need to reset auto-increment field since we are restoring
530
598
      its default value at the beginning of each loop iteration.
533
601
      break;
534
602
    if (read_info.line_cuted)
535
603
    {
536
 
      session->cuted_fields++;                  /* To long row */
537
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
538
 
                          ER_WARN_TOO_MANY_RECORDS,
539
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
 
604
      thd->cuted_fields++;                      /* To long row */
 
605
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
606
                          ER_WARN_TOO_MANY_RECORDS, 
 
607
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
540
608
    }
541
 
    session->row_count++;
 
609
    thd->row_count++;
542
610
  }
543
611
  return(test(read_info.error));
544
612
}
546
614
 
547
615
 
548
616
static int
549
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
617
read_sep_field(THD *thd, COPY_INFO &info, TableList *table_list,
550
618
               List<Item> &fields_vars, List<Item> &set_fields,
551
619
               List<Item> &set_values, READ_INFO &read_info,
552
620
               String &enclosed, uint32_t skip_lines,
553
621
               bool ignore_check_option_errors)
554
622
{
555
 
  List<Item>::iterator it(fields_vars.begin());
 
623
  List_iterator_fast<Item> it(fields_vars);
556
624
  Item *item;
557
625
  Table *table= table_list->table;
558
626
  uint32_t enclosed_length;
562
630
  enclosed_length=enclosed.length();
563
631
  id= 0;
564
632
 
565
 
  for (;;it= fields_vars.begin())
 
633
  for (;;it.rewind())
566
634
  {
567
 
    if (session->getKilled())
 
635
    if (thd->killed)
568
636
    {
569
 
      session->send_kill_message();
 
637
      thd->send_kill_message();
570
638
      return(1);
571
639
    }
572
640
 
573
 
    table->restoreRecordAsDefault();
 
641
    restore_record(table, s->default_values);
574
642
 
575
643
    while ((item= it++))
576
644
    {
586
654
        continue;
587
655
 
588
656
      pos=read_info.row_start;
589
 
      length=(uint32_t) (read_info.row_end-pos);
 
657
      length=(uint) (read_info.row_end-pos);
590
658
 
591
659
      real_item= item->real_item();
592
660
 
600
668
          if (field->reset())
601
669
          {
602
670
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
603
 
                     session->row_count);
 
671
                     thd->row_count);
604
672
            return(1);
605
673
          }
606
674
          field->set_null();
607
 
          if (not field->maybe_null())
 
675
          if (!field->maybe_null())
608
676
          {
609
 
            if (field->is_timestamp())
610
 
            {
611
 
              ((field::Epoch::pointer) field)->set_time();
612
 
            }
 
677
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
678
              ((Field_timestamp*) field)->set_time();
613
679
            else if (field != table->next_number_field)
614
 
            {
615
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
616
 
            }
 
680
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
681
                                 ER_WARN_NULL_TO_NOTNULL, 1);
617
682
          }
618
683
        }
619
684
        else if (item->type() == Item::STRING_ITEM)
659
724
    }
660
725
    if (item)
661
726
    {
662
 
      /* Have not read any field, thus input cursor is simply ended */
 
727
      /* Have not read any field, thus input file is simply ended */
663
728
      if (item == fields_vars.head())
664
729
        break;
665
730
      for (; item ; item= it++)
671
736
          if (field->reset())
672
737
          {
673
738
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
674
 
                     session->row_count);
 
739
                     thd->row_count);
675
740
            return(1);
676
741
          }
677
 
          if (not field->maybe_null() and field->is_timestamp())
678
 
              ((field::Epoch::pointer) field)->set_time();
 
742
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
743
              ((Field_timestamp*) field)->set_time();
679
744
          /*
680
745
            QQ: We probably should not throw warning for each field.
681
746
            But how about intention to always have the same number
682
 
            of warnings in Session::cuted_fields (and get rid of cuted_fields
 
747
            of warnings in THD::cuted_fields (and get rid of cuted_fields
683
748
            in the end ?)
684
749
          */
685
 
          session->cuted_fields++;
686
 
          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
750
          thd->cuted_fields++;
 
751
          push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
687
752
                              ER_WARN_TOO_FEW_RECORDS,
688
 
                              ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
 
753
                              ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
689
754
        }
690
755
        else if (item->type() == Item::STRING_ITEM)
691
756
        {
700
765
      }
701
766
    }
702
767
 
703
 
    if (session->getKilled() ||
704
 
        fill_record(session, set_fields, set_values,
 
768
    if (thd->killed ||
 
769
        fill_record(thd, set_fields, set_values,
705
770
                    ignore_check_option_errors))
706
771
      return(1);
707
772
 
708
 
    err= write_record(session, table, &info);
 
773
    err= write_record(thd, table, &info);
709
774
    table->auto_increment_field_not_null= false;
710
775
    if (err)
711
776
      return(1);
717
782
      break;
718
783
    if (read_info.line_cuted)
719
784
    {
720
 
      session->cuted_fields++;                  /* To long row */
721
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
722
 
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
723
 
                          session->row_count);
724
 
      if (session->getKilled())
 
785
      thd->cuted_fields++;                      /* To long row */
 
786
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
787
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), 
 
788
                          thd->row_count);   
 
789
      if (thd->killed)
725
790
        return(1);
726
791
    }
727
 
    session->row_count++;
 
792
    thd->row_count++;
728
793
  }
729
794
  return(test(read_info.error));
730
795
}
742
807
  case 'r': return '\r';
743
808
  case 'b': return '\b';
744
809
  case '0': return 0;                           // Ascii null
745
 
  case 'Z': return '\032';                      // Win32 end of cursor
 
810
  case 'Z': return '\032';                      // Win32 end of file
746
811
  case 'N': found_null=1;
747
812
 
748
813
    /* fall through */
757
822
*/
758
823
 
759
824
 
760
 
READ_INFO::READ_INFO(int file_par, size_t tot_length,
761
 
                     const CHARSET_INFO * const cs,
 
825
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
762
826
                     String &field_term, String &line_start, String &line_term,
763
 
                     String &enclosed_par, int escape, bool is_fifo)
764
 
  :cursor(file_par),escape_char(escape)
 
827
                     String &enclosed_par, int escape, bool get_it_from_net,
 
828
                     bool is_fifo)
 
829
  :file(file_par),escape_char(escape)
765
830
{
766
831
  read_charset= cs;
767
832
  field_term_ptr=(char*) field_term.ptr();
795
860
 
796
861
 
797
862
  /* Set of a stack for unget if long terminators */
798
 
  size_t length= max(field_term_length,line_term_length)+1;
799
 
  set_if_bigger(length, line_start.length());
800
 
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
 
863
  uint32_t length=cmax(field_term_length,line_term_length)+1;
 
864
  set_if_bigger(length,line_start.length());
 
865
  stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
801
866
 
802
 
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
803
 
    error=1;
 
867
  if (!(buffer=(unsigned char*) my_malloc(buff_length+1,MYF(0))))
 
868
    error=1; /* purecov: inspected */
804
869
  else
805
870
  {
806
871
    end_of_buff=buffer+buff_length;
807
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
808
 
                            (false) ? internal::READ_NET :
809
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
810
 
                            MYF(MY_WME)))
 
872
    if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
 
873
                      (get_it_from_net) ? READ_NET :
 
874
                      (is_fifo ? READ_FIFO : READ_CACHE),0L,1,
 
875
                      MYF(MY_WME)))
811
876
    {
812
 
      free((unsigned char*) buffer);
 
877
      free((unsigned char*) buffer); /* purecov: inspected */
813
878
      error=1;
814
879
    }
815
880
    else
820
885
        manual assignment
821
886
      */
822
887
      need_end_io_cache = 1;
 
888
 
 
889
      if (get_it_from_net)
 
890
        cache.read_function = _my_b_net_read;
 
891
 
 
892
      if (mysql_bin_log.is_open())
 
893
        cache.pre_read = cache.pre_close =
 
894
          (IO_CACHE_CALLBACK) log_loaded_block;
823
895
    }
824
896
  }
825
897
}
830
902
  if (!error)
831
903
  {
832
904
    if (need_end_io_cache)
833
 
      cache.end_io_cache();
834
 
    free(buffer);
 
905
      ::end_io_cache(&cache);
 
906
    free((unsigned char*) buffer);
835
907
    error=1;
836
908
  }
837
909
}
900
972
    while ( to < end_of_buff)
901
973
    {
902
974
      chr = GET;
 
975
#ifdef USE_MB
903
976
      if ((my_mbcharlen(read_charset, chr) > 1) &&
904
977
          to+my_mbcharlen(read_charset, chr) <= end_of_buff)
905
978
      {
906
 
        unsigned char* p = (unsigned char*)to;
907
 
        *to++ = chr;
908
 
        int ml = my_mbcharlen(read_charset, chr);
909
 
        int i;
910
 
        for (i=1; i<ml; i++) {
911
 
          chr = GET;
912
 
          if (chr == my_b_EOF)
913
 
            goto found_eof;
914
 
          *to++ = chr;
915
 
        }
916
 
        if (my_ismbchar(read_charset,
917
 
              (const char *)p,
918
 
              (const char *)to))
919
 
          continue;
920
 
        for (i=0; i<ml; i++)
921
 
          PUSH((unsigned char) *--to);
922
 
        chr = GET;
 
979
          unsigned char* p = (unsigned char*)to;
 
980
          *to++ = chr;
 
981
          int ml = my_mbcharlen(read_charset, chr);
 
982
          int i;
 
983
          for (i=1; i<ml; i++) {
 
984
              chr = GET;
 
985
              if (chr == my_b_EOF)
 
986
                  goto found_eof;
 
987
              *to++ = chr;
 
988
          }
 
989
          if (my_ismbchar(read_charset,
 
990
                          (const char *)p,
 
991
                          (const char *)to))
 
992
            continue;
 
993
          for (i=0; i<ml; i++)
 
994
            PUSH((unsigned char) *--to);
 
995
          chr = GET;
923
996
      }
 
997
#endif
924
998
      if (chr == my_b_EOF)
925
 
        goto found_eof;
 
999
        goto found_eof;
926
1000
      if (chr == escape_char)
927
1001
      {
928
 
        if ((chr=GET) == my_b_EOF)
929
 
        {
930
 
          *to++= (unsigned char) escape_char;
931
 
          goto found_eof;
932
 
        }
 
1002
        if ((chr=GET) == my_b_EOF)
 
1003
        {
 
1004
          *to++= (unsigned char) escape_char;
 
1005
          goto found_eof;
 
1006
        }
933
1007
        /*
934
1008
          When escape_char == enclosed_char, we treat it like we do for
935
1009
          handling quotes in SQL parsing -- you can double-up the
948
1022
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
949
1023
      if (chr == line_term_char)
950
1024
#else
951
 
        if (chr == line_term_char && found_enclosed_char == INT_MAX)
 
1025
      if (chr == line_term_char && found_enclosed_char == INT_MAX)
952
1026
#endif
953
 
        {
954
 
          if (terminator(line_term_ptr,line_term_length))
955
 
          {                                     // Maybe unexpected linefeed
956
 
            enclosed=0;
957
 
            found_end_of_line=1;
958
 
            row_start=buffer;
959
 
            row_end=  to;
960
 
            return 0;
961
 
          }
962
 
        }
 
1027
      {
 
1028
        if (terminator(line_term_ptr,line_term_length))
 
1029
        {                                       // Maybe unexpected linefeed
 
1030
          enclosed=0;
 
1031
          found_end_of_line=1;
 
1032
          row_start=buffer;
 
1033
          row_end=  to;
 
1034
          return 0;
 
1035
        }
 
1036
      }
963
1037
      if (chr == found_enclosed_char)
964
1038
      {
965
 
        if ((chr=GET) == found_enclosed_char)
966
 
        {                                       // Remove dupplicated
967
 
          *to++ = (unsigned char) chr;
968
 
          continue;
969
 
        }
970
 
        // End of enclosed field if followed by field_term or line_term
971
 
        if (chr == my_b_EOF ||
972
 
            (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
973
 
        {                                       // Maybe unexpected linefeed
974
 
          enclosed=1;
975
 
          found_end_of_line=1;
976
 
          row_start=buffer+1;
977
 
          row_end=  to;
978
 
          return 0;
979
 
        }
980
 
        if (chr == field_term_char &&
981
 
            terminator(field_term_ptr,field_term_length))
982
 
        {
983
 
          enclosed=1;
984
 
          row_start=buffer+1;
985
 
          row_end=  to;
986
 
          return 0;
987
 
        }
988
 
        /*
989
 
           The string didn't terminate yet.
990
 
           Store back next character for the loop
991
 
         */
992
 
        PUSH(chr);
993
 
        /* copy the found term character to 'to' */
994
 
        chr= found_enclosed_char;
 
1039
        if ((chr=GET) == found_enclosed_char)
 
1040
        {                                       // Remove dupplicated
 
1041
          *to++ = (unsigned char) chr;
 
1042
          continue;
 
1043
        }
 
1044
        // End of enclosed field if followed by field_term or line_term
 
1045
        if (chr == my_b_EOF ||
 
1046
            (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
 
1047
        {                                       // Maybe unexpected linefeed
 
1048
          enclosed=1;
 
1049
          found_end_of_line=1;
 
1050
          row_start=buffer+1;
 
1051
          row_end=  to;
 
1052
          return 0;
 
1053
        }
 
1054
        if (chr == field_term_char &&
 
1055
            terminator(field_term_ptr,field_term_length))
 
1056
        {
 
1057
          enclosed=1;
 
1058
          row_start=buffer+1;
 
1059
          row_end=  to;
 
1060
          return 0;
 
1061
        }
 
1062
        /*
 
1063
          The string didn't terminate yet.
 
1064
          Store back next character for the loop
 
1065
        */
 
1066
        PUSH(chr);
 
1067
        /* copy the found term character to 'to' */
 
1068
        chr= found_enclosed_char;
995
1069
      }
996
1070
      else if (chr == field_term_char && found_enclosed_char == INT_MAX)
997
1071
      {
998
 
        if (terminator(field_term_ptr,field_term_length))
999
 
        {
1000
 
          enclosed=0;
1001
 
          row_start=buffer;
1002
 
          row_end=  to;
1003
 
          return 0;
1004
 
        }
 
1072
        if (terminator(field_term_ptr,field_term_length))
 
1073
        {
 
1074
          enclosed=0;
 
1075
          row_start=buffer;
 
1076
          row_end=  to;
 
1077
          return 0;
 
1078
        }
1005
1079
      }
1006
1080
      *to++ = (unsigned char) chr;
1007
1081
    }
1008
1082
    /*
1009
 
     ** We come here if buffer is too small. Enlarge it and continue
1010
 
     */
1011
 
    if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
 
1083
    ** We come here if buffer is too small. Enlarge it and continue
 
1084
    */
 
1085
    if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
 
1086
                                        MYF(MY_WME))))
1012
1087
      return (error=1);
1013
1088
    to=new_buffer + (to-buffer);
1014
1089
    buffer=new_buffer;
1029
1104
 
1030
1105
  NOTES
1031
1106
    The row may not be fixed size on disk if there are escape
1032
 
    characters in the cursor.
 
1107
    characters in the file.
1033
1108
 
1034
1109
  IMPLEMENTATION NOTE
1035
1110
    One can't use fixed length with multi-byte charset **
1105
1180
  for (;;)
1106
1181
  {
1107
1182
    int chr = GET;
1108
 
    if (my_mbcharlen(read_charset, chr) > 1)
1109
 
    {
1110
 
      for (uint32_t i=1;
1111
 
          chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1112
 
          i++)
1113
 
        chr = GET;
1114
 
      if (chr == escape_char)
1115
 
        continue;
1116
 
    }
1117
 
    if (chr == my_b_EOF)
1118
 
    {
 
1183
#ifdef USE_MB
 
1184
   if (my_mbcharlen(read_charset, chr) > 1)
 
1185
   {
 
1186
       for (uint32_t i=1;
 
1187
            chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
 
1188
            i++)
 
1189
           chr = GET;
 
1190
       if (chr == escape_char)
 
1191
           continue;
 
1192
   }
 
1193
#endif
 
1194
   if (chr == my_b_EOF)
 
1195
   {
1119
1196
      eof=1;
1120
1197
      return 1;
1121
1198
    }
1123
1200
    {
1124
1201
      line_cuted=1;
1125
1202
      if (GET == my_b_EOF)
1126
 
        return 1;
 
1203
        return 1;
1127
1204
      continue;
1128
1205
    }
1129
1206
    if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
1162
1239
}
1163
1240
 
1164
1241
 
1165
 
} /* namespace drizzled */