原文地址: https://debezium.io/blog/2018/12/05/automating-cache-invalidation-with-change-data-capture/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
通过更改数据捕获自动使缓存失效
2018 年 12 月 5 日 作者: Gunnar Morling
讨论 实例
Hibernate ORM / JPA 的二级缓存是一种经过验证且有效的提高应用程序性能的方法:缓存只读或很少修改的实体可以避免与数据库的往返,从而提高应用程序的响应时间。
与一级缓存不同,二级缓存与会话工厂(或 JPA 术语中的实体管理器工厂)关联,因此其内容在事务和并发会话之间共享。当然,如果缓存的实体被修改,相应的缓存条目也必须更新(或从缓存中清除)。只要数据更改是通过 Hibernate ORM 完成的,就不用担心:ORM 会自动更新缓存。
然而,当绕过应用程序时,例如直接修改数据库中的记录时,事情会变得棘手。然后,Hibernate ORM 无法知道缓存的数据已过时,因此有必要显式地使受影响的项目无效。这样做的常见方法是预见一些允许清除应用程序缓存的管理功能。为此,重要的是不要忘记调用失效功能,否则应用程序将继续使用过时的缓存数据。
接下来,我们将探索一种缓存失效的替代方法,该方法以可靠且完全自动化的方式工作:通过使用 Debezium 及其变更数据捕获(CDC) 功能,您可以跟踪数据库本身中的数据更改并做出反应任何已应用的更改。这允许近乎实时地使受影响的缓存条目失效,而不存在由于错过更改而导致数据过时的风险。如果某个条目已从缓存中逐出,Hibernate ORM 会在下次请求时从数据库加载该实体的最新版本。
示例应用程序
作为示例,考虑两个实体的简单模型,PurchaseOrder并且Item:
图片来自于官网
域模型示例
采购订单代表商品的订单,其总价格是订购数量乘以商品的基本价格。
源代码
本示例的源代码在GitHub 上提供。如果您想遵循并尝试下面描述的所有步骤,请克隆存储库并按照README.md中的说明构建项目。
将订单和项目建模为 JPA 实体非常简单:
@Entity
public class PurchaseOrder {
@Id
@GeneratedValue(generator = "sequence")
@SequenceGenerator(
name = "sequence", sequenceName = "seq_po", initialValue = 1001, allocationSize = 50
)
private long id;
private String customer;
@ManyToOne private Item item;
private int quantity;
private BigDecimal totalPrice;
// ...
}
由于项目的更改很少,因此Item应该缓存实体。这可以通过简单地指定 JPA 的@Cacheable注释来完成:
@Entity
@Cacheable
public class Item {
@Id
private long id;
private String description;
private BigDecimal price;
// ...
}
您还需要在META-INF/persistence.xml文件中启用二级缓存。该属性hibernate.cache.use_second_level_cache激活缓存本身,ENABLE_SELECTIVE缓存模式只会导致那些用 注释的实体被放入缓存中@Cacheable。启用 SQL 查询日志记录和缓存访问统计信息也是一个好主意。这样您就可以通过检查应用程序日志来验证事情是否按预期工作:
<persistence-unit name="orders-PU-JTA" transaction-type="JTA">
<jta-data-source>java:jboss/datasources/OrderDS</jta-data-source>
<shared-cache-mode>ENABLE_SELECTIVE</shared-cache-mode>
<properties>
<property name="hibernate.cache.use_second_level_cache" value="true" />
<property name="hibernate.show_sql" value="true" />
<property name="hibernate.format_sql" value="true" />
<property name="hibernate.generate_statistics" value="true" />
<!-- dialect etc. ... -->
</properties>
</persistence-unit>
当在Java EE应用程序服务器上运行时(或者Jakarta EE堆栈在捐赠给 Eclipse 基金会后的调用方式),这就是启用二级缓存所需的全部内容。对于WildFly(示例项目中使用的),默认情况下使用Infinispan键/值存储作为缓存提供程序。
现在尝试看看通过在数据库中运行一些 SQL 绕过应用程序层来修改商品价格时会发生什么。如果您已查看示例源代码,请注释掉该类DatabaseChangeEventListener并按照README.md中的说明启动应用程序。然后,您可以像这样使用curl下订单(几个示例项目已在应用程序启动时保留):
curl -H “Content-Type: application/json”
-X POST
–data ‘{ “customer” : “Billy-Bob”, “itemId” : 10003, “quantity” : 2 }’
http://localhost:8080/cache-invalidation/rest/orders
{
“id” : 1002,
“customer” : “Billy-Bob”,
“item” : {
“id” :10003,
“description” : “North By Northwest”,
“price” : 14.99
},
“quantity” : 2,
“totalPrice” : 29.98
}
响应是预期的,因为商品价格为 14.99。现在直接在数据库中更新商品的价格。该示例使用 Postgres,因此您可以使用psql CLI 实用程序来执行此操作:
docker-compose exec postgres bash -c ‘psql -U $POSTGRES_USER $POSTGRES_DB -c “UPDATE item SET price = 20.99 where id = 10003”’
使用curl 为同一商品下另一个采购订单,您会发现计算出的总价并未反映更新。不好!但这并不太令人惊讶,因为价格更新的应用完全绕过了应用程序层和 Hibernate ORM。
更改事件处理程序
现在让我们探讨如何使用 Debezium 和 CDC 对表中的更改做出反应item并使相应的缓存条目无效。
虽然 Debezium 大多数时候都部署到Kafka Connect中(从而将更改事件流式传输到 Apache Kafka 主题中),但它还有另一种操作模式,对于当前的用例来说非常方便。使用嵌入式引擎,您可以直接在应用程序中将 Debezium 连接器作为库运行。对于从数据库接收到的每个更改事件,将调用配置的回调方法,在当前情况下,该方法将从二级缓存中逐出受影响的项目。
下图展示了这种方法的设计:
图片来自于官网
架构概述
虽然这不具备 Apache Kafka 提供的可扩展性和容错能力,但它很好地满足了给定的要求。由于二级缓存与应用程序生命周期绑定,因此不需要 Kafka Connect 框架提供的偏移管理和重启功能。对于给定的用例,在应用程序运行时接收数据更改事件就足够了,并且使用嵌入式引擎可以实现这一点。
集群应用程序
请注意,在运行每个节点都有本地缓存的集群应用程序时,使用 Apache Kafka 以及将 Debezium 定期部署到 Kafka Connect 中仍然可能有意义。Kafka 和 Connect 允许您部署单个连接器实例,并让应用程序节点监听包含更改事件的主题,而不是在每个节点上注册连接器。这将导致数据库中的资源利用率降低。
将 Debezium 嵌入式引擎 ( io.debezium:debezium-embedded:0.9.0.Beta1 ) 和 Debezium Postgres 连接器 ( io.debezium:debezium-connector-postgres:0.9.0.Beta1 ) 的依赖项添加到您的项目中,用于监听数据库中任何更改的类DatabaseChangeEventListener可以这样实现:
@ApplicationScoped
public class DatabaseChangeEventListener {
@Resource
private ManagedExecutorService executorService;
@PersistenceUnit private EntityManagerFactory emf;
@PersistenceContext
private EntityManager em;
private EmbeddedEngine engine;
public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) {
Configuration config = Configuration.empty()
.withSystemProperties(Function.identity()).edit()
.with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
.with(EmbeddedEngine.ENGINE_NAME, "cache-invalidation-engine")
.with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class)
.with("name", "cache-invalidation-connector")
.with("database.hostname", "postgres")
.with("database.port", 5432)
.with("database.user", "postgresuser")
.with("database.password", "postgrespw")
.with("database.server.name", "dbserver1")
.with("database.dbname", "inventory")
.with("database.whitelist", "public")
.with("snapshot.mode", "never")
.build();
this.engine = EmbeddedEngine.create()
.using(config)
.notifying(this::handleDbChangeEvent)
.build();
executorService.execute(engine);
}
@PreDestroy
public void shutdownEngine() {
engine.stop();
}
private void handleDbChangeEvent(SourceRecord record) {
if (record.topic().equals("dbserver1.public.item")) {
Long itemId = ((Struct) record.key()).getInt64("id");
Struct payload = (Struct) record.value();
Operation op = Operation.forCode(payload.getString("op"));
if (op == Operation.UPDATE || op == Operation.DELETE) {
emf.getCache().evict(Item.class, itemId);
}
}
}
}
应用程序启动时,这将配置Debezium Postgres 连接器的实例并设置用于运行连接器的嵌入式引擎。连接器选项(主机名、凭据等)与将连接器部署到 Kafka Connect 时基本相同。不需要对现有数据进行初始快照,因此快照模式设置为“从不”。
偏移存储选项用于控制如何保存连接器偏移。由于不需要处理连接器未运行时发生的任何更改事件(相反,您只需在重新启动后开始从当前位置读取日志),因此使用 Kafka Connect 提供的内存中实现。
配置完成后,嵌入式引擎必须通过实例运行Executor。@Resource由于该示例在 WildFly 中运行,因此可以通过为此目的的注入简单地获取托管执行器(请参阅JSR 236)。
嵌入式引擎被配置为handleDbChangeEvent()针对每个接收到的数据改变事件调用该方法。在此方法中,首先检查传入事件是否源自表item。如果是这种情况,并且更改事件表示UPDATEorDELETE语句,则受影响的Item实例将从二级缓存中逐出。JPA 2.0 为此目的提供了一个简单的 API,可通过EntityManagerFactory.
类就位后,当通过psqlDatabaseChangeEventListener进行另一个项目更新时,缓存条目现在将被自动逐出。更新后为该商品下第一个采购订单时,您将在应用程序日志中看到 Hibernate ORM 如何执行查询以加载订单引用的商品。此外,缓存统计信息将报告一个“L2C 未命中”。当后续订购同一商品时,将再次从缓存中获取该商品。SELECT … FROM item …
最终一致性
虽然事件处理几乎实时发生,但需要指出的是,它仍然应用最终一致性语义。这意味着在提交事务的时间点和更改事件从日志流式传输到事件处理程序并且缓存条目无效的时间点之间存在非常短的时间窗口。
避免应用程序触发的数据更改后缓存失效
上面所示的更改事件监听器满足了外部数据更改后使缓存项失效的要求。但在目前的形式中,它逐出缓存项有点过于激进:Item通过应用程序本身更新实例时,缓存项也会被清除。这不仅是不需要的(因为缓存的项目已经是当前版本),而且甚至会适得其反:多余的缓存驱逐将导致额外的数据库往返,从而导致更长的响应时间。
因此,有必要区分应用程序本身执行的数据更改和外部数据更改。只有在后一种情况下,受影响的项目才应从缓存中逐出。为此,您可以利用每个 Debezium 数据更改事件都包含原始交易 ID 的事实。跟踪应用程序本身运行的所有事务允许仅针对外部事务更改的那些项目触发缓存逐出。
考虑到这一变化,整体架构如下所示:
图片来自于官网
事务注册表的架构概述
首先要实现的是交易注册表,即用于保存交易簿的类:
@ApplicationScoped
public class KnownTransactions {
private final DefaultCacheManager cacheManager;
private final Cache<Long, Boolean> applicationTransactions;
public KnownTransactions() {
cacheManager = new DefaultCacheManager();
cacheManager.defineConfiguration(
"tx-id-cache",
new ConfigurationBuilder()
.expiration()
.lifespan(60, TimeUnit.SECONDS)
.build()
);
applicationTransactions = cacheManager.getCache("tx-id-cache");
}
@PreDestroy
public void stopCacheManager() {
cacheManager.stop();
}
public void register(long txId) {
applicationTransactions.put(txId, true);
}
public boolean isKnown(long txId) {
return Boolean.TRUE.equals(applicationTransactions.get(txId));
}
}
这使用 InfinispanDefaultCacheManager创建和维护应用程序遇到的事务 ID 的内存缓存。由于数据更改事件接近实时到达,缓存条目的 TTL 可能相当短(事实上,示例中显示的一分钟值是非常保守地选择的,通常事件应该在几秒钟内收到)。
下一步是每当应用程序处理请求时检索当前事务 ID 并将其注册到KnownTransactions. 每笔交易都应该发生一次。有多种方法可以实现此逻辑;下面FlushEventListener使用 Hibernate ORM 来实现此目的:
class TransactionRegistrationListener implements FlushEventListener {
private volatile KnownTransactions knownTransactions;
public TransactionRegistrationListener() {
}
@Override
public void onFlush(FlushEvent event) throws HibernateException {
event.getSession().getActionQueue().registerProcess( session -> {
Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()")
.setFlushMode(FlushMode.MANUAL)
.getSingleResult();
getKnownTransactions().register(txId.longValue());
} );
}
private KnownTransactions getKnownTransactions() {
KnownTransactions value = knownTransactions;
if (value == null) {
knownTransactions = value = CDI.current().select(KnownTransactions.class).get();
}
return value;
}
}
由于没有可移植的方法来获取事务 ID,因此这是使用本机 SQL 查询来完成的。对于 Postgres,txid_current()可以为此调用该函数。Hibernate ORM 事件侦听器不受通过 CDI 的依赖注入的影响。因此,静态current()方法用于获取应用程序的 CDI 容器的句柄并获取对KnownTransactionsbean 的引用。
每当 Hibernate ORM 将其持久性上下文与数据库同步(“刷新”)时,都会调用此侦听器,这通常在提交事务时只发生一次。
手动冲洗
会话/实体管理器也可以手动刷新,在这种情况下,该txid_current()函数将被多次调用。为了简单起见,这里忽略了这一点。示例存储库中的实际代码包含此类的稍微扩展版本,它确保事务 ID 仅获取一次。
要使用 Hibernate ORM 注册刷新侦听器,必须在META-INF/services/org.hibernate.integrator.spi.IntegratorIntegrator文件中创建并声明实现:
public class TransactionRegistrationIntegrator implements Integrator {
@Override
public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory,
SessionFactoryServiceRegistry serviceRegistry) {
serviceRegistry.getService(EventListenerRegistry.class)
.appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
}
@Override
public void disintegrate(SessionFactoryImplementor sessionFactory,
SessionFactoryServiceRegistry serviceRegistry) {
}
}
io.debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator
在引导过程中,Hibernate ORM 将检测集成器类(通过Java 服务加载器),调用其integrate()方法,该方法依次注册事件的侦听器类FLUSH。
最后一步是在数据库更改事件处理程序中排除由应用程序本身运行的事务引起的任何事件:
@ApplicationScoped
public class DatabaseChangeEventListener {
// ...
@Inject
private KnownTransactions knownTransactions;
private void handleDbChangeEvent(SourceRecord record) {
if (record.topic().equals("dbserver1.public.item")) {
Long itemId = ((Struct) record.key()).getInt64("id");
Struct payload = (Struct) record.value();
Operation op = Operation.forCode(payload.getString("op"));
Long txId = ((Struct) payload.get("source")).getInt64("txId");
if (!knownTransactions.isKnown(txId) &&
(op == Operation.UPDATE || op == Operation.DELETE)) {
emf.getCache().evict(Item.class, itemId);
}
}
}
}
这样,所有的部分就都准备好了:缓存Item只会在外部数据更改后被逐出,但在应用程序本身完成更改后不会被逐出。为了确认,您可以使用curl调用示例的items资源:
curl -H “Content-Type: application/json”
-X PUT
–data ‘{ “description” : “North by Northwest”, “price” : 20.99}’
http://localhost:8080/cache-invalidation/rest/items/10003
在此更新后为该项目下一个订单时,您应该看到该Item实体是从缓存中获取的,即更改事件不会导致该项目的缓存条目被逐出。相反,如果您再次通过psql更新商品的价格,则应从缓存中删除该商品,并且订单请求将产生缓存未命中,然后针对SELECT数据库item中的表产生缓存未命中。
概括
在这篇博文中,我们探讨了如何利用 Debezium 和更改数据捕获在外部数据更改后使应用程序级缓存失效。与手动缓存失效相比,这种方法工作非常可靠(通过直接从数据库日志捕获更改,不会错过任何事件)并且快速(数据更改后缓存驱逐几乎实时发生)。
正如您所看到的,实现这一点不需要太多的粘合代码。虽然所示的实现在某种程度上特定于示例的实体,但应该可以以更通用的方式实现更改事件处理程序,以便它可以处理一组配置的实体类型(本质上,数据库更改侦听器将具有以通用方式将更改事件中的主键字段转换为相应实体的主键类型)。此外,此类通用实现还必须提供获取最常用数据库的当前事务 ID 的逻辑。
请告诉我们您是否认为这对于 Debezium 和 Hibernate ORM 来说是一个有趣的扩展。例如,这可能是 Debezium 旗下的一个新模块,如果您有兴趣为 Debezium 做出贡献,它也可能是一个非常好的项目。如果您对此想法有任何想法,请在下面发表评论或访问我们的邮件列表。
非常感谢 Guillaume Smet、Hans-Peter Grahsl 和 Jiri Pechanec 在撰写本文时提供的反馈!