将 Kafka 在 ZooKeeper 中的默认目录修改为自定义目录后,Kafka 启动失败,并抛出以下异常:
INFO 2012-03-06 02:39:04,072 main kafka.server.KafkaZooKeeper Registering broker /brokers/ids/1
FATAL 2012-03-06 02:39:04,111 main kafka.server.KafkaServer Fatal error during startup.
java.lang.IllegalArgumentException: Path length must be > 0
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:620)
at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:48)
at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:60)
at kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:72)
at kafka.server.KafkaZooKeeper.registerBrokerInZk(KafkaZooKeeper.scala:57)
at kafka.log.LogManager.startup(LogManager.scala:124)
at kafka.server.KafkaServer.startup(KafkaServer.scala:80)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:47)
at kafka.Kafka$.main(Kafka.scala:60)
at kafka.Kafka.main(Kafka.scala)
经确认,这是一个 Bug:initZk() 方法没有对连接串中的自定义路径(chroot)进行处理,在该路径不存在时没有先创建它,从而导致启动失败。
原代码:
/**
 * Creates a ZkClient from the configured connect string, runs
 * ZkUtils.setupCommonPaths on it and returns it.
 *
 * The connect string is handed to ZkClient exactly as configured: nothing in
 * this method inspects it or creates any path that may be embedded in it.
 *
 * @return the newly created ZkClient
 */
private def initZk(): ZkClient = {
// Log which connect string is about to be used.
info("Connecting to zookeeper on " + config.zkConnect)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
// The client is the method's result.
zkClient
}
修复后的代码:
/**
 * Creates a ZkClient from the configured connect string, runs
 * ZkUtils.setupCommonPaths on it and returns it.
 *
 * If the connect string contains a path suffix (everything from the first "/"
 * onwards, treated here as the chroot), that path is created first through a
 * short-lived client that connects without the suffix. The short-lived client
 * is always closed, even when creating the path fails.
 *
 * @return the ZkClient connected with the full, unmodified connect string
 */
private def initZk(): ZkClient = {
  info("Connecting to zookeeper on " + config.zkConnect)

  // Position of the first "/" in the connect string; computed once and reused below.
  val chrootStart = config.zkConnect.indexOf("/")
  val chroot = if (chrootStart > 0) config.zkConnect.substring(chrootStart) else ""

  // A chroot of just "/" (length 1) or no chroot at all needs no preparation.
  if (chroot.length > 1) {
    val zkConnForChrootCreation = config.zkConnect.substring(0, chrootStart)
    val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
    try {
      ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
      info("Created zookeeper path " + chroot)
    } finally {
      // Close the temporary client on every path so a failure above does not leak it.
      zkClientForChrootCreation.close()
    }
  }

  val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
  ZkUtils.setupCommonPaths(zkClient)
  zkClient
}
对应的 Bug 记录:
https://issues.apache.org/jira/browse/KAFKA-294
使用的版本是:kafka_2.10-0.8.1.1.tgz