مقدمه
الگوی پیام رسانی ناهمزمان یک انتخاب طراحی ضروری برای ارتباطات میکروسرویس است. در این الگوی طراحی، یک میکروسرویس میتواند پیامی را منتشر کند و برای ادامه کار، نیازی به صبر کردن برای تکمیل پردازش توسط مصرفکننده ندارد.
تکنیک های مختلفی وجود دارد که می تواند برای دستیابی به تعاملات ناهمزمان بین میکروسرویس ها اجرا شود. در این مقاله، من در مورد یک مورد استفاده صحبت خواهم کرد که در آن از یک واسطه پیام - RabbitMQ - و تکنیک "Fanout" برای پیاده سازی الگوی پیام رسانی ناهمزمان در عمل استفاده می کنیم.
الگوی پیام رسانی "Fanout" چیست؟
Fanout یک طرح پیام رسانی است که در آن پیام منتشر شده از یک ناشر خاص توسط چندین مشترک مختلف به طور مستقل و همزمان مصرف می شود. هدف این است که همان پیام منتشر شده توسط مصرف کنندگان مختلف مصرف شود و به روش های مختلف پردازش شود.
در یک واسطه پیام مانند RabbitMQ، ناشر پیامی را منتشر می کند که از طریق نوع خاصی از تبادل به نام "Fanout Exchange" در صف های مختلف قرار می گیرد.
مصرفکنندگانی که به این صفها گوش میدهند، همان پیامی را دریافت میکنند که باید مصرف شود.
نمودار زیر یک نمایش نموداری از الگوی پیام رسانی "Fanout" را نشان می دهد.
Fanout Async Messaging Design در Microservices با استفاده از RabbitMQ
چه زمانی از الگوی Fanout استفاده کنیم؟
مورد استفاده از الگوی fanout در مکانهایی که ناشر نیاز دارد به طور ناهمزمان با چندین مصرفکننده در یک حجم کاری واحد ارتباط برقرار کند، قابل استفاده است.
یک مثال می تواند این باشد: فرض کنید در یک سیستم کتابخانه آنلاین، یک میکروسرویس مسئول ارائه درخواست امانت برای یک کتاب است - سرویس اجاره کتاب. در اینجا، هنگامی که یک کتاب با موفقیت به یک مشتری اجاره شد، این سرویس می تواند به عنوان یک ناشر عمل کند و پیامی را برای سایر خدماتی که مسئول (i) حذف آن از "لیست آرزوهای" مشتری، (ii) کاهش تعداد در دسترس بودن هستند، تولید کند. برای این کتاب در قفسه کتابخانه آنلاین، (iii) به روز رسانی "قفسه در حال مطالعه" مشتری. این سرویسهای مصرفکننده پیام یکسان را در صفهای مختلف گوش میدهند و میتوانند به طور مستقل روی این اعلان کار کنند، بدون اینکه فرآیند اجاره را منتظر بمانند. در اینجا ناشر، پیام را در صفهای متفاوتی که مصرفکنندگان علاقهمند برای مصرف پیام به آن متصل میشوند، «طرفدار» میکند. ناشر در اینجا، سرویس اجاره کتاب، نیازی به صبر کردن ندارد تا خدمات دیگر بر اساس پیام به پایان برسد و می تواند به طور یکپارچه و مستقل کتاب را برای مشتری صادر کند و درخواست های اجاره بیشتری را انجام دهد تا آن را برای کاربران در دسترس قرار دهد.
Refresher در "Exchanges"، "Bindings"، "Routing Keys" در RabbitMQ
بهعنوان تجدیدنظر یا محاسبهکننده سریع، جنبههای مختلف کلیدی واسطه پیام RabbitMQ را که در نمونه اولیه استفاده میکنیم، مشخص میکنم. با این حال، من شما را تشویق میکنم که برای درک عمیق این موضوعات به مطالعههای بیشتر بروید.
مبادلات
اکسچنج جایی است که همه پیامهای منتشر شده ابتدا از سوی ناشران ارسال میشوند و سپس به صفهای مختلف هدایت میشوند. این را می توان با یک صندوق پستی یا یک هاب مقایسه کرد که در آن همه پیام ها می رسند و سپس بر اساس کلید مسیریابی، ویژگی هدر و اتصال به صف های مربوطه ارسال می شوند.
در زیر انواع مبادلات پشتیبانی شده در RabbitMQ آمده است:
- مستقیم
- Fanout
- موضوع
- سرصفحه ها
اتصالات
Binding یک صف را به یک صرافی پیوند می دهد.
کلیدهای مسیریابی
Routing Key یکی از ویژگیهایی است که صرافیها هنگام تصمیمگیری در مورد اینکه پیام به کدام صف ارسال شود، به دنبال آن هستند. با این حال، در مورد نوع مبادله "Fanout" این کلید هیچ اهمیتی ندارد و نادیده گرفته می شود.
نمونه اولیه
در مقاله قبلی ام نحوه چرخش و ارتباط با یک ظرف RabbitMQ را نشان داده بودم. این دمو بر اساس همین زیرساخت خواهد بود.
برای شروع، (الف) من یک تبادل از نوع "Fanout" در نمونه در حال اجرا RabbitMQ ایجاد می کنم. پس از آن (B) من سه صف مختلف را برای اتصال به این صرافی اعلام می کنم. پس از آماده شدن، (C) من پیامی را از طریق سرویس اجاره به صرافی منتشر خواهم کرد.
قطعه کد زیر این مراحل را نشان می دهد،
- private const string FANOUT_EXCHANGE = "fanout.exchange";
- private const string WISHLIST_QUEUE = "wishListQueue";
- private const string LIBRARY_SHELF_QUEUE = "libraryShelfQueue";
- private const string READING_SHELF = "readingShelfQueue";
- var factory = new ConnectionFactory()
- {
- Uri = new Uri("amqp://guest:guest@localhost:5672")
- };
- using (var connection = factory.CreateConnection())
- using (var channel = connection.CreateModel())
- {
- // declare exchange of type "Fanout"
- channel.ExchangeDeclare(exchange:FANOUT_EXCHANGE, type: ExchangeType.Fanout);
- // declare queue : Wishlist Queue
- channel.QueueDeclare(queue: WISHLIST_QUEUE,
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- // bindind the wishlist queue to the "Fanout" exchange
- channel.QueueBind(queue: WISHLIST_QUEUE, exchange: FANOUT_EXCHANGE, routingKey:"");
- // declare queue : Library Shelf Queue
- channel.QueueDeclare(queue: LIBRARY_SHELF_QUEUE,
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- // bindind the Library Shelf Queue to the "Fanout" exchange
- channel.QueueBind(queue: LIBRARY_SHELF_QUEUE, exchange: FANOUT_EXCHANGE, routingKey: "");
- // declare queue : Reading Shelf Queue
- channel.QueueDeclare(queue: READING_SHELF,
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- // bindind the Reading Shelf to the "Fanout" exchange
- channel.QueueBind(queue: READING_SHELF, exchange: FANOUT_EXCHANGE, routingKey: "");
- // publish message
- string message = "ID of the Book which is Rented out: " + id;
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: FANOUT_EXCHANGE,
- routingKey: "",
- basicProperties: null,
- body: body);
- }
اکنون اجازه دهید مشتریان خود را که برای گوش دادن به این سه صف مختلف پیکربندی شده اند، بچرخانیم. یک کد نمونه از یکی از مصرف کنندگان - WishListService - که در آن پیام ها را از صف مربوطه مصرف می کند: wishListQueue
- //establish connection
- var factory = new ConnectionFactory()
- {
- Uri = new Uri("amqp://guest:guest@localhost:5672")
- };
- var rabbitMqConnection = factory.CreateConnection();
- var rabbitMqChannel = rabbitMqConnection.CreateModel();
- rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
- //consume the message received
- var consumer = new EventingBasicConsumer(rabbitMqChannel);
- consumer.Received += (model, args) =>
- {
- var body = args.Body;
- var message = Encoding.UTF8.GetString(body.ToArray());
- Console.WriteLine("Book ID To be removed from Wishlist: " + message);
- rabbitMqChannel.BasicAck(deliveryTag: args.DeliveryTag, multiple: true);
- Thread.Sleep(1000);
- };
- rabbitMqChannel.BasicConsume(queue: WISHLIST_QUEUE,
- autoAck: false,
- consumer: consumer);
- Console.ReadLine();
خلاصه
در این مقاله، ابتدا الگوی پیام رسانی ناهمگام Fanout را که می تواند در ارتباطات Microservice استفاده شود، درک کردیم. سپس شروع به کاوش کردیم که چگونه می توان از طریق یک واسطه پیام مانند RabbitMQ به این امر دست یافت. برای آن ما یک نمونه اولیه بر اساس سیستم کتابخانه دیجیتال آنلاین فرضی خود ایجاد کردیم که در آن سرویس اجاره یک پیام واحد را پس از اجاره موفقیت آمیز یک کتاب منتشر می کند. این پیام از طریق یک تبادل Fanout در RabbitMQ منتشر می شود که آن را در صف های محدود مختلف قرار می دهد. خدمات مصرف کننده که به صف مربوطه گوش می دهند این پیام را دریافت کرده و به طور مستقل پردازش می کنند.
- ۰۱/۱۰/۳۰