summaryrefslogtreecommitdiffstats
path: root/services/actions/schedule_tasks.go
diff options
context:
space:
mode:
authorDaniel Baumann <daniel@debian.org>2024-10-18 20:33:49 +0200
committerDaniel Baumann <daniel@debian.org>2024-12-12 23:57:56 +0100
commite68b9d00a6e05b3a941f63ffb696f91e554ac5ec (patch)
tree97775d6c13b0f416af55314eb6a89ef792474615 /services/actions/schedule_tasks.go
parentInitial commit. (diff)
downloadforgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.tar.xz
forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.zip
Adding upstream version 9.0.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'services/actions/schedule_tasks.go')
-rw-r--r--services/actions/schedule_tasks.go154
1 files changed, 154 insertions, 0 deletions
diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go
new file mode 100644
index 0000000..18f3324
--- /dev/null
+++ b/services/actions/schedule_tasks.go
@@ -0,0 +1,154 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ actions_model "code.gitea.io/gitea/models/actions"
+ "code.gitea.io/gitea/models/db"
+ repo_model "code.gitea.io/gitea/models/repo"
+ "code.gitea.io/gitea/models/unit"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/timeutil"
+ webhook_module "code.gitea.io/gitea/modules/webhook"
+
+ "github.com/nektos/act/pkg/jobparser"
+)
+
+// StartScheduleTasks start the task
+func StartScheduleTasks(ctx context.Context) error {
+ return startTasks(ctx)
+}
+
+// startTasks retrieves specifications in pages, creates a schedule task for each specification,
+// and updates the specification's next run time and previous run time.
+// The function returns an error if there's an issue with finding or updating the specifications.
+func startTasks(ctx context.Context) error {
+ // Set the page size
+ pageSize := 50
+
+ // Retrieve specs in pages until all specs have been retrieved
+ now := time.Now()
+ for page := 1; ; page++ {
+ // Retrieve the specs for the current page
+ specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
+ ListOptions: db.ListOptions{
+ Page: page,
+ PageSize: pageSize,
+ },
+ Next: now.Unix(),
+ })
+ if err != nil {
+ return fmt.Errorf("find specs: %w", err)
+ }
+
+ if err := specs.LoadRepos(ctx); err != nil {
+ return fmt.Errorf("LoadRepos: %w", err)
+ }
+
+ // Loop through each spec and create a schedule task for it
+ for _, row := range specs {
+ // cancel running jobs if the event is push
+ if row.Schedule.Event == webhook_module.HookEventPush {
+ // cancel running jobs of the same workflow
+ if err := actions_model.CancelPreviousJobs(
+ ctx,
+ row.RepoID,
+ row.Schedule.Ref,
+ row.Schedule.WorkflowID,
+ webhook_module.HookEventSchedule,
+ ); err != nil {
+ log.Error("CancelPreviousJobs: %v", err)
+ }
+ }
+
+ if row.Repo.IsArchived {
+ // Skip if the repo is archived
+ continue
+ }
+
+ cfg, err := row.Repo.GetUnit(ctx, unit.TypeActions)
+ if err != nil {
+ if repo_model.IsErrUnitTypeNotExist(err) {
+ // Skip the actions unit of this repo is disabled.
+ continue
+ }
+ return fmt.Errorf("GetUnit: %w", err)
+ }
+ if cfg.ActionsConfig().IsWorkflowDisabled(row.Schedule.WorkflowID) {
+ continue
+ }
+
+ if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
+ log.Error("CreateScheduleTask: %v", err)
+ return err
+ }
+
+ // Parse the spec
+ schedule, err := row.Parse()
+ if err != nil {
+ log.Error("Parse: %v", err)
+ return err
+ }
+
+ // Update the spec's next run time and previous run time
+ row.Prev = row.Next
+ row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
+ if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
+ log.Error("UpdateScheduleSpec: %v", err)
+ return err
+ }
+ }
+
+ // Stop if all specs have been retrieved
+ if len(specs) < pageSize {
+ break
+ }
+ }
+
+ return nil
+}
+
+// CreateScheduleTask creates a scheduled task from a cron action schedule.
+// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
+func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
+ // Create a new action run based on the schedule
+ run := &actions_model.ActionRun{
+ Title: cron.Title,
+ RepoID: cron.RepoID,
+ OwnerID: cron.OwnerID,
+ WorkflowID: cron.WorkflowID,
+ TriggerUserID: cron.TriggerUserID,
+ Ref: cron.Ref,
+ CommitSHA: cron.CommitSHA,
+ Event: cron.Event,
+ EventPayload: cron.EventPayload,
+ TriggerEvent: string(webhook_module.HookEventSchedule),
+ ScheduleID: cron.ID,
+ Status: actions_model.StatusWaiting,
+ }
+
+ vars, err := actions_model.GetVariablesOfRun(ctx, run)
+ if err != nil {
+ log.Error("GetVariablesOfRun: %v", err)
+ return err
+ }
+
+ // Parse the workflow specification from the cron schedule
+ workflows, err := jobparser.Parse(cron.Content, jobparser.WithVars(vars))
+ if err != nil {
+ return err
+ }
+
+ // Insert the action run and its associated jobs into the database
+ if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
+ return err
+ }
+
+ // Return nil if no errors occurred
+ return nil
+}