~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Monty Taylor
  • Date: 2009-08-12 06:25:19 UTC
  • mto: (1114.1.1 innodb-plugin-merge)
  • mto: This revision was merged to the branch mainline in revision 1183.
  • Revision ID: mordred@inaugust.com-20090812062519-cij02mrrunvnxblt
Tags: innodb-plugin-1.0.4
InnoDB Plugin 1.0.4

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
 
#include "config.h"
20
 
 
 
19
#include <drizzled/server_includes.h>
21
20
#include <drizzled/sql_load.h>
 
21
#include <drizzled/replication/replication.h>
22
22
#include <drizzled/error.h>
23
23
#include <drizzled/data_home.h>
24
24
#include <drizzled/session.h>
25
25
#include <drizzled/sql_base.h>
26
 
#include <drizzled/field/epoch.h>
27
 
#include "drizzled/internal/my_sys.h"
28
 
#include "drizzled/internal/iocache.h"
29
 
#include <drizzled/db.h>
30
 
#include "drizzled/plugin/storage_engine.h"
31
 
 
32
 
#include <sys/stat.h>
33
 
#include <fcntl.h>
34
 
#include <algorithm>
35
 
#include <climits>
36
 
#include <boost/filesystem.hpp>
37
 
 
38
 
namespace fs=boost::filesystem;
39
 
using namespace std;
40
 
namespace drizzled
41
 
{
 
26
#include <drizzled/field/timestamp.h>
42
27
 
43
28
class READ_INFO {
44
 
  int   cursor;
45
 
  unsigned char *buffer;                /* Buffer for read text */
46
 
  unsigned char *end_of_buff;           /* Data in bufferts ends here */
47
 
  size_t buff_length;                   /* Length of buffert */
48
 
  size_t max_length;                    /* Max length of row */
 
29
  File  file;
 
30
  unsigned char *buffer,                        /* Buffer for read text */
 
31
        *end_of_buff;                   /* Data in bufferts ends here */
 
32
  uint  buff_length,                    /* Length of buffert */
 
33
        max_length;                     /* Max length of row */
49
34
  char  *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
50
35
  uint  field_term_length,line_term_length,enclosed_length;
51
36
  int   field_term_char,line_term_char,enclosed_char,escape_char;
52
37
  int   *stack,*stack_pos;
53
38
  bool  found_end_of_line,start_of_line,eof;
54
39
  bool  need_end_io_cache;
55
 
  internal::IO_CACHE cache;
 
40
  IO_CACHE cache;
 
41
  NET *io_net;
56
42
 
57
43
public:
58
44
  bool error,line_cuted,found_null,enclosed;
60
46
        *row_end;                       /* Found row ends here */
61
47
  const CHARSET_INFO *read_charset;
62
48
 
63
 
  READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
 
49
  READ_INFO(File file,uint32_t tot_length, const CHARSET_INFO * const cs,
64
50
            String &field_term,String &line_start,String &line_term,
65
 
            String &enclosed,int escape, bool is_fifo);
 
51
            String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
66
52
  ~READ_INFO();
67
53
  int read_field();
68
54
  int read_fixed_length(void);
77
63
  */
78
64
  void end_io_cache()
79
65
  {
80
 
    cache.end_io_cache();
 
66
    ::end_io_cache(&cache);
81
67
    need_end_io_cache = 0;
82
68
  }
83
69
 
84
70
  /*
85
71
    Either this method, or we need to make cache public
86
 
    Arg must be set from load() since constructor does not see
 
72
    Arg must be set from mysql_load() since constructor does not see
87
73
    either the table or Session value
88
74
  */
89
75
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
90
76
};
91
77
 
92
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
78
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
93
79
                             List<Item> &fields_vars, List<Item> &set_fields,
94
80
                             List<Item> &set_values, READ_INFO &read_info,
95
81
                             uint32_t skip_lines,
96
82
                             bool ignore_check_option_errors);
97
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
83
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
98
84
                          List<Item> &fields_vars, List<Item> &set_fields,
99
85
                          List<Item> &set_values, READ_INFO &read_info,
100
86
                          String &enclosed, uint32_t skip_lines,
105
91
  Execute LOAD DATA query
106
92
 
107
93
  SYNOPSYS
108
 
    load()
 
94
    mysql_load()
109
95
      session - current thread
110
 
      ex  - file_exchange object representing source cursor and its parsing rules
 
