Onlangs had ik komen over de taak van de uitvoering van de Twitter-Streaming Api ‚ s te verwerken Twitter Feeds (Tweets) in real-time. Meer specifiek, de applicatie worden gebouwd moest worden in staat om te luisteren naar tweets, variërend van meerdere onderwerpen en gebruikers (we verwijzen naar hen als tracks), en moet in staat zijn geweest om de hot-swap van deze sporen bijna onmiddellijk. Niet lang voordat het werken aan deze toepassing, ik had om dezelfde taak in het kader van Facebook. In tegenstelling tot Twitter, die gebruik maakt van hun Streaming Api ‚ s voor toepassingen met real-time updates, Facebook gebruikt in plaats van deWebhooks. Webhooks, wordt een HTTP-callback, geven ontwikkelaars met een niveau van verfijning dat is veel makkelijker om te gaan met. De Streaming Api ‚ s die door Twitter, maar vereist uw toepassing een hardnekkige HTTP-verbinding met één van Twitter is veel streaming eindpunten. Het schrijven van een aanvraag die gebaseerd is op zo een permanente verbinding voor het functioneren van ons dwingt om na te denken over het op een iets andere manier dan wat we zogenaamd gebruikt terwijl de bouw van een traditionele web application. De hele focus van dit artikel zal worden op de ontwikkeling van een dergelijke applicatie. De Twitter Streaming Api ‚ s Als u een kijkje nemen op Twitter de Developer Documentatie, zie je dat ze ons voorzien van Streaming Api ‚ s. Natuurlijk, ze bieden ook ons met RUST en Advertenties Api ’s, maar voor onze doeleinden, zullen we ons alleen richten op hun Streaming Api‘ s. De Streaming Api ‚ s vereist de genoemde verbinding persistent zijn (in wezen nooit eindigende) in de natuur. In essentie, het is vergelijkbaar met het downloaden van een oneindig groot bestand van uw systeem zal blijven ontvangen gegevens, tenzij u handmatig beëindigen van de verbinding. de aard van De feeds van uw aanvraag ontvangt zal afhangen van de aard van de stromen die je gaat gebruiken. De Streaming Api ‚ s kunnen ontwikkelaars met drie verschillende soorten streams: Openbare Streams – Streams van de publieke gegevens die via Twitter. Geschikt voor de volgende specifieke gebruikers of onderwerpen en data mining. User Streams – Single-user streams, met ongeveer alle van de gegevens die overeenkomen met een single-user ‚ s bekijken van Twitter. Site-Streams – De multi-user versie van user streams. Site-streams zijn bedoeld voor servers die moet aansluiten op Twitter namens vele gebruikers. Zoals ik al eerder zei, de feeds die wij ontvangen is afhankelijk van de aard van de stromen gaan we gebruiken. Dus voordat we verder gaan, laten we een kijkje nemen op de eisen van de toepassing, afhankelijk van je gebruik van Openbare Streams of User Streams (Site Streams zijn in een gesloten beta). Onze applicatie moet voldoen aan de volgende eisen: Luisteren naar de live tweets voor de tracks die wij hebben gevraagd. in staat Zijn om te schakelen deze sporen als ze worden gewijzigd met de minste hoeveelheid van de onderbreking van de genoemde aansluiting het Houden van deze eisen in het achterhoofd, zijn wij nu hebben om zich te vestigen op de streams we gebruiken. Gebruiker stromen, zoals eerder vermeld, bevat de gegevens van een individuele user ‚ s point of view. Volgens Twitter documentatie voor de Gebruiker Streams: User Streams zorgen voor een stroom van gegevens en gebeurtenissen die eigen zijn aan de geauthenticeerde gebruiker. Merk op dat de Gebruiker Streams zijn niet bedoeld voor server-naar-server verbindingen. Als u nodig hebt om verbindingen te maken op de rekening van meerdere gebruikers van dezelfde machine, overweeg site stromen. Beperk het aantal verbindingen van uw applicatie maakt om User Streams. Elk Twitter-account is slechts een beperkt aantal gelijktijdige Gebruikers Streams verbindingen per OAuth toepassing, ongeacht IP. Eenmaal per toepassing limiet wordt overschreden, worden de oudste verbinding beëindigd. Een account aanmeldt vanaf te veel exemplaren van dezelfde OAuth toepassing cyclus verbindingen als de toepassing gevallen opnieuw aansluiten en loskoppelen van elkaar. Omdat we meer gericht op het kunnen ontvangen van tweets van vele gebruikers, het is niet duidelijk hoeveel gebruikers zullen worden. Ga door de documenten, User Streams niet knippen voor onze toepassing van de use-case. Dus nu zitten we te doen met Openbaar Stromen. in tegenstelling tot de Gebruiker Stromen, die ons in staat stelt tot het ontvangen van feeds in een context van beperkte individuele gebruikers, Openbaar Streams leveren applicaties met een stroom van openbare gegevens die via Twitter. Spreken in het kader van de tweets met name, als we gebruik maken van het openbaar streams slim, onze toepassing niet gebonden aan de omvang van de beperkte individuele gebruikers in tegenstelling tot de Gebruiker Stromen. Sinds we zijn niet per se zorgen over de eigen stroom van gegevens, kunnen we langs Twitter Handgrepen (@gebruikersnaam) die overeenkomt met de individuele Twitter-Gebruikers en aangezien de tracks voor het vaststellen van een verbinding met het openbaar streaming eindpunt. Dit maakt het voor onze applicatie om te luisteren naar live openbare tweets van meerdere gebruikers. Snel een punt om op te merken voordat we verder gaan, volgens de Twitter-Documentatie voor Openbare Stromen, onze applicatie kan enkel vaststellen van een enkele verbinding met een openbaar streams op een gegeven moment. de Communicatie met de Streaming API: het Bouwen van toepassing Nu we klaar zijn met de inleidende stukjes, we kunnen aan de slag op het bouwen van onze applicatie. Ik zal de ontwikkeling van de applicatie in PHP draait op Linux, met behulp van de populaire Symfony Framework en Componist van de package manager. Dat gezegd zijnde, moet u in staat zijn om de toepassing van de fundamentele concepten en de uitvoering van het project in elke taal van uw keuze. ik zal, echter, worden met behulp van, dePhirehose door Fennb, een bibliotheek van derden ter vereenvoudiging van het proces van communicatie met de Twitter-Streaming-Api ‚ s. Als je de ontwikkeling op een ander platform dan PHP, hier is een uitgebreide lijst van derde-partij-bibliotheken u kunt gebruik maken van. U kunt het toevoegen van een Phirehose om uw project via Componist die met behulp van het volgende commando: php componist nodig „fennb/phirehose“ het opzetten van de toepassing: Maak een nieuwe Symfony project (we noemen het twitterFeeds) door het uitvoeren van de volgende opdracht: symfony nieuwe twitterFeeds Voordat we gaan samen met het artikel, zorg ervoor dat u hebt geïnstalleerd de bibliotheek van derden, zoals hierboven vermeld. Maak vervolgens een nieuwe bundel (we noemen dit TwitterBundle) door het uitvoeren van de volgende opdracht: php-bin/console genereren:bundel Volg samen met wat details van de configuratie wordt gevraagd van je. Vervolgens gaat u naar uw TwitterBundle in de map in uw project en maken een nieuw Services de map. In deze map maak een nieuw bestand met de naam TwitterService.php die we gebruiken. Zorg ervoor dat u zich op deze dienst voor uw toepassing. de Oprichting van een permanente verbinding: Zoals ik al eerder zei, het schrijven van een aanvraag die gebaseerd is op een permanente verbinding voor het functioneren van ons dwingt om na te denken over het op een iets andere manier. In tegenstelling tot in het geval van het gebruik van REST-Api ’s, waar we de API-aanroepen, of Webhooks, waar een callback is gemaakt om onze servers, Streaming Api‘ s, we zijn de oprichting van een permanente verbinding met een streaming eindpunt die dan leven is voor altijd (ideaal gezien), en de verwerking van die gegevens in die wij ontvangen op onze beëindigen door middel van dit medium. We beëindigen deze verbinding alleen in de gevallen dat we nodig hebben om het op te starten om de update van de tracks. Dus hoe gaan we over het vaststellen van een dergelijke zei verbinding? Een manier is om gebruik te maken van deAchtergrond Processen. laten we zeggen dat we moeten luisteren voor live tweets, we moeten dan tot het vestigen van een permanente verbinding met Twitter is openbaar streaming eindpunten. Om dit te doen, kunnen we het uitvoeren van een achtergrond proces via de terminal. Deze op de achtergrond wordt uitgevoerd als een individueel proces, en zal zij het zei permanente verbinding met onze applicatie vereist. We kunnen dan volgen de proces-ID (PID) van deze achtergrond proces als we nodig hebben om te veranderen op een andere manier. Om onze achtergrond, we maken een console-opdracht die we kunnen uitvoeren van de terminal in de root directory van ons project. Om onze opdracht, we kunnen gebruik maken van de Console Component bepaald door het Symfony Framework. Maak eerst een Commando map in je TwitterBundle, en vervolgens create TwitterCommand.php bestand in die map. /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Command; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Routing\Generator\UrlGeneratorInterface; class TwitterCommand extends ContainerAwareCommand { protected function configure() { $this->setName('twitter:stream')->setDescription('Listen for Live Twitter Feeds.'); } protected function execute(InputInterface $input, OutputInterface $output) { $twitterTracks = ['#ReasonsToLoveMe']; // Define your tracks you wish to receive updates for $this->getContainer()->get('twitter.service')->listenForTweets($twitterTracks); // This will start the streaming process } } 12345678910111213141516171819202122232425262728293031 /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Command; use Symfony\Component\Console\Command\Command;use Symfony\Component\Console\Input\InputInterface;use Symfony\Component\Console\Output\OutputInterface;use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;use Symfony\Component\Routing\Generator\UrlGeneratorInterface; class TwitterCommand extends ContainerAwareCommand{ protected function configure() { $this->setName('twitter:stream')->setDescription('Listen for Live Twitter Feeds.'); } protected function execute(InputInterface $input, OutputInterface $output) { $twitterTracks = ['#ReasonsToLoveMe']; // Define your tracks you wish to receive updates for $this->getContainer()->get('twitter.service')->listenForTweets($twitterTracks); // This will start the streaming process }} Van de root directory van ons project, we voeren nu de code die is geschreven in de execute() – functie met behulp van de volgende opdracht uit in de terminal: php-bin/console twitter:stream Zoals we kunnen zien, de TwitterCommand::execute() functie zal op zijn beurt weer bellen met de TwitterCommand::listenForTweets() – functie gedefinieerd in de TwitterService, die zal zorgen voor een nieuwe permanente verbinding met Twitter streaming eindpunt. Dus laten we de code van deze ‚volgende‘. In uw TwitterService.php die we eerder aangemaakt, /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Services; const CONSUMER_KEY = YOUR_APP_CONSUMER_KEY; const CONSUMER_SECRET = YOUR_APP_CONSUMER_SECRET; const APPLICATION_TOKEN = YOUR_APP_ACCESS_TOKEN; const APPLICATION_TOKEN_SECRET = YOUR_APP_ACCESS_TOKEN_SECRET; // Include the third-party library require_once __DIR__ . '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php'; class TwitterListener extends \OauthPhirehose { public function __construct() { parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER); $this->consumerKey = CONSUMER_KEY; $this->consumerSecret = CONSUMER_SECRET; } public function enqueueStatus($status) { $twitterFeed = json_decode($status, true); if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) { $twitterData = array('feed' => $twitterFeed); dump('New Feed Recieved'); dump($twitterData); } } } class TwitterService { /** * This function will establish a persistent connection with one of the twitter's streaming endpoint. * @param array $twitterTrackCollection Collection of tracks which we'll receive updates for */ public function listenForTweets(array $twitterTrackCollection = []) { // Terminate the process if there are no active tracks if (empty($twitterTrackCollection)) { echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL; exit(0); } $twitterListener = new TwitterListener(); $twitterListener->setTrack($twitterTrackCollection); $twitterListener->consume(); } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Services; const CONSUMER_KEY = YOUR_APP_CONSUMER_KEY;const CONSUMER_SECRET = YOUR_APP_CONSUMER_SECRET;const APPLICATION_TOKEN = YOUR_APP_ACCESS_TOKEN;const APPLICATION_TOKEN_SECRET = YOUR_APP_ACCESS_TOKEN_SECRET; // Include the third-party libraryrequire_once __DIR__ . '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php'; class TwitterListener extends \OauthPhirehose{ public function __construct() { parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER); $this->consumerKey = CONSUMER_KEY; $this->consumerSecret = CONSUMER_SECRET; } public function enqueueStatus($status) { $twitterFeed = json_decode($status, true); if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) { $twitterData = array('feed' => $twitterFeed); dump('New Feed Recieved'); dump($twitterData); } }} class TwitterService{ /** * This function will establish a persistent connection with one of the twitter's streaming endpoint. * @param array $twitterTrackCollection Collection of tracks which we'll receive updates for */ public function listenForTweets(array $twitterTrackCollection = []) { // Terminate the process if there are no active tracks if (empty($twitterTrackCollection)) { echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL; exit(0); } $twitterListener = new TwitterListener(); $twitterListener->setTrack($twitterTrackCollection); $twitterListener->consume(); }} als je Nu de opdracht uitvoeren die we zojuist gemaakt hebben van de terminal, een nieuwe verbinding met Twitter Streaming API zal worden opgericht en dat je begint met het ontvangen van publieke tweets voor de tracks die u hebt gedefinieerd. Op het moment van schrijven van dit artikel, „#ReasonsToLoveMe“ is trending dus ik gebruikte dat als voorbeeld volgen. Nu elke keer een tweet is gemaakt dat bestaat uit een van onze opgegeven tracks (in dit geval „#ReasonsToLoveMe“), die tweet zal worden doorgegeven naar beneden de verbinding pijplijn. Deze tweet wordt dan doorgegeven naar de TwitterListener::enqueueStatus() functie, waar u kunt het verwerken van de ontvangen voeden. Echter je moet voorkomen dat het doen van een tijd-intensieve werking op dit punt. Reden hiervoor is dat als je aanvraag niet kan worden verwerkt door deze stroom van data snel genoeg in tegenstelling tot de rente die u ontvangt op de feeds op, loopt u het risico dat uw verbinding beëindigd, de meeste van ons zou onwenselijk zijn. Real-Time Twitter-Feeds Tot nu toe, hebben we het handmatig starten van de verbinding door te gaan naar onze terminal en het uitvoeren van de opdracht. Maar omdat in de echte wereld zijn een heleboel verschillende, laten we het automatiseren van dit proces van het starten en sluiten van de verbinding. In de DefaultController, laten we het schrijven van een actie van de handler (deze noemen we de startStreamAction), dat, wanneer het uitgevoerd zal starten onze streaming proces. Zorg ervoor dat u hebt geconfigureerd, de route voor deze handler in de bijbehorende routering.yml bestand. In de DefaultController.php /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Controller; use Symfony\Bundle\FrameworkBundle\Controller\Controller; class DefaultController extends Controller { // ..... /** * This handler will switch to the root directory and start a new background process */ public function startStreamAction(Request $request) { chdir('../'); $command = 'php bin/console twitter:listen'; exec($command); exit(0); } // ..... } 12345678910111213141516171819202122232425262728293031 /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Controller; use Symfony\Bundle\FrameworkBundle\Controller\Controller; class DefaultController extends Controller{ // ..... /** * This handler will switch to the root directory and start a new background process */ public function startStreamAction(Request $request) { chdir('../'); $command = 'php bin/console twitter:listen'; exec($command); exit(0); } // .....} Nu elke keer bezoeken we de route voor deze handler, deze opdracht zal uitgevoerd worden afzonderlijk in een nieuwe thread. Met het Openbaar Stromen, we krijgen alleen openbare feeds voor de tracks die we hebben gedefinieerd voorafgaand aan de oprichting van een permanente verbinding met de streaming eindpunt. Dus als een noodzaak ontstaat voor het bijwerken van onze tracks, die we nodig zullen hebben voor het beëindigen van de vorige verbinding, opnieuw te definiëren onze tracks, en vervolgens opnieuw wordt een nieuwe verbinding met de streaming eindpunt. Dit proces is zeer essentieel, omdat we slechts één verbinding met de streaming eindpunt in een tijd (we het risico dat onze IP-verboden met meerdere aansluitingen). We moeten dus zorg ervoor dat voordat u een nieuwe verbinding maakt, worden alle eerder gehouden verbindingen worden gesloten. Om een verbinding met de streaming eindpunt, kunnen we beëindigen de achtergrond proces overeenkomt met de verbinding. Om te doden van het proces, kunnen we gebruik maken van het proces-ID (PID), of zoek het op met de naam en de argumenten van de achtergrond proces. Met behulp van de proces-ID, elke keer als we een nieuwe verbinding maken, we zullen moeten inloggen (database, tekstbestand, enz…) van de proces-ID overeenkomt met dat op de achtergrond. Met behulp van dit logboek kunt u vervolgens te beëindigen eventuele voorheen aangehouden achtergrond processen voor het initiëren van een nieuwe. Laat de update van de startStreamAction handler aan te melden voor de proces-ID elke keer als er een verbinding is gemaakt, en in de DefaultController, maak een nieuwe actie handler (laat noemen dit de stopStreamAction) te beëindigen eventuele voorheen aangehouden achtergrond processen. Dan kunnen We bellen deze handler voordat we beginnen een nieuwe achtergrond proces om ervoor te zorgen dat slechts één exemplaar van een permanente verbinding met de streaming eindpunt is tegelijkertijd actief zijn. /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Controller; use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Response; use Symfony\Bundle\FrameworkBundle\Controller\Controller; use Symfony\Component\Routing\Generator\UrlGeneratorInterface; class DefaultController extends Controller { // ...... /** * This handler will first terminate any previously held background process * by firing the stopStreamAction() handler. Then it'll create a new background * process which establish a new connection with a streaming endpoint. */ public function startStreamAction(Request $request) { // Execute the stopStreamAction() handler $curlHandler = curl_init(); $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL); curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath); curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1); curl_exec($curlHandler); // Create a new background process chdir('../'); $output = array(); $command = 'nohup php bin/console twitter:listen > /dev/null 2>&1 & echo $!'; exec($command, $output); $processId = (int) $output[0]; // Process ID of the newly created background process. Log this. exit(0); } /** * This handler will fetch a log of Process IDs corresponding to any background * process and terminate them. */ public function stopStreamAction(Request $request) { $activeProcesses = ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE; if (!empty($activeProcesses)) { foreach ($activeProcesses as $processId) { $command = 'kill ' . $processId . ' 2>&1'; exec($command, $output); $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId); } } exit(0); } // ...... } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Controller; use Symfony\Component\HttpFoundation\Request;use Symfony\Component\HttpFoundation\Response;use Symfony\Bundle\FrameworkBundle\Controller\Controller;use Symfony\Component\Routing\Generator\UrlGeneratorInterface; class DefaultController extends Controller{ // ...... /** * This handler will first terminate any previously held background process * by firing the stopStreamAction() handler. Then it'll create a new background * process which establish a new connection with a streaming endpoint. */ public function startStreamAction(Request $request) { // Execute the stopStreamAction() handler $curlHandler = curl_init(); $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL); curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath); curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1); curl_exec($curlHandler); // Create a new background process chdir('../'); $output = array(); $command = 'nohup php bin/console twitter:listen > /dev/null 2>&1 & echo $!'; exec($command, $output); $processId = (int) $output[0]; // Process ID of the newly created background process. Log this. exit(0); } /** * This handler will fetch a log of Process IDs corresponding to any background * process and terminate them. */ public function stopStreamAction(Request $request) { $activeProcesses = ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE; if (!empty($activeProcesses)) { foreach ($activeProcesses as $processId) { $command = 'kill ' . $processId . ' 2>&1'; exec($command, $output); $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId); } } exit(0); } // ......} Als je goed kijkt, bouwden wij de opdracht die we gaan uitvoeren in de terminal met behulp van de exec() functie. nohup php-bin/console twitter:luisteren > /dev/null 2>&1 & echo $! Al die extra bits en stukken zal de terugkeer van de proces-ID van de achtergrond proces gemaakt via de exec() functie. Nu elke keer een nieuwe achtergrond proces is gemaakt naar aanleiding van deze aanpak, al onze eerder gehouden op de achtergrond zal worden beëindigd (indien van toepassing). Dit moet meer dan genoeg zijn om ervoor te zorgen dat er slechts één verbinding tegelijk actief zijn, mits je goed logboek hun proces-Id ‚ s. U kunt het verbeteren van deze functionaliteit volgens uw behoeften. dat nu is voor het verwerken van de stroom van gegevens die we ontvangen via de verbinding pijplijn. Om dit te doen, laten we een verbetering van onze Twitter-Service (TwitterService.php) het eerste, en vervolgens redirect alle tweets die onze toepassing ontvangt een handler (we noemen dit de processStreamAction) die op zijn beurt weer het verwerken van deze tweets. In het TwitterService.php, /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Services; const CONSUMER_KEY = YOUR_APP_CONSUMER_KEY; const CONSUMER_SECRET = YOUR_APP_CONSUMER_SECRET; const APPLICATION_TOKEN = YOUR_APP_ACCESS_TOKEN; const APPLICATION_TOKEN_SECRET = YOUR_APP_ACCESS_TOKEN_SECRET; require_once __DIR__ . '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php'; class TwitterListener extends \OauthPhirehose { private $curlHandler = null; private $processStreamPath = null; public function __construct() { parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER); $this->processStreamPath = URL_TO_PROCESS_STREAM_ACTION_HANDLER; $this->consumerKey = CONSUMER_KEY; $this->consumerSecret = CONSUMER_SECRET; $this->configureCURL(); } private function configureCURL() { $this->curlHandler = curl_init(); $headers = array('Content-type: multipart/form-data'); curl_setopt($this->curlHandler, CURLOPT_POST, true); curl_setopt($this->curlHandler, CURLOPT_URL, $this->processStreamPath); curl_setopt($this->curlHandler, CURLOPT_RETURNTRANSFER, 1); } public function enqueueStatus($status) { $twitterFeed = json_decode($status, true); if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) { $twitterData = array('feed' => $twitterFeed); curl_setopt($this->curlHandler, CURLOPT_POSTFIELDS, http_build_query($twitterData)); curl_exec($this->curlHandler); } } } class TwitterService { /** * This function will establish a persistent connection with one of the twitter's streaming endpoint. * @param array $twitterTrackCollection Collection of tracks which we'll receive updates for */ public function listenForTweets(array $twitterTrackCollection = []) { // Terminate the process if there are no active tracks if (empty($twitterTrackCollection)) { echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL; exit(0); } $twitterListener = new TwitterListener(); $twitterListener->setTrack($twitterTrackCollection); $twitterListener->consume(); } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Services; const CONSUMER_KEY = YOUR_APP_CONSUMER_KEY;const CONSUMER_SECRET = YOUR_APP_CONSUMER_SECRET;const APPLICATION_TOKEN = YOUR_APP_ACCESS_TOKEN;const APPLICATION_TOKEN_SECRET = YOUR_APP_ACCESS_TOKEN_SECRET; require_once __DIR__ . '/../../../vendor/fennb/phirehose/lib/OauthPhirehose.php'; class TwitterListener extends \OauthPhirehose{ private $curlHandler = null; private $processStreamPath = null; public function __construct() { parent::__construct(APPLICATION_TOKEN, APPLICATION_TOKEN_SECRET, \Phirehose::METHOD_FILTER); $this->processStreamPath = URL_TO_PROCESS_STREAM_ACTION_HANDLER; $this->consumerKey = CONSUMER_KEY; $this->consumerSecret = CONSUMER_SECRET; $this->configureCURL(); } private function configureCURL() { $this->curlHandler = curl_init(); $headers = array('Content-type: multipart/form-data'); curl_setopt($this->curlHandler, CURLOPT_POST, true); curl_setopt($this->curlHandler, CURLOPT_URL, $this->processStreamPath); curl_setopt($this->curlHandler, CURLOPT_RETURNTRANSFER, 1); } public function enqueueStatus($status) { $twitterFeed = json_decode($status, true); if (is_array($twitterFeed) && isset($twitterFeed['user']['screen_name'])) { $twitterData = array('feed' => $twitterFeed); curl_setopt($this->curlHandler, CURLOPT_POSTFIELDS, http_build_query($twitterData)); curl_exec($this->curlHandler); } }} class TwitterService{ /** * This function will establish a persistent connection with one of the twitter's streaming endpoint. * @param array $twitterTrackCollection Collection of tracks which we'll receive updates for */ public function listenForTweets(array $twitterTrackCollection = []) { // Terminate the process if there are no active tracks if (empty($twitterTrackCollection)) { echo 'No active tracks. Terminating Process...' . PHP_EOL . PHP_EOL; exit(0); } $twitterListener = new TwitterListener(); $twitterListener->setTrack($twitterTrackCollection); $twitterListener->consume(); }} Nu elke keer een twitter-feed is ontvangen, TwitterListener::enqueueStatus() van het voer voor de route die wij hebben opgegeven met KRULLEN door het maken van een POST-aanvraag. Dit is In essentie precies hetzelfde als een Webhook als je er over nadenkt. Nu laten we het schrijven van de processStreamAction() – handler in de DefaultController. In het DefaultController.php, /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Controller; use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Response; use Symfony\Bundle\FrameworkBundle\Controller\Controller; use Symfony\Component\Routing\Generator\UrlGeneratorInterface; class DefaultController extends Controller { /** * This handler will first terminate any previously held background process * by firing the stopStreamAction() handler. Then it'll create a new background * process which establish a new connection with a streaming endpoint. */ public function startStreamAction(Request $request) { // Execute the stopStreamAction() handler $curlHandler = curl_init(); $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL); curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath); curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1); curl_exec($curlHandler); // Create a new background process chdir('../'); $output = array(); $command = 'nohup php bin/console twitter:listen > /dev/null 2>&1 & echo $!'; exec($command, $output); $processId = (int) $output[0]; // Process ID of the newly created background process. Log this. exit(0); } /** * This handler will fetch a log of Process IDs corresponding to any background * process and terminate them. */ public function stopStreamAction(Request $request) { $activeProcesses = ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE; if (!empty($activeProcesses)) { foreach ($activeProcesses as $processId) { $command = 'kill ' . $processId . ' 2>&1'; exec($command, $output); $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId); } } exit(0); } /** * A POST request will be made to this handler whenever our application receives * a feed. You can write your implementation here. */ public function processStreamAction(Request $request) { if (isset($_POST)) { if (array_key_exists('feed', $_POST)) { // Real-time Twitter Feed. Write your own implementation over here. $streamContent = $_POST['feed']; } } else { // Request method not supported. } exit(0); } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 /** * Webkul Software. * * @category Webkul, Uvdesk * @package Webkul_UVDesk_TF * @author Akshay Kumar * @copyright Copyright (c) 2010-2016 Webkul Software Private Limited (https://webkul.com) * @license https://store.webkul.com/license.html */ namespace TwitterBundle\Controller; use Symfony\Component\HttpFoundation\Request;use Symfony\Component\HttpFoundation\Response;use Symfony\Bundle\FrameworkBundle\Controller\Controller;use Symfony\Component\Routing\Generator\UrlGeneratorInterface; class DefaultController extends Controller{ /** * This handler will first terminate any previously held background process * by firing the stopStreamAction() handler. Then it'll create a new background * process which establish a new connection with a streaming endpoint. */ public function startStreamAction(Request $request) { // Execute the stopStreamAction() handler $curlHandler = curl_init(); $stopStreamPath = $this->generateUrl('twitter_stream_stop', array(), UrlGeneratorInterface::ABSOLUTE_URL); curl_setopt($curlHandler, CURLOPT_URL, $stopStreamPath); curl_setopt($curlHandler, CURLOPT_RETURNTRANSFER, 1); curl_exec($curlHandler); // Create a new background process chdir('../'); $output = array(); $command = 'nohup php bin/console twitter:listen > /dev/null 2>&1 & echo $!'; exec($command, $output); $processId = (int) $output[0]; // Process ID of the newly created background process. Log this. exit(0); } /** * This handler will fetch a log of Process IDs corresponding to any background * process and terminate them. */ public function stopStreamAction(Request $request) { $activeProcesses = ARRAY_COLLECTION_OF_PROCESS_IDS_TO_TERMINATE; if (!empty($activeProcesses)) { foreach ($activeProcesses as $processId) { $command = 'kill ' . $processId . ' 2>&1'; exec($command, $output); $this->get('logger')->info('Twitter Stream Process Terminated: ' . $processId); } } exit(0); } /** * A POST request will be made to this handler whenever our application receives * a feed. You can write your implementation here. */ public function processStreamAction(Request $request) { if (isset($_POST)) { if (array_key_exists('feed', $_POST)) { // Real-time Twitter Feed. Write your own implementation over here. $streamContent = $_POST['feed']; } } else { // Request method not supported. } exit(0); }} Daar ga je! Nu hebben we een volledig functionele applicatie, die niet alleen luisteren naar live feeds van Twitter voor de opgegeven nummers, maar stelt ons ook in staat om onze tracks bijna ogenblikkelijk, terwijl ervoor te zorgen dat slechts één verbinding met de streaming eindpunt is actief op een moment. U kunt uitbreiden van deze toepassing naar uw wens en maken het meer robuuste en fault tolerant. Hier op UVDesk, volgen we een soortgelijke aanpak om onze Twitter-Gebruikers met een betere user-experience en sociale connectiviteit. Tag(s) Background Process Facebook API Streaming API Twitter Category(s) uvdesk