~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
19
#include "config.h"
 
20
 
20
21
#include <drizzled/sql_load.h>
21
22
#include <drizzled/error.h>
22
23
#include <drizzled/data_home.h>
23
24
#include <drizzled/session.h>
24
25
#include <drizzled/sql_base.h>
25
 
#include <drizzled/field/timestamp.h>
 
26
#include <drizzled/field/epoch.h>
26
27
#include "drizzled/internal/my_sys.h"
27
28
#include "drizzled/internal/iocache.h"
28
29
#include <drizzled/db.h>
 
30
#include "drizzled/plugin/storage_engine.h"
29
31
 
30
32
#include <sys/stat.h>
31
33
#include <fcntl.h>
32
34
#include <algorithm>
33
35
#include <climits>
 
36
#include <boost/filesystem.hpp>
34
37
 
 
38
namespace fs=boost::filesystem;
35
39
using namespace std;
36
40
namespace drizzled
37
41
{
73
77
  */
74
78
  void end_io_cache()
75
79
  {
76
 
    internal::end_io_cache(&cache);
 
80
    cache.end_io_cache();
77
81
    need_end_io_cache = 0;
78
82
  }
79
83
 
80
84
  /*
81
85
    Either this method, or we need to make cache public
82
 
    Arg must be set from mysql_load() since constructor does not see
 
86
    Arg must be set from load() since constructor does not see
83
87
    either the table or Session value
84
88
  */
85
89
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
86
90
};
87
91
 
88
 
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
92
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
89
93
                             List<Item> &fields_vars, List<Item> &set_fields,
90
94
                             List<Item> &set_values, READ_INFO &read_info,
91
95
                             uint32_t skip_lines,
92
96
                             bool ignore_check_option_errors);
93
 
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
97
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
94
98
                          List<Item> &fields_vars, List<Item> &set_fields,
95
99
                          List<Item> &set_values, READ_INFO &read_info,
96
100
                          String &enclosed, uint32_t skip_lines,
101
105
  Execute LOAD DATA query
102
106
 
103
107
  SYNOPSYS
104
 
    mysql_load()
 
108
    load()
105
109
      session - current thread
106
110
      ex  - file_exchange object representing source cursor and its parsing rules
107
111
      table_list  - list of tables to which we are loading data
117
121
    true - error / false - success
118
122
*/
119
123
 
120
 
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
 
124
int load(Session *session,file_exchange *ex,TableList *table_list,
121
125
                List<Item> &fields_vars, List<Item> &set_fields,
122
126
                List<Item> &set_values,
123
127
                enum enum_duplicates handle_duplicates, bool ignore)
