From dd136858f1ea40ad3c94191d647487fa4f31926c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 18 Oct 2024 20:33:49 +0200 Subject: Adding upstream version 9.0.0. Signed-off-by: Daniel Baumann --- modules/setting/queue.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 modules/setting/queue.go (limited to 'modules/setting/queue.go') diff --git a/modules/setting/queue.go b/modules/setting/queue.go new file mode 100644 index 0000000..251a6c1 --- /dev/null +++ b/modules/setting/queue.go @@ -0,0 +1,120 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package setting + +import ( + "path/filepath" + "runtime" + + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" +) + +// QueueSettings represent the settings for a queue from the ini +type QueueSettings struct { + Name string // not an INI option, it is the name for [queue.the-name] section + + Type string + Datadir string + ConnStr string // for leveldb or redis + Length int // max queue length before blocking + + QueueName, SetName string // the name suffix for storage (db key, redis key), "set" is for unique queue + + BatchLength int + MaxWorkers int +} + +func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) { + queueSettingsDefault := QueueSettings{ + Type: "level", // dummy, channel, level, redis + Datadir: "queues/common", // relative to AppDataPath + Length: 100000, // queue length before a channel queue will block + + QueueName: "_queue", + SetName: "_unique", + BatchLength: 20, + MaxWorkers: runtime.NumCPU() / 2, + } + if queueSettingsDefault.MaxWorkers < 1 { + queueSettingsDefault.MaxWorkers = 1 + } + if queueSettingsDefault.MaxWorkers > 10 { + queueSettingsDefault.MaxWorkers = 10 + } + + // deep copy default settings + cfg := QueueSettings{} + if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil { + return cfg, err + } else if err = json.Unmarshal(cfgBs, &cfg); err != nil { + return cfg, err + } + + cfg.Name = name + if sec, err := rootCfg.GetSection("queue"); err == nil { + if err = sec.MapTo(&cfg); err != nil { + log.Error("Failed to map queue common config for %q: %v", name, err) + return cfg, nil + } + } + if sec, err := rootCfg.GetSection("queue." + name); err == nil { + if err = sec.MapTo(&cfg); err != nil { + log.Error("Failed to map queue spec config for %q: %v", name, err) + return cfg, nil + } + if sec.HasKey("CONN_STR") { + cfg.ConnStr = sec.Key("CONN_STR").String() + } + } + + if cfg.Datadir == "" { + cfg.Datadir = queueSettingsDefault.Datadir + } + if !filepath.IsAbs(cfg.Datadir) { + cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir) + } + cfg.Datadir = filepath.ToSlash(cfg.Datadir) + + if cfg.Type == "redis" && cfg.ConnStr == "" { + cfg.ConnStr = "redis://127.0.0.1:6379/0" + } + + if cfg.Length <= 0 { + cfg.Length = queueSettingsDefault.Length + } + if cfg.MaxWorkers <= 0 { + cfg.MaxWorkers = queueSettingsDefault.MaxWorkers + } + if cfg.BatchLength <= 0 { + cfg.BatchLength = queueSettingsDefault.BatchLength + } + + return cfg, nil +} + +func LoadQueueSettings() { + loadQueueFrom(CfgProvider) +} + +func loadQueueFrom(rootCfg ConfigProvider) { + hasOld := false + handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) { + if rootCfg.Section(oldSection).HasKey(oldKey) { + hasOld = true + log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName) + } + } + handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE") + handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER") + handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR") + handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR") + handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN") + handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN") + handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH") + handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH") + if hasOld { + log.Fatal("Please update your app.ini to remove deprecated config options") + } +} -- cgit v1.2.3