本文作者:小黑黑

[Quartz.Net]Quartz.Net - Job的数据持久化和并发问题

小黑黑 1年前 ( 2019-03-12 ) 948 抢沙发
[Quartz.Net]Quartz.Net - Job的数据持久化和并发问题摘要: 一、数据持久化(PersistJobDataAfterExecution)        Job是无...

一、数据持久化(PersistJobDataAfterExecution)

        Job是无状态的,每次执行Job,context.JobDetail获取到的值都是原始值,对其的任何修改都不会生效。 例如:我们试图修改JobDetailMap中的age属性,希望每次执行后都+1:

public static async Task Init()
{
     ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
     IScheduler scheduler = await schedulerFactory.GetScheduler();

     await scheduler.Start();

     var job = JobBuilder.Create<PrintHelloQuartz>().UsingJobData("age", 10).Build();

     var trigger1 = TriggerBuilder.Create().WithSimpleSchedule(x => x.RepeatForever().WithIntervalInSeconds(1))
                          .Build();
     await scheduler.ScheduleJob(job, trigger1);

}

public class PrintHelloQuartz : IJob
{
     public async Task Execute(IJobExecutionContext context)
     {

        int age = Convert.ToInt32(context.JobDetail.JobDataMap["age"]);

        await Console.Out.WriteLineAsync($"年龄:{age}");

        context.JobDetail.JobDataMap["age"] = age += 1;
     }
}

123.PNG

我们可以看到实际并没有生效,这是因为Job注册之后,调度器执行任务时,执行Execute()方法之前都会通过反射来创建一个该类的实例,所以每次修改必然不会生效。具体代码在JobRunShell中的Initialize()中:

public virtual async Task Initialize(QuartzScheduler sched, CancellationToken cancellationToken = default)
{
    qs = sched;

    IJob job;
    IJobDetail jobDetail = firedTriggerBundle.JobDetail;

    try
    {
       job = sched.JobFactory.NewJob(firedTriggerBundle, scheduler);    //通过反射来创建Job的实例
    }
    catch (SchedulerException se)
    {
       await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);
       throw;
    }
    catch (Exception e)
    {
       SchedulerException se = new SchedulerException($"Problem instantiating type '{jobDetail.JobType.FullName}'", e);
       await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);
       throw se;
    }

    jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}

但我们可以通过在Job类上添加 [PersistJobDataAfterExecution] 特性,就可以非常简单的把Job变成有状态的。

[PersistJobDataAfterExecution]
public class PrintHelloQuartz : IJob
{
     public async Task Execute(IJobExecutionContext context)
     {

        int age = Convert.ToInt32(context.JobDetail.JobDataMap["age"]);

        await Console.Out.WriteLineAsync($"年龄:{age}");

        context.JobDetail.JobDataMap["age"] = age += 1;
     }
}

123.PNG

通过源码我们可看到,当Job执行完毕后,会判断该类是否添加 [PersistJobDataAfterExecution] 特性,如果添加了该特性,那么会将我们新设置的JobDataMap重新创建并赋值给Job的JobDetail,源码位于RAMJobStore下的TriggeredJobComplete()方法

if (jobDetail.PersistJobDataAfterExecution)
{
       JobDataMap newData = jobDetail.JobDataMap;
       if (newData != null)
       {
           newData = (JobDataMap) newData.Clone();
           newData.ClearDirtyFlag();
       }
       jd = jd.GetJobBuilder().SetJobData(newData).Build();
       jw.JobDetail = jd;
}

二、并发问题(DisallowConcurrentExecution)

        当多个Trigger同时触发一个Job或者一个Job的执行时间较长,Trigger条件已经被多次触发,那么这种情况下,Job就会出现并发问题,在这种情况下,数据的准确性必然会有所影响。我们可以使用两个Trigger来触发一个Job来看看效果:

