
I have a Kafka consumer service:

@KafkaListener(
      topics = "topic1",
      groupId = "cluster1",
      containerFactory = "KafkaListenerContainerFactory")
  public void consume(Message message) {
    logger.info(String.format("Message received -> %s", message.getMsg()));
    Long id = message.getId();

    RepoDetail repoDetail = testRepo.findByID(id);

    logger.info(
        String.format(
            "Message -> %s", repoDetail.getMessage()));
  }

But when it tries to query the MongoDB repository, I get the following error:
No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.

Is there any way I can call MongoDB from a Kafka consumer?

Full stack trace:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.kafka.api.service.ConsumerService.consume(...)' threw exception; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2188) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2159) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2120) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2039) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1967) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1853) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1543) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1087) ~[spring-kafka-2.6.9.jar:2.6.9]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_212]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[?:1.8.0_212]
    at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
    at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.context.support.WebApplicationContextUtils.currentRequestAttributes(WebApplicationContextUtils.java:313) ~[spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.context.support.WebApplicationContextUtils.access$400(WebApplicationContextUtils.java:66) ~[spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.context.support.WebApplicationContextUtils$RequestObjectFactory.getObject(WebApplicationContextUtils.java:329) ~[spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.context.support.WebApplicationContextUtils$RequestObjectFactory.getObject(WebApplicationContextUtils.java:324) ~[spring-web-5.3.8.jar:5.3.8]
    at org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler.invoke(AutowireUtils.java:292) ~[spring-beans-5.3.8.jar:5.3.8]
    at com.sun.proxy.$Proxy156.getHeader(Unknown Source) ~[?:?]
    at com.repository.BaseRepository.getTemplate(BaseRepository.java:57) ~[repo-5.3.008.jar:?]
    at com.kafka.api.repository.impl.ProductDetailRepositoryImpl.findByID(ProductDetailRepositoryImpl.java:21) ~[classes/:?]
    at com.kafka.api.repository.impl.ProductDetailRepositoryImpl$$FastClassBySpringCGLIB$$824634c2.invoke(<generated>) ~[classes/:?]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137) ~[spring-tx-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.8.jar:5.3.8]
    at com.kafka.api.repository.impl.ProductDetailRepositoryImpl$$EnhancerBySpringCGLIB$$ee673ffc.findByID(<generated>) ~[classes/:?]
    at com.kafka.api.service.ConsumerService.consume(ConsumerService.java:35) ~[classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:330) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52) ~[spring-kafka-2.6.9.jar:2.6.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2139) ~[spring-kafka-2.6.9.jar:2.6.9]
    ... 11 more

2 Answers


  1. Yes, you can. Build a list of write models over BSON documents and upsert them into MongoDB with bulkWrite, which returns a BulkWriteResult.

    BulkWriteResult result = getMongoClient()
        .getDatabase(config.getNamespace().getDatabaseName())
        .getCollection(config.getNamespace().getCollectionName(), BsonDocument.class)
        .bulkWrite(writeModels, BULK_WRITE_OPTIONS);
    
  2. Declare the following bean in your configuration class:

    @Bean public RequestContextListener requestContextListener(){
        return new RequestContextListener();
    } 
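
For context, the stack trace in the question shows the exception being raised when BaseRepository.getTemplate calls getHeader on a request-scoped proxy from the Kafka listener thread. Spring keeps the current request in thread-local storage, so a thread that never received an HTTP request has nothing bound to it. The following is a minimal plain-Java sketch of that mechanism and of a fallback lookup. It is an illustration under stated assumptions, not Spring's actual implementation: RequestContext, ThreadBoundDemo, and the fallback value are hypothetical names introduced here.

```java
import java.util.Optional;

// Hypothetical stand-in for a thread-bound request holder: the value is
// stored in a ThreadLocal, so only the thread that bound it can read it.
final class RequestContext {
    private static final ThreadLocal<String> HEADER = new ThreadLocal<>();

    private RequestContext() {}

    static void bind(String headerValue) { HEADER.set(headerValue); }

    static void clear() { HEADER.remove(); }

    // Mirrors the failing call path: throws when nothing is bound to this thread.
    static String requiredHeader() {
        String value = HEADER.get();
        if (value == null) {
            throw new IllegalStateException("No thread-bound request found");
        }
        return value;
    }

    // Tolerant variant: returns a caller-supplied fallback when no request is bound.
    static String headerOrDefault(String fallback) {
        return Optional.ofNullable(HEADER.get()).orElse(fallback);
    }
}

final class ThreadBoundDemo {
    private ThreadBoundDemo() {}

    // Runs the strict lookup on a fresh thread (standing in for a listener
    // thread) and reports whether it failed.
    static boolean strictLookupFailsOnOtherThread() {
        boolean[] failed = new boolean[1];
        runAndWait(() -> {
            try {
                RequestContext.requiredHeader();
            } catch (IllegalStateException e) {
                failed[0] = true;
            }
        });
        return failed[0];
    }

    // Runs the tolerant lookup on a fresh thread and returns what it saw.
    static String tolerantLookupOnOtherThread(String fallback) {
        String[] result = new String[1];
        runAndWait(() -> result[0] = RequestContext.headerOrDefault(fallback));
        return result[0];
    }

    private static void runAndWait(Runnable task) {
        Thread worker = new Thread(task);
        worker.start();
        try {
            worker.join(); // join makes the worker's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for worker", e);
        }
    }
}
```

In a Spring application, the comparable check would be whether RequestContextHolder.getRequestAttributes() returns null before reading anything from the request. Whether a fallback value is acceptable depends on what BaseRepository uses the header for, which the question does not show.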
    