Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve decompression memory management #80

Closed
yallie opened this issue Sep 22, 2020 · 2 comments
Closed

Improve decompression memory management #80

yallie opened this issue Sep 22, 2020 · 2 comments
Milestone

Comments

@yallie
Copy link
Member

yallie commented Sep 22, 2020

Zyan unpacks the compressed payload using MemoryStream.
When the buffer overflows, MemoryStream allocates the larger block and moves the data:
https://referencesource.microsoft.com/#mscorlib/system/io/memorystream.cs,283

Reallocation can be avoided if the required capacity is specified upfront:
https://referencesource.microsoft.com/#mscorlib/system/io/memorystream.cs,70

Or we can use chunked memory stream instead. Something like this perhaps:
https://github.com/Aethon/SmallBlockMemoryStream

@yallie yallie added this to the 2.14 milestone Sep 22, 2020
yallie added a commit to yallie/Zyan that referenced this issue Sep 25, 2020
yallie added a commit to yallie/Zyan that referenced this issue Sep 25, 2020
@yallie
Copy link
Member Author

yallie commented Sep 25, 2020

Benchmark code:

//
// Compile server using:
// csc test.cs /r:Zyan.Communication.dll /platform:anycpu /debug /out:server.exe
//
// Compile client using:
// csc.exe test.cs /r:Zyan.Communication.dll /platform:x86 /debug /out:client.exe
//
// Start up server.exe, then run client.exe.
//

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using Zyan.Communication;
using Zyan.Communication.ChannelSinks.Compression;
using Zyan.Communication.Protocols.Tcp;
using Zyan.Communication.Toolbox;

class CanceledSubscriptionTest
{
	static void Main()
	{
		try
		{
			StartServer();
		}
		catch
		{
			StartClient();
		}
	}

	// ------------ Server code --------------

	static void StartServer()
	{
		Console.Title = $"Server, PID: {Process.GetCurrentProcess().Id}";

		var proto = new TcpDuplexServerProtocolSetup(4321)
		{
		    CompressionMethod = CompressionMethod.Default
		};

		using (var host = new ZyanComponentHost(nameof(CanceledSubscriptionTest), proto))
		{
			host.RegisterComponent<IService, Service>();
			Console.WriteLine("Server started. Press ENTER to quit.");
			Console.ReadLine();
		}
	}

	public class Service : IService
	{
		public byte[][] GetPayload(int count)
		{
			var chunkSize = 8192;
			var arraySize = count / chunkSize;
			Console.WriteLine($"Generating {count} bytes of payload, that is, byte[{chunkSize}][{arraySize}].");

			var result = Enumerable.Range(1, arraySize).Select(i => new byte[chunkSize]).ToArray();

			Console.WriteLine($"Returning approx. {chunkSize * arraySize} bytes.");
			GC.Collect();
			GC.WaitForPendingFinalizers();
			GC.Collect();

			return result;
		}
	}

	// ------------ Shared code --------------

	public interface IService
	{
		byte[][] GetPayload(int count);
	}

	// ------------ Client code --------------

	static void StartClient()
	{
		Console.Title = $"Client, PID: {Process.GetCurrentProcess().Id}";

		var proto = new TcpDuplexClientProtocolSetup()
		{
		    CompressionMethod = CompressionMethod.Default
		};

		var url = proto.FormatUrl("localhost", 4321, nameof(CanceledSubscriptionTest));
		using (var conn = new ZyanConnection(url))
		{
			Console.WriteLine("Client started. Hit ENTER for 10 runs, ^C to quit.");

			var proxy = conn.CreateProxy<IService>();
            while (true)
			{
				var command = Console.ReadLine();
				var count = 0;
				if (!int.TryParse(command, out count))
				{
					count = 10;
				}

				try
				{
					var size = 2_343_750;
					for (var i = 0; i < count; i++)
					{
						Console.Write($"Get payload: {size} bytes...");
						var data = proxy.GetPayload(size);

						Console.WriteLine($"done!");
						size *= 2;
					}
				}
				catch (Exception ex)
				{
					Console.WriteLine($"Exception: {ex}");
					throw;
				}
			}
		}
	}
}

Plain MemoryStream, x86 — crashes on 300 Mb:

Get payload: 2343750 bytes...done!
Get payload: 4687500 bytes...done!
Get payload: 9375000 bytes...done!
Get payload: 18750000 bytes...done!
Get payload: 37500000 bytes...done!
Get payload: 75000000 bytes...done!
Get payload: 150000000 bytes...done!
Get payload: 300000000 bytes...Exception: System.OutOfMemoryException

Custom SmallBlockMemoryStream, x86 — crashes on 1200 Mb:

Get payload: 2343750 bytes...done!
Get payload: 4687500 bytes...done!
Get payload: 9375000 bytes...done!
Get payload: 18750000 bytes...done!
Get payload: 37500000 bytes...done!
Get payload: 75000000 bytes...done!
Get payload: 150000000 bytes...done!
Get payload: 300000000 bytes...done!
Get payload: 600000000 bytes...done!
Get payload: 1200000000 bytes...Exception: System.OutOfMemoryException

Custom SmallBlockMemoryStream, x64 — doesn't crash on 1200 Mb:

Get payload: 2343750 bytes...done!
Get payload: 4687500 bytes...done!
Get payload: 9375000 bytes...done!
Get payload: 18750000 bytes...done!
Get payload: 37500000 bytes...done!
Get payload: 75000000 bytes...done!
Get payload: 150000000 bytes...done!
Get payload: 300000000 bytes...done!
Get payload: 600000000 bytes...done!
Get payload: 1200000000 bytes...done!

@yallie
Copy link
Member Author

yallie commented Sep 28, 2020

Implemented in 1554ef0.

@yallie yallie closed this as completed Sep 28, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant