
How do you scale a logging infrastructure to accept a billion messages a day


Formal Metadata

Title
How do you scale a logging infrastructure to accept a billion messages a day
Title of Series
Number of Parts
163
Author
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared, also in adapted form, only under the conditions of this license.
Identifiers
Publisher
Release Date
Language

Content Metadata

Subject Area
Genre
Abstract
Over the past year, OpenTable has been re-architecting its system, moving from a monolithic architecture towards microservices and small applications. As the infrastructure has changed, so too has the logging infrastructure had to change. Originally we had a logging solution where all logs were stored in SQL Server. We then adopted the ELK stack, which allowed us to scale further. As the company moved into the cloud, we had to be able to scale even more, so we decided to move to Apache Kafka. This talk is about the steps we went through, how we approached the work, and the lessons learned. We are now in a position where we can elastically scale our logging infrastructure easily. By the end of this talk, Paul will have demonstrated why Apache Kafka is a perfect addition to the ELK stack and how Apache Kafka allows us to add more resiliency and redundancy to our logging infrastructure.
Transcript: English(auto-generated)
Okay, let's get started. Thank you all for coming here. My name is Paul Stack. I'm here to talk about how I scaled a logging infrastructure to accept up to a billion messages a day. I no longer work for the company where I did this, but I only very recently changed jobs. This is actually a
very relevant talk still because it's work that I did very recently, and I actually still have open sourced all the code. They actually allowed me to do that, I didn't just press buttons. But the code is all available and I can show anybody any snippets at the end if they need anything. I have some contact details, so get in contact with me if you want to see any of the actual code.
I'm an infrastructure engineer. I no longer do software development. I actually build infrastructure, so dealing with configuration management systems and configuring build systems and architecture and logging and messaging and metrics, etc.
I'm a reformed .NET developer. I no longer enjoy working in the Windows space. I much rather enjoy working in the Linux space now. I'm a DevOps extremist and I absolutely love conferences. The background to this project was at my last company we had a legacy, and I do mean a legacy logging solution.
By legacy, I mean it was actually in a SQL server database. The central logging system was a SQL server database and people would push logs through it via stored procedures. And then to search the logs, they would actually have to log into SQL server management studio and write queries and
try to do full text searching across a very large database, and SQL Server was causing some problems for our applications. Now, one of the bottlenecks people found was that developers were doing full table scans without NOLOCK hints on the queries; we do sometimes forget to put NOLOCK on our SQL statements.
It's not a problem. So we wanted to have something that would allow us to search the logs in a better way. We started using the ELK stack. When I say ELK, I mean Elasticsearch, Logstash, and Kibana. Anyone use ELK stack? Excellent. A few people. Anybody not know what it is? Excellent. That means I can show you some stuff, which is even better.
So Elasticsearch is a schema-free document database and it allows full text searching because it's based on top of Lucene. Logstash is a data pipeline that will allow you to process your logs and other data and it will transport them to an Elasticsearch system for searching.
And Kibana is a very simple UI that sits across the top and allows you to visualize things. Let's see if I can show you. So this is actually a public demo. You can go and you can Google Kibana 4, which is the version of Kibana that's currently in production.
And you can go and have a look at it and you can see the types of information that you get. So we'll come back to the actual fun stuff, which is the dashboards that you can make across the top. But effectively, you transport lumps of JSON, whatever you want in there. It's schema-less, so you can put whatever you want into this system and then you can start to query and filter based on top.
And if we choose... Oh, that's not good. There we go.
So that's a lump of JSON that gets thrown in. And as you can see, you have lots of fields. Fields have a type and therefore, because there are types, you can search based on the types. Very simple, very easy. Now, you can put all sorts of information in there. You can put your status codes, your client IPs, as you can see. You can put request URLs. But never, please, ever store passwords in there.
I have actually seen some people pushing passwords into their logs and you can see them in plain text. Please do not do that. That is a bad practice. I'm not saying you're wrong because I hate that, but it's a bad practice to do that. And anything that's there, you can actually sort and you can visualize and you can search based on.
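As a rough sketch of what those lumps of JSON look like, here is the kind of schema-less log document you'd ship into Elasticsearch. The field names here are illustrative, not a required schema; the point is that each field keeps a type that Kibana can filter, sort, and visualize on (and, as above, there is no password field):

```python
import json

# An illustrative log event, serialized to JSON the way Logstash would
# ship it into Elasticsearch. Field names are made up for this example;
# Elasticsearch is schema-free, so you choose your own.
event = {
    "@timestamp": "2015-02-14T11:55:00Z",
    "level": "INFO",
    "application": "frontend",
    "status_code": 200,             # numeric, so you can range-query it
    "client_ip": "203.0.113.7",
    "request_url": "/search?city=oslo",
    "response_time_ms": 42,         # numeric, so you can graph averages
}

doc = json.dumps(event)
print(doc)
```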
So with Kibana, you can actually build pretty dashboards because it's basically a JavaScript-style application and you can apply CSS and JavaScript and start to build how your system looks and you can start to build systems that look like this. And that is actually doing live queries and you can see, excuse me,
you can see there's like 102 people are doing whatever in this specific part of the world and you can see in America and that part of America and you can start to go around. But also, you can visualize response times and errors over time and success rates versus failure rates, et cetera, et cetera. So it's an extremely rich way in order to get your information and visualize your information
in order to make it visible for people. Sorry, I'm just waiting on that to refresh.
So we wanted to take this stack and we wanted to take all of that logging information and now when we started the project, we were only maybe logging 10 million messages a day, which is not a huge amount, because we were never a very big site. We were never really taking hundreds of millions of hits on the internet. We're not something as big as Stack Overflow or something like that
because they have huge sets of data. So we thought, let's put this in place and let's see if we can grow it and expand it and scale it. We could have used another tool like Splunk, which we could have bought and just put in place, but we thought it's better if we grow our own and we adapt as we go. So we had iteration zero. A developer actually took a jar, it's an all-in-one jar that you can download from their site
and run it on one instance. And that spins up all three and you can just push data to it and that was okay. And it worked for a while. It was a good proof of concept that showed that we could really get some level of information and that we could actually search that information.
It took about 15 million messages a day, but it was more down than it was up. So it was time to make it production ready. Let's deploy some DevOps practices because apparently that's the best way to do things. And everything that we actually built was via configuration management, so we could scale it very, very simply. So, we went to iteration one.
And iteration one was using Redis as an input mechanism. Now, the architecture looked like this. If you forget about the secondary data center right now, I'll talk to you what happens in the main data center. We have a bunch of application servers and they needed to talk to a system to push data to it.
Now, we called those log processors. Log processors would store the data temporarily so that Logstash could actually index it and push it to Elasticsearch and that was actually done via log indexers. And then we had a large Elasticsearch cluster. We had quite a few issues with this, a huge lot of issues.
Now, the internals of the system looked like this. Applications could either push data in one of two ways. These slides are available online, by the way. You could push data into Redis directly. Actually, some of our developers wrote some logging libraries from Node.js and .NET
that actually connected to Redis and pushed logs in there and it did it in an asynchronous manner and it was good. Or, if there was rsyslog enabled or nxlog, which are tools in order to pick up our IIS logs or our Windows event logs, that was connected directly to Logstash and Logstash was forwarding the data into Redis. Redis was being used as the storage mechanism
ready for the data to go to Elasticsearch. This was bad. This was very bad. So, we had many data centers and every data center needed to push that data into our central data center as well. So we were trying to do HTTP connections between the systems.
Now, the biggest issue we faced was when we did have a network issue between the data centers, our queues would build up in Redis. So, let's just say the log processor nodes in the secondary data center, one of those clusters actually got to having 12 million messages in our Redis queue.
And, as we know, Redis likes to store things in RAM. We had actually disabled storing things on disk because we thought that that would be very fast. And, as we were storing on RAM, when Redis crashed, we lost all of our information.
We lost over 12 million logs. We actually lost more than that because we didn't realize that we had hit the RAM limit. So, the boxes were like 16 gig of RAM. We had given Redis like 12 gig or 14 gig of RAM and the graph was growing and growing and growing and we could see the RAM filling up and then it flatlined. And we were like, oh, we've stopped sending the messages.
That actually wasn't the case. Redis was de-queuing. So, more messages were going in, other messages were getting pushed off the queue. So, every time a message was going in, we were losing another one and we potentially lost approximately 20 million messages. And that was not so good. That was really not so good. So, we decided to look around at a new piece of the system
and we started looking at Apache Kafka. Anyone heard of Kafka? Oh, more people have heard of Kafka than the ELK stack. That's impressive. So, Kafka is actually a pub sub. It's really good.
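Stepping back to the Redis incident for a moment, the failure mode is worth sketching, because it was invisible while it was happening. This is a toy model only; real Redis is bounded by RAM and eviction policy, not an item count, but the effect was the same: once full, every new message silently pushed the oldest one off the queue.

```python
from collections import deque

# Toy model of the incident: an in-memory queue with a hard capacity.
# When full, each enqueue silently drops the oldest message.
class BoundedQueue:
    def __init__(self, capacity):
        self.items = deque(maxlen=capacity)
        self.dropped = 0

    def enqueue(self, msg):
        if len(self.items) == self.items.maxlen:
            self.dropped += 1  # the oldest message is lost, invisibly
        self.items.append(msg)

q = BoundedQueue(capacity=5)   # capacity of 5 is illustrative
for i in range(8):
    q.enqueue(f"log-{i}")

print(list(q.items))  # only the 5 newest survive
print(q.dropped)      # 3 messages were silently lost
```

This is why the RAM graph flatlined while messages were still flowing: the queue length stayed constant, but the contents were churning.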
It was originally open sourced out of LinkedIn in 2011 and in 2014, they actually created their own company based on Kafka and it's called Confluent. Now, there's a few different pieces to Kafka. Kafka is made up of topics, producers, consumers and brokers.
So, feeds of messages that are of the same type are stored together in a topic. Systems that publish data to Kafka are called producers and systems that read data from Kafka are consumers. It's not rocket science, right?
They've made the names quite easy. Now, brokers are, you can have distributed Kafka systems and you can store data across multiple replicas and multiple brokers. So, it's just actually storing data across different pieces. Communication is all via TCP and it's backed by ZooKeeper.
Now, usually when I tell people Kafka's backed by ZooKeeper, they groan and they're like, oh my god, ZooKeeper. ZooKeeper is actually very good. It's just a bit of a beast to manage. Anyone use ZooKeeper? One person. Yes. At least there's one. Well, I do as well, so it's two, I guess. So, there's not many.
I've forgotten. .NET, it's not very common in the .NET world, actually. I gave this talk at a Java conference and everyone was like, yeah, I love ZooKeeper. So, Kafka topics look as follows. You basically, each topic is split over a number of partitions. Each partition is an immutable and ordered sequence of messages
that gets appended to. Think of it as a commit log in Git. Any single change creates a new message, it is continually ordered, and it keeps driving forward and forward. Unlike Git, you cannot change the state once it has been written. Messages are assigned an ID, and that's their unique identifier,
and they can be replicated over a number of servers for fault tolerance. A partition has a server that acts as a leader, and other nodes in the cluster will act as followers.
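That append-only idea can be sketched in a few lines. This is a minimal model of a single partition, not Kafka's API: offsets are assigned monotonically at append time, and nothing is ever mutated afterwards.

```python
# Minimal sketch of one Kafka partition: an immutable, ordered,
# append-only sequence where each message gets the next offset.
class Partition:
    def __init__(self):
        self._log = []

    def append(self, message):
        offset = len(self._log)   # offsets are assigned in write order
        self._log.append(message) # never changed after this
        return offset

    def read(self, offset):
        return self._log[offset]

p = Partition()
assert p.append("first") == 0
assert p.append("second") == 1
assert p.read(0) == "first"
```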
If you have a Kafka cluster of three nodes and you have three topics, each topic will have a different leader. They do not always have the same leader because it likes to balance out the communication across the cluster. Kafka producers are probably the most complex part of the system.
Producers are responsible to choose where to publish data. So if you write an application and you want to publish some data to Kafka, you would have to tell Kafka what topic you are writing the data to. But the producer is actually responsible for handling the internals
and it does it in partitions and it can either do it via round robin or via partition functions. My partition functions make things very complex and you can start to say, even within this topic space, I want to store some messages in this partition style and other messages in this partition style. So you can actually really start to become much more fine-grained.
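The two strategies just mentioned can be sketched like this. Note this mimics the idea, not Kafka's actual default partitioner: round robin when a message has no key, and a stable hash when it does, so that messages with the same key always land in the same partition.

```python
import zlib

NUM_PARTITIONS = 4  # illustrative topic layout
_counter = 0

def choose_partition(key=None):
    """Sketch of a producer-side partition function."""
    global _counter
    if key is None:
        _counter += 1                    # round robin across partitions
        return _counter % NUM_PARTITIONS
    # Keyed: a stable hash keeps related messages together and ordered
    # within their partition.
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

# The same key always maps to the same partition:
assert choose_partition("host-a") == choose_partition("host-a")
```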
Consumers are easy. Consumers can either be done via queuing or PubSub. But Kafka takes care of that layer of abstraction for you because it creates what's called the Kafka consumer group. Kafka consumer group is actually...
It's a layer of abstraction that Kafka provides itself in its clients. And dependent on whether it's multiple systems reading or if it has to be done via PubSub, Kafka will decide that for you and it will relay the messages as needed. The other important thing is that Kafka is actually strong ordered.
Lots of people say that they do this and I'm almost sure that you could use another messaging system in place for this. We needed the messages to be read out in the order in which they were written
because we didn't want an IIS log that was coming in at 11:55 to be read out four hours after it came in. We needed it to be written in the exact order in which it was fed in so that our graphs were much more consistent and the data was much more consistent. Kafka say they guarantee that
because it's an immutable sequence and it's a commit log and it increments, then that's how they can guarantee it. Now, the complexity with Kafka is that when you have a consumer, a consumer can only talk directly to one Kafka partition.
So if your application has got four partitions and Kafka is writing data across four partitions, you actually need a pool of four consumers because it's a one-to-one mapping in order to get that data out. If you only provide one consumer,
you have three partitions that you will not be reading data from. This is how they say they guarantee strong ordering. As I said, it's backed by Zookeeper. I use a system called Exhibitor for managing my Zookeeper instances. It's a co-process that allows you to run backups and also monitor
and also visualize what's happening inside your Zookeeper cluster. This is actually open-sourced by Netflix, which has just made a lot of people's lives a lot easier. When you're in it, that last screen shows you all the different Zookeeper servers in the cluster.
Then in the next slide, you can start to dig into some of the messages and you can see the anatomy of your Kafka topics and how many brokers and partitions, and you can start to see the offset IDs, et cetera. This is all brilliant. We thought this message and technology is really cool, so let's introduce this message and technology into our pipeline.
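Going back to consumers for a moment, the one-reader-per-partition rule can be sketched as a toy assignment function. The strategy and names here are illustrative, not Kafka's actual assignor: within a group, each partition is read by exactly one consumer, so you need as many consumers as partitions to drain them all in parallel, and any extra consumers sit idle.

```python
def assign(partitions, consumers):
    """Toy sketch: spread partitions over a consumer group."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Four partitions, four consumers: the one-to-one mapping from the talk.
print(assign([0, 1, 2, 3], ["c1", "c2", "c3", "c4"]))

# A single consumer ends up responsible for all four partitions.
print(assign([0, 1, 2, 3], ["c1"]))

# More consumers than partitions: the surplus consumer gets nothing.
print(assign([0, 1], ["c1", "c2", "c3"]))
```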
We actually created a system that looked very similar to this. This was actually the system. I don't know why I said that. Things change very slightly here. There was a few things going on. Firstly, our Elasticsearch cluster had grown to over 90 M3 large nodes
in EC2. That is approximately 744 gig of RAM. The number of messages had really scaled up. Really scaled up. It was like tenfold what we were doing before. Elasticsearch was very hungry for RAM, and we had to spend a huge amount of time managing our Elasticsearch cluster.
We tried to make things quite simple with Kafka. What we did is we introduced a Kafka cluster and a ZooKeeper cluster, and the log processor nodes would talk to Kafka. There is no log indexer nodes anymore.
We got rid of all that layer. The internals we changed very slightly as well. No longer were we using Redis as the temporal storage. We kept Redis in place because we didn't want to break any of our developers' applications, and they would continue logging as normal to Redis.
But as soon as data arrived in Redis, Logstash would pick it up and throw it straight into Kafka. We had monitors in place that always said our Redis queues should be as close to zero as possible. We did not want our queue length growing,
and if we had a queue length that was growing, we knew that there was a problem in the system and we could fire alerts based on that. Anybody who wanted to were just talking still directly to Logstash, and that was just going straight into Kafka as well. Now, between the data centers, we got rid of this HTTP communication of a push.
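The queue-length check just described is simple to sketch. The thresholds here are made up for illustration; the real rule was just "as close to zero as possible, and alert if it grows":

```python
# Sketch of the Redis queue-length alert: Logstash should drain the
# queue into Kafka almost immediately, so a growing length means
# something downstream is stuck. Thresholds are illustrative.
WARN_LEN = 1_000
CRIT_LEN = 100_000

def check_queue(llen):
    if llen >= CRIT_LEN:
        return "CRITICAL"
    if llen >= WARN_LEN:
        return "WARNING"
    return "OK"

assert check_queue(0) == "OK"
assert check_queue(5_000) == "WARNING"
assert check_queue(12_000_000) == "CRITICAL"  # the incident above
```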
Before, in data center 2, we were trying to push that data across into the main data center. It was very forceful, and if the system was down, we would lose messages. Now, we wanted to change it because Kafka is a PubSub. We thought, we can subscribe to it. So Kafka has a process built in called the mirror maker,
and what you do is you tell it, Kafka then becomes both a consumer and a publisher because it's reading data from one Kafka cluster and publishing it to another. So it's very versatile, and we could deploy this mirror maker and just tell it to talk to whatever one of our data centers
and pull all of that rich logging information in. So things were growing. Things were getting very fast. Any questions so far? Excellent. It means everyone understands perfectly. Oh, sorry. What is ZooKeeper? Oh, ZooKeeper is like a...
Think of it as a very simple key value store. You just throw data into it in a key value way, and it will allow you to search back out. So the reason Kafka uses ZooKeeper to back things is that it needs to know what offset IDs it's at, what partitions are there, what topics are there. So it just stores all of that information,
and all of the nodes in the Kafka cluster talk directly to ZooKeeper and say, hey, what other nodes are available? How many partitions do I have to go across? And it just uses it as its brain, as its storage. Kafka itself actually is extremely simple and just stores on disk. It's just if you need to expand your Kafka cluster,
you don't want to add more nodes, you add bigger disks. Just keep adding volumes, especially if you're in EC2; you can continue attaching more and more volumes, and it will grow and grow and allow you to store more and more data. We were actually using M1 XLs, each of which allowed us to store about 1.6 terabytes of data, which allowed us to store data for up to 30 days.
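As a back-of-the-envelope check on that sizing, using the talk's rough numbers as assumptions (about a billion messages a day, an average document of about 1.2 KB, 30 days of retention, roughly 1.6 TB per node):

```python
# Retention maths, all figures taken as rough assumptions from the talk.
MSGS_PER_DAY = 1_000_000_000
AVG_BYTES = 1.2 * 1024       # ~1.2 KB per document
RETENTION_DAYS = 30
NODE_TB = 1.6                # usable storage per Kafka node

total_tb = MSGS_PER_DAY * AVG_BYTES * RETENTION_DAYS / 1024**4
nodes_needed = total_tb / NODE_TB

print(round(total_tb, 1), "TB over 30 days")            # ~33.5 TB
print(round(nodes_needed), "nodes before replication")  # ~21 nodes
```

Which is in the same ballpark as the 25 to 30 TB of Elasticsearch space mentioned later in the talk.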
Okay, so that's the way it was. This was going well. And as any typical developer, we thought we would make improvement. We love adding layers of abstraction and more improvements to systems
because we like tinkering with things. And what we decided was is that this architecture wasn't quite right, is what we thought. Because each secondary data center was being handled in a very slightly different way
to the central data center. Because in the secondary data center, applications would write basically and have their logs stored in Kafka, and that would be pulled across into the central data center. But in the main data center, we were writing to Kafka, and that was going directly to Elasticsearch,
and it wasn't being pulled through. So we weren't able to decouple if there was any problems in Kafka. We would just lose all of our logs. So we added another layer of Kafka clusters. I still don't know why we did this now, looking back on it, it was ridiculous. And as you can see, things started to get really complex.
Now, the secondary, we named things feeder Kafka's and main Kafka's. And what we did is every application would send their logs to a feeder Kafka, and then the main Kafka cluster would bring them through. I can see, you were going,
one, two, what? Exactly, exactly. That's exactly what looking at it now, it's crazy. But we thought that it made sense, but in looking back, the other architecture was probably better. Now, we did something completely different at the same time. We had scaled up hugely here.
We had really scaled up hugely. So we thought that at this point we were up to 120 M3 nodes for our Elasticsearch cluster, and that was becoming really painful to keep alive. So we thought, let's resize our instances. M3 nodes are about 8 GB of RAM each, so we changed to R3 2XLs,
which were 61 GB of RAM each. So we changed our cluster massively, to completely different instance types; we had a roll and rebuild of everything, and right now, before I left, we ran about 30 R3 2XLs, which was about 1.8 TB of RAM. So we were starting to hold a lot of data in this system,
a huge amount of data in this system. And the numbers looked as follows. So we usually kept logs in Elasticsearch for 30 days, and then our Elasticsearch indexes were archived. That means that we approximately had 12 billion active documents that could be searched at any point,
which was a lot to be able to search, and that our Elasticsearch space used was about 25 to 30 TB in EC2. Our average doc size was about 1.2 KB, and using this new architecture in Valentine's Day 2015, we were able to get to about 750 million documents collected
without any failure. Without failure. Now, I'm not saying that we didn't lose messages. We'll talk about that a little bit. The last company I worked for had three very busy times of the year. Mother's Day in the States, not in the...
I don't know when Mother's Day is celebrated in Norway. Is it the same as America? So in the UK, we have a completely different day for it. But Mother's Day in the States, New Year's Eve, and Valentine's Day. And these were basically the three times
where all hands were on deck in order to make sure that our systems were alive. So we needed this infrastructure to be completely rock-solid, or our system, our engineers and developers wouldn't realize if there were actually any problems. So as you can imagine, the login infrastructure became extremely critical.
As it was a messaging system, you could throw anything through this, okay? To me, a message is a message. It doesn't care about what its contents is, so logs are a message. But system metrics are a message as well. So we could have made our system metrics better and pushed that through this. And that was one of the plans, but then I left the company.
So I believe that that's still one of their plans. We needed a huge amount of metrics and monitoring around this, okay? And we used Nagios. Anyone use Nagios? A few. Anybody not use a monitoring tool?
Go and see your ops people. Ask them what they use. See how they monitor the system. Start to pay some attention, and you'll actually see some real benefits in being able to keep an eye on your systems. And we had alerts on pretty much everything in the cluster, okay? We had alerts on our Elasticsearch cluster, our ZooKeeper and Kafka nodes, our Logstash nodes, our Redis nodes, everything, okay?
And they looked similar to this style, okay? So you were checking that the Redis service was alive. You were checking the Logstash service was alive. You were checking that Elasticsearch wasn't in split-brain. You were checking that the number of Elasticsearch nodes in the cluster was what it was supposed to be. So we basically checked every part of the cluster
in order to try and make sure it was there. But we were actually also collecting a lot of metrics in the cluster as well. Now, for the metrics we collected, there's what's called the Kafka Offset Monitor, okay? It's a small open-source application, and you can go and grab it.
And you actually just point it at your Kafka cluster, and because it uses ZooKeeper, it can go and read how many messages are stored in the queue and how many have been read from the queue, and for any consumers it will show you the lag between messages still in the queue to be read and messages that have actually gone through. And we could actually start to graph and start to see
what the lag was between our data centers. Now, the second graph from the top left, that graph should almost always be towards zero. But we could see that there were a huge amount of messages coming through the system, and we could see that there was always a lag.
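What the Offset Monitor computes is simple to sketch. A hypothetical helper, assuming the offsets have already been fetched from ZooKeeper: lag is just the log-end offset minus the consumer's committed offset, per partition.

```python
# Sketch of consumer-lag calculation, per partition.

def consumer_lag(log_end: dict, committed: dict) -> dict:
    """Messages produced but not yet consumed, keyed by partition."""
    return {p: log_end[p] - committed.get(p, 0) for p in log_end}

# Four partitions; the consumer group is behind on partitions 1 and 3.
lag = consumer_lag({0: 1000, 1: 2500, 2: 800, 3: 9000},
                   {0: 1000, 1: 2100, 2: 800, 3: 4000})
print(lag)                 # {0: 0, 1: 400, 2: 0, 3: 5000}
print(sum(lag.values()))   # total lag across the topic: 5400
```

Graph that total over time and you get exactly the "lag between data centers" curve described above.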
So we were actually able to go to our developers and say, hey, you're probably getting a two- or a three-minute lag on your metrics, so therefore don't panic, okay? Or if it was absolutely critical, we would have had to go and do something about it. We would have had to scale up the number of consumers and the number of partitions. But we were actually always able to check and find out
on a rough estimate of how long it was taking the messages to make their way through the cluster. Normally, the Kafka Offset Monitor runs as a stand-alone application, but we wanted to push those metrics through to our metric system, which is called Graphite.
Graphite is an open-source time series database that allows you to search data and allows you to store data. And we wanted to push all of that information into Graphite, so we forked the Kafka offset monitor. We actually implemented pushing it to Graphite, and OpenTable are actually waiting on a pull request to be accepted for the Kafka offset monitor for that functionality to go back in.
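Graphite's plaintext protocol is about as simple as it gets: one "path value timestamp" line per metric, sent to the Carbon listener on TCP port 2003. A minimal sketch of what that push involves (the metric path here is invented, and this is not the forked monitor's actual code):

```python
# Sketch: format and send a metric using Graphite's plaintext protocol.
import socket
import time

def graphite_line(path: str, value: float, ts: int) -> str:
    """One metric in Carbon's plaintext format: 'path value timestamp\\n'."""
    return f"{path} {value} {ts}\n"

def send_metric(host: str, path: str, value: float) -> None:
    line = graphite_line(path, value, int(time.time()))
    with socket.create_connection((host, 2003), timeout=5) as sock:
        sock.sendall(line.encode("ascii"))

print(graphite_line("kafka.consumer.logstash.lag", 5400, 1424000000), end="")
# kafka.consumer.logstash.lag 5400 1424000000
```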
We also wanted to collect a huge amount of metrics on Elasticsearch. Elasticsearch is a beast. It's one of the most incredible tools that I've used because it is just so versatile.
But it is extremely hungry for RAM, extremely hungry for RAM, and we needed to monitor every piece of it. So, on the top left, we have the number of documents online. The graph below that shows the average doc size.
The graph below that is the number of documents currently being indexed, then the number of nodes online, the space used, et cetera, et cetera. Now, that top left-hand graph actually shows the growth of the number of documents online in the lead-up to Valentine's Day.
And we actually went from about six billion documents to about 12 billion. So we doubled the size of the data in RAM. And as you can imagine, there were lots of fires happening at the same time.
So we needed to drill further in, and we wanted to look at things like the heap size and the JVM usage and the recycle time and the refresh rate, and we were able to get all of this information. It's just JVM statistics, and you can grab all that information, and you can graph it.
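Those JVM numbers come straight out of Elasticsearch's `_nodes/stats` API. A small sketch of pulling heap usage per node out of that response (the response shape matches the stats API; the node names are invented):

```python
# Sketch: extract per-node JVM heap usage from an Elasticsearch
# _nodes/stats response, ready to be graphed.

def heap_used_percent(stats: dict) -> dict:
    """Map node name -> JVM heap used %, from a _nodes/stats response."""
    return {
        node["name"]: node["jvm"]["mem"]["heap_used_percent"]
        for node in stats["nodes"].values()
    }

stats = {"nodes": {
    "abc": {"name": "es-data-1", "jvm": {"mem": {"heap_used_percent": 71}}},
    "def": {"name": "es-data-2", "jvm": {"mem": {"heap_used_percent": 64}}},
}}
print(heap_used_percent(stats))  # {'es-data-1': 71, 'es-data-2': 64}
```

The same response carries GC collection times and counts, which is where the recycle and refresh graphs mentioned above come from.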
And there's lots and lots and lots of graphs that go with this that do different things. Visibility rocks. If you don't have visibility into your system, you cannot understand if your system is working. Now, I had a talk yesterday, and I showed this exact same graph
that I'm just about to show, and this is what we allowed our developers to do based on our logging system. So this is a page from one of the teams. One of the teams looks after a number of APIs, and it shows the total requests to each of those APIs, the number of non-200 requests,
so they could see the error rate on their applications, they could see the time taken for requests, and they also actually have, on the top right, they have what's called deployment. Now, it's not hugely visible, but in the main graph right in the middle of the screen, you see some little black dots along the bottom axes,
and that is actually plotting when a deployment happened, so that you could see the changes in the application after a deployment. And you could very quickly identify if there was a CPU increase or a massive memory increase or that response times went through the roof. And we were doing that based on our logging infrastructure,
which is why it became extremely critical for this logging infrastructure to scale and be able to adapt and grow. As you can imagine, when it came to Valentine's Day, this needed to be as close to real time as humanly possible, because you don't want a 20-minute lag. You don't want to be 20 minutes behind your system and find out that response times have gone from 10 milliseconds to 500 milliseconds 20 minutes after your customers do. This is what the messaging system, Kafka, allowed us to do, because every time we needed more metrics to be read out of Kafka faster, we increased the number of consumers,
re-partitioned, and we could read much faster. Can you, sorry? You can re-partition Kafka as you go. You don't actually have to take anything offline. And as with anything, it can be dangerous, but because Kafka stores the data on disk, all you're actually doing is changing the number of files it has. It stores the messages sequentially in log files, appending record after record, and adding partitions just means that you have more files.
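Growing a topic with the stock Kafka tooling looks roughly like this (the topic name and ZooKeeper address are invented, and note that the partition count can only ever be increased, never decreased):

```sh
# Re-partition a live topic from 4 to 8 partitions (Kafka 0.8-era tooling).
kafka-topics.sh --zookeeper zk1:2181 --alter --topic logs --partitions 8
```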
Think of it like threads, right? The more files you have, like the more threads you have, the faster you can read from them. And that's what we were able to do as we went, and you could scale up the consumer groups very simply: just point them at Kafka, and they would go and read the extra partitions. So it's very clever in terms of doing that. And the elasticity of EC2
meant that we could spin those up and destroy them when needed. So you didn't have to have very long-running nodes for Kafka in order to actually read the data. And everything was on EBS, so if we lost a Kafka node, you could potentially reattach the EBS volume, but we always had replicas. So we usually ran four partitions and one replica of data. So we usually had data across two of the three nodes, which is about the minimum you should run in a distributed system like that. I talked about message size. I said that the average message size was about 1.2 kilobytes.
Without visibility into our system, we would not have realized that some specific teams were logging a huge amount of data. One team actually turned on trace logging by mistake in production and was logging messages up to 50 megabytes in size. That's 50 megabytes of JSON.
The time taken to index that by Elasticsearch was pretty high. And having graphs that look like this gave us the average message size, and we could see when it grew, we could dig into a specific time portion, and we could go and not blame the team, of course,
but we could go and speak to the team. It's like, I think you've enabled tracing again. But this is the type of visibility you need into your system. It really is, because what we wanted to do is we wanted to be able to attribute a cost. Logging is an overhead that's shared by many teams.
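Because every document carried its own team tag, attributing usage is a simple aggregation. A sketch of the idea (the field names `team` and `size_bytes` are assumptions for illustration, not our actual schema):

```python
# Sketch: attribute logging volume to teams using the tags each team
# already puts in its own JSON log documents.
from collections import Counter

def bytes_by_team(log_docs) -> Counter:
    """Total logged bytes per team, from tagged log documents."""
    usage = Counter()
    for doc in log_docs:
        usage[doc.get("team", "unknown")] += doc.get("size_bytes", 0)
    return usage

docs = [
    {"team": "frontend", "size_bytes": 1200},
    {"team": "frontend", "size_bytes": 1100},
    {"team": "search",   "size_bytes": 50_000_000},  # someone left trace on
]
print(bytes_by_team(docs).most_common(1))  # [('search', 50000000)]
```

In practice this would be a terms aggregation in Elasticsearch rather than a Python loop, but the attribution logic is the same.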
Our logging infrastructure here was probably costing us about $11,000 a month to run in EC2, but that was quite a small price to pay for getting as close to real time as we could. As every team was creating their own logs,
they were actually sending in their tags with their logs because they could put whatever they wanted in their JSON. They could say, the team is this, and the application is this, and we could actually start to work out team X is actually using logging five times more than team Y. In some business-focused companies
that want to work out what the cost of each application is, you could actually attribute a cost for logging and metrics for those teams as well. It's all about visibility. You cannot see into your system unless you have graphs in place like this. I would never have been able to do this via SQL Server queries.
Not a hope. It would have been extremely difficult. So, what would I do differently? So many things. Firstly, I would definitely not have introduced that third layer of abstraction.
One of the reactions down here at the front was, I couldn't have done that any better myself because when we look at it now, we're developers. We love to improve things. We love to make them what we think is better. But when we actually look at it sanely,
you're like, why on earth did I do that? I would not do that ever again. But what I would do is be very aware of the success of reads and writes within the system. Has anyone ever heard of the Jepsen series, by a guy called Aphyr on Twitter? It's incredible. If you haven't read it, the link is here: aphyr.com/tags/jepsen. He has basically taken every single database, in turn, and written a blog post about it, pointing out its flaws. And when I say flaws, I mean the places where it claims to guarantee writes and he has actually proven that it can't guarantee those writes. And he's published all his code that does this, and you can go and actually run the tests yourself. They're quite expensive tests. You have to spin up a lot of kit in order to do them. But it's incredible. So I would become much more aware of whether my system was actually writing the logs,
which is very important. I would put a lot more metrics in place in order to say a thousand messages going in, a thousand messages have just been written to Elasticsearch, or X amount, whatever you want. The reason we didn't focus on that right there and then was because of business value.
That was not giving us a huge amount of value, because it was a logging system. It wasn't used as an audit trail and it wasn't used for billing purposes, so we could afford to lose a percentage of logs. Now, looking at it, if it was 50% of logs that we were losing, then we're in trouble. We think it was probably in the region of about 5%.
That's a rough estimate. Go and read that database series. It will help you choose technology sets. It's incredible. It really is. The next thing I would do better is I would actually choose the right EC2 instances from the outset.
As well as changing the system to introduce Kafka, we tried to change the Elasticsearch instances at the same time, and there were so many different layers of complexity. I've said it before: Elasticsearch likes RAM.
It likes a lot of RAM because it's doing a lot of queries on the fly. And because it's doing queries on the fly, every time someone opens a dashboard for seven days, it will take all of that data and store that seven days' worth of data in RAM so that you can query it fast. Now, you can see, hopefully,
I'm hoping I have good internet. There you go. So you can actually start to search for status, and it will go off and search in RAM for all the logs that have status included in them (there are going to be millions here), and it will highlight on the screen exactly where it's used. But it has actually worked out every single tag that's currently in all of those documents,
and it has plotted them on the left-hand side of the page. So you can actually start to build up some very complex queries and start to say things like, let's add ID in there, let's see all the machine IDs, and then you have a little magnifying glass, and it's just added a query to the top left in the red, and you can start to see it plotting on the graph.
So it has to be able to do all of this very fast, and the only way that it can do that is actually through RAM.
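Those Kibana searches are Lucene query-string queries underneath, so the queries being built up in the demo look something like this (the field names are illustrative, not from a real dashboard):

```
status:500 AND application:bookings-api AND machine_id:web-01*
```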
Managing 120 nodes is a lot more difficult than managing 30 nodes, as you can imagine. So if you're in EC2, definitely spend some time researching about using the right size node for your system. It's very difficult. If you have a very good architecture and system that you can destroy very easily and bring back up,
and everything can be brought back up like a phoenix, then it's much easier. The JVM. It is a very large black box. I had absolutely no idea what the JVM did.
I still am not very sure, to be totally honest. It's amazing. It's incredibly powerful and very complex. But since I got into the operations world, I've had to be exposed to a lot more of the JVM tools. And because I've been exposed a lot more to the JVM tools, I need to understand the JVM more.
This is why we started to graph our JVM usage, our JVM heap, and all of that rich data that came. Personally speaking, it's just pretty graphs, and I don't know what they are, and I have to learn, and I'm continuing to learn every day in order to do that. But it's all available to read and to see the trends as it goes through.
I would say one thing. I learned a huge amount building this system, and I have applied that exact same thought process to where I'm working right now. And we are building a Kafka cluster, and we're building ZooKeeper instances, on a much, much smaller scale. But there was a huge amount of rich learning in order to get there.
Redis was painful for us only because we didn't use Redis correctly. Anyone use Redis? A couple of people. Redis is a bit of a beast to cluster.
You can do it, and you can use Redis Sentinel, but we didn't have any Redis cluster in place. We had nine instances of Redis, and every single one of them was completely standalone, and we just had monitors to make sure that Redis was up and that the queues were at zero. We probably should have had a lot more sophistication in place around our Redis setup,
but by failing in that area, it led us to use Kafka, which ended up being a better system anyway. The other thing, and this is the real problem, is cost. I could have bought Splunk. Splunk is a tool that will do this for you.
And we probably estimated up front that Splunk would cost us about $200,000 a year and that we would have to pay that, and it would be hosted. We wouldn't have to care about it. It was there. We thought that we could do better. We're developers. Why can we not think that we can do better? So what we thought is, we'll build our own.
We'll open-source everything, and we'll use all these tools and build our own. If we were spending at least $11,000 a month on a logging solution, then we were actually very close to the point where we could have justified the Splunk payment up front and just gone for it.
So the lesson there is: don't reinvent the wheel. We're very good at that as developers. We always try to reinvent things and do things our own way. I'm not saying that I would now go and use another tool, that I would buy a tool off the shelf and do this. If that's what you want to do, that's great.
Disregard everything I've said and go and do it. This is more a case of: we learned a huge amount about our systems and our ability to grow and adapt as the company grew and changed from a monolithic architecture to a microservice, or micro-monolithic, based architecture. There are so many terms around microservices. I love the term micro-monolith, so I keep using it. We went from one main application to 50 different applications. And having all of that writing into Splunk, which would have been a hosted solution, would have been troublesome for us.
And we have one last learning from it. Please, please, please, please do not log via HTTP. Please. Because if you're in an application
and you create an HTTP request to a system that doesn't exist or has gone down, and you don't have very good guard logic in place to retry or time out, you will crash your application. That may or may not have happened to us a few times. It did. It happened a lot. It happened a huge amount. But it made our developers better because they learned from it. They actually blamed the logging infrastructure a lot, and they were like, your logging infrastructure is dying again, and we're like, well, you guys can't code. I'm just joking, that's not actually the case. So HTTP logging is bad. The way I would do it is write to local disk and ship the logs. Use a log shipper. If you're on Windows, there's a tool called NXLog.
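A minimal NXLog sketch of that setup might look like the following (the file path, host, and port are invented): tail the IIS logs with the im_file module and ship them out over TCP with om_tcp, so the application itself never makes a network call to log.

```
# Tail IIS logs from local disk...
<Input iis>
    Module  im_file
    File    "C:\\inetpub\\logs\\LogFiles\\W3SVC1\\u_ex*.log"
</Input>

# ...and ship them to the central logging pipeline over TCP.
<Output shipper>
    Module  om_tcp
    Host    logs.example.com
    Port    5140
</Output>

<Route r>
    Path    iis => shipper
</Route>
```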
And it's free. It's a very simple, small EXE that will run on Windows. And it will pick up IIS logs and Windows event logs, and it will ship them to a location that you tell it. And you could deploy that with your application, write using log4net or whatever the cool kids use these days for logging,
and write to the local file system, and then do the shipping afterwards. So your application works much more effectively. It's not having to make extra outbound connections; it's just writing to the file system. That was one of the key learnings of this entire experience. Are there any questions? Yes.
So the question is, why would we use Graphite and Grafana for our metrics rather than just using Elasticsearch?
We could have. We could easily have done that. We felt that Graphite as a time series database was fantastic; it's designed for much smaller chunks of information. So time series data is just timestamp, metric, timestamp, metric.
It's very, very small. It's very tiny and compressed. Now, if we were mixing huge text searches across massive lumps of JSON in the same system as very small time series changes, then we may not have been able to be very effective at reading that data back. So let me put it a different way.
Elasticsearch and Kibana were great visibility into the logs for the developers. Graphite and Grafana were very good visibility into the system metrics for the operations guys, so that they could effectively not bring each other down, if that makes sense. Any other questions?
So the question is, have I seen anyone using it for application messaging? Yes. Lots of people are starting to use this for application-based messaging. Unfortunately, if you're in the .NET space,
the client that connects to it is kind of poor at the minute. There's an open-source client; you can go and help them. I actually found this to be an easier system to set up as a message bus than RabbitMQ. It's a trade-off, right? It's like when you choose a database
technology, you have to choose it for the right reasons. The same thing for a messaging system. If you need a very strongly ordered message queue, this will do it. And it won't care if it's logs or metrics or application messages. What's the primary issue with the client at the minute, then? I actually don't know, because I haven't written .NET in a long time. I would say
maybe it's just my C# code, to be totally honest with you. Check it out, though. Go and have a look. Evaluate it and see. It's really simple to spin up. If you want some code, I will give you the code. I have a couple of Vagrant boxes that you can spin up locally to test this, and it will give you a cluster. So if you want it, give me a shout, mail me, and I'll send that to you. There was another question over here.
Would I ever re-index into Elasticsearch? I'm not sure I understand. Oh I see. 30 days only on Elasticsearch.
We tried to keep our Kafka retention to only a few days. The central Kafka cluster in the main data center, which was feeding Elasticsearch, we tried to keep a week's worth of logs there, because in case there was a catastrophic failure in our Elasticsearch cluster, we could at least replay seven days' worth of data into a new Elasticsearch.
In the secondary data centers themselves, we only ever kept one day, because if an entire data center went down for more than a day, we were probably going to be in worse trouble than logging. Any other questions?
Very good question. So the question is, were we able to use this system to optimize what we were logging, because there was a huge amount of information going through there? Yes is the answer. So we did this in a few ways.
I wish I still worked at the company, because I could actually show you the dashboards that do this. Firstly, everything had a type. Every log message that went through this system had the type of log that it was, info or debug, the application,
which team was writing it. And we could see which teams were writing more logs. Now, when we analyzed this, we were able to say application A is logging 100 million messages a day, yet it is only getting 25,000 requests. So we could very quickly correlate between a huge amount of messages,
because it's a very chatty application, and what the actual messages themselves were. Now, it helped us identify that in the service discovery system, every node was writing a message every five seconds just saying I'm alive, I'm alive, I'm alive, and we could very quickly filter those messages out with Logstash, because Logstash provides filters
that, when data goes through, can either reject it or mutate it to move it into a different format. And we could quickly go: we don't need those log messages, let's just discard them. Nobody's application had to change, which is backwards compatibility for developers. They never had to change things unless we told them, you've got to completely change the structure of
your data going in. Make sense? Any other questions? So the question is, how did we decide our partition strategy? That's actually what we did
to start with. We were like, does four sound okay? Yeah, let's go with four. And we were no more sophisticated than that. When we needed to repartition, you go up exponentially, okay: one, two, four, eight, 16, 32, et cetera. At one point I think we were running on 64 partitions, but that was only for a very short amount of time, and then we toned it right back down. So we found four was perfect for our level of data. Now, LinkedIn push hundreds of billions of messages through this every day, and I've
been on a podcast with some of their engineers, and they were saying 64 partitions is perfect for them. So I guess it purely depends on how many messages you're pushing through, but also how many nodes are in the cluster and how many consumers you want, et cetera. If you only have a single application that consumes from it, it's probably best
only to have one partition. Topics are completely abstract; they're up to you. So logs were a topic for us. Another topic was metrics. Any similar grouping of data that you wanted to be able to consume in the same fashion was just formed into a topic. We said: let's create some very high-level buckets and let's just throw data into those buckets. Any last questions? Awesome. I have all this code, okay? This is all open source, and I can give anybody any Puppet code that they want in order
to see this in action, and I have Vagrant boxes and all sorts of manifests. So if you do want to see it, or you want access to some of the code that will help you spin this up, it's probably best not to mail me, because I'm the worst at email in the world. But if you tweet me, I'm constantly on Twitter, so I'll actually be able to send you some stuff back pretty fast. Thank you all very
much. We're finished a few minutes early so that means you get lunch first. Yes!