Skip to content

Commit

Permalink
Added Inject methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mgnsm committed Mar 6, 2024
1 parent 5327a64 commit 5b75d1e
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Source/Millistream.Streaming/Interop/NativeImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal sealed unsafe class NativeImplementation
internal readonly delegate* unmanaged[Cdecl]<IntPtr, string, int> mdf_connect;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_disconnect;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, ref ushort, ref ulong, ref uint, IntPtr> mdf_extract;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, IntPtr, uint, int> mdf_inject;
internal readonly delegate* unmanaged[Cdecl]<IntPtr> mdf_message_create;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_message_destroy;
internal readonly delegate* unmanaged[Cdecl]<IntPtr, void> mdf_message_reset;
Expand Down Expand Up @@ -126,6 +127,8 @@ internal NativeImplementation(string libraryPath)
mdf_get_mclass = (delegate* unmanaged[Cdecl]<IntPtr, ulong>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_extract), out address))
mdf_extract = (delegate* unmanaged[Cdecl]<IntPtr, ref ushort, ref ulong, ref uint, IntPtr>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_inject), out address))
mdf_inject = (delegate* unmanaged[Cdecl]<IntPtr, IntPtr, uint, int>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_message_add_int), out address))
mdf_message_add_int = (delegate* unmanaged[Cdecl]<IntPtr, uint, long, int, int>)address;
if (nativeLibrary.TryGetExport(lib, nameof(mdf_message_add_uint), out address))
Expand Down
32 changes: 32 additions & 0 deletions Source/Millistream.Streaming/MarketDataFeed.Inject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;

namespace Millistream.Streaming
{
public partial class MarketDataFeed<TCallbackUserData, TStatusCallbackUserData>
{
/// <summary>
/// Injects a previously extracted message to the handle.
/// </summary>
/// <param name="ptr">An unmanaged pointer to a message previously extracted using the <see cref="Extract(out ushort, out ulong, out uint)"/> method.</param>
/// <param name="len">The length of the previously extracted message in bytes.</param>
/// <returns>1 if no callback function has been registered, 0 if a callback function is registered and -1 on errors.</returns>
/// <remarks>After calling this method, you should call <see cref="GetNextMessage(out int, out int, out ulong)"/> / <see cref="GetNextMessage(out ushort, out ulong)"/> as usual until it returns <see langword="false"/>. The corresponding native function is mdf_inject.</remarks>
public unsafe int Inject(IntPtr ptr, uint len) =>
_nativeImplementation.mdf_inject == default ? -1 : _nativeImplementation.mdf_inject(_feedHandle, ptr, len);

/// <summary>
/// Injects a previously extracted message to the handle.
/// </summary>
/// <param name="data">The extracted message as a memory span of bytes.</param>
/// <returns>1 if no callback function has been registered, 0 if a callback function is registered and -1 on errors.</returns>
/// <remarks>After calling this method, you should call <see cref="GetNextMessage(out int, out int, out ulong)"/> / <see cref="GetNextMessage(out ushort, out ulong)"/> as usual until it returns <see langword="false"/>. The corresponding native function is mdf_inject.</remarks>
public unsafe int Inject(ReadOnlySpan<byte> data)
{
if (_nativeImplementation.mdf_inject == default)
return -1;

fixed (byte* bytes = data)
return _nativeImplementation.mdf_inject(_feedHandle, (IntPtr)bytes, (uint)data.Length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public void CreateInstrumentsTest()
}

[TestMethod]
public void ExtractMessageTest()
public void ExtractAndInjectMessageTest()
{
using MarketDataFeed mdf = new MarketDataFeed();
using Message message = new Message();
Expand All @@ -598,16 +598,52 @@ public void ExtractMessageTest()
Assert.IsTrue(message.AddNumeric(Fields.MDF_F_REQUESTCLASS, RequestClasses.MDF_RC_QUOTE));
Assert.IsTrue(message.AddNumeric(Fields.MDF_F_REQUESTTYPE, RequestTypes.MDF_RT_IMAGE));
const ulong InsRef = 772;
Assert.IsTrue(message.AddUInt64(Fields.MDF_F_INSREFLIST, InsRef, 0));
Assert.IsTrue(message.AddString(Fields.MDF_F_INSREFLIST, InsRef.ToString()));
Assert.IsTrue(mdf.Send(message));
//consume the request
Assert.AreEqual(1, mdf.Consume(3000));
//extract the message
IntPtr pointer = mdf.Extract(out ushort mref, out ulong insref, out uint len);
Assert.AreNotEqual(default, pointer);
IntPtr ptr = mdf.Extract(out ushort mref, out ulong insref, out uint len);
Assert.AreNotEqual(default, ptr);
Assert.AreEqual(MessageReferences.MDF_M_QUOTE, mref);
Assert.AreEqual(InsRef, insref);
Assert.IsTrue(len > 0);
//inject the message into the another handle using the pointer
using MarketDataFeed targetMdf = new MarketDataFeed();
Assert.AreEqual(1, targetMdf.Inject(ptr, len));
Assert.IsTrue(targetMdf.GetNextMessage(out ushort injectedMref, out ulong injectedInsRef));
Assert.AreEqual(MessageReferences.MDF_M_QUOTE, injectedMref);
Assert.AreEqual(InsRef, injectedInsRef);
//copy the data
byte[] data = new byte[len];
Marshal.Copy(ptr, data, 0, (int)len);
//inject the message using the copied data
using MarketDataFeed targetMdf2 = new MarketDataFeed();
Assert.AreEqual(1, targetMdf2.Inject(data));
Assert.IsTrue(targetMdf2.GetNextMessage(out injectedMref, out injectedInsRef));
Assert.AreEqual(MessageReferences.MDF_M_QUOTE, injectedMref);
Assert.AreEqual(InsRef, injectedInsRef);
//inject the message into a handle that has a callback
using MarketDataFeed targetMdf3 = new MarketDataFeed();
targetMdf3.DataCallback = (data, handle) =>
{
while (handle.GetNextMessage(out injectedMref, out injectedInsRef))
{
CallbackUserData callbackUserData = data as CallbackUserData;
Assert.IsNotNull(callbackUserData);
callbackUserData.Count++;
Assert.AreEqual(MessageReferences.MDF_M_QUOTE, injectedMref);
Assert.AreEqual(InsRef, injectedInsRef);
}
};
CallbackUserData callbackUserData = new CallbackUserData();
targetMdf3.CallbackUserData = callbackUserData;
Assert.AreEqual(0, targetMdf3.Inject(ptr, len));
Assert.AreEqual(1, callbackUserData.Count);

while (mdf.GetNextField(out uint _, out _)) ;
Assert.IsFalse(mdf.GetNextMessage(out ushort _, out _));
mdf.Disconnect();
}

private string GetTestRunParameter(string parameterName)
Expand Down Expand Up @@ -695,5 +731,9 @@ private static long GetTotalAllocatedBytes()
#endif
}

private class CallbackUserData
{
public int Count { get; set; }
}
}
}
33 changes: 33 additions & 0 deletions Tests/Millistream.Streaming.UnitTests/MarketDataFeedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,39 @@ public void ExtractTest()
nativeImplementation.Verify();
}

