diff options
author | Daniel Baumann <daniel@debian.org> | 2024-10-18 20:33:49 +0200 |
---|---|---|
committer | Daniel Baumann <daniel@debian.org> | 2024-12-12 23:57:56 +0100 |
commit | e68b9d00a6e05b3a941f63ffb696f91e554ac5ec (patch) | |
tree | 97775d6c13b0f416af55314eb6a89ef792474615 /routers/api/actions/artifacts_chunks.go | |
parent | Initial commit. (diff) | |
download | forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.tar.xz forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.zip |
Adding upstream version 9.0.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'routers/api/actions/artifacts_chunks.go')
-rw-r--r-- | routers/api/actions/artifacts_chunks.go | 301 |
1 files changed, 301 insertions, 0 deletions
diff --git a/routers/api/actions/artifacts_chunks.go b/routers/api/actions/artifacts_chunks.go new file mode 100644 index 0000000..cdb5658 --- /dev/null +++ b/routers/api/actions/artifacts_chunks.go @@ -0,0 +1,301 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "crypto/md5" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "errors" + "fmt" + "hash" + "io" + "path/filepath" + "sort" + "strings" + "time" + + "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/storage" +) + +func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext, + artifact *actions.ActionArtifact, + contentSize, runID, start, end, length int64, checkMd5 bool, +) (int64, error) { + // build chunk store path + storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end) + var r io.Reader = ctx.Req.Body + var hasher hash.Hash + if checkMd5 { + // use io.TeeReader to avoid reading all body to md5 sum. + // it writes data to hasher after reading end + // if hash is not matched, delete the read-end result + hasher = md5.New() + r = io.TeeReader(r, hasher) + } + // save chunk to storage + writtenSize, err := st.Save(storagePath, r, contentSize) + if err != nil { + return -1, fmt.Errorf("save chunk to storage error: %v", err) + } + var checkErr error + if checkMd5 { + // check md5 + reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header) + chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) + log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String) + // if md5 not match, delete the chunk + if reqMd5String != chunkMd5String { + checkErr = fmt.Errorf("md5 not match") + } + } + if writtenSize != contentSize { + checkErr = errors.Join(checkErr, fmt.Errorf("contentSize not match body size")) + } + if checkErr != nil { + if err := st.Delete(storagePath); err != nil { + log.Error("Error deleting chunk: %s, %v", storagePath, err) + } + return -1, checkErr + } + log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d", + storagePath, contentSize, artifact.ID, start, end) + // return chunk total size + return length, nil +} + +func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext, + artifact *actions.ActionArtifact, + contentSize, runID int64, +) (int64, error) { + // parse content-range header, format: bytes 0-1023/146515 + contentRange := ctx.Req.Header.Get("Content-Range") + start, end, length := int64(0), int64(0), int64(0) + if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil { + log.Warn("parse content range error: %v, content-range: %s", err, contentRange) + return -1, fmt.Errorf("parse content range error: %v", err) + } + return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true) +} + +func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext, + artifact *actions.ActionArtifact, + start, contentSize, runID int64, +) (int64, error) { + end := start + contentSize - 1 + return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false) +} + +type chunkFileItem struct { + RunID int64 + ArtifactID int64 + Start int64 + End int64 + Path string +} + +func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) { + storageDir := fmt.Sprintf("tmp%d", runID) + var chunks []*chunkFileItem + if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error { + baseName := filepath.Base(fpath) + // when read chunks from storage, it only contains storage dir and basename, + // no matter the subdirectory setting in storage config + item := chunkFileItem{Path: storageDir + "/" + baseName} + if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil { + return fmt.Errorf("parse content range error: %v", err) + } + chunks = append(chunks, &item) + return nil + }); err != nil { + return nil, err + } + // chunks group by artifact id + chunksMap := make(map[int64][]*chunkFileItem) + for _, c := range chunks { + chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c) + } + return chunksMap, nil +} + +func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) { + storageDir := fmt.Sprintf("tmpv4%d", runID) + var chunks []*chunkFileItem + chunkMap := map[string]*chunkFileItem{} + dummy := &chunkFileItem{} + for _, name := range blist.Latest { + chunkMap[name] = dummy + } + if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error { + baseName := filepath.Base(fpath) + if !strings.HasPrefix(baseName, "block-") { + return nil + } + // when read chunks from storage, it only contains storage dir and basename, + // no matter the subdirectory setting in storage config + item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID} + var size int64 + var b64chunkName string + if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil { + return fmt.Errorf("parse content range error: %v", err) + } + rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName) + if err != nil { + return fmt.Errorf("failed to parse chunkName: %v", err) + } + chunkName := string(rchunkName) + item.End = item.Start + size - 1 + if _, ok := chunkMap[chunkName]; ok { + chunkMap[chunkName] = &item + } + return nil + }); err != nil { + return nil, err + } + for i, name := range blist.Latest { + chunk, ok := chunkMap[name] + if !ok || chunk.Path == "" { + return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name) + } + chunks = append(chunks, chunk) + if i > 0 { + chunk.Start = chunkMap[blist.Latest[i-1]].End + 1 + chunk.End += chunk.Start + } + } + return chunks, nil +} + +func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error { + // read all db artifacts by name + artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{ + RunID: runID, + ArtifactName: artifactName, + }) + if err != nil { + return err + } + // read all uploading chunks from storage + chunksMap, err := listChunksByRunID(st, runID) + if err != nil { + return err + } + // range db artifacts to merge chunks + for _, art := range artifacts { + chunks, ok := chunksMap[art.ID] + if !ok { + log.Debug("artifact %d chunks not found", art.ID) + continue + } + if err := mergeChunksForArtifact(ctx, chunks, st, art, ""); err != nil { + return err + } + } + return nil +} + +func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact, checksum string) error { + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].Start < chunks[j].Start + }) + allChunks := make([]*chunkFileItem, 0) + startAt := int64(-1) + // check if all chunks are uploaded and in order and clean repeated chunks + for _, c := range chunks { + // startAt is -1 means this is the first chunk + // previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order + // StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing + if c.Start == (startAt + 1) { + allChunks = append(allChunks, c) + startAt = c.End + } + } + // if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely + if startAt+1 != artifact.FileCompressedSize { + log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID) + return nil + } + // use multiReader + readers := make([]io.Reader, 0, len(allChunks)) + closeReaders := func() { + for _, r := range readers { + _ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function + } + readers = nil + } + defer closeReaders() + for _, c := range allChunks { + var readCloser io.ReadCloser + var err error + if readCloser, err = st.Open(c.Path); err != nil { + return fmt.Errorf("open chunk error: %v, %s", err, c.Path) + } + readers = append(readers, readCloser) + } + mergedReader := io.MultiReader(readers...) + shaPrefix := "sha256:" + var hash hash.Hash + if strings.HasPrefix(checksum, shaPrefix) { + hash = sha256.New() + } + if hash != nil { + mergedReader = io.TeeReader(mergedReader, hash) + } + + // if chunk is gzip, use gz as extension + // download-artifact action will use content-encoding header to decide if it should decompress the file + extension := "chunk" + if artifact.ContentEncoding == "gzip" { + extension = "chunk.gz" + } + + // save merged file + storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension) + written, err := st.Save(storagePath, mergedReader, artifact.FileCompressedSize) + if err != nil { + return fmt.Errorf("save merged file error: %v", err) + } + if written != artifact.FileCompressedSize { + return fmt.Errorf("merged file size is not equal to chunk length") + } + + defer func() { + closeReaders() // close before delete + // drop chunks + for _, c := range chunks { + if err := st.Delete(c.Path); err != nil { + log.Warn("Error deleting chunk: %s, %v", c.Path, err) + } + } + }() + + if hash != nil { + rawChecksum := hash.Sum(nil) + actualChecksum := hex.EncodeToString(rawChecksum) + if !strings.HasSuffix(checksum, actualChecksum) { + return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum) + } + } + + // save storage path to artifact + log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath) + // if artifact is already uploaded, delete the old file + if artifact.StoragePath != "" { + if err := st.Delete(artifact.StoragePath); err != nil { + log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err) + } + } + + artifact.StoragePath = storagePath + artifact.Status = int64(actions.ArtifactStatusUploadConfirmed) + if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil { + return fmt.Errorf("update artifact error: %v", err) + } + + return nil +} |