/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.codec.json;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import reactor.core.publisher.Flux;

import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;

/**
R
Polish  
Rossen Stoyanchev 已提交
41 42 43
 * {@link Function} to transform a JSON stream of arbitrary size, byte array
 * chunks into a {@code Flux<TokenBuffer>} where each token buffer is a
 * well-formed JSON object.
44 45
 *
 * @author Arjen Poutsma
46 47
 * @author Rossen Stoyanchev
 * @author Juergen Hoeller
48 49
 * @since 5.0
 */
P
Phillip Webb 已提交
50
final class Jackson2Tokenizer {
51 52 53

	private final JsonParser parser;

54 55
	private final DeserializationContext deserializationContext;

56 57 58 59 60 61 62 63
	private final boolean tokenizeArrayElements;

	private TokenBuffer tokenBuffer;

	private int objectDepth;

	private int arrayDepth;

64 65 66 67 68
	private final int maxInMemorySize;

	private int byteCount;


69
	// TODO: change to ByteBufferFeeder when supported by Jackson
A
Arjen Poutsma 已提交
70
	// See https://github.com/FasterXML/jackson-core/issues/478
R
Polish  
Rossen Stoyanchev 已提交
71 72
	private final ByteArrayFeeder inputFeeder;

73

74 75
	private Jackson2Tokenizer(JsonParser parser, DeserializationContext deserializationContext,
			boolean tokenizeArrayElements, int maxInMemorySize) {
76 77

		this.parser = parser;
78
		this.deserializationContext = deserializationContext;
79
		this.tokenizeArrayElements = tokenizeArrayElements;
80
		this.tokenBuffer = new TokenBuffer(parser, deserializationContext);
81
		this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder();
82
		this.maxInMemorySize = maxInMemorySize;
83 84
	}

R
Polish  
Rossen Stoyanchev 已提交
85

86
	private Flux<TokenBuffer> tokenize(DataBuffer dataBuffer) {
87
		int bufferSize = dataBuffer.readableByteCount();
88 89 90 91 92 93
		byte[] bytes = new byte[dataBuffer.readableByteCount()];
		dataBuffer.read(bytes);
		DataBufferUtils.release(dataBuffer);

		try {
			this.inputFeeder.feedInput(bytes, 0, bytes.length);
94 95 96
			List<TokenBuffer> result = parseTokenBufferFlux();
			assertInMemorySize(bufferSize, result);
			return Flux.fromIterable(result);
97 98
		}
		catch (JsonProcessingException ex) {
99
			return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
100
		}
101 102 103 104 105 106 107 108
		catch (IOException ex) {
			return Flux.error(ex);
		}
	}

	private Flux<TokenBuffer> endOfInput() {
		this.inputFeeder.endOfInput();
		try {
109 110
			List<TokenBuffer> result = parseTokenBufferFlux();
			return Flux.fromIterable(result);
111
		}
112
		catch (JsonProcessingException ex) {
113
			return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
114
		}
115
		catch (IOException ex) {
116 117 118 119
			return Flux.error(ex);
		}
	}

120
	private List<TokenBuffer> parseTokenBufferFlux() throws IOException {
121 122
		List<TokenBuffer> result = new ArrayList<>();

123 124 125
		// SPR-16151: Smile data format uses null to separate documents
		boolean previousNull = false;
		while (!this.parser.isClosed()) {
126
			JsonToken token = this.parser.nextToken();
127
			if (token == JsonToken.NOT_AVAILABLE ||
128
					token == null && previousNull) {
129 130
				break;
			}
131 132 133 134
			else if (token == null ) { // !previousNull
				previousNull = true;
				continue;
			}
135 136 137 138 139 140 141 142
			updateDepth(token);
			if (!this.tokenizeArrayElements) {
				processTokenNormal(token, result);
			}
			else {
				processTokenArray(token, result);
			}
		}
143
		return result;
144 145
	}

R
Polish  
Rossen Stoyanchev 已提交
146
	private void updateDepth(JsonToken token) {
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
		switch (token) {
			case START_OBJECT:
				this.objectDepth++;
				break;
			case END_OBJECT:
				this.objectDepth--;
				break;
			case START_ARRAY:
				this.arrayDepth++;
				break;
			case END_ARRAY:
				this.arrayDepth--;
				break;
		}
	}

	private void processTokenNormal(JsonToken token, List<TokenBuffer> result) throws IOException {
		this.tokenBuffer.copyCurrentEvent(this.parser);

166
		if ((token.isStructEnd() || token.isScalarValue()) && this.objectDepth == 0 && this.arrayDepth == 0) {
167
			result.add(this.tokenBuffer);
168
			this.tokenBuffer = new TokenBuffer(this.parser, this.deserializationContext);
169 170 171 172 173
		}

	}

	private void processTokenArray(JsonToken token, List<TokenBuffer> result) throws IOException {
174
		if (!isTopLevelArrayToken(token)) {
175 176 177
			this.tokenBuffer.copyCurrentEvent(this.parser);
		}

178
		if (this.objectDepth == 0 && (this.arrayDepth == 0 || this.arrayDepth == 1) &&
179
				(token == JsonToken.END_OBJECT || token.isScalarValue())) {
180
			result.add(this.tokenBuffer);
181
			this.tokenBuffer = new TokenBuffer(this.parser, this.deserializationContext);
182
		}
R
Polish  
Rossen Stoyanchev 已提交
183
	}
184

185
	private boolean isTopLevelArrayToken(JsonToken token) {
186 187
		return this.objectDepth == 0 && ((token == JsonToken.START_ARRAY && this.arrayDepth == 1) ||
				(token == JsonToken.END_ARRAY && this.arrayDepth == 0));
188 189
	}

190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
	private void assertInMemorySize(int currentBufferSize, List<TokenBuffer> result) {
		if (this.maxInMemorySize >= 0) {
			if (!result.isEmpty()) {
				this.byteCount = 0;
			}
			else if (currentBufferSize > Integer.MAX_VALUE - this.byteCount) {
				raiseLimitException();
			}
			else {
				this.byteCount += currentBufferSize;
				if (this.byteCount > this.maxInMemorySize) {
					raiseLimitException();
				}
			}
		}
	}

	private void raiseLimitException() {
		throw new DataBufferLimitException(
				"Exceeded limit on max bytes per JSON object: " + this.maxInMemorySize);
	}

212 213 214 215 216

	/**
	 * Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
	 * @param dataBuffers the source data buffers
	 * @param jsonFactory the factory to use
217
	 * @param objectMapper the current mapper instance
218
	 * @param tokenizeArrays if {@code true} and the "top level" JSON object is
219 220 221 222
	 * an array, each element is returned individually immediately after it is received
	 * @return the resulting token buffers
	 */
	public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
223
			ObjectMapper objectMapper, boolean tokenizeArrays, int maxInMemorySize) {
224 225 226

		try {
			JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
227 228 229 230 231
			DeserializationContext context = objectMapper.getDeserializationContext();
			if (context instanceof DefaultDeserializationContext) {
				context = ((DefaultDeserializationContext) context).createInstance(
						objectMapper.getDeserializationConfig(), parser, objectMapper.getInjectableValues());
			}
232
			Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, context, tokenizeArrays, maxInMemorySize);
233 234 235 236 237 238 239
			return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput);
		}
		catch (IOException ex) {
			return Flux.error(ex);
		}
	}

240
}