Hi,
I was able to use the golang solace API to read messages from a topic but I need to read and acknowledge them from a queue.
Is this possible? So far I was only able to do this via amqp sdk - I used the azure one.
If yes: Can someone share a simple a code example?
It should be a non exclusive queue…
This example guaranteed_processor.go > shows how to consume events from a queue with automatic acknowledgments.
For client acknowledgments you need to use “WithMessageClientAcknowledgement”. Here is an example:
// Consume messages from a queue
func (smfcli *SmfClient) Consume(queueName string, messageHandler solace.MessageHandler) (err error) {
durableExclusiveQueue := resource.QueueDurableExclusive(queueName)
// Build a Guaranteed message receiver and bind to the given queue
persistentReceiver, err := smfcli.messagingService.CreatePersistentMessageReceiverBuilder().WithMessageClientAcknowledgement().Build(durableExclusiveQueue)
// Handling a panic from a non existing queue
defer func() {
if err := recover(); err != nil {
log.Printf(“Make sure queue name ‘%s’ exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s”, queueName, err)
}
}()
// Register Message callback handler to the Message Receiver
if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {
return regErr
}
smfcli.PersistentReceiver = persistentReceiver
log.Printf(“Bound to queue: %s\n”, queueName)
// Start Persistent Message Receiver
if err := smfcli.PersistentReceiver.Start(); err != nil {
return err
}
return nil
}
// Start consuming tickers
err = Consume(GetConfigInstance().Engine.Queue.Tickers, engine.onTickerReceived)
if err != nil {
return err
}
// Get the tickers
func (engine *Engine) onTickerReceived(message message.InboundMessage) {
}
Took me a while to find the different parts in the documentation but the following seems to work as a stand alone consumer for a non exclusive queue with client acknowledgement - in case someone else is looking for the same in the future:
package main
import (
“flag”
log “GitHub - sirupsen/logrus: Structured, pluggable logging for Go.”
“os”
“os/signal”
“solace.dev/go/messaging”
“solace.dev/go/messaging/pkg/solace”
“solace.dev/go/messaging/pkg/solace/config”
“solace.dev/go/messaging/pkg/solace/message”
“solace.dev/go/messaging/pkg/solace/resource”
“time”
)
var (
logLevel = flag.Int(“logLevel”, 4, “log level (0-6)”)
logReportCaller = flag.Bool(“logReportCaller”, false, “add caller to log output”)
logFormatJson = flag.Bool(“logFormatJson”, true, “log in json format”)
)
func init() {
flag.Parse()
log.SetLevel(log.Level(*logLevel))
log.SetReportCaller(*logReportCaller)
log.SetFormatter(&log.TextFormatter{})
if *logFormatJson {
log.SetFormatter(&log.JSONFormatter{})
}
}
func getEnv(key string) string {
if val, ok := os.LookupEnv(key); ok {
return val
} else {
log.Info(“Missing environment variable “, key)
os.Exit(8)
}
return “”
}
// Message Handler
func MessageHandler(message message.InboundMessage) {
log.Debug(“Message Dump %s \n”, message)
payload, ok := message.GetPayloadAsBytes()
if ok {
log.Info(string(payload))
persistentReceiver.Ack(message)
}
log.Info(””)
}
var persistentReceiver solace.PersistentMessageReceiver
func main() {
// Configuration parameters
brokerConfig := config.ServicePropertyMap{
config.TransportLayerPropertyHost: getEnv(“SOLACE_HOST”),
config.ServicePropertyVPNName: getEnv(“SOLACE_VPN”),
config.AuthenticationPropertySchemeBasicUserName: getEnv(“SOLACE_USER”),
config.AuthenticationPropertySchemeBasicPassword: getEnv(“SOLACE_PASSWORD”),
}
messagingService, err := messaging.NewMessagingServiceBuilder().
FromConfigurationProvider(brokerConfig).
WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
Build()
if err != nil {
panic(err)
}
// Connect to the messaging serice
if err := messagingService.Connect(); err != nil {
panic(err)
}
log.Info("Connected to the broker? ", messagingService.IsConnected())
queueName := getEnv(“AEM_QUEUE”)
durableNonExclusiveQueue := resource.QueueDurableNonExclusive(queueName)
//nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName)
// Build a Guaranteed message receiver and bind to the given queue
persistentReceiver, err = messagingService.CreatePersistentMessageReceiverBuilder().WithMessageClientAcknowledgement().Build(durableNonExclusiveQueue)
log.Info(“Bound to queue: %s”, queueName)
// Handling a panic from a non existing queue
defer func() {
if err := recover(); err != nil {
log.Printf(“Make sure queue name ‘%s’ exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s”, queueName, err)
}
}()
// Start Persistent Message Receiver
if err := persistentReceiver.Start(); err != nil {
panic(err)
}
log.Info("Persistent Receiver running? “, persistentReceiver.IsRunning())
if regErr := persistentReceiver.ReceiveAsync(MessageHandler); regErr != nil {
panic(regErr)
}
// Handle interrupts
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
// Block until a signal is received.
<-c
// TODO
// Find way to shutdown the go routine
// e.g use another channel, BOOl…etc
// TODO
// Terminate the Persistent Receiver
persistentReceiver.Terminate(2 * time.Second)
log.Info(”\nDirect Receiver Terminated? ", persistentReceiver.IsTerminated())
// Disconnect the Message Service
messagingService.Disconnect()
log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
}