现代企业应用中的Spring Integration
是什么
Spring Integration Reference Guide
Spring Integration 提供了 Spring 编程模型的扩展,它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。
Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
Spring Integration 支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员可以减轻复杂的集成职责。
什么促使了 Spring Integration 的诞生
- 复杂的应用集成需求 在现代企业应用中,应用系统应用场景复杂,需要进行互相通信和数据交互。
- 标准化的集成模型 基于 EIP (Enterprise Integration Patterns),Spring Integration 带来了一系列标准化的构件,便于应用程序快速构建。
- 便利的 Spring 介入支持 Spring 充分利用了自带的配置管理和 AOP 功能,使得消息驱动算法和功能以一致方式进行配置和扩展。
- 前端和后端集成需求 Spring Integration 提供了多样化的连接器,轻松完成与实现 REST, WebSocket, MQTT, Kafka 等通信协议的集成。
核心概念
Message
Message :: Spring Integration
Message Channels
Message Channels :: Spring Integration
Message Transformer
用于转换消息的内容(Payload)或头信息(Headers)。它的主要作用是将消息从一种格式或结构转换为另一种,以满足下游组件或系统的需求。
示例
Transformer 通常通过实现 GenericTransformer
接口或使用注解 @Transformer
来定义
通过 @Transformer
注解定义:
1 |
|
使用 Lambda 定义:
1 |
|
Message Filter
用于过滤消息。它根据指定的条件(通过 Expression
或 Predicate
)检查传入的消息,如果消息符合条件,则将其传递到下一个通道;否则,消息会被丢弃。消息过滤器通常与发布-订阅通道结合使用,其中多个消费者可能会收到相同的消息,可以使用过滤器来缩小要处理的消息集范围。
示例
基于表达式的过滤:
1 |
|
基于自定义逻辑的过滤:
1 |
|
基于注解定义过滤器:
1 |
|
Message Router
用于决定接下来应该接收消息的一个或多个通道。通常,该决定基于消息的内容或消息头中可用的元数据。它类似于一个决策点,可以动态地决定消息的流向。
使用场景
- 消息分发:根据消息类型、内容等条件,将消息分发到不同的处理流程。
- 动态负载:根据业务逻辑动态选择通道,减少静态配置的复杂性。
- 业务流程编排:在复杂的流程中,根据条件动态调整流程。
示例
根据消息体内容进行路由:
1 |
|
使用注解定义路由器:
1 |
|
Service Activator
用于将消息从输入通道转发到具体的业务逻辑处理器,并将处理结果发送到输出通道。
使用场景
- 处理从某个通道接收到的消息。
- 转发处理后的消息到其他通道。
- 实现简单的消息过滤、转换或处理。
示例
以下示例展示如何使用 @ServiceActivator
处理 MQTT 消息。当 mqttInputChannel
收到消息时,Spring Integration 会调用 mqttMessageHandler
,传递消息给业务逻辑进行处理。
1 | import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; |
如果需要将处理后的消息发送到另一个通道,可以指定输出通道:
1 |
|
Message Gateway
用于创建消息网关(Messaging Gateway)。它将消息发送到 Spring Integration 的消息通道,同时允许我们以接口的形式调用消息流,从而在应用程序中实现解耦和更高的可读性。
- 提供接口调用的入口
使用@MessagingGateway
定义的接口是消息流的入口点,调用接口方法即可向消息通道发送消息。接口的具体由GatewayProxyFactoryBean
创建动态代理对象实现。 - 实现业务逻辑与消息流的解耦
应用程序调用接口的方法,不需要直接处理消息通道的具体细节。 - 简化配置
自动将接口与消息通道绑定,无需手动配置复杂的 Spring Integration 流程。 - 支持请求-回复模式
可以轻松实现异步或同步的请求-回复消息交互。
示例
以下是一个使用 @MessagingGateway
发送消息到 MQTT 的完整示例。
-
配置消息网关和出站适配器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
public class MqttConfig {
public MessageChannel mqttOutboundChannel() {
return new DirectChannel(); // 定义输出通道
}
public MqttPahoMessageHandler mqttOutboundHandler() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", mqttClientFactory());
handler.setAsync(true);
handler.setDefaultQos(1);
handler.setDefaultTopic("default/topic");
handler.setOutputChannel(mqttOutboundChannel());
handler.setConverter(new DefaultPahoMessageConverter());
return handler;
}
public MqttClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{"tcp://broker.hivemq.com:1883"});
options.setCleanSession(true);
factory.setConnectionOptions(options);
return factory;
}
} -
定义消息网关接口
1
2
3
4
5
6
7
public interface MqttGateway {
void sendToMqtt(String data);
void sendToMqtt(; String topic, String data)
} -
使用网关发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MqttService {
private MqttGateway mqttGateway;
public void publishMessage() {
// 发送到默认主题
mqttGateway.sendToMqtt("Hello MQTT!");
// 发送到指定主题
mqttGateway.sendToMqtt("custom/topic", "Message to custom topic");
}
}
请求-回复支持
如果接口方法有返回值,可以实现请求-回复模式。replyChannel
允许调用方通过指定的通道接收对应的响应消息。当需要从 replyChannel
找出与请求相关的回复时,通常依赖于消息的 correlationId
或者通过 MessageHeaders
来确保请求和回复的关联。
1 |
|
同步处理请求-回复
-
配置请求-回复通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RequestReplyConfig {
public MessageChannel requestChannel() {
return new DirectChannel();
}
public MessageChannel replyChannel() {
return new DirectChannel();
}
public MessageHandler requestHandler() {
return message -> {
// 模拟处理逻辑并发送响应
String payload = "Response to: " + message.getPayload();
Message<String> replyMessage = MessageBuilder.withPayload(payload)
.setHeader(MessageHeaders.REPLY_CHANNEL, message.getHeaders().getReplyChannel())
.setCorrelationId(message.getHeaders().getId())
.build();
((MessageChannel) message.getHeaders().getReplyChannel()).send(replyMessage);
};
}
} -
使用
MessagingTemplate
同步请求:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
public class RequestReplyService {
private final MessagingTemplate messagingTemplate;
private final MessageChannel requestChannel;
private final MessageChannel replyChannel;
public RequestReplyService(MessageChannel requestChannel, MessageChannel replyChannel) {
this.messagingTemplate = new MessagingTemplate();
this.requestChannel = requestChannel;
this.replyChannel = replyChannel;
}
public String sendAndReceive(String requestPayload) {
Message<String> requestMessage = MessageBuilder.withPayload(requestPayload)
.setReplyChannel(replyChannel)
.build();
// 使用 MessagingTemplate 发送请求并同步接收响应
Message<?> replyMessage = messagingTemplate.sendAndReceive(requestChannel, requestMessage);
return replyMessage != null ? replyMessage.getPayload().toString() : null;
}
}调用
sendAndReceive
方法时,MessagingTemplate
会自动等待replyChannel
的响应。
异步处理请求-回复
-
配置异步处理机制
如果需要异步处理请求和回复,可以通过
ExecutorChannel
或其他机制,将回复存储到一个映射中(例如Map
或CompletableFuture
)。1
2
3
4
5
6
7
8
9
public MessageChannel asyncReplyChannel() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(50);
executor.initialize();
return new ExecutorChannel(executor);
} -
使用
CompletableFuture
管理异步请求-回复1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class AsyncRequestReplyService {
private final Map<String, CompletableFuture<String>> futureMap = new ConcurrentHashMap<>();
private final MessageChannel requestChannel;
private final MessageChannel asyncReplyChannel;
public AsyncRequestReplyService(MessageChannel requestChannel, MessageChannel asyncReplyChannel) {
this.requestChannel = requestChannel;
this.asyncReplyChannel = asyncReplyChannel;
}
public CompletableFuture<String> sendRequest(String requestPayload) {
String correlationId = java.util.UUID.randomUUID().toString();
CompletableFuture<String> future = new CompletableFuture<>();
futureMap.put(correlationId, future);
Message<String> requestMessage = MessageBuilder.withPayload(requestPayload)
.setReplyChannel(asyncReplyChannel)
.setCorrelationId(correlationId)
.build();
requestChannel.send(requestMessage);
return future;
}
public void handleReply(Message<String> replyMessage) {
String correlationId = (String) replyMessage.getHeaders().getCorrelationId();
if (correlationId != null && futureMap.containsKey(correlationId)) {
CompletableFuture<String> future = futureMap.remove(correlationId);
if (future != null) {
future.complete(replyMessage.getPayload());
}
}
}
}使用
sendRequest
方法发送请求时,返回一个CompletableFuture
,当回复到达时,handleReply
方法会完成该CompletableFuture
。
生命周期
- 消息创建 消息通常通过
MessageBuilder
或直接通过其实现类(如GenericMessage
)进行创建。消息包含一个payload
(消息主体)和可选的headers
(消息头)。 - 消息发送 消息被发送到一个
MessageChannel
。Spring Integration 提供了多种消息通道实现,如DirectChannel
、QueueChannel
等。 - 路由消息 消息通过
MessageChannel
被路由到一个或多个消息处理器(Message Handler)。路由器(Router)可以根据消息头或消息体的内容将消息路由到不同的通道。 - 处理消息 消息处理器(Message Handler)接收消息并执行相应的处理逻辑。处理器可以是一个
@ServiceActivator
注解的方法,也可以是其他类型的处理器。 - 转换消息 消息转换器(Transformer)可以在消息处理过程中更改消息的格式或内容。
- 答复消息 某些消息处理器可能会生成一个新的消息作为响应,并将其发送到一个
replyChannel
。 - 结束 消息的生命周期在其被处理完成并且不再需要进一步处理时结束。这通常意味着消息已经被处理器消费,或者已经被成功路由到外部系统。
参考
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码适合的读者,略微了解SpringB - 掘金