生成者/消费者概念编程模型

通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型将数据从一方移交给另一方。 尝试将通道视为任何其他常见的泛型集合类型,例如 List。 主要区别在于,此集合管理同步,并通过工厂创建选项提供各种消耗模型。 这些选项控制通道的行为,例如允许它们存储的元素数,以及达到该限制时会发生什么情况,或者通道是由多个生成者还是多个使用者同时访问

channel简介

channel提供了用于在生成者和使用者之间以异步方式传递数据的一组同步数据结构。

channel(管道)提供了有界通道和无界通道

无界通道

该通道可以同时供任意数量的读取器和编写器使用。 或者,可以通过提供 UnboundedChannelOptions 实例在创建无限制通道时指定非默认行为。 该通道的容量不受限制,并且所有写入均以同步方式执行

有界通道

创建有界通道时,该通道将绑定到最大容量。 达到边界时,默认行为是通道异步阻止生成者,直到空间可用。 可以通过在创建通道时指定选项来配置此行为。 可以使用任何大于零的容量值创建有界通道

模式行为

BoundedChannelFullMode.Wait

这是默认值。 WriteAsync调用 以等待空间可用以完成写入操作。 调用 以 TryWrite 立即返回 false 。

BoundedChannelFullMode.DropNewest

删除并忽略通道中的最新项,以便为要写入的项留出空间。

BoundedChannelFullMode.DropOldest

删除并忽略通道中的最旧项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropWrite 删除要写入的项。

Channel.Writer API

生成者功能在 Channel<TWrite,TRead>.Writer 上公开。 下表详细介绍了生成者 API 和预期行为:

ChannelWriter.Complete

将通道标记为已完成,这意味着不再向该通道写入更多项。

ChannelWriter.TryComplete

尝试将通道标记为已完成,这意味着不会向通道写入更多数据。

ChannelWriter.TryWrite

尝试将指定的项写入到通道。 当与无界通道一起使用时,除非通道的编写器通过 ChannelWriter.Complete 或 ChannelWriter.TryComplete 发出完成信号,否则这将始终返回 true。

ChannelWriter.WaitToWriteAsync

返回一个 ValueTask ,当有空间可以写入项时完成。
ChannelWriter.WriteAsync 以异步方式将项写入到通道

Channel.Reader API

ChannelReader.ReadAllAsync

创建允许从通道中读取所有数据的 IAsyncEnumerable

ChannelReader.ReadAsync

以异步方式从通道中读取项。

ChannelReader.TryPeek

尝试从通道中查看项。

ChannelReader.TryRead

尝试从通道中读取项。

ChannelReader.WaitToReadAsync

返回在 ValueTask 数据可供读取时完成的 。

channel的具体使用

https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels

基于channel实现事件总线

EventDiscriptorAttribute 特性定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false)]
public class EventDiscriptorAttribute:Attribute
{
/// <summary>
/// 事件2名称
/// </summary>
public string EventName { get; private set; }
/// <summary>
/// channel 容量设置
/// </summary>
public int Capacity { get; private set; }
/// <summary>
/// 是否维持一个生产者多个消费者模型
/// </summary>
public bool SigleReader { get; private set; }

public EventDiscriptorAttribute(string eventName, int capacity = 1000, bool sigleReader = true)
{
EventName = eventName;
Capacity = capacity;
SigleReader = sigleReader;
}
}

