diff options
author | cobak78 <cobak78@noreply.code.forgejo.org> | 2025-01-15 13:29:07 +0100 |
---|---|---|
committer | earl-warren <earl-warren@noreply.code.forgejo.org> | 2025-01-15 13:29:07 +0100 |
commit | ba78c113265a9111dc250c62bcf812c4fe15dbc4 (patch) | |
tree | c9ab30c49d3bb4af4e4442e29cdf4ea48d4319d9 | |
parent | Merge pull request 'fix: example uses code.forgejo.org/oci' (#434) from earl-... (diff) | |
download | forgejo-runner-ba78c113265a9111dc250c62bcf812c4fe15dbc4.tar.xz forgejo-runner-ba78c113265a9111dc250c62bcf812c4fe15dbc4.zip |
New "one shot" type of execution by adding a new command called one-job. (#423)
As discussed in https://code.forgejo.org/forgejo/runner/issues/422, this PR adds a new one-shot type of execution that is compatible with autoscaling features and other job types.
Co-authored-by: jaime merino <jaime.merino_mora@mail.schwarz>
Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/423
Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org>
Co-authored-by: cobak78 <cobak78@noreply.code.forgejo.org>
Co-committed-by: cobak78 <cobak78@noreply.code.forgejo.org>
-rw-r--r-- | RELEASE-NOTES.md | 1 | ||||
-rw-r--r-- | internal/app/cmd/cmd.go | 9 | ||||
-rw-r--r-- | internal/app/cmd/job.go | 117 | ||||
-rw-r--r-- | internal/app/job/job.go | 94 | ||||
-rw-r--r-- | internal/app/job/job_test.go | 169 |
5 files changed, 390 insertions, 0 deletions
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 76b2614..cea310d 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -3,6 +3,7 @@ ## 6.1.0 * [Add `[container].force_rebuild` config option](https://code.forgejo.org/forgejo/runner/pulls/406) to force rebuilding of local docker images, even if they are already present. +* [Add new `one-job` command](https://code.forgejo.org/forgejo/runner/pulls/423) that starts a previously configured runner, executes one task if one is available, and exits. See the motivation [here](https://code.forgejo.org/forgejo/runner/issues/422). ## 6.0.1 diff --git a/internal/app/cmd/cmd.go b/internal/app/cmd/cmd.go index 48341dc..f8987db 100644 --- a/internal/app/cmd/cmd.go +++ b/internal/app/cmd/cmd.go @@ -52,6 +52,15 @@ func Execute(ctx context.Context) { } rootCmd.AddCommand(daemonCmd) + // ./act_runner job + jobCmd := &cobra.Command{ + Use: "one-job", + Short: "Run only one job", + Args: cobra.MaximumNArgs(1), + RunE: runJob(ctx, &configFile), + } + rootCmd.AddCommand(jobCmd) + // ./act_runner exec rootCmd.AddCommand(loadExecCmd(ctx)) diff --git a/internal/app/cmd/job.go b/internal/app/cmd/job.go new file mode 100644 index 0000000..a4ded06 --- /dev/null +++ b/internal/app/cmd/job.go @@ -0,0 +1,117 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package cmd + +import ( + "context" + "fmt" + "os" + "strings" + + "connectrpc.com/connect" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "gitea.com/gitea/act_runner/internal/app/job" + "gitea.com/gitea/act_runner/internal/app/run" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/envcheck" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +func runJob(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + cfg, err := config.LoadDefault(*configFile) + if err != nil { + return fmt.Errorf("invalid configuration: %w", err) + } + + initLogging(cfg) + log.Infoln("Starting job") + + reg, err := config.LoadRegistration(cfg.Runner.File) + if os.IsNotExist(err) { + log.Error("registration file not found, please register the runner first") + return err + } else if err != nil { + return fmt.Errorf("failed to load registration file: %w", err) + } + + lbls := reg.Labels + if len(cfg.Runner.Labels) > 0 { + lbls = cfg.Runner.Labels + } + + ls := labels.Labels{} + for _, l := range lbls { + label, err := labels.Parse(l) + if err != nil { + log.WithError(err).Warnf("ignored invalid label %q", l) + continue + } + ls = append(ls, label) + } + if len(ls) == 0 { + log.Warn("no labels configured, runner may not be able to pick up jobs") + } + + if ls.RequireDocker() { + dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost) + if err != nil { + return err + } + if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil { + return err + } + // if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath + os.Setenv("DOCKER_HOST", dockerSocketPath) + // empty cfg.Container.DockerHost means act_runner need to find an available docker host 
automatically + // and assign the path to cfg.Container.DockerHost + if cfg.Container.DockerHost == "" { + cfg.Container.DockerHost = dockerSocketPath + } + // check the scheme, if the scheme is not npipe or unix + // set cfg.Container.DockerHost to "-" because it can't be mounted to the job container + if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 { + scheme := cfg.Container.DockerHost[:protoIndex] + if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") { + cfg.Container.DockerHost = "-" + } + } + } + + cli := client.New( + reg.Address, + cfg.Runner.Insecure, + reg.UUID, + reg.Token, + ver.Version(), + ) + + runner := run.NewRunner(cfg, reg, cli) + // declare the labels of the runner before fetching tasks + resp, err := runner.Declare(ctx, ls.Names()) + if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented { + // Gitea instance is older version. skip declare step. + log.Warn("Because the Forgejo instance is an old version, skipping declaring the labels and version.") + } else if err != nil { + log.WithError(err).Error("fail to invoke Declare") + return err + } else { + log.Infof("runner: %s, with version: %s, with labels: %v, declared successfully", + resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels) + // if declared successfully, override the labels in the.runner file with valid labels in the config file (if specified) + runner.Update(ctx, ls) + reg.Labels = ls.ToStrings() + if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil { + return fmt.Errorf("failed to save runner config: %w", err) + } + } + + j := job.NewJob(cfg, cli, runner) + return j.Run(ctx) + } +} diff --git a/internal/app/job/job.go b/internal/app/job/job.go new file mode 100644 index 0000000..dfa184a --- /dev/null +++ b/internal/app/job/job.go @@ -0,0 +1,94 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package job + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + + "connectrpc.com/connect" + log "github.com/sirupsen/logrus" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "gitea.com/gitea/act_runner/internal/app/run" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" +) + +type Job struct { + client client.Client + runner run.RunnerInterface + cfg *config.Config + tasksVersion atomic.Int64 +} + +func NewJob(cfg *config.Config, client client.Client, runner run.RunnerInterface) *Job { + j := &Job{} + + j.client = client + j.runner = runner + j.cfg = cfg + + return j +} + +func (j *Job) Run(ctx context.Context) error { + task, ok := j.fetchTask(ctx) + if !ok { + return fmt.Errorf("could not fetch task") + } + return j.runTaskWithRecover(ctx, task) +} + +func (j *Job) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) error { + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("panic: %v", r) + log.WithError(err).Error("panic in runTaskWithRecover") + } + }() + + if err := j.runner.Run(ctx, task); err != nil { + log.WithError(err).Error("failed to run task") + return err + } + return nil +} + +func (j *Job) fetchTask(ctx context.Context) (*runnerv1.Task, bool) { + reqCtx, cancel := context.WithTimeout(ctx, j.cfg.Runner.FetchTimeout) + defer cancel() + + // Load the version value that was in the cache when the request was sent. 
+ v := j.tasksVersion.Load() + resp, err := j.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: v, + })) + if err != nil { + if errors.Is(err, context.Canceled) { + log.WithError(err).Debugf("shutdown, fetch task canceled") + } else { + log.WithError(err).Error("failed to fetch task") + } + return nil, false + } + + if resp == nil || resp.Msg == nil { + return nil, false + } + + if resp.Msg.TasksVersion > v { + j.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion) + } + + if resp.Msg.Task == nil { + return nil, false + } + + j.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0) + + return resp.Msg.Task, true +} diff --git a/internal/app/job/job_test.go b/internal/app/job/job_test.go new file mode 100644 index 0000000..086c69f --- /dev/null +++ b/internal/app/job/job_test.go @@ -0,0 +1,169 @@ +package job + +import ( + "context" + "fmt" + "testing" + "time" + + "connectrpc.com/connect" + + "code.gitea.io/actions-proto-go/ping/v1/pingv1connect" + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect" + "gitea.com/gitea/act_runner/internal/pkg/config" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +type mockClient struct { + pingv1connect.PingServiceClient + runnerv1connect.RunnerServiceClient + + sleep time.Duration + cancel bool + err error + noTask bool +} + +func (o mockClient) Address() string { + return "" +} + +func (o mockClient) Insecure() bool { + return true +} + +func (o *mockClient) FetchTask(ctx context.Context, _ *connect.Request[runnerv1.FetchTaskRequest]) (*connect.Response[runnerv1.FetchTaskResponse], error) { + if o.sleep > 0 { + select { + case <-ctx.Done(): + log.Trace("fetch task done") + return nil, context.DeadlineExceeded + case <-time.After(o.sleep): + log.Trace("slept") + return nil, fmt.Errorf("unexpected") + } + } + if o.cancel { + return nil, context.Canceled + } + if o.err != nil { + return nil, o.err + } 
+ task := &runnerv1.Task{} + if o.noTask { + task = nil + o.noTask = false + } + + return connect.NewResponse(&runnerv1.FetchTaskResponse{ + Task: task, + TasksVersion: int64(1), + }), nil +} + +type mockRunner struct { + cfg *config.Runner + log chan string + panics bool + err error +} + +func (o *mockRunner) Run(ctx context.Context, _ *runnerv1.Task) error { + o.log <- "runner starts" + if o.panics { + log.Trace("panics") + o.log <- "runner panics" + o.panics = false + panic("whatever") + } + if o.err != nil { + log.Trace("error") + o.log <- "runner error" + err := o.err + o.err = nil + return err + } + for { + select { + case <-ctx.Done(): + log.Trace("shutdown") + o.log <- "runner shutdown" + return nil + case <-time.After(o.cfg.Timeout): + log.Trace("after") + o.log <- "runner timeout" + return nil + } + } +} + +func TestNewJob(t *testing.T) { + j := NewJob(&config.Config{}, &mockClient{}, &mockRunner{}) + assert.NotNil(t, j) +} + +func setTrace(t *testing.T) { + t.Helper() + log.SetReportCaller(true) + log.SetLevel(log.TraceLevel) +} + +func TestJob_fetchTask(t *testing.T) { + setTrace(t) + for _, testCase := range []struct { + name string + noTask bool + sleep time.Duration + err error + cancel bool + success bool + }{ + { + name: "Success", + success: true, + }, + { + name: "Canceled", + cancel: true, + }, + { + name: "NoTask", + noTask: true, + }, + { + name: "Error", + err: fmt.Errorf("random error"), + }, + } { + t.Run(testCase.name, func(t *testing.T) { + configRunner := config.Runner{ + FetchTimeout: 1 * time.Millisecond, + } + + j := NewJob( + &config.Config{ + Runner: configRunner, + }, + &mockClient{ + sleep: testCase.sleep, + cancel: testCase.cancel, + noTask: testCase.noTask, + err: testCase.err, + }, + &mockRunner{}, + ) + + task, ok := j.fetchTask(context.Background()) + if testCase.success { + assert.True(t, ok) + assert.NotNil(t, task) + } else { + assert.False(t, ok) + assert.Nil(t, task) + } + }) + } +} |