Skip to content

Commit

Permalink
Added a MessageClass property.
Browse files Browse the repository at this point in the history
  • Loading branch information
mgnsm committed May 10, 2022
1 parent 0b6de2d commit 3371e67
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 39 deletions.
50 changes: 30 additions & 20 deletions Source/Millistream.Streaming/IMarketDataFeed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,62 @@ namespace Millistream.Streaming
public interface IMarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
{
/// <summary>
/// The file descriptor used by the connection. Will be -1 (or INVALID_SOCKET on Windows) if there is no connection.
/// Gets the file descriptor used by the connection. Will be -1 (or INVALID_SOCKET on Windows) if there is no connection.
/// </summary>
int FileDescriptor { get; }

/// <summary>
/// The current API error code.
/// Gets or sets the current API error code.
/// </summary>
Error ErrorCode { get; set; }

/// <summary>
/// The number of bytes received by the server since the handle was created.
/// Gets or sets the number of bytes received by the server since the handle was created.
/// </summary>
ulong ReceivedBytes { get; set; }

/// <summary>
/// The number of bytes sent by the client since the handle was created.
/// Gets or sets the number of bytes sent by the client since the handle was created.
/// </summary>
ulong SentBytes { get; set; }

/// <summary>
/// A callback function that will be called by the consume function if there are any messages to decode.
/// Gets or sets a callback function that will be called by the consume function if there are any messages to decode.
/// </summary>
DataCallback<TCallbackUserData, TStatusCallbackUserData> DataCallback { get; set; }

/// <summary>
/// Custom userdata that will be available to the data callback function.
/// Gets or sets custom userdata that will be available to the data callback function.
/// </summary>
TCallbackUserData CallbackUserData { get; set; }

/// <summary>
/// A callback function that will be called whenever there is a change of the status of the connection.
/// Gets or sets a callback function that will be called whenever there is a change of the status of the connection.
/// </summary>
StatusCallback<TStatusCallbackUserData> StatusCallback { get; set; }

/// <summary>
/// Custom userdata that will be available to the status callback function.
/// Gets or sets custom userdata that will be available to the status callback function.
/// </summary>
TStatusCallbackUserData StatusCallbackUserData { get; set; }

/// <summary>
/// The number of seconds before determining that a connect attempt has timed out. Valid values are 1 to 60. The default value is 5.
/// Gets or sets the number of seconds before determining that a connect attempt has timed out.
/// </summary>
int ConnectionTimeout { get; set; }

/// <summary>
/// The number of seconds the connection must be idle before the API sends a heartbeat request to the server. Valid values are 1 to 86400. The default is 30.
/// Gets or sets the number of seconds the connection must be idle before the API sends a heartbeat request to the server.
/// </summary>
int HeartbeatInterval { get; set; }

/// <summary>
/// How many outstanding hearbeat requests to allow before the connection is determined to be disconnected. Valid values are 1 to 100. The default is 2.
/// Gets or sets how many outstanding hearbeat requests to allow before the connection is determined to be disconnected.
/// </summary>
int MaximumMissedHeartbeats { get; set; }

/// <summary>
/// Controls whether Nagle's algorithm is used on the TCP connection. It's enabled by default.
/// Gets or sets a value indicating whether Nagle's algorithm is used on the TCP connection.
/// </summary>
bool NoDelay { get; set; }

Expand All @@ -75,42 +75,42 @@ public interface IMarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
bool NoEncryption { get; set; }

/// <summary>
/// The time difference in number of seconds between the client and the server. The value should be added to the current time on the client in order to get the server time. Please not that this value can be negative if the client clock is ahead of the server clock.
/// Gets the time difference in number of seconds between the client and the server. The value should be added to the current time on the client in order to get the server time. Please not that this value can be negative if the client clock is ahead of the server clock.
/// </summary>
int TimeDifference { get; }

/// <summary>
/// A numerical address to which the API will bind before attempting to connect to a server in <see cref="Connect"/>. If the bind fails then <see cref="Connect"/> also fails. The string is copied by the API and a <see langword="NULL" /> value can be used in order to "unset" the bind address.
/// Gets or sets a numerical address to which the API will bind before attempting to connect to a server in <see cref="Connect"/>. If the bind fails then <see cref="Connect"/> also fails. The string is copied by the API and a <see langword="NULL" /> value can be used in order to "unset" the bind address.
/// </summary>
string BindAddress { get; set; }

/// <summary>
/// The time difference in number of nanoseconds between the client and the server. The value should be added to the current time on the client in order to get the server time. Please not that this value can be negative if the client clock is ahead of the server clock.
/// Gets the time difference in number of nanoseconds between the client and the server. The value should be added to the current time on the client in order to get the server time. Please not that this value can be negative if the client clock is ahead of the server clock.
/// </summary>
long TimeDifferenceNs { get; }

/// <summary>
/// A comma separated list of the message digests that the client will offer to the server upon connect.
/// Gets or sets a comma separated list of the message digests that the client will offer to the server upon connect.
/// </summary>
public string MessageDigests { get; set; }

/// <summary>
/// A comma separated list of the encryption ciphers that the client will offer to the server upon connect.
/// Gets or sets a comma separated list of the encryption ciphers that the client will offer to the server upon connect.
/// </summary>
public string Ciphers { get; set; }

/// <summary>
/// The digest chosen by the server. Only available after <see cref="Connect"/> returns.
/// Gets the digest chosen by the server. Only available after <see cref="Connect"/> returns.
/// </summary>
string MessageDigest { get; }

/// <summary>
/// The cipher chosen by the server. Only available after <see cref="Connect"/> returns.
/// Gets rhe cipher chosen by the server. Only available after <see cref="Connect"/> returns.
/// </summary>
string Cipher { get; }

/// <summary>
/// The number of seconds to wait before having to call <see cref="Consume(int)"/>.
/// Gets the number of seconds to wait before having to call <see cref="Consume(int)"/>.
/// </summary>
public int Timeout { get; }

Expand All @@ -119,6 +119,16 @@ public interface IMarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
/// </summary>
public bool HandleDelay { get; set; }

/// <summary>
/// Gets the intended delay of the current message if delay-mode have been activated by setting the <see cref="HandleDelay"/> property. Note that this is the intended delay of the message and not necessarily the real delay, network latency, server latency and so on are not included.
/// </summary>
public byte Delay { get; }

/// <summary>
/// Gets the message class of the current received message.
/// </summary>
public ulong MessageClass { get; }

/// <summary>
/// Consumes data sent from the server. If there currently is no data the function waits for <paramref name="timeout"/> number of seconds, if <paramref name="timeout"/> is zero (0) the function will return immediately. If <paramref name="timeout"/> is negative then the wait period is treated as number of microseconds instead of number of seconds (i.e. -1000 will wait a maximum of 1000µs).
/// </summary>
Expand Down
10 changes: 5 additions & 5 deletions Source/Millistream.Streaming/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,27 @@ namespace Millistream.Streaming
public interface IMessage
{
/// <summary>
/// The zlib compression level used for the <see cref="AddString(uint, string)"/> and <see cref="AddString(uint, string, int)"/> methods.
/// Gets or sets the zlib compression level used for the <see cref="AddString(uint, string)"/> and <see cref="AddString(uint, string, int)"/> methods.
/// </summary>
CompressionLevel CompressionLevel { get; set; }

/// <summary>
/// The total number of messages in the message handle (the number of active + the number of reused messages currently not used for active messages).
/// Gets the total number of messages in the message handle (the number of active + the number of reused messages currently not used for active messages).
/// </summary>
int Count { get; }

/// <summary>
/// The number of active messages in the message handle.
/// Gets the number of active messages in the message handle.
/// </summary>
int ActiveCount { get; }

/// <summary>
/// The number of added fields to the current message.
/// Gets the number of added fields to the current message.
/// </summary>
int FieldCount { get; }

/// <summary>
/// Enables or disables the UTF-8 validation performed in <see cref="AddString(uint, string)"/> and <see cref="AddString(uint, string, int)"/>. It's enabled by default.
/// Enables or disables the UTF-8 validation performed in <see cref="AddString(uint, string)"/> and <see cref="AddString(uint, string, int)"/>.
/// </summary>
bool Utf8Validation { get; set; }

Expand Down
3 changes: 3 additions & 0 deletions Source/Millistream.Streaming/Interop/NativeImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ unsafe internal sealed class NativeImplementation
internal readonly delegate* unmanaged[Cdecl]<IntPtr, MDF_OPTION, ref long, int> mdf_get_long_property;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, MDF_OPTION, IntPtr, int> mdf_set_property;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, byte> mdf_get_delay;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, ulong> mdf_get_mclass;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, IntPtr, int> mdf_connect;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_disconnect;
internal readonly delegate* unmanaged[Cdecl]<IntPtr> mdf_message_create;
Expand Down Expand Up @@ -96,6 +97,8 @@ internal NativeImplementation(string libraryPath)

if (nativeLibrary.TryGetExport(lib, nameof(mdf_get_delay), out IntPtr address))
mdf_get_delay = (delegate* unmanaged[Cdecl]<IntPtr, byte>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_get_mclass), out address))
mdf_get_mclass = (delegate* unmanaged[Cdecl]<IntPtr, ulong>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_message_add_int), out address))
mdf_message_add_int = (delegate* unmanaged[Cdecl]<IntPtr, uint, long, int, int>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_message_add_uint), out address))
Expand Down
15 changes: 15 additions & 0 deletions Source/Millistream.Streaming/MarketDataFeed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ public byte Delay
return _nativeImplementation.mdf_get_delay(_feedHandle);
}
}