96
      ex  - sql_exchange object representing source file and its parsing rules
111
97
      table_list  - list of tables to which we are loading data
112
98
      fields_vars - list of fields and variables to which we read
113
 
                    data from cursor
 
99
                    data from file
114
100
      set_fields  - list of fields mentioned in set clause
115
101
      set_values  - expressions to assign to fields in previous list
116
102
      handle_duplicates - indicates whenever we should emit error or
117
103
                          replace row if we will meet duplicates.
118
104
      ignore -          - indicates whenever we should ignore duplicates
 
105
      read_file_from_client - is this LOAD DATA LOCAL ?
119
106
 
120
107
  RETURN VALUES
121
108
    true - error / false - success
122
109
*/
123
110
 
124
 
int load(Session *session,file_exchange *ex,TableList *table_list,
 
111
int mysql_load(Session *session,sql_exchange *ex,TableList *table_list,
125
112
                List<Item> &fields_vars, List<Item> &set_fields,
126
113
                List<Item> &set_values,
127
 
                enum enum_duplicates handle_duplicates, bool ignore)
 
114
                enum enum_duplicates handle_duplicates, bool ignore,
 
115
                bool read_file_from_client)
128
116
{
129
 
  int file;
 
117
  char name[FN_REFLEN];
 
118
  File file;
130
119
  Table *table= NULL;
131
120
  int error;
132
121
  String *field_term=ex->field_term,*escaped=ex->escaped;
133
122
  String *enclosed=ex->enclosed;
134
123
  bool is_fifo=0;
135
 
 
136
 
  assert(table_list->getSchemaName()); // This should never be null
137
 
 
 
124
  char *db= table_list->db;                     // This is never null
 
125
  assert(db);
138
126
  /*
139
 
    If path for cursor is not defined, we will use the current database.
 
127
    If path for file is not defined, we will use the current database.
140
128
    If this is not set, we will use the directory where the table to be
141
129
    loaded is located
142
130
  */
143
 
  util::string::const_shared_ptr schema(session->schema());
144
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
131
  char *tdb= session->db ? session->db : db;            // Result is never null
145
132
  assert(tdb);
146
133
  uint32_t skip_lines= ex->skip_lines;
147
134
  bool transactional_table;
148
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
135
  Session::killed_state killed_status= Session::NOT_KILLED;
149
136
 
150
 
  /* Escape and enclosed character may be a utf8 4-byte character */
151
 
  if (escaped->length() > 4 || enclosed->length() > 4)
 
137
  if (escaped->length() > 1 || enclosed->length() > 1)
152
138
  {
153
 
    my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
 
139
    my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
 
140
               MYF(0));
154
141
    return(true);
155
142
  }
156
 
 
157
 
  if (session->openTablesLock(table_list))
 
143
  if (open_and_lock_tables(session, table_list))
158
144
    return(true);
159
 
 
160
145
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
161
146
                                    &session->lex->select_lex.top_join_list,
162
147
                                    table_list,
171
156
    table is marked to be 'used for insert' in which case we should never
172
157
    mark this table as 'const table' (ie, one that has only one row).
173
158
  */
174
 
  if (unique_table(table_list, table_list->next_global))
 
159
  if (unique_table(session, table_list, table_list->next_global, 0))
175
160
  {
176
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
161
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
177
162
    return(true);
178
163
  }
179
164
 
180
165
  table= table_list->table;
181
 
  transactional_table= table->cursor->has_transactions();
 
166
  transactional_table= table->file->has_transactions();
182
167
 
183
168
  if (!fields_vars.elements)
184
169
  {
185
170
    Field **field;
186
 
    for (field= table->getFields(); *field ; field++)
 
171
    for (field=table->field; *field ; field++)
187
172
      fields_vars.push_back(new Item_field(*field));
188
 
    table->setWriteSet();
 
173
    bitmap_set_all(table->write_set);
189
174
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
190
175
    /*
191
176
      Let us also prepare SET clause, altough it is probably empty
208
193
    */
209
194
    if (table->timestamp_field)
210
195
    {
211
 
      if (table->isWriteSet(table->timestamp_field->position()))
212
 
      {
 
196
      if (bitmap_is_set(table->write_set,
 
197
                        table->timestamp_field->field_index))
213
198
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
214
 
      }
215
199
      else
216
200
      {
217
 
        table->setWriteSet(table->timestamp_field->position());
 
201
        bitmap_set_bit(table->write_set,
 
202
                       table->timestamp_field->field_index);
218
203
      }
219
204
    }
220
205
    /* Fix the expressions in SET clause */
224
209
 
225
210
  table->mark_columns_needed_for_insert();
226
211
 
227
 
  size_t tot_length=0;
 
212
  uint32_t tot_length=0;
228
213
  bool use_blobs= 0, use_vars= 0;
229
214
  List_iterator_fast<Item> it(fields_vars);
230
215
  Item *item;
259
244
    return(true);
260
245
  }
