
From built-in concurrency primitives to large scale distributed computing


Formal Metadata

Title
From built-in concurrency primitives to large scale distributed computing
Title of Series
Number of Parts
131
Author
Contributors
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.
Identifiers
Publisher
Release Date
Language

Content Metadata

Subject Area
Genre
Abstract
This talk is specifically designed for Python developers and data practitioners who wish to deepen their skills in asynchronous code execution, from single CPU applications to complex distributed systems with thousands of cores. We'll provide a detailed exploration and explanation of Python's asynchronous execution models and concurrency primitives, focusing on `Future` and `Executor` interfaces within the `concurrent.futures` module, and the event-driven architecture of `asyncio`. Special attention will be given to the processing of large datasets, a common challenge in data science and engineering. We will start with the fundamental concepts and then explore how they apply to large scale, distributed execution frameworks like Dask or Ray. On step-by-step examples, we aim to demonstrate simple function executions and map-reduce operations. We will illustrate efficient collaboration between different concurrency models. The session will cover the transition to large-scale, distributed execution frameworks, offering practical guidelines for scaling your computations effectively and addressing common hurdles like data serialization in distributed environments. Attendees will leave with a solid understanding of asynchronous code execution underpinnings. This talk will empower you to make informed practical decisions about applying concurrency in your data processing workflows. You will be able to seamlessly integrate new libraries or frameworks into your projects, ensuring optimal development lifecycle, performance and scalability.
Transcript: English (auto-generated)
Many thanks for the introduction, and many thanks to the organizers for bringing this conference to Prague, my home town, for the second time actually. I'm really glad to see you all here. My name is Jakub. I work for a company called FLYR for Hospitality, where I'm responsible for a data science platform. As the title says, I would like to introduce you to the built-in concurrency primitives Python offers and how to scale up to large-scale distributed computing.
The content will be roughly this: an introduction to concurrency and parallelism, then Python's built-in primitives for concurrency, and then scaling out, where I'll show some examples of how to scale based on what we've learned from the Python built-ins.
So let's start. What does concurrency enable? Basically, it enables waiting efficiently, or put differently, it enables doing other things while waiting for results or for inputs and outputs, often an API responding. Imagine you are waiting in multiple queues at once: that is concurrency. I wish our country would learn how to operate multiple queues.
This is an old picture, not that ancient, just a couple of decades ago, where people waited in long queues often without knowing what the queue was for or what was in stock. It looks funny now; it was not funny at the time.
Concurrency also lets you organize your work efficiently. For example, if you're accepting a request for work, you can respond politely, "yes, I will do it", write a note, and eventually do the work and respond to the request. You can efficiently dispatch to multiple queues and multiple workers, and that's what we will show how to do with the Python APIs. Always remember there is some context switching involved, so doing things concurrently is not a cure for every task.
There is some overhead from context switching. Parallelism is often connected to concurrency, but what it really enables is executing multiple things at once. Concurrency does not need parallelism, although parallelism is typically desired, and since this talk is aimed at data processing and machine learning contexts, we usually care about both. Vectorized CPU operations, for example, are parallel.
Our laptops have multiple cores that operate in parallel, so that's another example. Where do we need concurrency? Typically in web servers, high-performance computing, data engineering, machine learning. These are very broad areas; probably everyone here would find their place in one of them.
To decide between concurrency and parallelism, I use the frequent analogy of coffee machines. With a single machine you can serve multiple queues, but you still produce one coffee at a time; if you project the time onto a vertical scale, it takes longer. Parallelism actually brews two coffees at the same time. Still, you can make two people happy with either of these two concepts.
In data processing we often really care about both: we need our processes to be responsive, which is where concurrency applies, and we also need to process efficiently and fast, which is where parallelism is mostly used.
So let's take a look at Python's built-in concurrency primitives. The module I'm going to talk about most is concurrent.futures, which was proposed in 2009 and introduced in Python 3.2. It's very concise; there's not too much in there, but it very nicely shows the abstractions and the API, and it also provides implementations of the concurrency primitives.
There are other modules, threading, multiprocessing, subprocess; asyncio I will also cover a bit, but I will not go into the details of any of these. So let's start looking into concurrent.futures, because it's a nice learning journey,
a good way to understand concurrency even if you're a beginner in this world, especially in a Python context. The first object we look at, the first type, is the Executor. There's a fairly abstract description in the documentation, but if you break it down and look at the API, the methods of this class, what it really does is this: we create an executor and choose some parameters, we submit tasks to the executor, we collect results and wait for them, and then we eventually shut the executor down. That's the life cycle of the concurrent workloads we run with an executor. To see this in code: to create an executor, there are two particular built-in ones in Python, ThreadPoolExecutor and ProcessPoolExecutor. Both take a max_workers parameter, and I think the names are quite self-explanatory: they use either threads or processes to create the workers where tasks are executed.
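To make that concrete, here is a minimal sketch (my own illustrative snippet, not the slide code):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Both built-in executors take max_workers; the names say which worker type they use.
thread_executor = ThreadPoolExecutor(max_workers=4)    # workers are threads
process_executor = ProcessPoolExecutor(max_workers=4)  # workers are separate processes
```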
The next step is to submit a task or tasks to the executor. Let's say we need to do some math, a very complex one in this case, and we want to calculate a single result. Let's use a thread executor now, just as an example; this works with any executor. We call the submit method, which is the principal one, and we pass a reference to the function along with its arguments and parameters. Notice that we do not call the function directly here. We let the executor execute the function internally, and the executor decides how, when and where it does so.
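A minimal sketch of submit, with an illustrative stand-in for the "complex math":

```python
from concurrent.futures import ThreadPoolExecutor

def complex_math(x, y):
    # stands in for the "very complex" calculation from the slides
    return x ** 2 + y

executor = ThreadPoolExecutor(max_workers=4)
# We pass the callable and its arguments; the executor decides how and when to call it.
future = executor.submit(complex_math, 2, y=3)
```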
We can also use the map method, which is very similar to the built-in map function and returns an iterable, a generator. So map executes, or potentially executes, the function on a collection, an iterable object; here we just use a range. So how do we collect results? Let's look at a single result first, and here we meet the second, and maybe even the most important,
type in concurrent.futures, which is called Future. If we look at the return value of the previous submit call, it shows us a Future, something that carries a state. In this case it's already finished, but it may not be. A Future is a placeholder, basically a promise of an eventual calculation and returned result. It may end up not finishing, it may end up in an exception, an error, but hopefully it ends up finished and returning the result we are interested in. The result is fetched with the result method, and you can also pass a timeout, because result is blocking: when we call result, we are actually waiting in our code, in contrast to submit. Submit does not block the caller; it returns a Future instantly. We can also ask the future object whether it is done.
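A small sketch of the Future life cycle, assuming the executor and function from before:

```python
future = executor.submit(complex_math, 2, y=3)

print(future)                     # e.g. <Future ... state=finished returned int>
print(future.done())              # True once the calculation has finished
print(future.result(timeout=10))  # blocks (up to 10 s) until the result is available: 7
```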
We can also request to cancel that execution. And if we want to collect multiple results and we used map, we basically iterate over the results: the generator returned by map yields the results one by one, in order, as they become available. There is a shortcoming, though: if we start with a slow task, it may hold back the faster ones that come after it. So maybe this is not the optimal way, unless you want all the results at once anyway.
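A sketch of that behaviour, with sleeps standing in for tasks of different speed:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def work(seconds):
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor() as ex:
    # Results come back in submission order, so the slow first task (3 s)
    # delays the already-finished faster ones.
    for result in ex.map(work, [3, 1, 2]):
        print(result)   # 3, 1, 2
```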
We can also use somewhat more advanced and fine-grained functions in concurrent.futures. Instead of map, for example, we can get a list of futures by calling submit multiple times, which I do in the first line here. Now, if we have multiple futures, we can use as_completed on them to get a generator that yields the completed ones as soon as any of them completes.
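A sketch of as_completed, assuming the same illustrative work function:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def work(seconds):
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor() as ex:
    futures = [ex.submit(work, s) for s in (3, 1, 2)]
    for fut in as_completed(futures):   # yields each future as soon as it finishes
        print(fut.result())             # 1, 2, 3
```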
Another function, probably even more powerful and fine-grained, is wait. We can wait for the first completed future, for all of them to complete, or for the first exception. If we wait for the first completed, it promptly gives us the first result that becomes available, and it actually returns two sets of futures: the ones that are done and the ones that are not done yet. At this point we may decide what to do with those that are not done. We can, for example, cancel them: say we call four APIs or run four calculations and we are only interested in the first, fastest one, so we just cancel the slow ones.
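A sketch of wait with FIRST_COMPLETED; the four "API calls" here are simulated with sleeps:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import random
import time

def call_api(i):
    time.sleep(random.uniform(0.1, 1.0))   # simulated network latency
    return i

with ThreadPoolExecutor() as ex:
    futures = [ex.submit(call_api, i) for i in range(4)]
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print(next(iter(done)).result())   # the fastest result
    for fut in not_done:               # we only wanted the first one
        fut.cancel()
```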
At the very end, we need to shut down the executor. Typically we don't do it explicitly; often it's simply the end of the Python process that created the executor. If that's not the case, one would more often use the context manager to manage the executor's lifecycle, which is very convenient because you don't need to call these methods one by one. What the shutdown really does, or should do, is free up any resources.
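A sketch of the context-manager form (the function is illustrative):

```python
from concurrent.futures import ProcessPoolExecutor

def add(x, y):
    return x + y

if __name__ == "__main__":
    # __exit__ calls shutdown(), so the worker processes are freed automatically.
    with ProcessPoolExecutor(max_workers=4) as ex:
        print(ex.submit(add, 2, 3).result())   # 5
```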
So if we spawn processes, we would like to kill those processes once we don't need the executor anymore. Now, one example where it may not be obvious what happens and why: say one wants to produce a list of random numbers with the process pool executor,
random integers in this case. The first call should yield eight random integers from 0 to 100, but what it really does is yield the very same number every time, which is not random. Well, on the first execution it could still be random by chance, but with very low probability, and if you execute it again you would still get the same values. The explanation is that the random function is not actually a pure function: there is an object behind it that maintains state, and in the process pool executor that state gets cloned into the workers but is not updated there. So there are pitfalls you can fall into if you are not careful.
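A rough reconstruction of the pitfall; I'm assuming NumPy's global RNG here, the exact snippet in the talk may differ:

```python
from concurrent.futures import ProcessPoolExecutor
import numpy as np

def draw(_):
    # Uses NumPy's global RNG. On fork-based platforms every worker process
    # inherits the same cloned RNG state, so the draws can come out identical.
    return np.random.randint(0, 100)

def draw_seeded(seed):
    # A common fix: give each task its own generator (or pass explicit seeds).
    return np.random.default_rng(seed).integers(0, 100)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=8) as ex:
        print(list(ex.map(draw, range(8))))         # possibly the same number 8 times
        print(list(ex.map(draw_seeded, range(8))))  # independent draws
```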
One limitation of the thread pool, famously or infamously known, is the global interpreter lock, the GIL. You may know that in future, not too distant, Python versions this may no longer be a limitation, but for now it still is. However, the GIL is released during IO operations, so for IO-bound work the thread pool executor is a good fit. Extensions like NumPy, Pandas or TensorFlow that are implemented in C also release the GIL, so if you run NumPy in threads, it can run efficiently in parallel. For the process pool there is a different limitation, which is serialization.
In the process pool executor we need to pickle: concurrent.futures uses the built-in pickle module. So, for example, if we want to submit a lambda, the outcome is a pickling error, because pickle cannot pickle a lambda. There are other unpicklable objects too, so we always need to send something that can be serialized.
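A sketch of hitting that limitation:

```python
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor() as ex:
        fut = ex.submit(lambda x: x + 1, 41)
        fut.result()   # raises a PicklingError: the built-in pickle cannot serialize a lambda
```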
How can we resolve this? There's one very nice project from the joblib world called Loky. It's basically a drop-in replacement for the process pool executor, but on the back end it uses cloudpickle. Cloudpickle is a serialization library that can pickle things like lambdas because it takes a different approach to serializing code; besides cloudpickle there's also dill that can do it. So if a process pool executor is what you want but you would be limited by pickling, this drop-in, one-to-one replacement can help.
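A sketch of the drop-in replacement, assuming loky is installed:

```python
from loky import get_reusable_executor

ex = get_reusable_executor(max_workers=4)   # ProcessPoolExecutor-like API
fut = ex.submit(lambda x: x + 1, 41)        # works: loky serializes with cloudpickle
print(fut.result())                         # 42
```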
How does concurrent.futures integrate with asyncio? Asyncio, just as a short recap without going into details, is a cooperative multitasking concept: we use the async/await syntax, and coroutines are executed by an event loop that manages task switching. So it's a slightly different implementation of concurrency compared to concurrent.futures. A bit confusingly, asyncio also has a Future type, which is not the same as the concurrent.futures Future. I think the naming makes sense because it does basically the same thing, it just has a different API, and because it's so similar we can convert between concurrent.futures futures and asyncio futures. One way to convert is asyncio.wrap_future. Or, maybe even more often, we would use the event loop's run_in_executor method: if we have an executor and want to run something a bit outside asyncio's governance, we can run it in the executor like that and get an awaitable directly. Fundamentally, this removes the traditional, or at least traditionally perceived, limitation that asyncio does not support CPU-bound concurrency or parallelism. It's a very nice, easy way to make this available in asyncio directly.
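A sketch of both bridges, with an illustrative CPU-bound function:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as ex:
        # Option 1: run_in_executor hands the call to the pool and gives us an awaitable.
        print(await loop.run_in_executor(ex, cpu_heavy, 1_000_000))
        # Option 2: wrap an existing concurrent.futures Future into an awaitable one.
        cf_future = ex.submit(cpu_heavy, 2_000_000)
        print(await asyncio.wrap_future(cf_future))

if __name__ == "__main__":
    asyncio.run(main())
```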
A few practical examples; we could find many, many more, but I've picked two. The first is quickly coded parallel batch processing: for example, running pandas pipelines on multiple files, or a hyperparameter grid search, probably with a process pool executor. Just spawning the processes is a very simple way to orchestrate that, either on your own machine or on another machine you log into remotely. The second is non-blocking data processing in a web server, which is especially relevant if that web server is implemented on top of asyncio.
Say we have FastAPI, and some API endpoint has to run a heavier CPU-bound task, but we still want to maintain the asyncio concurrency. If we plug in a process pool executor, or possibly a thread pool executor, and use the asyncio integration I showed just before, we can keep that web server responsive.
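A rough sketch of that pattern; the endpoint and function names are mine, not from the talk:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI

app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)

def heavy_pipeline(n: int) -> int:
    # stands in for a CPU-bound pandas/NumPy pipeline
    return sum(i * i for i in range(n))

@app.get("/compute/{n}")
async def compute(n: int):
    loop = asyncio.get_running_loop()
    # The event loop stays free to serve other requests while a worker process crunches numbers.
    result = await loop.run_in_executor(executor, heavy_pipeline, n)
    return {"result": result}
```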
Streaming would be similar: instead of FastAPI, one can imagine a pub/sub queue or any incoming messaging where the process that receives the messages needs to stay responsive. What we need to be careful about in any case is resource utilization, and in particular memory, because if you run out of memory, something typically gets killed, and often we don't even know what it is or is going to be. So far I've shown the built-in API, with the small exception of Loky,
basically the abstraction of executors and futures and how they can be used. Now we are going to see how this knowledge, basically the same concepts, can be used and, I should say, scaled towards larger-scale distributed computing. I will use two frameworks, Dask and Ray.
This talk is not about showing all the details, nor about comparing the two; I'll just show that basically the same concepts from concurrent.futures and the built-in abstractions are present in these big-data frameworks.
So what happens now? We want to scale out. What could be the driver? We need to process huge data sets, the calculation is just too heavy, there is a lot of data crunching, or we need to do something repetitively, a big grid search or tuning a machine learning model, and it no longer fits on a single machine. Sometimes we also have reasons that are not resource-related: there could be security compliance requirements that mean we just cannot do things on our local computer, and the frameworks can help us proxy the work to somewhere where the compliance is met. Or, even more simply, you just want to turn your computer off while letting some calculation run, and you can also do that with the techniques I'm going to show. So basically there are two main drivers, two main types of resources
for scaling out: either memory or computing power. I mentioned CPUs, but there are also other processing units, GPUs and whatever other PUs. With memory, typically we would say the data do not fit into my computer's memory. The symptoms: out-of-memory kills, the operating system swapping, making your machine unresponsive. That's how one typically recognizes something is wrong with memory usage. The other type of resource is processing power, and the typical situation is that the processing just takes too long; it still runs, it just takes too long. Sometimes this can be confused with memory because, as I mentioned before, if you start swapping the computer becomes slow. So we may also need to look at processor usage, or just measure its temperature; if it's too hot, which can happen easily these days when it's 35 degrees outside, it's probably processing power we are lacking.
Before even making the decision to scale, we should go through a checklist and make sure we have actually taken all the steps to avoid it, because we can spend quite some budget and effort there, and scaling out may not be as fast and efficient as hoped. So remember to profile your code and optimize based on the profiling; usually 20% of the code base, or even less, does 80% of the work, the well-known Pareto rule. We can also take approaches like memory mapping to save memory while keeping data on hard drives or SSDs, or processing data in chunks if possible. If we can still run on a single machine, we don't need this big scaling out; we can use, for example, a Dask array to help with that, since data chunking
can be very straightforward with these tools. So now let's take a look at how we use Dask and Ray. I will not show much about how they actually build big clusters under the hood; I will mention it, but you can imagine that the resources we use here can be scaled to very large sizes. Dask may be better known for its DataFrame and array-like APIs. However, to quote the documentation, it is a Python library for parallel and distributed computing, and it's fun. It is inspired heavily by the concurrent.futures API,
as we will see soon, but it can also scale to very big clusters. Let's look at the API very quickly; it should not surprise us now that we know what concurrent.futures looks like. Instead of an executor, we have a client. The client can submit, similarly to what we saw in concurrent.futures, and as a result we now get a Dask future which we can wait for, and we can also gather Dask futures. There are some differences, for instance in the map behaviour, but we understand them once we use it. If we need 100% compatibility, Dask can provide a concurrent.futures executor instance via the Dask client's get_executor method, because the Dask distributed future is not 100% compatible with the concurrent.futures Future.
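A minimal sketch with a local cluster (the function and values are illustrative):

```python
from dask.distributed import Client

def square(x):
    return x ** 2

if __name__ == "__main__":
    client = Client()                        # local scheduler + workers
    fut = client.submit(square, 10)          # returns a Dask future
    print(fut.result())                      # 100

    ex = client.get_executor()               # concurrent.futures-compatible executor
    print(ex.submit(square, 11).result())    # 121
```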
At some point we probably need to decide whether to work with Dask natively and use its specific features, which I will mention, or to stick to full concurrent.futures compatibility. Ray, at the level I'm going to show, is very similar to Dask. I think it focuses more on machine learning
and AI workloads, although that's present in Dask too, so this is a very brief comparison and introduction. The typical example of how we do things in Ray is that we decorate a function with the ray.remote decorator and then call the resulting remote method, which does basically the same as submit; it's just that the API is different, and to get the results we usually use ray.get. The output is an object ref, so it's not a concurrent.futures Future, but we can actually turn it into one with its future method.
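A minimal sketch, assuming a local Ray runtime:

```python
import ray

ray.init()

@ray.remote
def square(x):
    return x ** 2

ref = square.remote(10)    # like submit: returns an ObjectRef immediately
print(ray.get(ref))        # 100

fut = ref.future()         # bridge to a concurrent.futures.Future
print(fut.result())        # 100
```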
There is also a recent, still active pull request to implement a Ray executor similar to what I showed for Dask, so we can get full concurrent.futures compatibility if we need to. Both these frameworks integrate well with asyncio; they just differ a bit in syntax. Ray is maybe a bit more straightforward because we can await Ray object refs directly, while with Dask we need to switch the asynchronous mode on, but in both cases we get very straightforward integration with asyncio and awaitable objects from Dask or Ray execution.
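A hedged sketch of awaiting remote results from Ray and Dask inside asyncio (both runtimes are started locally here; this is illustrative, not the talk's code):

```python
import asyncio
import ray
from dask.distributed import Client

def square(x):
    return x ** 2

square_task = ray.remote(square)   # a Ray task built from the same plain function

async def main():
    # Ray: ObjectRefs are directly awaitable inside a coroutine.
    ray.init()
    print(await square_task.remote(10))      # 100

    # Dask: switch the client into asynchronous mode, then await its futures.
    client = await Client(asynchronous=True)
    print(await client.submit(square, 10))   # 100
    await client.close()

if __name__ == "__main__":
    asyncio.run(main())
```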
As for the architecture, I don't want to go into details, but in both, if we spawn a cluster, we get a scheduler node and worker nodes, and the scheduler coordinates the work. Quite importantly for scaling, we have multiple very straightforward options to deploy at a big scale: Kubernetes using Kubernetes operators, the cloud including managed solutions, or even high-performance computing job queues like PBS or Slurm, if that's your cup of tea or coffee.
What Dask and Ray solve on top of concurrent.futures is data management in the distributed world. They try to manage data efficiently; they take somewhat different approaches under the hood, but at the top level they store data on the workers, if you want them to, and they also schedule tasks close to the data if possible. Very importantly, we can use references to data,
basically remote data, as inputs to our functions, which would not be possible with concurrent.futures directly; you would have to handle that communication or resolution manually. As an example, with ray.put we put some data into the cluster and Ray stores it; now we have a remote function, process_data, and we can call process_data.remote directly on the data ref. Remember, I could also pass the data directly here and Ray would send it to the cluster, but since the data already exists in the cluster, it is simply picked up by the task when it executes there. So the communication and serialization overhead is largely reduced.
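An illustrative sketch of keeping the data in the cluster (function and variable names are mine):

```python
import ray

ray.init()

@ray.remote
def process_data(records):
    return len(records)

data = list(range(1_000_000))
data_ref = ray.put(data)                     # ship the data to the object store once

result_ref = process_data.remote(data_ref)   # pass the reference, not the data itself
print(ray.get(result_ref))                   # 1000000
```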
Similarly, we can build call graphs: because the result of a calculation is also an object reference or future, we can do the same with results as we did with data and chain calculations together. Passing futures and references around is a very powerful concept that both Ray and Dask implement.
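A hedged sketch of chaining tasks by passing futures (shown with Dask; Ray is analogous with ObjectRefs):

```python
from dask.distributed import Client

def load(part):
    return list(range(part * 10, (part + 1) * 10))

def total(chunk):
    return sum(chunk)

def combine(a, b):
    return a + b

if __name__ == "__main__":
    client = Client()
    a = client.submit(total, client.submit(load, 0))   # futures can be inputs to other tasks
    b = client.submit(total, client.submit(load, 1))
    grand = client.submit(combine, a, b)               # a small task graph built from futures
    print(grand.result())                              # 190
```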
We can also run nested tasks, that is, submit tasks from within tasks running remotely. It may seem obvious, but if you do it too much you can end up in a deadlock, because you simply don't have the resources to schedule those tasks. There are ways in Dask and Ray to deal with that. Also important: we can attach resource requests to task execution,
and this is not available in concurrent.futures. Both Dask and Ray can manage resources, which basically determines how many slots, and for how big tasks, are available on particular workers, so how much we can schedule and run at a given point in time or whether we need to wait for resources to free up. As an example, here is how we do it with Ray. What's also important is that we do it at runtime, so we can base the resource requests on, for example, the size of the data to be processed.
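A hedged sketch of per-task resource requests in Ray (the numbers are illustrative and must fit your cluster):

```python
import ray

ray.init()

@ray.remote(num_cpus=2)          # static request attached to the task definition
def heavy(x):
    return x ** 2

# Requests can also be set at call time, e.g. based on the size of the data:
ref = heavy.options(num_cpus=1, memory=512 * 1024**2).remote(10)
print(ray.get(ref))              # 100; the task waits until a worker has these resources free
```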
We also get fault tolerance; I don't want to go into details, but you probably know anything can fail in the software and hardware world, and there are means by which Dask and Ray can recover. There are challenges in distributed computing that Dask and Ray help with, but we still need to be careful about: communication, software environments and how we distribute Python packages across
workers. We also need observability and logging, because things are now happening somewhere remote, and possibly authentication, authorization and costs are important for us as well. How to choose between Ray and Dask? In a nutshell, based on what I've said, there are no big differences at the level I've gone into; both can manage big clusters and do asynchronous parallel computing. So the choice probably depends more on other features, maybe on integrations with your already existing stack, and so on.
That brings me to the summary. We've seen that Python provides powerful built-in concurrency abstractions and some basic implementations; in particular, concurrent.futures is the higher-level interface to look at to grasp these abstractions. We can get quite seamless integration with asyncio, and if we need to scale, we use frameworks like Dask or Ray that build upon the same abstractions but are able to resolve some of the shortcomings, like data serialization, communication and resource management, and scale to very large clusters. If you would like to have the slides, they are available on GitHub and also posted on Discord. And with that, I would like to thank you for your attention.
Alright, thanks a lot for this very interesting talk. We've already got questions on Discord. After the first question from Discord is answered, please also queue up at the front microphone here in the middle. The question was from Aina, I hope that's right, I'm sorry if it's not pronounced correctly: what's the difference between threading and concurrent.futures, since both provide ways to perform concurrent execution? Very shortly, the threading module is more low level and it also provides APIs for lower-level synchronization primitives like queues, locks and so on. So if you want more fine-grained threading control, go into threading, but if the level of abstraction in concurrent.futures is the right level, I would say use concurrent.futures and rely on that abstraction. In fact, if one goes into the implementation, the source code of the concurrent.futures thread pool executor, one would see the threading module used inside. All right.
That was it from the internet. For any questions, please queue up right now at the microphone. Thank you for your talk. I would like to ask how friendly Dask or Ray are with subprocesses, basically if you want to run something outside Python in your workflow? I would say very friendly. You can very easily spawn subprocesses from your caller context; you can also do threading, but also subprocesses. So similarly to the concurrent.futures process pool executor, Dask or Ray can do that locally too, if that was the question. If there are no further questions, then I would ask you for another round of applause for Jakub.