使用策略模式(Strategy Pattern)来灵活地生成不同类型的TCP数据包。
包括三次握手、数据传输和四次挥手。
from scapy.all import *
from scapy.all import Ether, IP, TCP, UDP, wrpcap
from abc import ABC, abstractmethod
class TcpPacketStrategy(ABC):
@abstractmethod
def generate_packets(self, src_ip, dst_ip, src_port, dst_port, seq_num, ack_num, data=None):
pass
class ThreeWayHandshakeStrategy(TcpPacketStrategy):
def generate_packets(self, src_ip, dst_ip, src_port, dst_port, seq_num, ack_num, data=None):
syn = IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags='S', seq=seq_num)
syn_ack = IP(src=dst_ip, dst=src_ip) / TCP(sport=dst_port, dport=src_port, flags='SA', seq=1000,ack=syn.seq + 1)
ack = IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags='A', seq=syn.seq + 1,ack=syn_ack.seq + 1)
return [syn, syn_ack, ack]
class DataTransferStrategy(TcpPacketStrategy):
def generate_packets(self, src_ip, dst_ip, src_port, dst_port, seq_num, ack_num, data_list=None):
packets = []
for data in data_list:
packet = IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags='PA', seq=seq_num,ack=ack_num) / Raw(load=data)
packets.append(packet)
seq_num += len(data) # 假设数据长度即为字节数
return packets
class FourWayTeardownStrategy(TcpPacketStrategy):
def generate_packets(self, src_ip, dst_ip, src_port, dst_port, seq_num, ack_num, data=None):
# 客户端发送 FIN
fin = IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags='FA', seq=seq_num, ack=ack_num)
# 服务器收到 FIN 后发送 ACK
ack1 = IP(src=dst_ip, dst=src_ip) / TCP(sport=dst_port, dport=src_port, flags='A', seq=ack_num, ack=fin.seq + 1)
# 服务器发送 FIN
fin2 = IP(src=dst_ip, dst=src_ip) / TCP(sport=dst_port, dport=src_port, flags='FA', seq=ack_num,ack=fin.seq + 1)
# 客户端发送 ACK 确认
ack2 = IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags='A', seq=fin.seq + 1,ack=fin2.seq + 1)
return [fin, ack1, fin2, ack2]
class TcpPcapGenerator:
def __init__(self, strategy: TcpPacketStrategy):
self._strategy = strategy
def set_strategy(self, strategy: TcpPacketStrategy):
self._strategy = strategy
def generate(self, src_ip, dst_ip, src_port, dst_port, seq_num, ack_num, data=None):
return self._strategy.generate_packets(src_ip, dst_ip, src_port, dst_port, seq_num, ack_num, data)
# 使用示例
if __name__ == "__main__":
src_ip = "192.168.1.2"
dst_ip = "192.168.1.1"
src_port = 12345
dst_port = 80
seq_num = 1000
ack_num = 0
# 生成三次握手数据包
generator = TcpPcapGenerator(ThreeWayHandshakeStrategy())
handshake_packets = generator.generate(src_ip, dst_ip, src_port, dst_port, seq_num, ack_num)
# 计算初始序列号和确认号
seq_num += 1
ack_num = 1001
# 生成数据传输数据包
generator.set_strategy(DataTransferStrategy())
data_packets = generator.generate(src_ip, dst_ip, src_port, dst_port, seq_num, ack_num, ["Hello", "World", "Data"])
# 更新序列号和确认号
for packet in data_packets:
seq_num += len(packet[Raw].load)
# 生成四次挥手数据包
generator.set_strategy(FourWayTeardownStrategy())
teardown_packets = generator.generate(src_ip, dst_ip, src_port, dst_port, seq_num, ack_num)
# 合并所有数据包
all_packets = handshake_packets + data_packets + teardown_packets
# 写入到PCAP文件
wrpcap('tcp_traffic.pcap', all_packets)
print("PCAP file generated: tcp_traffic.pcap")
PCAP file generated: tcp_traffic.pcap
解释:syn = IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags='S', seq=seq_num)
这行代码的作用如下:
IP 层部分
IP(src=src_ip, dst=dst_ip)
:
IP
是 Scapy 中用于生成 IP 层数据包的类。src=src_ip
指定了 IP 数据包的源 IP 地址,它是数据包的发送方的 IP 地址。dst=dst_ip
指定了 IP 数据包的目的 IP 地址,它是数据包的接收方的 IP 地址。TCP 层部分
TCP(sport=src_port, dport=dst_port, flags='S', seq=seq_num)
:
TCP
是 Scapy 中用于生成 TCP 层数据包的类。sport=src_port
指定了 TCP 数据包的源端口号,这是发送方的端口号。dport=dst_port
指定了 TCP 数据包的目的端口号,这是接收方的端口号。flags='S'
设置了 TCP 标志位为 SYN (S
),表示这是一个 SYN 数据包,用于发起一个新的 TCP 连接。seq=seq_num
设置了 TCP 数据包的序列号,通常在建立连接时初始值随机生成。组合 IP 和 TCP 部分:
IP(...) / TCP(...)
是 Scapy 中的一个语法,表示将 IP 层和 TCP 层的部分组合在一起,构成一个完整的数据包。- 斜杠
/
操作符在 Scapy 中用于将不同层的协议组合在一起,形成一个完整的分层数据包。这行代码创建了一个源 IP 地址为
src_ip
,目的 IP 地址为dst_ip
,源端口为src_port
,目的端口为dst_port
,标志位为 SYN,序列号为seq_num
的 TCP SYN 数据包。这个数据包用于发起一个新的 TCP 连接,通常是 TCP 三次握手的第一步。