逻辑复制涉及如下几点
发布 Publication
订阅 Subscription
复制槽的管理 Replication Slot Management
冲突 Conflicts
逻辑复制的限制 Restrictions
逻辑复制的架构 Architecture
初始化数据快照 Initial Snapshot
监控 Monitoring
安全性 Security
配置参数设置 Configuration Settings
案例 case
逻辑复制 Logical Replication
流复制是实例级别的复制,是基于 WAL 日志的物理复制,其原理是主库不断的发送 WAL 日志流到备库,备库在接收 WAL 日志之后进行重放。逻辑复制是表级别的选择性复制,逻辑解析是逻辑复制的核心,其原理是主库将 WAL 日志流解析成为一定的格式,订阅者收到解析的 WAL 数据流之后进行应用,从而实现数据同步。
注意:逻辑复制并不是使用原始的 WAL 日志文件,而是将 WAL 日志解析为一定的数据格式。
逻辑复制是一种基于数据对象的复制标识通常是主键来实现复制数据对象及其更改的方法。我们使用术语 "逻辑复制" 以此来区分它与物理复制的不同。
逻辑复制使用一种发布和订阅模型,有一个或者多个订阅者订阅一个发布者节点上的一个或者多个发布,订阅者从它们所订阅的发布节点上拉取数据并且可能后续重新发布这些数据,以此来实现数据的级联复制。
一个表的逻辑复制通常是先从发布者服务器上获得该表的一个快照然后拷贝给订阅者,开启复制槽,在这之后发布者上的所有更改会被实时发送给订阅者,订阅者以与发布者相同的顺序应用那些数据,这样在一个订阅中能够保证发布的事务的一致性,该方法有时候也被称为事务性复制。
逻辑复制的典型用法:
将多个数据库合并到到单个数据库中以便进行数据分析。
在多个数据库之间共享一个数据库的一个子集。
发布 Publication
发布可以被定义在任何物理复制的主服务器上,定义有发布的节点被称为发布者。发布是一个表或者一组表生成的集合,每个发布都只存在于一个数据库中。
发布与模式不同,发布不会影响表的访问方式。如果有需要,每个表都可以被加入到多个发布。当前发布只能包含表,对象必须被明确地加入到发布,除非发布是用 ALL TABLES 创建的。
Publication 可以选择把它们产生的更改限制为 INSERT、UPDATE、DELETE 以及 TRUNCATE 的任意组合,类似于触发器如何被特定事件类型触发一样。默认情况下,所有操作类型都会被复制。
为了能够复制 UPDATE 和 DELETE 操作,被发布的表必须要配置一个 "复制标识",这样在订阅者那一端才能标识对于更新或删除合适的行。默认情况下,复制标识就是主键,也可以将一个唯一索引设置为复制标识,如果表没有合适的键,那么可以将复制标识设置成 "full",它表示整个行都成为一个键,不过这样做效率很低,只有在没有其他方案的情况下才使用。如果在发布者端设置了 "full" 之外的复制标识,在订阅者端也必须设置一个复制标识。如果在复制 UPDATE 或 DELETE 操作的发布中加入了没有复制标识的表,那么订阅者上后续的 UPDATE 或 DELETE 操作将导致错误,不管有没有复制标识,INSERT 操作都能继续下去。
每一个发布者都可以有多个订阅者。
发布者 Publication 通过使用 CREATE PUBLICATION 命令创建,之后可以使用 ALTER PUBLICATION 命令动态的增加或者移除所发布的表。ALTER PUBLICATION 的 ADD TABLE 以及 DROP TABLE 操作都是事务性的,因此一旦该事务提交,该表将以正确的快照开始或者停止复制。
CREATE PUBLICATION 定义一个新的发布
CREATE PUBLICATION name FOR TABLE table_name [, ...] |FOR ALL TABLES [ WITH ( publication_parameter = value ) ]
CREATE PUBLICATION 向当前数据库添加一个新的发布。发布的名称必须与当前数据库中任何现有发布的名称不同。
name 表示新发布的名称
FOR TABLE 指定要添加到发布的表的列表
FOR ALL TABLES 表示将发布数据库中所有表的更改,包括在将来创建的表
WITH ( publication_parameter = value ) 该子句指定发布的可选参数。支持下列参数:publish (string) 这个参数决定了哪些 DML 操作将由新的发布发布给订阅者。该值是用逗号分隔的操作列表。允许的操作是 insert/update/delete 和 truncate。默认是发布所有的,所以这个选项的默认值是 "insert,update,delete,truncate"。
注意:如果既没有指定 FOR TABLE,也没有指定 FOR ALL TABLES,那么这个发布就是以一组空表开始的,之后可以使用 ALTER PUBLICATION 向这个发布中添加表。创建发布不会开始复制,它只为未来的订阅者定义一个分组和过滤逻辑。
ALTER PUBLICATION
ALTER PUBLICATION 修改发布的定义
ALTER PUBLICATION name ADD TABLE table_name [, ...]
ALTER PUBLICATION name DROP TABLE table_name *
ALTER PUBLICATION name SET ( publication_parameter = value )
ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION name RENAME TO new_name
ALTER PUBLICATION 可以更改发布的属性
ADD TABLE 和 DROP TABLE 子句将从发布中添加和删除一个或多个表。请注意,将表添加到已订阅的发布中将需要在订阅端执行 ALTER SUBSCRIPTION ... REFRESH PUBLICATION 操作才能生效。
订阅 Subscription
订阅是逻辑复制的下游端。定义订阅的节点称为订阅服务器。一个订阅会定义到另一个数据库的连接以及它想要订阅的发布集合,可以是一个或者多个。
如果有需要,一个订阅者节点可以有多个订阅。可以在一对发布者订阅者之间定义多个订阅,在这种情况下要确保被订阅的发布对象不会重叠。
每一个订阅都将通过一个复制槽接收更改。
复制槽提供了一种自动化的方法来确保主控机在所有的后备机收到 WAL 段之前不会移除它们,并且主控机也不会移除可能导致恢复冲突的行,即使后备机断开也是如此。作为复制槽的替代,也可以使用 wal_keep_segments 阻止移除旧的 WAL 段或者使用 archive_command 把段保存在一个归档中。不过,这些方法常常会导致保留的 WAL 段比需要的更多,而复制槽只保留已知所需要的段。hot_standby_feedback 和 vacuum_defer_cleanup_age 保护了相关行不被 vacuum 移除,但是前者在后备机断开期间无法提供保护,而后者则需要被设置为一个很高的值已提供足够的保护。而复制槽正好很好的克服了这些缺点。
如果当前用户是一个超级用户,则订阅会被 pg_dump 转储,否则订阅会被跳过并且提示一个警告,因为非超级用户不能从 pg_subscription 目录中读取所有的订阅信息。
可以使用 CREATE SUBSCRIPTION 创建订阅,并且使用 ALTER SUBSCRIPTION 在任何时刻停止/继续订阅,还可以使用 DROP SUBSCRIPTION 删除订阅。
CREATE SUBSCRIPTION 定义一个新的订阅。
CREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION publication_name , ... [, ... ] ) ]
CREATE SUBSCRIPTION 为当前数据库添加一个新的订阅。订阅名称必须与数据库中任何现有的订阅不同。订阅表示到发布者的复制连接。因此,此命令不仅在本地目录中添加定义,还会在发布者上创建复制插槽。
在运行此命令的事务提交时,将启动逻辑复制工作器以复制新订阅的数据。
参数
subscription_name 新订阅的名称
CONNECTION 'conninfo' 连接发布者的字符串
PUBLICATION publication_name 要订阅的发布者上的发布名称
WITH ( subscription_parameter = value ) 该子句指定订阅的可选参数。支持的参数有:
copy_data (boolean) 指定在复制启动后是否应复制正在订阅的发布中的现有数据。默认值是 true
create_slot (boolean) 指定该命令是否要在发布者上创建复制槽。默认值是 true
enabled (boolean) 指定订阅是否应该主动复制或者是否应该只是设置但尚未启动。默认值是 true
slot_name (string) 要使用的复制插槽的名称。默认行为是使用订阅名称作为插槽的名称。当 slot_name 设置为 NONE 时,将不会有复制槽与订阅关联。这在需要稍后手动设置复制槽的情况下会使用。这样的订阅必须同时 enabled 并且 create_slot 设置为 false。
synchronous_commit (enum) 该参数的值会覆盖 synchronous_commit 设置。默认值是 off。对于逻辑复制使用 off 是安全的:如果订阅者由于缺少同步而丢失事务,数据将从发布者重新发送。进行同步逻辑复制时,不同的设置可能是合适的。逻辑复制工作者向发布者报告写入和刷新的位置,当使用同步复制时,发布者将等待实际刷新。这意味着,当订阅用于同步复制时,将订阅者的 synchronous_commit 设置为 off 可能会增加发布服务器上 COMMIT 的延迟。在这种情况下,将 synchronous_commit 设置为 local 或更高是有利的。
connect (boolean) 指定 CREATE SUBSCRIPTION 是否应该连接到发布者。将其设置为 false 将会改变默认值 enabled、create_slot 和 copy_data 为 false。不允许将 connect 设置为 false 的同时将 enabled、create_slot 或 copy_data 设置为 true。因为该选项设置为 false 时不会建立连接,因此表没有被订阅,所以当启用订阅后,不会复制任何内容。需要运行 ALTER SUBSCRIPTION ... REFRESH PUBLICATION 才能订阅表。
当一个订阅被删除重建时,同步信息会丢失。这意味着数据必须被重新同步。
模式定义不会被复制,并且被发布的表必须在订阅者上存在。只有常规表可以成为复制的目标。非常规表例如视图不能被复制。
表在发布者和订阅者之间使用完全限定的表名进行匹配。不支持复制到订阅服务器上不同名称的表。
表的列也通过名称匹配。订阅表中的列顺序不需要与发布表中的顺序一样,列的数据类型也不需要一样,只要可以将数据的文本表示形式转换为目标类型即可。目标表还可以具有发布表中不存在的额外列,额外列将使用目标表的定义中指定的默认值填充。
复制槽的管理 Replication Slot Management
前面提到过,每一个活跃的订阅都会从远程发布端上的一个复制槽接收更改。通常远程复制槽是在使用 CREATE SUBSCRIPTION 创建订阅是自动创建的,并且在使用 DROP SUBSCRIPTION 删除订阅时自动被删除。不过,在一些情况下,有必要单独操纵订阅以及其底层的复制槽。例如下面的场景:
在创建一个订阅时,复制槽已经存在,在这种情况下,可以使用 create_slot = false 选项创建订阅并关联到现有的复制槽。
在删除一个订阅时,复制槽应该被保留,当订阅者数据库正在被移动到一台不同的主机并且将从那里再被激活时,这种行为很有用。在这种情况下,可以在尝试删除该订阅之前,使用 ALTER SUBSCRIPTION 将复制槽解除关联。
在删除一个订阅是,远程主机不可达,在这种情况下,可以在尝试删除该订阅之前,使用 ALTER SUBSCRIPTION 将复制槽解除关联。如果远程数据库实例不再存在,那么不需要进一步的行动。不过,如果远程数据库实例只是不可达,那么复制槽应该被手动删除。否则它将会继续保留 WAL 并且最终可能会导致磁盘被填满。这种情况应该要仔细地研究。
冲突 Conflicts
逻辑复制的行为类似于正常的 DML 操作,即便数据在订阅者节点本地被修改,逻辑复制也会根据收到的更改来更新数据。如果流入的数据违背了任何约束,复制将停止。这种情况被称为一个冲突。在复制 UPDATE 或 DELETE 操作时,缺失的数据将不会产生冲突并且这类操作将被简单地跳过。
冲突将会产生错误并且停止复制,它必须由用户手工解决。在订阅者的服务器日志中可以找到有关冲突的详细情况。
通过更改订阅者上的数据或者跳过与已有数据冲突的事务可以解决这种冲突。通过调用 pg_replication_origin_advance() 函数可以跳过该事务,函数的参数是对应于该订阅名称的 node_name 以及一个位置。复制源头的当前位置可以在 pg_replication_origin_status 系统视图中看到。
pg_replication_origin_advance(node_name text, pos pg_lsn) 把给定节点的复制进度设置为给定的位置。这主要用于配置更改或者类似操作之后设置初始位置或者新位置。注意这个函数的不当使用可能会导致不一致的复制数据。
限制 Restrictions
逻辑复制当前有下列限制:
数据库模式 和 DDL 命令不会被复制。初始模式可以手工使用 pg_dump schemaonly 进行拷贝,后续的模式改变需要手工保持同步。当位于发布者节点的数据模式发生改变并且被复制的数据开始到达订阅者但发现不适合表的模式时,复制将报错,直至模式被更新。
序列数据 Sequence 不被复制。但是序列值被作为表的一部分会被复制,但是序列本身在订阅者上仍将显示开始值。如果订阅者被用作一个只读数据库,那么这通常不会是什么问题。否则序列需要被更新到最后的值。
支持 TRUNCATE 命令的复制。在复制截断动作时,订阅者将截断与发布者上被截断的相同的表群体。如果所有受影响的表都属于同一个订阅,则没有什么问题。但是如果订阅者上要被截断的某些表有外键链接到不属于同一订阅的表,那么在订阅者上该截断动作的应用将会失败。
大对象不会被复制。
复制只能从基表到基表。也就是说,发布端和订阅端上的表都必须是普通表,而不是视图、物化视图或者外部表。尝试复制不是基表的表将会导致错误。
架构 Architecture
逻辑复制从拷贝发布者数据库上的数据库快照开始,拷贝一旦完成,发布者上的更改会在它们发生时实时传送给订阅者,订阅者按照数据在发布者上被提交的顺序应用数据,这样任意单一订阅中所发布的事务的一致性才能得到保证。
逻辑复制被构建在一种类似于物理流复制的架构上。它由 "walsender" 和 "apply" 进程实现。walsender 进程实现对 WAL 的逻辑解码并将其载入标准逻辑解码插件 pgoutput。该插件把从 WAL 中读取的更改转换成逻辑复制协议并根据发布说明过滤数据,然后数据会被连续地使用流复制协议传输到应用工作者,应用工作者会把数据映射到本地表并以正确的事务顺序应用它们接收到的更改。
订阅者数据库上的应用进程总是将 session_replication_role 设置为 replica 运行。
逻辑复制应用进程当前仅会引发行触发器,而不会引发语句触发器。不过,初始的表同步是以类似一个 COPY 命令的方式实现的,因此会引发 INSERT 的行触发器和语句触发器。
监控 Monitoring
因为逻辑复制是基于与物理流复制相似的架构,一个发布节点上的监控也类似于对物理复制主节点的监控。
有关订阅的监控信息在 pg_stat_subscription 中可以看到,每一个订阅工作者在这个视图都有一行,一个订阅能有零个或者多个活跃订阅工作者取决于它的状态。
安全性 Security
用于复制连接的角色必须有 REPLICATION 属性或者是一个超级用户,该角色的访问必须被配置在 pg_hba.conf 中,并且它必须有 LOGIN 属性。
为了能够拷贝初始表数据,用于复制连接的角色必须在被发布的表上具有 SELECT 特权或者是一个超级用户。
要创建发布,用户必须在数据库中有 CREATE 特权。
要把表加入到一个发布,用户必须在该表上有拥有权。要创建一个自动发布所有表的发布,用户必须是一个超级用户。
要创建订阅,用户必须是一个超级用户。
订阅的应用过程将在本地数据库上以超级用户的特权运行。
特权检查仅在复制连接开始时被执行一次。在从发布者读到每一个更改记录时不会重新检查特权,在每一个更改被应用时也不会重新检查特权。
配置设置 Configuration Settings
逻辑复制要求设置一些配置选项。
在发布者端,wal_level 必须被设置为 logical,max_replication_slots 设置的值必须至少是预期要连接的订阅数加上保留给表同步的连接数。max_wal_senders 应该至少被设置为 max_replication_slots 加上同时连接的物理复制体的数量。
在订阅者端,max_replication_slots 也要求被设置。在这种情况下,它必须至少被设置为将被加入到该订阅者的订阅数。max_logical_replication_workers 必须至少被设置为订阅数加上保留给表同步的连接数。此外,可能还需要调整 max_worker_processes 以容纳复制工作者,至少为 max_logical_replication_workers + 1。注意,一些扩展和并行查询也会从 max_worker_processes 中取得 worker slots。
案例 case
将 cloud_plat 数据库的 public.pat_user 表使用逻辑复制的方式实时同步到 local_plat 的 public 下
在发布者所在数据库 cloud_plat 下创建测试表 public.pat_user
create database cloud_plat;
\c cloud_plat;
set search_path to cloud_plat;
CREATE TABLE "public"."pat_user" (......);
创建发布者并查看所有的发布者|删除发布者
CREATE PUBLICATION test_pub FOR TABLE public.pat_user;
SELECT * FROM pg_publication;
DROP PUBLICATION test_pub;
创建|查看|删除一个复制插槽,slot_name 槽名以及 plugin_name 插件名
SELECT * FROM pg_create_logical_replication_slot('slot_name', 'plugin_name');
SELECT * FROM pg_create_logical_replication_slot('cloud_plat_local_plat_slot', 'pgoutput');
SELECT * FROM pg_replication_slots;
SELECT pg_drop_replication_slot('cloud_plat_local_plat_slot');
SELECT * FROM pg_logical_slot_get_changes('cloud_plat_local_plat_slot',null,null);
SELECT * FROM pg_logical_slot_peek_changes('cloud_plat_local_plat_slot',null,null);
同步测试语句
INSERT INTO public.pat_user(column) VALUES(......);
DELETE FROM public.pat_user WHERE id = ?;
在订阅者所在的数据库 local_plat 下创建同步表 public.pat_user
create database local_plat;
\c local_plat;
set search_path to local_plat;
CREATE TABLE "public"."pat_user" (......);
SELECT * FROM public.pat_user;
创建一个订阅者,使用已创建的插槽
CREATE SUBSCRIPTION test_sub CONNECTION 'host=192.168.10.195 port=5432 dbname=cloud_plat user=postgres password=postgres' PUBLICATION test_pub WITH(create_slot = false,slot_name='cloud_plat_local_plat_slot');
查看|删除|刷新订阅者信息
SELECT * FROM pg_subscription;
DROP SUBSCRIPTION test_sub;
ALTER SUBSCRIPTION test_sub REFRESH PUBLICATION;
当发布者的模式发生变化之后,逻辑复制的复制槽就会被中断,此时只需要根据日志处理好对应的错误,逻辑复制就会自动恢复。
发布者节点进行 DDL 操作
alter table public.pat_user add add_info varchar(255)[];
在订阅节点上查看当前的复制状态:
SELECT * FROM pg_replication_slots;
SELECT * FROM pg_stat_subscription;
|列 |介绍
|: |:|
|subid |订阅的 OID|
|subname |订阅的名称|
|pid |订阅工作者进程的进程 ID|
|relid |工作者正在同步的关系的 OID,对于主应用工作者为空|
|received_lsn |接收到的最后一个预写式日志位置,这个字段的初始值是 0|
|last_msg_send_time|从源头 WAL 发送器接收到的最后一个消息的发送时间|
|last_msg_receipt_time|从源头 WAL 发送器接收到的最后一个消息的接收时间|
|latest_end_lsn |最后一个报告给源头 WAL 发送器的预写式日志位置|
|latest_end_time |报告给源头 WAL 发送器的最后一个预写式日志位置的时间|
总结
首先在发布者节点的 postgresql.conf 中设置配置选项:
wal_level = logical
需要调整 pg_hba.conf 以允许复制:
host all postgres 0.0.0.0/0 trust
然后在发布者数据库上创建发布者:
CREATE PUBLICATION pub_name FOR TABLE table_name;
在订阅者数据库上:
CREATE SUBSCRIPTION sub_name CONNECTION 'dbname=db_name host=192.168.30.140 user=postgres' PUBLICATION pub_name;
在发布节点上配置
vim postgresql.conf
wal_level = logical
max_replication_slots = 5 /允许的最大复制槽数,需要大于订阅节点的数量/
max_wal_senders = 8 /max_replication_slots 加上同时连接的物理复制体的数量/
在订阅节点上配置
max_logical_replication_workers = 5 /订阅数加上保留给表同步的连接数/
max_worker_processes = 6 /max_logical_replication_workers + 1/
在发布节点的数据库上创建:
CREATE PUBLICATION pub_name FOR TABLE schema.table_name;
查询创建的发布信息:
SELECT * FROM pg_publication;
在订阅节点所在的数据库上创建:
CREATE SUBSCRIPTION sub_name CONNECTION 'host=192.168.30.140 port=5432 dbname=db_name user=postgres password=postgres' PUBLICATION pub_name;
在发布者所在的节点上查看创建的复制槽:
SELECT * FROM pg_replication_slots;