switch to Fleck for WebSockets

This commit is contained in:
Christopher Willis-Ford 2022-07-11 14:58:08 -07:00
parent 8be1d8b021
commit 172fc2c534
9 changed files with 80 additions and 263 deletions

View file

@ -7,10 +7,10 @@ namespace ScratchLink.BLE;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Fleck;
using Microsoft.Extensions.DependencyInjection;
using ScratchLink.Extensions;
using ScratchLink.JsonRpc;
@ -25,9 +25,9 @@ internal abstract class BLESession<TUUID> : Session
/// <summary>
/// Initializes a new instance of the <see cref="BLESession{TUUID}"/> class.
/// </summary>
/// <inheritdoc cref="Session.Session(WebSocketContext)"/>
public BLESession(WebSocketContext context)
: base(context)
/// <inheritdoc cref="Session.Session(IWebSocketConnection)"/>
public BLESession(IWebSocketConnection webSocket)
: base(webSocket)
{
this.GattHelpers = ScratchLinkApp.Current.Services.GetService<GattHelpers<TUUID>>();
this.AllowedServices = new ();

View file

@ -1,85 +0,0 @@
// <copyright file="WebSocketExtensions.cs" company="Scratch Foundation">
// Copyright (c) Scratch Foundation. All rights reserved.
// </copyright>
namespace ScratchLink.Extensions;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// This class contains custom extensions to System.Net.WebSockets.WebSocket.
/// </summary>
internal static class WebSocketExtensions
{
/// <summary>
/// Sends a string over the WebSocket connection asynchronously.
/// </summary>
/// <param name="ws">Send a string over this WebSocket connection.</param>
/// <param name="message">Send this string over the WebSocket connection.</param>
/// <param name="endOfMessage">True if this string is the last part of a message; false otherwise.</param>
/// <param name="cancellationToken">The cancellation token to use to cancel the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static Task SendString(this WebSocket ws, string message, bool endOfMessage, CancellationToken cancellationToken)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
var buffer = new ArraySegment<byte>(messageBytes);
return ws.SendAsync(buffer, WebSocketMessageType.Text, endOfMessage, cancellationToken);
}
/// <summary>
/// Sends JSON value over the WebSocket connection asynchronously.
/// </summary>
/// <typeparam name="TValue">The type of the message object.</typeparam>
/// <param name="ws">Send a JSON value over this WebSocket connection.</param>
/// <param name="message">Send this JSON value over the WebSocket connection.</param>
/// <param name="endOfMessage">True if this JSON value is the last part of a message; false otherwise.</param>
/// <param name="cancellationToken">The cancellation token to use to cancel the operation.</param>
/// <param name="serializerOptions">Options to control JSON serialization of the message.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static Task SendJson<TValue>(this WebSocket ws, TValue message, bool endOfMessage, CancellationToken cancellationToken, JsonSerializerOptions serializerOptions = null)
{
var messageBytes = JsonSerializer.SerializeToUtf8Bytes(message, serializerOptions);
return ws.SendAsync(messageBytes, WebSocketMessageType.Text, endOfMessage, cancellationToken);
}
/// <summary>
/// Reads one whole message from the WebSocket and deposit the whole message onto the provided stream.
/// Reading a whole message may involve multiple calls to the WebSocket's Receive*() method.
/// </summary>
/// <param name="ws">The WebSocket from which to read a message.</param>
/// <param name="dest">The Stream which will receive the completed message.</param>
/// <param name="maxMessageSize">The maximum allowed message size. An attempt to receive a larger message will cause an exception.</param>
/// <param name="cancellationToken">The cancellation token to use to cancel the operation.</param>
/// <returns>A <see cref="Task"/> for the <see cref="WebSocketReceiveResult"/> from the last receive operation which contributing to the message.</returns>
public static async Task<WebSocketReceiveResult> ReceiveMessageToStream(this WebSocket ws, Stream dest, int maxMessageSize, CancellationToken cancellationToken)
{
const int ReceiveBufferChunkSize = 4096;
var bufferBytes = new byte[ReceiveBufferChunkSize];
var buffer = new ArraySegment<byte>(bufferBytes);
WebSocketReceiveResult result;
do
{
result = await ws.ReceiveAsync(buffer, cancellationToken);
if (result.Count > 0)
{
if (result.Count > maxMessageSize)
{
throw new WebSocketException("message too large");
}
dest.Write(bufferBytes, 0, result.Count);
maxMessageSize -= result.Count;
}
}
while (!result.EndOfMessage && !result.CloseStatus.HasValue);
return result;
}
}

View file

