I have a Kafka Consumer service
@KafkaListener(
topics = "topic1",
groupId = "cluster1",
containerFactory = "KafkaListenerContainerFactory")
public void consume(Message message) {
logger.info(String.format("Message recieved -> %s", message.getMsg()));
Long id = message.getId()
RepoDetail repoDetail = testRepo.findByID(id);
logger.info(
String.format(
"Message -> %s", repoDetail.getMessage()));
}
but when it tries to hit the mongo repo i get the following error:
No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
Any way I can call a mongoDB from Kafka Consumer?
full log stack:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.kafka.api.service.ConsumerService.consume(...)' threw exception; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2188) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2159) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2120) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2039) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1967) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1853) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1543) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1087) ~[spring-kafka-2.6.9.jar:2.6.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_212]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[?:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.context.support.WebApplicationContextUtils.currentRequestAttributes(WebApplicationContextUtils.java:313) ~[spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.context.support.WebApplicationContextUtils.access$400(WebApplicationContextUtils.java:66) ~[spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.context.support.WebApplicationContextUtils$RequestObjectFactory.getObject(WebApplicationContextUtils.java:329) ~[spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.context.support.WebApplicationContextUtils$RequestObjectFactory.getObject(WebApplicationContextUtils.java:324) ~[spring-web-5.3.8.jar:5.3.8]
at org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler.invoke(AutowireUtils.java:292) ~[spring-beans-5.3.8.jar:5.3.8]
at com.sun.proxy.$Proxy156.getHeader(Unknown Source) ~[?:?]
at com.repository.BaseRepository.getTemplate(BaseRepository.java:57) ~[repo-5.3.008.jar:?]
at com.kafka.api.repository.impl.ProductDetailRepositoryImpl.findByID(ProductDetailRepositoryImpl.java:21) ~[classes/:?]
at com.kafka.api.repository.impl.ProductDetailRepositoryImpl$$FastClassBySpringCGLIB$$824634c2.invoke(<generated>) ~[classes/:?]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.8.jar:5.3.8]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.8.jar:5.3.8]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.8.jar:5.3.8]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.8.jar:5.3.8]
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137) ~[spring-tx-5.3.8.jar:5.3.8]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.8.jar:5.3.8]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.8.jar:5.3.8]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.8.jar:5.3.8]
at com.kafka.api.repository.impl.ProductDetailRepositoryImpl$$EnhancerBySpringCGLIB$$ee673ffc.findByID(<generated>) ~[classes/:?]
at com.kafka.api.service.ConsumerService.consume(ConsumerService.java:35) ~[classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:330) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52) ~[spring-kafka-2.6.9.jar:2.6.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2139) ~[spring-kafka-2.6.9.jar:2.6.9]
... 11 more
2
Answers
Yes you can. Using a BSON Document and a perfect Write Model, you can upsert data into Mongo DB through BulkWriteResult interface.
BulkWriteResult result = getMongoClient()
Declare the below bean in your config class,