diff options
author | Daniel Baumann <daniel@debian.org> | 2024-10-20 22:50:50 +0200 |
---|---|---|
committer | Daniel Baumann <daniel@debian.org> | 2024-10-20 22:50:50 +0200 |
commit | 9fa26b7837ed8e6679b7e6115425cab6ecbc9a8a (patch) | |
tree | c5b6f218ae267153042529217fdabeac4849ca1e /internal/app/run | |
parent | Initial commit. (diff) | |
download | forgejo-runner-debian.tar.xz forgejo-runner-debian.zip |
Adding upstream version 3.5.1.HEADupstream/3.5.1upstreamdebian
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'internal/app/run')
-rw-r--r-- | internal/app/run/runner.go | 260 | ||||
-rw-r--r-- | internal/app/run/runner_test.go | 37 | ||||
-rw-r--r-- | internal/app/run/workflow.go | 54 | ||||
-rw-r--r-- | internal/app/run/workflow_test.go | 74 |
4 files changed, 425 insertions, 0 deletions
diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go new file mode 100644 index 0000000..e8654b6 --- /dev/null +++ b/internal/app/run/runner.go @@ -0,0 +1,260 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "context" + "encoding/json" + "fmt" + "path/filepath" + "strings" + "sync" + "time" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" + "github.com/docker/docker/api/types/container" + "github.com/nektos/act/pkg/artifactcache" + "github.com/nektos/act/pkg/common" + "github.com/nektos/act/pkg/model" + "github.com/nektos/act/pkg/runner" + log "github.com/sirupsen/logrus" + + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/report" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +// Runner runs the pipeline. +type Runner struct { + name string + + cfg *config.Config + + client client.Client + labels labels.Labels + envs map[string]string + + runningTasks sync.Map +} + +type RunnerInterface interface { + Run(ctx context.Context, task *runnerv1.Task) error +} + +func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner { + ls := labels.Labels{} + for _, v := range reg.Labels { + if l, err := labels.Parse(v); err == nil { + ls = append(ls, l) + } + } + + if cfg.Runner.Envs == nil { + cfg.Runner.Envs = make(map[string]string, 10) + } + + cfg.Runner.Envs["GITHUB_SERVER_URL"] = reg.Address + + envs := make(map[string]string, len(cfg.Runner.Envs)) + for k, v := range cfg.Runner.Envs { + envs[k] = v + } + if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled { + if cfg.Cache.ExternalServer != "" { + envs["ACTIONS_CACHE_URL"] = cfg.Cache.ExternalServer + } else { + cacheHandler, err := artifactcache.StartHandler( + cfg.Cache.Dir, + cfg.Cache.Host, + cfg.Cache.Port, + log.StandardLogger().WithField("module", "cache_request"), + ) + if err != nil { + log.Errorf("cannot init cache server, it will be disabled: %v", err) + // go on + } else { + envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/" + } + } + } + + // set artifact gitea api + artifactGiteaAPI := strings.TrimSuffix(cli.Address(), "/") + "/api/actions_pipeline/" + envs["ACTIONS_RUNTIME_URL"] = artifactGiteaAPI + envs["ACTIONS_RESULTS_URL"] = strings.TrimSuffix(cli.Address(), "/") + + // Set specific environments to distinguish between Gitea and GitHub + envs["GITEA_ACTIONS"] = "true" + envs["GITEA_ACTIONS_RUNNER_VERSION"] = ver.Version() + + return &Runner{ + name: reg.Name, + cfg: cfg, + client: cli, + labels: ls, + envs: envs, + } +} + +func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { + if _, ok := r.runningTasks.Load(task.Id); ok { + return fmt.Errorf("task %d is already running", task.Id) + } + r.runningTasks.Store(task.Id, struct{}{}) + defer r.runningTasks.Delete(task.Id) + + ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout) + defer cancel() + reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval) + var runErr error + defer func() { + lastWords := "" + if runErr != nil { + lastWords = runErr.Error() + } + _ = reporter.Close(lastWords) + }() + reporter.RunDaemon() + runErr = r.run(ctx, task, reporter) + + return nil +} + +func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.Reporter) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue()) + + workflow, jobID, err := generateWorkflow(task) + if err != nil { + return err + } + + plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID) + if err != nil { + return err + } + job := workflow.GetJob(jobID) + reporter.ResetSteps(len(job.Steps)) + + taskContext := task.Context.Fields + + log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(), + taskContext["gitea_default_actions_url"].GetStringValue(), + r.client.Address()) + + preset := &model.GithubContext{ + Event: taskContext["event"].GetStructValue().AsMap(), + RunID: taskContext["run_id"].GetStringValue(), + RunNumber: taskContext["run_number"].GetStringValue(), + Actor: taskContext["actor"].GetStringValue(), + Repository: taskContext["repository"].GetStringValue(), + EventName: taskContext["event_name"].GetStringValue(), + Sha: taskContext["sha"].GetStringValue(), + Ref: taskContext["ref"].GetStringValue(), + RefName: taskContext["ref_name"].GetStringValue(), + RefType: taskContext["ref_type"].GetStringValue(), + HeadRef: taskContext["head_ref"].GetStringValue(), + BaseRef: taskContext["base_ref"].GetStringValue(), + Token: taskContext["token"].GetStringValue(), + RepositoryOwner: taskContext["repository_owner"].GetStringValue(), + RetentionDays: taskContext["retention_days"].GetStringValue(), + } + if t := task.Secrets["GITEA_TOKEN"]; t != "" { + preset.Token = t + } else if t := task.Secrets["GITHUB_TOKEN"]; t != "" { + preset.Token = t + } + + giteaRuntimeToken := taskContext["gitea_runtime_token"].GetStringValue() + if giteaRuntimeToken == "" { + // use task token to action api token for previous Gitea Server Versions + giteaRuntimeToken = preset.Token + } + r.envs["ACTIONS_RUNTIME_TOKEN"] = giteaRuntimeToken + + eventJSON, err := json.Marshal(preset.Event) + if err != nil { + return err + } + + maxLifetime := 3 * time.Hour + if deadline, ok := ctx.Deadline(); ok { + maxLifetime = time.Until(deadline) + } + + var inputs map[string]string + if preset.EventName == "workflow_dispatch" { + if inputsRaw, ok := preset.Event["inputs"]; ok { + inputs, _ = inputsRaw.(map[string]string) + } + } + + runnerConfig := &runner.Config{ + // On Linux, Workdir will be like "/<parent_directory>/<owner>/<repo>" + // On Windows, Workdir will be like "\<parent_directory>\<owner>\<repo>" + Workdir: filepath.FromSlash(filepath.Clean(fmt.Sprintf("/%s/%s", r.cfg.Container.WorkdirParent, preset.Repository))), + BindWorkdir: false, + ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent), + + ReuseContainers: false, + ForcePull: r.cfg.Container.ForcePull, + ForceRebuild: false, + LogOutput: true, + JSONLogger: false, + Env: r.envs, + Secrets: task.Secrets, + GitHubInstance: strings.TrimSuffix(r.client.Address(), "/"), + AutoRemove: true, + NoSkipCheckout: true, + PresetGitHubContext: preset, + EventJSON: string(eventJSON), + ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id), + ContainerMaxLifetime: maxLifetime, + ContainerNetworkMode: container.NetworkMode(r.cfg.Container.Network), + ContainerNetworkEnableIPv6: r.cfg.Container.EnableIPv6, + ContainerOptions: r.cfg.Container.Options, + ContainerDaemonSocket: r.cfg.Container.DockerHost, + Privileged: r.cfg.Container.Privileged, + DefaultActionInstance: taskContext["gitea_default_actions_url"].GetStringValue(), + PlatformPicker: r.labels.PickPlatform, + Vars: task.Vars, + ValidVolumes: r.cfg.Container.ValidVolumes, + InsecureSkipTLS: r.cfg.Runner.Insecure, + Inputs: inputs, + } + + rr, err := runner.New(runnerConfig) + if err != nil { + return err + } + executor := rr.NewPlanExecutor(plan) + + reporter.Logf("workflow prepared") + + // add logger recorders + ctx = common.WithLoggerHook(ctx, reporter) + + execErr := executor(ctx) + reporter.SetOutputs(job.Outputs) + return execErr +} + +func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) { + return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{ + Version: ver.Version(), + Labels: labels, + })) +} + +func (r *Runner) Update(ctx context.Context, labels labels.Labels) { + r.labels = labels +} diff --git a/internal/app/run/runner_test.go b/internal/app/run/runner_test.go new file mode 100644 index 0000000..0145c70 --- /dev/null +++ b/internal/app/run/runner_test.go @@ -0,0 +1,37 @@ +package run + +import ( + "context" + "testing" + + "gitea.com/gitea/act_runner/internal/pkg/labels" + "github.com/stretchr/testify/assert" +) + +func TestLabelUpdate(t *testing.T) { + ctx := context.Background() + ls := labels.Labels{} + + initialLabel, err := labels.Parse("testlabel:docker://alpine") + assert.NoError(t, err) + ls = append(ls, initialLabel) + + newLs := labels.Labels{} + + newLabel, err := labels.Parse("next label:host") + assert.NoError(t, err) + newLs = append(newLs, initialLabel) + newLs = append(newLs, newLabel) + + runner := Runner{ + labels: ls, + } + + assert.Contains(t, runner.labels, initialLabel) + assert.NotContains(t, runner.labels, newLabel) + + runner.Update(ctx, newLs) + + assert.Contains(t, runner.labels, initialLabel) + assert.Contains(t, runner.labels, newLabel) +} diff --git a/internal/app/run/workflow.go b/internal/app/run/workflow.go new file mode 100644 index 0000000..a6fbb71 --- /dev/null +++ b/internal/app/run/workflow.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "bytes" + "fmt" + "sort" + "strings" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/nektos/act/pkg/model" + "gopkg.in/yaml.v3" +) + +func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) { + workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload)) + if err != nil { + return nil, "", err + } + + jobIDs := workflow.GetJobIDs() + if len(jobIDs) != 1 { + return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs) + } + jobID := jobIDs[0] + + needJobIDs := make([]string, 0, len(task.Needs)) + for id, need := range task.Needs { + needJobIDs = append(needJobIDs, id) + needJob := &model.Job{ + Outputs: need.Outputs, + Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")), + } + workflow.Jobs[id] = needJob + } + sort.Strings(needJobIDs) + + rawNeeds := yaml.Node{ + Kind: yaml.SequenceNode, + Content: make([]*yaml.Node, 0, len(needJobIDs)), + } + for _, id := range needJobIDs { + rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{ + Kind: yaml.ScalarNode, + Value: id, + }) + } + + workflow.Jobs[jobID].RawNeeds = rawNeeds + + return workflow, jobID, nil +} diff --git a/internal/app/run/workflow_test.go b/internal/app/run/workflow_test.go new file mode 100644 index 0000000..c7598db --- /dev/null +++ b/internal/app/run/workflow_test.go @@ -0,0 +1,74 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "testing" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/nektos/act/pkg/model" + "github.com/stretchr/testify/require" + "gotest.tools/v3/assert" +) + +func Test_generateWorkflow(t *testing.T) { + type args struct { + task *runnerv1.Task + } + tests := []struct { + name string + args args + assert func(t *testing.T, wf *model.Workflow) + want1 string + wantErr bool + }{ + { + name: "has needs", + args: args{ + task: &runnerv1.Task{ + WorkflowPayload: []byte(` +name: Build and deploy +on: push + +jobs: + job9: + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: ./deploy --build ${{ needs.job1.outputs.output1 }} + - run: ./deploy --build ${{ needs.job2.outputs.output2 }} +`), + Needs: map[string]*runnerv1.TaskNeed{ + "job1": { + Outputs: map[string]string{ + "output1": "output1 value", + }, + Result: runnerv1.Result_RESULT_SUCCESS, + }, + "job2": { + Outputs: map[string]string{ + "output2": "output2 value", + }, + Result: runnerv1.Result_RESULT_SUCCESS, + }, + }, + }, + }, + assert: func(t *testing.T, wf *model.Workflow) { + assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"}) + }, + want1: "job9", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1, err := generateWorkflow(tt.args.task) + require.NoError(t, err) + tt.assert(t, got) + assert.Equal(t, got1, tt.want1) + }) + } +} |