@ -5,11 +5,6 @@
namespace ScratchLink;
using System;
using System.Diagnostics;
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using ScratchLink.BLE;
@ -36,18 +31,7 @@ public class ScratchLinkApp
this.sessionManager = this.Services.GetService<SessionManager>();
this.webSocketListener = new ()
{
OnWebSocketConnection = (webSocketContext) =>
{
this.sessionManager.ClientDidConnect(webSocketContext);
},
OnOtherConnection = (context) =>
{
context.Response.Headers.Clear();
context.Response.SendChunked = false;
context.Response.StatusCode = 426; // Upgrade Required
context.Response.OutputStream.Write(Encoding.UTF8.GetBytes("WebSockets required"));
context.Response.OutputStream.Close();
},
OnWebSocketConnection = this.sessionManager.ClientDidConnect,
};
}
@ -67,25 +51,7 @@ public class ScratchLinkApp
/// </summary>
public void Run()
{
if (!HttpListener.IsSupported)
{
// TODO: pop up an error message
return;
}
this.webSocketListener.Start(new[]
{
string.Format("http://127.0.0.1:{0}/", WebSocketPort),
string.Format("http://localhost:{0}/", WebSocketPort),
});
}
private void HandleSessionDebug(WebSocketContext context)
{
var origin = context.Headers.Get("origin");
var socket = context.WebSocket;
Debug.Print("New connection");
socket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
this.webSocketListener.Start(string.Format("http://0.0.0.0:{0}/", WebSocketPort));
}
/// <summary>

View file

@ -9,10 +9,10 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.WebSockets;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Fleck;
using ScratchLink.Extensions;
using ScratchLink.JsonRpc;
using ScratchLink.JsonRpc.Converters;
@ -43,7 +43,7 @@ internal class Session : IDisposable
private const int MessageSizeLimit = 1024 * 1024; // 1 MiB
private readonly WebSocketContext context;
private readonly IWebSocketConnection webSocket;
private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly JsonSerializerOptions deserializerOptions = new ()
@ -59,29 +59,19 @@ internal class Session : IDisposable
/// <summary>
/// Initializes a new instance of the <see cref="Session"/> class.
/// </summary>
/// <param name="context">The WebSocket context which this session will use for communication.</param>
public Session(WebSocketContext context)
/// <param name="webSocket">The WebSocket which this session will use for communication.</param>
public Session(IWebSocketConnection webSocket)
{
this.context = context;
this.webSocket = webSocket;
this.Handlers["getVersion"] = this.HandleGetVersion;
this.Handlers["pingMe"] = this.HandlePingMe;
}
/// <summary>
/// Gets a value indicating whether returns true if the backing WebSocket is open for communication or is expected to be in the future.
/// Gets a value indicating whether returns true if the backing WebSocket is open for communication.
/// Returns false if the backing WebSocket is closed or closing, or is in an unknown state.
/// </summary>
public bool IsOpen => this.context.WebSocket.State switch
{
WebSocketState.Connecting => true,
WebSocketState.Open => true,
WebSocketState.CloseSent => false,
WebSocketState.CloseReceived => false,
WebSocketState.Closed => false,
WebSocketState.Aborted => false,
WebSocketState.None => false,
_ => false,
};
public bool IsOpen => this.webSocket.IsAvailable;
/// <summary>
/// Gets a value indicating whether <see cref="Dispose(bool)"/> has already been called and completed on this session.
@ -104,9 +94,20 @@ internal class Session : IDisposable
/// After calling this function, do not use the WebSocket context owned by this session.
/// </summary>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public async Task Run()
public Task Run()
{
await this.CommLoop();
var runCompletion = new TaskCompletionSource<bool>();
this.webSocket.OnClose = () =>
{
runCompletion.TrySetResult(true);
};
this.webSocket.OnMessage = async message =>
{
var jsonMessage = JsonSerializer.Deserialize<JsonRpc2Message>(message, this.deserializerOptions);
await this.HandleMessage(jsonMessage, this.CancellationToken);
};
return runCompletion.Task;
}
/// <summary>
@ -131,7 +132,7 @@ internal class Session : IDisposable
{
if (disposing)
{
this.context.WebSocket.Dispose();
this.webSocket.Close();
this.cancellationTokenSource.Cancel();
this.cancellationTokenSource.Dispose();
}
@ -284,9 +285,9 @@ internal class Session : IDisposable
Params = parameters,
};
var messageBytes = JsonSerializer.SerializeToUtf8Bytes(request);
var message = JsonSerializer.Serialize(request);
await this.SocketSend(messageBytes, cancellationToken);
await this.SocketSend(message, cancellationToken);
}
/// <summary>
@ -331,7 +332,7 @@ internal class Session : IDisposable
Id = requestId,
};
var messageBytes = JsonSerializer.SerializeToUtf8Bytes(request);
var message = JsonSerializer.Serialize(request);
using var pendingRequest = new PendingRequestRecord(timeout, cancellationToken);
@ -340,7 +341,7 @@ internal class Session : IDisposable
try
{
await this.SocketSend(messageBytes, cancellationToken);
await this.SocketSend(message, cancellationToken);
return await pendingRequest.Task;
}
catch (Exception e)
@ -448,11 +449,11 @@ internal class Session : IDisposable
try
{
var responseBytes = JsonSerializer.SerializeToUtf8Bytes(response);
var responseString = JsonSerializer.Serialize(response);
try
{
await this.SocketSend(responseBytes, cancellationToken);
await this.SocketSend(responseString, cancellationToken);
}
catch (Exception e)
{
@ -470,14 +471,12 @@ internal class Session : IDisposable
return this.nextId++;
}
private async Task SocketSend(byte[] messageBytes, CancellationToken cancellationToken)
private async Task SocketSend(string message, CancellationToken cancellationToken)
{
var webSocket = this.context.WebSocket;
await this.websocketSendLock.WaitAsync(cancellationToken);
try
{
await webSocket.SendAsync(messageBytes, WebSocketMessageType.Text, true, cancellationToken);
await this.webSocket.Send(message);
}
finally
{
@ -485,63 +484,19 @@ internal class Session : IDisposable
}
}
private async Task CommLoop()
private async Task HandleMessage(JsonRpc2Message message, CancellationToken cancellationToken)
{
var cancellationToken = this.CancellationToken;
var webSocket = this.context.WebSocket;
try
if (message is JsonRpc2Request request)
{
var messageReadLock = new SemaphoreSlim(1);
var messageBuffer = new MemoryStream();
while (this.IsOpen)
{
JsonRpc2Message message;
await messageReadLock.WaitAsync(cancellationToken);
try
{
messageBuffer.SetLength(0);
var result = await webSocket.ReceiveMessageToStream(messageBuffer, MessageSizeLimit, cancellationToken);
if (messageBuffer.Length > 0)
{
messageBuffer.Position = 0;
message = JsonSerializer.Deserialize<JsonRpc2Message>(messageBuffer, this.deserializerOptions);
}
else
{
Debug.Print("Received an empty message");
continue;
}
}
finally
{
messageReadLock.Release();
}
if (message is JsonRpc2Request request)
{
await this.HandleRequest(request, cancellationToken);
}
else if (message is JsonRpc2Response response)
{
this.HandleResponse(response);
}
else
{
Debug.Print("Received a message which was not recognized as a Request or Response");
}
}
await this.HandleRequest(request, cancellationToken);
}
catch (Exception e)
else if (message is JsonRpc2Response response)
{
Debug.Print($"Session ended due to exception: {e}");
this.HandleResponse(response);
}
finally
else
{
if (this.IsOpen)
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
}
Debug.Print("Received a message which was not recognized as a Request or Response");
}
}

