diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index e5de503b220fd72d368cff5405696ac6f84fbe92..eaadb38fda9c93fd6d526f55cbe2de9a02480104 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -18,12 +18,15 @@ */ package org.apache.pulsar.functions.instance; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Summary; import lombok.Getter; import lombok.Setter; +import org.apache.bookkeeper.api.kv.Table; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.ProducerBuilderImpl; @@ -47,7 +50,6 @@ import org.slf4j.Logger; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; @@ -72,9 +74,8 @@ class ContextImpl implements Context, SinkContext, SourceContext { private final SecretsProvider secretsProvider; private final Map secretsMap; - @Getter - @Setter - private StateContextImpl stateContext; + @VisibleForTesting + StateContextImpl stateContext; private Map userConfigs; private ComponentStatsManager statsManager; @@ -95,7 +96,8 @@ class ContextImpl implements Context, SinkContext, SourceContext { public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, - Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) { + Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager, + Table stateTable) { this.config = config; this.logger = logger; this.publishProducers = new HashMap<>(); @@ -146,6 +148,10 @@ class ContextImpl implements Context, SinkContext, SourceContext { .quantile(0.999, 0.01) .register(collectorRegistry); this.componentType = componentType; + + if (null != stateTable) { + this.stateContext = new StateContextImpl(stateTable); + } } public void setCurrentMessageContext(Record record) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 66d4ba7c3ac79f11a928edbfe95f3fe39eeb03f4..3d9191619c5ee09557adc02be81cc137cbd909f4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -213,7 +213,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, - collectorRegistry, metricsLabels, this.componentType, this.stats); + collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable); } /** @@ -232,10 +232,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { this.componentType); javaInstance = setupJavaInstance(); - if (null != stateTable) { - StateContextImpl stateContext = new StateContextImpl(stateTable); - javaInstance.getContext().setStateContext(stateContext); - } while (true) { currentRecord = readInput(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index bfe545af274f5d9a43f308af7255bed9cb2f1169..9af7a47991d1b3130880ecceb28239f9650c105e 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -36,6 +36,7 @@ import java.nio.ByteBuffer; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.api.kv.Table; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; @@ -81,13 +82,12 @@ public class ContextImplTest { TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING)); doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync(); when(producer.newMessage()).thenReturn(messageBuilder); - context = new ContextImpl( config, logger, client, new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0], - FunctionDetails.ComponentType.FUNCTION, null); + FunctionDetails.ComponentType.FUNCTION, null, null); context.setCurrentMessageContext((Record) () -> null); } @@ -98,6 +98,7 @@ public class ContextImplTest { @Test(expectedExceptions = IllegalStateException.class) public void testGetCounterStateDisabled() { + context.getCounter("test-key"); } @@ -113,35 +114,31 @@ public class ContextImplTest { @Test public void testIncrCounterStateEnabled() throws Exception { - StateContextImpl stateContext = mock(StateContextImpl.class); - context.setStateContext(stateContext); + context.stateContext = mock(StateContextImpl.class); context.incrCounterAsync("test-key", 10L); - verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L)); + verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L)); } @Test public void testGetCounterStateEnabled() throws Exception { - StateContextImpl stateContext = mock(StateContextImpl.class); - context.setStateContext(stateContext); + context.stateContext = mock(StateContextImpl.class); context.getCounterAsync("test-key"); - verify(stateContext, times(1)).getCounter(eq("test-key")); + verify(context.stateContext, times(1)).getCounter(eq("test-key")); } @Test public void testPutStateStateEnabled() throws Exception { - StateContextImpl stateContext = mock(StateContextImpl.class); - context.setStateContext(stateContext); + context.stateContext = mock(StateContextImpl.class); ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8)); context.putStateAsync("test-key", buffer); - verify(stateContext, times(1)).put(eq("test-key"), same(buffer)); + verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer)); } @Test public void testGetStateStateEnabled() throws Exception { - StateContextImpl stateContext = mock(StateContextImpl.class); - context.setStateContext(stateContext); + context.stateContext = mock(StateContextImpl.class); context.getStateAsync("test-key"); - verify(stateContext, times(1)).get(eq("test-key")); + verify(context.stateContext, times(1)).get(eq("test-key")); } @Test diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml index 18d7e0ec5cb27d6a5a8d8aadb4135a50f48e0604..bd5e77bb41ac0a7d97b77068feb54fe1adf6350f 100644 --- a/tests/docker-images/java-test-functions/pom.xml +++ b/tests/docker-images/java-test-functions/pom.xml @@ -29,6 +29,13 @@ org.apache.pulsar.tests java-test-functions Apache Pulsar :: Tests :: Docker Images :: Java Test Functions + + + org.apache.pulsar + pulsar-io-core + ${project.version} + + jar diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java new file mode 100644 index 0000000000000000000000000000000000000000..d1d074065e3cf4b1287e49828c0de634ae5e1a56 --- /dev/null +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class TestStateSink implements Sink { + + private SinkContext sinkContext; + private int count; + + @Override + public void open(Map config, SinkContext sinkContext) throws Exception { + sinkContext.putState("initial", ByteBuffer.wrap("val1".getBytes())); + this.sinkContext = sinkContext; + } + + @Override + public void write(Record record) throws Exception { + String initial = new String(sinkContext.getState("initial").array()); + String val = String.format("%s-%d", initial, count); + sinkContext.putState("now", ByteBuffer.wrap(val.getBytes())); + count++; + } + + @Override + public void close() throws Exception { + + } +} diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java new file mode 100644 index 0000000000000000000000000000000000000000..ebbd8097b21cbabe44fc0e1355f4b2ae8da77a29 --- /dev/null +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Source; +import org.apache.pulsar.io.core.SourceContext; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class TestStateSource implements Source { + + + private SourceContext sourceContext; + private int count; + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + sourceContext.putState("initial", ByteBuffer.wrap("val1".getBytes())); + this.sourceContext = sourceContext; + } + + @Override + public Record read() throws Exception { + Thread.sleep(50); + String initial = new String(sourceContext.getState("initial").array()); + String val = String.format("%s-%d", initial, count); + sourceContext.putState("now", ByteBuffer.wrap(val.getBytes())); + count++; + return () -> val; + } + + @Override + public void close() throws Exception { + + } +} \ No newline at end of file diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java index 264e9bc81d4c5bafca940163bfd5b6afbd10fe01..2e386ebe6ba1e647425e59b727ea122b6ce3b698 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java @@ -18,18 +18,20 @@ */ package org.apache.pulsar.tests.integration.functions; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.functions.FunctionState; +import org.apache.pulsar.common.policies.data.SinkStatus; +import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; @@ -38,9 +40,17 @@ import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testng.annotations.Test; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; +import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + /** * State related test cases. */ +@Slf4j public class PulsarStateTest extends PulsarStandaloneTestSuite { public static final String WORDCOUNT_PYTHON_CLASS = @@ -84,6 +94,139 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { getFunctionInfoNotFound(functionName); } + @Test + public void testSourceState() throws Exception { + String outputTopicName = "test-state-source-output-" + randomName(8); + String sourceName = "test-state-source-" + randomName(8); + + submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSource", JAVAJAR); + + // get source info + getSourceInfoSuccess(sourceName); + + // get source status + getSourceStatus(sourceName); + + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { + + retryStrategically((test) -> { + try { + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWritten > 0; + } catch (PulsarAdminException e) { + return false; + } + }, 10, 200); + + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWritten > 0); + + FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "initial"); + assertEquals(functionState.getStringValue(), "val1"); + + functionState = admin.functions().getFunctionState("public", "default", sourceName, "now"); + assertTrue(functionState.getStringValue().matches("val1-.*")); + } + + // delete source + deleteSource(sourceName); + + getSourceInfoNotFound(sourceName); + } + + @Test + public void testSinkState() throws Exception { + String inputTopicName = "test-state-sink-input-" + randomName(8); + String sinkName = "test-state-sink-" + randomName(8); + int numMessages = 10; + + submitSinkConnector(sinkName, inputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSink", JAVAJAR); + + // get sink info + getSinkInfoSuccess(sinkName); + + // get sink status + getSinkStatus(sinkName); + + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { + + // java supports schema + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(container.getPlainTextServiceUrl()) + .build(); + @Cleanup Producer producer = client.newProducer(Schema.STRING) + .topic(inputTopicName) + .create(); + + FunctionState functionState = admin.functions().getFunctionState("public", "default", sinkName, "initial"); + assertEquals(functionState.getStringValue(), "val1"); + + for (int i = 0; i < numMessages; i++) { + producer.send("foo"); + } + + retryStrategically((test) -> { + try { + SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); + return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWrittenToSink > 0; + } catch (PulsarAdminException e) { + return false; + } + }, 10, 200); + + SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink > 0); + + functionState = admin.functions().getFunctionState("public", "default", sinkName, "now"); + assertEquals(functionState.getStringValue(), String.format("val1-%d", numMessages - 1)); + } + + // delete source + deleteSink(sinkName); + + getSinkInfoNotFound(sinkName); + } + + private void submitSourceConnector(String sourceName, + String outputTopicName, + String className, + String archive) throws Exception { + String[] commands = { + PulsarCluster.ADMIN_SCRIPT, + "sources", "create", + "--name", sourceName, + "--destinationTopicName", outputTopicName, + "--archive", archive, + "--classname", className + }; + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = container.execCmd(commands); + assertTrue( + result.getStdout().contains("\"Created successfully\""), + result.getStdout()); + } + + private void submitSinkConnector(String sinkName, + String inputTopicName, + String className, + String archive) throws Exception { + String[] commands = { + PulsarCluster.ADMIN_SCRIPT, + "sinks", "create", + "--name", sinkName, + "--inputs", inputTopicName, + "--archive", archive, + "--classname", className + }; + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = container.execCmd(commands); + assertTrue( + result.getStdout().contains("\"Created successfully\""), + result.getStdout()); + } + private static void submitExclamationFunction(Runtime runtime, String inputTopicName, String outputTopicName, @@ -151,6 +294,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { } } + private static void getSinkInfoSuccess(String sinkName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName + ); + assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\"")); + } + + private static void getSourceInfoSuccess(String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\"")); + } + private static void getFunctionInfoSuccess(String functionName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, @@ -178,6 +345,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { } } + private static void getSinkStatus(String sinkName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName + ); + assertTrue(result.getStdout().contains("\"running\" : true")); + } + + private static void getSourceStatus(String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("\"running\" : true")); + } + private static void getFunctionStatus(String functionName, int numMessages) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, @@ -243,4 +434,60 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { assertTrue(result.getStderr().isEmpty()); } + private static void deleteSource(String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "delete", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("Delete source successfully")); + assertTrue(result.getStderr().isEmpty()); + } + + private static void deleteSink(String sinkName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "delete", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName + ); + assertTrue(result.getStdout().contains("Deleted successfully")); + assertTrue(result.getStderr().isEmpty()); + } + + private static void getSourceInfoNotFound(String sourceName) throws Exception { + try { + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName); + fail("Command should have exited with non-zero"); + } catch (ContainerExecException e) { + assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist")); + } + } + + private static void getSinkInfoNotFound(String sinkName) throws Exception { + try { + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName); + fail("Command should have exited with non-zero"); + } catch (ContainerExecException e) { + assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist")); + } + } + }