~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

Merge in security refactor.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
18
18
 
19
19
#include "config.h"
20
 
 
21
20
#include <drizzled/sql_load.h>
22
21
#include <drizzled/error.h>
23
22
#include <drizzled/data_home.h>
24
23
#include <drizzled/session.h>
25
24
#include <drizzled/sql_base.h>
26
 
#include <drizzled/field/epoch.h>
 
25
#include <drizzled/field/timestamp.h>
27
26
#include "drizzled/internal/my_sys.h"
28
27
#include "drizzled/internal/iocache.h"
29
28
#include <drizzled/db.h>
30
 
#include "drizzled/plugin/storage_engine.h"
31
29
 
32
30
#include <sys/stat.h>
33
31
#include <fcntl.h>
34
32
#include <algorithm>
35
33
#include <climits>
36
 
#include <boost/filesystem.hpp>
37
34
 
38
 
namespace fs=boost::filesystem;
39
35
using namespace std;
40
36
namespace drizzled
41
37
{
77
73
  */
78
74
  void end_io_cache()
79
75
  {
80
 
    cache.end_io_cache();
 
76
    internal::end_io_cache(&cache);
81
77
    need_end_io_cache = 0;
82
78
  }
83
79
 
84
80
  /*
85
81
    Either this method, or we need to make cache public
86
 
    Arg must be set from load() since constructor does not see
 
82
    Arg must be set from mysql_load() since constructor does not see
87
83
    either the table or Session value
88
84
  */
89
85
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
90
86
};
91
87
 
92
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
88
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
93
89
                             List<Item> &fields_vars, List<Item> &set_fields,
94
90
                             List<Item> &set_values, READ_INFO &read_info,
95
91
                             uint32_t skip_lines,
96
92
                             bool ignore_check_option_errors);
97
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
93
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
98
94
                          List<Item> &fields_vars, List<Item> &set_fields,
99
95
                          List<Item> &set_values, READ_INFO &read_info,
100
96
                          String &enclosed, uint32_t skip_lines,
105
101
  Execute LOAD DATA query
106
102
 
107
103
  SYNOPSYS
108
 
    load()
 
104
    mysql_load()
109
105
      session - current thread
110
106
      ex  - file_exchange object representing source cursor and its parsing rules
111
107
      table_list  - list of tables to which we are loading data
121
117
    true - error / false - success
122
118
*/
123
119
 
124
 
int load(Session *session,file_exchange *ex,TableList *table_list,
 
120
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
125
121
                List<Item> &fields_vars, List<Item> &set_fields,
126
122
                List<Item> &set_values,
127
123
                enum enum_duplicates handle_duplicates, bool ignore)
