~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: lbieber
  • Date: 2010-10-01 12:16:18 UTC
  • mfrom: (1802.1.1 fix-bug-651256)
  • Revision ID: lbieber@orisndriz08-20101001121618-uqcboygpjwbiglem
Merge Vijay - fix bug 651256 - Remove --help-extended

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
19
#include "config.h"
20
 
 
21
20
#include <drizzled/sql_load.h>
22
21
#include <drizzled/error.h>
23
22
#include <drizzled/data_home.h>
24
23
#include <drizzled/session.h>
25
24
#include <drizzled/sql_base.h>
26
 
#include <drizzled/field/epoch.h>
 
25
#include <drizzled/field/timestamp.h>
27
26
#include "drizzled/internal/my_sys.h"
28
27
#include "drizzled/internal/iocache.h"
29
28
#include <drizzled/db.h>
30
 
#include "drizzled/plugin/storage_engine.h"
31
29
 
32
30
#include <sys/stat.h>
33
31
#include <fcntl.h>
77
75
  */
78
76
  void end_io_cache()
79
77
  {
80
 
    cache.end_io_cache();
 
78
    internal::end_io_cache(&cache);
81
79
    need_end_io_cache = 0;
82
80
  }
83
81
 
84
82
  /*
85
83
    Either this method, or we need to make cache public
86
 
    Arg must be set from load() since constructor does not see
 
84
    Arg must be set from mysql_load() since constructor does not see
87
85
    either the table or Session value
88
86
  */
89
87
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
105
103
  Execute LOAD DATA query
106
104
 
107
105
  SYNOPSYS
108
 
    load()
 
106
    mysql_load()
109
107
      session - current thread
110
108
      ex  - file_exchange object representing source cursor and its parsing rules
111
109
      table_list  - list of tables to which we are loading data
121
119
    true - error / false - success
122
120
*/
123
121
 
124
 
int load(Session *session,file_exchange *ex,TableList *table_list,
 
122
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
125
123
                List<Item> &fields_vars, List<Item> &set_fields,
126
124
                List<Item> &set_values,
127
125
                enum enum_duplicates handle_duplicates, bool ignore)
128
126
{
 
127
  char name[FN_REFLEN];
129
128
  int file;
130
129
  Table *table= NULL;
131
130
  int error;
132
131
  String *field_term=ex->field_term,*escaped=ex->escaped;
133
132
  String *enclosed=ex->enclosed;
134
133
  bool is_fifo=0;
135
 
 
136
 
  assert(table_list->getSchemaName()); // This should never be null
137
 
 
 
134
  char *db= table_list->db;                     // This is never null
 
135
  assert(db);
138
136
  /*
139
137
    If path for cursor is not defined, we will use the current database.
140
138
    If this is not set, we will use the directory where the table to be
141
139
    loaded is located
142
140
  */
143
 
  util::string::const_shared_ptr schema(session->schema());
144
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
141
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
145
142
  assert(tdb);
146
143
  uint32_t skip_lines= ex->skip_lines;
147
144
  bool transactional_table;
148
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
145
  Session::killed_state killed_status= Session::NOT_KILLED;
149
146
 
150
147
  /* Escape and enclosed character may be a utf8 4-byte character */
151
148
  if (escaped->length() > 4 || enclosed->length() > 4)
173
170
  */
174
171
  if (unique_table(table_list, table_list->next_global))
175
172
  {
176
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
173
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
177
174
    return(true);
178
175
  }
179
176
 
208
205
    */
209
206
    if (table->timestamp_field)
210
207
    {
211
 
      if (table->isWriteSet(table->timestamp_field->position()))
212
 
      {
 
208
      if (table->isWriteSet(table->timestamp_field->field_index))
213
209
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
214
 
      }
215
210
      else
216
211
      {
217
 
        table->setWriteSet(table->timestamp_field->position());
 
212
        table->setWriteSet(table->timestamp_field->field_index);
218
213
      }
219
214
    }
220
215
    /* Fix the expressions in SET clause */
259
254
    return(true);
260
255
  }
261
256
 
262
 
  fs::path to_file(ex->file_name);
263
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
264
 
  if (not to_file.has_root_directory())
