
Tiny Flink — Minimizing the memory footprint of Apache Flink


Formal Metadata

Title
Tiny Flink — Minimizing the memory footprint of Apache Flink
Number of Parts
60
License
CC Attribution 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal purpose as long as the work is attributed to the author in the manner specified by the author or licensor.

Content Metadata

Abstract
Apache Flink has been designed for, and is mostly used with, large-scale real-time data processing use cases. Companies report TBs of data being processed per second, or TBs of state in huge clusters. But what if you need to process low-throughput streams? Running a full, distributed Flink cluster might be overkill, as there’s quite a bit of overhead for distributed coordination. In this talk, we’ll explore options to reduce your resource footprint. We’ll dive deeper into Flink’s MiniCluster, allowing you to run Flink in-JVM for integration tests, as a microservice, or just as a small process handling your data in Kubernetes. We will also discuss lessons learned from running MiniCluster in production for a service offering Flink SQL in the cloud. Attend this talk if you want to learn about Apache Flink and its various options to deploy and configure it.
Transcript: English (auto-generated)
So my name is Robert. I'm a staff software engineer at Decodable. We are building a SaaS offering for Apache Flink in the cloud. It's mostly SQL, like Flink SQL, but also you can upload your Flink jars
and we run them for you. And let me first introduce quickly what Apache Flink is. The website says stateful computations over data streams. So it means you can connect to a bunch of different systems like Kinesis, Pulsar, and most popular, Kafka.
You can also do change data capture from Postgres or MySQL where you're basically getting events for every change that you're doing in the database. And then this stream of events goes into a cluster of Flink processes which are processing your data in real time with low latency.
And the stateful computations basically means that you can, for example, build session windows, or aggregations, or basic filtering, or mapping, or lookups, and so on as part of your Flink cluster. And Flink is highly scalable, so you can add more machines.
It will have higher throughput. It supports exactly-once processing semantics, which means that for registered state, for example for your session windows or your aggregations, if the system needs to restore and replay parts of your data, it will reset your state accordingly
so that the aggregations or the window contents stay accurate, as if every event was processed exactly once. It supports event time semantics and watermarking for handling time and for providing correct results, even under back pressure or system outages. And you can use all these features with different APIs.
There is a spectrum of APIs from streaming SQL, which is very easy to use. You just write a few lines of pretty much standard SQL, and then this generates a Flink job for you. Or you use the more low-level data stream API in Java, Scala, Kotlin, also Python,
which is much more expressive. You have much more access to low-level primitives in Flink to use all these features, like event time, exactly once, and so on. So in this talk, I'm going to talk about minimizing Flink, like reducing the resource footprint of Flink, in particular, the memory footprint.
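To illustrate the SQL end of that spectrum, here is a minimal sketch of what such a job could look like with Flink's Table API. This is not from the talk; the table name, fields, and connector options are made up for illustration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlFilterExample {
    public static void main(String[] args) {
        // Create a streaming TableEnvironment; it embeds a planner that
        // turns SQL statements into Flink jobs.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Declare a Kafka-backed table (all options here are illustrative).
        tEnv.executeSql(
                "CREATE TABLE orders (order_id STRING, amount DOUBLE) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'orders',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'format' = 'json')");

        // A few lines of pretty much standard SQL become a running Flink job.
        tEnv.executeSql("SELECT * FROM orders WHERE amount > 100").print();
    }
}
```

Running this requires the Flink Table API and Kafka SQL connector jars on the class path.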
And I will also look at the resource consumption and the performance numbers, like the throughput and so on, that you get when running a really small Flink cluster. And the first motivation here on this slide is that the minimum cluster size that you
need to have when you deploy a Flink cluster is roughly 1.7 gigabytes of memory if you're using the default configurations of Flink and the Kubernetes operator of Flink to deploy it. And that's because the job manager needs 650 megabytes, and the task manager or multiple task managers
each need one gigabyte of main memory. And this motivation is also the reason why I've worked on this at my job, because we have customers that have hundreds of streams that they want to process, and these streams have fairly low throughput. And if these customers would launch 100 Flink clusters, it would not be feasible for them
to use Flink in production with Flink SQL, because they would have too high costs to process their data. And that's why we were looking into reducing the resource footprint of Flink as much as we can, so that these customers can also use Flink for their low throughput streams in our product.
The second motivation is also when you're deploying Flink internally, and you have low throughput use cases, and you want to have Flink deployed in a unified way alongside your other microservices, like your Spring applications, your Kubernetes applications, and you want to have a unified deployment, monitoring,
and operations for these services. So basically, you're considering everything as a JVM, and you want to have Flink also deployed as a single JVM that reads data from Kafka or Pulsar, analyzes it, and writes some results somewhere. And the third motivation for trying to make Flink as small as possible
is local development and testing. You want to run Flink in your IDE so that you can test whether the code that you just wrote compiles and makes sense, or you want to run integration tests to make sure that your analysis is correct. So you don't want to live in the data center when implementing a Flink application.
The third case is actually covered, let's say, automatically in Flink. So when you just initialize a stream execution environment in Flink, it will automatically launch a mini cluster when you're running Flink from your main method. And there's also a JUnit rule that you can use to create a mini cluster in Flink unit tests.
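That JUnit rule might be used roughly like this, a sketch based on the `MiniClusterWithClientResource` that Flink's own tests use (the pipeline itself is a placeholder):

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class MyPipelineTest {

    // Shares one in-JVM Flink mini cluster across all tests in this class.
    @ClassRule
    public static final MiniClusterWithClientResource FLINK =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    public void testPipeline() throws Exception {
        // getExecutionEnvironment() picks up the mini cluster started above.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).print();
        env.execute();
    }
}
```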
So this is actually already covered, but the first two cases are not really covered in the Flink documentation. And in this talk, I'm going to tell you a little bit how to get started. Before we do that, we'll quickly talk about the deployment options of Flink. Nowadays, most people are using Kubernetes.
And the blue line basically shows what I think is the preferred way to deploy Flink with the standalone mode, which means you're posting some resource definitions to Kubernetes, like for the job manager and for the task manager processes. And then these processes come up, connect to each other,
and Flink will figure out how to distribute work amongst these JVMs. There's also the option to use the native Kubernetes mode, where you submit a job manager to Kubernetes. And this job manager needs a service account that has the permission to create pods. So the job manager will actually decide how many pods to allocate, depending on the jobs that you're submitting to Flink.
So with the native mode, you have more resource elasticity, because Flink can allocate and deallocate pods throughout the lifecycle, basically, of a Flink cluster. But it requires lots of access in your Kubernetes cluster, because it needs to have the permission to launch pods, which might not be OK for some environments.
There's also the option to launch Flink on Docker. You can also build stuff yourself using the bash scripts provided, by just launching the JVMs that you need. And there's the option to deploy Flink using the mini cluster. And that's what this talk is about.
So to just get started with an empty Flink cluster that doesn't run anything, you basically need to run these few lines of code in your, let's say, main method or somewhere else. This will start all the services that you would also start in a distributed cluster. So it's bringing up the job manager. It's bringing up a task manager, resource manager.
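The few lines in question look roughly like the following (a sketch; the exact builder methods can differ slightly between Flink versions):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class TinyFlink {
    public static void main(String[] args) throws Exception {
        // One job manager and one task manager, all inside this JVM.
        MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
                .setConfiguration(new Configuration())
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(1)
                .build();

        MiniCluster cluster = new MiniCluster(config);
        cluster.start();  // brings up job manager, task manager, resource manager

        // Keep the empty cluster alive until the process is killed.
        Thread.currentThread().join();
    }
}
```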
The task managers will connect to the job manager. I mean, they will connect just locally. There's no network involved. And other projects have similar utilities. For example, there's a class called KafkaServer in the Kafka project that allows you to launch a Kafka broker locally. Or there is the MiniDFSCluster in the Apache Hadoop
file system. So if you want to launch HDFS locally, you can also just call this class. It gets tricky when you're trying to launch HDFS, Kafka, and Flink in the same JVM, just because of class path issues. So better use something like Testcontainers
if you have multiple systems. So when you run this main method and you try to get the heap size down, it won't work with a heap limit of 60 megabytes. It did work with a heap limit of 65 megabytes, but I wanted to go even lower. And that's why I reduced the memory configuration of Flink
to 8 megabytes. So Flink will only allocate 8 megabytes of memory buffers instead of 64 megabytes. And with that, I was able to launch an empty Flink cluster with a heap limit of 20 megabytes, which I think is quite good. It would run on my Apple Watch.
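That memory reduction can be expressed through Flink's configuration, roughly like this (key names as in recent Flink releases; the heap limit itself is a JVM flag such as `-Xmx20m`):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class TinyFlinkLowMemory {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Shrink the network buffer pool from the 64 MB default to 8 MB.
        conf.setString("taskmanager.memory.network.min", "8mb");
        conf.setString("taskmanager.memory.network.max", "8mb");

        MiniCluster cluster = new MiniCluster(
                new MiniClusterConfiguration.Builder()
                        .setConfiguration(conf)
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(1)
                        .build());
        cluster.start();  // an empty cluster that fits under -Xmx20m
        Thread.currentThread().join();
    }
}
```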
So we can launch Flink with 20 megabytes of heap space, but there are some open questions. What is the heap size when you run an actual job on this cluster? What's the throughput that you can get from such a small process? And why is there such a discrepancy between the 20 megabyte of heap and the 80 megabyte of JVM
process size as reported by the operating system? And for the remainder of this talk, I will go through these items. So the first item is to launch an actual Flink job on this cluster. And I will use a fairly simple job,
which has just a Kafka source reading from a Kafka topic, a filter operation, which is filtering out 1% of the data, and a Kafka sink. And when I run this job on the cluster, I need to add 15 more megabytes to the heap so that it doesn't fail immediately.
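A job of that shape could be written roughly like this with Flink's Kafka connector. The topic names, addresses, and the exact filter predicate are illustrative, not taken from the talk:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterJob {
    public static void main(String[] args) throws Exception {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input")
                .setGroupId("tiny-flink")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                // Records stay plain strings; drop roughly 1% of them.
                .filter(value -> value.hashCode() % 100 != 0)
                .sinkTo(sink);
        env.execute("tiny-filter-job");
    }
}
```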
The process size goes up to 190 megabytes on average. Like, this is fluctuating. Some caveats. Checkpointing is not enabled, and I'm not using RocksDB. So I'm just using on-heap state, which is fine because the only state that this job has are some offsets from the Kafka source for reading data.
And also, I have to say, I'm going to do some benchmarks. And I'm not really deserializing the data. So I'm treating every incoming and outgoing record just as a string. I'm not trying to deserialize the JSON or something, which would add more CPU cycles to this experiment. So for the benchmark, I'm going to run three mini clusters.
The top left one is a mini cluster that is generating data to Kafka. Then there is the mini cluster that I want to benchmark, which runs this very simple Flink pipeline. And this writes again to Kafka. And then there's a final mini cluster, which is just measuring how fast data is being
written to this Kafka cluster. I'm running this on this very MacBook, so it's not a very professional benchmarking setup because I have a browser open and stuff. But I still think it gives you a rough indication of what to expect from such a small Flink process on a single machine. So here are my results.
With the 35 megabyte heap limit that I had initially for getting this to run, it was actually failing almost immediately. Like with three megabytes of data per second, the JVM was already failing with an out of memory exception. I guess that it was just allocating stuff on the heap too fast so the garbage collector was not
able to clean up fast enough under such high memory pressure. With 50 megabytes, I got to 25 megabytes per second of throughput. And with a heap limit of 100 megabytes, I got to roughly 100 megabytes per second of throughput on this laptop. So let's look a little bit at the details.
This is from the VisualVM utility. So with a 50 megabyte heap limit, I had this run for like 10, 15 minutes, and I was increasing the data generator speed. So initially, I was generating data with six megabytes per second, then 12, then 25. And at 36 megabytes, I think I put too much pressure
on the JVM, and it failed. So we can conclude that somewhere between 25 and 36 megabytes per second is the limit for a 50 megabyte heap when running this really small Flink cluster. So I doubled the limit, and then the JVM was stable. You can see on the right hand side that there's not
much room left in the garbage collector, like 30% left or so, but it still looks healthy. You see that the garbage collector activity on the left hand side is super low, like it's a flat line. I would be worried if you would spend like 20% or 30% of your CPU time just on garbage collection, but that's not the case here. So I think the limit is somewhere else, probably
in the local Kafka broker or so. But I think 100 megabytes per second is a pretty decent throughput for such a small process. So if you imagine running this as a microservice in your Kubernetes cluster for analyzing some data,
I think that's pretty decent. If you add checkpointing, you might need a few more megabytes of heap. And if you're doing more serialization or so, then you would probably also need more CPU resources, but I'm mostly focusing on memory in this case. So as I mentioned, there is a bit
of a difference between the heap limit and the real memory allocation that the operating system is reporting. So for the 100 megabyte heap limit, I saw the memory consumption roughly around 250, 260 megabytes. When you use top and you look at the RSS or real memory
in the macOS Activity Monitor, then you see roughly 250 megabytes. So I enabled the JVM's native memory tracking. I've put the JVM argument here. Then you need to call the jcmd command. And with that, I got this analysis.
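The JVM argument and the jcmd call in question are, roughly, the following (`tiny-flink.jar` and the pid are placeholders):

```shell
# Start the JVM with native memory tracking enabled (adds a little overhead)
java -XX:NativeMemoryTracking=summary -Xmx100m -jar tiny-flink.jar

# Find the pid with jps, then query the running JVM
jcmd <pid> VM.native_memory summary
```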
So basically, the point where the whole memory consumption goes up is when I started the data generator. And what you see is that 100 megabytes is heap, and the rest is stuff from the JVM. For example, the thread space: for each thread, there's one megabyte of stack space allocated.
The garbage collector has 50 megabytes. Metaspace has 50 megabytes. So metaspace is basically all the classes that you're loading. They are loaded into a separate area in the JVM called the metaspace. This is never garbage collected; it's allocated forever. The garbage collector's area is just metadata for the garbage collector, where it stores, basically,
the age information about the objects, the object graph, and so on, I guess. Anyways, this looks pretty bad, in my opinion, because the majority of memory is spent on JVM metadata. So there is this really nice blog post linked here. And in that blog post, there are some tips on how to reduce the native memory allocation of a JVM.
And what I did is I reduced the thread stack size from one megabyte to 256 kilobytes. And with that, I was able to drastically reduce the allocated memory for the threads. And I also switched from the default garbage collector, which is like the G1 GC, to the serial garbage collector.
And that basically zeroed the memory consumption of the garbage collector. But of course, this is reducing the throughput. So I was doing some experimental benchmarks, and it looks like a performance hit of 15 megabytes per second or so, because the garbage collector is less efficient.
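Together, the two JVM tweaks from the talk look like this on the command line (values as stated in the talk; tune them with care for your workload):

```shell
# 256 KB stacks instead of the 1 MB default, and the serial collector
# instead of G1: smaller native footprint, at some throughput cost.
java -Xss256k -XX:+UseSerialGC -Xmx100m -jar tiny-flink.jar
```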
And also, reducing the thread stack size is dangerous because you might run into stack overflow exceptions, since there's less stack space per thread. And Flink has a high number of threads. So it has around 100 threads in this experiment, 20 of which
are for the REST API of Flink. So you can probably also optimize this by reducing the number of threads in this pool to two instead of 20. So there's more room to optimize. But to conclude my talk, I was able to scale down Flink to a process size of roughly 250 megabytes.
And this mini cluster was still able to process 100 megabytes of data per second. These mini clusters run the same code as a fully distributed cluster. So you are able to migrate your state and everything, your jobs, from this mini cluster to a fully distributed cluster if your workload is expanding over time. It supports high availability.
So you can kill this mini cluster process. When it comes up, it will look at a config map, or in ZooKeeper, and knows how to restore from the latest checkpoint. It supports all the metrics and logging integrations of Flink. It also supports the Flink web UI. So if you're adding the right jar to the class path, Flink will allow you to connect to the 8081 port.
And you will see the status overview of Flink. And I have a GitHub repository with all the examples from this talk. So you can take the source code and run it yourself. And we have one minute left for questions. Yes, any questions? And you can also ask about Flink in general, if you want.
It doesn't have to be about minimizing Flink. Question from me, the mini cluster is part of Flink, yes? Yes. And it still allows you to run Kafka and the others, HDFS, for example, even if it's a Flink offering or part of Flink,
yes? Yes. So the mini cluster is not really covered in the documentation, but it's used everywhere in the internal unit tests of Flink. So it's a public class, so you can just use it. And basically, Flink supports, as I showed in my presentation, all kinds of deployment modes. And these deployment modes have different entry classes
into a framework of, let's say, resource managers, job managers, task managers, and so on. So these different entry points are basically different configurations of how to start Flink. And mini cluster has one entry point of how to start Flink. And basically, if you look into the mini cluster class, it's launching all these services that Flink has, like the HA service, plugin manager, job
manager, and so on. Pretty useful. Yes. Perhaps I missed it, but you've clearly explained how to reduce the memory footprint in a, quote unquote, empty cluster with an empty job.
But I've kind of missed the thing where you are only running on one JVM. Because in my memory, starting a mini cluster simply starts both the job and the task managers. Yes, but it's OK. So it starts the job manager and task manager threads,
but it's still within the same JVM. So it's basically one JVM. And then in the JVM, we just call the job manager start and task manager start code. But it's now spawning a hundred threads. You have quite a high thread count and all kinds of stuff
allocating memory. So I think maybe you should have a separate talk on mini cluster in the future, because that seems like an interesting subject. The hope was that this talk is about mini cluster. OK, good. Thank you. Thanks a lot. Thank you very much for the talk.
Big round of applause for Robert.