~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Lee Bieber
  • Date: 2010-01-30 23:42:02 UTC
  • mto: This revision was merged to the branch mainline in revision 1282.
  • Revision ID: lbieber@lee-biebers-macbook-pro.local-20100130234202-sxmqfteqwiq15ptg
add target to japanese tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
31
31
#include <fcntl.h>
32
32
#include <algorithm>
33
33
#include <climits>
34
 
#include <boost/filesystem.hpp>
35
34
 
36
 
namespace fs=boost::filesystem;
 
35
using namespace drizzled;
37
36
using namespace std;
38
 
namespace drizzled
39
 
{
40
37
 
41
38
class READ_INFO {
42
39
  int   cursor;
50
47
  int   *stack,*stack_pos;
51
48
  bool  found_end_of_line,start_of_line,eof;
52
49
  bool  need_end_io_cache;
53
 
  internal::IO_CACHE cache;
 
50
  IO_CACHE cache;
54
51
 
55
52
public:
56
53
  bool error,line_cuted,found_null,enclosed;
75
72
  */
76
73
  void end_io_cache()
77
74
  {
78
 
    cache.end_io_cache();
 
75
    ::end_io_cache(&cache);
79
76
    need_end_io_cache = 0;
80
77
  }
81
78
 
87
84
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
88
85
};
89
86
 
90
 
static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
87
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
91
88
                             List<Item> &fields_vars, List<Item> &set_fields,
92
89
                             List<Item> &set_values, READ_INFO &read_info,
93
90
                             uint32_t skip_lines,
94
91
                             bool ignore_check_option_errors);
95
 
static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
92
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
96
93
                          List<Item> &fields_vars, List<Item> &set_fields,
97
94
                          List<Item> &set_values, READ_INFO &read_info,
98
95
                          String &enclosed, uint32_t skip_lines,
124
121
                List<Item> &set_values,
125
122
                enum enum_duplicates handle_duplicates, bool ignore)