261
246
 
262
 
  fs::path to_file(ex->file_name);
263
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
264
 
  if (not to_file.has_root_directory())
 
247
  /* We can't give an error in the middle when using LOCAL files */
 
248
  if (read_file_from_client && handle_duplicates == DUP_ERROR)
 
249
    ignore= 1;
 
250
 
 
251
  if (read_file_from_client)
265
252
  {
266
 
    int count_elements= 0;
267
 
    for (fs::path::iterator iter= to_file.begin();
268
 
         iter != to_file.end();
269
 
         ++iter, ++count_elements)
270
 
    { }
271
 
 
272
 
    if (count_elements == 1)
273
 
    {
274
 
      target_path /= tdb;
275
 
    }
276
 
    target_path /= to_file;
 
253
    (void)net_request_file(&session->net,ex->file_name);
 
254
    file = -1;
277
255
  }
278
256
  else
279
257
  {
280
 
    target_path= to_file;
281
 
  }
282
 
 
283
 
  if (not secure_file_priv.string().empty())
284
 
  {
285
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
286
 
    {
287
 
      /* Read only allowed from within dir specified by secure_file_priv */
288
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
258
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
259
    ex->file_name+=dirname_length(ex->file_name);
 
260
#endif
 
261
    if (!dirname_length(ex->file_name))
 
262
    {
 
263
      strcpy(name, drizzle_real_data_home);
 
264
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
265
      (void) fn_format(name, ex->file_name, name, "",
 
266
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
267
    }
 
268
    else
 
269
    {
 
270
      (void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
 
271
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
272
 
 
273
      if (opt_secure_file_priv &&
 
274
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
275
      {
 
276
        /* Read only allowed from within dir specified by secure_file_priv */
 
277
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
278
        return(true);
 
279
      }
 
280
 
 
281
      struct stat stat_info;
 
282
      if (stat(name,&stat_info))
 
283
        return(true);
 
284
 
 
285
      // if we are not in slave thread, the file must be:
 
286
      if (!session->slave_thread &&
 
287
          !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
288
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
289
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
290
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
291
      {
 
292
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
293
        return(true);
 
294
      }
 
295
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
296
        is_fifo = 1;
 
297
    }
 
298
    if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
289
299
      return(true);
290
 
    }
291
 
  }
292
 
 
293
 
  struct stat stat_info;
294
 
  if (stat(target_path.file_string().c_str(), &stat_info))
295
 
  {
296
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
297
 
    return(true);
298
 
  }
299
 
 
300
 
  // if we are not in slave thread, the cursor must be:
301
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
302
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
303
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
305
 
  {
306
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
 
    return(true);
308
 
  }
309
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
310
 
    is_fifo = 1;
311
 
 
312
 
 
313
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
314
 
  {
315
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
316
 
    return(true);
317
 
  }
318
 
  CopyInfo info;
 
300
  }
 
301
 
 
302
  COPY_INFO info;
319
303
  memset(&info, 0, sizeof(info));
320
304
  info.ignore= ignore;
321
305
  info.handle_duplicates=handle_duplicates;
322
306
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
323
307
 
324
 
  identifier::Schema identifier(*schema);
325
 
  READ_INFO read_info(file, tot_length,
326
 
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
327
 
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
328
 
                      info.escape_char, is_fifo);
 
308
  READ_INFO read_info(file,tot_length,
 
309
                      ex->cs ? ex->cs : session->variables.collation_database,
 
310
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
 
311
                      info.escape_char, read_file_from_client, is_fifo);
329
312
  if (read_info.error)
330
313
  {
331
314
    if  (file >= 0)
332
 
      internal::my_close(file,MYF(0));                  // no files in net reading
 
315
      my_close(file,MYF(0));                    // no files in net reading
333
316
    return(true);                               // Can't allocate buffers
334
317
  }
335
318
 
336
 
  /*
337
 
   * Per the SQL standard, inserting NULL into a NOT NULL
338
 
   * field requires an error to be thrown.
339
 
   *
340
 
   * @NOTE
341
 
   *
342
 
   * NULL check and handling occurs in field_conv.cc
343
 
   */
344
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
319
  session->count_cuted_fields= CHECK_FIELD_WARN;                /* calc cuted fields */
345
320
  session->cuted_fields=0L;
346
321
  /* Skip lines if there is a line terminator */
347
322
  if (ex->line_term->length())
361
336
    table->next_number_field=table->found_next_number_field;
362
337
    if (ignore ||
363
338
        handle_duplicates == DUP_REPLACE)
364
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
339
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
365
340
    if (handle_duplicates == DUP_REPLACE)
366
 
        table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
367
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
341
        table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
342
    table->file->ha_start_bulk_insert((ha_rows) 0);
368
343
    table->copy_blobs=1;
369
344
 
370
 
    session->setAbortOnWarning(true);
 
345
    session->abort_on_warning= true;
371
346
 
372
347
    if (!field_term->length() && !enclosed->length())
373
348
      error= read_fixed_length(session, info, table_list, fields_vars,
377
352
      error= read_sep_field(session, info, table_list, fields_vars,
378
353
                            set_fields, set_values, read_info,
379
354
                            *enclosed, skip_lines, ignore);
380
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
355
    if (table->file->ha_end_bulk_insert() && !error)
381
356
    {
382
 
      table->print_error(errno, MYF(0));
 
357
      table->file->print_error(my_errno, MYF(0));
383
358
      error= 1;
384
359
    }
385
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
386
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
360
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
361
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
387
362
    table->next_number_field=0;
388
363
  }
389
364
  if (file >= 0)
390
 
    internal::my_close(file,MYF(0));
 
365
    my_close(file,MYF(0));
391
366
  free_blobs(table);                            /* if pack_blob was used */
392
367
  table->copy_blobs=0;
393
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
394
 
  /*
 
368
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
 
369
  /* 
395
370
     simulated killing in the middle of per-row loop
396
371
     must be effective for binlogging
397
372
  */
398
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
373
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
374
  if (error)
400
375
  {
 
376
    if (read_file_from_client)
 
377
      while (!read_info.next_line())
 
378
        ;
 
379
 
401
380
    error= -1;                          // Error on read
402
381
    goto err;
403
382
  }
404
 
 
405
 
  char msg[FN_REFLEN];
406
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
407
 
           (info.records - info.copied), session->cuted_fields);
408
 
 
409
 
  if (session->transaction.stmt.hasModifiedNonTransData())
410
 
    session->transaction.all.markModifiedNonTransData();
 
383
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
384
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
385
 
 
386
  if (session->transaction.stmt.modified_non_trans_table)
 
387
    session->transaction.all.modified_non_trans_table= true;
 
388
 
 
389
  if (drizzle_bin_log.is_open())
 
390
    session->binlog_flush_pending_rows_event(true);
411
391
 
412
392
  /* ok to client sent only after binlog write and engine commit */
413
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
393
  my_ok(session, info.copied + info.deleted, 0L, name);
414
394
err:
415
395
  assert(transactional_table || !(info.copied || info.deleted) ||
416
 
              session->transaction.stmt.hasModifiedNonTransData());
417
 
  table->cursor->ha_release_auto_increment();
 
396
              session->transaction.stmt.modified_non_trans_table);
 
397
  table->file->ha_release_auto_increment();
418
398
  table->auto_increment_field_not_null= false;
419
 
  session->setAbortOnWarning(false);
420
 
 
 
399
  session->abort_on_warning= 0;
421
400
  return(error);
422
401
}
423
402
 
