using CMS.Plugin.FlowManagement.Abstractions.FlowBusiness; using CMS.Plugin.HIAWms.Apis; using CMS.Plugin.HIAWms.Application.Contracts.Dtos.WmsContainers; using CMS.Plugin.HIAWms.Domain.WmsContainers; using CMS.Plugin.HIAWms.Jobs; using CMS.Project; using CMS.Project.Abstractions; using CMS.Unit.RuntimeValue.Abstractions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Volo.Abp.BackgroundJobs; using Volo.Abp.Uow; namespace CMS.Plugin.HIAWms.ProjectService { /// /// 工程服务,和工程关联的后台服务,当以当前Key调用时会被执行 /// public class HIAWmsProjectService : BaseProjectService { private IServiceProvider _serviceProvider; private readonly ILogger _logger; private readonly IVariableDataCache _variableDataCache; private FlowVariableChannelListener _channelListener; private Dictionary _monitorVariableNames; /// /// 服务的Key,唯一,供使用 /// public override string Key => "HIAWms"; /// /// 服务描述,显示在服务列表UI上的名称 /// public override string Description => "HIAWms服务"; /// /// 启用授权 /// public override bool AuthRequired => false; /// /// Initializes a new instance of the class. /// /// The logger. /// The variable data cache. public HIAWmsProjectService(IServiceProvider serviceProvider, ILogger logger, IVariableDataCache variableDataCache) { _serviceProvider = serviceProvider; _logger = logger; _variableDataCache = variableDataCache; } /// /// 开启服务 /// /// 具有工程上下文的实例 public override async Task StartAsync(IServiceProvider serviceProvider) { if (State == ProjectServiceState.Started) { return; } // 监听变量 _monitorVariableNames = new Dictionary { { "HIAWms_Variable1", "监听变量1" }, { "HIAWms_Variable2", "监听变量2" } }; // 创建通道监听 _channelListener?.Token?.Dispose(); _channelListener = new FlowVariableChannelListener(_logger, _variableDataCache); _channelListener.CreateChannel(Key, waitListener: false, timeout: TimeSpan.FromSeconds(30), variableFilter: _monitorVariableNames.Keys.ToHashSet()); _channelListener.TagChanged += OnTagValueChanged; await base.StartAsync(serviceProvider); } /// /// 停止服务 /// /// 具有工程上下文的实例 public override async Task StopAsync(IServiceProvider serviceProvider) { if (_channelListener != null) { // 释放监听 _channelListener.TagChanged -= OnTagValueChanged; _channelListener.Token.Dispose(); _channelListener = null; } // 使用后台作业异步处理 //await _serviceProvider.GetRequiredService().EnqueueAsync(new HIAWmsArgs //{ // Subject = "HIAWms_Subject", // Body = "HIAWms_Body", //}); await base.StopAsync(serviceProvider); } /// /// Called when [tag value changed]. /// /// The sender. /// The instance containing the event data. private async void OnTagValueChanged(object sender, TagChangedEventArgs e) { var changeds = e.Changeds.Where(x => _monitorVariableNames != null && _monitorVariableNames.ContainsKey(x.Name)); if (!changeds.Any()) { return; } foreach (var changed in changeds) { var oldValue = changed.Old?.Value; var newValue = changed.New?.Value; var traceId = e.TraceId; _logger.LogInformation($"{changed.Name} 变量值发生变化,旧值{oldValue}=新值{newValue},TraceId={traceId}"); // TODO: 处理变量值变化 // Tips:https://cms-docs.shengyc.com/cms/api/%E5%90%8E%E7%AB%AF#3-%E5%8F%98%E9%87%8F%E6%A8%A1%E5%9D%97 /* 说明:通过订阅 IVariableDataCache.TagChanged 事件,您可以实时监控变量的变化。此事件会传递所有变量至事件处理函数,因此,业务层需在函数中筛选关注的变量。 注意事项: (1)性能影响: 发布事件时,事件的发送者将阻塞流程。因此,强烈建议避免在事件处理函数中执行 I/ O 操作、HTTP 接口访问或其他耗时操作,以防止对系统性能产生严重影响,导致整个系统响应延迟。 (2)高频率触发: 由于事件订阅了全量变量,触发频率可能非常高。 (3)异步处理: 鉴于事件触发频率很高,建议业务层在筛选关注变量后,使用 Task 启动新线程处理业务逻辑,以避免阻塞核心的变量监听功能,实现业务层与平台基座的解耦。 (4)并发管理: 如果业务层并发量大,必须优化代码设计和实施,以减少在高并发情况下的系统资源消耗,防止系统性能问题。 (5)代码安全: 安装并使用 CMS.CodeAnalysis 分析器来分析 IVariableDataCache.TagChanged 的使用情况。该工具能在使用不当时提供编译错误,帮助您提高代码质量。*/ _ = Task.Run(async () => { // 例1:同步处理 //await ProcessAsync(); // 例2:调用外部API //await ExecuteExternalApiAsync(); }); } } /// /// Processes the asynchronous. /// private async Task ProcessAsync() { using var scope = _serviceProvider.CreateScope(); var unitOfWorkManager = scope.ServiceProvider.GetRequiredService(); using var uow = unitOfWorkManager.Begin(requiresNew: true); var wmscontainerRepository = scope.ServiceProvider.GetRequiredService(); var count = await wmscontainerRepository.GetCountAsync(); // 如果有更新数据库操作,需提交保存 // await uow.SaveChangesAsync(); _logger.LogInformation($"ProcessAsync,Count={count}"); } } }