View file

@ -6,8 +6,8 @@ namespace ScratchLink;
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Threading.Tasks;
using Fleck;
/// <summary>
/// This class connects a WebSocket to the appropriate session type and tracks the collection of active sessions.
@ -32,12 +32,12 @@ internal abstract class SessionManager
/// <summary>
/// Call this with a new connection context to ask the SessionManager to build and manage a session for it.
/// </summary>
/// <param name="webSocketContext">The WebSocket context which the SessionManager should adopt and connect to a session.</param>
public void ClientDidConnect(WebSocketContext webSocketContext)
/// <param name="webSocket">The WebSocket which the SessionManager should adopt and connect to a session.</param>
public void ClientDidConnect(IWebSocketConnection webSocket)
{
Task.Run(async () =>
{
using var session = this.MakeNewSession(webSocketContext);
using var session = this.MakeNewSession(webSocket);
this.sessions.TryAdd(session, true);
this.ActiveSessionCountChanged?.Invoke(this, EventArgs.Empty);
try
@ -55,7 +55,7 @@ internal abstract class SessionManager
/// <summary>
/// Create a new Session object to handle a new WebSocket connection.
/// </summary>
/// <param name="webSocketContext">Create a Session to handle this connection.</param>
/// <param name="webSocket">Create a Session to handle this connection.</param>
/// <returns>A new Session object connected to the provided context.</returns>
protected abstract Session MakeNewSession(WebSocketContext webSocketContext);
protected abstract Session MakeNewSession(IWebSocketConnection webSocket);
}

View file

@ -6,75 +6,55 @@ namespace ScratchLink;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.WebSockets;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Fleck;
/// <summary>
/// Listen for WebSocket connections and direct them to service handlers.
/// </summary>
internal class WebSocketListener
{
private readonly HttpListener listener = new ();
private readonly CancellationTokenSource cts = new ();
/// <summary>
/// Gets a value indicating whether WebSocketListener is supported in the current environment.
/// </summary>
public static bool IsSupported => HttpListener.IsSupported;
private WebSocketServer server;
/// <summary>
/// Gets or sets the action which will be called when the listener receives a WebSocket connection.
/// </summary>
public Action<WebSocketContext> OnWebSocketConnection { get; set; }
/// <summary>
/// Gets or sets the action which will be called when the listener receives a non-WebSocket connection.
/// </summary>
public Action<HttpListenerContext> OnOtherConnection { get; set; }
public Action<IWebSocketConnection> OnWebSocketConnection { get; set; }
/// <summary>
/// Start listening for connections. If already listening, stop and restart with the new prefix list.
/// </summary>
/// <param name="prefixes">
/// The list of HTTP(S) URL prefixes to listen on.
/// Use "http" instead of "ws" or "https" instead of "wss".
/// <param name="location">
/// The list of WS URL to listen on.
/// <example><code>
/// http://locahost:1234/
/// https://127.0.0.1/
/// ws://0.0.0.0:1234/
/// ws://127.0.0.1/
/// </code></example>
/// </param>
public void Start(IEnumerable<string> prefixes)
public void Start(string location)
{
if (this.listener.IsListening)
if (this.server != null)
{
throw new InvalidOperationException();
this.server.ListenerSocket.Close();
this.server.Dispose();
}
foreach (var prefix in prefixes)
{
this.listener.Prefixes.Add(prefix);
}
this.server = new WebSocketServer(location);
this.server.ListenerSocket.NoDelay = true; // disable Nagle's algorithm
this.listener.Start();
Task.Run(async () =>
this.server.Start(socket =>
{
CancellationToken token = this.cts.Token;
while (!token.IsCancellationRequested)
if (this.cts.IsCancellationRequested)
{
var context = await this.listener.GetContextAsync();
if (context.Request.IsWebSocketRequest)
{
var webSocketContext = await context.AcceptWebSocketAsync(null);
this.OnWebSocketConnection(webSocketContext);
}
else
{
this.OnOtherConnection(context);
}
socket.Close(503); // Service Unavailable: the server is stopping
return;
}
socket.OnOpen = () => this.OnWebSocketConnection(socket);
});
}
@ -84,6 +64,8 @@ internal class WebSocketListener
public void Stop()
{
this.cts.Cancel();
this.listener.Stop();
this.server.RestartAfterListenError = false; // work around statianzo/Fleck#325
this.server.ListenerSocket.Close();
this.server.Dispose();
}
}

View file

@ -13,7 +13,6 @@
<Compile Include="$(MSBuildThisFileDirectory)BLE\BLESession.cs" />
<Compile Include="$(MSBuildThisFileDirectory)BLE\GattHelpers.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Extensions\ContainerExtensions.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Extensions\WebSocketExtensions.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Extensions\JsonExtensions.cs" />
<Compile Include="$(MSBuildThisFileDirectory)JsonRpc\JsonRpc2Request.cs" />
<Compile Include="$(MSBuildThisFileDirectory)JsonRpc\JsonRpc2Message.cs" />

View file

@ -8,11 +8,11 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.WebSockets;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using CoreBluetooth;
using Fleck;
using Foundation;
using ScratchLink.BLE;
using ScratchLink.Extensions;
@ -41,9 +41,9 @@ internal class MacBLESession : BLESession<CBUUID>
/// <summary>
/// Initializes a new instance of the <see cref="MacBLESession"/> class.
/// </summary>
/// <param name="context">The web socket context.</param>
public MacBLESession(WebSocketContext context)
: base(context)
/// <param name="webSocket">The web socket.</param>
public MacBLESession(IWebSocketConnection webSocket)
: base(webSocket)
{
this.cbManager = new ();
this.cbManager.UpdatedState += this.WrapEventHandler(this.CbManager_UpdatedState);

View file

@ -4,7 +4,7 @@
namespace ScratchLink.Mac;
using System.Net.WebSockets;
using Fleck;
using ScratchLink.Mac.BLE;
/// <summary>
@ -13,15 +13,15 @@ using ScratchLink.Mac.BLE;
internal class MacSessionManager : SessionManager
{
/// <inheritdoc/>
protected override Session MakeNewSession(WebSocketContext webSocketContext)
protected override Session MakeNewSession(IWebSocketConnection webSocket)
{
var requestPath = webSocketContext.RequestUri.AbsolutePath;
var requestPath = webSocket.ConnectionInfo.Path;
return requestPath switch
{
"/scratch/ble" => new MacBLESession(webSocketContext),
"/scratch/ble" => new MacBLESession(webSocket),
// for unrecognized paths, return a base Session for debugging
_ => new Session(webSocketContext),
_ => new Session(webSocket),
};
}
}