golang example to read from queue via solace golang API
Hi,
I was able to use the golang solace API to read messages from a topic but I need to read and acknowledge them from a queue.
Is this possible? So far I was only able to do this via amqp sdk - I used the azure one.
If yes: Can someone share a simple a code example?
It should be a non exclusive queue...
Comments
-
This example guaranteed_processor.go shows how to consume events from a queue with automatic acknowledgments.
For client acknowledgments you need to use "WithMessageClientAcknowledgement". Here is an example:
// Consume messages from a queue func (smfcli *SmfClient) Consume(queueName string, messageHandler solace.MessageHandler) (err error) { durableExclusiveQueue := resource.QueueDurableExclusive(queueName) // Build a Guaranteed message receiver and bind to the given queue persistentReceiver, err := smfcli.messagingService.CreatePersistentMessageReceiverBuilder().WithMessageClientAcknowledgement().Build(durableExclusiveQueue) // Handling a panic from a non existing queue defer func() { if err := recover(); err != nil { log.Printf("Make sure queue name '%s' exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s", queueName, err) } }() // Register Message callback handler to the Message Receiver if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil { return regErr } smfcli.PersistentReceiver = persistentReceiver log.Printf("Bound to queue: %s\n", queueName) // Start Persistent Message Receiver if err := smfcli.PersistentReceiver.Start(); err != nil { return err } return nil }
// Start consuming tickers err = Consume(GetConfigInstance().Engine.Queue.Tickers, engine.onTickerReceived) if err != nil { return err }
// Get the tickers func (engine *Engine) onTickerReceived(message message.InboundMessage) { }
1 -
Took me a while to find the different parts in the documentation but the following seems to work as a stand alone consumer for a non exclusive queue with client acknowledgement - in case someone else is looking for the same in the future:
package main import ( "flag" log "github.com/sirupsen/logrus" "os" "os/signal" "solace.dev/go/messaging" "solace.dev/go/messaging/pkg/solace" "solace.dev/go/messaging/pkg/solace/config" "solace.dev/go/messaging/pkg/solace/message" "solace.dev/go/messaging/pkg/solace/resource" "time" ) var ( logLevel = flag.Int("logLevel", 4, "log level (0-6)") logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output") logFormatJson = flag.Bool("logFormatJson", true, "log in json format") ) func init() { flag.Parse() log.SetLevel(log.Level(*logLevel)) log.SetReportCaller(*logReportCaller) log.SetFormatter(&log.TextFormatter{}) if *logFormatJson { log.SetFormatter(&log.JSONFormatter{}) } } func getEnv(key string) string { if val, ok := os.LookupEnv(key); ok { return val } else { log.Info("Missing environment variable ", key) os.Exit(8) } return "" } // Message Handler func MessageHandler(message message.InboundMessage) { log.Debug("Message Dump %s \n", message) payload, ok := message.GetPayloadAsBytes() if ok { log.Info(string(payload)) persistentReceiver.Ack(message) } log.Info("") } var persistentReceiver solace.PersistentMessageReceiver func main() { // Configuration parameters brokerConfig := config.ServicePropertyMap{ config.TransportLayerPropertyHost: getEnv("SOLACE_HOST"), config.ServicePropertyVPNName: getEnv("SOLACE_VPN"), config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"), config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"), } messagingService, err := messaging.NewMessagingServiceBuilder(). FromConfigurationProvider(brokerConfig). WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()). Build() if err != nil { panic(err) } // Connect to the messaging serice if err := messagingService.Connect(); err != nil { panic(err) } log.Info("Connected to the broker? ", messagingService.IsConnected()) queueName := getEnv("AEM_QUEUE") durableNonExclusiveQueue := resource.QueueDurableNonExclusive(queueName) //nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName) // Build a Guaranteed message receiver and bind to the given queue persistentReceiver, err = messagingService.CreatePersistentMessageReceiverBuilder().WithMessageClientAcknowledgement().Build(durableNonExclusiveQueue) log.Info("Bound to queue: %s", queueName) // Handling a panic from a non existing queue defer func() { if err := recover(); err != nil { log.Printf("Make sure queue name '%s' exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s", queueName, err) } }() // Start Persistent Message Receiver if err := persistentReceiver.Start(); err != nil { panic(err) } log.Info("Persistent Receiver running? ", persistentReceiver.IsRunning()) if regErr := persistentReceiver.ReceiveAsync(MessageHandler); regErr != nil { panic(regErr) } // Handle interrupts c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) // Block until a signal is received. <-c // TODO // Find way to shutdown the go routine // e.g use another channel, BOOl..etc // TODO // Terminate the Persistent Receiver persistentReceiver.Terminate(2 * time.Second) log.Info("\nDirect Receiver Terminated? ", persistentReceiver.IsTerminated()) // Disconnect the Message Service messagingService.Disconnect() log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected()) }
1