summaryrefslogtreecommitdiffstats
path: root/services/actions/schedule_tasks.go
blob: 18f3324fd2c261f2f6a9f6c759165ef27d135ff8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
	"context"
	"fmt"
	"time"

	actions_model "code.gitea.io/gitea/models/actions"
	"code.gitea.io/gitea/models/db"
	repo_model "code.gitea.io/gitea/models/repo"
	"code.gitea.io/gitea/models/unit"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/timeutil"
	webhook_module "code.gitea.io/gitea/modules/webhook"

	"github.com/nektos/act/pkg/jobparser"
)

// StartScheduleTasks is the exported entry point for running the scheduled tasks.
// It delegates directly to startTasks and returns that function's error unchanged.
func StartScheduleTasks(ctx context.Context) error {
	return startTasks(ctx)
}

// startTasks walks the schedule specs returned by actions_model.FindSpecs page by page
// (queried with Next set to the current Unix time), creates a schedule task for each
// eligible spec, and then advances that spec's Prev and Next run times.
//
// A spec is skipped, without touching its run times, when its repository is archived,
// when the repository's actions unit does not exist, or when the workflow is disabled in
// the actions unit config. A failure to cancel previous jobs is only logged. Every other
// failure aborts the pass and is returned wrapped (with %w) in the name of the failing
// step, so callers can still match the underlying error with errors.Is / errors.As.
func startTasks(ctx context.Context) error {
	// Number of specs requested per FindSpecs call.
	const pageSize = 50

	// One timestamp is shared by the whole pass, so every page is queried and every next
	// run time is computed against the same instant.
	now := time.Now()

	// NOTE(review): specs handled below are written back with a new Next value. If
	// FindSpecs filters on Next, the result set shrinks between pages and advancing the
	// page number could skip rows until a later pass — confirm against FindSpecs.
	for page := 1; ; page++ {
		// Retrieve the specs for the current page
		specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
			ListOptions: db.ListOptions{
				Page:     page,
				PageSize: pageSize,
			},
			Next: now.Unix(),
		})
		if err != nil {
			return fmt.Errorf("find specs: %w", err)
		}

		if err := specs.LoadRepos(ctx); err != nil {
			return fmt.Errorf("LoadRepos: %w", err)
		}

		// Loop through each spec and create a schedule task for it
		for _, row := range specs {
			// For schedules whose event is push, cancel the previous schedule-triggered
			// jobs of the same workflow. This is best effort: a failure is logged and
			// the spec is still processed.
			if row.Schedule.Event == webhook_module.HookEventPush {
				if err := actions_model.CancelPreviousJobs(
					ctx,
					row.RepoID,
					row.Schedule.Ref,
					row.Schedule.WorkflowID,
					webhook_module.HookEventSchedule,
				); err != nil {
					log.Error("CancelPreviousJobs: %v", err)
				}
			}

			if row.Repo.IsArchived {
				// Skip if the repo is archived
				continue
			}

			cfg, err := row.Repo.GetUnit(ctx, unit.TypeActions)
			if err != nil {
				if repo_model.IsErrUnitTypeNotExist(err) {
					// Skip if the actions unit of this repo is disabled.
					continue
				}
				return fmt.Errorf("GetUnit: %w", err)
			}
			if cfg.ActionsConfig().IsWorkflowDisabled(row.Schedule.WorkflowID) {
				// Skip if this workflow is disabled for the repo.
				continue
			}

			// The errors below are returned with context instead of being logged here,
			// so each failure is reported once and identifies the affected schedule.
			if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
				return fmt.Errorf("CreateScheduleTask (repo %v, workflow %v): %w", row.RepoID, row.Schedule.WorkflowID, err)
			}

			// Parse the spec
			schedule, err := row.Parse()
			if err != nil {
				return fmt.Errorf("Parse (repo %v, workflow %v): %w", row.RepoID, row.Schedule.WorkflowID, err)
			}

			// Update the spec's next run time and previous run time. The next run is
			// computed from one minute after the pass timestamp.
			row.Prev = row.Next
			row.Next = timeutil.TimeStamp(schedule.Next(now.Add(time.Minute)).Unix())
			if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
				return fmt.Errorf("UpdateScheduleSpec (repo %v, workflow %v): %w", row.RepoID, row.Schedule.WorkflowID, err)
			}
		}

		// Stop if all specs have been retrieved: a short page means nothing is left.
		if len(specs) < pageSize {
			break
		}
	}

	return nil
}

// CreateScheduleTask turns a cron action schedule into a waiting action run.
// It copies the schedule's fields onto a new ActionRun whose trigger event is the
// schedule hook event, parses the schedule's workflow content with the run's variables,
// and hands the run together with the parsed workflows to actions_model.InsertRun.
// Any error from those steps is returned as is.
func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
	// Assemble the run from the schedule; it starts out in the waiting state.
	scheduledRun := &actions_model.ActionRun{
		ScheduleID:    cron.ID,
		RepoID:        cron.RepoID,
		OwnerID:       cron.OwnerID,
		TriggerUserID: cron.TriggerUserID,
		WorkflowID:    cron.WorkflowID,
		Title:         cron.Title,
		Ref:           cron.Ref,
		CommitSHA:     cron.CommitSHA,
		Event:         cron.Event,
		EventPayload:  cron.EventPayload,
		TriggerEvent:  string(webhook_module.HookEventSchedule),
		Status:        actions_model.StatusWaiting,
	}

	// The run's variables are fed to the parser below.
	runVars, err := actions_model.GetVariablesOfRun(ctx, scheduledRun)
	if err != nil {
		log.Error("GetVariablesOfRun: %v", err)
		return err
	}

	// Parse the workflow content stored on the schedule.
	parsed, err := jobparser.Parse(cron.Content, jobparser.WithVars(runVars))
	if err != nil {
		return err
	}

	// Insert the run with its parsed workflows; the insert result is the function result.
	return actions_model.InsertRun(ctx, scheduledRun, parsed)
}