提交 e6d206b4 编写于 作者: R Rossen Stoyanchev

Extra information in WebFlux stacktraces

Use the checkpoint operator at various places in WebFlux to insert
information that Reactor then uses to enrich exceptions, via suppressed
exceptions, when error signals flow through the operator.

Closes gh-22105
上级 495ba2f5
......@@ -21,6 +21,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
......@@ -89,6 +90,8 @@ public class HandlerMethod {
@Nullable
private volatile List<Annotation[][]> interfaceParameterAnnotations;
private final String description;
/**
* Create an instance from a bean instance and a method.
......@@ -103,6 +106,7 @@ public class HandlerMethod {
this.bridgedMethod = BridgeMethodResolver.findBridgedMethod(method);
this.parameters = initMethodParameters();
evaluateResponseStatus();
this.description = initDescription(this.beanType, this.method);
}
/**
......@@ -119,6 +123,7 @@ public class HandlerMethod {
this.bridgedMethod = BridgeMethodResolver.findBridgedMethod(this.method);
this.parameters = initMethodParameters();
evaluateResponseStatus();
this.description = initDescription(this.beanType, this.method);
}
/**
......@@ -141,6 +146,7 @@ public class HandlerMethod {
this.bridgedMethod = BridgeMethodResolver.findBridgedMethod(method);
this.parameters = initMethodParameters();
evaluateResponseStatus();
this.description = initDescription(this.beanType, this.method);
}
/**
......@@ -156,6 +162,7 @@ public class HandlerMethod {
this.parameters = handlerMethod.parameters;
this.responseStatus = handlerMethod.responseStatus;
this.responseStatusReason = handlerMethod.responseStatusReason;
this.description = handlerMethod.description;
this.resolvedFromHandlerMethod = handlerMethod.resolvedFromHandlerMethod;
}
......@@ -174,6 +181,7 @@ public class HandlerMethod {
this.responseStatus = handlerMethod.responseStatus;
this.responseStatusReason = handlerMethod.responseStatusReason;
this.resolvedFromHandlerMethod = handlerMethod;
this.description = handlerMethod.description;
}
private MethodParameter[] initMethodParameters() {
......@@ -198,6 +206,14 @@ public class HandlerMethod {
}
}
private static String initDescription(Class<?> beanType, Method method) {
StringJoiner joiner = new StringJoiner(", ", "(", ")");
for (Class<?> paramType : method.getParameterTypes()) {
joiner.add(paramType.getSimpleName());
}
return beanType.getName() + "#" + method.getName() + joiner.toString();
}
/**
* Return the bean for this handler method.
......@@ -389,7 +405,7 @@ public class HandlerMethod {
@Override
public String toString() {
return this.method.toGenericString();
return this.description;
}
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -53,7 +53,7 @@ public class DefaultWebFilterChain implements WebFilterChain {
private final WebFilter currentFilter;
@Nullable
private final DefaultWebFilterChain next;
private final DefaultWebFilterChain chain;
/**
......@@ -68,7 +68,7 @@ public class DefaultWebFilterChain implements WebFilterChain {
this.handler = handler;
DefaultWebFilterChain chain = initChain(filters, handler);
this.currentFilter = chain.currentFilter;
this.next = chain.next;
this.chain = chain.chain;
}
private static DefaultWebFilterChain initChain(List<WebFilter> filters, WebHandler handler) {
......@@ -84,12 +84,12 @@ public class DefaultWebFilterChain implements WebFilterChain {
* Private constructor to represent one link in the chain.
*/
private DefaultWebFilterChain(List<WebFilter> allFilters, WebHandler handler,
@Nullable WebFilter currentFilter, @Nullable DefaultWebFilterChain next) {
@Nullable WebFilter currentFilter, @Nullable DefaultWebFilterChain chain) {
this.allFilters = allFilters;
this.currentFilter = currentFilter;
this.handler = handler;
this.next = next;
this.chain = chain;
}
/**
......@@ -117,9 +117,14 @@ public class DefaultWebFilterChain implements WebFilterChain {
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() ->
this.currentFilter != null && this.next != null ?
this.currentFilter.filter(exchange, this.next) :
this.currentFilter != null && this.chain != null ?
invokeFilter(this.currentFilter, this.chain, exchange) :
this.handler.handle(exchange));
}
private Mono<Void> invokeFilter(WebFilter current, DefaultWebFilterChain chain, ServerWebExchange exchange) {
return current.filter(exchange, chain)
.checkpoint(current.getClass().getName() + " [DefaultWebFilterChain]");
}
}
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -22,6 +22,9 @@ import java.util.List;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import org.springframework.web.server.WebHandler;
......@@ -41,7 +44,10 @@ public class ExceptionHandlingWebHandler extends WebHandlerDecorator {
public ExceptionHandlingWebHandler(WebHandler delegate, List<WebExceptionHandler> handlers) {
super(delegate);
this.exceptionHandlers = Collections.unmodifiableList(new ArrayList<>(handlers));
List<WebExceptionHandler> handlersToUse = new ArrayList<>();
handlersToUse.add(new CheckpointInsertingHandler());
handlersToUse.addAll(handlers);
this.exceptionHandlers = Collections.unmodifiableList(handlersToUse);
}
......@@ -71,4 +77,24 @@ public class ExceptionHandlingWebHandler extends WebHandlerDecorator {
return completion;
}
/**
* WebExceptionHandler to insert a checkpoint with current URL information.
* Must be the first in order to ensure we catch the error signal before
* the exception is handled and e.g. turned into an error response.
* @since 5.2
*/
private static class CheckpointInsertingHandler implements WebExceptionHandler {
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpRequest request = exchange.getRequest();
String rawQuery = request.getURI().getRawQuery();
String query = StringUtils.hasText(rawQuery) ? "?" + rawQuery : "";
HttpMethod httpMethod = request.getMethod();
String description = "HTTP " + httpMethod + " \"" + request.getPath() + query + "\"";
return Mono.error(ex).checkpoint(description + " [ExceptionHandlingWebHandler]").cast(Void.class);
}
}
}
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -67,11 +67,6 @@ import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
*/
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
@SuppressWarnings("ThrowableInstanceNeverThrown")
private static final Exception HANDLER_NOT_FOUND_EXCEPTION =
new ResponseStatusException(HttpStatus.NOT_FOUND, "No matching handler");
@Nullable
private List<HandlerMapping> handlerMappings;
......@@ -172,8 +167,13 @@ public class DispatcherHandler implements WebHandler, ApplicationContextAware {
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
.checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]")
.onErrorResume(ex ->
result.applyExceptionHandler(ex).flatMap(exResult -> {
String text = "Exception handler " + exResult.getHandler() +
", error=\"" + ex.getMessage() + "\" [DispatcherHandler]";
return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text);
}));
}
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -56,12 +56,17 @@ class DefaultClientResponse implements ClientResponse {
private final String logPrefix;
private final String requestDescription;
public DefaultClientResponse(ClientHttpResponse response, ExchangeStrategies strategies,
String logPrefix, String requestDescription) {
public DefaultClientResponse(ClientHttpResponse response, ExchangeStrategies strategies, String logPrefix) {
this.response = response;
this.strategies = strategies;
this.headers = new DefaultHeaders();
this.logPrefix = logPrefix;
this.requestDescription = requestDescription;
}
......@@ -90,22 +95,35 @@ class DefaultClientResponse implements ClientResponse {
return this.response.getCookies();
}
@SuppressWarnings("unchecked")
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
return extractor.extract(this.response, new BodyExtractor.Context() {
T result = extractor.extract(this.response, new BodyExtractor.Context() {
@Override
public List<HttpMessageReader<?>> messageReaders() {
return strategies.messageReaders();
}
@Override
public Optional<ServerHttpResponse> serverResponse() {
return Optional.empty();
}
@Override
public Map<String, Object> hints() {
return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix);
}
});
String description = "Body from " + this.requestDescription + " [DefaultClientResponse]";
if (result instanceof Mono) {
return (T) ((Mono<?>) result).checkpoint(description);
}
else if (result instanceof Flux) {
return (T) ((Flux<?>) result).checkpoint(description);
}
else {
return result;
}
}
@Override
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -136,7 +136,7 @@ final class DefaultClientResponseBuilder implements ClientResponse.Builder {
// When building ClientResponse manually, the ClientRequest.logPrefix() has to be passed,
// e.g. via ClientResponse.Builder, but this (builder) is not used currently.
return new DefaultClientResponse(httpResponse, this.strategies, "");
return new DefaultClientResponse(httpResponse, this.strategies, "", "");
}
......
......@@ -316,8 +316,9 @@ class DefaultWebClient implements WebClient {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> exchangeFunction.exchange(request))
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
return Mono.defer(() -> exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
}
private ClientRequest.Builder initRequestBuilder() {
......@@ -445,8 +446,8 @@ class DefaultWebClient implements WebClient {
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementType) {
return this.responseMono.flatMapMany(response -> handleBody(response,
response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
return this.responseMono.flatMapMany(response ->
handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
}
private <T extends Publisher<?>> T handleBody(ClientResponse response,
......@@ -459,7 +460,8 @@ class DefaultWebClient implements WebClient {
Mono<? extends Throwable> exMono = handler.apply(response, request);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
return errorFunction.apply(exMono);
T result = errorFunction.apply(exMono);
return insertCheckpoint(result, response.statusCode(), request);
}
}
return bodyPublisher;
......@@ -477,6 +479,22 @@ class DefaultWebClient implements WebClient {
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}
@SuppressWarnings("unchecked")
private <T extends Publisher<?>> T insertCheckpoint(T result, HttpStatus status, HttpRequest request) {
String httpMethod = request.getMethodValue();
URI uri = request.getURI();
String description = status + " from " + httpMethod + " " + uri + " [DefaultWebClient]";
if (result instanceof Mono) {
return (T) ((Mono<?>) result).checkpoint(description);
}
else if (result instanceof Flux) {
return (T) ((Flux<?>) result).checkpoint(description);
}
else {
return result;
}
}
private static Mono<WebClientResponseException> createResponseException(
ClientResponse response, HttpRequest request) {
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -105,7 +105,8 @@ public abstract class ExchangeFunctions {
.doOnCancel(() -> logger.debug(logPrefix + "Cancel signal (to close connection)"))
.map(httpResponse -> {
logResponse(httpResponse, logPrefix);
return new DefaultClientResponse(httpResponse, this.strategies, logPrefix);
return new DefaultClientResponse(
httpResponse, this.strategies, logPrefix, httpMethod.name() + " " + url);
});
}
......
......@@ -63,11 +63,16 @@ public class WebClientResponseException extends WebClientException {
* Constructor with response data only, and a default message.
* @since 5.1.4
*/
public WebClientResponseException(int statusCode, String statusText,
public WebClientResponseException(int status, String reasonPhrase,
@Nullable HttpHeaders headers, @Nullable byte[] body, @Nullable Charset charset,
@Nullable HttpRequest request) {
this(statusCode + " " + statusText, statusCode, statusText, headers, body, charset, request);
this(initMessage(status, reasonPhrase, request), status, reasonPhrase, headers, body, charset, request);
}
private static String initMessage(int status, String reasonPhrase, @Nullable HttpRequest request) {
return status + " " + reasonPhrase +
(request != null ? " from " + request.getMethodValue() + " " + request.getURI() : "");
}
/**
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -74,7 +74,9 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
this.delegateSession = this.sessionFactory.apply(session);
this.delegateHandler.handle(this.delegateSession).subscribe(this.delegateSession);
this.delegateHandler.handle(this.delegateSession)
.checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]")
.subscribe(this.delegateSession);
}
@OnWebSocketMessage
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -80,7 +80,9 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
});
this.delegateHandler.handle(this.delegateSession).subscribe(this.delegateSession);
this.delegateHandler.handle(this.delegateSession)
.checkpoint(session.getRequestURI() + " [StandardWebSocketHandlerAdapter]")
.subscribe(this.delegateSession);
}
private <T> WebSocketMessage toMessage(T message) {
......
......@@ -117,7 +117,7 @@ public class ReactorNettyWebSocketClient implements WebSocketClient {
if (logger.isDebugEnabled()) {
logger.debug("Started session '" + session.getId() + "' for " + url);
}
return handler.handle(session);
return handler.handle(session).checkpoint(url + " [ReactorNettyWebSocketClient]");
})
.doOnRequest(n -> {
if (logger.isDebugEnabled()) {
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -204,7 +204,9 @@ public class UndertowWebSocketClient implements WebSocketClient {
channel.getReceiveSetter().set(adapter);
channel.resumeReceives();
handler.handle(session).subscribe(session);
handler.handle(session)
.checkpoint(url + " [UndertowWebSocketClient]")
.subscribe(session);
}
private HandshakeInfo createHandshakeInfo(URI url, DefaultNegotiation negotiation) {
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,6 +16,7 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
......@@ -81,7 +82,8 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
return handler.handle(session);
URI uri = exchange.getRequest().getURI();
return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
});
}
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -93,14 +93,16 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
}
@Override
public void onConnect(WebSocketHttpExchange httpExchange, WebSocketChannel channel) {
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
UndertowWebSocketSession session = createSession(channel);
UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session);
channel.getReceiveSetter().set(adapter);
channel.resumeReceives();
this.handler.handle(session).subscribe(session);
this.handler.handle(session)
.checkpoint(exchange.getRequestURI() + " [UndertowRequestUpgradeStrategy]")
.subscribe(session);
}
private UndertowWebSocketSession createSession(WebSocketChannel channel) {
......
......@@ -70,7 +70,7 @@ public class DefaultClientResponseTests {
public void createMocks() {
mockResponse = mock(ClientHttpResponse.class);
mockExchangeStrategies = mock(ExchangeStrategies.class);
defaultClientResponse = new DefaultClientResponse(mockResponse, mockExchangeStrategies, "");
defaultClientResponse = new DefaultClientResponse(mockResponse, mockExchangeStrategies, "", "");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册