Solace spring cloud stream support
Currently the project uses RabbitMQ with message channels, not functions. I want to replace RabbitMQ with Solace, so what changes are specific to Solace?
Information carried in message headers
Re-processing from the DLQ in Solace
Feasibility and approach for duplicate handling
Parallelism limits on Kafka vs. Solace
Request mechanism for queue creation in Solace
Dynamic routing feature support in Solace
Exchange type support in Solace
I need a sample YAML file for an auto-bound DLQ, and I need to know what changes are required to listen to that queue. I need one DLQ created for each queue, so that a message that could not be delivered after the maximum number of attempts automatically goes to the DLQ. Do we also need to create channels for this?
Best Answer
Hi @akg17,
That's a lot of info. In general the Solace binder supports the "Persistent Publish-Subscribe" and the "Consumer Group" patterns as defined by the Cloud Stream framework, so assuming you don't have any logic in your apps that was written specifically for Rabbit or Kafka the migration should be pretty smooth.
I would start off by looking at the docs for the Solace Cloud Stream Binder here. Among other things, this page covers initial information around message headers, dynamic routing (StreamBridge or TARGET_DESTINATION headers), Failed Consumer Message Error Handling (Internal Solace DLQ, but also an Error Queue) as well as Consumer Concurrency which should tackle your parallelism approach.
As for your question about "Exchange Type support": in Solace we don't have the concept of an "Exchange". In Solace, a message is published to a topic (which is really just metadata on the message), and these topics can be subscribed to by Endpoints, usually a Queue Endpoint, which holds those messages until they are consumed. I would highly recommend reading up on Solace Topics and Queues in our docs (maybe start with Core Concepts) and/or checking out @Aaron's video here. It is important to spend the time to understand topics and how Solace dynamically routes them to get the most value out of your EDA in the long run, so don't skip this part.
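To make the topic and queue relationship concrete, here is a minimal in-memory sketch. It is a toy model, not the Solace API: the class name, the queue names, and the `orders/...` topics are all made up for illustration, and it only does exact topic matching. It shows the one idea from the paragraph above: publishers target a topic, and every queue with a matching subscription receives its own copy of the message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy in-memory model for illustration only; this is NOT the Solace API.
class TopicRoutingSketch {

    // Queue name -> topic subscriptions attached to that queue.
    private final Map<String, Set<String>> subscriptions = new LinkedHashMap<>();
    // Queue name -> messages held until a consumer reads them.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    void createQueue(String queueName, String... topics) {
        subscriptions.put(queueName, new LinkedHashSet<>(List.of(topics)));
        queues.put(queueName, new ArrayDeque<>());
    }

    // Publishing targets a topic, never a queue. Every queue whose
    // subscriptions contain the topic (exact match only here) gets a copy.
    List<String> publish(String topic, String payload) {
        List<String> deliveredTo = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : subscriptions.entrySet()) {
            if (entry.getValue().contains(topic)) {
                queues.get(entry.getKey()).add(payload);
                deliveredTo.add(entry.getKey());
            }
        }
        return deliveredTo;
    }

    // Returns the oldest message held by the queue, or null if it is empty.
    String consume(String queueName) {
        return queues.get(queueName).poll();
    }

    public static void main(String[] args) {
        TopicRoutingSketch broker = new TopicRoutingSketch();
        // Hypothetical queues and topics.
        broker.createQueue("billing-queue", "orders/created");
        broker.createQueue("audit-queue", "orders/created", "orders/cancelled");

        System.out.println(broker.publish("orders/created", "order-1"));   // [billing-queue, audit-queue]
        System.out.println(broker.publish("orders/cancelled", "order-2")); // [audit-queue]
        System.out.println(broker.consume("billing-queue"));               // order-1
    }
}
```

There is no exchange object anywhere in the model: the subscription on the queue is what decides which messages it attracts.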
For "Request mechanism for Queue creation in solace" the Cloud Stream binder will automatically try to create the queue and apply the topic subscriptions by default. This is great in a development environment, but in pre-prod or production environments teams usually use SEMPv2 which is our RESTful management API to create the queues ahead of time. The binder then has flags (such as provisionDurableQueue) to tell the Cloud Stream apps to not try to re-create them.
I would also check out our Cloud Stream Samples in this repo.
That's a lot of info to get started but go ahead and dig in and let us know where you need help.
Answers
@marc The docs mention that your ACL profile should allow the message to be republished to the error queue. What is the ACL profile? I just tried it and got this exception:
403: Subscription ACL Denied - Queue '#P2P/QTMP/v:9fa142dd699e/scst/an/03c7328a-4dbb-4998-86dc-6b825c4b35b5/plain/txError' - Topic 'txError'
at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.createErrorResponseFromSmpFailure
o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Failed to add subscription of txError to queue #P2P/QTMP/v:9fa142dd699e/scst/an/7762ecac-24e2-423d-8fd5-12c9f0d1e5f7/plain/txError; nested exception is ((Client name: IDMAWH-L254704/81092/#001e0001/PpFmUdxVvo Local addr: 127.0.0.1 Local port: 65192 Remote addr: localhost Remote port: 55555) - ) com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Subscription ACL Denied - Queue '#P2P/QTMP/v:9fa142dd699e/scst/an/7762ecac-24e2-423d-8fd5-12c9f0d1e5f7/plain/txError' - Topic 'txError' [Subcode:27]
at com.solace.spring.cloud.stream.binder.provisioning.Solac
Hi @akg17, it looks like the app is getting denied when trying to add the txError topic subscription to the queue. I'm assuming this is your spring.cloud.stream.bindings.<name-in-0>.destination value? The destination that you specify on the binding is used to create the queue name (per the queue naming convention), but it is also the first topic subscription added to the queue.
Your client-username has an acl-profile assigned to it, which allows for fine-grained access control of the username. You can read more info about it here. And you can edit it in the PubSub+ Manager under the "Access Control" menu. Note the "Subscribe Default Action" setting at the top. If it's "Allow", then whatever topics you add below are the ones your client can't subscribe to. If it's "Disallow", then you'll want to add the ones you want the client to be able to subscribe to.
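The "default action plus exceptions" rule described above can be sketched as a small decision function. This is only an illustration of the logic, not broker code or a Solace API: the class, enum, and method names are hypothetical, and the sketch compares topics by exact string match only.

```java
import java.util.Set;

// Illustration of the "Subscribe Default Action" rule; NOT a Solace API.
class SubscribeAclSketch {

    enum DefaultAction { ALLOW, DISALLOW }

    // With ALLOW, the listed exceptions are the topics that are denied.
    // With DISALLOW, the listed exceptions are the only topics permitted.
    static boolean maySubscribe(DefaultAction defaultAction, Set<String> exceptions, String topic) {
        boolean isException = exceptions.contains(topic);
        return defaultAction == DefaultAction.ALLOW ? !isException : isException;
    }

    public static void main(String[] args) {
        System.out.println(maySubscribe(DefaultAction.ALLOW, Set.of(), "txError"));             // true
        System.out.println(maySubscribe(DefaultAction.ALLOW, Set.of("txError"), "txError"));    // false
        System.out.println(maySubscribe(DefaultAction.DISALLOW, Set.of("txError"), "txError")); // true
        System.out.println(maySubscribe(DefaultAction.DISALLOW, Set.of(), "txError"));          // false
    }
}
```

Read against the error in this thread, the denied item is the topic named in the message (`txError`), so that topic is what the profile's subscribe rules need to permit.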
403: Subscription ACL Denied - Queue '#P2P/QTMP/v:9fa142dd699e/scst/an/36b17c13-ad4a-4175-a1fa-ccbc44642a03/plain/txError' - Topic 'txError'
@marc The queue name keeps changing every time I start the app. Should I add txError or the full queue name mentioned above? Even if I add the full name it still won't work, because it changes every time. Also, there is something called smf
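A note on why the name changes: the two kinds of queue names quoted in this thread ('.../scst/an/<uuid>/plain/txError' here and 'scst/wk/txGroup/plain/txError' further down) suggest that a binding without a consumer group gets an anonymous queue whose name contains a freshly generated id, while a binding with a group gets a stable name. The sketch below only mimics that pattern as inferred from those log lines. It is an assumption, not the binder's actual code, it omits the '#P2P/QTMP/...' prefix seen on the temporary queues, and the class and method names are hypothetical.

```java
import java.util.UUID;

// Mimics the queue-name pattern seen in this thread's log lines.
// Inferred for illustration; NOT the binder's real implementation.
class QueueNameSketch {

    static String queueName(String group, String destination) {
        boolean anonymous = (group == null || group.isBlank());
        // "an" appears in the names without a group, "wk" in the one with group txGroup.
        String kind = anonymous ? "an" : "wk";
        // Without a group, a new random id is used on every start.
        String groupPart = anonymous ? UUID.randomUUID().toString() : group;
        return "scst/" + kind + "/" + groupPart + "/plain/" + destination;
    }

    public static void main(String[] args) {
        // Stable across restarts because the group is fixed.
        System.out.println(queueName("txGroup", "txError")); // scst/wk/txGroup/plain/txError
        // Different on every call because the id is random.
        System.out.println(queueName(null, "txError"));
        System.out.println(queueName(null, "txError"));
    }
}
```

In both cases the topic subscription being added is still just the destination (`txError`), which matches the topic named in the 403 message.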
@marc Creating a function named uppercase supplies to uppercase-out-0 and consumes from uppercase-in-0.
I have two different microservices, where one produces to a topic and the other consumes from it. I created one topic for each microservice so that messages can be exchanged: A sends a transaction to B (A -> B), and B then sends an acknowledgement back to A (B -> A). Using channels this was easy for me. How do functions work in this scenario?
The function name becomes the topic name, for example transaction: A's supplier produces to transaction-out-0 and B's consumer function consumes from transaction-in-0. When acknowledging, B's supplier sends back to transaction-out-0, but B is also a consumer service, so I need to produce to another topic, and consumer A will read from that topic. I am confused about how this works across multiple microservices. I believe each one has its own topic and subscribes to the others'.
If the supplier sends to transaction-out-0 and the consumer reads from transaction-in-0, how do those match when the supplier and consumer are two different microservices?
Consumer function ->
spring:
  cloud:
    stream:
      function:
        bindings:
          uppercase-in-0:
            destination: uppercase
Supplier function ->
spring:
  cloud:
    stream:
      function:
        bindings:
          uppercase-out-0:
            destination: uppercase
What changes are needed to match the destination, so that the supplier produces to uppercase and the consumer consumes from uppercase? FYI, both are different microservices.
@Bean
public Consumer<String> uppercase() {
    return v -> {
        System.out.println("Uppercasing: " + v);
    };
}

@Bean
public Supplier<String> uppercase() {
    return () -> "hello world";
}
The default action should be "Allow"; only then can we publish and subscribe without any restrictions (this is a local project). I have 3 queues created, of which 2 work and 1 does not. The last one is the error queue, which is created for autoBindErrorQueue:
bindings:
  output:
    consumer:
      group: txGroup
      provisionDurableQueue: true
      autoBindErrorQueue: true
      errorQueueNameOverride: txError
      durableSubscription: true
As you can see, I am not able to subscribe to this error queue. When an exception occurs on the main queue, the message is published to the error queue, but I just can't subscribe to it because of the error 403: Subscription ACL Denied - Queue 'scst/wk/txGroup/plain/txError' - Topic 'txError'. As I mentioned above, the default action is "Allow", so why can't I subscribe to this error queue? @marc @hong
FYI, messages are queued in the error queue, but it has 0 subscriptions (ACL access denied).
Please let me know how to resolve this error.
@akg17 said:
Consumer function ->
spring:
  cloud:
    stream:
      function:
        bindings:
          uppercase-in-0:
            destination: uppercase
Supplier function ->
spring:
  cloud:
    stream:
      function:
        bindings:
          uppercase-out-0:
            destination: uppercase
What changes are needed to match the destination, so that the supplier produces to uppercase and the consumer consumes from uppercase? FYI, both are different microservices.
@Bean
public Consumer<String> uppercase() {
    return v -> {
        System.out.println("Uppercasing: " + v);
    };
}

@Bean
public Supplier<String> uppercase() {
    return () -> "hello world";
}

Hi @akg17,
In the Solace Cloud Stream binder it's important to keep in mind that it is always publishing to topics and consuming from queues. So your Supplier config above would publish to the uppercase topic, and your Consumer config above would actually create a queue that subscribes to the uppercase topic. I would expect that your Consumer is actually receiving the events sent by your publisher.
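The point that the two services meet at the destination, not at the binding name, can be sketched with plain `java.util.function` types. This is a toy simulation with no Spring or Solace involved. The binding names follow the `<functionName>-out-0` / `<functionName>-in-0` pattern from the posts above; the class name and the in-memory queue are hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Toy simulation: two services that share nothing except a destination name.
class DestinationMatchSketch {

    // Service A (producer) configuration: binding name -> destination (topic).
    static final Map<String, String> SERVICE_A_BINDINGS = Map.of("uppercase-out-0", "uppercase");
    // Service B (consumer) configuration: binding name -> destination (topic).
    static final Map<String, String> SERVICE_B_BINDINGS = Map.of("uppercase-in-0", "uppercase");

    static String run() {
        // The two functions from the posts above, one per service.
        Supplier<String> supplier = () -> "hello world";
        StringBuilder received = new StringBuilder();
        Consumer<String> consumer = v -> received.append("Uppercasing: ").append(v);

        // The binding names differ; only the destinations are compared.
        String publishTopic = SERVICE_A_BINDINGS.get("uppercase-out-0");
        String subscribedTopic = SERVICE_B_BINDINGS.get("uppercase-in-0");

        // Stand-in for the queue created for service B, subscribed to its destination.
        Queue<String> serviceBQueue = new ArrayDeque<>();
        String message = supplier.get();
        if (publishTopic.equals(subscribedTopic)) {
            serviceBQueue.add(message);
        }

        String next = serviceBQueue.poll();
        if (next != null) {
            consumer.accept(next);
        }
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // Uppercasing: hello world
    }
}
```

For the A -> B and B -> A acknowledgement flow asked about earlier, the same idea would apply in the other direction: B's output binding and A's input binding would share a second destination.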
Hi @akg17,
@akg17 said:
As you can see, I am not able to subscribe to this error queue. When an exception occurs on the main queue, the message is published to the error queue, but I just can't subscribe to it because of the error 403: Subscription ACL Denied - Queue 'scst/wk/txGroup/plain/txError' - Topic 'txError'. As I mentioned above, the default action is "Allow", so why can't I subscribe to this error queue? @marc @hong
You lost me a bit! Can you elaborate on when this situation occurs? Are you saying that your app is indeed publishing failed messages to the txError queue successfully, but you can't get another app to listen to it? Or are you getting this error in the app that is trying to publish the message to the error queue, and it's not working?