~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Monty Taylor
  • Date: 2010-12-06 20:03:48 UTC
  • mto: (1977.1.1 build)
  • mto: This revision was merged to the branch mainline in revision 1980.
  • Revision ID: mordred@inaugust.com-20101206200348-ds31k1kc8ougn7fg
four more variables

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
31
31
#include <fcntl.h>
32
32
#include <algorithm>
33
33
#include <climits>
 
34
#include <boost/filesystem.hpp>
34
35
 
 
36
namespace fs=boost::filesystem;
35
37
using namespace std;
36
38
namespace drizzled
37
39
{
73
75
  */
74
76
  void end_io_cache()
75
77
  {
76
 
    internal::end_io_cache(&cache);
 
78
    cache.end_io_cache();
77
79
    need_end_io_cache = 0;
78
80
  }
79
81
 
85
87
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
86
88
};
87
89
 
88
 
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
90
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
89
91
                             List<Item> &fields_vars, List<Item> &set_fields,
90
92
                             List<Item> &set_values, READ_INFO &read_info,
91
93
                             uint32_t skip_lines,
92
94
                             bool ignore_check_option_errors);
93
 
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
95
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
94
96
                          List<Item> &fields_vars, List<Item> &set_fields,
95
97
                          List<Item> &set_values, READ_INFO &read_info,
96
98
                          String &enclosed, uint32_t skip_lines,
122
124
                List<Item> &set_values,
123
125
                enum enum_duplicates handle_duplicates, bool ignore)
