IB: 0.2 part IV

* BEGIN_TS(), COMMIT_TS() SQL functions;
* VTQ instead of packed stores secs + usecs like my_timestamp_to_binary() does;
* versioned SELECT to IB is translated with COMMIT_TS();
* SQL fixes:
  - FOR_SYSTEM_TIME_UNSPECIFIED condition compares to TIMESTAMP_MAX_VALUE;
  - segfault fix #36: multiple execute of prepared stmt;
  - different tables to same stored procedure fix (#39)
* Fixes of previous parts: ON DUPLICATE KEY, other misc fixes.
This commit is contained in:
Aleksey Midenkov 2016-09-30 13:15:08 +00:00
parent f13bf7178d
commit 53a892fcfd
31 changed files with 585 additions and 123 deletions

View File

@ -5448,7 +5448,7 @@ bool Field_timestampf::set_max()
DBUG_ENTER("Field_timestampf::set_max");
ASSERT_COLUMN_MARKED_FOR_WRITE_OR_COMPUTED;
mi_int4store(ptr, 0x7fffffff);
mi_int4store(ptr, TIMESTAMP_MAX_VALUE);
memset(ptr + 4, 0x0, value_length() - 4);
DBUG_RETURN(FALSE);

View File

@ -35,6 +35,7 @@
#include "structs.h" /* SHOW_COMP_OPTION */
#include "sql_array.h" /* Dynamic_array<> */
#include "mdl.h"
#include "vtq.h"
#include "sql_analyze_stmt.h" // for Exec_time_tracker
@ -1387,9 +1388,10 @@ struct handlerton
TABLE_SHARE *share, HA_CREATE_INFO *info);
/*
Engine supports System Versioning
System Versioning
*/
bool versioned();
bool versioned() const;
bool (*vers_get_vtq_ts)(THD* thd, MYSQL_TIME *out, ulonglong trx_id, vtq_field_t field);
};
@ -4493,7 +4495,7 @@ int del_global_index_stat(THD *thd, TABLE* table, KEY* key_info);
int del_global_table_stat(THD *thd, LEX_STRING *db, LEX_STRING *table);
inline
bool handlerton::versioned()
bool handlerton::versioned() const
{
return flags & HTON_SUPPORTS_SYS_VERSIONING;
}

View File

