diff options
Diffstat (limited to 'internal/app')
-rw-r--r-- | internal/app/cmd/cache-server.go | 69 | ||||
-rw-r--r-- | internal/app/cmd/cmd.go | 87 | ||||
-rw-r--r-- | internal/app/cmd/create-runner-file.go | 164 | ||||
-rw-r--r-- | internal/app/cmd/create-runner-file_test.go | 118 | ||||
-rw-r--r-- | internal/app/cmd/daemon.go | 208 | ||||
-rw-r--r-- | internal/app/cmd/exec.go | 495 | ||||
-rw-r--r-- | internal/app/cmd/register.go | 355 | ||||
-rw-r--r-- | internal/app/poll/poller.go | 167 | ||||
-rw-r--r-- | internal/app/poll/poller_test.go | 263 | ||||
-rw-r--r-- | internal/app/run/runner.go | 260 | ||||
-rw-r--r-- | internal/app/run/runner_test.go | 37 | ||||
-rw-r--r-- | internal/app/run/workflow.go | 54 | ||||
-rw-r--r-- | internal/app/run/workflow_test.go | 74 |
13 files changed, 2351 insertions, 0 deletions
diff --git a/internal/app/cmd/cache-server.go b/internal/app/cmd/cache-server.go new file mode 100644 index 0000000..21b3352 --- /dev/null +++ b/internal/app/cmd/cache-server.go @@ -0,0 +1,69 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "context" + "fmt" + "os" + "os/signal" + + "gitea.com/gitea/act_runner/internal/pkg/config" + + "github.com/nektos/act/pkg/artifactcache" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +type cacheServerArgs struct { + Dir string + Host string + Port uint16 +} + +func runCacheServer(ctx context.Context, configFile *string, cacheArgs *cacheServerArgs) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + cfg, err := config.LoadDefault(*configFile) + if err != nil { + return fmt.Errorf("invalid configuration: %w", err) + } + + initLogging(cfg) + + var ( + dir = cfg.Cache.Dir + host = cfg.Cache.Host + port = cfg.Cache.Port + ) + + // cacheArgs has higher priority + if cacheArgs.Dir != "" { + dir = cacheArgs.Dir + } + if cacheArgs.Host != "" { + host = cacheArgs.Host + } + if cacheArgs.Port != 0 { + port = cacheArgs.Port + } + + cacheHandler, err := artifactcache.StartHandler( + dir, + host, + port, + log.StandardLogger().WithField("module", "cache_request"), + ) + if err != nil { + return err + } + + log.Infof("cache server is listening on %v", cacheHandler.ExternalURL()) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + + return nil + } +} diff --git a/internal/app/cmd/cmd.go b/internal/app/cmd/cmd.go new file mode 100644 index 0000000..48341dc --- /dev/null +++ b/internal/app/cmd/cmd.go @@ -0,0 +1,87 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +func Execute(ctx context.Context) { + // ./act_runner + rootCmd := &cobra.Command{ + Use: "forgejo-runner [event name to run]\nIf no event name passed, will default to \"on: push\"", + Short: "Run Forgejo Actions locally by specifying the event name (e.g. `push`) or an action name directly.", + Args: cobra.MaximumNArgs(1), + Version: ver.Version(), + SilenceUsage: true, + } + configFile := "" + rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "", "Config file path") + + // ./act_runner register + var regArgs registerArgs + registerCmd := &cobra.Command{ + Use: "register", + Short: "Register a runner to the server", + Args: cobra.MaximumNArgs(0), + RunE: runRegister(ctx, ®Args, &configFile), // must use a pointer to regArgs + } + registerCmd.Flags().BoolVar(®Args.NoInteractive, "no-interactive", false, "Disable interactive mode") + registerCmd.Flags().StringVar(®Args.InstanceAddr, "instance", "", "Forgejo instance address") + registerCmd.Flags().StringVar(®Args.Token, "token", "", "Runner token") + registerCmd.Flags().StringVar(®Args.RunnerName, "name", "", "Runner name") + registerCmd.Flags().StringVar(®Args.Labels, "labels", "", "Runner tags, comma separated") + rootCmd.AddCommand(registerCmd) + + rootCmd.AddCommand(createRunnerFileCmd(ctx, &configFile)) + + // ./act_runner daemon + daemonCmd := &cobra.Command{ + Use: "daemon", + Short: "Run as a runner daemon", + Args: cobra.MaximumNArgs(1), + RunE: runDaemon(ctx, &configFile), + } + rootCmd.AddCommand(daemonCmd) + + // ./act_runner exec + rootCmd.AddCommand(loadExecCmd(ctx)) + + // ./act_runner config + rootCmd.AddCommand(&cobra.Command{ + Use: "generate-config", + Short: "Generate an example config file", + Args: cobra.MaximumNArgs(0), + Run: func(_ *cobra.Command, _ []string) { + fmt.Printf("%s", config.Example) + }, + }) + + // ./act_runner cache-server + var cacheArgs cacheServerArgs + cacheCmd := &cobra.Command{ + Use: "cache-server", + Short: "Start a cache server for the cache action", + Args: cobra.MaximumNArgs(0), + RunE: runCacheServer(ctx, &configFile, &cacheArgs), + } + cacheCmd.Flags().StringVarP(&cacheArgs.Dir, "dir", "d", "", "Cache directory") + cacheCmd.Flags().StringVarP(&cacheArgs.Host, "host", "s", "", "Host of the cache server") + cacheCmd.Flags().Uint16VarP(&cacheArgs.Port, "port", "p", 0, "Port of the cache server") + rootCmd.AddCommand(cacheCmd) + + // hide completion command + rootCmd.CompletionOptions.HiddenDefaultCmd = true + + if err := rootCmd.Execute(); err != nil { + os.Exit(1) + } +} diff --git a/internal/app/cmd/create-runner-file.go b/internal/app/cmd/create-runner-file.go new file mode 100644 index 0000000..a972624 --- /dev/null +++ b/internal/app/cmd/create-runner-file.go @@ -0,0 +1,164 @@ +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "context" + "encoding/hex" + "fmt" + "os" + + pingv1 "code.gitea.io/actions-proto-go/ping/v1" + "connectrpc.com/connect" + gouuid "github.com/google/uuid" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "gitea.com/gitea/act_runner/internal/app/run" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +type createRunnerFileArgs struct { + Connect bool + InstanceAddr string + Secret string + Name string +} + +func createRunnerFileCmd(ctx context.Context, configFile *string) *cobra.Command { + var argsVar createRunnerFileArgs + cmd := &cobra.Command{ + Use: "create-runner-file", + Short: "Create a runner file using a shared secret used to pre-register the runner on the Forgejo instance", + Args: cobra.MaximumNArgs(0), + RunE: runCreateRunnerFile(ctx, &argsVar, configFile), + } + cmd.Flags().BoolVar(&argsVar.Connect, "connect", false, "tries to connect to the instance using the secret (Forgejo v1.21 instance or greater)") + cmd.Flags().StringVar(&argsVar.InstanceAddr, "instance", "", "Forgejo instance address") + cmd.MarkFlagRequired("instance") + cmd.Flags().StringVar(&argsVar.Secret, "secret", "", "secret shared with the Forgejo instance via forgejo-cli actions register") + cmd.MarkFlagRequired("secret") + cmd.Flags().StringVar(&argsVar.Name, "name", "", "Runner name") + + return cmd +} + +// must be exactly the same as fogejo/models/actions/forgejo.go +func uuidFromSecret(secret string) (string, error) { + uuid, err := gouuid.FromBytes([]byte(secret[:16])) + if err != nil { + return "", fmt.Errorf("gouuid.FromBytes %v", err) + } + return uuid.String(), nil +} + +// should be exactly the same as forgejo/cmd/forgejo/actions.go +func validateSecret(secret string) error { + secretLen := len(secret) + if secretLen != 40 { + return fmt.Errorf("the secret must be exactly 40 characters long, not %d", secretLen) + } + if _, err := hex.DecodeString(secret); err != nil { + return fmt.Errorf("the secret must be an hexadecimal string: %w", err) + } + return nil +} + +func ping(cfg *config.Config, reg *config.Registration) error { + // initial http client + cli := client.New( + reg.Address, + cfg.Runner.Insecure, + "", + "", + ver.Version(), + ) + + _, err := cli.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{ + Data: reg.UUID, + })) + if err != nil { + return fmt.Errorf("ping %s failed %w", reg.Address, err) + } + return nil +} + +func runCreateRunnerFile(ctx context.Context, args *createRunnerFileArgs, configFile *string) func(cmd *cobra.Command, args []string) error { + return func(*cobra.Command, []string) error { + log.SetLevel(log.DebugLevel) + log.Info("Creating runner file") + + // + // Prepare the registration data + // + cfg, err := config.LoadDefault(*configFile) + if err != nil { + return fmt.Errorf("invalid configuration: %w", err) + } + + if err := validateSecret(args.Secret); err != nil { + return err + } + + uuid, err := uuidFromSecret(args.Secret) + if err != nil { + return err + } + + name := args.Name + if name == "" { + name, _ = os.Hostname() + log.Infof("Runner name is empty, use hostname '%s'.", name) + } + + reg := &config.Registration{ + Name: name, + UUID: uuid, + Token: args.Secret, + Address: args.InstanceAddr, + } + + // + // Verify the Forgejo instance is reachable + // + if err := ping(cfg, reg); err != nil { + return err + } + + // + // Save the registration file + // + if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil { + return fmt.Errorf("failed to save runner config to %s: %w", cfg.Runner.File, err) + } + + // + // Verify the secret works + // + if args.Connect { + cli := client.New( + reg.Address, + cfg.Runner.Insecure, + reg.UUID, + reg.Token, + ver.Version(), + ) + + runner := run.NewRunner(cfg, reg, cli) + resp, err := runner.Declare(ctx, cfg.Runner.Labels) + + if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented { + log.Warn("Cannot verify the connection because the Forgejo instance is lower than v1.21") + } else if err != nil { + log.WithError(err).Error("fail to invoke Declare") + return err + } else { + log.Infof("connection successful: %s, with version: %s, with labels: %v", + resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels) + } + } + return nil + } +} diff --git a/internal/app/cmd/create-runner-file_test.go b/internal/app/cmd/create-runner-file_test.go new file mode 100644 index 0000000..4f3acb8 --- /dev/null +++ b/internal/app/cmd/create-runner-file_test.go @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "bytes" + "context" + "os" + "testing" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/ver" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" +) + +func executeCommand(ctx context.Context, cmd *cobra.Command, args ...string) (string, error) { + buf := new(bytes.Buffer) + cmd.SetOut(buf) + cmd.SetErr(buf) + cmd.SetArgs(args) + + err := cmd.ExecuteContext(ctx) + + return buf.String(), err +} + +func Test_createRunnerFileCmd(t *testing.T) { + configFile := "config.yml" + ctx := context.Background() + cmd := createRunnerFileCmd(ctx, &configFile) + output, err := executeCommand(ctx, cmd) + assert.ErrorContains(t, err, `required flag(s) "instance", "secret" not set`) + assert.Contains(t, output, "Usage:") +} + +func Test_validateSecret(t *testing.T) { + assert.ErrorContains(t, validateSecret("abc"), "exactly 40 characters") + assert.ErrorContains(t, validateSecret("ZAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"), "must be an hexadecimal") +} + +func Test_uuidFromSecret(t *testing.T) { + uuid, err := uuidFromSecret("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + assert.NoError(t, err) + assert.EqualValues(t, uuid, "41414141-4141-4141-4141-414141414141") +} + +func Test_ping(t *testing.T) { + cfg := &config.Config{} + address := os.Getenv("FORGEJO_URL") + if address == "" { + address = "https://code.forgejo.org" + } + reg := &config.Registration{ + Address: address, + UUID: "create-runner-file_test.go", + } + assert.NoError(t, ping(cfg, reg)) +} + +func Test_runCreateRunnerFile(t *testing.T) { + // + // Set the .runner file to be in a temporary directory + // + dir := t.TempDir() + configFile := dir + "/config.yml" + runnerFile := dir + "/.runner" + cfg, err := config.LoadDefault("") + cfg.Runner.File = runnerFile + yamlData, err := yaml.Marshal(cfg) + assert.NoError(t, err) + assert.NoError(t, os.WriteFile(configFile, yamlData, 0o666)) + + instance, has := os.LookupEnv("FORGEJO_URL") + if !has { + instance = "https://code.forgejo.org" + } + secret, has := os.LookupEnv("FORGEJO_RUNNER_SECRET") + assert.True(t, has) + name := "testrunner" + + // + // Run create-runner-file + // + ctx := context.Background() + cmd := createRunnerFileCmd(ctx, &configFile) + output, err := executeCommand(ctx, cmd, "--connect", "--secret", secret, "--instance", instance, "--name", name) + assert.NoError(t, err) + assert.EqualValues(t, "", output) + + // + // Read back the runner file and verify its content + // + reg, err := config.LoadRegistration(runnerFile) + assert.NoError(t, err) + assert.EqualValues(t, secret, reg.Token) + assert.EqualValues(t, instance, reg.Address) + + // + // Verify that fetching a task successfully returns there is + // no task for this runner + // + cli := client.New( + reg.Address, + cfg.Runner.Insecure, + reg.UUID, + reg.Token, + ver.Version(), + ) + resp, err := cli.FetchTask(ctx, connect.NewRequest(&runnerv1.FetchTaskRequest{})) + assert.NoError(t, err) + assert.Nil(t, resp.Msg.Task) +} diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go new file mode 100644 index 0000000..a613546 --- /dev/null +++ b/internal/app/cmd/daemon.go @@ -0,0 +1,208 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "runtime" + "strconv" + "strings" + + "connectrpc.com/connect" + "github.com/mattn/go-isatty" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "gitea.com/gitea/act_runner/internal/app/poll" + "gitea.com/gitea/act_runner/internal/app/run" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/envcheck" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + cfg, err := config.LoadDefault(*configFile) + if err != nil { + return fmt.Errorf("invalid configuration: %w", err) + } + + initLogging(cfg) + log.Infoln("Starting runner daemon") + + reg, err := config.LoadRegistration(cfg.Runner.File) + if os.IsNotExist(err) { + log.Error("registration file not found, please register the runner first") + return err + } else if err != nil { + return fmt.Errorf("failed to load registration file: %w", err) + } + + cfg.Tune(reg.Address) + + lbls := reg.Labels + if len(cfg.Runner.Labels) > 0 { + lbls = cfg.Runner.Labels + } + + ls := labels.Labels{} + for _, l := range lbls { + label, err := labels.Parse(l) + if err != nil { + log.WithError(err).Warnf("ignored invalid label %q", l) + continue + } + ls = append(ls, label) + } + if len(ls) == 0 { + log.Warn("no labels configured, runner may not be able to pick up jobs") + } + + if ls.RequireDocker() { + dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost) + if err != nil { + return err + } + if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil { + return err + } + // if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath + os.Setenv("DOCKER_HOST", dockerSocketPath) + // empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically + // and assign the path to cfg.Container.DockerHost + if cfg.Container.DockerHost == "" { + cfg.Container.DockerHost = dockerSocketPath + } + // check the scheme, if the scheme is not npipe or unix + // set cfg.Container.DockerHost to "-" because it can't be mounted to the job container + if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 { + scheme := cfg.Container.DockerHost[:protoIndex] + if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") { + cfg.Container.DockerHost = "-" + } + } + } + + cli := client.New( + reg.Address, + cfg.Runner.Insecure, + reg.UUID, + reg.Token, + ver.Version(), + ) + + runner := run.NewRunner(cfg, reg, cli) + // declare the labels of the runner before fetching tasks + resp, err := runner.Declare(ctx, ls.Names()) + if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented { + // Gitea instance is older version. skip declare step. + log.Warn("Because the Forgejo instance is an old version, skipping declaring the labels and version.") + } else if err != nil { + log.WithError(err).Error("fail to invoke Declare") + return err + } else { + log.Infof("runner: %s, with version: %s, with labels: %v, declared successfully", + resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels) + // if declared successfully, override the labels in the.runner file with valid labels in the config file (if specified) + runner.Update(ctx, ls) + reg.Labels = ls.ToStrings() + if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil { + return fmt.Errorf("failed to save runner config: %w", err) + } + } + + poller := poll.New(cfg, cli, runner) + + go poller.Poll() + + <-ctx.Done() + log.Infof("runner: %s shutdown initiated, waiting [runner].shutdown_timeout=%s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout) + defer cancel() + + err = poller.Shutdown(ctx) + if err != nil { + log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name) + } + return nil + } +} + +// initLogging setup the global logrus logger. +func initLogging(cfg *config.Config) { + isTerm := isatty.IsTerminal(os.Stdout.Fd()) + format := &log.TextFormatter{ + DisableColors: !isTerm, + FullTimestamp: true, + } + log.SetFormatter(format) + + if l := cfg.Log.Level; l != "" { + level, err := log.ParseLevel(l) + if err != nil { + log.WithError(err). + Errorf("invalid log level: %q", l) + } + + // debug level + if level == log.DebugLevel { + log.SetReportCaller(true) + format.CallerPrettyfier = func(f *runtime.Frame) (string, string) { + // get function name + s := strings.Split(f.Function, ".") + funcname := "[" + s[len(s)-1] + "]" + // get file name and line number + _, filename := path.Split(f.File) + filename = "[" + filename + ":" + strconv.Itoa(f.Line) + "]" + return funcname, filename + } + log.SetFormatter(format) + } + + if log.GetLevel() != level { + log.Infof("log level changed to %v", level) + log.SetLevel(level) + } + } +} + +var commonSocketPaths = []string{ + "/var/run/docker.sock", + "/run/podman/podman.sock", + "$HOME/.colima/docker.sock", + "$XDG_RUNTIME_DIR/docker.sock", + "$XDG_RUNTIME_DIR/podman/podman.sock", + `\\.\pipe\docker_engine`, + "$HOME/.docker/run/docker.sock", +} + +func getDockerSocketPath(configDockerHost string) (string, error) { + // a `-` means don't mount the docker socket to job containers + if configDockerHost != "" && configDockerHost != "-" { + return configDockerHost, nil + } + + socket, found := os.LookupEnv("DOCKER_HOST") + if found { + return socket, nil + } + + for _, p := range commonSocketPaths { + if _, err := os.Lstat(os.ExpandEnv(p)); err == nil { + if strings.HasPrefix(p, `\\.\`) { + return "npipe://" + filepath.ToSlash(os.ExpandEnv(p)), nil + } + return "unix://" + filepath.ToSlash(os.ExpandEnv(p)), nil + } + } + + return "", fmt.Errorf("daemon Docker Engine socket not found and docker_host config was invalid") +} diff --git a/internal/app/cmd/exec.go b/internal/app/cmd/exec.go new file mode 100644 index 0000000..3e111fe --- /dev/null +++ b/internal/app/cmd/exec.go @@ -0,0 +1,495 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// Copyright 2019 nektos +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/joho/godotenv" + "github.com/nektos/act/pkg/artifactcache" + "github.com/nektos/act/pkg/artifacts" + "github.com/nektos/act/pkg/common" + "github.com/nektos/act/pkg/model" + "github.com/nektos/act/pkg/runner" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "golang.org/x/term" +) + +type executeArgs struct { + runList bool + job string + event string + workdir string + workflowsPath string + noWorkflowRecurse bool + autodetectEvent bool + forcePull bool + forceRebuild bool + jsonLogger bool + envs []string + envfile string + secrets []string + defaultActionsURL string + insecureSecrets bool + privileged bool + usernsMode string + containerArchitecture string + containerDaemonSocket string + useGitIgnore bool + containerCapAdd []string + containerCapDrop []string + containerOptions string + artifactServerPath string + artifactServerAddr string + artifactServerPort string + noSkipCheckout bool + debug bool + dryrun bool + image string + cacheHandler *artifactcache.Handler + network string + enableIPv6 bool + githubInstance string +} + +// WorkflowsPath returns path to workflow file(s) +func (i *executeArgs) WorkflowsPath() string { + return i.resolve(i.workflowsPath) +} + +// Envfile returns path to .env +func (i *executeArgs) Envfile() string { + return i.resolve(i.envfile) +} + +func (i *executeArgs) LoadSecrets() map[string]string { + s := make(map[string]string) + for _, secretPair := range i.secrets { + secretPairParts := strings.SplitN(secretPair, "=", 2) + secretPairParts[0] = strings.ToUpper(secretPairParts[0]) + if strings.ToUpper(s[secretPairParts[0]]) == secretPairParts[0] { + log.Errorf("Secret %s is already defined (secrets are case insensitive)", secretPairParts[0]) + } + if len(secretPairParts) == 2 { + s[secretPairParts[0]] = secretPairParts[1] + } else if env, ok := os.LookupEnv(secretPairParts[0]); ok && env != "" { + s[secretPairParts[0]] = env + } else { + fmt.Printf("Provide value for '%s': ", secretPairParts[0]) + val, err := term.ReadPassword(int(os.Stdin.Fd())) + fmt.Println() + if err != nil { + log.Errorf("failed to read input: %v", err) + os.Exit(1) + } + s[secretPairParts[0]] = string(val) + } + } + return s +} + +func readEnvs(path string, envs map[string]string) bool { + if _, err := os.Stat(path); err == nil { + env, err := godotenv.Read(path) + if err != nil { + log.Fatalf("Error loading from %s: %v", path, err) + } + for k, v := range env { + envs[k] = v + } + return true + } + return false +} + +func (i *executeArgs) LoadEnvs() map[string]string { + envs := make(map[string]string) + if i.envs != nil { + for _, envVar := range i.envs { + e := strings.SplitN(envVar, `=`, 2) + if len(e) == 2 { + envs[e[0]] = e[1] + } else { + envs[e[0]] = "" + } + } + } + _ = readEnvs(i.Envfile(), envs) + + envs["ACTIONS_CACHE_URL"] = i.cacheHandler.ExternalURL() + "/" + + return envs +} + +// Workdir returns path to workdir +func (i *executeArgs) Workdir() string { + return i.resolve(".") +} + +func (i *executeArgs) resolve(path string) string { + basedir, err := filepath.Abs(i.workdir) + if err != nil { + log.Fatal(err) + } + if path == "" { + return path + } + if !filepath.IsAbs(path) { + path = filepath.Join(basedir, path) + } + return path +} + +func printList(plan *model.Plan) error { + type lineInfoDef struct { + jobID string + jobName string + stage string + wfName string + wfFile string + events string + } + lineInfos := []lineInfoDef{} + + header := lineInfoDef{ + jobID: "Job ID", + jobName: "Job name", + stage: "Stage", + wfName: "Workflow name", + wfFile: "Workflow file", + events: "Events", + } + + jobs := map[string]bool{} + duplicateJobIDs := false + + jobIDMaxWidth := len(header.jobID) + jobNameMaxWidth := len(header.jobName) + stageMaxWidth := len(header.stage) + wfNameMaxWidth := len(header.wfName) + wfFileMaxWidth := len(header.wfFile) + eventsMaxWidth := len(header.events) + + for i, stage := range plan.Stages { + for _, r := range stage.Runs { + jobID := r.JobID + line := lineInfoDef{ + jobID: jobID, + jobName: r.String(), + stage: strconv.Itoa(i), + wfName: r.Workflow.Name, + wfFile: r.Workflow.File, + events: strings.Join(r.Workflow.On(), `,`), + } + if _, ok := jobs[jobID]; ok { + duplicateJobIDs = true + } else { + jobs[jobID] = true + } + lineInfos = append(lineInfos, line) + if jobIDMaxWidth < len(line.jobID) { + jobIDMaxWidth = len(line.jobID) + } + if jobNameMaxWidth < len(line.jobName) { + jobNameMaxWidth = len(line.jobName) + } + if stageMaxWidth < len(line.stage) { + stageMaxWidth = len(line.stage) + } + if wfNameMaxWidth < len(line.wfName) { + wfNameMaxWidth = len(line.wfName) + } + if wfFileMaxWidth < len(line.wfFile) { + wfFileMaxWidth = len(line.wfFile) + } + if eventsMaxWidth < len(line.events) { + eventsMaxWidth = len(line.events) + } + } + } + + jobIDMaxWidth += 2 + jobNameMaxWidth += 2 + stageMaxWidth += 2 + wfNameMaxWidth += 2 + wfFileMaxWidth += 2 + + fmt.Printf("%*s%*s%*s%*s%*s%*s\n", + -stageMaxWidth, header.stage, + -jobIDMaxWidth, header.jobID, + -jobNameMaxWidth, header.jobName, + -wfNameMaxWidth, header.wfName, + -wfFileMaxWidth, header.wfFile, + -eventsMaxWidth, header.events, + ) + for _, line := range lineInfos { + fmt.Printf("%*s%*s%*s%*s%*s%*s\n", + -stageMaxWidth, line.stage, + -jobIDMaxWidth, line.jobID, + -jobNameMaxWidth, line.jobName, + -wfNameMaxWidth, line.wfName, + -wfFileMaxWidth, line.wfFile, + -eventsMaxWidth, line.events, + ) + } + if duplicateJobIDs { + fmt.Print("\nDetected multiple jobs with the same job name, use `-W` to specify the path to the specific workflow.\n") + } + return nil +} + +func runExecList(ctx context.Context, planner model.WorkflowPlanner, execArgs *executeArgs) error { + // plan with filtered jobs - to be used for filtering only + var filterPlan *model.Plan + + // Determine the event name to be filtered + var filterEventName string + + if len(execArgs.event) > 0 { + log.Infof("Using chosed event for filtering: %s", execArgs.event) + filterEventName = execArgs.event + } else if execArgs.autodetectEvent { + // collect all events from loaded workflows + events := planner.GetEvents() + + // set default event type to first event from many available + // this way user dont have to specify the event. + log.Infof("Using first detected workflow event for filtering: %s", events[0]) + + filterEventName = events[0] + } + + var err error + if execArgs.job != "" { + log.Infof("Preparing plan with a job: %s", execArgs.job) + filterPlan, err = planner.PlanJob(execArgs.job) + if err != nil { + return err + } + } else if filterEventName != "" { + log.Infof("Preparing plan for a event: %s", filterEventName) + filterPlan, err = planner.PlanEvent(filterEventName) + if err != nil { + return err + } + } else { + log.Infof("Preparing plan with all jobs") + filterPlan, err = planner.PlanAll() + if err != nil { + return err + } + } + + _ = printList(filterPlan) + + return nil +} + +func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + planner, err := model.NewWorkflowPlanner(execArgs.WorkflowsPath(), execArgs.noWorkflowRecurse) + if err != nil { + return err + } + + if execArgs.runList { + return runExecList(ctx, planner, execArgs) + } + + // plan with triggered jobs + var plan *model.Plan + + // Determine the event name to be triggered + var eventName string + + // collect all events from loaded workflows + events := planner.GetEvents() + + if len(execArgs.event) > 0 { + log.Infof("Using chosed event for filtering: %s", execArgs.event) + eventName = execArgs.event + } else if len(events) == 1 && len(events[0]) > 0 { + log.Infof("Using the only detected workflow event: %s", events[0]) + eventName = events[0] + } else if execArgs.autodetectEvent && len(events) > 0 && len(events[0]) > 0 { + // set default event type to first event from many available + // this way user dont have to specify the event. + log.Infof("Using first detected workflow event: %s", events[0]) + eventName = events[0] + } else { + log.Infof("Using default workflow event: push") + eventName = "push" + } + + // build the plan for this run + if execArgs.job != "" { + log.Infof("Planning job: %s", execArgs.job) + plan, err = planner.PlanJob(execArgs.job) + if err != nil { + return err + } + } else { + log.Infof("Planning jobs for event: %s", eventName) + plan, err = planner.PlanEvent(eventName) + if err != nil { + return err + } + } + + maxLifetime := 3 * time.Hour + if deadline, ok := ctx.Deadline(); ok { + maxLifetime = time.Until(deadline) + } + + // init a cache server + handler, err := artifactcache.StartHandler("", "", 0, log.StandardLogger().WithField("module", "cache_request")) + if err != nil { + return err + } + log.Infof("cache handler listens on: %v", handler.ExternalURL()) + execArgs.cacheHandler = handler + + if len(execArgs.artifactServerAddr) == 0 { + ip := common.GetOutboundIP() + if ip == nil { + return fmt.Errorf("unable to determine outbound IP address") + } + execArgs.artifactServerAddr = ip.String() + } + + if len(execArgs.artifactServerPath) == 0 { + tempDir, err := os.MkdirTemp("", "gitea-act-") + if err != nil { + fmt.Println(err) + } + defer os.RemoveAll(tempDir) + + execArgs.artifactServerPath = tempDir + } + + // run the plan + config := &runner.Config{ + Workdir: execArgs.Workdir(), + BindWorkdir: false, + ReuseContainers: false, + ForcePull: execArgs.forcePull, + ForceRebuild: execArgs.forceRebuild, + LogOutput: true, + JSONLogger: execArgs.jsonLogger, + Env: execArgs.LoadEnvs(), + Secrets: execArgs.LoadSecrets(), + InsecureSecrets: execArgs.insecureSecrets, + Privileged: execArgs.privileged, + UsernsMode: execArgs.usernsMode, + ContainerArchitecture: execArgs.containerArchitecture, + ContainerDaemonSocket: execArgs.containerDaemonSocket, + UseGitIgnore: execArgs.useGitIgnore, + GitHubInstance: execArgs.githubInstance, + ContainerCapAdd: execArgs.containerCapAdd, + ContainerCapDrop: execArgs.containerCapDrop, + ContainerOptions: execArgs.containerOptions, + AutoRemove: true, + ArtifactServerPath: execArgs.artifactServerPath, + ArtifactServerPort: execArgs.artifactServerPort, + ArtifactServerAddr: execArgs.artifactServerAddr, + NoSkipCheckout: execArgs.noSkipCheckout, + // PresetGitHubContext: preset, + // EventJSON: string(eventJSON), + ContainerNamePrefix: fmt.Sprintf("FORGEJO-ACTIONS-TASK-%s", eventName), + ContainerMaxLifetime: maxLifetime, + ContainerNetworkMode: container.NetworkMode(execArgs.network), + ContainerNetworkEnableIPv6: execArgs.enableIPv6, + DefaultActionInstance: execArgs.defaultActionsURL, + PlatformPicker: func(_ []string) string { + return execArgs.image + }, + ValidVolumes: []string{"**"}, // All volumes are allowed for `exec` command + } + + config.Env["ACT_EXEC"] = "true" + + if t := config.Secrets["GITEA_TOKEN"]; t != "" { + config.Token = t + } else if t := config.Secrets["GITHUB_TOKEN"]; t != "" { + config.Token = t + } + + if !execArgs.debug { + logLevel := log.InfoLevel + config.JobLoggerLevel = &logLevel + } + + r, err := runner.New(config) + if err != nil { + return err + } + + artifactCancel := artifacts.Serve(ctx, execArgs.artifactServerPath, execArgs.artifactServerAddr, execArgs.artifactServerPort) + log.Debugf("artifacts server started at %s:%s", execArgs.artifactServerPath, execArgs.artifactServerPort) + + ctx = common.WithDryrun(ctx, execArgs.dryrun) + executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error { + artifactCancel() + return nil + }) + + return executor(ctx) + } +} + +func loadExecCmd(ctx context.Context) *cobra.Command { + execArg := executeArgs{} + + execCmd := &cobra.Command{ + Use: "exec", + Short: "Run workflow locally.", + Args: cobra.MaximumNArgs(20), + RunE: runExec(ctx, &execArg), + } + + execCmd.Flags().BoolVarP(&execArg.runList, "list", "l", false, "list workflows") + execCmd.Flags().StringVarP(&execArg.job, "job", "j", "", "run a specific job ID") + execCmd.Flags().StringVarP(&execArg.event, "event", "E", "", "run a event name") + execCmd.PersistentFlags().StringVarP(&execArg.workflowsPath, "workflows", "W", "./.forgejo/workflows/", "path to workflow file(s)") + execCmd.PersistentFlags().StringVarP(&execArg.workdir, "directory", "C", ".", "working directory") + execCmd.PersistentFlags().BoolVarP(&execArg.noWorkflowRecurse, "no-recurse", "", false, "Flag to disable running workflows from subdirectories of specified path in '--workflows'/'-W' flag") + execCmd.Flags().BoolVarP(&execArg.autodetectEvent, "detect-event", "", false, "Use first event type from workflow as event that triggered the workflow") + execCmd.Flags().BoolVarP(&execArg.forcePull, "pull", "p", false, "pull docker image(s) even if already present") + execCmd.Flags().BoolVarP(&execArg.forceRebuild, "rebuild", "", false, "rebuild local action docker image(s) even if already present") + execCmd.PersistentFlags().BoolVar(&execArg.jsonLogger, "json", false, "Output logs in json format") + execCmd.Flags().StringArrayVarP(&execArg.envs, "env", "", []string{}, "env to make available to actions with optional value (e.g. --env myenv=foo or --env myenv)") + execCmd.PersistentFlags().StringVarP(&execArg.envfile, "env-file", "", ".env", "environment file to read and use as env in the containers") + execCmd.Flags().StringArrayVarP(&execArg.secrets, "secret", "s", []string{}, "secret to make available to actions with optional value (e.g. -s mysecret=foo or -s mysecret)") + execCmd.PersistentFlags().BoolVarP(&execArg.insecureSecrets, "insecure-secrets", "", false, "NOT RECOMMENDED! Doesn't hide secrets while printing logs.") + execCmd.Flags().BoolVar(&execArg.privileged, "privileged", false, "use privileged mode") + execCmd.Flags().StringVar(&execArg.usernsMode, "userns", "", "user namespace to use") + execCmd.PersistentFlags().StringVarP(&execArg.containerArchitecture, "container-architecture", "", "", "Architecture which should be used to run containers, e.g.: linux/amd64. If not specified, will use host default architecture. Requires Docker server API Version 1.41+. Ignored on earlier Docker server platforms.") + execCmd.PersistentFlags().StringVarP(&execArg.containerDaemonSocket, "container-daemon-socket", "", "/var/run/docker.sock", "Path to Docker daemon socket which will be mounted to containers") + execCmd.Flags().BoolVar(&execArg.useGitIgnore, "use-gitignore", true, "Controls whether paths specified in .gitignore should be copied into container") + execCmd.Flags().StringArrayVarP(&execArg.containerCapAdd, "container-cap-add", "", []string{}, "kernel capabilities to add to the workflow containers (e.g. --container-cap-add SYS_PTRACE)") + execCmd.Flags().StringArrayVarP(&execArg.containerCapDrop, "container-cap-drop", "", []string{}, "kernel capabilities to remove from the workflow containers (e.g. --container-cap-drop SYS_PTRACE)") + execCmd.Flags().StringVarP(&execArg.containerOptions, "container-opts", "", "", "container options") + execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPath, "artifact-server-path", "", ".", "Defines the path where the artifact server stores uploads and retrieves downloads from. If not specified the artifact server will not start.") + execCmd.PersistentFlags().StringVarP(&execArg.artifactServerAddr, "artifact-server-addr", "", "", "Defines the address where the artifact server listens") + execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPort, "artifact-server-port", "", "34567", "Defines the port where the artifact server listens (will only bind to localhost).") + execCmd.PersistentFlags().StringVarP(&execArg.defaultActionsURL, "default-actions-url", "", "https://code.forgejo.org", "Defines the default base url of the action.") + execCmd.PersistentFlags().BoolVarP(&execArg.noSkipCheckout, "no-skip-checkout", "", false, "Do not skip actions/checkout") + execCmd.PersistentFlags().BoolVarP(&execArg.debug, "debug", "d", false, "enable debug log") + execCmd.PersistentFlags().BoolVarP(&execArg.dryrun, "dryrun", "n", false, "dryrun mode") + execCmd.PersistentFlags().StringVarP(&execArg.image, "image", "i", "node:20-bullseye", "Docker image to use. Use \"-self-hosted\" to run directly on the host.") + execCmd.PersistentFlags().StringVarP(&execArg.network, "network", "", "", "Specify the network to which the container will connect") + execCmd.PersistentFlags().BoolVarP(&execArg.enableIPv6, "enable-ipv6", "6", false, "Create network with IPv6 enabled.") + execCmd.PersistentFlags().StringVarP(&execArg.githubInstance, "gitea-instance", "", "", "Gitea instance to use.") + + return execCmd +} diff --git a/internal/app/cmd/register.go b/internal/app/cmd/register.go new file mode 100644 index 0000000..803511a --- /dev/null +++ b/internal/app/cmd/register.go @@ -0,0 +1,355 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "bufio" + "context" + "fmt" + "os" + "os/signal" + goruntime "runtime" + "strings" + "time" + + pingv1 "code.gitea.io/actions-proto-go/ping/v1" + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" + "github.com/mattn/go-isatty" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +// runRegister registers a runner to the server +func runRegister(ctx context.Context, regArgs *registerArgs, configFile *string) func(*cobra.Command, []string) error { + return func(cmd *cobra.Command, args []string) error { + log.SetReportCaller(false) + isTerm := isatty.IsTerminal(os.Stdout.Fd()) + log.SetFormatter(&log.TextFormatter{ + DisableColors: !isTerm, + DisableTimestamp: true, + }) + log.SetLevel(log.DebugLevel) + + log.Infof("Registering runner, arch=%s, os=%s, version=%s.", + goruntime.GOARCH, goruntime.GOOS, ver.Version()) + + // runner always needs root permission + if os.Getuid() != 0 { + // TODO: use a better way to check root permission + log.Warnf("Runner in user-mode.") + } + + if regArgs.NoInteractive { + if err := registerNoInteractive(ctx, *configFile, regArgs); err != nil { + return err + } + } else { + go func() { + if err := registerInteractive(ctx, *configFile); err != nil { + log.Fatal(err) + return + } + os.Exit(0) + }() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + } + + return nil + } +} + +// registerArgs represents the arguments for register command +type registerArgs struct { + NoInteractive bool + InstanceAddr string + Token string + RunnerName string + Labels string +} + +type registerStage int8 + +const ( + StageUnknown registerStage = -1 + StageOverwriteLocalConfig registerStage = iota + 1 + StageInputInstance + StageInputToken + StageInputRunnerName + StageInputLabels + StageWaitingForRegistration + StageExit +) + +var defaultLabels = []string{ + "docker:docker://node:20-bullseye", +} + +type registerInputs struct { + InstanceAddr string + Token string + RunnerName string + Labels []string +} + +func (r *registerInputs) validate() error { + if r.InstanceAddr == "" { + return fmt.Errorf("instance address is empty") + } + if r.Token == "" { + return fmt.Errorf("token is empty") + } + if len(r.Labels) > 0 { + return validateLabels(r.Labels) + } + return nil +} + +func validateLabels(ls []string) error { + for _, label := range ls { + if _, err := labels.Parse(label); err != nil { + return err + } + } + return nil +} + +func (r *registerInputs) assignToNext(stage registerStage, value string, cfg *config.Config) registerStage { + // must set instance address and token. + // if empty, keep current stage. + if stage == StageInputInstance || stage == StageInputToken { + if value == "" { + return stage + } + } + + // set hostname for runner name if empty + if stage == StageInputRunnerName && value == "" { + value, _ = os.Hostname() + } + + switch stage { + case StageOverwriteLocalConfig: + if value == "Y" || value == "y" { + return StageInputInstance + } + return StageExit + case StageInputInstance: + r.InstanceAddr = value + return StageInputToken + case StageInputToken: + r.Token = value + return StageInputRunnerName + case StageInputRunnerName: + r.RunnerName = value + // if there are some labels configured in config file, skip input labels stage + if len(cfg.Runner.Labels) > 0 { + ls := make([]string, 0, len(cfg.Runner.Labels)) + for _, l := range cfg.Runner.Labels { + _, err := labels.Parse(l) + if err != nil { + log.WithError(err).Warnf("ignored invalid label %q", l) + continue + } + ls = append(ls, l) + } + if len(ls) == 0 { + log.Warn("no valid labels configured in config file, runner may not be able to pick up jobs") + } + r.Labels = ls + return StageWaitingForRegistration + } + return StageInputLabels + case StageInputLabels: + r.Labels = defaultLabels + if value != "" { + r.Labels = strings.Split(value, ",") + } + + if validateLabels(r.Labels) != nil { + log.Infoln("Invalid labels, please input again, leave blank to use the default labels (for example, ubuntu-20.04:docker://node:20-bookworm,ubuntu-18.04:docker://node:20-bookworm)") + return StageInputLabels + } + return StageWaitingForRegistration + } + return StageUnknown +} + +func registerInteractive(ctx context.Context, configFile string) error { + var ( + reader = bufio.NewReader(os.Stdin) + stage = StageInputInstance + inputs = new(registerInputs) + ) + + cfg, err := config.LoadDefault(configFile) + if err != nil { + return fmt.Errorf("failed to load config: %v", err) + } + if f, err := os.Stat(cfg.Runner.File); err == nil && !f.IsDir() { + stage = StageOverwriteLocalConfig + } + + for { + printStageHelp(stage) + + cmdString, err := reader.ReadString('\n') + if err != nil { + return err + } + stage = inputs.assignToNext(stage, strings.TrimSpace(cmdString), cfg) + + if stage == StageWaitingForRegistration { + log.Infof("Registering runner, name=%s, instance=%s, labels=%v.", inputs.RunnerName, inputs.InstanceAddr, inputs.Labels) + if err := doRegister(ctx, cfg, inputs); err != nil { + return fmt.Errorf("Failed to register runner: %w", err) + } + log.Infof("Runner registered successfully.") + return nil + } + + if stage == StageExit { + return nil + } + + if stage <= StageUnknown { + log.Errorf("Invalid input, please re-run act command.") + return nil + } + } +} + +func printStageHelp(stage registerStage) { + switch stage { + case StageOverwriteLocalConfig: + log.Infoln("Runner is already registered, overwrite local config? [y/N]") + case StageInputInstance: + log.Infoln("Enter the Forgejo instance URL (for example, https://next.forgejo.org/):") + case StageInputToken: + log.Infoln("Enter the runner token:") + case StageInputRunnerName: + hostname, _ := os.Hostname() + log.Infof("Enter the runner name (if set empty, use hostname: %s):\n", hostname) + case StageInputLabels: + log.Infoln("Enter the runner labels, leave blank to use the default labels (comma-separated, for example, ubuntu-20.04:docker://node:20-bookworm,ubuntu-18.04:docker://node:20-bookworm):") + case StageWaitingForRegistration: + log.Infoln("Waiting for registration...") + } +} + +func registerNoInteractive(ctx context.Context, configFile string, regArgs *registerArgs) error { + cfg, err := config.LoadDefault(configFile) + if err != nil { + return err + } + inputs := ®isterInputs{ + InstanceAddr: regArgs.InstanceAddr, + Token: regArgs.Token, + RunnerName: regArgs.RunnerName, + Labels: defaultLabels, + } + regArgs.Labels = strings.TrimSpace(regArgs.Labels) + // command line flag. + if regArgs.Labels != "" { + inputs.Labels = strings.Split(regArgs.Labels, ",") + } + // specify labels in config file. + if len(cfg.Runner.Labels) > 0 { + if regArgs.Labels != "" { + log.Warn("Labels from command will be ignored, use labels defined in config file.") + } + inputs.Labels = cfg.Runner.Labels + } + + if inputs.RunnerName == "" { + inputs.RunnerName, _ = os.Hostname() + log.Infof("Runner name is empty, use hostname '%s'.", inputs.RunnerName) + } + if err := inputs.validate(); err != nil { + log.WithError(err).Errorf("Invalid input, please re-run act command.") + return nil + } + if err := doRegister(ctx, cfg, inputs); err != nil { + return fmt.Errorf("Failed to register runner: %w", err) + } + log.Infof("Runner registered successfully.") + return nil +} + +func doRegister(ctx context.Context, cfg *config.Config, inputs *registerInputs) error { + // initial http client + cli := client.New( + inputs.InstanceAddr, + cfg.Runner.Insecure, + "", + "", + ver.Version(), + ) + + for { + _, err := cli.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{ + Data: inputs.RunnerName, + })) + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if ctx.Err() != nil { + break + } + if err != nil { + log.WithError(err). + Errorln("Cannot ping the Forgejo instance server") + // TODO: if ping failed, retry or exit + time.Sleep(time.Second) + } else { + log.Debugln("Successfully pinged the Forgejo instance server") + break + } + } + + reg := &config.Registration{ + Name: inputs.RunnerName, + Token: inputs.Token, + Address: inputs.InstanceAddr, + Labels: inputs.Labels, + } + + ls := make([]string, len(reg.Labels)) + for i, v := range reg.Labels { + l, _ := labels.Parse(v) + ls[i] = l.Name + } + // register new runner. + resp, err := cli.Register(ctx, connect.NewRequest(&runnerv1.RegisterRequest{ + Name: reg.Name, + Token: reg.Token, + Version: ver.Version(), + AgentLabels: ls, // Could be removed after Gitea 1.20 + Labels: ls, + })) + if err != nil { + log.WithError(err).Error("poller: cannot register new runner") + return err + } + + reg.ID = resp.Msg.Runner.Id + reg.UUID = resp.Msg.Runner.Uuid + reg.Name = resp.Msg.Runner.Name + reg.Token = resp.Msg.Runner.Token + + if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil { + return fmt.Errorf("failed to save runner config: %w", err) + } + return nil +} diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go new file mode 100644 index 0000000..cc89fa5 --- /dev/null +++ b/internal/app/poll/poller.go @@ -0,0 +1,167 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package poll + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" + log "github.com/sirupsen/logrus" + "golang.org/x/time/rate" + + "gitea.com/gitea/act_runner/internal/app/run" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" +) + +const PollerID = "PollerID" + +type Poller interface { + Poll() + Shutdown(ctx context.Context) error +} + +type poller struct { + client client.Client + runner run.RunnerInterface + cfg *config.Config + tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea. + + pollingCtx context.Context + shutdownPolling context.CancelFunc + + jobsCtx context.Context + shutdownJobs context.CancelFunc + + done chan any +} + +func New(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { + return (&poller{}).init(cfg, client, runner) +} + +func (p *poller) init(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { + pollingCtx, shutdownPolling := context.WithCancel(context.Background()) + + jobsCtx, shutdownJobs := context.WithCancel(context.Background()) + + done := make(chan any) + + p.client = client + p.runner = runner + p.cfg = cfg + + p.pollingCtx = pollingCtx + p.shutdownPolling = shutdownPolling + + p.jobsCtx = jobsCtx + p.shutdownJobs = shutdownJobs + p.done = done + + return p +} + +func (p *poller) Poll() { + limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) + wg := &sync.WaitGroup{} + for i := 0; i < p.cfg.Runner.Capacity; i++ { + wg.Add(1) + go p.poll(i, wg, limiter) + } + wg.Wait() + + // signal the poller is finished + close(p.done) +} + +func (p *poller) Shutdown(ctx context.Context) error { + p.shutdownPolling() + + select { + case <-p.done: + log.Trace("all jobs are complete") + return nil + + case <-ctx.Done(): + log.Trace("forcing the jobs to shutdown") + p.shutdownJobs() + <-p.done + log.Trace("all jobs have been shutdown") + return ctx.Err() + } +} + +func (p *poller) poll(id int, wg *sync.WaitGroup, limiter *rate.Limiter) { + log.Infof("[poller %d] launched", id) + defer wg.Done() + for { + if err := limiter.Wait(p.pollingCtx); err != nil { + log.Infof("[poller %d] shutdown", id) + return + } + task, ok := p.fetchTask(p.pollingCtx) + if !ok { + continue + } + p.runTaskWithRecover(p.jobsCtx, task) + } +} + +func (p *poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) { + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("panic: %v", r) + log.WithError(err).Error("panic in runTaskWithRecover") + } + }() + + if err := p.runner.Run(ctx, task); err != nil { + log.WithError(err).Error("failed to run task") + } +} + +func (p *poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) { + reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout) + defer cancel() + + // Load the version value that was in the cache when the request was sent. + v := p.tasksVersion.Load() + resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: v, + })) + if errors.Is(err, context.DeadlineExceeded) { + log.Trace("deadline exceeded") + err = nil + } + if err != nil { + if errors.Is(err, context.Canceled) { + log.WithError(err).Debugf("shutdown, fetch task canceled") + } else { + log.WithError(err).Error("failed to fetch task") + } + return nil, false + } + + if resp == nil || resp.Msg == nil { + return nil, false + } + + if resp.Msg.TasksVersion > v { + p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion) + } + + if resp.Msg.Task == nil { + return nil, false + } + + // got a task, set `tasksVersion` to zero to focre query db in next request. + p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0) + + return resp.Msg.Task, true +} diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go new file mode 100644 index 0000000..04b1a84 --- /dev/null +++ b/internal/app/poll/poller_test.go @@ -0,0 +1,263 @@ +// Copyright The Forgejo Authors. +// SPDX-License-Identifier: MIT + +package poll + +import ( + "context" + "fmt" + "testing" + "time" + + "connectrpc.com/connect" + + "code.gitea.io/actions-proto-go/ping/v1/pingv1connect" + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect" + "gitea.com/gitea/act_runner/internal/pkg/config" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +type mockPoller struct { + poller +} + +func (o *mockPoller) Poll() { + o.poller.Poll() +} + +type mockClient struct { + pingv1connect.PingServiceClient + runnerv1connect.RunnerServiceClient + + sleep time.Duration + cancel bool + err error + noTask bool +} + +func (o mockClient) Address() string { + return "" +} + +func (o mockClient) Insecure() bool { + return true +} + +func (o *mockClient) FetchTask(ctx context.Context, req *connect.Request[runnerv1.FetchTaskRequest]) (*connect.Response[runnerv1.FetchTaskResponse], error) { + if o.sleep > 0 { + select { + case <-ctx.Done(): + log.Trace("fetch task done") + return nil, context.DeadlineExceeded + case <-time.After(o.sleep): + log.Trace("slept") + return nil, fmt.Errorf("unexpected") + } + } + if o.cancel { + return nil, context.Canceled + } + if o.err != nil { + return nil, o.err + } + task := &runnerv1.Task{} + if o.noTask { + task = nil + o.noTask = false + } + + return connect.NewResponse(&runnerv1.FetchTaskResponse{ + Task: task, + TasksVersion: int64(1), + }), nil +} + +type mockRunner struct { + cfg *config.Runner + log chan string + panics bool + err error +} + +func (o *mockRunner) Run(ctx context.Context, task *runnerv1.Task) error { + o.log <- "runner starts" + if o.panics { + log.Trace("panics") + o.log <- "runner panics" + o.panics = false + panic("whatever") + } + if o.err != nil { + log.Trace("error") + o.log <- "runner error" + err := o.err + o.err = nil + return err + } + for { + select { + case <-ctx.Done(): + log.Trace("shutdown") + o.log <- "runner shutdown" + return nil + case <-time.After(o.cfg.Timeout): + log.Trace("after") + o.log <- "runner timeout" + return nil + } + } +} + +func setTrace(t *testing.T) { + t.Helper() + log.SetReportCaller(true) + log.SetLevel(log.TraceLevel) +} + +func TestPoller_New(t *testing.T) { + p := New(&config.Config{}, &mockClient{}, &mockRunner{}) + assert.NotNil(t, p) +} + +func TestPoller_Runner(t *testing.T) { + setTrace(t) + for _, testCase := range []struct { + name string + timeout time.Duration + noTask bool + panics bool + err error + expected string + contextTimeout time.Duration + }{ + { + name: "Simple", + timeout: 10 * time.Second, + expected: "runner shutdown", + }, + { + name: "Panics", + timeout: 10 * time.Second, + panics: true, + expected: "runner panics", + }, + { + name: "Error", + timeout: 10 * time.Second, + err: fmt.Errorf("ERROR"), + expected: "runner error", + }, + { + name: "PollTaskError", + timeout: 10 * time.Second, + noTask: true, + expected: "runner shutdown", + }, + { + name: "ShutdownTimeout", + timeout: 1 * time.Second, + contextTimeout: 1 * time.Minute, + expected: "runner timeout", + }, + } { + t.Run(testCase.name, func(t *testing.T) { + runnerLog := make(chan string, 3) + configRunner := config.Runner{ + FetchInterval: 1, + Capacity: 1, + Timeout: testCase.timeout, + } + p := &mockPoller{} + p.init( + &config.Config{ + Runner: configRunner, + }, + &mockClient{ + noTask: testCase.noTask, + }, + &mockRunner{ + cfg: &configRunner, + log: runnerLog, + panics: testCase.panics, + err: testCase.err, + }) + go p.Poll() + assert.Equal(t, "runner starts", <-runnerLog) + var ctx context.Context + var cancel context.CancelFunc + if testCase.contextTimeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), testCase.contextTimeout) + defer cancel() + } else { + ctx, cancel = context.WithCancel(context.Background()) + cancel() + } + p.Shutdown(ctx) + <-p.done + assert.Equal(t, testCase.expected, <-runnerLog) + }) + } +} + +func TestPoller_Fetch(t *testing.T) { + setTrace(t) + for _, testCase := range []struct { + name string + noTask bool + sleep time.Duration + err error + cancel bool + success bool + }{ + { + name: "Success", + success: true, + }, + { + name: "Timeout", + sleep: 100 * time.Millisecond, + }, + { + name: "Canceled", + cancel: true, + }, + { + name: "NoTask", + noTask: true, + }, + { + name: "Error", + err: fmt.Errorf("random error"), + }, + } { + t.Run(testCase.name, func(t *testing.T) { + configRunner := config.Runner{ + FetchTimeout: 1 * time.Millisecond, + } + p := &mockPoller{} + p.init( + &config.Config{ + Runner: configRunner, + }, + &mockClient{ + sleep: testCase.sleep, + cancel: testCase.cancel, + noTask: testCase.noTask, + err: testCase.err, + }, + &mockRunner{}, + ) + task, ok := p.fetchTask(context.Background()) + if testCase.success { + assert.True(t, ok) + assert.NotNil(t, task) + } else { + assert.False(t, ok) + assert.Nil(t, task) + } + }) + } +} diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go new file mode 100644 index 0000000..e8654b6 --- /dev/null +++ b/internal/app/run/runner.go @@ -0,0 +1,260 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "context" + "encoding/json" + "fmt" + "path/filepath" + "strings" + "sync" + "time" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" + "github.com/docker/docker/api/types/container" + "github.com/nektos/act/pkg/artifactcache" + "github.com/nektos/act/pkg/common" + "github.com/nektos/act/pkg/model" + "github.com/nektos/act/pkg/runner" + log "github.com/sirupsen/logrus" + + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/report" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +// Runner runs the pipeline. +type Runner struct { + name string + + cfg *config.Config + + client client.Client + labels labels.Labels + envs map[string]string + + runningTasks sync.Map +} + +type RunnerInterface interface { + Run(ctx context.Context, task *runnerv1.Task) error +} + +func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner { + ls := labels.Labels{} + for _, v := range reg.Labels { + if l, err := labels.Parse(v); err == nil { + ls = append(ls, l) + } + } + + if cfg.Runner.Envs == nil { + cfg.Runner.Envs = make(map[string]string, 10) + } + + cfg.Runner.Envs["GITHUB_SERVER_URL"] = reg.Address + + envs := make(map[string]string, len(cfg.Runner.Envs)) + for k, v := range cfg.Runner.Envs { + envs[k] = v + } + if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled { + if cfg.Cache.ExternalServer != "" { + envs["ACTIONS_CACHE_URL"] = cfg.Cache.ExternalServer + } else { + cacheHandler, err := artifactcache.StartHandler( + cfg.Cache.Dir, + cfg.Cache.Host, + cfg.Cache.Port, + log.StandardLogger().WithField("module", "cache_request"), + ) + if err != nil { + log.Errorf("cannot init cache server, it will be disabled: %v", err) + // go on + } else { + envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/" + } + } + } + + // set artifact gitea api + artifactGiteaAPI := strings.TrimSuffix(cli.Address(), "/") + "/api/actions_pipeline/" + envs["ACTIONS_RUNTIME_URL"] = artifactGiteaAPI + envs["ACTIONS_RESULTS_URL"] = strings.TrimSuffix(cli.Address(), "/") + + // Set specific environments to distinguish between Gitea and GitHub + envs["GITEA_ACTIONS"] = "true" + envs["GITEA_ACTIONS_RUNNER_VERSION"] = ver.Version() + + return &Runner{ + name: reg.Name, + cfg: cfg, + client: cli, + labels: ls, + envs: envs, + } +} + +func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { + if _, ok := r.runningTasks.Load(task.Id); ok { + return fmt.Errorf("task %d is already running", task.Id) + } + r.runningTasks.Store(task.Id, struct{}{}) + defer r.runningTasks.Delete(task.Id) + + ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout) + defer cancel() + reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval) + var runErr error + defer func() { + lastWords := "" + if runErr != nil { + lastWords = runErr.Error() + } + _ = reporter.Close(lastWords) + }() + reporter.RunDaemon() + runErr = r.run(ctx, task, reporter) + + return nil +} + +func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.Reporter) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue()) + + workflow, jobID, err := generateWorkflow(task) + if err != nil { + return err + } + + plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID) + if err != nil { + return err + } + job := workflow.GetJob(jobID) + reporter.ResetSteps(len(job.Steps)) + + taskContext := task.Context.Fields + + log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(), + taskContext["gitea_default_actions_url"].GetStringValue(), + r.client.Address()) + + preset := &model.GithubContext{ + Event: taskContext["event"].GetStructValue().AsMap(), + RunID: taskContext["run_id"].GetStringValue(), + RunNumber: taskContext["run_number"].GetStringValue(), + Actor: taskContext["actor"].GetStringValue(), + Repository: taskContext["repository"].GetStringValue(), + EventName: taskContext["event_name"].GetStringValue(), + Sha: taskContext["sha"].GetStringValue(), + Ref: taskContext["ref"].GetStringValue(), + RefName: taskContext["ref_name"].GetStringValue(), + RefType: taskContext["ref_type"].GetStringValue(), + HeadRef: taskContext["head_ref"].GetStringValue(), + BaseRef: taskContext["base_ref"].GetStringValue(), + Token: taskContext["token"].GetStringValue(), + RepositoryOwner: taskContext["repository_owner"].GetStringValue(), + RetentionDays: taskContext["retention_days"].GetStringValue(), + } + if t := task.Secrets["GITEA_TOKEN"]; t != "" { + preset.Token = t + } else if t := task.Secrets["GITHUB_TOKEN"]; t != "" { + preset.Token = t + } + + giteaRuntimeToken := taskContext["gitea_runtime_token"].GetStringValue() + if giteaRuntimeToken == "" { + // use task token to action api token for previous Gitea Server Versions + giteaRuntimeToken = preset.Token + } + r.envs["ACTIONS_RUNTIME_TOKEN"] = giteaRuntimeToken + + eventJSON, err := json.Marshal(preset.Event) + if err != nil { + return err + } + + maxLifetime := 3 * time.Hour + if deadline, ok := ctx.Deadline(); ok { + maxLifetime = time.Until(deadline) + } + + var inputs map[string]string + if preset.EventName == "workflow_dispatch" { + if inputsRaw, ok := preset.Event["inputs"]; ok { + inputs, _ = inputsRaw.(map[string]string) + } + } + + runnerConfig := &runner.Config{ + // On Linux, Workdir will be like "/<parent_directory>/<owner>/<repo>" + // On Windows, Workdir will be like "\<parent_directory>\<owner>\<repo>" + Workdir: filepath.FromSlash(filepath.Clean(fmt.Sprintf("/%s/%s", r.cfg.Container.WorkdirParent, preset.Repository))), + BindWorkdir: false, + ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent), + + ReuseContainers: false, + ForcePull: r.cfg.Container.ForcePull, + ForceRebuild: false, + LogOutput: true, + JSONLogger: false, + Env: r.envs, + Secrets: task.Secrets, + GitHubInstance: strings.TrimSuffix(r.client.Address(), "/"), + AutoRemove: true, + NoSkipCheckout: true, + PresetGitHubContext: preset, + EventJSON: string(eventJSON), + ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id), + ContainerMaxLifetime: maxLifetime, + ContainerNetworkMode: container.NetworkMode(r.cfg.Container.Network), + ContainerNetworkEnableIPv6: r.cfg.Container.EnableIPv6, + ContainerOptions: r.cfg.Container.Options, + ContainerDaemonSocket: r.cfg.Container.DockerHost, + Privileged: r.cfg.Container.Privileged, + DefaultActionInstance: taskContext["gitea_default_actions_url"].GetStringValue(), + PlatformPicker: r.labels.PickPlatform, + Vars: task.Vars, + ValidVolumes: r.cfg.Container.ValidVolumes, + InsecureSkipTLS: r.cfg.Runner.Insecure, + Inputs: inputs, + } + + rr, err := runner.New(runnerConfig) + if err != nil { + return err + } + executor := rr.NewPlanExecutor(plan) + + reporter.Logf("workflow prepared") + + // add logger recorders + ctx = common.WithLoggerHook(ctx, reporter) + + execErr := executor(ctx) + reporter.SetOutputs(job.Outputs) + return execErr +} + +func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) { + return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{ + Version: ver.Version(), + Labels: labels, + })) +} + +func (r *Runner) Update(ctx context.Context, labels labels.Labels) { + r.labels = labels +} diff --git a/internal/app/run/runner_test.go b/internal/app/run/runner_test.go new file mode 100644 index 0000000..0145c70 --- /dev/null +++ b/internal/app/run/runner_test.go @@ -0,0 +1,37 @@ +package run + +import ( + "context" + "testing" + + "gitea.com/gitea/act_runner/internal/pkg/labels" + "github.com/stretchr/testify/assert" +) + +func TestLabelUpdate(t *testing.T) { + ctx := context.Background() + ls := labels.Labels{} + + initialLabel, err := labels.Parse("testlabel:docker://alpine") + assert.NoError(t, err) + ls = append(ls, initialLabel) + + newLs := labels.Labels{} + + newLabel, err := labels.Parse("next label:host") + assert.NoError(t, err) + newLs = append(newLs, initialLabel) + newLs = append(newLs, newLabel) + + runner := Runner{ + labels: ls, + } + + assert.Contains(t, runner.labels, initialLabel) + assert.NotContains(t, runner.labels, newLabel) + + runner.Update(ctx, newLs) + + assert.Contains(t, runner.labels, initialLabel) + assert.Contains(t, runner.labels, newLabel) +} diff --git a/internal/app/run/workflow.go b/internal/app/run/workflow.go new file mode 100644 index 0000000..a6fbb71 --- /dev/null +++ b/internal/app/run/workflow.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "bytes" + "fmt" + "sort" + "strings" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/nektos/act/pkg/model" + "gopkg.in/yaml.v3" +) + +func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) { + workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload)) + if err != nil { + return nil, "", err + } + + jobIDs := workflow.GetJobIDs() + if len(jobIDs) != 1 { + return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs) + } + jobID := jobIDs[0] + + needJobIDs := make([]string, 0, len(task.Needs)) + for id, need := range task.Needs { + needJobIDs = append(needJobIDs, id) + needJob := &model.Job{ + Outputs: need.Outputs, + Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")), + } + workflow.Jobs[id] = needJob + } + sort.Strings(needJobIDs) + + rawNeeds := yaml.Node{ + Kind: yaml.SequenceNode, + Content: make([]*yaml.Node, 0, len(needJobIDs)), + } + for _, id := range needJobIDs { + rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{ + Kind: yaml.ScalarNode, + Value: id, + }) + } + + workflow.Jobs[jobID].RawNeeds = rawNeeds + + return workflow, jobID, nil +} diff --git a/internal/app/run/workflow_test.go b/internal/app/run/workflow_test.go new file mode 100644 index 0000000..c7598db --- /dev/null +++ b/internal/app/run/workflow_test.go @@ -0,0 +1,74 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "testing" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/nektos/act/pkg/model" + "github.com/stretchr/testify/require" + "gotest.tools/v3/assert" +) + +func Test_generateWorkflow(t *testing.T) { + type args struct { + task *runnerv1.Task + } + tests := []struct { + name string + args args + assert func(t *testing.T, wf *model.Workflow) + want1 string + wantErr bool + }{ + { + name: "has needs", + args: args{ + task: &runnerv1.Task{ + WorkflowPayload: []byte(` +name: Build and deploy +on: push + +jobs: + job9: + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: ./deploy --build ${{ needs.job1.outputs.output1 }} + - run: ./deploy --build ${{ needs.job2.outputs.output2 }} +`), + Needs: map[string]*runnerv1.TaskNeed{ + "job1": { + Outputs: map[string]string{ + "output1": "output1 value", + }, + Result: runnerv1.Result_RESULT_SUCCESS, + }, + "job2": { + Outputs: map[string]string{ + "output2": "output2 value", + }, + Result: runnerv1.Result_RESULT_SUCCESS, + }, + }, + }, + }, + assert: func(t *testing.T, wf *model.Workflow) { + assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"}) + }, + want1: "job9", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1, err := generateWorkflow(tt.args.task) + require.NoError(t, err) + tt.assert(t, got) + assert.Equal(t, got1, tt.want1) + }) + } +} |