banner
NEWS LETTER

mc通信:ping的原理及实现

Scroll down

目录

数据类型

ping用到的数据类型主要有这几种:

类型 中文 描述
UnsignedShort 无符号短整数 由2个字节组成,表示整数
VarInt 可变整数 由1个或多个字节组成,表示整数
String 字符串 开头由varint表示数据长度,后面接由utf8编码的字符串
Byte 字节型 由1个字节组成,表示字节
Long 长整数 由8个字节组成,表示长整数

VarInt

varint是一种可变长度的整数.

编码过程:

  • 若数值小于或等于127则直接转为字节型
  • 先将数字转为二进制,七位为一组,从后往前每一组在开头写入1,在最后一组写入0.
  • 将获得的每一种二进制数转为字节型,并存入数组
  • 将数组中的每一个字节写入到数据流中

详解varint编码原理 - Kevin Yan

python代码实现:

这段代码是很久以前写的了,可能很烂XD

至于为什么不写其他语言的…一个是我懒,一个是部分语言自带varint,还有就是我对一些语言都不熟.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class VarInt:
def __init__(self, value):
if type(value) != bytes:
self.value = bytes(self.value_of(value))
if len(self.value) == 0:
self.value = bytes([0])
else:
self.value = value
self.__compile__()

@staticmethod
def value_of(value):
if type(value) == int:
return_list = []
while value > 0:
if value > 127:
return_list.append(value % 128 + 128)
value //= 128
else:
return_list.append(value)
value = 0

return VarInt(bytes(return_list))
elif type(value) == str:
return VarInt(bytes(value, 'utf-8'))
elif type(value) == VarInt:
return VarInt(value.__bytes__())
elif type(value) == list:
return VarInt(bytes(value))

def __compile__(self):
d = 128
i = -1

while d >= 128:
i += 1
d = self.value[i] if len(self.value) > 0 else 0

self.value = self.value[:i + 1]

def __int__(self):
return_int = 0
for i in self.value[::-1]:
if i > 128:
return_int *= 128
return_int += i - 128
else:
return_int = i
return return_int

handshake包

写在前面:服务端建立连接并接受到handshake包之后应该继续s.recv()而不是s.accept(),我不希望再有人像我这个究极大傻吊一样再因为接收不到status0x00包而卡住了.

注:C表示Client,客户端; S表示Server,服务端.

握手的流程

  • C->S handshakeC0x00 发送握手包
  • 服务器切换状态

握手包的结构

C->S handshakeC0x00

数据 数据类型 描述
包长度 VarInt 包id与包数据的长度之和
包id VarInt 包的id
mc通信协议版本 VarInt mc通信协议的版本
服务器地址 String(255) 服务器地址
服务器端口 UnsignedShort 服务器的端口,由四个字节组成
handshake类型 Byte 状态,1为ping,2为登录

1
0xf,0x0,0x2f,0x9,0x31,0x32,0x37,0x2e,0x30,0x2e,0x30,0x2e,0x31,0x63,0xdd,0x1

第一个0xf代表的是除去自身以外,包长度为15.

后边的0x0为包id:0.

0x2f对应1.8.x版本号:47.

0x9代表服务器地址长度为9,紧跟着的9个字节代表127.0.0.1.

0x63 0xdd转换后为25565,代表服务器端口号.

最后的0x1表示当前的状态为ping.

status包

status包的流程

  • C->S statusC0x00 发送状态请求包
  • S->C statusS0x00 返回服务器状态(包含图标motd人数之类)
  • C->S statusC0x01 发送ping请求包
  • S->C statusS0x01 返回pong包

C->S statusC0x00:

数据 数据类型 描述
包长度 VarInt 包id的长度
包id VarInt 包的id

此包无任何字段,仅为一个请求.

S->C statusS0x00:

数据 数据类型 描述
包长度 VarInt 包id与包数据的长度之和
包id VarInt 包的id
json回应数据 String(32767) 表示服务器状态的json

json回应数据格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"version": {
"name": "1.19", //版本名字(?),似乎在版本不兼容时可以用此作为提示信息
"protocol": 759 //通信协议版本,服务器常用此判断
},
"players": {
"max": 100, //最大玩家数
"online": 5, //在线玩家数
"sample": [ //样本
{
"name": "thinkofdeath", //玩家名
"id": "4566e69f-c907-48ee-8d71-d7ba5aa00d20" //uuid
}
]
},
"description": {
"text": "Hello world" //motd,似乎可以用颜色代码
},
"favicon": "data:image/png;base64,<data>", //图标
"previewsChat": true
}

此json来自wiki.vg

sample里似乎必须含有至少一个样本,此样本将会在玩家把鼠标放在信号标上时显示.当然,你可以把这个当作是信息显示,不过需要注意的是,uuid
须为正确格式的uuid.

图片是base64编码的png图片,大小必须为64*64

什么?你说你不知道什么是base64?我觉得谷歌是个好东西..

C->S statusC0x01:

数据 数据类型 描述
包长度 VarInt 包id与包数据的长度之和
包id VarInt 包的id
识别码(大概) Long 用于分辨pong包

识别码为客户端随机生成的一串长整数,服务器返回pong包时需要带上此参数.

S->C statusS0x01:

数据 数据类型 描述
包长度 VarInt 包id与包数据的长度之和
包id VarInt 包的id
识别码(大概) Long 用于分辨pong包

上面说了,直接返回一样的就行

handshake以及status的python实现

以下为部分代码
完整的我扔github上了: 点我

服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import json
from socket import socket

