skip to Main Content

I am using ActiveMQ broker in my bot for sending message to my subscribers, and I’m receiving this error:

2023-06-09 12:57:57,650 | INFO  | Usage Manager Memory Limit (1336252826) reached on queue://VIBER, size 0. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info. | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616
2023-06-09 12:58:27,735 | INFO  | Usage(default:memory:queue://VIBER:memory) percentUsage=48%, usage=646397829, limit=1336252826, percentUsageMinDelta=1%;Parent:Usage(default:memory) percentUsage=100%, usage=1336254815, limit=1336252826, percentUsageMinDelta=1%: Usage Manager Memory Limit reached. Producer (ID:49-bot-a.ukrposhta.loc-36283-1685673275857-17607:1:1:1) stopped to prevent flooding queue://VIBER. See http://activemq.apache.org/producer-flow-control.html for more info (blocking for: 8s) | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616

After in some time memory of my ActiveMQ is ending and I am getting Broken pipe:

Transport Connection to: tcp://10.255.102.101:35652 failed: java.net.SocketException: Broken pipe (Write failed) | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker

This is the code describing the logic for sending news in my bot:

@Override
public void sendNews(News news) {
    telegramLog.info(this.toString() + " start send news.");
    produceNews(telegramUserDao, telegramLog, QUEUE, this, news);

    //send news to users
    consumeNews(telegramLog, QUEUE, sender);
}

This is how I am starting the producers:

    protected void produceNews(UserDao userDao, BotLogger log, String queueName, Bot bot, News news) {
        List<BotUser> users = userDao.getAllUsers();
        Set<String> usersId = null;
        if (news.getFilter() != null) {
            if (news.getFilter() == NewsFilter.SHKI) {
                String botType = bot.toString();
                List<ShkiMonitoring> allByBotType = monitorDao.getAllByBotType(botType);
                if (allByBotType == null || allByBotType.isEmpty()) return;
                Comparison comparison = news.getComparison();
                if (Comparison.PATTERN == comparison) {
                    Pattern pattern = Pattern.compile(news.getFilterValue());
                    usersId = allByBotType.stream().filter(shkiMonitoring -> {
                        Matcher matcher = pattern.matcher(shkiMonitoring.getBarcode());
                        return matcher.find();
                    }).map(ShkiMonitoring::getUserId).collect(Collectors.toSet());
                }
            }
        } else {
            usersId = users.stream().map(BotUser::getId).collect(Collectors.toSet());
        }
        if (usersId == null || usersId.isEmpty()) return;

        log.info("Amount of Users: " + usersId.size());
        Set<String> newsLetters = usersId.stream()
                .map(id -> new NewsLetter(id, news))
                .map(newsLetter -> new Gson().toJson(newsLetter))
                .collect(Collectors.toSet());
        log.info("Amount of NewsLetters: " + newsLetters.size());
        //write set of users id to queue
        Thread producer = new Thread(beanFactory.getBean(NewsProducer.class, log, newsLetters, queueName, bot, news));
        producer.setName("News " + bot + " Producer Thread");
        producer.start();
        log.info("wait producer writing the messages...");
        try {
            producer.join();
        } catch (InterruptedException e) {
            log.error(e.getMessage());
        }
    }

And this is how I create the consumers according from bot type:

    public void consumeNews(BotLogger log, String queueName, Object sender) {
        if (this.newsConsumer == null) {
            this.newsConsumer = beanFactory.getBean(
                    NewsConsumer.class, log, queueName, sender);
        }
        LocalTime startSendTime = newsConsumer.getStartSendTime();
        LocalTime endSendTime = newsConsumer.getEndSendTime();
        if ((startSendTime == null || startSendTime.isBefore(LocalTime.now())) &&
                (endSendTime == null || endSendTime.isAfter(LocalTime.now()))) {
            log.info(sender.getClass().getSimpleName() + " starting consum "" + queueName + "" queue...");
            int threadsCount = (sender instanceof FacebookSender) ? 4 : (sender instanceof TelegramSender) ? 8 : 10;
            ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
            List<Callable<Void>> tasks = new ArrayList<>();
            int i;
            for (i = 0; i < threadsCount; i++) {
                tasks.add(Executors.callable(this.newsConsumer, null));
            }
            log.info(i + " consumers created for " + queueName + " queue.");
            try {
                List<Future<Void>> futures = executorService.invokeAll(tasks);
            } catch (InterruptedException e) {
                executorService.shutdown();
            } finally {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
            }
        }
        log.info("end consuming "" + queueName + "" queue.");
    }

I have the next method of my NewsProducer bean:

    @Override
    public void run() {
        log.info("producer start, queue name : " + queueName);
        log.info("broker url : " + brokerUrl);
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            log.info("producer# initializing activeMQ factory...");
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
            log.info("producer# initializing connection...");
            connection = factory.createConnection();
            connection.start();
            log.info("connection started " + connection.toString());
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            log.info("session started " + session.toString());
            Destination destination = session.createQueue(queueName);
            producer = session.createProducer(destination);
            log.info("producer started " + producer.toString());
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            log.info("news letters size : " + newsLetters.size());
            for (String newsLetter : newsLetters) {
                TextMessage textMessage = session.createTextMessage(newsLetter);
                producer.send(textMessage);
            }
            session.commit();
            log.info(newsLetters.size() + " news letters committed.");
            //update news
            log.info("updating news status...");
            news.setBotStatus(bot.toString(), false);
            log.info(bot.toString() + " set bot status " + news.isBotStatus(bot.toString()));
            newsDao.updateStatus(bot.toString(), news);
            log.info("News status updated for " + bot);
        } catch (Exception e) {
            log.warn(e.getMessage());
            try {
                if (session != null) {
                    session.rollback();
                }
            } catch (JMSException e1) {
                log.warn(e1.getMessage());
            }
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException e) {
                log.warn(e.getMessage());
            }
        }
    }

