How to Implement Queue functionality for Product import in Magento 2
I want to implement a queue for importing a product CSV from the admin side.

I just want to add a queue so that when a user imports any product CSV, a message saying the file has been queued is returned and the import itself runs in the background.



  1. Chosen as BEST ANSWER

    I finally got this working using the steps below:

    Step 1: Declare the plugin in di.xml

    <type name="Magento\ImportExport\Controller\Adminhtml\Import\Start">
        <plugin name="import_start_plugin" type="VendorName\Product\Plugin\ImportStartPlugin" sortOrder="1"/>
    </type>

    Step 2: Create the around plugin

    public function aroundExecute(
        ImportStart $subject,
        Closure $proceed
    ): ResultInterface {
        $data = $subject->getRequest()->getPostValue();
        // Only product imports are queued; every other entity runs as usual
        if (($data['entity'] ?? null) === 'catalog_product') {
            // If a product import is already queued, do not publish another one
            $queueCheck = $this->queueHelper->getMessages('bulk_product_import');
            if ($queueCheck == 1) {
                $resultLayout = $this->resultFactory->create(\Magento\Framework\Controller\ResultFactory::TYPE_LAYOUT);
                $resultBlock = $resultLayout->getLayout()->getBlock('import.frame.result');
                $resultBlock
                    ->addAction('show', 'import_validation_container')
                    ->addAction('innerHTML', 'import_validation_container_header', __('Status'))
                    ->addAction('hide', ['edit_form', 'upload_button', 'messages']);
                $resultBlock->addSuccess(__('You have already tried to import product earlier which is in progress. Kindly wait to complete.'));
                return $resultLayout;
            }

            // Publish a message carrying the current admin user's details
            $userObj = $this->auth->getUser();
            $emailId = $userObj->getEmail();
            $adminName = $userObj->getFirstname() . " " . $userObj->getLastname();
            $dataObject = $this->exportInfoFactory->create(
                ['email' => $emailId, 'name' => $adminName]
            );
            $this->messagePublisher->publish('bulk_product_import', $dataObject);

            // Return immediately with a "queued" status instead of running the import
            $resultLayout = $this->resultFactory->create(\Magento\Framework\Controller\ResultFactory::TYPE_LAYOUT);
            $resultBlock = $resultLayout->getLayout()->getBlock('import.frame.result');
            $resultBlock
                ->addAction('show', 'import_validation_container')
                ->addAction('innerHTML', 'import_validation_container_header', __('Status'))
                ->addAction('hide', ['edit_form', 'upload_button', 'messages']);
            $resultBlock->addSuccess(__('File is added to queue, wait to get product import success Email on your Email ID'));
            return $resultLayout;
        }

        // Proceed with original import execution
        return $proceed();
    }

    Step 3: Create the consumer class whose process() method handles the published message

    public function process(ExportInfoInterface $exportInfo)
    {
        try {
            // Log progress to a dedicated file
            $writer = new \Zend_Log_Writer_Stream(BP . '/var/log/product_import_queue_log.log');
            $logger = new \Zend_Log();
            $logger->addWriter($writer);
            $startTime = date('Y-m-d H:i:s');
            $logger->info('Product import start at ' . $startTime);

            $exportFilter = $this->serializer->unserialize($exportInfo->getExportFilter());
            $userData = $this->serializer->unserialize($exportInfo->getUserData());
            $queueId = $this->queueHelper->getMessagesId('bulk_product_import');
            $exportlogModel = $this->exportlogModel->create();
            $exportlogModel = $exportlogModel->getCollection()
                ->addFieldToFilter('message_id', ['eq' => $queueId])->getLastItem();

            // Look up the admin user who started the import
            $collection = $this->userCollectionFactory->create();
            $collection->addFieldToFilter('email', $userData['email']);
            $adminUser = $collection->getFirstItem();
            $adminUserId = $adminUser->getId();

            // Fetch that user's most recent import_history row
            $connection = $this->resourceConnection->getConnection();
            $tableName = $this->resourceConnection->getTableName('import_history');
            $select = $connection->select()
                ->from($tableName, ['history_id'])
                ->where('user_id = ?', $adminUserId)
                ->order('started_at DESC'); // or use another column for ordering if applicable
            $importHistoryId = $connection->fetchOne($select);

            // ... (the condition wrapping the next two lines is not shown)
            $exportlogModel = $this->exportlogModel->create();
            $exportlogModel->setModuleName("Bulk Product Import");

            if ($exportFilter) {
                $this->importModel->setData('images_base_directory', $this->imagesDirProvider->getDirectory());
                $errorAggregator = $this->importModel->getErrorAggregator();
                try {
                    // ... (the import call itself is not shown)
                } catch (\Exception $e) {
                    // ... (error handling is not shown)
                }
                if ($this->importModel->getErrorAggregator()->hasToBeTerminated()) {
                    $logger->info(__('Maximum error count has been reached or system error is occurred!'));
                } else {
                    $noticeHtml = $this->historyModel->getSummary();
                    if ($this->historyModel->getErrorFile()) {
                        $noticeHtml .= '<div class="import-error-wrapper">' . __('Only the first 100 errors are shown. ')
                            . '<a href="'
                            . $this->createDownloadUrlImportHistoryFile($this->historyModel->getErrorFile())
                            . '">' . __('Download full report') . '</a></div>';
                    }
                    $endTime = date('Y-m-d H:i:s');
                    $logger->info('Product import success at ' . $endTime);

                    // Execution time as HH:MM:SS
                    $startDateTime = new \DateTime($startTime);
                    $endDateTime = new \DateTime($endTime);
                    $interval = $startDateTime->diff($endDateTime);
                    $hours = $interval->h;
                    $minutes = $interval->i;
                    $seconds = $interval->s;
                    $executionTime = sprintf('%02d:%02d:%02d', $hours, $minutes, $seconds);

                    // Store the duration and summary on the import_history row
                    $connection->update(
                        $tableName,
                        ['execution_time' => $executionTime, 'summary' => $noticeHtml],
                        ['history_id = ?' => $importHistoryId]
                    );
                }
            }
        } catch (LocalizedException | FileSystemException $exception) {
            $logger->info('Something went wrong during the import process. ' . $exception->getMessage());
            $this->logger->critical('Something went wrong during the import process. ' . $exception->getMessage());
        }
    }

    Then create the other files the queue needs, such as communication.xml, queue_publisher.xml, queue_topology.xml and queue_consumer.xml.

    I can't post the complete code here, but this should give you the idea.
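
    For reference, the remaining queue files might look roughly like the sketch below. It is only an outline under several assumptions that are not part of the code above: the MySQL-backed queue is used (connection `db`, exchange `magento-db`), and the class names `VendorName\Product\Api\Data\ExportInfoInterface` and `VendorName\Product\Model\Queue\ImportConsumer` are hypothetical placeholders for the message type and for the class containing `process()`. Only the topic name `bulk_product_import` comes from the steps above. The four `<config>` elements are shown in one listing for brevity; each one belongs in its own file under the module's `etc/` directory.

    ```xml
    <!-- etc/communication.xml: declares the topic and the message type it carries -->
    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
        <!-- request class is a hypothetical placeholder -->
        <topic name="bulk_product_import" request="VendorName\Product\Api\Data\ExportInfoInterface"/>
    </config>

    <!-- etc/queue_publisher.xml: tells the publisher which connection and exchange to use -->
    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
        <publisher topic="bulk_product_import">
            <!-- assumes the MySQL queue; switch both values if you use AMQP -->
            <connection name="db" exchange="magento-db"/>
        </publisher>
    </config>

    <!-- etc/queue_topology.xml: routes the topic to a queue of the same name -->
    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
        <exchange name="magento-db" type="topic" connection="db">
            <binding id="bulkProductImportBinding" topic="bulk_product_import"
                     destinationType="queue" destination="bulk_product_import"/>
        </exchange>
    </config>

    <!-- etc/queue_consumer.xml: binds the queue to the class and method that process messages -->
    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
        <!-- handler class is a hypothetical placeholder for the class from Step 3 -->
        <consumer name="bulk_product_import" queue="bulk_product_import" connection="db"
                  handler="VendorName\Product\Model\Queue\ImportConsumer::process"/>
    </config>
    ```

    After running `bin/magento setup:upgrade`, the consumer can be started manually with `bin/magento queue:consumers:start bulk_product_import`, or left to the consumers cron job if that is enabled on your installation.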

  2. Implementing queue functionality for product import in Magento 2 involves creating a custom module that uses Magento’s built-in Message Queue system. This allows for efficient processing of large product imports in the background without blocking the user interface.

