WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 48120ab

Browse files
committed
Unified scheduler
1 parent 6e837a1 commit 48120ab

16 files changed

+424
-572
lines changed

.golangci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ linters:
66
- (*github.com/bsm/feedx.Writer).Discard
77
- (*github.com/bsm/feedx.Reader).Close
88
- (github.com/bsm/feedx.Consumer).Close
9+
- (*github.com/bsm/feedx.IncrementalProducer).Close
910
- (*github.com/bsm/feedx.Producer).Close
11+
- (*github.com/bsm/bfs.InMem).Close
1012
- (*github.com/bsm/bfs.Object).Close
1113
- (github.com/bsm/bfs.Writer).Discard

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ GEM
4949
rubocop-ast (>= 1.47.1, < 2.0)
5050
ruby-progressbar (~> 1.7)
5151
unicode-display_width (>= 2.4.0, < 4.0)
52-
rubocop-ast (1.47.1)
52+
rubocop-ast (1.48.0)
5353
parser (>= 3.3.7.2)
5454
prism (~> 1.4)
5555
rubocop-rake (0.7.1)

consumer.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,6 @@ import (
88
"github.com/bsm/bfs"
99
)
1010

11-
// ConsumeStatus is returned by Consumer instances.
12-
type ConsumeStatus struct {
13-
// Skipped indicates the the sync was skipped, because there were no new changes.
14-
Skipped bool
15-
// Version indicates the current version of the remote feed.
16-
Version int64
17-
// PreviousVersion indicates the last known version to the consumer before the sync.
18-
PreviousVersion int64
19-
// NumRead returns the number of items read.
20-
NumRead int64
21-
}
22-
2311
// ConsumeFunc is a callback invoked by consumers.
2412
type ConsumeFunc func(context.Context, *Reader) error
2513

