提交 7d75fd28 编写于 作者: C cckellogg 提交者: Sijie Guo

Pulsar website using docusaurus (#2206)

### Motivation

Improve the documentation and usability of the pulsar website. This moves the website and documentation to a new framework (https://docusaurus.io/)  which will make it easier to maintain going forward.

### Modifications

A new version of the website in site2 directory. Also updates the pulsar build docker to add the new website build dependencies.

### Result

A more usable website and documentation.

A preview of the site can be seen here: https://cckellogg.github.io/incubator-pulsar
*All the links and images might not work on this site since it's a test only site*
上级 41792f19
......@@ -50,6 +50,13 @@ RUN gpg2 --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A17031138
ENV PATH "$PATH:/usr/local/rvm/bin"
RUN rvm install 2.4.1
# Install nodejs and yarn
RUN curl -sL https://deb.nodesource.com/setup_10.x | bash -
RUN apt-get install -y nodejs
RUN curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add -
RUN echo "deb https://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list
RUN apt-get update && apt-get install yarn
# Install PIP and PDoc
RUN wget https://bootstrap.pypa.io/get-pip.py && python get-pip.py
RUN pip install pdoc
......
......@@ -1005,6 +1005,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>site/_sass/font-awesome/**</exclude>
<exclude>site/fonts/**</exclude>
<exclude>site/img/**</exclude>
<exclude>site2/**</exclude>
<exclude>generated-site/**</exclude>
<exclude>.github/*.md</exclude>
<exclude>**/.idea/*</exclude>
......
.DS_Store
node_modules
lib/core/metadata.js
lib/core/MetadataBlog.js
website/translated_docs
website/build/
website/yarn.lock
website/node_modules
website/i18n/*
# The Pulsar website and documentation
This `README` is basically the meta-documentation for the Pulsar website and documentation. Here you'll find instructions on running the site locally
## Tools
Framework [Docusaurus](https://docusaurus.io/).
Ensure you have the latest version of [Node](https://nodejs.org/en/download/) installed. We also recommend you install [Yarn](https://yarnpkg.com/en/docs/install) as well.
> You have to be on Node >= 8.x and Yarn >= 1.5.
## Running the site locally
To run the site locally:
```bash
cd website
yarn install
yarn start
```
此差异已折叠。
---
id: adaptors-spark
title: Pulsar adaptor for Apache Spark
sidebar_label: Apache Spark
---
The Spark Streaming receiver for Pulsar is a custom receiver that enables Apache [Spark Streaming](https://spark.apache.org/streaming/) to receive data from Pulsar.
An application can receive data in [Resilient Distributed Dataset](https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds) (RDD) format via the Spark Streaming Pulsar receiver and can process it in a variety of ways.
## Prerequisites
To use the receiver, include a dependency for the `pulsar-spark` library in your Java configuration.
### Maven
If you're using Maven, add this to your `pom.xml`:
```xml
<!-- in your <properties> block -->
<pulsar.version>pulsar:version</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${pulsar.version}</version>
</dependency>
```
### Gradle
If you're using Gradle, add this to your `build.gradle` file:
```groovy
def pulsarVersion = "pulsar:version"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
```
## Usage
Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` method in `JavaStreamingContext`:
```java
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
ClientConfiguration clientConf = new ClientConfiguration();
ConsumerConfiguration consConf = new ConsumerConfiguration();
String url = "pulsar://localhost:6650/";
String topic = "persistent://public/default/topic1";
String subs = "sub1";
JavaReceiverInputDStream<byte[]> msgs = jssc
.receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
```
## Example
You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-spark/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java).
In this example, the number of messages which contain the string "Pulsar" in received messages is counted.
---
id: adaptors-storm
title: Pulsar adaptor for Apache Storm
sidebar_label: Apache Storm
---
Pulsar Storm is an adaptor for integrating with [Apache Storm](http://storm.apache.org/) topologies. It provides core Storm implementations for sending and receiving data.
An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.
## Using the Pulsar Storm Adaptor
Include dependency for Pulsar Storm Adaptor:
```xml
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-storm</artifactId>
<version>${pulsar.version}</version>
</dependency>
```
## Pulsar Spout
The Pulsar Spout allows for the data published on a topic to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the `MessageToValuesMapper` provided by the client.
The tuples that fail to be processed by the downstream bolts will be re-injected by the spout with an exponential backoff, within a configurable timeout (the default is 60 seconds) or a configurable number of retries, whichever comes first, after which it is acknowledged by the consumer. Here's an example construction of a spout:
```java
// Configure a Pulsar Client
ClientConfiguration clientConf = new ClientConfiguration();
// Configure a Pulsar Consumer
ConsumerConfiguration consumerConf = new ConsumerConfiguration();
@SuppressWarnings("serial")
MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
@Override
public Values toValues(Message msg) {
return new Values(new String(msg.getData()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declare the output fields
declarer.declare(new Fields("string"));
}
};
// Configure a Pulsar Spout
PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
spoutConf.setSubscriptionName("my-subscriber-name1");
spoutConf.setMessageToValuesMapper(messageToValuesMapper);
// Create a Pulsar Spout
PulsarSpout spout = new PulsarSpout(spoutConf, clientConf, consumerConf);
```
## Pulsar Bolt
The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the `TupleToMessageMapper` provided by the client.
A partitioned topic can also be used to publish messages on different topics. In the implementation of the `TupleToMessageMapper`, a "key" will need to be provided in the message which will send the messages with the same key to the same topic. Here's an example bolt:
```java
// Configure a Pulsar Client
ClientConfiguration clientConf = new ClientConfiguration();
// Configure a Pulsar Producer
ProducerConfiguration producerConf = new ProducerConfiguration();
@SuppressWarnings("serial")
TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
@Override
public Message toMessage(Tuple tuple) {
String receivedMessage = tuple.getString(0);
// message processing
String processedMsg = receivedMessage + "-processed";
return MessageBuilder.create().setContent(processedMsg.getBytes()).build();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declare the output fields
}
};
// Configure a Pulsar Bolt
PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
boltConf.setTupleToMessageMapper(tupleToMessageMapper);
// Create a Pulsar Bolt
PulsarBolt bolt = new PulsarBolt(boltConf, clientConf);
```
## Example
You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-storm/src/test/java/org/apache/pulsar/storm/example/StormExample.java).
---
id: admin-api-brokers
title: Managing Brokers
sidebar_label: Brokers
---
Pulsar brokers consist of two components:
1. An HTTP server exposing a [REST interface](reference-rest-api.md) administration and {% popover topic %} lookup.
2. A dispatcher that handles all Pulsar {% popover message %} transfers.
{% popover Brokers %} can be managed via:
* The [`brokers`](reference-pulsar-admin.md#brokers) command of the [`pulsar-admin`](reference-pulsar-admin.md) tool
* The `/admin/brokers` endpoint of the admin [REST API](reference-rest-api.md)
* The `brokers` method of the {% javadoc PulsarAdmin admin org.apache.pulsar.client.admin.PulsarAdmin %} object in the [Java API](client-libraries-java.md)
In addition to being configurable when you start them up, brokers can also be [dynamically configured](#dynamic-broker-configuration).
{% include admonition.html type="info" content="
See the [Configuration](reference-configuration.md#broker) page for a full listing of broker-specific configuration parameters.
" %}
## Brokers resources
### List active brokers
Fetch all available active brokers that are serving traffic.
#### pulsar-admin
```shell
$ pulsar-admin brokers list use
```
```
broker1.use.org.com:8080
```
###### REST
{% endpoint GET /admin/brokers/:cluster %}
[More info](reference-rest-api.md#/admin/brokers/:cluster)
###### Java
```java
admin.brokers().getActiveBrokers(clusterName)
```
#### list of namespaces owned by a given broker
It finds all namespaces which are owned and served by a given broker.
###### CLI
```shell
$ pulsar-admin brokers namespaces use \
--url broker1.use.org.com:8080
```
```json
{
"my-property/use/my-ns/0x00000000_0xffffffff": {
"broker_assignment": "shared",
"is_controlled": false,
"is_active": true
}
}
```
###### REST
{% endpoint GET /admin/brokers/:cluster/:broker:/ownedNamespaces %}
###### Java
```java
admin.brokers().getOwnedNamespaces(cluster,brokerUrl);
```
### Dynamic broker configuration
One way to configure a Pulsar {% popover broker %} is to supply a [configuration](reference-configuration.md#broker) when the broker is [started up](reference-cli-tools.md#pulsar-broker).
But since all broker configuration in Pulsar is stored in {% popover ZooKeeper %}, configuration values can also be dynamically updated *while the broker is running*. When you update broker configuration dynamically, ZooKeeper will notify the broker of the change and the broker will then override any existing configuration values.
* The [`brokers`](reference-pulsar-admin.md#brokers) command for the [`pulsar-admin`](reference-pulsar-admin.md) tool has a variety of subcommands that enable you to manipulate a broker's configuration dynamically, enabling you to [update config values](#update-dynamic-configuration) and more.
* In the Pulsar admin [REST API](reference-rest-api.md), dynamic configuration is managed through the `/admin/brokers/configuration` endpoint.
### Update dynamic configuration
#### pulsar-admin
The [`update-dynamic-config`](reference-pulsar-admin.md#brokers-update-dynamic-config) subcommand will update existing configuration. It takes two arguments: the name of the parameter and the new value. Here's an example for the [`brokerShutdownTimeoutMs`](reference-configuration.md#broker-brokerShutdownTimeoutMs) parameter:
```shell
$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
```
#### REST API
{% endpoint POST /admin/brokers/configuration/:configName/:configValue %}
[More info](reference-rest-api.md#/admin/brokers/configuration/:configName/:configValue)
#### Java
```java
admin.brokers().updateDynamicConfiguration(configName, configValue);
```
### List updated values
Fetch a list of all potentially updatable configuration parameters.
#### pulsar-admin
```shell
$ pulsar-admin brokers list-dynamic-config
brokerShutdownTimeoutMs
```
#### REST API
{% endpoint GET /admin/brokers/configuration %}
[More info](reference-rest-api.md#/admin/brokers/configuration)
#### Java
```java
admin.brokers().getDynamicConfigurationNames();
```
### List all
Fetch a list of all parameters that have been dynamically updated.
#### pulsar-admin
```shell
$ pulsar-admin brokers get-all-dynamic-config
brokerShutdownTimeoutMs:100
```
#### REST API
{% endpoint GET /admin/brokers/configuration/values %}
[More info](reference-rest-api.md#/admin/brokers/configuration/values)
#### Java
```java
admin.brokers().getAllDynamicConfigurations();
```
---
id: admin-api-clusters
title: Managing Clusters
sidebar_label: Clusters
---
Pulsar clusters consist of one or more Pulsar {% popover brokers %}, one or more {% popover BookKeeper %} servers (aka {% popover bookies %}), and a {% popover ZooKeeper %} cluster that provides configuration and coordination management.
Clusters can be managed via:
* The [`clusters`](reference-pulsar-admin.md#clusters) command of the [`pulsar-admin`](reference-pulsar-admin.md) tool
* The `/admin/clusters` endpoint of the admin [REST API](reference-rest-api.md)
* The `clusters` method of the {% javadoc PulsarAdmin admin org.apache.pulsar.client.admin.PulsarAdmin %} object in the [Java API](client-libraries-java.md)
## Clusters resources
### Provision
New clusters can be provisioned using the admin interface.
> Please note that this operation requires superuser privileges.
{% include message.html id="superuser" %}
#### pulsar-admin
You can provision a new cluster using the [`create`](reference-pulsar-admin.md#clusters-create) subcommand. Here's an example:
```shell
$ pulsar-admin clusters create cluster-1 \
--url http://my-cluster.org.com:8080 \
--broker-url pulsar://my-cluster.org.com:6650
```
#### REST API
{% endpoint PUT /admin/clusters/:cluster %}
[More info](reference-rest-api.md#/admin/clusters/:cluster)
#### Java
```java
ClusterData clusterData = new ClusterData(
serviceUrl,
serviceUrlTls,
brokerServiceUrl,
brokerServiceUrlTls
);
admin.clusters().createCluster(clusterName, clusterData);
```
### Initialize cluster metadata
When provision a new cluster, you need to initialize that cluster's [metadata](getting-started-concepts-and-architecture.md#metadata-store). When initializing cluster metadata, you need to specify all of the following:
* The name of the cluster
* The local ZooKeeper connection string for the cluster
* The global ZooKeeper connection string for the entire instance
* The web service URL for the cluster
* A broker service URL enabling interaction with the {% popover brokers %} in the cluster
You must initialize cluster metadata *before* starting up any [brokers](admin-api-brokers.md) that will belong to the cluster.
{% include admonition.html type="warning" title="No cluster metadata initialization through the REST API or the Java admin API" content='
Unlike most other admin functions in Pulsar, cluster metadata initialization cannot be performed via the admin REST API or the admin Java client, as metadata initialization involves communicating with ZooKeeper directly. Instead, you can use the [`pulsar`](reference-cli-tools.md#pulsar) CLI tool, in particular the [`initialize-cluster-metadata`](reference-cli-tools.md#pulsar-initialize-cluster-metadata) command.
' %}
Here's an example cluster metadata initialization command:
```shell
bin/pulsar initialize-cluster-metadata \
--cluster us-west \
--zookeeper zk1.us-west.example.com:2181 \
--global-zookeeper zk1.us-west.example.com:2184 \
--web-service-url http://pulsar.us-west.example.com:8080/ \
--web-service-url-tls https://pulsar.us-west.example.com:8443/ \
--broker-service-url pulsar://pulsar.us-west.example.com:6650/ \
--broker-service-url-tls pulsar+ssl://pulsar.us-west.example.com:6651/
```
You'll need to use `--*-tls` flags only if you're using [TLS authentication](administration-auth.md#tls-client-auth) in your instance.
### Get configuration
You can fetch the [configuration](reference-configuration.md) for an existing cluster at any time.
#### pulsar-admin
Use the [`get`](reference-pulsar-admin.md#clusters-get) subcommand and specify the name of the cluster. Here's an example:
```shell
$ pulsar-admin clusters get cluster-1
{
"serviceUrl": "http://my-cluster.org.com:8080/",
"serviceUrlTls": null,
"brokerServiceUrl": "pulsar://my-cluster.org.com:6650/",
"brokerServiceUrlTls": null
"peerClusterNames": null
}
```
#### REST API
{% endpoint GET /admin/clusters/:cluster %}
[More info](reference-rest-api.md#/admin/clusters/:cluster)
#### Java
```java
admin.clusters().getCluster(clusterName);
```
### Update
You can update the configuration for an existing cluster at any time.
#### pulsar-admin
Use the [`update`](reference-pulsar-admin.md#clusters-update) subcommand and specify new configuration values using flags.
```shell
$ pulsar-admin clusters update cluster-1 \
--url http://my-cluster.org.com:4081 \
--broker-url pulsar://my-cluster.org.com:3350
```
#### REST
{% endpoint POST /admin/clusters/:cluster %}
[More info](reference-rest-api.md#/admin/clusters/:cluster)
#### Java
```java
ClusterData clusterData = new ClusterData(
serviceUrl,
serviceUrlTls,
brokerServiceUrl,
brokerServiceUrlTls
);
admin.clusters().updateCluster(clusterName, clusterData);
```
### Delete
Clusters can be deleted from a Pulsar {% popover instance %}.
#### pulsar-admin
Use the [`delete`](reference-pulsar-admin.md#clusters-delete) subcommand and specify the name of the cluster.
```
$ pulsar-admin clusters delete cluster-1
```
#### REST API
{% endpoint DELETE /admin/clusters/:cluster %}
[More info](reference-rest-api.md#/admin/clusters/:cluster)
#### Java
```java
admin.clusters().deleteCluster(clusterName);
```
### List
You can fetch a list of all clusters in a Pulsar {% popover instance %}.
#### pulsar-admin
Use the [`list`](reference-pulsar-admin.md#clusters-list) subcommand.
```shell
$ pulsar-admin clusters list
cluster-1
cluster-2
```
#### REST API
{% endpoint GET /admin/clusters %}
[More info](reference-rest-api.md#/admin/clusters)
###### Java
```java
admin.clusters().getClusters();
```
### Update peer-cluster data
Peer clusters can be configured for a given cluster in a Pulsar {% popover instance %}.
#### pulsar-admin
Use the [`update-peer-clusters`](reference-pulsar-admin.md#clusters-update-peer-clusters) subcommand and specify the list of peer-cluster names.
```
$ pulsar-admin update-peer-clusters cluster-1 --peer-clusters cluster-2
```
#### REST API
{% endpoint POST /admin/clusters/:cluster/peers %}
[More info](reference-rest-api.md#/admin/clusters/:cluster/peers)
#### Java
```java
admin.clusters().updatePeerClusterNames(clusterName, peerClusterList);
```
---
id: admin-api-namespaces
title: Managing Namespaces
sidebar_label: Namespaces
---
Pulsar {% popover namespaces %} are logical groupings of {% popover topics %}.
Namespaces can be managed via:
* The [`namespaces`](reference-pulsar-admin.md#clusters) command of the [`pulsar-admin`](reference-pulsar-admin.md) tool
* The `/admin/namespaces` endpoint of the admin [REST API](reference-rest-api.md)
* The `namespaces` method of the {% javadoc PulsarAdmin admin org.apache.pulsar.client.admin.PulsarAdmin %} object in the [Java API](client-libraries-java.md)
## Namespaces resources
### Create
You can create new namespaces under a given {% popover tenant %}.
#### pulsar-admin
Use the [`create`](reference-pulsar-admin.md#namespaces-create) subcommand and specify the namespace by name:
```shell
$ pulsar-admin namespaces create test-tenant/test-namespace
```
#### REST API
{% endpoint PUT /admin/namespaces/:tenant/:cluster/:namespace %}
[More info](reference-rest-api.md#/admin/namespaces/:tenant/:cluster/:namespace)
#### Java
```java
admin.namespaces().createNamespace(namespace);
```
### Get policies
You can fetch the current policies associated with a namespace at any time.
#### pulsar-admin
Use the [`policies`](reference-pulsar-admin.md#namespaces-policies) subcommand and specify the namespace:
```shell
$ pulsar-admin namespaces policies test-tenant/test-namespace
{
"auth_policies": {
"namespace_auth": {},
"destination_auth": {}
},
"replication_clusters": [],
"bundles_activated": true,
"bundles": {
"boundaries": [
"0x00000000",
"0xffffffff"
],
"numBundles": 1
},
"backlog_quota_map": {},
"persistence": null,
"latency_stats_sample_rate": {},
"message_ttl_in_seconds": 0,
"retention_policies": null,
"deleted": false
}
```
#### REST API
{% endpoint GET /admin/namespaces/:tenant/:cluster/:namespace %}
[More info](reference-rest-api.md#/admin/namespaces/:tenant/:cluster/:namespace)
#### Java
```java
admin.namespaces().getPolicies(namespace);
```
### List namespaces within a tenant
You can list all namespaces within a given Pulsar {% popover tenant %}.
#### pulsar-admin
Use the [`list`](reference-pulsar-admin.md#namespaces-list) subcommand and specify the tenant:
```shell
$ pulsar-admin namespaces list test-tenant
test-tenant/ns1
test-tenant/ns2
```
#### REST API
{% endpoint GET /admin/namespaces/:tenant %}
[More info](reference-rest-api.md#/admin/namespaces/:tenant)
#### Java
```java
admin.namespaces().getNamespaces(tenant);
```
### List namespaces within a cluster
You can list all namespaces within a given Pulsar {% popover cluster %}.
#### pulsar-admin
Use the [`list-cluster`](reference-pulsar-admin.md#namespaces-list-cluster) subcommand and specify the cluster:
```shell
$ pulsar-admin namespaces list-cluster test-tenant/cl1
test-tenant/ns1
test-tenant/ns2
```
#### REST API
{% endpoint GET /admin/namespaces/:tenant/:cluster %}
[More info](reference-rest-api.md#/admin/namespaces/:tenant/:cluster)
#### Java
```java
admin.namespaces().getNamespaces(tenant);
```
### Delete
You can delete existing namespaces from a tenant.
#### pulsar-admin
Use the [`delete`](reference-pulsar-admin.md#namespaces-delete) subcommand and specify the namespace:
```shell
$ pulsar-admin namespaces delete test-tenant/ns1
```
#### REST
{% endpoint DELETE /admin/namespaces/:tenant/:cluster/:namespace %}
[More info](reference-rest-api.md#/admin/namespaces/:tenant/:cluster/:namespace)
#### Java
```java
admin.namespaces().deleteNamespace(namespace);
```
#### set replication cluster
It sets replication clusters for a namespace, so Pulsar can internally replicate publish message from one colo to another colo.
###### CLI
```
$ pulsar-admin namespaces set-clusters test-tenant/ns1 \
--clusters cl1
```
###### REST
```
{% endpoint POST /admin/namespaces/:tenant/:namespace/replication %}
```
###### Java
```java
admin.namespaces().setNamespaceReplicationClusters(namespace, clusters);
```
#### get replication cluster
It gives a list of replication clusters for a given namespace.
###### CLI
```
$ pulsar-admin namespaces get-clusters test-tenant/cl1/ns1
```
```
cl2
```
###### REST
```
GET /admin/namespaces/{tenant}/{namespace}/replication
```
###### Java
```java
admin.namespaces().getNamespaceReplicationClusters(namespace)
```
#### set backlog quota policies
Backlog quota helps broker to restrict bandwidth/storage of a namespace once it reach certain threshold limit . Admin can set this limit and one of the following action after the limit is reached.
1. producer_request_hold: broker will hold and not persist produce request payload
2. producer_exception: broker will disconnects with client by giving exception
3. consumer_backlog_eviction: broker will start discarding backlog messages
Backlog quota restriction can be taken care by defining restriction of backlog-quota-type: destination_storage
###### CLI
```
$ pulsar-admin namespaces set-backlog-quota --limit 10 --policy producer_request_hold test-tenant/ns1
```
```
N/A
```
###### REST
```
POST /admin/namespaces/{tenant}/{namespace}/backlogQuota
```
###### Java
```java
admin.namespaces().setBacklogQuota(namespace, new BacklogQuota(limit, policy))
```
#### get backlog quota policies
It shows a configured backlog quota for a given namespace.
###### CLI
```
$ pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
```
```json
{
"destination_storage": {
"limit": 10,
"policy": "producer_request_hold"
}
}
```
###### REST
```
GET /admin/namespaces/{tenant}/{namespace}/backlogQuotaMap
```
###### Java
```java
admin.namespaces().getBacklogQuotaMap(namespace);
```
#### remove backlog quota policies
It removes backlog quota policies for a given namespace
###### CLI
```
$ pulsar-admin namespaces remove-backlog-quota test-tenant/ns1
```
```
N/A
```
###### REST
```
DELETE /admin/namespaces/{tenant}/{namespace}/backlogQuota
```
###### Java
```java
admin.namespaces().removeBacklogQuota(namespace, backlogQuotaType)
```
#### set persistence policies
Persistence policies allow to configure persistency-level for all topic messages under a given namespace.
- Bookkeeper-ack-quorum: Number of acks (guaranteed copies) to wait for each entry, default: 0
- Bookkeeper-ensemble: Number of bookies to use for a topic, default: 0
- Bookkeeper-write-quorum: How many writes to make of each entry, default: 0
- Ml-mark-delete-max-rate: Throttling rate of mark-delete operation (0 means no throttle), default: 0.0
###### CLI
```
$ pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 test-tenant/ns1
```
```
N/A
```
###### REST
```
POST /admin/persistent/{tenant}/{namespace}/persistence
```
###### Java
```java
admin.namespaces().setPersistence(namespace,new PersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate))
```
#### get persistence policies
It shows configured persistence policies of a given namespace.
###### CLI
```
$ pulsar-admin namespaces get-persistence test-tenant/ns1
```
```json
{
"bookkeeperEnsemble": 3,
"bookkeeperWriteQuorum": 2,
"bookkeeperAckQuorum": 2,
"managedLedgerMaxMarkDeleteRate": 0
}
```
###### REST
```
GET /admin/namespaces/{tenant}/{namespace}/persistence
```
###### Java
```java
admin.namespaces().getPersistence(namespace)
```
#### unload namespace bundle
Namespace bundle is a virtual group of topics which belong to same namespace. If broker gets overloaded with number of bundles then this command can help to unload heavy bundle from that broker, so it can be served by some other less loaded broker. Namespace bundle is defined with it’s start and end range such as 0x00000000 and 0xffffffff.
###### CLI
```
$ pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/ns1
```
```
N/A
```
###### REST
```
PUT /admin/namespaces/{tenant}/{namespace}/unload
```
###### Java
```java
admin.namespaces().unloadNamespaceBundle(namespace, bundle)
```
#### set message-ttl
It configures message’s time to live (in seconds) duration.
###### CLI
```
$ pulsar-admin namespaces set-message-ttl --messageTTL 100 test-tenant/ns1
```
```
N/A
```
###### REST
```
POST /admin/namespaces/{tenant}/{namespace}/messageTTL
```
###### Java
```java
admin.namespaces().setNamespaceMessageTTL(namespace, messageTTL)
```
#### get message-ttl
It gives a message ttl of configured namespace.
###### CLI
```
$ pulsar-admin namespaces get-message-ttl test-tenant/ns1
```
```
100
```
###### REST
```
GET /admin/namespaces/{tenant}/{namespace}/messageTTL
```
###### Java
```java
admin.namespaces().getNamespaceReplicationClusters(namespace)
```
#### split bundle
Each namespace bundle can contain multiple topics and each bundle can be served by only one broker. If bundle gets heavy with multiple live topics in it then it creates load on that broker and in order to resolve this issue, admin can split bundle using this command.
###### CLI
```
$ pulsar-admin namespaces split-bundle --bundle 0x00000000_0xffffffff test-tenant/ns1
```
```
N/A
```
###### REST
```
PUT /admin/namespaces/{tenant}/{namespace}/{bundle}/split
```
###### Java
```java
admin.namespaces().splitNamespaceBundle(namespace, bundle)
```
#### clear backlog
It clears all message backlog for all the topics those belong to specific namespace. You can also clear backlog for a specific subscription as well.
###### CLI
```
$ pulsar-admin namespaces clear-backlog --sub my-subscription test-tenant/ns1
```
```
N/A
```
###### REST
```
POST /admin/namespaces/{tenant}/{namespace}/clearBacklog
```
###### Java
```java
admin.namespaces().clearNamespaceBacklogForSubscription(namespace, subscription)
```
#### clear bundle backlog
It clears all message backlog for all the topics those belong to specific NamespaceBundle. You can also clear backlog for a specific subscription as well.
###### CLI
```
$ pulsar-admin namespaces clear-backlog --bundle 0x00000000_0xffffffff --sub my-subscription test-tenant/ns1
```
```
N/A
```
###### REST
```
POST /admin/namespaces/{tenant}/{namespace}/{bundle}/clearBacklog
```
###### Java
```java
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle, subscription)
```
#### set retention
Each namespace contains multiple topics and each topic’s retention size (storage size) should not exceed to a specific threshold or it should be stored till certain time duration. This command helps to configure retention size and time of topics in a given namespace.
###### CLI
```
$ pulsar-admin set-retention --size 10 --time 100 test-tenant/ns1
```
```
N/A
```
###### REST
```
POST /admin/namespaces/{tenant}/{namespace}/retention
```
###### Java
```java
admin.namespaces().setRetention(namespace, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB))
```
#### get retention
It shows retention information of a given namespace.
###### CLI
```
$ pulsar-admin namespaces get-retention test-tenant/ns1
```
```json
{
"retentionTimeInMinutes": 10,
"retentionSizeInMB": 100
}
```
###### REST
```
GET /admin/namespaces/{tenant}/{namespace}/retention
```
###### Java
```java
admin.namespaces().getRetention(namespace)
```
#### set dispatch throttling
It sets message dispatch rate for all the topics under a given namespace.
Dispatch rate can be restricted by number of message per X seconds (`msg-dispatch-rate`) or by number of message-bytes per X second (`byte-dispatch-rate`).
dispatch rate is in second and it can be configured with `dispatch-rate-period`. Default value of `msg-dispatch-rate` and `byte-dispatch-rate` is -1 which
disables the throttling.
###### CLI
```
$ pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
```
###### REST
```
POST /admin/namespaces/{tenant}/{namespace}/dispatchRate
```
###### Java
```java
admin.namespaces().setDispatchRate(namespace, 1000, 1048576, 1)
```
#### get configured message-rate
It shows configured message-rate for the namespace (topics under this namespace can dispatch this many messages per second)
###### CLI
```
$ pulsar-admin namespaces get-dispatch-rate test-tenant/ns1
```
```json
{
"dispatchThrottlingRatePerTopicInMsg" : 1000,
"dispatchThrottlingRatePerTopicInByte" : 1048576,
"ratePeriodInSecond" : 1
}
```
###### REST
```
GET /admin/namespaces/{tenant}/{namespace}/dispatchRate
```
###### Java
```java
admin.namespaces().getDispatchRate(namespace)
```
### Namespace isolation
Coming soon.
### Unloading from a broker
You can unload a namespace, or a {% popover namespace bundle %}, from the Pulsar {% popover broker %} that is currently responsible for it.
#### pulsar-admin
Use the [`unload`](reference-pulsar-admin.md#namespaces-unload) subcommand of the [`namespaces`](reference-pulsar-admin.md#namespaces) command.
##### Example
```shell
$ pulsar-admin namespaces unload my-tenant/my-ns
```
#### REST API
#### Java
---
id: admin-api-non-persistent-topics
title: Managing non-persistent topics
sidebar_label: Non-Persistent topics
---
Non-persistent can be used in applications that only want to consume real time published messages and
do not need persistent guarantee that can also reduce message-publish latency by removing overhead of
persisting messages.
In all of the instructions and commands below, the topic name structure is:
{% include topic.html ten="tenant" n="namespace" t="topic" %}
## Non-persistent topics resources
### Get stats
It shows current statistics of a given non-partitioned topic.
- **msgRateIn**: The sum of all local and replication publishers' publish rates in messages per second
- **msgThroughputIn**: Same as above, but in bytes per second instead of messages per second
- **msgRateOut**: The sum of all local and replication consumers' dispatch rates in messages per second
- **msgThroughputOut**: Same as above, but in bytes per second instead of messages per second
- **averageMsgSize**: The average size in bytes of messages published within the last interval
- **publishers**: The list of all local publishers into the topic. There can be zero or thousands
- **averageMsgSize**: Average message size in bytes from this publisher within the last interval
- **producerId**: Internal identifier for this producer on this topic
- **producerName**: Internal identifier for this producer, generated by the client library
- **address**: IP address and source port for the connection of this producer
- **connectedSince**: Timestamp this producer was created or last reconnected
- **subscriptions**: The list of all local subscriptions to the topic
- **my-subscription**: The name of this subscription (client defined)
- **type**: This subscription type
- **consumers**: The list of connected consumers for this subscription
- **consumerName**: Internal identifier for this consumer, generated by the client library
- **availablePermits**: The number of messages this consumer has space for in the client library's listen queue. A value of 0 means the client library's queue is full and receive() isn't being called. A nonzero value means this consumer is ready to be dispatched messages.
- **replication**: This section gives the stats for cross-colo replication of this topic
- **connected**: Whether the outbound replicator is connected
- **inboundConnection**: The IP and port of the broker in the remote cluster's publisher connection to this broker
- **inboundConnectedSince**: The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.
- **msgDropRate**: for publisher: publish: broker only allows configured number of in flight per connection, and drops all other published messages above the threshold. Broker also drops messages for subscriptions in case of unavailable limit and connection is not writable.
```json
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"msgDropRate" : 0.0,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null,
"msgDropRate" : 0.0
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers" : [ {
"msgRateOut" : 20343.506296021893,
"msgThroughputOut" : 2.0979855364233278E7,
"msgRateRedeliver" : 0.0,
"consumerName" : "fe3c0",
"availablePermits" : 950,
"unackedMessages" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"address" : "/10.73.210.249:60578",
"connectedSince" : "2017-07-26 15:13:48.026-0700",
"clientVersion" : "1.19-incubating-SNAPSHOT"
} ],
"msgDropRate" : 432.2390921571593
}
},
"replication": {}
}
```
#### pulsar-admin
Topic stats can be fetched using [`stats`](reference-pulsar-admin.md#stats) command.
```shell
$ pulsar-admin non-persistent stats \
non-persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint GET /admin/non-persistent/:tenant/:namespace/:destination/stats %}
#### Java
```java
String destination = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getStats(destination);
```
### Get internal stats
It shows detailed statistics of a topic.
#### pulsar-admin
Topic internal-stats can be fetched using [`stats-internal`](reference-pulsar-admin.md#stats-internal) command.
```shell
$ pulsar-admin non-persistent stats-internal \
non-persistent://test-tenant/ns1/tp1 \
{
"entriesAddedCounter" : 48834,
"numberOfEntries" : 0,
"totalSize" : 0,
"cursors" : {
"s1" : {
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 0,
"cursorLedgerLastEntry" : 0
}
}
}
```
#### REST API
{% endpoint GET /admin/non-persistent/:tenant/:namespace/:destination/internalStats %}
#### Java
```java
String destination = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getInternalStats(destination);
```
### Create partitioned topic
Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.
#### pulsar-admin
```shell
$ bin/pulsar-admin non-persistent create-partitioned-topic \
non-persistent://my-tenant/my-namespace/my-topic \
--partitions 4
```
#### REST API
{% endpoint PUT /admin/non-persistent/:tenant/:namespace/:destination/partitions %}
#### Java
```java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
int numPartitions = 4;
admin.nonPersistentTopics().createPartitionedTopic(topicName, numPartitions);
```
### Get metadata
Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:
Field | Meaning
:-----|:-------
`partitions` | The number of partitions into which the topic is divided
#### pulsar-admin
```shell
$ pulsar-admin non-persistent get-partitioned-topic-metadata \
non-persistent://my-tenant/my-namespace/my-topic
{
"partitions": 4
}
```
#### REST API
{% endpoint GET /admin/non-persistent/:tenant/:namespace/:destination/partitions %}
#### Java
```java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getPartitionedTopicMetadata(topicName);
```
### Unload topic
It unloads a topic.
#### pulsar-admin
Topic can be unloaded using [`unload`](reference-pulsar-admin.md#unload) command.
```shell
$ pulsar-admin non-persistent unload \
non-persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint PUT /admin/non-persistent/:tenant/:namespace/:destination/unload %}
[More info](reference-rest-api.md#/admin/non-persistent/:tenant/:namespace/:destination/unload)
#### Java
```java
String destination = "non-persistent://my-tenantmy-namespace/my-topic";
admin.nonPersistentTopics().unload(destination);
```
---
id: admin-api-overview
title: The Pulsar admin interface
sidebar_label: Overview
---
The Pulsar admin interface enables you to manage all of the important entities in a Pulsar {% popover instance %}, such as {% popover properties %}, {% popover topics %}, and {% popover namespaces %}.
You can currently interact with the admin interface via:
- Making HTTP calls against the admin [REST API](reference-rest-api.md) provided by Pulsar {% popover brokers %}. For some restful apis, they might be redirected to topic owner brokers for serving
with [`307 Temporary Redirect`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/307), hence the HTTP callers should handle `307 Temporary Redirect`. If you are using `curl`, you should specify `-L`
to handle redirections.
- The `pulsar-admin` CLI tool, which is available in the `bin` folder of your [Pulsar installation](getting-started-standalone.md):
```shell
$ bin/pulsar-admin
```
Full documentation for this tool can be found in the [Pulsar command-line tools](reference-pulsar-admin.md) doc.
- A Java client interface.
> #### The REST API is the admin interface
> Under the hood, both the `pulsar-admin` CLI tool and the Java client both use the REST API. If you’d like to implement your own admin interface client, you should use the REST API as well. Full documentation can be found here.
{% include message.html id="admin_rest_api" %}
In this document, examples from each of the three available interfaces will be shown.
## Admin setup
{% include explanations/admin-setup.md %}
Each of Pulsar's three admin interfaces---the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool, the [Java admin API](/api/admin), and the [REST API](reference-rest-api.md)---requires some special setup if you have [authentication](administration-auth.md#authentication-providers) enabled in your Pulsar {% popover instance %}.
### pulsar-admin
If you have [authentication](administration-auth.md#authentication-providers) enabled, you will need to provide an auth configuration to use the [`pulsar-admin`](reference-pulsar-admin.md) tool. By default, the configuration for the `pulsar-admin` tool is found in the [`conf/client.conf`](reference-configuration.md#client) file. Here are the available parameters:
{% include config.html id="client" %}
### REST API
You can find documentation for the REST API exposed by Pulsar {% popover brokers %} in [this reference document](reference-rest-api.md).
### Java admin client
To use the Java admin API, instantiate a {% javadoc PulsarAdmin admin org.apache.pulsar.client.admin.PulsarAdmin %} object, specifying a URL for a Pulsar {% popover broker %} and a {% javadoc ClientConfiguration admin org.apache.pulsar.client.admin.ClientConfiguration %}. Here's a minimal example using `localhost`:
```java
URL url = new URL("http://localhost:8080");
// Pass auth-plugin class fully-qualified name if Pulsar-security enabled
String authPluginClassName = "com.org.MyAuthPluginClass";
// Pass auth-param if auth-plugin class requires it
String authParams = "param1=value1";
boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;
ClientConfiguration config = new ClientConfiguration();
config.setAuthentication(authPluginClassName, authParams);
config.setUseTls(useTls);
config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
PulsarAdmin admin = new PulsarAdmin(url, config);
```
---
id: admin-api-partitioned-topics
title: Managing partitioned topics
sidebar_label: Partitioned topics
---
You can use Pulsar's [admin API](admin-api-overview.md) to create and manage partitioned topics.
In all of the instructions and commands below, the topic name structure is:
{% include topic.html ten="tenant" n="namespace" t="topic" %}
## Partitioned topics resources
{% include explanations/partitioned-topic-admin.md %}
---
id: admin-api-permissions
title: Managing permissions
sidebar_label: Persmissions
---
## Permissions resources
{% include explanations/permissions.md %}
---
id: admin-api-persistent-topics
title: Managing persistent topics
sidebar_label: Persistent topics
---
Persistent helps to access topic which is a logical endpoint for publishing and consuming messages. Producers publish messages to the topic and consumers subscribe to the topic, to consume messages published to the topic.
In all of the instructions and commands below, the topic name structure is:
{% include topic.html ten="tenant" n="namespace" t="topic" %}
## Persistent topics resources
### List of topics
It provides a list of persistent topics exist under a given namespace.
#### pulsar-admin
List of topics can be fetched using [`list`](../../reference/CliTools#list) command.
```shell
$ pulsar-admin persistent list \
my-tenant/my-namespace
```
#### REST API
{% endpoint GET /admin/persistent/:tenant/:namespace %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace)
#### Java
```java
String namespace = "my-tenant/my-namespace";
admin.persistentTopics().getList(namespace);
```
### Grant permission
It grants permissions on a client role to perform specific actions on a given topic.
#### pulsar-admin
Permission can be granted using [`grant-permission`](../../reference/CliTools#grant-permission) command.
```shell
$ pulsar-admin persistent grant-permission \
--actions produce,consume --role application1 \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint POST /admin/namespaces/:tenant/:namespace/permissions/:role %}
[More info](../../reference/RestApi#/admin/namespaces/:tenant/:namespace/permissions/:role)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
String role = "test-role";
Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
admin.persistentTopics().grantPermission(destination, role, actions);
```
### Get permission
Permission can be fetched using [`permissions`](../../reference/CliTools#permissions) command.
#### pulsar-admin
```shell
$ pulsar-admin persistent permissions \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
```
#### REST API
{% endpoint GET /admin/namespaces/:tenant/:namespace/permissions %}
[More info](../../reference/RestApi#/admin/namespaces/:tenant:namespace/permissions)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getPermissions(destination);
```
### Revoke permission
It revokes a permission which was granted on a client role.
#### pulsar-admin
Permission can be revoked using [`revoke-permission`](../../reference/CliTools#revoke-permission) command.
```shell
$ pulsar-admin persistent revoke-permission \
--role application1 \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
```
#### REST API
{% endpoint DELETE /admin/namespaces/:tenant:namespace/permissions/:role %}
[More info](../../reference/RestApi#/admin/namespaces/:tenant/:namespace/permissions/:role)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
String role = "test-role";
admin.persistentTopics().revokePermissions(destination, role);
```
### Delete topic
It deletes a topic. The topic cannot be deleted if there's any active subscription or producers connected to it.
#### pulsar-admin
Topic can be deleted using [`delete`](../../reference/CliTools#delete) command.
```shell
$ pulsar-admin persistent delete \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint DELETE /admin/persistent/:tenant/:namespace/:destination %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().delete(destination);
```
### Unload topic
It unloads a topic.
#### pulsar-admin
Topic can be unloaded using [`unload`](../../reference/CliTools#unload) command.
```shell
$ pulsar-admin persistent unload \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint PUT /admin/persistent/:tenant/:namespace/:destination/unload %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination/unload)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().unload(destination);
```
### Get stats
It shows current statistics of a given non-partitioned topic.
- **msgRateIn**: The sum of all local and replication publishers' publish rates in messages per second
- **msgThroughputIn**: Same as above, but in bytes per second instead of messages per second
- **msgRateOut**: The sum of all local and replication consumers' dispatch rates in messages per second
- **msgThroughputOut**: Same as above, but in bytes per second instead of messages per second
- **averageMsgSize**: The average size in bytes of messages published within the last interval
- **storageSize**: The sum of the ledgers' storage size for this topic. See
- **publishers**: The list of all local publishers into the topic. There can be zero or thousands
- **averageMsgSize**: Average message size in bytes from this publisher within the last interval
- **producerId**: Internal identifier for this producer on this topic
- **producerName**: Internal identifier for this producer, generated by the client library
- **address**: IP address and source port for the connection of this producer
- **connectedSince**: Timestamp this producer was created or last reconnected
- **subscriptions**: The list of all local subscriptions to the topic
- **my-subscription**: The name of this subscription (client defined)
- **msgBacklog**: The count of messages in backlog for this subscription
- **type**: This subscription type
- **msgRateExpired**: The rate at which messages were discarded instead of dispatched from this subscription due to TTL
- **consumers**: The list of connected consumers for this subscription
- **consumerName**: Internal identifier for this consumer, generated by the client library
- **availablePermits**: The number of messages this consumer has space for in the client library's listen queue. A value of 0 means the client library's queue is full and receive() isn't being called. A nonzero value means this consumer is ready to be dispatched messages.
- **replication**: This section gives the stats for cross-colo replication of this topic
- **replicationBacklog**: The outbound replication backlog in messages
- **connected**: Whether the outbound replicator is connected
- **replicationDelayInSeconds**: How long the oldest message has been waiting to be sent through the connection, if connected is true
- **inboundConnection**: The IP and port of the broker in the remote cluster's publisher connection to this broker
- **inboundConnectedSince**: The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.
```json
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
```
#### pulsar-admin
Topic stats can be fetched using [`stats`](../../reference/CliTools#stats) command.
```shell
$ pulsar-admin persistent stats \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint GET /admin/persistent/:tenant/:namespace/:destination/stats %}
[More info](../../reference/RestApi#/admin/persistent/:tenant:namespace/:destination/stats)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getStats(destination);
```
### Get internal stats
It shows detailed statistics of a topic.
- **entriesAddedCounter**: Messages published since this broker loaded this topic
- **numberOfEntries**: Total number of messages being tracked
- **totalSize**: Total storage size in bytes of all messages
- **currentLedgerEntries**: Count of messages written to the ledger currently open for writing
- **currentLedgerSize**: Size in bytes of messages written to ledger currently open for writing
- **lastLedgerCreatedTimestamp**: time when last ledger was created
- **lastLedgerCreationFailureTimestamp:** time when last ledger was failed
- **waitingCursorsCount**: How many cursors are "caught up" and waiting for a new message to be published
- **pendingAddEntriesCount**: How many messages have (asynchronous) write requests we are waiting on completion
- **lastConfirmedEntry**: The ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet.
- **state**: The state of this ledger for writing. LedgerOpened means we have a ledger open for saving published messages.
- **ledgers**: The ordered list of all ledgers for this topic holding its messages
- **cursors**: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
- **markDeletePosition**: The ack position: the last message the subscriber acknowledged receiving
- **readPosition**: The latest position of subscriber for reading message
- **waitingReadOp**: This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.
- **pendingReadOps**: The counter for how many outstanding read requests to the BookKeepers we have in progress
- **messagesConsumedCounter**: Number of messages this cursor has acked since this broker loaded this topic
- **cursorLedger**: The ledger being used to persistently store the current markDeletePosition
- **cursorLedgerLastEntry**: The last entryid used to persistently store the current markDeletePosition
- **individuallyDeletedMessages**: If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position
- **lastLedgerSwitchTimestamp**: The last time the cursor ledger was rolled over
- **state**: The state of the cursor ledger: Open means we have a cursor ledger for saving updates of the markDeletePosition.
```json
{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}
```
#### pulsar-admin
Topic internal-stats can be fetched using [`stats-internal`](../../reference/CliTools#stats-internal) command.
```shell
$ pulsar-admin persistent stats-internal \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint GET /admin/persistent/:tenant/:namespace/:destination/internalStats %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination/internalStats)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getInternalStats(destination);
```
### Peek messages
It peeks N messages for a specific subscription of a given topic.
#### pulsar-admin
```shell
$ pulsar-admin persistent peek-messages \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
Message ID: 315674752:0
Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
msg-payload
```
#### REST API
{% endpoint GET /admin/persistent/:tenant/:namespace/:destination/subscription/:subName/position/:messagePosition %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination/subscription/:subName/position/:messagePosition)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
int numMessages = 1;
admin.persistentTopics().peekMessages(destination, subName, numMessages);
```
### Skip messages
It skips N messages for a specific subscription of a given topic.
#### pulsar-admin
```shell
$ pulsar-admin persistent skip \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint POST /admin/persistent/:tenant/:namespace/:destination/subscription/:subName/skip/:numMessages %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination/subscription/:subName/skip/:numMessages)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
int numMessages = 1;
admin.persistentTopics().skipMessages(destination, subName, numMessages);
```
### Skip all messages
It skips all old messages for a specific subscription of a given topic.
#### pulsar-admin
```shell
$ pulsar-admin persistent skip-all \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint POST /admin/persistent/:tenant/:namespace/:destination/subscription/:subName/skip_all %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination/subscription/:subName/skip_all)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
admin.persistentTopics().skipAllMessages(destination, subName);
```
### Reset cursor
It resets a subscription’s cursor position back to the position which was recorded X minutes before. It essentially calculates time and position of cursor at X minutes before and resets it at that position.
#### pulsar-admin
```shell
$ pulsar-admin persistent reset-cursor \
--subscription my-subscription --time 10 \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint POST /admin/persistent/:tenant/:namespace/:destination/subscription/:subName/resetcursor/:timestamp %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination/subscription/:subName/resetcursor/:timestamp)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
long timestamp = 2342343L;
admin.persistentTopics().skipAllMessages(destination, subName, timestamp);
```
### Lookup of topic
It locates broker url which is serving the given topic.
#### pulsar-admin
```shell
$ pulsar-admin persistent lookup \
persistent://test-tenant/ns1/tp1 \
"pulsar://broker1.org.com:4480"
```
#### REST API
{% endpoint GET /lookup/v2/destination/persistent/:tenant:namespace/:destination %}
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.lookup().lookupDestination(destination);
```
### Get bundle
It gives range of the bundle which contains given topic
#### pulsar-admin
```shell
$ pulsar-admin persistent bundle-range \
persistent://test-tenant/ns1/tp1 \
"0x00000000_0xffffffff"
```
#### REST API
{% endpoint GET /lookup/v2/destination/:destination_domain/:tenant/:namespace/:destination/bundle %}
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.lookup().getBundleRange(destination);
```
### Get subscriptions
It shows all subscription names for a given topic.
#### pulsar-admin
```shell
$ pulsar-admin persistent subscriptions \
persistent://test-tenant/ns1/tp1 \
my-subscription
```
#### REST API
{% endpoint GET /admin/persistent/:tenant/:namespace/:destination/subscriptions %}
[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:destination/subscriptions)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getSubscriptions(destination);
```
### Unsubscribe
It can also help to unsubscribe a subscription which is no more processing further messages.
#### pulsar-admin
```shell
$ pulsar-admin persistent unsubscribe \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
```
#### REST API
{% endpoint POST /admin/namespaces/:tenant/:namespace/unsubscribe/:subscription %}
[More info](../../reference/RestApi#/admin/namespaces/:tenant/:namespace/unsubscribe/:subscription)
#### Java
```java
String destination = "persistent://my-tenant/my-namespace/my-topic";
String subscriptionName = "my-subscription";
admin.persistentTopics().deleteSubscription(destination, subscriptionName);
```
---
id: admin-api-tenants
title: Managing Tenants
sidebar_label: Tenants
---
Tenants, like namespaces, can be managed using the [admin API](admin-api-overview.md). There are currently two configurable aspects of tenants:
* Admin roles
* Allowed clusters
## Tenant resources
### List
#### pulsar-admin
You can list all of the tenants associated with an {% popover instance %} using the [`list`](reference-pulsar-admin.md#tenants-list) subcommand:
```shell
$ pulsar-admin tenants list
```
That will return a simple list, like this:
```
my-tenant-1
my-tenant-2
```
### Create
#### pulsar-admin
You can create a new tenant using the [`create`](reference-pulsar-admin.md#tenants-create) subcommand:
```shell
$ pulsar-admin tenants create my-tenant
```
When creating a tenant, you can assign admin roles using the `-r`/`--admin-roles` flag. You can specify multiple roles as a comma-separated list. Here are some examples:
```shell
$ pulsar-admin tenants create my-tenant \
--admin-roles role1,role2,role3
$ pulsar-admin tenants create my-tenant \
-r role1
```
### Get configuration
#### pulsar-admin
You can see a tenant's configuration as a JSON object using the [`get`](reference-pulsar-admin.md#tenants-get) subcommand and specifying the name of the tenant:
```shell
$ pulsar-admin tenants get my-tenant
{
"adminRoles": [
"admin1",
"admin2"
],
"allowedClusters": [
"cl1",
"cl2"
]
}
```
### Delete
#### pulsar-adnin
You can delete a tenant using the [`delete`](reference-pulsar-admin.md#tenants-delete) subcommand and specifying the tenant name:
```shell
$ pulsar-admin tenants delete my-tenant
```
### Updating
#### pulsar-admin
You can update a tenant's configuration using the [`update`](reference-pulsar-admin.md#tenants-update) subcommand
---
id: administration-auth
title: Authentication and authorization in Pulsar
sidebar_label: Authentication and authorization
---
Pulsar supports a pluggable authentication mechanism that Pulsar clients can use to authenticate with {% popover brokers %}. Pulsar can also be configured to support multiple authentication sources.
## Role tokens
In Pulsar, a *role* is a string, like `admin` or `app1`, that can represent a single client or multiple clients. Roles are used to control permission for clients to produce or consume from certain topics, administer the configuration for properties, and more.
The purpose of the [authentication provider](#authentication-providers) is to establish the identity of the client and then assign that client a *role token*. This role token is then used to determine what the client is authorized to do.
## Authentication providers
Out of the box, Pulsar supports two authentication providers:
* [TLS client auth](#tls-client-auth)
* [Athenz](#athenz)
### TLS client auth
In addition to providing connection encryption between Pulsar clients and {% popover brokers %}, [Transport Layer Security](https://en.wikipedia.org/wiki/Transport_Layer_Security) (TLS) can be used to identify clients through a certificate signed by a trusted certificate authority.
#### Creating certificates
Creating TLS certificates for Pulsar involves creating a [certificate authority](#certificate-authority) (CA), [broker certificate](#broker-certificate), and [client certificate](#client-certificate).
##### Certificate authority
The first step is to create the certificate for the CA. The CA will be used to sign both the broker and client certificates, in order to ensure that each party will trust the others.
###### Linux
```bash
$ CA.pl -newca
```
###### macOS
```bash
$ /System/Library/OpenSSL/misc/CA.pl -newca
```
After answering the question prompts, this will store CA-related files in the `./demoCA` directory. Within that directory:
* `demoCA/cacert.pem` is the public certificate. It is meant to be distributed to all parties involved.
* `demoCA/private/cakey.pem` is the private key. This is only needed when signing a new certificate for either broker or clients and it must be safely guarded.
##### Broker certificate
Once a CA certificate has been created, you can create certificate requests and sign them with the CA.
The following commands will ask you a few questions and then create the certificates. When asked for the common name, you need to match the hostname of the broker. You could also use a wildcard to match a group of broker hostnames, for example `*.broker.usw.example.com`. This ensures that the same certificate can be reused on multiple machines.
```shell
$ openssl req \
-newkey rsa:2048 \
-sha256 \
-nodes \
-out broker-cert.csr \
-outform PEM
```
Convert the key to [PKCS 8](https://en.wikipedia.org/wiki/PKCS_8) format:
```shell
$ openssl pkcs8 \
-topk8 \
-inform PEM \
-outform PEM \
-in privkey.pem \
-out broker-key.pem \
-nocrypt
```
This will create two broker certificate files named `broker-cert.csr` and `broker-key.pem`. Now you can create the signed certificate:
```shell
$ openssl ca \
-out broker-cert.pem \
-infiles broker-cert.csr
```
At this point, you should have a `broker-cert.pem` and `broker-key.pem` file. These will be needed for the broker.
##### Client certificate
To create a client certificate, repeat the steps in the previous section, but did create `client-cert.pem` and `client-key.pem` files instead.
For the client common name, you need to use a string that you intend to use as the [role token](#role-tokens) for this client, though it doesn't need to match the client hostname.
#### Configure the broker for TLS
To configure a Pulsar broker to use TLS authentication, you'll need to make some changes to the `broker.conf` configuration file, which is located in the `conf` directory of your [Pulsar installation](getting-started-standalone.md).
Add these values to the configuration file (substituting the appropriate certificate paths where necessary):
```properties
# Enable TLS and point the broker to the right certs
tlsEnabled=true
tlsCertificateFilePath=/path/to/broker-cert.pem
tlsKeyFilePath=/path/to/broker-key.pem
tlsTrustCertsFilePath=/path/to/cacert.pem
# Enable the TLS auth provider
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
```
> A full listing of parameters available in the `conf/broker.conf` file, as well as the default values for those parameters, can be found in Broker Configuration.
#### Configure the discovery service
The discovery service used by Pulsar brokers needs to redirect all HTTPS requests, which means that it needs to be trusted by the client as well. Add this configuration in `conf/discovery.conf` in your Pulsar installation:
```properties
tlsEnabled=true
tlsCertificateFilePath=/path/to/broker-cert.pem
tlsKeyFilePath=/path/to/broker-key.pem
```
#### Configure clients
For more information on Pulsar client authentication using TLS, see the following language-specific docs:
* [Java client](client-libraries-java.md)
* [C++ client](client-libraries-cpp.md)
#### Configure CLI tools
[Command-line tools](reference-cli-tools.md) like [`pulsar-admin`](reference-pulsar-admin.md), [`pulsar-perf`](reference-cli-tools.md#pulsar-perf), and [`pulsar-client`](reference-cli-tools.md#pulsar-client) use the `conf/client.conf` config file in a Pulsar installation.
You'll need to add the following authentication parameters to that file to use TLS with Pulsar's CLI tools:
```properties
serviceUrl=https://broker.example.com:8443/
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
authParams=tlsCertFile:/path/to/client-cert.pem,tlsKeyFile:/path/to/client-key.pem
useTls=true
tlsAllowInsecureConnection=false
tlsTrustCertsFilePath=/path/to/cacert.pem
```
### Athenz
[Athenz](https://github.com/yahoo/athenz) is a role-based authentication/authorization system. In Pulsar, Athenz [role tokens](#role-tokens) (aka *z-tokens*) can be used to establish the identify of the client.
#### Athenz authentication settings
In a [decentralized Athenz system](https://github.com/yahoo/athenz/blob/master/docs/dev_decentralized_access.md) there is both an [authori**Z**ation **M**anagement **S**ystem](https://github.com/yahoo/athenz/blob/master/docs/setup_zms.md) (ZMS) server and an [authori**Z**ation **T**oken **S**ystem](https://github.com/yahoo/athenz/blob/master/docs/setup_zts.md) (ZTS) server.
To begin, you need to set up Athenz service access control. You should create domains for the *provider* (which provides some resources to other services with some authentication/authorization policies) and the *tenant* (which is provisioned to access some resources in a provider). In this case, the provider corresponds to the Pulsar service itself and the tenant corresponds to each application using Pulsar (typically, a property in Pulsar).
##### Create the tenant domain and service
On the tenant side, you need to:
1. Create a domain, such as `shopping`
2. Generate a private/public key pair
3. Create a service, such as `some_app`, on the domain with the public key
Note that the private key generated in step 2 needs to be specified when the Pulsar client connects to the broker (see client configuration examples for [Java](client-libraries-java.md#tls-authentication) and [C++](client-libraries-cpp.md#tls-authentication)).
For more specific steps involving the Athenz UI, please refer to [this doc](https://github.com/yahoo/athenz/blob/master/docs/example_service_athenz_setup.md#client-tenant-domain).
##### Create the provider domain and add the tenant service to some role members
On the provider side, you need to:
1. Create a domain, such as `pulsar`
2. Create a role
3. Add the tenant service to members of the role
Note that in step 2 any action and resource can be specified since they are not used on Pulsar. In other words, Pulsar uses the Athenz role token only for authentication, *not* for authorization.
For more specific steps involving UI, please refer to [this doc](https://github.com/yahoo/athenz/blob/master/docs/example_service_athenz_setup.md#server-provider-domain).
#### Configure the broker for Athenz
{% include message.html id="tls_role_tokens" %}
> #### TLS encryption strongly recommended
> Please note that using TLS encryption is strongly recommended when using Athenz as an authentication provider, as it can protect role tokens from being intercepted and reused (see also [this doc]()).
In the `conf/broker.conf` configuration file in your Pulsar installation, you need to provide the class name of the Athenz authentication provider as well as a comma-separated list of provider domain names.
```properties
# Add the Athenz auth provider
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderAthenz
athenzDomainNames=pulsar
# Enable TLS
tlsEnabled=true
tlsCertificateFilePath=/path/to/broker-cert.pem
tlsKeyFilePath=/path/to/broker-key.pem
```
> A full listing of parameters available in the conf/broker.conf file, as well as the default values for those parameters, can be found in [Broker Configuration]().
#### Configure clients for Athenz
For more information on Pulsar client authentication using Athenz, see the following language-specific docs:
* [Java client](client-libraries-java.md#athenz)
#### Configure CLI tools for Athenz
[Command-line tools](reference-cli-tools.md) like [`pulsar-admin`](reference-pulsar-admin.md), [`pulsar-perf`](reference-cli-tools.md#pulsar-perf), and [`pulsar-client`](reference-cli-tools.md#pulsar-client) use the `conf/client.conf` config file in a Pulsar installation.
You’ll need to add the following authentication parameters to that file to use Athenz with Pulsar’s CLI tools:
```properties
# URL for the broker
serviceUrl=https://broker.example.com:8443/
# Set Athenz auth plugin and its parameters
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationAthenz
authParams={"tenantDomain":"shopping","tenantService":"some_app","providerDomain":"pulsar","privateKey":"file:///path/to/private.pem","keyId":"v1"}
# Enable TLS
useTls=true
tlsAllowInsecureConnection=false
tlsTrustCertsFilePath=/path/to/cacert.pem
```
## Authorization
In Pulsar, the [authentication provider](#authentication-providers) is charged with properly identifying clients and associating them with [role tokens](#role-tokens). *Authorization* is the process that determines *what* clients are able to do.
Authorization in Pulsar is managed at the tenant level, which means that you can have multiple authorization schemes active in a single Pulsar instance. You could, for example, create a `shopping` tenant that has one set of [roles](#role-tokens) and applies to a shopping application used by your company, while an `inventory` tenant would be used only by an inventory application.
> When working with properties, you can specify which of your Pulsar clusters your tenant is allowed to use. This enables you to also have cluster-level authorization schemes.
## Creating a new tenant
A Pulsar tenant is typically provisioned by Pulsar {% popover instance %} administrators or by some kind of self-service portal.
Properties are managed using the [`pulsar-admin`](reference-pulsar-admin.md) tool. Here's an example property creation command:
```shell
$ bin/pulsar-admin tenants create my-tenant \
--admin-roles my-admin-role \
--allowed-clusters us-west,us-east
```
This command will create a new property `my-tenant` that will be allowed to use the clusters `us-west` and `us-east`.
A client that successfully identified itself as having the role `my-admin-role` would then be allowed to perform all administrative tasks on this tenant.
The structure of topic names in Pulsar reflects the hierarchy between tenants, clusters, and [namespaces](#managing-namespaces):
```http
persistent://tenant/namespace/topic
```
## Managing permissions
Permissions in Pulsar are managed at the namespace level (that is, within tenants and clusters).
### Grant permissions
You can grant permissions to specific roles for lists of operations such as `produce` and `consume`.
#### pulsar-admin
Use the [`grant-permission`](reference-pulsar-admin.md#namespaces-grant-permission) subcommand and specify a namespace, actions using the `--actions` flag, and a role using the `--role` flag:
```shell
$ pulsar-admin namespaces grant-permission test-tenant/ns1 \
--actions produce,consume \
--role admin10
```
Wildcard authorization can be performed when `authorizationAllowWildcardsMatching` is set to `true` in `broker.conf`.
e.g.
```shell
$ pulsar-admin namespaces grant-permission test-tenant/ns1 \
--actions produce,consume \
--role 'my.role.*'
```
Then, roles `my.role.1`, `my.role.2`, `my.role.foo`, `my.role.bar`, etc. can produce and consume.
```shell
$ pulsar-admin namespaces grant-permission test-tenant/ns1 \
--actions produce,consume \
--role '*.role.my'
```
Then, roles `1.role.my`, `2.role.my`, `foo.role.my`, `bar.role.my`, etc. can produce and consume.
**Note**: A wildcard matching works at **the beginning or end of the role name only**.
e.g.
```shell
$ pulsar-admin namespaces grant-permission test-tenant/ns1 \
--actions produce,consume \
--role 'my.*.role'
```
In this case, only the role `my.*.role` has permissions.
Roles `my.1.role`, `my.2.role`, `my.foo.role`, `my.bar.role`, etc. **cannot** produce and consume.
#### REST API
```http
POST /admin/v2/namespaces/:tenant/:namespace/permissions/:role
```
[More info](reference-rest-api.md#/admin/namespaces/:property/:cluster/:namespace/permissions/:role)
#### Java
```java
admin.namespaces().grantPermissionOnNamespace(namespace, role, getAuthActions(actions));
```
### Get permission
You can see which permissions have been granted to which roles in a namespace.
#### pulsar-admin
Use the [`permissions`](reference-pulsar-admin.md#namespaces-permissions) subcommand and specify a namespace:
```shell
$ pulsar-admin namespaces permissions test-tenant/ns1
{
"admin10": [
"produce",
"consume"
]
}
```
#### REST API
```http
GET /admin/v2/namespaces/:tenant/:namespace/permissions
```
[More info](reference-rest-api.md#/admin/namespaces/:property/:cluster/:namespace/permissions)
#### Java
```java
admin.namespaces().getPermissions(namespace);
```
### Revoke permissions
You can revoke permissions from specific roles, which means that those roles will no longer have access to the specified namespace.
#### pulsar-admin
Use the [`revoke-permission`](reference-pulsar-admin.md#revoke-permission) subcommand and specify a namespace and a role using the `--role` flag:
```shell
$ pulsar-admin namespaces revoke-permission test-tenant/ns1 \
--role admin10
```
#### REST API
```http
DELETE /admin/v2/namespaces/:tenant/:namespace/permissions/:role
```
[More info](reference-rest-api.md#/admin/namespaces/:property/:cluster/:namespace/permissions/:role)
#### Java
```java
admin.namespaces().revokePermissionsOnNamespace(namespace, role);
```
## Superusers
In Pulsar you can assign certain roles to be *superusers* of the system. A superuser is allowed to perform all administrative tasks on all properties and namespaces, as well as to publish and subscribe to all topics.
Superusers are configured in the broker configuration file in [`conf/broker.conf`](reference-configuration.md#broker) configuration file, using the [`superUserRoles`](reference-configuration.md#broker-superUserRoles) parameter:
```properties
superUserRoles=my-super-user-1,my-super-user-2
```
> A full listing of parameters available in the `conf/broker.conf` file, as well as the default values for those parameters, can be found in [Broker Configuration]().
Typically, superuser roles are used for administrators and clients but also for broker-to-broker authorization. When using [geo-replication](administration-geo.md), every broker
needs to be able to publish to other clusters' topics.
## Pulsar admin authentication
```java
String authPluginClassName = "com.org.MyAuthPluginClass";
String authParams = "param1:value1";
boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;
ClientConfiguration config = new ClientConfiguration();
config.setAuthentication(authPluginClassName, authParams);
config.setUseTls(useTls);
config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
PulsarAdmin admin = new PulsarAdmin(url, config);
```
To use TLS:
```java
String authPluginClassName = "com.org.MyAuthPluginClass";
String authParams = "param1:value1";
boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;
ClientConfiguration config = new ClientConfiguration();
config.setAuthentication(authPluginClassName, authParams);
config.setUseTls(useTls);
config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
PulsarAdmin admin = new PulsarAdmin(url, config);
```
---
id: administration-dashboard
title: The Pulsar dashboard
sidebar_label: Dashboard
---
The Pulsar dashboard is a web application that enables users to monitor current stats for all {% popover topics %} in tabular form.
The dashboard is a data collector that polls stats from all the brokers in a Pulsar instance (across multiple clusters) and stores all the information in a [PostgreSQL](https://www.postgresql.org/) database.
A [Django](https://www.djangoproject.com) web app is used to render the collected data.
## Install
The easiest way to use the dashboard is to run it inside a [Docker](https://www.docker.com/products/docker) container. A [`Dockerfile`](pulsar:repo_url/dashboard/Dockerfile) to generate the image is provided.
To generate the Docker image:
```shell
$ docker build -t pulsar-dashboard dashboard
```
To run the dashboard:
```shell
$ SERVICE_URL=http://broker.example.com:8080/
$ docker run -p 80:80 \
-e SERVICE_URL=$SERVICE_URL \
apachepulsar/pulsar-dashboard
```
You need to specify only one service URL for a Pulsar cluster. Internally, the collector will figure out all the existing clusters and the brokers from where it needs to pull the metrics. If you're connecting the dashboard to Pulsar running in standalone mode, the URL will be `http://localhost:8080` by default.
Once the Docker container is running, the web dashboard will be accessible via `localhost` or whichever host is being used by Docker.
### Known issues
Pulsar [authentication](administration-auth.md#authentication-providers) is not supported at this point. The dashboard's data collector does not pass any authentication-related data and will be denied access if the Pulsar broker requires authentication.
---
id: administration-geo
title: Pulsar geo-replication
sidebar_label: Geo-replication
---
*Geo-replication* is the replication of persistently stored message data across multiple clusters of a Pulsar instance.
## How it works
The diagram below illustrates the process of geo-replication across Pulsar clusters:
![Replication Diagram](/docs/assets/geo-replication.png)
In this diagram, whenever producers **P1**, **P2**, and **P3** publish messages to the topic **T1** on clusters **Cluster-A**, **Cluster-B**, and **Cluster-C**, respectively, those messages are instantly replicated across clusters. Once replicated, consumers **C1** and **C2** can consume those messages from their respective clusters.
Without geo-replication, consumers **C1** and **C2** wouldn't be able to consume messages published by producer **P3**.
## Geo-replication and Pulsar properties
Geo-replication must be enabled on a per-tenant basis in Pulsar. Geo-replication can be enabled between clusters only when a property has been created that allows access to both clusters.
Although geo-replication must be enabled between two clusters, it's actually managed at the namespace level. You must do the following to enable geo-replication for a namespace:
* [Create a global namespace](#creating-global-namespaces)
* Configure that namespace to replicate between two or more provisioned clusters
Any message published on *any* topic in that namespace will then be replicated to all clusters in the specified set.
## Local persistence and forwarding
When messages are produced on a Pulsar topic, they are first persisted in the local cluster and then forwarded asynchronously to the remote clusters.
In normal cases, when there are no connectivity issues, messages are replicated immediately, at the same time as they are dispatched to local consumers. Typically, end-to-end delivery latency is defined by the network [round-trip time](https://en.wikipedia.org/wiki/Round-trip_delay_time) (RTT) between the remote regions.
Applications can create producers and consumers in any of the clusters, even when the remote clusters are not reachable (like during a network partition).
> #### Subscriptions are local to a cluster
> While producers and consumers can publish to and consume from any cluster in a Pulsar instance, subscriptions are local to the clusters in which they are created and cannot be transferred between clusters. If you do need to transfer a subscription, you’ll need to create a new subscription in the desired cluster.
In the example in the image above, the topic **T1** is being replicated between 3 clusters, **Cluster-A**, **Cluster-B**, and **Cluster-C**.
All messages produced in any cluster will be delivered to all subscriptions in all the other clusters. In this case, consumers **C1** and **C2** will receive all messages published by producers **P1**, **P2**, and **P3**. Ordering is still guaranteed on a per-producer basis.
## Configuring replication
As stated [above](#geo-replication-and-pulsar-properties), geo-replication in Pulsar is managed at the {% popover property %} level.
### Granting permissions to properties
To establish replication to a cluster, the tenant needs permission to use that cluster. This permission can be granted when the property is created or later on.
At creation time, specify all the intended clusters:
```shell
$ bin/pulsar-admin properties create my-property \
--admin-roles my-admin-role \
--allowed-clusters us-west,us-east,us-cent
```
To update permissions of an existing property, use `update` instead of `create`.
### Creating global namespaces
Replication must be used with *global* topics, meaning topics that belong to a global namespace and are thus not tied to any particular cluster.
Global namespaces need to be created in the `global` virtual cluster. For example:
```shell
$ bin/pulsar-admin namespaces create my-tenant/my-namespace
```
Initially, the namespace is not assigned to any cluster. You can assign the namespace to clusters using the `set-clusters` subcommand:
```shell
$ bin/pulsar-admin namespaces set-clusters my-tenant/my-namespace \
--clusters us-west,us-east,us-cent
```
The replication clusters for a namespace can be changed at any time, with no disruption to ongoing traffic. Replication channels will be immediately set up or stopped in all the clusters as soon as the configuration changes.
### Using global topics
Once you've created a global namespace, any topics that producers or consumers create within that namespace will be global. Typically, each application will use the `serviceUrl` for the local cluster.
#### Selective replication
By default, messages are replicated to all clusters configured for the namespace. You can restrict replication selectively by specifying a replication list for a message. That message will then be replicated only to the subset in the replication list.
Below is an example for the [Java API](client-libraries-java.md). Note the use of the `setReplicationClusters` method when constructing the {% javadoc Message client org.apache.pulsar.client.api.Message %} object:
```java
List<String> restrictReplicationTo = Arrays.asList(
"us-west",
"us-east"
);
Producer producer = client.newProducer()
.topic("some-topic")
.create();
producer.newMessage()
.value("my-payload".getBytes())
.setReplicationClusters(restrictReplicationTo)
.send();
```
#### Topic stats
Topic-specific statistics for global topics are available via the [`pulsar-admin`](reference-pulsar-admin.md) tool and [REST API](reference-rest-api.md):
```shell
$ bin/pulsar-admin persistent stats persistent://my-tenant/my-namespace/my-topic
```
Each cluster reports its own local stats, including incoming and outgoing replication rates and backlogs.
#### Deleting a global topic
Given that global topics exist in multiple regions, it's not possible to directly delete a global topic. Instead, you should rely on automatic topic garbage collection.
In Pulsar, a topic is automatically deleted when it's no longer used, that is to say, when no producers or consumers are connected *and* there are no subscriptions. For global topics, each region will use a fault-tolerant mechanism to decide when it's safe to delete the topic locally.
To delete a global topic, close all producers and consumers on the topic and delete all its local subscriptions in every replication cluster. When Pulsar determines that no valid subscription for the topic remains across the system, it will garbage collect the topic.
---
id: administration-load-distribution
title: Pulsar load distribution
sidebar_label: Load distribution
---
## Load distribution across Pulsar brokers
Pulsar is an horizontally scalable messaging system, so it is a core requirement that the traffic
in a logical cluster must be spread across all the available Pulsar brokers, as evenly as possible.
In most cases, this is true out of the box and one shouldn't worry about it. There are, though,
multiple settings and tools to control the traffic distribution and they require a bit of
context to understand how the traffic is managed in Pulsar.
## Pulsar load manager architecture
### Dynamic assignment of topics to brokers
Topics are dynamically assigned to brokers based on the load conditions of all brokers in the
cluster.
When a clients starts using new topics that are not assigned to any broker, it will trigger a
process that, given the load conditions, it will choose the best suited broker to acquire ownership
of such topic.
In case of partitioned topics, different partitions might be assigned to different brokers. We talk
about "topic" in this context to mean either a non-partitioned topic or one partition of a topic.
The assignment is "dynamic" because it can change very quickly. For example, if the broker owning
the topic crashes, the topic will be reassigned immediately to another broker. Another scenario is
that the broker owning the topic becomes overloaded. In this case too, the topic will be
reassigned to a less loaded broker.
The dynamic assignment is made possible by the stateless nature of brokers. This also ensure that
we can quickly expand or shrink the cluster based on usage.
### Assignment granularity
The assignment of topics/partitions to brokers is not done at the individual level. The reason for
it is to amortize the amount of information that we need to keep track (eg. which topics are
assigned to a particular broker, what's the load on topics for a broker and similar).
Instead of individual topic/partition assignment, each broker takes ownership of a subset of the
topics for a namespace. This subset is called a "*bundle*" and effectively it's a sharding
mechanism.
The namespace is the "administrative" unit: many config knobs or operations are done at the
namespace level.
For assignment, a namespaces is sharded into a list of "bundles", with each bundle comprising
a portion of overall hash range of the namespace.
Topics are assigned to a particular bundle by taking the hash of the topic name and seeing in which
bundle the hash falls into.
Each bundle is independent of the others and thus is independently assigned to different brokers.
### Creating namespaces and bundles
When creating a new namespace, it will set to use the default number of bundles. This is set in
`conf/broker.conf`:
```properties
# When a namespace is created without specifying the number of bundle, this
# value will be used as the default
defaultNumberOfNamespaceBundles=4
```
One can either change the system default, or override it when creating a new namespace:
```shell
$ bin/pulsar-admin namespaces create my-tenant/my-namespace --clusters us-west --bundles 16
```
With this command, we're creating a namespace with 16 initial bundles. Therefore the topics for
this namespaces can immediately be spread across up to 16 brokers.
In general, if the expected traffic and number of topics is known in advance, it's a good idea to
start with a reasonable number of bundles instead of waiting for the system to auto-correct the
distribution.
On a same note, it is normally beneficial to start with more bundles than number of brokers,
primarily because of the hashing nature of the distribution of topics into bundles. For example,
for a namespace with 1000 topics, using something like 64 bundles will achieve a good distribution
of traffic across 16 brokers.
### Unloading topics and bundles
In Pulsar there is an admin operation of "unloading" a topic. Unloading means to close the topics,
release ownership and reassign the topics to a new broker, based on current load.
When unload happens, the client will experience a small latency blip, typically in the order of
tens of milliseconds, while the topic is reassigned.
Unloading is the mechanism used by the load-manager to perform the load shedding, but it can
also be triggered manually, for example to correct the assignments and redistribute traffic
even before having any broker overloaded.
Unloading a topic has no effect on the assignment, but it will just close and reopen the
particular topic:
```shell
pulsar-admin topics unload persistent://tenant/namespace/topic
```
To unload all topics for a namespace and trigger reassignments:
```shell
pulsar-admin namespaces unload tenant/namespace
```
### Namespace bundles splitting
Since the load for the topics in a bundle might change over time, or could just be hard to predict
upfront, bundles can be split in 2 by brokers. The new smaller bundles can then be reassigned
to different brokers.
The splitting happens based on some tunable thresholds. Any existing bundle that exceeds any
of the threshold is a candidate to be split. By default the newly split bundles are also
immediately offloaded to other brokers, to facilitate the traffic distribution.
```properties
# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=true
# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=true
# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000
# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxSessions=1000
# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxMsgRate=30000
# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxBandwidthMbytes=100
# maximum number of bundles in a namespace (for auto-split)
loadBalancerNamespaceMaximumBundles=128
```
### Automatic load shedding
In Pulsar's load manager there is support for automatic load shedding. This means that whenever
the system recognized a particular broker is overloaded, it will force some traffic to be
reassigned to less loaded brokers.
When a broker is identifies as overloaded, it will force to "unload" a subset of the bundles, the
ones with higher traffic, that make up for the overload percentage.
For example, the default threshold is 85% and if a broker is over quota at 95% CPU usage, then
it will unload the percent difference plus a 5% margin: `(95% - 85%) + 5% = 15%`.
Given the selection of bundles to offload is based on traffic (as a proxy measure for cpu, network
and memory), broker will unload bundles for at least 15% of traffic.
The automatic load shedding is enabled by default and can be disabled with this setting:
```properties
# Enable/disable automatic bundle unloading for load-shedding
loadBalancerSheddingEnabled=true
```
There are additional settings that apply to shedding:
```properties
# Load shedding interval. Broker periodically checks whether some traffic should be offload from
# some over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=1
# Prevent the same topics to be shed and moved to other brokers more that once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30
```
#### Broker overload thresholds
The determinations of when a broker is overloaded is based on threshold of CPU, network and
memory usage. Whenever either of those metrics reaches the threshold, it will trigger the shedding
(if enabled).
By default, overload threshold is set at 85%:
```properties
# Usage threshold to determine a broker as over-loaded
loadBalancerBrokerOverloadedThresholdPercentage=85
```
The usage stats are gathered by Pulsar from the system metrics.
In case of network utilization, in some cases the network interface speed reported by Linux is
not correct and needs to be manually overridden. This is the case in AWS EC2 instances with 1Gbps
NIC speed for which the OS report 10Gbps speed.
Because of the incorrect max speed, the Pulsar load manager might think the broker has not
reached the NIC capacity, while in fact it's already using all the bandwidth and the traffic is
being slowed down.
There is a setting to correct the max NIC speed:
```properties
# Override the auto-detection of the network interfaces max speed.
# This option is useful in some environments (eg: EC2 VMs) where the max speed
# reported by Linux is not reflecting the real bandwidth available to the broker.
# Since the network usage is employed by the load manager to decide when a broker
# is overloaded, it is important to make sure the info is correct or override it
# with the right value here. The configured value can be a double (eg: 0.8) and that
# can be used to trigger load-shedding even before hitting on NIC limits.
loadBalancerOverrideBrokerNicSpeedGbps=
```
When the value is empty, Pulsar will use the value reported by the OS.
---
id: administration-proxy
title: The Pulsar proxy
sidebar_label: Pulsar proxy
---
The [Pulsar proxy](getting-started-concepts-and-architecture.md#pulsar-proxy) is an optional gateway that you can run over the brokers in a Pulsar cluster. We recommend running a Pulsar proxy in cases when direction connections between clients and Pulsar brokers are either infeasible, undesirable, or both, for example when running Pulsar in a cloud environment or on [Kubernetes](https://kubernetes.io) or an analogous platform.
## Running the proxy
In order to run the Pulsar proxy, you need to have both a local [ZooKeeper](https://zookeeper.apache.org) and configuration store quorum set up for use by your Pulsar cluster. For instructions, see [this document](deploy-bare-metal.md). Once you have ZooKeeper set up and have connection strings for both ZooKeeper quorums, you can use the [`proxy`](reference-cli-tools.md#pulsar-proxy) command of the [`pulsar`](reference-cli-tools.md#pulsar) CLI tool to start up the proxy (preferably on its own machine or in its own VM):
To start the proxy:
```bash
$ cd /path/to/pulsar/directory
$ bin/pulsar proxy \
--zookeeper-servers zk-0,zk-1,zk-2 \
--global-zookeeper-servers zk-0,zk-1,zk-2
```
> You can run as many instances of the Pulsar proxy in a cluster as you would like.
## Stopping the proxy
The Pulsar proxy runs by default in the foreground. To stop the proxy, simply stop the process in which it's running.
## Proxy frontends
We recommend running the Pulsar proxy behind some kind of load-distributing frontend, such as an [HAProxy](https://www.digitalocean.com/community/tutorials/an-introduction-to-haproxy-and-load-balancing-concepts) load balancer.
## Using Pulsar clients with the proxy
Once your Pulsar proxy is up and running, preferably behind a load-distributing [frontend](#proxy-frontends), clients can connect to the proxy via whichever address is used by the frontend. If the address were the DNS address `pulsar.cluster.default`, for example, then the connection URL for clients would be `pulsar://pulsar.cluster.default:6650`.
## Proxy configuration
The Pulsar proxy can be configured using the [`proxy.conf`](reference-configuration.md#proxy) configuration file. The following parameters are available in that file:
|Name|Description|Default|
|---|---|---|
|zookeeperServers| The ZooKeeper quorum connection string (as a comma-separated list) ||
|configurationStoreServers| Configuration store connection string (as a comma-separated list) ||
|zookeeperSessionTimeoutMs| ZooKeeper session timeout (in milliseconds) |30000|
|servicePort| The port to use for server binary Protobuf requests |6650|
|servicePortTls| The port to use to server binary Protobuf TLS requests |6651|
|statusFilePath | Path for the file used to determine the rotation status for the proxy instance when responding to service discovery health checks ||
|authenticationEnabled| Whether authentication is enabled for the Pulsar proxy |false|
|authenticationProviders| Authentication provider name list (a comma-separated list of class names) ||
|authorizationEnabled| Whether authorization is enforced by the Pulsar proxy |false|
|authorizationProvider| Authorization provider as a fully qualified class name |org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider|
|brokerClientAuthenticationPlugin| The authentication plugin used by the Pulsar proxy to authenticate with Pulsar brokers ||
|brokerClientAuthenticationParameters| The authentication parameters used by the Pulsar proxy to authenticate with Pulsar brokers ||
|brokerClientTrustCertsFilePath| The path to trusted certificates used by the Pulsar proxy to authenticate with Pulsar brokers ||
|superUserRoles| Role names that are treated as “super-users,” meaning that they will be able to perform all admin ||
|forwardAuthorizationCredentials| Whether client authorization credentials are forwared to the broker for re-authorization. Authentication must be enabled via authenticationEnabled=true for this to take effect. |false|
|maxConcurrentInboundConnections| Max concurrent inbound connections. The proxy will reject requests beyond that. |10000|
|maxConcurrentLookupRequests| Max concurrent outbound connections. The proxy will error out requests beyond that. |10000|
|tlsEnabledInProxy| Whether TLS is enabled for the proxy |false|
|tlsEnabledWithBroker| Whether TLS is enabled when communicating with Pulsar brokers |false|
|tlsCertificateFilePath| Path for the TLS certificate file ||
|tlsKeyFilePath| Path for the TLS private key file ||
|tlsTrustCertsFilePath| Path for the trusted TLS certificate pem file ||
|tlsHostnameVerificationEnabled| Whether the hostname is validated when the proxy creates a TLS connection with brokers |false|
|tlsRequireTrustedClientCertOnConnect| Whether client certificates are required for TLS. Connections are rejected if the client certificate isn’t trusted. |false|
---
id: administration-stats
title: Pulsar stats
sidebar_label: Pulsar statistics
---
## Partitioned topics
|Stat|Description|
|---|---|
|msgRateIn| The sum of all local and replication publishers’ publish rates in messages per second|
|msgThroughputIn| Same as msgRateIn but in bytes per second instead of messages per second|
|msgRateOut| The sum of all local and replication consumers’ dispatch rates in messages per second|
|msgThroughputOut| Same as msgRateOut but in bytes per second instead of messages per second|
|averageMsgSize| Average message size, in bytes, from this publisher within the last interval|
|storageSize| The sum of the ledgers’ storage size for this topic|
|publishers| The list of all local publishers into the topic. There can be anywhere from zero to thousands.|
|producerId| Internal identifier for this producer on this topic|
|producerName| Internal identifier for this producer, generated by the client library|
|address| IP address and source port for the connection of this producer|
|connectedSince| Timestamp this producer was created or last reconnected|
|subscriptions| The list of all local subscriptions to the topic|
|my-subscription| The name of this subscription (client defined)|
|msgBacklog| The count of messages in backlog for this subscription|
|type| This subscription type|
|msgRateExpired| The rate at which messages were discarded instead of dispatched from this subscription due to TTL|
|consumers| The list of connected consumers for this subscription|
|consumerName| Internal identifier for this consumer, generated by the client library|
|availablePermits| The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() isn’t being called. A nonzero value means this consumer is ready to be dispatched messages.|
|replication| This section gives the stats for cross-colo replication of this topic|
|replicationBacklog| The outbound replication backlog in messages|
|connected| Whether the outbound replicator is connected|
|replicationDelayInSeconds| How long the oldest message has been waiting to be sent through the connection, if connected is true|
|inboundConnection| The IP and port of the broker in the remote cluster’s publisher connection to this broker|
|inboundConnectedSince| The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.|
## Topics
|Stat|Description|
|---|---|
|entriesAddedCounter| Messages published since this broker loaded this topic|
|numberOfEntries| Total number of messages being tracked|
|totalSize| Total storage size in bytes of all messages|
|currentLedgerEntries| Count of messages written to the ledger currently open for writing|
|currentLedgerSize| Size in bytes of messages written to ledger currently open for writing|
|lastLedgerCreatedTimestamp| Time when last ledger was created|
|lastLedgerCreationFailureTimestamp| time when last ledger was failed|
|waitingCursorsCount| How many cursors are caught up and waiting for a new message to be published|
|pendingAddEntriesCount| How many messages have (asynchronous) write requests we are waiting on completion|
|lastConfirmedEntry| The ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet.|
|state| The state of the cursor ledger. Open means we have a cursor ledger for saving updates of the markDeletePosition.|
|ledgers| The ordered list of all ledgers for this topic holding its messages|
|cursors| The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.|
|markDeletePosition| The ack position: the last message the subscriber acknowledged receiving|
|readPosition| The latest position of subscriber for reading message|
|waitingReadOp| This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.|
|pendingReadOps| The counter for how many outstanding read requests to the BookKeepers we have in progress|
|messagesConsumedCounter| Number of messages this cursor has acked since this broker loaded this topic|
|cursorLedger| The ledger being used to persistently store the current markDeletePosition|
|cursorLedgerLastEntry| The last entryid used to persistently store the current markDeletePosition|
|individuallyDeletedMessages| If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position|
|lastLedgerSwitchTimestamp| The last time the cursor ledger was rolled over|
---
id: administration-zk-bk
title: ZooKeeper and BookKeeper administration
sidebar_label: ZooKeeper and BookKeeper
---
Pulsar relies on two external systems for essential tasks:
* [ZooKeeper](https://zookeeper.apache.org/) is responsible for a wide variety of configuration- and coordination-related tasks.
* [BookKeeper](http://bookkeeper.apache.org/) is responsible for [persistent storage](getting-started-concepts-and-architecture.md#persistent-storage) of message data.
ZooKeeper and BookKeeper are both open-source [Apache](https://www.apache.org/) projects.
> Skip to the [How Pulsar uses ZooKeeper and BookKeeper](#how-pulsar-uses-zookeeper-and-bookkeeper) section below for a more schematic explanation of the role of these two systems in Pulsar.
' %}
## ZooKeeper
Each Pulsar instance relies on two separate ZooKeeper quorums.
* [Local ZooKeeper](#deploying-local-zookeeper) operates at the cluster level and provides cluster-specific configuration management and coordination. Each Pulsar cluster needs to have a dedicated ZooKeeper cluster.
* [Global ZooKeeper](#deploying-global-zookeeper) operates at the instance level and provides configuration management for the entire system (and thus across clusters). The global ZooKeeper quorum can be provided by an independent cluster of machines or by the same machines used by local ZooKeeper.
### Deploying local ZooKeeper
ZooKeeper manages a variety of essential coordination- and configuration-related tasks for Pulsar.
Deploying a Pulsar instance requires you to stand up one local ZooKeeper cluster *per Pulsar cluster*.
To begin, add all ZooKeeper servers to the quorum configuration specified in the [`conf/zookeeper.conf`](reference-configuration.md#zookeeper) file. Add a `server.N` line for each node in the cluster to the configuration, where `N` is the number of the ZooKeeper node. Here's an example for a three-node cluster:
```properties
server.1=zk1.us-west.example.com:2888:3888
server.2=zk2.us-west.example.com:2888:3888
server.3=zk3.us-west.example.com:2888:3888
```
On each host, you need to specify the ID of the node in each node's `myid` file, which is in each server's `data/zookeeper` folder by default (this can be changed via the [`dataDir`](reference-configuration.md#zookeeper-dataDir) parameter).
> See the [Multi-server setup guide](https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup) in the ZooKeeper documentation for detailed info on `myid` and more.
On a ZooKeeper server at `zk1.us-west.example.com`, for example, you could set the `myid` value like this:
```shell
$ mkdir -p data/zookeeper
$ echo 1 > data/zookeeper/myid
```
On `zk2.us-west.example.com` the command would be `echo 2 > data/zookeeper/myid` and so on.
Once each server has been added to the `zookeeper.conf` configuration and has the appropriate `myid` entry, you can start ZooKeeper on all hosts (in the background, using nohup) with the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) CLI tool:
```shell
$ bin/pulsar-daemon start zookeeper
```
### Deploying the configuration store {#configuration-store}
The ZooKeeper cluster configured and started up in the section above is a *local* ZooKeeper cluster used to manage a single Pulsar cluster. In addition to a local cluster, however, a full Pulsar instance also requires a configuration store for handling some instance-level configuration and coordination tasks.
If you're deploying a [single-cluster](#single-cluster-pulsar-instance) instance, then you will not need a separate cluster for the configuration store. If, however, you're deploying a [multi-cluster](#multi-cluster-pulsar-instance) instance, then you should stand up a separate ZooKeeper cluster for configuration tasks.
#### Single-cluster Pulsar instance
If your Pulsar instance will consist of just one cluster, then you can deploy a configuration store on the same machines as the local ZooKeeper quorum but running on different TCP ports.
To deploy a ZooKeeper configuration store in a single-cluster instance, add the same ZooKeeper servers used by the local quorom to the configuration file in [`conf/global_zookeeper.conf`](reference-configuration.md#configuration-store) using the same method for [local ZooKeeper](#local-zookeeper), but make sure to use a different port (2181 is the default for ZooKeeper). Here's an example that uses port 2184 for a three-node ZooKeeper cluster:
```properties
clientPort=2184
server.1=zk1.us-west.example.com:2185:2186
server.2=zk2.us-west.example.com:2185:2186
server.3=zk3.us-west.example.com:2185:2186
```
As before, create the `myid` files for each server on `data/global-zookeeper/myid`.
#### Multi-cluster Pulsar instance
When deploying a global Pulsar instance, with clusters distributed across different geographical regions, the global ZooKeeper serves as a highly available and strongly consistent metadata store that can tolerate failures and partitions spanning whole regions.
The key here is to make sure the ZK quorum members are spread across at least 3
regions and that other regions are running as observers.
Again, given the very low expected load on the global ZooKeeper servers, we can
share the same hosts used for the local ZooKeeper quorum.
For example, let's assume a Pulsar instance with the following clusters `us-west`,
`us-east`, `us-central`, `eu-central`, `ap-south`. Also let's assume, each cluster
will have its own local ZK servers named such as
```
zk[1-3].${CLUSTER}.example.com
```
In this scenario we want to pick the quorum participants from few clusters and
let all the others be ZK observers. For example, to form a 7 servers quorum, we
can pick 3 servers from `us-west`, 2 from `us-central` and 2 from `us-east`.
This will guarantee that writes to global ZooKeeper will be possible even if one
of these regions is unreachable.
The ZK configuration in all the servers will look like:
```properties
clientPort=2184
server.1=zk1.us-west.example.com:2185:2186
server.2=zk2.us-west.example.com:2185:2186
server.3=zk3.us-west.example.com:2185:2186
server.4=zk1.us-central.example.com:2185:2186
server.5=zk2.us-central.example.com:2185:2186
server.6=zk3.us-central.example.com:2185:2186:observer
server.7=zk1.us-east.example.com:2185:2186
server.8=zk2.us-east.example.com:2185:2186
server.9=zk3.us-east.example.com:2185:2186:observer
server.10=zk1.eu-central.example.com:2185:2186:observer
server.11=zk2.eu-central.example.com:2185:2186:observer
server.12=zk3.eu-central.example.com:2185:2186:observer
server.13=zk1.ap-south.example.com:2185:2186:observer
server.14=zk2.ap-south.example.com:2185:2186:observer
server.15=zk3.ap-south.example.com:2185:2186:observer
```
Additionally, ZK observers will need to have:
```properties
peerType=observer
```
##### Starting the service
Once your global ZooKeeper configuration is in place, you can start up the service using [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon)
```shell
$ bin/pulsar-daemon start global-zookeeper
```
### ZooKeeper configuration
In Pulsar, ZooKeeper configuration is handled by two separate configuration files found in the `conf` directory of your Pulsar installation: `conf/zookeeper.conf` for [local ZooKeeper](#local-zookeeper) and `conf/global-zookeeper.conf` for [global ZooKeeper](#global-zookeeper).
#### Local ZooKeeper
Configuration for local ZooKeeper is handled by the [`conf/zookeeper.conf`](reference-configuration.md#zookeeper) file. The table below shows the available parameters:
{% include config.html id="zookeeper" %}
|Name|Description|Default|
|---|---|---|
|tickTime| The tick is the basic unit of time in ZooKeeper, measured in milliseconds and used to regulate things like heartbeats and timeouts. tickTime is the length of a single tick. |2000|
|initLimit| The maximum time, in ticks, that the leader ZooKeeper server allows follower ZooKeeper servers to successfully connect and sync. The tick time is set in milliseconds using the tickTime parameter. |10|
|syncLimit| The maximum time, in ticks, that a follower ZooKeeper server is allowed to sync with other ZooKeeper servers. The tick time is set in milliseconds using the tickTime parameter. |5|
|dataDir| The location where ZooKeeper will store in-memory database snapshots as well as the transaction log of updates to the database. |data/zookeeper|
|clientPort| The port on which the ZooKeeper server will listen for connections. |2181|
|autopurge.snapRetainCount| In ZooKeeper, auto purge determines how many recent snapshots of the database stored in dataDir to retain within the time interval specified by autopurge.purgeInterval (while deleting the rest). |3|
|autopurge.purgeInterval| The time interval, in hours, by which the ZooKeeper database purge task is triggered. Setting to a non-zero number will enable auto purge; setting to 0 will disable. Read this guide before enabling auto purge. |1|
|maxClientCnxns| The maximum number of client connections. Increase this if you need to handle more ZooKeeper clients. |60|
#### Global ZooKeeper
Configuration for global ZooKeeper is handled by the [`conf/global-zookeeper.conf`](reference-configuration.md#global-zookeeper) file. The table below shows the available parameters:
## BookKeeper
BookKeeper is responsible for all durable message storage in Pulsar. BookKeeper is a distributed [write-ahead log](https://en.wikipedia.org/wiki/Write-ahead_logging) WAL system that guarantees read consistency of independent message logs called ledgers. Individual BookKeeper servers are also called *bookies*.
> For a guide to managing message persistence, retention, and expiry in Pulsar, see [this cookbook](cookbooks-retention-expiry.md).
### Deploying BookKeeper
{% include explanations/deploying-bk.md %}
BookKeeper provides [persistent message storage](getting-started-concepts-and-architecture.md#persistent-storage) for Pulsar.
Each Pulsar broker needs to have its own cluster of bookies. The BookKeeper cluster shares a local ZooKeeper quorum with the Pulsar cluster.
### Configuring bookies
BookKeeper bookies can be configured using the [`conf/bookkeeper.conf`](reference-configuration.md#bookkeeper) configuration file. The most important aspect of configuring each bookie is ensuring that the [`zkServers`](reference-configuration.md#bookkeeper-zkServers) parameter is set to the connection string for the Pulsar cluster's local ZooKeeper.
### Starting up bookies
You can start up a bookie in two ways: in the foreground or as a background daemon.
To start up a bookie in the foreground, use the [`bookeeper`](reference-cli-tools.md#bookkeeper)
```shell
$ bin/pulsar-daemon start bookie
```
You can verify that the bookie is working properly using the `bookiesanity` command for the [BookKeeper shell](reference-cli-tools.md#bookkeeper-shell):
```shell
$ bin/bookkeeper shell bookiesanity
```
This will create a new ledger on the local bookie, write a few entries, read them back and finally delete the ledger.
### Hardware considerations
Bookie hosts are responsible for storing message data on disk. In order for bookies to provide optimal performance, it's essential that they have a suitable hardware configuration. There are two key dimensions to bookie hardware capacity:
* Disk I/O capacity read/write
* Storage capacity
Message entries written to bookies are always synced to disk before returning an acknowledgement to the Pulsar broker. To ensure low write latency, BookKeeper is
designed to use multiple devices:
* A **journal** to ensure durability. For sequential writes, it's critical to have fast [fsync](https://linux.die.net/man/2/fsync) operations on bookie hosts. Typically, small and fast [solid-state drives](https://en.wikipedia.org/wiki/Solid-state_drive) (SSDs) should suffice, or [hard disk drives](https://en.wikipedia.org/wiki/Hard_disk_drive) (HDDs) with a [RAID](https://en.wikipedia.org/wiki/RAID)s controller and a battery-backed write cache. Both solutions can reach fsync latency of ~0.4 ms.
* A **ledger storage device** is where data is stored until all consumers have acknowledged the message. Writes will happen in the background, so write I/O is not a big concern. Reads will happen sequentially most of the time and the backlog is drained only in case of consumer drain. To store large amounts of data, a typical configuration will involve multiple HDDs with a RAID controller.
### Configuring BookKeeper
Configurable parameters for BookKeeper bookies can be found in the [`conf/bookkeeper.conf`](reference-configuration.md#bookkeeper) file.
Minimum configuration changes required in `conf/bookkeeper.conf` are:
```properties
# Change to point to journal disk mount point
journalDirectory=data/bookkeeper/journal
# Point to ledger storage disk mount point
ledgerDirectories=data/bookkeeper/ledgers
# Point to local ZK quorum
zkServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
# Change the ledger manager type
ledgerManagerType=hierarchical
```
> Consult the official [BookKeeper docs](http://bookkeeper.apache.org) for more information about BookKeeper.
## BookKeeper persistence policies
In Pulsar, you can set *persistence policies*, at the namespace level, that determine how BookKeeper handles persistent storage of messages. Policies determine four things:
* The number of acks (guaranteed copies) to wait for each ledger entry
* The number of bookies to use for a topic
* How many writes to make for each ledger entry
* The throttling rate for mark-delete operations
### Set persistence policies
You can set persistence policies for BookKeeper at the {% popover namespace %} level.
#### pulsar-admin
Use the [`set-persistence`](reference-pulsar-admin.md#namespaces-set-persistence) subcommand and specify a namespace as well as any policies that you want to apply. The available flags are:
Flag | Description | Default
:----|:------------|:-------
`-a`, `--bookkeeper-ack-quorom` | The number of acks (guaranteed copies) to wait on for each entry | 0
`-e`, `--bookkeeper-ensemble` | The number of {% popover bookies %} to use for topics in the namespace | 0
`-w`, `--bookkeeper-write-quorum` | How many writes to make for each entry | 0
`-r`, `--ml-mark-delete-max-rate` | Throttling rate for mark-delete operations (0 means no throttle) | 0
##### Example
```shell
$ pulsar-admin namespaces set-persistence my-tenant/my-ns \
--bookkeeper-ack-quorom 3 \
--bookeeper-ensemble 2
```
#### REST API
```http
POST /admin/v2/namespaces/:tenant/:namespace/persistence
```
[More info](reference-rest-api.md#/admin/namespaces/:property/:cluster/:namespace/persistence)
#### Java
```java
int bkEnsemble = 2;
int bkQuorum = 3;
int bkAckQuorum = 2;
double markDeleteRate = 0.7;
PersistencePolicies policies =
new PersistencePolicies(ensemble, quorum, ackQuorum, markDeleteRate);
admin.namespaces().setPersistence(namespace, policies);
```
### List persistence policies
You can see which persistence policy currently applies to a namespace.
#### pulsar-admin
Use the [`get-persistence`](reference-pulsar-admin.md#namespaces-get-persistence) subcommand and specify the namespace.
##### Example
```shell
$ pulsar-admin namespaces get-persistence my-tenant/my-ns
{
"bookkeeperEnsemble": 1,
"bookkeeperWriteQuorum": 1,
"bookkeeperAckQuorum", 1,
"managedLedgerMaxMarkDeleteRate": 0
}
```
#### REST API
```http
GET /admin/v2/namespaces/:tenant/:namespace/persistence
```
[More info](reference-rest-api.md#/admin/namespaces/:property/:cluster/:namespace/persistence)
#### Java
```java
PersistencePolicies policies = admin.namespaces().getPersistence(namespace);
```
## How Pulsar uses ZooKeeper and BookKeeper
This diagram illustrates the role of ZooKeeper and BookKeeper in a Pulsar cluster:
![ZooKeeper and BookKeeper](/docs/assets/pulsar-system-architecture.png)
Each Pulsar cluster consists of one or more message brokers. Each broker relies on an ensemble of bookies.
---
id: client-libraries-cpp
title: The Pulsar C++ client
sidebar_label: C++
---
## Supported platforms
The Pulsar C++ client has been successfully tested on **MacOS** and **Linux**.
## Linux
There are recipes that build RPM and Debian packages containing a
statically linked `libpulsar.so` / `libpulsar.a` with all the required
dependencies.
To build the C++ library packages, first build the Java packages:
```shell
mvn install -DskipTests
```
#### RPM
```shell
pulsar-client-cpp/pkg/rpm/docker-build-rpm.sh
```
This will build the RPM inside a Docker container and it will leave the RPMs
in `pulsar-client-cpp/pkg/rpm/RPMS/x86_64/`.
| Package name | Content |
|-----|-----|
| pulsar-client | Shared library `libpulsar.so` |
| pulsar-client-devel | Static library `libpulsar.a` and C++ and C headers |
| pulsar-client-debuginfo | Debug symbols for `libpulsar.so` |
#### Deb
To build Debian packages:
```shell
pulsar-client-cpp/pkg/deb/docker-build-deb.sh
```
Debian packages will be created at `pulsar-client-cpp/pkg/deb/BUILD/DEB/`
| Package name | Content |
|-----|-----|
| pulsar-client | Shared library `libpulsar.so` |
| pulsar-client-dev | Static library `libpulsar.a` and C++ and C headers |
## MacOS
Use the [Homebrew](https://brew.sh/) supplied recipe to build the Pulsar
client lib on MacOS.
```shell
brew install https://raw.githubusercontent.com/apache/incubator-pulsar/master/pulsar-client-cpp/homebrew/libpulsar.rb
```
If using Python 3 on MacOS, add the flag `--with-python3` to the above command.
This will install the package with the library and headers.
## Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here’s an example for localhost:
```http
pulsar://localhost:6650
```
A URL for a production Pulsar cluster may look something like this:
```http
pulsar://pulsar.us-west.example.com:6650
```
If you’re using TLS authentication, the URL will look like something like this:
```http
pulsar+ssl://pulsar.us-west.example.com:6651
```
## Consumer
```c++
Client client("pulsar://localhost:6650");
Consumer consumer;
Result result = client.subscribe("my-topic", "my-subscribtion-name", consumer);
if (result != ResultOk) {
LOG_ERROR("Failed to subscribe: " << result);
return -1;
}
Message msg;
while (true) {
consumer.receive(msg);
LOG_INFO("Received: " << msg
<< " with payload '" << msg.getDataAsString() << "'");
consumer.acknowledge(msg);
}
client.close();
```
## Producer
```c++
Client client("pulsar://localhost:6650");
Producer producer;
Result result = client.createProducer("my-topic", producer);
if (result != ResultOk) {
LOG_ERROR("Error creating producer: " << result);
return -1;
}
// Publish 10 messages to the topic
for (int i = 0; i < 10; i++){
Message msg = MessageBuilder().setContent("my-message").build();
Result res = producer.send(msg);
LOG_INFO("Message sent: " << res);
}
client.close();
```
## Authentication
```cpp
ClientConfiguration config = ClientConfiguration();
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
config.setTlsAllowInsecureConnection(false);
config.setAuth(pulsar::AuthTls::create(
"/path/to/client-cert.pem", "/path/to/client-key.pem"););
Client client("pulsar+ssl://my-broker.com:6651", config);
```
此差异已折叠。
---
id: client-libraries-java
title: The Pulsar Java client
sidebar_label: Java
---
The Pulsar Java client can be used both to create Java producers, consumers, and [readers](#readers) of messages and to perform [administrative tasks](admin-api-overview.md). The current version of the Java client is **pulsar:version**.
Javadoc for the Pulsar client is divided up into two domains, by package:
Package | Description | Maven Artifact
:-------|:------------|:--------------
[`org.apache.pulsar.client.api`](/api/client) | The producer and consumer API | [org.apache.pulsar:pulsar-client:pulsar:version](http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7Cpulsar:version%7Cjar)
[`org.apache.pulsar.client.admin`](/api/admin) | The Java [admin API](admin-api-overview.md) | [org.apache.pulsar:pulsar-client-admin:pulsar:version](http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client-admin%7Cpulsar:version%7Cjar)
This document will focus only on the client API for producing and consuming messages on Pulsar topics. For a guide to using the Java admin client, see [The Pulsar admin interface](admin-api-overview.md).
## Installation
The latest version of the Pulsar Java client library is available via [Maven Central](http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7Cpulsar:version%7Cjar). To use the latest version, add the `pulsar-client` library to your build configuration.
### Maven
If you're using Maven, add this to your `pom.xml`:
```xml
<!-- in your <properties> block -->
<pulsar.version>pulsar:version</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
```
### Gradle
If you're using Gradle, add this to your `build.gradle` file:
```groovy
def pulsarVersion = 'pulsar:version'
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
}
```
## Connection URLs
To connect to Pulsar using client libraries, you need to specify a [Pulsar protocol](developing-binary-protocol.md) URL.
Pulsar protocol URLs are assigned to specific clusters, use the `pulsar` scheme and have a default port of 6650. Here's an example for `localhost`:
```http
pulsar://localhost:6650
```
A URL for a production Pulsar cluster may look something like this:
```http
pulsar://pulsar.us-west.example.com:6650
```
If you're using [TLS](administration-auth.md#tls-client-auth) authentication, the URL will look like something like this:
```http
pulsar+ssl://pulsar.us-west.example.com:6651
```
## Client configuration
You can instantiate a {% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %} object using just a URL for the target Pulsar {% popover cluster %}, like this:
```java
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
```
> #### Default broker URLs for standalone clusters
> If you're running a cluster in [standalone mode](getting-started-standalone.md), the broker will be available at the `pulsar://localhost:6650` URL by default.
Check out the Javadoc for the {% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %} class for a full listing of configurable parameters.
> In addition to client-level configuration, you can also apply [producer](#configuring-producers) and [consumer](#configuring-consumers) specific configuration, as you'll see in the sections below.
## Producers
In Pulsar, producers write messages to topics. Once you've instantiated a {% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %} object (as in the section [above](#client-configuration)), you can create a {% javadoc Producer client org.apache.pulsar.client.api.Producer %} for a specific Pulsar {% popover topic %}.
```java
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());
```
By default, producers produce messages that consist of byte arrays. You can produce different types, however, by specifying a message [schema](#schemas).
```java
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message");
```
> You should always make sure to close your producers, consumers, and clients when they are no longer needed:
> ```java
> producer.close();
> consumer.close();
> client.close();
> ```
>
> Close operations can also be asynchronous:
> ```java
> producer.closeAsync()
> .thenRun(() -> System.out.println("Producer closed"));
> .exceptionally((ex) -> {
> System.err.println("Failed to close producer: " + ex);
> return ex;
> });
> ```
' %}
### Configuring producers
If you instantiate a `Producer` object specifying only a topic name, as in the example above, the producer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set. For a full listing, see the Javadoc for the {% javadoc ProducerBuilder client org.apache.pulsar.client.api.ProducerBuilder %} class. Here's an example:
```java
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
```
### Message routing
When using partitioned topics, you can specify the routing mode whenever you publish messages using a producer. For more on specifying a routing mode using the Java client, see the [Partitioned Topics](cookbooks-partitioned.md) cookbook.
### Async send
You can also publish messages [asynchronously](getting-started-concepts-and-architecture.md#send-modes) using the Java client. With async send, the producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full (max size configurable), the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer.
Here's an example async send operation:
```java
producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId);
});
```
As you can see from the example above, async send operations return a {% javadoc MessageId client org.apache.pulsar.client.api.MessageId %} wrapped in a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture).
### Configuring messages
In addition to a value, it's possible to set additional items on a given message:
```java
producer.newMessage()
.key("my-message-key")
.value("my-async-message".getBytes())
.property("my-key", "my-value")
.property("my-other-key", "my-other-value")
.send();
```
As for the previous case, it's also possible to terminate the builder chain with `sendAsync()` and
get a future returned.
## Consumers
In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new {% popover consumer %} by first instantiating a {% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
Once you've instantiated a {% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %} object, you can create a {% javadoc Consumer client org.apache.pulsar.client.api.Consumer %} by specifying a {% popover topic %} and a [subscription](getting-started-concepts-and-architecture.md#subscription-modes).
```java
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
```
The `subscribe` method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then {% popover acknowledges %} that the message has been processed:
```java
do {
// Wait for a message
Message msg = consumer.receive();
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} while (true);
```
### Configuring consumers
If you instantiate a `Consumer` object specifying only a topic and subscription name, as in the example above, the consumer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set. For a full listing, see the Javadoc for the {% javadoc ConsumerBuilder client org.apache.pulsar.client.api.ConsumerBuilder %} class. Here's an example:
Here's an example configuration:
```java
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
```
### Async receive
The `receive` method will receive messages synchronously (the consumer process will be blocked until a message is available). You can also use [async receive](getting-started-concepts-and-architecture.md#receive-modes), which will return immediately with a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture) object that completes once a new message is available.
Here's an example:
```java
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
```
Async receive operations return a {% javadoc Message client org.apache.pulsar.client.api.Message %} wrapped inside of a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture).
### Multi-topic subscriptions
In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using [multi-topic subscriptions](getting-started-concepts-and-architecture.md#multi-topic-subscriptions). To use multi-topic subscriptions you can supply either a regular expression (regex) or a `List` of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.
Here are some examples:
```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
```
You can also subscribe to an explicit list of topics (across namespaces if you wish):
```java
List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
// Alternatively:
Consumer multiTopicConsumer = consumerBuilder
.topics(
"topic-1",
"topic-2",
"topic-3"
)
.subscribe();
```
You can also subscribe to multiple topics asynchronously using the `subscribeAsync` method rather than the synchronous `subscribe` method. Here's an example:
```java
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
.topics(topics)
.subscribeAsync()
.thenAccept(consumer -> {
do {
try {
Message msg = consumer.receive();
// Do something with the received message
} catch (PulsarClientException e) {
e.printStackTrace();
}
} while (true);
});
```
## Reader interface {#readers}
With the [reader interface](getting-started-concepts-and-architecture.md#reader-interface), Pulsar clients can "manually position" themselves within a topic, reading all messages from a specified message onward. The Pulsar API for Java enables you to create {% javadoc Reader client org.apache.pulsar.client.api.Reader %} objects by specifying a topic, a {% javadoc MessageId client org.apache.pulsar.client.api.MessageId %}, and {% javadoc ReaderConfiguration client org.apache.pulsar.client.api.ReaderConfiguration %}.
Here's an example:
```java
ReaderConfiguration conf = new ReaderConfiguration();
byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
while (true) {
Message message = reader.readNext();
// Process message
}
```
In the example above, a `Reader` object is instantiated for a specific topic and message (by ID); the reader then iterates over each message in the topic after the message identified by `msgIdBytes` (how that value is obtained depends on the application).
The code sample above shows pointing the `Reader` object to a specific message (by ID), but you can also use `MessageId.earliest` to point to the earliest available message on the topic of `MessageId.latest` to point to the most recent available message.
## Schemas
In Pulsar, all message data consists of byte arrays "under the hood." [Message schemas](getting-started-concepts-and-architecture.md#schema-registry) enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a [producer](#producers) without specifying a schema, then the producer can only produce messages of type `byte[]`. Here's an example:
```java
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create();
```
The producer above is equivalent to a `Producer<byte[]>` (in fact, you should *always* explicitly specify the type). If you'd like to use a producer for a different type of data, you'll need to specify a **schema** that informs Pulsar which data type will be transmitted over the {% popover topic %}.
### Schema example
Let's say that you have a `SensorReading` class that you'd like to transmit over a Pulsar topic:
```java
public class SensorReading {
public float temperature;
public SensorReading(float temperature) {
this.temperature = temperature;
}
// A no-arg constructor is required
public SensorReading() {
}
public float getTemperature() {
return temperature;
}
public void setTemperature(float temperature) {
this.temperature = temperature;
}
}
```
You could then create a `Producer<SensorReading>` (or `Consumer<SensorReading>`) like so:
```java
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
```
The following schema formats are currently available for Java:
* No schema or the byte array schema (which can be applied using `Schema.BYTES`):
```java
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
.topic("some-raw-bytes-topic")
.create();
```
Or, equivalently:
```java
Producer<byte[]> bytesProducer = client.newProducer()
.topic("some-raw-bytes-topic")
.create();
```
* `String` for normal UTF-8-encoded string data. This schema can be applied using `Schema.STRING`:
```java
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("some-string-topic")
.create();
```
* JSON schemas can be created for POJOs using the `JSONSchema` class. Here's an example:
```java
Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class);
Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema)
.topic("some-pojo-topic")
.create();
```
## Authentication
Pulsar currently supports two authentication schemes: [TLS](administration-auth.md#tls-client-auth) and [Athenz](administration-auth.md#athenz). The Pulsar Java client can be used with both.
### TLS Authentication
To use [TLS](administration-auth.md#tls-client-auth), you need to set TLS to `true` using the `setUseTls` method, point your Pulsar client to a TLS cert path, and provide paths to cert and key files.
Here's an example configuration:
```java
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
Authentication tlsAuth = AuthenticationFactory
.create(AuthenticationTls.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(tlsAuth)
.build();
```
### Athenz
To use [Athenz](administration-auth.md#athenz) as an authentication provider, you need to [use TLS](#tls-authentication) and provide values for four parameters in a hash:
* `tenantDomain`
* `tenantService`
* `providerDomain`
* `privateKey`
You can also set an optional `keyId`. Here's an example configuration:
```java
Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
Authentication athenzAuth = AuthenticationFactory
.create(AuthenticationAthenz.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(athenzAuth)
.build();
```
> #### Supported pattern formats
> The `privateKey` parameter supports the following three pattern formats:
> * `file:///path/to/file`
> * `file:/path/to/file`
> * `data:application/x-pem-file;base64,<base64-encoded value>`
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
---
id: develop-codebase
title: The Pulsar codebase
sidebar_label: Codebase
---
The panels below describe some of the core directories in the [Pulsar codebase](https://github.com/apache/incubator-pulsar).
{% include codebase.html %}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册