427
406
****************************************************************************/
428
407
 
429
408
static int
430
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
409
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
431
410
                  List<Item> &fields_vars, List<Item> &set_fields,
432
411
                  List<Item> &set_values, READ_INFO &read_info,
433
412
                  uint32_t skip_lines, bool ignore_check_option_errors)
439
418
  bool err;
440
419
 
441
420
  id= 0;
442
 
 
 
421
 
443
422
  while (!read_info.read_fixed_length())
444
423
  {
445
 
    if (session->getKilled())
 
424
    if (session->killed)
446
425
    {
447
426
      session->send_kill_message();
448
427
      return(1);
460
439
    }
461
440
    it.rewind();
462
441
    unsigned char *pos=read_info.row_start;
463
 
#ifdef HAVE_VALGRIND
 
442
#ifdef HAVE_purify
464
443
    read_info.row_end[0]=0;
465
444
#endif
466
445
 
467
 
    table->restoreRecordAsDefault();
 
446
    restore_record(table, s->default_values);
468
447
    /*
469
448
      There is no variables in fields_vars list in this format so
470
449
      this conversion is safe.
471
450
    */
472
451
    while ((sql_field= (Item_field*) it++))
473
452
    {
474
 
      Field *field= sql_field->field;
 
453
      Field *field= sql_field->field;                  
475
454
      if (field == table->next_number_field)
476
455
        table->auto_increment_field_not_null= true;
477
456
      /*
484
463
      if (pos == read_info.row_end)
485
464
      {
486
465
        session->cuted_fields++;                        /* Not enough fields */
487
 
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
488
 
                            ER_WARN_TOO_FEW_RECORDS,
 
466
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
467
                            ER_WARN_TOO_FEW_RECORDS, 
489
468
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
490
 
 
491
 
        if (not field->maybe_null() and field->is_timestamp())
492
 
            ((field::Epoch::pointer) field)->set_time();
 
469
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
470
            ((Field_timestamp*) field)->set_time();
493
471
      }
494
472
      else
495
473
      {
496
474
        uint32_t length;
497
475
        unsigned char save_chr;
498
 
        if ((length=(uint32_t) (read_info.row_end-pos)) >
 
476
        if ((length=(uint) (read_info.row_end-pos)) >
499
477
            field->field_length)
500
 
        {
501
478
          length=field->field_length;
502
 
        }
503
 
        save_chr=pos[length];
504
 
        pos[length]='\0'; // Add temp null terminator for store()
 
479
        save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
505
480
        field->store((char*) pos,length,read_info.read_charset);
506
481
        pos[length]=save_chr;
507
482
        if ((pos+=length) > read_info.row_end)
511
486
    if (pos != read_info.row_end)
512
487
    {
513
488
      session->cuted_fields++;                  /* To long row */
514
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
515
 
                          ER_WARN_TOO_MANY_RECORDS,
516
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
 
489
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
490
                          ER_WARN_TOO_MANY_RECORDS, 
 
491
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
517
492
    }
518
493
 
519
 
    if (session->getKilled() ||
 
494
    if (session->killed ||
520
495
        fill_record(session, set_fields, set_values,
521
496
                    ignore_check_option_errors))
522
497
      return(1);
525
500
    table->auto_increment_field_not_null= false;
526
501
    if (err)
527
502
      return(1);
528
 
 
 
503
   
529
504
    /*
530
505
      We don't need to reset auto-increment field since we are restoring
531
506
      its default value at the beginning of each loop iteration.
535
510
    if (read_info.line_cuted)
536
511
    {
537
512
      session->cuted_fields++;                  /* To long row */
538
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
539
 
                          ER_WARN_TOO_MANY_RECORDS,
540
 
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
 
513
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
514
                          ER_WARN_TOO_MANY_RECORDS, 
 
515
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count); 
541
516
    }
542
517
    session->row_count++;
543
518
  }
547
522
 
548
523
 
549
524
static int
550
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
525
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
551
526
               List<Item> &fields_vars, List<Item> &set_fields,
552
527
               List<Item> &set_values, READ_INFO &read_info,
553
528
               String &enclosed, uint32_t skip_lines,
565
540
 
566
541
  for (;;it.rewind())
567
542
  {
568
 
    if (session->getKilled())
 
543
    if (session->killed)
569
544
    {
570
545
      session->send_kill_message();
571
546
      return(1);
572
547
    }
573
548
 
574
 
    table->restoreRecordAsDefault();
 
549
    restore_record(table, s->default_values);
575
550
 
576
551
    while ((item= it++))
577
552
    {
587
562
        continue;
588
563
 
589
564
      pos=read_info.row_start;
590
 
      length=(uint32_t) (read_info.row_end-pos);
 
565
      length=(uint) (read_info.row_end-pos);
591
566
 
592
567
      real_item= item->real_item();
593
568
 
605
580
            return(1);
606
581
          }
607
582
          field->set_null();
608
 
          if (not field->maybe_null())
 
583
          if (!field->maybe_null())
609
584
          {
610
 
            if (field->is_timestamp())
611
 
            {
612
 
              ((field::Epoch::pointer) field)->set_time();
613
 
            }
 
585
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
586
              ((Field_timestamp*) field)->set_time();
614
587
            else if (field != table->next_number_field)
615
 
            {
616
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
617
 
            }
 
588
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
589
                                 ER_WARN_NULL_TO_NOTNULL, 1);
618
590
          }
619
591
        }
