I am using the github.com/Shopify/sarama package to interact with Kafka. With my current approach I can connect to the broker and get all topic names without a problem (Consumer code below).
However, when I try to delete a topic using the admin client (Admin code below), I receive the error “dial tcp: lookup ip-x-x-xx.ec2.internal: no such host”.
I am out of ideas as to why I receive this error. I would much appreciate any hints or possible solutions.
Consumer
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// get broker; NewConsumer expects a slice of broker addresses
cluster, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
	panic(err)
}
defer func() {
	if err := cluster.Close(); err != nil {
		panic(err)
	}
}()
// get all topics from the cluster
topics, _ := cluster.Topics()
Admin
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_4_0_0
// admin client; NewClusterAdmin expects a slice of broker addresses
admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)
if err != nil {
	panic(err)
}
defer func() {
	if err := admin.Close(); err != nil {
		panic(err)
	}
}()
// DeleteTopic takes a single topic name and returns an error
topic := "test-topic"
output := admin.DeleteTopic(topic)
if output == nil {
	fmt.Printf(" delete - %s\n", topic)
} else {
	fmt.Println(output)
}
Note: I am connecting through a bastion instance to the remote cluster by forwarding ports over SSH.
Update
After setting sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags), I get the following:
[sarama] 2020/03/25 02:08:03 Initializing new client
[sarama] 2020/03/25 02:08:03 client/metadata fetching metadata for all topics from broker localhost:9092
[sarama] 2020/03/25 02:08:04 Connected to broker at localhost:9092 (unregistered)
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1001 at ip-x-x-x-1.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1003 at ip-x-x-x-2.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1002 at ip-x-x-x-3.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 Successfully initialized new client
[sarama] 2020/03/25 02:08:04 Failed to connect to broker ip-x-x-x-3.ec2.internal:9092: dial tcp: lookup ip-x-x-x-3.ec2.internal: no such host
Update 2
My Kafka server.properties:
advertised.listeners=INTERNAL://ip-x-x-x-1.ec2.internal:9091,EXTERNAL_INSECURE://ip-x-x-x-1.ec2.internal:9092
listeners=INTERNAL://:9091,EXTERNAL_INSECURE://:9092
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL_INSECURE:PLAINTEXT
2 Answers
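Adding the brokers' addresses to /etc/hosts on my local machine, where I execute the code, did the trick. Though I still don't understand why I didn't need this step when I was using sarama.NewConsumer().

When a client connects to the broker (in your case localhost:9092), the broker provides the client with details of all the other brokers in the cluster. You can see this in your logs.
Initial connection:
[sarama] 2020/03/25 02:08:04 Connected to broker at localhost:9092 (unregistered)
Details of brokers:
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1001 at ip-x-x-x-1.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1003 at ip-x-x-x-2.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1002 at ip-x-x-x-3.ec2.internal:9092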
The problem you have is that your client will subsequently use these broker details for further communication with the cluster. These addresses that the broker gives out are known as the advertised listeners. That is, the listeners that the broker “advertises”.
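To make the idea of an advertised listener more concrete, here is a minimal Go sketch that splits the advertised.listeners value from your server.properties into listener names and addresses. The function name parseAdvertisedListeners is invented for this illustration and is not part of Kafka or sarama. Assuming your SSH forward targets port 9092 (the EXTERNAL_INSECURE listener), the address printed here is the one the broker hands back to your client.

```go
package main

import (
	"fmt"
	"strings"
)

// parseAdvertisedListeners (hypothetical helper) splits a value such as
// "NAME1://host1:port1,NAME2://host2:port2" into a map of listener name
// to "host:port".
func parseAdvertisedListeners(value string) (map[string]string, error) {
	listeners := make(map[string]string)
	for _, entry := range strings.Split(value, ",") {
		entry = strings.TrimSpace(entry)
		name, addr, ok := strings.Cut(entry, "://")
		if !ok || name == "" || addr == "" {
			return nil, fmt.Errorf("malformed listener entry %q", entry)
		}
		listeners[name] = addr
	}
	return listeners, nil
}

func main() {
	// Value copied from the server.properties shown in the question.
	value := "INTERNAL://ip-x-x-x-1.ec2.internal:9091,EXTERNAL_INSECURE://ip-x-x-x-1.ec2.internal:9092"

	listeners, err := parseAdvertisedListeners(value)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// The address a client on the EXTERNAL_INSECURE listener is told to use.
	fmt.Println(listeners["EXTERNAL_INSECURE"])
}
```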
This means that your client must be able to resolve and connect to the host & port of the listener(s) that the broker returns at the initial connection.
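A quick way to check this from the client machine is to test whether each advertised address resolves. The sketch below is only an illustration: the helper unresolvable and the stand-in resolver fakeLookup are made up for this example, and in real use you would pass net.LookupHost instead of the fake. It likely also explains the difference you saw: listing topics can be answered from the metadata fetched over the initial localhost:9092 connection, whereas deleting a topic has to be sent to another broker (the controller) at its advertised address.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// lookupFunc has the same signature as net.LookupHost, so the real resolver
// or a fake one can be passed in.
type lookupFunc func(host string) ([]string, error)

// unresolvable (hypothetical helper) returns the addresses whose host part
// cannot be resolved, or that are not valid "host:port" strings.
func unresolvable(advertised []string, lookup lookupFunc) []string {
	var failed []string
	for _, addr := range advertised {
		host, _, err := net.SplitHostPort(addr)
		if err != nil {
			failed = append(failed, addr)
			continue
		}
		if _, err := lookup(host); err != nil {
			failed = append(failed, addr)
		}
	}
	return failed
}

// fakeLookup stands in for DNS in this sketch: only "localhost" resolves.
func fakeLookup(host string) ([]string, error) {
	if host == "localhost" {
		return []string{"127.0.0.1"}, nil
	}
	return nil, errors.New("no such host")
}

func main() {
	// Addresses taken from the sarama log output in the question.
	advertised := []string{
		"localhost:9092",
		"ip-x-x-x-1.ec2.internal:9092",
		"ip-x-x-x-2.ec2.internal:9092",
		"ip-x-x-x-3.ec2.internal:9092",
	}
	// Swap fakeLookup for net.LookupHost to test against the real resolver.
	for _, addr := range unresolvable(advertised, fakeLookup) {
		fmt.Println("cannot resolve:", addr)
	}
}
```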
The reason hacking your /etc/hosts worked is that your local client can then resolve these addresses back to localhost, and then the SSH forwarding works. But this is just a hack.
You should set advertised.listeners in your broker configuration to addresses that a client can resolve (without needing any client-side /etc/hosts hacks).
To read more about this in detail, see https://rmoff.net/2018/08/02/kafka-listeners-explained/
Edit: To be clear, you should set advertised.listeners on each broker to the address that your clients can resolve. So if that’s via localhost SSH forwarding, set advertised.listeners to localhost:9092.