~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Eric Herman
  • Date: 2008-12-07 15:29:44 UTC
  • mto: (656.1.14 devel)
  • mto: This revision was merged to the branch mainline in revision 670.
  • Revision ID: eric@mysql.com-20081207152944-cq1nx1cyi0huqj0f
Added pointer to online version of the FAQ

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
 
#include "config.h"
 
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_load.h>
 
21
#include <drizzled/replication/replication.h>
21
22
#include <drizzled/error.h>
22
23
#include <drizzled/data_home.h>
23
24
#include <drizzled/session.h>
24
25
#include <drizzled/sql_base.h>
25
26
#include <drizzled/field/timestamp.h>
26
 
#include "drizzled/internal/my_sys.h"
27
 
#include "drizzled/internal/iocache.h"
28
 
#include <drizzled/db.h>
29
 
 
30
 
#include <sys/stat.h>
31
 
#include <fcntl.h>
32
 
#include <algorithm>
33
 
#include <climits>
34
 
#include <boost/filesystem.hpp>
35
 
 
36
 
namespace fs=boost::filesystem;
37
 
using namespace std;
38
 
namespace drizzled
39
 
{
40
27
 
41
28
class READ_INFO {
42
 
  int   cursor;
43
 
  unsigned char *buffer;                /* Buffer for read text */
44
 
  unsigned char *end_of_buff;           /* Data in bufferts ends here */
45
 
  size_t buff_length;                   /* Length of buffert */
46
 
  size_t max_length;                    /* Max length of row */
 
29
  File  file;
 
30
  unsigned char *buffer,                        /* Buffer for read text */
 
31
        *end_of_buff;                   /* Data in bufferts ends here */
 
32
  uint  buff_length,                    /* Length of buffert */
 
33
        max_length;                     /* Max length of row */
47
34
  char  *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
48
35
  uint  field_term_length,line_term_length,enclosed_length;
49
36
  int   field_term_char,line_term_char,enclosed_char,escape_char;
50
37
  int   *stack,*stack_pos;
51
38
  bool  found_end_of_line,start_of_line,eof;
52
39
  bool  need_end_io_cache;
53
 
  internal::IO_CACHE cache;
 
40
  IO_CACHE cache;
 
41
  NET *io_net;
54
42
 
55
43
public:
56
44
  bool error,line_cuted,found_null,enclosed;
58
46
        *row_end;                       /* Found row ends here */
59
47
  const CHARSET_INFO *read_charset;
60
48
 
61
 
  READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
 
49
  READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
62
50
            String &field_term,String &line_start,String &line_term,
63
 
            String &enclosed,int escape, bool is_fifo);
 
51
            String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
64
52
  ~READ_INFO();
65
53
  int read_field();
66
54
  int read_fixed_length(void);
75
63
  */
76
64
  void end_io_cache()
77
65
  {
78
 
    cache.end_io_cache();
 
66
    ::end_io_cache(&cache);
79
67
    need_end_io_cache = 0;
80
68
  }
81
69
 
87
75
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
88
76
};
89
77
 
90
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
78
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
79
                             List<Item> &fields_vars, List<Item> &set_fields,
92
80
                             List<Item> &set_values, READ_INFO &read_info,
93
81
                             uint32_t skip_lines,
94
82
                             bool ignore_check_option_errors);
95
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
83
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
84
                          List<Item> &fields_vars, List<Item> &set_fields,
97
85
                          List<Item> &set_values, READ_INFO &read_info,
98
86
                          String &enclosed, uint32_t skip_lines,
105
93
  SYNOPSYS
106
94
    mysql_load()
107
95
      session - current thread
108
 
      ex  - file_exchange object representing source cursor and its parsing rules
 
96
      ex  - sql_exchange object representing source file and its parsing rules
109
97
      table_list  - list of tables to which we are loading data
110
98
      fields_vars - list of fields and variables to which we read
111
 
                    data from cursor
 
99
                    data from file
112
100
      set_fields  - list of fields mentioned in set clause
113
101
      set_values  - expressions to assign to fields in previous list
114
102
      handle_duplicates - indicates whenever we should emit error or
115
103
                          replace row if we will meet duplicates.
116
104
      ignore -          - indicates whenever we should ignore duplicates
 
105
      read_file_from_client - is this LOAD DATA LOCAL ?
117
106
 
118
107
  RETURN VALUES
119
108
    true - error / false - success
