
Advanced asyncio: Solving Real-world Production Problems


Formal Metadata

Title
Advanced asyncio: Solving Real-world Production Problems
Title of Series
Number of Parts
118
Author
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.
Identifiers
Publisher
Release Date
Language

Content Metadata

Subject Area
Genre
Abstract
By building a simplified chaos monkey service, we will walk through how to create a good foundation for an asyncio-based service, including graceful shutdowns, proper exception handling, and testing asynchronous code. We'll get into the hairier topics as well, covering topics like working with synchronous code, debugging and profiling, and working with threaded code. We'll learn how to approach asynchronous and concurrent programming with Python's asyncio library, take away some best practices, and learn what pitfalls to avoid.

Outline: (40 minutes + 5 min Q&A; if unable to get 45 minutes, then a 30 min slot with no time for Q&A)

Intro (2m)
- Speaker/company intro
- Setting the context/purpose of talk

Foundations (9m - trimmed to 6m for 30 min slot)
- Initial setup of an asyncio service (2m)
  - Required boilerplate code
  - Inspiration from official asyncio tutorial docs
- Making a service *actually* concurrent (5m)
  - non-blocking vs concurrent
  - when to be concurrent vs serial
  - using callbacks vs awaits vs scheduling tasks (create_task) vs asyncio.Events
- Making synchronous code asyncio-friendly (2m)

Intermediate (9m - trimmed to 6m for 30 min slot)
- Graceful shutdowns (3m)
  - What a signal handler is, why it's needed
  - What signals to listen to
  - Gotchas of cancelling tasks, asyncio.shield + shutdown behavior
- Exception handling (3m)
  - Difference between top-level exception handling and handling within other coroutines
  - Avoid mistakenly swallowing/missing raised exceptions
  - Making use of loop.set_exception_handler
- Making threaded code asyncio-"friendly" (3m)
  - Calling threaded code from coroutines (aka running within a ThreadPoolExecutor)
  - Calling coroutines from threaded code (aka run_coroutine_threadsafe)

Advanced (19m - trimmed to 15m for 30 min slot)
- Testing asyncio code (7m)
  - Benefits of debug mode
  - How to mock coroutines
- Debugging an asyncio service (5m)
  - Reinforce debug mode
  - Using "tricks" like asyncio.all_tasks with logging, loop.slow_callback_duration, adding context/stack trace in default exception handler
- Profiling (7m)
  - Basic profiling (cProfile, strace) - not that different from sync code
  - Continuous profiling with 3rd party tools, i.e. github.com/what-studio/profiling
  - PyCharm's asyncio & thread profiler
  - How to properly trace a workflow/request (e.g. for the purpose of distributed tracing) (to be cut if not enough time)

Wrap up/Review (1m)
Transcript: English (auto-generated)
Good morning. Well, are we doing okay? Not that hungover, I hope. I think it's probably the most expensive hangover I've had. Been a while. Anyways, my name is Lynn Root. I am a staff engineer at Spotify. I've been at Spotify for nearly six years.
But since the beginning of the year, I've been working on building machine learning infrastructure for really smart people that do digital signal processing and need to productionize that. It's pretty fun.
So if anyone here uses Apache Beam or does streaming with data pipelines, I'd love to chat afterwards. I'm also Spotify's FOSS evangelist. I help a lot of teams with their projects and tools to get them open source under the Spotify GitHub organization.
And lastly, I am one of the global leaders of PyLadies, which is a mentorship group for women and friends to help increase diversity in the Python community. I brought a lot of stickers with me. So if you want a PyLadies sticker, come find me. One random clap. Thank you.
All right, so this is the agenda for today. It doesn't look it, but it's a bit jam-packed. I'm gonna be covering some graceful shutdowns, exception handling, and threading, along with testing, debugging, and profiling. I'll use probably most of my time, if not all. So I won't take any questions, but we can go outside afterwards and talk.
And this presentation is very code heavy, but don't worry. The slides, the full write-up, and the code are all at this link, and I'll show it again at the end. Okay, so asyncio, right? The concurrent Python programmer's dream.
It's the answer to everyone's asynchronous prayers, right? The asyncio module has various layers of abstraction, allowing developers as much control as they need and are comfortable with. Simple hello world-like examples do show how it can be so simple.
But it's easy to get lulled into a false sense of security. These sort of hello world examples aren't that helpful. We're led to believe that we can do a lot with the structured async and await API layer. Some tutorials, while great for the developer to get their toes wet,
try to illustrate real-world examples, but are actually just beefed up hello world examples. Some tutorials even misuse the asyncio interface, allowing one to easily fall into the depths of callback hell. And there are some tutorials that will get you easily up and running with asyncio, but then
you may not realize that it's not exactly correct or not what you want, or it only gets you part of the way there. And while some tutorials and walkthroughs do a lot to improve upon the basic, like, hello world use case, maybe it's just still just a web crawler.
And I'm not sure about others, but I'm not really building web crawlers at Spotify. I've built services that do need to make a lot of HTTP requests, sure, and they need to be non-blocking, but these services of mine, they also have to react to PubSub events to measure progress of actions initiated from those PubSub events,
handle any incomplete actions or other external errors, deal with PubSub message lease management, and measure service level indicators and send metrics. And we need to do this with non-asyncio-friendly dependencies. So for me, my problem got difficult quickly.
So allow me to provide you a real-world example that actually comes from the real world. Has anyone heard of Netflix's Chaos Monkey? I see some hands. So a few years ago at Spotify, we built something similar, like a chaos-creating service that does
periodic hard restarts of our entire fleet of instances. And so we're gonna do the same here. We're gonna build a service called Mayhem Mandrill, a pun off Chaos Monkey, which will listen for a PubSub message and restart a host based off of that message. And so as we build this service,
I'll point out best practices that I may or may not have realized when first using asyncio. And this will essentially become the type of resource that past Lynn would have wanted about three years ago. And again, don't worry about the code in the slides. I'll have a link at the end that refers to all the code.
So we're gonna start with some foundational code. We're gonna write a simple publisher. So here's where we're gonna start. We have a while true loop. We have a unique ID for each message to publish to our queue. I want to highlight that we're not going to await the queue.put of a message. asyncio.create_task will actually schedule the coroutine on the loop without blocking the rest of our loop.
The create task method does return a task object, but we can also use it as sort of a fire and forget mechanism. If we added the await here, everything after this and within the publish coroutine will be blocked.
This isn't an issue with our current setup, but it could be if we were to limit the size of our queue; then that await would be awaiting on space to free up in the queue. So we're just gonna stick with asyncio.create_task. So we have a publisher coroutine function, and now we need a similar consumer.
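A minimal sketch of the publisher just described, assuming an asyncio.Queue; the message format and the random IDs are illustrative, not the talk's exact code:

```python
import asyncio
import random
import string

async def publish(queue):
    # Publish messages onto the queue indefinitely.
    while True:
        # a unique-ish ID per message (illustrative)
        msg_id = "".join(random.choices(string.ascii_lowercase, k=4))
        host = f"{msg_id}.example.net"
        # fire and forget: schedule the put without blocking this loop
        asyncio.create_task(queue.put({"msg_id": msg_id, "host": host}))
        await asyncio.sleep(0.1)
```

Because the put is scheduled as a task rather than awaited, a bounded queue with no free slots would not stall the publishing loop itself.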
So this consumer will consume the messages that we've published. It's sort of similar to the publisher. We have a while true loop and await on the queue for a message. But here we don't want to create a task of queue.get. It makes sense to block the rest of the coroutine on this because there isn't much to do if there are no messages to consume.
I want to highlight this again. We're only blocking within the scope of the consume coroutine. We're not blocking the actual event loop. So let's replace asyncio.sleep with a function that will restart a host.
I'm sure it looks like I'm just like pushing the simulation of IO work to the restart host function. But in doing so, I'm actually able to create a task out of it. So therefore we're not blocking on awaiting for more messages.
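A sketch of that consumer, with a stand-in restart_host whose short sleep simulates the I/O of a real restart (names are illustrative):

```python
import asyncio

async def restart_host(msg):
    # stand-in for the real I/O of restarting a host
    await asyncio.sleep(0.1)
    msg["restarted"] = True

async def consume(queue):
    while True:
        # blocking *this coroutine* on an empty queue is fine;
        # the event loop itself stays free
        msg = await queue.get()
        # ...but don't block consumption on the restart work
        asyncio.create_task(restart_host(msg))
```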
Perhaps we want to do more than one thing per message. For example, in addition to restarting a host, maybe we would like to store that message in a database for potential replaying later as well. So we'll make use of asyncio.create_task again for the save coroutine to schedule on the loop.
Basically chucking it over to the loop to execute when it can. And in this example, the two tasks of restarting and saving don't need to depend on one another. I'm completely sidestepping the potential concern or complexity of should we restart a host if we can't save to the database and vice versa.
But maybe you actually want your work to happen serially. You may not want to have concurrency for some asynchronous tasks. So for instance, maybe you restart hosts that have an uptime of more than seven days. So this is similar to like within banking, you should check the balance of an account before you actually debit it.
So needing code to be serial or sequential, to have steps or dependencies, it doesn't mean that you can't be asynchronous. The await on the last restart date will yield to the loop, but that doesn't mean that restart host will be the next thing that the loop executes.
It will just allow other things to happen outside of this coroutine. So with that in mind, I will just put all this message-related logic into a separate coroutine so we don't block the consumption of messages. Saving a message shouldn't block a restart host if needed, so we'll
return to it being a task. And then we're just going to remove the uptime check and just restart hosts indiscriminately because why not? Well, so we pulled the message from the queue and fanned out work based off of that message. And now we need to perform like finalization work on that message.
So often with PubSub technologies, if you don't acknowledge a message within a predefined time frame, it will get redelivered. So for a finalization task, we should acknowledge the message so it isn't redelivered. We currently have two separate tasks, save and restart host, and we want to make sure that they are both done
before the message is cleaned up. Now we could go back to the sequential awaits, since that's a very direct way of manipulating the ordering. But we can also use callbacks on a completed task.
What we therefore want is somehow to have a task that wraps around the two coroutines of save and restart host, since we have to wait for both to finish before cleaning up can happen. We can make use of asyncio.gather, which returns a future-like object, to which we can attach the callback of cleanup via add_done_callback.
We can now just await that future in order to kick off the save and restart host coroutines. And then obviously the callback of cleanup will be called once those two are done. And so visualizing this a bit, you can see that both the save and the restart coroutine are complete, and then the cleanup
will be called to signify that the message is actually completely done. And we've also maintained appropriate concurrency. I don't know about you, but I have an allergy to callbacks. So perhaps we also need cleanup to be non-blocking. And so then we can just actually await cleanup after awaiting gather itself.
Which I think is much cleaner looking. So to quickly review, asyncio is pretty easy to use, but being easy to use doesn't automatically mean that you're using it correctly.
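Putting that together, a sketch of a handle_message that gathers save and restart, then awaits cleanup; the short sleeps stand in for real I/O, and all names are illustrative:

```python
import asyncio

async def save(msg):
    await asyncio.sleep(0.1)    # stand-in for a database write
    msg["saved"] = True

async def restart_host(msg):
    await asyncio.sleep(0.1)    # stand-in for the restart call
    msg["restarted"] = True

async def cleanup(msg):
    await asyncio.sleep(0.05)   # stand-in for acking the message
    msg["acked"] = True

async def handle_message(msg):
    # save and restart run concurrently; cleanup runs only after
    # both are done -- no add_done_callback needed
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)
```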
You can't just throw around async and await keywords around blocking code. Sort of a shift in a mental paradigm. Both within needing to think about what work can be farmed out and let it do its thing, and then what dependencies are there and where code might still need to be sequential.
But having your steps within your code, like having first A, then B, and then C, may seem like it's blocking when it's not. Sequential code can still be asynchronous. For instance, I might have to call customer service for something and wait to be taken off hold for them.
But while I wait, I can put the phone on speaker and like pet my super needy cat. So I might be single-threaded as a person, but I can still like multitask. So often you'll want your service to gracefully shut down if it receives a signal of some sort, like cleaning up open database connections,
stop consuming new messages, finish responding to current requests while not accepting any new requests, that kind of thing. So if we happen to restart an instance of our own service, we should probably clean up the mess that we've made before exiting completely out. So here's some typical boilerplate code to get the service running.
We have a queue instance, setting up the loop, scheduling the publish and the consume tasks, and then starting the event loop. Maybe you even catch the commonly known KeyboardInterrupt exception. So if we run it as is and then send it the
SIGINT signal, we see that we do get to that except and that finally block with these two log lines. However, if we send our program a signal other than SIGINT, like SIGTERM, we see that we don't actually reach that finally clause
where we're logging that we're closing up the loop. It should also be pointed out that even if you were to only ever expect a SIGINT signal or a KeyboardInterrupt, it could happen outside of the catching of the exception, potentially causing the service to end up in an incomplete or otherwise unknown state.
So instead of catching KeyboardInterrupt, we can use a signal handler on the loop itself. So first we define our shutdown behavior, a coroutine that will be responsible for doing all of our necessary shutdown tasks. Here I'm just closing database connections and nacking messages (returning them as not acknowledged) so that they can be redelivered and not dropped.
And then cleaning up or collecting all outstanding tasks except for the shutdown task itself and then canceling them. Now we don't necessarily need to cancel pending tasks. We could just collect them and allow them to finish. We may also want to take this opportunity to flush any collected metrics so that they're not lost.
So then let's add our shutdown coroutine function to the event loop. So the first thing we do is set up our loop first and then add our signal handler with the desired signals that we want to respond to, and then remove the KeyboardInterrupt catching.
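A sketch of that wiring, assuming Unix signals; the database/metrics cleanup mentioned above is elided, and the chosen signals are only an example:

```python
import asyncio
import signal

async def shutdown(sig, loop):
    # collect all outstanding tasks except this one, cancel them,
    # let them finish, then stop the loop
    tasks = [t for t in asyncio.all_tasks(loop)
             if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

def main():
    loop = asyncio.new_event_loop()
    # react to signals on the loop instead of catching KeyboardInterrupt
    for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig, lambda s=sig: loop.create_task(shutdown(s, loop)))
    try:
        loop.run_forever()
    finally:
        loop.close()
```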
So then running this again, we actually do see that we get to that finally clause. Now you might be wondering which signals to react to and apparently there is no standard.
Basically, you should be aware of how you're running your service and handle accordingly. It seems like it could get particularly messy with conflicting signals and when adding Docker to the mix. Another misleading API in asyncio is the shield function.
And so the docs say that it's meant to shield a future from cancellation. But if you have a coroutine that must not be canceled during shutdown, asyncio.shield will not help you. This is because the task that asyncio.shield creates gets included in asyncio.all_tasks and therefore receives the cancellation signal just like the rest of the tasks.
So to help illustrate this a little bit, I have a simple async function with a long sleep that then finally logs a line saying done. And we want to shield it from cancellation. So as per the docs, we have a parent coroutine shielding the coroutine that shouldn't get canceled.
And so the task that is running, the parent task that is running the shielded coroutine, if that's canceled, it shouldn't affect the shielded coroutine. And so then we add our parent tasks to our main sort of function and then when we're running this and interrupting it after a second,
we see that we don't actually get to the done log line and that it's immediately canceled. Even if our shutdown coroutine function skips canceling the shielded coroutine or even the parent tasks, it still ends up getting canceled.
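A compact way to reproduce that behavior; the names and timings are illustrative:

```python
import asyncio

async def cant_stop_me():
    await asyncio.sleep(10)
    return "done"

async def parent():
    # per the docs, shield should protect the inner coroutine
    # from cancellation of the parent...
    await asyncio.shield(cant_stop_me())

async def main():
    asyncio.ensure_future(parent())
    await asyncio.sleep(0.1)
    # ...but a shutdown that cancels everything in asyncio.all_tasks
    # also hits the inner task that shield created
    tasks = [t for t in asyncio.all_tasks()
             if t is not asyncio.current_task()]
    for t in tasks:
        t.cancel()
    return await asyncio.gather(*tasks, return_exceptions=True)
```

Both the parent task and the shielded inner task end up cancelled, so "done" is never reached.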
So basically we don't really have any nurseries in asyncio core to clean ourselves up. It is upon us to be responsible and close up connections and files that were opened and respond to outstanding requests, basically leave things how we found them. Doing our cleanup in the finally clause isn't enough though, since a signal could be sent outside of the try/except clause.
So as we construct our loop, we should tell how the loop should be deconstructed as soon as possible in the program to ensure that all of our bases are covered. We also want to be aware of when our program could be shut down,
which is closely tied to how we run our program. If it's a manual script, then SIGINT is fine. But if it runs within a daemonized Docker container, then SIGTERM might be more appropriate. And finally, if you use shield in a service that has a signal handler, you should be
aware of its funky behavior. So you might have noticed that we're not doing any handling of exceptions so far. Let's revisit our coroutine, our restart host coroutine, and we're gonna add like a super realistic exception.
So running this, we do see that the super serious exception is raised, but we actually get a task exception was never retrieved. This is because we don't properly handle the result of the task when it raises.
What we can do is define sort of a global exception handler. This is super simplified. And then attach it to our loop, similar to signal handling. And so if we were to rerun this, we do see from that logging of the exception that we are actually handling it.
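A simplified handler along those lines. For demonstration it records what it saw in a list rather than only logging, and the explicit gc call just forces the task finalization that would otherwise happen on its own; all names are illustrative:

```python
import asyncio
import gc

caught = []  # what the handler saw, recorded for demonstration

def handle_exception(loop, context):
    # context["exception"] is present for task exceptions;
    # fall back to the message otherwise
    caught.append(context.get("exception", context.get("message")))

async def oh_no():
    raise RuntimeError("Super serious exception")

def main():
    loop = asyncio.new_event_loop()
    loop.set_exception_handler(handle_exception)
    task = loop.create_task(oh_no())
    loop.run_until_complete(asyncio.sleep(0.1))
    loop.close()
    # the "Task exception was never retrieved" machinery invokes
    # our handler once the failed task is finalized
    del task
    gc.collect()
```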
But perhaps you want to treat exceptions more specifically from certain tasks. It's good to have an exception handling on a global level, but also on a more specific level.
So let's revisit our handle message coroutine. Say for instance, you're fine with just logging when a saved message fails, but you want to nack, or not acknowledge, the PubSub message and put it back on the queue to retry the whole message if the restart fails.
So since asyncio.gather returns results, we can add more of a fine-grained exception handler to this, and handle the results as we wish. I want to highlight that setting return_exceptions to True is super imperative here.
Otherwise exceptions will be handled by the default handler that is set. So be sure that there's some sort of exception handling either globally, individually, or a mix. Most probably a mix. Otherwise exceptions may go unnoticed or cause weird behavior.
I also personally like using asyncio.gather because the order of returned results is deterministic, but it's easy to get tripped up with it. By default it will swallow exceptions, but happily continue working on the other tasks that it was given. If the exception is never returned, then weird behavior can happen.
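A sketch of that fine-grained handling with return_exceptions=True; the failing restart_host is contrived, and nacking is represented by the string it produces:

```python
import asyncio

async def save(msg):
    await asyncio.sleep(0.05)
    return "saved"

async def restart_host(msg):
    raise RuntimeError("could not restart")

async def handle_message(msg):
    # with return_exceptions=True, gather hands exceptions back as
    # results, in the same order the awaitables were given,
    # instead of raising the first one
    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True)
    handled = []
    for name, result in zip(("save", "restart_host"), results):
        if isinstance(result, Exception):
            handled.append(f"{name} failed: {result}")  # nack here
        else:
            handled.append(f"{name} ok")
    return handled
```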
All right, sometimes you need to work with threads. I'm sorry. Just like a threaded pubsub client. And you might want to consume a message on one thread, and then handle the message within a coroutine on the main event loop.
So let's first attempt to use the asyncio API that we're familiar with, and update our synchronous callback function, creating a task via asyncio.create_task from the handle message coroutine that we defined earlier.
And then we call our threaded consume function via a thread pool executor. But we don't get very far. At this point, we're in another thread, and there's no loop running on that thread. It's only on the main thread.
So if we take what we have right now and update our function to use the main event loop, we actually do get it working, or it looks like it worked. But this is deceptive. We're sort of lucky that it works. Some folks can probably already see that we're not being thread-safe.
So instead of loop.create_task, we should be using asyncio's thread-safe API, run_coroutine_threadsafe. It can be difficult to tell when you're not being thread-safe, particularly when it looks like it works, as it did in our previous attempt.
But in a bit, I'll actually show you how to easily surface when there's an issue of thread safety. So in my opinion, it isn't too difficult to work with threaded code in asyncio, particularly or similarly to how we work with non-async code in the async world. We just make use of the thread pool executor,
which essentially creates an awaitable for us. However, it's difficult to work with both threads and asyncio when there's some sort of shared state between a thread and the main loop. And so if you must, then use the thread-safe APIs that asyncio gives you. And it took me an embarrassingly long time to realize that this existed.
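A self-contained sketch of both directions: the blocking consumer runs in the default ThreadPoolExecutor, and it hands messages back to the main loop with run_coroutine_threadsafe. The hard-coded two-message consumer is an assumption standing in for a real threaded Pub/Sub client:

```python
import asyncio

async def handle_message(msg, results):
    # runs on the main event loop
    results.append(msg)

def consume(loop, results):
    # runs in a worker thread; hand each message to the
    # main loop thread-safely
    for msg in ("msg-1", "msg-2"):
        future = asyncio.run_coroutine_threadsafe(
            handle_message(msg, results), loop)
        future.result()  # block this thread until the loop ran it

async def main():
    results = []
    loop = asyncio.get_running_loop()
    # run the blocking consumer in the default ThreadPoolExecutor,
    # which gives us an awaitable for it
    await loop.run_in_executor(None, consume, loop, results)
    return results
```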
Alright, now on to testing. So for a more simplistic starting point, we're going to test asyncio code before we introduce threading. So to start simple, we're going to test the save coroutine using pytest.
Since save is a coroutine, our tests will need to run the coroutine in the event loop. So like so, which Python 3.7 makes easy for us with asyncio.run. Or with older Python 3 versions, we have to construct and deconstruct the loop ourselves. But there is a better way.
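On Python 3.7+, such a test might look like this; the save coroutine is a stand-in with a short sleep for the real work:

```python
import asyncio

async def save(msg):
    # stand-in for the coroutine under test
    await asyncio.sleep(0.05)
    msg["saved"] = True

def test_save():
    # asyncio.run creates and tears down a fresh event loop
    # around the coroutine under test
    msg = {}
    asyncio.run(save(msg))
    assert msg["saved"] is True
```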
So there's a pytest plugin called pytest-asyncio that will essentially do that hard work for you. And then all you need to do is mark the particular tests that are testing async code with a decorator from that plugin, as well as make it so that the test function itself is actually a coroutine function.
So now when running the test, the plugin will essentially do the work for you of constructing and deconstructing the event loop. The pytest-asyncio plugin can get you pretty far, but it doesn't help you when you need to mock out coroutines.
So for instance, our save coroutine function calls another coroutine function, the asyncio.sleep, or maybe it's an actual call to the database. You don't actually want to wait for asyncio.sleep to complete, or you don't actually want a connection to the database to happen.
So both the unittest.mock and the pytest-mock libraries do not support asynchronous mocks, and so we're gonna have to work around this a bit. So first, we do make use of the pytest-mock library, and we create a pytest fixture that is essentially returning a function. The outer function itself returns an inner function as a fixture that we'll use in our tests.
And then the inner function is basically creating and returning a mock object that we'll use in our test, as well as a stub coroutine that the mock will end up calling. It also patches, if needed, the coroutine function with the stub, so we can avoid network calls, sleeps, et cetera.
So then we're gonna create another pytest fixture that will use the create-coroutine-mock fixture to mock and patch asyncio.sleep. We don't need the stub coroutine that it returns, so we can just essentially throw that away.
And then we're gonna use the mock-sleep fixture in our test save function. So what we've done here basically is patched asyncio.sleep within our mayhem module with the stub coroutine function. Then we just assert that the mocked asyncio.sleep object is called when mayhem.save is called.
Because now we have a mock object instead of an actual coroutine, we can do anything that is supported with your standard mock objects, like assert called once with setting return values and side effects. So it's pretty simple, I guess, ish.
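The underlying trick can be shown without the pytest fixture machinery, using plain unittest.mock; the helper name is illustrative, and on Python 3.8+ unittest.mock.AsyncMock does this natively:

```python
import asyncio
from unittest import mock

def create_coro_mock():
    # return a (mock, stub coroutine function) pair: awaiting the
    # stub records the call on the plain mock -- the workaround
    # for mock libraries without async support
    m = mock.Mock()

    async def _coro(*args, **kwargs):
        return m(*args, **kwargs)

    return m, _coro

async def save(msg):
    await asyncio.sleep(0.1)  # the call we want to mock out
    msg["saved"] = True
```

In a test, patching asyncio.sleep with the stub avoids the real sleep while still letting us assert how it was called.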
But maybe you want to test code that calls create task. And we can't simply use the create-coroutine-mock fixture with this, however. For instance, let's revisit our consume coroutine, which creates and schedules a task on the loop out of handle message.
We first need to create a couple of fixtures for the queue that gets passed in. First we'll mock and patch asyncio queue class in our module. And then we'll use that mock queue fixture in another one, a mock get fixture.
So unlike our mock_sleep fixture, we will use the stub coroutine that create_coro_mock returns, and set it as the mock queue's get method. So here's our test_consume function, where we're given our newly created fixtures. Let's also use create_coro_mock to mock and patch handle_message, the coroutine that gets scheduled via create_task.
Note that we're setting the mock_get side effect to one real value and one exception, to make sure that we're not permanently stuck in that while loop.
And finally we want to assert that the mock for handle_message has been called after consume has been run. But when running this, we see that mock_handle_message is not actually called like we're expecting. This is because the scheduled tasks are only scheduled at this point, still pending, and we sort of need to nudge them along.
So we can do this by collecting all tasks except the test itself, and running them explicitly. This is a bit clunky, I know. If you use unittest from the standard library, there is a package called asynctest that handles this better and exhausts the scheduled tasks for you.
So I hear that you want 100% test coverage, which is great, but it might be difficult for our main function: we need to set up signal handling and exception handling, and we need to create a few tasks and then start and close the loop.
We can't exactly use the event_loop fixture that the pytest-asyncio library gives us as it is; we need to sort of manipulate the event loop that pytest-asyncio will use when it injects it into the tested code.
So what we do is update the testing event loop to override the close behavior. If we close the loop during the test, we lose access to the exception and signal handlers that we set up within the main function. So we actually want to close the loop only after we're done with the test.
And then we can also use the mock to assert that our main function actually closes the loop. So now we'll write a test_main function that borders on an integration or functional test. We want to make sure that, in addition to expecting calls to publish and consume, shutdown gets called when expected.
But we can't exactly mock out shutdown with create_coro_mock, since it would patch it with just another coroutine, and therefore run the mocked coroutine each time it receives a signal, rather than canceling the tasks and stopping the loop. So instead we're going to mock out what's called within the shutdown coroutine: the asyncio.gather call.
And then here I'm starting a thread that will send the process a signal after a tenth of a second. After starting the thread, we then call the main function that we want to test.
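A minimal, POSIX-only sketch of that signal-from-a-thread trick (add_signal_handler is not available on Windows, and this must run in the main thread); the toy main here stands in for the service's real main:

```python
import asyncio
import os
import signal
import threading
import time

received = []


def send_signal_soon(sig, delay=0.2):
    """From a helper thread, deliver a real signal to this process."""
    def _send():
        time.sleep(delay)
        os.kill(os.getpid(), sig)

    threading.Thread(target=_send, daemon=True).start()


def main(loop):
    # Minimal stand-in for the service's main(): install a handler and run.
    def _handler():
        received.append("SIGINT")
        loop.stop()

    loop.add_signal_handler(signal.SIGINT, _handler)
    loop.run_forever()


loop = asyncio.new_event_loop()
send_signal_soon(signal.SIGINT)   # fire shortly after the loop starts
main(loop)
loop.close()
```

In the real test you would start the thread, call the actual main, and then assert on your mocked gather, publish, and consume.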
So looking at the second half of the test, we can assert that the loop setup is the way we expected, as well as that our mocked-out functions have been called. Returning to the first half of the test, you might want to parametrize the test function itself, to test not just the SIGINT signal but all the signals that we're expecting,
and probably also a signal that you're not expecting, like SIGUSR1 or something like that. So basically, the TL;DR is to use pytest-asyncio.
There's also the asynctest package for unittest that functions similarly to pytest-asyncio, and the bonus is that it will exhaust the scheduled tasks in the loop for you automatically, as well as provide coroutine mocks. So alright, we're decent programmers and have code coverage, but sometimes shit breaks.
We need to figure out what's going on. And we can use everyone's favorite debugger, printing, even if you won't admit it. So if you have one tiny little thing to debug, you can use the print_stack method on the task instance.
So when you run this, it will print the stack for each running task. You can also increase the number of frames that are printed. But you'll probably actually need to use asyncio's debug mode, within the standard library itself.
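For example, with a toy worker coroutine; print_stack writes to stderr by default, but it also accepts a file argument, which this sketch uses to capture the output:

```python
import asyncio
import io


async def worker():
    await asyncio.sleep(10)   # the task will be suspended here


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)    # yield once so the task runs up to its await
    buf = io.StringIO()
    # limit controls how many stack frames get printed (default: all)
    task.print_stack(limit=5, file=buf)
    task.cancel()
    return buf.getvalue()


output = asyncio.run(main())
print(output)   # shows the pending task suspended inside worker()
```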
So along with setting our logging level to DEBUG, we can easily turn on asyncio's debug mode when we run our script. If we didn't have proper exception handling set up, we'd get information about which task is affected, and also what's called a source traceback, which gives us more context in addition to our normal traceback.
So without debug mode, we're just told that there's an exception that's not properly handled; with debug mode, we get additional clues as to what might be going on and where. Another very handy thing that I wish I'd known of a few years ago
is that if you have threads and the event loop interacting with each other, debug mode will surface non-thread-safe calls as a RuntimeError and just quit out. Super helpful. One other really nice feature of asyncio's debug mode is how it kind of acts like a tiny profiler
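A small demonstration of that behavior; the blocking join inside a coroutine is just for the demo, and with debug mode on, the wrong call from the other thread raises instead of silently misbehaving:

```python
import asyncio
import threading

errors = []
loop = asyncio.new_event_loop()
loop.set_debug(True)   # PYTHONASYNCIODEBUG=1 or -X dev does the same


def from_other_thread():
    try:
        # Wrong: should be loop.call_soon_threadsafe(print, "hi").
        # In debug mode this raises RuntimeError instead of racing.
        loop.call_soon(print, "hi")
    except RuntimeError as e:
        errors.append(str(e))


async def main():
    t = threading.Thread(target=from_other_thread)
    t.start()
    t.join()   # blocking join, only to keep the demo deterministic


loop.run_until_complete(main())
loop.close()
print(errors)
```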
that will log async calls that are slower than 100 milliseconds. So we can fake a slow coroutine by putting in a blocking call to time.sleep. And when we run the script, we can see that it will surface slow-to-finish tasks,
potentially highlighting an unnecessarily blocking call. The default for what's considered slow is 100 milliseconds, but that is easily configurable too: you can set slow_callback_duration, in seconds, directly on the loop.
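For instance, a sketch that triggers the slow-callback warning and captures asyncio's own log output so it's visible:

```python
import asyncio
import io
import logging
import time

# Capture asyncio's own log output so we can see the warning it emits.
stream = io.StringIO()
logging.getLogger("asyncio").addHandler(logging.StreamHandler(stream))
logging.getLogger("asyncio").setLevel(logging.DEBUG)


async def slow():
    time.sleep(0.2)   # a blocking call hiding inside a coroutine


loop = asyncio.new_event_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.1   # in seconds; 0.1 is also the default
loop.run_until_complete(slow())
loop.close()

# The captured log includes a warning like
# "Executing <Task ...> took 0.200 seconds"
print(stream.getvalue())
```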
So much like some people's testing philosophies, sometimes we want to debug in production, because why not? But usually you don't want full-on debug mode while in production. So there's this lightweight package called aiodebug that will log slow callbacks for you.
And it also comes with the ability to report delayed calls to statsd, if you use statsd. And that's all this library does, so it's super lightweight and quite handy.
So you can easily print the stack of a task if needed, but you do get a lot with asyncio's debug mode. It gives more information around unhandled exceptions, when you're not being thread safe, and when there are slow-to-complete tasks. And if you want to understand slow tasks while in production, aiodebug is a lightweight library that essentially does only that.
As we saw with asyncio's debug mode, the event loop can already track coroutines that take up too much CPU time to execute, but it might be hard to tell what's an anomaly and what's a pattern. So a lot of folks might first reach for cProfile when trying to understand performance, and we can try that here too.
But there isn't that much to glean from this: the top item here is essentially the event loop itself. And even if we look at our own code specifically, you can kind of get a picture of what's going on, but it doesn't immediately surface any bottlenecks or particularly slow areas.
Of course our main function would have the most cumulative time, since that's where the event loop is run, but nothing is immediately obvious. So I recently discovered KCachegrind (it's been around for a while), and you can use it with Python.
And to do so, we first save the output of cProfile, and then use a package called pyprof2calltree, which converts cProfile's data into something KCachegrind can understand. And when the script is run, you're met with this UI.
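The workflow might be sketched like this; the mayhem.cprof filename is illustrative, the toy work coroutine stands in for the service, and the pyprof2calltree step is shown as a comment since it runs outside Python:

```python
import asyncio
import cProfile
import io
import pstats


async def work():
    await asyncio.sleep(0.01)   # stand-in for the service's work


# Profile the (toy) service run with cProfile.
profiler = cProfile.Profile()
profiler.enable()
asyncio.run(work())
profiler.disable()

# Plain cProfile view: the event loop itself dominates cumulative time.
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(10)
print(stream.getvalue())

# Save the raw stats for conversion; then, outside Python:
#   pyprof2calltree -i mayhem.cprof -k   # convert and open in KCachegrind
profiler.dump_stats("mayhem.cprof")
```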
And it's okay if you can't make anything out, but basically, on the left-hand side there's the profiling data we would otherwise see in cProfile's output. The two sections on the right show information about callers and callees, including a call graph on the bottom and a map of the callees on the top.
If we limit our view to only our script and start clicking around, we can start to get an idea of where time is most spent. The visualization groups modules together by color, and when I first ran the service, I noticed there's a lot of blue on the callee map up top.
And if you click into that blue, it's actually logging that's taking a lot of time. I'll come back to that point in a second. So KCachegrind allows us to get a broad picture of what's going on and
gives us visual clues about where to look for potential areas of unnecessary time spent. But then there's the line_profiler package, and we can use it to home in on areas of our code that we're suspicious of. So after installing the profiler, you add a @profile decorator where you want to profile.
Here I'm just decorating the save coroutine for now. The line_profiler library comes with a tool called kernprof that we invoke our script with, and then we render the output of line_profiler itself: a line-by-line assessment of our decorated code.
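A sketch of wiring up line_profiler, using the common guard so the script still runs without kernprof; the assumed workflow is to run it as `kernprof -l mayhem.py` and then view the results with `python -m line_profiler mayhem.py.lprof`:

```python
import asyncio

try:                      # kernprof injects `profile` as a builtin at runtime
    profile
except NameError:         # running normally: make the decorator a no-op
    def profile(func):
        return func


@profile                  # target of:  kernprof -l mayhem.py
async def save(msg):
    # Stand-in body for the talk's save coroutine (db write + logging)
    await asyncio.sleep(0.01)
    return f"saved {msg}"


result = asyncio.run(save("msg-1"))
```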
The total time spent in this function is just over two milliseconds, and the majority of that time is spent in logging. Now, if only there were something we could do about that. Coincidentally, someone has done something about it: there's a package called aiologger that allows for non-blocking logging.
So if we switch out our default logger for aiologger and rerun line_profiler, we can see that our total time spent in that function has halved, as has the time spent logging. Certainly these are minuscule improvements we're making here, and there's probably a lot more we could do, but if you imagine this on a larger scale, we could probably save a lot of time.
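aiologger itself may not be installed everywhere, so here's the same non-blocking idea sketched with only the standard library's QueueHandler/QueueListener: the caller merely enqueues a record, and a background thread does the actual, potentially blocking, I/O.

```python
import io
import logging
import logging.handlers
import queue

buf = io.StringIO()           # stands in for a real file/stream target
log_queue = queue.Queue(-1)   # unbounded queue between caller and writer

logger = logging.getLogger("mayhem")
logger.setLevel(logging.INFO)
# The caller's handler only puts records on the queue: cheap and non-blocking.
logger.addHandler(logging.handlers.QueueHandler(log_queue))

# A background thread drains the queue and does the real (blocking) I/O.
listener = logging.handlers.QueueListener(
    log_queue, logging.StreamHandler(buf))
listener.start()

logger.info("cheap from the caller's point of view")
listener.stop()   # flushes queued records on shutdown

print(buf.getvalue())
```

aiologger takes this further by integrating with the event loop itself, which fits the "we have an event loop, let's use it" point below.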
And also, as I see it, if we have an event loop, let's try to take full advantage of it. So we've profiled using cProfile and line_profiler, but we had to stop the service in order to look at the results. So perhaps you'd like a live profiler to go along with your production testing and debugging, if you do that.
There's a package called profiling that provides an interactive UI, and it supports asyncio as well as threads and greenlets. Granted, you can't attach to an already-running process with this particular tool; you'll need to launch the service with it.
And when you do, you get this text-based UI that regularly updates. You can drill down, pause it while inspecting something, and restart it. You're able to save performance data to view in this UI at a later time, and the tool also provides a server that you can remotely connect to.
So the TL;DR of profiling: there isn't much difference between profiling asyncio code and non-asyncio code, though it can be a little confusing to look at cProfile output. To get an initial picture of your service's performance, using cProfile with KCachegrind can help surface areas to investigate.
Without that visualization, we saw that it can be a bit difficult to see the hotspots. Once you have an idea of those hotspot areas, you can use line_profiler to get line-by-line performance data. And finally, if you want to profile in production, I suggest taking a look at the profiling package.
So in essence, this talk is something that I would have liked a year ago, speaking to past-Lynn here. But I'm hoping that there are others who can benefit from a use case that is not a web crawler. This is the link to the full blog post, the slides, and all the code.
And I must give my obligatory spiel: Spotify is hiring for various engineering and data science positions at all of our engineering offices, Stockholm, London, New York, and Boston. So if you're interested, come talk to me. And thank you very much.