现代企业应用中的Spring Integration

是什么

Spring Integration Reference Guide

Spring Integration 提供了 Spring 编程模型的扩展,它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。

Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

Spring Integration 支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员可以减轻复杂的集成职责。

什么促使了 Spring Integration 的诞生

  1. 复杂的应用集成需求 在现代企业应用中,应用系统应用场景复杂,需要进行互相通信和数据交互。
  2. 标准化的集成模型 基于 EIP (Enterprise Integration Patterns),Spring Integration 带来了一系列标准化的构件,便于应用程序快速构建。
  3. 便利的 Spring 介入支持 Spring 充分利用了自带的配置管理和 AOP 功能,使得消息驱动算法和功能以一致方式进行配置和扩展。
  4. 前端和后端集成需求 Spring Integration 提供了多样化的连接器,轻松完成与实现 REST, WebSocket, MQTT, Kafka 等通信协议的集成。

核心概念

Message

Message :: Spring Integration

Message Channels

Message Channels :: Spring Integration

Message Transformer

用于转换消息的内容(Payload)或头信息(Headers)。它的主要作用是将消息从一种格式或结构转换为另一种,以满足下游组件或系统的需求。

示例

Transformer 通常通过实现 GenericTransformer 接口或使用注解 @Transformer 来定义

通过 @Transformer 注解定义:

1
2
3
4
5
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "outputChannel")
public GenericTransformer<String, String> upperCaseTransformer() {
return String::toUpperCase;
}

使用 Lambda 定义:

1
2
3
4
5
6
7
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlows.from("inputChannel")
.transform((String payload) -> payload + " - Transformed")
.channel("outputChannel")
.get();
}

Message Filter

用于过滤消息。它根据指定的条件(通过 ExpressionPredicate)检查传入的消息,如果消息符合条件,则将其传递到下一个通道;否则,消息会被丢弃。消息过滤器通常与发布-订阅通道结合使用,其中多个消费者可能会收到相同的消息,可以使用过滤器来缩小要处理的消息集范围。

示例

基于表达式的过滤:

1
2
3
4
5
6
7
8
9
@Bean
public IntegrationFlow filterFlow() {
return IntegrationFlows
.from("inputChannel") // 输入通道
.filter("headers['type'] == 'important'", // 仅通过头部为 'important' 的消息
spec -> spec.discardChannel("discardChannel")) // 丢弃的消息进入 discardChannel
.channel("outputChannel") // 输出通道
.get();
}

基于自定义逻辑的过滤:

1
2
3
4
5
6
7
8
9
@Bean
public IntegrationFlow customFilterFlow() {
return IntegrationFlows
.from("inputChannel")
.filter((String payload) -> payload.contains("valid"), // 仅通过包含 "valid" 的消息
spec -> spec.throwExceptionOnRejection(true)) // 如果消息被丢弃,则抛出异常
.channel("outputChannel")
.get();
}

基于注解定义过滤器:

1
2
3
4
5
6
7
8
@Component
public class FilterProcessor {

@Filter(inputChannel = "filterInputChannel", outputChannel = "outputChannel")
public boolean filterMessage(String message) {
return message.startsWith("valid");
}
}

Message Router

用于决定接下来应该接收消息的一个或多个通道。通常,该决定基于消息的内容或消息头中可用的元数据。它类似于一个决策点,可以动态地决定消息的流向。

使用场景

  • 消息分发:根据消息类型、内容等条件,将消息分发到不同的处理流程。
  • 动态负载:根据业务逻辑动态选择通道,减少静态配置的复杂性。
  • 业务流程编排:在复杂的流程中,根据条件动态调整流程。

示例

根据消息体内容进行路由:

1
2
3
4
5
6
7
@Bean
public IntegrationFlow payloadRouterFlow() {
return IntegrationFlows.from("inputChannel")
.route(String.class,
payload -> payload.contains("error") ? "errorChannel" : "normalChannel")
.get();
}

使用注解定义路由器:

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class CustomRouter {

@Router(inputChannel = "inputChannel")
public String routeMessage(String message) {
if (message.contains("important")) {
return "importantChannel";
} else {
return "normalChannel";
}
}
}

Service Activator

用于将消息从输入通道转发到具体的业务逻辑处理器,并将处理结果发送到输出通道。

