Difference between revisions of "RabbitMQ"
(→Exchange Types) |
|||
| (2 intermediate revisions by the same user not shown) | |||
| Line 18: | Line 18: | ||
If you wish to change the default username and password of guest / guest, you can do so with the RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS environmental variables: | If you wish to change the default username and password of guest / guest, you can do so with the RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS environmental variables: | ||
docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management | docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management | ||
| + | ==== Port Mappings ==== | ||
| + | $ docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 8090:5672 rabbitmq:3-management | ||
| + | You can then go to http://localhost:8080 or http://host-ip:8080 in a browser. | ||
| + | |||
== Starting Server == | == Starting Server == | ||
To start in foreground type in bash | To start in foreground type in bash | ||
| Line 34: | Line 38: | ||
== Exchange Types == | == Exchange Types == | ||
| + | Follow [[RabbitMQ Exchange Types]] to see in detail | ||
=== Direct === | === Direct === | ||
Publish direct with queue keys | Publish direct with queue keys | ||
| Line 42: | Line 47: | ||
=== Headers === | === Headers === | ||
instead of keys it looks for headers | instead of keys it looks for headers | ||
| − | |||
== QUEUES == | == QUEUES == | ||
Latest revision as of 13:05, 23 February 2021
It's an open source message queue management system. (rabbitmq.com). it uses AMQP (Advanced, Message Queue Protocol)
Contents
Server Installation
Mac
brew update brew install rabbitmq
in ~/.bashrc or other bash files add
export PATH=$PATH:/usr/local/sbin
Docker
docker pull rabbitmq docker run -d --hostname myrabbit --name some-rabbit rabbitmq:3 docker run -d --hostname myrabbit --name some-rabbit rabbitmq:3-management
Setting default user and password
If you wish to change the default username and password of guest / guest, you can do so with the RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS environmental variables:
docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
Port Mappings
$ docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 8090:5672 rabbitmq:3-management
You can then go to http://localhost:8080 or http://host-ip:8080 in a browser.
Starting Server
To start in foreground type in bash
rabbitmq-server
To start in background
brew services start rabbitmq
Enabling Management Portal
rabbitmq -plugins enable rabbitmq_management
- User : guest
- Pass : guest
Installation of Client
in nuget packages install RabbitMQ.Client
Exchange Types
Follow RabbitMQ Exchange Types to see in detail
Direct
Publish direct with queue keys
Fanout
Ignores keys and broadcast to many queues
Topic
publish to keys with asterisk(*)
Headers
instead of keys it looks for headers
QUEUES
Queue names can be maximum 255 chars. Messages are removed from the queue once the broker sends it to the consumer or once the concsumer sends an acknowledgement to the broker.
Common Classes
used in the examples below
[Serializable]
public class Payment
{
public decimal AmountToPay;
public string CardNumber;
public string Name;
}
[Serializable]
public class PurchaseOrder
{
public decimal AmountToPay;
public string PoNumber;
public string CompanyName;
public int PaymentDayTerms;
}
public static class ObjectSerialize
{
public static byte[] Serialize(this object obj)
{
if (obj == null)
{
return null;
}
var json = JsonConvert.SerializeObject(obj);
return Encoding.ASCII.GetBytes(json);
}
public static object DeSerialize(this byte[] arrBytes, Type type)
{
var json = Encoding.Default.GetString(arrBytes);
return JsonConvert.DeserializeObject(json, type);
}
public static string DeSerializeText(this byte[] arrBytes)
{
return Encoding.Default.GetString(arrBytes);
}
}
}
Standard Queue
class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private static IModel _model;
private const string QueueName = "StandardQueue_ExampleQueue";
public static void Main()
{
var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
CreateQueue();
SendMessage(payment1);
SendMessage(payment2);
SendMessage(payment3);
SendMessage(payment4);
SendMessage(payment5);
SendMessage(payment6);
SendMessage(payment7);
SendMessage(payment8);
SendMessage(payment9);
SendMessage(payment10);
Recieve();
Console.ReadLine();
}
private static void CreateQueue()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest"};
_connection = _factory.CreateConnection();
_model = _connection.CreateModel();
_model.QueueDeclare(QueueName, true, false, false, null);
}
private static void SendMessage(Payment message)
{
_model.BasicPublish("", QueueName, null, message.Serialize());
Console.WriteLine(" [x] Payment Message Sent : {0} : {1} : {2}", message.CardNumber, message.AmountToPay, message.Name);
}
public static void Recieve()
{
var consumer = new QueueingBasicConsumer(_model);
var msgCount = GetMessageCount(_model, QueueName);
_model.BasicConsume(QueueName, true, consumer);
var count = 0;
while (count < msgCount)
{
var message = (Payment)consumer.Queue.Dequeue().Body.DeSerialize(typeof(Payment));
Console.WriteLine("----- Received {0} : {1} : {2}", message.CardNumber, message.AmountToPay, message.Name);
count++;
}
}
private static uint GetMessageCount(IModel channel, string queueName)
{
var results = channel.QueueDeclare(queueName, true, false, false, null);
return results.MessageCount;
}
}
Worker Queue
(Load Balance)
Producer
public class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private static IModel _model;
private const string QueueName = "WorkerQueue_Queue";
static void Main()
{
var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" };
var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" };
var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" };
var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" };
var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" };
var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" };
var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" };
var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };
CreateConnection();
SendMessage(payment1);
SendMessage(payment2);
SendMessage(payment3);
SendMessage(payment4);
SendMessage(payment5);
SendMessage(payment6);
SendMessage(payment7);
SendMessage(payment8);
SendMessage(payment9);
SendMessage(payment10);
Console.ReadLine();
}
private static void CreateConnection()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
_connection = _factory.CreateConnection();
_model = _connection.CreateModel();
_model.QueueDeclare(QueueName, true, false, false, null);
}
private static void SendMessage(Payment message)
{
_model.BasicPublish("", QueueName, null, message.Serialize());
Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber, message.AmountToPay);
}
}
Consumer
public class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private const string QueueName = "WorkerQueue_Queue";
static void Main()
{
Receive();
Console.ReadLine();
}
public static void Receive()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
using (_connection = _factory.CreateConnection())
{
using (var channel = _connection.CreateModel())
{
channel.QueueDeclare(QueueName, true, false, false, null);
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, false, consumer);
while (true)
{
var ea = consumer.Queue.Dequeue();
var message = (Payment)ea.Body.DeSerialize(typeof(Payment));
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine("----- Payment Processed {0} : {1}", message.CardNumber, message.AmountToPay);
}
}
}
}
}
in line channel.BasicQos(0, 1, false); with parameter "1" we say don't send message to queue until get an acknowledgment
So when we have more instances of consumer they share the messsages like a load balancer.
Publish & Subscribe
Producer
class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private static IModel _model;
private const string ExchangeName = "PublishSubscribe_Exchange";
static void Main()
{
var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" };
var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" };
var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" };
var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" };
var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" };
var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" };
var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" };
var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };
CreateConnection();
SendMessage(payment1);
SendMessage(payment2);
SendMessage(payment3);
SendMessage(payment4);
SendMessage(payment5);
SendMessage(payment6);
SendMessage(payment7);
SendMessage(payment8);
SendMessage(payment9);
SendMessage(payment10);
Console.ReadLine();
}
private static void CreateConnection()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
_connection = _factory.CreateConnection();
_model = _connection.CreateModel();
_model.ExchangeDeclare(ExchangeName, "fanout", false);
}
private static void SendMessage(Payment message)
{
_model.BasicPublish(ExchangeName, "", null, message.Serialize());
Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber, message.AmountToPay);
}
}
in line _model.ExchangeDeclare(ExchangeName, "fanout", false); we use fanout exchange
Consumer
class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private static QueueingBasicConsumer _consumer;
private const string ExchangeName = "PublishSubscribe_Exchange";
static void Main()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
using (_connection = _factory.CreateConnection())
{
using (var channel = _connection.CreateModel())
{
var queueName = DeclareAndBindQueueToExchange(channel);
channel.BasicConsume(queueName, true, _consumer);
while (true)
{
var ea = _consumer.Queue.Dequeue();
var message = (Payment)ea.Body.DeSerialize(typeof(Payment));
Console.WriteLine("----- Payment Processed {0} : {1}", message.CardNumber, message.AmountToPay);
}
}
}
}
private static string DeclareAndBindQueueToExchange(IModel channel)
{
channel.ExchangeDeclare(ExchangeName, "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName, ExchangeName, "");
_consumer = new QueueingBasicConsumer(channel);
return queueName;
}
}
Direct Routing
Producer
class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private static IModel _model;
private const string ExchangeName = "DirectRouting_Exchange";
private const string CardPaymentQueueName = "CardPaymentDirectRouting_Queue";
private const string PurchaseOrderQueueName = "PurchaseOrderDirectRouting_Queue";
static void Main()
{
var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" };
var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" };
var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" };
var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" };
var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" };
var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" };
var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" };
var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };
var purchaseOrder1 = new PurchaseOrder{AmountToPay = 50.0m, CompanyName = "Company A", PaymentDayTerms = 75, PoNumber = "123434A"};
var purchaseOrder2 = new PurchaseOrder { AmountToPay = 150.0m, CompanyName = "Company B", PaymentDayTerms = 75, PoNumber = "193434B" };
var purchaseOrder3 = new PurchaseOrder { AmountToPay = 12.0m, CompanyName = "Company C", PaymentDayTerms = 75, PoNumber = "196544A" };
var purchaseOrder4 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company D", PaymentDayTerms = 75, PoNumber = "234434H" };
var purchaseOrder5 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company E", PaymentDayTerms = 75, PoNumber = "876434W" };
var purchaseOrder6 = new PurchaseOrder { AmountToPay = 7150.0m, CompanyName = "Company F", PaymentDayTerms = 75, PoNumber = "1423474U" };
var purchaseOrder7 = new PurchaseOrder { AmountToPay = 3150.0m, CompanyName = "Company G", PaymentDayTerms = 75, PoNumber = "1932344O" };
var purchaseOrder8 = new PurchaseOrder { AmountToPay = 3190.0m, CompanyName = "Company H", PaymentDayTerms = 75, PoNumber = "1123457Q" };
var purchaseOrder9 = new PurchaseOrder { AmountToPay = 50.0m, CompanyName = "Company I", PaymentDayTerms = 75, PoNumber = "1595344R" };
var purchaseOrder10 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company J", PaymentDayTerms = 75, PoNumber = "656734L" };
CreateConnection();
SendPayment(payment1);
SendPayment(payment2);
SendPayment(payment3);
SendPayment(payment4);
SendPayment(payment5);
SendPayment(payment6);
SendPayment(payment7);
SendPayment(payment8);
SendPayment(payment9);
SendPayment(payment10);
SendPurchaseOrder(purchaseOrder1);
SendPurchaseOrder(purchaseOrder2);
SendPurchaseOrder(purchaseOrder3);
SendPurchaseOrder(purchaseOrder4);
SendPurchaseOrder(purchaseOrder5);
SendPurchaseOrder(purchaseOrder6);
SendPurchaseOrder(purchaseOrder7);
SendPurchaseOrder(purchaseOrder8);
SendPurchaseOrder(purchaseOrder9);
SendPurchaseOrder(purchaseOrder10);
}
private static void SendPayment(Payment payment)
{
SendMessage(payment.Serialize(), "CardPayment");
Console.WriteLine(" Payment Sent {0}, £{1}", payment.CardNumber, payment.AmountToPay);
}
private static void SendPurchaseOrder(PurchaseOrder purchaseOrder)
{
SendMessage(purchaseOrder.Serialize(), "PurchaseOrder");
Console.WriteLine(" Purchase Order Sent {0}, £{1}, {2}, {3}", purchaseOrder.CompanyName, purchaseOrder.AmountToPay, purchaseOrder.PaymentDayTerms, purchaseOrder.PoNumber);
}
private static void CreateConnection()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
_connection = _factory.CreateConnection();
_model = _connection.CreateModel();
_model.ExchangeDeclare(ExchangeName, "direct");
_model.QueueDeclare(CardPaymentQueueName, true, false, false, null);
_model.QueueDeclare(PurchaseOrderQueueName, true, false, false, null);
_model.QueueBind(CardPaymentQueueName, ExchangeName, "CardPayment");
_model.QueueBind(PurchaseOrderQueueName, ExchangeName, "PurchaseOrder");
}
private static void SendMessage(byte[] message, string routingKey)
{
_model.BasicPublish(ExchangeName, routingKey, null, message);
}
}
Consumer 1 (Payment)
class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private const string ExchangeName = "DirectRouting_Exchange";
private const string CardPaymentQueueName = "CardPaymentDirectRouting_Queue";
static void Main()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
using (_connection = _factory.CreateConnection())
{
using (var channel = _connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct");
channel.QueueDeclare(CardPaymentQueueName, true, false, false, null);
channel.QueueBind(CardPaymentQueueName, ExchangeName, "CardPayment");
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(CardPaymentQueueName, false, consumer);
while (true)
{
var ea = consumer.Queue.Dequeue();
var message = (Payment)ea.Body.DeSerialize(typeof(Payment));
var routingKey = ea.RoutingKey;
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine("--- Payment - Routing Key <{0}> : {1} : {2}", routingKey, message.CardNumber, message.AmountToPay);
}
}
}
}
}
Consumer 1 (PurchaseOrder)
class Program
{
private static ConnectionFactory _factory;
private static IConnection _connection;
private const string ExchangeName = "DirectRouting_Exchange";
private const string PurchaseOrderQueueName = "PurchaseOrderDirectRouting_Queue";
static void Main()
{
_factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
using (_connection = _factory.CreateConnection())
{
using (var channel = _connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct");
channel.QueueDeclare(PurchaseOrderQueueName, true, false, false, null);
channel.QueueBind(PurchaseOrderQueueName, ExchangeName, "PurchaseOrder");
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(PurchaseOrderQueueName, false, consumer);
while (true)
{
var ea = consumer.Queue.Dequeue();
var message = (PurchaseOrder)ea.Body.DeSerialize(typeof(PurchaseOrder));
var routingKey = ea.RoutingKey;
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine("-- Purchase Order - Routing Key <{0}> : {1}, £{2}, {3}, {4}", routingKey, message.CompanyName, message.AmountToPay, message.PaymentDayTerms, message.PoNumber);
}
}
}
}
}