~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_update.cc

  • Committer: Brian Aker
  • Date: 2009-08-04 06:21:17 UTC
  • mfrom: (1108.2.2 merge)
  • Revision ID: brian@gaz-20090804062117-fef8x6y3ydzrvab3
Merge Brian

Show diffs side-by-side

added added

removed removed

Lines of Context:
719
719
 
720
720
  return false;
721
721
}
722
 
 
723
 
 
724
 
/***************************************************************************
725
 
  Update multiple tables from join
726
 
***************************************************************************/
727
 
 
728
 
/*
729
 
  Get table map for list of Item_field
730
 
*/
731
 
 
732
 
static table_map get_table_map(List<Item> *items)
733
 
{
734
 
  List_iterator_fast<Item> item_it(*items);
735
 
  Item_field *item;
736
 
  table_map map= 0;
737
 
 
738
 
  while ((item= (Item_field *) item_it++))
739
 
    map|= item->used_tables();
740
 
  return map;
741
 
}
742
 
 
743
 
 
744
 
/*
745
 
  make update specific preparation and checks after opening tables
746
 
 
747
 
  SYNOPSIS
748
 
    mysql_multi_update_prepare()
749
 
    session         thread handler
750
 
 
751
 
  RETURN
752
 
    false OK
753
 
    true  Error
754
 
*/
755
 
 
756
 
int mysql_multi_update_prepare(Session *session)
757
 
{
758
 
  LEX *lex= session->lex;
759
 
  TableList *table_list= lex->query_tables;
760
 
  TableList *tl, *leaves;
761
 
  List<Item> *fields= &lex->select_lex.item_list;
762
 
  table_map tables_for_update;
763
 
  /*
764
 
    if this multi-update was converted from usual update, here is table
765
 
    counter else junk will be assigned here, but then replaced with real
766
 
    count in open_tables()
767
 
  */
768
 
  uint32_t  table_count= lex->table_count;
769
 
  bool original_multiupdate= (session->lex->sql_command == SQLCOM_UPDATE_MULTI);
770
 
  bool need_reopen= false;
771
 
 
772
 
 
773
 
  /* following need for prepared statements, to run next time multi-update */
774
 
  session->lex->sql_command= SQLCOM_UPDATE_MULTI;
775
 
 
776
 
reopen_tables:
777
 
 
778
 
  /* open tables and create derived ones, but do not lock and fill them */
779
 
  if (((original_multiupdate || need_reopen) &&
780
 
       session->open_tables_from_list(&table_list, &table_count, false)) ||
781
 
      mysql_handle_derived(lex, &mysql_derived_prepare))
782
 
    return true;
783
 
  /*
784
 
    setup_tables() need for VIEWs. JOIN::prepare() will call setup_tables()
785
 
    second time, but this call will do nothing (there are check for second
786
 
    call in setup_tables()).
787
 
  */
788
 
 
789
 
  if (setup_tables_and_check_access(session, &lex->select_lex.context,
790
 
                                    &lex->select_lex.top_join_list,
791
 
                                    table_list,
792
 
                                    &lex->select_lex.leaf_tables, false))
793
 
    return true;
794
 
 
795
 
  if (setup_fields_with_no_wrap(session, 0, *fields, MARK_COLUMNS_WRITE, 0, 0))
796
 
    return true;
797
 
 
798
 
  tables_for_update= get_table_map(fields);
799
 
 
800
 
  /*
801
 
    Setup timestamp handling and locking mode
802
 
  */
803
 
  leaves= lex->select_lex.leaf_tables;
804
 
  for (tl= leaves; tl; tl= tl->next_leaf)
805
 
  {
806
 
    Table *table= tl->table;
807
 
    /* Only set timestamp column if this is not modified */
808
 
    if (table->timestamp_field && table->timestamp_field->isWriteSet())
809
 
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
810
 
 
811
 
    /* if table will be updated then check that it is unique */
812
 
    if (table->map & tables_for_update)
813
 
    {
814
 
      table->mark_columns_needed_for_update();
815
 
      /*
816
 
        If table will be updated we should not downgrade lock for it and
817
 
        leave it as is.
818
 
      */
819
 
    }
820
 
    else
821
 
    {
822
 
      /*
823
 
        If we are using the binary log, we need TL_READ_NO_INSERT to get
824
 
        correct order of statements. Otherwise, we use a TL_READ lock to
825
 
        improve performance.
826
 
      */
827
 
      tl->lock_type= TL_READ;
828
 
      tl->updating= 0;
829
 
      /* Update Table::lock_type accordingly. */
830
 
      if (!tl->placeholder())
831
 
        tl->table->reginfo.lock_type= tl->lock_type;
832
 
    }
833
 
  }
834
 
 
835
 
  /* now lock and fill tables */
836
 
  if (lock_tables(session, table_list, table_count, &need_reopen))
837
 
  {
838
 
    if (!need_reopen)
839
 
      return true;
840
 
 
841
 
    /*
842
 
      We have to reopen tables since some of them were altered or dropped
843
 
      during lock_tables() or something was done with their triggers.
844
 
      Let us do some cleanups to be able do setup_table() and setup_fields()
845
 
      once again.
846
 
    */
847
 
    List_iterator_fast<Item> it(*fields);
848
 
    Item *item;
849
 
    while ((item= it++))
850
 
      item->cleanup();
851
 
 
852
 
    session->close_tables_for_reopen(&table_list);
853
 
 
854
 
    goto reopen_tables;
855
 
  }
856
 
 
857
 
  /*
858
 
    Check that we are not using table that we are updating, but we should
859
 
    skip all tables of UPDATE SELECT itself
860
 
  */
861
 
  lex->select_lex.exclude_from_table_unique_test= true;
862
 
  /* We only need SELECT privilege for columns in the values list */
863
 
  for (tl= leaves; tl; tl= tl->next_leaf)
864
 
  {
865
 
    if (tl->lock_type != TL_READ &&
866
 
        tl->lock_type != TL_READ_NO_INSERT)
867
 
    {
868
 
      TableList *duplicate;
869
 
      if ((duplicate= unique_table(session, tl, table_list, 0)))
870
 
      {
871
 
        my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
872
 
 
873
 
        return true;
874
 
      }
875
 
    }
876
 
  }
877
 
  /*
878
 
    Set exclude_from_table_unique_test value back to false. It is needed for
879
 
    further check in multi_update::prepare whether to use record cache.
880
 
  */
881
 
  lex->select_lex.exclude_from_table_unique_test= false;
882
 
 
883
 
  if (session->fill_derived_tables() &&
884
 
      mysql_handle_derived(lex, &mysql_derived_filling))
885
 
    return true;
886
 
 
887
 
  return (false);
888
 
}
889
 
 
890
 
 
891
 
