Difference between revisions of "RabbitMQ"

From Logic Wiki
Jump to: navigation, search
(Created page with "Category:Microservices Category:MessageQueue It's an open source message queue management system. ([http://rabbitmq.com rabbitmq.com]). it uses AMQP (Advanced, Messag...")
 
Line 278: Line 278:
  
 
So when we have more instances of consumer they share the messsages like a load balancer.  
 
So when we have more instances of consumer they share the messsages like a load balancer.  
== Publish & Subscribe ==
+
=== Publish & Subscribe ===
 
+
  
 
==== Producer====
 
==== Producer====

Revision as of 12:51, 8 February 2021


It's an open source message queue management system. (rabbitmq.com). it uses AMQP (Advanced, Message Queue Protocol)


Server Installation

Mac

brew update
brew install rabbitmq

in ~/.bashrc or other bash files add

export PATH=$PATH:/usr/local/sbin

Docker

docker pull rabbitmq
docker run -d --hostname myrabbit --name some-rabbit rabbitmq:3
docker run -d --hostname myrabbit --name some-rabbit rabbitmq:3-management

Setting default user and password

If you wish to change the default username and password of guest / guest, you can do so with the RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS environmental variables:

docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

Starting Server

To start in foreground type in bash

rabbitmq-server

To start in background

brew services start rabbitmq

Enabling Management Portal

rabbitmq -plugins enable rabbitmq_management

http://localhost:15672

  • User : guest
  • Pass : guest

Installation of Client

in nuget packages install RabbitMQ.Client

Exchange Types

Direct

Publish direct with queue keys

Fanout

Ignores keys and broadcast to many queues

Topic

publish to keys with asterisk(*)

Headers

instead of keys it looks for headers


QUEUES

Queue names can be maximum 255 chars. Messages are removed from the queue once the broker sends it to the consumer or once the concsumer sends an acknowledgement to the broker.

Common Classes

used in the examples below

    [Serializable] 
    public class Payment
    {
        public decimal AmountToPay;
        public string CardNumber;        
        public string Name;
    }
    [Serializable]
    public class PurchaseOrder
    {
        public decimal AmountToPay;
        public string PoNumber;        
        public string CompanyName;
        public int PaymentDayTerms;
    }
 public static class ObjectSerialize
    {
        public static byte[] Serialize(this object obj)
        {
            if (obj == null)
            {
                return null;
            }

            var json = JsonConvert.SerializeObject(obj);
            return Encoding.ASCII.GetBytes(json);
        }

        public static object DeSerialize(this byte[] arrBytes, Type type)
        {
            var json = Encoding.Default.GetString(arrBytes);
            return JsonConvert.DeserializeObject(json, type);         
        }

        public static string DeSerializeText(this byte[] arrBytes)
        {
            return Encoding.Default.GetString(arrBytes);            
        }
    }
}

Standard Queue

    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
                
        private const string QueueName = "StandardQueue_ExampleQueue";

        public static void Main()
        {            
            var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
            var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" };
                        
            CreateQueue();            
                        
            SendMessage(payment1);
            SendMessage(payment2);
            SendMessage(payment3);
            SendMessage(payment4);
            SendMessage(payment5);
            SendMessage(payment6);
            SendMessage(payment7);
            SendMessage(payment8);
            SendMessage(payment9);
            SendMessage(payment10);
                                 
            Recieve();

            Console.ReadLine();
        }

        private static void CreateQueue()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest"};
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel(); 
                     
            _model.QueueDeclare(QueueName, true, false, false, null);            
        }

        private static void SendMessage(Payment message)
        {            
            _model.BasicPublish("", QueueName, null, message.Serialize());
            Console.WriteLine(" [x] Payment Message Sent : {0} : {1} : {2}", message.CardNumber, message.AmountToPay, message.Name);            
        }

        public static void Recieve()
        {
            var consumer = new QueueingBasicConsumer(_model);            
            var msgCount = GetMessageCount(_model, QueueName);
            
            _model.BasicConsume(QueueName, true, consumer);
                    
            var count = 0;
            
            while (count < msgCount)
            {                               
                var message = (Payment)consumer.Queue.Dequeue().Body.DeSerialize(typeof(Payment));

                Console.WriteLine("----- Received {0} : {1} : {2}", message.CardNumber, message.AmountToPay, message.Name);
                count++;
            }                   
        }

        private static uint GetMessageCount(IModel channel, string queueName)
        {
            var results = channel.QueueDeclare(queueName, true, false, false, null);
            return results.MessageCount;                
        }
    }
 

Worker Queue

(Load Balance)