120
109
*/
121
110
 
122
 
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
 
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
123
112
                List<Item> &fields_vars, List<Item> &set_fields,
124
113
                List<Item> &set_values,
125
 
                enum enum_duplicates handle_duplicates, bool ignore)
 
114
                enum enum_duplicates handle_duplicates, bool ignore,
 
115
                bool read_file_from_client)
126
116
{
127
 
  int file;
 
117
  char name[FN_REFLEN];
 
118
  File file;
128
119
  Table *table= NULL;
129
120
  int error;
130
121
  String *field_term=ex->field_term,*escaped=ex->escaped;
131
122
  String *enclosed=ex->enclosed;
132
123
  bool is_fifo=0;
133
 
 
134
 
  assert(table_list->getSchemaName()); // This should never be null
135
 
 
 
124
  char *db= table_list->db;                     // This is never null
 
125
  assert(db);
136
126
  /*
137
 
    If path for cursor is not defined, we will use the current database.
 
127
    If path for file is not defined, we will use the current database.
138
128
    If this is not set, we will use the directory where the table to be
139
129
    loaded is located
140
130
  */
141
 
  util::string::const_shared_ptr schema(session->schema());
142
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
131
  char *tdb= session->db ? session->db : db;            // Result is never null
143
132
  assert(tdb);
144
133
  uint32_t skip_lines= ex->skip_lines;
145
134
  bool transactional_table;
146
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
135
  Session::killed_state killed_status= Session::NOT_KILLED;
147
136
 
148
 
  /* Escape and enclosed character may be a utf8 4-byte character */
149
 
  if (escaped->length() > 4 || enclosed->length() > 4)
 
137
  if (escaped->length() > 1 || enclosed->length() > 1)
150
138
  {
151
 
    my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
 
139
    my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
 
140
               MYF(0));
152
141
    return(true);
153
142
  }
154
 
 
155
 
  if (session->openTablesLock(table_list))
 
143
  if (open_and_lock_tables(session, table_list))
156
144
    return(true);
157
 
 
158
145
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
159
146
                                    &session->lex->select_lex.top_join_list,
160
147
                                    table_list,
169
156
    table is marked to be 'used for insert' in which case we should never
170
157
    mark this table as 'const table' (ie, one that has only one row).
171
158
  */
172
 
  if (unique_table(table_list, table_list->next_global))
 
159
  if (unique_table(session, table_list, table_list->next_global, 0))
173
160
  {
174
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
161
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
162
    return(true);
176
163
  }
177
164
 
178
165
  table= table_list->table;
179
 
  transactional_table= table->cursor->has_transactions();
 
166
  transactional_table= table->file->has_transactions();
180
167
 
181
168
  if (!fields_vars.elements)
182
169
  {
183
170
    Field **field;
184
 
    for (field= table->getFields(); *field ; field++)
 
171
    for (field=table->field; *field ; field++)
185
172
      fields_vars.push_back(new Item_field(*field));
186
 
    table->setWriteSet();
 
173
    bitmap_set_all(table->write_set);
187
174
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
188
175
    /*
189
176
      Let us also prepare SET clause, altough it is probably empty
206
193
    */
207
194
    if (table->timestamp_field)
208
195
    {
209
 
      if (table->isWriteSet(table->timestamp_field->position()))
210
 
      {
 
196
      if (bitmap_is_set(table->write_set,
 
197
                        table->timestamp_field->field_index))
211
198
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
212
 
      }
213
199
      else
214
200
      {
215
 
        table->setWriteSet(table->timestamp_field->position());
 
201
        bitmap_set_bit(table->write_set,
 
202
                       table->timestamp_field->field_index);
216
203
      }
217
204
    }
218
205
    /* Fix the expressions in SET clause */
222
209
 
223
210
  table->mark_columns_needed_for_insert();
224
211
 
225
 
  size_t tot_length=0;
 
212
  uint32_t tot_length=0;
226
213
  bool use_blobs= 0, use_vars= 0;
227
214
  List_iterator_fast<Item> it(fields_vars);
228
215
  Item *item;
257
244
    return(true);
258
245
  }
259
246
 
260
 
  fs::path to_file(ex->file_name);
261
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
 
  if (not to_file.has_root_directory())
 
247
  /* We can't give an error in the middle when using LOCAL files */
 
248
  if (read_file_from_client && handle_duplicates == DUP_ERROR)
 
249
    ignore= 1;
 