定义通道容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//通道容器单列注入,在拓展类中初始化
public class EventHandlerContainer : IEventHandlerContainer
{
public ConcurrentBag<EventDiscription> Events { get; private set; }

private readonly IServiceCollection Services;

public EventHandlerContainer(IServiceCollection services)
{
Events = new ConcurrentBag<EventDiscription>();
Services = services;
services.AddSingleton<IEventHandlerContainer>(this);
}

private bool Check(Type type)
{
var discription = Events.FirstOrDefault(p=>p.EtoType == type);

return discription is null;
}

///订阅并且注入EventHandler
public void Subscribe(Type eto,Type handler)
{
if(!Check(eto))
{
return;
}

Events.Add(new EventDiscription(eto, handler));

var handlerbaseType = typeof(IEventHandler<>);

var handlertype = handlerbaseType.MakeGenericType(eto);

if(Services.Any(P=>P.ServiceType==handlertype))
{
return;
}

Services.AddTransient(handlertype, handler);
}

public void Subscribe<TEto, THandler>()
where TEto : class
where THandler :IEventHandler<TEto>
{
Subscribe(typeof(TEto),typeof(THandler));
}

事件管理器

事件管理器通过线程安全字典管理事件通道和事件的触发

可以看到在Subscribe 方法中消费者并不是在订阅后立即执行的而是放到EventTrigger中的定义的异步事件中去

消费者执行最后又.,NET提供的托管任务去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
public class EventHandlerManager : IEventHandlerManager, IDisposable
{
private ConcurrentDictionary<string, Channel<string>> Channels;

private bool IsDiposed = false;

private readonly IServiceProvider ServiceProvider;

private readonly CancellationToken _cancellation;

private readonly IEventHandlerContainer _eventHandlerContainer;

private readonly ILogger _logger;

private bool IsInitConsumer = true;

public EventHandlerManager(
IServiceProvider serviceProvider,
IEventHandlerContainer eventHandlerContainer,
ILoggerFactory loggerFactory
)
{
ServiceProvider = serviceProvider;
_cancellation = CancellationToken.None;
_eventHandlerContainer = eventHandlerContainer;
Channels = new ConcurrentDictionary<string, Channel<string>>();
_logger = loggerFactory.CreateLogger<IEventHandlerManager>();
}

public async Task CreateChannles()
{
var eventDiscriptions = _eventHandlerContainer.Events;

foreach (var item in eventDiscriptions)
{
var attribute = item.EtoType
.GetCustomAttributes()
.OfType<EventDiscriptorAttribute>()
.FirstOrDefault();

if (attribute is null)
{
ThorwEventAttributeNullException.ThorwException();
}

var channel = Channels.GetValueOrDefault(attribute.EventName);

if (channel is not null)
{
return;
}

channel = Channel.CreateBounded<string>(
new BoundedChannelOptions(attribute.Capacity)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.Wait
}
);

Channels.TryAdd(attribute.EventName, channel);

_logger.LogInformation($"创建通信管道{item.EtoType}--{attribute.EventName}");
}
await Task.CompletedTask;
}

private Channel<string> Check(Type type)
{
var attribute = type.GetCustomAttributes()
.OfType<EventDiscriptorAttribute>()
.FirstOrDefault();

if (attribute is null)
{
ThorwEventAttributeNullException.ThorwException();
}

var channel = Channels.GetValueOrDefault(attribute.EventName);

if (channel is null)
{
ThrowChannelNullException.ThrowException(attribute.EventName);
}

return channel;
}

public void Dispose()
{
IsDiposed = true;
IsInitConsumer = true;
_cancellation.ThrowIfCancellationRequested();
}

/// <summary>
/// 生产者
/// </summary>
/// <typeparam name="TEto"></typeparam>
/// <param name="eto"></param>
/// <returns></returns>
public async Task WriteAsync<TEto>(TEto eto)
where TEto : class
{
var channel = Check(typeof(TEto));
//因为使用的是有限容量通道存在消息堆积情况,在消息堆积的情况下处理等待消息是否可写入
while (await channel.Writer.WaitToWriteAsync())
{
var data = JsonConvert.SerializeObject(eto);

await channel.Writer.WriteAsync(data, _cancellation);

break;
}
}

/// <summary>
/// 消费者
/// </summary>
/// <returns></returns>
public async Task StartConsumer()
{
if (!IsInitConsumer)
{
return;
}

foreach (var item in _eventHandlerContainer.Events)
{
_ = Task.Factory.StartNew(async () =>
{
var attribute = item.EtoType
.GetCustomAttributes()
.OfType<EventDiscriptorAttribute>()
.FirstOrDefault();

var scope = ServiceProvider.CreateAsyncScope();

var channel = Check(item.EtoType);

var handlerType = typeof(IEventHandler<>).MakeGenericType(item.EtoType);

var handler = scope.ServiceProvider.GetRequiredService(handlerType);

var reader = channel.Reader;

try
{
while (await channel.Reader.WaitToReadAsync())
{
while (reader.TryRead(out string str))
{
var data = JsonConvert.DeserializeObject(str, item.EtoType);

_logger.LogInformation(str);

await (Task)
handlerType
.GetMethod("HandelrAsync")
.Invoke(handler, new object[] { data });
}
}
}
catch (Exception e)
{
_logger.LogInformation($"本地事件总线异常{e.Source}--{e.Message}--{e.Data}");
throw;
}
});
}
IsInitConsumer = false;
await Task.CompletedTask;
}
}

