在使用多个源(包括循环复制)的复制设置中,可能会出现不同的源尝试更新同一行的不同数据的情况。NDB 集群复制中的冲突解决机制允许用户定义的解决列来确定是否应用源上的更新。
NDB 集群支持的某些冲突解决类型(NDB$OLD()
、NDB$MAX()
和 NDB$MAX_DELETE_WIN()
;NDB$MAX_INS()
和 NDB$MAX_DEL_WIN_INS()
)将用户定义的列实现为“时间戳”列(尽管其类型不能是 TIMESTAMP
,如本节后面所述)。这些类型的冲突解决总是按行应用,而不是按事务应用。基于 epoch 的冲突解决函数 NDB$EPOCH()
和 NDB$EPOCH_TRANS()
比较 epochs 的复制顺序(因此这些函数是事务性的)。可以使用不同的方法来比较解决列值,以解决冲突;可以在单个表、数据库或服务器上设置该方法,或者使用模式匹配在一组表上设置该方法。请参阅 使用通配符匹配,以获取关于在 db
、table_name
和 server_id
列中的模式匹配信息。
您还应该注意,应用程序负责确保解决列正确填充相关值,以便解决函数可以在确定是否应用更新时做出适当的选择。
冲突解决需要在源和副本上进行准备。这些任务在以下列表中描述:
-
在写入二进制日志的源上,您必须确定哪些列被发送(所有列或仅更新的列)。这可以通过在 MySQL 服务器上应用 mysqld 启动选项
--ndb-log-updated-only
(在本节后面描述),或在一个或多个特定表上通过在mysql.ndb_replication
表中放置适当的条目(请参阅 ndb_replication 表)。Note如果您正在复制具有非常大列(例如
TEXT
或BLOB
列)的表,则--ndb-log-updated-only
也可以用于减少二进制日志的大小,并避免可能的复制失败由于超过max_allowed_packet
。查看 第 19.5.1.20 节,“复制和 max_allowed_packet”,以获取有关此问题的更多信息。
-
在副本上,您必须确定要应用哪种冲突解决方法(“最新时间戳获胜”、“相同时间戳获胜”、“主获胜”、“主获胜,完整事务”或无)。这是在使用
mysql.ndb_replication
系统表完成的,并应用于一个或多个特定表(见 ndb_replication 表)。 -
NDB 集群还支持读取冲突检测,即检测一个集群中的某一行的读取与另一个集群中的同一行的更新或删除之间的冲突。这需要独占读锁,通过将
ndb_log_exclusive_reads
设置为 1 在副本上实现。所有冲突读取的行都记录在异常表中。有关更多信息,请参阅 读取冲突检测和解决。 -
使用
NDB$MAX_INS()
或NDB$MAX_DEL_WIN_INS()
时,NDB
可以将WRITE_ROW
事件 idempotently 应用,映射到插入操作如果incoming 行不存在,或者更新操作如果它存在。使用除
NDB$MAX_INS()
或NDB$MAX_DEL_WIN_INS()
之外的任何冲突解决函数时,incoming 写入操作总是被拒绝,如果行已经存在。
使用函数 NDB$OLD()
、NDB$MAX()
、NDB$MAX_DELETE_WIN()
、NDB$MAX_INS()
和 NDB$MAX_DEL_WIN_INS()
进行基于时间戳的冲突解决时,我们通常将用于确定更新的列称为“时间戳”列。但是,该列的数据类型从不 TIMESTAMP
;相反,它的数据类型应该是 INT
(INTEGER
) 或 BIGINT
。“时间戳”列也应该是 UNSIGNED
和 NOT NULL
。
本节后面讨论的 NDB$EPOCH()
和 NDB$EPOCH_TRANS()
函数通过比较主 NDB 集群和次 NDB 集群上的复制 epoch 的相对顺序工作,不使用时间戳。
我们可以将更新操作视为“before”和“after”图像,即表的更新前和更新后的状态。通常,当更新具有主键的表时,“before”图像并不重要;但是,当我们需要在副本上确定是否使用更新的值时,我们需要确保将两个图像都写入源的二进制日志中。这是使用 --ndb-log-update-as-write
选项为 mysqld 实现的,如本节后面所述。
通常情况下,冲突解决是在可能发生冲突的服务器上启用的。就像日志记录方法选择一样,它是通过 mysql.ndb_replication
表中的条目启用的。
NBT_UPDATED_ONLY_MINIMAL
和 NBT_UPDATED_FULL_MINIMAL
可以与 NDB$EPOCH()
、NDB$EPOCH2()
和 NDB$EPOCH_TRANS()
一起使用,因为这些不需要非主键列的“before”值。需要旧值的冲突解决算法,如 NDB$MAX()
和 NDB$OLD()
,不能与这些 binlog_type
值正确工作。
本节提供了关于使用 NDB 复制进行冲突检测和解决的函数的详细信息。
NDB$OLD()
如果源和副本上的 column_name
值相同,则应用更新;否则,不应用更新并将异常写入日志。下面是伪代码示例:
if (source_old_column_value == replica_current_column_value)
apply_update();
else
log_exception();
此函数可用于 “相同值获胜” 冲突解决。这类冲突解决确保从错误的源更新不应用于副本。
该函数使用源的 “before” 图像中的列值。
NDB$MAX()
对于更新或删除操作,如果来自源的 “时间戳” 列值高于副本上的值,则应用更新;否则,不应用更新。下面是伪代码示例:
if (source_new_column_value > replica_current_column_value)
apply_update();
此函数可用于 “最晚时间戳获胜” 冲突解决。这类冲突解决确保在冲突情况下,最新更新的版本将被保留。
该函数对写操作之间的冲突没有影响,除非写操作使用相同的主键,否则总是被拒绝;只有在没有使用相同主键的写操作存在时,才被接受和应用。
该函数使用源的 “after” 图像中的列值。
NDB$MAX_DELETE_WIN()
这是 NDB$MAX()
的一种变体。由于删除操作没有时间戳,因此使用 NDB$MAX()
进行删除实际上是使用 NDB$OLD
,但对于某些用例,这不是最优的。对于 NDB$MAX_DELETE_WIN()
,如果源的“时间戳”列值高于副本的时间戳列值,则应用该行的添加或更新操作。然而,删除操作总是被视为具有更高的值。这可以通过以下伪代码来说明:
if ( (source_new_column_value > replica_current_column_value)
||
operation.type == "delete")
apply_update();
该函数可用于“最大时间戳,删除优先”冲突解决。这类冲突解决方案确保,在冲突情况下,删除或最近更新的行版本将被保留。
与 NDB$MAX()
相似,该函数使用源的“after”图像中的列值。
NDB$MAX_INS()
该函数提供了冲突写操作的解决方案。这些冲突由 “NDB$MAX_INS()” 按照以下方式处理:
-
如果没有冲突的写操作,则应用该操作(这与
NDB$MAX()
相同)。 -
否则,应用“最大时间戳优先”冲突解决方案,如下所示:
-
如果传入写操作的时间戳大于冲突写操作的时间戳,则应用传入操作。
-
如果传入写操作的时间戳不大于冲突写操作的时间戳,则拒绝传入写操作。
-
在处理插入操作时,NDB$MAX_INS()
将源和副本的时间戳进行比较,如下所示的伪代码:
if (source_new_column_value > replica_current_column_value)
apply_insert();
else
log_exception();
对于更新操作,源的更新时间戳列值将与副本的时间戳列值进行比较,如下所示:
if (source_new_column_value > replica_current_column_value)
apply_update();
else
log_exception();
这与 NDB$MAX()
相同。
对于删除操作,处理方式与 NDB$MAX()
相同(因此与 NDB$OLD()
相同),如下所示:
if (source_new_column_value == replica_current_column_value)
apply_delete();
else
log_exception();
NDB$MAX_DEL_WIN_INS()
该函数提供了冲突写操作的解决方案,包括“删除优先”解决方案,如 NDB$MAX_DELETE_WIN()
。 写冲突由 NDB$MAX_DEL_WIN_INS()
按照以下方式处理:
-
如果没有冲突的写操作,则应用该操作(这与
NDB$MAX_DELETE_WIN()
相同)。 -
否则,应用“最大时间戳优先”冲突解决方案,如下所示:
-
如果传入写操作的时间戳大于冲突写操作的时间戳,则应用传入操作。
-
如果传入写操作的时间戳不大于冲突写操作的时间戳,则拒绝传入写操作。
-
NDB$MAX_DEL_WIN_INS()
对插入操作的处理可以用伪代码表示如下:
if (source_new_column_value > replica_current_column_value)
apply_insert();
else
log_exception();
对于更新操作,源的更新时间戳列值将与副本的时间戳列值进行比较,如下所示:
if (source_new_column_value > replica_current_column_value)
apply_update();
else
log_exception();
删除操作使用“删除优先”策略(与 NDB$MAX_DELETE_WIN()
相同),总是应用删除操作,而不考虑时间戳值,如下所示:
if (operation.type == "delete")
apply_delete();
对于更新和删除操作之间的冲突,该函数的行为与 NDB$MAX_DELETE_WIN()
相同。
NDB$EPOCH()
NDB$EPOCH()
函数跟踪副本集群上复制 epoch 的相对顺序,以确定来自副本的更改是否与来自本地的更改冲突。
以下关于 NDB$EPOCH()
的描述也适用于 NDB$EPOCH_TRANS()
,除非特别注明。
NDB$EPOCH()
是非对称的,在双向复制配置中操作一个 NDB 集群(有时称为 “活动-活动” 复制)。我们这里将其称为主要集群,而另一个为次要集群。主要集群上的副本负责检测和处理冲突,而次要集群上的副本不参与冲突检测或处理。
当主要集群上的副本检测到冲突时,它会将事件注入自己的二进制日志,以补偿这些冲突;这确保次要 NDB 集群最终与主要集群重新对齐,从而防止主要和次要集群的分歧。这一补偿和重新对齐机制要求主要 NDB 集群始终在冲突中获胜——即,主要集群的更改始终优先于次要集群的更改在冲突中。这一 “主要集群始终获胜” 规则具有以下含义:
-
在主要集群上提交的更改数据是一致的,不会被冲突检测和解决所撤销或回滚。
-
从主要集群读取的数据是一致的。任何在主要集群上提交的更改(本地或来自副本)都不会被撤销。
-
在次要集群上进行的更改数据可能会被撤销,如果主要集群确定它们与冲突。
-
在次要集群上读取的每一行都是自我一致的,总是反映次要集群或主要集群提交的状态。
-
在次要集群上读取的行集可能不会在给定的时间点上是一致的。对于
NDB$EPOCH_TRANS()
,这是一个暂时状态;对于NDB$EPOCH()
,它可以是一个持久状态。 -
假设在足够长的时间内没有冲突,次要 NDB 集群上的所有数据最终将与主要集群的数据一致。
NDB$EPOCH()
和 NDB$EPOCH_TRANS()
不需要任何用户架构修改或应用程序更改来提供冲突检测。然而,需要仔细考虑架构的使用和访问模式,以确保整个系统在指定的限制内行为。
每个 NDB$EPOCH()
和 NDB$EPOCH_TRANS()
函数都可以带有一个可选参数;这是用于表示 epoch 的低 32 位的位数,应该至少设置为以下计算值:
CEIL( LOG2( TimeBetweenGlobalCheckpoints / TimeBetweenEpochs ), 1)
对于这些配置参数的默认值(2000 和 100 毫秒),这将给出 5 位的值,因此默认值(6)应该足够,除非使用了不同的 TimeBetweenGlobalCheckpoints
、TimeBetweenEpochs
或两者。太小的值可能导致虚假阳性,而太大的值可能导致数据库中的空间浪费。
两个 NDB$EPOCH()
和 NDB$EPOCH_TRANS()
都将插入冲突行的条目到相关的异常表中,前提是这些表已经根据相同的异常表架构规则定义(见 NDB$OLD())。您必须在创建数据表之前创建任何异常表。
与本节中讨论的其他冲突检测函数一样,NDB$EPOCH()
和 NDB$EPOCH_TRANS()
是通过在 mysql.ndb_replication
表中包含相关条目来激活的(见 ndb_replication Table)。在这个场景中,主要和次要 NDB 集群的角色完全由 mysql.ndb_replication
表条目确定。
由于 NDB$EPOCH()
和 NDB$EPOCH_TRANS()
使用的冲突检测算法是非对称的,因此您必须为主要和次要副本的 server_id
条目使用不同的值。
仅使用 DELETE
操作之间的冲突不足以触发使用 NDB$EPOCH()
或 NDB$EPOCH_TRANS()
的冲突,并且相对 epoch 内的位置不重要。
当使用 NDB$EPOCH()
进行冲突检测时,以下限制当前适用:
-
冲突是使用 NDB 集群 epoch 边界检测的,粒度与
TimeBetweenEpochs
成正比(默认:100 毫秒)。最小冲突窗口是指在两个集群上同时更新同一数据时总是报告冲突的最小时间长度。这总是一个非零的时间长度,大致等于2 * (延迟 + 排队 + TimeBetweenEpochs)
。这意味着—假设TimeBetweenEpochs
的默认值,并忽略集群之间的延迟(以及任何排队延迟)—最小冲突窗口大小约为 200 毫秒。这最小窗口应该在查看应用程序预期的 “竞赛” 模式时考虑。 -
使用
NDB$EPOCH()
和NDB$EPOCH_TRANS()
函数的表格需要额外的存储空间;每行需要 1 到 32 位额外的空间,具体取决于传递给函数的值。 -
删除操作之间的冲突可能会导致主从集群之间的分歧。当在两个集群上同时删除一行时,冲突可以被检测到,但不会被记录,因为该行已经被删除。这意味着,任何后续的重新对齐操作中的冲突都不会被检测到,从而导致分歧。
删除操作应该被外部序列化,或者路由到一个集群上。或者,应该事务性地更新一行,以便跟踪删除操作的冲突。这可能需要对应用程序进行更改。
-
当前仅支持使用
NDB$EPOCH()
或NDB$EPOCH_TRANS()
进行冲突检测的两个 NDB 集群在双向 “活动-活动” 配置中。 -
当前不支持使用
NDB$EPOCH()
或NDB$EPOCH_TRANS()
的表格具有BLOB
或TEXT
列。
NDB$EPOCH_TRANS()
NDB$EPOCH_TRANS()
扩展了 NDB$EPOCH()
函数。冲突是使用同样的方式检测和处理的,使用 “主胜所有” 规则(见 NDB$EPOCH()),但额外地认为在同一事务中更新的所有行也存在冲突。
此外,任何依赖于冲突事务的交易也被认为存在冲突,这些依赖关系是根据二进制日志的内容确定的。由于二进制日志仅包含数据修改操作(插入、更新和删除),因此仅使用重叠的数据修改来确定事务之间的依赖关系。
NDB$EPOCH_TRANS()
受到与 NDB$EPOCH()
相同的条件和限制,并且还需要在二进制日志中记录所有事务 ID,使用 --ndb-log-transaction-id
设置为 ON
。这将添加可变数量的开销(每行最多 13 字节)。
请参阅 NDB$EPOCH()。
NDB$EPOCH2()
NDB$EPOCH2()
函数与 NDB$EPOCH()
相似,除了 NDB$EPOCH2()
在双向复制拓扑结构中提供删除-删除处理。 在这种情况下,通过设置 ndb_conflict_role
系统变量为适当的值(通常为 PRIMARY
和 SECONDARY
),将主从角色分配给两个源。然后,secondary 上的修改将被反映回 primary,然后 conditional 地应用于 secondary。
NDB$EPOCH2_TRANS()
NDB$EPOCH2_TRANS()
扩展了NDB$EPOCH2()
函数。冲突被检测和处理的方式相同,分配主要和次要角色给复制集群,但有一个额外的条件,即同一事务中更新的所有其他行也被视为冲突。也就是说,NDB$EPOCH2()
重新对齐单个冲突行,而NDB$EPOCH_TRANS()
重新对齐冲突事务。
其中NDB$EPOCH()
和NDB$EPOCH_TRANS()
使用每行指定的元数据,确定主服务器上是否存在来自次要服务器的并发更改;并发更改被视为冲突,随后更新异常表并重新对齐次要服务器。问题是,当主服务器上删除一行时,没有最后修改的时期来确定是否存在冲突,从而导致冲突删除操作未被检测到。这可能导致分歧,例如一个集群上的删除操作与另一个集群上的删除和插入操作同时发生,这就是为什么删除操作只能路由到一个集群时使用NDB$EPOCH()
和NDB$EPOCH_TRANS()
。
NDB$EPOCH2()
绕过了上述问题—在主服务器上存储已删除行的信息—通过忽略删除-删除冲突,并避免了可能的分歧。这样做是通过反映来自次要服务器的操作回到次要服务器来实现的。在返回次要服务器时,可以重新应用来自主服务器的操作,删除了来自主服务器的操作。
使用NDB$EPOCH2()
时,需要注意,次要服务器应用来自主服务器的删除,直到反映操作恢复该行。在理论上,次要服务器上的后续插入或更新与来自主服务器的删除冲突,但我们选择忽略这种情况,以防止集群之间的分歧。换言之,在删除后,主服务器不检测冲突,而是立即采用次要服务器的后续更改。由于这种情况,次要服务器的状态可能会回到多个以前提交的状态中,其中一些可能是可见的。
您还应该注意,反映所有来自次要服务器的操作到主服务器将增加主服务器日志二进制日志的大小,以及带宽、CPU 使用率和磁盘 I/O 的需求。
在次要服务器上应用反映操作取决于目标行的状态。可以通过检查Ndb_conflict_reflected_op_prepare_count
和Ndb_conflict_reflected_op_discard_count
状态变量来跟踪反映更改的应用情况。应用的更改数量只是这两个值之间的差异(注意:Ndb_conflict_reflected_op_prepare_count
总是大于或等于Ndb_conflict_reflected_op_discard_count
)。
事件将被应用,如果且仅当以下两个条件都为真:
-
行的存在性—即是否存在—符合事件类型。对于删除和更新操作,行必须已经存在。对于插入操作,行必须不存在。
-
行的最后修改是由主服务器完成的。可能是通过执行反映操作来实现的。
如果这两个条件都没有满足,反映操作将被次要服务器丢弃。
要使用NDB$OLD()
冲突解决函数,还需要为每个使用这种冲突解决方法的NDB
表创建一个异常表。同样适用于使用NDB$EPOCH()
或NDB$EPOCH_TRANS()
。该表的名称是原始表的名称,后缀为$EX
。(例如,如果原始表的名称是mytable
,则对应的异常表名称应该是mytable$EX
。)创建异常表的语法如下所示:
CREATE TABLE original_table$EX (
[NDB$]server_id INT UNSIGNED,
[NDB$]source_server_id INT UNSIGNED,
[NDB$]source_epoch BIGINT UNSIGNED,
[NDB$]count INT UNSIGNED,
[NDB$OP_TYPE ENUM('WRITE_ROW','UPDATE_ROW', 'DELETE_ROW',
'REFRESH_ROW', 'READ_ROW') NOT NULL,]
[NDB$CFT_CAUSE ENUM('ROW_DOES_NOT_EXIST', 'ROW_ALREADY_EXISTS',
'DATA_IN_CONFLICT', 'TRANS_IN_CONFLICT') NOT NULL,]
[NDB$ORIG_TRANSID BIGINT UNSIGNED NOT NULL,]
original_table_pk_columns,
[orig_table_column|orig_table_column$OLD|orig_table_column$NEW,]
[additional_columns,]
PRIMARY KEY([NDB$]server_id, [NDB$]source_server_id, [NDB$]source_epoch, [NDB$]count)
) ENGINE=NDB;
前四列是必需的。前四列和原始表的主键列的名称不太关键;然而,我们建议出于清晰和一致的原因,使用这里显示的名称来命名 server_id
、source_server_id
、source_epoch
和 count
列,并使用原始表的主键列相同的名称。
如果异常表使用一个或多个可选列 NDB$OP_TYPE
、NDB$CFT_CAUSE
或 NDB$ORIG_TRANSID
,那么每个必需列也必须使用 NDB$
前缀命名。如果您想要,即使不定义任何可选列,也可以使用 NDB$
前缀命名必需列,但是在这种情况下,所有四个必需列都必须使用该前缀。
在这些列之后,原始表的主键列将被复制到异常表中,按照它们在原始表中的顺序。这些列的数据类型应该与原始表的主键列相同或更大。可以使用原始表主键列的子集。
异常表必须使用 NDB
存储引擎。(本节后面将显示使用 NDB$OLD()
的异常表示例。)
可以在复制的主键列之后定义额外的列,但不能在它们之前;这些额外列不能是 NOT NULL
。NDB Cluster 支持三个预定义的可选列 NDB$OP_TYPE
、NDB$CFT_CAUSE
和 NDB$ORIG_TRANSID
,它们将在下面几段中描述。
NDB$OP_TYPE
:该列可以用于获取导致冲突的操作类型。如果您使用该列,请按照以下方式定义:
NDB$OP_TYPE ENUM('WRITE_ROW', 'UPDATE_ROW', 'DELETE_ROW',
'REFRESH_ROW', 'READ_ROW') NOT NULL
WRITE_ROW
、UPDATE_ROW
和 DELETE_ROW
操作类型表示用户启动的操作。REFRESH_ROW
操作是冲突解决中生成的补偿事务发送回原始集群的操作。READ_ROW
操作是使用排他行锁定义的用户启动的读取跟踪操作。
NDB$CFT_CAUSE
:您可以定义一个可选列 NDB$CFT_CAUSE
,它提供了注册冲突的原因。该列,如果使用,应按照以下方式定义:
NDB$CFT_CAUSE ENUM('ROW_DOES_NOT_EXIST', 'ROW_ALREADY_EXISTS',
'DATA_IN_CONFLICT', 'TRANS_IN_CONFLICT') NOT NULL
ROW_DOES_NOT_EXIST
可以报告为 UPDATE_ROW
和 WRITE_ROW
操作的原因;ROW_ALREADY_EXISTS
可以报告为 WRITE_ROW
事件。DATA_IN_CONFLICT
报告行冲突函数检测到的冲突;TRANS_IN_CONFLICT
报告事务冲突函数拒绝了整个事务的所有操作。
NDB$ORIG_TRANSID
:如果使用,NDB$ORIG_TRANSID
列将包含原始事务的 ID。该列应按照以下方式定义:
NDB$ORIG_TRANSID BIGINT UNSIGNED NOT NULL
NDB$ORIG_TRANSID
是 NDB
生成的 64 位值。该值可以用于关联来自同一冲突事务的多个异常表条目,来自同一个或不同的异常表。
可以定义其他参考列,它们不是原始表的主键的一部分,可以命名为
或 colname
$OLD
。colname
$NEW
引用更新和删除操作中的旧值,即包含 colname
$OLDDELETE_ROW
事件的操作。
可以用于引用插入和更新操作中的新值,即使用 colname
$NEWWRITE_ROW
事件、UPDATE_ROW
事件或两者。如果冲突操作不提供某个参考列的值,该列将包含 NULL
或该列的默认值。
在设置复制表时,mysql.ndb_replication
表将被读取,因此必须在创建要复制的表之前将对应的行插入到 mysql.ndb_replication
表中。
有多个状态变量可以用来监控冲突检测。您可以通过当前值的 Ndb_conflict_fn_epoch
系统状态变量,查看自从这个副本最后一次重新启动以来,NDB$EPOCH()
发现了多少行冲突。
Ndb_conflict_fn_epoch_trans
提供了通过 NDB$EPOCH_TRANS()
直接发现的冲突行数。Ndb_conflict_fn_epoch2
和 Ndb_conflict_fn_epoch2_trans
显示了通过 NDB$EPOCH2()
和 NDB$EPOCH2_TRANS()
发现的冲突行数。实际重新对齐的行数,包括那些由于成员关系或依赖于同一事务的其他冲突行的影响,而是由 Ndb_conflict_trans_row_reject_count
给出。
另一个服务器状态变量 Ndb_conflict_fn_max
提供了自从最后一次启动 mysqld 以来,因 “最大时间戳获胜” 冲突解决而未应用于当前 SQL 节点的行数。Ndb_conflict_fn_max_del_win
提供了基于 NDB$MAX_DELETE_WIN()
结果的冲突解决次数。
Ndb_conflict_fn_max_ins
跟踪了使用 NDB$MAX_INS()
的写操作次数;Ndb_conflict_fn_max_del_win_ins
提供了使用 NDB$MAX_DEL_WIN_INS()
的写操作次数。
自从最后一次重新启动 mysqld 以来,因 “相同时间戳获胜” 冲突解决而未应用于当前 SQL 节点的行数,由全局状态变量 Ndb_conflict_fn_old
给出。此外,还将未使用的行的主键插入到一个 异常表 中,如本节其他地方所述。
以下示例假设您已经设置了一个工作的 NDB Cluster 复制设置,如 第 25.7.5 节,“为 NDB Cluster 复制做准备” 和 第 25.7.6 节,“启动 NDB Cluster 复制(单个复制通道)” 所述。
NDB$MAX() 示例。 假设您想在表 test.t1
上启用 “最大时间戳获胜” 冲突解决,使用列 mycol
作为 “时间戳”。这可以通过以下步骤完成:
-
确保已经启动了源mysqld,带有
--ndb-log-update-as-write=OFF
。 -
在源上执行以下
INSERT
语句:INSERT INTO mysql.ndb_replication VALUES ('test', 't1', 0, NULL, 'NDB$MAX(mycol)');
Note如果
ndb_replication
表不存在,必须创建它。请参阅ndb_replication 表。将 0 插入到
server_id
列中,表明所有 SQL 节点访问该表时都应使用冲突解决。如果您想在特定的mysqld上使用冲突解决,请使用实际的服务器 ID。将
NULL
插入到binlog_type
列中具有相同的效果,相当于插入 0 (NBT_DEFAULT
);服务器默认值将被使用。 -
创建
test.t1
表:CREATE TABLE test.t1 ( columns mycol INT UNSIGNED, columns ) ENGINE=NDB;
现在,当对该表执行更新操作时,冲突解决将被应用,并且具有最大
mycol
值的行版本将被写入副本中。
其他binlog_type
选项,如NBT_UPDATED_ONLY_USE_UPDATE
(6
),应用于控制源上的日志记录,而不是使用命令行选项。
NDB$OLD() 示例。假设有一个NDB
表,如下所定义的表正在被复制,您想为该表启用“相同时间戳获胜”的冲突解决:
CREATE TABLE test.t2 (
a INT UNSIGNED NOT NULL,
b CHAR(25) NOT NULL,
columns,
mycol INT UNSIGNED NOT NULL,
columns,
PRIMARY KEY pk (a, b)
) ENGINE=NDB;
需要按照以下步骤执行:
-
首先——并且是在创建
test.t2
之前——您必须将一行插入到mysql.ndb_replication
表中,如下所示:INSERT INTO mysql.ndb_replication VALUES ('test', 't2', 0, 0, 'NDB$OLD(mycol)');
可能的
binlog_type
列值在本节前面显示;在这里,我们使用0
指定服务器默认日志记录行为。值'NDB$OLD(mycol)'
应插入到conflict_fn
列中。 -
为
test.t2
创建适当的异常表。该表创建语句显示了所有必需的列;任何其他列必须在这些列之后声明,并在表的主键定义之前。CREATE TABLE test.t2$EX ( server_id INT UNSIGNED, source_server_id INT UNSIGNED, source_epoch BIGINT UNSIGNED, count INT UNSIGNED, a INT UNSIGNED NOT NULL, b CHAR(25) NOT NULL, [additional_columns,] PRIMARY KEY(server_id, source_server_id, source_epoch, count) ) ENGINE=NDB;
我们可以包括关于冲突类型、原因和原始事务 ID 的信息的其他列。我们不需要为原始表的所有主键列提供匹配的列。这意味着您可以创建异常表,如下所示:
CREATE TABLE test.t2$EX ( NDB$server_id INT UNSIGNED, NDB$source_server_id INT UNSIGNED, NDB$source_epoch BIGINT UNSIGNED, NDB$count INT UNSIGNED, a INT UNSIGNED NOT NULL, NDB$OP_TYPE ENUM('WRITE_ROW','UPDATE_ROW', 'DELETE_ROW', 'REFRESH_ROW', 'READ_ROW') NOT NULL, NDB$CFT_CAUSE ENUM('ROW_DOES_NOT_EXIST', 'ROW_ALREADY_EXISTS', 'DATA_IN_CONFLICT', 'TRANS_IN_CONFLICT') NOT NULL, NDB$ORIG_TRANSID BIGINT UNSIGNED NOT NULL, [additional_columns,] PRIMARY KEY(NDB$server_id, NDB$source_server_id, NDB$source_epoch, NDB$count) ) ENGINE=NDB;
Note由于我们在表定义中包括了至少一个列
NDB$OP_TYPE
、NDB$CFT_CAUSE
或NDB$ORIG_TRANSID
,因此NDB$
前缀是必需的。 -
创建
test.t2
表,如前所示。
这些步骤必须为每个表执行,以便使用NDB$OLD()
进行冲突解决。对于每个这样的表,必须在mysql.ndb_replication
中有一个对应的行,并且必须在同一个数据库中有一个异常表。
读取冲突检测和解决。 NDB Cluster 还支持跟踪读取操作,这使得在环形复制设置中可以管理读取某行的冲突和在另一个集群中的更新或删除该行。这个示例使用employee
和department
表来模拟一个场景,在该场景中,员工从一个部门移到另一个部门在源集群(以下简称集群 A)中,而在副本集群(以下简称集群 B)中更新员工的前一个部门的员工数目。
数据表已经使用以下 SQL 语句创建:
# Employee table
CREATE TABLE employee (
id INT PRIMARY KEY,
name VARCHAR(2000),
dept INT NOT NULL
) ENGINE=NDB;
# Department table
CREATE TABLE department (
id INT PRIMARY KEY,
name VARCHAR(2000),
members INT
) ENGINE=NDB;
两个表的内容包括以下SELECT
语句的部分输出:
mysql> SELECT id, name, dept FROM employee;
+---------------+------+
| id | name | dept |
+------+--------+------+
...
| 998 | Mike | 3 |
| 999 | Joe | 3 |
| 1000 | Mary | 3 |
...
+------+--------+------+
mysql> SELECT id, name, members FROM department;
+-----+-------------+---------+
| id | name | members |
+-----+-------------+---------+
...
| 3 | Old project | 24 |
...
+-----+-------------+---------+
我们假设已经使用了一个异常表,该表包括四个必需的列(这些列用于该表的主键),可选的操作类型和原因列,以及原始表的主键列,使用以下 SQL 语句创建:
CREATE TABLE employee$EX (
NDB$server_id INT UNSIGNED,
NDB$source_server_id INT UNSIGNED,
NDB$source_epoch BIGINT UNSIGNED,
NDB$count INT UNSIGNED,
NDB$OP_TYPE ENUM( 'WRITE_ROW','UPDATE_ROW', 'DELETE_ROW',
'REFRESH_ROW','READ_ROW') NOT NULL,
NDB$CFT_CAUSE ENUM( 'ROW_DOES_NOT_EXIST',
'ROW_ALREADY_EXISTS',
'DATA_IN_CONFLICT',
'TRANS_IN_CONFLICT') NOT NULL,
id INT NOT NULL,
PRIMARY KEY(NDB$server_id, NDB$source_server_id, NDB$source_epoch, NDB$count)
) ENGINE=NDB;
假设在两个集群上同时发生了两个事务。在集群 A 上,我们创建了一个新部门,然后将员工编号 999 移动到该部门,使用以下 SQL 语句:
BEGIN;
INSERT INTO department VALUES (4, "New project", 1);
UPDATE employee SET dept = 4 WHERE id = 999;
COMMIT;
与此同时,在集群 B 上,另一个事务从 employee
表中读取,如下所示:
BEGIN;
SELECT name FROM employee WHERE id = 999;
UPDATE department SET members = members - 1 WHERE id = 3;
commit;
这些冲突的事务通常不会被冲突解决机制检测,因为冲突是读取(SELECT
)和更新操作之间的冲突。你可以通过在副本集群上执行 SET
ndb_log_exclusive_reads
= 1
来规避这个问题。这将导致在源集群上读取的行被标记为需要在副本集群上解决冲突。如果我们在记录这些事务之前启用了独占读取,那么在集群 B 上的读取将被跟踪并发送到集群 A 进行解决;随后,在员工行上检测到冲突,并且集群 B 上的事务将被中止。
冲突将被记录在异常表中(在集群 A 上),作为 READ_ROW
操作(请参阅 冲突解决异常表,以获取操作类型的描述),如下所示:
mysql> SELECT id, NDB$OP_TYPE, NDB$CFT_CAUSE FROM employee$EX;
+-------+-------------+-------------------+
| id | NDB$OP_TYPE | NDB$CFT_CAUSE |
+-------+-------------+-------------------+
...
| 999 | READ_ROW | TRANS_IN_CONFLICT |
+-------+-------------+-------------------+
任何在读取操作中找到的现有行都将被标记。这意味着,来自同一个冲突的多个行可能会被记录在异常表中,如在检查集群 A 上的更新和集群 B 上的读取操作之间的冲突时所示。
BEGIN;
INSERT INTO department VALUES (4, "New project", 0);
UPDATE employee SET dept = 4 WHERE dept = 3;
SELECT COUNT(*) INTO @count FROM employee WHERE dept = 4;
UPDATE department SET members = @count WHERE id = 4;
COMMIT;
同时,在集群 B 上执行的事务包含以下语句:
SET ndb_log_exclusive_reads = 1; # Must be set if not already enabled
...
BEGIN;
SELECT COUNT(*) INTO @count FROM employee WHERE dept = 3 FOR UPDATE;
UPDATE department SET members = @count WHERE id = 3;
COMMIT;
在这种情况下,第二个事务的 SELECT
语句中的所有三个匹配 WHERE
条件的行都将被读取,并因此在异常表中被标记,如下所示:
mysql> SELECT id, NDB$OP_TYPE, NDB$CFT_CAUSE FROM employee$EX;
+-------+-------------+-------------------+
| id | NDB$OP_TYPE | NDB$CFT_CAUSE |
+-------+-------------+-------------------+
...
| 998 | READ_ROW | TRANS_IN_CONFLICT |
| 999 | READ_ROW | TRANS_IN_CONFLICT |
| 1000 | READ_ROW | TRANS_IN_CONFLICT |
...
+-------+-------------+-------------------+
读取跟踪仅基于现有行。基于给定条件的读取仅跟踪冲突的现有行,而不是在交叉事务中插入的行。这类似于 NDB 集群单实例中的独占行锁定。
插入冲突检测和解决示例。 以下示例演示了插入冲突检测函数的使用。我们假设正在复制两个表 t1
和 t2
在数据库 test
中,并且我们想使用插入冲突检测函数 NDB$MAX_INS()
对 t1
和 NDB$MAX_DEL_WIN_INS()
对 t2
进行配置。两个数据表将在后面的设置过程中创建。
设置插入冲突解决类似于设置其他冲突检测和解决算法,如前面的示例所示。如果 mysql.ndb_replication
表用于配置二进制日志记录和冲突解决,尚不存在,则需要首先创建它,如下所示:
CREATE TABLE mysql.ndb_replication (
db VARBINARY(63),
table_name VARBINARY(63),
server_id INT UNSIGNED,
binlog_type INT UNSIGNED,
conflict_fn VARBINARY(128),
PRIMARY KEY USING HASH (db, table_name, server_id)
) ENGINE=NDB
PARTITION BY KEY(db,table_name);
该 ndb_replication
表按表 basis 进行操作;也就是说,我们需要为每个要设置的表插入一行,包含表信息、binlog_type
值、冲突解决函数的名称和时间戳列(X
)的名称,如下所示:
INSERT INTO mysql.ndb_replication VALUES ("test", "t1", 0, 7, "NDB$MAX_INS(X)");
INSERT INTO mysql.ndb_replication VALUES ("test", "t2", 0, 7, "NDB$MAX_DEL_WIN_INS(X)");
这里,我们将 binlog_type
设置为 NBT_FULL_USE_UPDATE
(7
),这意味着总是记录完整的行。请参阅 ndb_replication 表,以获取其他可能的值。
你也可以创建一个异常表,对应于每个使用冲突解决的 NDB
表。异常表记录了冲突解决函数拒绝的所有行。可以使用以下两个 SQL 语句创建异常表:
CREATE TABLE `t1$EX` (
NDB$server_id INT UNSIGNED,
NDB$source_server_id INT UNSIGNED,
NDB$source_epoch BIGINT UNSIGNED,
NDB$count INT UNSIGNED,
NDB$OP_TYPE ENUM('WRITE_ROW', 'UPDATE_ROW', 'DELETE_ROW',
'REFRESH_ROW', 'READ_ROW') NOT NULL,
NDB$CFT_CAUSE ENUM('ROW_DOES_NOT_EXIST', 'ROW_ALREADY_EXISTS',
'DATA_IN_CONFLICT', 'TRANS_IN_CONFLICT') NOT NULL,
a INT NOT NULL,
PRIMARY KEY(NDB$server_id, NDB$source_server_id,
NDB$source_epoch, NDB$count)
) ENGINE=NDB;
CREATE TABLE `t2$EX` (
NDB$server_id INT UNSIGNED,
NDB$source_server_id INT UNSIGNED,
NDB$source_epoch BIGINT UNSIGNED,
NDB$count INT UNSIGNED,
NDB$OP_TYPE ENUM('WRITE_ROW', 'UPDATE_ROW', 'DELETE_ROW',
'REFRESH_ROW', 'READ_ROW') NOT NULL,
NDB$CFT_CAUSE ENUM( 'ROW_DOES_NOT_EXIST', 'ROW_ALREADY_EXISTS',
'DATA_IN_CONFLICT', 'TRANS_IN_CONFLICT') NOT NULL,
a INT NOT NULL,
PRIMARY KEY(NDB$server_id, NDB$source_server_id,
NDB$source_epoch, NDB$count)
) ENGINE=NDB;
最后,在创建了异常表后,你可以使用以下两个 SQL 语句创建要复制和受冲突解决控制的数据表:
CREATE TABLE t1 (
a INT PRIMARY KEY,
b VARCHAR(32),
X INT UNSIGNED
) ENGINE=NDB;
CREATE TABLE t2 (
a INT PRIMARY KEY,
b VARCHAR(32),
X INT UNSIGNED
) ENGINE=NDB;
对于每个表,X
列被用作时间戳列。
一旦在源上创建,t1
和 t2
将被复制并可以假设存在于源和副本上。在本示例的其余部分,我们使用 mysql>
表示连接到源的 mysql 客户端,并使用 mysqlR>
表示在副本上运行的 mysql 客户端。
首先,我们在源上插入每个表的一行,如下所示:
mysqlS> INSERT INTO t1 VALUES (1, 'Initial X=1', 1);
Query OK, 1 row affected (0.01 sec)
mysqlS> INSERT INTO t2 VALUES (1, 'Initial X=1', 1);
Query OK, 1 row affected (0.01 sec)
我们可以确定,这两行将被复制,而不会引发任何冲突,因为副本上的表在发出 INSERT
语句之前不包含任何行。我们可以通过在副本上选择表来验证,如下所示:
mysqlR> TABLE t1 ORDER BY a;
+---+-------------+------+
| a | b | X |
+---+-------------+------+
| 1 | Initial X=1 | 1 |
+---+-------------+------+
1 row in set (0.00 sec)
mysqlR> TABLE t2 ORDER BY a;
+---+-------------+------+
| a | b | X |
+---+-------------+------+
| 1 | Initial X=1 | 1 |
+---+-------------+------+
1 row in set (0.00 sec)
接下来,我们在副本上插入新行,如下所示:
mysqlR> INSERT INTO t1 VALUES (2, 'Replica X=2', 2);
Query OK, 1 row affected (0.01 sec)
mysqlR> INSERT INTO t2 VALUES (2, 'Replica X=2', 2);
Query OK, 1 row affected (0.01 sec)
现在,我们在源上插入具有更大时间戳 (X
) 列值的冲突行,使用以下语句:
mysqlS> INSERT INTO t1 VALUES (2, 'Replica X=20', 20);
Query OK, 1 row affected (0.01 sec)
mysqlS> INSERT INTO t2 VALUES (2, 'Replica X=20', 20);
Query OK, 1 row affected (0.01 sec)
现在,我们通过在副本上选择(再次)从两个表来观察结果,如下所示:
mysqlR> TABLE t1 ORDER BY a;
+---+-------------+-------+
| a | b | X |
+---+-------------+-------+
| 1 | Initial X=1 | 1 |
+---+-------------+-------+
| 2 | Source X=20 | 20 |
+---+-------------+-------+
2 rows in set (0.00 sec)
mysqlR> TABLE t2 ORDER BY a;
+---+-------------+-------+
| a | b | X |
+---+-------------+-------+
| 1 | Initial X=1 | 1 |
+---+-------------+-------+
| 1 | Source X=20 | 20 |
+---+-------------+-------+
2 rows in set (0.00 sec)
源上插入的行,具有更大的时间戳值,已经替换了副本上的行。在副本上,我们接下来插入两个新的行,这些行不与 t1
或 t2
中的任何现有行冲突,如下所示:
mysqlR> INSERT INTO t1 VALUES (3, 'Replica X=30', 30);
Query OK, 1 row affected (0.01 sec)
mysqlR> INSERT INTO t2 VALUES (3, 'Replica X=30', 30);
Query OK, 1 row affected (0.01 sec)
在源上插入更多行,具有相同的主键值 (3
),引发冲突,如前所示,但这次我们使用的时间戳列值小于副本上冲突行的时间戳列值。
mysqlS> INSERT INTO t1 VALUES (3, 'Source X=3', 3);
Query OK, 1 row affected (0.01 sec)
mysqlS> INSERT INTO t2 VALUES (3, 'Source X=3', 3);
Query OK, 1 row affected (0.01 sec)
我们可以通过查询表看到,来自源的所有插入都被副本拒绝,而副本上之前插入的行没有被覆盖,如下所示在副本上的 mysql 客户端:
mysqlR> TABLE t1 ORDER BY a;
+---+--------------+-------+
| a | b | X |
+---+--------------+-------+
| 1 | Initial X=1 | 1 |
+---+--------------+-------+
| 2 | Source X=20 | 20 |
+---+--------------+-------+
| 3 | Replica X=30 | 30 |
+---+--------------+-------+
3 rows in set (0.00 sec)
mysqlR> TABLE t2 ORDER BY a;
+---+--------------+-------+
| a | b | X |
+---+--------------+-------+
| 1 | Initial X=1 | 1 |
+---+--------------+-------+
| 2 | Source X=20 | 20 |
+---+--------------+-------+
| 3 | Replica X=30 | 30 |
+---+--------------+-------+
3 rows in set (0.00 sec)
您可以在异常表中看到被拒绝的行的信息,如下所示:
mysqlR> SELECT NDB$server_id, NDB$source_server_id, NDB$count,
> NDB$OP_TYPE, NDB$CFT_CAUSE, a
> FROM t1$EX
> ORDER BY NDB$count\G
*************************** 1. row ***************************
NDB$server_id : 2
NDB$source_server_id: 1
NDB$count : 1
NDB$OP_TYPE : WRITE_ROW
NDB$CFT_CAUSE : DATA_IN_CONFLICT
a : 3
1 row in set (0.00 sec)
mysqlR> SELECT NDB$server_id, NDB$source_server_id, NDB$count,
> NDB$OP_TYPE, NDB$CFT_CAUSE, a
> FROM t2$EX
> ORDER BY NDB$count\G
*************************** 1. row ***************************
NDB$server_id : 2
NDB$source_server_id: 1
NDB$count : 1
NDB$OP_TYPE : WRITE_ROW
NDB$CFT_CAUSE : DATA_IN_CONFLICT
a : 3
1 row in set (0.00 sec)
正如我们之前所见,来自源的其他行都没有被副本拒绝,只有那些具有较小时间戳值的行与副本上的冲突行冲突。