250
 
 
251
  if (read_file_from_client)
263
252
  {
264
 
    int count_elements= 0;
265
 
    for (fs::path::iterator iter= to_file.begin();
266
 
         iter != to_file.end();
267
 
         ++iter, ++count_elements)
268
 
    { }
269
 
 
270
 
    if (count_elements == 1)
271
 
    {
272
 
      target_path /= tdb;
273
 
    }
274
 
    target_path /= to_file;
 
253
    (void)net_request_file(&session->net,ex->file_name);
 
254
    file = -1;
275
255
  }
276
256
  else
277
257
  {
278
 
    target_path= to_file;
279
 
  }
280
 
 
281
 
  if (not secure_file_priv.string().empty())
282
 
  {
283
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
284
 
    {
285
 
      /* Read only allowed from within dir specified by secure_file_priv */
286
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
258
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
259
    ex->file_name+=dirname_length(ex->file_name);
 
260
#endif
 
261
    if (!dirname_length(ex->file_name))
 
262
    {
 
263
      strcpy(name, drizzle_real_data_home);
 
264
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
265
      (void) fn_format(name, ex->file_name, name, "",
 
266
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
267
    }
 
268
    else
 
269
    {
 
270
      (void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
 
271
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
272
 
 
273
      if (opt_secure_file_priv &&
 
274
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
275
      {
 
276
        /* Read only allowed from within dir specified by secure_file_priv */
 
277
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
278
        return(true);
 
279
      }
 
280
 
 
281
      struct stat stat_info;
 
282
      if (stat(name,&stat_info))
 
283
        return(true);
 
284
 
 
285
      // if we are not in slave thread, the file must be:
 
286
      if (!session->slave_thread &&
 
287
          !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
288
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
289
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
290
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
291
      {
 
292
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
293
        return(true);
 
294
      }
 
295
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
296
        is_fifo = 1;
 
297
    }
 
298
    if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
287
299
      return(true);
288
 
    }
289
 
  }
290
 
 
291
 
  struct stat stat_info;
292
 
  if (stat(target_path.file_string().c_str(), &stat_info))
293
 
  {
294
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
295
 
    return(true);
296
 
  }
297
 
 
298
 
  // if we are not in slave thread, the cursor must be:
299
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
300
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
303
 
  {
304
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
305
 
    return(true);
306
 
  }
307
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
308
 
    is_fifo = 1;
309
 
 
310
 
 
311
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
312
 
  {
313
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
314
 
    return(true);
315
 
  }
316
 
  CopyInfo info;
 
300
  }
 
301
 
 
302
  COPY_INFO info;
317
303
  memset(&info, 0, sizeof(info));
318
304
  info.ignore= ignore;
319
305
  info.handle_duplicates=handle_duplicates;
320
306
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
321
307
 
322
 
  SchemaIdentifier identifier(*schema);
323
 
  READ_INFO read_info(file, tot_length,
324
 
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
 
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
326
 
                      info.escape_char, is_fifo);
 
308
  READ_INFO read_info(file,tot_length,
 
309
                      ex->cs ? ex->cs : session->variables.collation_database,
 
310
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
 
311
                      info.escape_char, read_file_from_client, is_fifo);
327
312
  if (read_info.error)
328
313
  {
329
314
    if  (file >= 0)
330
 
      internal::my_close(file,MYF(0));                  // no files in net reading
 
315
      my_close(file,MYF(0));                    // no files in net reading
331
316
    return(true);                               // Can't allocate buffers
332
317
  }
333
318
 
334
 
  /*
335
 
   * Per the SQL standard, inserting NULL into a NOT NULL
336
 
   * field requires an error to be thrown.
337
 
   *
338
 
   * @NOTE
339
 
   *
340
 
   * NULL check and handling occurs in field_conv.cc
341
 
   */
342
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
319
  session->count_cuted_fields= CHECK_FIELD_WARN;                /* calc cuted fields */
343
320
  session->cuted_fields=0L;
344
321
  /* Skip lines if there is a line terminator */
345
322
  if (ex->line_term->length())
359
336
    table->next_number_field=table->found_next_number_field;
360
337
    if (ignore ||
361
338
        handle_duplicates == DUP_REPLACE)
362
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
339
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
363
340
    if (handle_duplicates == DUP_REPLACE)
364
 
        table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
365
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
341
        table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
342
    table->file->ha_start_bulk_insert((ha_rows) 0);
366
343
    table->copy_blobs=1;
367
344
 
368
345
    session->abort_on_warning= true;
375
352
      error= read_sep_field(session, info, table_list, fields_vars,
376
353
                            set_fields, set_values, read_info,
377
354
                            *enclosed, skip_lines, ignore);
378
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
355
    if (table->file->ha_end_bulk_insert() && !error)
379
356
    {
380
 
      table->print_error(errno, MYF(0));
 
357
      table->file->print_error(my_errno, MYF(0));
381
358
      error= 1;
382
359
    }
383
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
384
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
360
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
361
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
385
362
    table->next_number_field=0;
386
363
  }
387
364
  if (file >= 0)
388
 
    internal::my_close(file,MYF(0));
 
365
    my_close(file,MYF(0));
389
366
  free_blobs(table);                            /* if pack_blob was used */
390
367
  table->copy_blobs=0;
391
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
368
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
392
369
  /*
393
370
     simulated killing in the middle of per-row loop
394
371
     must be effective for binlogging
395
372
  */
396
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
373
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
397
374
  if (error)
398
375
  {
 
376
    if (read_file_from_client)
 
377
      while (!read_info.next_line())
 
378
        ;
 
379
 
399
380
    error= -1;                          // Error on read
400
381
    goto err;
401
382
  }
402
 
 
403
 
  char msg[FN_REFLEN];
404
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
 
           (info.records - info.copied), session->cuted_fields);
406
 
 
407
 
  if (session->transaction.stmt.hasModifiedNonTransData())
408
 
    session->transaction.all.markModifiedNonTransData();
 
383
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
384
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
385
 
 
386
  if (session->transaction.stmt.modified_non_trans_table)
 
387
    session->transaction.all.modified_non_trans_table= true;
 
388
 
 
389
  if (drizzle_bin_log.is_open())
 
390
    session->binlog_flush_pending_rows_event(true);
409
391
 
410
392
  /* ok to client sent only after binlog write and engine commit */
411
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
393
  my_ok(session, info.copied + info.deleted, 0L, name);
412
394
err:
413
395
  assert(transactional_table || !(info.copied || info.deleted) ||
414
 
              session->transaction.stmt.hasModifiedNonTransData());
415
 
  table->cursor->ha_release_auto_increment();
 
396
              session->transaction.stmt.modified_non_trans_table);
 