/// <summary>
/// Gets the message class of the current received message.
/// </summary>
/// <exception cref="InvalidOperationException">The installed version of the native library doesn't include the mdf_get_mclass function.</exception>
/// <exception cref="ObjectDisposedException">The <see cref="MarketDataFeed{TCallbackData,TStatusCallbackData}"/> instance has been disposed.</exception>
public ulong MessageClass
{
get
{
ThrowIfDisposed();
Message.ThrowIfNativeFunctionIsMissing(_nativeImplementation.mdf_get_mclass, nameof(_nativeImplementation.mdf_get_mclass));
return _nativeImplementation.mdf_get_mclass(_feedHandle);
}
}
#endregion

#region Methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ public void GetAndSetPropertiesTest()

_ = mdf.TimeDifferenceNs;

//Timeout
_ = mdf.Timeout;

//Allocations
long allocatedBytes = GetTotalAllocatedBytes();
_ = mdf.FileDescriptor;
Expand Down Expand Up @@ -163,6 +160,8 @@ public void GetAndSetPropertiesTest()

_ = mdf.Delay;

_ = mdf.MessageClass;

Assert.AreEqual(allocatedBytes, GetTotalAllocatedBytes());
}

Expand Down Expand Up @@ -325,10 +324,13 @@ public void DataCallbackTest()
void OnDataReceived(object userData, MarketDataFeed<object, object> mdf)
{
Assert.AreEqual(UserData, userData);
while (mdf.GetNextMessage(out int _, out int _, out ulong _))
while (mdf.GetNextMessage(out int _, out int mclass, out ulong _))
{
Assert.AreEqual((ulong)mclass, mdf.MessageClass);
while (mdf.GetNextField(out Field field, out ReadOnlySpan<byte> value))
if (field == Field.MDF_F_REQUESTID && Encoding.UTF8.GetString(value.ToArray()) == RequestId)
requestFinished = true;
}
}
mdf.DataCallback = OnDataReceived;
//request some data
Expand Down
56 changes: 46 additions & 10 deletions Tests/Millistream.Streaming.UnitTests/MarketDataFeedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,22 @@ public void GetDelayTest()
Assert.AreEqual(Delay, mdf.Delay);
}

