socket传输protobuf字节流的实例介绍
导读:收集整理的这篇文章主要介绍了socket传输protobuf字节流的实例介绍,觉得挺不错的,现在分享给大家,也给大家做个参考。上一篇讲到了数据的处理,这一篇主要讲使用多线程收发消息//创建消息数据模型 2//正式项目中,消息的结构一般是消...
收集整理的这篇文章主要介绍了socket传输protobuf字节流的实例介绍,觉得挺不错的,现在分享给大家,也给大家做个参考。上一篇讲到了数据的处理,这一篇主要讲使用多线程收发消息//创建消息数据模型 2//正式项目中,消息的结构一般是消息长度+消息id+消息主体内容 3 public class Message 4 {
public IExtensible PRotobuf;
public int messageid;
7 }
8 9 public class SocketClientTemp : MonoBehaviour 10 {
11 const int packageMaxLength = 1024;
12 13 Socket mSocket;
14 Thread threadSend;
15 Thread threadRecive;
16 Queuemessage>
allMessages = new QueueMessage>
();
17 Queuebyte[]>
sendQueue = new Queuebyte[]>
();
public bool InIT() 20 {
21 //创建一个socket对象 22 mSocket = new Socket(AddressFamily.internetwork, SocketTyPE.Stream, ProtocolType.Tcp);
23 return SocketConnection("此处是ip", 1111);
24 }
25 26 void Update() 27 {
28 AnalysisMessage();
29 }
30 31 /// summary>
32 /// 建立服务器连接 33 /// /summary>
34 /// param name="ip">
服务器的ip地址/param>
35 /// param name="port">
端口/param>
36 bool SocketConnection(string ip, int port) 37 {
38 try 39 {
40 IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(ip), port);
41 //同步连接服务器,实际使用时推荐使用异步连接,处理方式会在下一篇讲断线重连时讲到 42 mSocket.Connect(ipep);
43 //连接成功后,创建两个线程,分别用于发送和接收消息 44 threadSend = new Thread(new ThreadStart(SendMessage));
45 threadSend.Start();
46 threadRecive = new Thread(new ThreadStart(ReceiveMessage));
47 threadRecive.Start();
48 return true;
49 }
50 catch (Exception e) 51 {
52 Debug.LOG(e.ToString());
53 Close();
54 return false;
55 }
}
#region ...发送消息 59 /// summary>
60 /// 添加数据到发送队列 61 /// /summary>
62 /// param name="protobufModel">
/param>
63 /// param name="messageId">
/param>
64 public void AddSendMessageQueue(IExtensible protobufModel, int messageId) 65 {
66 sendQueue.Enqueue(BuildPackage(protobufModel, messageId));
67 }
void SendMessage() 70 {
71 //循环获取发送队列中第一个数据,然后发送到服务器 72 while (true) 73 {
74 if (sendQueue.Count == 0) 75 {
76 Thread.Sleep(100);
77 continue;
78 }
79 if (!mSocket.Connected) 80 {
81 Close();
82 break;
83 }
84 else 85 Send(sendQueue.Peek());
//发送队列中第一条数据 86 }
87 }
88 89 void Send(byte[] bytes) 90 {
91 try 92 {
93 mSocket.Send(bytes, SocketFlags.None);
94 //发送成功后,从发送队列中移除已发送的消息 95 sendQueue.Dequeue();
96 }
97 catch (SocketException e) 98 {
99 //如果错误码为10035,说明服务器缓存区满了,所以等100毫秒再次发送100 if (e.NativeErrorCode == 10035)101 {
102 Thread.Sleep(100);
103 Send(bytes);
104 }
105 else106 Debug.Log(e.ToString());
107 }
108 }
109 #endregion110 111 #region ...接收消息112 /// summary>
113 /// 解析收到的消息114 /// /summary>
115 void AnalysisMessage()116 {
117 while (allMessages.Count >
0)118 {
119 int id = allMessages.Dequeue().messageId;
120 switch (id)121 {
122 //根据消息id做不同的处理123 }
124 }
125 }
126 127 /// summary>
128 /// 接收数据129 /// /summary>
130 void ReceiveMessage()131 {
132 while (true)133 {
134 if (!mSocket.Connected)135 break;
136 byte[] recvBytesHead = GetBytesReceive(4);
137 int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(recvbytesHead, 0));
138 byte[] recvBytesBody = GetBytesReceive(bodyLength);
139 140 byte[] messageId = new byte[4];
141 Array.Copy(recvBytesBody, 0, messageId, 0, 4);
142 byte[] messageBody = new byte[bodyLength - 4];
143 Array.Copy(recvBytesBody, 4, messageBody, 0, bodyLength - 4);
144 145 if (BitConverter.IsLiTTLeEndian)146 Array.reverse(messageId);
147 FillAllPackages(BitConverter.ToInt32(messageId, 0), messageBody);
148 }
}
/// summary>
152 /// 填充接收消息队列153 /// /summary>
154 /// param name="messageId">
/param>
155 /// param name="messageBody">
/param>
156 void FillAllPackages(int messageId, byte[] messageBody)157 {
158 switch (messageId)159 {
160 //根据消息id处理消息,并添加到接收消息队列161 case 1:162 allMessages.Enqueue(new Message() 163 {
164 protobuf = ProtobufSerilizer.DeSerializetestTemp>
(messageBody), 165 messageId = messageId 166 }
);
167 break;
168 }
169 }
170 171 /// summary>
172 /// 接收数据并处理173 /// /summary>
174 /// param name="length">
/param>
175 /// returns>
/returns>
176 byte[] GetBytesReceive(int length)177 {
178 byte[] recvBytes = new byte[length];
179 while (length >
0)180 {
181 byte[] receiveBytes = new byte[length packageMaxLength ? length : packageMaxLength];
182 int iBytesBody = 0;
183 if (length >
= receiveBytes.Length)184 iBytesBody = mSocket.Receive(receiveBytes, receiveBytes.Length, 0);
185 else186 iBytesBody = mSocket.Receive(receiveBytes, length, 0);
187 receiveBytes.CopyTo(recvBytes, recvBytes.Length - length);
188 length -= iBytesBody;
189 }
190 return recvBytes;
191 }
192 #endregion193 194 /// summary>
195 /// 构建消息数据包196 /// /summary>
197 /// param name="protobufModel">
/param>
198 /// param name="messageId">
/param>
199 byte[] BuildPackage(IExtensible protobufModel, int messageId)200 {
201 byte[] b;
202 if (protobufModel != null)203 b = ProtobufSerilizer.Serialize(protobufModel);
204 else205 b = new byte[0];
206 //消息长度(int数据,长度4) + 消息id(int数据,长度4) + 消息主体内容207 ByteBuffer buf = ByteBuffer.Allocate(b.Length + 4 + 4);
208 //消息长度 = 消息主体内容长度 + 消息id长度209 buf.WriteInt(b.Length + 4);
210 buf.WriteInt(messageId);
211 212 if (protobufModel != null)213 buf.WriteBytes(b);
214 return buf.GetBytes();
215 }
216 217 void OnDestroy()218 {
219 //停止运行后,如果不关闭socket多线程,再次运行时,Unity会卡死220 Close();
221 }
222 223 /// summary>
224 /// 关闭socket,终止线程225 /// /summary>
226 public void Close()227 {
228 if (mSocket != null)229 {
230 //微软官方推荐在关闭socket前先shutdown,但是经过测试,发现网络断开后,shutdown会无法执行231 if (mSocket.Connected)232 mSocket.Shutdown(SocketShutdown.Both);
233 mSocket.Close();
234 mSocket = null;
235 }
236 //关闭线程237 if (threadSend != null)238 threadSend.Abort();
239 if (threadRecive != null)240 threadRecive.Abort();
241 threadSend = null;
242 threadRecive = null;
243 }
244 }
到这里,使用socket处理消息的收发就基本结束了,但是,某些项目为了增强体验,可能还会增加断线重连的功能,这个功能会在下一篇讲到
以上就是socket传输protobuf字节流的实例介绍的详细内容,更多请关注其它相关文章!
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: socket传输protobuf字节流的实例介绍
本文地址: https://pptw.com/jishu/592345.html
