summaryrefslogtreecommitdiffstats
path: root/modules/indexer
diff options
context:
space:
mode:
authorLauris BH <lauris@nix.lv>2022-01-27 09:30:51 +0100
committerGitHub <noreply@github.com>2022-01-27 09:30:51 +0100
commit8038610a4279862a87e630e4f1d1077c510f9d15 (patch)
tree802489f8ddde899e76643ea157f9020f12ca1490 /modules/indexer
parent[skip ci] Updated translations via Crowdin (diff)
downloadforgejo-8038610a4279862a87e630e4f1d1077c510f9d15.tar.xz
forgejo-8038610a4279862a87e630e4f1d1077c510f9d15.zip
Automatically pause queue if index service is unavailable (#15066)
* Handle keyword search error when issue indexer service is not available * Implement automatic disabling and resume of code indexer queue
Diffstat (limited to 'modules/indexer')
-rw-r--r--modules/indexer/code/bleve.go13
-rw-r--r--modules/indexer/code/elastic_search.go131
-rw-r--r--modules/indexer/code/indexer.go36
-rw-r--r--modules/indexer/code/indexer_test.go3
-rw-r--r--modules/indexer/code/search.go5
-rw-r--r--modules/indexer/code/wrapped.go25
-rw-r--r--modules/indexer/issues/bleve.go14
-rw-r--r--modules/indexer/issues/bleve_test.go3
-rw-r--r--modules/indexer/issues/db.go28
-rw-r--r--modules/indexer/issues/elastic_search.go123
-rw-r--r--modules/indexer/issues/indexer.go55
-rw-r--r--modules/indexer/issues/indexer_test.go17
12 files changed, 379 insertions, 74 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go
index cfadcfebd8..e2e1532095 100644
--- a/modules/indexer/code/bleve.go
+++ b/modules/indexer/code/bleve.go
@@ -271,6 +271,15 @@ func (b *BleveIndexer) Close() {
log.Info("PID: %d Repository Indexer closed", os.Getpid())
}
+// SetAvailabilityChangeCallback does nothing
+func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+}
+
+// Ping does nothing
+func (b *BleveIndexer) Ping() bool {
+ return true
+}
+
// Index indexes the data
func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error {
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
@@ -319,7 +328,7 @@ func (b *BleveIndexer) Delete(repoID int64) error {
// Search searches for files in the specified repo.
// Returns the matching file-paths
-func (b *BleveIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
+func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
var (
indexerQuery query.Query
keywordQuery query.Query
@@ -372,7 +381,7 @@ func (b *BleveIndexer) Search(repoIDs []int64, language, keyword string, page, p
searchRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10))
}
- result, err := b.indexer.Search(searchRequest)
+ result, err := b.indexer.SearchInContext(ctx, searchRequest)
if err != nil {
return 0, nil, nil, err
}
diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go
index 9bd2fa301e..db37b4f66c 100644
--- a/modules/indexer/code/elastic_search.go
+++ b/modules/indexer/code/elastic_search.go
@@ -7,16 +7,20 @@ package code
import (
"bufio"
"context"
+ "errors"
"fmt"
"io"
+ "net"
"strconv"
"strings"
+ "sync"
"time"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/analyze"
"code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@@ -39,8 +43,12 @@ var _ Indexer = &ElasticSearchIndexer{}
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
- client *elastic.Client
- indexerAliasName string
+ client *elastic.Client
+ indexerAliasName string
+ available bool
+ availabilityCallback func(bool)
+ stopTimer chan struct{}
+ lock sync.RWMutex
}
type elasticLogger struct {
@@ -78,7 +86,23 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bo
indexer := &ElasticSearchIndexer{
client: client,
indexerAliasName: indexerName,
+ available: true,
+ stopTimer: make(chan struct{}),
}
+
+ ticker := time.NewTicker(10 * time.Second)
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ indexer.checkAvailability()
+ case <-indexer.stopTimer:
+ ticker.Stop()
+ return
+ }
+ }
+ }()
+
exists, err := indexer.init()
if err != nil {
indexer.Close()
@@ -123,17 +147,17 @@ func (b *ElasticSearchIndexer) realIndexerName() string {
// Init will initialize the indexer
func (b *ElasticSearchIndexer) init() (bool, error) {
- ctx := context.Background()
+ ctx := graceful.GetManager().HammerContext()
exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !exists {
mapping := defaultMapping
createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !createIndex.Acknowledged {
return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping)
@@ -143,7 +167,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
// check version
r, err := b.client.Aliases().Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
realIndexerNames := r.IndicesByAlias(b.indexerAliasName)
@@ -152,10 +176,10 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
Add(b.realIndexerName(), b.indexerAliasName).
Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !res.Acknowledged {
- return false, fmt.Errorf("")
+ return false, fmt.Errorf("create alias %s to index %s failed", b.indexerAliasName, b.realIndexerName())
}
} else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() {
log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.",
@@ -165,16 +189,30 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
Add(b.realIndexerName(), b.indexerAliasName).
Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !res.Acknowledged {
- return false, fmt.Errorf("")
+ return false, fmt.Errorf("change alias %s to index %s failed", b.indexerAliasName, b.realIndexerName())
}
}
return exists, nil
}
+// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
+func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ b.availabilityCallback = callback
+}
+
+// Ping checks if elastic is available
+func (b *ElasticSearchIndexer) Ping() bool {
+ b.lock.RLock()
+ defer b.lock.RUnlock()
+ return b.available
+}
+
func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) {
// Ignore vendored files in code search
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
@@ -190,7 +228,7 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr
return nil, err
}
if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil {
- return nil, fmt.Errorf("Misformatted git cat-file output: %v", err)
+ return nil, fmt.Errorf("misformatted git cat-file output: %v", err)
}
}
@@ -274,8 +312,8 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos
_, err := b.client.Bulk().
Index(b.indexerAliasName).
Add(reqs...).
- Do(context.Background())
- return err
+ Do(ctx)
+ return b.checkError(err)
}
return nil
}
@@ -284,8 +322,8 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos
func (b *ElasticSearchIndexer) Delete(repoID int64) error {
_, err := b.client.DeleteByQuery(b.indexerAliasName).
Query(elastic.NewTermsQuery("repo_id", repoID)).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
// indexPos find words positions for start and the following end on content. It will
@@ -366,7 +404,7 @@ func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
}
// Search searches for codes and language stats by given conditions.
-func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
+func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
searchType := esMultiMatchTypeBestFields
if isMatch {
searchType = esMultiMatchTypePhrasePrefix
@@ -407,9 +445,9 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
).
Sort("repo_id", true).
From(start).Size(pageSize).
- Do(context.Background())
+ Do(ctx)
if err != nil {
- return 0, nil, nil, err
+ return 0, nil, nil, b.checkError(err)
}
return convertResult(searchResult, kw, pageSize)
@@ -421,9 +459,9 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
Aggregation("language", aggregation).
Query(query).
Size(0). // We only needs stats information
- Do(context.Background())
+ Do(ctx)
if err != nil {
- return 0, nil, nil, err
+ return 0, nil, nil, b.checkError(err)
}
query = query.Must(langQuery)
@@ -438,9 +476,9 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
).
Sort("repo_id", true).
From(start).Size(pageSize).
- Do(context.Background())
+ Do(ctx)
if err != nil {
- return 0, nil, nil, err
+ return 0, nil, nil, b.checkError(err)
}
total, hits, _, err := convertResult(searchResult, kw, pageSize)
@@ -449,4 +487,51 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
}
// Close implements indexer
-func (b *ElasticSearchIndexer) Close() {}
+func (b *ElasticSearchIndexer) Close() {
+ select {
+ case <-b.stopTimer:
+ default:
+ close(b.stopTimer)
+ }
+}
+
+func (b *ElasticSearchIndexer) checkError(err error) error {
+ var opErr *net.OpError
+ if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
+ return err
+ }
+
+ b.setAvailability(false)
+
+ return err
+}
+
+func (b *ElasticSearchIndexer) checkAvailability() {
+ if b.Ping() {
+ return
+ }
+
+ // Request cluster state to check if elastic is available again
+ _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
+ if err != nil {
+ b.setAvailability(false)
+ return
+ }
+
+ b.setAvailability(true)
+}
+
+func (b *ElasticSearchIndexer) setAvailability(available bool) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ if b.available == available {
+ return
+ }
+
+ b.available = available
+ if b.availabilityCallback != nil {
+ // Call the callback from within the lock to ensure that the ordering remains correct
+ b.availabilityCallback(b.available)
+ }
+}
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 9ae3abff60..d897fcccd5 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -42,9 +42,11 @@ type SearchResultLanguages struct {
// Indexer defines an interface to index and search code contents
type Indexer interface {
+ Ping() bool
+ SetAvailabilityChangeCallback(callback func(bool))
Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
- Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
+ Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
Close()
}
@@ -140,6 +142,7 @@ func Init() {
return data
}
+ unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
@@ -150,10 +153,14 @@ func Init() {
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
log.Error("index: %v", err)
- continue
+ if indexer.Ping() {
+ continue
+ }
+ // Add back to queue
+ unhandled = append(unhandled, datum)
}
}
- return nil
+ return unhandled
}
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
@@ -212,6 +219,18 @@ func Init() {
indexer.set(rIndexer)
+ if queue, ok := indexerQueue.(queue.Pausable); ok {
+ rIndexer.SetAvailabilityChangeCallback(func(available bool) {
+ if !available {
+ log.Info("Code index queue paused")
+ queue.Pause()
+ } else {
+ log.Info("Code index queue resumed")
+ queue.Resume()
+ }
+ })
+ }
+
// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
@@ -262,6 +281,17 @@ func UpdateRepoIndexer(repo *repo_model.Repository) {
}
}
+// IsAvailable checks if issue indexer is available
+func IsAvailable() bool {
+ idx, err := indexer.get()
+ if err != nil {
+ log.Error("IsAvailable(): unable to get indexer: %v", err)
+ return false
+ }
+
+ return idx.Ping()
+}
+
// populateRepoIndexer populate the repo indexer with pre-existing data. This
// should only be run when the indexer is created for the first time.
func populateRepoIndexer(ctx context.Context) {
diff --git a/modules/indexer/code/indexer_test.go b/modules/indexer/code/indexer_test.go
index 0f9915c84b..d56c33653f 100644
--- a/modules/indexer/code/indexer_test.go
+++ b/modules/indexer/code/indexer_test.go
@@ -5,6 +5,7 @@
package code
import (
+ "context"
"path/filepath"
"testing"
@@ -65,7 +66,7 @@ func testIndexer(name string, t *testing.T, indexer Indexer) {
for _, kw := range keywords {
t.Run(kw.Keyword, func(t *testing.T) {
- total, res, langs, err := indexer.Search(kw.RepoIDs, "", kw.Keyword, 1, 10, false)
+ total, res, langs, err := indexer.Search(context.TODO(), kw.RepoIDs, "", kw.Keyword, 1, 10, false)
assert.NoError(t, err)
assert.EqualValues(t, len(kw.IDs), total)
assert.Len(t, langs, kw.Langs)
diff --git a/modules/indexer/code/search.go b/modules/indexer/code/search.go
index bb8dcf16b3..bb7715bafc 100644
--- a/modules/indexer/code/search.go
+++ b/modules/indexer/code/search.go
@@ -6,6 +6,7 @@ package code
import (
"bytes"
+ "context"
"strings"
"code.gitea.io/gitea/modules/highlight"
@@ -106,12 +107,12 @@ func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, erro
}
// PerformSearch perform a search on a repository
-func PerformSearch(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) {
+func PerformSearch(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) {
if len(keyword) == 0 {
return 0, nil, nil, nil
}
- total, results, resultLanguages, err := indexer.Search(repoIDs, language, keyword, page, pageSize, isMatch)
+ total, results, resultLanguages, err := indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch)
if err != nil {
return 0, nil, nil, err
}
diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go
index 56baadd6fc..ba58236fba 100644
--- a/modules/indexer/code/wrapped.go
+++ b/modules/indexer/code/wrapped.go
@@ -10,6 +10,7 @@ import (
"sync"
repo_model "code.gitea.io/gitea/models/repo"
+ "code.gitea.io/gitea/modules/log"
)
var indexer = newWrappedIndexer()
@@ -56,6 +57,26 @@ func (w *wrappedIndexer) get() (Indexer, error) {
return w.internal, nil
}
+// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
+func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+ indexer, err := w.get()
+ if err != nil {
+ log.Error("Failed to get indexer: %v", err)
+ return
+ }
+ indexer.SetAvailabilityChangeCallback(callback)
+}
+
+// Ping checks if elastic is available
+func (w *wrappedIndexer) Ping() bool {
+ indexer, err := w.get()
+ if err != nil {
+ log.Warn("Failed to get indexer: %v", err)
+ return false
+ }
+ return indexer.Ping()
+}
+
func (w *wrappedIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error {
indexer, err := w.get()
if err != nil {
@@ -72,12 +93,12 @@ func (w *wrappedIndexer) Delete(repoID int64) error {
return indexer.Delete(repoID)
}
-func (w *wrappedIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
+func (w *wrappedIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
indexer, err := w.get()
if err != nil {
return 0, nil, nil, err
}
- return indexer.Search(repoIDs, language, keyword, page, pageSize, isMatch)
+ return indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch)
}
func (w *wrappedIndexer) Close() {
diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go
index d986a0e55e..c298b7de3e 100644
--- a/modules/indexer/issues/bleve.go
+++ b/modules/indexer/issues/bleve.go
@@ -5,6 +5,7 @@
package issues
import (
+ "context"
"fmt"
"os"
"strconv"
@@ -186,6 +187,15 @@ func (b *BleveIndexer) Init() (bool, error) {
return false, err
}
+// SetAvailabilityChangeCallback does nothing
+func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+}
+
+// Ping does nothing
+func (b *BleveIndexer) Ping() bool {
+ return true
+}
+
// Close will close the bleve indexer
func (b *BleveIndexer) Close() {
if b.indexer != nil {
@@ -229,7 +239,7 @@ func (b *BleveIndexer) Delete(ids ...int64) error {
// Search searches for issues by given conditions.
// Returns the matching issue IDs
-func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+func (b *BleveIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
var repoQueriesP []*query.NumericRangeQuery
for _, repoID := range repoIDs {
repoQueriesP = append(repoQueriesP, numericEqualityQuery(repoID, "RepoID"))
@@ -249,7 +259,7 @@ func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int)
search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false)
search.SortBy([]string{"-_score"})
- result, err := b.indexer.Search(search)
+ result, err := b.indexer.SearchInContext(ctx, search)
if err != nil {
return nil, err
}
diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve_test.go
index df036fb573..926c32e242 100644
--- a/modules/indexer/issues/bleve_test.go
+++ b/modules/indexer/issues/bleve_test.go
@@ -5,6 +5,7 @@
package issues
import (
+ "context"
"os"
"testing"
@@ -84,7 +85,7 @@ func TestBleveIndexAndSearch(t *testing.T) {
}
for _, kw := range keywords {
- res, err := indexer.Search(kw.Keyword, []int64{2}, 10, 0)
+ res, err := indexer.Search(context.TODO(), kw.Keyword, []int64{2}, 10, 0)
assert.NoError(t, err)
ids := make([]int64, 0, len(res.Hits))
diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go
index f02cbddce8..e2badf64f2 100644
--- a/modules/indexer/issues/db.go
+++ b/modules/indexer/issues/db.go
@@ -4,33 +4,47 @@
package issues
-import "code.gitea.io/gitea/models"
+import (
+ "context"
+
+ "code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/models/db"
+)
// DBIndexer implements Indexer interface to use database's like search
type DBIndexer struct{}
// Init dummy function
-func (db *DBIndexer) Init() (bool, error) {
+func (i *DBIndexer) Init() (bool, error) {
return false, nil
}
+// SetAvailabilityChangeCallback dummy function
+func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+}
+
+// Ping checks if database is available
+func (i *DBIndexer) Ping() bool {
+ return db.GetEngine(db.DefaultContext).Ping() != nil
+}
+
// Index dummy function
-func (db *DBIndexer) Index(issue []*IndexerData) error {
+func (i *DBIndexer) Index(issue []*IndexerData) error {
return nil
}
// Delete dummy function
-func (db *DBIndexer) Delete(ids ...int64) error {
+func (i *DBIndexer) Delete(ids ...int64) error {
return nil
}
// Close dummy function
-func (db *DBIndexer) Close() {
+func (i *DBIndexer) Close() {
}
// Search dummy function
-func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
- total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
+func (i *DBIndexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+ total, ids, err := models.SearchIssueIDsByKeyword(ctx, kw, repoIDs, limit, start)
if err != nil {
return nil, err
}
diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go
index 187b69b749..97e32b8975 100644
--- a/modules/indexer/issues/elastic_search.go
+++ b/modules/indexer/issues/elastic_search.go
@@ -8,9 +8,12 @@ import (
"context"
"errors"
"fmt"
+ "net"
"strconv"
+ "sync"
"time"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"github.com/olivere/elastic/v7"
@@ -20,8 +23,12 @@ var _ Indexer = &ElasticSearchIndexer{}
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
- client *elastic.Client
- indexerName string
+ client *elastic.Client
+ indexerName string
+ available bool
+ availabilityCallback func(bool)
+ stopTimer chan struct{}
+ lock sync.RWMutex
}
type elasticLogger struct {
@@ -56,10 +63,27 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, er
return nil, err
}
- return &ElasticSearchIndexer{
+ indexer := &ElasticSearchIndexer{
client: client,
indexerName: indexerName,
- }, nil
+ available: true,
+ stopTimer: make(chan struct{}),
+ }
+
+ ticker := time.NewTicker(10 * time.Second)
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ indexer.checkAvailability()
+ case <-indexer.stopTimer:
+ ticker.Stop()
+ return
+ }
+ }
+ }()
+
+ return indexer, nil
}
const (
@@ -93,10 +117,10 @@ const (
// Init will initialize the indexer
func (b *ElasticSearchIndexer) Init() (bool, error) {
- ctx := context.Background()
+ ctx := graceful.GetManager().HammerContext()
exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !exists {
@@ -104,7 +128,7 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !createIndex.Acknowledged {
return false, errors.New("init failed")
@@ -115,6 +139,20 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
return true, nil
}
+// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
+func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ b.availabilityCallback = callback
+}
+
+// Ping checks if elastic is available
+func (b *ElasticSearchIndexer) Ping() bool {
+ b.lock.RLock()
+ defer b.lock.RUnlock()
+ return b.available
+}
+
// Index will save the index data
func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
if len(issues) == 0 {
@@ -131,8 +169,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
"content": issue.Content,
"comments": issue.Comments,
}).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
reqs := make([]elastic.BulkableRequest, 0)
@@ -154,8 +192,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
// Delete deletes indexes by ids
@@ -166,8 +204,8 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
_, err := b.client.Delete().
Index(b.indexerName).
Id(fmt.Sprintf("%d", ids[0])).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
reqs := make([]elastic.BulkableRequest, 0)
@@ -182,13 +220,13 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
// Search searches for issues by given conditions.
// Returns the matching issue IDs
-func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+func (b *ElasticSearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
query := elastic.NewBoolQuery()
query = query.Must(kwQuery)
@@ -205,9 +243,9 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
Query(query).
Sort("_score", false).
From(start).Size(limit).
- Do(context.Background())
+ Do(ctx)
if err != nil {
- return nil, err
+ return nil, b.checkError(err)
}
hits := make([]Match, 0, limit)
@@ -225,4 +263,51 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
}
// Close implements indexer
-func (b *ElasticSearchIndexer) Close() {}
+func (b *ElasticSearchIndexer) Close() {
+ select {
+ case <-b.stopTimer:
+ default:
+ close(b.stopTimer)
+ }
+}
+
+func (b *ElasticSearchIndexer) checkError(err error) error {
+ var opErr *net.OpError
+ if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
+ return err
+ }
+
+ b.setAvailability(false)
+
+ return err
+}
+
+func (b *ElasticSearchIndexer) checkAvailability() {
+ if b.Ping() {
+ return
+ }
+
+ // Request cluster state to check if elastic is available again
+ _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
+ if err != nil {
+ b.setAvailability(false)
+ return
+ }
+
+ b.setAvailability(true)
+}
+
+func (b *ElasticSearchIndexer) setAvailability(available bool) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ if b.available == available {
+ return
+ }
+
+ b.available = available
+ if b.availabilityCallback != nil {
+ // Call the callback from within the lock to ensure that the ordering remains correct
+ b.availabilityCallback(b.available)
+ }
+}
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 729981ec71..3aaa27eed2 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -47,9 +47,11 @@ type SearchResult struct {
// Indexer defines an interface to indexer issues contents
type Indexer interface {
Init() (bool, error)
+ Ping() bool
+ SetAvailabilityChangeCallback(callback func(bool))
Index(issue []*IndexerData) error
Delete(ids ...int64) error
- Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
+ Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
Close()
}
@@ -111,6 +113,7 @@ func InitIssueIndexer(syncReindex bool) {
}
iData := make([]*IndexerData, 0, len(data))
+ unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
@@ -119,13 +122,34 @@ func InitIssueIndexer(syncReindex bool) {
}
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
if indexerData.IsDelete {
- _ = indexer.Delete(indexerData.IDs...)
+ if err := indexer.Delete(indexerData.IDs...); err != nil {
+ log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
+ if indexer.Ping() {
+ continue
+ }
+ // Add back to queue
+ unhandled = append(unhandled, datum)
+ }
continue
}
iData = append(iData, indexerData)
}
+ if len(unhandled) > 0 {
+ for _, indexerData := range iData {
+ unhandled = append(unhandled, indexerData)
+ }
+ return unhandled
+ }
if err := indexer.Index(iData); err != nil {
log.Error("Error whilst indexing: %v Error: %v", iData, err)
+ if indexer.Ping() {
+ return nil
+ }
+ // Add back to queue
+ for _, indexerData := range iData {
+ unhandled = append(unhandled, indexerData)
+ }
+ return unhandled
}
return nil
}
@@ -193,6 +217,18 @@ func InitIssueIndexer(syncReindex bool) {
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
+ if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
+ holder.get().SetAvailabilityChangeCallback(func(available bool) {
+ if !available {
+ log.Info("Issue index queue paused")
+ queue.Pause()
+ } else {
+ log.Info("Issue index queue resumed")
+ queue.Resume()
+ }
+ })
+ }
+
// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
@@ -334,7 +370,7 @@ func DeleteRepoIssueIndexer(repo *repo_model.Repository) {
// SearchIssuesByKeyword search issue ids by keywords and repo id
// WARNNING: You have to ensure user have permission to visit repoIDs' issues
-func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
+func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
indexer := holder.get()
@@ -342,7 +378,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
log.Error("SearchIssuesByKeyword(): unable to get indexer!")
return nil, fmt.Errorf("unable to get issue indexer")
}
- res, err := indexer.Search(keyword, repoIDs, 50, 0)
+ res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0)
if err != nil {
return nil, err
}
@@ -351,3 +387,14 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
}
return issueIDs, nil
}
+
+// IsAvailable checks if issue indexer is available
+func IsAvailable() bool {
+ indexer := holder.get()
+ if indexer == nil {
+ log.Error("IsAvailable(): unable to get indexer!")
+ return false
+ }
+
+ return indexer.Ping()
+}
diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go
index ee6ebcdd18..d516615b56 100644
--- a/modules/indexer/issues/indexer_test.go
+++ b/modules/indexer/issues/indexer_test.go
@@ -5,6 +5,7 @@
package issues
import (
+ "context"
"os"
"path"
"path/filepath"
@@ -56,19 +57,19 @@ func TestBleveSearchIssues(t *testing.T) {
time.Sleep(5 * time.Second)
- ids, err := SearchIssuesByKeyword([]int64{1}, "issue2")
+ ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2")
assert.NoError(t, err)
assert.EqualValues(t, []int64{2}, ids)
- ids, err = SearchIssuesByKeyword([]int64{1}, "first")
+ ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
- ids, err = SearchIssuesByKeyword([]int64{1}, "for")
+ ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for")
assert.NoError(t, err)
assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids)
- ids, err = SearchIssuesByKeyword([]int64{1}, "good")
+ ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
}
@@ -79,19 +80,19 @@ func TestDBSearchIssues(t *testing.T) {
setting.Indexer.IssueType = "db"
InitIssueIndexer(true)
- ids, err := SearchIssuesByKeyword([]int64{1}, "issue2")
+ ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2")
assert.NoError(t, err)
assert.EqualValues(t, []int64{2}, ids)
- ids, err = SearchIssuesByKeyword([]int64{1}, "first")
+ ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
- ids, err = SearchIssuesByKeyword([]int64{1}, "for")
+ ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for")
assert.NoError(t, err)
assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids)
- ids, err = SearchIssuesByKeyword([]int64{1}, "good")
+ ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
}