Apache Flink
Formal Metadata

Title: Apache Flink
Part Number: 19
Number of Parts: 110
License: CC Attribution 2.0 Belgium: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal purpose as long as the work is attributed to the author in the manner specified by the author or licensor.
Identifiers: 10.5446/30938 (DOI)
FOSDEM 2016, 19 / 110
Transcript: English (auto-generated)
00:06
OK, thank you. So I'm happy to present Till Rohrmann from the Apache Flink project. And he's going to talk about stateful stream processing with Flink.
00:22
Yeah, thanks for the introduction. I'm Till. I'm a Flink committer. And today I'm going to tell you a little bit about stateful stream processing with Apache Flink. So for those of you who haven't heard about Flink, it has been an Apache top-level project since December 2014.
00:43
By now it has become quite active with about 150 contributors working on the project. In a nutshell, Apache Flink is a parallel streaming data flow runtime with which you can do stream as well as batch processing. But in this talk, I will concentrate
01:01
on the streaming part. The runtime is capable of giving you low latency as well as high throughput if you set the configuration right. It has a fault tolerance mechanism, which gives you exactly-once semantics, which is nice if you require correct results.
01:22
And it lets you do stateful operations, which, as we will see later in my talk, is really useful. But before that, let's talk about stream processing and why it has become so popular these days. One of the reasons is that many of the problems
01:40
we are facing now, for which we use batch systems, are actually streaming in nature. So it would make sense to process the events as they arrive instead of batching them up to process them at a later time. By doing so, you basically get faster results.
02:01
You don't have to store all the data. And you have more predictable resource consumption in your cluster because you're constantly processing the incoming events. But since all theory is gray, let's look at an example. And this example is going to accompany us for the rest of the talk.
02:22
I want to count tweet impressions with you. So you have an input stream of events where each event signifies that a certain tweet has been shown in a feed. And now what you want to extract from this input stream
02:43
is how often, say, the blue tweet or the green tweet was shown, so how many impressions it got over a given amount of time. So what you usually do, or how you would do it, is: you have the input stream.
03:01
You group it by tweet. Then you define a kind of time frame for the computation, because the input stream can theoretically be infinite, and computing an aggregation over an infinite stream will take infinitely long.
03:21
So you say something like: I want to know the impressions over the last hour. OK, that's quite easy, I guess, for the example. So how would we implement that with Flink? First of all, we have to define the data type
03:42
of our input stream. It can be any Java or Scala type. In this case, I've used a Scala case class because it is so succinct. One field is the ID, which identifies the tweet. Then we have the timestamp when the event happened
04:02
and the count value, which will be used for summing up the impressions. Then we have to get the execution environment. In this case, it's a StreamExecutionEnvironment, which tells Flink that we are executing a streaming job. Once we have obtained that, we
04:21
can use it to read from a source. Flink supports various sources like Kafka, RabbitMQ, file sources, and many others. But here, I've used my magic Twitter source to read the impression events directly from Twitter. This will give us a data stream of tweets.
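(For orientation, a hedged sketch of what reading from Kafka instead would look like; the connector class name FlinkKafkaConsumer082 is from the Flink 0.10-era Kafka 0.8 connector and is version-dependent, and the topic, servers, and group ID are made up.)

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("zookeeper.connect", "localhost:2181")
props.setProperty("group.id", "impressions")

// Each record from the (made-up) topic is emitted into the stream
// as soon as it is published to Kafka.
val raw: DataStream[String] = env.addSource(
  new FlinkKafkaConsumer082[String]("tweet-impressions", new SimpleStringSchema(), props))
```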
04:44
OK, now that we've obtained the tweets, we can start with the actual computation. The first step was, if you remember correctly, to split the stream into the events for the individual tweets. And that is done by the keyBy method,
05:03
to which you give the field name of your type by which it shall group. Next, we define our time frame. And here, I set it to a time window of 10 minutes.
05:21
And within this time window, I compute the number of impressions received by calling sum. OK, that's the job. This gives us a new data stream where each tweet now contains the count of the impressions
05:44
over the last 10 minutes. And in order to execute the program, you have to specify a sink. In our case, it simply prints the results out. Then we have to trigger the execution by calling execute. OK, that was quite easy, I guess. Putting the pieces together gives roughly the sketch below.
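(A minimal end-to-end sketch of the job as described, assuming a roughly Flink 1.0-era Scala API; TwitterSource stands in for the speaker's "magic" Twitter source and is hypothetical.)

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// One event per time a tweet is shown in a feed.
case class TweetImpression(id: Long, timestamp: Long, count: Long)

object TweetImpressionCount {
  def main(args: Array[String]): Unit = {
    // Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical source; a Kafka, RabbitMQ, or file source works as well.
    val tweets: DataStream[TweetImpression] = env.addSource(new TwitterSource)

    tweets
      .keyBy("id")                  // split the stream by tweet ID
      .timeWindow(Time.minutes(10)) // tumbling 10-minute time window
      .sum("count")                 // sum the impressions per tweet and window
      .print()                      // the sink: simply print the results

    // Trigger the actual execution.
    env.execute("Tweet impression counts")
  }
}
```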
06:03
But already in this simple example, you've seen an important concept of stream processing: for aggregations, we have to define a time frame, or what we call a window. Flink comes with different notions of windows.
06:22
The simplest one is the tumbling window, which is basically a contiguous, non-overlapping window. It splits up your input stream into segments of, in this case, four elements. And on these four elements you can, for example, do an aggregation such as summing them up,
06:44
which will give you, in this case, 27, here, 22, and 8. A slightly more powerful window is the sliding window, which allows you to have overlapping windows. So by defining a sliding window of length 4,
07:01
so it contains, again, four elements, with a slide length of 2, which basically means that the first two elements of each window overlap with the last two elements of the preceding window. You get different window semantics, which lets you do different things.
07:22
Here, in our example, the window length depends on the size, that is, on the number of elements. But you could also define it depending on time. And for time, Flink supports processing time, which means the time when an element arrives at an operator,
07:41
as well as event time, which means the time when an event was created. This is really powerful, because it means that we can handle out-of-order elements. If you read, for example, from multiple sources, it is not guaranteed that elements will arrive in order.
08:01
And for that, you need event time support. And if the windows that are supported natively by Flink, like the tumbling and the sliding window, aren't enough for your use case, you can also define custom windows with the trigger and evictor model. That's not a big deal. A few of these window variants are sketched below.
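(A hedged sketch of these window variants on the keyed tweet stream from before; same assumed Scala API and TweetImpression stream as above.)

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time

// Use event time (when the event was created) rather than processing time,
// so that out-of-order elements are handled correctly.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val keyed = tweets.keyBy("id")

keyed.countWindow(4).sum("count")     // tumbling count window of 4 elements
keyed.countWindow(4, 2).sum("count")  // sliding count window: size 4, slide 2
keyed.timeWindow(Time.minutes(10), Time.minutes(5)).sum("count") // sliding time window
```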
08:22
OK, so far, so good. The problem with windows is that they keep the data around. So as long as the window is not finished and hasn't been computed, you have to store the data somewhere. In case the window's either really long
08:41
or the elements are really large in size, it might not be feasible to do so. And when it comes to counting, we don't really need the information of the events, right? The only thing we need to know is that there was an element, and then we increment a counter, for example.
09:02
So to do it in a little more memory-efficient way, we can create a stateful mapper, which simply increments the counter value every time it sees a new element. That's a different way to implement a solution to this problem of counting impressions.
09:21
How would that look in the API? Well, the only difference to the previous program is that instead of the sum method, we call the mapWithState method, which takes a function with two parameters.
09:41
The first parameter is the event data, so our tweet. The second is the state, which is a long; in the function we take the current count value and update it to reflect the new element. That is quite easy, as the sketch below shows.
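(A minimal sketch of this stateful variant, assuming the keyed TweetImpression stream from before; in the Scala API, mapWithState keeps one Option[Long] per key, which starts out empty.)

```scala
val counts = tweets
  .keyBy("id")
  .mapWithState { (tweet: TweetImpression, state: Option[Long]) =>
    // Add this event's count to the state kept for the tweet's key.
    val newCount = state.getOrElse(0L) + tweet.count
    // Emit the updated count and store it as the new state.
    ((tweet.id, newCount), Some(newCount))
  }
```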
10:02
However, what happens if the machine on which our operator runs crashes? Then our state is lost, right? And from then on, we are counting incorrectly, which contradicts what I said before, namely that Flink gives you exactly-once processing guarantees.
10:21
So there has to be some means to recover from a failure like a machine outage, for example. What we basically have to do, or what the system has to do, is store your state somewhere. And what this basically means is that the system
10:43
has to be capable of drawing a distributed snapshot from a parallel data flow, including the operator states. Since Flink uses a continuous stream model, which means that all the operators of the data flow are online at the same time, and the events
11:02
are continuously streamed from one operator to the next once they're ready, it's not so trivial. You somehow have to synchronize the individual operators and tell them when to take a snapshot, so that the complete snapshot,
11:22
that is, the state of every operator, is consistent. And the way this happens in Flink is by introducing markers into the stream pipelines between the operators.
11:41
Here it's important to note that these barriers cannot overtake, and cannot be overtaken by, the elements that are streamed before and behind them. This means that whenever an operator, let's say our stateful mapper, sees this marker,
12:01
it knows that it has now seen all elements up to this point, and it can take a snapshot of its state. And if all other operators behave similarly, the overall snapshot will be consistent and can be used for recovery.
12:26
In terms of the processing guarantees in the streaming world, which are at-most-once, at-least-once, and exactly-once: with this asynchronous barrier snapshotting algorithm, as we call it, we can support
12:42
all three of these guarantees. The first guarantee, at-most-once, is actually not so useful in real-world problems. It basically says that you might see an event in an operator, but if the operator crashed,
13:04
then you might also not see it. So, no guarantees at all; it's easy to realize. A slightly harder guarantee, at-least-once, basically says that you will see every element at least once. But in case of a failure, it might happen that you process some
13:22
of the elements multiple times. And exactly-once basically means that in your final result, every element will be accounted for exactly once. That is the hardest one to realize, actually, but also the most important one.
13:41
However, as a user, the at-least-once guarantee is usually sufficient for most use cases. And you should always choose the weakest guarantee that suffices, to minimize the overhead; a configuration sketch follows below.
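(A hedged configuration sketch of choosing the guarantee level; the 5000 ms interval is illustrative, and CheckpointingMode is the API name from roughly this Flink era.)

```scala
import org.apache.flink.streaming.api.CheckpointingMode

// EXACTLY_ONCE is the default; AT_LEAST_ONCE skips barrier alignment
// and therefore adds less overhead.
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE)
```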
14:03
OK, so what does this checkpointing now mean for our stateful mapper? Well, if we take a look at it, it looks like before. The only difference is that, with the checkpointing mechanism enabled, the system inserts these barriers into your input stream. Whenever the operator sees such a barrier,
14:23
it will take the value it currently has stored in the counter and write it to some state backend. In our case, it is HDFS here. But this state backend is configurable. And currently, we support file systems
14:40
like HDFS and a memory state backend, and we are currently working on a RocksDB state backend. For your user function, or for its implementation, this is completely transparent, so you don't have to bother about what's happening behind the scenes. A minimal wiring sketch follows below.
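(A minimal sketch of wiring this up; the checkpoint interval and the HDFS path are illustrative, and FsStateBackend is the filesystem state backend of this era.)

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend

// Draw a distributed snapshot every 5 seconds (illustrative interval).
env.enableCheckpointing(5000)

// Keep snapshots on a replicated file system such as HDFS, so the state
// survives the loss of any single machine.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))
```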
15:06
OK, let's take a look at the performance, and at the impact this checkpointing mechanism has on the overall performance. When we talk about performance in the streaming world, we always talk about latency and throughput.
15:21
These are the two important measures. Due to the fact that Flink has this continuous streaming model, where the operators are always online and an element, once processed, can be sent directly to the downstream operators, it can achieve really low latency.
15:41
But to be precise, in order to also achieve a high throughput rate, the elements are not sent to the downstream operators directly, but are buffered first. They are sent to the downstream operators once the buffer is full or once the buffer timeout has expired. And with this buffer timeout, we basically
16:02
have an upper bound for the latency. In order to show you that, we ran an experiment where we read a stream from Kafka, grouped on some key, and counted
16:21
the elements in each group. We ran this experiment on a 30-node GCE cluster with, I think, four cores and 15 gigabytes of RAM per machine. And we measured the latency, which is the red line here, versus the aggregated throughput.
16:44
And we did that while varying the buffer timeout, starting from 0 up to 100 milliseconds. As you can see, with an increasing buffer timeout, the latency also increases, because the elements stay
17:01
longer in the buffer. And the longer the buffer timeout is, the more likely it is that the elements are only sent downstream once the buffer is full. However, the upside of this is that the throughput also increases. So as a user, you can use this buffer timeout to tune your system for either lower latency or higher throughput; the knob is shown in the one-liner below.
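(The knob itself is a single setting on the environment; the 10 ms value is illustrative.)

```scala
// A timeout of 0 flushes after every record (lowest latency); larger
// values let network buffers fill up and favor throughput.
env.setBufferTimeout(10) // milliseconds
```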
17:25
We also wanted to know: what's the effect of the checkpointing mechanism on the overall throughput? What we did is we took the same job, where we have a grouping, which means a network shuffle.
17:42
We fixed the buffer timeout at 50 milliseconds and measured the aggregated throughput once without checkpointing, and once with exactly-once, so with checkpointing enabled. And as you can see, without checkpointing,
18:00
we could achieve, I think, around 85 or 86 million events per second. And with checkpointing and exactly-once processing guarantees, we get 82 million events. So by sacrificing approximately 4% of the throughput,
18:22
you get exactly-once processing guarantees, which is a really great trade-off. OK, so much for the performance. Now let's take a look at the future and what's going to happen within the next months.
18:42
So far I've talked about counting. Counting means keeping a long value, which is not really big. However, it happens that the state of operators can quickly grow really large. For example, if you have an NLP model in your operators, it can easily be 700 megabytes.
19:04
And writing this model to disk to store it can take some time. With the current implementation, the problem is that writing the data out stalls the operator and also the processing
19:22
of incoming events, which is, for stream processing, well, kind of prohibitive. So what we are currently working on are asynchronous snapshots, so that the processing of events can continue while the current state is stored to some persistent storage.
19:44
Additionally, as I said, some of the state can be really large, so large that it cannot be kept on the heap. So what we additionally work on is out-of-core state, so that once your state grows so big that it cannot be held
20:02
in memory, it will gracefully spill to disk and can also be retrieved again. That's one big thing we're working on. Next, we've noticed that during a streaming job, the load is not always constant.
20:21
So what you usually do to meet your SLAs is to provision for the maximum input you can expect. However, this means that most of the time, you will waste some resources because you don't receive this high input.
20:44
So what we want to do or what we are currently implementing is dynamic scaling in and scaling out, which means that whenever Flink detects that there's a slow operator, for example, let's say here, then it will spawn a new operator so that the load is more evenly distributed
21:03
across all operators and the SLAs can be met again. And the same works the other way around as well: whenever an operator is idle for too long, the degree of parallelism of the topology
21:22
will be decreased so that you don't waste any resources. However, that only works if the architecture is capable of requesting new resources, which means new machines, and releasing these machines from a resource manager.
21:42
And currently, we support YARN as such a resource manager. We have this feature that when the job manager sees, OK, I need a new task manager to deploy some more tasks, it will tell the resource manager, and the resource manager will provide it. And when a task manager is idle, it can be released.
22:00
And this will also be part of the new Mesos integration, which will also happen in the next months. OK, last but not least, there's also some other good stuff going on in the community. Currently, we have contributors working
22:21
on Stream SQL, which will basically bring SQL to the streaming world, so that you can use your SQL knowledge with streams as well. And we are implementing complex event processing, so that it's really easy, without writing too much code,
22:42
to detect complex event patterns in your streams. For example, if you want to detect suspicious behavior in your network, you might define some rules which could then be used to detect such behavior. And with that, the only thing which is left now
23:05
is to conclude my talk. I hope you've enjoyed it. If you got excited about it and want to stay tuned, you should check out flink.apache.org. There you will also find the mailing lists, to which you can subscribe.
23:20
And take a look at the code, which you can find on GitHub, or follow us on Twitter. And yeah, thank you very much for your attention. When snapshotting to the local disk,
23:42
how do you ensure fault tolerance? With the local disk, of course, you don't have fault tolerance. You would have to save it to HDFS, for example, where it is replicated. There was a benchmark made by Yahoo.
24:00
It showed much lower performance than what you have shown here. So how can you explain the difference? Well, actually, the benchmark done by Yahoo measured the latency rather than, let's say, the throughput.
24:21
The results were not bad for Flink, actually. They were on par with Storm, which is known for having really good latencies. Moreover, in this benchmark, the measurement of latency is difficult, as in any distributed setting, because the clocks might not be synchronized
24:41
between the machines. So even slight deviations will disturb the results of the measurements. And the way the Yahoo guys measured the latency was basically that they had a job with a map operation,
25:03
or a group operation. And in this operator, they did a lookup against a Redis store to do a join operation. And this result they wrote every second
25:21
to some database to measure the latency between the time point when it was written and the end point of the window of one second, which basically means that if you assume you have a latency of zero, then the element would arrive in the first window,
25:41
but wouldn't be written to the store before one second has passed, because it was only written to this database after one second. That way, the latency measurement was kind of flawed, in our opinion.
26:02
But the overall result was that Flink and Storm were on par with respect to latency. Throughput was not measured there. If I have my query, can I change the query
26:21
while it runs? No. At the moment, think of it as a static query through which the data is piped. Yeah, I mean, with the API, or if you were to specify it as a SQL query, you
26:43
construct a data flow graph, which is compiled into some internal structure, which is then used to execute the job in a distributed fashion. I want to move away from the relational database
27:01
to something with, let's say, higher speeds. Can Flink help with data transactions? Within Flink, there is the guarantee
27:23
that no data will be lost, thanks to this checkpointing mechanism. However, once you cross the border from Flink to a different system, they have to interact. I mean, the other system has to respect these checkpointing notifications to make sure that no data is
27:42
written twice, for example. If you do that, then yes. But you have to implement it for the specific connector you're using there. What are the different ways you can connect with Flink?
28:03
Yeah, what do you want to connect with Flink?
28:21
I mean, to connect with Flink as a client: we use Akka internally, so you can send an Akka message to it. But what you usually do to ingest data
28:42
is use connectors. And there's a wide variety of them; actually, all Hadoop-compatible stuff is supported. I mean, all Hadoop InputFormats can be used. But there's no other way. I mean, that's not quite true. You can use the REST interface to request some metrics.
29:02
But there's no other way to interact with the system yet, like other protocols. Not yet, for sure. Why should one not use Spark Streaming? Well, it's not that you shouldn't use it. The thing is that
29:23
the concepts are different. Spark uses mini-batches to simulate streaming, which might be enough for some use cases. However, you then don't have such flexible windowing semantics
29:43
as with a continuous stream model. That's one reason why we think that the continuous stream model is superior. And because of this mini-batching, you have really high latency. So if the use case which you want to solve
30:02
requires lower latencies, then it's simply not possible to do it with Spark, for example. Please take the rest of the questions offline. A short question on IoT, and also on streams: you have a socket connector, for example, for a stream.
30:24
Can we send over a socket? Yes, yes, that you can do. And the difference, by the way, compared with Spark Streaming in this case is that you will receive the messages from the sensors immediately as they come in. That would be a bigger discussion, though.
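(For reference, a hedged sketch of the built-in socket source mentioned here; host and port are illustrative.)

```scala
// Reads newline-separated text from a TCP socket as an unbounded stream;
// each line is forwarded downstream as soon as it arrives.
val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
```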