126
123
{
 
124
  char name[FN_REFLEN];
127
125
  int file;
128
126
  Table *table= NULL;
129
127
  int error;
130
128
  String *field_term=ex->field_term,*escaped=ex->escaped;
131
129
  String *enclosed=ex->enclosed;
132
130
  bool is_fifo=0;
133
 
 
134
 
  assert(table_list->getSchemaName()); // This should never be null
135
 
 
 
131
  char *db= table_list->db;                     // This is never null
 
132
  assert(db);
136
133
  /*
137
134
    If path for cursor is not defined, we will use the current database.
138
135
    If this is not set, we will use the directory where the table to be
139
136
    loaded is located
140
137
  */
141
 
  util::string::const_shared_ptr schema(session->schema());
142
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
138
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
143
139
  assert(tdb);
144
140
  uint32_t skip_lines= ex->skip_lines;
145
141
  bool transactional_table;
146
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
142
  Session::killed_state killed_status= Session::NOT_KILLED;
147
143
 
148
144
  /* Escape and enclosed character may be a utf8 4-byte character */
149
145
  if (escaped->length() > 4 || enclosed->length() > 4)
171
167
  */
172
168
  if (unique_table(table_list, table_list->next_global))
173
169
  {
174
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
170
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
171
    return(true);
176
172
  }
177
173
 
181
177
  if (!fields_vars.elements)
182
178
  {
183
179
    Field **field;
184
 
    for (field= table->getFields(); *field ; field++)
 
180
    for (field=table->field; *field ; field++)
185
181
      fields_vars.push_back(new Item_field(*field));
186
182
    table->setWriteSet();
187
183
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
206
202
    */
207
203
    if (table->timestamp_field)
208
204
    {
209
 
      if (table->isWriteSet(table->timestamp_field->position()))
210
 
      {
 
205
      if (table->isWriteSet(table->timestamp_field->field_index))
211
206
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
212
 
      }
213
207
      else
214
208
      {
215
 
        table->setWriteSet(table->timestamp_field->position());
 
209
        table->setWriteSet(table->timestamp_field->field_index);
216
210
      }
217
211
    }
218
212
    /* Fix the expressions in SET clause */
257
251
    return(true);
258
252
  }
259
253
 
260
 
  fs::path to_file(ex->file_name);
261
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
 
  if (not to_file.has_root_directory())
263
 
  {
264
 
    int count_elements= 0;
265
 
    for (fs::path::iterator iter= to_file.begin();
266
 
         iter != to_file.end();
267
 
         ++iter, ++count_elements)
268
 
    { }
269
 
 
270
 
    if (count_elements == 1)
271
 
    {
272
 
      target_path /= tdb;
273
 
    }
274
 
    target_path /= to_file;
275
 
  }
276
 
  else
277
 
  {
278
 
    target_path= to_file;
279
 
  }
280
 
 
281
 
  if (not secure_file_priv.string().empty())
282
 
  {
283
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
284
 
    {
285
 
      /* Read only allowed from within dir specified by secure_file_priv */
286
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
254
  {
 
255
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
256
    ex->file_name+=dirname_length(ex->file_name);
 
257
#endif
 
258
    if (!dirname_length(ex->file_name))
 
259
    {
 
260
      strcpy(name, drizzle_real_data_home);
 
261
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
262
      (void) fn_format(name, ex->file_name, name, "",
 
263
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
264
    }
 
265
    else
 
266
    {
 
267
      (void) fn_format(name, ex->file_name, drizzle_real_data_home, "",
 
268
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
269
 
 
270
      if (opt_secure_file_priv &&
 
271
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
272
      {
 
273
        /* Read only allowed from within dir specified by secure_file_priv */
 
274
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
275
        return(true);
 
276
      }
 
277
 
 
278
      struct stat stat_info;
 
279
      if (stat(name,&stat_info))
 
280
      {
 
281
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
282
        return(true);
 
283
      }
 
284
 
 
285
      // if we are not in slave thread, the cursor must be:
 
286
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
287
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
288
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
289
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
290
      {
 
291
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
292
        return(true);
 
293
      }
 
294
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
295
        is_fifo = 1;
 
296
    }
 
297
    if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
298
    {
 
299
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
287
300
      return(true);
288
301
    }
289
302
  }
290
303
 
291
 
  struct stat stat_info;
292
 
  if (stat(target_path.file_string().c_str(), &stat_info))
293
 
  {
294
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
295
 
    return(true);
296
 
  }
297
 
 
298
 
  // if we are not in slave thread, the cursor must be:
299
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
300
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
303
 
  {
304
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
305
 
    return(true);
306
 
  }
307
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
308
 
    is_fifo = 1;
309
 
 
310
 
 
311
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
312
 
  {
313
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
314
 
    return(true);
315
 
  }
316
 
  CopyInfo info;
 
304
  COPY_INFO info;
317
305
  memset(&info, 0, sizeof(info));
318
306
  info.ignore= ignore;
319
307
  info.handle_duplicates=handle_duplicates;
320
308
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
321
309
 
322
 
  SchemaIdentifier identifier(*schema);
323
310
  READ_INFO read_info(file, tot_length,
324
 
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
 
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
 
311
                      ex->cs ? ex->cs : get_default_db_collation(session->db.c_str()),
 
312
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
326
313
                      info.escape_char, is_fifo);
327
314
  if (read_info.error)
328
315
  {
329
316
    if  (file >= 0)
330
 
      internal::my_close(file,MYF(0));                  // no files in net reading
 
317
      my_close(file,MYF(0));                    // no files in net reading
331
318
    return(true);                               // Can't allocate buffers
332
319
  }
333
320
 
385
372
    table->next_number_field=0;
386
373
  }
387
374
  if (file >= 0)
388
 
    internal::my_close(file,MYF(0));
 
375
    my_close(file,MYF(0));
389
376
  free_blobs(table);                            /* if pack_blob was used */
390
377
  table->copy_blobs=0;
391
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
378
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
392
379
  /*
393
380
     simulated killing in the middle of per-row loop
394
381
     must be effective for binlogging
395
382
  */
396
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
383
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
397
384
  if (error)
398
385
  {
399
386
    error= -1;                          // Error on read
400
387
    goto err;
401
388
  }
402
 
 
403
 
  char msg[FN_REFLEN];
404
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
405
 
           (info.records - info.copied), session->cuted_fields);
406
 
 
407
 
  if (session->transaction.stmt.hasModifiedNonTransData())
408
 
    session->transaction.all.markModifiedNonTransData();
 
389
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
390
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
391
 
 
392
  if (session->transaction.stmt.modified_non_trans_table)
 
393
    session->transaction.all.modified_non_trans_table= true;
409
394
 
410
395
  /* ok to client sent only after binlog write and engine commit */
411
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
396
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
412
397
err:
413
398
  assert(transactional_table || !(info.copied || info.deleted) ||
414
 
              session->transaction.stmt.hasModifiedNonTransData());
 
399
              session->transaction.stmt.modified_non_trans_table);
415
400
  table->cursor->ha_release_auto_increment();
416
401
  table->auto_increment_field_not_null= false;
417
402
  session->abort_on_warning= 0;
424
409
****************************************************************************/
425
410
 
426
411
static int
427
 
read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
 
412
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
428
413
                  List<Item> &fields_vars, List<Item> &set_fields,
429
414
                  List<Item> &set_values, READ_INFO &read_info,
430
415
                  uint32_t skip_lines, bool ignore_check_option_errors)
439
424
 
440
425
  while (!read_info.read_fixed_length())
441
426
  {
442
 
    if (session->getKilled())
 
427
    if (session->killed)
443
428
    {
444
429
      session->send_kill_message();
445
430
      return(1);
457
442
    }
458
443
    it.rewind();
459
444
    unsigned char *pos=read_info.row_start;
460
 
#ifdef HAVE_VALGRIND
 
445
#ifdef HAVE_purify
461
446
    read_info.row_end[0]=0;
462
447
#endif
463
448
 
512
497
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
513
498
    }
514
499
 
515
 
    if (session->getKilled() ||
 
500
    if (session->killed ||
516
501
        fill_record(session, set_fields, set_values,
517
502
                    ignore_check_option_errors))
518
503
      return(1);
543
528
 
544
529
 
545
530
static int
546
 
read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
 
531
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
547
532
               List<Item> &fields_vars, List<Item> &set_fields,
548
533
               List<Item> &set_values, READ_INFO &read_info,
549
534
               String &enclosed, uint32_t skip_lines,
561
546
 
562
547
  for (;;it.rewind())
563
548
  {
564
 
    if (session->getKilled())
 
549
    if (session->killed)
565
550
    {
566
551
      session->send_kill_message();
567
552
      return(1);
694
679
      }
695
680
    }
696
681
 
697
 
    if (session->getKilled() ||
 
682
    if (session->killed ||
698
683
        fill_record(session, set_fields, set_values,
699
684
                    ignore_check_option_errors))
700
685
      return(1);
715
700
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
716
701
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
717
702
                          session->row_count);
718
 
      if (session->getKilled())
 
703
      if (session->killed)
719
704
        return(1);
720
705
    }
721
706
    session->row_count++;
789
774
 
790
775
 
791
776
  /* Set of a stack for unget if long terminators */
792
 
  size_t length= max(field_term_length,line_term_length)+1;
793
 
  set_if_bigger(length, line_start.length());
 
777
  uint32_t length= max(field_term_length,line_term_length)+1;
 
778
  set_if_bigger(length,line_start.length());
794
779
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
795
780
 
796
781
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
798
783
  else
799
784
  {
800
785
    end_of_buff=buffer+buff_length;
801
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
 
                            (false) ? internal::READ_NET :
803
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
804
 
                            MYF(MY_WME)))
 
786
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
 
787
                      (false) ? READ_NET :
 
788
                      (is_fifo ? READ_FIFO : READ_CACHE),0L,1,
 
789
                      MYF(MY_WME)))
805
790
    {
806
791
      free((unsigned char*) buffer);
807
792
      error=1;
824
809
  if (!error)
825
810
  {
826
811
    if (need_end_io_cache)
827
 
      cache.end_io_cache();
 
812
      ::end_io_cache(&cache);
828
813
    free(buffer);
829
814
    error=1;
830
815
  }
1156
1141
}
1157
1142
 
1158
1143
 
1159
 
} /* namespace drizzled */