Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/internal/quic/quic.js
Original file line number Diff line number Diff line change
Expand Up @@ -2543,8 +2543,8 @@ class QuicStream {
inner.ontrailers = undefined;
inner.oninfo = undefined;
inner.onwanttrailers = undefined;
inner.headers = undefined;
inner.pendingTrailers = undefined;
// Do not reset headers here, this is still important information
// the same applies for pendingTrailers
this.#handle = undefined;
if (inner.fileHandle !== undefined) {
// Close the FileHandle that was used as a body source. The close
Expand Down
43 changes: 23 additions & 20 deletions src/quic/http3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -921,24 +921,25 @@ class Http3ApplicationImpl final : public Session::Application {
stream->ReceiveData(nullptr, 0, flags);
}

void OnStopSending(stream_id id, error_code app_error_code) {
void OnSendStopSending(stream_id id, error_code app_error_code) {
auto stream = session().FindStream(id);
if (!stream) [[unlikely]]
return;
Debug(&session(),
"HTTP/3 application received stop sending for stream %" PRIi64,
"HTTP/3 application should send stop sending for stream %" PRIi64,
id);
stream->ReceiveStopSending(QuicError::ForApplication(app_error_code));
stream->SendStopSending(app_error_code);
}

void OnResetStream(stream_id id, error_code app_error_code) {
void OnDoResetStream(stream_id id, error_code app_error_code) {
auto stream = session().FindStream(id);
if (!stream) [[unlikely]]
return;
Debug(&session(),
"HTTP/3 application received reset stream for stream %" PRIi64,
"HTTP/3 application received a request to reset stream for stream "
"%" PRIi64,
id);
stream->ReceiveStreamReset(0, QuicError::ForApplication(app_error_code));
stream->DoStreamReset(app_error_code);
}

void OnShutdown(stream_id id) {
Expand Down Expand Up @@ -1318,29 +1319,31 @@ class Http3ApplicationImpl final : public Session::Application {
return NGTCP2_SUCCESS;
}

static int on_stop_sending(nghttp3_conn* conn,
stream_id id,
error_code app_error_code,
void* conn_user_data,
void* stream_user_data) {
static int on_send_stop_sending(nghttp3_conn* conn,
stream_id id,
error_code app_error_code,
void* conn_user_data,
void* stream_user_data) {
// this callback asks the app side to send a stop sending
NGHTTP3_CALLBACK_SCOPE(app);
if (app.is_control_stream(id)) [[unlikely]] {
return NGHTTP3_ERR_CALLBACK_FAILURE;
}
app.OnStopSending(id, app_error_code);
app.OnSendStopSending(id, app_error_code);
return NGTCP2_SUCCESS;
}

static int on_reset_stream(nghttp3_conn* conn,
stream_id id,
error_code app_error_code,
void* conn_user_data,
void* stream_user_data) {
static int on_do_reset_stream(nghttp3_conn* conn,
stream_id id,
error_code app_error_code,
void* conn_user_data,
void* stream_user_data) {
// this callback ask the app side to do a reset stream
NGHTTP3_CALLBACK_SCOPE(app);
if (app.is_control_stream(id)) [[unlikely]] {
return NGHTTP3_ERR_CALLBACK_FAILURE;
}
app.OnResetStream(id, app_error_code);
app.OnDoResetStream(id, app_error_code);
return NGTCP2_SUCCESS;
}

Expand Down Expand Up @@ -1394,9 +1397,9 @@ class Http3ApplicationImpl final : public Session::Application {
on_begin_trailers,
on_receive_trailer,
on_end_trailers,
on_stop_sending,
on_send_stop_sending,
on_end_stream,
on_reset_stream,
on_do_reset_stream,
on_shutdown,
nullptr, // recv_settings (deprecated)
on_receive_origin,
Expand Down
3 changes: 2 additions & 1 deletion src/quic/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2797,7 +2797,8 @@ void Session::RemoveStream(stream_id id) {
// then we can proceed to finishing the close now. Note that the
// expectation is that the session will be destroyed once FinishClose
// returns.
if (impl_->state()->closing && impl_->state()->graceful_close) {
if (impl_->state()->closing && impl_->state()->graceful_close &&
impl_->streams_.size() == 0) {
FinishClose();
CHECK(is_destroyed());
}
Expand Down
56 changes: 32 additions & 24 deletions src/quic/streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,15 +487,7 @@ struct Stream::Impl {
code = args[0].As<BigInt>()->Uint64Value(&unused);
}

stream->EndReadable();

if (!stream->is_pending()) {
// If the stream is a local unidirectional there's nothing to do here.
if (stream->is_local_unidirectional()) return;
stream->NotifyReadableEnded(code);
} else {
stream->pending_close_read_code_ = code;
}
stream->SendStopSending(code);
}

// Sends a reset stream to the peer to tell it we will not be sending any
Expand All @@ -512,21 +504,7 @@ struct Stream::Impl {
code = args[0].As<BigInt>()->Uint64Value(&lossless);
}

if (stream->state()->reset == 1) return;

stream->EndWritable();
// We can release our outbound here now. Since the stream is being reset
// on the ngtcp2 side, we do not need to keep any of the data around
// waiting for acknowledgement that will never come.
stream->outbound_.reset();
stream->state()->reset = 1;

if (!stream->is_pending()) {
if (stream->is_remote_unidirectional()) return;
stream->NotifyWritableEnded(code);
} else {
stream->pending_close_write_code_ = code;
}
stream->DoStreamReset(code);
}

JS_METHOD(SetPriority) {
Expand Down Expand Up @@ -1823,6 +1801,36 @@ void Stream::ReceiveStreamReset(uint64_t final_size, QuicError error) {
EmitReset(error);
}

void Stream::DoStreamReset(error_code code) {
if (state()->reset == 1) return;

EndWritable();
// We can release our outbound here now. Since the stream is being reset
// on the ngtcp2 side, we do not need to keep any of the data around
// waiting for acknowledgement that will never come.
outbound_.reset();
state()->reset = 1;

if (!is_pending()) {
if (is_remote_unidirectional()) return;
NotifyWritableEnded(code);
} else {
pending_close_write_code_ = code;
}
}

void Stream::SendStopSending(error_code code) {
EndReadable();

if (!is_pending()) {
// If the stream is a local unidirectional there's nothing to do here.
if (is_local_unidirectional()) return;
NotifyReadableEnded(code);
} else {
pending_close_read_code_ = code;
}
}

// ============================================================================

void Stream::EmitBlocked() {
Expand Down
11 changes: 11 additions & 0 deletions src/quic/streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,17 @@ class Stream final : public AsyncWrap,
void ReceiveStopSending(QuicError error);
void ReceiveStreamReset(uint64_t final_size, QuicError error);

// Sends a reset stream to the peer to tell it we will not be sending any
// more data for this stream. This has the effect of shutting down the
// writable side of the stream for this peer. Any data that is held in the
// outbound queue will be dropped. The stream may still be readable.
void DoStreamReset(error_code code);

// Tells the peer to stop sending data for this stream. This has the effect
// of shutting down the readable side of the stream for this peer. Any data
// that has already been received is still readable.
void SendStopSending(error_code code);

// Currently, only HTTP/3 streams support headers. These methods are here
// to support that. They are not used when using any other QUIC application.

Expand Down
Loading