Skip to content

Commit

Permalink
Added an Extract method
Browse files Browse the repository at this point in the history
  • Loading branch information
mgnsm committed Mar 6, 2024
1 parent 14a22c8 commit 5327a64
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Source/Millistream.Streaming/Interop/NativeImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal sealed unsafe class NativeImplementation
internal readonly delegate* unmanaged[Cdecl]<IntPtr, ulong> mdf_get_mclass;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, string, int> mdf_connect;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_disconnect;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, ref ushort, ref ulong, ref uint, IntPtr> mdf_extract;
internal readonly delegate* unmanaged[Cdecl]<IntPtr> mdf_message_create;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_message_destroy;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_message_reset;
Expand Down Expand Up @@ -123,6 +124,8 @@ internal NativeImplementation(string libraryPath)
mdf_get_delay = (delegate* unmanaged[Cdecl]<IntPtr, byte>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_get_mclass), out address))
mdf_get_mclass = (delegate* unmanaged[Cdecl]<IntPtr, ulong>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_extract), out address))
mdf_extract = (delegate* unmanaged[Cdecl]<IntPtr, ref ushort, ref ulong, ref uint, IntPtr>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_message_add_int), out address))
mdf_message_add_int = (delegate* unmanaged[Cdecl]<IntPtr, uint, long, int, int>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_message_add_uint), out address))
Expand Down
24 changes: 24 additions & 0 deletions Source/Millistream.Streaming/MarketDataFeed.Extract.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;

