capture 1.2.0
这是一个基于 PcapPlusPlus、PF_RING 和 ClickHouse 构建的高性能网络流量分析系统，专注于实时流量捕获、解析与存储。
Loading...
Searching...
No Matches
packet_handler.h
Go to the documentation of this file.
1
10
11#pragma once
12
13#include <vector>
14#include <queue>
15#include <unordered_map>
16#include <functional>
17#include <sstream>
18#include "TcpLayer.h"
19#include <UdpLayer.h>
20#include <IcmpLayer.h>
21#include "IPv4Layer.h"
22#include "PayloadLayer.h"
23#include "PacketUtils.h"
24#include "SystemUtils.h"
25#include <ctime>
26#include <fstream>
27#include <curl/curl.h>
28#include<string>
29#include <clickhouse/client.h>
32
/**
 * @brief Number of packets buffered before a batch is inserted into ClickHouse
 *        (tune for best throughput).
 *
 * Guarded with #ifndef so the value can be overridden at build time
 * (e.g. -DMAX_QUEUE_LENGTH=50000) without editing this header.
 */
#ifndef MAX_QUEUE_LENGTH
#define MAX_QUEUE_LENGTH 20000
#endif
39
/**
 * @brief Writes one "description,counter,measurement" line to a stream named `file`.
 *
 * The macro refers to a variable called `file` that must be in scope at the point
 * of use (any std::ostream-compatible object). It is wrapped in do { } while (0)
 * so that it expands to exactly one statement: it is safe inside an un-braced
 * if/else and requires the caller's trailing semicolon. It ends lines with '\n'
 * rather than std::endl to avoid flushing the stream on every line.
 */
#define WRITE_STAT_LINE(description, counter, measurement)                          \
    do {                                                                            \
        file << (description) << ',' << (counter) << ',' << (measurement) << '\n';  \
    } while (0)
