using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Sodao.FastSocket.Server;
using Sodao.FastSocket.Server.Messaging;
using Sodao.FastSocket.SocketBase;
using HxCommonLib.Log;

namespace HxSocket
{
    /// <summary>
    /// Single-client TCP server: accepts one client at a time, receives text
    /// messages from it and sends text messages to it.
    /// Two threads are started by <see cref="SocketStart"/>: one receives data
    /// (and performs the first accept), the other watches the connection state
    /// and accepts a replacement client after a disconnect.
    /// </summary>
    public class SocketTcpListener
    {
        /// <summary>Port used by the parameterless constructor.</summary>
        private const int DefaultPort = 2018;

        /// <summary>Marker every accepted message must contain.</summary>
        private const string CommandTag = "COMMAND";

        /// <summary>Marker that, together with <see cref="CommandTag"/>, identifies a heartbeat.</summary>
        private const string HeartBeatTag = "ID0";

        /// <summary>Sleep between two polls of the message list, in milliseconds.</summary>
        private const int PollIntervalMs = 10;

        /// <summary>Number of polls after which the blocking CheckReturn gives up.</summary>
        private const int MaxPollCount = 500;

        private static readonly string[] LineSeparators = new string[] { "\r\n" };

        private readonly int m_nPort;

        // Guards the (client, streams, connected flag) tuple so the two worker
        // threads never observe a half-published connection.
        private readonly object m_objConnLock = new object();

        // Serialises writers; a dedicated object is used because the stream
        // field itself can be replaced or nulled by the monitor thread.
        private readonly object m_objSendLock = new object();

        private volatile bool m_bThreadRecvRunBreak = false;    // stop request for the receive thread
        private volatile bool m_bThreadConnRunBreak = false;    // stop request for the connection monitor thread
        private TcpListener m_tcpServerListener = null;
        private TcpClient m_tcpClient = null;
        private NetworkStream m_tcpRecvStream = null;
        private NetworkStream m_tcpSendStream = null;

        public string m_strRecvMessage = null;                  // text of the most recent read
        public bool m_bRecvMessage = false;                     // set when data arrived, cleared when a CheckReturn matches
        public bool m_bConnectedClient = false;                 // true while a client is considered connected
        public List<string> m_listRecvMessage = new List<string>(); // queued messages; several may arrive in one read
        public string m_strDevicesConntectedStatus = "";        // payload of the last heartbeat message

        /// <summary>
        /// Lock object protecting <see cref="m_listRecvMessage"/>.
        /// </summary>
        public object m_objListLock = new object();

        /// <summary>
        /// Creates a listener on the default port (2018).
        /// </summary>
        public SocketTcpListener()
            : this(DefaultPort)
        {
        }

        /// <summary>
        /// Creates a listener on the given port. Nothing is opened until
        /// <see cref="SocketStart"/> is called.
        /// </summary>
        /// <param name="nPort">TCP port to listen on.</param>
        public SocketTcpListener(int nPort)
        {
            m_nPort = nPort;
            m_bConnectedClient = false;
        }

        /// <summary>
        /// Blocks until a client connects, then publishes the client, its
        /// stream and the connected flag in one step.
        /// </summary>
        private void AcceptClient()
        {
            TcpClient client = m_tcpServerListener.AcceptTcpClient();
            if (client == null)
            {
                return;
            }

            LogConstant.logger.Print("客户端已连接:" + client.Client.RemoteEndPoint.ToString());
            NetworkStream stream = client.GetStream();

            lock (m_objConnLock)
            {
                m_tcpClient = client;
                m_tcpRecvStream = stream;
                m_tcpSendStream = null;     // re-fetched lazily by SendMessage
                m_bConnectedClient = true;
            }
        }

        /// <summary>
        /// Clears the connected flag, detaches both stream fields and closes
        /// whatever streams were attached.
        /// </summary>
        private void CloseStreams()
        {
            NetworkStream recvStream;
            NetworkStream sendStream;

            lock (m_objConnLock)
            {
                recvStream = m_tcpRecvStream;
                sendStream = m_tcpSendStream;
                m_tcpRecvStream = null;
                m_tcpSendStream = null;
                m_bConnectedClient = false;
            }

            if (sendStream != null)
            {
                sendStream.Close();
            }
            if (recvStream != null)
            {
                recvStream.Close();
            }
        }