托管任务执行EventHandlerManager StartConsumer()方法

1
2
3
4
5
6
7
8
9
10
11
12
public class EventBusBackgroundService : BackgroundService
{
private readonly IEventHandlerManager _eventHandlerManager;
public EventBusBackgroundService(IEventHandlerManager eventHandlerManager)
{
_eventHandlerManager = eventHandlerManager;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _eventHandlerManager.StartConsumer();
}
}

拓展类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public  static class EventBusExtensions
{
//添加事件总线并且添加channle管道
public static IServiceCollection AddEventBusAndSubscribes(this IServiceCollection services,Action<EventHandlerContainer> action)
{
services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

services.AddTransient<ILocalEventBus, LocalEventBus>();

services.AddHostedService<EventBusBackgroundService>();

EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

action.Invoke(eventHandlerContainer);

return services;
}

//创建通信管道
public static async Task InitChannles(this IServiceProvider serviceProvider,Action<IEventHandlerManager> action)
{
var scope = serviceProvider.CreateAsyncScope();

var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();

await eventhandlerManager.CreateChannles();

action.Invoke(eventhandlerManager);
}

//添加本地事件总线
public static IServiceCollection AddEventBus(this IServiceCollection services)
{
services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

services.AddTransient<ILocalEventBus, LocalEventBus>();

services.AddHostedService<EventBusBackgroundService>();

return services;
}

//添加通信管道
public static IServiceCollection Subscribes(this IServiceCollection services, Action<EventHandlerContainer> action)
{
EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

action.Invoke(eventHandlerContainer);

return services;
}
}

使用


1
2
3
4
5
6
7
8
9
10
11
12
context.Services.AddEventBus();
//添加通信管道
context.Services.Subscribes(p =>
{
p.Subscribe<TestEto,TestEventHandler>();
});
//
var scope = context.ServiceProvider.CreateScope();

var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
//初始化通信管道
await eventhandlerManager.CreateChannles();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  //定义EventHandler
public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection
{
private ILogger _logger;
public TestEventHandler(ILoggerFactory factory)
{
_logger = factory.CreateLogger<TestEventHandler>();
}
public Task HandelrAsync(TestEto eto)
{
_logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}");
return Task.CompletedTask;
}
}

//构造函数注入
[HttpGet]
public async Task TestLocalEventBus()
{
TestEto eto = null;

for(var i = 0; i < 100; i++)
{
eto = new TestEto()
{
Name ="LocalEventBus" + i.ToString(),
Description ="wyg"+i.ToString(),
};
await _localEventBus.PublichAsync(eto);
}
}

总结

作为一个才毕业一年的初级程序员的我来说这次的channel的事件总线的封装还存在着许多不足

1.无法对消息进行持久化的管理

2.没有对消息异常进行处理

3.没有做到像abp那样自动订阅

当然还存在着一些我不知道问题,欢迎各位大佬提出问题指正

源码链接