397
  table->file->ha_release_auto_increment();
416
398
  table->auto_increment_field_not_null= false;
417
399
  session->abort_on_warning= 0;
418
400
  return(error);
424
406
****************************************************************************/
425
407
 
426
408
static int
427
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
409
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
428
410
                  List<Item> &fields_vars, List<Item> &set_fields,
429
411
                  List<Item> &set_values, READ_INFO &read_info,
430
412
                  uint32_t skip_lines, bool ignore_check_option_errors)
439
421
 
440
422
  while (!read_info.read_fixed_length())
441
423
  {
442
 
    if (session->getKilled())
 
424
    if (session->killed)
443
425
    {
444
426
      session->send_kill_message();
445
427
      return(1);
457
439
    }
458
440
    it.rewind();
459
441
    unsigned char *pos=read_info.row_start;
460
 
#ifdef HAVE_VALGRIND
 
442
#ifdef HAVE_purify
461
443
    read_info.row_end[0]=0;
462
444
#endif
463
445
 
464
 
    table->restoreRecordAsDefault();
 
446
    restore_record(table, s->default_values);
465
447
    /*
466
448
      There is no variables in fields_vars list in this format so
467
449
      this conversion is safe.
491
473
      {
492
474
        uint32_t length;
493
475
        unsigned char save_chr;
494
 
        if ((length=(uint32_t) (read_info.row_end-pos)) >
 
476
        if ((length=(uint) (read_info.row_end-pos)) >
495
477
            field->field_length)
496
 
        {
497
478
          length=field->field_length;
498
 
        }
499
 
        save_chr=pos[length];
500
 
        pos[length]='\0'; // Add temp null terminator for store()
 
479
        save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
501
480
        field->store((char*) pos,length,read_info.read_charset);
502
481
        pos[length]=save_chr;
503
482
        if ((pos+=length) > read_info.row_end)
512
491
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
513
492
    }
514
493
 
515
 
    if (session->getKilled() ||
 
494
    if (session->killed ||
516
495
        fill_record(session, set_fields, set_values,
517
496
                    ignore_check_option_errors))
518
497
      return(1);
543
522
 
544
523
 
545
524
static int
546
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
525
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
547
526
               List<Item> &fields_vars, List<Item> &set_fields,
548
527
               List<Item> &set_values, READ_INFO &read_info,
549
528
               String &enclosed, uint32_t skip_lines,
561
540
 
562
541
  for (;;it.rewind())
563
542
  {
564
 
    if (session->getKilled())
 
543
    if (session->killed)
565
544
    {
566
545
      session->send_kill_message();
567
546
      return(1);
568
547
    }
569
548
 
570
 
    table->restoreRecordAsDefault();
 
549
    restore_record(table, s->default_values);
571
550
 
572
551
    while ((item= it++))
573
552
    {
583
562
        continue;
584
563
 
585
564
      pos=read_info.row_start;
586
 
      length=(uint32_t) (read_info.row_end-pos);
 
565
      length=(uint) (read_info.row_end-pos);
587
566
 
588
567
      real_item= item->real_item();
589
568
 
653
632
    }
654
633
    if (item)
655
634
    {
656
 
      /* Have not read any field, thus input cursor is simply ended */
 
635
      /* Have not read any field, thus input file is simply ended */
657
636
      if (item == fields_vars.head())
658
637
        break;
659
638
      for (; item ; item= it++)
694
673
      }