124
126
{
125
 
  char name[FN_REFLEN];
126
127
  int file;
127
128
  Table *table= NULL;
128
129
  int error;
129
130
  String *field_term=ex->field_term,*escaped=ex->escaped;
130
131
  String *enclosed=ex->enclosed;
131
132
  bool is_fifo=0;
132
 
  char *db= table_list->db;                     // This is never null
133
 
  assert(db);
 
133
 
 
134
  assert(table_list->getSchemaName()); // This should never be null
 
135
 
134
136
  /*
135
137
    If path for cursor is not defined, we will use the current database.
136
138
    If this is not set, we will use the directory where the table to be
137
139
    loaded is located
138
140
  */
139
 
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
 
141
  const char *tdb= session->db.empty() ? table_list->getSchemaName()  : session->db.c_str();            // Result is never null
140
142
  assert(tdb);
141
143
  uint32_t skip_lines= ex->skip_lines;
142
144
  bool transactional_table;
143
 
  Session::killed_state killed_status= Session::NOT_KILLED;
 
145
  Session::killed_state_t killed_status= Session::NOT_KILLED;
144
146
 
145
147
  /* Escape and enclosed character may be a utf8 4-byte character */
146
148
  if (escaped->length() > 4 || enclosed->length() > 4)
168
170
  */
169
171
  if (unique_table(table_list, table_list->next_global))
170
172
  {
171
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
 
173
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
172
174
    return(true);
173
175
  }
174
176
 
178
180
  if (!fields_vars.elements)
179
181
  {
180
182
    Field **field;
181
 
    for (field=table->field; *field ; field++)
 
183
    for (field= table->getFields(); *field ; field++)
182
184
      fields_vars.push_back(new Item_field(*field));
183
185
    table->setWriteSet();
184
186
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
252
254
    return(true);
253
255
  }
254
256
 
255
 
  {
256
 
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
257
 
    ex->file_name+=dirname_length(ex->file_name);
258
 
#endif
259
 
    if (!internal::dirname_length(ex->file_name))
260
 
    {
261
 
      strcpy(name, drizzle_real_data_home);
262
 
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
263
 
      (void) internal::fn_format(name, ex->file_name, name, "",
264
 
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
265
 
    }
266
 
    else
267
 
    {
268
 
      (void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
269
 
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
270
 
 
271
 
      if (opt_secure_file_priv &&
272
 
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
273
 
      {
274
 
        /* Read only allowed from within dir specified by secure_file_priv */
275
 
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
276
 
        return(true);
277
 
      }
278
 
 
279
 
      struct stat stat_info;
280
 
      if (stat(name,&stat_info))
281
 
      {
282
 
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
283
 
        return(true);
284
 
      }
285
 
 
286
 
      // if we are not in slave thread, the cursor must be:
287
 
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
288
 
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
289
 
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
290
 
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
291
 
      {
292
 
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
293
 
        return(true);
294
 
      }
295
 
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
296
 
        is_fifo = 1;
297
 
    }
298
 
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
299
 
    {
300
 
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
 
257
  fs::path to_file(ex->file_name);
 
258
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
 
259
  if (not to_file.has_root_directory())
 
260
  {
 
261
    int count_elements= 0;
 
262
    for (fs::path::iterator iter= to_file.begin();
 
263
         iter != to_file.end();
 
264
         ++iter, ++count_elements)
 
265
    { }
 
266
 
 
267
    if (count_elements == 1)
 
268
    {
 
269
      target_path /= tdb;
 
270
    }
 
271
    target_path /= to_file;
 
272
  }
 
273
  else
 
274
  {
 
275
    target_path= to_file;
 
276
  }
 
277
 
 
278
  if (not secure_file_priv.string().empty())
 
279
  {
 
280
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
 
281
    {
 
282
      /* Read only allowed from within dir specified by secure_file_priv */
 
283
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
301
284
      return(true);
302
285
    }
303
286
  }
304
287
 
305
 
  COPY_INFO info;
 
288
  struct stat stat_info;
 
289
  if (stat(target_path.file_string().c_str(), &stat_info))
 
290
  {
 
291
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
 
292
    return(true);
 
293
  }
 
294
 
 
295
  // if we are not in slave thread, the cursor must be:
 
296
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
297
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
298
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
299
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
300
  {
 
301
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
 
302
    return(true);
 
303
  }
 
304
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
305
    is_fifo = 1;
 
306
 
 
307
 
 
308
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
 
309
  {
 
310
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
 
311
    return(true);
 
312
  }
 
313
  CopyInfo info;
306
314
  memset(&info, 0, sizeof(info));
307
315
  info.ignore= ignore;
308
316
  info.handle_duplicates=handle_duplicates;
377
385
    internal::my_close(file,MYF(0));
378
386
  free_blobs(table);                            /* if pack_blob was used */
379
387
  table->copy_blobs=0;
380
 
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
 
388
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
381
389
  /*
382
390
     simulated killing in the middle of per-row loop
383
391
     must be effective for binlogging
384
392
  */
385
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
 
393
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
386
394
  if (error)
387
395
  {
388
396
    error= -1;                          // Error on read
389
397
    goto err;
390
398
  }
391
 
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
392
 
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
399
 
 
400
  char msg[FN_REFLEN];
 
401
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
 
402
           (info.records - info.copied), session->cuted_fields);
393
403
 
394
404
  if (session->transaction.stmt.hasModifiedNonTransData())
395
405
    session->transaction.all.markModifiedNonTransData();
396
406
 
397
407
  /* ok to client sent only after binlog write and engine commit */
398
 
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
 
408
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
399
409
err:
400
410
  assert(transactional_table || !(info.copied || info.deleted) ||
401
411
              session->transaction.stmt.hasModifiedNonTransData());
411
421
****************************************************************************/
412
422
 
413
423
static int
414
 
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
424
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
415
425
                  List<Item> &fields_vars, List<Item> &set_fields,
416
426
                  List<Item> &set_values, READ_INFO &read_info,
417
427
                  uint32_t skip_lines, bool ignore_check_option_errors)
426
436
 
427
437
  while (!read_info.read_fixed_length())
428
438
  {
429
 
    if (session->killed)
 
439
    if (session->getKilled())
430
440
    {
431
441
      session->send_kill_message();
432
442
      return(1);
444
454
    }
445
455
    it.rewind();
446
456
    unsigned char *pos=read_info.row_start;
447
 
#ifdef HAVE_purify
 
457
#ifdef HAVE_VALGRIND
448
458
    read_info.row_end[0]=0;
449
459
#endif
450
460
 
499
509
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
500
510
    }
501
511
 
502
 
    if (session->killed ||
 
512
    if (session->getKilled() ||
503
513
        fill_record(session, set_fields, set_values,
504
514
                    ignore_check_option_errors))
505
515
      return(1);
530
540
 
531
541
 
532
542
static int
533
 
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
543
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
534
544
               List<Item> &fields_vars, List<Item> &set_fields,
535
545
               List<Item> &set_values, READ_INFO &read_info,
536
546
               String &enclosed, uint32_t skip_lines,
548
558
 
549
559
  for (;;it.rewind())
550
560
  {
551
 
    if (session->killed)
 
561
    if (session->getKilled())
552
562
    {
553
563
      session->send_kill_message();
554
564
      return(1);
681
691
      }
682
692
    }
683
693
 
684
 
    if (session->killed ||
 
694
    if (session->getKilled() ||
685
695
        fill_record(session, set_fields, set_values,
686
696
                    ignore_check_option_errors))
687
697
      return(1);
702
712
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
703
713
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
704
714
                          session->row_count);
705
 
      if (session->killed)
 
715
      if (session->getKilled())
706
716
        return(1);
707
717
    }
708
718
    session->row_count++;
776
786
 
777
787
 
778
788
  /* Set of a stack for unget if long terminators */
779
 
  uint32_t length= max(field_term_length,line_term_length)+1;
780
 
  set_if_bigger(length,line_start.length());
 
789
  size_t length= max(field_term_length,line_term_length)+1;
 
790
  set_if_bigger(length, line_start.length());
781
791
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
782
792
 
783
793
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
785
795
  else
786
796
  {
787
797
    end_of_buff=buffer+buff_length;
788
 
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
789
 
                      (false) ? internal::READ_NET :
790
 
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
791
 
                      MYF(MY_WME)))
 
798
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
 
799
                            (false) ? internal::READ_NET :
 
800
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
801
                            MYF(MY_WME)))
792
802
    {
793
803
      free((unsigned char*) buffer);
794
804
      error=1;
811
821
  if (!error)
812
822
  {
813
823
    if (need_end_io_cache)
814
 
      internal::end_io_cache(&cache);
 
824
      cache.end_io_cache();
815
825
    free(buffer);
816
826
    error=1;
817
827
  }