PySpark - Data processing in Python on top of Apache Spark.

Formal Metadata

Title
PySpark - Data processing in Python on top of Apache Spark.
Title of Series
EuroPython 2015
Part Number
115
Number of Parts
173
Author
Peter Hoffmann
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.
Production Place
Bilbao, Euskadi, Spain

Content Metadata

Abstract
Peter Hoffmann - PySpark - Data processing in Python on top of Apache Spark. [Apache Spark] is a computational engine for large-scale data processing. It is responsible for scheduling, distributing and monitoring applications which consist of many computational tasks across many worker machines on a computing cluster. This talk will give an overview of PySpark with a focus on Resilient Distributed Datasets and the DataFrame API. While Spark Core itself is written in Scala and runs on the JVM, PySpark exposes the Spark programming model to Python. It defines an API for Resilient Distributed Datasets (RDDs). RDDs are a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner. RDDs are immutable, partitioned collections of objects. Transformations construct a new RDD from a previous one. Actions compute a result based on an RDD. Multiple computation steps are expressed as a directed acyclic graph (DAG). The DAG execution model is a generalization of the Hadoop MapReduce computation model. The Spark DataFrame API was introduced in Spark 1.3. DataFrames evolve Spark's RDD model and are inspired by pandas and R data frames. The API provides simplified operators for filtering, aggregating, and projecting over large datasets. The DataFrame API supports different data sources like JSON data sources, Parquet files, Hive tables and JDBC database connections.
Transcript: English (auto-generated)
Hi everybody, thanks for the introduction. Yes, my name is Peter Hoffmann, you can see my Twitter handle, Peter Hoffmann, and you can find the slides afterwards at github.com/blue-yonder. Before I start my talk, a little bit about me and about Blue Yonder. So what do I do? I'm a software developer at Blue Yonder. Blue Yonder provides predictive analytics as a service; we have one of the biggest data science teams in Germany. Our stack is mostly Python, and we are building a platform where we run our machine learning algorithms on top of it.
We are ten people from Blue Yonder here at EuroPython and we have nine talks, so after me you still have the chance to see three other people from Blue Yonder. Tomorrow you can see Moritz talking about testing and fuzz testing, you can see Christian talking about bulk data storage with SQL, you can see Florian talking about bots, and last but not least, Philipp will, I think, present what Blue Yonder really does. So let's start.
What is Spark? Spark is a distributed, general-purpose computation engine. It has APIs for Scala, Java, R and Python, and it's mostly for machine learning and distributed computing. Spark has one core API, the Resilient Distributed Dataset (RDD), and all other APIs sit on top of this core API. Spark runs on a cluster, on multiple machines, and you can use different schedulers to run Spark on a cluster: the standalone scheduler, the Hadoop YARN scheduler, or you can run Spark on Mesos. On top of Spark Core sit several libraries. The most important one is Spark SQL, or the Spark DataFrame API; then there's Spark Streaming, where you can calculate on micro-batches and do stream computing; there's the MLlib library for machine learning; and there's the GraphX library, which is for graph processing. Spark itself is written in Scala and runs on the Java virtual machine, and it is responsible for memory management, for fault recovery and for interaction with other storage systems.
Spark sits on top of the Hadoop stack, so Spark can access every data source that the Hadoop stack provides. The core abstraction of Spark is the RDD, the Resilient Distributed Dataset, and an RDD is a logical plan to compute data based on other datasets. RDDs are fully fault tolerant, so the system can recover from the loss of single nodes in your cluster, or from single failures in the calculation of parts of your RDDs: Spark will then rerun the calculation and try to recover from the machine failure. There are two basic ways to interact with RDDs. The first one is through transformations: a transformation always takes one or more RDDs as input and has an RDD as output. Transformations are always lazy; that means they are not calculated on the fly, but only when you call an action on an RDD. Actions are the last step in a calculation plan, where you really want to collect the data: you can take some rows of your data, you can collect all of it, or you can count the result set, and then the calculation will really be run and you'll get your data back. Spark tries to minimize data shuffling between the nodes in your cluster, and in contrast to the Hadoop stack it doesn't write all intermediate results to the file system but tries to keep them in memory. Therefore, if your data fits into the memory of your cluster, Spark is much faster than traditional MapReduce jobs.
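As a rough illustration (not taken from the talk's slides), here is a minimal PySpark sketch of transformations versus actions; the local master and the input path logs.txt are placeholders:

    from pyspark import SparkContext

    sc = SparkContext("local[*]", "lazy-demo")        # local master, just for illustration

    # Transformations only build up the logical plan; nothing runs yet.
    lines = sc.textFile("logs.txt")                   # placeholder input file
    errors = lines.filter(lambda l: "ERROR" in l)
    words = errors.map(lambda l: l.split()[0])

    # Actions trigger the computation and return results to the driver.
    print(words.count())                              # number of matching lines
    print(words.take(5))                              # first five results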
If you combine multiple transformations on your RDDs, you get the RDD lineage graph. That means that, based on the partitioning of your input data, you can have a lot of transformations, one after another, and Spark tries to group these transformations together and, when possible, run them on the same node. Many transformations are element-wise, which means they work on one element at a time, but that's not true for all operations: operations like groupBy or joins work on multiple elements. And as I said earlier, actions are then used to get the result back and return it to your driver program. If you know Hadoop or the traditional MapReduce programming model, there are only map and reduce steps, while Spark has many more transformations. It has the map and the reduce computations, but it also has things like flatMap, it has a filter, it has a sample function, you can do unions of multiple datasets, you can do an intersection, you can group the data by key, you can aggregate it by key, and you can do full, inner, left outer and right outer joins of your datasets. What's important for Spark is that it knows the partitioning of your input files and knows the data locality of your partitions, because it always tries to run your calculations where the data is: Spark tries to bring the algorithms to your data and minimize shuffling data around in your cluster.
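A small sketch of a few of these key-based transformations, using tiny in-memory RDDs made up for illustration (sc is the SparkContext from the sketch above):

    # Hypothetical key/value RDDs; each element is a (key, value) tuple.
    sales = sc.parallelize([("de", 10), ("de", 20), ("fr", 5)])
    names = sc.parallelize([("de", "Germany"), ("fr", "France")])

    totals = sales.reduceByKey(lambda a, b: a + b)    # aggregate values per key
    joined = totals.join(names)                       # inner join on the key
    subset = sales.sample(False, 0.5)                 # random sample, no replacement
    both = sales.union(subset)                        # concatenation of two RDDs

    print(joined.collect())   # e.g. [('de', (30, 'Germany')), ('fr', (5, 'France'))]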
So you have a set of partitions, which are atomic pieces of your dataset, you have a set of dependencies on parent RDDs, and you always have functions which calculate an RDD based on its parent RDDs. Spark needs to know the metadata of your data, to know where your data is located, to be able to do data-local computation, so that the data shuffle, which is expensive and will really slow down your calculations, is only done when necessary. As I said earlier, Spark is implemented in Scala and runs on the Java virtual machine.
So what is PySpark? PySpark is a set of bindings, or APIs, which sits on top of the Spark programming model and exposes the programming model to your Python programs. This is the famous word count example: you always start with an input RDD, with some kind of basic file system operation. Here you load a text file from HDFS, and then you have the normal MapReduce steps, where you split the lines by whitespace, then you emit each word with a count, and then you do a reduce step where you sum up the occurrences of the words.
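A sketch of that word count in PySpark; the HDFS paths are placeholders:

    # Classic word count: transformations build the plan, the final action runs it.
    text = sc.textFile("hdfs:///data/input.txt")
    counts = (text.flatMap(lambda line: line.split())   # split lines on whitespace
                  .map(lambda word: (word, 1))          # emit (word, 1) pairs
                  .reduceByKey(lambda a, b: a + b))     # sum the counts per word
    counts.saveAsTextFile("hdfs:///data/wordcounts")    # action: the job actually runs here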
As Python is dynamically typed, your RDDs can hold objects of different types; that's not possible in the Java version, but the Scala version also has this possibility. At the moment PySpark does not support all APIs that are supported in the Scala version; for the DataFrame API they provide nearly everything, but for streaming, PySpark always lags one or two versions behind the normal Scala APIs. So here you see how it's done: you always have a driver on your local machine. As I show later, you can have an IPython session or run your normal Python program. This connects to a SparkContext, which talks over Py4J to the Java virtual machine on your host, which then talks to the workers, and each worker again talks to Python or to the JVM, depending on what kind of calculations you do on top of the RDDs.
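In code, the driver side of this picture is just the SparkContext; a minimal sketch with a placeholder standalone master URL:

    from pyspark import SparkConf, SparkContext

    # The driver builds a SparkContext; PySpark starts a JVM and talks to it
    # over Py4J, and that JVM in turn talks to the workers on the cluster.
    conf = (SparkConf()
            .setMaster("spark://master:7077")   # placeholder cluster master URL
            .setAppName("pyspark-demo"))
    sc = SparkContext(conf=conf)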
Then there's relational data processing in Spark. That's a relatively new API; it was added in Spark 1.4 only two months ago, and it's a new way to work with your data on a higher level, through declarative queries and optimized storage engines. It provides a programming abstraction called DataFrames, and it also acts as a distributed SQL query engine. I'll show you later how you interact with it. What's really nice is that the query optimizer, the Catalyst optimizer, works the same for Java, Scala and Python, so you will get the same speed in your Python programs as you would with Scala programs. The DataFrame API provides a rich set of relational operations, and you can interact with it through different interfaces: a Java program can talk to it through the normal JDBC API, you can talk to it directly from user programs in Python, Java and Scala, and
you can also switch between the DataFrame API and the raw RDD API. So what's a DataFrame? A DataFrame is a distributed collection of rows grouped into named columns with a schema, and it has a high-level API for common data processing tasks: projection, filtering, aggregation and joins, plus metadata, sampling and user-defined functions. You can define your user-defined function in Python and use it in SQL statements and SQL queries. As with RDDs, DataFrames are executed lazily: each DataFrame object only represents a logical plan for how to compute the dataset, and the computation is held back until you call an output action.
You can see a DataFrame as the equivalent of a relational table in Spark SQL. You can create it through various functions using a SQLContext, and once you have created it, you can operate on it through a declarative domain-specific language. Here we just load a people JSON file, which has some rows of JSON, and then, as you know it from SQLAlchemy or maybe from pandas, you can do filtering, selection and projection and get your data back. If you compare these two statements, the first one is the declarative Python way and the second one is the SQL way, and they result in the same execution plan in Spark itself. Because it's purely declarative, if you write it in Python you get the same speed as plain SQL or as the same thing defined in Scala.
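A sketch of that comparison, assuming a SQLContext and a placeholder people.json file with name and age fields (not the exact snippet from the slides):

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)
    people = sqlContext.read.json("people.json")        # placeholder JSON file

    # Declarative DataFrame style ...
    adults = people.filter(people.age > 21).select(people.name)

    # ... and the equivalent SQL; both compile to the same logical plan.
    people.registerTempTable("people")
    adults_sql = sqlContext.sql("SELECT name FROM people WHERE age > 21")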
Why is this possible? It's possible because Spark has the Catalyst query optimization framework, which works for all languages that use the DataFrame API. It's implemented in Scala and uses features like pattern matching and runtime metaprogramming to allow developers to specify complex relational optimizations. As you can see here (this is from the Spark website), if you work with the plain RDDs the Python version is always slower than the Scala version, but if you sit on top of the DataFrame API, where you only use declarative statements, then you get the same speedup as in Scala. So, how do you talk to your data? As I said before, Spark works on top of Hadoop, so you can access all the Hadoop file systems and drivers that are available there through the data source API: you can talk to Hive tables, you can read in Avro files, CSV files and JSON files, you can read and store data in the Parquet columnar format, and you can also connect to normal JDBC databases. I'll go into a little detail on the Parquet data format, because I think that's a really great way to work with and store data from Spark.
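A few of these data sources in sketch form; paths, URLs and table names are placeholders, and CSV (like Avro) typically goes through an external package such as spark-csv, which is an assumption about the setup:

    # Built-in sources on the DataFrameReader:
    df_json = sqlContext.read.json("hdfs:///data/events.json")
    df_parquet = sqlContext.read.parquet("hdfs:///data/events.parquet")
    df_jdbc = sqlContext.read.jdbc("jdbc:postgresql://db-host/shop", table="orders")
    df_hive = sqlContext.table("some_hive_table")       # needs a Hive-enabled context

    # CSV via the external spark-csv package:
    df_csv = (sqlContext.read.format("com.databricks.spark.csv")
              .option("header", "true")
              .load("hdfs:///data/events.csv"))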
So Parquet is a columnar format that's supported by many data processing systems, and you can store the Parquet data in chunks in a Hadoop HDFS file system. Parquet automatically preserves the schema of the original data. As you can see here, if you have a table with three columns, row-oriented storage means you write row after row, while column-oriented storage means you save your data in column order, and that has several advantages. The first one is that within one column the data is usually similar, so compression works much better if you encode the data column-wise in blocks. And if you have data with many columns and you don't want to access all columns every time, it's much faster to access just some columns at a time. The DataFrame API is able to do predicate and projection pushdown: that means if your underlying storage can work with vertical or horizontal partitioning, the Spark DataFrame API can push the predicate down to your storage engine and doesn't have to read all the data into Spark, but lets the storage engine do the hard work. You can see vertical partitioning here: that means you only want column B, and maybe you have some predicates on some rows, so you say, okay, I only want the rows where A is two and C is in four and five. You can split this up, and only the result is read into Spark for further processing.
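A sketch of that pushdown, using a tiny made-up DataFrame with columns a, b and c:

    # Build a small DataFrame just for illustration and write it as Parquet.
    df = sqlContext.createDataFrame(
        [(1, "x", 4), (2, "y", 5), (2, "z", 9)], ["a", "b", "c"])
    df.write.parquet("hdfs:///data/abc.parquet")         # placeholder path

    # Reading back with a filter and a projection lets Spark push the predicate
    # down to Parquet and read only column b plus the row groups that match.
    abc = sqlContext.read.parquet("hdfs:///data/abc.parquet")
    subset = abc.where((abc.a == 2) & ((abc.c == 4) | (abc.c == 5))).select("b")
    subset.show()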
The DataFrame API not only supports tabular data with basic types like numeric types, string types and byte types, it also provides support for complex and nested types, so you can build tree-like data and access tree-like data from the DataFrame API. You always have to provide a schema, or the data has a schema. There are two ways to get the schema into your DataFrame. The first one is schema inference: this works with typed input data like Avro, Parquet or JSON files, or for example with plain Python dicts, where it can guess the data types for your DataFrame. Or you can specify the schema yourself. Here we want to read in a normal CSV file, and we define a StructType with several fields, and for each field we add the type of the field.
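A sketch of such an explicit schema; the file name, field names and types are made up for illustration:

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("city", StringType(), True),
    ])

    # Parse the raw CSV lines ourselves and attach the schema to get a DataFrame.
    rows = (sc.textFile("people.csv")
              .map(lambda line: line.split(","))
              .map(lambda p: (p[0], int(p[1]), p[2])))
    people = sqlContext.createDataFrame(rows, schema)
    people.printSchema()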
Here you can see the important classes of Spark SQL and the DataFrame API. You have the SQLContext, which is the main entry point for interacting with DataFrames and SQL-like functionality; you have the DataFrame, which is the distributed collection of grouped data; you have Column expressions, which you use to work on your DataFrame; a Row is a row in the DataFrame; GroupedData is the data you get from aggregation methods like groupBy; and you have the types which, as described earlier, define the schema. So when you have a DataFrame, it looks like pandas, I think: you can select, you can filter, you can group by and work on the data as
you do on your local machine, but you really work on a cluster, and that's what I want to show. So I'll show you a little example from the GitHub archive. The GitHub archive stores all the events that happen on GitHub; for the last half year that is 27 gigabytes of JSON data, about seven million events. Now, all my colleagues said don't do it, but we'll try anyhow to do a live demo and connect to the cluster. It's a little bit too small, but I think we'll get it. Also, this will not work: I also wanted to show you the cluster itself, but that will not work at all, so I'll only go through the
programming statements. We always start with a SQLContext; that's our entry point where we connect to the cluster. We have a cluster with four machines, each with 40 cores, and about one terabyte of RAM in total. What we do is just read a single text file, a JSON file covering one hour of one day, into the cluster. You can see here how to get it: from the context we read the text file and take one row, so we can dump it out. What we see is normal JSON, but it's hierarchical JSON. As I said earlier, that's no problem, because Spark can work with hierarchical data. Now we read it in as JSON, so Spark auto-detects the schema. So now we have read in the data and Spark has auto-detected the schema, and what we see is that for each event we have an actor, that's the person who committed something to GitHub, we have a created-at timestamp, we have some payload, and we have the type of the event at the bottom, like a pull request or something like that. Spark automatically detects the schema, so we don't have to do anything.
So now let's try to work on the whole data, not only one hour of one day but all events from the last half year of GitHub, and have a look at how many events we've got. So now, from my MacBook, which doesn't have that much memory, we are working on 70 million events. Let's have a look at how many events were in the Apache Spark repo in the last half year: roughly 60,000. Let's also have a look at who the top committers in this Apache Spark repo are and show them. All the calculation is done on the cluster, and only the result comes back to my IPython notebook. That's pretty cool, because you now work on much bigger machines and don't have to do the calculation on your laptop. And you can also always register DataFrames as SQL tables, so that you can run normal SQL statements instead of using the declarative Python API.
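For reference, a rough sketch of the steps from the demo; the HDFS path is a placeholder, and the field names (actor.login, repo.name, type) follow the GitHub archive event format:

    from pyspark.sql.functions import col

    events = sqlContext.read.json("hdfs:///data/github/2015-*.json")
    events.printSchema()                           # Spark infers the nested schema
    print(events.count())                          # total number of events

    spark_repo = events.where(col("repo.name") == "apache/spark")
    print(spark_repo.count())                      # events in the Apache Spark repo

    # Top committers: group by the actor's login and sort by event count.
    top = (spark_repo.groupBy(col("actor.login").alias("login"))
                     .count()
                     .orderBy("count", ascending=False))
    top.show(10)

    # The same query in plain SQL after registering the DataFrame as a table.
    events.registerTempTable("events")
    sqlContext.sql("""
        SELECT actor.login AS login, count(*) AS n
        FROM events
        WHERE repo.name = 'apache/spark'
        GROUP BY actor.login
        ORDER BY n DESC
        LIMIT 10
    """).show()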
So that's the demo. Now a little summary: Spark is a distributed, general-purpose cluster computation engine, and PySpark is an API to it. The Resilient Distributed Dataset is a logical plan to work on your data. DataFrames are a high-level abstraction, a collection of rows grouped into named columns with a schema, and the DataFrame API allows you to manipulate DataFrames through a declarative domain language. So thanks for your attention, and are there any questions?
Okay, so we got this one. The main point of why I wanted to show the demo is that I wanted to show htop across the whole cluster. That's what I really like: once you have talked to a cluster with 160 cores and one terabyte of RAM, that's fun. So, any actual questions? If you have any other questions, come to our booth, come to the other talks, and come see me around.
Great. Thank you, Peter, again.