diff options
author | Daniel Baumann <daniel@debian.org> | 2024-10-18 20:33:49 +0200 |
---|---|---|
committer | Daniel Baumann <daniel@debian.org> | 2024-10-18 20:33:49 +0200 |
commit | dd136858f1ea40ad3c94191d647487fa4f31926c (patch) | |
tree | 58fec94a7b2a12510c9664b21793f1ed560c6518 /models/actions | |
parent | Initial commit. (diff) | |
download | forgejo-dd136858f1ea40ad3c94191d647487fa4f31926c.tar.xz forgejo-dd136858f1ea40ad3c94191d647487fa4f31926c.zip |
Adding upstream version 9.0.0.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to '')
27 files changed, 3643 insertions, 0 deletions
diff --git a/models/actions/artifact.go b/models/actions/artifact.go new file mode 100644 index 0000000..3d0a288 --- /dev/null +++ b/models/actions/artifact.go @@ -0,0 +1,176 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// This artifact server is inspired by https://github.com/nektos/act/blob/master/pkg/artifacts/server.go. +// It updates url setting and uses ObjectStore to handle artifacts persistence. + +package actions + +import ( + "context" + "errors" + "time" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + + "xorm.io/builder" +) + +// ArtifactStatus is the status of an artifact, uploading, expired or need-delete +type ArtifactStatus int64 + +const ( + ArtifactStatusUploadPending ArtifactStatus = iota + 1 // 1, ArtifactStatusUploadPending is the status of an artifact upload that is pending + ArtifactStatusUploadConfirmed // 2, ArtifactStatusUploadConfirmed is the status of an artifact upload that is confirmed + ArtifactStatusUploadError // 3, ArtifactStatusUploadError is the status of an artifact upload that is errored + ArtifactStatusExpired // 4, ArtifactStatusExpired is the status of an artifact that is expired + ArtifactStatusPendingDeletion // 5, ArtifactStatusPendingDeletion is the status of an artifact that is pending deletion + ArtifactStatusDeleted // 6, ArtifactStatusDeleted is the status of an artifact that is deleted +) + +func init() { + db.RegisterModel(new(ActionArtifact)) +} + +// ActionArtifact is a file that is stored in the artifact storage. +type ActionArtifact struct { + ID int64 `xorm:"pk autoincr"` + RunID int64 `xorm:"index unique(runid_name_path)"` // The run id of the artifact + RunnerID int64 + RepoID int64 `xorm:"index"` + OwnerID int64 + CommitSHA string + StoragePath string // The path to the artifact in the storage + FileSize int64 // The size of the artifact in bytes + FileCompressedSize int64 // The size of the artifact in bytes after gzip compression + ContentEncoding string // The content encoding of the artifact + ArtifactPath string `xorm:"index unique(runid_name_path)"` // The path to the artifact when runner uploads it + ArtifactName string `xorm:"index unique(runid_name_path)"` // The name of the artifact when runner uploads it + Status int64 `xorm:"index"` // The status of the artifact, uploading, expired or need-delete + CreatedUnix timeutil.TimeStamp `xorm:"created"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated index"` + ExpiredUnix timeutil.TimeStamp `xorm:"index"` // The time when the artifact will be expired +} + +func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPath string, expiredDays int64) (*ActionArtifact, error) { + if err := t.LoadJob(ctx); err != nil { + return nil, err + } + artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, artifactName, artifactPath) + if errors.Is(err, util.ErrNotExist) { + artifact := &ActionArtifact{ + ArtifactName: artifactName, + ArtifactPath: artifactPath, + RunID: t.Job.RunID, + RunnerID: t.RunnerID, + RepoID: t.RepoID, + OwnerID: t.OwnerID, + CommitSHA: t.CommitSHA, + Status: int64(ArtifactStatusUploadPending), + ExpiredUnix: timeutil.TimeStamp(time.Now().Unix() + 3600*24*expiredDays), + } + if _, err := db.GetEngine(ctx).Insert(artifact); err != nil { + return nil, err + } + return artifact, nil + } else if err != nil { + return nil, err + } + return artifact, nil +} + +func getArtifactByNameAndPath(ctx context.Context, runID int64, name, fpath string) (*ActionArtifact, error) { + var art ActionArtifact + has, err := db.GetEngine(ctx).Where("run_id = ? AND artifact_name = ? AND artifact_path = ?", runID, name, fpath).Get(&art) + if err != nil { + return nil, err + } else if !has { + return nil, util.ErrNotExist + } + return &art, nil +} + +// UpdateArtifactByID updates an artifact by id +func UpdateArtifactByID(ctx context.Context, id int64, art *ActionArtifact) error { + art.ID = id + _, err := db.GetEngine(ctx).ID(id).AllCols().Update(art) + return err +} + +type FindArtifactsOptions struct { + db.ListOptions + RepoID int64 + RunID int64 + ArtifactName string + Status int +} + +func (opts FindArtifactsOptions) ToConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + if opts.RunID > 0 { + cond = cond.And(builder.Eq{"run_id": opts.RunID}) + } + if opts.ArtifactName != "" { + cond = cond.And(builder.Eq{"artifact_name": opts.ArtifactName}) + } + if opts.Status > 0 { + cond = cond.And(builder.Eq{"status": opts.Status}) + } + + return cond +} + +// ActionArtifactMeta is the meta data of an artifact +type ActionArtifactMeta struct { + ArtifactName string + FileSize int64 + Status ArtifactStatus +} + +// ListUploadedArtifactsMeta returns all uploaded artifacts meta of a run +func ListUploadedArtifactsMeta(ctx context.Context, runID int64) ([]*ActionArtifactMeta, error) { + arts := make([]*ActionArtifactMeta, 0, 10) + return arts, db.GetEngine(ctx).Table("action_artifact"). + Where("run_id=? AND (status=? OR status=?)", runID, ArtifactStatusUploadConfirmed, ArtifactStatusExpired). + GroupBy("artifact_name"). + Select("artifact_name, sum(file_size) as file_size, max(status) as status"). + Find(&arts) +} + +// ListNeedExpiredArtifacts returns all need expired artifacts but not deleted +func ListNeedExpiredArtifacts(ctx context.Context) ([]*ActionArtifact, error) { + arts := make([]*ActionArtifact, 0, 10) + return arts, db.GetEngine(ctx). + Where("expired_unix < ? AND status = ?", timeutil.TimeStamp(time.Now().Unix()), ArtifactStatusUploadConfirmed).Find(&arts) +} + +// ListPendingDeleteArtifacts returns all artifacts in pending-delete status. +// limit is the max number of artifacts to return. +func ListPendingDeleteArtifacts(ctx context.Context, limit int) ([]*ActionArtifact, error) { + arts := make([]*ActionArtifact, 0, limit) + return arts, db.GetEngine(ctx). + Where("status = ?", ArtifactStatusPendingDeletion).Limit(limit).Find(&arts) +} + +// SetArtifactExpired sets an artifact to expired +func SetArtifactExpired(ctx context.Context, artifactID int64) error { + _, err := db.GetEngine(ctx).Where("id=? AND status = ?", artifactID, ArtifactStatusUploadConfirmed).Cols("status").Update(&ActionArtifact{Status: int64(ArtifactStatusExpired)}) + return err +} + +// SetArtifactNeedDelete sets an artifact to need-delete, cron job will delete it +func SetArtifactNeedDelete(ctx context.Context, runID int64, name string) error { + _, err := db.GetEngine(ctx).Where("run_id=? AND artifact_name=? AND status = ?", runID, name, ArtifactStatusUploadConfirmed).Cols("status").Update(&ActionArtifact{Status: int64(ArtifactStatusPendingDeletion)}) + return err +} + +// SetArtifactDeleted sets an artifact to deleted +func SetArtifactDeleted(ctx context.Context, artifactID int64) error { + _, err := db.GetEngine(ctx).ID(artifactID).Cols("status").Update(&ActionArtifact{Status: int64(ArtifactStatusDeleted)}) + return err +} diff --git a/models/actions/forgejo.go b/models/actions/forgejo.go new file mode 100644 index 0000000..5ea77f4 --- /dev/null +++ b/models/actions/forgejo.go @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "crypto/subtle" + "fmt" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/util" + + gouuid "github.com/google/uuid" +) + +func RegisterRunner(ctx context.Context, ownerID, repoID int64, token string, labels *[]string, name, version string) (*ActionRunner, error) { + uuid, err := gouuid.FromBytes([]byte(token[:16])) + if err != nil { + return nil, fmt.Errorf("gouuid.FromBytes %v", err) + } + uuidString := uuid.String() + + var runner ActionRunner + + has, err := db.GetEngine(ctx).Where("uuid=?", uuidString).Get(&runner) + if err != nil { + return nil, fmt.Errorf("GetRunner %v", err) + } + + var mustUpdateSecret bool + if has { + // + // The runner exists, check if the rest of the token has changed. + // + mustUpdateSecret = subtle.ConstantTimeCompare( + []byte(runner.TokenHash), + []byte(auth_model.HashToken(token, runner.TokenSalt)), + ) != 1 + } else { + // + // The runner does not exist yet, create it + // + runner = ActionRunner{ + UUID: uuidString, + AgentLabels: []string{}, + } + + if err := runner.UpdateSecret(token); err != nil { + return &runner, fmt.Errorf("can't set new runner's secret: %w", err) + } + + if err := CreateRunner(ctx, &runner); err != nil { + return &runner, fmt.Errorf("can't create new runner %w", err) + } + } + + // + // Update the existing runner + // + name, _ = util.SplitStringAtByteN(name, 255) + + cols := []string{"name", "owner_id", "repo_id", "version"} + runner.Name = name + runner.OwnerID = ownerID + runner.RepoID = repoID + runner.Version = version + if labels != nil { + runner.AgentLabels = *labels + cols = append(cols, "agent_labels") + } + if mustUpdateSecret { + if err := runner.UpdateSecret(token); err != nil { + return &runner, fmt.Errorf("can't change runner's secret: %w", err) + } + cols = append(cols, "token_hash", "token_salt") + } + + if err := UpdateRunner(ctx, &runner, cols...); err != nil { + return &runner, fmt.Errorf("can't update the runner %+v %w", runner, err) + } + + return &runner, nil +} diff --git a/models/actions/forgejo_test.go b/models/actions/forgejo_test.go new file mode 100644 index 0000000..9295fc6 --- /dev/null +++ b/models/actions/forgejo_test.go @@ -0,0 +1,178 @@ +// SPDX-License-Identifier: MIT + +package actions + +import ( + "crypto/subtle" + "testing" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestActions_RegisterRunner_Token(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + ownerID := int64(0) + repoID := int64(0) + token := "0123456789012345678901234567890123456789" + labels := []string{} + name := "runner" + version := "v1.2.3" + runner, err := RegisterRunner(db.DefaultContext, ownerID, repoID, token, &labels, name, version) + require.NoError(t, err) + assert.EqualValues(t, name, runner.Name) + + assert.EqualValues(t, 1, subtle.ConstantTimeCompare([]byte(runner.TokenHash), []byte(auth_model.HashToken(token, runner.TokenSalt))), "the token cannot be verified with the same method as routers/api/actions/runner/interceptor.go as of 8228751c55d6a4263f0fec2932ca16181c09c97d") +} + +// TestActions_RegisterRunner_TokenUpdate tests that a token's secret is updated +// when a runner already exists and RegisterRunner is called with a token +// parameter whose first 16 bytes match that record but where the last 24 bytes +// do not match. +func TestActions_RegisterRunner_TokenUpdate(t *testing.T) { + const recordID = 12345678 + oldToken := "7e577e577e577e57feedfacefeedfacefeedface" + newToken := "7e577e577e577e57deadbeefdeadbeefdeadbeef" + require.NoError(t, unittest.PrepareTestDatabase()) + before := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: recordID}) + require.Equal(t, + before.TokenHash, auth_model.HashToken(oldToken, before.TokenSalt), + "the initial token should match the runner's secret", + ) + + RegisterRunner(db.DefaultContext, before.OwnerID, before.RepoID, newToken, nil, before.Name, before.Version) + + after := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: recordID}) + + assert.Equal(t, before.UUID, after.UUID) + assert.NotEqual(t, + after.TokenHash, auth_model.HashToken(oldToken, after.TokenSalt), + "the old token can still be verified", + ) + assert.Equal(t, + after.TokenHash, auth_model.HashToken(newToken, after.TokenSalt), + "the new token cannot be verified", + ) +} + +func TestActions_RegisterRunner_CreateWithLabels(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + ownerID := int64(0) + repoID := int64(0) + token := "0123456789012345678901234567890123456789" + name := "runner" + version := "v1.2.3" + labels := []string{"woop", "doop"} + labelsCopy := labels // labels may be affected by the tested function so we copy them + + runner, err := RegisterRunner(db.DefaultContext, ownerID, repoID, token, &labels, name, version) + require.NoError(t, err) + + // Check that the returned record has been updated, except for the labels + assert.EqualValues(t, ownerID, runner.OwnerID) + assert.EqualValues(t, repoID, runner.RepoID) + assert.EqualValues(t, name, runner.Name) + assert.EqualValues(t, version, runner.Version) + assert.EqualValues(t, labelsCopy, runner.AgentLabels) + + // Check that whatever is in the DB has been updated, except for the labels + after := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: runner.ID}) + assert.EqualValues(t, ownerID, after.OwnerID) + assert.EqualValues(t, repoID, after.RepoID) + assert.EqualValues(t, name, after.Name) + assert.EqualValues(t, version, after.Version) + assert.EqualValues(t, labelsCopy, after.AgentLabels) +} + +func TestActions_RegisterRunner_CreateWithoutLabels(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + ownerID := int64(0) + repoID := int64(0) + token := "0123456789012345678901234567890123456789" + name := "runner" + version := "v1.2.3" + + runner, err := RegisterRunner(db.DefaultContext, ownerID, repoID, token, nil, name, version) + require.NoError(t, err) + + // Check that the returned record has been updated, except for the labels + assert.EqualValues(t, ownerID, runner.OwnerID) + assert.EqualValues(t, repoID, runner.RepoID) + assert.EqualValues(t, name, runner.Name) + assert.EqualValues(t, version, runner.Version) + assert.EqualValues(t, []string{}, runner.AgentLabels) + + // Check that whatever is in the DB has been updated, except for the labels + after := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: runner.ID}) + assert.EqualValues(t, ownerID, after.OwnerID) + assert.EqualValues(t, repoID, after.RepoID) + assert.EqualValues(t, name, after.Name) + assert.EqualValues(t, version, after.Version) + assert.EqualValues(t, []string{}, after.AgentLabels) +} + +func TestActions_RegisterRunner_UpdateWithLabels(t *testing.T) { + const recordID = 12345678 + token := "7e577e577e577e57feedfacefeedfacefeedface" + require.NoError(t, unittest.PrepareTestDatabase()) + unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: recordID}) + + newOwnerID := int64(1) + newRepoID := int64(1) + newName := "rennur" + newVersion := "v4.5.6" + newLabels := []string{"warp", "darp"} + labelsCopy := newLabels // labels may be affected by the tested function so we copy them + + runner, err := RegisterRunner(db.DefaultContext, newOwnerID, newRepoID, token, &newLabels, newName, newVersion) + require.NoError(t, err) + + // Check that the returned record has been updated + assert.EqualValues(t, newOwnerID, runner.OwnerID) + assert.EqualValues(t, newRepoID, runner.RepoID) + assert.EqualValues(t, newName, runner.Name) + assert.EqualValues(t, newVersion, runner.Version) + assert.EqualValues(t, labelsCopy, runner.AgentLabels) + + // Check that whatever is in the DB has been updated + after := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: recordID}) + assert.EqualValues(t, newOwnerID, after.OwnerID) + assert.EqualValues(t, newRepoID, after.RepoID) + assert.EqualValues(t, newName, after.Name) + assert.EqualValues(t, newVersion, after.Version) + assert.EqualValues(t, labelsCopy, after.AgentLabels) +} + +func TestActions_RegisterRunner_UpdateWithoutLabels(t *testing.T) { + const recordID = 12345678 + token := "7e577e577e577e57feedfacefeedfacefeedface" + require.NoError(t, unittest.PrepareTestDatabase()) + before := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: recordID}) + + newOwnerID := int64(1) + newRepoID := int64(1) + newName := "rennur" + newVersion := "v4.5.6" + + runner, err := RegisterRunner(db.DefaultContext, newOwnerID, newRepoID, token, nil, newName, newVersion) + require.NoError(t, err) + + // Check that the returned record has been updated, except for the labels + assert.EqualValues(t, newOwnerID, runner.OwnerID) + assert.EqualValues(t, newRepoID, runner.RepoID) + assert.EqualValues(t, newName, runner.Name) + assert.EqualValues(t, newVersion, runner.Version) + assert.EqualValues(t, before.AgentLabels, runner.AgentLabels) + + // Check that whatever is in the DB has been updated, except for the labels + after := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: recordID}) + assert.EqualValues(t, newOwnerID, after.OwnerID) + assert.EqualValues(t, newRepoID, after.RepoID) + assert.EqualValues(t, newName, after.Name) + assert.EqualValues(t, newVersion, after.Version) + assert.EqualValues(t, before.AgentLabels, after.AgentLabels) +} diff --git a/models/actions/main_test.go b/models/actions/main_test.go new file mode 100644 index 0000000..3cfb395 --- /dev/null +++ b/models/actions/main_test.go @@ -0,0 +1,19 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "testing" + + "code.gitea.io/gitea/models/unittest" +) + +func TestMain(m *testing.M) { + unittest.MainTest(m, &unittest.TestOptions{ + FixtureFiles: []string{ + "action_runner.yml", + "action_runner_token.yml", + }, + }) +} diff --git a/models/actions/run.go b/models/actions/run.go new file mode 100644 index 0000000..8b40cb7 --- /dev/null +++ b/models/actions/run.go @@ -0,0 +1,421 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "slices" + "strings" + "time" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/json" + api "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + webhook_module "code.gitea.io/gitea/modules/webhook" + + "github.com/nektos/act/pkg/jobparser" + "xorm.io/builder" +) + +// ActionRun represents a run of a workflow file +type ActionRun struct { + ID int64 + Title string + RepoID int64 `xorm:"index unique(repo_index)"` + Repo *repo_model.Repository `xorm:"-"` + OwnerID int64 `xorm:"index"` + WorkflowID string `xorm:"index"` // the name of workflow file + Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository + TriggerUserID int64 `xorm:"index"` + TriggerUser *user_model.User `xorm:"-"` + ScheduleID int64 + Ref string `xorm:"index"` // the commit/tag/… that caused the run + CommitSHA string + IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow. + NeedApproval bool // may need approval if it's a fork pull request + ApprovedBy int64 `xorm:"index"` // who approved + Event webhook_module.HookEventType // the webhook event that causes the workflow to run + EventPayload string `xorm:"LONGTEXT"` + TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow + Status Status `xorm:"index"` + Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed + // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 + Started timeutil.TimeStamp + Stopped timeutil.TimeStamp + // PreviousDuration is used for recording previous duration + PreviousDuration time.Duration + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +func init() { + db.RegisterModel(new(ActionRun)) + db.RegisterModel(new(ActionRunIndex)) +} + +func (run *ActionRun) HTMLURL() string { + if run.Repo == nil { + return "" + } + return fmt.Sprintf("%s/actions/runs/%d", run.Repo.HTMLURL(), run.Index) +} + +func (run *ActionRun) Link() string { + if run.Repo == nil { + return "" + } + return fmt.Sprintf("%s/actions/runs/%d", run.Repo.Link(), run.Index) +} + +// RefLink return the url of run's ref +func (run *ActionRun) RefLink() string { + refName := git.RefName(run.Ref) + if refName.IsPull() { + return run.Repo.Link() + "/pulls/" + refName.ShortName() + } + return git.RefURL(run.Repo.Link(), run.Ref) +} + +// PrettyRef return #id for pull ref or ShortName for others +func (run *ActionRun) PrettyRef() string { + refName := git.RefName(run.Ref) + if refName.IsPull() { + return "#" + strings.TrimSuffix(strings.TrimPrefix(run.Ref, git.PullPrefix), "/head") + } + return refName.ShortName() +} + +// LoadAttributes load Repo TriggerUser if not loaded +func (run *ActionRun) LoadAttributes(ctx context.Context) error { + if run == nil { + return nil + } + + if err := run.LoadRepo(ctx); err != nil { + return err + } + + if err := run.Repo.LoadAttributes(ctx); err != nil { + return err + } + + if run.TriggerUser == nil { + u, err := user_model.GetPossibleUserByID(ctx, run.TriggerUserID) + if err != nil { + return err + } + run.TriggerUser = u + } + + return nil +} + +func (run *ActionRun) LoadRepo(ctx context.Context) error { + if run == nil || run.Repo != nil { + return nil + } + + repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) + if err != nil { + return err + } + run.Repo = repo + return nil +} + +func (run *ActionRun) Duration() time.Duration { + return calculateDuration(run.Started, run.Stopped, run.Status) + run.PreviousDuration +} + +func (run *ActionRun) GetPushEventPayload() (*api.PushPayload, error) { + if run.Event == webhook_module.HookEventPush { + var payload api.PushPayload + if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil { + return nil, err + } + return &payload, nil + } + return nil, fmt.Errorf("event %s is not a push event", run.Event) +} + +func (run *ActionRun) GetPullRequestEventPayload() (*api.PullRequestPayload, error) { + if run.Event == webhook_module.HookEventPullRequest || run.Event == webhook_module.HookEventPullRequestSync { + var payload api.PullRequestPayload + if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil { + return nil, err + } + return &payload, nil + } + return nil, fmt.Errorf("event %s is not a pull request event", run.Event) +} + +func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { + _, err := db.GetEngine(ctx).ID(repo.ID). + SetExpr("num_action_runs", + builder.Select("count(*)").From("action_run"). + Where(builder.Eq{"repo_id": repo.ID}), + ). + SetExpr("num_closed_action_runs", + builder.Select("count(*)").From("action_run"). + Where(builder.Eq{ + "repo_id": repo.ID, + }.And( + builder.In("status", + StatusSuccess, + StatusFailure, + StatusCancelled, + StatusSkipped, + ), + ), + ), + ). + Update(repo) + return err +} + +// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event. +// It's useful when a new run is triggered, and all previous runs needn't be continued anymore. +func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { + // Find all runs in the specified repository, reference, and workflow with non-final status + runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{ + RepoID: repoID, + Ref: ref, + WorkflowID: workflowID, + TriggerEvent: event, + Status: []Status{StatusRunning, StatusWaiting, StatusBlocked}, + }) + if err != nil { + return err + } + + // If there are no runs found, there's no need to proceed with cancellation, so return nil. + if total == 0 { + return nil + } + + // Iterate over each found run and cancel its associated jobs. + for _, run := range runs { + // Find all jobs associated with the current run. + jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return err + } + + // Iterate over each job and attempt to cancel it. + for _, job := range jobs { + // Skip jobs that are already in a terminal state (completed, cancelled, etc.). + status := job.Status + if status.IsDone() { + continue + } + + // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. + if job.TaskID == 0 { + job.Status = StatusCancelled + job.Stopped = timeutil.TimeStampNow() + + // Update the job's status and stopped time in the database. + n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + if err != nil { + return err + } + + // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. + if n == 0 { + return fmt.Errorf("job has changed, try again") + } + + // Continue with the next job. + continue + } + + // If the job has an associated task, try to stop the task, effectively cancelling the job. + if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { + return err + } + } + } + + // Return nil to indicate successful cancellation of all running and waiting jobs. + return nil +} + +// InsertRun inserts a run +func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error { + ctx, commiter, err := db.TxContext(ctx) + if err != nil { + return err + } + defer commiter.Close() + + index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) + if err != nil { + return err + } + run.Index = index + + if err := db.Insert(ctx, run); err != nil { + return err + } + + if run.Repo == nil { + repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) + if err != nil { + return err + } + run.Repo = repo + } + + if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { + return err + } + + runJobs := make([]*ActionRunJob, 0, len(jobs)) + var hasWaiting bool + for _, v := range jobs { + id, job := v.Job() + needs := job.Needs() + if err := v.SetJob(id, job.EraseNeeds()); err != nil { + return err + } + payload, _ := v.Marshal() + status := StatusWaiting + if len(needs) > 0 || run.NeedApproval { + status = StatusBlocked + } else { + hasWaiting = true + } + job.Name, _ = util.SplitStringAtByteN(job.Name, 255) + runJobs = append(runJobs, &ActionRunJob{ + RunID: run.ID, + RepoID: run.RepoID, + OwnerID: run.OwnerID, + CommitSHA: run.CommitSHA, + IsForkPullRequest: run.IsForkPullRequest, + Name: job.Name, + WorkflowPayload: payload, + JobID: id, + Needs: needs, + RunsOn: job.RunsOn(), + Status: status, + }) + } + if err := db.Insert(ctx, runJobs); err != nil { + return err + } + + // if there is a job in the waiting status, increase tasks version. + if hasWaiting { + if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + return err + } + } + + return commiter.Commit() +} + +func GetLatestRun(ctx context.Context, repoID int64) (*ActionRun, error) { + var run ActionRun + has, err := db.GetEngine(ctx).Where("repo_id=?", repoID).OrderBy("id DESC").Limit(1).Get(&run) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("latest run: %w", util.ErrNotExist) + } + return &run, nil +} + +func GetLatestRunForBranchAndWorkflow(ctx context.Context, repoID int64, branch, workflowFile, event string) (*ActionRun, error) { + var run ActionRun + q := db.GetEngine(ctx).Where("repo_id=?", repoID).And("workflow_id=?", workflowFile) + if event != "" { + q = q.And("event=?", event) + } + if branch != "" { + q = q.And("ref=?", branch) + } + has, err := q.Desc("id").Get(&run) + if err != nil { + return nil, err + } else if !has { + return nil, util.NewNotExistErrorf("run with repo_id %d, ref %s, event %s, workflow_id %s", repoID, branch, event, workflowFile) + } + return &run, nil +} + +func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) { + var run ActionRun + has, err := db.GetEngine(ctx).Where("id=?", id).Get(&run) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("run with id %d: %w", id, util.ErrNotExist) + } + + return &run, nil +} + +func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error) { + run := &ActionRun{ + RepoID: repoID, + Index: index, + } + has, err := db.GetEngine(ctx).Get(run) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("run with index %d %d: %w", repoID, index, util.ErrNotExist) + } + + return run, nil +} + +// UpdateRun updates a run. +// It requires the inputted run has Version set. +// It will return error if the version is not matched (it means the run has been changed after loaded). +func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { + sess := db.GetEngine(ctx).ID(run.ID) + if len(cols) > 0 { + sess.Cols(cols...) + } + affected, err := sess.Update(run) + if err != nil { + return err + } + if affected == 0 { + return fmt.Errorf("run has changed") + // It's impossible that the run is not found, since Gitea never deletes runs. + } + + if run.Status != 0 || slices.Contains(cols, "status") { + if run.RepoID == 0 { + run, err = GetRunByID(ctx, run.ID) + if err != nil { + return err + } + } + if run.Repo == nil { + repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) + if err != nil { + return err + } + run.Repo = repo + } + if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { + return err + } + } + + return nil +} + +type ActionRunIndex db.ResourceIndex diff --git a/models/actions/run_job.go b/models/actions/run_job.go new file mode 100644 index 0000000..4b86640 --- /dev/null +++ b/models/actions/run_job.go @@ -0,0 +1,180 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "slices" + "time" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + + "xorm.io/builder" +) + +// ActionRunJob represents a job of a run +type ActionRunJob struct { + ID int64 + RunID int64 `xorm:"index"` + Run *ActionRun `xorm:"-"` + RepoID int64 `xorm:"index"` + OwnerID int64 `xorm:"index"` + CommitSHA string `xorm:"index"` + IsForkPullRequest bool + Name string `xorm:"VARCHAR(255)"` + Attempt int64 + WorkflowPayload []byte + JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id + Needs []string `xorm:"JSON TEXT"` + RunsOn []string `xorm:"JSON TEXT"` + TaskID int64 // the latest task of the job + Status Status `xorm:"index"` + Started timeutil.TimeStamp + Stopped timeutil.TimeStamp + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated index"` +} + +func init() { + db.RegisterModel(new(ActionRunJob)) +} + +func (job *ActionRunJob) Duration() time.Duration { + return calculateDuration(job.Started, job.Stopped, job.Status) +} + +func (job *ActionRunJob) LoadRun(ctx context.Context) error { + if job.Run == nil { + run, err := GetRunByID(ctx, job.RunID) + if err != nil { + return err + } + job.Run = run + } + return nil +} + +// LoadAttributes load Run if not loaded +func (job *ActionRunJob) LoadAttributes(ctx context.Context) error { + if job == nil { + return nil + } + + if err := job.LoadRun(ctx); err != nil { + return err + } + + return job.Run.LoadAttributes(ctx) +} + +func GetRunJobByID(ctx context.Context, id int64) (*ActionRunJob, error) { + var job ActionRunJob + has, err := db.GetEngine(ctx).Where("id=?", id).Get(&job) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("run job with id %d: %w", id, util.ErrNotExist) + } + + return &job, nil +} + +func GetRunJobsByRunID(ctx context.Context, runID int64) ([]*ActionRunJob, error) { + var jobs []*ActionRunJob + if err := db.GetEngine(ctx).Where("run_id=?", runID).OrderBy("id").Find(&jobs); err != nil { + return nil, err + } + return jobs, nil +} + +func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) { + e := db.GetEngine(ctx) + + sess := e.ID(job.ID) + if len(cols) > 0 { + sess.Cols(cols...) + } + + if cond != nil { + sess.Where(cond) + } + + affected, err := sess.Update(job) + if err != nil { + return 0, err + } + + if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) { + return affected, nil + } + + if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() { + // if the status of job changes to waiting again, increase tasks version. + if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + return 0, err + } + } + + if job.RunID == 0 { + var err error + if job, err = GetRunJobByID(ctx, job.ID); err != nil { + return 0, err + } + } + + { + // Other goroutines may aggregate the status of the run and update it too. + // So we need load the run and its jobs before updating the run. + run, err := GetRunByID(ctx, job.RunID) + if err != nil { + return 0, err + } + jobs, err := GetRunJobsByRunID(ctx, job.RunID) + if err != nil { + return 0, err + } + run.Status = aggregateJobStatus(jobs) + if run.Started.IsZero() && run.Status.IsRunning() { + run.Started = timeutil.TimeStampNow() + } + if run.Stopped.IsZero() && run.Status.IsDone() { + run.Stopped = timeutil.TimeStampNow() + } + if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil { + return 0, fmt.Errorf("update run %d: %w", run.ID, err) + } + } + + return affected, nil +} + +func aggregateJobStatus(jobs []*ActionRunJob) Status { + allDone := true + allWaiting := true + hasFailure := false + for _, job := range jobs { + if !job.Status.IsDone() { + allDone = false + } + if job.Status != StatusWaiting && !job.Status.IsDone() { + allWaiting = false + } + if job.Status == StatusFailure || job.Status == StatusCancelled { + hasFailure = true + } + } + if allDone { + if hasFailure { + return StatusFailure + } + return StatusSuccess + } + if allWaiting { + return StatusWaiting + } + return StatusRunning +} diff --git a/models/actions/run_job_list.go b/models/actions/run_job_list.go new file mode 100644 index 0000000..6c5d3b3 --- /dev/null +++ b/models/actions/run_job_list.go @@ -0,0 +1,80 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/builder" +) + +type ActionJobList []*ActionRunJob + +func (jobs ActionJobList) GetRunIDs() []int64 { + return container.FilterSlice(jobs, func(j *ActionRunJob) (int64, bool) { + return j.RunID, j.RunID != 0 + }) +} + +func (jobs ActionJobList) LoadRuns(ctx context.Context, withRepo bool) error { + runIDs := jobs.GetRunIDs() + runs := make(map[int64]*ActionRun, len(runIDs)) + if err := db.GetEngine(ctx).In("id", runIDs).Find(&runs); err != nil { + return err + } + for _, j := range jobs { + if j.RunID > 0 && j.Run == nil { + j.Run = runs[j.RunID] + } + } + if withRepo { + var runsList RunList = make([]*ActionRun, 0, len(runs)) + for _, r := range runs { + runsList = append(runsList, r) + } + return runsList.LoadRepos(ctx) + } + return nil +} + +func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) error { + return jobs.LoadRuns(ctx, withRepo) +} + +type FindRunJobOptions struct { + db.ListOptions + RunID int64 + RepoID int64 + OwnerID int64 + CommitSHA string + Statuses []Status + UpdatedBefore timeutil.TimeStamp +} + +func (opts FindRunJobOptions) ToConds() builder.Cond { + cond := builder.NewCond() + if opts.RunID > 0 { + cond = cond.And(builder.Eq{"run_id": opts.RunID}) + } + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + if opts.OwnerID > 0 { + cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) + } + if opts.CommitSHA != "" { + cond = cond.And(builder.Eq{"commit_sha": opts.CommitSHA}) + } + if len(opts.Statuses) > 0 { + cond = cond.And(builder.In("status", opts.Statuses)) + } + if opts.UpdatedBefore > 0 { + cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore}) + } + return cond +} diff --git a/models/actions/run_list.go b/models/actions/run_list.go new file mode 100644 index 0000000..4046c7d --- /dev/null +++ b/models/actions/run_list.go @@ -0,0 +1,138 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/container" + webhook_module "code.gitea.io/gitea/modules/webhook" + + "xorm.io/builder" +) + +type RunList []*ActionRun + +// GetUserIDs returns a slice of user's id +func (runs RunList) GetUserIDs() []int64 { + return container.FilterSlice(runs, func(run *ActionRun) (int64, bool) { + return run.TriggerUserID, true + }) +} + +func (runs RunList) GetRepoIDs() []int64 { + return container.FilterSlice(runs, func(run *ActionRun) (int64, bool) { + return run.RepoID, true + }) +} + +func (runs RunList) LoadTriggerUser(ctx context.Context) error { + userIDs := runs.GetUserIDs() + users := make(map[int64]*user_model.User, len(userIDs)) + if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil { + return err + } + for _, run := range runs { + if run.TriggerUserID == user_model.ActionsUserID { + run.TriggerUser = user_model.NewActionsUser() + } else { + run.TriggerUser = users[run.TriggerUserID] + if run.TriggerUser == nil { + run.TriggerUser = user_model.NewGhostUser() + } + } + } + return nil +} + +func (runs RunList) LoadRepos(ctx context.Context) error { + repoIDs := runs.GetRepoIDs() + repos, err := repo_model.GetRepositoriesMapByIDs(ctx, repoIDs) + if err != nil { + return err + } + for _, run := range runs { + run.Repo = repos[run.RepoID] + } + return nil +} + +type FindRunOptions struct { + db.ListOptions + RepoID int64 + OwnerID int64 + WorkflowID string + Ref string // the commit/tag/… that caused this workflow + TriggerUserID int64 + TriggerEvent webhook_module.HookEventType + Approved bool // not util.OptionalBool, it works only when it's true + Status []Status +} + +func (opts FindRunOptions) ToConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + if opts.OwnerID > 0 { + cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) + } + if opts.WorkflowID != "" { + cond = cond.And(builder.Eq{"workflow_id": opts.WorkflowID}) + } + if opts.TriggerUserID > 0 { + cond = cond.And(builder.Eq{"trigger_user_id": opts.TriggerUserID}) + } + if opts.Approved { + cond = cond.And(builder.Gt{"approved_by": 0}) + } + if len(opts.Status) > 0 { + cond = cond.And(builder.In("status", opts.Status)) + } + if opts.Ref != "" { + cond = cond.And(builder.Eq{"ref": opts.Ref}) + } + if opts.TriggerEvent != "" { + cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent}) + } + return cond +} + +func (opts FindRunOptions) ToOrders() string { + return "`id` DESC" +} + +type StatusInfo struct { + Status int + DisplayedStatus string +} + +// GetStatusInfoList returns a slice of StatusInfo +func GetStatusInfoList(ctx context.Context) []StatusInfo { + // same as those in aggregateJobStatus + allStatus := []Status{StatusSuccess, StatusFailure, StatusWaiting, StatusRunning} + statusInfoList := make([]StatusInfo, 0, 4) + for _, s := range allStatus { + statusInfoList = append(statusInfoList, StatusInfo{ + Status: int(s), + DisplayedStatus: s.String(), + }) + } + return statusInfoList +} + +// GetActors returns a slice of Actors +func GetActors(ctx context.Context, repoID int64) ([]*user_model.User, error) { + actors := make([]*user_model.User, 0, 10) + + return actors, db.GetEngine(ctx).Where(builder.In("id", builder.Select("`action_run`.trigger_user_id").From("`action_run`"). + GroupBy("`action_run`.trigger_user_id"). + Where(builder.Eq{"`action_run`.repo_id": repoID}))). + Cols("id", "name", "full_name", "avatar", "avatar_email", "use_custom_avatar"). + OrderBy(user_model.GetOrderByName()). + Find(&actors) +} diff --git a/models/actions/runner.go b/models/actions/runner.go new file mode 100644 index 0000000..175f211 --- /dev/null +++ b/models/actions/runner.go @@ -0,0 +1,362 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "encoding/binary" + "encoding/hex" + "fmt" + "strings" + "time" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/shared/types" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/optional" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/translation" + "code.gitea.io/gitea/modules/util" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "xorm.io/builder" +) + +// ActionRunner represents runner machines +// +// It can be: +// 1. global runner, OwnerID is 0 and RepoID is 0 +// 2. org/user level runner, OwnerID is org/user ID and RepoID is 0 +// 3. repo level runner, OwnerID is 0 and RepoID is repo ID +// +// Please note that it's not acceptable to have both OwnerID and RepoID to be non-zero, +// or it will be complicated to find runners belonging to a specific owner. +// For example, conditions like `OwnerID = 1` will also return runner {OwnerID: 1, RepoID: 1}, +// but it's a repo level runner, not an org/user level runner. +// To avoid this, make it clear with {OwnerID: 0, RepoID: 1} for repo level runners. +type ActionRunner struct { + ID int64 + UUID string `xorm:"CHAR(36) UNIQUE"` + Name string `xorm:"VARCHAR(255)"` + Version string `xorm:"VARCHAR(64)"` + OwnerID int64 `xorm:"index"` + Owner *user_model.User `xorm:"-"` + RepoID int64 `xorm:"index"` + Repo *repo_model.Repository `xorm:"-"` + Description string `xorm:"TEXT"` + Base int // 0 native 1 docker 2 virtual machine + RepoRange string // glob match which repositories could use this runner + + Token string `xorm:"-"` + TokenHash string `xorm:"UNIQUE"` // sha256 of token + TokenSalt string + // TokenLastEight string `xorm:"token_last_eight"` // it's unnecessary because we don't find runners by token + + LastOnline timeutil.TimeStamp `xorm:"index"` + LastActive timeutil.TimeStamp `xorm:"index"` + + // Store labels defined in state file (default: .runner file) of `act_runner` + AgentLabels []string `xorm:"TEXT"` + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` + Deleted timeutil.TimeStamp `xorm:"deleted"` +} + +const ( + RunnerOfflineTime = time.Minute + RunnerIdleTime = 10 * time.Second +) + +// BelongsToOwnerName before calling, should guarantee that all attributes are loaded +func (r *ActionRunner) BelongsToOwnerName() string { + if r.RepoID != 0 { + return r.Repo.FullName() + } + if r.OwnerID != 0 { + return r.Owner.Name + } + return "" +} + +func (r *ActionRunner) BelongsToOwnerType() types.OwnerType { + if r.RepoID != 0 { + return types.OwnerTypeRepository + } + if r.OwnerID != 0 { + if r.Owner.Type == user_model.UserTypeOrganization { + return types.OwnerTypeOrganization + } else if r.Owner.Type == user_model.UserTypeIndividual { + return types.OwnerTypeIndividual + } + } + return types.OwnerTypeSystemGlobal +} + +// if the logic here changed, you should also modify FindRunnerOptions.ToCond +func (r *ActionRunner) Status() runnerv1.RunnerStatus { + if time.Since(r.LastOnline.AsTime()) > RunnerOfflineTime { + return runnerv1.RunnerStatus_RUNNER_STATUS_OFFLINE + } + if time.Since(r.LastActive.AsTime()) > RunnerIdleTime { + return runnerv1.RunnerStatus_RUNNER_STATUS_IDLE + } + return runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE +} + +func (r *ActionRunner) StatusName() string { + return strings.ToLower(strings.TrimPrefix(r.Status().String(), "RUNNER_STATUS_")) +} + +func (r *ActionRunner) StatusLocaleName(lang translation.Locale) string { + return lang.TrString("actions.runners.status." + r.StatusName()) +} + +func (r *ActionRunner) IsOnline() bool { + status := r.Status() + if status == runnerv1.RunnerStatus_RUNNER_STATUS_IDLE || status == runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE { + return true + } + return false +} + +// Editable checks if the runner is editable by the user +func (r *ActionRunner) Editable(ownerID, repoID int64) bool { + if ownerID == 0 && repoID == 0 { + return true + } + if ownerID > 0 && r.OwnerID == ownerID { + return true + } + return repoID > 0 && r.RepoID == repoID +} + +// LoadAttributes loads the attributes of the runner +func (r *ActionRunner) LoadAttributes(ctx context.Context) error { + if r.OwnerID > 0 { + var user user_model.User + has, err := db.GetEngine(ctx).ID(r.OwnerID).Get(&user) + if err != nil { + return err + } + if has { + r.Owner = &user + } + } + if r.RepoID > 0 { + var repo repo_model.Repository + has, err := db.GetEngine(ctx).ID(r.RepoID).Get(&repo) + if err != nil { + return err + } + if has { + r.Repo = &repo + } + } + return nil +} + +func (r *ActionRunner) GenerateToken() (err error) { + r.Token, r.TokenSalt, r.TokenHash, _, err = generateSaltedToken() + return err +} + +// UpdateSecret updates the hash based on the specified token. It does not +// ensure that the runner's UUID matches the first 16 bytes of the token. +func (r *ActionRunner) UpdateSecret(token string) error { + saltBytes, err := util.CryptoRandomBytes(16) + if err != nil { + return fmt.Errorf("CryptoRandomBytes %v", err) + } + + salt := hex.EncodeToString(saltBytes) + + r.Token = token + r.TokenSalt = salt + r.TokenHash = auth_model.HashToken(token, salt) + return nil +} + +func init() { + db.RegisterModel(&ActionRunner{}) +} + +type FindRunnerOptions struct { + db.ListOptions + RepoID int64 + OwnerID int64 // it will be ignored if RepoID is set + Sort string + Filter string + IsOnline optional.Option[bool] + WithAvailable bool // not only runners belong to, but also runners can be used +} + +func (opts FindRunnerOptions) ToConds() builder.Cond { + cond := builder.NewCond() + + if opts.RepoID > 0 { + c := builder.NewCond().And(builder.Eq{"repo_id": opts.RepoID}) + if opts.WithAvailable { + c = c.Or(builder.Eq{"owner_id": builder.Select("owner_id").From("repository").Where(builder.Eq{"id": opts.RepoID})}) + c = c.Or(builder.Eq{"repo_id": 0, "owner_id": 0}) + } + cond = cond.And(c) + } else if opts.OwnerID > 0 { // OwnerID is ignored if RepoID is set + c := builder.NewCond().And(builder.Eq{"owner_id": opts.OwnerID}) + if opts.WithAvailable { + c = c.Or(builder.Eq{"repo_id": 0, "owner_id": 0}) + } + cond = cond.And(c) + } + + if opts.Filter != "" { + cond = cond.And(builder.Like{"name", opts.Filter}) + } + + if opts.IsOnline.Has() { + if opts.IsOnline.Value() { + cond = cond.And(builder.Gt{"last_online": time.Now().Add(-RunnerOfflineTime).Unix()}) + } else { + cond = cond.And(builder.Lte{"last_online": time.Now().Add(-RunnerOfflineTime).Unix()}) + } + } + return cond +} + +func (opts FindRunnerOptions) ToOrders() string { + switch opts.Sort { + case "online": + return "last_online DESC" + case "offline": + return "last_online ASC" + case "alphabetically": + return "name ASC" + case "reversealphabetically": + return "name DESC" + case "newest": + return "id DESC" + case "oldest": + return "id ASC" + } + return "last_online DESC" +} + +// GetRunnerByUUID returns a runner via uuid +func GetRunnerByUUID(ctx context.Context, uuid string) (*ActionRunner, error) { + var runner ActionRunner + has, err := db.GetEngine(ctx).Where("uuid=?", uuid).Get(&runner) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("runner with uuid %s: %w", uuid, util.ErrNotExist) + } + return &runner, nil +} + +// GetRunnerByID returns a runner via id +func GetRunnerByID(ctx context.Context, id int64) (*ActionRunner, error) { + var runner ActionRunner + has, err := db.GetEngine(ctx).Where("id=?", id).Get(&runner) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("runner with id %d: %w", id, util.ErrNotExist) + } + return &runner, nil +} + +// UpdateRunner updates runner's information. +func UpdateRunner(ctx context.Context, r *ActionRunner, cols ...string) error { + e := db.GetEngine(ctx) + var err error + if len(cols) == 0 { + _, err = e.ID(r.ID).AllCols().Update(r) + } else { + _, err = e.ID(r.ID).Cols(cols...).Update(r) + } + return err +} + +// DeleteRunner deletes a runner by given ID. +func DeleteRunner(ctx context.Context, id int64) error { + runner, err := GetRunnerByID(ctx, id) + if err != nil { + return err + } + + // Replace the UUID, which was either based on the secret's first 16 bytes or an UUIDv4, + // with a sequence of 8 0xff bytes followed by the little-endian version of the record's + // identifier. This will prevent the deleted record's identifier from colliding with any + // new record. + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(id)) + runner.UUID = fmt.Sprintf("ffffffff-ffff-ffff-%.2x%.2x-%.2x%.2x%.2x%.2x%.2x%.2x", + b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]) + + err = UpdateRunner(ctx, runner, "UUID") + if err != nil { + return err + } + + _, err = db.DeleteByID[ActionRunner](ctx, id) + return err +} + +// CreateRunner creates new runner. +func CreateRunner(ctx context.Context, t *ActionRunner) error { + if t.OwnerID != 0 && t.RepoID != 0 { + // It's trying to create a runner that belongs to a repository, but OwnerID has been set accidentally. + // Remove OwnerID to avoid confusion; it's not worth returning an error here. + t.OwnerID = 0 + } + return db.Insert(ctx, t) +} + +func CountRunnersWithoutBelongingOwner(ctx context.Context) (int64, error) { + // Only affect action runners were a owner ID is set, as actions runners + // could also be created on a repository. + return db.GetEngine(ctx).Table("action_runner"). + Join("LEFT", "`user`", "`action_runner`.owner_id = `user`.id"). + Where("`action_runner`.owner_id != ?", 0). + And(builder.IsNull{"`user`.id"}). + Count(new(ActionRunner)) +} + +func FixRunnersWithoutBelongingOwner(ctx context.Context) (int64, error) { + subQuery := builder.Select("`action_runner`.id"). + From("`action_runner`"). + Join("LEFT", "`user`", "`action_runner`.owner_id = `user`.id"). + Where(builder.Neq{"`action_runner`.owner_id": 0}). + And(builder.IsNull{"`user`.id"}) + b := builder.Delete(builder.In("id", subQuery)).From("`action_runner`") + res, err := db.GetEngine(ctx).Exec(b) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func CountRunnersWithoutBelongingRepo(ctx context.Context) (int64, error) { + return db.GetEngine(ctx).Table("action_runner"). + Join("LEFT", "`repository`", "`action_runner`.repo_id = `repository`.id"). + Where("`action_runner`.repo_id != ?", 0). + And(builder.IsNull{"`repository`.id"}). + Count(new(ActionRunner)) +} + +func FixRunnersWithoutBelongingRepo(ctx context.Context) (int64, error) { + subQuery := builder.Select("`action_runner`.id"). + From("`action_runner`"). + Join("LEFT", "`repository`", "`action_runner`.repo_id = `repository`.id"). + Where(builder.Neq{"`action_runner`.repo_id": 0}). + And(builder.IsNull{"`repository`.id"}) + b := builder.Delete(builder.In("id", subQuery)).From("`action_runner`") + res, err := db.GetEngine(ctx).Exec(b) + if err != nil { + return 0, err + } + return res.RowsAffected() +} diff --git a/models/actions/runner_list.go b/models/actions/runner_list.go new file mode 100644 index 0000000..3ef8ebb --- /dev/null +++ b/models/actions/runner_list.go @@ -0,0 +1,65 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/container" +) + +type RunnerList []*ActionRunner + +// GetUserIDs returns a slice of user's id +func (runners RunnerList) GetUserIDs() []int64 { + return container.FilterSlice(runners, func(runner *ActionRunner) (int64, bool) { + return runner.OwnerID, runner.OwnerID != 0 + }) +} + +func (runners RunnerList) LoadOwners(ctx context.Context) error { + userIDs := runners.GetUserIDs() + users := make(map[int64]*user_model.User, len(userIDs)) + if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil { + return err + } + for _, runner := range runners { + if runner.OwnerID > 0 && runner.Owner == nil { + runner.Owner = users[runner.OwnerID] + } + } + return nil +} + +func (runners RunnerList) getRepoIDs() []int64 { + return container.FilterSlice(runners, func(runner *ActionRunner) (int64, bool) { + return runner.RepoID, runner.RepoID > 0 + }) +} + +func (runners RunnerList) LoadRepos(ctx context.Context) error { + repoIDs := runners.getRepoIDs() + repos := make(map[int64]*repo_model.Repository, len(repoIDs)) + if err := db.GetEngine(ctx).In("id", repoIDs).Find(&repos); err != nil { + return err + } + + for _, runner := range runners { + if runner.RepoID > 0 && runner.Repo == nil { + runner.Repo = repos[runner.RepoID] + } + } + return nil +} + +func (runners RunnerList) LoadAttributes(ctx context.Context) error { + if err := runners.LoadOwners(ctx); err != nil { + return err + } + + return runners.LoadRepos(ctx) +} diff --git a/models/actions/runner_test.go b/models/actions/runner_test.go new file mode 100644 index 0000000..26ef4c4 --- /dev/null +++ b/models/actions/runner_test.go @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: MIT + +package actions + +import ( + "encoding/binary" + "fmt" + "testing" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestUpdateSecret checks that ActionRunner.UpdateSecret() sets the Token, +// TokenSalt and TokenHash fields based on the specified token. +func TestUpdateSecret(t *testing.T) { + runner := ActionRunner{} + token := "0123456789012345678901234567890123456789" + + err := runner.UpdateSecret(token) + + require.NoError(t, err) + assert.Equal(t, token, runner.Token) + assert.Regexp(t, "^[0-9a-f]{32}$", runner.TokenSalt) + assert.Equal(t, runner.TokenHash, auth_model.HashToken(token, runner.TokenSalt)) +} + +func TestDeleteRunner(t *testing.T) { + const recordID = 12345678 + require.NoError(t, unittest.PrepareTestDatabase()) + before := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: recordID}) + + err := DeleteRunner(db.DefaultContext, recordID) + require.NoError(t, err) + + var after ActionRunner + found, err := db.GetEngine(db.DefaultContext).ID(recordID).Unscoped().Get(&after) + require.NoError(t, err) + assert.True(t, found) + + // Most fields (namely Name, Version, OwnerID, RepoID, Description, Base, RepoRange, + // TokenHash, TokenSalt, LastOnline, LastActive, AgentLabels and Created) are unaffected + assert.Equal(t, before.Name, after.Name) + assert.Equal(t, before.Version, after.Version) + assert.Equal(t, before.OwnerID, after.OwnerID) + assert.Equal(t, before.RepoID, after.RepoID) + assert.Equal(t, before.Description, after.Description) + assert.Equal(t, before.Base, after.Base) + assert.Equal(t, before.RepoRange, after.RepoRange) + assert.Equal(t, before.TokenHash, after.TokenHash) + assert.Equal(t, before.TokenSalt, after.TokenSalt) + assert.Equal(t, before.LastOnline, after.LastOnline) + assert.Equal(t, before.LastActive, after.LastActive) + assert.Equal(t, before.AgentLabels, after.AgentLabels) + assert.Equal(t, before.Created, after.Created) + + // Deleted contains a value + assert.NotNil(t, after.Deleted) + + // UUID was modified + assert.NotEqual(t, before.UUID, after.UUID) + // UUID starts with ffffffff-ffff-ffff- + assert.Equal(t, "ffffffff-ffff-ffff-", after.UUID[:19]) + // UUID ends with LE binary representation of record ID + idAsBinary := make([]byte, 8) + binary.LittleEndian.PutUint64(idAsBinary, uint64(recordID)) + idAsHexadecimal := fmt.Sprintf("%.2x%.2x-%.2x%.2x%.2x%.2x%.2x%.2x", idAsBinary[0], + idAsBinary[1], idAsBinary[2], idAsBinary[3], idAsBinary[4], idAsBinary[5], + idAsBinary[6], idAsBinary[7]) + assert.Equal(t, idAsHexadecimal, after.UUID[19:]) +} diff --git a/models/actions/runner_token.go b/models/actions/runner_token.go new file mode 100644 index 0000000..fd6ba7e --- /dev/null +++ b/models/actions/runner_token.go @@ -0,0 +1,120 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" +) + +// ActionRunnerToken represents runner tokens +// +// It can be: +// 1. global token, OwnerID is 0 and RepoID is 0 +// 2. org/user level token, OwnerID is org/user ID and RepoID is 0 +// 3. repo level token, OwnerID is 0 and RepoID is repo ID +// +// Please note that it's not acceptable to have both OwnerID and RepoID to be non-zero, +// or it will be complicated to find tokens belonging to a specific owner. +// For example, conditions like `OwnerID = 1` will also return token {OwnerID: 1, RepoID: 1}, +// but it's a repo level token, not an org/user level token. +// To avoid this, make it clear with {OwnerID: 0, RepoID: 1} for repo level tokens. +type ActionRunnerToken struct { + ID int64 + Token string `xorm:"UNIQUE"` + OwnerID int64 `xorm:"index"` + Owner *user_model.User `xorm:"-"` + RepoID int64 `xorm:"index"` + Repo *repo_model.Repository `xorm:"-"` + IsActive bool // true means it can be used + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` + Deleted timeutil.TimeStamp `xorm:"deleted"` +} + +func init() { + db.RegisterModel(new(ActionRunnerToken)) +} + +// GetRunnerToken returns a action runner via token +func GetRunnerToken(ctx context.Context, token string) (*ActionRunnerToken, error) { + var runnerToken ActionRunnerToken + has, err := db.GetEngine(ctx).Where("token=?", token).Get(&runnerToken) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("runner token %q: %w", token, util.ErrNotExist) + } + return &runnerToken, nil +} + +// UpdateRunnerToken updates runner token information. +func UpdateRunnerToken(ctx context.Context, r *ActionRunnerToken, cols ...string) (err error) { + e := db.GetEngine(ctx) + + if len(cols) == 0 { + _, err = e.ID(r.ID).AllCols().Update(r) + } else { + _, err = e.ID(r.ID).Cols(cols...).Update(r) + } + return err +} + +// NewRunnerToken creates a new active runner token and invalidate all old tokens +// ownerID will be ignored and treated as 0 if repoID is non-zero. +func NewRunnerToken(ctx context.Context, ownerID, repoID int64) (*ActionRunnerToken, error) { + if ownerID != 0 && repoID != 0 { + // It's trying to create a runner token that belongs to a repository, but OwnerID has been set accidentally. + // Remove OwnerID to avoid confusion; it's not worth returning an error here. + ownerID = 0 + } + + token, err := util.CryptoRandomString(40) + if err != nil { + return nil, err + } + runnerToken := &ActionRunnerToken{ + OwnerID: ownerID, + RepoID: repoID, + IsActive: true, + Token: token, + } + + return runnerToken, db.WithTx(ctx, func(ctx context.Context) error { + if _, err := db.GetEngine(ctx).Where("owner_id =? AND repo_id = ?", ownerID, repoID).Cols("is_active").Update(&ActionRunnerToken{ + IsActive: false, + }); err != nil { + return err + } + + _, err = db.GetEngine(ctx).Insert(runnerToken) + return err + }) +} + +// GetLatestRunnerToken returns the latest runner token +func GetLatestRunnerToken(ctx context.Context, ownerID, repoID int64) (*ActionRunnerToken, error) { + if ownerID != 0 && repoID != 0 { + // It's trying to get a runner token that belongs to a repository, but OwnerID has been set accidentally. + // Remove OwnerID to avoid confusion; it's not worth returning an error here. + ownerID = 0 + } + + var runnerToken ActionRunnerToken + has, err := db.GetEngine(ctx).Where("owner_id=? AND repo_id=?", ownerID, repoID). + OrderBy("id DESC").Get(&runnerToken) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("runner token: %w", util.ErrNotExist) + } + return &runnerToken, nil +} diff --git a/models/actions/runner_token_test.go b/models/actions/runner_token_test.go new file mode 100644 index 0000000..35c9a9d --- /dev/null +++ b/models/actions/runner_token_test.go @@ -0,0 +1,41 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "testing" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetLatestRunnerToken(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + token := unittest.AssertExistsAndLoadBean(t, &ActionRunnerToken{ID: 3}) + expectedToken, err := GetLatestRunnerToken(db.DefaultContext, 1, 0) + require.NoError(t, err) + assert.EqualValues(t, expectedToken, token) +} + +func TestNewRunnerToken(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + token, err := NewRunnerToken(db.DefaultContext, 1, 0) + require.NoError(t, err) + expectedToken, err := GetLatestRunnerToken(db.DefaultContext, 1, 0) + require.NoError(t, err) + assert.EqualValues(t, expectedToken, token) +} + +func TestUpdateRunnerToken(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + token := unittest.AssertExistsAndLoadBean(t, &ActionRunnerToken{ID: 3}) + token.IsActive = true + require.NoError(t, UpdateRunnerToken(db.DefaultContext, token)) + expectedToken, err := GetLatestRunnerToken(db.DefaultContext, 1, 0) + require.NoError(t, err) + assert.EqualValues(t, expectedToken, token) +} diff --git a/models/actions/schedule.go b/models/actions/schedule.go new file mode 100644 index 0000000..acb9961 --- /dev/null +++ b/models/actions/schedule.go @@ -0,0 +1,140 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "time" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/timeutil" + webhook_module "code.gitea.io/gitea/modules/webhook" +) + +// ActionSchedule represents a schedule of a workflow file +type ActionSchedule struct { + ID int64 + Title string + Specs []string + RepoID int64 `xorm:"index"` + Repo *repo_model.Repository `xorm:"-"` + OwnerID int64 `xorm:"index"` + WorkflowID string + TriggerUserID int64 + TriggerUser *user_model.User `xorm:"-"` + Ref string + CommitSHA string + Event webhook_module.HookEventType + EventPayload string `xorm:"LONGTEXT"` + Content []byte + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +func init() { + db.RegisterModel(new(ActionSchedule)) +} + +// GetSchedulesMapByIDs returns the schedules by given id slice. +func GetSchedulesMapByIDs(ctx context.Context, ids []int64) (map[int64]*ActionSchedule, error) { + schedules := make(map[int64]*ActionSchedule, len(ids)) + return schedules, db.GetEngine(ctx).In("id", ids).Find(&schedules) +} + +// GetReposMapByIDs returns the repos by given id slice. +func GetReposMapByIDs(ctx context.Context, ids []int64) (map[int64]*repo_model.Repository, error) { + repos := make(map[int64]*repo_model.Repository, len(ids)) + return repos, db.GetEngine(ctx).In("id", ids).Find(&repos) +} + +// CreateScheduleTask creates new schedule task. +func CreateScheduleTask(ctx context.Context, rows []*ActionSchedule) error { + // Return early if there are no rows to insert + if len(rows) == 0 { + return nil + } + + // Begin transaction + ctx, committer, err := db.TxContext(ctx) + if err != nil { + return err + } + defer committer.Close() + + // Loop through each schedule row + for _, row := range rows { + // Create new schedule row + if err = db.Insert(ctx, row); err != nil { + return err + } + + // Loop through each schedule spec and create a new spec row + now := time.Now() + + for _, spec := range row.Specs { + specRow := &ActionScheduleSpec{ + RepoID: row.RepoID, + ScheduleID: row.ID, + Spec: spec, + } + // Parse the spec and check for errors + schedule, err := specRow.Parse() + if err != nil { + continue // skip to the next spec if there's an error + } + + specRow.Next = timeutil.TimeStamp(schedule.Next(now).Unix()) + + // Insert the new schedule spec row + if err = db.Insert(ctx, specRow); err != nil { + return err + } + } + } + + // Commit transaction + return committer.Commit() +} + +func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error { + ctx, committer, err := db.TxContext(ctx) + if err != nil { + return err + } + defer committer.Close() + + if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil { + return err + } + + if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil { + return err + } + + return committer.Commit() +} + +func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error { + // If actions disabled when there is schedule task, this will remove the outdated schedule tasks + // There is no other place we can do this because the app.ini will be changed manually + if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil { + return fmt.Errorf("DeleteCronTaskByRepo: %v", err) + } + if cancelPreviousJobs { + // cancel running cron jobs of this repository and delete old schedules + if err := CancelPreviousJobs( + ctx, + repo.ID, + repo.DefaultBranch, + "", + webhook_module.HookEventSchedule, + ); err != nil { + return fmt.Errorf("CancelPreviousJobs: %v", err) + } + } + return nil +} diff --git a/models/actions/schedule_list.go b/models/actions/schedule_list.go new file mode 100644 index 0000000..5361b94 --- /dev/null +++ b/models/actions/schedule_list.go @@ -0,0 +1,83 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/container" + + "xorm.io/builder" +) + +type ScheduleList []*ActionSchedule + +// GetUserIDs returns a slice of user's id +func (schedules ScheduleList) GetUserIDs() []int64 { + return container.FilterSlice(schedules, func(schedule *ActionSchedule) (int64, bool) { + return schedule.TriggerUserID, true + }) +} + +func (schedules ScheduleList) GetRepoIDs() []int64 { + return container.FilterSlice(schedules, func(schedule *ActionSchedule) (int64, bool) { + return schedule.RepoID, true + }) +} + +func (schedules ScheduleList) LoadTriggerUser(ctx context.Context) error { + userIDs := schedules.GetUserIDs() + users := make(map[int64]*user_model.User, len(userIDs)) + if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil { + return err + } + for _, schedule := range schedules { + if schedule.TriggerUserID == user_model.ActionsUserID { + schedule.TriggerUser = user_model.NewActionsUser() + } else { + schedule.TriggerUser = users[schedule.TriggerUserID] + if schedule.TriggerUser == nil { + schedule.TriggerUser = user_model.NewGhostUser() + } + } + } + return nil +} + +func (schedules ScheduleList) LoadRepos(ctx context.Context) error { + repoIDs := schedules.GetRepoIDs() + repos, err := repo_model.GetRepositoriesMapByIDs(ctx, repoIDs) + if err != nil { + return err + } + for _, schedule := range schedules { + schedule.Repo = repos[schedule.RepoID] + } + return nil +} + +type FindScheduleOptions struct { + db.ListOptions + RepoID int64 + OwnerID int64 +} + +func (opts FindScheduleOptions) ToConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + if opts.OwnerID > 0 { + cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) + } + + return cond +} + +func (opts FindScheduleOptions) ToOrders() string { + return "`id` DESC" +} diff --git a/models/actions/schedule_spec.go b/models/actions/schedule_spec.go new file mode 100644 index 0000000..923e5f7 --- /dev/null +++ b/models/actions/schedule_spec.go @@ -0,0 +1,73 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "strings" + "time" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/timeutil" + + "github.com/robfig/cron/v3" +) + +// ActionScheduleSpec represents a schedule spec of a workflow file +type ActionScheduleSpec struct { + ID int64 + RepoID int64 `xorm:"index"` + Repo *repo_model.Repository `xorm:"-"` + ScheduleID int64 `xorm:"index"` + Schedule *ActionSchedule `xorm:"-"` + + // Next time the job will run, or the zero time if Cron has not been + // started or this entry's schedule is unsatisfiable + Next timeutil.TimeStamp `xorm:"index"` + // Prev is the last time this job was run, or the zero time if never. + Prev timeutil.TimeStamp + Spec string + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +// Parse parses the spec and returns a cron.Schedule +// Unlike the default cron parser, Parse uses UTC timezone as the default if none is specified. +func (s *ActionScheduleSpec) Parse() (cron.Schedule, error) { + parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + schedule, err := parser.Parse(s.Spec) + if err != nil { + return nil, err + } + + // If the spec has specified a timezone, use it + if strings.HasPrefix(s.Spec, "TZ=") || strings.HasPrefix(s.Spec, "CRON_TZ=") { + return schedule, nil + } + + specSchedule, ok := schedule.(*cron.SpecSchedule) + // If it's not a spec schedule, like "@every 5m", timezone is not relevant + if !ok { + return schedule, nil + } + + // Set the timezone to UTC + specSchedule.Location = time.UTC + return specSchedule, nil +} + +func init() { + db.RegisterModel(new(ActionScheduleSpec)) +} + +func UpdateScheduleSpec(ctx context.Context, spec *ActionScheduleSpec, cols ...string) error { + sess := db.GetEngine(ctx).ID(spec.ID) + if len(cols) > 0 { + sess.Cols(cols...) + } + _, err := sess.Update(spec) + return err +} diff --git a/models/actions/schedule_spec_list.go b/models/actions/schedule_spec_list.go new file mode 100644 index 0000000..4dc43f9 --- /dev/null +++ b/models/actions/schedule_spec_list.go @@ -0,0 +1,105 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/container" + + "xorm.io/builder" +) + +type SpecList []*ActionScheduleSpec + +func (specs SpecList) GetScheduleIDs() []int64 { + return container.FilterSlice(specs, func(spec *ActionScheduleSpec) (int64, bool) { + return spec.ScheduleID, true + }) +} + +func (specs SpecList) LoadSchedules(ctx context.Context) error { + if len(specs) == 0 { + return nil + } + + scheduleIDs := specs.GetScheduleIDs() + schedules, err := GetSchedulesMapByIDs(ctx, scheduleIDs) + if err != nil { + return err + } + for _, spec := range specs { + spec.Schedule = schedules[spec.ScheduleID] + } + + repoIDs := specs.GetRepoIDs() + repos, err := GetReposMapByIDs(ctx, repoIDs) + if err != nil { + return err + } + for _, spec := range specs { + spec.Repo = repos[spec.RepoID] + } + + return nil +} + +func (specs SpecList) GetRepoIDs() []int64 { + return container.FilterSlice(specs, func(spec *ActionScheduleSpec) (int64, bool) { + return spec.RepoID, true + }) +} + +func (specs SpecList) LoadRepos(ctx context.Context) error { + if len(specs) == 0 { + return nil + } + + repoIDs := specs.GetRepoIDs() + repos, err := repo_model.GetRepositoriesMapByIDs(ctx, repoIDs) + if err != nil { + return err + } + for _, spec := range specs { + spec.Repo = repos[spec.RepoID] + } + return nil +} + +type FindSpecOptions struct { + db.ListOptions + RepoID int64 + Next int64 +} + +func (opts FindSpecOptions) ToConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + + if opts.Next > 0 { + cond = cond.And(builder.Lte{"next": opts.Next}) + } + + return cond +} + +func (opts FindSpecOptions) ToOrders() string { + return "`id` DESC" +} + +func FindSpecs(ctx context.Context, opts FindSpecOptions) (SpecList, int64, error) { + specs, total, err := db.FindAndCount[ActionScheduleSpec](ctx, opts) + if err != nil { + return nil, 0, err + } + + if err := SpecList(specs).LoadSchedules(ctx); err != nil { + return nil, 0, err + } + return specs, total, nil +} diff --git a/models/actions/schedule_spec_test.go b/models/actions/schedule_spec_test.go new file mode 100644 index 0000000..0c26fce --- /dev/null +++ b/models/actions/schedule_spec_test.go @@ -0,0 +1,71 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestActionScheduleSpec_Parse(t *testing.T) { + // Mock the local timezone is not UTC + local := time.Local + tz, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + defer func() { + time.Local = local + }() + time.Local = tz + + now, err := time.Parse(time.RFC3339, "2024-07-31T15:47:55+08:00") + require.NoError(t, err) + + tests := []struct { + name string + spec string + want string + wantErr assert.ErrorAssertionFunc + }{ + { + name: "regular", + spec: "0 10 * * *", + want: "2024-07-31T10:00:00Z", + wantErr: assert.NoError, + }, + { + name: "invalid", + spec: "0 10 * *", + want: "", + wantErr: assert.Error, + }, + { + name: "with timezone", + spec: "TZ=America/New_York 0 10 * * *", + want: "2024-07-31T14:00:00Z", + wantErr: assert.NoError, + }, + { + name: "timezone irrelevant", + spec: "@every 5m", + want: "2024-07-31T07:52:55Z", + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &ActionScheduleSpec{ + Spec: tt.spec, + } + got, err := s.Parse() + tt.wantErr(t, err) + + if err == nil { + assert.Equal(t, tt.want, got.Next(now).UTC().Format(time.RFC3339)) + } + }) + } +} diff --git a/models/actions/status.go b/models/actions/status.go new file mode 100644 index 0000000..eda2234 --- /dev/null +++ b/models/actions/status.go @@ -0,0 +1,104 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "code.gitea.io/gitea/modules/translation" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" +) + +// Status represents the status of ActionRun, ActionRunJob, ActionTask, or ActionTaskStep +type Status int + +const ( + StatusUnknown Status = iota // 0, consistent with runnerv1.Result_RESULT_UNSPECIFIED + StatusSuccess // 1, consistent with runnerv1.Result_RESULT_SUCCESS + StatusFailure // 2, consistent with runnerv1.Result_RESULT_FAILURE + StatusCancelled // 3, consistent with runnerv1.Result_RESULT_CANCELLED + StatusSkipped // 4, consistent with runnerv1.Result_RESULT_SKIPPED + StatusWaiting // 5, isn't a runnerv1.Result + StatusRunning // 6, isn't a runnerv1.Result + StatusBlocked // 7, isn't a runnerv1.Result +) + +var statusNames = map[Status]string{ + StatusUnknown: "unknown", + StatusWaiting: "waiting", + StatusRunning: "running", + StatusSuccess: "success", + StatusFailure: "failure", + StatusCancelled: "cancelled", + StatusSkipped: "skipped", + StatusBlocked: "blocked", +} + +// String returns the string name of the Status +func (s Status) String() string { + return statusNames[s] +} + +// LocaleString returns the locale string name of the Status +func (s Status) LocaleString(lang translation.Locale) string { + return lang.TrString("actions.status." + s.String()) +} + +// IsDone returns whether the Status is final +func (s Status) IsDone() bool { + return s.In(StatusSuccess, StatusFailure, StatusCancelled, StatusSkipped) +} + +// HasRun returns whether the Status is a result of running +func (s Status) HasRun() bool { + return s.In(StatusSuccess, StatusFailure) +} + +func (s Status) IsUnknown() bool { + return s == StatusUnknown +} + +func (s Status) IsSuccess() bool { + return s == StatusSuccess +} + +func (s Status) IsFailure() bool { + return s == StatusFailure +} + +func (s Status) IsCancelled() bool { + return s == StatusCancelled +} + +func (s Status) IsSkipped() bool { + return s == StatusSkipped +} + +func (s Status) IsWaiting() bool { + return s == StatusWaiting +} + +func (s Status) IsRunning() bool { + return s == StatusRunning +} + +func (s Status) IsBlocked() bool { + return s == StatusBlocked +} + +// In returns whether s is one of the given statuses +func (s Status) In(statuses ...Status) bool { + for _, v := range statuses { + if s == v { + return true + } + } + return false +} + +func (s Status) AsResult() runnerv1.Result { + if s.IsDone() { + return runnerv1.Result(s) + } + return runnerv1.Result_RESULT_UNSPECIFIED +} diff --git a/models/actions/task.go b/models/actions/task.go new file mode 100644 index 0000000..8d41a63 --- /dev/null +++ b/models/actions/task.go @@ -0,0 +1,527 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "crypto/subtle" + "fmt" + "time" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unit" + "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/nektos/act/pkg/jobparser" + "google.golang.org/protobuf/types/known/timestamppb" + "xorm.io/builder" +) + +// ActionTask represents a distribution of job +type ActionTask struct { + ID int64 + JobID int64 + Job *ActionRunJob `xorm:"-"` + Steps []*ActionTaskStep `xorm:"-"` + Attempt int64 + RunnerID int64 `xorm:"index"` + Status Status `xorm:"index"` + Started timeutil.TimeStamp `xorm:"index"` + Stopped timeutil.TimeStamp `xorm:"index(stopped_log_expired)"` + + RepoID int64 `xorm:"index"` + OwnerID int64 `xorm:"index"` + CommitSHA string `xorm:"index"` + IsForkPullRequest bool + + Token string `xorm:"-"` + TokenHash string `xorm:"UNIQUE"` // sha256 of token + TokenSalt string + TokenLastEight string `xorm:"index token_last_eight"` + + LogFilename string // file name of log + LogInStorage bool // read log from database or from storage + LogLength int64 // lines count + LogSize int64 // blob size + LogIndexes LogIndexes `xorm:"LONGBLOB"` // line number to offset + LogExpired bool `xorm:"index(stopped_log_expired)"` // files that are too old will be deleted + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated index"` +} + +var successfulTokenTaskCache *lru.Cache[string, any] + +func init() { + db.RegisterModel(new(ActionTask), func() error { + if setting.SuccessfulTokensCacheSize > 0 { + var err error + successfulTokenTaskCache, err = lru.New[string, any](setting.SuccessfulTokensCacheSize) + if err != nil { + return fmt.Errorf("unable to allocate Task cache: %v", err) + } + } else { + successfulTokenTaskCache = nil + } + return nil + }) +} + +func (task *ActionTask) Duration() time.Duration { + return calculateDuration(task.Started, task.Stopped, task.Status) +} + +func (task *ActionTask) IsStopped() bool { + return task.Stopped > 0 +} + +func (task *ActionTask) GetRunLink() string { + if task.Job == nil || task.Job.Run == nil { + return "" + } + return task.Job.Run.Link() +} + +func (task *ActionTask) GetCommitLink() string { + if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil { + return "" + } + return task.Job.Run.Repo.CommitLink(task.CommitSHA) +} + +func (task *ActionTask) GetRepoName() string { + if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil { + return "" + } + return task.Job.Run.Repo.FullName() +} + +func (task *ActionTask) GetRepoLink() string { + if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil { + return "" + } + return task.Job.Run.Repo.Link() +} + +func (task *ActionTask) LoadJob(ctx context.Context) error { + if task.Job == nil { + job, err := GetRunJobByID(ctx, task.JobID) + if err != nil { + return err + } + task.Job = job + } + return nil +} + +// LoadAttributes load Job Steps if not loaded +func (task *ActionTask) LoadAttributes(ctx context.Context) error { + if task == nil { + return nil + } + if err := task.LoadJob(ctx); err != nil { + return err + } + + if err := task.Job.LoadAttributes(ctx); err != nil { + return err + } + + if task.Steps == nil { // be careful, an empty slice (not nil) also means loaded + steps, err := GetTaskStepsByTaskID(ctx, task.ID) + if err != nil { + return err + } + task.Steps = steps + } + + return nil +} + +func (task *ActionTask) GenerateToken() (err error) { + task.Token, task.TokenSalt, task.TokenHash, task.TokenLastEight, err = generateSaltedToken() + return err +} + +func GetTaskByID(ctx context.Context, id int64) (*ActionTask, error) { + var task ActionTask + has, err := db.GetEngine(ctx).Where("id=?", id).Get(&task) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("task with id %d: %w", id, util.ErrNotExist) + } + + return &task, nil +} + +func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, error) { + errNotExist := fmt.Errorf("task with token %q: %w", token, util.ErrNotExist) + if token == "" { + return nil, errNotExist + } + // A token is defined as being SHA1 sum these are 40 hexadecimal bytes long + if len(token) != 40 { + return nil, errNotExist + } + for _, x := range []byte(token) { + if x < '0' || (x > '9' && x < 'a') || x > 'f' { + return nil, errNotExist + } + } + + lastEight := token[len(token)-8:] + + if id := getTaskIDFromCache(token); id > 0 { + task := &ActionTask{ + TokenLastEight: lastEight, + } + // Re-get the task from the db in case it has been deleted in the intervening period + has, err := db.GetEngine(ctx).ID(id).Get(task) + if err != nil { + return nil, err + } + if has { + return task, nil + } + successfulTokenTaskCache.Remove(token) + } + + var tasks []*ActionTask + err := db.GetEngine(ctx).Where("token_last_eight = ? AND status = ?", lastEight, StatusRunning).Find(&tasks) + if err != nil { + return nil, err + } else if len(tasks) == 0 { + return nil, errNotExist + } + + for _, t := range tasks { + tempHash := auth_model.HashToken(token, t.TokenSalt) + if subtle.ConstantTimeCompare([]byte(t.TokenHash), []byte(tempHash)) == 1 { + if successfulTokenTaskCache != nil { + successfulTokenTaskCache.Add(token, t.ID) + } + return t, nil + } + } + return nil, errNotExist +} + +func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { + ctx, commiter, err := db.TxContext(ctx) + if err != nil { + return nil, false, err + } + defer commiter.Close() + + e := db.GetEngine(ctx) + + jobCond := builder.NewCond() + if runner.RepoID != 0 { + jobCond = builder.Eq{"repo_id": runner.RepoID} + } else if runner.OwnerID != 0 { + jobCond = builder.In("repo_id", builder.Select("`repository`.id").From("repository"). + Join("INNER", "repo_unit", "`repository`.id = `repo_unit`.repo_id"). + Where(builder.Eq{"`repository`.owner_id": runner.OwnerID, "`repo_unit`.type": unit.TypeActions})) + } + if jobCond.IsValid() { + jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond)) + } + + var jobs []*ActionRunJob + if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { + return nil, false, err + } + + // TODO: a more efficient way to filter labels + var job *ActionRunJob + log.Trace("runner labels: %v", runner.AgentLabels) + for _, v := range jobs { + if isSubset(runner.AgentLabels, v.RunsOn) { + job = v + break + } + } + if job == nil { + return nil, false, nil + } + if err := job.LoadAttributes(ctx); err != nil { + return nil, false, err + } + + now := timeutil.TimeStampNow() + job.Attempt++ + job.Started = now + job.Status = StatusRunning + + task := &ActionTask{ + JobID: job.ID, + Attempt: job.Attempt, + RunnerID: runner.ID, + Started: now, + Status: StatusRunning, + RepoID: job.RepoID, + OwnerID: job.OwnerID, + CommitSHA: job.CommitSHA, + IsForkPullRequest: job.IsForkPullRequest, + } + if err := task.GenerateToken(); err != nil { + return nil, false, err + } + + var workflowJob *jobparser.Job + if gots, err := jobparser.Parse(job.WorkflowPayload); err != nil { + return nil, false, fmt.Errorf("parse workflow of job %d: %w", job.ID, err) + } else if len(gots) != 1 { + return nil, false, fmt.Errorf("workflow of job %d: not single workflow", job.ID) + } else { //nolint:revive + _, workflowJob = gots[0].Job() + } + + if _, err := e.Insert(task); err != nil { + return nil, false, err + } + + task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) + if err := UpdateTask(ctx, task, "log_filename"); err != nil { + return nil, false, err + } + + if len(workflowJob.Steps) > 0 { + steps := make([]*ActionTaskStep, len(workflowJob.Steps)) + for i, v := range workflowJob.Steps { + name, _ := util.SplitStringAtByteN(v.String(), 255) + steps[i] = &ActionTaskStep{ + Name: name, + TaskID: task.ID, + Index: int64(i), + RepoID: task.RepoID, + Status: StatusWaiting, + } + } + if _, err := e.Insert(steps); err != nil { + return nil, false, err + } + task.Steps = steps + } + + job.TaskID = task.ID + if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { + return nil, false, err + } else if n != 1 { + return nil, false, nil + } + + task.Job = job + + if err := commiter.Commit(); err != nil { + return nil, false, err + } + + return task, true, nil +} + +func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error { + sess := db.GetEngine(ctx).ID(task.ID) + if len(cols) > 0 { + sess.Cols(cols...) + } + _, err := sess.Update(task) + return err +} + +// UpdateTaskByState updates the task by the state. +// It will always update the task if the state is not final, even there is no change. +// So it will update ActionTask.Updated to avoid the task being judged as a zombie task. +func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionTask, error) { + stepStates := map[int64]*runnerv1.StepState{} + for _, v := range state.Steps { + stepStates[v.Id] = v + } + + ctx, commiter, err := db.TxContext(ctx) + if err != nil { + return nil, err + } + defer commiter.Close() + + e := db.GetEngine(ctx) + + task := &ActionTask{} + if has, err := e.ID(state.Id).Get(task); err != nil { + return nil, err + } else if !has { + return nil, util.ErrNotExist + } + + if task.Status.IsDone() { + // the state is final, do nothing + return task, nil + } + + // state.Result is not unspecified means the task is finished + if state.Result != runnerv1.Result_RESULT_UNSPECIFIED { + task.Status = Status(state.Result) + task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix()) + if err := UpdateTask(ctx, task, "status", "stopped"); err != nil { + return nil, err + } + if _, err := UpdateRunJob(ctx, &ActionRunJob{ + ID: task.JobID, + Status: task.Status, + Stopped: task.Stopped, + }, nil); err != nil { + return nil, err + } + } else { + // Force update ActionTask.Updated to avoid the task being judged as a zombie task + task.Updated = timeutil.TimeStampNow() + if err := UpdateTask(ctx, task, "updated"); err != nil { + return nil, err + } + } + + if err := task.LoadAttributes(ctx); err != nil { + return nil, err + } + + for _, step := range task.Steps { + var result runnerv1.Result + if v, ok := stepStates[step.Index]; ok { + result = v.Result + step.LogIndex = v.LogIndex + step.LogLength = v.LogLength + step.Started = convertTimestamp(v.StartedAt) + step.Stopped = convertTimestamp(v.StoppedAt) + } + if result != runnerv1.Result_RESULT_UNSPECIFIED { + step.Status = Status(result) + } else if step.Started != 0 { + step.Status = StatusRunning + } + if _, err := e.ID(step.ID).Update(step); err != nil { + return nil, err + } + } + + if err := commiter.Commit(); err != nil { + return nil, err + } + + return task, nil +} + +func StopTask(ctx context.Context, taskID int64, status Status) error { + if !status.IsDone() { + return fmt.Errorf("cannot stop task with status %v", status) + } + e := db.GetEngine(ctx) + + task := &ActionTask{} + if has, err := e.ID(taskID).Get(task); err != nil { + return err + } else if !has { + return util.ErrNotExist + } + if task.Status.IsDone() { + return nil + } + + now := timeutil.TimeStampNow() + task.Status = status + task.Stopped = now + if _, err := UpdateRunJob(ctx, &ActionRunJob{ + ID: task.JobID, + Status: task.Status, + Stopped: task.Stopped, + }, nil); err != nil { + return err + } + + if err := UpdateTask(ctx, task, "status", "stopped"); err != nil { + return err + } + + if err := task.LoadAttributes(ctx); err != nil { + return err + } + + for _, step := range task.Steps { + if !step.Status.IsDone() { + step.Status = status + if step.Started == 0 { + step.Started = now + } + step.Stopped = now + } + if _, err := e.ID(step.ID).Update(step); err != nil { + return err + } + } + + return nil +} + +func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) { + e := db.GetEngine(ctx) + + tasks := make([]*ActionTask, 0, limit) + // Check "stopped > 0" to avoid deleting tasks that are still running + return tasks, e.Where("stopped > 0 AND stopped < ? AND log_expired = ?", olderThan, false). + Limit(limit). + Find(&tasks) +} + +func isSubset(set, subset []string) bool { + m := make(container.Set[string], len(set)) + for _, v := range set { + m.Add(v) + } + + for _, v := range subset { + if !m.Contains(v) { + return false + } + } + return true +} + +func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp { + if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 { + return timeutil.TimeStamp(0) + } + return timeutil.TimeStamp(timestamp.AsTime().Unix()) +} + +func logFileName(repoFullName string, taskID int64) string { + ret := fmt.Sprintf("%s/%02x/%d.log", repoFullName, taskID%256, taskID) + + if setting.Actions.LogCompression.IsZstd() { + ret += ".zst" + } + + return ret +} + +func getTaskIDFromCache(token string) int64 { + if successfulTokenTaskCache == nil { + return 0 + } + tInterface, ok := successfulTokenTaskCache.Get(token) + if !ok { + return 0 + } + t, ok := tInterface.(int64) + if !ok { + return 0 + } + return t +} diff --git a/models/actions/task_list.go b/models/actions/task_list.go new file mode 100644 index 0000000..df4b43c --- /dev/null +++ b/models/actions/task_list.go @@ -0,0 +1,87 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/builder" +) + +type TaskList []*ActionTask + +func (tasks TaskList) GetJobIDs() []int64 { + return container.FilterSlice(tasks, func(t *ActionTask) (int64, bool) { + return t.JobID, t.JobID != 0 + }) +} + +func (tasks TaskList) LoadJobs(ctx context.Context) error { + jobIDs := tasks.GetJobIDs() + jobs := make(map[int64]*ActionRunJob, len(jobIDs)) + if err := db.GetEngine(ctx).In("id", jobIDs).Find(&jobs); err != nil { + return err + } + for _, t := range tasks { + if t.JobID > 0 && t.Job == nil { + t.Job = jobs[t.JobID] + } + } + + // TODO: Replace with "ActionJobList(maps.Values(jobs))" once available + var jobsList ActionJobList = make([]*ActionRunJob, 0, len(jobs)) + for _, j := range jobs { + jobsList = append(jobsList, j) + } + return jobsList.LoadAttributes(ctx, true) +} + +func (tasks TaskList) LoadAttributes(ctx context.Context) error { + return tasks.LoadJobs(ctx) +} + +type FindTaskOptions struct { + db.ListOptions + RepoID int64 + OwnerID int64 + CommitSHA string + Status Status + UpdatedBefore timeutil.TimeStamp + StartedBefore timeutil.TimeStamp + RunnerID int64 +} + +func (opts FindTaskOptions) ToConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + if opts.OwnerID > 0 { + cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) + } + if opts.CommitSHA != "" { + cond = cond.And(builder.Eq{"commit_sha": opts.CommitSHA}) + } + if opts.Status > StatusUnknown { + cond = cond.And(builder.Eq{"status": opts.Status}) + } + if opts.UpdatedBefore > 0 { + cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore}) + } + if opts.StartedBefore > 0 { + cond = cond.And(builder.Lt{"started": opts.StartedBefore}) + } + if opts.RunnerID > 0 { + cond = cond.And(builder.Eq{"runner_id": opts.RunnerID}) + } + return cond +} + +func (opts FindTaskOptions) ToOrders() string { + return "`id` DESC" +} diff --git a/models/actions/task_output.go b/models/actions/task_output.go new file mode 100644 index 0000000..eab5b93 --- /dev/null +++ b/models/actions/task_output.go @@ -0,0 +1,55 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" +) + +// ActionTaskOutput represents an output of ActionTask. +// So the outputs are bound to a task, that means when a completed job has been rerun, +// the outputs of the job will be reset because the task is new. +// It's by design, to avoid the outputs of the old task to be mixed with the new task. +type ActionTaskOutput struct { + ID int64 + TaskID int64 `xorm:"INDEX UNIQUE(task_id_output_key)"` + OutputKey string `xorm:"VARCHAR(255) UNIQUE(task_id_output_key)"` + OutputValue string `xorm:"MEDIUMTEXT"` +} + +func init() { + db.RegisterModel(new(ActionTaskOutput)) +} + +// FindTaskOutputByTaskID returns the outputs of the task. +func FindTaskOutputByTaskID(ctx context.Context, taskID int64) ([]*ActionTaskOutput, error) { + var outputs []*ActionTaskOutput + return outputs, db.GetEngine(ctx).Where("task_id=?", taskID).Find(&outputs) +} + +// FindTaskOutputKeyByTaskID returns the keys of the outputs of the task. +func FindTaskOutputKeyByTaskID(ctx context.Context, taskID int64) ([]string, error) { + var keys []string + return keys, db.GetEngine(ctx).Table(ActionTaskOutput{}).Where("task_id=?", taskID).Cols("output_key").Find(&keys) +} + +// InsertTaskOutputIfNotExist inserts a new task output if it does not exist. +func InsertTaskOutputIfNotExist(ctx context.Context, taskID int64, key, value string) error { + return db.WithTx(ctx, func(ctx context.Context) error { + sess := db.GetEngine(ctx) + if exist, err := sess.Exist(&ActionTaskOutput{TaskID: taskID, OutputKey: key}); err != nil { + return err + } else if exist { + return nil + } + _, err := sess.Insert(&ActionTaskOutput{ + TaskID: taskID, + OutputKey: key, + OutputValue: value, + }) + return err + }) +} diff --git a/models/actions/task_step.go b/models/actions/task_step.go new file mode 100644 index 0000000..3af1fe3 --- /dev/null +++ b/models/actions/task_step.go @@ -0,0 +1,41 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "time" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/timeutil" +) + +// ActionTaskStep represents a step of ActionTask +type ActionTaskStep struct { + ID int64 + Name string `xorm:"VARCHAR(255)"` + TaskID int64 `xorm:"index unique(task_index)"` + Index int64 `xorm:"index unique(task_index)"` + RepoID int64 `xorm:"index"` + Status Status `xorm:"index"` + LogIndex int64 + LogLength int64 + Started timeutil.TimeStamp + Stopped timeutil.TimeStamp + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +func (step *ActionTaskStep) Duration() time.Duration { + return calculateDuration(step.Started, step.Stopped, step.Status) +} + +func init() { + db.RegisterModel(new(ActionTaskStep)) +} + +func GetTaskStepsByTaskID(ctx context.Context, taskID int64) ([]*ActionTaskStep, error) { + var steps []*ActionTaskStep + return steps, db.GetEngine(ctx).Where("task_id=?", taskID).OrderBy("`index` ASC").Find(&steps) +} diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go new file mode 100644 index 0000000..d8df353 --- /dev/null +++ b/models/actions/tasks_version.go @@ -0,0 +1,105 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" +) + +// ActionTasksVersion +// If both ownerID and repoID is zero, its scope is global. +// If ownerID is not zero and repoID is zero, its scope is org (there is no user-level runner currently). +// If ownerID is zero and repoID is not zero, its scope is repo. +type ActionTasksVersion struct { + ID int64 `xorm:"pk autoincr"` + OwnerID int64 `xorm:"UNIQUE(owner_repo)"` + RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"` + Version int64 + CreatedUnix timeutil.TimeStamp `xorm:"created"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` +} + +func init() { + db.RegisterModel(new(ActionTasksVersion)) +} + +func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, error) { + var tasksVersion ActionTasksVersion + has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion) + if err != nil { + return 0, err + } else if !has { + return 0, nil + } + return tasksVersion.Version, err +} + +func insertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { + tasksVersion := &ActionTasksVersion{ + OwnerID: ownerID, + RepoID: repoID, + Version: 1, + } + if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil { + return nil, err + } + return tasksVersion, nil +} + +func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) error { + result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID) + if err != nil { + return err + } + affected, err := result.RowsAffected() + if err != nil { + return err + } + + if affected == 0 { + // if update sql does not affect any rows, the database may be broken, + // so re-insert the row of version data here. + if _, err := insertTasksVersion(ctx, ownerID, repoID); err != nil { + return err + } + } + + return nil +} + +func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { + ctx, commiter, err := db.TxContext(ctx) + if err != nil { + return err + } + defer commiter.Close() + + // 1. increase global + if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Global): %v", err) + return err + } + + // 2. increase owner + if ownerID > 0 { + if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Owner): %v", err) + return err + } + } + + // 3. increase repo + if repoID > 0 { + if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { + log.Error("IncreaseTasksVersionByScope(Repo): %v", err) + return err + } + } + + return commiter.Commit() +} diff --git a/models/actions/utils.go b/models/actions/utils.go new file mode 100644 index 0000000..1265794 --- /dev/null +++ b/models/actions/utils.go @@ -0,0 +1,84 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "io" + "time" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" +) + +func generateSaltedToken() (string, string, string, string, error) { + salt, err := util.CryptoRandomString(10) + if err != nil { + return "", "", "", "", err + } + buf, err := util.CryptoRandomBytes(20) + if err != nil { + return "", "", "", "", err + } + token := hex.EncodeToString(buf) + hash := auth_model.HashToken(token, salt) + return token, salt, hash, token[len(token)-8:], nil +} + +/* +LogIndexes is the index for mapping log line number to buffer offset. +Because it uses varint encoding, it is impossible to predict its size. +But we can make a simple estimate with an assumption that each log line has 200 byte, then: +| lines | file size | index size | +|-----------|---------------------|--------------------| +| 100 | 20 KiB(20000) | 258 B(258) | +| 1000 | 195 KiB(200000) | 2.9 KiB(2958) | +| 10000 | 1.9 MiB(2000000) | 34 KiB(34715) | +| 100000 | 19 MiB(20000000) | 386 KiB(394715) | +| 1000000 | 191 MiB(200000000) | 4.1 MiB(4323626) | +| 10000000 | 1.9 GiB(2000000000) | 47 MiB(49323626) | +| 100000000 | 19 GiB(20000000000) | 490 MiB(513424280) | +*/ +type LogIndexes []int64 + +func (indexes *LogIndexes) FromDB(b []byte) error { + reader := bytes.NewReader(b) + for { + v, err := binary.ReadVarint(reader) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("binary ReadVarint: %w", err) + } + *indexes = append(*indexes, v) + } +} + +func (indexes *LogIndexes) ToDB() ([]byte, error) { + buf, i := make([]byte, binary.MaxVarintLen64*len(*indexes)), 0 + for _, v := range *indexes { + n := binary.PutVarint(buf[i:], v) + i += n + } + return buf[:i], nil +} + +var timeSince = time.Since + +func calculateDuration(started, stopped timeutil.TimeStamp, status Status) time.Duration { + if started == 0 { + return 0 + } + s := started.AsTime() + if status.IsDone() { + return stopped.AsTime().Sub(s) + } + return timeSince(s).Truncate(time.Second) +} diff --git a/models/actions/utils_test.go b/models/actions/utils_test.go new file mode 100644 index 0000000..98c048d --- /dev/null +++ b/models/actions/utils_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "math" + "testing" + "time" + + "code.gitea.io/gitea/modules/timeutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLogIndexes_ToDB(t *testing.T) { + tests := []struct { + indexes LogIndexes + }{ + { + indexes: []int64{1, 2, 0, -1, -2, math.MaxInt64, math.MinInt64}, + }, + } + for _, tt := range tests { + t.Run("", func(t *testing.T) { + got, err := tt.indexes.ToDB() + require.NoError(t, err) + + indexes := LogIndexes{} + require.NoError(t, indexes.FromDB(got)) + + assert.Equal(t, tt.indexes, indexes) + }) + } +} + +func Test_calculateDuration(t *testing.T) { + oldTimeSince := timeSince + defer func() { + timeSince = oldTimeSince + }() + + timeSince = func(t time.Time) time.Duration { + return timeutil.TimeStamp(1000).AsTime().Sub(t) + } + type args struct { + started timeutil.TimeStamp + stopped timeutil.TimeStamp + status Status + } + tests := []struct { + name string + args args + want time.Duration + }{ + { + name: "unknown", + args: args{ + started: 0, + stopped: 0, + status: StatusUnknown, + }, + want: 0, + }, + { + name: "running", + args: args{ + started: 500, + stopped: 0, + status: StatusRunning, + }, + want: 500 * time.Second, + }, + { + name: "done", + args: args{ + started: 500, + stopped: 600, + status: StatusSuccess, + }, + want: 100 * time.Second, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, calculateDuration(tt.args.started, tt.args.stopped, tt.args.status), "calculateDuration(%v, %v, %v)", tt.args.started, tt.args.stopped, tt.args.status) + }) + } +} diff --git a/models/actions/variable.go b/models/actions/variable.go new file mode 100644 index 0000000..d0f917d --- /dev/null +++ b/models/actions/variable.go @@ -0,0 +1,139 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "strings" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/builder" +) + +// ActionVariable represents a variable that can be used in actions +// +// It can be: +// 1. global variable, OwnerID is 0 and RepoID is 0 +// 2. org/user level variable, OwnerID is org/user ID and RepoID is 0 +// 3. repo level variable, OwnerID is 0 and RepoID is repo ID +// +// Please note that it's not acceptable to have both OwnerID and RepoID to be non-zero, +// or it will be complicated to find variables belonging to a specific owner. +// For example, conditions like `OwnerID = 1` will also return variable {OwnerID: 1, RepoID: 1}, +// but it's a repo level variable, not an org/user level variable. +// To avoid this, make it clear with {OwnerID: 0, RepoID: 1} for repo level variables. +type ActionVariable struct { + ID int64 `xorm:"pk autoincr"` + OwnerID int64 `xorm:"UNIQUE(owner_repo_name)"` + RepoID int64 `xorm:"INDEX UNIQUE(owner_repo_name)"` + Name string `xorm:"UNIQUE(owner_repo_name) NOT NULL"` + Data string `xorm:"LONGTEXT NOT NULL"` + CreatedUnix timeutil.TimeStamp `xorm:"created NOT NULL"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` +} + +func init() { + db.RegisterModel(new(ActionVariable)) +} + +func InsertVariable(ctx context.Context, ownerID, repoID int64, name, data string) (*ActionVariable, error) { + if ownerID != 0 && repoID != 0 { + // It's trying to create a variable that belongs to a repository, but OwnerID has been set accidentally. + // Remove OwnerID to avoid confusion; it's not worth returning an error here. + ownerID = 0 + } + + variable := &ActionVariable{ + OwnerID: ownerID, + RepoID: repoID, + Name: strings.ToUpper(name), + Data: data, + } + return variable, db.Insert(ctx, variable) +} + +type FindVariablesOpts struct { + db.ListOptions + RepoID int64 + OwnerID int64 // it will be ignored if RepoID is set + Name string +} + +func (opts FindVariablesOpts) ToConds() builder.Cond { + cond := builder.NewCond() + // Since we now support instance-level variables, + // there is no need to check for null values for `owner_id` and `repo_id` + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + if opts.RepoID != 0 { // if RepoID is set + // ignore OwnerID and treat it as 0 + cond = cond.And(builder.Eq{"owner_id": 0}) + } else { + cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) + } + + if opts.Name != "" { + cond = cond.And(builder.Eq{"name": strings.ToUpper(opts.Name)}) + } + return cond +} + +func FindVariables(ctx context.Context, opts FindVariablesOpts) ([]*ActionVariable, error) { + return db.Find[ActionVariable](ctx, opts) +} + +func UpdateVariable(ctx context.Context, variable *ActionVariable) (bool, error) { + count, err := db.GetEngine(ctx).ID(variable.ID).Cols("name", "data"). + Update(&ActionVariable{ + Name: variable.Name, + Data: variable.Data, + }) + return count != 0, err +} + +func DeleteVariable(ctx context.Context, id int64) error { + if _, err := db.DeleteByID[ActionVariable](ctx, id); err != nil { + return err + } + return nil +} + +func GetVariablesOfRun(ctx context.Context, run *ActionRun) (map[string]string, error) { + variables := map[string]string{} + + if err := run.LoadRepo(ctx); err != nil { + log.Error("LoadRepo: %v", err) + return nil, err + } + + // Global + globalVariables, err := db.Find[ActionVariable](ctx, FindVariablesOpts{}) + if err != nil { + log.Error("find global variables: %v", err) + return nil, err + } + + // Org / User level + ownerVariables, err := db.Find[ActionVariable](ctx, FindVariablesOpts{OwnerID: run.Repo.OwnerID}) + if err != nil { + log.Error("find variables of org: %d, error: %v", run.Repo.OwnerID, err) + return nil, err + } + + // Repo level + repoVariables, err := db.Find[ActionVariable](ctx, FindVariablesOpts{RepoID: run.RepoID}) + if err != nil { + log.Error("find variables of repo: %d, error: %v", run.RepoID, err) + return nil, err + } + + // Level precedence: Repo > Org / User > Global + for _, v := range append(globalVariables, append(ownerVariables, repoVariables...)...) { + variables[v.Name] = v.Data + } + + return variables, nil +} |