@@ -28,7 +16,7 @@ type ConsumeFunc func(context.Context, *Reader) error
2816
type Consumer interface {
2917
// Consume initiates a sync attempt. It will consume the remote feed only if it has changed since
3018
// last invocation.
31-
Consume(context.Context, *ReaderOptions, ConsumeFunc) (*ConsumeStatus, error)
19+
Consume(context.Context, *ReaderOptions, ConsumeFunc) (*Status, error)
3220

3321
// Version indicates the most recently consumed version.
3422
Version() int64
@@ -86,21 +74,21 @@ type consumer struct {
8674
}
8775

8876
// Consume implements Consumer interface.
89-
func (c *consumer) Consume(ctx context.Context, opt *ReaderOptions, fn ConsumeFunc) (*ConsumeStatus, error) {
90-
prevVersion := c.Version()
91-
status := ConsumeStatus{
92-
PreviousVersion: prevVersion,
77+
func (c *consumer) Consume(ctx context.Context, opt *ReaderOptions, fn ConsumeFunc) (*Status, error) {
78+
localVersion := c.Version()
79+
status := Status{
80+
LocalVersion: localVersion,
9381
}
9482

9583
// retrieve remote mtime
96-
version, err := fetchRemoteVersion(ctx, c.remote)
84+
remoteVersion, err := fetchRemoteVersion(ctx, c.remote)
9785
if err != nil {
9886
return nil, err
9987
}
100-
status.Version = version
88+
status.RemoteVersion = remoteVersion
10189

10290
// skip sync unless modified
103-
if prevVersion > 0 && prevVersion == version {
91+
if skipSync(remoteVersion, localVersion) {
10492
status.Skipped = true
10593
return &status, nil
10694
}
@@ -122,8 +110,8 @@ func (c *consumer) Consume(ctx context.Context, opt *ReaderOptions, fn ConsumeFu
122110
return nil, err
123111
}
124112

125-
status.NumRead = reader.NumRead()
126-
c.version.Store(version)
113+
status.NumItems = reader.NumRead()
114+
c.version.Store(remoteVersion)
127115
return &status, nil
128116
}
129117

consumer_test.go

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,66 +12,66 @@ import (
1212

1313
func TestConsumer(t *testing.T) {
1414
t.Run("consumes", func(t *testing.T) {
15-
csm := fixConsumer(t, 33)
15+
csm := fixConsumer(t, 101)
1616
defer csm.Close()
1717

1818
if exp, got := int64(0), csm.Version(); exp != got {
1919
t.Errorf("expected %v, got %v", exp, got)
2020
}
2121

2222
// first attempt
23-
msgs := testConsume(t, csm, &feedx.ConsumeStatus{
24-
PreviousVersion: 0,
25-
Version: 33,
26-
Skipped: false,
27-
NumRead: 2,
23+
msgs := testConsume(t, csm, &feedx.Status{
24+
LocalVersion: 0,
25+
RemoteVersion: 101,
26+
Skipped: false,
27+
NumItems: 2,
2828
})
29-
if exp, got := int64(33), csm.Version(); exp != got {
29+
if exp, got := int64(101), csm.Version(); exp != got {
3030
t.Errorf("expected %v, got %v", exp, got)
3131
}
3232
if exp, got := 2, len(msgs); exp != got {
3333
t.Errorf("expected %v, got %v", exp, got)
3434
}
3535

3636
// second attempt
37-
_ = testConsume(t, csm, &feedx.ConsumeStatus{
38-
PreviousVersion: 33,
39-
Version: 33,
40-
Skipped: true,
41-
NumRead: 0,
37+
_ = testConsume(t, csm, &feedx.Status{
38+
LocalVersion: 101,
39+
RemoteVersion: 101,
40+
Skipped: true,
41+
NumItems: 0,
4242
})
4343
})
4444

4545
t.Run("always if no version", func(t *testing.T) {
4646
csm := fixConsumer(t, 0)
4747
defer csm.Close()
4848

49-
testConsume(t, csm, &feedx.ConsumeStatus{NumRead: 2})
50-
testConsume(t, csm, &feedx.ConsumeStatus{NumRead: 2})
49+
testConsume(t, csm, &feedx.Status{NumItems: 2})
50+
testConsume(t, csm, &feedx.Status{NumItems: 2})
5151
})
5252

5353
t.Run("incremental", func(t *testing.T) {
54-
csm := fixIncrementalConsumer(t, 33)
54+
csm := fixIncrementalConsumer(t, 101)
5555
defer csm.Close()
5656

5757
// first attempt
58-
msgs := testConsume(t, csm, &feedx.ConsumeStatus{
59-
PreviousVersion: 0,
60-
Version: 33,
61-
NumRead: 4,
58+
msgs := testConsume(t, csm, &feedx.Status{
59+
LocalVersion: 0,
60+
RemoteVersion: 101,
61+
NumItems: 4,
6262
})
63-
if exp, got := int64(33), csm.Version(); exp != got {
63+
if exp, got := int64(101), csm.Version(); exp != got {
6464
t.Errorf("expected %v, got %v", exp, got)
6565
}
6666
if exp, got := 4, len(msgs); exp != got {
6767
t.Errorf("expected %v, got %v", exp, got)
6868
}
6969

7070
// second attempt
71-
_ = testConsume(t, csm, &feedx.ConsumeStatus{
72-
PreviousVersion: 33,
73-
Version: 33,
74-
Skipped: true,
71+
_ = testConsume(t, csm, &feedx.Status{
72+
LocalVersion: 101,
73+
RemoteVersion: 101,
74+
Skipped: true,
7575
})
7676
})
7777

@@ -81,9 +81,12 @@ func fixConsumer(t *testing.T, version int64) feedx.Consumer {
8181
t.Helper()
8282

8383
obj := bfs.NewInMemObject("path/to/file.json")
84+
t.Cleanup(func() { _ = obj.Close() })
85+
8486
if err := writeN(obj, 2, version); err != nil {
8587
t.Fatal("unexpected error", err)
8688
}
89+
8790
csm := feedx.NewConsumerForRemote(obj)
8891
t.Cleanup(func() { _ = csm.Close() })
8992

@@ -128,7 +131,7 @@ func fixIncrementalConsumer(t *testing.T, version int64) feedx.Consumer {
128131
return csm
129132
}
130133

131-
func testConsume(t *testing.T, csm feedx.Consumer, exp *feedx.ConsumeStatus) (msgs []*testdata.MockMessage) {
134+
func testConsume(t *testing.T, csm feedx.Consumer, exp *feedx.Status) (msgs []*testdata.MockMessage) {
132135
t.Helper()
133136

134137
status, err := csm.Consume(t.Context(), nil, func(ctx context.Context, r *feedx.Reader) (err error) {

example_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func ExampleConsumer() {
5959
panic(err)
6060
}
6161

62-
fmt.Printf("STATUS skipped:%v version:%v read:%v\n", status.Skipped, status.Version, status.NumRead)
62+
fmt.Printf("STATUS skipped:%v version:%v read:%v\n", status.Skipped, status.RemoteVersion, status.NumItems)
6363
fmt.Printf("DATA %v\n", todos)
6464

6565
// Output:
@@ -80,21 +80,21 @@ func ExampleScheduler() {
8080

8181
job := feedx.Every(time.Hour).
8282
WithContext(ctx).
83-
BeforeConsume(func() bool {
84-
fmt.Println("[H] BeforeConsume")
83+
BeforeSync(func() bool {
84+
fmt.Println("[H] Before sync")
8585
return true
8686
}).
87-
AfterConsume(func(_ *feedx.ConsumeStatus, err error) {
88-
fmt.Printf("[H] AfterConsume - error:%v", err)
87+
AfterSync(func(_ *feedx.Status, err error) {
88+
fmt.Printf("[H] After sync - error:%v", err)
8989
}).
9090
Consume(csm, func(_ context.Context, _ *feedx.Reader) error {
9191
fmt.Println("[*] Consuming feed")
9292
return nil
9393
})
94-
defer job.Stop()
94+
job.Stop()
9595

9696
// Output:
97-
// [H] BeforeConsume
97+
// [H] Before sync
9898
// [*] Consuming feed
99-
// [H] AfterConsume - error:<nil>
99+
// [H] After sync - error:<nil>
100100
}

feedx.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"strconv"
7-
"time"
87

98
"github.com/bsm/bfs"
109
)
@@ -26,13 +25,18 @@ func fetchRemoteVersion(ctx context.Context, obj *bfs.Object) (int64, error) {
2625
return version, nil
2726
}
2827

29-
func epochToTime(epoch int64) time.Time {
30-
return time.Unix(epoch/1000, epoch%1000*1e6)
28+
// Status is returned by sync processes.
29+
type Status struct {
30+
// Skipped indicates the the sync was skipped, because there were no new changes.
31+
Skipped bool
32+
// LocalVersion indicates the local version before sync.
33+
LocalVersion int64
34+
// RemoteVersion indicates the remote version before sync.
35+
RemoteVersion int64
36+
// NumItems returns the number of items processed, either read of written.
37+
NumItems int64
3138
}
3239

33-
func timeToEpoch(t time.Time) int64 {
34-
if n := t.Unix()*1000 + int64(t.Nanosecond()/1e6); n > 0 {
35-
return n
36-
}
37-
return 0
40+
func skipSync(srcVersion, targetVersion int64) bool {
41+
return (srcVersion != 0 || targetVersion != 0) && srcVersion <= targetVersion
3842
}

0 commit comments

Comments
 (0)