18
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
27
#include <drizzled/show.h>
28
#include <drizzled/lock.h>
29
#include <drizzled/session.h>
30
#include <drizzled/statement/alter_table.h>
31
#include <drizzled/charset.h>
32
#include <drizzled/gettext.h>
33
#include <drizzled/data_home.h>
34
#include <drizzled/sql_table.h>
35
#include <drizzled/table_proto.h>
36
#include <drizzled/optimizer/range.h>
37
#include <drizzled/time_functions.h>
38
#include <drizzled/records.h>
39
#include <drizzled/pthread_globals.h>
40
#include <drizzled/internal/my_sys.h>
41
#include <drizzled/internal/iocache.h>
42
#include <drizzled/plugin/storage_engine.h>
27
#include "drizzled/show.h"
28
#include "drizzled/lock.h"
29
#include "drizzled/session.h"
30
#include "drizzled/statement/alter_table.h"
31
#include "drizzled/global_charset_info.h"
34
#include "drizzled/gettext.h"
35
#include "drizzled/data_home.h"
36
#include "drizzled/sql_table.h"
37
#include "drizzled/table_proto.h"
38
#include "drizzled/optimizer/range.h"
39
#include "drizzled/time_functions.h"
40
#include "drizzled/records.h"
41
#include "drizzled/pthread_globals.h"
42
#include "drizzled/internal/my_sys.h"
43
#include "drizzled/internal/iocache.h"
44
#include "drizzled/plugin/storage_engine.h"
43
45
#include <drizzled/copy_field.h>
44
#include <drizzled/transaction_services.h>
45
#include <drizzled/filesort.h>
46
#include <drizzled/message.h>
47
#include <drizzled/message/alter_table.pb.h>
48
#include <drizzled/alter_column.h>
49
#include <drizzled/alter_info.h>
50
#include <drizzled/util/test.h>
51
#include <drizzled/open_tables_state.h>
52
#include <drizzled/table/cache.h>
47
#include "drizzled/transaction_services.h"
49
#include "drizzled/filesort.h"
51
#include "drizzled/message.h"
54
53
using namespace std;
58
58
extern pid_t current_pid;
68
message::AlterTable &alter_table_message,
68
enum enum_enable_or_disable keys_onoff,
69
69
bool error_if_not_empty);
71
71
static bool prepare_alter_table(Session *session,
73
HA_CREATE_INFO *create_info,
74
const message::Table &original_proto,
75
message::Table &table_message,
76
message::AlterTable &alter_table_message,
77
AlterInfo *alter_info);
73
HA_CREATE_INFO *create_info,
74
const message::Table &original_proto,
75
message::Table &table_message,
76
AlterInfo *alter_info);
79
78
static Table *open_alter_table(Session *session, Table *table, identifier::Table &identifier);
81
static int apply_online_alter_keys_onoff(Session *session,
83
const message::AlterTable::AlterKeysOnOff &op);
85
static int apply_online_rename_table(Session *session,
87
plugin::StorageEngine *original_engine,
88
identifier::Table &original_table_identifier,
89
identifier::Table &new_table_identifier,
90
const message::AlterTable::RenameTable &alter_operation);
92
80
namespace statement {
94
AlterTable::AlterTable(Session *in_session, Table_ident *) :
82
AlterTable::AlterTable(Session *in_session, Table_ident *ident, drizzled::ha_build_method build_arg) :
95
83
CreateTable(in_session)
97
set_command(SQLCOM_ALTER_TABLE);
85
in_session->lex->sql_command= SQLCOM_ALTER_TABLE;
87
alter_info.build_method= build_arg;
100
90
} // namespace statement
102
92
bool statement::AlterTable::execute()
104
TableList *first_table= (TableList *) lex().select_lex.table_list.first;
105
TableList *all_tables= lex().query_tables;
94
TableList *first_table= (TableList *) getSession()->lex->select_lex.table_list.first;
95
TableList *all_tables= getSession()->lex->query_tables;
106
96
assert(first_table == all_tables && first_table != 0);
107
Select_Lex *select_lex= &lex().select_lex;
97
Select_Lex *select_lex= &getSession()->lex->select_lex;
108
98
bool need_start_waiting= false;
110
100
is_engine_set= not createTableMessage().engine().name().empty();
112
102
if (is_engine_set)
114
create_info().db_type=
115
plugin::StorageEngine::findByName(session(), createTableMessage().engine().name());
104
create_info().db_type=
105
plugin::StorageEngine::findByName(*getSession(), createTableMessage().engine().name());
117
107
if (create_info().db_type == NULL)
175
165
createTableMessage(),
178
select_lex->order_list.size(),
168
select_lex->order_list.elements,
179
169
(Order *) select_lex->order_list.first,
170
getSession()->lex->ignore);
184
174
identifier::Table catch22(first_table->getSchemaName(), first_table->getTableName());
185
Table *table= session().open_tables.find_temporary_table(catch22);
175
Table *table= getSession()->find_temporary_table(catch22);
188
178
identifier::Table identifier(first_table->getSchemaName(), first_table->getTableName(), table->getMutableShare()->getPath());
189
179
identifier::Table new_identifier(select_lex->db ? select_lex->db : first_table->getSchemaName(),
190
lex().name.str ? lex().name.str : first_table->getTableName(),
180
getSession()->lex->name.str ? getSession()->lex->name.str : first_table->getTableName(),
191
181
table->getMutableShare()->getPath());
193
res= alter_table(&session(),
183
res= alter_table(getSession(),
259
249
HA_CREATE_INFO *create_info,
260
250
const message::Table &original_proto,
261
251
message::Table &table_message,
262
message::AlterTable &alter_table_message,
263
252
AlterInfo *alter_info)
254
/* New column definitions are added here */
255
List<CreateField> new_create_list;
256
/* New key definitions are added here */
257
List<Key> new_key_list;
258
List_iterator<AlterDrop> drop_it(alter_info->drop_list);
259
List_iterator<CreateField> def_it(alter_info->create_list);
260
List_iterator<AlterColumn> alter_it(alter_info->alter_list);
261
List_iterator<Key> key_it(alter_info->key_list);
262
List_iterator<CreateField> find_it(new_create_list);
263
List_iterator<CreateField> field_it(new_create_list);
264
List<Key_part_spec> key_parts;
265
265
uint32_t used_fields= create_info->used_fields;
266
vector<string> drop_keys;
267
vector<string> drop_columns;
268
vector<string> drop_fkeys;
270
/* we use drop_(keys|columns|fkey) below to check that we can do all
271
operations we've been asked to do */
272
for (int operationnr= 0; operationnr < alter_table_message.operations_size();
275
const message::AlterTable::AlterTableOperation &operation=
276
alter_table_message.operations(operationnr);
278
switch (operation.operation())
280
case message::AlterTable::AlterTableOperation::DROP_KEY:
281
drop_keys.push_back(operation.drop_name());
283
case message::AlterTable::AlterTableOperation::DROP_COLUMN:
284
drop_columns.push_back(operation.drop_name());
286
case message::AlterTable::AlterTableOperation::DROP_FOREIGN_KEY:
287
drop_fkeys.push_back(operation.drop_name());
266
KeyInfo *key_info= table->key_info;
294
269
/* Let new create options override the old ones */
295
message::Table::TableOptions *table_options= table_message.mutable_options();
270
message::Table::TableOptions *table_options;
271
table_options= table_message.mutable_options();
297
273
if (not (used_fields & HA_CREATE_USED_DEFAULT_CHARSET))
298
274
create_info->default_table_charset= table->getShare()->table_charset;
309
285
table->restoreRecordAsDefault(); /* Empty record for DEFAULT */
311
List<CreateField> new_create_list;
312
List<Key> new_key_list;
313
288
/* First collect all fields from table which isn't in drop_list */
315
290
for (Field **f_ptr= table->getFields(); (field= *f_ptr); f_ptr++)
317
292
/* Check if field should be dropped */
318
vector<string>::iterator it= drop_columns.begin();
319
while ((it != drop_columns.end()))
295
while ((drop= drop_it++))
321
if (! my_strcasecmp(system_charset_info, field->field_name, (*it).c_str()))
297
if (drop->type == AlterDrop::COLUMN &&
298
! my_strcasecmp(system_charset_info, field->field_name, drop->name))
323
300
/* Reset auto_increment value if it was dropped */
324
301
if (MTYP_TYPENR(field->unireg_check) == Field::NEXT_NUMBER &&
325
not (used_fields & HA_CREATE_USED_AUTO))
302
! (used_fields & HA_CREATE_USED_AUTO))
327
304
create_info->auto_increment_value= 0;
328
305
create_info->used_fields|= HA_CREATE_USED_AUTO;
335
if (it != drop_columns.end())
337
drop_columns.erase(it);
341
317
/* Mark that we will read the field */
342
318
field->setReadSet();
345
320
/* Check if field is changed */
346
List<CreateField>::iterator def_it= alter_info->create_list.begin();
347
322
while ((def= def_it++))
349
324
if (def->change &&
370
345
def= new CreateField(field, field);
371
346
new_create_list.push_back(def);
372
AlterInfo::alter_list_t::iterator alter(alter_info->alter_list.begin());
347
alter_it.rewind(); /* Change default if ALTER */
374
for (; alter != alter_info->alter_list.end(); alter++)
350
while ((alter= alter_it++))
376
if (not my_strcasecmp(system_charset_info,field->field_name, alter->name))
352
if (! my_strcasecmp(system_charset_info,field->field_name, alter->name))
380
if (alter != alter_info->alter_list.end())
382
def->setDefaultValue(alter->def, NULL);
358
if (def->sql_type == DRIZZLE_TYPE_BLOB)
360
my_error(ER_BLOB_CANT_HAVE_DEFAULT, MYF(0), def->change);
384
alter_info->alter_list.erase(alter);
364
if ((def->def= alter->def))
366
/* Use new default */
367
def->flags&= ~NO_DEFAULT_VALUE_FLAG;
371
def->flags|= NO_DEFAULT_VALUE_FLAG;
390
List<CreateField>::iterator def_it= alter_info->create_list.begin();
391
379
while ((def= def_it++)) /* Add new columns */
393
381
if (def->change && ! def->field)
439
427
TODO: detect the situation in compare_tables, behave based
440
428
on engine capabilities.
442
if (alter_table_message.build_method() == message::AlterTable::BUILD_ONLINE)
430
if (alter_info->build_method == HA_BUILD_ONLINE)
444
432
my_error(*session->getQueryString(), ER_NOT_SUPPORTED_YET);
436
alter_info->build_method= HA_BUILD_OFFLINE;
450
if (not alter_info->alter_list.empty())
440
if (alter_info->alter_list.elements)
452
my_error(ER_BAD_FIELD_ERROR, MYF(0), alter_info->alter_list.front().name, table->getMutableShare()->getTableName());
442
my_error(ER_BAD_FIELD_ERROR,
444
alter_info->alter_list.head()->name,
445
table->getMutableShare()->getTableName());
456
if (new_create_list.is_empty())
449
if (not new_create_list.elements)
458
my_message(ER_CANT_REMOVE_ALL_FIELDS, ER(ER_CANT_REMOVE_ALL_FIELDS), MYF(0));
451
my_message(ER_CANT_REMOVE_ALL_FIELDS,
452
ER(ER_CANT_REMOVE_ALL_FIELDS),
463
458
Collect all keys which isn't in drop list. Add only those
464
459
for which some fields exists.
466
KeyInfo *key_info= table->key_info;
467
461
for (uint32_t i= 0; i < table->getShare()->sizeKeys(); i++, key_info++)
469
463
char *key_name= key_info->name;
471
vector<string>::iterator it= drop_keys.begin();
472
while ((it != drop_keys.end()))
467
while ((drop= drop_it++))
474
if (! my_strcasecmp(system_charset_info, key_name, (*it).c_str()))
469
if (drop->type == AlterDrop::KEY &&
470
! my_strcasecmp(system_charset_info, key_name, drop->name))
479
if (it != drop_keys.end())
485
480
KeyPartInfo *key_part= key_info->key_part;
486
List<Key_part_spec> key_parts;
487
482
for (uint32_t j= 0; j < key_info->key_parts; j++, key_part++)
489
484
if (! key_part->field)
648
if (drop_keys.size())
650
my_error(ER_CANT_DROP_FIELD_OR_KEY,
652
drop_keys.front().c_str());
656
if (drop_columns.size())
658
my_error(ER_CANT_DROP_FIELD_OR_KEY,
660
drop_columns.front().c_str());
664
if (drop_fkeys.size())
666
my_error(ER_CANT_DROP_FIELD_OR_KEY,
668
drop_fkeys.front().c_str());
672
if (not alter_info->alter_list.empty())
674
my_error(ER_CANT_DROP_FIELD_OR_KEY,
676
alter_info->alter_list.front().name);
642
if (alter_info->drop_list.elements)
644
my_error(ER_CANT_DROP_FIELD_OR_KEY,
646
alter_info->drop_list.head()->name);
650
if (alter_info->alter_list.elements)
652
my_error(ER_CANT_DROP_FIELD_OR_KEY,
654
alter_info->alter_list.head()->name);
801
782
static bool alter_table_manage_keys(Session *session,
802
783
Table *table, int indexes_were_disabled,
803
const message::AlterTable &alter_table_message)
784
enum enum_enable_or_disable keys_onoff)
806
if (alter_table_message.has_alter_keys_onoff()
807
&& alter_table_message.alter_keys_onoff().enable())
787
switch (keys_onoff) {
809
789
error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
812
if ((! alter_table_message.has_alter_keys_onoff() && indexes_were_disabled)
813
|| (alter_table_message.has_alter_keys_onoff()
814
&& ! alter_table_message.alter_keys_onoff().enable()))
792
if (not indexes_were_disabled)
794
/* fall-through: disabled indexes */
816
796
error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
1004
979
tmp.reset(ALTER_KEYS_ONOFF);
1005
980
tmp&= alter_info->flags;
1007
if((not (tmp.any()) && not table->getShare()->getType())) // no need to touch frm
982
if (not (tmp.any()) && not table->getShare()->getType()) // no need to touch frm
1009
if (alter_table_message.has_alter_keys_onoff())
1011
error= apply_online_alter_keys_onoff(session, table,
1012
alter_table_message.alter_keys_onoff());
1014
if (error == HA_ERR_WRONG_COMMAND)
1017
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1018
ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
1023
if (alter_table_message.has_rename())
1025
error= apply_online_rename_table(session,
1028
original_table_identifier,
1029
new_table_identifier,
1030
alter_table_message.rename());
1032
if (error == HA_ERR_WRONG_COMMAND)
1035
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1036
ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
984
switch (alter_info->keys_onoff)
991
wait_while_table_is_used() ensures that table being altered is
992
opened only by this thread and that Table::TableShare::version
993
of Table object corresponding to this table is 0.
994
The latter guarantees that no DML statement will open this table
995
until ALTER Table finishes (i.e. until close_thread_tables())
996
while the fact that the table is still open gives us protection
997
from concurrent DDL statements.
1000
boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex()); /* DDL wait for/blocker */
1001
wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
1003
error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
1005
/* COND_refresh will be signaled in close_thread_tables() */
1010
boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex()); /* DDL wait for/blocker */
1011
wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
1013
error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
1015
/* COND_refresh will be signaled in close_thread_tables() */
1019
if (error == HA_ERR_WRONG_COMMAND)
1022
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1023
ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
1027
boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex()); /* Lock to remove all instances of table from table cache before ALTER */
1029
Unlike to the above case close_cached_table() below will remove ALL
1030
instances of Table from table cache (it will also remove table lock
1031
held by this thread). So to make actual table renaming and writing
1032
to binlog atomic we have to put them into the same critical section
1033
protected by table::Cache::singleton().mutex() mutex. This also removes gap for races between
1034
access() and rename_table() calls.
1037
if (not error && not (original_table_identifier == new_table_identifier))
1039
session->set_proc_info("rename");
1041
Then do a 'simple' rename of the table. First we need to close all
1042
instances of 'source' table.
1044
session->close_cached_table(table);
1046
Then, we want check once again that target table does not exist.
1047
Actually the order of these two steps does not matter since
1048
earlier we took name-lock on the target table, so we do them
1049
in this particular order only to be consistent with 5.0, in which
1050
we don't take this name-lock and where this order really matters.
1051
@todo Investigate if we need this access() check at all.
1053
if (plugin::StorageEngine::doesTableExist(*session, new_table_identifier))
1055
my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
1060
if (rename_table(*session, original_engine, original_table_identifier, new_table_identifier))
1067
if (error == HA_ERR_WRONG_COMMAND)
1070
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1071
ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
1043
1077
TransactionServices &transaction_services= TransactionServices::singleton();
1044
1078
transaction_services.allocateNewTransactionId();
1045
transaction_services.rawStatement(*session,
1046
*session->getQueryString(),
1047
*session->schema());
1079
write_bin_log(session, *session->getQueryString());
1048
1080
session->my_ok();
1050
1082
else if (error > EE_OK) // If we have already set the error, we pass along -1
1061
if (alter_table_message.build_method() == message::AlterTable::BUILD_ONLINE)
1063
my_error(*session->getQueryString(), ER_NOT_SUPPORTED_YET);
1067
1093
/* We have to do full alter table. */
1068
1094
new_engine= create_info->db_type;
1070
if (prepare_alter_table(session, table, create_info, original_proto, create_proto, alter_table_message, alter_info))
1096
if (prepare_alter_table(session, table, create_info, original_proto, create_proto, alter_info))
1075
1101
set_table_default_charset(create_info, new_table_identifier.getSchemaName().c_str());
1103
alter_info->build_method= HA_BUILD_OFFLINE;
1077
1105
snprintf(tmp_name, sizeof(tmp_name), "%s-%lx_%"PRIx64, TMP_FILE_PREFIX, (unsigned long) current_pid, session->thread_id);
1079
1107
/* Create a temporary table with the new format */
1337
1362
(ulong) (copied + deleted), (ulong) deleted,
1338
1363
(ulong) session->cuted_fields);
1339
1364
session->my_ok(copied + deleted, 0, 0L, tmp_name);
1365
session->some_tables_deleted= false;
1343
static int apply_online_alter_keys_onoff(Session *session,
1345
const message::AlterTable::AlterKeysOnOff &op)
1352
wait_while_table_is_used() ensures that table being altered is
1353
opened only by this thread and that Table::TableShare::version
1354
of Table object corresponding to this table is 0.
1355
The latter guarantees that no DML statement will open this table
1356
until ALTER Table finishes (i.e. until close_thread_tables())
1357
while the fact that the table is still open gives us protection
1358
from concurrent DDL statements.
1361
boost::mutex::scoped_lock scopedLock(table::Cache::mutex()); /* DDL wait for/blocker */
1362
wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
1364
error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
1366
/* COND_refresh will be signaled in close_thread_tables() */
1371
boost::mutex::scoped_lock scopedLock(table::Cache::mutex()); /* DDL wait for/blocker */
1372
wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
1374
error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
1376
/* COND_refresh will be signaled in close_thread_tables() */
1382
static int apply_online_rename_table(Session *session,
1384
plugin::StorageEngine *original_engine,
1385
identifier::Table &original_table_identifier,
1386
identifier::Table &new_table_identifier,
1387
const message::AlterTable::RenameTable &alter_operation)
1391
boost::mutex::scoped_lock scopedLock(table::Cache::mutex()); /* Lock to remove all instances of table from table cache before ALTER */
1393
Unlike to the above case close_cached_table() below will remove ALL
1394
instances of Table from table cache (it will also remove table lock
1395
held by this thread). So to make actual table renaming and writing
1396
to binlog atomic we have to put them into the same critical section
1397
protected by table::Cache::mutex() mutex. This also removes gap for races between
1398
access() and rename_table() calls.
1401
if (not (original_table_identifier == new_table_identifier))
1403
assert(alter_operation.to_name() == new_table_identifier.getTableName());
1404
assert(alter_operation.to_schema() == new_table_identifier.getSchemaName());
1406
session->set_proc_info("rename");
1408
Then do a 'simple' rename of the table. First we need to close all
1409
instances of 'source' table.
1411
session->close_cached_table(table);
1413
Then, we want check once again that target table does not exist.
1414
Actually the order of these two steps does not matter since
1415
earlier we took name-lock on the target table, so we do them
1416
in this particular order only to be consistent with 5.0, in which
1417
we don't take this name-lock and where this order really matters.
1418
@todo Investigate if we need this access() check at all.
1420
if (plugin::StorageEngine::doesTableExist(*session, new_table_identifier))
1422
my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
1427
if (rename_table(*session, original_engine, original_table_identifier, new_table_identifier))
1436
1370
bool alter_table(Session *session,
1437
1371
identifier::Table &original_table_identifier,
1438
1372
identifier::Table &new_table_identifier,
1450
message::AlterTable *alter_table_message= session->lex().alter_table();
1452
alter_table_message->set_catalog(original_table_identifier.getCatalogName());
1453
alter_table_message->set_schema(original_table_identifier.getSchemaName());
1454
alter_table_message->set_name(original_table_identifier.getTableName());
1456
if (alter_table_message->operations_size()
1457
&& (alter_table_message->operations(0).operation()
1458
== message::AlterTable::AlterTableOperation::DISCARD_TABLESPACE
1459
|| alter_table_message->operations(0).operation()
1460
== message::AlterTable::AlterTableOperation::IMPORT_TABLESPACE))
1385
if (alter_info->tablespace_op != NO_TABLESPACE_OP)
1462
bool discard= (alter_table_message->operations(0).operation() ==
1463
message::AlterTable::AlterTableOperation::DISCARD_TABLESPACE);
1464
1387
/* DISCARD/IMPORT TABLESPACE is always alone in an ALTER Table */
1465
return discard_or_import_tablespace(session, table_list, discard);
1388
return discard_or_import_tablespace(session, table_list, alter_info->tablespace_op);
1468
1391
session->set_proc_info("init");
1602
1524
FileSort filesort(*session);
1603
from->sort.io_cache= new internal::io_cache_st;
1525
from->sort.io_cache= new internal::IO_CACHE;
1605
1527
tables.table= from;
1606
tables.setTableName(from->getMutableShare()->getTableName());
1607
tables.alias= tables.getTableName();
1528
tables.setTableName(const_cast<char *>(from->getMutableShare()->getTableName()));
1529
tables.alias= const_cast<char *>(tables.getTableName());
1608
1530
tables.setSchemaName(const_cast<char *>(from->getMutableShare()->getSchemaName()));
1611
if (session->lex().select_lex.setup_ref_array(session, order_num) ||
1612
setup_order(session, session->lex().select_lex.ref_pointer_array,
1533
if (session->lex->select_lex.setup_ref_array(session, order_num) ||
1534
setup_order(session, session->lex->select_lex.ref_pointer_array,
1613
1535
&tables, fields, all_fields, order) ||
1614
1536
!(sortorder= make_unireg_sortorder(order, &length, NULL)) ||
1615
1537
(from->sort.found_records= filesort.run(from, sortorder, length,