~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Stewart Smith
  • Date: 2011-01-14 05:18:23 UTC
  • mto: (2086.1.3 build)
  • mto: This revision was merged to the branch mainline in revision 2087.
  • Revision ID: stewart@flamingspork.com-20110114051823-14fyn2kvg8pc5a15
\r and trailing whitespace removed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
22
22
#include <drizzled/data_home.h>
23
23
#include <drizzled/session.h>
24
24
#include <drizzled/sql_base.h>
25
 
#include <drizzled/field/timestamp.h>
 
25
#include <drizzled/field/epoch.h>
26
26
#include "drizzled/internal/my_sys.h"
27
27
#include "drizzled/internal/iocache.h"
28
28
#include <drizzled/db.h>
31
31
#include <fcntl.h>
32
32
#include <algorithm>
33
33
#include <climits>
 
34
#include <boost/filesystem.hpp>
34
35
 
 
36
namespace fs=boost::filesystem;
35
37
using namespace std;
36
38
namespace drizzled
37
39
{
73
75
  */
74
76
  void end_io_cache()
75
77
  {
76
 
    internal::end_io_cache(&cache);
 
78
    cache.end_io_cache();
77
79
    need_end_io_cache = 0;
78
80
  }
79
81
 
80
82
  /*
81
83
    Either this method, or we need to make cache public
82
 
    Arg must be set from mysql_load() since constructor does not see
 
84
    Arg must be set from load() since constructor does not see
83
85
    either the table or Session value
84
86
  */
85
87
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
86
88
};
87
89
 
88
 
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
90
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
89
91
                             List<Item> &fields_vars, List<Item> &set_fields,
90
92
                             List<Item> &set_values, READ_INFO &read_info,
91
93
                             uint32_t skip_lines,
92
94
                             bool ignore_check_option_errors);
93
 
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
95
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
94
96
                          List<Item> &fields_vars, List<Item> &set_fields,
95
97
                          List<Item> &set_values, READ_INFO &read_info,
96
98
                          String &enclosed, uint32_t skip_lines,
101
103
  Execute LOAD DATA query
102
104
 
103
105
  SYNOPSYS
104
 
    mysql_load()
 
106
    load()
105
107
      session - current thread
106
108
      ex  - file_exchange object representing source cursor and its parsing rules
107
109
      table_list  - list of tables to which we are loading data
117
119
    true - error / false - success
118
120
*/
119
121
 
120
 
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
 
122
int load(Session *session,file_exchange *ex,TableList *table_list,
121
123
                List<Item> &fields_vars, List<Item> &set_fields,
122
124
                List<Item> &set_values,
123
125
                enum enum_duplicates handle_duplicates, bool ignore)
