From 59c067f04b16145563c1c5b165204542dda98d54 Mon Sep 17 00:00:00 2001 From: Andres Felipe Osorio Date: Mon, 21 Oct 2024 17:18:45 -0500 Subject: [PATCH 1/2] Added "exchange" and "routeKey" connection rabbitMQ --- src/Seq.Input.RabbitMQ/RabbitMQInput.cs | 28 +++++++++++++++++++++- src/Seq.Input.RabbitMQ/RabbitMQListener.cs | 28 +++++++++++++--------- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/Seq.Input.RabbitMQ/RabbitMQInput.cs b/src/Seq.Input.RabbitMQ/RabbitMQInput.cs index edc6555..ee98404 100644 --- a/src/Seq.Input.RabbitMQ/RabbitMQInput.cs +++ b/src/Seq.Input.RabbitMQ/RabbitMQInput.cs @@ -1,6 +1,7 @@ using System; using System.IO; using System.Text; +using RabbitMQ.Client; using Seq.Apps; namespace Seq.Input.RabbitMQ @@ -50,6 +51,27 @@ public class RabbitMQInput : SeqApp, IPublishJson, IDisposable HelpText = "The RabbitMQ queue name to receive events from. The default is `Logs`.")] public string RabbitMQQueue { get; set; } = "logs"; + [SeqAppSetting( + DisplayName = "RabbitMQ exchange name", + IsOptional = true, + HelpText = "The name of the RabbitMQ exchange from which to pull events. This is the exchange " + + "where the messages are published.")] + public string rabbitMQExchangeName { get; set; } = ""; + + [SeqAppSetting( + DisplayName = "RabbitMQ exchange type", + IsOptional = true, + HelpText = "The type of the RabbitMQ exchange (e.g., direct, topic, fanout, or headers). " + + "Determines how messages are routed to the queue.")] + public string rabbitMQExchangeType { get; set; } = ExchangeType.Direct; + + [SeqAppSetting( + DisplayName = "RabbitMQ Route key", + IsOptional = true, + HelpText = "The routing key used for binding the queue to the exchange. " + + "This key is used to route messages from the exchange to the queue.")] + public string rabbitMQRouteKey { get; set; } = ""; + [SeqAppSetting( DisplayName = "Require SSL", IsOptional = true, @@ -83,6 +105,7 @@ public class RabbitMQInput : SeqApp, IPublishJson, IDisposable public void Start(TextWriter inputWriter) { var sync = new object(); + void Receive(ReadOnlyMemory body) { try @@ -107,6 +130,9 @@ void Receive(ReadOnlyMemory body) RabbitMQUser, RabbitMQPassword, RabbitMQQueue, + rabbitMQExchangeName, + rabbitMQExchangeType, + rabbitMQRouteKey, IsSsl, IsQueueDurable, IsQueueAutoDelete, @@ -124,4 +150,4 @@ public void Dispose() _listener?.Dispose(); } } -} +} \ No newline at end of file diff --git a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs b/src/Seq.Input.RabbitMQ/RabbitMQListener.cs index 30ed239..bf79bc5 100644 --- a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs +++ b/src/Seq.Input.RabbitMQ/RabbitMQListener.cs @@ -1,5 +1,4 @@ using System; -using System.Net.Security; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -14,13 +13,16 @@ public RabbitMQListener( Action> receive, string rabbitMQHost, string rabbitMQVHost, - int rabbitMQPort, - string rabbitMQUser, + int rabbitMQPort, + string rabbitMQUser, string rabbitMQPassword, - string rabbitMQQueue, + string rabbitMQQueue, + string rabbitMQExchangeName, + string rabbitMQExchangeType, + string rabbitMQRouteKey, bool isSsl, - bool isQueueDurable, - bool isQueueAutoDelete, + bool isQueueDurable, + bool isQueueAutoDelete, bool isQueueExclusive, bool isReceiveAutoAck) { @@ -36,16 +38,20 @@ public RabbitMQListener( Enabled = isSsl } }; - + _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); + _channel.ExchangeDeclare(exchange: rabbitMQExchangeName, type: rabbitMQExchangeType); + _channel.QueueDeclare( - rabbitMQQueue, - durable: isQueueDurable, + rabbitMQQueue, + durable: isQueueDurable, exclusive: isQueueExclusive, - autoDelete: isQueueAutoDelete, + autoDelete: isQueueAutoDelete, arguments: null); + + _channel.QueueBind(queue: rabbitMQQueue, exchange: rabbitMQExchangeName, routingKey: rabbitMQRouteKey); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => receive(ea.Body); @@ -63,4 +69,4 @@ public void Dispose() _connection?.Close(); } } -} +} \ No newline at end of file From 4cdf5fc186695a9bc51f2c99e58e14a61b205619 Mon Sep 17 00:00:00 2001 From: Andres Felipe Osorio Date: Wed, 23 Oct 2024 09:14:09 -0500 Subject: [PATCH 2/2] Validation rabbitMQExchangeName and rabbitMQRouteKey --- .../RabbitMQListener.cs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) rename src/{Seq.Input.RabbitMQ => PipeOsorio.Seq.Input.RabbitMQ}/RabbitMQListener.cs (77%) diff --git a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs b/src/PipeOsorio.Seq.Input.RabbitMQ/RabbitMQListener.cs similarity index 77% rename from src/Seq.Input.RabbitMQ/RabbitMQListener.cs rename to src/PipeOsorio.Seq.Input.RabbitMQ/RabbitMQListener.cs index bf79bc5..e775d34 100644 --- a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs +++ b/src/PipeOsorio.Seq.Input.RabbitMQ/RabbitMQListener.cs @@ -42,7 +42,13 @@ public RabbitMQListener( _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); - _channel.ExchangeDeclare(exchange: rabbitMQExchangeName, type: rabbitMQExchangeType); + if (rabbitMQExchangeName != string.Empty) + _channel.ExchangeDeclare( + exchange: rabbitMQExchangeName, + type: rabbitMQExchangeType, + durable: isQueueDurable, + autoDelete: isQueueAutoDelete + ); _channel.QueueDeclare( rabbitMQQueue, @@ -50,8 +56,13 @@ public RabbitMQListener( exclusive: isQueueExclusive, autoDelete: isQueueAutoDelete, arguments: null); - - _channel.QueueBind(queue: rabbitMQQueue, exchange: rabbitMQExchangeName, routingKey: rabbitMQRouteKey); + + if (rabbitMQRouteKey != string.Empty) + _channel.QueueBind( + queue: rabbitMQQueue, + exchange: rabbitMQExchangeName, + routingKey: rabbitMQRouteKey + ); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => receive(ea.Body);