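YARP + Nacos: Dynamic Service Discovery and Load Balancing

Preface

Service discovery and load balancing are core components of a microservice architecture. This article shows how to combine YARP (Yet Another Reverse Proxy) with Nacos to implement both dynamically.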
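After some research, I found that the official Nacos library provides a YARP + Nacos extension. At its core, a BackgroundService polls the Nacos service list and uses the result to update a custom IProxyConfigProvider. That approach is more complex and cumbersome than it needs to be.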
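In fact, the Nacos SDK already provides a service subscription mechanism: it polls the Nacos service list on a Timer and notifies subscribers when a service changes. See the core types ServiceInfoHolder, InstancesChangeNotifier, and IEventListener for details. This article builds on that simpler mechanism.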
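To make the "poll, compare, notify" idea concrete, here is a minimal self-contained sketch. It only imitates the mechanism described above; ServiceSnapshotHolder, IInstancesListener, and RecordingListener are hypothetical names and are not the Nacos SDK types, and the real SDK's change detection may differ. The sketch is also not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a listener interface (NOT the Nacos SDK's IEventListener).
public interface IInstancesListener
{
    void OnInstancesChanged(string serviceName, IReadOnlyList<string> hosts);
}

// Hypothetical holder that keeps the last known host list per service.
public sealed class ServiceSnapshotHolder
{
    private readonly Dictionary<string, string[]> _snapshots = new();
    private readonly Dictionary<string, List<IInstancesListener>> _listeners = new();

    public void Subscribe(string serviceName, IInstancesListener listener)
    {
        if (!_listeners.TryGetValue(serviceName, out var list))
        {
            list = new List<IInstancesListener>();
            _listeners[serviceName] = list;
        }
        list.Add(listener);
    }

    // Feed the result of each poll here (for example from a timer callback).
    // Listeners are notified only when the set of hosts actually changed.
    public bool ProcessPollResult(string serviceName, IEnumerable<string> hosts)
    {
        var latest = hosts.OrderBy(h => h, StringComparer.Ordinal).ToArray();
        if (_snapshots.TryGetValue(serviceName, out var previous) && previous.SequenceEqual(latest))
        {
            return false; // same hosts as last time, nothing to report
        }

        _snapshots[serviceName] = latest;
        if (_listeners.TryGetValue(serviceName, out var list))
        {
            foreach (var listener in list)
            {
                listener.OnInstancesChanged(serviceName, latest);
            }
        }
        return true;
    }
}

// Test helper that records every notification it receives.
public sealed class RecordingListener : IInstancesListener
{
    public List<string> Events { get; } = new();

    public void OnInstancesChanged(string serviceName, IReadOnlyList<string> hosts)
        => Events.Add($"{serviceName}:{string.Join(",", hosts)}");
}
```

With this shape, the gateway only reacts when something changed, which is the same role the listener plays in the rest of this article.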
Environment setup

Setting up the Nacos server

This article uses r-nacos, a Nacos server implementation written in Rust that is lighter-weight and higher-performance.
docker-compose.yml configuration:

```yaml
version: '3.8'

services:
  nacos:
    image: qingpan/rnacos:stable
    container_name: nacos
    ports:
      - "8848:8848"
      - "9848:9848"
      - "10848:10848"
    volumes:
      - ./data:/io:rw
    environment:
      - RNACOS_INIT_ADMIN_USERNAME=admin
      - RNACOS_INIT_ADMIN_PASSWORD=admin
      - RNACOS_HTTP_PORT=8848
    restart: always
```
Architecture

Class diagram

```mermaid
classDiagram
    class INacosNamingService {
        +Subscribe(serviceName, groupName, listener)
        +GetServiceInfo(serviceName, groupName)
    }

    class IEventListener {
        <<interface>>
        +OnEvent(IEvent event)
    }

    class DefaultServiceChangeListener {
        -ILogger _logger
        -IProxyConfigStorage _proxyConfigStorage
        -IServiceProvider _serviceProvider
        +OnEvent(IEvent event)
    }

    class InstancesChangeEvent {
        +string ServiceName
        +string GroupName
        +List~Instance~ Hosts
    }

    class IProxyConfigStorage {
        <<interface>>
        +string GroupName
        +string ServiceName
        +RouteConfig RouteConfig
        +ClusterConfig ClusterConfig
        +SetRouteConfig(RouteConfig)
        +SetClusterConfig(ClusterConfig)
    }

    class ProxyConfigStorage {
        +string ServiceName
        +string GroupName
        +RouteConfig RouteConfig
        +ClusterConfig ClusterConfig
        +SetRouteConfig(RouteConfig)
        +SetClusterConfig(ClusterConfig)
    }

    class INacosProxyConfigStorage {
        <<interface>>
        +LoadProxyConfig() (Routes, Clusters)
        +LoadProxyConfigAsync() Task~(Routes, Clusters)~
    }

    class NacosProxyConfigStorage {
        -ILogger _logger
        -INacosNamingService _nacosNamingService
        -IServiceProvider _serviceProvider
        -ConcurrentDictionary~string, IProxyConfigStorage~ _proxyConfigStorages
        +LoadProxyConfig() (Routes, Clusters)
        +LoadProxyConfigAsync() Task~(Routes, Clusters)~
    }

    class IProxyConfigChange {
        <<interface>>
        +Refresh()
        +RefreshAsync() Task
    }

    class ProxyConfigChange {
        -INacosProxyConfigStorage _nacosProxyConfigStorage
        -InMemoryConfigProvider _memoryConfigProvider
        +Refresh()
        +RefreshAsync() Task
    }

    class InMemoryConfigProvider {
        +Update(List~RouteConfig~ routes, List~ClusterConfig~ clusters)
    }

    class IProxyConfigProvider {
        <<interface>>
        +GetConfig() ProxyConfig
    }

    class YARP {
        +AddReverseProxy()
        +AddTransforms()
    }

    %% Relationships
    INacosNamingService --> DefaultServiceChangeListener : notifies of service changes
    IEventListener <|.. DefaultServiceChangeListener : implements
    DefaultServiceChangeListener --> InstancesChangeEvent : handles
    DefaultServiceChangeListener --> IProxyConfigStorage : updates config
    IProxyConfigStorage <|.. ProxyConfigStorage : implements
    NacosProxyConfigStorage --> INacosNamingService : uses
    INacosProxyConfigStorage <|.. NacosProxyConfigStorage : implements
    NacosProxyConfigStorage --> IProxyConfigStorage : manages
    DefaultServiceChangeListener --> IProxyConfigChange : triggers refresh
    IProxyConfigChange <|.. ProxyConfigChange : implements
    ProxyConfigChange --> INacosProxyConfigStorage : loads config
    ProxyConfigChange --> InMemoryConfigProvider : updates config
    InMemoryConfigProvider ..|> IProxyConfigProvider : implements
    YARP --> IProxyConfigProvider : uses config
```
Workflow

```mermaid
sequenceDiagram
    participant Nacos as Nacos registry
    participant Listener as DefaultServiceChangeListener
    participant Storage as ProxyConfigStorage
    participant Change as ProxyConfigChange
    participant Memory as InMemoryConfigProvider
    participant YARP as YARP proxy

    Note over Nacos, YARP: Startup phase
    NacosProxyConfigStorage->>Nacos: Subscribe to service changes
    NacosProxyConfigStorage->>Listener: Create listener
    NacosProxyConfigStorage->>Storage: Initialize config storage

    Note over Nacos, YARP: Service change phase
    Nacos->>Listener: Send InstancesChangeEvent
    Listener->>Listener: Parse service instance info
    Listener->>Storage: Update cluster config
    Listener->>Change: Trigger config refresh
    Change->>Memory: Update in-memory config
    Memory->>YARP: Signal config change
    YARP->>YARP: Reload proxy config
```
Core implementation

1. Custom service change listener

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Yarp.ReverseProxy.Configuration;
// Also import the Nacos SDK namespaces that define IEventListener, IEvent,
// and InstancesChangeEvent (they depend on the SDK version in use).

public class DefaultServiceChangeListener : IEventListener
{
    private readonly ILogger _logger;
    private readonly IProxyConfigStorage _proxyConfigStorage;
    private readonly IServiceProvider _serviceProvider;

    // Note: a listener created with this constructor has no storage or
    // service provider, so it cannot process change events.
    public DefaultServiceChangeListener(ILogger logger)
    {
        _logger = logger;
    }

    public DefaultServiceChangeListener(
        ILogger logger,
        IProxyConfigStorage proxyConfigStorage,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _proxyConfigStorage = proxyConfigStorage;
        _serviceProvider = serviceProvider;
    }

    public async Task OnEvent(IEvent @event)
    {
        if (@event is InstancesChangeEvent e)
        {
            var service = e.ServiceName;
            var groupName = e.GroupName;
            var hosts = e.Hosts;
            var index = 1;
            var destinations = new Dictionary<string, DestinationConfig>();

            foreach (var host in hosts)
            {
                _logger.LogInformation(
                    $"Service: {service}, Group: {groupName}, " +
                    $"Host: {host.Ip}:{host.Port}, Weight: {host.Weight}, " +
                    $"Enabled: {host.Enabled}, Healthy: {host.Healthy}, " +
                    $"Metadata: {string.Join(",", host.Metadata.Select(m => $"{m.Key}:{m.Value}"))}");

                // Skip instances that Nacos reports as unhealthy.
                if (!host.Healthy)
                {
                    continue;
                }

                // Only instances that carry a ClusterId in their metadata are proxied.
                var metadata = host.Metadata;
                if (!metadata.TryGetValue("ClusterId", out var clusterId) ||
                    string.IsNullOrEmpty(clusterId))
                {
                    _logger.LogWarning(
                        $"Service: {service}, Group: {groupName}, " +
                        $"Host: {host.Ip}:{host.Port} - Metadata ClusterId is null or empty");
                    continue;
                }

                var key = $"{clusterId}-{index}";
                var destinationConfig = new DestinationConfig
                {
                    Address = $"http://{host.Ip}:{host.Port}"
                };
                destinations.Add(key, destinationConfig);
                index++;
            }

            // The cluster ID is hard-coded and must match the ClusterId of the
            // route defined in NacosProxyConfigStorage.
            var cluster = new ClusterConfig
            {
                ClusterId = "material-zhaojin",
                Destinations = destinations
            };

            _proxyConfigStorage.SetClusterConfig(cluster);
            _logger.LogInformation($"Service: {service}, Group: {groupName} - Configuration updated");

            // Resolve IProxyConfigChange from a fresh scope and push the new config to YARP.
            using var scope = _serviceProvider.CreateScope();
            var proxyConfigChange = scope.ServiceProvider.GetRequiredService<IProxyConfigChange>();
            proxyConfigChange.Refresh();
            _logger.LogInformation($"Service: {service}, Group: {groupName} - Refresh completed");

            await Task.CompletedTask;
        }
    }
}
```
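The filtering rules in the listener (healthy only, ClusterId metadata required) can be pulled out into a pure function that is easy to unit test. The sketch below is a possible variation, not the listener's actual code: InstanceInfo and DestinationBuilder are hypothetical stand-ins rather than Nacos SDK or YARP types. It also groups destinations by the ClusterId found in metadata instead of one hard-coded cluster, skips disabled instances, and uses address-based keys. All three are assumptions on my part.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the instance data the listener reads (NOT the Nacos SDK type).
public sealed record InstanceInfo(
    string Ip,
    int Port,
    bool Healthy,
    bool Enabled,
    IReadOnlyDictionary<string, string> Metadata);

public static class DestinationBuilder
{
    // Returns clusterId -> (destination key -> address), skipping unusable instances.
    public static Dictionary<string, Dictionary<string, string>> Build(IEnumerable<InstanceInfo> hosts)
    {
        var result = new Dictionary<string, Dictionary<string, string>>();

        foreach (var host in hosts)
        {
            // Assumption: disabled instances should not receive traffic either.
            if (!host.Healthy || !host.Enabled)
            {
                continue;
            }

            if (host.Metadata is null
                || !host.Metadata.TryGetValue("ClusterId", out var clusterId)
                || string.IsNullOrEmpty(clusterId))
            {
                continue;
            }

            if (!result.TryGetValue(clusterId, out var destinations))
            {
                destinations = new Dictionary<string, string>();
                result[clusterId] = destinations;
            }

            // Address-based keys stay stable when other instances come and go,
            // unlike keys based on a running index.
            destinations[$"{clusterId}-{host.Ip}:{host.Port}"] = $"http://{host.Ip}:{host.Port}";
        }

        return result;
    }
}
```

Each entry of the result maps naturally onto one ClusterConfig, with every address becoming a DestinationConfig.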
2. Proxy configuration storage interface

```csharp
using Yarp.ReverseProxy.Configuration;

public interface IProxyConfigStorage
{
    string GroupName { get; }
    string ServiceName { get; }
    RouteConfig RouteConfig { get; }
    ClusterConfig ClusterConfig { get; }

    void SetRouteConfig(RouteConfig routeConfig);
    void SetClusterConfig(ClusterConfig clusterConfig);
}

// Holds the current route and cluster configuration for one Nacos service.
public class ProxyConfigStorage : IProxyConfigStorage
{
    public string ServiceName { get; internal set; }
    public string GroupName { get; internal set; }
    public RouteConfig RouteConfig { get; internal set; }
    public ClusterConfig ClusterConfig { get; internal set; }

    public ProxyConfigStorage(
        string serviceName,
        string groupName,
        RouteConfig routeConfig,
        ClusterConfig clusterConfig)
    {
        ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName));
        GroupName = groupName ?? throw new ArgumentNullException(nameof(groupName));
        RouteConfig = routeConfig ?? throw new ArgumentNullException(nameof(routeConfig));
        ClusterConfig = clusterConfig ?? throw new ArgumentNullException(nameof(clusterConfig));
    }

    public void SetRouteConfig(RouteConfig routeConfig) => RouteConfig = routeConfig;

    public void SetClusterConfig(ClusterConfig clusterConfig) => ClusterConfig = clusterConfig;
}
```
3. Nacos proxy configuration storage

```csharp
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Yarp.ReverseProxy.Configuration;
// Also import the Nacos SDK namespace that defines INacosNamingService.

public interface INacosProxyConfigStorage
{
    (List<RouteConfig> Routes, List<ClusterConfig> Clusters) LoadProxyConfig();
    Task<(List<RouteConfig> Routes, List<ClusterConfig> Clusters)> LoadProxyConfigAsync();
}

public class NacosProxyConfigStorage : INacosProxyConfigStorage
{
    private readonly ILogger<INacosProxyConfigStorage> _logger;
    private readonly INacosNamingService _nacosNamingService;
    private readonly IServiceProvider _serviceProvider;
    private readonly ConcurrentDictionary<string, IProxyConfigStorage> _proxyConfigStorages = new();

    public NacosProxyConfigStorage(
        ILogger<INacosProxyConfigStorage> logger,
        INacosNamingService nacosNamingService,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _nacosNamingService = nacosNamingService;
        _serviceProvider = serviceProvider;
    }

    public (List<RouteConfig> Routes, List<ClusterConfig> Clusters) LoadProxyConfig()
    {
        var routes = new List<RouteConfig>();
        var clusters = new List<ClusterConfig>();

        // After the first call, return whatever the listeners have stored so far.
        if (_proxyConfigStorages.Any())
        {
            routes = _proxyConfigStorages.Values.Select(p => p.RouteConfig).ToList();
            clusters = _proxyConfigStorages.Values.Select(p => p.ClusterConfig).ToList();
            return (routes, clusters);
        }

        _logger.LogInformation("Initializing proxy configuration");

        // First call: build the static route and subscribe to the backing service.
        var route = new RouteConfig
        {
            RouteId = "material-route",
            ClusterId = "material-zhaojin",
            Match = new RouteMatch
            {
                Path = "/api/material/{**catch-all}"
            },
            Metadata = new Dictionary<string, string>
            {
                { "ExampleMetadata", "Value" }
            },
            Timeout = TimeSpan.FromSeconds(30),
            MaxRequestBodySize = 1024 * 1024 * 1000,
        };
        routes.Add(route);

        var serviceName = "material";
        var groupName = "zhaojin";
        var proxyConfigStorage = new ProxyConfigStorage(serviceName, groupName, route, new ClusterConfig());

        // Subscribe is not awaited here; the cluster is filled in later,
        // when the listener receives its first InstancesChangeEvent.
        _nacosNamingService.Subscribe(
            proxyConfigStorage.ServiceName,
            proxyConfigStorage.GroupName,
            new DefaultServiceChangeListener(_logger, proxyConfigStorage, _serviceProvider));

        _proxyConfigStorages.AddOrUpdate(
            proxyConfigStorage.ServiceName,
            proxyConfigStorage,
            (key, value) => proxyConfigStorage);

        // The initial result contains the route but no clusters yet.
        return (routes, clusters);
    }

    public Task<(List<RouteConfig> Routes, List<ClusterConfig> Clusters)> LoadProxyConfigAsync()
    {
        return Task.FromResult(LoadProxyConfig());
    }
}
```
4. Proxy configuration change handling

```csharp
using Yarp.ReverseProxy.Configuration;

public interface IProxyConfigChange
{
    void Refresh();
    Task RefreshAsync();
}

public class ProxyConfigChange : IProxyConfigChange
{
    private readonly INacosProxyConfigStorage _nacosProxyConfigStorage;
    private readonly InMemoryConfigProvider _memoryConfigProvider;

    public ProxyConfigChange(
        INacosProxyConfigStorage nacosProxyConfigStorage,
        InMemoryConfigProvider memoryConfigProvider)
    {
        _nacosProxyConfigStorage = nacosProxyConfigStorage;
        _memoryConfigProvider = memoryConfigProvider;
    }

    // Reloads routes and clusters from storage and pushes them to YARP.
    public void Refresh()
    {
        var proxyConfig = _nacosProxyConfigStorage.LoadProxyConfig();
        _memoryConfigProvider.Update(proxyConfig.Routes, proxyConfig.Clusters);
    }

    public Task RefreshAsync()
    {
        Refresh();
        return Task.CompletedTask;
    }
}
```
5. Service registration

```csharp
using Yarp.ReverseProxy.Configuration;
using Yarp.ReverseProxy.Transforms;

builder.Services.AddReverseProxy().AddTransforms(ctx =>
{
    ctx.AddRequestTransform(async transformContext =>
    {
        // Rewrite the outgoing path to /api/{catch-all}, dropping the service segment.
        if (transformContext.HttpContext.Request.RouteValues.TryGetValue("catch-all", out var catchAllValue) ||
            transformContext.HttpContext.Request.RouteValues.TryGetValue("remainder", out catchAllValue))
        {
            var catchAllPath = catchAllValue?.ToString();
            var newPath = $"/api/{catchAllPath}";
            transformContext.Path = new PathString(newPath);
        }

        await Task.CompletedTask;
    });
});

builder.Services.AddTransient<IProxyConfigChange, ProxyConfigChange>();
builder.Services.AddSingleton<INacosProxyConfigStorage, NacosProxyConfigStorage>();

// Build the in-memory provider from the initial configuration;
// this first LoadProxyConfig call also subscribes to Nacos.
builder.Services.AddSingleton(sp =>
{
    var proxyConfigStorage = sp.GetRequiredService<INacosProxyConfigStorage>();
    var proxyConfig = proxyConfigStorage.LoadProxyConfig();
    return new InMemoryConfigProvider(proxyConfig.Routes, proxyConfig.Clusters);
});

// Expose the same instance to YARP as its IProxyConfigProvider.
builder.Services.AddSingleton<IProxyConfigProvider>(sp =>
    sp.GetRequiredService<InMemoryConfigProvider>());
```
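The request transform above maps a path matched by /api/material/{**catch-all} to /api/{catch-all}. As a rough illustration of the resulting mapping, the sketch below reproduces it with plain string handling so it can be tested without a running proxy. PathRewrite is a hypothetical helper; the real transform works on route values rather than on the raw path, and the handling of edge cases here is my assumption.

```csharp
using System;

public static class PathRewrite
{
    // Route prefix assumed from the route pattern "/api/material/{**catch-all}".
    private const string Prefix = "/api/material";

    public static string Rewrite(string requestPath)
    {
        if (!requestPath.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase))
        {
            return requestPath; // not handled by this route
        }

        var rest = requestPath.Substring(Prefix.Length);
        if (rest.Length > 0 && rest[0] != '/')
        {
            return requestPath; // e.g. /api/materials belongs to a different route
        }

        // Drop the service segment and keep everything after it.
        return "/api/" + rest.TrimStart('/');
    }
}
```

For example, a request for /api/material/orders/42 would reach the backend as /api/orders/42.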
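Running result

Once configured, the system automatically listens for service changes in Nacos and dynamically updates YARP's proxy configuration.

[Screenshot: running result]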
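Summary and extensions

This article presents a simplified implementation of dynamic service discovery and load balancing based on YARP + Nacos. For a real production environment, consider the following extensions: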
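Suggested improvements

Configuration persistence: store YARP's route configuration in a database to make configuration management more flexible
Health checks: add more thorough health check policies for cluster nodes
Load-balancing strategies: support a richer set of load-balancing algorithms (round robin, weighted, least connections, and so on)
Rate limiting and circuit breaking: integrate rate limiting and circuit breaking to improve system stability
Monitoring and alerting: add detailed metrics and alerting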
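For the load-balancing item, YARP's ClusterConfig has a LoadBalancingPolicy property that accepts built-in policy names such as "RoundRobin", "LeastRequests", "Random", and "PowerOfTwoChoices". Weight-based selection would need a custom policy. The sketch below shows one possible selection algorithm for that case, smooth weighted round robin, as a standalone class. SmoothWeightedRoundRobin is a hypothetical name, the integer weights are an assumption (Nacos instance weights would have to be scaled first), and the YARP wiring is not shown.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class SmoothWeightedRoundRobin
{
    private sealed class Node
    {
        public string Address = "";
        public int Weight;   // configured weight
        public int Current;  // running score, changes on every pick
    }

    private readonly List<Node> _nodes;
    private readonly int _totalWeight;
    private readonly object _lock = new();

    public SmoothWeightedRoundRobin(IEnumerable<(string Address, int Weight)> destinations)
    {
        _nodes = destinations
            .Where(d => d.Weight > 0)
            .Select(d => new Node { Address = d.Address, Weight = d.Weight })
            .ToList();

        if (_nodes.Count == 0)
        {
            throw new ArgumentException(
                "At least one destination with a positive weight is required.",
                nameof(destinations));
        }

        _totalWeight = _nodes.Sum(n => n.Weight);
    }

    public string Next()
    {
        lock (_lock)
        {
            // Raise every score by its weight, pick the highest score,
            // then lower the winner by the total weight.
            var best = _nodes[0];
            foreach (var node in _nodes)
            {
                node.Current += node.Weight;
                if (node.Current > best.Current)
                {
                    best = node;
                }
            }

            best.Current -= _totalWeight;
            return best.Address;
        }
    }
}
```

With weights 5, 1, and 1, seven consecutive picks give the heavy node five turns and each light node one, interleaved rather than bunched together.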
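Although this implementation is fairly simple, it provides a good foundation for building a more sophisticated microservice gateway. You can extend and optimize it to fit your own requirements.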