Message is not sent to the error queue for retrying

gds1174 Member Posts: 2
edited March 2023 in General Discussions #1
stream:
  bindings:
    message-in-0:
      destination: mysubscriberqueue
      binder: some-binder
      group: somegroup
      consumer:
    pub-out-0:
      destination: myerrortopicname
      binder: some-binder
      group: somegroup
    retry-in-0:
      destination: myerrorqueuename
      binder: some-binder
      group: somegroup
      consumer:
        concurrency: 1

My use case:

I subscribe to messages from mysubscriberqueue. When processing of a message fails, I want the message to go to the retry queue "myerrorqueuename", from which I will try to consume it again later.


I am using the following binding properties:

errorQueueNameOverride: "myerrorqueuename"
queueAdditionalSubscriptions: "myerrortopicname"
autoBindErrorQueue: true
errorQueueAccessType: 0
errorQueuePermission: 4
errorQueueRespectsMsgTtl: true
errorQueueMaxDeliveryAttempts: 1
errorQueueMaxMsgRedelivery: 1
errorMsgDmqEligible: true
errorQueueDiscardBehaviour: 1
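
For reference, here is a minimal sketch of how these consumer properties would typically be nested in application.yml for the Solace binder. The post does not show the enclosing keys, so attaching them to the message-in-0 binding is an assumption; the values are copied from the list above.

```yaml
spring:
  cloud:
    stream:
      solace:
        bindings:
          message-in-0:            # assumed: the binding whose failures should reach the error queue
            consumer:
              autoBindErrorQueue: true
              errorQueueNameOverride: myerrorqueuename
              queueAdditionalSubscriptions: myerrortopicname
              errorQueueAccessType: 0
              errorQueuePermission: 4
              errorQueueRespectsMsgTtl: true
              errorQueueMaxDeliveryAttempts: 1
              errorQueueMaxMsgRedelivery: 1
              errorMsgDmqEligible: true
              errorQueueDiscardBehaviour: 1
```

If the properties sit under a different binding, or directly under the generic spring.cloud.stream.bindings section instead of the Solace-specific one, the binder may not pick them up.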


"code snippet below"

Inside the Mono, auto-ack is disabled:

.doOnNext(disableAutoAck())

On a RuntimeException, the message is rejected for retry and Mono.error is returned:

.onErrorResume((Throwable throwable) -> {
    rejectMessage().accept(message);
    return Mono.error(ClientException.builder
        .get()
        .withMessage("my error message" + message
            .getPayload()
            .getHeader()
            .getTransactionid())
        .build());
})


I always see the stack trace below, and the message does not go back to the retry queue, which is "myerrorqueuename" in my case:

reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:127)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4440)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at


{"timestamp":"2023-03-17 14:02:22.703","level":"ERROR","thread":"boundedElastic-44","logger":"reactor.core.publisher.Operators","message":{"input":"Operator called default onErrorDropped"},"context":"default","exception":"reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: my errorMsg\r\nCaused by: java.lang.RuntimeException: my errorMsg\r\n\tat com.myproject.notification.message.function.FunctionConfiguration.lambda$message$3(FunctionConfiguration.java:69)\r\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\r\n\tat reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onError(MonoSubscribeOn.java:152)\r\n\tat reactor.core.publisher.Operators.error(Operators.java:198)\r\n\tat reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:135)\r\n\tat reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)\r\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4440)\r\n\tat reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)\r\n\tat 


Please advise whether the above scenario should work with this configuration, and why I get the errors above. Thanks in advance.

Comments

  • gds1174
    gds1174 Member Posts: 2

    OK, so I have reconfigured my queues on Solace and message processing is now successful. I also hit the error "Publish ACL Denied - Topic '#P2P/QUE/myQueue'", which I resolved by adding the topic name to the ACL exceptions.

    The retry issue is resolved.

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 538 admin

    Thanks for your update, @gds1174!