This repository has been archived by the owner on Mar 1, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader_t.hpp
117 lines (93 loc) · 3.65 KB
/
reader_t.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//
// Created by System Administrator on 2019-01-29.
//
#ifndef HEARTBEAT_READER_T_HPP
#define HEARTBEAT_READER_T_HPP
#include <string>
#include <vector>
#include <probe_dto_t.hpp>
#include <tins/tins.h>
//#include <database_t.hpp>
#include <iostream>
#include <fstream>
#include <process_options_t.hpp>
/**
 * Output functor that writes batches of probes to a file as CSV rows.
 *
 * One row is written per probe, with the columns in this order:
 *   source_ip, /24 prefix, indirect_ip, reply_ip, proto, sport, dport, ttl,
 *   ttl_from_udp_length, type, code, rtt, reply_ttl, reply_size, round, snapshot
 *
 * The underlying std::ofstream closes the file when this object is destroyed,
 * so no user-defined destructor is needed.
 */
struct output_file_t{
public:
    /**
     * Opens the output file and configures number formatting.
     *
     * @param ofile    Path of the file to write (opened with the default
     *                 std::ofstream mode).
     * @param round    Value written in the second-to-last column of every row.
     * @param snapshot Value written in the last column of every row.
     *
     * If the file cannot be opened a warning is printed on std::cerr; later
     * writes then have no effect because the stream is in a failed state.
     */
    explicit output_file_t (const std::string & ofile, int round, int snapshot)
        : m_ofstream(ofile), m_round(round), m_snapshot(snapshot){
        if (!m_ofstream.is_open()){
            std::cerr << "output_file_t: unable to open '" << ofile << "' for writing\n";
        }
        // Formatting flags are sticky, so set them once here instead of on
        // every row: fixed notation with one digit after the decimal point.
        m_ofstream.setf(std::ios_base::fixed, std::ios_base::floatfield);
        m_ofstream.precision(1);
    }

    /**
     * Appends one CSV row per probe of the batch to the output file.
     *
     * @param probes_dto Probes to write; an empty batch writes nothing.
     */
    void operator ()(const std::vector<probe_dto_t> & probes_dto) {
        for (const auto & probe_dto : probes_dto){
            // Clear the low 8 bits of the address to get the /24 prefix.
            // NOTE(review): assumes m_indirect_ip is a 32-bit address in host
            // byte order -- confirm against probe_dto_t.
            const uint32_t prefix = (probe_dto.m_indirect_ip >> 8) << 8;

            // The unsigned(...) conversions make the narrow fields print as
            // numbers rather than as characters.
            m_ofstream << probe_dto.m_source_ip << ','
                       << prefix << ','
                       << probe_dto.m_indirect_ip << ','
                       << probe_dto.m_reply_ip << ','
                       << unsigned(probe_dto.m_proto) << ','
                       << probe_dto.m_sport << ','
                       << probe_dto.m_dport << ','
                       << unsigned(probe_dto.m_ttl) << ','
                       << unsigned(probe_dto.m_ttl_from_udp_length) << ','
                       << unsigned(probe_dto.m_type) << ','
                       << unsigned(probe_dto.m_code) << ','
                       << probe_dto.m_rtt << ','
                       << unsigned(probe_dto.m_reply_ttl) << ','
                       << probe_dto.m_reply_size << ','
                       // This was for versioned probe engine of Clickhouse.
                       // << m_round << ",1," << m_snapshot << "\n";
                       << m_round << ','
                       << m_snapshot << '\n';
        }
    }

private:
    std::ofstream m_ofstream; // Destination CSV stream.
    int m_round;              // Written in the second-to-last column of every row.
    int m_snapshot;           // Written in the last column of every row.
};
class reader_t {
public:
explicit reader_t(const process_options_t & options);
template<typename OutputF>
void output(const std::string & pcap_file, int batch_size, OutputF & output_f){
using namespace Tins;
std::vector<probe_dto_t> batch;
batch.reserve(batch_size);
long n_packets = 0;
FileSniffer sniffer {pcap_file};
// long total_packets = std::distance(sniffer.begin(), sniffer.end());
// std::cout << total_packets << " packets to read.\n";
auto handler = [this, &n_packets, &batch, &output_f, batch_size](Packet & packet) {
++n_packets;
if (n_packets % 1000000 == 0){
std::cout << n_packets << " packets read\n";
}
// if (n_packets < 600000000){
// return true;
// }
auto probe_dto = read_packet(packet);
// Check integrity of the probe
if (probe_dto.m_dport != 0){
batch.push_back(probe_dto);
}
if (batch.size() == batch_size){
// Proceed to transaction.
output_f(batch);
batch.clear();
}
return true;
};
sniffer.sniff_loop(handler);
output_f(batch);
std::cout << n_packets << "Packets read\n";
}
void set_reference_time(const std::string & start_time_log_file);
private:
probe_dto_t read_packet(const Tins::Packet &) const;
double compute_rtt_from_tcp(uint32_t seq_number, double receive_time) const;
double compute_rtt_from_udp(uint16_t checksum, double receive_time) const;
// void insert_batch (const std::vector<probe_dto_t> &);
process_options_t m_options;
// database_t* m_db;
std::vector<double> reference_times;
};
#endif //HEARTBEAT_READER_T_HPP