目录
数据类型 ping用到的数据类型主要有这几种:
类型
中文
描述
UnsignedShort
无符号短整数
由2个字节组成,表示整数
VarInt
可变整数
由1个或多个字节组成,表示整数
String
字符串
开头由varint表示数据长度,后面接由utf8编码的字符串
Byte
字节型
由1个字节组成,表示字节
Long
长整数
由8个字节组成,表示长整数
VarInt varint是一种可变长度的整数.
编码过程:
若数值小于或等于127则直接转为字节型
先将数字转为二进制,七位为一组,从后往前每一组在开头写入1,在最后一组写入0.
将获得的每一种二进制数转为字节型,并存入数组
将数组中的每一个字节写入到数据流中
详解varint编码原理 - Kevin Yan
python代码实现: 这段代码是很久以前写的了,可能很烂XD
至于为什么不写其他语言的…一个是我懒,一个是部分语言自带varint,还有就是我对一些语言都不熟.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 class VarInt : def __init__ (self, value ): if type (value) != bytes : self.value = bytes (self.value_of(value)) if len (self.value) == 0 : self.value = bytes ([0 ]) else : self.value = value self.__compile__() @staticmethod def value_of (value ): if type (value) == int : return_list = [] while value > 0 : if value > 127 : return_list.append(value % 128 + 128 ) value //= 128 else : return_list.append(value) value = 0 return VarInt(bytes (return_list)) elif type (value) == str : return VarInt(bytes (value, 'utf-8' )) elif type (value) == VarInt: return VarInt(value.__bytes__()) elif type (value) == list : return VarInt(bytes (value)) def __compile__ (self ): d = 128 i = -1 while d >= 128 : i += 1 d = self.value[i] if len (self.value) > 0 else 0 self.value = self.value[:i + 1 ] def __int__ (self ): return_int = 0 for i in self.value[::-1 ]: if i > 128 : return_int *= 128 return_int += i - 128 else : return_int = i return return_int
handshake包 写在前面:服务端建立连接并接受到handshake包之后应该继续s.recv()而不是s.accept(),我不希望再有人像我这个究极大傻吊一样再因为接收不到status0x00包而卡住了.
注:C表示Client,客户端; S表示Server,服务端.
握手的流程
C->S handshakeC0x00 发送握手包
服务器切换状态
握手包的结构 C->S handshakeC0x00
数据
数据类型
描述
包长度
VarInt
包id与包数据的长度之和
包id
VarInt
包的id
mc通信协议版本
VarInt
mc通信协议的版本
服务器地址
String(255)
服务器地址
服务器端口
UnsignedShort
服务器的端口,由四个字节组成
handshake类型
Byte
状态,1为ping,2为登录
例 1 0xf,0x0,0x2f,0x9,0x31,0x32,0x37,0x2e,0x30,0x2e,0x30,0x2e,0x31,0x63,0xdd,0x1
第一个0xf代表的是除去自身以外,包长度为15.
后边的0x0为包id:0.
0x2f对应1.8.x版本号:47.
0x9代表服务器地址长度为9,紧跟着的9个字节代表127.0.0.1.
0x63 0xdd转换后为25565,代表服务器端口号.
最后的0x1表示当前的状态为ping.
status包 status包的流程
C->S statusC0x00 发送状态请求包
S->C statusS0x00 返回服务器状态(包含图标motd人数之类)
C->S statusC0x01 发送ping请求包
S->C statusS0x01 返回pong包
C->S statusC0x00:
数据
数据类型
描述
包长度
VarInt
包id的长度
包id
VarInt
包的id
此包无任何字段,仅为一个请求.
S->C statusS0x00:
数据
数据类型
描述
包长度
VarInt
包id与包数据的长度之和
包id
VarInt
包的id
json回应数据
String(32767)
表示服务器状态的json
json回应数据格式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 { "version" : { "name" : "1.19" , "protocol" : 759 } , "players" : { "max" : 100 , "online" : 5 , "sample" : [ { "name" : "thinkofdeath" , "id" : "4566e69f-c907-48ee-8d71-d7ba5aa00d20" } ] } , "description" : { "text" : "Hello world" } , "favicon" : "data:image/png;base64,<data>" , "previewsChat" : true }
此json来自wiki.vg
sample里似乎必须含有至少一个样本,此样本将会在玩家把鼠标放在信号标上时显示.当然,你可以把这个当作是信息显示,不过需要注意的是,uuid 须为正确格式的uuid .
图片是base64编码的png图片,大小必须为64*64
什么?你说你不知道什么是base64?我觉得谷歌 是个好东西..
C->S statusC0x01:
数据
数据类型
描述
包长度
VarInt
包id与包数据的长度之和
包id
VarInt
包的id
识别码(大概)
Long
用于分辨pong包
识别码为客户端随机生成的一串长整数,服务器返回pong包时需要带上此参数.
S->C statusS0x01:
数据
数据类型
描述
包长度
VarInt
包id与包数据的长度之和
包id
VarInt
包的id
识别码(大概)
Long
用于分辨pong包
上面说了,直接返回一样的就行
handshake以及status的python实现 以下为部分代码 完整的我扔github上了: 点我
服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import jsonfrom socket import socketfrom data_types import Longfrom packet.packets.handshake.C0x0 import C0x0 as HC0x0from packet.packets.status.C0x0 import C0x0 as SC0x0from packet.packets.status.C0x1 import C0x1 as SC0x1from packet.packets.status.S0x0 import S0x0 as SS0x0from packet.packets.status.S0x1 import S0x1 as SS0x1from packet.raw_packet import RawPacket''' author: hsn8086 date: 01/06/2023(MM/DD/YY) ''' status_list = {} def client_recv (conn_: socket, addr_: tuple ): global status_list while True : try : rp = RawPacket(conn_) if len (bytes (rp)) == 0 : raise '' except : conn_.close() status_list[str (addr_)] = 'handshake' break if str (addr_) not in status_list: status_list[str (addr_)] = 'handshake' if status_list[str (addr_)] == 'handshake' : if int (rp.id ) == 0 : p = HC0x0() p.from_raw_packet(rp) protocol_ver, recv_addr, recv_port, status = p.read() if int (status) == 1 : status_list[str (addr_)] = 'status' continue if status_list[str (addr_)] == 'status' : if int (rp.id ) == 0 : p = SC0x0() p.from_raw_packet(rp) rt_p = SS0x0() rt_p += json.dumps({ "version" : { "name" : "1.19.3" , "protocol" : 761 }, "players" : { "max" : 100 , "online" : 5 , "sample" : [ { "name" : "thinkofdeath" , "id" : "4566e69f-c907-48ee-8d71-d7ba5aa00d20" } ] }, "description" : { "text" : "Hello world" }, "favicon" : "data:image/png;base64,<data>" , "previewsChat" : True }) conn_.send(bytes (rt_p)) continue if int (rp.id ) == 1 : ''' 其实这段代码是非常多余的... 正常来说 conn_.send(bytes(rp)) # 返回数据 continue 就行,不过这里为了展示包的结构,特有分开来写 ''' p = SC0x1() p.from_raw_packet(rp) payload = p.read()[0 ] rt_p = SS0x1() rt_p += Long(payload) conn_.send(bytes (rt_p)) continue if __name__ == '__main__' : s = socket() s.bind(("0.0.0.0" , 25565 )) s.listen(2000 ) while True : conn, addr = s.accept() client_recv(conn, addr)
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import copyimport jsonimport randomimport timefrom socket import socketfrom data_types import VarInt, UnsignedShort, Long, Bytefrom packet.packets.handshake.C0x0 import C0x0 as HC0x0from packet.packets.status.C0x0 import C0x0 as SC0x0from packet.packets.status.S0x0 import S0x0 as SS0x0from packet.packets.status.C0x1 import C0x1 as SC0x1from packet.packets.status.S0x1 import S0x1 as SS0x1from packet.raw_packet import RawPacketdef get_motd (addr: tuple [str , int ] = None ): if addr is None : addr_ = ("127.0.0.1" , 25565 ) else : addr_ = copy.copy(addr) s = socket() s.connect(addr_) hc0x0 = HC0x0() hc0x0 += VarInt(761 ) hc0x0 += addr_[0 ] hc0x0 += UnsignedShort(addr_[1 ]) hc0x0 += Byte([1 ]) s.send(bytes (hc0x0)) s.send(bytes (SC0x0())) rp_ss0x0 = RawPacket(s) ss0x0 = SS0x0() ss0x0.from_raw_packet(rp_ss0x0) json_str = ss0x0.read()[0 ] sc0x1 = SC0x1() payload = Long(random.randint(114514 , 1919810 )) sc0x1 += payload s.send(bytes (sc0x1)) s_time = time.time() rp_ss0x1 = RawPacket(s) delay = time.time() - s_time ss0x1 = SS0x1() ss0x1.from_raw_packet(rp_ss0x1) recv_payload = ss0x1.read()[0 ] if recv_payload == payload: return int (delay * 1000 ), json.loads(json_str) if __name__ == '__main__' : print (get_motd(('ZQAT.top' , 25565 )))
其他 btw,我有个更完整的项目 ,还在写,不过感觉这辈子写不完XD,如果闲着没事干欢迎来交pr,代码写的烂请见谅.
鸣谢 参考资料类
开发工具类
PyCharm : 功能及其全面的Python IDE.
VSCode : 兼容/扩展性极强的轻量级IDE.(虽然我只是拿来写markdown(逃))
语言翻译类
搜索引擎类
谷歌 : 对开发者极为友好的搜索引擎.
Bing : 国内能用的搜索引擎中较为优秀的存在.
百度 : SB百度让我发现了bing和谷歌是多么的好用.