Solace spring cloud stream support

akg17
akg17 Member Posts: 76

Currently the project is making use of rabbit mq and message channels not functions i want to replace the rabbit mq with solace so what are the changes specific to solace.
Information carrying in message header
Re-processing from DLQ in SOLACE
feasibility and approach for duplicate handling
parallelism limits on Kafka vs SOLACE
Request mechanism for Queue creation in solace
Dynamic Routing feature support in SOLACE
Exchange type support in SOLACE
Need one sample yaml file for autobind dlq and what are the changes need to make to listen to this queue. Because i need one dlq created for each queue so if the message could not deliver and max attempts did not work then it should automatically go to dlq also do we need to create channels for this,

Best Answer

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 959 admin
    #2 Answer ✓

    Hi @akg17,

    That's a lot of info. In general the Solace binder supports the "Persistent Publish-Subscribe" and the "Consumer Group" patterns as defined by the Cloud Stream framework, so assuming you don't have any logic in your apps that was written specifically for Rabbit or Kafka the migration should be pretty smooth.

    I would start off by looking at the docs for the Solace Cloud Stream Binder here. Among other things, this page covers initial information around message headers, dynamic routing (StreamBridge or TARGET_DESTINATION headers), Failed Consumer Message Error Handling (Internal Solace DLQ, but also an Error Queue) as well as Consumer Concurrency which should tackle your parallelism approach.

    As far as your question around "Exchange Type support" in Solace I we don't have the concept of an "Exchange". In Solace a message is published to a topic (which is really just metadata on the message) and these topics can be subscribed to by Endpoints, usually a Queue Endpoint which holds those messages to be consumed. I would highly recommend reading up on Solace Topics and Queues in our docs (maybe start with Core Concepts) and/or checking out @Aaron's video here. It is important to spend the time to understand topics and how Solace dynamically routes them to get the most value out of your EDA in the long run so don't skip this part :)

    For "Request mechanism for Queue creation in solace" the Cloud Stream binder will automatically try to create the queue and apply the topic subscriptions by default. This is great in a development environment, but in pre-prod or production environments teams usually use SEMPv2 which is our RESTful management API to create the queues ahead of time. The binder then has flags (such as provisionDurableQueue) to tell the Cloud Stream apps to not try to re-create them.

    I would also check out our Cloud Stream Samples in this repo.

    That's a lot of info to get started but go ahead and dig in and let us know where you need help.

