From 86c494fbd06c3a4a0d866bdb79ddcb6f071873a9 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 3 Jul 2026 16:17:10 +0200 Subject: [PATCH] 4.x: Streamable + timer + interval; cleanups --- .../io/reactivex/rxjava4/core/Streamable.java | 358 ++++++++++------- .../streamable/StreamableFromPublisher.java | 5 +- .../streamable/StreamableIntervalRange.java | 169 ++++++++ .../operators/streamable/StreamableTimer.java | 113 ++++++ .../io/reactivex/rxjava4/core/RxJavaTest.java | 20 +- .../streamable/StreamableBaseTest.java | 1 + .../StreamableFromPublisherTest.java | 40 +- .../StreamableIntervalRangeTest.java | 367 ++++++++++++++++++ .../streamable/StreamableTimerTest.java | 162 ++++++++ .../SharedSchedulerIsolatedTest.java | 2 +- .../rxjava4/testsupport/TestHelper.java | 9 + .../validators/CheckParamValidationTest.java | 5 + 12 files changed, 1093 insertions(+), 158 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRange.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimer.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRangeTest.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimerTest.java diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamable.java b/src/main/java/io/reactivex/rxjava4/core/Streamable.java index 6488165d52..7d226232c6 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamable.java @@ -90,125 +90,27 @@ public interface Streamable<@NonNull T> { // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo /** - * Returns an empty {@code Streamable} that never produces an item and just completes. - * @param the element type - * @return the {@code Streamable} instance - */ - @SuppressWarnings("unchecked") - @CheckReturnValue - @NonNull - static <@NonNull T> Streamable empty() { - return RxJavaPlugins.onAssembly((Streamable)StreamableEmpty.INSTANCE); - } - - /** - * Returns a single-element {@code Streamable} that produces the constant item and completes. - * @param the element type - * @param item the constant item to produce - * @return the {@code Streamable} instance - */ - @CheckReturnValue - @NonNull - static <@NonNull T> Streamable just(@NonNull T item) { - Objects.requireNonNull(item, "item is null"); - return RxJavaPlugins.onAssembly(new StreamableJust<>(item)); - } - - /** - * Filters out the upstream items that do not pass the given predicate - * @param predicate the callback that should return {@code true} to let the upstream value pass - * or {@code false} to ignore it and continue with the next upstream item - * @return the new {@code Streamable} instance - * @throw NullPointerException if {@code predicate} is {@code null} - */ - @CheckReturnValue - @NonNull - default Streamable filter(@NonNull Predicate predicate) { - Objects.requireNonNull(predicate, "predicate is null"); - return RxJavaPlugins.onAssembly(new StreamableFilter<>(this, predicate)); - } - - /** - * Streams all elements of the given items array. - * @param the element type of the items - * @param items the array of items to stream - * @return the new {@code Streamable} instance - * @throws NullPointerException if {@code items} is {@code null} - */ - @SafeVarargs - @CheckReturnValue - @NonNull - static <@NonNull T> Streamable fromArray(@NonNull T... items) { - Objects.requireNonNull(items, "items is null"); - return RxJavaPlugins.onAssembly(new StreamableFromArray<>(items)); - } - - /** - * Streams all elements of the given {@link Iterable} sequence. - * @param the element type of the items - * @param items the iterable of items to stream - * @return the new {@code Streamable} instance - * @throws NullPointerException if {@code items} is {@code null} - */ - @CheckReturnValue - @NonNull - static <@NonNull T> Streamable fromIterable(@NonNull Iterable items) { - Objects.requireNonNull(items, "items is null"); - return RxJavaPlugins.onAssembly(new StreamableFromIterable<>(items)); - } - - /** - * Streams all elements of the given {@link Stream} sequence. - * @param the element type of the items - * @param items the stream of items to stream - * @return the new {@code Streamable} instance - * @throws NullPointerException if {@code items} is {@code null} - */ - @CheckReturnValue - @NonNull - static <@NonNull T> Streamable fromStream(@NonNull Stream items) { - Objects.requireNonNull(items, "items is null"); - return RxJavaPlugins.onAssembly(new StreamableFromStream<>(items)); - } - - /** - * Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence. - * @param the element type - * @param source Flow.Publisher to convert - * @return the new {@code Streamable} instance - */ - @CheckReturnValue - @NonNull - static Streamable fromPublisher(@NonNull Flow.Publisher source) { - Objects.requireNonNull(source, "source is null"); - return fromPublisher(source, Executors.newVirtualThreadPerTaskExecutor()); - } - - /** - * Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence. - * @param the element type - * @param source Flow.Publisher to convert - * @param executor where the conversion will run - * @return the new {@code Streamable} instance + * Emits the elements of each inner sequence produced by the outer sequence. + * @param the common element type + * @param sources a streamable of inner streamables + * @param executor the executorservice where to run the virtual wait + * @return the new {@code Streamable} instance. + * @throws NullPointerException if {@code sources} or {@code exec} is {@code null} */ @CheckReturnValue @NonNull - static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNull ExecutorService executor) { - Objects.requireNonNull(source, "source is null"); + static <@NonNull T> Streamable concat(Streamable> sources, ExecutorService executor) { + Objects.requireNonNull(sources, "sources is null"); Objects.requireNonNull(executor, "executor is null"); - return RxJavaPlugins.onAssembly(new StreamableFromPublisher<>(source, executor)); - } - - /** - * Returns an {@code Streamable} that never produces an item and never terminates. - * @param the element type - * @return the {@code Streamable} instance - */ - @SuppressWarnings("unchecked") - @CheckReturnValue - @NonNull - static <@NonNull T> Streamable never() { - return RxJavaPlugins.onAssembly((Streamable)StreamableNever.INSTANCE); + return create(emitter -> { + try (var mainSource = sources.forEach(item -> { + try (var innerSource = item.forEach(emitter::emit, emitter.canceller().derive(), executor)) { + innerSource.await(); + } + }, emitter.canceller(), executor)) { + mainSource.await(); + } + }, executor); } /** @@ -317,27 +219,186 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNu } /** - * Emits the elements of each inner sequence produced by the outer sequence. - * @param the common element type - * @param sources a streamable of inner streamables - * @param executor the executorservice where to run the virtual wait - * @return the new {@code Streamable} instance. - * @throws NullPointerException if {@code sources} or {@code exec} is {@code null} + * Returns an empty {@code Streamable} that never produces an item and just completes. + * @param the element type + * @return the {@code Streamable} instance */ + @SuppressWarnings("unchecked") @CheckReturnValue @NonNull - static <@NonNull T> Streamable concat(Streamable> sources, ExecutorService executor) { - Objects.requireNonNull(sources, "sources is null"); + static <@NonNull T> Streamable empty() { + return RxJavaPlugins.onAssembly((Streamable)StreamableEmpty.INSTANCE); + } + + /** + * Filters out the upstream items that do not pass the given predicate + * @param predicate the callback that should return {@code true} to let the upstream value pass + * or {@code false} to ignore it and continue with the next upstream item + * @return the new {@code Streamable} instance + * @throw NullPointerException if {@code predicate} is {@code null} + */ + @CheckReturnValue + @NonNull + default Streamable filter(@NonNull Predicate predicate) { + Objects.requireNonNull(predicate, "predicate is null"); + return RxJavaPlugins.onAssembly(new StreamableFilter<>(this, predicate)); + } + + /** + * Streams all elements of the given items array. + * @param the element type of the items + * @param items the array of items to stream + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code items} is {@code null} + */ + @SafeVarargs + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable fromArray(@NonNull T... items) { + Objects.requireNonNull(items, "items is null"); + return RxJavaPlugins.onAssembly(new StreamableFromArray<>(items)); + } + + /** + * Streams all elements of the given {@link Iterable} sequence. + * @param the element type of the items + * @param items the iterable of items to stream + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code items} is {@code null} + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable fromIterable(@NonNull Iterable items) { + Objects.requireNonNull(items, "items is null"); + return RxJavaPlugins.onAssembly(new StreamableFromIterable<>(items)); + } + + /** + * Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence. + * @param the element type + * @param source Flow.Publisher to convert + * @return the new {@code Streamable} instance + */ + @CheckReturnValue + @NonNull + static Streamable fromPublisher(@NonNull Flow.Publisher source) { + Objects.requireNonNull(source, "source is null"); + return fromPublisher(source, Executors.newVirtualThreadPerTaskExecutor()); + } + + /** + * Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence. + * @param the element type + * @param source Flow.Publisher to convert + * @param executor where the conversion will run + * @return the new {@code Streamable} instance + */ + @CheckReturnValue + @NonNull + static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNull ExecutorService executor) { + Objects.requireNonNull(source, "source is null"); Objects.requireNonNull(executor, "executor is null"); - return create(emitter -> { - try (var mainSource = sources.forEach(item -> { - try (var innerSource = item.forEach(emitter::emit, emitter.canceller().derive(), executor)) { - innerSource.await(); - } - }, emitter.canceller(), executor)) { - mainSource.await(); - } - }, executor); + return RxJavaPlugins.onAssembly(new StreamableFromPublisher<>(source, executor)); + } + + /** + * Streams all elements of the given {@link Stream} sequence. + * @param the element type of the items + * @param items the stream of items to stream + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code items} is {@code null} + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable fromStream(@NonNull Stream items) { + Objects.requireNonNull(items, "items is null"); + return RxJavaPlugins.onAssembly(new StreamableFromStream<>(items)); + } + + /** + * Constructs a {@code Streamable} that after the initial delay, starts emitting an ever increasing + * numbers from {@code start} up to {@code start + count} exclusive with the given period. + *

+ * If there are processing delays, this source may emit multiple queued up items in a quick succession. + * @param start the first long value to emit + * @param count the number of items to emit, use {@link Long#MAX_VALUE} for an unlimited range + * @param initialDelay the time to delay before the {@code start} item is emitted + * @param period the period of how often emit the next item + * @param unit the time unit for both {@code initialDelay} and {@code period} + * @param scheduler the scheduler to use for the timed waiting + * @return the new {@code Streamable} instance + */ + static Streamable intervalRange(long start, long count, + long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + if (count < 0) { + throw new IllegalArgumentException("count >= 0 required but it was " + count); + } + + long end = start + (count - 1); + if (start > 0 && end < 0) { + throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE"); + } + + return RxJavaPlugins.onAssembly(new StreamableIntervalRange(start, count, initialDelay, period, unit, scheduler, null)); + } + + /** + * Constructs a {@code Streamable} that after the initial delay, starts emitting an ever increasing + * numbers from {@code start} up to {@code start + count} exclusive with the given period. + *

+ * If the provided {@link ExecutorService} is a {@link ScheduledExecutorService}, its + * {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} will be used. + * Otherwise, a plain {@code ExecutorService} will be wrapped via {@link Schedulers#from(Executor)}. + *

+ * If there are processing delays, this source may emit multiple queued up items in a quick succession. + * @param start the first long value to emit + * @param count the number of items to emit, use {@link Long#MAX_VALUE} for an unlimited range + * @param initialDelay the time to delay before the {@code start} item is emitted + * @param period the period of how often emit the next itme + * @param unit the time unit for both {@code initialDelay} and {@code period} + * @param executor the executor to use + * @return the new {@code Streamable} instance + */ + static Streamable intervalRange(long start, long count, + long initialDelay, long period, TimeUnit unit, ExecutorService executor) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(executor, "executor is null"); + if (count < 0) { + throw new IllegalArgumentException("count >= 0 required but it was " + count); + } + + long end = start + (count - 1); + if (start > 0 && end < 0) { + throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE"); + } + return RxJavaPlugins.onAssembly(new StreamableIntervalRange(start, count, initialDelay, period, unit, null, executor)); + } + + /** + * Returns a single-element {@code Streamable} that produces the constant item and completes. + * @param the element type + * @param item the constant item to produce + * @return the {@code Streamable} instance + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable just(@NonNull T item) { + Objects.requireNonNull(item, "item is null"); + return RxJavaPlugins.onAssembly(new StreamableJust<>(item)); + } + + /** + * Returns an {@code Streamable} that never produces an item and never terminates. + * @param the element type + * @return the {@code Streamable} instance + */ + @SuppressWarnings("unchecked") + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable never() { + return RxJavaPlugins.onAssembly((Streamable)StreamableNever.INSTANCE); } /** @@ -392,6 +453,39 @@ static Streamable rangeLong(long start, long count) { return RxJavaPlugins.onAssembly(new StreamableRangeLong(start, count)); } + /** + * Signals a single 0L and completes after the given delay amount of time. + * @param delay the amount to delay the signaling of a single item + * @param unit the time unit + * @param scheduler where the timed delay should happen + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} + */ + static Streamable timer(long delay, TimeUnit unit, Scheduler scheduler) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new StreamableTimer(delay, unit, scheduler, null)); + } + + /** + * Signals a single 0L and completes after the given delay amount of time. + *

+ * If the {@code executor} is a {@link ScheduledExecutorService}, the operator will use + * its {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)} method. + * Otherwise, the {@link ExecutorService#submit(Callable)} will be invoked with an upfront + * {@link TimeUnit#sleep(long)}. + * @param delay the amount to delay the signaling of a single item + * @param unit the time unit + * @param executor where the timed delay should happen + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code unit} or {@code executor} is {@code null} + */ + static Streamable timer(long delay, TimeUnit unit, ExecutorService executor) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(executor, "executor is null"); + return RxJavaPlugins.onAssembly(new StreamableTimer(delay, unit, null, executor)); + } + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo // Operators // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java index cc33a3f42c..c91cb2c644 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java @@ -135,9 +135,8 @@ public Boolean get() { @Override public void run() { - if (SubscriptionHelper.cancel(upstream)) { - item.lazySet(null); - } + item.lazySet(null); + SubscriptionHelper.cancel(upstream); } } } diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRange.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRange.java new file mode 100644 index 0000000000..0eac7235f1 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRange.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.io.Serial; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.*; +import io.reactivex.rxjava4.internal.disposables.DisposableHelper; +import io.reactivex.rxjava4.schedulers.Schedulers; + +public record StreamableIntervalRange( + long start, + long count, + long initialDelay, + long period, + TimeUnit unit, + Scheduler scheduler, + ExecutorService executor +) implements Streamable { + + public StreamableIntervalRange { + if (scheduler == null && executor == null) { + throw new IllegalArgumentException("scheduler and executor cannot be both null"); + } + } + + @Override + public @NonNull Streamer stream(@NonNull DisposableContainer cancellation) { + var streamer = new IntervalStreamer(start, start + count); + if (scheduler != null) { + var d = scheduler.schedulePeriodicallyDirect(streamer, initialDelay, period, unit); + DisposableHelper.setOnce(streamer, d); + } else + if (executor instanceof ScheduledExecutorService se) { + var f = se.scheduleAtFixedRate(streamer, initialDelay, period, unit); + DisposableHelper.setOnce(streamer, Disposable.fromFuture(f, true)); + } else { + var s = Schedulers.from(executor, true); + var d = s.schedulePeriodicallyDirect(streamer, initialDelay, period, unit); + DisposableHelper.setOnce(streamer, d); + } + cancellation.add(streamer); + return streamer; + } + + static final class IntervalStreamer extends AtomicReference + implements Streamer, Runnable, java.util.function.Function, Disposable { + + @Serial + private static final long serialVersionUID = 197364198498939579L; + + final long end; + + long counterTask; + + long counterLocal; + + volatile Long available; + + volatile Long current; + + final AtomicReference> waiter; + + IntervalStreamer(long start, long end) { + this.counterTask = start; + this.counterLocal = start; + this.end = end; + this.waiter = new AtomicReference<>(); + } + + @Override + public void run() { + var c = counterTask++; + available = c; + if (c + 1 >= end) { + DisposableHelper.dispose(this); + } + for (;;) { + var cf = waiter.get(); + if (cf != null) { + cf.complete(true); + break; + } + cf = CompletableFuture.completedFuture(true); + if (waiter.compareAndSet(null, cf)) { + break; + } + } + } + + @Override + public @NonNull CompletionStage next() { + waiter.getAndSet(null); + for (;;) { + if (counterLocal >= end) { + return NEXT_FALSE; + } + var c = current; + var a = available; + if (c == null && a != null) { + current = counterLocal++; + return NEXT_TRUE; + } + if (c != null && c < a) { + current = counterLocal++; + return NEXT_TRUE; + } + if (isDisposed()) { + return CompletableFuture.failedFuture(new CancellationException()); + } + var cf = waiter.get(); + if (cf != null) { + waiter.getAndSet(null); + return cf.thenApply(this); + } + cf = new CompletableFuture(); + if (waiter.compareAndSet(null, cf)) { + return cf.thenApply(this); + } + } + } + + @Override + public Boolean apply(Boolean t) { + current = counterLocal++; + return true; + } + + @Override + public @NonNull Long current() { + return current; + } + + @Override + public @NonNull CompletionStage finish() { + DisposableHelper.dispose(this); + return FINISHED; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + var cf = waiter.getAndSet(null); + if (cf != null) { + cf.completeExceptionally(new CancellationException()); + } + } + + @Override + public boolean isDisposed() { + return get() == DisposableHelper.DISPOSED; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimer.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimer.java new file mode 100644 index 0000000000..e3e8034f1d --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimer.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.io.Serial; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.*; +import io.reactivex.rxjava4.internal.disposables.DisposableHelper; + +public record StreamableTimer(long delay, @NonNull TimeUnit unit, + @Nullable Scheduler scheduler, + @Nullable ExecutorService executor) implements Streamable { + + public StreamableTimer { + if (scheduler == null && executor == null) { + throw new IllegalArgumentException("scheduler and executor cannot be both null"); + } + } + + @Override + public @NonNull Streamer stream(@NonNull DisposableContainer cancellation) { + var streamer = new TimerStreamer(); + cancellation.add(streamer); + if (scheduler != null) { + var d = scheduler.scheduleDirect(streamer, delay, unit); + DisposableHelper.setOnce(streamer, d); + } else + if (executor instanceof ScheduledExecutorService se) { + var f = se.schedule(streamer, delay, unit); + DisposableHelper.setOnce(streamer, Disposable.fromFuture(f, true)); + } else { + var f = executor.submit(() -> { + try { + unit.sleep(delay); + } catch (InterruptedException ex) { + streamer.interrupedSleep(ex); + return null; + } + streamer.run(); + return null; + }); + DisposableHelper.setOnce(streamer, Disposable.fromFuture(f, true)); + } + return streamer; + } + + static final class TimerStreamer extends AtomicReference implements Streamer, Runnable, Disposable { + + @Serial + private static final long serialVersionUID = 1738554471573342053L; + + int state; + + final CompletableFuture waiter = new CompletableFuture<>(); + + @Override + public void run() { + waiter.complete(true); + } + + void interrupedSleep(InterruptedException ex) { + waiter.completeExceptionally(ex); + } + + @Override + public @NonNull CompletionStage next() { + if (state == 0) { + state = 1; + return waiter; + } + return NEXT_FALSE; + } + + @Override + public @NonNull Long current() { + return 0L; + } + + @Override + public @NonNull CompletionStage finish() { + waiter.complete(false); + lazySet(DisposableHelper.DISPOSED); + return FINISHED; + } + + @Override + public void dispose() { + if (DisposableHelper.dispose(this)) { + waiter.completeExceptionally(new CancellationException()); + } + } + + @Override + public boolean isDisposed() { + return get() == DisposableHelper.DISPOSED; + } + } +} diff --git a/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java b/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java index 30c8ead2c7..4255337acb 100644 --- a/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java +++ b/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java @@ -60,6 +60,7 @@ public void afterEach(TestInfo info) { * that usually depend on Thread.sleep consistency. * @param count the number of times to retry * @param code the code to run + * @since 4.0.0 */ public static void withRetry(int count, Action code) { AssertionError error = null; @@ -85,6 +86,7 @@ public static void withRetry(int count, Action code) { * Don't forget to {@link ExecutorService#submit(Callable)} your work! * @param call the callback to give the VTE. * @throws Throwable propagate exceptions + * @since 4.0.0 */ public static void withVirtual(Consumer call) throws Throwable { try (var exec = new ExecutorIntercept(Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()), false)) { @@ -96,6 +98,7 @@ public static void withVirtual(Consumer call) throws Throwable * Execute a call within a virtual thread of the standard virtual thread executor. * @param call the call to invoke * @throws Throwable the exception propagated out + * @since 4.0.0 */ public static void onVirtual(Consumer call) throws Throwable { withVirtual(exec -> exec.submit(() -> { @@ -190,6 +193,7 @@ public T invokeAny(Collection> tasks, long timeout, Ti * Don't forget to {@link ExecutorService#submit(Callable)} your work! * @param call the callback to give the VTE. * @throws Throwable propagate exceptions + * @since 4.0.0 */ public static void withSingleExecutor(Consumer call) throws Throwable { try (var exec = Executors.newSingleThreadScheduledExecutor()) { @@ -203,6 +207,7 @@ public static void withSingleExecutor(Consumer call) t * Don't forget to {@link ExecutorService#submit(Callable)} your work! * @param call the callback to give the VTE. * @throws Throwable propagate exceptions + * @since 4.0.0 */ public static void withCachedExecutor(Consumer call) throws Throwable { try (var exec = new ExecutorIntercept(Executors.newCachedThreadPool(), false)) { @@ -211,7 +216,7 @@ public static void withCachedExecutor(Consumer call) throws Thr } /** - * Enable thracking of the global errors for the duration of the action. + * Enable tracking of the global errors for the duration of the action. * @param action the action to run with a list of errors encountered * @throws Throwable the exception rethrown from the action */ @@ -227,4 +232,17 @@ public static void withErrorTracking(Consumer> action) throws Th public static List trackPluginErrors() { return TestHelper.trackPluginErrors(); } + + /** + * Creates a virtual thread based scheduled executor service for the duration of the call, then + * closes it. + * @param call the call to handle the virtual scheduled executor service + * @throws Throwable in case the callback throws + * @since 4.0.0 + */ + public static void withVirtualScheduled(Consumer call) throws Throwable { + try(var exec = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())) { + call.accept(exec); + } + } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java index a0c0caa90c..816e7ef636 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java @@ -15,6 +15,7 @@ import java.lang.ref.Cleaner; import java.util.*; +import java.util.concurrent.*; import org.junit.jupiter.api.*; diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisherTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisherTest.java index c71581638e..bd686a2f1f 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisherTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisherTest.java @@ -68,36 +68,42 @@ public void take() throws Throwable { } @Test - @Disabled("Don't know yet why it doesn't propagate the cancellation exception...") public void cancel() throws Throwable { withVirtual(exec -> { var pp = PublishProcessor.create(); + IO.println("test()"); + var ts = pp.toStreamable(exec) .test(exec); + IO.println("hasSubscription()"); + while (!ts.hasSubscription()) { Thread.sleep(1); } + IO.println("hasSubscribers()"); + while (!pp.hasSubscribers()) { - Thread.onSpinWait(); + Thread.sleep(1); } - var f = CompletableFuture.runAsync(() -> { - ts.awaitDone(5, TimeUnit.SECONDS); - }, exec); + IO.println("cancel()"); + + Thread.sleep(100); ts.cancel(); - f.join(); + IO.println("!hasSubscribers()"); - ts.assertFailure(CancellationException.class); + while (pp.hasSubscribers()) { + Thread.sleep(1); + } }); } @Test - @Disabled("Don't know yet why it doesn't propagate the cancellation exception...") public void cancelDebug() throws Throwable { withCachedExecutor(exec -> { var pp = PublishProcessor.create(); @@ -116,28 +122,20 @@ public void cancelDebug() throws Throwable { IO.println("hasSubscribers()"); while (!pp.hasSubscribers()) { - Thread.onSpinWait(); + Thread.sleep(1); } - IO.println("awaitDone()"); - - var f = CompletableFuture.runAsync(() -> { - ts.awaitDone(5, TimeUnit.SECONDS); - }, exec); - IO.println("cancel()"); Thread.sleep(100); ts.cancel(); - IO.println("join()"); + IO.println("!hasSubscribers()"); - f.join(); - - IO.println("assertFailure()"); - - ts.assertFailure(CancellationException.class); + while (pp.hasSubscribers()) { + Thread.sleep(1); + } }); } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRangeTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRangeTest.java new file mode 100644 index 0000000000..9ad5a22cba --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableIntervalRangeTest.java @@ -0,0 +1,367 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; + +import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.schedulers.Schedulers; +import io.reactivex.rxjava4.testsupport.TestHelper; + +public class StreamableIntervalRangeTest extends StreamableBaseTest { + + @Test + public void basic() { + Streamable.intervalRange(1, 5, 20, 20, TimeUnit.MILLISECONDS, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void basicVirtual() throws Throwable { + withVirtual(exec -> { + Streamable.intervalRange(1, 5, 20, 20, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1L, 2L, 3L, 4L, 5L); + }); + } + + @Test + public void basicExecutor() throws Throwable { + withCachedExecutor(exec -> { + Streamable.intervalRange(1, 5, 20, 20, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1L, 2L, 3L, 4L, 5L); + }); + } + + @Test + public void basicVirtualSchedule() throws Throwable { + withVirtualScheduled(exec -> { + Streamable.intervalRange(1, 5, 20, 20, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1L, 2L, 3L, 4L, 5L); + }); + } + + @Test + public void singleStep() { + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + streamer.run(); + + assertTrue(streamer.next().toCompletableFuture().join(), "next-join"); + + assertEquals(1L, streamer.current()); + + var cf = streamer.next().toCompletableFuture(); + + streamer.run(); + + assertTrue(cf.join(), "next-join-2"); + + assertEquals(2L, streamer.current()); + + streamer.run(); + streamer.run(); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-3"); + + assertEquals(3L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-4"); + + assertEquals(4L, streamer.current()); + + streamer.run(); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-5"); + + assertEquals(5, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertFalse(cf.join(), "next-join-6"); + + streamer.finish().toCompletableFuture().join(); + + assertTrue(streamer.isDisposed(), "Streamer is not diposed"); + } + + @Test + public void singleStep2() { + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + streamer.run(); + + assertTrue(streamer.next().toCompletableFuture().join(), "next-join"); + + assertEquals(1L, streamer.current()); + + var cf = streamer.next().toCompletableFuture(); + + streamer.run(); + + assertTrue(cf.join(), "next-join-2"); + + assertEquals(2L, streamer.current()); + + streamer.run(); + streamer.run(); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-3"); + + assertEquals(3L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-4"); + + assertEquals(4L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + + streamer.run(); + + assertTrue(cf.join(), "next-join-5"); + + assertEquals(5, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertFalse(cf.join(), "next-join-6"); + + streamer.finish().toCompletableFuture().join(); + + assertTrue(streamer.isDisposed(), "Streamer is not diposed"); + } + + @Test + public void singleStep3() { + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + streamer.run(); + streamer.run(); + streamer.run(); + streamer.run(); + streamer.run(); + + assertTrue(streamer.next().toCompletableFuture().join(), "next-join"); + + assertEquals(1L, streamer.current()); + + var cf = streamer.next().toCompletableFuture(); + + assertTrue(cf.join(), "next-join-2"); + + assertEquals(2L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-3"); + + assertEquals(3L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-4"); + + assertEquals(4L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-5"); + + assertEquals(5, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertFalse(cf.join(), "next-join-6"); + + streamer.finish().toCompletableFuture().join(); + + assertTrue(streamer.isDisposed(), "Streamer is not diposed"); + } + + @Test + public void disposable() { + TestHelper.checkDisposed(new StreamableIntervalRange.IntervalStreamer(1, 6)); + } + + @Test + public void singleStep4() { + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + streamer.run(); + streamer.run(); + streamer.run(); + + assertTrue(streamer.next().toCompletableFuture().join(), "next-join"); + + assertEquals(1L, streamer.current()); + + var cf = streamer.next().toCompletableFuture(); + + assertTrue(cf.join(), "next-join-2"); + + assertEquals(2L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + assertTrue(cf.join(), "next-join-3"); + + assertEquals(3L, streamer.current()); + + cf = streamer.next().toCompletableFuture(); + + streamer.dispose(); + + var cff = cf; + var e = assertThrows(CompletionException.class, () -> cff.join()); + assertTrue(e.getCause() instanceof CancellationException, e.getCause().toString()); + + streamer.finish().toCompletableFuture().join(); + + assertTrue(streamer.isDisposed(), "Streamer is not diposed"); + } + + @Test + public void singleStep5() { + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + var cf = streamer.next().toCompletableFuture(); + + streamer.dispose(); + + var cff = cf; + var e = assertThrows(CompletionException.class, () -> cff.join()); + assertTrue(e.getCause() instanceof CancellationException, e.getCause().toString()); + + streamer.finish().toCompletableFuture().join(); + + assertTrue(streamer.isDisposed(), "Streamer is not diposed"); + } + + @Test + public void singleStep6() { + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + streamer.dispose(); + + var cf = streamer.next().toCompletableFuture(); + + var cff = cf; + assertThrows(CancellationException.class, () -> cff.join()); + + streamer.finish().toCompletableFuture().join(); + + assertTrue(streamer.isDisposed(), "Streamer is not diposed"); + } + + @Test + public void singleStep7() { + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + streamer.run(); + + assertTrue(streamer.next().toCompletableFuture().join(), "next-join"); + + assertEquals(1L, streamer.current()); + + var cf = streamer.next().toCompletableFuture(); + + streamer.run(); + + assertTrue(cf.join(), "next-join"); + + assertEquals(2L, streamer.current()); + + streamer.finish().toCompletableFuture().join(); + + assertTrue(streamer.isDisposed(), "Streamer is not diposed"); + } + + @Test + public void underflowLong() throws Throwable { + assertThrows(IllegalArgumentException.class, () -> { + Streamable.intervalRange(1, -1, 20, 20, TimeUnit.MILLISECONDS, Schedulers.single()); + }); + } + + @Test + public void underOverflowLong() throws Throwable { + assertThrows(IllegalArgumentException.class, () -> { + Streamable.intervalRange(2, Long.MAX_VALUE, 20, 20, TimeUnit.MILLISECONDS, Schedulers.single()); + }); + } + + @Test + public void underNoOverflowLong() throws Throwable { + Streamable.intervalRange(-2, Long.MAX_VALUE, 20, 20, TimeUnit.MILLISECONDS, Schedulers.single()); + } + + @Test + public void underflowLongExec() throws Throwable { + assertThrows(IllegalArgumentException.class, () -> { + withVirtual(exec -> { + Streamable.intervalRange(1, -1, 20, 20, TimeUnit.MILLISECONDS, exec); + }); + }); + } + + @Test + public void underOverflowLongExecutor() throws Throwable { + assertThrows(IllegalArgumentException.class, () -> { + withVirtual(exec -> { + Streamable.intervalRange(2, Long.MAX_VALUE, 20, 20, TimeUnit.MILLISECONDS, exec); + }); + }); + } + + @Test + public void underNoOverflowLongExecutor() throws Throwable { + withVirtual(exec -> { + Streamable.intervalRange(-2, Long.MAX_VALUE, 20, 20, TimeUnit.MILLISECONDS, exec); + }); + } + + @Test + public void timerRunNextRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + var streamer = new StreamableIntervalRange.IntervalStreamer(1, 6); + + AtomicReference> result = new AtomicReference<>(); + Runnable r2 = () -> result.lazySet(streamer.next()); + + TestHelper.race(streamer, r2); + + result.get().toCompletableFuture().join(); + + assertEquals(1, streamer.current()); + } + } + + @Test + public void neitherSchedulerNoExecutor() { + assertThrows(IllegalArgumentException.class, () -> { + new StreamableIntervalRange(1, 1, 1, 1, TimeUnit.SECONDS, null, null); + }); + } +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimerTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimerTest.java new file mode 100644 index 0000000000..deee508260 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTimerTest.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.concurrent.*; + +import org.junit.jupiter.api.Test; + +import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.schedulers.Schedulers; +import io.reactivex.rxjava4.testsupport.TestHelper; + +public class StreamableTimerTest extends StreamableBaseTest { + + @Test + public void basic() { + Streamable.timer(100, TimeUnit.MILLISECONDS, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + } + + @Test + public void basicExecutor() throws Throwable { + withCachedExecutor(exec -> {; + Streamable.timer(100, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + }); + } + + @Test + public void basicVirtual() throws Throwable { + withVirtual(exec -> {; + Streamable.timer(100, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + }); + } + + @Test + public void basicScheduled() throws Throwable { + try (var exec = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())) { + Streamable.timer(100, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + } + } + + @Test + public void neitherSchedulerNoExecutor() { + assertThrows(IllegalArgumentException.class, () -> { + new StreamableTimer(1, TimeUnit.SECONDS, null, null); + }); + } + + @Test + public void cancelBeforeTimerHit() { + assertThrows(CancellationException.class, () -> { + var awaitable = Streamable.timer(10, TimeUnit.MINUTES, Schedulers.single()) + .forEach(_ -> { }); + + Thread.sleep(100); + + awaitable.close(); + + awaitable.await(); + }); + } + + @Test + public void cancelBeforeTimerHitExecutorService() { + assertThrows(CancellationException.class, () -> { + withCachedExecutor(exec -> { + var awaitable = Streamable.timer(10, TimeUnit.MINUTES, exec) + .forEach(_ -> { }); + + Thread.sleep(100); + + awaitable.close(); + + awaitable.await(); + }); + }); + } + + @Test + public void cancelBeforeTimerHitScheduledExecutorService() { + assertThrows(CancellationException.class, () -> { + try (var exec = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())) { + var awaitable = Streamable.timer(10, TimeUnit.MINUTES, exec) + .forEach(_ -> { }); + + Thread.sleep(100); + + awaitable.close(); + + awaitable.await(); + }; + }); + } + + @Test + public void disposable() { + TestHelper.checkDisposed(new StreamableTimer.TimerStreamer()); + } + + @Test + public void basicZeroDelay() { + Streamable.timer(0, TimeUnit.MILLISECONDS, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + } + + @Test + public void basicExecutoreroDelay() throws Throwable { + withCachedExecutor(exec -> {; + Streamable.timer(0, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + }); + } + + @Test + public void basicVirtualeroDelay() throws Throwable { + withVirtual(exec -> {; + Streamable.timer(0, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + }); + } + + @Test + public void basicSchedulederoDelay() throws Throwable { + try (var exec = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())) { + Streamable.timer(0, TimeUnit.MILLISECONDS, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(0L); + } + } + +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java b/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java index 069ac617ec..076602651e 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java @@ -47,7 +47,7 @@ public void noleak() throws Exception { long before = memoryUsage(); System.out.printf("Start: %.1f%n", before / 1024.0 / 1024.0); - for (int i = 0; i < 300 * 1000; i++) { + for (int i = 0; i < 500 * 1000; i++) { worker.schedule(Functions.EMPTY_RUNNABLE, 1, TimeUnit.DAYS); } diff --git a/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java index 166ee803f2..abbc146a54 100644 --- a/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java @@ -661,6 +661,15 @@ public static void doubleOnSubscribe(MaybeObserver observer) { } } + /** + * Makes sure the given {@link Disposable} is not disposed upfront, + * disposes it, checks if it got disposed and then double checks + * if disposing it again will not result in errors or no longer being disposed. + *

+ * A common use for it is to make sure the {@link Disposable#isDisposed()} gets + * covered even though naturally nothing would invoke that method in a normal flow. + * @param d the {@code Disposable} to check + */ public static void checkDisposed(Disposable d) { assertFalse(d.isDisposed(), "Disposed upfront?!"); diff --git a/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java b/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java index 41eb36ef92..dc80b4b3da 100644 --- a/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java +++ b/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java @@ -496,6 +496,9 @@ public void checkStreamable() { addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "toCompletionStage", Object.class)); addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "toCompletionStage", Object.class)); + addOverride(new ParamOverride(Streamable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Streamable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class, ExecutorService.class)); + // ----------------------------------------------------------------------------------- ignores = new HashMap<>(); @@ -529,6 +532,8 @@ public void checkStreamable() { // needs special param validation due to (long)start + end - 1 <= Integer.MAX_VALUE addIgnore(new ParamIgnore(Streamable.class, "range", Integer.TYPE, Integer.TYPE)); addIgnore(new ParamIgnore(Streamable.class, "rangeLong", Long.TYPE, Long.TYPE)); + addIgnore(new ParamIgnore(Streamable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addIgnore(new ParamIgnore(Streamable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class, ExecutorService.class)); // -----------------------------------------------------------------------------------