Failed to consume a message from destination Queue

Kaliappans
Kaliappans Member Posts: 24
edited April 2023 in PubSub+ Event Broker #1

I am using Spring Cloud Stream 2021.0.2 to consume messages from a queue. When messages are sent, I can see them being spooled continuously in the queue, but they never reach the application for processing because of the exception below. Please help.


org.springframework.messaging.MessagingException: Incorrect type specified for header 'deliveryAttempt'. Expected [class java.util.concurrent.atomic.AtomicInteger] but actual type is [class java.lang.String]; nested exception is java.lang.IllegalArgumentException: Incorrect type specified for header 'deliveryAttempt'. Expected [class java.util.concurrent.atomic.AtomicInteger] but actual type is [class java.lang.String]


2023-04-13 16:21:39.086 WARN 27352 --- [pool-3-thread-5] s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination QUE_173707_PLATFORM_EVENT_LOGGING_DEV - attempt 3
2023-04-13 16:21:39.086 WARN 27352 --- [pool-3-thread-5] s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination QUE_173707_PLATFORM_EVENT_LOGGING_DEV - attempt 3
2023-04-13 16:21:39.086 WARN 27352 --- [pool-3-thread-5] s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination QUE_173707_PLATFORM_EVENT_LOGGING_DEV - attempt 3
2023-04-13 16:21:39.086 WARN 27352 --- [pool-3-thread-5] s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination QUE_173707_PLATFORM_EVENT_LOGGING_DEV - attempt 3
2023-04-13 16:21:39.087 WARN 27352 --- [pool-3-thread-5] s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination QUE_173707_PLATFORM_EVENT_LOGGING_DEV - attempt 3
2023-04-13 16:21:39.087 ERROR 27352 --- [pool-3-thread-5] o.s.integration.handler.LoggingHandler  : org.springframework.messaging.MessagingException: Incorrect type specified for header 'deliveryAttempt'. Expected [class java.util.concurrent.atomic.AtomicInteger] but actual type is [class java.lang.String]; nested exception is java.lang.IllegalArgumentException: Incorrect type specified for header 'deliveryAttempt'. Expected [class java.util.concurrent.atomic.AtomicInteger] but actual type is [class java.lang.String]


Answers

  • Kaliappans
    Kaliappans Member Posts: 24

    FYI.. we are NOT setting value for 'deliveryAttempt' anywhere within our microservice

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 652 admin

    Hi @Kaliappans, that's interesting. Looking quickly at the code for the binder (which is here: https://github.com/SolaceProducts/solace-spring-cloud), I can see a couple of references to deliveryAttempt that definitely make it look like it's expecting an AtomicInteger. Who/what is publishing these messages into your queue?

    Can you dump one of the messages to the console and post it here (hiding any sensitive data if necessary!)? The easiest way to do this is with SdkPerf, which you can download from https://solace.com/downloads/?fwp_downloads_types=other (I'd choose Java). Then just run it with something like this for a basic consumer that echoes to the console:

    ./sdkperf_java.sh -cip=brokerip -cu=username@vpnname -cp=password -sql=queuename -md
    

    "sql" is the queue to bind to, and "md" means message dump.
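
The type mismatch reported in the exception can be reproduced in isolation. The sketch below is a simplified stand-in written only to mimic the wording of that exception; `TypedHeaderLookup` and `getHeader` are hypothetical names, and this is not the binder's actual code. It assumes the header value is looked up with an expected type and rejected when the stored object is of a different class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class TypedHeaderLookup {

    // Hypothetical typed accessor: returns the header only if it has the expected type.
    static <T> T getHeader(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException("Incorrect type specified for header '" + key
                    + "'. Expected [" + type + "] but actual type is [" + value.getClass() + "]");
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();

        // A counter object stored under the key passes the typed lookup.
        headers.put("deliveryAttempt", new AtomicInteger(1));
        System.out.println(getHeader(headers, "deliveryAttempt", AtomicInteger.class));

        // The same key holding text (for example, a value that arrived with the message) fails it.
        headers.put("deliveryAttempt", "1");
        try {
            getHeader(headers, "deliveryAttempt", AtomicInteger.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the real cause matches this sketch, the String value would have to come from the published message itself, which is why knowing the publisher matters.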


  • marc
    marc Member, Administrator, Moderator, Employee Posts: 966 admin
    edited April 2023 #4

    Hi @Kaliappans,

    This is essentially your Spring app saying "I am getting an invalid message per the constraints that I have set." It expects a certain POJO but can't convert the incoming payload into it. So maybe your publisher and consumer are using different objects, different versions of the same object, or have different constraints?

    FYI - the Spring Cloud Stream reference guide talks about how you can add custom message converters if that is necessary: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#content-type-management

    Hope that helps!

  • Kaliappans
    Kaliappans Member Posts: 24

    @Aaron I tried with SdkPerf, but I get the error below when executing your command. We are using a truststore and keystore for authentication. Should I change anything in that command?


    com.solacesystems.pubsub.sdkperf.core.PubSubException: Error while connecting.
        at com.solacesystems.pubsub.sdkperf.core.AbstractClientCollection.connect(AbstractClientCollection.java:473)
        at com.solacesystems.pubsub.sdkperf.SDKPerf_java.run(SDKPerf_java.java:208)
        at com.solacesystems.pubsub.sdkperf.SDKPerf_java.main(SDKPerf_java.java:119)
    Caused by: ((Client name: H17NDCFI13P0120/3676/00110001/GxGUIAu0EF  Local addr: 10.8.233.95 Local port: 56369  Remote addr: 10.47.158.91 Remote port: 55555) - ) com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Basic Authentication Is Shutdown [Subcode:58]
        at com.solacesystems.jcsmp.protocol.impl.TcpChannel.executePostOnce(TcpChannel.java:235)
        at com.solacesystems.jcsmp.protocol.impl.ChannelOpStrategyClient.performOpen(ChannelOpStrategyClient.java:101)
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.performOpenSingle(TcpClientChannel.java:428)
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.access$800(TcpClientChannel.java:124)
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel$ClientChannelConnect.call(TcpClientChannel.java:2610)
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.open(TcpClientChannel.java:404)
        at com.solacesystems.jcsmp.impl.JCSMPBasicSession.sniffRouter(JCSMPBasicSession.java:423)
        at com.solacesystems.jcsmp.impl.JCSMPBasicSession.getMessageProducer(JCSMPBasicSession.java:772)
        at com.solacesystems.pubsub.sdkperf.jcsmpcore.JcsmpClient.connect(JcsmpClient.java:1252)
        at com.solacesystems.pubsub.sdkperf.core.AbstractClientCollection.connect(AbstractClientCollection.java:471)

  • Kaliappans
    Kaliappans Member Posts: 24

    @Aaron @marc I am using the Message type from the Spring Framework to consume messages at the receiving end. That Message class has only two fields, payload and headers. Below is the data; please let me know if anything is wrong here.


    Payload: {"httpResponse":200,"responsePayload":{"messageId":"123456789","cancelCard":[{"datastoreId":"12533789","vcaId":"111111","createCardRequest":false}],"correlationId":"1111-2222-3333-4444"}}



    headers: {app_accept-encoding=gzip, deflate, br, country=USA, req-sys-id=12533789, solace_replyTo=#P2P/v:solace-tts1-cti-us-1u/8G29FQx5/service-name/209/00250002/xL5yCzgPq-/_, endDateTimestamp=2023-04-18_17:59:27:748+0530, app_connection=keep-alive, app_channelid=Citi VCA Online, app_user-agent=PostmanRuntime/7.28.1, target-protocol=kafka, app_host=hostname, threadId=65, jvmId=25000, app_postman-token=ba2a0d0c-ad6e-4d2b-b0e6-0efc77, solace_discardIndication=false, source-type=kafka, id=84ede132-c7e1-6ac5-73c1, product-name=card, api-sys-id=ebb3-4d7b-9201-1f9a8350281c, contentType=text/plain;charset=UTF-8, listenerTopicName=citi/tts/cards/choreo/mvca/datastore/v1/resp/cancelvca, order=null, App_ServiceFlowCount=5, solace_scst_messageVersion=1, solace_expiration=0, deliveryAttempt=1, solace_replicationGroupMessageId=rmid1:0a57b-5a7f5257557-00000000-f02ba02c, startDateTimestamp=2023-04-18_17:59:27:743+0530, versionDetails={"description":"description","tag":"tag","version":"0.0.0"}, listenerQueueName=DEV-QUEUE, solace_receiveTimestamp=0, timeDuration=5, stacktrace=, app_accept=*/*, region=NAM, status=SUCCESS, publishTopicName=#P2P/v:solace-tts1-cti-us-1u/8G29FQx5/service-name/209/00250002/xL5yCzgPq-/_, solace_destination=topic-name, app_content-length=210, app_api-req-id=123456789, App_OriginalRequestPayload={"messageId":"123456789","correlationId":"ebb3-4d7b-9201","clientId":"TestClient","cardNetworkType":"Backend","region":"NAM","country":"USA","getCardImage":false,"requestType":"CANCEL","channelId":"Online","citiScreeningRequired":false,"createCardRequest":false,"errorMessage":[],"cancelCard":[{"datastoreId":"0123a","vcaId":"123456","createCardRequest":false,"errorMessage":[]}],"validationSuccess":true}, skip-input-type-conversion=false, app_content-type=application/json, sourceHost=H17NFNFT13P0394, solace_dmqEligible=true, solace_priority=4, solace_correlationId=0b309a51-ebb3-4d7b-9201-1f9a8350281c, applicationName=Test-app, exceptionMessage=, 
timestamp=1681820967744, solace_timeToLive=0, x-citiportal-apim-client-id=mVCA_Test, binderName=choreography, acknowledgmentCallback=com.solace.spring.cloud.stream.binder.util.JCSMPAcknowledgementCallbackFactory$JCSMPAcknowledgementCallback@414f6f6, environment=local, Backend={}, singularityheader=appId=1303*ctrlguid=1681618009*acctguid=3d227243-46a9-4474-bc47-0ab43e91e045*ts=1681820967755*btname=MessageHandler.handleMessage*btcomp=2465728*bttype=POJO*bttypestr=POJO*mctype=auto*mcvalue=*guid=439d96bb-31a9-483b-b35f-cb976fc730df*exitguid=23*unresolvedexitid=8921447*cidfrom=2465728*etypeorder=CUSTOM*esubtype=CUSTOM*cidto={[UNRESOLVED][8921447]}*donotresolve=true, solace_redelivered=false}

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 652 admin

    Hi @Kaliappans. For SdkPerf with truststore, please see this recent thread: https://solace.community/discussion/comment/5902#Comment_5902

    In your list of headers, I can see deliveryAttempt:

    solace_scst_messageVersion=1,
    solace_expiration=0,
    deliveryAttempt=1,
    solace_replicationGroupMessageId=rmid1:0a57b-5a7f5257557-00000000-f02ba02c,
    startDateTimestamp=2023-04-18_17:59:27:743+0530,
    

    However, I still can't tell whether it's a String or an int, since nothing is quoted.

    Who / what kind of app is publishing this data?

    It would still be very helpful if you can get an SdkPerf dump of the message contents.
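
Because a plain dump of the headers prints `deliveryAttempt=1` the same way for a String and a number, one way to settle the question is to print each header's runtime class next to its value. The following is a minimal sketch under that assumption; `HeaderTypeDump` and `describe` are hypothetical names, and the sample values in `main` are illustrative only, not taken from a real message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderTypeDump {

    // Renders each header as "name=value (runtime class)", one entry per line.
    static String describe(Map<String, ?> headers) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, ?> entry : headers.entrySet()) {
            Object value = entry.getValue();
            String type = (value == null) ? "null" : value.getClass().getName();
            out.append(entry.getKey()).append('=').append(value)
               .append(" (").append(type).append(")\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Illustrative values only; a real consumer would pass its received headers.
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("solace_expiration", 0L);
        headers.put("deliveryAttempt", "1");
        System.out.print(describe(headers));
    }
}
```

In a Spring consumer, the map returned by `message.getHeaders()` could be passed to `describe` directly, since Spring's `MessageHeaders` implements `Map<String, Object>`.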