
Data pipelines with Celery: modular, signal-driven and manageable


Formal Metadata

Title
Data pipelines with Celery: modular, signal-driven and manageable
Title of Series
Number of Parts
131
Author
Contributors
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.
Identifiers
Publisher
Release Date
Language

Content Metadata

Subject Area
Genre
Abstract
Writing pipelines for processing large datasets has its challenges – processing data within an acceptable time frame, dealing with unreliable and rate-limited APIs, and unexpected failures that can cause data incompleteness. In this talk we’ll discuss how to design & implement modular, efficient, and manageable workflows with Celery, Redis, and signal-based triggering. We’ll begin by exploring the motivation behind segmenting pipelines into smaller, more manageable ones. The segmentation simplifies development, enhances fault tolerance, and improves modularity, making it easier to test and debug each component. By leveraging Redis as a data store and Celery’s signals, we introduce self-triggering (or looped) pipelines that efficiently manage data batches within API rate limits and system resource constraints. We will look at an example of how we did things in the past using periodic tasks and how this new approach, instead, simplifies and increases our data throughput and completeness. Additionally, this facilitates triggering pipelines with secondary benefits, such as persisting and reporting results, which allows analysis and insight into the processed data. This can help us tackle inaccuracies and optimise data handling in budget-sensitive environments. The talk offers the attendees a perspective on designing data pipelines in Celery that they may have not seen before. We will share the techniques for implementing more effective and maintainable data pipelines in their own projects.
Transcript: English (auto-generated)
So, the title of the talk is "Data pipelines with Celery: modular, signal-driven and manageable". What we will be talking about today are some of the pros and cons and the use cases where we used Celery for data pipelining, for collecting, transforming and storing data.
So who am I? I'm a software engineer at Seek & Hit, which is a digital marketing and software engineering agency. We are also a Kiwi.com vendor, which is one of the sponsors of this EuroPython.
As for my work, I'm a member of the MarTech automation team, MarTech being marketing technology, if someone doesn't know. We collaborate with Kiwi.com's SEO, SEM and other teams,
which basically means that we have a lot of data pipelines and a lot of background processes that need to run on a regular schedule. My own personal interests include scalable systems and data engineering.
And with that, let's start with what we mean by data pipelines in Celery. When we talk about data pipelines, we mean a series of processes for collecting, transforming and storing data.
Celery, and I'm sure most of you have at least heard of it, is a distributed task queue framework. If we visualize a data pipeline in its most basic form, it looks something like this: three steps connected in series, where we collect, transform, and export
or store some data. So when we talk about data pipelines in Celery, we are really talking about chains: we chain tasks into a series that executes some work.
And here's a small code example, if you can see it, of how we define such a chain.
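The slide code isn't captured in the transcript, so here is a minimal sketch of what such a chain could look like; the task names (collect, transform, store) and the Redis broker URL are illustrative assumptions, not taken from the talk:

```python
# A minimal sketch of a three-step pipeline expressed as a Celery chain.
# Task names and the broker URL are illustrative.
from celery import Celery, chain

app = Celery("pipelines", broker="redis://localhost:6379/0")

@app.task
def collect(source):
    # Fetch raw records from an API or a database.
    return [{"id": 1, "value": "raw"}]

@app.task
def transform(records):
    # Clean and reshape the collected records.
    return [{**r, "value": r["value"].upper()} for r in records]

@app.task
def store(records):
    # Persist the transformed records (e.g. to Redis or a warehouse).
    return len(records)

# Chain the tasks so each step's result feeds the next one.
pipeline = chain(collect.s("my-source"), transform.s(), store.s())
pipeline.delay()
```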
Now, the pros and cons of using Celery for data pipelining, starting with the pros. Celery gives us scalability: we can add more worker nodes when needed. It has a signal-driven architecture, as we will see later in the presentation, that allows us to hook into the lifecycle of tasks and, more generally,
into the lifecycle of the pipeline, to perform some additional work. Integration: it can be integrated with other environments, the most basic example being Django.
It's easy to use and flexible. Basically, if you know how to write Python functions, a Celery task can encompass whatever a Python function can do, with the same logic, so the learning curve is a little smoother than with some other data pipeline frameworks.
As for the cons, there is setup and maintenance complexity: as your application scales and becomes larger,
it gets more difficult to set up and maintain. There is also a dependency on message brokers, which adds some infrastructure cost. Our team uses Redis all of the time; someone else might prefer RabbitMQ or something else.
And since we use a message broker to communicate, we introduce latency in the worker-broker communication. Finally, there are limited data pipeline features, of course, because Celery is a
general-purpose task queue, so some features require custom development. With that said, what are the challenges in data processing?
So what are the things we need to take care of? First, idempotency, which basically means that restarting the pipeline or a task should produce the same result, no matter how many times it is restarted.
Another challenge is handling large data sets efficiently: we need to be able to process the data within a reasonable time. We also need to handle API limits and ensure maximum utilization
of API requests within the allocated limits. Then, isolating failures: we need to be able to manage and handle failures at each step.
Often, before we make an API request or store some data, we first need to load it from Redis, and if anything fails, we need to return it so that we know where we left off
and can continue processing; these failures do happen. We should also isolate all of the work that is not related to the main pipeline output.
And of course, modularity: we want to break down complex tasks and complex pipelines into more manageable, self-contained ones. So, how did we do things before?
For example, gathering keyword metrics from an API for one and a half million rows. Let's be real, this is not exactly a large data set; it's maybe medium-sized. But how would we do it? We would load some data into Redis first using the first pipeline.
Then the second pipeline would periodically load a batch of rows and make some API requests for that batch. We would get back some results from the API and store the new data
to Redis again (a third pipeline), and then a fourth pipeline would periodically load a batch of the new data and again make API requests. So we basically had four pipelines for something that should be much simpler.
The issues were that we had processing idleness: we weren't processing data as fast as we could. For example, we would schedule the pipeline to run every two minutes, but in reality we would only need, say, 80 seconds to process everything.
After those 80 seconds we would already be able to make another API request, but we weren't utilizing that; we were wasting 40 seconds on each run,
and those 40 seconds would accumulate, so we weren't processing all of the data as we should. Another issue was resource synchronization between pipelines. When you have pipelines that run over multiple days, that are
constantly being scheduled and that are additionally interconnected by some logic, you need to introduce some mutex variable to sync resource access.
And this may be the worst thing for me personally: we also had increased maintenance complexity, because developers needed to carefully sync the schedules between the pipelines. They needed to think about when the first pipeline would run, then the second, third, fourth,
and they also needed to know about the mutex variable and the states that variable can be in. So this was really a pain. And, of course, all of this increases development time.
So, we decided to come up with a new approach where we basically utilize Celery signals to hook other pipelines or tasks into different points of a pipeline's execution.
Yeah, basically what I just said: hook into a specific lifecycle event of a task or a step in a pipeline. I'm not sure if you have used Celery signals before, but this is an example of how you can set up a basic hello-world signal.
When the hello task is finished, when it has completed correctly, the post-run trigger is called and it triggers the world task. That's basically the basis for our new approach.
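The slide isn't in the transcript, but a minimal sketch of that hello-world signal, assuming a Redis broker and illustrative task names, could look like this:

```python
# When the `hello` task finishes successfully, the task_postrun handler
# triggers the `world` task. Broker URL and task names are illustrative.
from celery import Celery
from celery.signals import task_postrun

app = Celery("signals_demo", broker="redis://localhost:6379/0")

@app.task
def hello():
    return "hello"

@app.task
def world():
    return "world"

@task_postrun.connect(sender=hello)
def trigger_world(sender=None, state=None, **kwargs):
    # Only continue if the hello task completed correctly.
    if state == "SUCCESS":
        world.delay()
```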
I'm going to go through a few of the use cases that I have prepared. So, in this first use case, we are making API requests based on data from a database.
Basically, we have an analyst, a stakeholder, who prepares a query for us; we need to run it to gather the data, and based on that data we need to make some API requests. Just to note that we're using BigQuery.
Now, the issue with this use case is that the API requests had an impact on net revenue, which meant that we needed to be careful. So we said: okay, you want us to make those requests
based on the data that you provide us, but what if something goes wrong in the future? What we wanted to do is store the results of the query and store all of the failed API requests,
so that at any point in the future we can check what went wrong. Sending the API requests in batches is our main goal, but what we proposed is not something that's for the stakeholders
or for the business or the main goal of the pipeline; it's basically a side job to store the historical logs and the log of failed requests. So, if we stick to our old approach, the pipelines would look something like this.
We would have two pipelines: one that flushes Redis, reads the SQL file, and loads and stores the gathered data to Redis; and then a periodic task that runs every, let's say, two or five minutes: load a batch from Redis, make the API requests, store the failed requests,
and at the end, signal that it's done. With the new approach, we broke it down to something like this. The first pipeline, the flush Redis, read SQL, and load-and-store-to-Redis steps, remains the same, but now, using Celery signals and passing the job ID,
we trigger an additional task that stores the query results to a table, and we also use the signal to trigger the next pipeline. That next pipeline will load a batch of data from Redis and make an API request,
and then we have another signal to store the failed API requests. So, basically, if anything goes wrong in the store-failed-requests or store-query-results-to-table tasks, it won't affect the main pipeline.
And when we're done with a single batch for, let's say, the first API request, the end task sends a signal that restarts the pipeline again to load the next batch. So we're not wasting time waiting for Celery beat or waiting every two minutes for the next run.
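As a rough sketch of that self-triggering (looped) pipeline, assuming the remaining work sits in a Redis list called pending_rows and using illustrative task names:

```python
# A self-triggering ("looped") batch pipeline: when one batch ends, the
# post-run handler immediately starts the next one if work remains,
# instead of waiting for a periodic schedule. Names are illustrative.
import redis
from celery import Celery, chain
from celery.signals import task_postrun

app = Celery("looped", broker="redis://localhost:6379/0")
r = redis.Redis()

BATCH_SIZE = 100

@app.task
def load_batch():
    # Pop up to BATCH_SIZE rows from the Redis list.
    rows = [r.lpop("pending_rows") for _ in range(BATCH_SIZE)]
    return [row.decode() for row in rows if row is not None]

@app.task
def make_api_requests(rows):
    # Call the external API for each row; collect failures so a separate
    # signal handler can persist them without touching the main pipeline.
    failed = []
    for row in rows:
        # The actual API call is omitted in this sketch.
        pass
    return {"processed": len(rows), "failed": failed}

@app.task
def end_batch(result):
    return result

@task_postrun.connect(sender=end_batch)
def restart_pipeline(state=None, **kwargs):
    # Re-trigger the same chain while there is still data left to process.
    if state == "SUCCESS" and r.llen("pending_rows") > 0:
        chain(load_batch.s(), make_api_requests.s(), end_batch.s()).delay()
```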
I'm not sure how much sense it makes to have this here, but I just wanted to give a quick overview of how we do it. We get the table by name, we have a delete-from-the-destination-table query,
we have some transform-column function, and at the end we upload the data frame to the table. At this point I just wanted to point out, as I already mentioned, that one of the challenges
is making sure the tasks are idempotent, and one of the ways we do it here is that if the task is restarted, the delete query removes rows from the table based on the task ID that was stored.
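The actual slide code isn't in the transcript; a sketch of that idempotent store step, assuming the google-cloud-bigquery client and an illustrative table name and task_id column, could look like this:

```python
# Idempotent "store query results to table" step: delete whatever a previous
# run of this task wrote, then upload the dataframe tagged with the task ID,
# so restarting the task cannot duplicate data. Names are illustrative.
import pandas as pd
from google.cloud import bigquery

def store_results_to_table(df: pd.DataFrame, task_id: str,
                           table: str = "my-project.my_dataset.query_results"):
    client = bigquery.Client()

    # 1. Remove rows written by an earlier (failed or partial) run of this task.
    delete_sql = f"DELETE FROM `{table}` WHERE task_id = @task_id"
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("task_id", "STRING", task_id),
        ]
    )
    client.query(delete_sql, job_config=job_config).result()

    # 2. Tag every row with the task ID and upload the dataframe.
    df = df.assign(task_id=task_id)
    client.load_table_from_dataframe(df, table).result()
```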
As for the signal function, store failed requests, we have these two post-run connects just to show that you can connect different tasks, multiple tasks, to the same handler. You can also do some state checks there and check whether there is any data before running the next pipeline.
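A sketch of that pattern, with illustrative task names and stand-in bodies, might look like this:

```python
# One handler connected to several task senders via stacked post-run
# connects, plus a check that there is actually something to store before
# any further work is triggered. All names here are illustrative.
from celery import Celery
from celery.signals import task_postrun

app = Celery("handlers", broker="redis://localhost:6379/0")

@app.task
def make_api_requests(rows):
    return {"failed": []}

@app.task
def retry_failed_requests(rows):
    return {"failed": []}

@app.task
def store_failed_requests_task(failed):
    # Persist the failed requests somewhere durable (table, bucket, ...).
    return len(failed)

@task_postrun.connect(sender=make_api_requests)
@task_postrun.connect(sender=retry_failed_requests)
def store_failed_requests(state=None, retval=None, **kwargs):
    # Same handler for both tasks; only act on success and only when there
    # is actually data to store, so the main pipeline is never affected.
    if state == "SUCCESS" and retval and retval.get("failed"):
        store_failed_requests_task.delay(retval["failed"])
```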
Now for the second use case: generating text using an LLM API.
LLMs are everywhere today, and of course we also wanted to use them, but the API has a one-request-per-minute limit, so that's a limit we need to take into account.
How did we do it? We want to respect the API rate limit, and we want to separate the API requests from the text uploads. So we want one data pipeline that keeps sending API requests to the LLM API
to get the text back and stores that data somewhere, and another pipeline that just uploads, dumps the text wherever we need it.
Concretely, the upper pipeline checks whether any prompts were inputted, loads them, makes the API requests, and waits for the API time limit to end. Once the API time limit has ended, we go to the end task and re-trigger the pipeline from the start,
and also trigger the other pipeline to upload the text to some bucket or whatever destination we want to upload it to.
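A minimal sketch of that re-triggering with the rate limit respected, assuming a 60-second window and illustrative task names:

```python
# Respect a one-request-per-minute limit by re-triggering the chain with a
# countdown instead of relying on a periodic schedule. Names are illustrative.
from celery import Celery, chain
from celery.signals import task_postrun

app = Celery("llm_pipeline", broker="redis://localhost:6379/0")

API_LIMIT_SECONDS = 60  # one request per minute

@app.task
def load_prompt():
    # Fetch the next prompt, e.g. from Redis; None would mean nothing to do.
    return "example prompt"

@app.task
def call_llm_api(prompt):
    # One request against the rate-limited LLM API; the generated text would
    # then be stored for the upload pipeline to pick up.
    return {"prompt": prompt, "text": "generated text"}

@app.task
def end_run(result):
    return result

@task_postrun.connect(sender=end_run)
def retrigger(state=None, **kwargs):
    if state == "SUCCESS":
        # Start the next iteration only after the rate-limit window passes;
        # the separate upload pipeline would be triggered here as well.
        chain(load_prompt.s(), call_llm_api.s(), end_run.s()).apply_async(
            countdown=API_LIMIT_SECONDS
        )
```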
Okay, and this third case is, for me, basically a fun case. We have an internal API that also, of course, has some limits, but this one has daily limits: a limit of 10 requests per day and, I think, a limit of 1,000 rows per day,
so not much data, but it was a fun little experiment where, again, we essentially turned the pipeline into a for loop.
We load some data from Redis, we make an API request to the internal API, and we finish the task. Then, in the handler, we do some additional checks and calculate the parameters for the next pipeline run. Basically, when the pipeline finishes, we check whether the task finished successfully,
we get the iteration count, the rows-per-request limit and the daily request limit, and we calculate how many rows the next run of the pipeline will load from Redis.
And then we send these parameters to the next pipeline call, to the next pipeline run, in the last line. Hopefully, you can see it.
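That last line isn't visible in the transcript, but a sketch of the handler computing the parameters for the next run, with illustrative names and limits, could look like this:

```python
# A "for loop" built from Celery signals: when one run finishes, the handler
# works out how many rows the next run may load and kicks it off with those
# parameters. Limits and names are illustrative.
from celery import Celery, chain
from celery.signals import task_postrun

app = Celery("daily_loop", broker="redis://localhost:6379/0")

DAILY_REQUEST_LIMIT = 10
DAILY_ROW_LIMIT = 1000

@app.task
def load_rows(batch_size):
    # Stand-in for loading `batch_size` rows from Redis.
    return ["row"] * batch_size

@app.task
def call_internal_api(rows):
    return {"processed": len(rows)}

@app.task
def finish(result, iteration):
    return {"iteration": iteration, **result}

@task_postrun.connect(sender=finish)
def schedule_next_run(state=None, retval=None, **kwargs):
    if state != "SUCCESS":
        return
    iteration = retval["iteration"] + 1
    if iteration >= DAILY_REQUEST_LIMIT:
        return  # the daily request budget is used up
    # Give each request an equal share of the daily row budget.
    rows_per_request = DAILY_ROW_LIMIT // DAILY_REQUEST_LIMIT
    chain(
        load_rows.s(rows_per_request),
        call_internal_api.s(),
        finish.s(iteration),
    ).delay()

# First run of the day, iteration 0.
chain(
    load_rows.s(DAILY_ROW_LIMIT // DAILY_REQUEST_LIMIT),
    call_internal_api.s(),
    finish.s(0),
).delay()
```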
That's why I said this is a fun case: we basically created a for loop using these Celery signals. So, consider using signals if you want to break apart large chains;
you can also check whether the next Celery chain or the next tasks should execute at all. Consider them if you seem to be scheduling too many periodic tasks: if your developers are taking more than five minutes to decide when something should be scheduled,
then something might be off. Consider them also when you have some kind of secondary work that needs to be done, as we saw in the first use case, where you want to store some historical logs;
you don't want that secondary work to interfere with the main pipeline, with its main goal. And consider them if you want to avoid loops inside your tasks; here, instead, the pipeline is triggering itself. Some of these use cases could have been implemented with nested for loops that send the data in batches,
but from my experience, that makes the code less readable,
and it takes much more effort to debug, to find the issues when they occur, and to handle the errors. Thank you. Just allow me to thank Kiwi.com for their support in preparing this presentation.
If you want to follow Kiwi.com's tech content, you can scan the QR code or check out the booth. Also, just to mention, if anyone wants to talk to me, I'll be at the booth later. And if you want to get in contact, here are my contact details. Thank you.
Yeah, thank you a lot for this very nice talk. For the Q&A, we would ask you to queue up at the microphone in the middle of the rows.
So, just feel free to come up, queue up and ask your questions. We've got exactly five minutes left for the Q&A. Hi, thanks for the talk.
One question: from what I understand, you can trigger a task with one task, so a one-to-one relationship. Is there any way to trigger multiple tasks?
I guess so, but is there also a way to have one task triggered only when three other tasks, for example, are completed, so a many-to-one relationship? Okay, so basically, the lifecycle signals are connected to a single task, so they fire whenever that one task is finished.
So, what you could do is add a task at the end of the three tasks that you want to execute
and then hook the signal to that one task. I don't think Celery supports, out of the box, having three tasks trigger one. But you can have the three tasks in a group, have another task chained after the group, and when that task is finished, trigger something else.
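A brief sketch of that group-plus-callback pattern (a Celery chord), with illustrative task names and a Redis result backend assumed, since chords need one:

```python
# Run three tasks as a group, collect their results in a callback task, and
# hook the signal to that callback. Names and URLs are illustrative.
from celery import Celery, chord
from celery.signals import task_postrun

app = Celery(
    "fanin",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",  # chords require a result backend
)

@app.task
def step(n):
    return n

@app.task
def after_all(results):
    # Runs once, after all three grouped tasks have completed.
    return sum(results)

@app.task
def next_pipeline():
    return "triggered"

@task_postrun.connect(sender=after_all)
def trigger_next(state=None, **kwargs):
    if state == "SUCCESS":
        next_pipeline.delay()

# chord = a group of header tasks plus a callback that receives their results.
chord([step.s(1), step.s(2), step.s(3)])(after_all.s())
```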
Thank you. If there are no further questions, I would ask you to give another warm round of applause for Marin.