Answers

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 959 admin
    #3 Answer ✓

    Hi @akg17,

    That's a lot of info. In general the Solace binder supports the "Persistent Publish-Subscribe" and the "Consumer Group" patterns as defined by the Cloud Stream framework, so assuming you don't have any logic in your apps that was written specifically for Rabbit or Kafka the migration should be pretty smooth.

    I would start off by looking at the docs for the Solace Cloud Stream Binder here. Among other things, this page covers initial information around message headers, dynamic routing (StreamBridge or TARGET_DESTINATION headers), Failed Consumer Message Error Handling (Internal Solace DLQ, but also an Error Queue) as well as Consumer Concurrency which should tackle your parallelism approach.

    As far as your question around "Exchange Type support" in Solace I we don't have the concept of an "Exchange". In Solace a message is published to a topic (which is really just metadata on the message) and these topics can be subscribed to by Endpoints, usually a Queue Endpoint which holds those messages to be consumed. I would highly recommend reading up on Solace Topics and Queues in our docs (maybe start with Core Concepts) and/or checking out @Aaron's video here. It is important to spend the time to understand topics and how Solace dynamically routes them to get the most value out of your EDA in the long run so don't skip this part :)

    For "Request mechanism for Queue creation in solace" the Cloud Stream binder will automatically try to create the queue and apply the topic subscriptions by default. This is great in a development environment, but in pre-prod or production environments teams usually use SEMPv2 which is our RESTful management API to create the queues ahead of time. The binder then has flags (such as provisionDurableQueue) to tell the Cloud Stream apps to not try to re-create them.

    I would also check out our Cloud Stream Samples in this repo.

    That's a lot of info to get started but go ahead and dig in and let us know where you need help.

  • akg17
    akg17 Member Posts: 76

    @marc In doc its mentioned that your AC profile should allow the message to be republished in error queue what is the acl profile because i just tried it and got one exception

    403: Subscription ACL Denied - Queue '#P2P/QTMP/v:9fa142dd699e/scst/an/03c7328a-4dbb-4998-86dc-6b825c4b35b5/plain/txError' - Topic 'txError'
    at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.createErrorResponseFromSmpFailure

    =====================================================================================================
    o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds

    org.springframework.cloud.stream.provisioning.ProvisioningException: Failed to add subscription of txError to queue #P2P/QTMP/v:9fa142dd699e/scst/an/7762ecac-24e2-423d-8fd5-12c9f0d1e5f7/plain/txError; nested exception is ((Client name: IDMAWH-L254704/81092/#001e0001/PpFmUdxVvo Local addr: 127.0.0.1 Local port: 65192 Remote addr: localhost Remote port: 55555) - ) com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Subscription ACL Denied - Queue '#P2P/QTMP/v:9fa142dd699e/scst/an/7762ecac-24e2-423d-8fd5-12c9f0d1e5f7/plain/txError' - Topic 'txError' [Subcode:27]
    at com.solace.spring.cloud.stream.binder.provisioning.Solac

  • akg17
    akg17 Member Posts: 76

    Please find the log file

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 959 admin
    edited May 2021 #6

    Hi @akg17, it looks like the app is getting denied when trying to add the txError topic subscription to the queue. I'm assuming this is your spring.cloud.stream.bindings.<name-in-0>.destination value? The destination that you specify on the binding is used to create the queue name (per the queue naming convention) but it also is the first topic subscription added to the queue.

    Your client-username has an acl-profile assigned to it which allows for fine grained access control of the username. You can read more info about it here. And you can edit it in the PubSub+ Manager under the "Access Control" menu. Note the "Subscribe Default Action" setting at the top. If it's "Allow" then whatever topics you add below are what your client can't subscribe to. If it's "Disallow" then you'll want to add the ones you want the client to be able to subscribe to.

  • akg17
    akg17 Member Posts: 76

    403: Subscription ACL Denied - Queue '#P2P/QTMP/v:9fa142dd699e/scst/an/36b17c13-ad4a-4175-a1fa-ccbc44642a03/plain/txError' - Topic 'txError'

    @marc the queue name keep on changing ever
    y time i start should i add the txError or full queue name mentioned above even if i add full name still wont work because that is changing everytime. also there is somthing called smf

  • akg17
    akg17 Member Posts: 76
    edited May 2021 #8

    @marc Creating function uppercase supplies to uppercase-out-0 and consumer from uppercase-in-0
    When i have two different microservices where one produces to one topic and another consumes from the topic so i created topic one for each microservice so that the message can be exchange like send transaction from A -> B and then acknowledge from B->A back. Using channels it was easy for me how does the function work in this scenario.
    function name will be topic like transaction - A -supplier will produce to transaction-out-0 and B consumer function consumes from transaction-in-0 and when ack back B supplier sends back to transaction-out-0 but then this is also one cosumer service so i need to produce to another topic and consumer A will read from that topic. I am confused here how does this work in multiple microservices i believe each has its own topic and then subscribe to rest other,
    supply to topic transaction-out-0 and consumer from transaction-in-0 how does that match if supplier and consumer are two diff microservice.

  • akg17
    akg17 Member Posts: 76
    edited May 2021 #9

    Consumer function ->
    spring:
    cloud:
    stream:
    function:
    bindings:
    uppercase-in-0:
    destination: uppercase

    Supplier function->
    spring:
    cloud:
    stream:
    function:
    bindings:
    uppercase-out-0:
    destination: uppercase

    what changes need to match the destination so supplier produces to uppercase and consumer consumes from uppercase and FYI both are different microservices

    @Bean
    public Consumer<String> uppercase(){
        return v -> {
            System.out.println("Uppercasing: " + v);
        };
    }
    @Bean
    public Supplier<String> uppercase() {
        return ( )-> "hello world";
    }
    
  • akg17
    akg17 Member Posts: 76
    edited May 2021 #10

    default action should be allowed then only we will be able to publish subscribe without any restrictions (local project ) and i have 3 queues created where 2 works 1 does not because last one is error queue its created for autoBindErrorQueue

    bindings:
              output:
                consumer:
                  group: txGroup
                  provisionDurableQueue: true
                  autoBindErrorQueue: true
                  errorQueueNameOverride: txError
                  durableSubscription: true
    

    as you can see i am not able to subscribe to this error queue but when exception occurs in main queue the message was published to it i just can subscribe to it because of the error 403: Subscription ACL Denied - Queue 'scst/wk/txGroup/plain/txError' - Topic 'txError' but as i mentioned above default action is allowed but why then i cant subscribe to this error queue @marc @hong

    FYI messages are queued in error queue but there is 0 subscription ACl -access denied
    please let me know how to resolve this error.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 959 admin

    @akg17 said:
    Consumer function ->
    spring:
    cloud:
    stream:
    function:
    bindings:
    uppercase-in-0:
    destination: uppercase

    Supplier function->
    spring:
    cloud:
    stream:
    function:
    bindings:
    uppercase-out-0:
    destination: uppercase

    what changes need to match the destination so supplier produces to uppercase and consumer consumes from uppercase and FYI both are different microservices

    @Bean
    public Consumer<String> uppercase(){
        return v -> {
            System.out.println("Uppercasing: " + v);
        };
    }
    

    @Bean
    public Supplier uppercase() {
    return ( )-> "hello world";
    }

    Hi @akg17,
    In the Solace Cloud Stream binder it's important to keep in mind that it is always publishing to topics and consuming from queues. So your Supplier config above would publish to the uppercase topic and your Consumer config above would actually create a queue that subscribes to the uppercase topic. I would expect that your Consumer is actually receiving the events sent by your publisher.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 959 admin

    Hi @akg17,

    as you can see i am not able to subscribe to this error queue but when exception occurs in main queue the message was published to it i just can subscribe to it because of the error 403: Subscription ACL Denied - Queue 'scst/wk/txGroup/plain/txError' - Topic 'txError' but as i mentioned above default action is allowed but why then i cant subscribe to this error queue @marc @hong

    You lost me a bit! Can you elaborate on when this situation occurs? Are you saying that your app is indeed publishing failed messages to the txError queue successfully, but you can't get another app to listen to it? or are you getting this error in the app that is trying to publish the message to the error queue and it's not working?

  • akg17
    akg17 Member Posts: 76

    @marc I was able to publish and subscribe, in ACL profile i enabled default action allow, so i see the main queue was able to republish the message and i added the listener to subscribe, Probably i need look for upper regions how to create and assign profiles. Thanks.