WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 3704d9a

Browse files
committed
Handle trailer headers and unsigned aws chunked payloads
1 parent 2ab4bb4 commit 3704d9a

File tree

10 files changed

+691
-101
lines changed

10 files changed

+691
-101
lines changed

trino-aws-proxy-spi/src/main/java/io/trino/aws/proxy/spi/rest/RequestContent.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
*/
1414
package io.trino.aws.proxy.spi.rest;
1515

16+
import com.google.common.collect.ImmutableList;
17+
1618
import java.io.InputStream;
19+
import java.util.List;
1720
import java.util.Optional;
1821

1922
@FunctionalInterface
@@ -27,7 +30,9 @@ enum ContentType
2730
STANDARD,
2831
W3C_CHUNKED,
2932
AWS_CHUNKED,
33+
AWS_CHUNKED_UNSIGNED,
3034
AWS_CHUNKED_IN_W3C_CHUNKED,
35+
AWS_CHUNKED_IN_W3C_CHUNKED_UNSIGNED,
3136
}
3237

3338
default ContentType contentType()
@@ -48,5 +53,10 @@ default Optional<Integer> contentLength()
4853
return Optional.empty();
4954
}
5055

56+
default List<String> trailerHeaders()
57+
{
58+
return ImmutableList.of();
59+
}
60+
5161
Optional<InputStream> inputStream();
5262
}

trino-aws-proxy/src/main/java/io/trino/aws/proxy/server/rest/AwsChunkedInputStream.java

Lines changed: 77 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
package io.trino.aws.proxy.server.rest;
1515

1616
import com.google.common.base.Splitter;
17+
import com.google.common.collect.ImmutableList;
1718
import io.trino.aws.proxy.spi.signing.ChunkSigningSession;
1819
import jakarta.ws.rs.WebApplicationException;
1920

2021
import java.io.IOException;
2122
import java.io.InputStream;
23+
import java.nio.charset.StandardCharsets;
2224
import java.util.List;
2325
import java.util.Map;
2426
import java.util.Optional;
@@ -30,7 +32,7 @@ class AwsChunkedInputStream
3032
extends InputStream
3133
{
3234
private final InputStream delegate;
33-
private final ChunkSigningSession chunkSigningSession;
35+
private final Optional<ChunkSigningSession> chunkSigningSession;
3436

3537
private enum State
3638
{
@@ -44,12 +46,14 @@ private enum State
4446
private int bytesRemainingInChunk;
4547
private int bytesAccountedFor;
4648
private final int decodedContentLength;
49+
private final List<String> trailerHeaders;
4750

48-
AwsChunkedInputStream(InputStream delegate, ChunkSigningSession chunkSigningSession, int decodedContentLength)
51+
AwsChunkedInputStream(InputStream delegate, Optional<ChunkSigningSession> chunkSigningSession, int decodedContentLength, List<String> trailerHeaders)
4952
{
5053
this.delegate = requireNonNull(delegate, "delegate is null");
5154
this.chunkSigningSession = requireNonNull(chunkSigningSession, "chunkSigningSession is null");
5255
this.decodedContentLength = decodedContentLength;
56+
this.trailerHeaders = requireNonNull(ImmutableList.copyOf(trailerHeaders), "trailerHeaders is null");
5357
}
5458

5559
@Override
@@ -65,7 +69,7 @@ public int read()
6569
throw new WebApplicationException("Unexpected end of stream", BAD_REQUEST);
6670
}
6771

68-
chunkSigningSession.write((byte) (i & 0xff));
72+
chunkSigningSession.ifPresent(chunkSigningSession -> chunkSigningSession.write((byte) (i & 0xff)));
6973
updateBytesRemaining(1);
7074

7175
return i;
@@ -86,7 +90,7 @@ public int read(byte[] b, int off, int len)
8690
throw new WebApplicationException("Unexpected end of stream", BAD_REQUEST);
8791
}
8892

89-
chunkSigningSession.write(b, off, count);
93+
chunkSigningSession.ifPresent(chunkSigningSession -> chunkSigningSession.write(b, off, count));
9094
updateBytesRemaining(count);
9195

