Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split tc programs from generic tracer #1267

Merged
merged 7 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions bpf/http_maps.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef HTTP_MAPS_H
#define HTTP_MAPS_H

#include "vmlinux.h"
#include "bpf_helpers.h"
#include "http_types.h"

// Keeps track of the ongoing http connections we match for request/response
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, pid_connection_info_t);
__type(value, http_info_t);
__uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, connection_info_t);
__type(value, http_info_t);
__uint(max_entries, 1024);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http_fallback SEC(".maps");

#endif
1 change: 0 additions & 1 deletion bpf/http_ssl_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "http_types.h"
#include "k_tracer_defs.h"
#include "bpf_dbg.h"
#include "pid.h"
#include "sockaddr.h"
#include "tcp_info.h"
#include "pin_internal.h"
Expand Down
89 changes: 0 additions & 89 deletions bpf/k_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "tcp_info.h"
#include "k_tracer_defs.h"
#include "http_ssl_defs.h"
#include "tc_ip.h"
#include "pin_internal.h"

// Temporary tracking of accept arguments
Expand Down Expand Up @@ -691,91 +690,3 @@ int BPF_KPROBE(kprobe_sys_exit, int status) {

return 0;
}

SEC("tc_ingress")
int app_ingress(struct __sk_buff *skb) {
//bpf_printk("ingress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

if (tcp_ack(&tcp)) { // ack field must be set, which means we are looking at non SYN packet
// assumes we are the only ones that added options, this can be improved
if (tcp.h_proto == ETH_P_IP && tcp.ip_len == MIN_IP_LEN + MAX_TC_TP_LEN) {
parse_ip_options_ipv4(skb, &conn, &tcp);
} else if (tcp.h_proto == ETH_P_IPV6 &&
tcp.l4_proto == IP_V6_DEST_OPTS) { // Destination options used
parse_ip_options_ipv6(skb, &conn, &tcp);
}
}

return 0;
}

static __always_inline void
update_outgoing_request_span_id(connection_info_t *conn, protocol_info_t *tcp, tp_info_pid_t *tp) {
pid_connection_info_t p_conn = {};
__builtin_memcpy(&p_conn.conn, conn, sizeof(connection_info_t));
p_conn.pid = tp->pid;

http_info_t *h_info = bpf_map_lookup_elem(&ongoing_http, &p_conn);
if (h_info && tp->valid) {
bpf_printk("Found HTTP info, resetting the span id to %x%x", tcp->seq, tcp->ack);
*((u32 *)(&h_info->tp.span_id[0])) = tcp->seq;
*((u32 *)(&h_info->tp.span_id[4])) = tcp->ack;
}
}

