From 6c997e0722ad3dde583833376da02b5bc0b91b95 Mon Sep 17 00:00:00 2001 From: Purushottam Sinha Date: Sat, 20 Jun 2026 19:16:50 +0530 Subject: [PATCH] [FLINK-39770][tests][JUnit5 migration] Module: flink-state-processing-api --- .../state/api/SavepointDeepCopyTest.java | 49 ++++-------- .../state/api/SavepointReaderITTestBase.java | 63 ++++++++------- .../api/SavepointReaderKeyedStateITCase.java | 10 +-- .../api/SavepointWindowReaderITCase.java | 77 ++++++------------- .../state/api/SavepointWriterITCase.java | 30 +++----- .../api/SavepointWriterWindowITCase.java | 59 +++++++------- .../api/StateBootstrapTransformationTest.java | 44 ++++------- .../input/BroadcastStateInputFormatTest.java | 12 +-- .../api/input/BufferingCollectorTest.java | 20 ++--- .../api/input/KeyedStateInputFormatTest.java | 32 ++------ .../api/input/ListStateInputFormatTest.java | 15 ++-- .../api/input/MultiStateKeyIteratorTest.java | 25 +++--- .../StreamOperatorContextBuilderTest.java | 13 ++-- .../api/input/UnionStateInputFormatTest.java | 15 ++-- .../state/api/input/WindowReaderTest.java | 32 ++++---- .../KeyedStateBootstrapOperatorTest.java | 30 ++++---- .../api/output/SavepointOutputFormatTest.java | 43 +++++------ .../state/api/output/SnapshotUtilsTest.java | 25 +++--- .../api/runtime/OperatorIDGeneratorTest.java | 11 +-- .../state/api/utils/SavepointTestBase.java | 31 +++++--- 20 files changed, 273 insertions(+), 363 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java index 144aeb9d1c6dab..eeb1d0f475310b 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java @@ -32,15 +32,16 @@ import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.AbstractID; import org.apache.flink.util.Collector; import org.apache.commons.lang3.RandomStringUtils; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.File; import java.io.IOException; @@ -54,26 +55,20 @@ import java.util.stream.Stream; import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD; -import static org.hamcrest.Matchers.everyItem; -import static org.hamcrest.Matchers.isIn; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Test the savepoint deep copy. */ -@RunWith(value = Parameterized.class) -public class SavepointDeepCopyTest extends AbstractTestBaseJUnit4 { +@ExtendWith(ParameterizedTestExtension.class) +class SavepointDeepCopyTest extends AbstractTestBase { private static final MemorySize FILE_STATE_SIZE_THRESHOLD = new MemorySize(1); private static final String TEXT = "The quick brown fox jumps over the lazy dog"; private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric(120); - private final StateBackend backend; + @Parameter public StateBackend backend; - public SavepointDeepCopyTest(StateBackend backend) throws Exception { - this.backend = backend; - } - - @Parameterized.Parameters(name = "State Backend: {0}") + @Parameters(name = "State Backend: {0}") public static Collection data() { return Arrays.asList(new HashMapStateBackend(), new EmbeddedRocksDBStateBackend()); } @@ -132,8 +127,8 @@ public void readKey(String key, Context ctx, Collector> o * * @throws Exception throw exceptions when anything goes wrong */ - @Test - public void testSavepointDeepCopy() throws Exception { + @TestTemplate + void testSavepointDeepCopy() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); @@ -156,9 +151,7 @@ public void testSavepointDeepCopy() throws Exception { Set stateFiles1 = getFileNamesInDirectory(Paths.get(savepointPath1)); - Assert.assertTrue( - "Failed to bootstrap savepoint1 with additional state files", - stateFiles1.size() > 1); + assertThat(stateFiles1).hasSizeGreaterThan(1); // create savepoint2 from savepoint1 created above File savepointUrl2 = createAndRegisterTempFile(new AbstractID().toHexString()); @@ -175,14 +168,9 @@ public void testSavepointDeepCopy() throws Exception { Set stateFiles2 = getFileNamesInDirectory(Paths.get(savepointPath1)); - Assert.assertTrue( - "Failed to create savepoint2 from savepoint1 with additional state files", - stateFiles2.size() > 1); + assertThat(stateFiles2).hasSizeGreaterThan(1); - assertThat( - "At least one state file in savepoint1 are not in savepoint2", - stateFiles1, - everyItem(isIn(stateFiles2))); + assertThat(stateFiles1).isSubsetOf(stateFiles2); // Try to fromExistingSavepoint savepoint2 and read the state of "Operator1" (which has not // been @@ -197,10 +185,7 @@ public void testSavepointDeepCopy() throws Exception { .size(); long expectedKeyNum = Arrays.stream(TEXT.split(" ")).distinct().count(); - Assert.assertEquals( - "Unexpected number of keys in the state of Operator1", - expectedKeyNum, - actuallyKeyNum); + assertThat(actuallyKeyNum).isEqualTo(expectedKeyNum); } private static Set getFileNamesInDirectory(Path path) throws IOException { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java index 809dc201be0815..35f5f1b270dc99 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java @@ -30,7 +30,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.io.InputStatus; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -43,7 +43,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.source.AbstractTestSource; import org.apache.flink.test.util.source.SingleSplitEnumerator; import org.apache.flink.test.util.source.TestSourceReader; @@ -51,8 +52,8 @@ import org.apache.flink.util.AbstractID; import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.time.Duration; @@ -65,9 +66,11 @@ import java.util.stream.Collectors; import static org.apache.flink.state.api.utils.SavepointTestBase.waitForAllRunningOrSomeTerminal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** IT case for reading state. */ -public abstract class SavepointReaderITTestBase extends AbstractTestBaseJUnit4 { +public abstract class SavepointReaderITTestBase extends AbstractTestBase { static final String UID = "stateful-operator"; static final String LIST_NAME = "list"; @@ -82,6 +85,13 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBaseJUnit4 { private final MapStateDescriptor broadcast; + private RestClusterClient clusterClient; + + @BeforeEach + void setClusterClient(@InjectClusterClient RestClusterClient clusterClient) { + this.clusterClient = clusterClient; + } + SavepointReaderITTestBase( ListStateDescriptor list, ListStateDescriptor union, @@ -93,7 +103,7 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBaseJUnit4 { } @Test - public void testOperatorStateInputFormat() throws Exception { + void testOperatorStateInputFormat() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); @@ -133,10 +143,7 @@ private void verifyListState(String path, StreamExecutionEnvironment env) throws List listResult = JobResultRetriever.collect(readListState(savepoint)); listResult.sort(Comparator.naturalOrder()); - Assert.assertEquals( - "Unexpected elements read from list state", - SavepointSource.getElements(), - listResult); + assertThat(listResult).isEqualTo(SavepointSource.getElements()); } private void verifyUnionState(String path, StreamExecutionEnvironment env) throws Exception { @@ -144,10 +151,7 @@ private void verifyUnionState(String path, StreamExecutionEnvironment env) throw List unionResult = JobResultRetriever.collect(readUnionState(savepoint)); unionResult.sort(Comparator.naturalOrder()); - Assert.assertEquals( - "Unexpected elements read from union state", - SavepointSource.getElements(), - unionResult); + assertThat(unionResult).isEqualTo(SavepointSource.getElements()); } private void verifyBroadcastState(String path, StreamExecutionEnvironment env) @@ -168,24 +172,19 @@ private void verifyBroadcastState(String path, StreamExecutionEnvironment env) .sorted(Comparator.naturalOrder()) .collect(Collectors.toList()); - Assert.assertEquals( - "Unexpected element in broadcast state keys", - SavepointSource.getElements(), - broadcastStateKeys); - - Assert.assertEquals( - "Unexpected element in broadcast state values", - SavepointSource.getElements().stream() - .map(Object::toString) - .sorted() - .collect(Collectors.toList()), - broadcastStateValues); + assertThat(broadcastStateKeys).isEqualTo(SavepointSource.getElements()); + + assertThat(broadcastStateValues) + .isEqualTo( + SavepointSource.getElements().stream() + .map(Object::toString) + .sorted() + .collect(Collectors.toList())); } private String takeSavepoint(JobGraph jobGraph) throws Exception { SavepointSource.initializeForTest(); - ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); JobID jobId = jobGraph.getJobID(); Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5)); @@ -193,9 +192,9 @@ private String takeSavepoint(JobGraph jobGraph) throws Exception { String dirPath = getTempDirPath(new AbstractID().toHexString()); try { - JobID jobID = client.submitJob(jobGraph).get(); + JobID jobID = clusterClient.submitJob(jobGraph).get(); - waitForAllRunningOrSomeTerminal(jobID, MINI_CLUSTER_RESOURCE); + waitForAllRunningOrSomeTerminal(jobID, clusterClient); boolean finished = false; while (deadline.hasTimeLeft()) { if (SavepointSource.isFinished()) { @@ -212,14 +211,14 @@ private String takeSavepoint(JobGraph jobGraph) throws Exception { } if (!finished) { - Assert.fail("Failed to initialize state within deadline"); + fail("Failed to initialize state within deadline"); } CompletableFuture path = - client.triggerSavepoint(jobID, dirPath, SavepointFormatType.CANONICAL); + clusterClient.triggerSavepoint(jobID, dirPath, SavepointFormatType.CANONICAL); return path.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } finally { - client.cancel(jobId).get(); + clusterClient.cancel(jobId).get(); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java index de3bdcd5f3afab..eed34f1053f6e9 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java @@ -33,8 +33,7 @@ import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -43,6 +42,8 @@ import java.util.Objects; import java.util.Set; +import static org.assertj.core.api.Assertions.assertThat; + /** IT case for reading state. */ public abstract class SavepointReaderKeyedStateITCase extends SavepointTestBase { @@ -57,7 +58,7 @@ public abstract class SavepointReaderKeyedStateITCase protected abstract Tuple2 getStateBackendTuple(); @Test - public void testUserKeyedStateReader() throws Exception { + void testUserKeyedStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -81,8 +82,7 @@ public void testUserKeyedStateReader() throws Exception { Set expected = new HashSet<>(elements); - Assert.assertEquals( - "Unexpected results from keyed state", expected, new HashSet<>(results)); + assertThat(new HashSet<>(results)).isEqualTo(expected); } private static class KeyedStatefulOperator extends KeyedProcessFunction { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java index 91834123ac6d40..d81ace3e6b5617 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java @@ -47,16 +47,15 @@ import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; import org.apache.flink.util.Collector; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + /** IT Case for reading window operator state. */ -public abstract class SavepointWindowReaderITCase - extends SavepointTestBase { +abstract class SavepointWindowReaderITCase extends SavepointTestBase { private static final String uid = "stateful-operator"; private static final Integer[] numbers = {1, 2, 3}; @@ -64,7 +63,7 @@ public abstract class SavepointWindowReaderITCase protected abstract Tuple2 getStateBackendTuple(); @Test - public void testReduceWindowStateReader() throws Exception { + void testReduceWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -91,14 +90,11 @@ public void testReduceWindowStateReader() throws Exception { .window(TumblingEventTimeWindows.of(Duration.ofMillis(10))) .reduce(uid, new ReduceSum(), Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testReduceEvictorWindowStateReader() throws Exception { + void testReduceEvictorWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -127,14 +123,11 @@ public void testReduceEvictorWindowStateReader() throws Exception { .evictor() .reduce(uid, new ReduceSum(), Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testAggregateWindowStateReader() throws Exception { + void testAggregateWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -162,14 +155,11 @@ public void testAggregateWindowStateReader() throws Exception { .aggregate( uid, new AggregateSum(), Types.INT, Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testAggregateEvictorWindowStateReader() throws Exception { + void testAggregateEvictorWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -199,14 +189,11 @@ public void testAggregateEvictorWindowStateReader() throws Exception { .aggregate( uid, new AggregateSum(), Types.INT, Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testProcessWindowStateReader() throws Exception { + void testProcessWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -238,14 +225,11 @@ public void testProcessWindowStateReader() throws Exception { Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testProcessEvictorWindowStateReader() throws Exception { + void testProcessEvictorWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -279,14 +263,11 @@ public void testProcessEvictorWindowStateReader() throws Exception { Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testApplyWindowStateReader() throws Exception { + void testApplyWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -318,14 +299,11 @@ public void testApplyWindowStateReader() throws Exception { Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testApplyEvictorWindowStateReader() throws Exception { + void testApplyEvictorWindowStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -359,14 +337,11 @@ public void testApplyEvictorWindowStateReader() throws Exception { Types.INT, Types.INT)); - Assert.assertThat( - "Unexpected results from keyed state", - results, - Matchers.containsInAnyOrder(numbers)); + assertThat(results).containsExactlyInAnyOrder(numbers); } @Test - public void testWindowTriggerStateReader() throws Exception { + void testWindowTriggerStateReader() throws Exception { Tuple2 backendTuple = getStateBackendTuple(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0); @@ -397,8 +372,7 @@ public void testWindowTriggerStateReader() throws Exception { Types.INT, Types.LONG)); - Assert.assertThat( - "Unexpected results from trigger state", results, Matchers.contains(1L, 1L, 1L)); + assertThat(results).containsExactly(1L, 1L, 1L); } private static class NoOpProcessWindowFunction @@ -433,11 +407,8 @@ public void readWindow( Iterable elements, Collector out) throws Exception { - Assert.assertEquals("Unexpected window", new TimeWindow(0, 10), context.window()); - Assert.assertThat( - "Unexpected registered timers", - context.registeredEventTimeTimers(), - Matchers.contains(9L)); + assertThat(context.window()).isEqualTo(new TimeWindow(0, 10)); + assertThat(context.registeredEventTimeTimers()).containsExactly(9L); out.collect(elements.iterator().next()); } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java index d7a5586b6024f8..a0f7785dd5af4f 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java @@ -47,13 +47,12 @@ import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.AbstractID; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; @@ -64,9 +63,10 @@ import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.within; /** IT test for writing savepoints. */ -public class SavepointWriterITCase extends AbstractTestBaseJUnit4 { +class SavepointWriterITCase extends AbstractTestBase { private static final long CHECKPOINT_ID = 42; @@ -86,25 +86,25 @@ public class SavepointWriterITCase extends AbstractTestBaseJUnit4 { Arrays.asList(new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3)); @Test - public void testDefaultStateBackend() throws Exception { + void testDefaultStateBackend() throws Exception { testStateBootstrapAndModification(new Configuration(), null); } @Test - public void testHashMapStateBackend() throws Exception { + void testHashMapStateBackend() throws Exception { testStateBootstrapAndModification( new Configuration().set(StateBackendOptions.STATE_BACKEND, "hashmap"), new HashMapStateBackend()); } @Test - public void testEmbeddedRocksDBStateBackend() throws Exception { + void testEmbeddedRocksDBStateBackend() throws Exception { testStateBootstrapAndModification( new Configuration().set(StateBackendOptions.STATE_BACKEND, "rocksdb"), new EmbeddedRocksDBStateBackend()); } - public void testStateBootstrapAndModification(Configuration config, StateBackend backend) + void testStateBootstrapAndModification(Configuration config, StateBackend backend) throws Exception { final String savepointPath = getTempDirPath(new AbstractID().toHexString()); @@ -387,14 +387,11 @@ public void initializeState(FunctionInitializationContext context) throws Except expected.add(3); for (Integer number : state.get()) { - Assert.assertTrue("Duplicate state", expected.contains(number)); + assertThat(expected).contains(number); expected.remove(number); } - Assert.assertTrue( - "Failed to bootstrap all state elements: " - + Arrays.toString(expected.toArray()), - expected.isEmpty()); + assertThat(expected).isEmpty(); } } @@ -421,11 +418,8 @@ public static class CurrencyValidationFunction @Override public void processElement(CurrencyRate value, ReadOnlyContext ctx, Collector out) throws Exception { - Assert.assertEquals( - "Incorrect currency rate", - value.rate, - ctx.getBroadcastState(descriptor).get(value.currency), - 0.0001); + assertThat(ctx.getBroadcastState(descriptor).get(value.currency)) + .isCloseTo(value.rate, within(0.0001)); } @Override diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java index 514f9ac7d3e3d6..8fb998680747bf 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java @@ -45,14 +45,16 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.AbstractID; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Collector; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; import java.util.ArrayList; @@ -65,8 +67,8 @@ /** IT Test for writing savepoints to the {@code WindowOperator}. */ @SuppressWarnings("unchecked") -@RunWith(Parameterized.class) -public class SavepointWriterWindowITCase extends AbstractTestBaseJUnit4 { +@ExtendWith(ParameterizedTestExtension.class) +class SavepointWriterWindowITCase extends AbstractTestBase { private static final String UID = "uid"; @@ -113,7 +115,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBaseJUnit4 { new EmbeddedRocksDBStateBackend(), new Configuration().set(StateBackendOptions.STATE_BACKEND, "rocksdb"))); - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") public static Collection data() { List parameterList = new ArrayList<>(); for (Tuple3 stateBackend : STATE_BACKENDS) { @@ -133,29 +135,24 @@ public static Collection data() { return parameterList; } - private final WindowBootstrap windowBootstrap; + @SuppressWarnings("unused") + @Parameter + public String ignore; - private final WindowStream windowStream; + @Parameter(1) + public WindowBootstrap windowBootstrap; - private final StateBackend stateBackend; + @Parameter(2) + public WindowStream windowStream; - private final Configuration configuration; + @Parameter(3) + public StateBackend stateBackend; - @SuppressWarnings("unused") - public SavepointWriterWindowITCase( - String ignore, - WindowBootstrap windowBootstrap, - WindowStream windowStream, - StateBackend stateBackend, - Configuration configuration) { - this.windowBootstrap = windowBootstrap; - this.windowStream = windowStream; - this.stateBackend = stateBackend; - this.configuration = configuration; - } + @Parameter(4) + public Configuration configuration; - @Test - public void testTumbleWindow() throws Exception { + @TestTemplate + void testTumbleWindow() throws Exception { final String savepointPath = getTempDirPath(new AbstractID().toHexString()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); @@ -198,8 +195,8 @@ public void testTumbleWindow() throws Exception { .containsAll(STANDARD_MATCHER); } - @Test - public void testTumbleWindowWithEvictor() throws Exception { + @TestTemplate + void testTumbleWindowWithEvictor() throws Exception { final String savepointPath = getTempDirPath(new AbstractID().toHexString()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); @@ -244,8 +241,8 @@ public void testTumbleWindowWithEvictor() throws Exception { .containsAll(EVICTOR_MATCHER); } - @Test - public void testSlideWindow() throws Exception { + @TestTemplate + void testSlideWindow() throws Exception { final String savepointPath = getTempDirPath(new AbstractID().toHexString()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); @@ -291,8 +288,8 @@ public void testSlideWindow() throws Exception { .containsAll(STANDARD_MATCHER); } - @Test - public void testSlideWindowWithEvictor() throws Exception { + @TestTemplate + void testSlideWindowWithEvictor() throws Exception { final String savepointPath = getTempDirPath(new AbstractID().toHexString()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java index 5928c809c6dbf2..98d2d4e04c9174 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java @@ -31,16 +31,17 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.util.AbstractTestBase; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; /** Tests for bootstrap transformations. */ -public class StateBootstrapTransformationTest extends AbstractTestBaseJUnit4 { +class StateBootstrapTransformationTest extends AbstractTestBase { @Test - public void testBroadcastStateTransformationParallelism() { + void testBroadcastStateTransformationParallelism() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(10); @@ -58,14 +59,11 @@ public void testBroadcastStateTransformationParallelism() { new Path(), maxParallelism); - Assert.assertEquals( - "Broadcast transformations should always be run at parallelism 1", - 1, - result.getParallelism()); + assertThat(result.getParallelism()).isOne(); } @Test - public void testDefaultParallelismRespectedWhenLessThanMaxParallelism() { + void testDefaultParallelismRespectedWhenLessThanMaxParallelism() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); @@ -83,14 +81,11 @@ public void testDefaultParallelismRespectedWhenLessThanMaxParallelism() { new Path(), maxParallelism); - Assert.assertEquals( - "The parallelism of a data set should not change when less than the max parallelism of the savepoint", - env.getParallelism(), - result.getParallelism()); + assertThat(result.getParallelism()).isEqualTo(env.getParallelism()); } @Test - public void testMaxParallelismRespected() { + void testMaxParallelismRespected() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(10); @@ -108,14 +103,11 @@ public void testMaxParallelismRespected() { new Path(), maxParallelism); - Assert.assertEquals( - "The parallelism of a data set should be constrained my the savepoint max parallelism", - 4, - result.getParallelism()); + assertThat(result.getParallelism()).isEqualTo(4); } @Test - public void testOperatorSpecificMaxParallelismRespected() { + void testOperatorSpecificMaxParallelismRespected() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); @@ -134,14 +126,11 @@ public void testOperatorSpecificMaxParallelismRespected() { new Path(), maxParallelism); - Assert.assertEquals( - "The parallelism of a data set should be constrained my the savepoint max parallelism", - 1, - result.getParallelism()); + assertThat(result.getParallelism()).isOne(); } @Test - public void testStreamConfig() { + void testStreamConfig() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream input = env.fromData(""); @@ -159,10 +148,7 @@ public void testStreamConfig() { KeySelector selector = config.getStatePartitioner(0, Thread.currentThread().getContextClassLoader()); - Assert.assertEquals( - "Incorrect key selector forwarded to stream operator", - CustomKeySelector.class, - selector.getClass()); + assertThat(selector.getClass()).isEqualTo(CustomKeySelector.class); } private static class CustomKeySelector implements KeySelector { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java index 518fca97b2ba52..c0d321dcb56c97 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java @@ -34,20 +34,21 @@ import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for operator broadcast state input format. */ -public class BroadcastStateInputFormatTest { +class BroadcastStateInputFormatTest { private static MapStateDescriptor descriptor = new MapStateDescriptor<>("state", Types.INT, Types.INT); @Test - public void testReadBroadcastState() throws Exception { + void testReadBroadcastState() throws Exception { try (TwoInputStreamOperatorTestHarness testHarness = getTestHarness()) { testHarness.open(); @@ -83,8 +84,7 @@ public void testReadBroadcastState() throws Exception { expected.put(2, 2); expected.put(3, 3); - Assert.assertEquals( - "Failed to read correct list state from state backend", expected, results); + assertThat(results).isEqualTo(expected); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BufferingCollectorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BufferingCollectorTest.java index 933f4caed536b2..b06030c72e80c6 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BufferingCollectorTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BufferingCollectorTest.java @@ -18,27 +18,27 @@ package org.apache.flink.state.api.input; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; /** Test of the buffering collector. */ -public class BufferingCollectorTest { +class BufferingCollectorTest { @Test - public void testNestRemovesElement() { + void testNestRemovesElement() { BufferingCollector collector = new BufferingCollector<>(); collector.collect(1); - Assert.assertTrue("Failed to add element to collector", collector.hasNext()); - Assert.assertEquals( - "Incorrect element removed from collector", Integer.valueOf(1), collector.next()); - Assert.assertFalse("Failed to drop element from collector", collector.hasNext()); + assertThat(collector.hasNext()).isTrue(); + assertThat(collector.next()).isOne(); + assertThat(collector.hasNext()).isFalse(); } @Test - public void testEmptyCollectorReturnsNull() { + void testEmptyCollectorReturnsNull() { BufferingCollector collector = new BufferingCollector<>(); - Assert.assertNull("Empty collector did not return null", collector.next()); + assertThat(collector.next()).isNull(); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java index 75018679b75cbc..7a69cdab1eca16 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java @@ -46,26 +46,22 @@ import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; -import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import javax.annotation.Nonnull; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Set; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for keyed state input format. */ -@RunWith(Parameterized.class) class KeyedStateInputFormatTest { private static ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("state", Types.INT); @@ -88,8 +84,7 @@ void testCreatePartitionedInputSplits(boolean asyncState) throws Exception { new KeyedStateReaderOperator<>(new ReaderFunction(), Types.INT), new ExecutionConfig()); KeyGroupRangeInputSplit[] splits = format.createInputSplits(4); - Assert.assertEquals( - "Failed to properly partition operator state into input splits", 4, splits.length); + assertThat(splits.length).isEqualTo(4); } @ParameterizedTest(name = "Enable async state = {0}") @@ -110,10 +105,7 @@ void testMaxParallelismRespected(boolean asyncState) throws Exception { new KeyedStateReaderOperator<>(new ReaderFunction(), Types.INT), new ExecutionConfig()); KeyGroupRangeInputSplit[] splits = format.createInputSplits(129); - Assert.assertEquals( - "Failed to properly partition operator state into input splits", - 128, - splits.length); + assertThat(splits.length).isEqualTo(128); } @ParameterizedTest(name = "Enable async state = {0}") @@ -139,7 +131,7 @@ void testReadState(boolean asyncState) throws Exception { List data = readInputSplit(split, userFunction); - Assert.assertEquals("Incorrect data read from input split", Arrays.asList(1, 2, 3), data); + assertThat(data).isEqualTo(List.of(1, 2, 3)); } @ParameterizedTest(name = "Enable async state = {0}") @@ -165,8 +157,7 @@ void testReadMultipleOutputPerKey(boolean asyncState) throws Exception { List data = readInputSplit(split, userFunction); - Assert.assertEquals( - "Incorrect data read from input split", Arrays.asList(1, 1, 2, 2, 3, 3), data); + assertThat(data).isEqualTo(List.of(1, 1, 2, 2, 3, 3)); } @ParameterizedTest(name = "Enable async state = {0}") @@ -217,8 +208,7 @@ void testReadTime() throws Exception { List data = readInputSplit(split, userFunction); - Assert.assertEquals( - "Incorrect data read from input split", Arrays.asList(1, 1, 2, 2, 3, 3), data); + assertThat(data).isEqualTo(List.of(1, 1, 2, 2, 3, 3)); } @Nonnull @@ -390,18 +380,12 @@ public void readKey( Integer key, KeyedStateReaderFunction.Context ctx, Collector out) throws Exception { Set eventTimers = ctx.registeredEventTimeTimers(); - Assert.assertEquals( - "Each key should have exactly one event timer for key " + key, - 1, - eventTimers.size()); + assertThat(eventTimers).hasSize(1); out.collect(eventTimers.iterator().next().intValue()); Set procTimers = ctx.registeredProcessingTimeTimers(); - Assert.assertEquals( - "Each key should have exactly one processing timer for key " + key, - 1, - procTimers.size()); + assertThat(procTimers).hasSize(1); out.collect(procTimers.iterator().next().intValue()); } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java index ff9252ad5944d4..fbd34136925b70 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java @@ -36,21 +36,21 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for operator list state input format. */ -public class ListStateInputFormatTest { +class ListStateInputFormatTest { private static ListStateDescriptor descriptor = new ListStateDescriptor<>("state", Types.INT); @Test - public void testReadListOperatorState() throws Exception { + void testReadListOperatorState() throws Exception { try (OneInputStreamOperatorTestHarness testHarness = getTestHarness()) { testHarness.open(); @@ -86,10 +86,7 @@ public void testReadListOperatorState() throws Exception { results.sort(Comparator.naturalOrder()); - Assert.assertEquals( - "Failed to read correct list state from state backend", - Arrays.asList(1, 2, 3), - results); + assertThat(results).isEqualTo(List.of(1, 2, 3)); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java index 260df82dfa4fc4..64e17d37147b54 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java @@ -57,13 +57,11 @@ import org.apache.flink.runtime.state.ttl.mock.MockRestoreOperation; import org.apache.flink.runtime.state.ttl.mock.MockStateBackend; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -73,8 +71,10 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for the multi-state key iterator. */ -public class MultiStateKeyIteratorTest { +class MultiStateKeyIteratorTest { private static final List> descriptors; static { @@ -161,7 +161,7 @@ private static void clearKey( } @Test - public void testIteratorPullsKeyFromAllDescriptors() throws Exception { + void testIteratorPullsKeyFromAllDescriptors() throws Exception { AbstractKeyedStateBackend keyedStateBackend = createKeyedStateBackend(); setKey(keyedStateBackend, descriptors.get(0), 1); @@ -176,12 +176,11 @@ public void testIteratorPullsKeyFromAllDescriptors() throws Exception { keys.add(iterator.next()); } - Assert.assertEquals("Unexpected number of keys", 2, keys.size()); - Assert.assertEquals("Unexpected keys found", Arrays.asList(1, 2), keys); + assertThat(keys).isEqualTo(List.of(1, 2)); } @Test - public void testIteratorSkipsEmptyDescriptors() throws Exception { + void testIteratorSkipsEmptyDescriptors() throws Exception { AbstractKeyedStateBackend keyedStateBackend = createKeyedStateBackend(); List> threeDescriptors = new ArrayList<>(3); @@ -206,13 +205,12 @@ public void testIteratorSkipsEmptyDescriptors() throws Exception { keys.add(iterator.next()); } - Assert.assertEquals("Unexpected number of keys", 2, keys.size()); - Assert.assertEquals("Unexpected keys found", Arrays.asList(1, 2), keys); + assertThat(keys).isEqualTo(List.of(1, 2)); } /** Test for lazy enumeration of inner iterators. */ @Test - public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError { + void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError { CountingKeysKeyedStateBackend keyedStateBackend = createCountingKeysKeyedStateBackend(100_000_000); MultiStateKeyIterator testedIterator = @@ -220,10 +218,7 @@ public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError testedIterator.hasNext(); - Assert.assertEquals( - "Unexpected number of keys enumerated", - 1, - keyedStateBackend.numberOfKeysEnumerated); + assertThat(keyedStateBackend.numberOfKeysEnumerated).isOne(); } /** diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java index 91fd3781797744..3de0ffec0b5fca 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java @@ -28,18 +28,20 @@ import org.apache.flink.state.api.utils.CustomStateBackendFactory; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + /** Tests for the stream operator context builder. */ -public class StreamOperatorContextBuilderTest { +class StreamOperatorContextBuilderTest { private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorContextBuilderTest.class); - @Test(expected = CustomStateBackendFactory.ExpectedException.class) - public void testStateBackendLoading() throws Exception { + @Test + void testStateBackendLoading() throws Exception { Configuration configuration = new Configuration(); configuration.set( StateBackendOptions.STATE_BACKEND, @@ -67,6 +69,7 @@ public int getSplitNumber() { null, context.getExecutionConfig()); - builder.build(LOG); + assertThatThrownBy(() -> builder.build(LOG)) + .isInstanceOf(CustomStateBackendFactory.ExpectedException.class); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java index 248a0ea5a6eeb6..510c15c77109dd 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java @@ -37,21 +37,21 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for operator union state input format. */ -public class UnionStateInputFormatTest { +class UnionStateInputFormatTest { private static ListStateDescriptor descriptor = new ListStateDescriptor<>("state", Types.INT); @Test - public void testReadUnionOperatorState() throws Exception { + void testReadUnionOperatorState() throws Exception { try (OneInputStreamOperatorTestHarness testHarness = getTestHarness()) { testHarness.open(); @@ -82,10 +82,7 @@ public void testReadUnionOperatorState() throws Exception { results.sort(Comparator.naturalOrder()); - Assert.assertEquals( - "Failed to read correct list state from state backend", - Arrays.asList(1, 2, 3), - results); + assertThat(results).isEqualTo(List.of(1, 2, 3)); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java index 96313f8c40d67f..828e1b3b43c3ba 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java @@ -55,31 +55,31 @@ import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.function.Function; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.mockito.Mockito.mock; /** Tests reading window state. */ @SuppressWarnings("unchecked") -public class WindowReaderTest { +class WindowReaderTest { private static final int MAX_PARALLELISM = 128; private static final String UID = "uid"; @Test - public void testReducingWindow() throws Exception { + void testReducingWindow() throws Exception { WindowOperator operator = getWindowOperator( stream -> @@ -102,11 +102,11 @@ public void testReducingWindow() throws Exception { new ExecutionConfig()); List list = readState(format); - Assert.assertEquals(Arrays.asList(1, 1), list); + assertThat(list).isEqualTo(List.of(1, 1)); } @Test - public void testSessionWindow() throws Exception { + void testSessionWindow() throws Exception { WindowOperator operator = getWindowOperator( stream -> @@ -129,11 +129,11 @@ public void testSessionWindow() throws Exception { new ExecutionConfig()); List list = readState(format); - Assert.assertEquals(Collections.singletonList(2), list); + assertThat(list).isEqualTo(Collections.singletonList(2)); } @Test - public void testAggregateWindow() throws Exception { + void testAggregateWindow() throws Exception { WindowOperator operator = getWindowOperator( stream -> @@ -156,11 +156,11 @@ public void testAggregateWindow() throws Exception { new ExecutionConfig()); List list = readState(format); - Assert.assertEquals(Arrays.asList(1, 1), list); + assertThat(list).isEqualTo(List.of(1, 1)); } @Test - public void testProcessReader() throws Exception { + void testProcessReader() throws Exception { WindowOperator operator = getWindowOperator( stream -> @@ -182,11 +182,11 @@ public void testProcessReader() throws Exception { new ExecutionConfig()); List list = readState(format); - Assert.assertEquals(Arrays.asList(1, 1), list); + assertThat(list).isEqualTo(List.of(1, 1)); } @Test - public void testPerPaneAndPerKeyState() throws Exception { + void testPerPaneAndPerKeyState() throws Exception { WindowOperator operator = getWindowOperator( stream -> @@ -209,7 +209,7 @@ public void testPerPaneAndPerKeyState() throws Exception { new ExecutionConfig()); List> list = readState(format); - Assert.assertEquals(Arrays.asList(Tuple2.of(2, 1), Tuple2.of(2, 1)), list); + assertThat(list).isEqualTo(List.of(Tuple2.of(2, 1), Tuple2.of(2, 1))); } private static WindowOperator getWindowOperator( @@ -257,14 +257,14 @@ private static OperatorState getOperatorState( DataStream dataStream) { Transformation transformation = dataStream.getTransformation(); if (!(transformation instanceof OneInputTransformation)) { - Assert.fail("This test only supports window operators"); + fail("This test only supports window operators"); } OneInputTransformation oneInput = (OneInputTransformation) transformation; StreamOperator operator = oneInput.getOperator(); if (!(operator instanceof WindowOperator)) { - Assert.fail("This test only supports window operators"); + fail("This test only supports window operators"); } return (WindowOperator) operator; diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java index b5b034a0762578..147d884c7c9e3c 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java @@ -36,16 +36,16 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.Collector; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import static org.assertj.core.api.Assertions.assertThat; /** Test writing keyed bootstrap state. */ -public class KeyedStateBootstrapOperatorTest { +class KeyedStateBootstrapOperatorTest { private static final ValueStateDescriptor descriptor = new ValueStateDescriptor<>("state", Types.LONG); @@ -54,11 +54,11 @@ public class KeyedStateBootstrapOperatorTest { private static final Long PROC_TIMER = Long.MAX_VALUE - 2; - @Rule public TemporaryFolder folder = new TemporaryFolder(); + @TempDir private java.nio.file.Path folder; @Test - public void testTimerStateRestorable() throws Exception { - Path path = new Path(folder.newFolder().toURI()); + void testTimerStateRestorable() throws Exception { + Path path = new Path(TempDirUtils.newFolder(folder).toURI()); OperatorSubtaskState state; KeyedStateBootstrapOperator bootstrapOperator = @@ -88,8 +88,8 @@ public void testTimerStateRestorable() throws Exception { } @Test - public void testNonTimerStatesRestorableByNonProcessesOperator() throws Exception { - Path path = new Path(folder.newFolder().toURI()); + void testNonTimerStatesRestorableByNonProcessesOperator() throws Exception { + Path path = new Path(TempDirUtils.newFolder(folder).toURI()); OperatorSubtaskState state; KeyedStateBootstrapOperator bootstrapOperator = @@ -123,7 +123,8 @@ private KeyedOneInputStreamOperatorTestHarness getHarness( bootstrapOperator, id -> id, Types.LONG, 128, 1, 0); harness.setStateBackend(new EmbeddedRocksDBStateBackend()); - harness.setCheckpointStorage(new FileSystemCheckpointStorage(folder.newFolder().toURI())); + harness.setCheckpointStorage( + new FileSystemCheckpointStorage(TempDirUtils.newFolder(folder).toURI())); if (state != null) { harness.initializeState(state); } @@ -151,10 +152,7 @@ private OperatorSubtaskState getState( private void assertHarnessOutput( KeyedOneInputStreamOperatorTestHarness harness, T... output) { - Assert.assertThat( - "The output from the operator does not match the expected values", - harness.extractOutputValues(), - Matchers.containsInAnyOrder(output)); + assertThat(harness.extractOutputValues()).containsExactlyInAnyOrder(output); } private static class TimerBootstrapFunction extends KeyedStateBootstrapFunction { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java index 6a425bea0dc042..20ad8e348bd0bc 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java @@ -28,29 +28,32 @@ import org.apache.flink.state.api.runtime.SavepointLoader; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.util.Collections; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + /** Test for writing output savepoint metadata. */ -public class SavepointOutputFormatTest { +class SavepointOutputFormatTest { - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir private File temporaryFolder; - @Test(expected = IllegalStateException.class) - public void testSavepointOutputFormatOnlyWorksWithParallelismOne() throws Exception { - Path path = new Path(temporaryFolder.newFolder().getAbsolutePath()); + @Test + void testSavepointOutputFormatOnlyWorksWithParallelismOne() throws Exception { + Path path = new Path(temporaryFolder.getAbsolutePath()); SavepointOutputFormat format = createSavepointOutputFormat(path); - format.open(FirstAttemptInitializationContext.of(0, 2)); + assertThatThrownBy(() -> format.open(FirstAttemptInitializationContext.of(0, 2))) + .isInstanceOf(IllegalStateException.class); } @Test - public void testSavepointOutputFormat() throws Exception { - Path path = new Path(temporaryFolder.newFolder().getAbsolutePath()); + void testSavepointOutputFormat() throws Exception { + Path path = new Path(temporaryFolder.getAbsolutePath()); SavepointOutputFormat format = createSavepointOutputFormat(path); CheckpointMetadata metadata = createSavepoint(); @@ -61,20 +64,12 @@ public void testSavepointOutputFormat() throws Exception { CheckpointMetadata metadataOnDisk = SavepointLoader.loadSavepointMetadata(path.getPath()); - Assert.assertEquals( - "Incorrect checkpoint id", - metadata.getCheckpointId(), - metadataOnDisk.getCheckpointId()); + assertThat(metadataOnDisk.getCheckpointId()).isEqualTo(metadata.getCheckpointId()); - Assert.assertEquals( - "Incorrect number of operator states in savepoint", - metadata.getOperatorStates().size(), - metadataOnDisk.getOperatorStates().size()); + assertThat(metadataOnDisk.getOperatorStates()).hasSameSizeAs(metadata.getOperatorStates()); - Assert.assertEquals( - "Incorrect operator state in savepoint", - metadata.getOperatorStates().iterator().next(), - metadataOnDisk.getOperatorStates().iterator().next()); + assertThat(metadataOnDisk.getOperatorStates().iterator().next()) + .isEqualTo(metadata.getOperatorStates().iterator().next()); } private CheckpointMetadata createSavepoint() { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java index 00e37e63d4456a..302a9e65ac2d2c 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java @@ -33,23 +33,24 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + /** Tests that snapshot utils can properly snapshot an operator. */ -public class SnapshotUtilsTest { +class SnapshotUtilsTest { private static final List EXPECTED_CALL_OPERATOR_SNAPSHOT = Arrays.asList("prepareSnapshotPreBarrier", "snapshotState", "notifyCheckpointComplete"); - @Rule public TemporaryFolder folder = new TemporaryFolder(); + @TempDir private File folder; private static final List ACTUAL_ORDER_TRACKING = Collections.synchronizedList(new ArrayList<>(EXPECTED_CALL_OPERATOR_SNAPSHOT.size())); @@ -57,17 +58,17 @@ public class SnapshotUtilsTest { private static SnapshotType actualSnapshotType; @Test - public void testSnapshotUtilsLifecycleWithDefaultSavepointFormatType() throws Exception { + void testSnapshotUtilsLifecycleWithDefaultSavepointFormatType() throws Exception { testSnapshotUtilsLifecycleWithSavepointFormatType(SavepointFormatType.DEFAULT); } @Test - public void testSnapshotUtilsLifecycleWithCanonicalSavepointFormatType() throws Exception { + void testSnapshotUtilsLifecycleWithCanonicalSavepointFormatType() throws Exception { testSnapshotUtilsLifecycleWithSavepointFormatType(SavepointFormatType.CANONICAL); } @Test - public void testSnapshotUtilsLifecycleWithNativeSavepointFormatType() throws Exception { + void testSnapshotUtilsLifecycleWithNativeSavepointFormatType() throws Exception { testSnapshotUtilsLifecycleWithSavepointFormatType(SavepointFormatType.NATIVE); } @@ -75,7 +76,7 @@ private void testSnapshotUtilsLifecycleWithSavepointFormatType( SavepointFormatType savepointFormatType) throws Exception { ACTUAL_ORDER_TRACKING.clear(); StreamOperator operator = new LifecycleOperator(); - Path path = new Path(folder.newFolder().getAbsolutePath()); + Path path = new Path(folder.getAbsolutePath()); SnapshotUtils.snapshot( 0L, @@ -88,8 +89,8 @@ private void testSnapshotUtilsLifecycleWithSavepointFormatType( path, savepointFormatType); - Assert.assertEquals(SavepointType.savepoint(savepointFormatType), actualSnapshotType); - Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING); + assertThat(actualSnapshotType).isEqualTo(SavepointType.savepoint(savepointFormatType)); + assertThat(ACTUAL_ORDER_TRACKING).isEqualTo(EXPECTED_CALL_OPERATOR_SNAPSHOT); } private static class LifecycleOperator implements StreamOperator { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java index 9e471297b9f5f1..f6d29a49201d6f 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java @@ -25,27 +25,28 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.stream.StreamSupport; +import static org.assertj.core.api.Assertions.assertThat; + /** * Test that {@code OperatorIDGenerator} creates ids from uids exactly the same as the job graph * generator. */ -public class OperatorIDGeneratorTest { +class OperatorIDGeneratorTest { private static final String UID = "uid"; private static final String OPERATOR_NAME = "operator"; @Test - public void testOperatorIdMatchesUid() { + void testOperatorIdMatchesUid() { OperatorID expectedId = getOperatorID(); OperatorID generatedId = OperatorIDGenerator.fromUid(UID); - Assert.assertEquals(expectedId, generatedId); + assertThat(generatedId).isEqualTo(expectedId); } private static OperatorID getOperatorID() { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java index 97d14baa121134..0da89badf9adef 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -30,10 +31,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.legacy.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.AbstractID; +import org.junit.jupiter.api.BeforeEach; + import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -46,7 +49,14 @@ import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; /** A test base that includes utilities for taking a savepoint. */ -public abstract class SavepointTestBase extends AbstractTestBaseJUnit4 { +public abstract class SavepointTestBase extends AbstractTestBase { + + private RestClusterClient clusterClient; + + @BeforeEach + void setClusterClient(@InjectClusterClient RestClusterClient clusterClient) { + this.clusterClient = clusterClient; + } public String takeSavepoint(StreamExecutionEnvironment executionEnvironment) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -56,26 +66,23 @@ public String takeSavepoint(StreamExecutionEnvironment executionEnvironment) { JobID jobId = jobGraph.getJobID(); - ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); - try { - JobID jobID = client.submitJob(jobGraph).get(); + JobID jobID = clusterClient.submitJob(jobGraph).get(); - waitForAllRunningOrSomeTerminal(jobID, MINI_CLUSTER_RESOURCE); + waitForAllRunningOrSomeTerminal(jobID, clusterClient); - return triggerSavepoint(client, jobID).get(5, TimeUnit.MINUTES); + return triggerSavepoint(clusterClient, jobID).get(5, TimeUnit.MINUTES); } catch (Exception e) { throw new RuntimeException("Failed to take savepoint", e); } finally { - client.cancel(jobId); + clusterClient.cancel(jobId); } } public static void waitForAllRunningOrSomeTerminal( - JobID jobID, MiniClusterWithClientResource miniClusterResource) throws Exception { + JobID jobID, RestClusterClient clusterClient) throws Exception { while (true) { - JobDetailsInfo jobInfo = - miniClusterResource.getRestClusterClient().getJobDetails(jobID).get(); + JobDetailsInfo jobInfo = clusterClient.getJobDetails(jobID).get(); Set vertexStates = jobInfo.getJobVertexInfos().stream() .map(JobDetailsInfo.JobVertexDetailsInfo::getExecutionState)