9296
return count;
@@ -155,9 +159,6 @@ private void nextChunk()
155159
boolean success = false;
156160
do {
157161
List<String> parts = Splitter.on(';').trimResults().limit(2).splitToList(header);
158-
if (parts.size() != 2) {
159-
break;
160-
}
161162

162163
int chunkSize;
163164
try {
@@ -170,23 +171,41 @@ private void nextChunk()
170171
break;
171172
}
172173

173-
Optional<String> chunkSignature = Splitter.on(';').trimResults().withKeyValueSeparator('=').split(parts.get(1))
174-
.entrySet()
175-
.stream()
176-
.filter(entry -> entry.getKey().equalsIgnoreCase("chunk-signature"))
177-
.map(Map.Entry::getValue)
178-
.findFirst();
174+
if (chunkSigningSession.isPresent()) {
175+
if (parts.size() != 2) {
176+
break;
177+
}
178+
179+
Optional<String> chunkSignature = Splitter.on(';').trimResults().withKeyValueSeparator('=').split(parts.get(1))
180+
.entrySet()
181+
.stream()
182+
.filter(entry -> entry.getKey().equalsIgnoreCase("chunk-signature"))
183+
.map(Map.Entry::getValue)
184+
.findFirst();
179185

180-
if (chunkSignature.isEmpty()) {
181-
break;
186+
if (chunkSignature.isEmpty()) {
187+
break;
188+
}
189+
190+
chunkSigningSession.get().startChunk(chunkSignature.get());
191+
}
192+
else {
193+
if (parts.size() != 1) {
194+
break;
195+
}
182196
}
183197

184-
chunkSigningSession.startChunk(chunkSignature.get());
185198
bytesRemainingInChunk = chunkSize;
186199

187200
if (chunkSize == 0) {
188-
readEmptyLine();
189-
chunkSigningSession.complete();
201+
if (trailerHeaders.isEmpty()) {
202+
readEmptyLine();
203+
chunkSigningSession.ifPresent(ChunkSigningSession::complete);
204+
}
205+
else {
206+
readTrailingHeaders();
207+
readEmptyLine();
208+
}
190209
state = State.LAST_CHUNK;
191210
}
192211
bytesAccountedFor += chunkSize;
@@ -236,4 +255,44 @@ private String readLine()
236255

237256
return line.toString();
238257
}
258+
259+
private TrailerHeaderChunk readTrailingHeadersChunk()
260+
throws IOException
261+
{
262+
Optional<String> signature = Optional.empty();
263+
StringBuilder trailerHeadersChunkBuilder = new StringBuilder();
264+
for (int i = 0; i < this.trailerHeaders.size(); i++) {
265+
String trailerHeaders = readLine();
266+
List<String> trailerHeadersValues = Splitter.on(":").trimResults().limit(2).splitToList(trailerHeaders);
267+
String trailerHeaderName = trailerHeadersValues.getFirst();
268+
if ((trailerHeadersValues.size() != 2) || !this.trailerHeaders.contains(trailerHeaderName)) {
269+
throw new WebApplicationException("Trailer header is invalid: " + trailerHeaders, BAD_REQUEST);
270+
}
271+
if (trailerHeaderName.equals("x-amz-trailer-signature")) {
272+
signature = Optional.of(trailerHeadersValues.getLast());
273+
break;
274+
}
275+
else {
276+
trailerHeadersChunkBuilder.append(trailerHeaders);
277+
}
278+
}
279+
return new TrailerHeaderChunk(trailerHeadersChunkBuilder.toString(), signature);
280+
}
281+
282+
private void readTrailingHeaders()
283+
throws IOException
284+
{
285+
TrailerHeaderChunk trailerHeaderChunk = readTrailingHeadersChunk();
286+
chunkSigningSession.ifPresent(chunkSigningSession -> {
287+
if (trailerHeaderChunk.signature.isEmpty()) {
288+
throw new WebApplicationException("Expected x-amz-trailer-signature, none found", BAD_REQUEST);
289+
}
290+
chunkSigningSession.startChunk(trailerHeaderChunk.signature.get());
291+
byte[] trailerHeaderContent = trailerHeaderChunk.trailerHeaders.getBytes(StandardCharsets.UTF_8);
292+
chunkSigningSession.write(trailerHeaderContent, 0, trailerHeaderContent.length);
293+
chunkSigningSession.complete();
294+
});
295+
}
296+
297+
private record TrailerHeaderChunk(String trailerHeaders, Optional<String> signature) {}
239298
}

