未验证 提交 79dd772f 编写于 作者: M Matteo Merli 提交者: GitHub

Fixed intermittent test failures with "bind error" (#2725)

上级 c7b91192
......@@ -72,7 +72,7 @@ public class SLAMonitoringTest {
void setup() throws Exception {
log.info("---- Initializing SLAMonitoringTest -----");
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// start brokers
......
......@@ -112,7 +112,7 @@ public class AntiAffinityNamespaceGroupTest {
void setup() throws Exception {
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// Start broker 1
......
......@@ -120,7 +120,7 @@ public class LoadBalancerTest {
@BeforeMethod
void setup() throws Exception {
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
......
......@@ -146,7 +146,7 @@ public class ModularLoadManagerImplTest {
void setup() throws Exception {
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// Start broker 1
......
......@@ -127,7 +127,7 @@ public class SimpleLoadManagerImplTest {
void setup() throws Exception {
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// Start broker 1
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
......@@ -44,7 +45,7 @@ public class AdvertisedAddressTest {
@BeforeMethod
public void setup() throws Exception {
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
ServiceConfiguration config = new ServiceConfiguration();
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
......
......@@ -27,6 +27,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
......@@ -53,7 +54,7 @@ import com.google.common.collect.Sets;
/**
*/
public class BacklogQuotaManagerTest {
protected static int BROKER_SERVICE_PORT = 16650;
protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
PulsarService pulsar;
ServiceConfiguration config;
......@@ -62,15 +63,15 @@ public class BacklogQuotaManagerTest {
LocalBookkeeperEnsemble bkEnsemble;
private final int ZOOKEEPER_PORT = 12759;
protected final int BROKER_WEBSERVICE_PORT = 15782;
private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
@BeforeMethod
void setup() throws Exception {
try {
// start local bookie and zookeeper
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// start pulsar service
......
......@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
......@@ -60,7 +61,7 @@ import org.testng.annotations.Test;
/**
*/
public class BrokerBkEnsemblesTests {
protected static int BROKER_SERVICE_PORT = 16650;
protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
protected PulsarService pulsar;
ServiceConfiguration config;
......@@ -69,10 +70,9 @@ public class BrokerBkEnsemblesTests {
LocalBookkeeperEnsemble bkEnsemble;
private final int ZOOKEEPER_PORT = 12759;
protected final int BROKER_WEBSERVICE_PORT = 15782;
private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
protected final int bkBasePort = 5001;
private final int numberOfBookies;
public BrokerBkEnsemblesTests() {
......@@ -87,7 +87,7 @@ public class BrokerBkEnsemblesTests {
protected void setup() throws Exception {
try {
// start local bookie and zookeeper
bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, 5001);
bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// start pulsar service
......
......@@ -105,7 +105,7 @@ public class ReplicatorTestBase {
// Start region 1
int zkPort1 = PortManager.nextFreePort();
bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
bkEnsemble1.start();
int webServicePort1 = PortManager.nextFreePort();
......@@ -143,7 +143,7 @@ public class ReplicatorTestBase {
// Start zk & bks
int zkPort2 = PortManager.nextFreePort();
bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
bkEnsemble2.start();
int webServicePort2 = PortManager.nextFreePort();
......@@ -177,7 +177,7 @@ public class ReplicatorTestBase {
// Start zk & bks
int zkPort3 = PortManager.nextFreePort();
bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
bkEnsemble3.start();
int webServicePort3 = PortManager.nextFreePort();
......
......@@ -104,7 +104,7 @@ public class V1_ReplicatorTestBase {
// Start region 1
int zkPort1 = PortManager.nextFreePort();
bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
bkEnsemble1.start();
int webServicePort1 = PortManager.nextFreePort();
......@@ -142,7 +142,7 @@ public class V1_ReplicatorTestBase {
// Start zk & bks
int zkPort2 = PortManager.nextFreePort();
bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
bkEnsemble2.start();
int webServicePort2 = PortManager.nextFreePort();
......@@ -176,7 +176,7 @@ public class V1_ReplicatorTestBase {
// Start zk & bks
int zkPort3 = PortManager.nextFreePort();
bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
bkEnsemble3.start();
int webServicePort3 = PortManager.nextFreePort();
......
......@@ -871,7 +871,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// Start region 1
int zkPort1 = PortManager.nextFreePort();
bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
bkEnsemble1.start();
int webServicePort1 = PortManager.nextFreePort();
......@@ -901,7 +901,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// Start zk & bks
int zkPort2 = PortManager.nextFreePort();
bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
bkEnsemble2.start();
int webServicePort2 = PortManager.nextFreePort();
......@@ -927,7 +927,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// Start zk & bks
int zkPort3 = PortManager.nextFreePort();
bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
bkEnsemble3.start();
int webServicePort3 = PortManager.nextFreePort();
......
......@@ -98,7 +98,7 @@ public class PulsarWorkerAssignmentTest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort;
......
......@@ -101,7 +101,7 @@ public class PulsarFunctionAdminTest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
......@@ -126,7 +126,7 @@ public class PulsarFunctionAdminTest {
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
functionsWorkerService = createPulsarFunctionWorker(config);
urlTls = new URL(brokerServiceUrl);
......@@ -160,11 +160,11 @@ public class PulsarFunctionAdminTest {
workerConfig.getClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();
TenantInfo propAdmin = new TenantInfo();
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
Thread.sleep(100);
}
......
......@@ -92,7 +92,7 @@ public class PulsarFunctionTlsTest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
config = spy(new ServiceConfiguration());
......
......@@ -133,7 +133,7 @@ public class PulsarSinkE2ETest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
......
......@@ -66,6 +66,14 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>managed-ledger-original</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-common</artifactId>
......
......@@ -39,6 +39,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
......@@ -81,8 +82,25 @@ public class LocalBookkeeperEnsemble {
int numberOfBookies;
private boolean clearOldData = false;
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort) {
this(numberOfBookies, zkPort, bkBasePort, null, null, true);
private static class BasePortManager implements Supplier<Integer> {
private int port;
public BasePortManager(int basePort) {
this.port = basePort;
}
@Override
public synchronized Integer get() {
return port++;
}
}
private final Supplier<Integer> portManager;
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) {
this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager);
}
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
......@@ -103,10 +121,22 @@ public class LocalBookkeeperEnsemble {
String bkDataDirName,
boolean clearOldData,
String advertisedAddress) {
this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress,
new BasePortManager(bkBasePort));
}
public LocalBookkeeperEnsemble(int numberOfBookies,
int zkPort,
int streamStoragePort,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress,
Supplier<Integer> portManager) {
this.numberOfBookies = numberOfBookies;
this.HOSTPORT = "127.0.0.1:" + zkPort;
this.ZooKeeperDefaultPort = zkPort;
this.initialPort = bkBasePort;
this.portManager = portManager;
this.streamStoragePort = streamStoragePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
......@@ -128,7 +158,6 @@ public class LocalBookkeeperEnsemble {
String bkDataDirName;
BookieServer bs[];
ServerConfiguration bsConfs[];
Integer initialPort = 5000;
// Stream/Table Storage
StreamStorageLifecycleComponent streamStorage;
......@@ -221,7 +250,7 @@ public class LocalBookkeeperEnsemble {
cleanDirectory(bkDataDir);
}
int bookiePort = initialPort + i;
int bookiePort = portManager.get();
// Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort);
......@@ -257,7 +286,7 @@ public class LocalBookkeeperEnsemble {
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
}
bs[i].start();
LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, initialPort + i,
LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, bookiePort,
bkDataDir.getAbsolutePath());
}
}
......
......@@ -18,18 +18,18 @@
*/
package org.apache.pulsar.zookeeper;
import java.io.File;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
import java.io.File;
import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.io.FileUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@Test
public class LocalBookkeeperEnsembleTest {
......@@ -62,16 +62,14 @@ public class LocalBookkeeperEnsembleTest {
final int numBk = 1;
final int zkPort = PortManager.nextFreePort();
final int bkPort = PortManager.nextFreePort();
// Start local Bookies/ZooKeepers and confirm that they are running at specified ports
LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, bkPort);
LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, () -> PortManager.nextFreePort());
ensemble.start();
assertTrue(ensemble.getZkServer().isRunning());
assertEquals(ensemble.getZkServer().getClientPort(), zkPort);
assertTrue(ensemble.getZkClient().getState().isConnected());
assertTrue(ensemble.getBookies()[0].isRunning());
assertEquals(ensemble.getBookies()[0].getLocalAddress().getPort(), bkPort);
// Stop local Bookies/ZooKeepers and confirm that they are correctly closed
ensemble.stop();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册