Skip to content

Commit 9950b4a

Browse files
He-PinCopilot
andauthored
Fix Java 25 stream test timeout failures (#2573) (#2728)
Motivation: JDK 25 ForkJoinPool scheduling changes cause nightly CI stream tests to fail with timeouts — ~100+ failures per night. Modifications: - Increase timefactor from 2 to 3 for JDK 25+ in nightly CI workflow - Add 30s PatienceConfig to FlowFlatMapConcatParallelismSpec (100K elements) - Add 30s PatienceConfig to HubSpec (20K element long-stream tests) - Increase MapAsyncPartitionedSpec patience from 5s to 15s - Increase AggregateWithBoundarySpec Await timeouts from 10s to 30s Result: Timing-sensitive stream tests have sufficient timeout headroom on JDK 25, reducing false failures in nightly CI. References: #2573 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 66d7dc1 commit 9950b4a

5 files changed

Lines changed: 31 additions & 8 deletions

File tree

.github/workflows/nightly-builds.yml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,18 @@ jobs:
148148

149149
- name: Compile and Test
150150
# note that this is not running any multi-jvm tests because multi-in-test=false
151+
# JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see #2573)
151152
run: |-
153+
if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
154+
TIMEFACTOR=3
155+
else
156+
TIMEFACTOR=2
157+
fi
152158
sbt \
153159
-Dpekko.cluster.assert=on \
154160
-Dpekko.log.timestamps=true \
155-
-Dpekko.test.timefactor=2 \
156-
-Dpekko.actor.testkit.typed.timefactor=2 \
161+
-Dpekko.test.timefactor=$TIMEFACTOR \
162+
-Dpekko.actor.testkit.typed.timefactor=$TIMEFACTOR \
157163
-Dpekko.test.tags.exclude=gh-exclude,timing \
158164
-Dpekko.test.multi-in-test=false \
159165
-Dio.netty.leakDetection.level=PARANOID \

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
4141
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
4242
.runWith(Sink.collection)
4343

44-
Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq)
44+
Await.result(result, 30.seconds) should be(stream.grouped(groupSize).toSeq)
4545

4646
}
4747

@@ -58,7 +58,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
5858
emitOnTimer = None)
5959
.runWith(Sink.collection)
6060

61-
Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
61+
Await.result(result, 30.seconds) should be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
6262

6363
}
6464

@@ -73,7 +73,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
7373
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
7474
.runWith(Sink.collection)
7575

76-
Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7)))
76+
Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7)))
7777
}
7878

7979
}
@@ -186,7 +186,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
186186
p.sendNext(7)
187187
p.sendComplete()
188188

189-
Await.result(result, 10.seconds) should be(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7)))
189+
Await.result(result, 30.seconds) should be(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7)))
190190

191191
}
192192

@@ -227,7 +227,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
227227
p.sendNext(7)
228228
p.sendComplete()
229229

230-
Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7)))
230+
Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7)))
231231

232232
}
233233

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,17 @@ import pekko.stream._
3333
import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
3434
import pekko.stream.testkit.scaladsl.TestSink
3535

36+
import org.scalatest.time.{ Seconds, Span }
37+
3638
class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
3739
pekko.stream.materializer.initial-input-buffer-size = 2
3840
""") with ScriptedTest with FutureTimeoutSupport {
41+
42+
// 100K-element tests need extra headroom, especially on JDK 25+ where
43+
// ForkJoinPool scheduling changes slow down highly-parallel workloads (#2573)
44+
override implicit val patience: PatienceConfig =
45+
PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
46+
3947
val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
4048

4149
class BoomException extends RuntimeException("BOOM~~") with NoStackTrace

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,16 @@ import pekko.stream.testkit.scaladsl.TestSink
3030
import pekko.stream.testkit.scaladsl.TestSource
3131
import pekko.testkit.EventFilter
3232

33+
import org.scalatest.time.{ Seconds, Span }
34+
3335
class HubSpec extends StreamSpec {
3436
implicit val ec: ExecutionContext = system.dispatcher
3537

38+
// Long-stream tests (20K elements) need extra headroom on JDK 25+
39+
// where ForkJoinPool scheduling changes cause slower throughput (#2573)
40+
override implicit val patience: PatienceConfig =
41+
PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
42+
3643
"MergeHub" must {
3744

3845
"work in the happy case" in {

stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,10 @@ class MapAsyncPartitionedSpec
107107

108108
import MapAsyncPartitionedSpec.TestData._
109109

110+
// Property-based tests with blocking operations need extra headroom,
111+
// especially on JDK 25+ with ForkJoinPool scheduling changes (#2573)
110112
override implicit def patienceConfig: PatienceConfig = PatienceConfig(
111-
timeout = 5.seconds,
113+
timeout = 15.seconds,
112114
interval = 100.millis)
113115

114116
private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "test-system")

0 commit comments

Comments
 (0)