我注意到 Spring Cloud Stream 响应式(Reactive)和非响应式(Reactive)行为的差异。当我使用非响应式(Reactive)方法时,消息会被一一读取。以下是流监听器的代码片段:
@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
Thread.sleep(2000);
System.out.println(input.toUpperCase());
}
这段代码一条一条地消费消息。我可以从 RabbitMQ 管理站点看到这一点:
重启后,应用程序继续从重启前完成的位置消耗消息。但是使用响应式(Reactive)流监听器服务会立即从队列中获取所有消息。应用程序重新启动后,它不会继续处理消息,因为队列为空。
这里是 react 流监听器的片段:
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
input
.delayElements(Duration.ofSeconds(2))
.map(String::toUpperCase)
.doOnEach(System.out::println)
.subscribe();
}
我想了解为什么会发生这种情况,以及是否可以在处理前面的元素之前停止读取整个队列。这是我对 Reactor 运算符的错误理解还是预期的行为?是否可以以某种方式指定背压?
请您参考如下方法:
由于响应式(Reactive)是非阻塞的; delayElements() 将工作交给另一个线程;这会释放监听器线程,该线程返回到容器并确认消息;因此下一个已交付(并延迟);因此,所有消息最终都会进入调度程序的队列。响应式确实不适合这种用例,除非您使用手动确认,如下所示。
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<Message<String>> input) {
input
.doOnEach(System.out::println)
.delayElements(Duration.ofSeconds(2))
.map(m -> {
return MessageBuilder.withPayload(m.getPayload().toUpperCase())
.copyHeaders(m.getHeaders())
.build();
})
.doOnEach(System.out::println)
.doOnNext(m -> {
try {
m.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicAck(m.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
}
catch (IOException e) {
e.printStackTrace();
}
})
.subscribe();
}
和
spring.cloud.stream.bindings.input.group=foo
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=manual

