golang publisher error message
Hi,
I have written a Go-based message producer using the Solace Go API/SDK.
I create random messages and can define a level of parallelism and a message length.
The program works fine and produces several thousand messages per second with messages of length 1k. But after some time it always terminates with:
{"level":"info","msg":"message published:{\"message\":\"Yq4wR0ZPomc4B0w8bVwyLFxtwIkk0b5IcfbtvHlj8JrcnfraDXvR4bMtGIWJgZLKMf7c1q962BQHtiNfJbZVQWHj8XtyBCj9FgwC8D8QXzuaMGkRb1jRowzg0pftKATls2a93x4HBvtLRg0NN9wiYnRnIA8fnF7qUFVJ3piKMba43epRlkZdTx67d43tofLCkdtXUz0uWiuNncd3GNWYdKmvtRHcrlS3cofyPD8qPuidPdheiUlTf7iiM1f8wybm8Sy2GeDBjIfm9b7bzFaFTFE1Mp3FoQTz31pOU5qsO3txwaPdUa3oO76MCXZDTIMNIoLCY4dxFOvXnKlGpzrZpsRetSU7xoaR7hCFZNxRnKqLpItjO5gRKZdu4OfZLjWVIn9ljnKGinXNHpFLusDtFvRv356KaiOWYpav9ZwBzamuWTaPAHDqNLhFyGujxbNfGKGdoLpIYJyf5UCbYbK6m56v2Xsfe8ru4EHNnGTvcQ0Lh4Q0Mx3GnbDxAeUUXtgsqzlDYYBlULO8yduB3AuGG0hhXtF1LOMF8sOyySA2c1XVYp4A4v2Wgj069Tb4FVuib1gIm2PWTu7DwEPrnr4DYnl2RILVTuFZ13suN3N2WPfiy34YXGeMzTw9TXQ9Q8Tn67qY0owzCNhnzc2EHd7ONldDHg0QCsMRXAKnzYZhgbEmU0mi0qmC8nuU3ggMV4RETU7zSMp22SH7PIQO5OrTAOPeX426FP5U3XbCteCyQjxam3M2rrMuY2TqT9Wbeciywx7WWvKgyz9OWc8rDeD51SVFdN50d7DFDmlf4wl4nf5T1PIijUeQ8LdSIJ1c6Eg4PaTGpexGrhUP7LZC3mJlDu9pLWrsZyRNaLhzIrOuBVVKXOu5s2ybXhSpfQjtZfsVNdkWHNab3bkc7tbBlYk2TX0WmpgYTaPAShFWGQHHf01bpNX0fdoXDf3iYY7ZAVIknqWcfu3GvkSukcbvx0BFoV7iF7IjeAtn02Yz48jV\",\"id\":\"3ec71a88-d228-48eb-8f1a-3f83b3f73045\"}","time":"2023-01-13T12:11:44Z"}
{"level":"info","msg":"topic: CXS/demo1/5/fb42d89f-d2f8-4500-8f46-9a0a5d6ff193","time":"2023-01-13T12:11:44Z"}
2023/01/13 12:11:44 log.go:45: WARNING solClientMsg.c:1247 (7fdc277fe640) Bad msg_p pointer '0xfc000db6' in solClient_msg_dup
panic: invalid configuration provided: error duplicating message: Bad msg_p pointer '0xfc000db6' in solClient_msg_dup
As the data is randomly generated, I can't see how a duplicate message would come into play.
Can someone explain what is happening there and how to resolve?
Best Answer
-
@ulrich can you try adding
message.Dispose()
after the call to Publish, like this:
publishErr := directPublisher.Publish(message, resource.TopicOf(topic)) if publishErr != nil { panic(publishErr) } log.Info("message published:", messageBody) message.Dispose()
0
Answers
-
Can we see the code? Also, this forum has formatting options for code blocks: type three backticks (the ` character) to start a code block, and choose the paragraph formatter to end it.
this is a code block
0 -
here you are:
// Producer as originally posted: it starts PARALLEL_PRODUCER goroutines that
// all publish random JSON messages through one shared direct publisher.
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"github.com/satori/go.uuid"
	log "github.com/sirupsen/logrus"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"time"

	"solace.dev/go/messaging"
	"solace.dev/go/messaging/pkg/solace/config"
	"solace.dev/go/messaging/pkg/solace/resource"
)

// Command-line flags that control logging.
var (
	logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
	logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
	logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
)

// init parses the command-line flags and configures logrus: level, caller
// reporting, and the formatter (JSON when -logFormatJson is set, text otherwise).
func init() {
	flag.Parse()
	log.SetLevel(log.Level(*logLevel))
	log.SetReportCaller(*logReportCaller)
	log.SetFormatter(&log.TextFormatter{})
	if *logFormatJson {
		log.SetFormatter(&log.JSONFormatter{})
	}
}

// getEnv returns the value of the environment variable key. If the variable
// is not set, it logs the missing name and exits the process with status 8.
func getEnv(key string) string {
	if val, ok := os.LookupEnv(key); ok {
		return val
	} else {
		log.Info("Missing environment variable ", key)
		os.Exit(8)
	}
	// Not reached at runtime (os.Exit does not return); kept for the compiler.
	return ""
}

// main connects to the broker, starts one direct publisher, and launches
// PARALLEL_PRODUCER goroutines that publish random messages until the
// publisher stops being ready. It then blocks until an interrupt arrives.
func main() {
	p, err := strconv.Atoi(getEnv("PARALLEL_PRODUCER"))
	if err != nil {
		log.Panic("PARALLEL_PRODUCER variable has to be a positive integer ")
	}
	TopicPrefix := getEnv("AEM_TOPIC_PREFIX")
	log.Info(TopicPrefix)

	// Configuration parameters
	brokerConfig := config.ServicePropertyMap{
		config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST"),
		config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN"),
		config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"),
		config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"),
	}
	messagingService, err := messaging.NewMessagingServiceBuilder().
		FromConfigurationProvider(brokerConfig).
		WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
		Build()
	if err != nil {
		panic(err)
	}

	// Connect to the messaging service
	if err := messagingService.Connect(); err != nil {
		panic(err)
	}
	log.Info("Connected to the broker? ", messagingService.IsConnected())

	// Build a Direct Message Publisher
	// NOTE(review): every goroutine started below publishes through this one
	// directPublisher instance. A reply in this thread reports that sharing a
	// DirectMessagePublisher between goroutines is not safe in the API release
	// in use - confirm against the release notes.
	directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
	if builderErr != nil {
		panic(builderErr)
	}
	startErr := directPublisher.Start()
	if startErr != nil {
		panic(startErr)
	}
	log.Info("Direct Publisher running? ", directPublisher.IsRunning())

	// NOTE(review): msgSeqNum is incremented by every goroutine without
	// synchronization and is never read in this listing.
	msgSeqNum := 0
	// NOTE(review): eventInd is not referenced anywhere else in this listing.
	eventInd, err := strconv.ParseBool(getEnv("EVENTS_IND"))
	if err != nil {
		log.Panic(err)
	}
	msgLength, err := strconv.Atoi(getEnv("MSG_LENGTH"))
	if err != nil {
		log.Panic(err)
	}

	// One publishing goroutine per requested producer; v is the producer index
	// used in the topic name.
	for i := 0; i < p; i++ {
		go func(v int) {
			for directPublisher.IsReady() {
				// Prepare outbound message payload and body
				var messageBody string
				// This plain assignment writes to main's err, which is shared by
				// all goroutines (the := further down declares a new, local err).
				messageBody, err = getRandomMessage(msgLength)
				if err != nil {
					log.Panic(err)
				}
				id := uuid.NewV4().String()
				messageBuilder := messagingService.MessageBuilder().
					WithProperty("application", "samples").
					WithProperty("language", "go")
				msgSeqNum++
				message, err := messageBuilder.BuildWithStringPayload(messageBody)
				if err != nil {
					panic(err)
				}
				// Topic layout: <prefix>/<producer index>/<uuid>
				topic := TopicPrefix + "/" + fmt.Sprintf("%d", v) + "/" + id
				log.Info("topic: ", topic)
				publishErr := directPublisher.Publish(message, resource.TopicOf(topic))
				if publishErr != nil {
					panic(publishErr)
				}
				log.Info("message published:", messageBody)
				// NOTE(review): message is not disposed here; the accepted answer
				// in this thread adds message.Dispose() after the publish.
			}
		}(i)
	}

	// Handle interrupts
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// Block until a signal is received.
	<-c

	// TODO
	// Find way to shutdown the go routine
	// e.g use another channel, BOOl..etc
	// TODO

	// Terminate the Direct Publisher (the argument passed is 2 seconds)
	directPublisher.Terminate(2 * time.Second)
	log.Info("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())

	// Disconnect the Message Service
	messagingService.Disconnect()
	log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
}

// letterBytes is the alphabet random message text is drawn from.
var letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// getRandomMessage returns a JSON object with two fields: "message", holding
// n random characters from letterBytes, and "id", holding a new UUID string.
// It returns an error only if JSON marshalling fails.
func getRandomMessage(n int) (string, error) {
	// Seed the random number generator
	// NOTE(review): this reseeds the shared generator on every call.
	rand.Seed(time.Now().UnixNano())

	// Generate a random string of length n
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	randomStr := string(b)

	// Generate a UUID
	id := uuid.NewV4()

	// Create the response object
	response := struct {
		Message string `json:"message"`
		ID      string `json:"id"`
	}{
		Message: randomStr,
		ID:      id.String(),
	}

	// Marshal the response to a JSON string
	responseJSON, err := json.Marshal(response)
	if err != nil {
		return "", err
	}
	return string(responseJSON), nil
}
0 -
Hi @ulrich,
There is a bug in the Go API that prevents multiple goroutines from sharing the same DirectMessagePublisher instance that will be fixed in a future release. In the meantime, using one DirectMessagePublisher instance per goroutine will resolve the issue.
1 -
code changed - for completeness
// Consumer: binds to the durable, non-exclusive queue named by AEM_QUEUE,
// logs the payload of every message it receives, and acknowledges it.
package main

import (
	"flag"
	"os"
	"os/signal"
	"time"

	log "github.com/sirupsen/logrus"
	"solace.dev/go/messaging"
	"solace.dev/go/messaging/pkg/solace"
	"solace.dev/go/messaging/pkg/solace/config"
	"solace.dev/go/messaging/pkg/solace/message"
	"solace.dev/go/messaging/pkg/solace/resource"
)

// Command-line flags that control logging.
var (
	logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
	logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
	logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
)

// persistentReceiver is package-level so that MessageHandler can acknowledge
// messages. It is assigned once in main, before the handler is registered.
var persistentReceiver solace.PersistentMessageReceiver

// configureLogging parses the command-line flags and applies them to logrus.
// It is called from main rather than from init so that flag parsing happens
// only after every package has registered its flags.
func configureLogging() {
	flag.Parse()
	log.SetLevel(log.Level(*logLevel))
	log.SetReportCaller(*logReportCaller)
	if *logFormatJson {
		log.SetFormatter(&log.JSONFormatter{})
	} else {
		log.SetFormatter(&log.TextFormatter{})
	}
}

// getEnv returns the value of the environment variable key. If the variable
// is not set, it logs the missing name and exits the process with status 8.
func getEnv(key string) string {
	val, ok := os.LookupEnv(key)
	if !ok {
		log.Info("Missing environment variable ", key)
		os.Exit(8)
	}
	return val
}

// MessageHandler logs the payload of an inbound message and acknowledges it.
// A message whose payload cannot be read as bytes is logged as an error and
// left unacknowledged.
func MessageHandler(msg message.InboundMessage) {
	// Debugf, not Debug: the string contains a format verb.
	log.Debugf("Message Dump %s \n", msg)

	payload, ok := msg.GetPayloadAsBytes()
	if !ok {
		if id, hasID := msg.GetApplicationMessageID(); hasID {
			log.Error("message wasn't able to be processed: ", id)
		} else {
			log.Error("general issue with message: ", msg.String())
		}
		log.Info("")
		return
	}

	log.Info(string(payload))
	if err := persistentReceiver.Ack(msg); err != nil {
		log.Error("acknowledging message: ", err)
	}
	log.Info("")
}

// main connects to the broker, starts a persistent receiver on the queue
// named by AEM_QUEUE, and processes messages until the process is interrupted.
func main() {
	configureLogging()

	// Configuration parameters
	brokerConfig := config.ServicePropertyMap{
		config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST"),
		config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN"),
		config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"),
		config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"),
	}
	// NOTE(review): certificate validation is switched off here; confirm that
	// this is acceptable for the target environment.
	messagingService, err := messaging.NewMessagingServiceBuilder().
		FromConfigurationProvider(brokerConfig).
		WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
		Build()
	if err != nil {
		log.Fatal("building messaging service: ", err)
	}

	// Connect to the messaging service
	if err := messagingService.Connect(); err != nil {
		log.Fatal("connecting to the broker: ", err)
	}
	log.Info("Connected to the broker? ", messagingService.IsConnected())

	// disconnect closes the broker connection and reports the outcome; it is
	// used on the normal shutdown path and on every error path below.
	disconnect := func() {
		if err := messagingService.Disconnect(); err != nil {
			log.Error("disconnecting messaging service: ", err)
		}
		log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
	}

	queueName := getEnv("AEM_QUEUE")
	durableNonExclusiveQueue := resource.QueueDurableNonExclusive(queueName)
	//nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName)

	// Build a Guaranteed message receiver for the given queue
	persistentReceiver, err = messagingService.CreatePersistentMessageReceiverBuilder().
		WithMessageClientAcknowledgement().
		Build(durableNonExclusiveQueue)
	if err != nil {
		log.Errorf("building persistent receiver for queue '%s': %s", queueName, err)
		disconnect()
		os.Exit(1)
	}

	// Start Persistent Message Receiver. The original code relied on a
	// recovered panic to report a start failure (for example a queue that
	// does not exist); the error is now handled explicitly.
	if err := persistentReceiver.Start(); err != nil {
		log.Errorf("Make sure queue name '%s' exists on the broker.\nThe following error occurred when attempting to connect to create a Persistent Message Receiver:\n%s", queueName, err)
		disconnect()
		os.Exit(1)
	}
	log.Infof("Bound to queue: %s", queueName)
	log.Info("Persistent Receiver running? ", persistentReceiver.IsRunning())

	if regErr := persistentReceiver.ReceiveAsync(MessageHandler); regErr != nil {
		log.Error("registering message handler: ", regErr)
		if err := persistentReceiver.Terminate(2 * time.Second); err != nil {
			log.Error("terminating persistent receiver: ", err)
		}
		disconnect()
		os.Exit(1)
	}

	// Handle interrupts
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// Block until a signal is received.
	<-c

	// Terminate the Persistent Receiver
	if err := persistentReceiver.Terminate(2 * time.Second); err != nil {
		log.Error("terminating persistent receiver: ", err)
	}
	log.Info("\nPersistent Receiver Terminated? ", persistentReceiver.IsTerminated())

	// Disconnect the Message Service
	disconnect()
}
0 -
not yet - different errors now:
2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa81affd640) datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81affd640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa81affd640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81affd640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa81affd640) datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa8127fc640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa8127fc640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa8127fc640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa8127fc640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa8127fc640) 
datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 {"level":"info","msg":"topic: CXS/demo1/8/b68c7425-7076-4ac0-b0cd-9f4caae4992e","time":"2023-01-13T20:22:03Z"} 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 {"level":"info","msg":"message published:{\"message\":\"kzYAdG6KwLM056Yv6GtJmKQLdHJnx6uNb4j1GGaGxQUO9o1JnchZkZewUe1fywH94lWvD8E7x2bczU78ATSuEyG9dayDzFDeRsfrLUOJO3nDsrmjmSed0BYgVsT2yvpCO03Azj14Mo23US3OMyG1s25LTT4ubKBZrcupF45vuGCOlsyPLc5QdxyvloPTSJ67zndmcbzcd7ePugTsQRX4twXOWRx31bjr5SGJtHzQFnI6ZonrJd8Hwsrw1V7XvpI0BJecYUC6ttY2WLOsKdftz3N6zVhO2DvQh6wtIBTHxk0fq3PadmWeUQDinpGBAwx7TJj8orFBDnAmUGTThoM5H6iaSuxyMCkuwxPn6iaeX4AEaAjpBGyfBEZUVkMU5StbevIyMZk8jOaetGlazyVfNevv9MgfB3lCUUOKWC536KFRLpGUGwF71Ahb8eHuqlGcyaUfTS4pwpMptWbKsZtzTIbd6akFmQNYLVYI04idXzDH9Qy9LB2bSJQ89FKtzqPXXkZ5ZyU3F0ZAp1JBOgKnWw514Kv3TTAg8CLGLtwzDcpqhhsuHSxE0d3cVsBxy41ooR5gvEwGK70J5OK66KcATcXAZGxTggm3h4MNBDHEMtksvyYh1O51WLSdJuhWztbkjKw4MCakAFYL7XqHejiTlQj4bbUxzZEdmmRmbgcTmaMEZy0JlAdh9rC74sd2meumcLLeQE7T3kIXKCuRSCeIFFWQFfibIJpNyjqQJJDNpZ35mdvomiMhBTG1WRUCiyEQ96Ov5O2aDCkTMV6rZS00snR6flIH7lY6OPrLmlw7W6nQ7DaeWUx22x5jf04esqzoddrQboUxpupoJKlAsnnqyM1mpHSmXDEcG8MYll9eUMxhvSyV4mm9Edy7oS7bVPf3cl7DqGgflAzyNesex5MeIfLoy75p6vS7elgx8QhNgzlE3gHMrWhpjjhSdJxn4aRm7YM9n9tn1sXcttsTSyRdJDoT3F4CE3CwbrYOwQC6\",\"id\":\"87d8fee3-ef92-4c43-bf32-3df5eb1998ba\"}","time":"2023-01-13T20:22:03Z"} {"level":"info","msg":"message 
published:{\"message\":\"VtGYzLm9Rt9wyN6FKdyYnTNRhcy16bhmdATy8pg7nbwwwAH762sYqA2N8UGFJtvAkqS4v28JW0KygETYSdMjtHnQlVNTkwMjZQ6gR85NSMDj89zKcOaKtp17TU6cqGbgfPFveDpzkD45tLe3nH0dda8VeNuj6arT8AmN3IdjI9sZmuQDDxSPAzoBxJsTZP9NuDdPV9Fpkhu8YzFHo8FiYix3luPD01Xydb6jcU8MIRqXljoRaoXkhkRjV0O1TF4hgrDKGsi8OBHpqY4APHfeEY7Rc8DaP3hZYHCe8RdTY2vIY5vb6QFHsZEV6T1f0K3NOLZgBrdKdrbAe2hM8EiXjOAEwgzEDm4RmH3EOena2Q46QmHJJ33qCZAgu3pqLBq1CLXVYG4eApdRIZ1bElbEkHuh0Eef7zokVWxW4wRWidV8gRV7HhoX0dOc92jsJRMpk0kRaEHuABDROiobwfpG7FJkcgk0UJu4Ijuq2UZCPFcePGhGa0aFtCQMfljetUqZJFK6rDZO5pUVSiTEkdb0YRuqaC8xPiFgLNJQSqDere30QXsl1uWYcNgvGNCohgJAv4Cnrq34VngcUR3jmkAc1IVlHuvmbfg04FqGzuMgYwDWiASFTKtelqVsdpFZ6XzIarr1EJVNgPoJRiwlTSxcjQk0X7UNa1nbqmfvxjgf5z3jKOl6ny4Rrid2HWW5tS0JNEo6UURvPA9hq8qM1kzE72ArcwCRlPptQ2ytan4UO4bEBZDFAznvpZD3f8O6c7pe7mE2cvMmJx9tz8oRkrj9wE5kll8KzPTc6nSX0u5DytzVXiQRchUwZdjPw7GTRDjwlVEWHanMbUyMvvWJNgdJv6xMSAeYGPX8zQV5saHxLfHM95FVwaHVtubMeqxhPonGcjPqUp7IlwXuUsYuoFWKP8sRu7TchLWbXHXYrmpmgZxZDgEmEx8n9WMBmu3vwCtxDf0JUEi3GzQDi0syCFVOWk0bn2RKGFXQoN3n904Z\",\"id\":\"3aaceb73-57dd-422a-9cb3-03f917e02aa1\"}","time":"2023-01-13T20:22:03Z"} 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=-1 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-2 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=-2 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: 
ERROR solClientMsg.c:943 (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-3 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 {"level":"info","msg":"topic: CXS/demo1/12/874154a4-9a58-4eae-8a40-ffa5a84f7698","time":"2023-01-13T20:22:03Z"} 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:41: ERROR solClientMsg.c:943 (7fa81bfff640) datablock_free '0x7fa87413c7e0', refcount=-1 is less then 0 /opt/cvsdirs/loadbuild/jenkins/slave/workspace/ccsmp-build@2/impl/solClientMsg.c:943 2023/01/13 20:22:03 log.go:37: CRITICAL solClientMsg.c:943 (7fa81bfff640) datablock already free '0x7fa87413c7e0', refcount=0 /opt/^Cazureuser@accenture-sandbox:~/tmp/AdvancedEventMesh$
0 -
Sorry about that — it was a copy-paste mistake with the code.
from the go mod file
solace.dev/go/messaging v1.1.0
producer code here
// Producer, second version as posted: each of the PARALLEL_PRODUCER goroutines
// builds its own messaging service and its own direct publisher.
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"github.com/satori/go.uuid"
	log "github.com/sirupsen/logrus"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"time"

	"solace.dev/go/messaging"
	"solace.dev/go/messaging/pkg/solace/config"
	"solace.dev/go/messaging/pkg/solace/resource"
)

// Command-line flags that control logging.
var (
	logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
	logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
	logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
)

// init parses the command-line flags and configures logrus: level, caller
// reporting, and the formatter (JSON when -logFormatJson is set, text otherwise).
func init() {
	flag.Parse()
	log.SetLevel(log.Level(*logLevel))
	log.SetReportCaller(*logReportCaller)
	log.SetFormatter(&log.TextFormatter{})
	if *logFormatJson {
		log.SetFormatter(&log.JSONFormatter{})
	}
}

// getEnv returns the value of the environment variable key. If the variable
// is not set, it logs the missing name and exits the process with status 8.
func getEnv(key string) string {
	if val, ok := os.LookupEnv(key); ok {
		return val
	} else {
		log.Info("Missing environment variable ", key)
		os.Exit(8)
	}
	// Not reached at runtime (os.Exit does not return); kept for the compiler.
	return ""
}

// main launches PARALLEL_PRODUCER goroutines. Each one connects to the broker
// on its own, publishes random messages while its publisher is ready, and then
// terminates its publisher and disconnects. main itself blocks until an
// interrupt arrives.
func main() {
	p, err := strconv.Atoi(getEnv("PARALLEL_PRODUCER"))
	if err != nil {
		log.Panic("PARALLEL_PRODUCER variable has to be a positive integer ")
	}
	// NOTE(review): msgSeqNum is incremented by every goroutine without
	// synchronization and is never read in this listing.
	msgSeqNum := 0

	// v is the producer index used in the topic name.
	for i := 0; i < p; i++ {
		go func(v int) {
			// Settings are read from the environment separately in every goroutine.
			msgLength, err := strconv.Atoi(getEnv("MSG_LENGTH"))
			if err != nil {
				log.Panic(err)
			}
			TopicPrefix := getEnv("AEM_TOPIC_PREFIX")
			log.Info(TopicPrefix)

			// Configuration parameters
			brokerConfig := config.ServicePropertyMap{
				config.TransportLayerPropertyHost:                getEnv("AEM_CONNECTION_STRING"),
				config.ServicePropertyVPNName:                    getEnv("AEM_VPN"),
				config.AuthenticationPropertySchemeBasicUserName: getEnv("AEM_USER"),
				config.AuthenticationPropertySchemeBasicPassword: getEnv("AEM_PASSWORD"),
			}
			// NOTE(review): a reply in this thread points out that several
			// direct publishers can share one messaging service, which would
			// avoid one broker connection per goroutine.
			messagingService, err := messaging.NewMessagingServiceBuilder().
				FromConfigurationProvider(brokerConfig).
				WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
				Build()
			if err != nil {
				panic(err)
			}

			// Connect to the messaging service
			if err := messagingService.Connect(); err != nil {
				panic(err)
			}
			log.Info("Connected to the broker? ", messagingService.IsConnected())

			// Build a Direct Message Publisher
			directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
			if builderErr != nil {
				panic(builderErr)
			}
			startErr := directPublisher.Start()
			if startErr != nil {
				panic(startErr)
			}
			log.Info("Direct Publisher running? ", directPublisher.IsRunning())

			for directPublisher.IsReady() {
				// Prepare outbound message payload and body
				var messageBody string
				messageBody, err = getRandomMessage(msgLength)
				if err != nil {
					log.Panic(err)
				}
				id := uuid.NewV4().String()
				messageBuilder := messagingService.MessageBuilder().
					WithProperty("application", "samples").
					WithProperty("language", "go")
				msgSeqNum++
				message, err := messageBuilder.BuildWithStringPayload(messageBody)
				if err != nil {
					panic(err)
				}
				// Topic layout: <prefix>/<producer index>/<uuid>
				topic := TopicPrefix + "/" + fmt.Sprintf("%d", v) + "/" + id
				log.Info("topic: ", topic)
				publishErr := directPublisher.Publish(message, resource.TopicOf(topic))
				if publishErr != nil {
					panic(publishErr)
				}
				log.Info("message published:", messageBody)
				// NOTE(review): message is not disposed here; the accepted answer
				// in this thread adds message.Dispose() after the publish.
			}

			// Terminate the Direct Publisher (the argument passed is 2 seconds)
			directPublisher.Terminate(2 * time.Second)
			log.Info("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())

			// Disconnect the Message Service
			messagingService.Disconnect()
			log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())
		}(i)
	}

	// Handle interrupts
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// Block until a signal is received.
	<-c

	// main returns right after the signal; nothing here tells the goroutines
	// to stop or waits for them.
	// TODO
	// Find way to shutdown the go routine
	// e.g use another channel, BOOl..etc
	// TODO
}

// letterBytes is the alphabet random message text is drawn from.
var letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// getRandomMessage returns a JSON object with two fields: "message", holding
// n random characters from letterBytes, and "id", holding a new UUID string.
// It returns an error only if JSON marshalling fails.
func getRandomMessage(n int) (string, error) {
	// Seed the random number generator
	// NOTE(review): this reseeds the shared generator on every call.
	rand.Seed(time.Now().UnixNano())

	// Generate a random string of length n
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	randomStr := string(b)

	// Generate a UUID
	id := uuid.NewV4()

	// Create the response object
	response := struct {
		Message string `json:"message"`
		ID      string `json:"id"`
	}{
		Message: randomStr,
		ID:      id.String(),
	}

	// Marshal the response to a JSON string
	responseJSON, err := json.Marshal(response)
	if err != nil {
		return "", err
	}
	return string(responseJSON), nil
}
0 -
@ulrich using your code and 40 parallel publishers, I do not hit your error (tried on a few different systems and let it run for a while). Are there any other details you can provide?
Unrelated to the problem, but you can have multiple direct publishers on the same messaging service (e.g. share the messaging service outside of the goroutines) instead of a separate messaging service per goroutine. This will use fewer client and server side resources.
0 -
I am back to
{"level":"info","msg":"message published:{\"message\":\"6K1QlZmjQKqjigNX56UWX6B2SsoupmvHbJtYvCGUSoa9kq1gNoETHhvEbzjsA9Q48mGtR2hFqvMhJxFjzbHdvnALMzwEJDj853fpk4eX22GimZjZ0bj4jajKjHiJAr42FYsXvk4sU47t5xVSqrTFLcsBgzuS66CfSinDbxHb2o1MwMpFgWCFEZAhi7Wdz5k2JrsfZHcSJHa2lXflfIJlOast9K3ZUR0Ps1BU9gfWeipvS3UCss9Ifcker2LWlIU9CNz79NCQJELgqrecuZpf9TRQwI033OPJ3oGxgJrDlw2bzpk2CgYs56xETRxadynJF2oJJ3jBSATS4dTgvKwYfRTKDGo35Lo0jI42E2Ix6OnFGKqWgb0KQsEGc6EzaLWYKxtvRi6CcuPHU1TRiSpweVYTr1ssk4y7B4fwGTXFiHDj8pQMUEHsMAwb9Z5ICwklbK3IfScANldTBmsOvhZWjIM0qIsjJXVaqq0lw04FYU31ihh3fKvrKXQCXuvu3IlH9tI5MY44wwwQETwPph5ZMY5J9XYeirXq7qgWRzPOnUvHpjEiowuXoYCyUiwitFE4etVxa3B6chPb72pCxBYbv0l1PSkVoGSNRmrGMpZAjUPx7HyiU4vQW5ToOk9AQ0irtzZkdndnSbM82lNdFAWS1Nc6mm470OQdY74UZmxXQi6QZO3ZIR2PgWwXNWESuqZzbrHdB5qoot6j69NGaLeurAb4KJPcYL9XsbiK8CtWIeALsWUQuVnzIH1Jfjb93qCyljcEac1eMn0cHyx9nMKuLV0xWkHi29onncmiMZ4goSQLZJQ635dDLOoMHLZlPI2oxXNgqC4VbhRecfce3nijJVWBzko0KK0TwSF0zGF4yJWS2d0DuLoHoBIAYjn3TuIqeMahzwKQR5qm6XOXioMQvGI2ceJAOEU2RpP7ScI9DwHmHTwK6EwUiEfs97PXwm5Qhrj4PYJGamNY151hhiNLOuLoCWpLjNveNJgoQFHZ\",\"id\":\"2a388f7b-d14a-4d10-9670-e1968f5d0d3b\"}","time":"2023-01-16T18:54:11Z"} 2023/01/16 18:54:11 log.go:45: WARNING solClientMsg.c:1247 (7fcf6effd640) Bad msg_p pointer '0xbc000ee7' in solClient_msg_dup {"level":"info","msg":"topic: CXS/demo1/37/8a584b2b-fbda-4cce-ac47-06d80a30b33d","time":"2023-01-16T18:54:11Z"} panic: invalid configuration provided: error duplicating message: Bad msg_p pointer '0xbc000ee7' in solClient_msg_dup goroutine 35 [running]: main.main.func1(0x0?) /home/azureuser/tmp/AdvancedEventMesh/cmd/producer/main.go:126 +0xa05 created by main.main /home/azureuser/tmp/AdvancedEventMesh/cmd/producer/main.go:58 +0x94 exit status 2
Datadog reports no problems.
The producer runs on an Azure VM and the Solace server runs on AWS.
0 -
@ulrich can you try adding
message.Dispose()
after the call to Publish, like this:
publishErr := directPublisher.Publish(message, resource.TopicOf(topic)) if publishErr != nil { panic(publishErr) } log.Info("message published:", messageBody) message.Dispose()
0 -
As stated above, adding message.Dispose() fixed the issue. The process ran stably the whole night. Thanks!
For completeness the working code:
// Producer: publishes random JSON messages to a Solace broker from
// PARALLEL_PRODUCER goroutines, each with its own direct publisher, until the
// process is interrupted.
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"time"

	uuid "github.com/satori/go.uuid"
	log "github.com/sirupsen/logrus"
	"solace.dev/go/messaging"
	"solace.dev/go/messaging/pkg/solace"
	"solace.dev/go/messaging/pkg/solace/config"
	"solace.dev/go/messaging/pkg/solace/resource"
)

// Command-line flags that control logging.
var (
	logLevel        = flag.Int("logLevel", 4, "log level (0-6)")
	logReportCaller = flag.Bool("logReportCaller", false, "add caller to log output")
	logFormatJson   = flag.Bool("logFormatJson", true, "log in json format")
)

// letterBytes is the alphabet random message text is drawn from.
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// payload is the JSON document sent as the body of every message.
type payload struct {
	Message string `json:"message"`
	ID      string `json:"id"`
}

// configureLogging parses the command-line flags and applies them to logrus.
// It is called from main rather than from init so that flag parsing happens
// only after every package has registered its flags.
func configureLogging() {
	flag.Parse()
	log.SetLevel(log.Level(*logLevel))
	log.SetReportCaller(*logReportCaller)
	if *logFormatJson {
		log.SetFormatter(&log.JSONFormatter{})
	} else {
		log.SetFormatter(&log.TextFormatter{})
	}
}

// getEnv returns the value of the environment variable key. If the variable
// is not set, it logs the missing name and exits the process with status 8.
func getEnv(key string) string {
	val, ok := os.LookupEnv(key)
	if !ok {
		log.Info("Missing environment variable ", key)
		os.Exit(8)
	}
	return val
}

// getIntEnv returns the environment variable key parsed as an integer that is
// at least min. A missing, non-numeric, or too-small value is logged and ends
// the process with status 8.
func getIntEnv(key string, min int) int {
	n, err := strconv.Atoi(getEnv(key))
	if err != nil || n < min {
		log.Errorf("%s variable has to be an integer >= %d", key, min)
		os.Exit(8)
	}
	return n
}

// main connects to the broker once, runs PARALLEL_PRODUCER publishing
// goroutines on that connection, and shuts everything down in order when an
// interrupt arrives or when every publisher has stopped by itself.
func main() {
	configureLogging()

	// Seed once for the whole process instead of once per message.
	rand.Seed(time.Now().UnixNano())

	parallel := getIntEnv("PARALLEL_PRODUCER", 1)
	msgLength := getIntEnv("MSG_LENGTH", 0)
	topicPrefix := getEnv("SOLACE_TOPIC_PREFIX")
	log.Info(topicPrefix)

	// Configuration parameters
	brokerConfig := config.ServicePropertyMap{
		config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST"),
		config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN"),
		config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USER"),
		config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD"),
	}
	// NOTE(review): certificate validation is switched off here; confirm that
	// this is acceptable for the target environment.
	messagingService, err := messaging.NewMessagingServiceBuilder().
		FromConfigurationProvider(brokerConfig).
		WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
		Build()
	if err != nil {
		log.Fatal("building messaging service: ", err)
	}

	// Connect to the messaging service. The connection is shared by all
	// publishers; only the publisher itself is kept private to a goroutine.
	if err := messagingService.Connect(); err != nil {
		log.Fatal("connecting to the broker: ", err)
	}
	log.Info("Connected to the broker? ", messagingService.IsConnected())

	stop := make(chan struct{})       // closed to ask the publishers to stop
	errs := make(chan error, parallel) // one slot per publisher, so sends never block
	var wg sync.WaitGroup
	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			// Topic layout: <prefix>/<producer index>/<uuid>
			prefix := topicPrefix + "/" + strconv.Itoa(worker)
			if err := publishLoop(stop, messagingService, prefix, msgLength); err != nil {
				log.Errorf("publisher %d stopped: %v", worker, err)
				errs <- err
			}
		}(i)
	}

	// finished is closed once every publisher goroutine has returned.
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	// Handle interrupts
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	// Block until a signal is received or there is nothing left to wait for.
	select {
	case <-c:
	case <-finished:
		log.Info("all publishers have stopped")
	}

	// Ask the publishers to stop and wait until each has terminated its
	// publisher before closing the connection they share.
	close(stop)
	<-finished

	// Disconnect the Message Service
	if err := messagingService.Disconnect(); err != nil {
		log.Error("disconnecting messaging service: ", err)
	}
	log.Info("Messaging Service Disconnected? ", !messagingService.IsConnected())

	if len(errs) > 0 {
		os.Exit(1)
	}
}

// publishLoop creates a direct publisher on service and publishes random
// messages of msgLength characters to topics of the form <topicPrefix>/<uuid>.
// It returns nil when stop is closed or the publisher is no longer ready, and
// an error as soon as a message cannot be generated, built, or published.
// The publisher is terminated before publishLoop returns.
func publishLoop(stop <-chan struct{}, service solace.MessagingService, topicPrefix string, msgLength int) error {
	// Build a Direct Message Publisher
	publisher, err := service.CreateDirectMessagePublisherBuilder().Build()
	if err != nil {
		return fmt.Errorf("building direct publisher: %w", err)
	}
	if err := publisher.Start(); err != nil {
		return fmt.Errorf("starting direct publisher: %w", err)
	}
	log.Info("Direct Publisher running? ", publisher.IsRunning())

	// Terminate the Direct Publisher on every way out of the loop.
	defer func() {
		if err := publisher.Terminate(2 * time.Second); err != nil {
			log.Error("terminating direct publisher: ", err)
		}
		log.Info("\nDirect Publisher Terminated? ", publisher.IsTerminated())
	}()

	// The properties are the same for every message, so the builder is
	// configured once and reused.
	builder := service.MessageBuilder().
		WithProperty("application", "samples").
		WithProperty("language", "go")

	for publisher.IsReady() {
		select {
		case <-stop:
			return nil
		default:
		}

		// Prepare outbound message payload and body
		body, err := getRandomMessage(msgLength)
		if err != nil {
			return fmt.Errorf("generating message: %w", err)
		}
		msg, err := builder.BuildWithStringPayload(body)
		if err != nil {
			return fmt.Errorf("building message: %w", err)
		}

		topic := topicPrefix + "/" + uuid.NewV4().String()
		log.Info("topic: ", topic)

		publishErr := publisher.Publish(msg, resource.TopicOf(topic))
		// Dispose of the message after every publish attempt, successful or
		// not. Adding this call is what stopped the "Bad msg_p pointer" panics
		// in long runs.
		msg.Dispose()
		if publishErr != nil {
			return fmt.Errorf("publishing to %s: %w", topic, publishErr)
		}
		log.Info("message published:", body)
	}
	return nil
}

// getRandomMessage returns a JSON object with two fields: "message", holding
// n random characters from letterBytes, and "id", holding a new UUID string.
// It returns an error if n is negative or if JSON marshalling fails.
func getRandomMessage(n int) (string, error) {
	if n < 0 {
		return "", fmt.Errorf("negative message length %d", n)
	}

	// Generate a random string of length n
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}

	// Marshal the response to a JSON string
	responseJSON, err := json.Marshal(payload{
		Message: string(b),
		ID:      uuid.NewV4().String(),
	})
	if err != nil {
		return "", err
	}
	return string(responseJSON), nil
}
1