summaryrefslogtreecommitdiffstats
path: root/internal/app/run
diff options
context:
space:
mode:
Diffstat (limited to 'internal/app/run')
-rw-r--r--internal/app/run/runner.go260
-rw-r--r--internal/app/run/runner_test.go37
-rw-r--r--internal/app/run/workflow.go54
-rw-r--r--internal/app/run/workflow_test.go74
4 files changed, 425 insertions, 0 deletions
diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go
new file mode 100644
index 0000000..e8654b6
--- /dev/null
+++ b/internal/app/run/runner.go
@@ -0,0 +1,260 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package run
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "connectrpc.com/connect"
+ "github.com/docker/docker/api/types/container"
+ "github.com/nektos/act/pkg/artifactcache"
+ "github.com/nektos/act/pkg/common"
+ "github.com/nektos/act/pkg/model"
+ "github.com/nektos/act/pkg/runner"
+ log "github.com/sirupsen/logrus"
+
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/labels"
+ "gitea.com/gitea/act_runner/internal/pkg/report"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+)
+
+// Runner runs the pipeline.
+type Runner struct {
+ name string
+
+ cfg *config.Config
+
+ client client.Client
+ labels labels.Labels
+ envs map[string]string
+
+ runningTasks sync.Map
+}
+
+type RunnerInterface interface {
+ Run(ctx context.Context, task *runnerv1.Task) error
+}
+
+func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner {
+ ls := labels.Labels{}
+ for _, v := range reg.Labels {
+ if l, err := labels.Parse(v); err == nil {
+ ls = append(ls, l)
+ }
+ }
+
+ if cfg.Runner.Envs == nil {
+ cfg.Runner.Envs = make(map[string]string, 10)
+ }
+
+ cfg.Runner.Envs["GITHUB_SERVER_URL"] = reg.Address
+
+ envs := make(map[string]string, len(cfg.Runner.Envs))
+ for k, v := range cfg.Runner.Envs {
+ envs[k] = v
+ }
+ if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled {
+ if cfg.Cache.ExternalServer != "" {
+ envs["ACTIONS_CACHE_URL"] = cfg.Cache.ExternalServer
+ } else {
+ cacheHandler, err := artifactcache.StartHandler(
+ cfg.Cache.Dir,
+ cfg.Cache.Host,
+ cfg.Cache.Port,
+ log.StandardLogger().WithField("module", "cache_request"),
+ )
+ if err != nil {
+ log.Errorf("cannot init cache server, it will be disabled: %v", err)
+ // go on
+ } else {
+ envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/"
+ }
+ }
+ }
+
+ // set artifact gitea api
+ artifactGiteaAPI := strings.TrimSuffix(cli.Address(), "/") + "/api/actions_pipeline/"
+ envs["ACTIONS_RUNTIME_URL"] = artifactGiteaAPI
+ envs["ACTIONS_RESULTS_URL"] = strings.TrimSuffix(cli.Address(), "/")
+
+ // Set specific environments to distinguish between Gitea and GitHub
+ envs["GITEA_ACTIONS"] = "true"
+ envs["GITEA_ACTIONS_RUNNER_VERSION"] = ver.Version()
+
+ return &Runner{
+ name: reg.Name,
+ cfg: cfg,
+ client: cli,
+ labels: ls,
+ envs: envs,
+ }
+}
+
+func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
+ if _, ok := r.runningTasks.Load(task.Id); ok {
+ return fmt.Errorf("task %d is already running", task.Id)
+ }
+ r.runningTasks.Store(task.Id, struct{}{})
+ defer r.runningTasks.Delete(task.Id)
+
+ ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
+ defer cancel()
+ reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval)
+ var runErr error
+ defer func() {
+ lastWords := ""
+ if runErr != nil {
+ lastWords = runErr.Error()
+ }
+ _ = reporter.Close(lastWords)
+ }()
+ reporter.RunDaemon()
+ runErr = r.run(ctx, task, reporter)
+
+ return nil
+}
+
+func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.Reporter) (err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ err = fmt.Errorf("panic: %v", r)
+ }
+ }()
+
+ reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue())
+
+ workflow, jobID, err := generateWorkflow(task)
+ if err != nil {
+ return err
+ }
+
+ plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
+ if err != nil {
+ return err
+ }
+ job := workflow.GetJob(jobID)
+ reporter.ResetSteps(len(job.Steps))
+
+ taskContext := task.Context.Fields
+
+ log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(),
+ taskContext["gitea_default_actions_url"].GetStringValue(),
+ r.client.Address())
+
+ preset := &model.GithubContext{
+ Event: taskContext["event"].GetStructValue().AsMap(),
+ RunID: taskContext["run_id"].GetStringValue(),
+ RunNumber: taskContext["run_number"].GetStringValue(),
+ Actor: taskContext["actor"].GetStringValue(),
+ Repository: taskContext["repository"].GetStringValue(),
+ EventName: taskContext["event_name"].GetStringValue(),
+ Sha: taskContext["sha"].GetStringValue(),
+ Ref: taskContext["ref"].GetStringValue(),
+ RefName: taskContext["ref_name"].GetStringValue(),
+ RefType: taskContext["ref_type"].GetStringValue(),
+ HeadRef: taskContext["head_ref"].GetStringValue(),
+ BaseRef: taskContext["base_ref"].GetStringValue(),
+ Token: taskContext["token"].GetStringValue(),
+ RepositoryOwner: taskContext["repository_owner"].GetStringValue(),
+ RetentionDays: taskContext["retention_days"].GetStringValue(),
+ }
+ if t := task.Secrets["GITEA_TOKEN"]; t != "" {
+ preset.Token = t
+ } else if t := task.Secrets["GITHUB_TOKEN"]; t != "" {
+ preset.Token = t
+ }
+
+ giteaRuntimeToken := taskContext["gitea_runtime_token"].GetStringValue()
+ if giteaRuntimeToken == "" {
+ // use task token to action api token for previous Gitea Server Versions
+ giteaRuntimeToken = preset.Token
+ }
+ r.envs["ACTIONS_RUNTIME_TOKEN"] = giteaRuntimeToken
+
+ eventJSON, err := json.Marshal(preset.Event)
+ if err != nil {
+ return err
+ }
+
+ maxLifetime := 3 * time.Hour
+ if deadline, ok := ctx.Deadline(); ok {
+ maxLifetime = time.Until(deadline)
+ }
+
+ var inputs map[string]string
+ if preset.EventName == "workflow_dispatch" {
+ if inputsRaw, ok := preset.Event["inputs"]; ok {
+ inputs, _ = inputsRaw.(map[string]string)
+ }
+ }
+
+ runnerConfig := &runner.Config{
+ // On Linux, Workdir will be like "/<parent_directory>/<owner>/<repo>"
+ // On Windows, Workdir will be like "\<parent_directory>\<owner>\<repo>"
+ Workdir: filepath.FromSlash(filepath.Clean(fmt.Sprintf("/%s/%s", r.cfg.Container.WorkdirParent, preset.Repository))),
+ BindWorkdir: false,
+ ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent),
+
+ ReuseContainers: false,
+ ForcePull: r.cfg.Container.ForcePull,
+ ForceRebuild: false,
+ LogOutput: true,
+ JSONLogger: false,
+ Env: r.envs,
+ Secrets: task.Secrets,
+ GitHubInstance: strings.TrimSuffix(r.client.Address(), "/"),
+ AutoRemove: true,
+ NoSkipCheckout: true,
+ PresetGitHubContext: preset,
+ EventJSON: string(eventJSON),
+ ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id),
+ ContainerMaxLifetime: maxLifetime,
+ ContainerNetworkMode: container.NetworkMode(r.cfg.Container.Network),
+ ContainerNetworkEnableIPv6: r.cfg.Container.EnableIPv6,
+ ContainerOptions: r.cfg.Container.Options,
+ ContainerDaemonSocket: r.cfg.Container.DockerHost,
+ Privileged: r.cfg.Container.Privileged,
+ DefaultActionInstance: taskContext["gitea_default_actions_url"].GetStringValue(),
+ PlatformPicker: r.labels.PickPlatform,
+ Vars: task.Vars,
+ ValidVolumes: r.cfg.Container.ValidVolumes,
+ InsecureSkipTLS: r.cfg.Runner.Insecure,
+ Inputs: inputs,
+ }
+
+ rr, err := runner.New(runnerConfig)
+ if err != nil {
+ return err
+ }
+ executor := rr.NewPlanExecutor(plan)
+
+ reporter.Logf("workflow prepared")
+
+ // add logger recorders
+ ctx = common.WithLoggerHook(ctx, reporter)
+
+ execErr := executor(ctx)
+ reporter.SetOutputs(job.Outputs)
+ return execErr
+}
+
+func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) {
+ return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{
+ Version: ver.Version(),
+ Labels: labels,
+ }))
+}
+
+func (r *Runner) Update(ctx context.Context, labels labels.Labels) {
+ r.labels = labels
+}
diff --git a/internal/app/run/runner_test.go b/internal/app/run/runner_test.go
new file mode 100644
index 0000000..0145c70
--- /dev/null
+++ b/internal/app/run/runner_test.go
@@ -0,0 +1,37 @@
+package run
+
+import (
+ "context"
+ "testing"
+
+ "gitea.com/gitea/act_runner/internal/pkg/labels"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestLabelUpdate(t *testing.T) {
+ ctx := context.Background()
+ ls := labels.Labels{}
+
+ initialLabel, err := labels.Parse("testlabel:docker://alpine")
+ assert.NoError(t, err)
+ ls = append(ls, initialLabel)
+
+ newLs := labels.Labels{}
+
+ newLabel, err := labels.Parse("next label:host")
+ assert.NoError(t, err)
+ newLs = append(newLs, initialLabel)
+ newLs = append(newLs, newLabel)
+
+ runner := Runner{
+ labels: ls,
+ }
+
+ assert.Contains(t, runner.labels, initialLabel)
+ assert.NotContains(t, runner.labels, newLabel)
+
+ runner.Update(ctx, newLs)
+
+ assert.Contains(t, runner.labels, initialLabel)
+ assert.Contains(t, runner.labels, newLabel)
+}
diff --git a/internal/app/run/workflow.go b/internal/app/run/workflow.go
new file mode 100644
index 0000000..a6fbb71
--- /dev/null
+++ b/internal/app/run/workflow.go
@@ -0,0 +1,54 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package run
+
+import (
+ "bytes"
+ "fmt"
+ "sort"
+ "strings"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "github.com/nektos/act/pkg/model"
+ "gopkg.in/yaml.v3"
+)
+
+func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) {
+ workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
+ if err != nil {
+ return nil, "", err
+ }
+
+ jobIDs := workflow.GetJobIDs()
+ if len(jobIDs) != 1 {
+ return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs)
+ }
+ jobID := jobIDs[0]
+
+ needJobIDs := make([]string, 0, len(task.Needs))
+ for id, need := range task.Needs {
+ needJobIDs = append(needJobIDs, id)
+ needJob := &model.Job{
+ Outputs: need.Outputs,
+ Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")),
+ }
+ workflow.Jobs[id] = needJob
+ }
+ sort.Strings(needJobIDs)
+
+ rawNeeds := yaml.Node{
+ Kind: yaml.SequenceNode,
+ Content: make([]*yaml.Node, 0, len(needJobIDs)),
+ }
+ for _, id := range needJobIDs {
+ rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{
+ Kind: yaml.ScalarNode,
+ Value: id,
+ })
+ }
+
+ workflow.Jobs[jobID].RawNeeds = rawNeeds
+
+ return workflow, jobID, nil
+}
diff --git a/internal/app/run/workflow_test.go b/internal/app/run/workflow_test.go
new file mode 100644
index 0000000..c7598db
--- /dev/null
+++ b/internal/app/run/workflow_test.go
@@ -0,0 +1,74 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package run
+
+import (
+ "testing"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "github.com/nektos/act/pkg/model"
+ "github.com/stretchr/testify/require"
+ "gotest.tools/v3/assert"
+)
+
+func Test_generateWorkflow(t *testing.T) {
+ type args struct {
+ task *runnerv1.Task
+ }
+ tests := []struct {
+ name string
+ args args
+ assert func(t *testing.T, wf *model.Workflow)
+ want1 string
+ wantErr bool
+ }{
+ {
+ name: "has needs",
+ args: args{
+ task: &runnerv1.Task{
+ WorkflowPayload: []byte(`
+name: Build and deploy
+on: push
+
+jobs:
+ job9:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - run: ./deploy --build ${{ needs.job1.outputs.output1 }}
+ - run: ./deploy --build ${{ needs.job2.outputs.output2 }}
+`),
+ Needs: map[string]*runnerv1.TaskNeed{
+ "job1": {
+ Outputs: map[string]string{
+ "output1": "output1 value",
+ },
+ Result: runnerv1.Result_RESULT_SUCCESS,
+ },
+ "job2": {
+ Outputs: map[string]string{
+ "output2": "output2 value",
+ },
+ Result: runnerv1.Result_RESULT_SUCCESS,
+ },
+ },
+ },
+ },
+ assert: func(t *testing.T, wf *model.Workflow) {
+ assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"})
+ },
+ want1: "job9",
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, got1, err := generateWorkflow(tt.args.task)
+ require.NoError(t, err)
+ tt.assert(t, got)
+ assert.Equal(t, got1, tt.want1)
+ })
+ }
+}