695
674
    }
696
675
 
697
 
    if (session->getKilled() ||
 
676
    if (session->killed ||
698
677
        fill_record(session, set_fields, set_values,
699
678
                    ignore_check_option_errors))
700
679
      return(1);
715
694
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
716
695
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
717
696
                          session->row_count);
718
 
      if (session->getKilled())
 
697
      if (session->killed)
719
698
        return(1);
720
699
    }
721
700
    session->row_count++;
736
715
  case 'r': return '\r';
737
716
  case 'b': return '\b';
738
717
  case '0': return 0;                           // Ascii null
739
 
  case 'Z': return '\032';                      // Win32 end of cursor
 
718
  case 'Z': return '\032';                      // Win32 end of file
740
719
  case 'N': found_null=1;
741
720
 
742
721
    /* fall through */
751
730
*/
752
731
 
753
732
 
754
 
READ_INFO::READ_INFO(int file_par, size_t tot_length,
755
 
                     const CHARSET_INFO * const cs,
 
733
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
756
734
                     String &field_term, String &line_start, String &line_term,
757
 
                     String &enclosed_par, int escape, bool is_fifo)
758
 
  :cursor(file_par),escape_char(escape)
 
735
                     String &enclosed_par, int escape, bool get_it_from_net,
 
736
                     bool is_fifo)
 
737
  :file(file_par),escape_char(escape)
759
738
{
760
739
  read_charset= cs;
761
740
  field_term_ptr=(char*) field_term.ptr();
789
768
 
790
769
 
791
770
  /* Set of a stack for unget if long terminators */
792
 
  size_t length= max(field_term_length,line_term_length)+1;
793
 
  set_if_bigger(length, line_start.length());
794
 
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
 
771
  uint32_t length=cmax(field_term_length,line_term_length)+1;
 
772
  set_if_bigger(length,line_start.length());
 
773
  stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
795
774
 
796
 
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
797
 
    error=1;
 
775
  if (!(buffer=(unsigned char*) malloc(buff_length+1)))
 
776
    error=1; /* purecov: inspected */
798
777
  else
799
778
  {
800
779
    end_of_buff=buffer+buff_length;
801
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
 
                            (false) ? internal::READ_NET :
803
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
804
 
                            MYF(MY_WME)))
 
780
    if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
 
781
                      (get_it_from_net) ? READ_NET :
 
782
                      (is_fifo ? READ_FIFO : READ_CACHE),0L,1,
 
783
                      MYF(MY_WME)))
805
784
    {
806
 
      free((unsigned char*) buffer);
 
785
      free((unsigned char*) buffer); /* purecov: inspected */
807
786
      error=1;
808
787
    }
809
788
    else
814
793
        manual assignment
815
794
      */
816
795
      need_end_io_cache = 1;
 
796
 
 
797
      if (get_it_from_net)
 
798
        cache.read_function = _my_b_net_read;
 
799
 
 
800
      if (drizzle_bin_log.is_open())
 
801
        cache.pre_read = cache.pre_close =
 
