Building Event-Driven Python Services using FastStream and AsyncAPI
Formal Metadata
Title: Building Event-Driven Python Services using FastStream and AsyncAPI
Number of Parts: 131
License: CC Attribution - NonCommercial - ShareAlike 3.0 Unported: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared, also in adapted form, only under the conditions of this license.
Identifiers: 10.5446/69409 (DOI)
Language: English
EuroPython 2024, 125 / 131
Transcript: English (auto-generated)
00:04
I'm Abhinand, and I'm currently a product engineer on the Stollby team at UST Global. We are working on building a travel platform to modernize the booking experience for our users.
00:21
And I'm also part of the Trivandrum Python community, organizing a few local and regional Python events. I also volunteer at Kerala Police Cyberdome to build web-based tools for them. So let's go and talk about event-driven architecture.
00:42
We have heard a lot about events. Events are everywhere; event-driven architecture is everywhere. But what exactly is an event? An event is basically a change in state, or an update. An example would be an item being placed in a shopping cart.
01:04
An event can contain data like the state of the system or an identifier, and it usually represents facts that have happened in the system. There is a slight difference between an event and a normal user request;
01:25
that is the difference between events and commands. When the user places an order with our back end, that's actually a command.
01:40
The user expresses an intent: they want to perform an action and ask the system to do something. Failures can happen, the order can be rejected, and there will be only one entity consuming it. The API gateway receives it and our back-end logic processes the order.
02:05
Here it's a simple Lambda service, and when it is done, the user is informed. Then we create an event called 'order created' and send it to our event broker or messaging system. That 'order created' event is the fact that we have created an order.
02:25
It is immutable information: it won't change. And consumers can select which events they want to consume. We will be building a notification service which consumes that 'order created' event.
02:45
We should notify the user. And if it is not part of the common tenant, if it is part of an enterprise tenant, the notification should also be sent to Slack. All this can be handled, and we may have multiple different consumers:
03:04
one consumer for notifications, one consumer for writing to the audit log, and so on. That's basically the difference between commands and events. And here there's EventBridge.
03:20
What exactly is EventBridge? It's an Amazon service that's basically an event broker. Event brokers are what stand between producers and consumers. A broker is kind of like a middleman that receives all the input information
03:41
from the producers, while the consumers consume from it, subscribed to each topic or subject they want to handle. So what actually is event-driven architecture? It's a software architecture pattern built upon the production, detection, consumption,
04:07
and reaction to events. It uses events to trigger and communicate between multiple decoupled services. Here we usually have three different components. Event producers are services that create and generate events:
04:26
something has happened to their state, and they inform others, inform the event broker, that a state has changed or an event has been created. The event broker is the orchestration layer, the system behind handling all the consumption,
04:44
all the tracking of requests, and which producers and consumers are there. It may also have persistence, as in Kafka or NATS JetStream, where the broker stores the stream and we can replay it from start to end.
05:02
And there are many different patterns usually found, like event notification systems, where events are just sent out as notifications and no record is kept; state-carried event systems, which basically send the change of state as an event;
05:26
and event sourcing systems, which keep track of all the events so we can replicate the whole system and play it back. We can have a replay of all the activities that happened. It is mostly used in enterprise scenarios to have a good audit record.
05:45
So that's it. In traditional microservices, without an event system, an event bus, or a queue, all the services usually communicate directly with each other. In event-driven architecture, services push events to the event router,
06:02
and they don't care who the consumer is or what the consumers are going to do with the data. They are not aware of the consumers; they are only aware of the standard schema they define when they push. And consumers pull the events they want to listen to.
06:24
Service C might want to listen to all the events of service A. Service D might only wish to listen to the 'order created' events. And service E might only be interested in receiving critical events.
06:44
And you might have heard of or used a few of these tools: Apache Kafka, NATS, RabbitMQ, EventBridge, Google Cloud Pub/Sub. Many others are there; I'm just mentioning a few popular ones.
07:00
We'll focus on developing with NATS, as it's very simple to set up and get started with. In large teams, what usually happens is that we define the events we send and we build a system. Then later we don't have enough documentation, enough information, so that as a person
07:32
on a consumer team, I may not have any information or insight into the logic the producers have actually written.
07:40
Or there would be a structure, and the structure would change. In the case of REST APIs, we usually use OpenAPI as our documentation system, and we define our APIs using the OpenAPI specification, previously known as Swagger.
08:02
The OpenAPI specification is mainly intended for request and response, that is, synchronous one-to-one messaging, and there's little support for asynchronous interaction. Support for webhooks, which are basically an asynchronous mechanism,
08:21
is there in OpenAPI version 3.1, but all other types of asynchronous interactions are not, and there's no kind of one-to-many asynchronous interaction. That's what's mostly seen in event architectures: we have one publisher and many consumers.
08:42
All those patterns are not defined by the OpenAPI specification. So what if we created an OpenAPI-like standard for asynchronous communication? That's what AsyncAPI is for. AsyncAPI is a project under the Linux Foundation which is trying to build a system similar to the OpenAPI specification
09:09
for asynchronous communication and event-driven architectures. And it is largely compatible with the OpenAPI specification. As you can see, parts of OpenAPI 3.0 like info have the same counterpart in AsyncAPI 3.0; it's a similar system.
09:30
Here, servers and security are combined in AsyncAPI. And the thing corresponding to paths is basically channels:
09:42
a path item, describing which request is handled, is basically a channel. And where OpenAPI has operations defined, here we also have operations. And requests and responses are analogous to our messages.
10:00
A message contains headers, payloads, and message references. And GET, POST, or PUT is basically the action that's happening. Similarly, the other components of the two specifications are also correlated;
10:22
they differ in details, and a few additions are there. So this is what an AsyncAPI specification looks like: here we have a simple AsyncAPI description of a simple user-signup event message.
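A minimal sketch of such a document, modeled loosely on AsyncAPI's own user-signup example (the title, channel address, and payload fields here are illustrative, not from the talk):

```yaml
asyncapi: 3.0.0
info:
  title: Account Service
  version: 1.0.0
channels:
  userSignedUp:
    address: user/signedup        # the subject/topic the message travels on
    messages:
      userSignedUp:
        payload:
          type: object
          properties:
            displayName:
              type: string
            email:
              type: string
              format: email
operations:
  onUserSignedUp:
    action: receive               # this application consumes the event
    channel:
      $ref: '#/channels/userSignedUp'
```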
10:44
So that's a simple example. The concern is usually that we create documentation for our teams, but the documentation doesn't stay up to date, as we developers are usually a bit too lazy
11:05
to keep maintaining the documentation; we go back to developing things and creating new things. So usually the case is that our documentation is not up to date.
11:21
And that's a challenge we have to face. So in our system, we are trying to integrate AsyncAPI into Python and have automatically up-to-date documentation. That's where AsyncAPI comes in for us.
11:44
And we are integrating AsyncAPI into Python using FastStream. The major features of FastStream: it supports multiple brokers, providing a unified API to work across message brokers like Kafka, RabbitMQ, NATS, and Redis.
12:02
And it leverages Pydantic's validation capability to serialize and validate incoming messages; that is, we can use Pydantic models directly as the parameters and responses of our asynchronous event handlers. And we get automatically updated AsyncAPI documentation;
12:23
the documentation is always auto-generated. And we can utilize the powerful dependency injection system. It is similar to what's found in FastAPI's dependency injection system; FastStream uses FastDepends for dependency injection.
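As a brief illustration of that injection style, here is a minimal sketch; the broker URL, subject, and dependency names are purely illustrative, and the Depends API mirrors FastAPI's:

```python
from faststream import Depends
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")

def current_settings() -> dict:
    # A hypothetical dependency; anything callable works, as in FastAPI.
    return {"tenant": "default"}

@broker.subscriber("orders")
async def handle(body: str, settings: dict = Depends(current_settings)):
    # "settings" is resolved and injected for every consumed message.
    print(body, settings["tenant"])
```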
12:51
And it supports built-in in-memory testing, making our CI/CD pipelines reliable. And we can extend it to support our own custom implementations.
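A sketch of that in-memory testing support, reusing the broker and base_handler from the first-app walkthrough further below (requires pytest and pytest-asyncio; this follows FastStream's documented TestNatsBroker pattern):

```python
import pytest
from faststream.nats import TestNatsBroker

@pytest.mark.asyncio
async def test_base_handler():
    # TestNatsBroker swaps the real connection for an in-memory one,
    # so no NATS server is needed in CI.
    async with TestNatsBroker(broker) as br:
        await br.publish("Hello Python", "test")
        # Inside the test context, subscribers gain a .mock for assertions.
        base_handler.mock.assert_called_once_with("Hello Python")
```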
13:04
We can use lifespans to control when the event broker connections start and end. We can create custom middlewares. We can have custom serialization mechanisms. And it works alongside any HTTP framework: we simply use the lifespan hooks to start and stop the broker together with our API servers.
13:31
In the case of FastAPI, we have a direct integration: we can directly use FastAPI's router capability along with our FastStream services.
13:45
So let's dive into building our first FastStream app. It's very simple to get started. We create a NATS broker client; we could otherwise use Kafka, RabbitMQ, or another broker.
14:04
Currently it supports these four: NATS, Kafka, RabbitMQ, and Redis. We define our broker, pass the connection URL here, and just write the function which handles our requests, decorated with @broker.subscriber.
14:25
And 'test' here is the subject: which subject should our consumer or subscriber listen to? That's what we add here. And we can also pass in other parameters, like middlewares or other documentation-related information.
14:45
And the handler takes a body variable; currently this is a simple app, so it just takes a plain body. The whole body that is received will be printed. So that's how we define it.
15:00
It's so simple we can do it in two lines. To run this we need to create a FastStream app. For that we import FastStream, pass in the broker that we are using, and create the app. And here we have defined the NATS broker, so we need to run a NATS server on our system.
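Put together, the app described here might look like this minimal sketch (the connection URL is an assumption matching the Docker setup below):

```python
# app.py - a minimal sketch of the subscriber app described above.
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber("test")  # "test" is the subject this consumer listens on
async def base_handler(body: str):
    print(body)  # the received body is simply printed
```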
15:24
Running the NATS server is very simple: we can use a simple Docker Compose file to run our NATS event broker. We just pull the image and set the container name. The important port is 4222;
15:40
that's where our Python app connects to the NATS service. 8222 is the monitoring port, which we can use with tools like the NATS dashboard to analyze how our data packets are being sent. Port 6222 is for joining our NATS server into a cluster and connecting with other nodes.
16:08
The command flag -js enables JetStream support, a persistence layer we can use, and -m 8222 sets the monitoring port.
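A sketch of such a Compose file, following the ports and flags just described (the image tag and container name are assumptions):

```yaml
# docker-compose.yml - runs a single NATS server for local development.
services:
  nats:
    image: nats:latest
    container_name: nats
    command: "-js -m 8222"   # -js enables JetStream; -m sets the monitoring port
    ports:
      - "4222:4222"   # client connections (our Python app)
      - "8222:8222"   # HTTP monitoring
      - "6222:6222"   # clustering with other NATS nodes
```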
16:23
When we launch our NATS Docker Compose setup, we can see that it's running and how much memory and CPU load is being used. NATS is a simple Go application and is highly efficient in its resource usage.
16:41
And we can see which subscribers are there and how many are connected. And we can easily scale; that's the strength of NATS. So we can run and test. But we missed something in our code: we have only defined the subscriber. We need to send a message to our system so that a body is received.
17:07
So we need something acting as a producer. We can define it so that it publishes to the test subject after the app has started. So it's a one-time, on-startup thing.
17:22
We can define that with the app's startup hook. And broker.publish we can actually use anywhere in our code to publish our data or events to our system. To run this we write `faststream run app:app`, similar to how we run a Uvicorn app.
17:42
The first app is the module name, and here we have used app itself as the application name, so that's what we run. And as I said, 'Hello Python' is what I publish when the app starts, and I print it as the received body.
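A sketch of that startup publication, using FastStream's after_startup hook, which runs once the broker connection is up:

```python
@app.after_startup
async def publish_hello():
    # broker.publish can be used anywhere in the code base,
    # not only inside hooks.
    await broker.publish("Hello Python", "test")
```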
18:04
So that's what you can see here: received 'Hello Python'. And by default all the loggers are turned on, so it waits for messages, and every message event, with the ID it received and the subject it arrived on, is shown in the log.
18:24
We can turn that off by simply passing logger=None, or set our own custom logging rules there. So that's it, a very simple application. Now let's integrate with FastAPI, Pydantic, and AsyncAPI.
18:46
To create a simple Pydantic model for this session, I'm just using a user model with a user ID and a name, defined using Pydantic. And we'll import this into the code we've written so far and change it.
19:05
In the subscriber, I'm now passing user, annotated with the Pydantic model. Similar to FastAPI, FastStream uses annotations to identify what data is expected. And if you set a return-value annotation, it will also be validated:
19:26
returning something other than the annotated type obviously results in runtime errors. It's similar to how we define our paths in FastAPI. So we can simply reuse our existing FastAPI-style handlers and turn them into subscribers by just adding our @broker.subscriber line.
19:51
So we have imported the user model, passed it in, and we can directly use user.name. For sending, we'll simply publish that Pydantic model to the subject, instead of the plain message we used before.
20:10
The Pydantic model serialization is handled by FastStream: it automatically handles how the model is serialized into a wire format, and it deserializes it the same way on the consuming side.
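A sketch of the model and the updated subscriber, reusing the broker from the earlier sketch (the field names and the user-register subject are taken loosely from the talk):

```python
from pydantic import BaseModel

class User(BaseModel):
    user_id: int   # the talk only mentions a user ID and a name
    name: str

@broker.subscriber("user-register")
async def handle_msg(user: User):
    # FastStream validates and deserializes the incoming payload
    # into the Pydantic model before calling the handler.
    print(f"User registered: {user.name}")
```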
20:25
So that's how we can use a simple Pydantic model. And to create an event producer, instead of just using our on-startup test method, we can separately create a producer.py file
20:45
and write a simple asynchronous function that uses a NATS broker and the br.publish command to send our request.
21:00
We will be using the same producer file to test all of our integrations. We need not write it in the same file itself; we can run the producer from another service. In our usual code bases we would use just this simple publish call anywhere in our code, and we would define the broker at the global level.
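A sketch of such a producer.py (the models module is a hypothetical import path holding the Pydantic model from above):

```python
# producer.py - a standalone producer, as described above.
import asyncio

from faststream.nats import NatsBroker
from models import User  # hypothetical module holding the Pydantic model

async def main() -> None:
    # Opening the broker as a context manager connects and cleanly disconnects.
    async with NatsBroker("nats://localhost:4222") as br:
        await br.publish(User(user_id=1, name="John"), "user-register")

if __name__ == "__main__":
    asyncio.run(main())
```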
21:24
So that's how we can produce and consume a Pydantic model. Now let's integrate it with FastAPI. The FastAPI integration is pretty simple.
21:42
Instead of using a NatsBroker, we now use the NatsRouter from the FastStream module: we define the router and include it, similar to how we include routers in FastAPI. One thing we must not forget is to set the lifespan context on the FastAPI app.
22:06
This is done so that our broker starts when the FastAPI application starts and stops when the application stops. Similarly, if you are not using FastAPI, if you are using Falcon or any other framework,
22:20
we can directly use our NATS broker and hook the broker's lifespan into our Falcon application so that it starts when the application starts. This lifespan context is the same for broker and router.
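A sketch of that FastAPI wiring, following FastStream's documented integration (the models import is the same hypothetical module as before):

```python
from fastapi import FastAPI
from faststream.nats.fastapi import NatsRouter
from models import User  # hypothetical module holding the Pydantic model

router = NatsRouter("nats://localhost:4222")

@router.subscriber("user-register")
async def handle_msg(user: User):
    print(f"User registered: {user.name}")

# Wiring the router's lifespan starts and stops the broker with the app.
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
```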
22:43
On doing this, the behavior is the same; the output will always be similar. And we can directly access our documentation at the /asyncapi path. From this path in our FastAPI application, we can see all the servers we have defined,
23:05
all the operations we have defined, and all the models or data types we are using. We have defined only a single function, handle_msg, and it listens on the subject user-register.
23:24
So that's what is shown: which subject is handled by which function. That's our operation. And what message data it accepts: a user model, whose payload includes the user ID, similar to what we see in the OpenAPI specification.
23:46
And the messages section details our message payload structure and schema. These two are currently the same because we are only using one simple function.
24:00
If we write more functions, we can see all the payload information here and the different schema information there, just as we see in an OpenAPI spec, or as we see types, queries, and mutations in a GraphQL schema. Similar to that, we have generated and used our schema.
24:21
So that's how we auto-generate our docs. And there is a lot more we can explore in FastStream. A simple one, before time runs out, is that we can chain events. That is, first we would send a user-register event,
24:46
similar to what we have done. And that user registration operation can return a model and publish it to another subject or topic after processing.
25:01
This is useful when, for example, our service acts as a middleware service: it accepts all the incoming events, processes them, transforms them into another data structure, and publishes the result as an event on another topic.
25:24
And another subscriber can listen to it and use it. That's what chaining events means. We can easily chain events by just annotating a function as a publisher in our code base.
25:42
This annotation only works if there is already a subscriber; setting a standalone publisher on a function is not supported. And we can set a standalone subscriber and have multiple topics.
26:00
We can also attach multiple subscribers to a single function. That is, a single function can consume events from multiple different topics, provided they all pass a similar data-type structure. But it can only publish, via its return value, to one topic.
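A sketch of such a chained handler (the subject names are illustrative; the stacked decorators follow FastStream's documented publisher pattern):

```python
# Chaining: consume from one subject and publish the transformed
# result to another subject.
@broker.subscriber("user-register")
@broker.publisher("user-notified")
async def register_and_notify(user: User) -> str:
    # The return value is published to the "user-notified" subject.
    return f"{user.name} has been registered"
```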
26:29
That's it. Thank you, guys. Are there any questions? Thank you very much, Abhinand.
26:41
We've got some time for questions. If you have any questions, please stand behind the microphones and ask away. In the meantime, I do have a question: do you see any pitfalls in event-driven architecture compared to the more usual ones? Yeah, obviously all architecture patterns have their own benefits and downsides.
27:06
The usual simple pitfall is that we cannot easily debug things. In a normal request-response pattern, we can easily find which request has gone through which function,
27:21
and debug each function easily. And we can put a simple tracing service in front, like Datadog APM, Sentry, or any other tracer, and simply track what's happening there. But in event-driven systems it's much more complicated; we cannot debug as easily as in our normal request-response structures.
27:45
Okay, thank you very much. What about unit testing? Is it also quite a bit harder to unit test, or do you mock a lot? Unit testing, you see, is comparatively harder, as we need to know which events would be fired.
28:03
But we can also perform unit tests easily, as we only need to know which events are coming, which events our individual service or microservice consumes and produces. So we can define a test easily: we mock and send a few events,
28:26
and we can test and record which events are correspondingly produced, along with all the state changes and DB writes. So unit tests are not that much of a problem in EDA, I believe.
28:42
Brilliant, thank you. It looks like we've exhausted the time and the questions, so give it up for Abhinand.