124
128
{
125
 
  char name[FN_REFLEN];
126
129
  int file;
127
130
  Table *table= NULL;
128
131
  int error;
129
132
  String *field_term=ex->field_term,*escaped=ex->escaped;
130
133
  String *enclosed=ex->enclosed;
131
134
  bool is_fifo=0;
132
 
  char *db= table_list->db;                     // This is never null
133
 
  assert(db);
 
135
 
 
136
  assert(table_list->getSchemaName()); // This should never be null
 
137
 
134
138
  /*
135
139
    If path for cursor is not defined, we will use the current database.
136
140
    If this is not set, we will use the directory where the table to be
137
141
    loaded is located
138
142
  */
139
 
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
 
143
  util::string::const_shared_ptr schema(session->schema());
 
144
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
140
145
  assert(tdb);
141
146
  uint32_t skip_lines= ex->skip_lines;
142
147
  bool transactional_table;
143
 
  Session::killed_state killed_status= Session::NOT_KILLED;
 
148
  Session::killed_state_t killed_status= Session::NOT_KILLED;
144
149
 
145
150
  /* Escape and enclosed character may be a utf8 4-byte character */
146
151
  if (escaped->length() > 4 || enclosed->length() > 4)
168
173
  */
169
174
  if (unique_table(table_list, table_list->next_global))
170
175
  {
171
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
 
176
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
172
177
    return(true);
173
178
  }
174
179
 
178
183
  if (!fields_vars.elements)
179
184
  {
180
185
    Field **field;
181
 
    for (field=table->field; *field ; field++)
 
186
    for (field= table->getFields(); *field ; field++)
182
187
      fields_vars.push_back(new Item_field(*field));
183
188
    table->setWriteSet();
184
189
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
203
208
    */
204
209
    if (table->timestamp_field)
205
210
    {
206
 
      if (table->isWriteSet(table->timestamp_field->field_index))
 
211
      if (table->isWriteSet(table->timestamp_field->position()))
 
212
      {
207
213
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
214
      }
208
215
      else
209
216
      {
210
 
        table->setWriteSet(table->timestamp_field->field_index);
 
217
        table->setWriteSet(table->timestamp_field->position());
211
218
      }
212
219
    }
213
220
    /* Fix the expressions in SET clause */
252
259
    return(true);
253
260
  }
254
261
 
255
 
  {
256
 
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
257
 
    ex->file_name+=dirname_length(ex->file_name);
258
 
#endif
259
 
    if (!internal::dirname_length(ex->file_name))
260
 
    {
261
 
      strcpy(name, drizzle_real_data_home);
262
 
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
263
 
      (void) internal::fn_format(name, ex->file_name, name, "",
264
 
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
265
 
    }
266
 
    else
267
 
    {
268
 
      (void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
269
 
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
270
 
 
271
 
      if (opt_secure_file_priv &&
272
 
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
273
 
      {
274
 
        /* Read only allowed from within dir specified by secure_file_priv */
275
 
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
276
 
        return(true);
277
 
      }
278
 
 
279
 
      struct stat stat_info;
280
 
      if (stat(name,&stat_info))
281
 
      {
282
 
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
283
 
        return(true);
284
 
      }
285
 
 
286
 
      // if we are not in slave thread, the cursor must be:
287
 
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
288
 
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
 
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
 
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
291
 
      {
292
 
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
293
 
        return(true);
294
 
      }
295
 
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
296
 
        is_fifo = 1;
297
 
    }
298
 
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
299
 
    {
300
 
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
 
262
  fs::path to_file(ex->file_name);
 
263
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
 
264
  if (not to_file.has_root_directory())
 
265
  {
 
266
    int count_elements= 0;
 
267
    for (fs::path::iterator iter= to_file.begin();
 
268
         iter != to_file.end();
 
269
         ++iter, ++count_elements)
 
270
    { }
 
271
 
 
272
    if (count_elements == 1)
 
273
    {
 
274
      target_path /= tdb;
 
275
    }
 
276
    target_path /= to_file;
 
277
  }
 
278
  else
 
279
  {
 
280
    target_path= to_file;
 
281
  }
 
282
 
 
283
  if (not secure_file_priv.string().empty())
 
284
  {
 
285
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
 
286
    {
 
287
      /* Read only allowed from within dir specified by secure_file_priv */
 
288
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
301
289
      return(true);
302
290
    }
303
291
  }
304
292
 
305
 
  COPY_INFO info;
 
293
  struct stat stat_info;
 
294
  if (stat(target_path.file_string().c_str(), &stat_info))
 
295
  {
 
296
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
 
297
    return(true);
 
298
  }
 
299
 
 
300
  // if we are not in slave thread, the cursor must be:
 
301
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
302
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
303
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
304
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
305
  {
 
306
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
 
307
    return(true);
 
308
  }
 
309
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
310
    is_fifo = 1;
 
311
 
 
312
 
 
313
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
 
314
  {
 
315
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
 
316
    return(true);
 
317
  }
 
318
  CopyInfo info;
306
319
  memset(&info, 0, sizeof(info));
307
320
  info.ignore= ignore;
308
321
  info.handle_duplicates=handle_duplicates;
309
322
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
310
323
 
 
324
  identifier::Schema identifier(*schema);
311
325
  READ_INFO read_info(file, tot_length,
312
 
                      ex->cs ? ex->cs : get_default_db_collation(session->db.c_str()),
313
 
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
 
326
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
 
327
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
314
328
                      info.escape_char, is_fifo);
315
329
  if (read_info.error)
316
330
  {
353
367
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
354
368
    table->copy_blobs=1;
355
369
 
356
 
    session->abort_on_warning= true;
 
370
    session->setAbortOnWarning(true);
357
371
 
358
372
    if (!field_term->length() && !enclosed->length())
359
373
      error= read_fixed_length(session, info, table_list, fields_vars,
376
390
    internal::my_close(file,MYF(0));
377
391
  free_blobs(table);                            /* if pack_blob was used */
378
392
  table->copy_blobs=0;
379
 
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
 
393
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
380
394
  /*
381
395
     simulated killing in the middle of per-row loop
382
396
     must be effective for binlogging
383
397
  */
384
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
 
398
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
385
399
  if (error)
386
400
  {
387
401
    error= -1;                          // Error on read
388
402
    goto err;
389
403
  }
390
 
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
391
 
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
404
 
 
405
  char msg[FN_REFLEN];
 
406
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
 
407
           (info.records - info.copied), session->cuted_fields);
392
408
 
393
409
  if (session->transaction.stmt.hasModifiedNonTransData())
394
410
    session->transaction.all.markModifiedNonTransData();
395
411
 
396
412
  /* ok to client sent only after binlog write and engine commit */
397
 
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
 
413
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
398
414
err:
399
415
  assert(transactional_table || !(info.copied || info.deleted) ||
400
416
              session->transaction.stmt.hasModifiedNonTransData());
401
417
  table->cursor->ha_release_auto_increment();
402
418
  table->auto_increment_field_not_null= false;
403
 
  session->abort_on_warning= 0;
 
419
  session->setAbortOnWarning(false);
 
420
 
404
421
  return(error);
405
422
}
406
423
 
410
427
****************************************************************************/
411
428
 
412
429
static int
413
 
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
430
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
414
431
                  List<Item> &fields_vars, List<Item> &set_fields,
415
432
                  List<Item> &set_values, READ_INFO &read_info,
416
433
                  uint32_t skip_lines, bool ignore_check_option_errors)
425
442
 
426
443
  while (!read_info.read_fixed_length())
427
444
  {
428
 
    if (session->killed)
 
445
    if (session->getKilled())
429
446
    {
430
447
      session->send_kill_message();
431
448
      return(1);
443
460
    }
444
461
    it.rewind();
445
462
    unsigned char *pos=read_info.row_start;
446
 
#ifdef HAVE_purify
 
463
#ifdef HAVE_VALGRIND
447
464
    read_info.row_end[0]=0;
448
465
#endif
449
466
 
470
487
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
471
488
                            ER_WARN_TOO_FEW_RECORDS,
472
489
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
473
 
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
474
 
            ((Field_timestamp*) field)->set_time();
 
490
 
 
491
        if (not field->maybe_null() and field->is_timestamp())
 
492
            ((field::Epoch::pointer) field)->set_time();
475
493
      }
476
494
      else
477
495
      {
498
516
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
499
517
    }
500
518
 
501
 
    if (session->killed ||
 
519
    if (session->getKilled() ||
502
520
        fill_record(session, set_fields, set_values,
503
521
                    ignore_check_option_errors))
504
522
      return(1);
529
547
 
530
548
 
531
549
static int
532
 
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
550
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
533
551
               List<Item> &fields_vars, List<Item> &set_fields,
534
552
               List<Item> &set_values, READ_INFO &read_info,
535
553
               String &enclosed, uint32_t skip_lines,
547
565
 
548
566
  for (;;it.rewind())
549
567
  {
550
 
    if (session->killed)
 
568
    if (session->getKilled())
551
569
    {
552
570
      session->send_kill_message();
553
571
      return(1);
587
605
            return(1);
588
606
          }
589
607
          field->set_null();
590
 
          if (!field->maybe_null())
 
608
          if (not field->maybe_null())
591
609
          {
592
 
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
593
 
              ((Field_timestamp*) field)->set_time();
 
610
            if (field->is_timestamp())
 
611
            {
 
612
              ((field::Epoch::pointer) field)->set_time();
 
613
            }
594
614
            else if (field != table->next_number_field)
595
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
596
 
                                 ER_WARN_NULL_TO_NOTNULL, 1);
 
615
            {
 
616
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
 
617
            }
597
618
          }
598
619
        }
599
620
        else if (item->type() == Item::STRING_ITEM)
654
675
                     session->row_count);
655
676
            return(1);
656
677
          }
657
 
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
658
 
              ((Field_timestamp*) field)->set_time();
 
678
          if (not field->maybe_null() and field->is_timestamp())
 
679
              ((field::Epoch::pointer) field)->set_time();
659
680
          /*
660
681
            QQ: We probably should not throw warning for each field.
661
682
            But how about intention to always have the same number
680
701
      }
681
702
    }
682
703
 
683
 
    if (session->killed ||
 
704
    if (session->getKilled() ||
684
705
        fill_record(session, set_fields, set_values,
685
706
                    ignore_check_option_errors))
686
707
      return(1);
701
722
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
702
723
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
703
724
                          session->row_count);
704
 
      if (session->killed)
 
725
      if (session->getKilled())
705
726
        return(1);
706
727
    }
707
728
    session->row_count++;
775
796
 
776
797
 
777
798
  /* Set of a stack for unget if long terminators */
778
 
  uint32_t length= max(field_term_length,line_term_length)+1;
779
 
  set_if_bigger(length,line_start.length());
 
799
  size_t length= max(field_term_length,line_term_length)+1;
 
800
  set_if_bigger(length, line_start.length());
780
801
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
781
802
 
782
803
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
784
805
  else
785
806
  {
786
807
    end_of_buff=buffer+buff_length;
787
 
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
788
 
                      (false) ? internal::READ_NET :
789
 
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
790
 
                      MYF(MY_WME)))
 
808
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
 
809
                            (false) ? internal::READ_NET :
 
810
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
811
                            MYF(MY_WME)))
791
812
    {
792
813
      free((unsigned char*) buffer);
793
814
      error=1;
810
831
  if (!error)
811
832
  {
812
833
    if (need_end_io_cache)
813
 
      internal::end_io_cache(&cache);
 
834
      cache.end_io_cache();
814
835
    free(buffer);
815
836
    error=1;
816
837
  }