diff --git a/Project.toml b/Project.toml index bf694b7..053d7fe 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ParallelTestRunner" uuid = "d3525ed8-44d0-4b2c-a655-542cee43accc" authors = ["Valentin Churavy "] -version = "2.6.0" +version = "2.6.1" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index e8c97f7..fad30c3 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -23,23 +23,39 @@ function anynonpass(ts::Test.AbstractTestSet) end end +# Thin compatibility shim for using `Lockable` also in Julia v1.10 +if VERSION >= v"1.11.0-DEV.1568" + const Lockable = Base.Lockable +else + # Adapted from . + struct Lockable{T, L <: Base.AbstractLock} + value::T + lock::L + end + + Lockable(value) = Lockable(value, ReentrantLock()) + Base.getindex(l::Lockable) = (Base.assert_havelock(l.lock); l.value) + + Base.lock(l::Lockable) = Base.lock(l.lock) + Base.trylock(l::Lockable) = Base.trylock(l.lock) + Base.unlock(l::Lockable) = Base.unlock(l.lock) +end + const ID_COUNTER = Threads.Atomic{Int}(0) # Thin wrapper around Malt.Worker, to handle the stdio loop differently. struct PTRWorker <: Malt.AbstractWorker w::Malt.Worker - io::IOBuffer - io_lock::ReentrantLock + io::Lockable{IOBuffer, ReentrantLock} id::Int end function PTRWorker(; exename=Base.julia_cmd()[1], exeflags=String[], env=String[]) - io = IOBuffer() - io_lock = ReentrantLock() + io = Lockable(IOBuffer()) wrkr = Malt.Worker(; exename, exeflags, env, monitor_stdout=false, monitor_stderr=false) - stdio_loop(wrkr, io, io_lock) + stdio_loop(wrkr, io) id = ID_COUNTER[] += 1 - return PTRWorker(wrkr, io, io_lock, id) + return PTRWorker(wrkr, io, id) end worker_id(wrkr::PTRWorker) = wrkr.id @@ -293,11 +309,11 @@ function print_test_crashed(::Type{<:AbstractTestRecord}, wrkr, test, ctx::TestI end # Adapted from `Malt._stdio_loop` -function stdio_loop(worker::Malt.Worker, io, io_lock::ReentrantLock) +function stdio_loop(worker::Malt.Worker, io::Lockable) Threads.@spawn while !eof(worker.stdout) && Malt.isrunning(worker) try bytes = readavailable(worker.stdout) - @lock io_lock write(io, bytes) + @lock io write(io[], bytes) catch break end @@ -305,7 +321,7 @@ function stdio_loop(worker::Malt.Worker, io, io_lock::ReentrantLock) Threads.@spawn while !eof(worker.stderr) && Malt.isrunning(worker) try bytes = readavailable(worker.stderr) - @lock io_lock write(io, bytes) + @lock io write(io[], bytes) catch break end @@ -933,10 +949,8 @@ function runtests(mod::Module, args::ParsedArgs; end t0 = time() - results = [] - running_tests = Dict{String, Float64}() # test => start_time - test_lock = ReentrantLock() # to protect crucial access to tests and running_tests - results_lock = ReentrantLock() # to protect concurrent access to results + results = Lockable([]) + running_tests = Lockable(Dict{String, Float64}()) # test => start_time worker_tasks = Task[] @@ -989,16 +1003,19 @@ function runtests(mod::Module, args::ParsedArgs; end function update_status() - # only draw if we have something to show - isempty(running_tests) && return - completed = Base.@lock results_lock length(results) + # take consistent snapshots once, so the rest of this function operates on + # frozen data rather than racing with workers that mutate these collections + running_snapshot = @lock running_tests copy(running_tests[]) + isempty(running_snapshot) && return + results_snapshot = @lock results copy(results[]) + completed = length(results_snapshot) total = length(tests) # line 1: empty line line1 = "" # line 2: running tests - test_list = sort(collect(keys(running_tests)), by = x -> running_tests[x]) + test_list = sort(collect(keys(running_snapshot)), by = x -> running_snapshot[x]) status_parts = map(test_list) do test "$test" end @@ -1013,23 +1030,23 @@ function runtests(mod::Module, args::ParsedArgs; line3 = "Progress: $completed/$total tests completed" if completed > 0 # estimate per-test time (slightly pessimistic) - durations_done = Base.@lock results_lock [end_time - start_time for (_, _,_, start_time, end_time) in results] + durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results_snapshot] μ = mean(durations_done) σ = length(durations_done) > 1 ? std(durations_done) : 0.0 est_per_test = μ + 0.5σ est_remaining = 0.0 ## currently-running - for (test, start_time) in running_tests + for (test, start_time) in running_snapshot elapsed = time() - start_time duration = get(historical_durations, test, est_per_test) est_remaining += max(0.0, duration - elapsed) end ## yet-to-run for test in tests - haskey(running_tests, test) && continue + haskey(running_snapshot, test) && continue # Test is in any completed test - any(r -> test == r.test, results) && continue + any(r -> test == r.test, results_snapshot) && continue est_remaining += get(historical_durations, test, est_per_test) end @@ -1112,7 +1129,9 @@ function runtests(mod::Module, args::ParsedArgs; end isa(ex, InterruptException) || rethrow() finally - if isempty(running_tests) && length(results) >= length(tests) + n_running = @lock running_tests length(running_tests[]) + n_results = @lock results length(results[]) + if n_running == 0 && n_results >= length(tests) # XXX: only erase the status if we completed successfully. # in other cases we'll have printed "caught interrupt" clear_status() @@ -1138,9 +1157,9 @@ function runtests(mod::Module, args::ParsedArgs; done && return - test_t0 = Base.@lock test_lock begin + test_t0 = @lock running_tests begin test_t0 = time() - running_tests[test] = test_t0 + running_tests[][test] = test_t0 end # pass in init_worker_code to custom worker function if defined @@ -1175,8 +1194,8 @@ function runtests(mod::Module, args::ParsedArgs; ex end test_t1 = time() - output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) - Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) + output = @lock wrkr.io String(take!(wrkr.io[])) + @lock results push!(results[], (; test, result, output, test_t0, test_t1)) # act on the results if result isa AbstractTestRecord @@ -1209,8 +1228,8 @@ function runtests(mod::Module, args::ParsedArgs; Malt.stop(wrkr) end - Base.@lock test_lock begin - delete!(running_tests, test) + @lock running_tests begin + delete!(running_tests[], test) end catch ex isa(ex, InterruptException) || rethrow() @@ -1266,7 +1285,7 @@ function runtests(mod::Module, args::ParsedArgs; end # print the output generated by each testset - for (testname, result, output, _start, _stop) in results + for (testname, result, output, _start, _stop) in @lock(results, results[]) if !isempty(output) print(io_ctx.stdout, "\nOutput generated during execution of '") if result isa Exception || anynonpass(result[]) @@ -1324,7 +1343,7 @@ function runtests(mod::Module, args::ParsedArgs; function collect_results() with_testset(o_ts) do completed_tests = Set{String}() - for (testname, result, _output, start, stop) in results + for (testname, result, _output, start, stop) in @lock(results, results[]) push!(completed_tests, testname) if result isa AbstractTestRecord