namespace Millistream.Streaming
{
public partial class MarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
{
/// <summary>
/// Extracts the current message from the handle.
/// </summary>
/// <param name="mref">The message reference of the extracted message. This should match a <see cref="MessageReferences"/> value.</param>
/// <param name="insref">The unique instrument reference of the extracted message.</param>
/// <param name="len">The length of the extracted message in bytes.</param>
/// <returns>An unmanaged pointer to the extracted message if there was a message to return or <see langword="default(IntPtr)"/> if there are no more messages in the stream.</returns>
/// <remarks>The caller must make a copy of the returned data since the pointer is to the internal state of the handle. The corresponding native function is mdf_extract.</remarks>
public unsafe IntPtr Extract(out ushort mref, out ulong insref, out uint len)
{
mref = default;
insref = default;
len = default;
return _nativeImplementation.mdf_extract == default ? default
: _nativeImplementation.mdf_extract(_feedHandle, ref mref, ref insref, ref len);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,32 @@ public void CreateInstrumentsTest()
Assert.IsTrue(succeeded);
}

[TestMethod]
public void ExtractMessageTest()
{
using MarketDataFeed mdf = new MarketDataFeed();
using Message message = new Message();
//connect
Assert.IsTrue(mdf.Connect(GetTestRunParameter("host")));
//log on
Assert.IsTrue(LogOn(mdf, message));
//subscribe to quotes for instrument 772
Assert.IsTrue(message.Add(0, MessageReferences.MDF_M_REQUEST));
Assert.IsTrue(message.AddNumeric(Fields.MDF_F_REQUESTCLASS, RequestClasses.MDF_RC_QUOTE));
Assert.IsTrue(message.AddNumeric(Fields.MDF_F_REQUESTTYPE, RequestTypes.MDF_RT_IMAGE));
const ulong InsRef = 772;
Assert.IsTrue(message.AddUInt64(Fields.MDF_F_INSREFLIST, InsRef, 0));
Assert.IsTrue(mdf.Send(message));
//consume the request
Assert.AreEqual(1, mdf.Consume(3000));
//extract the message
IntPtr pointer = mdf.Extract(out ushort mref, out ulong insref, out uint len);
Assert.AreNotEqual(default, pointer);
Assert.AreEqual(MessageReferences.MDF_M_QUOTE, mref);
Assert.AreEqual(InsRef, insref);
Assert.IsTrue(len > 0);
}

private string GetTestRunParameter(string parameterName)
{
if (string.IsNullOrEmpty(parameterName))
Expand Down
46 changes: 46 additions & 0 deletions Tests/Millistream.Streaming.UnitTests/MarketDataFeedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class MarketDataFeedTests
private delegate void GetUInt64PropertyCallback(IntPtr handle, int option, ref ulong value);
private delegate void GetInt64PropertyCallback(IntPtr handle, int option, ref long value);
private delegate void GetIntPtrPropertyCallback(IntPtr handle, int option, ref IntPtr value);
private delegate void ExtractCallback(IntPtr handle, ref ushort mref, ref ulong insref, ref uint len);

[AssemblyInitialize]
public static void DoNotInitializeDefaultMarketDataFeedByDefaultTest(TestContext _1)
Expand Down Expand Up @@ -796,6 +797,51 @@ public void FetchInvalidTagTest()
#pragma warning restore CS0618
}

[TestMethod]
public void ExtractReturnsTheDefaultValueWhenNativeFunctionIsMissingTest()
{
NativeImplementation nativeImplementation = new(default)
{
mdf_extract = default
};
using MarketDataFeed mdf = new(nativeImplementation);
Assert.AreEqual(default, mdf.Extract(out ushort mref, out ulong insref, out uint len));
Assert.AreEqual(default, mref);
Assert.AreEqual(default, insref);
Assert.AreEqual(default, len);
}


[TestMethod]
public void ExtractTest()
{
IntPtr feedHandle = new(123);
IntPtr ptr = new(456);
const ushort MessageReference = MessageReferences.MDF_M_QUOTE;
const ulong InstrumentReference = 1000;
const uint Length = 500;

Mock<INativeImplementation> nativeImplementation = new();
nativeImplementation.Setup(x => x.mdf_create()).Returns(feedHandle);
NativeImplementation.Implementation = nativeImplementation.Object;

using MarketDataFeed mdf = new();
nativeImplementation.Setup(x => x.mdf_extract(feedHandle, ref It.Ref<ushort>.IsAny, ref It.Ref<ulong>.IsAny, ref It.Ref<uint>.IsAny))
.Callback(new ExtractCallback((IntPtr _, ref ushort mref, ref ulong insref, ref uint len) =>
{
mref = MessageReference;
insref = InstrumentReference;
len = Length;
}))
.Returns(ptr)
.Verifiable();
Assert.AreEqual(ptr, mdf.Extract(out ushort mref, out ulong insref, out uint len));
Assert.AreEqual(MessageReference, mref);
Assert.AreEqual(InstrumentReference, insref);
Assert.AreEqual(Length, len);
nativeImplementation.Verify();
}

private static void GetInt32Property(MDF_OPTION option, Func<MarketDataFeed, int> getter)
{
const int Value = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public interface INativeImplementation
ulong mdf_get_mclass(IntPtr handle);
int mdf_connect(IntPtr handle, string server);
void mdf_disconnect(IntPtr handle);
IntPtr mdf_extract(IntPtr handle, ref ushort mref, ref ulong insref, ref uint len);
IntPtr mdf_message_create();
void mdf_message_destroy(IntPtr message);
void mdf_message_reset(IntPtr message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal unsafe class NativeImplementation
internal delegate*<IntPtr, ulong> mdf_get_mclass;
internal delegate*<IntPtr, string, int> mdf_connect;
internal delegate*<IntPtr, void> mdf_disconnect;
internal delegate*<IntPtr, ref ushort, ref ulong, ref uint, IntPtr> mdf_extract;
internal delegate*<IntPtr> mdf_message_create;
internal delegate*<IntPtr, void> mdf_message_destroy;
internal delegate*<IntPtr, void> mdf_message_reset;
Expand Down Expand Up @@ -76,6 +77,7 @@ internal NativeImplementation(string _)
mdf_get_mclass = &MdfGetMClass;
mdf_connect = &MdfConnect;
mdf_disconnect = &MdfDisconnect;
mdf_extract = &MdfExtract;
mdf_message_create = &MdfMessageCreate;
mdf_message_destroy = &MdfMessageDestroy;
mdf_message_reset = &MdfMessageReset;
Expand Down Expand Up @@ -124,6 +126,7 @@ internal NativeImplementation(string _)
private static ulong MdfGetMClass(IntPtr handle) => Implementation.mdf_get_mclass(handle);
private static int MdfConnect(IntPtr handle, string server) => Implementation.mdf_connect(handle, server);
private static void MdfDisconnect(IntPtr handle) => Implementation.mdf_disconnect(handle);
private static IntPtr MdfExtract(IntPtr handle, ref ushort mref, ref ulong insref, ref uint len) => Implementation.mdf_extract(handle, ref mref, ref insref, ref len);
private static IntPtr MdfMessageCreate() => Implementation.mdf_message_create();
private static void MdfMessageDestroy(IntPtr message) => Implementation.mdf_message_destroy(message);
private static void MdfMessageReset(IntPtr message) => Implementation.mdf_message_reset(message);
Expand Down

0 comments on commit 5327a64

Please sign in to comment.