265
 
  {
266
 
    int count_elements= 0;
267
 
    for (fs::path::iterator iter= to_file.begin();
268
 
         iter != to_file.end();
269
 
         ++iter, ++count_elements)
270
 
    { }
271
 
 
272
 
    if (count_elements == 1)
273
 
    {
274
 
      target_path /= tdb;
275
 
    }
276
 
    target_path /= to_file;
277
 
  }
278
 
  else
279
 
  {
280
 
    target_path= to_file;
281
 
  }
282
 
 
283
 
  if (not secure_file_priv.string().empty())
284
 
  {
285
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
286
 
    {
287
 
      /* Read only allowed from within dir specified by secure_file_priv */
288
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
257
  {
 
258
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
259
    ex->file_name+=dirname_length(ex->file_name);
 
260
#endif
 
261
    if (!internal::dirname_length(ex->file_name))
 
262
    {
 
263
      strcpy(name, getDataHomeCatalog().c_str());
 
264
      strncat(name, "/", 1);
 
265
      strncat(name, tdb, FN_REFLEN-getDataHomeCatalog().size());
 
266
      (void) internal::fn_format(name, ex->file_name, name, "",
 
267
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
268
    }
 
269
    else
 
270
    {
 
271
      (void) internal::fn_format(name, ex->file_name, getDataHomeCatalog().c_str(), "",
 
272
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
273
 
 
274
      if (opt_secure_file_priv)
 
275
      {
 
276
        fs::path secure_file_path(fs::system_complete(fs::path(opt_secure_file_priv)));
 
277
        fs::path target_path(fs::system_complete(fs::path(name)));
 
278
        if (target_path.file_string().substr(0, secure_file_path.file_string().size()) != secure_file_path.file_string())
 
279
        {
 
280
          /* Read only allowed from within dir specified by secure_file_priv */
 
281
          my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
282
          return(true);
 
283
        }
 
284
      }
 
285
 
 
286
      struct stat stat_info;
 
287
      if (stat(name,&stat_info))
 
288
      {
 
289
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
290
        return(true);
 
291
      }
 
292
 
 
293
      // if we are not in slave thread, the cursor must be:
 
294
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
295
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
296
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
297
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
298
      {
 
299
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
300
        return(true);
 
301
      }
 
302
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
303
        is_fifo = 1;
 
304
    }
 
305
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
306
    {
 
307
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
289
308
      return(true);
290
309
    }
291
310
  }
292
311
 
293
 
  struct stat stat_info;
294
 
  if (stat(target_path.file_string().c_str(), &stat_info))
295
 
  {
296
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
297
 
    return(true);
298
 
  }
299
 
 
300
 
  // if we are not in slave thread, the cursor must be:
301
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
302
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
303
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
305
 
  {
306
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
 
    return(true);
308
 
  }
309
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
310
 
    is_fifo = 1;
311
 
 
312
 
 
313
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
314
 
  {
315
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
316
 
    return(true);
317
 
  }
318
312
  CopyInfo info;
319
313
  memset(&info, 0, sizeof(info));
320
314
  info.ignore= ignore;
321
315
  info.handle_duplicates=handle_duplicates;
322
316
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
323
317
 
324
 
  identifier::Schema identifier(*schema);
 
318
  SchemaIdentifier identifier(session->db);
325
319
  READ_INFO read_info(file, tot_length,
326
320
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
327
321
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
367
361
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
368
362
    table->copy_blobs=1;
369
363
 
370
 
    session->setAbortOnWarning(true);
 
364
    session->abort_on_warning= true;
371
365
 
372
366
    if (!field_term->length() && !enclosed->length())
373
367
      error= read_fixed_length(session, info, table_list, fields_vars,
395
389
     simulated killing in the middle of per-row loop
396
390
     must be effective for binlogging
397
391
  */
398
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
392
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
393
  if (error)
400
394
  {
401
395
    error= -1;                          // Error on read
402
396
    goto err;
403
397
  }
404
 
 
405
 
  char msg[FN_REFLEN];
406
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
 
398
  snprintf(name, sizeof(name), ER(ER_LOAD_INFO), info.records, info.deleted,
407
399
           (info.records - info.copied), session->cuted_fields);
408
400
 
409
401
  if (session->transaction.stmt.hasModifiedNonTransData())
410
402
    session->transaction.all.markModifiedNonTransData();
411
403
 
412
404
  /* ok to client sent only after binlog write and engine commit */
413
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
405
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
414
406
err:
415
407
  assert(transactional_table || !(info.copied || info.deleted) ||
416
408
              session->transaction.stmt.hasModifiedNonTransData());
417
409
  table->cursor->ha_release_auto_increment();
418
410
  table->auto_increment_field_not_null= false;
419
 
  session->setAbortOnWarning(false);
420
 
 
 
411
  session->abort_on_warning= 0;
421
412
  return(error);
422
413
}
423
414
 
442
433
 
443
434
  while (!read_info.read_fixed_length())
444
435
  {
445
 
    if (session->getKilled())
 
436
    if (session->killed)
446
437
    {
447
438
      session->send_kill_message();
448
439
      return(1);
460
451
    }
461
452
    it.rewind();
462
453
    unsigned char *pos=read_info.row_start;
463
 
#ifdef HAVE_VALGRIND
 
454
#ifdef HAVE_purify
464
455
    read_info.row_end[0]=0;
465
456
#endif
466
457
 
487
478
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
488
479
                            ER_WARN_TOO_FEW_RECORDS,
489
480
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
490
 
 
491
 
        if (not field->maybe_null() and field->is_timestamp())
492
 
            ((field::Epoch::pointer) field)->set_time();
 
481
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
482
            ((Field_timestamp*) field)->set_time();
493
483
      }
494
484
      else
495
485
      {
516
506
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
517
507
    }
518
508
 
519
 
    if (session->getKilled() ||
 
509
    if (session->killed ||
520
510
        fill_record(session, set_fields, set_values,
521
511
                    ignore_check_option_errors))
522
512
      return(1);
565
555
 
566
556
  for (;;it.rewind())
567
557
  {
568
 
    if (session->getKilled())
 
558
    if (session->killed)
569
559
    {
570
560
      session->send_kill_message();
571
561
      return(1);
605
595
            return(1);
606
596
          }
607
597
          field->set_null();
608
 
          if (not field->maybe_null())
 
598
          if (!field->maybe_null())
609
599
          {
610
 
            if (field->is_timestamp())
611
 
            {
612
 
              ((field::Epoch::pointer) field)->set_time();
613
 
            }
 
600
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
601
              ((Field_timestamp*) field)->set_time();
614
602
            else if (field != table->next_number_field)
615
 
            {
616
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
617
 
            }
 
603
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
604
                                 ER_WARN_NULL_TO_NOTNULL, 1);
618
605
          }
619
606
        }
620
607
        else if (item->type() == Item::STRING_ITEM)
675
662
                     session->row_count);
676
663
            return(1);
677
664
          }
678
 
          if (not field->maybe_null() and field->is_timestamp())
679
 
              ((field::Epoch::pointer) field)->set_time();
 
665
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
666
              ((Field_timestamp*) field)->set_time();
680
667
          /*
681
668
            QQ: We probably should not throw warning for each field.
682
669
            But how about intention to always have the same number
701
688
      }
702
689
    }
703
690
 
704
 
    if (session->getKilled() ||
 
691
    if (session->killed ||
705
692
        fill_record(session, set_fields, set_values,
706
693
                    ignore_check_option_errors))
707
694
      return(1);
722
709
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
723
710
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
724
711
                          session->row_count);
725
 
      if (session->getKilled())
 
712
      if (session->killed)
726
713
        return(1);
727
714
    }
728
715
    session->row_count++;
796
783
 
797
784
 
798
785
  /* Set of a stack for unget if long terminators */
799
 
  size_t length= max(field_term_length,line_term_length)+1;
800
 
  set_if_bigger(length, line_start.length());
 
786
  uint32_t length= max(field_term_length,line_term_length)+1;
 
787
  set_if_bigger(length,line_start.length());
801
788
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
802
789
 
803
790
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
805
792
  else
806
793
  {
807
794
    end_of_buff=buffer+buff_length;
808
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
809
 
                            (false) ? internal::READ_NET :
810
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
811
 
                            MYF(MY_WME)))
 
795
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
 
796
                      (false) ? internal::READ_NET :
 
797
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
798
                      MYF(MY_WME)))
812
799
    {
813
800
      free((unsigned char*) buffer);
814
801
      error=1;
831
818
  if (!error)
832
819
  {
833
820
    if (need_end_io_cache)
834
 
      cache.end_io_cache();
 
821
      internal::end_io_cache(&cache);
835
822
    free(buffer);
836
823
    error=1;
837
824
  }