diff options
Diffstat (limited to '')
-rw-r--r-- | services/cron/tasks.go | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/services/cron/tasks.go b/services/cron/tasks.go new file mode 100644 index 0000000..f8a7444 --- /dev/null +++ b/services/cron/tasks.go @@ -0,0 +1,230 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package cron + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "code.gitea.io/gitea/models/db" + system_model "code.gitea.io/gitea/models/system" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/translation" +) + +var ( + lock = sync.Mutex{} + started = false + tasks = []*Task{} + tasksMap = map[string]*Task{} +) + +// Task represents a Cron task +type Task struct { + lock sync.Mutex + Name string + config Config + fun func(context.Context, *user_model.User, Config) error + Status string + LastMessage string + LastDoer string + ExecTimes int64 + // This stores the time of the last manual run of this task. + LastRun time.Time +} + +// DoRunAtStart returns if this task should run at the start +func (t *Task) DoRunAtStart() bool { + return t.config.DoRunAtStart() +} + +// IsEnabled returns if this task is enabled as cron task +func (t *Task) IsEnabled() bool { + return t.config.IsEnabled() +} + +// GetConfig will return a copy of the task's config +func (t *Task) GetConfig() Config { + if reflect.TypeOf(t.config).Kind() == reflect.Ptr { + // Pointer: + return reflect.New(reflect.ValueOf(t.config).Elem().Type()).Interface().(Config) + } + // Not pointer: + return reflect.New(reflect.TypeOf(t.config)).Elem().Interface().(Config) +} + +// Run will run the task incrementing the cron counter with no user defined +func (t *Task) Run() { + t.RunWithUser(&user_model.User{ + ID: -1, + Name: "(Cron)", + LowerName: "(cron)", + }, t.config) +} + +// RunWithUser will run the task incrementing the cron counter at the time with User +func (t *Task) RunWithUser(doer *user_model.User, config Config) { + if !taskStatusTable.StartIfNotRunning(t.Name) { + return + } + t.lock.Lock() + if config == nil { + config = t.config + } + t.ExecTimes++ + t.lock.Unlock() + defer func() { + taskStatusTable.Stop(t.Name) + }() + graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) { + defer func() { + if err := recover(); err != nil { + // Recover a panic within the execution of the task. + combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) + log.Error("PANIC whilst running task: %s Value: %v", t.Name, combinedErr) + } + }() + // Store the time of this run, before the function is executed, so it + // matches the behavior of what the cron library does. + t.lock.Lock() + t.LastRun = time.Now() + t.lock.Unlock() + + pm := process.GetManager() + doerName := "" + if doer != nil && doer.ID != -1 { + doerName = doer.Name + } + + ctx, _, finished := pm.AddContext(baseCtx, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "process", doerName)) + defer finished() + + if err := t.fun(ctx, doer, config); err != nil { + var message string + var status string + if db.IsErrCancelled(err) { + status = "cancelled" + message = err.(db.ErrCancelled).Message + } else { + status = "error" + message = err.Error() + } + + t.lock.Lock() + t.LastMessage = message + t.Status = status + t.LastDoer = doerName + t.lock.Unlock() + + if err := system_model.CreateNotice(ctx, system_model.NoticeTask, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "cancelled", doerName, message)); err != nil { + log.Error("CreateNotice: %v", err) + } + return + } + + t.lock.Lock() + t.Status = "finished" + t.LastMessage = "" + t.LastDoer = doerName + t.lock.Unlock() + + if config.DoNoticeOnSuccess() { + if err := system_model.CreateNotice(ctx, system_model.NoticeTask, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "finished", doerName)); err != nil { + log.Error("CreateNotice: %v", err) + } + } + }) +} + +// GetTask gets the named task +func GetTask(name string) *Task { + lock.Lock() + defer lock.Unlock() + log.Info("Getting %s in %v", name, tasksMap[name]) + + return tasksMap[name] +} + +// RegisterTask allows a task to be registered with the cron service +func RegisterTask(name string, config Config, fun func(context.Context, *user_model.User, Config) error) error { + log.Debug("Registering task: %s", name) + + i18nKey := "admin.dashboard." + name + if value := translation.NewLocale("en-US").TrString(i18nKey); value == i18nKey { + return fmt.Errorf("translation is missing for task %q, please add translation for %q", name, i18nKey) + } + + _, err := setting.GetCronSettings(name, config) + if err != nil { + log.Error("Unable to register cron task with name: %s Error: %v", name, err) + return err + } + + task := &Task{ + Name: name, + config: config, + fun: fun, + } + lock.Lock() + locked := true + defer func() { + if locked { + lock.Unlock() + } + }() + if _, has := tasksMap[task.Name]; has { + log.Error("A task with this name: %s has already been registered", name) + return fmt.Errorf("duplicate task with name: %s", task.Name) + } + + if config.IsEnabled() { + // We cannot use the entry return as there is no way to lock it + if err := addTaskToScheduler(task); err != nil { + return err + } + } + + tasks = append(tasks, task) + tasksMap[task.Name] = task + if started && config.IsEnabled() && config.DoRunAtStart() { + lock.Unlock() + locked = false + task.Run() + } + + return nil +} + +// RegisterTaskFatal will register a task but if there is an error log.Fatal +func RegisterTaskFatal(name string, config Config, fun func(context.Context, *user_model.User, Config) error) { + if err := RegisterTask(name, config, fun); err != nil { + log.Fatal("Unable to register cron task %s Error: %v", name, err) + } +} + +func addTaskToScheduler(task *Task) error { + tags := []string{task.Name, task.config.GetSchedule()} // name and schedule can't be get from job, so we add them as tag + if scheduleHasSeconds(task.config.GetSchedule()) { + scheduler = scheduler.CronWithSeconds(task.config.GetSchedule()) + } else { + scheduler = scheduler.Cron(task.config.GetSchedule()) + } + if _, err := scheduler.Tag(tags...).Do(task.Run); err != nil { + log.Error("Unable to register cron task with name: %s Error: %v", task.Name, err) + return err + } + return nil +} + +func scheduleHasSeconds(schedule string) bool { + return len(strings.Fields(schedule)) >= 6 +} |