From e68b9d00a6e05b3a941f63ffb696f91e554ac5ec Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 18 Oct 2024 20:33:49 +0200 Subject: Adding upstream version 9.0.3. Signed-off-by: Daniel Baumann --- services/actions/schedule_tasks.go | 154 +++++++++++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 services/actions/schedule_tasks.go (limited to 'services/actions/schedule_tasks.go') diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go new file mode 100644 index 0000000..18f3324 --- /dev/null +++ b/services/actions/schedule_tasks.go @@ -0,0 +1,154 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unit" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" + webhook_module "code.gitea.io/gitea/modules/webhook" + + "github.com/nektos/act/pkg/jobparser" +) + +// StartScheduleTasks start the task +func StartScheduleTasks(ctx context.Context) error { + return startTasks(ctx) +} + +// startTasks retrieves specifications in pages, creates a schedule task for each specification, +// and updates the specification's next run time and previous run time. +// The function returns an error if there's an issue with finding or updating the specifications. +func startTasks(ctx context.Context) error { + // Set the page size + pageSize := 50 + + // Retrieve specs in pages until all specs have been retrieved + now := time.Now() + for page := 1; ; page++ { + // Retrieve the specs for the current page + specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{ + ListOptions: db.ListOptions{ + Page: page, + PageSize: pageSize, + }, + Next: now.Unix(), + }) + if err != nil { + return fmt.Errorf("find specs: %w", err) + } + + if err := specs.LoadRepos(ctx); err != nil { + return fmt.Errorf("LoadRepos: %w", err) + } + + // Loop through each spec and create a schedule task for it + for _, row := range specs { + // cancel running jobs if the event is push + if row.Schedule.Event == webhook_module.HookEventPush { + // cancel running jobs of the same workflow + if err := actions_model.CancelPreviousJobs( + ctx, + row.RepoID, + row.Schedule.Ref, + row.Schedule.WorkflowID, + webhook_module.HookEventSchedule, + ); err != nil { + log.Error("CancelPreviousJobs: %v", err) + } + } + + if row.Repo.IsArchived { + // Skip if the repo is archived + continue + } + + cfg, err := row.Repo.GetUnit(ctx, unit.TypeActions) + if err != nil { + if repo_model.IsErrUnitTypeNotExist(err) { + // Skip the actions unit of this repo is disabled. + continue + } + return fmt.Errorf("GetUnit: %w", err) + } + if cfg.ActionsConfig().IsWorkflowDisabled(row.Schedule.WorkflowID) { + continue + } + + if err := CreateScheduleTask(ctx, row.Schedule); err != nil { + log.Error("CreateScheduleTask: %v", err) + return err + } + + // Parse the spec + schedule, err := row.Parse() + if err != nil { + log.Error("Parse: %v", err) + return err + } + + // Update the spec's next run time and previous run time + row.Prev = row.Next + row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix()) + if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil { + log.Error("UpdateScheduleSpec: %v", err) + return err + } + } + + // Stop if all specs have been retrieved + if len(specs) < pageSize { + break + } + } + + return nil +} + +// CreateScheduleTask creates a scheduled task from a cron action schedule. +// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job. +func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error { + // Create a new action run based on the schedule + run := &actions_model.ActionRun{ + Title: cron.Title, + RepoID: cron.RepoID, + OwnerID: cron.OwnerID, + WorkflowID: cron.WorkflowID, + TriggerUserID: cron.TriggerUserID, + Ref: cron.Ref, + CommitSHA: cron.CommitSHA, + Event: cron.Event, + EventPayload: cron.EventPayload, + TriggerEvent: string(webhook_module.HookEventSchedule), + ScheduleID: cron.ID, + Status: actions_model.StatusWaiting, + } + + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + log.Error("GetVariablesOfRun: %v", err) + return err + } + + // Parse the workflow specification from the cron schedule + workflows, err := jobparser.Parse(cron.Content, jobparser.WithVars(vars)) + if err != nil { + return err + } + + // Insert the action run and its associated jobs into the database + if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + return err + } + + // Return nil if no errors occurred + return nil +} -- cgit v1.2.3