
Twitter API: Making the best out of Streaming API

Recently I came across the task of implementing the Twitter Streaming APIs to process Twitter feeds (tweets) in real time. More specifically, the application had to listen for tweets across multiple topics and users (we'll refer to these as tracks). It also had to be able to hot-swap these tracks almost instantaneously.

Not long before working on this application, I had built the same thing for Facebook. Twitter uses its Streaming APIs to deliver real-time updates to applications, but Facebook uses Webhooks. Webhooks are HTTP callbacks, so they are much easier to deal with: Facebook simply sends a request to your server whenever something happens. Twitter's Streaming APIs work differently. They require your application to keep a persistent HTTP connection open with one of Twitter's streaming endpoints. An application that depends on such a connection needs a different approach from the one we normally use for a traditional web application.

The entire focus of this article will be on developing such an application.

The Twitter Streaming APIs

If you look at Twitter's Developer Documentation, you'll see that Twitter provides Streaming APIs. It also provides REST and Ads APIs, but for our purposes we will only focus on the Streaming APIs.

So what exactly are the Streaming APIs? Put simply, they let your application open a low-latency connection with Twitter's streaming endpoints. Once the connection is established, Twitter pushes any real-time data your application has requested down that connection almost instantly. According to the docs:

The Streaming APIs give developers low latency access to Twitter’s global stream of Tweet data. A proper implementation of a streaming client will be pushed messages indicating Tweets and other events have occurred, without any of the overhead associated with polling a REST endpoint.

The Streaming APIs require this connection to be persistent (essentially never-ending). It is similar to downloading an infinitely large file: your system keeps receiving data until you manually terminate the connection.
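
Because the response never ends, a client can't wait for "the whole body". It has to cut complete messages out of whatever chunks have arrived so far. Twitter's streaming docs describe the body as messages separated by "\r\n", where a blank line is a keep-alive. Phirehose does this buffering for you. The sketch below only illustrates the idea, and the class name is hypothetical.

```php
<?php

/**
 * Sketch: turn raw chunks from a never-ending HTTP response into complete
 * messages. Assumes messages are separated by "\r\n" and that blank lines
 * are keep-alive signals.
 */
final class StreamBuffer
{
    /** @var string Bytes received so far that do not yet form a full message. */
    private $pending = '';

    /**
     * Append a chunk and return every complete, non-empty message it finished.
     *
     * @return string[]
     */
    public function push(string $chunk): array
    {
        $this->pending .= $chunk;
        $messages = [];

        // Cut off one "\r\n"-terminated message at a time.
        while (($pos = strpos($this->pending, "\r\n")) !== false) {
            $line = substr($this->pending, 0, $pos);
            $this->pending = (string) substr($this->pending, $pos + 2);

            // A blank line is only a keep-alive, so skip it.
            if (trim($line) !== '') {
                $messages[] = $line;
            }
        }

        return $messages;
    }
}

// A message split across two chunks is emitted only once it is complete.
$buffer = new StreamBuffer();
$first  = $buffer->push("{\"id\":1}\r\n\r\n{\"id\"");
$second = $buffer->push(":2}\r\n");
```

Here the first push yields only the first message. The second message becomes available once its terminating "\r\n" arrives.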

The data your application receives depends on the kind of stream it uses. The Streaming APIs offer three kinds of streams:

  • Public Streams – Streams of the public data flowing through Twitter. Suitable for following specific users or topics, and data mining.
  • User Streams – Single-user streams, containing roughly all of the data corresponding with a single user’s view of Twitter.
  • Site Streams – The multi-user version of user streams. Site streams are intended for servers which must connect to Twitter on behalf of many users.

As I said earlier, the data we receive depends on the kind of stream we use. Before going further, let's look at our application's requirements, since they decide whether we use Public Streams or User Streams (Site Streams are in a closed beta).

Our application should satisfy the following requirements:

  • Listen to live tweets for the tracks we have requested
  • Be able to process the received tweets without interrupting the connection with the streaming endpoint
  • Be able to switch these tracks whenever they change, with as little interruption to that connection as possible
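
The third requirement can be met more cheaply if we avoid restarting the connection when the "new" tracks are really the same as the current ones. Here is a minimal sketch of that check, using hypothetical helper names. It assumes track matching is case-insensitive, as Twitter's track documentation describes, and it only lowercases ASCII.

```php
<?php

/**
 * Hypothetical helper: normalize a track list (trim, lowercase, de-duplicate,
 * sort) so that two lists with the same meaning compare equal.
 */
function normalizeTracks(array $tracks): array
{
    $normalized = array_map(function ($track) {
        return strtolower(trim((string) $track));
    }, $tracks);

    // Drop empty entries and duplicates.
    $normalized = array_values(array_unique(array_filter($normalized, function ($track) {
        return $track !== '';
    })));

    sort($normalized, SORT_STRING);
    return $normalized;
}

/** True only when a restart would actually change what the stream delivers. */
function tracksChanged(array $current, array $requested): bool
{
    return normalizeTracks($current) !== normalizeTracks($requested);
}
```

With this check in place, the stop/start cycle described later only needs to run when tracksChanged() returns true.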

With these requirements in mind, we now have to pick a stream. As stated earlier, User Streams provide data from an individual user's point of view. According to Twitter's documentation for User Streams:

User Streams provide a stream of data and events specific to the authenticated user. Note that User Streams are not intended for server-to-server connections. If you need to make connections on behalf of multiple users from the same machine, consider using site streams.

Minimize the number of connections your application makes to User Streams. Each Twitter account is limited to only a few simultaneous User Streams connections per OAuth application, regardless of IP. Once the per-application limit is exceeded, the oldest connection will be terminated. An account logging in from too many instances of the same OAuth application will cycle connections as the application instances reconnect and disconnect each other.

Our application needs to receive tweets from many users, and we can't tell in advance how many there will be. Going by the docs, User Streams won't work for this use case, which leaves us with Public Streams.

User Streams deliver data in the context of a few individual users. Public Streams, by contrast, give applications the stream of public data flowing through Twitter. Used well, Public Streams don't tie our application to a small set of users the way User Streams do. We don't need private data, so we can pass Twitter handles (@username) of individual users as tracks before connecting to the public streaming endpoint. This lets our application listen for live public tweets involving multiple users. Note that the track parameter matches keywords, so a handle used as a track matches tweets that mention that user. To receive the tweets a user posts, use the filter endpoint's follow parameter instead, which takes numeric user IDs. One more point before we go further: according to Twitter's documentation for Public Streams, an application can only hold a single connection to the public streaming endpoints at any given time.
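
Here is a minimal sketch of preparing both kinds of filter input, with a hypothetical helper name. The 400-keyword and 5,000-ID caps are the documented default limits for one standard filter connection. With Phirehose, the two lists would go to setTrack() and setFollow() respectively. The sketch only builds and validates the comma-separated parameters.

```php
<?php

/**
 * Hypothetical helper: build the "track" and "follow" parameters for the
 * public filter stream. "track" takes keywords (an "@handle" matches tweets
 * mentioning that account); "follow" takes numeric user IDs.
 */
function buildFilterParams(array $keywords, array $userIds): array
{
    // Trim keywords, drop empty ones and duplicates.
    $keywords = array_values(array_unique(array_filter(
        array_map('trim', $keywords),
        function ($keyword) { return $keyword !== ''; }
    )));

    // Keep only positive integer IDs, without duplicates.
    $userIds = array_values(array_unique(array_filter(
        array_map('intval', $userIds),
        function ($id) { return $id > 0; }
    )));

    if (count($keywords) > 400) {
        throw new InvalidArgumentException('The filter stream accepts at most 400 track keywords.');
    }
    if (count($userIds) > 5000) {
        throw new InvalidArgumentException('The filter stream accepts at most 5,000 follow IDs.');
    }

    $params = [];
    if ($keywords !== []) {
        $params['track'] = implode(',', $keywords);
    }
    if ($userIds !== []) {
        $params['follow'] = implode(',', $userIds);
    }
    return $params;
}

// Example input: one hashtag, one handle, an empty entry, and one user ID given twice.
$params = buildFilterParams(['#ReasonsToLoveMe', ' @example_user ', ''], [12345, '12345', 0]);
```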

Communicating with the Streaming API: Building our application

Now that we are done with the introductory bits, we can start building our application. I'll develop it in PHP on Linux, using the popular Symfony Framework with Composer as the package manager. That said, you can apply the same fundamental concepts in any language you like.

I will, however, use Phirehose by Fennb, a third-party library that makes it easier to communicate with the Twitter Streaming APIs. If you're developing on a platform other than PHP, here's an extensive list of third-party libraries you can use. You can add Phirehose to your project via Composer with the following command:

composer require fennb/phirehose

Setting up our application:

Create a new Symfony project (let’s call it twitterFeeds) by executing the following command:

symfony new twitterFeeds

Before moving on, make sure you've installed the third-party library as described above (run the Composer command from inside the new twitterFeeds directory). Next, create a new bundle (let's call it TwitterBundle) by executing the following command:

php bin/console generate:bundle

Fill in whatever configuration details you are asked for. Next, go to your TwitterBundle directory and create a new Services directory. In that directory, create a new file named TwitterService.php. Make sure to register this class as a service in your application, because the console command below fetches it from the container as twitter.service.

Establishing a persistent connection:

As I said before, an application that relies on a persistent connection needs a slightly different way of thinking. With REST APIs we make API calls, and with Webhooks a callback is made to our servers. With Streaming APIs, we open a persistent connection with a streaming endpoint that (ideally) stays open forever, and we process whatever data arrives through it. We close this connection only when we need to restart it to update the tracks. So how do we establish such a connection? One way is to use a background process.

Say we have to listen for live tweets. We then need a persistent connection with Twitter's public streaming endpoint. To get one, we can start a background process from the terminal. This process runs independently of the web server and holds the persistent connection our application needs. We can then keep track of its process ID (PID) in case we need to stop or restart it.

To create our background process, we'll write a console command that we can run from the terminal in the root directory of our project, using the Console component provided by the Symfony Framework. First create a Command directory in your TwitterBundle, then create a TwitterCommand.php file in that directory.

<?php

/**
 * Webkul Software.
 *
 * @category	Webkul, Uvdesk
 * @package		Webkul_UVDesk_TF
 * @author		Akshay Kumar
 * @copyright	Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com)
 * @license 	https://store.webkul.com/license.html
 */

namespace TwitterBundle\Command;

use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;

class TwitterCommand extends ContainerAwareCommand
{
	protected function configure()
	{
		$this->setName('twitter:stream')->setDescription('Listen for Live Twitter Feeds.');
	}

	protected function execute(InputInterface $input, OutputInterface $output)
	{
		$twitterTracks = ['#ReasonsToLoveMe']; // Define the tracks you wish to receive updates for
		$this->getContainer()->get('twitter.service')->listenForTweets($twitterTracks); // This will start the streaming process
	}
}

From the root directory of our project, we can now execute the code written inside the execute() function by using the following command in the terminal:

php bin/console twitter:stream

TwitterCommand::execute() in turn calls the listenForTweets() function defined in TwitterService, which opens a new persistent connection with Twitter's streaming endpoint. Let's write that next, in the TwitterService.php file we created earlier:

<?php

/**
 * Webkul Software.
 *
 * @category	Webkul, Uvdesk
 * @package		Webkul_UVDesk_TF
 * @author		Akshay Kumar
 * @copyright	Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com)
 * @license 	https://store.webkul.com/license.html
 */

namespace TwitterBundle\Services;

// Placeholders: replace these with your Twitter app's credentials
const CONSUMER_KEY = 'YOUR_APP_CONSUMER_KEY';
const CONSUMER_SECRET = 'YOUR_APP_CONSUMER_SECRET';
const APPLICATION_TOKEN = 'YOUR_APP_ACCESS_TOKEN';
const APPLICATION_TOKEN_SECRET = 'YOUR_APP_ACCESS_TOKEN_SECRET';

// Include the third-party library
require_once __DIR__ . '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php';

class TwitterListener extends \OauthPhirehose
{
    public function __construct() {
        parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER);
        $this->consumerKey = CONSUMER_KEY;
        $this->consumerSecret = CONSUMER_SECRET;
    }

    public function enqueueStatus($status)
    {   
        $twitterFeed = json_decode($status, true);
        if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) {
            $twitterData = array('feed' => $twitterFeed);
            dump('New Feed Received');
            dump($twitterData);
        }
    }
}

class TwitterService
{
	/**
	 * This function will establish a persistent connection with one of the twitter's streaming endpoint.
	 * @param  array  $twitterTrackCollection Collection of tracks which we'll receive updates for
	 */
	public function listenForTweets(array $twitterTrackCollection = [])
    {
    	// Terminate the process if there are no active tracks
    	if (empty($twitterTrackCollection)) {
    		echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL;
    		exit(0);
    	}

    	$twitterListener = new TwitterListener();
        $twitterListener->setTrack($twitterTrackCollection);
        $twitterListener->consume();
    }
}

If you now run the command we just created from your terminal, a new connection with Twitter's Streaming API is established and you'll start receiving public tweets for the tracks you've defined. When I wrote this article, “#ReasonsToLoveMe” was trending, so I used it as an example track. Whenever someone posts a tweet containing any of our tracks (here, “#ReasonsToLoveMe”), it is sent down the connection and passed to TwitterListener::enqueueStatus(), where you can process it. However, avoid doing anything time-intensive at this point. If your application can't process the stream as fast as the tweets arrive, you risk having the connection terminated, which most of us would want to avoid.
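
One common way to follow that advice is to make enqueueStatus() do nothing but store the raw status, and let a separate worker decode and process it later. The sketch below assumes a plain spool file. The class name and file path are hypothetical, and a database table or message queue would serve the same purpose. Since JSON strings cannot contain raw line breaks, stripping them keeps one status per line without changing its meaning.

```php
<?php

/**
 * Sketch (hypothetical class): a file-backed spool. append() is cheap enough
 * to call from enqueueStatus(); drain() is meant for a separate worker.
 */
final class StatusSpool
{
    private $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    /** Store the raw status as a single line; no decoding happens here. */
    public function append(string $rawStatus): void
    {
        $line = str_replace(["\r", "\n"], '', $rawStatus) . "\n";
        file_put_contents($this->path, $line, FILE_APPEND | LOCK_EX);
    }

    /** Decode everything queued so far and empty the spool under the same lock. */
    public function drain(): array
    {
        $handle = fopen($this->path, 'c+');
        if ($handle === false) {
            return [];
        }
        flock($handle, LOCK_EX);
        $contents = (string) stream_get_contents($handle);
        ftruncate($handle, 0);
        flock($handle, LOCK_UN);
        fclose($handle);

        $statuses = [];
        foreach (explode("\n", $contents) as $line) {
            $decoded = json_decode($line, true);
            if (is_array($decoded)) {
                $statuses[] = $decoded;
            }
        }
        return $statuses;
    }
}

// Usage: the listener appends, a worker drains.
$spoolPath = tempnam(sys_get_temp_dir(), 'tweets');
$spool = new StatusSpool($spoolPath);
$spool->append("{\"id_str\":\"1\",\n\"text\":\"hello\"}");
$spool->append('{"id_str":"2","text":"world"}');
$drained = $spool->drain();
$afterDrain = $spool->drain();
unlink($spoolPath);
```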

Real-Time Twitter Feeds

So far, we have started the connection manually by running the command from the terminal. A real-world application needs to start and stop the connection on its own, so let's automate that. In our DefaultController, let's write an action handler (we'll call it startStreamAction) that starts our streaming process when executed. Make sure you've configured the route for this handler in the corresponding routing.yml file. In DefaultController.php:

<?php

/**
 * Webkul Software.
 *
 * @category	Webkul, Uvdesk
 * @package		Webkul_UVDesk_TF
 * @author		Akshay Kumar
 * @copyright	Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com)
 * @license 	https://store.webkul.com/license.html
 */

namespace TwitterBundle\Controller;

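use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;

class DefaultController extends Controller
{
    // .....

    /**
     * This handler will switch to the project's root directory and run the
     * streaming command in a separate process.
     */
    public function startStreamAction(Request $request)
    {
        chdir('../'); // The front controller runs from web/, so '../' is the project root
        $command = 'php bin/console twitter:stream';
        exec($command); // Blocks until the command exits

        return new Response('Stream process finished.');
    }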

    // .....
}

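Now every time we visit the route for this handler, the command runs in a separate process. Note, however, that exec() waits for that process to exit. Because the stream never ends, the request will hang until the process is killed. We'll fix this below by running the command in the background.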

With Public Streams, we only receive public tweets for the tracks we defined before opening the persistent connection. So if we need to update our tracks, we must close the previous connection, redefine our tracks, and open a new connection with the streaming endpoint. This step is critical because we can only have one connection with the streaming endpoint at a time (multiple connections risk getting our IP banned). So before creating a new connection, we need to make sure all previously held connections are closed.

To close a connection with the streaming endpoint, we can terminate the background process that holds it. We can find that process either by its process ID (PID), or by looking it up from the name and arguments of the background process. With the PID approach, we need to log the process ID (in a database, text file, etc.) every time we open a new connection. Using this log, we can terminate any previously held background processes before starting a new one. Let's update our startStreamAction handler to log the process ID every time a connection is made. Then, in our DefaultController, let's create a new action handler (we'll call it stopStreamAction) to terminate any previously held background processes. Calling this handler before starting a new background process ensures that only one persistent connection with the streaming endpoint is active at a time.
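
Here is a minimal sketch of the "log the process ID" step, assuming a plain text file with one PID per line. The class name and file location are hypothetical. In the controller, startStreamAction() would call record() with the new PID, and stopStreamAction() would kill every PID from all() and then call clear().

```php
<?php

/**
 * Sketch (hypothetical class): a text-file log of stream process IDs.
 */
final class PidLog
{
    private $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    /** Remember the PID of a newly started stream process. */
    public function record(int $pid): void
    {
        file_put_contents($this->path, $pid . PHP_EOL, FILE_APPEND | LOCK_EX);
    }

    /** @return int[] Every distinct, positive PID recorded so far. */
    public function all(): array
    {
        if (!is_file($this->path)) {
            return [];
        }
        $lines = file($this->path, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
        $pids = array_filter(array_map('intval', $lines ?: []), function (int $pid) {
            return $pid > 0;
        });
        return array_values(array_unique($pids));
    }

    /** Forget all PIDs, e.g. once they have been killed. */
    public function clear(): void
    {
        file_put_contents($this->path, '', LOCK_EX);
    }
}

// Usage with a throwaway file.
$logPath = sys_get_temp_dir() . '/twitter_stream_' . getmypid() . '.pids';
$log = new PidLog($logPath);
$log->clear();
$log->record(4242);
$log->record(4243);
$log->record(4242);
$recorded = $log->all();
$log->clear();
$afterClear = $log->all();
unlink($logPath);
```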

<?php

/**
 * Webkul Software.
 *
 * @category	Webkul, Uvdesk
 * @package		Webkul_UVDesk_TF
 * @author		Akshay Kumar
 * @copyright	Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com)
 * @license 	https://store.webkul.com/license.html
 */

namespace TwitterBundle\Controller;

use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

class DefaultController extends Controller
{
    // ......

    /**
     * This handler will first terminate any previously held background process
     * by firing the stopStreamAction() handler. Then it'll create a new background
     * process which establishes a new connection with a streaming endpoint.
     */
    public function startStreamAction(Request $request)
    {
        // Execute the stopStreamAction() handler
        $curlHandler = curl_init();
        $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL);
        curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath);
        curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1);
        curl_exec($curlHandler);
        curl_close($curlHandler);

        // Create a new background process from the project root (the front controller runs from web/)
        chdir('../');
        $output = array();
        $command = 'nohup php bin/console twitter:stream > /dev/null 2>&1 & echo $!';
        exec($command, $output);
        $processId = (int) $output[0]; // Process ID of the newly created background process. Log this.

        return new Response('Stream started with process ID ' . $processId);
    }

    /**
     * This handler will fetch a log of Process IDs corresponding to any background
     * process and terminate them.
     */
    public function stopStreamAction(Request $request)
    {
        // ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE: replace with the process IDs you logged
        $activeProcesses = array();

        foreach ($activeProcesses as $processId) {
            $output = array();
            $command = 'kill ' . (int) $processId . ' 2>&1';
            exec($command, $output);
            $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId);
        }

        return new Response('Stream stopped.');
    }

    // ......
}

Notice that we changed the command we pass to the exec() function:

nohup php bin/console twitter:stream > /dev/null 2>&1 & echo $!

Here is what each part does:

  • nohup keeps the process running after the shell that started it exits.
  • > /dev/null 2>&1 discards its output, so exec() doesn't wait on it.
  • The trailing & runs it in the background.
  • echo $! prints the PID of that background process, which exec() captures in $output.

Now, before each new background process is created this way, any previously held background processes are terminated first. As long as you log the process IDs properly, this is enough to ensure that only one connection is active at a time. You can extend this functionality to suit your needs.
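
The start/stop pair can also be written as plain functions, which makes it easier to test outside a controller. This is a minimal sketch with hypothetical function names. It assumes a POSIX shell where nohup and kill are available (the article targets Linux), and it expects $command to be already safely escaped.

```php
<?php

/** Start $command in the background and return its PID (0 on failure). */
function startBackgroundProcess(string $command): int
{
    $output = [];
    // nohup: survive the parent shell exiting; > /dev/null 2>&1: detach the
    // output so exec() can return; &: run in background; echo $!: print its PID.
    exec('nohup ' . $command . ' > /dev/null 2>&1 & echo $!', $output);
    return isset($output[0]) ? (int) $output[0] : 0;
}

/** Signal 0 only checks whether the process exists; it does not affect it. */
function isProcessRunning(int $pid): bool
{
    if ($pid <= 0) {
        return false;
    }
    $output = [];
    exec('kill -0 ' . $pid . ' 2>/dev/null', $output, $exitCode);
    return $exitCode === 0;
}

/** Plain kill sends SIGTERM, giving the process a chance to exit cleanly. */
function stopBackgroundProcess(int $pid): bool
{
    if ($pid <= 0) {
        return false;
    }
    $output = [];
    exec('kill ' . $pid . ' 2>&1', $output, $exitCode);
    return $exitCode === 0;
}

// Usage with a harmless stand-in for the streaming command.
$pid = startBackgroundProcess('sleep 30');
$wasRunning = isProcessRunning($pid);
$stopped = stopBackgroundProcess($pid);
```

In the controller, startBackgroundProcess('php bin/console twitter:stream') would replace the inline exec() call, and the returned PID is the value to log.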

All that's left is to process the stream of data arriving through the connection. First, let's improve our Twitter Service (TwitterService.php) so that it forwards every tweet our application receives to a handler (let's call it processStreamAction), which will then process these tweets. In your TwitterService.php:

<?php

/**
 * Webkul Software.
 *
 * @category	Webkul, Uvdesk
 * @package		Webkul_UVDesk_TF
 * @author		Akshay Kumar
 * @copyright	Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com)
 * @license 	https://store.webkul.com/license.html
 */

namespace TwitterBundle\Services;

// Placeholders: replace these with your Twitter app's credentials
const CONSUMER_KEY = 'YOUR_APP_CONSUMER_KEY';
const CONSUMER_SECRET = 'YOUR_APP_CONSUMER_SECRET';
const APPLICATION_TOKEN = 'YOUR_APP_ACCESS_TOKEN';
const APPLICATION_TOKEN_SECRET = 'YOUR_APP_ACCESS_TOKEN_SECRET';

require_once __DIR__ . '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php';

class TwitterListener extends \OauthPhirehose
{
    private $curlHandler = null;
    private $processStreamPath = null;

    public function __construct() {
        parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER);
        $this->processStreamPath = 'URL_TO_PROCESS_STREAM_ACTION_HANDLER'; // Placeholder: absolute URL of processStreamAction()
        $this->consumerKey = CONSUMER_KEY;
        $this->consumerSecret = CONSUMER_SECRET;
        $this->configureCURL();
    }

    private function configureCURL()
    {
        $this->curlHandler = curl_init();
        // The body is an http_build_query() string, so cURL sends it as application/x-www-form-urlencoded.
        curl_setopt($this->curlHandler, CURLOPT_POST, true);
        curl_setopt($this->curlHandler, CURLOPT_URL, $this->processStreamPath);
        curl_setopt($this->curlHandler, CURLOPT_RETURNTRANSFER, 1);
    }

    public function enqueueStatus($status)
    {   
        $twitterFeed = json_decode($status, true);
        if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) {
            $twitterData = array('feed' => $twitterFeed);
            curl_setopt($this->curlHandler, CURLOPT_POSTFIELDS, http_build_query($twitterData));
            curl_exec($this->curlHandler);
        }
    }
}

class TwitterService
{
	/**
	 * This function will establish a persistent connection with one of the twitter's streaming endpoint.
	 * @param  array  $twitterTrackCollection Collection of tracks which we'll receive updates for
	 */
	public function listenForTweets(array $twitterTrackCollection = [])
    {
    	// Terminate the process if there are no active tracks
    	if (empty($twitterTrackCollection)) {
    		echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL;
    		exit(0);
    	}

    	$twitterListener = new TwitterListener();
        $twitterListener->setTrack($twitterTrackCollection);
        $twitterListener->consume();
    }
}

Now, every time a tweet is received, TwitterListener::enqueueStatus() forwards it to the route we've specified by making a POST request with cURL. If you think about it, this works just like a Webhook. Next, let's write the processStreamAction() handler in our DefaultController. In your DefaultController.php:
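
Because enqueueStatus() forwards the decoded tweet through http_build_query(), it helps to know how that encoding treats JSON types. true and false become "1" and "0", null values are dropped entirely, and every scalar arrives in $_POST as a string. The sketch below compares that round trip with forwarding the JSON as-is. The trimmed-down status is made-up sample data. Posting JSON would also mean sending a Content-Type: application/json header and reading $request->getContent() in the controller. That is a design alternative, not what this article does.

```php
<?php

// A trimmed-down status, shaped like json_decode($status, true) output.
$feed = ['id_str' => '42', 'truncated' => false, 'favorited' => true, 'place' => null, 'retweet_count' => 3];

// Round trip used by the listener: form-encode, then parse the body as PHP does for $_POST.
$formBody = http_build_query(['feed' => $feed]);
parse_str($formBody, $received);

// Alternative: forward JSON, which keeps booleans, nulls and integers intact.
$jsonBody = json_encode(['feed' => $feed]);
$receivedJson = json_decode($jsonBody, true);
```

If processStreamAction() relies on fields such as truncated or place, it should account for these string conversions and dropped keys.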

<?php

/**
 * Webkul Software.
 *
 * @category	Webkul, Uvdesk
 * @package		Webkul_UVDesk_TF
 * @author		Akshay Kumar
 * @copyright	Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com)
 * @license 	https://store.webkul.com/license.html
 */

namespace TwitterBundle\Controller;

use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

class DefaultController extends Controller
{
    /**
     * This handler will first terminate any previously held background process
     * by firing the stopStreamAction() handler. Then it'll create a new background
     * process which establishes a new connection with a streaming endpoint.
     */
    public function startStreamAction(Request $request)
    {
        // Execute the stopStreamAction() handler
        $curlHandler = curl_init();
        $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL);
        curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath);
        curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1);
        curl_exec($curlHandler);
        curl_close($curlHandler);

        // Create a new background process from the project root (the front controller runs from web/)
        chdir('../');
        $output = array();
        $command = 'nohup php bin/console twitter:stream > /dev/null 2>&1 & echo $!';
        exec($command, $output);
        $processId = (int) $output[0]; // Process ID of the newly created background process. Log this.

        return new Response('Stream started with process ID ' . $processId);
    }

    /**
     * This handler will fetch a log of Process IDs corresponding to any background
     * process and terminate them.
     */
    public function stopStreamAction(Request $request)
    {
        // ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE: replace with the process IDs you logged
        $activeProcesses = array();

        foreach ($activeProcesses as $processId) {
            $output = array();
            $command = 'kill ' . (int) $processId . ' 2>&1';
            exec($command, $output);
            $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId);
        }

        return new Response('Stream stopped.');
    }

    /**
     * A POST request will be made to this handler whenever our application receives
     * a feed. You can write your implementation here.
     */
    public function processStreamAction(Request $request)
    {
        if (!$request->isMethod('POST')) {
            // Request method not supported.
            return new Response('Method not allowed.', Response::HTTP_METHOD_NOT_ALLOWED);
        }

        if ($request->request->has('feed')) {
            // Real-time Twitter Feed. Write your own implementation over here.
            // (Symfony 3 returns the nested array here; on Symfony 5+ use $request->request->all('feed').)
            $streamContent = $request->request->get('feed');
        }

        return new Response('', Response::HTTP_NO_CONTENT);
    }
}
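
Inside processStreamAction(), $streamContent is the tweet as a nested array. Here is a minimal sketch of pulling out a few commonly used fields, using a hypothetical helper name. It assumes the standard v1.1 status layout, where a long tweet has truncated set and its full text under extended_tweet.full_text. Using !empty() for truncated also copes with the "1"/"0" strings that arrive through form encoding.

```php
<?php

/**
 * Hypothetical helper: summarize a decoded status as id, author handle and text.
 */
function summarizeFeed(array $feed): array
{
    $text = $feed['text'] ?? '';

    // Long tweets carry their full text separately when 'truncated' is set.
    if (!empty($feed['truncated']) && isset($feed['extended_tweet']['full_text'])) {
        $text = $feed['extended_tweet']['full_text'];
    }

    return [
        'id'   => (string) ($feed['id_str'] ?? ''),
        'user' => (string) ($feed['user']['screen_name'] ?? ''),
        'text' => (string) $text,
    ];
}

// Usage with a form-encoded style status (made-up sample data).
$summary = summarizeFeed([
    'id_str'         => '1001',
    'truncated'      => '1',
    'text'           => 'Short version...',
    'extended_tweet' => ['full_text' => 'The full, longer version of the tweet'],
    'user'           => ['screen_name' => 'example_user'],
]);
```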

There you go! We now have a fully functional application. It listens for live tweets for the specified tracks, lets us update those tracks almost instantly, and makes sure only one connection with the streaming endpoint is active at any moment. You can extend it however you like and make it more robust and fault-tolerant. Here at UVDesk, we follow a similar approach to give our Twitter users a better user experience and social connectivity.

Category(s) UVdesk