Message is not sent to the error queue for retrying
stream: bindings: message-in-0: destination: mysubscriberqueue binder: some-binder group: somegroup consumer: pub-out-0: destination: myerrortopicname binder: some-binder group: somegroup retry-in-0: destination: myerrorqueuename binder: some-binder group: somegroup consumer: concurrency: 1
My usecase is -
I have subscibed message from mysubscriberqueue and after message is failed
i want the message to go to retry queue "myerrorqueuename" and from there i will
try to consume again later
Using the below bindings properties -
errorQueueNameOverride: "myerrorqueuename"
queueAdditionalSubscriptions: "myerrortopicname"
autoBindErrorQueue: true
errorQueueAccessType: 0
errorQueuePermission: 4
errorQueueRespectsMsgTtl: true
errorQueueMaxDeliveryAttempts: 1
errorQueueMaxMsgRedelivery: 1
errorMsgDmqEligible: true
errorQueueDiscardBehaviour: 1
"code snippet below"
inside mono disableAutoAck
.doOnNext(disableAutoAck())
On RuntimeException - rejecting the msg for retry and return mono.error
.onErrorResume((Throwable throwable) -> { rejectMessage().accept(message); return Mono.error(ClientException.builder .get() .withMessage("my error message" + message .getPayload() .getHeader() .getTransactionid()) .build());
I see the below stack trace always and the message does not go back to the retry queue which is "myerrorqueuename
" in my case -
reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:127)at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)at reactor.core.publisher.Mono.subscribe(Mono.java:4440)at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)at
{"timestamp":"2023-03-17 14:02:22.703","level":"ERROR","thread":"boundedElastic-44","logger":"reactor.core.publisher.Operators","message":{"input":"Operator called default onErrorDropped"},"context":"default","exception":"reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: my errorMsg\r\nCaused by: java.lang.RuntimeException: my errorMsg\r\n\tat com.myproject.notification.message.function.FunctionConfiguration.lambda$message$3(FunctionConfiguration.java:69)\r\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\r\n\tat reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onError(MonoSubscribeOn.java:152)\r\n\tat reactor.core.publisher.Operators.error(Operators.java:198)\r\n\tat reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:135)\r\n\tat reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)\r\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4440)\r\n\tat reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)\r\n\tat
Please suggest if the above scenario should work with this configuration and why i do get above errors, thanks in advance.