/*
892
 
  Setup multi-update handling and call SELECT to do the join
893
 
*/
894
 
 
895
 
bool mysql_multi_update(Session *session,
896
 
                        TableList *table_list,
897
 
                        List<Item> *fields,
898
 
                        List<Item> *values,
899
 
                        COND *conds,
900
 
                        uint64_t options,
901
 
                        enum enum_duplicates handle_duplicates, bool ignore,
902
 
                        Select_Lex_Unit *unit, Select_Lex *select_lex)
903
 
{
904
 
  multi_update *result;
905
 
  bool res;
906
 
 
907
 
  if (!(result= new multi_update(table_list,
908
 
                                 session->lex->select_lex.leaf_tables,
909
 
                                 fields, values,
910
 
                                 handle_duplicates, ignore)))
911
 
    return true;
912
 
 
913
 
  session->abort_on_warning= true;
914
 
 
915
 
  List<Item> total_list;
916
 
  res= mysql_select(session, &select_lex->ref_pointer_array,
917
 
                      table_list, select_lex->with_wild,
918
 
                      total_list,
919
 
                      conds, 0, (order_st *) NULL, (order_st *)NULL, (Item *) NULL,
920
 
                      options | SELECT_NO_JOIN_CACHE | SELECT_NO_UNLOCK |
921
 
                      OPTION_SETUP_TABLES_DONE,
922
 
                      result, unit, select_lex);
923
 
  res|= session->is_error();
924
 
  if (unlikely(res))
925
 
  {
926
 
    /* If we had a another error reported earlier then this will be ignored */
927
 
    result->send_error(ER_UNKNOWN_ERROR, ER(ER_UNKNOWN_ERROR));
928
 
    result->abort();
929
 
  }
930
 
  delete result;
931
 
  session->abort_on_warning= 0;
932
 
  return false;
933
 
}
934
 
 
935
 
 
936
 
multi_update::multi_update(TableList *table_list,
937
 
                           TableList *leaves_list,
938
 
                           List<Item> *field_list, List<Item> *value_list,
939
 
                           enum enum_duplicates handle_duplicates_arg,
940
 
                           bool ignore_arg)
941
 
  :all_tables(table_list), leaves(leaves_list),
942
 
   tmp_tables(0), updated(0), found(0), fields(field_list),
943
 
   values(value_list), table_count(0), copy_field(0),
944
 
   handle_duplicates(handle_duplicates_arg), do_update(1), trans_safe(1),
945
 
   transactional_tables(0), ignore(ignore_arg), error_handled(0)
946
 
{}
947
 
 
948
 
 
949
 
/*
950
 
  Connect fields with tables and create list of tables that are updated
951
 
*/
952
 
 
953
 
