Skip to main content
 首页 » 编程设计

spring中响应式Spring Cloud Stream从队列中读取所有数据,但非响应式则一条一条地读取消息

2025年12月25日23soundcode

我注意到 Spring Cloud Stream 响应式(Reactive)和非响应式(Reactive)行为的差异。当我使用非响应式(Reactive)方法时,消息会被一一读取。以下是流监听器的代码片段:

@StreamListener(Processor.INPUT) 
public void receive2(String input) throws InterruptedException { 
    Thread.sleep(2000); 
    System.out.println(input.toUpperCase()); 
} 

这段代码一条一条地消费消息。我可以从 RabbitMQ 管理站点看到这一点:

enter image description here

重启后,应用程序继续从重启前完成的位置消耗消息。但是使用响应式(Reactive)流监听器服务会立即从队列中获取所有消息。应用程序重新启动后,它不会继续处理消息,因为队列为空。

这里是 react 流监听器的片段:

@StreamListener 
public void receive1(@Input(Processor.INPUT) Flux<String> input) { 
    input 
            .delayElements(Duration.ofSeconds(2)) 
            .map(String::toUpperCase) 
            .doOnEach(System.out::println) 
            .subscribe(); 
} 

enter image description here

我想了解为什么会发生这种情况,以及是否可以在处理前面的元素之前停止读取整个队列。这是我对 Reactor 运算符的错误理解还是预期的行为?是否可以以某种方式指定背压?

请您参考如下方法:

由于响应式(Reactive)是非阻塞的; delayElements() 将工作交给另一个线程;这会释放监听器线程,该线程返回到容器并确认消息;因此下一个已交付(并延迟);因此,所有消息最终都会进入调度程序的队列。响应式确实不适合这种用例,除非您使用手动确认,如下所示。

@StreamListener 
public void receive1(@Input(Processor.INPUT) Flux<Message<String>> input) { 
    input 
            .doOnEach(System.out::println) 
            .delayElements(Duration.ofSeconds(2)) 
            .map(m -> { 
                return MessageBuilder.withPayload(m.getPayload().toUpperCase()) 
                        .copyHeaders(m.getHeaders()) 
                        .build(); 
            }) 
            .doOnEach(System.out::println) 
            .doOnNext(m -> { 
                try { 
                    m.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class) 
                        .basicAck(m.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false); 
                } 
                catch (IOException e) { 
                    e.printStackTrace(); 
                } 
            }) 
            .subscribe(); 
} 

spring.cloud.stream.bindings.input.group=foo 
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=manual