Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
358 changes: 226 additions & 132 deletions src/main/java/io/reactivex/rxjava4/core/Streamable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,8 @@ public Boolean get() {

@Override
public void run() {
if (SubscriptionHelper.cancel(upstream)) {
item.lazySet(null);
}
item.lazySet(null);
SubscriptionHelper.cancel(upstream);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.internal.operators.streamable;

import java.io.Serial;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.internal.disposables.DisposableHelper;
import io.reactivex.rxjava4.schedulers.Schedulers;

public record StreamableIntervalRange(
long start,
long count,
long initialDelay,
long period,
TimeUnit unit,
Scheduler scheduler,
ExecutorService executor
) implements Streamable<Long> {

public StreamableIntervalRange {
if (scheduler == null && executor == null) {
throw new IllegalArgumentException("scheduler and executor cannot be both null");
}
}

@Override
public @NonNull Streamer<Long> stream(@NonNull DisposableContainer cancellation) {
var streamer = new IntervalStreamer(start, start + count);
if (scheduler != null) {
var d = scheduler.schedulePeriodicallyDirect(streamer, initialDelay, period, unit);
DisposableHelper.setOnce(streamer, d);
} else
if (executor instanceof ScheduledExecutorService se) {
var f = se.scheduleAtFixedRate(streamer, initialDelay, period, unit);
DisposableHelper.setOnce(streamer, Disposable.fromFuture(f, true));
} else {
var s = Schedulers.from(executor, true);
var d = s.schedulePeriodicallyDirect(streamer, initialDelay, period, unit);
DisposableHelper.setOnce(streamer, d);
}
cancellation.add(streamer);
return streamer;
}

static final class IntervalStreamer extends AtomicReference<Disposable>
implements Streamer<Long>, Runnable, java.util.function.Function<Boolean, Boolean>, Disposable {

@Serial
private static final long serialVersionUID = 197364198498939579L;

final long end;

long counterTask;

long counterLocal;

volatile Long available;

volatile Long current;

final AtomicReference<CompletableFuture<Boolean>> waiter;

IntervalStreamer(long start, long end) {
this.counterTask = start;
this.counterLocal = start;
this.end = end;
this.waiter = new AtomicReference<>();
}

@Override
public void run() {
var c = counterTask++;
available = c;
if (c + 1 >= end) {
DisposableHelper.dispose(this);
}
for (;;) {
var cf = waiter.get();
if (cf != null) {
cf.complete(true);
break;
}
cf = CompletableFuture.completedFuture(true);
if (waiter.compareAndSet(null, cf)) {
break;
}
}
}

@Override
public @NonNull CompletionStage<Boolean> next() {
waiter.getAndSet(null);
for (;;) {
if (counterLocal >= end) {
return NEXT_FALSE;
}
var c = current;
var a = available;
if (c == null && a != null) {
current = counterLocal++;
return NEXT_TRUE;
}
if (c != null && c < a) {
current = counterLocal++;
return NEXT_TRUE;
}
if (isDisposed()) {
return CompletableFuture.failedFuture(new CancellationException());
}
var cf = waiter.get();
if (cf != null) {
waiter.getAndSet(null);
return cf.thenApply(this);
}
cf = new CompletableFuture<Boolean>();
if (waiter.compareAndSet(null, cf)) {
return cf.thenApply(this);
}
}
}

@Override
public Boolean apply(Boolean t) {
current = counterLocal++;
return true;
}

@Override
public @NonNull Long current() {
return current;
}

@Override
public @NonNull CompletionStage<Void> finish() {
DisposableHelper.dispose(this);
return FINISHED;
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
var cf = waiter.getAndSet(null);
if (cf != null) {
cf.completeExceptionally(new CancellationException());
}
}

@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.internal.operators.streamable;

import java.io.Serial;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.internal.disposables.DisposableHelper;

public record StreamableTimer(long delay, @NonNull TimeUnit unit,
@Nullable Scheduler scheduler,
@Nullable ExecutorService executor) implements Streamable<Long> {

public StreamableTimer {
if (scheduler == null && executor == null) {
throw new IllegalArgumentException("scheduler and executor cannot be both null");
}
}

@Override
public @NonNull Streamer<Long> stream(@NonNull DisposableContainer cancellation) {
var streamer = new TimerStreamer();
cancellation.add(streamer);
if (scheduler != null) {
var d = scheduler.scheduleDirect(streamer, delay, unit);
DisposableHelper.setOnce(streamer, d);
} else
if (executor instanceof ScheduledExecutorService se) {
var f = se.schedule(streamer, delay, unit);
DisposableHelper.setOnce(streamer, Disposable.fromFuture(f, true));
} else {
var f = executor.submit(() -> {
try {
unit.sleep(delay);
} catch (InterruptedException ex) {
streamer.interrupedSleep(ex);
return null;
}
streamer.run();
return null;
});
DisposableHelper.setOnce(streamer, Disposable.fromFuture(f, true));
}
return streamer;
}

static final class TimerStreamer extends AtomicReference<Disposable> implements Streamer<Long>, Runnable, Disposable {

@Serial
private static final long serialVersionUID = 1738554471573342053L;

int state;

final CompletableFuture<Boolean> waiter = new CompletableFuture<>();

@Override
public void run() {
waiter.complete(true);
}

void interrupedSleep(InterruptedException ex) {
waiter.completeExceptionally(ex);
}

@Override
public @NonNull CompletionStage<Boolean> next() {
if (state == 0) {
state = 1;
return waiter;
}
return NEXT_FALSE;
}

@Override
public @NonNull Long current() {
return 0L;
}

@Override
public @NonNull CompletionStage<Void> finish() {
waiter.complete(false);
lazySet(DisposableHelper.DISPOSED);
return FINISHED;
}

@Override
public void dispose() {
if (DisposableHelper.dispose(this)) {
waiter.completeExceptionally(new CancellationException());
}
}

@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
}
}
20 changes: 19 additions & 1 deletion src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void afterEach(TestInfo info) {
* that usually depend on Thread.sleep consistency.
* @param count the number of times to retry
* @param code the code to run
* @since 4.0.0
*/
public static void withRetry(int count, Action code) {
AssertionError error = null;
Expand All @@ -85,6 +86,7 @@ public static void withRetry(int count, Action code) {
* Don't forget to {@link ExecutorService#submit(Callable)} your work!
* @param call the callback to give the VTE.
* @throws Throwable propagate exceptions
* @since 4.0.0
*/
public static void withVirtual(Consumer<ExecutorService> call) throws Throwable {
try (var exec = new ExecutorIntercept(Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()), false)) {
Expand All @@ -96,6 +98,7 @@ public static void withVirtual(Consumer<ExecutorService> call) throws Throwable
* Execute a call within a virtual thread of the standard virtual thread executor.
* @param call the call to invoke
* @throws Throwable the exception propagated out
* @since 4.0.0
*/
public static void onVirtual(Consumer<ExecutorService> call) throws Throwable {
withVirtual(exec -> exec.submit(() -> {
Expand Down Expand Up @@ -190,6 +193,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
* Don't forget to {@link ExecutorService#submit(Callable)} your work!
* @param call the callback to give the VTE.
* @throws Throwable propagate exceptions
* @since 4.0.0
*/
public static void withSingleExecutor(Consumer<ScheduledExecutorService> call) throws Throwable {
try (var exec = Executors.newSingleThreadScheduledExecutor()) {
Expand All @@ -203,6 +207,7 @@ public static void withSingleExecutor(Consumer<ScheduledExecutorService> call) t
* Don't forget to {@link ExecutorService#submit(Callable)} your work!
* @param call the callback to give the VTE.
* @throws Throwable propagate exceptions
* @since 4.0.0
*/
public static void withCachedExecutor(Consumer<ExecutorService> call) throws Throwable {
try (var exec = new ExecutorIntercept(Executors.newCachedThreadPool(), false)) {
Expand All @@ -211,7 +216,7 @@ public static void withCachedExecutor(Consumer<ExecutorService> call) throws Thr
}

/**
* Enable thracking of the global errors for the duration of the action.
* Enable tracking of the global errors for the duration of the action.
* @param action the action to run with a list of errors encountered
* @throws Throwable the exception rethrown from the action
*/
Expand All @@ -227,4 +232,17 @@ public static void withErrorTracking(Consumer<List<Throwable>> action) throws Th
public static List<Throwable> trackPluginErrors() {
return TestHelper.trackPluginErrors();
}

/**
* Creates a virtual thread based scheduled executor service for the duration of the call, then
* closes it.
* @param call the call to handle the virtual scheduled executor service
* @throws Throwable in case the callback throws
* @since 4.0.0
*/
public static void withVirtualScheduled(Consumer<? super ScheduledExecutorService> call) throws Throwable {
try(var exec = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())) {
call.accept(exec);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.lang.ref.Cleaner;
import java.util.*;
import java.util.concurrent.*;

import org.junit.jupiter.api.*;

Expand Down
Loading
Loading