使用场景

  • 处理从某个通道接收到的消息。
  • 转发处理后的消息到其他通道。
  • 实现简单的消息过滤、转换或处理。

示例

以下示例展示如何使用 @ServiceActivator 处理 MQTT 消息。当 mqttInputChannel 收到消息时,Spring Integration 会调用 mqttMessageHandler,传递消息给业务逻辑进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttIntegrationConfig {

@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel(); // 定义一个输入通道
}

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler mqttMessageHandler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic", String.class);
String payload = (String) message.getPayload();
System.out.println("Received message: " + payload + " from topic: " + topic);
};
}
}

如果需要将处理后的消息发送到另一个通道,可以指定输出通道:

1
2
3
4
5
@ServiceActivator(inputChannel = "mqttInputChannel", outputChannel = "processedOutputChannel")
public String processMessage(String message) {
System.out.println("Processing message: " + message);
return message.toUpperCase(); // 返回处理结果
}

Message Gateway

用于创建消息网关(Messaging Gateway)。它将消息发送到 Spring Integration 的消息通道,同时允许我们以接口的形式调用消息流,从而在应用程序中实现解耦和更高的可读性。

  1. 提供接口调用的入口
    使用 @MessagingGateway 定义的接口是消息流的入口点,调用接口方法即可向消息通道发送消息。接口的具体由 GatewayProxyFactoryBean 创建动态代理对象实现。
  2. 实现业务逻辑与消息流的解耦
    应用程序调用接口的方法,不需要直接处理消息通道的具体细节。
  3. 简化配置
    自动将接口与消息通道绑定,无需手动配置复杂的 Spring Integration 流程。
  4. 支持请求-回复模式
    可以轻松实现异步或同步的请求-回复消息交互。

示例

以下是一个使用 @MessagingGateway 发送消息到 MQTT 的完整示例。

  1. 配置消息网关和出站适配器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;

    @Configuration
    public class MqttConfig {

    @Bean
    public MessageChannel mqttOutboundChannel() {
    return new DirectChannel(); // 定义输出通道
    }

    @Bean
    public MqttPahoMessageHandler mqttOutboundHandler() {
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", mqttClientFactory());
    handler.setAsync(true);
    handler.setDefaultQos(1);
    handler.setDefaultTopic("default/topic");
    handler.setOutputChannel(mqttOutboundChannel());
    handler.setConverter(new DefaultPahoMessageConverter());
    return handler;
    }

    @Bean
    public MqttClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{"tcp://broker.hivemq.com:1883"});
    options.setCleanSession(true);
    factory.setConnectionOptions(options);
    return factory;
    }
    }
  2. 定义消息网关接口

    1
    2
    3
    4
    5
    6
    7
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {

    void sendToMqtt(String data);

    void sendToMqtt(@Header("mqtt_topic") String topic, String data);
    }
  3. 使用网关发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Service
    public class MqttService {

    @Autowired
    private MqttGateway mqttGateway;

    public void publishMessage() {
    // 发送到默认主题
    mqttGateway.sendToMqtt("Hello MQTT!");

    // 发送到指定主题
    mqttGateway.sendToMqtt("custom/topic", "Message to custom topic");
    }
    }

请求-回复支持

如果接口方法有返回值,可以实现请求-回复模式。replyChannel 允许调用方通过指定的通道接收对应的响应消息。当需要从 replyChannel 找出与请求相关的回复时,通常依赖于消息的 correlationId 或者通过 MessageHeaders 来确保请求和回复的关联。

1
2
3
4
@MessagingGateway(defaultRequestChannel = "requestChannel", defaultReplyChannel = "replyChannel")
public interface RequestReplyGateway {
String sendAndReceive(String data);
}