public static async Task Init()
{
       ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
       IScheduler scheduler = await schedulerFactory.GetScheduler();

       await scheduler.Start();

       var job = JobBuilder.Create<PrintHelloQuartz>().UsingJobData("age", 10).Build();

       var trigger1 = TriggerBuilder.Create().WithSimpleSchedule(x => x.RepeatForever().WithIntervalInSeconds(1))
                           .Build();

       var trigger2 = TriggerBuilder.Create()
                       .ForJob(job)
                       .WithSimpleSchedule(x => x.RepeatForever().WithIntervalInSeconds(1)).Build();
                       
       await scheduler.ScheduleJob(job, trigger1);
       await scheduler.ScheduleJob(trigger2);
}

public class PrintHelloQuartz : IJob
{
        public async Task Execute(IJobExecutionContext context)
        {

            await Console.Out.WriteLineAsync($"任务执行时间:{DateTime.Now.ToString("ss")}");

            Thread.Sleep(1000);   //模拟这个Job费时
        }
}

123.PNG

我们可以看到两个Trigger都触发了Job,这样必然会导致数据的错误。我们可以在Job类上添加 [DisallowConcurrentExecution] 特性

[DisallowConcurrentExecution]
public class PrintHelloQuartz : IJob
{
        public async Task Execute(IJobExecutionContext context)
        {

            await Console.Out.WriteLineAsync($"任务执行时间:{DateTime.Now.ToString("ss")}");

            Thread.Sleep(1000);   //模拟这个Job费时
        }
}

123.PNG

我们可以看到Job被有序的执行,我们从源码中可以看到,当调度器开始运行后,会定时从调度程序中获取下一个要触发的触发器(timeTriggers中),当Job被执行时,会判断这个Job类上是否添加了 [DisallowConcurrentExecution] 特性,如果添加了此特性,那么就会临时将这个trigger从timeTriggers中删除,那么此时调度器获取下一个要触发的触发器时就获取不到这个Job相关联的触发器,Job就不会被触发了,等到Job执行完毕后,会将trigger重新添加到timeTriggers中,并通知调度器:

//QuartzSchedulerThread类中的Run(),
//从TtimeTriggers中获取下一个将要被触发的Trigger
triggers = new List<IOperableTrigger>(await qsRsrcs.JobStore.AcquireNextTriggers(noLaterThan, maxCount, qsRsrcs.BatchTimeWindow, CancellationToken.None).ConfigureAwait(false));

//RAMJobStore类中TriggersFired()
//如果Job带有 [DisallowConcurrentExecution] 特性,将会从timerTriggers中删除与Job相关联的Trigger,使上面获取不到此Job的Trigger
if (job.ConcurrentExecutionDisallowed)
{
       IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(job.Key);
       foreach (TriggerWrapper ttw in trigs)
       {
               if (ttw.state == InternalTriggerState.Waiting)
               {
                     ttw.state = InternalTriggerState.Blocked;
               }
               if (ttw.state == InternalTriggerState.Paused)
               {
                     ttw.state = InternalTriggerState.PausedAndBlocked;
               }
               timeTriggers.Remove(ttw);
        }
        blockedJobs.Add(job.Key);
}

//RAMJobStore类中的TriggeredJobComplete()方法
//如果Job类带有 [DisallowConcurrentExecution] 特性,会将次Job相关联的Trigger加入到timeTriggers中,并通知调度器重新获取
if (jd.ConcurrentExecutionDisallowed)
{
        blockedJobs.Remove(jd.Key);
        IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(jd.Key);
        foreach (TriggerWrapper ttw in trigs)
        {
               if (ttw.state == InternalTriggerState.Blocked)
               {
                      ttw.state = InternalTriggerState.Waiting;
                      timeTriggers.Add(ttw);
               }
               if (ttw.state == InternalTriggerState.PausedAndBlocked)
               {
                      ttw.state = InternalTriggerState.Paused;
               }
        }

        signaler.SignalSchedulingChange(null, cancellationToken);
}


分享到: 网站分享代码

发表评论

快捷回复:

评论列表 (暂无评论,948人围观)参与讨论

还没有评论,来说两句吧...