日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

.NET中的MassTransit分布式應(yīng)用框架詳解

瀏覽:378日期:2022-06-09 11:49:11
目錄
  • 引言
  • 快速體驗(yàn)
  • 核心概念
  • Message
  • Producer
    • 發(fā)送命令
    • 發(fā)布事件
  • Consumer
    • 無狀態(tài)消費(fèi)者
    • 有狀態(tài)消費(fèi)者
  • 應(yīng)用場景

    MassTransit是一款優(yōu)秀的分布式應(yīng)用框架,可作為分布式應(yīng)用的消息總線,也可以用作單體應(yīng)用的事件總線。

    引言

    A free, open-source distributed application framework for .NET.
    一個免費(fèi)、開源的.NET 分布式應(yīng)用框架。-- MassTransit 官網(wǎng)

    快速體驗(yàn)

    空口無憑,創(chuàng)建一個項(xiàng)目快速體驗(yàn)一下。

    • 基于worker模板創(chuàng)建一個基礎(chǔ)項(xiàng)目:dotnet new worker -n MassTransit.Demo
    • 打開項(xiàng)目,添加NuGet包:MassTransit
    • 定義訂單創(chuàng)建事件消息契約:
    using System;namespace MassTransit.Demo{    public record OrderCreatedEvent    {public Guid OrderId { get; set; }    }}

    4.修改Worker類,發(fā)送訂單創(chuàng)建事件:

    namespace MassTransit.Demo;public class Worker : BackgroundService{    readonly IBus _bus;//注冊總線    public Worker(IBus bus)    {_bus = bus;    }    protected override async Task ExecuteAsync(CancellationToken stoppingToken)    {while (!stoppingToken.IsCancellationRequested){    //模擬并發(fā)送訂單創(chuàng)建事件    await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);    await Task.Delay(1000, stoppingToken);}    }}

    5.僅需實(shí)現(xiàn)IConsumer<OrderCreatedEvent>泛型接口,即可實(shí)現(xiàn)消息的訂閱:

    public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>{    private readonly ILogger<OrderCreatedEventConsumer> _logger;    public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)    {_logger = logger;    }    public Task Consume(ConsumeContext<OrderCreatedEvent> context)    {_logger.LogInformation($"Received Order:{context.Message.OrderId}");return Task.CompletedTask;    }}

    6.注冊服務(wù):

    using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args)    .ConfigureServices(services =>    {services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{    //注冊消費(fèi)者    configurator.AddConsumer<OrderCreatedEventConsumer>();    //使用基于內(nèi)存的消息路由傳輸    configurator.UsingInMemory((context, cfg) =>    {cfg.ConfigureEndpoints(context);    });});    })    .Build();await host.RunAsync();

    7.運(yùn)行項(xiàng)目,一個簡單的進(jìn)程內(nèi)事件發(fā)布訂閱的應(yīng)用就完成了。

    如果需要使用RabbitMQ 消息代理進(jìn)行消息傳輸,則僅需安裝MassTransit.RabbitMQNuGet包,然后指定使用RabbitMQ 傳輸消息即可。

    using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args)    .ConfigureServices(services =>    {services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{    configurator.AddConsumer<OrderCreatedEventConsumer>();        // configurator.UsingInMemory((context, cfg) =>    // {    //     cfg.ConfigureEndpoints(context);    // });        configurator.UsingRabbitMq((context, cfg) =>    {cfg.Host(    host: "localhost",    port: 5672,    virtualHost: "/",    configure: hostConfig =>    {hostConfig.Username("guest");hostConfig.Password("guest");    });cfg.ConfigureEndpoints(context);    });});    })    .Build();await host.RunAsync();

    運(yùn)行項(xiàng)目,MassTransit會自動在指定的RabbitMQ上創(chuàng)建一個類型為fanoutMassTransit.Demo.OrderCreatedEventExchange和一個與OrderCreatedEvent同名的隊(duì)列進(jìn)行消息傳輸,如下圖所示。

    核心概念

    MassTranist 為了實(shí)現(xiàn)消息代理的透明化和應(yīng)用間消息的高效傳輸,抽象了以下概念,其中消息流轉(zhuǎn)流程如下圖所示:

    • Message:消息契約,定義了消息生產(chǎn)者和消息消費(fèi)者之間的契約。
    • Producer:生產(chǎn)者,發(fā)送消息的一方都可以稱為生產(chǎn)者。
    • SendEndpoint:發(fā)送端點(diǎn),用于將消息內(nèi)容序列化,并發(fā)送到傳輸模塊。
    • Transport:傳輸模塊,消息代理透明化的核心,用于和消息代理通信,負(fù)責(zé)發(fā)送和接收消息。
    • ReceiveEndpoint:接收端點(diǎn),用于從傳輸模塊接收消息,反序列化消息內(nèi)容,并將消息路由到消費(fèi)者。
    • Consumer:消費(fèi)者,用于消息消費(fèi)。

    從上圖可知,本質(zhì)上還是發(fā)布訂閱模式的實(shí)現(xiàn),接下來就核心概念進(jìn)行詳解。

    Message

    Message:消息,可以使用class、interface、struct和record來創(chuàng)建,消息作為一個契約,需確保創(chuàng)建后不能篡改,因此應(yīng)只保留只讀屬性且不應(yīng)包含方法和行為。MassTransit使用的是包含命名空間的完全限定名即typeof(T).FullName來表示特定的消息類型。因此若在另外的項(xiàng)目中消費(fèi)同名的消息類型,需確保消息的命名空間相同。另外需注意消息不應(yīng)繼承,以避免發(fā)送基類消息類型造成的不可預(yù)期的結(jié)果。為避免此類情況,官方建議使用接口來定義消息。在MassTransit中,消息主要分為兩種類型:

    • Command:命令,用于告訴服務(wù)做什么,命令被發(fā)送到指定端點(diǎn),僅被一個服務(wù)接收并執(zhí)行。一般以動名詞結(jié)構(gòu)命名,如:UpdateAddress、CancelOrder。
    • Event:事件,用于告訴服務(wù)什么發(fā)生了,事件被發(fā)布到多個端點(diǎn),可以被多個服務(wù)消費(fèi)。 一般以過去式結(jié)構(gòu)命名,如:AddressUpdated,OrderCanceled。

    經(jīng)過MassTransit發(fā)送的消息,會使用信封包裝,包含一些附加信息,數(shù)據(jù)結(jié)構(gòu)舉例如下:

    {    "messageId": "6c600000-873b-00ff-9a8f-08da8da85542",    "requestId": null,    "correlationId": null,    "conversationId": "6c600000-873b-00ff-9526-08da8da85544",    "initiatorId": null,    "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",    "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",    "responseAddress": null,    "faultAddress": null,    "messageType": ["urn:message:MassTransit.Demo:OrderCreatedEvent"    ],    "message": {"orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"    },    "expirationTime": null,    "sentTime": "2022-09-03T12:32:15.0796943Z",    "headers": {},    "host": {"machineName": "THINKPAD","processName": "MassTransit.Demo","processId": 24684,"assembly": "MassTransit.Demo","assemblyVersion": "1.0.0.0","frameworkVersion": "6.0.5","massTransitVersion": "8.0.6.0","operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"    }}

    從以上消息實(shí)例中可以看出一個包裝后的消息包含以下核心屬性:

    • messageId:全局唯一的消息ID
    • messageType:消息類型
    • message:消息體,也就是具體的消息實(shí)例
    • sourceAddress:消息來源地址
    • destinationAddress:消息目標(biāo)地址
    • responseAddress:響應(yīng)地址,在請求響應(yīng)模式中使用
    • faultAddress:消息異常發(fā)送地址,用于存儲異常消費(fèi)消息
    • headers:消息頭,允許應(yīng)用自定義擴(kuò)展信息
    • correlationId:關(guān)聯(lián)Id,在Saga狀態(tài)機(jī)中會用到,用來關(guān)聯(lián)系列事件
    • host:宿主,消息來源應(yīng)用的宿主信息

    Producer

    Producer,生產(chǎn)者,即用于生產(chǎn)消息。在MassTransit主要借助以下對象進(jìn)行命令的發(fā)送和事件的發(fā)布。

    從以上類圖可以看出,消息的發(fā)送主要核心依賴于兩個接口:

    • ISendEndpoint:提供了Send方法,用于發(fā)送命令。
    • IPublishEndpoint:提供了Publish方法,用于發(fā)布事件。

    但基于上圖的繼承體系,可以看出通過IBusISendEndpointProviderConsumeContext進(jìn)行命令的發(fā)送;通過IBusIPublishEndpointProvider進(jìn)行事件的發(fā)布。具體舉例如下:

    發(fā)送命令

    1.通過IBus發(fā)送:

    private readonly IBus _bus;public async Task Post(CreateOrderRequest request){    //通過以下方式配置對應(yīng)消息類型的目標(biāo)地址    EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));    await _bus.Send(request);}

    2.通過ISendEndpointProvider發(fā)送:

    private readonly ISendEndpointProvider  _sendEndpointProvider;public async Task Post(CreateOrderRequest request){    var serviceAddress = new Uri("queue:create-order");    var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);    await endpoint.Send(request);}

    3.通過ConsumeContext發(fā)送:

    public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>{        public async Task Consume(ConsumeContext<CreateOrderRequest> context)    {    	//do something elsevar destinationAddress = new Uri("queue:lock-stock");var command = new LockStockRequest(context.Message.OrderId);       await context.Send<LockStockRequest>(destinationAddress, command); 		// 也可以通過獲取`SendEndpoint`發(fā)送命令// var endpoint = await context.GetSendEndpoint(destinationAddress);// await endpoint.Send<LockStockRequest>(command);    	    }}

    發(fā)布事件

    1.通過IBus發(fā)布:

    private readonly IBus _bus;public async Task Post(CreateOrderRequest request){    //do something    await _bus.Publish(request);}

    2.通過IPublishEndpoint發(fā)布:

    private readonly IPublishEndpoint _publishEndpoint;public async Task Post(CreateOrderRequest request){    //do something    var order = CreateOrder(request);    await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}

    3.通過ConsumeContext發(fā)布:

    public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>{        public async Task Consume(ConsumeContext<CreateOrderRequest> context)    {var order = CreateOrder(conext.Message);    	await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));    }}

    Consumer

    Consumer,消費(fèi)者,即用于消費(fèi)消息。MassTransit 包括多種消費(fèi)者類型,主要分為無狀態(tài)和有狀態(tài)兩種消費(fèi)者類型。

    無狀態(tài)消費(fèi)者

    無狀態(tài)消費(fèi)者,即消費(fèi)者無狀態(tài),消息消費(fèi)完畢,消費(fèi)者就釋放。主要的消費(fèi)者類型有:IConsumer<TMessage>JobConsumerIActivityRoutingSlip等。其中IConsumer<TMessage>已經(jīng)在上面的快速體驗(yàn)部分舉例說明。而JobConsumer<TMessage>主要是對IConsumer<TMessage>的補(bǔ)充,其主要應(yīng)用場景在于執(zhí)行耗時任務(wù)。
    而對于IActivityRoutingSlip則是MassTransit Courier的核心對象,主要用于實(shí)現(xiàn)Saga模式的分布式事務(wù)。MassTransit Courier 實(shí)現(xiàn)了Routing Slip模式,通過按需有序組合一系列的Activity,得到一個用來限定消息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivityIExecuteActivity。二者的差別在于IActivity定義了ExecuteCompensate兩個方法,而IExecuteActivitiy僅定義了Execute方法。其中Execute代表正向操作,Compensate代表反向補(bǔ)償操作。用一個簡單的下單流程:創(chuàng)建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。而對于具體實(shí)現(xiàn),可參閱文章:AspNetCore&MassTransit Courier實(shí)現(xiàn)分布式事務(wù)

    有狀態(tài)消費(fèi)者

    有狀態(tài)消費(fèi)者,即消費(fèi)者有狀態(tài),其狀態(tài)會持久化,代表的消費(fèi)者類型為MassTransitStateMachineMassTransitStateMachineMassTransit Automatonymous 庫定義的,Automatonymous 是一個.NET 狀態(tài)機(jī)庫,用于定義狀態(tài)機(jī),包括狀態(tài)、事件和行為。MassTransitStateMachine就是狀態(tài)機(jī)的具體抽象,可以用其編排一系列事件來實(shí)現(xiàn)狀態(tài)的流轉(zhuǎn),也可以用來實(shí)現(xiàn)Saga模式的分布式事務(wù)。并支持與EF Core和Dapper集成將狀態(tài)持久化到關(guān)系型數(shù)據(jù)庫,也支持將狀態(tài)持久化到MongoDB、Redis等數(shù)據(jù)庫。MassTransitStateMachine對于Saga模式分布式事務(wù)的實(shí)現(xiàn)方式與RoutingSlip不同,還是以簡單的下單流程:創(chuàng)建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。基于MassTransitStateMachine 實(shí)現(xiàn)分布式事務(wù)詳參后續(xù)文章。

    從上圖可知,通過MassTransitStateMachine可以將事件的執(zhí)行順序邏輯編排在一個集中的狀態(tài)機(jī)中,通過發(fā)送命令和訂閱事件來推動狀態(tài)流轉(zhuǎn),而這也正是Saga編排模式的實(shí)現(xiàn)。

    應(yīng)用場景

    了解完MassTransit的核心概念,接下來再來看下MassTransit的核心特性以及應(yīng)用場景:

    • 基于消息的請求響應(yīng)模式:可用于同步通信
    • Mediator模式:中間者模式的實(shí)現(xiàn),類似MediatR,但功能更完善
    • 計劃任務(wù):可用于執(zhí)行定時任務(wù)
    • Routing Slip 模式:可用于實(shí)現(xiàn)Saga模式的分布式事務(wù)
    • Saga 狀態(tài)機(jī):可用于實(shí)現(xiàn)Saga模式的分布式事務(wù)
    • 本地消息表:類似DotNetCore.Cap,用于實(shí)現(xiàn)最終一致性

    總體而言,MassTransit是一款優(yōu)秀的分布式應(yīng)用框架,可作為分布式應(yīng)用的消息總線,也可以用作單體應(yīng)用的事件總線。感興趣的朋友不妨一觀。

    到此這篇關(guān)于MassTransit 中的.NET 分布式應(yīng)用框架的文章就介紹到這了,更多相關(guān).NET 分布式應(yīng)用框架內(nèi)容請搜索以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持!

    標(biāo)簽: ASP.NET
    日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
    国产精品亚洲一区二区在线观看| 久久精品国产精品亚洲毛片| av中文字幕在线观看第一页| 麻豆成人av在线| 国产另类在线| 国产精品亲子伦av一区二区三区| 91精品丝袜国产高跟在线| 91嫩草精品| 国产精品v日韩精品v欧美精品网站| 国产精品三级| 精品一区二区三区免费看| 久久久久久久久成人| 精品中文字幕一区二区三区四区| 国产另类在线| 精品久久在线| 日韩免费福利视频| 蜜桃成人av| 亚洲精品看片| 国产欧美日韩亚洲一区二区三区| 国产精品igao视频网网址不卡日韩| 久久99国产精品视频| 91一区二区三区四区| 久久国产亚洲| 午夜在线播放视频欧美| 亚洲精品一级| 国产精品久久久久av蜜臀 | 黄在线观看免费网站ktv| 国产91欧美| 亚洲国产影院| 亚洲一区欧美| 国产精品美女午夜爽爽| 91一区二区三区四区| 亚洲精品1区2区| 日韩精品国产欧美| 精品国产亚洲日本| 欧美成人基地 | 国产一区二区三区不卡视频网站 | 99久久久国产精品美女| 国产模特精品视频久久久久| 亚洲毛片在线免费| 国产一区二区三区91| 免费精品国产的网站免费观看| 日av在线不卡| 国产精品毛片久久久| 国产 日韩 欧美一区| 在线国产一区| 日韩av在线播放中文字幕| 成人台湾亚洲精品一区二区| 亚洲国产不卡| 国产九九精品| 激情欧美亚洲| 日韩国产欧美在线播放| 狠狠躁少妇一区二区三区| 亚洲免费网址| 免费看久久久| 亚洲免费精品| 麻豆久久久久久| 99热免费精品| 精品一区91| 欧美在线综合| 欧美韩一区二区| 欧美日韩水蜜桃| 欧美日本久久| 欧美精选一区二区三区| 国产亚洲一区二区三区不卡| 久久在线免费| 国产亚洲观看| 亚洲激情av| 久久精品国产亚洲一区二区三区| av一区二区高清| 精品一区二区三区中文字幕视频| 亚洲一区久久| 国产激情一区| 免费在线看一区| 日韩欧美一区二区三区在线视频| 亚洲精选久久| 91精品国产自产在线观看永久∴ | 久久精品72免费观看| 亚洲va在线| 国产福利资源一区| 欧美在线综合| 三上悠亚国产精品一区二区三区| 欧美天堂在线| 日韩中文字幕一区二区三区| 天堂√中文最新版在线| 国产精品免费精品自在线观看| 国产视频一区在线观看一区免费| 久久av导航| 一区二区国产在线| 久久婷婷丁香| 久久久久伊人| 91精品国产经典在线观看| 欧美另类专区| 成人va天堂| 成人国产精品一区二区网站| 日韩高清中文字幕一区| 国产精品日韩欧美一区| 日韩精品欧美| 97人人精品| 国产精品久久久久久妇女| 亚洲精品1区| 尤物tv在线精品| 高清一区二区| 国产欧美日韩精品一区二区免费| 在线精品亚洲| 先锋影音国产一区| 欧美日中文字幕| 日韩不卡免费高清视频| 国内一区二区三区| 国产精品va| 国产三级精品三级在线观看国产| 亚洲麻豆一区| 中文字幕av亚洲精品一部二部| 黄色成人精品网站| 99tv成人| 国产成人免费精品| 国产精选久久| 国产精品4hu.www| 中文日韩欧美| 91精品国产自产在线观看永久∴ | 日韩中文字幕视频网| 五月精品视频| 亚洲h色精品| 伊人久久av| 久久久一本精品| 99成人在线视频| 欧美午夜精品一区二区三区电影| 四虎国产精品免费观看| 精品国产乱码久久久| 麻豆传媒一区二区三区| 麻豆高清免费国产一区| 91精品丝袜国产高跟在线| 日本99精品| 国产精品一区二区三区美女| 国产精品视频一区视频二区| 国产精品乱战久久久| 美女高潮久久久| 成人在线观看免费视频| 视频在线不卡免费观看| 日本午夜大片a在线观看| 狠狠躁少妇一区二区三区| jizzjizz中国精品麻豆| 伊人网在线播放| 日韩一区二区三区免费| 亚洲手机视频| 久久亚洲精品伦理| 日韩在线一二三区| 日韩精品一区二区三区中文字幕| 7777精品| 免费视频一区二区三区在线观看| 国产一区二区亚洲| 色爱av综合网| 亚洲一区二区三区四区五区午夜| 男人的天堂久久精品| 91亚洲精品视频在线观看 | 国产精品一区高清| 国产一区二区三区精品在线观看| 丝袜诱惑一区二区| 国产偷自视频区视频一区二区| 亚洲少妇在线| 欧美亚洲一级| 成人免费一区| 黄色成人精品网站| 日本综合精品一区| 精品一区二区三区视频在线播放 | 欧美激情一区| 国产一区二区三区四区大秀| 91精品一区国产高清在线gif| 亚洲免费一区二区| 国产精品一区二区精品| 日韩精品一区二区三区免费观看| 性一交一乱一区二区洋洋av| 国产剧情在线观看一区| 国产精品亚洲综合色区韩国| 亚洲欧洲美洲av| 国产女优一区| 国产精品亚洲欧美一级在线| 欧美三级网址| 免费成人av在线播放| 久久av影视| 婷婷久久一区| 欧美日韩xxxx| 99久久精品网| 97久久亚洲| 日韩中文影院| 中文字幕av一区二区三区人 | 91麻豆精品激情在线观看最新| 免费一级欧美在线观看视频| 婷婷亚洲五月| 国产美女久久| 激情综合在线| 97se亚洲| 亚洲国产专区校园欧美| 亚洲免费毛片| 91精品xxx在线观看| 亚洲精品国产日韩| 鲁鲁在线中文| 日韩av不卡在线观看| 亚洲午夜精品久久久久久app| 日本欧美一区|