diff options
Diffstat (limited to 'internal/pkg/report/reporter.go')
-rw-r--r-- | internal/pkg/report/reporter.go | 437 |
1 files changed, 437 insertions, 0 deletions
diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go new file mode 100644 index 0000000..cee5062 --- /dev/null +++ b/internal/pkg/report/reporter.go @@ -0,0 +1,437 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package report + +import ( + "context" + "fmt" + "regexp" + "strings" + "sync" + "time" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" + retry "github.com/avast/retry-go/v4" + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "gitea.com/gitea/act_runner/internal/pkg/client" +) + +type Reporter struct { + ctx context.Context + cancel context.CancelFunc + + closed bool + client client.Client + clientM sync.Mutex + + logOffset int + logRows []*runnerv1.LogRow + logReplacer *strings.Replacer + oldnew []string + reportInterval time.Duration + + state *runnerv1.TaskState + stateMu sync.RWMutex + outputs sync.Map + + debugOutputEnabled bool + stopCommandEndToken string +} + +func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, reportInterval time.Duration) *Reporter { + var oldnew []string + if v := task.Context.Fields["token"].GetStringValue(); v != "" { + oldnew = append(oldnew, v, "***") + } + if v := task.Context.Fields["gitea_runtime_token"].GetStringValue(); v != "" { + oldnew = append(oldnew, v, "***") + } + for _, v := range task.Secrets { + oldnew = append(oldnew, v, "***") + } + + rv := &Reporter{ + ctx: ctx, + cancel: cancel, + client: client, + oldnew: oldnew, + reportInterval: reportInterval, + logReplacer: strings.NewReplacer(oldnew...), + state: &runnerv1.TaskState{ + Id: task.Id, + }, + } + + if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" { + rv.debugOutputEnabled = true + } + + return rv +} + +func (r *Reporter) ResetSteps(l int) { + r.stateMu.Lock() + defer r.stateMu.Unlock() + for i := 0; i < l; i++ { + r.state.Steps = append(r.state.Steps, &runnerv1.StepState{ + Id: int64(i), + }) + } +} + +func (r *Reporter) Levels() []log.Level { + return log.AllLevels +} + +func appendIfNotNil[T any](s []*T, v *T) []*T { + if v != nil { + return append(s, v) + } + return s +} + +func (r *Reporter) Fire(entry *log.Entry) error { + r.stateMu.Lock() + defer r.stateMu.Unlock() + + log.WithFields(entry.Data).Trace(entry.Message) + + timestamp := entry.Time + if r.state.StartedAt == nil { + r.state.StartedAt = timestamppb.New(timestamp) + } + + stage := entry.Data["stage"] + + if stage != "Main" { + if v, ok := entry.Data["jobResult"]; ok { + if jobResult, ok := r.parseResult(v); ok { + r.state.Result = jobResult + r.state.StoppedAt = timestamppb.New(timestamp) + for _, s := range r.state.Steps { + if s.Result == runnerv1.Result_RESULT_UNSPECIFIED { + s.Result = runnerv1.Result_RESULT_CANCELLED + if jobResult == runnerv1.Result_RESULT_SKIPPED { + s.Result = runnerv1.Result_RESULT_SKIPPED + } + } + } + } + } + if !r.duringSteps() { + r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry)) + } + return nil + } + + var step *runnerv1.StepState + if v, ok := entry.Data["stepNumber"]; ok { + if v, ok := v.(int); ok && len(r.state.Steps) > v { + step = r.state.Steps[v] + } + } + if step == nil { + if !r.duringSteps() { + r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry)) + } + return nil + } + + if step.StartedAt == nil { + step.StartedAt = timestamppb.New(timestamp) + } + if v, ok := entry.Data["raw_output"]; ok { + if rawOutput, ok := v.(bool); ok && rawOutput { + if row := r.parseLogRow(entry); row != nil { + if step.LogLength == 0 { + step.LogIndex = int64(r.logOffset + len(r.logRows)) + } + step.LogLength++ + r.logRows = append(r.logRows, row) + } + } + } else if !r.duringSteps() { + r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry)) + } + if v, ok := entry.Data["stepResult"]; ok { + if stepResult, ok := r.parseResult(v); ok { + if step.LogLength == 0 { + step.LogIndex = int64(r.logOffset + len(r.logRows)) + } + step.Result = stepResult + step.StoppedAt = timestamppb.New(timestamp) + } + } + + return nil +} + +func (r *Reporter) RunDaemon() { + if r.closed { + return + } + if r.ctx.Err() != nil { + return + } + + _ = r.ReportLog(false) + _ = r.ReportState() + + time.AfterFunc(r.reportInterval, r.RunDaemon) +} + +func (r *Reporter) Logf(format string, a ...interface{}) { + r.stateMu.Lock() + defer r.stateMu.Unlock() + + r.logf(format, a...) +} + +func (r *Reporter) logf(format string, a ...interface{}) { + if !r.duringSteps() { + r.logRows = append(r.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: fmt.Sprintf(format, a...), + }) + } +} + +func (r *Reporter) SetOutputs(outputs map[string]string) { + r.stateMu.Lock() + defer r.stateMu.Unlock() + + for k, v := range outputs { + if len(k) > 255 { + r.logf("ignore output because the key is too long: %q", k) + continue + } + if l := len(v); l > 1024*1024 { + log.Println("ignore output because the value is too long:", k, l) + r.logf("ignore output because the value %q is too long: %d", k, l) + } + if _, ok := r.outputs.Load(k); ok { + continue + } + r.outputs.Store(k, v) + } +} + +func (r *Reporter) Close(lastWords string) error { + r.closed = true + + r.stateMu.Lock() + if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { + if lastWords == "" { + lastWords = "Early termination" + } + for _, v := range r.state.Steps { + if v.Result == runnerv1.Result_RESULT_UNSPECIFIED { + v.Result = runnerv1.Result_RESULT_CANCELLED + } + } + r.state.Result = runnerv1.Result_RESULT_FAILURE + r.logRows = append(r.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: lastWords, + }) + r.state.StoppedAt = timestamppb.Now() + } else if lastWords != "" { + r.logRows = append(r.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: lastWords, + }) + } + r.stateMu.Unlock() + + return retry.Do(func() error { + if err := r.ReportLog(true); err != nil { + return err + } + return r.ReportState() + }, retry.Context(r.ctx)) +} + +func (r *Reporter) ReportLog(noMore bool) error { + r.clientM.Lock() + defer r.clientM.Unlock() + + r.stateMu.RLock() + rows := r.logRows + r.stateMu.RUnlock() + + resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{ + TaskId: r.state.Id, + Index: int64(r.logOffset), + Rows: rows, + NoMore: noMore, + })) + if err != nil { + return err + } + + ack := int(resp.Msg.AckIndex) + if ack < r.logOffset { + return fmt.Errorf("submitted logs are lost") + } + + r.stateMu.Lock() + r.logRows = r.logRows[ack-r.logOffset:] + r.logOffset = ack + r.stateMu.Unlock() + + if noMore && ack < r.logOffset+len(rows) { + return fmt.Errorf("not all logs are submitted") + } + + return nil +} + +func (r *Reporter) ReportState() error { + r.clientM.Lock() + defer r.clientM.Unlock() + + r.stateMu.RLock() + state := proto.Clone(r.state).(*runnerv1.TaskState) + r.stateMu.RUnlock() + + outputs := make(map[string]string) + r.outputs.Range(func(k, v interface{}) bool { + if val, ok := v.(string); ok { + outputs[k.(string)] = val + } + return true + }) + + resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ + State: state, + Outputs: outputs, + })) + if err != nil { + return err + } + + for _, k := range resp.Msg.SentOutputs { + r.outputs.Store(k, struct{}{}) + } + + if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED { + r.cancel() + } + + var noSent []string + r.outputs.Range(func(k, v interface{}) bool { + if _, ok := v.(string); ok { + noSent = append(noSent, k.(string)) + } + return true + }) + if len(noSent) > 0 { + return fmt.Errorf("there are still outputs that have not been sent: %v", noSent) + } + + return nil +} + +func (r *Reporter) duringSteps() bool { + if steps := r.state.Steps; len(steps) == 0 { + return false + } else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 { + return false + } else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED { + return false + } + return true +} + +var stringToResult = map[string]runnerv1.Result{ + "success": runnerv1.Result_RESULT_SUCCESS, + "failure": runnerv1.Result_RESULT_FAILURE, + "skipped": runnerv1.Result_RESULT_SKIPPED, + "cancelled": runnerv1.Result_RESULT_CANCELLED, +} + +func (r *Reporter) parseResult(result interface{}) (runnerv1.Result, bool) { + str := "" + if v, ok := result.(string); ok { // for jobResult + str = v + } else if v, ok := result.(fmt.Stringer); ok { // for stepResult + str = v.String() + } + + ret, ok := stringToResult[str] + return ret, ok +} + +var cmdRegex = regexp.MustCompile(`^::([^ :]+)( .*)?::(.*)$`) + +func (r *Reporter) handleCommand(originalContent, command, parameters, value string) *string { + if r.stopCommandEndToken != "" && command != r.stopCommandEndToken { + return &originalContent + } + + switch command { + case "add-mask": + r.addMask(value) + return nil + case "debug": + if r.debugOutputEnabled { + return &value + } + return nil + + case "notice": + // Not implemented yet, so just return the original content. + return &originalContent + case "warning": + // Not implemented yet, so just return the original content. + return &originalContent + case "error": + // Not implemented yet, so just return the original content. + return &originalContent + case "group": + // Rewriting into ##[] syntax which the frontend understands + content := "##[group]" + value + return &content + case "endgroup": + // Ditto + content := "##[endgroup]" + return &content + case "stop-commands": + r.stopCommandEndToken = value + return nil + case r.stopCommandEndToken: + r.stopCommandEndToken = "" + return nil + } + return &originalContent +} + +func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow { + content := strings.TrimRightFunc(entry.Message, func(r rune) bool { return r == '\r' || r == '\n' }) + + matches := cmdRegex.FindStringSubmatch(content) + if matches != nil { + if output := r.handleCommand(content, matches[1], matches[2], matches[3]); output != nil { + content = *output + } else { + return nil + } + } + + content = r.logReplacer.Replace(content) + + return &runnerv1.LogRow{ + Time: timestamppb.New(entry.Time), + Content: strings.ToValidUTF8(content, "?"), + } +} + +func (r *Reporter) addMask(msg string) { + r.oldnew = append(r.oldnew, msg, "***") + r.logReplacer = strings.NewReplacer(r.oldnew...) +} |