[TestMethod]
public void InjectReturnsMinusOneWhenNativeFunctionIsMissingTest()
{
NativeImplementation nativeImplementation = new(default)
{
mdf_inject = default
};
using MarketDataFeed mdf = new(nativeImplementation);
Assert.AreEqual(-1, mdf.Inject(new IntPtr(456), 3));
Assert.AreEqual(-1, mdf.Inject(new byte[] { 4, 5, 6 }));
}

[TestMethod]
public void InjectTest()
{
IntPtr feedHandle = new(123);
IntPtr ptr = new(456);
const uint Len = 3;

Mock<INativeImplementation> nativeImplementation = new();
nativeImplementation.Setup(x => x.mdf_create()).Returns(feedHandle);
NativeImplementation.Implementation = nativeImplementation.Object;

using MarketDataFeed mdf = new();
nativeImplementation.Setup(x => x.mdf_inject(feedHandle, ptr, Len)).Returns(1).Verifiable();
Assert.AreEqual(1, mdf.Inject(ptr, Len));
nativeImplementation.Verify();

nativeImplementation.Setup(x => x.mdf_inject(feedHandle, It.IsAny<IntPtr>(), It.IsAny<uint>())).Returns(2).Verifiable();
Assert.AreEqual(2, mdf.Inject(new byte[] { 4, 5, 6 }));
nativeImplementation.Verify();
}

private static void GetInt32Property(MDF_OPTION option, Func<MarketDataFeed, int> getter)
{
const int Value = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface INativeImplementation
int mdf_connect(IntPtr handle, string server);
void mdf_disconnect(IntPtr handle);
IntPtr mdf_extract(IntPtr handle, ref ushort mref, ref ulong insref, ref uint len);
int mdf_inject(IntPtr handle, IntPtr ptr, uint len);
IntPtr mdf_message_create();
void mdf_message_destroy(IntPtr message);
void mdf_message_reset(IntPtr message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal unsafe class NativeImplementation
internal delegate*<IntPtr, string, int> mdf_connect;
internal delegate*<IntPtr, void> mdf_disconnect;
internal delegate*<IntPtr, ref ushort, ref ulong, ref uint, IntPtr> mdf_extract;
internal delegate*<IntPtr, IntPtr, uint, int> mdf_inject;
internal delegate*<IntPtr> mdf_message_create;
internal delegate*<IntPtr, void> mdf_message_destroy;
internal delegate*<IntPtr, void> mdf_message_reset;
Expand Down Expand Up @@ -78,6 +79,7 @@ internal NativeImplementation(string _)
mdf_connect = &MdfConnect;
mdf_disconnect = &MdfDisconnect;
mdf_extract = &MdfExtract;
mdf_inject = &MdfInject;
mdf_message_create = &MdfMessageCreate;
mdf_message_destroy = &MdfMessageDestroy;
mdf_message_reset = &MdfMessageReset;
Expand Down Expand Up @@ -127,6 +129,7 @@ internal NativeImplementation(string _)
private static int MdfConnect(IntPtr handle, string server) => Implementation.mdf_connect(handle, server);
private static void MdfDisconnect(IntPtr handle) => Implementation.mdf_disconnect(handle);
private static IntPtr MdfExtract(IntPtr handle, ref ushort mref, ref ulong insref, ref uint len) => Implementation.mdf_extract(handle, ref mref, ref insref, ref len);
private static int MdfInject(IntPtr handle, IntPtr ptr, uint len) => Implementation.mdf_inject(handle, ptr, len);
private static IntPtr MdfMessageCreate() => Implementation.mdf_message_create();
private static void MdfMessageDestroy(IntPtr message) => Implementation.mdf_message_destroy(message);
private static void MdfMessageReset(IntPtr message) => Implementation.mdf_message_reset(message);
Expand Down

0 comments on commit 5b75d1e

Please sign in to comment.