int multi_update::prepare(List<Item> &,
954
 
                          Select_Lex_Unit *)
955
 
{
956
 
  TableList *table_ref;
957
 
  table_map tables_to_update;
958
 
  Item_field *item;
959
 
  List_iterator_fast<Item> field_it(*fields);
960
 
  List_iterator_fast<Item> value_it(*values);
961
 
  uint32_t i, max_fields;
962
 
  uint32_t leaf_table_count= 0;
963
 
 
964
 
  session->count_cuted_fields= CHECK_FIELD_WARN;
965
 
  session->cuted_fields=0L;
966
 
  session->set_proc_info("updating main table");
967
 
 
968
 
  tables_to_update= get_table_map(fields);
969
 
 
970
 
  if (!tables_to_update)
971
 
  {
972
 
    my_message(ER_NO_TABLES_USED, ER(ER_NO_TABLES_USED), MYF(0));
973
 
    return(1);
974
 
  }
975
 
 
976
 
  /*
977
 
    We have to check values after setup_tables to get covering_keys right in
978
 
    reference tables
979
 
  */
980
 
 
981
 
  if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
982
 
    return(1);
983
 
 
984
 
  /*
985
 
    Save tables beeing updated in update_tables
986
 
    update_table->shared is position for table
987
 
    Don't use key read on tables that are updated
988
 
  */
989
 
  for (table_ref= leaves; table_ref; table_ref= table_ref->next_leaf)
990
 
  {
991
 
    /* TODO: add support of view of join support */
992
 
    Table *table=table_ref->table;
993
 
    leaf_table_count++;
994
 
    if (tables_to_update & table->map)
995
 
    {
996
 
      TableList *tl= (TableList*) session->memdup((char*) table_ref,
997
 
                                                sizeof(*tl));
998
 
      if (!tl)
999
 
        return(1);
1000
 
      update_tables.push_back(tl);
1001
 
      tl->shared= table_count++;
1002
 
      table->no_keyread=1;
1003
 
      table->covering_keys.reset();
1004
 
      table->pos_in_table_list= tl;
1005
 
    }
1006
 
  }
1007
 
 
1008
 
 
1009
 
  table_count=  update_tables.size();
1010
 
 
1011
 
  tmp_tables = (Table**) session->calloc(sizeof(Table *) * table_count);
1012
 
  tmp_table_param = (Tmp_Table_Param*) session->calloc(sizeof(Tmp_Table_Param) *
1013
 
                                                   table_count);
1014
 
  fields_for_table= (List_item **) session->alloc(sizeof(List_item *) *
1015
 
                                              table_count);
1016
 
  values_for_table= (List_item **) session->alloc(sizeof(List_item *) *
1017
 
                                              table_count);
1018
 
  if (session->is_fatal_error)
1019
 
    return(1);
1020
 
  for (i=0 ; i < table_count ; i++)
1021
 
  {
1022
 
    fields_for_table[i]= new List_item;
1023
 
    values_for_table[i]= new List_item;
1024
 
  }
1025
 
  if (session->is_fatal_error)
1026
 
    return(1);
1027
 
 
1028
 
  /* Split fields into fields_for_table[] and values_by_table[] */
1029
 
 
1030
 
  while ((item= (Item_field *) field_it++))
1031
 
  {
1032
 
    Item *value= value_it++;
1033
 
    uint32_t offset= item->field->table->pos_in_table_list->shared;
1034
 
    fields_for_table[offset]->push_back(item);
1035
 
    values_for_table[offset]->push_back(value);
1036
 
  }
1037
 
  if (session->is_fatal_error)
1038
 
    return(1);
1039
 
 
1040
 
  /* Allocate copy fields */
1041
 
  max_fields=0;
1042
 
  for (i=0 ; i < table_count ; i++)
1043
 
    set_if_bigger(max_fields, fields_for_table[i]->elements + leaf_table_count);
1044
 
  copy_field= new CopyField[max_fields];
1045
 
  return(session->is_fatal_error != 0);
1046
 
}
1047
 
 
1048
 
 
1049
 
/*
1050
 
  Check if table is safe to update on fly
1051
 
 
1052
 
  SYNOPSIS
1053
 
    safe_update_on_fly()
1054
 
    session                 Thread handler
1055
 
    join_tab            How table is used in join
1056
 
    all_tables          List of tables
1057
 
 
1058
 
  NOTES
1059
 
    We can update the first table in join on the fly if we know that
1060
 
    a row in this table will never be read twice. This is true under
1061
 
    the following conditions:
1062
 
 
1063
 
    - We are doing a table scan and the data is in a separate file (MyISAM) or
1064
 
      if we don't update a clustered key.
1065
 
 
1066
 
    - We are doing a range scan and we don't update the scan key or
1067
 
      the primary key for a clustered table handler.
1068
 
 
1069
 
    - Table is not joined to itself.
1070
 
 
1071
 
    This function gets information about fields to be updated from
1072
 
    the Table::write_set bitmap.
1073
 
 
1074
 
  WARNING
1075
 
    This code is a bit dependent of how make_join_readinfo() works.
1076
 
 
1077
 
  RETURN
1078
 
    0           Not safe to update
1079
 
    1           Safe to update
1080
 
*/
1081
 
 
1082
 