trino-aws-proxy/src/main/java/io/trino/aws/proxy/server/rest/RequestBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.base.Splitter;
1717
import com.google.common.base.Suppliers;
18+
import com.google.common.collect.ImmutableList;
1819
import io.airlift.log.Logger;
1920
import io.trino.aws.proxy.server.rest.RequestHeadersBuilder.InternalRequestHeaders;
2021
import io.trino.aws.proxy.server.signing.SigningQueryParameters;
@@ -152,7 +153,7 @@ private static RequestContent buildRequestContent(InputStream requestEntityStrea
152153

153154
// AWS does not mandate x-amz-decoded-content length is required for chunked transfer encoding
154155
// But we require it for simplicity (Content-Length is needed since we don't do chunking on outbound requests)
155-
case AWS_CHUNKED, W3C_CHUNKED, AWS_CHUNKED_IN_W3C_CHUNKED -> () -> {
156+
case AWS_CHUNKED, AWS_CHUNKED_UNSIGNED, W3C_CHUNKED, AWS_CHUNKED_IN_W3C_CHUNKED, AWS_CHUNKED_IN_W3C_CHUNKED_UNSIGNED -> () -> {
156157
int contentLength = requestHeaders.decodedContentLength()
157158
.orElseThrow(() -> new WebApplicationException(BAD_REQUEST));
158159
return Optional.of(contentLength);
@@ -188,6 +189,12 @@ public Optional<InputStream> inputStream()
188189
.map(bytes -> (InputStream) new ByteArrayInputStream(bytes))
189190
.or(() -> Optional.of(requestEntityStream));
190191
}
192+
193+
@Override
194+
public List<String> trailerHeaders()
195+
{
196+
return ImmutableList.copyOf(requestHeaders.requestHeaders().unmodifiedHeaders().get("x-amz-trailer"));
197+
}
191198
};
192199
}
193200
}

trino-aws-proxy/src/main/java/io/trino/aws/proxy/server/rest/RequestHeadersBuilder.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ private RequestHeadersBuilder() {}
5050
"connection",
5151
"amz-sdk-invocation-id",
5252
"amx-sdk-request",
53-
"host");
53+
"host",
54+
"x-amz-trailer");
5455

