Skip to content

Commit 5b8203c

Browse files
feat: Added async Scopes
1 parent c21603b commit 5b8203c

File tree

3 files changed

+132
-0
lines changed

3 files changed

+132
-0
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package gentle.async;
2+
3+
import gentle.Error;
4+
import lombok.NonNull;
5+
6+
public record AsyncError(@NonNull Throwable throwable) implements Error {
7+
@Override
8+
public int code() {
9+
return 1;
10+
}
11+
12+
@Override
13+
public @NonNull String message() {
14+
return throwable.getMessage();
15+
}
16+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package gentle.async;
2+
3+
import lombok.AccessLevel;
4+
import lombok.NonNull;
5+
import lombok.RequiredArgsConstructor;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.concurrent.Callable;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
13+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
14+
public final class Scope implements AutoCloseable {
15+
private final ExecutorService executor;
16+
private final List<Task<?>> tasks = new ArrayList<>();
17+
private volatile boolean closed = false;
18+
19+
public static Scope open() {
20+
return new Scope(Executors.newCachedThreadPool());
21+
}
22+
23+
public static Scope open(@NonNull ExecutorService executor) {
24+
return new Scope(executor);
25+
}
26+
27+
public synchronized <T> Task<T> async(@NonNull Callable<T> supplier) {
28+
if (closed) throw new IllegalStateException("Scope already closed");
29+
Task<T> task = new Task<>(executor.submit(supplier));
30+
tasks.add(task);
31+
return task;
32+
}
33+
34+
public synchronized int size() {
35+
return tasks.size();
36+
}
37+
38+
@Override
39+
public void close() throws Exception {
40+
List<Throwable> errors = new ArrayList<>();
41+
synchronized (this) {
42+
closed = true;
43+
for (var t : tasks) {
44+
try {
45+
t.cancel();
46+
} catch (Throwable ex) {
47+
errors.add(ex);
48+
}
49+
}
50+
executor.shutdownNow();
51+
}
52+
if (errors.isEmpty()) return;
53+
var primary = errors.getFirst();
54+
for (int i = 1; i < errors.size(); i++) {
55+
primary.addSuppressed(errors.get(i));
56+
}
57+
if (primary instanceof Exception e) throw e;
58+
if (primary instanceof Error err) throw err;
59+
throw new RuntimeException(primary);
60+
}
61+
62+
@Override
63+
public String toString() {
64+
return "Scope[size=" + size() + ", closed=" + closed + "]";
65+
}
66+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package gentle.async;
2+
3+
import gentle.Result;
4+
import lombok.AccessLevel;
5+
import lombok.NonNull;
6+
import lombok.RequiredArgsConstructor;
7+
8+
import java.util.concurrent.CancellationException;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.Future;
11+
import java.util.function.Function;
12+
13+
import static gentle.Result.err;
14+
import static gentle.Result.ok;
15+
16+
@RequiredArgsConstructor (access = AccessLevel.PACKAGE)
17+
public final class Task<T> {
18+
private final Future<T> future;
19+
20+
public void cancel() {
21+
future.cancel(true);
22+
}
23+
24+
public boolean isCancelled() {
25+
return future.isCancelled();
26+
}
27+
28+
public boolean isDone() {
29+
return future.isDone();
30+
}
31+
32+
public Result<T, AsyncError> await() {
33+
try {
34+
return ok(future.get());
35+
} catch (CancellationException | ExecutionException exception) {
36+
return err(new AsyncError(exception));
37+
} catch (InterruptedException exception) {
38+
Thread.currentThread().interrupt();
39+
return err(new AsyncError(exception));
40+
}
41+
}
42+
43+
public <U> Result<U, AsyncError> map(@NonNull Function<? super T, ? extends U> f) {
44+
return await().map(f);
45+
}
46+
47+
public <U> Result<U, AsyncError> flatMap(@NonNull Function<? super T, ? extends Result<U, AsyncError>> f) {
48+
return await().flatMap(f);
49+
}
50+
}

0 commit comments

Comments
 (0)