~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Brian Aker
  • Date: 2010-06-05 00:15:57 UTC
  • mfrom: (1589.1.1 drizzle_events)
  • Revision ID: brian@gaz-20100605001557-j1k41ni5k9mis891
Merge in Barry.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
 
#include <config.h>
20
 
 
 
19
#include "config.h"
21
20
#include <drizzled/sql_load.h>
22
21
#include <drizzled/error.h>
23
22
#include <drizzled/data_home.h>
24
23
#include <drizzled/session.h>
25
24
#include <drizzled/sql_base.h>
26
 
#include <drizzled/field/epoch.h>
27
 
#include <drizzled/internal/my_sys.h>
28
 
#include <drizzled/internal/iocache.h>
29
 
#include <drizzled/plugin/storage_engine.h>
 
25
#include <drizzled/field/timestamp.h>
 
26
#include "drizzled/internal/my_sys.h"
 
27
#include "drizzled/internal/iocache.h"
 
28
#include <drizzled/db.h>
30
29
 
31
30
#include <sys/stat.h>
32
31
#include <fcntl.h>
33
32
#include <algorithm>
34
33
#include <climits>
35
 
#include <boost/filesystem.hpp>
36
34
 
37
 
namespace fs=boost::filesystem;
38
35
using namespace std;
39
36
namespace drizzled
40
37
{
76
73
  */
77
74
  void end_io_cache()
78
75
  {
79
 
    cache.end_io_cache();
 
76
    internal::end_io_cache(&cache);
80
77
    need_end_io_cache = 0;
81
78
  }
82
79
 
83
80
  /*
84
81
    Either this method, or we need to make cache public
85
 
    Arg must be set from load() since constructor does not see
 
82
    Arg must be set from mysql_load() since constructor does not see
86
83
    either the table or Session value
87
84
  */
88
85
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
89
86
};
90
87
 
91
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
88
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
92
89
                             List<Item> &fields_vars, List<Item> &set_fields,
93
90
                             List<Item> &set_values, READ_INFO &read_info,
94
91
                             uint32_t skip_lines,
95
92
                             bool ignore_check_option_errors);
96
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
93
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
97
94
                          List<Item> &fields_vars, List<Item> &set_fields,
98
95
                          List<Item> &set_values, READ_INFO &read_info,
99
96
                          String &enclosed, uint32_t skip_lines,
104
101
  Execute LOAD DATA query
105
102
 
106
103
  SYNOPSYS
107
 
    load()
 
104
    mysql_load()
108
105
      session - current thread
109
106
      ex  - file_exchange object representing source cursor and its parsing rules
110
107
      table_list  - list of tables to which we are loading data
120
117
    true - error / false - success
121
118
*/
122
119
 
123
 
int load(Session *session,file_exchange *ex,TableList *table_list,
 
120
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
124
121
                List<Item> &fields_vars, List<Item> &set_fields,
125
122
                List<Item> &set_values,
126
123
                enum enum_duplicates handle_duplicates, bool ignore)
127
124
{
 
125
  char name[FN_REFLEN];
128
126
  int file;
129
127
  Table *table= NULL;
130
128
  int error;
131
129
  String *field_term=ex->field_term,*escaped=ex->escaped;
132
130
  String *enclosed=ex->enclosed;
133
131
  bool is_fifo=0;
134
 
 
135
 
  assert(table_list->getSchemaName()); // This should never be null
136
 
 
 
132
  char *db= table_list->db;                     // This is never null
 
133
  assert(db);
137
134
  /*
138
135
    If path for cursor is not defined, we will use the current database.
139
136
    If this is not set, we will use the directory where the table to be
140
137
    loaded is located
141
138
  */
142
 
  util::string::const_shared_ptr schema(session->schema());
143
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
139
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
144
140
  assert(tdb);
145
141
  uint32_t skip_lines= ex->skip_lines;
146
142
  bool transactional_table;
147
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
143
  Session::killed_state killed_status= Session::NOT_KILLED;
148
144
 
149
145
  /* Escape and enclosed character may be a utf8 4-byte character */
150
146
  if (escaped->length() > 4 || enclosed->length() > 4)
156
152
  if (session->openTablesLock(table_list))
157
153
    return(true);
158
154
 
159
 
  if (setup_tables_and_check_access(session, &session->getLex()->select_lex.context,
160
 
                                    &session->getLex()->select_lex.top_join_list,
 
155
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
 
156
                                    &session->lex->select_lex.top_join_list,
161
157
                                    table_list,
162
 
                                    &session->getLex()->select_lex.leaf_tables, true))
 
158
                                    &session->lex->select_lex.leaf_tables, true))
