آموزش هوش تجاری از ۰ تا ۱۰۰

  • ۰
  • ۰

مقدمه
 
الگوی پیام رسانی ناهمزمان یک انتخاب طراحی ضروری برای ارتباطات میکروسرویس است. در این الگوی طراحی، یک میکروسرویس می‌تواند پیامی را منتشر کند و برای ادامه کار، نیازی به صبر کردن برای تکمیل پردازش توسط مصرف‌کننده ندارد.
 
تکنیک های مختلفی وجود دارد که می تواند برای دستیابی به تعاملات ناهمزمان بین میکروسرویس ها اجرا شود. در این مقاله، من در مورد یک مورد استفاده صحبت خواهم کرد که در آن از یک واسطه پیام - RabbitMQ - و تکنیک "Fanout" برای پیاده سازی الگوی پیام رسانی ناهمزمان در عمل استفاده می کنیم.


 
الگوی پیام رسانی "Fanout" چیست؟
 
Fanout یک طرح پیام رسانی است که در آن پیام منتشر شده از یک ناشر خاص توسط چندین مشترک مختلف به طور مستقل و همزمان مصرف می شود. هدف این است که همان پیام منتشر شده توسط مصرف کنندگان مختلف مصرف شود و به روش های مختلف پردازش شود.
 
در یک واسطه پیام مانند RabbitMQ، ناشر پیامی را منتشر می کند که از طریق نوع خاصی از تبادل به نام "Fanout Exchange" در صف های مختلف قرار می گیرد.
 
مصرف‌کنندگانی که به این صف‌ها گوش می‌دهند، همان پیامی را دریافت می‌کنند که باید مصرف شود.
 
نمودار زیر یک نمایش نموداری از الگوی پیام رسانی "Fanout" را نشان می دهد.
 
Fanout Async Messaging Design در Microservices با استفاده از RabbitMQ


 
چه زمانی از الگوی Fanout استفاده کنیم؟
 
مورد استفاده از الگوی fanout در مکان‌هایی که ناشر نیاز دارد به طور ناهمزمان با چندین مصرف‌کننده در یک حجم کاری واحد ارتباط برقرار کند، قابل استفاده است.
یک مثال می تواند این باشد: فرض کنید در یک سیستم کتابخانه آنلاین، یک میکروسرویس مسئول ارائه درخواست امانت برای یک کتاب است - سرویس اجاره کتاب. در اینجا، هنگامی که یک کتاب با موفقیت به یک مشتری اجاره شد، این سرویس می تواند به عنوان یک ناشر عمل کند و پیامی را برای سایر خدماتی که مسئول (i) حذف آن از "لیست آرزوهای" مشتری، (ii) کاهش تعداد در دسترس بودن هستند، تولید کند. برای این کتاب در قفسه کتابخانه آنلاین، (iii) به روز رسانی "قفسه در حال مطالعه" مشتری. این سرویس‌های مصرف‌کننده پیام یکسان را در صف‌های مختلف گوش می‌دهند و می‌توانند به طور مستقل روی این اعلان کار کنند، بدون اینکه فرآیند اجاره را منتظر بمانند. در اینجا ناشر، پیام را در صف‌های متفاوتی که مصرف‌کنندگان علاقه‌مند برای مصرف پیام به آن متصل می‌شوند، «طرفدار» می‌کند. ناشر در اینجا، سرویس اجاره کتاب، نیازی به صبر کردن ندارد تا خدمات دیگر بر اساس پیام به پایان برسد و می تواند به طور یکپارچه و مستقل کتاب را برای مشتری صادر کند و درخواست های اجاره بیشتری را انجام دهد تا آن را برای کاربران در دسترس قرار دهد.


 
Refresher در "Exchanges"، "Bindings"، "Routing Keys" در RabbitMQ
 
