提交 ade95423 编写于 作者: B Boyang Jerry Peng 提交者: Jia Zhai

Fix pulsar sink and source state (#5046)

(cherry picked from commit 40d6248e)
上级 52af2020
......@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.functions.instance;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
......@@ -47,7 +50,6 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
......@@ -72,9 +74,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private final SecretsProvider secretsProvider;
private final Map<String, Object> secretsMap;
@Getter
@Setter
private StateContextImpl stateContext;
@VisibleForTesting
StateContextImpl stateContext;
private Map<String, Object> userConfigs;
private ComponentStatsManager statsManager;
......@@ -95,7 +96,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) {
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
Table<ByteBuf, ByteBuf> stateTable) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
......@@ -146,6 +148,10 @@ class ContextImpl implements Context, SinkContext, SourceContext {
.quantile(0.999, 0.01)
.register(collectorRegistry);
this.componentType = componentType;
if (null != stateTable) {
this.stateContext = new StateContextImpl(stateTable);
}
}
public void setCurrentMessageContext(Record<?> record) {
......
......@@ -213,7 +213,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats);
collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable);
}
/**
......@@ -232,10 +232,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.componentType);
javaInstance = setupJavaInstance();
if (null != stateTable) {
StateContextImpl stateContext = new StateContextImpl(stateTable);
javaInstance.getContext().setStateContext(stateContext);
}
while (true) {
currentRecord = readInput();
......
......@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
......@@ -81,13 +82,12 @@ public class ContextImplTest {
TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null);
FunctionDetails.ComponentType.FUNCTION, null, null);
context.setCurrentMessageContext((Record<String>) () -> null);
}
......@@ -98,6 +98,7 @@ public class ContextImplTest {
@Test(expectedExceptions = IllegalStateException.class)
public void testGetCounterStateDisabled() {
context.getCounter("test-key");
}
......@@ -113,35 +114,31 @@ public class ContextImplTest {
@Test
public void testIncrCounterStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
context.incrCounterAsync("test-key", 10L);
verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
}
@Test
public void testGetCounterStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
context.getCounterAsync("test-key");
verify(stateContext, times(1)).getCounter(eq("test-key"));
verify(context.stateContext, times(1)).getCounter(eq("test-key"));
}
@Test
public void testPutStateStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
context.putStateAsync("test-key", buffer);
verify(stateContext, times(1)).put(eq("test-key"), same(buffer));
verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer));
}
@Test
public void testGetStateStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
context.getStateAsync("test-key");
verify(stateContext, times(1)).get(eq("test-key"));
verify(context.stateContext, times(1)).get(eq("test-key"));
}
@Test
......
......@@ -29,6 +29,13 @@
<groupId>org.apache.pulsar.tests</groupId>
<artifactId>java-test-functions</artifactId>
<name>Apache Pulsar :: Tests :: Docker Images :: Java Test Functions</name>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<packaging>jar</packaging>
<profiles>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import java.nio.ByteBuffer;
import java.util.Map;
public class TestStateSink implements Sink<String> {
private SinkContext sinkContext;
private int count;
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
sinkContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
this.sinkContext = sinkContext;
}
@Override
public void write(Record<String> record) throws Exception {
String initial = new String(sinkContext.getState("initial").array());
String val = String.format("%s-%d", initial, count);
sinkContext.putState("now", ByteBuffer.wrap(val.getBytes()));
count++;
}
@Override
public void close() throws Exception {
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import java.nio.ByteBuffer;
import java.util.Map;
public class TestStateSource implements Source<String> {
private SourceContext sourceContext;
private int count;
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
sourceContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
this.sourceContext = sourceContext;
}
@Override
public Record<String> read() throws Exception {
Thread.sleep(50);
String initial = new String(sourceContext.getState("initial").array());
String val = String.format("%s-%d", initial, count);
sourceContext.putState("now", ByteBuffer.wrap(val.getBytes()));
count++;
return () -> val;
}
@Override
public void close() throws Exception {
}
}
\ No newline at end of file
......@@ -18,18 +18,20 @@
*/
package org.apache.pulsar.tests.integration.functions;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
......@@ -38,9 +40,17 @@ import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.annotations.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
* State related test cases.
*/
@Slf4j
public class PulsarStateTest extends PulsarStandaloneTestSuite {
public static final String WORDCOUNT_PYTHON_CLASS =
......@@ -84,6 +94,139 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
getFunctionInfoNotFound(functionName);
}
@Test
public void testSourceState() throws Exception {
String outputTopicName = "test-state-source-output-" + randomName(8);
String sourceName = "test-state-source-" + randomName(8);
submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSource", JAVAJAR);
// get source info
getSourceInfoSuccess(sourceName);
// get source status
getSourceStatus(sourceName);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
retryStrategically((test) -> {
try {
SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWritten > 0;
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);
SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "initial");
assertEquals(functionState.getStringValue(), "val1");
functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
assertTrue(functionState.getStringValue().matches("val1-.*"));
}
// delete source
deleteSource(sourceName);
getSourceInfoNotFound(sourceName);
}
@Test
public void testSinkState() throws Exception {
String inputTopicName = "test-state-sink-input-" + randomName(8);
String sinkName = "test-state-sink-" + randomName(8);
int numMessages = 10;
submitSinkConnector(sinkName, inputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSink", JAVAJAR);
// get sink info
getSinkInfoSuccess(sinkName);
// get sink status
getSinkStatus(sinkName);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
// java supports schema
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(container.getPlainTextServiceUrl())
.build();
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
FunctionState functionState = admin.functions().getFunctionState("public", "default", sinkName, "initial");
assertEquals(functionState.getStringValue(), "val1");
for (int i = 0; i < numMessages; i++) {
producer.send("foo");
}
retryStrategically((test) -> {
try {
SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWrittenToSink > 0;
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);
SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink > 0);
functionState = admin.functions().getFunctionState("public", "default", sinkName, "now");
assertEquals(functionState.getStringValue(), String.format("val1-%d", numMessages - 1));
}
// delete source
deleteSink(sinkName);
getSinkInfoNotFound(sinkName);
}
private void submitSourceConnector(String sourceName,
String outputTopicName,
String className,
String archive) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sources", "create",
"--name", sourceName,
"--destinationTopicName", outputTopicName,
"--archive", archive,
"--classname", className
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = container.execCmd(commands);
assertTrue(
result.getStdout().contains("\"Created successfully\""),
result.getStdout());
}
private void submitSinkConnector(String sinkName,
String inputTopicName,
String className,
String archive) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sinks", "create",
"--name", sinkName,
"--inputs", inputTopicName,
"--archive", archive,
"--classname", className
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = container.execCmd(commands);
assertTrue(
result.getStdout().contains("\"Created successfully\""),
result.getStdout());
}
private static void submitExclamationFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
......@@ -151,6 +294,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
}
}
private static void getSinkInfoSuccess(String sinkName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName
);
assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\""));
}
private static void getSourceInfoSuccess(String sourceName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName
);
assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
}
private static void getFunctionInfoSuccess(String functionName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
......@@ -178,6 +345,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
}
}
private static void getSinkStatus(String sinkName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName
);
assertTrue(result.getStdout().contains("\"running\" : true"));
}
private static void getSourceStatus(String sourceName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName
);
assertTrue(result.getStdout().contains("\"running\" : true"));
}
private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
......@@ -243,4 +434,60 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
assertTrue(result.getStderr().isEmpty());
}
private static void deleteSource(String sourceName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"delete",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName
);
assertTrue(result.getStdout().contains("Delete source successfully"));
assertTrue(result.getStderr().isEmpty());
}
private static void deleteSink(String sinkName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"delete",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName
);
assertTrue(result.getStdout().contains("Deleted successfully"));
assertTrue(result.getStderr().isEmpty());
}
private static void getSourceInfoNotFound(String sourceName) throws Exception {
try {
container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName);
fail("Command should have exited with non-zero");
} catch (ContainerExecException e) {
assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
}
}
private static void getSinkInfoNotFound(String sinkName) throws Exception {
try {
container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName);
fail("Command should have exited with non-zero");
} catch (ContainerExecException e) {
assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist"));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册