Skip to content

Commit 5b577df

Browse files
feat(ebpf): add real-time PID filter map updates via process event hook
Register an eBPF process event callback (exec/exit) through the existing register_event_handle C FFI. Events are forwarded via an mpsc channel (sender guarded by a mutex for the C callback thread) to the ProcessListener thread, which drains them every 1-second cycle and incrementally updates per-feature PID filter maps via process_single_pid(). This reduces the latency for new process detection from up to 10 seconds (full /proc scan interval) to approximately 1 second, while retaining the 10-second full scan as a fallback for correctness.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 085ad30 commit 5b577df

2 files changed

Lines changed: 252 additions & 3 deletions

File tree

agent/src/ebpf_dispatcher.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,6 +1153,20 @@ impl EbpfCollector {
11531153
warn!("ebpf start_continuous_profiler error.");
11541154
}
11551155

1156+
// Register the process event callback to receive real-time
1157+
// exec/exit notifications from the eBPF layer. This enables
1158+
// near-instant PID filter map updates instead of waiting for
1159+
// the 10-second /proc scan interval.
1160+
if ebpf::register_event_handle(
1161+
ebpf::EVENT_TYPE_PROC_EXEC | ebpf::EVENT_TYPE_PROC_EXIT,
1162+
crate::utils::process::process_event_callback,
1163+
) != 0
1164+
{
1165+
warn!("ebpf register_event_handle for process events failed");
1166+
} else {
1167+
info!("ebpf register_event_handle for process events succeeded");
1168+
}
1169+
11561170
if !is_uprobe_meltdown && !on_cpu.disabled {
11571171
let feature = "ebpf.profile.on_cpu";
11581172
process_listener.register(feature, set_feature_on_cpu);

agent/src/utils/process/linux.rs

Lines changed: 238 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,46 @@ use std::{
2424
process,
2525
sync::{
2626
atomic::{AtomicBool, Ordering::Relaxed},
27-
Arc, Mutex, RwLock,
27+
mpsc::{self, Receiver, Sender, TryRecvError},
28+
Arc, Mutex, OnceLock, RwLock,
2829
},
2930
thread::{self, JoinHandle},
3031
time::Duration,
3132
};
3233

3334
use log::{debug, error, info, trace};
3435
use nix::sys::utsname::uname;
35-
use procfs::process::all_processes_with_root;
36+
use procfs::process::{all_processes_with_root, Process};
3637

3738
use crate::config::ProcessMatcher;
39+
use crate::ebpf;
3840
use crate::platform::{get_os_app_tag_by_exec, ProcessData, ProcessDataOp};
3941

42+
// Global sender for process exec/exit events from the eBPF C callback.
43+
// The C callback (running on the sk-reader thread) sends (pid, event_type)
44+
// tuples through this channel. The ProcessListener drains them in its loop.
45+
static PROCESS_EVENT_SENDER: OnceLock<Mutex<Sender<(u32, u32)>>> = OnceLock::new();
46+
47+
/// Callback invoked from the C eBPF layer on process exec/exit events.
48+
/// This runs on the sk-reader C thread, so it must be non-blocking.
49+
/// The function pointer signature matches `void (*fn)(void *)` in the C API;
50+
/// the Rust FFI declaration in mod.rs declares it as
51+
/// `extern "C" fn(data: *mut PROCESS_EVENT)`.
52+
pub extern "C" fn process_event_callback(data: *mut ebpf::PROCESS_EVENT) {
53+
if data.is_null() {
54+
return;
55+
}
56+
let (pid, event_type) = unsafe { ((*data).pid, (*data).event_type) };
57+
58+
if let Some(sender) = PROCESS_EVENT_SENDER.get() {
59+
if let Ok(sender) = sender.lock() {
60+
// Non-blocking send: if the channel is disconnected, we silently
61+
// drop the event. The 10-second full scan will catch it.
62+
let _ = sender.send((pid, event_type));
63+
}
64+
}
65+
}
66+
4067
//返回当前进程占用内存RSS单位(字节)
4168
pub fn get_memory_rss() -> Result<u64> {
4269
let pid = process::id();
@@ -304,6 +331,9 @@ pub struct ProcessListener {
304331
config: Arc<RwLock<Config>>,
305332

306333
thread_handle: Mutex<Option<JoinHandle<()>>>,
334+
// Receiver end of the channel for eBPF process exec/exit events.
335+
// Created in new(), the sender is stored in the global PROCESS_EVENT_SENDER.
336+
event_receiver: Arc<Mutex<Receiver<(u32, u32)>>>,
307337
}
308338

309339
impl ProcessListener {
@@ -316,11 +346,17 @@ impl ProcessListener {
316346
user: String,
317347
command: Vec<String>,
318348
) -> Self {
349+
// Create a channel for eBPF process events.
350+
// The sender is stored in a global static so the C callback can reach it.
351+
let (tx, rx) = mpsc::channel();
352+
let _ = PROCESS_EVENT_SENDER.set(Mutex::new(tx));
353+
319354
let listener = Self {
320355
features: Default::default(),
321356
running: Arc::new(AtomicBool::new(false)),
322357
thread_handle: Mutex::new(None),
323358
config: Arc::new(RwLock::new(Config::new(proc_root, user, command))),
359+
event_receiver: Arc::new(Mutex::new(rx)),
324360
};
325361

326362
listener.set(process_blacklist, process_matcher);
@@ -516,14 +552,172 @@ impl ProcessListener {
516552
}
517553
}
518554

555+
/// Handle a single PID event (exec or exit) from the eBPF layer.
556+
///
557+
/// For EXEC events: reads /proc/<pid> to build ProcessData, matches against
558+
/// each feature's process_matcher list, and if a feature matches and the PID
559+
/// is not already in its list, adds it and invokes the callback.
560+
///
561+
/// For EXIT events: removes the PID from all feature lists and invokes
562+
/// callbacks for any feature whose PID list changed.
563+
fn process_single_pid(
564+
pid: u32,
565+
event_type: u32,
566+
process_data_cache: &mut HashMap<i32, ProcessData>,
567+
features: &mut Features,
568+
user: &str,
569+
command: &[String],
570+
) {
571+
let (blacklist, features) = (&mut features.blacklist, &mut features.features);
572+
if features.is_empty() {
573+
return;
574+
}
575+
576+
if event_type & ebpf::EVENT_TYPE_PROC_EXEC != 0 {
577+
// EXEC event: read process info from /proc and match against features
578+
let proc_pid = pid as i32;
579+
let process = match Process::new(proc_pid) {
580+
Ok(p) => p,
581+
Err(e) => {
582+
debug!(
583+
"process_single_pid: failed to read /proc/{}: {}",
584+
pid, e
585+
);
586+
return;
587+
}
588+
};
589+
590+
// Check blacklist
591+
match process.status().map(|s| s.name) {
592+
Ok(name) if blacklist.binary_search(&name).is_ok() => {
593+
trace!(
594+
"process_single_pid: process {name} (pid#{pid}) in blacklist, skipping"
595+
);
596+
return;
597+
}
598+
Ok(_) => (),
599+
Err(e) => {
600+
debug!(
601+
"process_single_pid: failed to get status for pid {}: {}",
602+
pid, e
603+
);
604+
return;
605+
}
606+
}
607+
608+
// Build ProcessData for this PID
609+
let pdata = match ProcessData::try_from(&process) {
610+
Ok(d) => d,
611+
Err(e) => {
612+
debug!(
613+
"process_single_pid: failed to build ProcessData for pid {}: {}",
614+
pid, e
615+
);
616+
return;
617+
}
618+
};
619+
process_data_cache.insert(proc_pid, pdata);
620+
621+
// Get tags (may be empty if command is not configured)
622+
let tags_map = match get_os_app_tag_by_exec(user, command) {
623+
Ok(tags) => tags,
624+
Err(_) => HashMap::new(),
625+
};
626+
627+
// Match the new PID against each feature
628+
for (key, node) in features.iter_mut() {
629+
if node.process_matcher.is_empty() || node.callback.is_none() {
630+
continue;
631+
}
632+
633+
let pdata = match process_data_cache.get(&proc_pid) {
634+
Some(d) => d,
635+
None => continue,
636+
};
637+
638+
let mut matched = false;
639+
let mut is_ignored = false;
640+
let mut matched_process_data = None;
641+
642+
for matcher in &node.process_matcher {
643+
if let Some(process_data) = matcher.get_process_data(pdata, &tags_map) {
644+
if matcher.ignore {
645+
is_ignored = true;
646+
break;
647+
}
648+
matched = true;
649+
matched_process_data = Some(process_data);
650+
break;
651+
}
652+
}
653+
654+
if is_ignored || !matched {
655+
continue;
656+
}
657+
658+
let pid_u32 = pid;
659+
// Check if PID is already in the list (binary search since list is sorted)
660+
if node.pids.binary_search(&pid_u32).is_ok() {
661+
continue;
662+
}
663+
664+
// Add PID and re-sort
665+
node.pids.push(pid_u32);
666+
node.pids.sort();
667+
node.pids.dedup();
668+
669+
if let Some(pd) = matched_process_data {
670+
node.process_datas.push(pd);
671+
node.process_datas.sort_by_key(|x| x.pid);
672+
node.process_datas.merge_and_dedup();
673+
}
674+
675+
debug!(
676+
"process_single_pid: Feature {} added pid {}, total {} pids.",
677+
key,
678+
pid,
679+
node.pids.len()
680+
);
681+
node.callback.as_ref().unwrap()(&node.pids, &node.process_datas);
682+
}
683+
} else if event_type & ebpf::EVENT_TYPE_PROC_EXIT != 0 {
684+
// EXIT event: remove PID from all feature lists
685+
let pid_u32 = pid;
686+
let proc_pid = pid as i32;
687+
688+
// Remove from process_data_cache
689+
process_data_cache.remove(&proc_pid);
690+
691+
for (key, node) in features.iter_mut() {
692+
if node.callback.is_none() {
693+
continue;
694+
}
695+
696+
if let Ok(idx) = node.pids.binary_search(&pid_u32) {
697+
node.pids.remove(idx);
698+
node.process_datas.retain(|pd| pd.pid != pid as u64);
699+
700+
debug!(
701+
"process_single_pid: Feature {} removed pid {}, remaining {} pids.",
702+
key,
703+
pid,
704+
node.pids.len()
705+
);
706+
node.callback.as_ref().unwrap()(&node.pids, &node.process_datas);
707+
}
708+
}
709+
}
710+
}
711+
519712
pub fn start(&self) {
520713
if self.running.swap(true, Relaxed) {
521714
return;
522715
}
523-
info!("Startting process listener ...");
716+
info!("Starting process listener ...");
524717
let features = self.features.clone();
525718
let running = self.running.clone();
526719
let config = self.config.clone();
720+
let event_receiver = self.event_receiver.clone();
527721

528722
running.store(true, Relaxed);
529723
*self.thread_handle.lock().unwrap() = Some(
@@ -534,6 +728,47 @@ impl ProcessListener {
534728
let mut process_data = HashMap::new();
535729
while running.load(Relaxed) {
536730
thread::sleep(Duration::from_secs(1));
731+
732+
// Drain all pending eBPF process events (non-blocking).
733+
// Each event triggers an incremental PID update.
734+
if let Ok(receiver) = event_receiver.lock() {
735+
let mut event_count = 0u32;
736+
loop {
737+
match receiver.try_recv() {
738+
Ok((pid, event_type)) => {
739+
let current_config = config.read().unwrap();
740+
let mut features = features.write().unwrap();
741+
Self::process_single_pid(
742+
pid,
743+
event_type,
744+
&mut process_data,
745+
&mut features,
746+
&current_config.user,
747+
&current_config.command,
748+
);
749+
drop(features);
750+
drop(current_config);
751+
event_count += 1;
752+
}
753+
Err(TryRecvError::Empty) => break,
754+
Err(TryRecvError::Disconnected) => {
755+
debug!(
756+
"process event channel disconnected, \
757+
falling back to polling only"
758+
);
759+
break;
760+
}
761+
}
762+
}
763+
if event_count > 0 {
764+
debug!(
765+
"process listener drained {} eBPF events in this cycle",
766+
event_count
767+
);
768+
}
769+
}
770+
771+
// Full /proc scan every INTERVAL seconds as fallback
537772
count += 1;
538773
if count < Self::INTERVAL {
539774
continue;

0 commit comments

Comments (0)