Recently I had to implement the Twitter Streaming APIs to process Twitter feeds (tweets) in real time. More specifically, the application had to listen for tweets across multiple topics and users (we’ll refer to these as tracks), and it had to be able to hot-swap those tracks almost instantaneously.

Not long before working on this application, I had to accomplish the same task for Facebook. Unlike Twitter, which uses its Streaming APIs to provide applications with real-time updates, Facebook uses Webhooks. Webhooks, being HTTP callbacks, are much easier for developers to deal with: the platform simply calls your server whenever something happens. The Streaming APIs provided by Twitter, however, require your application to maintain a persistent HTTP connection with one of Twitter’s many streaming endpoints. An application that relies on such a persistent connection has to be designed a little differently from a traditional web application, and developing such an application is the entire focus of this article.

The Twitter Streaming APIs

If you take a look at Twitter’s Developer Documentation, you’ll see that Twitter provides Streaming APIs alongside its REST and Ads APIs. For our purposes, we will focus only on the Streaming APIs.

So what exactly are the Streaming APIs? Put simply, they let your application establish a low-latency connection with Twitter’s streaming endpoints. Once the connection is established, Twitter sends any real-time feed your application has requested down this connection almost instantaneously. According to the docs:

The Streaming APIs give developers low latency access to Twitter’s global stream of Tweet data.
A proper implementation of a streaming client will be pushed messages indicating Tweets and other events have occurred, without any of the overhead associated with polling a REST endpoint.

The Streaming APIs require this connection to be persistent (essentially never-ending). In essence, it’s similar to downloading an infinitely large file: your system keeps receiving data until you manually terminate the connection.

The feeds your application receives depend on the kind of stream you use. The Streaming APIs provide developers with three kinds of streams:

Public Streams – Streams of the public data flowing through Twitter. Suitable for following specific users or topics, and data mining.
User Streams – Single-user streams, containing roughly all of the data corresponding with a single user’s view of Twitter.
Site Streams – The multi-user version of user streams. Site streams are intended for servers that must connect to Twitter on behalf of many users.

So before we go any further, let’s take a look at the requirements of our application; they will determine whether we use Public Streams or User Streams (Site Streams are in closed beta). Our application should:

Listen to live tweets for the tracks we have requested
Be able to process the received tweets without interrupting the connection with the streaming endpoint
Be able to switch tracks as they change, with the least possible interruption to that connection

Keeping these requirements in mind, we now have to settle on the stream to use. User Streams, as stated earlier, provide data from an individual user’s point of view. According to Twitter’s documentation for User Streams:

User Streams provide a stream of data and events specific to the authenticated user.
Note that User Streams are not intended for server-to-server connections. If you need to make connections on behalf of multiple users from the same machine, consider using site streams.
Minimize the number of connections your application makes to User Streams. Each Twitter account is limited to only a few simultaneous User Streams connections per OAuth application, regardless of IP. Once the per-application limit is exceeded, the oldest connection will be terminated. An account logging in from too many instances of the same OAuth application will cycle connections as the application instances reconnect and disconnect each other.

We want to receive tweets from numerous users, and it isn’t clear in advance how many users that will be. Going by the docs, User Streams won’t cut it for our use case, so we’re left with Public Streams. Whereas User Streams deliver feeds in the context of a limited number of individual users, Public Streams provide applications with a stream of the public data flowing through Twitter. As far as tweets are concerned, if we use Public Streams smartly, our application won’t be bound to a handful of individual users, and we are not concerned with private data anyway. We can pass the Twitter handles (@username) of individual users as tracks when establishing a connection with the public streaming endpoint, which enables our application to listen to live public tweets from multiple users.

One point to note before we go any further: as per the Twitter documentation for Public Streams, our application can only hold a single connection to a public stream at any given time.

Communicating with the Streaming API: Building our application

Now that we are done with all the introductory bits, we can get started on building our application.
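A small first building block: since tracks are just a list of hashtags, keywords and handles, it can help to clean that list up before it is handed to the streaming connection. The sketch below is only an illustration. The function name normalizeTracks() and the handle @example_user are hypothetical, and the case-insensitive de-duplication rests on the assumption that Twitter matches tracks without regard to case.

```php
<?php

/**
 * Hypothetical helper: clean up a list of tracks (hashtags, @handles, keywords).
 * Trims whitespace, drops empty entries and removes case-insensitive duplicates,
 * keeping the first spelling that was seen.
 */
function normalizeTracks(array $tracks): array
{
    $seen = [];
    $normalized = [];

    foreach ($tracks as $track) {
        $track = trim((string) $track);
        if ($track === '') {
            continue; // skip blank entries
        }

        $key = strtolower($track);
        if (isset($seen[$key])) {
            continue; // already present under a different casing
        }

        $seen[$key] = true;
        $normalized[] = $track;
    }

    return $normalized;
}

// Example input with a blank entry and a duplicate in different casing
$tracks = normalizeTracks(['#ReasonsToLoveMe', ' @example_user ', '', '#reasonstoloveme']);
print_r($tracks);
```

The resulting array can be passed wherever the article uses a track collection, for instance as the argument of the listenForTweets() method introduced later.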
I’ll be developing the application in PHP on Linux, using the popular Symfony framework with Composer as the package manager. You should be able to apply the fundamental concepts and implement the project in any language of your choice. I will, however, be using Phirehose by fennb, a third-party library that eases communication with the Twitter Streaming APIs. If you’re developing on a platform other than PHP, here’s an extensive list of third-party libraries you can make use of. You can add Phirehose to your project via Composer using the following command:

    composer require fennb/phirehose

Setting up our application:

Create a new Symfony project (let’s call it twitterFeeds) by executing the following command:

    symfony new twitterFeeds

Before moving on, make sure you’ve installed the third-party library as mentioned above (run the Composer command from inside the project directory). Next, create a new bundle (let’s call it TwitterBundle) by executing the following command:

    php bin/console generate:bundle

Provide whatever configuration details you are asked for. Next, go to the TwitterBundle directory in your project and create a new Services directory. In this directory, create a new file named TwitterService.php, which will hold our service. Make sure to register this service with your application; this article retrieves it from the container under the id twitter.service.

Establishing a persistent connection:

Like I said before, writing an application that relies on a persistent connection requires us to think a little differently. With REST APIs we make API calls, and with Webhooks a callback is made to our servers. With Streaming APIs, we establish a persistent connection with a streaming endpoint, which then (ideally) lives on forever, and we process whatever data arrives on our end through it. We terminate this connection only when we need to restart it to update the tracks.

So how do we go about establishing such a connection?
One way is to make use of background processes. Let’s say that we have to listen for live tweets. We then need to establish a persistent connection with one of Twitter’s public streaming endpoints. To do so, we can start a background process from the terminal. It runs as an independent process and holds the persistent connection our application requires. We can then track the process ID (PID) of this background process in case we need to alter it in any way.

To make our background process, we’ll create a console command which we can execute from the terminal at the root directory of our project, using the Console component provided by the Symfony framework. First, create a Command directory in your TwitterBundle, and then create a TwitterCommand.php file in that directory. Now let’s see the code:

    /**
     * Webkul Software.
     *
     * @category  Webkul, Uvdesk
     * @package   Webkul_UVDesk_TF
     * @author    Akshay Kumar
     * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com)
     * @license   https://store.webkul.com/license.html
     */
    namespace TwitterBundle\Command;

    use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
    use Symfony\Component\Console\Input\InputInterface;
    use Symfony\Component\Console\Output\OutputInterface;

    class TwitterCommand extends ContainerAwareCommand
    {
        protected function configure()
        {
            $this->setName('twitter:stream')->setDescription('Listen for Live Twitter Feeds.');
        }

        protected function execute(InputInterface $input, OutputInterface $output)
        {
            // Define the tracks you wish to receive updates for
            $twitterTracks = ['#ReasonsToLoveMe'];

            // This will start the streaming process
            $this->getContainer()->get('twitter.service')->listenForTweets($twitterTracks);
        }
    }

From the root directory of our project, we can now run the code written inside the execute() function by using the following command in the terminal:

    php bin/console twitter:stream

As we can see, the TwitterCommand::execute() function in turn calls the TwitterService::listenForTweets() function defined in the TwitterService, which establishes a new persistent connection with Twitter’s streaming endpoint. So let’s code this up next. In the TwitterService.php file we created earlier:

    namespace TwitterBundle\Services;

    // Replace these placeholders with your application's credentials
    const CONSUMER_KEY = 'YOUR_APP_CONSUMER_KEY';
    const CONSUMER_SECRET = 'YOUR_APP_CONSUMER_SECRET';
    const APPLICATION_TOKEN = 'YOUR_APP_ACCESS_TOKEN';
    const APPLICATION_TOKEN_SECRET = 'YOUR_APP_ACCESS_TOKEN_SECRET';

    // Include the third-party library
    require_once __DIR__ . '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php';

    class TwitterListener extends \OauthPhirehose
    {
        public function __construct()
        {
            parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER);
            $this->consumerKey = CONSUMER_KEY;
            $this->consumerSecret = CONSUMER_SECRET;
        }

        public function enqueueStatus($status)
        {
            $twitterFeed = json_decode($status, true);

            // Only handle payloads that look like tweets
            if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) {
                $twitterData = array('feed' => $twitterFeed);
                dump('New Feed Received');
                dump($twitterData);
            }
        }
    }

    class TwitterService
    {
        /**
         * This function will establish a persistent connection with one of Twitter's streaming endpoints.
         * @param array $twitterTrackCollection Collection of tracks which we'll receive updates for
         */
        public function listenForTweets(array $twitterTrackCollection = [])
        {
            // Terminate the process if there are no active tracks
            if (empty($twitterTrackCollection)) {
                echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL;
                exit(0);
            }

            $twitterListener = new TwitterListener();
            $twitterListener->setTrack($twitterTrackCollection);
            $twitterListener->consume();
        }
    }

Now if you execute the command we just created from your terminal, a new connection with Twitter’s Streaming API will be established, and you’ll start receiving public tweets for the tracks you’ve defined. At the time of writing this article, “#ReasonsToLoveMe” was trending, so I used that as an example track.
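The listener's enqueueStatus() only dumps the decoded payload. As a rough illustration of pulling a few fields out of it, the sketch below turns a raw status string into a one-line summary. The function name summarizeStatus() is hypothetical, and the sample JSON is a hand-written, heavily trimmed stand-in for a real payload. The sketch assumes the payload carries the user.screen_name and text fields.

```php
<?php

/**
 * Hypothetical helper: build a short summary line from a raw status payload.
 * Returns null for anything that is not a tweet-like JSON object
 * (for example blank keep-alive lines or other stream messages).
 */
function summarizeStatus(string $rawStatus): ?string
{
    $status = json_decode($rawStatus, true);

    // Same guard as in enqueueStatus(): only handle payloads that carry a user
    if (!is_array($status) || !isset($status['user']['screen_name'])) {
        return null;
    }

    $text = isset($status['text']) ? $status['text'] : '';

    return sprintf('@%s: %s', $status['user']['screen_name'], $text);
}

// Hand-written sample, not a real tweet
$sample = '{"text":"Hello #ReasonsToLoveMe","user":{"screen_name":"example_user"}}';

echo summarizeStatus($sample) . PHP_EOL;
```

Returning null instead of throwing keeps the caller simple: anything that is not recognisably a tweet can be skipped with a single check.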
Now every time a tweet is posted that contains any of our specified tracks (in this case, “#ReasonsToLoveMe”), that tweet is sent down the connection and passed to the TwitterListener::enqueueStatus() function, where you can process the received feed. However, you should avoid doing any time-intensive operations at this point: if your application cannot process the stream as fast as the feeds arrive, you risk having your connection terminated, which for most of us would be undesirable.

Real-Time Twitter Feeds

Until now, we have been starting the connection manually by going to our terminal and executing the command. Since real-world applications can’t work that way, let’s automate starting and closing the connection. In our DefaultController, let’s write an action handler (we’ll call it startStreamAction) which, when executed, initiates our streaming process. Make sure you’ve configured the route for this handler in the corresponding routing.yml file. In DefaultController.php:

    namespace TwitterBundle\Controller;

    use Symfony\Bundle\FrameworkBundle\Controller\Controller;
    use Symfony\Component\HttpFoundation\Request;
    use Symfony\Component\HttpFoundation\Response;

    class DefaultController extends Controller
    {
        // .....

        /**
         * This handler will switch to the root directory and start the streaming command
         */
        public function startStreamAction(Request $request)
        {
            chdir('../');
            $command = 'php bin/console twitter:stream';
            exec($command);

            return new Response('Twitter stream finished.');
        }

        // .....
    }

Now every time we visit the route for this handler, the command runs as a separate process. Keep in mind that exec() waits for the command to finish unless its output is redirected and the command is sent to the background, so in this form the request stays open for as long as the stream runs.

With Public Streams, we only receive public feeds for the tracks we defined before establishing the persistent connection with the streaming endpoint. So if we need to update our tracks, we have to terminate the previous connection, redefine our tracks, and then establish a new connection with the streaming endpoint. This process is critical, since we can have only one connection with the streaming endpoint at a time: before attempting to create a new connection, we need to make sure that all previously held connections are closed.

To close a connection with the streaming endpoint, we can terminate the background process that holds it. To kill the process, we can either use its process ID (PID) or look it up using the name and arguments of the background process. If we go with the process ID, then every time we establish a new connection we need to log (in a database, a text file, etc.) the process ID of that background process. Using this log, we can then terminate any previously held background processes before initiating a new one.
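As a minimal sketch of the text-file variant of such a log, the helpers below append PIDs to a file, read them back, and signal the logged processes. All three function names and the file path are hypothetical. The sketch assumes a Linux host with PHP's posix extension available, and that the user running PHP is allowed to signal the background process.

```php
<?php

/** Hypothetical helper: append a PID to a plain-text log, one PID per line. */
function logProcessId(string $pidFile, int $pid): void
{
    file_put_contents($pidFile, $pid . PHP_EOL, FILE_APPEND | LOCK_EX);
}

/** Hypothetical helper: read back all logged PIDs as positive integers. */
function readProcessIds(string $pidFile): array
{
    if (!is_file($pidFile)) {
        return [];
    }

    $lines = file($pidFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
    $pids = array_map('intval', $lines === false ? [] : $lines);

    return array_values(array_filter($pids, function ($pid) {
        return $pid > 0;
    }));
}

/**
 * Hypothetical helper: ask every logged process to terminate, then clear the log.
 * Returns the PIDs that were signalled successfully.
 */
function terminateLoggedProcesses(string $pidFile): array
{
    $terminated = [];

    foreach (readProcessIds($pidFile) as $pid) {
        // 15 is SIGTERM; posix_kill() returns false if the signal could not be sent
        if (function_exists('posix_kill') && posix_kill($pid, 15)) {
            $terminated[] = $pid;
        }
    }

    if (is_file($pidFile)) {
        unlink($pidFile);
    }

    return $terminated;
}

// Usage with a throwaway file in the system temp directory (nothing is killed here)
$pidFile = sys_get_temp_dir() . '/twitter-stream-demo.pid';
logProcessId($pidFile, 12345);
print_r(readProcessIds($pidFile));
unlink($pidFile);
```

In a start handler, logProcessId() would be called with the PID of the freshly started background process, and terminateLoggedProcesses() would run before a new process is started.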
Let’s update our startStreamAction handler to log the process ID every time a connection is made, and create a new action handler in our DefaultController (let’s call it stopStreamAction) to terminate any previously held background processes. We can then call this handler before we start a new background process, which ensures that only one persistent connection with the streaming endpoint is active at a time.

    namespace TwitterBundle\Controller;

    use Symfony\Bundle\FrameworkBundle\Controller\Controller;
    use Symfony\Component\HttpFoundation\Request;
    use Symfony\Component\HttpFoundation\Response;
    use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

    class DefaultController extends Controller
    {
        // ......

        /**
         * This handler will first terminate any previously held background process
         * by firing the stopStreamAction() handler. Then it'll create a new background
         * process which establishes a new connection with a streaming endpoint.
         */
        public function startStreamAction(Request $request)
        {
            // Execute the stopStreamAction() handler
            $curlHandler = curl_init();
            $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL);
            curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath);
            curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1);
            curl_exec($curlHandler);
            curl_close($curlHandler);

            // Create a new background process
            chdir('../');
            $output = array();
            $command = 'nohup php bin/console twitter:stream > /dev/null 2>&1 & echo $!';
            exec($command, $output);
            $processId = (int) $output[0]; // Process ID of the newly created background process. Log this.

            return new Response('Twitter stream started. PID: ' . $processId);
        }

        /**
         * This handler will fetch a log of Process IDs corresponding to any background
         * process and terminate them.
         */
        public function stopStreamAction(Request $request)
        {
            // Placeholder: load the process IDs you logged when starting the stream
            $activeProcesses = ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE;

            if (!empty($activeProcesses)) {
                foreach ($activeProcesses as $processId) {
                    $command = 'kill ' . (int) $processId . ' 2>&1';
                    exec($command, $output);
                    $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId);
                }
            }

            return new Response('Twitter stream stopped.');
        }

        // ......
    }

If you look carefully, we modified the command that we execute via the exec() function:

    nohup php bin/console twitter:stream > /dev/null 2>&1 & echo $!

Here nohup makes the process ignore hangup signals so that it keeps running after the shell that launched it exits, the redirection to /dev/null discards the command’s output so that exec() doesn’t wait for it, & sends the process to the background, and echo $! prints the process ID of that background process, which exec() captures in $output. Now every time a new background process is created this way, all of our previously held background processes are terminated first (if any), which ensures that only one connection is active at a time, provided you log the process IDs properly. You can improve this functionality according to your needs.

All that is left now is to process the stream of data that we receive through the connection. To do so, let’s first improve our Twitter service (TwitterService.php) so that it forwards any tweets our application receives to a handler (let’s call it processStreamAction) that will in turn process them. In your TwitterService.php:

    namespace TwitterBundle\Services;

    // Replace these placeholders with your application's credentials
    const CONSUMER_KEY = 'YOUR_APP_CONSUMER_KEY';
    const CONSUMER_SECRET = 'YOUR_APP_CONSUMER_SECRET';
    const APPLICATION_TOKEN = 'YOUR_APP_ACCESS_TOKEN';
    const APPLICATION_TOKEN_SECRET = 'YOUR_APP_ACCESS_TOKEN_SECRET';

    require_once __DIR__ .
        '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php';

    class TwitterListener extends \OauthPhirehose
    {
        private $curlHandler = null;
        private $processStreamPath = null;

        public function __construct()
        {
            parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER);

            // Placeholder: absolute URL of the processStreamAction route
            $this->processStreamPath = URL_TO_PROCESS_STREAM_ACTION_HANDLER;
            $this->consumerKey = CONSUMER_KEY;
            $this->consumerSecret = CONSUMER_SECRET;
            $this->configureCURL();
        }

        private function configureCURL()
        {
            // Reuse a single cURL handle for every forwarded tweet
            $this->curlHandler = curl_init();
            curl_setopt($this->curlHandler, CURLOPT_POST, true);
            curl_setopt($this->curlHandler, CURLOPT_URL, $this->processStreamPath);
            curl_setopt($this->curlHandler, CURLOPT_RETURNTRANSFER, 1);
        }

        public function enqueueStatus($status)
        {
            $twitterFeed = json_decode($status, true);

            if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) {
                // Forward the decoded tweet as URL-encoded form data
                $twitterData = array('feed' => $twitterFeed);
                curl_setopt($this->curlHandler, CURLOPT_POSTFIELDS, http_build_query($twitterData));
                curl_exec($this->curlHandler);
            }
        }
    }

    class TwitterService
    {
        /**
         * This function will establish a persistent connection with one of Twitter's streaming endpoints.
         * @param array $twitterTrackCollection Collection of tracks which we'll receive updates for
         */
        public function listenForTweets(array $twitterTrackCollection = [])
        {
            // Terminate the process if there are no active tracks
            if (empty($twitterTrackCollection)) {
                echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL;
                exit(0);
            }

            $twitterListener = new TwitterListener();
            $twitterListener->setTrack($twitterTrackCollection);
            $twitterListener->consume();
        }
    }

Now every time a Twitter feed is received, TwitterListener::enqueueStatus() forwards it to the route we’ve specified by making a POST request with cURL. In essence, this works exactly like a Webhook. Now let’s write the processStreamAction() handler in our DefaultController. In your DefaultController.php:

    namespace TwitterBundle\Controller;

    use Symfony\Bundle\FrameworkBundle\Controller\Controller;
    use Symfony\Component\HttpFoundation\Request;
    use Symfony\Component\HttpFoundation\Response;
    use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

    class DefaultController extends Controller
    {
        /**
         * This handler will first terminate any previously held background process
         * by firing the stopStreamAction() handler. Then it'll create a new background
         * process which establishes a new connection with a streaming endpoint.
         */
        public function startStreamAction(Request $request)
        {
            // Execute the stopStreamAction() handler
            $curlHandler = curl_init();
            $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL);
            curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath);
            curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1);
            curl_exec($curlHandler);

            // Create a new background process
            chdir('../');
            $output = array();
            $command = 'nohup php bin/console twitter:stream > /dev/null 2>&1 & echo $!';
            exec($command, $output);
            $processId = (int) $output[0]; // Process ID of the newly created background process. Log this.
            return new Response('Twitter stream started.');
        }

        /**
         * This handler will fetch a log of Process IDs corresponding to any background
         * process and terminate them.
         */
        public function stopStreamAction(Request $request)
        {
            // Placeholder: load the process IDs you logged when starting the stream
            $activeProcesses = ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE;

            if (!empty($activeProcesses)) {
                foreach ($activeProcesses as $processId) {
                    $command = 'kill ' . (int) $processId . ' 2>&1';
                    exec($command, $output);
                    $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId);
                }
            }

            return new Response('Twitter stream stopped.');
        }

        /**
         * A POST request will be made to this handler whenever our application receives
         * a feed. You can write your implementation here.
         */
        public function processStreamAction(Request $request)
        {
            if ($request->isMethod('POST')) {
                $payload = $request->request->all();

                if (array_key_exists('feed', $payload)) {
                    // Real-time Twitter Feed. Write your own implementation over here.
                    $streamContent = $payload['feed'];
                }
            } else {
                // Request method not supported.
            }

            return new Response();
        }
    }

Now we have a fully functional application that listens for live feeds from Twitter for the specified tracks. It also lets us update our tracks almost instantaneously, and it makes sure that only one connection with the streaming endpoint is active at any moment. You can extend this application to your liking and make it more robust and fault-tolerant.
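One possible direction for making the application more robust is to decouple receiving from processing. Instead of making an HTTP request for every tweet, the listener could append each status to a local spool file and let a separate worker pick the entries up later. This is a different technique from the cURL forwarding used in this article, shown only as a rough sketch: the function names spoolStatus() and drainSpool() and the file path are hypothetical, and a single-host setup with a writable temp directory is assumed.

```php
<?php

/**
 * Hypothetical helper: append one raw status to a spool file as a single line.
 * Cheap enough to be called for every incoming status.
 */
function spoolStatus(string $spoolFile, string $rawStatus): bool
{
    $decoded = json_decode($rawStatus, true);
    if (!is_array($decoded)) {
        return false; // ignore blank keep-alive lines and malformed payloads
    }

    // Re-encode so that every record is guaranteed to fit on one line
    $line = json_encode($decoded) . PHP_EOL;

    return file_put_contents($spoolFile, $line, FILE_APPEND | LOCK_EX) !== false;
}

/**
 * Hypothetical helper: read all spooled statuses and empty the spool.
 * A separate worker (for example a cron job) could call this periodically.
 */
function drainSpool(string $spoolFile): array
{
    if (!is_file($spoolFile)) {
        return [];
    }

    $handle = fopen($spoolFile, 'c+');
    flock($handle, LOCK_EX); // block writers while the spool is read and emptied

    $statuses = [];
    while (($line = fgets($handle)) !== false) {
        $decoded = json_decode($line, true);
        if (is_array($decoded)) {
            $statuses[] = $decoded;
        }
    }

    ftruncate($handle, 0);
    flock($handle, LOCK_UN);
    fclose($handle);

    return $statuses;
}

// Usage with a throwaway spool file and a hand-written sample payload
$spoolFile = sys_get_temp_dir() . '/twitter-spool-demo.jsonl';
spoolStatus($spoolFile, '{"text":"Hello","user":{"screen_name":"example_user"}}');
print_r(drainSpool($spoolFile));
unlink($spoolFile);
```

The trade-off is that tweets are processed with a small delay, while the streaming process itself only ever performs a quick local write.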
Here at UVDesk, we follow a similar approach to provide our Twitter Users with a better user experience and social connectivity.