目录
定时任务需求 核心逻辑 使用 Redis 实现秒级定时任务 业务服务实现动态代码 最后
定时任务需求
本文示例项目仓库:https://github.com/whuanle/HangfireDemo
主要有两个核心需求:
• 需要实现秒级定时任务; • 开发者使用定时任务要简单,不要弄复杂了;
在微服务架构中中,定时任务是最常用的基础设施组件之一,社区中有很多定时任务类库或平台,例如 Quartz.NET、xxx-job,使用方法差异很大,比如 xxx-job 的核心是 http 请求,配置定时任务实现 http 请求具体的接口,不过用起来还是比较复杂的。
在微服务中,使用的组件太多了,如果每个组件的集成都搞得很麻烦,那么服务的代码很可能会大量膨胀,并且容易出现各种 bug。以 xxx-job 为例,如果项目中有 N 个定时任务,设计 N 个 http 接口被 xxx-job 回调触发,除了 http 接口数量庞大,在各个环节中还容易出现 bug。
在近期项目需求中,刚好要用到定时任务,结合 C# 语言的特性,笔者的方法是利用 Hangfire 框架和语言特性,封装一些方法,使得开发者可以无感使用定时任务,大大简化链路和使用难度。
使用示例,结合 MediatR 框架定义 CQRS ,该 Command 将会被定时任务触发执行:
public classMyTestRequest:HangfireRequest,IRequestExecteTasResult>{}/// /// 要被定时任务执行的代码./// publicclassMyTestHandler:IRequestHandlerMyTestRequest,ExecteTasResult>{publicasyncTaskExecteTasResult>Handle(MyTestRequest request,CancellationToken cancellationToken){// 逻辑returnnewExecteTasResult{CancelTask=false};}}
要启动一个定时任务,只需要:
private readonlySendHangfireService _hangfireService;publicSendTaskController(SendHangfireService hangfireService){ _hangfireService = hangfireService;}[HttpGet("aaa")]publicasyncTaskstring>SendAsync(){await _hangfireService.Send(newMyTestRequest{CreateTime=DateTimeOffset.Now,CronExpression="* * * * * *",TaskId=Guid.NewGuid().ToString(),});return"aaa";}
通过这种方式使用定时任务,开发者只需要使用很简单的代码即可完成需求,不需要关注细节,也不需要定义各种 http 接口,并且犹豫不需要关注使用的外部定时任务框架,所以随时可以切换不同的定时任务实现。
核心逻辑
本文示例项目仓库:whuanle/HangfireDemo
示例项目结构如下:

HangfireServer 是定时任务服务实现,HangfireServer 服务只需要暴露两个接口 addtask
、cancel
,分别用于添加定时任务和取消定时任务,无论什么业务的服务,都通过 addtask
服务添加。
DemoApi 则是业务服务,业务服务只需要暴露一个· execute
接口用于触发定时任务即可。
基础逻辑如下:
Hangfire
DemoApi
序列化参数Command
添加定时任务
请求
Hangfire:存储任务
addtask
Hangfire:执行任务
发起请求
发送定时任务
定义 Command
DemoApi:执行 Command
DemoApi:execute 接口
由于项目中使用的是 MediatR 框架实现 CQRS 模式,因此很容易实现定时任务动态调用代码,只需要按照平时的 CQRS 发送定时任务命令,指定定时任务要执行的 Command 即可。
例如,有以下 Command 需要被定时任务执行:
ACommandBCommandCCommand
首先这些命令会被序列化为 json ,发送到 HangfireServer 服务,HangfireServer 在恰当时机将参数原封不动推送到 DemoApi 服务,DemoApi 服务拿到这些参数序列化为对应的类型,然后通过 MediatR 发送命令,即可实现任意命令的定时任务动态调用。
下面来分别实现 HangfireServer 、DemoApi 服务。
在 Shred 项目中添加以下文件。