به‌عنوان تجدیدنظر یا محاسبه‌کننده سریع، جنبه‌های مختلف کلیدی واسطه پیام RabbitMQ را که در نمونه اولیه استفاده می‌کنیم، مشخص می‌کنم. با این حال، من شما را تشویق می‌کنم که برای درک عمیق این موضوعات به مطالعه‌های بیشتر بروید.


 
مبادلات
 
اکسچنج جایی است که همه پیام‌های منتشر شده ابتدا از سوی ناشران ارسال می‌شوند و سپس به صف‌های مختلف هدایت می‌شوند. این را می توان با یک صندوق پستی یا یک هاب مقایسه کرد که در آن همه پیام ها می رسند و سپس بر اساس کلید مسیریابی، ویژگی هدر و اتصال به صف های مربوطه ارسال می شوند.
 
در زیر انواع مبادلات پشتیبانی شده در RabbitMQ آمده است:

 

  • مستقیم
  • Fanout
  • موضوع
  • سرصفحه ها

 

اتصالات
 
Binding یک صف را به یک صرافی پیوند می دهد.

 

کلیدهای مسیریابی
 
Routing Key یکی از ویژگی‌هایی است که صرافی‌ها هنگام تصمیم‌گیری در مورد اینکه پیام به کدام صف ارسال شود، به دنبال آن هستند. با این حال، در مورد نوع مبادله "Fanout" این کلید هیچ اهمیتی ندارد و نادیده گرفته می شود.
 


نمونه اولیه
 
در مقاله قبلی ام نحوه چرخش و ارتباط با یک ظرف RabbitMQ را نشان داده بودم. این دمو بر اساس همین زیرساخت خواهد بود.
 
برای شروع، (الف) من یک تبادل از نوع "Fanout" در نمونه در حال اجرا RabbitMQ ایجاد می کنم. پس از آن (B) من سه صف مختلف را برای اتصال به این صرافی اعلام می کنم. پس از آماده شدن، (C) من پیامی را از طریق سرویس اجاره به صرافی منتشر خواهم کرد.
 
قطعه کد زیر این مراحل را نشان می دهد،

  1. private const string FANOUT_EXCHANGE = "fanout.exchange";
  2. private const string WISHLIST_QUEUE = "wishListQueue";  
  3. private const string LIBRARY_SHELF_QUEUE = "libraryShelfQueue";  
  4. private const string READING_SHELF = "readingShelfQueue";

 

  1. var factory = new ConnectionFactory()  
  2.             {  
  3.                 Uri = new Uri("amqp://guest:guest@localhost:5672")  
  4.             };  
  5.   
  6.             using (var connection = factory.CreateConnection())  
  7.             using (var channel = connection.CreateModel())  
  8.             {  
  9.                 // declare exchange of type "Fanout"  
  10.                 channel.ExchangeDeclare(exchange:FANOUT_EXCHANGE, type: ExchangeType.Fanout);  
  11.   
  12.                 // declare queue : Wishlist Queue  
  13.                 channel.QueueDeclare(queue: WISHLIST_QUEUE,  
  14.                                      durable: false,  
  15.                                      exclusive: false,  
  16.                                      autoDelete: false,  
  17.                                      arguments: null);  
  18.                 // bindind the wishlist queue to the "Fanout" exchange  
  19.                 channel.QueueBind(queue: WISHLIST_QUEUE, exchange: FANOUT_EXCHANGE, routingKey:"");  
  20.   
  21.                 // declare queue : Library Shelf Queue  
  22.                 channel.QueueDeclare(queue: LIBRARY_SHELF_QUEUE,  
  23.                                      durable: false,  
  24.                                      exclusive: false,  
  25.                                      autoDelete: false,  
  26.                                      arguments: null);  
  27.                 // bindind the Library Shelf Queue to the "Fanout" exchange  
  28.                 channel.QueueBind(queue: LIBRARY_SHELF_QUEUE, exchange: FANOUT_EXCHANGE, routingKey: "");  
  29.   
  30.                 // declare queue : Reading Shelf Queue  
  31.                 channel.QueueDeclare(queue: READING_SHELF,  
  32.                                      durable: false,  
  33.                                      exclusive: false,  
  34.                                      autoDelete: false,  
  35.                                      arguments: null);  
  36.                 // bindind the Reading Shelf to the "Fanout" exchange  
  37.                 channel.QueueBind(queue: READING_SHELF, exchange: FANOUT_EXCHANGE, routingKey: "");  
  38.   
  39.   
  40.   
  41.   
  42.                 // publish message  
  43.                 string message = "ID of the Book which is Rented out: " + id;  
  44.                 var body = Encoding.UTF8.GetBytes(message);  
  45.   
  46.                 channel.BasicPublish(exchange: FANOUT_EXCHANGE,  
  47.                                     routingKey: "",  
  48.                                     basicProperties: null,  
  49.                                     body: body);  
  50.   
  51.             }  

