I am using an ActiveMQ broker in my bot for sending messages to my subscribers, and I am receiving this error:
2023-06-09 12:57:57,650 | INFO | Usage Manager Memory Limit (1336252826) reached on queue://VIBER, size 0. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info. | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616
2023-06-09 12:58:27,735 | INFO | Usage(default:memory:queue://VIBER:memory) percentUsage=48%, usage=646397829, limit=1336252826, percentUsageMinDelta=1%;Parent:Usage(default:memory) percentUsage=100%, usage=1336254815, limit=1336252826, percentUsageMinDelta=1%: Usage Manager Memory Limit reached. Producer (ID:49-bot-a.ukrposhta.loc-36283-1685673275857-17607:1:1:1) stopped to prevent flooding queue://VIBER. See http://activemq.apache.org/producer-flow-control.html for more info (blocking for: 8s) | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616
After some time the memory of my ActiveMQ broker runs out and I get a Broken pipe error:
Transport Connection to: tcp://10.255.102.101:35652 failed: java.net.SocketException: Broken pipe (Write failed) | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker
This is the code describing the logic for sending news in my bot:
@Override
public void sendNews(News news) {
    telegramLog.info(this.toString() + " start send news.");
    produceNews(telegramUserDao, telegramLog, QUEUE, this, news);
    //send news to users
    consumeNews(telegramLog, QUEUE, sender);
}
This is how I am starting the producers:
protected void produceNews(UserDao userDao, BotLogger log, String queueName, Bot bot, News news) {
    List<BotUser> users = userDao.getAllUsers();
    Set<String> usersId = null;
    if (news.getFilter() != null) {
        if (news.getFilter() == NewsFilter.SHKI) {
            String botType = bot.toString();
            List<ShkiMonitoring> allByBotType = monitorDao.getAllByBotType(botType);
            if (allByBotType == null || allByBotType.isEmpty()) return;
            Comparison comparison = news.getComparison();
            if (Comparison.PATTERN == comparison) {
                Pattern pattern = Pattern.compile(news.getFilterValue());
                usersId = allByBotType.stream().filter(shkiMonitoring -> {
                    Matcher matcher = pattern.matcher(shkiMonitoring.getBarcode());
                    return matcher.find();
                }).map(ShkiMonitoring::getUserId).collect(Collectors.toSet());
            }
        }
    } else {
        usersId = users.stream().map(BotUser::getId).collect(Collectors.toSet());
    }
    if (usersId == null || usersId.isEmpty()) return;
    log.info("Amount of Users: " + usersId.size());
    Set<String> newsLetters = usersId.stream()
            .map(id -> new NewsLetter(id, news))
            .map(newsLetter -> new Gson().toJson(newsLetter))
            .collect(Collectors.toSet());
    log.info("Amount of NewsLetters: " + newsLetters.size());
    //write set of users id to queue
    Thread producer = new Thread(beanFactory.getBean(NewsProducer.class, log, newsLetters, queueName, bot, news));
    producer.setName("News " + bot + " Producer Thread");
    producer.start();
    log.info("wait producer writing the messages...");
    try {
        producer.join();
    } catch (InterruptedException e) {
        log.error(e.getMessage());
    }
}
And this is how I create the consumers according to the bot type:
public void consumeNews(BotLogger log, String queueName, Object sender) {
    if (this.newsConsumer == null) {
        this.newsConsumer = beanFactory.getBean(
                NewsConsumer.class, log, queueName, sender);
    }
    LocalTime startSendTime = newsConsumer.getStartSendTime();
    LocalTime endSendTime = newsConsumer.getEndSendTime();
    if ((startSendTime == null || startSendTime.isBefore(LocalTime.now())) &&
            (endSendTime == null || endSendTime.isAfter(LocalTime.now()))) {
        log.info(sender.getClass().getSimpleName() + " starting consuming \"" + queueName + "\" queue...");
        int threadsCount = (sender instanceof FacebookSender) ? 4 : (sender instanceof TelegramSender) ? 8 : 10;
        ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
        List<Callable<Void>> tasks = new ArrayList<>();
        int i;
        for (i = 0; i < threadsCount; i++) {
            tasks.add(Executors.callable(this.newsConsumer, null));
        }
        log.info(i + " consumers created for " + queueName + " queue.");
        try {
            List<Future<Void>> futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            executorService.shutdown();
        } finally {
            if (!executorService.isShutdown()) {
                executorService.shutdown();
            }
        }
    }
    log.info("end consuming \"" + queueName + "\" queue.");
}
My NewsProducer bean has the following run method:
@Override
public void run() {
    log.info("producer start, queue name : " + queueName);
    log.info("broker url : " + brokerUrl);
    Connection connection = null;
    Session session = null;
    MessageProducer producer = null;
    try {
        log.info("producer# initializing activeMQ factory...");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        log.info("producer# initializing connection...");
        connection = factory.createConnection();
        connection.start();
        log.info("connection started " + connection.toString());
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        log.info("session started " + session.toString());
        Destination destination = session.createQueue(queueName);
        producer = session.createProducer(destination);
        log.info("producer started " + producer.toString());
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        log.info("news letters size : " + newsLetters.size());
        for (String newsLetter : newsLetters) {
            TextMessage textMessage = session.createTextMessage(newsLetter);
            producer.send(textMessage);
        }
        session.commit();
        log.info(newsLetters.size() + " news letters committed.");
        //update news
        log.info("updating news status...");
        news.setBotStatus(bot.toString(), false);
        log.info(bot.toString() + " set bot status " + news.isBotStatus(bot.toString()));
        newsDao.updateStatus(bot.toString(), news);
        log.info("News status updated for " + bot);
    } catch (Exception e) {
        log.warn(e.getMessage());
        try {
            if (session != null) {
                session.rollback();
            }
        } catch (JMSException e1) {
            log.warn(e1.getMessage());
        }
    } finally {
        try {
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            log.warn(e.getMessage());
        }
    }
}
All news for sending is saved in the database. Every half hour a thread starts which checks the queue and the news in my database. If there is news, it starts the sending process:
@Override
public void run() {
    botLoggerT.info("RUN ___ BEFORE QUEUE");
    bot.checkQueue();
    botLoggerT.info("RUN ___ AFTER QUEUE");
    List<News> news = newsDao.getNewNews(bot.toString());
    botLoggerT.info("RUN NEWS " + news.size());
    if (news != null && news.size() > 0) {
        news.forEach(n -> bot.sendNews(n));
    }
}
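The scheduling itself looks roughly like this (a simplified sketch of the half-hourly trigger; "newsSenderTask" is an illustrative name for the Runnable whose run() method is shown above):

// simplified sketch; the actual wiring in my application may differ
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(newsSenderTask, 0, 30, TimeUnit.MINUTES);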
But I cannot understand why I get this error. What am I doing wrong? I have 500,000 subscribers in Viber and 250,000 subscribers in Telegram.
These are the systemUsage settings of my ActiveMQ broker:
<systemUsage>
    <systemUsage>
        <memoryUsage>
            <memoryUsage percentOfJvmHeap="70" />
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="100 gb"/>
        </storeUsage>
        <tempUsage>
            <tempUsage limit="50 gb"/>
        </tempUsage>
    </systemUsage>
</systemUsage>
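(For reference, the memoryUsage limit of 1336252826 bytes shown in the log matches this setting: it is 70% of a JVM heap of roughly 1.9 GB, since 1336252826 / 0.70 ≈ 1908932609 bytes.)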
and this is the destinationPolicy:
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">" >
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained
                     For more information, see:
                     http://activemq.apache.org/slow-consumer-handling.html
                -->
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>
2 Answers
@Matt Pavlovich is right. The problem was in the send loop of my NewsProducer, where every message is sent and the whole batch is committed with a single session.commit(). Since there was a very large number of messages to send, this version of the code created one huge transacted batch. After changing that part of the code to commit in smaller chunks, everything was fine.
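Roughly, the send loop inside the existing try block changed from committing once at the end to committing every N messages (a simplified sketch; the chunk size of 1000 is an illustrative value, not the exact one I used):

int commitBatchSize = 1000; // illustrative chunk size
int sent = 0;
for (String newsLetter : newsLetters) {
    TextMessage textMessage = session.createTextMessage(newsLetter);
    producer.send(textMessage);
    // commit every commitBatchSize messages so the broker can dispatch them
    // and release memory instead of holding one huge uncommitted batch
    if (++sent % commitBatchSize == 0) {
        session.commit();
    }
}
// commit the remaining, possibly partial, chunk
session.commit();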
Most likely you need to commit your transacted batch in smaller chunks. Most messaging systems are geared around 10k messages as an upper end for a single transaction, and a commit size in the 128 to 1024 range is most commonly used.