未验证 提交 23c5505d 编写于 作者: B Boyang Jerry Peng 提交者: GitHub

cleaning up and improving function metrics (#2994)

* removing metrics module from functions

* removing other references ot Prometheus metrics server

* removing references to pulsar-functions-metrics

* fix unit tests

* revert test change

* cleaning up and improving function metrics

* slight refactoring

* fix unittest

* moving pulsar-spark tests into its own module
上级 784044b9
......@@ -88,11 +88,6 @@
<destName>java-instance.jar</destName>
<outputDirectory>instances</outputDirectory>
</file>
<file>
<source>${basedir}/../../pulsar-functions/metrics/target/PrometheusMetricsServer.jar</source>
<destName>PrometheusMetricsServer.jar</destName>
<outputDirectory>instances</outputDirectory>
</file>
<file>
<source>${basedir}/../../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar</source>
<destName>api-examples.jar</destName>
......
......@@ -37,6 +37,7 @@ import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.worker.WorkerService;
......@@ -164,7 +165,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@GET
@ApiOperation(
value = "Displays the status of a Pulsar Function running in cluster mode",
value = "Displays the status of a Pulsar Function",
response = FunctionStatus.class
)
@ApiResponses(value = {
......@@ -179,6 +180,40 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
}
@GET
@ApiOperation(
value = "Displays the stats of a Pulsar Function",
response = FunctionStats.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{functionName}/stats")
public Response getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
}
@GET
@ApiOperation(
value = "Displays the stats of a Pulsar Function instance",
response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionsInstanceStats(
tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}
@GET
@ApiOperation(
value = "Lists all Pulsar Functions currently deployed in a given namespace",
......
......@@ -54,7 +54,7 @@ public class WorkerStats extends AdminResource implements Supplier<WorkerService
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return worker.getWorkerMetrcis(clientAppId());
return worker.getWorkerMetrics(clientAppId());
}
@GET
......
......@@ -25,6 +25,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.common.functions.FunctionConfig;
......@@ -198,6 +199,39 @@ public interface Functions {
FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id)
throws PulsarAdminException;
/**
* Gets the current stats of a function instance.
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @param id
* Function instance-id
* @return
* @throws PulsarAdminException
*/
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id)
throws PulsarAdminException;
/**
* Gets the current stats of a function.
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @return
* @throws PulsarAdminException
*/
FunctionStats getFunctionStats(String tenant, String namespace, String function)
throws PulsarAdminException;
/**
* Restart function instance
*
......
......@@ -22,38 +22,36 @@ import com.google.gson.Gson;
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
public class FunctionsImpl extends BaseResource implements Functions {
......@@ -126,6 +124,33 @@ public class FunctionsImpl extends BaseResource implements Functions {
}
}
@Override
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id) throws PulsarAdminException {
try {
Response response = request(
functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public FunctionStats getFunctionStats(String tenant, String namespace, String function) throws PulsarAdminException {
try {
Response response = request(
functions.path(tenant).path(namespace).path(function).path("stats")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(FunctionStats.class);
} catch (Exception e) {
throw getApiException(e);
} }
@Override
public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
......
......@@ -76,6 +76,8 @@ public class CmdFunctions extends CmdBase {
private final UpdateFunction updater;
private final GetFunction getter;
private final GetFunctionStatus functionStatus;
@Getter
private final GetFunctionStats functionStats;
private final RestartFunction restart;
private final StopFunction stop;
private final ListFunctions lister;
......@@ -638,6 +640,24 @@ public class CmdFunctions extends CmdBase {
}
}
@Parameters(commandDescription = "Get the current stats of a Pulsar Function")
class GetFunctionStats extends FunctionCommand {
@Parameter(names = "--instance-id", description = "The function instanceId (Get-status of all instances if instance-id is not provided")
protected String instanceId;
@Override
void runCmd() throws Exception {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
if (isBlank(instanceId)) {
System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName)));
} else {
System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId))));
}
}
}
@Parameters(commandDescription = "Restart function instance")
class RestartFunction extends FunctionCommand {
......@@ -878,6 +898,7 @@ public class CmdFunctions extends CmdBase {
updater = new UpdateFunction();
getter = new GetFunction();
functionStatus = new GetFunctionStatus();
functionStats = new GetFunctionStats();
lister = new ListFunctions();
stateGetter = new StateGetter();
triggerer = new TriggerFunction();
......@@ -893,6 +914,7 @@ public class CmdFunctions extends CmdBase {
jcommander.addCommand("restart", getRestarter());
jcommander.addCommand("stop", getStopper());
jcommander.addCommand("getstatus", getStatuser());
jcommander.addCommand("stats", getFunctionStats());
jcommander.addCommand("list", getLister());
jcommander.addCommand("querystate", getStateGetter());
jcommander.addCommand("trigger", getTriggerer());
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;
import lombok.Data;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@Data
public class FunctionStats {
/**
* Overall total number of records function received from source
**/
public long receivedTotal;
/**
* Overall total number of records successfully processed by user function
**/
public long processedSuccessfullyTotal;
/**
* Overall total number of system exceptions thrown
**/
public long systemExceptionsTotal;
/**
* Overall total number of user exceptions thrown
**/
public long userExceptionsTotal;
/**
* Average process latency for function
**/
public double avgProcessLatency;
/**
* Timestamp of when the function was last invoked by any instance
**/
public long lastInvocation;
@Data
public static class FunctionInstanceStats {
/** Instance Id of function instance **/
public int instanceId;
@Data
public static class FunctionInstanceStatsData {
/**
* Total number of records function received from source for instance
**/
public long receivedTotal;
/**
* Total number of records successfully processed by user function for instance
**/
public long processedSuccessfullyTotal;
/**
* Total number of system exceptions thrown for instance
**/
public long systemExceptionsTotal;
/**
* Total number of user exceptions thrown for instance
**/
public long userExceptionsTotal;
/**
* Average process latency for function for instance
**/
public double avgProcessLatency;
/**
* Timestamp of when the function was last invoked for instance
**/
public long lastInvocation;
/**
* Map of user defined metrics
**/
public Map<String, Double> userMetrics = new HashMap<>();
}
public FunctionInstanceStatsData metrics = new FunctionInstanceStatsData();
}
public List<FunctionInstanceStats> instances = new LinkedList<>();
public void addInstance(FunctionInstanceStats functionInstanceStats) {
instances.add(functionInstanceStats);
}
public FunctionStats calculateOverall() {
lastInvocation = 0;
instances.forEach(new Consumer<FunctionInstanceStats>() {
@Override
public void accept(FunctionInstanceStats functionInstanceStats) {
FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = functionInstanceStats.getMetrics();
receivedTotal += functionInstanceStatsData.receivedTotal;
processedSuccessfullyTotal += functionInstanceStatsData.processedSuccessfullyTotal;
systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal;
userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal;
avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
if (functionInstanceStatsData.lastInvocation > lastInvocation) {
lastInvocation = functionInstanceStatsData.lastInvocation;
}
}
});
// calculate average from sum
avgProcessLatency = avgProcessLatency / instances.size();
return this;
}
}
......@@ -50,11 +50,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.functions.instance.FunctionStatsManager.USER_METRIC_PREFIX;
/**
* This class implements the Context interface exposed to the user.
......@@ -67,35 +66,6 @@ class ContextImpl implements Context, SinkContext, SourceContext {
// Per Message related
private Record<?> record;
@Getter
@Setter
private class AccumulatedMetricDatum {
private double count;
private double sum;
private double max;
private double min;
AccumulatedMetricDatum() {
count = 0;
sum = 0;
max = Double.MIN_VALUE;
min = Double.MAX_VALUE;
}
public void update(double value) {
count++;
sum += value;
if (max < value) {
max = value;
}
if (min > value) {
min = value;
}
}
}
private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
private Map<String, Producer<?>> publishProducers;
private ProducerBuilderImpl<?> producerBuilder;
......@@ -118,15 +88,14 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private final static String[] userMetricsLabelNames;
static {
// add label to indicate user metric
userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames, FunctionStats.metricsLabelNames.length + 1);
userMetricsLabelNames[FunctionStats.metricsLabelNames.length] = "metric";
userMetricsLabelNames = Arrays.copyOf(FunctionStatsManager.metricsLabelNames, FunctionStatsManager.metricsLabelNames.length + 1);
userMetricsLabelNames[FunctionStatsManager.metricsLabelNames.length] = "metric";
}
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels) {
this.config = config;
this.logger = logger;
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.inputTopics = inputTopics;
this.topicSchema = new TopicSchema(client);
......@@ -342,40 +311,43 @@ class ContextImpl implements Context, SinkContext, SourceContext {
@Override
public void recordMetric(String metricName, double value) {
userMetricsLabels.computeIfAbsent(metricName,
s -> {
String[] userMetricLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
userMetricLabels[userMetricLabels.length - 1] = metricName;
return userMetricLabels;
});
userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value);
accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum());
accumulatedMetrics.get(metricName).update(value);
String[] userMetricLabels = userMetricsLabels.get(metricName);
if (userMetricLabels == null) {
userMetricLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
userMetricLabels[userMetricLabels.length - 1] = metricName;
// set label for metrics before putting into userMetricsLabels map to
// prevent race condition with getMetrics calls
userMetricsSummary.labels(userMetricLabels).observe(value);
userMetricsLabels.put(metricName, userMetricLabels);
} else {
userMetricsSummary.labels(userMetricLabels).observe(value);
}
}
public MetricsData getAndResetMetrics() {
MetricsData retval = getMetrics();
public Map<String, Double> getAndResetMetrics() {
Map<String, Double> retval = getMetrics();
resetMetrics();
return retval;
}
public void resetMetrics() {
userMetricsSummary.clear();
this.accumulatedMetrics.clear();
}
public MetricsData getMetrics() {
MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
for (String metricName : accumulatedMetrics.keySet()) {
MetricsData.DataDigest.Builder bldr = MetricsData.DataDigest.newBuilder();
bldr.setSum(accumulatedMetrics.get(metricName).getSum());
bldr.setCount(accumulatedMetrics.get(metricName).getCount());
bldr.setMax(accumulatedMetrics.get(metricName).getMax());
bldr.setMin(accumulatedMetrics.get(metricName).getMax());
metricsDataBuilder.putMetrics(metricName, bldr.build());
public Map<String, Double> getMetrics() {
Map<String, Double> metricsMap = new HashMap<>();
for (Map.Entry<String, String[]> userMetricsLabelsEntry : userMetricsLabels.entrySet()) {
String metricName = userMetricsLabelsEntry.getKey();
String[] labels = userMetricsLabelsEntry.getValue();
Summary.Child.Value summary = userMetricsSummary.labels(labels).get();
metricsMap.put(String.format("%s%s_sum", USER_METRIC_PREFIX, metricName), summary.sum);
metricsMap.put(String.format("%s%s_count", USER_METRIC_PREFIX, metricName), summary.count);
for (Map.Entry<Double, Double> entry : summary.quantiles.entrySet()) {
Double quantile = entry.getKey();
Double value = entry.getValue();
metricsMap.put(String.format("%s%s_%s", USER_METRIC_PREFIX, metricName, quantile), value);
}
}
MetricsData retval = metricsDataBuilder.build();
return retval;
return metricsMap;
}
}
\ No newline at end of file
......@@ -34,18 +34,21 @@ import org.apache.pulsar.functions.proto.InstanceCommunication;
@Slf4j
@Getter
@Setter
public class FunctionStats {
public class FunctionStatsManager {
static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster"};
public static final String PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_";
public final static String USER_METRIC_PREFIX = "user_metric_";
/** Declare metric names **/
static final String PULSAR_FUNCTION_PROCESSED_TOTAL = "pulsar_function_processed_total";
static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL = "pulsar_function_processed_successfully_total";
static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL = "pulsar_function_system_exceptions_total";
static final String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL = "pulsar_function_user_exceptions_total";
static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS = "pulsar_function_process_latency_ms";
static final String PULSAR_FUNCTION_LAST_INVOCATION = "pulsar_function_last_invocation";
static final String PULSAR_FUNCTION_RECEIVED_TOTAL = "pulsar_function_received_total";
public static final String PROCESSED_TOTAL = "processed_total";
public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total";
public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
public static final String PROCESS_LATENCY_MS = "process_latency_ms";
public static final String LAST_INVOCATION = "last_invocation";
public static final String RECEIVED_TOTAL = "received_total";
/** Declare Prometheus stats **/
......@@ -70,37 +73,37 @@ public class FunctionStats {
@Getter
private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);
public FunctionStats(CollectorRegistry collectorRegistry) {
public FunctionStatsManager(CollectorRegistry collectorRegistry) {
// Declare function local collector registry so that it will not clash with other function instances'
// metrics collection especially in threaded mode
functionCollectorRegistry = new CollectorRegistry();
statTotalProcessed = Counter.build()
.name(PULSAR_FUNCTION_PROCESSED_TOTAL)
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL)
.help("Total number of messages processed.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
statTotalProcessedSuccessfully = Counter.build()
.name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL)
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
statTotalSysExceptions = Counter.build()
.name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL)
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
statTotalUserExceptions = Counter.build()
.name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL)
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
statProcessLatency = Summary.build()
.name(PULSAR_FUNCTION_PROCESS_LATENCY_MS)
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
......@@ -110,13 +113,13 @@ public class FunctionStats {
.register(collectorRegistry);
statlastInvocation = Gauge.build()
.name(PULSAR_FUNCTION_LAST_INVOCATION)
.name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the function")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
statTotalRecordsRecieved = Counter.build()
.name(PULSAR_FUNCTION_RECEIVED_TOTAL)
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
......
......@@ -26,6 +26,8 @@ import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import java.util.Map;
/**
* This is the Java Instance. This is started by the runtimeSpawner using the JavaInstanceClient
* program if invoking via a process based invocation or using JavaInstance using a thread
......@@ -74,7 +76,7 @@ public class JavaInstance implements AutoCloseable {
public void close() {
}
public InstanceCommunication.MetricsData getAndResetMetrics() {
public Map<String, Double> getAndResetMetrics() {
return context.getAndResetMetrics();
}
......@@ -82,7 +84,7 @@ public class JavaInstance implements AutoCloseable {
context.resetMetrics();
}
public InstanceCommunication.MetricsData getMetrics() {
public Map<String, Double> getMetrics() {
return context.getMetrics();
}
}
......@@ -109,7 +109,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Throwable deathException;
// function stats
private final FunctionStats stats;
private final FunctionStatsManager stats;
private Record<?> currentRecord;
......@@ -133,7 +133,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.stats = new FunctionStats(collectorRegistry);
this.stats = new FunctionStatsManager(collectorRegistry);
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.metricsLabels = new String[]{
......@@ -423,23 +423,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
public InstanceCommunication.MetricsData getAndResetMetrics() {
InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
InstanceCommunication.MetricsData metricsData = getMetrics();
stats.reset();
if (javaInstance != null) {
InstanceCommunication.MetricsData userMetrics = javaInstance.getAndResetMetrics();
if (userMetrics != null) {
bldr.putAllMetrics(userMetrics.getMetricsMap());
}
}
return bldr.build();
return metricsData;
}
public InstanceCommunication.MetricsData getMetrics() {
InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
if (javaInstance != null) {
InstanceCommunication.MetricsData userMetrics = javaInstance.getMetrics();
Map<String, Double> userMetrics = javaInstance.getMetrics();
if (userMetrics != null) {
bldr.putAllMetrics(userMetrics.getMetricsMap());
bldr.putAllUserMetrics(userMetrics);
}
}
return bldr.build();
......@@ -452,16 +446,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_TOTAL, stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL, stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL, stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL, stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_RECEIVED_TOTAL, stats.statTotalRecordsRecieved.labels(metricsLabels).get(), bldr);
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESS_LATENCY_MS,
stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0
? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count,
bldr);
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_LAST_INVOCATION, stats.statlastInvocation.labels(metricsLabels).get(), bldr);
bldr.setProcessedTotal((long) stats.statTotalProcessed.labels(metricsLabels).get());
bldr.setProcessedSuccessfullyTotal((long) stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
bldr.setSystemExceptionsTotal((long) stats.statTotalSysExceptions.labels(metricsLabels).get());
bldr.setUserExceptionsTotal((long) stats.statTotalUserExceptions.labels(metricsLabels).get());
bldr.setReceivedTotal((long) stats.statTotalRecordsRecieved.labels(metricsLabels).get());
bldr.setAvgProcessLatency(stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0
? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count);
bldr.setLastInvocation((long) stats.statlastInvocation.labels(metricsLabels).get());
return bldr;
}
......@@ -485,13 +479,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return functionStatusBuilder;
}
private static void addSystemMetrics(String metricName, double value, InstanceCommunication.MetricsData.Builder bldr) {
InstanceCommunication.MetricsData.DataDigest digest =
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(value).setSum(value).setMax(value).setMin(0).build();
bldr.putMetrics(metricName, digest);
}
private void setupLogHandler() {
if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
!instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
......
......@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd8\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12#\n\x07metrics\x18\x0f \x01(\x0b\x32\x12.proto.MetricsData\x12\x10\n\x08workerId\x18\x10 \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12\x10\n\x08workerId\x18\x10 \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"V\n\x12\x46unctionStatusList\x12\r\n\x05\x65rror\x18\x02 \x01(\t\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xbd\x02\n\x0bMetricsData\x12\x15\n\rreceivedTotal\x18\x02 \x01(\x03\x12\x16\n\x0eprocessedTotal\x18\x03 \x01(\x03\x12\"\n\x1aprocessedSuccessfullyTotal\x18\x04 \x01(\x03\x12\x1d\n\x15systemExceptionsTotal\x18\x05 \x01(\x03\x12\x1b\n\x13userExceptionsTotal\x18\x06 \x01(\x03\x12\x19\n\x11\x61vgProcessLatency\x18\x07 \x01(\x01\x12\x16\n\x0elastInvocation\x18\x08 \x01(\x03\x12\x38\n\x0buserMetrics\x18\t \x03(\x0b\x32#.proto.MetricsData.UserMetricsEntry\x1a\x32\n\x10UserMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
......@@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=661,
serialized_end=730,
serialized_start=624,
serialized_end=693,
)
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
......@@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=732,
serialized_end=796,
serialized_start=695,
serialized_end=759,
)
_FUNCTIONSTATUS = _descriptor.Descriptor(
......@@ -226,14 +226,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='metrics', full_name='proto.FunctionStatus.metrics', index=14,
number=15, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='workerId', full_name='proto.FunctionStatus.workerId', index=15,
name='workerId', full_name='proto.FunctionStatus.workerId', index=14,
number=16, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
......@@ -252,7 +245,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=68,
serialized_end=796,
serialized_end=759,
)
......@@ -264,7 +257,14 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=0,
name='error', full_name='proto.FunctionStatusList.error', index=0,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=1,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
......@@ -282,109 +282,107 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=798,
serialized_end=869,
serialized_start=761,
serialized_end=847,
)
_METRICSDATA_DATADIGEST = _descriptor.Descriptor(
name='DataDigest',
full_name='proto.MetricsData.DataDigest',
_METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
name='UserMetricsEntry',
full_name='proto.MetricsData.UserMetricsEntry',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='count', full_name='proto.MetricsData.DataDigest.count', index=0,
number=1, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
name='key', full_name='proto.MetricsData.UserMetricsEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='sum', full_name='proto.MetricsData.DataDigest.sum', index=1,
name='value', full_name='proto.MetricsData.UserMetricsEntry.value', index=1,
number=2, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='max', full_name='proto.MetricsData.DataDigest.max', index=2,
number=3, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='min', full_name='proto.MetricsData.DataDigest.min', index=3,
number=4, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=937,
serialized_end=1003,
serialized_start=1117,
serialized_end=1167,
)
_METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
name='MetricsEntry',
full_name='proto.MetricsData.MetricsEntry',
_METRICSDATA = _descriptor.Descriptor(
name='MetricsData',
full_name='proto.MetricsData',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='proto.MetricsData.MetricsEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
name='receivedTotal', full_name='proto.MetricsData.receivedTotal', index=0,
number=2, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='proto.MetricsData.MetricsEntry.value', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
name='processedTotal', full_name='proto.MetricsData.processedTotal', index=1,
number=3, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=1005,
serialized_end=1082,
)
_METRICSDATA = _descriptor.Descriptor(
name='MetricsData',
full_name='proto.MetricsData',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='metrics', full_name='proto.MetricsData.metrics', index=0,
number=1, type=11, cpp_type=10, label=3,
name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
number=4, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal', index=3,
number=5, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=4,
number=6, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=5,
number=7, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=6,
number=8, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='userMetrics', full_name='proto.MetricsData.userMetrics', index=7,
number=9, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
......@@ -392,7 +390,7 @@ _METRICSDATA = _descriptor.Descriptor(
],
extensions=[
],
nested_types=[_METRICSDATA_DATADIGEST, _METRICSDATA_METRICSENTRY, ],
nested_types=[_METRICSDATA_USERMETRICSENTRY, ],
enum_types=[
],
options=None,
......@@ -401,8 +399,8 @@ _METRICSDATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=872,
serialized_end=1082,
serialized_start=850,
serialized_end=1167,
)
......@@ -432,8 +430,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1084,
serialized_end=1120,
serialized_start=1169,
serialized_end=1205,
)
......@@ -477,8 +475,8 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1183,
serialized_end=1275,
serialized_start=1268,
serialized_end=1360,
)
_METRICS = _descriptor.Descriptor(
......@@ -507,8 +505,8 @@ _METRICS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1123,
serialized_end=1275,
serialized_start=1208,
serialized_end=1360,
)
_FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
......@@ -516,12 +514,9 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.containing_type = _FUNCTIONSTATUS
_FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['deserializationExceptions'].message_type = _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY
_FUNCTIONSTATUS.fields_by_name['metrics'].message_type = _METRICSDATA
_FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = _FUNCTIONSTATUS
_METRICSDATA_DATADIGEST.containing_type = _METRICSDATA
_METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type = _METRICSDATA_DATADIGEST
_METRICSDATA_METRICSENTRY.containing_type = _METRICSDATA
_METRICSDATA.fields_by_name['metrics'].message_type = _METRICSDATA_METRICSENTRY
_METRICSDATA_USERMETRICSENTRY.containing_type = _METRICSDATA
_METRICSDATA.fields_by_name['userMetrics'].message_type = _METRICSDATA_USERMETRICSENTRY
_METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type = _METRICSDATA
_METRICS_INSTANCEMETRICS.containing_type = _METRICS
_METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS
......@@ -564,17 +559,10 @@ _sym_db.RegisterMessage(FunctionStatusList)
MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.Message,), dict(
DataDigest = _reflection.GeneratedProtocolMessageType('DataDigest', (_message.Message,), dict(
DESCRIPTOR = _METRICSDATA_DATADIGEST,
__module__ = 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.MetricsData.DataDigest)
))
,
MetricsEntry = _reflection.GeneratedProtocolMessageType('MetricsEntry', (_message.Message,), dict(
DESCRIPTOR = _METRICSDATA_METRICSENTRY,
UserMetricsEntry = _reflection.GeneratedProtocolMessageType('UserMetricsEntry', (_message.Message,), dict(
DESCRIPTOR = _METRICSDATA_USERMETRICSENTRY,
__module__ = 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.MetricsData.MetricsEntry)
# @@protoc_insertion_point(class_scope:proto.MetricsData.UserMetricsEntry)
))
,
DESCRIPTOR = _METRICSDATA,
......@@ -582,8 +570,7 @@ MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.
# @@protoc_insertion_point(class_scope:proto.MetricsData)
))
_sym_db.RegisterMessage(MetricsData)
_sym_db.RegisterMessage(MetricsData.DataDigest)
_sym_db.RegisterMessage(MetricsData.MetricsEntry)
_sym_db.RegisterMessage(MetricsData.UserMetricsEntry)
HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), dict(
DESCRIPTOR = _HEALTHCHECKRESULT,
......@@ -612,8 +599,8 @@ DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.has_options = True
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
_METRICSDATA_METRICSENTRY.has_options = True
_METRICSDATA_METRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
_METRICSDATA_USERMETRICSENTRY.has_options = True
_METRICSDATA_USERMETRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
_INSTANCECONTROL = _descriptor.ServiceDescriptor(
name='InstanceControl',
......@@ -621,8 +608,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=1278,
serialized_end=1626,
serialized_start=1363,
serialized_end=1711,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
......
......@@ -29,27 +29,10 @@ import json
import pulsar
import util
import InstanceCommunication_pb2
from prometheus_client import Summary
from function_stats import Stats
# For keeping track of accumulated metrics
class AccumulatedMetricDatum(object):
def __init__(self):
self.count = 0.0
self.sum = 0.0
self.max = float('-inf')
self.min = float('inf')
def update(self, value):
self.count += 1
self.sum += value
if value > self.max:
self.max = value
if value < self.min:
self.min = value
class ContextImpl(pulsar.Context):
# add label to indicate user metric
......@@ -62,7 +45,6 @@ class ContextImpl(pulsar.Context):
self.user_code_dir = os.path.dirname(user_code)
self.consumers = consumers
self.secrets_provider = secrets_provider
self.accumulated_metrics = {}
self.publish_producers = {}
self.publish_serializers = {}
self.current_message_id = None
......@@ -132,9 +114,6 @@ class ContextImpl(pulsar.Context):
if metric_name not in self.user_metrics_labels:
self.user_metrics_labels[metric_name] = self.metrics_labels + [metric_name]
self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
if not metric_name in self.accumulated_metrics:
self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
self.accumulated_metrics[metric_name].update(metric_value)
def get_output_topic(self):
return self.instance_config.function_details.output
......@@ -182,13 +161,11 @@ class ContextImpl(pulsar.Context):
for labels in self.user_metrics_labels.values():
self.user_metrics_summary.labels(*labels)._sum.set(0.0)
self.user_metrics_summary.labels(*labels)._count.set(0.0)
self.accumulated_metrics.clear()
def get_metrics(self):
metrics = InstanceCommunication_pb2.MetricsData()
for metric_name, accumulated_metric in self.accumulated_metrics.items():
metrics.metrics[metric_name].count = accumulated_metric.count
metrics.metrics[metric_name].sum = accumulated_metric.sum
metrics.metrics[metric_name].max = accumulated_metric.max
metrics.metrics[metric_name].min = accumulated_metric.min
return metrics
metrics_map = {}
for metric_name, metric_labels in self.user_metrics_labels.items():
metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._sum.get()
metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._count.get()
return metrics_map
......@@ -26,28 +26,31 @@ from prometheus_client import Counter, Summary, Gauge
class Stats(object):
metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 'cluster']
TOTAL_PROCESSED = 'pulsar_function_processed_total'
TOTAL_SUCCESSFULLY_PROCESSED = 'pulsar_function_processed_successfully_total'
TOTAL_SYSTEM_EXCEPTIONS = 'pulsar_function_system_exceptions_total'
TOTAL_USER_EXCEPTIONS = 'pulsar_function_user_exceptions_total'
PROCESS_LATENCY_MS = 'pulsar_function_process_latency_ms'
LAST_INVOCATION = 'pulsar_function_last_invocation'
TOTAL_RECEIVED = 'pulsar_function_received_total'
PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
USER_METRIC_PREFIX = "user_metric_";
TOTAL_PROCESSED = 'processed_total'
TOTAL_SUCCESSFULLY_PROCESSED = 'processed_successfully_total'
TOTAL_SYSTEM_EXCEPTIONS = 'system_exceptions_total'
TOTAL_USER_EXCEPTIONS = 'user_exceptions_total'
PROCESS_LATENCY_MS = 'process_latency_ms'
LAST_INVOCATION = 'last_invocation'
TOTAL_RECEIVED = 'received_total'
# Declare Prometheus
stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED,
'Total number of messages processed successfully.', metrics_label_names)
stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.',
stat_total_sys_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX+ TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.',
metrics_label_names)
stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.',
stat_total_user_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.',
metrics_label_names)
stat_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names)
stat_process_latency_ms = Summary(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names)
stat_last_invocation = Gauge(LAST_INVOCATION, 'The timestamp of the last invocation of the function.', metrics_label_names)
stat_last_invocation = Gauge(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION, 'The timestamp of the last invocation of the function.', metrics_label_names)
stat_total_received = Counter(TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names)
stat_total_received = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names)
latest_user_exception = []
latest_sys_exception = []
......
......@@ -286,20 +286,34 @@ class PythonInstance(object):
self.contextimpl.reset_metrics()
def get_metrics(self):
# First get any user metrics
metrics = self.contextimpl.get_metrics()
# Now add system metrics as well
self.add_system_metrics(Stats.TOTAL_PROCESSED, Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
self.add_system_metrics(Stats.TOTAL_SUCCESSFULLY_PROCESSED, Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(), metrics)
self.add_system_metrics(Stats.TOTAL_SYSTEM_EXCEPTIONS, Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), metrics)
self.add_system_metrics(Stats.TOTAL_USER_EXCEPTIONS, Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), metrics)
self.add_system_metrics(Stats.PROCESS_LATENCY_MS,
0.0 if Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
else Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() / Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get(),
metrics)
self.add_system_metrics(Stats.TOTAL_RECEIVED, Stats.stat_total_received.labels(*self.metrics_labels)._value.get(), metrics)
self.add_system_metrics(Stats.LAST_INVOCATION, Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get(), metrics)
return metrics
total_received = Stats.stat_total_received.labels(*self.metrics_labels)._value.get()
total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
metrics_data = InstanceCommunication_pb2.MetricsData()
metrics_data.receivedTotal = int(total_received) if sys.version_info.major >= 3 else long(total_received)
metrics_data.processedTotal = int(total_processed) if sys.version_info.major >= 3 else long(total_processed)
metrics_data.processedSuccessfullyTotal = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully)
metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if sys.version_info.major >= 3 else long(total_sys_exceptions)
metrics_data.userExceptionsTotal = int(total_user_exceptions) if sys.version_info.major >= 3 else long(total_user_exceptions)
metrics_data.avgProcessLatency = 0.0 \
if process_latency_ms_count <= 0.0 \
else process_latency_ms_sum / process_latency_ms_count
metrics_data.lastInvocation = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation)
# get any user metrics
user_metrics = self.contextimpl.get_metrics()
for metric_name, value in user_metrics.items():
metrics_data.userMetrics[metric_name] = value
return metrics_data
def add_system_metrics(self, metric_name, value, metrics):
metrics.metrics[metric_name].count = value
......@@ -312,30 +326,30 @@ class PythonInstance(object):
status.running = True
total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
stat_total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
stat_total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
stat_total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
stat_process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
stat_process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
stat_last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
status.numProcessed = int(total_processed) if sys.version_info.major >= 3 else long(total_processed)
status.numSuccessfullyProcessed = int(stat_total_processed_successfully) if sys.version_info.major >= 3 else long(stat_total_processed_successfully)
status.numUserExceptions = int(stat_total_user_exceptions) if sys.version_info.major >= 3 else long(stat_total_user_exceptions)
total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
status.numProcessed = int(total_processed) if sys.version_info.major >= 3 else long(total_processed)
status.numSuccessfullyProcessed = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully)
status.numUserExceptions = int(total_user_exceptions) if sys.version_info.major >= 3 else long(total_user_exceptions)
status.instanceId = self.instance_config.instance_id
for ex, tm in self.stats.latest_user_exception:
to_add = status.latestUserExceptions.add()
to_add.exceptionString = ex
to_add.msSinceEpoch = tm
status.numSystemExceptions = int(stat_total_sys_exceptions) if sys.version_info.major >= 3 else long(stat_total_sys_exceptions)
status.numSystemExceptions = int(total_sys_exceptions) if sys.version_info.major >= 3 else long(total_sys_exceptions)
for ex, tm in self.stats.latest_sys_exception:
to_add = status.latestSystemExceptions.add()
to_add.exceptionString = ex
to_add.msSinceEpoch = tm
status.averageLatency = 0.0 \
if stat_process_latency_ms_count <= 0.0 \
else stat_process_latency_ms_sum / stat_process_latency_ms_count
status.lastInvocationTime = int(stat_last_invocation) if sys.version_info.major >= 3 else long(stat_last_invocation)
if process_latency_ms_count <= 0.0 \
else process_latency_ms_sum / process_latency_ms_count
status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation)
return status
def join(self):
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.metrics;
import java.util.Map;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
/**
* The metrics sink interface. <p>
* Implementations of this interface consume the {@link MetricsData}
* The calling entity pushes the {@link MetricsData} to the sink using
* {@link #processRecord(MetricsData)} method.
* And {@link #flush()} is called at an interval according to the configuration
*/
public interface MetricsSink extends AutoCloseable {
/**
* Initialize the MetricsSink
*
* @param conf An unmodifiableMap containing basic configuration
* Attempts to modify the returned map,
* whether direct or via its collection views, result in an UnsupportedOperationException.
*/
void init(Map<String, String> conf);
/**
* Process a metrics record in the sink
* @param record the record to put
* @param functionDetails functionDetails that generated this record
*/
void processRecord(MetricsData record, Function.FunctionDetails functionDetails);
/**
* Flush any buffered metrics
* It would be called at an interval according to the configuration
*/
void flush();
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*/
void close();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.metrics;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
import com.google.protobuf.util.JsonFormat;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.metrics.sink.AbstractWebSink;
import org.apache.pulsar.functions.metrics.sink.PrometheusSink;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import java.util.HashMap;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.*;
/**
* A function container implemented using java thread.
*/
@Slf4j
public class PrometheusMetricsServer {
@Parameter(names = "--function_details", description = "Function details json\n", required = true)
protected String functionDetailsJsonString;
@Parameter(names = "--prometheus_port", description = "Port to listen for prometheus requests\n", required = true)
protected int prometheusPort;
@Parameter(names = "--grpc_port", description = "GRPC Port to query the metrics from instance\n", required = true)
protected int grpc_port;
@Parameter(names = "--collection_interval", description = "Number in seconds between collection interval\n", required = true)
protected int metricsCollectionInterval;
private FunctionDetails functionDetails;
private MetricsSink metricsSink;
private ManagedChannel channel;
private InstanceControlGrpc.InstanceControlFutureStub stub;
private ScheduledExecutorService timer;
public PrometheusMetricsServer() { }
public void start() throws Exception {
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
}
if (functionDetailsJsonString.charAt(functionDetailsJsonString.length() - 1) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
functionDetails = functionDetailsBuilder.build();
metricsSink = new PrometheusSink();
Map<String, String> config = new HashMap<>();
config.put(AbstractWebSink.KEY_PATH, "/metrics");
config.put(AbstractWebSink.KEY_PORT, String.valueOf(prometheusPort));
metricsSink.init(config);
channel = ManagedChannelBuilder.forAddress("127.0.0.1", grpc_port)
.usePlaintext(true)
.build();
stub = InstanceControlGrpc.newFutureStub(channel);
if (metricsCollectionInterval > 0) {
timer = Executors.newSingleThreadScheduledExecutor();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
CompletableFuture<InstanceCommunication.MetricsData> result = getMetrics();
try {
metricsSink.processRecord(result.get(), functionDetails);
} catch (Exception e) {
log.error("Getting metrics data failed {}/{}/{}",
functionDetails.getTenant(),
functionDetails.getNamespace(),
functionDetails.getName(),
e);
}
}
}, metricsCollectionInterval, metricsCollectionInterval, TimeUnit.SECONDS);
}
}
public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(10, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build());
Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
@Override
public void onFailure(Throwable throwable) {
retval.completeExceptionally(throwable);
}
@Override
public void onSuccess(InstanceCommunication.MetricsData t) {
retval.complete(t);
}
});
return retval;
}
public static void main(String[] args) throws Exception {
PrometheusMetricsServer server = new PrometheusMetricsServer();
JCommander jcommander = new JCommander(server);
jcommander.setProgramName("PrometheusMetricsServer");
// parse args by JCommander
jcommander.parse(args);
server.start();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Spanwer for spawning processes, threads, docker containers to execute functions.
*/
package org.apache.pulsar.functions.metrics;
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.metrics.sink;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.sun.net.httpserver.HttpServer;
import org.apache.pulsar.functions.metrics.MetricsSink;
/**
* A metrics sink that publishes metrics on a http endpoint
*/
abstract public class AbstractWebSink implements MetricsSink {
private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName());
private static final int HTTP_STATUS_OK = 200;
// Metrics will be published on http://host:port/path, the port
public static final String KEY_PORT = "port";
// The path
public static final String KEY_PATH = "path";
// Maximum number of metrics getting served
private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size";
private static final String DEFAULT_MAX_CACHE_SIZE = "1000000";
// Time To Live before a metric gets evicted from the cache
private static final String KEY_METRICS_CACHE_TTL_SEC = "metrics-cache-ttl-sec";
private static final String DEFAULT_CACHE_TTL_SECONDS = "600";
private HttpServer httpServer;
private long cacheMaxSize;
private long cacheTtlSeconds;
private final Ticker cacheTicker;
AbstractWebSink() {
this(Ticker.systemTicker());
}
@VisibleForTesting
AbstractWebSink(Ticker cacheTicker) {
this.cacheTicker = cacheTicker;
}
@Override
public final void init(Map<String, String> conf) {
String path = conf.get(KEY_PATH);
cacheMaxSize = Long.valueOf(conf.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE,
DEFAULT_MAX_CACHE_SIZE));
cacheTtlSeconds = Long.valueOf(conf.getOrDefault(KEY_METRICS_CACHE_TTL_SEC,
DEFAULT_CACHE_TTL_SECONDS));
// initialize child classes
initialize(conf);
int port = Integer.valueOf(conf.getOrDefault(KEY_PORT, "9099"));
startHttpServer(path, port);
}
/**
* Start a http server on supplied port that will serve the metrics, as json,
* on the specified path.
*
* @param path
* @param port
*/
protected void startHttpServer(String path, int port) {
LOG.info("Starting AbstractWebMetricSink at path" + path + " and port " + port);
try {
httpServer = HttpServer.create(new InetSocketAddress(port), 0);
httpServer.createContext(path, httpExchange -> {
byte[] response = generateResponse();
httpExchange.sendResponseHeaders(HTTP_STATUS_OK, response.length);
OutputStream os = httpExchange.getResponseBody();
os.write(response);
os.close();
LOG.log(Level.INFO, "Received metrics request.");
});
httpServer.start();
} catch (IOException e) {
throw new RuntimeException("Failed to create Http server on port " + port, e);
}
}
// a convenience method for creating a metrics cache
<K, V> Cache<K, V> createCache() {
return CacheBuilder.newBuilder()
.maximumSize(cacheMaxSize)
.expireAfterWrite(cacheTtlSeconds, TimeUnit.SECONDS)
.ticker(cacheTicker)
.build();
}
abstract byte[] generateResponse() throws IOException;
abstract void initialize(Map<String, String> configuration);
@Override
public void flush() { }
@Override
public void close() {
if (httpServer != null) {
httpServer.stop(0);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.metrics.sink;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.metrics.MetricsSink;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Utils;
/**
* A metrics sink that writes to a file in json format
* We would create/overwrite a file every time the flush() in invoked
* We would save at most fileMaximum metrics file in disk
*/
public class FileSink implements MetricsSink {
private static final Logger LOG = Logger.getLogger(FileSink.class.getName());
private static final String FILENAME_KEY = "filename-output";
private static final String MAXIMUM_FILE_COUNT_KEY = "file-maximum";
// We would convert a file's metrics into a JSON object, i.e. array
// So we need to add "[" at the start and "]" at the end
private boolean isFileStart = true;
private PrintStream writer;
private String filenameKey;
private int fileMaximum = 1;
private int currentFileIndex = 0;
@Override
public void init(Map<String, String> conf) {
verifyConf(conf);
filenameKey = conf.get(FILENAME_KEY);
fileMaximum = Integer.valueOf(conf.get(MAXIMUM_FILE_COUNT_KEY));
}
private void verifyConf(Map<String, String> conf) {
if (!conf.containsKey(FILENAME_KEY)) {
throw new IllegalArgumentException("Require: " + FILENAME_KEY);
}
if (!conf.containsKey(MAXIMUM_FILE_COUNT_KEY)) {
throw new IllegalArgumentException("Require: " + MAXIMUM_FILE_COUNT_KEY);
}
}
private PrintStream openNewFile(String filename) {
// If the file already exists, set it Writable to avoid permission denied
File f = new File(filename);
if (f.exists() && !f.isDirectory()) {
f.setWritable(true);
}
try {
return new PrintStream(new FileOutputStream(filename, false), true, "UTF-8");
} catch (FileNotFoundException | UnsupportedEncodingException e) {
throw new RuntimeException("Error creating " + filename, e);
}
}
@Override
public void processRecord(InstanceCommunication.MetricsData record, Function.FunctionDetails FunctionDetails) {
if (isFileStart) {
String filenamePrefix = filenameKey + "." + FunctionDetailsUtils.getFullyQualifiedName(FunctionDetails);
writer = openNewFile(String.format("%s.%d", filenamePrefix, currentFileIndex));
writer.print("[");
isFileStart = false;
} else {
writer.print(",");
}
try {
String metrics = Utils.printJson(record);
writer.print(metrics);
} catch (Exception ex) {
}
}
@Override
public void flush() {
if (isFileStart) {
// No record has been processed since the previous flush, so create a new file
// and output an empty JSON array.
writer = openNewFile(String.format("%s.%d", filenameKey, currentFileIndex));
writer.print("[");
}
writer.print("]");
writer.flush();
writer.close();
new File(String.format("%s.%s", filenameKey, currentFileIndex)).setReadOnly();
currentFileIndex = (currentFileIndex + 1) % fileMaximum;
isFileStart = true;
}
@Override
public void close() {
if (writer != null) {
writer.close();
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.metrics.sink;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import com.google.common.cache.Cache;
import lombok.Getter;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
/**
* A web sink that exposes and endpoint that Prometheus can scrape
*
* metrics are generated in a text format and separated with a newline "\n"
* https://prometheus.io/docs/instrumenting/exposition_formats
*
* metrics format:
* heron_metric{topology="topology-name",component="component-id",instance="instance-id"} value timestamp
*/
public class PrometheusSink extends AbstractWebSink {
private static final Logger LOG = Logger.getLogger(PrometheusSink.class.getName());
private static final String PREFIX = "pulsar_function";
private static final String DELIMITER = "\n";
// This is the cache that is used to serve the metrics
@Getter
private Cache<String, Map<String, Double>> metricsCache;
public PrometheusSink() {
super();
}
@Override
void initialize(Map<String, String> configuration) {
metricsCache = createCache();
}
@Override
byte[] generateResponse() throws IOException {
metricsCache.cleanUp();
final Map<String, Map<String, Double>> metrics = metricsCache.asMap();
final StringBuilder sb = new StringBuilder();
metrics.forEach((String source, Map<String, Double> sourceMetrics) -> {
String tenant = FunctionDetailsUtils.extractTenantFromFQN(source);
String namespace = FunctionDetailsUtils.extractNamespaceFromFQN(source);
String name = FunctionDetailsUtils.extractFunctionNameFromFQN(source);
sourceMetrics.forEach((String metricName, Double value) -> {
String exportedMetricName = String.format("%s_%s", PREFIX, metricName);
sb.append(Prometheus.sanitizeMetricName(exportedMetricName))
.append("{")
.append("tenant=\"").append(tenant).append("\",")
.append("namespace=\"").append(namespace).append("\",")
.append("functionname=\"").append(name).append("\"");
sb.append("} ")
.append(Prometheus.doubleToGoString(value))
.append(" ").append(currentTimeMillis())
.append(DELIMITER);
});
});
return sb.toString().getBytes();
}
@Override
public void processRecord(MetricsData record, Function.FunctionDetails functionDetails) {
final String source = FunctionDetailsUtils.getFullyQualifiedName(functionDetails);
Map<String, Double> sourceCache = metricsCache.getIfPresent(source);
if (sourceCache == null) {
final Cache<String, Double> newSourceCache = createCache();
sourceCache = newSourceCache.asMap();
}
sourceCache.putAll(processMetrics(record));
metricsCache.put(source, sourceCache);
}
static Map<String, Double> processMetrics(MetricsData metrics) {
Map<String, Double> map = new HashMap<>();
for (Map.Entry<String, MetricsData.DataDigest> entry : metrics.getMetricsMap().entrySet()) {
map.put(entry.getKey() + "_count", entry.getValue().getCount());
map.put(entry.getKey() + "_sum", entry.getValue().getSum());
map.put(entry.getKey() + "_max", entry.getValue().getMax());
map.put(entry.getKey() + "_min", entry.getValue().getMin());
}
return map;
}
long currentTimeMillis() {
return System.currentTimeMillis();
}
// code taken from prometheus java_client repo
static final class Prometheus {
private static final Pattern METRIC_NAME_RE = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*");
private static final Pattern METRIC_LABEL_NAME_RE = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");
private static final Pattern RESERVED_METRIC_LABEL_NAME_RE = Pattern.compile("__.*");
/**
* Throw an exception if the metric name is invalid.
*/
static void checkMetricName(String name) {
if (!METRIC_NAME_RE.matcher(name).matches()) {
throw new IllegalArgumentException("Invalid metric name: " + name);
}
}
private static final Pattern SANITIZE_PREFIX_PATTERN = Pattern.compile("^[^a-zA-Z_]");
private static final Pattern SANITIZE_BODY_PATTERN = Pattern.compile("[^a-zA-Z0-9_]");
/**
* Sanitize metric name
*/
static String sanitizeMetricName(String metricName) {
return SANITIZE_BODY_PATTERN.matcher(
SANITIZE_PREFIX_PATTERN.matcher(metricName).replaceFirst("_")
).replaceAll("_");
}
/**
* Throw an exception if the metric label name is invalid.
*/
static void checkMetricLabelName(String name) {
if (!METRIC_LABEL_NAME_RE.matcher(name).matches()) {
throw new IllegalArgumentException("Invalid metric label name: " + name);
}
if (RESERVED_METRIC_LABEL_NAME_RE.matcher(name).matches()) {
throw new IllegalArgumentException(
"Invalid metric label name, reserved for internal use: " + name);
}
}
/**
* Convert a double to its string representation in Go.
*/
static String doubleToGoString(double d) {
if (d == Double.POSITIVE_INFINITY) {
return "+Inf";
}
if (d == Double.NEGATIVE_INFINITY) {
return "-Inf";
}
if (Double.isNaN(d)) {
return "NaN";
}
return Double.toString(d);
}
private Prometheus() {
}
}
}
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Configuration:
name: pulsar-functions-kubernetes-prometheus-metrics-server
monitorInterval: 30
Appenders:
# Console
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.metrics.sink;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.testng.annotations.Test;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterMethod;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.fail;
/**
* FileSink Tester.
*/
public class FileSinkTest {
private FileSink fileSink;
private File tmpDir;
@BeforeMethod
public void before() throws IOException {
fileSink = new FileSink();
Map<String, String> conf = new HashMap<>();
tmpDir = Files.createTempDirectory("filesink").toFile();
conf.put("filename-output", tmpDir.getAbsolutePath() + "/filesink");
conf.put("file-maximum", "100");
fileSink.init(conf);
}
@AfterMethod
public void after() {
fileSink.close();
for (File file: tmpDir.listFiles()) {
file.delete();
}
tmpDir.delete();
}
/**
* Method: flush()
*/
@Test
public void testFirstFlushWithoutRecords() throws IOException {
fileSink.flush();
String content = new String(readFromFile(
new File(tmpDir, "/filesink.0").getAbsolutePath()));
assertEquals("[]", content);
}
/**
* Method: flush()
*/
@Test
public void testSuccessiveFlushWithoutRecords() throws UnsupportedEncodingException, IOException {
fileSink.flush();
fileSink.flush();
String content = new String(readFromFile(
new File(tmpDir, "/filesink.0").getAbsolutePath()));
assertEquals("[]", content);
content = new String(readFromFile(new File(tmpDir, "/filesink.1").getAbsolutePath()));
assertEquals("[]", content);
}
/**
* Method: init()
*/
@Test
public void testIllegalConf() {
FileSink sink = new FileSink();
Map<String, String> conf = new HashMap<>();
try {
sink.init(conf);
fail("Expected IllegalArgumentException.");
} catch (IllegalArgumentException e) {
assertEquals("Require: filename-output", e.getMessage());
}
sink = new FileSink();
conf.put("filename-output", tmpDir.getAbsolutePath() + "/filesink");
try {
sink.init(conf);
fail("Expected IllegalArgumentException.");
} catch (IllegalArgumentException e) {
assertEquals("Require: file-maximum", e.getMessage());
}
}
private String readFromFile(String path) throws IOException {
byte[] encoded = Files.readAllBytes(Paths.get(path));
return new String(encoded, StandardCharsets.UTF_8);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.metrics.sink;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.testng.annotations.Test;
import org.testng.annotations.BeforeMethod;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
public class PrometheusSinkTests {
private static final long NOW = System.currentTimeMillis();
private final class PrometheusTestSink extends PrometheusSink {
private PrometheusTestSink() {
}
@Override
protected void startHttpServer(String path, int port) {
// no need to start the server for tests
}
public Map<String, Map<String, Double>> getMetrics() {
return getMetricsCache().asMap();
}
long currentTimeMillis() {
return NOW;
}
}
private Map<String, String> defaultConf;
private InstanceCommunication.MetricsData records;
@BeforeMethod
public void before() {
defaultConf = new HashMap<>();
defaultConf.put("port", "9999");
defaultConf.put("path", "test");
defaultConf.put("flat-metrics", "true");
defaultConf.put("include-topology-name", "false");
InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
InstanceCommunication.MetricsData.DataDigest metric1 =
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(2).setSum(5).setMax(3).setMin(2).build();
bldr.putMetrics("metric_1", metric1);
InstanceCommunication.MetricsData.DataDigest metric2 =
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(3).setSum(6).setMax(3).setMin(1).build();
bldr.putMetrics("metric_2", metric2);
records = bldr.build();
}
@Test
public void testMetricsGrouping() {
PrometheusTestSink sink = new PrometheusTestSink();
sink.init(defaultConf);
Function.FunctionDetails functionDetails = createFunctionDetails("tenant", "namespace", "functionname");
sink.processRecord(records, functionDetails);
final Map<String, Map<String, Double>> metrics = sink.getMetrics();
assertTrue(metrics.containsKey(FunctionDetailsUtils.getFullyQualifiedName(functionDetails)));
}
@Test
public void testResponse() throws IOException {
PrometheusTestSink sink = new PrometheusTestSink();
sink.init(defaultConf);
Function.FunctionDetails functionDetails = createFunctionDetails("tenant", "namespace", "functionname");
sink.processRecord(records, functionDetails);
final List<String> expectedLines = Arrays.asList(
createMetric(functionDetails, "metric_1_count", 2),
createMetric(functionDetails, "metric_1_sum", 5),
createMetric(functionDetails, "metric_1_max", 3),
createMetric(functionDetails, "metric_1_min", 2),
createMetric(functionDetails, "metric_2_count", 3),
createMetric(functionDetails, "metric_2_sum", 6),
createMetric(functionDetails, "metric_2_max", 3),
createMetric(functionDetails, "metric_2_min", 1)
);
final Set<String> generatedLines =
new HashSet<>(Arrays.asList(new String(sink.generateResponse()).split("\n")));
assertEquals(expectedLines.size(), generatedLines.size());
expectedLines.forEach((String line) -> {
assertTrue(generatedLines.contains(line));
});
}
private String createMetric(Function.FunctionDetails functionDetails,
String metric, double value) {
return String.format("pulsar_function_%s"
+ "{tenant=\"%s\",namespace=\"%s\",functionname=\"%s\"}"
+ " %s %d",
metric, functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), value, NOW);
}
private Function.FunctionDetails createFunctionDetails(String tenant, String namespace, String name) {
Function.FunctionDetails.Builder bldr = Function.FunctionDetails.newBuilder();
bldr.setTenant(tenant);
bldr.setNamespace(namespace);
bldr.setName(name);
return bldr.build();
}
}
......@@ -37,7 +37,6 @@
<module>api-java</module>
<module>java-examples</module>
<module>utils</module>
<module>metrics</module>
<module>instance</module>
<module>runtime</module>
<module>runtime-shaded</module>
......
......@@ -49,7 +49,7 @@ message FunctionStatus {
// expressed in ms since epoch
int64 lastInvocationTime = 13;
string instanceId = 14;
MetricsData metrics = 15 [deprecated=true];
// MetricsData metrics = 15 [deprecated=true];
// owner of function-instance
string workerId = 16;
}
......@@ -60,13 +60,37 @@ message FunctionStatusList {
}
message MetricsData {
message DataDigest {
double count = 1;
double sum = 2;
double max = 3;
double min = 4;
}
map<string, DataDigest> metrics = 1;
// message DataDigest {
// double count = 1;
// double sum = 2;
// double max = 3;
// double min = 4;
// }
// map<string, DataDigest> metrics = 1 [deprecated=true];
// Total number of records function received from source
int64 receivedTotal = 2;
// Total number of records processed
int64 processedTotal = 3;
// Total number of records successfully processed by user function
int64 processedSuccessfullyTotal = 4;
// Total number of system exceptions thrown
int64 systemExceptionsTotal = 5;
// Total number of user exceptions thrown
int64 userExceptionsTotal = 6;
// Average process latency for function
double avgProcessLatency = 7;
// Timestamp of when the function was last invoked
int64 lastInvocation = 8;
// User defined metrics
map<string, double> userMetrics = 9;
}
message HealthCheckResult {
......
......@@ -133,7 +133,6 @@
<!-- dependencies use protobuf -->
<include>org.apache.pulsar:pulsar-functions-proto</include>
<include>org.apache.pulsar:pulsar-functions-utils</include>
<include>org.apache.pulsar:pulsar-functions-metrics</include>
<include>org.apache.pulsar:pulsar-functions-instance</include>
<include>org.apache.pulsar:pulsar-functions-runtime</include>
<include>org.apache.pulsar:pulsar-functions-api</include>
......
......@@ -39,12 +39,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-metrics</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-secrets</artifactId>
......
......@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.protobuf.Empty;
import com.google.protobuf.util.JsonFormat;
import com.squareup.okhttp.Response;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
......@@ -59,7 +58,6 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
......@@ -135,7 +133,6 @@ class KubernetesRuntime implements Runtime {
InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir,
String prometheusMetricsServerJarFile,
String logDirectory,
String userCodePkgUrl,
String originalCodeFileName,
......
......@@ -76,7 +76,6 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private final String javaInstanceJarFile;
private final String pythonInstanceFile;
private final String extraDependenciesDir;
private final String prometheusMetricsServerJarFile;
private final SecretsProviderConfigurator secretsProviderConfigurator;
private final String logDirectory = "logs/functions";
private Timer changeConfigMapTimer;
......@@ -143,7 +142,6 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.authConfig = authConfig;
this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar";
this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
}
......@@ -182,7 +180,6 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
instanceConfig,
instanceFile,
extraDependenciesDir,
prometheusMetricsServerJarFile,
logDirectory,
codePkgUrl,
originalCodeFileName,
......
......@@ -121,7 +121,6 @@
<!-- dependencies use protobuf -->
<include>org.apache.pulsar:pulsar-functions-proto</include>
<include>org.apache.pulsar:pulsar-functions-utils</include>
<include>org.apache.pulsar:pulsar-functions-metrics</include>
<include>org.apache.pulsar:pulsar-functions-instance</include>
<include>org.apache.pulsar:pulsar-functions-runtime</include>
<include>org.apache.pulsar:pulsar-functions-worker</include>
......
......@@ -36,7 +36,6 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import io.prometheus.client.CollectorRegistry;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
......@@ -46,6 +45,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.InstanceCommunication;
......@@ -55,10 +55,10 @@ import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
/**
......@@ -301,89 +301,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
}
/**
* Get status of a function instance. If this worker is not running the function instance,
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @param instanceId the function instance id
* @return the function status
*/
public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
String functionName, int instanceId, URI uri) {
Assignment assignment;
if (runtimeFactory.externallyManaged()) {
assignment = this.findAssignment(tenant, namespace, functionName, -1);
} else {
assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
}
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
if (assignment == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException("Function has not been scheduled");
return functionStatusBuilder.build();
}
InstanceCommunication.FunctionStatus functionStatus = null;
// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
try {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus
.newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get());
functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
} else {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
if (functionRuntimeInfo.getStartupException() != null) {
functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
}
functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
}
} else {
// query other worker
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry: workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
functionStatusBuilder.setFailureException("Function has not been scheduled");
return functionStatusBuilder.build();
}
if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
return functionStatus;
}
public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId,
boolean restart, URI uri) throws Exception {
if (runtimeFactory.externallyManaged()) {
......@@ -537,6 +454,223 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
}
/**
* Get stats of a function instance. If this worker is not running the function instance,
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @param instanceId the function instance id
* @return jsonObject containing stats for instance
*/
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(String tenant, String namespace,
String functionName, int instanceId, URI uri) {
Assignment assignment;
if (runtimeFactory.externallyManaged()) {
assignment = this.findAssignment(tenant, namespace, functionName, -1);
} else {
assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
}
if (assignment == null) {
return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
}
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
return Utils.getFunctionInstanceStats(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics();
}
return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
} else {
// query other worker
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry: workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
}
if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
}
/**
* Get stats of all function instances.
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @return a list of function statuses
* @throws PulsarAdminException
*/
public FunctionStats getFunctionStats(String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
FunctionStats functionStats = new FunctionStats();
if (assignments.isEmpty()) {
return functionStats;
}
if (runtimeFactory.externallyManaged()) {
Assignment assignment = assignments.iterator().next();
boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
if (isOwner) {
int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
for (int i = 0; i < parallelism; ++i) {
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = getFunctionInstanceStats(tenant, namespace,
functionName, i, null);
FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
functionInstanceStats.setInstanceId(i);
functionInstanceStats.setMetrics(functionInstanceStatsData);
functionStats.addInstance(functionInstanceStats);
}
} else {
// find the hostname/port of the worker who is the owner
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry: workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
return functionStats;
}
if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
} else {
for (Assignment assignment : assignments) {
boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData;
if (isOwner) {
functionInstanceStatsData = getFunctionInstanceStats(tenant, namespace, functionName,
assignment.getInstance().getInstanceId(), null);
} else {
functionInstanceStatsData = this.functionAdmin.functions().getFunctionStats(
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
assignment.getInstance().getInstanceId());
}
FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
functionInstanceStats.setInstanceId(assignment.getInstance().getInstanceId());
functionInstanceStats.setMetrics(functionInstanceStatsData);
functionStats.addInstance(functionInstanceStats);
}
}
return functionStats.calculateOverall();
}
/**
* Get status of a function instance. If this worker is not running the function instance,
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @param instanceId the function instance id
* @return the function status
*/
public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
String functionName, int instanceId, URI uri) {
Assignment assignment;
if (runtimeFactory.externallyManaged()) {
assignment = this.findAssignment(tenant, namespace, functionName, -1);
} else {
assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
}
if (assignment == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException("Function has not been scheduled");
return functionStatusBuilder.build();
}
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
InstanceCommunication.FunctionStatus functionStatus = null;
// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
try {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus
.newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get());
functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
} else {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
if (functionRuntimeInfo.getStartupException() != null) {
functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
}
functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
}
} else {
// query other worker
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry: workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
functionStatusBuilder.setFailureException("Function has not been scheduled");
return functionStatusBuilder.build();
}
if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
return functionStatus;
}
/**
* Get statuses of all function instances.
* @param tenant the tenant the function belongs to
......@@ -545,8 +679,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
* @return a list of function statuses
* @throws PulsarAdminException
*/
public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
public InstanceCommunication.FunctionStatusList getAllFunctionStatus(String tenant, String namespace,
String functionName, URI uri)
throws PulsarAdminException {
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
......@@ -555,6 +690,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
return functionStatusListBuilder.build();
}
// TODO refactor the code for externally managed.
if (runtimeFactory.externallyManaged()) {
Assignment assignment = assignments.iterator().next();
boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
......
......@@ -18,17 +18,16 @@
*/
package org.apache.pulsar.functions.worker;
import org.apache.pulsar.functions.instance.FunctionStatsManager;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
/**
......@@ -59,30 +58,30 @@ public class FunctionsStatsGenerator {
if (functionRuntime != null) {
try {
InstanceCommunication.MetricsData metrics = functionRuntime.getMetrics().get();
for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry
: metrics.getMetricsMap().entrySet()) {
String metricName = metricsEntry.getKey();
InstanceCommunication.MetricsData.DataDigest dataDigest = metricsEntry.getValue();
String tenant = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getTenant();
String namespace = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getNamespace();
String name = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getName();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
String tenant = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getTenant();
String namespace = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getNamespace();
String name = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getName();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
metric(out, cluster, qualifiedNamespace, name, String.format("%s_count", metricName),
instanceId, dataDigest.getCount());
metric(out, cluster, qualifiedNamespace, name, String.format("%s_max", metricName),
instanceId, dataDigest.getMax());
metric(out, cluster, qualifiedNamespace,name, String.format("%s_min", metricName),
instanceId, dataDigest.getMin());
metric(out, cluster, qualifiedNamespace, name, String.format("%s_sum", metricName),
instanceId, dataDigest.getSum());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, metrics.getAvgProcessLatency());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, metrics.getProcessedSuccessfullyTotal());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_TOTAL, instanceId, metrics.getProcessedTotal());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, metrics.getSystemExceptionsTotal());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, metrics.getUserExceptionsTotal());
for (Map.Entry<String, Double> userMetricsMapEntry : metrics.getUserMetricsMap().entrySet()) {
String userMetricName = userMetricsMapEntry.getKey();
Double val = userMetricsMapEntry.getValue();
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName, instanceId, val);
}
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to collect metrics for function instance {}",
fullyQualifiedInstanceName, e);
......
......@@ -23,7 +23,12 @@ import java.net.URI;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
......@@ -37,6 +42,11 @@ import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
import org.apache.zookeeper.KeeperException.Code;
......@@ -194,5 +204,53 @@ public final class Utils {
String functionName, int instanceId) {
return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId);
}
public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo functionRuntimeInfo) {
RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
if (functionRuntimeSpawner != null) {
Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
if (functionRuntime != null) {
try {
InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
functionInstanceStats.setInstanceId(instanceId);
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData
= new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
functionInstanceStatsData.setReceivedTotal(metricsData.getReceivedTotal());
functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
// Filter out values that are NaN
Map<String, Double> statsDataMap = metricsData.getUserMetricsMap().entrySet().stream()
.filter(stringDoubleEntry -> !stringDoubleEntry.getValue().isNaN())
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
functionInstanceStatsData.setUserMetrics(statsDataMap);
functionInstanceStats.setMetrics(functionInstanceStatsData);
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
}
}
}
return functionInstanceStats;
}
public static FunctionStats getFunctionStats(Map<String, FunctionRuntimeInfo> functionRuntimes) {
FunctionStats functionStats = new FunctionStats();
for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
String fullyQualifiedInstanceName = entry.getKey();
FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo));
}
return functionStats;
}
}
......@@ -19,15 +19,14 @@
package org.apache.pulsar.functions.worker.rest.api;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.worker.*;
import javax.ws.rs.WebApplicationException;
......@@ -36,7 +35,6 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -46,6 +44,8 @@ public class WorkerImpl {
private final Supplier<WorkerService> workerServiceSupplier;
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) {
this.workerServiceSupplier = workerServiceSupplier;
}
......@@ -122,16 +122,16 @@ public class WorkerImpl {
return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
}
public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis(String clientRole) throws IOException {
public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(String clientRole) throws IOException {
if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(clientRole + " is not authorize to get metrics")).build());
}
return getWorkerMetrcis();
return getWorkerMetrics();
}
private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics() {
if (!isWorkerServiceAvailable()) {
throw new WebApplicationException(
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
......@@ -149,6 +149,13 @@ public class WorkerImpl {
return getFunctionsMetrics();
}
@Data
public static class WorkerFunctionInstanceStats {
/** fully qualified function instance name **/
public String name;
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
}
private Response getFunctionsMetrics() throws IOException {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
......@@ -158,41 +165,22 @@ public class WorkerImpl {
Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager()
.getFunctionRuntimeInfos();
Metrics.Builder metricsBuilder = Metrics.newBuilder();
JsonArray metricsMapList = new JsonArray();
for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
String fullyQualifiedInstanceName = entry.getKey();
FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
if (functionRuntimeSpawner != null) {
Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
if (functionRuntime != null) {
try {
InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.getFunctionDetails().getTenant();
String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.getFunctionDetails().getNamespace();
String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.getFunctionDetails().getName();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name);
InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder();
instanceBuilder.setName(qualifiedFunctionName);
instanceBuilder.setInstanceId(instanceId);
if (metricsData != null) {
instanceBuilder.setMetricsData(metricsData);
}
metricsBuilder.addMetrics(instanceBuilder.build());
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
}
}
}
FunctionStats.FunctionInstanceStats functionInstanceStats = Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
metricsMapList.add(gson.toJsonTree(workerFunctionInstanceStats));
}
String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
String jsonResponse = gson.toJson(metricsMapList);
return Response.status(Status.OK).entity(jsonResponse).build();
}
}
......@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.worker.rest.api.v2;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
......@@ -120,12 +122,37 @@ public class FunctionApiV2Resource extends FunctionApiResource {
}
@GET
@Path("/{tenant}/{namespace}")
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(
tenant, namespace, FunctionsImpl.FUNCTION);
@ApiOperation(
value = "Displays the stats of a Pulsar Function",
response = FunctionStats.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{functionName}/stats")
public Response getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
}
@GET
@ApiOperation(
value = "Displays the stats of a Pulsar Function instance",
response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionsInstanceStats(
tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}
@POST
......
......@@ -80,12 +80,12 @@ public class WorkerStatsApiV2Resource implements Supplier<WorkerService> {
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return worker.getWorkerMetrcis(clientAppId());
return worker.getWorkerMetrics(clientAppId());
}
@GET
@Path("/functionsmetrics")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = WorkerImpl.WorkerFunctionInstanceStats.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running") })
public Response getFunctionsMetrics() throws IOException {
......
......@@ -28,9 +28,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.functions.metrics.MetricsSink;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -41,7 +39,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.mockito.Matchers.any;
......@@ -59,29 +56,6 @@ import static org.mockito.Mockito.when;
@Slf4j
public class FunctionRuntimeManagerTest {
public static class TestSink implements MetricsSink {
@Override
public void init(Map<String, String> conf) {
}
@Override
public void processRecord(InstanceCommunication.MetricsData record, Function.FunctionDetails functionDetails) {
}
@Override
public void flush() {
}
@Override
public void close() {
}
}
@Test
public void testProcessAssignmentUpdateAddFunctions() throws Exception {
......
......@@ -86,13 +86,13 @@ public class FunctionStatsGeneratorTest {
CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
.putMetrics(
"pulsar_function_processed_total",
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
.putMetrics("pulsar_function_process_latency_ms",
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
.setReceivedTotal(101)
.setProcessedTotal(100)
.setProcessedSuccessfullyTotal(99)
.setAvgProcessLatency(10.0)
.setUserExceptionsTotal(3)
.setSystemExceptionsTotal(1)
.setLastInvocation(1542324900)
.build();
metricsDataCompletableFuture.complete(metricsData);
......@@ -121,67 +121,61 @@ public class FunctionStatsGeneratorTest {
FunctionsStatsGenerator.generate(workerService, "default", statsOut);
String str = buf.toString(Charset.defaultCharset());
buf.release();
Map<String, Metric> metrics = parseMetrics(str);
Assert.assertEquals(metrics.size(), 8);
Assert.assertEquals(metrics.size(), 7);
System.out.println("metrics: " + metrics);
Metric m = metrics.get("pulsar_function_processed_total_count");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 100.0);
m = metrics.get("pulsar_function_processed_total_max");
Metric m = metrics.get("pulsar_function_received_total");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 200.0);
assertEquals(m.value, 101.0);
m = metrics.get("pulsar_function_processed_total_sum");
m = metrics.get("pulsar_function_processed_total");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 300.0);
assertEquals(m.value, 100.0);
m = metrics.get("pulsar_function_processed_total_min");
m = metrics.get("pulsar_function_user_exceptions_total");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 0.0);
assertEquals(m.value, 3.0);
m = metrics.get("pulsar_function_process_latency_ms_count");
m = metrics.get("pulsar_function_process_latency_ms");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 10.0);
m = metrics.get("pulsar_function_process_latency_ms_max");
m = metrics.get("pulsar_function_system_exceptions_total");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 20.0);
assertEquals(m.value, 1.0);
m = metrics.get("pulsar_function_process_latency_ms_sum");
m = metrics.get("pulsar_function_last_invocation");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 30.0);
assertEquals(m.value, 1542324900.0);
m = metrics.get("pulsar_function_process_latency_ms_min");
m = metrics.get("pulsar_function_processed_successfully_total");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 0.0);
assertEquals(m.value, 99.0);
}
/**
......
......@@ -34,34 +34,6 @@
<name>Spark Streaming Pulsar Receivers</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
......@@ -97,7 +69,6 @@
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<promoteTransitiveDependencies>ture</promoteTransitiveDependencies>
<artifactSet>
<includes>
<include>com.google.guava:guava</include>
......
......@@ -36,6 +36,7 @@
<module>integration</module>
<module>pulsar-kafka-compat-client-test</module>
<module>pulsar-storm-test</module>
<module>pulsar-spark-test</module>
</modules>
<build>
<plugins>
......
......@@ -18,71 +18,64 @@
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions</artifactId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<parent>
<groupId>org.apache.pulsar.tests</groupId>
<artifactId>tests-parent</artifactId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<artifactId>pulsar-functions-metrics</artifactId>
<name>Pulsar Functions :: Metrics</name>
<artifactId>pulsar-spark-test</artifactId>
<packaging>jar</packaging>
<name>Spark Streaming Pulsar Receivers Tests</name>
<dependencies>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-proto</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
</dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>
PrometheusMetricsServer
</finalName>
<archive>
<manifest>
<mainClass>
org.apache.pulsar.functions.sink.PrometheusMetricsServer
</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
</dependency>
</dependencies>
</project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册