Spring cloud bus

akg17
akg17 Member Posts: 76
edited February 2022 in General Discussions #1
spring:
  cloud:
    bus:
      enabled : true

When i enable it, It is creating 2 queues like springCloudBus & error/springCloudBus ( ErrorQueue).

I can change the queue name using spring.cloud.bus.destination = "" but have not control over the error queue, infact i dont want error but no way to stop it.

I cant disable the provisioning of the error queue.

I cant disable the provisioning of the main queue also.

Is there any way i can only create main queue not error queue ?


Is there any dependency for solace spring cloud bus ? i need some kind of better understanding how to use it with solace.



when i disable it it get this error



2022-02-01 15:49:21.896 ERROR 7732 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler  : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@7f452153]; nested exception is com.solace.spring.cloud.stream.binder.util.SolaceMessageConversionException: Invalid payload received. Expected byte[], String, SDTStream, SDTMap, Serializable. Received: org.springframework.boot.actuate.endpoint.web.EndpointServlet, failedMessage=GenericMessage [payload=org.springframework.boot.actuate.endpoint.web.EndpointServlet@3750c564, headers={id=9df21161-7029-1f97-ed6e-bee787e0f7d4, timestamp=1643710761895}]

at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)

at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)

at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)

at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)

at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)

at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)

at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)

at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)

at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)

at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)

at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)

at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)

at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)

at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)

at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)

at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)

at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:445)

at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:429)

at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:377)

at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:324)

at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)

at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)

at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)

at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:321)

at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)

at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: com.solace.spring.cloud.stream.binder.util.SolaceMessageConversionException: Invalid payload received. Expected byte[], String, SDTStream, SDTMap, Serializable. Received: org.springframework.boot.actuate.endpoint.web.EndpointServlet

at com.solace.spring.cloud.stream.binder.util.XMLMessageMapper.map(XMLMessageMapper.java:110)

at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessage(JCSMPOutboundMessageHandler.java:90)

at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1050)

at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:69)

... 40 more

