From ab18c480b4ee13bd123d50e421bf6c592c44a01a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 1 May 2026 16:33:32 +0100 Subject: [PATCH 1/5] Add more locks in `update_status` --- Project.toml | 2 +- src/ParallelTestRunner.jl | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Project.toml b/Project.toml index bf694b7..053d7fe 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ParallelTestRunner" uuid = "d3525ed8-44d0-4b2c-a655-542cee43accc" authors = ["Valentin Churavy "] -version = "2.6.0" +version = "2.6.1" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index e8c97f7..d7f1ab2 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -989,8 +989,10 @@ function runtests(mod::Module, args::ParsedArgs; end function update_status() + _running_tests = Base.@lock test_lock copy(running_tests) + # only draw if we have something to show - isempty(running_tests) && return + Base.@lock(test_lock, isempty(_running_tests)) && return completed = Base.@lock results_lock length(results) total = length(tests) @@ -998,7 +1000,7 @@ function runtests(mod::Module, args::ParsedArgs; line1 = "" # line 2: running tests - test_list = sort(collect(keys(running_tests)), by = x -> running_tests[x]) + test_list = sort(collect(keys(_running_tests)), by = x -> _running_tests[x]) status_parts = map(test_list) do test "$test" end @@ -1020,16 +1022,16 @@ function runtests(mod::Module, args::ParsedArgs; est_remaining = 0.0 ## currently-running - for (test, start_time) in running_tests + for (test, start_time) in _running_tests elapsed = time() - start_time duration = get(historical_durations, test, est_per_test) est_remaining += max(0.0, duration - elapsed) end ## yet-to-run for test in tests - haskey(running_tests, test) && continue + haskey(_running_tests, test) && continue # Test is in any completed test - any(r -> test == r.test, results) && continue + Base.@lock(results_lock, any(r -> test == r.test, results)) && continue est_remaining += get(historical_durations, test, est_per_test) end From dddb1e3e95c843cdb4f2acfdbb14b5131e96fbb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 1 May 2026 17:14:51 +0100 Subject: [PATCH 2/5] Use `Base.Lockable` for `tests`, `running_tests` and `results` Add a simple compatibility shim for Julia v1.10 which doesn't have `Base.Lockable`. --- src/ParallelTestRunner.jl | 76 +++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index d7f1ab2..ea8b198 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -23,6 +23,24 @@ function anynonpass(ts::Test.AbstractTestSet) end end +# Thin compatibility shim for using `Lockable` also in Julia v1.10 +if VERSION >= v"1.11.0-DEV.1568" + const Lockable = Base.Lockable +else + # Adapted from . + struct Lockable{T, L <: Base.AbstractLock} + value::T + lock::L + end + + Lockable(value) = Lockable(value, ReentrantLock()) + Base.getindex(l::Lockable) = (Base.assert_havelock(l.lock); l.value) + + Base.lock(l::Lockable) = Base.lock(l.lock) + Base.trylock(l::Lockable) = Base.trylock(l.lock) + Base.unlock(l::Lockable) = Base.unlock(l.lock) +end + const ID_COUNTER = Threads.Atomic{Int}(0) # Thin wrapper around Malt.Worker, to handle the stdio loop differently. @@ -916,15 +934,16 @@ function runtests(mod::Module, args::ParsedArgs; filter_tests!(testsuite, args) # determine test order - tests = collect(keys(testsuite)) - Random.shuffle!(tests) + test_lock = ReentrantLock() # to protect crucial access to tests and running_tests + tests = Lockable(collect(keys(testsuite)), test_lock) + Random.shuffle!(@lock test_lock tests[]) historical_durations = load_test_history(mod) - sort!(tests, by = x -> -get(historical_durations, x, Inf)) + sort!(@lock(test_lock, tests[]), by = x -> -get(historical_durations, x, Inf)) # determine parallelism jobs = something(args.jobs, default_njobs()) - jobs = clamp(jobs, 1, length(tests)) - println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") + jobs = clamp(jobs, 1, length(@lock(test_lock, tests[]))) + println(stdout, "Running $(length(@lock(test_lock, tests[]))) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") sem = Base.Semaphore(max(1, jobs)) worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) @@ -933,10 +952,9 @@ function runtests(mod::Module, args::ParsedArgs; end t0 = time() - results = [] - running_tests = Dict{String, Float64}() # test => start_time - test_lock = ReentrantLock() # to protect crucial access to tests and running_tests results_lock = ReentrantLock() # to protect concurrent access to results + results = Lockable([], results_lock) + running_tests = Lockable(Dict{String, Float64}(), test_lock) # test => start_time worker_tasks = Task[] @@ -960,10 +978,10 @@ function runtests(mod::Module, args::ParsedArgs; # pretty print information about gc and mem usage testgroupheader = "Test" workerheader = "(Worker)" - name_align = maximum( + name_align = @lock test_lock maximum( [ textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader); - map(x -> textwidth(x) + 5, tests) + map(x -> textwidth(x) + 5, tests[]) ] ) @@ -989,18 +1007,16 @@ function runtests(mod::Module, args::ParsedArgs; end function update_status() - _running_tests = Base.@lock test_lock copy(running_tests) - # only draw if we have something to show - Base.@lock(test_lock, isempty(_running_tests)) && return - completed = Base.@lock results_lock length(results) - total = length(tests) + Base.@lock(test_lock, isempty(running_tests[])) && return + completed = Base.@lock results_lock length(results[]) + total = @lock test_lock length(tests[]) # line 1: empty line line1 = "" # line 2: running tests - test_list = sort(collect(keys(_running_tests)), by = x -> _running_tests[x]) + test_list = @lock test_lock sort(collect(keys(running_tests[])), by = x -> running_tests[][x]) status_parts = map(test_list) do test "$test" end @@ -1015,23 +1031,23 @@ function runtests(mod::Module, args::ParsedArgs; line3 = "Progress: $completed/$total tests completed" if completed > 0 # estimate per-test time (slightly pessimistic) - durations_done = Base.@lock results_lock [end_time - start_time for (_, _,_, start_time, end_time) in results] + durations_done = Base.@lock results_lock [end_time - start_time for (_, _,_, start_time, end_time) in results[]] μ = mean(durations_done) σ = length(durations_done) > 1 ? std(durations_done) : 0.0 est_per_test = μ + 0.5σ est_remaining = 0.0 ## currently-running - for (test, start_time) in _running_tests + for (test, start_time) in @lock(test_lock, running_tests[]) elapsed = time() - start_time duration = get(historical_durations, test, est_per_test) est_remaining += max(0.0, duration - elapsed) end ## yet-to-run - for test in tests - haskey(_running_tests, test) && continue + for test in @lock(test_lock, tests[]) + @lock(test_lock, haskey(running_tests[], test)) && continue # Test is in any completed test - Base.@lock(results_lock, any(r -> test == r.test, results)) && continue + Base.@lock(results_lock, any(r -> test == r.test, results[])) && continue est_remaining += get(historical_durations, test, est_per_test) end @@ -1114,7 +1130,7 @@ function runtests(mod::Module, args::ParsedArgs; end isa(ex, InterruptException) || rethrow() finally - if isempty(running_tests) && length(results) >= length(tests) + if @lock test_lock @lock results_lock isempty(running_tests[]) && length(results[]) >= length(tests[]) # XXX: only erase the status if we completed successfully. # in other cases we'll have printed "caught interrupt" clear_status() @@ -1126,9 +1142,9 @@ function runtests(mod::Module, args::ParsedArgs; # execution # - tests_to_start = Threads.Atomic{Int}(length(tests)) + tests_to_start = @lock test_lock Threads.Atomic{Int}(length(tests[])) try - @sync for test in tests + @sync for test in @lock(test_lock, tests[]) push!(worker_tasks, Threads.@spawn begin local p = nothing acquired = false @@ -1142,7 +1158,7 @@ function runtests(mod::Module, args::ParsedArgs; test_t0 = Base.@lock test_lock begin test_t0 = time() - running_tests[test] = test_t0 + running_tests[][test] = test_t0 end # pass in init_worker_code to custom worker function if defined @@ -1178,7 +1194,7 @@ function runtests(mod::Module, args::ParsedArgs; end test_t1 = time() output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) - Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) + Base.@lock results_lock push!(results[], (; test, result, output, test_t0, test_t1)) # act on the results if result isa AbstractTestRecord @@ -1212,7 +1228,7 @@ function runtests(mod::Module, args::ParsedArgs; end Base.@lock test_lock begin - delete!(running_tests, test) + delete!(running_tests[], test) end catch ex isa(ex, InterruptException) || rethrow() @@ -1268,7 +1284,7 @@ function runtests(mod::Module, args::ParsedArgs; end # print the output generated by each testset - for (testname, result, output, _start, _stop) in results + for (testname, result, output, _start, _stop) in @lock(results_lock, results[]) if !isempty(output) print(io_ctx.stdout, "\nOutput generated during execution of '") if result isa Exception || anynonpass(result[]) @@ -1326,7 +1342,7 @@ function runtests(mod::Module, args::ParsedArgs; function collect_results() with_testset(o_ts) do completed_tests = Set{String}() - for (testname, result, _output, start, stop) in results + for (testname, result, _output, start, stop) in @lock(results_lock, results[]) push!(completed_tests, testname) if result isa AbstractTestRecord @@ -1348,7 +1364,7 @@ function runtests(mod::Module, args::ParsedArgs; end # mark remaining or running tests as interrupted - for test in tests + for test in @lock(test_lock, tests[]) (test in completed_tests) && continue testset = create_testset(test) Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1))) From 6867047e87b2d1f1266f9435c0f87bb26c41ea3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 1 May 2026 17:24:19 +0100 Subject: [PATCH 3/5] Simplify `@lock` syntax We don't need to refer to the lock specifically, we can simplify refer to the lockable object. This also means we don't need to keep the lock objects around. --- src/ParallelTestRunner.jl | 54 +++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index ea8b198..c4376f5 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -934,16 +934,15 @@ function runtests(mod::Module, args::ParsedArgs; filter_tests!(testsuite, args) # determine test order - test_lock = ReentrantLock() # to protect crucial access to tests and running_tests - tests = Lockable(collect(keys(testsuite)), test_lock) - Random.shuffle!(@lock test_lock tests[]) + tests = Lockable(collect(keys(testsuite))) + @lock tests Random.shuffle!(tests[]) historical_durations = load_test_history(mod) - sort!(@lock(test_lock, tests[]), by = x -> -get(historical_durations, x, Inf)) + @lock tests sort!(tests[], by = x -> -get(historical_durations, x, Inf)) # determine parallelism jobs = something(args.jobs, default_njobs()) - jobs = clamp(jobs, 1, length(@lock(test_lock, tests[]))) - println(stdout, "Running $(length(@lock(test_lock, tests[]))) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") + jobs = @lock tests clamp(jobs, 1, length(tests[])) + @lock tests println(stdout, "Running $(length(tests[])) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") sem = Base.Semaphore(max(1, jobs)) worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) @@ -952,9 +951,8 @@ function runtests(mod::Module, args::ParsedArgs; end t0 = time() - results_lock = ReentrantLock() # to protect concurrent access to results - results = Lockable([], results_lock) - running_tests = Lockable(Dict{String, Float64}(), test_lock) # test => start_time + results = Lockable([]) + running_tests = Lockable(Dict{String, Float64}()) # test => start_time worker_tasks = Task[] @@ -978,7 +976,7 @@ function runtests(mod::Module, args::ParsedArgs; # pretty print information about gc and mem usage testgroupheader = "Test" workerheader = "(Worker)" - name_align = @lock test_lock maximum( + name_align = @lock tests maximum( [ textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader); map(x -> textwidth(x) + 5, tests[]) @@ -1008,15 +1006,15 @@ function runtests(mod::Module, args::ParsedArgs; function update_status() # only draw if we have something to show - Base.@lock(test_lock, isempty(running_tests[])) && return - completed = Base.@lock results_lock length(results[]) - total = @lock test_lock length(tests[]) + @lock(running_tests, isempty(running_tests[])) && return + completed = @lock results length(results[]) + total = @lock tests length(tests[]) # line 1: empty line line1 = "" # line 2: running tests - test_list = @lock test_lock sort(collect(keys(running_tests[])), by = x -> running_tests[][x]) + test_list = @lock running_tests sort(collect(keys(running_tests[])), by = x -> running_tests[][x]) status_parts = map(test_list) do test "$test" end @@ -1031,23 +1029,23 @@ function runtests(mod::Module, args::ParsedArgs; line3 = "Progress: $completed/$total tests completed" if completed > 0 # estimate per-test time (slightly pessimistic) - durations_done = Base.@lock results_lock [end_time - start_time for (_, _,_, start_time, end_time) in results[]] + durations_done = @lock results [end_time - start_time for (_, _,_, start_time, end_time) in results[]] μ = mean(durations_done) σ = length(durations_done) > 1 ? std(durations_done) : 0.0 est_per_test = μ + 0.5σ est_remaining = 0.0 ## currently-running - for (test, start_time) in @lock(test_lock, running_tests[]) + for (test, start_time) in @lock(running_tests, running_tests[]) elapsed = time() - start_time duration = get(historical_durations, test, est_per_test) est_remaining += max(0.0, duration - elapsed) end ## yet-to-run - for test in @lock(test_lock, tests[]) - @lock(test_lock, haskey(running_tests[], test)) && continue + for test in @lock(tests, tests[]) + @lock(running_tests, haskey(running_tests[], test)) && continue # Test is in any completed test - Base.@lock(results_lock, any(r -> test == r.test, results[])) && continue + @lock(results, any(r -> test == r.test, results[])) && continue est_remaining += get(historical_durations, test, est_per_test) end @@ -1130,7 +1128,7 @@ function runtests(mod::Module, args::ParsedArgs; end isa(ex, InterruptException) || rethrow() finally - if @lock test_lock @lock results_lock isempty(running_tests[]) && length(results[]) >= length(tests[]) + if @lock running_tests @lock results @lock tests isempty(running_tests[]) && length(results[]) >= length(tests[]) # XXX: only erase the status if we completed successfully. # in other cases we'll have printed "caught interrupt" clear_status() @@ -1142,9 +1140,9 @@ function runtests(mod::Module, args::ParsedArgs; # execution # - tests_to_start = @lock test_lock Threads.Atomic{Int}(length(tests[])) + tests_to_start = @lock tests Threads.Atomic{Int}(length(tests[])) try - @sync for test in @lock(test_lock, tests[]) + @sync for test in @lock(tests, tests[]) push!(worker_tasks, Threads.@spawn begin local p = nothing acquired = false @@ -1156,7 +1154,7 @@ function runtests(mod::Module, args::ParsedArgs; done && return - test_t0 = Base.@lock test_lock begin + test_t0 = @lock running_tests begin test_t0 = time() running_tests[][test] = test_t0 end @@ -1194,7 +1192,7 @@ function runtests(mod::Module, args::ParsedArgs; end test_t1 = time() output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) - Base.@lock results_lock push!(results[], (; test, result, output, test_t0, test_t1)) + @lock results push!(results[], (; test, result, output, test_t0, test_t1)) # act on the results if result isa AbstractTestRecord @@ -1227,7 +1225,7 @@ function runtests(mod::Module, args::ParsedArgs; Malt.stop(wrkr) end - Base.@lock test_lock begin + @lock running_tests begin delete!(running_tests[], test) end catch ex @@ -1284,7 +1282,7 @@ function runtests(mod::Module, args::ParsedArgs; end # print the output generated by each testset - for (testname, result, output, _start, _stop) in @lock(results_lock, results[]) + for (testname, result, output, _start, _stop) in @lock(results, results[]) if !isempty(output) print(io_ctx.stdout, "\nOutput generated during execution of '") if result isa Exception || anynonpass(result[]) @@ -1342,7 +1340,7 @@ function runtests(mod::Module, args::ParsedArgs; function collect_results() with_testset(o_ts) do completed_tests = Set{String}() - for (testname, result, _output, start, stop) in @lock(results_lock, results[]) + for (testname, result, _output, start, stop) in @lock(results, results[]) push!(completed_tests, testname) if result isa AbstractTestRecord @@ -1364,7 +1362,7 @@ function runtests(mod::Module, args::ParsedArgs; end # mark remaining or running tests as interrupted - for test in @lock(test_lock, tests[]) + for test in @lock(tests, tests[]) (test in completed_tests) && continue testset = create_testset(test) Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1))) From 3d77459f5b60be9ce344df3917579aed747f82a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 1 May 2026 17:42:16 +0100 Subject: [PATCH 4/5] Use `Lockable` also for `PTRWorker` --- src/ParallelTestRunner.jl | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index c4376f5..53cfbaa 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -46,18 +46,16 @@ const ID_COUNTER = Threads.Atomic{Int}(0) # Thin wrapper around Malt.Worker, to handle the stdio loop differently. struct PTRWorker <: Malt.AbstractWorker w::Malt.Worker - io::IOBuffer - io_lock::ReentrantLock + io::Lockable{IOBuffer, ReentrantLock} id::Int end function PTRWorker(; exename=Base.julia_cmd()[1], exeflags=String[], env=String[]) - io = IOBuffer() - io_lock = ReentrantLock() + io = Lockable(IOBuffer()) wrkr = Malt.Worker(; exename, exeflags, env, monitor_stdout=false, monitor_stderr=false) - stdio_loop(wrkr, io, io_lock) + stdio_loop(wrkr, io) id = ID_COUNTER[] += 1 - return PTRWorker(wrkr, io, io_lock, id) + return PTRWorker(wrkr, io, id) end worker_id(wrkr::PTRWorker) = wrkr.id @@ -311,11 +309,11 @@ function print_test_crashed(::Type{<:AbstractTestRecord}, wrkr, test, ctx::TestI end # Adapted from `Malt._stdio_loop` -function stdio_loop(worker::Malt.Worker, io, io_lock::ReentrantLock) +function stdio_loop(worker::Malt.Worker, io::Lockable) Threads.@spawn while !eof(worker.stdout) && Malt.isrunning(worker) try bytes = readavailable(worker.stdout) - @lock io_lock write(io, bytes) + @lock io write(io[], bytes) catch break end @@ -323,7 +321,7 @@ function stdio_loop(worker::Malt.Worker, io, io_lock::ReentrantLock) Threads.@spawn while !eof(worker.stderr) && Malt.isrunning(worker) try bytes = readavailable(worker.stderr) - @lock io_lock write(io, bytes) + @lock io write(io[], bytes) catch break end @@ -1191,7 +1189,7 @@ function runtests(mod::Module, args::ParsedArgs; ex end test_t1 = time() - output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) + output = @lock wrkr.io String(take!(wrkr.io[])) @lock results push!(results[], (; test, result, output, test_t0, test_t1)) # act on the results From 313e27e12006f19b34245abcd3c2f5e2b546f745 Mon Sep 17 00:00:00 2001 From: Tim Besard Date: Tue, 5 May 2026 14:58:00 +0200 Subject: [PATCH 5/5] Simplify locking additions (#133) * Fix update_status race by snapshotting under the lock instead. * Drop unnecessary Lockable wrapping of `tests` It's read-only anyway. --- src/ParallelTestRunner.jl | 47 ++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 53cfbaa..fad30c3 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -932,15 +932,15 @@ function runtests(mod::Module, args::ParsedArgs; filter_tests!(testsuite, args) # determine test order - tests = Lockable(collect(keys(testsuite))) - @lock tests Random.shuffle!(tests[]) + tests = collect(keys(testsuite)) + Random.shuffle!(tests) historical_durations = load_test_history(mod) - @lock tests sort!(tests[], by = x -> -get(historical_durations, x, Inf)) + sort!(tests, by = x -> -get(historical_durations, x, Inf)) # determine parallelism jobs = something(args.jobs, default_njobs()) - jobs = @lock tests clamp(jobs, 1, length(tests[])) - @lock tests println(stdout, "Running $(length(tests[])) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") + jobs = clamp(jobs, 1, length(tests)) + println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") sem = Base.Semaphore(max(1, jobs)) worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) @@ -974,10 +974,10 @@ function runtests(mod::Module, args::ParsedArgs; # pretty print information about gc and mem usage testgroupheader = "Test" workerheader = "(Worker)" - name_align = @lock tests maximum( + name_align = maximum( [ textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader); - map(x -> textwidth(x) + 5, tests[]) + map(x -> textwidth(x) + 5, tests) ] ) @@ -1003,16 +1003,19 @@ function runtests(mod::Module, args::ParsedArgs; end function update_status() - # only draw if we have something to show - @lock(running_tests, isempty(running_tests[])) && return - completed = @lock results length(results[]) - total = @lock tests length(tests[]) + # take consistent snapshots once, so the rest of this function operates on + # frozen data rather than racing with workers that mutate these collections + running_snapshot = @lock running_tests copy(running_tests[]) + isempty(running_snapshot) && return + results_snapshot = @lock results copy(results[]) + completed = length(results_snapshot) + total = length(tests) # line 1: empty line line1 = "" # line 2: running tests - test_list = @lock running_tests sort(collect(keys(running_tests[])), by = x -> running_tests[][x]) + test_list = sort(collect(keys(running_snapshot)), by = x -> running_snapshot[x]) status_parts = map(test_list) do test "$test" end @@ -1027,23 +1030,23 @@ function runtests(mod::Module, args::ParsedArgs; line3 = "Progress: $completed/$total tests completed" if completed > 0 # estimate per-test time (slightly pessimistic) - durations_done = @lock results [end_time - start_time for (_, _,_, start_time, end_time) in results[]] + durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results_snapshot] μ = mean(durations_done) σ = length(durations_done) > 1 ? std(durations_done) : 0.0 est_per_test = μ + 0.5σ est_remaining = 0.0 ## currently-running - for (test, start_time) in @lock(running_tests, running_tests[]) + for (test, start_time) in running_snapshot elapsed = time() - start_time duration = get(historical_durations, test, est_per_test) est_remaining += max(0.0, duration - elapsed) end ## yet-to-run - for test in @lock(tests, tests[]) - @lock(running_tests, haskey(running_tests[], test)) && continue + for test in tests + haskey(running_snapshot, test) && continue # Test is in any completed test - @lock(results, any(r -> test == r.test, results[])) && continue + any(r -> test == r.test, results_snapshot) && continue est_remaining += get(historical_durations, test, est_per_test) end @@ -1126,7 +1129,9 @@ function runtests(mod::Module, args::ParsedArgs; end isa(ex, InterruptException) || rethrow() finally - if @lock running_tests @lock results @lock tests isempty(running_tests[]) && length(results[]) >= length(tests[]) + n_running = @lock running_tests length(running_tests[]) + n_results = @lock results length(results[]) + if n_running == 0 && n_results >= length(tests) # XXX: only erase the status if we completed successfully. # in other cases we'll have printed "caught interrupt" clear_status() @@ -1138,9 +1143,9 @@ function runtests(mod::Module, args::ParsedArgs; # execution # - tests_to_start = @lock tests Threads.Atomic{Int}(length(tests[])) + tests_to_start = Threads.Atomic{Int}(length(tests)) try - @sync for test in @lock(tests, tests[]) + @sync for test in tests push!(worker_tasks, Threads.@spawn begin local p = nothing acquired = false @@ -1360,7 +1365,7 @@ function runtests(mod::Module, args::ParsedArgs; end # mark remaining or running tests as interrupted - for test in @lock(tests, tests[]) + for test in tests (test in completed_tests) && continue testset = create_testset(test) Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1)))