
Introduction to Reactive Programming with RxPY


Formal Metadata

Title
Introduction to Reactive Programming with RxPY
License
CC Attribution 2.0 Belgium:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal purpose as long as the work is attributed to the author in the manner specified by the author or licensor.

Content Metadata

Abstract
Reactive Programming is an event-based programming method. ReactiveX is a cross-platform implementation of Reactive Programming. It is heavily inspired by functional programming and contains many operators that allow you to create, modify, and combine streams of events. Moreover, it is composable and extensible. This short introduction presents Reactive Programming through RxPY, the Python implementation of ReactiveX.
Transcript: English (auto-generated)
OK, next up for today, please welcome Romain, who's going to explain to us what is reactive programming. Thank you. Hello everyone.
So my name is Romain Picard, and I will talk about reactive programming. So it's been a long day, but please stay awake, at least for the next 20 minutes. So, the presentation will start with an introduction on reactive programming, and more specifically,
ReactiveX. Then I will go through a simple example on how to use it, and we'll see how to deal with errors, and how to deal with concurrency, and I will conclude with a more realistic example on how ReactiveX can be used with something that we use to deploy our machine
learning models. Just before we start, let me introduce myself. So I'm a data scientist at SoftAtHome. SoftAtHome is a medium-sized company where we develop firmware for internet routers,
TV decoders, and voice assistants. So most of our developers are embedded developers doing C and C++, but on my team, we are doing almost exclusively Python. I'm also one of the developers of the RxPY library, and I am also the author of
a book on this topic. So let's go and start with just a simple poll. How many of you have already used reactive programming before? Raise your hand. Yeah. Five? Ten people? Okay. That's great.
Did you use ReactiveX for the few that used it? No? Yes, one maybe? Okay. So you're the correct audience for me today. That's fine. So to say it simply, and maybe in a vague way, reactive programming is one way to write
event-driven code. So it's not really tied to a programming language or paradigm. You can use it with imperative programming or object-oriented programming or functional programming. You can use it with, let's say, almost any programming language.
It's really just a way to structure code. It's done in a way that you write a computation graph. So on the right, you have some example of a graph that you can do. And the nodes represent computations that are done, transformations.
And the edges represent data flow. So basically, you just write a computation graph, and the transformations will apply to each item that flows through this stream of events. So you can write acyclic graphs, like on the top, but also cyclic graphs.
And in general, when you have a real application, you have both in it. It's mostly an acyclic graph, but with some parts that contain cycles. ReactiveX is one of the most widely used libraries for this kind of programming.
It was originally developed by Microsoft, and it was open-sourced in 2012. Since then, it's been written in many different languages. And RxPY is the Python implementation of ReactiveX. Last summer, we released version 3 of this library.
And it's a major release, mainly due to the fact that we use the pipe operator. You will see later what it means. But the big thing is that it makes it much easier to write your own operators and to extend the library.
Another great thing is that we now have documentation, so that's fine. That's really great because before, you had to read the code to see how it works. So now, you can read the docs online. So that's something that is really great, I think. And we dropped support for Python 2.
But anyway, nobody's using it now, I suppose. So this shouldn't be an issue. So if you want to understand what is ReactiveX, you basically need to understand three things. The first one is what is an observable.
An observable is just another name for a stream of events. And by the way, we do not name events, but usually we name them items. So an observable, it's something that emits some items. The second thing you have to know is what's an observer. So if you use object-oriented programming, you probably already heard this term.
And by the way, it's the same meaning. An observer, it's an entity that will receive some items. So it's something that subscribes to an observable and will receive its items. By the way, ReactiveX, it's more or less the observer design pattern that's implemented
in a functional way instead of an object-oriented way. And when you combine observables and observers, you can write operators. So an operator is a transformation that is done on the events that are emitted by the observables.
So let's see with an example. So this is what we call a marble diagram. It's similar to a UML sequence diagram but rotated. So the arrows represent observables. So here on the top, there's an observable that emits items 1, 2, 3, and 4.
The rectangles represent the operator. So here it's the map operator. And the map operator takes a function as a parameter and it applies this function to each item that is received.
So in this example, the function just decreases the value. So when item 1 is emitted, it goes through the map operator. The value is decreased and the map operator emits on a new observable the value 0. So we have this for all items that are emitted by the observer.
Sorry, the observable. So on this example, at the end, the map operator emits 0, 1, 2, 3. The nice thing here is that the type of the input, it's an observable. But the type of the output, it's also an observable. So operators take observables as input and also as output.
So this is what makes them easy to plug together. And basically, when you write a reactive application, you just plug many operators together and that's how you write your computation graph.
So let's see with an example. We'll write a very simple graph where we will chain only two operators. So this is another kind of diagram that I call a reactivity diagram. It's inspired from UML activity diagrams, but the meaning is different.
In an activity diagram, you represent a code flow. Here, the diagram represents a data flow. So on the top, you have a source observable. The black dots represent a source. And the items will flow through it. We will first use the map operator to decrease the values of items that are emitted.
And then we will use the filter operator to keep only the even values. And then the result will be an observable that emits the result of these two transformations. So this program is something that does not really have a start and an end.
Time-wise, it will start when the observable is started and it will end only if the source observable finishes at some time, which may not be the case depending on the source that we use. So let's see this with some code.
So we start with a few imports to get the ReactiveX modules loaded. There's one for all the base functions and another one dedicated to all the operators that are available. We create an observable, in this case, from a list.
So in a real case, usually the source is something that comes from an IO like a network connection, or it could be a timer that will tick every second or things like that, or it can be a user input from a UI. But here, for the example, we create a hard-coded observable
that will emit items 1, 2, 3, 4. Then we will declare the computation graph that we want to do on this observable. So this is where we use the pipe method on the source observable. And we will just chain the map operator and the filter operator.
So on the map operator, we tell it to decrease the value for each item that is emitted. And then on the filter operator, we tell it to keep only the even values. So the values whose modulo 2 is 0. The result of this call is also an observable.
So what source.pipe returns, it's also an observable. So if I run it this way, nothing happens because we just have something that created observables, but nothing is executed yet. We just declared the computation graph that we want to do.
So observables are lazy. They only emit items when somebody subscribes to them. So that's what we have to do next. We subscribe to this observable. And in our case, we just print each value that is emitted at the end.
So we use the next callback to, in our case, print the value. And there are two other callbacks that are available to be notified when the observable completes or when an error occurs. And this subscription function returns a disposable object that can be used to unsubscribe from the observable.
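(The code itself isn't reproduced in the transcript; a minimal sketch of the example being described, assuming the RxPY 3 API, might look like this:)

```python
import rx
import rx.operators as ops

# Hard-coded source observable built from a list: it will emit 1, 2, 3, 4.
source = rx.from_([1, 2, 3, 4])

# Declare the computation graph: decrement each item, then keep only even values.
composed = source.pipe(
    ops.map(lambda i: i - 1),
    ops.filter(lambda i: i % 2 == 0),
)

# Observables are lazy: nothing runs until somebody subscribes.
disposable = composed.subscribe(
    on_next=lambda i: print("item:", i),
    on_error=lambda e: print("error:", e),
    on_completed=lambda: print("completed"),
)

# The returned disposable can be used to unsubscribe; here the source has
# already completed, so disposing is effectively a no-op.
disposable.dispose()
```

Running it prints item: 0 and item: 2 followed by completed, which matches the walkthrough that follows.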
So that's all. Here we've got our first sample, but we have almost everything that you have to know to write reactive code. So if you execute this, the source observable will start emitting item 1.
It will go through the map operator that will decrease the value. So the item becomes 0. Then it goes through the filter operator. 0 is an even value, so it will be printed on the subscription callback. Then item 2 is emitted. 2 minus 1 is 1. 1 is not an even value, so it will be dropped, and nothing will be printed.
Then 3 is emitted. 3 minus 1 is 2. 2 is an even value, so 2 will be printed, and 4 is emitted. 4 minus 1 is 3. 3 is not an even value, so it will also be dropped. So you've got this at the end. Pretty simple. But the important thing here is that, as you can see,
you have smaller computation blocks, and the power comes from the combination of them. You can combine them in many ways and build really complex graphs with complex computations in a quite easy way. With events that are asynchronous by nature.
So what we did here was simple, and it seems that it's too simple to be realistic. But in fact, when we connected our two operators together,
we did not make just a single connection. So that's one important thing to know, and it's what makes error handling interesting with ReactiveX. So basically, when we connect operators together,
there are two connections that are done. There's one happy path where the items are emitted. So if everything goes well, the items flow through the happy path. So the map operator takes items from the happy path and outputs them on it, as well as the filter one.
But if anything wrong happens, there's also a failure path that allows errors to propagate and stops all the data flow. So typically, if there's an error that occurs in the map operator, this error will not propagate through the happy path,
but through the error path, and then all operators that are after in the graph can handle this error in a graceful way and do more or less complex things on it. So this means that what we've just implemented with no special logic to handle errors,
it natively handles them. So for example, if in our source observable we replace the second item, the integer, with a string, something wrong will happen, because in the map operator, we will try to decrease a string,
and this is not allowed in Python. So what happens is just this. We first get the item one that is emitted and decreased. This works well. But when the string is emitted, the map operator raises an exception, and this exception is propagated through the filter operator.
In this case, the filter operator does nothing but just forwards it, and it goes up until the subscription error callback. And after that, nothing happens, because once an observable has completed,
either on success or on failure, no items can flow after that. So that's why we don't have the value 2 emitted after that. So that's one simple case, but it's one of the most usual ways to handle errors. There's something wrong, we do something,
and we just stop the program. There are some other operators available to do more complex things on errors, but that's really the base way it's managed, and it already allows many cases to be handled in an easy way.
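(Again as a hedged sketch, not shown verbatim in the talk: replacing the second item with a string makes the error surface in the on_error callback.)

```python
import rx
import rx.operators as ops

rx.from_([1, "two", 3, 4]).pipe(       # the second item is now a string
    ops.map(lambda i: i - 1),           # raises TypeError when it reaches "two"
    ops.filter(lambda i: i % 2 == 0),   # never sees the faulty item, only forwards the error
).subscribe(
    on_next=lambda i: print("item:", i),
    on_error=lambda e: print("error:", e),
)

# Prints "item: 0", then "error: ...", and nothing more: once the observable
# has errored out, no further items flow.
```

Operators like catch and retry in rx.operators are examples of the "more complex things" mentioned here.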
Another thing that can be done with RxPy and ReactiveX is an easy way to deal with concurrency. So this is done with what we call schedulers. So you can handle different types of concurrency, CPU concurrency with thread pools or threads that are created on demand.
We can also deal with IO concurrency. We support several kinds of event loops, such as asyncio, but also some other ones, like Twisted and gevent. And there's also some built-in support for several frameworks that have main loops,
such as Qt and GTK. So the thing that is interesting here also, just like most of the things in ReactiveX, is that it's extensible, so you can write your own schedulers and do other kinds of scheduling if you need it.
I don't have an example here because it would use too much time, but you can see in the documentation there are many examples of how to use it. So how can we use it in real life? Here's a typical example of how my team deploys machine learning models on our backend.
So each model is deployed as a Docker container, and we use PyPy as an interpreter. We use asyncio for all the input-output management. We use RxPy for all the feature engineering
and the model execution. And we use another small framework called Cyclotron that is used to separate the IOs from the data flow. So this is a case where everything works really well together, because on our backend we use Kafka
as a source and a destination for events. So everything that we do, we read events from Kafka, we do some computation, and we write the result to Kafka. And we have many microservices that are chained together this way. So on a typical deployment,
we have our service that runs with RxPY for the computation part. It receives events from Kafka with aiokafka, so we have a Kafka consumer that is done with asyncio. It receives some configuration with Consul.
Consul is a tool that deals with configuration, and it's exposed via REST APIs. The result of the computation is exposed on a Kafka topic, and we also expose some operational metrics with Prometheus.
And so this is an example that is simple, but it can grow to quite complex cases. Because the Kafka source here is a single arrow, but it can be a combination of several topics. So we can read many topics at the same time,
combine them together with RxPY, and do all our computation. And this is a case where all of these elements fit very well together. Kafka with asyncio and RxPY really work well with each other.
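(None of the deployment code is shown in the talk, and the sketch below skips Cyclotron entirely; it only illustrates, under assumptions, how an aiokafka consumer can be bridged into an RxPY pipeline through a Subject. The topic name, broker address, and processing steps are made up.)

```python
import asyncio

import rx.operators as ops
from aiokafka import AIOKafkaConsumer
from rx.subject import Subject


async def main():
    # Bridge between the asyncio IO side and the RxPY computation side.
    events = Subject()

    # The feature-engineering / model-execution graph lives in RxPY.
    events.pipe(
        ops.map(lambda raw: raw.decode("utf-8")),  # hypothetical decoding step
        ops.filter(lambda text: len(text) > 0),    # drop empty messages
    ).subscribe(on_next=lambda item: print("processed:", item))

    # asyncio handles the IO: consume from a hypothetical "events-in" topic.
    consumer = AIOKafkaConsumer("events-in", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        async for msg in consumer:
            events.on_next(msg.value)   # push each Kafka message into the pipeline
    finally:
        await consumer.stop()
        events.on_completed()


asyncio.run(main())
```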
So that's all for me. If you want more on this, you can just play with it. You just pip install rx, and you've got it. The documentation is available on Read the Docs. You've got some more generic information on ReactiveX. And also I put a link to a presentation by Scott Wlaschin
about error management and the railway analogy. This is really a great talk that he did a few years ago. I suggest that you watch it if you're interested in the topic. Thank you very much.
Don't be shy with the questions. The first three people asking questions will have a copy of my book. Does it support MicroPython? I did not try it.
Let's talk together later. I have done something with reactive programming, but I had to use Rx Lua, and I wanted to switch to RxPy with MicroPython. Sorry, I didn't hear that. The first question was, does it support MicroPython?
Neither, please. The question was, does it support MicroPython? That's it.
His question was, does it support MicroPython? I didn't try it, but from what I saw of MicroPython, it may work. I'm not sure whether there are things that are not supported and would not work, but I didn't even test it.
I have two questions, please. What you showed us, I think in slide 14, was the contrived example that you had, with a pre-made data source, which is the static list.
My question is, what is the default execution model? Is it async? No, with the default execution model, everything is running on the interpreter main thread,
but then you can change it and use an asyncio event loop or a Qt event loop if you want. We can talk after.
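(As a small, hedged illustration of switching off the main thread, assuming RxPY 3's AsyncIOScheduler; the interval source and timings are made up:)

```python
import asyncio

import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler


async def main():
    loop = asyncio.get_event_loop()
    done = asyncio.Event()

    # Schedule the periodic source on the asyncio loop instead of the default
    # timer threads, then stop after five items.
    rx.interval(0.3, scheduler=AsyncIOScheduler(loop)).pipe(
        ops.take(5),
    ).subscribe(
        on_next=print,
        on_completed=done.set,
    )

    await done.wait()


asyncio.run(main())
```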
Hello, hello. My question is, what would you recommend to manage the complexity of large pipelines? It's all great when you write two or three steps in a pipeline, but it gets very large, like 15 or 20 elements, and then a year passes and you need to adjust something in seven steps.
How do you manage complexity? The way to manage complexity with this kind of code is the same as the usual way. You split your code into many small functions that wrap smaller pieces of the computation, so that you use composition to get some small elements
that are combined together. At the end, you have one graph that should stay simple, but each element in it is also another graph that does all the computation, and you can deal with it this way. You have to compose all the operators together to do higher level computation.
Otherwise, yes, you can quickly get very big things and hard to manage. How do you deal with concurrency access to a data structure between the different pipelines?
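(A minimal sketch of that composition style, reusing the earlier map/filter example; the helper name is invented:)

```python
import rx
import rx.operators as ops


def decrement_and_keep_even(source):
    # A reusable sub-graph: any callable that takes an observable and returns
    # an observable can be dropped into pipe() like a built-in operator.
    return source.pipe(
        ops.map(lambda i: i - 1),
        ops.filter(lambda i: i % 2 == 0),
    )


rx.from_([1, 2, 3, 4]).pipe(
    decrement_and_keep_even,   # the whole sub-graph reads as a single step
).subscribe(on_next=print)
```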
Concurrency is managed by schedulers. Each operator does not handle concurrency in itself, so it assumes that everything is executing on the same thread.
But if you need to combine things that can come from multiple threads, there are some schedulers that are dedicated to it and that can serialize access to the next step of the computation graph.
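(As a hedged sketch of that idea, assuming RxPY 3's ThreadPoolScheduler and the observe_on operator: items coming from several threads are handed over to a single-worker pool, so the downstream steps never run concurrently.)

```python
import time

import rx
import rx.operators as ops
from rx.scheduler import ThreadPoolScheduler

# A pool with a single worker: everything observed on it is serialized.
pool = ThreadPoolScheduler(max_workers=1)

rx.merge(
    # Two interval sources; by default each one ticks on its own timer thread.
    rx.interval(0.2).pipe(ops.map(lambda i: ("a", i))),
    rx.interval(0.3).pipe(ops.map(lambda i: ("b", i))),
).pipe(
    ops.observe_on(pool),   # from here on, items are delivered on the pool's worker
    ops.take(10),
).subscribe(on_next=print)

time.sleep(3)   # keep the main thread alive long enough to see the output
```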