163
159
     return(-1);
164
160
 
165
161
  /*
172
168
  */
173
169
  if (unique_table(table_list, table_list->next_global))
174
170
  {
175
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
171
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
176
172
    return(true);
177
173
  }
178
174
 
207
203
    */
208
204
    if (table->timestamp_field)
209
205
    {
210
 
      if (table->isWriteSet(table->timestamp_field->position()))
211
 
      {
 
206
      if (table->isWriteSet(table->timestamp_field->field_index))
212
207
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
213
 
      }
214
208
      else
215
209
      {
216
 
        table->setWriteSet(table->timestamp_field->position());
 
210
        table->setWriteSet(table->timestamp_field->field_index);
217
211
      }
218
212
    }
219
213
    /* Fix the expressions in SET clause */
225
219
 
226
220
  size_t tot_length=0;
227
221
  bool use_blobs= 0, use_vars= 0;
228
 
  List<Item>::iterator it(fields_vars.begin());
 
222
  List_iterator_fast<Item> it(fields_vars);
229
223
  Item *item;
230
224
 
231
225
  while ((item= it++))
258
252
    return(true);
259
253
  }
260
254
 
261
 
  fs::path to_file(ex->file_name);
262
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
263
 
  if (not to_file.has_root_directory())
264
 
  {
265
 
    int count_elements= 0;
266
 
    for (fs::path::iterator iter= to_file.begin();
267
 
         iter != to_file.end();
268
 
         ++iter, ++count_elements)
269
 
    { }
270
 
 
271
 
    if (count_elements == 1)
272
 
    {
273
 
      target_path /= tdb;
274
 
    }
275
 
    target_path /= to_file;
276
 
  }
277
 
  else
278
 
  {
279
 
    target_path= to_file;
280
 
  }
281
 
 
282
 
  if (not secure_file_priv.string().empty())