اکنون اجازه دهید مشتریان خود را که برای گوش دادن به این سه صف مختلف پیکربندی شده اند، بچرخانیم. یک کد نمونه از یکی از مصرف کنندگان - WishListService - که در آن پیام ها را از صف مربوطه مصرف می کند: wishListQueue

  1. //establish connection  
  2.             var factory = new ConnectionFactory()  
  3.             {  
  4.                 Uri = new Uri("amqp://guest:guest@localhost:5672")  
  5.             };  
  6.             var rabbitMqConnection = factory.CreateConnection();  
  7.             var rabbitMqChannel = rabbitMqConnection.CreateModel();  
  8.   
  9.             rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);  
  10.   
  11.             //consume the message received  
  12.             var consumer = new EventingBasicConsumer(rabbitMqChannel);  
  13.             consumer.Received += (model, args) =>  
  14.             {  
  15.                 var body = args.Body;  
  16.                 var message = Encoding.UTF8.GetString(body.ToArray());  
  17.                 Console.WriteLine("Book ID To be removed from Wishlist: " + message);  
  18.                 rabbitMqChannel.BasicAck(deliveryTag: args.DeliveryTag, multiple: true);  
  19.                 Thread.Sleep(1000);  
  20.             };  
  21.             rabbitMqChannel.BasicConsume(queue: WISHLIST_QUEUE,  
  22.                                          autoAck: false,  
  23.                                          consumer: consumer);  
  24.             Console.ReadLine();  


 
خلاصه
 
در این مقاله، ابتدا الگوی پیام رسانی ناهمگام Fanout را که می تواند در ارتباطات Microservice استفاده شود، درک کردیم. سپس شروع به کاوش کردیم که چگونه می توان از طریق یک واسطه پیام مانند RabbitMQ به این امر دست یافت. برای آن ما یک نمونه اولیه بر اساس سیستم کتابخانه دیجیتال آنلاین فرضی خود ایجاد کردیم که در آن سرویس اجاره یک پیام واحد را پس از اجاره موفقیت آمیز یک کتاب منتشر می کند. این پیام از طریق یک تبادل Fanout در RabbitMQ منتشر می شود که آن را در صف های محدود مختلف قرار می دهد. خدمات مصرف کننده که به صف مربوطه گوش می دهند این پیام را دریافت کرده و به طور مستقل پردازش می کنند. 

  • ۰۱/۱۰/۳۰
  • sahar saha sql

نظرات (۰)

هیچ نظری هنوز ثبت نشده است

ارسال نظر

ارسال نظر آزاد است، اما اگر قبلا در بیان ثبت نام کرده اید می توانید ابتدا وارد شوید.
شما میتوانید از این تگهای html استفاده کنید:
<b> یا <strong>، <em> یا <i>، <u>، <strike> یا <s>، <sup>، <sub>، <blockquote>، <code>، <pre>، <hr>، <br>، <p>، <a href="" title="">، <span style="">، <div align="">
تجدید کد امنیتی