golang publisher error message

ulrich
ulrich Member Posts: 17

Hi,

i have written a golang base message producer using the solace golang API/SDK.

I create random messages and can define a level of parallelism and a message length.

The program works fine and creates some 1k's of messages per sec for 1k long messages. But after some time it always terminates with

{"level":"info","msg":"message published:{\"message\":\"Yq4wR0ZPomc4B0w8bVwyLFxtwIkk0b5IcfbtvHlj8JrcnfraDXvR4bMtGIWJgZLKMf7c1q962BQHtiNfJbZVQWHj8XtyBCj9FgwC8D8QXzuaMGkRb1jRowzg0pftKATls2a93x4HBvtLRg0NN9wiYnRnIA8fnF7qUFVJ3piKMba43epRlkZdTx67d43tofLCkdtXUz0uWiuNncd3GNWYdKmvtRHcrlS3cofyPD8qPuidPdheiUlTf7iiM1f8wybm8Sy2GeDBjIfm9b7bzFaFTFE1Mp3FoQTz31pOU5qsO3txwaPdUa3oO76MCXZDTIMNIoLCY4dxFOvXnKlGpzrZpsRetSU7xoaR7hCFZNxRnKqLpItjO5gRKZdu4OfZLjWVIn9ljnKGinXNHpFLusDtFvRv356KaiOWYpav9ZwBzamuWTaPAHDqNLhFyGujxbNfGKGdoLpIYJyf5UCbYbK6m56v2Xsfe8ru4EHNnGTvcQ0Lh4Q0Mx3GnbDxAeUUXtgsqzlDYYBlULO8yduB3AuGG0hhXtF1LOMF8sOyySA2c1XVYp4A4v2Wgj069Tb4FVuib1gIm2PWTu7DwEPrnr4DYnl2RILVTuFZ13suN3N2WPfiy34YXGeMzTw9TXQ9Q8Tn67qY0owzCNhnzc2EHd7ONldDHg0QCsMRXAKnzYZhgbEmU0mi0qmC8nuU3ggMV4RETU7zSMp22SH7PIQO5OrTAOPeX426FP5U3XbCteCyQjxam3M2rrMuY2TqT9Wbeciywx7WWvKgyz9OWc8rDeD51SVFdN50d7DFDmlf4wl4nf5T1PIijUeQ8LdSIJ1c6Eg4PaTGpexGrhUP7LZC3mJlDu9pLWrsZyRNaLhzIrOuBVVKXOu5s2ybXhSpfQjtZfsVNdkWHNab3bkc7tbBlYk2TX0WmpgYTaPAShFWGQHHf01bpNX0fdoXDf3iYY7ZAVIknqWcfu3GvkSukcbvx0BFoV7iF7IjeAtn02Yz48jV\",\"id\":\"3ec71a88-d228-48eb-8f1a-3f83b3f73045\"}","time":"2023-01-13T12:11:44Z"}

{"level":"info","msg":"topic: CXS/demo1/5/fb42d89f-d2f8-4500-8f46-9a0a5d6ff193","time":"2023-01-13T12:11:44Z"}

2023/01/13 12:11:44 log.go:45: WARNING solClientMsg.c:1247         (7fdc277fe640) Bad msg_p pointer '0xfc000db6' in solClient_msg_dup

panic: invalid configuration provided: error duplicating message: Bad msg_p pointer '0xfc000db6' in solClient_msg_dup


As the data is random generated I can't see how a duplicate message would come in play.

Can someone explain what is happening there and how to resolve?

Tagged:

Best Answer

  • mcardy
    mcardy Member, Employee Posts: 12 Solace Employee
    #2 Answer ✓

    @ulrich can you try adding

    message.Dispose()
    

    after the call to Publish, like this:

    publishErr := directPublisher.Publish(message, resource.TopicOf(topic))
    if publishErr != nil {
       panic(publishErr)
    }
    log.Info("message published:", messageBody)
    message.Dispose()
    