620
592
        else if (item->type() == Item::STRING_ITEM)
660
632
    }
661
633
    if (item)
662
634
    {
663
 
      /* Have not read any field, thus input cursor is simply ended */
 
635
      /* Have not read any field, thus input file is simply ended */
664
636
      if (item == fields_vars.head())
665
637
        break;
666
638
      for (; item ; item= it++)
675
647
                     session->row_count);
676
648
            return(1);
677
649
          }
678
 
          if (not field->maybe_null() and field->is_timestamp())
679
 
              ((field::Epoch::pointer) field)->set_time();
 
650
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
651
              ((Field_timestamp*) field)->set_time();
680
652
          /*
681
653
            QQ: We probably should not throw warning for each field.
682
654
            But how about intention to always have the same number
701
673
      }
702
674
    }
703
675
 
704
 
    if (session->getKilled() ||
 
676
    if (session->killed ||
705
677
        fill_record(session, set_fields, set_values,
706
678
                    ignore_check_option_errors))
707
679
      return(1);
719
691
    if (read_info.line_cuted)
720
692
    {
721
693
      session->cuted_fields++;                  /* To long row */
722
 
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
723
 
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
724
 
                          session->row_count);
725
 
      if (session->getKilled())
 
694
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, 
 
695
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), 
 