54
55using namespace clickhouse;
56
64void StatsSummary2File(HttpStatsCollector& collector,std::string filename)
65{
66 std::ofstream file(filename);
67 if (!file.is_open()) {
68 std::cerr << "Failed to open file: " << filename << std::endl;
69 exit ;
70 }
71 WRITE_STAT_LINE("Sample time", collector.getGeneralStats().sampleTime, "Seconds");
72 WRITE_STAT_LINE("Number of HTTP packets", collector.getGeneralStats().numOfHttpPackets, "Packets");
73 WRITE_STAT_LINE("Rate of HTTP packets", collector.getGeneralStats().httpPacketRate.totalRate, "Packets/sec");
74 WRITE_STAT_LINE("Number of HTTP flows", collector.getGeneralStats().numOfHttpFlows, "Flows");
75 WRITE_STAT_LINE("Rate of HTTP flows", collector.getGeneralStats().httpFlowRate.totalRate, "Flows/sec");
76 WRITE_STAT_LINE("Number of HTTP pipelining flows", collector.getGeneralStats().numOfHttpPipeliningFlows, "Flows");
77 WRITE_STAT_LINE("Number of HTTP transactions", collector.getGeneralStats().numOfHttpTransactions, "Transactions");
78 WRITE_STAT_LINE("Rate of HTTP transactions", collector.getGeneralStats().httpTransactionsRate.totalRate, "Transactions/sec");
79 WRITE_STAT_LINE("Total HTTP data", collector.getGeneralStats().amountOfHttpTraffic, "Bytes");
80 WRITE_STAT_LINE("Rate of HTTP data", collector.getGeneralStats().httpTrafficRate.totalRate, "Bytes/sec");
81 WRITE_STAT_LINE("Average packets per flow", collector.getGeneralStats().averageNumOfPacketsPerFlow, "Packets");
82 WRITE_STAT_LINE("Average transactions per flow", collector.getGeneralStats().averageNumOfHttpTransactionsPerFlow, "Transactions");
83 WRITE_STAT_LINE("Average data per flow", collector.getGeneralStats().averageAmountOfDataPerFlow, "Bytes");
84
85 WRITE_STAT_LINE("Number of HTTP requests", collector.getRequestStats().numOfMessages, "Requests");
86 WRITE_STAT_LINE("Rate of HTTP requests", collector.getRequestStats().messageRate.totalRate, "Requests/sec");
87 WRITE_STAT_LINE("Total data in headers", collector.getRequestStats().totalMessageHeaderSize, "Bytes");
88 WRITE_STAT_LINE("Average header size", collector.getRequestStats().averageMessageHeaderSize, "Bytes");
89
90 WRITE_STAT_LINE("Number of HTTP responses", collector.getResponseStats().numOfMessages, "Responses");
91 WRITE_STAT_LINE("Rate of HTTP responses", collector.getResponseStats().messageRate.totalRate, "Responses/sec");
92 WRITE_STAT_LINE("Total data in headers", collector.getResponseStats().totalMessageHeaderSize, "Bytes");
93 WRITE_STAT_LINE("Average header size", collector.getResponseStats().averageMessageHeaderSize, "Bytes");
94 WRITE_STAT_LINE("Num of responses with content-length", collector.getResponseStats().numOfMessagesWithContentLength, "Responses");
95 WRITE_STAT_LINE("Total body size (may be compressed)", collector.getResponseStats().totalContentLengthSize, "Bytes");
96 WRITE_STAT_LINE("Average body size", collector.getResponseStats().averageContentLengthSize, "Bytes");
97 file.close();
98}
99
107{
108private:
110 std::string table_prefix;
111 Client client;
112
113 // ClickHouse数据列定义
114 std::shared_ptr<ColumnUInt32> col_hash_val;
115 std::shared_ptr<ColumnInt32> col_packet_len;
116 std::shared_ptr<ColumnUInt32> col_tv_sec;
117 std::shared_ptr<ColumnUInt32> col_tv_nsec;
118 std::shared_ptr<ColumnString> col_src_ip;
119 std::shared_ptr<ColumnString> col_dst_ip;
120 std::shared_ptr<ColumnUInt8> col_ttl;
121 std::shared_ptr<ColumnUInt8> col_tos;
122 std::shared_ptr<ColumnUInt16> col_id;
123 std::shared_ptr<ColumnUInt16> col_offset;
124 std::shared_ptr<ColumnUInt8> col_protocol_ip;
125 std::shared_ptr<ColumnUInt8> col_protocol;
126 std::shared_ptr<ColumnUInt16> col_len_load;
127 std::shared_ptr<ColumnUInt16> col_src_port;
128 std::shared_ptr<ColumnUInt16> col_dst_port;
129 std::shared_ptr<ColumnUInt32> col_ack_num;
130 std::shared_ptr<ColumnUInt32> col_seq_num;
131 std::shared_ptr<ColumnUInt16> col_flag;
132 std::shared_ptr<ColumnUInt16> col_window;
133 std::shared_ptr<ColumnUInt16> col_len_udp;
134 std::shared_ptr<ColumnUInt8> col_icmp_type;
135 std::shared_ptr<ColumnUInt8> col_icmp_code;
137
138public:
145 packet_handler(std::string tb_prefix = ""):table_prefix(tb_prefix),client((ClientOptions().SetHost("localhost")))
146 {
147 col_hash_val = std::make_shared<ColumnUInt32>();
148 col_packet_len =std::make_shared<ColumnInt32>();
149 col_tv_sec = std::make_shared<ColumnUInt32>();
150 col_tv_nsec =std::make_shared<ColumnUInt32>();
151 col_src_ip =std::make_shared<ColumnString>();
152 col_dst_ip =std::make_shared<ColumnString>();
153 col_ttl =std::make_shared<ColumnUInt8>();
154 col_tos =std::make_shared<ColumnUInt8>();
155 col_id =std::make_shared<ColumnUInt16>();
156 col_offset =std::make_shared<ColumnUInt16>();
157 col_protocol_ip =std::make_shared<ColumnUInt8>();
158 col_protocol =std::make_shared<ColumnUInt8>();
159 col_len_load =std::make_shared<ColumnUInt16>();
160 col_src_port =std::make_shared<ColumnUInt16>();
161 col_dst_port =std::make_shared<ColumnUInt16>();
162 col_ack_num =std::make_shared<ColumnUInt32>();
163 col_seq_num =std::make_shared<ColumnUInt32>();
164 col_flag =std::make_shared<ColumnUInt16>();
165 col_window =std::make_shared<ColumnUInt16>();
166 col_len_udp =std::make_shared<ColumnUInt16>();
167 col_icmp_type =std::make_shared<ColumnUInt8>();
168 col_icmp_code =std::make_shared<ColumnUInt8>();
170 };
171
177 if(queue_len!=0) send2db(); // 线程池销毁时执行
178 }
179
186 void handle_packet(pcpp::Packet *packet)
187 {
188 queue_len++;
189
190 unifiedPacketAttr packet_attr(packet);
191 col_hash_val->Append(packet_attr.hash_val);
192 col_packet_len->Append(packet_attr.packet_len);
193 col_tv_sec->Append(packet_attr.tv_sec);
194 col_tv_nsec->Append(packet_attr.tv_nsec);
195 col_src_ip->Append(packet_attr.src_ip);
196 col_dst_ip->Append(packet_attr.dst_ip);
197 col_ttl->Append(packet_attr.ttl);
198 col_tos->Append(packet_attr.tos);
199 col_id->Append(packet_attr.id);
200 col_offset->Append(packet_attr.offset);
201 col_protocol_ip->Append(packet_attr.protocol_ip);
202 col_protocol->Append(packet_attr.protocol);
203 col_len_load->Append(packet_attr.len_load);
204 col_src_port->Append(packet_attr.src_port);
205 col_dst_port->Append(packet_attr.dst_port);
206 col_ack_num->Append(packet_attr.ack_num);
207 col_seq_num->Append(packet_attr.seq_num);
208 col_flag->Append(packet_attr.flag);
209 col_window->Append(packet_attr.window);
210 col_len_udp->Append(packet_attr.len_udp);
211 col_icmp_type->Append(packet_attr.icmp_type);
212 col_icmp_code->Append(packet_attr.icmp_code);
213 collector.collectStats(packet);
214
215 if (queue_len >= MAX_QUEUE_LENGTH) // 如果达到一定数量,就批量存入db
216 {
217 send2db();
218 flush();
219 queue_len = 0;
220 StatsSummary2File(collector,"/home/njust/program/example-app/stats.txt");
221 collector.clear();
222 }
223 }
224
229 void flush()
230 {
231 col_hash_val ->Clear();
232 col_packet_len ->Clear();
233 col_tv_sec ->Clear();
234 col_tv_nsec ->Clear();
235 col_src_ip ->Clear();
236 col_dst_ip ->Clear();
237 col_ttl ->Clear();
238 col_tos ->Clear();
239 col_id ->Clear();
240 col_offset ->Clear();
241 col_protocol_ip->Clear();
242 col_protocol ->Clear();
243 col_len_load ->Clear();
244 col_src_port ->Clear();
245 col_dst_port ->Clear();
246 col_ack_num ->Clear();
247 col_seq_num ->Clear();
248 col_flag ->Clear();
249 col_window ->Clear();
250 col_len_udp ->Clear();
251 col_icmp_type ->Clear();
252 col_icmp_code ->Clear();
253 };
254
260 void send2db()
261 {
262 // 构建主数据块
263 Block block;
264 block.AppendColumn("hashVal", col_hash_val );
265 block.AppendColumn("packetLen", col_packet_len );
266 block.AppendColumn("tvSec", col_tv_sec );
267 block.AppendColumn("tvNsec", col_tv_nsec );
268 block.AppendColumn("srcIp", col_src_ip );
269 block.AppendColumn("dstIp", col_dst_ip );
270 block.AppendColumn("ttl", col_ttl );
271 block.AppendColumn("tos", col_tos );
272 block.AppendColumn("id", col_id );
273 block.AppendColumn("offset", col_offset );
274 block.AppendColumn("protocolIP", col_protocol_ip);
275 block.AppendColumn("protocol", col_protocol );
276 block.AppendColumn("lenLoad", col_len_load );
277 block.AppendColumn("srcPort", col_src_port );
278 block.AppendColumn("dstPort", col_dst_port );
279 block.AppendColumn("ackNum", col_ack_num );
280 block.AppendColumn("seqNum", col_seq_num );
281 block.AppendColumn("flag", col_flag );
282 block.AppendColumn("window", col_window );
283 block.AppendColumn("lenUdp", col_len_udp );
284 block.AppendColumn("icmpType", col_icmp_type );
285 block.AppendColumn("icmpCode", col_icmp_code );
286
287 // 构建源IP表数据块
288 Block block_src_ip;
289 block_src_ip.AppendColumn("srcIp",col_src_ip);
290 block_src_ip.AppendColumn("hashVal",col_hash_val);
291
292 // 构建协议表数据块
293 Block block_protocol;
294 block_protocol.AppendColumn("protocol",col_protocol);
295 block_protocol.AppendColumn("hashVal",col_hash_val);
296
297 // 插入主包表
298 try
299 {
300 client.Insert("default."+table_prefix+"_packets", block);
301 }
302 catch(const std::exception& e)
303 {
304 std::cerr <<"Insert failue.retrying... error msg:" <<e.what() << '\n';
305 try
306 {
307 client.ResetConnection();
308 client.Insert("default."+table_prefix+"_packets", block);
309 }
310 catch(const std::exception& e) {
311 std::cerr<<"insert block failed!"<<std::endl;
312 }
313 }
314
315 // 插入源IP哈希表
316 try
317 {
318 client.Insert("_src_ip_hash_table", block_src_ip);
319 }
320 catch(const std::exception& e)
321 {
322 std::cerr <<"Insert failue.retrying... error msg:" <<e.what() << '\n';
323 try
324 {
325 client.ResetConnection();
326 client.Insert("_src_ip_hash_table", block_src_ip);
327 }
328 catch(const std::exception& e)
329 {
330 std::cerr <<"Insert failue and drop .error msg:" <<e.what() << '\n';
331 }
332 }
333
334 // 插入协议哈希表
335 try
336 {
337 client.Insert("_protocol_hash_table", block_protocol);
338 }
339 catch(const std::exception& e)
340 {
341 std::cerr <<"Insert failue.retrying... error msg:" <<e.what() << '\n';
342 try
343 {
344 client.ResetConnection();
345 client.Insert("_protocol_hash_table", block_protocol);
346 }
347 catch(const std::exception& e)
348 {
349 std::cerr <<"Insert failue and drop .error msg:" <<e.what() << '\n';
350 }
351 }
352 };
353};
HTTP统计信息收集器
HTTP统计信息收集器
Definition HttpStatsCollector.h:174
HttpGeneralStats & getGeneralStats()
获取HTTP通用统计信息
Definition HttpStatsCollector.h:290
HttpResponseStats & getResponseStats()
获取HTTP响应统计信息
Definition HttpStatsCollector.h:302
HttpRequestStats & getRequestStats()
获取HTTP请求统计信息
Definition HttpStatsCollector.h:296
std::shared_ptr< ColumnUInt16 > col_flag
标志位列
Definition packet_handler.h:131
std::shared_ptr< ColumnUInt32 > col_hash_val
哈希值列
Definition packet_handler.h:114
packet_handler(std::string tb_prefix="")
构造函数
Definition packet_handler.h:145
std::shared_ptr< ColumnString > col_src_ip
源IP地址列
Definition packet_handler.h:118
std::shared_ptr< ColumnUInt32 > col_seq_num
序列号列
Definition packet_handler.h:130
std::shared_ptr< ColumnUInt16 > col_offset
片偏移列
Definition packet_handler.h:123
int queue_len
当前队列长度
Definition packet_handler.h:109
std::shared_ptr< ColumnUInt16 > col_dst_port
目标端口列
Definition packet_handler.h:128
void send2db()
发送数据到数据库
Definition packet_handler.h:260
std::shared_ptr< ColumnUInt16 > col_window
窗口大小列
Definition packet_handler.h:132
void flush()
清空所有数据列
Definition packet_handler.h:229
std::shared_ptr< ColumnUInt16 > col_len_load
负载长度列
Definition packet_handler.h:126
std::shared_ptr< ColumnUInt8 > col_tos
TOS列
Definition packet_handler.h:121
std::shared_ptr< ColumnUInt16 > col_src_port
源端口列
Definition packet_handler.h:127
std::shared_ptr< ColumnUInt32 > col_tv_sec
时间戳秒数列(类型待确认)
Definition packet_handler.h:116
std::shared_ptr< ColumnUInt8 > col_protocol
传输层协议列
Definition packet_handler.h:125
std::shared_ptr< ColumnString > col_dst_ip
目标IP地址列
Definition packet_handler.h:119
std::shared_ptr< ColumnUInt32 > col_ack_num
确认号列
Definition packet_handler.h:129
HttpStatsCollector collector
HTTP统计收集器
Definition packet_handler.h:136
std::shared_ptr< ColumnInt32 > col_packet_len
包长度列(类型待确认)
Definition packet_handler.h:115
std::shared_ptr< ColumnUInt8 > col_ttl
TTL列
Definition packet_handler.h:120
~packet_handler()
析构函数
Definition packet_handler.h:176
std::shared_ptr< ColumnUInt8 > col_icmp_code
ICMP代码列
Definition packet_handler.h:135
std::shared_ptr< ColumnUInt32 > col_tv_nsec
时间戳纳秒数列
Definition packet_handler.h:117
std::shared_ptr< ColumnUInt16 > col_len_udp
UDP长度列
Definition packet_handler.h:133
Client client
ClickHouse客户端
Definition packet_handler.h:111
std::shared_ptr< ColumnUInt8 > col_icmp_type
ICMP类型列
Definition packet_handler.h:134
std::shared_ptr< ColumnUInt8 > col_protocol_ip
IP协议列
Definition packet_handler.h:124
std::string table_prefix
数据库表前缀
Definition packet_handler.h:110
void handle_packet(pcpp::Packet *packet)
处理函数
Definition packet_handler.h:186
std::shared_ptr< ColumnUInt16 > col_id
IP标识列
Definition packet_handler.h:122
包统一属性基类,表征了IPv4的IP到传输层大多数有用的属性
Definition unifiedPacketAttr.h:46
uint16_t len_load
负载长度
Definition unifiedPacketAttr.h:67
uint8_t protocol
传输层协议类型:TCP=0, UDP=1, ICMP=2
Definition unifiedPacketAttr.h:66
uint8_t icmp_type
ICMP类型
Definition unifiedPacketAttr.h:81
uint32_t ack_num
确认号(UDP无此字段)
Definition unifiedPacketAttr.h:70
uint16_t src_port
源端口(ICMP无此字段)
Definition unifiedPacketAttr.h:68
uint8_t protocol_ip
IP协议类型字段
Definition unifiedPacketAttr.h:63
uint8_t icmp_code
ICMP代码
Definition unifiedPacketAttr.h:82
uint32_t tv_sec
时间戳(秒)
Definition unifiedPacketAttr.h:51
uint16_t dst_port
目的端口(ICMP无此字段)
Definition unifiedPacketAttr.h:69
uint32_t packet_len
包长度
Definition unifiedPacketAttr.h:50
uint8_t tos
服务类型(Type of Service)
Definition unifiedPacketAttr.h:58
uint8_t ttl
生存时间(Time To Live)
Definition unifiedPacketAttr.h:57
uint16_t len_udp
UDP长度字段
Definition unifiedPacketAttr.h:78
uint16_t flag
TCP标志位
Definition unifiedPacketAttr.h:74
std::string dst_ip
目的IP地址
Definition unifiedPacketAttr.h:54
uint16_t window
TCP窗口大小
Definition unifiedPacketAttr.h:75
std::string src_ip
源IP地址
Definition unifiedPacketAttr.h:53
uint32_t tv_nsec
时间戳(纳秒)
Definition unifiedPacketAttr.h:52
uint32_t seq_num
序列号(UDP无此字段)
Definition unifiedPacketAttr.h:71
uint16_t id
IP标识字段
Definition unifiedPacketAttr.h:59
uint32_t hash_val
五元组计算的流ID
Definition unifiedPacketAttr.h:49
uint16_t offset
片偏移字段
Definition unifiedPacketAttr.h:60
void StatsSummary2File(HttpStatsCollector &collector, std::string filename)
将HTTP统计摘要写入文件
Definition packet_handler.h:64
#define MAX_QUEUE_LENGTH
最大队列长度(调整以获得最佳速度)
Definition packet_handler.h:38
#define WRITE_STAT_LINE(description, counter, measurement)
写入统计行的宏定义
Definition packet_handler.h:48
int numOfHttpPackets
HTTP包的总数
Definition HttpStatsCollector.h:56
double averageAmountOfDataPerFlow
每个流的平均HTTP流量
Definition HttpStatsCollector.h:60
double averageNumOfHttpTransactionsPerFlow
每个流的平均HTTP事务数
Definition HttpStatsCollector.h:55
double sampleTime
统计收集总时间
Definition HttpStatsCollector.h:62
Rate httpFlowRate
HTTP流的速率
Definition HttpStatsCollector.h:51
int amountOfHttpTraffic
HTTP流量总字节数
Definition HttpStatsCollector.h:59
int numOfHttpTransactions
HTTP事务的总数
Definition HttpStatsCollector.h:53
int numOfHttpFlows
HTTP流的总数
Definition HttpStatsCollector.h:50
int numOfHttpPipeliningFlows
包含至少一个HTTP管道化事务的HTTP流总数
Definition HttpStatsCollector.h:52
Rate httpTransactionsRate
HTTP事务的速率
Definition HttpStatsCollector.h:54
Rate httpTrafficRate
HTTP流量速率
Definition HttpStatsCollector.h:61
Rate httpPacketRate
HTTP包的速率
Definition HttpStatsCollector.h:57
double averageNumOfPacketsPerFlow
每个流的平均HTTP包数
Definition HttpStatsCollector.h:58
Rate messageRate
该类型HTTP消息的速率
Definition HttpStatsCollector.h:94
int numOfMessages
该类型HTTP消息的总数(请求/响应)
Definition HttpStatsCollector.h:93
double averageMessageHeaderSize
平均消息头大小
Definition HttpStatsCollector.h:96
int totalMessageHeaderSize
消息头的总大小(字节)
Definition HttpStatsCollector.h:95
int numOfMessagesWithContentLength
包含"content-length"字段的响应总数
Definition HttpStatsCollector.h:148
double averageContentLengthSize
平均正文大小
Definition HttpStatsCollector.h:150
int totalContentLengthSize
从包含"content-length"字段的响应中提取的总正文大小
Definition HttpStatsCollector.h:149
double totalRate
总体速率
Definition HttpStatsCollector.h:30
统一包属性类