Answers

  • amackenzie
    amackenzie Member, Employee Posts: 268 Solace Employee

    can we see the code? Also, there are formatting options in this forum for codeblocks. You can use the 3 tildes (the ` character) to form the start and choose the paragraph formatter to end it.

    this is a code block
    


  • ulrich
    ulrich Member Posts: 17

    here you are:

    package main
    
    import (
       "encoding/json"
       "flag"
       "fmt"
       "github.com/satori/go.uuid"
       log "github.com/sirupsen/logrus"
       "math/rand"
       "os"
       "os/signal"
       "strconv"
       "time"
    
       "solace.dev/go/messaging"
       "solace.dev/go/messaging/pkg/solace/config"
       "solace.dev/go/messaging/pkg/solace/resource"
    )
    
    var (
       logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
       logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
       logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
    )
    
    func init() {
       flag.Parse()
       log.SetLevel(log.Level(*logLevel))
       log.SetReportCaller(*logReportCaller)
       log.SetFormatter(&log.TextFormatter{})
       if *logFormatJson {
          log.SetFormatter(&log.JSONFormatter{})
       }
    
    }
    
    func getEnv(key string) string {
       if val, ok := os.LookupEnv(key); ok {
          return val
       } else {
          log.Info("Missing environment variable ", key)
          os.Exit(8)
       }
       return ""
    }
    
    func main() {
       p, err := strconv.Atoi(getEnv("PARALLEL_PRODUCER"))
    
       if err != nil {
          log.Panic("PARALLEL_PRODUCER variable has to be a positive integer ")
       }
    
       TopicPrefix := getEnv("AEM_TOPIC_PREFIX")
       log.Info(TopicPrefix)
    
       // Configuration parameters
       brokerConfig := config.ServicePropertyMap{
          config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST"),
          config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN"),
          config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"),
          config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"),
       }
       messagingService, err := messaging.NewMessagingServiceBuilder().
          FromConfigurationProvider(brokerConfig).
          WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
          Build()
       if err != nil {
          panic(err)
       }
    
       // Connect to the messaging serice
       if err := messagingService.Connect(); err != nil {
          panic(err)
       }
    
       log.Info("Connected to the broker? ", messagingService.IsConnected())
    
       //  Build a Direct Message Publisher
       directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
       if builderErr != nil {
          panic(builderErr)
       }
    
       startErr := directPublisher.Start()
       if startErr != nil {
          panic(startErr)
       }
    
       log.Info("Direct Publisher running? ", directPublisher.IsRunning())
    
       msgSeqNum := 0
    
       eventInd, err := strconv.ParseBool(getEnv("EVENTS_IND"))
       if err != nil {
          log.Panic(err)
       }
       msgLength, err := strconv.Atoi(getEnv("MSG_LENGTH"))
       if err != nil {
          log.Panic(err)
       }
       for i := 0; i < p; i++ {
    
          go func(v int) {
    
             for directPublisher.IsReady() {
                //  Prepare outbound message payload and body
                var messageBody string
                messageBody, err = getRandomMessage(msgLength)
                if err != nil {
                   log.Panic(err)
                }
    
                id := uuid.NewV4().String()
    
                messageBuilder := messagingService.MessageBuilder().
                   WithProperty("application", "samples").
                   WithProperty("language", "go")
    
                msgSeqNum++
                message, err := messageBuilder.BuildWithStringPayload(messageBody)
                if err != nil {
                   panic(err)
                }
                topic := TopicPrefix + "/" + fmt.Sprintf("%d", v) + "/" + id
                log.Info("topic: ", topic)
                publishErr := directPublisher.Publish(message, resource.TopicOf(topic))
                if publishErr != nil {
                   panic(publishErr)
                }
                log.Info("message published:", messageBody)
             }
    
          }(i)
    
       }
    
       // Handle interrupts
    
       c := make(chan os.Signal, 1)
       signal.Notify(c, os.Interrupt)
    
       // Block until a signal is received.
       <-c
    
       // TODO
       // Find way to shutdown the go routine
       // e.g use another channel, BOOl..etc
       // TODO
    
       // Terminate the Direct Publisher
       directPublisher.Terminate(2 * time.Second)
       log.Info("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())
    
       // Disconnect the Message Service
       messagingService.Disconnect()
       log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
    
    }
    
    var letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    
    func getRandomMessage(n int) (string, error) {
       // Seed the random number generator
       rand.Seed(time.Now().UnixNano())
    
       // Generate a random string of length n
       b := make([]byte, n)
       for i := range b {
          b[i] = letterBytes[rand.Intn(len(letterBytes))]
       }
       randomStr := string(b)
    
       // Generate a UUID
       id := uuid.NewV4()
    
       // Create the response object
       response := struct {
          Message string `json:"message"`
          ID      string `json:"id"`
       }{
          Message: randomStr,
          ID:      id.String(),
       }
    
       // Marshal the response to a JSON string
       responseJSON, err := json.Marshal(response)
       if err != nil {
          return "", err
       }
    
       return string(responseJSON), nil
    }
    
    
  • ulrich
    ulrich Member Posts: 17
    getEnv("PARALLEL_PRODUCER")
    

    defines the level of parallel internal instances this script runs - I used 10 to 40

    getEnv("AEM_TOPIC_PREFIX")
    

    just the topic prefix

  • mcardy
    mcardy Member, Employee Posts: 12 Solace Employee

    Hi @ulrich,

    There is a bug in the Go API that prevents multiple goroutines from sharing the same DirectMessagePublisher instance that will be fixed in a future release. In the meantime, using one DirectMessagePublisher instance per goroutine will resolve the issue.

  • ulrich
    ulrich Member Posts: 17

    code changed - for completeness

    package main
    
    import (
       "flag"
       log "github.com/sirupsen/logrus"
       "os"
       "os/signal"
       "solace.dev/go/messaging"
       "solace.dev/go/messaging/pkg/solace"
       "solace.dev/go/messaging/pkg/solace/config"
       "solace.dev/go/messaging/pkg/solace/message"
       "solace.dev/go/messaging/pkg/solace/resource"
       "time"
    )
    
    var (
       logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
       logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
       logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
    )
    
    func init() {
       flag.Parse()
       log.SetLevel(log.Level(*logLevel))
       log.SetReportCaller(*logReportCaller)
       log.SetFormatter(&log.TextFormatter{})
       if *logFormatJson {
          log.SetFormatter(&log.JSONFormatter{})
       }
    
    }
    
    func getEnv(key string) string {
       if val, ok := os.LookupEnv(key); ok {
          return val
       } else {
          log.Info("Missing environment variable ", key)
          os.Exit(8)
       }
       return ""
    }
    
    // Message Handler
    func MessageHandler(message message.InboundMessage) {
       log.Debug("Message Dump %s \n", message)
       payload, ok := message.GetPayloadAsBytes()
       if ok {
          log.Info(string(payload))
          persistentReceiver.Ack(message)
       } else {
          id, ok := message.GetApplicationMessageID()
          if ok {
             log.Error("message wasn't able to be processed: ", id)
          } else {
             log.Error("general issue with message: ", message.String())
          }
       }
       log.Info("")
    
    }
    
    var persistentReceiver solace.PersistentMessageReceiver
    
    func main() {
    
       // Configuration parameters
       brokerConfig := config.ServicePropertyMap{
          config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST"),
          config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN"),
          config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"),
          config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"),
       }
       messagingService, err := messaging.NewMessagingServiceBuilder().
          FromConfigurationProvider(brokerConfig).
          WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
          Build()
       if err != nil {
          panic(err)
       }
    
       // Connect to the messaging serice
       if err := messagingService.Connect(); err != nil {
          panic(err)
       }
    
       log.Info("Connected to the broker? ", messagingService.IsConnected())
    
       queueName := getEnv("AEM_QUEUE")
       durableNonExclusiveQueue := resource.QueueDurableNonExclusive(queueName)
       //nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName)
    
       // Build a Guaranteed message receiver and bind to the given queue
       persistentReceiver, err = messagingService.CreatePersistentMessageReceiverBuilder().WithMessageClientAcknowledgement().Build(durableNonExclusiveQueue)
    
       log.Info("Bound to queue: %s", queueName)
    
       // Handling a panic from a non existing queue
    
       defer func() {
    
          if err := recover(); err != nil {
    
             log.Printf("Make sure queue name '%s' exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s", queueName, err)
    
          }
    
       }()
    
       // Start Persistent Message Receiver
       if err := persistentReceiver.Start(); err != nil {
          panic(err)
       }
    
       log.Info("Persistent Receiver running? ", persistentReceiver.IsRunning())
    
       if regErr := persistentReceiver.ReceiveAsync(MessageHandler); regErr != nil {
          panic(regErr)
       }
    
       // Handle interrupts
    
       c := make(chan os.Signal, 1)
       signal.Notify(c, os.Interrupt)
    
       // Block until a signal is received.
       <-c
    
       // TODO
       // Find way to shutdown the go routine
       // e.g use another channel, BOOl..etc
       // TODO
    
       // Terminate the Persistent Receiver
       persistentReceiver.Terminate(2 * time.Second)
       log.Info("\nDirect Receiver Terminated? ", persistentReceiver.IsTerminated())
    
       // Disconnect the Message Service
       messagingService.Disconnect()
       log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
    
    }
    
    
  • ulrich
    ulrich Member Posts: 17

    not yet - different errors now:

    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81affd640) datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81affd640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81affd640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81affd640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81affd640) datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa8127fc640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa8127fc640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa8127fc640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa8127fc640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa8127fc640) datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    {"level":"info","msg":"topic: CXS/demo1/8/b68c7425-7076-4ac0-b0cd-9f4caae4992e","time":"2023-01-13T20:22:03Z"}
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    {"level":"info","msg":"message published:{\"message\":\"kzYAdG6KwLM056Yv6GtJmKQLdHJnx6uNb4j1GGaGxQUO9o1JnchZkZewUe1fywH94lWvD8E7x2bczU78ATSuEyG9dayDzFDeRsfrLUOJO3nDsrmjmSed0BYgVsT2yvpCO03Azj14Mo23US3OMyG1s25LTT4ubKBZrcupF45vuGCOlsyPLc5QdxyvloPTSJ67zndmcbzcd7ePugTsQRX4twXOWRx31bjr5SGJtHzQFnI6ZonrJd8Hwsrw1V7XvpI0BJecYUC6ttY2WLOsKdftz3N6zVhO2DvQh6wtIBTHxk0fq3PadmWeUQDinpGBAwx7TJj8orFBDnAmUGTThoM5H6iaSuxyMCkuwxPn6iaeX4AEaAjpBGyfBEZUVkMU5StbevIyMZk8jOaetGlazyVfNevv9MgfB3lCUUOKWC536KFRLpGUGwF71Ahb8eHuqlGcyaUfTS4pwpMptWbKsZtzTIbd6akFmQNYLVYI04idXzDH9Qy9LB2bSJQ89FKtzqPXXkZ5ZyU3F0ZAp1JBOgKnWw514Kv3TTAg8CLGLtwzDcpqhhsuHSxE0d3cVsBxy41ooR5gvEwGK70J5OK66KcATcXAZGxTggm3h4MNBDHEMtksvyYh1O51WLSdJuhWztbkjKw4MCakAFYL7XqHejiTlQj4bbUxzZEdmmRmbgcTmaMEZy0JlAdh9rC74sd2meumcLLeQE7T3kIXKCuRSCeIFFWQFfibIJpNyjqQJJDNpZ35mdvomiMhBTG1WRUCiyEQ96Ov5O2aDCkTMV6rZS00snR6flIH7lY6OPrLmlw7W6nQ7DaeWUx22x5jf04esqzoddrQboUxpupoJKlAsnnqyM1mpHSmXDEcG8MYll9eUMxhvSyV4mm9Edy7oS7bVPf3cl7DqGgflAzyNesex5MeIfLoy75p6vS7elgx8QhNgzlE3gHMrWhpjjhSdJxn4aRm7YM9n9tn1sXcttsTSyRdJDoT3F4CE3CwbrYOwQC6\",\"id\":\"87d8fee3-ef92-4c43-bf32-3df5eb1998ba\"}","time":"2023-01-13T20:22:03Z"}
    
    {"level":"info","msg":"message published:{\"message\":\"VtGYzLm9Rt9wyN6FKdyYnTNRhcy16bhmdATy8pg7nbwwwAH762sYqA2N8UGFJtvAkqS4v28JW0KygETYSdMjtHnQlVNTkwMjZQ6gR85NSMDj89zKcOaKtp17TU6cqGbgfPFveDpzkD45tLe3nH0dda8VeNuj6arT8AmN3IdjI9sZmuQDDxSPAzoBxJsTZP9NuDdPV9Fpkhu8YzFHo8FiYix3luPD01Xydb6jcU8MIRqXljoRaoXkhkRjV0O1TF4hgrDKGsi8OBHpqY4APHfeEY7Rc8DaP3hZYHCe8RdTY2vIY5vb6QFHsZEV6T1f0K3NOLZgBrdKdrbAe2hM8EiXjOAEwgzEDm4RmH3EOena2Q46QmHJJ33qCZAgu3pqLBq1CLXVYG4eApdRIZ1bElbEkHuh0Eef7zokVWxW4wRWidV8gRV7HhoX0dOc92jsJRMpk0kRaEHuABDROiobwfpG7FJkcgk0UJu4Ijuq2UZCPFcePGhGa0aFtCQMfljetUqZJFK6rDZO5pUVSiTEkdb0YRuqaC8xPiFgLNJQSqDere30QXsl1uWYcNgvGNCohgJAv4Cnrq34VngcUR3jmkAc1IVlHuvmbfg04FqGzuMgYwDWiASFTKtelqVsdpFZ6XzIarr1EJVNgPoJRiwlTSxcjQk0X7UNa1nbqmfvxjgf5z3jKOl6ny4Rrid2HWW5tS0JNEo6UURvPA9hq8qM1kzE72ArcwCRlPptQ2ytan4UO4bEBZDFAznvpZD3f8O6c7pe7mE2cvMmJx9tz8oRkrj9wE5kll8KzPTc6nSX0u5DytzVXiQRchUwZdjPw7GTRDjwlVEWHanMbUyMvvWJNgdJv6xMSAeYGPX8zQV5saHxLfHM95FVwaHVtubMeqxhPonGcjPqUp7IlwXuUsYuoFWKP8sRu7TchLWbXHXYrmpmgZxZDgEmEx8n9WMBmu3vwCtxDf0JUEi3GzQDi0syCFVOWk0bn2RKGFXQoN3n904Z\",\"id\":\"3aaceb73-57dd-422a-9cb3-03f917e02aa1\"}","time":"2023-01-13T20:22:03Z"}
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=-2 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-3 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    {"level":"info","msg":"topic: CXS/demo1/12/874154a4-9a58-4eae-8a40-ffa5a84f7698","time":"2023-01-13T20:22:03Z"}
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943                   (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943
    
    2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943                   (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/^Cazureuser@accenture-sandbox:~/tmp/AdvancedEventMesh$ 
    
  • mcardy
    mcardy Member, Employee Posts: 12 Solace Employee
    edited January 2023 #9

    @ulrich can you share the code you are using? What platform are you on? What version of the Go API are you using?

  • ulrich
    ulrich Member Posts: 17


    This is the code.

    Ubuntu 22.04.1 LTS

    go version go1.18.1 linux/amd64

  • mcardy
    mcardy Member, Employee Posts: 12 Solace Employee

    @ulrich the changed code provided was receive code, not publishing code. Was the error occurring on the receive side now?

    Can you share the version of the Solace Go API?

  • ulrich
    ulrich Member Posts: 17

    sorry for that. Was a copy mistake with the code...

    from the go mod file

    solace.dev/go/messaging v1.1.0
    

    producer code here

    package main
    
    import (
       "encoding/json"
       "flag"
       "fmt"
       "github.com/satori/go.uuid"
       log "github.com/sirupsen/logrus"
       "math/rand"
       "os"
       "os/signal"
       "strconv"
       "time"
    
       "solace.dev/go/messaging"
       "solace.dev/go/messaging/pkg/solace/config"
       "solace.dev/go/messaging/pkg/solace/resource"
    )
    
    var (
       logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
       logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
       logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
    )
    
    func init() {
       flag.Parse()
       log.SetLevel(log.Level(*logLevel))
       log.SetReportCaller(*logReportCaller)
       log.SetFormatter(&log.TextFormatter{})
       if *logFormatJson {
          log.SetFormatter(&log.JSONFormatter{})
       }
    
    }
    
    func getEnv(key string) string {
       if val, ok := os.LookupEnv(key); ok {
          return val
       } else {
          log.Info("Missing environment variable ", key)
          os.Exit(8)
       }
       return ""
    }
    
    func main() {
       p, err := strconv.Atoi(getEnv("PARALLEL_PRODUCER"))
    
       if err != nil {
          log.Panic("PARALLEL_PRODUCER variable has to be a positive integer ")
       }
    
       msgSeqNum := 0
    
       for i := 0; i < p; i++ {
    
          go func(v int) {
    
             msgLength, err := strconv.Atoi(getEnv("MSG_LENGTH"))
             if err != nil {
                log.Panic(err)
             }
    
             TopicPrefix := getEnv("AEM_TOPIC_PREFIX")
             log.Info(TopicPrefix)
    
             // Configuration parameters
             brokerConfig := config.ServicePropertyMap{
                config.TransportLayerPropertyHost:                getEnv("AEM_CONNECTION_STRING"),
                config.ServicePropertyVPNName:                    getEnv("AEM_VPN"),
                config.AuthenticationPropertySchemeBasicUserName: getEnv("AEM_USER"),
                config.AuthenticationPropertySchemeBasicPassword: getEnv("AEM_PASSWORD"),
             }
             messagingService, err := messaging.NewMessagingServiceBuilder().
                FromConfigurationProvider(brokerConfig).
                WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
                Build()
             if err != nil {
                panic(err)
             }
    
             // Connect to the messaging serice
             if err := messagingService.Connect(); err != nil {
                panic(err)
             }
    
             log.Info("Connected to the broker? ", messagingService.IsConnected())
    
             //  Build a Direct Message Publisher
             directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
             if builderErr != nil {
                panic(builderErr)
             }
    
             startErr := directPublisher.Start()
             if startErr != nil {
                panic(startErr)
             }
    
             log.Info("Direct Publisher running? ", directPublisher.IsRunning())
    
             for directPublisher.IsReady() {
                //  Prepare outbound message payload and body
                var messageBody string
                messageBody, err = getRandomMessage(msgLength)
                if err != nil {
                   log.Panic(err)
                }
    
                id := uuid.NewV4().String()
    
                messageBuilder := messagingService.MessageBuilder().
                   WithProperty("application", "samples").
                   WithProperty("language", "go")
    
                msgSeqNum++
                message, err := messageBuilder.BuildWithStringPayload(messageBody)
                if err != nil {
                   panic(err)
                }
                topic := TopicPrefix + "/" + fmt.Sprintf("%d", v) + "/" + id
                log.Info("topic: ", topic)
                publishErr := directPublisher.Publish(message, resource.TopicOf(topic))
                if publishErr != nil {
                   panic(publishErr)
                }
                log.Info("message published:", messageBody)
             }
    
             // Terminate the Direct Publisher
             directPublisher.Terminate(2 * time.Second)
             log.Info("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())
    
             // Disconnect the Message Service
             messagingService.Disconnect()
             log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
    
          }(i)
    
       }
    
       // Handle interrupts
    
       c := make(chan os.Signal, 1)
       signal.Notify(c, os.Interrupt)
    
       // Block until a signal is received.
       <-c
    
       // TODO
       // Find way to shutdown the go routine
       // e.g use another channel, BOOl..etc
       // TODO
    
    }
    
    var letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    
    func getRandomMessage(n int) (string, error) {
       // Seed the random number generator
       rand.Seed(time.Now().UnixNano())
    
       // Generate a random string of length n
       b := make([]byte, n)
       for i := range b {
          b[i] = letterBytes[rand.Intn(len(letterBytes))]
       }
       randomStr := string(b)
    
       // Generate a UUID
       id := uuid.NewV4()
    
       // Create the response object
       response := struct {
          Message string `json:"message"`
          ID      string `json:"id"`
       }{
          Message: randomStr,
          ID:      id.String(),
       }
    
       // Marshal the response to a JSON string
       responseJSON, err := json.Marshal(response)
       if err != nil {
          return "", err
       }
    
       return string(responseJSON), nil
    }
    
    
  • mcardy
    mcardy Member, Employee Posts: 12 Solace Employee

    @ulrich using your code and 40 parallel publishers, I do not hit your error (tried on a few different systems and let it run for a while). Are there any other details you can provide?

    Unrelated to the problem, but you can have multiple direct publishers on the same messaging service (e.g. share the messaging service outside of the goroutines) instead of a separate messaging service per goroutine. This will use fewer client and server side resources.

  • ulrich
    ulrich Member Posts: 17

    I am back to

    {"level":"info","msg":"message published:{\"message\":\"6K1QlZmjQKqjigNX56UWX6B2SsoupmvHbJtYvCGUSoa9kq1gNoETHhvEbzjsA9Q48mGtR2hFqvMhJxFjzbHdvnALMzwEJDj853fpk4eX22GimZjZ0bj4jajKjHiJAr42FYsXvk4sU47t5xVSqrTFLcsBgzuS66CfSinDbxHb2o1MwMpFgWCFEZAhi7Wdz5k2JrsfZHcSJHa2lXflfIJlOast9K3ZUR0Ps1BU9gfWeipvS3UCss9Ifcker2LWlIU9CNz79NCQJELgqrecuZpf9TRQwI033OPJ3oGxgJrDlw2bzpk2CgYs56xETRxadynJF2oJJ3jBSATS4dTgvKwYfRTKDGo35Lo0jI42E2Ix6OnFGKqWgb0KQsEGc6EzaLWYKxtvRi6CcuPHU1TRiSpweVYTr1ssk4y7B4fwGTXFiHDj8pQMUEHsMAwb9Z5ICwklbK3IfScANldTBmsOvhZWjIM0qIsjJXVaqq0lw04FYU31ihh3fKvrKXQCXuvu3IlH9tI5MY44wwwQETwPph5ZMY5J9XYeirXq7qgWRzPOnUvHpjEiowuXoYCyUiwitFE4etVxa3B6chPb72pCxBYbv0l1PSkVoGSNRmrGMpZAjUPx7HyiU4vQW5ToOk9AQ0irtzZkdndnSbM82lNdFAWS1Nc6mm470OQdY74UZmxXQi6QZO3ZIR2PgWwXNWESuqZzbrHdB5qoot6j69NGaLeurAb4KJPcYL9XsbiK8CtWIeALsWUQuVnzIH1Jfjb93qCyljcEac1eMn0cHyx9nMKuLV0xWkHi29onncmiMZ4goSQLZJQ635dDLOoMHLZlPI2oxXNgqC4VbhRecfce3nijJVWBzko0KK0TwSF0zGF4yJWS2d0DuLoHoBIAYjn3TuIqeMahzwKQR5qm6XOXioMQvGI2ceJAOEU2RpP7ScI9DwHmHTwK6EwUiEfs97PXwm5Qhrj4PYJGamNY151hhiNLOuLoCWpLjNveNJgoQFHZ\",\"id\":\"2a388f7b-d14a-4d10-9670-e1968f5d0d3b\"}","time":"2023-01-16T18:54:11Z"}
    2023/01/16 18:54:11 log.go:45: WARNING solClientMsg.c:1247                  (7fcf6effd640) Bad msg_p pointer '0xbc000ee7' in solClient_msg_dup
    {"level":"info","msg":"topic: CXS/demo1/37/8a584b2b-fbda-4cce-ac47-06d80a30b33d","time":"2023-01-16T18:54:11Z"}
    panic: invalid configuration provided: error duplicating message: Bad msg_p pointer '0xbc000ee7' in solClient_msg_dup
    
    
    goroutine 35 [running]:
    main.main.func1(0x0?)
            /home/azureuser/tmp/AdvancedEventMesh/cmd/producer/main.go:126 +0xa05
    created by main.main
            /home/azureuser/tmp/AdvancedEventMesh/cmd/producer/main.go:58 +0x94
    exit status 2
    

    data dog states no problems

    producer runs on Azure VM and solace server on AWS.

  • mcardy
    mcardy Member, Employee Posts: 12 Solace Employee
    #15 Answer ✓

    @ulrich can you try adding

    message.Dispose()
    

    after the call to Publish, like this:

    publishErr := directPublisher.Publish(message, resource.TopicOf(topic))
    if publishErr != nil {
       panic(publishErr)
    }
    log.Info("message published:", messageBody)
    message.Dispose()
    
  • ulrich
    ulrich Member Posts: 17

    looking good. Will let the producer run over night

  • ulrich
    ulrich Member Posts: 17

    As stated above - the dispose fixed the issue. Process run stable the whole night. Thanks!

    For completeness the working code:

    package main
    
    import (
       "encoding/json"
       "flag"
       "fmt"
       "github.com/satori/go.uuid"
       log "github.com/sirupsen/logrus"
       "math/rand"
       "os"
       "os/signal"
       "strconv"
       "time"
    
       "solace.dev/go/messaging"
       "solace.dev/go/messaging/pkg/solace/config"
       "solace.dev/go/messaging/pkg/solace/resource"
    )
    
    var (
       logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
       logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
       logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
    )
    
    func init() {
       flag.Parse()
       log.SetLevel(log.Level(*logLevel))
       log.SetReportCaller(*logReportCaller)
       log.SetFormatter(&log.TextFormatter{})
       if *logFormatJson {
          log.SetFormatter(&log.JSONFormatter{})
       }
    
    }
    
    func getEnv(key string) string {
       if val, ok := os.LookupEnv(key); ok {
          return val
       } else {
          log.Info("Missing environment variable ", key)
          os.Exit(8)
       }
       return ""
    }
    
    func main() {
       p, err := strconv.Atoi(getEnv("PARALLEL_PRODUCER"))
    
       if err != nil {
          log.Panic("PARALLEL_PRODUCER variable has to be a positive integer ")
       }
    
       msgSeqNum := 0
    
       for i := 0; i < p; i++ {
    
          go func(v int) {
    
             msgLength, err := strconv.Atoi(getEnv("MSG_LENGTH"))
             if err != nil {
                log.Panic(err)
             }
    
             TopicPrefix := getEnv("SOLACE_TOPIC_PREFIX")
             log.Info(TopicPrefix)
    
             // Configuration parameters
             brokerConfig := config.ServicePropertyMap{
                config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST"),
                config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN"),
                config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"),
                config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"),
             }
             messagingService, err := messaging.NewMessagingServiceBuilder().
                FromConfigurationProvider(brokerConfig).
                WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
                Build()
             if err != nil {
                panic(err)
             }
    
             // Connect to the messaging serice
             if err := messagingService.Connect(); err != nil {
                panic(err)
             }
    
             log.Info("Connected to the broker? ", messagingService.IsConnected())
    
             //  Build a Direct Message Publisher
             directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
             if builderErr != nil {
                panic(builderErr)
             }
    
             startErr := directPublisher.Start()
             if startErr != nil {
                panic(startErr)
             }
    
             log.Info("Direct Publisher running? ", directPublisher.IsRunning())
    
             for directPublisher.IsReady() {
                //  Prepare outbound message payload and body
                var messageBody string
                messageBody, err = getRandomMessage(msgLength)
                if err != nil {
                   log.Panic(err)
                }
    
                id := uuid.NewV4().String()
    
                messageBuilder := messagingService.MessageBuilder().
                   WithProperty("application", "samples").
                   WithProperty("language", "go")
    
                msgSeqNum++
                message, err := messageBuilder.BuildWithStringPayload(messageBody)
                if err != nil {
                   panic(err)
                }
                topic := TopicPrefix + "/" + fmt.Sprintf("%d", v) + "/" + id
                log.Info("topic: ", topic)
                publishErr := directPublisher.Publish(message, resource.TopicOf(topic))
                if publishErr != nil {
                   panic(publishErr)
                }
                log.Info("message published:", messageBody)
                message.Dispose()
    
             }
    
             // Terminate the Direct Publisher
             directPublisher.Terminate(2 * time.Second)
             log.Info("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())
    
             // Disconnect the Message Service
             messagingService.Disconnect()
             log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
    
          }(i)
    
       }
    
       // Handle interrupts
    
       c := make(chan os.Signal, 1)
       signal.Notify(c, os.Interrupt)
    
       // Block until a signal is received.
       <-c
    
       // TODO
       // Find way to shutdown the go routine
       // e.g use another channel, BOOl..etc
       // TODO
    
    }
    
    var letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    
    func getRandomMessage(n int) (string, error) {
       // Seed the random number generator
       rand.Seed(time.Now().UnixNano())
    
       // Generate a random string of length n
       b := make([]byte, n)
       for i := range b {
          b[i] = letterBytes[rand.Intn(len(letterBytes))]
       }
       randomStr := string(b)
    
       // Generate a UUID
       id := uuid.NewV4()
    
       // Create the response object
       response := struct {
          Message string `json:"message"`
          ID      string `json:"id"`
       }{
          Message: randomStr,
          ID:      id.String(),
       }
    
       // Marshal the response to a JSON string
       responseJSON, err := json.Marshal(response)
       if err != nil {
          return "", err
       }
    
       return string(responseJSON), nil
    }