diff options
Diffstat (limited to 'pkg/model/planner.go')
-rw-r--r-- | pkg/model/planner.go | 415 |
1 files changed, 415 insertions, 0 deletions
diff --git a/pkg/model/planner.go b/pkg/model/planner.go new file mode 100644 index 0000000..2d68145 --- /dev/null +++ b/pkg/model/planner.go @@ -0,0 +1,415 @@ +package model + +import ( + "fmt" + "io" + "io/fs" + "math" + "os" + "path/filepath" + "regexp" + "sort" + + log "github.com/sirupsen/logrus" +) + +// WorkflowPlanner contains methods for creating plans +type WorkflowPlanner interface { + PlanEvent(eventName string) (*Plan, error) + PlanJob(jobName string) (*Plan, error) + PlanAll() (*Plan, error) + GetEvents() []string +} + +// Plan contains a list of stages to run in series +type Plan struct { + Stages []*Stage +} + +// Stage contains a list of runs to execute in parallel +type Stage struct { + Runs []*Run +} + +// Run represents a job from a workflow that needs to be run +type Run struct { + Workflow *Workflow + JobID string +} + +func (r *Run) String() string { + jobName := r.Job().Name + if jobName == "" { + jobName = r.JobID + } + return jobName +} + +// Job returns the job for this Run +func (r *Run) Job() *Job { + return r.Workflow.GetJob(r.JobID) +} + +type WorkflowFiles struct { + workflowDirEntry os.DirEntry + dirPath string +} + +// NewWorkflowPlanner will load a specific workflow, all workflows from a directory or all workflows from a directory and its subdirectories +// +//nolint:gocyclo +func NewWorkflowPlanner(path string, noWorkflowRecurse bool) (WorkflowPlanner, error) { + path, err := filepath.Abs(path) + if err != nil { + return nil, err + } + + fi, err := os.Stat(path) + if err != nil { + return nil, err + } + + var workflows []WorkflowFiles + + if fi.IsDir() { + log.Debugf("Loading workflows from '%s'", path) + if noWorkflowRecurse { + files, err := os.ReadDir(path) + if err != nil { + return nil, err + } + + for _, v := range files { + workflows = append(workflows, WorkflowFiles{ + dirPath: path, + workflowDirEntry: v, + }) + } + } else { + log.Debug("Loading workflows recursively") + if err := filepath.Walk(path, + func(p string, f os.FileInfo, err error) error { + if err != nil { + return err + } + + if !f.IsDir() { + log.Debugf("Found workflow '%s' in '%s'", f.Name(), p) + workflows = append(workflows, WorkflowFiles{ + dirPath: filepath.Dir(p), + workflowDirEntry: fs.FileInfoToDirEntry(f), + }) + } + + return nil + }); err != nil { + return nil, err + } + } + } else { + log.Debugf("Loading workflow '%s'", path) + dirname := filepath.Dir(path) + + workflows = append(workflows, WorkflowFiles{ + dirPath: dirname, + workflowDirEntry: fs.FileInfoToDirEntry(fi), + }) + } + if err != nil { + return nil, err + } + + wp := new(workflowPlanner) + for _, wf := range workflows { + ext := filepath.Ext(wf.workflowDirEntry.Name()) + if ext == ".yml" || ext == ".yaml" { + f, err := os.Open(filepath.Join(wf.dirPath, wf.workflowDirEntry.Name())) + if err != nil { + return nil, err + } + + log.Debugf("Reading workflow '%s'", f.Name()) + workflow, err := ReadWorkflow(f) + if err != nil { + _ = f.Close() + if err == io.EOF { + return nil, fmt.Errorf("unable to read workflow '%s': file is empty: %w", wf.workflowDirEntry.Name(), err) + } + return nil, fmt.Errorf("workflow is not valid. '%s': %w", wf.workflowDirEntry.Name(), err) + } + _, err = f.Seek(0, 0) + if err != nil { + _ = f.Close() + return nil, fmt.Errorf("error occurring when resetting io pointer in '%s': %w", wf.workflowDirEntry.Name(), err) + } + + workflow.File = wf.workflowDirEntry.Name() + if workflow.Name == "" { + workflow.Name = wf.workflowDirEntry.Name() + } + + err = validateJobName(workflow) + if err != nil { + _ = f.Close() + return nil, err + } + + wp.workflows = append(wp.workflows, workflow) + _ = f.Close() + } + } + + return wp, nil +} + +// CombineWorkflowPlanner combines workflows to a WorkflowPlanner +func CombineWorkflowPlanner(workflows ...*Workflow) WorkflowPlanner { + return &workflowPlanner{ + workflows: workflows, + } +} + +func NewSingleWorkflowPlanner(name string, f io.Reader) (WorkflowPlanner, error) { + wp := new(workflowPlanner) + + log.Debugf("Reading workflow %s", name) + workflow, err := ReadWorkflow(f) + if err != nil { + if err == io.EOF { + return nil, fmt.Errorf("unable to read workflow '%s': file is empty: %w", name, err) + } + return nil, fmt.Errorf("workflow is not valid. '%s': %w", name, err) + } + workflow.File = name + if workflow.Name == "" { + workflow.Name = name + } + + err = validateJobName(workflow) + if err != nil { + return nil, err + } + + wp.workflows = append(wp.workflows, workflow) + + return wp, nil +} + +func validateJobName(workflow *Workflow) error { + jobNameRegex := regexp.MustCompile(`^([[:alpha:]_][[:alnum:]_\-]*)$`) + for k := range workflow.Jobs { + if ok := jobNameRegex.MatchString(k); !ok { + return fmt.Errorf("workflow is not valid. '%s': Job name '%s' is invalid. Names must start with a letter or '_' and contain only alphanumeric characters, '-', or '_'", workflow.Name, k) + } + } + return nil +} + +type workflowPlanner struct { + workflows []*Workflow +} + +// PlanEvent builds a new list of runs to execute in parallel for an event name +func (wp *workflowPlanner) PlanEvent(eventName string) (*Plan, error) { + plan := new(Plan) + if len(wp.workflows) == 0 { + log.Debug("no workflows found by planner") + return plan, nil + } + var lastErr error + + for _, w := range wp.workflows { + events := w.On() + if len(events) == 0 { + log.Debugf("no events found for workflow: %s", w.File) + continue + } + + for _, e := range events { + if e == eventName { + stages, err := createStages(w, w.GetJobIDs()...) + if err != nil { + log.Warn(err) + lastErr = err + } else { + plan.mergeStages(stages) + } + } + } + } + return plan, lastErr +} + +// PlanJob builds a new run to execute in parallel for a job name +func (wp *workflowPlanner) PlanJob(jobName string) (*Plan, error) { + plan := new(Plan) + if len(wp.workflows) == 0 { + log.Debugf("no jobs found for workflow: %s", jobName) + } + var lastErr error + + for _, w := range wp.workflows { + stages, err := createStages(w, jobName) + if err != nil { + log.Warn(err) + lastErr = err + } else { + plan.mergeStages(stages) + } + } + return plan, lastErr +} + +// PlanAll builds a new run to execute in parallel all +func (wp *workflowPlanner) PlanAll() (*Plan, error) { + plan := new(Plan) + if len(wp.workflows) == 0 { + log.Debug("no workflows found by planner") + return plan, nil + } + var lastErr error + + for _, w := range wp.workflows { + stages, err := createStages(w, w.GetJobIDs()...) + if err != nil { + log.Warn(err) + lastErr = err + } else { + plan.mergeStages(stages) + } + } + + return plan, lastErr +} + +// GetEvents gets all the events in the workflows file +func (wp *workflowPlanner) GetEvents() []string { + events := make([]string, 0) + for _, w := range wp.workflows { + found := false + for _, e := range events { + for _, we := range w.On() { + if e == we { + found = true + break + } + } + if found { + break + } + } + + if !found { + events = append(events, w.On()...) + } + } + + // sort the list based on depth of dependencies + sort.Slice(events, func(i, j int) bool { + return events[i] < events[j] + }) + + return events +} + +// MaxRunNameLen determines the max name length of all jobs +func (p *Plan) MaxRunNameLen() int { + maxRunNameLen := 0 + for _, stage := range p.Stages { + for _, run := range stage.Runs { + runNameLen := len(run.String()) + if runNameLen > maxRunNameLen { + maxRunNameLen = runNameLen + } + } + } + return maxRunNameLen +} + +// GetJobIDs will get all the job names in the stage +func (s *Stage) GetJobIDs() []string { + names := make([]string, 0) + for _, r := range s.Runs { + names = append(names, r.JobID) + } + return names +} + +// Merge stages with existing stages in plan +func (p *Plan) mergeStages(stages []*Stage) { + newStages := make([]*Stage, int(math.Max(float64(len(p.Stages)), float64(len(stages))))) + for i := 0; i < len(newStages); i++ { + newStages[i] = new(Stage) + if i >= len(p.Stages) { + newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...) + } else if i >= len(stages) { + newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...) + } else { + newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...) + newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...) + } + } + p.Stages = newStages +} + +func createStages(w *Workflow, jobIDs ...string) ([]*Stage, error) { + // first, build a list of all the necessary jobs to run, and their dependencies + jobDependencies := make(map[string][]string) + for len(jobIDs) > 0 { + newJobIDs := make([]string, 0) + for _, jID := range jobIDs { + // make sure we haven't visited this job yet + if _, ok := jobDependencies[jID]; !ok { + if job := w.GetJob(jID); job != nil { + jobDependencies[jID] = job.Needs() + newJobIDs = append(newJobIDs, job.Needs()...) + } + } + } + jobIDs = newJobIDs + } + + // next, build an execution graph + stages := make([]*Stage, 0) + for len(jobDependencies) > 0 { + stage := new(Stage) + for jID, jDeps := range jobDependencies { + // make sure all deps are in the graph already + if listInStages(jDeps, stages...) { + stage.Runs = append(stage.Runs, &Run{ + Workflow: w, + JobID: jID, + }) + delete(jobDependencies, jID) + } + } + if len(stage.Runs) == 0 { + return nil, fmt.Errorf("unable to build dependency graph for %s (%s)", w.Name, w.File) + } + stages = append(stages, stage) + } + + if len(stages) == 0 { + return nil, fmt.Errorf("Could not find any stages to run. View the valid jobs with `act --list`. Use `act --help` to find how to filter by Job ID/Workflow/Event Name") + } + + return stages, nil +} + +// return true iff all strings in srcList exist in at least one of the stages +func listInStages(srcList []string, stages ...*Stage) bool { + for _, src := range srcList { + found := false + for _, stage := range stages { + for _, search := range stage.GetJobIDs() { + if src == search { + found = true + } + } + } + if !found { + return false + } + } + return true +} |