We are struggling with the performance of a pub/sub test fixture that publishes to a Solace broker at a fixed rate, and we expect our consumer to consume at the same rate. We were able to publish and subscribe at 1,000 messages per second. However, when we tried to publish 2,000 300-byte XML messages per second (to a topic endpoint), the broker received the data but its queue began to fill.
In the following picture we see more than a million messages queued at the broker, because our consumer cannot keep up.
We’ve also tried calling receive with a timeout. However, we want receive to return immediately if no data is available (yet a timeout of 0 seems to block).
We’ve configured the consumer thread to read everything that is available as fast as possible. The thread only sleeps when receiveNoWait returns no data.
receiveNoWait seems to return null frequently, even when there are still messages in the broker’s topic endpoint.
We have configured the broker with default settings, following the online instructions for setting up Solace.
We’d appreciate some insights from Solace experts. Can you point us to consumer and producer Java code that is designed to handle loads approaching 10,000 msg/s?
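If a consumer has to poll, one common alternative to "sleep when the poll comes back empty" is to drain whatever is already available without blocking, and only when nothing is there, block with a short timed wait. Unlike Thread.sleep(), a timed wait returns as soon as a message arrives. The sketch below is an assumption-laden illustration: a java.util.concurrent.BlockingQueue stands in for the client's receive buffer, and the class and method names are hypothetical, not part of the JCSMP API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a BlockingQueue stands in for a client-side receive buffer.
// This is NOT the JCSMP API; it only illustrates "drain, then block briefly".
final class DrainThenBlock {

    /**
     * Takes every message that is already queued without blocking. Only if nothing
     * was available does it wait (up to timeoutMillis) for the next message, and it
     * returns as soon as one arrives instead of sleeping for a fixed period.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, long timeoutMillis) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);                         // non-blocking: grab everything queued now
        if (batch.isEmpty()) {
            try {
                T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS); // block briefly
                if (first != null) {
                    batch.add(first);
                    queue.drainTo(batch);             // pick up anything that arrived alongside it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt for the caller's loop
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("msg-" + i);
        }
        System.out.println(nextBatch(queue, 10).size()); // all 5 queued messages at once
        System.out.println(nextBatch(queue, 10).size()); // empty queue: waits up to ~10 ms, then 0
    }
}
```

The timeout only bounds how long an idle consumer waits; under load the drain path dominates and the thread never blocks.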
Hi @Inder_Singh, let’s take this one step at a time. First, let’s get to the bottom of the performance problem. At Solace we always run our performance validation tool, sdkperf, first. This rules out problems in your own code, so you can concentrate on working out whether there’s a problem in the broker setup. Download sdkperf and find more details at SDKPerf. To get the full performance out of the broker you’ll need multiple producers and consumers across multiple topics and endpoints, but you should be able to reach 2k msgs/s without doing that.
Once you’ve got sdkperf working with a simple send and receive, let’s talk about what you’re trying to do. You are sending messages with delivery mode DIRECT and having them land on a topic endpoint, correct? I’m not sure this is the configuration you want; it’s a fairly odd one. Do you want the messages to be persisted, i.e. can you tolerate any message loss? If you can’t tolerate any message loss, send the messages with delivery mode PERSISTENT and check that you get an acknowledgement for every message. If you can tolerate message loss, you don’t need the topic endpoint; what you probably need is a subscription. If you turn on message dump in sdkperf (-md), you’ll see that your current configuration results in NON-PERSISTENT messages. That delivery mode, and topic endpoints, were created for compatibility with JMS. In normal use you’d stick to queues and the DIRECT and PERSISTENT delivery modes, with no topic endpoints or NON-PERSISTENT at all. From what you’ve described, I suspect you need DIRECT with a subscription and no queue or topic endpoint.
How do you create a subscription without a topic endpoint? Simple: the client adds the subscription itself. Try it with the -stl option to sdkperf.
Lastly, we have example code in the API (JCSMP) samples directory. Have a look there, especially at the “adPub*” samples if you need to ensure no message loss.
To sum up:
Try your performance test with sdkperf;
Check what you’re trying to do. Do you need a queue? Do you need PERSISTENT or DIRECT?
I can help with some example source code. But as Tom says: do you want Direct messaging (best effort) or Guaranteed messaging (fully persistent, can’t get lost)?
Tom,
Your inputs are amazingly well thought out. I appreciate it.
In this first round of testing we want non-persistent messaging. We can tolerate data loss; the key is to maximize throughput (even if that means losing a tiny bit of data). Our next step will be to look at configurations with durability and persistence turned on.
So I think you are right. We want the equivalent of a publish/subscribe scenario, in which messages are delivered “best effort” to each of the subscribed consumers.
We will download the tool you suggest.
Aaron,
We’d certainly appreciate your code samples. We are still new to Solace, and our current goal is to learn what kind of performance we can get out of it with different broker and transport configurations. We intend to try them all.
I’ve downloaded sdkperf, both the Java and C versions, and I’m now reading the documentation to see how to run the Java version with varying message sizes and message rates.
Can someone point me to an example run command? Our broker configuration is shown below:
Having just discussed this with the team, one of our early goals is to be compatible with JMS. In this context we believe the topic endpoint provides that compatibility. We are specifically interested in building Camel-based subscriptions to the topic endpoint. We can absolutely tolerate data loss in this first configuration. Is the “subscription” mode you are referring to compatible with JMS?
We are able to use our test producer to produce 2,000 messages per second and have the sdkperf application consume all of them. The following command was used to create a subscription to our published test topic. This does not bind to a topic endpoint, but we would like to run that experiment as well.
./sdkperf_java.sh -cip=malta.corp.sensis.com:31253 -stl=sargrad_2
In the pictures below we can see our test producer’s connection, which reports that the messages published per second vary greatly, from ~1,600 to ~2,700.
The sdkperf application seems to report much less variability. The admin portal indicates that the sdkperf application is completely keeping up with our test producer’s rate of 2,000 messages per second.
We have followed your advice and eliminated the topic endpoint. We are now able to produce and consume 2,500 msgs/s fairly reliably, but we can’t quite get to 3,000 msgs/s.
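A publish rate that swings between ~1,600 and ~2,700 msg/s can come from pacing drift. One generic way to hold a fixed rate (a swapped-in technique, not something from the Solace API) is to schedule each send against an absolute deadline, start + i × interval, instead of sleeping a fixed amount after each send. Time spent inside send() and late wake-ups are then absorbed rather than accumulated. A minimal sketch, assuming the hypothetical `sendOne` Runnable stands in for whatever publishes one message:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Generic fixed-rate pacer (hypothetical helper, not part of any Solace API).
final class DeadlinePacer {

    /**
     * Calls sendOne `count` times at roughly `ratePerSec`, scheduling call i at
     * start + i * interval. If one send runs late, the following deadlines are
     * already due, so the average rate catches back up instead of drifting lower.
     * Returns the elapsed time in nanoseconds.
     */
    static long run(int ratePerSec, int count, Runnable sendOne) {
        final long intervalNanos = TimeUnit.SECONDS.toNanos(1) / ratePerSec;
        final long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            long deadline = start + i * intervalNanos;
            long remaining;
            // compare differences rather than raw nanoTime values, so overflow is harmless
            while ((remaining = deadline - System.nanoTime()) > 0) {
                LockSupport.parkNanos(remaining); // may wake early; the loop re-checks
            }
            sendOne.run();
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        AtomicLong sent = new AtomicLong();
        long elapsed = run(2_000, 1_000, sent::incrementAndGet); // 1,000 sends at 2,000 msg/s
        System.out.printf("sent %d in %.3f s%n", sent.get(), elapsed / 1e9);
    }
}
```

The catch-up behaviour is a design choice: after a stall the pacer sends a short burst to restore the average. At much higher rates, parkNanos granularity becomes the limit and a busy spin (as in the publisher code later in this thread) is more accurate.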
Our solace broker, and our producer and consumer are running on commodity hardware (Optiplex 7010).
CPU usage on the producer and consumer machine is about 24 percent.
Hello Inder! So many questions, I will try to answer them all and hopefully not miss any.
I also have to break this response into several sections because it’s too long!!
First, some confusion around Topics vs. Topic Endpoints vs. Subscriptions in Solace. Endpoints are only needed when you need persistence. For Direct messaging, consumer apps subscribe to topics (wildcards allowed), and publishers publish on topics. Here’s an awesome blog from Tom on Topics and Subscriptions: https://solace.com/blog/when-topics-arent-just-publish-subscribe/ and here’s one that explains what Topic Endpoints are and when to use them: https://solace.com/blog/queues-vs-topic-endpoints/
Glad you figured out SdkPerf. We use it a lot for perf testing; it’s super useful. And yes, for Direct messaging you just need the -ptl and -stl options, plus -mt=direct. To improve throughput, minimize the network distance between your SdkPerf apps and the broker. I would expect SdkPerf to push out well over 3,000 msg/s at 300 bytes…?
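To make "subscribe to topics with wildcards" concrete: as I understand Solace’s rules, topics are `/`-separated levels; in a subscription, a level ending in `*` matches any single level starting with the text before the `*` (so a bare `*` matches any one level), and a final level of exactly `>` matches one or more remaining levels. The consumer code below subscribes to `aaron/>`. This is a simplified, hypothetical matcher written only to illustrate those rules; it is not the broker’s matching engine and ignores edge cases such as empty levels and special reserved topics.

```java
// Simplified illustration of Solace-style topic wildcards (hypothetical helper,
// not the broker's matching engine): levels are separated by '/',
// a level ending in '*' matches any single level starting with the text before '*',
// and a final level of exactly '>' matches one or more remaining levels.
final class TopicMatcher {

    static boolean matches(String subscription, String topic) {
        String[] sub = subscription.split("/", -1);
        String[] top = topic.split("/", -1);
        for (int i = 0; i < sub.length; i++) {
            String level = sub[i];
            boolean last = (i == sub.length - 1);
            if (last && level.equals(">")) {
                return top.length > i;          // '>' needs at least one more level
            }
            if (i >= top.length) {
                return false;                   // topic ran out of levels
            }
            if (level.endsWith("*")) {
                String prefix = level.substring(0, level.length() - 1);
                if (!top[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!level.equals(top[i])) {
                return false;                   // literal level must match exactly
            }
        }
        return sub.length == top.length;        // no unmatched extra topic levels
    }

    public static void main(String[] args) {
        System.out.println(matches("aaron/>", "aaron/test/A"));        // true
        System.out.println(matches("aaron/>", "aaron"));               // false: '>' needs a level
        System.out.println(matches("aaron/test/*", "aaron/test/Q"));   // true
        System.out.println(matches("aaron/te*/A", "aaron/test/A"));    // true (prefix wildcard)
        System.out.println(matches("aaron/test/*", "aaron/test/Q/x")); // false: extra level
    }
}
```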
You were asking why there are 2 subscriptions instead of 1 for your consumer? Every Solace client when it connects gets assigned a unique “inbox” topic subscription for receiving control messages from the broker. So a publish only app would have 1 subscription, and a consumer application with 1 added subscription would have 2. In the PubSub+ Manager GUI, you can click on the client name, and then click on the Subscriptions tab to see them.
Ok, so what kind of rates should you expect with one publisher and one consumer? Note that if you’re using the Standard edition (free) Solace PubSub+ broker, it will be rate-limited to 10,000 msg/s. Otherwise, if using the Enterprise (paid), or Evaluation (90-day free: https://solace.com/downloads/#pubsub-enterprise-evaluation) version, or the hardware appliance, you should be able to get a LOT more… especially for Direct messaging. I’ll show you what I got with my test pub and sub apps in a minute.
In your publisher, don’t worry about sendMultiple(); it’s not really necessary. You’re also using a TextMessage to send a byte array? Just use BytesMessage. And in the consumer, you always want to be in async receiving mode, where you register a callback listener when you create the consumer. It’s always the fastest and most efficient approach, since you never have to poll or call receive().
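The difference between polling and an async listener is who does the waiting. With a listener, a dispatch thread blocks on an internal buffer and invokes your callback once per message, so your own code never spins or sleeps. The JDK-only sketch below mimics that shape with a BlockingQueue and a single dispatch thread calling a `Consumer<T>`; the class is hypothetical and is not how JCSMP is implemented internally.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration of listener-style (push) delivery using only the JDK.
final class CallbackDispatcher<T> implements AutoCloseable {
    private static final Object POISON = new Object();   // sentinel that stops the thread
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Thread dispatcher;

    CallbackDispatcher(Consumer<T> listener) {
        dispatcher = new Thread(() -> {
            try {
                while (true) {
                    Object item = queue.take();           // blocks without spinning or sleeping
                    if (item == POISON) {
                        return;
                    }
                    @SuppressWarnings("unchecked")
                    T msg = (T) item;
                    listener.accept(msg);                 // plays the role of onReceive()
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Dispatch Thread");
        dispatcher.start();
    }

    /** Called by the "network" side when a message arrives. */
    void deliver(T msg) {
        queue.add(msg);
    }

    /** Lets everything already queued be delivered, then stops the dispatch thread. */
    @Override
    public void close() throws InterruptedException {
        queue.add(POISON);
        dispatcher.join();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger received = new AtomicInteger();
        try (CallbackDispatcher<String> d = new CallbackDispatcher<>(m -> received.incrementAndGet())) {
            for (int i = 0; i < 10_000; i++) {
                d.deliver("msg-" + i);
            }
        }
        System.out.println(received.get()); // 10000
    }
}
```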
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.solace.samples.aaron;

import java.io.IOException;
import java.util.Arrays;

import com.solacesystems.jcsmp.BytesMessage;
import com.solacesystems.jcsmp.JCSMPChannelProperties;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProducerEventHandler;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.ProducerEventArgs;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageProducer;

public class FastDirectPublisher {

    static final int MSG_RATE = 5000; // Note: Standard edition PubSub+ broker is limited to 10k max ingress
    static final int PAYLOAD_SIZE = 300;
    static volatile boolean shutdown = false;

    public static void main(String... args) throws JCSMPException, IOException, InterruptedException {
        // Check command line arguments
        if (args.length < 2 || args[1].split("@").length != 2) {
            System.out.println("Usage: FastDirectPublisher <host:port> <client-username@message-vpn> [client-password]");
            System.out.println();
            System.exit(-1);
        }
        if (args[1].split("@")[0].isEmpty()) {
            System.out.println("No client-username entered");
            System.out.println();
            System.exit(-1);
        }
        if (args[1].split("@")[1].isEmpty()) {
            System.out.println("No message-vpn entered");
            System.out.println();
            System.exit(-1);
        }
        System.out.println("FastDirectPublisher initializing...");

        // Create a JCSMP Session
        final JCSMPProperties properties = new JCSMPProperties();
        properties.setProperty(JCSMPProperties.HOST, args[0]);                   // host:port
        properties.setProperty(JCSMPProperties.USERNAME, args[1].split("@")[0]); // client-username
        properties.setProperty(JCSMPProperties.VPN_NAME, args[1].split("@")[1]); // message-vpn
        if (args.length > 2) {
            properties.setProperty(JCSMPProperties.PASSWORD, args[2]);           // client-password
        }
        JCSMPChannelProperties cp = new JCSMPChannelProperties();
        cp.setTcpNoDelay(false); // high throughput magic sauce (lets TCP coalesce small writes)... but will hurt latencies a bit
        properties.setProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES, cp);
        final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);

        /** Anonymous inner-class for handling publishing events */
        XMLMessageProducer prod = session.getMessageProducer(new JCSMPStreamingPublishEventHandler() {
            @Override
            public void responseReceived(String messageID) {
                // DIRECT publishers don't get publish callbacks
            }
            @Override
            public void handleError(String messageID, JCSMPException e, long timestamp) {
                // DIRECT publishers don't get publish callbacks
            }
        }, new JCSMPProducerEventHandler() {
            @Override
            public void handleEvent(ProducerEventArgs event) {
                System.out.println("Received a producer event: " + event);
                // should maybe do something with this?
            }
        });
        session.connect();

        Runnable pubThread = new Runnable() {
            @Override
            public void run() {
                final int APPROX_NANOS_BETWEEN_MSGS = (1_000_000_000 / MSG_RATE); // won't be exactly the message rate, but close
                Topic topic = JCSMPFactory.onlyInstance().createTopic("aaron/test/topic");
                BytesMessage message = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
                byte[] payload = new byte[PAYLOAD_SIZE];
                long curNanoTime = System.nanoTime(); // grab the current time before the start of the loop
                char characterOfTheMoment;
                try {
                    while (!shutdown) {
                        // floorMod keeps the result in 0..25 even if nanoTime happens to be negative
                        characterOfTheMoment = (char) ('A' + Math.floorMod(curNanoTime, 26L));
                        Arrays.fill(payload, (byte) characterOfTheMoment); // fill it with some "random" value
                        message.setData(payload);
                        // dynamic topics!! the topic's last character matches the payload's fill character
                        topic = JCSMPFactory.onlyInstance().createTopic(new StringBuilder("aaron/test/").append(characterOfTheMoment).toString());
                        prod.send(message, topic);
                        message.reset(); // reuse this message on the next loop, to avoid having to recreate it
                        // BUSY WAIT! (instead of Thread.sleep)
                        // burns more CPU, but provides a more accurate publish rate
                        // comment out the "while" statement to publish as fast as possible
                        // (subtracting nanoTime values is overflow-safe; comparing them directly is not)
                        while (System.nanoTime() - curNanoTime < APPROX_NANOS_BETWEEN_MSGS) { /* SPIN HARD */ }
                        curNanoTime = System.nanoTime();
                    }
                } catch (JCSMPException e) {
                    e.printStackTrace();
                } finally {
                    System.out.print("Shutdown! Stopping Publisher... ");
                    prod.close();
                    session.closeSession();
                    System.out.println("Done.");
                }
            }
        };
        Thread t = new Thread(pubThread, "Publisher Thread");
        t.setDaemon(true);
        t.start();

        System.out.println("Connected, and running. Press [ENTER] to quit.");
        // block the main thread, waiting for a quit signal
        System.in.read(); // wait for user to end
        System.out.println("Quitting in 1 second.");
        shutdown = true;
        Thread.sleep(1000);
    }
}
```
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.solace.samples.aaron;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import com.solacesystems.jcsmp.BytesMessage;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPGlobalProperties;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageConsumer;
import com.solacesystems.jcsmp.XMLMessageListener;

public class FastDirectConsumer {

    // incremented on the API's callback thread and read-and-reset on the stats thread,
    // so use an AtomicInteger ("++" on a volatile int is not atomic)
    private static final AtomicInteger msgCounter = new AtomicInteger();
    private static volatile boolean discardFlag = false;
    private static volatile boolean shutdown = false;

    public static void main(String... args) throws JCSMPException, IOException, InterruptedException {
        // Check command line arguments
        if (args.length < 2 || args[1].split("@").length != 2) {
            System.out.println("Usage: FastDirectConsumer <host:port> <client-username@message-vpn> [client-password]");
            System.out.println();
            System.exit(-1);
        }
        if (args[1].split("@")[0].isEmpty()) {
            System.out.println("No client-username entered");
            System.out.println();
            System.exit(-1);
        }
        if (args[1].split("@")[1].isEmpty()) {
            System.out.println("No message-vpn entered");
            System.out.println();
            System.exit(-1);
        }
        System.out.println("FastDirectConsumer initializing...");
        final JCSMPProperties properties = new JCSMPProperties();
        properties.setProperty(JCSMPProperties.HOST, args[0]);                   // host:port
        properties.setProperty(JCSMPProperties.USERNAME, args[1].split("@")[0]); // client-username
        properties.setProperty(JCSMPProperties.VPN_NAME, args[1].split("@")[1]); // message-vpn
        if (args.length > 2) {
            properties.setProperty(JCSMPProperties.PASSWORD, args[2]);           // client-password
        }
        //properties.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true); // use only 1 thread instead of 2, but hurts throughput
        { // this next block is probably unnecessary until receiving > 100k msg/s, so delete if going slower than that
            JCSMPGlobalProperties gp = new JCSMPGlobalProperties();
            gp.setConsumerDefaultFlowCongestionLimit(20000); // override default (5000)
            JCSMPFactory.onlyInstance().setGlobalProperties(gp);
        }
        final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);

        /** Anonymous inner-class for MessageListener
         *  This demonstrates the async threaded message callback */
        final XMLMessageConsumer cons = session.getMessageConsumer(new XMLMessageListener() {

            // a field of the listener (not a local variable inside onReceive), so that
            // switching verification off below actually persists across messages;
            // set this to false if the publisher uses a single/static topic
            private boolean doSomeSillyVerifyingOfData = true;

            @Override
            public void onReceive(BytesXMLMessage msg) {
                // do you want to do anything with this message?
                msgCounter.incrementAndGet();
                if (msg.getDiscardIndication()) { // lost any messages?
                    // If the consumer is being over-driven (i.e. publish rates too high), the broker might discard some messages for this consumer
                    // check this flag to know if that's happened
                    // to avoid discards:
                    //  a) reduce publish rate
                    //  b) increase size of consumer's D-1 egress buffers (check client-profile)
                    //  c) use multiple-threads or shared subscriptions for parallel processing
                    discardFlag = true; // set my own flag
                }
                // this next block is just to have a non-trivial onReceive() callback... let's do a bit of work
                if (doSomeSillyVerifyingOfData && msg instanceof BytesMessage) {
                    // as set in the publisher code, the payload should be filled with the same character as the last letter of the topic
                    BytesMessage message = (BytesMessage) msg;
                    if (message.getAttachmentContentLength() > 0) {
                        byte[] payload = message.getData();
                        char payloadChar = (char) payload[0];
                        String topicName = message.getDestination().getName();
                        char lastTopicChar = topicName.charAt(topicName.length() - 1);
                        if (payloadChar != lastTopicChar) {
                            System.out.println("*** Topic vs. Payload discrepancy *** : didn't match! oh well!");
                            doSomeSillyVerifyingOfData = false; // don't do any further testing
                        }
                    }
                }
            }

            @Override
            public void onException(JCSMPException e) {
                // uh oh!
                System.err.println("We've had an exception thrown to the Consumer's onException()");
                e.printStackTrace();
            }
        });
        session.connect();
        final Topic topic = JCSMPFactory.onlyInstance().createTopic("aaron/>");
        session.addSubscription(topic);
        cons.start();

        Runnable statsThread = new Runnable() {
            @Override
            public void run() {
                try {
                    while (!shutdown) {
                        Thread.sleep(1000); // wait 1 second
                        // getAndSet(0) reads the count and resets it in one atomic step, so no increments are lost
                        System.out.printf("Msgs/s: %,d%n", msgCounter.getAndSet(0));
                        if (discardFlag) {
                            System.out.println("*** Egress discard detected *** : consumer unable to keep up with full message rate");
                            discardFlag = false; // only show the error once per second
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("I was awoken while waiting");
                }
            }
        };
        Thread t = new Thread(statsThread, "Stats Thread");
        t.setDaemon(true);
        t.start();

        System.out.println("Connected, and running. Press [ENTER] to quit.");
        System.in.read(); // wait for user to end
        System.out.println("Quitting in 1 second.");
        shutdown = true;
        session.removeSubscription(topic);
        Thread.sleep(1000);
        // Close consumer
        cons.close();
        session.closeSession();
    }
}
```
Adjust the payload size and desired message rate at the top of the publisher. Note that to make this somewhat REAL WORLD, the payload changes with every message, and so does the topic. Also, these two apps aren’t exactly single-threaded, but pretty close.
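The publisher and consumer share a simple convention: every payload byte is one letter A-Z, and the topic ends with that same letter, which is what the consumer’s verification block checks. Pulling that convention into small pure functions makes it testable without a broker. The class and method names here are hypothetical; the `aaron/test/` prefix and 300-byte size come from the code above. `Math.floorMod` is used so that a negative `nanoTime` value still maps into A-Z.

```java
import java.util.Arrays;

// Hypothetical helpers capturing the publisher/consumer convention from the code above:
// payload bytes are all one letter A-Z, and the topic ends with that same letter.
final class TopicPayloadConvention {
    static final String TOPIC_PREFIX = "aaron/test/";

    /** Picks a letter A-Z from a (possibly negative) nanoTime value. */
    static char letterFor(long nanoTime) {
        return (char) ('A' + Math.floorMod(nanoTime, 26L));
    }

    static String topicFor(char letter) {
        return TOPIC_PREFIX + letter;
    }

    static byte[] payloadFor(char letter, int size) {
        byte[] payload = new byte[size];
        Arrays.fill(payload, (byte) letter);
        return payload;
    }

    /** Consumer-side check: the first payload byte equals the topic's last character. */
    static boolean consistent(String topic, byte[] payload) {
        if (payload.length == 0 || topic.isEmpty()) {
            return true; // nothing to compare, mirroring the consumer's length check
        }
        return (char) payload[0] == topic.charAt(topic.length() - 1);
    }

    public static void main(String[] args) {
        char c = letterFor(System.nanoTime());
        String topic = topicFor(c);
        byte[] payload = payloadFor(c, 300);
        System.out.println(topic + " consistent=" + consistent(topic, payload)); // always true
    }
}
```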
And the Results??
ANYHOW, I ran this test in my lab, on two unremarkable machines… one running the Evaluation edition of the PubSub+ broker (9.2), and the other one running the SINGLE publisher and SINGLE consumer apps. Same LAN, same switch actually, and 1Gbps network. Here are the max sustained numbers I could get for 300-byte payloads:
Hopefully you think that’s pretty good for a single publisher and single subscriber. I couldn’t quite get to 250,000 msg/s without causing some discards in the network. Haha!
We are incorporating your suggestions into our application to test performance.
We have also upgraded our network to a 1Gbps connection, but we are only using CAT5e Ethernet cables and not CAT6. We are using two boxes, one to run SdkPerf and another one to run the Solace broker.
The only thing that we changed, so far, is our network.
When we run SdkPerf to pub/sub messages at rates of roughly 5,000 messages per second or less, we can sustain the rate for a while before the connection degrades to less than 500 messages per second.
How would you go about using SdkPerf to isolate performance issues? We’d hope to observe the degradation either in the broker logs or in sdkperf itself.
I’m working with Inder on this problem. As an aside, when we run sdkperf at 4,000 msgs/s, it’s only putting 4 Mb/s on the network. The network should not be the problem, yet it seems to be.
iftop shows the following:
We’ve also looked in the Solace broker logs for some indication of a problem, but we don’t see anything in the logs or the admin portal that would explain why the sdkperf tool can’t sustain 4,000 msg/s.
One last observation: after the degradation occurs, if we start another instance of sdkperf (with identical arguments), we again see 4,000 msg/s published and subscribed for anywhere from 10 seconds to 2 minutes, but then the degradation recurs and the pub/sub rate drops to no more than about 300 to 500 msg/s.
Under these degraded conditions we used top to verify that we do not have a CPU or memory issue on either server (broker or sdkperf).
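A quick sanity check on what a given rate should put on the wire is messages/s × payload bytes × 8. This counts payload only; message headers plus TCP/IP and Ethernet framing add more on top, so treat the result as a lower bound. At 300-byte payloads, the original 2,000 msg/s target is 4.8 Mbit/s, 4,000 msg/s is 9.6 Mbit/s, and the ~250,000 msg/s mentioned above is 600 Mbit/s. Only the last of these is a large fraction of a 1 Gbps link.

```java
// Back-of-the-envelope payload bandwidth: msgs/s * bytes/msg * 8 bits/byte.
// Payload only: protocol headers and framing (not modelled here) add to this.
final class PayloadBandwidth {

    static double megabitsPerSecond(long msgsPerSec, int payloadBytes) {
        return msgsPerSec * (double) payloadBytes * 8 / 1_000_000.0;
    }

    public static void main(String[] args) {
        System.out.println(megabitsPerSecond(2_000, 300));   // 4.8
        System.out.println(megabitsPerSecond(4_000, 300));   // 9.6
        System.out.println(megabitsPerSecond(250_000, 300)); // 600.0
    }
}
```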
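To pin down exactly when the drop from ~4,000 to 300-500 msg/s begins (and then line that moment up with broker stats or TCP counters), it can help to feed per-second counts, such as those the consumer’s stats thread prints, into a small detector that records the first second falling below some fraction of the target. A minimal sketch; the class is hypothetical, the 50% threshold is an arbitrary assumption, and the counts in `main` are illustrative, not measurements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: flags seconds whose message count falls below a fraction of the target.
final class RateDropDetector {
    private final long targetPerSec;
    private final double threshold;               // e.g. 0.5 = flag anything below 50% of target
    private final List<Integer> dropSeconds = new ArrayList<>();
    private int second = 0;

    RateDropDetector(long targetPerSec, double threshold) {
        this.targetPerSec = targetPerSec;
        this.threshold = threshold;
    }

    /** Records one second's count; returns true if that second is below the threshold. */
    boolean record(long countThisSecond) {
        boolean low = countThisSecond < targetPerSec * threshold;
        if (low) {
            dropSeconds.add(second);
        }
        second++;
        return low;
    }

    /** 0-based index of the first low second, or -1 if none has been seen. */
    int firstDrop() {
        return dropSeconds.isEmpty() ? -1 : dropSeconds.get(0);
    }

    int lowSeconds() {
        return dropSeconds.size();
    }

    public static void main(String[] args) {
        RateDropDetector d = new RateDropDetector(4_000, 0.5);
        long[] counts = {4_010, 3_990, 4_000, 3_700, 480, 350, 500}; // illustrative counts only
        for (long c : counts) {
            d.record(c);
        }
        System.out.println("first drop at second " + d.firstDrop()); // 4
    }
}
```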
Hey guys… thanks for the additional comments. And glad you liked my answer + code!
Also happy that you’re upgrading your network to 1Gbps… that will help. While your bandwidth needs aren’t huge, there’s also serialization speed to consider… 1Gbps puts data onto the wire 10x faster than 100Mbps, so latencies will be lower.
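The serialization point can be made concrete: the time to clock a frame onto the wire is its size in bits divided by the link rate. For a 300-byte payload alone that is 2.4 µs at 1 Gbps versus 24 µs at 100 Mbps. The 100 Mbps baseline is an assumption (it is what "10x faster" implies), and real frames are larger because of headers, which this sketch does not model.

```java
// Serialization delay: time to put `bytes` onto a link running at `bitsPerSecond`.
// Payload-only illustration; Ethernet/IP/TCP and message headers are not included.
final class SerializationDelay {

    static double microseconds(int bytes, double bitsPerSecond) {
        return bytes * 8 / bitsPerSecond * 1_000_000.0;
    }

    public static void main(String[] args) {
        System.out.printf("1 Gbps:   %.1f us%n", microseconds(300, 1e9));   // 2.4 us
        System.out.printf("100 Mbps: %.1f us%n", microseconds(300, 100e6)); // 24.0 us
    }
}
```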
Anyhow, ok… troubleshooting: first, always try to simplify things… run your publisher and subscriber SdkPerfs separately… that will (should?) help isolate who is having the problem.
If you only publish, does the same performance degradation happen? If publishing at a lower rate, does the same thing happen?
If it’s the publisher having problems, does adding more subscribers (i.e. increasing the “fanout”) change the behaviour? I’d hope not.
And you’re sure you’re publishing Direct, right? If publishing Guaranteed, and the disk on the broker was very slow, I could see that maybe eventually impacting the publishing performance…?
If your network is lossy and you’re getting TCP retransmissions, that could explain some drop in performance, but going from 4k down to 500 sounds pretty bad. The logs on the broker won’t tell you much… they report connections/disconnections, stuff like that. You need to look at stats. Does iftop show your TCP window information? Or retransmissions?
If you go into the PubSub+ Manager (web GUI), then Client Connections, Solace Clients, click on your clients, and then under “Connections” you can see TCP errors. Let me know if anything is reported there.
That’s a good start for debugging. Hope that helps! Let me know.