同步处理请求-回复

  1. 配置请求-回复通道

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Configuration
    public class RequestReplyConfig {

    @Bean
    public MessageChannel requestChannel() {
    return new DirectChannel();
    }

    @Bean
    public MessageChannel replyChannel() {
    return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler requestHandler() {
    return message -> {
    // 模拟处理逻辑并发送响应
    String payload = "Response to: " + message.getPayload();
    Message<String> replyMessage = MessageBuilder.withPayload(payload)
    .setHeader(MessageHeaders.REPLY_CHANNEL, message.getHeaders().getReplyChannel())
    .setCorrelationId(message.getHeaders().getId())
    .build();
    ((MessageChannel) message.getHeaders().getReplyChannel()).send(replyMessage);
    };
    }
    }
  2. 使用 MessagingTemplate 同步请求:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import org.springframework.integration.core.MessagingTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;

    @Service
    public class RequestReplyService {

    private final MessagingTemplate messagingTemplate;
    private final MessageChannel requestChannel;
    private final MessageChannel replyChannel;

    public RequestReplyService(MessageChannel requestChannel, MessageChannel replyChannel) {
    this.messagingTemplate = new MessagingTemplate();
    this.requestChannel = requestChannel;
    this.replyChannel = replyChannel;
    }

    public String sendAndReceive(String requestPayload) {
    Message<String> requestMessage = MessageBuilder.withPayload(requestPayload)
    .setReplyChannel(replyChannel)
    .build();
    // 使用 MessagingTemplate 发送请求并同步接收响应
    Message<?> replyMessage = messagingTemplate.sendAndReceive(requestChannel, requestMessage);
    return replyMessage != null ? replyMessage.getPayload().toString() : null;
    }
    }

    调用 sendAndReceive 方法时,MessagingTemplate 会自动等待 replyChannel 的响应。

异步处理请求-回复

  1. 配置异步处理机制

    如果需要异步处理请求和回复,可以通过 ExecutorChannel 或其他机制,将回复存储到一个映射中(例如 MapCompletableFuture)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Bean
    public MessageChannel asyncReplyChannel() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(50);
    executor.initialize();
    return new ExecutorChannel(executor);
    }
  2. 使用 CompletableFuture 管理异步请求-回复

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Service
    public class AsyncRequestReplyService {

    private final Map<String, CompletableFuture<String>> futureMap = new ConcurrentHashMap<>();
    private final MessageChannel requestChannel;
    private final MessageChannel asyncReplyChannel;

    public AsyncRequestReplyService(MessageChannel requestChannel, MessageChannel asyncReplyChannel) {
    this.requestChannel = requestChannel;
    this.asyncReplyChannel = asyncReplyChannel;
    }

    public CompletableFuture<String> sendRequest(String requestPayload) {
    String correlationId = java.util.UUID.randomUUID().toString();
    CompletableFuture<String> future = new CompletableFuture<>();
    futureMap.put(correlationId, future);

    Message<String> requestMessage = MessageBuilder.withPayload(requestPayload)
    .setReplyChannel(asyncReplyChannel)
    .setCorrelationId(correlationId)
    .build();
    requestChannel.send(requestMessage);

    return future;
    }

    @ServiceActivator(inputChannel = "asyncReplyChannel")
    public void handleReply(Message<String> replyMessage) {
    String correlationId = (String) replyMessage.getHeaders().getCorrelationId();
    if (correlationId != null && futureMap.containsKey(correlationId)) {
    CompletableFuture<String> future = futureMap.remove(correlationId);
    if (future != null) {
    future.complete(replyMessage.getPayload());
    }
    }
    }
    }

    使用 sendRequest 方法发送请求时,返回一个 CompletableFuture,当回复到达时,handleReply 方法会完成该 CompletableFuture

生命周期

  1. 消息创建 消息通常通过 MessageBuilder 或直接通过其实现类(如 GenericMessage)进行创建。消息包含一个 payload (消息主体)和可选的 headers (消息头)。
  2. 消息发送 消息被发送到一个 MessageChannel。Spring Integration 提供了多种消息通道实现,如 DirectChannelQueueChannel 等。
  3. 路由消息 消息通过 MessageChannel 被路由到一个或多个消息处理器(Message Handler)。路由器(Router)可以根据消息头或消息体的内容将消息路由到不同的通道。
  4. 处理消息 消息处理器(Message Handler)接收消息并执行相应的处理逻辑。处理器可以是一个 @ServiceActivator 注解的方法,也可以是其他类型的处理器。
  5. 转换消息 消息转换器(Transformer)可以在消息处理过程中更改消息的格式或内容。
  6. 答复消息 某些消息处理器可能会生成一个新的消息作为响应,并将其发送到一个 replyChannel
  7. 结束 消息的生命周期在其被处理完成并且不再需要进一步处理时结束。这通常意味着消息已经被处理器消费,或者已经被成功路由到外部系统。

参考

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码适合的读者,略微了解SpringB - 掘金