summaryrefslogtreecommitdiffstats
path: root/services/mirror
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--services/mirror/mirror.go146
-rw-r--r--services/mirror/mirror_pull.go628
-rw-r--r--services/mirror/mirror_push.go313
-rw-r--r--services/mirror/mirror_test.go46
-rw-r--r--services/mirror/notifier.go31
-rw-r--r--services/mirror/queue.go70
6 files changed, 1234 insertions, 0 deletions
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
new file mode 100644
index 0000000..bc2d671
--- /dev/null
+++ b/services/mirror/mirror.go
@@ -0,0 +1,146 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package mirror
+
+import (
+ "context"
+ "fmt"
+
+ quota_model "code.gitea.io/gitea/models/quota"
+ repo_model "code.gitea.io/gitea/models/repo"
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
+ "code.gitea.io/gitea/modules/setting"
+)
+
+// doMirrorSync causes this request to mirror itself
+func doMirrorSync(ctx context.Context, req *SyncRequest) {
+ if req.ReferenceID == 0 {
+ log.Warn("Skipping mirror sync request, no mirror ID was specified")
+ return
+ }
+ switch req.Type {
+ case PushMirrorType:
+ _ = SyncPushMirror(ctx, req.ReferenceID)
+ case PullMirrorType:
+ _ = SyncPullMirror(ctx, req.ReferenceID)
+ default:
+ log.Error("Unknown Request type in queue: %v for MirrorID[%d]", req.Type, req.ReferenceID)
+ }
+}
+
+var errLimit = fmt.Errorf("reached limit")
+
+// Update checks and updates mirror repositories.
+func Update(ctx context.Context, pullLimit, pushLimit int) error {
+ if !setting.Mirror.Enabled {
+ log.Warn("Mirror feature disabled, but cron job enabled: skip update")
+ return nil
+ }
+ log.Trace("Doing: Update")
+
+ handler := func(bean any) error {
+ var repo *repo_model.Repository
+ var mirrorType SyncType
+ var referenceID int64
+
+ if m, ok := bean.(*repo_model.Mirror); ok {
+ if m.GetRepository(ctx) == nil {
+ log.Error("Disconnected mirror found: %d", m.ID)
+ return nil
+ }
+ repo = m.Repo
+ mirrorType = PullMirrorType
+ referenceID = m.RepoID
+ } else if m, ok := bean.(*repo_model.PushMirror); ok {
+ if m.GetRepository(ctx) == nil {
+ log.Error("Disconnected push-mirror found: %d", m.ID)
+ return nil
+ }
+ repo = m.Repo
+ mirrorType = PushMirrorType
+ referenceID = m.ID
+ } else {
+ log.Error("Unknown bean: %v", bean)
+ return nil
+ }
+
+ // Check we've not been cancelled
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("aborted")
+ default:
+ }
+
+ // Check if the repo's owner is over quota, for pull mirrors
+ if mirrorType == PullMirrorType {
+ ok, err := quota_model.EvaluateForUser(ctx, repo.OwnerID, quota_model.LimitSubjectSizeReposAll)
+ if err != nil {
+ log.Error("quota_model.EvaluateForUser: %v", err)
+ return err
+ }
+ if !ok {
+ log.Trace("Owner quota exceeded for %-v, not syncing", repo)
+ return nil
+ }
+ }
+
+ // Push to the Queue
+ if err := PushToQueue(mirrorType, referenceID); err != nil {
+ if err == queue.ErrAlreadyInQueue {
+ if mirrorType == PushMirrorType {
+ log.Trace("PushMirrors for %-v already queued for sync", repo)
+ } else {
+ log.Trace("PullMirrors for %-v already queued for sync", repo)
+ }
+ return nil
+ }
+ return err
+ }
+ return nil
+ }
+
+ pullMirrorsRequested := 0
+ if pullLimit != 0 {
+ if err := repo_model.MirrorsIterate(ctx, pullLimit, func(_ int, bean any) error {
+ if err := handler(bean); err != nil {
+ return err
+ }
+ pullMirrorsRequested++
+ return nil
+ }); err != nil && err != errLimit {
+ log.Error("MirrorsIterate: %v", err)
+ return err
+ }
+ }
+
+ pushMirrorsRequested := 0
+ if pushLimit != 0 {
+ if err := repo_model.PushMirrorsIterate(ctx, pushLimit, func(idx int, bean any) error {
+ if err := handler(bean); err != nil {
+ return err
+ }
+ pushMirrorsRequested++
+ return nil
+ }); err != nil && err != errLimit {
+ log.Error("PushMirrorsIterate: %v", err)
+ return err
+ }
+ }
+ log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
+ return nil
+}
+
+func queueHandler(items ...*SyncRequest) []*SyncRequest {
+ for _, req := range items {
+ doMirrorSync(graceful.GetManager().ShutdownContext(), req)
+ }
+ return nil
+}
+
+// InitSyncMirrors initializes a go routine to sync the mirrors
+func InitSyncMirrors() {
+ StartSyncMirrors(queueHandler)
+}
diff --git a/services/mirror/mirror_pull.go b/services/mirror/mirror_pull.go
new file mode 100644
index 0000000..9f7ffb2
--- /dev/null
+++ b/services/mirror/mirror_pull.go
@@ -0,0 +1,628 @@
+// Copyright 2021 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package mirror
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ repo_model "code.gitea.io/gitea/models/repo"
+ system_model "code.gitea.io/gitea/models/system"
+ "code.gitea.io/gitea/modules/cache"
+ "code.gitea.io/gitea/modules/git"
+ giturl "code.gitea.io/gitea/modules/git/url"
+ "code.gitea.io/gitea/modules/gitrepo"
+ "code.gitea.io/gitea/modules/lfs"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
+ "code.gitea.io/gitea/modules/proxy"
+ repo_module "code.gitea.io/gitea/modules/repository"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/timeutil"
+ "code.gitea.io/gitea/modules/util"
+ notify_service "code.gitea.io/gitea/services/notify"
+)
+
+// gitShortEmptySha Git short empty SHA
+const gitShortEmptySha = "0000000"
+
+// UpdateAddress writes new address to Git repository and database
+func UpdateAddress(ctx context.Context, m *repo_model.Mirror, addr string) error {
+ u, err := giturl.Parse(addr)
+ if err != nil {
+ return fmt.Errorf("invalid addr: %v", err)
+ }
+
+ remoteName := m.GetRemoteName()
+ repoPath := m.GetRepository(ctx).RepoPath()
+ // Remove old remote
+ _, _, err = git.NewCommand(ctx, "remote", "rm").AddDynamicArguments(remoteName).RunStdString(&git.RunOpts{Dir: repoPath})
+ if err != nil && !strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
+ return err
+ }
+
+ cmd := git.NewCommand(ctx, "remote", "add").AddDynamicArguments(remoteName).AddArguments("--mirror=fetch").AddDynamicArguments(addr)
+ if strings.Contains(addr, "://") && strings.Contains(addr, "@") {
+ cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=fetch %s [repo_path: %s]", remoteName, util.SanitizeCredentialURLs(addr), repoPath))
+ } else {
+ cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=fetch %s [repo_path: %s]", remoteName, addr, repoPath))
+ }
+ _, _, err = cmd.RunStdString(&git.RunOpts{Dir: repoPath})
+ if err != nil && !strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
+ return err
+ }
+
+ if m.Repo.HasWiki() {
+ wikiPath := m.Repo.WikiPath()
+ wikiRemotePath := repo_module.WikiRemoteURL(ctx, addr)
+ // Remove old remote of wiki
+ _, _, err = git.NewCommand(ctx, "remote", "rm").AddDynamicArguments(remoteName).RunStdString(&git.RunOpts{Dir: wikiPath})
+ if err != nil && !strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
+ return err
+ }
+
+ cmd = git.NewCommand(ctx, "remote", "add").AddDynamicArguments(remoteName).AddArguments("--mirror=fetch").AddDynamicArguments(wikiRemotePath)
+ if strings.Contains(wikiRemotePath, "://") && strings.Contains(wikiRemotePath, "@") {
+ cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=fetch %s [repo_path: %s]", remoteName, util.SanitizeCredentialURLs(wikiRemotePath), wikiPath))
+ } else {
+ cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=fetch %s [repo_path: %s]", remoteName, wikiRemotePath, wikiPath))
+ }
+ _, _, err = cmd.RunStdString(&git.RunOpts{Dir: wikiPath})
+ if err != nil && !strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
+ return err
+ }
+ }
+
+ // erase authentication before storing in database
+ u.User = nil
+ m.Repo.OriginalURL = u.String()
+ return repo_model.UpdateRepositoryCols(ctx, m.Repo, "original_url")
+}
+
+// mirrorSyncResult contains information of a updated reference.
+// If the oldCommitID is "0000000", it means a new reference, the value of newCommitID is empty.
+// If the newCommitID is "0000000", it means the reference is deleted, the value of oldCommitID is empty.
+type mirrorSyncResult struct {
+ refName git.RefName
+ oldCommitID string
+ newCommitID string
+}
+
+// parseRemoteUpdateOutput detects create, update and delete operations of references from upstream.
+// possible output example:
+/*
+// * [new tag] v0.1.8 -> v0.1.8
+// * [new branch] master -> origin/master
+// - [deleted] (none) -> origin/test // delete a branch
+// - [deleted] (none) -> 1 // delete a tag
+// 957a993..a87ba5f test -> origin/test
+// + f895a1e...957a993 test -> origin/test (forced update)
+*/
+// TODO: return whether it's a force update
+func parseRemoteUpdateOutput(output, remoteName string) []*mirrorSyncResult {
+ results := make([]*mirrorSyncResult, 0, 3)
+ lines := strings.Split(output, "\n")
+ for i := range lines {
+ // Make sure reference name is presented before continue
+ idx := strings.Index(lines[i], "-> ")
+ if idx == -1 {
+ continue
+ }
+
+ refName := strings.TrimSpace(lines[i][idx+3:])
+
+ switch {
+ case strings.HasPrefix(lines[i], " * [new tag]"): // new tag
+ results = append(results, &mirrorSyncResult{
+ refName: git.RefNameFromTag(refName),
+ oldCommitID: gitShortEmptySha,
+ })
+ case strings.HasPrefix(lines[i], " * [new branch]"): // new branch
+ refName = strings.TrimPrefix(refName, remoteName+"/")
+ results = append(results, &mirrorSyncResult{
+ refName: git.RefNameFromBranch(refName),
+ oldCommitID: gitShortEmptySha,
+ })
+ case strings.HasPrefix(lines[i], " - "): // Delete reference
+ isTag := !strings.HasPrefix(refName, remoteName+"/")
+ var refFullName git.RefName
+ if isTag {
+ refFullName = git.RefNameFromTag(refName)
+ } else {
+ refFullName = git.RefNameFromBranch(strings.TrimPrefix(refName, remoteName+"/"))
+ }
+ results = append(results, &mirrorSyncResult{
+ refName: refFullName,
+ newCommitID: gitShortEmptySha,
+ })
+ case strings.HasPrefix(lines[i], " + "): // Force update
+ if idx := strings.Index(refName, " "); idx > -1 {
+ refName = refName[:idx]
+ }
+ delimIdx := strings.Index(lines[i][3:], " ")
+ if delimIdx == -1 {
+ log.Error("SHA delimiter not found: %q", lines[i])
+ continue
+ }
+ shas := strings.Split(lines[i][3:delimIdx+3], "...")
+ if len(shas) != 2 {
+ log.Error("Expect two SHAs but not what found: %q", lines[i])
+ continue
+ }
+ results = append(results, &mirrorSyncResult{
+ refName: git.RefNameFromBranch(strings.TrimPrefix(refName, remoteName+"/")),
+ oldCommitID: shas[0],
+ newCommitID: shas[1],
+ })
+ case strings.HasPrefix(lines[i], " "): // New commits of a reference
+ delimIdx := strings.Index(lines[i][3:], " ")
+ if delimIdx == -1 {
+ log.Error("SHA delimiter not found: %q", lines[i])
+ continue
+ }
+ shas := strings.Split(lines[i][3:delimIdx+3], "..")
+ if len(shas) != 2 {
+ log.Error("Expect two SHAs but not what found: %q", lines[i])
+ continue
+ }
+ results = append(results, &mirrorSyncResult{
+ refName: git.RefNameFromBranch(strings.TrimPrefix(refName, remoteName+"/")),
+ oldCommitID: shas[0],
+ newCommitID: shas[1],
+ })
+
+ default:
+ log.Warn("parseRemoteUpdateOutput: unexpected update line %q", lines[i])
+ }
+ }
+ return results
+}
+
+func pruneBrokenReferences(ctx context.Context,
+ m *repo_model.Mirror,
+ repoPath string,
+ timeout time.Duration,
+ stdoutBuilder, stderrBuilder *strings.Builder,
+ isWiki bool,
+) error {
+ wiki := ""
+ if isWiki {
+ wiki = "Wiki "
+ }
+
+ stderrBuilder.Reset()
+ stdoutBuilder.Reset()
+ pruneErr := git.NewCommand(ctx, "remote", "prune").AddDynamicArguments(m.GetRemoteName()).
+ SetDescription(fmt.Sprintf("Mirror.runSync %ssPrune references: %s ", wiki, m.Repo.FullName())).
+ Run(&git.RunOpts{
+ Timeout: timeout,
+ Dir: repoPath,
+ Stdout: stdoutBuilder,
+ Stderr: stderrBuilder,
+ })
+ if pruneErr != nil {
+ stdout := stdoutBuilder.String()
+ stderr := stderrBuilder.String()
+
+ // sanitize the output, since it may contain the remote address, which may
+ // contain a password
+ stderrMessage := util.SanitizeCredentialURLs(stderr)
+ stdoutMessage := util.SanitizeCredentialURLs(stdout)
+
+ log.Error("Failed to prune mirror repository %s%-v references:\nStdout: %s\nStderr: %s\nErr: %v", wiki, m.Repo, stdoutMessage, stderrMessage, pruneErr)
+ desc := fmt.Sprintf("Failed to prune mirror repository %s'%s' references: %s", wiki, repoPath, stderrMessage)
+ if err := system_model.CreateRepositoryNotice(desc); err != nil {
+ log.Error("CreateRepositoryNotice: %v", err)
+ }
+ // this if will only be reached on a successful prune so try to get the mirror again
+ }
+ return pruneErr
+}
+
+// runSync returns true if sync finished without error.
+func runSync(ctx context.Context, m *repo_model.Mirror) ([]*mirrorSyncResult, bool) {
+ repoPath := m.Repo.RepoPath()
+ wikiPath := m.Repo.WikiPath()
+ timeout := time.Duration(setting.Git.Timeout.Mirror) * time.Second
+
+ log.Trace("SyncMirrors [repo: %-v]: running git remote update...", m.Repo)
+
+ // use fetch but not remote update because git fetch support --tags but remote update doesn't
+ cmd := git.NewCommand(ctx, "fetch")
+ if m.EnablePrune {
+ cmd.AddArguments("--prune")
+ }
+ cmd.AddArguments("--tags").AddDynamicArguments(m.GetRemoteName())
+
+ remoteURL, remoteErr := git.GetRemoteURL(ctx, repoPath, m.GetRemoteName())
+ if remoteErr != nil {
+ log.Error("SyncMirrors [repo: %-v]: GetRemoteAddress Error %v", m.Repo, remoteErr)
+ return nil, false
+ }
+
+ envs := proxy.EnvWithProxy(remoteURL.URL)
+
+ stdoutBuilder := strings.Builder{}
+ stderrBuilder := strings.Builder{}
+ if err := cmd.
+ SetDescription(fmt.Sprintf("Mirror.runSync: %s", m.Repo.FullName())).
+ Run(&git.RunOpts{
+ Timeout: timeout,
+ Dir: repoPath,
+ Env: envs,
+ Stdout: &stdoutBuilder,
+ Stderr: &stderrBuilder,
+ }); err != nil {
+ stdout := stdoutBuilder.String()
+ stderr := stderrBuilder.String()
+
+ // sanitize the output, since it may contain the remote address, which may contain a password
+ stderrMessage := util.SanitizeCredentialURLs(stderr)
+ stdoutMessage := util.SanitizeCredentialURLs(stdout)
+
+ // Now check if the error is a resolve reference due to broken reference
+ if strings.Contains(stderr, "unable to resolve reference") && strings.Contains(stderr, "reference broken") {
+ log.Warn("SyncMirrors [repo: %-v]: failed to update mirror repository due to broken references:\nStdout: %s\nStderr: %s\nErr: %v\nAttempting Prune", m.Repo, stdoutMessage, stderrMessage, err)
+ err = nil
+
+ // Attempt prune
+ pruneErr := pruneBrokenReferences(ctx, m, repoPath, timeout, &stdoutBuilder, &stderrBuilder, false)
+ if pruneErr == nil {
+ // Successful prune - reattempt mirror
+ stderrBuilder.Reset()
+ stdoutBuilder.Reset()
+ if err = cmd.
+ SetDescription(fmt.Sprintf("Mirror.runSync: %s", m.Repo.FullName())).
+ Run(&git.RunOpts{
+ Timeout: timeout,
+ Dir: repoPath,
+ Stdout: &stdoutBuilder,
+ Stderr: &stderrBuilder,
+ }); err != nil {
+ stdout := stdoutBuilder.String()
+ stderr := stderrBuilder.String()
+
+ // sanitize the output, since it may contain the remote address, which may
+ // contain a password
+ stderrMessage = util.SanitizeCredentialURLs(stderr)
+ stdoutMessage = util.SanitizeCredentialURLs(stdout)
+ }
+ }
+ }
+
+ // If there is still an error (or there always was an error)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to update mirror repository:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdoutMessage, stderrMessage, err)
+ desc := fmt.Sprintf("Failed to update mirror repository '%s': %s", repoPath, stderrMessage)
+ if err = system_model.CreateRepositoryNotice(desc); err != nil {
+ log.Error("CreateRepositoryNotice: %v", err)
+ }
+ return nil, false
+ }
+ }
+ output := stderrBuilder.String()
+
+ if err := git.WriteCommitGraph(ctx, repoPath); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: %v", m.Repo, err)
+ }
+
+ gitRepo, err := gitrepo.OpenRepository(ctx, m.Repo)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to OpenRepository: %v", m.Repo, err)
+ return nil, false
+ }
+
+ log.Trace("SyncMirrors [repo: %-v]: syncing branches...", m.Repo)
+ if _, err = repo_module.SyncRepoBranchesWithRepo(ctx, m.Repo, gitRepo, 0); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to synchronize branches: %v", m.Repo, err)
+ }
+
+ log.Trace("SyncMirrors [repo: %-v]: syncing releases with tags...", m.Repo)
+ if err = repo_module.SyncReleasesWithTags(ctx, m.Repo, gitRepo); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to synchronize tags to releases: %v", m.Repo, err)
+ }
+
+ if m.LFS && setting.LFS.StartServer {
+ log.Trace("SyncMirrors [repo: %-v]: syncing LFS objects...", m.Repo)
+ endpoint := lfs.DetermineEndpoint(remoteURL.String(), m.LFSEndpoint)
+ lfsClient := lfs.NewClient(endpoint, nil)
+ if err = repo_module.StoreMissingLfsObjectsInRepository(ctx, m.Repo, gitRepo, lfsClient); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to synchronize LFS objects for repository: %v", m.Repo, err)
+ }
+ }
+ gitRepo.Close()
+
+ log.Trace("SyncMirrors [repo: %-v]: updating size of repository", m.Repo)
+ if err := repo_module.UpdateRepoSize(ctx, m.Repo); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to update size for mirror repository: %v", m.Repo, err)
+ }
+
+ if m.Repo.HasWiki() {
+ log.Trace("SyncMirrors [repo: %-v Wiki]: running git remote update...", m.Repo)
+ stderrBuilder.Reset()
+ stdoutBuilder.Reset()
+ if err := git.NewCommand(ctx, "remote", "update", "--prune").AddDynamicArguments(m.GetRemoteName()).
+ SetDescription(fmt.Sprintf("Mirror.runSync Wiki: %s ", m.Repo.FullName())).
+ Run(&git.RunOpts{
+ Timeout: timeout,
+ Dir: wikiPath,
+ Stdout: &stdoutBuilder,
+ Stderr: &stderrBuilder,
+ }); err != nil {
+ stdout := stdoutBuilder.String()
+ stderr := stderrBuilder.String()
+
+ // sanitize the output, since it may contain the remote address, which may contain a password
+ stderrMessage := util.SanitizeCredentialURLs(stderr)
+ stdoutMessage := util.SanitizeCredentialURLs(stdout)
+
+ // Now check if the error is a resolve reference due to broken reference
+ if strings.Contains(stderrMessage, "unable to resolve reference") && strings.Contains(stderrMessage, "reference broken") {
+ log.Warn("SyncMirrors [repo: %-v Wiki]: failed to update mirror wiki repository due to broken references:\nStdout: %s\nStderr: %s\nErr: %v\nAttempting Prune", m.Repo, stdoutMessage, stderrMessage, err)
+ err = nil
+
+ // Attempt prune
+ pruneErr := pruneBrokenReferences(ctx, m, repoPath, timeout, &stdoutBuilder, &stderrBuilder, true)
+ if pruneErr == nil {
+ // Successful prune - reattempt mirror
+ stderrBuilder.Reset()
+ stdoutBuilder.Reset()
+
+ if err = git.NewCommand(ctx, "remote", "update", "--prune").AddDynamicArguments(m.GetRemoteName()).
+ SetDescription(fmt.Sprintf("Mirror.runSync Wiki: %s ", m.Repo.FullName())).
+ Run(&git.RunOpts{
+ Timeout: timeout,
+ Dir: wikiPath,
+ Stdout: &stdoutBuilder,
+ Stderr: &stderrBuilder,
+ }); err != nil {
+ stdout := stdoutBuilder.String()
+ stderr := stderrBuilder.String()
+ stderrMessage = util.SanitizeCredentialURLs(stderr)
+ stdoutMessage = util.SanitizeCredentialURLs(stdout)
+ }
+ }
+ }
+
+ // If there is still an error (or there always was an error)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v Wiki]: failed to update mirror repository wiki:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdoutMessage, stderrMessage, err)
+ desc := fmt.Sprintf("Failed to update mirror repository wiki '%s': %s", wikiPath, stderrMessage)
+ if err = system_model.CreateRepositoryNotice(desc); err != nil {
+ log.Error("CreateRepositoryNotice: %v", err)
+ }
+ return nil, false
+ }
+
+ if err := git.WriteCommitGraph(ctx, wikiPath); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: %v", m.Repo, err)
+ }
+ }
+ log.Trace("SyncMirrors [repo: %-v Wiki]: git remote update complete", m.Repo)
+ }
+
+ log.Trace("SyncMirrors [repo: %-v]: invalidating mirror branch caches...", m.Repo)
+ branches, _, err := gitrepo.GetBranchesByPath(ctx, m.Repo, 0, 0)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to GetBranches: %v", m.Repo, err)
+ return nil, false
+ }
+
+ for _, branch := range branches {
+ cache.Remove(m.Repo.GetCommitsCountCacheKey(branch.Name, true))
+ }
+
+ m.UpdatedUnix = timeutil.TimeStampNow()
+ return parseRemoteUpdateOutput(output, m.GetRemoteName()), true
+}
+
+// SyncPullMirror starts the sync of the pull mirror and schedules the next run.
+func SyncPullMirror(ctx context.Context, repoID int64) bool {
+ log.Trace("SyncMirrors [repo_id: %v]", repoID)
+ defer func() {
+ err := recover()
+ if err == nil {
+ return
+ }
+ // There was a panic whilst syncMirrors...
+ log.Error("PANIC whilst SyncMirrors[repo_id: %d] Panic: %v\nStacktrace: %s", repoID, err, log.Stack(2))
+ }()
+
+ m, err := repo_model.GetMirrorByRepoID(ctx, repoID)
+ if err != nil {
+ log.Error("SyncMirrors [repo_id: %v]: unable to GetMirrorByRepoID: %v", repoID, err)
+ return false
+ }
+ _ = m.GetRepository(ctx) // force load repository of mirror
+
+ ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Syncing Mirror %s/%s", m.Repo.OwnerName, m.Repo.Name))
+ defer finished()
+
+ log.Trace("SyncMirrors [repo: %-v]: Running Sync", m.Repo)
+ results, ok := runSync(ctx, m)
+ if !ok {
+ if err = repo_model.TouchMirror(ctx, m); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to TouchMirror: %v", m.Repo, err)
+ }
+ return false
+ }
+
+ log.Trace("SyncMirrors [repo: %-v]: Scheduling next update", m.Repo)
+ m.ScheduleNextUpdate()
+ if err = repo_model.UpdateMirror(ctx, m); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: failed to UpdateMirror with next update date: %v", m.Repo, err)
+ return false
+ }
+
+ gitRepo, err := gitrepo.OpenRepository(ctx, m.Repo)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to OpenRepository: %v", m.Repo, err)
+ return false
+ }
+ defer gitRepo.Close()
+
+ log.Trace("SyncMirrors [repo: %-v]: %d branches updated", m.Repo, len(results))
+ if len(results) > 0 {
+ if ok := checkAndUpdateEmptyRepository(ctx, m, results); !ok {
+ log.Error("SyncMirrors [repo: %-v]: checkAndUpdateEmptyRepository: %v", m.Repo, err)
+ return false
+ }
+ }
+
+ for _, result := range results {
+ // Discard GitHub pull requests, i.e. refs/pull/*
+ if result.refName.IsPull() {
+ continue
+ }
+
+ // Create reference
+ if result.oldCommitID == gitShortEmptySha {
+ commitID, err := gitRepo.GetRefCommitID(result.refName.String())
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to GetRefCommitID [ref_name: %s]: %v", m.Repo, result.refName, err)
+ continue
+ }
+ objectFormat := git.ObjectFormatFromName(m.Repo.ObjectFormatName)
+ notify_service.SyncPushCommits(ctx, m.Repo.MustOwner(ctx), m.Repo, &repo_module.PushUpdateOptions{
+ RefFullName: result.refName,
+ OldCommitID: objectFormat.EmptyObjectID().String(),
+ NewCommitID: commitID,
+ }, repo_module.NewPushCommits())
+ notify_service.SyncCreateRef(ctx, m.Repo.MustOwner(ctx), m.Repo, result.refName, commitID)
+ continue
+ }
+
+ // Delete reference
+ if result.newCommitID == gitShortEmptySha {
+ notify_service.SyncDeleteRef(ctx, m.Repo.MustOwner(ctx), m.Repo, result.refName)
+ continue
+ }
+
+ // Push commits
+ oldCommitID, err := git.GetFullCommitID(gitRepo.Ctx, gitRepo.Path, result.oldCommitID)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to get GetFullCommitID[%s]: %v", m.Repo, result.oldCommitID, err)
+ continue
+ }
+ newCommitID, err := git.GetFullCommitID(gitRepo.Ctx, gitRepo.Path, result.newCommitID)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to get GetFullCommitID [%s]: %v", m.Repo, result.newCommitID, err)
+ continue
+ }
+ commits, err := gitRepo.CommitsBetweenIDs(newCommitID, oldCommitID)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to get CommitsBetweenIDs [new_commit_id: %s, old_commit_id: %s]: %v", m.Repo, newCommitID, oldCommitID, err)
+ continue
+ }
+
+ theCommits := repo_module.GitToPushCommits(commits)
+ if len(theCommits.Commits) > setting.UI.FeedMaxCommitNum {
+ theCommits.Commits = theCommits.Commits[:setting.UI.FeedMaxCommitNum]
+ }
+
+ newCommit, err := gitRepo.GetCommit(newCommitID)
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to get commit %s: %v", m.Repo, newCommitID, err)
+ continue
+ }
+
+ theCommits.HeadCommit = repo_module.CommitToPushCommit(newCommit)
+ theCommits.CompareURL = m.Repo.ComposeCompareURL(oldCommitID, newCommitID)
+
+ notify_service.SyncPushCommits(ctx, m.Repo.MustOwner(ctx), m.Repo, &repo_module.PushUpdateOptions{
+ RefFullName: result.refName,
+ OldCommitID: oldCommitID,
+ NewCommitID: newCommitID,
+ }, theCommits)
+ }
+ log.Trace("SyncMirrors [repo: %-v]: done notifying updated branches/tags - now updating last commit time", m.Repo)
+
+ isEmpty, err := gitRepo.IsEmpty()
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to check empty git repo: %v", m.Repo, err)
+ return false
+ }
+ if !isEmpty {
+ // Get latest commit date and update to current repository updated time
+ commitDate, err := git.GetLatestCommitTime(ctx, m.Repo.RepoPath())
+ if err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to GetLatestCommitDate: %v", m.Repo, err)
+ return false
+ }
+
+ if err = repo_model.UpdateRepositoryUpdatedTime(ctx, m.RepoID, commitDate); err != nil {
+ log.Error("SyncMirrors [repo: %-v]: unable to update repository 'updated_unix': %v", m.Repo, err)
+ return false
+ }
+ }
+
+ log.Trace("SyncMirrors [repo: %-v]: Successfully updated", m.Repo)
+
+ return true
+}
+
+func checkAndUpdateEmptyRepository(ctx context.Context, m *repo_model.Mirror, results []*mirrorSyncResult) bool {
+ if !m.Repo.IsEmpty {
+ return true
+ }
+
+ hasDefault := false
+ hasMaster := false
+ hasMain := false
+ defaultBranchName := m.Repo.DefaultBranch
+ if len(defaultBranchName) == 0 {
+ defaultBranchName = setting.Repository.DefaultBranch
+ }
+ firstName := ""
+ for _, result := range results {
+ if !result.refName.IsBranch() {
+ continue
+ }
+
+ name := result.refName.BranchName()
+ if len(firstName) == 0 {
+ firstName = name
+ }
+
+ hasDefault = hasDefault || name == defaultBranchName
+ hasMaster = hasMaster || name == "master"
+ hasMain = hasMain || name == "main"
+ }
+
+ if len(firstName) > 0 {
+ if hasDefault {
+ m.Repo.DefaultBranch = defaultBranchName
+ } else if hasMaster {
+ m.Repo.DefaultBranch = "master"
+ } else if hasMain {
+ m.Repo.DefaultBranch = "main"
+ } else {
+ m.Repo.DefaultBranch = firstName
+ }
+ // Update the git repository default branch
+ if err := gitrepo.SetDefaultBranch(ctx, m.Repo, m.Repo.DefaultBranch); err != nil {
+ if !git.IsErrUnsupportedVersion(err) {
+ log.Error("Failed to update default branch of underlying git repository %-v. Error: %v", m.Repo, err)
+ desc := fmt.Sprintf("Failed to update default branch of underlying git repository '%s': %v", m.Repo.RepoPath(), err)
+ if err = system_model.CreateRepositoryNotice(desc); err != nil {
+ log.Error("CreateRepositoryNotice: %v", err)
+ }
+ return false
+ }
+ }
+ m.Repo.IsEmpty = false
+ // Update the is empty and default_branch columns
+ if err := repo_model.UpdateRepositoryCols(ctx, m.Repo, "default_branch", "is_empty"); err != nil {
+ log.Error("Failed to update default branch of repository %-v. Error: %v", m.Repo, err)
+ desc := fmt.Sprintf("Failed to update default branch of repository '%s': %v", m.Repo.RepoPath(), err)
+ if err = system_model.CreateRepositoryNotice(desc); err != nil {
+ log.Error("CreateRepositoryNotice: %v", err)
+ }
+ return false
+ }
+ }
+ return true
+}
diff --git a/services/mirror/mirror_push.go b/services/mirror/mirror_push.go
new file mode 100644
index 0000000..3a9644c
--- /dev/null
+++ b/services/mirror/mirror_push.go
@@ -0,0 +1,313 @@
+// Copyright 2021 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package mirror
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "regexp"
+ "strings"
+ "time"
+
+ "code.gitea.io/gitea/models/db"
+ repo_model "code.gitea.io/gitea/models/repo"
+ "code.gitea.io/gitea/modules/git"
+ "code.gitea.io/gitea/modules/gitrepo"
+ "code.gitea.io/gitea/modules/lfs"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
+ "code.gitea.io/gitea/modules/repository"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/timeutil"
+ "code.gitea.io/gitea/modules/util"
+)
+
+var stripExitStatus = regexp.MustCompile(`exit status \d+ - `)
+
+// AddPushMirrorRemote registers the push mirror remote.
+var AddPushMirrorRemote = addPushMirrorRemote
+
+func addPushMirrorRemote(ctx context.Context, m *repo_model.PushMirror, addr string) error {
+ addRemoteAndConfig := func(addr, path string) error {
+ cmd := git.NewCommand(ctx, "remote", "add", "--mirror=push").AddDynamicArguments(m.RemoteName, addr)
+ if strings.Contains(addr, "://") && strings.Contains(addr, "@") {
+ cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=push %s [repo_path: %s]", m.RemoteName, util.SanitizeCredentialURLs(addr), path))
+ } else {
+ cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=push %s [repo_path: %s]", m.RemoteName, addr, path))
+ }
+ if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: path}); err != nil {
+ return err
+ }
+ if _, _, err := git.NewCommand(ctx, "config", "--add").AddDynamicArguments("remote."+m.RemoteName+".push", "+refs/heads/*:refs/heads/*").RunStdString(&git.RunOpts{Dir: path}); err != nil {
+ return err
+ }
+ if _, _, err := git.NewCommand(ctx, "config", "--add").AddDynamicArguments("remote."+m.RemoteName+".push", "+refs/tags/*:refs/tags/*").RunStdString(&git.RunOpts{Dir: path}); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ if err := addRemoteAndConfig(addr, m.Repo.RepoPath()); err != nil {
+ return err
+ }
+
+ if m.Repo.HasWiki() {
+ wikiRemoteURL := repository.WikiRemoteURL(ctx, addr)
+ if len(wikiRemoteURL) > 0 {
+ if err := addRemoteAndConfig(wikiRemoteURL, m.Repo.WikiPath()); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+// RemovePushMirrorRemote removes the push mirror remote.
+func RemovePushMirrorRemote(ctx context.Context, m *repo_model.PushMirror) error {
+ cmd := git.NewCommand(ctx, "remote", "rm").AddDynamicArguments(m.RemoteName)
+ _ = m.GetRepository(ctx)
+
+ if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: m.Repo.RepoPath()}); err != nil {
+ return err
+ }
+
+ if m.Repo.HasWiki() {
+ if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: m.Repo.WikiPath()}); err != nil {
+ // The wiki remote may not exist
+ log.Warn("Wiki Remote[%d] could not be removed: %v", m.ID, err)
+ }
+ }
+
+ return nil
+}
+
+// SyncPushMirror starts the sync of the push mirror and schedules the next run.
+func SyncPushMirror(ctx context.Context, mirrorID int64) bool {
+ log.Trace("SyncPushMirror [mirror: %d]", mirrorID)
+ defer func() {
+ err := recover()
+ if err == nil {
+ return
+ }
+ // There was a panic whilst syncPushMirror...
+ log.Error("PANIC whilst syncPushMirror[%d] Panic: %v\nStacktrace: %s", mirrorID, err, log.Stack(2))
+ }()
+
+ // TODO: Handle "!exist" better
+ m, exist, err := db.GetByID[repo_model.PushMirror](ctx, mirrorID)
+ if err != nil || !exist {
+ log.Error("GetPushMirrorByID [%d]: %v", mirrorID, err)
+ return false
+ }
+
+ _ = m.GetRepository(ctx)
+
+ m.LastError = ""
+
+ ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Syncing PushMirror %s/%s to %s", m.Repo.OwnerName, m.Repo.Name, m.RemoteName))
+ defer finished()
+
+ log.Trace("SyncPushMirror [mirror: %d][repo: %-v]: Running Sync", m.ID, m.Repo)
+ err = runPushSync(ctx, m)
+ if err != nil {
+ log.Error("SyncPushMirror [mirror: %d][repo: %-v]: %v", m.ID, m.Repo, err)
+ m.LastError = stripExitStatus.ReplaceAllLiteralString(err.Error(), "")
+ }
+
+ m.LastUpdateUnix = timeutil.TimeStampNow()
+
+ if err := repo_model.UpdatePushMirror(ctx, m); err != nil {
+ log.Error("UpdatePushMirror [%d]: %v", m.ID, err)
+
+ return false
+ }
+
+ log.Trace("SyncPushMirror [mirror: %d][repo: %-v]: Finished", m.ID, m.Repo)
+
+ return err == nil
+}
+
+func runPushSync(ctx context.Context, m *repo_model.PushMirror) error {
+ timeout := time.Duration(setting.Git.Timeout.Mirror) * time.Second
+
+ performPush := func(repo *repo_model.Repository, isWiki bool) error {
+ path := repo.RepoPath()
+ if isWiki {
+ path = repo.WikiPath()
+ }
+ remoteURL, err := git.GetRemoteURL(ctx, path, m.RemoteName)
+ if err != nil {
+ log.Error("GetRemoteAddress(%s) Error %v", path, err)
+ return errors.New("Unexpected error")
+ }
+
+ if setting.LFS.StartServer {
+ log.Trace("SyncMirrors [repo: %-v]: syncing LFS objects...", m.Repo)
+
+ var gitRepo *git.Repository
+ if isWiki {
+ gitRepo, err = gitrepo.OpenWikiRepository(ctx, repo)
+ } else {
+ gitRepo, err = gitrepo.OpenRepository(ctx, repo)
+ }
+ if err != nil {
+ log.Error("OpenRepository: %v", err)
+ return errors.New("Unexpected error")
+ }
+ defer gitRepo.Close()
+
+ endpoint := lfs.DetermineEndpoint(remoteURL.String(), "")
+ lfsClient := lfs.NewClient(endpoint, nil)
+ if err := pushAllLFSObjects(ctx, gitRepo, lfsClient); err != nil {
+ return util.SanitizeErrorCredentialURLs(err)
+ }
+ }
+
+ log.Trace("Pushing %s mirror[%d] remote %s", path, m.ID, m.RemoteName)
+
+ // OpenSSH isn't very intuitive when you want to specify a specific keypair.
+ // Therefore, we need to create a temporary file that stores the private key, so that OpenSSH can use it.
+ // We delete the the temporary file afterwards.
+ privateKeyPath := ""
+ if m.PublicKey != "" {
+ f, err := os.CreateTemp(os.TempDir(), m.RemoteName)
+ if err != nil {
+ log.Error("os.CreateTemp: %v", err)
+ return errors.New("unexpected error")
+ }
+
+ defer func() {
+ f.Close()
+ if err := os.Remove(f.Name()); err != nil {
+ log.Error("os.Remove: %v", err)
+ }
+ }()
+
+ privateKey, err := m.Privatekey()
+ if err != nil {
+ log.Error("Privatekey: %v", err)
+ return errors.New("unexpected error")
+ }
+
+ if _, err := f.Write(privateKey); err != nil {
+ log.Error("f.Write: %v", err)
+ return errors.New("unexpected error")
+ }
+
+ privateKeyPath = f.Name()
+ }
+ if err := git.Push(ctx, path, git.PushOptions{
+ Remote: m.RemoteName,
+ Force: true,
+ Mirror: true,
+ Timeout: timeout,
+ PrivateKeyPath: privateKeyPath,
+ }); err != nil {
+ log.Error("Error pushing %s mirror[%d] remote %s: %v", path, m.ID, m.RemoteName, err)
+
+ return util.SanitizeErrorCredentialURLs(err)
+ }
+
+ return nil
+ }
+
+ err := performPush(m.Repo, false)
+ if err != nil {
+ return err
+ }
+
+ if m.Repo.HasWiki() {
+ _, err := git.GetRemoteAddress(ctx, m.Repo.WikiPath(), m.RemoteName)
+ if err == nil {
+ err := performPush(m.Repo, true)
+ if err != nil {
+ return err
+ }
+ } else {
+ log.Trace("Skipping wiki: No remote configured")
+ }
+ }
+
+ return nil
+}
+
+func pushAllLFSObjects(ctx context.Context, gitRepo *git.Repository, lfsClient lfs.Client) error {
+ contentStore := lfs.NewContentStore()
+
+ pointerChan := make(chan lfs.PointerBlob)
+ errChan := make(chan error, 1)
+ go lfs.SearchPointerBlobs(ctx, gitRepo, pointerChan, errChan)
+
+ uploadObjects := func(pointers []lfs.Pointer) error {
+ err := lfsClient.Upload(ctx, pointers, func(p lfs.Pointer, objectError error) (io.ReadCloser, error) {
+ if objectError != nil {
+ return nil, objectError
+ }
+
+ content, err := contentStore.Get(p)
+ if err != nil {
+ log.Error("Error reading LFS object %v: %v", p, err)
+ }
+ return content, err
+ })
+ if err != nil {
+ select {
+ case <-ctx.Done():
+ return nil
+ default:
+ }
+ }
+ return err
+ }
+
+ var batch []lfs.Pointer
+ for pointerBlob := range pointerChan {
+ exists, err := contentStore.Exists(pointerBlob.Pointer)
+ if err != nil {
+ log.Error("Error checking if LFS object %v exists: %v", pointerBlob.Pointer, err)
+ return err
+ }
+ if !exists {
+ log.Trace("Skipping missing LFS object %v", pointerBlob.Pointer)
+ continue
+ }
+
+ batch = append(batch, pointerBlob.Pointer)
+ if len(batch) >= lfsClient.BatchSize() {
+ if err := uploadObjects(batch); err != nil {
+ return err
+ }
+ batch = nil
+ }
+ }
+ if len(batch) > 0 {
+ if err := uploadObjects(batch); err != nil {
+ return err
+ }
+ }
+
+ err, has := <-errChan
+ if has {
+ log.Error("Error enumerating LFS objects for repository: %v", err)
+ return err
+ }
+
+ return nil
+}
+
+func syncPushMirrorWithSyncOnCommit(ctx context.Context, repoID int64) {
+ pushMirrors, err := repo_model.GetPushMirrorsSyncedOnCommit(ctx, repoID)
+ if err != nil {
+ log.Error("repo_model.GetPushMirrorsSyncedOnCommit failed: %v", err)
+ return
+ }
+
+ for _, mirror := range pushMirrors {
+ AddPushMirrorToQueue(mirror.ID)
+ }
+}
diff --git a/services/mirror/mirror_test.go b/services/mirror/mirror_test.go
new file mode 100644
index 0000000..8ad524b
--- /dev/null
+++ b/services/mirror/mirror_test.go
@@ -0,0 +1,46 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package mirror
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_parseRemoteUpdateOutput(t *testing.T) {
+ output := `
+ * [new tag] v0.1.8 -> v0.1.8
+ * [new branch] master -> origin/master
+ - [deleted] (none) -> origin/test1
+ - [deleted] (none) -> tag1
+ + f895a1e...957a993 test2 -> origin/test2 (forced update)
+ 957a993..a87ba5f test3 -> origin/test3
+`
+ results := parseRemoteUpdateOutput(output, "origin")
+ assert.Len(t, results, 6)
+ assert.EqualValues(t, "refs/tags/v0.1.8", results[0].refName.String())
+ assert.EqualValues(t, gitShortEmptySha, results[0].oldCommitID)
+ assert.EqualValues(t, "", results[0].newCommitID)
+
+ assert.EqualValues(t, "refs/heads/master", results[1].refName.String())
+ assert.EqualValues(t, gitShortEmptySha, results[1].oldCommitID)
+ assert.EqualValues(t, "", results[1].newCommitID)
+
+ assert.EqualValues(t, "refs/heads/test1", results[2].refName.String())
+ assert.EqualValues(t, "", results[2].oldCommitID)
+ assert.EqualValues(t, gitShortEmptySha, results[2].newCommitID)
+
+ assert.EqualValues(t, "refs/tags/tag1", results[3].refName.String())
+ assert.EqualValues(t, "", results[3].oldCommitID)
+ assert.EqualValues(t, gitShortEmptySha, results[3].newCommitID)
+
+ assert.EqualValues(t, "refs/heads/test2", results[4].refName.String())
+ assert.EqualValues(t, "f895a1e", results[4].oldCommitID)
+ assert.EqualValues(t, "957a993", results[4].newCommitID)
+
+ assert.EqualValues(t, "refs/heads/test3", results[5].refName.String())
+ assert.EqualValues(t, "957a993", results[5].oldCommitID)
+ assert.EqualValues(t, "a87ba5f", results[5].newCommitID)
+}
diff --git a/services/mirror/notifier.go b/services/mirror/notifier.go
new file mode 100644
index 0000000..93d9044
--- /dev/null
+++ b/services/mirror/notifier.go
@@ -0,0 +1,31 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package mirror
+
+import (
+ "context"
+
+ repo_model "code.gitea.io/gitea/models/repo"
+ user_model "code.gitea.io/gitea/models/user"
+ "code.gitea.io/gitea/modules/repository"
+ notify_service "code.gitea.io/gitea/services/notify"
+)
+
+func init() {
+ notify_service.RegisterNotifier(&mirrorNotifier{})
+}
+
+type mirrorNotifier struct {
+ notify_service.NullNotifier
+}
+
+var _ notify_service.Notifier = &mirrorNotifier{}
+
+func (m *mirrorNotifier) PushCommits(ctx context.Context, _ *user_model.User, repo *repo_model.Repository, _ *repository.PushUpdateOptions, _ *repository.PushCommits) {
+ syncPushMirrorWithSyncOnCommit(ctx, repo.ID)
+}
+
+func (m *mirrorNotifier) SyncPushCommits(ctx context.Context, _ *user_model.User, repo *repo_model.Repository, _ *repository.PushUpdateOptions, _ *repository.PushCommits) {
+ syncPushMirrorWithSyncOnCommit(ctx, repo.ID)
+}
diff --git a/services/mirror/queue.go b/services/mirror/queue.go
new file mode 100644
index 0000000..0d9a624
--- /dev/null
+++ b/services/mirror/queue.go
@@ -0,0 +1,70 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package mirror
+
+import (
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
+ "code.gitea.io/gitea/modules/setting"
+)
+
+var mirrorQueue *queue.WorkerPoolQueue[*SyncRequest]
+
+// SyncType type of sync request
+type SyncType int
+
+const (
+ // PullMirrorType for pull mirrors
+ PullMirrorType SyncType = iota
+ // PushMirrorType for push mirrors
+ PushMirrorType
+)
+
+// SyncRequest for the mirror queue
+type SyncRequest struct {
+ Type SyncType
+ ReferenceID int64 // RepoID for pull mirror, MirrorID for push mirror
+}
+
+// StartSyncMirrors starts a go routine to sync the mirrors
+func StartSyncMirrors(queueHandle func(data ...*SyncRequest) []*SyncRequest) {
+ if !setting.Mirror.Enabled {
+ return
+ }
+ mirrorQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "mirror", queueHandle)
+ if mirrorQueue == nil {
+ log.Fatal("Unable to create mirror queue")
+ }
+ go graceful.GetManager().RunWithCancel(mirrorQueue)
+}
+
+// AddPullMirrorToQueue adds repoID to mirror queue
+func AddPullMirrorToQueue(repoID int64) {
+ addMirrorToQueue(PullMirrorType, repoID)
+}
+
+// AddPushMirrorToQueue adds the push mirror to the queue
+func AddPushMirrorToQueue(mirrorID int64) {
+ addMirrorToQueue(PushMirrorType, mirrorID)
+}
+
+func addMirrorToQueue(syncType SyncType, referenceID int64) {
+ if !setting.Mirror.Enabled {
+ return
+ }
+ go func() {
+ if err := PushToQueue(syncType, referenceID); err != nil {
+ log.Error("Unable to push sync request for to the queue for pull mirror repo[%d]. Error: %v", referenceID, err)
+ }
+ }()
+}
+
+// PushToQueue adds the sync request to the queue
+func PushToQueue(mirrorType SyncType, referenceID int64) error {
+ return mirrorQueue.Push(&SyncRequest{
+ Type: mirrorType,
+ ReferenceID: referenceID,
+ })
+}