skip to Main Content

I need to establish communication in PHP with a websocket server that is hosted on a node. The first part is to establish a connection to listen for responses from the ws server. The second is to be able to send my own commands and track responses in the listener from the first part. I am using the symfony project and the ratchet/pawl package.

I have created such a client to establish a connection and send messages:

<?php

namespace AppWebSocket;

use RatchetClientWebSocket;
use RatchetClientConnector;
use ReactEventLoopLoop;
use PsrLogLoggerInterface;

class WebsocketClient
{
    private $connection;
    private $loop;
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->loop = Loop::get();
        $this->logger = $logger;
    }

    public function connect(string $url)
    {
        $connector = new Connector($this->loop);

        $connector($url)->then(function (WebSocket $conn) {
            $this->connection = $conn;
            dump('Connected to WebSocket');

            $conn->on('message', function ($msg) {
                dump("Received message: {$msg}");
                $this->onMessage($msg);
            });

            $conn->on('close', function ($code = null, $reason = null) {
                dump('Connection closed ({$code} - {$reason})');
            });
        }, function (Exception $e) {
            dump("Could not connect: {$e->getMessage()}");
        });
    }

    public function send(string $message)
    {
        if ($this->connection) {
            $this->connection->send($message);
        } else {
            echo "No connection established.n";
        }
    }

    private function onMessage($message)
    {
        $this->logger->info("Received message: {$message}");
    }

    public function run()
    {
        $this->loop->run();
    }

    public function close()
    {
        if ($this->connection) {
            $this->connection->close();
        }
    }
}

I also created a symfony command that sets up the ws server listening:

<?php

namespace AppCommand;

use AppWebSocketWebsocketClient;
use SymfonyComponentConsoleCommandCommand;
use SymfonyComponentConsoleInputInputInterface;
use SymfonyComponentConsoleOutputOutputInterface;
use SymfonyComponentConsoleStyleSymfonyStyle;

class WebsocketListenCommand extends Command
{
    protected static $defaultName = 'app:listen-websocket';
    private $webSocketClient;

    public function __construct(WebsocketClient $webSocketClient)
    {
        parent::__construct();
        $this->webSocketClient = $webSocketClient;
    }

    protected function configure(): void
    {
        $this
            ->setDescription('Listens for and processes messages from WebSocket');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $io->success('Listening for WebSocket messages...');

        $this->webSocketClient->connect('ws://x.x.x.x:y');

        $this->webSocketClient->run();

        return Command::SUCCESS;
    }

}

After running the above command, everything works fine. It receives responses in real time.

I would then like to send messages to the websocket server and track the response (while keeping the listening I set up earlier). It injects the service anywhere in the code and tries to send a message:

$this->webSocketClient->send('{"some json, doesnt matter"}');

When trying to use the send method, it doesn’t see any connection and can’t send the message. I have tried setting up the connection myself during send, but then my listener stops responding. Where does the problem lie? Perhaps ZeroMQ or some other solution needs to be used here? I would appreciate your help.

2

