Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ParallelTestRunner"
uuid = "d3525ed8-44d0-4b2c-a655-542cee43accc"
authors = ["Valentin Churavy <v.churavy@gmail.com>"]
version = "2.6.0"
version = "2.6.1"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand Down
79 changes: 49 additions & 30 deletions src/ParallelTestRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,39 @@ function anynonpass(ts::Test.AbstractTestSet)
end
end

# Thin compatibility shim for using `Lockable` also in Julia v1.10
if VERSION >= v"1.11.0-DEV.1568"
const Lockable = Base.Lockable
else
# Adapted from <https://github.com/JuliaLang/julia/pull/52898>.
struct Lockable{T, L <: Base.AbstractLock}
value::T
lock::L
end

Lockable(value) = Lockable(value, ReentrantLock())
Base.getindex(l::Lockable) = (Base.assert_havelock(l.lock); l.value)

Base.lock(l::Lockable) = Base.lock(l.lock)
Base.trylock(l::Lockable) = Base.trylock(l.lock)
Base.unlock(l::Lockable) = Base.unlock(l.lock)
end

const ID_COUNTER = Threads.Atomic{Int}(0)

# Thin wrapper around Malt.Worker, to handle the stdio loop differently.
struct PTRWorker <: Malt.AbstractWorker
w::Malt.Worker
io::IOBuffer
io_lock::ReentrantLock
io::Lockable{IOBuffer, ReentrantLock}
id::Int
end

function PTRWorker(; exename=Base.julia_cmd()[1], exeflags=String[], env=String[])
io = IOBuffer()
io_lock = ReentrantLock()
io = Lockable(IOBuffer())
wrkr = Malt.Worker(; exename, exeflags, env, monitor_stdout=false, monitor_stderr=false)
stdio_loop(wrkr, io, io_lock)
stdio_loop(wrkr, io)
id = ID_COUNTER[] += 1
return PTRWorker(wrkr, io, io_lock, id)
return PTRWorker(wrkr, io, id)
end

worker_id(wrkr::PTRWorker) = wrkr.id
Expand Down Expand Up @@ -293,19 +309,19 @@ function print_test_crashed(::Type{<:AbstractTestRecord}, wrkr, test, ctx::TestI
end

# Adapted from `Malt._stdio_loop`
function stdio_loop(worker::Malt.Worker, io, io_lock::ReentrantLock)
function stdio_loop(worker::Malt.Worker, io::Lockable)
Threads.@spawn while !eof(worker.stdout) && Malt.isrunning(worker)
try
bytes = readavailable(worker.stdout)
@lock io_lock write(io, bytes)
@lock io write(io[], bytes)
catch
break
end
end
Threads.@spawn while !eof(worker.stderr) && Malt.isrunning(worker)
try
bytes = readavailable(worker.stderr)
@lock io_lock write(io, bytes)
@lock io write(io[], bytes)
catch
break
end
Expand Down Expand Up @@ -933,10 +949,8 @@ function runtests(mod::Module, args::ParsedArgs;
end

t0 = time()
results = []
running_tests = Dict{String, Float64}() # test => start_time
test_lock = ReentrantLock() # to protect crucial access to tests and running_tests
results_lock = ReentrantLock() # to protect concurrent access to results
results = Lockable([])
running_tests = Lockable(Dict{String, Float64}()) # test => start_time

worker_tasks = Task[]

Expand Down Expand Up @@ -989,16 +1003,19 @@ function runtests(mod::Module, args::ParsedArgs;
end

function update_status()
# only draw if we have something to show
isempty(running_tests) && return
completed = Base.@lock results_lock length(results)
# take consistent snapshots once, so the rest of this function operates on
# frozen data rather than racing with workers that mutate these collections
running_snapshot = @lock running_tests copy(running_tests[])
isempty(running_snapshot) && return
results_snapshot = @lock results copy(results[])
completed = length(results_snapshot)
total = length(tests)

# line 1: empty line
line1 = ""

# line 2: running tests
test_list = sort(collect(keys(running_tests)), by = x -> running_tests[x])
test_list = sort(collect(keys(running_snapshot)), by = x -> running_snapshot[x])
status_parts = map(test_list) do test
"$test"
end
Expand All @@ -1013,23 +1030,23 @@ function runtests(mod::Module, args::ParsedArgs;
line3 = "Progress: $completed/$total tests completed"
if completed > 0
# estimate per-test time (slightly pessimistic)
durations_done = Base.@lock results_lock [end_time - start_time for (_, _,_, start_time, end_time) in results]
durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results_snapshot]
μ = mean(durations_done)
σ = length(durations_done) > 1 ? std(durations_done) : 0.0
est_per_test = μ + 0.5σ

est_remaining = 0.0
## currently-running
for (test, start_time) in running_tests
for (test, start_time) in running_snapshot
elapsed = time() - start_time
duration = get(historical_durations, test, est_per_test)
est_remaining += max(0.0, duration - elapsed)
end
## yet-to-run
for test in tests
haskey(running_tests, test) && continue
haskey(running_snapshot, test) && continue
# Test is in any completed test
any(r -> test == r.test, results) && continue
any(r -> test == r.test, results_snapshot) && continue
est_remaining += get(historical_durations, test, est_per_test)
end

Expand Down Expand Up @@ -1112,7 +1129,9 @@ function runtests(mod::Module, args::ParsedArgs;
end
isa(ex, InterruptException) || rethrow()
finally
if isempty(running_tests) && length(results) >= length(tests)
n_running = @lock running_tests length(running_tests[])
n_results = @lock results length(results[])
if n_running == 0 && n_results >= length(tests)
# XXX: only erase the status if we completed successfully.
# in other cases we'll have printed "caught interrupt"
clear_status()
Expand All @@ -1138,9 +1157,9 @@ function runtests(mod::Module, args::ParsedArgs;

done && return

test_t0 = Base.@lock test_lock begin
test_t0 = @lock running_tests begin
test_t0 = time()
running_tests[test] = test_t0
running_tests[][test] = test_t0
end

# pass in init_worker_code to custom worker function if defined
Expand Down Expand Up @@ -1175,8 +1194,8 @@ function runtests(mod::Module, args::ParsedArgs;
ex
end
test_t1 = time()
output = Base.@lock wrkr.io_lock String(take!(wrkr.io))
Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1))
output = @lock wrkr.io String(take!(wrkr.io[]))
@lock results push!(results[], (; test, result, output, test_t0, test_t1))

# act on the results
if result isa AbstractTestRecord
Expand Down Expand Up @@ -1209,8 +1228,8 @@ function runtests(mod::Module, args::ParsedArgs;
Malt.stop(wrkr)
end

Base.@lock test_lock begin
delete!(running_tests, test)
@lock running_tests begin
delete!(running_tests[], test)
end
catch ex
isa(ex, InterruptException) || rethrow()
Expand Down Expand Up @@ -1266,7 +1285,7 @@ function runtests(mod::Module, args::ParsedArgs;
end

# print the output generated by each testset
for (testname, result, output, _start, _stop) in results
for (testname, result, output, _start, _stop) in @lock(results, results[])
if !isempty(output)
print(io_ctx.stdout, "\nOutput generated during execution of '")
if result isa Exception || anynonpass(result[])
Expand Down Expand Up @@ -1324,7 +1343,7 @@ function runtests(mod::Module, args::ParsedArgs;
function collect_results()
with_testset(o_ts) do
completed_tests = Set{String}()
for (testname, result, _output, start, stop) in results
for (testname, result, _output, start, stop) in @lock(results, results[])
push!(completed_tests, testname)

if result isa AbstractTestRecord
Expand Down
Loading