Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .autover/changes/f6d458e7-d276-40d2-b1ab-d0c4327e396f.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.Core",
"Type": "Patch",
"ChangelogMessages": [
"Fix type load issue due to Lambda Response Streaming. https://github.com/aws/aws-lambda-dotnet/issues/2430"
]
},
{
"Name": "Amazon.Lambda.RuntimeSupport",
"Type": "Patch",
"ChangelogMessages": [
"Fix type load issue due to Lambda Response Streaming. https://github.com/aws/aws-lambda-dotnet/issues/2430"
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.Core.ResponseStreaming
{
internal class ImplLambdaResponseStream : ILambdaResponseStream
{
private readonly Delegates _innerDelegates;

internal ImplLambdaResponseStream(Delegates innerDelegates)
{
_innerDelegates = innerDelegates ?? throw new ArgumentNullException(nameof(innerDelegates));
_innerDelegates.WriteAsync = innerDelegates.WriteAsync ?? throw new ArgumentNullException(nameof(innerDelegates.WriteAsync));
_innerDelegates.BytesWritten = innerDelegates.BytesWritten ?? throw new ArgumentNullException(nameof(innerDelegates.BytesWritten));
_innerDelegates.HasError = innerDelegates.HasError ?? throw new ArgumentNullException(nameof(innerDelegates.HasError));
_innerDelegates.Dispose = innerDelegates.Dispose ?? throw new ArgumentNullException(nameof(innerDelegates.Dispose));
}

/// <inheritdoc/>
public long BytesWritten => _innerDelegates.BytesWritten();

/// <inheritdoc/>
public bool HasError => _innerDelegates.HasError();

/// <inheritdoc/>
public void Dispose() => _innerDelegates.Dispose();

/// <inheritdoc/>
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) => _innerDelegates.WriteAsync(buffer, offset, count, cancellationToken);

internal class Delegates
{
internal Func<byte[], int, int, CancellationToken, Task> WriteAsync { get; set; }
internal Func<long> BytesWritten { get; set; }
internal Func<bool> HasError { get; set; }
internal Action Dispose { get; set; }
}
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core.ResponseStreaming;
using Amazon.Lambda.RuntimeSupport.Client.ResponseStreaming;
#pragma warning disable CA2252
Expand All @@ -26,34 +24,22 @@ internal class ResponseStreamLambdaCoreInitializerIsolated
internal static void InitializeCore()
{
#if !ANALYZER_UNIT_TESTS // This precompiler directive is used to avoid the unit tests from needing a dependency on Amazon.Lambda.Core.
Func<byte[], ILambdaResponseStream> factory = (byte[] prelude) => new ImplLambdaResponseStream(ResponseStreamFactory.CreateStream(prelude));
LambdaResponseStreamFactory.SetLambdaResponseStream(factory);
#endif
}

/// <summary>
/// Implements the <see cref="ILambdaResponseStream"/> interface by wrapping a <see cref="ResponseStream"/>. This is used to connect the internal response streaming implementation to the public interfaces in Amazon.Lambda.Core.
/// </summary>
internal class ImplLambdaResponseStream : ILambdaResponseStream
{
private readonly ResponseStream _innerStream;

internal ImplLambdaResponseStream(ResponseStream innerStream)
Func<byte[], ILambdaResponseStream> factory = (byte[] prelude) =>
{
_innerStream = innerStream;
}

/// <inheritdoc/>
public long BytesWritten => _innerStream.BytesWritten;
var responseStream = ResponseStreamFactory.CreateStream(prelude);
var delegates = new ImplLambdaResponseStream.Delegates
{
WriteAsync = responseStream.WriteAsync,
BytesWritten = () => responseStream.BytesWritten,
HasError = () => responseStream.HasError,
Dispose = () => responseStream.Dispose()
};

/// <inheritdoc/>
public bool HasError => _innerStream.HasError;
return new ImplLambdaResponseStream(delegates);
};

/// <inheritdoc/>
public void Dispose() => _innerStream.Dispose();

/// <inheritdoc/>
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) => _innerStream.WriteAsync(buffer, offset, count, cancellationToken);
LambdaResponseStreamFactory.SetLambdaResponseStream(factory);
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2397,7 +2397,6 @@ private static DiagnosticResult[] GetExpectedRuntimeSupportDiagnostics()
// These are here because the internalvisibleto attribute isn't included in test compilations, so these types are inaccessible.
DiagnosticResult.CompilerError("CS0117").WithSpan(snapFile, 13, 34, 13, 71).WithArguments("Amazon.Lambda.Core.SnapshotRestore", "CopyBeforeSnapshotCallbacksToRegistry"),
DiagnosticResult.CompilerError("CS0117").WithSpan(snapFile, 14, 34, 14, 69).WithArguments("Amazon.Lambda.Core.SnapshotRestore", "CopyAfterRestoreCallbacksToRegistry"),
DiagnosticResult.CompilerError("CS0122").WithSpan($"Amazon.Lambda.RuntimeSupport{Path.DirectorySeparatorChar}Bootstrap{Path.DirectorySeparatorChar}ResponseStreaming{Path.DirectorySeparatorChar}ResponseStreamLambdaCoreInitializerIsolated.cs", 37, 51, 37, 72).WithArguments("Amazon.Lambda.Core.ResponseStreaming.ILambdaResponseStream"),
DiagnosticResult.CompilerError("CS0117").WithSpan($"Amazon.Lambda.RuntimeSupport{Path.DirectorySeparatorChar}Helpers{Path.DirectorySeparatorChar}Logging{Path.DirectorySeparatorChar}ConfigureJsonLogMessageFormatterIsolated.cs", 13, 45, 13, 80).WithArguments("Amazon.Lambda.Core.LambdaLogger", "SetConfigureStructuredLoggingAction"),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
using System;
using System.Diagnostics;
using System.IO;
using Amazon.Lambda.Core.ResponseStreaming;
using Amazon.Lambda.RuntimeSupport.Client.ResponseStreaming;
using Xunit;

namespace Amazon.Lambda.RuntimeSupport.UnitTests
Expand Down Expand Up @@ -88,5 +90,18 @@ public static string GetAllMessages(Exception e)
return writer.ToString();
}
}

