
Designing NRT (Near Real-Time) stream processing systems: Using Python with Storm and Kafka


Formal Metadata

Title
Designing NRT (Near Real-Time) stream processing systems: Using Python with Storm and Kafka
Title of Series
Part Number
35
Number of Parts
119
Author
License
CC Attribution 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal purpose as long as the work is attributed to the author in the manner specified by the author or licensor.
Identifiers
Publisher
Release Date
Language
Production Place
Berlin

Content Metadata

Subject Area
Genre
Abstract
konarkmodi - Designing NRT(NearRealTime) stream processing systems: Using python with Storm and Kafka The essence of near-real-time stream processing is to compute huge volumes of data as it is received. This talk will focus on creating a pipeline for collecting huge volumes of data using Kafka and processing for near-real time computations using Storm.
Keywords
Transcript: English (auto-generated)
Welcome back, everyone. Once again, I'm Austin Bingham from Sixty North up in Norway.
And I want to present to you Konark Modi, who will be talking about near real-time stream processing in Python with Storm and Kafka. Take it away. Hi, good morning, all of you. I understand I'm the only one standing between you and lunch, so I'll try to keep things brief for you guys.
All right, so the title of my talk is Designing Near Real-Time Stream Processing Systems. Now, what do we mean by data pipelines and near real-time systems? All of us know that, in real time, we are generating a lot of information as we visit websites, browse applications, and so on. And we need reliable and quick mechanisms
to collect that data. Once we are able to collect that data, we need to process and analyze that data as well. Now, systems like Hadoop let you efficiently do that. But that's, again, batch processing system. When we talk about batch processing systems, we mean that there's going to be high latency of updates in your query results. And when you have an application where you want
to provide recommendations, to provide better features to users, to provide a feedback-loop mechanism, you want a layer that helps you work with the recent data and update it with low latency. That is where the essence of near real-time data pipelines comes in. I treat data pipelines as mainly three parts. One is the input, which is basically the messaging layer
where you're collecting all the user data coming in on your streams. Second is the processing layer, where you'll be processing the recent data. Now, when I say recent, that means, say, the last n minutes or n days. So it's a defined amount of data that you're going to process on your near real-time stream processing layer. I'm not going to process one month of data on my near real-time processing layer.
And then, when I process that data, there has to be a layer where I can present my data in probably a queryable format or a format that my other applications can consume in a feedback loop. So today's talk is majorly focused on two components. One is the messaging layer, that's Kafka, that I'm gonna take in the second part of the talk. And before that, we're gonna talk about
an open source project that was open sourced by Twitter and is now an Apache project, known as Apache Storm. Primarily, Apache Storm is 50% Clojure and 50% Java, but that does not stop us from using it in Python. And that is why I'm here today: to present how we can use Storm to its full capabilities using Python itself.
So before I structure the talk, can I have a show of hands: how many of us actually know what Storm is? Cool, and how many of us know what Kafka is? Right, so a majority of people are still not familiar with Storm and Kafka, so I'll just give a brief overview of what Storm is and what Kafka is. Given the timeframe and the kind of content that I have,
I might speed up a little bit, but do feel free to catch me after the talk and I'll be happy to show you demos and other stuff. All right, so when we talk about stream processing, there are a lot of challenges that you need to tackle, but this slide only fits a few of them. First of all, for me, Storm, a near real-time processing system
is actually an infrastructure that will do a never-ending data processing for me. So what I mean by that is, it's not like a MapReduce job that at a given point of time, it will finish. A real-time processing system for me is, data keeps on coming in huge volume, it will keep processing it and keep pushing it ahead. It will never end for me. So that is what I mean by
real-time data processing pipelines. You have to store messages somewhere before your consumers can consume it. You might want to replay your messages again. You might want to drop a few messages and stuff like that. So all these things need to be taken care of by your near real-time processing layers. You need to route your messages. For example, you're collecting a few messages from application one,
you're collecting a few messages from application two. You want all your application one messages to route to one particular worker, application two's messages to route to a particular, to a different worker. So these are a few nuances that revolve around a real-time data pipeline processing system, right? You want to scale for high throughput. For example, you design a system that is taking care of thousands of messages today,
but tomorrow you might scale, and you might want a system that will then process hundreds of thousands of messages per second, right? So you need a design philosophy around your system that takes care of all that. When we talk about Storm, it takes care of almost all these things. For example, you use Storm and you have already solved a wide variety of challenges.
For example, you just want to process all the Twitter feeds that you're getting from the Firehose and dump them to your DB, all right? Now, in systems like this, you really need to take care of back pressure. For example, what if my processing layer dies? Where do all my messages go, right? The ones coming from the Twitter Firehose. Or, finally, my persistence layer is down.
So how do I restore my persistence layer, or how does the processing layer take care of the layer ahead of it? And what happens to those messages, right? So, Apache Storm lets you handle an extremely broad set of scenarios. For example, you want to do a query on the fly and, again, that has to be distributed, because your data is distributed across machines, or it would take a lot of time, right?
So Apache Storm has something known as DRPC topologies that help you do that as well. It is scalable, and when we go into the architecture of Storm you'll understand what we mean by scalable in terms of Apache Storm. You have different services that do different tasks in Storm, and that lets you scale as well. When we talk about scalability, we obviously talk about fault tolerance as well.
What happens if one of the workers goes down? What happens if the service that coordinates all the workers goes down? As I mentioned earlier, at no point do I want my data processing layer to go down, right? One of the components might go down and come back up, but in the end I never want the whole system to crash on me, because that is certainly not feasible. It is programming language agnostic.
That is how we are able to use Python with Apache Storm; it supports other languages as well. As I mentioned, the core is in Clojure and Java, but it has multi-lang APIs that help you achieve a lot of stuff. There are a few advanced mechanisms that we are still not able to use from Python, but I believe that in the roadmap they are planning to open those up
using the multi-lang API as well. Achieving parallelism across components: when we go through the various components of Storm, I'll brief you on how, at each level, we can configure what kind of parallelism we need. So these are the Storm components. I've tried to depict them in two parts. One is the conceptual view.
This conceptual view helps you when you're actually programming your system for Storm. And then is the physical view. These are the actual services that will be running on your servers, on your systems. So first, it's Spout, okay? Treat Spout as an ingestion layer. It's an ingestion component. So Spout is your interface to the outer world, right?
So you write a Spout that will basically interface with, say, Twitter firehose, or a RabbitMQ layer, or a Kafka layer, or anything like that. And then Spout basically generates Stream, which we're gonna talk about. So Stream is nothing but an unbounded sequence of tuples that basically go to Bolts and other components, right?
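A spout of the self-contained kind he describes next (one that generates random sentences instead of reading from an external source) can be sketched in plain Python. The class and method names below mimic Storm's spout interface, but this is a toy stand-in, not streamparse's actual API:

```python
import random

SENTENCES = [
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
]

class RandomSentenceSpout:
    """Toy stand-in for a Storm spout: each call to next_tuple
    'emits' one randomly chosen sentence downstream."""

    def __init__(self, sentences):
        self.sentences = sentences
        self.emitted = []  # stands in for Storm's output stream

    def emit(self, tup):
        self.emitted.append(tup)

    def next_tuple(self):
        # A real spout would read from Kafka, RabbitMQ, the Twitter
        # Firehose, etc.; this one generates data internally, exactly
        # like the example in the talk.
        self.emit([random.choice(self.sentences)])

spout = RandomSentenceSpout(SENTENCES)
for _ in range(5):
    spout.next_tuple()
print(spout.emitted)  # five one-element tuples, each holding a sentence
```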
So here in the example, what I've tried to show is a small code snippet of a Spout. Now, it's doing nothing useful; in the real world you will never come across this scenario, because it's not connecting to the outer world. It's just generating random sentences internally and emitting them for the further processing layers, right? So what it's actually doing is,
it says: I want to generate random sentences from the list of sentences that I have, and just emit each sentence, right? Streams we've talked about. So if you look at the code snippet, I'm emitting each sentence over here, right? A Stream is an unbounded sequence of tuples,
so each sentence is then part of the Stream. So each message in the Stream is a message coming from the Spout. Now, these Streams then feed into Bolts. So, okay, there is a fine gray line which you cannot see. So basically, a Bolt can have n inputs
and n number of outputs. So when we talk about inputs, Treat Stream as an input to a Bolt, okay? So a Bolt can take an input from a Spout, a Bolt can take an input from another Spout as well. A Bolt's output can be input to another Bolt, a Bolt's output can be a persistent store or anything else, or a Bolt might not just emit anything.
It might be like a dead end for your topology, okay? How we knit all these together is what we call a topology. So in a topology, you will have multiple Spouts, and you will have multiple Bolts doing a lot of stuff. So as I mentioned, a Spout is just an interface for the outer world to get data into your Storm topology, right?
Bolts are essentially your processing layer, where you write your logic, your filtering, your aggregation logic and all that stuff. So a Bolt is essentially that. The whole architecture, when we knit it together, we call a topology. So to your Storm cluster, you always submit a topology, right? Now, the flexibility is such that your topology can be a mixture of technologies, all right?
So you might write a Spout in Java and a lot of Bolts in Python, and that really helps you a lot because, for example, in my environment, we are a mixture of Java and Python devs, right? So I would obviously not rewrite something that is already written in Java, and those guys might not rewrite what has already been written in Python.
So defining a topology helps me do that. How do we define this topology? Now, there are multiple ways to do that, because there are multiple libraries in Python that help you connect with Storm. This is an example from the streamparse library that I'm going to showcase throughout the talk today. So this is actually a Clojure DSL that helps me define my Python Spouts and my Bolts.
So what is happening over here is I have defined that I have a word Spout. So this is actually a Bolt which is taking input from word Spout, okay? We'll get into the meaning of what Shuffle is. And I've told this Bolt that the program that you need to run when you receive an input is the Sentence Splitter.
So what it's doing is it's receiving sentences and it's splitting the sentences into words. The next Bolt receives the input from Sentence Splitter, which essentially, it's getting the words from there. And over here, we've asked it to group by words. We'll come to groupings, what groupings mean. And then the word count saver is essentially doing nothing.
It's taking inputs from the previous Bolt, which is the word count Bolt, and then processing it. As I mentioned, it's a dead end. It's the last Bolt in my topology. It need not emit anything. You can make it emit anything or you need not make it emit anything, right? So this is what's happening in a basic topology. So what do we mean by groupings now? So groupings will help you tackle challenges
like you want to group by a particular user and then process its data across one worker itself. Because it's a distributed system, if your data gets distributed, you might have problem doing aggregates, right? So take a case of simple word count example. Now, your spout is emitting a lot of words and your Bolt is processing them. And at the end of it, you want counts by words, right?
Now, one way to do that is to have a persistence layer. You're dumping everything over there, using, say, a Redis INCR mechanism. So you're pushing words over there and the INCR mechanism takes care of the counts. In terms of Bolts, what I'll do is group them by word on the Bolt itself, which makes sure that the same word always goes to the same task.
And there, I'll be maintaining aggregates for each word. The other option is shuffle. Shuffle makes sure that each Bolt task receives the same number of tuples, but they are shuffled across tasks; they are not grouped by any particular key. And there are a lot of other grouping mechanisms as well.
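The difference a fields grouping makes can be sketched with a stable hash. This illustrates the routing idea only; it is not Storm's actual implementation:

```python
import zlib
from collections import defaultdict

NUM_TASKS = 4

def fields_grouping(word, num_tasks=NUM_TASKS):
    # Stable hash: the same word always routes to the same task,
    # so a bolt task can keep per-word aggregates locally.
    return zlib.crc32(word.encode("utf-8")) % num_tasks

# Route a small stream of words and count per task.
per_task_counts = defaultdict(lambda: defaultdict(int))
for w in "the quick fox the lazy dog the quick".split():
    per_task_counts[fields_grouping(w)][w] += 1

# Every occurrence of "the" landed on exactly one task:
tasks_with_the = [t for t, d in per_task_counts.items() if "the" in d]
print(tasks_with_the)
```

A shuffle grouping, by contrast, would spread the same words evenly across all tasks regardless of their value, which balances load but makes local aggregation impossible.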
So the documentation has the complete list, with good examples as well. Coming back to the physical view. In the physical view, and I'm sorry about the size of the diagram, on the extreme left you see Nimbus. If you're coming from the Hadoop world, Nimbus is just like the JobTracker. What you do with Nimbus is submit jobs to it. Nimbus makes sure that these topologies
are then submitted to the workers, all right? So Nimbus will take care of managing and monitoring your topologies around the cluster. When you submit a new topology, it takes care of the deployment. When a task is assigned, or in case of failure of a task, it takes care of the reassignment. Now in the second slide, I had mentioned fault tolerance. Now here comes the first part.
If my Nimbus goes down, my topologies will still keep running. What will get affected is the new topologies, we will not be able to submit the new topologies. But the topologies already running will not get affected by that. So even if your Nimbus server is down for say two hours or three hours, your topologies will still run intact, right?
ZooKeeper is the coordinator between the supervisors and Nimbus. Supervisors are the actual services running on the worker nodes that take care of running the topologies. Nimbus and the supervisors communicate via ZooKeeper. ZooKeeper is simply a cluster coordination tool that keeps track of cluster state so that Nimbus and the supervisors can stay in sync.
Again, if one of my workers goes down: a supervisor has a few parts, like workers, which have tasks and executors. So if a worker goes down, the supervisor will make sure that another worker is spawned. So fault tolerance is maintained at each and every layer, right? Coming to the main part:
how do Storm and Python fit together? First is streamparse, the library I'm going to talk about today. It's a library that was recently open sourced by Parse.ly. We're going to talk about streamparse in depth today. streamparse lets you write your topology, your spouts and bolts, in Python, but you have to write
the final definition of the topology using a Clojure DSL. The advantage of that is you can then mix multiple languages, for example Java spouts and Python bolts. So that is why I prefer writing that in the Clojure DSL. But if you want to write everything in pure Python, there's a library known as Petrel, which was open sourced by AirSage last year. It helps you write complete topologies,
spouts, bolts, everything, in Python. Now, there's another talk on the 25th, which I believe is going to cover integration using Jython. Jim is the guy who's going to be talking about that. He has a library called Clamp that basically takes care of mixing your Java and your Python stuff. That's an interesting library too, so I think you should visit that talk as well.
And the native Java project of Storm itself lets you write topologies. But it will not let you define a topology in Python; you have to write your topology in Java. You can write bolts in Python. There are a few implementations where people have written spouts in Python, but that hasn't been very successful.
So that's the last option I would say you can use. Why do I love streamparse? This is basically the architecture diagram of streamparse. If the colors are visible to you, all the gray parts are the Storm cluster. Now, what streamparse lets me do is create an environment around my whole Storm cluster.
So I am on my dev machine and I'm creating a Storm topology. I'm defining bolts, I'm defining spouts. What I use for that is sparse. sparse is the command-line utility that comes with streamparse. So when I run sparse quickstart for a particular project, it gives me this beautiful project layout.
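Pieced together from his description, that quickstart layout looks roughly like this (the file names are illustrative; the exact layout depends on the streamparse version):

```text
wordcount/
├── config.json          # which hosts are workers, where to deploy
├── project.clj          # JVM-side (Storm) dependencies
├── src/
│   ├── words.py         # the spout
│   └── wordcount.py     # the bolt
├── topologies/
│   └── wordcount.clj    # topology definition in the Clojure DSL
└── virtualenvs/
    └── wordcount.txt    # pip requirements installed on every worker
```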
Now, each folder and each file has a significance for sparse. What it lets me do is configure all my workers. Why I need the worker configuration is that when you submit a Storm topology via sparse, via streamparse, it does an SSH to each of your workers and deploys your Python environment over there. So for example, if I'm using something like
a WebSocket client or a Kafka client, that's an external dependency that does not come with native Python. streamparse will make sure that it first goes onto the worker node and creates a virtual environment over there, where it will install all my dependencies. I define my dependencies in one of the files, where you can say these are all my dependencies. So it's going to be in virtual environments,
and you create a requirements.txt file over there. The topologies folder is where you define your Clojure DSL. So under topologies slash whatever name you want for your topology, you define your topology. This is an example of a topology; it's a wordcount.clj file. What it does is say that I have a spout, which is words.py.
I have a bolt, which is wordcount.py. It's as simple as that. What you see in the bottom row is the p 2, which essentially defines the parallelism that your bolt runs with. Enough of talking. I have built a very small application around this, which I would like to showcase. The application is built around Wikipedia edit trends.
Whenever somebody edits an article on Wikipedia, they release the edit log for that, so it's real time. There are a lot of tools available to capture that feed from Wikipedia, so I've just taken one of those tools. It gives me a nice JSON message, which looks like this. Basically, we're going to trend three metrics out of it.
First is going to be the action. When you edit an article on Wikipedia, you can have multiple actions: edit, delete, create, update, et cetera. The message tells you whether the article was edited by a bot or not, so, is bot false or is bot true, and whether the user was logged in while making the changes. In case the user is logged in, you have a username; if not, you get an IP over here.
So we'll trend three metrics over here. One is going to be action-wise trends: how many deletes, how many edits, how many updates. Then it's going to be humans versus bots, as a bar chart. And last is going to be logged-in users versus anonymous users, all right?
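The parse bolt's job, pulling those three metric dimensions out of one edit-log message, can be sketched in plain Python with the json module. The field names in the sample message are assumptions, since the real Wikipedia feed's schema isn't shown in the transcript:

```python
import json

def parse_edit(message):
    """Toy version of the parse bolt: extract the three trended
    dimensions from one edit-log JSON message."""
    rec = json.loads(message)
    action = rec.get("action", "unknown")            # edit / delete / create ...
    actor = "bot" if rec.get("is_bot") else "human"  # bots vs. humans
    auth = "logged_in" if rec.get("user") else "anonymous"
    return [action, actor, auth]

# Hypothetical message; the real feed's field names may differ.
msg = json.dumps({"action": "edit", "is_bot": False, "user": "alice"})
print(parse_edit(msg))  # ['edit', 'human', 'logged_in']
```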
Now, I'm not parsing the real-time logs, because I was not sure about the internet connectivity. So I've taken a subset of logs from the Wikipedia feed and dumped it onto the Kafka layer, and I'm going to use Kafka for that. After this, we're going to come to what Kafka is all about, all right? So what I did was, I just ran sparse quickstart
with the name wikipedia_edit_logs_trends. It gave me that beautiful project layout that I was talking about; all the files are there. Now, what I do is define a very simple Kafka spout. I mean, there's nothing Kafka-specific about it; it's simple pub-sub that I'm using here. What it's doing is reading a particular topic, getting all the log messages, and emitting each message, right?
So when we say message over here, it's essentially a JSON log line that's getting generated. Now, this bolt basically parses my JSON, okay? What it does is check what kind of action there is, check whether the editor is a bot or a human, and check whether the user is logged in or not. And streamparse lets me emit in batches.
So I use emit_many, and the list of words is a list of lists. So basically, one output of this would be human, G2 underscore edit, and logged in or non-logged in. Now, I'm appending keys like G2 and G3 because I'm not that good with JavaScript,
and that helps me out on the web interface part of it. So please bear with me on that. So this is how I count it. Now, what I've done over here is I have a WebSocket server running. When I update a count, I dump the whole dictionary onto my WebSocket, and that lets me plot my D3 graphs on the web interface.
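The counting step he describes, keeping running tallies and dumping the whole dict for the D3 charts, reduces to something like this. It's a toy sketch; the real bolt pushes the dict to a WebSocket instead of returning it:

```python
from collections import Counter

class CountBolt:
    """Toy counter bolt: tallies each incoming key; the demo dumps
    the full counts dict to a WebSocket after every update."""

    def __init__(self):
        self.counts = Counter()

    def process(self, tup):
        self.counts[tup[0]] += 1
        return dict(self.counts)  # snapshot sent to the web UI

bolt = CountBolt()
for key in ["edit", "edit", "delete", "human", "bot", "human"]:
    snapshot = bolt.process([key])
print(snapshot)  # {'edit': 2, 'delete': 1, 'human': 2, 'bot': 1}
```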
So that's the simple final word-counter bolt that I have for this particular example. This is the DSL. Over here, I'm saying: use the Kafka spout, then forward the output of the Kafka spout to the parse-JSON bolt. And once the parse-JSON bolt has done its task, pass its output to the count bolt, all right? So this is what it looks like after parsing.
So this is a real-time parsing that is going on. So this is the final output on my WebSocket server that is coming in. And these are the graphs that I was talking about.
So the first one is create, edit, and I think the last one is delete. Then you have bots versus humans. And finally, you have logged-in versus non-logged-in users. So the whole point of this demo is that
it's pretty easy to get started with Storm. It's very easy to write your spouts and bolts, and in the end you have a real-time application running. And it does not take much of your time to manage the cluster, either. One of the headaches that comes with systems like Hadoop is that you have to do a lot of cluster management. But with systems like Storm,
you don't really have to do a lot of cluster management. And we know that once a topology is running, it will never stop; the only way it stops is if I manually kill it. Otherwise, it will always be running. So as and when the logs come in, they will get parsed. And that, at least for me, is the meaning of real-time data pipelines.
So this is how we run it. When you do a sparse run, it starts a local instance of a Storm cluster to test your topology right there on your system. Once you've tested it and you're sure it works fine for you, you can do a sparse submit and name your topology. All topologies get submitted only to the Nimbus; Nimbus takes care of shipping your code to the workers.
But in the case of sparse, it will first SSH to each of your worker nodes, because we need to create a virtual environment for Python that takes care of all your dependencies. So that's an extra step that sparse does. Otherwise, it's the same whether you submit a Java topology or a Python topology, because it gets shipped to your Nimbus via Thrift.
Coming to Kafka: why do we need systems like Kafka? I mean, we've already got so many messaging layers, lots of databases, lots of caching solutions. So where does Kafka fit in, and what is Kafka? For me, Kafka is a high-throughput, distributed, persistent messaging system: it takes care of pub-sub,
of high throughput, and of storing my data in a distributed way. Now, among the challenges I mentioned earlier, the first point was that I need a robust messaging layer that holds on to messages until they are consumed. And even once they are consumed, I still want to retain those messages, because there might be other consumers
that want to consume them. When we're talking about a distributed processing layer, there might be any number of consumers that want to consume the same message. Consider a topic, for example a Wikipedia edit trends topic. One consumer is reading that topic, and there might be another consumer that is dumping all those log files to my Hadoop layer. So what you need is multiple consumption of the same messages.
For failover and high availability of your data, you don't want all your messages to sit on one box; you want your data partitioned across multiple nodes. Kafka lets you do all of that. Along with that, it lets you maintain high throughput, so you can have a high number of writes and a high number of reads. It can do this because Kafka is basically
a filesystem-oriented queue: it persists everything to disk, by design. It's not an add-on feature that is disabled by default and that you switch on to persist messages to the hard disk. Kafka stores everything on disk and lets you retrieve it via the partitions
and the various brokers that you have. These are a few important concepts in Kafka; if you're getting into Kafka, these are the basics you need to understand. A cluster is basically a set of brokers. You can have a single node with a single broker, a single node with multiple brokers, or multiple nodes with multiple brokers; it totally depends on how you want
to configure your Kafka cluster, and Kafka lets you do that. Even if you are in a multi-data-center environment, Kafka lets you replicate your data across data centers as well, something which is known as Kafka mirroring. Topics: everybody coming from a message-queue background knows that a topic is just a layer that lets you group your messages together. Now, the advantage of Kafka is that I can have multiple consumers
reading the same topic, and each consumer can start from a different point and read to a different point. It's not the case that once one consumer has read N messages, every other consumer has to start reading from message N plus one. That is what offsets are in Kafka. Along with ZooKeeper, you can maintain your offsets,
recording that this topic has been read up to this point. It's simply like the tail -f that you do on a log file. Partitions are used for replication, and you can partition your topics across multiple nodes. So I basically treat Kafka as a rethinking of how logs work.
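The tail -f analogy can be sketched in plain Python: a partition is just an append-only list, and each consumer keeps nothing but its own integer offset into it. This is a conceptual sketch of the offset idea, not the kafka-python API, and the class names are made up for illustration:

```python
class Partition:
    """An append-only log, like a single Kafka partition."""
    def __init__(self):
        self.log = []

    def append(self, msg):
        self.log.append(msg)

class Consumer:
    """Tracks its own offset; reading never removes messages from the log."""
    def __init__(self, partition, offset=0):
        self.partition = partition
        self.offset = offset  # in Kafka this would be kept in ZooKeeper

    def poll(self):
        # read everything from our offset to the end, then advance
        msgs = self.partition.log[self.offset:]
        self.offset = len(self.partition.log)
        return msgs

p = Partition()
for m in ["edit-1", "edit-2", "edit-3"]:
    p.append(m)

realtime = Consumer(p)            # e.g. the Storm topology, reads from the start
archiver = Consumer(p, offset=2)  # e.g. a Hadoop dump job that started later

print(realtime.poll())  # ['edit-1', 'edit-2', 'edit-3']
print(archiver.poll())  # ['edit-3']
```

Because consuming only moves a consumer's own offset, both consumers read the same log independently, which is exactly the multiple-consumption property described above.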
So in your environment, don't treat logs as just logs; treat it as a log-centric environment, where your logs get used by multiple services. For example, you're pumping logs: one application processes them for real-time analysis, and another application processes them for, say, daily analysis and persists them to Hadoop. It's the same log, but it's getting used
at a lot of different layers. How can we use Kafka with Python? There are currently two implementations. One is kafka-python; that was the first module in the community. The other one is samsa, open-sourced by the guys at Parse.ly.
I'm not sure which Kafka version samsa works with, but kafka-python works with the latest version of Kafka as well. I'm not very sure about samsa; I haven't tried it out. Kafka plus Storm really makes a near-real-time data pipeline a very robust and fault-tolerant mechanism, so if you're looking to implement one, you should give it a try. Both of the systems work independently,
so it's not mandatory to use Kafka with Storm or Storm with Kafka; both of them work pretty much independently, and you are free to use either one on its own. There are a lot of advanced features in Storm as well: for example, DRPC topologies, transactional topologies, and Trident topologies. Now, when you're writing a near-real-time
data processing pipeline, you have to make sure that each of your messages gets processed; otherwise, at the end of it, your counts will start to differ. And when we say each message gets processed once, we tend to forget that this is not necessarily exactly once. There are two guarantees here: I can make sure my messages get processed
at least once, or exactly once. Storm lets you handle both. The basic Storm topology that we saw only lets you achieve the first part, at least once, because it comes with a reliability API. What the reliability API does is let you acknowledge whether or not you have processed a tuple.
If a tuple has not been processed at some layer of the topology, Storm will replay that particular tuple. Exactly once is something you achieve with transactional topologies and Trident topologies, where I want each tuple to be processed exactly once. That's where transactional topologies come in.
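The at-least-once behavior can be sketched like this: the spout keeps every emitted tuple "pending" until it is acked, and puts it back on the queue when it is failed. This is a conceptual simulation of the replay idea, not Storm's actual implementation or API:

```python
class ReplayingSpout:
    """Re-emits tuples that were failed, giving at-least-once delivery."""
    def __init__(self, tuples):
        # give each tuple a message id, as Storm does when anchoring
        self.queue = [(i, t) for i, t in enumerate(tuples)]
        self.pending = {}  # message id -> tuple, awaiting ack

    def next_tuple(self):
        if not self.queue:
            return None
        msg_id, tup = self.queue.pop(0)
        self.pending[msg_id] = tup
        return msg_id, tup

    def ack(self, msg_id):
        self.pending.pop(msg_id, None)  # fully processed, forget it

    def fail(self, msg_id):
        # replay: put the tuple back on the queue for another attempt
        self.queue.append((msg_id, self.pending.pop(msg_id)))

spout = ReplayingSpout(["a", "b"])
processed = []
failed_once = set()

while True:
    nxt = spout.next_tuple()
    if nxt is None:
        break
    msg_id, tup = nxt
    if tup == "b" and msg_id not in failed_once:
        failed_once.add(msg_id)
        spout.fail(msg_id)   # simulate the bolt raising an exception
    else:
        processed.append(tup)
        spout.ack(msg_id)

print(processed)  # ['a', 'b'] -- 'b' was replayed after the failure
```

Note that 'b' was attempted twice, which is exactly why this is "at least once" and not "exactly once": removing duplicates is what the transactional and Trident abstractions are for.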
DRPC is supported by the multi-lang API, but in the current implementation of the multi-lang API, Trident topologies are not supported. They probably have that on the roadmap, but I'm not sure when they will release it for the multi-lang API. These are the resources that I usually follow, and they're pretty good. The first one is the official GitHub repo for the streamparse library.
They're currently working on it extensively, and they have a few more releases ahead. But even the current version lets you manage your whole Storm cluster pretty well. For example, you can ship the logs from your various workers and see where your bolts are failing. You can also submit your topologies onto your Storm cluster, as we just saw.
Then you have the Kafka and Python libraries. The official documentation for both of these systems is pretty good, so it's a must-read for all of you. And if you want to understand how Storm parallelism works, there's a blog by Michael Noll; it's one of the best posts for understanding how parallelism works in Storm. Just like a topology,
we could continue this discussion forever, but I'll stop here and open it up for questions. And just before I forget, I still haven't shown you the Storm UI that comes with it.
So that's the Storm UI that helps you keep track of how your cluster is behaving. So I submitted a topology; it was Wikipedia_trends. It's been running since... not one minute. Let me refresh that.
It tells you the uptime of your topology and how its parts are behaving on the cluster. Yeah, so it's been up for 50 minutes. If you click on it, you see the different spouts and bolts that you have in the topology. I had one spout and two bolts,
count and split, and it shows me how many messages are being emitted and so on. Why is there a difference between emitted and transferred? Because one of my bolts does not emit messages; it's a simple WebSocket push. Right, thank you.
In earlier versions of Storm, if you wanted to edit a topology, you had to take it down and bring it back up. Is that still the case? Yes, it's still the case. So the question is: in earlier versions of Storm, to update a topology,
we had to restart it. Yes, you still have to do that. There are multiple design challenges that don't allow it. It depends on what kind of change you're making; for example, if you're just changing parallelism counts, it matters how your tasks will then get redistributed, right? So you have to bring the topology down.
There is a feature request for this known as swap, I think, so you would do a storm swap. That would take care of this part: you define your new parallelism on the fly, you just do a storm swap, and it rebalances your whole cluster according to the new parallelism you've defined. But I was reading an interesting thread
on the mailing list itself that presented the other side of it: why it is very difficult to build systems that you can change on the fly and have them keep behaving the same way. Because your counts, your aggregates, and how your messages are grouped across different workers really get affected by that. So I don't think that's coming in one of the near-future releases.
You mentioned the reliability guarantees. So let's say I've written bad Python code, and it raises exceptions every now and then. What happens to that message pipeline? So the question is around the reliability API.
Supposing we've written a topology in Python and one of the components starts failing, what happens to the whole data pipeline, right? Now, there are two parts to it. First of all, I can probably go back to that code snippet.
So this is how you define acknowledgements and failures in your system. A failure might be because a component failed, or because your business logic failed. In either case, I can explicitly fail my tuple,
and it will then get replayed. And yes, if one of my workers goes down, the supervisor will make sure to restart the workers. If, for example, I'm using a module that is not available on one of my workers, streamparse will not even let me submit that topology. That is how that is taken care of. As for each message failing individually,
the reliability API explicitly lets you acknowledge or fail individual messages, whether for business-logic reasons or for any other scenario you have.
The question is how we can scale by adding more machines and increase the throughput. The supervisor is what runs your workers, right? So all you need to do is add more machines and start running a supervisor on each of them. It will communicate with the Nimbus. And then, how will you scale your topology? Yes, you have to... that's the second part that's coming.
You have to restart, and it will not take care of dynamic loads. For example, if you know that your data load is going to increase, you might want your Storm cluster to scale automatically; Storm does not come with that facility. But it's very easy to scale, because all you have to do is start a new worker node, and all your tasks will then get redistributed.
Yes, so essentially, if you do a ps -ef on your system and grep for JVMs,
you will see different JVMs running on it. Essentially, each supervisor runs a set of workers, and each topology has a set of workers, which in turn have executors and threads. Each one of them is a different process. That is how you get fault tolerance across multiple systems: if one of your components fails, the other components will still run for you.
So that is how it happens inside a topology.
So the question is how Twitter Storm compares with the different frameworks that are available. Samza is one of the products open-sourced by LinkedIn; LinkedIn uses Samza a lot for its near-real-time processing. Then there's another product, open-sourced by Yahoo, which is S4. I'm not really sure about those products; I've never used them.
But yes, there has to be some design philosophy behind each of them; that's why these different systems exist. There is one used by Facebook; I'm not sure if that's open source or not. But I've never really given the others a shot, because for me the components and features that Twitter Storm lists are pretty strong, and the multi-lang API is pretty strong as well.
So those are my reasons for using Apache Storm. Sorry, I can't hear you. Sure, so the question is how exactly-once
and at-least-once processing work. So there's something known as guaranteed processing, where all your messages flow through your topology; once that is done, you're processing them at least once. Exactly once is what I was talking about in the advanced features of Storm: you need to write transactional topologies, which are an abstraction over your basic topologies
that lets you achieve exactly-once processing. For at-least-once, all your tuples flow through the topology; that is what Storm takes care of via its communication mechanisms. And yes, there are internals of Storm where it communicates via ZeroMQ and other mechanisms. I have a few architecture diagrams for that.
I'll be happy to share them after the talk. They explain how each message flows through the different workers: for example, from one bolt to another, from spout to bolt, and what happens when we explicitly fail a tuple, and so on. So I'll be happy to share that with you.