Developing elegant workflows in Python code with Apache Airflow

Video in TIB AV-Portal: Developing elegant workflows in Python code with Apache Airflow

Formal Metadata

Developing elegant workflows in Python code with Apache Airflow

License: CC Attribution - NonCommercial - ShareAlike 3.0 Unported
You are free to use, adapt, copy, distribute and transmit the work or content, in adapted or unchanged form, for any legal and non-commercial purpose, as long as the work is attributed to the author in the manner specified by the author or licensor, and the work or content, including in adapted form, is shared only under the conditions of this license.

Content Metadata

Abstract

Developing elegant workflows in Python code with Apache Airflow [EuroPython 2017 - Talk - 2017-07-13 - Anfiteatro 1] [Rimini, Italy] Every time a new batch of data comes in, you start a set of tasks. Some tasks can run in parallel, some must run in a sequence, perhaps on a number of different machines. That's a workflow. Did you ever draw a block diagram of your workflow? Imagine you could bring that diagram to life and actually run it as it looks on the whiteboard. With Airflow you can just about do that. Apache Airflow is an open-source Python tool for orchestrating data processing pipelines. In each workflow, tasks are arranged into a directed acyclic graph (DAG). The shape of this graph decides the overall logic of the workflow. A DAG can have many branches, and you can decide which of them to follow and which to skip at execution time. This makes for a resilient design, because each task can be retried multiple times if an error occurs. Airflow can even be stopped entirely, and running workflows will resume by restarting the last unfinished task. Logs for each task are stored separately and are easily accessible through a friendly web UI. In my talk I will go over basic Airflow concepts and, through examples, demonstrate how easy it is to define your own workflows in Python code. We'll also go over ways to extend Airflow by adding custom task operators, sensors and plugins.
Thank you for coming to my talk. It's going to be about Airflow and workflows, but first just a few words about me. I work with Python, but also with JavaScript and Linux, and I write a blog which some of you may know. I'm currently at Intel, and I also work as a consultant.

So, let's talk about workflows. How many of you know what I mean when I say "workflow"? Raise your hands. Quite a few people. It's a very vague term, but also a popular word, so it can be quite confusing when I just say I'm going to talk about workflows; let me define the term a little to narrow down what I mean. By workflows I mean that we have a sequence of tasks that are started on some schedule, or maybe triggered by an event happening somewhere, and these tasks carry out some work for us. This pattern is frequently used in data processing pipelines and other jobs in the big data field. A typical workflow could look something like this: some data comes in from a data source and I download it for processing; then I send it off for processing to another system somewhere else; then I have to monitor whether the processing completed successfully; and once it's done, I get the results back, generate a report, and send that report out by e-mail to some people. That's a very typical workflow example, but this methodology is so generic that there are examples almost everywhere: many types of ETL workflows can be defined this way; data warehousing is another place where you would use workflows; you could use a workflow when you're doing A/B testing, to handle some of the automatic steps for you; anomaly detection is another area where workflows are used, as is training recommender systems, like the ones presented in the previous talk, which are probably using some workflow system to get the job done; and orchestrating automated testing, which is actually what we are
using Airflow for at Intel, so that's why I marked it there. And another example, from the bioinformatics field: you could be processing a genome every time a new gene file is published somewhere. All of these jobs could be handled by a workflow. Because of this, a whole slew of new workflow managers has started to come up in recent years. The five listed on the screen are just a small subset of all the ones that are available, some of the more well-known ones, but I will be speaking today about Airflow. Apache Airflow is an open-source project written entirely in Python, using some well-known Python open-source technologies itself: it's based on Flask and uses Celery. It was originally developed at Airbnb, but it has grown very quickly and extensively over the last couple of years, so it currently has almost 300 contributors, 4,000 commits, and many, many stars on GitHub. It's used by hundreds of companies: we're using it at Intel, Airbnb uses it, Yahoo and many others are using it too. Airflow provides you with three things: it provides a framework for writing your own workflows; it provides a scheduler and quite a scalable executor for running the workflow tasks; and it also provides a web UI for monitoring the workflows as they are running, and for viewing logs. In this talk I will focus primarily on the first point, the framework which you can use in Airflow to define your own workflows and tasks. I will not be speaking very much about the executor or the scheduler, but there was already a good talk on Monday which covered Airflow as well, so if you missed that one, you can find it on YouTube later. Right, so before I begin showing you code examples and other things, I will take a minute to show you Airflow
itself and give you a demo. When you set up and run Airflow on your computer, you get this web interface. It lists all the workflows you have defined in a table, and if you click on this little "play" symbol right there, you can start a workflow manually, just like that. Then you can go in and take a look at your workflow. This one is called Hello World, and you can see that it's currently being executed: this task is running, this task already managed to complete, and this task is going to be scheduled in a second. You can also see the whole history of your workflow's runs, and for each of them, each task gets an entry in this table. You can click on an entry to view the logs of a particular task; if any errors occurred, the logs would be here. And you can switch to the graph view to see another view of the same workflow, from which you can also view the logs. My Hello World example returns "Hello world", so it works. So that's the UI, and it's actually very easy to get to this point; installing Airflow and setting it up is quite easy. Now I
will be talking more about the code needed to write workflows in a second, but before I do that, I want to talk about what actually flows in a workflow. Why is a workflow actually called a workflow? Every task that we have in our workflow makes decisions, and those decisions are based on the input to the workflow, the parameters of the workflow run that was started, and also on the output of the upstream tasks. So all information flows from upstream to downstream, like a river, and I want you to think of a river for a minute. A workflow begins somewhere, so it has a source; it may have many tributaries which join together to form the river; and it also ends up somewhere, like a river flowing down into the sea, or it can fan out into many final branches, like a river delta. What a workflow also does is branch, and OK, this is where the analogy breaks down a little bit, because rivers don't usually do that, but workflows do: they can have very many branches which split off from the main line of the logic of the workflow and then join back together to form the final workflow result. So it's not really a river, it's a graph: a directed acyclic graph, where the information always flows from upstream to downstream. And you can actually use that very creatively when you're designing your workflow, because if you put some information into your workflow at any point, it's like putting a message in a bottle into the river: at some point it will flow down and pass every point downstream. You would have to put many bottles into the river if you wanted the message to reach every point, but the point is that you can put information in upstream and it will flow down. If I put some information in at point B, it will be available to all points in the graph downstream of B. And finally, at the end point where all the branches combine, I get all the information, and I can generate my report or do whatever I need to do with it. So when you're writing your workflows with this in mind, you can make them quite modular, and make them use information from sources upstream in the tasks that run after them.
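As a toy illustration of this idea (plain Python, not Airflow code; the graph shape and node names are made up), information placed at a node is visible to exactly the nodes downstream of it:

```python
from collections import deque

def downstream_nodes(dag, start):
    """Return all nodes reachable downstream of `start`, where `dag`
    maps each node to the list of its direct downstream nodes."""
    seen, queue = set(), deque(dag.get(start, []))
    while queue:
        node = queue.popleft()
        if node not in seen:
            seen.add(node)
            queue.extend(dag.get(node, []))
    return seen

# A small diamond-shaped workflow: source -> (branch_a, branch_b) -> report
dag = {
    "source": ["branch_a", "branch_b"],
    "branch_a": ["report"],
    "branch_b": ["report"],
    "report": [],
}

# A "bottle" dropped at branch_a reaches only the points below it...
print(downstream_nodes(dag, "branch_a"))  # {'report'}
# ...while one dropped at the source floats past every task in the graph.
print(downstream_nodes(dag, "source"))    # {'branch_a', 'branch_b', 'report'}
```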
But that's enough about rivers and the magic of graphs; let's get to Airflow and how a workflow in Airflow actually works. Airflow uses the concept of the directed acyclic graph (DAG) for the whole workflow definition, and that allows you to define the logic of your workflow as the shape of the graph. This is very easily done. The Hello World workflow that I was showing at the beginning is a complete code example, apart from some missing import statements, so let me walk you through its couple of lines; it's very simple. First of all, there's some Python function that I want the workflow to execute; this one just returns "Hello world". Then I define the DAG, just by specifying a couple of parameters, and, using the DAG as a context manager, I define a couple of tasks by instantiating operators: the first one is called DummyOperator, the second one PythonOperator. These create tasks, and in order to combine those tasks into a graph, I can use the bit-shift operator `>>`, which was overridden to allow joining tasks together. This method of defining graphs is very quick and easy, and once you get used to it, it allows you to create graphs as complex as you need. Moreover, since this is all defined in Python code, you can use any looping logic you want to define more and more complex graphs.
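Filled out with its import statements, the Hello World workflow might look like the sketch below. The import paths follow the Airflow 1.x layout that was current at the time of this talk (they have since moved in Airflow 2), and the schedule and dates are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    # The Python function the workflow executes.
    return "Hello world!"


with DAG("hello_world",
         description="Simple tutorial DAG",
         schedule_interval="0 12 * * *",
         start_date=datetime(2017, 3, 20),
         catchup=False) as dag:

    dummy_task = DummyOperator(task_id="dummy_task", retries=3)
    hello_task = PythonOperator(task_id="hello_task",
                                python_callable=print_hello)

    # >> is overridden to chain tasks: dummy_task runs before hello_task.
    dummy_task >> hello_task
```

Saving this file into Airflow's DAGs folder is enough for the workflow to appear in the web UI.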
The next Airflow concept I want to talk about is the operator. This is the way you define the actions of a single task. An operator is essentially a Python class with an execute method, and that's all you have to create to get a very robust entry in your graph and in your workflow, because a failed task will automatically be retried, repeated until it succeeds. Each of these functions should therefore be idempotent, so that running it multiple times won't have unintended consequences. An example is just a simple class; in fact, I made mine slightly more complex than it needs to be, because I added one parameter to show you that you can also parametrize your tasks in the definition of the DAG, when you're putting them into the final graph, by passing parameters through the operator's init function. Another concept Airflow uses is called the sensor. Sensors are long-running tasks, and they are very useful for monitoring purposes: if you have some data processing job running somewhere, you may want to check on it periodically to see if it finished, and Airflow gives you the ability to do this very simply. You define a sensor class with a poke method, and the poke method will be called repeatedly until it returns True. A very simple, slightly silly example: the sensor just checks whether the minute of the current time is divisible by 3. If it's not, it returns False, which means that the method will be called again after a certain interval (I think it's one minute by default), again and again until it returns True. Finally, when we reach a point where the current minute is divisible by three, the poke method returns True and the sensor will exit, letting the workflow continue.
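A minimal sketch of both ideas, assuming Airflow 1.x-era import paths; the class names and the multiply-by-5 behaviour are illustrative, not taken verbatim from the slides:

```python
from datetime import datetime

from airflow.models import BaseOperator
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class MultiplyBy5Operator(BaseOperator):
    """A task is just a class with an execute() method; the constructor
    parameter shows how a task can be parametrized in the DAG file."""

    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        self.operator_param = my_operator_param
        super(MultiplyBy5Operator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # Should be idempotent: Airflow may retry this on failure.
        return 5 * self.operator_param


class MinuteDivisibleSensor(BaseSensorOperator):
    """poke() is called repeatedly (every poke_interval, 60 s by default)
    until it returns True, at which point the sensor task succeeds."""

    def poke(self, context):
        # The slightly silly condition from the talk: wait until the
        # current minute is divisible by 3.
        return datetime.now().minute % 3 == 0
```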
Another very important concept in Airflow is the XCom, or "cross-communication". It's a means of communicating between tasks; it's actually just a simple way to save things in the database and retrieve them later. Because the messages you pass are saved in the database as pickled objects, XCom is best suited for small pieces of data, like object IDs, rather than whole objects, but it works very well when you use it this way. It's very easy to use: in your operator's execute function you have a parameter called context, and if you retrieve the task instance (the running TaskInstance) from that execution context, you can call its xcom_push function to pass some information into XCom, and then, in another task downstream of that one, you can call xcom_pull to retrieve this information and use it. You can also do a trick to scan all upstream tasks, with a code example of about three lines: in the middle I get all the upstream tasks from the graph, then call xcom_pull with the IDs of all the upstream tasks, asking each of them for the same specific piece of information, and I get back an array of, in this example, all the database IDs they pushed. What you can also do when defining your workflows with Airflow is create reusable operators, and this is what makes Airflow workflows very modular. If you use loosely coupled functions as your operator functions, meaning that only the few truly necessary parameters are passed by XCom and most other parameters are optional with sane defaults, then you can put an operator like that, a task like that, into very many different types of workflows. In this example I have an operator called Nexus, which collects information from a lot of upstream tasks and combines it somehow, but it can also be used in a different place in a different graph, where it doesn't have all the same information coming from upstream; it knows how to behave well in that context as well, and plays a slightly different role in another workflow. This proved to be a very powerful technique for us when we were doing our test orchestration using Airflow, because we were defining blocks of code which fit in many different places, and we were able to combine them into very many different workflows.
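A sketch of what this can look like inside the task callables (the task ids, the key name, and the value 42 are illustrative; `ti` is the TaskInstance that Airflow puts into the execution context when you enable context passing):

```python
def push_value(**context):
    # The running TaskInstance is available in the execution context.
    task_instance = context["ti"]
    # Best suited to small values (ids, not whole objects): XComs are
    # pickled into Airflow's metadata database.
    task_instance.xcom_push(key="db_record_id", value=42)


def pull_value(**context):
    # A downstream task retrieves the value pushed by "push_task".
    task_instance = context["ti"]
    record_id = task_instance.xcom_pull(key="db_record_id",
                                        task_ids="push_task")
    print("got record id:", record_id)


def pull_from_all_upstream(task, **context):
    # The trick from the talk: ask every direct upstream task for the
    # same key and collect the results into a list.
    upstream_ids = [t.task_id for t in task.upstream_list]
    values = context["ti"].xcom_pull(key="db_record_id",
                                     task_ids=upstream_ids)
    return list(values)
```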
We did this by reusing the same components in different contexts, and if you pay attention to these details, you will be able to do the same thing. So let's look back at the typical workflow we started with: the tasks that are part of the workflow are defined in Airflow through operators; the one that's used for monitoring the processing is a long-running sensor; and all information that passes from upstream to downstream tasks goes through the XCom functionality. There are some more interesting things you can do as well. For example, if you want to follow a certain branch of the graph and skip others, you can use an operator called the BranchPythonOperator. A very simple example: you have a graph with three tasks, one upstream and two downstream, and the upstream, branching task decides which task to follow simply by returning the ID of the downstream task that needs to be executed; all the others will be skipped. Another way to skip tasks, and thereby maybe skip entire branches of your workflow that you don't want to execute, is by using a special kind of Airflow exception called AirflowSkipException. This exception will force the particular task to be skipped, whereas all other types of exceptions, if they're not caught, will cause the task to be retried, and if the retries don't ultimately work out, to fail. So the skip exception is like putting up a barrier, stopping the flow of the workflow downstream. But you can actually control whether you're really stopping the flow or not by deciding which trigger rule to set on your particular task. By default, all tasks require all upstream tasks to be successful, but you can change that to a different one of the options listed up there, and the one I find particularly useful is "all_done", which means that once the upstream tasks have either succeeded or failed, your downstream task will execute. If you write your operators in such a way that they know how to behave even when an upstream task failed or was skipped, then you can continue execution through your workflow going downstream; "all_done" is like opening the dam from the downstream task's side. You can do a lot of other very useful things with Airflow that I'm not going to get into the details of. For example, you can run bash commands as your tasks: to execute bash commands on a worker, you can use the BashOperator, which allows you to pass in a bash script that is wrapped in a Jinja template, so you're actually running a bash script generated by a Jinja template. An example is worth more than a lot of words: you pass a template into the BashOperator, it is first rendered, and then the resulting bash command is executed on the worker. Airflow also allows you to write plugins to extend it, and writing plugins is very simple: you just create a subclass of AirflowPlugin and put it in the target directory, and in it you can define a whole list of things that Airflow then makes available to your instance. Operators we already talked about; you can also define menu links for the web interface that I was showing at the beginning; and you can create whole admin views, because the UI is based on Flask-Admin, so you can add additional views to the administrative interface, or even entire Flask blueprints that you can plug in. It's actually very extensible, so I'm sure it will be useful for many cases that you have.
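A sketch pulling together branching, trigger rules, and a templated bash command, again assuming Airflow 1.x-era import paths; the task ids and branch condition are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator


def choose_branch():
    # The branching task picks which downstream task runs by returning
    # its task_id; the other direct downstream tasks are skipped.
    if datetime.now().minute % 2 == 0:
        return "even_minute_task"
    return "odd_minute_task"


with DAG("branching_example",
         schedule_interval="@daily",
         start_date=datetime(2017, 3, 20),
         catchup=False) as dag:

    branch = BranchPythonOperator(task_id="branch",
                                  python_callable=choose_branch)
    even_minute_task = DummyOperator(task_id="even_minute_task")
    odd_minute_task = DummyOperator(task_id="odd_minute_task")

    # trigger_rule="all_done" lets the join task run even though one
    # upstream branch was skipped (the default is "all_success").
    join = DummyOperator(task_id="join", trigger_rule="all_done")

    # Bash commands are rendered through Jinja before execution;
    # {{ ds }} is the built-in execution-date macro.
    report = BashOperator(task_id="report",
                          bash_command='echo "run date: {{ ds }}"')

    branch >> even_minute_task >> join
    branch >> odd_minute_task >> join
    join >> report
```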
Right, that's it from me. There is a tutorial available on my blog, so if you want to get started quickly with trying Airflow, you can find it there. Thank you. If you have any questions, please raise your hand.

I wanted to ask: does Airflow integrate with distributed systems? If you want to distribute your processing, for example you run some methods and you want them distributed over multiple servers, and you trigger an execution of a process with Airflow, how can you decide that it runs not on your local machine but on a different server somewhere else? The underlying technology we're using for distributing work among different workers is Celery, and Celery gives you a lot of control over where things get executed, so that would be one way. Other than that, I guess it would be more manual and less automatic, but we didn't actually have to make these decisions, because all our workers are capable of running the same set of tasks. So I'm not going to give you a definitive answer; Celery is what's underneath, so I would go that way.

Hi, I have two questions actually. The first one is whether it's resilient, and do you have any use cases for sensors? Yes, we do, because, as I mentioned, we're using it for orchestrating automated testing, and these tests can run for a long time. We're actually checking on the test executors, which are not the same as our Airflow workers; the Airflow workers just trigger the tests, and we check on the executors to see whether they finished running all the tests. When they're done, we can make some decisions about what to do next, and sensors work quite well in that context.

Great talk, thanks for that, but I have two questions: can you run Airflow on more than one node, and have you seen Airflow being used for tasks that require user input? The first question actually has a good answer, because we are running Airflow as a triplicated service: we have three web interface hosts, three worker hosts, and three schedulers, and without our having to do very much more, Airflow was able to behave very well in this setup. It was running in parallel on three different servers; all three services were running together and were able to exchange information, so when I click to view logs in the web interface, it's able to pull the logs from the correct worker; it knows where the logs are. So it's able to work on multiple nodes like that very well. In terms of manual input, to do that we had to create an API with a user interface on top of it; through that API we're actually calling Airflow's methods for starting workflows with additional input from users. That's not something that comes out of the box, but we were able to extend Airflow by adding some API methods.

Thank you for the talk, it was very interesting. One question that would be very useful for a project of mine: can you actually group operators? When you say you want to group them, do you mean group them logically, like in a class hierarchy, or group them into smaller workflows? For example, if I always use the same five operators in the same configuration, can I put them into one overarching operator somehow? There is an operator type that I haven't experimented with, and I'm not sure how well it works, but it's there for exactly this purpose: it's called the SubDagOperator. You create something like an inner DAG with a bunch of operators, a graph of the five operators you were talking about, and then use that as an operator itself, so you kind of put the whole graph into another graph. But I haven't used it, so I'm not sure how well it works. OK, thank you so much. Thank you, Michał. Please give another round of applause.