Answers


  1. Chosen as BEST ANSWER

    Ok it looks like using ZeroMQ has solved the problem. It saves the collated WS connection, which I can then use anywhere in the code as I wanted.

    I created a pusher class with a method to distribute messages to all collated connections (onOpen):

    class Pusher implements MessageComponentInterface
    {
    protected SplObjectStorage $clients;
    ...
    public function onOpen($conn): void
    {
        // Store the new connection to send messages to later
        $this->clients->attach($conn);
    
        dump("New connection!");
    }
    ...
    public function onPushingOutMessage($entry): void
    {
        if (is_array($entry)) {
            $entry = json_encode($entry);
        }
    
        /** @var WebSocket $client */
        foreach ($this->clients as $client) {
            $client->send($entry);
        }
    
        dump("Pushed message $entry to all connected clients");
    }
    ...
    

    I have added a ZeroMQ class that connects to WS:

    class ZeroMQTransport
    {
        // Name of method that MessageComponent interfaces should implement to handle pushed updates to websocket clients
        public const string PUSH_HANDLER = 'onPushingOutMessage';
        public const string DSN = 'tcp://127.0.0.1:XXXX';// XXXX - TCP port
    
        private ZMQSocket $socket;
    
        public function __construct(private readonly PayloadBuilder $messageBuilder)
        {
            $context = new ZMQContext();
            $socket = $context->getSocket(ZMQ::SOCKET_PUSH, static::PUSH_HANDLER);
            $socket->connect(static::DSN);
    
            $this->socket = $socket;
        }
    
        public function send(MessageInterface $message): void
        {
            $payload = $this->messageBuilder->build($message);
            try {
                $this->socket->send($payload);
            } catch (ZMQSocketException $e) {
                echo $e->getMessage();
            }
        }
    
        public function __destruct()
        {
            $this->socket->disconnect(self::DSN);
        }
    
        private function __clone() {}
    
        public function __wakeup()
        {
            throw new Exception("Cannot unserialize singleton");
        }
    }
    

    An updated connect method in the WS client:

    public function connect(): void
    {
        // Listen for the web server to make a ZeroMQ push
        $context = new Context($this->loop);
        $pull = $context->getSocket(ZMQ::SOCKET_PULL);
        try {
            $pull->bind(ZeroMQTransport::DSN);
        } catch (ZMQSocketException $e) {
            $this->logger->error('Error in initializing the socket listener: ' . $e->getMessage() . "nStack: " . $e->getTraceAsString());
        }
    
        $pull->on('message', function ($payload) {
            $this->pusher->onPushingOutMessage($payload);
        });
    
        $connector = new Connector($this->loop);
    
        $connector($this->url)->then(function (WebSocket $conn) {
            $this->logger->info("Connected to WebSocket");
            $conn->on('message', function ($msg) {
                dump("Received message: $msg");
                $this->logger->info("Received message: $msg");
                $this->onMessage($msg);
            });
    
            $conn->on('error', function (Exception $exception) use ($conn) {
                $this->logger->error("Connection error");
                $this->pusher->onError($conn, $exception);
            });
    
            $conn->on('close', function ($code = null, $reason = null) use ($conn) {
                $this->logger->error("Connection closed ($code - $reason)");
                $this->pusher->onClose($conn);
            });
    
            $this->connection = $conn;
            $this->pusher->onOpen($conn);
        }, function (Exception $e) {
            $this->logger->critical("Could not connect: {$e->getMessage()}");
        });
    }
    

    Now I can send a WS message and catch the response in my listener:

    $this->zeroMQTransport->send($message);
    

  2. It injects the service anywhere in the code

    You’re getting a different PHP object for each instantiation. So, unless you’re explicitly calling connect() with the new object, you’re not getting a connection at all, and send() won’t possibly work.

    If you are explicitly calling connect() in the new object, then you’re getting a different connection. This may or may not be ok for your requirements, it depends what you’re trying to do.

    So, if you need to both receive and send, your best bet would probably be to have one PHP client script act as a long-lived listener. It would just sit and wait to receive messages. Then when you need to send a message, you’d create a new client object, make a new connection, send the message, and then immediately disconnect. If the receiving side needs to know which client sent the message, that information would have to be embedded in the message, because it would come from a different connection.

    Alternatively, you might be able to simply use an HTTP POST to send your messages.

    Also Alternatively, you might be able to use SSE via Mercure.

    If none of those is possible, then you’ve got a non-trivial problem to work around. Whatever data you need to build the message that you want to send will need to be somehow passed into the long-lived listener script, so that it can perform the send using the same connection that it already has open. This could be done with a lightweight message queue, so your infinite listener loop would listen for websocket messages and then check the queue for new pending sends, and send anything it finds there. This is rather roundabout though, and you should be able to make use of one of the simpler methods.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search