696
                          session->row_count);   
 
697
      if (session->killed)
726
698
        return(1);
727
699
    }
728
700
    session->row_count++;
743
715
  case 'r': return '\r';
744
716
  case 'b': return '\b';
745
717
  case '0': return 0;                           // Ascii null
746
 
  case 'Z': return '\032';                      // Win32 end of cursor
 
718
  case 'Z': return '\032';                      // Win32 end of file
747
719
  case 'N': found_null=1;
748
720
 
749
721
    /* fall through */
758
730
*/
759
731
 
760
732
 
761
 
READ_INFO::READ_INFO(int file_par, size_t tot_length,
762
 
                     const CHARSET_INFO * const cs,
 
733
READ_INFO::READ_INFO(File file_par, uint32_t tot_length, const CHARSET_INFO * const cs,
763
734
                     String &field_term, String &line_start, String &line_term,
764
 
                     String &enclosed_par, int escape, bool is_fifo)
765
 
  :cursor(file_par),escape_char(escape)
 
735
                     String &enclosed_par, int escape, bool get_it_from_net,
 
736
                     bool is_fifo)
 
737
  :file(file_par),escape_char(escape)
766
738
{
767
739
  read_charset= cs;
768
740
  field_term_ptr=(char*) field_term.ptr();
796
768
 
797
769
 
798
770
  /* Set of a stack for unget if long terminators */
799
 
  size_t length= max(field_term_length,line_term_length)+1;
800
 
  set_if_bigger(length, line_start.length());
801
 
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
 
771
  uint32_t length=cmax(field_term_length,line_term_length)+1;
 
772
  set_if_bigger(length,line_start.length());
 
773
  stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
802
774
 
803
 
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
804
 
    error=1;
 
775
  if (!(buffer=(unsigned char*) my_malloc(buff_length+1,MYF(0))))
 
776
    error=1; /* purecov: inspected */
805
777
  else
806
778
  {
807
779
    end_of_buff=buffer+buff_length;
808
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
809
 
                            (false) ? internal::READ_NET :
810
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
811
 
                            MYF(MY_WME)))
 
780
    if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
 
781
                      (get_it_from_net) ? READ_NET :
 
782
                      (is_fifo ? READ_FIFO : READ_CACHE),0L,1,
 
783
                      MYF(MY_WME)))
812
784
    {
813
 
      free((unsigned char*) buffer);
 
785
      free((unsigned char*) buffer); /* purecov: inspected */
814
786
      error=1;
815
787
    }
816
788
    else
821
793
        manual assignment
822
794
      */
823
795
      need_end_io_cache = 1;
 
796
 
 
797
      if (get_it_from_net)
 
798
        cache.read_function = _my_b_net_read;
 
799
 
 
800
      if (drizzle_bin_log.is_open())
 
801
        cache.pre_read = cache.pre_close =
 
802
          (IO_CACHE_CALLBACK) log_loaded_block;
