
Get in control of your workflows with Airflow


Formal Metadata

Title
Get in control of your workflows with Airflow
Part Number
49
Number of Parts
169
Author
Christian Trebing
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.

Content Metadata

Abstract
Christian Trebing - Get in control of your workflows with Airflow

Airflow is an open source Python package from Airbnb to control your workflows. This talk will explain the concepts behind Airflow, demonstrating how to define your own workflows in Python code and how to extend the functionality with new task operators and UI blueprints by developing your own plugins. You'll also get to hear about our experiences at Blue Yonder, using this tool in real-world scenarios.

Whenever you work with data, sooner or later you stumble across the definition of your workflows. At what point should you process your customer's data? What subsequent steps are necessary? And what went wrong with your data processing last Saturday night? At Blue Yonder we use Airflow, an open source Python package from Airbnb to solve these problems. It can be extended with new functionality by developing plugins in Python, without the need to fork the repo. With Airflow, we define workflows as directed acyclic graphs and get a shiny UI for free. Airflow comes with some task operators which can be used out of the box to complete certain tasks. For more specific cases, tasks can be developed by the end user. Best of all: even the configuration is done completely in Python!
Transcript: English (auto-generated)
Hi, welcome to this last session in the PyCharm room for today. Our first speaker is Christian Trebing, who is going to talk about getting in control of your workflows with Airflow. So hi, welcome to my talk, getting in control of your workflows with Airflow.
I'm Christian Trebing and I'm working as a software developer at Blue Yonder. We also have a booth here, so if you're interested, just drop by later and ask any questions. Now, imagine the following scenario, which I know personally from my daily life.
You are at a data-driven company. Each night you get data from your customers, and this data wants to be processed; that's how you make money. Processing happens in separate steps: for example, you have to take care that this data is valid, you have to book that data, you apply some machine learning steps, you have to take decisions based on the results of the machine learning, and if errors happen,
then you need to get an overview of what happened, why it happened, and when it happened, especially since most of this runs at night and you need to see the next morning what possibly went wrong. And as you might already have guessed, we have a tight time schedule, so time matters,
and so does processing time. What options do you have to realise such a scenario? The first thing that comes to the mind of most developers is doing it with cron, and we also had many projects where we started with that. It's a great way to start, it works out of the box, but you only have time triggers.
You cannot say that this cron job depends on that cron job and should please start afterwards. You just say: at some time, start. So, for example: at 22:00, book your data; at midnight, do the predict run; and at 2:00, do the decide run.
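To make that concrete, a purely time-triggered setup could look roughly like this crontab sketch (the script paths are placeholders, not our actual setup):

```
# Hypothetical crontab: three independent time triggers, no dependencies.
# 22:00: book the data
0 22 * * * /opt/pipeline/book_data.sh
# midnight: predict run
0 0 * * * /opt/pipeline/predict.sh
# 02:00: decide run
0 2 * * * /opt/pipeline/decide.sh
```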
Besides that, error handling is hard: you always have to search for the correct log files when something goes wrong. Now, as I said, we have a tight time schedule, and you would like to finish earlier. As we see, each of these steps runs for roughly one and a half hours, so why not
compress that? We could do better: we could start the predict run half an hour before midnight and start the decide run at one o'clock. That works most of the time, but sometimes your database is slow, sometimes you have other issues. Then one run takes longer; here the book data run maybe takes ten minutes longer,
the data is not there, the predict run fails, the decision run fails, your complete run fails, and that is very bad when you discover it the next morning, because your customer cannot get the data he wants. So that's an issue with cron. Because of that, when we used a cron mechanism, we always had buffers, and if the schedule was not too tight, that worked fine.
But what about the next step? Our customer sends more data, the processing time gets longer, and we need to find better solutions. Why not write our own tool? It's so simple: we just have to check that the first run has finished and that the next run starts. That cannot be that hard.
And the start is very easy. We did that in multiple projects, and it worked for the first steps, but afterwards you soon see the limits. Maybe you need concurrency, with multiple tasks running at once. You need to know which task failed and why. You might not only want time-based triggers, but also want to trigger things manually afterwards.
You might want a UI or an external endpoint. At that point, you have to take a decision: either you accept the limits, which is fine, or your own workflow implementation gets much more complex than you thought initially.
So you are stuck. We were in that situation too. We wanted to harmonise all the workflow tools we had in our different projects, so we had a look at several open source implementations. There are many interesting tools with many different properties.
For example, we had a look at Spotify's Luigi, but this was more of an HDFS-based tool, which was not in our technology stack, and at several other tools as well. In the end, we decided for Airflow, an open source project initiated by Airbnb, hence the name Airflow. Why did we decide for that?
Well, the tool itself is written in Python, which we know and like, and the thing that is really cool is that the workflows are defined in Python code. They are not sitting in some JSON files or in some database rows; each workflow really is Python code, so you can put it into your version control system,
you get all the versioning, and that's a very good way of managing it. It has most of the features whose absence I just described as limitations: you can have a look at present and past runs, and it has logging features. It's great that it's extensible, so you can write your own extensions in Python code and plug them
in without having to modify the open source code; Airflow detects these plugins, and I will tell you more about that later. It is under active development: at the moment it's an Apache incubator project, people are reacting to pull requests, there's lots of traffic in there, and you can see that it moves forward.
It has a nice UI, which I will show you, you can define your own REST interface, and it's relatively lightweight: you have two processes on a server, and you need a database to store the information. So what does a workflow look like? This is the Python code I talked about. Each workflow is essentially a directed acyclic graph. You instantiate it with some parameters, like when the first run is and what schedule you have (you can give that as time deltas as well), and you define your workflow steps as operators. I'll tell you more about the operators later, but here we have three steps: we book the data, we predict, and we take the decision. The connections between the steps you make via set_upstream: you say that before the predict happens, the book data step needs to have happened, and before the decide happens, the predict needs to have happened.
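A minimal sketch of such a workflow definition might look like this (the task commands are placeholders, and import paths vary slightly between Airflow versions):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'daily_processing',                    # name of the workflow
    start_date=datetime(2016, 7, 17),      # first run
    schedule_interval=timedelta(days=1),   # schedule given as a time delta
)

book_data = BashOperator(task_id='book_data', bash_command='echo book', dag=dag)
predict = BashOperator(task_id='predict', bash_command='echo predict', dag=dag)
decide = BashOperator(task_id='decide', bash_command='echo decide', dag=dag)

# before predict happens, book_data needs to have happened, and
# before decide happens, predict needs to have happened
predict.set_upstream(book_data)
decide.set_upstream(predict)
```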
The graphic doesn't work here, but okay, let's go to the next, more complex thing: maybe you want to have a fan-in, fan-out. We have more data, the predict run takes longer, we want to parallelise that, and maybe we say we do one prediction for the German customers and one prediction for the UK locations. With that, I can say that predict Germany and predict UK both depend on the booking of the data, and the decision depends on both of them. This is very nice to describe, and Airflow will give you that graph directly, roughly as sketched below.
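Continuing the sketch above, the fan-out and fan-in could be expressed roughly like this (the country-specific task names are only illustrative):

```python
predict_germany = BashOperator(task_id='predict_germany',
                               bash_command='echo DE', dag=dag)
predict_uk = BashOperator(task_id='predict_uk',
                          bash_command='echo UK', dag=dag)

# both predictions depend on booking the data ...
predict_germany.set_upstream(book_data)
predict_uk.set_upstream(book_data)
# ... and the decision depends on both predictions
decide.set_upstream(predict_germany)
decide.set_upstream(predict_uk)
```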
By that, you can build arbitrarily complex workflows. You also have the possibility for decisions and for switches, but at least for us, we did not need them up to now; most of our workflows are quite linear, with just a few branches in there. So how does the nice UI look that I already promised you?
You have an overview where you see which workflows you have, what their schedule is, and what the most recent statuses are. It's a little bit small to read, but it shows how many tasks have run correctly, how many tasks are currently running, which are erroneous, and which are currently up for retry. You can also have a look at each DAG run explicitly, so you see the sequence; it is colour coded so that you can see which step was successful, which is currently running, which is erroneous, and so forth.
So this is a run that did not start: it is just scheduled, but starting has not happened up to now. The tree view shows you an overview of all the runs. Each column is a run day, so you see that these three days went correctly, all green, and the last run currently has an issue within the second step; it's yellow, which means it's up for retry. So you get a nice overview of how it behaved in the past and how it behaves currently. What also helps here is the runtime view, in which you can see, for example, performance degradation. Here we have three runs, and these colours are the different tasks. Let's say the blue one is the booking data step, this is the prediction step, and this is the decision step, and you see that one behaves the same while the other two changed over time.
So this is very useful for seeing which of the steps might have taken longer. You can also see each run as a Gantt chart to check when each step happened, and there is a log view, which is really useful, where you can output things. Unfortunately it's a little bit small; here it says that the decision task has started the job in the backend system and the job ID is 17, and in the next iteration it asks what the status is now, and then we see it is finished. With that you can see how each task was processed.
Now, what are the building blocks of your workflows? These are operators, and many operators are already delivered with Airflow. As examples, you can start things on the bash, you can send HTTP requests, you can execute statements on databases, you can run Python code directly, or you can send mails, and these are just a few examples; more are delivered with Airflow. There are not only these operators, but also sensors. Sensors are steps in your workflow that wait for things: an HTTP sensor could, for example, repeatedly query a URL and ask whether something is finished or what its status is, and based on that it will either wait or proceed in the workflow. In the same way, an HDFS sensor could check for files on the file system, and an SQL sensor could check for values in the database.
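As a small illustration, a sensor that polls an HTTP status endpoint until it reports success could be wired in roughly like this (the connection id, endpoint and exact import path are assumptions, not taken from the talk):

```python
from airflow.operators.sensors import HttpSensor

wait_for_backend = HttpSensor(
    task_id='wait_for_backend',
    http_conn_id='backend',        # connection configured in Airflow
    endpoint='status',             # URL path to poll on that connection
    response_check=lambda response: 'finished' in response.text,
    poke_interval=60,              # ask again every minute until it succeeds
    dag=dag,
)
```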
You can already do many things with these operators, but there might be situations when you need more. For example, for us, we had asynchronous processing in our back-end systems: we have our Airflow system and our back-end system, for example the machine learning system for the predictions. We want to start a job, so we trigger an HTTP request there, we get back a job ID, and then we let it run for five minutes,
half an hour or so, and we constantly ask whether it is finished or not; when it is finished, we can start the next job. This would already be possible with standard methods of Airflow: we could use the simple HTTP operator to start it and a sensor, as I described, to wait until it is finished. This works, but it has the disadvantage that you don't see directly how long it took. Remember the runtime view from before: I would like to see how long my decision took, and therefore I want this decide step to have a certain length, namely the length it took on the back-end system. This is possible. We can do it with a new operator. I won't explain each line in detail; you can also find this afterwards as a complete Airflow example plugin in a GitHub repo, where you can check each line.
We have an HTTP connection defined. We have an endpoint, decide, that we can trigger and that delivers us a job ID, and we have a job status endpoint that we can ask, once we have the job ID, what the status is. So within the execute method, we run the POST on decide to get back the job ID.
Then we wait for the job with that job ID, and once the status is finished, we are done. And then, within the Airflow database, we know how long this decision step took. A rough sketch of such an operator is shown below.
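The following is only a sketch of such an operator, assuming hypothetical decide and job_status endpoints; the working code is in the example plugin repository mentioned at the end of the talk, and import paths vary between Airflow versions.

```python
import json
import time

from airflow.models import BaseOperator
from airflow.hooks.http_hook import HttpHook
from airflow.utils.decorators import apply_defaults


class BackendDecideOperator(BaseOperator):
    """Starts the (hypothetical) decide job and waits until it is finished,
    so the whole back-end duration shows up as the runtime of this task."""

    @apply_defaults
    def __init__(self, http_conn_id='backend', poll_interval=30,
                 *args, **kwargs):
        super(BackendDecideOperator, self).__init__(*args, **kwargs)
        self.http_conn_id = http_conn_id
        self.poll_interval = poll_interval

    def execute(self, context):
        # start the job and remember the job ID it returns
        post_hook = HttpHook(method='POST', http_conn_id=self.http_conn_id)
        job_id = json.loads(post_hook.run('decide').text)['job_id']

        # poll the job status until the back end reports it as finished
        get_hook = HttpHook(method='GET', http_conn_id=self.http_conn_id)
        while True:
            status = json.loads(
                get_hook.run('job_status/%s' % job_id).text)['status']
            if status == 'finished':
                return
            time.sleep(self.poll_interval)
```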
Now, how do you get these operators into your system? As I said, we don't want to modify the Airflow code directly. Instead, we can put them into a Python package: we define a plugin that has some of its own operators and some blueprints, lay it down in our file system, and in the Airflow configuration we simply say: your plugins are here and your workflow definitions are there. On startup, Airflow will detect them automatically. The plugin itself is also defined in Python: as you can see, we use the Airflow plugin manager and simply inherit from AirflowPlugin, and here we have our EuroPython plugin.
It registers the three operators we need and also a blueprint. What is that blueprint about? Why do we need it? We had the requirement that we wanted an endpoint to talk to our Airflow system in a REST style, so that we can also programmatically trigger a workflow manually and ask whether a DAG run is finished or not. This functionality was not in Airflow, but you can write it as a Flask blueprint, define the endpoints, and it is detected automatically and added to the web server. You will also see this in the example repo; a sketch is shown below.
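A sketch of what such a plugin with a Flask blueprint could look like (class names and the endpoint are illustrative, not necessarily those of the real example plugin):

```python
from flask import Blueprint, jsonify

from airflow.plugins_manager import AirflowPlugin

# a hypothetical blueprint providing a REST-style trigger endpoint
europython_bp = Blueprint('europython', __name__, url_prefix='/europython')


@europython_bp.route('/trigger/<dag_id>', methods=['POST'])
def trigger(dag_id):
    # a real implementation would create a DagRun for dag_id here and
    # return the run id of the newly created run
    return jsonify(workflow=dag_id, run_id='manual__2016-07-20T12:00:00')


class EuroPythonPlugin(AirflowPlugin):
    name = 'europython_plugin'
    operators = [BackendDecideOperator]   # the operator sketched above
    flask_blueprints = [europython_bp]
```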
What would such a REST endpoint look like? We have the Airflow server running on port 8080, and we have defined this trigger endpoint. We give it the name of the workflow, which is daily_processing, and we get back the name of the workflow and the run ID, which we can use afterwards to ask for the status.
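Assuming the endpoint from the sketch above, such a call could look roughly like this:

```python
import requests

# trigger the daily_processing workflow through the blueprint endpoint
resp = requests.post('http://localhost:8080/europython/trigger/daily_processing')
print(resp.json())  # e.g. {"workflow": "daily_processing", "run_id": "..."}
```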
So this works fine. Now, what happens inside Airflow? It works with two processes, at least two processes, I should say. We have a scheduling process that takes care of when each job should run, and we have a web server that provides the UI and all the additional blueprints.
You also need a database; several databases are supported. At our company we use Postgres and SQLite. SQLite currently has the restriction that you cannot run parallel tasks on it, but we use SQLite more for development and testing, so that is fine. For production you can use Postgres, and there you don't have that limitation.
You can also choose how you want your tasks to be executed. Most of the time we just use HTTP requests: we trigger a task in the backend system and wait until it is finished, so there is no high workload on the Airflow system itself. Because of that, we are happy to run tasks within the scheduler process directly, or, when we want multiple tasks in parallel, we work with subprocesses. But if you trigger your work via bash scripts or similar things and want more power behind the executor nodes themselves, you can use Celery, a framework with multiple worker nodes, and there is already a connection from Airflow to Celery.
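As an illustration, the relevant pieces of the Airflow configuration file could look roughly like this (paths and the connection string are examples, not our actual values):

```ini
[core]
# where Airflow looks for workflow definitions and plugins
dags_folder = /home/airflow/dags
plugins_folder = /home/airflow/plugins

# metadata database: SQLite for development/testing, Postgres for production
sql_alchemy_conn = postgresql+psycopg2://airflow@localhost/airflow

# SequentialExecutor (works with SQLite), LocalExecutor (subprocesses)
# or CeleryExecutor (distributed worker nodes)
executor = LocalExecutor
```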
How do we use it? Most of this I already mentioned along the way: we use the automatic schedules and we have manual triggers. We use one Airflow instance per system we manage. We also had to decide whether to have one central company Airflow instance or one Airflow instance per system,
and for us it was easier to do it that way. For databases, we use Postgres and SQLite, and the executors we use are lightweight. We are also contributing to Airflow, and that works really well. The external triggers that let you trigger runs manually were not there one year ago,
and we definitely needed them before we could use Airflow. So we wrote a pull request that was worked on, and with these two pull requests this is now in Airflow. We also needed some functionality for the plugin detection, so we opened pull requests there as well, and there is active communication with the community.
With all these good things about Airflow, there are at least a few challenges I want to make you aware of, because these are things we struggled with a little, as did the project teams using Airflow on our side. They have to do with how scheduling is handled and how the start time is interpreted. For scheduling, two dates are important. The first is the start date: this means when the processing of a task of this workflow started on the server. That's quite easy, it's the time on the server. But there is also an execution date that is shown quite prominently in the UI,
and that sometimes shows strange values. These values are consistent and explainable, but they are not always obvious. The reason is Airflow's history: it was used in ETL scenarios, that is, extract, transform, load.
That means they wanted to process daily data, which was coming in all day long. So let's say that on the 19th of July data came in the whole day, and then you want to process the data for the 19th of July. When can you process that data? Only after the 19th, which is the 20th.
So let's say today. Today this data processing task runs, and what is the execution date? It's the 19th. So it's always one iteration back. This is because originally they said: well, this is more of a description, this is the data from the 19th, and therefore that's the execution date.
That's fine. When you know that, it does not scare you into thinking the system is doing wild things; it is consistent. But yes, you have to get used to it. We have some workflows running on a weekly schedule, which means that when a run triggers now, it gives me the Monday of the week before as its date. That too is consistent, but you need to get used to it.
Then we have the start date. You might remember from ten minutes ago that we give a start date for each workflow. If the workflow is scheduled automatically and you start the server, it will know that it has to fill up runs.
So say we start the server, the scheduler, today on the 20th of July, and as the start date we have given the 17th of July. It will then detect that some runs are missing and fill them in automatically: it will first trigger the run with execution date of the 17th,
then the run with execution date of the 18th, and then, as a regular run, the 19th will be processed at the correct point in time. You need to check whether this is applicable for you. When your situation is "I really need to process this data", that's fine. When it is more like "I want to trigger something in the backend,
and I need to trigger it just once, because that backend job will take care of cleaning everything up", then this is a little strange and can lead to issues if you trigger it too often. But you can work around that by giving the correct start date right away: you can determine it in code, you can determine it in a variable, there are several options.
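As a sketch of the backfill behaviour just described, using the dates from the example (the exact scheduler behaviour depends on the Airflow version):

```python
from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    'daily_processing',
    start_date=datetime(2016, 7, 17),     # three days in the past
    schedule_interval=timedelta(days=1),
)

# If the scheduler is started on 2016-07-20, it first backfills the missing
# runs with execution dates 2016-07-17 and 2016-07-18, and then processes
# the run with execution date 2016-07-19 at the regular time.
# If the workflow should only trigger a back-end job once, set the start
# date close to "now" (computed in code or kept in a variable) so that no
# backfill runs are created.
```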
I won't discuss these options in detail, but this is something you should keep in mind. If you wonder why this backfill happens: it is possible to handle, but you need to know the concept behind it. If you have further questions, maybe we can discuss them afterwards. Okay, and that's it from my presentation.
Here is the incubator project for Airflow; it has nice documentation, which is here. Also very useful is the common pitfalls page in the Airflow wiki, where the behaviour of the execution date is explained in more detail. And the plugin I have shown you parts of you can find here in our Blue Yonder repo.
You can download that. You have the steps in the readme on how to use that in your Airflow instance. So that's it from my side. Any questions?
Yes, hello. Thanks for the presentation. You showed us the GUI. Is it possible to manage task dependencies
in this GUI, or is it just for displaying things that you wrote in the code? The workflow definitions themselves you do in code. So you can view that code from the GUI, but you have to change it in your code editor. Okay, because in our firm,
we've got a homemade scheduler with, I think, 100,000 tasks inside. Is Airflow scalable? Do you use that number of tasks in your system? No, we don't have that high a task volume in our system.
For us, it's more that we have these nightly runs per system with several tasks each, but not thousands or millions of them. From what I've seen on Airbnb's documentation pages, their stacks seem to be much bigger, so it would be worth asking them
what the limit is, but we did not reach it up to now. Hi, I would like to ask about the execution date and the run date. Is it possible to configure them? We have a similar example where you collect data for the last month and you want to run it, for example, 15 days later, or in the opposite direction, you want to collect data for the next month and run it 50 days before that. Can you configure that, like the delay, or maybe even postpone it, if you see: okay, I will run tomorrow, but if I have no data tomorrow, I will retry the day after tomorrow? Well, first of all, that logic is not configurable; it is in the scheduling code itself. Regarding the stuff running two weeks after
or two weeks from now, I have no quick answer to that; maybe we could discuss it afterwards. I think you can do many things with the scheduling, because the scheduling just determines when a run starts. You also have the possibility to schedule a run each day and, as the first task of the run,
decide whether you really want to run or not. So a first iteration might be that you implement the more complex logic in your first task, but maybe there are also other ways. Okay, thank you. Let's have one more question. Did you evaluate other tools
when you decided on Airflow, and why did you decide for Airflow? Yes. For example, we had a look at Luigi, but that was based on an HDFS stack, which we did not run, so it was too heavyweight to set up just to have a workflow system. We also had a look at several OpenStack implementations,
but their main focus was on doing heavy lifting with execution processes and how these are distributed. Since we had very lightweight processes, but needed more UI features and more possibilities to define our own operators,
that also was just not their main focus. When you see that these tools are great, but their main focus is a different thing, then it's good to have a look at other tools. In theory, Jenkins also does a lot of things like that with some plugins. So if you already have Jenkins, you have to convince your team to use something else.
So what would be one thing that you can do with Airflow that you otherwise cannot do? I mean, Jenkins is great. We also use Jenkins for our integration testing and for scheduling our unit tests, but not for the daily production runs.
Okay, let's give a big hand to Christian.