在 worker service 中使用 quartz.net

  • 以下示例基于 Quartz.Net 3.0.7(在最新的 3.1-beta2 中已经包含了支持 Microsoft DI 的方法)

在 worker service 中,通过官网示例,会发现 quartz.net 并未生效,究其原因系 DI 未注入导致,原生 quartz.net(3.0.7)是通过 CreateInstance 来创建实例的,本文旨在解决在 Worker Service、Console 中使用 quartz.net 无效的问题。

项目结构如下:

初始化

JobSchedule.cs

用来配置 Job,如果需要更多配置,可以扩展该类。

public class JobSchedule
{
    public JobSchedule(Type jobType, string cronExpression)
    {
        JobType = jobType;
        CronExpression = cronExpression;
    }

    public Type JobType { get; }
    public string CronExpression { get; }
}

SingletonJobFactory.cs

默认情况下,Quartz 是通过 Activator.CreateInstance 来创建实例的,这里因为要使用 IoC,所以这里需要自定义一个 IJobFactory 以来使用构造函数注入。

public class SingletonJobFactory : IJobFactory
{
    private readonly IServiceProvider _serviceProvider;
    public SingletonJobFactory(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
    {
        return _serviceProvider.GetRequiredService(bundle.JobDetail.JobType) as IJob;
    }

    public void ReturnJob(IJob job) { }
}
  • P.S.: 目前 Quartz.Net 的机制导致只能使用 Singleton 或者 Transient 类型来进行注入,否则无法保证 IJob 的实现是安全的。

QuartzHostedService.cs

宿主服务,以便可以在后台运行

public class QuartzHostedService : IHostedService
{
    #region Implementation of IHostedService

    private readonly ISchedulerFactory _schedulerFactory;
    private readonly IJobFactory _jobFactory;
    private readonly IEnumerable<JobSchedule> _jobSchedules;

    public QuartzHostedService(
        ISchedulerFactory schedulerFactory,
        IJobFactory jobFactory,
        IEnumerable<JobSchedule> jobSchedules //IEnumerable允许你注入多个Job
        )
    {
        _schedulerFactory = schedulerFactory;
        _jobSchedules = jobSchedules;
        _jobFactory = jobFactory;
    }
    public IScheduler Scheduler { get; set; }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        Scheduler = await _schedulerFactory.GetScheduler(cancellationToken);
        Scheduler.JobFactory = _jobFactory;

        foreach (var jobSchedule in _jobSchedules)
        {
            var job = CreateJob(jobSchedule);
            var trigger = CreateTrigger(jobSchedule);

            await Scheduler.ScheduleJob(job, trigger, cancellationToken);
        }

        await Scheduler.Start(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await Scheduler?.Shutdown(cancellationToken);
    }

    private static IJobDetail CreateJob(JobSchedule schedule)
    {
        var jobType = schedule.JobType;
        return JobBuilder
            .Create(jobType)
            .WithIdentity(jobType.FullName)
            .WithDescription(jobType.Name)
            .Build();
    }

    private static ITrigger CreateTrigger(JobSchedule schedule)
    {
        return TriggerBuilder
            .Create()
            .WithIdentity($"{schedule.JobType.FullName}.trigger")
            .WithCronSchedule(schedule.CronExpression)
            .WithDescription(schedule.CronExpression)
            .Build();
    }

    #endregion
}

配置:IoC

Program.cs

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {

                services.AddSingleton<IJobFactory, SingletonJobFactory>();
                services.AddSingleton<ISchedulerFactory, StdSchedulerFactory>();

                // Jobs
                services.AddSingleton<MyJob>();
                services.AddSingleton<MyJob2>();
                services.AddSingleton(new JobSchedule(
                    jobType: typeof(MyJob),
                    cronExpression: "0/5 * * * * ?")); // 每5s
                //不同的规则可以单独建立
                services.AddSingleton(new JobSchedule(
                    jobType: typeof(MyJob2),
                    cronExpression: "0/1 * * * * ?")); // 每1s
                services.AddHostedService<QuartzHostedService>();

                services.AddHostedService<Worker>();
            });
}

Job 示例

[DisallowConcurrentExecution]
public class MyJob : IJob
{
    private readonly ILogger<MyJob> _logger;

    public MyJob(ILogger<MyJob> logger)
    {
        _logger = logger;
    }
    #region Implementation of IJob

    /// <summary>
    /// Called by the <see cref="T:Quartz.IScheduler" /> when a <see cref="T:Quartz.ITrigger" />
    /// fires that is associated with the <see cref="T:Quartz.IJob" />.
    /// </summary>
    /// <remarks>
    /// The implementation may wish to set a  result object on the
    /// JobExecutionContext before this method exits.  The result itself
    /// is meaningless to Quartz, but may be informative to
    /// <see cref="T:Quartz.IJobListener" />s or
    /// <see cref="T:Quartz.ITriggerListener" />s that are watching the job's
    /// execution.
    /// </remarks>
    /// <param name="context">The execution context.</param>
    public async Task Execute(IJobExecutionContext context)
    {
        await Task.Run(() =>
       {
           _logger.LogInformation($"I am MyJob,DetailGroup={context.JobDetail.Key.Group},DetailName={context.JobDetail.Key.Name}");
       });
    }

    #endregion
}

特性说明:

[DisallowConcurrentExecution]:防止并行执行相同的 Job。

其他说明

因为注入时只能使用 Singleton 或者 Transient,所以对于 Scoped 类型的 DI无法使用,如果硬要使用的话,可以通过如下方式进行:

public class MyJob : IJob
{
    // Inject the DI provider
    private readonly IServiceProvider _provider;
    public MyJob( IServiceProvider provider)
    {
        _provider = provider;
    }

    public async Task Execute(IJobExecutionContext context)
    {
        await Task.Run(() =>
       {
           using(var scope = _provider.CreateScope())
            {
                // Scoped service
                var service = scope.ServiceProvider.GetService<IScopedService>();
                _logger.LogInformation("MyJob Scope Tips。");
            }
       });
    }
}

参考