前言
需求为,统计linux上得上下行公网流量,常规得命令如iftop 、sar、ifstat、nload等只能获取流量得大小,不能区分公私网,所以需要通过抓取网络包并排除私网段才能拿到公网流量。下面提供了一些有效得解决思路,提供了部分得代码片段,但不提供整个代码内容。
方案
方案一:在Linux服务器上安装服务,抓取到数据包后发送到中央计算服务器
优点:①由于计算是在中央,并不会占用linux业务服务器得资源性能
缺点:①通过scp或其他传输手段,会占用一定得网络资源,特别当数据量大时
②中央计算服务器得资源易达到瓶颈,需要开多个计算节点
方案二:在Linux服务器上安装服务,抓取到数据包后本地处理,直接将结果发至中央
优点:①中央节点压力减小,数据实时性提升
缺点:①每个linux服务器都需要抓包并且处理,需要占用一定的计算资源,且单位包越大消耗越高
方案三:在同层交换机上放置旁路监控服务器,抓取广播域内所有包,逐个分析每个linux服务器上的流量情况
优点:
①独立部署,不会占用linux服务器资源
②维护相对容易,linux业务服务器与监控节点解耦,只需要维护监控节点而不需要维护linux服务器
缺点:
①需要跟linux服务器处在一个广播域,并且要求性能足够,当linux业务服务器数据过多时,易有瓶颈
②可能存在漏抓包的情况
③需要监听所有得vlan,并分别部署监控节点
方案四:加入流控设备,旁路到核心网关
优点:
①无需自研,有成熟的抓取功能,且有更多丰富的功能
缺点:
①成本较高
②缺乏控制能力,比如流量限速,阻断等
下面主要介绍第一和第二种方法得实现
方案一技术实现
先来看技术拓扑图,流程为:
- linux服务器开启tcpdump抓包
- linux服务器将pcap包发送到流量分析服务器
- 流量服务器进行公私网流量拆分,公网白名单过滤
- 获取步骤三得结果并传输到中央
下面讲解每一步得实现细节
1.linux服务器开启tcpdump抓包
核心命令为"tcpdump", "-i", interface , f"host {target_ip}", '-s 96',"-w", output_file ,'-p'
-i 接网卡名,host接希望抓取得ip(可不填),-s 96 表示只抓取包头部(可能有得包头超过这个长度,但96依然是个比较适中得值,-w 保存为pcap包,-p 关闭混杂模式(避免监听到广播得其他包使流量过多)
#执行tcpdump抓包
def run_tcpdump(target_ip):
INTERVAL = 60
interface = getInterface() #获取网卡名
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") #获取开始时间
output_file = f"{SAVEPATH}/xxx_{timestamp}.pcap"
tcpdump_command = ["tcpdump", "-i", interface , f"host {target_ip}", '-s 96',"-w", output_file ,'-p']
try:
process = subprocess.Popen(tcpdump_command) #使用subprocess执行shell命令
time.sleep(INTERVAL) #定义休眠时间
process.terminate() #结束shell命令
process.wait() #等待子进程退出,确保子进程清理完成
except Exception as e:
logger.error(f"xxx")
2.linux服务器将pcap包发送到流量分析服务器
这一个步骤没有太多可讲得,使用socket 或requests 或 subprocess(scp)都可以,目的就是将抓取到得包发送到流量分析服务器。
3.流量服务器进行公私网流量拆分,公网白名单过滤
这一步是关键步骤,公私网流量,公网白名单过滤拆分需要一定得计算性能,所以核心代码推荐用C++语言编写,博主测试过同代码情况下,python C++ java得表现能力,C++(或C)性能要远超其他语言,如同体积包,假设占用30%得单核心性能,C++可以做到1%以内。
代码要实现两点:
1.公私网彻底拆分
2.过滤公网白名单内的(不希望统计得)数据
公私网流量拆分参考以下代码片段,首先剔除三类子网地址(0xFF000000 是掩码(255.0.0.0 的二进制形式),只保留 IP 地址的最高 8 位。0x0A000000 是 10.0.0.0 的二进制形式。)
注意:现在得大型网络多采用overlay得方式,有些ip虽然在私网段,但由于走了vxlan隧道占用了公网带宽,其实也是数据公网得。后续得处理需读者自行处理。
bool is_private_ip(const std::string &ip) {
struct in_addr addr;
inet_pton(AF_INET, ip.c_str(), &addr);
uint32_t ip_addr = ntohl(addr.s_addr);
// 10.0.0.0/8
if ((ip_addr & 0xFF000000) == 0x0A000000) return true;
// 172.16.0.0/12
if ((ip_addr & 0xFFF00000) == 0xAC100000) return true;
// 192.168.0.0/16
if ((ip_addr & 0xFFFF0000) == 0xC0A80000) return true;
return false;
}
定义4个变量分别存公网下行,公网上行,私网下行,私网上行
uint64_t public_in= 0;
uint64_t public_out = 0;
uint64_t private_in = 0;
uint64_t private_out = 0;
pcap_t *handle = pcap_open_offline(pcap_file.c_str(), errbuf); #获取pcap包
pcap_next_ex(handle, &header, &data) #调用libpcap 捕获库
const struct ip *ip_header = (struct ip *)(data + sizeof(struct ether_header)); #获取包得head
std::string src_ip = inet_ntoa(ip_header->ip_src); #获取src ip
std::string dst_ip = inet_ntoa(ip_header->ip_dst); #获取dst ip
#判断是私网还是公网
if (dst_ip == target_ip) {
if (is_private_ip(src_ip)) {
private_in += pkt_len;
} else {
public_in += pkt_len;
}
} else if (src_ip == target_ip) {
if (is_private_ip(dst_ip)) {
private_out += pkt_len;
} else {
public_out += pkt_len;
}
}
接下来是过滤不希望统计的ip名单,在上面代码的基础上再做一层判断即可,
#判断是私网还是公网
if (dst_ip == target_ip) {
if (is_private_ip(src_ip)) {
private_in += pkt_len;
} else {
if (filter_write(src_ip)){ #return true则公网ip不在白名单内,需要加和
public_in += pkt_len;
}
}
} else if (src_ip == target_ip) {
if (is_private_ip(dst_ip)) {
private_out += pkt_len;
} else {
if (filter_write(src_ip)){
public_out += pkt_len;
}
}
}
4.获取步骤三得结果上报并持久化
拿到公网流量后,传输给用于持久化的程序进行入库操作(对于此类数据,建议用clickhouse进行建库,对体量小的用mysql也可以接受).关于传输方式,可以选择使用生产消费者方式,使用消息中间件缓冲数据(如Kafka)。数据量小通过http/https协议传输后插入也可以。
kafka模板
def push_kafka(data, retries=5, backoff_factor=1):
logger.info(f"上传 {data}")
# 配置 Kafka 生产者
kafka_config = {
'bootstrap.servers': KAFKA_URL, # 替换为你的 Kafka 地址
'client.id': 'my-producer',
}
producer = Producer(kafka_config)
topic = KAFKA_TOPIC # 替换为你的 Kafka 主题名
# 重试逻辑
for attempt in range(retries):
try:
# 将数据转换为 JSON 格式并发送到 Kafka
producer.produce(
topic=topic,
value=json.dumps(data),
callback=delivery_report
)
producer.flush() # 强制刷新缓冲区
logger.info("数据上传成功")
return # 成功后返回
except KafkaException as e:
logger.error(f"Kafka error occurred: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
# 等待后重试
wait_time = backoff_factor * (2 ** attempt)
logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
logger.error("所有重试均失败")
http/https直传
def post_with_retries_center(data, retries=5, backoff_factor=1):
URL_CENTER = "192.168.10.10" #填自己的地址
logger.info(f"上传{data}")
session = requests.Session()
headers = {
'Content-Type': 'application/json',
}
# 定义重试策略
retry_strategy = Retry(
total=retries,
status_forcelist=[404, 500, 502, 503, 504],
allowed_methods=["POST"],
backoff_factor=backoff_factor
)
# 创建适配器并将其安装到会话对象中
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
response = requests.post(URL_CENTER, json=data, headers=headers)
response.raise_for_status() # 如果响应状态码不是200,则抛出异常
except requests.exceptions.ConnectionError as conn_err:
logger.error(f"Connection error occurred: {conn_err}")
except requests.exceptions.Timeout as timeout_err:
logger.error(f"Timeout error occurred: {timeout_err}")
except requests.exceptions.RequestException as req_err:
logger.error(f"An error occurred: {req_err}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
return None
方案二技术实现
在Linux服务器上安装服务,抓取到数据包后本地处理,直接将结果发至中央
流程为
- linux服务器开启tcpdump抓包
- linux服务器分析包,获取私网、公网流量结果
- linux服务器推送消息中间件
- 中央消费者对消息消费并入库
方案二与方案一代码几乎相同,只是职权不同,原linux不需要处理包,现在需要处理,所以处理程序应该在linux服务器上,处理后推送到消息中间件,由中央消费者进行消费即可。代码片段这里就不贴了
结尾
本文章内容适用于基于linux系统的流量包分析,方案本身是完整闭环的。其中对包的处理快慢以及对公、私网的ip段判断、消息推送的方式这些是可以持续优化的点。由于方案本身具备一定商用价值,故没有贴上源码,若读者需要可联系博主提供。