提交 e31c2d8e 编写于 作者: S Sijie Guo 提交者: Matteo Merli

[functions][metrics] Update function metrics (#2686)

* Fix stats generator

* Set the response to text/plain

* fix TopicStats and NamespaceStats
上级 8d075d25
......@@ -24,12 +24,9 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.eclipse.jetty.util.ConcurrentHashSet;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Set;
public class NamespaceStatsAggregator {
private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() {
......@@ -46,8 +43,6 @@ public class NamespaceStatsAggregator {
}
};
private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
......@@ -56,8 +51,6 @@ public class NamespaceStatsAggregator {
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
METRIC_TYPES.forEach(metric -> TopicStats.metricType(stream, metric));
bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
getTopicStats(topic, topicStats, includeConsumerMetrics);
......@@ -225,33 +218,21 @@ public class NamespaceStatsAggregator {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
long value) {
if (!METRIC_TYPES.contains(name)) {
TopicStats.metricType(stream, name);
METRIC_TYPES.add(name);
}
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
double value) {
if (!METRIC_TYPES.contains(name)) {
TopicStats.metricType(stream, name);
METRIC_TYPES.add(name);
}
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
String name, String remoteCluster, double value) {
if (!METRIC_TYPES.contains(name)) {
TopicStats.metricType(stream, name);
METRIC_TYPES.add(name);
}
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......
......@@ -20,12 +20,10 @@ package org.apache.pulsar.broker.stats.prometheus;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.eclipse.jetty.util.ConcurrentHashSet;
class TopicStats {
......@@ -47,7 +45,6 @@ class TopicStats {
Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
public void reset() {
subscriptionsCount = 0;
......@@ -71,9 +68,6 @@ class TopicStats {
static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
TopicStats stats) {
METRIC_TYPES.forEach(metric -> metricType(stream, metric));
metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
......@@ -140,11 +134,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String name, double value) {
if (!METRIC_TYPES.contains(name)) {
metricType(stream, name);
METRIC_TYPES.add(name);
}
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
.write("\",topic=\"").write(topic).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -152,11 +142,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String name, long value) {
if (!METRIC_TYPES.contains(name)) {
metricType(stream, name);
METRIC_TYPES.add(name);
}
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -164,11 +150,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String name, double value) {
if (!METRIC_TYPES.contains(name)) {
metricType(stream, name);
METRIC_TYPES.add(name);
}
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -176,11 +158,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String consumerName, long consumerId, String name, long value) {
if (!METRIC_TYPES.contains(name)) {
metricType(stream, name);
METRIC_TYPES.add(name);
}
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
......@@ -189,11 +167,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String consumerName, long consumerId, String name, double value) {
if (!METRIC_TYPES.contains(name)) {
metricType(stream, name);
METRIC_TYPES.add(name);
}
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
......
......@@ -37,12 +37,8 @@ public class FunctionsStatsGenerator {
private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
if (workerService != null) {
METRIC_TYPES.forEach(metric -> metricType(out, metric));
Map<String, FunctionRuntimeInfo> functionRuntimes
= workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
......@@ -98,11 +94,7 @@ public class FunctionsStatsGenerator {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace,
String functionName, String metricName, int instanceId, double value) {
if (!METRIC_TYPES.contains(metricName)) {
metricType(stream, metricName);
METRIC_TYPES.add(metricName);
}
metricType(stream, metricName);
stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
.write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......
......@@ -53,7 +53,10 @@ public class FunctionsMetricsResource extends FunctionApiResource {
out.write(payload, arrayOffset, readableBytes);
out.flush();
};
return Response.ok(streamOut).build();
return Response
.ok(streamOut)
.type(MediaType.TEXT_PLAIN_TYPE)
.build();
} finally {
buf.release();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册