summaryrefslogtreecommitdiffstats
path: root/services/actions/job_emitter.go
diff options
context:
space:
mode:
authorDaniel Baumann <daniel@debian.org>2024-10-18 20:33:49 +0200
committerDaniel Baumann <daniel@debian.org>2024-12-12 23:57:56 +0100
commite68b9d00a6e05b3a941f63ffb696f91e554ac5ec (patch)
tree97775d6c13b0f416af55314eb6a89ef792474615 /services/actions/job_emitter.go
parentInitial commit. (diff)
downloadforgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.tar.xz
forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.zip
Adding upstream version 9.0.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'services/actions/job_emitter.go')
-rw-r--r--services/actions/job_emitter.go162
1 files changed, 162 insertions, 0 deletions
diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go
new file mode 100644
index 0000000..1f859fc
--- /dev/null
+++ b/services/actions/job_emitter.go
@@ -0,0 +1,162 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ actions_model "code.gitea.io/gitea/models/actions"
+ "code.gitea.io/gitea/models/db"
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/queue"
+
+ "github.com/nektos/act/pkg/jobparser"
+ "xorm.io/builder"
+)
+
+var jobEmitterQueue *queue.WorkerPoolQueue[*jobUpdate]
+
+type jobUpdate struct {
+ RunID int64
+}
+
+func EmitJobsIfReady(runID int64) error {
+ err := jobEmitterQueue.Push(&jobUpdate{
+ RunID: runID,
+ })
+ if errors.Is(err, queue.ErrAlreadyInQueue) {
+ return nil
+ }
+ return err
+}
+
+func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
+ ctx := graceful.GetManager().ShutdownContext()
+ var ret []*jobUpdate
+ for _, update := range items {
+ if err := checkJobsOfRun(ctx, update.RunID); err != nil {
+ ret = append(ret, update)
+ }
+ }
+ return ret
+}
+
+func checkJobsOfRun(ctx context.Context, runID int64) error {
+ jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
+ if err != nil {
+ return err
+ }
+ if err := db.WithTx(ctx, func(ctx context.Context) error {
+ idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
+ for _, job := range jobs {
+ idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
+ }
+
+ updates := newJobStatusResolver(jobs).Resolve()
+ for _, job := range jobs {
+ if status, ok := updates[job.ID]; ok {
+ job.Status = status
+ if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
+ return err
+ } else if n != 1 {
+ return fmt.Errorf("no affected for updating blocked job %v", job.ID)
+ }
+ }
+ }
+ return nil
+ }); err != nil {
+ return err
+ }
+ CreateCommitStatus(ctx, jobs...)
+ return nil
+}
+
+type jobStatusResolver struct {
+ statuses map[int64]actions_model.Status
+ needs map[int64][]int64
+ jobMap map[int64]*actions_model.ActionRunJob
+}
+
+func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
+ idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
+ jobMap := make(map[int64]*actions_model.ActionRunJob)
+ for _, job := range jobs {
+ idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
+ jobMap[job.ID] = job
+ }
+
+ statuses := make(map[int64]actions_model.Status, len(jobs))
+ needs := make(map[int64][]int64, len(jobs))
+ for _, job := range jobs {
+ statuses[job.ID] = job.Status
+ for _, need := range job.Needs {
+ for _, v := range idToJobs[need] {
+ needs[job.ID] = append(needs[job.ID], v.ID)
+ }
+ }
+ }
+ return &jobStatusResolver{
+ statuses: statuses,
+ needs: needs,
+ jobMap: jobMap,
+ }
+}
+
+func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
+ ret := map[int64]actions_model.Status{}
+ for i := 0; i < len(r.statuses); i++ {
+ updated := r.resolve()
+ if len(updated) == 0 {
+ return ret
+ }
+ for k, v := range updated {
+ ret[k] = v
+ r.statuses[k] = v
+ }
+ }
+ return ret
+}
+
+func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
+ ret := map[int64]actions_model.Status{}
+ for id, status := range r.statuses {
+ if status != actions_model.StatusBlocked {
+ continue
+ }
+ allDone, allSucceed := true, true
+ for _, need := range r.needs[id] {
+ needStatus := r.statuses[need]
+ if !needStatus.IsDone() {
+ allDone = false
+ }
+ if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) {
+ allSucceed = false
+ }
+ }
+ if allDone {
+ if allSucceed {
+ ret[id] = actions_model.StatusWaiting
+ } else {
+ // Check if the job has an "if" condition
+ hasIf := false
+ if wfJobs, _ := jobparser.Parse(r.jobMap[id].WorkflowPayload); len(wfJobs) == 1 {
+ _, wfJob := wfJobs[0].Job()
+ hasIf = len(wfJob.If.Value) > 0
+ }
+
+ if hasIf {
+ // act_runner will check the "if" condition
+ ret[id] = actions_model.StatusWaiting
+ } else {
+ // If the "if" condition is empty and not all dependent jobs completed successfully,
+ // the job should be skipped.
+ ret[id] = actions_model.StatusSkipped
+ }
+ }
+ }
+ }
+ return ret
+}