static bool safe_update_on_fly(Session *session, JoinTable *join_tab,
1083
 
                               TableList *table_ref, TableList *all_tables)
1084
 
{
1085
 
  Table *table= join_tab->table;
1086
 
  if (unique_table(session, table_ref, all_tables, 0))
1087
 
    return 0;
1088
 
  switch (join_tab->type) {
1089
 
  case JT_SYSTEM:
1090
 
  case JT_CONST:
1091
 
  case JT_EQ_REF:
1092
 
    return true;                                // At most one matching row
1093
 
  case JT_REF:
1094
 
  case JT_REF_OR_NULL:
1095
 
    return !is_key_used(table, join_tab->ref.key, table->write_set);
1096
 
  case JT_ALL:
1097
 
    /* If range search on index */
1098
 
    if (join_tab->quick)
1099
 
      return !join_tab->quick->is_keys_used(table->write_set);
1100
 
    /* If scanning in clustered key */
1101
 
    if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) &&
1102
 
        table->s->primary_key < MAX_KEY)
1103
 
      return !is_key_used(table, table->s->primary_key, table->write_set);
1104
 
    return true;
1105
 
  default:
1106
 
    break;                                      // Avoid compler warning
1107
 
  }
1108
 
  return false;
1109
 
 
1110
 
}
1111
 
 
1112
 
 
1113
 
/*
1114
 
  Initialize table for multi table
1115
 
 
1116
 
  IMPLEMENTATION
1117
 
    - Update first table in join on the fly, if possible
1118
 
    - Create temporary tables to store changed values for all other tables
1119
 
      that are updated (and main_table if the above doesn't hold).
1120
 
*/
1121
 
 
1122
 
bool
1123
 
multi_update::initialize_tables(JOIN *join)
1124
 
{
1125
 
  if ((session->options & OPTION_SAFE_UPDATES) && error_if_full_join(join))
1126
 
    return(1);
1127
 
  main_table=join->join_tab->table;
1128
 
  table_to_update= 0;
1129
 
 
1130
 
  /* Any update has at least one pair (field, value) */
1131
 
  assert(fields->elements);
1132
 
 
1133
 
  /* Create a temporary table for keys to all tables, except main table */
1134
 
  for (list<TableList*>::iterator it= update_tables.begin(); 
1135
 
       it != update_tables.end(); 
1136
 
       ++it)
1137
 
  {
1138
 
    Table *table= (*it)->table;
1139
 
    uint32_t cnt= (*it)->shared;
1140
 
    List<Item> temp_fields;
1141
 
    order_st     group;
1142
 
    Tmp_Table_Param *tmp_param;
1143
 
 
1144
 
    table->mark_columns_needed_for_update();
1145
 
    if (ignore)
1146
 
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1147
 
    if (table == main_table)                    // First table in join
1148
 
    {
1149
 
      if (safe_update_on_fly(session, join->join_tab, (*it), all_tables))
1150
 
      {
1151
 
        table_to_update= main_table;            // Update table on the fly
1152
 
        continue;
1153
 
      }
1154
 
    }
1155
 
    table->prepare_for_position();
1156
 
 
1157
 
    tmp_param= tmp_table_param+cnt;
1158
 
 
1159
 
    /*
1160
 
      Create a temporary table to store all fields that are changed for this
1161
 
      table. The first field in the temporary table is a pointer to the
1162
 
      original row so that we can find and update it. For the updatable
1163
 
      VIEW a few following fields are rowids of tables used in the CHECK
1164
 
      OPTION condition.
1165
 
    */
1166
 
 
1167
 
    List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
1168
 
    Table *tbl= table;
1169
 
    do
1170
 
    {
1171
 
      Field_varstring *field= new Field_varstring(tbl->file->ref_length, 0,
1172
 
                                                  tbl->alias, tbl->s, &my_charset_bin);
1173
 
      if (!field)
1174
 
        return(1);
1175
 
      field->init(tbl);
1176
 
      /*
1177
 
        The field will be converted to varstring when creating tmp table if
1178
 
        table to be updated was created by mysql 4.1. Deny this.
1179
 
      */
1180
 
      Item_field *ifield= new Item_field((Field *) field);
1181
 
      if (!ifield)
1182
 
         return(1);
1183
 
      ifield->maybe_null= 0;
1184
 
      if (temp_fields.push_back(ifield))
1185
 
        return(1);
1186
 
    } while ((tbl= tbl_it++));
1187
 
 
1188
 
    temp_fields.concat(fields_for_table[cnt]);
1189
 
 
1190
 
    /* Make an unique key over the first field to avoid duplicated updates */
1191
 
    memset(&group, 0, sizeof(group));
1192
 
    group.asc= 1;
1193
 
    group.item= (Item**) temp_fields.head_ref();
1194
 
 
1195
 
    tmp_param->quick_group=1;
1196
 
    tmp_param->field_count=temp_fields.elements;
1197
 
    tmp_param->group_parts=1;
1198
 
    tmp_param->group_length= table->file->ref_length;
1199
 
    if (!(tmp_tables[cnt]=create_tmp_table(session,
1200
 
                                           tmp_param,
1201
 
                                           temp_fields,
1202
 
                                           (order_st*) &group, 0, 0,
1203
 
                                           TMP_TABLE_ALL_COLUMNS,
1204
 
                                           HA_POS_ERROR,
1205
 
                                           (char *) "")))
1206
 
      return(1);
1207
 
    tmp_tables[cnt]->file->extra(HA_EXTRA_WRITE_CACHE);
1208
 
  }
1209
 
  return(0);
1210
 
}
1211
 
 
1212
 
 
1213
 
