golang example to read from queue via solace golang API

ulrich
ulrich Member Posts: 17

Hi,

I was able to use the golang solace API to read messages from a topic but I need to read and acknowledge them from a queue.

Is this possible? So far I was only able to do this via amqp sdk - I used the azure one.

If yes: Can someone share a simple code example?

It should be a non exclusive queue...

Comments

  • Vincent
    Vincent Member, Employee Posts: 2 Solace Employee

    This example guaranteed_processor.go shows how to consume events from a queue with automatic acknowledgments.

    For client acknowledgments you need to use "WithMessageClientAcknowledgement". Here is an example:

    // Consume binds a persistent (guaranteed) message receiver with client
    // acknowledgement to the durable exclusive queue queueName, registers
    // messageHandler as its asynchronous callback and starts the receiver.
    //
    // The receiver is stored in smfcli.PersistentReceiver once the callback is
    // registered. Any error from building, registering or starting the receiver
    // is returned to the caller. A panic raised while doing so is logged and,
    // when the panic value is an error, returned through the named result.
    func (smfcli *SmfClient) Consume(queueName string, messageHandler solace.MessageHandler) (err error) {
    	// Installed before any receiver call so that every step below is covered.
    	defer func() {
    		if r := recover(); r != nil {
    			log.Printf("Make sure queue name '%s' exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s", queueName, r)
    			// Do not report success after a panic when the cause is available.
    			if panicErr, ok := r.(error); ok && err == nil {
    				err = panicErr
    			}
    		}
    	}()

    	durableExclusiveQueue := resource.QueueDurableExclusive(queueName)

    	// Build a Guaranteed message receiver for the given queue
    	persistentReceiver, err := smfcli.messagingService.
    		CreatePersistentMessageReceiverBuilder().
    		WithMessageClientAcknowledgement().
    		Build(durableExclusiveQueue)
    	if err != nil {
    		return err
    	}

    	// Register Message callback handler to the Message Receiver
    	if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {
    		return regErr
    	}
    	smfcli.PersistentReceiver = persistentReceiver

    	// Start Persistent Message Receiver
    	if startErr := smfcli.PersistentReceiver.Start(); startErr != nil {
    		return startErr
    	}

    	// Logged only after Start succeeded, so the message is never emitted for
    	// a receiver that failed to start.
    	log.Printf("Bound to queue: %s\n", queueName)
    	return nil
    }
    


    // Start consuming tickers: pass the queue name read from
    // GetConfigInstance().Engine.Queue.Tickers and the engine.onTickerReceived
    // callback to Consume, and propagate any error it returns.
    // NOTE(review): Consume is declared in this example as a method on
    // *SmfClient but is called here without a receiver; presumably it should be
    // invoked on the client instance. Confirm against the full source.
    
    	err = Consume(GetConfigInstance().Engine.Queue.Tickers, engine.onTickerReceived)
    
    	if err != nil {
    
    		return err
    
    	}
    


    // onTickerReceived is the callback that receives ticker messages; it is the
    // handler passed to Consume in the snippet above. The body is empty in this
    // example.
    // NOTE(review): the receiver in this example is built with client
    // acknowledgement, so presumably a real handler has to acknowledge each
    // message itself; confirm against the Solace API documentation.
    
    func (engine *Engine) onTickerReceived(message message.InboundMessage) {
    
    }
    
  • ulrich
    ulrich Member Posts: 17

    It took me a while to find the different parts in the documentation, but the following seems to work as a standalone consumer for a non-exclusive queue with client acknowledgement - in case someone else is looking for the same in the future:

    package main
    
    import (
       "flag"
       log "github.com/sirupsen/logrus"
       "os"
       "os/signal"
       "solace.dev/go/messaging"
       "solace.dev/go/messaging/pkg/solace"
       "solace.dev/go/messaging/pkg/solace/config"
       "solace.dev/go/messaging/pkg/solace/message"
       "solace.dev/go/messaging/pkg/solace/resource"
       "time"
    )
    
    // Command-line flags that configure the logger. They are registered when the
    // package is initialised and read in init after flag.Parse.

    // logLevel is the numeric log level handed to the logger (default 4).
    var logLevel = flag.Int("logLevel", 4, "log level (0-6)")

    // logReportCaller switches on reporting of the calling function in log output.
    var logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")

    // logFormatJson selects JSON log output (the default) instead of text output.
    var logFormatJson = flag.Bool("logFormatJson", true, "log in json format")
    
    // init parses the command-line flags and configures the package-level logger
    // from them: output format (JSON when logFormatJson is set, text otherwise),
    // caller reporting and log level.
    // NOTE(review): flag.Parse runs during package initialisation here, i.e.
    // before main starts.
    func init() {
       flag.Parse()

       // Pick the formatter once instead of installing one and overriding it.
       var formatter log.Formatter = &log.TextFormatter{}
       if *logFormatJson {
          formatter = &log.JSONFormatter{}
       }

       log.SetFormatter(formatter)
       log.SetReportCaller(*logReportCaller)
       log.SetLevel(log.Level(*logLevel))
    }
    
    // getEnv returns the value of the environment variable key.
    //
    // A variable that is set to the empty string is returned as "". If the
    // variable is not set at all, the problem is logged and the process exits
    // with status 8, so this function only returns for variables that exist.
    func getEnv(key string) string {
       val, ok := os.LookupEnv(key)
       if !ok {
          // Logged at error level: the process is about to exit, and an info
          // message would be suppressed when the log level is below info,
          // leaving no trace of why the program stopped.
          log.Error("Missing environment variable ", key)
          os.Exit(8)
       }
       return val
    }
    
    // MessageHandler is the receive callback registered on the persistent
    // receiver. It dumps the message at debug level and, when the payload is
    // available as bytes, logs the payload and acknowledges the message through
    // the package-level persistentReceiver. A failed acknowledgement is logged.
    // Messages whose payload is not available as bytes are not acknowledged.
    func MessageHandler(msg message.InboundMessage) {
       // Debugf (not Debug) so that the %s verb is actually expanded.
       log.Debugf("Message Dump %s \n", msg)
       payload, ok := msg.GetPayloadAsBytes()
       if ok {
          log.Info(string(payload))
          // With client acknowledgement a failed Ack must not go unnoticed.
          if err := persistentReceiver.Ack(msg); err != nil {
             log.Errorf("Acknowledging message failed: %v", err)
          }
       }
       log.Info("")

    }
    
    // persistentReceiver is the package-level queue receiver. main assigns it
    // when it builds the receiver, and MessageHandler uses it to acknowledge
    // messages.
    var persistentReceiver solace.PersistentMessageReceiver
    
    func main() {
    
       // Configuration parameters
       brokerConfig := config.ServicePropertyMap{
          config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST"),
          config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN"),
          config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"),
          config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"),
       }
       messagingService, err := messaging.NewMessagingServiceBuilder().
          FromConfigurationProvider(brokerConfig).
          WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
          Build()
       if err != nil {
          panic(err)
       }
    
       // Connect to the messaging serice
       if err := messagingService.Connect(); err != nil {
          panic(err)
       }
    
       log.Info("Connected to the broker? ", messagingService.IsConnected())
    
       queueName := getEnv("AEM_QUEUE")
       durableNonExclusiveQueue := resource.QueueDurableNonExclusive(queueName)
       //nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName)
    
       // Build a Guaranteed message receiver and bind to the given queue
       persistentReceiver, err = messagingService.CreatePersistentMessageReceiverBuilder().WithMessageClientAcknowledgement().Build(durableNonExclusiveQueue)
    
       log.Info("Bound to queue: %s", queueName)
    
       // Handling a panic from a non existing queue
    
       defer func() {
    
          if err := recover(); err != nil {
    
             log.Printf("Make sure queue name '%s' exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s", queueName, err)
    
          }
    
       }()
    
       // Start Persistent Message Receiver
       if err := persistentReceiver.Start(); err != nil {
          panic(err)
       }
    
       log.Info("Persistent Receiver running? ", persistentReceiver.IsRunning())
    
       if regErr := persistentReceiver.ReceiveAsync(MessageHandler); regErr != nil {
          panic(regErr)
       }
    
       // Handle interrupts
    
       c := make(chan os.Signal, 1)
       signal.Notify(c, os.Interrupt)
    
       // Block until a signal is received.
       <-c
    
       // TODO
       // Find way to shutdown the go routine
       // e.g use another channel, BOOl..etc
       // TODO
    
       // Terminate the Persistent Receiver
       persistentReceiver.Terminate(2 * time.Second)
       log.Info("\nDirect Receiver Terminated? ", persistentReceiver.IsTerminated())
    
       // Disconnect the Message Service
       messagingService.Disconnect()
       log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
    
    }
    
    
  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 538 admin

    Thanks for sharing, @ulrich!!