Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3f87103
servlet: fix write when not ready in AsyncServletOutputStreamWriter
mgustimz May 2, 2026
60e9f16
servlet: fix write when not ready, narrow to writeBytes only
mgustimz May 5, 2026
44ee627
servlet: fix type comparison in runOrBuffer
mgustimz May 6, 2026
55e6629
Add AsyncServletOutputStreamWriterTest with mock isReady supplier
mgustimz May 6, 2026
3ebac15
Remove unused AtomicInteger in AsyncServletOutputStreamWriterTest
mgustimz May 7, 2026
1ef9eb8
Fix test and log message per Copilot review
mgustimz May 7, 2026
3f81019
Split flush and writeBytes into separate branches
mgustimz May 7, 2026
d79bdb6
Clean up test per Copilot review
mgustimz May 7, 2026
c73e67a
Add test validating write without onWritePossible is buffered
mgustimz May 7, 2026
e898755
Address Copilot review items from #4251754607
mgustimz May 8, 2026
8c217f5
Replace third test to cover flush-specific behavior
mgustimz May 8, 2026
ccb13ee
Fix flush test per Copilot review
mgustimz May 13, 2026
1a07b84
servlet: fix flush test assertion to account for buffered writeBytes
mgustimz May 25, 2026
5ec98e1
servlet: fix comment and add test for consecutive flush behavior
mgustimz May 25, 2026
7924aae
fix(servlet): address review comments in AsyncServletOutputStreamWriter
mgustimz May 26, 2026
ca8408e
fix(servlet): update log message for clarity and improve test imports
mgustimz May 26, 2026
512680b
fix(servlet): correct unpark placement and format action items in tests
mgustimz May 26, 2026
09c4a02
fix(servlet): reorder checkState calls and improve log message for wr…
mgustimz May 26, 2026
7ffb701
Merge branch 'grpc:master' into fix/12723-servlet-clean
mgustimz May 28, 2026
33b5258
servlet: avoid Tomcat write stall when still ready
mgustimz Jun 5, 2026
8b68f2b
servlet: preserve Undertow direct write flow
mgustimz Jun 5, 2026
835a5de
servlet: improve writer test coverage
mgustimz Jun 5, 2026
dfe0f33
servlet: cover writer race recovery paths
mgustimz Jun 5, 2026
e600550
servlet: fix reflective rethrow in writer test
mgustimz Jun 5, 2026
6c6a2e4
servlet: fix writer test import order
mgustimz Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,15 @@ private void assureReadyAndDrainedTurnsFalse() {
private void runOrBuffer(ActionItem actionItem) throws IOException {
WriteState curState = writeState.get();
if (curState.readyAndDrained) { // write to the outputStream directly
actionItem.run();
try {
actionItem.run();
} catch (IllegalStateException e) {
if (actionItem == flushAction || actionItem == completeAction) {
throw e;
}
buffer(actionItem, curState);
return;
}
if (actionItem == completeAction) {
return;
}
Expand All @@ -230,20 +238,26 @@ private void runOrBuffer(ActionItem actionItem) throws IOException {
log.finest("the servlet output stream becomes not ready");
}
} else { // buffer to the writeChain
writeChain.offer(actionItem);
if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
checkState(
writeState.get().readyAndDrained,
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
ActionItem lastItem = writeChain.poll();
if (lastItem != null) {
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
runOrBuffer(lastItem);
}
} // state has not changed since
buffer(actionItem, curState);
}
}

private void buffer(ActionItem actionItem, WriteState curState) throws IOException {
writeChain.offer(actionItem);
if (writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
LockSupport.unpark(parkingThread);
} else {
checkState(
writeState.get().readyAndDrained,
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
ActionItem lastItem = writeChain.poll();
if (lastItem != null) {
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
runOrBuffer(lastItem);
}
} // state has not changed since
}

/** Write actions, e.g. writeBytes, flush, complete. */
@FunctionalInterface
@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/*
* Copyright 2026 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.servlet;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import io.grpc.servlet.AsyncServletOutputStreamWriter.ActionItem;
import io.grpc.servlet.AsyncServletOutputStreamWriter.Log;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
Comment thread
mgustimz marked this conversation as resolved.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit test for {@link AsyncServletOutputStreamWriter} with a mock isReady supplier. */
@RunWith(JUnit4.class)
public class AsyncServletOutputStreamWriterTest {

@Test
public void writeBytes_notReadyException_buffersUntilOnWritePossible() throws IOException {
List<String> actions = new ArrayList<>();
AtomicBoolean rejectWrites = new AtomicBoolean(true);

BiFunction<byte[], Integer, ActionItem> writeAction =
(bytes, numBytes) -> () -> {
if (rejectWrites.get()) {
throw new IllegalStateException("not ready");
}
actions.add("write");
};
ActionItem flushAction = () -> { };
ActionItem completeAction = () -> { };

AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
writeAction, flushAction, completeAction, () -> true, new Log() {});

writer.onWritePossible();

writer.writeBytes(new byte[]{1}, 1);

assertEquals("Write should be buffered until onWritePossible", 0, actions.size());

rejectWrites.set(false);
writer.onWritePossible();
assertEquals("Buffered write should drain after onWritePossible", 1, actions.size());
}

@Test
public void writeBytes_consecutiveWithIsReadyTrue_allGoDirect() throws IOException {
List<byte[]> writtenData = new ArrayList<>();

BiFunction<byte[], Integer, ActionItem> writeAction =
(bytes, numBytes) -> () -> writtenData.add(Arrays.copyOf(bytes, numBytes));
ActionItem flushAction = () -> { };
ActionItem completeAction = () -> { };

AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
writeAction, flushAction, completeAction, () -> true, new Log() {});

writer.onWritePossible();

for (int i = 0; i < 5; i++) {
writer.writeBytes(new byte[]{(byte) i}, 1);
}

assertEquals("All writes should complete", 5, writtenData.size());
}

@Test
public void writeBytes_isReadyFalseAfterWrite_buffersNextWrite() throws IOException {
List<byte[]> writtenData = new ArrayList<>();
AtomicBoolean isReady = new AtomicBoolean(true);

BiFunction<byte[], Integer, ActionItem> writeAction =
(bytes, numBytes) -> () -> {
writtenData.add(Arrays.copyOf(bytes, numBytes));
isReady.set(false);
};
ActionItem flushAction = () -> { };
ActionItem completeAction = () -> { };

AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
writeAction, flushAction, completeAction, isReady::get, new Log() {});

writer.onWritePossible();

writer.writeBytes(new byte[]{1}, 1);
writer.writeBytes(new byte[]{2}, 1);
assertEquals("Second write should be buffered", 1, writtenData.size());

isReady.set(true);
writer.onWritePossible();
assertEquals("Buffered write should drain", 2, writtenData.size());
}

