summaryrefslogtreecommitdiffstats
path: root/internal
diff options
context:
space:
mode:
authorDaniel Baumann <daniel@debian.org>2024-10-20 22:50:50 +0200
committerDaniel Baumann <daniel@debian.org>2024-10-20 22:50:50 +0200
commit9fa26b7837ed8e6679b7e6115425cab6ecbc9a8a (patch)
treec5b6f218ae267153042529217fdabeac4849ca1e /internal
parentInitial commit. (diff)
downloadforgejo-runner-9fa26b7837ed8e6679b7e6115425cab6ecbc9a8a.tar.xz
forgejo-runner-9fa26b7837ed8e6679b7e6115425cab6ecbc9a8a.zip
Adding upstream version 3.5.1.HEADupstream/3.5.1upstreamdebian
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'internal')
-rw-r--r--internal/app/cmd/cache-server.go69
-rw-r--r--internal/app/cmd/cmd.go87
-rw-r--r--internal/app/cmd/create-runner-file.go164
-rw-r--r--internal/app/cmd/create-runner-file_test.go118
-rw-r--r--internal/app/cmd/daemon.go208
-rw-r--r--internal/app/cmd/exec.go495
-rw-r--r--internal/app/cmd/register.go355
-rw-r--r--internal/app/poll/poller.go167
-rw-r--r--internal/app/poll/poller_test.go263
-rw-r--r--internal/app/run/runner.go260
-rw-r--r--internal/app/run/runner_test.go37
-rw-r--r--internal/app/run/workflow.go54
-rw-r--r--internal/app/run/workflow_test.go74
-rw-r--r--internal/pkg/client/client.go19
-rw-r--r--internal/pkg/client/header.go11
-rw-r--r--internal/pkg/client/http.go82
-rw-r--r--internal/pkg/client/mocks/Client.go219
-rw-r--r--internal/pkg/config/config.example.yaml100
-rw-r--r--internal/pkg/config/config.go166
-rw-r--r--internal/pkg/config/config_test.go37
-rw-r--r--internal/pkg/config/deprecated.go62
-rw-r--r--internal/pkg/config/embed.go9
-rw-r--r--internal/pkg/config/registration.go54
-rw-r--r--internal/pkg/envcheck/doc.go5
-rw-r--r--internal/pkg/envcheck/docker.go34
-rw-r--r--internal/pkg/labels/labels.go109
-rw-r--r--internal/pkg/labels/labels_test.go63
-rw-r--r--internal/pkg/report/reporter.go437
-rw-r--r--internal/pkg/report/reporter_test.go198
-rw-r--r--internal/pkg/ver/version.go11
30 files changed, 3967 insertions, 0 deletions
diff --git a/internal/app/cmd/cache-server.go b/internal/app/cmd/cache-server.go
new file mode 100644
index 0000000..21b3352
--- /dev/null
+++ b/internal/app/cmd/cache-server.go
@@ -0,0 +1,69 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/signal"
+
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+
+ "github.com/nektos/act/pkg/artifactcache"
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+)
+
+type cacheServerArgs struct {
+ Dir string
+ Host string
+ Port uint16
+}
+
+func runCacheServer(ctx context.Context, configFile *string, cacheArgs *cacheServerArgs) func(cmd *cobra.Command, args []string) error {
+ return func(cmd *cobra.Command, args []string) error {
+ cfg, err := config.LoadDefault(*configFile)
+ if err != nil {
+ return fmt.Errorf("invalid configuration: %w", err)
+ }
+
+ initLogging(cfg)
+
+ var (
+ dir = cfg.Cache.Dir
+ host = cfg.Cache.Host
+ port = cfg.Cache.Port
+ )
+
+ // cacheArgs has higher priority
+ if cacheArgs.Dir != "" {
+ dir = cacheArgs.Dir
+ }
+ if cacheArgs.Host != "" {
+ host = cacheArgs.Host
+ }
+ if cacheArgs.Port != 0 {
+ port = cacheArgs.Port
+ }
+
+ cacheHandler, err := artifactcache.StartHandler(
+ dir,
+ host,
+ port,
+ log.StandardLogger().WithField("module", "cache_request"),
+ )
+ if err != nil {
+ return err
+ }
+
+ log.Infof("cache server is listening on %v", cacheHandler.ExternalURL())
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+ <-c
+
+ return nil
+ }
+}
diff --git a/internal/app/cmd/cmd.go b/internal/app/cmd/cmd.go
new file mode 100644
index 0000000..48341dc
--- /dev/null
+++ b/internal/app/cmd/cmd.go
@@ -0,0 +1,87 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "os"
+
+ "github.com/spf13/cobra"
+
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+)
+
+func Execute(ctx context.Context) {
+ // ./act_runner
+ rootCmd := &cobra.Command{
+ Use: "forgejo-runner [event name to run]\nIf no event name passed, will default to \"on: push\"",
+ Short: "Run Forgejo Actions locally by specifying the event name (e.g. `push`) or an action name directly.",
+ Args: cobra.MaximumNArgs(1),
+ Version: ver.Version(),
+ SilenceUsage: true,
+ }
+ configFile := ""
+ rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "", "Config file path")
+
+ // ./act_runner register
+ var regArgs registerArgs
+ registerCmd := &cobra.Command{
+ Use: "register",
+ Short: "Register a runner to the server",
+ Args: cobra.MaximumNArgs(0),
+ RunE: runRegister(ctx, &regArgs, &configFile), // must use a pointer to regArgs
+ }
+ registerCmd.Flags().BoolVar(&regArgs.NoInteractive, "no-interactive", false, "Disable interactive mode")
+ registerCmd.Flags().StringVar(&regArgs.InstanceAddr, "instance", "", "Forgejo instance address")
+ registerCmd.Flags().StringVar(&regArgs.Token, "token", "", "Runner token")
+ registerCmd.Flags().StringVar(&regArgs.RunnerName, "name", "", "Runner name")
+ registerCmd.Flags().StringVar(&regArgs.Labels, "labels", "", "Runner tags, comma separated")
+ rootCmd.AddCommand(registerCmd)
+
+ rootCmd.AddCommand(createRunnerFileCmd(ctx, &configFile))
+
+ // ./act_runner daemon
+ daemonCmd := &cobra.Command{
+ Use: "daemon",
+ Short: "Run as a runner daemon",
+ Args: cobra.MaximumNArgs(1),
+ RunE: runDaemon(ctx, &configFile),
+ }
+ rootCmd.AddCommand(daemonCmd)
+
+ // ./act_runner exec
+ rootCmd.AddCommand(loadExecCmd(ctx))
+
+ // ./act_runner config
+ rootCmd.AddCommand(&cobra.Command{
+ Use: "generate-config",
+ Short: "Generate an example config file",
+ Args: cobra.MaximumNArgs(0),
+ Run: func(_ *cobra.Command, _ []string) {
+ fmt.Printf("%s", config.Example)
+ },
+ })
+
+ // ./act_runner cache-server
+ var cacheArgs cacheServerArgs
+ cacheCmd := &cobra.Command{
+ Use: "cache-server",
+ Short: "Start a cache server for the cache action",
+ Args: cobra.MaximumNArgs(0),
+ RunE: runCacheServer(ctx, &configFile, &cacheArgs),
+ }
+ cacheCmd.Flags().StringVarP(&cacheArgs.Dir, "dir", "d", "", "Cache directory")
+ cacheCmd.Flags().StringVarP(&cacheArgs.Host, "host", "s", "", "Host of the cache server")
+ cacheCmd.Flags().Uint16VarP(&cacheArgs.Port, "port", "p", 0, "Port of the cache server")
+ rootCmd.AddCommand(cacheCmd)
+
+ // hide completion command
+ rootCmd.CompletionOptions.HiddenDefaultCmd = true
+
+ if err := rootCmd.Execute(); err != nil {
+ os.Exit(1)
+ }
+}
diff --git a/internal/app/cmd/create-runner-file.go b/internal/app/cmd/create-runner-file.go
new file mode 100644
index 0000000..a972624
--- /dev/null
+++ b/internal/app/cmd/create-runner-file.go
@@ -0,0 +1,164 @@
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "context"
+ "encoding/hex"
+ "fmt"
+ "os"
+
+ pingv1 "code.gitea.io/actions-proto-go/ping/v1"
+ "connectrpc.com/connect"
+ gouuid "github.com/google/uuid"
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+
+ "gitea.com/gitea/act_runner/internal/app/run"
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+)
+
+type createRunnerFileArgs struct {
+ Connect bool
+ InstanceAddr string
+ Secret string
+ Name string
+}
+
+func createRunnerFileCmd(ctx context.Context, configFile *string) *cobra.Command {
+ var argsVar createRunnerFileArgs
+ cmd := &cobra.Command{
+ Use: "create-runner-file",
+ Short: "Create a runner file using a shared secret used to pre-register the runner on the Forgejo instance",
+ Args: cobra.MaximumNArgs(0),
+ RunE: runCreateRunnerFile(ctx, &argsVar, configFile),
+ }
+ cmd.Flags().BoolVar(&argsVar.Connect, "connect", false, "tries to connect to the instance using the secret (Forgejo v1.21 instance or greater)")
+ cmd.Flags().StringVar(&argsVar.InstanceAddr, "instance", "", "Forgejo instance address")
+ cmd.MarkFlagRequired("instance")
+ cmd.Flags().StringVar(&argsVar.Secret, "secret", "", "secret shared with the Forgejo instance via forgejo-cli actions register")
+ cmd.MarkFlagRequired("secret")
+ cmd.Flags().StringVar(&argsVar.Name, "name", "", "Runner name")
+
+ return cmd
+}
+
+// must be exactly the same as fogejo/models/actions/forgejo.go
+func uuidFromSecret(secret string) (string, error) {
+ uuid, err := gouuid.FromBytes([]byte(secret[:16]))
+ if err != nil {
+ return "", fmt.Errorf("gouuid.FromBytes %v", err)
+ }
+ return uuid.String(), nil
+}
+
+// should be exactly the same as forgejo/cmd/forgejo/actions.go
+func validateSecret(secret string) error {
+ secretLen := len(secret)
+ if secretLen != 40 {
+ return fmt.Errorf("the secret must be exactly 40 characters long, not %d", secretLen)
+ }
+ if _, err := hex.DecodeString(secret); err != nil {
+ return fmt.Errorf("the secret must be an hexadecimal string: %w", err)
+ }
+ return nil
+}
+
+func ping(cfg *config.Config, reg *config.Registration) error {
+ // initial http client
+ cli := client.New(
+ reg.Address,
+ cfg.Runner.Insecure,
+ "",
+ "",
+ ver.Version(),
+ )
+
+ _, err := cli.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{
+ Data: reg.UUID,
+ }))
+ if err != nil {
+ return fmt.Errorf("ping %s failed %w", reg.Address, err)
+ }
+ return nil
+}
+
+func runCreateRunnerFile(ctx context.Context, args *createRunnerFileArgs, configFile *string) func(cmd *cobra.Command, args []string) error {
+ return func(*cobra.Command, []string) error {
+ log.SetLevel(log.DebugLevel)
+ log.Info("Creating runner file")
+
+ //
+ // Prepare the registration data
+ //
+ cfg, err := config.LoadDefault(*configFile)
+ if err != nil {
+ return fmt.Errorf("invalid configuration: %w", err)
+ }
+
+ if err := validateSecret(args.Secret); err != nil {
+ return err
+ }
+
+ uuid, err := uuidFromSecret(args.Secret)
+ if err != nil {
+ return err
+ }
+
+ name := args.Name
+ if name == "" {
+ name, _ = os.Hostname()
+ log.Infof("Runner name is empty, use hostname '%s'.", name)
+ }
+
+ reg := &config.Registration{
+ Name: name,
+ UUID: uuid,
+ Token: args.Secret,
+ Address: args.InstanceAddr,
+ }
+
+ //
+ // Verify the Forgejo instance is reachable
+ //
+ if err := ping(cfg, reg); err != nil {
+ return err
+ }
+
+ //
+ // Save the registration file
+ //
+ if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
+ return fmt.Errorf("failed to save runner config to %s: %w", cfg.Runner.File, err)
+ }
+
+ //
+ // Verify the secret works
+ //
+ if args.Connect {
+ cli := client.New(
+ reg.Address,
+ cfg.Runner.Insecure,
+ reg.UUID,
+ reg.Token,
+ ver.Version(),
+ )
+
+ runner := run.NewRunner(cfg, reg, cli)
+ resp, err := runner.Declare(ctx, cfg.Runner.Labels)
+
+ if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented {
+ log.Warn("Cannot verify the connection because the Forgejo instance is lower than v1.21")
+ } else if err != nil {
+ log.WithError(err).Error("fail to invoke Declare")
+ return err
+ } else {
+ log.Infof("connection successful: %s, with version: %s, with labels: %v",
+ resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
+ }
+ }
+ return nil
+ }
+}
diff --git a/internal/app/cmd/create-runner-file_test.go b/internal/app/cmd/create-runner-file_test.go
new file mode 100644
index 0000000..4f3acb8
--- /dev/null
+++ b/internal/app/cmd/create-runner-file_test.go
@@ -0,0 +1,118 @@
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "bytes"
+ "context"
+ "os"
+ "testing"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "connectrpc.com/connect"
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+
+ "github.com/spf13/cobra"
+ "github.com/stretchr/testify/assert"
+ "gopkg.in/yaml.v3"
+)
+
+func executeCommand(ctx context.Context, cmd *cobra.Command, args ...string) (string, error) {
+ buf := new(bytes.Buffer)
+ cmd.SetOut(buf)
+ cmd.SetErr(buf)
+ cmd.SetArgs(args)
+
+ err := cmd.ExecuteContext(ctx)
+
+ return buf.String(), err
+}
+
+func Test_createRunnerFileCmd(t *testing.T) {
+ configFile := "config.yml"
+ ctx := context.Background()
+ cmd := createRunnerFileCmd(ctx, &configFile)
+ output, err := executeCommand(ctx, cmd)
+ assert.ErrorContains(t, err, `required flag(s) "instance", "secret" not set`)
+ assert.Contains(t, output, "Usage:")
+}
+
+func Test_validateSecret(t *testing.T) {
+ assert.ErrorContains(t, validateSecret("abc"), "exactly 40 characters")
+ assert.ErrorContains(t, validateSecret("ZAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"), "must be an hexadecimal")
+}
+
+func Test_uuidFromSecret(t *testing.T) {
+ uuid, err := uuidFromSecret("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
+ assert.NoError(t, err)
+ assert.EqualValues(t, uuid, "41414141-4141-4141-4141-414141414141")
+}
+
+func Test_ping(t *testing.T) {
+ cfg := &config.Config{}
+ address := os.Getenv("FORGEJO_URL")
+ if address == "" {
+ address = "https://code.forgejo.org"
+ }
+ reg := &config.Registration{
+ Address: address,
+ UUID: "create-runner-file_test.go",
+ }
+ assert.NoError(t, ping(cfg, reg))
+}
+
+func Test_runCreateRunnerFile(t *testing.T) {
+ //
+ // Set the .runner file to be in a temporary directory
+ //
+ dir := t.TempDir()
+ configFile := dir + "/config.yml"
+ runnerFile := dir + "/.runner"
+ cfg, err := config.LoadDefault("")
+ cfg.Runner.File = runnerFile
+ yamlData, err := yaml.Marshal(cfg)
+ assert.NoError(t, err)
+ assert.NoError(t, os.WriteFile(configFile, yamlData, 0o666))
+
+ instance, has := os.LookupEnv("FORGEJO_URL")
+ if !has {
+ instance = "https://code.forgejo.org"
+ }
+ secret, has := os.LookupEnv("FORGEJO_RUNNER_SECRET")
+ assert.True(t, has)
+ name := "testrunner"
+
+ //
+ // Run create-runner-file
+ //
+ ctx := context.Background()
+ cmd := createRunnerFileCmd(ctx, &configFile)
+ output, err := executeCommand(ctx, cmd, "--connect", "--secret", secret, "--instance", instance, "--name", name)
+ assert.NoError(t, err)
+ assert.EqualValues(t, "", output)
+
+ //
+ // Read back the runner file and verify its content
+ //
+ reg, err := config.LoadRegistration(runnerFile)
+ assert.NoError(t, err)
+ assert.EqualValues(t, secret, reg.Token)
+ assert.EqualValues(t, instance, reg.Address)
+
+ //
+ // Verify that fetching a task successfully returns there is
+ // no task for this runner
+ //
+ cli := client.New(
+ reg.Address,
+ cfg.Runner.Insecure,
+ reg.UUID,
+ reg.Token,
+ ver.Version(),
+ )
+ resp, err := cli.FetchTask(ctx, connect.NewRequest(&runnerv1.FetchTaskRequest{}))
+ assert.NoError(t, err)
+ assert.Nil(t, resp.Msg.Task)
+}
diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go
new file mode 100644
index 0000000..a613546
--- /dev/null
+++ b/internal/app/cmd/daemon.go
@@ -0,0 +1,208 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path"
+ "path/filepath"
+ "runtime"
+ "strconv"
+ "strings"
+
+ "connectrpc.com/connect"
+ "github.com/mattn/go-isatty"
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+
+ "gitea.com/gitea/act_runner/internal/app/poll"
+ "gitea.com/gitea/act_runner/internal/app/run"
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/envcheck"
+ "gitea.com/gitea/act_runner/internal/pkg/labels"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+)
+
+func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error {
+ return func(cmd *cobra.Command, args []string) error {
+ cfg, err := config.LoadDefault(*configFile)
+ if err != nil {
+ return fmt.Errorf("invalid configuration: %w", err)
+ }
+
+ initLogging(cfg)
+ log.Infoln("Starting runner daemon")
+
+ reg, err := config.LoadRegistration(cfg.Runner.File)
+ if os.IsNotExist(err) {
+ log.Error("registration file not found, please register the runner first")
+ return err
+ } else if err != nil {
+ return fmt.Errorf("failed to load registration file: %w", err)
+ }
+
+ cfg.Tune(reg.Address)
+
+ lbls := reg.Labels
+ if len(cfg.Runner.Labels) > 0 {
+ lbls = cfg.Runner.Labels
+ }
+
+ ls := labels.Labels{}
+ for _, l := range lbls {
+ label, err := labels.Parse(l)
+ if err != nil {
+ log.WithError(err).Warnf("ignored invalid label %q", l)
+ continue
+ }
+ ls = append(ls, label)
+ }
+ if len(ls) == 0 {
+ log.Warn("no labels configured, runner may not be able to pick up jobs")
+ }
+
+ if ls.RequireDocker() {
+ dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
+ if err != nil {
+ return err
+ }
+ if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil {
+ return err
+ }
+ // if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath
+ os.Setenv("DOCKER_HOST", dockerSocketPath)
+ // empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically
+ // and assign the path to cfg.Container.DockerHost
+ if cfg.Container.DockerHost == "" {
+ cfg.Container.DockerHost = dockerSocketPath
+ }
+ // check the scheme, if the scheme is not npipe or unix
+ // set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
+ if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
+ scheme := cfg.Container.DockerHost[:protoIndex]
+ if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
+ cfg.Container.DockerHost = "-"
+ }
+ }
+ }
+
+ cli := client.New(
+ reg.Address,
+ cfg.Runner.Insecure,
+ reg.UUID,
+ reg.Token,
+ ver.Version(),
+ )
+
+ runner := run.NewRunner(cfg, reg, cli)
+ // declare the labels of the runner before fetching tasks
+ resp, err := runner.Declare(ctx, ls.Names())
+ if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented {
+ // Gitea instance is older version. skip declare step.
+ log.Warn("Because the Forgejo instance is an old version, skipping declaring the labels and version.")
+ } else if err != nil {
+ log.WithError(err).Error("fail to invoke Declare")
+ return err
+ } else {
+ log.Infof("runner: %s, with version: %s, with labels: %v, declared successfully",
+ resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
+ // if declared successfully, override the labels in the.runner file with valid labels in the config file (if specified)
+ runner.Update(ctx, ls)
+ reg.Labels = ls.ToStrings()
+ if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
+ return fmt.Errorf("failed to save runner config: %w", err)
+ }
+ }
+
+ poller := poll.New(cfg, cli, runner)
+
+ go poller.Poll()
+
+ <-ctx.Done()
+ log.Infof("runner: %s shutdown initiated, waiting [runner].shutdown_timeout=%s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
+
+ ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
+ defer cancel()
+
+ err = poller.Shutdown(ctx)
+ if err != nil {
+ log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
+ }
+ return nil
+ }
+}
+
+// initLogging setup the global logrus logger.
+func initLogging(cfg *config.Config) {
+ isTerm := isatty.IsTerminal(os.Stdout.Fd())
+ format := &log.TextFormatter{
+ DisableColors: !isTerm,
+ FullTimestamp: true,
+ }
+ log.SetFormatter(format)
+
+ if l := cfg.Log.Level; l != "" {
+ level, err := log.ParseLevel(l)
+ if err != nil {
+ log.WithError(err).
+ Errorf("invalid log level: %q", l)
+ }
+
+ // debug level
+ if level == log.DebugLevel {
+ log.SetReportCaller(true)
+ format.CallerPrettyfier = func(f *runtime.Frame) (string, string) {
+ // get function name
+ s := strings.Split(f.Function, ".")
+ funcname := "[" + s[len(s)-1] + "]"
+ // get file name and line number
+ _, filename := path.Split(f.File)
+ filename = "[" + filename + ":" + strconv.Itoa(f.Line) + "]"
+ return funcname, filename
+ }
+ log.SetFormatter(format)
+ }
+
+ if log.GetLevel() != level {
+ log.Infof("log level changed to %v", level)
+ log.SetLevel(level)
+ }
+ }
+}
+
+var commonSocketPaths = []string{
+ "/var/run/docker.sock",
+ "/run/podman/podman.sock",
+ "$HOME/.colima/docker.sock",
+ "$XDG_RUNTIME_DIR/docker.sock",
+ "$XDG_RUNTIME_DIR/podman/podman.sock",
+ `\\.\pipe\docker_engine`,
+ "$HOME/.docker/run/docker.sock",
+}
+
+func getDockerSocketPath(configDockerHost string) (string, error) {
+ // a `-` means don't mount the docker socket to job containers
+ if configDockerHost != "" && configDockerHost != "-" {
+ return configDockerHost, nil
+ }
+
+ socket, found := os.LookupEnv("DOCKER_HOST")
+ if found {
+ return socket, nil
+ }
+
+ for _, p := range commonSocketPaths {
+ if _, err := os.Lstat(os.ExpandEnv(p)); err == nil {
+ if strings.HasPrefix(p, `\\.\`) {
+ return "npipe://" + filepath.ToSlash(os.ExpandEnv(p)), nil
+ }
+ return "unix://" + filepath.ToSlash(os.ExpandEnv(p)), nil
+ }
+ }
+
+ return "", fmt.Errorf("daemon Docker Engine socket not found and docker_host config was invalid")
+}
diff --git a/internal/app/cmd/exec.go b/internal/app/cmd/exec.go
new file mode 100644
index 0000000..3e111fe
--- /dev/null
+++ b/internal/app/cmd/exec.go
@@ -0,0 +1,495 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// Copyright 2019 nektos
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/docker/docker/api/types/container"
+ "github.com/joho/godotenv"
+ "github.com/nektos/act/pkg/artifactcache"
+ "github.com/nektos/act/pkg/artifacts"
+ "github.com/nektos/act/pkg/common"
+ "github.com/nektos/act/pkg/model"
+ "github.com/nektos/act/pkg/runner"
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+ "golang.org/x/term"
+)
+
+type executeArgs struct {
+ runList bool
+ job string
+ event string
+ workdir string
+ workflowsPath string
+ noWorkflowRecurse bool
+ autodetectEvent bool
+ forcePull bool
+ forceRebuild bool
+ jsonLogger bool
+ envs []string
+ envfile string
+ secrets []string
+ defaultActionsURL string
+ insecureSecrets bool
+ privileged bool
+ usernsMode string
+ containerArchitecture string
+ containerDaemonSocket string
+ useGitIgnore bool
+ containerCapAdd []string
+ containerCapDrop []string
+ containerOptions string
+ artifactServerPath string
+ artifactServerAddr string
+ artifactServerPort string
+ noSkipCheckout bool
+ debug bool
+ dryrun bool
+ image string
+ cacheHandler *artifactcache.Handler
+ network string
+ enableIPv6 bool
+ githubInstance string
+}
+
+// WorkflowsPath returns path to workflow file(s)
+func (i *executeArgs) WorkflowsPath() string {
+ return i.resolve(i.workflowsPath)
+}
+
+// Envfile returns path to .env
+func (i *executeArgs) Envfile() string {
+ return i.resolve(i.envfile)
+}
+
+func (i *executeArgs) LoadSecrets() map[string]string {
+ s := make(map[string]string)
+ for _, secretPair := range i.secrets {
+ secretPairParts := strings.SplitN(secretPair, "=", 2)
+ secretPairParts[0] = strings.ToUpper(secretPairParts[0])
+ if strings.ToUpper(s[secretPairParts[0]]) == secretPairParts[0] {
+ log.Errorf("Secret %s is already defined (secrets are case insensitive)", secretPairParts[0])
+ }
+ if len(secretPairParts) == 2 {
+ s[secretPairParts[0]] = secretPairParts[1]
+ } else if env, ok := os.LookupEnv(secretPairParts[0]); ok && env != "" {
+ s[secretPairParts[0]] = env
+ } else {
+ fmt.Printf("Provide value for '%s': ", secretPairParts[0])
+ val, err := term.ReadPassword(int(os.Stdin.Fd()))
+ fmt.Println()
+ if err != nil {
+ log.Errorf("failed to read input: %v", err)
+ os.Exit(1)
+ }
+ s[secretPairParts[0]] = string(val)
+ }
+ }
+ return s
+}
+
+func readEnvs(path string, envs map[string]string) bool {
+ if _, err := os.Stat(path); err == nil {
+ env, err := godotenv.Read(path)
+ if err != nil {
+ log.Fatalf("Error loading from %s: %v", path, err)
+ }
+ for k, v := range env {
+ envs[k] = v
+ }
+ return true
+ }
+ return false
+}
+
+func (i *executeArgs) LoadEnvs() map[string]string {
+ envs := make(map[string]string)
+ if i.envs != nil {
+ for _, envVar := range i.envs {
+ e := strings.SplitN(envVar, `=`, 2)
+ if len(e) == 2 {
+ envs[e[0]] = e[1]
+ } else {
+ envs[e[0]] = ""
+ }
+ }
+ }
+ _ = readEnvs(i.Envfile(), envs)
+
+ envs["ACTIONS_CACHE_URL"] = i.cacheHandler.ExternalURL() + "/"
+
+ return envs
+}
+
+// Workdir returns path to workdir
+func (i *executeArgs) Workdir() string {
+ return i.resolve(".")
+}
+
+func (i *executeArgs) resolve(path string) string {
+ basedir, err := filepath.Abs(i.workdir)
+ if err != nil {
+ log.Fatal(err)
+ }
+ if path == "" {
+ return path
+ }
+ if !filepath.IsAbs(path) {
+ path = filepath.Join(basedir, path)
+ }
+ return path
+}
+
+func printList(plan *model.Plan) error {
+ type lineInfoDef struct {
+ jobID string
+ jobName string
+ stage string
+ wfName string
+ wfFile string
+ events string
+ }
+ lineInfos := []lineInfoDef{}
+
+ header := lineInfoDef{
+ jobID: "Job ID",
+ jobName: "Job name",
+ stage: "Stage",
+ wfName: "Workflow name",
+ wfFile: "Workflow file",
+ events: "Events",
+ }
+
+ jobs := map[string]bool{}
+ duplicateJobIDs := false
+
+ jobIDMaxWidth := len(header.jobID)
+ jobNameMaxWidth := len(header.jobName)
+ stageMaxWidth := len(header.stage)
+ wfNameMaxWidth := len(header.wfName)
+ wfFileMaxWidth := len(header.wfFile)
+ eventsMaxWidth := len(header.events)
+
+ for i, stage := range plan.Stages {
+ for _, r := range stage.Runs {
+ jobID := r.JobID
+ line := lineInfoDef{
+ jobID: jobID,
+ jobName: r.String(),
+ stage: strconv.Itoa(i),
+ wfName: r.Workflow.Name,
+ wfFile: r.Workflow.File,
+ events: strings.Join(r.Workflow.On(), `,`),
+ }
+ if _, ok := jobs[jobID]; ok {
+ duplicateJobIDs = true
+ } else {
+ jobs[jobID] = true
+ }
+ lineInfos = append(lineInfos, line)
+ if jobIDMaxWidth < len(line.jobID) {
+ jobIDMaxWidth = len(line.jobID)
+ }
+ if jobNameMaxWidth < len(line.jobName) {
+ jobNameMaxWidth = len(line.jobName)
+ }
+ if stageMaxWidth < len(line.stage) {
+ stageMaxWidth = len(line.stage)
+ }
+ if wfNameMaxWidth < len(line.wfName) {
+ wfNameMaxWidth = len(line.wfName)
+ }
+ if wfFileMaxWidth < len(line.wfFile) {
+ wfFileMaxWidth = len(line.wfFile)
+ }
+ if eventsMaxWidth < len(line.events) {
+ eventsMaxWidth = len(line.events)
+ }
+ }
+ }
+
+ jobIDMaxWidth += 2
+ jobNameMaxWidth += 2
+ stageMaxWidth += 2
+ wfNameMaxWidth += 2
+ wfFileMaxWidth += 2
+
+ fmt.Printf("%*s%*s%*s%*s%*s%*s\n",
+ -stageMaxWidth, header.stage,
+ -jobIDMaxWidth, header.jobID,
+ -jobNameMaxWidth, header.jobName,
+ -wfNameMaxWidth, header.wfName,
+ -wfFileMaxWidth, header.wfFile,
+ -eventsMaxWidth, header.events,
+ )
+ for _, line := range lineInfos {
+ fmt.Printf("%*s%*s%*s%*s%*s%*s\n",
+ -stageMaxWidth, line.stage,
+ -jobIDMaxWidth, line.jobID,
+ -jobNameMaxWidth, line.jobName,
+ -wfNameMaxWidth, line.wfName,
+ -wfFileMaxWidth, line.wfFile,
+ -eventsMaxWidth, line.events,
+ )
+ }
+ if duplicateJobIDs {
+ fmt.Print("\nDetected multiple jobs with the same job name, use `-W` to specify the path to the specific workflow.\n")
+ }
+ return nil
+}
+
+func runExecList(ctx context.Context, planner model.WorkflowPlanner, execArgs *executeArgs) error {
+ // plan with filtered jobs - to be used for filtering only
+ var filterPlan *model.Plan
+
+ // Determine the event name to be filtered
+ var filterEventName string
+
+ if len(execArgs.event) > 0 {
+ log.Infof("Using chosed event for filtering: %s", execArgs.event)
+ filterEventName = execArgs.event
+ } else if execArgs.autodetectEvent {
+ // collect all events from loaded workflows
+ events := planner.GetEvents()
+
+ // set default event type to first event from many available
+ // this way user dont have to specify the event.
+ log.Infof("Using first detected workflow event for filtering: %s", events[0])
+
+ filterEventName = events[0]
+ }
+
+ var err error
+ if execArgs.job != "" {
+ log.Infof("Preparing plan with a job: %s", execArgs.job)
+ filterPlan, err = planner.PlanJob(execArgs.job)
+ if err != nil {
+ return err
+ }
+ } else if filterEventName != "" {
+ log.Infof("Preparing plan for a event: %s", filterEventName)
+ filterPlan, err = planner.PlanEvent(filterEventName)
+ if err != nil {
+ return err
+ }
+ } else {
+ log.Infof("Preparing plan with all jobs")
+ filterPlan, err = planner.PlanAll()
+ if err != nil {
+ return err
+ }
+ }
+
+ _ = printList(filterPlan)
+
+ return nil
+}
+
+func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command, args []string) error {
+ return func(cmd *cobra.Command, args []string) error {
+ planner, err := model.NewWorkflowPlanner(execArgs.WorkflowsPath(), execArgs.noWorkflowRecurse)
+ if err != nil {
+ return err
+ }
+
+ if execArgs.runList {
+ return runExecList(ctx, planner, execArgs)
+ }
+
+ // plan with triggered jobs
+ var plan *model.Plan
+
+ // Determine the event name to be triggered
+ var eventName string
+
+ // collect all events from loaded workflows
+ events := planner.GetEvents()
+
+ if len(execArgs.event) > 0 {
+ log.Infof("Using chosed event for filtering: %s", execArgs.event)
+ eventName = execArgs.event
+ } else if len(events) == 1 && len(events[0]) > 0 {
+ log.Infof("Using the only detected workflow event: %s", events[0])
+ eventName = events[0]
+ } else if execArgs.autodetectEvent && len(events) > 0 && len(events[0]) > 0 {
+ // set default event type to first event from many available
+ // this way user dont have to specify the event.
+ log.Infof("Using first detected workflow event: %s", events[0])
+ eventName = events[0]
+ } else {
+ log.Infof("Using default workflow event: push")
+ eventName = "push"
+ }
+
+ // build the plan for this run
+ if execArgs.job != "" {
+ log.Infof("Planning job: %s", execArgs.job)
+ plan, err = planner.PlanJob(execArgs.job)
+ if err != nil {
+ return err
+ }
+ } else {
+ log.Infof("Planning jobs for event: %s", eventName)
+ plan, err = planner.PlanEvent(eventName)
+ if err != nil {
+ return err
+ }
+ }
+
+ maxLifetime := 3 * time.Hour
+ if deadline, ok := ctx.Deadline(); ok {
+ maxLifetime = time.Until(deadline)
+ }
+
+ // init a cache server
+ handler, err := artifactcache.StartHandler("", "", 0, log.StandardLogger().WithField("module", "cache_request"))
+ if err != nil {
+ return err
+ }
+ log.Infof("cache handler listens on: %v", handler.ExternalURL())
+ execArgs.cacheHandler = handler
+
+ if len(execArgs.artifactServerAddr) == 0 {
+ ip := common.GetOutboundIP()
+ if ip == nil {
+ return fmt.Errorf("unable to determine outbound IP address")
+ }
+ execArgs.artifactServerAddr = ip.String()
+ }
+
+ if len(execArgs.artifactServerPath) == 0 {
+ tempDir, err := os.MkdirTemp("", "gitea-act-")
+ if err != nil {
+ fmt.Println(err)
+ }
+ defer os.RemoveAll(tempDir)
+
+ execArgs.artifactServerPath = tempDir
+ }
+
+ // run the plan
+ config := &runner.Config{
+ Workdir: execArgs.Workdir(),
+ BindWorkdir: false,
+ ReuseContainers: false,
+ ForcePull: execArgs.forcePull,
+ ForceRebuild: execArgs.forceRebuild,
+ LogOutput: true,
+ JSONLogger: execArgs.jsonLogger,
+ Env: execArgs.LoadEnvs(),
+ Secrets: execArgs.LoadSecrets(),
+ InsecureSecrets: execArgs.insecureSecrets,
+ Privileged: execArgs.privileged,
+ UsernsMode: execArgs.usernsMode,
+ ContainerArchitecture: execArgs.containerArchitecture,
+ ContainerDaemonSocket: execArgs.containerDaemonSocket,
+ UseGitIgnore: execArgs.useGitIgnore,
+ GitHubInstance: execArgs.githubInstance,
+ ContainerCapAdd: execArgs.containerCapAdd,
+ ContainerCapDrop: execArgs.containerCapDrop,
+ ContainerOptions: execArgs.containerOptions,
+ AutoRemove: true,
+ ArtifactServerPath: execArgs.artifactServerPath,
+ ArtifactServerPort: execArgs.artifactServerPort,
+ ArtifactServerAddr: execArgs.artifactServerAddr,
+ NoSkipCheckout: execArgs.noSkipCheckout,
+ // PresetGitHubContext: preset,
+ // EventJSON: string(eventJSON),
+ ContainerNamePrefix: fmt.Sprintf("FORGEJO-ACTIONS-TASK-%s", eventName),
+ ContainerMaxLifetime: maxLifetime,
+ ContainerNetworkMode: container.NetworkMode(execArgs.network),
+ ContainerNetworkEnableIPv6: execArgs.enableIPv6,
+ DefaultActionInstance: execArgs.defaultActionsURL,
+ PlatformPicker: func(_ []string) string {
+ return execArgs.image
+ },
+ ValidVolumes: []string{"**"}, // All volumes are allowed for `exec` command
+ }
+
+ config.Env["ACT_EXEC"] = "true"
+
+ if t := config.Secrets["GITEA_TOKEN"]; t != "" {
+ config.Token = t
+ } else if t := config.Secrets["GITHUB_TOKEN"]; t != "" {
+ config.Token = t
+ }
+
+ if !execArgs.debug {
+ logLevel := log.InfoLevel
+ config.JobLoggerLevel = &logLevel
+ }
+
+ r, err := runner.New(config)
+ if err != nil {
+ return err
+ }
+
+ artifactCancel := artifacts.Serve(ctx, execArgs.artifactServerPath, execArgs.artifactServerAddr, execArgs.artifactServerPort)
+ log.Debugf("artifacts server started at %s:%s", execArgs.artifactServerPath, execArgs.artifactServerPort)
+
+ ctx = common.WithDryrun(ctx, execArgs.dryrun)
+ executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
+ artifactCancel()
+ return nil
+ })
+
+ return executor(ctx)
+ }
+}
+
+func loadExecCmd(ctx context.Context) *cobra.Command {
+ execArg := executeArgs{}
+
+ execCmd := &cobra.Command{
+ Use: "exec",
+ Short: "Run workflow locally.",
+ Args: cobra.MaximumNArgs(20),
+ RunE: runExec(ctx, &execArg),
+ }
+
+ execCmd.Flags().BoolVarP(&execArg.runList, "list", "l", false, "list workflows")
+ execCmd.Flags().StringVarP(&execArg.job, "job", "j", "", "run a specific job ID")
+ execCmd.Flags().StringVarP(&execArg.event, "event", "E", "", "run a event name")
+ execCmd.PersistentFlags().StringVarP(&execArg.workflowsPath, "workflows", "W", "./.forgejo/workflows/", "path to workflow file(s)")
+ execCmd.PersistentFlags().StringVarP(&execArg.workdir, "directory", "C", ".", "working directory")
+ execCmd.PersistentFlags().BoolVarP(&execArg.noWorkflowRecurse, "no-recurse", "", false, "Flag to disable running workflows from subdirectories of specified path in '--workflows'/'-W' flag")
+ execCmd.Flags().BoolVarP(&execArg.autodetectEvent, "detect-event", "", false, "Use first event type from workflow as event that triggered the workflow")
+ execCmd.Flags().BoolVarP(&execArg.forcePull, "pull", "p", false, "pull docker image(s) even if already present")
+ execCmd.Flags().BoolVarP(&execArg.forceRebuild, "rebuild", "", false, "rebuild local action docker image(s) even if already present")
+ execCmd.PersistentFlags().BoolVar(&execArg.jsonLogger, "json", false, "Output logs in json format")
+ execCmd.Flags().StringArrayVarP(&execArg.envs, "env", "", []string{}, "env to make available to actions with optional value (e.g. --env myenv=foo or --env myenv)")
+ execCmd.PersistentFlags().StringVarP(&execArg.envfile, "env-file", "", ".env", "environment file to read and use as env in the containers")
+ execCmd.Flags().StringArrayVarP(&execArg.secrets, "secret", "s", []string{}, "secret to make available to actions with optional value (e.g. -s mysecret=foo or -s mysecret)")
+ execCmd.PersistentFlags().BoolVarP(&execArg.insecureSecrets, "insecure-secrets", "", false, "NOT RECOMMENDED! Doesn't hide secrets while printing logs.")
+ execCmd.Flags().BoolVar(&execArg.privileged, "privileged", false, "use privileged mode")
+ execCmd.Flags().StringVar(&execArg.usernsMode, "userns", "", "user namespace to use")
+ execCmd.PersistentFlags().StringVarP(&execArg.containerArchitecture, "container-architecture", "", "", "Architecture which should be used to run containers, e.g.: linux/amd64. If not specified, will use host default architecture. Requires Docker server API Version 1.41+. Ignored on earlier Docker server platforms.")
+ execCmd.PersistentFlags().StringVarP(&execArg.containerDaemonSocket, "container-daemon-socket", "", "/var/run/docker.sock", "Path to Docker daemon socket which will be mounted to containers")
+ execCmd.Flags().BoolVar(&execArg.useGitIgnore, "use-gitignore", true, "Controls whether paths specified in .gitignore should be copied into container")
+ execCmd.Flags().StringArrayVarP(&execArg.containerCapAdd, "container-cap-add", "", []string{}, "kernel capabilities to add to the workflow containers (e.g. --container-cap-add SYS_PTRACE)")
+ execCmd.Flags().StringArrayVarP(&execArg.containerCapDrop, "container-cap-drop", "", []string{}, "kernel capabilities to remove from the workflow containers (e.g. --container-cap-drop SYS_PTRACE)")
+ execCmd.Flags().StringVarP(&execArg.containerOptions, "container-opts", "", "", "container options")
+ execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPath, "artifact-server-path", "", ".", "Defines the path where the artifact server stores uploads and retrieves downloads from. If not specified the artifact server will not start.")
+ execCmd.PersistentFlags().StringVarP(&execArg.artifactServerAddr, "artifact-server-addr", "", "", "Defines the address where the artifact server listens")
+ execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPort, "artifact-server-port", "", "34567", "Defines the port where the artifact server listens (will only bind to localhost).")
+ execCmd.PersistentFlags().StringVarP(&execArg.defaultActionsURL, "default-actions-url", "", "https://code.forgejo.org", "Defines the default base url of the action.")
+ execCmd.PersistentFlags().BoolVarP(&execArg.noSkipCheckout, "no-skip-checkout", "", false, "Do not skip actions/checkout")
+ execCmd.PersistentFlags().BoolVarP(&execArg.debug, "debug", "d", false, "enable debug log")
+ execCmd.PersistentFlags().BoolVarP(&execArg.dryrun, "dryrun", "n", false, "dryrun mode")
+ execCmd.PersistentFlags().StringVarP(&execArg.image, "image", "i", "node:20-bullseye", "Docker image to use. Use \"-self-hosted\" to run directly on the host.")
+ execCmd.PersistentFlags().StringVarP(&execArg.network, "network", "", "", "Specify the network to which the container will connect")
+ execCmd.PersistentFlags().BoolVarP(&execArg.enableIPv6, "enable-ipv6", "6", false, "Create network with IPv6 enabled.")
+ execCmd.PersistentFlags().StringVarP(&execArg.githubInstance, "gitea-instance", "", "", "Gitea instance to use.")
+
+ return execCmd
+}
diff --git a/internal/app/cmd/register.go b/internal/app/cmd/register.go
new file mode 100644
index 0000000..803511a
--- /dev/null
+++ b/internal/app/cmd/register.go
@@ -0,0 +1,355 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package cmd
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "os"
+ "os/signal"
+ goruntime "runtime"
+ "strings"
+ "time"
+
+ pingv1 "code.gitea.io/actions-proto-go/ping/v1"
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "connectrpc.com/connect"
+ "github.com/mattn/go-isatty"
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/labels"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+)
+
+// runRegister registers a runner to the server
+func runRegister(ctx context.Context, regArgs *registerArgs, configFile *string) func(*cobra.Command, []string) error {
+ return func(cmd *cobra.Command, args []string) error {
+ log.SetReportCaller(false)
+ isTerm := isatty.IsTerminal(os.Stdout.Fd())
+ log.SetFormatter(&log.TextFormatter{
+ DisableColors: !isTerm,
+ DisableTimestamp: true,
+ })
+ log.SetLevel(log.DebugLevel)
+
+ log.Infof("Registering runner, arch=%s, os=%s, version=%s.",
+ goruntime.GOARCH, goruntime.GOOS, ver.Version())
+
+ // runner always needs root permission
+ if os.Getuid() != 0 {
+ // TODO: use a better way to check root permission
+ log.Warnf("Runner in user-mode.")
+ }
+
+ if regArgs.NoInteractive {
+ if err := registerNoInteractive(ctx, *configFile, regArgs); err != nil {
+ return err
+ }
+ } else {
+ go func() {
+ if err := registerInteractive(ctx, *configFile); err != nil {
+ log.Fatal(err)
+ return
+ }
+ os.Exit(0)
+ }()
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+ <-c
+ }
+
+ return nil
+ }
+}
+
+// registerArgs represents the arguments for register command
+type registerArgs struct {
+ NoInteractive bool
+ InstanceAddr string
+ Token string
+ RunnerName string
+ Labels string
+}
+
+type registerStage int8
+
+const (
+ StageUnknown registerStage = -1
+ StageOverwriteLocalConfig registerStage = iota + 1
+ StageInputInstance
+ StageInputToken
+ StageInputRunnerName
+ StageInputLabels
+ StageWaitingForRegistration
+ StageExit
+)
+
+var defaultLabels = []string{
+ "docker:docker://node:20-bullseye",
+}
+
+type registerInputs struct {
+ InstanceAddr string
+ Token string
+ RunnerName string
+ Labels []string
+}
+
+func (r *registerInputs) validate() error {
+ if r.InstanceAddr == "" {
+ return fmt.Errorf("instance address is empty")
+ }
+ if r.Token == "" {
+ return fmt.Errorf("token is empty")
+ }
+ if len(r.Labels) > 0 {
+ return validateLabels(r.Labels)
+ }
+ return nil
+}
+
+func validateLabels(ls []string) error {
+ for _, label := range ls {
+ if _, err := labels.Parse(label); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (r *registerInputs) assignToNext(stage registerStage, value string, cfg *config.Config) registerStage {
+ // must set instance address and token.
+ // if empty, keep current stage.
+ if stage == StageInputInstance || stage == StageInputToken {
+ if value == "" {
+ return stage
+ }
+ }
+
+ // set hostname for runner name if empty
+ if stage == StageInputRunnerName && value == "" {
+ value, _ = os.Hostname()
+ }
+
+ switch stage {
+ case StageOverwriteLocalConfig:
+ if value == "Y" || value == "y" {
+ return StageInputInstance
+ }
+ return StageExit
+ case StageInputInstance:
+ r.InstanceAddr = value
+ return StageInputToken
+ case StageInputToken:
+ r.Token = value
+ return StageInputRunnerName
+ case StageInputRunnerName:
+ r.RunnerName = value
+ // if there are some labels configured in config file, skip input labels stage
+ if len(cfg.Runner.Labels) > 0 {
+ ls := make([]string, 0, len(cfg.Runner.Labels))
+ for _, l := range cfg.Runner.Labels {
+ _, err := labels.Parse(l)
+ if err != nil {
+ log.WithError(err).Warnf("ignored invalid label %q", l)
+ continue
+ }
+ ls = append(ls, l)
+ }
+ if len(ls) == 0 {
+ log.Warn("no valid labels configured in config file, runner may not be able to pick up jobs")
+ }
+ r.Labels = ls
+ return StageWaitingForRegistration
+ }
+ return StageInputLabels
+ case StageInputLabels:
+ r.Labels = defaultLabels
+ if value != "" {
+ r.Labels = strings.Split(value, ",")
+ }
+
+ if validateLabels(r.Labels) != nil {
+ log.Infoln("Invalid labels, please input again, leave blank to use the default labels (for example, ubuntu-20.04:docker://node:20-bookworm,ubuntu-18.04:docker://node:20-bookworm)")
+ return StageInputLabels
+ }
+ return StageWaitingForRegistration
+ }
+ return StageUnknown
+}
+
+func registerInteractive(ctx context.Context, configFile string) error {
+ var (
+ reader = bufio.NewReader(os.Stdin)
+ stage = StageInputInstance
+ inputs = new(registerInputs)
+ )
+
+ cfg, err := config.LoadDefault(configFile)
+ if err != nil {
+ return fmt.Errorf("failed to load config: %v", err)
+ }
+ if f, err := os.Stat(cfg.Runner.File); err == nil && !f.IsDir() {
+ stage = StageOverwriteLocalConfig
+ }
+
+ for {
+ printStageHelp(stage)
+
+ cmdString, err := reader.ReadString('\n')
+ if err != nil {
+ return err
+ }
+ stage = inputs.assignToNext(stage, strings.TrimSpace(cmdString), cfg)
+
+ if stage == StageWaitingForRegistration {
+ log.Infof("Registering runner, name=%s, instance=%s, labels=%v.", inputs.RunnerName, inputs.InstanceAddr, inputs.Labels)
+ if err := doRegister(ctx, cfg, inputs); err != nil {
+ return fmt.Errorf("Failed to register runner: %w", err)
+ }
+ log.Infof("Runner registered successfully.")
+ return nil
+ }
+
+ if stage == StageExit {
+ return nil
+ }
+
+ if stage <= StageUnknown {
+ log.Errorf("Invalid input, please re-run act command.")
+ return nil
+ }
+ }
+}
+
+func printStageHelp(stage registerStage) {
+ switch stage {
+ case StageOverwriteLocalConfig:
+ log.Infoln("Runner is already registered, overwrite local config? [y/N]")
+ case StageInputInstance:
+ log.Infoln("Enter the Forgejo instance URL (for example, https://next.forgejo.org/):")
+ case StageInputToken:
+ log.Infoln("Enter the runner token:")
+ case StageInputRunnerName:
+ hostname, _ := os.Hostname()
+ log.Infof("Enter the runner name (if set empty, use hostname: %s):\n", hostname)
+ case StageInputLabels:
+ log.Infoln("Enter the runner labels, leave blank to use the default labels (comma-separated, for example, ubuntu-20.04:docker://node:20-bookworm,ubuntu-18.04:docker://node:20-bookworm):")
+ case StageWaitingForRegistration:
+ log.Infoln("Waiting for registration...")
+ }
+}
+
+func registerNoInteractive(ctx context.Context, configFile string, regArgs *registerArgs) error {
+ cfg, err := config.LoadDefault(configFile)
+ if err != nil {
+ return err
+ }
+ inputs := &registerInputs{
+ InstanceAddr: regArgs.InstanceAddr,
+ Token: regArgs.Token,
+ RunnerName: regArgs.RunnerName,
+ Labels: defaultLabels,
+ }
+ regArgs.Labels = strings.TrimSpace(regArgs.Labels)
+ // command line flag.
+ if regArgs.Labels != "" {
+ inputs.Labels = strings.Split(regArgs.Labels, ",")
+ }
+ // specify labels in config file.
+ if len(cfg.Runner.Labels) > 0 {
+ if regArgs.Labels != "" {
+ log.Warn("Labels from command will be ignored, use labels defined in config file.")
+ }
+ inputs.Labels = cfg.Runner.Labels
+ }
+
+ if inputs.RunnerName == "" {
+ inputs.RunnerName, _ = os.Hostname()
+ log.Infof("Runner name is empty, use hostname '%s'.", inputs.RunnerName)
+ }
+ if err := inputs.validate(); err != nil {
+ log.WithError(err).Errorf("Invalid input, please re-run act command.")
+ return nil
+ }
+ if err := doRegister(ctx, cfg, inputs); err != nil {
+ return fmt.Errorf("Failed to register runner: %w", err)
+ }
+ log.Infof("Runner registered successfully.")
+ return nil
+}
+
+func doRegister(ctx context.Context, cfg *config.Config, inputs *registerInputs) error {
+ // initial http client
+ cli := client.New(
+ inputs.InstanceAddr,
+ cfg.Runner.Insecure,
+ "",
+ "",
+ ver.Version(),
+ )
+
+ for {
+ _, err := cli.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{
+ Data: inputs.RunnerName,
+ }))
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ if ctx.Err() != nil {
+ break
+ }
+ if err != nil {
+ log.WithError(err).
+ Errorln("Cannot ping the Forgejo instance server")
+ // TODO: if ping failed, retry or exit
+ time.Sleep(time.Second)
+ } else {
+ log.Debugln("Successfully pinged the Forgejo instance server")
+ break
+ }
+ }
+
+ reg := &config.Registration{
+ Name: inputs.RunnerName,
+ Token: inputs.Token,
+ Address: inputs.InstanceAddr,
+ Labels: inputs.Labels,
+ }
+
+ ls := make([]string, len(reg.Labels))
+ for i, v := range reg.Labels {
+ l, _ := labels.Parse(v)
+ ls[i] = l.Name
+ }
+ // register new runner.
+ resp, err := cli.Register(ctx, connect.NewRequest(&runnerv1.RegisterRequest{
+ Name: reg.Name,
+ Token: reg.Token,
+ Version: ver.Version(),
+ AgentLabels: ls, // Could be removed after Gitea 1.20
+ Labels: ls,
+ }))
+ if err != nil {
+ log.WithError(err).Error("poller: cannot register new runner")
+ return err
+ }
+
+ reg.ID = resp.Msg.Runner.Id
+ reg.UUID = resp.Msg.Runner.Uuid
+ reg.Name = resp.Msg.Runner.Name
+ reg.Token = resp.Msg.Runner.Token
+
+ if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
+ return fmt.Errorf("failed to save runner config: %w", err)
+ }
+ return nil
+}
diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go
new file mode 100644
index 0000000..cc89fa5
--- /dev/null
+++ b/internal/app/poll/poller.go
@@ -0,0 +1,167 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package poll
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "connectrpc.com/connect"
+ log "github.com/sirupsen/logrus"
+ "golang.org/x/time/rate"
+
+ "gitea.com/gitea/act_runner/internal/app/run"
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+)
+
+const PollerID = "PollerID"
+
+type Poller interface {
+ Poll()
+ Shutdown(ctx context.Context) error
+}
+
+type poller struct {
+ client client.Client
+ runner run.RunnerInterface
+ cfg *config.Config
+ tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
+
+ pollingCtx context.Context
+ shutdownPolling context.CancelFunc
+
+ jobsCtx context.Context
+ shutdownJobs context.CancelFunc
+
+ done chan any
+}
+
+func New(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
+ return (&poller{}).init(cfg, client, runner)
+}
+
+func (p *poller) init(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
+ pollingCtx, shutdownPolling := context.WithCancel(context.Background())
+
+ jobsCtx, shutdownJobs := context.WithCancel(context.Background())
+
+ done := make(chan any)
+
+ p.client = client
+ p.runner = runner
+ p.cfg = cfg
+
+ p.pollingCtx = pollingCtx
+ p.shutdownPolling = shutdownPolling
+
+ p.jobsCtx = jobsCtx
+ p.shutdownJobs = shutdownJobs
+ p.done = done
+
+ return p
+}
+
+func (p *poller) Poll() {
+ limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
+ wg := &sync.WaitGroup{}
+ for i := 0; i < p.cfg.Runner.Capacity; i++ {
+ wg.Add(1)
+ go p.poll(i, wg, limiter)
+ }
+ wg.Wait()
+
+ // signal the poller is finished
+ close(p.done)
+}
+
+func (p *poller) Shutdown(ctx context.Context) error {
+ p.shutdownPolling()
+
+ select {
+ case <-p.done:
+ log.Trace("all jobs are complete")
+ return nil
+
+ case <-ctx.Done():
+ log.Trace("forcing the jobs to shutdown")
+ p.shutdownJobs()
+ <-p.done
+ log.Trace("all jobs have been shutdown")
+ return ctx.Err()
+ }
+}
+
+func (p *poller) poll(id int, wg *sync.WaitGroup, limiter *rate.Limiter) {
+ log.Infof("[poller %d] launched", id)
+ defer wg.Done()
+ for {
+ if err := limiter.Wait(p.pollingCtx); err != nil {
+ log.Infof("[poller %d] shutdown", id)
+ return
+ }
+ task, ok := p.fetchTask(p.pollingCtx)
+ if !ok {
+ continue
+ }
+ p.runTaskWithRecover(p.jobsCtx, task)
+ }
+}
+
+func (p *poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
+ defer func() {
+ if r := recover(); r != nil {
+ err := fmt.Errorf("panic: %v", r)
+ log.WithError(err).Error("panic in runTaskWithRecover")
+ }
+ }()
+
+ if err := p.runner.Run(ctx, task); err != nil {
+ log.WithError(err).Error("failed to run task")
+ }
+}
+
+func (p *poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
+ reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
+ defer cancel()
+
+ // Load the version value that was in the cache when the request was sent.
+ v := p.tasksVersion.Load()
+ resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
+ TasksVersion: v,
+ }))
+ if errors.Is(err, context.DeadlineExceeded) {
+ log.Trace("deadline exceeded")
+ err = nil
+ }
+ if err != nil {
+ if errors.Is(err, context.Canceled) {
+ log.WithError(err).Debugf("shutdown, fetch task canceled")
+ } else {
+ log.WithError(err).Error("failed to fetch task")
+ }
+ return nil, false
+ }
+
+ if resp == nil || resp.Msg == nil {
+ return nil, false
+ }
+
+ if resp.Msg.TasksVersion > v {
+ p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
+ }
+
+ if resp.Msg.Task == nil {
+ return nil, false
+ }
+
+ // got a task, set `tasksVersion` to zero to focre query db in next request.
+ p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)
+
+ return resp.Msg.Task, true
+}
diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go
new file mode 100644
index 0000000..04b1a84
--- /dev/null
+++ b/internal/app/poll/poller_test.go
@@ -0,0 +1,263 @@
+// Copyright The Forgejo Authors.
+// SPDX-License-Identifier: MIT
+
+package poll
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "connectrpc.com/connect"
+
+ "code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+)
+
+type mockPoller struct {
+ poller
+}
+
+func (o *mockPoller) Poll() {
+ o.poller.Poll()
+}
+
+type mockClient struct {
+ pingv1connect.PingServiceClient
+ runnerv1connect.RunnerServiceClient
+
+ sleep time.Duration
+ cancel bool
+ err error
+ noTask bool
+}
+
+func (o mockClient) Address() string {
+ return ""
+}
+
+func (o mockClient) Insecure() bool {
+ return true
+}
+
+func (o *mockClient) FetchTask(ctx context.Context, req *connect.Request[runnerv1.FetchTaskRequest]) (*connect.Response[runnerv1.FetchTaskResponse], error) {
+ if o.sleep > 0 {
+ select {
+ case <-ctx.Done():
+ log.Trace("fetch task done")
+ return nil, context.DeadlineExceeded
+ case <-time.After(o.sleep):
+ log.Trace("slept")
+ return nil, fmt.Errorf("unexpected")
+ }
+ }
+ if o.cancel {
+ return nil, context.Canceled
+ }
+ if o.err != nil {
+ return nil, o.err
+ }
+ task := &runnerv1.Task{}
+ if o.noTask {
+ task = nil
+ o.noTask = false
+ }
+
+ return connect.NewResponse(&runnerv1.FetchTaskResponse{
+ Task: task,
+ TasksVersion: int64(1),
+ }), nil
+}
+
+type mockRunner struct {
+ cfg *config.Runner
+ log chan string
+ panics bool
+ err error
+}
+
+func (o *mockRunner) Run(ctx context.Context, task *runnerv1.Task) error {
+ o.log <- "runner starts"
+ if o.panics {
+ log.Trace("panics")
+ o.log <- "runner panics"
+ o.panics = false
+ panic("whatever")
+ }
+ if o.err != nil {
+ log.Trace("error")
+ o.log <- "runner error"
+ err := o.err
+ o.err = nil
+ return err
+ }
+ for {
+ select {
+ case <-ctx.Done():
+ log.Trace("shutdown")
+ o.log <- "runner shutdown"
+ return nil
+ case <-time.After(o.cfg.Timeout):
+ log.Trace("after")
+ o.log <- "runner timeout"
+ return nil
+ }
+ }
+}
+
+func setTrace(t *testing.T) {
+ t.Helper()
+ log.SetReportCaller(true)
+ log.SetLevel(log.TraceLevel)
+}
+
+func TestPoller_New(t *testing.T) {
+ p := New(&config.Config{}, &mockClient{}, &mockRunner{})
+ assert.NotNil(t, p)
+}
+
+func TestPoller_Runner(t *testing.T) {
+ setTrace(t)
+ for _, testCase := range []struct {
+ name string
+ timeout time.Duration
+ noTask bool
+ panics bool
+ err error
+ expected string
+ contextTimeout time.Duration
+ }{
+ {
+ name: "Simple",
+ timeout: 10 * time.Second,
+ expected: "runner shutdown",
+ },
+ {
+ name: "Panics",
+ timeout: 10 * time.Second,
+ panics: true,
+ expected: "runner panics",
+ },
+ {
+ name: "Error",
+ timeout: 10 * time.Second,
+ err: fmt.Errorf("ERROR"),
+ expected: "runner error",
+ },
+ {
+ name: "PollTaskError",
+ timeout: 10 * time.Second,
+ noTask: true,
+ expected: "runner shutdown",
+ },
+ {
+ name: "ShutdownTimeout",
+ timeout: 1 * time.Second,
+ contextTimeout: 1 * time.Minute,
+ expected: "runner timeout",
+ },
+ } {
+ t.Run(testCase.name, func(t *testing.T) {
+ runnerLog := make(chan string, 3)
+ configRunner := config.Runner{
+ FetchInterval: 1,
+ Capacity: 1,
+ Timeout: testCase.timeout,
+ }
+ p := &mockPoller{}
+ p.init(
+ &config.Config{
+ Runner: configRunner,
+ },
+ &mockClient{
+ noTask: testCase.noTask,
+ },
+ &mockRunner{
+ cfg: &configRunner,
+ log: runnerLog,
+ panics: testCase.panics,
+ err: testCase.err,
+ })
+ go p.Poll()
+ assert.Equal(t, "runner starts", <-runnerLog)
+ var ctx context.Context
+ var cancel context.CancelFunc
+ if testCase.contextTimeout > 0 {
+ ctx, cancel = context.WithTimeout(context.Background(), testCase.contextTimeout)
+ defer cancel()
+ } else {
+ ctx, cancel = context.WithCancel(context.Background())
+ cancel()
+ }
+ p.Shutdown(ctx)
+ <-p.done
+ assert.Equal(t, testCase.expected, <-runnerLog)
+ })
+ }
+}
+
+func TestPoller_Fetch(t *testing.T) {
+ setTrace(t)
+ for _, testCase := range []struct {
+ name string
+ noTask bool
+ sleep time.Duration
+ err error
+ cancel bool
+ success bool
+ }{
+ {
+ name: "Success",
+ success: true,
+ },
+ {
+ name: "Timeout",
+ sleep: 100 * time.Millisecond,
+ },
+ {
+ name: "Canceled",
+ cancel: true,
+ },
+ {
+ name: "NoTask",
+ noTask: true,
+ },
+ {
+ name: "Error",
+ err: fmt.Errorf("random error"),
+ },
+ } {
+ t.Run(testCase.name, func(t *testing.T) {
+ configRunner := config.Runner{
+ FetchTimeout: 1 * time.Millisecond,
+ }
+ p := &mockPoller{}
+ p.init(
+ &config.Config{
+ Runner: configRunner,
+ },
+ &mockClient{
+ sleep: testCase.sleep,
+ cancel: testCase.cancel,
+ noTask: testCase.noTask,
+ err: testCase.err,
+ },
+ &mockRunner{},
+ )
+ task, ok := p.fetchTask(context.Background())
+ if testCase.success {
+ assert.True(t, ok)
+ assert.NotNil(t, task)
+ } else {
+ assert.False(t, ok)
+ assert.Nil(t, task)
+ }
+ })
+ }
+}
diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go
new file mode 100644
index 0000000..e8654b6
--- /dev/null
+++ b/internal/app/run/runner.go
@@ -0,0 +1,260 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package run
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "connectrpc.com/connect"
+ "github.com/docker/docker/api/types/container"
+ "github.com/nektos/act/pkg/artifactcache"
+ "github.com/nektos/act/pkg/common"
+ "github.com/nektos/act/pkg/model"
+ "github.com/nektos/act/pkg/runner"
+ log "github.com/sirupsen/logrus"
+
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+ "gitea.com/gitea/act_runner/internal/pkg/config"
+ "gitea.com/gitea/act_runner/internal/pkg/labels"
+ "gitea.com/gitea/act_runner/internal/pkg/report"
+ "gitea.com/gitea/act_runner/internal/pkg/ver"
+)
+
+// Runner runs the pipeline.
+type Runner struct {
+ name string
+
+ cfg *config.Config
+
+ client client.Client
+ labels labels.Labels
+ envs map[string]string
+
+ runningTasks sync.Map
+}
+
+type RunnerInterface interface {
+ Run(ctx context.Context, task *runnerv1.Task) error
+}
+
+func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner {
+ ls := labels.Labels{}
+ for _, v := range reg.Labels {
+ if l, err := labels.Parse(v); err == nil {
+ ls = append(ls, l)
+ }
+ }
+
+ if cfg.Runner.Envs == nil {
+ cfg.Runner.Envs = make(map[string]string, 10)
+ }
+
+ cfg.Runner.Envs["GITHUB_SERVER_URL"] = reg.Address
+
+ envs := make(map[string]string, len(cfg.Runner.Envs))
+ for k, v := range cfg.Runner.Envs {
+ envs[k] = v
+ }
+ if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled {
+ if cfg.Cache.ExternalServer != "" {
+ envs["ACTIONS_CACHE_URL"] = cfg.Cache.ExternalServer
+ } else {
+ cacheHandler, err := artifactcache.StartHandler(
+ cfg.Cache.Dir,
+ cfg.Cache.Host,
+ cfg.Cache.Port,
+ log.StandardLogger().WithField("module", "cache_request"),
+ )
+ if err != nil {
+ log.Errorf("cannot init cache server, it will be disabled: %v", err)
+ // go on
+ } else {
+ envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/"
+ }
+ }
+ }
+
+ // set artifact gitea api
+ artifactGiteaAPI := strings.TrimSuffix(cli.Address(), "/") + "/api/actions_pipeline/"
+ envs["ACTIONS_RUNTIME_URL"] = artifactGiteaAPI
+ envs["ACTIONS_RESULTS_URL"] = strings.TrimSuffix(cli.Address(), "/")
+
+ // Set specific environments to distinguish between Gitea and GitHub
+ envs["GITEA_ACTIONS"] = "true"
+ envs["GITEA_ACTIONS_RUNNER_VERSION"] = ver.Version()
+
+ return &Runner{
+ name: reg.Name,
+ cfg: cfg,
+ client: cli,
+ labels: ls,
+ envs: envs,
+ }
+}
+
+func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
+ if _, ok := r.runningTasks.Load(task.Id); ok {
+ return fmt.Errorf("task %d is already running", task.Id)
+ }
+ r.runningTasks.Store(task.Id, struct{}{})
+ defer r.runningTasks.Delete(task.Id)
+
+ ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
+ defer cancel()
+ reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval)
+ var runErr error
+ defer func() {
+ lastWords := ""
+ if runErr != nil {
+ lastWords = runErr.Error()
+ }
+ _ = reporter.Close(lastWords)
+ }()
+ reporter.RunDaemon()
+ runErr = r.run(ctx, task, reporter)
+
+ return nil
+}
+
+func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.Reporter) (err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ err = fmt.Errorf("panic: %v", r)
+ }
+ }()
+
+ reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue())
+
+ workflow, jobID, err := generateWorkflow(task)
+ if err != nil {
+ return err
+ }
+
+ plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
+ if err != nil {
+ return err
+ }
+ job := workflow.GetJob(jobID)
+ reporter.ResetSteps(len(job.Steps))
+
+ taskContext := task.Context.Fields
+
+ log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(),
+ taskContext["gitea_default_actions_url"].GetStringValue(),
+ r.client.Address())
+
+ preset := &model.GithubContext{
+ Event: taskContext["event"].GetStructValue().AsMap(),
+ RunID: taskContext["run_id"].GetStringValue(),
+ RunNumber: taskContext["run_number"].GetStringValue(),
+ Actor: taskContext["actor"].GetStringValue(),
+ Repository: taskContext["repository"].GetStringValue(),
+ EventName: taskContext["event_name"].GetStringValue(),
+ Sha: taskContext["sha"].GetStringValue(),
+ Ref: taskContext["ref"].GetStringValue(),
+ RefName: taskContext["ref_name"].GetStringValue(),
+ RefType: taskContext["ref_type"].GetStringValue(),
+ HeadRef: taskContext["head_ref"].GetStringValue(),
+ BaseRef: taskContext["base_ref"].GetStringValue(),
+ Token: taskContext["token"].GetStringValue(),
+ RepositoryOwner: taskContext["repository_owner"].GetStringValue(),
+ RetentionDays: taskContext["retention_days"].GetStringValue(),
+ }
+ if t := task.Secrets["GITEA_TOKEN"]; t != "" {
+ preset.Token = t
+ } else if t := task.Secrets["GITHUB_TOKEN"]; t != "" {
+ preset.Token = t
+ }
+
+ giteaRuntimeToken := taskContext["gitea_runtime_token"].GetStringValue()
+ if giteaRuntimeToken == "" {
+ // use task token to action api token for previous Gitea Server Versions
+ giteaRuntimeToken = preset.Token
+ }
+ r.envs["ACTIONS_RUNTIME_TOKEN"] = giteaRuntimeToken
+
+ eventJSON, err := json.Marshal(preset.Event)
+ if err != nil {
+ return err
+ }
+
+ maxLifetime := 3 * time.Hour
+ if deadline, ok := ctx.Deadline(); ok {
+ maxLifetime = time.Until(deadline)
+ }
+
+ var inputs map[string]string
+ if preset.EventName == "workflow_dispatch" {
+ if inputsRaw, ok := preset.Event["inputs"]; ok {
+ inputs, _ = inputsRaw.(map[string]string)
+ }
+ }
+
+ runnerConfig := &runner.Config{
+ // On Linux, Workdir will be like "/<parent_directory>/<owner>/<repo>"
+ // On Windows, Workdir will be like "\<parent_directory>\<owner>\<repo>"
+ Workdir: filepath.FromSlash(filepath.Clean(fmt.Sprintf("/%s/%s", r.cfg.Container.WorkdirParent, preset.Repository))),
+ BindWorkdir: false,
+ ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent),
+
+ ReuseContainers: false,
+ ForcePull: r.cfg.Container.ForcePull,
+ ForceRebuild: false,
+ LogOutput: true,
+ JSONLogger: false,
+ Env: r.envs,
+ Secrets: task.Secrets,
+ GitHubInstance: strings.TrimSuffix(r.client.Address(), "/"),
+ AutoRemove: true,
+ NoSkipCheckout: true,
+ PresetGitHubContext: preset,
+ EventJSON: string(eventJSON),
+ ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id),
+ ContainerMaxLifetime: maxLifetime,
+ ContainerNetworkMode: container.NetworkMode(r.cfg.Container.Network),
+ ContainerNetworkEnableIPv6: r.cfg.Container.EnableIPv6,
+ ContainerOptions: r.cfg.Container.Options,
+ ContainerDaemonSocket: r.cfg.Container.DockerHost,
+ Privileged: r.cfg.Container.Privileged,
+ DefaultActionInstance: taskContext["gitea_default_actions_url"].GetStringValue(),
+ PlatformPicker: r.labels.PickPlatform,
+ Vars: task.Vars,
+ ValidVolumes: r.cfg.Container.ValidVolumes,
+ InsecureSkipTLS: r.cfg.Runner.Insecure,
+ Inputs: inputs,
+ }
+
+ rr, err := runner.New(runnerConfig)
+ if err != nil {
+ return err
+ }
+ executor := rr.NewPlanExecutor(plan)
+
+ reporter.Logf("workflow prepared")
+
+ // add logger recorders
+ ctx = common.WithLoggerHook(ctx, reporter)
+
+ execErr := executor(ctx)
+ reporter.SetOutputs(job.Outputs)
+ return execErr
+}
+
+func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) {
+ return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{
+ Version: ver.Version(),
+ Labels: labels,
+ }))
+}
+
+func (r *Runner) Update(ctx context.Context, labels labels.Labels) {
+ r.labels = labels
+}
diff --git a/internal/app/run/runner_test.go b/internal/app/run/runner_test.go
new file mode 100644
index 0000000..0145c70
--- /dev/null
+++ b/internal/app/run/runner_test.go
@@ -0,0 +1,37 @@
+package run
+
+import (
+ "context"
+ "testing"
+
+ "gitea.com/gitea/act_runner/internal/pkg/labels"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestLabelUpdate(t *testing.T) {
+ ctx := context.Background()
+ ls := labels.Labels{}
+
+ initialLabel, err := labels.Parse("testlabel:docker://alpine")
+ assert.NoError(t, err)
+ ls = append(ls, initialLabel)
+
+ newLs := labels.Labels{}
+
+ newLabel, err := labels.Parse("next label:host")
+ assert.NoError(t, err)
+ newLs = append(newLs, initialLabel)
+ newLs = append(newLs, newLabel)
+
+ runner := Runner{
+ labels: ls,
+ }
+
+ assert.Contains(t, runner.labels, initialLabel)
+ assert.NotContains(t, runner.labels, newLabel)
+
+ runner.Update(ctx, newLs)
+
+ assert.Contains(t, runner.labels, initialLabel)
+ assert.Contains(t, runner.labels, newLabel)
+}
diff --git a/internal/app/run/workflow.go b/internal/app/run/workflow.go
new file mode 100644
index 0000000..a6fbb71
--- /dev/null
+++ b/internal/app/run/workflow.go
@@ -0,0 +1,54 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package run
+
+import (
+ "bytes"
+ "fmt"
+ "sort"
+ "strings"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "github.com/nektos/act/pkg/model"
+ "gopkg.in/yaml.v3"
+)
+
+func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) {
+ workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
+ if err != nil {
+ return nil, "", err
+ }
+
+ jobIDs := workflow.GetJobIDs()
+ if len(jobIDs) != 1 {
+ return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs)
+ }
+ jobID := jobIDs[0]
+
+ needJobIDs := make([]string, 0, len(task.Needs))
+ for id, need := range task.Needs {
+ needJobIDs = append(needJobIDs, id)
+ needJob := &model.Job{
+ Outputs: need.Outputs,
+ Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")),
+ }
+ workflow.Jobs[id] = needJob
+ }
+ sort.Strings(needJobIDs)
+
+ rawNeeds := yaml.Node{
+ Kind: yaml.SequenceNode,
+ Content: make([]*yaml.Node, 0, len(needJobIDs)),
+ }
+ for _, id := range needJobIDs {
+ rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{
+ Kind: yaml.ScalarNode,
+ Value: id,
+ })
+ }
+
+ workflow.Jobs[jobID].RawNeeds = rawNeeds
+
+ return workflow, jobID, nil
+}
diff --git a/internal/app/run/workflow_test.go b/internal/app/run/workflow_test.go
new file mode 100644
index 0000000..c7598db
--- /dev/null
+++ b/internal/app/run/workflow_test.go
@@ -0,0 +1,74 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package run
+
+import (
+ "testing"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "github.com/nektos/act/pkg/model"
+ "github.com/stretchr/testify/require"
+ "gotest.tools/v3/assert"
+)
+
+func Test_generateWorkflow(t *testing.T) {
+ type args struct {
+ task *runnerv1.Task
+ }
+ tests := []struct {
+ name string
+ args args
+ assert func(t *testing.T, wf *model.Workflow)
+ want1 string
+ wantErr bool
+ }{
+ {
+ name: "has needs",
+ args: args{
+ task: &runnerv1.Task{
+ WorkflowPayload: []byte(`
+name: Build and deploy
+on: push
+
+jobs:
+ job9:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - run: ./deploy --build ${{ needs.job1.outputs.output1 }}
+ - run: ./deploy --build ${{ needs.job2.outputs.output2 }}
+`),
+ Needs: map[string]*runnerv1.TaskNeed{
+ "job1": {
+ Outputs: map[string]string{
+ "output1": "output1 value",
+ },
+ Result: runnerv1.Result_RESULT_SUCCESS,
+ },
+ "job2": {
+ Outputs: map[string]string{
+ "output2": "output2 value",
+ },
+ Result: runnerv1.Result_RESULT_SUCCESS,
+ },
+ },
+ },
+ },
+ assert: func(t *testing.T, wf *model.Workflow) {
+ assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"})
+ },
+ want1: "job9",
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, got1, err := generateWorkflow(tt.args.task)
+ require.NoError(t, err)
+ tt.assert(t, got)
+ assert.Equal(t, got1, tt.want1)
+ })
+ }
+}
diff --git a/internal/pkg/client/client.go b/internal/pkg/client/client.go
new file mode 100644
index 0000000..57f91ad
--- /dev/null
+++ b/internal/pkg/client/client.go
@@ -0,0 +1,19 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package client
+
+import (
+ "code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
+ "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
+)
+
+// A Client manages communication with the runner.
+//
+//go:generate mockery --name Client
+type Client interface {
+ pingv1connect.PingServiceClient
+ runnerv1connect.RunnerServiceClient
+ Address() string
+ Insecure() bool
+}
diff --git a/internal/pkg/client/header.go b/internal/pkg/client/header.go
new file mode 100644
index 0000000..24844fa
--- /dev/null
+++ b/internal/pkg/client/header.go
@@ -0,0 +1,11 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package client
+
+const (
+ UUIDHeader = "x-runner-uuid"
+ TokenHeader = "x-runner-token"
+ // Deprecated: could be removed after Gitea 1.20 released
+ VersionHeader = "x-runner-version"
+)
diff --git a/internal/pkg/client/http.go b/internal/pkg/client/http.go
new file mode 100644
index 0000000..d365a77
--- /dev/null
+++ b/internal/pkg/client/http.go
@@ -0,0 +1,82 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package client
+
+import (
+ "context"
+ "crypto/tls"
+ "net/http"
+ "strings"
+
+ "code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
+ "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
+ "connectrpc.com/connect"
+)
+
+func getHTTPClient(endpoint string, insecure bool) *http.Client {
+ if strings.HasPrefix(endpoint, "https://") && insecure {
+ return &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true,
+ },
+ },
+ }
+ }
+ return http.DefaultClient
+}
+
+// New returns a new runner client.
+func New(endpoint string, insecure bool, uuid, token, version string, opts ...connect.ClientOption) *HTTPClient {
+ baseURL := strings.TrimRight(endpoint, "/") + "/api/actions"
+
+ opts = append(opts, connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
+ return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
+ if uuid != "" {
+ req.Header().Set(UUIDHeader, uuid)
+ }
+ if token != "" {
+ req.Header().Set(TokenHeader, token)
+ }
+ // TODO: version will be removed from request header after Gitea 1.20 released.
+ if version != "" {
+ req.Header().Set(VersionHeader, version)
+ }
+ return next(ctx, req)
+ }
+ })))
+
+ return &HTTPClient{
+ PingServiceClient: pingv1connect.NewPingServiceClient(
+ getHTTPClient(endpoint, insecure),
+ baseURL,
+ opts...,
+ ),
+ RunnerServiceClient: runnerv1connect.NewRunnerServiceClient(
+ getHTTPClient(endpoint, insecure),
+ baseURL,
+ opts...,
+ ),
+ endpoint: endpoint,
+ insecure: insecure,
+ }
+}
+
+func (c *HTTPClient) Address() string {
+ return c.endpoint
+}
+
+func (c *HTTPClient) Insecure() bool {
+ return c.insecure
+}
+
+var _ Client = (*HTTPClient)(nil)
+
+// An HTTPClient manages communication with the runner API.
+type HTTPClient struct {
+ pingv1connect.PingServiceClient
+ runnerv1connect.RunnerServiceClient
+ endpoint string
+ insecure bool
+}
diff --git a/internal/pkg/client/mocks/Client.go b/internal/pkg/client/mocks/Client.go
new file mode 100644
index 0000000..a8bfdb1
--- /dev/null
+++ b/internal/pkg/client/mocks/Client.go
@@ -0,0 +1,219 @@
+// Code generated by mockery v2.26.1. DO NOT EDIT.
+
+package mocks
+
+import (
+ context "context"
+
+ connect "connectrpc.com/connect"
+
+ mock "github.com/stretchr/testify/mock"
+
+ pingv1 "code.gitea.io/actions-proto-go/ping/v1"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+)
+
+// Client is an autogenerated mock type for the Client type
+type Client struct {
+ mock.Mock
+}
+
+// Address provides a mock function with given fields:
+func (_m *Client) Address() string {
+ ret := _m.Called()
+
+ var r0 string
+ if rf, ok := ret.Get(0).(func() string); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(string)
+ }
+
+ return r0
+}
+
+// Declare provides a mock function with given fields: _a0, _a1
+func (_m *Client) Declare(_a0 context.Context, _a1 *connect.Request[runnerv1.DeclareRequest]) (*connect.Response[runnerv1.DeclareResponse], error) {
+ ret := _m.Called(_a0, _a1)
+
+ var r0 *connect.Response[runnerv1.DeclareResponse]
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.DeclareRequest]) (*connect.Response[runnerv1.DeclareResponse], error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.DeclareRequest]) *connect.Response[runnerv1.DeclareResponse]); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*connect.Response[runnerv1.DeclareResponse])
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, *connect.Request[runnerv1.DeclareRequest]) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// FetchTask provides a mock function with given fields: _a0, _a1
+func (_m *Client) FetchTask(_a0 context.Context, _a1 *connect.Request[runnerv1.FetchTaskRequest]) (*connect.Response[runnerv1.FetchTaskResponse], error) {
+ ret := _m.Called(_a0, _a1)
+
+ var r0 *connect.Response[runnerv1.FetchTaskResponse]
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.FetchTaskRequest]) (*connect.Response[runnerv1.FetchTaskResponse], error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.FetchTaskRequest]) *connect.Response[runnerv1.FetchTaskResponse]); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*connect.Response[runnerv1.FetchTaskResponse])
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, *connect.Request[runnerv1.FetchTaskRequest]) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// Insecure provides a mock function with given fields:
+func (_m *Client) Insecure() bool {
+ ret := _m.Called()
+
+ var r0 bool
+ if rf, ok := ret.Get(0).(func() bool); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(bool)
+ }
+
+ return r0
+}
+
+// Ping provides a mock function with given fields: _a0, _a1
+func (_m *Client) Ping(_a0 context.Context, _a1 *connect.Request[pingv1.PingRequest]) (*connect.Response[pingv1.PingResponse], error) {
+ ret := _m.Called(_a0, _a1)
+
+ var r0 *connect.Response[pingv1.PingResponse]
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[pingv1.PingRequest]) (*connect.Response[pingv1.PingResponse], error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[pingv1.PingRequest]) *connect.Response[pingv1.PingResponse]); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*connect.Response[pingv1.PingResponse])
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, *connect.Request[pingv1.PingRequest]) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// Register provides a mock function with given fields: _a0, _a1
+func (_m *Client) Register(_a0 context.Context, _a1 *connect.Request[runnerv1.RegisterRequest]) (*connect.Response[runnerv1.RegisterResponse], error) {
+ ret := _m.Called(_a0, _a1)
+
+ var r0 *connect.Response[runnerv1.RegisterResponse]
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.RegisterRequest]) (*connect.Response[runnerv1.RegisterResponse], error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.RegisterRequest]) *connect.Response[runnerv1.RegisterResponse]); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*connect.Response[runnerv1.RegisterResponse])
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, *connect.Request[runnerv1.RegisterRequest]) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// UpdateLog provides a mock function with given fields: _a0, _a1
+func (_m *Client) UpdateLog(_a0 context.Context, _a1 *connect.Request[runnerv1.UpdateLogRequest]) (*connect.Response[runnerv1.UpdateLogResponse], error) {
+ ret := _m.Called(_a0, _a1)
+
+ var r0 *connect.Response[runnerv1.UpdateLogResponse]
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.UpdateLogRequest]) (*connect.Response[runnerv1.UpdateLogResponse], error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.UpdateLogRequest]) *connect.Response[runnerv1.UpdateLogResponse]); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*connect.Response[runnerv1.UpdateLogResponse])
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, *connect.Request[runnerv1.UpdateLogRequest]) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// UpdateTask provides a mock function with given fields: _a0, _a1
+func (_m *Client) UpdateTask(_a0 context.Context, _a1 *connect.Request[runnerv1.UpdateTaskRequest]) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
+ ret := _m.Called(_a0, _a1)
+
+ var r0 *connect.Response[runnerv1.UpdateTaskResponse]
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.UpdateTaskRequest]) (*connect.Response[runnerv1.UpdateTaskResponse], error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[runnerv1.UpdateTaskRequest]) *connect.Response[runnerv1.UpdateTaskResponse]); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*connect.Response[runnerv1.UpdateTaskResponse])
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, *connect.Request[runnerv1.UpdateTaskRequest]) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+type mockConstructorTestingTNewClient interface {
+ mock.TestingT
+ Cleanup(func())
+}
+
+// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+func NewClient(t mockConstructorTestingTNewClient) *Client {
+ mock := &Client{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/internal/pkg/config/config.example.yaml b/internal/pkg/config/config.example.yaml
new file mode 100644
index 0000000..32dfb68
--- /dev/null
+++ b/internal/pkg/config/config.example.yaml
@@ -0,0 +1,100 @@
+# Example configuration file, it's safe to copy this as the default config file without any modification.
+
+# You don't have to copy this file to your instance,
+# just run `./act_runner generate-config > config.yaml` to generate a config file.
+
+log:
+ # The level of logging, can be trace, debug, info, warn, error, fatal
+ level: info
+
+runner:
+ # Where to store the registration result.
+ file: .runner
+ # Execute how many tasks concurrently at the same time.
+ capacity: 1
+ # Extra environment variables to run jobs.
+ envs:
+ A_TEST_ENV_NAME_1: a_test_env_value_1
+ A_TEST_ENV_NAME_2: a_test_env_value_2
+ # Extra environment variables to run jobs from a file.
+ # It will be ignored if it's empty or the file doesn't exist.
+ env_file: .env
+ # The timeout for a job to be finished.
+ # Please note that the Forgejo instance also has a timeout (3h by default) for the job.
+ # So the job could be stopped by the Forgejo instance if it's timeout is shorter than this.
+ timeout: 3h
+ # The timeout for the runner to wait for running jobs to finish when
+ # shutting down because a TERM or INT signal has been received. Any
+ # running jobs that haven't finished after this timeout will be
+ # cancelled.
+ # If unset or zero the jobs will be cancelled immediately.
+ shutdown_timeout: 3h
+ # Whether skip verifying the TLS certificate of the instance.
+ insecure: false
+ # The timeout for fetching the job from the Forgejo instance.
+ fetch_timeout: 5s
+ # The interval for fetching the job from the Forgejo instance.
+ fetch_interval: 2s
+ # The interval for reporting the job status and logs to the Forgejo instance.
+ report_interval: 1s
+ # The labels of a runner are used to determine which jobs the runner can run, and how to run them.
+ # Like: ["macos-arm64:host", "ubuntu-latest:docker://node:20-bookworm", "ubuntu-22.04:docker://node:20-bookworm"]
+ # If it's empty when registering, it will ask for inputting labels.
+ # If it's empty when execute `deamon`, will use labels in `.runner` file.
+ labels: []
+
+cache:
+ # Enable cache server to use actions/cache.
+ enabled: true
+ # The directory to store the cache data.
+ # If it's empty, the cache data will be stored in $HOME/.cache/actcache.
+ dir: ""
+ # The host of the cache server.
+ # It's not for the address to listen, but the address to connect from job containers.
+ # So 0.0.0.0 is a bad choice, leave it empty to detect automatically.
+ host: ""
+ # The port of the cache server.
+ # 0 means to use a random available port.
+ port: 0
+ # The external cache server URL. Valid only when enable is true.
+ # If it's specified, act_runner will use this URL as the ACTIONS_CACHE_URL rather than start a server by itself.
+ # The URL should generally end with "/".
+ external_server: ""
+
+container:
+ # Specifies the network to which the container will connect.
+ # Could be host, bridge or the name of a custom network.
+ # If it's empty, create a network automatically.
+ network: ""
+ # Whether to create networks with IPv6 enabled. Requires the Docker daemon to be set up accordingly.
+ # Only takes effect if "network" is set to "".
+ enable_ipv6: false
+ # Whether to use privileged mode or not when launching task containers (privileged mode is required for Docker-in-Docker).
+ privileged: false
+ # And other options to be used when the container is started (eg, --add-host=my.forgejo.url:host-gateway).
+ options:
+ # The parent directory of a job's working directory.
+ # If it's empty, /workspace will be used.
+ workdir_parent:
+ # Volumes (including bind mounts) can be mounted to containers. Glob syntax is supported, see https://github.com/gobwas/glob
+ # You can specify multiple volumes. If the sequence is empty, no volumes can be mounted.
+ # For example, if you only allow containers to mount the `data` volume and all the json files in `/src`, you should change the config to:
+ # valid_volumes:
+ # - data
+ # - /src/*.json
+ # If you want to allow any volume, please use the following configuration:
+ # valid_volumes:
+ # - '**'
+ valid_volumes: []
+ # overrides the docker client host with the specified one.
+ # If it's empty, act_runner will find an available docker host automatically.
+ # If it's "-", act_runner will find an available docker host automatically, but the docker host won't be mounted to the job containers and service containers.
+ # If it's not empty or "-", the specified docker host will be used. An error will be returned if it doesn't work.
+ docker_host: ""
+ # Pull docker image(s) even if already present
+ force_pull: false
+
+host:
+ # The parent directory of a job's working directory.
+ # If it's empty, $HOME/.cache/act/ will be used.
+ workdir_parent:
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
new file mode 100644
index 0000000..a1536b3
--- /dev/null
+++ b/internal/pkg/config/config.go
@@ -0,0 +1,166 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package config
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/joho/godotenv"
+ log "github.com/sirupsen/logrus"
+ "gopkg.in/yaml.v3"
+)
+
+// Log represents the configuration for logging.
+type Log struct {
+ Level string `yaml:"level"` // Level indicates the logging level.
+}
+
+// Runner represents the configuration for the runner.
+type Runner struct {
+ File string `yaml:"file"` // File specifies the file path for the runner.
+ Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
+ Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
+ EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
+ Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
+ ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` // ShutdownTimeout specifies the duration to wait for running jobs to complete during a shutdown of the runner.
+ Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
+ FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
+ FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
+ ReportInterval time.Duration `yaml:"report_interval"` // ReportInterval specifies the interval duration for reporting status and logs of a running job.
+ Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
+}
+
+// Cache represents the configuration for caching.
+type Cache struct {
+ Enabled *bool `yaml:"enabled"` // Enabled indicates whether caching is enabled. It is a pointer to distinguish between false and not set. If not set, it will be true.
+ Dir string `yaml:"dir"` // Dir specifies the directory path for caching.
+ Host string `yaml:"host"` // Host specifies the caching host.
+ Port uint16 `yaml:"port"` // Port specifies the caching port.
+ ExternalServer string `yaml:"external_server"` // ExternalServer specifies the URL of external cache server
+}
+
+// Container represents the configuration for the container.
+type Container struct {
+ Network string `yaml:"network"` // Network specifies the network for the container.
+ NetworkMode string `yaml:"network_mode"` // Deprecated: use Network instead. Could be removed after Gitea 1.20
+ EnableIPv6 bool `yaml:"enable_ipv6"` // EnableIPv6 indicates whether the network is created with IPv6 enabled.
+ Privileged bool `yaml:"privileged"` // Privileged indicates whether the container runs in privileged mode.
+ Options string `yaml:"options"` // Options specifies additional options for the container.
+ WorkdirParent string `yaml:"workdir_parent"` // WorkdirParent specifies the parent directory for the container's working directory.
+ ValidVolumes []string `yaml:"valid_volumes"` // ValidVolumes specifies the volumes (including bind mounts) can be mounted to containers.
+ DockerHost string `yaml:"docker_host"` // DockerHost specifies the Docker host. It overrides the value specified in environment variable DOCKER_HOST.
+ ForcePull bool `yaml:"force_pull"` // Pull docker image(s) even if already present
+}
+
+// Host represents the configuration for the host.
+type Host struct {
+ WorkdirParent string `yaml:"workdir_parent"` // WorkdirParent specifies the parent directory for the host's working directory.
+}
+
+// Config represents the overall configuration.
+type Config struct {
+ Log Log `yaml:"log"` // Log represents the configuration for logging.
+ Runner Runner `yaml:"runner"` // Runner represents the configuration for the runner.
+ Cache Cache `yaml:"cache"` // Cache represents the configuration for caching.
+ Container Container `yaml:"container"` // Container represents the configuration for the container.
+ Host Host `yaml:"host"` // Host represents the configuration for the host.
+}
+
+// Tune the config settings accordingly to the Forgejo instance that will be used.
+func (c *Config) Tune(instanceURL string) {
+ if instanceURL == "https://codeberg.org" {
+ if c.Runner.FetchInterval < 30*time.Second {
+ log.Info("The runner is configured to be used by a public instance, fetch interval is set to 30 seconds.")
+ c.Runner.FetchInterval = 30 * time.Second
+ }
+ }
+}
+
+// LoadDefault returns the default configuration.
+// If file is not empty, it will be used to load the configuration.
+func LoadDefault(file string) (*Config, error) {
+ cfg := &Config{}
+ if file != "" {
+ content, err := os.ReadFile(file)
+ if err != nil {
+ return nil, fmt.Errorf("open config file %q: %w", file, err)
+ }
+ if err := yaml.Unmarshal(content, cfg); err != nil {
+ return nil, fmt.Errorf("parse config file %q: %w", file, err)
+ }
+ }
+ compatibleWithOldEnvs(file != "", cfg)
+
+ if cfg.Runner.EnvFile != "" {
+ if stat, err := os.Stat(cfg.Runner.EnvFile); err == nil && !stat.IsDir() {
+ envs, err := godotenv.Read(cfg.Runner.EnvFile)
+ if err != nil {
+ return nil, fmt.Errorf("read env file %q: %w", cfg.Runner.EnvFile, err)
+ }
+ if cfg.Runner.Envs == nil {
+ cfg.Runner.Envs = map[string]string{}
+ }
+ for k, v := range envs {
+ cfg.Runner.Envs[k] = v
+ }
+ }
+ }
+
+ if cfg.Log.Level == "" {
+ cfg.Log.Level = "info"
+ }
+ if cfg.Runner.File == "" {
+ cfg.Runner.File = ".runner"
+ }
+ if cfg.Runner.Capacity <= 0 {
+ cfg.Runner.Capacity = 1
+ }
+ if cfg.Runner.Timeout <= 0 {
+ cfg.Runner.Timeout = 3 * time.Hour
+ }
+ if cfg.Cache.Enabled == nil {
+ b := true
+ cfg.Cache.Enabled = &b
+ }
+ if *cfg.Cache.Enabled {
+ if cfg.Cache.Dir == "" {
+ home, _ := os.UserHomeDir()
+ cfg.Cache.Dir = filepath.Join(home, ".cache", "actcache")
+ }
+ }
+ if cfg.Container.WorkdirParent == "" {
+ cfg.Container.WorkdirParent = "workspace"
+ }
+ if cfg.Host.WorkdirParent == "" {
+ home, _ := os.UserHomeDir()
+ cfg.Host.WorkdirParent = filepath.Join(home, ".cache", "act")
+ }
+ if cfg.Runner.FetchTimeout <= 0 {
+ cfg.Runner.FetchTimeout = 5 * time.Second
+ }
+ if cfg.Runner.FetchInterval <= 0 {
+ cfg.Runner.FetchInterval = 2 * time.Second
+ }
+ if cfg.Runner.ReportInterval <= 0 {
+ cfg.Runner.ReportInterval = time.Second
+ }
+
+ // although `container.network_mode` will be deprecated, but we have to be compatible with it for now.
+ if cfg.Container.NetworkMode != "" && cfg.Container.Network == "" {
+ log.Warn("You are trying to use deprecated configuration item of `container.network_mode`, please use `container.network` instead.")
+ if cfg.Container.NetworkMode == "bridge" {
+ // Previously, if the value of `container.network_mode` is `bridge`, we will create a new network for job.
+ // But “bridge” is easily confused with the bridge network created by Docker by default.
+ // So we set the value of `container.network` to empty string to make `act_runner` automatically create a new network for job.
+ cfg.Container.Network = ""
+ } else {
+ cfg.Container.Network = cfg.Container.NetworkMode
+ }
+ }
+
+ return cfg, nil
+}
diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go
new file mode 100644
index 0000000..d2ddf2f
--- /dev/null
+++ b/internal/pkg/config/config_test.go
@@ -0,0 +1,37 @@
+// Copyright 2024 The Forgejo Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package config
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestConfigTune(t *testing.T) {
+ c := &Config{
+ Runner: Runner{},
+ }
+
+ t.Run("Public instance tuning", func(t *testing.T) {
+ c.Runner.FetchInterval = 60 * time.Second
+ c.Tune("https://codeberg.org")
+ assert.EqualValues(t, 60*time.Second, c.Runner.FetchInterval)
+
+ c.Runner.FetchInterval = 2 * time.Second
+ c.Tune("https://codeberg.org")
+ assert.EqualValues(t, 30*time.Second, c.Runner.FetchInterval)
+ })
+
+ t.Run("Non-public instance tuning", func(t *testing.T) {
+ c.Runner.FetchInterval = 60 * time.Second
+ c.Tune("https://example.com")
+ assert.EqualValues(t, 60*time.Second, c.Runner.FetchInterval)
+
+ c.Runner.FetchInterval = 2 * time.Second
+ c.Tune("https://codeberg.com")
+ assert.EqualValues(t, 2*time.Second, c.Runner.FetchInterval)
+ })
+}
diff --git a/internal/pkg/config/deprecated.go b/internal/pkg/config/deprecated.go
new file mode 100644
index 0000000..b5051aa
--- /dev/null
+++ b/internal/pkg/config/deprecated.go
@@ -0,0 +1,62 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package config
+
+import (
+ "os"
+ "strconv"
+ "strings"
+
+ log "github.com/sirupsen/logrus"
+)
+
+// Deprecated: could be removed in the future. TODO: remove it when Gitea 1.20.0 is released.
+// Be compatible with old envs.
+func compatibleWithOldEnvs(fileUsed bool, cfg *Config) {
+ handleEnv := func(key string) (string, bool) {
+ if v, ok := os.LookupEnv(key); ok {
+ if fileUsed {
+ log.Warnf("env %s has been ignored because config file is used", key)
+ return "", false
+ }
+ log.Warnf("env %s will be deprecated, please use config file instead", key)
+ return v, true
+ }
+ return "", false
+ }
+
+ if v, ok := handleEnv("GITEA_DEBUG"); ok {
+ if b, _ := strconv.ParseBool(v); b {
+ cfg.Log.Level = "debug"
+ }
+ }
+ if v, ok := handleEnv("GITEA_TRACE"); ok {
+ if b, _ := strconv.ParseBool(v); b {
+ cfg.Log.Level = "trace"
+ }
+ }
+ if v, ok := handleEnv("GITEA_RUNNER_CAPACITY"); ok {
+ if i, _ := strconv.Atoi(v); i > 0 {
+ cfg.Runner.Capacity = i
+ }
+ }
+ if v, ok := handleEnv("GITEA_RUNNER_FILE"); ok {
+ cfg.Runner.File = v
+ }
+ if v, ok := handleEnv("GITEA_RUNNER_ENVIRON"); ok {
+ splits := strings.Split(v, ",")
+ if cfg.Runner.Envs == nil {
+ cfg.Runner.Envs = map[string]string{}
+ }
+ for _, split := range splits {
+ kv := strings.SplitN(split, ":", 2)
+ if len(kv) == 2 && kv[0] != "" {
+ cfg.Runner.Envs[kv[0]] = kv[1]
+ }
+ }
+ }
+ if v, ok := handleEnv("GITEA_RUNNER_ENV_FILE"); ok {
+ cfg.Runner.EnvFile = v
+ }
+}
diff --git a/internal/pkg/config/embed.go b/internal/pkg/config/embed.go
new file mode 100644
index 0000000..cf445cf
--- /dev/null
+++ b/internal/pkg/config/embed.go
@@ -0,0 +1,9 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package config
+
+import _ "embed"
+
+//go:embed config.example.yaml
+var Example []byte
diff --git a/internal/pkg/config/registration.go b/internal/pkg/config/registration.go
new file mode 100644
index 0000000..be66b4f
--- /dev/null
+++ b/internal/pkg/config/registration.go
@@ -0,0 +1,54 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package config
+
+import (
+ "encoding/json"
+ "os"
+)
+
+const registrationWarning = "This file is automatically generated by act-runner. Do not edit it manually unless you know what you are doing. Removing this file will cause act runner to re-register as a new runner."
+
+// Registration is the registration information for a runner
+type Registration struct {
+ Warning string `json:"WARNING"` // Warning message to display, it's always the registrationWarning constant
+
+ ID int64 `json:"id"`
+ UUID string `json:"uuid"`
+ Name string `json:"name"`
+ Token string `json:"token"`
+ Address string `json:"address"`
+ Labels []string `json:"labels"`
+}
+
+func LoadRegistration(file string) (*Registration, error) {
+ f, err := os.Open(file)
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+
+ var reg Registration
+ if err := json.NewDecoder(f).Decode(&reg); err != nil {
+ return nil, err
+ }
+
+ reg.Warning = ""
+
+ return &reg, nil
+}
+
+func SaveRegistration(file string, reg *Registration) error {
+ f, err := os.Create(file)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ reg.Warning = registrationWarning
+
+ enc := json.NewEncoder(f)
+ enc.SetIndent("", " ")
+ return enc.Encode(reg)
+}
diff --git a/internal/pkg/envcheck/doc.go b/internal/pkg/envcheck/doc.go
new file mode 100644
index 0000000..8641a77
--- /dev/null
+++ b/internal/pkg/envcheck/doc.go
@@ -0,0 +1,5 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+// Package envcheck provides a simple way to check if the environment is ready to run jobs.
+package envcheck
diff --git a/internal/pkg/envcheck/docker.go b/internal/pkg/envcheck/docker.go
new file mode 100644
index 0000000..f115bc7
--- /dev/null
+++ b/internal/pkg/envcheck/docker.go
@@ -0,0 +1,34 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package envcheck
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/docker/docker/client"
+)
+
+func CheckIfDockerRunning(ctx context.Context, configDockerHost string) error {
+ opts := []client.Opt{
+ client.FromEnv,
+ }
+
+ if configDockerHost != "" {
+ opts = append(opts, client.WithHost(configDockerHost))
+ }
+
+ cli, err := client.NewClientWithOpts(opts...)
+ if err != nil {
+ return err
+ }
+ defer cli.Close()
+
+ _, err = cli.Ping(ctx)
+ if err != nil {
+ return fmt.Errorf("cannot ping the docker daemon. is it running? %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/pkg/labels/labels.go b/internal/pkg/labels/labels.go
new file mode 100644
index 0000000..f448fdf
--- /dev/null
+++ b/internal/pkg/labels/labels.go
@@ -0,0 +1,109 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package labels
+
+import (
+ "fmt"
+ "strings"
+)
+
+const (
+ SchemeHost = "host"
+ SchemeDocker = "docker"
+ SchemeLXC = "lxc"
+)
+
+type Label struct {
+ Name string
+ Schema string
+ Arg string
+}
+
+func Parse(str string) (*Label, error) {
+ splits := strings.SplitN(str, ":", 3)
+ label := &Label{
+ Name: splits[0],
+ Schema: "host",
+ Arg: "",
+ }
+ if len(splits) >= 2 {
+ label.Schema = splits[1]
+ }
+ if len(splits) >= 3 {
+ label.Arg = splits[2]
+ }
+ if label.Schema != SchemeHost && label.Schema != SchemeDocker && label.Schema != SchemeLXC {
+ return nil, fmt.Errorf("unsupported schema: %s", label.Schema)
+ }
+ return label, nil
+}
+
+type Labels []*Label
+
+func (l Labels) RequireDocker() bool {
+ for _, label := range l {
+ if label.Schema == SchemeDocker {
+ return true
+ }
+ }
+ return false
+}
+
+func (l Labels) PickPlatform(runsOn []string) string {
+ platforms := make(map[string]string, len(l))
+ for _, label := range l {
+ switch label.Schema {
+ case SchemeDocker:
+ // "//" will be ignored
+ platforms[label.Name] = strings.TrimPrefix(label.Arg, "//")
+ case SchemeHost:
+ platforms[label.Name] = "-self-hosted"
+ case SchemeLXC:
+ platforms[label.Name] = "lxc:" + strings.TrimPrefix(label.Arg, "//")
+ default:
+ // It should not happen, because Parse has checked it.
+ continue
+ }
+ }
+ for _, v := range runsOn {
+ if v, ok := platforms[v]; ok {
+ return v
+ }
+ }
+
+ // TODO: support multiple labels
+ // like:
+ // ["ubuntu-22.04"] => "ubuntu:22.04"
+ // ["with-gpu"] => "linux:with-gpu"
+ // ["ubuntu-22.04", "with-gpu"] => "ubuntu:22.04_with-gpu"
+
+ // return default.
+ // So the runner receives a task with a label that the runner doesn't have,
+ // it happens when the user have edited the label of the runner in the web UI.
+ // TODO: it may be not correct, what if the runner is used as host mode only?
+ return "node:20-bullseye"
+}
+
+func (l Labels) Names() []string {
+ names := make([]string, 0, len(l))
+ for _, label := range l {
+ names = append(names, label.Name)
+ }
+ return names
+}
+
+func (l Labels) ToStrings() []string {
+ ls := make([]string, 0, len(l))
+ for _, label := range l {
+ lbl := label.Name
+ if label.Schema != "" {
+ lbl += ":" + label.Schema
+ if label.Arg != "" {
+ lbl += ":" + label.Arg
+ }
+ }
+ ls = append(ls, lbl)
+ }
+ return ls
+}
diff --git a/internal/pkg/labels/labels_test.go b/internal/pkg/labels/labels_test.go
new file mode 100644
index 0000000..e46a27b
--- /dev/null
+++ b/internal/pkg/labels/labels_test.go
@@ -0,0 +1,63 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package labels
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gotest.tools/v3/assert"
+)
+
+func TestParse(t *testing.T) {
+ tests := []struct {
+ args string
+ want *Label
+ wantErr bool
+ }{
+ {
+ args: "ubuntu:docker://node:18",
+ want: &Label{
+ Name: "ubuntu",
+ Schema: "docker",
+ Arg: "//node:18",
+ },
+ wantErr: false,
+ },
+ {
+ args: "ubuntu:host",
+ want: &Label{
+ Name: "ubuntu",
+ Schema: "host",
+ Arg: "",
+ },
+ wantErr: false,
+ },
+ {
+ args: "ubuntu",
+ want: &Label{
+ Name: "ubuntu",
+ Schema: "host",
+ Arg: "",
+ },
+ wantErr: false,
+ },
+ {
+ args: "ubuntu:vm:ubuntu-18.04",
+ want: nil,
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.args, func(t *testing.T) {
+ got, err := Parse(tt.args)
+ if tt.wantErr {
+ require.Error(t, err)
+ return
+ }
+ require.NoError(t, err)
+ assert.DeepEqual(t, got, tt.want)
+ })
+ }
+}
diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go
new file mode 100644
index 0000000..cee5062
--- /dev/null
+++ b/internal/pkg/report/reporter.go
@@ -0,0 +1,437 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package report
+
+import (
+ "context"
+ "fmt"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "connectrpc.com/connect"
+ retry "github.com/avast/retry-go/v4"
+ log "github.com/sirupsen/logrus"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "gitea.com/gitea/act_runner/internal/pkg/client"
+)
+
+type Reporter struct {
+ ctx context.Context
+ cancel context.CancelFunc
+
+ closed bool
+ client client.Client
+ clientM sync.Mutex
+
+ logOffset int
+ logRows []*runnerv1.LogRow
+ logReplacer *strings.Replacer
+ oldnew []string
+ reportInterval time.Duration
+
+ state *runnerv1.TaskState
+ stateMu sync.RWMutex
+ outputs sync.Map
+
+ debugOutputEnabled bool
+ stopCommandEndToken string
+}
+
+func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, reportInterval time.Duration) *Reporter {
+ var oldnew []string
+ if v := task.Context.Fields["token"].GetStringValue(); v != "" {
+ oldnew = append(oldnew, v, "***")
+ }
+ if v := task.Context.Fields["gitea_runtime_token"].GetStringValue(); v != "" {
+ oldnew = append(oldnew, v, "***")
+ }
+ for _, v := range task.Secrets {
+ oldnew = append(oldnew, v, "***")
+ }
+
+ rv := &Reporter{
+ ctx: ctx,
+ cancel: cancel,
+ client: client,
+ oldnew: oldnew,
+ reportInterval: reportInterval,
+ logReplacer: strings.NewReplacer(oldnew...),
+ state: &runnerv1.TaskState{
+ Id: task.Id,
+ },
+ }
+
+ if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
+ rv.debugOutputEnabled = true
+ }
+
+ return rv
+}
+
+func (r *Reporter) ResetSteps(l int) {
+ r.stateMu.Lock()
+ defer r.stateMu.Unlock()
+ for i := 0; i < l; i++ {
+ r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
+ Id: int64(i),
+ })
+ }
+}
+
+func (r *Reporter) Levels() []log.Level {
+ return log.AllLevels
+}
+
+func appendIfNotNil[T any](s []*T, v *T) []*T {
+ if v != nil {
+ return append(s, v)
+ }
+ return s
+}
+
+func (r *Reporter) Fire(entry *log.Entry) error {
+ r.stateMu.Lock()
+ defer r.stateMu.Unlock()
+
+ log.WithFields(entry.Data).Trace(entry.Message)
+
+ timestamp := entry.Time
+ if r.state.StartedAt == nil {
+ r.state.StartedAt = timestamppb.New(timestamp)
+ }
+
+ stage := entry.Data["stage"]
+
+ if stage != "Main" {
+ if v, ok := entry.Data["jobResult"]; ok {
+ if jobResult, ok := r.parseResult(v); ok {
+ r.state.Result = jobResult
+ r.state.StoppedAt = timestamppb.New(timestamp)
+ for _, s := range r.state.Steps {
+ if s.Result == runnerv1.Result_RESULT_UNSPECIFIED {
+ s.Result = runnerv1.Result_RESULT_CANCELLED
+ if jobResult == runnerv1.Result_RESULT_SKIPPED {
+ s.Result = runnerv1.Result_RESULT_SKIPPED
+ }
+ }
+ }
+ }
+ }
+ if !r.duringSteps() {
+ r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
+ }
+ return nil
+ }
+
+ var step *runnerv1.StepState
+ if v, ok := entry.Data["stepNumber"]; ok {
+ if v, ok := v.(int); ok && len(r.state.Steps) > v {
+ step = r.state.Steps[v]
+ }
+ }
+ if step == nil {
+ if !r.duringSteps() {
+ r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
+ }
+ return nil
+ }
+
+ if step.StartedAt == nil {
+ step.StartedAt = timestamppb.New(timestamp)
+ }
+ if v, ok := entry.Data["raw_output"]; ok {
+ if rawOutput, ok := v.(bool); ok && rawOutput {
+ if row := r.parseLogRow(entry); row != nil {
+ if step.LogLength == 0 {
+ step.LogIndex = int64(r.logOffset + len(r.logRows))
+ }
+ step.LogLength++
+ r.logRows = append(r.logRows, row)
+ }
+ }
+ } else if !r.duringSteps() {
+ r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
+ }
+ if v, ok := entry.Data["stepResult"]; ok {
+ if stepResult, ok := r.parseResult(v); ok {
+ if step.LogLength == 0 {
+ step.LogIndex = int64(r.logOffset + len(r.logRows))
+ }
+ step.Result = stepResult
+ step.StoppedAt = timestamppb.New(timestamp)
+ }
+ }
+
+ return nil
+}
+
+func (r *Reporter) RunDaemon() {
+ if r.closed {
+ return
+ }
+ if r.ctx.Err() != nil {
+ return
+ }
+
+ _ = r.ReportLog(false)
+ _ = r.ReportState()
+
+ time.AfterFunc(r.reportInterval, r.RunDaemon)
+}
+
+func (r *Reporter) Logf(format string, a ...interface{}) {
+ r.stateMu.Lock()
+ defer r.stateMu.Unlock()
+
+ r.logf(format, a...)
+}
+
+func (r *Reporter) logf(format string, a ...interface{}) {
+ if !r.duringSteps() {
+ r.logRows = append(r.logRows, &runnerv1.LogRow{
+ Time: timestamppb.Now(),
+ Content: fmt.Sprintf(format, a...),
+ })
+ }
+}
+
+func (r *Reporter) SetOutputs(outputs map[string]string) {
+ r.stateMu.Lock()
+ defer r.stateMu.Unlock()
+
+ for k, v := range outputs {
+ if len(k) > 255 {
+ r.logf("ignore output because the key is too long: %q", k)
+ continue
+ }
+ if l := len(v); l > 1024*1024 {
+ log.Println("ignore output because the value is too long:", k, l)
+ r.logf("ignore output because the value %q is too long: %d", k, l)
+ }
+ if _, ok := r.outputs.Load(k); ok {
+ continue
+ }
+ r.outputs.Store(k, v)
+ }
+}
+
+func (r *Reporter) Close(lastWords string) error {
+ r.closed = true
+
+ r.stateMu.Lock()
+ if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
+ if lastWords == "" {
+ lastWords = "Early termination"
+ }
+ for _, v := range r.state.Steps {
+ if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
+ v.Result = runnerv1.Result_RESULT_CANCELLED
+ }
+ }
+ r.state.Result = runnerv1.Result_RESULT_FAILURE
+ r.logRows = append(r.logRows, &runnerv1.LogRow{
+ Time: timestamppb.Now(),
+ Content: lastWords,
+ })
+ r.state.StoppedAt = timestamppb.Now()
+ } else if lastWords != "" {
+ r.logRows = append(r.logRows, &runnerv1.LogRow{
+ Time: timestamppb.Now(),
+ Content: lastWords,
+ })
+ }
+ r.stateMu.Unlock()
+
+ return retry.Do(func() error {
+ if err := r.ReportLog(true); err != nil {
+ return err
+ }
+ return r.ReportState()
+ }, retry.Context(r.ctx))
+}
+
+func (r *Reporter) ReportLog(noMore bool) error {
+ r.clientM.Lock()
+ defer r.clientM.Unlock()
+
+ r.stateMu.RLock()
+ rows := r.logRows
+ r.stateMu.RUnlock()
+
+ resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
+ TaskId: r.state.Id,
+ Index: int64(r.logOffset),
+ Rows: rows,
+ NoMore: noMore,
+ }))
+ if err != nil {
+ return err
+ }
+
+ ack := int(resp.Msg.AckIndex)
+ if ack < r.logOffset {
+ return fmt.Errorf("submitted logs are lost")
+ }
+
+ r.stateMu.Lock()
+ r.logRows = r.logRows[ack-r.logOffset:]
+ r.logOffset = ack
+ r.stateMu.Unlock()
+
+ if noMore && ack < r.logOffset+len(rows) {
+ return fmt.Errorf("not all logs are submitted")
+ }
+
+ return nil
+}
+
+func (r *Reporter) ReportState() error {
+ r.clientM.Lock()
+ defer r.clientM.Unlock()
+
+ r.stateMu.RLock()
+ state := proto.Clone(r.state).(*runnerv1.TaskState)
+ r.stateMu.RUnlock()
+
+ outputs := make(map[string]string)
+ r.outputs.Range(func(k, v interface{}) bool {
+ if val, ok := v.(string); ok {
+ outputs[k.(string)] = val
+ }
+ return true
+ })
+
+ resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
+ State: state,
+ Outputs: outputs,
+ }))
+ if err != nil {
+ return err
+ }
+
+ for _, k := range resp.Msg.SentOutputs {
+ r.outputs.Store(k, struct{}{})
+ }
+
+ if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
+ r.cancel()
+ }
+
+ var noSent []string
+ r.outputs.Range(func(k, v interface{}) bool {
+ if _, ok := v.(string); ok {
+ noSent = append(noSent, k.(string))
+ }
+ return true
+ })
+ if len(noSent) > 0 {
+ return fmt.Errorf("there are still outputs that have not been sent: %v", noSent)
+ }
+
+ return nil
+}
+
+func (r *Reporter) duringSteps() bool {
+ if steps := r.state.Steps; len(steps) == 0 {
+ return false
+ } else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 {
+ return false
+ } else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED {
+ return false
+ }
+ return true
+}
+
+var stringToResult = map[string]runnerv1.Result{
+ "success": runnerv1.Result_RESULT_SUCCESS,
+ "failure": runnerv1.Result_RESULT_FAILURE,
+ "skipped": runnerv1.Result_RESULT_SKIPPED,
+ "cancelled": runnerv1.Result_RESULT_CANCELLED,
+}
+
+func (r *Reporter) parseResult(result interface{}) (runnerv1.Result, bool) {
+ str := ""
+ if v, ok := result.(string); ok { // for jobResult
+ str = v
+ } else if v, ok := result.(fmt.Stringer); ok { // for stepResult
+ str = v.String()
+ }
+
+ ret, ok := stringToResult[str]
+ return ret, ok
+}
+
+var cmdRegex = regexp.MustCompile(`^::([^ :]+)( .*)?::(.*)$`)
+
+func (r *Reporter) handleCommand(originalContent, command, parameters, value string) *string {
+ if r.stopCommandEndToken != "" && command != r.stopCommandEndToken {
+ return &originalContent
+ }
+
+ switch command {
+ case "add-mask":
+ r.addMask(value)
+ return nil
+ case "debug":
+ if r.debugOutputEnabled {
+ return &value
+ }
+ return nil
+
+ case "notice":
+ // Not implemented yet, so just return the original content.
+ return &originalContent
+ case "warning":
+ // Not implemented yet, so just return the original content.
+ return &originalContent
+ case "error":
+ // Not implemented yet, so just return the original content.
+ return &originalContent
+ case "group":
+ // Rewriting into ##[] syntax which the frontend understands
+ content := "##[group]" + value
+ return &content
+ case "endgroup":
+ // Ditto
+ content := "##[endgroup]"
+ return &content
+ case "stop-commands":
+ r.stopCommandEndToken = value
+ return nil
+ case r.stopCommandEndToken:
+ r.stopCommandEndToken = ""
+ return nil
+ }
+ return &originalContent
+}
+
+func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow {
+ content := strings.TrimRightFunc(entry.Message, func(r rune) bool { return r == '\r' || r == '\n' })
+
+ matches := cmdRegex.FindStringSubmatch(content)
+ if matches != nil {
+ if output := r.handleCommand(content, matches[1], matches[2], matches[3]); output != nil {
+ content = *output
+ } else {
+ return nil
+ }
+ }
+
+ content = r.logReplacer.Replace(content)
+
+ return &runnerv1.LogRow{
+ Time: timestamppb.New(entry.Time),
+ Content: strings.ToValidUTF8(content, "?"),
+ }
+}
+
+func (r *Reporter) addMask(msg string) {
+ r.oldnew = append(r.oldnew, msg, "***")
+ r.logReplacer = strings.NewReplacer(r.oldnew...)
+}
diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go
new file mode 100644
index 0000000..524e972
--- /dev/null
+++ b/internal/pkg/report/reporter_test.go
@@ -0,0 +1,198 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package report
+
+import (
+ "context"
+ "strings"
+ "testing"
+ "time"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ connect_go "connectrpc.com/connect"
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "gitea.com/gitea/act_runner/internal/pkg/client/mocks"
+)
+
+func TestReporter_parseLogRow(t *testing.T) {
+ tests := []struct {
+ name string
+ debugOutputEnabled bool
+ args []string
+ want []string
+ }{
+ {
+ "No command", false,
+ []string{"Hello, world!"},
+ []string{"Hello, world!"},
+ },
+ {
+ "Add-mask", false,
+ []string{
+ "foo mysecret bar",
+ "::add-mask::mysecret",
+ "foo mysecret bar",
+ },
+ []string{
+ "foo mysecret bar",
+ "<nil>",
+ "foo *** bar",
+ },
+ },
+ {
+ "Debug enabled", true,
+ []string{
+ "::debug::GitHub Actions runtime token access controls",
+ },
+ []string{
+ "GitHub Actions runtime token access controls",
+ },
+ },
+ {
+ "Debug not enabled", false,
+ []string{
+ "::debug::GitHub Actions runtime token access controls",
+ },
+ []string{
+ "<nil>",
+ },
+ },
+ {
+ "notice", false,
+ []string{
+ "::notice file=file.name,line=42,endLine=48,title=Cool Title::Gosh, that's not going to work",
+ },
+ []string{
+ "::notice file=file.name,line=42,endLine=48,title=Cool Title::Gosh, that's not going to work",
+ },
+ },
+ {
+ "warning", false,
+ []string{
+ "::warning file=file.name,line=42,endLine=48,title=Cool Title::Gosh, that's not going to work",
+ },
+ []string{
+ "::warning file=file.name,line=42,endLine=48,title=Cool Title::Gosh, that's not going to work",
+ },
+ },
+ {
+ "error", false,
+ []string{
+ "::error file=file.name,line=42,endLine=48,title=Cool Title::Gosh, that's not going to work",
+ },
+ []string{
+ "::error file=file.name,line=42,endLine=48,title=Cool Title::Gosh, that's not going to work",
+ },
+ },
+ {
+ "group", false,
+ []string{
+ "::group::",
+ "::endgroup::",
+ },
+ []string{
+ "##[group]",
+ "##[endgroup]",
+ },
+ },
+ {
+ "stop-commands", false,
+ []string{
+ "::add-mask::foo",
+ "::stop-commands::myverycoolstoptoken",
+ "::add-mask::bar",
+ "::debug::Stuff",
+ "myverycoolstoptoken",
+ "::add-mask::baz",
+ "::myverycoolstoptoken::",
+ "::add-mask::wibble",
+ "foo bar baz wibble",
+ },
+ []string{
+ "<nil>",
+ "<nil>",
+ "::add-mask::bar",
+ "::debug::Stuff",
+ "myverycoolstoptoken",
+ "::add-mask::baz",
+ "<nil>",
+ "<nil>",
+ "*** bar baz ***",
+ },
+ },
+ {
+ "unknown command", false,
+ []string{
+ "::set-mask::foo",
+ },
+ []string{
+ "::set-mask::foo",
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ r := &Reporter{
+ logReplacer: strings.NewReplacer(),
+ debugOutputEnabled: tt.debugOutputEnabled,
+ }
+ for idx, arg := range tt.args {
+ rv := r.parseLogRow(&log.Entry{Message: arg})
+ got := "<nil>"
+
+ if rv != nil {
+ got = rv.Content
+ }
+
+ assert.Equal(t, tt.want[idx], got)
+ }
+ })
+ }
+}
+
+func TestReporter_Fire(t *testing.T) {
+ t.Run("ignore command lines", func(t *testing.T) {
+ client := mocks.NewClient(t)
+ client.On("UpdateLog", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+ t.Logf("Received UpdateLog: %s", req.Msg.String())
+ return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+ AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+ }), nil
+ })
+ client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+ t.Logf("Received UpdateTask: %s", req.Msg.String())
+ return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+ })
+ ctx, cancel := context.WithCancel(context.Background())
+ taskCtx, err := structpb.NewStruct(map[string]interface{}{})
+ require.NoError(t, err)
+ reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
+ Context: taskCtx,
+ }, time.Second)
+ defer func() {
+ assert.NoError(t, reporter.Close(""))
+ }()
+ reporter.ResetSteps(5)
+
+ dataStep0 := map[string]interface{}{
+ "stage": "Main",
+ "stepNumber": 0,
+ "raw_output": true,
+ }
+
+ assert.NoError(t, reporter.Fire(&log.Entry{Message: "regular log line", Data: dataStep0}))
+ assert.NoError(t, reporter.Fire(&log.Entry{Message: "::debug::debug log line", Data: dataStep0}))
+ assert.NoError(t, reporter.Fire(&log.Entry{Message: "regular log line", Data: dataStep0}))
+ assert.NoError(t, reporter.Fire(&log.Entry{Message: "::debug::debug log line", Data: dataStep0}))
+ assert.NoError(t, reporter.Fire(&log.Entry{Message: "::debug::debug log line", Data: dataStep0}))
+ assert.NoError(t, reporter.Fire(&log.Entry{Message: "regular log line", Data: dataStep0}))
+
+ assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength)
+ })
+}
diff --git a/internal/pkg/ver/version.go b/internal/pkg/ver/version.go
new file mode 100644
index 0000000..3c07a18
--- /dev/null
+++ b/internal/pkg/ver/version.go
@@ -0,0 +1,11 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package ver
+
+// go build -ldflags "-X gitea.com/gitea/act_runner/internal/pkg/ver.version=1.2.3"
+var version = "dev"
+
+func Version() string {
+ return version
+}