internal static ImplLambdaResponseStream ConvertToImplLambdaResponseStream(ResponseStream responseStream)
{
var implStream = new ImplLambdaResponseStream(new ImplLambdaResponseStream.Delegates
{
BytesWritten = () => responseStream.BytesWritten,
HasError = () => responseStream.HasError,
Dispose = () => responseStream.Dispose(),
WriteAsync = responseStream.WriteAsync
});

return implStream;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ public async Task ThreadPoolStarvation_BlockingHandlers_AllInvocationsDequeued()

var testRuntimeApiClient = new TestMultiConcurrencyRuntimeApiClient(environmentVariables, invocationEvents);

// Use a thread-safe counter to track dequeued invocations
int dequeuedCount = 0;
var allDequeuedEvent = new ManualResetEventSlim(false);

// Wrap the test client to track dequeue operations in a thread-safe manner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public class LambdaResponseStreamTests
var output = new MemoryStream();
await inner.SetHttpOutputStreamAsync(output);

var implStream = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var implStream = Common.ConvertToImplLambdaResponseStream(inner);
var lambdaStream = new LambdaResponseStream(implStream);
return (lambdaStream, output);
}
Expand All @@ -176,7 +176,7 @@ public class LambdaResponseStreamTests
public void LambdaResponseStream_IsStreamSubclass()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.IsAssignableFrom<Stream>(stream);
Expand All @@ -186,7 +186,7 @@ public void LambdaResponseStream_IsStreamSubclass()
public void CanWrite_IsTrue()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.True(stream.CanWrite);
Expand All @@ -196,7 +196,7 @@ public void CanWrite_IsTrue()
public void CanRead_IsFalse()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.False(stream.CanRead);
Expand All @@ -206,7 +206,7 @@ public void CanRead_IsFalse()
public void CanSeek_IsFalse()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.False(stream.CanSeek);
Expand All @@ -216,7 +216,7 @@ public void CanSeek_IsFalse()
public void Read_ThrowsNotImplementedException()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.Throws<NotImplementedException>(() => stream.Read(new byte[1], 0, 1));
Expand All @@ -226,7 +226,7 @@ public void Read_ThrowsNotImplementedException()
public void ReadAsync_ThrowsNotImplementedException()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

// ReadAsync throws synchronously (not async) — capture the thrown task
Expand All @@ -239,7 +239,7 @@ public void ReadAsync_ThrowsNotImplementedException()
public void Seek_ThrowsNotImplementedException()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.Throws<NotImplementedException>(() => stream.Seek(0, SeekOrigin.Begin));
Expand All @@ -249,7 +249,7 @@ public void Seek_ThrowsNotImplementedException()
public void Position_Get_ThrowsNotSupportedException()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.Throws<NotSupportedException>(() => _ = stream.Position);
Expand All @@ -259,7 +259,7 @@ public void Position_Get_ThrowsNotSupportedException()
public void Position_Set_ThrowsNotSupportedException()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.Throws<NotSupportedException>(() => stream.Position = 0);
Expand All @@ -269,7 +269,7 @@ public void Position_Set_ThrowsNotSupportedException()
public void SetLength_ThrowsNotSupportedException()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var stream = new LambdaResponseStream(impl);

Assert.Throws<NotSupportedException>(() => stream.SetLength(100));
Expand Down Expand Up @@ -342,7 +342,7 @@ public async Task WriteAsync_DelegatesToInnerResponseStream()
var output = new MemoryStream();
await inner.SetHttpOutputStreamAsync(output);

var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
var data = new byte[] { 1, 2, 3 };

await impl.WriteAsync(data, 0, data.Length);
Expand All @@ -357,7 +357,7 @@ public async Task BytesWritten_ReflectsInnerStreamBytesWritten()
var output = new MemoryStream();
await inner.SetHttpOutputStreamAsync(output);

var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);
await impl.WriteAsync(new byte[7], 0, 7);

Assert.Equal(7, impl.BytesWritten);
Expand All @@ -367,7 +367,7 @@ public async Task BytesWritten_ReflectsInnerStreamBytesWritten()
public void HasError_InitiallyFalse()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);

Assert.False(impl.HasError);
}
Expand All @@ -378,7 +378,7 @@ public void HasError_TrueAfterReportError()
var inner = new ResponseStream(Array.Empty<byte>());
inner.ReportError(new Exception("test"));

var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);

Assert.True(impl.HasError);
}
Expand All @@ -387,7 +387,7 @@ public void HasError_TrueAfterReportError()
public void Dispose_DisposesInnerStream()
{
var inner = new ResponseStream(Array.Empty<byte>());
var impl = new ResponseStreamLambdaCoreInitializerIsolated.ImplLambdaResponseStream(inner);
var impl = Common.ConvertToImplLambdaResponseStream(inner);

// Should not throw
impl.Dispose();
Expand Down
Loading