using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace Sodao.FastSocket.Server
{
///
/// upd server
///
///
public sealed class UdpServer : IUdpServer where TMessage : class, Messaging.IMessage
{
#region Private Members
private readonly int _port;
private readonly int _messageBufferSize;
private Socket _socket = null;
private AsyncSendPool _pool = null;
private readonly Protocol.IUdpProtocol _protocol = null;
private readonly IUdpService _service = null;
#endregion
#region Constructors
///
/// new
///
///
///
///
public UdpServer(int port, Protocol.IUdpProtocol protocol,
IUdpService service)
: this(port, 2048, protocol, service)
{
}
///
/// new
///
///
///
///
///
/// protocol is null.
/// service is null.
public UdpServer(int port, int messageBufferSize,
Protocol.IUdpProtocol protocol,
IUdpService service)
{
if (protocol == null) throw new ArgumentNullException("protocol");
if (service == null) throw new ArgumentNullException("service");
this._port = port;
this._messageBufferSize = messageBufferSize;
this._protocol = protocol;
this._service = service;
}
#endregion
#region Private Methods
///
/// 异步接收数据
///
///
private void BeginReceive(SocketAsyncEventArgs e)
{
if (!this._socket.ReceiveFromAsync(e))
ThreadPool.QueueUserWorkItem(_ => this.ReceiveCompleted(this, e));
}
///
/// completed handle
///
///
///
private void ReceiveCompleted(object sender, SocketAsyncEventArgs e)
{
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
var session = new UdpSession(e.RemoteEndPoint, this);
TMessage message = null;
try { message = this._protocol.Parse(new ArraySegment(e.Buffer, 0, e.BytesTransferred)); }
catch (Exception ex)
{
SocketBase.Log.Trace.Error(ex.Message, ex);
this._service.OnError(session, ex);
}
if (message != null) this._service.OnReceived(session, message);
}
//receive again
this.BeginReceive(e);
}
#endregion
#region IUdpServer Members
///
/// start
///
public void Start()
{
this._socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this._socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
this._socket.Bind(new IPEndPoint(IPAddress.Any, this._port));
this._socket.DontFragment = true;
this._pool = new AsyncSendPool(this._messageBufferSize, this._socket);
var e = new SocketAsyncEventArgs();
e.RemoteEndPoint = new IPEndPoint(IPAddress.Any, 0);
e.SetBuffer(new byte[this._messageBufferSize], 0, this._messageBufferSize);
e.Completed += this.ReceiveCompleted;
this.BeginReceive(e);
}
///
/// stop
///
public void Stop()
{
this._socket.Close();
this._socket = null;
this._pool = null;
}
///
/// send to...
///
///
///
public void SendTo(EndPoint endPoint, byte[] payload)
{
this._pool.SendAsync(endPoint, payload);
}
#endregion
///
/// 用于异步发送的对象池
///
private class AsyncSendPool
{
#region Private Members
private const int MAXPOOLSIZE = 3000;
private readonly int _messageBufferSize;
private readonly Socket _socket = null;
private readonly ConcurrentStack _stack =
new ConcurrentStack();
#endregion
#region Constructors
///
/// new
///
///
///
public AsyncSendPool(int messageBufferSize, Socket socket)
{
if (socket == null) throw new ArgumentNullException("socket");
this._messageBufferSize = messageBufferSize;
this._socket = socket;
}
#endregion
#region Private Methods
///
/// send completed handle
///
///
///
private void SendCompleted(object sender, SocketAsyncEventArgs e)
{
this.Release(e);
}
#endregion
#region Public Methods
///
/// acquire
///
///
public SocketAsyncEventArgs Acquire()
{
SocketAsyncEventArgs e;
if (this._stack.TryPop(out e)) return e;
e = new SocketAsyncEventArgs();
e.SetBuffer(new byte[this._messageBufferSize], 0, this._messageBufferSize);
e.Completed += this.SendCompleted;
return e;
}
///
/// release
///
///
public void Release(SocketAsyncEventArgs e)
{
if (this._stack.Count >= MAXPOOLSIZE)
{
e.Completed -= this.SendCompleted;
e.Dispose();
return;
}
this._stack.Push(e);
}
///
/// sned async
///
///
///
/// endPoint is null
/// payload is null or empty
/// payload length大于messageBufferSize
public void SendAsync(EndPoint endPoint, byte[] payload)
{
if (endPoint == null) throw new ArgumentNullException("endPoint");
if (payload == null || payload.Length == 0) throw new ArgumentNullException("payload");
if (payload.Length > this._messageBufferSize)
throw new ArgumentOutOfRangeException("payload.Length", "payload length大于messageBufferSize");
var e = this.Acquire();
e.RemoteEndPoint = endPoint;
Buffer.BlockCopy(payload, 0, e.Buffer, 0, payload.Length);
e.SetBuffer(0, payload.Length);
if (!this._socket.SendToAsync(e))
ThreadPool.QueueUserWorkItem(_ => this.Release(e));
}
#endregion
}
}
}