From e68b9d00a6e05b3a941f63ffb696f91e554ac5ec Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 18 Oct 2024 20:33:49 +0200 Subject: Adding upstream version 9.0.3. Signed-off-by: Daniel Baumann --- services/mirror/mirror_push.go | 313 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 313 insertions(+) create mode 100644 services/mirror/mirror_push.go (limited to 'services/mirror/mirror_push.go') diff --git a/services/mirror/mirror_push.go b/services/mirror/mirror_push.go new file mode 100644 index 0000000..3a9644c --- /dev/null +++ b/services/mirror/mirror_push.go @@ -0,0 +1,313 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package mirror + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "regexp" + "strings" + "time" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/gitrepo" + "code.gitea.io/gitea/modules/lfs" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/repository" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" +) + +var stripExitStatus = regexp.MustCompile(`exit status \d+ - `) + +// AddPushMirrorRemote registers the push mirror remote. +var AddPushMirrorRemote = addPushMirrorRemote + +func addPushMirrorRemote(ctx context.Context, m *repo_model.PushMirror, addr string) error { + addRemoteAndConfig := func(addr, path string) error { + cmd := git.NewCommand(ctx, "remote", "add", "--mirror=push").AddDynamicArguments(m.RemoteName, addr) + if strings.Contains(addr, "://") && strings.Contains(addr, "@") { + cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=push %s [repo_path: %s]", m.RemoteName, util.SanitizeCredentialURLs(addr), path)) + } else { + cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=push %s [repo_path: %s]", m.RemoteName, addr, path)) + } + if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: path}); err != nil { + return err + } + if _, _, err := git.NewCommand(ctx, "config", "--add").AddDynamicArguments("remote."+m.RemoteName+".push", "+refs/heads/*:refs/heads/*").RunStdString(&git.RunOpts{Dir: path}); err != nil { + return err + } + if _, _, err := git.NewCommand(ctx, "config", "--add").AddDynamicArguments("remote."+m.RemoteName+".push", "+refs/tags/*:refs/tags/*").RunStdString(&git.RunOpts{Dir: path}); err != nil { + return err + } + return nil + } + + if err := addRemoteAndConfig(addr, m.Repo.RepoPath()); err != nil { + return err + } + + if m.Repo.HasWiki() { + wikiRemoteURL := repository.WikiRemoteURL(ctx, addr) + if len(wikiRemoteURL) > 0 { + if err := addRemoteAndConfig(wikiRemoteURL, m.Repo.WikiPath()); err != nil { + return err + } + } + } + + return nil +} + +// RemovePushMirrorRemote removes the push mirror remote. +func RemovePushMirrorRemote(ctx context.Context, m *repo_model.PushMirror) error { + cmd := git.NewCommand(ctx, "remote", "rm").AddDynamicArguments(m.RemoteName) + _ = m.GetRepository(ctx) + + if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: m.Repo.RepoPath()}); err != nil { + return err + } + + if m.Repo.HasWiki() { + if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: m.Repo.WikiPath()}); err != nil { + // The wiki remote may not exist + log.Warn("Wiki Remote[%d] could not be removed: %v", m.ID, err) + } + } + + return nil +} + +// SyncPushMirror starts the sync of the push mirror and schedules the next run. +func SyncPushMirror(ctx context.Context, mirrorID int64) bool { + log.Trace("SyncPushMirror [mirror: %d]", mirrorID) + defer func() { + err := recover() + if err == nil { + return + } + // There was a panic whilst syncPushMirror... + log.Error("PANIC whilst syncPushMirror[%d] Panic: %v\nStacktrace: %s", mirrorID, err, log.Stack(2)) + }() + + // TODO: Handle "!exist" better + m, exist, err := db.GetByID[repo_model.PushMirror](ctx, mirrorID) + if err != nil || !exist { + log.Error("GetPushMirrorByID [%d]: %v", mirrorID, err) + return false + } + + _ = m.GetRepository(ctx) + + m.LastError = "" + + ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Syncing PushMirror %s/%s to %s", m.Repo.OwnerName, m.Repo.Name, m.RemoteName)) + defer finished() + + log.Trace("SyncPushMirror [mirror: %d][repo: %-v]: Running Sync", m.ID, m.Repo) + err = runPushSync(ctx, m) + if err != nil { + log.Error("SyncPushMirror [mirror: %d][repo: %-v]: %v", m.ID, m.Repo, err) + m.LastError = stripExitStatus.ReplaceAllLiteralString(err.Error(), "") + } + + m.LastUpdateUnix = timeutil.TimeStampNow() + + if err := repo_model.UpdatePushMirror(ctx, m); err != nil { + log.Error("UpdatePushMirror [%d]: %v", m.ID, err) + + return false + } + + log.Trace("SyncPushMirror [mirror: %d][repo: %-v]: Finished", m.ID, m.Repo) + + return err == nil +} + +func runPushSync(ctx context.Context, m *repo_model.PushMirror) error { + timeout := time.Duration(setting.Git.Timeout.Mirror) * time.Second + + performPush := func(repo *repo_model.Repository, isWiki bool) error { + path := repo.RepoPath() + if isWiki { + path = repo.WikiPath() + } + remoteURL, err := git.GetRemoteURL(ctx, path, m.RemoteName) + if err != nil { + log.Error("GetRemoteAddress(%s) Error %v", path, err) + return errors.New("Unexpected error") + } + + if setting.LFS.StartServer { + log.Trace("SyncMirrors [repo: %-v]: syncing LFS objects...", m.Repo) + + var gitRepo *git.Repository + if isWiki { + gitRepo, err = gitrepo.OpenWikiRepository(ctx, repo) + } else { + gitRepo, err = gitrepo.OpenRepository(ctx, repo) + } + if err != nil { + log.Error("OpenRepository: %v", err) + return errors.New("Unexpected error") + } + defer gitRepo.Close() + + endpoint := lfs.DetermineEndpoint(remoteURL.String(), "") + lfsClient := lfs.NewClient(endpoint, nil) + if err := pushAllLFSObjects(ctx, gitRepo, lfsClient); err != nil { + return util.SanitizeErrorCredentialURLs(err) + } + } + + log.Trace("Pushing %s mirror[%d] remote %s", path, m.ID, m.RemoteName) + + // OpenSSH isn't very intuitive when you want to specify a specific keypair. + // Therefore, we need to create a temporary file that stores the private key, so that OpenSSH can use it. + // We delete the the temporary file afterwards. + privateKeyPath := "" + if m.PublicKey != "" { + f, err := os.CreateTemp(os.TempDir(), m.RemoteName) + if err != nil { + log.Error("os.CreateTemp: %v", err) + return errors.New("unexpected error") + } + + defer func() { + f.Close() + if err := os.Remove(f.Name()); err != nil { + log.Error("os.Remove: %v", err) + } + }() + + privateKey, err := m.Privatekey() + if err != nil { + log.Error("Privatekey: %v", err) + return errors.New("unexpected error") + } + + if _, err := f.Write(privateKey); err != nil { + log.Error("f.Write: %v", err) + return errors.New("unexpected error") + } + + privateKeyPath = f.Name() + } + if err := git.Push(ctx, path, git.PushOptions{ + Remote: m.RemoteName, + Force: true, + Mirror: true, + Timeout: timeout, + PrivateKeyPath: privateKeyPath, + }); err != nil { + log.Error("Error pushing %s mirror[%d] remote %s: %v", path, m.ID, m.RemoteName, err) + + return util.SanitizeErrorCredentialURLs(err) + } + + return nil + } + + err := performPush(m.Repo, false) + if err != nil { + return err + } + + if m.Repo.HasWiki() { + _, err := git.GetRemoteAddress(ctx, m.Repo.WikiPath(), m.RemoteName) + if err == nil { + err := performPush(m.Repo, true) + if err != nil { + return err + } + } else { + log.Trace("Skipping wiki: No remote configured") + } + } + + return nil +} + +func pushAllLFSObjects(ctx context.Context, gitRepo *git.Repository, lfsClient lfs.Client) error { + contentStore := lfs.NewContentStore() + + pointerChan := make(chan lfs.PointerBlob) + errChan := make(chan error, 1) + go lfs.SearchPointerBlobs(ctx, gitRepo, pointerChan, errChan) + + uploadObjects := func(pointers []lfs.Pointer) error { + err := lfsClient.Upload(ctx, pointers, func(p lfs.Pointer, objectError error) (io.ReadCloser, error) { + if objectError != nil { + return nil, objectError + } + + content, err := contentStore.Get(p) + if err != nil { + log.Error("Error reading LFS object %v: %v", p, err) + } + return content, err + }) + if err != nil { + select { + case <-ctx.Done(): + return nil + default: + } + } + return err + } + + var batch []lfs.Pointer + for pointerBlob := range pointerChan { + exists, err := contentStore.Exists(pointerBlob.Pointer) + if err != nil { + log.Error("Error checking if LFS object %v exists: %v", pointerBlob.Pointer, err) + return err + } + if !exists { + log.Trace("Skipping missing LFS object %v", pointerBlob.Pointer) + continue + } + + batch = append(batch, pointerBlob.Pointer) + if len(batch) >= lfsClient.BatchSize() { + if err := uploadObjects(batch); err != nil { + return err + } + batch = nil + } + } + if len(batch) > 0 { + if err := uploadObjects(batch); err != nil { + return err + } + } + + err, has := <-errChan + if has { + log.Error("Error enumerating LFS objects for repository: %v", err) + return err + } + + return nil +} + +func syncPushMirrorWithSyncOnCommit(ctx context.Context, repoID int64) { + pushMirrors, err := repo_model.GetPushMirrorsSyncedOnCommit(ctx, repoID) + if err != nil { + log.Error("repo_model.GetPushMirrorsSyncedOnCommit failed: %v", err) + return + } + + for _, mirror := range pushMirrors { + AddPushMirrorToQueue(mirror.ID) + } +} -- cgit v1.2.3