Comments

  • amackenzie
    amackenzie Member, Employee Posts: 262 Solace Employee

    There is a configfuration option in the Solace binder called provisionErrorQueue (solace-spring-cloud/solace-spring-cloud-starters/solace-spring-cloud-stream-starter at master · SolaceProducts/solace-spring-cloud (github.com)).

    Does that help?

    I have never tried Spring Cloud Bus, but the docs say that any binder on the classpath can be used for the destinations. So I would assume you would bring in the Solace Spring Cloud Stream Starter to your project.

    Spring Cloud Bus

  • akg17
    akg17 Member Posts: 76

    @amackenzie provisionErrorQueue i know this property but in case of spring cloud bus there is kind of problems all the producer and consumer property works but ...

    I cant provide errorQueueNameOverride it takes by default error/destination

    I cant configure provisionErrorQueue to false myself, luckily i have set spring.cloud.stream.solace.default.provisionErrorQueue to false so it worked.

    its a direct destination name not binding name if i had binding name i would have all the controls whatever i want but.

    spring:
      cloud:
        bus:
          destination : springCloudBus
    

    It creates 2 queue - springCloudBus & error/springCloudBus

    I don't want error queue How do i stop it ?

    previously i had rabbitmq and it does not auto create error queue for spring cloud bus,

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin
    edited February 2022 #4

    Hi @akg17,

    I haven't personally used Spring Cloud Bus, but I know there are some folks that have used it successfully before. Is there anything else in your config file? It's weird to me that the error queue would be created by default as the autoBindErrorQueue setting defaults to false . I think we basically just need to make sure that it's set to false for that specific binding

    See if your app on startup has a log entry that looks something like this. I believe that part at the end, which says "springCloudBus-in-0" in this case, is the name of your binding.

    2022-02-01 12:32:37.068 INFO 48169 --- [ restartedMain] o.s.i.monitor.IntegrationMBeanExporter  : Registering MessageChannel springCloudBus-in-0
    

    Take the binding name and add this to your config, using your binding name:

    spring.cloud.stream.solace.bindings.springCloudBus-in-0.consumer.autoBindErrorQueue = false
    


    Hope that works for you!

  • akg17
    akg17 Member Posts: 76

    Thanks Mark -

    Bindings are -

    MessageChannel - springCloudBusOutput
    Subscr. Channel - springCloudBusInput
    


    I can configure things and destination also,

  • akg17
    akg17 Member Posts: 76

    Small update needed does solace jms api now support JMS 2.0

    I need to use getBody(Class) method which would consume the byteMessage and return me the payload.

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 531 admin

    Hey @akg17 ! We hosted a talk for our Solace Community Lightning Talks a while back and one of the talks were about using Spring Cloud Bus with Solace. Feel free to check it out below if it answers any of your questions or give you some ideas.


  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin

    Hi @akg17 ,

    I can configure things and destination also,

    Excellent - glad to hear it!

    Small update needed does solace jms api now support JMS 2.0

    I need to use getBody(Class) method which would consume the byteMessage and return me the payload.

    The Solace JMS API is still only JMS 1.1 compliant right now. Can you share more info around what you're having issues with? You should be able to get the msg payload.

    *Note that the Solace Spring Cloud Stream Binder does NOT use JMS.

  • akg17
    akg17 Member Posts: 76
    edited February 2022 #9

    I can get msg payload, I am consuming byte message which was published by solace-spring-cloud-stream.

    Once i consume it i need to convert it myself from byte to String because solace does not implement Jms 2.0 getBody Method.

    #Message.getBody(CLASS) that is not implemented by solace, Its from Jms 2.0.

    This is how i am currently consuming it.

    javax.jms.Message rawMessage = jmsTemplate.receive(jndiName);
    
    Map<String, Object> headers = new HashMap<>();
    if (rawMessage instanceof BytesMessage) {
        SolBytesMessage solMessage = (SolBytesMessage) rawMessage;
        byte[] bytes = new byte[(int) solMessage.getBodyLength()];
        solMessage.readBytes(bytes);
        solMessage.getProperties().forEach((k, v) -> {
                headers.put(k, v);
        });
    String payload = new String(bytes);
    
  • amackenzie
    amackenzie Member, Employee Posts: 262 Solace Employee

    It's not that Solace doesn't implement a JMS2.0 method, it is that the Solace Spring Cloud Stream binder is not written in JMS, it's written in the native Solace API called JCSMP.

  • akg17
    akg17 Member Posts: 76


    One application use Jms (JmsTemplate & JmsListener) , One use Spring cloud stream .

    In my case I need to consumer a TextMessage coming, using stream listener and also I need to publish the message that would be again byte Message and another system has to consume it using JmsListener expecting TextMessage.

    In between these 2 system message format is different how to publish/consume each others message,

    I need to use Jms for inbound/outbound calls or is there any other way. ?

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin

    Hi @akg17,

    Okay, so I think what is happening is as follows (@amackenzie please correct me if I'm wrong ;) ):

    You have a JMS App (App 1) sending a message, which is received and processed by a Spring Cloud Stream app (App 2) which then publishes a new message. That new message is being received by another JMS app (App 3). Correct?

    The Spring Cloud Stream app is publishing a BytesMessage which the Broker is then delivering to the JMS app (App 3) as a SolBytesMessage. It is not a JMS TextMessage, which is why it can't be read as such. . Because of that I think the code you have looks correct, the payload of that SolBytesMessage would be read and then converted to a String for your app to process. Will that not work for you?

    If for some reason you can't change the app that is expecting a JMS TextMessage then you will need to explicitly publish a JMS Text Message using our JMS API. To do that you could either write a pure JMS app instead of using Cloud Stream or you could even receive using Cloud Stream + Send via JMS. Here is an example of an app that actually does that.

    Hope that helps!

  • akg17
    akg17 Member Posts: 76

    You got it right,

    I have to consume a TextMessage - currently using JmsListener

    And I Have to produce a Text Message that is also done by JmsTemplate.

    But in between all apps are spring cloud stream processing message.


    I thought if using spring cloud stream i can consume and publish text message i think it wont work because its publishing and consuming byte message so for external communication i will have to use Jms.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin
    edited February 2022 #14

    I thought if using spring cloud stream i can consume and publish text message i think it wont work because its publishing and consuming byte message so for external communication i will have to use Jms.

    Correct, JMS it is :)

    That's tricky when you're trying to change the thing in the middle. Not much flexibility.

  • akg17
    akg17 Member Posts: 76
    edited February 2022 #15

    I have One question mark .

    If i create a topic name Test and 2 Queue Test1, Test2

    both queue subscribe to Test topic end point.

    Will each queue get replica of message if i publish to topic ?

    what can i do where each queue or consumer get replica of published message.

    within same group only one get it.


    I want 3 microservice to consume to same message from same queue, like replica.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin

    Hi @akg17 ,

    Yes, that's actually the recommended way to implement the publish-subscribe pattern when using guaranteed endpoints. Have your publishers send messages to a topic. Have each consumer (or group of consumers) bind to a queue. And add your topic subscription(s) to each queue. Then when a message is published to a topic it will get put onto every queue that is subscribed to that topic. So in your case both queues will get the same message.