
Building Workflows With Celery

Formal Metadata

Title
Building Workflows With Celery
License
CC Attribution - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.

Content Metadata

Abstract
Task queues are a topic which most developers will eventually have to dive into, especially in today's web development world. The idea is really simple: whenever one has any functionality which might take too long to perform, one can spawn a process which will take care of this functionality without having to block the app's main loop. A task queue will use worker processes to execute these long-running tasks and the user does not have to wait until the task is done. Instead, an acknowledgement message is presented to the user while the task is executed in the background. This concept is really important when building web applications. HTTP requests have timeouts, and making the user wait a long time for something to finish is not a good user experience practice. Usually, these tasks are used in groups, creating a workflow where the work is distributed into smaller tasks. Celery is usually the first project one encounters when searching for task queues and Django. I have been using Celery for over four years. The Celery project is one of the most robust task queues out there. It is certainly not the only task queue. And it can be difficult to plan the correct architecture for a specific workflow. This talk will explain enough of Celery's basics to understand how to build workflows with Celery. Building workflows with Celery is never straightforward. This is mainly because Celery offers the building blocks to build workflows but tries to move out of the way. By not being too intrusive, Celery allows building complex workflows. I will explain common patterns and tips to successfully use Celery to build workflows of different complexities.
Transcript: English (auto-generated)
Buenas tardes [good afternoon]. I promise you this talk will be in English.
Introducing Josue Coronel; the title of his presentation is Building Workflows with Celery. So yeah, we'll welcome Josue Coronel for his talk in English.
How's it going, everybody? So let's start. Well, first, yeah, my name is Josue Balandrano Coronel. I work for the Texas Advanced Computing Center in Austin. And I'm also a small part of the core team for Celery.
So let's start with this hopefully very interesting presentation. And first let's start with a very quick overview of what Celery is, for anybody out there who doesn't know. Celery is basically a task queue, which allows us to run things asynchronously.
Now, you might be thinking, well, nowadays we have all these different projects like asyncio and the whole asynchronous world out there. But the very interesting thing about Celery is that it allows us to do distributed things, so we can run things remotely on other computers.
Also, it gives us a lot of different things; for instance, we can apply a retry policy to tasks that don't run or that error out or something like that. And we can also store results in different ways. This is also a very quick way to view how the Celery architecture actually works.
And everything starts with our main application which is usually what we call our producer. And we call it like that because this is actually what creates or produces all the different messages that are going to be sent to the workers.
And the workers are the ones who are actually going to run these different tasks that we're queuing. The cool thing about this is that if you think about it, you can run different tasks in different machines that have different resources or have access to other different resources.
Inside each one of these workers, what we have is usually a main process and then a number of other worker processes. And this is how we can run things also concurrently. And all of these, actually the way that they communicate between themselves is using something that's called a transport which is usually something like RabbitMQ
or we can also use Redis. And there's support for other types of transports, even a simple filesystem transport. And we can also use a backend, which is what we use to save the different task results.
So, how can we start building these different workflows with Celery? Because we have talked about how Celery basically just runs different code in remote machines, right? In distributed machines, if you can call it that. So everything starts actually in Celery with what we call signatures.
And signatures are basically very related to what Python partials are. They're implemented in a different way, but the idea is basically the same. So the way that they work is that if we have a task, for instance here, the task that we're going to use is projects.tasks.my_task.
We can create a signature from this task. And this signature is basically going to be a representation of what we have here at the end of the slide, which is just a way to actually execute that function itself,
which is projects.tasks.my_task with some specific arguments. It could be just normal parameters or also keyword parameters. The other cool thing about these types of signatures is that they're easily serializable.
Sorry, that's really hard to pronounce. Serializable, yeah. Is that okay? Let's just continue with that, yeah. Okay, so for instance, the serialization of this task, or well, of this signature, as we can take a look at it here, it's actually just a very, very simple dictionary.
And this is what allows us to actually send this message into our workers through something like RabbitMQ or Redis or any other type of broker and have that worker know what it needs to execute. So we can see signatures actually starts creating this message that we're going to send.
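A minimal sketch of what creating a signature can look like; the app, module, and argument names here are illustrative assumptions, not the code from the slides:

    from celery import Celery, signature

    app = Celery("projects", broker="amqp://localhost")

    @app.task(name="projects.tasks.my_task")
    def my_task(model_id, verbose=False):
        return f"processed {model_id}"

    # Build a signature from the task object itself...
    sig = my_task.signature((42,), {"verbose": True})

    # ...or by task name only, without importing the function at all.
    sig = signature("projects.tasks.my_task", args=(42,), kwargs={"verbose": True})

    # A signature is just a dict underneath, which is why it serializes so easily.
    print(dict(sig))
    # roughly: {'task': 'projects.tasks.my_task', 'args': (42,),
    #           'kwargs': {'verbose': True}, 'options': {}, ...}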
So the message itself doesn't contain any code. And that's one of the things that we have to kind of get a grasp on because we have to have the code that we need to run in each one of the workers. That way the producer only tells the worker what it needs to run,
with which parameters, and then the worker does its job. The other cool thing about signatures is that we're able to define different options that Celery actually uses. So for instance, we can specify different queues on where these tasks are going to run.
And we have this example here where we're actually setting a custom queue whenever we're creating our signature. Then the other good thing is that we can create a signature and then after creating that signature, we can set different arguments for it or we can merge different options. So for instance, here on the first line, we're creating a signature
or we're using a signature that we have already created here and we are actually using another parameter which is going to be this string with the value of new model ID. Or we can also set another queue that that task is going to go to.
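A sketch of setting and merging options on a signature, reusing the illustrative my_task from above; the queue names and arguments are made up:

    # Route the task to a custom queue when the signature is created.
    sig = my_task.signature(("model-1",), queue="models")

    # Options can be merged in or overridden afterwards.
    sig.set(queue="priority")

    # clone() returns a copy; any arguments given here are merged into that copy.
    sig2 = sig.clone(args=("new_model_id",))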
Now, this is the very beginning of how to start creating workflows. The other idea that we need to realize here is that we can create signatures in different ways, right? And one of the other ways is to create it directly from the task object.
So we can see here there are some shortcuts, which are .s and .si. .s creates a regular signature, as we just saw, and .si creates an immutable signature, which will not accept any new parameters or anything like that.
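In code, the two shortcuts might look like this (same illustrative task as before):

    regular = my_task.s(42, verbose=True)   # regular signature, same as .signature(...)
    frozen = my_task.si(42, verbose=True)   # immutable: ignores any extra arguments or
                                            # results that earlier tasks try to pass in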
The other thing to see here is callbacks because now we know how we can send these tasks to different workers. Now, how can we start creating these workflows? So the very first step of it is to create a callback after one task gets run or one function runs and it's successful, then we can run something else.
And this is the way that we can actually start setting different callbacks. It's very simple. One of the main ways to do it is, whenever we queue one task, we can tell Celery to run something else after it has been successful, using the link parameter. The link_error parameter is going to be the error callback for whenever that task fails. Or, if we're creating a signature, then we can set these link or link_error callbacks on the signature itself. Now, after this, these are the very basic ideas of how everything that has to do with workflows works in Celery.
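A sketch of both styles, reusing the illustrative app and my_task from above and assuming two made-up tasks acting as the success and error callbacks:

    @app.task
    def on_success(result):
        print("task finished with:", result)

    @app.task
    def on_failure(request, exc, traceback):
        print("task", request.id, "failed with:", exc)

    # Attach the callbacks when queuing the task...
    my_task.apply_async((42,), link=on_success.s(), link_error=on_failure.s())

    # ...or attach them to a signature before sending it off.
    sig = my_task.s(42)
    sig.link(on_success.s())
    sig.link_error(on_failure.s())
    sig.apply_async()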
And then it's a little bit hard to just kind of start building workflows with just these two different objects. So Celery allows us, or well, Celery gives us these different primitives which we can use to start creating better
and just a little bit more complicated workflows. The first one is a chain, which is basically what we just saw. It's running first one function, and then once that function executes, we can run something else. So first this one, and then we send the result to whatever the callback is going to be. And this is the way that we can actually define a chain.
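A sketch of a chain, using made-up tasks fetch, transform and store; each task's return value is passed as the first argument of the next one:

    from celery import chain

    # With the pipe operator...
    workflow = fetch.s("https://example.com/data") | transform.s() | store.s()
    workflow.apply_async()

    # ...or with the chain function imported from Celery.
    workflow = chain(fetch.s("https://example.com/data"), transform.s(), store.s())
    workflow.apply_async()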
As we can see here, it might be a little bit difficult to see at first, but what we're using here is the pipe operator. Or we can also just use the actual chain function that we can import from Celery. The other primitive that we can use is a group. Basically, what we do with a group is that we queue multiple tasks
with different arguments, but they are going to run in parallel, as we can see here in the image. So for instance here, what we're going to do is that we're going to run an array of tasks, and they're going to run each one with different parameters, and they're going to run in parallel here.
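A sketch of a group, queuing the same illustrative task with different arguments so the workers can run them in parallel:

    from celery import group

    jobs = group(my_task.s(model_id) for model_id in (1, 2, 3, 4))
    result = jobs.apply_async()   # one GroupResult covering all four tasks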
And we're creating this array of tasks here in these four statements on the slide. The other primitive that we can use is the chord. As we can see from the image, a chord calls one or more tasks, runs those in parallel, and then runs a callback after each one of those has finished successfully.
So a chord is basically what we could also call a group with a callback. There are two ways to create a chord. One is to create a header, which is an array of one or more tasks, plus a callback, and execute that. But we can also just chain a group into a callback, and that's basically the same thing; Celery will know that this is going to be a chord. The other primitive is map, which is something we can use whenever we have an array of parameters and we need to run every element through the same task. The only difference is that it's not going to run in parallel like a group does; it's going to be sequential. And we also have starmap, which is basically the same thing as map, except that each element of the array is itself an iterable of arguments.
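A sketch of chords, map and starmap; summarize and add are made-up tasks standing in for the callback and the mapped task:

    from celery import chord, group

    # A chord: a header of tasks plus a callback that receives all their results.
    header = [my_task.s(1), my_task.s(2), my_task.s(3)]
    chord(header)(summarize.s())

    # Chaining a group into a callback is upgraded to a chord automatically.
    (group(header) | summarize.s()).apply_async()

    # map and starmap run one task over an array of arguments, sequentially.
    my_task.map([1, 2, 3]).apply_async()           # my_task(1), my_task(2), my_task(3)
    add.starmap([(1, 2), (3, 4)]).apply_async()    # add(1, 2), add(3, 4)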
So let's see which kind of workflows we can create from this. Let's think about when somebody logs into an account and you want to check if that user is logging in from a new device. There are different things that we can do in the background, so let's go through the requirements that we need for it. First, we need to check if the device has been used before. Then, if the device has not been used before, we need to notify the user that this is a new device, and we need to save that event into a database. These two can probably run in parallel, so we're already kind of building this workflow in our own minds. Then, with whatever we get from all these events,
we can send it to an external API. Let's say that it's not really external to the company, but it's just an API that another group is developing. So for us, it's going to be external. And let's say that that API does some machine learning stuff, like outlier analysis or something like that. And then finally, we get whatever response we get from this API,
and we post-process it in different ways. We could probably, again, save it into a database or maybe send another notification, or maybe just flip a switch somewhere. So let's start. How can we start drawing our workflow? The first step is going to be just to check if the device is a new device.
And then we have these two tasks, which we already said that we can run in parallel. So we start kind of drawing them here, so we have an idea of the different tools that we can use. Then we're going to do a call to a third-party API,
which is over here, this node. And then we're finally going to post-process whatever response we're going to get. So the first one is going to be a task, as we were talking about these ones. And then after this, we can use a group in order to run these two tasks in parallel.
Then the result of this group is going to be an array, because the group is going to wait until each one of these tasks are going to finish to go to the next step. Then this array of results are going to be sent to this other task, which is going to be our step three.
And this one, which is basically a callback, is going to post all of this data into this third-party API. And then this response is going to be fed into this last step, which is our last callback. And all of these, we have to put into a chain. That way, we can go from step to step.
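Before the walkthrough of the slide, here is a rough sketch of how such a login workflow could be wired together; every task name below is an illustrative assumption, not the speaker's actual code:

    from celery import chain, group

    workflow = chain(
        check_device.s(user_id, device_id),              # step 1: is this a new device?
        group(notify_user.s(), save_login_event.s()),    # step 2: these two run in parallel
        post_to_analysis_api.s(),                        # step 3: call the external ML API
        post_process_response.s(),                       # step 4: handle the API response
    )
    workflow.apply_async()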
So let's take a look at it in code. This is basically how it looks. And as we can see, Celery gives us different tools, all the different tools that we're talking about, in order to do this, and it's very easy to read and it's very easy to write, too. So first, as we can see here, we have our step one.
Then we have this step two, which is the group that we're running in parallel. Then we have our step three, which is where we're posting to our third-party API. And then we're post-processing the result from this external API. Now, this is only a very simple example
with no errors, and we're assuming that everything is perfect in the world, which is not. So let's try and figure out how can we handle errors. There are different ways, and we can handle it in different levels from within the workflow. One thing that we can do is that for every error,
we can just retry that one task. Or the other thing is that we can set different error callbacks, and instead of retrying the task, we maybe flip a switch somewhere else or do something else, right? It depends on what kind of error it is. So if we want to retry the one task,
then the only thing is that we can only do it in the task level. We cannot do it in the entire group level. We will have to do it in one task. But within one group, if we start retrying one of the tasks that are running parallel, the group is, well, the entire chain is not going to move to the next step until every single one of the tasks in a group finishes.
That means even if one of the tasks keeps retrying and retrying. This is one of the ways that we can retry within a task, and it is a little bit manual. There is another way that we can do it, which is setting auto-retry and specifying the different exceptions that we want to catch. But historically, in the different things that we have implemented, we've seen that it's easier to do it the manual way, because it's more explicit about what we're actually trying to do.
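A sketch of both styles of retrying, reusing the illustrative app from above with a made-up task that posts to the external API; the first version is the explicit, manual style the speaker says has worked better in practice, and the second uses Celery's autoretry_for option:

    import requests

    # Manual, explicit retry inside the task.
    @app.task(bind=True, max_retries=3)
    def post_to_api(self, payload):
        try:
            return requests.post("https://analysis.example.com/", json=payload, timeout=10).status_code
        except requests.RequestException as exc:
            # Re-queue just this task; the rest of the group keeps waiting for it.
            raise self.retry(exc=exc, countdown=30)

    # Declarative alternative: retry automatically on the listed exceptions.
    @app.task(autoretry_for=(requests.RequestException,), retry_backoff=True, max_retries=3)
    def post_to_api_auto(payload):
        return requests.post("https://analysis.example.com/", json=payload, timeout=10).status_code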
The other thing that we can do is set different error callbacks. We can set a callback at the task level, so whenever that step fails, we fire off this callback. As we can see, it's the same code that we had before, but we're adding .on_error with the handler. Or we can do a callback just on one of the tasks in a group, or even on the entire group itself. For instance, if we want to do it on the entire group, we can use link_error, which is what we were looking at before. Or we can also add an error callback for the entire chain itself.
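A sketch of error callbacks at the different levels just described, reusing the illustrative workflow tasks from above plus made-up error handlers:

    from celery import chain, group

    # On a single step: .on_error() attaches an error callback to that signature.
    step_one = check_device.s(user_id, device_id).on_error(device_check_failed.s())

    # On a whole group (or on a single task inside it) via link_error.
    step_two = group(notify_user.s(), save_login_event.s())
    step_two.link_error(notify_failed.s())

    workflow = chain(step_one, step_two, post_to_analysis_api.s(), post_process_response.s())

    # Or attach an error callback to the entire chain when queuing it.
    workflow.apply_async(link_error=workflow_failed.s())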
It depends, it really depends on a lot of different things and it's a decision on whatever it is that each team is implementing. This is a way to add an error callback after, or if anything goes wrong in a chain.
The other thing to take a look at here is how we can handle the different results. So we have our entire workflow, but we want to go back and check what the results of each one of these tasks were. The good thing is that Celery gives each task a specific task ID, which is unique to each one of the tasks, and we can grab that ID and use a class called AsyncResult in order to grab the result of that specific task. And there are different ways to do it. One way is to use the AsyncResult class directly: we have to give it a task ID, and we have to specify which backend we're using. Usually it's going to be Redis, but it could also be a database, or it could even be RabbitMQ; that depends on your needs. We also have to specify which Celery app we're using, because of the different configuration that can be set on each app. The other way is to use the AsyncResult class that comes from the task object; that way we only have to give it a task ID, and we don't have to specify the backend or the app, because the task object is already bound to a specific app which has the backend configured correctly. So one way to do it is to grab that class from the task and then use the specific task ID to get the result.
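A sketch of both ways of getting a result handle, with a made-up task ID and the illustrative app and my_task from earlier:

    from celery.result import AsyncResult

    task_id = "d9078da5-9915-40a0-bfa1-392c7bde42ed"   # made-up example ID

    # 1) Construct AsyncResult directly -- the app (and its result backend) must be given.
    res = AsyncResult(task_id, app=app)

    # 2) Use the task's own AsyncResult, already bound to the right app and backend.
    res = my_task.AsyncResult(task_id)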
Now, there are different attributes on the result object that we can use to analyze it. The first one is how we can get the value of the actual result; then, if we're within a workflow like this one, or within a chain, we can go up and down the chain or the workflow by using .children and .parent. Now, the only thing is that this workflow is interesting because the second step is actually a group result, not a single AsyncResult. So the class that we're going to have to use there is also called GroupResult, which is basically an array of AsyncResults. It works a little bit differently, but most of it is basically the same thing.
So for both classes, we're going to have these different methods. The first one is to check if the result is ready, because all of this is running asynchronously and we don't know when it's going to end, so we can continuously check whether a result is ready yet. We can also check if it has been successful: for a single task it checks that one result, and for a group result it checks whether every one of the results within that group was successful. Then, whenever we have an AsyncResult, we can call get, which is the method we can use to get the same thing as we were getting when we were accessing result.result. And whenever we have a group result, we can use .join, which loops through each one of the AsyncResults inside the group, waits for each of them to be ready, and then returns an array of all of those results.
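In code, inspecting the results might look like this, assuming res is an AsyncResult and group_res is a GroupResult obtained as above:

    res.ready()         # has the task finished yet?
    res.successful()    # did it finish without raising?
    res.result          # the raw return value (or the exception instance)
    res.get()           # block until the result is available, then return it
    res.parent          # the previous step in the chain
    res.children        # the step(s) queued after this one

    group_res.ready()        # have all tasks in the group finished?
    group_res.successful()   # did every one of them succeed?
    group_res.join()         # wait for all of them and return a list of their results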
And that was pretty much all the different ways that we can create and manage different workflows in Celery. This is where you can contact me if you have any questions, or catch me outside in the halls.
Also, the Celery project is going to be at the sprints, so drop by to pick up some stickers and/or pins. And we also have some stickers and pins right here in the front if you guys want to pick some up. Thank you, Josue.
Hey, in your experience, which processing backend do you prefer, like RabbitMQ versus Redis or whatever? Well, in my experience, it's usually better to use RabbitMQ because it supports more things and it's more robust.
And the only thing is that whenever we're doing, like for instance, this type of workflows, Celery has to send different messages. You're not going to see those, right? But your backend is going to see it. So if it's an actual implementation of AMQP, then it's going to be more robust and it's going to handle these multiple messages better. Thanks for the talk. We use some workflows in our work
and we had some bugs, like things not being called even though we were calling retry, or messages being lost even though the configuration supposedly handled that. So do you know if this is well tested
inside of Celery or did you have any problems like that because we had many bugs on workflows and kind of right now, we are trying not to build so complex workflows for better debugging at production. Do you have any problems like that? Well, yeah, it's always better to keep your workflows
as simple as possible. And we do test a lot of these different things within the Celery project. But there are some bugs around it. I mean, these are very complicated things to implement. So yeah, if you have some bugs, I do recommend you to submit some issues. We're always taking a look at those
even if it takes us some time. Also, if you want to drop by the sprints and help out, that would be awesome. Can I invoke coroutines from within Celery? Is that possible? Can you what, sorry? Coroutines, like asyncio things. Oh, well, yes, there is a way to do it,
but it's not fully supported by Celery right now, because it doesn't yet support Python all the way to 3.7. We're working on that, and hopefully soon we're going to do the release that's going to fully support 3.7. Okay, just one more thing. If you guys are going to drop by the sprints, we have a lot of different beginner tasks
and also advanced tasks. So feel free to drop by and to grab some swag.