From ce060142be03725398309912138c694acfa90625 Mon Sep 17 00:00:00 2001 From: "Neio.zhou" Date: Mon, 10 Jul 2017 06:02:24 +0000 Subject: [PATCH 01/15] Make sure Visual Studio 2012 can build the project. --- HttpTwo/Frames/Frame.cs | 4 ++-- HttpTwo/Frames/HeadersFrame.cs | 2 +- HttpTwo/Frames/PriorityFrame.cs | 2 +- HttpTwo/Frames/PushPromiseFrame.cs | 2 +- HttpTwo/Http2Connection.cs | 8 +++++--- HttpTwo/Http2Settings.cs | 18 +++++++++++++----- HttpTwo/Http2Stream.cs | 14 ++++++++++---- HttpTwo/Internal/FlowControlManager.cs | 7 +++++-- 8 files changed, 38 insertions(+), 19 deletions(-) diff --git a/HttpTwo/Frames/Frame.cs b/HttpTwo/Frames/Frame.cs index 468001f..409b17d 100644 --- a/HttpTwo/Frames/Frame.cs +++ b/HttpTwo/Frames/Frame.cs @@ -71,11 +71,11 @@ public uint Length { public abstract FrameType Type { get; } - public virtual byte Flags { get; protected set; } = 0x0; + public virtual byte Flags { get; protected set; } public virtual uint StreamIdentifier { get; set; } - public virtual IEnumerable Payload { get; } + public virtual IEnumerable Payload { get { return null; } } uint? payloadLength; public uint PayloadLength { diff --git a/HttpTwo/Frames/HeadersFrame.cs b/HttpTwo/Frames/HeadersFrame.cs index 6bcf15a..3c100b5 100644 --- a/HttpTwo/Frames/HeadersFrame.cs +++ b/HttpTwo/Frames/HeadersFrame.cs @@ -46,7 +46,7 @@ public ushort Weight { public bool EndHeaders { get;set; } public bool Priority { get;set; } public byte[] HeaderBlockFragment { get; set; } - public uint StreamDependency { get; set; } = 0; + public uint StreamDependency { get; set; } public override FrameType Type { get { return FrameType.Headers; } diff --git a/HttpTwo/Frames/PriorityFrame.cs b/HttpTwo/Frames/PriorityFrame.cs index f89eb5d..a8d7bb4 100644 --- a/HttpTwo/Frames/PriorityFrame.cs +++ b/HttpTwo/Frames/PriorityFrame.cs @@ -27,7 +27,7 @@ public ushort Weight { } } - public uint StreamDependency { get; set; } = 0; + public uint StreamDependency { get; set; } // type=0x1 public override FrameType Type { diff --git a/HttpTwo/Frames/PushPromiseFrame.cs b/HttpTwo/Frames/PushPromiseFrame.cs index 5e0e90a..2a19b43 100644 --- a/HttpTwo/Frames/PushPromiseFrame.cs +++ b/HttpTwo/Frames/PushPromiseFrame.cs @@ -30,7 +30,7 @@ public ushort PadLength { public byte[] HeaderBlockFragment { get;set; } - public uint StreamDependency { get; set; } = 0; + public uint StreamDependency { get; set; } public override FrameType Type { get { return FrameType.PushPromise; } diff --git a/HttpTwo/Http2Connection.cs b/HttpTwo/Http2Connection.cs index d7d940d..73a26ae 100644 --- a/HttpTwo/Http2Connection.cs +++ b/HttpTwo/Http2Connection.cs @@ -30,6 +30,8 @@ public Http2ConnectionSettings (string host, uint port = 80, bool useTls = false Port = port; UseTls = useTls; Certificates = certificates; + ConnectionTimeout = TimeSpan.FromSeconds (60); + DisablePushPromise = false; } public string Host { get; private set; } @@ -37,8 +39,8 @@ public Http2ConnectionSettings (string host, uint port = 80, bool useTls = false public bool UseTls { get; private set; } public X509CertificateCollection Certificates { get; private set; } - public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromSeconds (60); - public bool DisablePushPromise { get; set; } = false; + public TimeSpan ConnectionTimeout { get; set; } + public bool DisablePushPromise { get; set; } } public class Http2Connection @@ -128,7 +130,7 @@ await sslStream.AuthenticateAsClientAsync ( }, TaskContinuationOptions.OnlyOnFaulted).Forget (); // Start a thread to handle writing queued frames to the stream - var writeTask = Task.Factory.StartNew (write, TaskCreationOptions.LongRunning); + var writeTask = Task.Factory.StartNew(write, TaskCreationOptions.LongRunning); writeTask.ContinueWith (t => { // TODO: Handle the error Disconnect (); diff --git a/HttpTwo/Http2Settings.cs b/HttpTwo/Http2Settings.cs index f47a328..9fdfe23 100644 --- a/HttpTwo/Http2Settings.cs +++ b/HttpTwo/Http2Settings.cs @@ -8,20 +8,28 @@ public class Http2Settings public const uint DefaultMaxFrameSize = 16384; public const uint DefaultHeaderTableSize = 4096; + public Http2Settings() + { + InitialWindowSize = DefaultWindowSize; + MaxFrameSize = DefaultMaxFrameSize; + HeaderTableSize = DefaultHeaderTableSize; + EnablePush = true; + } + // 4096 is default (0x1 index) - public uint HeaderTableSize { get; set; } = DefaultHeaderTableSize; + public uint HeaderTableSize { get; set; } // 1 is default (true) (0x2 index) - public bool EnablePush { get;set; } = true; + public bool EnablePush { get;set; } // no limit initially (0x3 index) - public uint? MaxConcurrentStreams { get;set; } + public uint? MaxConcurrentStreams { get; set; } // 65,535 is default (0x4 index) - public uint InitialWindowSize { get;set; } = DefaultWindowSize; + public uint InitialWindowSize { get;set; } // 16,384 is default (0x5 index) - public uint MaxFrameSize { get;set; } = DefaultMaxFrameSize; + public uint MaxFrameSize { get;set; } // no limit initially (0x6 index) public uint? MaxHeaderListSize { get;set; } diff --git a/HttpTwo/Http2Stream.cs b/HttpTwo/Http2Stream.cs index c8f13b2..21158f6 100644 --- a/HttpTwo/Http2Stream.cs +++ b/HttpTwo/Http2Stream.cs @@ -73,8 +73,11 @@ public void ProcessReceivedFrames (IFrame frame) flowControlStateManager.IncreaseWindowSize (StreamIdentifer, windowUpdateFrame.WindowSizeIncrement); } - // Raise the event - OnFrameReceived?.Invoke (frame); + // Raise the event + if (OnFrameReceived != null) + { + OnFrameReceived.Invoke(frame); + } } public void ProcessSentFrame (IFrame frame) @@ -120,8 +123,11 @@ public void ProcessSentFrame (IFrame frame) flowControlStateManager.DecreaseWindowSize (frame.StreamIdentifier, windowDecrement); } - // Raise the event - OnFrameSent?.Invoke (frame); + // Raise the event + if (OnFrameSent != null) + { + OnFrameSent.Invoke(frame); + } } public delegate void FrameReceivedDelegate (IFrame frame); diff --git a/HttpTwo/Internal/FlowControlManager.cs b/HttpTwo/Internal/FlowControlManager.cs index d4cf6f4..27c07f4 100644 --- a/HttpTwo/Internal/FlowControlManager.cs +++ b/HttpTwo/Internal/FlowControlManager.cs @@ -57,8 +57,11 @@ public void IncreaseWindowSize (uint streamIdentifier, uint increaseByAmount) windowSizes [streamIdentifier] = newAmount; - // Fire the event - FlowControlWindowSizeIncreased?.Invoke (streamIdentifier, increaseByAmount); + // Fire the event + if (FlowControlWindowSizeIncreased != null) + { + FlowControlWindowSizeIncreased.Invoke(streamIdentifier, increaseByAmount); + } } } } From 8db04c1a4b7c6ccfcac6d153e00eab36c4108c53 Mon Sep 17 00:00:00 2001 From: "Neio.zhou" Date: Mon, 10 Jul 2017 06:50:50 +0000 Subject: [PATCH 02/15] Fix the usage of dynamic table for HPACK --- HttpTwo/Http2Client.cs | 617 ++++++++++++++++---------------- HttpTwo/Http2Connection.cs | 701 +++++++++++++++++++------------------ HttpTwo/Internal/Util.cs | 8 +- 3 files changed, 665 insertions(+), 661 deletions(-) diff --git a/HttpTwo/Http2Client.cs b/HttpTwo/Http2Client.cs index e109f6a..094a980 100644 --- a/HttpTwo/Http2Client.cs +++ b/HttpTwo/Http2Client.cs @@ -1,307 +1,310 @@ -using System; -using System.Collections.Generic; -using System.Collections.Specialized; -using System.IO; -using System.Linq; -using System.Net; -using System.Net.Http; -using System.Security.Cryptography.X509Certificates; -using System.Threading; -using System.Threading.Tasks; -using HttpTwo.Internal; - -namespace HttpTwo -{ - public class Http2Client - { - readonly Http2Connection connection; - readonly IStreamManager streamManager; - IFlowControlManager flowControlManager; - - public Http2Client (Uri uri, X509CertificateCollection certificates = null, IStreamManager streamManager = null, IFlowControlManager flowControlManager = null) - : this (new Http2ConnectionSettings (uri, certificates), streamManager, flowControlManager) - { - } - - public Http2Client (string url, X509CertificateCollection certificates = null, IStreamManager streamManager = null, IFlowControlManager flowControlManager = null) - : this (new Http2ConnectionSettings (url, certificates), streamManager, flowControlManager) - { - } - - public Http2Client (Http2ConnectionSettings connectionSettings, IStreamManager streamManager = null, IFlowControlManager flowControlManager = null) - { - this.flowControlManager = flowControlManager ?? new FlowControlManager (); - this.streamManager = streamManager ?? new StreamManager (this.flowControlManager); - this.ConnectionSettings = connectionSettings; - - connection = new Http2Connection (ConnectionSettings, this.streamManager, this.flowControlManager); - } - - public Http2ConnectionSettings ConnectionSettings { get; private set; } - - public IStreamManager StreamManager { get { return streamManager; } } - public IFlowControlManager FlowControlManager { get { return flowControlManager; } } - - public async Task Connect () - { - await connection.Connect ().ConfigureAwait (false); - } - - public async Task Post (Uri uri, NameValueCollection headers = null, byte[] data = null) - { - return await Send (uri, HttpMethod.Post, headers, data).ConfigureAwait (false); - } - - public async Task Post (Uri uri, NameValueCollection headers = null, Stream data = null) - { - return await Send (uri, HttpMethod.Post, headers, data).ConfigureAwait (false); - } - - public async Task Send (Uri uri, HttpMethod method, NameValueCollection headers = null, byte[] data = null) - { - MemoryStream ms = null; - - if (data != null) - ms = new MemoryStream (data); - - return await Send (new CancellationToken (), uri, method, headers, ms).ConfigureAwait (false); - } - - public async Task Send (Uri uri, HttpMethod method, NameValueCollection headers = null, Stream data = null) - { - return await Send (new CancellationToken (), uri, method, headers, data).ConfigureAwait (false); - } - - public async Task Send (CancellationToken cancelToken, Uri uri, HttpMethod method, NameValueCollection headers = null, Stream data = null) - { - var semaphoreClose = new SemaphoreSlim(0); - - await connection.Connect ().ConfigureAwait (false); - - var stream = await streamManager.Get ().ConfigureAwait (false); - stream.OnFrameReceived += async (frame) => - { - // Check for an end of stream state - if (stream.State == StreamState.HalfClosedRemote || stream.State == StreamState.Closed) - semaphoreClose.Release (); - }; - - var sentEndOfStream = false; - - var allHeaders = new NameValueCollection (); - allHeaders.Add (":method", method.Method.ToUpperInvariant ()); - allHeaders.Add (":path", uri.PathAndQuery); - allHeaders.Add (":scheme", uri.Scheme); - allHeaders.Add (":authority", uri.Authority); - if (headers != null && headers.Count > 0) - allHeaders.Add (headers); - - var headerData = Util.PackHeaders (allHeaders, connection.Settings.HeaderTableSize); - - var numFrames = (int)Math.Ceiling ((double)headerData.Length / (double)connection.Settings.MaxFrameSize); - - for (int i = 0; i < numFrames; i++) { - // First item is headers frame, others are continuation - IFrameContainsHeaders frame = (i == 0) ? - (IFrameContainsHeaders)new HeadersFrame (stream.StreamIdentifer) - : (IFrameContainsHeaders)new ContinuationFrame (stream.StreamIdentifer); - - // Set end flag if this is the last item - if (i == numFrames - 1) - frame.EndHeaders = true; - - var maxFrameSize = connection.Settings.MaxFrameSize; - - var amt = maxFrameSize; - if ( i * maxFrameSize + amt > headerData.Length) - amt = (uint)headerData.Length - (uint)(i * maxFrameSize); - frame.HeaderBlockFragment = new byte[amt]; - Array.Copy (headerData, i * maxFrameSize, frame.HeaderBlockFragment, 0, amt); - - // If we won't s end - if (data == null && frame is HeadersFrame) { - sentEndOfStream = true; - (frame as HeadersFrame).EndStream = true; - } - - await connection.QueueFrame (frame).ConfigureAwait (false); - } - - if (data != null) { - var supportsPosLength = true; // Keep track of if we threw exceptions trying pos/len of stream - - // Break stream up into data frames within allowed size - var dataFrameBuffer = new byte[connection.Settings.MaxFrameSize]; - while (true) { - - var rd = await data.ReadAsync (dataFrameBuffer, 0, dataFrameBuffer.Length).ConfigureAwait (false); - - if (rd <= 0) - break; - - // Make a new data frame with a buffer the size we read - var dataFrame = new DataFrame (stream.StreamIdentifer); - dataFrame.Data = new byte[rd]; - // Copy over the data we read - Array.Copy(dataFrameBuffer, 0, dataFrame.Data, 0, rd); - - try { - // See if the stream supports Length / Position to try and detect EOS - // we also want to see if we previously had an exception trying this - // and not try again if we did, since throwing exceptions every single - // read operation is wasteful - if (supportsPosLength && data.Position >= data.Length) { - dataFrame.EndStream = true; - sentEndOfStream = true; - } - } catch { - supportsPosLength = false; - sentEndOfStream = false; - } - - await connection.QueueFrame (dataFrame).ConfigureAwait (false); - } - } - - // Send an empty frame with end of stream flag - if (!sentEndOfStream) - await connection.QueueFrame(new DataFrame(stream.StreamIdentifer) { EndStream = true }).ConfigureAwait(false); - - if (!await semaphoreClose.WaitAsync (ConnectionSettings.ConnectionTimeout, cancelToken).ConfigureAwait (false)) - throw new TimeoutException (); - - var responseData = new List (); - var rxHeaderData = new List (); - - foreach (var f in stream.ReceivedFrames) { - if (f.Type == FrameType.Headers || f.Type == FrameType.Continuation) { - // Get the header data and add it to our buffer - var fch = (IFrameContainsHeaders)f; - if (fch.HeaderBlockFragment != null && fch.HeaderBlockFragment.Length > 0) - rxHeaderData.AddRange (fch.HeaderBlockFragment); - } else if (f.Type == FrameType.PushPromise) { - // TODO: In the future we need to implement PushPromise beyond grabbing header data - var fch = (IFrameContainsHeaders)f; - if (fch.HeaderBlockFragment != null && fch.HeaderBlockFragment.Length > 0) - rxHeaderData.AddRange (fch.HeaderBlockFragment); - } else if (f.Type == FrameType.Data) { - responseData.AddRange ((f as DataFrame).Data); - } else if (f.Type == FrameType.GoAway) { - var fga = f as GoAwayFrame; - if (fga != null && fga.AdditionalDebugData != null && fga.AdditionalDebugData.Length > 0) - responseData.AddRange (fga.AdditionalDebugData); - } - } - - var responseHeaders = Util.UnpackHeaders (rxHeaderData.ToArray (), - connection.Settings.MaxHeaderListSize.HasValue ? (int)connection.Settings.MaxHeaderListSize.Value : 8192, - (int)connection.Settings.HeaderTableSize); - - var strStatus = "500"; - if (responseHeaders [":status"] != null) - strStatus = responseHeaders [":status"]; - - var statusCode = HttpStatusCode.OK; - Enum.TryParse (strStatus, out statusCode); - - // Remove the stream from being tracked since we're done with it - await streamManager.Cleanup (stream.StreamIdentifer).ConfigureAwait (false); - - // Send a WINDOW_UPDATE frame to release our stream's data count - // TODO: Eventually need to do this on the stream itself too (if it's open) - await connection.FreeUpWindowSpace ().ConfigureAwait (false); - - return new Http2Response { - Status = statusCode, - Stream = stream, - Headers = responseHeaders, - Body = responseData.ToArray () - }; - } - - public async Task Ping (byte[] opaqueData, CancellationToken cancelToken) - { - if (opaqueData == null || opaqueData.Length <= 0) - throw new ArgumentNullException ("opaqueData"); - - await connection.Connect ().ConfigureAwait (false); - - var semaphoreWait = new SemaphoreSlim (0); - var opaqueDataMatch = false; - - var connectionStream = await streamManager.Get (0).ConfigureAwait (false); - - Http2Stream.FrameReceivedDelegate frameRxAction; - frameRxAction = new Http2Stream.FrameReceivedDelegate (frame => { - var pf = frame as PingFrame; - if (pf != null) { - opaqueDataMatch = pf.Ack && pf.OpaqueData != null && pf.OpaqueData.SequenceEqual (opaqueData); - semaphoreWait.Release (); - } - }); - - // Wire up the event to listen for ping response - connectionStream.OnFrameReceived += frameRxAction; - - // Construct ping request - var pingFrame = new PingFrame (); - pingFrame.OpaqueData = new byte[opaqueData.Length]; - opaqueData.CopyTo (pingFrame.OpaqueData, 0); - - // Send ping - await connection.QueueFrame (pingFrame).ConfigureAwait (false); - - // Wait for either a ping response or timeout - await semaphoreWait.WaitAsync (cancelToken).ConfigureAwait (false); - - // Cleanup the event - connectionStream.OnFrameReceived -= frameRxAction; - - return opaqueDataMatch; - } - - public async Task Disconnect () - { - return await Disconnect (Timeout.InfiniteTimeSpan).ConfigureAwait (false); - } - - public async Task Disconnect (TimeSpan timeout) - { - var connectionStream = await streamManager.Get (0).ConfigureAwait (false); - - var semaphoreWait = new SemaphoreSlim (0); - var cancelTokenSource = new CancellationTokenSource (); - var sentGoAway = false; - - var sentDelegate = new Http2Stream.FrameSentDelegate (frame => { - if (frame.Type == FrameType.GoAway) { - sentGoAway = true; - semaphoreWait.Release (); - } - }); - - connectionStream.OnFrameSent += sentDelegate; - - await connection.QueueFrame (new GoAwayFrame ()).ConfigureAwait (false); - - if (timeout != Timeout.InfiniteTimeSpan) - cancelTokenSource.CancelAfter (timeout); - - await semaphoreWait.WaitAsync (cancelTokenSource.Token).ConfigureAwait (false); - - connectionStream.OnFrameSent -= sentDelegate; - - connection.Disconnect (); - - return sentGoAway; - } - - public class Http2Response - { - public HttpStatusCode Status { get; set; } - public Http2Stream Stream { get; set; } - public NameValueCollection Headers { get;set; } - public byte[] Body { get;set; } - } - } -} +using System; +using System.Collections.Generic; +using System.Collections.Specialized; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using HttpTwo.Internal; + +namespace HttpTwo +{ + public class Http2Client + { + readonly Http2Connection connection; + readonly IStreamManager streamManager; + IFlowControlManager flowControlManager; + + public Http2Client (Uri uri, X509CertificateCollection certificates = null, IStreamManager streamManager = null, IFlowControlManager flowControlManager = null) + : this (new Http2ConnectionSettings (uri, certificates), streamManager, flowControlManager) + { + } + + public Http2Client (string url, X509CertificateCollection certificates = null, IStreamManager streamManager = null, IFlowControlManager flowControlManager = null) + : this (new Http2ConnectionSettings (url, certificates), streamManager, flowControlManager) + { + } + + public Http2Client (Http2ConnectionSettings connectionSettings, IStreamManager streamManager = null, IFlowControlManager flowControlManager = null) + { + this.flowControlManager = flowControlManager ?? new FlowControlManager (); + this.streamManager = streamManager ?? new StreamManager (this.flowControlManager); + this.ConnectionSettings = connectionSettings; + + connection = new Http2Connection (ConnectionSettings, this.streamManager, this.flowControlManager); + } + + public Http2ConnectionSettings ConnectionSettings { get; private set; } + + public IStreamManager StreamManager { get { return streamManager; } } + public IFlowControlManager FlowControlManager { get { return flowControlManager; } } + + public async Task Connect () + { + await connection.Connect ().ConfigureAwait (false); + } + + public async Task Post (Uri uri, NameValueCollection headers = null, byte[] data = null) + { + return await Send (uri, HttpMethod.Post, headers, data).ConfigureAwait (false); + } + + public async Task Post (Uri uri, NameValueCollection headers = null, Stream data = null) + { + return await Send (uri, HttpMethod.Post, headers, data).ConfigureAwait (false); + } + + public async Task Send (Uri uri, HttpMethod method, NameValueCollection headers = null, byte[] data = null) + { + MemoryStream ms = null; + + if (data != null) + ms = new MemoryStream (data); + + return await Send (new CancellationToken (), uri, method, headers, ms).ConfigureAwait (false); + } + + public async Task Send (Uri uri, HttpMethod method, NameValueCollection headers = null, Stream data = null) + { + return await Send (new CancellationToken (), uri, method, headers, data).ConfigureAwait (false); + } + + public async Task Send (CancellationToken cancelToken, Uri uri, HttpMethod method, NameValueCollection headers = null, Stream data = null) + { + var semaphoreClose = new SemaphoreSlim(0); + + await connection.Connect ().ConfigureAwait (false); + + var stream = await streamManager.Get ().ConfigureAwait (false); + stream.OnFrameReceived += async (frame) => + { + // Check for an end of stream state + if (stream.State == StreamState.HalfClosedRemote || stream.State == StreamState.Closed) + semaphoreClose.Release (); + }; + + var sentEndOfStream = false; + + var allHeaders = new NameValueCollection (); + allHeaders.Add (":method", method.Method.ToUpperInvariant ()); + allHeaders.Add (":path", uri.PathAndQuery); + allHeaders.Add (":scheme", uri.Scheme); + allHeaders.Add (":authority", uri.Authority); + if (headers != null && headers.Count > 0) + allHeaders.Add (headers); + + var headerData = Util.PackHeaders (allHeaders, connection.Settings.HeaderTableSize); + + var numFrames = (int)Math.Ceiling ((double)headerData.Length / (double)connection.Settings.MaxFrameSize); + + for (int i = 0; i < numFrames; i++) { + // First item is headers frame, others are continuation + IFrameContainsHeaders frame = (i == 0) ? + (IFrameContainsHeaders)new HeadersFrame (stream.StreamIdentifer) + : (IFrameContainsHeaders)new ContinuationFrame (stream.StreamIdentifer); + + // Set end flag if this is the last item + if (i == numFrames - 1) + frame.EndHeaders = true; + + var maxFrameSize = connection.Settings.MaxFrameSize; + + var amt = maxFrameSize; + if ( i * maxFrameSize + amt > headerData.Length) + amt = (uint)headerData.Length - (uint)(i * maxFrameSize); + frame.HeaderBlockFragment = new byte[amt]; + Array.Copy (headerData, i * maxFrameSize, frame.HeaderBlockFragment, 0, amt); + + // If we won't s end + if (data == null && frame is HeadersFrame) { + sentEndOfStream = true; + (frame as HeadersFrame).EndStream = true; + } + + await connection.QueueFrame (frame).ConfigureAwait (false); + } + + if (data != null) { + var supportsPosLength = true; // Keep track of if we threw exceptions trying pos/len of stream + + // Break stream up into data frames within allowed size + var dataFrameBuffer = new byte[connection.Settings.MaxFrameSize]; + while (true) { + + var rd = await data.ReadAsync (dataFrameBuffer, 0, dataFrameBuffer.Length).ConfigureAwait (false); + + if (rd <= 0) + break; + + // Make a new data frame with a buffer the size we read + var dataFrame = new DataFrame (stream.StreamIdentifer); + dataFrame.Data = new byte[rd]; + // Copy over the data we read + Array.Copy(dataFrameBuffer, 0, dataFrame.Data, 0, rd); + + try { + // See if the stream supports Length / Position to try and detect EOS + // we also want to see if we previously had an exception trying this + // and not try again if we did, since throwing exceptions every single + // read operation is wasteful + if (supportsPosLength && data.Position >= data.Length) { + dataFrame.EndStream = true; + sentEndOfStream = true; + } + } catch { + supportsPosLength = false; + sentEndOfStream = false; + } + + await connection.QueueFrame (dataFrame).ConfigureAwait (false); + } + } + + // Send an empty frame with end of stream flag + if (!sentEndOfStream) + await connection.QueueFrame(new DataFrame(stream.StreamIdentifer) { EndStream = true }).ConfigureAwait(false); + + if (!await semaphoreClose.WaitAsync (ConnectionSettings.ConnectionTimeout, cancelToken).ConfigureAwait (false)) + throw new TimeoutException (); + + var responseData = new List (); + var rxHeaderData = new List (); + + foreach (var f in stream.ReceivedFrames) { + if (f.Type == FrameType.Headers || f.Type == FrameType.Continuation) { + // Get the header data and add it to our buffer + var fch = (IFrameContainsHeaders)f; + if (fch.HeaderBlockFragment != null && fch.HeaderBlockFragment.Length > 0) + rxHeaderData.AddRange (fch.HeaderBlockFragment); + } else if (f.Type == FrameType.PushPromise) { + // TODO: In the future we need to implement PushPromise beyond grabbing header data + var fch = (IFrameContainsHeaders)f; + if (fch.HeaderBlockFragment != null && fch.HeaderBlockFragment.Length > 0) + rxHeaderData.AddRange (fch.HeaderBlockFragment); + } else if (f.Type == FrameType.Data) { + responseData.AddRange ((f as DataFrame).Data); + } else if (f.Type == FrameType.GoAway) { + var fga = f as GoAwayFrame; + if (fga != null && fga.AdditionalDebugData != null && fga.AdditionalDebugData.Length > 0) + responseData.AddRange (fga.AdditionalDebugData); + } + } + + if (connection.Decoder == null) + { + connection.Decoder = new HPack.Decoder(connection.Settings.MaxHeaderListSize.HasValue ? (int)connection.Settings.MaxHeaderListSize.Value : 8192, (int)connection.Settings.HeaderTableSize); + } + + var responseHeaders = Util.UnpackHeaders(connection.Decoder, rxHeaderData.ToArray()); + + var strStatus = "500"; + if (responseHeaders [":status"] != null) + strStatus = responseHeaders [":status"]; + + var statusCode = HttpStatusCode.OK; + Enum.TryParse (strStatus, out statusCode); + + // Remove the stream from being tracked since we're done with it + await streamManager.Cleanup (stream.StreamIdentifer).ConfigureAwait (false); + + // Send a WINDOW_UPDATE frame to release our stream's data count + // TODO: Eventually need to do this on the stream itself too (if it's open) + await connection.FreeUpWindowSpace ().ConfigureAwait (false); + + return new Http2Response { + Status = statusCode, + Stream = stream, + Headers = responseHeaders, + Body = responseData.ToArray () + }; + } + + public async Task Ping (byte[] opaqueData, CancellationToken cancelToken) + { + if (opaqueData == null || opaqueData.Length <= 0) + throw new ArgumentNullException ("opaqueData"); + + await connection.Connect ().ConfigureAwait (false); + + var semaphoreWait = new SemaphoreSlim (0); + var opaqueDataMatch = false; + + var connectionStream = await streamManager.Get (0).ConfigureAwait (false); + + Http2Stream.FrameReceivedDelegate frameRxAction; + frameRxAction = new Http2Stream.FrameReceivedDelegate (frame => { + var pf = frame as PingFrame; + if (pf != null) { + opaqueDataMatch = pf.Ack && pf.OpaqueData != null && pf.OpaqueData.SequenceEqual (opaqueData); + semaphoreWait.Release (); + } + }); + + // Wire up the event to listen for ping response + connectionStream.OnFrameReceived += frameRxAction; + + // Construct ping request + var pingFrame = new PingFrame (); + pingFrame.OpaqueData = new byte[opaqueData.Length]; + opaqueData.CopyTo (pingFrame.OpaqueData, 0); + + // Send ping + await connection.QueueFrame (pingFrame).ConfigureAwait (false); + + // Wait for either a ping response or timeout + await semaphoreWait.WaitAsync (cancelToken).ConfigureAwait (false); + + // Cleanup the event + connectionStream.OnFrameReceived -= frameRxAction; + + return opaqueDataMatch; + } + + public async Task Disconnect () + { + return await Disconnect (Timeout.InfiniteTimeSpan).ConfigureAwait (false); + } + + public async Task Disconnect (TimeSpan timeout) + { + var connectionStream = await streamManager.Get (0).ConfigureAwait (false); + + var semaphoreWait = new SemaphoreSlim (0); + var cancelTokenSource = new CancellationTokenSource (); + var sentGoAway = false; + + var sentDelegate = new Http2Stream.FrameSentDelegate (frame => { + if (frame.Type == FrameType.GoAway) { + sentGoAway = true; + semaphoreWait.Release (); + } + }); + + connectionStream.OnFrameSent += sentDelegate; + + await connection.QueueFrame (new GoAwayFrame ()).ConfigureAwait (false); + + if (timeout != Timeout.InfiniteTimeSpan) + cancelTokenSource.CancelAfter (timeout); + + await semaphoreWait.WaitAsync (cancelTokenSource.Token).ConfigureAwait (false); + + connectionStream.OnFrameSent -= sentDelegate; + + connection.Disconnect (); + + return sentGoAway; + } + + public class Http2Response + { + public HttpStatusCode Status { get; set; } + public Http2Stream Stream { get; set; } + public NameValueCollection Headers { get;set; } + public byte[] Body { get;set; } + } + } +} diff --git a/HttpTwo/Http2Connection.cs b/HttpTwo/Http2Connection.cs index 73a26ae..1d24404 100644 --- a/HttpTwo/Http2Connection.cs +++ b/HttpTwo/Http2Connection.cs @@ -1,350 +1,351 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Net; -using System.Net.Security; -using System.Net.Sockets; -using System.Security.Cryptography.X509Certificates; -using System.Threading; -using System.Threading.Tasks; -using HttpTwo.Internal; - -namespace HttpTwo -{ - public class Http2ConnectionSettings - { - public Http2ConnectionSettings (string url, X509CertificateCollection certificates = null) - : this (new Uri (url), certificates) - { - } - - public Http2ConnectionSettings (Uri uri, X509CertificateCollection certificates = null) - : this (uri.Host, (uint)uri.Port, uri.Scheme == Uri.UriSchemeHttps, certificates) - { - } - - public Http2ConnectionSettings (string host, uint port = 80, bool useTls = false, X509CertificateCollection certificates = null) - { - Host = host; - Port = port; - UseTls = useTls; - Certificates = certificates; - ConnectionTimeout = TimeSpan.FromSeconds (60); - DisablePushPromise = false; - } - - public string Host { get; private set; } - public uint Port { get; private set; } - public bool UseTls { get; private set; } - public X509CertificateCollection Certificates { get; private set; } - - public TimeSpan ConnectionTimeout { get; set; } - public bool DisablePushPromise { get; set; } - } - - public class Http2Connection - { - public const string ConnectionPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - - static Http2Connection () - { - ServicePointManager.ServerCertificateValidationCallback += - (sender, certificate, chain, sslPolicyErrors) => true; - } - - public Http2Connection (Http2ConnectionSettings connectionSettings, IStreamManager streamManager, IFlowControlManager flowControlManager) - { - this.flowControlManager = flowControlManager; - this.streamManager = streamManager; - - ConnectionSettings = connectionSettings; - Settings = new Http2Settings (); - - queue = new FrameQueue (flowControlManager); - } - - public Http2Settings Settings { get; private set; } - public Http2ConnectionSettings ConnectionSettings { get; private set; } - - IFlowControlManager flowControlManager; - readonly IStreamManager streamManager; - readonly FrameQueue queue; - - TcpClient tcp; - Stream clientStream; - SslStream sslStream; - - long receivedDataCount = 0; - public uint ReceivedDataCount { - get { return (uint)Interlocked.Read (ref receivedDataCount); } - } - - public async Task Connect () - { - if (IsConnected ()) - return; - - tcp = new TcpClient (); - - // Disable Nagle for HTTP/2 - tcp.NoDelay = true; - - await tcp.ConnectAsync (ConnectionSettings.Host, (int)ConnectionSettings.Port).ConfigureAwait (false); - - if (ConnectionSettings.UseTls) { - sslStream = new SslStream (tcp.GetStream (), false, - (sender, certificate, chain, sslPolicyErrors) => true); - - await sslStream.AuthenticateAsClientAsync ( - ConnectionSettings.Host, - ConnectionSettings.Certificates ?? new X509CertificateCollection (), - System.Security.Authentication.SslProtocols.Tls12, - false).ConfigureAwait (false); - - clientStream = sslStream; - } else { - clientStream = tcp.GetStream (); - } - - // Ensure we have a size for the stream '0' - flowControlManager.GetWindowSize (0); - - // Send out preface data - var prefaceData = System.Text.Encoding.ASCII.GetBytes (ConnectionPreface); - await clientStream.WriteAsync (prefaceData, 0, prefaceData.Length).ConfigureAwait (false); - await clientStream.FlushAsync ().ConfigureAwait (false); - - // Start reading the stream on another thread - var readTask = Task.Factory.StartNew (() => { - try { read (); } - catch (Exception ex) { - Log.Debug ("Read error: " + ex); - Disconnect (); - } - }, TaskCreationOptions.LongRunning); - - readTask.ContinueWith (t => { - // TODO: Handle the error - Disconnect (); - }, TaskContinuationOptions.OnlyOnFaulted).Forget (); - - // Start a thread to handle writing queued frames to the stream - var writeTask = Task.Factory.StartNew(write, TaskCreationOptions.LongRunning); - writeTask.ContinueWith (t => { - // TODO: Handle the error - Disconnect (); - }, TaskContinuationOptions.OnlyOnFaulted).Forget (); - - // Send initial blank settings frame - var s = new SettingsFrame (); - if (ConnectionSettings.DisablePushPromise) - s.EnablePush = false; - - await QueueFrame (s).ConfigureAwait (false); - } - - public void Disconnect () - { - // complete the blocking collection - queue.Complete (); - - // We want to clean up the connection here so let's just try to close/dispose - // everything - - // Analysis disable EmptyGeneralCatchClause - try { clientStream.Close (); } catch { } - try { clientStream.Dispose (); } catch { } - - if (ConnectionSettings.UseTls && sslStream != null) { - try { sslStream.Close (); } catch { } - try { sslStream.Dispose (); } catch { } - } - - try { tcp.Client.Shutdown (SocketShutdown.Both); } catch { } - try { tcp.Client.Dispose (); } catch { } - - try { tcp.Close (); } catch { } - // Analysis restore EmptyGeneralCatchClause - - tcp = null; - sslStream = null; - clientStream = null; - } - - bool IsConnected () - { - if (tcp == null || clientStream == null || tcp.Client == null) - return false; - - if (!tcp.Connected || !tcp.Client.Connected) - return false; - - if (!tcp.Client.Poll (1000, SelectMode.SelectRead) - || !tcp.Client.Poll (1000, SelectMode.SelectWrite)) - return false; - - return true; - } - - readonly SemaphoreSlim lockWrite = new SemaphoreSlim (1); - - public async Task QueueFrame (IFrame frame) - { - await queue.Enqueue (frame).ConfigureAwait (false); - } - - public async Task FreeUpWindowSpace () - { - var sizeToFree = Interlocked.Exchange (ref receivedDataCount, 0); - - if (sizeToFree <= 0) - return; - - await QueueFrame (new WindowUpdateFrame { - StreamIdentifier = 0, - WindowSizeIncrement = (uint)sizeToFree - }).ConfigureAwait (false); - } - - readonly List buffer = new List (); - - async void read () - { - int rx; - byte[] b = new byte[4096]; - - while (true) { - - try { - rx = await clientStream.ReadAsync(b, 0, b.Length).ConfigureAwait (false); - } catch { - rx = -1; - } - - if (rx > 0) { - // Add all the bytes read into our buffer list - for (int i = 0; i < rx; i++) - buffer.Add (b [i]); - - while (true) - { - // We need at least 9 bytes to process the frame - // 9 octets is the frame header length - if (buffer.Count < 9) - break; - - // Find out the frame length - // which is a 24 bit uint, so we need to convert this as c# uint is 32 bit - var flen = new byte[4]; - flen [0] = 0x0; - flen [1] = buffer.ElementAt (0); - flen [2] = buffer.ElementAt (1); - flen [3] = buffer.ElementAt (2); - - var frameLength = BitConverter.ToUInt32 (flen.EnsureBigEndian (), 0); - - // If we are expecting a payload that's bigger than what's in our buffer - // we should keep reading from the stream - if (buffer.Count - 9 < frameLength) - break; - - // If we made it this far, the buffer has all the data we need, let's get it out to process - var data = buffer.GetRange (0, (int)frameLength + 9).ToArray (); - // remove the processed info from the buffer - buffer.RemoveRange (0, (int)frameLength + 9); - - // Get the Frame Type so we can instantiate the right subclass - var frameType = data [3]; // 4th byte in frame header is TYPE - - // we need to turn the stream id into a uint - var frameStreamIdData = new byte[4]; - Array.Copy (data, 5, frameStreamIdData, 0, 4); - uint frameStreamId = Util.ConvertFromUInt31 (frameStreamIdData.EnsureBigEndian ()); - - // Create a new typed instance of our abstract Frame - var frame = Frame.Create ((FrameType)frameType); - - try { - // Call the specific subclass implementation to parse - frame.Parse (data); - } catch (Exception ex) { - Log.Error ("Parsing Frame Failed: {0}", ex); - throw ex; - } - - Log.Debug ("<- {0}", frame); - - // If it's a settings frame, we should note the values and - // return the frame with the Ack flag set - if (frame.Type == FrameType.Settings) { - - var settingsFrame = frame as SettingsFrame; - - // Update our instance of settings with the new data - Settings.UpdateFromFrame (settingsFrame, flowControlManager); - - // See if this was an ack, if not, return an empty - // ack'd settings frame - if (!settingsFrame.Ack) - await QueueFrame (new SettingsFrame { Ack = true }).ConfigureAwait (false); - - } else if (frame.Type == FrameType.Ping) { - - var pingFrame = frame as PingFrame; - // See if we need to respond to the ping request (if it's not-ack'd) - if (!pingFrame.Ack) { - // Ack and respond - pingFrame.Ack = true; - await QueueFrame (pingFrame).ConfigureAwait (false); - } - - } else if (frame.Type == FrameType.Data) { - - // Increment our received data counter - Interlocked.Add (ref receivedDataCount, frame.PayloadLength); - } - - // Some other frame type, just pass it along to the stream - var stream = await streamManager.Get(frameStreamId).ConfigureAwait (false); - stream.ProcessReceivedFrames(frame); - } - - } else { - // Stream was closed, break out of reading loop - break; - } - } - - // Cleanup - Disconnect(); - } - - async Task write () - { - foreach (var frame in queue.GetConsumingEnumerable ()) { - if (frame == null) { - Log.Info ("Null frame dequeued"); - continue; - } - - Log.Debug ("-> {0}", frame); - - var data = frame.ToBytes ().ToArray (); - - await lockWrite.WaitAsync ().ConfigureAwait (false); - - try { - await clientStream.WriteAsync(data, 0, data.Length).ConfigureAwait (false); - await clientStream.FlushAsync().ConfigureAwait (false); - var stream = await streamManager.Get (frame.StreamIdentifier).ConfigureAwait (false); - stream.ProcessSentFrame (frame); - } catch (Exception ex) { - Log.Warn ("Error writing frame: {0}, {1}", frame.StreamIdentifier, ex); - } finally { - lockWrite.Release(); - } - } - } - } -} +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using HttpTwo.Internal; + +namespace HttpTwo +{ + public class Http2ConnectionSettings + { + public Http2ConnectionSettings (string url, X509CertificateCollection certificates = null) + : this (new Uri (url), certificates) + { + } + + public Http2ConnectionSettings (Uri uri, X509CertificateCollection certificates = null) + : this (uri.Host, (uint)uri.Port, uri.Scheme == Uri.UriSchemeHttps, certificates) + { + } + + public Http2ConnectionSettings (string host, uint port = 80, bool useTls = false, X509CertificateCollection certificates = null) + { + Host = host; + Port = port; + UseTls = useTls; + Certificates = certificates; + ConnectionTimeout = TimeSpan.FromSeconds (60); + DisablePushPromise = false; + } + + public string Host { get; private set; } + public uint Port { get; private set; } + public bool UseTls { get; private set; } + public X509CertificateCollection Certificates { get; private set; } + + public TimeSpan ConnectionTimeout { get; set; } + public bool DisablePushPromise { get; set; } + } + + public class Http2Connection + { + public const string ConnectionPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + + static Http2Connection () + { + ServicePointManager.ServerCertificateValidationCallback += + (sender, certificate, chain, sslPolicyErrors) => true; + } + + public Http2Connection (Http2ConnectionSettings connectionSettings, IStreamManager streamManager, IFlowControlManager flowControlManager) + { + this.flowControlManager = flowControlManager; + this.streamManager = streamManager; + + ConnectionSettings = connectionSettings; + Settings = new Http2Settings (); + + queue = new FrameQueue (flowControlManager); + } + + public Http2Settings Settings { get; private set; } + public Http2ConnectionSettings ConnectionSettings { get; private set; } + + IFlowControlManager flowControlManager; + readonly IStreamManager streamManager; + readonly FrameQueue queue; + + TcpClient tcp; + Stream clientStream; + SslStream sslStream; + + long receivedDataCount = 0; + public uint ReceivedDataCount { + get { return (uint)Interlocked.Read (ref receivedDataCount); } + } + + public async Task Connect () + { + if (IsConnected ()) + return; + + tcp = new TcpClient (); + + // Disable Nagle for HTTP/2 + tcp.NoDelay = true; + + await tcp.ConnectAsync (ConnectionSettings.Host, (int)ConnectionSettings.Port).ConfigureAwait (false); + + if (ConnectionSettings.UseTls) { + sslStream = new SslStream (tcp.GetStream (), false, + (sender, certificate, chain, sslPolicyErrors) => true); + + await sslStream.AuthenticateAsClientAsync ( + ConnectionSettings.Host, + ConnectionSettings.Certificates ?? new X509CertificateCollection (), + System.Security.Authentication.SslProtocols.Tls12, + false).ConfigureAwait (false); + + clientStream = sslStream; + } else { + clientStream = tcp.GetStream (); + } + + // Ensure we have a size for the stream '0' + flowControlManager.GetWindowSize (0); + + // Send out preface data + var prefaceData = System.Text.Encoding.ASCII.GetBytes (ConnectionPreface); + await clientStream.WriteAsync (prefaceData, 0, prefaceData.Length).ConfigureAwait (false); + await clientStream.FlushAsync ().ConfigureAwait (false); + + // Start reading the stream on another thread + var readTask = Task.Factory.StartNew (() => { + try { read (); } + catch (Exception ex) { + Log.Debug ("Read error: " + ex); + Disconnect (); + } + }, TaskCreationOptions.LongRunning); + + readTask.ContinueWith (t => { + // TODO: Handle the error + Disconnect (); + }, TaskContinuationOptions.OnlyOnFaulted).Forget (); + + // Start a thread to handle writing queued frames to the stream + var writeTask = Task.Factory.StartNew(write, TaskCreationOptions.LongRunning); + writeTask.ContinueWith (t => { + // TODO: Handle the error + Disconnect (); + }, TaskContinuationOptions.OnlyOnFaulted).Forget (); + + // Send initial blank settings frame + var s = new SettingsFrame (); + if (ConnectionSettings.DisablePushPromise) + s.EnablePush = false; + + await QueueFrame (s).ConfigureAwait (false); + } + + public void Disconnect () + { + // complete the blocking collection + queue.Complete (); + + // We want to clean up the connection here so let's just try to close/dispose + // everything + + // Analysis disable EmptyGeneralCatchClause + try { clientStream.Close (); } catch { } + try { clientStream.Dispose (); } catch { } + + if (ConnectionSettings.UseTls && sslStream != null) { + try { sslStream.Close (); } catch { } + try { sslStream.Dispose (); } catch { } + } + + try { tcp.Client.Shutdown (SocketShutdown.Both); } catch { } + try { tcp.Client.Dispose (); } catch { } + + try { tcp.Close (); } catch { } + // Analysis restore EmptyGeneralCatchClause + + tcp = null; + sslStream = null; + clientStream = null; + } + + bool IsConnected () + { + if (tcp == null || clientStream == null || tcp.Client == null) + return false; + + if (!tcp.Connected || !tcp.Client.Connected) + return false; + + if (!tcp.Client.Poll (1000, SelectMode.SelectRead) && !tcp.Client.Poll (1000, SelectMode.SelectWrite)) + return false; + + return true; + } + + readonly SemaphoreSlim lockWrite = new SemaphoreSlim (1); + + public HPack.Decoder Decoder { get; set; } + + public async Task QueueFrame (IFrame frame) + { + await queue.Enqueue (frame).ConfigureAwait (false); + } + + public async Task FreeUpWindowSpace () + { + var sizeToFree = Interlocked.Exchange (ref receivedDataCount, 0); + + if (sizeToFree <= 0) + return; + + await QueueFrame (new WindowUpdateFrame { + StreamIdentifier = 0, + WindowSizeIncrement = (uint)sizeToFree + }).ConfigureAwait (false); + } + + readonly List buffer = new List (); + + async void read () + { + int rx; + byte[] b = new byte[4096]; + + while (true) { + + try { + rx = await clientStream.ReadAsync(b, 0, b.Length).ConfigureAwait (false); + } catch { + rx = -1; + } + + if (rx > 0) { + // Add all the bytes read into our buffer list + for (int i = 0; i < rx; i++) + buffer.Add (b [i]); + + while (true) + { + // We need at least 9 bytes to process the frame + // 9 octets is the frame header length + if (buffer.Count < 9) + break; + + // Find out the frame length + // which is a 24 bit uint, so we need to convert this as c# uint is 32 bit + var flen = new byte[4]; + flen [0] = 0x0; + flen [1] = buffer.ElementAt (0); + flen [2] = buffer.ElementAt (1); + flen [3] = buffer.ElementAt (2); + + var frameLength = BitConverter.ToUInt32 (flen.EnsureBigEndian (), 0); + + // If we are expecting a payload that's bigger than what's in our buffer + // we should keep reading from the stream + if (buffer.Count - 9 < frameLength) + break; + + // If we made it this far, the buffer has all the data we need, let's get it out to process + var data = buffer.GetRange (0, (int)frameLength + 9).ToArray (); + // remove the processed info from the buffer + buffer.RemoveRange (0, (int)frameLength + 9); + + // Get the Frame Type so we can instantiate the right subclass + var frameType = data [3]; // 4th byte in frame header is TYPE + + // we need to turn the stream id into a uint + var frameStreamIdData = new byte[4]; + Array.Copy (data, 5, frameStreamIdData, 0, 4); + uint frameStreamId = Util.ConvertFromUInt31 (frameStreamIdData.EnsureBigEndian ()); + + // Create a new typed instance of our abstract Frame + var frame = Frame.Create ((FrameType)frameType); + + try { + // Call the specific subclass implementation to parse + frame.Parse (data); + } catch (Exception ex) { + Log.Error ("Parsing Frame Failed: {0}", ex); + throw ex; + } + + Log.Debug ("<- {0}", frame); + + // If it's a settings frame, we should note the values and + // return the frame with the Ack flag set + if (frame.Type == FrameType.Settings) { + + var settingsFrame = frame as SettingsFrame; + + // Update our instance of settings with the new data + Settings.UpdateFromFrame (settingsFrame, flowControlManager); + + // See if this was an ack, if not, return an empty + // ack'd settings frame + if (!settingsFrame.Ack) + await QueueFrame (new SettingsFrame { Ack = true }).ConfigureAwait (false); + + } else if (frame.Type == FrameType.Ping) { + + var pingFrame = frame as PingFrame; + // See if we need to respond to the ping request (if it's not-ack'd) + if (!pingFrame.Ack) { + // Ack and respond + pingFrame.Ack = true; + await QueueFrame (pingFrame).ConfigureAwait (false); + } + + } else if (frame.Type == FrameType.Data) { + + // Increment our received data counter + Interlocked.Add (ref receivedDataCount, frame.PayloadLength); + } + + // Some other frame type, just pass it along to the stream + var stream = await streamManager.Get(frameStreamId).ConfigureAwait (false); + stream.ProcessReceivedFrames(frame); + } + + } else { + // Stream was closed, break out of reading loop + break; + } + } + + // Cleanup + Disconnect(); + } + + async Task write () + { + foreach (var frame in queue.GetConsumingEnumerable ()) { + if (frame == null) { + Log.Info ("Null frame dequeued"); + continue; + } + + Log.Debug ("-> {0}", frame); + + var data = frame.ToBytes ().ToArray (); + + await lockWrite.WaitAsync ().ConfigureAwait (false); + + try { + await clientStream.WriteAsync(data, 0, data.Length).ConfigureAwait (false); + await clientStream.FlushAsync().ConfigureAwait (false); + var stream = await streamManager.Get (frame.StreamIdentifier).ConfigureAwait (false); + stream.ProcessSentFrame (frame); + } catch (Exception ex) { + Log.Warn ("Error writing frame: {0}, {1}", frame.StreamIdentifier, ex); + } finally { + lockWrite.Release(); + } + } + } + } +} diff --git a/HttpTwo/Internal/Util.cs b/HttpTwo/Internal/Util.cs index aca6d44..2eede65 100644 --- a/HttpTwo/Internal/Util.cs +++ b/HttpTwo/Internal/Util.cs @@ -2,6 +2,7 @@ using System.Collections.Specialized; using System.IO; using System.Threading.Tasks; +using HttpTwo.HPack; namespace HttpTwo.Internal { @@ -60,14 +61,13 @@ public static byte[] PackHeaders (NameValueCollection headers, uint maxHeaderTab } return headerData; - } - - public static NameValueCollection UnpackHeaders (byte[] data, int maxHeaderSize, int maxHeaderTableSize) + } + + public static NameValueCollection UnpackHeaders(Decoder hpackDecoder, byte[] data) { var headers = new NameValueCollection (); // Decode Header Block Fragments - var hpackDecoder = new HPack.Decoder (maxHeaderSize, maxHeaderTableSize); using(var binReader = new BinaryReader (new MemoryStream (data))) { hpackDecoder.Decode(binReader, (name, value, sensitive) => From 6955b3b53c67e9f680f6438159c3e31122133328 Mon Sep 17 00:00:00 2001 From: "Neio.zhou" Date: Mon, 10 Jul 2017 08:40:06 +0000 Subject: [PATCH 03/15] Fixed the usage of dynamic table for HPACK, handle both Encoder & Decoder. Fixed the connection lost & reconnect handling. --- HttpTwo/Http2Client.cs | 43 +++-- HttpTwo/Http2Connection.cs | 29 +++- HttpTwo/Http2Stream.cs | 273 +++++++++++++++--------------- HttpTwo/Internal/StreamManager.cs | 22 ++- HttpTwo/Internal/Util.cs | 201 +++++++++++----------- 5 files changed, 300 insertions(+), 268 deletions(-) diff --git a/HttpTwo/Http2Client.cs b/HttpTwo/Http2Client.cs index 094a980..c9ce321 100644 --- a/HttpTwo/Http2Client.cs +++ b/HttpTwo/Http2Client.cs @@ -14,7 +14,7 @@ namespace HttpTwo { public class Http2Client { - readonly Http2Connection connection; + Http2Connection connection; readonly IStreamManager streamManager; IFlowControlManager flowControlManager; @@ -33,8 +33,6 @@ public Http2Client (Http2ConnectionSettings connectionSettings, IStreamManager s this.flowControlManager = flowControlManager ?? new FlowControlManager (); this.streamManager = streamManager ?? new StreamManager (this.flowControlManager); this.ConnectionSettings = connectionSettings; - - connection = new Http2Connection (ConnectionSettings, this.streamManager, this.flowControlManager); } public Http2ConnectionSettings ConnectionSettings { get; private set; } @@ -42,9 +40,21 @@ public Http2Client (Http2ConnectionSettings connectionSettings, IStreamManager s public IStreamManager StreamManager { get { return streamManager; } } public IFlowControlManager FlowControlManager { get { return flowControlManager; } } - public async Task Connect () + public async Task Connect() { - await connection.Connect ().ConfigureAwait (false); + if (connection != null && !connection.IsConnected()) + { + connection.Disconnect(); + connection = null; + } + + if (connection == null) + { + streamManager.Reset(); + connection = new Http2Connection(ConnectionSettings, this.streamManager, this.flowControlManager); + await connection.Connect().ConfigureAwait(false); + } + } public async Task Post (Uri uri, NameValueCollection headers = null, byte[] data = null) @@ -76,14 +86,19 @@ public async Task Send (CancellationToken cancelToken, Uri uri, H { var semaphoreClose = new SemaphoreSlim(0); - await connection.Connect ().ConfigureAwait (false); + await Connect(); + List