
I have a KStreams application running inside a Docker container which uses a persistent key-value store. My runtime environment is Docker 1.13.1 on RHEL 7.

I have configured state.dir with a value of /tmp/kafka-streams (which is the default).

When I start this container using "docker run", I mount /tmp/kafka-streams to a directory on my host machine, say, /mnt/storage/kafka-streams.
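For reference, the run command looks roughly like this (the image and container names here are just placeholders, not my real ones):

docker run -d \
  --name myapp-container \
  -v /mnt/storage/kafka-streams:/tmp/kafka-streams \
  myapp-image:latest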

My application.id is "myapp". I have 288 partitions in my input topic, which means my state store / changelog topic will also have that many partitions. Accordingly, when I start my Docker container, I see one folder per partition, such as 0_1, 0_2….0_288, under /mnt/storage/kafka-streams/myapp/.

When I shut down my application, I do not see a .checkpoint file in any of the partition directories.
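For example, searching for checkpoint files from the host comes back empty:

find /mnt/storage/kafka-streams/myapp -name '.checkpoint'
# (no output)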

And when I restart my application, it starts fetching the records from the changelog topic rather than reading from local disk. I suspect this is because there is no .checkpoint file in any of the partition directories. (Note: I can see the .lock file and the rocksdb sub-directory inside the partition directories.)

This is what I see in the startup log. It seems to be bootstrapping the entire state store from the changelog topic, i.e. performing network I/O rather than reading from what is on disk:

2022-05-31T12:08:02.791 [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 0_170 before re-bootstrapping
2022-05-31T12:08:02.791 [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] Detected the states of tasks [0_170] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] are corrupted and hence needs to be re-initialized
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
        at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
  1. Should I expect to see a .checkpoint file in each of the partition directories under /mnt/storage/kafka-streams/myapp/ when I shut down my application?

  2. Is this an issue because I am running my KStreams app inside a Docker container? If there were permission issues, then I would have expected to see problems creating the other files, such as .lock or the rocksdb folder (and its contents).

  3. If I run this application as a standalone/runnable Spring Boot JAR on my Windows laptop, i.e. not in a Docker container, I can see that it creates the .checkpoint file as expected.

2 Answers


  1. Chosen as BEST ANSWER

    My Java application inside the Docker container is run via an entrypoint script. It seems that when I stop the container, the TERM signal is not passed on to my Java process, so the KStreams application does not shut down cleanly.

    So, all I needed to do was find a way to send a TERM signal to my Java application inside the container.

    For the moment, I just ssh'ed into the container and did a kill -s TERM <pid> for my Java process. Once I did that, it resulted in a clean shutdown and thus created the .checkpoint file as well.
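    If it helps anyone, the same thing can be done from the host in one line with docker exec. This is just a sketch: "myapp-container" is a placeholder name, and it assumes pkill is available in the image and that there is a single java process inside the container.

    # send TERM to the (assumed single) java process inside the container
    docker exec myapp-container pkill -TERM java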


  2. I’m assuming you’ve configured the ENTRYPOINT in your Dockerfile to a script, right?

    Something like:

    ENTRYPOINT ["/bin/run.sh"]
    

    And in that script you’ll be invoking java:

    #!/bin/bash
    
    java <cp, and thing to run>
    

    If this is the case, then TERM signals will NOT be forwarded to your Java process. You need to use exec:

    #!/bin/bash
    
    exec java <cp, and thing to run>
    

    With exec, TERM signals will be forwarded.

    Why?

    The reason your Java process is NOT getting TERM signals is that Docker sends the TERM signal to the ENTRYPOINT, i.e. the shell process running your script, and the script does not forward it on to your Java process.

    The exec command replaces the shell process with your Java process, so TERM signals are now sent to it.
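
    If you can't use exec (say the script needs to run cleanup after java exits), a trap-and-wait pattern can forward the signal instead. A rough sketch, assuming bash:

    #!/bin/bash

    # start java in the background and remember its PID
    java <cp, and thing to run> &
    child=$!

    # forward TERM/INT to the java process
    trap 'kill -TERM "$child" 2>/dev/null' TERM INT

    # the first wait returns as soon as the signal arrives;
    # the second wait blocks until java has actually exited
    wait "$child"
    trap - TERM INT
    wait "$child"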
