提交 b5089ac0 编写于 作者: R Rossen Stoyanchev

Support @RequestBody Flux<Part> in WebFlux

This commit turns the Synchronoss NIO Multipart HttpMessageReader into
a reader of Flux<Part> and creates a separate reader that aggregates
the parts into a MultiValueMap<String, Part>.

Issue: SPR-14546
上级 d43dfc7b
......@@ -21,7 +21,8 @@ import java.util.List;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.multipart.SynchronossMultipartHttpMessageReader;
import org.springframework.http.codec.multipart.MultipartHttpMessageReader;
import org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader;
import org.springframework.util.ClassUtils;
/**
......@@ -65,7 +66,9 @@ class DefaultServerCodecConfigurer extends DefaultCodecConfigurer implements Ser
super.addTypedReadersTo(result);
addReaderTo(result, FormHttpMessageReader::new);
if (synchronossMultipartPresent) {
addReaderTo(result, SynchronossMultipartHttpMessageReader::new);
SynchronossPartHttpMessageReader partReader = new SynchronossPartHttpMessageReader();
addReaderTo(result, () -> partReader);
addReaderTo(result, () -> new MultipartHttpMessageReader(partReader));
}
}
......
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.multipart;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* {@code HttpMessageReader} for reading {@code "multipart/form-data"} requests
* into a {@code MultiValueMap<String, Part>}.
*
* <p>Note that this reader depends on access to an
* {@code HttpMessageReader<Part>} for the actual parsing of multipart content.
* The purpose of this reader is to collect the parts into a map.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class MultipartHttpMessageReader implements HttpMessageReader<MultiValueMap<String, Part>> {
private static final ResolvableType MULTIPART_VALUE_TYPE = ResolvableType.forClassWithGenerics(
MultiValueMap.class, String.class, Part.class);
private final HttpMessageReader<Part> partReader;
public MultipartHttpMessageReader(HttpMessageReader<Part> partReader) {
Assert.notNull(partReader, "'partReader' is required");
this.partReader = partReader;
}
@Override
public List<MediaType> getReadableMediaTypes() {
return Collections.singletonList(MediaType.MULTIPART_FORM_DATA);
}
@Override
public boolean canRead(ResolvableType elementType, MediaType mediaType) {
return MULTIPART_VALUE_TYPE.isAssignableFrom(elementType) &&
(mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType));
}
@Override
public Flux<MultiValueMap<String, Part>> read(ResolvableType elementType,
ReactiveHttpInputMessage message, Map<String, Object> hints) {
return Flux.from(readMono(elementType, message, hints));
}
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
return this.partReader.read(elementType, inputMessage, hints)
.collectMultimap(Part::getName).map(this::toMultiValueMap);
}
private LinkedMultiValueMap<String, Part> toMultiValueMap(Map<String, Collection<Part>> map) {
return new LinkedMultiValueMap<>(map.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> toList(e.getValue()))));
}
private List<Part> toList(Collection<Part> collection) {
return collection instanceof List ? (List<Part>) collection : new ArrayList<>(collection);
}
}
......@@ -25,15 +25,12 @@ import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.synchronoss.cloud.nio.multipart.Multipart;
import org.synchronoss.cloud.nio.multipart.MultipartContext;
......@@ -55,25 +52,24 @@ import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MimeType;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StreamUtils;
/**
* {@code HttpMessageReader} for {@code "multipart/form-data"} requests based
* on the Synchronoss NIO Multipart library.
* {@code HttpMessageReader} for parsing {@code "multipart/form-data"} requests
* to a stream of {@link Part}'s using the Synchronoss NIO Multipart library.
*
* <p>This reader can be provided to {@link MultipartHttpMessageReader} in order
* to aggregate all parts into a Map.
*
* @author Sebastien Deleuze
* @author Rossen Stoyanchev
* @author Arjen Poutsma
* @since 5.0
* @see <a href="https://github.com/synchronoss/nio-multipart">Synchronoss NIO Multipart</a>
* @see MultipartHttpMessageReader
*/
public class SynchronossMultipartHttpMessageReader implements HttpMessageReader<MultiValueMap<String, Part>> {
private static final ResolvableType MULTIPART_VALUE_TYPE = ResolvableType.forClassWithGenerics(
MultiValueMap.class, String.class, Part.class);
public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> {
@Override
......@@ -83,34 +79,25 @@ public class SynchronossMultipartHttpMessageReader implements HttpMessageReader<
@Override
public boolean canRead(ResolvableType elementType, MediaType mediaType) {
return MULTIPART_VALUE_TYPE.isAssignableFrom(elementType) &&
return Part.class.equals(elementType.resolve(Object.class)) &&
(mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType));
}
@Override
public Flux<MultiValueMap<String, Part>> read(ResolvableType elementType,
ReactiveHttpInputMessage message, Map<String, Object> hints) {
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message,
Map<String, Object> hints) {
return Flux.from(readMono(elementType, message, hints));
return Flux.create(new SynchronossPartGenerator(message));
}
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
return Flux.create(new SynchronossPartGenerator(inputMessage))
.collectMultimap(Part::getName).map(this::toMultiValueMap);
}
private LinkedMultiValueMap<String, Part> toMultiValueMap(Map<String, Collection<Part>> map) {
return new LinkedMultiValueMap<>(map.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> toList(e.getValue()))));
}
public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
Map<String, Object> hints) {
private List<Part> toList(Collection<Part> collection) {
return collection instanceof List ? (List<Part>) collection : new ArrayList<>(collection);
return Mono.error(new UnsupportedOperationException(
"This reader does not support reading a single element."));
}
......
......@@ -41,7 +41,8 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.multipart.SynchronossMultipartHttpMessageReader;
import org.springframework.http.codec.multipart.MultipartHttpMessageReader;
import org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader;
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
import org.springframework.util.MimeTypeUtils;
......@@ -63,14 +64,15 @@ public class ServerCodecConfigurerTests {
@Test
public void defaultReaders() throws Exception {
List<HttpMessageReader<?>> readers = this.configurer.getReaders();
assertEquals(10, readers.size());
assertEquals(11, readers.size());
assertEquals(ByteArrayDecoder.class, getNextDecoder(readers).getClass());
assertEquals(ByteBufferDecoder.class, getNextDecoder(readers).getClass());
assertEquals(DataBufferDecoder.class, getNextDecoder(readers).getClass());
assertEquals(ResourceDecoder.class, getNextDecoder(readers).getClass());
assertStringDecoder(getNextDecoder(readers), true);
assertEquals(FormHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass());
assertEquals(SynchronossMultipartHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass());
assertEquals(SynchronossPartHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass());
assertEquals(MultipartHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass());
assertEquals(Jaxb2XmlDecoder.class, getNextDecoder(readers).getClass());
assertEquals(Jackson2JsonDecoder.class, getNextDecoder(readers).getClass());
assertStringDecoder(getNextDecoder(readers), false);
......
......@@ -102,7 +102,9 @@ public class MultipartHttpMessageWriterTests {
assertNotNull("No boundary found", contentType.getParameter("boundary"));
// see if Synchronoss NIO Multipart can read what we wrote
SynchronossMultipartHttpMessageReader reader = new SynchronossMultipartHttpMessageReader();
SynchronossPartHttpMessageReader synchronossReader = new SynchronossPartHttpMessageReader();
MultipartHttpMessageReader reader = new MultipartHttpMessageReader(synchronossReader);
MockServerHttpRequest request = MockServerHttpRequest.post("/foo")
.header(HttpHeaders.CONTENT_TYPE, contentType.toString())
.body(response.getBody());
......
......@@ -32,53 +32,57 @@ import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.MockHttpOutputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import static java.util.Collections.*;
import static org.junit.Assert.*;
import static org.springframework.http.HttpHeaders.*;
import static org.springframework.http.MediaType.*;
import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.springframework.core.ResolvableType.forClassWithGenerics;
import static org.springframework.http.HttpHeaders.CONTENT_LENGTH;
import static org.springframework.http.HttpHeaders.CONTENT_TYPE;
import static org.springframework.http.MediaType.MULTIPART_FORM_DATA;
/**
* @author Sebastien Deleuze
*/
public class SynchronossMultipartHttpMessageReaderTests {
public class SynchronossPartHttpMessageReaderTests {
private final HttpMessageReader<MultiValueMap<String, Part>> reader = new SynchronossMultipartHttpMessageReader();
private final MultipartHttpMessageReader reader =
new MultipartHttpMessageReader(new SynchronossPartHttpMessageReader());
@Test
public void canRead() {
assertTrue(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Object.class),
forClassWithGenerics(MultiValueMap.class, String.class, Object.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class),
forClassWithGenerics(MultiValueMap.class, String.class, String.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(Map.class, String.class, String.class),
forClassWithGenerics(Map.class, String.class, String.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
MediaType.APPLICATION_FORM_URLENCODED));
}
@Test
public void resolveParts() throws IOException {
ServerHttpRequest request = generateMultipartRequest();
ResolvableType elementType = ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class);
ResolvableType elementType = forClassWithGenerics(MultiValueMap.class, String.class, Part.class);
MultiValueMap<String, Part> parts = this.reader.readMono(elementType, request, emptyMap()).block();
assertEquals(2, parts.size());
......@@ -105,7 +109,7 @@ public class SynchronossMultipartHttpMessageReaderTests {
@Test
public void bodyError() {
ServerHttpRequest request = generateErrorMultipartRequest();
ResolvableType elementType = ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class);
ResolvableType elementType = forClassWithGenerics(MultiValueMap.class, String.class, Part.class);
StepVerifier.create(this.reader.readMono(elementType, request, emptyMap())).verifyError();
}
......
......@@ -103,7 +103,7 @@ public class DelegatingWebFluxConfigurationTests {
verify(webFluxConfigurer).configureArgumentResolvers(any());
assertSame(formatterRegistry.getValue(), initializerConversionService);
assertEquals(10, codecsConfigurer.getValue().getReaders().size());
assertEquals(11, codecsConfigurer.getValue().getReaders().size());
}
@Test
......
......@@ -127,7 +127,7 @@ public class WebFluxConfigurationSupportTests {
assertNotNull(adapter);
List<HttpMessageReader<?>> readers = adapter.getMessageCodecConfigurer().getReaders();
assertEquals(10, readers.size());
assertEquals(11, readers.size());
assertHasMessageReader(readers, forClass(byte[].class), APPLICATION_OCTET_STREAM);
assertHasMessageReader(readers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM);
......
......@@ -16,8 +16,12 @@
package org.springframework.web.reactive.result.method.annotation;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
......@@ -35,6 +39,7 @@ import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.DispatcherHandler;
......@@ -68,14 +73,10 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes
}
@Test
public void part() {
test("/part");
}
private void test(String uri) {
public void requestPart() {
Mono<ClientResponse> result = webClient
.post()
.uri(uri)
.uri("/requestPart")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(generateBody()))
.exchange();
......@@ -86,6 +87,37 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes
.verifyComplete();
}
@Test
public void requestBodyMap() {
Mono<String> result = webClient
.post()
.uri("/requestBodyMap")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(generateBody()))
.retrieve()
.bodyToMono(String.class);
StepVerifier.create(result)
.consumeNextWith(body -> assertEquals("Map[barPart,fooPart]", body))
.verifyComplete();
}
@Test
public void requestBodyFlux() {
Mono<String> result = webClient
.post()
.uri("/requestBodyFlux")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(generateBody()))
.retrieve()
.bodyToMono(String.class);
StepVerifier.create(result)
.consumeNextWith(body -> assertEquals("Flux[barPart,fooPart]", body))
.verifyComplete();
}
private MultiValueMap<String, Object> generateBody() {
HttpHeaders fooHeaders = new HttpHeaders();
fooHeaders.setContentType(MediaType.TEXT_PLAIN);
......@@ -102,11 +134,22 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes
@SuppressWarnings("unused")
static class MultipartController {
@PostMapping("/part")
@PostMapping("/requestPart")
void part(@RequestPart Part fooPart) {
assertEquals("foo.txt", fooPart.getFilename().get());
}
@PostMapping("/requestBodyMap")
Mono<String> part(@RequestBody Mono<MultiValueMap<String, Part>> parts) {
return parts.map(map -> map.toSingleValueMap().entrySet().stream()
.map(Map.Entry::getKey).sorted().collect(Collectors.joining(",", "Map[", "]")));
}
@PostMapping("/requestBodyFlux")
Mono<String> part(@RequestBody Flux<Part> parts) {
return parts.map(Part::getName).collectList()
.map(names -> names.stream().sorted().collect(Collectors.joining(",", "Flux[", "]")));
}
}
@Configuration
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册