@Test
public void flush_isReadyFalse_buffersUntilOnWritePossible() throws IOException {
List<String> actions = new ArrayList<>();
AtomicBoolean isReady = new AtomicBoolean(true);

BiFunction<byte[], Integer, ActionItem> writeAction =
(bytes, numBytes) -> () -> actions.add("write");
ActionItem flushAction = () -> {
actions.add("flush");
isReady.set(false);
};
ActionItem completeAction = () -> { };

AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
writeAction, flushAction, completeAction, isReady::get, new Log() {});

writer.onWritePossible();

writer.flush();
assertEquals("First flush should execute directly", 1, actions.size());

writer.flush();
assertEquals("Second flush should be buffered", 1, actions.size());

isReady.set(true);
writer.onWritePossible();
assertEquals("Both flushes should complete after onWritePossible", 2, actions.size());
}

@Test
public void flush_consecutiveWithIsReadyTrue_bothGoDirect() throws IOException {
List<String> actions = new ArrayList<>();

BiFunction<byte[], Integer, ActionItem> writeAction =
(bytes, numBytes) -> () -> actions.add("write");
ActionItem flushAction = () -> actions.add("flush");
ActionItem completeAction = () -> { };

AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
writeAction, flushAction, completeAction, () -> true, new Log() {});

