summaryrefslogtreecommitdiffstats
path: root/routers/api/actions/runner/runner.go
diff options
context:
space:
mode:
Diffstat (limited to 'routers/api/actions/runner/runner.go')
-rw-r--r--routers/api/actions/runner/runner.go289
1 files changed, 289 insertions, 0 deletions
diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go
new file mode 100644
index 0000000..017bdf6
--- /dev/null
+++ b/routers/api/actions/runner/runner.go
@@ -0,0 +1,289 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package runner
+
+import (
+ "context"
+ "errors"
+ "net/http"
+
+ actions_model "code.gitea.io/gitea/models/actions"
+ repo_model "code.gitea.io/gitea/models/repo"
+ user_model "code.gitea.io/gitea/models/user"
+ "code.gitea.io/gitea/modules/actions"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/util"
+ actions_service "code.gitea.io/gitea/services/actions"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
+ "connectrpc.com/connect"
+ gouuid "github.com/google/uuid"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func NewRunnerServiceHandler() (string, http.Handler) {
+ return runnerv1connect.NewRunnerServiceHandler(
+ &Service{},
+ connect.WithCompressMinBytes(1024),
+ withRunner,
+ )
+}
+
+var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
+
+type Service struct {
+ runnerv1connect.UnimplementedRunnerServiceHandler
+}
+
+// Register for new runner.
+func (s *Service) Register(
+ ctx context.Context,
+ req *connect.Request[runnerv1.RegisterRequest],
+) (*connect.Response[runnerv1.RegisterResponse], error) {
+ if req.Msg.Token == "" || req.Msg.Name == "" {
+ return nil, errors.New("missing runner token, name")
+ }
+
+ runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
+ if err != nil {
+ return nil, errors.New("runner registration token not found")
+ }
+
+ if !runnerToken.IsActive {
+ return nil, errors.New("runner registration token has been invalidated, please use the latest one")
+ }
+
+ if runnerToken.OwnerID > 0 {
+ if _, err := user_model.GetUserByID(ctx, runnerToken.OwnerID); err != nil {
+ return nil, errors.New("owner of the token not found")
+ }
+ }
+
+ if runnerToken.RepoID > 0 {
+ if _, err := repo_model.GetRepositoryByID(ctx, runnerToken.RepoID); err != nil {
+ return nil, errors.New("repository of the token not found")
+ }
+ }
+
+ labels := req.Msg.Labels
+
+ // create new runner
+ name, _ := util.SplitStringAtByteN(req.Msg.Name, 255)
+ runner := &actions_model.ActionRunner{
+ UUID: gouuid.New().String(),
+ Name: name,
+ OwnerID: runnerToken.OwnerID,
+ RepoID: runnerToken.RepoID,
+ Version: req.Msg.Version,
+ AgentLabels: labels,
+ }
+ if err := runner.GenerateToken(); err != nil {
+ return nil, errors.New("can't generate token")
+ }
+
+ // create new runner
+ if err := actions_model.CreateRunner(ctx, runner); err != nil {
+ return nil, errors.New("can't create new runner")
+ }
+
+ // update token status
+ runnerToken.IsActive = true
+ if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
+ return nil, errors.New("can't update runner token status")
+ }
+
+ res := connect.NewResponse(&runnerv1.RegisterResponse{
+ Runner: &runnerv1.Runner{
+ Id: runner.ID,
+ Uuid: runner.UUID,
+ Token: runner.Token,
+ Name: runner.Name,
+ Version: runner.Version,
+ Labels: runner.AgentLabels,
+ },
+ })
+
+ return res, nil
+}
+
+func (s *Service) Declare(
+ ctx context.Context,
+ req *connect.Request[runnerv1.DeclareRequest],
+) (*connect.Response[runnerv1.DeclareResponse], error) {
+ runner := GetRunner(ctx)
+ runner.AgentLabels = req.Msg.Labels
+ runner.Version = req.Msg.Version
+ if err := actions_model.UpdateRunner(ctx, runner, "agent_labels", "version"); err != nil {
+ return nil, status.Errorf(codes.Internal, "update runner: %v", err)
+ }
+
+ return connect.NewResponse(&runnerv1.DeclareResponse{
+ Runner: &runnerv1.Runner{
+ Id: runner.ID,
+ Uuid: runner.UUID,
+ Token: runner.Token,
+ Name: runner.Name,
+ Version: runner.Version,
+ Labels: runner.AgentLabels,
+ },
+ }), nil
+}
+
+// FetchTask assigns a task to the runner
+func (s *Service) FetchTask(
+ ctx context.Context,
+ req *connect.Request[runnerv1.FetchTaskRequest],
+) (*connect.Response[runnerv1.FetchTaskResponse], error) {
+ runner := GetRunner(ctx)
+
+ var task *runnerv1.Task
+ tasksVersion := req.Msg.TasksVersion // task version from runner
+ latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
+ } else if latestVersion == 0 {
+ if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
+ return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
+ }
+ // if we don't increase the value of `latestVersion` here,
+ // the response of FetchTask will return tasksVersion as zero.
+ // and the runner will treat it as an old version of Gitea.
+ latestVersion++
+ }
+
+ if tasksVersion != latestVersion {
+ // if the task version in request is not equal to the version in db,
+ // it means there may still be some tasks not be assigned.
+ // try to pick a task for the runner that send the request.
+ if t, ok, err := pickTask(ctx, runner); err != nil {
+ log.Error("pick task failed: %v", err)
+ return nil, status.Errorf(codes.Internal, "pick task: %v", err)
+ } else if ok {
+ task = t
+ }
+ }
+ res := connect.NewResponse(&runnerv1.FetchTaskResponse{
+ Task: task,
+ TasksVersion: latestVersion,
+ })
+ return res, nil
+}
+
+// UpdateTask updates the task status.
+func (s *Service) UpdateTask(
+ ctx context.Context,
+ req *connect.Request[runnerv1.UpdateTaskRequest],
+) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
+ task, err := actions_model.UpdateTaskByState(ctx, req.Msg.State)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "update task: %v", err)
+ }
+
+ for k, v := range req.Msg.Outputs {
+ if len(k) > 255 {
+ log.Warn("Ignore the output of task %d because the key is too long: %q", task.ID, k)
+ continue
+ }
+ // The value can be a maximum of 1 MB
+ if l := len(v); l > 1024*1024 {
+ log.Warn("Ignore the output %q of task %d because the value is too long: %v", k, task.ID, l)
+ continue
+ }
+ // There's another limitation on GitHub that the total of all outputs in a workflow run can be a maximum of 50 MB.
+ // We don't check the total size here because it's not easy to do, and it doesn't really worth it.
+ // See https://docs.github.com/en/actions/using-jobs/defining-outputs-for-jobs
+
+ if err := actions_model.InsertTaskOutputIfNotExist(ctx, task.ID, k, v); err != nil {
+ log.Warn("Failed to insert the output %q of task %d: %v", k, task.ID, err)
+ // It's ok not to return errors, the runner will resend the outputs.
+ }
+ }
+ sentOutputs, err := actions_model.FindTaskOutputKeyByTaskID(ctx, task.ID)
+ if err != nil {
+ log.Warn("Failed to find the sent outputs of task %d: %v", task.ID, err)
+ // It's not to return errors, it can be handled when the runner resends sent outputs.
+ }
+
+ if err := task.LoadJob(ctx); err != nil {
+ return nil, status.Errorf(codes.Internal, "load job: %v", err)
+ }
+ if err := task.Job.LoadRun(ctx); err != nil {
+ return nil, status.Errorf(codes.Internal, "load run: %v", err)
+ }
+
+ // don't create commit status for cron job
+ if task.Job.Run.ScheduleID == 0 {
+ actions_service.CreateCommitStatus(ctx, task.Job)
+ }
+
+ if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
+ if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
+ log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
+ }
+ }
+
+ return connect.NewResponse(&runnerv1.UpdateTaskResponse{
+ State: &runnerv1.TaskState{
+ Id: req.Msg.State.Id,
+ Result: task.Status.AsResult(),
+ },
+ SentOutputs: sentOutputs,
+ }), nil
+}
+
+// UpdateLog uploads log of the task.
+func (s *Service) UpdateLog(
+ ctx context.Context,
+ req *connect.Request[runnerv1.UpdateLogRequest],
+) (*connect.Response[runnerv1.UpdateLogResponse], error) {
+ res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
+
+ task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "get task: %v", err)
+ }
+ ack := task.LogLength
+
+ if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
+ res.Msg.AckIndex = ack
+ return res, nil
+ }
+
+ if task.LogInStorage {
+ return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
+ }
+
+ rows := req.Msg.Rows[ack-req.Msg.Index:]
+ ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "write logs: %v", err)
+ }
+ task.LogLength += int64(len(rows))
+ for _, n := range ns {
+ task.LogIndexes = append(task.LogIndexes, task.LogSize)
+ task.LogSize += int64(n)
+ }
+
+ res.Msg.AckIndex = task.LogLength
+
+ var remove func()
+ if req.Msg.NoMore {
+ task.LogInStorage = true
+ remove, err = actions.TransferLogs(ctx, task.LogFilename)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
+ }
+ }
+
+ if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
+ return nil, status.Errorf(codes.Internal, "update task: %v", err)
+ }
+ if remove != nil {
+ remove()
+ }
+
+ return res, nil
+}