~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

Updated pandora-build files to version 0.133

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
19
#include "config.h"
20
 
 
21
20
#include <drizzled/sql_load.h>
22
21
#include <drizzled/error.h>
23
22
#include <drizzled/data_home.h>
24
23
#include <drizzled/session.h>
25
24
#include <drizzled/sql_base.h>
26
 
#include <drizzled/field/epoch.h>
 
25
#include <drizzled/field/timestamp.h>
27
26
#include "drizzled/internal/my_sys.h"
28
27
#include "drizzled/internal/iocache.h"
29
28
#include <drizzled/db.h>
30
 
#include "drizzled/plugin/storage_engine.h"
31
29
 
32
30
#include <sys/stat.h>
33
31
#include <fcntl.h>
34
32
#include <algorithm>
35
33
#include <climits>
36
 
#include <boost/filesystem.hpp>
37
34
 
38
 
namespace fs=boost::filesystem;
39
35
using namespace std;
40
36
namespace drizzled
41
37
{
77
73
  */
78
74
  void end_io_cache()
79
75
  {
80
 
    cache.end_io_cache();
 
76
    internal::end_io_cache(&cache);
81
77
    need_end_io_cache = 0;
82
78
  }
83
79
 
84
80
  /*
85
81
    Either this method, or we need to make cache public
86
 
    Arg must be set from load() since constructor does not see
 
82
    Arg must be set from mysql_load() since constructor does not see
87
83
    either the table or Session value
88
84
  */
89
85
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
90
86
};
91
87
 
92
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
88
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
93
89
                             List<Item> &fields_vars, List<Item> &set_fields,
94
90
                             List<Item> &set_values, READ_INFO &read_info,
95
91
                             uint32_t skip_lines,
96
92
                             bool ignore_check_option_errors);
97
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
93
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
98
94
                          List<Item> &fields_vars, List<Item> &set_fields,
99
95
                          List<Item> &set_values, READ_INFO &read_info,
100
96
                          String &enclosed, uint32_t skip_lines,
105
101
  Execute LOAD DATA query
106
102
 
107
103
  SYNOPSYS
108
 
    load()
 
104
    mysql_load()
109
105
      session - current thread
110
106
      ex  - file_exchange object representing source cursor and its parsing rules
111
107
      table_list  - list of tables to which we are loading data
121
117
    true - error / false - success
122
118
*/
123
119
 
124
 
int load(Session *session,file_exchange *ex,TableList *table_list,
 
120
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
125
121
                List<Item> &fields_vars, List<Item> &set_fields,
126
122
                List<Item> &set_values,
127
123
                enum enum_duplicates handle_duplicates, bool ignore)
