Min and Max Aggregations with Updates in Real Time
Formal Metadata

Title: Min and Max Aggregations with Updates in Real Time
Title of Series: Berlin Buzzwords 2022 (talk 48 of 56)
Number of Parts: 56
License: CC Attribution 3.0 Unported: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal purpose as long as the work is attributed to the author in the manner specified by the author or licensor.
Identifiers: 10.5446/67156 (DOI)
Language: English
Transcript: English (auto-generated)
00:07
Hi, everyone. My name is Meenakshi, and I'll be presenting min and max aggregations with updates in a real-time ingestion system.
00:21
Can I see the slides? OK, great. So, moving on. A little bit about me: my name is Meenakshi, and I'm a staff software engineer in the data platform org in Enterprise Insights at Twilio.
00:41
What we do there is run a real-time analytics pipeline for our customers. We handle data ingestion, do the real-time analytics, and provide dashboards and APIs for our customers to query.
01:03
So looking at today's agenda, I'll go through the real-time analytics architecture of our team, giving a little bit of background on how we set up the dataset, fact, and cube in OLAP, and then moving
01:21
on to the details of how we do the min-max calculation with updates. So this is our architecture. On the top part, we have the data collection. You can see the real-time ingestion pipeline. The bottom part shows the batch pipeline. And as the database to store all the data,
01:43
we use Apache Kudu. And we have a read pipeline where we provide APIs to query that data. We'll be focusing on the real-time ingestion pipeline, because that is where the min-max aggregations happen. So during that phase, if you
02:02
can see, the green circles represent the Kafka topics where we get raw events from our customers. Those raw events we send to a real-time mapper, which is a Kafka Streams app that does simple transformations, like converting an event
02:24
into an object format or cleaning up the data. After the transformation is done, it writes to a changelog topic. The events from the changelog topic are then processed by a Kafka sink connector.
02:41
That is our Data Kudu consumer, a Kafka Connect app. Then, basically, the raw events are written in a tabular format in our database, in the fact format. And then you can see the real-time indexer. That is our Cuber app, another Kafka Streams app, which does the actual work.
03:02
That is, the aggregation. It also handles deduplication, so it handles duplicates and then does the aggregation. We support sum, count, min, and max aggregations. And then again the Kafka Connect app, the Data Kudu consumer,
03:23
using a sink connector, handles these aggregated records and writes them to Kudu. Looking at the read pipeline, we use Apache Calcite for optimization of the queries, and then we provide them via our API and dashboard.
03:43
And then the batch pipeline is just used to handle updates that are more than three days old and also to do some cleanup; for example, when accounts are deleted, we delete the data in our database. Moving on.
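To make the mapper stage concrete, here is a minimal Kafka Streams sketch of that kind of stateless transformation step; the topic names, the application id, and the trim-style cleanup are illustrative placeholders, not the actual Twilio pipeline.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

// Minimal sketch of the "real-time mapper": read raw events from a Kafka topic (the green
// circles in the diagram), apply a simple cleanup, and write to a changelog topic that a
// Kafka Connect sink later writes to Kudu. Topic names and the cleanup are hypothetical.
public class RealTimeMapperSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> rawEvents = builder.stream("raw-events");
        KStream<String, String> cleaned = rawEvents.mapValues(value ->
                value == null ? null : value.trim());   // stand-in for format conversion / cleanup
        cleaned.to("fact-changelog");                    // consumed by the Kudu sink connector

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "real-time-mapper");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}
```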
04:09
So we'll be looking at the dataset, fact, and cube to set up some background. Looking at the dataset, you can think of a dataset as the metadata of the data.
04:22
For example, you say what the incoming topic you'll be reading from is, what the dataset name is (you have one dataset per use case), and what the table name is, that is, the table where the raw events will be stored.
04:42
Now the main focus is indexes, that is, what kind of measures you need. For example, you need a count of the records, or you need a max on a currency field. And what is the roll-up? We support day, minute, hourly,
05:00
and weekly. Then there are the key version and the value version; basically these are the schema versions of the Avro record in the incoming topic, so you can also do schema upgrades, change the versions, and so on. The columns field actually determines
05:21
what the schema of your table will be: what the primary keys are and all the details of the table schema. Moving on, this is an example of a fact table. You can see that all the primary keys have been defined, and you can see the types of the columns and the fields
05:42
and the comments, et cetera. All of these actually go into the columns field in the dataset. You can also think of a fact table as a collection of raw events in a tabular format.
06:02
Looking at the cube table, it is just an aggregation. You can see that it has columns such as the sum of a particular field, like sum of bytes downlink, the max of a currency field, or the count of records, ordered by the primary key.
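To make the dataset idea concrete, here is a rough sketch of what such a dataset definition could look like as a plain Java value object; every field name and example value is an illustrative guess based on the description above, not the actual Twilio schema.

```java
import java.util.List;

// Hypothetical sketch of a dataset definition: it names the incoming topic and target tables,
// the measures ("indexes") to compute, the roll-up granularities, the Avro key/value schema
// versions, and the column definitions that drive the fact and cube table schemas.
public record DatasetDefinition(
        String incomingTopic,          // e.g. "sms-events"
        String datasetName,            // one dataset per use case
        String factTableName,          // where raw events land in tabular form
        String cubeTableName,          // where aggregated rows land
        List<String> indexes,          // e.g. ["count", "max(currency_amount)", "sum(bytes_downlink)"]
        List<String> rollups,          // e.g. ["minute", "hour", "day", "week"]
        int keySchemaVersion,          // Avro schema version of the record key
        int valueSchemaVersion,        // Avro schema version of the record value
        List<ColumnField> columns) {   // table schema, including which columns are primary keys

    public record ColumnField(String name, String type, boolean primaryKey, String comment) {}
}
```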
06:22
Moving on to the calculation details: we could have just used the Math package, right? Math.min or Math.max to calculate the min and max. So let's look at why that would give incorrect results.
06:43
So consider a stream of events like this, with incoming records that have ID one, status undelivered, value two; then another with status undelivered and value four; and then a status delivered with value five. Now, when we want to calculate the max of the value
07:02
for each status, you can see that undelivered will be four. When you just use the Math package to calculate Math.max of the values per status, it's easy and it gives you correct results, but there is a case
07:21
where it will give you incorrect results. The insert-only stream is the best-case scenario, where we didn't have to do anything special. The logic would have been: if you have a record of type insert, you just look at the new data and the existing data, do a comparison, and based on that you take the min or max.
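A minimal sketch of that naive, insert-only logic might look like this; the class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Naive running-max aggregation: for each key (e.g. a status), keep only the largest
// value seen so far. This works for insert-only streams, but the stored aggregate can
// only move in one direction, so it cannot "retract" a value when a record is updated.
public class NaiveMaxAggregator {
    private final Map<String, Long> maxPerKey = new HashMap<>();

    public long add(String key, long value) {
        return maxPerKey.merge(key, value, Math::max);
    }
}

// Example from the talk: add("undelivered", 2), add("undelivered", 4) -> max 4.
// If the record with value 4 is later updated to status "delivered", the correct max
// for "undelivered" becomes 2, but this aggregator still reports 4.
```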
07:43
Now, why it will result in incorrect results is that when you do a max, the stored max value can only ever strictly increase, and when you do a min, it can only ever strictly decrease.
08:02
So that is what we are trying to avoid, because we handle updates as well. For example, suppose you initially have three records with IDs one, two, three, and with the original statuses
08:21
undelivered, undelivered, delivered. Then later on, the record with value four changes from undelivered to delivered and its value changes to three. In that case, the undelivered max value should now be two, but when you use the
08:40
Math.max function, you just get four, which is incorrect. You actually want undelivered to be two and delivered to be five; delivered, as before, happens to be unaffected, so that's fine for us. But that is the case
09:01
that we are trying to handle with updates. So how do we handle that? We came up with a logic that uses a heap. For example, if we are doing a max calculation, we use a min heap to store all the values,
09:21
the incoming values that we are seeing, and we just do a Collections.max on the heap. For example, let's look at the same update scenario. Consider only the first three steps, that is, when we get undelivered, undelivered, and delivered. At that time, if you look at the
09:43
bottom left, you see that for undelivered the heap contains two and four, and you just do Collections.max on the heap, so you get the correct value four; for delivered, the correct value is five.
10:00
When an update happens, we have adders and subtractors. So first the old value will be removed, that is, removed from the aggregates. When you do a remove of undelivered four, the value will actually be removed from the heap,
10:23
and then you get the max value by just doing a max of the heap. So you get the correct value, which is two, for undelivered, and delivered still remains unchanged. The next step would be adding the delivered three. So three will be added to the delivered min heap,
10:42
and then you get the correct value of five. So we have used that logic. Some of the details that I would like to go through: when doing a max calculation, the min heap
11:01
will always try to store the largest values, so you'll get the correct max value most of the time. And there is the case where the heap size is limited:
11:21
when you're trying to replace an element in the heap, we always do a comparison with the root. Only if the new element is greater than the root do we do the insertion; otherwise the new element is ignored. So those were some of the interesting details that went into the min and max calculations.
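Putting the heap logic and the bounded-heap replacement rule together, a hedged sketch of such a max aggregate could look like the following; the class and method names are invented for illustration, and this only shows the technique, not the actual implementation.

```java
import java.util.Collections;
import java.util.PriorityQueue;

// Max aggregation that supports updates: keep up to maxSize of the largest values seen so
// far in a min-heap, and read the current max with Collections.max(heap).
// An update is handled as a subtract (remove the old value) followed by an add (new value).
public class HeapBackedMax {
    private final PriorityQueue<Long> heap = new PriorityQueue<>(); // min-heap, root = smallest kept value
    private final int maxSize;

    public HeapBackedMax(int maxSize) {
        this.maxSize = maxSize;
    }

    // Adder: keep the value only if there is room or it beats the smallest kept value (the root).
    public void add(long value) {
        if (heap.size() < maxSize) {
            heap.offer(value);
        } else if (value > heap.peek()) {
            heap.poll();           // evict the root
            heap.offer(value);
        }                          // otherwise the new element is ignored
    }

    // Subtractor: called when an existing record is updated away from this group.
    public void remove(long value) {
        heap.remove(value);
    }

    public Long currentMax() {
        return heap.isEmpty() ? null : Collections.max(heap);
    }
}

// Edge case discussed in the Q&A below: if more than maxSize large values arrive and the
// smaller ones get evicted, a later update that removes the retained maxima can leave the
// heap without the true max, so the reported value may be incorrect.
```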
11:43
So that's all I have for today. Let me know if you have any questions.
12:01
Thank you very much. Maybe there are some questions? Hello, hi, this is Ankush. I was curious about the real-world use case: why is this data getting updated? Like, first it was undelivered, then it switched to delivered,
12:23
and do you then have to keep this history, the min heap array you were talking about, for a long time? Can the data from one day ago, one week ago, or one month ago change? Because then that might mean you need to store a lot of history, probably. Right, right.
12:41
So answering your second question, that is, how much data we need to store: we need to store it only until the roll-up time. So if you are doing a day aggregation, then we just need to store it for the day, and that is handled by the RocksDB state stores that are provided by the Kafka Streams app itself.
13:02
And to give you an answer about where the array is stored: we do store it in the Avro record of the topic, but we do not store it in the actual table, that is, the cube table. And can you repeat your first question?
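As a rough illustration of that retention point, a Kafka Streams windowed state store can be configured so per-key state is only kept for the roll-up window; the store name and durations below are illustrative assumptions, not the actual configuration.

```java
import java.time.Duration;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

// Hedged sketch: a persistent (RocksDB-backed) window store whose retention matches a
// one-day roll-up, so heap state older than the roll-up window is dropped automatically.
public class RollupStateStoreSketch {
    public static WindowBytesStoreSupplier dailyRollupStore() {
        return Stores.persistentWindowStore(
                "minmax-heaps",         // hypothetical store name
                Duration.ofDays(1),     // retention: only until the roll-up time
                Duration.ofDays(1),     // window size matching the day roll-up
                false);                 // do not retain duplicates
    }
}
```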
13:24
Sorry, the first question was: why does the data change? What is happening that causes the data to change? So, for example, let me take a messaging use case, like SMS messages. A message has different states.
13:41
First it is undelivered, then it will maybe be processing, then it will be delivered. So in those cases there are basically updates to the same message whose state is getting changed. And I think updates are really common
14:00
in a lot of scenarios as well, like the same product having updates, or especially state changes on particular products or items of a process. So mainly those use cases. We have another question.
14:24
Hello, can you tell us a little bit about the scale of the ingested messages and how close to real time the availability of those is? Sure, so on the scale, I think we handle about 20,000 records per second.
14:41
We can ingest more than that, and we have tested more than that so far. And the data is available to be queried in around less than two minutes; that's our SLA. But it's usually just a few seconds
15:01
before it is available to be queried in real time. Hello, have you considered implementing this logic
15:20
using Spark or Flink? And what was the trade-off, if so? So we do use Spark in our batch processing; there is a batch pipeline, and that is Spark-based. There the min and max are directly available, so for the batch pipeline we used those directly.
15:45
But we haven't tested Spark in a real-time ingestion scenario. We haven't done that part, yeah. We still have three minutes if there are more questions.
16:15
So what I understand from your explanation is that the buffer,
16:20
the heap, as you called it, is limited in size. What happens if you have more state changes than the size of your heap? Right, right, that is the edge-case scenario. For example, when we are doing the max calculation and all the max values,
16:43
or monotonically increasing values, come in the stream up to the size of the heap, then basically all the smaller elements are evicted from the heap. When that happens, and when updates then happen
17:01
to all these max elements in the heap, you will get an incorrect result. That's the edge-case scenario. Yeah, basically it will just give you whatever the new record's value is as the max value
17:23
in that case, which would not be right. Is that a problem in your use case? Sorry? Is that a problem in your use case? As in, does something break if it gives the wrong answer,
17:41
or is it just a dashboard that shows something incorrect to a business user? Yeah, it would be incorrect, but it also depends on your heap size and on the updates to the max values in your heap,
18:03
in the case of a max calculation. So it's a really rare edge case. Okay, thank you. Yeah, sure. Maybe we have time for one last question.
18:29
Thanks for the talk. A question: imagine you now need not only min and max but some other aggregation function. How easy would it be to extend to that?
18:45
Sum? For example, average, sum, or whatever else. So we already have sum; we already support sum. And sum would be very easy
19:02
because we just do the addition based on what we currently have, right? So that is what we do for sum. The question was, I would say, in general, not only sum: is it easy to extend
19:21
to any aggregation function, or are there any limitations? Oh, for any aggregation function? No, we have done this specifically for min and max. This is, yeah, very custom logic for min and max. We haven't done that; I think other aggregations
19:41
would maybe need similar, but a little bit different, logic. Thanks. So thank you, Meenakshi. We can maybe all thank you together. So have a nice day, everyone.
20:02
And maybe we can approach her. Thank you.