Skip to main content
 首页 » 编程设计

spring-integration中Spring 集成 DSL Scatter-Gather 多个收件人流的异步/并行执行

2025年05月04日85虾米姐

我们正在尝试使用分散-聚集对不同的接收者进行并行调用,并且效果很好。但是,除非第一个收件人流程完成(在 Zipkin 中跟踪),否则第二个收件人流程不会启动。有没有一种方法可以使所有接收者异步。与执行者 channel 的拆分聚合非常相似。

public IntegrationFlow flow1() { 
 
        return flow -> flow 
                .split().channel(c -> c.executor(Executors.newCachedThreadPool())) 
                .scatterGather( 
                        scatterer -> scatterer 
                                .applySequence(true) 
                                .recipientFlow(flow2()) 
                                .recipientFlow(flow3()) 
                                .recipientFlow(flow4()) 
                                .recipientFlow(flow5()), 
                        gatherer -> gatherer 
                                .outputProcessor(messageGroup -> { 
                                    Object request = gatherResponse(messageGroup); 
                                    return createResponse(request); 
                                })) 
                .aggregate(); 
    } 

flow2()、flow3()、flow4() 方法是以 InterationFlow 作为返回类型的方法。

示例代码flow2():

public IntegrationFlow flow2() { 
        return integrationFlowDefinition -> integrationFlowDefinition 
                .enrichHeaders( 
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)) 
                .transform(ele -> createRequest1(ele))                   
                .wireTap("asyncXMLLogging") 
                .handle(wsGateway.applyAsHandler(endpoint1)) 
                .transform( 
                        ele -> response2(ele)); 
    } 

请您参考如下方法:

通过提到的执行器 channel 确实可以实现这一点。所有接收者流都必须真正从 ExecutorChannel 开始。在您的情况下,您必须将它们全部修改为如下所示:

public IntegrationFlow flow2() { 
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor())) 
            .enrichHeaders( 
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)) 
            .transform(ele -> createRequest1(ele))                   
            .wireTap("asyncXMLLogging") 
            .handle(wsGateway.applyAsHandler(endpoint1)) 
            .transform( 
                    ele -> response2(ele)) 
            .get(); 
} 

注意IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))。这正是使每个子流异步的方法。

更新

对于没有对子流程进行 IntegrationFlow 改进的旧 Spring Integration 版本,我们可以这样做:

public IntegrationFlow flow2() { 
    return integrationFlowDefinition -> integrationFlowDefinition 
            .channel(c -> c.executor(Executors.newCachedThreadPool())) 
            .enrichHeaders( 
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)) 
            .transform(ele -> createRequest1(ele))                   
            .wireTap("asyncXMLLogging") 
            .handle(wsGateway.applyAsHandler(endpoint1)) 
            .transform( 
                    ele -> response2(ele)); 
} 

这与您在上面的评论中显示的内容类似。