gerFactory; $this->cronHelper = $cronHelper; $this->links = $links; $this->scheduledTasksRepository = $scheduledTasksRepository; $this->scheduledTaskSubscribersRepository = $scheduledTaskSubscribersRepository; $this->subscribersRepository = $subscribersRepository; $this->sendingQueuesRepository = $sendingQueuesRepository; $this->entityManager = $entityManager; $this->statisticsNewslettersRepository = $statisticsNewslettersRepository; $this->authorizedEmailsController = $authorizedEmailsController; } public function process($timer = false) { $timer = $timer ?: microtime(true); $this->enforceSendingAndExecutionLimits($timer); foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $task) { $queue = $task->getSendingQueue(); if (!$queue) { continue; } if ($task->getInProgress()) { if ($this->isTimeout($task)) { $this->stopProgress($task); } else { continue; } } $this->startProgress($task); try { $this->scheduledTasksRepository->touchAllByIds([$task->getId()]); $this->processSending($task, (int)$timer); } catch (\Exception $e) { $this->stopProgress($task); throw $e; } $this->stopProgress($task); } } private function processSending(ScheduledTaskEntity $task, int $timer): void { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'sending queue processing', ['task_id' => $task->getId()] ); $this->deleteTaskIfNewsletterDoesNotExist($task); $queue = $task->getSendingQueue(); $newsletter = $this->newsletterTask->getNewsletterFromQueue($task); if (!$queue || !$newsletter) { return; } // pre-process newsletter (render, replace shortcodes/links, etc.) $newsletter = $this->newsletterTask->preProcessNewsletter($newsletter, $task); // During pre-processing we may find that the newsletter can't be sent and we delete it including all associated entities // E.g. post notification history newsletter when there are no posts to send if (!$newsletter) { return; } $isTransactional = in_array($newsletter->getType(), [ NewsletterEntity::TYPE_AUTOMATION_TRANSACTIONAL, NewsletterEntity::TYPE_WC_TRANSACTIONAL_EMAIL, ]); // configure mailer $this->mailerTask->configureMailer($newsletter); // get newsletter segments $newsletterSegmentsIds = $newsletter->getSegmentIds(); $segmentIdsToCheck = $newsletterSegmentsIds; $filterSegmentId = $newsletter->getFilterSegmentId(); if (is_int($filterSegmentId)) { $segmentIdsToCheck[] = $filterSegmentId; } // Pause task in case some of related segments was deleted or trashed if ($newsletterSegmentsIds && !$this->checkDeletedSegments($segmentIdsToCheck)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'pause task in sending queue due deleted or trashed segment', ['task_id' => $task->getId()] ); $task->setStatus(ScheduledTaskEntity::STATUS_PAUSED); $this->scheduledTasksRepository->flush(); $this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->getSubject()); return; } // Pause task if sender domain requirements are not met if (!$this->authorizedEmailsController->isSenderAddressValid($newsletter, 'sending')) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'pause task in sending queue due to sender domain requirements', ['task_id' => $task->getId()] ); $task->setStatus(ScheduledTaskEntity::STATUS_PAUSED); $this->scheduledTasksRepository->flush(); return; } // get subscribers $subscriberBatches = new BatchIterator($task->getId(), $this->getBatchSize()); // Set invalid state for sending task for non-campaign (no-bulk) newsletters with no subscribers (e.g. welcome emails, automatic emails). // This cover cases when a welcome or automatic email was scheduled but before processing it the subscriber was deleted. // The non-campaign emails are sent only to a single recipient, and we count stats based on sending tasks statues, so we can't mark them as completed. // At the same time we want to keep a record abut processing them if ($subscriberBatches->count() === 0 && !in_array($newsletter->getType(), NewsletterEntity::CAMPAIGN_TYPES, true)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'no subscribers to process', ['task_id' => $task->getId()] ); $this->scheduledTasksRepository->invalidateTask($task); return; } /** @var int[] $subscribersToProcessIds - it's required for PHPStan */ foreach ($subscriberBatches as $subscribersToProcessIds) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'subscriber batch processing', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'subscriber_batch_count' => count($subscribersToProcessIds)] ); if (!empty($newsletterSegmentsIds[0])) { // Check that subscribers are in segments try { $foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds, $filterSegmentId); } catch (InvalidStateException $exception) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'paused task in sending queue due to problem finding subscribers: ' . $exception->getMessage(), ['task_id' => $task->getId()] ); $task->setStatus(ScheduledTaskEntity::STATUS_PAUSED); $this->scheduledTasksRepository->flush(); return; } $foundSubscribers = empty($foundSubscribersIds) ? [] : $this->subscribersRepository->findBy(['id' => $foundSubscribersIds, 'deletedAt' => null]); } else { // No segments = Welcome emails or some Automatic emails. // Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option $queryBuilder = $this->entityManager->createQueryBuilder(); $queryBuilder->select('s') ->from(SubscriberEntity::class, 's') ->where('s.id IN (:subscriberIds)') ->setParameter('subscriberIds', $subscribersToProcessIds) ->andWhere('s.deletedAt IS NULL'); if ($newsletter->getType() === NewsletterEntity::TYPE_AUTOMATION_TRANSACTIONAL) { $queryBuilder->andWhere('s.status != :bouncedStatus') ->setParameter('bouncedStatus', SubscriberEntity::STATUS_BOUNCED); } else { $queryBuilder->andWhere('s.status = :subscribedStatus') ->setParameter('subscribedStatus', SubscriberEntity::STATUS_SUBSCRIBED); } $foundSubscribers = $queryBuilder->getQuery()->getResult(); $foundSubscribersIds = array_map(function(SubscriberEntity $subscriber) { return $subscriber->getId(); }, $foundSubscribers); } // if some subscribers weren't found, remove them from the processing list if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) { $subscribersToRemove = array_diff( $subscribersToProcessIds, $foundSubscribersIds ); $this->scheduledTaskSubscribersRepository->deleteByScheduledTaskAndSubscriberIds($task, $subscribersToRemove); $this->sendingQueuesRepository->updateCounts($queue); // if there aren't any subscribers to process in the batch (e.g. all unsubscribed or were deleted), continue with the next batch if (count($foundSubscribersIds) === 0) { continue; } } $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'before queue chunk processing', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'found_subscribers_count' => count($foundSubscribers)] ); // reschedule bounce task to run sooner, if needed $this->reScheduleBounceTask(); // Check task has not been paused before continue processing // This is needed because the task can be paused in the middle of the batch processing, // for example on API error ERROR_MESSAGE_BULK_EMAIL_FORBIDDEN if ($task->getStatus() === ScheduledTaskEntity::STATUS_PAUSED) { return; } if ($newsletter->getStatus() !== NewsletterEntity::STATUS_CORRUPT) { $this->processQueue( $task, $newsletter, $foundSubscribers, $timer ); if (!$isTransactional) { $this->entityManager->wrapInTransaction(function() use ($foundSubscribersIds) { $now = Carbon::createFromTimestamp((int)current_time('timestamp')); $this->subscribersRepository->bulkUpdateLastSendingAt($foundSubscribersIds, $now); // We're nullifying this value so these subscribers' engagement score will be recalculated the next time the cron runs $this->subscribersRepository->bulkUpdateEngagementScoreUpdatedAt($foundSubscribersIds, null); }); } $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'after queue chunk processing', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); // In case we finished end sending properly before enforcing sending and execution limits // The limit enforcing throws and exception and the sending end wouldn't be processed properly (stats notification, newsletter marked as sent etc.) if ($task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED) { $this->endSending($task, $newsletter); return; } $this->enforceSendingAndExecutionLimits($timer); } else { $this->sendingQueuesRepository->pause($queue); $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'Can\'t send corrupt newsletter', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); return; } } // At this point all batches were processed or there are no batches to process // Also none of the checks above paused or invalidated the task $this->endSending($task, $newsletter); } public function getBatchSize(): int { return $this->throttlingHandler->getBatchSize(); } /** * @param SubscriberEntity[] $subscribers */ public function processQueue(ScheduledTaskEntity $task, NewsletterEntity $newsletter, array $subscribers, $timer) { // determine if processing is done in bulk or individually $processingMethod = $this->mailerTask->getProcessingMethod(); $preparedNewsletters = []; $preparedSubscribers = []; $preparedSubscribersIds = []; $unsubscribeUrls = []; $statistics = []; $metas = []; $oneClickUnsubscribeUrls = []; $sendingQueueEntity = $task->getSendingQueue(); if (!$sendingQueueEntity) { return; } $sendingQueueMeta = $sendingQueueEntity->getMeta() ?? []; $campaignId = $sendingQueueMeta['campaignId'] ?? null; foreach ($subscribers as $subscriber) { // render shortcodes and replace subscriber data in tracked links $preparedNewsletters[] = $this->newsletterTask->prepareNewsletterForSending( $newsletter, $subscriber, $sendingQueueEntity ); // format subscriber name/address according to mailer settings $preparedSubscribers[] = $this->mailerTask->prepareSubscriberForSending( $subscriber ); $preparedSubscribersIds[] = $subscriber->getId(); // create personalized instant unsubsribe link $unsubscribeUrls[] = $this->links->getUnsubscribeUrl($sendingQueueEntity->getId(), $subscriber); $oneClickUnsubscribeUrls[] = $this->links->getOneClickUnsubscribeUrl($sendingQueueEntity->getId(), $subscriber); $metasForSubscriber = $this->mailerMetaInfo->getNewsletterMetaInfo($newsletter, $subscriber); if ($campaignId) { $metasForSubscriber['campaign_id'] = $campaignId; } $metas[] = $metasForSubscriber; // keep track of values for statistics purposes $statistics[] = [ 'newsletter_id' => $newsletter->getId(), 'subscriber_id' => $subscriber->getId(), 'queue_id' => $sendingQueueEntity->getId(), ]; if ($processingMethod === 'individual') { $this->sendNewsletter( $task, $preparedSubscribersIds[0], $preparedNewsletters[0], $preparedSubscribers[0], $statistics[0], $timer, [ 'unsubscribe_url' => $unsubscribeUrls[0], 'meta' => $metas[0], 'one_click_unsubscribe' => $oneClickUnsubscribeUrls, ] ); $preparedNewsletters = []; $preparedSubscribers = []; $preparedSubscribersIds = []; $unsubscribeUrls = []; $oneClickUnsubscribeUrls = []; $statistics = []; $metas = []; } } if ($processingMethod === 'bulk') { $this->sendNewsletters( $task, $preparedSubscribersIds, $preparedNewsletters, $preparedSubscribers, $statistics, $timer, [ 'unsubscribe_url' => $unsubscribeUrls, 'meta' => $metas, 'one_click_unsubscribe' => $oneClickUnsubscribeUrls, ] ); } } public function sendNewsletter( ScheduledTaskEntity $task, $preparedSubscriberId, $preparedNewsletter, $preparedSubscriber, $statistics, $timer, $extraParams = [] ) { // send newsletter $sendResult = $this->mailerTask->send( $preparedNewsletter, $preparedSubscriber, $extraParams ); $this->processSendResult( $task, $sendResult, [$preparedSubscriber], [$preparedSubscriberId], [$statistics], $timer ); } public function sendNewsletters( ScheduledTaskEntity $task, $preparedSubscribersIds, $preparedNewsletters, $preparedSubscribers, $statistics, $timer, $extraParams = [] ) { // send newsletters $sendResult = $this->mailerTask->sendBulk( $preparedNewsletters, $preparedSubscribers, $extraParams ); $this->processSendResult( $task, $sendResult, $preparedSubscribers, $preparedSubscribersIds, $statistics, $timer ); } /** * Checks whether some of segments was deleted or trashed * @param int[] $segmentIds */ private function checkDeletedSegments(array $segmentIds): bool { if (count($segmentIds) === 0) { return true; } $segmentIds = array_unique($segmentIds); $segments = $this->segmentsRepository->findBy(['id' => $segmentIds]); // Some segment was deleted from DB if (count($segmentIds) > count($segments)) { return false; } foreach ($segments as $segment) { if ($segment->getDeletedAt() !== null) { return false; } } return true; } private function processSendResult( ScheduledTaskEntity $task, $sendResult, array $preparedSubscribers, array $preparedSubscribersIds, array $statistics, $timer ) { // log error message and schedule retry/pause sending if ($sendResult['response'] === false) { $error = $sendResult['error']; $this->errorHandler->processError($error, $task, $preparedSubscribersIds, $preparedSubscribers); } else { $queue = $task->getSendingQueue(); if (!$queue) { return; } try { $this->scheduledTaskSubscribersRepository->updateProcessedSubscribers($task, $preparedSubscribersIds); $this->sendingQueuesRepository->updateCounts($queue); } catch (Throwable $e) { MailerLog::processError( 'processed_list_update', sprintf('QUEUE-%d-PROCESSED-LIST-UPDATE', $queue->getId()), null, true ); } } // log statistics $this->statisticsNewslettersRepository->createMultiple($statistics); // update the sent count $this->mailerTask->updateSentCount(); // enforce execution limits if queue is still being processed if ($task->getStatus() !== ScheduledTaskEntity::STATUS_COMPLETED) { $this->enforceSendingAndExecutionLimits($timer); } // trigger automation email sent hook for automation emails if ( $task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED && isset($task->getMeta()['automation']) ) { try { $this->wp->doAction('mailpoet_automation_email_sent', $task->getMeta()['automation']); } catch (Throwable $e) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'Error while executing "mailpoet_automation_email_sent action" hook', ['task_id' => $task->getId(), 'error' => $e->getMessage()] ); } } $this->throttlingHandler->processSuccess(); } public function enforceSendingAndExecutionLimits($timer) { // abort if execution limit is reached $this->cronHelper->enforceExecutionLimit($timer); // abort if sending limit has been reached MailerLog::enforceExecutionRequirements(); } private function reScheduleBounceTask() { $bounceTasks = $this->scheduledTasksRepository->findFutureScheduledByType(Bounce::TASK_TYPE); if (count($bounceTasks)) { $bounceTask = reset($bounceTasks); if (Carbon::createFromTimestamp((int)current_time('timestamp'))->addHours(42)->lessThan($bounceTask->getScheduledAt())) { $randomOffset = rand(-6 * 60 * 60, 6 * 60 * 60); $bounceTask->setScheduledAt(Carbon::createFromTimestamp((int)current_time('timestamp'))->addSeconds((36 * 60 * 60) + $randomOffset)); $this->scheduledTasksRepository->persist($bounceTask); $this->scheduledTasksRepository->flush(); } } } private function startProgress(ScheduledTaskEntity $task): void { $task->setInProgress(true); $this->scheduledTasksRepository->flush(); } private function stopProgress(ScheduledTaskEntity $task): void { // if task is not managed by entity manager, it's already deleted and detached // it can be deleted in self::processSending method if (!$this->entityManager->contains($task)) { return; } $task->setInProgress(false); $this->scheduledTasksRepository->flush(); } private function isTimeout(ScheduledTaskEntity $task): bool { $currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); $updatedAt = new Carbon($task->getUpdatedAt()); if ($updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) { return true; } return false; } private function getExecutionLimit(): int { return $this->cronHelper->getDaemonExecutionLimit() * 3; } private function deleteTaskIfNewsletterDoesNotExist(ScheduledTaskEntity $task) { $queue = $task->getSendingQueue(); $newsletter = $queue ? $queue->getNewsletter() : null; if ($newsletter !== null) { return; } $this->deleteTask($task); } private function deleteTask(ScheduledTaskEntity $task) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'delete task in sending queue', ['task_id' => $task->getId()] ); $queue = $task->getSendingQueue(); if ($queue) { $this->sendingQueuesRepository->remove($queue); } $this->scheduledTaskSubscribersRepository->deleteByScheduledTask($task); $this->scheduledTasksRepository->remove($task); $this->scheduledTasksRepository->flush(); } private function endSending(ScheduledTaskEntity $task, NewsletterEntity $newsletter): void { // We should handle all transitions into these states in the processSending method and end processing there or we throw an exception // This might theoretically happen when multiple cron workers are running in parallel which we don't support and try to prevent $unexpectedStates = [ ScheduledTaskEntity::STATUS_PAUSED, ScheduledTaskEntity::STATUS_INVALID, ScheduledTaskEntity::STATUS_SCHEDULED, ]; if (in_array($task->getStatus(), $unexpectedStates)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'Sending task reached end of processing in sending queue worker in an unexpected state.', ['task_id' => $task->getId(), 'status' => $task->getStatus()] ); return; } // The task is running but there is no one to send to. // This may happen when we send to all but the execution is interrupted (e.g. by PHP time limit) and we don't update the task status // or if we trigger sending to a newsletter without any subscriber (e.g. scheduled for long time but all were deleted) // Lets set status to completed and update the queue counts if ($task->getStatus() === null && $this->scheduledTaskSubscribersRepository->countUnprocessed($task) === 0) { $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED); $queue = $task->getSendingQueue(); if ($queue) { $this->sendingQueuesRepository->updateCounts($queue); } $this->scheduledTasksRepository->flush(); } // Task is completed let's do all the stuff for the completed task if ($task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'completed newsletter sending', ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); $this->newsletterTask->markNewsletterAsSent($newsletter); $this->statsNotificationsScheduler->schedule($newsletter); } } }
Fatal error: Uncaught Error: Class "MailPoet\Cron\Workers\SendingQueue\SendingQueue" not found in /htdocs/wp-content/plugins/mailpoet/lib/Util/Notices/EmailWithInvalidSegmentNotice.php:13 Stack trace: #0 /htdocs/wp-content/plugins/mailpoet/generated/FreeCachedContainer.php(2640): MailPoet\Util\Notices\PermanentNotices->__construct(Object(MailPoet\WP\Functions), Object(MailPoet\Settings\TrackingConfig), Object(MailPoet\Subscribers\SubscribersRepository), Object(MailPoet\Settings\SettingsController), Object(MailPoet\Util\License\Features\Subscribers), Object(MailPoet\Config\ServicesChecker), Object(MailPoet\Mailer\MailerFactory), Object(MailPoet\Util\Notices\SenderDomainAuthenticationNotices)) #1 /htdocs/wp-content/plugins/mailpoet/vendor-prefixed/symfony/dependency-injection/Container.php(122): MailPoetGenerated\FreeCachedContainer->getInitializerService() #2 /htdocs/wp-content/plugins/mailpoet/vendor-prefixed/symfony/dependency-injection/Container.php(110): MailPoetVendor\Symfony\Component\DependencyInjection\Container->make('MailPoet\\Config...', 1) #3 /htdocs/wp-content/plugins/mailpoet/lib/DI/ContainerWrapper.php(39): MailPoetVendor\Symfony\Component\DependencyInjection\Container->get('MailPoet\\Config...') #4 /htdocs/wp-content/plugins/mailpoet/mailpoet_initializer.php(89): MailPoet\DI\ContainerWrapper->get('MailPoet\\Config...') #5 /htdocs/wp-content/plugins/mailpoet/mailpoet.php(194): require_once('/htdocs/wp-cont...') #6 /htdocs/wp-settings.php(526): include_once('/htdocs/wp-cont...') #7 /htdocs/wp-config.php(85): require_once('/htdocs/wp-sett...') #8 /htdocs/wp-load.php(50): require_once('/htdocs/wp-conf...') #9 /htdocs/wp-blog-header.php(13): require_once('/htdocs/wp-load...') #10 /htdocs/index.php(17): require('/htdocs/wp-blog...') #11 {main} thrown in /htdocs/wp-content/plugins/mailpoet/lib/Util/Notices/EmailWithInvalidSegmentNotice.php on line 13