824
803
    }
825
804
  }
826
805
}
831
810
  if (!error)
832
811
  {
833
812
    if (need_end_io_cache)
834
 
      cache.end_io_cache();
835
 
    free(buffer);
 
813
      ::end_io_cache(&cache);
 
814
    free((unsigned char*) buffer);
836
815
    error=1;
837
816
  }
838
817
}
901
880
    while ( to < end_of_buff)
902
881
    {
903
882
      chr = GET;
 
883
#ifdef USE_MB
904
884
      if ((my_mbcharlen(read_charset, chr) > 1) &&
905
885
          to+my_mbcharlen(read_charset, chr) <= end_of_buff)
906
886
      {
907
 
        unsigned char* p = (unsigned char*)to;
908
 
        *to++ = chr;
909
 
        int ml = my_mbcharlen(read_charset, chr);
910
 
        int i;
911
 
        for (i=1; i<ml; i++) {
912
 
          chr = GET;
913
 
          if (chr == my_b_EOF)
914
 
            goto found_eof;
915
 
          *to++ = chr;
916
 
        }
917
 
        if (my_ismbchar(read_charset,
918
 
              (const char *)p,
919
 
              (const char *)to))
920
 
          continue;
921
 
        for (i=0; i<ml; i++)
922
 
          PUSH((unsigned char) *--to);
923
 
        chr = GET;
 
887
          unsigned char* p = (unsigned char*)to;
 
888
          *to++ = chr;
 
889
          int ml = my_mbcharlen(read_charset, chr);
 
890
          int i;
 
891
          for (i=1; i<ml; i++) {
 
892
              chr = GET;
 
893
              if (chr == my_b_EOF)
 
894
                  goto found_eof;
 
895
              *to++ = chr;
 
896
          }
 
897
          if (my_ismbchar(read_charset,
 
898
                          (const char *)p,
 
899
                          (const char *)to))
 
900
            continue;
 
901
          for (i=0; i<ml; i++)
 
902
            PUSH((unsigned char) *--to);
 
903
          chr = GET;
924
904
      }
 
905
#endif
925
906
      if (chr == my_b_EOF)
926
 
        goto found_eof;
 
907
        goto found_eof;
927
908
      if (chr == escape_char)
928
909
      {
929
 
        if ((chr=GET) == my_b_EOF)
930
 
        {
931
 
          *to++= (unsigned char) escape_char;
932
 
          goto found_eof;
933
 
        }
 
910
        if ((chr=GET) == my_b_EOF)
 
911
        {
 
912
          *to++= (unsigned char) escape_char;
 
913
          goto found_eof;
 
914
        }
934
915
        /*
935
916
          When escape_char == enclosed_char, we treat it like we do for
936
917
          handling quotes in SQL parsing -- you can double-up the
949
930
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
950
931
      if (chr == line_term_char)
951
932
#else
952
 
        if (chr == line_term_char && found_enclosed_char == INT_MAX)
 
933
      if (chr == line_term_char && found_enclosed_char == INT_MAX)
953
934
#endif
954
 
        {
955
 
          if (terminator(line_term_ptr,line_term_length))
956
 
          {                                     // Maybe unexpected linefeed
957
 
            enclosed=0;
958
 
            found_end_of_line=1;
959
 
            row_start=buffer;
960
 
            row_end=  to;
961
 
            return 0;
962
 
          }
963
 
        }
 
935
      {
 
936
        if (terminator(line_term_ptr,line_term_length))
 
937
        {                                       // Maybe unexpected linefeed
 
938
          enclosed=0;
 
939
          found_end_of_line=1;
 
940
          row_start=buffer;
 
941
          row_end=  to;
 
942
          return 0;
 
943
        }
 
944
      }
964
945
      if (chr == found_enclosed_char)
965
946
      {
966
 
        if ((chr=GET) == found_enclosed_char)
967
 
        {                                       // Remove dupplicated
968
 
          *to++ = (unsigned char) chr;
969
 
          continue;
970
 
        }
971
 
        // End of enclosed field if followed by field_term or line_term
972
 
        if (chr == my_b_EOF ||
973
 
            (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
974
 
        {                                       // Maybe unexpected linefeed
975
 
          enclosed=1;
976
 
          found_end_of_line=1;
977
 
          row_start=buffer+1;
978
 
          row_end=  to;
979
 
          return 0;
980
 
        }
981
 
        if (chr == field_term_char &&
982
 
            terminator(field_term_ptr,field_term_length))
983
 
        {
984
 
          enclosed=1;
985
 
          row_start=buffer+1;
986
 
          row_end=  to;
987
 
          return 0;
988
 
        }
989
 
        /*
990
 
           The string didn't terminate yet.
991
 
           Store back next character for the loop
992
 
         */
993
 
        PUSH(chr);
994
 
        /* copy the found term character to 'to' */
995
 
        chr= found_enclosed_char;
 
947
        if ((chr=GET) == found_enclosed_char)
 
948
        {                                       // Remove dupplicated
 
949
          *to++ = (unsigned char) chr;
 
950
          continue;
 
951
        }
 
952
        // End of enclosed field if followed by field_term or line_term
 
953
        if (chr == my_b_EOF ||
 
954
            (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
 
955
        {                                       // Maybe unexpected linefeed
 
956
          enclosed=1;
 
957
          found_end_of_line=1;
 
958
          row_start=buffer+1;
 
959
          row_end=  to;
 
960
          return 0;
 
961
        }
 
962
        if (chr == field_term_char &&
 
963
            terminator(field_term_ptr,field_term_length))
 
964
        {
 
965
          enclosed=1;
 
966
          row_start=buffer+1;
 
967
          row_end=  to;
 
968
          return 0;
 
969
        }
 
970
        /*
 
971
          The string didn't terminate yet.
 
972
          Store back next character for the loop
 
973
        */
 
974
        PUSH(chr);
 
975
        /* copy the found term character to 'to' */
 
976
        chr= found_enclosed_char;
996
977
      }
997
978
      else if (chr == field_term_char && found_enclosed_char == INT_MAX)
998
979
      {
999
 
        if (terminator(field_term_ptr,field_term_length))
1000
 
        {
1001
 
          enclosed=0;
1002
 
          row_start=buffer;
1003
 
          row_end=  to;
1004
 
          return 0;
1005
 
        }
 
980
        if (terminator(field_term_ptr,field_term_length))
 
981
        {
 
982
          enclosed=0;
 
983
          row_start=buffer;
 
984
          row_end=  to;
 
985
          return 0;
 
986
        }
1006
987
      }
1007
988
      *to++ = (unsigned char) chr;
1008
989
    }
1009
990
    /*
1010
 
     ** We come here if buffer is too small. Enlarge it and continue
1011
 
     */
1012
 
    if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
 
991
    ** We come here if buffer is too small. Enlarge it and continue
 
992
    */
 
993
    if (!(new_buffer=(unsigned char*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
 
994
                                        MYF(MY_WME))))
1013
995
      return (error=1);
1014
996
    to=new_buffer + (to-buffer);
1015
997
    buffer=new_buffer;
1030
1012
 
1031
1013
  NOTES
1032
1014
    The row may not be fixed size on disk if there are escape
1033
 
    characters in the cursor.
 
1015
    characters in the file.
1034
1016
 
1035
1017
  IMPLEMENTATION NOTE
1036
1018
    One can't use fixed length with multi-byte charset **
1106
1088
  for (;;)
1107
1089
  {
1108
1090
    int chr = GET;
1109
 
    if (my_mbcharlen(read_charset, chr) > 1)
1110
 
    {
1111
 
      for (uint32_t i=1;
1112
 
          chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1113
 
          i++)
1114
 
        chr = GET;
1115
 
      if (chr == escape_char)
1116
 
        continue;
1117
 
    }
1118
 
    if (chr == my_b_EOF)
1119
 
    {
 
1091
#ifdef USE_MB
 
1092
   if (my_mbcharlen(read_charset, chr) > 1)
 
1093
   {
 
1094
       for (uint32_t i=1;
 
1095
            chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
 
1096
            i++)
 
1097
           chr = GET;
 
1098
       if (chr == escape_char)
 
1099
           continue;
 
1100
   }
 
1101
#endif
 
1102
   if (chr == my_b_EOF)
 
1103
   {
1120
1104
      eof=1;
1121
1105
      return 1;
1122
1106
    }
1124
1108
    {
1125
1109
      line_cuted=1;
1126
1110
      if (GET == my_b_EOF)
1127
 
        return 1;
 
1111
        return 1;
1128
1112
      continue;
1129
1113
    }
1130
1114
    if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
1163
1147
}
1164
1148
 
1165
1149
 
1166
 
} /* namespace drizzled */