static __always_inline void encode_data_in_ip_options(struct __sk_buff *skb,
connection_info_t *conn,
protocol_info_t *tcp,
tp_info_pid_t *tp) {
// Handling IPv4
// We only do this if the IP header doesn't have any options, this can be improved if needed
if (tcp->h_proto == ETH_P_IP && tcp->ip_len == MIN_IP_LEN) {
bpf_printk("Adding the trace_id in the IP Options");

if (inject_tc_ip_options_ipv4(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
} else if (tcp->h_proto == ETH_P_IPV6 && tcp->l4_proto == IPPROTO_TCP) { // Handling IPv6
bpf_printk("Found IPv6 header");

if (inject_tc_ip_options_ipv6(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
}
}

SEC("tc_egress")
int app_egress(struct __sk_buff *skb) {
//bpf_printk("egress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

sort_connection_info(&conn);

tp_info_pid_t *tp = bpf_map_lookup_elem(&outgoing_trace_map, &conn);

if (tp) {
bpf_printk("egress flags %x, sequence %x", tcp.flags, tcp.seq);
print_http_connection_info(&conn);

encode_data_in_ip_options(skb, &conn, &tcp, tp);
}

return 0;
}
1 change: 0 additions & 1 deletion bpf/protocol_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "bpf_dbg.h"
#include "pin_internal.h"

Expand Down
19 changes: 1 addition & 18 deletions bpf/protocol_http.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "runtime.h"
#include "protocol_common.h"
#include "trace_common.h"
#include "pin_internal.h"
#include "http_maps.h"

volatile const u32 high_request_volume;

Expand All @@ -22,23 +22,6 @@ struct {
__uint(max_entries, 1);
} http_info_mem SEC(".maps");

// Keeps track of the ongoing http connections we match for request/response
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, pid_connection_info_t);
__type(value, http_info_t);
__uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, connection_info_t);
__type(value, http_info_t);
__uint(max_entries, 1024);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http_fallback SEC(".maps");

// empty_http_info zeroes and return the unique percpu copy in the map
// this function assumes that a given thread is not trying to use many
// instances at the same time
Expand Down
1 change: 0 additions & 1 deletion bpf/protocol_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "protocol_common.h"
#include "http2_grpc.h"
#include "pin_internal.h"
Expand Down
1 change: 0 additions & 1 deletion bpf/protocol_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "protocol_common.h"
#include "trace_common.h"
#include "pin_internal.h"
Expand Down
98 changes: 98 additions & 0 deletions bpf/tc_tracer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include "vmlinux.h"
#include "bpf_helpers.h"
#include "bpf_dbg.h"

#include "http_maps.h"
#include "http_types.h"
#include "tc_ip.h"
#include "tcp_info.h"

char __license[] SEC("license") = "Dual MIT/GPL";

SEC("tc_ingress")
int app_ingress(struct __sk_buff *skb) {
//bpf_printk("ingress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

if (tcp_ack(&tcp)) { // ack field must be set, which means we are looking at non SYN packet
// assumes we are the only ones that added options, this can be improved
if (tcp.h_proto == ETH_P_IP && tcp.ip_len == MIN_IP_LEN + MAX_TC_TP_LEN) {
parse_ip_options_ipv4(skb, &conn, &tcp);
} else if (tcp.h_proto == ETH_P_IPV6 &&
tcp.l4_proto == IP_V6_DEST_OPTS) { // Destination options used
parse_ip_options_ipv6(skb, &conn, &tcp);
}
}

return 0;
}

static __always_inline void
update_outgoing_request_span_id(connection_info_t *conn, protocol_info_t *tcp, tp_info_pid_t *tp) {
pid_connection_info_t p_conn = {};
__builtin_memcpy(&p_conn.conn, conn, sizeof(connection_info_t));
p_conn.pid = tp->pid;

http_info_t *h_info = bpf_map_lookup_elem(&ongoing_http, &p_conn);
if (h_info && tp->valid) {
bpf_printk("Found HTTP info, resetting the span id to %x%x", tcp->seq, tcp->ack);
*((u32 *)(&h_info->tp.span_id[0])) = tcp->seq;
*((u32 *)(&h_info->tp.span_id[4])) = tcp->ack;
}
}

static __always_inline void encode_data_in_ip_options(struct __sk_buff *skb,
connection_info_t *conn,
protocol_info_t *tcp,
tp_info_pid_t *tp) {
// Handling IPv4
// We only do this if the IP header doesn't have any options, this can be improved if needed
if (tcp->h_proto == ETH_P_IP && tcp->ip_len == MIN_IP_LEN) {
bpf_printk("Adding the trace_id in the IP Options");

if (inject_tc_ip_options_ipv4(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
} else if (tcp->h_proto == ETH_P_IPV6 && tcp->l4_proto == IPPROTO_TCP) { // Handling IPv6
bpf_printk("Found IPv6 header");

if (inject_tc_ip_options_ipv6(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
}
}

SEC("tc_egress")
int app_egress(struct __sk_buff *skb) {
//bpf_printk("egress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

sort_connection_info(&conn);

tp_info_pid_t *tp = bpf_map_lookup_elem(&outgoing_trace_map, &conn);

if (tp) {
bpf_printk("egress flags %x, sequence %x", tcp.flags, tcp.seq);
print_http_connection_info(&conn);

encode_data_in_ip_options(skb, &conn, &tcp, tp);
}

return 0;
}
4 changes: 4 additions & 0 deletions pkg/beyla/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func checkCapabilitiesForSetOptions(config *Config, caps *helpers.OSCapabilities
if config.Enabled(FeatureAppO11y) {
testAndSet(caps, capError, unix.CAP_CHECKPOINT_RESTORE)
testAndSet(caps, capError, unix.CAP_SYS_PTRACE)

if config.EBPF.UseLinuxTC {
testAndSet(caps, capError, unix.CAP_NET_ADMIN)
}
}

if config.Enabled(FeatureNetO11y) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ func RunBeyla(ctx context.Context, cfg *beyla.Config) error {

func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) error {
slog.Info("starting Beyla in Application Observability mode")

wg := sync.WaitGroup{}
defer wg.Wait()

instr := appolly.New(ctx, ctxInfo, config)
if err := instr.FindAndInstrument(); err != nil {
if err := instr.FindAndInstrument(&wg); err != nil {
return fmt.Errorf("can't find target process: %w", err)
}
if err := instr.ReadAndForward(); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"log/slog"
"sync"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover"
Expand Down Expand Up @@ -43,15 +44,17 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config)

// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument() error {
func (i *Instrumenter) FindAndInstrument(wg *sync.WaitGroup) error {
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo, i.tracesInput)
foundProcesses, deletedProcesses, err := finder.Start()
if err != nil {
return fmt.Errorf("couldn't start Process Finder: %w", err)
}
// In background, listen indefinitely for each new process and run its
// associated ebpf.ProcessTracer once it is found.
wg.Add(1)
go func() {
defer wg.Done()
log := log()
type cancelCtx struct {
ctx context.Context
Expand All @@ -76,7 +79,11 @@ func (i *Instrumenter) FindAndInstrument() error {
cctx.ctx, cctx.cancel = context.WithCancel(i.ctx)
contexts[pt.FileInfo.Ino] = cctx
}
go pt.Tracer.Run(cctx.ctx, i.tracesInput)
wg.Add(1)
go func() {
defer wg.Done()
pt.Tracer.Run(cctx.ctx, i.tracesInput)
}()
}
case dp := <-deletedProcesses:
log.Debug("stopping ProcessTracer because there are no more instances of such process",
Expand Down
5 changes: 5 additions & 0 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/ebpf/generictracer"
"github.com/grafana/beyla/pkg/internal/ebpf/gotracer"
"github.com/grafana/beyla/pkg/internal/ebpf/tctracer"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
Expand Down Expand Up @@ -97,5 +98,9 @@ func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Trac
}

func newGenericTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
if cfg.EBPF.UseLinuxTC {
return []ebpf.Tracer{generictracer.New(cfg, metrics), tctracer.New(cfg)}
Copy link
Contributor Author

@rafaelroquetto rafaelroquetto Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be done here, because simply not calling tctracer.SetupTC() is not enough. The mere attempt to load the ebpf program (as in ebpf.LoadAndAssign()), which happens way before SetupTC() is called, will trigger the kernel verifier in case CAP_NET_ADMIN is not present (which may be a legitmate case when UseLinuxTC is false).

This also keeps the logic giving it visibility where it belongs IMHO.

}

return []ebpf.Tracer{generictracer.New(cfg, metrics)}
}
Loading
Loading