@ -5621,6 +5621,7 @@ bool Item_field::fix_fields(THD *thd, Item **reference)
expression to 'reference', i.e. it substitute that expression instead
of this Item_field
*/
DBUG_ASSERT(context);
if ((from_field= find_field_in_tables(thd, this,
context->first_name_resolution_table,
context->last_name_resolution_table,

View File

@ -3913,7 +3913,7 @@ public:
class Item_datetime_literal: public Item_temporal_literal
{
public:
Item_datetime_literal(THD *thd, MYSQL_TIME *ltime, uint dec_arg):
Item_datetime_literal(THD *thd, MYSQL_TIME *ltime, uint dec_arg= 0):
Item_temporal_literal(thd, ltime, dec_arg)
{
max_length= MAX_DATETIME_WIDTH + (decimals ? decimals + 1 : 0);

View File

@ -6678,6 +6678,92 @@ Create_func_year_week::create_native(THD *thd, LEX_STRING name,
}
/* System Versioning: BEGIN_TS(), COMMIT_TS() */
class Create_func_begin_ts : public Create_native_func
{
public:
virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
static Create_func_begin_ts s_singleton;
protected:
Create_func_begin_ts() {}
virtual ~Create_func_begin_ts() {}
};
Create_func_begin_ts Create_func_begin_ts::s_singleton;
Item*
Create_func_begin_ts::create_native(THD *thd, LEX_STRING name,
List<Item> *item_list)
{
Item *func= NULL;
int arg_count= 0;
if (item_list != NULL)
arg_count= item_list->elements;
switch (arg_count) {
case 1:
{
Item *param_1= item_list->pop();
func= new (thd->mem_root) Item_func_vtq_ts(thd, param_1, VTQ_BEGIN_TS);
break;
}
default:
{
my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
break;
}
}
return func;
}
class Create_func_commit_ts : public Create_native_func
{
public:
virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
static Create_func_commit_ts s_singleton;
protected:
Create_func_commit_ts() {}
virtual ~Create_func_commit_ts() {}
};
Create_func_commit_ts Create_func_commit_ts::s_singleton;
Item*
Create_func_commit_ts::create_native(THD *thd, LEX_STRING name,
List<Item> *item_list)
{
Item *func= NULL;
int arg_count= 0;
if (item_list != NULL)
arg_count= item_list->elements;
switch (arg_count) {
case 1:
{
Item *param_1= item_list->pop();
func= new (thd->mem_root) Item_func_vtq_ts(thd, param_1, VTQ_COMMIT_TS);
break;
}
default:
{
my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
break;
}
}
return func;
}
struct Native_func_registry
{
LEX_STRING name;
@ -6718,6 +6804,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("ASWKT") }, GEOM_BUILDER(Create_func_as_wkt)},
{ { C_STRING_WITH_LEN("ATAN") }, BUILDER(Create_func_atan)},
{ { C_STRING_WITH_LEN("ATAN2") }, BUILDER(Create_func_atan)},
{ { C_STRING_WITH_LEN("BEGIN_TS") }, BUILDER(Create_func_begin_ts)},
{ { C_STRING_WITH_LEN("BENCHMARK") }, BUILDER(Create_func_benchmark)},
{ { C_STRING_WITH_LEN("BIN") }, BUILDER(Create_func_bin)},
{ { C_STRING_WITH_LEN("BINLOG_GTID_POS") }, BUILDER(Create_func_binlog_gtid_pos)},
@ -6735,6 +6822,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("COLUMN_EXISTS") }, BUILDER(Create_func_dyncol_exists)},
{ { C_STRING_WITH_LEN("COLUMN_LIST") }, BUILDER(Create_func_dyncol_list)},
{ { C_STRING_WITH_LEN("COLUMN_JSON") }, BUILDER(Create_func_dyncol_json)},
{ { C_STRING_WITH_LEN("COMMIT_TS") }, BUILDER(Create_func_commit_ts)},
{ { C_STRING_WITH_LEN("COMPRESS") }, BUILDER(Create_func_compress)},
{ { C_STRING_WITH_LEN("CONCAT") }, BUILDER(Create_func_concat)},
{ { C_STRING_WITH_LEN("CONCAT_WS") }, BUILDER(Create_func_concat_ws)},

View File

@ -3273,3 +3273,71 @@ bool Item_func_last_day::get_date(MYSQL_TIME *ltime, ulonglong fuzzy_date)
ltime->time_type= MYSQL_TIMESTAMP_DATE;
return (null_value= 0);
}
Item_func_vtq_ts::Item_func_vtq_ts(
THD *thd,
Item* a,
vtq_field_t _vtq_field,
handlerton* _hton) :
Item_datetimefunc(thd, a),
vtq_field(_vtq_field),
hton(_hton)
{
decimals= 6;
null_value= true;
DBUG_ASSERT(arg_count == 1 && args[0]);
}
Item_func_vtq_ts::Item_func_vtq_ts(
THD *thd,
Item* a,
vtq_field_t _vtq_field) :
Item_datetimefunc(thd, a),
vtq_field(_vtq_field),
hton(NULL)
{
decimals= 6;
null_value= true;
DBUG_ASSERT(arg_count == 1 && args[0]);
}
bool Item_func_vtq_ts::get_date(MYSQL_TIME *res, ulonglong fuzzy_date)
{
THD *thd= current_thd; // can it differ from constructor's?
DBUG_ASSERT(thd);
ulonglong trx_id= args[0]->val_uint();
if (trx_id == ULONGLONG_MAX)
{
null_value= false;
thd->variables.time_zone->gmt_sec_to_TIME(res, TIMESTAMP_MAX_VALUE);
return false;
}
if (!hton)
{
if (args[0]->type() == Item::FIELD_ITEM)
{
Item_field *f=
static_cast<Item_field *>(args[0]);
DBUG_ASSERT(
f->field &&
f->field->table &&
f->field->table->s &&
f->field->table->s->db_plugin);
hton= plugin_hton(f->field->table->s->db_plugin);
DBUG_ASSERT(hton);
}
else if (innodb_plugin)
{
hton= plugin_hton(plugin_int_to_ref(innodb_plugin));
DBUG_ASSERT(hton);
}
}
if (!hton)
return true;
null_value= !hton->vers_get_vtq_ts(thd, res, trx_id, vtq_field);
return false;
}

View File

@ -1286,4 +1286,26 @@ public:
{ return get_item_copy<Item_func_last_day>(thd, mem_root, this); }
};
#include "vtq.h"
class Item_func_vtq_ts :public Item_datetimefunc
{
vtq_field_t vtq_field;
handlerton *hton;
public:
Item_func_vtq_ts(THD *thd, Item* a, vtq_field_t _vtq_field, handlerton *hton);
Item_func_vtq_ts(THD *thd, Item* a, vtq_field_t _vtq_field);
const char *func_name() const
{
if (vtq_field == VTQ_BEGIN_TS)
{
return "begin_ts";
}
return "commit_ts";
}
bool get_date(MYSQL_TIME *res, ulonglong fuzzy_date);
Item *get_copy(THD *thd, MEM_ROOT *mem_root)
{ return get_item_copy<Item_func_vtq_ts>(thd, mem_root, this); }
};
#endif /* ITEM_TIMEFUNC_INCLUDED */

View File

@ -2529,6 +2529,7 @@ sp_head::restore_thd_mem_root(THD *thd)
Item *flist= free_list; // The old list
set_query_arena(thd); // Get new free_list and mem_root
state= STMT_INITIALIZED_FOR_SP;
is_stored_procedure= true;
DBUG_PRINT("info", ("mem_root 0x%lx returned from thd mem root 0x%lx",
(ulong) &mem_root, (ulong) &thd->mem_root));

View File

@ -7962,7 +7962,7 @@ fill_record(THD *thd, TABLE *table_arg, List<Item> &fields, List<Item> &values,
ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
rfield->field_name, table->s->table_name.str);
}
if (table->versioned_by_sql() && rfield->is_generated() &&
if (table->versioned() && rfield->is_generated() &&
!ignore_errors)
{
my_error(ER_GENERATED_FIELD_CANNOT_BE_SET_BY_USER, MYF(0));
@ -8216,7 +8216,7 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,
}
}
if (table->versioned_by_sql() && field->is_generated() &&
if (table->versioned() && field->is_generated() &&
!ignore_errors)
{
my_error(ER_GENERATED_FIELD_CANNOT_BE_SET_BY_USER, MYF(0));

View File

@ -706,6 +706,11 @@ extern "C" void thd_kill_timeout(THD* thd)
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
Time_zone * thd_get_timezone(THD * thd)
{
DBUG_ASSERT(thd && thd->variables.time_zone);
return thd->variables.time_zone;
}
THD::THD(my_thread_id id, bool is_wsrep_applier)
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
@ -3587,6 +3592,7 @@ void Query_arena::set_query_arena(Query_arena *set)
mem_root= set->mem_root;
free_list= set->free_list;
state= set->state;
is_stored_procedure= set->is_stored_procedure;
}

View File

@ -940,6 +940,11 @@ public:
enum_state state;
protected:
friend class sp_head;
bool is_stored_procedure;
public:
/* We build without RTTI, so dynamic_cast can't be used. */
enum Type
{
@ -947,7 +952,8 @@ public:
};
Query_arena(MEM_ROOT *mem_root_arg, enum enum_state state_arg) :
free_list(0), mem_root(mem_root_arg), state(state_arg)
free_list(0), mem_root(mem_root_arg), state(state_arg),
is_stored_procedure(state_arg == STMT_INITIALIZED_FOR_SP ? true : false)
{ INIT_ARENA_DBUG_INFO; }
/*
This constructor is used only when Query_arena is created as
@ -967,6 +973,8 @@ public:
{ return state == STMT_PREPARED || state == STMT_EXECUTED; }
inline bool is_conventional() const
{ return state == STMT_CONVENTIONAL_EXECUTION; }
inline bool is_sp_execute() const
{ return is_stored_procedure; }
inline void* alloc(size_t size) { return alloc_root(mem_root,size); }
inline void* calloc(size_t size)
@ -6021,5 +6029,4 @@ public:
#endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */

View File

@ -2200,6 +2200,7 @@ void st_select_lex::init_query()
join= 0;
having= prep_having= where= prep_where= 0;
cond_pushed_into_where= cond_pushed_into_having= 0;
saved_conds= 0;
olap= UNSPECIFIED_OLAP_TYPE;
having_fix_field= 0;
context.select_lex= this;

View File

@ -810,6 +810,7 @@ public:
Item *prep_having;/* saved HAVING clause for prepared statement processing */
Item *cond_pushed_into_where; /* condition pushed into the select's WHERE */
Item *cond_pushed_into_having; /* condition pushed into the select's HAVING */
Item *saved_conds;
/* Saved values of the WHERE and HAVING clauses*/
Item::cond_result cond_value, having_value;
/* point on lex in which it was created, used in view subquery detection */

View File

@ -233,6 +233,8 @@ static int plugin_array_version=0;
static bool initialized= 0;
ulong dlopen_count;
st_plugin_int* innodb_plugin= NULL;
/*
write-lock on LOCK_system_variables_hash is required before modifying
@ -1422,6 +1424,18 @@ static int plugin_initialize(MEM_ROOT *tmp_root, struct st_plugin_int *plugin,
}
state= PLUGIN_IS_READY; // plugin->init() succeeded
{
static const char * INNODB= "InnoDB";
static const uint INNODB_LEN= strlen(INNODB);
if (!my_strnncoll(&my_charset_latin1,
(const uchar *) plugin->name.str, plugin->name.length,
(const uchar *) INNODB, INNODB_LEN))
{
innodb_plugin= plugin;
}
}
if (plugin->plugin->status_vars)
{
/*

View File

@ -160,6 +160,8 @@ extern ulong plugin_maturity;
extern TYPELIB plugin_maturity_values;
extern const char *plugin_maturity_names[];
extern st_plugin_int* innodb_plugin;
extern int plugin_init(int *argc, char **argv, int init_flags);
extern void plugin_shutdown(void);
void add_plugin_options(DYNAMIC_ARRAY *options, MEM_ROOT *mem_root);

View File

@ -55,6 +55,7 @@
#include "sql_statistics.h"
#include "sql_cte.h"
#include "sql_window.h"
#include "tztime.h"
#include "debug_sync.h" // DEBUG_SYNC
#include <m_ctype.h>
@ -673,10 +674,19 @@ setup_for_system_time(THD *thd, TABLE_LIST *tables, COND **conds, SELECT_LEX *se
TABLE_LIST *table;
int versioned_tables= 0;
Query_arena *arena= 0, backup;
bool is_prepare= thd->stmt_arena->is_stmt_prepare();
if (!thd->stmt_arena->is_conventional()
&& !is_prepare
&& !thd->stmt_arena->is_sp_execute())
{
DBUG_RETURN(0);
}
for (table= tables; table; table= table->next_local)
{
if (table->table && table->table->versioned_by_sql())
if (table->table && table->table->versioned())
versioned_tables++;
else if (table->system_versioning.type != FOR_SYSTEM_TIME_UNSPECIFIED)
{
@ -688,52 +698,115 @@ setup_for_system_time(THD *thd, TABLE_LIST *tables, COND **conds, SELECT_LEX *se
if (versioned_tables == 0)
DBUG_RETURN(0);
/* For prepared statements we create items on statement arena,
because they must outlive execution phase for multiple executions. */
arena= thd->activate_stmt_arena_if_needed(&backup);
if (select_lex->saved_conds)
{
DBUG_ASSERT(thd->stmt_arena->is_sp_execute());
*conds= select_lex->saved_conds;
}
else if (thd->stmt_arena->is_sp_execute())
{
if (thd->stmt_arena->is_stmt_execute())
*conds= 0;
else if (*conds)
select_lex->saved_conds= (*conds)->copy_andor_structure(thd);
}
for (table= tables; table; table= table->next_local)
{
if (table->table && table->table->versioned_by_sql())
if (table->table && table->table->versioned())
{
Field *fstart= table->table->vers_start_field();
Field *fend= table->table->vers_end_field();
Item *istart= new (thd->mem_root) Item_field(thd, fstart);
Item *iend= new (thd->mem_root) Item_field(thd, fend);
Item *cond1= 0, *cond2= 0, *curr = 0;
DBUG_ASSERT(select_lex->parent_lex);
Name_resolution_context *context= select_lex->parent_lex->current_context();
DBUG_ASSERT(context);
Item *row_start= new (thd->mem_root) Item_field(thd, context, fstart);
Item *row_end= new (thd->mem_root) Item_field(thd, context, fend);
Item *row_end2= row_end;
if (!table->table->versioned_by_sql())
{
DBUG_ASSERT(table->table->s && table->table->s->db_plugin);
row_start= new (thd->mem_root) Item_func_vtq_ts(
thd,
row_start,
VTQ_COMMIT_TS,
plugin_hton(table->table->s->db_plugin));
row_end= new (thd->mem_root) Item_func_vtq_ts(
thd,
row_end,
VTQ_COMMIT_TS,
plugin_hton(table->table->s->db_plugin));
}
Item *cond1= 0, *cond2= 0, *curr= 0;
switch (table->system_versioning.type)
{
case FOR_SYSTEM_TIME_UNSPECIFIED:
curr= new (thd->mem_root) Item_func_now_local(thd, 6);
cond1= new (thd->mem_root) Item_func_le(thd, istart, curr);
cond2= new (thd->mem_root) Item_func_gt(thd, iend, curr);
if (table->table->versioned_by_sql())
{
MYSQL_TIME max_time;
thd->variables.time_zone->gmt_sec_to_TIME(&max_time, TIMESTAMP_MAX_VALUE);
curr= new (thd->mem_root) Item_datetime_literal(thd, &max_time);
cond1= new (thd->mem_root) Item_func_eq(thd, row_end, curr);
}
else
{
curr= new (thd->mem_root) Item_int(thd, ULONGLONG_MAX);
cond1= new (thd->mem_root) Item_func_eq(thd, row_end2, curr);
}
break;
case FOR_SYSTEM_TIME_AS_OF:
cond1= new (thd->mem_root) Item_func_le(thd, istart,
table->system_versioning.start);
cond2= new (thd->mem_root) Item_func_gt(thd, iend,
table->system_versioning.start);
cond1= new (thd->mem_root) Item_func_le(thd, row_start,
table->system_versioning.start);
cond2= new (thd->mem_root) Item_func_gt(thd, row_end,
table->system_versioning.start);
break;
case FOR_SYSTEM_TIME_FROM_TO:
cond1= new (thd->mem_root) Item_func_lt(thd, istart,
cond1= new (thd->mem_root) Item_func_lt(thd, row_start,
table->system_versioning.end);
cond2= new (thd->mem_root) Item_func_ge(thd, iend,
cond2= new (thd->mem_root) Item_func_ge(thd, row_end,
table->system_versioning.start);
break;
case FOR_SYSTEM_TIME_BETWEEN:
cond1= new (thd->mem_root) Item_func_le(thd, istart,
cond1= new (thd->mem_root) Item_func_le(thd, row_start,
table->system_versioning.end);
cond2= new (thd->mem_root) Item_func_ge(thd, iend,
cond2= new (thd->mem_root) Item_func_ge(thd, row_end,
table->system_versioning.start);
break;
default:
DBUG_ASSERT(0);
}
if (cond1 && cond2)
if (cond1)
{
COND *system_time_cond= new (thd->mem_root) Item_cond_and(thd, cond1, cond2);
thd->change_item_tree(conds, and_items(thd, *conds, system_time_cond));
cond1= and_items(thd,
*conds,
and_items(thd,
cond2,
cond1));
if (arena)
*conds= cond1;
else
thd->change_item_tree(conds, cond1);
table->system_versioning.is_moved_to_where= true;
}
}
}
if (arena)
{
thd->restore_active_arena(arena, &backup);
}
DBUG_RETURN(0);
}

View File

@ -2224,7 +2224,7 @@ int show_create_table(THD *thd, TABLE_LIST *table_list, String *packet,
hton->index_options);
}
if (table->versioned_by_sql())
if (table->versioned())
{
const Field *fs = table->vers_start_field();
const Field *fe = table->vers_end_field();
@ -2273,7 +2273,7 @@ int show_create_table(THD *thd, TABLE_LIST *table_list, String *packet,
add_table_options(thd, table, create_info_arg,
table_list->schema_table != 0, 0, packet);
if (table->versioned_by_sql())
if (table->versioned())
{
packet->append(STRING_WITH_LEN(" WITH SYSTEM VERSIONING"));
}

View File

@ -476,18 +476,6 @@ void localtime_to_TIME(MYSQL_TIME *to, struct tm *from)
}
/*
Convert seconds since Epoch to TIME
*/
void unix_time_to_TIME(MYSQL_TIME *to, time_t secs, suseconds_t usecs)
{
struct tm tm_time;
localtime_r(&secs, &tm_time);
localtime_to_TIME(to, &tm_time);
to->second_part = usecs;
}
void calc_time_from_sec(MYSQL_TIME *to, long seconds, long microseconds)
{
long t_seconds;

View File

@ -171,15 +171,6 @@ bool calc_time_diff(const MYSQL_TIME *l_time1, const MYSQL_TIME *l_time2,
int lsign, MYSQL_TIME *l_time3, ulonglong fuzzydate);
int my_time_compare(const MYSQL_TIME *a, const MYSQL_TIME *b);
void localtime_to_TIME(MYSQL_TIME *to, struct tm *from);
void unix_time_to_TIME(MYSQL_TIME *to, time_t secs, suseconds_t usecs);
inline
longlong unix_time_to_packed(time_t secs, suseconds_t usecs)
{
MYSQL_TIME mysql_time;
unix_time_to_TIME(&mysql_time, secs, usecs);
return pack_time(&mysql_time);
}
void calc_time_from_sec(MYSQL_TIME *to, long seconds, long microseconds);
uint calc_week(MYSQL_TIME *l_time, uint week_behaviour, uint *year);

View File

@ -89,6 +89,7 @@ extern my_time_t sec_since_epoch_TIME(MYSQL_TIME *t);
static const int MY_TZ_TABLES_COUNT= 4;
extern Time_zone* thd_get_timezone(THD* thd);
#endif /* !defined(TESTTIME) && !defined(TZINFO2SQL) */
#endif /* TZTIME_INCLUDED */

25
sql/vtq.h Executable file
View File

@ -0,0 +1,25 @@
#ifndef VTQ_INCLUDED
#define VTQ_INCLUDED
/* Copyright (c) 2016, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
enum vtq_field_t
{
VTQ_BEGIN_TS = 0,
VTQ_COMMIT_TS
};
#endif /* VTQ_INCLUDED */

View File

@ -1920,7 +1920,7 @@ dict_create_or_check_vtq_table(void)
row_drop_table_for_mysql("SYS_VTQ", trx, false, TRUE);
}
ib::warn() <<
ib::info() <<
"Creating VTQ system table.";
srv_file_per_table_backup = srv_file_per_table;

View File

@ -846,8 +846,8 @@ dict_process_sys_vtq(
mem_heap_t* heap, /*!< in/out: heap memory */
const rec_t* rec, /*!< in: current rec */
trx_id_t* col_trx_id, /*!< out: field values */
ullong* col_begin_ts,
ullong* col_commit_ts,
timeval* col_begin_ts,
timeval* col_commit_ts,
char** col_concurr_trx)
{
ulint len, col, concurr_n;
@ -877,7 +877,8 @@ char** col_concurr_trx)
if (len != sizeof(ullong))
return dict_print_error(heap, col, len, sizeof(ullong));
*col_begin_ts = mach_read_from_8(field);
col_begin_ts->tv_sec = mach_read_from_4(field);
col_begin_ts->tv_usec = mach_read_from_4(field + 4);
/* COMMIT_TS */
field = rec_get_nth_field_old(
rec, (col = DICT_FLD__SYS_VTQ__COMMIT_TS), &len);
@ -885,7 +886,8 @@ char** col_concurr_trx)
if (len != sizeof(ullong))
return dict_print_error(heap, col, len, sizeof(ullong));
*col_commit_ts = mach_read_from_8(field);
col_commit_ts->tv_sec = mach_read_from_4(field);
col_commit_ts->tv_usec = mach_read_from_4(field + 4);
/* CONCURR_TRX */
field = rec_get_nth_field_old(
rec, (col = DICT_FLD__SYS_VTQ__CONCURR_TRX), &len);

View File

@ -53,6 +53,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
// MySQL 5.7 Header */
// #include <sql_thd_internal_api.h>
#include <table_cache.h>
#include <tztime.h>
#include <my_check_opt.h>
#include <my_bitmap.h>
#include <mysql/service_thd_alloc.h>
@ -120,6 +121,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include "trx0xa.h"
#include "ut0mem.h"
#include "row0ext.h"
#include "vtq.h"
#define thd_get_trx_isolation(X) ((enum_tx_isolation)thd_tx_isolation(X))
@ -1539,6 +1541,9 @@ innobase_fts_store_docid(
}
#endif
bool
innobase_get_vtq_ts(THD* thd, MYSQL_TIME *out, ulonglong in_trx_id, vtq_field_t field);
/*************************************************************//**
Check for a valid value of innobase_commit_concurrency.
@return 0 for valid innodb_commit_concurrency */
@ -3871,6 +3876,9 @@ innobase_init(
innobase_hton->table_options = innodb_table_option_list;
/* System Versioning */
innobase_hton->vers_get_vtq_ts = innobase_get_vtq_ts;
innodb_remember_check_sysvar_funcs();
ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR);
@ -23109,3 +23117,108 @@ ib_push_frm_error(
break;
}
}
inline
void
innobase_get_vtq_ts_result(THD* thd, vtq_query_t* q, MYSQL_TIME *out, vtq_field_t field)
{
switch (field) {
case VTQ_BEGIN_TS:
thd_get_timezone(thd)->gmt_sec_to_TIME(out, q->begin_ts.tv_sec);
out->second_part = q->begin_ts.tv_usec;
break;
case VTQ_COMMIT_TS:
thd_get_timezone(thd)->gmt_sec_to_TIME(out, q->commit_ts.tv_sec);
out->second_part = q->commit_ts.tv_usec;
break;
default:
ut_error;
}
}
UNIV_INTERN
bool
innobase_get_vtq_ts(THD* thd, MYSQL_TIME *out, ulonglong _in_trx_id, vtq_field_t field)
{
trx_t* trx;
dict_index_t* index;
btr_pcur_t pcur;
dtuple_t* tuple;
dfield_t* dfield;
trx_id_t trx_id_net;
mtr_t mtr;
mem_heap_t* heap;
rec_t* rec;
ulint len;
byte* result_net;
bool found = false;
DBUG_ENTER("innobase_get_vtq_ts");
if (_in_trx_id == 0) {
DBUG_RETURN(false);
}
ut_ad(sizeof(_in_trx_id) == sizeof(trx_id_t));
trx_id_t in_trx_id = static_cast<trx_id_t>(_in_trx_id);
trx = thd_to_trx(thd);
ut_a(trx);
vtq_query_t* q = &trx->vtq_query;
if (q->trx_id == in_trx_id) {
innobase_get_vtq_ts_result(thd, q, out, field);
DBUG_RETURN(true);
}
index = dict_table_get_first_index(dict_sys->sys_vtq);
heap = mem_heap_create(0);
ut_ad(index);
ut_ad(dict_index_is_clust(index));
mach_write_to_8(
reinterpret_cast<byte*>(&trx_id_net),
in_trx_id);
tuple = dtuple_create(heap, 1);
dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_VTQ__TRX_ID);
dfield_set_data(dfield, &trx_id_net, 8);
dict_index_copy_types(tuple, index, 1);
mtr_start_trx(&mtr, trx);
btr_pcur_open_on_user_rec(index, tuple, PAGE_CUR_GE,
BTR_SEARCH_LEAF, &pcur, &mtr);
if (!btr_pcur_is_on_user_rec(&pcur))
goto not_found;
rec = btr_pcur_get_rec(&pcur);
result_net = rec_get_nth_field_old(rec, DICT_FLD__SYS_VTQ__TRX_ID, &len);
ut_ad(len == 8);
q->trx_id = mach_read_from_8(result_net);
result_net = rec_get_nth_field_old(rec, DICT_FLD__SYS_VTQ__BEGIN_TS, &len);
ut_ad(len == 8);
q->begin_ts.tv_sec = mach_read_from_4(result_net);
q->begin_ts.tv_usec = mach_read_from_4(result_net + 4);
result_net = rec_get_nth_field_old(rec, DICT_FLD__SYS_VTQ__COMMIT_TS, &len);
ut_ad(len == 8);
q->commit_ts.tv_sec = mach_read_from_4(result_net);
q->commit_ts.tv_usec = mach_read_from_4(result_net + 4);
if (q->trx_id != in_trx_id)
goto not_found;
innobase_get_vtq_ts_result(thd, q, out, field);
found = true;
not_found:
btr_pcur_close(&pcur);
mtr_commit(&mtr);
mem_heap_free(heap);
DBUG_RETURN(found);
}

View File

@ -28,6 +28,8 @@ Modified Dec 29, 2014 Jan Lindström (Added sys_semaphore_waits)
#include "ha_prototypes.h"
#include <mysql_version.h>
#include <field.h>
#include <tztime.h>
#include "univ.i"
#include <sql_acl.h>
@ -375,22 +377,19 @@ Auxiliary function to store packed timestamp value in MYSQL_TYPE_DATETIME field.
If the value is ULINT_UNDEFINED then the field it set to NULL.
@return 0 on success */
int
field_store_packed_ts(
field_store_timeval(
/*==============*/
Field* field, /*!< in/out: target field for storage */
ullong n) /*!< in: value to store */
timeval t, /*!< in: value to store */
THD* thd)
{
int ret;
MYSQL_TIME tmp;
if (n != UINT64_UNDEFINED) {
unpack_time(n, &tmp);
ret = field->store_time(&tmp);
field->set_notnull();
} else {
ret = 0; /* success */
field->set_null();
}
thd_get_timezone(thd)->gmt_sec_to_TIME(&tmp, t.tv_sec);
tmp.second_part = t.tv_usec;
ret = field->store_time(&tmp);
field->set_notnull();
return(ret);
}
@ -9711,8 +9710,8 @@ i_s_dict_fill_vtq(
/*========================*/
THD* thd, /*!< in: thread */
ullong col_trx_id, /*!< in: table fields */
ullong col_begin_ts,
ullong col_commit_ts,
timeval& col_begin_ts,
timeval& col_commit_ts,
char* col_concurr_trx,
TABLE* table_to_fill) /*!< in/out: fill this table */
{
@ -9722,8 +9721,8 @@ i_s_dict_fill_vtq(
fields = table_to_fill->field;
OK(field_store_ullong(fields[SYS_VTQ_TRX_ID], col_trx_id));
OK(field_store_packed_ts(fields[SYS_VTQ_BEGIN_TS], col_begin_ts));
OK(field_store_packed_ts(fields[SYS_VTQ_COMMIT_TS], col_commit_ts));
OK(field_store_timeval(fields[SYS_VTQ_BEGIN_TS], col_begin_ts, thd));
OK(field_store_timeval(fields[SYS_VTQ_COMMIT_TS], col_commit_ts, thd));
OK(field_store_string(fields[SYS_VTQ_CONCURR_TRX], col_concurr_trx));
OK(schema_table_store_record(thd, table_to_fill));
@ -9770,8 +9769,8 @@ i_s_sys_vtq_fill_table(
for (int i = 0; rec && i < I_S_SYS_VTQ_LIMIT; ++i) {
const char* err_msg;
trx_id_t col_trx_id;
ullong col_begin_ts;
ullong col_commit_ts;
timeval col_begin_ts;
timeval col_commit_ts;
char* col_concurr_trx;
/* Extract necessary information from SYS_VTQ row */

View File

@ -328,8 +328,8 @@ dict_process_sys_vtq(
mem_heap_t* heap, /*!< in/out: heap memory */
const rec_t* rec, /*!< in: current rec */
ullong* col_trx_id, /*!< out: field values */
ullong* col_begin_ts,
ullong* col_commit_ts,
timeval* col_begin_ts,
timeval* col_commit_ts,
char** col_concurr_trx);
/** Update the record for space_id in SYS_TABLESPACES to this filepath.

View File

@ -35,10 +35,25 @@ void set_row_field_8(
dfield_t* dfield = dtuple_get_nth_field(row, field_num);
ut_ad(dfield->type.len == fsize);
if (dfield->len == UNIV_SQL_NULL) {
byte* buf = static_cast<byte*>(mem_heap_alloc(heap, fsize));
mach_write_to_8(buf, data);
byte* buf = reinterpret_cast<byte*>(mem_heap_alloc(heap, fsize));
dfield_set_data(dfield, buf, fsize);
} else {
mach_write_to_8(dfield->data, data);
}
mach_write_to_8(dfield->data, data);
}
UNIV_INLINE
void set_row_field_8(
dtuple_t* row,
int field_num,
timeval& data,
mem_heap_t* heap)
{
dfield_t* dfield = dtuple_get_nth_field(row, field_num);
ut_ad(dfield->type.len == 8);
if (dfield->len == UNIV_SQL_NULL) {
byte* buf = reinterpret_cast<byte*>(mem_heap_alloc(heap, 8));
dfield_set_data(dfield, buf, 8);
}
mach_write_to_4(reinterpret_cast<byte*>(dfield->data), (ulint) data.tv_sec);
mach_write_to_4(reinterpret_cast<byte*>(dfield->data) + 4, (ulint) data.tv_usec);
}

View File

@ -829,6 +829,12 @@ typedef enum {
TRX_WSREP_ABORT = 1
} trx_abort_t;
struct vtq_query_t
{
trx_id_t trx_id;
timeval begin_ts;
timeval commit_ts;
};
/** Represents an instance of rollback segment along with its state variables.*/
struct trx_undo_ptr_t {
@ -1267,7 +1273,12 @@ struct trx_t {
os_event_t wsrep_event; /* event waited for in srv_conc_slot */
#endif /* WITH_WSREP */
bool vtq_notify_on_commit; /*!< Notify VTQ for System Versioned update */
/* System Versioning */
bool vtq_notify_on_commit;
/*!< Notify VTQ for System Versioned update */
vtq_query_t vtq_query;
trx_id_t* vtq_concurr_trx;
ulint vtq_concurr_n;
ulint magic_n;
/** @return whether any persistent undo log has been generated */

View File

@ -55,7 +55,6 @@ Created 4/20/1996 Heikki Tuuri
#include "fts0types.h"
#include "m_string.h"
#include "gis0geo.h"
#include "sql_time.h"
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
@ -3876,12 +3875,6 @@ vers_row_ins_vtq_low(trx_t* trx, mem_heap_t* heap, dtuple_t* row)
err = row_ins_sec_index_entry_low(
flags, BTR_MODIFY_TREE,
index, offsets_heap, heap, entry, trx->id, NULL, false, trx);
///* Report correct index name for duplicate key error. */
// No need to report on commit phase?
//if (err == DB_DUPLICATE_KEY) {
// trx->error_key_num = n_index;
//}
} while (err == DB_SUCCESS);
mem_heap_free(offsets_heap);
@ -3897,10 +3890,10 @@ void vers_notify_vtq(trx_t* trx)
mem_heap_t* heap = mem_heap_create(1024);
dtuple_t* row = dtuple_create(heap, dict_table_get_n_cols(dict_sys->sys_vtq));
ulint now_secs, now_usecs;
ut_usectime(&now_secs, &now_usecs);
ullong begin_ts = unix_time_to_packed(trx->start_time, trx->start_time_micro);
ullong commit_ts = unix_time_to_packed(now_secs, now_usecs);
timeval begin_ts, commit_ts;
begin_ts.tv_sec = trx->start_time;
begin_ts.tv_usec = trx->start_time_micro;
ut_usectime((ulint *)&commit_ts.tv_sec, (ulint *)&commit_ts.tv_usec);
dict_table_copy_types(row, dict_sys->sys_vtq);
set_row_field_8(row, DICT_FLD__SYS_VTQ__TRX_ID, trx->id, heap);
@ -3908,35 +3901,16 @@ void vers_notify_vtq(trx_t* trx)
set_row_field_8(row, DICT_FLD__SYS_VTQ__COMMIT_TS - 2, commit_ts, heap);
dfield_t* dfield = dtuple_get_nth_field(row, DICT_FLD__SYS_VTQ__CONCURR_TRX - 2);
mutex_enter(&trx_sys->mutex);
trx_ut_list_t &rw_list = trx_sys->rw_trx_list;
if (rw_list.count > 1) {
byte* buf = static_cast<byte*>(mem_heap_alloc(heap, rw_list.count * 8));
byte* ptr = buf;
ulint count = 0;
for (trx_t* ctrx = UT_LIST_GET_FIRST(rw_list);
ctrx != NULL;
ctrx = UT_LIST_GET_NEXT(trx_list, ctrx))
{
if (ctrx == trx || ctrx->state == TRX_STATE_NOT_STARTED)
continue;
mach_write_to_8(ptr, ctrx->id);
++count;
ulint count = 0;
byte* buf = NULL;
if (trx->vtq_concurr_n > 0) {
buf = static_cast<byte*>(mem_heap_alloc(heap, trx->vtq_concurr_n * 8));
for (byte* ptr = buf; count < trx->vtq_concurr_n; ++count) {
mach_write_to_8(ptr, trx->vtq_concurr_trx[count]);
ptr += 8;
}
if (count)
dfield_set_data(dfield, buf, count * 8);
else
dfield_set_data(dfield, NULL, 0);
} else {
// there must be at least current transaction
ut_ad(rw_list.count == 1 && UT_LIST_GET_FIRST(rw_list) == trx);
dfield_set_data(dfield, NULL, 0);
}
mutex_exit(&trx_sys->mutex);
dfield_set_data(dfield, buf, count * 8);
err = vers_row_ins_vtq_low(trx, heap, row);
if (DB_SUCCESS != err)

View File

@ -1489,11 +1489,22 @@ row_insert_for_mysql(
if (DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED)) {
ut_ad(table->vers_row_start != table->vers_row_end);
/* Return back modified fields into mysql_rec, so that
upper logic may benefit from it (f.ex. 'on duplicate key'). */
const mysql_row_templ_t* t = &prebuilt->mysql_template[table->vers_row_end];
ut_ad(t->mysql_col_len == 8);
if (historical) {
set_row_field_8(node->row, table->vers_row_end, trx->id, node->entry_sys_heap);
} else {
set_row_field_8(node->row, table->vers_row_start, trx->id, node->entry_sys_heap);
int8store(&mysql_rec[t->mysql_col_offset], trx->id);
}
else {
set_row_field_8(node->row, table->vers_row_end, IB_UINT64_MAX, node->entry_sys_heap);
int8store(&mysql_rec[t->mysql_col_offset], IB_UINT64_MAX);
t = &prebuilt->mysql_template[table->vers_row_start];
ut_ad(t->mysql_col_len == 8);
set_row_field_8(node->row, table->vers_row_start, trx->id, node->entry_sys_heap);
int8store(&mysql_rec[t->mysql_col_offset], trx->id);
}
}
@ -1948,20 +1959,25 @@ row_update_for_mysql_using_upd_graph(
if (DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED))
{
/* System Versioning: update sys_trx_start to current trx_id */
/* System Versioning: modify update vector to set
sys_trx_start (or sys_trx_end in case of DELETE)
to current trx_id. */
upd_t* uvect = node->update;
upd_field_t* ufield;
dict_col_t* col;
const mysql_row_templ_t* t;
unsigned col_idx;
if (node->is_delete) {
ufield = &uvect->fields[0];
uvect->n_fields = 0;
node->is_delete = false;
col = &table->cols[table->vers_row_end];
col_idx = table->vers_row_end;
} else {
ut_ad(uvect->n_fields < node->table->n_cols);
ufield = &uvect->fields[uvect->n_fields];
col = &table->cols[table->vers_row_start];
col_idx = table->vers_row_start;
}
col = &table->cols[col_idx];
UNIV_MEM_INVALID(ufield, sizeof *ufield);
ufield->field_no = dict_col_get_clust_pos(col, clust_index);
ufield->orig_len = 0;
@ -1976,6 +1992,11 @@ row_update_for_mysql_using_upd_graph(
uvect->n_fields++;
ut_ad(node->in_mysql_interface); // otherwise needs to recalculate node->cmpl_info
/* Return trx_id back to mysql_rec (for the sake of interface consistency). */
t = &prebuilt->mysql_template[col_idx];
ut_ad(t->mysql_col_len == 8);
int8store(&mysql_rec[t->mysql_col_offset], trx->id);
}
ut_a(node->pcur->rel_pos == BTR_PCUR_ON);

View File

@ -316,6 +316,10 @@ struct TrxFactory {
trx->lock.table_locks.~lock_pool_t();
trx->hit_list.~hit_list_t();
if (trx->vtq_concurr_trx) {
ut_free(trx->vtq_concurr_trx);
}
}
/** Enforce any invariants here, this is called before the transaction
@ -470,8 +474,6 @@ trx_create_low()
trx_free(). */
ut_a(trx->mod_tables.size() == 0);
trx->vtq_notify_on_commit = false;
#ifdef WITH_WSREP
trx->wsrep_event = NULL;
#endif /* WITH_WSREP */
@ -1214,6 +1216,33 @@ trx_t::assign_temp_rseg()
return(rseg);
}
/** Functor to create trx_ids array. */
struct copy_trx_ids
{
copy_trx_ids(trx_id_t* _array, ulint& _array_size)
: array(_array), array_size(_array_size)
{
array_size = 0;
}
void operator()(trx_t* trx)
{
ut_ad(mutex_own(&trx_sys->mutex));
ut_ad(trx->in_rw_trx_list);
/* trx->state cannot change from or to NOT_STARTED
while we are holding the trx_sys->mutex. It may change
from ACTIVE to PREPARED or COMMITTED. */
if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)) {
array[array_size++] = trx->id;
}
}
trx_id_t* array;
ulint& array_size;
};
/****************************************************************//**
Starts a transaction. */
static
@ -1303,6 +1332,13 @@ trx_start_low(
|| srv_read_only_mode
|| srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO);
if (UT_LIST_GET_LEN(trx_sys->rw_trx_list) > 0) {
trx->vtq_concurr_trx = static_cast<trx_id_t *>(
ut_malloc_nokey(UT_LIST_GET_LEN(trx_sys->rw_trx_list) * sizeof(trx_id_t)));
copy_trx_ids f(trx->vtq_concurr_trx, trx->vtq_concurr_n);
ut_list_map(trx_sys->rw_trx_list, f);
}
UT_LIST_ADD_FIRST(trx_sys->rw_trx_list, trx);
ut_d(trx->in_rw_trx_list = true);