生成者/消费者概念编程模型 通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型将数据从一方移交给另一方。 尝试将通道视为任何其他常见的泛型集合类型,例如 List。 主要区别在于,此集合管理同步,并通过工厂创建选项提供各种消耗模型。 这些选项控制通道的行为,例如允许它们存储的元素数,以及达到该限制时会发生什么情况,或者通道是由多个生成者还是多个使用者同时访问
channel简介 channel提供了用于在生成者和使用者之间以异步方式传递数据的一组同步数据结构。
channel(管道)提供了有界通道和无界通道
无界通道 该通道可以同时供任意数量的读取器和编写器使用。 或者,可以通过提供 UnboundedChannelOptions 实例在创建无限制通道时指定非默认行为。 该通道的容量不受限制,并且所有写入均以同步方式执行
有界通道 创建有界通道时,该通道将绑定到最大容量。 达到边界时,默认行为是通道异步阻止生成者,直到空间可用。 可以通过在创建通道时指定选项来配置此行为。 可以使用任何大于零的容量值创建有界通道
模式行为 BoundedChannelFullMode.Wait 这是默认值。 WriteAsync调用 以等待空间可用以完成写入操作。 调用 以 TryWrite 立即返回 false 。
BoundedChannelFullMode.DropNewest 删除并忽略通道中的最新项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropOldest 删除并忽略通道中的最旧项,以便为要写入的项留出空间。 BoundedChannelFullMode.DropWrite 删除要写入的项。
Channel.Writer API 生成者功能在 Channel<TWrite,TRead>.Writer 上公开。 下表详细介绍了生成者 API 和预期行为:
ChannelWriter.Complete 将通道标记为已完成,这意味着不再向该通道写入更多项。
ChannelWriter.TryComplete 尝试将通道标记为已完成,这意味着不会向通道写入更多数据。
ChannelWriter.TryWrite 尝试将指定的项写入到通道。 当与无界通道一起使用时,除非通道的编写器通过 ChannelWriter.Complete 或 ChannelWriter.TryComplete 发出完成信号,否则这将始终返回 true。
ChannelWriter.WaitToWriteAsync 返回一个 ValueTask ,当有空间可以写入项时完成。 ChannelWriter.WriteAsync 以异步方式将项写入到通道
Channel.Reader API ChannelReader.ReadAllAsync 创建允许从通道中读取所有数据的 IAsyncEnumerable。
ChannelReader.ReadAsync 以异步方式从通道中读取项。
ChannelReader.TryPeek 尝试从通道中查看项。
ChannelReader.TryRead 尝试从通道中读取项。
ChannelReader.WaitToReadAsync 返回在 ValueTask 数据可供读取时完成的 。
channel的具体使用 https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels
基于channel实现事件总线 EventDiscriptorAttribute 特性定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 [AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false) ] public class EventDiscriptorAttribute :Attribute { public string EventName { get ; private set ; } public int Capacity { get ; private set ; } public bool SigleReader { get ; private set ; } public EventDiscriptorAttribute (string eventName, int capacity = 1000 , bool sigleReader = true ) { EventName = eventName; Capacity = capacity; SigleReader = sigleReader; } }
定义通道容器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class EventHandlerContainer : IEventHandlerContainer { public ConcurrentBag<EventDiscription> Events { get ; private set ; } private readonly IServiceCollection Services; public EventHandlerContainer (IServiceCollection services ) { Events = new ConcurrentBag<EventDiscription>(); Services = services; services.AddSingleton<IEventHandlerContainer>(this ); } private bool Check (Type type ) { var discription = Events.FirstOrDefault(p=>p.EtoType == type); return discription is null ; } public void Subscribe (Type eto,Type handler ) { if (!Check(eto)) { return ; } Events.Add(new EventDiscription(eto, handler)); var handlerbaseType = typeof (IEventHandler<>); var handlertype = handlerbaseType.MakeGenericType(eto); if (Services.Any(P=>P.ServiceType==handlertype)) { return ; } Services.AddTransient(handlertype, handler); } public void Subscribe <TEto , THandler >() where TEto : class where THandler :IEventHandler<TEto> { Subscribe(typeof (TEto),typeof (THandler)); }
事件管理器 事件管理器通过线程安全字典管理事件通道和事件的触发
可以看到在Subscribe 方法中消费者并不是在订阅后立即执行的而是放到EventTrigger中的定义的异步事件中去
消费者执行最后又.,NET提供的托管任务去执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 public class EventHandlerManager : IEventHandlerManager , IDisposable { private ConcurrentDictionary<string , Channel<string >> Channels; private bool IsDiposed = false ; private readonly IServiceProvider ServiceProvider; private readonly CancellationToken _cancellation; private readonly IEventHandlerContainer _eventHandlerContainer; private readonly ILogger _logger; private bool IsInitConsumer = true ; public EventHandlerManager ( IServiceProvider serviceProvider, IEventHandlerContainer eventHandlerContainer, ILoggerFactory loggerFactory ) { ServiceProvider = serviceProvider; _cancellation = CancellationToken.None; _eventHandlerContainer = eventHandlerContainer; Channels = new ConcurrentDictionary<string , Channel<string >>(); _logger = loggerFactory.CreateLogger<IEventHandlerManager>(); } public async Task CreateChannles () { var eventDiscriptions = _eventHandlerContainer.Events; foreach (var item in eventDiscriptions) { var attribute = item.EtoType .GetCustomAttributes() .OfType<EventDiscriptorAttribute>() .FirstOrDefault(); if (attribute is null ) { ThorwEventAttributeNullException.ThorwException(); } var channel = Channels.GetValueOrDefault(attribute.EventName); if (channel is not null ) { return ; } channel = Channel.CreateBounded<string >( new BoundedChannelOptions(attribute.Capacity) { SingleWriter = true , SingleReader = false , AllowSynchronousContinuations = false , FullMode = BoundedChannelFullMode.Wait } ); Channels.TryAdd(attribute.EventName, channel); _logger.LogInformation($"创建通信管道{item.EtoType} --{attribute.EventName} " ); } await Task.CompletedTask; } private Channel<string > Check (Type type ) { var attribute = type.GetCustomAttributes() .OfType<EventDiscriptorAttribute>() .FirstOrDefault(); if (attribute is null ) { ThorwEventAttributeNullException.ThorwException(); } var channel = Channels.GetValueOrDefault(attribute.EventName); if (channel is null ) { ThrowChannelNullException.ThrowException(attribute.EventName); } return channel; } public void Dispose () { IsDiposed = true ; IsInitConsumer = true ; _cancellation.ThrowIfCancellationRequested(); } public async Task WriteAsync <TEto >(TEto eto ) where TEto : class { var channel = Check(typeof (TEto)); while (await channel.Writer.WaitToWriteAsync()) { var data = JsonConvert.SerializeObject(eto); await channel.Writer.WriteAsync(data, _cancellation); break ; } } public async Task StartConsumer () { if (!IsInitConsumer) { return ; } foreach (var item in _eventHandlerContainer.Events) { _ = Task.Factory.StartNew(async () => { var attribute = item.EtoType .GetCustomAttributes() .OfType<EventDiscriptorAttribute>() .FirstOrDefault(); var scope = ServiceProvider.CreateAsyncScope(); var channel = Check(item.EtoType); var handlerType = typeof (IEventHandler<>).MakeGenericType(item.EtoType); var handler = scope.ServiceProvider.GetRequiredService(handlerType); var reader = channel.Reader; try { while (await channel.Reader.WaitToReadAsync()) { while (reader.TryRead(out string str)) { var data = JsonConvert.DeserializeObject(str, item.EtoType); _logger.LogInformation(str); await (Task) handlerType .GetMethod("HandelrAsync" ) .Invoke(handler, new object [] { data }); } } } catch (Exception e) { _logger.LogInformation($"本地事件总线异常{e.Source} --{e.Message} --{e.Data} " ); throw ; } }); } IsInitConsumer = false ; await Task.CompletedTask; } }
托管任务执行EventHandlerManager StartConsumer()方法 1 2 3 4 5 6 7 8 9 10 11 12 public class EventBusBackgroundService : BackgroundService { private readonly IEventHandlerManager _eventHandlerManager; public EventBusBackgroundService (IEventHandlerManager eventHandlerManager ) { _eventHandlerManager = eventHandlerManager; } protected override async Task ExecuteAsync (CancellationToken stoppingToken ) { await _eventHandlerManager.StartConsumer(); } }
拓展类定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public static class EventBusExtensions { public static IServiceCollection AddEventBusAndSubscribes (this IServiceCollection services,Action<EventHandlerContainer> action ) { services.AddSingleton<IEventHandlerManager, EventHandlerManager>(); services.AddTransient<ILocalEventBus, LocalEventBus>(); services.AddHostedService<EventBusBackgroundService>(); EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services); action.Invoke(eventHandlerContainer); return services; } public static async Task InitChannles (this IServiceProvider serviceProvider,Action<IEventHandlerManager> action ) { var scope = serviceProvider.CreateAsyncScope(); var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>(); await eventhandlerManager.CreateChannles(); action.Invoke(eventhandlerManager); } public static IServiceCollection AddEventBus (this IServiceCollection services ) { services.AddSingleton<IEventHandlerManager, EventHandlerManager>(); services.AddTransient<ILocalEventBus, LocalEventBus>(); services.AddHostedService<EventBusBackgroundService>(); return services; } public static IServiceCollection Subscribes (this IServiceCollection services, Action<EventHandlerContainer> action ) { EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services); action.Invoke(eventHandlerContainer); return services; } }
使用
1 2 3 4 5 6 7 8 9 10 11 12 context.Services.AddEventBus(); //添加通信管道 context.Services.Subscribes(p => { p.Subscribe<TestEto,TestEventHandler>(); }); // var scope = context.ServiceProvider.CreateScope(); var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>(); //初始化通信管道 await eventhandlerManager.CreateChannles();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 //定义EventHandler public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection { private ILogger _logger; public TestEventHandler(ILoggerFactory factory) { _logger = factory.CreateLogger<TestEventHandler>(); } public Task HandelrAsync(TestEto eto) { _logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}"); return Task.CompletedTask; } } //构造函数注入 [HttpGet] public async Task TestLocalEventBus() { TestEto eto = null; for(var i = 0; i < 100; i++) { eto = new TestEto() { Name ="LocalEventBus" + i.ToString(), Description ="wyg"+i.ToString(), }; await _localEventBus.PublichAsync(eto); } }
总结 作为一个才毕业一年的初级程序员的我来说这次的channel的事件总线的封装还存在着许多不足
1.无法对消息进行持久化的管理
2.没有对消息异常进行处理
3.没有做到像abp那样自动订阅
当然还存在着一些我不知道问题,欢迎各位大佬提出问题指正
源码链接