writer.onWritePossible();

writer.flush();
writer.flush();

assertEquals("Both flushes should execute directly", 2, actions.size());
}

@Test
public void complete_readyAndDrained_runsDirectly() throws IOException {
AtomicInteger completeCount = new AtomicInteger();

AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
(bytes, numBytes) -> () -> { },
() -> { },
completeCount::incrementAndGet,
() -> true,
new Log() {});

writer.onWritePossible();

writer.complete();

assertEquals(1, completeCount.get());
}

@Test
public void complete_notReadyAndDrained_buffersUntilOnWritePossible() throws IOException {
AtomicInteger completeCount = new AtomicInteger();

AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
(bytes, numBytes) -> () -> { },
() -> { },
completeCount::incrementAndGet,
() -> true,
new Log() {});

writer.complete();
assertEquals(0, completeCount.get());

writer.onWritePossible();
assertEquals(1, completeCount.get());
}

@Test
public void writeBytes_onWritePossibleWinsRace_drainsBufferedWrite() throws Exception {
List<String> actions = new ArrayList<>();
AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
(bytes, numBytes) -> () -> actions.add("write"),
() -> {
},
() -> {
},
() -> true,
new Log() {
});
replaceWriteChain(writer, new ConcurrentLinkedQueue<ActionItem>() {
@Override
public boolean offer(ActionItem actionItem) {
boolean offered = super.offer(actionItem);
try {
writer.onWritePossible();
} catch (IOException e) {
throw new AssertionError(e);
}
return offered;
}
});

writer.writeBytes(new byte[]{1}, 1);

assertEquals(1, actions.size());
}

@Test
public void writeBytes_readyStateWinsRace_retriesWrite() throws Exception {
List<String> actions = new ArrayList<>();
AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
(bytes, numBytes) -> () -> actions.add("write"),
() -> {
},
() -> {
},
() -> true,
new Log() {
});
replaceWriteChain(writer, new ConcurrentLinkedQueue<ActionItem>() {
@Override
public boolean offer(ActionItem actionItem) {
boolean offered = super.offer(actionItem);
try {
forceReadyAndDrained(writer);
} catch (ReflectiveOperationException e) {
throw new LinkageError(e.getMessage(), e);
}
return offered;
}
});

writer.writeBytes(new byte[]{1}, 1);

assertEquals(1, actions.size());
}

private static void replaceWriteChain(
AsyncServletOutputStreamWriter writer, ConcurrentLinkedQueue<ActionItem> writeChain)
throws ReflectiveOperationException {
Field writeChainField = AsyncServletOutputStreamWriter.class.getDeclaredField("writeChain");
writeChainField.setAccessible(true);
writeChainField.set(writer, writeChain);
}

private static void forceReadyAndDrained(AsyncServletOutputStreamWriter writer)
throws ReflectiveOperationException {
Field writeStateField = AsyncServletOutputStreamWriter.class.getDeclaredField("writeState");
writeStateField.setAccessible(true);
@SuppressWarnings("unchecked")
AtomicReference<Object> writeState =
(AtomicReference<Object>) writeStateField.get(writer);
Object curState = writeState.get();
Method withReadyAndDrained = curState.getClass().getDeclaredMethod(
"withReadyAndDrained", boolean.class);
withReadyAndDrained.setAccessible(true);
writeState.set(withReadyAndDrained.invoke(curState, true));
}

@Test
public void flush_notReadyException_isPropagated() throws IOException {
AsyncServletOutputStreamWriter writer =
new AsyncServletOutputStreamWriter(
(bytes, numBytes) -> () -> { },
() -> {
throw new IllegalStateException("not ready");
},
() -> { },
() -> true,
new Log() {});

writer.onWritePossible();

assertThrows(IllegalStateException.class, writer::flush);
}
}
Loading