diff --git a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java index 67001085b02..7a9a689cfa0 100644 --- a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java +++ b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java @@ -16,13 +16,11 @@ package io.grpc.servlet; -import static com.google.common.base.Preconditions.checkState; import static io.grpc.servlet.ServletServerStream.toHexString; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINEST; import com.google.common.annotations.VisibleForTesting; -import com.google.errorprone.annotations.CheckReturnValue; import io.grpc.InternalLogId; import io.grpc.servlet.ServletServerStream.ServletTransportState; import java.io.IOException; @@ -59,8 +57,18 @@ final class AsyncServletOutputStreamWriter { * *
There are two threads, the container thread (calling {@code onWritePossible()}) and the * application thread (calling {@code runOrBuffer()}) that read and update the - * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and - * only runOrBuffer() may turn it from true to false. + * {@code writeState}. The state machine has three states: + *
Called from application thread. */ private void runOrBuffer(ActionItem actionItem) throws IOException { - WriteState curState = writeState.get(); - if (curState.readyAndDrained) { // write to the outputStream directly - try { - actionItem.run(); - } catch (IllegalStateException e) { - if (actionItem == flushAction || actionItem == completeAction) { - throw e; + ActionItem itemToWrite = actionItem; + while (true) { + WriteState curState = writeState.get(); + if (curState.state == WriteState.READY_AND_DRAINED) { // write to the outputStream directly + WriteState writingState = new WriteState(WriteState.WRITING); + if (writeState.compareAndSet(curState, writingState)) { + boolean successfulExited = false; + try { + while (true) { + if (itemToWrite != null) { + try { + itemToWrite.run(); + } catch (IllegalStateException e) { + if (itemToWrite == flushAction || itemToWrite == completeAction) { + throw e; + } + writeChain.offer(itemToWrite); + writeState.set(new WriteState(WriteState.NOT_READY_OR_NOT_DRAINED)); + LockSupport.unpark(parkingThread); + successfulExited = true; + return; + } + if (itemToWrite == completeAction) { + writeState.set(new WriteState(WriteState.NOT_READY_OR_NOT_DRAINED)); + LockSupport.unpark(parkingThread); + successfulExited = true; + return; + } + } + + if (isReady.getAsBoolean()) { + itemToWrite = writeChain.poll(); + if (itemToWrite != null) { + continue; + } + WriteState latestState = writeState.get(); + if (latestState.state == WriteState.WRITING) { + if (writeState.compareAndSet( + latestState, new WriteState(WriteState.READY_AND_DRAINED))) { + LockSupport.unpark(parkingThread); + successfulExited = true; + return; + } + // CAS failed, loop again to poll and write + } else { + successfulExited = true; + return; + } + } else { + WriteState latestState = writeState.get(); + if (latestState.state == WriteState.WRITING) { + if (writeState.compareAndSet( + latestState, new WriteState(WriteState.NOT_READY_OR_NOT_DRAINED))) { + LockSupport.unpark(parkingThread); + log.finest("the servlet output stream becomes not ready"); + successfulExited = true; + return; + } + // CAS failed, loop again with itemToWrite = null + itemToWrite = null; + } else { + successfulExited = true; + return; + } + } + } + } finally { + if (!successfulExited) { + writeState.set(new WriteState(WriteState.NOT_READY_OR_NOT_DRAINED)); + LockSupport.unpark(parkingThread); + } + } + } + } else { // NOT_READY_OR_NOT_DRAINED or WRITING + writeChain.offer(itemToWrite); + if (writeState.compareAndSet(curState, new WriteState(curState.state))) { + return; + } + itemToWrite = writeChain.poll(); + if (itemToWrite == null) { + return; } - buffer(actionItem, curState); - return; - } - if (actionItem == completeAction) { - return; - } - if (!isReady.getAsBoolean()) { - boolean successful = - writeState.compareAndSet(curState, curState.withReadyAndDrained(false)); - LockSupport.unpark(parkingThread); - checkState(successful, "Bug: curState is unexpectedly changed by another thread"); - log.finest("the servlet output stream becomes not ready"); } - } else { // buffer to the writeChain - buffer(actionItem, curState); } } - private void buffer(ActionItem actionItem, WriteState curState) throws IOException { - writeChain.offer(actionItem); - if (writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) { - LockSupport.unpark(parkingThread); - } else { - checkState( - writeState.get().readyAndDrained, - "Bug: onWritePossible() should have changed readyAndDrained to true, but not"); - ActionItem lastItem = writeChain.poll(); - if (lastItem != null) { - checkState(lastItem == actionItem, "Bug: lastItem != actionItem"); - runOrBuffer(lastItem); - } - } // state has not changed since - } - /** Write actions, e.g. writeBytes, flush, complete. */ @FunctionalInterface @VisibleForTesting @@ -277,34 +346,16 @@ default void finest(String str, Object...params) {} } private static final class WriteState { + static final int READY_AND_DRAINED = 0; + static final int NOT_READY_OR_NOT_DRAINED = 1; + static final int WRITING = 2; // Active direct write in progress - static final WriteState DEFAULT = new WriteState(false); + static final WriteState DEFAULT = new WriteState(NOT_READY_OR_NOT_DRAINED); - /** - * The servlet output stream is ready and the writeChain is empty. - * - *
readyAndDrained turns from false to true when: - * {@code onWritePossible()} exits while currently there is no more data to write, but the last - * check of {@link javax.servlet.ServletOutputStream#isReady()} is true. - * - *
readyAndDrained turns from true to false when: - * {@code runOrBuffer()} exits while either the action item is written directly to the - * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()} - * right after that returns false, or the action item is buffered into the writeChain. - */ - final boolean readyAndDrained; - - WriteState(boolean readyAndDrained) { - this.readyAndDrained = readyAndDrained; - } + final int state; - /** - * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code - * runOrBuffer()} can set it to false. - */ - @CheckReturnValue - WriteState withReadyAndDrained(boolean readyAndDrained) { - return new WriteState(readyAndDrained); + WriteState(int state) { + this.state = state; } } } diff --git a/servlet/src/test/java/io/grpc/servlet/AsyncServletOutputStreamWriterTest.java b/servlet/src/test/java/io/grpc/servlet/AsyncServletOutputStreamWriterTest.java index e7474078ec9..d3d02568dd1 100644 --- a/servlet/src/test/java/io/grpc/servlet/AsyncServletOutputStreamWriterTest.java +++ b/servlet/src/test/java/io/grpc/servlet/AsyncServletOutputStreamWriterTest.java @@ -23,7 +23,6 @@ import io.grpc.servlet.AsyncServletOutputStreamWriter.Log; import java.io.IOException; import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -287,10 +286,11 @@ private static void forceReadyAndDrained(AsyncServletOutputStreamWriter writer) AtomicReference