WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit ee80da3

Browse files
committed
Add WIP WebhookForwarder
1 parent d5ba3dd commit ee80da3

File tree

5 files changed

+284
-115
lines changed

5 files changed

+284
-115
lines changed

cmd/hubproxy/main.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,24 @@ func run() error {
205205
// Create webhook handler
206206
webhookHandler := webhook.NewHandler(webhook.Options{
207207
Secret: viper.GetString("webhook-secret"),
208-
TargetURL: targetURL,
209-
HTTPClient: webhookHTTPClient,
210208
Logger: logger,
211209
Store: store,
212210
ValidateIP: viper.GetBool("validate-ip"),
213211
MetricsCollector: metricsCollector,
214212
})
215213

214+
// Forwarder requires target URL be set
215+
if targetURL != "" {
216+
webhookForwarder := webhook.NewWebhookForwarder(webhook.WebhookForwarderOptions{
217+
TargetURL: targetURL,
218+
HTTPClient: webhookHTTPClient,
219+
Storage: store,
220+
MetricsCollector: metricsCollector,
221+
Logger: logger,
222+
})
223+
go webhookForwarder.StartForwarder(ctx)
224+
}
225+
216226
// Create webhook server
217227
var webhookLn net.Listener
218228
webhookRouter := chi.NewRouter()

internal/integration/tailscale_test.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package integration
22

33
import (
4+
"context"
45
"crypto/hmac"
56
"crypto/sha256"
67
"encoding/hex"
@@ -52,23 +53,35 @@ func TestTailscaleIntegration(t *testing.T) {
5253

5354
const secret = "test-secret"
5455

55-
// Create webhook handler
56+
// Create context
57+
ctx, cancel := context.WithCancel(context.Background())
58+
defer cancel()
59+
60+
// Create webhook handler and forwarder
5661
handler := webhook.NewHandler(webhook.Options{
5762
Secret: secret,
58-
TargetURL: targetServer.URL,
59-
Store: store,
6063
Logger: logger,
61-
ValidateIP: false,
64+
Store: store,
65+
MetricsCollector: storage.NewDBMetricsCollector(store, logger),
66+
})
67+
68+
forwarder := webhook.NewWebhookForwarder(webhook.WebhookForwarderOptions{
69+
TargetURL: targetServer.URL,
70+
Storage: store,
6271
MetricsCollector: storage.NewDBMetricsCollector(store, logger),
72+
Logger: logger,
6373
})
6474

65-
// Create HTTP mux
66-
mux := http.NewServeMux()
67-
mux.Handle("/", handler)
75+
// Start the forwarder
76+
go forwarder.StartForwarder(ctx)
77+
78+
// Create test server with webhook handler
79+
server := httptest.NewServer(handler)
80+
defer server.Close()
6881

6982
t.Run("Regular server", func(t *testing.T) {
7083
// Test regular HTTP server
71-
server := httptest.NewServer(mux)
84+
server := httptest.NewServer(handler)
7285
defer server.Close()
7386

7487
// Send a test webhook
@@ -107,7 +120,7 @@ func TestTailscaleIntegration(t *testing.T) {
107120
require.NoError(t, err)
108121

109122
// Start server
110-
go http.Serve(ln, mux)
123+
go http.Serve(ln, handler)
111124

112125
// Wait for server to be ready
113126
time.Sleep(2 * time.Second)

internal/integration/webhook_test.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,24 @@ func TestWebhookIntegration(t *testing.T) {
5353
metricsCollector := storage.NewDBMetricsCollector(store, logger)
5454
metricsCollector.StartMetricsCollection(ctx, time.Second)
5555

56-
// Create webhook handler
56+
// Create webhook handler and forwarder
5757
handler := webhook.NewHandler(webhook.Options{
5858
Secret: secret,
59-
TargetURL: ts.URL,
6059
Logger: logger,
6160
Store: store,
6261
MetricsCollector: metricsCollector,
6362
})
6463

64+
forwarder := webhook.NewWebhookForwarder(webhook.WebhookForwarderOptions{
65+
TargetURL: ts.URL,
66+
Storage: store,
67+
MetricsCollector: metricsCollector,
68+
Logger: logger,
69+
})
70+
71+
// Start the forwarder
72+
go forwarder.StartForwarder(ctx)
73+
6574
// Create test server with webhook handler
6675
server := httptest.NewServer(handler)
6776
defer server.Close()
@@ -211,15 +220,24 @@ func TestWebhookUnixSocket(t *testing.T) {
211220
metricsCollector := storage.NewDBMetricsCollector(store, logger)
212221
metricsCollector.StartMetricsCollection(ctx, time.Second)
213222

214-
// Create webhook handler with Unix socket target
223+
// Create webhook handler and forwarder
215224
handler := webhook.NewHandler(webhook.Options{
216225
Secret: secret,
217-
TargetURL: "unix://" + socketPath,
218226
Logger: logger,
219227
Store: store,
220228
MetricsCollector: metricsCollector,
221229
})
222230

231+
forwarder := webhook.NewWebhookForwarder(webhook.WebhookForwarderOptions{
232+
TargetURL: "unix://" + socketPath,
233+
Storage: store,
234+
MetricsCollector: metricsCollector,
235+
Logger: logger,
236+
})
237+
238+
// Start the forwarder
239+
go forwarder.StartForwarder(ctx)
240+
223241
// Create test server with webhook handler
224242
server := httptest.NewServer(handler)
225243
defer server.Close()
@@ -246,6 +264,18 @@ func TestWebhookUnixSocket(t *testing.T) {
246264

247265
assert.Equal(t, http.StatusOK, resp.StatusCode)
248266

267+
// Wait a bit for the event to be stored
268+
time.Sleep(100 * time.Millisecond)
269+
270+
// Get the event from storage to verify it was stored
271+
events, _, err := store.ListEvents(ctx, storage.QueryOptions{})
272+
require.NoError(t, err)
273+
require.Len(t, events, 1)
274+
275+
// Trigger event processing
276+
err = forwarder.ProcessEvents(ctx)
277+
require.NoError(t, err)
278+
249279
// Wait for forwarded request
250280
select {
251281
case receivedData := <-receivedCh:

internal/webhook/forwarder.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package webhook
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log/slog"
8+
"net"
9+
"net/http"
10+
"strings"
11+
12+
"hubproxy/internal/storage"
13+
14+
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promauto"
16+
)
17+
18+
var (
19+
webhookForwardedEvents = promauto.NewCounter(
20+
prometheus.CounterOpts{
21+
Name: "hubproxy_webhook_forwarded_events_total",
22+
Help: "Total number of webhook events forwarded to the target",
23+
},
24+
)
25+
26+
webhookForwardingErrors = promauto.NewCounter(
27+
prometheus.CounterOpts{
28+
Name: "hubproxy_webhook_forwarding_errors_total",
29+
Help: "Total number of webhook forwarding errors",
30+
},
31+
)
32+
)
33+
34+
type WebhookForwarder struct {
35+
storage storage.Storage
36+
metricsCollector *storage.DBMetricsCollector
37+
httpClient *http.Client
38+
targetURL string
39+
logger *slog.Logger
40+
queue chan struct{}
41+
}
42+
43+
type WebhookForwarderOptions struct {
44+
Storage storage.Storage
45+
MetricsCollector *storage.DBMetricsCollector
46+
HTTPClient *http.Client
47+
TargetURL string
48+
Logger *slog.Logger
49+
}
50+
51+
func NewWebhookForwarder(opts WebhookForwarderOptions) *WebhookForwarder {
52+
if opts.TargetURL == "" {
53+
panic("target URL is required")
54+
}
55+
if opts.Storage == nil {
56+
panic("storage is required")
57+
}
58+
if opts.Logger == nil {
59+
opts.Logger = slog.Default()
60+
}
61+
62+
httpClient := opts.HTTPClient
63+
64+
// Swap out HTTP client to use Unix socket
65+
if strings.HasPrefix(opts.TargetURL, "unix://") {
66+
socketPath := strings.TrimPrefix(opts.TargetURL, "unix://")
67+
httpClient = &http.Client{
68+
Transport: &http.Transport{
69+
DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
70+
return net.Dial("unix", socketPath)
71+
},
72+
},
73+
}
74+
}
75+
76+
// Use default HTTP client if not provided
77+
if httpClient == nil {
78+
httpClient = &http.Client{}
79+
}
80+
81+
return &WebhookForwarder{
82+
targetURL: opts.TargetURL,
83+
httpClient: httpClient,
84+
storage: opts.Storage,
85+
metricsCollector: opts.MetricsCollector,
86+
logger: opts.Logger,
87+
queue: make(chan struct{}, 1), // Buffer size 1 to allow one pending job
88+
}
89+
}
90+
91+
// TargetURL returns the configured target URL
92+
func (f *WebhookForwarder) TargetURL() string {
93+
return f.targetURL
94+
}
95+
96+
func (f *WebhookForwarder) forwardEvent(ctx context.Context, event *storage.Event) {
97+
var targetURL string
98+
// http.NewRequest still needs a valid http URI, make a fake one for unix socket path
99+
if strings.HasPrefix(f.targetURL, "unix://") {
100+
targetURL = "http://127.0.0.1/webhook"
101+
} else {
102+
targetURL = f.targetURL
103+
}
104+
105+
req, err := http.NewRequest(http.MethodPost, targetURL, strings.NewReader(string(event.Payload)))
106+
if err != nil {
107+
webhookForwardingErrors.Inc()
108+
f.logger.Error("failed to create request", "targetURL", targetURL, "error", err)
109+
return
110+
}
111+
112+
var headers map[string][]string
113+
err = json.Unmarshal(event.Headers, &headers)
114+
if err != nil {
115+
webhookForwardingErrors.Inc()
116+
f.logger.Error("failed to parse headers", "error", err)
117+
return
118+
}
119+
120+
for name, values := range headers {
121+
for _, value := range values {
122+
req.Header.Add(name, value)
123+
}
124+
}
125+
126+
if req.Header.Get("Content-Type") != "application/json" {
127+
f.logger.Warn("Content-Type header is not application/json", "Content-Type", req.Header.Get("Content-Type"))
128+
}
129+
if req.Header.Get("X-Github-Event") == "" {
130+
f.logger.Warn("X-Github-Event header is not set", "X-Github-Event", req.Header.Get("X-Github-Event"))
131+
}
132+
if req.Header.Get("X-Github-Delivery") == "" {
133+
f.logger.Warn("X-Github-Delivery header is not set", "X-Github-Delivery", req.Header.Get("X-Github-Delivery"))
134+
}
135+
if req.Header.Get("X-Hub-Signature-256") == "" {
136+
f.logger.Warn("X-Hub-Signature-256 header is not set", "X-Hub-Signature-256", req.Header.Get("X-Hub-Signature-256"))
137+
}
138+
139+
resp, err := f.httpClient.Do(req)
140+
if err != nil {
141+
webhookForwardingErrors.Inc()
142+
f.logger.Error("failed to forward request", "targetURL", targetURL, "error", err)
143+
return
144+
}
145+
defer resp.Body.Close()
146+
147+
if resp.StatusCode >= 400 {
148+
webhookForwardingErrors.Inc()
149+
f.logger.Error("target returned error", "status", resp.Status, "targetURL", targetURL)
150+
return
151+
}
152+
153+
webhookForwardedEvents.Inc()
154+
155+
err = f.storage.MarkForwarded(ctx, event.ID)
156+
if err != nil {
157+
f.logger.Error("error marking event as forwarded", "error", err)
158+
}
159+
}
160+
161+
func (f *WebhookForwarder) ProcessEvents(ctx context.Context) error {
162+
// Don't ever create a WebhookForwarder if there's no target URL
163+
if f.targetURL == "" {
164+
panic("target URL is not set")
165+
}
166+
167+
f.logger.Debug("processing webhook events from database")
168+
169+
// TODO: Only select events WEHRE forwarded_at IS NULL
170+
events, _, err := f.storage.ListEvents(ctx, storage.QueryOptions{})
171+
if err != nil {
172+
return fmt.Errorf("listing events: %w", err)
173+
}
174+
175+
if len(events) == 0 {
176+
f.logger.Debug("no events to forward")
177+
return nil
178+
}
179+
180+
f.logger.Info("forwarding webhook events", "count", len(events))
181+
182+
for _, event := range events {
183+
f.forwardEvent(ctx, event)
184+
}
185+
186+
f.metricsCollector.EnqueueGatherMetrics(ctx)
187+
188+
return nil
189+
}
190+
191+
func (f *WebhookForwarder) EnqueueProcessEvents() {
192+
select {
193+
case f.queue <- struct{}{}:
194+
f.logger.Debug("enqueued webhook processing job")
195+
default:
196+
f.logger.Debug("webhook processing job already pending")
197+
}
198+
}
199+
200+
func (f *WebhookForwarder) StartForwarder(ctx context.Context) {
201+
go func() {
202+
for {
203+
select {
204+
case <-ctx.Done():
205+
f.logger.Debug("stopped webhook forwarder")
206+
return
207+
case <-f.queue:
208+
if err := f.ProcessEvents(ctx); err != nil {
209+
f.logger.Error("failed to process webhook events", "error", err)
210+
}
211+
}
212+
}
213+
}()
214+
215+
f.EnqueueProcessEvents()
216+
}

0 commit comments

Comments
 (0)