This is my first Kafka program.
From a kafka_2.13-3.1.0 instance, I created a Kafka topic poids_garmin_brut and filled it with this CSV:
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic poids_garmin_brut
kafka-console-producer.sh --broker-list localhost:9092 --topic poids_garmin_brut < "Poids(1).csv"
Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
" 14 Fév. 2022",
06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
" 13 Fév. 2022",
06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
" 12 Fév. 2022",
06:17,72.2 kg,0.0 kg,22.8,25.3 %,29.7 kg,3.6 kg,54.5 %,
[...]
At any time, before or after running the program shown below, its content can be displayed with a kafka-console-consumer command:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic poids_garmin_brut --from-beginning
Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
" 14 Fév. 2022",
06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
" 13 Fév. 2022",
06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
" 12 Fév. 2022",
06:17,72.2 kg,0.0 kg,22.8,25.3 %,29.7 kg,3.6 kg,54.5 %,
" 11 Fév. 2022",
05:54,72.2 kg,0.1 kg,22.8,25.6 %,29.7 kg,3.5 kg,54.3 %,
" 10 Fév. 2022",
06:14,72.3 kg,0.0 kg,22.8,25.9 %,29.7 kg,3.5 kg,54.1 %,
" 9 Fév. 2022",
06:06,72.3 kg,0.5 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
" 8 Fév. 2022",
07:14,71.8 kg,0.7 kg,22.7,26.3 %,29.6 kg,3.5 kg,53.8 %,
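For reference, each weigh-in occupies two consecutive records in the topic: a quoted date line, then a line of comma-separated measurements. A minimal plain-Java sketch of pairing them could look like the following. GarminCsvSketch, WeighIn and parse are hypothetical names used for illustration only, they are not part of the program, and only the time and weight columns are read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: pairs each quoted date line with the measurement line that follows it. */
final class GarminCsvSketch {

    /** One weigh-in: the date label plus two fields of the following line. */
    static final class WeighIn {
        final String date;
        final String time;
        final double weightKg;

        WeighIn(String date, String time, double weightKg) {
            this.date = date;
            this.time = time;
            this.weightKg = weightKg;
        }
    }

    /** Date lines are the only ones that start with a double quote. */
    static boolean isDateLine(String line) {
        return line.startsWith("\"");
    }

    static List<WeighIn> parse(List<String> lines) {
        List<WeighIn> result = new ArrayList<>();
        String currentDate = null;
        for (String line : lines) {
            if (isDateLine(line)) {
                // Drop the quotes and the trailing comma, then the leading space.
                currentDate = line.replace("\"", "").replace(",", "").trim();
            } else if (currentDate != null) {
                String[] fields = line.split(",");
                // fields[0] is the time, fields[1] looks like "72.1 kg".
                double weight = Double.parseDouble(fields[1].replace("kg", "").trim());
                result.add(new WeighIn(currentDate, fields[0], weight));
                currentDate = null;
            }
            // Any other line (the header) is skipped.
        }
        return result;
    }

    public static void main(String[] args) {
        // "F\u00e9v." is the month label of the sample data, written with a unicode escape.
        List<WeighIn> rows = parse(Arrays.asList(
                "\" 14 F\u00e9v. 2022\",",
                "06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,"));
        for (WeighIn w : rows) {
            System.out.println(w.date + " " + w.time + " " + w.weightKg);
        }
    }
}
```

Inside a stream, the same pairing would need some state carried from one record to the next, since the date and its measurements arrive as separate messages.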
Here is the Java program, based on the org.apache.kafka:kafka-streams:3.1.0 dependency, that reads this topic as a stream:
package extracteur.garmin;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.*;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;

@SpringBootApplication
public class Kafka {
    /** Logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(Kafka.class);

    public static void main(String[] args) {
        LOGGER.info("L'extracteur de données Garmin démarre...");

        /* The data in the input CSV file looks like this:
           Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
           " 14 Fév. 2022",
           06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
           " 13 Fév. 2022",
           06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
         */

        // Create a stream with no key and a String value.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Void,String> stream = builder.stream("poids_garmin_brut");

        // This is a Kafka foreach, not a Java lambda one. It is lazy.
        stream.foreach((key, value) -> {
            LOGGER.info(value);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), config());
        streams.start();

        // Close the Kafka stream when the VM stops, by having it call
        streams.close();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    /**
     * Startup properties.
     * @return configuration properties.
     */
    private static Properties config() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return config;
    }
}
But while the logs don’t seem to report any error during execution, my program never enters the stream.foreach and therefore displays no content from that topic.
(In this log I removed the dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088- part of the [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088-StreamThread-1] you should read inside, for SO message length and readability, and org.apache.kafka becomes o.a.k.)
/usr/lib/jvm/java-1.11.0-openjdk-amd64/bin/java -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true -javaagent:/opt/idea-IU-212.5284.40/lib/idea_rt.jar=41397:/opt/idea-IU-212.5284.40/bin -Dfile.encoding=UTF-8 -classpath /home/lebihan/dev/Java/garmin/target/classes:/home/lebihan/.m2/repository/org/slf4j/slf4j-api/1.7.33/slf4j-api-1.7.33.jar:/home/lebihan/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.33/log4j-over-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.6.3/spring-boot-starter-web-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter/2.6.3/spring-boot-starter-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot/2.6.3/spring-boot-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.6.3/spring-boot-autoconfigure-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.6.3/spring-boot-starter-logging-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.17.1/log4j-to-slf4j-2.17.1.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-api/2.17.1/log4j-api-2.17.1.jar:/home/lebihan/.m2/repository/org/slf4j/jul-to-slf4j/1.7.33/jul-to-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/jakarta/annotation/jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/home/lebihan/.m2/repository/org/yaml/snakeyaml/1.29/snakeyaml-1.29.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.6.3/spring-boot-starter-json-2.6.3.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.13
.1/jackson-datatype-jdk8-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.13.1/jackson-datatype-jsr310-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.13.1/jackson-module-parameter-names-2.13.1.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.6.3/spring-boot-starter-tomcat-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.56/tomcat-embed-core-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.56/tomcat-embed-el-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.56/tomcat-embed-websocket-9.0.56.jar:/home/lebihan/.m2/repository/org/springframework/spring-web/5.3.15/spring-web-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-beans/5.3.15/spring-beans-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-webmvc/5.3.15/spring-webmvc-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-aop/5.3.15/spring-aop-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-context/5.3.15/spring-context-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-expression/5.3.15/spring-expression-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-core/5.3.15/spring-core-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-jcl/5.3.15/spring-jcl-5.3.15.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-streams/3.1.0/kafka-streams-3.1.0.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-clients/3.0.0/kafka-clients-3.0.0.jar:/home/lebihan/.m2/repository/com/github/luben/zstd-jni/1.5.0-2/zstd-jni-1.5.0-2.jar:/home/lebihan/.m2/repository/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar:/home/lebihan/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.1/snappy-java-1.1.8.1.jar:/home/lebihan/.m2/repository/org/rocksdb/rocksdbjni/6.22.1.1/rocksdbjni-6.22.
1.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.13.1/jackson-annotations-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.13.1/jackson-databind-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.13.1/jackson-core-2.13.1.jar extracteur.garmin.Kafka
07:57:49.720 [main] INFO extracteur.garmin.Kafka - L'extracteur de données Garmin démarre...
07:57:49.747 [main] INFO o.a.k.streams.StreamsConfig - StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = dev1
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class o.a.k.streams.errors.LogAndFailExceptionHandler
default.key.serde = class o.a.k.common.serialization.Serdes$VoidSerde
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class o.a.k.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class o.a.k.streams.processor.FailOnInvalidTimestamp
default.value.serde = class o.a.k.common.serialization.Serdes$StringSerde
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = -1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
task.timeout.ms = 300000
topology.optimization = none
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 86400000
07:57:49.760 [main] INFO o.a.k.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id = admin
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269788
07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams version: 3.1.0
07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams commit ID: 37edeed0777bacb3
07:57:49.800 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating restore consumer client
07:57:49.802 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-restore-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class o.a.k.clients.consumer.RangeAssignor, class o.a.k.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269816
07:57:49.818 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating thread producer client
07:57:49.820 [main] INFO o.a.k.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class o.a.k.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class o.a.k.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class o.a.k.common.serialization.ByteArraySerializer
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269828
07:57:49.830 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating consumer client
07:57:49.831 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = dev1
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [o.a.k.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
replication.factor = -1
windowstore.changelog.additional.retention.ms = 86400000
07:57:49.836 [main] INFO o.a.k.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [StreamThread-1-consumer] Cooperative rebalancing protocol is enabled now
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269840
07:57:49.844 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from CREATED to REBALANCING
07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Starting
07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from CREATED to STARTING
07:57:49.845 [StreamThread-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Subscribed to topic(s): poids_garmin_brut
07:57:49.845 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from REBALANCING to PENDING_SHUTDOWN
07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Informed to shut down
07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
07:57:49.919 [kafka-producer-network-thread | StreamThread-1-producer] INFO o.a.k.clients.Metadata - [Producer clientId=StreamThread-1-producer] Cluster ID: QKJGs4glRAy7besZxXNCrg
07:57:49.920 [StreamThread-1] INFO o.a.k.clients.Metadata - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Cluster ID: QKJGs4glRAy7besZxXNCrg
07:57:49.921 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Discovered group coordinator debian:9092 (id: 2147483647 rack: null)
07:57:49.922 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group
07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Request joining group due to: need to re-join with the given member-id
07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group
07:57:49.930 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully joined group with generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'}
07:57:49.936 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] All members participating in this rebalance:
d1c8ce47-6fbf-41b7-b8aa-e3d094703088: [StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c].
07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment: {d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 1]} with no followup probing rebalance.
07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Assigned tasks [0_0] including stateful [] to clients as:
d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([])].
07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Client d1c8ce47-6fbf-41b7-b8aa-e3d094703088 per-consumer assignment:
prev owned active {}
prev owned standby {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[]}
assigned active {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[0_0]}
revoking active {}
assigned standby {}
07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Finished stable assignment of tasks, no followup rebalances required.
07:57:49.939 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Finished assignment for group at generation 3: {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=Assignment(partitions=[poids_garmin_brut-0], userDataSize=52)}
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully synced group in generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'}
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Updating assignment with
Assigned partitions: [poids_garmin_brut-0]
Current owned partitions: []
Added partitions (assigned - owned): [poids_garmin_brut-0]
Revoked partitions (owned - assigned): []
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Notifying assignor about the new Assignment(partitions=[poids_garmin_brut-0], userDataSize=52)
07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.TaskManager - stream-thread [StreamThread-1] Handle new assignment with:
New active tasks: [0_0]
New standby tasks: []
Existing active tasks: []
Existing standby tasks: []
07:57:49.950 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Adding newly assigned partitions: poids_garmin_brut-0
07:57:49.953 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Found no committed offset for partition poids_garmin_brut-0
07:57:49.954 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
[...]
Process finished with exit code 0
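A side note on the "Found no committed offset for partition poids_garmin_brut-0" line near the end of this log: as far as I understand the consumer documentation, auto.offset.reset (earliest here) is only consulted in that situation, when the group dev1 has no usable committed offset. Once offsets have been committed, a restart resumes from them instead. A rough model in plain Java, with hypothetical names and no Kafka classes (it also ignores the case where a committed offset has fallen out of range):

```java
import java.util.OptionalLong;

/** Hypothetical model (not Kafka code) of where a consumer group starts reading a partition. */
final class OffsetResetSketch {

    /**
     * @param committed   offset already committed by the group, if any
     * @param resetPolicy value of auto.offset.reset: "earliest", "latest" or "none"
     * @param logStart    first offset still available in the partition
     * @param logEnd      offset that the next produced record will get
     */
    static long startingOffset(OptionalLong committed, String resetPolicy, long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                // Stand-in for the error a real consumer raises with the "none" policy.
                throw new IllegalStateException("no committed offset and reset policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // First run of a group on a partition holding offsets 0..14.
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0L, 15L));
        // Later run of the same group after it committed offset 7.
        System.out.println(startingOffset(OptionalLong.of(7L), "earliest", 0L, 15L));
    }
}
```

Under that model, reading the topic from its beginning again would require either a group with no committed offsets (for example a new application.id) or resetting the group's offsets.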
What am I doing wrong?
- I’m running my Kafka instance and its Java program locally, on the same PC.
- I’ve tried the 3.1.0 and 2.8.1 versions of Kafka, and removed every trace of Spring from the Java program, without success.
I believe I’m facing a configuration problem.
2 Answers
I fear I have, shamefully, found the cause of my trouble:
streams.start() starts listening to the stream, but streams.close() ends it immediately and closes the program a second later!
What I saw after commenting out that streams.close() statement is that the first message from the poids_garmin_brut topic takes around 20 seconds to be detected and displayed, provided new messages are injected into the topic; messages injected after that come through instantly.
I believed that the auto.offset.reset property, set to earliest in config(), was meant to ask Kafka to read a topic from its beginning, but that is not the case.
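To make the ordering problem concrete without needing a broker, here is a plain-Java stand-in for it. FakeStream and its methods are hypothetical and use no Kafka classes: start() launches a background thread that needs a short warm-up before it handles records (standing in for joining the consumer group), and close() stops it. In the question's main(), the corresponding change is simply to drop the direct streams.close() call and leave closing to the shutdown hook that is already registered.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java stand-in (no Kafka classes) for the start()/close() ordering. */
final class StreamLifecycleSketch {

    /** Hypothetical stream: a worker thread that handles records after a warm-up. */
    static final class FakeStream {
        private final List<String> records;
        private final AtomicInteger handled = new AtomicInteger();
        private final CountDownLatch drained = new CountDownLatch(1);
        private Thread worker;

        FakeStream(List<String> records) {
            this.records = records;
        }

        void start() {
            worker = new Thread(() -> {
                try {
                    Thread.sleep(200); // stand-in for joining the group and getting partitions
                    for (String line : records) {
                        handled.incrementAndGet(); // stand-in for the foreach callback
                    }
                    drained.countDown();
                } catch (InterruptedException e) {
                    // Closed during the warm-up: no record was handled.
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
        }

        void close() {
            worker.interrupt();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void awaitDrained() {
            try {
                drained.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        int handled() {
            return handled.get();
        }
    }

    /** Mirrors the question: close() right after start(). */
    static int closeImmediately(List<String> records) {
        FakeStream stream = new FakeStream(records);
        stream.start();
        stream.close();
        return stream.handled();
    }

    /** Keeps the stream open until the records are handled, then closes it. */
    static int closeAfterWork(List<String> records) {
        FakeStream stream = new FakeStream(records);
        stream.start();
        stream.awaitDrained();
        stream.close();
        return stream.handled();
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("06:37,72.1 kg", "06:48,72.4 kg");
        System.out.println("closed immediately: " + closeImmediately(records));
        System.out.println("closed after work:  " + closeAfterWork(records));
    }
}
```

Run as written, main reports 0 handled records for the immediate close and 2 when the stream is kept open. A real stream is unbounded, so there is no awaitDrained() equivalent; the shutdown hook plays that role.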
The following should work.
OUTPUT