diff options
author | Daniel Baumann <daniel@debian.org> | 2024-10-18 20:33:49 +0200 |
---|---|---|
committer | Daniel Baumann <daniel@debian.org> | 2024-12-12 23:57:56 +0100 |
commit | e68b9d00a6e05b3a941f63ffb696f91e554ac5ec (patch) | |
tree | 97775d6c13b0f416af55314eb6a89ef792474615 /models/actions/task.go | |
parent | Initial commit. (diff) | |
download | forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.tar.xz forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.zip |
Adding upstream version 9.0.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'models/actions/task.go')
-rw-r--r-- | models/actions/task.go | 527 |
1 files changed, 527 insertions, 0 deletions
diff --git a/models/actions/task.go b/models/actions/task.go new file mode 100644 index 0000000..8d41a63 --- /dev/null +++ b/models/actions/task.go @@ -0,0 +1,527 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "crypto/subtle" + "fmt" + "time" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unit" + "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/nektos/act/pkg/jobparser" + "google.golang.org/protobuf/types/known/timestamppb" + "xorm.io/builder" +) + +// ActionTask represents a distribution of job +type ActionTask struct { + ID int64 + JobID int64 + Job *ActionRunJob `xorm:"-"` + Steps []*ActionTaskStep `xorm:"-"` + Attempt int64 + RunnerID int64 `xorm:"index"` + Status Status `xorm:"index"` + Started timeutil.TimeStamp `xorm:"index"` + Stopped timeutil.TimeStamp `xorm:"index(stopped_log_expired)"` + + RepoID int64 `xorm:"index"` + OwnerID int64 `xorm:"index"` + CommitSHA string `xorm:"index"` + IsForkPullRequest bool + + Token string `xorm:"-"` + TokenHash string `xorm:"UNIQUE"` // sha256 of token + TokenSalt string + TokenLastEight string `xorm:"index token_last_eight"` + + LogFilename string // file name of log + LogInStorage bool // read log from database or from storage + LogLength int64 // lines count + LogSize int64 // blob size + LogIndexes LogIndexes `xorm:"LONGBLOB"` // line number to offset + LogExpired bool `xorm:"index(stopped_log_expired)"` // files that are too old will be deleted + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated index"` +} + +var successfulTokenTaskCache *lru.Cache[string, any] + +func init() { + db.RegisterModel(new(ActionTask), func() error { + if setting.SuccessfulTokensCacheSize > 0 { + var err error + successfulTokenTaskCache, err = lru.New[string, any](setting.SuccessfulTokensCacheSize) + if err != nil { + return fmt.Errorf("unable to allocate Task cache: %v", err) + } + } else { + successfulTokenTaskCache = nil + } + return nil + }) +} + +func (task *ActionTask) Duration() time.Duration { + return calculateDuration(task.Started, task.Stopped, task.Status) +} + +func (task *ActionTask) IsStopped() bool { + return task.Stopped > 0 +} + +func (task *ActionTask) GetRunLink() string { + if task.Job == nil || task.Job.Run == nil { + return "" + } + return task.Job.Run.Link() +} + +func (task *ActionTask) GetCommitLink() string { + if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil { + return "" + } + return task.Job.Run.Repo.CommitLink(task.CommitSHA) +} + +func (task *ActionTask) GetRepoName() string { + if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil { + return "" + } + return task.Job.Run.Repo.FullName() +} + +func (task *ActionTask) GetRepoLink() string { + if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil { + return "" + } + return task.Job.Run.Repo.Link() +} + +func (task *ActionTask) LoadJob(ctx context.Context) error { + if task.Job == nil { + job, err := GetRunJobByID(ctx, task.JobID) + if err != nil { + return err + } + task.Job = job + } + return nil +} + +// LoadAttributes load Job Steps if not loaded +func (task *ActionTask) LoadAttributes(ctx context.Context) error { + if task == nil { + return nil + } + if err := task.LoadJob(ctx); err != nil { + return err + } + + if err := task.Job.LoadAttributes(ctx); err != nil { + return err + } + + if task.Steps == nil { // be careful, an empty slice (not nil) also means loaded + steps, err := GetTaskStepsByTaskID(ctx, task.ID) + if err != nil { + return err + } + task.Steps = steps + } + + return nil +} + +func (task *ActionTask) GenerateToken() (err error) { + task.Token, task.TokenSalt, task.TokenHash, task.TokenLastEight, err = generateSaltedToken() + return err +} + +func GetTaskByID(ctx context.Context, id int64) (*ActionTask, error) { + var task ActionTask + has, err := db.GetEngine(ctx).Where("id=?", id).Get(&task) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("task with id %d: %w", id, util.ErrNotExist) + } + + return &task, nil +} + +func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, error) { + errNotExist := fmt.Errorf("task with token %q: %w", token, util.ErrNotExist) + if token == "" { + return nil, errNotExist + } + // A token is defined as being SHA1 sum these are 40 hexadecimal bytes long + if len(token) != 40 { + return nil, errNotExist + } + for _, x := range []byte(token) { + if x < '0' || (x > '9' && x < 'a') || x > 'f' { + return nil, errNotExist + } + } + + lastEight := token[len(token)-8:] + + if id := getTaskIDFromCache(token); id > 0 { + task := &ActionTask{ + TokenLastEight: lastEight, + } + // Re-get the task from the db in case it has been deleted in the intervening period + has, err := db.GetEngine(ctx).ID(id).Get(task) + if err != nil { + return nil, err + } + if has { + return task, nil + } + successfulTokenTaskCache.Remove(token) + } + + var tasks []*ActionTask + err := db.GetEngine(ctx).Where("token_last_eight = ? AND status = ?", lastEight, StatusRunning).Find(&tasks) + if err != nil { + return nil, err + } else if len(tasks) == 0 { + return nil, errNotExist + } + + for _, t := range tasks { + tempHash := auth_model.HashToken(token, t.TokenSalt) + if subtle.ConstantTimeCompare([]byte(t.TokenHash), []byte(tempHash)) == 1 { + if successfulTokenTaskCache != nil { + successfulTokenTaskCache.Add(token, t.ID) + } + return t, nil + } + } + return nil, errNotExist +} + +func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { + ctx, commiter, err := db.TxContext(ctx) + if err != nil { + return nil, false, err + } + defer commiter.Close() + + e := db.GetEngine(ctx) + + jobCond := builder.NewCond() + if runner.RepoID != 0 { + jobCond = builder.Eq{"repo_id": runner.RepoID} + } else if runner.OwnerID != 0 { + jobCond = builder.In("repo_id", builder.Select("`repository`.id").From("repository"). + Join("INNER", "repo_unit", "`repository`.id = `repo_unit`.repo_id"). + Where(builder.Eq{"`repository`.owner_id": runner.OwnerID, "`repo_unit`.type": unit.TypeActions})) + } + if jobCond.IsValid() { + jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond)) + } + + var jobs []*ActionRunJob + if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { + return nil, false, err + } + + // TODO: a more efficient way to filter labels + var job *ActionRunJob + log.Trace("runner labels: %v", runner.AgentLabels) + for _, v := range jobs { + if isSubset(runner.AgentLabels, v.RunsOn) { + job = v + break + } + } + if job == nil { + return nil, false, nil + } + if err := job.LoadAttributes(ctx); err != nil { + return nil, false, err + } + + now := timeutil.TimeStampNow() + job.Attempt++ + job.Started = now + job.Status = StatusRunning + + task := &ActionTask{ + JobID: job.ID, + Attempt: job.Attempt, + RunnerID: runner.ID, + Started: now, + Status: StatusRunning, + RepoID: job.RepoID, + OwnerID: job.OwnerID, + CommitSHA: job.CommitSHA, + IsForkPullRequest: job.IsForkPullRequest, + } + if err := task.GenerateToken(); err != nil { + return nil, false, err + } + + var workflowJob *jobparser.Job + if gots, err := jobparser.Parse(job.WorkflowPayload); err != nil { + return nil, false, fmt.Errorf("parse workflow of job %d: %w", job.ID, err) + } else if len(gots) != 1 { + return nil, false, fmt.Errorf("workflow of job %d: not single workflow", job.ID) + } else { //nolint:revive + _, workflowJob = gots[0].Job() + } + + if _, err := e.Insert(task); err != nil { + return nil, false, err + } + + task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) + if err := UpdateTask(ctx, task, "log_filename"); err != nil { + return nil, false, err + } + + if len(workflowJob.Steps) > 0 { + steps := make([]*ActionTaskStep, len(workflowJob.Steps)) + for i, v := range workflowJob.Steps { + name, _ := util.SplitStringAtByteN(v.String(), 255) + steps[i] = &ActionTaskStep{ + Name: name, + TaskID: task.ID, + Index: int64(i), + RepoID: task.RepoID, + Status: StatusWaiting, + } + } + if _, err := e.Insert(steps); err != nil { + return nil, false, err + } + task.Steps = steps + } + + job.TaskID = task.ID + if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { + return nil, false, err + } else if n != 1 { + return nil, false, nil + } + + task.Job = job + + if err := commiter.Commit(); err != nil { + return nil, false, err + } + + return task, true, nil +} + +func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error { + sess := db.GetEngine(ctx).ID(task.ID) + if len(cols) > 0 { + sess.Cols(cols...) + } + _, err := sess.Update(task) + return err +} + +// UpdateTaskByState updates the task by the state. +// It will always update the task if the state is not final, even there is no change. +// So it will update ActionTask.Updated to avoid the task being judged as a zombie task. +func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionTask, error) { + stepStates := map[int64]*runnerv1.StepState{} + for _, v := range state.Steps { + stepStates[v.Id] = v + } + + ctx, commiter, err := db.TxContext(ctx) + if err != nil { + return nil, err + } + defer commiter.Close() + + e := db.GetEngine(ctx) + + task := &ActionTask{} + if has, err := e.ID(state.Id).Get(task); err != nil { + return nil, err + } else if !has { + return nil, util.ErrNotExist + } + + if task.Status.IsDone() { + // the state is final, do nothing + return task, nil + } + + // state.Result is not unspecified means the task is finished + if state.Result != runnerv1.Result_RESULT_UNSPECIFIED { + task.Status = Status(state.Result) + task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix()) + if err := UpdateTask(ctx, task, "status", "stopped"); err != nil { + return nil, err + } + if _, err := UpdateRunJob(ctx, &ActionRunJob{ + ID: task.JobID, + Status: task.Status, + Stopped: task.Stopped, + }, nil); err != nil { + return nil, err + } + } else { + // Force update ActionTask.Updated to avoid the task being judged as a zombie task + task.Updated = timeutil.TimeStampNow() + if err := UpdateTask(ctx, task, "updated"); err != nil { + return nil, err + } + } + + if err := task.LoadAttributes(ctx); err != nil { + return nil, err + } + + for _, step := range task.Steps { + var result runnerv1.Result + if v, ok := stepStates[step.Index]; ok { + result = v.Result + step.LogIndex = v.LogIndex + step.LogLength = v.LogLength + step.Started = convertTimestamp(v.StartedAt) + step.Stopped = convertTimestamp(v.StoppedAt) + } + if result != runnerv1.Result_RESULT_UNSPECIFIED { + step.Status = Status(result) + } else if step.Started != 0 { + step.Status = StatusRunning + } + if _, err := e.ID(step.ID).Update(step); err != nil { + return nil, err + } + } + + if err := commiter.Commit(); err != nil { + return nil, err + } + + return task, nil +} + +func StopTask(ctx context.Context, taskID int64, status Status) error { + if !status.IsDone() { + return fmt.Errorf("cannot stop task with status %v", status) + } + e := db.GetEngine(ctx) + + task := &ActionTask{} + if has, err := e.ID(taskID).Get(task); err != nil { + return err + } else if !has { + return util.ErrNotExist + } + if task.Status.IsDone() { + return nil + } + + now := timeutil.TimeStampNow() + task.Status = status + task.Stopped = now + if _, err := UpdateRunJob(ctx, &ActionRunJob{ + ID: task.JobID, + Status: task.Status, + Stopped: task.Stopped, + }, nil); err != nil { + return err + } + + if err := UpdateTask(ctx, task, "status", "stopped"); err != nil { + return err + } + + if err := task.LoadAttributes(ctx); err != nil { + return err + } + + for _, step := range task.Steps { + if !step.Status.IsDone() { + step.Status = status + if step.Started == 0 { + step.Started = now + } + step.Stopped = now + } + if _, err := e.ID(step.ID).Update(step); err != nil { + return err + } + } + + return nil +} + +func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) { + e := db.GetEngine(ctx) + + tasks := make([]*ActionTask, 0, limit) + // Check "stopped > 0" to avoid deleting tasks that are still running + return tasks, e.Where("stopped > 0 AND stopped < ? AND log_expired = ?", olderThan, false). + Limit(limit). + Find(&tasks) +} + +func isSubset(set, subset []string) bool { + m := make(container.Set[string], len(set)) + for _, v := range set { + m.Add(v) + } + + for _, v := range subset { + if !m.Contains(v) { + return false + } + } + return true +} + +func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp { + if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 { + return timeutil.TimeStamp(0) + } + return timeutil.TimeStamp(timestamp.AsTime().Unix()) +} + +func logFileName(repoFullName string, taskID int64) string { + ret := fmt.Sprintf("%s/%02x/%d.log", repoFullName, taskID%256, taskID) + + if setting.Actions.LogCompression.IsZstd() { + ret += ".zst" + } + + return ret +} + +func getTaskIDFromCache(token string) int64 { + if successfulTokenTaskCache == nil { + return 0 + } + tInterface, ok := successfulTokenTaskCache.Get(token) + if !ok { + return 0 + } + t, ok := tInterface.(int64) + if !ok { + return 0 + } + return t +} |