
Distributed Workflows with Flowy


Formal Metadata

Title: Distributed Workflows with Flowy
Part Number: 21
Number of Parts: 173
Author: Banesiu, Sever
License: CC Attribution - NonCommercial - ShareAlike 3.0 Unported: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.
Production Place: Bilbao, Euskadi, Spain

Content Metadata

Abstract: Sever Banesiu - Distributed Workflows with Flowy. This presentation introduces Flowy, a library for building and running distributed, asynchronous workflows on top of different backends (such as Amazon's SWF). Flowy does away with the spaghetti code that often crops up from orchestrating complex workflows. It is ideal for applications that do multi-phased batch processing, media encoding, long-running tasks, and/or background processing. We'll start by discussing Flowy's unique execution model and see how different execution topologies can be implemented on top of it. During the talk we'll run and visualize workflows using a local backend. We'll then take a look at what it takes to scale beyond a single machine by using an external service like SWF.
Transcript: English (auto-generated)
Hello, everyone. Thank you for joining me. My name is Sever, and this presentation is about the library that I'm working on, which makes it easy to model and run distributed workflows. There will be a Q&A section at the end, hopefully, if there is enough time.
You can also stop me during the talk and ask me questions if anything is unclear. Okay, so let's get started. We'll start by discussing a bit what the workflow is, and then I will show you a quick demo,
and spend the next part of the presentation trying to explain what happened during the demo. So, the term workflow is used in many different contexts, but for our purpose, a distributed workflow is some kind of complex process,
which is composed of a mix of independent and interdependent units of work that are called tasks. Usually, workflows are modeled either with DAGs, which stands for Directed Acyclic Graphs, describing the dependencies between the tasks and written in some domain-specific language, or with ad hoc code, like when you have a job queue,
but what you really try to accomplish is to have an entire workflow, and you use the job queue and the tasks in the job queue to do some work, but also to schedule the next steps that should happen during the workflow. Neither of those provides a good solution, and the reason for that is that DAGs are too rigid.
You cannot have dynamic stuff happening there, usually. And the ad hoc approach, where you have the job queues, tends to create code that is hard to maintain,
because the entire workflow logic is spread across all the tasks that are part of the workflow. Another problem with the ad hoc approach is that usually it's very hard to synchronize tasks between them,
so if you want to have a task started only after other tasks are finished, that's usually pretty hard to do. Flowy takes a different approach to the workflow modeling problem, and it uses single-threaded Python code,
and something that I call gradual concurrency inference. Here's a toy example of a video processing workflow. At the top we have some input data, in our case two URLs, for a video and a subtitle, and then there is an entire workflow that will process this data. What it will do is try to overlay the subtitle on the video and encode the video in some target formats. It will also try to find some chapters, some cut points in the video,
and extract thumbnails from there, and will try to analyze the subtitle and target some ads for this video. So the interesting thing here, and something that you cannot easily do with DAGs,
is the part where the thumbnails are extracted. This is a dynamic step, and the number of thumbnail extraction tasks can differ based on the video, so this is where you need some flexibility.
So next I would like to show you how this workflow is implemented in Flowy, and then, like I said earlier, I'll try to explain what really happened there.
So let's see. So I start with the activities, or rather the tasks,
and in this case I'm using some dummy tasks. You can see all of them have some sleep timer in there, just to simulate they are doing something,
and they are regular Python functions, there is nothing special about them. They just get some input data, do some processing, and output the result. So this is similar with what you will get in Celery or a regular job queue.
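For reference, a minimal sketch of what such dummy tasks might look like (names and timings are illustrative, not the exact demo code):

    import time

    def embed_subtitle(video_url, subtitle_url):
        time.sleep(1)  # simulate overlaying the subtitle on the video
        return 'embedded.mp4'

    def encode_video(video, target_format):
        time.sleep(1)  # simulate transcoding to the target format
        return 'encoded.' + target_format

    def find_chapters(video):
        time.sleep(1)  # simulate detecting cut points
        return [10, 95, 230]  # chapter offsets, in seconds

    def extract_thumbnail(video, offset):
        time.sleep(1)  # simulate grabbing a frame
        return 'thumb-%d.jpg' % offset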
This is the workflow code, so it's the code that would implement the workflow that we saw earlier. Again, it's regular Python code, we are just calling the tasks,
but there is something funny about it: it is a closure, and we are not importing the task functions themselves. This is a kind of dependency injection, and we'll see later why it is useful.
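A sketch of that closure pattern, using the dummy tasks from above (again illustrative; the real demo code may differ):

    def make_workflow(embed_subtitle, encode_video, find_chapters,
                      extract_thumbnail):
        # The tasks are injected instead of imported, so a runner can later
        # substitute proxies for the plain functions.
        def workflow(video_url, subtitle_url):
            video = embed_subtitle(video_url, subtitle_url)
            encodings = [encode_video(video, fmt) for fmt in ('mp4', 'webm')]
            chapters = find_chapters(video)
            thumbnails = [extract_thumbnail(video, c) for c in chapters]
            return encodings, thumbnails
        return workflow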
Other than that, there are just function calls and regular Python code. Actually, I'm going to demonstrate that this is not anything special by running this code.
So what I do here, I import all the tasks and the workflow function. I'm going to pass the tasks to the workflow closure, and then call the closure with the input data,
and this will run the workflow code sequentially. I'm also going to time this execution.
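Putting the two sketches together, the sequential run looks roughly like this (the tasks module name is hypothetical):

    import time
    from tasks import (embed_subtitle, encode_video,      # 'tasks' module
                       find_chapters, extract_thumbnail)  # is hypothetical

    workflow = make_workflow(embed_subtitle, encode_video,
                             find_chapters, extract_thumbnail)
    start = time.time()
    print(workflow('http://example.com/v.mp4', 'http://example.com/v.srt'))
    print('took %.1f seconds' % (time.time() - start))  # sum of all the sleeps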
So it will take a while because of the timers that I have there, forcing the task to sleep, and hopefully, yeah, that's what happens.
Sorry about that, I'll try again.
I don't know what's going on, but whatever.
Yeah, something is wrong. Usually it should work; it's just regular Python code, so there is no reason for it not to work. But the interesting part here: running that code would take about 10 seconds
because of all the timers, and everything will happen in sequence. So the interesting part is being able to run this as a workflow and have all that concurrency happening. So I'll try to do that. Okay, so it went much faster, about two seconds,
and the reason for that is because all the tasks that could be executed in parallel were executed at the same time, as we can see in the diagram that was generated. So the arrows there represent a dependency between the tasks,
and we can see a lot of them were being executed at the same time. So I'm gonna try to explain how that works and why it went so fast versus the previous version which didn't work.
So in order to understand what was happening during the demo, I have to talk about workflow engines first. And we begin with a simple task queue
where we have all the tasks that we want to be executed. The workers are pulling the tasks from the queue and running them, and as I said, when you have an approach similar to this, there must be some additional code in the task
that will know to schedule other tasks when they are finished. So they also generate other tasks besides the usual data processing that they are doing. And this is not very good because the workflow logic will get spread,
and like I said, it's also very hard to synchronize between different tasks. So another idea would be to have the task generate a special type of task called a decision, and what the decision does,
instead of doing some data processing, it will only schedule other tasks in the queue. So it acts as a kind of orchestrator. Like we can see here, the flow from the storage to the worker is reversed because the orchestrator, the decision,
will read data from the data store in order to try to get a snapshot of the workflow history and the workflow state, and based on that state and all the tasks that were finished, it will try to come up with other tasks that must be executed next.
But this solution is also not very good because you could have concurrency problems. So if two tasks finish right after the other,
you can get two decisions scheduled, and if those are executed in parallel by two workers, they will generate duplicate tasks in the queue. So this is not a perfect solution. So in order to improve this even more, we need to have the queues managed in a way
that all the decisions for a particular workflow execution will happen in sequence. And for this, we introduce another layer that will ensure this.
Another thing we would also want to add is a kind of time-tracking system that knows how much time a worker has spent running a task, and can declare the task as timed out if a certain amount of time passes without the worker making any progress. So this is not something new. This kind of workflow engine is implemented and provided by the Amazon SWF service.
It's also available as an open-source alternative in the Eucalyptus project, with the same API that Amazon has. There is also a Redis-based engine similar to this in the works that I know of, and there's also the local backend that you saw earlier in the demo. The local backend will create this whole engine and the workers on a single machine and will run them only for the duration of the workflow, and then everything gets destroyed.
So, this was the workflow code in the demo, and hopefully by this time you kind of get an understanding that this code will run multiple times. Every time a decision needs to be made for this workflow, to make progress on it, this code will be executed again. So if I were to put a print statement there and run the workflow, I would see a lot of print messages.
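To make the replay model concrete, a tiny illustration (not the demo code):

    def workflow(video_url, subtitle_url):
        # The body is re-executed on every decision, so this line prints
        # once per decision, not once per workflow execution.
        print('making a decision...')
        # ... rest of the workflow ...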
Okay, so I mentioned earlier about the dependency injection and why that's needed, and the reason for it is because Flowy will inject some proxies
instead of the real task functions, and the proxies are callables and will act just as a task would, but they are a bit special. So when a proxy is called,
the call itself is non-blocking, so it will return very fast, and the return value of the proxy is a task result. And the task result can have three different types. It can be a placeholder in the case that we don't have a value for that task,
or maybe the task is currently running and we don't have a result for it. It can be a success if the task was completed successfully and we do have a value for it,
or it can be an error if for some reason the task failed. The other thing a proxy call does: it looks at the arguments and tries to find other task results among them. If any of the arguments is a placeholder,
then this means that the current activity or task cannot be scheduled yet because it has dependencies that are not yet satisfied. So it will track the results of the previous proxy calls
through the entire workflow, like we can see here. So in this case, when the code is run for the first time in the workflow, the embed-subtitle task will be scheduled and its result will be a placeholder, because we don't have a value for it. But the calls for the video encoding won't schedule any activities, because they will have placeholders as part of their arguments, meaning that there are unsatisfied dependencies.
And in this case, the results for the proxy calls for the encode video task will also be placeholders. So what this does, it's actually building the DAG dynamically at runtime
by tracing all the results from the proxy calls through the arguments of other proxy calls. And finally, the workflow finishes its execution when the return value contains no placeholders,
meaning that all the activities, or all the tasks, that were needed to compose the final result are finished. And like you can see here, this is true even for data structures. We have here a tuple, and the values are inside the tuple, and this will continue to work; the values there are in a list, and those will also get picked up. So you can use any kind of data structure for the return data as long as it can be JSON serialized; that's what is used for serialization.
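A toy model of the proxy behavior just described (keyed by task name for simplicity; this is an illustration of the semantics, not Flowy's actual implementation):

    class Placeholder(object):
        """Stands in for a task result that is not available yet."""

    class TaskProxy(object):
        def __init__(self, name, scheduler, finished_results):
            self.name = name
            self.scheduler = scheduler                # puts tasks in the queue
            self.finished_results = finished_results  # values from the history

        def __call__(self, *args):
            # A proxy call never blocks; it inspects its arguments instead.
            if any(isinstance(a, Placeholder) for a in args):
                return Placeholder()  # unsatisfied dependency: don't schedule
            if self.name in self.finished_results:
                return self.finished_results[self.name]  # success, real value
            self.scheduler.schedule(self.name, args)  # schedule the task now
            return Placeholder()  # running, no value yet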
So there are a couple of important things to keep in mind when writing a workflow. Basically what you want is for all the decision executions to have the same execution path in your code for the same workflow instance,
so for all the decisions that belong to the same workflow instance. This usually means that you have to use pure functions in your workflow or if you want some kind of side effects, either send those values through the input data to the workflow or have dedicated activities for them
or dedicated tasks for them. So the other thing you can do with the task result is to use it as a Python value. Like we see here, I'm squaring two numbers and then I'm adding them together
and when this happens, if any of the values involved is a placeholder, meaning that there is no result for it yet, a special exception is raised that will interrupt the execution of this function.
So in effect, this acts as a barrier in your workflow and it won't get passed until you have the values for the results that are involved. This also means that if you have code
after this point that could be concurrent, it won't be detected, so you have to make sure that you access the values as late as possible to get the greatest concurrency. A similar thing happens
in the original code of the example where we iterate over the chapters that are found in the video. So here too, this acts as a barrier but being at the bottom, it didn't affect the rest of the code so you may have not noticed it.
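In sketch form, the squaring example looks like this (square is an assumed injected task):

    def make_workflow(square):
        def workflow(a, b):
            a2 = square(a)   # non-blocking proxy call, scheduled right away
            b2 = square(b)   # scheduled concurrently with the one above
            # Touching the values is the barrier: if either is still a
            # placeholder, a special exception interrupts this decision and
            # the code is simply re-run on a later one.
            return a2 + b2
        return workflow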
Another example is when you have a situation like this one. So here I'm squaring two numbers and then I may want to do some optional additional computation
and it's not clear in what order the if conditions should be written. In this case, if the b computation, the squaring of b, is the first one to finish, then because I have the conditional on the a value first, it will have to wait until the result for a is available to progress further in the workflow. And no matter how I try to write the code, there will always be a case where the workflow cannot make progress
until the other value is available. And this is kind of a problem, but it can be solved with something called a sub-workflow. So here I refactored the code that did the processing for each number separately into a sub-workflow, and then in the main workflow, I'm using the sub-workflows as I would use a regular task. This way they can both happen in parallel, and when both are finished, I can sum them and return the result.
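A sketch of that refactoring (square and extra_step are assumed tasks; each sub-workflow is invoked just like a regular task):

    def make_process_number(square, extra_step):
        def process_number(n):        # the sub-workflow
            n2 = square(n)
            if n2 > 100:              # a barrier, but only inside this sub-workflow
                n2 = extra_step(n2)
            return n2
        return process_number

    def make_main(process_number):
        def main(a, b):
            # Each call runs as an independent sub-workflow, so both
            # branches make progress in parallel.
            ra = process_number(a)
            rb = process_number(b)
            return ra + rb            # barrier: sum once both are finished
        return main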
So sub-workflows are a great way to do more complex things that you couldn't do without them. And another thing to notice here: in the main workflow, I didn't have to do anything special
to use the sub-workflows. They are used just as regular tasks. So for error handling, you might expect the error handling to look something like this.
This is how normal Python code would look if you had some exceptions in a function. But this is not possible, because as I said earlier, the proxy call is non-blocking, so you cannot get the exception at this point.
So actually, this is the place where you have to write your try-except clause. The reason for this is that only at this point can we force the evaluation of the result, and only at this point do we know for sure if the computation was successful or not. This looks a bit strange and I don't like it too much. There is a better way of doing it, using the wait function that comes with Flowy.
And what this does: it will try to dereference the task result, which is similar to doing an operation on it. And the name is a reminder that this will act as a barrier: not only will nothing pass this point until this value is available, but code below it won't even be detected as concurrent, even if it could be executed in parallel.
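A sketch of that shape; the import location of wait and the exception type are assumptions based on the talk, not checked against Flowy's documentation:

    from flowy import wait  # assumed import location

    def make_workflow(encode):
        def workflow(video):
            result = encode(video)    # non-blocking proxy call
            try:
                value = wait(result)  # barrier: forces evaluation here and
                                      # raises if the task failed
            except Exception:         # the talk mentions a task error type;
                value = None          # its exact name is not shown here
            return value
        return workflow

But this is not always the case.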
Maybe you don't want to use the value in the workflow itself. You just want to pass the value from a task to another task. And in this case, how do you pick up errors?
So what would happen here if the result for B is an error? When you're passing an error in the arguments of another proxy call, the proxy call will also return an error. So the errors propagate from one task to the other.
And if the result value that you try to return from the workflow contains errors, then the workflow itself will fail. So you cannot dodge errors. You have to deal with them.
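As a sketch (square and double are assumed injected tasks):

    def make_workflow(square, double):
        def workflow(a):
            a2 = square(a)   # suppose this task fails: a2 is an error result
            a4 = double(a2)  # never scheduled; the proxy sees the error in its
                             # arguments and returns an error as well
            return a4        # an error in the return value fails the workflow
        return workflow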
Or you can ignore them by not making them part of the final result, in which case you will get a warning message that you had some errors that were not picked up or handled by your code. The workflows can also scale by using some of the other backends that I mentioned earlier, the Amazon one or Eucalyptus. When you want to scale, basically nothing changes in the workflow, so you would still use the code that you saw earlier. There is some additional configuration that you have to do, which happens outside of the code, so it is not part of the code. Because when you scale and you want to run the workflow on multiple machines, in a distributed system there can be all kinds of failures.
There are some execution timers that you can set, and those will help you with fault tolerance. There is another type of error that you can get when you scale, which is a timeout error,
which is a subclass of the task error that we saw earlier. So you can have special handling for timeouts. There are automatic retry mechanisms in place for the timeouts, and you can configure them as you wish.
There is also the notion of heartbeats. The heartbeats are callables that a task can call, and when a heartbeat is called, it sends a message to the backend telling the backend that the current task is still making progress. But another thing it does: it returns a boolean value in the task, and that boolean value can be used to know if the task timed out, in which case you can abandon its execution, because even if it finishes the execution successfully, its result will be rejected by the backend.
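A sketch of the pattern; how the heartbeat callable reaches the task (shown here as an injected argument) and the helper function are assumptions:

    def encode_video(chunks, heartbeat):
        for chunk in chunks:
            if not heartbeat():   # reports progress; False means we timed out
                return None       # abandon: the backend would reject the result
            process_chunk(chunk)  # process_chunk is a hypothetical helper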
Another thing to keep in mind: you should aim to have tasks written in such a way that they can run multiple times, just because of the failures and retries that can happen. The tasks, or the activities, and I use the two to mean mostly the same thing, can be implemented in other languages, so you can use Flowy only for orchestration and workflow modeling, that is, the engine and the logic to run the activities.
There are some restrictions on the size of the data that can be passed as input, and on the result size. When you are scaling and you run on multiple machines, you would have workers that are running continuously, not like we had for the local backend where they were running only for the duration of the workflow. Those workers are single-threaded and single-process, so if you want more of them on a single machine, you have to use your own process manager
and start them and make sure that they are alive. The decision must use the workflow execution history and the workflow state to make decisions, and if the history gets too large (the data that is transferred because of the history actually has exponential growth), you can reduce that by using sub-workflows. Sub-workflows will only appear as a single entity in the history, so you can get logarithmic data transfer
by using sub-workflows in a smart way. And because of the fault tolerance built in, you can scale down. For example, all the workers can die at some point in time and then come back online after a while, and the workflow progress won't be lost. You may still lose the progress on specific tasks, but the workflow progress itself
won't be lost, and this is very useful for workflows that take a very long time to run. I think the maximum duration for Amazon is like one year for a workflow, so this can be very useful in some situations. You can scale up very easily, just start new machines, and they will connect to the queues
and start pulling tasks that need to be executed. Thank you, that was all. If you have questions, I think now is a good time.
How does this compare to Celery? With Celery you can create tasks and it will run them. How can you compare them? So, Celery is a distributed task queue, or job queue, and this is a bit different, because here you have the orchestration of the tasks: if you have many tasks and you want them to operate in a certain way, with some dependencies between them, and to pass that data between them,
you can do that by writing single-threaded code, and from that single-threaded code, the dependency graph will be inferred for you and it will make sure that the tasks are scheduled in the correct order and they get the data they need passed in.
So I would use Celery for one-off jobs, sending an email or something, but not for hundreds of jobs that are somehow interdependent. It also has Canvas, which is more like a DAG, where you define your workflow topology beforehand, not in the dynamic way you can with single-threaded Python code, where you can have conditions and for loops and all that. Thank you. What asynchronous library do you use
at the bottom of the library? Sorry. What? An asynchronous library? AsyncIO? I don't think I'm using any asynchronous library.
For the local backend, I'm using the futures module to implement the workers, but there is no asynchronous library involved. Okay, thanks.
Yeah, in the example workflow, you showed one of the tasks returns the list, so the list of chapter points that then gets fed into something that builds thumbnails for the chapters.
Do you have to wait? Does that task essentially block until every single chapter has been found? Or would it be possible, maybe with code changes, to support, say, a generator function so you could start building a thumbnail for the first chapter while the task is still finding the later chapters? Yeah, so here it will block, so any code under the thumbnails line
won't be executed until we have the chapters, and this is because the find chapters returns a list, and it's a single result, and we cannot get partial results from the task, so we have to wait until the entire result is available.
Yeah, so anything below that will be blocked until the result is available, and this isn't such a big problem, usually, because there are ways to write the code, and this doesn't become a problem, or if it is a problem,
you can create a sub-workflow, so I could have a sub-workflow that would do only the find chapters and the thumbnail generation, and then call the sub-workflow from here, and have that running in parallel with the other code.
Sorry, just to follow up then. Does that mean that in this example, add tags, which you could start processing immediately, won't be executed immediately because you're waiting for the video encoding to finish? No, so in this case, in this example, all the tasks that can be executed in parallel
will be executed in parallel, so the actual execution topology will basically look exactly like this one, so this is how it will get executed. That's why the workflow duration was about two seconds instead of 11 or something.
Time for one last question? Kind of a repeat of the previous one. He made a good point about, not the thumbnail line, but the line above, where it's finding the chapters and returning the list.
It won't return from find chapters until it's found all three of the chapters, but could you convert find chapters to be a generator, or get it to return the next chapter, and then you could do the thumbnail for the first chapter while find chapters is still finding the remaining chapters? Yeah, you could have a task that will only find the first chapter
and return that, and then call the task again, and it will resume from that point. You can actually send the last chapter and find the next one, and this way you can solve the problem if you want to, so it really depends on how you write your code.
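Sketched out, reusing the wait helper from earlier (find_next_chapter is a hypothetical task that resumes from the last chapter it was given):

    def make_workflow(find_next_chapter, extract_thumbnail):
        def workflow(video):
            thumbnails = []
            chapter = find_next_chapter(video, None)  # find the first chapter
            while wait(chapter) is not None:          # barrier per iteration
                # Thumbnail extraction is scheduled as soon as this chapter
                # is known, while the next one is still being searched for.
                thumbnails.append(extract_thumbnail(video, chapter))
                chapter = find_next_chapter(video, chapter)
            return thumbnails
        return workflow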
The only rule you have to remember is that when you try to access a value in the workflow, it will block until the value is available. That's basically the only thing you need to know. Anything below that point won't be detected and cannot be concurrent, and that can be solved through subworkflows.
Sever, thank you very much for your talk. Thank you.