        /// <summary>
        /// Marks the connection as lost, but only if <paramref name="stream"/>
        /// is still the current receive stream (a newer client may already
        /// have been accepted by the monitor thread).
        /// </summary>
        private void MarkDisconnected(NetworkStream stream)
        {
            lock (m_objConnLock)
            {
                if (ReferenceEquals(stream, m_tcpRecvStream))
                {
                    m_bConnectedClient = false;
                }
            }
        }

        /// <summary>
        /// Receive thread: starts the listener, accepts the first client and
        /// then reads from the current stream until asked to stop.
        /// </summary>
        void RecvThreadMethod()
        {
            try
            {
                byte[] data = new byte[1024];

                m_tcpServerListener = new TcpListener(IPAddress.Any, m_nPort);
                m_tcpServerListener.Start();
                AcceptClient();

                while (!m_bThreadRecvRunBreak)
                {
                    NetworkStream stream = null;
                    try
                    {
                        bool bConnected;
                        lock (m_objConnLock)
                        {
                            stream = m_tcpRecvStream;
                            bConnected = m_bConnectedClient;
                        }

                        if (!bConnected || stream == null)
                        {
                            Thread.Sleep(500);
                            continue;
                        }

                        int length = stream.Read(data, 0, data.Length);
                        if (length <= 0)
                        {
                            // Read returning 0 means the peer closed the connection.
                            // Report it instead of spinning on further zero-length reads;
                            // the monitor thread will accept the next client.
                            MarkDisconnected(stream);
                            continue;
                        }

                        // NOTE(review): each read is decoded on its own, so a message (or a
                        // multi-byte character) split across two reads is not reassembled.
                        m_strRecvMessage = Encoding.UTF8.GetString(data, 0, length);
                        AddMessageToList();
                    }
                    catch (IOException ex)
                    {
                        // Socket-level failure: stop reading this stream.
                        MarkDisconnected(stream);
                        LogConstant.logger.Print("[ERROR] " + ex.ToString());
                    }
                    catch (ObjectDisposedException ex)
                    {
                        // Stream was closed by the monitor thread or by SocketClose.
                        MarkDisconnected(stream);
                        LogConstant.logger.Print("[ERROR] " + ex.ToString());
                    }
                    catch (System.Exception ex)
                    {
                        LogConstant.logger.Print("[ERROR] " + ex.ToString());
                    }
                }
            }
            catch (System.Exception ex)
            {
                LogConstant.logger.Print("[ERROR] " + ex.ToString());
            }
        }

        /// <summary>
        /// Splits <see cref="m_strRecvMessage"/> on CRLF and files each piece:
        /// heartbeat pieces update <see cref="m_strDevicesConntectedStatus"/>,
        /// other pieces containing "COMMAND" are appended to
        /// <see cref="m_listRecvMessage"/>; anything else is ignored.
        /// </summary>
        void AddMessageToList()
        {
            string strReceived = m_strRecvMessage;
            if (string.IsNullOrEmpty(strReceived))
            {
                return;
            }

            m_bRecvMessage = true;
            int nListRecvMsgCnt;

            // One read can carry several commands glued together.
            string[] asCommand = strReceived.Split(LineSeparators, StringSplitOptions.RemoveEmptyEntries);

            lock (m_objListLock)
            {
                foreach (string strCommand in asCommand)
                {
                    int nTagPos = strCommand.IndexOf(CommandTag, StringComparison.Ordinal);
                    if (nTagPos < 0)
                    {
                        continue;
                    }

                    if (strCommand.Contains(HeartBeatTag))
                    {
                        // Heartbeat: keep only the text after "COMMAND".
                        m_strDevicesConntectedStatus = strCommand.Substring(nTagPos + CommandTag.Length);
                        continue;
                    }

                    m_listRecvMessage.Add(strCommand + "\r\n");
                }
                nListRecvMsgCnt = m_listRecvMessage.Count;
            }

            LogConstant.logger.Print("接收:" + strReceived + "=" + nListRecvMsgCnt);
        }

