提交 8d32b588 编写于 作者: C Chris Bartholomew 提交者: Jia Zhai

Adding back Prometheus TYPE definitions; fixed duplicate TYPE errors; fixed...

Adding back Prometheus TYPE definitions; fixed duplicate TYPE errors; fixed format issue in metricWithRemoteCluster; added test for Prometheus types (#4183)

Fixes #3112 

### Motivation

In this pull request, I set out to fix #3112, which is caused by duplicate TYPE statements in the metrics output which leads to parsing of Prometheus metrics to fail in recent versions of Prometheus. Because of this, Prometheus will report the broker target as down.

Since I started looking at this, the type definitions have been removed (#4136) from topics metric output. I think these types are useful in Prometheus and have added them back in. 

While testing this fix in my geo-replicated setup, I found a format in error (missing quote and comma) in the TopicStats.metricWithRemoteCluster method. This pull request includes a fix for that issue.

I have also added a new test to PrometheusMetricsTest.java that fails without these changes but passed with them.

### Modifications

I added a static HashMap to TopicStats to keep track of the TYPEs that have been output. All writing of the TYPE for topics and namespaces is done with the TopicStats.metricType method. I modified that method to update the HashMap and only print the TYPE out for the first occurrence of the metric name.  I also added a method reset the HashMap, which gets called in NamespaceStatsAggregator.generate. 

### Verifying this change

This change added tests and can be verified as follows:
  - Added testDuplicateMetricTypeDefinitions which checks for:
       - duplicate TYPE definitions in the Prometheus output
       - validates that no TYPE definition appears after the first metric sample
       - ensures that all metrics have a defined type

I execute the test twice to make sure the resetting of the HashMap of the already seen metric type definitions works correctly. This test passes for me reliably (both occurrences).

I have confirmed using promtool that the metrics output will now parse without error using versions 2.7.1 and 2.9.2 (which is the latest). There are many warnings around missing HELP definitions and metrics using reserved suffixes (ex. _count), but no errors.

In addition, I have patched 2.3.1 with this fix and am currently running it in my cluster. Prometheus (2.7.1) successfully parses the metrics and I am able to see namespace and topic-level metrics.
上级 bf948909
......@@ -49,6 +49,7 @@ public class NamespaceStatsAggregator {
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats topicStats = localTopicStats.get();
printDefaultBrokerStats(stream, cluster);
......@@ -260,6 +261,7 @@ public class NamespaceStatsAggregator {
private static void metric(SimpleTextOutputStream stream, String cluster, String name,
long value) {
TopicStats.metricType(stream, name);
.write("{cluster=\"").write(cluster).write("\"} ")
.write(value).write(' ').write(System.currentTimeMillis())
......@@ -268,18 +270,21 @@ public class NamespaceStatsAggregator {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
long value) {
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
double value) {
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
String name, String remoteCluster, double value) {
TopicStats.metricType(stream, name);
stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -51,6 +51,10 @@ class TopicStats {
Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
// Used for tracking duplicate TYPE definitions
static Map<String, String> metricWithTypeDefinition = new HashMap<>();
public void reset() {
subscriptionsCount = 0;
producersCount = 0;
......@@ -74,6 +78,10 @@ class TopicStats {
static void resetTypes() {
static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
TopicStats stats) {
metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
......@@ -156,8 +164,18 @@ class TopicStats {
static void metricType(SimpleTextOutputStream stream, String name) {
if (!metricWithTypeDefinition.containsKey(name)) {
metricWithTypeDefinition.put(name, "gauge");
stream.write("# TYPE ").write(name).write(" gauge\n");
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String name, double value) {
metricType(stream, name);
.write("\",topic=\"").write(topic).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -165,6 +183,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String name, long value) {
metricType(stream, name);
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -172,6 +191,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String name, double value) {
metricType(stream, name);
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -179,6 +199,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String consumerName, long consumerId, String name, long value) {
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
......@@ -187,6 +208,7 @@ class TopicStats {
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
String consumerName, long consumerId, String name, double value) {
metricType(stream, name);
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
......@@ -196,6 +218,7 @@ class TopicStats {
private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
String topic,
String name, String remoteCluster, double value) {
metricType(stream, name);
stream.write("\",topic=\"").write(topic).write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
......@@ -21,10 +21,12 @@ package org.apache.pulsar.broker.stats;
import static com.google.common.base.Preconditions.checkArgument;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -136,6 +138,101 @@ public class PrometheusMetricsTest extends BrokerTestBase {
/** Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser
finds a TYPE definition for the same metric more than once, it errors out:
This can happen when including topic metrics, since the same metric is reported multiple times with different labels. For example:
# TYPE pulsar_subscriptions_count gauge
pulsar_subscriptions_count{cluster="standalone"} 0 1556372982118
pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/metadata"} 1.0 1556372982118
pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/coordinate"} 1.0 1556372982118
pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/assignments"} 1.0 1556372982118
// Running the test twice to make sure types are present when generated multiple times
@Test(invocationCount = 2)
public void testDuplicateMetricTypeDefinitions() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
String metricsStr = new String(statsOut.toByteArray());
Map<String, String> typeDefs = new HashMap<String, String>();
Map<String, String> metricNames = new HashMap<String, String>();
Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
Splitter.on("\n").split(metricsStr).forEach(line -> {
if (line.isEmpty()) {
if (line.startsWith("#")) {
// Check for duplicate type definitions
Matcher typeMatcher = typePattern.matcher(line);
String metricName = typeMatcher.group(1);
String type = typeMatcher.group(2);
// From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
// "Only one TYPE line may exist for a given metric name."
if (!typeDefs.containsKey(metricName)) {
typeDefs.put(metricName, type);
} else {
fail("Duplicate type definition found for TYPE definition " + metricName);
// From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
// "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
if (metricNames.containsKey(metricName)) {
fail("TYPE definition for " + metricName + " appears after first sample");
} else {
Matcher metricMatcher = metricNamePattern.matcher(line);
String metricName = metricMatcher.group(1);
metricNames.put(metricName, metricName);
// Metrics with no type definition
for (String metricName : metricNames.keySet()) {
if (!typeDefs.containsKey(metricName)) {
// This may be OK if this is a _sum or _count metric from a summary
if(metricName.endsWith("_sum")) {
String summaryMetricName = metricName.substring(0, metricName.indexOf("_sum"));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding summary type definition");
} else if (metricName.endsWith("_count")) {
String summaryMetricName = metricName.substring(0, metricName.indexOf("_count"));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding summary type definition");
} else {
fail("Metric " + metricName + " does not have a type definition");
* Hacky parsing of Prometheus text format. Sould be good enough for unit tests
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册