diff options
author | Daniel Baumann <daniel@debian.org> | 2024-10-18 20:33:49 +0200 |
---|---|---|
committer | Daniel Baumann <daniel@debian.org> | 2024-12-12 23:57:56 +0100 |
commit | e68b9d00a6e05b3a941f63ffb696f91e554ac5ec (patch) | |
tree | 97775d6c13b0f416af55314eb6a89ef792474615 /services/actions/job_emitter.go | |
parent | Initial commit. (diff) | |
download | forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.tar.xz forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.zip |
Adding upstream version 9.0.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'services/actions/job_emitter.go')
-rw-r--r-- | services/actions/job_emitter.go | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go new file mode 100644 index 0000000..1f859fc --- /dev/null +++ b/services/actions/job_emitter.go @@ -0,0 +1,162 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "errors" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/queue" + + "github.com/nektos/act/pkg/jobparser" + "xorm.io/builder" +) + +var jobEmitterQueue *queue.WorkerPoolQueue[*jobUpdate] + +type jobUpdate struct { + RunID int64 +} + +func EmitJobsIfReady(runID int64) error { + err := jobEmitterQueue.Push(&jobUpdate{ + RunID: runID, + }) + if errors.Is(err, queue.ErrAlreadyInQueue) { + return nil + } + return err +} + +func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { + ctx := graceful.GetManager().ShutdownContext() + var ret []*jobUpdate + for _, update := range items { + if err := checkJobsOfRun(ctx, update.RunID); err != nil { + ret = append(ret, update) + } + } + return ret +} + +func checkJobsOfRun(ctx context.Context, runID int64) error { + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID}) + if err != nil { + return err + } + if err := db.WithTx(ctx, func(ctx context.Context) error { + idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) + for _, job := range jobs { + idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + } + + updates := newJobStatusResolver(jobs).Resolve() + for _, job := range jobs { + if status, ok := updates[job.ID]; ok { + job.Status = status + if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { + return err + } else if n != 1 { + return fmt.Errorf("no affected for updating blocked job %v", job.ID) + } + } + } + return nil + }); err != nil { + return err + } + CreateCommitStatus(ctx, jobs...) + return nil +} + +type jobStatusResolver struct { + statuses map[int64]actions_model.Status + needs map[int64][]int64 + jobMap map[int64]*actions_model.ActionRunJob +} + +func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { + idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) + jobMap := make(map[int64]*actions_model.ActionRunJob) + for _, job := range jobs { + idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + jobMap[job.ID] = job + } + + statuses := make(map[int64]actions_model.Status, len(jobs)) + needs := make(map[int64][]int64, len(jobs)) + for _, job := range jobs { + statuses[job.ID] = job.Status + for _, need := range job.Needs { + for _, v := range idToJobs[need] { + needs[job.ID] = append(needs[job.ID], v.ID) + } + } + } + return &jobStatusResolver{ + statuses: statuses, + needs: needs, + jobMap: jobMap, + } +} + +func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { + ret := map[int64]actions_model.Status{} + for i := 0; i < len(r.statuses); i++ { + updated := r.resolve() + if len(updated) == 0 { + return ret + } + for k, v := range updated { + ret[k] = v + r.statuses[k] = v + } + } + return ret +} + +func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { + ret := map[int64]actions_model.Status{} + for id, status := range r.statuses { + if status != actions_model.StatusBlocked { + continue + } + allDone, allSucceed := true, true + for _, need := range r.needs[id] { + needStatus := r.statuses[need] + if !needStatus.IsDone() { + allDone = false + } + if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) { + allSucceed = false + } + } + if allDone { + if allSucceed { + ret[id] = actions_model.StatusWaiting + } else { + // Check if the job has an "if" condition + hasIf := false + if wfJobs, _ := jobparser.Parse(r.jobMap[id].WorkflowPayload); len(wfJobs) == 1 { + _, wfJob := wfJobs[0].Job() + hasIf = len(wfJob.If.Value) > 0 + } + + if hasIf { + // act_runner will check the "if" condition + ret[id] = actions_model.StatusWaiting + } else { + // If the "if" condition is empty and not all dependent jobs completed successfully, + // the job should be skipped. + ret[id] = actions_model.StatusSkipped + } + } + } + } + return ret +} |