        /// <summary>
        /// Monitor thread: once a client has been accepted, checks about once
        /// a second whether it is still connected and, if not, closes it and
        /// blocks waiting for the next client.
        /// </summary>
        /// <remarks>
        /// This thread never reads from the socket. End-of-stream and I/O
        /// errors are detected by the receive thread, which clears
        /// <see cref="m_bConnectedClient"/>.
        /// </remarks>
        void ConnectThreadMethod()
        {
            while (!m_bThreadConnRunBreak)
            {
                try
                {
                    TcpClient client;
                    bool bConnected;
                    lock (m_objConnLock)
                    {
                        client = m_tcpClient;
                        bConnected = m_bConnectedClient;
                    }

                    // The very first accept is done by the receive thread; until then there is nothing to watch.
                    if (client != null)
                    {
                        if (bConnected)
                        {
                            // Client is null once the TcpClient has been closed, so check it first.
                            Socket socket = client.Client;
                            bConnected = socket != null && socket.Connected;
                        }

                        if (!bConnected && !m_bThreadConnRunBreak)
                        {
                            CloseStreams();
                            client.Close();
                            // The client has just been closed, so its state is reported as False
                            // without querying the disposed object.
                            LogConstant.logger.Print("客户端已断开:False");

                            AcceptClient();
                        }
                    }

                    Thread.Sleep(1000);
                }
                catch (System.Exception ex)
                {
                    LogConstant.logger.Print("[ERROR] " + ex.ToString());
                    Thread.Sleep(1000);
                }
            }
        }

        /// <summary>
        /// Sends <paramref name="strMsg"/> to the connected client as UTF-8.
        /// Does nothing when the listener has not been created or no client is
        /// connected. Failures are logged, not thrown.
        /// </summary>
        /// <param name="strMsg">Text to send.</param>
        public void SendMessage(string strMsg)
        {
            if (m_tcpServerListener == null)
            {
                return;
            }

            try
            {
                NetworkStream stream;
                lock (m_objConnLock)
                {
                    if (!m_bConnectedClient || m_tcpClient == null)
                    {
                        return;
                    }
                    if (m_tcpSendStream == null)
                    {
                        m_tcpSendStream = m_tcpClient.GetStream();
                    }
                    stream = m_tcpSendStream;
                }

                byte[] dataMsg = Encoding.UTF8.GetBytes(strMsg);
                lock (m_objSendLock)
                {
                    stream.Write(dataMsg, 0, dataMsg.Length);
                    LogConstant.logger.Print("发送:" + strMsg);
                }
            }
            catch (System.Exception ex)
            {
                LogConstant.logger.Print("[ERROR] " + ex.ToString());
            }
        }

        /// <summary>
        /// Starts the receive thread and the connection monitor thread.
        /// </summary>
        public void SocketStart()
        {
            LogConstant.logger.Print("监听线程开始,监听端口:" + m_nPort);
            try
            {
                Thread recvThread = new Thread(new ThreadStart(RecvThreadMethod));
                Thread connThread = new Thread(new ThreadStart(ConnectThreadMethod));
                recvThread.Start();
                connThread.Start();
            }
            catch (ThreadStateException ex)
            {
                LogConstant.logger.Print("[ERROR] " + ex.ToString());
            }
        }

        /// <summary>
        /// Requests both threads to stop, then closes the streams, the client
        /// and the listener.
        /// </summary>
        public void SocketClose()
        {
            // Raise the stop flags first so a thread woken by the closes below exits its loop.
            m_bThreadConnRunBreak = true;
            m_bThreadRecvRunBreak = true;

            TcpClient client;
            lock (m_objConnLock)
            {
                client = m_tcpClient;
            }

            CloseStreams();

            if (client != null)
            {
                Thread.Sleep(700);
                client.Close();
            }

            if (m_tcpServerListener != null)
            {
                m_tcpServerListener.Stop();     // unblocks a pending AcceptTcpClient
                Thread.Sleep(700);
            }
        }

        /// <summary>
        /// Waits for a queued message that contains both
        /// <paramref name="strDevID"/> and <paramref name="strReturn"/>,
        /// removes the first such message and returns true.
        /// Polls every 10 ms and gives up after 500 sleeps (roughly 5 seconds).
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <param name="strReturn">Reply text to look for.</param>
        /// <returns>True if a matching message was found before the timeout.</returns>
        public bool CheckReturn(string strDevID, string strReturn)
        {
            int nPollCount = 0;
            while (true)
            {
                lock (m_objListLock)
                {
                    int nIndex = m_listRecvMessage.FindIndex(
                        msg => msg.Contains(strDevID) && msg.Contains(strReturn));
                    if (nIndex >= 0)
                    {
                        m_listRecvMessage.RemoveAt(nIndex);
                        m_bRecvMessage = false;
                        return true;
                    }
                }

                Thread.Sleep(PollIntervalMs);
                if (++nPollCount > MaxPollCount)
                {
                    return false;
                }
            }
        }

