Spring cloud stream

akg17
akg17 Member Posts: 76

In case when we siphon the message from an error queue we dont get from which queue it was thrown like rabbit does it by proving original queue name, also how to get all the headers i am using jmsTemplate or jndiTempated i see property to gather there is not separate method to just get all the headers, also there is method to get headers by property name but what if i dont know the property name and my only purpose it to get all headers and send it to somewhere else

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin

    Hi @akg17,
    Can you share what you are trying to use the original queue name for? That might help me (or others) come up with an idea for how to accomplish your goal. Right now the "destination" header contains the topic (or queue) the message was published to...so if you're talking about the error queue that the Cloud Stream binder can autobind and use then it would be publishing directly to that queue and the default queue naming pattern would be useable to figure out the original queue name as an example. There are other options as well though, you could also do custom error handling and add a custom property if desired. Another option would of course be maybe having an enhancement to the solace cloud stream binder that adds the header as well :)

    If you want access to the headers just specify that you want to receive the Message object (org.springframework.messaging.Message) itself. Once you have that object you can then access all the headers using the getHeaders()method.

    For example:

        @Bean
        public Function<Message<String>, String> uppercase() {
            return v -> {
                // Get Payload
                            System.out.println("Uppercasing: " + v.getPayload());
                            // Get all headers
                System.out.println("Headers: " + v.getHeaders());
                            // Get specific header "marc"
                System.out.println("destination: " + v.getHeaders().get("marc"));
                return v.getPayload().toUpperCase();
            };
    
  • akg17
    akg17 Member Posts: 76

    2021-05-21 15:57:14.237 INFO 14880 --- [pool-5-thread-1] c.s.s.c.s.b.u.SolaceErrorMessageHandler : Processing message 1ccb2a8f-f483-3e53-de8a-f2d1b21d2f89
    2021-05-21 15:57:14.238 INFO 14880 --- [pool-5-thread-1] backFactory$JCSMPAcknowledgementCallback : XMLMessage 1: Will be republished onto error queue output_dlq
    2021-05-21 15:57:14.238 INFO 14880 --- [pool-5-thread-1] .s.b.u.ErrorQueueRepublishCorrelationKey : Republishing XMLMessage 1 to error queue output_dlq - attempt 1 of 3

  • akg17
    akg17 Member Posts: 76

    Here when the exception occurs in main queue and it republish the message to dlq it is sending in xmlMessage,
    i have simple use case - (Siphon And Reprocess Message)
    1. Read Message from the queue not using listener but using any consumer need help how to do that, tried couple of ways
    like making use of jmsTemplate and also making use of solace consumer piece of code.
    ```
    InitialContext initialContext = null;
    SolSession session = null;
    session = (SolSession) solConnection.createSession(true, 0);
    session.start();
    initialContext = (InitialContext) jndiTemplate.getContext();
    Queue dlqQueue = (Queue) initialContext.lookup(dlq);
    MessageConsumer consumer = session.createConsumer(dlqQueue);
    TextMessage textMessage = (TextMessage) consumer.receive();

    In both scenario i was not able to case to spring Message as you mentioned  to get all headers, then i had to case to javax.jms
    even if i manage to get all headers, still the problem is with the message type that i am not able to read its of type ByteMessage error - java.lang.AbstractMethodError: com.solacesystems.jms.message.SolBytesMessage.getBody(Ljava/lang/Class;)Ljava/lang/Object;
    And here i am stuck i have to save this message along with headers to data base that could be used for reprocessing, 
    2. Reprocessing - I need to drop the same message back to original queue that is why i asked you about anyway if i manage 
    to get original queue name from dlq name i need to set all headers and drop message. I tried 
          ```
      jmsTemplate.convertAndSend(getOriginalQueueName("output_dlq"),
                    message.getBody(InputStream.class),
                    m -> {
                        Collections.list(message.getPropertyNames()).forEach(key -> {
                            try {
                                m.setStringProperty(key.toString(), message.getStringProperty(key.toString()));
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        });
    
                        return m;
                    }); 
    
      private String getOriginalQueueName(String dlqName) {
            return dlqName.substring(0, dlqName.length() - 4);
        }
    

    I am just stuck with this, any other info do you need.

  • akg17
    akg17 Member Posts: 76

    Simple use case is Siphoning and Reprocessing of Message - (FYI used spring cloud Stream)
    1. As you see above when exception was thrown from original queue the message was republished to an error queue, need to read the message from the queue not using listener but manually creating consumer. Tried couple of ways using jmsTemplate and also using solace consumer

            InitialContext initialContext = null;
            SolSession session = null;
            session = (SolSession) solConnection.createSession(true, 0);
            session.start();
            initialContext = (InitialContext) jndiTemplate.getContext();
            Queue dlqQueue = (Queue) initialContext.lookup("output_dlq");
            MessageConsumer consumer = session.createConsumer(dlqQueue);
            TextMessage message = (TextMessage) consumer.receive();
    

    once i get the message i need to save the message payload and all headers to DB for reprocessing purpose later,
    2. Reprocessing- I can get the original queue name from the dlq naming convention and then i need to set all header and payload back to Message and drop it to original queue. Something like

            jmsTemplate.convertAndSend(getOriginalQueueName("output_dlq"),
                    message.getBody(String.class), // tried converting to pojo, string, etc 
                    m -> {
                        Collections.list(message.getPropertyNames()).forEach(key -> {
                            try {
                                m.setStringProperty(key.toString(), message.getStringProperty(key.toString()));
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        });
    
                        return m;
                    });
       private String getOriginalQueueName(String dlqName) {
            return dlqName.substring(0, dlqName.length() - 4);
        }
    

    I am just trying to figure out the approach to accomplish this with solace, The problem i got using above code is when consumed message using consumer and try to read and drop i got errors because message is in byte format
    java.lang.AbstractMethodError: com.solacesystems.jms.message.SolBytesMessage.getBody(Ljava/lang/Class;)Ljava/lang/Object;

  • akg17
    akg17 Member Posts: 76

    @marc can you please have a look here,