gin框架实践连载番外篇Cron定时任务

引言

  • 很多时候我们需要定时处理一些任务,我们今天来学习一下接入到gin框架
  • 选用github.com/robfig/cron/v3的原因 github star 6.7k 作者一直在维护
  • 常见写法
  • 源码解读
  • 如何保证关闭/重启的时候,正在运行的任务要完成后才能继续执行关闭/重启
  • 案例代码地址

1、快速使用

go mod init github.com/18211167516/go-lib/cron/rebfig_cron
复制代码

新建main.go文件

package main

import (
  "fmt"
  "time"

  "github.com/robfig/cron/v3"
)

type testJob struct{}

//实现了 type Job interface {Run()}
func (t testJob) Run() {
	fmt.Println("i.m test job")
}

func main() {
  c := cron.New()

  c.AddFunc("@every 1s", func() {
    fmt.Println("tick every 1 second")
  })
  
  c.AddJob("* * * * *", testJob{})

  c.Start()
  
  select {} 
}

复制代码
  • 创建corn对象,用于管理定时任务
  • 使用addFunc添加定时任务
  • Start方法启动定时任务。启动一个新goroutine
  • select{} 防止主goroutine 退出

2、New()的选项

实际上是返回 type Option func(*Cron)

目前有内置5个选项

2.1 WithLocation 指定时区

loc,_ := time.LoadLocation("America/Los_Angeles")
c := cron.New(cron.WithLocation(loc))
复制代码

2.2 WithSeconds 支持粒度到秒级(默认是和crontab一样分钟级)

实际上WithSeconds就是使用WithParser实现

c:=cron.New(cron.WithSeconds())
//每2秒执行一次
c.AddFunc("*/2 * * * * *", func() {
		file, _ := os.OpenFile("log.txt", os.O_APPEND|os.O_CREATE, 0755)
		defer file.Close()
		fmt.Println("test 11")
		file.Write([]byte("test 111\r\n"))
	})
复制代码

2.3 WithParser 使用自定义解析器

实现接口 type ScheduleParser interface {

Parse(spec string) (Schedule, error)
复制代码

}

paeser:= cron.NewParser(
		cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)
    
c := cron.New(cron.WithParser(paeser))
c.AddFunc("1 * * * * *", func () {
  fmt.Println("every 1 second")
})
复制代码

2.4 WithChain Job包装器(Job中间件)

  • 默认3个中间件
  Recover 捕获内部Job产生的 panic;
  DelayIfStillRunning 触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行
  SkipIfStillRunning 触发时,如果上一次任务还未完成,则跳过此次执行
复制代码
  • 使用

type testJob struct{}

func (t testJob) Run() {
	panic("test job")
	//fmt.Println("i.m test job")
}

logger := cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))
//全局
c := cron.New(cron.WithChain(cron.Recover(logger)))

c.addFunc("* * * * * *",func(){
	panic("1232132")
})

//局部中间件
c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(testJob{}))

复制代码
  • 主要实现
//先执行Job中间件在执行具体Job
func (c Chain) Then(j Job) Job {
	for i := range c.wrappers {
		j = c.wrappers[len(c.wrappers)-i-1](j)
	}
	return j
}
复制代码

2.5 WithLogger 自定义日志Logger

先看默认的Logger

var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))

func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  return printfLogger{l, false}
}

func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  return printfLogger{l, true}
}

type printfLogger struct {
  logger  interface{ Printf(string, ...interface{}) }
  logInfo bool
}

func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) {
	if pl.logInfo {
		keysAndValues = formatTimes(keysAndValues)
		pl.logger.Printf(
			formatString(len(keysAndValues)),
			append([]interface{}{msg}, keysAndValues...)...)
	}
}

func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) {
	keysAndValues = formatTimes(keysAndValues)
	pl.logger.Printf(
		formatString(len(keysAndValues)+2),
		append([]interface{}{msg, "error", err}, keysAndValues...)...)
}
复制代码

只要实现Logger接口就可以自定义日志Logger

  type Logger interface {
      // Info logs routine messages about cron's operation.
      Info(msg string, keysAndValues ...interface{})
      // Error logs an error condition.
      Error(err error, msg string, keysAndValues ...interface{})
  }
复制代码
    //实现控制台和文件日志双写
    f, _ := os.Create("cron.log")

	c := cron.New(cron.WithSeconds(), cron.WithLogger(
		cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))))

	c.AddFunc("*/2 * * * * *", func() {
		fmt.Println("test 11")
	})
复制代码

3、创建任务

3.1 addFunc()

 c := cron.New()

  c.AddFunc("@every 1s", func() {
    fmt.Println("tick every 1 second")
  })
  
