improve thread safety and error handling

This commit is contained in:
Christopher Willis-Ford 2022-05-02 16:44:09 -07:00
parent 51ab6b38b4
commit 9ccdabb64d
2 changed files with 68 additions and 27 deletions

View file

@ -129,12 +129,14 @@
}
didReceiveCall(method, params) {
addLine(`Received call to method: ${method}`);
switch (method) {
case 'didDiscoverPeripheral':
addLine(`Peripheral discovered: ${stringify(params)}`);
this.discoveredPeripheralId = params['peripheralId'];
break;
case 'ping':
addLine(`Responding to ping`);
return 42;
}
}

View file

@ -32,7 +32,7 @@ internal class Session : IDisposable
/// <summary>
/// Default timeout for remote requests.
/// </summary>
protected static readonly TimeSpan DefaultRequestTimeout = TimeSpan.FromMinutes(1);
protected static readonly TimeSpan DefaultRequestTimeout = TimeSpan.FromSeconds(3);
/// <summary>
/// Stores the mapping from method names to handlers.
@ -50,7 +50,9 @@ internal class Session : IDisposable
};
private readonly Dictionary<RequestId, PendingRequestRecord> pendingRequests = new ();
private RequestId nextId = 0;
private RequestId nextId = 1; // some clients have trouble with ID=0
private SemaphoreSlim websocketSendLock = new (1);
/// <summary>
/// Initializes a new instance of the <see cref="Session"/> class.
@ -128,9 +130,13 @@ internal class Session : IDisposable
var pingResult = await this.SendRequest("ping", null, cancellationToken);
Debug.Print($"Got result from ping: {pingResult}");
}
catch (JsonRpc2Exception e)
{
Debug.Print($"Got JSON-RPC error from ping: {e.Error}");
}
catch (Exception e)
{
Debug.Print($"Got exception from ping: {e}");
Debug.Print($"Got unrecognized exception from ping: {e}");
}
});
return Task.FromResult<object>("willPing");
@ -154,7 +160,8 @@ internal class Session : IDisposable
var messageBytes = JsonSerializer.SerializeToUtf8Bytes(request);
var webSocket = this.context.WebSocket;
await webSocket.SendAsync(messageBytes, WebSocketMessageType.Text, true, cancellationToken);
await this.SocketSend(messageBytes, cancellationToken);
}
/// <summary>
@ -193,15 +200,19 @@ internal class Session : IDisposable
using (var pendingRequest = new PendingRequestRecord(cancellationToken, timeout))
{
// register the pending request BEFORE `await webSocket.SendAsync`, just in case
// register the pending request BEFORE sending the request, just in case the response comes back before we get back from `await`
this.pendingRequests[requestId] = pendingRequest;
try
{
await webSocket.SendAsync(messageBytes, WebSocketMessageType.Text, true, cancellationToken);
await this.SocketSend(messageBytes, cancellationToken);
return await pendingRequest.Task;
}
catch (Exception e)
{
pendingRequest.TrySetException(e);
throw;
}
finally
{
this.pendingRequests.Remove(requestId);
@ -250,18 +261,18 @@ internal class Session : IDisposable
}
catch (Exception)
{
Debug.Print($"Response appears to have invalid ID = ${response.Id}");
Debug.Print($"Response appears to have invalid ID = {response.Id}");
return;
}
var requestRecord = this.pendingRequests.GetValueOrDefault(responseId, null);
if (requestRecord == null)
{
Debug.Print($"Could not find request record with ID = ${response.Id}");
Debug.Print($"Could not find request record with ID = {response.Id}");
return;
}
requestRecord.SetResult(response.Error, response.Result);
requestRecord.TrySetResult(response.Error, response.Result);
}
private async Task SendResponse(object id, object result, JsonRpc2Error error, CancellationToken cancellationToken)
@ -274,8 +285,7 @@ internal class Session : IDisposable
};
var responseBytes = JsonSerializer.SerializeToUtf8Bytes(response);
var webSocket = this.context.WebSocket;
await webSocket.SendAsync(responseBytes, WebSocketMessageType.Text, true, cancellationToken);
await this.SocketSend(responseBytes, cancellationToken);
}
private RequestId GetNextId()
@ -283,38 +293,65 @@ internal class Session : IDisposable
return this.nextId++;
}
private async Task SocketSend(byte[] messageBytes, CancellationToken cancellationToken)
{
var webSocket = this.context.WebSocket;
await this.websocketSendLock.WaitAsync();
try
{
await webSocket.SendAsync(messageBytes, WebSocketMessageType.Text, true, cancellationToken);
}
finally
{
this.websocketSendLock.Release();
}
}
private async void CommLoop()
{
var cancellationToken = this.cancellationTokenSource.Token;
var webSocket = this.context.WebSocket;
try
{
var messageReadLock = new SemaphoreSlim(1);
var messageBuffer = new MemoryStream();
while (this.IsOpen)
{
messageBuffer.SetLength(0);
var result = await webSocket.ReceiveMessageToStream(messageBuffer, MessageSizeLimit, cancellationToken);
if (messageBuffer.Length > 0)
JsonRpc2Message message;
await messageReadLock.WaitAsync();
try
{
messageBuffer.Position = 0;
var message = JsonSerializer.Deserialize<JsonRpc2Message>(messageBuffer, this.deserializerOptions);
if (message is JsonRpc2Request request)
messageBuffer.SetLength(0);
var result = await webSocket.ReceiveMessageToStream(messageBuffer, MessageSizeLimit, cancellationToken);
if (messageBuffer.Length > 0)
{
await this.HandleRequest(request, cancellationToken);
}
else if (message is JsonRpc2Response response)
{
this.HandleResponse(response);
messageBuffer.Position = 0;
message = JsonSerializer.Deserialize<JsonRpc2Message>(messageBuffer, this.deserializerOptions);
}
else
{
Debug.Print("Received a message which was not recognized as a Request or Response");
Debug.Print("Received an empty message");
continue;
}
}
finally
{
messageReadLock.Release();
}
if (message is JsonRpc2Request request)
{
await this.HandleRequest(request, cancellationToken);
}
else if (message is JsonRpc2Response response)
{
this.HandleResponse(response);
}
else
{
Debug.Print("Received an empty message");
Debug.Print("Received a message which was not recognized as a Request or Response");
}
}
}
@ -359,7 +396,9 @@ internal class Session : IDisposable
public void Cancel() => this.cancellationTokenSource.Cancel();
public void SetResult(JsonRpc2Error error, object result)
public void TrySetException(Exception exception) => this.completionSource.TrySetException(exception);
public void TrySetResult(JsonRpc2Error error, object result)
{
if (error != null)
{