其中 TaskRequest 内容如下,其它文件请参考示例项目。
public classTaskRequest{/// /// 任务 id./// publicstringTaskId{get;set;}="";/// /// 定时任务要请求的服务地址或服务名称./// publicstringServiceName{get;set;}="";/// /// 参数类型名称./// publicstringCommandType{get;set;}="";/// /// 请求参数内容,json 序列化后的字符串./// publicstringCommandBody{get;set;}="";/// /// Cron 表达式./// publicstringCronExpression{get;set;}="";/// /// 创建时间./// publicstringCreateTime{get;set;}="";}
使用 Redis 实现秒级定时任务
Hangfire 本身配置比较复杂,其分布式实现对数据库性能要求比较高,因此使用 Mysql、Sqlserver 等数据库存储数据会带了很大的压力,而且要求实现秒级定时任务,NoSql 数据库可以更加好地实现这一需求,笔者这里使用 Redis 来存储任务数据。
HangfireServer 项目结构如下:

对 HangfireServer 的设计主要分为几步:
• Hangfire 支持容器管理; • 配置 Hangfire ; • 定义 RecurringJobHandler 执行任务发起 http 请求到业务系统; • 定义 http 接口,接收定时任务;
引入类库:
PackageReference Include="Hangfire.AspNetCore" Version="1.8.18" />PackageReference Include="Hangfire.Redis.StackExchange" Version="1.12.0" />
首先是关于 Hangfire 本身的配置,现在几乎都是基于依赖注入的设计,不搞静态类型,所以我们需要实现定时任务执行器创建服务实例的,以便每次定时任务请求时,服务实例都是在一个新的容器,处以一个新的上下文中。
第一步
创建 HangfireJobActivatorScope、HangfireActivator 两个文件,实现 Hangfire 支持容器上下文。
/// /// 任务容器./// publicclassHangfireJobActivatorScope:JobActivatorScope{privatereadonlyIServiceScope _serviceScope;privatereadonlystring _jobId;/// /// Initializes a new instance of the class./// /// /// publicHangfireJobActivatorScope([NotNull]IServiceScope serviceScope,string jobId){ _serviceScope = serviceScope ??thrownewArgumentNullException(nameof(serviceScope)); _jobId = jobId;}/// publicoverrideobjectResolve(Type type){var res =ActivatorUtilities.GetServiceOrCreateInstance(_serviceScope.ServiceProvider, type);return res;}/// publicoverridevoidDisposeScope(){ _serviceScope.Dispose();}}
/// /// JobActivator./// publicclassHangfireActivator:JobActivator{privatereadonlyIServiceScopeFactory _serviceScopeFactory;/// /// Initializes a new instance of the class./// /// publicHangfireActivator(IServiceScopeFactory serviceScopeFactory){ _serviceScopeFactory = serviceScopeFactory ??thrownewArgumentNullException(nameof(serviceScopeFactory));}/// publicoverrideJobActivatorScopeBeginScope(JobActivatorContext context){returnnewHangfireJobActivatorScope(_serviceScopeFactory.CreateScope(), context.BackgroundJob.Id);}}
第二步
配置 Hangfire 服务,使其支持 Redis,并且配置一些参数。
private voidConfigureHangfire(IServiceCollection services){var options =newRedisStorageOptions{// 配置 redis 前缀,每个任务实例都会创建一个 keyPrefix="aaa:aaa:hangfire",}; services.AddHangfire( config =>{ config.UseRedisStorage("{redis连接字符串}", options).SetDataCompatibilityLevel(CompatibilityLevel.Version_180).UseSimpleAssemblyNameTypeSerializer().UseRecommendedSerializerSettings(); config.UseActivator(newHangfireActivator(services.BuildServiceProvider().GetRequiredServiceIServiceScopeFactory>()));}); services.AddHangfireServer(options =>{// 注意,这里必须设置非常小的间隔 options.SchedulePollingInterval=TimeSpan.FromSeconds(1);// 如果考虑到后续任务比较多,则需要调大此参数 options.WorkerCount=50;});}

第三步
实现 RecurringJobHandler 执行定时任务,发起 http 请求业务系统。
被调用方要返回 TaskInterfaceResponse 类型,主要考虑如果被调用方后续不需要在继续此定时任务,那么返回参数 CancelTask = tre
时,定时任务服务直接取消后续的任务即可,不需要被调用方手动调用接口取消。
public classRecurringJobHandler{privatereadonlyIServiceProvider _serviceProvider;publicRecurringJobHandler(IServiceProvider serviceProvider){ _serviceProvider = serviceProvider;}/// /// 执行任务./// /// /// Task.publicasyncTaskHandler(TaskRequest taskRequest){var ioc = _serviceProvider;var recurringJobManager = ioc.GetRequiredServiceIRecurringJobManager>();var httpClientFactory = ioc.GetRequiredServiceIHttpClientFactory>();var logger = ioc.GetRequiredServiceILoggerRecurringJobHandler>>();usingvar httpClient = httpClientFactory.CreateClient(taskRequest.ServiceName);// 无论是否请求成功,都算完成了本次任务try{// 请求子系统的接口var response =await httpClient.PostAsJsonAsync(taskRequest.ServiceName, taskRequest);var execteResult =await response.Content.ReadFromJsonAsyncExecteTasResult>();// 被调用方要求取消任务if(execteResult !=null&& execteResult.CancelTask){ recurringJobManager.RemoveIfExists(taskRequest.TaskId);}}catch(Exception ex){ logger.LogError(ex,"Task error.");}}}
第四步
配置好 Hangfire 后,开始考虑如何接收任务和发起请求,首先定义一个 Http 接口或 grpc 接口。
[ApiController][Route("/execute")]publicclassHangfireController:ControllerBase{privatereadonlyIRecurringJobManager _recurringJobManager;publicHangfireController(IRecurringJobManager recurringJobManager){ _recurringJobManager = recurringJobManager;}[HttpPost("addtask")]publicasyncTaskTaskResponse>AddTask(TaskRequestvalue){awaitTask.CompletedTask; _recurringJobManager.AddOrUpdateRecurringJobHandler>(value.TaskId, task => task.Handler(value), cronExpression:value.CronExpression, options:newRecurringJobOptions{});returnnewTaskResponse{};}[HttpPost("cancel")]publicasyncTaskTaskResponse>Cancel(CancelTaskRequestvalue){awaitTask.CompletedTask; _recurringJobManager.RemoveIfExists(value.TaskId);returnnewTaskResponse{};}}
业务服务实现动态代码
业务服务只需要暴露一个 exceute
接口给 HangfireServer 即可,DemoApi 将 Command 序列化包装为请求参数给 HangfireServer ,然后 HangfireServer 原封不动地将参数请求到 exceute
接口。

对 DemoApi 主要设计过程如下:
• 定义 SendHangfireService 服务,包装 Command 数据和一些定时任务参数,通过 http 发送到 HangfireServer 中; • 定义 ExecuteTaskHandler ,当接口被触发时,实现反序列化参数并使用 MediatR 发送 Command,实现动态执行; • 定义 ExecuteController 接口,接收 HangfireServer 请求,并调用 ExecuteTaskHandler 处理请求;
DemoApi 引入类库如下-:
PackageReference Include="Maomi.Core" Version="2.2.0" />-PackageReference Include="MediatR" Version="12.5.0" />
Maomi.Core 是一个模块化和自动服务注册框架。
第一步
定义 SendHangfireService 服务,包装 Command 数据和一些定时任务参数,通过 http 发送到 HangfireServer 中。
接收 HangfireServer 请求时,需要通过字符串查找出 Type,这就需要 DemoApi 启动时,自动扫描程序集并将对应的类型缓存起来。
为了将定时任务命令和其它 Command 区分处理,需要定义一个统一的抽象,当然也可以不这样做,也可以通过特性注解的方式做处理。
/// /// 定时任务抽象参数./// publicabstractclassHangfireRequest:IRequestHangfireResponse>{/// /// 定时任务 id./// publicstringTaskId{get;init;}=string.Empty;/// /// 该任务创建时间./// publicDateTimeOffsetCreateTime{get;init;}}
定义 HangireTypeFactory ,以便能够通过字符串快速查找 Type。
/// /// 记录 CQRS 中的命令类型,以便能够通过字符串快速查找 Type./// publicclassHangireTypeFactory{privatereadonlyConcurrentDictionarystring,Type> _typeDictionary;publicHangireTypeFactory(){ _typeDictionary =newConcurrentDictionarystring,Type>();}publicvoidAdd(Type type){if(!_typeDictionary.ContainsKey(type.Name)){ _typeDictionary[type.Name]= type;}}publicType?Get(string typeName){if(_typeDictionary.TryGetValue(typeName,outvar type)){return type;}return _typeDictionary.FirstOrDefault(x => x.Value.FullName== typeName).Value;}}
最后实现 SendHangfireService 服务,能够包装参数发送到 HangfireServer 中。
当然,可以使用 CQRS 处理。
/// /// 定时任务服务,用于发送定时任务请求./// [InjectOnScoped]publicclassSendHangfireService{privatestaticreadonlyJsonSerializerOptionsJsonOptions=newJsonSerializerOptions{AllowTrailingCommas=true,PropertyNameCaseInsensitive=true,PropertyNamingPolicy=JsonNamingPolicy.CamelCase,ReadCommentHandling=JsonCommentHandling.Skip};privatereadonlyIHttpClientFactory _httpClientFactory;publicSendHangfireService(IHttpClientFactory httpClientFactory){ _httpClientFactory = httpClientFactory;}/// /// 发送定时任务请求./// /// /// /// /// /// publicasyncTaskSendTCommand>(TCommand request) whereTCommand:HangfireRequest{usingvar httpClient = _httpClientFactory.CreateClient();var taskRequest =newTaskRequest{TaskId= request.TaskId,CommandBody=JsonSerializer.Serialize(request,JsonOptions),ServiceName="http://127.0.0.1:5000/hangfire/execute",CommandType=typeof(TCommand).Name??thrownewTypeLoadException(typeof(TCommand).Name),CreateTime= request.CreateTime.ToUnixTimeMilliseconds().ToString(),CronExpression= request.CronExpression,}; _ =await httpClient.PostAsJsonAsync("http://127.0.0.1:5001/execute/addtask", taskRequest);}/// /// 取消定时任务./// /// /// publicasyncTaskCancel(string taskId){usingvar httpClient = _httpClientFactory.CreateClient(); _ =await httpClient.PostAsJsonAsync("http://127.0.0.1:5001/hangfire/cancel",newCancelTaskRequest{TaskId= taskId});}}
第二步
要实现通过 Type 动态执行某个 Command ,其实思路比较简单,也并不需要表达式树等麻烦的方式。
笔者的实现思路如下,定义 ExecuteTaskHandler 泛型类,直接以强类型的方式触发 Command,但是为了屏蔽泛型类型强类型在代码调用中的麻烦,需要再抽象一个接口 IHangfireTaskHandler 屏蔽泛型。
/// /// 定义执行任务的抽象,便于忽略泛型处理./// publicinterfaceIHangfireTaskHandler{/// /// 执行任务./// /// /// TaskExecteTasResult> Handler(TaskRequest taskRequest);}
/// /// 用于反序列化参数并发送 Command./// /// 命令.publicclassExecuteTaskHandlerTCommand>:IHangfireTaskHandlerwhereTCommand:HangfireRequest,IRequestExecteTasResult>{privatereadonlyIMediator _mediator;/// /// Initializes a new instance of the class./// /// publicExecuteTaskHandler(IMediator mediator){ _mediator = mediator;}privatestaticreadonlyJsonSerializerOptionsJsonSerializerOptions=newJsonSerializerOptions{AllowTrailingCommas=true,PropertyNameCaseInsensitive=true,PropertyNamingPolicy=JsonNamingPolicy.CamelCase,ReadCommentHandling=JsonCommentHandling.Skip};/// publicasyncTaskExecteTasResult>Handler(TaskRequest taskRequest){var command =JsonSerializer.DeserializeTCommand>(taskRequest.CommandBody,JsonSerializerOptions)!;if(command ==null){thrownewException("解析命令参数失败");}// 处理命令的逻辑var response =await _mediator.Send(command);return response;}}
第三步
实现定时任务 execute
触发接口,然后将参数转发到 ExecuteTaskHandler 中,这里通过依赖注入的方式屏蔽和解决强类型的问题。
/// /// 定时任务触发入口./// [ApiController][Route("/hangfire")]publicclassExecuteController:ControllerBase{privatereadonlyIServiceProvider _serviceProvider;privatereadonlyHangireTypeFactory _hangireTypeFactory;publicExecuteController(IServiceProvider serviceProvider,HangireTypeFactory hangireTypeFactory){ _serviceProvider = serviceProvider; _hangireTypeFactory = hangireTypeFactory;}[HttpPost("execute")]publicasyncTaskExecteTasResult>ExecuteTask([FromBody]TaskRequest request){var commandType = _hangireTypeFactory.Get(request.CommandType);// 找不到该事件类型,取消后续事件执行if(commandType ==null){returnnewExecteTasResult{CancelTask=true};}var commandTypeHandler =typeof(ExecuteTaskHandler).MakeGenericType(commandType);var handler = _serviceProvider.GetService(commandTypeHandler)asIHangfireTaskHandler;if(handler ==null){returnnewExecteTasResult{CancelTask=true};}returnawait handler.Handler(request);}}
第四步
封装好代码后,开始最后一个环境,配置和注册服务,由于笔者使用 Maomi.Core
框架,因此服务注册配置和扫描程序集变得非常简单,只需要通过 Maomi.Core
框架提供的接口即可最简单地实现功能。
public classApiModule:Maomi.ModuleCore,IModule{privatereadonlyHangireTypeFactory _hangireTypeFactory;publicApiModule(){ _hangireTypeFactory =newHangireTypeFactory();}publicoverridevoidConfigureServices(ServiceContext context){ context.Services.AddTransient(typeof(ExecuteTaskHandler)); context.Services.AddSingleton(_hangireTypeFactory); context.Services.AddHttpClient(); context.Services.AddMediatR(o =>{ o.RegisterServicesFromAssemblies(context.Modules.Select(x => x.Assembly).ToArray());});}publicoverridevoidTypeFilter(Type type){if(!type.IsClass|| type.IsAbstract){return;}if(type.IsAssignableTo(typeof(HangfireRequest))){ _hangireTypeFactory.Add(type);}}}

第五步
开发者可以这样写定时任务 Command 以及执行器,然后通过接口触发定时任务。
public classMyTestRequest:HangfireRequest,IRequestExecteTasResult>{}/// /// 要被定时任务执行的代码./// publicclassMyTestHandler:IRequestHandlerMyTestRequest,ExecteTasResult>{privatestaticvolatileint _count;privatestaticDateTimeOffset _lastTime;publicasyncTaskExecteTasResult>Handle(MyTestRequest request,CancellationToken cancellationToken){ _count++;if(_lastTime ==default){ _lastTime =DateTimeOffset.Now;}Console.WriteLine($""" 执行时间:{DateTimeOffset.Now.ToString("HH:mm:ss.ffff")} 执行频率(每 10s):{(_count / (DateTimeOffset.Now - _lastTime).TotalSeconds * 10)} """);returnnewExecteTasResult{CancelTask=false};}}
[ApiController][Route("/test")]publicclassSendTaskController:ControllerBase{privatereadonlySendHangfireService _hangfireService;publicSendTaskController(SendHangfireService hangfireService){ _hangfireService = hangfireService;}[HttpGet("aaa")]publicasyncTaskstring>SendAsync(){await _hangfireService.Send(newMyTestRequest{CreateTime=DateTimeOffset.Now,CronExpression="* * * * * *",TaskId=Guid.NewGuid().ToString(),});return"aaa";}}
最后
启动项目测试代码,记录执行频率和时间间隔。