124
126
{
125
 
  char name[FN_REFLEN];
126
127
  int file;
127
128
  Table *table= NULL;
128
129
  int error;
129
130
  String *field_term=ex->field_term,*escaped=ex->escaped;
130
131
  String *enclosed=ex->enclosed;
131
132
  bool is_fifo=0;
132
 
  char *db= table_list->db;                     // This is never null
133
 
  assert(db);
 
133
 
 
134
  assert(table_list->getSchemaName()); // This should never be null
 
135
 
134
136
  /*
135
137
    If path for cursor is not defined, we will use the current database.
136
138
    If this is not set, we will use the directory where the table to be
137
139
    loaded is located
138
140
  */
139
 
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
 
141
  util::string::const_shared_ptr schema(session->schema());
 
142
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
140
143
  assert(tdb);
141
144
  uint32_t skip_lines= ex->skip_lines;
142
145
  bool transactional_table;
143
 
  Session::killed_state killed_status= Session::NOT_KILLED;
 
146
  Session::killed_state_t killed_status= Session::NOT_KILLED;
144
147
 
145
148
  /* Escape and enclosed character may be a utf8 4-byte character */
146
149
  if (escaped->length() > 4 || enclosed->length() > 4)
168
171
  */
169
172
  if (unique_table(table_list, table_list->next_global))
170
173
  {
171
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
 
174
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
172
175
    return(true);
173
176
  }
174
177
 
178
181
  if (!fields_vars.elements)
179
182
  {
180
183
    Field **field;
181
 
    for (field=table->field; *field ; field++)
 
184
    for (field= table->getFields(); *field ; field++)
182
185
      fields_vars.push_back(new Item_field(*field));
183
186
    table->setWriteSet();
184
187
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
203
206
    */
204
207
    if (table->timestamp_field)
205
208
    {
206
 
      if (table->isWriteSet(table->timestamp_field->field_index))
 
209
      if (table->isWriteSet(table->timestamp_field->position()))
 
210
      {
207
211
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
212
      }
208
213
      else
209
214
      {
210
 
        table->setWriteSet(table->timestamp_field->field_index);
 
215
        table->setWriteSet(table->timestamp_field->position());
211
216
      }
212
217
    }
213
218
    /* Fix the expressions in SET clause */
252
257
    return(true);
253
258
  }
254
259
 
255
 
  {
256
 
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
257
 
    ex->file_name+=dirname_length(ex->file_name);
258
 
#endif
259
 
    if (!internal::dirname_length(ex->file_name))
260
 
    {
261
 
      strcpy(name, drizzle_real_data_home);
262
 
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
263
 
      (void) internal::fn_format(name, ex->file_name, name, "",
264
 
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
265
 
    }
266
 
    else
267
 
    {
268
 
      (void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
269
 
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
270
 
 
271
 
      if (opt_secure_file_priv &&
272
 
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
273
 
      {
274
 
        /* Read only allowed from within dir specified by secure_file_priv */
275
 
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
276
 
        return(true);
277
 
      }
278
 
 
279
 
      struct stat stat_info;
280
 
      if (stat(name,&stat_info))
281
 
      {
282
 
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
283
 
        return(true);
284
 
      }
285
 
 
286
 
      // if we are not in slave thread, the cursor must be:
287
 
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
288
 
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
 
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
 
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
291
 
      {
292
 
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
293
 
        return(true);
294
 
      }
295
 
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
296
 
        is_fifo = 1;
297
 
    }
298
 
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
299
 
    {
300
 
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
 
260
  fs::path to_file(ex->file_name);
 
261
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
 
262
  if (not to_file.has_root_directory())
 
263
  {
 
264
    int count_elements= 0;
 
265
    for (fs::path::iterator iter= to_file.begin();
 
266
         iter != to_file.end();
 
267
         ++iter, ++count_elements)
 
268
    { }
 
269
 
 
270
    if (count_elements == 1)
 
271
    {
 
272
      target_path /= tdb;
 
273
    }
 
274
    target_path /= to_file;
 
275
  }
 
276
  else
 
277
  {
 
278
    target_path= to_file;
 
279
  }
 
280
 
 
281
  if (not secure_file_priv.string().empty())
 
282
  {
 
283
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
 
284
    {
 
285
      /* Read only allowed from within dir specified by secure_file_priv */
 
286
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
301
287
      return(true);
302
288
    }
303
289
  }
304
290
 
305
 
  COPY_INFO info;
 
291
  struct stat stat_info;
 
292
  if (stat(target_path.file_string().c_str(), &stat_info))
 
293
  {
 
294
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
 
295
    return(true);
 
296
  }
 
297
 
 
298
  // if we are not in slave thread, the cursor must be:
 
299
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
300
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
301
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
302
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
303
  {
 
304
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
 
305
    return(true);
 
306
  }
 
307
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
308
    is_fifo = 1;
 
309
 
 
310
 
 
311
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
 
312
  {
 
313
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
 
314
    return(true);
 
315
  }
 
316
  CopyInfo info;
306
317
  memset(&info, 0, sizeof(info));
307
318
  info.ignore= ignore;
308
319
  info.handle_duplicates=handle_duplicates;
309
320
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
310
321
 
311
 
  SchemaIdentifier identifier(session->db);
 
322
  SchemaIdentifier identifier(*schema);
312
323
  READ_INFO read_info(file, tot_length,
313
324
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
314
325
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
377
388
    internal::my_close(file,MYF(0));
378
389
  free_blobs(table);                            /* if pack_blob was used */
379
390
  table->copy_blobs=0;
380
 
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
 
391
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
381
392
  /*
382
393
     simulated killing in the middle of per-row loop
383
394
     must be effective for binlogging
384
395
  */
385
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
 
396
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
386
397
  if (error)
387
398
  {
388
399
    error= -1;                          // Error on read
389
400
    goto err;
390
401
  }
391
 
  snprintf(name, sizeof(name), ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
392
 
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
402
 
 
403
  char msg[FN_REFLEN];
 
404
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
 
405
           (info.records - info.copied), session->cuted_fields);
393
406
 
394
407
  if (session->transaction.stmt.hasModifiedNonTransData())
395
408
    session->transaction.all.markModifiedNonTransData();
396
409
 
397
410
  /* ok to client sent only after binlog write and engine commit */
398
 
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
 
411
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
399
412
err:
400
413
  assert(transactional_table || !(info.copied || info.deleted) ||
401
414
              session->transaction.stmt.hasModifiedNonTransData());
411
424
****************************************************************************/
412
425
 
413
426
static int
414
 
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
427
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
415
428
                  List<Item> &fields_vars, List<Item> &set_fields,
416
429
                  List<Item> &set_values, READ_INFO &read_info,
417
430
                  uint32_t skip_lines, bool ignore_check_option_errors)
426
439
 
427
440
  while (!read_info.read_fixed_length())
428
441
  {
429
 
    if (session->killed)
 
442
    if (session->getKilled())
430
443
    {
431
444
      session->send_kill_message();
432
445
      return(1);
444
457
    }
445
458
    it.rewind();
446
459
    unsigned char *pos=read_info.row_start;
447
 
#ifdef HAVE_purify
 
460
#ifdef HAVE_VALGRIND
448
461
    read_info.row_end[0]=0;
449
462
#endif
450
463
 
471
484
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
472
485
                            ER_WARN_TOO_FEW_RECORDS,
473
486
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
474
 
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
475
 
            ((Field_timestamp*) field)->set_time();
 
487
 
 
488
        if (not field->maybe_null() and field->is_timestamp())
 
489
            ((field::Epoch::pointer) field)->set_time();
476
490
      }
477
491
      else
478
492
      {
499
513
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
500
514
    }
501
515
 
502
 
    if (session->killed ||
 
516
    if (session->getKilled() ||
503
517
        fill_record(session, set_fields, set_values,
504
518
                    ignore_check_option_errors))
505
519
      return(1);
530
544
 
531
545
 
532
546
static int
533
 
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
547
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
534
548
               List<Item> &fields_vars, List<Item> &set_fields,
535
549
               List<Item> &set_values, READ_INFO &read_info,
536
550
               String &enclosed, uint32_t skip_lines,
548
562
 
549
563
  for (;;it.rewind())
550
564
  {
551
 
    if (session->killed)
 
565
    if (session->getKilled())
552
566
    {
553
567
      session->send_kill_message();
554
568
      return(1);
588
602
            return(1);
589
603
          }
590
604
          field->set_null();
591
 
          if (!field->maybe_null())
 
605
          if (not field->maybe_null())
592
606
          {
593
 
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
594
 
              ((Field_timestamp*) field)->set_time();
 
607
            if (field->is_timestamp())
 
608
            {
 
609
              ((field::Epoch::pointer) field)->set_time();
 
610
            }
595
611
            else if (field != table->next_number_field)
596
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
597
 
                                 ER_WARN_NULL_TO_NOTNULL, 1);
 
612
            {
 
613
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
 
614
            }
598
615
          }
599
616
        }
600
617
        else if (item->type() == Item::STRING_ITEM)
655
672
                     session->row_count);
656
673
            return(1);
657
674
          }
658
 
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
659
 
              ((Field_timestamp*) field)->set_time();
 
675
          if (not field->maybe_null() and field->is_timestamp())
 
676
              ((field::Epoch::pointer) field)->set_time();
660
677
          /*
661
678
            QQ: We probably should not throw warning for each field.
662
679
            But how about intention to always have the same number
681
698
      }
682
699
    }
683
700
 
684
 
    if (session->killed ||
 
701
    if (session->getKilled() ||
685
702
        fill_record(session, set_fields, set_values,
686
703
                    ignore_check_option_errors))
687
704
      return(1);
702
719
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
703
720
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
704
721
                          session->row_count);
705
 
      if (session->killed)
 
722
      if (session->getKilled())
706
723
        return(1);
707
724
    }
708
725
    session->row_count++;
776
793
 
777
794
 
778
795
  /* Set of a stack for unget if long terminators */
779
 
  uint32_t length= max(field_term_length,line_term_length)+1;
780
 
  set_if_bigger(length,line_start.length());
 
796
  size_t length= max(field_term_length,line_term_length)+1;
 
797
  set_if_bigger(length, line_start.length());
781
798
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
782
799
 
783
800
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
785
802
  else
786
803
  {
787
804
    end_of_buff=buffer+buff_length;
788
 
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
789
 
                      (false) ? internal::READ_NET :
790
 
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
791
 
                      MYF(MY_WME)))
 
805
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
 
806
                            (false) ? internal::READ_NET :
 
807
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
808
                            MYF(MY_WME)))
792
809
    {
793
810
      free((unsigned char*) buffer);
794
811
      error=1;
811
828
  if (!error)
812
829
  {
813
830
    if (need_end_io_cache)
814
 
      internal::end_io_cache(&cache);
 
831
      cache.end_io_cache();
815
832
    free(buffer);
816
833
    error=1;
817
834
  }