Skip to content

Commit

Permalink
Added new overloads of the GetNextMessage method.
Browse files Browse the repository at this point in the history
  • Loading branch information
mgnsm committed May 10, 2022
1 parent 3371e67 commit 19a0e1e
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private static bool Consume(MarketDataFeed mdf, int consumeTimeout, int waitTime

if (ret == 1)
{
while (mdf.GetNextMessage(out MessageReference mref, out _, out _))
while (mdf.GetNextMessage(out MessageReference mref, out _))
{
if (mref == MessageReference.MDF_M_LOGONGREETING)
return true;
Expand All @@ -143,7 +143,7 @@ private static bool Consume(MarketDataFeed mdf, int consumeTimeout, int waitTime

private static void OnDataReceived(object userData, MarketDataFeed<object, object> handle)
{
while (handle.GetNextMessage(out int _, out _, out _))
while (handle.GetNextMessage(out ushort _, out _))
while (handle.GetNextField(out Field field, out _))
if (field == Field.MDF_F_REQUESTID)
s_dataCallbackInvoked = true;
Expand Down
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ An unofficial .NET wrapper for Millistream's low-latency, high-throughput and hi
## Installation
Millistream.NET is distributed via [NuGet](https://www.nuget.org/packages/Millistream.Streaming). The native and wrapped API can be downloaded from [Millistream's official website](https://packages.millistream.com/). Binaries are available for Linux, macOS and Windows. They come in both 32- and 64-bit versions and should work on both little- and big-endian systems. Please refer to [the official documentation](https://packages.millistream.com/documents/MDF%20C%20API.pdf) for more information about the wrapped API itself. The NuGet package does not include any native assemblies. You will have to download and install these separately.
### Native Dependency
On **Windows** you download and run an [.exe](https://packages.millistream.com/Windows/libmdf-1.0.25.exe) that will install the native `libmdf` core library along pre-built binaries of [zlib](http://zlib.net) and [OpenSSL](http://openssl.org/) (`libmdf` links against these). You can do this silently from a command prompt using Powershell:
On **Windows** you download and run an [.exe](https://packages.millistream.com/Windows/libmdf-1.0.26.exe) that will install the native `libmdf` core library along pre-built binaries of [zlib](http://zlib.net) and [OpenSSL](http://openssl.org/) (`libmdf` links against these). You can do this silently from a command prompt using Powershell:

powershell (new-object System.Net.WebClient).DownloadFile('https://packages.millistream.com/Windows/libmdf-1.0.25.exe', 'libmdf-1.0.25.exe')
.\libmdf-1.0.25.exe /S
powershell (new-object System.Net.WebClient).DownloadFile('https://packages.millistream.com/Windows/libmdf-1.0.26.exe', 'libmdf-1.0.26.exe')
.\libmdf-1.0.26.exe /S

On **macOS** you download and install a `.pkg` file, for example in a Bash shell:

curl -O https://packages.millistream.com/macOS/libmdf-1.0.23.pkg
sudo installer -pkg libmdf-1.0.23.pkg -target /
curl -O https://packages.millistream.com/macOS/libmdf-1.0.26.pkg
sudo installer -pkg libmdf-1.0.26.pkg -target /

On **Linux**, the native API and the dependent libraries are available through your distribution repository. Below is an example of how to install everything needed using the `apt-get` command-line tool on Ubuntu:

Expand Down Expand Up @@ -71,8 +71,7 @@ Console.WriteLine($"{DateTime.Now.ToShortTimeString()} - Logged in");
//6. Register a data callback (optional).
mdf.DataCallback = (data, handle) =>
{
while (handle.GetNextMessage(out MessageReference mref, out MessageClasses mclass,
out ulong insref))
while (handle.GetNextMessage(out MessageReference mref, out ulong insref))
{
Console.WriteLine($"{DateTime.Now.ToShortTimeString()} - " +
$"Received an {mref} message with the following fields:");
Expand Down Expand Up @@ -134,8 +133,7 @@ static bool Consume(MarketDataFeed mdf, MessageReference messageReference)
switch (ret)
{
case 1:
while (mdf.GetNextMessage(out MessageReference mref, out MessageClasses _,
out ulong _))
while (mdf.GetNextMessage(out MessageReference mref, out ulong _))
if (mref == messageReference)
return true;
break;
Expand Down
6 changes: 2 additions & 4 deletions Samples/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
//6. Register a data callback (optional).
mdf.DataCallback = (data, handle) =>
{
while (handle.GetNextMessage(out MessageReference mref, out MessageClasses mclass,
out ulong insref))
while (handle.GetNextMessage(out MessageReference mref, out ulong insref))
{
Console.WriteLine($"{DateTime.Now.ToShortTimeString()} - " +
$"Received an {mref} message with the following fields:");
Expand Down Expand Up @@ -96,8 +95,7 @@ static bool Consume(MarketDataFeed mdf, MessageReference messageReference)
switch (ret)
{
case 1:
while (mdf.GetNextMessage(out MessageReference mref, out MessageClasses _,
out ulong _))
while (mdf.GetNextMessage(out MessageReference mref, out ulong _))
if (mref == messageReference)
return true;
break;
Expand Down
18 changes: 17 additions & 1 deletion Source/Millistream.Streaming/IMarketDataFeed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public interface IMarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
/// Consumes data sent from the server. If there currently is no data the function waits for <paramref name="timeout"/> number of seconds, if <paramref name="timeout"/> is zero (0) the function will return immediately. If <paramref name="timeout"/> is negative then the wait period is treated as number of microseconds instead of number of seconds (i.e. -1000 will wait a maximum of 1000µs).
/// </summary>
/// <param name="timeout">The wait period in seconds if positive. If negative, the value is treated as the number of microseconds to wait instead of the number of seconds.</param>
/// <returns>1 if data has been consumed that needs to be handled by <see cref="GetNextMessage(out int, out int, out ulong)" /> and no callback function has been registered. The function returns 0 on timeout or if a callback function is registered and there was data. On errors, -1 will be returned (and the connection will be dropped).</returns>
/// <returns>1 if data has been consumed that needs to be handled by <see cref="GetNextMessage(out ushort, out ulong)" /> and no callback function has been registered. The function returns 0 on timeout or if a callback function is registered and there was data. On errors, -1 will be returned (and the connection will be dropped).</returns>
int Consume(int timeout);

/// <summary>
Expand All @@ -145,6 +145,14 @@ public interface IMarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
/// <returns><see langword="true" /> if a message was returned (and the <paramref name="mref"/>, <paramref name="mclass"/> and <paramref name="insref"/> fields will be filled) or <see langword="false" /> if there are no more messages in the current consumed data (or an error occured).</returns>
bool GetNextMessage(out int mref, out int mclass, out ulong insref);

/// <summary>
/// Fetches a message from the current consumed data if one is present and fills the output parameters with values representing the message fetched.
/// </summary>
/// <param name="mref">The fetched message reference. This should match a <see cref="MessageReference"/> value.</param>
/// <param name="insref">The fetched instrument reference, which is the unique id of an instrument.</param>
/// <returns><see langword="true" /> if a message was returned (and the <paramref name="mref"/> and <paramref name="insref"/> fields will be filled) or <see langword="false" /> if there are no more messages in the current consumed data (or an error occured).</returns>
bool GetNextMessage(out ushort mref, out ulong insref);

/// <summary>
/// Fetches a message from the current consumed data if one is present and fills the output parameters with values representing the message fetched.
/// </summary>
Expand All @@ -154,6 +162,14 @@ public interface IMarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
/// <returns><see langword="true" /> if a message was returned (and the <paramref name="messageReference"/>, <paramref name="messageClasses"/> and <paramref name="insref"/> fields will be filled) or <see langword="false" /> if there are no more messages in the current consumed data (or an error occured).</returns>
bool GetNextMessage(out MessageReference messageReference, out MessageClasses messageClasses, out ulong insref);

/// <summary>
/// Fetches a message from the current consumed data if one is present and fills the output parameters with values representing the message fetched.
/// </summary>
/// <param name="messageReference">The fetched message reference.</param>
/// <param name="insref">The fetched instrument reference, which is the unique id of an instrument.</param>
/// <returns><see langword="true" /> if a message was returned (and the <paramref name="messageReference"/> and <paramref name="insref"/> fields will be filled) or <see langword="false" /> if there are no more messages in the current consumed data (or an error occured).</returns>
bool GetNextMessage(out MessageReference messageReference, out ulong insref);

/// <summary>
/// Fetches the next field from the current message.
/// </summary>
Expand Down
5 changes: 4 additions & 1 deletion Source/Millistream.Streaming/Interop/NativeImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ unsafe internal sealed class NativeImplementation
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_destroy;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, int, int> mdf_consume;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, ref int, ref int, ref ulong, int> mdf_get_next_message;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, ref ushort, ref ulong, int> mdf_get_next_message2;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, ref uint, ref IntPtr, int> mdf_get_next_field;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, MDF_OPTION, ref IntPtr, int> mdf_get_property;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, MDF_OPTION, ref int, int> mdf_get_int_property;
Expand Down Expand Up @@ -95,7 +96,9 @@ internal NativeImplementation(string libraryPath)
mdf_message_get_num = (delegate* unmanaged[Cdecl]<IntPtr, int>)nativeLibrary.GetExport(lib, nameof(mdf_message_get_num));
mdf_message_get_num_active = (delegate* unmanaged[Cdecl]<IntPtr, int>)nativeLibrary.GetExport(lib, nameof(mdf_message_get_num_active));

if (nativeLibrary.TryGetExport(lib, nameof(mdf_get_delay), out IntPtr address))
if (nativeLibrary.TryGetExport(lib, nameof(mdf_get_next_message2), out IntPtr address))
mdf_get_next_message2 = (delegate* unmanaged[Cdecl]<IntPtr, ref ushort, ref ulong, int>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_get_delay), out address))
mdf_get_delay = (delegate* unmanaged[Cdecl]<IntPtr, byte>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_get_mclass), out address))
mdf_get_mclass = (delegate* unmanaged[Cdecl]<IntPtr, ulong>)address;
Expand Down
53 changes: 52 additions & 1 deletion Source/Millistream.Streaming/MarketDataFeed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public ulong MessageClass
/// Consumes data sent from the server. If there currently is no data the function waits for <paramref name="timeout"/> number of seconds, if <paramref name="timeout"/> is zero (0) the function will return immediately. If <paramref name="timeout"/> is negative then the wait period is treated as number of microseconds instead of number of seconds (i.e. -1000 will wait a maximum of 1000µs).
/// </summary>
/// <param name="timeout">The wait period in seconds if positive. If negative, the value is treated as the number of microseconds to wait instead of the number of seconds.</param>
/// <returns>1 if data has been consumed that needs to be handled by <see cref="GetNextMessage(out int, out int, out ulong)" /> and no callback function has been registered. The function returns 0 on timeout or if a callback function is registered and there was data. On errors, -1 will be returned (and the connection will be dropped).</returns>
/// <returns>1 if data has been consumed that needs to be handled by <see cref="GetNextMessage(out ushort, out ulong)" /> and no callback function has been registered. The function returns 0 on timeout or if a callback function is registered and there was data. On errors, -1 will be returned (and the connection will be dropped).</returns>
/// <exception cref="ObjectDisposedException">The <see cref="MarketDataFeed{TCallbackData,TStatusCallbackData}"/> instance has been disposed.</exception>
/// <remarks>The corresponding native function is mdf_consume.</remarks>
public int Consume(int timeout)
Expand All @@ -411,6 +411,31 @@ public bool GetNextMessage(out int mref, out int mclass, out ulong insref)
return _nativeImplementation.mdf_get_next_message(_feedHandle, ref mref, ref mclass, ref insref) == 1;
}

/// <summary>
/// Fetches a message from the current consumed data if one is present and fills the output parameters with values representing the message fetched.
/// </summary>
/// <param name="mref">The fetched message reference. This should match a <see cref="MessageReference"/> value.</param>
/// <param name="insref">The fetched instrument reference, which is the unique id of an instrument.</param>
/// <returns><see langword="true" /> if a message was returned (and the <paramref name="mref"/> and <paramref name="insref"/> fields will be filled) or <see langword="false" /> if there are no more messages in the current consumed data (or an error occured).</returns>
/// <exception cref="ObjectDisposedException">The <see cref="MarketDataFeed{TCallbackData,TStatusCallbackData}"/> instance has been disposed.</exception>
/// <remarks>The corresponding native function is mdf_get_next_message2. If this function isn't included in the installed version of the native library, the mdf_get_next_message function will be called instead.</remarks>
public bool GetNextMessage(out ushort mref, out ulong insref)
{
mref = default;
insref = default;
ThrowIfDisposed();

if (_nativeImplementation.mdf_get_next_message2 == default)
{
bool ret = GetNextMessage(out int messageReference, out _, out insref);
if (messageReference >= ushort.MinValue && messageReference <= ushort.MaxValue)
mref = (ushort)messageReference;
return ret;
}

return _nativeImplementation.mdf_get_next_message2(_feedHandle, ref mref, ref insref) == 1;
}

/// <summary>
/// Fetches a message from the current consumed data if one is present and fills the output parameters with values representing the message fetched.
/// </summary>
Expand Down Expand Up @@ -440,6 +465,32 @@ public bool GetNextMessage(out MessageReference messageReference, out MessageCla
return ret;
}

/// <summary>
/// Fetches a message from the current consumed data if one is present and fills the output parameters with values representing the message fetched.
/// </summary>
/// <param name="messageReference">The fetched message reference.</param>
/// <param name="insref">The fetched instrument reference, which is the unique id of an instrument.</param>
/// <returns><see langword="true" /> if a message was returned (and the <paramref name="messageReference"/> and <paramref name="insref"/> fields will be filled) or <see langword="false" /> if there are no more messages in the current consumed data (or an error occured).</returns>
/// <exception cref="ObjectDisposedException">The <see cref="MarketDataFeed{TCallbackData,TStatusCallbackData}"/> instance has been disposed.</exception>
/// <exception cref="InvalidOperationException">An unknown/undefined message reference was fetched.</exception>
/// <remarks>The corresponding native function is mdf_get_next_message.</remarks>
public bool GetNextMessage(out MessageReference messageReference, out ulong insref)
{
bool ret = GetNextMessage(out ushort mref, out insref);
switch (ret)
{
case true:
if (!s_messageReferences.Contains(mref))
throw new InvalidOperationException($"{mref} is an unknown message reference.");
messageReference = (MessageReference)mref;
break;
default:
messageReference = default;
break;
}
return ret;
}

/// <summary>
/// Fetches the next field from the current message.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ bool ConsumeAndCount(string requestId)

if (ret == 1)
{
while (mdf.GetNextMessage(out int mref, out int _, out ulong instrumentReference))
while (mdf.GetNextMessage(out ushort mref, out ulong instrumentReference))
{
while (mdf.GetNextField(out uint field, out ReadOnlySpan<byte> value))
{
Expand Down Expand Up @@ -610,8 +610,8 @@ private static bool Consume(MarketDataFeed mdf, MessageReference messageReferenc
switch (ret)
{
case 1:
while (mdf.GetNextMessage(out int mref, out int _, out ulong _))
if (mref == (int)messageReference)
while (mdf.GetNextMessage(out MessageReference mref, out ulong _))
if (mref == messageReference)
return true;
break;
case -1:
Expand Down
Loading

0 comments on commit 19a0e1e

Please sign in to comment.