from data_types import Long
from packet.packets.handshake.C0x0 import C0x0 as HC0x0
from packet.packets.status.C0x0 import C0x0 as SC0x0
from packet.packets.status.C0x1 import C0x1 as SC0x1
from packet.packets.status.S0x0 import S0x0 as SS0x0
from packet.packets.status.S0x1 import S0x1 as SS0x1
from packet.raw_packet import RawPacket

'''
author: hsn8086
date: 01/06/2023(MM/DD/YY)
'''

status_list = {} # 状态列表


def client_recv(conn_: socket, addr_: tuple):
global status_list # 声明status为全局变量
while True: # 进入接收循环
try:
rp = RawPacket(conn_) # 获取原始包

if len(bytes(rp)) == 0: # 如果无法接收到数据包则停止运行
raise ''

except:
conn_.close() # 关闭连接
status_list[str(addr_)] = 'handshake' # 连接状态初始化
break

if str(addr_) not in status_list: # 判断键是否存在
status_list[str(addr_)] = 'handshake' # 连接状态初始化

if status_list[str(addr_)] == 'handshake': # 判断连接状态
if int(rp.id) == 0: # 判断包id
p = HC0x0()
p.from_raw_packet(rp) # 获取handshake0x00包
protocol_ver, recv_addr, recv_port, status = p.read() # 读取包
if int(status) == 1:
status_list[str(addr_)] = 'status' # 切换状态
continue # 结束本轮
if status_list[str(addr_)] == 'status':
if int(rp.id) == 0:
p = SC0x0()
p.from_raw_packet(rp) # 读取status0x00包
rt_p = SS0x0()
rt_p += json.dumps({
"version": {
"name": "1.19.3",
"protocol": 761
},
"players": {
"max": 100,
"online": 5,
"sample": [
{
"name": "thinkofdeath",
"id": "4566e69f-c907-48ee-8d71-d7ba5aa00d20"
}
]
},
"description": {
"text": "Hello world"
},
"favicon": "data:image/png;base64,<data>",
"previewsChat": True
}) # 加入数据

conn_.send(bytes(rt_p)) # 返回数据
continue
if int(rp.id) == 1:
'''
其实这段代码是非常多余的...
正常来说
conn_.send(bytes(rp)) # 返回数据
continue
就行,不过这里为了展示包的结构,特有分开来写
'''
p = SC0x1()
p.from_raw_packet(rp) # 读取status0x01包
payload = p.read()[0]
rt_p = SS0x1()
rt_p += Long(payload)
conn_.send(bytes(rt_p)) # 返回数据
continue


if __name__ == '__main__': # 程序入口
s = socket() # 创建socket对象

s.bind(("0.0.0.0", 25565)) # 绑定ip
s.listen(2000) # 监听
while True:
conn, addr = s.accept() # 建立连接
client_recv(conn, addr) # 进入处理函数

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import copy
import json
import random
import time
from socket import socket

from data_types import VarInt, UnsignedShort, Long, Byte
from packet.packets.handshake.C0x0 import C0x0 as HC0x0
from packet.packets.status.C0x0 import C0x0 as SC0x0
from packet.packets.status.S0x0 import S0x0 as SS0x0
from packet.packets.status.C0x1 import C0x1 as SC0x1
from packet.packets.status.S0x1 import S0x1 as SS0x1
from packet.raw_packet import RawPacket


def get_motd(addr: tuple[str, int] = None):
if addr is None: # 判断参数是否为空
addr_ = ("127.0.0.1", 25565) # 使用默认参数
else:
addr_ = copy.copy(addr)
s = socket() # 创建socket对象
s.connect(addr_) # 连接服务器

hc0x0 = HC0x0() # 创建包handshakeC0x0
hc0x0 += VarInt(761) # 协议版本号
hc0x0 += addr_[0] # 服务器地址
hc0x0 += UnsignedShort(addr_[1]) # 服务器端口号
hc0x0 += Byte([1]) # 接下来的状态
s.send(bytes(hc0x0)) # 发送包

s.send(bytes(SC0x0())) # 发送包statusC0x0

rp_ss0x0 = RawPacket(s) # 接收原始包statusS0x0
ss0x0 = SS0x0()
ss0x0.from_raw_packet(rp_ss0x0) # 解析包
json_str = ss0x0.read()[0] # 读取参数

sc0x1 = SC0x1() # 创建statusC0x1
payload = Long(random.randint(114514, 1919810)) # 创建payload
sc0x1 += payload
s.send(bytes(sc0x1)) # 发送statusC0x1
s_time = time.time() # 开始计时

rp_ss0x1 = RawPacket(s) # 接收原始包statusS0x1
delay = time.time() - s_time # 计算延迟
ss0x1 = SS0x1()
ss0x1.from_raw_packet(rp_ss0x1) # 解析包
recv_payload = ss0x1.read()[0] # 获取参数
if recv_payload == payload: # 比较参数
return int(delay * 1000), json.loads(json_str) # 返回数据


if __name__ == '__main__':
print(get_motd(('ZQAT.top', 25565)))

其他

btw,我有个更完整的项目,还在写,不过感觉这辈子写不完XD,如果闲着没事干欢迎来交pr,代码写的烂请见谅.

鸣谢

参考资料类

  • wiki.vg : 超详细的minecraft开发wiki!!

开发工具类

  • PyCharm : 功能及其全面的Python IDE.
  • VSCode : 兼容/扩展性极强的轻量级IDE.(虽然我只是拿来写markdown(逃))

语言翻译类

  • DeepL : 基于深度学习的翻译软件.
  • Google translate : 老牌翻译软件,一般被我拿来命名变量.

搜索引擎类

  • 谷歌 : 对开发者极为友好的搜索引擎.
  • Bing : 国内能用的搜索引擎中较为优秀的存在.
  • 百度 : SB百度让我发现了bing和谷歌是多么的好用.
其他文章