blob: 102e79e5416dd8d2ec8b92b8a809869cd0439081 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package queue
import (
"context"
"time"
)
var pushBlockTime = 5 * time.Second
type baseQueue interface {
PushItem(ctx context.Context, data []byte) error
PopItem(ctx context.Context) ([]byte, error)
HasItem(ctx context.Context, data []byte) (bool, error)
Len(ctx context.Context) (int, error)
Close() error
RemoveAll(ctx context.Context) error
}
func popItemByChan(ctx context.Context, popItemFn func(ctx context.Context) ([]byte, error)) (chanItem chan []byte, chanErr chan error) {
chanItem = make(chan []byte)
chanErr = make(chan error)
go func() {
for {
it, err := popItemFn(ctx)
if err != nil {
close(chanItem)
chanErr <- err
return
}
if it == nil {
close(chanItem)
close(chanErr)
return
}
chanItem <- it
}
}()
return chanItem, chanErr
}
|