128
124
{
 
125
  char name[FN_REFLEN];
129
126
  int file;
130
127
  Table *table= NULL;
131
128
  int error;
132
129
  String *field_term=ex->field_term,*escaped=ex->escaped;
133
130
  String *enclosed=ex->enclosed;
134
131
  bool is_fifo=0;
135
 
 
136
 
  assert(table_list->getSchemaName()); // This should never be null
137
 
 
 
132
  char *db= table_list->db;                     // This is never null
 
133
  assert(db);
138
134
  /*
139
135
    If path for cursor is not defined, we will use the current database.
140
136
    If this is not set, we will use the directory where the table to be
141
137
    loaded is located
142
138
  */
143
 
  util::string::const_shared_ptr schema(session->schema());
144
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
139
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
145
140
  assert(tdb);
146
141
  uint32_t skip_lines= ex->skip_lines;
147
142
  bool transactional_table;
148
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
143
  Session::killed_state killed_status= Session::NOT_KILLED;
149
144
 
150
145
  /* Escape and enclosed character may be a utf8 4-byte character */
151
146
  if (escaped->length() > 4 || enclosed->length() > 4)
173
168
  */
174
169
  if (unique_table(table_list, table_list->next_global))
175
170
  {
176
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
171
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
177
172
    return(true);
178
173
  }
179
174
 
183
178
  if (!fields_vars.elements)
184
179
  {
185
180
    Field **field;
186
 
    for (field= table->getFields(); *field ; field++)
 
181
    for (field=table->field; *field ; field++)
187
182
      fields_vars.push_back(new Item_field(*field));
188
183
    table->setWriteSet();
189
184
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
208
203
    */
209
204
    if (table->timestamp_field)
210
205
    {
211
 
      if (table->isWriteSet(table->timestamp_field->position()))
212
 
      {
 
206
      if (table->isWriteSet(table->timestamp_field->field_index))
213
207
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
214
 
      }
215
208
      else
216
209
      {
217
 
        table->setWriteSet(table->timestamp_field->position());
 
210
        table->setWriteSet(table->timestamp_field->field_index);
218
211
      }
219
212
    }
220
213
    /* Fix the expressions in SET clause */
259
252
    return(true);
260
253
  }
261
254
 
262
 
  fs::path to_file(ex->file_name);
263
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
264
 
  if (not to_file.has_root_directory())
265
 
  {
266
 
    int count_elements= 0;
267
 
    for (fs::path::iterator iter= to_file.begin();
268
 
         iter != to_file.end();
269
 
         ++iter, ++count_elements)
270
 
    { }
271
 
 
272
 
    if (count_elements == 1)
273
 
    {
274
 
      target_path /= tdb;
275
 
    }
276
 
    target_path /= to_file;
277
 
  }
278
 
  else
279
 
  {
280
 
    target_path= to_file;
281
 
  }
282
 
 
283
 
  if (not secure_file_priv.string().empty())
284
 
  {
285
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
286
 
    {
287
 
      /* Read only allowed from within dir specified by secure_file_priv */
288
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
255
  {
 
256
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
257
    ex->file_name+=dirname_length(ex->file_name);
 
258
#endif
 
259
    if (!internal::dirname_length(ex->file_name))
 
260
    {
 
261
      strcpy(name, drizzle_real_data_home);
 
262
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
263
      (void) internal::fn_format(name, ex->file_name, name, "",
 
264
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
265
    }
 
266
    else
 
267
    {
 
268
      (void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
 
269
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
270
 
 
271
      if (opt_secure_file_priv &&
 
272
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
273
      {
 
274
        /* Read only allowed from within dir specified by secure_file_priv */
 
275
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
276
        return(true);
 
277
      }
 
278
 
 
279
      struct stat stat_info;
 
280
      if (stat(name,&stat_info))
 
281
      {
 
282
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
283
        return(true);
 
284
      }
 
285
 
 
286
      // if we are not in slave thread, the cursor must be:
 
287
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
288
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
289
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
290
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
291
      {
 
292
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
293
        return(true);
 
294
      }
 
295
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
296
        is_fifo = 1;
 
297
    }
 
298
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
299
    {
 
300
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
289
301
      return(true);
290
302
    }
291
303
  }
292
304
 
293
 
  struct stat stat_info;
294
 
  if (stat(target_path.file_string().c_str(), &stat_info))
295
 
  {
296
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
297
 
    return(true);
298
 
  }
299
 
 
300
 
  // if we are not in slave thread, the cursor must be:
301
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
302
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
303
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
304
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
305
 
  {
306
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
307
 
    return(true);
308
 
  }
309
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
310
 
    is_fifo = 1;
311
 
 
312
 
 
313
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
314
 
  {
315
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
316
 
    return(true);
317
 
  }
318
 
  CopyInfo info;
 
305
  COPY_INFO info;
319
306
  memset(&info, 0, sizeof(info));
320
307
  info.ignore= ignore;
321
308
  info.handle_duplicates=handle_duplicates;
322
309
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
323
310
 
324
 
  identifier::Schema identifier(*schema);
325
311
  READ_INFO read_info(file, tot_length,
326
 
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
327
 
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
 
312
                      ex->cs ? ex->cs : get_default_db_collation(session->db.c_str()),
 
313
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
328
314
                      info.escape_char, is_fifo);
329
315
  if (read_info.error)
330
316
  {
367
353
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
368
354
    table->copy_blobs=1;
369
355
 
370
 
    session->setAbortOnWarning(true);
 
356
    session->abort_on_warning= true;
371
357
 
372
358
    if (!field_term->length() && !enclosed->length())
373
359
      error= read_fixed_length(session, info, table_list, fields_vars,
390
376
    internal::my_close(file,MYF(0));
391
377
  free_blobs(table);                            /* if pack_blob was used */
392
378
  table->copy_blobs=0;
393
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
379
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
394
380
  /*
395
381
     simulated killing in the middle of per-row loop
396
382
     must be effective for binlogging
397
383
  */
398
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
384
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
399
385
  if (error)
400
386
  {
401
387
    error= -1;                          // Error on read
402
388
    goto err;
403
389
  }
404
 
 
405
 
  char msg[FN_REFLEN];
406
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
407
 
           (info.records - info.copied), session->cuted_fields);
408
 
 
409
 
  if (session->transaction.stmt.hasModifiedNonTransData())
410
 
    session->transaction.all.markModifiedNonTransData();
 
390
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
391
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
392
 
 
393
  if (session->transaction.stmt.modified_non_trans_table)
 
394
    session->transaction.all.modified_non_trans_table= true;
411
395
 
412
396
  /* ok to client sent only after binlog write and engine commit */
413
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
397
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
414
398
err:
415
399
  assert(transactional_table || !(info.copied || info.deleted) ||
416
 
              session->transaction.stmt.hasModifiedNonTransData());
 
400
              session->transaction.stmt.modified_non_trans_table);
417
401
  table->cursor->ha_release_auto_increment();
418
402
  table->auto_increment_field_not_null= false;
419
 
  session->setAbortOnWarning(false);
420
 
 
 
403
  session->abort_on_warning= 0;
421
404
  return(error);
422
405
}
423
406
 
427
410
****************************************************************************/
428
411
 
429
412
static int
430
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
413
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
431
414
                  List<Item> &fields_vars, List<Item> &set_fields,
432
415
                  List<Item> &set_values, READ_INFO &read_info,
433
416
                  uint32_t skip_lines, bool ignore_check_option_errors)
442
425
 
443
426
  while (!read_info.read_fixed_length())
444
427
  {
445
 
    if (session->getKilled())
 
428
    if (session->killed)
446
429
    {
447
430
      session->send_kill_message();
448
431
      return(1);
460
443
    }
461
444
    it.rewind();
462
445
    unsigned char *pos=read_info.row_start;
463
 
#ifdef HAVE_VALGRIND
 
446
#ifdef HAVE_purify
464
447
    read_info.row_end[0]=0;
465
448
#endif
466
449
 
487
470
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
488
471
                            ER_WARN_TOO_FEW_RECORDS,
489
472
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
490
 
 
491
 
        if (not field->maybe_null() and field->is_timestamp())
492
 
            ((field::Epoch::pointer) field)->set_time();
 
473
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
474
            ((Field_timestamp*) field)->set_time();
493
475
      }
494
476
      else
495
477
      {
516
498
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
517
499
    }
518
500
 
519
 
    if (session->getKilled() ||
 
501
    if (session->killed ||
520
502
        fill_record(session, set_fields, set_values,
521
503
                    ignore_check_option_errors))
522
504
      return(1);
547
529
 
548
530
 
549
531
static int
550
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
532
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
551
533
               List<Item> &fields_vars, List<Item> &set_fields,
552
534
               List<Item> &set_values, READ_INFO &read_info,
553
535
               String &enclosed, uint32_t skip_lines,
565
547
 
566
548
  for (;;it.rewind())
567
549
  {
568
 
    if (session->getKilled())
 
550
    if (session->killed)
569
551
    {
570
552
      session->send_kill_message();
571
553
      return(1);
605
587
            return(1);
606
588
          }
607
589
          field->set_null();
608
 
          if (not field->maybe_null())
 
590
          if (!field->maybe_null())
609
591
          {
610
 
            if (field->is_timestamp())
611
 
            {
612
 
              ((field::Epoch::pointer) field)->set_time();
613
 
            }
 
592
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
593
              ((Field_timestamp*) field)->set_time();
614
594
            else if (field != table->next_number_field)
615
 
            {
616
 
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
617
 
            }
 
595
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
596
                                 ER_WARN_NULL_TO_NOTNULL, 1);
618
597
          }
619
598
        }
620
599
        else if (item->type() == Item::STRING_ITEM)
675
654
                     session->row_count);
676
655
            return(1);
677
656
          }
678
 
          if (not field->maybe_null() and field->is_timestamp())
679
 
              ((field::Epoch::pointer) field)->set_time();
 
657
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
658
              ((Field_timestamp*) field)->set_time();
680
659
          /*
681
660
            QQ: We probably should not throw warning for each field.
682
661
            But how about intention to always have the same number
701
680
      }
702
681
    }
703
682
 
704
 
    if (session->getKilled() ||
 
683
    if (session->killed ||
705
684
        fill_record(session, set_fields, set_values,
706
685
                    ignore_check_option_errors))
707
686
      return(1);
722
701
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
723
702
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
724
703
                          session->row_count);
725
 
      if (session->getKilled())
 
704
      if (session->killed)
726
705
        return(1);
727
706
    }
728
707
    session->row_count++;
796
775
 
797
776
 
798
777
  /* Set of a stack for unget if long terminators */
799
 
  size_t length= max(field_term_length,line_term_length)+1;
800
 
  set_if_bigger(length, line_start.length());
 
778
  uint32_t length= max(field_term_length,line_term_length)+1;
 
779
  set_if_bigger(length,line_start.length());
801
780
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
802
781
 
803
782
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
805
784
  else
806
785
  {
807
786
    end_of_buff=buffer+buff_length;
808
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
809
 
                            (false) ? internal::READ_NET :
810
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
811
 
                            MYF(MY_WME)))
 
787
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
 
788
                      (false) ? internal::READ_NET :
 
789
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
790
                      MYF(MY_WME)))
812
791
    {
813
792
      free((unsigned char*) buffer);
814
793
      error=1;
831
810
  if (!error)
832
811
  {
833
812
    if (need_end_io_cache)
834
 
      cache.end_io_cache();
 
813
      internal::end_io_cache(&cache);
835
814
    free(buffer);
836
815
    error=1;
837
816
  }