引言&协议概述
(CMPP)是中国移动为实现短信业务而制定的一种通信协议,用于在客户端(SP,Service Provider)和中国移动短信网关之间传输短消息,有时也叫做移动梦网短信业务。CMPP3.0是该协议的第三个版本,相比于前两个版本,它增加了对长短信的支持、优化了数据结构等。本文对CMPP协议进行介绍,并给出Python实现CMPP协议栈的思路。
Python的asyncio模块提供了一套简洁的异步IO编程模型,非常适合用于实现协议栈。
CMPP协议基于客户端/服务端模型工作。由客户端(短信应用,如手机,应用程序等)先和ISMG(Internet Short Message Gateway 互联网短信网关)建立起TCP长连接,并使用CMPP命令与ISMG进行交互,实现短信的发送和接收。在CMPP协议中,无需同步等待响应就可以发送下一个指令,实现者可以根据自己的需要,实现同步、异步两种消息传输模式,满足不同场景下的性能要求。
连接成功,发送短信并查询短信发送成功
连接成功,从ISMG接收到短信
协议帧介绍
在CMPP协议中,每个PDU都包含两个部分:CMPP Header和CMPP Body。
CMPP Header
Header包含以下字段,大小长度都是4字节
- Total Length:整个PDU的长度,包括Header和Body。
- Command ID:用于标识PDU的类型(例如,Connect、Submit等)。
- Sequence Id:序列号,用来匹配请求和响应。
用Python Asyncio实现CMPP协议栈里的建立连接
可以以本文的代码作为基础,很容易地在上面扩展。
代码结构组织如下:
.
├── LICENSE
├── README.md
├── cmpp
│ ├── __init__.py
│ ├── client.py
│ ├── protocol.py
│ └── utils.py
├── requirements.txt
├── setup.cfg
└── setup.py
- cmpp/protocol.py:定义不同 CMPP 协议数据单元 (PDU) 的数据类,包括 CmppHeader、CmppConnect、CmppConnectResp、CmppSubmit 和 CmppSubmitResp
- cmpp/client.py:该类处理与 ISMG(互联网短消息网关)的连接以及发送/接收 PDU。 主要 asyncio 进行异步 I/O 操作
- cmpp/utils.py:定义 BoundAtomic 类,它是一种线程安全的方式来管理具有最小值和最大值的序列号。保证CMPP序列号在一定的范围内
- setup.py:配置要分发的包,指定包名称、版本、作者和依赖项等元数据。
利用Python锁实现sequence_id
sequence_id是从1到0x7FFFFFFF的值
import threading
class BoundAtomic:
def __init__(self, min_val: int, max_val: int):
assert min_val <= max_val, "min must be less than or equal to max"
self.min = min_val
self.max = max_val
self.value = min_val
self.lock = threading.Lock()
def next_val(self) -> int:
with self.lock:
if self.value >= self.max:
self.value = self.min
else:
self.value += 1
return self.value
在Python中定义CMPP PDU,篇幅有限,仅定义数个PDU
from dataclasses import dataclass
from typing import Union, List
@dataclass
class CmppHeader:
total_length: int
command_id: int
sequence_id: int
@dataclass
class CmppConnect:
source_addr: str
authenticator_source: bytes
version: int
timestamp: int
@dataclass
class CmppConnectResp:
status: int
authenticator_ismg: str
version: int
@dataclass
class CmppSubmit:
msg_id: int
pk_total: int
pk_number: int
registered_delivery: int
msg_level: int
service_id: str
fee_user_type: int
fee_terminal_id: str
fee_terminal_type: int
tp_pid: int
tp_udhi: int
msg_fmt: int
msg_src: str
fee_type: str
fee_code: str
valid_time: str
at_time: str
src_id: str
dest_usr_tl: int
dest_terminal_id: List[str]
dest_terminal_type: int
msg_length: int
msg_content: bytes
link_id: str
@dataclass
class CmppSubmitResp:
msg_id: int
result: int
@dataclass
class CmppPdu:
header: CmppHeader
body: Union[CmppHeader, CmppConnectResp, CmppSubmit, CmppSubmitResp]
实现编解码方法
@dataclass
class CmppConnect:
source_addr: str
authenticator_source: bytes
version: int
# MMDDHHMMSS format
timestamp: int
def encode(self) -> bytes:
source_addr_bytes = self.source_addr.encode('utf-8').ljust(6, b'\x00')
version_byte = self.version.to_bytes(1, 'big')
timestamp_bytes = self.timestamp.to_bytes(4, 'big')
return source_addr_bytes + self.authenticator_source + version_byte + timestamp_bytes
@dataclass
class CmppConnectResp:
status: int
authenticator_ismg: str
version: int
@staticmethod
def decode(data: bytes) -> 'CmppConnectResp':
status = int.from_bytes(data[0:4], 'big')
authenticator_ismg = data[4:20].rstrip(b'\x00').decode('utf-8')
version = data[20]
return CmppConnectResp(status=status, authenticator_ismg=authenticator_ismg, version=version)
@dataclass
class CmppPdu:
header: CmppHeader
body: Union[CmppConnect, CmppConnectResp, CmppSubmit, CmppSubmitResp]
def encode(self) -> bytes:
body_bytes = self.body.encode()
self.header.total_length = len(body_bytes) + 12
header_bytes = (self.header.total_length.to_bytes(4, 'big') +
self.header.command_id.to_bytes(4, 'big') +
self.header.sequence_id.to_bytes(4, 'big'))
return header_bytes + body_bytes
@staticmethod
def decode(data: bytes) -> 'CmppPdu':
header = CmppHeader(total_length=int.from_bytes(data[0:4], 'big'),
command_id=int.from_bytes(data[4:8], 'big'),
sequence_id=int.from_bytes(data[8:12], 'big'))
body_data = data[12:header.total_length]
if header.command_id == CONNECT_RESP_ID:
body = CmppConnectResp.decode(body_data)
else:
raise NotImplementedError("not implemented yet.")
return CmppPdu(header=header, body=body)
asyncio tcp流相关代码
class CmppClient:
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.sequence_id = BoundAtomic(1, 0x7FFFFFFF)
self.reader = None
self.writer = None
async def connect(self):
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
async def close(self):
if self.writer:
self.writer.close()
实现同步的connect_ismg方法
async def connect_ismg(self, request: CmppConnect):
if self.writer is None or self.reader is None:
raise ConnectionError("Client is not connected")
sequence_id = self.sequence_id.next_val()
header = CmppHeader(0, command_id=CONNECT_ID, sequence_id=sequence_id)
pdu: CmppPdu = CmppPdu(header=header, body=request)
self.writer.write(pdu.encode())
await self.writer.drain()
length_bytes = await self.reader.readexactly(4)
response_length = int.from_bytes(length_bytes)
response_data = await self.reader.readexactly(response_length)
return CmppPdu.decode(response_data)
运行example,验证连接成功
async def main():
client = CmppClient(host='localhost', port=7890)
await client.connect()
print("Connected to ISMG")
connect_request = CmppConnect(
source_addr='source_addr',
authenticator_source=b'authenticator_source',
version=0,
timestamp=1122334455,
)
connect_response = await client.connect_ismg(connect_request)
print(f"Connect response: {connect_response}")
await client.close()
print("Connection closed")
asyncio.run(main())
总结
本文简单对CMPP协议进行了介绍,并尝试用python实现协议栈,但实际商用发送短信往往更加复杂,面临诸如流控、运营商对接、传输层安全等问题,可以选择华为云消息&短信(Message & SMS)服务。