
Maggy: Asynchronous distributed hyperparameter optimization based on Apache Spark

Formal Metadata

Title
Maggy: Asynchronous distributed hyperparameter optimization based on Apache Spark
Subtitle
Asynchronous algorithms on a bulk-synchronous system
License
CC Attribution 2.0 Belgium:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal purpose as long as the work is attributed to the author in the manner specified by the author or licensor.

Content Metadata

Abstract
Maggy is an open-source framework built on Apache Spark for asynchronous parallel execution of trials in machine learning experiments. In this talk, we present our work on tackling search as a general-purpose method efficiently with Maggy, focusing on hyperparameter optimization. We show that an asynchronous system enables state-of-the-art optimization algorithms and allows extensive early stopping, in order to increase the number of trials that can be performed in a given period of time on a fixed amount of resources.
Transcript: English (auto-generated)
Okay, we'll get started with the next talk. Moritz is gonna talk about the Maggy software. Yeah, good morning. My name is Moritz. I'm a software engineer at Logical Clocks, a company developing an open-source platform
for scale-out machine learning. And today I'm gonna talk about asynchronous hyperparameter optimization on Spark and on Hopsworks, which is what the platform is called. To start out, a bit of a controversial statement from last year, from an essay by Rich Sutton, also known as the father of reinforcement learning.
And basically what he says is that you shouldn't build human intelligence into your model development; instead, the future of AI lies in methods that scale with available compute. And two of those methods that seem to scale are search and learning. So you heard Spark, so a lot of people will say,
well, Spark scales with available compute, so Spark is the answer. But it's actually not that easy, because Spark is a bulk-synchronous system, and to make this really efficient you would need asynchronous execution. And today I'm gonna show you how you can still do it on Spark. If we look at distribution and deep learning,
we don't wanna hurt our generalization error, so what methods do we actually have available to improve the generalization error? We can use better regularization, hyperparameter optimization, larger training data sets, better models, or better optimization algorithms,
but actually where we can apply distribution is in these three parts. And when we look at it, we usually speak about two loops. There's the inner loop, which is the stochastic gradient descent training; we can apply distribution to that fairly efficiently,
and also in Spark, we can use a bunch of workers and use the all-reduce strategy to scale the inner loop out. A bit more complicated is actually the outer loop, which we call the search loop, where we have to basically generate trials
that we think are promising, and we actually need to train them to get back a single metric. It behaves like a black box: we don't have any gradient information available to optimize this loop, so we have to rely on much less efficient, more greedy methods like random search or Bayesian optimization.
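To make that concrete, here is a minimal, hypothetical sketch of such a sequential search loop: training is a black box that maps a hyperparameter combination to one metric, so without gradients the best we can do is sample configurations, as random search does. The train function below is a dummy stand-in, not real training code.

import random

def train(config):
    # Stand-in for the expensive inner loop (SGD training of a full model);
    # in reality this would return, e.g., the validation loss after training.
    return abs(config["lr"] - 0.01) + abs(config["dropout"] - 0.2)

def sample_config():
    # Hypothetical search space: learning rate and dropout rate.
    return {"lr": 10 ** random.uniform(-4, -1), "dropout": random.uniform(0.0, 0.5)}

best_config, best_loss = None, float("inf")
for _ in range(20):                      # one trial after another, no parallelism yet
    config = sample_config()
    loss = train(config)                 # black box: no gradients w.r.t. config
    if loss < best_loss:
        best_config, best_loss = config, loss
print(best_config, best_loss)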
The inner loop is what we call the learning, and the outer loop is the search. But in reality, if you look at your process of building a machine learning model, deploying these two loops efficiently at scale
means rewriting your training code, actually a lot of times, because usually you start out exploring and designing your model on maybe just a single-host Python kernel, but then when you run experiments you wanna scale it out, maybe on Spark, and then finally you wanna do data-parallel or model-parallel training on a lot of data.
But the problem is that you usually have to keep iterating: you might notice something in your experiment, so you have to go back to your single-host environment, change the code, and then rewrite your code again to fit your distribution context,
which results in a lot of code changes that you need to track and keep a history of to keep your experiments and your models reproducible. So how we wanna do it is with an abstraction that we call the oblivious training function. We're trying to abstract out all the dependencies
so that we have a single training function where the user puts all their training code, just like in a normal single-host Python environment, and then we do the parametrization for you, for example with model-generator functions or hyperparameters; there are different ways to do it.
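As a rough, hypothetical illustration of what such an oblivious training function could look like (this is plain Keras code, not Maggy-specific): all the training logic lives in one ordinary Python function, parametrized only through its arguments, so the same code can run on a single host or be handed to a framework that supplies the hyperparameters.

def train_fn(lr=0.001, dropout=0.2):
    # Everything the model needs lives inside this one function; the caller
    # (a human, or a search framework) only supplies the hyperparameters.
    import tensorflow as tf
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dropout(dropout),
        tf.keras.layers.Dense(10, activation="softmax"),
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(lr),
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    (x, y), _ = tf.keras.datasets.mnist.load_data()
    history = model.fit(x / 255.0, y, epochs=1, verbose=0)
    return history.history["accuracy"][-1]   # one metric for the search loop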
So going back to the outer loop: if you look at it sequentially, we have a learning black box with no gradient information, which returns a single metric, and that's the metric we want to optimize, a loss or an accuracy, over some search space.
And ideally this applies not just to black-box optimization but to all kinds of search problems, so we can replace that with a global controller and also use it, for example, for ablation studies, where we leave out certain components of the model
to see what the contribution of that component is to the final quality of the model. So how can we scale this? Those learning black boxes are independent, so we can add multiple workers and run them in parallel,
but the problem is, how do we actually schedule the trials on them? That's why we need to add some kind of queue: we would like to produce trials ahead of time, and every time a worker gets idle, it can just take a trial, which is some hyperparameter combination, from the queue and start training it.
And ideally, we would like to keep the global information about all the learning curves we are training available in this controller, to make smart decisions about which trials to process next. But then the question is, which algorithm do we actually use for search?
How do we monitor progress? How do we actually collect the metrics and aggregate the results? And what about fault tolerance? If we lose one of the workers, we don't wanna lose days of experimentation progress, so how do we take that into account?
So we said this should be managed with platform support, and that's why we started the project called Maggy. It's a flexible framework to run asynchronous parallel experiments, not only hyperparameter experiments but any kind of trials for machine learning, and it supports a bunch of algorithms
like asynchronous successive halving (ASHA), random search, grid search, leave-one-component-out ablation, Bayesian optimization, and we're working on more. Now, synchronous search is actually quite straightforward to implement on Spark. Say you have random search: you generate 10 trials,
you start 10 tasks, each task's gonna train a single trial, you wait until all of them finish, and then afterwards you might generate more, or you stop the experiment there. The problem is that once you add early stopping,
or asynchronous algorithms, you don't make efficient use of your resources, because hyperparameters greatly influence training time, so some of the models will finish early and some will take longer, but in Spark you have to wait until you reach the end of your stage before you can actually start a new one,
and when you do directed search, for example with Bayesian optimization, you would need to wait until all the tasks in the first stage finish before you can update your Bayesian model and generate new trials, and we don't want that. And there are a bunch more methods. I already mentioned early stopping:
you can do early stopping based on a single learning curve, but the more efficient, better early stopping rules are based on all the information you have available, all the learning curves. For example, there's the median stopping rule, or you can predict the final outcome of the performance curve, or there are so-called multi-fidelity methods
like successive halving, where you give different budgets to each of your trials depending on how likely it is to be a good trial: if it looks like a good trial, you want to train it for more epochs to be really sure it's a good trial. For example, as you see here,
this is successive halving: you start with eight trials, but you only continue training the best half, so the best four, and so on, until at the end you have only one trial left.
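Here is a minimal sketch of the synchronous variant just described (ASHA removes the synchronization barrier between these rungs). The train function is a dummy stand-in for real training with a given epoch budget.

import random

def train(config, epochs):
    # Dummy stand-in: pretend a larger epoch budget gives a less noisy loss.
    return config["lr"] + random.random() / epochs

def successive_halving(configs, min_epochs=1, eta=2):
    budget = min_epochs
    while len(configs) > 1:
        # Train every surviving configuration for the current budget ...
        scored = sorted(configs, key=lambda cfg: train(cfg, epochs=budget))
        # ... keep only the best 1/eta (here: the best half), and give the
        # survivors a larger epoch budget in the next rung.
        configs = scored[: max(1, len(scored) // eta)]
        budget *= eta
    return configs[0]

candidates = [{"lr": random.uniform(0.0001, 0.1)} for _ in range(8)]
print(successive_halving(candidates))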
As I said briefly, you can also use this for ablation studies, by just replacing the optimizer with an ablator that leaves out certain components of your neural network one at a time, for example feature ablation, or leaving out components of your network, like layers. So yeah, the challenge is how we can fit this into Spark's bulk-synchronous execution model, because we have a mismatch between what we define as a trial
and the tasks and stages in Spark. And how do we do that? We actually block the executors in Spark with long-running tasks, and allow for communication between the driver and those tasks by setting up an RPC framework in between.
So this way, basically, once a task starts, it registers with the server that's running on the Spark driver and says: here I am, I'm available to do work. And the driver, or the optimizer, can send a trial configuration to that task,
which then starts training the model with those parameters. At the same time we can do heartbeating, so once these trials have started training, they heartbeat back the current training accuracy or loss, and we can make global early-stopping decisions.
For example, say one trial is performing worse than the median of all other trials at a comparable point in training; then we say, okay, stop this one, start a new one right away, this is not worth my time. So what does it look like inside of Maggy?
The first thing we do when the Spark driver starts is start a little RPC server. This RPC server can receive messages from the clients that are started in the Spark tasks, and every time it receives a message,
it puts it in a queue for the optimizer, and the optimizer is gonna modify some shared data. And every time the server receives a heartbeat from a currently running trial, it looks up the shared data and decides whether that trial should be stopped or should continue.
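As a much-simplified, hypothetical sketch of that driver-side logic (the names here are illustrative, not Maggy's actual internals): workers register to get a trial, heartbeat metrics back, and the controller applies the median stopping rule described above.

import statistics

class Controller:
    def __init__(self, optimizer):
        self.optimizer = optimizer      # produces new trial configurations
        self.history = {}               # trial_id -> list of heartbeat metrics

    def on_register(self, worker_id):
        # An executor has become idle: hand it the next trial to train.
        return self.optimizer.next_trial()

    def on_heartbeat(self, trial_id, step, metric):
        # Record the metric (assumed to be a loss, lower is better), then apply
        # the median stopping rule: stop a trial that is doing worse than the
        # median of all other trials at the same point in training.
        self.history.setdefault(trial_id, []).append(metric)
        others = [curve[step] for tid, curve in self.history.items()
                  if tid != trial_id and len(curve) > step]
        if others and metric > statistics.median(others):
            return "STOP"               # worker kills the trial and re-registers
        return "CONTINUE"

In Maggy itself this role is played by the RPC server and optimizer running on the Spark driver, while the long-running Spark tasks host the clients.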
And this has a few advantages: because we're running this on Hopsworks, we have a REST API available, so we can register this Maggy server with Hopsworks and start another client in the Jupyter notebook that connects to the server later and gets us, for example, the logs or the current metrics. So we actually get live feedback
inside the Jupyter notebook on how my experiment is performing at the moment, which is very nice, because so far you always had to go to the Spark UI, open the logs of your executors and check what's going on. So, what does it look like from the user API side?
It's quite straightforward. You define a search space, which supports a few different hyperparameter categories: integers, doubles, and categorical or discrete hyperparameters.
Then, like I said, you define one single training function that contains all the logic for your model, with the hyperparameters passed as arguments to the function. The only thing we need you to do is add a callback to your Keras or TensorFlow model,
which creates the connection between Maggy and your user code: you tell it which metric to broadcast, or heartbeat, back to the Maggy driver to optimize, and you should return that same metric as the final result.
And then, finally, last but not least, you can lagom your experiment. Lagom is Swedish, and it means 'just the right amount', so not too much, not too little. You give it your search space, the optimizer you want to use, and the number of trials, and there are a bunch more parameters you can set.
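Roughly, the flow looks like the sketch below. The names (Searchspace, KerasBatchEnd, experiment.lagom) follow the Maggy documentation from around the time of this talk; treat the exact signatures as an assumption, since the API may have changed.

from maggy import experiment, Searchspace
from maggy.callbacks import KerasBatchEnd

# 1) The search space: integer, double, discrete or categorical hyperparameters.
sp = Searchspace(kernel=('INTEGER', [2, 8]),
                 pool=('INTEGER', [2, 8]),
                 dropout=('DOUBLE', [0.01, 0.99]))

# 2) One single training function with all the model logic; hyperparameters
#    arrive as arguments, and the reporter/callback is the connection back to
#    the Maggy driver for heartbeating the metric.
def train_fn(kernel, pool, dropout, reporter):
    import tensorflow as tf
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, kernel, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(pool),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dropout(dropout),
        tf.keras.layers.Dense(10, activation='softmax'),
    ])
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    (x, y), _ = tf.keras.datasets.mnist.load_data()
    history = model.fit(x[..., None] / 255.0, y, epochs=1,
                        callbacks=[KerasBatchEnd(reporter, metric='accuracy')])
    return history.history['accuracy'][-1]   # return the same final metric

# 3) "Lagom" the experiment: search space, optimizer, direction, number of trials.
experiment.lagom(train_fn, searchspace=sp, optimizer='randomsearch',
                 direction='max', num_trials=15, name='mnist-demo')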
You can also implement your own custom optimizers; it's straightforward, you just implement an abstract class with three methods, and you can plug in your own early-stopping components if you want to. As for the ablation API, it currently supports the Keras Sequential API, because there you can give labels to your layers
and the components in your model. With Maggy, you then define an ablation study where you tell it which components of your model to ablate one at a time, basically, or which features, and then it's the same thing again: you define a single training function to run it with.
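This is not Maggy's actual ablation API, just a generic, hypothetical illustration of the leave-one-component-out idea: label the components, rebuild the model once per labeled component with that component left out, and train each variant with the same training function.

import tensorflow as tf

# Labeled, optional components of the model; each entry can be ablated.
COMPONENTS = {
    'dense_1': lambda: tf.keras.layers.Dense(128, activation='relu'),
    'dropout': lambda: tf.keras.layers.Dropout(0.25),
    'dense_2': lambda: tf.keras.layers.Dense(64, activation='relu'),
}

def build_model(ablate=None):
    model = tf.keras.Sequential([tf.keras.layers.Flatten(input_shape=(28, 28))])
    for name, make_layer in COMPONENTS.items():
        if name != ablate:               # leave one labeled component out
            model.add(make_layer())
    model.add(tf.keras.layers.Dense(10, activation='softmax'))
    return model

# One trial per ablated component; in Maggy these trials would be scheduled
# on the Spark executors just like hyperparameter trials.
for component in COMPONENTS:
    model = build_model(ablate=component)
    # train the variant with the same single training function and compare its
    # metric against the full model to estimate the component's contribution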
Some results. What you should observe here is two things. On the left, we have a model which is actually quite stable, so the hyperparameters don't change the accuracy of the model that much, and as we can see, the asynchronous algorithm, ASHA,
is not that much better, but it finds its best configuration already after half the time, and then we have the remaining half of the experiment's time to, for example, train the model with more data. But the real benefit comes in when you have a model that's sensitive to hyperparameters,
and we've seen that a lot with GANs, for example, where early stopping plays a role because a lot of the models actually perform really badly depending on the hyperparameters. With Maggy, you can basically do a lot more exploration of your search space,
because you can perform many more trials in the same amount of time. I'll do a demo in a second; just some conclusions first. You should avoid manual, iterative hyperparameter optimization and automate your experiments, because black-box optimization is hard and iterative development is slow.
There are actually state-of-the-art algorithms that can be deployed asynchronously, such as Bayesian optimization, particle swarm optimization, and ASHA, the asynchronous successive halving algorithm, and you can use Maggy to do that.
It saves you resources, and the ablation API helps you better understand your models, especially when they're sensitive to hyperparameters. What's next? We want to develop more algorithms,
and we really want to provide distribution transparency so that once you define your training function, you do not have to rewrite it to run it in different contexts. If you run it on your local machine or if you run it on Spark to scale out the experiments or if you do data parallel training,
this shouldn't matter. Then, a big problem in this area is comparability and reusability of experiments, so maybe we can work on making experiments more comparable: if you run them on Spark clusters with a certain configuration,
they should always reproduce the same outcomes. And then something else we're working on is implicit provenance, as you will see in a second. Basically, because we have this training function that we call for the user, so it's not the user who calls their own code anymore, we can do a lot of things around it.
For example, tracking the artifacts that you're producing and tracking the code changes, instead of you having to write calls to a lifecycle management tool like MLflow, where you call mlflow.log_param or mlflow.log_metric for each parameter and metric,
we can do all of that for you in the background. And we need to add support for PyTorch. So let me do a quick demo. I started a Spark application on Hopsworks with just three executors and a little bit of memory for each of the executors,
and as you can see, I define the search space. In this case it's a simple convolutional MNIST model, so I define the kernel size and the pool size for the pooling layer, and you can also add additional parameters one by one,
for example the dropout rate. Then you just import the experiment module from Maggy and the Keras callback that provides the connection to Maggy and broadcasts back the metrics,
and then you just have your normal model specification as you would write it in Keras, as you've probably been doing on your local machine, and you add the callback there. And once you've got that, you can start the experiment, and because we have this local client
running in Jupyter, once the job starts, and it takes a few seconds to start the executors, we can track the progress of the experiment inside Jupyter with a nice progress bar. And when you call print inside your training function,
the printouts will show up below in your Jupyter notebook, which is really nice for debugging, because we've seen a lot of users get scared when they hear Spark, since they know they're not gonna get their printouts back in their Jupyter notebook; they have to go to the Spark UI,
and that's really cumbersome. So as you see, the model has started training now, in epoch one, on three machines, and the experiment is running. And then, as I said, we're working on provenance, so we are actually tracking this experiment for you.
As you can see, I have an experiment running here now. If you look at the previously finished ones, you can see the hyperparameter combinations, the metrics they produced, the logs of the individual trials, the experiment directory on HDFS,
and for example also the notebook that you used to run this experiment, which is versioned for you, and also the conda environment with all your dependencies in it, so you can reproduce the experiment. And you can open TensorBoard,
that takes a few seconds usually. We can maybe go back for a second, no. There it is, yeah. And you can look at your experiment.
It's loading in the background, yeah. Here you can see all your trials, you can look at the learning curves. Yes, to get a better understanding of the experiments. Yes, that's it from my side. I'm open to questions. Here are some acknowledgements of colleagues and contributors.
Thank you, thank you. Please stay seated while we do questions so there's not a lot of noise. Any questions for Moritz?
Hi, I think on the second slide you mentioned these parallel workers where you train the data on each one of the parallel workers. Do you train part of the data on each parallel worker, or do you train the full data set on each parallel worker? And then do you compare the overall results,
or are you somehow aggregating the models? That's what I don't fully understand. It's based on ring all-reduce, so each of the workers trains on a subset of the data, and after each gradient descent step, they send... You update all these models? Exactly, so they pass the gradients to their neighbors,
all of them, and aggregate the gradients, and then continue with the next step. But then you still get just one model at the end that is not trained on the full data set; it's just trained on part of the data set. So how can you make sure that the final model has somehow been trained on all of the data set and not just the parts of it
on each parallel worker that you have? Because you're allocating a part of the training data set to each worker, such that all the data you wanna use is actually allocated to some worker. That's, yeah, data-parallel training. That's how it works. Yeah, great. And this RPC functionality has already been integrated
into Maggy, so the user doesn't have to bother with any of the RPC calls between anything? It just comes with the framework? Exactly, so as you saw, you just write your normal code inside the Jupyter notebook. You just have to import the library. We wanna hide all the complexity
of our distribution from the user. Yeah, great, thank you. Yeah, there's one.
Hi, I couldn't really understand what the algorithm behind the optimization is. Can you choose it, or is it like brute force? No, you can choose it: either you implement your own one, if you have a special one in mind, or we provide a bunch of them out of the box.
For example, random search, grid search, and ASHA, which is considered state of the art at the moment and produces really good results, and it's not even directed yet, it's based on randomness. So if you wanna add, for example, a Bayesian model to that, it's probably gonna give really good results, yes.
And the second one: even though I've built a lot of models, I don't really use Spark, so let's say there's a model which uses GPUs, is there some kind of support for GPUs or CPUs? So if you run it on Hopsworks, we have our own version of the YARN resource manager,
which supports GPUs as a resource, and you can allocate GPUs to your executors. So for example here, on this cluster I don't have GPUs right now, but if you have GPUs, you would be able to say: give me one GPU per executor, for example.
And yeah, the whole platform's open source, so you can go and try that. Okay, that's all we have time for. Thank you very much, Moritz. Thank you.