802
          (IO_CACHE_CALLBACK) log_loaded_block;
817
803
    }
818
804
  }
819
805
}
824
810
  if (!error)
825
811
  {
826
812
    if (need_end_io_cache)
827
 
      cache.end_io_cache();
828
 
    free(buffer);
 
813
      ::end_io_cache(&cache);
 
814
    free((unsigned char*) buffer);
829
815
    error=1;
830
816
  }
831
817
}
894
880
    while ( to < end_of_buff)
895
881
    {
896
882
      chr = GET;
 
883
#ifdef USE_MB
897
884
      if ((my_mbcharlen(read_charset, chr) > 1) &&
898
885
          to+my_mbcharlen(read_charset, chr) <= end_of_buff)
899
886
      {
900
 
        unsigned char* p = (unsigned char*)to;
901
 
        *to++ = chr;
902
 
        int ml = my_mbcharlen(read_charset, chr);
903
 
        int i;
904
 
        for (i=1; i<ml; i++) {
905
 
          chr = GET;
906
 
          if (chr == my_b_EOF)
907
 
            goto found_eof;
908
 
          *to++ = chr;
909
 
        }
910
 
        if (my_ismbchar(read_charset,
911
 
              (const char *)p,
912
 
              (const char *)to))
913
 
          continue;
914
 
        for (i=0; i<ml; i++)
915
 
          PUSH((unsigned char) *--to);
916
 
        chr = GET;
 
887
          unsigned char* p = (unsigned char*)to;
 
888
          *to++ = chr;
 
889
          int ml = my_mbcharlen(read_charset, chr);
 
890
          int i;
 
891
          for (i=1; i<ml; i++) {
 
892
              chr = GET;
 
893
              if (chr == my_b_EOF)
 
894
                  goto found_eof;
 
895
              *to++ = chr;
 
896
          }
 
897
          if (my_ismbchar(read_charset,
 
898
                          (const char *)p,
 
899
                          (const char *)to))
 
900
            continue;
 
901
          for (i=0; i<ml; i++)
 
902
            PUSH((unsigned char) *--to);
 
903
          chr = GET;
917
904
      }
 
905
#endif
918
906
      if (chr == my_b_EOF)
919
 
        goto found_eof;
 
907
        goto found_eof;
920
908
      if (chr == escape_char)
921
909
      {
922
 
        if ((chr=GET) == my_b_EOF)
923
 
        {
924
 
          *to++= (unsigned char) escape_char;
925
 
          goto found_eof;
926
 
        }
 
910
        if ((chr=GET) == my_b_EOF)
 
911
        {
 
912
          *to++= (unsigned char) escape_char;
 
913
          goto found_eof;
 
914
        }
927
915
        /*
928
916
          When escape_char == enclosed_char, we treat it like we do for
929
917
          handling quotes in SQL parsing -- you can double-up the
942
930
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
943
931
      if (chr == line_term_char)
944
932
#else
945
 
        if (chr == line_term_char && found_enclosed_char == INT_MAX)
 
933
      if (chr == line_term_char && found_enclosed_char == INT_MAX)
946
934
#endif
947
 
        {
948
 
          if (terminator(line_term_ptr,line_term_length))
949
 
          {                                     // Maybe unexpected linefeed
950
 
            enclosed=0;
951
 
            found_end_of_line=1;
952
 
            row_start=buffer;
953
 
            row_end=  to;
954
 
            return 0;
955
 
          }
956
 
        }
 
935
      {
 
936
        if (terminator(line_term_ptr,line_term_length))
 
937
        {                                       // Maybe unexpected linefeed
 
938
          enclosed=0;
 
939
          found_end_of_line=1;
 
940
          row_start=buffer;
 
941
          row_end=  to;
 
942
          return 0;
 
943
        }
 
944
      }
957
945
      if (chr == found_enclosed_char)
958
946
      {
959
 
        if ((chr=GET) == found_enclosed_char)
960
 
        {                                       // Remove dupplicated
961
 
          *to++ = (unsigned char) chr;
962
 
          continue;
963
 
        }
964
 
        // End of enclosed field if followed by field_term or line_term
965
 
        if (chr == my_b_EOF ||
966
 
            (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
967
 
        {                                       // Maybe unexpected linefeed
968
 
          enclosed=1;
969
 
          found_end_of_line=1;
970
 
          row_start=buffer+1;
971
 
          row_end=  to;
972
 
          return 0;
973
 
        }
974
 
        if (chr == field_term_char &&
975
 
            terminator(field_term_ptr,field_term_length))
976
 
        {
977
 
          enclosed=1;
978
 
          row_start=buffer+1;
979
 
          row_end=  to;
980
 
          return 0;
981
 
        }
982
 
        /*
983
 
           The string didn't terminate yet.
984
 
           Store back next character for the loop
985
 
         */
986
 
        PUSH(chr);
987
 
        /* copy the found term character to 'to' */
988
 
        chr= found_enclosed_char;
 
947
        if ((chr=GET) == found_enclosed_char)
 
948
        {                                       // Remove dupplicated
 
949
          *to++ = (unsigned char) chr;
 
950
          continue;
 
951
        }
 
952
        // End of enclosed field if followed by field_term or line_term
 
953
        if (chr == my_b_EOF ||
 
954
            (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
 
955
        {                                       // Maybe unexpected linefeed
 
956
          enclosed=1;
 
957
          found_end_of_line=1;
 
958
          row_start=buffer+1;
 
959
          row_end=  to;
 
960
          return 0;
 
961
        }
 
962
        if (chr == field_term_char &&
 
963
            terminator(field_term_ptr,field_term_length))
 
964
        {
 
965
          enclosed=1;
 
966
          row_start=buffer+1;
 
967
          row_end=  to;
 
968
          return 0;
 
969
        }
 
970
        /*
 
971
          The string didn't terminate yet.
 
972
          Store back next character for the loop
 
973
        */
 
974
        PUSH(chr);
 
975
        /* copy the found term character to 'to' */
 
976
        chr= found_enclosed_char;
989
977
      }
990
978
      else if (chr == field_term_char && found_enclosed_char == INT_MAX)
991
979
      {
992
 
        if (terminator(field_term_ptr,field_term_length))
993
 
        {
994
 
          enclosed=0;
995
 
          row_start=buffer;
996
 
          row_end=  to;
997
 
          return 0;
998
 
        }
 
980
        if (terminator(field_term_ptr,field_term_length))
 
981
        {
 
982
          enclosed=0;
 
983
          row_start=buffer;
 
984
          row_end=  to;
 
985
          return 0;
 
986
        }
999
987
      }
1000
988
      *to++ = (unsigned char) chr;
1001
989
    }
1002
990
    /*
1003
 
     ** We come here if buffer is too small. Enlarge it and continue
1004
 
     */
1005
 
    if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
 
991
    ** We come here if buffer is too small. Enlarge it and continue
 
992
    */
 
993
    if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
 
994
                                        MYF(MY_WME))))