All news for sending are saved in the database. Every half of hour to start the thread, which checks the queue and news in my database. If news is there it start sending process:

@Override
    public void run() {
        botLoggerT.info("RUN ___ BEFORE QUEUE");
        bot.checkQueue();
        botLoggerT.info("RUN ___ AFTER QUEUE");
        List<News> news = newsDao.getNewNews(bot.toString());
        botLoggerT.info("RUN NEWS " + news.size());
        if (news != null && news.size() > 0) {
            news.forEach(n -> bot.sendNews(n));
        }
    }

But I can not undesrstand why i have this error. What am I doing wrong? In viber I have 500,000 subscribers and 250,000 subscribers in Telegram.

This is the systemUsage settings of my ActiveMQ:

          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

and destinationPolicy :

  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" >
            <!-- The constantPendingMessageLimitStrategy is used to prevent
                 slow topic consumers to block producers and affect other consumers
                 by limiting the number of messages that are retained
                 For more information, see:

                 http://activemq.apache.org/slow-consumer-handling.html

            -->
          <pendingMessageLimitStrategy>
            <constantPendingMessageLimitStrategy limit="1000"/>
          </pendingMessageLimitStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>

2

Answers


  1. Chosen as BEST ANSWER

    @Matt Pavlovich is right. The problem was here , in this part of code :

     for (String newsLetter : newsLetters) {
                    TextMessage textMessage = session.createTextMessage(newsLetter);
                    producer.send(textMessage);
                }
                session.commit();
    

    Since there were a large number of messages to send, this version of the code created a very large batch to send. By changing this part of code to as :

    for (String newsLetter : newsLetters) {
                    TextMessage textMessage = session.createTextMessage(newsLetter);
                    producer.send(textMessage);
                    countLetter++;
                    if (countLetter%1000 == 0) {
                        session.commit();System.out.println(countLetter);
                    }
                }
                session.commit();
    

    everything was fine.


  2. Most likely you need to commit your transacted batch in smaller chunks. Most messaging systems are geared around the 10k message count as an upper end for a single transaction and a commit size in the 128 to 1024 is most commonly used.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search