5556
record InternalRequestHeaders(
5657
RequestHeaders requestHeaders,
@@ -75,7 +76,9 @@ static InternalRequestHeaders parseHeaders(MultiMap allRequestHeaders)
7576
Builder builder = new Builder();
7677
allRequestHeaders.forEach((headerName, headerValues) -> {
7778
switch (headerName) {
78-
case "authorization", "x-amz-security-token" -> {} // these get handled separately
79+
case "authorization", "x-amz-security-token" -> {
80+
// these get handled separately
81+
}
7982
case "content-length" -> builder.contentLength(headerValues);
8083
case "x-amz-decoded-content-length" -> builder.decodedContentLength(headerValues);
8184
case "content-encoding" -> builder.contentEncoding(headerValues);
@@ -193,13 +196,22 @@ private void addPassthroughHeader(String headerName, List<String> headerValues)
193196
passthroughHeadersBuilder.addAll(headerName, headerValues);
194197
}
195198

199+
private String requiredContentSha256()
200+
{
201+
return contentSha256.orElseThrow(() -> new WebApplicationException(BAD_REQUEST));
202+
}
203+
196204
private void assertContentTypeValid(ContentType actualContentType)
197205
{
198-
if (actualContentType == ContentType.AWS_CHUNKED || actualContentType == ContentType.AWS_CHUNKED_IN_W3C_CHUNKED || actualContentType == ContentType.W3C_CHUNKED) {
206+
if (actualContentType == ContentType.AWS_CHUNKED
207+
|| actualContentType == ContentType.AWS_CHUNKED_IN_W3C_CHUNKED
208+
|| actualContentType == ContentType.W3C_CHUNKED
209+
|| actualContentType == ContentType.AWS_CHUNKED_UNSIGNED
210+
|| actualContentType == ContentType.AWS_CHUNKED_IN_W3C_CHUNKED_UNSIGNED) {
199211
if (decodedContentLength.isEmpty()) {
200212
throw new WebApplicationException(LENGTH_REQUIRED);
201213
}
202-
String sha256 = contentSha256.orElseThrow(() -> new WebApplicationException(BAD_REQUEST));
214+
String sha256 = requiredContentSha256();
203215
if (actualContentType != ContentType.W3C_CHUNKED && !sha256.startsWith("STREAMING-")) {
204216
throw new WebApplicationException(BAD_REQUEST);
205217
}
@@ -209,11 +221,21 @@ private void assertContentTypeValid(ContentType actualContentType)
209221
private InternalRequestHeaders build(MultiMap allHeaders)
210222
{
211223
Optional<ContentType> applicableContentType = switch (seenRequestPayloadContentTypes.size()) {
212-
case 0, 1 -> seenRequestPayloadContentTypes.stream().findFirst();
224+
case 0 -> Optional.empty();
225+
case 1 -> {
226+
Optional<ContentType> contentType = seenRequestPayloadContentTypes.stream().findFirst();
227+
if (contentType.get().equals(ContentType.AWS_CHUNKED) && requiredContentSha256().startsWith("STREAMING-UNSIGNED-PAYLOAD")) {
228+
yield Optional.of(ContentType.AWS_CHUNKED_UNSIGNED);
229+
}
230+
yield contentType;
231+
}
213232
case 2 -> {
214233
if (!seenRequestPayloadContentTypes.containsAll(ImmutableSet.of(ContentType.AWS_CHUNKED, ContentType.W3C_CHUNKED))) {
215234
throw new WebApplicationException(BAD_REQUEST);
216235
}
236+
if (requiredContentSha256().startsWith("STREAMING-UNSIGNED-PAYLOAD")) {
237+
yield Optional.of(ContentType.AWS_CHUNKED_IN_W3C_CHUNKED_UNSIGNED);
238+
}
217239
yield Optional.of(ContentType.AWS_CHUNKED_IN_W3C_CHUNKED);
218240
}
219241
default -> throw new WebApplicationException(BAD_REQUEST);

trino-aws-proxy/src/main/java/io/trino/aws/proxy/server/rest/TrinoS3ProxyClient.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,16 @@ private Optional<InputStream> contentInputStream(RequestContent requestContent,
212212
{
213213
return switch (requestContent.contentType()) {
214214
case AWS_CHUNKED, AWS_CHUNKED_IN_W3C_CHUNKED -> requestContent.inputStream()
215-
.map(inputStream -> new AwsChunkedInputStream(limitStreamController.wrap(inputStream), signingMetadata.requiredSigningContext().chunkSigningSession(), requestContent.contentLength().orElseThrow()));
216-
215+
.map(inputStream -> new AwsChunkedInputStream(limitStreamController.wrap(inputStream),
216+
Optional.of(signingMetadata.requiredSigningContext().chunkSigningSession()),
217+
requestContent.contentLength().orElseThrow(),
218+
requestContent.trailerHeaders()));
219+
case AWS_CHUNKED_UNSIGNED, AWS_CHUNKED_IN_W3C_CHUNKED_UNSIGNED -> requestContent.inputStream()
220+
.map(inputStream -> new AwsChunkedInputStream(
221+
limitStreamController.wrap(inputStream),
222+
Optional.empty(),
223+
requestContent.contentLength().orElseThrow(),
224+
requestContent.trailerHeaders()));
217225
case STANDARD, W3C_CHUNKED -> requestContent.inputStream().map(inputStream -> {
218226
SigningContext signingContext = signingMetadata.requiredSigningContext();
219227
return signingContext.contentHash()

trino-aws-proxy/src/test/java/io/trino/aws/proxy/server/TestGenericRestRequests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ private StatusResponse doAwsChunkedUpload(
248248

249249
URI requestUri = UriBuilder.fromUri(baseUri).path(bucket).path(key).build();
250250
RequestAuthorization requestAuthorization = signRequest(requestSigningCredential, requestUri, requestDate, "PUT", requestHeaderBuilder.build());
251-
String chunkedContent = chunkedPayloadMutator.apply(TestingChunkSigningSession.build(chunkSigningCredential, requestAuthorization.signature(), requestDate).generateChunkedStream(contentToUpload, partitionCount));
251+
String chunkedContent = chunkedPayloadMutator.apply(TestingChunkSigningSession.build(chunkSigningCredential, requestAuthorization.signature(), requestDate).generateChunkedStream(contentToUpload, partitionCount, Optional.empty()));
252252
Request.Builder requestBuilder = preparePut().setUri(requestUri);
253253

254254
requestHeaderBuilder.add("Authorization", requestAuthorization.authorization());

0 commit comments

Comments
 (0)