multi_update::~multi_update()
1214
 
{
1215
 
  TableList *table;
1216
 
  for (list<TableList*>::iterator it= update_tables.begin(); 
1217
 
       it != update_tables.end(); 
1218
 
       ++it)
1219
 
  {
1220
 
    table= *it;
1221
 
    table->table->no_keyread= table->table->no_cache= 0;
1222
 
    if (ignore)
1223
 
      table->table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1224
 
  }
1225
 
 
1226
 
  if (tmp_tables)
1227
 
  {
1228
 
    for (uint32_t cnt = 0; cnt < table_count; cnt++)
1229
 
    {
1230
 
      if (tmp_tables[cnt])
1231
 
      {
1232
 
        tmp_tables[cnt]->free_tmp_table(session);
1233
 
        tmp_table_param[cnt].cleanup();
1234
 
      }
1235
 
    }
1236
 
  }
1237
 
  if (copy_field)
1238
 
    delete [] copy_field;
1239
 
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              // Restore this setting
1240
 
  assert(trans_safe || !updated ||
1241
 
              session->transaction.all.modified_non_trans_table);
1242
 
}
1243
 
 
1244
 
 
1245
 
bool multi_update::send_data(List<Item> &)
1246
 
{
1247
 
  for (list<TableList*>::iterator it= update_tables.begin(); 
1248
 
       it != update_tables.end(); 
1249
 
       ++it)
1250
 
  {
1251
 
    Table *table= (*it)->table;
1252
 
    uint32_t offset= (*it)->shared;
1253
 
    /*
1254
 
      Check if we are using outer join and we didn't find the row
1255
 
      or if we have already updated this row in the previous call to this
1256
 
      function.
1257
 
 
1258
 
      The same row may be presented here several times in a join of type
1259
 
      UPDATE t1 FROM t1,t2 SET t1.a=t2.a
1260
 
 
1261
 
      In this case we will do the update for the first found row combination.
1262
 
      The join algorithm guarantees that we will not find the a row in
1263
 
      t1 several times.
1264
 
    */
1265
 
    if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
1266
 
      continue;
1267
 
 
1268
 
    /*
1269
 
      We can use compare_record() to optimize away updates if
1270
 
      the table handler is returning all columns OR if
1271
 
      if all updated columns are read
1272
 
    */
1273
 
    if (table == table_to_update)
1274
 
    {
1275
 
      bool can_compare_record;
1276
 
      can_compare_record= (!(table->file->ha_table_flags() &
1277
 
                             HA_PARTIAL_COLUMN_READ) ||
1278
 
                           bitmap_is_subset(table->write_set,
1279
 
                                            table->read_set));
1280
 
      table->status|= STATUS_UPDATED;
1281
 
      table->storeRecord();
1282
 
      if (fill_record(session, *fields_for_table[offset],
1283
 
                      *values_for_table[offset], 0))
1284
 
        return(1);
1285
 
 
1286
 
      found++;
1287
 
      if (!can_compare_record || table->compare_record())
1288
 
      {
1289
 
        int error;
1290
 
        if (!updated++)
1291
 
        {
1292
 
          /*
1293
 
            Inform the main table that we are going to update the table even
1294
 
            while we may be scanning it.  This will flush the read cache
1295
 
            if it's used.
1296
 
          */
1297
 
          main_table->file->extra(HA_EXTRA_PREPARE_FOR_UPDATE);
1298
 
        }
1299
 
        if ((error=table->file->ha_update_row(table->record[1],
1300
 
                                              table->record[0])) &&
1301
 
            error != HA_ERR_RECORD_IS_THE_SAME)
1302
 
        {
1303
 
          updated--;
1304
 
          if (!ignore ||
1305
 
              table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1306
 
          {
1307
 
            /*
1308
 
              If (ignore && error == is ignorable) we don't have to
1309
 
              do anything; otherwise...
1310
 
            */
1311
 
            myf flags= 0;
1312
 
 
1313
 
            if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1314
 
              flags|= ME_FATALERROR; /* Other handler errors are fatal */
1315
 
 
1316
 
            prepare_record_for_error_message(error, table);
1317
 
            table->file->print_error(error,MYF(flags));
1318
 
            return(1);
1319
 
          }
1320
 
        }
1321
 
        else
1322
 
        {
1323
 
          if (error == HA_ERR_RECORD_IS_THE_SAME)
1324
 
          {
1325
 
            error= 0;
1326
 
            updated--;
1327
 
          }
1328
 
          /* non-transactional or transactional table got modified   */
1329
 
          /* either multi_update class' flag is raised in its branch */
1330
 
          if (table->file->has_transactions())
1331
 
            transactional_tables= 1;
1332
 
          else
1333
 
          {
1334
 
            trans_safe= 0;
1335
 
            session->transaction.stmt.modified_non_trans_table= true;
1336
 
          }
1337
 
        }
1338
 
      }
1339
 
    }
1340
 
    else
1341
 
    {
1342
 
      int error;
1343
 
      Table *tmp_table= tmp_tables[offset];
1344
 
      /*
1345
 
       For updatable VIEW store rowid of the updated table and
1346
 
       rowids of tables used in the CHECK OPTION condition.
1347
 
      */
1348
 
      uint32_t field_num= 0;
1349
 
      List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
1350
 
      Table *tbl= table;
1351
 
      do
1352
 
      {
1353
 
        tbl->file->position(tbl->record[0]);
1354
 
        Field_varstring *ref_field=
1355
 
          reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
1356
 
        ref_field->store((char *)tbl->file->ref, tbl->file->ref_length,
1357
 
                         &my_charset_bin);
1358
 
        field_num++;
1359
 
      } while ((tbl= tbl_it++));
1360
 
 
1361
 
      /* Store regular updated fields in the row. */
1362
 
      fill_record(session,
1363
 
                  tmp_table->field + 1 + unupdated_check_opt_tables.elements,
1364
 
                  *values_for_table[offset], 1);
1365
 
 
1366
 
      /* Write row, ignoring duplicated updates to a row */
1367
 
      error= tmp_table->file->ha_write_row(tmp_table->record[0]);
1368
 
      if (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)
1369
 
      {
1370
 
        if (error &&
1371
 
            create_myisam_from_heap(session, tmp_table,
1372
 
                                         tmp_table_param[offset].start_recinfo,
1373
 
                                         &tmp_table_param[offset].recinfo,
1374
 
                                         error, 1))
1375
 
        {
1376
 
          do_update=0;
1377
 
          return(1);                    // Not a table_is_full error
1378
 
        }
1379
 
        found++;
1380
 
      }
1381
 
    }
1382
 
  }
1383
 
  return(0);
1384
 
}
1385
 
 
1386
 
 
1387
 
void multi_update::send_error(uint32_t errcode,const char *err)
1388
 
{
1389
 
  /* First send error what ever it is ... */
1390
 
  my_error(errcode, MYF(0), err);
1391
 
}
1392
 
 
1393
 
 
1394
 
void multi_update::abort()
1395
 
{
1396
 
  /* the error was handled or nothing deleted and no side effects return */
1397
 
  if (error_handled ||
1398
 
      (!session->transaction.stmt.modified_non_trans_table && !updated))
1399
 
    return;
1400
 
  /*
1401
 
    If all tables that has been updated are trans safe then just do rollback.
1402
 
    If not attempt to do remaining updates.
1403
 
  */
1404
 
 
1405
 
  if (! trans_safe)
1406
 
  {
1407
 
    assert(session->transaction.stmt.modified_non_trans_table);
1408
 
    if (do_update && table_count > 1)
1409
 
    {
1410
 
      /* Add warning here */
1411
 
      /*
1412
 
         todo/fixme: do_update() is never called with the arg 1.
1413
 
         should it change the signature to become argless?
1414
 
      */
1415
 
      do_updates();
1416
 
    }
1417
 
  }
1418
 
  if (session->transaction.stmt.modified_non_trans_table)
1419
 
  {
1420
 
    session->transaction.all.modified_non_trans_table= true;
1421
 
  }
1422
 
  assert(trans_safe || !updated || session->transaction.stmt.modified_non_trans_table);
1423
 
}
1424
 
 
1425
 
 
1426
 
int multi_update::do_updates()
1427
 
{
1428
 
  TableList *cur_table;
1429
 
  int local_error= 0;
1430
 
  ha_rows org_updated;
1431
 
  Table *table, *tmp_table;
1432
 
  List_iterator_fast<Table> check_opt_it(unupdated_check_opt_tables);
1433
 
 
1434
 
  do_update= 0;                                 // Don't retry this function
1435
 
  if (!found)
1436
 
    return(0);
1437
 
  for (list<TableList*>::iterator it= update_tables.begin(); 
1438
 
       it != update_tables.end(); 
1439
 
       ++it)
1440
 
  {
1441
 
    cur_table= *it;
1442
 
    bool can_compare_record;
1443
 
    uint32_t offset= cur_table->shared;
1444
 
 
1445
 
    table = cur_table->table;
1446
 
    if (table == table_to_update)
1447
 
      continue;                                 // Already updated
1448
 
    org_updated= updated;
1449
 
    tmp_table= tmp_tables[cur_table->shared];
1450
 
    tmp_table->file->extra(HA_EXTRA_CACHE);     // Change to read cache
1451
 
    (void) table->file->ha_rnd_init(0);
1452
 
    table->file->extra(HA_EXTRA_NO_CACHE);
1453
 
 
1454
 
    check_opt_it.rewind();
1455
 
    while(Table *tbl= check_opt_it++)
1456
 
    {
1457
 
      if (tbl->file->ha_rnd_init(1))
1458
 
        goto err;
1459
 
      tbl->file->extra(HA_EXTRA_CACHE);
1460
 
    }
1461
 
 
1462
 
    /*
1463
 
      Setup copy functions to copy fields from temporary table
1464
 
    */
1465
 
    List_iterator_fast<Item> field_it(*fields_for_table[offset]);
1466
 
    Field **field= tmp_table->field +
1467
 
                   1 + unupdated_check_opt_tables.elements; // Skip row pointers
1468
 
    CopyField *copy_field_ptr= copy_field, *copy_field_end;
1469
 
    for ( ; *field ; field++)
1470
 
    {
1471
 
      Item_field *item= (Item_field* ) field_it++;
1472
 
      (copy_field_ptr++)->set(item->field, *field, 0);
1473
 
    }
1474
 
    copy_field_end=copy_field_ptr;
1475
 
 
1476
 
    if ((local_error = tmp_table->file->ha_rnd_init(1)))
1477
 
      goto err;
1478
 
 
1479
 
    can_compare_record= (!(table->file->ha_table_flags() &
1480
 
                           HA_PARTIAL_COLUMN_READ) ||
1481
 
                         bitmap_is_subset(table->write_set,
1482
 
                                          table->read_set));
1483
 
 
1484
 
    for (;;)
1485
 
    {
1486
 
      if (session->killed && trans_safe)
1487
 
        goto err;
1488
 
      if ((local_error=tmp_table->file->rnd_next(tmp_table->record[0])))
1489
 
      {
1490
 
        if (local_error == HA_ERR_END_OF_FILE)
1491
 
          break;
1492
 
        if (local_error == HA_ERR_RECORD_DELETED)
1493
 
          continue;                             // May happen on dup key
1494
 
        goto err;
1495
 
      }
1496
 
 
1497
 
      /* call rnd_pos() using rowids from temporary table */
1498
 
      check_opt_it.rewind();
1499
 
      Table *tbl= table;
1500
 
      uint32_t field_num= 0;
1501
 
      do
1502
 
      {
1503
 
        Field_varstring *ref_field=
1504
 
          reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
1505
 
        if((local_error=
1506
 
              tbl->file->rnd_pos(tbl->record[0],
1507
 
                                (unsigned char *) ref_field->ptr
1508
 
                                 + ref_field->length_bytes)))
1509
 
          goto err;
1510
 
        field_num++;
1511
 
      } while((tbl= check_opt_it++));
1512
 
 
1513
 
      table->status|= STATUS_UPDATED;
1514
 
      table->storeRecord();
1515
 
 
1516
 
      /* Copy data from temporary table to current table */
1517
 
      for (copy_field_ptr=copy_field;
1518
 
           copy_field_ptr != copy_field_end;
1519
 
           copy_field_ptr++)
1520
 
        (*copy_field_ptr->do_copy)(copy_field_ptr);
1521
 
 
1522
 
      if (!can_compare_record || table->compare_record())
1523
 
      {
1524
 
        if ((local_error=table->file->ha_update_row(table->record[1],
1525
 
                                                    table->record[0])) &&
1526
 
            local_error != HA_ERR_RECORD_IS_THE_SAME)
1527
 
        {
1528
 
          if (!ignore ||
1529
 
              table->file->is_fatal_error(local_error, HA_CHECK_DUP_KEY))
1530
 
            goto err;
1531
 
        }
1532
 
        if (local_error != HA_ERR_RECORD_IS_THE_SAME)
1533
 
          updated++;
1534
 
        else
1535
 
          local_error= 0;
1536
 
      }
1537
 
    }
1538
 
 
1539
 
    if (updated != org_updated)
1540
 
    {
1541
 
      if (table->file->has_transactions())
1542
 
        transactional_tables= 1;
1543
 
      else
1544
 
      {
1545
 
        trans_safe= 0;                          // Can't do safe rollback
1546
 
        session->transaction.stmt.modified_non_trans_table= true;
1547
 
      }
1548
 
    }
1549
 
    (void) table->file->ha_rnd_end();
1550
 
    (void) tmp_table->file->ha_rnd_end();
1551
 
    check_opt_it.rewind();
1552
 
    while (Table *tbl= check_opt_it++)
1553
 
        tbl->file->ha_rnd_end();
1554
 
  }
1555
 
  return(0);
1556
 
 
1557
 
err:
1558
 
  {
1559
 
    prepare_record_for_error_message(local_error, table);
1560
 
    table->file->print_error(local_error,MYF(ME_FATALERROR));
1561
 
  }
1562
 
 
1563
 
  (void) table->file->ha_rnd_end();
1564
 
  (void) tmp_table->file->ha_rnd_end();
1565
 
  check_opt_it.rewind();
1566
 
  while (Table *tbl= check_opt_it++)
1567
 
      tbl->file->ha_rnd_end();
1568
 
 
1569
 
  if (updated != org_updated)
1570
 
  {
1571
 
    if (table->file->has_transactions())
1572
 
      transactional_tables= 1;
1573
 
    else
1574
 
    {
1575
 
      trans_safe= 0;
1576
 
      session->transaction.stmt.modified_non_trans_table= true;
1577
 
    }
1578
 
  }
1579
 
  return(1);
1580
 
}
1581
 
 
1582
 
 
1583
 
/* out: 1 if error, 0 if success */
1584
 
 
1585
 
bool multi_update::send_eof()
1586
 
{
1587
 
  char buff[STRING_BUFFER_USUAL_SIZE];
1588
 
  uint64_t id;
1589
 
  Session::killed_state killed_status= Session::NOT_KILLED;
1590
 
 
1591
 
  session->set_proc_info("updating reference tables");
1592
 
 
1593
 
  /*
1594
 
     Does updates for the last n - 1 tables, returns 0 if ok;
1595
 
     error takes into account killed status gained in do_updates()
1596
 
  */
1597
 
  int local_error = (table_count) ? do_updates() : 0;
1598
 
  /*
1599
 
    if local_error is not set ON until after do_updates() then
1600
 
    later carried out killing should not affect binlogging.
1601
 
  */
1602
 
  killed_status= (local_error == 0)? Session::NOT_KILLED : session->killed;
1603
 
  session->set_proc_info("end");
1604
 
 
1605
 
  /*
1606
 
    Write the SQL statement to the binlog if we updated
1607
 
    rows and we succeeded or if we updated some non
1608
 
    transactional tables.
1609
 
 
1610
 
    The query has to binlog because there's a modified non-transactional table
1611
 
    either from the query's list or via a stored routine: bug#13270,23333
1612
 
  */
1613
 
 
1614
 
  assert(trans_safe || !updated ||
1615
 
              session->transaction.stmt.modified_non_trans_table);
1616
 
  if (local_error == 0 || session->transaction.stmt.modified_non_trans_table)
1617
 
  {
1618
 
    if (session->transaction.stmt.modified_non_trans_table)
1619
 
      session->transaction.all.modified_non_trans_table= true;
1620
 
  }
1621
 
  if (local_error != 0)
1622
 
    error_handled= true; // to force early leave from ::send_error()
1623
 
 
1624
 
  if (local_error > 0) // if the above log write did not fail ...
1625
 
  {
1626
 
    /* Safety: If we haven't got an error before (can happen in do_updates) */
1627
 
    my_message(ER_UNKNOWN_ERROR, "An error occured in multi-table update",
1628
 
               MYF(0));
1629
 
    return true;
1630
 
  }
1631
 
 
1632
 
  id= session->arg_of_last_insert_id_function ?
1633
 
    session->first_successful_insert_id_in_prev_stmt : 0;
1634
 
  sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
1635
 
          (ulong) session->cuted_fields);
1636
 
  session->row_count_func=
1637
 
    (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
1638
 
  session->my_ok((ulong) session->row_count_func, id, buff);
1639
 
 
1640
 
  return false;
1641
 
}