
Processing Streaming Data at a Large Scale with Kafka


Formal Metadata

Title
Processing Streaming Data at a Large Scale with Kafka
Title of Series
Part Number
32
Number of Parts
86
Author
License
CC Attribution - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.
Identifiers
Publisher
Release Date
Language

Content Metadata

Subject Area
Genre
Abstract
Using a standard Rails stack is great, but when you want to process streams of data at a large scale you'll hit the stack's limitations. What if you want to build an analytics system on a global scale and want to stay within the Ruby world you know and love? In this talk we'll see how we can leverage Kafka to build and painlessly scale an analytics pipeline. We'll talk about Kafka's unique properties that make this possible, and we'll go through a full demo application step by step. At the end of the talk you'll have a good idea of when and how to get started with Kafka yourself.
Transcript: English (auto-generated)
So I'm going to talk about processing streaming data with Kafka, but I'll tell you a little bit about myself first.
So, my name is Thijs, I work at AppSignal. So my name is pronounced like this. So this is like the tutorial for how to do that. I'm from Amsterdam, Netherlands, and actually today is the biggest holiday of the year, so Amsterdam currently looks like this,
like all over the city, and I'm skipping this party and I'm excited to be here with you today, so you're welcome. So we do a monitoring product for Ruby and Elixir apps. And as always with these kinds of products,
you start with the question, how hard can this be? And you assume it's not really going to be hard, and then of course it always actually is. So it turns out that if you do a product like that, you need to process a lot of streaming data. So how our product basically works is that we have an agent
that people install on the server, it's via Ruby gem, and it's running on all these customers' machines and they're posting data to us regularly with errors that happened and stuff that was slow, and then we process that and kind of merge it all together to make a UI out of that and do alerting.
And that's streaming data. So streaming data is usually defined like this: it's generated continuously on a regular interval, and it comes from multiple data sources which are all posting it simultaneously, and there are some classical problems associated with this. So one obvious one is database locking. If you do a lot of small updates, then the database is going to be locked a big part of the time and everything will become really slow. You kind of have to load balance this stuff around and ideally make sure that stuff ends up at the same worker servers
so you can do some smarter stuff with it. So let's look at a really simple streaming data challenge that we will use for the rest of the talk as a use case. So we've got a pretty popular website. It has visitors from all over the world. It also has servers all over the world
which are handling traffic for these visitors. And we want to do some processing on the logs basically. So we have a big stream of log lines coming in which have like the visitor's IP address and the URL they visited, you know the standard stuff.
And we actually want to turn it into this graph. So this is a graph of like the total amount of visits we had from four different countries. Is this actually hard to do? And the answer is that on a small scale it's not.
It's actually quite easy. So the simple approach is to just update a database like for every single log line. So that looks a little bit like this. So basically you just do an update query. There's a countries table. It has a country code and a count field.
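As a concrete sketch of that naive approach (the table, model, and column names here are just illustrative, not the speaker's exact schema), every incoming log line would trigger something like:

    # Hypothetical naive version: one UPDATE per incoming log line.
    # Assumes a countries table with country_code and visit_count columns.
    Country.where(country_code: "NL")
           .update_all("visit_count = visit_count + 1")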
And you just update the thing every single time you get a visitor from that certain country. But the issue is that a database has to make sure that data integrity is kept. It doesn't actually understand that these streams just keep flowing in continuously and that a log line never has to go back in time and update something. The database has to take into account that data that already exists could be updated again, so it has to take a lock around a row. And if you do this at a really high scale
then the whole database will just lock down and there will be no time left to actually do updates. So we ran into this a number of times during our existence at AppSignal. So one thing you can do next is sharding the data. So you basically just put all the Dutch visitors
in database number one. You put the US ones in number two. And you can kind of just scale it out by grouping the data on some kind of axis and putting it in different database clusters. But this has some downsides. So what happens is that if you want to query this data
and you want to get an overview of everything that's in there, you might have to end up querying different database clusters and manually merging all this stuff which can get really slow and complicated. And a classical one as well is changing the sharding. So if we now decide that we actually wanted counts
per browser that people use all along, then we have to completely change this around and write a big migration script, and it's going to be really complicated. At AppSignal we actually do a lot more than just increment a counter. So we have to do a lot of processing on this data
as it comes in. So we not only have a bottleneck in the database itself, but we also have a bottleneck in the processing that comes before the database. So we sort of started doing this at some point. So we would have a worker server and a customer's traffic would come to one of these servers and be flushed to a database cluster.
There's a really big issue with this which is of course that a worker server could die and then this customer is going to be really unhappy because they just had a gap in their reporting for maybe 15 minutes. So what you can then do is just put a load balancer
in front of it and all the traffic will be randomly distributed to all these worker nodes. And this works as well, but then the worker still doesn't get all of a customer's data, so it has to do some really smart optimizations to cope. Yeah, so the data is fragmented and we cannot really do the stuff we want to do. But actually our life would be really awesome if this were true: we get all the data from one customer on the same worker. Because then you can start doing some pretty smart stuff. So a really simple, smart thing you can do is this. So basically, the standard way to do it
is just incrementing the counter every single time a log line arrives in our system. But if we had all the data locally, we could just cache that for a little bit. And we just have this single update query that we could run maybe every 10 seconds and that would totally decrease the load on the database.
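To make that batching idea concrete, a sketch might look like this (reusing the illustrative Country model and column names from above, not the real worker code):

    # Hypothetical batched version: the worker caches a per-country count
    # in memory instead of updating the database per log line...
    cached_us_visits = 1234 # incremented for every incoming US log line
    # ...and flushes it with a single query every ten seconds or so:
    Country.where(country_code: "US")
           .update_all(["visit_count = visit_count + ?", cached_us_visits])

One query per flush interval instead of one per log line, which is what takes the pressure off the database.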
So this is kind of what we want to do. We want to be able to batch the streams and do some caching and do some local calculations and then just write it out to a standard database. And we actually need this to do the statistical tricks we want to do.
So if you want to do proper percentiles or histograms, you're kind of forced to have all your data on one computer at some point because otherwise you cannot do the calculation. So we're back here. So we want to write all the customer's streams to a single worker which gets written to the database.
And actually if we can do this batching, then we don't really need the sharding anymore so we could maybe get away with just having a single database. But of course, we're back where we started at the beginning of the talk because we now still have a single point of failure.
This thing can fail and the customer will be really unhappy. So we need something special. And that will be Kafka for us. So we looked at a lot of different systems and Kafka has some unique properties that allow you to do cool stuff with this. So Kafka actually makes it possible to load balance this stuff
and do the routing and do the failover properly. And I'm saying makes it possible, like not makes it easy. It's still like pretty hard but at least it's possible which is better than impossible in my book. So I will now try to explain Kafka to you. And there's actually a sort of complicated thing about it
which is that there are four different concepts that you all kind of need to get, and also how they relate to each other, to be able to understand the whole thing. So it's actually a bit of a hard thing to wrap your head around if you're not used to it. So bear with me and I'll try to make it clear to you.
So these are the four main concepts. You can, oh, the beamer is really bad. But they will show up. So the first thing is a topic. So a topic is kind of like a database table. It's just a grouping of stuff. So a topic will contain a stream of data
which can be anything. It could be a log line or some kind of JSON object or whatever. This could all be in a topic. And all these messages which are in the topic, they're in different partitions. So a topic is partitioned in say 16 pieces
and a message will always end up on one of these partitions. And the interesting thing is that you can choose how to partition the data: messages that have the same key will always end up in the same partition. So we could therefore group all our US visitors together if we want to. We'll look at how this works in a little bit.
Broker is the Kafka word for a server basically. I'm not sure why they picked a different word, but they did. So a broker stores all this data and makes sure that it can be delivered to the final concept which is a consumer.
And consumer is the Kafka word for basically just a client or a database driver. It's just a way, something you can use from your code to read these messages that are on a topic. And Kafka kind of likes to invent its own words for some reason. So a lot of these things already have a name but they also have a Kafka name
which can be a little confusing. So these are the four concepts. I will now go into them in more detail. So this is what a topic looks like. This specific topic has three partitions and it has a stream of messages coming in which all have an offset. So the offset is that number you see there
which starts at zero and it just automatically increments up. So new data is coming in at the right side of this and all the data is going out at the left side and you can configure how long you want this data to stay around. So usually this would stay around for a few days and then after say three days
the data at the left side would just be cleaned up. It would just fall off the retention. So new messages are coming in from the right side. So if we group these messages by country as we do here then they will actually
always end up on the same partition and that's a really important thing because that will become apparent when we discuss the consumer. So next up is the broker. So a broker is the Kafka server and the partitions and the messages live on these servers and a broker is always the primary for some partitions
and secondary for others, and that looks like this. So say we have three brokers. Broker one will get one to three as primary, broker two will get four to six, and broker three will get seven to nine, and actually all these brokers will be secondary for another broker's primary partitions. So that means that if one of the brokers dies, you can actually redistribute all the data and it will all still be there. So in this case broker three died and broker one and broker two both got some extra partitions.
So if you still have enough capacity in your system after this failure, then the whole thing will still be working. Nothing will actually be broken. It might be the case that you have a little bit less extra capacity than needed, and then the whole thing might slow down, but if you plan it properly
then this is still fully working. The other thing, this also works the other way around. So if you go from three to six servers because you've got a big new customer and you need more capacity, it will also just automatically spread out these partitions over these brokers without you really having to do any work for it.
So the fourth and final concept I will now tell you about more in detail is the consumer. So the consumer is this Kafka client. It's basically comparable to a database driver or a Redis client. It lets you listen to a topic.
And one of the great things about Kafka is that you can have multiple consumers which all keep track of their own offset. So in this case, we have two consumers. Like one is responsible for sending out Slack notifications and the other one is responsible for sending out notifications via email.
So they both start at the beginning at offset zero. But then it actually turns out that Slack is down at the moment. So we cannot reach our API. So in this case the Slack consumer is still stalled at offset zero. It's just waiting there because it cannot continue. But the email notifications are actually going just fine.
They don't have any issue at all. And then if Slack comes back up, the Slack consumer will actually make some progress. And finally they will be at the end of the queue waiting for more messages to come in. So this is pretty neat if you integrate a lot of external systems because you can make sure
that one outage at a certain vendor is not going to impact all the other integrations here. So this example only has a single partition. So obviously you're probably going to have more partitions. So how about that?
And Kafka has a thing for that as well which is a consumer group. So the consumer can be in a group. So you give it a name and Kafka will understand that different consumers running on different servers with the same name are related to each other and it will assign partitions to them.
This actually looks a lot like how the broker works. So if you have a topic with nine partitions and three consumers, all three consumers will get one third of the partitions. And if one dies, the same thing happens. So these consumers get assigned the partitions from the broken consumer and everything
will just keep working. So a consumer always gets a full partition and since you can control to which partitions your data go, this allows you to do this routing thing I talked about earlier where you make sure that all the customer's data ends up on the same server.
So then we end up with this situation. So we actually have a very similar situation to the one we started out with where we have a few worker servers and one of them dies. But actually in this case, the customer's not going to be unhappy because the Kafka cluster will detect that the consumer is down and it will reassign a partition to a different worker.
This will happen within a matter of, say, a minute. Nobody really notices that something failed because it's just rerouted to something that is actually still working. So now we're getting to the interesting part
like seeing how you can actually use this from Ruby. Is this clear to everybody so far? Yeah, so the question is about the relationship between the brokers and the consumers. There is basically not really a direct relationship.
So the brokers and the partitions both use this concept, sorry, the brokers and the consumers both use this concept of a partition in a very similar way. But actually to the consumer, it's not really relevant where the data is stored. It just knows that the Kafka broker will just tell it where to fetch data from. So from the consumer side, that's just totally transparent.
Yeah, so we're actually going to build this analytic system I just showed you earlier. So we've got the access logs right here. So they're still the same logs. They have an IP address and we can see the URL.
And we wanna end up with this table. So it's a really simple table. We'll just keep track of how many visitors we get from the US. It doesn't even take time series into account, it's just a total count for all visitors, like the simplest thing you can do.
And we use two Kafka topics and three rake tasks to make this happen. And our end goal is to just update data in a really simple ActiveRecord model. So the model looks like this. So the model has a country code and a visit count field.
And it takes in a hash of country counts. So we will loop through the hash and try to fetch the country by code. If it doesn't exist, it will get created. And then we increment the visit count by the total count that was in the hash.
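A minimal sketch of what that model might look like (the class, table, and column names are assumptions based on the description, not the exact code from the slides):

    # app/models/country.rb -- hypothetical sketch.
    # Assumes a countries table with country_code (string) and
    # visit_count (integer, default 0) columns.
    class Country < ApplicationRecord
      def self.increment_counts(country_counts)
        country_counts.each do |code, count|
          country = find_or_create_by(country_code: code)
          country.increment!(:visit_count, count)
        end
      end
    end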
So this is like really standard usage of ActiveRecord, nothing fancy going on here. And this is the architecture of the whole thing. So we have the three rake tasks on the left side, two Kafka topics on the right side, and then there's a model at the bottom.
So first we will import the access logs. These will be written out to a topic. Then there's some pre-processing going on. Finally we're going to aggregate them and write it all out to the database. And you might wonder why do you need the pre-processing step?
Because you could basically also just write it out to the database straight away. And the reason for this is that often the data isn't spread out evenly. So if you look at the bars in this example, actually most of our visitors are from the United States.
So if we would immediately route all this traffic to a single partition, like one worker server will have six times as much work to do as another one. And if you need to do some CPU intensive stuff, that one worker server might have like a huge load while the other ones are almost doing nothing.
And that ends up being really costly. And sometimes you cannot even fix it. You really have to do something else. And this is why we're doing some part of the work before we actually route the data. I'll get to that in a bit. So step one is importing these logs.
And this is kind of cheating a little bit, because in reality, this isn't really streaming data because I downloaded a bunch of log files from somewhere and put them on my laptop. What this code does is it loops through all these log files and just writes them out
as messages to Kafka one by one. But in reality, all this stuff will be streaming in from all kinds of web servers. So on line six you'll see the kafka.deliver_message call. So this tells Kafka to write out that log line to the raw page views topic.
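A rough sketch of such an importer (assuming the ruby-kafka gem and local log files; paths and topic names are illustrative, and the line numbers mentioned in the talk refer to the slides, not to this sketch):

    # Illustrative importer sketch, not the slide's exact code.
    require "kafka"

    kafka = Kafka.new(["localhost:9092"], client_id: "importer")

    # Pretend these downloaded log files are a live stream of access logs.
    Dir.glob("log/access/*.log").each do |path|
      File.foreach(path) do |line|
        # Write each raw log line as a message to the raw page views topic.
        kafka.deliver_message(line, topic: "raw_page_views")
      end
    end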
At some point it's done and it has actually imported all the data. Step two is then doing the pre-processing. So we still only have a raw log line. So there's an IP address in there and a URL. We still need to find out which country it was actually from. So this is the second step. So we've got a regex here that parses the log line. I also found this somewhere online. So this splits the log line out into a few different segments.
Then we set up a GeoIP instance. So GeoIP is a way to get somebody's location based on your IP address. And then we set up a consumer and we ask it to read data from the raw page views topic. So this is a topic that we were just writing data
to earlier, and we're actually getting it in this second rake task. And then for every message we parse that log line with the GeoIP and the regex, and then we just turn it into a nicely formatted hash. So there's the timing in there, the IP address, country, browser, and URL.
So we actually have like properly formatted data we can do something with in the final step. Then we write this out to the second topic. So the second topic will contain these JSON objects that are nicely formatted.
But on line 51, the actual magic is happening, because we're setting a partition key. So this will help Kafka understand which data goes together. So everything that has the same city.country_code2 will end up in that same partition. So we know for sure that we can aggregate it properly later on.
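A sketch of what that pre-processing step might look like (assuming the ruby-kafka and geoip gems; the regex, topic names, and hash keys are assumptions based on the description, and the GeoLiteCity database path is a placeholder):

    # Illustrative pre-processor sketch based on the talk.
    require "kafka"
    require "geoip"
    require "json"

    # A simplified pattern for common-log-format lines; the real one was found online.
    LOG_LINE = /\A(?<ip>\S+) \S+ \S+ \[(?<time>[^\]]+)\] "(?<method>\S+) (?<url>\S+)[^"]*" \d+ \d+ "[^"]*" "(?<agent>[^"]*)"/

    kafka = Kafka.new(["localhost:9092"], client_id: "preprocessor")
    geoip = GeoIP.new("GeoLiteCity.dat")

    consumer = kafka.consumer(group_id: "preprocessors")
    consumer.subscribe("raw_page_views")

    consumer.each_message do |message|
      match = LOG_LINE.match(message.value)
      next unless match

      city = geoip.city(match[:ip])
      next unless city

      page_view = {
        time:    match[:time],
        ip:      match[:ip],
        country: city.country_code2,
        browser: match[:agent],
        url:     match[:url],
      }

      # The partition key groups all page views for one country onto the
      # same partition, so a single aggregator later sees all of them.
      kafka.deliver_message(page_view.to_json,
                            topic: "page_views",
                            partition_key: city.country_code2)
    end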
Then we get to the final step. So here, again, we have a consumer. So we're now actually consuming the page views topic. So these contain the JSON hashes. And we set up some storage; this is from line 60 to 62. So on line 61 there's a country counts hash, and it's a normal Ruby hash. It uses the Hash.new(0) syntax, which means that if no value is present in the hash for a certain key, it will actually end up being the value zero instead of nil.
So we always start with a count of zero. Then it loops through all these messages and JSON-parses each one, because in Kafka it will be serialized, so we actually have to turn it back into a Ruby object. Then we increment the count in that country counts buffer we just introduced. So we're getting a country, we're using that to do the hash lookup, and then we increment that by one. So any time we have a visitor from a certain country, this hash will be incremented.
And then we do this thing every five seconds. So this is part of the main loop. So every five seconds this thing is invoked, and we call the ActiveRecord model we introduced earlier. So this will actually write out the whole current state of the country counts hash to the database.
And then it will clear it on line 83. So after this is done, we will end up with an empty buffer again. And basically the whole process just repeats. So what happens is that this aggregation task is just reading in data for five seconds. It's incrementing the counts in that hash.
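Pieced together, that aggregation loop might look roughly like this (a sketch assuming ruby-kafka and the hypothetical Country model from earlier, which needs the Rails environment loaded; the real code's flush timing will differ in detail):

    # Illustrative aggregator sketch based on the talk.
    require "kafka"
    require "json"

    kafka    = Kafka.new(["localhost:9092"], client_id: "aggregator")
    consumer = kafka.consumer(group_id: "aggregators")
    consumer.subscribe("page_views")

    country_counts = Hash.new(0) # missing keys default to 0 instead of nil
    last_flush     = Time.now

    consumer.each_message do |message|
      page_view = JSON.parse(message.value)
      country_counts[page_view["country"]] += 1

      # Every five seconds, write the buffered counts out in one go
      # and start over with an empty buffer.
      if Time.now - last_flush >= 5
        Country.increment_counts(country_counts)
        country_counts = Hash.new(0)
        last_flush     = Time.now
      end
    end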
And then after five seconds it just writes the current state to the database and it will be in there. And then it just restarts and does the whole thing all over again. And then, if we go back to the Rails side, it's all fairly standard. So this is our UI for this.
If you look at the controller, we're just fetching the country stats that are available, ordered by descending count. Then we also get the sum and the max of the counts, so we know how wide the columns should be.
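Something along these lines, presumably (a hypothetical sketch; the controller and variable names are not from the slides):

    # app/controllers/countries_controller.rb -- hypothetical sketch
    class CountriesController < ApplicationController
      def index
        @countries = Country.order(visit_count: :desc)
        @total     = @countries.sum(:visit_count)     # for relative shares
        @max       = @countries.maximum(:visit_count) # to scale the bar widths
      end
    end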
And in the view it's just a really simple HTML table which we loop through. And that's it basically. So that's kind of the interesting thing in my mind: you can use these Kafka principles to buffer huge amounts of incoming data, and at the end it all just ends up in a standard Rails app which does no fancy stuff at all. So let's actually look at the demo. So I've got three tabs open here.
So this is the importer. So this, as you might remember, kind of fakes being streaming data. It just keeps pushing raw log lines into a Kafka topic.
So then we boot up a preprocessor. So if we pause it for a little bit you'll see that actually we're now getting some proper JSON. So this one is from Ivory Coast, Firefox browser. So we can easily work with this JSON data.
And we could also add a second one. So if we add a second preprocessor it will get half of the partitions and it will just double the capacity of the whole system. Well if it was actually running on a different server
of course, because my laptop only has so much capacity. And then finally we run the aggregator. So if we look at Safari here and refresh this a couple of times, it's going to show the same result.
But every five seconds the result will actually increase, because we're not actually writing out every single update; we're only writing out these buffered updates. So if you look at the aggregator here, it's currently running for all the countries that are in our data set.
But again we can start a second one. And you'll notice that actually the list of countries in the first one will actually decrease.
So on the next tick of the aggregator in the top right tab there will actually be fewer countries than in the one slightly further up. So this list is still everything. And then here on the second tick, which goes from here to here,
the second aggregator was started and Kafka noticed that the partitions had to be reassigned, and then it actually spread them out over two worker servers. And we just doubled our capacity. That's what you can do with Kafka.
And that concludes my presentation. Thank you. Yeah, so the question is: what happens if a consumer dies but it hasn't committed its offset? I actually didn't discuss committing the offset in the presentation, just to keep things a bit simpler. But it comes down to the fact that the consumer can control when it will actually tell Kafka that it's done with some data. So when a consumer dies, processing can actually be rewound a little bit and that data ingested again. So usually that just works out really well, because you only commit once you flush to the database, and then it will be in sync. Yeah, so the question is: is there any restriction
to the format of the messages? And well the answer is no. So a message has a key and a value and both are a byte array and you can put anything in there that you like. So we use a lot of protobuf in our Kafka topics. But you can also use JSON or whatever format you like.
Yeah, so the question is: do you do your own operations, and how hard is that? And that's kind of the disadvantage of using Kafka. You have to dive into a lot of Java things and you need to run ZooKeeper, and there's quite some overhead associated with that. You can buy it on Heroku now, and AWS also has something called Kinesis
which I think is basically a Kafka ripoff. It's the same thing, only with a different name. So that's one way to just buy it from service providers. And if you want to run it yourself, you will be in a bit of pain to get it set up. Once it's running it's extremely robust, but to understand all the configuration
and how to monitor it, it's pretty painful. Well, thank you.