Version 1.1.0
DataFeed.cs
- The DataReceived event has been removed and replaced by an IObservable<ResponseMessage> property called Data. (breaking)
- A Recycle(ResponseMessage) method has been added. It enables you to reuse instances of ResponseMessage using an internal object pool instead of allocating new ones for each message received from the feed.
ResponseMessage.cs
- The Fields property now returns an IReadOnlyDictionary<Field, ReadOnlyMemory<byte>> instead of an IReadOnlyDictionary<Field, string>. No strings are allocated by default. (breaking)
In version 1.0.* you subscribed to the DataReceived event and received response messages containing fields with UTF-8 string values like this:
dataFeed.DataReceived += (s, e) =>
{
    Console.WriteLine($"{DateTime.Now.ToShortTimeString()} - " +
        $"Received a {e.Message.MessageReference} message with the following fields:");
    foreach (var field in e.Message.Fields)
        Console.WriteLine($"{field.Key}: {field.Value}");
};
The problem with this approach is that it creates a lot of garbage. For each message received from the data feed, an instance of the ResponseMessage class was created, as well as an instance of the DataReceivedEventArgs class.
In version 1.1.0, the DataReceived event has been replaced by an IObservable<ResponseMessage> property. This removes the need for the DataReceivedEventArgs class completely. You now subscribe to data by calling the Subscribe method of the Data property and passing in an instance of a class that implements the IObserver<ResponseMessage> interface, e.g.:
dataFeed.Data.Subscribe(new Observer(dataFeed));
...
class Observer : IObserver<ResponseMessage>
{
    private readonly IDataFeed _dataFeed;

    public Observer(IDataFeed dataFeed) => _dataFeed = dataFeed;

    public void OnNext(ResponseMessage message)
    {
        Console.WriteLine($"{DateTime.Now.ToShortTimeString()} - " +
            $"Received a {message.MessageReference} message with the following fields:");
        foreach (var field in message.Fields)
            Console.WriteLine($"{field.Key}: {Encoding.UTF8.GetString(field.Value.Span)}");
        // Hand the message back to the pool once it has been processed.
        _dataFeed?.Recycle(message);
    }

    public void OnCompleted() { }

    public void OnError(Exception exception) => Console.WriteLine(exception.Message);
}
If you are using the Reactive Extensions (Rx.NET) library, you don't have to explicitly implement IObserver<T>, as there is a Subscribe extension method that accepts an Action<T>:
dataFeed.Data.Subscribe(message =>
{
    Console.WriteLine($"{DateTime.Now.ToShortTimeString()} - " +
        $"Received a {message.MessageReference} message with the following fields:");
    foreach (var field in message.Fields)
        Console.WriteLine($"{field.Key}: {Encoding.UTF8.GetString(field.Value.Span)}");
    dataFeed.Recycle(message);
});
Millistream.Streaming has no dependency on Rx.NET though. The IObservable<T> and IObserver<T> interfaces are included in both .NET Standard 1.2 and .NET Framework 4.5.
Besides this breaking change, a Recycle(ResponseMessage) method has been added to the DataFeed class. It resets and recycles an instance of the ResponseMessage class. This means that you now only need to allocate a single instance of ResponseMessage, assuming that you subscribe to and handle all response messages on a single thread. You simply call the Recycle method once you have processed the response message, as shown above.
The DataFeed class internally uses a generic ObjectPool<T> class to keep track of the recycled objects. The implementation is based on the one in the .NET Compiler Platform (Roslyn) and uses a simple array to store the recycled object(s).
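The array-backed pooling idea can be illustrated with a minimal sketch. SimpleObjectPool<T>, Rent and Return are hypothetical names chosen for this example; this is not the library's internal ObjectPool<T> and not a copy of the Roslyn implementation, only an approximation of the same technique.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified pool for illustration; not the library's internal ObjectPool<T>.
public sealed class SimpleObjectPool<T> where T : class
{
    private readonly Func<T> _factory;
    private readonly T[] _items; // fixed-size array of slots; null means "empty slot"

    public SimpleObjectPool(Func<T> factory, int size)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        _items = new T[size];
    }

    // Returns a pooled instance if one is available, otherwise creates a new one.
    public T Rent()
    {
        for (int i = 0; i < _items.Length; i++)
        {
            T item = _items[i];
            // Atomically take the item out of its slot so two threads never get the same instance.
            if (item != null && Interlocked.CompareExchange(ref _items[i], null, item) == item)
                return item;
        }
        return _factory();
    }

    // Stores the instance in the first empty slot; if the pool is full the instance is simply dropped.
    public void Return(T item)
    {
        for (int i = 0; i < _items.Length; i++)
        {
            if (Interlocked.CompareExchange(ref _items[i], item, null) == null)
                return;
        }
    }
}
```

With a single consumer thread that returns each instance before the next one is rented, the same object is handed out over and over, which is why one ResponseMessage can be enough in that scenario.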
The other breaking change in this version is that the type of the Fields property of the ResponseMessage class has changed from IReadOnlyDictionary<Field, string> to IReadOnlyDictionary<Field, ReadOnlyMemory<byte>>. In the previous version a string was allocated for each field of each message regardless of whether the consumer was actually interested in all values of all fields. In the new version, no strings are allocated by default. Instead the DataFeed class adds all bytes that it reads from the feed for a message to an internal ExtendableArray<byte> property of the ResponseMessage class. It then uses the ReadOnlyMemory<byte> struct that was introduced in .NET Core 2.1 (and is available in the separate System.Memory NuGet package for earlier platforms) to assign each field a portion of the underlying array without having to allocate a new byte array (byte[]) for each field.
ExtendableArray<byte> is an internal type that wraps a byte array (byte[]) and enables you to dynamically add items to it. It resizes the underlying array just like the built-in List<T> class does, but unlike List<T> it also has an Items property that returns the underlying array of items without creating a copy of it. This array is passed to the constructor of the ReadOnlyMemory<byte> that represents the value of a field along with the offset and length.
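To make the slicing idea concrete, here is a rough sketch. GrowableBuffer is a hypothetical stand-in for the internal ExtendableArray<byte>, and the sample field values are made up; the actual type and its members may look different.

```csharp
using System;
using System.Text;

// Hypothetical stand-in for the library's internal ExtendableArray<byte>.
public sealed class GrowableBuffer
{
    private byte[] _items = new byte[16];

    public int Count { get; private set; }

    // Exposes the backing array itself, not a copy.
    public byte[] Items => _items;

    public void Add(ReadOnlySpan<byte> bytes)
    {
        int required = Count + bytes.Length;
        // Grow by doubling (or to the required size), similar to how List<T> grows.
        if (required > _items.Length)
            Array.Resize(ref _items, Math.Max(required, _items.Length * 2));
        bytes.CopyTo(_items.AsSpan(Count));
        Count = required;
    }
}

public static class FieldSlicingDemo
{
    public static string Run()
    {
        var buffer = new GrowableBuffer();
        buffer.Add(Encoding.UTF8.GetBytes("ERIC B")); // first sample value, 6 bytes
        buffer.Add(Encoding.UTF8.GetBytes("101.50")); // second sample value, 6 bytes

        // Each field value is a window (offset, length) over the same array; no per-field byte[].
        var name = new ReadOnlyMemory<byte>(buffer.Items, 0, 6);
        var price = new ReadOnlyMemory<byte>(buffer.Items, 6, 6);

        return Encoding.UTF8.GetString(name.Span) + "|" + Encoding.UTF8.GetString(price.Span);
    }
}
```

Note that in this sketch the slices are created only after all bytes have been added. A resize replaces the backing array, so a slice taken earlier would keep pointing at the old one.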
To convert a field value to a string in the new version, you can use the Encoding.UTF8.GetString method in the consuming application. In .NET Core 2.1 there is an optimized overload that accepts a ReadOnlySpan<byte>. You can use the Span property of the ReadOnlyMemory<byte> struct to get one, e.g.:
foreach (KeyValuePair<Field, ReadOnlyMemory<byte>> field in message.Fields)
    Console.WriteLine($"{field.Key}: {Encoding.UTF8.GetString(field.Value.Span)}");
If you are interested in numeric values, the System.Buffers.Text.Utf8Parser class has several overloads of the TryParse method that you can use to convert a ReadOnlySpan<byte> to common primitive data types without allocating a string first.
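As a small sketch of this, the helper below parses a field value as a decimal. NumericFieldParsing and TryGetDecimal are hypothetical names for this example; only Utf8Parser.TryParse comes from the framework.

```csharp
using System;
using System.Buffers.Text;
using System.Text;

public static class NumericFieldParsing
{
    // Parses a UTF-8 encoded decimal number without creating a string first.
    public static bool TryGetDecimal(ReadOnlyMemory<byte> value, out decimal result)
    {
        // Require that the whole value was consumed, so trailing garbage such as "12abc" is rejected.
        return Utf8Parser.TryParse(value.Span, out result, out int bytesConsumed)
            && bytesConsumed == value.Length;
    }
}
```

In a consuming application you would pass in the value of a field from message.Fields, for example `NumericFieldParsing.TryGetDecimal(field.Value, out decimal price)`. Utf8Parser uses culture-invariant formats, so a period is expected as the decimal separator.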
Apart from the public API changes, there have also been some other performance improvements introduced in the DataFeed class under the hood. For example, it now uses the ReadOnlySpan<byte> struct to read bytes directly from native memory without allocating managed arrays and copying bytes. Also, the slow calls to Enum.IsDefined have been replaced by faster calls to the Contains method of two cached hash sets that determine whether a field tag received from the feed is a valid MessageReference or Field enumeration value.
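The hash set approach might look roughly like the following. SampleField and SampleFieldValidator are hypothetical and exist only for this illustration; the real Field and MessageReference enumerations have other members, and the library's internal code may be organized differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical enum used for illustration only.
public enum SampleField : uint { Name = 1, Price = 2, Volume = 3 }

public static class SampleFieldValidator
{
    // Built once; later lookups avoid calling Enum.IsDefined for every tag.
    private static readonly HashSet<uint> s_validTags = CreateTagSet();

    private static HashSet<uint> CreateTagSet()
    {
        var set = new HashSet<uint>();
        foreach (SampleField field in Enum.GetValues(typeof(SampleField)))
            set.Add((uint)field);
        return set;
    }

    // Returns true if the tag corresponds to a defined SampleField value.
    public static bool IsValid(uint tag) => s_validTags.Contains(tag);
}
```

The one-time cost of enumerating the values is paid when the type is first used, after which each check is a plain hash lookup on an unsigned integer.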