WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 45 additions & 55 deletions comp/core/tagger/impl-remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ const (
cacheExpiration = 1 * time.Minute
)

var (
errTaggerStreamNotStarted = errors.New("tagger stream not started")
)

// Requires defines the dependencies for the remote tagger.
type Requires struct {
compdef.In
Expand Down Expand Up @@ -466,6 +462,15 @@ func (t *remoteTagger) Subscribe(string, *types.Filter) (types.Subscription, err
}

func (t *remoteTagger) run() {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 500 * time.Millisecond
expBackoff.MaxInterval = 5 * time.Minute
expBackoff.MaxElapsedTime = noTimeout

// Use a timer to trigger the loop. Start immediately.
timer := time.NewTimer(0)
defer timer.Stop()

for {
select {
case <-t.telemetryTicker.C:
Expand All @@ -477,13 +482,15 @@ func (t *remoteTagger) run() {
t.streamCancel()
}
return
default:
case <-timer.C:
// Proceed to logic to process stream response
}

taggerStreamInitialized := false
if t.stream == nil {
if err := t.startTaggerStream(noTimeout); err != nil {
if err := t.startTaggerStream(); err != nil {
t.log.Warnf("error received trying to start stream with target %q: %s", t.options.Target, err)
timer.Reset(expBackoff.NextBackOff())
continue
}
taggerStreamInitialized = true
Expand All @@ -510,13 +517,23 @@ func (t *remoteTagger) run() {

t.log.Warnf("error received from remote tagger: %s", err)

// We need to backoff here because we might have established a
// stream but failed to receive anything (e.g. invalid auth token).
// In that case startTaggerStream succeeds immediately but Recv fails
// immediately, causing a tight loop.
timer.Reset(expBackoff.NextBackOff())
continue
}

if taggerStreamInitialized {
t.log.Info("tagger stream successfully initialized")
}

// If we successfully received a response,
// we can reset the backoff and continue immediately reading from the stream.
expBackoff.Reset()
timer.Reset(0)

t.telemetryStore.Receives.Inc()

err = t.processResponse(response)
Expand Down Expand Up @@ -572,59 +589,32 @@ func (t *remoteTagger) processResponse(response *pb.StreamTagsResponse) error {
}

// startTaggerStream tries to establish a stream with the remote gRPC endpoint.
// Since the entire remote tagger really depends on this working, it'll keep on
// retrying with an exponential backoff until maxElapsed (or forever if
// maxElapsed == 0) or the tagger is stopped.
func (t *remoteTagger) startTaggerStream(maxElapsed time.Duration) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 500 * time.Millisecond
expBackoff.MaxInterval = 5 * time.Minute
expBackoff.MaxElapsedTime = maxElapsed

var err error
timer := time.NewTimer(0) // immediate first attempt
defer timer.Stop()

for {
select {
case <-t.ctx.Done():
return errTaggerStreamNotStarted
case <-timer.C:
// Cancel any existing stream context before creating a new one
if t.streamCancel != nil {
t.streamCancel()
}
func (t *remoteTagger) startTaggerStream() (err error) {
// Cancel any existing stream context before creating a new one
if t.streamCancel != nil {
t.streamCancel()
}

t.streamCtx, t.streamCancel = context.WithCancel(
metadata.NewOutgoingContext(t.ctx, metadata.MD{
"authorization": []string{"Bearer " + t.authToken}, // TODO IPC: implement GRPC client
}),
)
var streamCtx context.Context
streamCtx, t.streamCancel = context.WithCancel(
metadata.NewOutgoingContext(t.ctx, metadata.MD{
"authorization": []string{"Bearer " + t.authToken}, // TODO IPC: implement GRPC client
}),
)
t.streamCtx = streamCtx

prefixes := make([]string, 0)
for prefix := range t.filter.GetPrefixes() {
prefixes = append(prefixes, string(prefix))
}
prefixes := make([]string, 0)
for prefix := range t.filter.GetPrefixes() {
prefixes = append(prefixes, string(prefix))
}

t.stream, err = t.client.TaggerStreamEntities(t.streamCtx, &pb.StreamTagsRequest{
Cardinality: pb.TagCardinality(t.filter.GetCardinality()),
StreamingID: fmt.Sprintf("%s:%s", flavor.GetFlavor(), uuid.New().String()),
Prefixes: prefixes,
})

if err != nil {
t.log.Debugf("unable to establish stream, will retry: %s", err)
nextBackoff := expBackoff.NextBackOff()
if nextBackoff == backoff.Stop {
return err
}
timer.Reset(nextBackoff)
continue
}
t.stream, err = t.client.TaggerStreamEntities(t.streamCtx, &pb.StreamTagsRequest{
Cardinality: pb.TagCardinality(t.filter.GetCardinality()),
StreamingID: fmt.Sprintf("%s:%s", flavor.GetFlavor(), uuid.New().String()),
Prefixes: prefixes,
})

return nil
}
}
return err
}

func (t *remoteTagger) writeList(w http.ResponseWriter, _ *http.Request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
fixes:
- |
Fixes remote tagger implementation to backoff when the stream
is initialized but receiving events through the stream fails.