Skip to content

Commit a4d7320

Browse files
committed
Ensure fork recovery before serving requests
1 parent 85e82b5 commit a4d7320

File tree

4 files changed: +122 −32 lines changed

crates/sandchest-agent/src/service.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@ impl GuestAgent for GuestAgentService {
4747
&self,
4848
request: Request<ExecRequest>,
4949
) -> Result<Response<Self::ExecStream>, Status> {
50+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
5051
snapshot::note_user_activity();
5152
let stream = crate::exec::spawn_exec(request.into_inner());
5253
Ok(Response::new(stream))
@@ -56,6 +57,7 @@ impl GuestAgent for GuestAgentService {
5657
&self,
5758
request: Request<CreateSessionRequest>,
5859
) -> Result<Response<SessionResponse>, Status> {
60+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
5961
snapshot::note_user_activity();
6062
let req = request.into_inner();
6163
let session_id = self
@@ -71,6 +73,7 @@ impl GuestAgent for GuestAgentService {
7173
&self,
7274
request: Request<SessionExecRequest>,
7375
) -> Result<Response<Self::SessionExecStream>, Status> {
76+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
7477
snapshot::note_user_activity();
7578
let req = request.into_inner();
7679
let stream = self
@@ -84,6 +87,7 @@ impl GuestAgent for GuestAgentService {
8487
&self,
8588
request: Request<SessionInputRequest>,
8689
) -> Result<Response<()>, Status> {
90+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
8791
snapshot::note_user_activity();
8892
let req = request.into_inner();
8993
self.session_manager
@@ -96,6 +100,7 @@ impl GuestAgent for GuestAgentService {
96100
&self,
97101
request: Request<DestroySessionRequest>,
98102
) -> Result<Response<()>, Status> {
103+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
99104
snapshot::note_user_activity();
100105
let req = request.into_inner();
101106
self.session_manager
@@ -108,6 +113,7 @@ impl GuestAgent for GuestAgentService {
108113
&self,
109114
request: Request<Streaming<FileChunk>>,
110115
) -> Result<Response<PutFileResponse>, Status> {
116+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
111117
snapshot::note_user_activity();
112118
let response = crate::files::put_file(request.into_inner()).await?;
113119
Ok(Response::new(response))
@@ -119,6 +125,7 @@ impl GuestAgent for GuestAgentService {
119125
&self,
120126
request: Request<GetFileRequest>,
121127
) -> Result<Response<Self::GetFileStream>, Status> {
128+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
122129
snapshot::note_user_activity();
123130
let stream = crate::files::spawn_get_file(request.into_inner());
124131
Ok(Response::new(stream))
@@ -128,6 +135,7 @@ impl GuestAgent for GuestAgentService {
128135
&self,
129136
request: Request<ListFilesRequest>,
130137
) -> Result<Response<ListFilesResponse>, Status> {
138+
snapshot::ensure_recovered_if_needed(&self.session_manager).await;
131139
snapshot::note_user_activity();
132140
let response = crate::files::list_files(request.into_inner()).await?;
133141
Ok(Response::new(response))

crates/sandchest-agent/src/session.rs

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -487,15 +487,21 @@ async fn run_session_exec(
487487
/// Look for the sentinel pattern in the buffer. Returns (output_before_sentinel, exit_code).
488488
fn extract_sentinel(buf: &[u8], sentinel_marker: &str) -> Option<(Vec<u8>, i32)> {
489489
let buf_str = String::from_utf8_lossy(buf);
490-
let marker_pos = buf_str.find(sentinel_marker)?;
491-
492-
let after_marker = &buf_str[marker_pos + sentinel_marker.len()..];
493-
let suffix_pos = after_marker.find(SENTINEL_SUFFIX)?;
494-
let exit_code_str = &after_marker[..suffix_pos];
495-
let exit_code: i32 = exit_code_str.parse().unwrap_or(-1);
490+
let mut search_from = 0;
491+
492+
while let Some(relative_pos) = buf_str[search_from..].find(sentinel_marker) {
493+
let marker_pos = search_from + relative_pos;
494+
let after_marker = &buf_str[marker_pos + sentinel_marker.len()..];
495+
let suffix_pos = after_marker.find(SENTINEL_SUFFIX)?;
496+
let exit_code_str = &after_marker[..suffix_pos];
497+
if let Ok(exit_code) = exit_code_str.parse::<i32>() {
498+
let output = buf[..marker_pos].to_vec();
499+
return Some((output, exit_code));
500+
}
501+
search_from = marker_pos + sentinel_marker.len();
502+
}
496503

497-
let output = buf[..marker_pos].to_vec();
498-
Some((output, exit_code))
504+
None
499505
}
500506

501507
/// Strip the echoed command line from PTY output.
@@ -526,6 +532,20 @@ fn strip_command_echo(output: &[u8], cmd: &str) -> Vec<u8> {
526532
#[cfg(test)]
527533
mod tests {
528534
use super::*;
535+
use tokio_stream::StreamExt;
536+
537+
async fn collect_session_events(
538+
manager: &SessionManager,
539+
session_id: &str,
540+
cmd: &str,
541+
) -> Vec<Result<ExecEvent, Status>> {
542+
manager
543+
.spawn_session_exec(session_id, cmd.to_string(), 10)
544+
.await
545+
.unwrap()
546+
.collect()
547+
.await
548+
}
529549

530550
// ---- extract_sentinel tests ----
531551

@@ -590,8 +610,7 @@ mod tests {
590610
fn extract_sentinel_invalid_exit_code() {
591611
let marker = "__SC_SENTINEL_999_";
592612
let buf = b"__SC_SENTINEL_999_notanumber__\n";
593-
let (_, code) = extract_sentinel(buf, marker).unwrap();
594-
assert_eq!(code, -1); // parse failure falls back to -1
613+
assert!(extract_sentinel(buf, marker).is_none());
595614
}
596615

597616
#[test]
@@ -611,6 +630,19 @@ mod tests {
611630
assert_eq!(code, 255);
612631
}
613632

633+
#[test]
634+
fn extract_sentinel_skips_echoed_marker_literal() {
635+
let marker = "__SC_SENTINEL_123_";
636+
let buf =
637+
b"echo \"__SC_SENTINEL_123_${__sc_exit}__\"\nreal output\n__SC_SENTINEL_123_0__\n";
638+
let (output, code) = extract_sentinel(buf, marker).unwrap();
639+
assert_eq!(
640+
String::from_utf8_lossy(&output),
641+
"echo \"__SC_SENTINEL_123_${__sc_exit}__\"\nreal output\n"
642+
);
643+
assert_eq!(code, 0);
644+
}
645+
614646
// ---- strip_command_echo tests ----
615647

616648
#[test]
@@ -746,6 +778,46 @@ mod tests {
746778
manager.destroy_session(&id).await.unwrap();
747779
}
748780

781+
#[tokio::test]
782+
async fn session_exec_runs_command_and_preserves_working_directory() {
783+
let manager = SessionManager::new();
784+
let env = HashMap::new();
785+
let session_id = manager.create_session("", "", &env).await.unwrap();
786+
787+
let prime_events = collect_session_events(
788+
&manager,
789+
&session_id,
790+
"cd /tmp && printf '%s' test-session > /tmp/session-test.txt",
791+
)
792+
.await;
793+
let prime_exit = prime_events
794+
.iter()
795+
.find_map(|event| match event.as_ref().ok()?.event.as_ref()? {
796+
exec_event::Event::Exit(exit) => Some(exit.exit_code),
797+
_ => None,
798+
});
799+
assert_eq!(prime_exit, Some(0));
800+
801+
let persisted_events =
802+
collect_session_events(&manager, &session_id, "pwd && cat /tmp/session-test.txt").await;
803+
let mut stdout = Vec::new();
804+
let mut exit = None;
805+
for event in &persisted_events {
806+
let event = event.as_ref().unwrap();
807+
match event.event.as_ref() {
808+
Some(exec_event::Event::Stdout(data)) => stdout.extend_from_slice(data),
809+
Some(exec_event::Event::Exit(info)) => exit = Some(info.exit_code),
810+
_ => {}
811+
}
812+
}
813+
814+
assert_eq!(exit, Some(0));
815+
let normalized = String::from_utf8_lossy(&stdout).replace("\r\n", "\n");
816+
assert_eq!(normalized.trim_end(), "/tmp\ntest-session");
817+
818+
manager.destroy_session(&session_id).await.unwrap();
819+
}
820+
749821
#[tokio::test]
750822
async fn session_manager_destroy_all() {
751823
let manager = SessionManager::new();

crates/sandchest-agent/src/snapshot.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33
use std::sync::atomic::{AtomicU64, Ordering};
44
use std::time::{SystemTime, UNIX_EPOCH};
55

6+
use tokio::sync::Mutex;
67
use tracing::{info, warn};
78

89
use crate::session::SessionManager;
@@ -13,6 +14,7 @@ const STALE_THRESHOLD_SECS: u64 = 5;
1314
#[cfg(target_os = "linux")]
1415
const URANDOM_SEED_BYTES: usize = 256;
1516
static LAST_USER_ACTIVITY_SECS: AtomicU64 = AtomicU64::new(0);
17+
static RECOVERY_LOCK: Mutex<()> = Mutex::const_new(());
1618

1719
fn current_unix_secs() -> u64 {
1820
SystemTime::now()
@@ -229,6 +231,34 @@ async fn perform_fork_recovery(session_manager: &SessionManager) {
229231
info!("Fork recovery complete — agent ready");
230232
}
231233

234+
pub async fn ensure_recovered_if_needed(session_manager: &SessionManager) {
235+
let _guard = RECOVERY_LOCK.lock().await;
236+
let heartbeat_path = Path::new(HEARTBEAT_PATH);
237+
238+
let Some(file_ts) = read_heartbeat_timestamp(heartbeat_path) else {
239+
return;
240+
};
241+
242+
let now = current_unix_secs();
243+
let last_activity = last_user_activity_secs();
244+
245+
if should_perform_fork_recovery(file_ts, now, last_activity) {
246+
info!(
247+
stale_secs = now - file_ts,
248+
"Stale heartbeat detected before serving traffic — running fork recovery"
249+
);
250+
perform_fork_recovery(session_manager).await;
251+
} else if heartbeat_is_stale(file_ts, now) {
252+
warn!(
253+
stale_secs = now - file_ts,
254+
last_activity_secs = last_activity,
255+
heartbeat_secs = file_ts,
256+
"Skipping fork recovery because the agent has already served post-resume traffic"
257+
);
258+
write_heartbeat().await;
259+
}
260+
}
261+
232262
/// Write current timestamp to the heartbeat file.
233263
async fn write_heartbeat() {
234264
let ts = SystemTime::now()
@@ -252,27 +282,7 @@ async fn write_heartbeat() {
252282
pub fn start_snapshot_watcher(session_manager: Arc<SessionManager>) {
253283
tokio::spawn(async move {
254284
loop {
255-
// Check for stale heartbeat BEFORE writing a fresh one
256-
let heartbeat_path = Path::new(HEARTBEAT_PATH);
257-
if let Some(file_ts) = read_heartbeat_timestamp(heartbeat_path) {
258-
let now = current_unix_secs();
259-
let last_activity = last_user_activity_secs();
260-
261-
if should_perform_fork_recovery(file_ts, now, last_activity) {
262-
info!(
263-
stale_secs = now - file_ts,
264-
"Stale heartbeat detected — snapshot restore likely"
265-
);
266-
perform_fork_recovery(&session_manager).await;
267-
} else if heartbeat_is_stale(file_ts, now) {
268-
warn!(
269-
stale_secs = now - file_ts,
270-
last_activity_secs = last_activity,
271-
heartbeat_secs = file_ts,
272-
"Skipping fork recovery because the agent has already served post-resume traffic"
273-
);
274-
}
275-
}
285+
ensure_recovered_if_needed(&session_manager).await;
276286

277287
write_heartbeat().await;
278288

packages/admin-cli/src/sandbox-smoke.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ export async function runSandboxSmokeTest(
355355
const persistedResult = await session.exec(`pwd && cat "${sessionPath}"`, { timeout: 60 })
356356
assertExecSuccess(persistedResult, 'session persisted exec')
357357

358-
const output = persistedResult.stdout.trimEnd()
358+
const output = persistedResult.stdout.replace(/\r\n/g, '\n').trimEnd()
359359
assert(
360360
output === `/tmp\nsession:${runId}`,
361361
`Session state did not persist as expected: ${JSON.stringify(output)}`,

0 commit comments

Comments (0)