Producer

 public class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
        
        private const string QueueName = "WorkerQueue_Queue";

        static void Main()
        {            
            var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" };
            var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" };
            var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" };
            var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" };
            var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" };
            var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" };
            var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" };
            var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };
            
            CreateConnection();
            
            SendMessage(payment1);
            SendMessage(payment2);
            SendMessage(payment3);
            SendMessage(payment4);
            SendMessage(payment5);
            SendMessage(payment6);
            SendMessage(payment7);
            SendMessage(payment8);
            SendMessage(payment9);
            SendMessage(payment10);

            Console.ReadLine();
        }

        private static void CreateConnection()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            
            _model.QueueDeclare(QueueName, true, false, false, null);
        }

        private static void SendMessage(Payment message)
        {                        
            _model.BasicPublish("", QueueName, null, message.Serialize());
            Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber, message.AmountToPay);                
        }
    }

Consumer

 public class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        
        private const string QueueName = "WorkerQueue_Queue";

        static void Main()
        {
            Receive();

            Console.ReadLine();
        }

        public static void Receive()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, true, false, false, null);
                    channel.BasicQos(0, 1, false);

                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(QueueName, false, consumer); 

                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        var message = (Payment)ea.Body.DeSerialize(typeof(Payment));
                        channel.BasicAck(ea.DeliveryTag, false);

                        Console.WriteLine("----- Payment Processed {0} : {1}", message.CardNumber, message.AmountToPay);
                    }
                }
            }
        }
    }

in line channel.BasicQos(0, 1, false); with parameter "1" we say don't send message to queue until get an acknowledgment

So when we have more instances of consumer they share the messsages like a load balancer.

Publish & Subscribe

Producer

    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;

        private const string ExchangeName = "PublishSubscribe_Exchange";

        static void Main()
        {
            var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" };
            var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" };
            var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" };
            var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" };
            var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" };
            var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" };
            var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" };
            var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };

            CreateConnection();

            SendMessage(payment1);
            SendMessage(payment2);
            SendMessage(payment3);
            SendMessage(payment4);
            SendMessage(payment5);
            SendMessage(payment6);
            SendMessage(payment7);
            SendMessage(payment8);
            SendMessage(payment9);
            SendMessage(payment10);

            Console.ReadLine();
        }

        private static void CreateConnection()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.ExchangeDeclare(ExchangeName, "fanout", false);
        }

        private static void SendMessage(Payment message)
        {           
            _model.BasicPublish(ExchangeName, "", null, message.Serialize());
            Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber, message.AmountToPay);             
        }
    }

in line _model.ExchangeDeclare(ExchangeName, "fanout", false); we use fanout exchange

Consumer

 class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static QueueingBasicConsumer _consumer;

        private const string ExchangeName = "PublishSubscribe_Exchange";

        static void Main()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    var queueName = DeclareAndBindQueueToExchange(channel);
                    channel.BasicConsume(queueName, true, _consumer);

                    while (true)
                    {
                        var ea = _consumer.Queue.Dequeue();
                        var message = (Payment)ea.Body.DeSerialize(typeof(Payment));

                        Console.WriteLine("----- Payment Processed {0} : {1}", message.CardNumber, message.AmountToPay);
                    }
                }
            }
        }


        private static string DeclareAndBindQueueToExchange(IModel channel)
        {
            channel.ExchangeDeclare(ExchangeName, "fanout");
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queueName, ExchangeName, "");
            _consumer = new QueueingBasicConsumer(channel);
            return queueName;
        }
    }

Direct Routing

Producer

class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;

        private const string ExchangeName = "DirectRouting_Exchange";
        private const string CardPaymentQueueName = "CardPaymentDirectRouting_Queue";
        private const string PurchaseOrderQueueName = "PurchaseOrderDirectRouting_Queue";

        static void Main()
        {
            var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" };
            var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" };
            var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" };
            var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" };
            var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" };
            var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" };
            var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" };
            var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };

            var purchaseOrder1 = new PurchaseOrder{AmountToPay = 50.0m, CompanyName = "Company A", PaymentDayTerms = 75, PoNumber = "123434A"};
            var purchaseOrder2 = new PurchaseOrder { AmountToPay = 150.0m, CompanyName = "Company B", PaymentDayTerms = 75, PoNumber = "193434B" };
            var purchaseOrder3 = new PurchaseOrder { AmountToPay = 12.0m, CompanyName = "Company C", PaymentDayTerms = 75, PoNumber = "196544A" };
            var purchaseOrder4 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company D", PaymentDayTerms = 75, PoNumber = "234434H" };
            var purchaseOrder5 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company E", PaymentDayTerms = 75, PoNumber = "876434W" };
            var purchaseOrder6 = new PurchaseOrder { AmountToPay = 7150.0m, CompanyName = "Company F", PaymentDayTerms = 75, PoNumber = "1423474U" };
            var purchaseOrder7 = new PurchaseOrder { AmountToPay = 3150.0m, CompanyName = "Company G", PaymentDayTerms = 75, PoNumber = "1932344O" };
            var purchaseOrder8 = new PurchaseOrder { AmountToPay = 3190.0m, CompanyName = "Company H", PaymentDayTerms = 75, PoNumber = "1123457Q" };
            var purchaseOrder9 = new PurchaseOrder { AmountToPay = 50.0m, CompanyName = "Company I", PaymentDayTerms = 75, PoNumber =   "1595344R" };
            var purchaseOrder10 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company J", PaymentDayTerms = 75, PoNumber = "656734L" };

            CreateConnection();

            SendPayment(payment1);
            SendPayment(payment2);
            SendPayment(payment3);
            SendPayment(payment4);
            SendPayment(payment5);
            SendPayment(payment6);
            SendPayment(payment7);
            SendPayment(payment8);
            SendPayment(payment9);
            SendPayment(payment10);

            SendPurchaseOrder(purchaseOrder1);
            SendPurchaseOrder(purchaseOrder2);
            SendPurchaseOrder(purchaseOrder3);
            SendPurchaseOrder(purchaseOrder4);
            SendPurchaseOrder(purchaseOrder5);
            SendPurchaseOrder(purchaseOrder6);
            SendPurchaseOrder(purchaseOrder7);
            SendPurchaseOrder(purchaseOrder8);
            SendPurchaseOrder(purchaseOrder9);
            SendPurchaseOrder(purchaseOrder10);
        }

        private static void SendPayment(Payment payment)
        {
            SendMessage(payment.Serialize(), "CardPayment");
            Console.WriteLine(" Payment Sent {0}, £{1}", payment.CardNumber, payment.AmountToPay); 
        }

        private static void SendPurchaseOrder(PurchaseOrder purchaseOrder)
        {
            SendMessage(purchaseOrder.Serialize(), "PurchaseOrder");
            Console.WriteLine(" Purchase Order Sent {0}, £{1}, {2}, {3}", purchaseOrder.CompanyName, purchaseOrder.AmountToPay, purchaseOrder.PaymentDayTerms, purchaseOrder.PoNumber); 
        }

        private static void CreateConnection()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.ExchangeDeclare(ExchangeName, "direct");
            _model.QueueDeclare(CardPaymentQueueName, true, false, false, null);
            _model.QueueDeclare(PurchaseOrderQueueName, true, false, false, null);

            _model.QueueBind(CardPaymentQueueName, ExchangeName, "CardPayment");
            _model.QueueBind(PurchaseOrderQueueName, ExchangeName, "PurchaseOrder");
        }

        private static void SendMessage(byte[] message, string routingKey)
        {                       
            _model.BasicPublish(ExchangeName, routingKey, null, message);          
        }
    }

Consumer 1 (Payment)

  class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;

        private const string ExchangeName = "DirectRouting_Exchange";
        private const string CardPaymentQueueName = "CardPaymentDirectRouting_Queue";

        static void Main()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct");
                    channel.QueueDeclare(CardPaymentQueueName, true, false, false, null);
                    channel.QueueBind(CardPaymentQueueName, ExchangeName, "CardPayment");
                    channel.BasicQos(0, 1, false);

                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(CardPaymentQueueName, false, consumer);

                    while (true)
                    {                        
                        var ea = consumer.Queue.Dequeue();
                        var message = (Payment)ea.Body.DeSerialize(typeof(Payment));                        
                        var routingKey = ea.RoutingKey;
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine("--- Payment - Routing Key <{0}> : {1} : {2}", routingKey, message.CardNumber, message.AmountToPay);
                    }
                }
            }
        }
    }

Consumer 1 (PurchaseOrder)

  class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;

        private const string ExchangeName = "DirectRouting_Exchange";
        private const string PurchaseOrderQueueName = "PurchaseOrderDirectRouting_Queue";

        static void Main()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct");
                    channel.QueueDeclare(PurchaseOrderQueueName, true, false, false, null);
                    channel.QueueBind(PurchaseOrderQueueName, ExchangeName, "PurchaseOrder");
                    channel.BasicQos(0, 1, false);

                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(PurchaseOrderQueueName, false, consumer);

                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        var message = (PurchaseOrder)ea.Body.DeSerialize(typeof(PurchaseOrder));
                        var routingKey = ea.RoutingKey;
                        channel.BasicAck(ea.DeliveryTag, false);

                        Console.WriteLine("-- Purchase Order - Routing Key <{0}> : {1}, £{2}, {3}, {4}", routingKey, message.CompanyName, message.AmountToPay, message.PaymentDayTerms, message.PoNumber); 
                    }
                }
            }
        }
    }