[TestMethod]
public void GetMessageClassTest()
{
const ulong MessageClass = ulong.MaxValue;
Mock<INativeImplementation> nativeImplementation = new();
IntPtr feedHandle = new(123);
nativeImplementation.Setup(x => x.mdf_create()).Returns(feedHandle);
nativeImplementation
.Setup(x => x.mdf_get_mclass(feedHandle))
.Returns(MessageClass);
NativeImplementation.Implementation = nativeImplementation.Object;

using MarketDataFeed mdf = new();
Assert.AreEqual(MessageClass, mdf.MessageClass);
}

[TestMethod]
public void GetAndSetDataCallbackTest()
{
Expand Down Expand Up @@ -627,16 +643,18 @@ public unsafe void DelayThrowsWhenNativeFunctionIsMissingTest()
NativeImplementation nativeImplementation = new(default);
nativeImplementation.mdf_get_delay = default;

using MarketDataFeed mdf = new(nativeImplementation);
try
{
_ = mdf.Delay;
Assert.Fail($"No expected {nameof(InvalidOperationException)} was thrown.");
}
catch (InvalidOperationException ex)
{
Assert.AreEqual($"The installed version of the native library doesn't include the {nameof(nativeImplementation.mdf_get_delay)} function.", ex.Message);
}
GetPropertyAndCatchInvalidOperationException(nativeImplementation, mdf => _ = mdf.Delay,
nameof(nativeImplementation.mdf_get_delay));
}

