skip to Main Content

How to Implement Queue functionality for Product import in Magento 2
I want to implement Queue when importing product csv from admin side.

I want to just add Queue and u=when user import any csv of product then just return message with queue and in background it will imported.

2

Answers


  1. Chosen as BEST ANSWER

    Finally i completed this using below step:-

    Step 1:- Create Plugin

    <type name="MagentoImportExportControllerAdminhtmlImportStart">
            <plugin name="import_start_plugin" type="VendorNameProductPluginImportStartPlugin" sortOrder="1"/>
        </type>
    

    Step 2:- Create plugin around

    public function aroundExecute(
            ImportStart $subject,
            Closure $proceed
        ): ResultInterface {
            $data = $subject->getRequest()->getPostValue();
            if ($data['entity'] == 'catalog_product') {
                $queueCheck = $this->queueHelper->getMessages('bulk_product_import');
                if ($queueCheck == 1) {
                    $resultLayout = $this->resultFactory->create(MagentoFrameworkControllerResultFactory::TYPE_LAYOUT);
                    $resultBlock = $resultLayout->getLayout()->getBlock('import.frame.result');
                    $resultBlock
                        ->addAction('show', 'import_validation_container')
                        ->addAction('innerHTML', 'import_validation_container_header', __('Status'))
                        ->addAction('hide', ['edit_form', 'upload_button', 'messages']);
                    
                    $resultBlock->addSuccess(__('You have already tried to import product earlier which is in progress. Kindly wait to complete.'));
                    return $resultLayout;
                }
    
                $userObj = $this->auth->getUser();
                $emailId = $userObj->getEmail();
                $adminName = $userObj->getFirstname() . " " . $userObj->getLastname();
    
                $dataObject = $this->exportInfoFactory->create(
                    "csv",
                    "product_import",
                    $data,
                    ['email' => $emailId, 'name' => $adminName]
                );
    
                $this->messagePublisher->publish('bulk_product_import', $dataObject);
    
                $resultLayout = $this->resultFactory->create(MagentoFrameworkControllerResultFactory::TYPE_LAYOUT);
                $resultBlock = $resultLayout->getLayout()->getBlock('import.frame.result');
                $resultBlock
                    ->addAction('show', 'import_validation_container')
                    ->addAction('innerHTML', 'import_validation_container_header', __('Status'))
                    ->addAction('hide', ['edit_form', 'upload_button', 'messages']);
                
                $resultBlock->addSuccess(__('File is added to queue, wait to get product import success Email on your Email ID'));
                return $resultLayout;
            }
    
            // Proceed with original import execution
            return $proceed();
        }
    

    Step 3 : - Create process class for call publisher

    public function process(ExportInfoInterface $exportInfo)
        {
            try {
                $writer = new Zend_Log_Writer_Stream(BP . '/var/log/product_import_queue_log.log');
                $logger = new Zend_Log();
                $logger->addWriter($writer);
    
                date_default_timezone_set('Asia/Kolkata');
                $startTime = date('Y-m-d H:i:s');
    
                $logger->info('Product import start at '. $startTime);
    
                $exportFilter = $this->serializer->unserialize($exportInfo->getExportFilter());
                $userData = $this->serializer->unserialize($exportInfo->getUserData());
                $queueId = $this->queueHelper->getMessagesId('bulk_product_import');
                $exportlogModel = $this->exportlogModel->create();
                $exportlogModel  = $exportlogModel->getCollection()
                    ->addFieldToFilter('message_id', ['eq' => $queueId])->getLastItem();
                $collection = $this->userCollectionFactory->create();
                $collection->addFieldToFilter('email', $userData['email']);
                $collection->setPageSize(1);
                $adminUser = $collection->getFirstItem();
                $adminUserId = $adminUser->getId();
                $connection = $this->resourceConnection->getConnection();
                $tableName = $this->resourceConnection->getTableName('import_history');
                $select = $connection->select()
                    ->from($tableName, ['history_id'])
                    ->where('user_id = ?', $adminUserId)
                    ->order('started_at DESC') // or use another column for ordering if applicable
                    ->limit(1);
                $importHistoryId = $connection->fetchOne($select);
                if(empty($exportlogModel->getData())){
                    $exportlogModel = $this->exportlogModel->create();
                    $exportlogModel->setUserName($userData['name']);
                    $exportlogModel->setUserEmail($userData['email']);
                    $exportlogModel->setModuleName("Bulk Product Import");
                    $exportlogModel->setEntityType("catalog_product");
                    $exportlogModel->setMessageId($queueId);
                    $exportlogModel->setUserFilters($exportInfo->getExportFilter());
                    $exportlogModel->setSentEmail(0);
                    $exportlogModel->save();
                } 
                if ($exportFilter) {
                    $this->importModel->setData($exportFilter);
                    $this->importModel->setData('images_base_directory', $this->imagesDirProvider->getDirectory());
                    
                    $errorAggregator = $this->importModel->getErrorAggregator();
                    $errorAggregator->initValidationStrategy(
                        $this->importModel->getData(Import::FIELD_NAME_VALIDATION_STRATEGY),
                        $this->importModel->getData(Import::FIELD_NAME_ALLOWED_ERROR_COUNT)
                    );
                    
                    try {
                        $this->importModel->importSource();
                    } catch (Exception $e) {
                        $logger->info($e->getMessage());
                    }
    
                    if ($this->importModel->getErrorAggregator()->hasToBeTerminated()) {
                        $logger->info(__('Maximum error count has been reached or system error is occurred!'));
                    } else {
                        $this->importModel->invalidateIndex();
        
                        $noticeHtml = $this->historyModel->getSummary();
        
                        if ($this->historyModel->getErrorFile()) {
                            $noticeHtml .=  '<div class="import-error-wrapper">' . __('Only the first 100 errors are shown. ')
                                            . '<a href="'
                                            . $this->createDownloadUrlImportHistoryFile($this->historyModel->getErrorFile())
                                            . '">' . __('Download full report') . '</a></div>';
                            $logger->info($noticeHtml);
                        } 
                        date_default_timezone_set('Asia/Kolkata');
                        $endTime = date('Y-m-d H:i:s');
                        $logger->info('Product import success at ' . $endTime);
                        $startDateTime = new DateTime($startTime);
                        $endDateTime = new DateTime($endTime);
    
                        $interval = $startDateTime->diff($endDateTime);
                        $hours = $interval->h;
                        $minutes = $interval->i;
                        $seconds = $interval->s;
                        $executionTime =  sprintf('%02d:%02d:%02d', $hours, $minutes, $seconds);
                        $connection->update(
                            $tableName,
                            ['execution_time' => $executionTime, 'summary' => $noticeHtml],
                            ['history_id = ?' => $importHistoryId]
                        );
                    }
                }
        
            } catch (LocalizedException | FileSystemException $exception) {
                $logger->info('Something went wrong while export process. ' . $exception->getMessage());
                $this->logger->critical('Something went wrong while export process. ' . $exception->getMessage());
            }
        }
    

    ANd then create other necessary file related to queue

    I can't post complete code here But you will get idea using this code


  2. Implementing queue functionality for product import in Magento 2 involves creating a custom module that uses Magento’s built-in Message Queue system. This allows for efficient processing of large product imports in the background without blocking the user interface.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search