summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcobak78 <cobak78@noreply.code.forgejo.org>2025-01-15 13:29:07 +0100
committerearl-warren <earl-warren@noreply.code.forgejo.org>2025-01-15 13:29:07 +0100
commitba78c113265a9111dc250c62bcf812c4fe15dbc4 (patch)
treec9ab30c49d3bb4af4e4442e29cdf4ea48d4319d9
parentMerge pull request 'fix: example uses code.forgejo.org/oci' (#434) from earl-... (diff)
downloadforgejo-runner-ba78c113265a9111dc250c62bcf812c4fe15dbc4.tar.xz
forgejo-runner-ba78c113265a9111dc250c62bcf812c4fe15dbc4.zip
New "one shot" type of execution by adding a new command called one-job. (#423)
As commented here https://code.forgejo.org/forgejo/runner/issues/422, this PR aims to allow a new type of one shot execution compatible with autoscaling features and other job types. Co-authored-by: jaime merino <jaime.merino_mora@mail.schwarzª> Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/423 Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org> Co-authored-by: cobak78 <cobak78@noreply.code.forgejo.org> Co-committed-by: cobak78 <cobak78@noreply.code.forgejo.org>
-rw-r--r--RELEASE-NOTES.md1
-rw-r--r--internal/app/cmd/cmd.go9
-rw-r--r--internal/app/cmd/job.go117
-rw-r--r--internal/app/job/job.go94
-rw-r--r--internal/app/job/job_test.go169
5 files changed, 390 insertions, 0 deletions
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 76b2614..cea310d 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -3,6 +3,7 @@
## 6.1.0
* [Add `[container].force_rebuild` config option](https://code.forgejo.org/forgejo/runner/pulls/406) to force rebuilding of local docker images, even if they are already present.
+* [Add new `--one-job` flag](https://code.forgejo.org/forgejo/runner/pulls/423) to execute a previously configured runner, execute one task if it exists and exit. Motivation [here](https://code.forgejo.org/forgejo/runner/issues/422)
## 6.0.1
diff --git a/internal/app/cmd/cmd.go b/internal/app/cmd/cmd.go
index 48341dc..f8987db 100644
--- a/internal/app/cmd/cmd.go
+++ b/internal/app/cmd/cmd.go
@@ -52,6 +52,15 @@ func Execute(ctx context.Context) {
}
rootCmd.AddCommand(daemonCmd)
+ // ./act_runner job
+ jobCmd := &cobra.Command{
+ Use: "one-job",
+ Short: "Run only one job",
+ Args: cobra.MaximumNArgs(1),
+ RunE: runJob(ctx, &configFile),
+ }
+ rootCmd.AddCommand(jobCmd)
+
// ./act_runner exec
rootCmd.AddCommand(loadExecCmd(ctx))
diff --git a/internal/app/cmd/job.go b/internal/app/cmd/job.go
new file mode 100644
index 0000000..a4ded06
--- /dev/null
+++ b/internal/app/cmd/job.go
@@ -0,0 +1,117 @@
+// Copyright 2025 The Forgejo Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strings"
+
+ "connectrpc.com/connect"
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+
+ "gitea.com/gitea/act_runner/internal/app/job"
+ "gitea.com/gitea/act_runner/internal/app/run"
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/envcheck"
+ "gitea.com/gitea/act_runner/internal/pkg/labels"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+)
+
+func runJob(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error {
+ return func(cmd *cobra.Command, args []string) error {
+ cfg, err := config.LoadDefault(*configFile)
+ if err != nil {
+ return fmt.Errorf("invalid configuration: %w", err)
+ }
+
+ initLogging(cfg)
+ log.Infoln("Starting job")
+
+ reg, err := config.LoadRegistration(cfg.Runner.File)
+ if os.IsNotExist(err) {
+ log.Error("registration file not found, please register the runner first")
+ return err
+ } else if err != nil {
+ return fmt.Errorf("failed to load registration file: %w", err)
+ }
+
+ lbls := reg.Labels
+ if len(cfg.Runner.Labels) > 0 {
+ lbls = cfg.Runner.Labels
+ }
+
+ ls := labels.Labels{}
+ for _, l := range lbls {
+ label, err := labels.Parse(l)
+ if err != nil {
+ log.WithError(err).Warnf("ignored invalid label %q", l)
+ continue
+ }
+ ls = append(ls, label)
+ }
+ if len(ls) == 0 {
+ log.Warn("no labels configured, runner may not be able to pick up jobs")
+ }
+
+ if ls.RequireDocker() {
+ dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
+ if err != nil {
+ return err
+ }
+ if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil {
+ return err
+ }
+ // if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath
+ os.Setenv("DOCKER_HOST", dockerSocketPath)
+ // empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically
+ // and assign the path to cfg.Container.DockerHost
+ if cfg.Container.DockerHost == "" {
+ cfg.Container.DockerHost = dockerSocketPath
+ }
+ // check the scheme, if the scheme is not npipe or unix
+ // set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
+ if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
+ scheme := cfg.Container.DockerHost[:protoIndex]
+ if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
+ cfg.Container.DockerHost = "-"
+ }
+ }
+ }
+
+ cli := client.New(
+ reg.Address,
+ cfg.Runner.Insecure,
+ reg.UUID,
+ reg.Token,
+ ver.Version(),
+ )
+
+ runner := run.NewRunner(cfg, reg, cli)
+ // declare the labels of the runner before fetching tasks
+ resp, err := runner.Declare(ctx, ls.Names())
+ if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented {
+ // Gitea instance is older version. skip declare step.
+ log.Warn("Because the Forgejo instance is an old version, skipping declaring the labels and version.")
+ } else if err != nil {
+ log.WithError(err).Error("fail to invoke Declare")
+ return err
+ } else {
+ log.Infof("runner: %s, with version: %s, with labels: %v, declared successfully",
+ resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
+ // if declared successfully, override the labels in the.runner file with valid labels in the config file (if specified)
+ runner.Update(ctx, ls)
+ reg.Labels = ls.ToStrings()
+ if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
+ return fmt.Errorf("failed to save runner config: %w", err)
+ }
+ }
+
+ j := job.NewJob(cfg, cli, runner)
+ return j.Run(ctx)
+ }
+}
diff --git a/internal/app/job/job.go b/internal/app/job/job.go
new file mode 100644
index 0000000..dfa184a
--- /dev/null
+++ b/internal/app/job/job.go
@@ -0,0 +1,94 @@
+// Copyright 2025 The Forgejo Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package job
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync/atomic"
+
+ "connectrpc.com/connect"
+ log "github.com/sirupsen/logrus"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "gitea.com/gitea/act_runner/internal/app/run"
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+)
+
+type Job struct {
+ client client.Client
+ runner run.RunnerInterface
+ cfg *config.Config
+ tasksVersion atomic.Int64
+}
+
+func NewJob(cfg *config.Config, client client.Client, runner run.RunnerInterface) *Job {
+ j := &Job{}
+
+ j.client = client
+ j.runner = runner
+ j.cfg = cfg
+
+ return j
+}
+
+func (j *Job) Run(ctx context.Context) error {
+ task, ok := j.fetchTask(ctx)
+ if !ok {
+ return fmt.Errorf("could not fetch task")
+ }
+ return j.runTaskWithRecover(ctx, task)
+}
+
+func (j *Job) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) error {
+ defer func() {
+ if r := recover(); r != nil {
+ err := fmt.Errorf("panic: %v", r)
+ log.WithError(err).Error("panic in runTaskWithRecover")
+ }
+ }()
+
+ if err := j.runner.Run(ctx, task); err != nil {
+ log.WithError(err).Error("failed to run task")
+ return err
+ }
+ return nil
+}
+
+func (j *Job) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
+ reqCtx, cancel := context.WithTimeout(ctx, j.cfg.Runner.FetchTimeout)
+ defer cancel()
+
+ // Load the version value that was in the cache when the request was sent.
+ v := j.tasksVersion.Load()
+ resp, err := j.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
+ TasksVersion: v,
+ }))
+ if err != nil {
+ if errors.Is(err, context.Canceled) {
+ log.WithError(err).Debugf("shutdown, fetch task canceled")
+ } else {
+ log.WithError(err).Error("failed to fetch task")
+ }
+ return nil, false
+ }
+
+ if resp == nil || resp.Msg == nil {
+ return nil, false
+ }
+
+ if resp.Msg.TasksVersion > v {
+ j.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
+ }
+
+ if resp.Msg.Task == nil {
+ return nil, false
+ }
+
+ j.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)
+
+ return resp.Msg.Task, true
+}
diff --git a/internal/app/job/job_test.go b/internal/app/job/job_test.go
new file mode 100644
index 0000000..086c69f
--- /dev/null
+++ b/internal/app/job/job_test.go
@@ -0,0 +1,169 @@
+package job
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "connectrpc.com/connect"
+
+ "code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+)
+
+type mockClient struct {
+ pingv1connect.PingServiceClient
+ runnerv1connect.RunnerServiceClient
+
+ sleep time.Duration
+ cancel bool
+ err error
+ noTask bool
+}
+
+func (o mockClient) Address() string {
+ return ""
+}
+
+func (o mockClient) Insecure() bool {
+ return true
+}
+
+func (o *mockClient) FetchTask(ctx context.Context, _ *connect.Request[runnerv1.FetchTaskRequest]) (*connect.Response[runnerv1.FetchTaskResponse], error) {
+ if o.sleep > 0 {
+ select {
+ case <-ctx.Done():
+ log.Trace("fetch task done")
+ return nil, context.DeadlineExceeded
+ case <-time.After(o.sleep):
+ log.Trace("slept")
+ return nil, fmt.Errorf("unexpected")
+ }
+ }
+ if o.cancel {
+ return nil, context.Canceled
+ }
+ if o.err != nil {
+ return nil, o.err
+ }
+ task := &runnerv1.Task{}
+ if o.noTask {
+ task = nil
+ o.noTask = false
+ }
+
+ return connect.NewResponse(&runnerv1.FetchTaskResponse{
+ Task: task,
+ TasksVersion: int64(1),
+ }), nil
+}
+
+type mockRunner struct {
+ cfg *config.Runner
+ log chan string
+ panics bool
+ err error
+}
+
+func (o *mockRunner) Run(ctx context.Context, _ *runnerv1.Task) error {
+ o.log <- "runner starts"
+ if o.panics {
+ log.Trace("panics")
+ o.log <- "runner panics"
+ o.panics = false
+ panic("whatever")
+ }
+ if o.err != nil {
+ log.Trace("error")
+ o.log <- "runner error"
+ err := o.err
+ o.err = nil
+ return err
+ }
+ for {
+ select {
+ case <-ctx.Done():
+ log.Trace("shutdown")
+ o.log <- "runner shutdown"
+ return nil
+ case <-time.After(o.cfg.Timeout):
+ log.Trace("after")
+ o.log <- "runner timeout"
+ return nil
+ }
+ }
+}
+
+func TestNewJob(t *testing.T) {
+ j := NewJob(&config.Config{}, &mockClient{}, &mockRunner{})
+ assert.NotNil(t, j)
+}
+
+func setTrace(t *testing.T) {
+ t.Helper()
+ log.SetReportCaller(true)
+ log.SetLevel(log.TraceLevel)
+}
+
+func TestJob_fetchTask(t *testing.T) {
+ setTrace(t)
+ for _, testCase := range []struct {
+ name string
+ noTask bool
+ sleep time.Duration
+ err error
+ cancel bool
+ success bool
+ }{
+ {
+ name: "Success",
+ success: true,
+ },
+ {
+ name: "Canceled",
+ cancel: true,
+ },
+ {
+ name: "NoTask",
+ noTask: true,
+ },
+ {
+ name: "Error",
+ err: fmt.Errorf("random error"),
+ },
+ } {
+ t.Run(testCase.name, func(t *testing.T) {
+ configRunner := config.Runner{
+ FetchTimeout: 1 * time.Millisecond,
+ }
+
+ j := NewJob(
+ &config.Config{
+ Runner: configRunner,
+ },
+ &mockClient{
+ sleep: testCase.sleep,
+ cancel: testCase.cancel,
+ noTask: testCase.noTask,
+ err: testCase.err,
+ },
+ &mockRunner{},
+ )
+
+ task, ok := j.fetchTask(context.Background())
+ if testCase.success {
+ assert.True(t, ok)
+ assert.NotNil(t, task)
+ } else {
+ assert.False(t, ok)
+ assert.Nil(t, task)
+ }
+ })
+ }
+}