c.Start()
复制代码

3.2 addJob()

type testJob struct{}

//实现了 type Job interface {Run()}
func (t testJob) Run() {
	fmt.Println("i.m test job")
}

c.AddJob("* * * * *", testJob{})

c.Start()
复制代码

3.3 源码解读

step1 AddFunc基于addJob实现


func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
	return c.AddJob(spec, FuncJob(cmd))
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	//解析时间格式
	schedule, err := c.parser.Parse(spec)
	if err != nil {
		return 0, err
	}
	return c.Schedule(schedule, cmd), nil
}
复制代码

step2

func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	//并发锁
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
    //自增ID
	c.nextID++
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
    //在服务已经启动的情况下新增任务(不知道为什么有这种操作)
	if !c.running {
		c.entries = append(c.entries, entry)
	} else {
		c.add <- entry
	}
	return entry.ID
}
复制代码

step3 最终运行 c.Start(),核心是cron,run()方法

  • 遍历任务,获取每个任务下次执行时间
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}
复制代码
  • 排序启动定时器
//无线循环
	for {
		// 任务排序
		sort.Sort(byTime(c.entries))

		var timer *time.Timer
        //设置定时器(多少时间后执行)
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}
       }
复制代码
  • 监听channel
		for {
			select {
            //定时器触发
			case now = <-timer.C:
				now = now.In(c.location)
				c.logger.Info("wake", "now", now)

				// Run every entry whose next time was less than now
                //遍历全部任务
				for _, e := range c.entries {
                	// 判断如果第一个执行时间少于当前时间或者时间零点,则跳出不执行
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
                    //执行Job包装器
					c.startJob(e.WrappedJob)
                    //更新
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
				}
			// 运行后新增任务
			case newEntry := <-c.add:
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)
				c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
			// 
			case replyChan := <-c.snapshot:
				replyChan <- c.entrySnapshot()
				continue
                
			//服务停止
            
			case <-c.stop:
            	//停止服务
				timer.Stop()
				c.logger.Info("stop")
				return
			//移除任务会影响服务停止
			case id := <-c.remove:
				timer.Stop()
				now = c.now()
				c.removeEntry(id)
				c.logger.Info("removed", "entry", id)
			}

			break
		}

复制代码
  • time.Stop()

使用 sync.WaitGroup来实现

   func (c *Cron) Stop() context.Context {
      c.runningMu.Lock()
      defer c.runningMu.Unlock()
      if c.running {
          c.stop <- struct{}{}
          c.running = false
      }
      //后台阻塞阻塞代码
      ctx, cancel := context.WithCancel(context.Background())
      go func() {
      		//计数器为0
          c.jobWaiter.Wait()
          //取消阻塞
          cancel()
      }()
      return ctx
  }
复制代码
  • 执行任务
func (c *Cron) startJob(j Job) {
	//计数器加一
	c.jobWaiter.Add(1)
	go func() {
		defer c.jobWaiter.Done()
		j.Run()
	}()
}
复制代码

4. 时间格式

  • 预定义格式
  @yearly:也可以写作@annually,表示每年第一天的 0 点。等价于0 0 1 1 *;
  @monthly:表示每月第一天的 0 点。等价于0 0 1 * *;
  @weekly:表示每周第一天的 0 点,注意第一天为周日,即周六结束,周日开始的那个 0 点。等价于0 0 * * 0;
  @daily:也可以写作@midnight,表示每天 0 点。等价于0 0 * * *;
  @hourly:表示每小时的开始。等价于0 * * * *。
复制代码
  • 固定间隔格式
	@every <duration>  每个duration触发
    c.AddFunc("@every 1s", func() {
		fmt.Println("test 111")
	})
    
	c.AddFunc(fmt.Sprint("@every ", time.Duration(1)*time.Second), func() {
		fmt.Println("test 222")
	})
复制代码
  • 自定义时间格式

默认支持5位到分钟级(等同于crontab)

c.AddFunc("* * * * *", func () {
  fmt.Println("every 1 分钟")
})

//支持秒
c := cron.New(cron.WithSeconds())
c.AddFunc("* * * * * *", func () {
  fmt.Println("every 1 秒钟")
})
复制代码

5. 总结

麻雀虽小五脏俱全,因为golang特性,天生支持线程安全,保证任务执行的完整性
如果考虑将任务持久化层也抽离,可能更利于日后扩展分布式,3个中间件个人认为应该默认就加上(我想没有人愿意因为某个任务报错导致全部任务终止)

6、参考

  1. Go每日一库之cron
  2. time包用法攻略
  3. cron

7、系列文章