From dd136858f1ea40ad3c94191d647487fa4f31926c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 18 Oct 2024 20:33:49 +0200 Subject: Adding upstream version 9.0.0. Signed-off-by: Daniel Baumann --- modules/actions/log.go | 224 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 modules/actions/log.go (limited to 'modules/actions/log.go') diff --git a/modules/actions/log.go b/modules/actions/log.go new file mode 100644 index 0000000..5a1425e --- /dev/null +++ b/modules/actions/log.go @@ -0,0 +1,224 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "strings" + "time" + + "code.gitea.io/gitea/models/dbfs" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/storage" + "code.gitea.io/gitea/modules/zstd" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + MaxLineSize = 64 * 1024 + DBFSPrefix = "actions_log/" + + timeFormat = "2006-01-02T15:04:05.0000000Z07:00" + defaultBufSize = MaxLineSize +) + +// WriteLogs appends logs to DBFS file for temporary storage. +// It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content. +// Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage. +func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { + flag := os.O_WRONLY + if offset == 0 { + // Create file only if offset is 0, or it could result in content holes if the file doesn't exist. + flag |= os.O_CREATE + } + name := DBFSPrefix + filename + f, err := dbfs.OpenFile(ctx, name, flag) + if err != nil { + return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) + } + defer f.Close() + + stat, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("dbfs Stat %q: %w", name, err) + } + if stat.Size() < offset { + // If the size is less than offset, refuse to write, or it could result in content holes. + // However, if the size is greater than offset, we can still write to overwrite the content. + return nil, fmt.Errorf("size of %q is less than offset", name) + } + + if _, err := f.Seek(offset, io.SeekStart); err != nil { + return nil, fmt.Errorf("dbfs Seek %q: %w", name, err) + } + + writer := bufio.NewWriterSize(f, defaultBufSize) + + ns := make([]int, 0, len(rows)) + for _, row := range rows { + n, err := writer.WriteString(FormatLog(row.Time.AsTime(), row.Content) + "\n") + if err != nil { + return nil, err + } + ns = append(ns, n) + } + + if err := writer.Flush(); err != nil { + return nil, err + } + return ns, nil +} + +func ReadLogs(ctx context.Context, inStorage bool, filename string, offset, limit int64) ([]*runnerv1.LogRow, error) { + f, err := OpenLogs(ctx, inStorage, filename) + if err != nil { + return nil, err + } + defer f.Close() + + if _, err := f.Seek(offset, io.SeekStart); err != nil { + return nil, fmt.Errorf("file seek: %w", err) + } + + scanner := bufio.NewScanner(f) + maxLineSize := len(timeFormat) + MaxLineSize + 1 + scanner.Buffer(make([]byte, maxLineSize), maxLineSize) + + var rows []*runnerv1.LogRow + for scanner.Scan() && (int64(len(rows)) < limit || limit < 0) { + t, c, err := ParseLog(scanner.Text()) + if err != nil { + return nil, fmt.Errorf("parse log %q: %w", scanner.Text(), err) + } + rows = append(rows, &runnerv1.LogRow{ + Time: timestamppb.New(t), + Content: c, + }) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("ReadLogs scan: %w", err) + } + + return rows, nil +} + +const ( + // logZstdBlockSize is the block size for zstd compression. + // 128KB leads the compression ratio to be close to the regular zstd compression. + // And it means each read from the underlying object storage will be at least 128KB*(compression ratio). + // The compression ratio is about 30% for text files, so the actual read size is about 38KB, which should be acceptable. + logZstdBlockSize = 128 * 1024 // 128KB +) + +// TransferLogs transfers logs from DBFS to object storage. +// It happens when the file is complete and no more logs will be appended. +// It respects the file format in the filename like ".zst", and compresses the content if needed. +func TransferLogs(ctx context.Context, filename string) (func(), error) { + name := DBFSPrefix + filename + remove := func() { + if err := dbfs.Remove(ctx, name); err != nil { + log.Warn("dbfs remove %q: %v", name, err) + } + } + f, err := dbfs.Open(ctx, name) + if err != nil { + return nil, fmt.Errorf("dbfs open %q: %w", name, err) + } + defer f.Close() + + var reader io.Reader = f + if strings.HasSuffix(filename, ".zst") { + r, w := io.Pipe() + reader = r + zstdWriter, err := zstd.NewSeekableWriter(w, logZstdBlockSize) + if err != nil { + return nil, fmt.Errorf("zstd NewSeekableWriter: %w", err) + } + go func() { + defer func() { + _ = w.CloseWithError(zstdWriter.Close()) + }() + if _, err := io.Copy(zstdWriter, f); err != nil { + _ = w.CloseWithError(err) + return + } + }() + } + + if _, err := storage.Actions.Save(filename, reader, -1); err != nil { + return nil, fmt.Errorf("storage save %q: %w", filename, err) + } + return remove, nil +} + +func RemoveLogs(ctx context.Context, inStorage bool, filename string) error { + if !inStorage { + name := DBFSPrefix + filename + err := dbfs.Remove(ctx, name) + if err != nil { + return fmt.Errorf("dbfs remove %q: %w", name, err) + } + return nil + } + err := storage.Actions.Delete(filename) + if err != nil { + return fmt.Errorf("storage delete %q: %w", filename, err) + } + return nil +} + +func OpenLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeekCloser, error) { + if !inStorage { + name := DBFSPrefix + filename + f, err := dbfs.Open(ctx, name) + if err != nil { + return nil, fmt.Errorf("dbfs open %q: %w", name, err) + } + return f, nil + } + + f, err := storage.Actions.Open(filename) + if err != nil { + return nil, fmt.Errorf("storage open %q: %w", filename, err) + } + + var reader io.ReadSeekCloser = f + if strings.HasSuffix(filename, ".zst") { + r, err := zstd.NewSeekableReader(f) + if err != nil { + return nil, fmt.Errorf("zstd NewSeekableReader: %w", err) + } + reader = r + } + + return reader, nil +} + +func FormatLog(timestamp time.Time, content string) string { + // Content shouldn't contain new line, it will break log indexes, other control chars are safe. + content = strings.ReplaceAll(content, "\n", `\n`) + if len(content) > MaxLineSize { + content = content[:MaxLineSize] + } + return fmt.Sprintf("%s %s", timestamp.UTC().Format(timeFormat), content) +} + +func ParseLog(in string) (time.Time, string, error) { + index := strings.IndexRune(in, ' ') + if index < 0 { + return time.Time{}, "", fmt.Errorf("invalid log: %q", in) + } + timestamp, err := time.Parse(timeFormat, in[:index]) + if err != nil { + return time.Time{}, "", err + } + return timestamp, in[index+1:], nil +} -- cgit v1.2.3