I have referred sample code provided by the Solace team for implemented Pub/Sub Queue message ( GitHub - SolaceSamples/solace-samples-dotnet: Getting Started Samples for the Solace .NET API )
Sample code is implemented part of Microsoft .Net 6 Worker service and scheduled to run every 30 mins. I am facing new issue and i really don’t know what went wrong in my piece code. working completely inconsistent and i don’t find any other reference or technical support. Following issues are occurring
Scenario: I have 10 different Queues for subscribe message and each queue will have at least 20 message available( larger json formatted) which mean 10 Queue X 20 Message = 200 Message to download(subscribe) every scheduled run.
Issue 1: All the Queue has a message(20 each) and downloaded all messages from 8 Queue and 2 other Queue incomplete in first run, if i rerun second time, all the pending message from those 2 queue also downloaded.
in simple, all messages are not downloading at one go.
Issue 2: its slightly different than 1st issue. All the Queue has a message(20 each) and downloaded all messages from 7 Queue and 3 other Queue incomplete. even if i rerun multiple time, pending message still remain in solace Queue and there is no error or log to figure out the issue.
To simplify, few messages permanently stuck in solace and unable to download forever.
Issue 3: few occasions, I have multiple message in particular Queue. Mostly all the message downloaded one go. However, sometime only one message getting download per scheduled run.
Which mean, every scheduled run only one message subscribed at a time and not all the message.
Issue 4: few occasions, Messages missing in solace. which mean, i didn’t acknowledge the message however its missing from Queue.
Note: The code that is implemented in .Net Worker service, i slightly tweaked to Asynchronous call. Also, same piece code I implemented in AWS Lambda using C# and all above issue remain same.
QueueName1: HNK/MTC/PROPOSAL/PROPOSAL/PROPOSAL/3.0.0/LMDM/UPD/APAC/VN/OND/CUSTOM
QueueName2: HNK/STP/PRODUCT/MATERIAL/FINISHED/3.0.0/LMDM/UPD/APAC/VN/OND/CUSTOM
QueueName3: HNK/MTC/SALES/SALESORG/SALESORG/1.0.0/LMDM/UPD/APAC/VN/OND/CUSTOM >
using System;
using System.Text;
using SolaceSystems.Solclient.Messaging;
using System.Threading;
namespace Lambda_Queue_Service
{
public class SolaceSubscribeMessage : IDisposable
{
string VPNName { get; set; }
string UserName { get; set; }
string Password { get; set; }
private bool disposedValue = false;
private static string folderName = “data”;
private ISession Session = null;
private IQueue Queue = null;
private IFlow Flow = null;
private EventWaitHandle WaitEventWaitHandle = new AutoResetEvent(false);
public static async Task SubScribeMessage(string Host, string VPNName, string UserName, string Password, string queueName, string TemplateCode)
{
string host = Host;
string username = UserName;
string vpnname = VPNName;
string password = Password;
ContextFactoryProperties cfp = new ContextFactoryProperties()
{
SolClientLogLevel = SolLogLevel.Warning
};
cfp.LogToConsoleError();
ContextFactory.Instance.Init(cfp);
try
{
using (IContext context = ContextFactory.Instance.CreateContext(new ContextProperties(), null))
{
using (SolaceSubscribeMessage queueConsumer = new SolaceSubscribeMessage()
{
VPNName = vpnname,
UserName = username,
Password = password
})
{
await queueConsumer.Run(context, host, queueName, TemplateCode);
}
}
}
catch (Exception ex)
{
Console.WriteLine(“Exception thrown: {0} - {1}”, ex.Message, TemplateCode);
}
finally
{
// Dispose Solace Systems Messaging API
ContextFactory.Instance.Cleanup();
}
}
async Task Run(IContext context, string host, string queueName, string TemplateCode)
{
folderName = TemplateCode;
SessionProperties sessionProps = new SessionProperties()
{
Host = host,
VPNName = VPNName,
UserName = UserName,
Password = Password,
ReconnectRetries = 10,
SSLValidateCertificate = false,
SdkBufferSize = 8000000,
SocketSendBufferSizeInBytes = 8000000,
SocketReceiveBufferSizeInBytes = 8000000,
ConnectTimeoutInMsecs = 300,
ConnectRetries = 10
};
Session = context.CreateSession(sessionProps, HandleMessageEvent, null);
ReturnCode returnCode = Session.Connect();
if (returnCode == ReturnCode.SOLCLIENT_OK)
{
EndpointProperties endpointProps = new EndpointProperties()
{
Permission = EndpointProperties.EndpointPermission.Consume,
AccessType = EndpointProperties.EndpointAccessType.Exclusive
};
Queue = ContextFactory.Instance.CreateQueue(queueName);
Session.Provision(Queue, endpointProps,
ProvisionFlag.IgnoreErrorIfEndpointAlreadyExists | ProvisionFlag.WaitForConfirm, null);
Flow = Session.CreateFlow(new FlowProperties()
{
AckMode = MessageAckMode.ClientAck
},
Queue, null, HandleMessageEvent, HandleFlowEvent);
Flow.Start();
WaitEventWaitHandle.WaitOne(5000);
}
else
{
Console.WriteLine(“Error connecting, return code: {0}”, returnCode);
}
}
private async void HandleMessageEvent(object source, MessageEventArgs args)
{
using (IMessage message = args.Message)
{
Console.WriteLine(folderName + “- Message - " + DateTime.Now.ToString());
string responseMsg = Encoding.ASCII.GetString(message.BinaryAttachment);
Guid guid = Guid.NewGuid();
AmazonS3Utility s3Utility = new AmazonS3Utility();
await s3Utility.UploadFileAsync(responseMsg, string.Format(”{0}_data.json", guid), folderName); // Upload the message to AWS S3
Flow.Ack(message.ADMessageId);
WaitEventWaitHandle.Set();
}
}
public void HandleFlowEvent(object sender, FlowEventArgs args)
{
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
if (Session != null)
{
Session.Dispose();
Session = null;
}
if (Queue != null)
{
Queue.Dispose();
Queue = null;
}
if (Flow != null)
{
Flow.Dispose();
Flow = null;
}
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
Lambda_CSharp_Code.txt (5.6 KB)
SampleJSON2.txt (8.01 KB)
SampleJSON3.txt (4.74 KB)
SampleJSON1.txt (8.01 KB)