Solace JCSMP Streaming - Code Sample

slsbel Member Posts: 13

Hello,

I'm using Solace JCSMP with Apache Camel and working on a POC that reads messages from a Solace queue in streaming mode (read 100 messages, then ack).

Could you please share some sample code that uses the JCSMP API to consume messages in streaming mode?

Thank you!

Comments

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee

    Hi @slsbel,

    Have you seen the SolaceSamples GitHub repo? For instance, QueueConsumer.java implements a streaming consumer. It uses the default persistent window size. To change this to 100 (if you really need to do that), adjust the SUB_ACK_WINDOW_SIZE session property.

  • slsbel
    slsbel Member Posts: 13

    Thank you @TomF. That was very useful!

  • slsbel
    slsbel Member Posts: 13

    Hello @TomF

    What is the appropriate acknowledgment mode when streaming is enabled?

    • Is the following property configuration correct (assuming a streaming window of 5)?
    JCSMPProperties jcsmpProperties = solaceJavaAutoConfiguration.getJCSMPProperties();
    jcsmpProperties.setProperty(JCSMPProperties.SUB_ACK_WINDOW_SIZE, 5);
    jcsmpProperties.setProperty(JCSMPProperties.MESSAGE_ACK_MODE, JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT_WINDOWED);

    • If I'm using streaming acknowledgment, do I need to call the message.acknowledge() method explicitly, or will the broker delete the messages automatically once the service has successfully consumed 5 messages?

    Thank you!

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee

    Hi @slsbel,

    Unless you have a good reason to change it, I would use the default for the SUB_ACK_WINDOW_SIZE. You should also set MESSAGE_ACK_MODE to SUPPORTED_MESSAGE_ACK_CLIENT, and only acknowledge messages when you've completely finished processing them.

    MESSAGE_ACK_MODE relates to publishers, not consumers.

  • slsbel
    slsbel Member Posts: 13

    Thanks for your reply.

    What I'm trying to achieve is to read messages from the queue in bulk. For example, I want to read 10 messages at a time and acknowledge them to the broker only after all 10 have been processed, so that they are removed from the queue together.

    I think setting the ack mode to SUPPORTED_MESSAGE_ACK_CLIENT will delete the messages one by one as I invoke msg.acknowledge() on each (that's more like the JMS way), and that's not what I'm looking for. I think the JCSMP API allows me to delete the messages only after consuming the last message in the window (SUB_ACK_WINDOW_SIZE), but I'm not sure how to implement that.

    Thank you!

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee

    The API reads messages in bulk from the queue using a windowing mechanism similar to TCP. The API then presents the messages to the application one by one. When you call acknowledge(), the API also batches the acknowledgements back to the broker. This is all managed for you by the API.

    You can acknowledge messages in any order you like, and only the messages you acknowledge will be deleted from the queue.

    Are you trying to optimise performance by doing batched read and acknowledge?
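
Since acknowledgements can be sent in any order and at any time, one way to get "process n, then ack n" with client acknowledgement is to buffer messages in the application and acknowledge them only after the whole batch has been processed. The following is a minimal sketch of that idea in plain Java, not code from the SolaceSamples repo. `Ackable` and `BatchAcker` are hypothetical names introduced for illustration; `Ackable.ack()` stands in for the per-message acknowledge call on a received JCSMP message (`ackMessage()` in JCSMP).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchAckSketch {

    /** Hypothetical stand-in for a received message that can be acknowledged. */
    public interface Ackable {
        void ack();
    }

    /** Buffers messages and acknowledges them only after a full batch was processed. */
    public static final class BatchAcker<M extends Ackable> {
        private final int batchSize;
        private final Consumer<List<M>> processor;
        private final List<M> pending = new ArrayList<>();

        public BatchAcker(int batchSize, Consumer<List<M>> processor) {
            if (batchSize < 1) {
                throw new IllegalArgumentException("batchSize must be >= 1");
            }
            this.batchSize = batchSize;
            this.processor = processor;
        }

        /**
         * Call this from the receive callback.
         * Returns true when this message completed a batch that was processed and acked.
         */
        public synchronized boolean onMessage(M message) {
            pending.add(message);
            if (pending.size() < batchSize) {
                return false; // batch not full yet, nothing is acked
            }
            List<M> batch = new ArrayList<>(pending);
            // If processing throws, no message is acked and the batch stays buffered.
            processor.accept(batch);
            batch.forEach(Ackable::ack);
            pending.clear();
            return true;
        }

        /** Number of buffered messages that have not been acknowledged yet. */
        public synchronized int pendingCount() {
            return pending.size();
        }
    }
}
```

Note that this only delays acknowledgement; it does not make the batch atomic. If the consumer disconnects part-way through acknowledging a batch, some messages of that batch may already be gone from the queue while others are redelivered.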

  • slsbel
    slsbel Member Posts: 13

    :) I'm not at the optimisation stage yet!

    I have a requirement to read exactly the first "n" messages from the queue and process them in one transaction. After successful processing, I will acknowledge the n messages (to remove them from the queue), then process the next set of n messages, and so on.

    Do you think streaming with windowed ack is the way to go?

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee

    Hi @slsbel,

    Ah - you need to produce and consume a block of messages as a single atomic unit? In that case you probably need transactions :-)

    We have two forms of transaction support: local transactions and XA transactions. XA is only available in JMS. Have a look at local transactions (session-based transactions). They let you consume a block of messages and then decide: commit, in which case all are acknowledged at once, or roll back, in which case all are left on the queue.
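
The all-or-nothing behaviour described above can be illustrated with a toy in-memory model. This is only a sketch of the semantics, under the assumption that a rolled-back batch is made available again in its original order. It does not use the JCSMP API, and `TransactedQueue` and `consumeBatch` are hypothetical names introduced for illustration. In real code the commit and rollback would be issued on a JCSMP transacted session rather than on the queue object.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class LocalTransactionModel {

    /** Toy in-memory queue with commit/rollback semantics; not a JCSMP class. */
    public static final class TransactedQueue<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final List<T> inFlight = new ArrayList<>();

        public void publish(T message) {
            queue.addLast(message);
        }

        /** Hands out the next message and tracks it until commit or rollback; null if empty. */
        public T receive() {
            T message = queue.pollFirst();
            if (message != null) {
                inFlight.add(message);
            }
            return message;
        }

        /** Permanently removes everything received since the last commit or rollback. */
        public void commit() {
            inFlight.clear();
        }

        /** Puts everything received since the last commit or rollback back, in original order. */
        public void rollback() {
            for (int i = inFlight.size() - 1; i >= 0; i--) {
                queue.addFirst(inFlight.get(i));
            }
            inFlight.clear();
        }

        /** Number of messages on the queue that are not currently in flight. */
        public int depth() {
            return queue.size();
        }
    }

    /** Consumes up to n messages as one unit: commit on success, roll back on failure. */
    public static <T> boolean consumeBatch(TransactedQueue<T> q, int n,
                                           Consumer<List<T>> processor) {
        List<T> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            T message = q.receive();
            if (message == null) {
                break; // fewer than n messages available
            }
            batch.add(message);
        }
        try {
            processor.accept(batch);
            q.commit();
            return true;
        } catch (RuntimeException e) {
            q.rollback();
            return false;
        }
    }
}
```

With five messages published, a call to `consumeBatch(q, 3, ...)` whose processor throws leaves all five on the queue, while a successful call removes the first three and leaves two.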