
ETL pipeline to achieve reliability at scale


Formal Metadata

Title
ETL pipeline to achieve reliability at scale
Number of Parts
132
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.

Content Metadata

Abstract
In an online betting exchange, thousands of money-related transactions are generated per minute. This data flow transforms a common and, in general, tedious task such as accounting into an interesting big data engineering problem. At Smarkets, accounting reports serve two main purposes: housekeeping of our financial operations and documentation for the relevant regulation authorities. In both cases, reliability and accuracy are crucial in the final result. The fact that these reports are generated daily, the need to cope with failure when retrieving data from previous days, and the fast-growing transaction volume obsoleted the original accounting system and required a new pipeline that could scale. This talk presents the ETL pipeline designed to meet the constraints highlighted above, and explains the motivations behind the tech stack chosen for the job, which includes Python3, Luigi and Spark among others. These topics will be covered by describing the main technical problems solved with our design:
- Fault tolerance and reliability, i.e. the ability to identify faulty steps and rerun only those instead of the whole pipeline.
- Fast input/output.
- Fast computations.
Transcript: English (auto-generated)
I'm a software engineer at Smarkets and today I'm going to talk about the ETL pipeline that we built to generate the accounting reports. I'm going to focus on the tech stack and the reasoning behind the technologies that we chose. Smarkets is an online betting exchange where people can place bets on different
events, mainly sports, but we also support other kinds of events such as political elections. In the exchange, many money-related transactions are generated. Those include deposits, withdrawals, orders to place a bet, cancel a bet, and so on.
All of these transactions need to be processed to generate the accounting reports, which include accounting statistics such as the total amount of money that somebody has placed in bets over a month, the total amount of deposits, withdrawals, and so on.
These reports serve two main purposes. The first one is that they allow us to have control over the money that comes to Smarkets and secondly, they provide documentation for the relevant regulators so that they can know how we handle money at Smarkets.
The previous accounting pipeline was designed back in 2013 and at that point, the number of transactions that it needed to handle was below 190,000. The massive business growth at Smarkets during the last four years made this number of transactions increase by over an order of magnitude, and now the number of transactions that the pipeline needs to process is more than 8.8 million. The previous pipeline was not able to handle this number of transactions. The main problem of the pipeline was that it was a collection of scripts without any
formal dependency definition between them, and this created two issues. First, it was difficult to identify errors, and secondly, even if you managed to identify where the error was coming from, it was difficult to know which steps of the pipeline needed to be rerun in order to regenerate the accounting reports.
Apart from that, the system was really slow. We are supposed to generate the accounting reports daily and it was taking more than 24 hours to run. And finally, this pipeline used as persistent storage a volume mounted into the host running the pipeline. This volume was quite expensive and required maintenance to ensure that it was not running low on disk. At this point, we decided that the best solution was to redesign the whole pipeline. This diagram represents the main tasks that this pipeline needs to do. First, we need to fetch the transactions from the exchange and we need to generate
transaction files with those transactions. Afterwards, we need to process these transaction files to compute the daily and monthly account statistics and finally, using these account statistics, we need to generate the final accounting reports. The main requirements of the pipeline are fault tolerance and reliability.
If something goes wrong, we need to be aware of it and we should fix it quickly by running those steps of the pipeline that are affected by the issue. In terms of storage, we need fast reads and writes, high availability, high durability
and the storage should be cheap. We also need good processing performance: it shouldn't take more than a couple of hours to generate the accounting reports. Finally, we need the pipeline to be scalable. The number of transactions at Smarkets continues to grow and we don't want to have to redesign the whole pipeline anytime soon.
In the rest of the presentation, I'm going to describe the design decisions that we made to meet these requirements and also the technologies that we chose. The accounting pipeline involves fairly long batch jobs and things can go wrong while they are running.
In particular, in our case, the communication with the exchange to fetch the transactions may fail. In order to provide fault tolerance and reliability in this scenario, we made the following design decisions. We store the transactions per day and we also compute the financial stats per day. So if something goes wrong on a particular day, we just need to recompute the financial
stats for that day and not for the whole month. Sometimes things go wrong in the exchange, and this creates problems such as missing transactions or other sorts of data corruption. In order to reduce the impact of these issues on the accounting pipeline, we always
compute the stats for the last two days' worth of transactions. And finally, we broke down the pipeline into modular Luigi tasks. Luigi is a Python library that allows you to define dependencies between tasks and it handles the dependency resolution for you. By breaking down the pipeline into Luigi tasks, it's really easy to identify when
things go wrong, which steps of the pipeline are affected, and to rerun only those steps instead of the whole pipeline to regenerate the accounting reports. This is a simplified version of a task that we have in the pipeline, which basically generates a human-readable report with account statistics.
A Luigi task is a Python class that in general defines three methods. The requires method allows us to declare all the dependencies of the task. In this case, the generateHumanReadableAccountingReport task depends on the output of another Luigi task that generates a file with account statistics, but in a binary format. The run method is where the processing takes place: in this case, reading the input file with the account statistics and generating the TSV file with those stats. And the output method allows us to define the target of the task; in this case, the report that we want to generate.
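A minimal sketch of what such a pair of Luigi tasks might look like; the file paths, the upstream stats format and the sample data are invented for illustration, this is not the exact code shown on the slide:

```python
import datetime

import luigi


class GenerateAccountingStats(luigi.Task):
    """Stand-in for the upstream task that produces the account statistics."""
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget("stats-%s.csv" % self.date)  # invented path

    def run(self):
        with self.output().open("w") as out:
            out.write("account_id,deposits,withdrawals\n42,100.0,25.0\n")


class GenerateHumanReadableAccountingReport(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # Declare the dependency; Luigi resolves the graph and only reruns
        # tasks whose output target is missing.
        return GenerateAccountingStats(date=self.date)

    def run(self):
        # The processing step: read the stats and emit a TSV report.
        with self.input().open() as src, self.output().open("w") as dst:
            for line in src:
                dst.write(line.replace(",", "\t"))

    def output(self):
        # The target of the task: the report that we want to generate.
        return luigi.LocalTarget("report-%s.tsv" % self.date)  # invented path


if __name__ == "__main__":
    luigi.build([GenerateHumanReadableAccountingReport(date=datetime.date(2017, 7, 11))],
                local_scheduler=True)
```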
This graph is a simplification of the dependency graph generated by the Luigi Central Scheduler. The node at the top represents the task that we trigger.
In this case, that will be generateHumanReadableAccountingReport. And below it, you can see all the levels of dependencies: generateHumanReadableAccountingReport depends on the output of generateAccountingReport, which in turn depends on the output of many generateAccountingMonthlyStats tasks.
The color of the nodes indicates the status of the task: yellow means pending, blue means running, and green means completed. The next requirement that we wanted to achieve was efficient storage. In order to meet this requirement, we focused on two aspects: the format of the files generated by the pipeline and also where to store all these
files. Regarding the format, instead of going for a conventional row-based format like TSV or CSV, we decided to use the columnar format Parquet. The difference between a row-based and a columnar format is the way data is stored
on disk. In a row-based format, the values of the rows are stored sequentially on disk, and this is a good idea if our access pattern consists of accessing the values of particular records.
On the contrary, in a columnar format, the values of the columns are stored sequentially on disk, and this offers good performance for analytical tasks like the ones in this pipeline, since it allows us to fetch only those columns that need to be processed instead of having to load the whole file into memory, and this minimizes the amount of I/O. Apart from that, since data of the same type is stored together, type-specific encodings can be used, and general compression algorithms work better, which maximizes the compression ratio of these files and further reduces I/O. Parquet files can be loaded into Pandas data frames, and Parquet is also supported by the whole Hadoop ecosystem.
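As a rough illustration of the column-projection benefit described above, here is a small Pandas sketch; it assumes a Parquet engine such as pyarrow is installed, and the data is made up:

```python
import pandas as pd

# A small transactions frame written to Parquet.
df = pd.DataFrame({
    "account_id": [1, 2, 1],
    "amount": [10.0, -5.0, 2.5],
    "kind": ["deposit", "withdrawal", "bet"],
})
df.to_parquet("transactions.parquet")

# Column projection: only the 'amount' column is read from disk, which is
# where the I/O saving of the columnar layout comes from.
amounts = pd.read_parquet("transactions.parquet", columns=["amount"])
print(amounts["amount"].sum())  # 7.5
```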
In terms of persistent storage, we decided to go with Amazon S3, since it satisfies all the requirements that we were looking for. It provides high durability: for regulation purposes, we need to keep the accounting reports for several years, so high durability is very important for us. High availability: we should be able to access the reports whenever we want. Low maintenance: we don't need to worry about running low on disk. Amazon S3 is quite cheap. It also allows us to decouple the processing from the storage.
And what this means is that we can choose the instances of the pipeline based on our processing needs, instead of having to worry about high disk requirements, since all the data that we actually want to persist can be stored in S3. S3 can be accessed from Python using the libraries Boto or Boto3, and it comes with a nice web interface where you can check all the data that you have stored.
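A minimal sketch of that access with Boto3; the bucket and key names are invented:

```python
import boto3

s3 = boto3.client("s3")

# Upload a day's Parquet output to S3.
s3.upload_file("transactions-2017-07-11.parquet",
               "example-accounting-bucket",
               "transactions/2017-07-11.parquet")

# Download it again, e.g. for local inspection.
s3.download_file("example-accounting-bucket",
                 "transactions/2017-07-11.parquet",
                 "/tmp/transactions.parquet")
```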
The next requirement that we wanted to meet was good processing performance. We wanted fast data processing, and we also wanted an engine that was able to scale. That's why we decided to go with Spark. Spark is a general-purpose data processing engine.
And what Spark does is it breaks down the processing jobs into tasks and identifies those tasks that can be run in parallel on different data partitions. And it builds its own execution plans. By doing so, Spark can do a lot of processing in parallel.
Another feature that allows Spark to be really fast is that it keeps data in memory when possible instead of storing intermediate results in disk. And Spark comes with Python support through the PySpark library.
At the core of Spark, we have the RDDs, which are the fundamental unit of data in Spark. RDDs are resilient because they are immutable, and they are fault tolerant. They are also distributed because they are partitioned across multiple nodes in the Spark cluster.
And they are a data set because they hold data. There are two kinds of operations that can be applied to RDDs: transformations and actions. A transformation applies a function on the RDD and creates a new RDD. Examples of transformations are map, filter and aggregate. Actions, on the other hand, return a final result or write data to external storage. Transformations in Spark are lazy: they are not executed right after they are called. Instead, the transformation itself is saved, together with a reference to the data that it modifies. This is called the Spark lineage, and it allows Spark to be very efficient and also fault tolerant. The transformations are only executed when an action is triggered.
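A small PySpark sketch of lazy transformations followed by an action; the sample data is made up:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "rdd-demo")

transactions = sc.parallelize([("deposit", 10.0), ("bet", -2.0), ("deposit", 5.0)])

# Transformations are lazy: these lines only record the lineage,
# no work is executed on the cluster yet.
deposits = transactions.filter(lambda t: t[0] == "deposit").map(lambda t: t[1])

# The action triggers execution of the whole lineage.
print(deposits.sum())  # 15.0

sc.stop()
```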
In our accounting pipeline, we didn't use RDDs directly; we decided to work with Spark data frames instead. Spark data frames are units of data organized in columns and built on top of RDDs, but their performance is better than that of raw RDDs, since optimizations are applied before the actual operations are executed. The data frames API is also more user-friendly than the RDD one.
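A hedged sketch of the same kind of aggregation with Spark data frames, assuming Spark 2.x's SparkSession API; the S3 paths are invented (on EMR, s3:// paths work via EMRFS):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("df-demo").getOrCreate()

df = spark.read.parquet("s3://example-bucket/transactions/2017-07-11/")

# The optimizer can prune columns and reorder this plan before execution,
# which is why data frames usually outperform hand-written RDD code.
daily_stats = df.groupBy("account_id").agg(F.sum("amount").alias("total"))
daily_stats.write.parquet("s3://example-bucket/stats/2017-07-11/")
```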
I'm going to explain how a Spark application runs. Spark follows a master-slave architecture with a central coordinator called the driver and several distributed workers called executors. The driver instantiates the Spark context, which is in charge of breaking down the processing
job into tasks and creating the execution plans. Once Spark has these execution plans, the task scheduler within the Spark context is going to ask the cluster manager for executors to run these tasks. Spark has its own cluster manager
and it also supports other cluster managers such as Hadoop YARN. Spark jobs can be triggered from Luigi: Luigi comes with a PySparkTask that can be extended to create custom Spark jobs.
In this case, all the Spark operations can be defined in the main method of the class. So here, for example, in this task, we are creating a report with accounting statistics for a particular account by filtering out the rest of the account IDs.
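A rough sketch of what such a task might look like, using luigi.contrib.spark.PySparkTask; the input and output paths are invented, and this is not the exact task from the slide:

```python
import luigi
from luigi.contrib.spark import PySparkTask


class GenerateAccountReport(PySparkTask):
    """Filter the account statistics down to a single account."""
    account_id = luigi.IntParameter()

    def output(self):
        # Invented output location.
        return luigi.LocalTarget("reports/account-%d.parquet" % self.account_id)

    def main(self, sc, *args):
        # PySparkTask hands us the SparkContext; the Spark operations live here.
        from pyspark.sql import SQLContext
        sql_context = SQLContext(sc)
        stats = sql_context.read.parquet("stats/monthly/")  # invented input path
        mine = stats.filter(stats.account_id == self.account_id)
        mine.write.parquet(self.output().path)
```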
The final requirement that we wanted to achieve was scalability. In order to meet this requirement, instead of having Spark running on a single node together with the rest of the pipeline components, we wanted to configure Spark to run on a multi-node cluster.
Instead of configuring our own cluster, we decided to use Spark on EMR. EMR provides fast deployments. It takes around 10 minutes to provision a cluster. It is quite easy to use. Once you know the types of instances that you want for your pipeline and your software
requirements, doing the configuration is quite easy. It is really flexible. It allows you to choose among many different kinds of instances, frameworks to install, and you can even install external software. It comes with the EMR file system, which integrates with S3. So all the logs of the cluster together with the data generated by the cluster can be stored in S3.
The cluster can be shut down once the processing job is done without any real data loss: all the data that you want to persist can be stored in S3. The cost of running the cluster is quite low, and you only pay while the cluster is running.
It comes with a nice web interface where you can check the configuration of the cluster, the tasks that you've run in the cluster, the logs, and so on. I'm going to explain a little bit how Spark runs on EMR.
EMR has two kinds of nodes, a master node and several slave nodes. The master node distributes the data and the tasks across the rest of the nodes, checks the status of the cluster and also the status of the tasks running on the cluster.
And the slave nodes are in charge of running the tasks and also storing the data on the file system of the cluster. EMR uses YARN as the resource manager to allocate resources to the tasks submitted to the cluster.
When we submit a Spark application to EMR, the application is going to run on several YARN containers in the slave nodes. The first thing that happens is that the Spark driver instantiates the Spark context.
And this Spark context is going to create the execution plan. Once we have this execution plan, the Spark context is going to ask the YARN resource manager for executors to run these tasks. The Spark executors which are running on different YARN containers are going to register with the Spark context.
And the Spark context can then start sending tasks for execution to the Spark executors.
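For illustration, this is roughly how a Spark application can be submitted to a running EMR cluster as a step using Boto3; the cluster id, script location and arguments are placeholders:

```python
import boto3

emr = boto3.client("emr", region_name="eu-west-1")

# Submit a Spark application to an existing cluster as an EMR step.
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster id
    Steps=[{
        "Name": "daily-account-stats",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster",
                     "s3://example-bucket/jobs/daily_account_stats.py"],
        },
    }],
)
```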
To finish, I'm going to summarize the main steps involved in the accounting pipeline. First, we use Jenkins to trigger the accounting job. The reason behind this is that Jenkins allows us to schedule the job so that it runs daily, and it's also the central place where we have most of our batch jobs, so we can easily monitor that they are all running fine.
The first thing that this accounting job is going to do is create the accounting container with the latest image from the registry. And once this container is up and running, the Luigi Central Scheduler will be started. Then, a Luigi task that has as requirements
all the Luigi tasks that generate final reports is going to be triggered. The first Luigi task that needs Spark for processing is going to create the Spark cluster on EMR. And once this Spark cluster is up and running, all the Luigi tasks that need Spark processing
are going to submit Spark applications to EMR. Once the last Luigi task that needs Spark is done, the Spark cluster will be destroyed. Finally, all the data generated by the pipeline together with all the reports can be found in S3.
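A hedged sketch of that cluster lifecycle with Boto3; the instance types, counts and bucket names are illustrative, not Smarkets' actual configuration:

```python
import boto3

emr = boto3.client("emr", region_name="eu-west-1")

# Provision a small Spark cluster.
cluster = emr.run_job_flow(
    Name="accounting-pipeline",
    ReleaseLabel="emr-5.8.0",
    Applications=[{"Name": "Spark"}],
    Instances={
        "MasterInstanceType": "m4.large",
        "SlaveInstanceType": "m4.large",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    LogUri="s3://example-bucket/emr-logs/",  # invented log bucket
)

# ... Luigi tasks submit their Spark steps while the cluster is alive ...

# Shut the cluster down once the last Spark-dependent task has finished.
emr.terminate_job_flows(JobFlowIds=[cluster["JobFlowId"]])
```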
This is the end of my talk. Thank you very much for coming. Thanks very much. Are there any questions from the audience?
Thanks for the talk. You said you use Luigi. Did you do any comparison with Airflow? No, we mainly use Luigi at Smarkets. I hadn't used Airflow before. Okay, so no pros and cons? Not really, I've never used Airflow myself. Yeah, thank you. So, you said you started up an EMR cluster and ran multiple Luigi tasks on it
and then shut it down at the end. It's not quite like that. We run the Luigi tasks within a node, and the Luigi tasks submit steps to the EMR cluster. We don't run the Luigi tasks on the cluster itself. Okay, I was wondering if you had a way
for Luigi tasks running outside the cluster to submit jobs to the cluster. Yeah, so we had to create our own PySpark task. Okay, cool. Is it open source? No, it's not open source, but I can show you the code. It's not that hard. Okay, cool, thanks. You mentioned that you save data in Parquet files which are in S3.
Earlier there was another great talk which mentioned that they were also using Parquet files and saving them in Azure, and they showed a wrapper which allows you to access these Parquet files stored in Azure from a local computer like any other file object. Is that even possible with S3?
Can I work with those files in S3 from my laptop? So, Parquet files are usually folders, right? They are not just a single file. When we are working in S3, since we are within the same file system, you can access them easily with Boto and read them with Pandas or with Spark. You don't have to do anything; you just read them with Spark, it comes out of the box. However, if you are on your local machine, then what we do when we want to test is recursively download the folder
so that we can then read it with Spark or with Pandas. Thanks.
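A minimal sketch of that recursive download with Boto3; the bucket and prefix are invented:

```python
import os

import boto3

s3 = boto3.client("s3")
bucket, prefix = "example-accounting-bucket", "stats/2017-07-11/"  # invented names

# A Parquet dataset written by Spark is a folder of part-files, so we
# list every object under the prefix and download each one locally.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        local_path = os.path.join("/tmp", obj["Key"])
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        s3.download_file(bucket, obj["Key"], local_path)
```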
Maybe time for one more question? Yeah, sure, go ahead. You said you submit those Luigi jobs to one cluster, and you mentioned that you start the cluster and shut it down just for that run. Yeah. How do you manage all those tasks running on that one cluster, and when the cluster shuts down after it's... That's a good question. So, the first task that needs EMR
is going to be the one in charge of creating the cluster. And then, as long as some of the tasks still need EMR, we are not going to kill the cluster. We only kill it when the last task
that needs EMR is done. And we do so using Luigi event handlers: basically, we check whether all the tasks that have been scheduled are done.
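A rough sketch of that teardown check with a Luigi event handler; the bookkeeping of pending Spark tasks is invented:

```python
import luigi

# Would be filled in as Spark-dependent tasks are scheduled (invented here).
SPARK_TASKS_PENDING = {"GenerateDailyStats", "GenerateMonthlyStats"}


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def check_cluster_teardown(task):
    SPARK_TASKS_PENDING.discard(task.task_family)
    if not SPARK_TASKS_PENDING:
        # Last Spark-dependent task is done: terminate the EMR cluster here,
        # e.g. with boto3's terminate_job_flows as sketched earlier.
        print("All Spark tasks done; tearing down the EMR cluster")
```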
Thanks. Anyone else? So, usually the thing with Spark is that you have to be very careful about partitioning and out-of-memory errors and things like that. Do you have any insight on the subject, or does Parquet handle the intermediate steps of that for you
or something like that? Yeah, to be honest, we didn't get any errors like that, maybe because we are not handling huge amounts of data. Okay, thanks.
How many rows? Yeah, in terms of rows, around 8.8 million, or rather twice that, because we always retrieve the last two days' worth, and those are just the new transactions used to generate the reports. We also have pre-processed data that we need to include in the reports. So, the raw transactions, the new transactions that we need to generate stats from, come to around 19 million, more or less.
Thanks. Anyone else? Nope, I can't see any. Okay, well, can we thank our speaker again? It was really good work. Thank you.