Building Data Workflows with Luigi and Kubernetes
Formal Metadata

Title: Building Data Workflows with Luigi and Kubernetes
Number of Parts: 118
License: CC Attribution - NonCommercial - ShareAlike 3.0 Unported: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose, as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared, also in adapted form, only under the conditions of this license.
Identifiers: 10.5446/44818 (DOI)
Language: English
EuroPython 2019, talk 53 of 118
Transcript: English(auto-generated)
00:02
Hi, hello, namaste. I'm originally from Nepal, and I'm currently living and working in Germany. Today I'm going to talk about building data workflows with Luigi and Kubernetes. Before we start, a few things about me.
00:28
Hi, my name is Nar Kumar. I'm currently working at Breuninger. Breuninger is a traditional fashion house, mostly popular in South and West Germany. And currently
00:41
Breuninger is expanding in the online e-commerce space quite rapidly. I'm working at Breuninger with a data team, a data lake team, and we use Python and Luigi with Kubernetes running on Google Cloud. I was a web dev in a past life and
01:05
then companies came to me and wanted me to do data, so I moved to a more data engineering role. And you can find me on the
01:25
Can I ask one question? Do you work with data engineering? It's kind of a new term. How many of you are using data engineering? Okay, okay, slowly, yeah, slowly it's growing. Actually quite a lot, more than I expected.
01:45
How many of you are using Luigi? Whoa. How many of you are using Airflow? Okay, pretty decent. Okay, so today I'll just introduce Luigi because Luigi is
02:01
pretty small, it's really lightweight. I think if you just read the README of Luigi, you'll get the idea. Then I'll talk about Kubernetes and how you can run Luigi on Kubernetes, and I think that part is pretty interesting for this crowd. Okay, so just a few things about Luigi. It's a workflow pipeline
02:28
tool. If you are using Airflow, it's pretty similar to Airflow. It was open-sourced by Spotify. It's already pretty mature, actually pretty old, and you can
02:44
write basically your data flows, your pipelines, as normal Python code, so that's always a plus point. It's really lightweight and it comes with a basic web UI to see the jobs and stuff like that.
03:05
It has tons of packages. You can run Hadoop jobs using it as your orchestration tool. You can run BigQuery, you can run AWS stuff if you need to upload some file or run a query. For stuff like this, it
03:22
already has tons of contrib packages, so you can do almost everything with really few lines of code, right? And it doesn't have a scheduler, by the way; I'll come back to this point later in my talk. So just to demonstrate, let's
03:46
make a use case. Let's assume one use case. Let's say you work in a company where they have ice cream franchises. They have a lot of ice cream shops and the manager or your colleagues or data analysts want to see the daily
04:04
sales of yesterday. Every morning they come to work and they want to see what happened yesterday, how much the company sold, right? So we need to do a few things to achieve this. You have a prod database where all the
04:21
transactions happen, right? So you want to dump the prod database to somewhere else, because you don't want to do aggregation on the prod database, otherwise you can kill it. And then you want to ingest it into an analytics database. It could be Redshift, it could be BigQuery, or it could be anything, it
04:42
could be Postgres, whatever. And then you want to run aggregation on this analytics database, update the dashboard, and send it out to everyone, right? So since we are at a Python conference and I know all of you are really good Python developers, you wrote an awesome Python script and scheduled it using
05:07
cron. So: dump sales data, ingest into the analytics database, then aggregate the data, and then cool, maybe profit, right? So it looks pretty great, right? I mean,
05:23
it runs, it works. Maybe not; we have a couple of issues with this implementation. What happens when the first one fails? If you see here, we scheduled them hourly because we think maybe dumping the database takes a bit of
05:42
time and then ingestion takes a bit of time, so we arranged our cron to start them one hour apart, right? Assuming that the first one finishes within one hour, the second one finishes within one hour, and so on. And one hour because everybody is doing big data, right? I mean, we have big data. So yeah,
06:05
we have got a couple of problems here. What happens when the first one fails? What if the first one takes longer than one hour, or if you have to run this same reporting for the last five days or the last month or the last year,
06:23
which can happen? And how do you see whether these jobs all ran successfully? And if you somehow mistakenly run it multiple times, what happens to your dashboard? Is it broken, or what happens, right? So now that we know our use
06:48
case and we saw the Python implementation, I want to show you the Luigi implementation of this. And I have to open my Python. So yeah, this is my Luigi implementation. I put everything in one file. So yeah,
07:08
yeah, I already hear some laughs. I put everything in one file and you can see SQL queries, plain SQL queries. So don't worry about that. What I want to point out is how you can implement some kind of task or some kind of job in
07:22
Luigi. I can increase the font size. Is this good enough? So the first thing I
07:40
wrote is a dump-database task, and then I have a load-to-analytics-DB task, and then I have an aggregate task, right? So the one important thing you see here is that the load-to-analytics-DB task depends on the previous task. So the dump-database
08:01
task. And if you see the aggregate task, it depends on the previous one. And the last one depends on the previous one. So this is how you can chain a series of batch jobs together in Luigi. And running this is pretty easy. So I'm
08:28
currently using Pipenv, so I have to trigger it like this. So I have to go to that directory and then I can just run it. The output has quite a lot
08:41
of stuff in there, but you can see here we ran like four tasks and then we have the report, right? Yeah, so it looks pretty okay. And if you look at the UI, this is how it looks. So first I dump the database and then
09:08
load it into our analytics database, then do the aggregation, and the report is ready. So we kind of solved the problems that I mentioned earlier here.
09:23
I'm sorry, where was it? Here. So we solved some of the problems, like what happens when the first one fails: they are chained together, so the next one will just stay in the waiting state. And we also solved the second one, that if the first
09:40
one takes longer than an hour, the next will just keep waiting, right? I'll come to the other points later. So we saw that you can chain multiple batch scripts or batch jobs together and then run them with a simple
10:03
command-line invocation. And since this is a reporting thing that has to run in production, you have to somehow trigger it. So as I said earlier, Luigi doesn't have a scheduler, so you have to use cron. Usually cron is used, so you need
10:25
some way to trigger the task. So if I run it with Luigi, it would be... where is mine? So yeah, if I had to run it with cron, it would look like this. So I was
10:50
pretty surprised when I saw that Luigi doesn't have its own scheduler, because it's pretty common to have a scheduler in this kind of pipeline tool. But it turns out that it's actually not bad. It's a design
11:07
decision made by the Luigi team. Not having a scheduler means you are really flexible to do whatever you like. You're really flexible to run it from different places, and one of them is from Kubernetes. So that's what I'm going to
11:25
talk about next. So what is Kubernetes, or a CronJob in Kubernetes? You can run a lot of stuff in Kubernetes. You can run normal services like web apps, which run 24/7. You can run Jobs,
11:48
which do a particular thing, and then they are done. And a CronJob is basically Jobs on a schedule, like with cron. So Jobs are also
12:07
called run-to-completion. Kubernetes runs them, and if there is an error or failure, it reschedules them. So it always tries to complete the job. So we saw earlier that it was pretty easy to run locally, but
12:30
what about Kubernetes? So I have a simple Kubernetes setup on my local machine. I installed Minikube, so I have a Minikube cluster running. And if
12:42
you see here, this was my Luigi code. What I did is build a Docker image out of it, which is pretty easy: nine lines of code, and then you have a Docker image. I have uploaded this image to a Docker registry
13:00
and deployed it to Kubernetes, which is Minikube right now on my local machine, and the Kubernetes deployment looks like this. And then running on Kubernetes, you can see the command: run Python and Luigi, stuff like that. So this is the setup I have here. You can see here I have one CronJob, and then I
13:25
also deployed the Luigi daemon on Kubernetes. So you can see here the Luigi deployment, and this is the UI of the Luigi that I deployed. So all this setup I already did beforehand, because
13:43
we don't have much time, but if you are interested in the setup, I have a project on GitHub so you can easily follow it. So let's see, I have one CronJob here, and I want to run it. So this is scheduled from 7 to 16.
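The CronJob manifest itself is not shown in the transcript; a rough equivalent might look like the following. All names, the image reference, and the schedule are illustrative, and this uses the current `batch/v1` CronJob API (a 2019-era cluster would have used `batch/v1beta1`).

```yaml
# Hypothetical CronJob wrapping the containerized Luigi pipeline
apiVersion: batch/v1
kind: CronJob
metadata:
  name: sales-report
spec:
  schedule: "0 7-16 * * *"        # hourly between 07:00 and 16:00, as in the demo
  concurrencyPolicy: Forbid       # don't start a new run while one is active
  jobTemplate:
    spec:
      backoffLimit: 3             # Kubernetes retries failed pods (run-to-completion)
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: luigi
              image: registry.example.com/sales-report:latest
              command: ["python", "-m", "luigi",
                        "--module", "sales_report", "AggregateSales",
                        "--local-scheduler"]
```

A CronJob that is outside its schedule window can still be triggered by hand, as in the demo, with something like `kubectl create job --from=cronjob/sales-report manual-run-1`.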
14:00
So right now it's 16:19, so it will not run, but I can also manually trigger this task using the kubectl command-line tool. So let's see what happens. So I should see a pod which is in the running state, and you see here,
14:23
and then let's see in our Luigi UI. So you see there are a couple of tasks in pending state, some are already done, and there is one running, the aggregate task is in running state, and this is how you can track the progress of the
14:42
jobs. Some of them are done, and if there is an error you can see it here; others are just waiting. So it looks pretty cool. So we are able to run on Kubernetes, and Kubernetes has a CronJob,
15:03
which creates a Job, which creates a pod. So it sounds a bit like a Russian doll, but it's pretty easy once you get into this. So that's what Kubernetes did here: I had deployed the CronJob, and then it created a couple
15:23
of Jobs, actually one Job, and then that created a pod, which is still running. So yeah, we are able to run Luigi the same way I ran it locally, but this has quite a few benefits. The main benefit is the scalability
15:46
of your pipeline. So since it's on Kubernetes, you can scale endlessly. In our case, we use Google Kubernetes Engine; that means at midnight we have hundreds of jobs running, which are automatically scaled up,
16:05
Kubernetes automatically scales up, because the load is really high during that time, and after all the jobs are done, the Kubernetes nodes go down again. We only have two fixed nodes; the rest is dynamic scaling. So this is, I personally find,
16:23
really powerful, but apart from this, you get all the benefits of Kubernetes, from containerization to easier deployment to flexibility with the infrastructure and stuff like this. So that is it. It's a bit rough, but you can follow the example on my GitHub,
16:57
and then you can, if you are interested, just try this.
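For reference, the "nine lines of code" Dockerfile mentioned earlier could plausibly look like this. The file names are placeholders; the real one is in the speaker's GitHub project.

```dockerfile
# Hypothetical Dockerfile for the Luigi pipeline (paths are illustrative)
FROM python:3.7-slim
WORKDIR /app
# install Luigi plus any database client libraries the tasks need
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY sales_report.py .
# default command; the Kubernetes CronJob can reuse or override it
CMD ["python", "-m", "luigi", "--module", "sales_report", "AggregateSales", "--local-scheduler"]
```

Because the whole pipeline is baked into one image, deploying a new version is just building and pushing a new tag, which is the packaging advantage over Airflow discussed later in the Q&A.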
17:02
So I personally find it a very powerful setup: Luigi being lightweight, it's really easy to containerize and deploy on Kubernetes. As a result, you can build complex batch processes which are easy to scale and maintain,
17:30
and you get the benefit of both: the pipeline side from Luigi, and the infrastructure side from Kubernetes, like horizontal scaling and easy deployment, like I said earlier.
17:51
So in short, that was it. I have one hidden slide, which is: my team is hiring.
18:03
If you like Python, if you like Kubernetes, those kinds of things, please feel free to talk to me. We are based in Stuttgart, just two hours from here, and we are really looking
18:21
for developers. With that, any questions? Thanks, Lars and Eddie.
18:45
Thanks for the talk. Great, awesome. Question: if you have users or customers in multiple time zones, do you have any experience with what's the best substitute for crontab to manage tasks in multiple time zones? I personally didn't really have to deal with that, because
19:06
our developers and our users are all in the same time zone. I unfortunately have no good answer for that. So, I have a question over here. I'm here. Are you looking in the wrong direction?
19:26
Yeah. So does Luigi support resuming of jobs? Meaning, as a workflow management system, if you're in the middle of your job and something failed, can you go and fix it and just restart from where you failed? Yeah, Luigi handles that.
19:44
In my setup, the setup with Kubernetes I showed you, Kubernetes actually handles it, so we are not using the Luigi feature; whenever something fails, Kubernetes will run it again. So we are using that part, but Luigi can also do that. Hi, thank you for the talk.
20:05
I was wondering, have you encountered situations where you need different Kubernetes pods to have different rights, and how do you manage that? For example, you want one job to run with production rights, but maybe another job to run with dev rights, not the same rights.
20:23
Yeah, so usually we use service accounts to run jobs. So one job has one service account, the other job has another service account, and the service accounts have limited permissions. So if a job needs storage access, you only give storage access to that service account.
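As a sketch of that answer (all names here are hypothetical), the job's pod template simply points at a dedicated, minimally privileged service account:

```yaml
# Hypothetical: one minimally-privileged service account per job
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sales-report-sa           # grant this account only the storage access it needs
---
apiVersion: batch/v1
kind: Job
metadata:
  name: sales-report-manual
spec:
  template:
    spec:
      serviceAccountName: sales-report-sa   # the pod runs with this account's rights only
      restartPolicy: Never
      containers:
        - name: luigi
          image: registry.example.com/sales-report:latest
```

On GKE, the cloud-side permissions (e.g. which buckets the account may read) would be bound to this Kubernetes service account via IAM.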
20:42
That's how we do it. Thank you. Thank you for your talk. We are currently at the stage where we are evaluating Luigi versus Airflow, and I'm looking for recommendations. You know, what we saw is that it was really pretty hard to write code that expresses
21:12
nested steps and complex hierarchies of tasks. We found a way to make it work with Airflow, and I guess maybe there
21:28
is a way to do it with Luigi, and we just missed it. Do you know how to do that? Yeah, with Luigi it's a bit complicated. The branching model is supported, but it's a bit hard.
21:40
Airflow definitely has a better mechanism in this case. In Luigi, we actually don't do that kind of super complicated branching or nested jobs. Really quick: did you work on a use case where you have an NFS mount in your Kubernetes image? Did you mount a disk to your Docker image?
22:10
Mount the disk? Did you mount a disk? We have a problem, which is accessing a large quantity of data, and we want to move to Docker
22:24
instead of accessing the local disk. One barrier that we have is to mount NFS data to the image and access it. My question is, did you have a similar use case?
22:44
Not really. With Luigi, we usually only process smaller data sets, and Luigi is mostly the orchestration part, so all the heavy lifting we actually do with other tools like BigQuery
23:02
and stuff like that. I was wondering what your reason was for using Luigi versus Airflow. Luigi and Airflow are both Python things,
23:28
but they really have different use cases. Airflow is really big, and it comes with everything you need. However, the one main drawback was that the Kubernetes support is still
23:42
not so good. They recently added Kubernetes support, where each task can run in a Kubernetes pod, but this is not so stable yet. This is one of the main reasons that we prefer Luigi. Other than that, with Airflow, if you have your pipelines, you have to copy them into some
24:05
directory, so you cannot really build Docker images and deploy them like I did with Luigi. So packaging and deploying Luigi is a lot easier and more flexible. And with Kubernetes,
24:24
sorry, with Airflow, you are always in the Airflow way of doing things, so it's a really heavy thing. It gives you a lot, but it's also not that flexible. Hi, question. Thanks for the talk. How do you handle the logs? So when
24:45
integrating with Kubernetes, do you get the logs from the pod in Luigi? So since we use Google Kubernetes Engine, the logs are sent to Stackdriver by default.
25:01
So yeah, if you saw it running, Luigi basically writes its output to stderr, which is sent to Stackdriver. Okay, and then how do you handle the secrets? If secrets are pushed to the logs, do you hide them? We use Kubernetes secrets, so they are
25:23
not in the logs. Okay, thank you. Is it possible to configure Luigi in a way to run jobs, not just based on the crontab, but also, I don't know, some Redis message queues or a file being dropped in a directory
25:40
or something like that? And how does that work? Because you were starting a Docker container for every run, but I think something would have to be there all of the time. Yeah, that's actually a really interesting use case. We are also thinking about that. There is no built-in way to do it, so you have to build something yourself. Like if you
26:03
upload a file to S3 and then you want to trigger Luigi, then you have to build this part yourself. So like I said earlier, running Luigi is just a command line thing, right? So you would
26:22
run this command when the file is uploaded. So this part you have to implement yourself. There is no default way to do it. Thank you for your talk. When you showed your example with the code and the dependencies between the tasks, it looks like the code is
26:44
kind of strongly coupled to the code that is running, so the Luigi task definition part and the stuff that is running. Is this very flexible, or how strong is the vendor lock-in if someone
27:04
switched to something else? Is it complicated to do that? As far as that goes, Airflow also has a similar kind of contribs, and Luigi has really a lot of contribs. I mean, it's open source, so it's not really a vendor lock-in thing. That's
27:22
not really what I meant. It's about how deep the integration is between the managing of the tasks, the definition of the dependencies, and the code itself. Is it coupled a lot, or are the things that you want to do, the scripts that you're running,
27:42
separate things and you just define the tasks separately and can edit, for example, some command line options for what you want to run? I think I got your point. So what I do is we usually build one pipeline for certain stuff, like building one report is one pipeline,
28:05
so this is one image, one Docker image, for us. So we have like hundreds of images to handle all the workloads for the company. Does that answer your question? I was going in a little bit different direction, but maybe we take this offline.
28:23
Yeah, we can also talk later, yeah. Okay. We could probably have one more short and quick question. Yes, please. And probably the last one. Yes. [Question inaudible.] Actually, testing Luigi is really, really easy because it's really lightweight,
28:45
so we use pytest to test it. We have some extensions built to support Luigi, but generally it's really, really easy. It's like any other Python code. So thanks for joining me. I really liked it. There were a lot of questions. I was a bit afraid
29:05
when I submitted the talk, thinking that there might not be enough interest, but I'm really excited that there were a lot of questions and if you use Python in data engineering, I really want to talk to you. Please come to me. Thank you.