diff options
Diffstat (limited to 'pkg/artifactcache/handler.go')
-rw-r--r-- | pkg/artifactcache/handler.go | 162 |
1 files changed, 91 insertions, 71 deletions
diff --git a/pkg/artifactcache/handler.go b/pkg/artifactcache/handler.go index 3178260..065c7dd 100644 --- a/pkg/artifactcache/handler.go +++ b/pkg/artifactcache/handler.go @@ -35,7 +35,7 @@ type Handler struct { server *http.Server logger logrus.FieldLogger - gcing int32 // TODO: use atomic.Bool when we can use Go 1.19 + gcing atomic.Bool gcAt time.Time outboundIP string @@ -170,7 +170,7 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Para } defer db.Close() - cache, err := h.findCache(db, keys, version) + cache, err := findCache(db, keys, version) if err != nil { h.responseJSON(w, r, 500, err) return @@ -206,32 +206,17 @@ func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.P api.Key = strings.ToLower(api.Key) cache := api.ToCache() - cache.FillKeyVersionHash() db, err := h.openDB() if err != nil { h.responseJSON(w, r, 500, err) return } defer db.Close() - if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { - if !errors.Is(err, bolthold.ErrNotFound) { - h.responseJSON(w, r, 500, err) - return - } - } else { - h.responseJSON(w, r, 400, fmt.Errorf("already exist")) - return - } now := time.Now().Unix() cache.CreatedAt = now cache.UsedAt = now - if err := db.Insert(bolthold.NextSequence(), cache); err != nil { - h.responseJSON(w, r, 500, err) - return - } - // write back id to db - if err := db.Update(cache.ID, cache); err != nil { + if err := insertCache(db, cache); err != nil { h.responseJSON(w, r, 500, err) return } @@ -364,56 +349,51 @@ func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle { } // if not found, return (nil, nil) instead of an error. -func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) { - if len(keys) == 0 { - return nil, nil - } - key := keys[0] // the first key is for exact match. - - cache := &Cache{ - Key: key, - Version: version, - } - cache.FillKeyVersionHash() - - if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { - if !errors.Is(err, bolthold.ErrNotFound) { - return nil, err +func findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) { + cache := &Cache{} + for _, prefix := range keys { + // if a key in the list matches exactly, don't return partial matches + if err := db.FindOne(cache, + bolthold.Where("Key").Eq(prefix). + And("Version").Eq(version). + And("Complete").Eq(true). + SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) { + if err != nil { + return nil, fmt.Errorf("find cache: %w", err) + } + return cache, nil } - } else if cache.Complete { - return cache, nil - } - stop := fmt.Errorf("stop") - - for _, prefix := range keys[1:] { - found := false prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix)) re, err := regexp.Compile(prefixPattern) if err != nil { continue } - if err := db.ForEach(bolthold.Where("Key").RegExp(re).And("Version").Eq(version).SortBy("CreatedAt").Reverse(), func(v *Cache) error { - if !strings.HasPrefix(v.Key, prefix) { - return stop - } - if v.Complete { - cache = v - found = true - return stop - } - return nil - }); err != nil { - if !errors.Is(err, stop) { - return nil, err + if err := db.FindOne(cache, + bolthold.Where("Key").RegExp(re). + And("Version").Eq(version). + And("Complete").Eq(true). + SortBy("CreatedAt").Reverse()); err != nil { + if errors.Is(err, bolthold.ErrNotFound) { + continue } + return nil, fmt.Errorf("find cache: %w", err) } - if found { - return cache, nil - } + return cache, nil } return nil, nil } +func insertCache(db *bolthold.Store, cache *Cache) error { + if err := db.Insert(bolthold.NextSequence(), cache); err != nil { + return fmt.Errorf("insert cache: %w", err) + } + // write back id to db + if err := db.Update(cache.ID, cache); err != nil { + return fmt.Errorf("write back id to db: %w", err) + } + return nil +} + func (h *Handler) useCache(id int64) { db, err := h.openDB() if err != nil { @@ -428,14 +408,21 @@ func (h *Handler) useCache(id int64) { _ = db.Update(cache.ID, cache) } +const ( + keepUsed = 30 * 24 * time.Hour + keepUnused = 7 * 24 * time.Hour + keepTemp = 5 * time.Minute + keepOld = 5 * time.Minute +) + func (h *Handler) gcCache() { - if atomic.LoadInt32(&h.gcing) != 0 { + if h.gcing.Load() { return } - if !atomic.CompareAndSwapInt32(&h.gcing, 0, 1) { + if !h.gcing.CompareAndSwap(false, true) { return } - defer atomic.StoreInt32(&h.gcing, 0) + defer h.gcing.Store(false) if time.Since(h.gcAt) < time.Hour { h.logger.Debugf("skip gc: %v", h.gcAt.String()) @@ -444,26 +431,21 @@ func (h *Handler) gcCache() { h.gcAt = time.Now() h.logger.Debugf("gc: %v", h.gcAt.String()) - const ( - keepUsed = 30 * 24 * time.Hour - keepUnused = 7 * 24 * time.Hour - keepTemp = 5 * time.Minute - ) - db, err := h.openDB() if err != nil { return } defer db.Close() + // Remove the caches which are not completed for a while, they are most likely to be broken. var caches []*Cache - if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil { + if err := db.Find(&caches, bolthold. + Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()). + And("Complete").Eq(false), + ); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { - if cache.Complete { - continue - } h.storage.Remove(cache.ID) if err := db.Delete(cache.ID, cache); err != nil { h.logger.Warnf("delete cache: %v", err) @@ -473,8 +455,11 @@ func (h *Handler) gcCache() { } } + // Remove the old caches which have not been used recently. caches = caches[:0] - if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil { + if err := db.Find(&caches, bolthold. + Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()), + ); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { @@ -487,8 +472,11 @@ func (h *Handler) gcCache() { } } + // Remove the old caches which are too old. caches = caches[:0] - if err := db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil { + if err := db.Find(&caches, bolthold. + Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()), + ); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { @@ -500,6 +488,38 @@ func (h *Handler) gcCache() { h.logger.Infof("deleted cache: %+v", cache) } } + + // Remove the old caches with the same key and version, keep the latest one. + // Also keep the olds which have been used recently for a while in case of the cache is still in use. + if results, err := db.FindAggregate( + &Cache{}, + bolthold.Where("Complete").Eq(true), + "Key", "Version", + ); err != nil { + h.logger.Warnf("find aggregate caches: %v", err) + } else { + for _, result := range results { + if result.Count() <= 1 { + continue + } + result.Sort("CreatedAt") + caches = caches[:0] + result.Reduction(&caches) + for _, cache := range caches[:len(caches)-1] { + if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld { + // Keep it since it has been used recently, even if it's old. + // Or it could break downloading in process. + continue + } + h.storage.Remove(cache.ID) + if err := db.Delete(cache.ID, cache); err != nil { + h.logger.Warnf("delete cache: %v", err) + continue + } + h.logger.Infof("deleted cache: %+v", cache) + } + } + } } func (h *Handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) { |