summary refs log tree commit diff stats
path: root/routers/api/actions/artifacts_chunks.go
diff options
context:
space:
mode:
author Daniel Baumann <daniel@debian.org> 2024-10-18 20:33:49 +0200
committer Daniel Baumann <daniel@debian.org> 2024-12-12 23:57:56 +0100
commit e68b9d00a6e05b3a941f63ffb696f91e554ac5ec (patch)
tree 97775d6c13b0f416af55314eb6a89ef792474615 /routers/api/actions/artifacts_chunks.go
parent Initial commit. (diff)
download forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.tar.xz
forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.zip
Adding upstream version 9.0.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to '')
-rw-r--r-- routers/api/actions/artifacts_chunks.go 301
1 files changed, 301 insertions, 0 deletions
diff --git a/routers/api/actions/artifacts_chunks.go b/routers/api/actions/artifacts_chunks.go
new file mode 100644
index 0000000..cdb5658
--- /dev/null
+++ b/routers/api/actions/artifacts_chunks.go
@@ -0,0 +1,301 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "crypto/md5"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "hash"
+ "io"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
+ "code.gitea.io/gitea/models/actions"
+ "code.gitea.io/gitea/models/db"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/storage"
+)
+
+// saveUploadChunkBase streams the request body into storage as a temporary
+// chunk object of the given artifact.
+//
+// The chunk is written to "tmp<runID>/<runID>-<artifactID>-<start>-<end>.chunk".
+// When checkMd5 is true, the MD5 digest of the body is computed while it is
+// streamed and its base64 form is compared with the request header named by
+// artifactXActionsResultsMD5Header. If the digest does not match, or the number
+// of bytes written differs from contentSize, the stored chunk is deleted again.
+//
+// On success it returns length unchanged; on failure it returns -1 and an error.
+func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
+	artifact *actions.ActionArtifact,
+	contentSize, runID, start, end, length int64, checkMd5 bool,
+) (int64, error) {
+	// build chunk store path
+	storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
+	var r io.Reader = ctx.Req.Body
+	var hasher hash.Hash
+	if checkMd5 {
+		// Feed the hasher through io.TeeReader while the body is streamed to
+		// storage, so the body never has to be read separately for the digest.
+		// The digest is only complete once Save has consumed the reader, which
+		// is why a mismatch is handled by deleting the already stored chunk.
+		hasher = md5.New()
+		r = io.TeeReader(r, hasher)
+	}
+	// save chunk to storage
+	writtenSize, err := st.Save(storagePath, r, contentSize)
+	if err != nil {
+		return -1, fmt.Errorf("save chunk to storage error: %w", err)
+	}
+	var checkErr error
+	if checkMd5 {
+		// compare the digest announced by the client with the one computed here
+		reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
+		chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
+		log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
+		if reqMd5String != chunkMd5String {
+			checkErr = errors.New("md5 not match")
+		}
+	}
+	if writtenSize != contentSize {
+		checkErr = errors.Join(checkErr, errors.New("contentSize not match body size"))
+	}
+	if checkErr != nil {
+		// the chunk failed validation, remove it; a failed delete is only logged
+		if err := st.Delete(storagePath); err != nil {
+			log.Error("Error deleting chunk: %s, %v", storagePath, err)
+		}
+		return -1, checkErr
+	}
+	log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
+		storagePath, contentSize, artifact.ID, start, end)
+	// return chunk total size
+	return length, nil
+}
+
+func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
+ artifact *actions.ActionArtifact,
+ contentSize, runID int64,
+) (int64, error) {
+ // parse content-range header, format: bytes 0-1023/146515
+ contentRange := ctx.Req.Header.Get("Content-Range")
+ start, end, length := int64(0), int64(0), int64(0)
+ if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
+ log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
+ return -1, fmt.Errorf("parse content range error: %v", err)
+ }
+ return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true)
+}
+
+func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
+ artifact *actions.ActionArtifact,
+ start, contentSize, runID int64,
+) (int64, error) {
+ end := start + contentSize - 1
+ return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false)
+}
+
+// chunkFileItem describes a single uploaded chunk object found in storage.
+type chunkFileItem struct {
+	// RunID and ArtifactID identify the run and the artifact the chunk belongs to.
+	RunID, ArtifactID int64
+	// Start and End are the inclusive byte offsets of the chunk inside the artifact.
+	Start, End int64
+	// Path is the storage path of the chunk object.
+	Path string
+}
+
+func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
+ storageDir := fmt.Sprintf("tmp%d", runID)
+ var chunks []*chunkFileItem
+ if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
+ baseName := filepath.Base(fpath)
+ // when read chunks from storage, it only contains storage dir and basename,
+ // no matter the subdirectory setting in storage config
+ item := chunkFileItem{Path: storageDir + "/" + baseName}
+ if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
+ return fmt.Errorf("parse content range error: %v", err)
+ }
+ chunks = append(chunks, &item)
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+ // chunks group by artifact id
+ chunksMap := make(map[int64][]*chunkFileItem)
+ for _, c := range chunks {
+ chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
+ }
+ return chunksMap, nil
+}
+
+// listChunksByRunIDV4 collects the block objects stored under "tmpv4<runID>"
+// that are named in blist.Latest and returns them in the order of that list.
+//
+// Block objects are named "block-<runID>-<size>-<base64url block name>";
+// objects without the "block-" prefix are ignored, as are blocks whose decoded
+// name is not in blist.Latest. Start and End of the returned items are
+// assigned cumulatively, so that each block begins right after the previous
+// one in the list. An error is returned when an object name cannot be parsed
+// or when a block named in blist.Latest was not found in storage.
+func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
+	storageDir := fmt.Sprintf("tmpv4%d", runID)
+	var chunks []*chunkFileItem
+	chunkMap := map[string]*chunkFileItem{}
+	// dummy (empty Path) marks a wanted block that has not been seen in storage yet
+	dummy := &chunkFileItem{}
+	for _, name := range blist.Latest {
+		chunkMap[name] = dummy
+	}
+	if err := st.IterateObjects(storageDir, func(fpath string, _ storage.Object) error {
+		baseName := filepath.Base(fpath)
+		if !strings.HasPrefix(baseName, "block-") {
+			return nil
+		}
+		// when read chunks from storage, it only contains storage dir and basename,
+		// no matter the subdirectory setting in storage config
+		item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
+		var size int64
+		var b64chunkName string
+		if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
+			return fmt.Errorf("parse block file name %q error: %w", baseName, err)
+		}
+		rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
+		if err != nil {
+			return fmt.Errorf("failed to parse chunkName: %w", err)
+		}
+		chunkName := string(rchunkName)
+		// Start is still zero here, so End temporarily holds size-1; both are
+		// shifted to their final offsets in the loop below.
+		item.End = item.Start + size - 1
+		if _, ok := chunkMap[chunkName]; ok {
+			chunkMap[chunkName] = &item
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+	for i, name := range blist.Latest {
+		chunk, ok := chunkMap[name]
+		if !ok || chunk.Path == "" {
+			return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
+		}
+		chunks = append(chunks, chunk)
+		if i > 0 {
+			// place this block directly after the previous one in the list
+			prev := chunks[i-1]
+			chunk.Start = prev.End + 1
+			chunk.End += chunk.Start
+		}
+	}
+	return chunks, nil
+}
+
+// mergeChunksForRun merges the uploaded chunks of every artifact of the run
+// that carries the given name. Artifacts that have no chunks in storage are
+// skipped; the first lookup or merge error stops the processing.
+func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
+	// look up the matching artifacts in the database
+	findOpts := actions.FindArtifactsOptions{
+		RunID:        runID,
+		ArtifactName: artifactName,
+	}
+	dbArtifacts, err := db.Find[actions.ActionArtifact](ctx, findOpts)
+	if err != nil {
+		return err
+	}
+	// collect the chunks currently uploaded for this run
+	uploaded, err := listChunksByRunID(st, runID)
+	if err != nil {
+		return err
+	}
+	// merge the chunks of each artifact that has some
+	for _, dbArtifact := range dbArtifacts {
+		if artifactChunks, found := uploaded[dbArtifact.ID]; found {
+			if mergeErr := mergeChunksForArtifact(ctx, artifactChunks, st, dbArtifact, ""); mergeErr != nil {
+				return mergeErr
+			}
+			continue
+		}
+		log.Debug("artifact %d chunks not found", dbArtifact.ID)
+	}
+	return nil
+}
+
+// mergeChunksForArtifact concatenates the uploaded chunks of an artifact into
+// one stored file and marks the artifact as upload-confirmed.
+//
+// The chunks are sorted by Start and only those forming a gap-free sequence
+// from offset 0 are used; repeated chunks are skipped. If the sequence does not
+// cover artifact.FileCompressedSize bytes, the upload is treated as incomplete
+// and nil is returned without changing anything.
+//
+// When checksum starts with "sha256:", the SHA-256 digest of the merged data
+// is computed while it is written and compared with checksum.
+//
+// Once the merged file has been written with the expected size, all chunks
+// (including skipped duplicates) are deleted when the function returns. If a
+// later step fails, the merged file is removed again so that no unreferenced
+// object stays in storage. A previously stored artifact file is deleted only
+// after the artifact record has been updated successfully.
+func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact, checksum string) error {
+	sort.Slice(chunks, func(i, j int) bool {
+		return chunks[i].Start < chunks[j].Start
+	})
+	allChunks := make([]*chunkFileItem, 0, len(chunks))
+	startAt := int64(-1)
+	// check if all chunks are uploaded and in order and clean repeated chunks
+	for _, c := range chunks {
+		// startAt is the End of the last accepted chunk (-1 before the first one).
+		// A chunk is accepted only if it starts right after it; a repeated chunk
+		// or a chunk behind a gap does not match and is left out.
+		if c.Start == startAt+1 {
+			allChunks = append(allChunks, c)
+			startAt = c.End
+		}
+	}
+	// if the accepted chunks do not reach the expected size, the upload is not complete yet
+	if startAt+1 != artifact.FileCompressedSize {
+		log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
+		return nil
+	}
+	// use multiReader
+	readers := make([]io.Reader, 0, len(allChunks))
+	closeReaders := func() {
+		for _, r := range readers {
+			_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
+		}
+		// makes a second call a no-op
+		readers = nil
+	}
+	defer closeReaders()
+	for _, c := range allChunks {
+		var readCloser io.ReadCloser
+		var err error
+		if readCloser, err = st.Open(c.Path); err != nil {
+			return fmt.Errorf("open chunk error: %w, %s", err, c.Path)
+		}
+		readers = append(readers, readCloser)
+	}
+	mergedReader := io.MultiReader(readers...)
+	const shaPrefix = "sha256:"
+	var hasher hash.Hash
+	if strings.HasPrefix(checksum, shaPrefix) {
+		// compute the digest while the merged data is streamed to storage
+		hasher = sha256.New()
+		mergedReader = io.TeeReader(mergedReader, hasher)
+	}
+
+	// if chunk is gzip, use gz as extension
+	// download-artifact action will use content-encoding header to decide if it should decompress the file
+	extension := "chunk"
+	if artifact.ContentEncoding == "gzip" {
+		extension = "chunk.gz"
+	}
+
+	// save merged file
+	storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
+	written, err := st.Save(storagePath, mergedReader, artifact.FileCompressedSize)
+	if err != nil {
+		return fmt.Errorf("save merged file error: %w", err)
+	}
+	// discardMerged removes the merged file again when a later step fails,
+	// because nothing references it in that case
+	discardMerged := func() {
+		if err := st.Delete(storagePath); err != nil {
+			log.Warn("Error deleting merged file: %s, %v", storagePath, err)
+		}
+	}
+	if written != artifact.FileCompressedSize {
+		discardMerged()
+		return errors.New("merged file size is not equal to chunk length")
+	}
+
+	defer func() {
+		closeReaders() // close before delete
+		// drop chunks
+		for _, c := range chunks {
+			if err := st.Delete(c.Path); err != nil {
+				log.Warn("Error deleting chunk: %s, %v", c.Path, err)
+			}
+		}
+	}()
+
+	if hasher != nil {
+		rawChecksum := hasher.Sum(nil)
+		actualChecksum := hex.EncodeToString(rawChecksum)
+		if !strings.HasSuffix(checksum, actualChecksum) {
+			discardMerged()
+			return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
+		}
+	}
+
+	// save storage path to artifact
+	log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
+	oldStoragePath, oldStatus := artifact.StoragePath, artifact.Status
+	artifact.StoragePath = storagePath
+	artifact.Status = int64(actions.ArtifactStatusUploadConfirmed)
+	if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
+		// the update failed: put the in-memory artifact back to its previous
+		// values and drop the merged file that is now unreferenced
+		artifact.StoragePath, artifact.Status = oldStoragePath, oldStatus
+		discardMerged()
+		return fmt.Errorf("update artifact error: %w", err)
+	}
+
+	// if artifact was already uploaded before, delete the old file now that it
+	// is no longer referenced
+	if oldStoragePath != "" {
+		if err := st.Delete(oldStoragePath); err != nil {
+			log.Warn("Error deleting old artifact: %s, %v", oldStoragePath, err)
+		}
+	}
+
+	return nil
+}