Recently I had a case where I couldn't use WCF for cross-process communication because of limits on the libraries I could load. I wrote these two classes to handle message passing and liked them so much that I used them in another project, so I thought I'd share them. They work in .NET 3.5 and later.
The concept is really simple: once the message reader is started, it reads length-prefixed messages from a stream until the stream is closed. The message writer keeps messages in a queue and writes to the stream whenever the queue is not empty, again shutting down when the stream is closed. You can register for an event on the MessageReader to handle the messages as they arrive. The classes use asynchronous reads and writes to keep the overhead to a minimum. You can technically use any kind of stream, though TCP or a pipe makes the most sense. (I could also see a use case for logging actions to a file and replaying them later.)
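To make the wire format concrete, here is a minimal synchronous sketch of the same length-prefix framing. The `Framing` class and its method names are hypothetical helpers made up for illustration; they are not part of the classes in this post, and the sketch assumes both ends use the same byte order for the 4-byte length.

```csharp
using System;
using System.IO;

// Hypothetical helper for illustration only; not part of MessageReader/MessageWriter.
static class Framing
{
    // Writes a 4-byte length (in the byte order BitConverter uses on this
    // machine) followed by the payload bytes.
    public static void WriteFrame(Stream stream, byte[] payload)
    {
        byte[] length = BitConverter.GetBytes(payload.Length);
        stream.Write(length, 0, length.Length);
        stream.Write(payload, 0, payload.Length);
    }

    // Reads one frame. Returns null if the stream ends cleanly before a new
    // frame starts, and throws if it ends in the middle of a frame.
    public static byte[] ReadFrame(Stream stream)
    {
        byte[] length = new byte[4];
        int got = ReadFully(stream, length);
        if (got == 0)
            return null;
        if (got < length.Length)
            throw new EndOfStreamException();

        int size = BitConverter.ToInt32(length, 0);
        if (size < 0)
            throw new InvalidDataException();

        byte[] payload = new byte[size];
        if (ReadFully(stream, payload) < size)
            throw new EndOfStreamException();
        return payload;
    }

    // Stream.Read may return fewer bytes than requested, so loop until the
    // buffer is full or the stream ends. Returns the number of bytes read.
    private static int ReadFully(Stream stream, byte[] buffer)
    {
        int total = 0;
        while (total < buffer.Length)
        {
            int read = stream.Read(buffer, total, buffer.Length - total);
            if (read == 0)
                break;
            total += read;
        }
        return total;
    }
}
```

The same helper would also cover the file-replay idea: write frames to a FileStream as actions happen, then call ReadFrame in a loop until it returns null.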
It is important to note that when the MessageReader notifies you of a message, the current thread will be one of the ThreadPool worker threads. If you need to do something in the UI, you will have to hand the information or state change off to the main thread, or you'll get a cross-thread exception.
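One way to do that hand-off is to capture the UI thread's SynchronizationContext and post each message to it. This is only a sketch: `UiMessageForwarder` is a hypothetical name, and it assumes the object is constructed on the UI thread of a WinForms or WPF application, where SynchronizationContext.Current is the UI context.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration only: forwards messages that arrive on
// a worker thread to the context that was current when it was constructed.
class UiMessageForwarder
{
    private readonly SynchronizationContext fContext;
    private readonly Action<byte[]> fHandler;

    // Construct this on the UI thread. If no context is installed (for example
    // in a console application), the default context is used as a fallback,
    // and that one runs the handler on a ThreadPool thread instead.
    public UiMessageForwarder(Action<byte[]> handler)
    {
        fContext = SynchronizationContext.Current ?? new SynchronizationContext();
        fHandler = handler;
    }

    // Same signature as MessageReader.MessageReadHandler, so it can be
    // subscribed to the MessageRead event directly.
    public void OnMessage(byte[] message)
    {
        // Post queues the call and returns immediately, so the reader's
        // worker thread is never blocked waiting on the UI.
        fContext.Post(delegate(object state) { fHandler((byte[])state); }, message);
    }
}
```

Usage would look something like `fReader.MessageRead += new UiMessageForwarder(UpdateUi).OnMessage;`, where UpdateUi is whatever method of yours touches the controls. In WinForms, Control.BeginInvoke is an alternative that does the same job.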
MessageReader:
Code:
using System;
using System.IO;

/// <summary>
/// Reads length-prefixed messages from a stream until the stream is
/// closed. This class is thread-safe as long as no one else is reading
/// from the stream.
/// </summary>
class MessageReader
{
    public delegate void MessageReadHandler(byte[] message);
    public delegate void StreamClosedHandler();

    /// <summary>
    /// Fired whenever a message has been completely read from the stream.
    /// </summary>
    public event MessageReadHandler MessageRead;

    /// <summary>
    /// Fired when the stream is closed and the reader stops.
    /// </summary>
    public event StreamClosedHandler StreamClosed;

    private Stream fStream;
    private byte[] fSizeBuffer = new byte[4];
    private byte[] fBuffer = null;
    private int fBytesRead = 0;

    /// <summary>
    /// Creates a new message reader with the specified stream.
    /// </summary>
    /// <param name="stream">The stream from which to read messages</param>
    public MessageReader(Stream stream)
    {
        fStream = stream;
    }

    /// <summary>
    /// Call this method to begin receiving messages. Simply close the
    /// stream to stop receiving. Calling this more than once is an error.
    /// </summary>
    public void Start()
    {
        BeginReadSize();
    }

    // Starts reading the 4-byte length prefix of the next message.
    private void BeginReadSize()
    {
        fBytesRead = 0;
        fStream.BeginRead(fSizeBuffer, 0, fSizeBuffer.Length, FinishReadSize, null);
    }

    private void OnStreamClosed()
    {
        // Copy the delegate so a concurrent unsubscribe cannot null it
        // between the check and the call.
        StreamClosedHandler handler = StreamClosed;
        if (null != handler)
            handler();
    }

    private void OnMessageRead(byte[] message)
    {
        // Ignore errors thrown by the handlers
        try
        {
            MessageReadHandler handler = MessageRead;
            if (null != handler)
                handler(message);
        }
        catch
        {
        }
    }

    private void FinishReadSize(IAsyncResult result)
    {
        try
        {
            // 0 bytes read means the stream was closed
            int read = fStream.EndRead(result);
            if (0 == read)
                throw new EndOfStreamException();

            // A read may return fewer than four bytes (this happens on TCP),
            // so keep reading until the whole length prefix has arrived.
            fBytesRead += read;
            if (fBytesRead < fSizeBuffer.Length)
            {
                fStream.BeginRead(fSizeBuffer, fBytesRead, fSizeBuffer.Length - fBytesRead, FinishReadSize, null);
                return;
            }

            // The prefix uses this machine's byte order (see BitConverter),
            // so both ends must agree on endianness.
            int size = BitConverter.ToInt32(fSizeBuffer, 0);
            if (size < 0)
                throw new InvalidDataException();

            // Create a buffer to hold the message.
            fBytesRead = 0;
            fBuffer = new byte[size];

            // An empty message has no body to read. Deliver it right away;
            // a zero-byte read would otherwise look like a closed stream.
            if (0 == size)
            {
                OnMessageRead(fBuffer);
                BeginReadSize();
                return;
            }

            fStream.BeginRead(fBuffer, 0, fBuffer.Length, FinishRead, null);
        }
        catch
        {
            OnStreamClosed();
        }
    }

    private void FinishRead(IAsyncResult result)
    {
        try
        {
            // Finish reading from our stream. 0 bytes read means the stream was closed
            int read = fStream.EndRead(result);
            if (0 == read)
                throw new EndOfStreamException();

            // Increment the number of bytes we've read. If there's still more to get, get it
            fBytesRead += read;
            if (fBytesRead < fBuffer.Length)
            {
                fStream.BeginRead(fBuffer, fBytesRead, fBuffer.Length - fBytesRead, FinishRead, null);
                return;
            }

            // Handle the message and go get the next one.
            OnMessageRead(fBuffer);
            BeginReadSize();
        }
        catch
        {
            OnStreamClosed();
        }
    }
}
MessageWriter:
Code:
using System;
using System.Collections.Generic;
using System.IO;

/// <summary>
/// Asynchronously writes length-prefixed messages to a stream. This class
/// is thread-safe as long as no one else writes to the stream (you may close
/// it at any time though).
/// </summary>
class MessageWriter
{
    public delegate void StreamClosedHandler();

    /// <summary>
    /// Fired when the stream is closed and the writer stops.
    /// </summary>
    public event StreamClosedHandler StreamClosed;

    private Stream fStream;
    private Queue<byte[]> fToSend = new Queue<byte[]>();
    private object fLock = new object();

    /// <summary>
    /// Creates a new message writer with the specified stream.
    /// </summary>
    /// <param name="stream">The stream to which to write messages</param>
    public MessageWriter(Stream stream)
    {
        fStream = stream;
    }

    /// <summary>
    /// Writes the specified message to the stream. A 4-byte length will be
    /// prefixed onto the message.
    /// </summary>
    /// <param name="message">The message body, without the length prefix</param>
    public void Write(byte[] message)
    {
        if (null == message)
            throw new ArgumentNullException("message");

        // Prefix the message with its length. BitConverter uses this machine's
        // byte order, so the reading side must use the same one.
        byte[] length = BitConverter.GetBytes(message.Length);
        byte[] final = new byte[message.Length + 4];
        Buffer.BlockCopy(length, 0, final, 0, 4);
        Buffer.BlockCopy(message, 0, final, 4, message.Length);

        // Add this message to the list of messages to send. If it's the only one in the
        // queue, fire up the async events to send it.
        try
        {
            lock (fLock)
            {
                fToSend.Enqueue(final);
                if (1 == fToSend.Count)
                    fStream.BeginWrite(final, 0, final.Length, FinishWrite, null);
            }
        }
        catch
        {
            OnStreamClosed();
        }
    }

    private void OnStreamClosed()
    {
        // Copy the delegate so a concurrent unsubscribe cannot null it
        // between the check and the call.
        StreamClosedHandler handler = StreamClosed;
        if (null != handler)
            handler();
    }

    private void FinishWrite(IAsyncResult result)
    {
        try
        {
            fStream.EndWrite(result);
            lock (fLock)
            {
                // Pop the message we just sent out of the queue
                fToSend.Dequeue();

                // See if there's anything else to send. Note: only peek at the next message.
                // Popping it now would empty the queue and signal to Write that it's safe
                // to start a new write when it's not.
                if (fToSend.Count > 0)
                {
                    byte[] final = fToSend.Peek();
                    fStream.BeginWrite(final, 0, final.Length, FinishWrite, null);
                }
            }
        }
        catch
        {
            OnStreamClosed();
        }
    }
}
The classes are then used like so:
Code:
using System.Net.Sockets;

class ClientState
{
    public TcpClient Client { get; private set; }
    public NetworkStream Stream { get; private set; }

    private MessageReader fReader;
    private MessageWriter fWriter;

    public ClientState(TcpClient client)
    {
        Client = client;
        Stream = client.GetStream();

        // Set up the writer
        fWriter = new MessageWriter(Stream);
        fWriter.StreamClosed += StreamClosed;

        // Start reading messages
        fReader = new MessageReader(Stream);
        fReader.MessageRead += GotMessage;
        fReader.StreamClosed += StreamClosed;
        fReader.Start();
    }

    // May be called by both the reader and the writer, so it has to
    // tolerate being called more than once.
    private void StreamClosed()
    {
        try
        {
            Client.Close();
        }
        catch
        {
            // Nothing useful to do if closing fails
        }
    }

    // Called on a ThreadPool worker thread
    private void GotMessage(byte[] bytes)
    {
        // Do stuff...
    }
}
The message can be anything you can turn into a byte array: XML, Protocol Buffers, text strings, serialized objects, etc.
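For the simplest case, plain text, the conversion could look something like the sketch below. `TextMessages` is a hypothetical helper invented for this example, and UTF-8 is an assumption; any encoding works as long as both ends use the same one.

```csharp
using System.Text;

// Hypothetical helper for illustration only: converts between strings and
// the byte[] payloads that the message classes carry.
static class TextMessages
{
    // Turns a string into a message payload (UTF-8, no length prefix;
    // the writer adds the prefix itself).
    public static byte[] Encode(string text)
    {
        return Encoding.UTF8.GetBytes(text);
    }

    // Turns a received payload back into a string.
    public static string Decode(byte[] message)
    {
        return Encoding.UTF8.GetString(message);
    }
}
```

Sending is then `fWriter.Write(TextMessages.Encode("hello"));`, and inside GotMessage you would call `TextMessages.Decode(bytes)` to get the string back.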