        /// <summary>
        /// Removes every queued message that contains both
        /// <paramref name="strDevID"/> and "COMMANDDone". Does not wait.
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <returns>Number of messages removed.</returns>
        public int CheckReturn(string strDevID)
        {
            lock (m_objListLock)
            {
                int nRetCount = m_listRecvMessage.RemoveAll(
                    msg => msg.Contains(strDevID) && msg.Contains("COMMANDDone"));
                if (nRetCount > 0)
                {
                    m_bRecvMessage = false;
                }
                return nRetCount;
            }
        }
    }

    /// <summary>
    /// FastSocket service that keeps a single "main" connection, caches the
    /// command messages received on it and offers static helpers to send
    /// messages and to wait for replies.
    /// </summary>
    public class HxSocketServerService : AbsSocketService<CommandLineMessage>
    {
        private const string CommandTag = "COMMAND";
        private const string HeartBeatTag = "ID0";
        private const string DoneTag = "COMMANDDone";

        /// <summary>Sleep between two polls of the cache, in milliseconds.</summary>
        private const int PollIntervalMs = 10;

        /// <summary>Timeout used by the two-argument CheckReturn, in milliseconds.</summary>
        private const int DefaultTimeoutMs = 5000;

        private static readonly string[] LineSeparators = new string[] { "\r\n" };

        /// <summary>
        /// Guards reads and writes of <see cref="m_mainConnection"/>.
        /// </summary>
        private static readonly object m_objConnLock = new object();

        /// <summary>
        /// The main controller connection, or null when none is registered.
        /// </summary>
        private static IConnection m_mainConnection = null;

        /// <summary>
        /// Lock object protecting <see cref="m_listCacheMessage"/>.
        /// </summary>
        public static object m_objCacheLock = new object();

        /// <summary>
        /// Cached messages; several may be received before they are consumed.
        /// </summary>
        public static List<string> m_listCacheMessage = new List<string>();

        /// <summary>
        /// Payload of the last heartbeat message.
        /// </summary>
        public static string m_strDevicesConntectedStatus = "";

        /// <summary>
        /// Reports whether a main connection is registered and active.
        /// </summary>
        /// <returns>True when a main connection exists and its Active flag is set.</returns>
        public static bool GetConnectedStatus()
        {
            IConnection current = m_mainConnection;    // snapshot: the field may be cleared concurrently
            return current != null && current.Active;
        }

        /// <summary>
        /// Registers <paramref name="connection"/> as the main connection and
        /// starts receiving on it. A previously registered connection is
        /// disconnected and replaced.
        /// </summary>
        /// <param name="connection">The connection that was just established.</param>
        public override void OnConnected(IConnection connection)
        {
            base.OnConnected(connection);

            IConnection previous;
            lock (m_objConnLock)
            {
                previous = m_mainConnection;
                m_mainConnection = connection;
            }

            if (previous != null && !ReferenceEquals(previous, connection))
            {
                // Only one main connection is kept; the newest one wins.
                previous.BeginDisconnect();
            }

            LogConstant.logger.Print("已连上客户端: " + connection.RemoteEndPoint.ToString() + " " + connection.ConnectionID.ToString());
            connection.BeginReceive();
        }

        /// <summary>
        /// Handles one received command line: replies to "echo" and "init",
        /// caches the parameters of "RunCommand", and caches the command name
        /// of anything else. Exceptions are logged, not thrown.
        /// </summary>
        /// <param name="connection">Connection the message arrived on.</param>
        /// <param name="message">Parsed command line message.</param>
        public override void OnReceived(IConnection connection, CommandLineMessage message)
        {
            try
            {
                base.OnReceived(connection, message);

                switch (message.CmdName)
                {
                    case "echo":
                        message.Reply(connection, "echo_reply " + message.Parameters[0]);
                        break;

                    case "init":
                        LogConstant.logger.Print("connection:" + connection.ConnectionID.ToString() + " init");
                        message.Reply(connection, "init_reply ok");
                        break;

                    case "RunCommand":
                        if (message.Parameters != null && message.Parameters.Length > 0)
                        {
                            string strContent = String.Join(" ", message.Parameters) + Environment.NewLine;
                            AddMessageToList(strContent);
                            LogConstant.logger.Print("接收:" + strContent);
                        }
                        break;

                    default:
                        AddMessageToList(message.CmdName);
                        LogConstant.logger.Print("接收未知消息:" + message.CmdName);
                        break;
                }
            }
            catch (System.Exception ex)
            {
                LogConstant.logger.Print("[ERROR] " + ex.ToString());
            }
        }

        /// <summary>
        /// Clears the main connection if the disconnected connection is the
        /// one currently registered; other connections leave it untouched.
        /// </summary>
        /// <param name="connection">Connection that was closed.</param>
        /// <param name="ex">Exception passed in by the framework, if any.</param>
        public override void OnDisconnected(IConnection connection, Exception ex)
        {
            base.OnDisconnected(connection, ex);

            lock (m_objConnLock)
            {
                if (ReferenceEquals(m_mainConnection, connection))
                {
                    m_mainConnection = null;
                }
            }

            LogConstant.logger.Print(connection.RemoteEndPoint.ToString() + " disconnected");
        }

        /// <summary>
        /// Logs an exception reported by the framework.
        /// </summary>
        /// <param name="connection">Connection the exception relates to.</param>
        /// <param name="ex">The exception.</param>
        public override void OnException(IConnection connection, Exception ex)
        {
            base.OnException(connection, ex);
            LogConstant.logger.Print("[ERROR] " + ex.ToString());
        }

        /// <summary>
        /// Builds a packet from <paramref name="value"/> followed by
        /// <see cref="Environment.NewLine"/>, encoded as UTF-8.
        /// </summary>
        /// <param name="value">Text to send.</param>
        /// <returns>The packet.</returns>
        /// <exception cref="ArgumentNullException">value is null</exception>
        public static Packet ToPacket(string value)
        {
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }

            byte[] payload = Encoding.UTF8.GetBytes(string.Concat(value, Environment.NewLine));
            return new Packet(payload);
        }

        /// <summary>
        /// Sends a message on the main connection; does nothing when there is
        /// no active main connection.
        /// </summary>
        /// <param name="message">Text to send.</param>
        public static void SendMessage(string message)
        {
            IConnection current = m_mainConnection;    // snapshot: the field may be cleared concurrently
            if (current != null && current.Active)
            {
                current.BeginSend(ToPacket(message));
            }
        }

        /// <summary>
        /// Splits the text on CRLF and files each piece: heartbeat pieces
        /// update <see cref="m_strDevicesConntectedStatus"/>, other pieces
        /// containing "COMMAND" are appended to the cache; anything else is
        /// ignored.
        /// </summary>
        /// <param name="strRecvMessage">Received text.</param>
        private void AddMessageToList(string strRecvMessage)
        {
            if (string.IsNullOrEmpty(strRecvMessage))
            {
                return;
            }

            int nListRecvMsgCnt;

            // One message can carry several commands glued together.
            string[] astrCommand = strRecvMessage.Split(LineSeparators, StringSplitOptions.RemoveEmptyEntries);

            lock (m_objCacheLock)
            {
                foreach (string strCommand in astrCommand)
                {
                    int nTagPos = strCommand.IndexOf(CommandTag, StringComparison.Ordinal);
                    if (nTagPos < 0)
                    {
                        continue;
                    }

                    if (strCommand.Contains(HeartBeatTag))
                    {
                        // Heartbeat: keep only the text after "COMMAND".
                        m_strDevicesConntectedStatus = strCommand.Substring(nTagPos + CommandTag.Length);
                        continue;
                    }

                    m_listCacheMessage.Add(strCommand + "\r\n");
                }
                nListRecvMsgCnt = m_listCacheMessage.Count;
            }

            LogConstant.logger.Print("ListRecvMsgCnt = " + nListRecvMsgCnt + "MsgLength = " + strRecvMessage.Length);
        }

        /// <summary>
        /// Removes and returns the first cached message accepted by
        /// <paramref name="match"/>, under the cache lock.
        /// </summary>
        private static bool TryTakeFirst(Predicate<string> match, out string strMessage)
        {
            lock (m_objCacheLock)
            {
                int nIndex = m_listCacheMessage.FindIndex(match);
                if (nIndex < 0)
                {
                    strMessage = null;
                    return false;
                }

                strMessage = m_listCacheMessage[nIndex];
                m_listCacheMessage.RemoveAt(nIndex);
                return true;
            }
        }

        /// <summary>
        /// Polls the cache every 10 ms until a message accepted by
        /// <paramref name="match"/> shows up or the poll budget derived from
        /// <paramref name="nTimeout"/> is used up. The cache is always checked
        /// at least once. Returns the removed message, or null on timeout.
        /// </summary>
        private static string WaitAndTake(Predicate<string> match, int nTimeout)
        {
            int nMaxCount = nTimeout / PollIntervalMs;
            int nTimeOutCount = 0;

            while (true)
            {
                string strMessage;
                if (TryTakeFirst(match, out strMessage))
                {
                    return strMessage;
                }
                if (nTimeOutCount++ > nMaxCount)
                {
                    return null;
                }
                Thread.Sleep(PollIntervalMs);
            }
        }

        /// <summary>
        /// Waits (about 5 seconds at most) for a cached message containing
        /// both strings and removes the first such message.
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <param name="strReturn">Reply text to look for.</param>
        /// <returns>True if a matching message was found before the timeout.</returns>
        public static bool CheckReturn(string strDevID, string strReturn)
        {
            return CheckReturn(strDevID, strReturn, DefaultTimeoutMs);
        }

        /// <summary>
        /// Waits for a cached message containing both strings and removes the
        /// first such message.
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <param name="strRetnCmd">Reply text to look for.</param>
        /// <param name="nTimeout">Timeout in milliseconds (polled in 10 ms steps).</param>
        /// <returns>True if a matching message was found before the timeout.</returns>
        public static bool CheckReturn(string strDevID, string strRetnCmd, int nTimeout)
        {
            string strMessage = WaitAndTake(
                msg => msg.Contains(strDevID) && msg.Contains(strRetnCmd),
                nTimeout);
            return strMessage != null;
        }

        /// <summary>
        /// Removes every cached message that contains both
        /// <paramref name="strDevID"/> and "COMMANDDone". Does not wait.
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <returns>Number of messages removed.</returns>
        public static int CheckDone(string strDevID)
        {
            lock (m_objCacheLock)
            {
                return m_listCacheMessage.RemoveAll(
                    msg => msg.Contains(strDevID) && msg.Contains(DoneTag));
            }
        }

        /// <summary>
        /// Waits for a cached message containing <paramref name="strDevID"/>
        /// and "COMMANDDone" and removes the first such message.
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <param name="nTimeout">Timeout in milliseconds (polled in 10 ms steps).</param>
        /// <returns>True if a matching message was found before the timeout.</returns>
        public static bool CheckDone(string strDevID, int nTimeout)
        {
            string strMessage = WaitAndTake(
                msg => msg.Contains(strDevID) && msg.Contains(DoneTag),
                nTimeout);
            return strMessage != null;
        }

        /// <summary>
        /// Waits for a cached message containing <paramref name="strDevID"/>,
        /// <paramref name="strCmdNo"/> and "COMMANDDone" and removes the first
        /// such message.
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <param name="strCmdNo">Command number text to look for.</param>
        /// <param name="nTimeout">Timeout in milliseconds (polled in 10 ms steps).</param>
        /// <returns>True if a matching message was found before the timeout.</returns>
        public static bool CheckDone(string strDevID, string strCmdNo, int nTimeout)
        {
            string strMessage = WaitAndTake(
                msg => msg.Contains(strDevID) && msg.Contains(strCmdNo) && msg.Contains(DoneTag),
                nTimeout);
            return strMessage != null;
        }

        /// <summary>
        /// Waits for a cached message containing both strings, removes it and
        /// returns its text starting at <paramref name="strInCmd"/>.
        /// </summary>
        /// <param name="strDevID">Device ID text to look for.</param>
        /// <param name="strInCmd">Request command name to look for, e.g. GET0, SET0, ALAM, STOP.</param>
        /// <param name="nTimeout">Timeout in milliseconds (polled in 10 ms steps).</param>
        /// <returns>The message text from strInCmd onward, or an empty string on timeout.</returns>
        public static string GetReturnValue(string strDevID, string strInCmd, int nTimeout)
        {
            string strMessage = WaitAndTake(
                msg => msg.Contains(strDevID) && msg.Contains(strInCmd),
                nTimeout);
            if (strMessage == null)
            {
                return "";
            }

            // Ordinal search agrees with the Contains check above, so the index is never -1.
            return strMessage.Substring(strMessage.IndexOf(strInCmd, StringComparison.Ordinal));
        }

        /// <summary>
        /// Releases resources: disconnects the main connection, if any, and
        /// empties the message cache.
        /// </summary>
        public static void Release()
        {
            IConnection current;
            lock (m_objConnLock)
            {
                current = m_mainConnection;
                m_mainConnection = null;
            }

            if (current != null)
            {
                current.BeginDisconnect();
            }

            lock (m_objCacheLock)
            {
                m_listCacheMessage.Clear();
            }
        }
    }
}