Skip to content
Snippets Groups Projects
Commit 5f16dd32 authored by Ryan Zuklie's avatar Ryan Zuklie Committed by Automerger Merge Worker
Browse files

Merge "Add interning support to NetworkTracing" am: e9e79833

parents 8e8100db e9e79833
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,7 @@ PERFETTO_DEFINE_DATA_SOURCE_STATIC_MEMBERS(android::bpf::NetworkTraceHandler);
namespace android {
namespace bpf {
using ::android::bpf::internal::NetworkTracePoller;
using ::perfetto::protos::pbzero::NetworkPacketBundle;
using ::perfetto::protos::pbzero::NetworkPacketEvent;
using ::perfetto::protos::pbzero::NetworkPacketTraceConfig;
using ::perfetto::protos::pbzero::TracePacket;
......@@ -59,9 +60,6 @@ using ::perfetto::protos::pbzero::TrafficDirection;
// the amount of redundant information written, thus reducing the overall trace
// size. Interning ids are similarly based on unique bundle contexts.
// Keys are PacketTraces where timestamp and length are ignored.
using BundleKey = PacketTrace;
// Based on boost::hash_combine
template <typename T, typename... Rest>
void HashCombine(std::size_t& seed, const T& val, const Rest&... rest) {
......@@ -81,19 +79,15 @@ struct BundleDetails {
(x).ifindex, (x).uid, (x).tag, (x).sport, (x).dport, (x).egress, \
(x).ipProto, (x).tcpFlags
// Hash functor for BundleKey: folds every aggregation-relevant field into a
// single hash value (timestamp and length are excluded via AGG_FIELDS).
struct BundleHash {
  std::size_t operator()(const BundleKey& key) const {
    std::size_t hash = 0;
    HashCombine(hash, AGG_FIELDS(key));
    return hash;
  }
};
// Hashes every aggregation field of |a| (timestamp and length are ignored).
std::size_t BundleHash::operator()(const BundleKey& a) const {
  std::size_t hash = 0;
  HashCombine(hash, AGG_FIELDS(a));
  return hash;
}
// Equality functor for BundleKey: two keys match when every aggregation field
// compares equal (timestamp and length are not part of the comparison).
struct BundleEq {
  bool operator()(const BundleKey& lhs, const BundleKey& rhs) const {
    return std::tie(AGG_FIELDS(lhs)) == std::tie(AGG_FIELDS(rhs));
  }
};
// Returns true when every aggregation field of |a| matches the corresponding
// field of |b| (timestamp and length are intentionally excluded).
bool BundleEq::operator()(const BundleKey& a, const BundleKey& b) const {
  const auto lhs = std::tie(AGG_FIELDS(a));
  const auto rhs = std::tie(AGG_FIELDS(b));
  return lhs == rhs;
}
// static
void NetworkTraceHandler::RegisterDataSource() {
......@@ -163,6 +157,8 @@ void NetworkTraceHandler::Write(const std::vector<PacketTrace>& packets,
}
return;
}
uint64_t minTs = std::numeric_limits<uint64_t>::max();
std::unordered_map<BundleKey, BundleDetails, BundleHash, BundleEq> bundles;
for (const PacketTrace& pkt : packets) {
BundleKey key = pkt;
......@@ -174,6 +170,8 @@ void NetworkTraceHandler::Write(const std::vector<PacketTrace>& packets,
if (mDropLocalPort) (key.egress ? key.sport : key.dport) = 0;
if (mDropRemotePort) (key.egress ? key.dport : key.sport) = 0;
minTs = std::min(minTs, pkt.timestampNs);
BundleDetails& bundle = bundles[key];
bundle.time_and_len.emplace_back(pkt.timestampNs, pkt.length);
bundle.minTs = std::min(bundle.minTs, pkt.timestampNs);
......@@ -181,15 +179,25 @@ void NetworkTraceHandler::Write(const std::vector<PacketTrace>& packets,
bundle.bytes += pkt.length;
}
// If state was cleared, emit a separate packet to indicate it. This uses the
// overall minTs so it is sorted before any packets that follow.
NetworkTraceState* incr_state = ctx.GetIncrementalState();
if (!bundles.empty() && mInternLimit && incr_state->cleared) {
auto clear = ctx.NewTracePacket();
clear->set_sequence_flags(TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
clear->set_timestamp(minTs);
incr_state->cleared = false;
}
for (const auto& kv : bundles) {
const BundleKey& key = kv.first;
const BundleDetails& details = kv.second;
auto dst = ctx.NewTracePacket();
dst->set_sequence_flags(TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
dst->set_timestamp(details.minTs);
auto* event = dst->set_network_packet_bundle();
Fill(key, event->set_ctx());
auto* event = FillWithInterning(incr_state, key, dst.get());
int count = details.time_and_len.size();
if (!mAggregationThreshold || count < mAggregationThreshold) {
......@@ -237,5 +245,39 @@ void NetworkTraceHandler::Fill(const PacketTrace& src,
}
}
// Emits the bundle context for |key| into |dst|, interning it when possible,
// and returns the started NetworkPacketBundle submessage.
//
// While the intern table (state->iids) has fewer than mInternLimit entries,
// the key is inserted (or found) via try_emplace; new entries receive
// iid = size + 1, so iids are 1-based. The first time a key is interned, its
// full context is also written to the packet's interned_data so readers can
// resolve the iid later. Once the table is full, the key is only looked up;
// on a miss the context is written inline into the bundle instead.
//
// Note: interned_data is added to |dst| before the bundle submessage is
// started, preserving the order the fields are written to the packet.
NetworkPacketBundle* NetworkTraceHandler::FillWithInterning(
NetworkTraceState* state, const BundleKey& key, TracePacket* dst) {
uint64_t iid = 0;
bool found = false;
if (state->iids.size() < mInternLimit) {
// Room remains in the table: insert the key, or find its existing entry.
auto [iter, success] = state->iids.try_emplace(key, state->iids.size() + 1);
iid = iter->second;
found = true;
if (success) {
// If we successfully emplaced, record the newly interned data.
auto* packet_context = dst->set_interned_data()->add_packet_context();
Fill(key, packet_context->set_ctx());
packet_context->set_iid(iid);
}
} else {
// Table is full: reuse an existing entry but never grow past the limit.
auto iter = state->iids.find(key);
if (iter != state->iids.end()) {
iid = iter->second;
found = true;
}
}
auto* event = dst->set_network_packet_bundle();
if (found) {
event->set_iid(iid);
} else {
// Not interned: emit the full context inline in the bundle.
Fill(key, event->set_ctx());
}
return event;
}
} // namespace bpf
} // namespace android
......@@ -338,5 +338,57 @@ TEST_F(NetworkTraceHandlerTest, DropTcpFlags) {
EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_tcp_flags());
}
TEST_F(NetworkTraceHandlerTest, Interning) {
  NetworkPacketTraceConfig config;
  config.set_intern_limit(2);

  // Four packets from three distinct sources (uids) with an intern limit of
  // two: the first two uids seen should be interned, the third inlined. Each
  // packet is written separately because the handler aggregates through an
  // unordered map, so a single batched write would intern a
  // non-deterministic pair of uids (fine in production, bad for assertions).
  std::vector<std::vector<PacketTrace>> inputs = {
      {PacketTrace{.timestampNs = 1, .uid = 123}},
      {PacketTrace{.timestampNs = 2, .uid = 456}},
      {PacketTrace{.timestampNs = 3, .uid = 789}},
      {PacketTrace{.timestampNs = 4, .uid = 123}},
  };

  auto session = StartTracing(config);
  HandlerForTest::Trace([&](HandlerForTest::TraceContext ctx) {
    for (const auto& input : inputs) {
      ctx.GetDataSourceLocked()->Write(input, ctx);
    }
    ctx.Flush();
  });

  std::vector<TracePacket> events;
  ASSERT_TRUE(StopTracing(session.get(), &events));
  ASSERT_EQ(events.size(), 4);

  // uid=123, first sighting: interned data emitted, bundle refers to iid 1.
  EXPECT_EQ(events[0].network_packet_bundle().iid(), 1);
  ASSERT_EQ(events[0].interned_data().packet_context().size(), 1);
  EXPECT_EQ(events[0].interned_data().packet_context(0).iid(), 1);
  EXPECT_EQ(events[0].interned_data().packet_context(0).ctx().uid(), 123);

  // uid=456, first sighting: interned data emitted, bundle refers to iid 2.
  EXPECT_EQ(events[1].network_packet_bundle().iid(), 2);
  ASSERT_EQ(events[1].interned_data().packet_context().size(), 1);
  EXPECT_EQ(events[1].interned_data().packet_context(0).iid(), 2);
  EXPECT_EQ(events[1].interned_data().packet_context(0).ctx().uid(), 456);

  // uid=789: no room left in the intern table (limit 2), context is inline.
  EXPECT_EQ(events[2].network_packet_bundle().ctx().uid(), 789);
  EXPECT_EQ(events[2].interned_data().packet_context().size(), 0);

  // uid=123 again: already interned, so only the iid is recorded.
  EXPECT_EQ(events[3].network_packet_bundle().iid(), 1);
  EXPECT_EQ(events[3].interned_data().packet_context().size(), 0);
}
} // namespace bpf
} // namespace android
......@@ -30,10 +30,39 @@
namespace android {
namespace bpf {
// BundleKeys are PacketTraces where timestamp and length are ignored; the
// remaining fields identify the aggregation context of a packet bundle.
using BundleKey = PacketTrace;
// Hash functor for BundleKey: hashes all fields except timestamp/length.
struct BundleHash {
std::size_t operator()(const BundleKey& a) const;
};
// Equality functor for BundleKey: keys are equal if all fields except
// timestamp/length are equal.
struct BundleEq {
bool operator()(const BundleKey& a, const BundleKey& b) const;
};
// Track the bundles we've interned and their corresponding intern id (iid). We
// use IncrementalState (rather than state in the Handler) so that we stay in
// sync with Perfetto's periodic state clearing (which helps recover from packet
// loss). When state is cleared, the state object is replaced with a new default
// constructed instance.
struct NetworkTraceState {
  // True until the first bundle after a (re)construction is emitted; the Write
  // path reads this flag to decide whether to emit a state-clear marker and
  // then resets it. Explicitly initialized so a default constructed instance
  // never carries an indeterminate value.
  bool cleared = true;
  // Maps interned bundle contexts to their iids (assigned starting from 1).
  std::unordered_map<BundleKey, uint64_t, BundleHash, BundleEq> iids;
};
// Inject our custom incremental state type into the Perfetto data source via
// its traits mechanism; Perfetto owns and (re)creates this state object.
struct NetworkTraceTraits : public perfetto::DefaultDataSourceTraits {
using IncrementalStateType = NetworkTraceState;
};
// NetworkTraceHandler implements the android.network_packets data source. This
// class is registered with Perfetto and is instantiated when tracing starts and
// destroyed when tracing ends. There is one instance per trace session.
class NetworkTraceHandler : public perfetto::DataSource<NetworkTraceHandler> {
class NetworkTraceHandler
: public perfetto::DataSource<NetworkTraceHandler, NetworkTraceTraits> {
public:
// Registers this DataSource.
static void RegisterDataSource();
......@@ -56,6 +85,11 @@ class NetworkTraceHandler : public perfetto::DataSource<NetworkTraceHandler> {
void Fill(const PacketTrace& src,
::perfetto::protos::pbzero::NetworkPacketEvent* event);
// Fills in contextual information either inline or via interning.
::perfetto::protos::pbzero::NetworkPacketBundle* FillWithInterning(
NetworkTraceState* state, const BundleKey& key,
::perfetto::protos::pbzero::TracePacket* dst);
static internal::NetworkTracePoller sPoller;
bool mStarted;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment