summaryrefslogtreecommitdiffstats
path: root/services/repository/archiver
diff options
context:
space:
mode:
authorDaniel Baumann <daniel@debian.org>2024-10-18 20:33:49 +0200
committerDaniel Baumann <daniel@debian.org>2024-12-12 23:57:56 +0100
commite68b9d00a6e05b3a941f63ffb696f91e554ac5ec (patch)
tree97775d6c13b0f416af55314eb6a89ef792474615 /services/repository/archiver
parentInitial commit. (diff)
downloadforgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.tar.xz
forgejo-e68b9d00a6e05b3a941f63ffb696f91e554ac5ec.zip
Adding upstream version 9.0.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to '')
-rw-r--r--services/repository/archiver/archiver.go377
-rw-r--r--services/repository/archiver/archiver_test.go134
2 files changed, 511 insertions, 0 deletions
diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go
new file mode 100644
index 0000000..c74712b
--- /dev/null
+++ b/services/repository/archiver/archiver.go
@@ -0,0 +1,377 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package archiver
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "time"
+
+ "code.gitea.io/gitea/models/db"
+ repo_model "code.gitea.io/gitea/models/repo"
+ "code.gitea.io/gitea/modules/git"
+ "code.gitea.io/gitea/modules/gitrepo"
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
+ "code.gitea.io/gitea/modules/queue"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/storage"
+)
+
+// ArchiveRequest defines the parameters of an archive request, which notably
+// includes the specific repository being archived as well as the commit, the
+// name by which it was requested, and the kind of archive being requested.
+// This is entirely opaque to external entities, though, and mostly used as a
+// handle elsewhere.
+type ArchiveRequest struct {
+ RepoID int64
+ refName string
+ Type git.ArchiveType
+ CommitID string
+ ReleaseID int64
+}
+
+// ErrUnknownArchiveFormat request archive format is not supported
+type ErrUnknownArchiveFormat struct {
+ RequestFormat string
+}
+
+// Error implements error
+func (err ErrUnknownArchiveFormat) Error() string {
+ return fmt.Sprintf("unknown format: %s", err.RequestFormat)
+}
+
+// Is implements error
+func (ErrUnknownArchiveFormat) Is(err error) bool {
+ _, ok := err.(ErrUnknownArchiveFormat)
+ return ok
+}
+
+// RepoRefNotFoundError is returned when a requested reference (commit, tag) was not found.
+type RepoRefNotFoundError struct {
+ RefName string
+}
+
+// Error implements error.
+func (e RepoRefNotFoundError) Error() string {
+ return fmt.Sprintf("unrecognized repository reference: %s", e.RefName)
+}
+
+func (e RepoRefNotFoundError) Is(err error) bool {
+ _, ok := err.(RepoRefNotFoundError)
+ return ok
+}
+
+// NewRequest creates an archival request, based on the URI. The
+// resulting ArchiveRequest is suitable for being passed to ArchiveRepository()
+// if it's determined that the request still needs to be satisfied.
+func NewRequest(ctx context.Context, repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) {
+ r := &ArchiveRequest{
+ RepoID: repoID,
+ }
+
+ var ext string
+ switch {
+ case strings.HasSuffix(uri, ".zip"):
+ ext = ".zip"
+ r.Type = git.ZIP
+ case strings.HasSuffix(uri, ".tar.gz"):
+ ext = ".tar.gz"
+ r.Type = git.TARGZ
+ case strings.HasSuffix(uri, ".bundle"):
+ ext = ".bundle"
+ r.Type = git.BUNDLE
+ default:
+ return nil, ErrUnknownArchiveFormat{RequestFormat: uri}
+ }
+
+ r.refName = strings.TrimSuffix(uri, ext)
+
+ // Get corresponding commit.
+ commitID, err := repo.ConvertToGitID(r.refName)
+ if err != nil {
+ return nil, RepoRefNotFoundError{RefName: r.refName}
+ }
+
+ r.CommitID = commitID.String()
+
+ release, err := repo_model.GetRelease(ctx, repoID, r.refName)
+ if err != nil {
+ if !repo_model.IsErrReleaseNotExist(err) {
+ return nil, err
+ }
+ }
+ if release != nil {
+ r.ReleaseID = release.ID
+ }
+
+ return r, nil
+}
+
+// GetArchiveName returns the name of the caller, based on the ref used by the
+// caller to create this request.
+func (aReq *ArchiveRequest) GetArchiveName() string {
+ return strings.ReplaceAll(aReq.refName, "/", "-") + "." + aReq.Type.String()
+}
+
+// Await awaits the completion of an ArchiveRequest. If the archive has
+// already been prepared the method returns immediately. Otherwise an archiver
+// process will be started and its completion awaited. On success the returned
+// RepoArchiver may be used to download the archive. Note that even if the
+// context is cancelled/times out a started archiver will still continue to run
+// in the background.
+func (aReq *ArchiveRequest) Await(ctx context.Context) (*repo_model.RepoArchiver, error) {
+ archiver, err := repo_model.GetRepoArchiver(ctx, aReq.RepoID, aReq.Type, aReq.CommitID)
+ if err != nil {
+ return nil, fmt.Errorf("models.GetRepoArchiver: %w", err)
+ }
+
+ if archiver != nil {
+ archiver.ReleaseID = aReq.ReleaseID
+ }
+
+ if archiver != nil && archiver.Status == repo_model.ArchiverReady {
+ // Archive already generated, we're done.
+ return archiver, nil
+ }
+
+ if err := StartArchive(aReq); err != nil {
+ return nil, fmt.Errorf("archiver.StartArchive: %w", err)
+ }
+
+ poll := time.NewTicker(time.Second * 1)
+ defer poll.Stop()
+
+ for {
+ select {
+ case <-graceful.GetManager().HammerContext().Done():
+ // System stopped.
+ return nil, graceful.GetManager().HammerContext().Err()
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-poll.C:
+ archiver, err = repo_model.GetRepoArchiver(ctx, aReq.RepoID, aReq.Type, aReq.CommitID)
+ if err != nil {
+ return nil, fmt.Errorf("repo_model.GetRepoArchiver: %w", err)
+ }
+ if archiver != nil && archiver.Status == repo_model.ArchiverReady {
+ archiver.ReleaseID = aReq.ReleaseID
+ return archiver, nil
+ }
+ }
+ }
+}
+
+func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver, error) {
+ txCtx, committer, err := db.TxContext(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer committer.Close()
+ ctx, _, finished := process.GetManager().AddContext(txCtx, fmt.Sprintf("ArchiveRequest[%d]: %s", r.RepoID, r.GetArchiveName()))
+ defer finished()
+
+ archiver, err := repo_model.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
+ if err != nil {
+ return nil, err
+ }
+
+ if archiver != nil {
+ // FIXME: If another process are generating it, we think it's not ready and just return
+ // Or we should wait until the archive generated.
+ if archiver.Status == repo_model.ArchiverGenerating {
+ return nil, nil
+ }
+ } else {
+ archiver = &repo_model.RepoArchiver{
+ RepoID: r.RepoID,
+ Type: r.Type,
+ CommitID: r.CommitID,
+ Status: repo_model.ArchiverGenerating,
+ }
+ if err := db.Insert(ctx, archiver); err != nil {
+ return nil, err
+ }
+ }
+
+ rPath := archiver.RelativePath()
+ _, err = storage.RepoArchives.Stat(rPath)
+ if err == nil {
+ if archiver.Status == repo_model.ArchiverGenerating {
+ archiver.Status = repo_model.ArchiverReady
+ if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
+ return nil, err
+ }
+ }
+ return archiver, committer.Commit()
+ }
+
+ if !errors.Is(err, os.ErrNotExist) {
+ return nil, fmt.Errorf("unable to stat archive: %w", err)
+ }
+
+ rd, w := io.Pipe()
+ defer func() {
+ w.Close()
+ rd.Close()
+ }()
+ done := make(chan error, 1) // Ensure that there is some capacity which will ensure that the goroutine below can always finish
+ repo, err := repo_model.GetRepositoryByID(ctx, archiver.RepoID)
+ if err != nil {
+ return nil, fmt.Errorf("archiver.LoadRepo failed: %w", err)
+ }
+
+ gitRepo, err := gitrepo.OpenRepository(ctx, repo)
+ if err != nil {
+ return nil, err
+ }
+ defer gitRepo.Close()
+
+ go func(done chan error, w *io.PipeWriter, archiver *repo_model.RepoArchiver, gitRepo *git.Repository) {
+ defer func() {
+ if r := recover(); r != nil {
+ done <- fmt.Errorf("%v", r)
+ }
+ }()
+
+ if archiver.Type == git.BUNDLE {
+ err = gitRepo.CreateBundle(
+ ctx,
+ archiver.CommitID,
+ w,
+ )
+ } else {
+ err = gitRepo.CreateArchive(
+ ctx,
+ archiver.Type,
+ w,
+ setting.Repository.PrefixArchiveFiles,
+ archiver.CommitID,
+ )
+ }
+ _ = w.CloseWithError(err)
+ done <- err
+ }(done, w, archiver, gitRepo)
+
+ // TODO: add lfs data to zip
+ // TODO: add submodule data to zip
+
+ if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil {
+ return nil, fmt.Errorf("unable to write archive: %w", err)
+ }
+
+ err = <-done
+ if err != nil {
+ return nil, err
+ }
+
+ if archiver.Status == repo_model.ArchiverGenerating {
+ archiver.Status = repo_model.ArchiverReady
+ if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
+ return nil, err
+ }
+ }
+
+ return archiver, committer.Commit()
+}
+
+// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
+// will occur in a separate goroutine, as this phase may take a while to
+// complete. If the archive already exists, ArchiveRepository will not do
+// anything. In all cases, the caller should be examining the *ArchiveRequest
+// being returned for completion, as it may be different than the one they passed
+// in.
+func ArchiveRepository(ctx context.Context, request *ArchiveRequest) (*repo_model.RepoArchiver, error) {
+ return doArchive(ctx, request)
+}
+
+var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
+
+// Init initializes archiver
+func Init(ctx context.Context) error {
+ handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
+ for _, archiveReq := range items {
+ log.Trace("ArchiverData Process: %#v", archiveReq)
+ if _, err := doArchive(ctx, archiveReq); err != nil {
+ log.Error("Archive %v failed: %v", archiveReq, err)
+ }
+ }
+ return nil
+ }
+
+ archiverQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo-archive", handler)
+ if archiverQueue == nil {
+ return errors.New("unable to create repo-archive queue")
+ }
+ go graceful.GetManager().RunWithCancel(archiverQueue)
+
+ return nil
+}
+
+// StartArchive push the archive request to the queue
+func StartArchive(request *ArchiveRequest) error {
+ has, err := archiverQueue.Has(request)
+ if err != nil {
+ return err
+ }
+ if has {
+ return nil
+ }
+ return archiverQueue.Push(request)
+}
+
+func deleteOldRepoArchiver(ctx context.Context, archiver *repo_model.RepoArchiver) error {
+ if _, err := db.DeleteByID[repo_model.RepoArchiver](ctx, archiver.ID); err != nil {
+ return err
+ }
+ p := archiver.RelativePath()
+ if err := storage.RepoArchives.Delete(p); err != nil {
+ log.Error("delete repo archive file failed: %v", err)
+ }
+ return nil
+}
+
+// DeleteOldRepositoryArchives deletes old repository archives.
+func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error {
+ log.Trace("Doing: ArchiveCleanup")
+
+ for {
+ archivers, err := db.Find[repo_model.RepoArchiver](ctx, repo_model.FindRepoArchiversOption{
+ ListOptions: db.ListOptions{
+ PageSize: 100,
+ Page: 1,
+ },
+ OlderThan: olderThan,
+ })
+ if err != nil {
+ log.Trace("Error: ArchiveClean: %v", err)
+ return err
+ }
+
+ for _, archiver := range archivers {
+ if err := deleteOldRepoArchiver(ctx, archiver); err != nil {
+ return err
+ }
+ }
+ if len(archivers) < 100 {
+ break
+ }
+ }
+
+ log.Trace("Finished: ArchiveCleanup")
+ return nil
+}
+
+// DeleteRepositoryArchives deletes all repositories' archives.
+func DeleteRepositoryArchives(ctx context.Context) error {
+ if err := repo_model.DeleteAllRepoArchives(ctx); err != nil {
+ return err
+ }
+ return storage.Clean(storage.RepoArchives)
+}
diff --git a/services/repository/archiver/archiver_test.go b/services/repository/archiver/archiver_test.go
new file mode 100644
index 0000000..9f822a3
--- /dev/null
+++ b/services/repository/archiver/archiver_test.go
@@ -0,0 +1,134 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package archiver
+
+import (
+ "testing"
+ "time"
+
+ "code.gitea.io/gitea/models/db"
+ "code.gitea.io/gitea/models/unittest"
+ "code.gitea.io/gitea/services/contexttest"
+
+ _ "code.gitea.io/gitea/models/actions"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestMain(m *testing.M) {
+ unittest.MainTest(m)
+}
+
+func TestArchive_Basic(t *testing.T) {
+ require.NoError(t, unittest.PrepareTestDatabase())
+
+ ctx, _ := contexttest.MockContext(t, "user27/repo49")
+ firstCommit, secondCommit := "51f84af23134", "aacbdfe9e1c4"
+
+ contexttest.LoadRepo(t, ctx, 49)
+ contexttest.LoadGitRepo(t, ctx)
+ defer ctx.Repo.GitRepo.Close()
+
+ bogusReq, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip")
+ require.NoError(t, err)
+ assert.NotNil(t, bogusReq)
+ assert.EqualValues(t, firstCommit+".zip", bogusReq.GetArchiveName())
+
+ // Check a series of bogus requests.
+ // Step 1, valid commit with a bad extension.
+ bogusReq, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".dilbert")
+ require.Error(t, err)
+ assert.Nil(t, bogusReq)
+
+ // Step 2, missing commit.
+ bogusReq, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "dbffff.zip")
+ require.Error(t, err)
+ assert.Nil(t, bogusReq)
+
+ // Step 3, doesn't look like branch/tag/commit.
+ bogusReq, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "db.zip")
+ require.Error(t, err)
+ assert.Nil(t, bogusReq)
+
+ bogusReq, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "master.zip")
+ require.NoError(t, err)
+ assert.NotNil(t, bogusReq)
+ assert.EqualValues(t, "master.zip", bogusReq.GetArchiveName())
+
+ bogusReq, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "test/archive.zip")
+ require.NoError(t, err)
+ assert.NotNil(t, bogusReq)
+ assert.EqualValues(t, "test-archive.zip", bogusReq.GetArchiveName())
+
+ // Now two valid requests, firstCommit with valid extensions.
+ zipReq, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip")
+ require.NoError(t, err)
+ assert.NotNil(t, zipReq)
+
+ tgzReq, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".tar.gz")
+ require.NoError(t, err)
+ assert.NotNil(t, tgzReq)
+
+ secondReq, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".zip")
+ require.NoError(t, err)
+ assert.NotNil(t, secondReq)
+
+ inFlight := make([]*ArchiveRequest, 3)
+ inFlight[0] = zipReq
+ inFlight[1] = tgzReq
+ inFlight[2] = secondReq
+
+ ArchiveRepository(db.DefaultContext, zipReq)
+ ArchiveRepository(db.DefaultContext, tgzReq)
+ ArchiveRepository(db.DefaultContext, secondReq)
+
+ // Make sure sending an unprocessed request through doesn't affect the queue
+ // count.
+ ArchiveRepository(db.DefaultContext, zipReq)
+
+ // Sleep two seconds to make sure the queue doesn't change.
+ time.Sleep(2 * time.Second)
+
+ zipReq2, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip")
+ require.NoError(t, err)
+ // This zipReq should match what's sitting in the queue, as we haven't
+ // let it release yet. From the consumer's point of view, this looks like
+ // a long-running archive task.
+ assert.Equal(t, zipReq, zipReq2)
+
+ // We still have the other three stalled at completion, waiting to remove
+ // from archiveInProgress. Try to submit this new one before its
+ // predecessor has cleared out of the queue.
+ ArchiveRepository(db.DefaultContext, zipReq2)
+
+ // Now we'll submit a request and TimedWaitForCompletion twice, before and
+ // after we release it. We should trigger both the timeout and non-timeout
+ // cases.
+ timedReq, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".tar.gz")
+ require.NoError(t, err)
+ assert.NotNil(t, timedReq)
+ ArchiveRepository(db.DefaultContext, timedReq)
+
+ zipReq2, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip")
+ require.NoError(t, err)
+ // Now, we're guaranteed to have released the original zipReq from the queue.
+ // Ensure that we don't get handed back the released entry somehow, but they
+ // should remain functionally equivalent in all fields. The exception here
+ // is zipReq.cchan, which will be non-nil because it's a completed request.
+ // It's fine to go ahead and set it to nil now.
+
+ assert.Equal(t, zipReq, zipReq2)
+ assert.NotSame(t, zipReq, zipReq2)
+
+ // Same commit, different compression formats should have different names.
+ // Ideally, the extension would match what we originally requested.
+ assert.NotEqual(t, zipReq.GetArchiveName(), tgzReq.GetArchiveName())
+ assert.NotEqual(t, zipReq.GetArchiveName(), secondReq.GetArchiveName())
+}
+
+func TestErrUnknownArchiveFormat(t *testing.T) {
+ err := ErrUnknownArchiveFormat{RequestFormat: "master"}
+ assert.ErrorIs(t, err, ErrUnknownArchiveFormat{})
+}