Sample code to publish a message to a topic using spring cloud stream api

matthesu
matthesu Unconfirmed, Member Posts: 13

Can someone please provide me a sample yml configuration used to publish a message to a topic using spring cloud stream api?

Answers

  • arih
    arih Member, Employee Posts: 125 Solace Employee

    The Solace samples for Spring Cloud Stream are hosted here: https://solace.com/samples/solace-samples-spring/spring-cloud-streams/

    Here is the application.yml from one of the source samples: https://github.com/SolaceSamples/solace-samples-spring/blob/master/cloud-streams-source/src/main/resources/application.yml. It publishes to the “sensor/temperature/fahrenheit” topic.

  • matthesu
    matthesu Unconfirmed, Member Posts: 13

    Thank you for your answer.
    My requirement is a little different. I am not using @EnableBinding(Source.class); I am using a yml file similar to the one in https://github.com/SolaceProducts/solace-spring-cloud/issues/7
    I have multiple functions, such as process1 and process2, each of which consumes messages from a different queue. After a function processes a message, it has to publish the corresponding response message to a dynamically created topic. So I need a common class that I can use to publish a message just by passing the topic name and the message. Is there any configuration I can use for this, similar to the way we consume messages using function names?

  • arih
    arih Member, Employee Posts: 125 Solace Employee

    I'm not aware of a way to build a common class like that. My other samples use a Supplier, but they still have the topic name defined in the application yml file.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin
    edited June 2020 #7

    Hi @matthesu,
    Great question! We've actually been working with the engineering team over at Spring to improve how Spring Cloud Stream (SCSt) handles publishing to dynamic destinations just over the past few months.

    There are going to be two ways of doing this:
    1. [This works as of v3.0.5 of Spring Cloud Stream, released at the end of May] Basically you set up a Consumer function and then send to dynamic topics by injecting a StreamBridge object into your function and sending with it. StreamBridge is built into SCSt and is smart enough to find the configured binder for you. It also caches a configurable number of channels, controlled by the spring.cloud.stream.dynamic-destination-cache-size property, which defaults to 10.

        @Bean
        Consumer<String> myConsumerFunction(StreamBridge streamBridge) {
            return input -> {
                String topic = getMyTopicUsingLogic(input);
                streamBridge.send(topic, input);
            };
        }
    
    2. [This will be available in the next Solace Binder release] A new SCSt header was reserved in v3.0.5. This header, "scst_targetDestination", tells the binder (if the binder supports it) to ignore the default destination the function is bound to and send to the destination specified by the header instead. So once this is available you would configure your SCSt yml with a "default" destination and then override it for each message, or only when you need to. The benefit of this route is that the SCSt framework doesn't create channels, cache them, and so on, so if you're always going to be sending to a different topic I would recommend this option once it's available.

        @Bean
        public Function<Message<String>, Message<String>> functionName() {
            return input -> {
                String topic = getMyTopicUsingLogic(input);
                // The header overrides the binding's default destination for this message.
                return MessageBuilder.withPayload("whatever")
                        .setHeader("scst_targetDestination", topic)
                        .build();
            };
        }
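
    To make the difference between the two options concrete, here is a minimal plain-Java sketch with no Spring dependency. It is not how Spring Cloud Stream or the Solace binder implement either feature: the class and method names are hypothetical, and the least-recently-used eviction is an assumption, since the post only says that a configurable number of channels is cached.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: these names do not come from Spring Cloud Stream or the Solace binder.
class DynamicDestinationSketch {

    // Header name quoted in the post above.
    static final String TARGET_DESTINATION_HEADER = "scst_targetDestination";

    // Option 2: if the header is present it wins, otherwise the configured default is used.
    static String resolveDestination(Map<String, Object> headers, String defaultDestination) {
        Object override = headers.get(TARGET_DESTINATION_HEADER);
        return (override != null) ? override.toString() : defaultDestination;
    }

    // Option 1: a bounded cache keyed by destination name.
    // Assumption: the least recently used entry is evicted once maxSize is exceeded.
    static <V> Map<String, V> boundedCache(int maxSize) {
        return new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxSize;
            }
        };
    }
}
```

    With a cache of size 10 and a different topic per message, almost every send would miss the cache and create a new entry, which is why the header-based route is suggested above for that case.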
    

    Hope that helps! Once our binder supports this we'll be rolling out more examples around this as well :)
    -Marc

  • matthesu
    matthesu Unconfirmed, Member Posts: 13

    Thank you @marc for your response. I would like to use option 1 in your reply, but I am not able to find details about the StreamBridge object anywhere, and I would like to learn more about it. Any documentation, sample configuration, or code showing how to use it would be greatly appreciated. I am using the Maven dependency spring-cloud-starter-stream-solace, version 2.0.1.
    I am not sure which version you were referring to in your comment (v3.0.5).

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin
    edited June 2020 #9

    Hi @matthesu ,
    Sorry about that. It's in v3.0.5+ of the Spring Cloud Stream framework itself. I don't see that Spring has added any samples yet, but you can see it used in some of the unit tests here

    More Info:

    To get a Spring Cloud Stream version that includes StreamBridge, your pom should include something like the following. Spring's versioning can definitely be confusing sometimes, but my understanding is that the Spring Cloud umbrella version Hoxton.SR5 should include Spring Cloud Stream 3.0.5+.

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.8.RELEASE</version>
            <relativePath /> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <spring-cloud.version>Hoxton.SR5</spring-cloud.version>
            <solace-spring-cloud-bom.version>1.0.1</solace-spring-cloud-bom.version>
        </properties>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
                <dependency>
                    <groupId>com.solace.spring.cloud</groupId>
                    <artifactId>solace-spring-cloud-bom</artifactId>
                    <version>${solace-spring-cloud-bom.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <dependencies>
            <dependency>
                <groupId>com.solace.spring.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-solace</artifactId>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    

    -Marc

  • matthesu
    matthesu Unconfirmed, Member Posts: 13

    When is the next Solace Binder release planned for?

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin

    Hi @matthesu,
    I believe it will be released sometime next week. It's Canada Day, so our R&D team, which is based in Ottawa, is out of the office, but I'll be able to get a more specific date for you soon :)
    -Marc

  • matthesu
    matthesu Unconfirmed, Member Posts: 13

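    Thank you @marc.
    In the meantime I was trying to implement my project using an imperative Function. Can you please provide a sample configuration yml file for the sample application below? I am not able to find a complete working application with its configuration.

        import java.util.function.Function;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        import org.springframework.context.annotation.Bean;

        @SpringBootApplication
        public class SampleApplication {

            public static void main(String[] args) {
                SpringApplication.run(SampleApplication.class, args);
            }

            // Bound by Spring Cloud Stream: consumes a String and publishes the upper-cased value.
            @Bean
            public Function<String, String> uppercase() {
                return value -> {
                    System.out.println("Received: " + value);
                    return value.toUpperCase();
                };
            }
        }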
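
    For a function bean named uppercase, Spring Cloud Stream derives the binding names uppercase-in-0 and uppercase-out-0, and each binding takes a destination (plus an optional consumer group on the input side). The plain-Java sketch below only spells out those property keys as "--key=value" arguments, the same form used for spring.cloud.function.definition elsewhere in this thread; the same keys can be nested in an application.yml. The helper class and the destination and group values are hypothetical, and any Solace-specific binder settings are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds "--key=value" arguments for a single-function application.
class BindingArgsSketch {

    static List<String> bindingArgs(String functionName, String inputDestination,
                                    String consumerGroup, String outputDestination) {
        // Functional bindings are named <function>-in-<index> and <function>-out-<index>.
        String prefix = "--spring.cloud.stream.bindings." + functionName;
        List<String> args = new ArrayList<>();
        args.add("--spring.cloud.function.definition=" + functionName);
        args.add(prefix + "-in-0.destination=" + inputDestination);
        args.add(prefix + "-in-0.group=" + consumerGroup);
        args.add(prefix + "-out-0.destination=" + outputDestination);
        return args;
    }
}
```

    For example, bindingArgs("uppercase", "my/input/topic", "myGroup", "my/output/topic") yields four arguments that could be passed to SpringApplication.run alongside SampleApplication.class.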

  • matthesu
    matthesu Unconfirmed, Member Posts: 13

    Thank you very much @marc . It worked as expected :smiley:

  • matthesu
    matthesu Unconfirmed, Member Posts: 13

    Is there a complete sample unit test class for testing the consumer group pattern in Spring Cloud Stream? The one that is available is partial and does not run properly.
    Basically I am looking for a complete unit test class for the sample app with the uppercase function that I mentioned above.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin

    @matthesu you want to actually test your code!?!?! :D

    You're correct I don't see a great sample for this; once we figure it out I'll work to get it added. Following what they have in the docs I came up with the code below which seems to work for me if I have my local broker running. However, I haven't been able to get it to use the "new test binder which uses Spring Integration framework as an in-JVM Message Broker" they mentioned in the docs so let me know if you figure that part out! And also if this gives you what you want.

        @Test
        public void sampleTest() {
            try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                    TestChannelBinderConfiguration.getCompleteConfiguration(SampleApplication.class)).run(
                            "--spring.cloud.function.definition=uppercase")) {
                InputDestination source = context.getBean(InputDestination.class);
                OutputDestination target = context.getBean(OutputDestination.class);
                source.send(new GenericMessage<byte[]>("hello".getBytes()));
                assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
            }
        }
    

    -Marc

  • matthesu
    matthesu Unconfirmed, Member Posts: 13

    Thanks @marc, I was using the same code for testing and was not able to make it run properly. IntelliJ just reports "Error running myTestFunction" after trying to resolve the Maven dependencies. I think it has something to do with the dependencies I am using. Can you please share your pom.xml file?
    Thanks a lot for your help.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin

    Hi @matthesu,
    I believe I was using this pom when I tested, but you might want to actually switch the versions to the latest which should be:

        <properties>
            <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
            <solace-spring-cloud-bom.version>1.0.1</solace-spring-cloud-bom.version>
        </properties>
    
  • matthesu
    matthesu Unconfirmed, Member Posts: 13
    #19 Answer ✓

    Thank you @marc . It is working for me now.