skip to Main Content

I am using a composer package owlycode/streaming-bird to call twitter stream API. The stream API opens a socket between your app and twitter, to receive tweets that have a specified keyword. In my case the keyword is ‘hello’.

Here is the code using owlycode/streaming-bird package:

 <?PHP
    $oauthToken = '';
    $oauthSecret = '';
    $consumerKey = '';
    $consumerSecret = '';

    $bird = new StreamingBird($consumerKey, $consumerSecret, $oauthToken, $oauthSecret);

    $bird
        ->createStreamReader(StreamReader::METHOD_FILTER)
        ->setTrack(['hello']) // Fetch every tweet containing one of the following words
        ->consume(function ($tweet) { // Now we provide a callback to execute on every received tweet.
            echo '------------------------' . "n";
            echo $tweet['text'] . "n";
        });
  ?>

My problem is when this connection is closed by error, I am unable to know that. So I am unable to reconnect with twitter again.

Is there anything in PHP that searches open sockets based on their domain name?

Maybe something like

  check_if_socket_open('https://stream.twitter.com/1.1/statuses/firehose.json')

?

Note: I cannot use socket_get_status, because I don’t have the socket variable.

3

Answers


  1. It looks like you can use socket_get_status after all if you just make a small addition in the package itself.

    These two functions are in streamreader class, socket handler is available here.

    public function consume(callable $handler)
    {
        $this->running = true;
        while ($this->running) { /// while $this->running is true socket will try to reconnect always. 
            $this->consumeOnce($handler);
        }
    }
        protected function consumeOnce(callable $handler)
        {
            $this->connection = $this->connect();
            $lastStreamActivity = time();
            $this->connection->read(function ($tweet) use (&$lastStreamActivity, $handler) {
                $idle = (time() - $lastStreamActivity);
                $this->monitor->stat('max_idle_time', $idle);
                $this->monitor->stat('idle_time', $idle);
                $this->monitor->stat('tweets', 1);
                $lastStreamActivity = time();
                call_user_func($handler, $tweet, $this->monitor);
            });
            $this->connection->close();
        }
    

    In connection class you have the socket handler available so you can grab socket status when trying to read data from socket. Below is a slightly modified read function

    public function read(callable $callback, $timeout = 5)
            {
                $this->pool = [$this->connection];
        stream_set_timeout($this->connection, $timeout);
        $info = stream_get_meta_data($this->connection);
                while ($this->connection !== null && !feof($this->connection) && stream_select($this->pool, $fdw, $fde, $timeout) !== false && $info['timed_out']!==true) {
                    // @todo safeguard no tweets but connection OK. (reconnect)
                    $this->pool = [$this->connection];
                    $chunkInfo = trim(fgets($this->connection));
                    if (!$chunkInfo) {
                        continue;
                    }
                    $len = hexdec($chunkInfo) + 2;
                    $streamInput = '';
                    while (!feof($this->connection)) {
                        $streamInput .= fread($this->connection, $len-strlen($streamInput));
                        if (strlen($streamInput)>=$len) {
                            break;
                        }
                    }
                    $this->buffer .= substr($streamInput, 0, -2);
                    $data = json_decode($this->buffer, true);
                    if ($data) {
                        call_user_func($callback, $data);
                        $this->buffer = '';
                    }
                }
            }
    
    Login or Signup to reply.
  2. There is no way to check socket status, if you’ve no access to the socket.

    If you’re searching for a workaround without touching StreamBird’s code, then you can make a class based on OwlyCodeStreamingBird, then implement its connect method:

    <?php
    class MyStreamReader extends OwlyCodeStreamingBird
    {
      protected $stream;
    
      protected function connect($timeout = 5, $attempts = 10)
      {
        return $this->stream = parent::connect($timeout, $attempts);
      }
    
      protected function isConnected() 
      {
        return $this->stream && stream_get_meta_data($this->stream)['eof'];
      }
    }
    
    
    class MyStreamingBird extends OwlyCodeStreamingBird
    {
      public function createStreamReader($method)
      {
        $oauth = new OwlyCodeStreamingBirdOauth($this->consumerKey,
          $this->consumerSecret, $this->oauthToken, $this->oauthSecret);
        return new MyStreamReader(new OwlyCodeStreamingBirdConnection(), $oauth, $method);
      }
    }
    
    
    $bird = new MyStreamingBird($consumerKey, $consumerSecret, $oauthToken, $oauthSecret);
    $reader = $bird->createStreamReader(StreamReader::METHOD_FILTER); // ...
    
    $reader->isConnected();
    

    You can also make a class based on OwlyCodeStreamingBird, which has access to the stream as well. However, you’ll have to keep track of these streams, because it’s a factory method.

    Login or Signup to reply.
  3. Looking at the implementation of class StreamingBird, you see you can easily create the streamreader instance yourself, with full control over the connection:

    namespace OwlyCodeStreamingBird;
    // Let's instantiate the Oauth signature handler
    $oauth = new Oauth($consumerKey, $consumerSecret, $oauthToken, $oauthSecret);
    // Let's create our own Connection object!
    $connection = new Connection();
    
    // And here comes our Reader
    $reader = new StreamReader($connection, $oauth, StreamReader::METHOD_FILTER);
    $reader->setTrack(['hello'])
           ->consume(function ($tweet) {
                echo '------------------------' . "n";
                echo $tweet['text'] . "n";
           });
    
    // Voilà
    print_r(socket_get_status($connection->connection));
    

    The Connection object stores the socket resource in a public property $connection:

    public $connection;
    // ... 
    @$this->connection = fsockopen($host, $port, $errNo, $errStr, $timeout);
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search