[TestMethod]
public unsafe void MessageClassThrowsWhenNativeFunctionIsMissingTest()
{
NativeImplementation nativeImplementation = new(default);
nativeImplementation.mdf_get_mclass = default;

GetPropertyAndCatchInvalidOperationException(nativeImplementation, mdf => _ = mdf.MessageClass,
nameof(nativeImplementation.mdf_get_mclass));
}

[TestMethod]
Expand Down Expand Up @@ -763,6 +781,10 @@ public unsafe void DelayThrowsWhenNativeFunctionIsMissingTest()
[ExpectedException(typeof(ObjectDisposedException))]
public void CannotGetDelayAfterDisposeTest() => _ = GetDisposedMdf().Delay;

[TestMethod]
[ExpectedException(typeof(ObjectDisposedException))]
public void CannotGetMessageClassAfterDisposeTest() => _ = GetDisposedMdf().MessageClass;

[TestMethod]
[ExpectedException(typeof(ObjectDisposedException))]
public void CannotGetDataCallbackfterDisposeTest() => _ = GetDisposedMdf().DataCallback;
Expand Down Expand Up @@ -971,5 +993,19 @@ private static void Compare(string expectedValue, IntPtr actualValue)
Marshal.Copy(actualValue, bytes, 0, expectedValue.Length);
Assert.AreEqual(expectedValue, Encoding.UTF8.GetString(bytes));
}

private static void GetPropertyAndCatchInvalidOperationException(NativeImplementation nativeImplementation, Action<MarketDataFeed> action, string nativeFunctionName)
{
using MarketDataFeed mdf = new(nativeImplementation);
try
{
action(mdf);
Assert.Fail($"No expected {nameof(InvalidOperationException)} was thrown.");
}
catch (InvalidOperationException ex)
{
Assert.AreEqual($"The installed version of the native library doesn't include the {nativeFunctionName} function.", ex.Message);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public interface INativeImplementation
int mdf_get_property(IntPtr handle, int option, ref long value);
int mdf_set_property(IntPtr handle, int option, IntPtr value);
byte mdf_get_delay(IntPtr handle);
ulong mdf_get_mclass(IntPtr handle);
int mdf_connect(IntPtr handle, IntPtr server);
void mdf_disconnect(IntPtr handle);
IntPtr mdf_message_create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal unsafe class NativeImplementation
internal delegate*<IntPtr, MDF_OPTION, ref long, int> mdf_get_long_property;
internal delegate*<IntPtr, MDF_OPTION, IntPtr, int> mdf_set_property;
internal delegate*<IntPtr, byte> mdf_get_delay;
internal delegate*<IntPtr, ulong> mdf_get_mclass;
internal delegate*<IntPtr, IntPtr, int> mdf_connect;
internal delegate*<IntPtr, void> mdf_disconnect;
internal delegate*<IntPtr> mdf_message_create;
Expand Down Expand Up @@ -63,6 +64,7 @@ internal NativeImplementation(string _)
mdf_get_long_property = &MdfGetInt64Property;
mdf_set_property = &MdfSetProperty;
mdf_get_delay = &MdfGetDelay;
mdf_get_mclass = &MdfGetMClass;
mdf_connect = &MdfConnect;
mdf_disconnect = &MdfDisconnect;
mdf_message_create = &MdfMessageCreate;
Expand Down Expand Up @@ -102,6 +104,7 @@ internal NativeImplementation(string _)
private static int MdfGetInt64Property(IntPtr handle, MDF_OPTION option, ref long value) => Implementation.mdf_get_property(handle, (int)option, ref value);
private static int MdfSetProperty(IntPtr handle, MDF_OPTION option, IntPtr value) => Implementation.mdf_set_property(handle, (int)option, value);
private static byte MdfGetDelay(IntPtr handle) => Implementation.mdf_get_delay(handle);
private static ulong MdfGetMClass(IntPtr handle) => Implementation.mdf_get_mclass(handle);
private static int MdfConnect(IntPtr handle, IntPtr server) => Implementation.mdf_connect(handle, server);
private static void MdfDisconnect(IntPtr handle) => Implementation.mdf_disconnect(handle);
private static IntPtr MdfMessageCreate() => Implementation.mdf_message_create();
Expand Down

0 comments on commit 3371e67

Please sign in to comment.