提交 be5e847a 编写于 作者: S Sijie Guo 提交者: Sanjeev Kulkarni

[functions] change instance id from string to int and expose number of instances in context (#2411)

* [functions] change instance id from string to int and expose number of instances in context

 ### Motivation

When writing a connector reading from a list of sources, it is hard for the connector implementation to decide how
to distribute the list of sources across the function instances. because there is no way to tell how many function
instances is running.

 ### Changes

- change instance id from string to integer (since the implementation is already assuming instance id is an integer)
- add getNumInstances in the context
- expose both interfaces in source and sink connector context

* Fix compilation
上级 cb52d519
......@@ -1258,7 +1258,7 @@ public class CmdFunctions extends CmdBase {
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(Integer.toString(i + instanceIdOffset));
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(Utils.findAvailablePort());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
......
......@@ -86,7 +86,14 @@ public interface Context {
*
* @return the instance id
*/
String getInstanceId();
int getInstanceId();
/**
* Get the number of instances that invoke this function.
*
* @return the number of instances that invoke this function.
*/
int getNumInstances();
/**
* The version of the function that we are executing
......
......@@ -177,8 +177,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
}
@Override
public String getInstanceId() {
return config.getInstanceId().toString();
public int getInstanceId() {
return config.getInstanceId();
}
@Override
public int getNumInstances() {
return config.getFunctionDetails().getParallelism();
}
@Override
......
......@@ -35,10 +35,19 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@EqualsAndHashCode
@ToString
public class InstanceConfig {
private String instanceId;
private int instanceId;
private String functionId;
private String functionVersion;
private FunctionDetails functionDetails;
private int maxBufferedTuples;
private int port;
/**
* Get the string representation of {@link #getInstanceId()}.
*
* @return the string representation of {@link #getInstanceId()}.
*/
public String getInstanceName() {
return "" + instanceId;
}
}
......@@ -51,7 +51,6 @@ import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
......@@ -65,7 +64,6 @@ import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Build
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.utils.ConsumerConfig;
......@@ -144,7 +142,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// initialize the thread context
ThreadContext.put("function", FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
ThreadContext.put("instance", instanceConfig.getInstanceId());
ThreadContext.put("instance", instanceConfig.getInstanceName());
log.info("Starting Java Instance {} : \n Details = {}",
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
......@@ -239,17 +237,18 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
private void loadJars() throws Exception {
try {
// Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(), instanceConfig.getInstanceId(),
jarFile);
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName(),
jarFile);
} catch (FileNotFoundException e) {
log.info("For Function {} Loading as NAR failed with {}; treating it as Jar instead", instanceConfig, e);
// create the function class loader
fnCache.registerFunctionInstance(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceId(),
instanceConfig.getInstanceName(),
Arrays.asList(jarFile),
Collections.emptyList());
}
......@@ -393,7 +392,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// once the thread quits, clean up the instance
fnCache.unregisterFunctionInstance(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceId());
instanceConfig.getInstanceName());
log.info("Unloading JAR files for function {}", instanceConfig);
}
......
......@@ -40,7 +40,7 @@ public interface WindowContext {
*
* @return the instance id
*/
String getInstanceId();
int getInstanceId();
/**
* The version of the function that we are executing
......
......@@ -42,7 +42,7 @@ public class WindowContextImpl implements WindowContext {
}
@Override
public String getInstanceId() {
public int getInstanceId() {
return this.context.getInstanceId();
}
......
......@@ -54,7 +54,7 @@ public class JavaInstanceMain implements AutoCloseable {
protected String jarFile;
@Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
protected String instanceId;
protected int instanceId;
@Parameter(names = "--function_id", description = "Function Id\n", required = true)
protected String functionId;
......
......@@ -128,7 +128,7 @@ class ProcessRuntime implements Runtime {
// TODO:- Find a platform independent way of controlling memory for a python application
}
args.add("--instance_id");
args.add(instanceConfig.getInstanceId());
args.add(instanceConfig.getInstanceName());
args.add("--function_id");
args.add(instanceConfig.getFunctionId());
args.add("--function_version");
......
......@@ -106,8 +106,8 @@ public class RuntimeSpawner implements AutoCloseable {
public CompletableFuture<FunctionStatus> getFunctionStatus() {
return runtime.getFunctionStatus().thenApply(f -> {
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceId());
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceName());
if (!f.getRunning() && runtimeDeathException != null) {
builder.setFailureException(runtimeDeathException.getMessage());
}
......
......@@ -104,7 +104,7 @@ public class ProcessRuntimeTest {
config.setFunctionDetails(createFunctionDetails(runtime));
config.setFunctionId(java.util.UUID.randomUUID().toString());
config.setFunctionVersion("1.0");
config.setInstanceId(java.util.UUID.randomUUID().toString());
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
return config;
......
......@@ -165,7 +165,7 @@ public class FunctionActioner implements AutoCloseable {
// TODO: set correct function id and version when features implemented
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
......
......@@ -19,6 +19,21 @@
package org.apache.pulsar.io.core;
public interface SinkContext {
/**
* The id of the instance that invokes this function.
*
* @return the instance id
*/
int getInstanceId();
/**
* Get the number of instances that invoke this function.
*
* @return the number of instances that invoke this function.
*/
int getNumInstances();
/**
* Record a user defined metric
* @param metricName The name of the metric
......
......@@ -19,6 +19,21 @@
package org.apache.pulsar.io.core;
public interface SourceContext {
/**
* The id of the instance that invokes this function.
*
* @return the instance id
*/
int getInstanceId();
/**
* Get the number of instances that invoke this function.
*
* @return the number of instances that invoke this function.
*/
int getNumInstances();
/**
* Record a user defined metric
* @param metricName The name of the metric
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册