@@ -254,7 +254,7 @@ func (p *Websocket) receive(ctx *fasthttp.RequestCtx) {
254254 return
255255 }
256256
257- if err := p .upgrader .Upgrade (ctx , p .websocket ); err != nil {
257+ if err := p .upgrader .Upgrade (ctx , p .websocket ( ctx ) ); err != nil {
258258 ctx .Response .SetStatusCode (fasthttp .StatusInternalServerError )
259259 ctx .SetConnectionClose ()
260260 l .Error ("Failed to upgrade to websocket" ,
@@ -264,44 +264,55 @@ func (p *Websocket) receive(ctx *fasthttp.RequestCtx) {
264264 }
265265}
266266
267- func (p * Websocket ) websocket (frontend * websocket.Conn ) {
268- defer frontend .Close ()
267+ func (p * Websocket ) websocket (ctx * fasthttp.RequestCtx ) func (frontend * websocket.Conn ) {
268+ return func (frontend * websocket.Conn ) {
269+ defer frontend .Close ()
269270
270- l := p .logger .With (
271- zap .String ("remote_addr" , frontend .RemoteAddr ().String ()),
272- )
271+ l := p .logger .With (
272+ zap .String ("remote_addr" , frontend .RemoteAddr ().String ()),
273+ zap .Uint64 ("connection_id" , ctx .ConnID ()),
274+ )
275+
276+ backend , _ , err := p .backend .Dial (p .cfg .proxy .BackendURL , nil )
277+ if err != nil {
278+ l .Error ("Failed to proxy the websocket stream" ,
279+ zap .Error (err ),
280+ )
281+ return
282+ }
283+ defer backend .Close ()
284+
285+ addr := frontend .NetConn ().RemoteAddr ().String ()
286+ pump := newWebsocketPump (p .cfg , frontend , backend , l )
287+
288+ p .mxConnections .Lock ()
289+ p .pumps [addr ] = pump
290+ p .mxConnections .Unlock ()
291+
292+ reason := pump .run ()
293+ if reason != nil {
294+ metrics .ProxyFailureCount .Add (context .TODO (), 1 , otelapi .WithAttributes (
295+ attribute.KeyValue {Key : "proxy" , Value : attribute .StringValue (p .cfg .name )},
296+ ))
297+ l .Error ("Websocket connection failed" ,
298+ zap .Error (reason ),
299+ )
300+ }
273301
274- backend , _ , err := p .backend .Dial (p .cfg .proxy .BackendURL , nil )
275- if err != nil {
276- l .Error ("Failed to proxy the websocket stream" ,
302+ err = p .closeWebsocket (pump .backend , reason )
303+ l .Info ("Closed backend connection" ,
277304 zap .Error (err ),
278305 )
279- return
280- }
281- defer backend .Close ()
282-
283- addr := frontend .NetConn ().RemoteAddr ().String ()
284- pump := newWebsocketPump (p .cfg , frontend , backend , l )
285306
286- p .mxConnections .Lock ()
287- p .pumps [addr ] = pump
288- p .mxConnections .Unlock ()
289-
290- reason := pump .run ()
291- if reason != nil {
292- metrics .ProxyFailureCount .Add (context .TODO (), 1 , otelapi .WithAttributes (
293- attribute.KeyValue {Key : "proxy" , Value : attribute .StringValue (p .cfg .name )},
294- ))
295- l .Error ("Websocket connection failed" ,
296- zap .Error (reason ),
307+ err = p .closeWebsocket (pump .frontend , reason )
308+ l .Info ("Closed frontend connection" ,
309+ zap .Error (err ),
297310 )
298- }
299- _ = p .closeWebsocket (pump .backend , reason )
300- _ = p .closeWebsocket (pump .frontend , reason )
301311
302- p .mxConnections .Lock ()
303- delete (p .pumps , addr )
304- p .mxConnections .Unlock ()
312+ p .mxConnections .Lock ()
313+ delete (p .pumps , addr )
314+ p .mxConnections .Unlock ()
315+ }
305316}
306317
307318func (p * Websocket ) upstreamConnectionChanged (conn net.Conn , state fasthttp.ConnState ) {
0 commit comments