~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Brian Aker
  • Date: 2010-09-09 21:45:53 UTC
  • mto: (1756.1.2 build) (1768.2.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 1757.
  • Revision ID: brian@tangent.org-20100909214553-e687rmf5zk9478on
Force unique to just use memory and let the OS handle paging.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Copy data from a textfile to table */
31
31
#include <fcntl.h>
32
32
#include <algorithm>
33
33
#include <climits>
34
 
#include <boost/filesystem.hpp>
35
34
 
36
 
namespace fs=boost::filesystem;
37
35
using namespace std;
38
36
namespace drizzled
39
37
{
75
73
  */
76
74
  void end_io_cache()
77
75
  {
78
 
    cache.end_io_cache();
 
76
    internal::end_io_cache(&cache);
79
77
    need_end_io_cache = 0;
80
78
  }
81
79
 
124
122
                List<Item> &set_values,
125
123
                enum enum_duplicates handle_duplicates, bool ignore)
126
124
{
 
125
  char name[FN_REFLEN];
127
126
  int file;
128
127
  Table *table= NULL;
129
128
  int error;
130
129
  String *field_term=ex->field_term,*escaped=ex->escaped;
131
130
  String *enclosed=ex->enclosed;
132
131
  bool is_fifo=0;
133
 
 
134
 
  assert(table_list->getSchemaName()); // This should never be null
135
 
 
 
132
  char *db= table_list->db;                     // This is never null
 
133
  assert(db);
136
134
  /*
137
135
    If path for cursor is not defined, we will use the current database.
138
136
    If this is not set, we will use the directory where the table to be
139
137
    loaded is located
140
138
  */
141
 
  util::string::const_shared_ptr schema(session->schema());
142
 
  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
 
139
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
143
140
  assert(tdb);
144
141
  uint32_t skip_lines= ex->skip_lines;
145
142
  bool transactional_table;
146
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
143
  Session::killed_state killed_status= Session::NOT_KILLED;
147
144
 
148
145
  /* Escape and enclosed character may be a utf8 4-byte character */
149
146
  if (escaped->length() > 4 || enclosed->length() > 4)
171
168
  */
172
169
  if (unique_table(table_list, table_list->next_global))
173
170
  {
174
 
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
171
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
175
172
    return(true);
176
173
  }
177
174
 
206
203
    */
207
204
    if (table->timestamp_field)
208
205
    {
209
 
      if (table->isWriteSet(table->timestamp_field->position()))
210
 
      {
 
206
      if (table->isWriteSet(table->timestamp_field->field_index))
211
207
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
212
 
      }
213
208
      else
214
209
      {
215
 
        table->setWriteSet(table->timestamp_field->position());
 
210
        table->setWriteSet(table->timestamp_field->field_index);
216
211
      }
217
212
    }
218
213
    /* Fix the expressions in SET clause */
257
252
    return(true);
258
253
  }
259
254
 
260
 
  fs::path to_file(ex->file_name);
261
 
  fs::path target_path(fs::system_complete(getDataHomeCatalog()));
262
 
  if (not to_file.has_root_directory())
263
 
  {
264
 
    int count_elements= 0;
265
 
    for (fs::path::iterator iter= to_file.begin();
266
 
         iter != to_file.end();
267
 
         ++iter, ++count_elements)
268
 
    { }
269
 
 
270
 
    if (count_elements == 1)
271
 
    {
272
 
      target_path /= tdb;
273
 
    }
274
 
    target_path /= to_file;
275
 
  }
276
 
  else
277
 
  {
278
 
    target_path= to_file;
279
 
  }
280
 
 
281
 
  if (not secure_file_priv.string().empty())
282
 
  {
283
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
284
 
    {
285
 
      /* Read only allowed from within dir specified by secure_file_priv */
286
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
255
  {
 
256
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
257
    ex->file_name+=dirname_length(ex->file_name);
 
258
#endif
 
259
    if (!internal::dirname_length(ex->file_name))
 
260
    {
 
261
      strcpy(name, data_home_real);
 
262
      strncat(name, tdb, FN_REFLEN-strlen(data_home_real)-1);
 
263
      (void) internal::fn_format(name, ex->file_name, name, "",
 
264
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
265
    }
 
266
    else
 
267
    {
 
268
      (void) internal::fn_format(name, ex->file_name, data_home_real, "",
 
269
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
270
 
 
271
      if (opt_secure_file_priv &&
 
272
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
273
      {
 
274
        /* Read only allowed from within dir specified by secure_file_priv */
 
275
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
276
        return(true);
 
277
      }
 
278
 
 
279
      struct stat stat_info;
 
280
      if (stat(name,&stat_info))
 
281
      {
 
282
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
283
        return(true);
 
284
      }
 
285
 
 
286
      // if we are not in slave thread, the cursor must be:
 
287
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
288
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
289
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
290
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
291
      {
 
292
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
293
        return(true);
 
294
      }
 
295
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
296
        is_fifo = 1;
 
297
    }
 
298
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
299
    {
 
300
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
287
301
      return(true);
288
302
    }
289
303
  }
290
304
 
291
 
  struct stat stat_info;
292
 
  if (stat(target_path.file_string().c_str(), &stat_info))
293
 
  {
294
 
    my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
295
 
    return(true);
296
 
  }
297
 
 
298
 
  // if we are not in slave thread, the cursor must be:
299
 
  if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
300
 
        (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
301
 
        ((stat_info.st_mode & S_IFREG) == S_IFREG ||
302
 
         (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
303
 
  {
304
 
    my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
305
 
    return(true);
306
 
  }
307
 
  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
308
 
    is_fifo = 1;
309
 
 
310
 
 
311
 
  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
312
 
  {
313
 
    my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
314
 
    return(true);
315
 
  }
316
305
  CopyInfo info;
317
306
  memset(&info, 0, sizeof(info));
318
307
  info.ignore= ignore;
319
308
  info.handle_duplicates=handle_duplicates;
320
309
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
321
310
 
322
 
  SchemaIdentifier identifier(*schema);
 
311
  SchemaIdentifier identifier(session->db);
323
312
  READ_INFO read_info(file, tot_length,
324
313
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
325
314
                      *field_term, *ex->line_start, *ex->line_term, *enclosed,
393
382
     simulated killing in the middle of per-row loop
394
383
     must be effective for binlogging
395
384
  */
396
 
  killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
 
385
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
397
386
  if (error)
398
387
  {
399
388
    error= -1;                          // Error on read
400
389
    goto err;
401
390
  }
402
 
 
403
 
  char msg[FN_REFLEN];
404
 
  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
 
391
  snprintf(name, sizeof(name), ER(ER_LOAD_INFO), info.records, info.deleted,
405
392
           (info.records - info.copied), session->cuted_fields);
406
393
 
407
394
  if (session->transaction.stmt.hasModifiedNonTransData())
408
395
    session->transaction.all.markModifiedNonTransData();
409
396
 
410
397
  /* ok to client sent only after binlog write and engine commit */
411
 
  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
 
398
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
412
399
err:
413
400
  assert(transactional_table || !(info.copied || info.deleted) ||
414
401
              session->transaction.stmt.hasModifiedNonTransData());
439
426
 
440
427
  while (!read_info.read_fixed_length())
441
428
  {
442
 
    if (session->getKilled())
 
429
    if (session->killed)
443
430
    {
444
431
      session->send_kill_message();
445
432
      return(1);
457
444
    }
458
445
    it.rewind();
459
446
    unsigned char *pos=read_info.row_start;
460
 
#ifdef HAVE_VALGRIND
 
447
#ifdef HAVE_purify
461
448
    read_info.row_end[0]=0;
462
449
#endif
463
450
 
512
499
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
513
500
    }
514
501
 
515
 
    if (session->getKilled() ||
 
502
    if (session->killed ||
516
503
        fill_record(session, set_fields, set_values,
517
504
                    ignore_check_option_errors))
518
505
      return(1);
561
548
 
562
549
  for (;;it.rewind())
563
550
  {
564
 
    if (session->getKilled())
 
551
    if (session->killed)
565
552
    {
566
553
      session->send_kill_message();
567
554
      return(1);
694
681
      }
695
682
    }
696
683
 
697
 
    if (session->getKilled() ||
 
684
    if (session->killed ||
698
685
        fill_record(session, set_fields, set_values,
699
686
                    ignore_check_option_errors))
700
687
      return(1);
715
702
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
716
703
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
717
704
                          session->row_count);
718
 
      if (session->getKilled())
 
705
      if (session->killed)
719
706
        return(1);
720
707
    }
721
708
    session->row_count++;
789
776
 
790
777
 
791
778
  /* Set of a stack for unget if long terminators */
792
 
  size_t length= max(field_term_length,line_term_length)+1;
793
 
  set_if_bigger(length, line_start.length());
 
779
  uint32_t length= max(field_term_length,line_term_length)+1;
 
780
  set_if_bigger(length,line_start.length());
794
781
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
795
782
 
796
783
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
798
785
  else
799
786
  {
800
787
    end_of_buff=buffer+buff_length;
801
 
    if (cache.init_io_cache((false) ? -1 : cursor, 0,
802
 
                            (false) ? internal::READ_NET :
803
 
                            (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
804
 
                            MYF(MY_WME)))
 
788
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
 
789
                      (false) ? internal::READ_NET :
 
790
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
791
                      MYF(MY_WME)))
805
792
    {
806
793
      free((unsigned char*) buffer);
807
794
      error=1;
824
811
  if (!error)
825
812
  {
826
813
    if (need_end_io_cache)
827
 
      cache.end_io_cache();
 
814
      internal::end_io_cache(&cache);
828
815
    free(buffer);
829
816
    error=1;
830
817
  }