128
124
{
 
125
  char name[FN_REFLEN];
129
126
  int file;
130
127
  Table *table= NULL;
131
128
  int error;
132
129
  String *field_term=ex->field_term,*escaped=ex->escaped;
133
130
  String *enclosed=ex->enclosed;
134
131
  bool is_fifo=0;
135
 
 
136
 
  assert(table_list->getSchemaName()); // This should never be null
137
 
 
 
132
  char *db= table_list->db;                     // This is never null
 
133
  assert(db);
138
134
  /*
139
135
    If path for cursor is not defined, we will use the current database.
140
136
    If this is not set, we will use the directory where the table to be
141
137
    loaded is located
142
138
  */
143
 
  util::string::const_shared_ptr schema(session->schema());
144
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
139
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
145
140
  assert(tdb);
146
141
  uint32_t skip_lines= ex->skip_lines;
147
142
  bool transactional_table;
148
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
143
  Session::killed_state killed_status= Session::NOT_KILLED;
149
144
 
150
145
  /* Escape and enclosed character may be a utf8 4-byte character */
151
146
  if (escaped->length() > 4 || enclosed->length() > 4)
173
168
  */
174
169
  if (unique_table(table_list, table_list->next_global))
175
170
  {
176
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
171
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
177
172
    return(true);
178
173
  }
179
174
 
183
178
  if (!fields_vars.elements)
184
179
  {
185
180
    Field **field;
186
 
    for (field= table->getFields(); *field ; field++)
 
181
    for (field=table->field; *field ; field++)
187
182
      fields_vars.push_back(new Item_field(*field));
188
183
    table->setWriteSet();
189
184
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
208
203
    */
209
204
    if (table->timestamp_field)
210
205
    {
211
 
      if (table->isWriteSet(table->timestamp_field->position()))
212
 
      {
 
206
      if (table->isWriteSet(table->timestamp_field->field_index))
213
207
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
214
 
      }
215
208
      else
216
209
      {
217
 
        table->setWriteSet(table->timestamp_field->position());
 
210
        table->setWriteSet(table->timestamp_field->field_index);
218
211
      }
219
212
    }
220
213
    /* Fix the expressions in SET clause */
259
252
    return(true);
260
253
  }
261
254
 
262
 
  fs::path to_file(ex->file_name);
263
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
264
 
  if (not to_file.has_root_directory())
265
 
  {
266
 
    int count_elements= 0;
267
 
    for (fs::path::iterator iter= to_file.begin();
268
 
         iter != to_file.end();
269
 
         ++iter, ++count_elements)
270
 
    { }
271
 
 
272
 
    if (count_elements == 1)
273
 
    {
274
 
      target_path /= tdb;
275
 
    }
276
 
    target_path /= to_file;
277
 
  }
278
 
  else
279
 
  {
280
 
    target_path= to_file;
281
 
  }
282
 
 
283
 
  if (not secure_file_priv.string().empty())
284
 
  {
285
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
286
 
    {
287
 
      /* Read only allowed from within dir specified by secure_file_priv */
288
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
255
  {
 
256
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
257
    ex->file_name+=dirname_length(ex->file_name);
 
258
#endif
 
259
    if (!internal::dirname_length(ex->file_name))
 
260
    {
 
261
      strcpy(name, data_home_real);
 
262
      strncat(name, tdb, FN_REFLEN-strlen(data_home_real)-1);
 
263
      (void) internal::fn_format(name, ex->file_name, name, "",
 
264
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
265
    }
 
266
    else
 
267
    {
 
268
      (void) internal::fn_format(name, ex->file_name, data_home_real, "",
 
269
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
270
 
 
271
      if (opt_secure_file_priv &&
 
272
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
273
      {
 
274
        /* Read only allowed from within dir specified by secure_file_priv */
 
275
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
276
        return(true);
 
277
      }
 
278
 
 
279
      struct stat stat_info;
 
280
      if (stat(name,&stat_info))
 
281
      {
 
282
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
283
        return(true);
 
284
      }
 
285
 
 
286
      // if we are not in slave thread, the cursor must be:
 
287
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
288
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
289
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
290
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
291
      {
 
292
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
293
        return(true);
 
294
      }
 
295
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
296
        is_fifo = 1;
 
297
    }
 
298
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
299
    {
 
300
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
289
301
      return(true);
290
302
    }
291
303
  }
292
304
 
293
 
  struct stat stat_info;
294
 
  if (stat(target_path.file_string().c_str(), &stat_info))
295
 
  {
296
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
297
 
    return(true);
298
 
  }
299
 
 
300
 
  // if we are not in slave thread, the cursor must be:
301
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
302
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
303
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
305
 
  {
306
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
 
    return(true);
308
 
  }
309
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
310
 
    is_fifo = 1;
311
 
 
312
 
 
313
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
314
 
  {
315
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
316
 
    return(true);
317
 
  }
318
 
  CopyInfo info;
 
305
  COPY_INFO info;
319
306
  memset(&info, 0, sizeof(info));
320
307
  info.ignore= ignore;
321
308
  info.handle_duplicates=handle_duplicates;
322
309
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
323
310
 
324
 
  identifier::Schema identifier(*schema);
 
311
  SchemaIdentifier identifier(session->db);
325
312
  READ_INFO read_info(file, tot_length,
326
313
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
327
314
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
367
354
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
368
355
    table->copy_blobs=1;
369
356
 
370
 
    session->setAbortOnWarning(true);
 
357
    session->abort_on_warning= true;
371
358
 
372
359
    if (!field_term->length() && !enclosed->length())
373
360
      error= read_fixed_length(session, info, table_list, fields_vars,
390
377
    internal::my_close(file,MYF(0));
391
378
  free_blobs(table);                            /* if pack_blob was used */
392
379
  table->copy_blobs=0;
393
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
380
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
394
381
  /*
395
382
     simulated killing in the middle of per-row loop
396
383
     must be effective for binlogging
397
384
  */
398
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
385
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
386
  if (error)
400
387
  {
401
388
    error= -1;                          // Error on read
402
389
    goto err;
403
390
  }
404
 
 
405
 
  char msg[FN_REFLEN];
406
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
407
 
           (info.records - info.copied), session->cuted_fields);
 
391
  snprintf(name, sizeof(name), ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
392
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
408
393
 
409
394
  if (session->transaction.stmt.hasModifiedNonTransData())
410
395
    session->transaction.all.markModifiedNonTransData();
411
396
 
412
397
  /* ok to client sent only after binlog write and engine commit */
413
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
398
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
414
399
err:
415
400
  assert(transactional_table || !(info.copied || info.deleted) ||
416
401
              session->transaction.stmt.hasModifiedNonTransData());
417
402
  table->cursor->ha_release_auto_increment();
418
403
  table->auto_increment_field_not_null= false;
419
 
  session->setAbortOnWarning(false);
420
 
 
 
404
  session->abort_on_warning= 0;
421
405
  return(error);
422
406
}
423
407
 
427
411
****************************************************************************/
428
412
 
429
413
static int
430
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
414
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
431
415
                  List<Item> &fields_vars, List<Item> &set_fields,
432
416
                  List<Item> &set_values, READ_INFO &read_info,
433
417
                  uint32_t skip_lines, bool ignore_check_option_errors)
442
426
 
443
427
  while (!read_info.read_fixed_length())
444
428
  {
445
 
    if (session->getKilled())
 
429
    if (session->killed)
446
430
    {
447
431
      session->send_kill_message();
448
432
      return(1);
460
444
    }
461
445
    it.rewind();
462
446
    unsigned char *pos=read_info.row_start;
463
 
#ifdef HAVE_VALGRIND
 
447
#ifdef HAVE_purify
464
448
    read_info.row_end[0]=0;
465
449
#endif
466
450
 
487
471
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
488
472
                            ER_WARN_TOO_FEW_RECORDS,
489
473
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
490
 
 
491
 
        if (not field->maybe_null() and field->is_timestamp())
492
 
            ((field::Epoch::pointer) field)->set_time();
 
474
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
475
            ((Field_timestamp*) field)->set_time();
493
476
      }
494
477
      else
495
478
      {
516
499
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
517
500
    }
518
501
 
519
 
    if (session->getKilled() ||
 
502
    if (session->killed ||
520
503
        fill_record(session, set_fields, set_values,
521
504
                    ignore_check_option_errors))
522
505
      return(1);
547
530
 
548
531
 
549
532
static int
550
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
533
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
551
534
               List<Item> &fields_vars, List<Item> &set_fields,
552
535
               List<Item> &set_values, READ_INFO &read_info,
553
536
               String &enclosed, uint32_t skip_lines,
565
548
 
566
549
  for (;;it.rewind())
567
550
  {
568
 
    if (session->getKilled())
 
551
    if (session->killed)
569
552
    {
570
553
      session->send_kill_message();
571
554
      return(1);
605
588
            return(1);
606
589
          }
607
590
          field->set_null();
608
 
          if (not field->maybe_null())
 
591
          if (!field->maybe_null())
609
592
          {
610
 
            if (field->is_timestamp())
611
 
            {
612
 
              ((field::Epoch::pointer) field)->set_time();
613
 
            }
 
593
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
594
              ((Field_timestamp*) field)->set_time();
614
595
            else if (field != table->next_number_field)
615
 
            {
616
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
617
 
            }
 
596
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
597
                                 ER_WARN_NULL_TO_NOTNULL, 1);
618
598
          }
619
599
        }
620
600
        else if (item->type() == Item::STRING_ITEM)
675
655
                     session->row_count);
676
656
            return(1);
677
657
          }
678
 
          if (not field->maybe_null() and field->is_timestamp())
679
 
              ((field::Epoch::pointer) field)->set_time();
 
658
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
659
              ((Field_timestamp*) field)->set_time();
680
660
          /*
681
661
            QQ: We probably should not throw warning for each field.
682
662
            But how about intention to always have the same number
701
681
      }
702
682
    }
703
683
 
704
 
    if (session->getKilled() ||
 
684
    if (session->killed ||
705
685
        fill_record(session, set_fields, set_values,
706
686
                    ignore_check_option_errors))
707
687
      return(1);
722
702
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
723
703
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
724
704
                          session->row_count);
725
 
      if (session->getKilled())
 
705
      if (session->killed)
726
706
        return(1);
727
707
    }
728
708
    session->row_count++;
796
776
 
797
777
 
798
778
  /* Set of a stack for unget if long terminators */
799
 
  size_t length= max(field_term_length,line_term_length)+1;
800
 
  set_if_bigger(length, line_start.length());
 
779
  uint32_t length= max(field_term_length,line_term_length)+1;
 
780
  set_if_bigger(length,line_start.length());
801
781
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
802
782
 
803
783
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
805
785
  else
806
786
  {
807
787
    end_of_buff=buffer+buff_length;
808
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
809
 
                            (false) ? internal::READ_NET :
810
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
811
 
                            MYF(MY_WME)))
 
788
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
 
789
                      (false) ? internal::READ_NET :
 
790
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
791
                      MYF(MY_WME)))
812
792
    {
813
793
      free((unsigned char*) buffer);
814
794
      error=1;
831
811
  if (!error)
832
812
  {
833
813
    if (need_end_io_cache)
834
 
      cache.end_io_cache();
 
814
      internal::end_io_cache(&cache);
835
815
    free(buffer);
836
816
    error=1;
837
817
  }