1006
995
      return (error=1);
1007
996
    to=new_buffer + (to-buffer);
1008
997
    buffer=new_buffer;
1023
1012
 
1024
1013
  NOTES
1025
1014
    The row may not be fixed size on disk if there are escape
1026
 
    characters in the cursor.
 
1015
    characters in the file.
1027
1016
 
1028
1017
  IMPLEMENTATION NOTE
1029
1018
    One can't use fixed length with multi-byte charset **
1099
1088
  for (;;)
1100
1089
  {
1101
1090
    int chr = GET;
1102
 
    if (my_mbcharlen(read_charset, chr) > 1)
1103
 
    {
1104
 
      for (uint32_t i=1;
1105
 
          chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1106
 
          i++)
1107
 
        chr = GET;
1108
 
      if (chr == escape_char)
1109
 
        continue;
1110
 
    }
1111
 
    if (chr == my_b_EOF)
1112
 
    {
 
1091
#ifdef USE_MB
 
1092
   if (my_mbcharlen(read_charset, chr) > 1)
 
1093
   {
 
1094
       for (uint32_t i=1;
 
1095
            chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
 
1096
            i++)
 
1097
           chr = GET;
 
1098
       if (chr == escape_char)
 
1099
           continue;
 
1100
   }
 
1101
#endif
 
1102
   if (chr == my_b_EOF)
 
1103
   {
1113
1104
      eof=1;
1114
1105
      return 1;
1115
1106
    }
1117
1108
    {
1118
1109
      line_cuted=1;
1119
1110
      if (GET == my_b_EOF)
1120
 
        return 1;
 
1111
        return 1;
1121
1112
      continue;
1122
1113
    }
1123
1114
    if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
1156
1147
}
1157
1148
 
1158
1149
 
1159
 
} /* namespace drizzled */