|
此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Modulith 2.0.4! |
使用应用程序事件
为了尽可能使应用程序模块彼此解耦,它们主要的交互方式应该是事件的发布与消费。 这样可以避免发起模块需要了解所有可能感兴趣的各方,这是实现应用程序模块集成测试的关键方面(请参阅 集成测试应用程序模块)。
我们经常会发现应用程序组件被定义成这样:
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final InventoryManagement inventory;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
// Invoke related functionality
inventory.updateStockFor(order);
}
}
@Service
class OrderManagement(val inventory: InventoryManagement) {
@Transactional
fun complete(order: Order) {
inventory.updateStockFor(order)
}
}
complete(…) 方法在功能上产生了引力,因为它会吸引相关功能,从而与定义在其他应用模块中的 Spring Bean 产生交互。
这使得该组件尤其难以测试,因为为了创建 OrderManagement 的实例,我们必须先提供其所依赖的 Bean 的实例(请参阅 处理外向依赖)。
这也意味着,每当我们希望将更多功能集成到业务事件“订单完成”中时,都必须修改该类。
我们可以按以下方式更改应用程序模块的交互:
ApplicationEventPublisher 发布应用程序事件-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final ApplicationEventPublisher events;
private final OrderInternal dependency;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
events.publishEvent(new OrderCompleted(order.getId()));
}
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {
@Transactional
fun complete(order: Order) {
events.publishEvent(OrderCompleted(order.id))
}
}
请注意,我们不是依赖其他应用程序模块的 Spring Bean,而是使用 Spring 的 ApplicationEventPublisher 在主聚合完成状态转换后发布领域事件。
对于更以聚合为导向的事件发布方法,请参阅 Spring Data 的应用程序事件发布机制 以获取详细信息。
由于事件发布默认是同步进行的,因此整体安排的事务语义与上述示例保持一致。
这既有好处也有坏处:好处是我们获得了一个非常简单的的一致性模型(订单状态更改和库存更新要么都成功,要么都失败);坏处是更多被触发的相关功能会扩大事务边界,并可能导致整个事务失败,即使引发错误的功能并不关键。
另一种方法是将在事务提交时的事件消费移至异步处理,并将次要功能严格视为次要功能:
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
这现在有效地将原始事务与监听器的执行解耦了。 虽然这避免了原始业务事务的扩大,但也带来了风险:如果监听器因任何原因失败,事件发布将会丢失,除非每个监听器实际上都实现了自己的安全网。 更糟糕的是,这甚至不能完全奏效,因为系统可能在方法被调用之前就失败了。
应用程序模块监听器
若要在事务内部运行事务性事件监听器,则需要依次使用 @Transactional 对其进行注解。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
为了简化声明旨在描述通过事件集成模块的默认方式,Spring Modulith 提供了 @ApplicationModuleListener 作为快捷方式。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@ApplicationModuleListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@ApplicationModuleListener
fun on(event: OrderCompleted) { /* … */ }
}
事件发布注册表
Spring Modulith 附带了一个事件发布注册表,该注册表挂钩到 Spring 框架的核心事件发布机制。 在事件发布时,它会识别将接收该事件的事务性事件监听器,并为每个监听器(深蓝色)在原始业务事务的一部分中向事件发布日志写入条目。
每个事务性事件监听器都被封装到一个切面中,如果监听器执行成功,该切面会将日志条目标记为已完成。
如果监听器失败,日志条目将保持不变,以便根据应用程序的需求部署重试机制。
可以通过 spring.modulith.events.republish-outstanding-events-on-restart 属性启用事件的自动重新发布。
事件发布生命周期(自 2.0 版本起)
Spring Modulith 2.0 引入了专门的事件发布生命周期,以便您可以区分即将处理、正在进行中、已完成或已失败的发布。 这使得仅重新提交失败的发布以及在不会错误地将进行中的发布视为失败的情况下从崩溃中恢复变得更加容易。
发布状态
每个事件发布都有一个 EventPublication.Status:
-
PUBLISHED– 出版物已存储,正在等待处理(或即将被提取)。 -
PROCESSING– 监听器已声明该发布事件并正在执行。围绕监听器的拦截器会在调用监听器之前将此值设置为此状态,并在监听器返回时将其设置为COMPLETED或FAILED。 -
COMPLETED– 监听器成功完成。已设置完成日期(除非完成模式为DELETE)。 -
FAILED– 监听器抛出了异常,或者发布事件因过时机制被标记为失败(请参阅 事件发布的过时性与自动标记为失败)。 -
RESUBMITTED– 之前发布失败的条目已重新提交,目前正在等待处理。
出版详情
除了状态之外,每个出版物还跟踪:
-
最后重新提交日期 – 出版物最后一次重新提交的时间(如果有)。通过
EventPublication.getLastResubmissionDate()暴露。 -
完成尝试次数 – 监听器被调用的频率(包括当前运行)。在移动到
PROCESSING时递增,因此即使监听器期间发生崩溃,尝试计数仍会更新。通过EventPublication.getCompletionAttempts()暴露。
这些允许您使用重新提交 API 和选项来实现诸如“仅在失败时间超过 X 时重新提交”或“在 N 次尝试后停止”等策略。
事件发布的过时性与自动标记为失败
如果应用程序崩溃或监听器挂起,发布内容可能仍停留在 PUBLISHED、PROCESSING 或 RESUBMITTED 状态。
为了能够将这些情况视为失败并重新提交(或忽略),您可以配置每个状态在持续多长时间后被视为过时。后台任务会定期将过时的发布内容标记为 FAILED。
Spring Modulith 提供了一个陈旧性监控器(自 2.0 版本起),它以可配置的时间间隔作为定时任务运行。
当任何陈旧性持续时间被设置为非零值时,监控器即处于激活状态:每次运行时,它会发现 PUBLISHED、PROCESSING 或 RESUBMITTED 中早于对应持续时间的事件发布,并将它们标记为 FAILED。这使得可以恢复(例如通过 FailedEventPublications.resubmit(…))或以其他方式处理原本可能卡住的发布。
您可以通过 spring.modulith.events.staleness 配置属性 对其进行自定义。
如果 published、processing 和 resubmitted 全部为零(默认值),则陈旧性监控器不会注册定时任务,也不会自动将发布标记为失败。
发布失败并重新提交
注册表允许您显式地处理失败的发布:
-
FailedEventPublications(自 2.0 版本起)– 使用此类型的 Bean 仅重新提交发布失败的事件:
resubmit(ResubmissionOptions)。 -
ResubmissionOptions – 控制重新提交的工作方式:批次大小、最大进行中数量、发布的最小年龄,以及一个可选的过滤器(例如按事件类型或
completionAttempts)。使用ResubmissionOptions.defaults()创建,并使用withBatchSize(…)、withMinAge(…)、withFilter(…)等进行自定义。
重新提交会将状态从 FAILED 更改为 RESUBMITTED 并更新最后重新提交日期;当监听器即将运行时,发布状态会变为 PROCESSING,并且完成尝试次数会递增。
对于一般的“不完整”发布(包括失败的发布,以及根据配置可能包含的过时发布),现有的 IncompleteEventPublications API 仍然适用;自 2.0 版本起,除了基于谓词和持续时间的重载方法外,它还支持 resubmitIncompletePublications(ResubmissionOptions)。
Spring Boot 事件注册Starters
使用事务性事件发布日志需要在应用程序中添加一系列组件的组合。 为了简化这项任务,Spring Modulith 提供了以所选 持久化技术 为核心的Starters POM,并默认采用基于 Jackson 的 EventSerializer 实现。 可用的Starters如下:
| 持久化技术 | 构件 | 描述 |
|---|---|---|
JPA |
|
使用 JPA 作为持久化技术。 |
JDBC |
|
使用 JDBC 作为持久化技术。也适用于基于 JPA 的应用程序,但会绕过您的 JPA 提供者进行实际的事件持久化。 |
MongoDB |
|
使用 MongoDB 作为持久化技术。同时启用 MongoDB 事务,并要求服务器配置为副本集模式以进行交互。可以通过将 |
Neo4J |
|
在 Spring Data Neo4j 背后使用 Neo4j。 |
管理事件发布
在应用程序运行期间,事件发布可能需要以多种方式进行管理。
未完成的发布可能需要在给定时间后重新提交给相应的监听器。
另一方面,已完成的发布可能需要从数据库中清除或移动到归档存储中。
由于此类清理需求因应用程序而异,Spring Modulith 提供了一个 API 来处理这两种类型的发布。
该 API 可通过 spring-modulith-events-api 构件获得,您可以将其添加到您的应用程序中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>2.1.0-M3</version>
</dependency>
dependencies {
implementation 'org.springframework.modulith:spring-modulith-events-api:2.1.0-M3'
}
此构件包含可作为 Spring Bean 供应用程序代码使用的主要抽象:
-
CompletedEventPublications— 此接口允许访问所有已完成的事件发布,并提供了一个 API,用于立即从数据库中清除所有这些发布,或清除早于指定时长(例如 1 分钟)的已完成发布。 -
IncompleteEventPublications— 此接口允许访问所有未完成的事件发布,以便重新提交符合给定谓词的事件、相对于原始发布日期早于给定Duration的事件,或通过resubmitIncompletePublications(ResubmissionOptions)匹配自定义条件的事件(自 2.0 版本起)。 -
FailedEventPublications(自 2.0 版本起)— 此接口允许仅通过resubmit(ResubmissionOptions)重新提交失败的事件发布,如 失败的发布与重新提交 中所述。
事件发布完成
当事务性或 @ApplicationModuleListener 执行成功完成时,事件发布将被标记为已完成。
默认情况下,完成状态是通过在 EventPublication 上设置完成日期来注册的。
这意味着已完成的发布将保留在事件发布注册表中,以便可以通过如 上文 所述的 CompletedEventPublications 接口进行检查。
由此产生的一个结果是,您需要编写一些代码,定期清理旧的、已完成的 EventPublication。
否则,它们的持久化抽象(例如关系数据库表)将无限增长,并且与存储交互以创建和完成新的 EventPublication 的过程可能会变慢。
Spring Modulith 1.3 引入了一个配置属性 spring.modulith.events.completion-mode,以支持两种额外的完成模式。
其默认值为 UPDATE,该值基于上述策略。
或者,可以将完成模式设置为 DELETE,这将更改注册表的持久化机制,改为在完成时删除 EventPublication。
这意味着 CompletedEventPublications 将不再返回任何发布记录,但与此同时,您也不必再担心需要手动从持久化存储中清理已完成的事件了。
第三个选项是 ARCHIVE 模式,它会将条目复制到归档表、集合或节点中。
对于该归档条目,会设置完成日期并移除原始条目。
与 DELETE 模式相反,已完成的事件发布仍然可以通过 CompletedEventPublications 抽象进行访问。
事件发布存储库
要实际写入事件发布日志,Spring Modulith 提供了一个 EventPublicationRepository SPI 接口,并针对支持事务的流行持久化技术(如 JPA、JDBC 和 MongoDB)提供了相应实现。
您可以通过将对应的 JAR 包添加到 Spring Modulith 应用程序中来选择要使用的持久化技术。
我们已准备了专用的 Starters(starters) 以简化该任务。
基于 JDBC 的实现将为事件发布日志创建一个专用表,除非相应的配置属性(spring.modulith.events.jdbc.schema-initialization.enabled)设置为 false。
当然,如果所需的表已经存在(例如通过 Flyway 或 Liquibase 等数据库迁移工具创建),架构创建过程也会自动跳过。
有关详细信息,请参阅附录中的 架构概述。
外部化事件
| 以下部分描述了基于异步事件监听器的 Spring Modulith 原生事件外部化功能。 虽然这是一种实用且简单的解决方案,但它缺乏开发者在实际发件箱模式实现中可能期望的关键功能。 Spring Modulith 2.1 引入了通过 Namastack Outbox 进行事件外部化的支持。 有关详细信息,请参阅文档的 相应部分。 |
应用程序模块之间交换的某些事件可能对外部系统感兴趣。 Spring Modulith 允许将选定的事件发布到各种消息代理。 要使用该支持,您需要执行以下步骤:
-
将特定于代理的 Spring Modulith 构件添加到您的项目中。
-
通过为事件类型添加 Spring Modulith 或 jMolecules 的
@Externalized注解,选择需要外部化的事件类型。 -
在注解的 value 中指定特定于代理的路由目标。
要了解如何使用其他方法选择事件进行外部化,或自定义它们在代理内的路由,请查阅事件外部化的基础。
支持的基础设施
| 代理 | 构件 | 描述 |
|---|---|---|
Kafka |
|
使用 Spring Kafka 与消息代理进行交互。 逻辑路由键将用作 Kafka 的主题(topic)和消息键(message key)。 |
AMQP |
|
使用 Spring AMQP 与任何兼容的消息代理进行交互。 例如,需要显式声明对 Spring Rabbit 的依赖。 逻辑路由键将用作 AMQP 路由键。 |
JMS |
|
使用 Spring 的核心 JMS 支持。 不支持路由键。 |
Spring 消息 |
|
使用 Spring 的核心 |
事件外部化的基础
Spring Modulith 的事件外部化是作为事务性事件监听器实现的,它委托给特定于代理的发布实现。 这意味着 Spring Modulith 的事件发布注册表能够防止在与代理交互过程中发生故障,从而确保可以通过提供的 API 重新提交发布内容。
事件外部化对发布的每个应用程序事件执行三个步骤。
-
确定事件是否应该被外部化 — 我们将此称为“事件选择”。 默认情况下,只有位于 Spring Boot 自动配置包内并使用受支持的
@Externalized注解之一进行注解的事件类型才会被选中进行外部化。 -
准备消息(可选) — 默认情况下,事件会由相应的代理基础设施原样序列化。 一个可选的映射步骤允许开发人员自定义甚至完全替换原始事件,使其负载适合外部各方。 对于 Kafka 和 AMQP,开发人员还可以向要发布的消息添加头信息。
-
确定路由目标 — 消息代理客户端需要一个逻辑目标来发布消息。 该目标通常标识物理基础设施(根据代理的不同,可以是主题、交换机或队列),并且通常从事件类型静态派生。 除非在
@Externalized注解中明确指定,否则 Spring Modulith 会使用应用程序本地的类型名称作为目标。 换句话说,在一个基础包为com.acme.app的 Spring Boot 应用程序中,事件类型com.acme.app.sample.SampleEvent将被发布到sample.SampleEvent。某些代理还允许定义一个相当动态的路由键,该键在实际目标中用于不同的目的。 默认情况下,不使用路由键。
基于注解的事件外部化配置
要通过 @Externalized 注解定义自定义路由键,可以在每个特定注解中可用的 target/value 属性中使用 $target::$key 模式。
target 和 key 都可以是 SpEL 表达式,该表达式会将事件实例配置为根对象。
-
Java
-
Kotlin
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
String getLastname() { (1)
// …
}
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
fun getLastname(): String { (1)
// …
}
}
CustomerCreated 事件通过访问器方法暴露客户的姓氏。
该方法随后在目标声明的 :: 分隔符之后的键表达式中,通过 #this.getLastname() 表达式被使用。
如果密钥计算变得更加复杂,建议将其委托给一个以事件为参数的 Spring Bean:
-
Java
-
Kotlin
@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")
编程式事件外部化配置
spring-modulith-events-api 构件包含 EventExternalizationConfiguration,允许开发者自定义上述所有步骤。
-
Java
-
Kotlin
@Configuration
class ExternalizationConfiguration {
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent.class, event -> …) (3)
.headers(event -> …) (4)
.routeKey(WithKeyProperty.class, WithKeyProperty::getKey) (5)
.build();
}
}
@Configuration
class ExternalizationConfiguration {
@Bean
fun eventExternalizationConfiguration(): EventExternalizationConfiguration {
EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent::class.java) { event -> … } (3)
.headers() { event -> … } (4)
.routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey) (5)
.build()
}
}
| 1 | 我们首先创建一个EventExternalizationConfiguration的默认实例。 |
| 2 | 我们通过调用前一次调用返回的 Selector 实例上的某个 select(…) 方法来自定义事件选择。
此步骤从根本上禁用了应用程序基础包过滤器,因为我们现在只查找注解。
提供了便捷方法,可按类型、包、包与注解轻松选择事件。
此外,还提供了一种快捷方式,可在单一步骤中定义选择和路由。 |
| 3 | 我们为 SomeEvent 个实例定义了一个映射步骤。
请注意,路由仍将由原始事件实例决定,除非您在路由器上额外调用 ….routeMapped()。 |
| 4 | 我们向要发送的消息添加自定义标头,既可以像所示那样通用添加,也可以针对特定的负载类型进行添加。 |
| 5 | 我们最终通过定义一个方法句柄来提取事件实例的值,从而确定一个路由键。
或者,可以通过在前一次调用返回的 Router 实例上使用通用的 route(…) 方法,为单个事件生成完整的 RoutingKey。 |
序列化事件外部化
Spring Modulith 的事件外部化是作为事务性事件监听器实现的。 这意味着多个线程可能会同时触发与代理的交互。 当事件发布被重新提交时,这一点尤其重要。 由于代理可能会看到交互量的突然激增,某些交互可能需要更长时间,从而导致后续事件的外部化可能会先于之前的事件完成。
为防止这种情况,可以通过将 spring.modulith.events.externalization.serialize-externalization 属性设置为 true,使与代理的交互序列化,从而确保一次只发送一个事件。
Namastack Outbox 支持
如果需要高级发件箱功能,可以将事件外部化委托给 Namastack Outbox。 此功能目前仅适用于关系型数据库。 要激活该功能,首先请将 Spring Modulith Namastack Starters添加到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-starter-namastack</artifactId>
</dependency>
dependencies {
implementation 'org.springframework.modulith:spring-modulith-starter-namastack'
}
测试发布事件
以下部分介绍了一种仅专注于跟踪 Spring 应用程序事件的测试方法。
若需了解针对使用 @ApplicationModuleListener 的模块进行更全面测试的方法,请参阅 Scenario API。 |
Spring Modulith 的 @ApplicationModuleTest 使得能够将 PublishedEvents 实例注入到测试方法中,以验证在被测业务操作过程中是否发布了一组特定的事件。
-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(PublishedEvents events) {
// …
var matchingMapped = events.ofType(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
assertThat(matchingMapped).hasSize(1);
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: PublishedEvents events) {
// …
val matchingMapped = events.ofType(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
assertThat(matchingMapped).hasSize(1)
}
}
请注意,PublishedEvents 提供了一个 API,用于选择符合特定条件的事件。
验证通过 AssertJ 断言完成,该断言用于核验预期的元素数量。
如果您无论如何都使用 AssertJ 进行这些断言,也可以将 AssertablePublishedEvents 用作测试方法的参数类型,并使用其提供的流式断言 API。
AssertablePublishedEvents 来验证事件发布-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(AssertablePublishedEvents events) {
// …
assertThat(events)
.contains(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: AssertablePublishedEvents) {
// …
assertThat(events)
.contains(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
}
}
注意,assertThat(…) 表达式返回的类型允许直接对发布的事件定义约束。