Sample code to publish a message to a topic using the Spring Cloud Stream API
Can someone please provide a sample yml configuration for publishing a message to a topic using the Spring Cloud Stream API?
Answers
-
Solace samples for Spring Cloud Stream (SCS) are hosted here: https://solace.com/samples/solace-samples-spring/spring-cloud-streams/
Here is the application.yml from one of the source samples, which sends to the “sensor/temperature/fahrenheit” topic: https://github.com/SolaceSamples/solace-samples-spring/blob/master/cloud-streams-source/src/main/resources/application.yml
-
Thank you for your answer.
My requirement is a little different. I am not using @EnableBinding(Source.class). I am using a yml file similar to the one in https://github.com/SolaceProducts/solace-spring-cloud/issues/7
I have multiple functions, such as process1 and process2, which consume messages from different queues. After each function processes a message, it has to publish the corresponding response message to a topic that is created dynamically. So I need a common class that I can use to publish a message just by passing in the topic name and the message. Is there any configuration I can use for this, similar to the way we consume messages using function names?
-
Hi @matthesu,
Great question! Over the past few months we've been working with the engineering team at Spring to improve how Spring Cloud Stream (SCSt) handles publishing to dynamic destinations. There are going to be two ways of doing this:
1. [This works as of v3.0.5 of Spring Cloud Stream, released at the end of May] You set up a Consumer function and send to dynamic topics by injecting a StreamBridge object into your function and sending with it. StreamBridge is built into SCSt and is smart enough to find the configured binder for you. It also caches a configurable number of channels, controlled by the spring.cloud.stream.dynamic-destination-cache-size property, which has a default of 10.

    @Bean
    Consumer<String> myConsumerFunction(StreamBridge streamBridge) {
        return input -> {
            // getMyTopicUsingLogic is a placeholder for your own routing logic
            String topic = getMyTopicUsingLogic(input);
            streamBridge.send(topic, input);
        };
    }

2. [This will be available in the next Solace Binder release] A new SCSt header was reserved in v3.0.5. This header, "scst_targetDestination", tells the binder (if the binder supports it) to ignore the default destination the function is bound to and instead send to the destination specified by the header. Once this is available, you would configure your SCSt yml with a "default" destination and then override it for each message, or only when you need to. The benefit of this route is that the SCSt framework doesn't create channels, cache them, and so on, so if you're always going to be sending to a different topic I would recommend this option once it's available.

    @Bean
    public Function<Message<String>, Message<String>> functionName() {
        return input -> {
            String topic = getMyTopicUsingLogic(input);
            // Build and return the outbound message; the header overrides the default destination
            return MessageBuilder.withPayload("whatever")
                    .setHeader("scst_targetDestination", topic)
                    .build();
        };
    }
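The header-based override in option 2 comes down to a simple rule: use the header value when it is present, otherwise fall back to the destination configured for the binding. The sketch below models that rule in plain Java, with a Map standing in for the message headers so it runs without Spring on the classpath. TargetDestinationResolver and resolve are made-up names for illustration; only the header name scst_targetDestination comes from the text above, and the binder's actual resolution logic may well differ.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper, not part of Spring Cloud Stream or the Solace binder.
final class TargetDestinationResolver {

    // Header name quoted in the post above.
    static final String TARGET_DESTINATION_HEADER = "scst_targetDestination";

    private TargetDestinationResolver() {
    }

    // Returns the per-message override if the header holds a non-blank string,
    // otherwise the default destination configured for the binding.
    static String resolve(Map<String, ?> headers, String defaultDestination) {
        Object override = headers.get(TARGET_DESTINATION_HEADER);
        if (override instanceof String && !((String) override).trim().isEmpty()) {
            return (String) override;
        }
        return defaultDestination;
    }

    public static void main(String[] args) {
        // Header present: the override wins.
        System.out.println(resolve(
                Collections.singletonMap(TARGET_DESTINATION_HEADER, "topic/dynamic"), "topic/out"));
        // Header absent: the default destination is used.
        System.out.println(resolve(Collections.emptyMap(), "topic/out"));
    }
}
```

Running main prints topic/dynamic followed by topic/out, which mirrors the "default destination, overridden only when needed" behaviour described for the header.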
Hope that helps! Once our binder supports this we'll be rolling out more examples around this as well
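For option 1, the "common class" asked for earlier in the thread and the bounded channel cache can be pictured together. This is only a rough sketch in plain Java: a BiConsumer stands in for streamBridge.send(topic, payload) so it runs without Spring, DynamicTopicPublisher and its members are hypothetical names, and the eviction policy (drop the oldest entry once the limit is exceeded) is an assumption for illustration rather than a description of StreamBridge internals.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical wrapper; in a real application the sender would be streamBridge::send.
final class DynamicTopicPublisher {

    private final BiConsumer<String, String> sender;
    // Per-topic state kept in a bounded map, standing in for cached channels.
    private final Map<String, Integer> sendCountByTopic;

    DynamicTopicPublisher(BiConsumer<String, String> sender, final int cacheSize) {
        this.sender = sender;
        this.sendCountByTopic = new LinkedHashMap<String, Integer>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                // Evict the oldest topic once the configured size is exceeded.
                return size() > cacheSize;
            }
        };
    }

    // Publishes the payload to the given topic and records the topic in the cache.
    void publish(String topic, String payload) {
        sendCountByTopic.merge(topic, 1, Integer::sum);
        sender.accept(topic, payload);
    }

    boolean isCached(String topic) {
        return sendCountByTopic.containsKey(topic);
    }

    public static void main(String[] args) {
        DynamicTopicPublisher publisher = new DynamicTopicPublisher(
                (topic, payload) -> System.out.println(topic + " <- " + payload), 2);
        publisher.publish("response/process1", "done");
        publisher.publish("response/process2", "done");
        publisher.publish("response/process3", "done");
        // With a cache size of 2, the first topic has been evicted by now.
        System.out.println(publisher.isCached("response/process1"));
    }
}
```

Functions such as process1 and process2 would each call publish with whatever topic their logic produces. Every message is still sent; the cache size only limits how many topics keep their state around.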
-Marc
-
Thank you @marc for your response. I would like to use option 1 from your reply, but I am not able to find details about the StreamBridge object anywhere, and I would like to learn more about it. Any documentation, sample configuration, or code showing how to use it would be greatly appreciated. I am using the Maven dependency spring-cloud-starter-stream-solace, version 2.0.1.
I am not sure which version you were referring to in your comment (v3.0.5).
-
Hi @matthesu,
Sorry about that. It's in v3.0.5+ of the Spring Cloud Stream framework itself. I don't see that Spring has added any samples yet, but you can see it used in some of the unit tests here
More Info:
To get the Spring Cloud Stream version that includes StreamBridge, your pom should include something like the following. Spring's versioning can definitely be confusing sometimes, but my understanding is that the Spring Cloud umbrella version Hoxton.SR5 should include the latest Spring Cloud Stream version, 3.0.5+.

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <spring-cloud.version>Hoxton.SR5</spring-cloud.version>
        <solace-spring-cloud-bom.version>1.0.1</solace-spring-cloud-bom.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.solace.spring.cloud</groupId>
                <artifactId>solace-spring-cloud-bom</artifactId>
                <version>${solace-spring-cloud-bom.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.solace.spring.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-solace</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
-Marc
-
Thank you @marc
In the meantime, I was trying to implement my project using an imperative Function. Can you please provide a sample configuration yml file for the sample application below? I am not able to find a complete working application together with its configuration.

    import java.util.function.Function;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class SampleApplication {

        public static void main(String[] args) {
            SpringApplication.run(SampleApplication.class, args);
        }

        @Bean
        public Function<String, String> uppercase() {
            return value -> {
                System.out.println("Received: " + value);
                return value.toUpperCase();
            };
        }
    }
-
Sure @matthesu
Here you go! Note this is using the consumer group pattern in Spring Cloud Stream, so it will create a queue called queue.group that has a topic subscription of topic/in applied to it. So the app will process events coming in on topic/in and then they will be published to topic/out.

    spring:
      cloud:
        stream:
          function:
            definition: uppercase
          bindings:
            uppercase-in-0:
              destination: queue
              group: group
            uppercase-out-0:
              destination: topic/out
          solace:
            bindings:
              uppercase-in-0:
                consumer:
                  queueAdditionalSubscriptions: topic/in
          binders:
            solace-development:
              type: solace
              environment:
                solace:
                  java:
                    host: tcp://localhost:55555
                    msgVpn: default
                    clientUsername: default
                    clientPassword: default
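The binding names in this configuration follow Spring Cloud Stream's functional naming convention, functionName-in-index for inputs and functionName-out-index for outputs, which is why the uppercase bean maps to uppercase-in-0 and uppercase-out-0. Here is a small sketch of that convention in plain Java. BindingNames is a hypothetical helper, not a framework class, and the queue name rule (destination, a dot, then the group) is simply taken from the description in this post.

```java
// Hypothetical helper that spells out the naming rules used in the yml above.
final class BindingNames {

    private BindingNames() {
    }

    // Input binding name for a function, e.g. uppercase-in-0.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding name for a function, e.g. uppercase-out-0.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Queue name for a consumer group as described in this post: destination.group.
    static String consumerGroupQueue(String destination, String group) {
        return destination + "." + group;
    }

    public static void main(String[] args) {
        System.out.println(input("uppercase", 0));
        System.out.println(output("uppercase", 0));
        System.out.println(consumerGroupQueue("queue", "group"));
    }
}
```

Running main prints uppercase-in-0, uppercase-out-0 and queue.group, matching the binding keys and the queue name mentioned above.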
-
Is there a complete sample unit test class for testing the consumer group pattern in Spring Cloud Stream? The one that is available is partial and does not run properly.
Basically, I am looking for a complete unit test class for the sample app with the uppercase function that I mentioned above.
-
@matthesu you want to actually test your code!?!?!
You're correct, I don't see a great sample for this; once we figure it out I'll work to get it added. Following what they have in the docs, I came up with the code below, which seems to work for me if I have my local broker running. However, I haven't been able to get it to use the "new test binder which uses Spring Integration framework as an in-JVM Message Broker" they mentioned in the docs, so let me know if you figure that part out! And also if this gives you what you want.

    @Test
    public void sampleTest() {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration.getCompleteConfiguration(SampleApplication.class))
                .run("--spring.cloud.function.definition=uppercase")) {
            InputDestination source = context.getBean(InputDestination.class);
            OutputDestination target = context.getBean(OutputDestination.class);
            source.send(new GenericMessage<byte[]>("hello".getBytes()));
            assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
        }
    }
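Because uppercase() is an ordinary java.util.function.Function, its logic can also be checked without a Spring context, a binder, or a broker. The sketch below assumes nothing beyond the JDK; UppercaseFunctionCheck is a hypothetical name, and the function body is copied from the sample application earlier in the thread, minus the log line. It does not exercise bindings or the consumer group, so it complements the binder-based test rather than replacing it.

```java
import java.util.function.Function;

// Hypothetical standalone check; no Spring, binder, or broker involved.
final class UppercaseFunctionCheck {

    private UppercaseFunctionCheck() {
    }

    // Same transformation as the uppercase bean in the sample application.
    static Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

    public static void main(String[] args) {
        String result = uppercase().apply("hello");
        if (!"HELLO".equals(result)) {
            throw new AssertionError("expected HELLO but got " + result);
        }
        System.out.println("uppercase check passed");
    }
}
```

If this passes while the context-based test still fails to start, the problem is more likely in the test dependencies or configuration than in the function itself.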
-Marc
-
Thanks @marc, I was using the same code for testing and was not able to make it run properly. IntelliJ just reports "Error running myTestFunction" after trying to resolve Maven dependencies. I think it has something to do with the dependencies I am using. Can you please share your pom.xml file?
Thanks a lot for your help.