283
 
  {
284
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
285
 
    {
286
 
      /* Read only allowed from within dir specified by secure_file_priv */
287
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
255
  {
 
256
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
257
    ex->file_name+=dirname_length(ex->file_name);
 
258
#endif
 
259
    if (!internal::dirname_length(ex->file_name))
 
260
    {
 
261
      strcpy(name, data_home_real);
 
262
      strncat(name, tdb, FN_REFLEN-strlen(data_home_real)-1);
 
263
      (void) internal::fn_format(name, ex->file_name, name, "",
 
264
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
265
    }
 
266
    else
 
267
    {
 
268
      (void) internal::fn_format(name, ex->file_name, data_home_real, "",
 
269
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
270
 
 
271
      if (opt_secure_file_priv &&
 
272
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
273
      {
 
274
        /* Read only allowed from within dir specified by secure_file_priv */
 
275
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
276
        return(true);
 
277
      }
 
278
 
 
279
      struct stat stat_info;
 
280
      if (stat(name,&stat_info))
 
281
      {
 
282
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
283
        return(true);
 
284
      }
 
285
 
 
286
      // if we are not in slave thread, the cursor must be:
 
287
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
288
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
289
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
290
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
291
      {
 
292
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
293
        return(true);
 
294
      }
 
295
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
296
        is_fifo = 1;
 
297
    }
 
298
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
299
    {
 
300
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
288
301
      return(true);
289
302
    }
290
303
  }
291
304
 
292
 
  struct stat stat_info;
293
 
  if (stat(target_path.file_string().c_str(), &stat_info))
294
 
  {
295
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
296
 
    return(true);
297
 
  }
298
 
 
299
 
  // if we are not in slave thread, the cursor must be:
300
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
301
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
302
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
303
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
304
 
  {
305
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
306
 
    return(true);
307
 
  }
308
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
309
 
    is_fifo = 1;
310
 
 
311
 
 
312
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
313
 
  {
314
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
315
 
    return(true);
316
 
  }
317
 
  CopyInfo info;
 
305
  COPY_INFO info;
318
306
  memset(&info, 0, sizeof(info));
319
307
  info.ignore= ignore;
320
308
  info.handle_duplicates=handle_duplicates;
321
309
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
322
310
 
323
 
  identifier::Schema identifier(*schema);
 
311
  SchemaIdentifier identifier(session->db);
324
312
  READ_INFO read_info(file, tot_length,
325
313
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
326
314
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
366
354
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
367
355
    table->copy_blobs=1;
368
356
 
369
 
    session->setAbortOnWarning(true);
 
357
    session->abort_on_warning= true;
370
358
 
371
359
    if (!field_term->length() && !enclosed->length())
372
360
      error= read_fixed_length(session, info, table_list, fields_vars,
389
377
    internal::my_close(file,MYF(0));
390
378
  free_blobs(table);                            /* if pack_blob was used */
391
379
  table->copy_blobs=0;
392
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
380
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
393
381
  /*
394
382
     simulated killing in the middle of per-row loop
395
383
     must be effective for binlogging
396
384
  */
397
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
385
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
398
386
  if (error)
399
387
  {
400
388
    error= -1;                          // Error on read
401
389
    goto err;
402
390
  }
403
 
 
404
 
  char msg[FN_REFLEN];
405
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
406
 
           (info.records - info.copied), session->cuted_fields);
 
391
  snprintf(name, sizeof(name), ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
392
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
407
393
 
408
394
  if (session->transaction.stmt.hasModifiedNonTransData())
409
395
    session->transaction.all.markModifiedNonTransData();
410
396
 
411
397
  /* ok to client sent only after binlog write and engine commit */
412
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
398
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
413
399
err:
414
400
  assert(transactional_table || !(info.copied || info.deleted) ||
415
401
              session->transaction.stmt.hasModifiedNonTransData());
416
402
  table->cursor->ha_release_auto_increment();
417
403
  table->auto_increment_field_not_null= false;
418
 
  session->setAbortOnWarning(false);
419
 
 
 
404
  session->abort_on_warning= 0;
420
405
  return(error);
421
406
}
422
407
 
426
411
****************************************************************************/
427
412
 
428
413
static int
429
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
414
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
430
415
                  List<Item> &fields_vars, List<Item> &set_fields,
431
416
                  List<Item> &set_values, READ_INFO &read_info,
432
417
                  uint32_t skip_lines, bool ignore_check_option_errors)
433
418
{
434
 
  List<Item>::iterator it(fields_vars.begin());
 
419
  List_iterator_fast<Item> it(fields_vars);
435
420
  Item_field *sql_field;
436
421
  Table *table= table_list->table;
437
422
  uint64_t id;
441
426
 
442
427
  while (!read_info.read_fixed_length())
443
428
  {
444
 
    if (session->getKilled())
 
429
    if (session->killed)
445
430
    {
446
431
      session->send_kill_message();
447
432
      return(1);
457
442
      skip_lines--;
458
443
      continue;
459
444
    }
460
 
    it= fields_vars.begin();
 
445
    it.rewind();
461
446
    unsigned char *pos=read_info.row_start;
462
 
#ifdef HAVE_VALGRIND
 
447
#ifdef HAVE_purify
463
448
    read_info.row_end[0]=0;
464
449
#endif
465
450
 
486
471
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
487
472
                            ER_WARN_TOO_FEW_RECORDS,
488
473
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
489
 
 
490
 
        if (not field->maybe_null() and field->is_timestamp())
491
 
            ((field::Epoch::pointer) field)->set_time();
 
474
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
475
            ((Field_timestamp*) field)->set_time();
492
476
      }
493
477
      else
494
478
      {
515
499
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
516
500
    }
517
501
 
518
 
    if (session->getKilled() ||
 
502
    if (session->killed ||
519
503
        fill_record(session, set_fields, set_values,
520
504
                    ignore_check_option_errors))
521
505
      return(1);
546
530
 
547
531
 
548
532
static int
549
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
533
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
550
534
               List<Item> &fields_vars, List<Item> &set_fields,
551
535
               List<Item> &set_values, READ_INFO &read_info,
552
536
               String &enclosed, uint32_t skip_lines,
553
537
               bool ignore_check_option_errors)
554
538
{
555
 
  List<Item>::iterator it(fields_vars.begin());
 
539
  List_iterator_fast<Item> it(fields_vars);
556
540
  Item *item;
557
541
  Table *table= table_list->table;
558
542
  uint32_t enclosed_length;
562
546
  enclosed_length=enclosed.length();
563
547
  id= 0;
564
548
 
565
 
  for (;;it= fields_vars.begin())
 
549
  for (;;it.rewind())
566
550
  {
567
 
    if (session->getKilled())
 
551
    if (session->killed)
568
552
    {
569
553
      session->send_kill_message();
570
554
      return(1);
604
588
            return(1);
605
589
          }
606
590
          field->set_null();
607
 
          if (not field->maybe_null())
 
591
          if (!field->maybe_null())
608
592
          {
609
 
            if (field->is_timestamp())
610
 
            {
611
 
              ((field::Epoch::pointer) field)->set_time();
612
 
            }
 
593
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
594
              ((Field_timestamp*) field)->set_time();
613
595
            else if (field != table->next_number_field)
614
 
            {
615
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
616
 
            }
 
596
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
597
                                 ER_WARN_NULL_TO_NOTNULL, 1);
617
598
          }
618
599
        }
619
600
        else if (item->type() == Item::STRING_ITEM)
674
655
                     session->row_count);
675
656
            return(1);
676
657
          }
677
 
          if (not field->maybe_null() and field->is_timestamp())
678
 
              ((field::Epoch::pointer) field)->set_time();
 
658
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
659
              ((Field_timestamp*) field)->set_time();
679
660
          /*
680
661
            QQ: We probably should not throw warning for each field.
681
662
            But how about intention to always have the same number
700
681
      }
701
682
    }
702
683
 
703
 
    if (session->getKilled() ||
 
684
    if (session->killed ||
704
685
        fill_record(session, set_fields, set_values,
705
686
                    ignore_check_option_errors))
706
687
      return(1);
721
702
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
722
703
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
723
704
                          session->row_count);
724
 
      if (session->getKilled())
 
705
      if (session->killed)
725
706
        return(1);
726
707
    }
727
708
    session->row_count++;
795
776
 
796
777
 
797
778
  /* Set of a stack for unget if long terminators */
798
 
  size_t length= max(field_term_length,line_term_length)+1;
799
 
  set_if_bigger(length, line_start.length());
 
779
  uint32_t length= max(field_term_length,line_term_length)+1;
 
780
  set_if_bigger(length,line_start.length());
800
781
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
801
782
 
802
783
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
804
785
  else
805
786
  {
806
787
    end_of_buff=buffer+buff_length;
807
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
808
 
                            (false) ? internal::READ_NET :
809
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
810
 
                            MYF(MY_WME)))
 
788
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
 
789
                      (false) ? internal::READ_NET :
 
790
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
791
                      MYF(MY_WME)))
811
792
    {
812
793
      free((unsigned char*) buffer);
813
794
      error=1;
830
811
  if (!error)
831
812
  {
832
813
    if (need_end_io_cache)
833
 
      cache.end_io_cache();
 
814
      internal::end_io_cache(&cache);
834
815
    free(buffer);
835
816
    error=1;
836
817
  }