GeoMesa: Distributed Spatiotemporal Analytics
Formal Metadata
Title | GeoMesa: Distributed Spatiotemporal Analytics
Number of Parts | 188
Author | Fox, Anthony
License | CC Attribution 3.0 Germany: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal purpose as long as the work is attributed to the author in the manner specified by the author or licensor.
Identifiers | 10.5446/31648 (DOI)
Language | English
Production Year | 2014
Production Place | Portland, Oregon, United States of America
Transcript: English (auto-generated)
00:00
My name is Anthony Fox, I'm from Charlottesville, Virginia. I work for a company called CCRI, Commonwealth Computer Research, and I'm here to talk to you about distributed spatiotemporal analytics, specifically built on top of the Hadoop ecosystem and Accumulo distributed column family database.
00:24
GeoMesa is a LocationTech open source project, part of a family of excellent projects that include GeoTrellis, Spatial4j, JTS, uDig and a handful of other spatial capabilities under the Eclipse Foundation.
00:42
So I didn't want to make any assumptions about what people know about the Hadoop ecosystem, so I'm gonna cover just enough of it that the rest of the talk is not opaque to anybody. Just a show of hands: how many people have heard of Hadoop, and how many people have written a MapReduce job?
01:03
Okay, so there's quite a bit of experience here, but I'm gonna cover just enough for those of you who haven't. After covering the Hadoop ecosystem, I'm gonna talk a bit about how GeoMesa fits into that infrastructure and how we enable spatiotemporal indexing
01:23
on top of this distributed database. But then the primary part of this talk is a dissection of three example analytics and how they execute across the different components in a traditional Hadoop stack. So I'm gonna try to get to that as quickly as I can
01:42
without being too confusing. So the first thing we're gonna cover is the Hadoop ecosystem. We're gonna cover HDFS and MapReduce. Then we're gonna talk about Accumulo, which is the database that's built on top of HDFS, and then talk briefly about some of the libraries that extend and simplify the process
02:01
of developing for Hadoop. The stack that I'm going to cover and that's going to be exemplified in all of the analytics is essentially what you see on the slide here. So at its core, we have HDFS, the distributed file system.
02:20
Accumulo's built on top of HDFS, as are other databases that are similar to Accumulo, like HBase and Cassandra. GeoMesa is built on top of Accumulo, and it also has plugins inside of GeoServer, which is shown on the right. And then the computational libraries that are on top of the Hadoop stack,
02:42
you have your traditional MapReduce. That's the batch analytic processing out of the Google paper from years ago. You've got a fairly new capability called Spark that's gaining a lot of momentum. It's very good for doing low latency computations in a distributed fashion.
03:01
And then you've got Storm, which is the streaming analytic platform that's on top of Accumulo. Tuples come in as a stream, it executes a computation and stores the results wherever you want them to be stored. Kafka's shown on the left. It enables things like Storm within this environment. It's a high performance queuing system,
03:22
message queuing system. But what we've done with GeoMesa is we have built plugins for GeoServer so that any access, any OGC access can come in through GeoServer and then be transparently executed on this stack. So any OGC client can make a WMS request,
03:42
which then distributes across the resources that are backed by this stack. Or a WFS query or a WPS request might come in and execute in one of MapReduce, Spark, or Storm. And I'm gonna talk through some examples of that. So first, HDFS and MapReduce.
04:03
HDFS is a block file system. It takes very large files on the order of terabytes, breaks them into blocks of, by default, 128 megs, but it's obviously configurable, and then distributes those blocks across all of your resources. So it also replicates those blocks
04:21
for redundancy and failover. The blocks establish data parallelism. So Hadoop is very good at data parallel computations, and MapReduce is very data parallel. You send a map task, one map task per block, it executes on that block, and if you have 100 blocks in your file,
04:41
you're gonna get 100 parallel tasks that execute. MapReduce is also predicated on associative computation. So there's, in the MapReduce paradigm, you've got map tasks which go over your raw data and aggregate it in some form and emit results. And then you have a shuffle step which organizes your data, sorts it,
05:03
and then sends it through a reduce step which does some aggregation. The canonical example, the hello world of MapReduce is word count. So probably everybody that has written a MapReduce job has at least seen this or even written it. But the idea is that you have a huge text file and it's broken up into these 128 megabyte blocks.
05:23
Each map task processes one of the blocks and emits all of the words and the number one associated with each word. And then each execution of the reducer happens against a single word, and it aggregates the counts for all of the occurrences of that word
05:40
that came out of the map tasks. The shuffle sorting phase happens in between in order to get to that single reduce step for a single word. And the summation is the associative operation there, it's the reduction. But more importantly for this audience is how neatly a heat map computation
06:02
maps to the same paradigm. So let's say you have a huge text file of WKT geometries and it's broken up into these blocks. You send out a map task per block, you send your computation to the data. For each feature in the block, each line is a WKT geometry.
06:21
You compute the world-to-screen transform, the pixels that that geometry impacts, and you emit each pixel and the number one. Then in the reduce step, after everything's shuffled and sorted by pixel, you sum up all of the elements that hit a pixel and you have a heat map.
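As a rough sketch of that heat map pattern (not GeoMesa's actual implementation), here is what the map and reduce steps might look like in Spark, assuming a text file of WKT points and a made-up worldToScreen function:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.locationtech.jts.io.WKTReader

    object HeatMapSketch {
      // Hypothetical world-to-screen transform onto an assumed 1024x512 canvas
      def worldToScreen(lon: Double, lat: Double): (Int, Int) =
        (((lon + 180) / 360 * 1024).toInt, ((90 - lat) / 180 * 512).toInt)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("heatmap"))
        sc.textFile("hdfs:///data/points.wkt")                    // one WKT point per line
          .map(line => new WKTReader().read(line).getCoordinate)  // parse each geometry
          .map(c => (worldToScreen(c.x, c.y), 1))                 // "map": emit (pixel, 1)
          .reduceByKey(_ + _)                                     // "reduce": sum hits per pixel
          .saveAsTextFile("hdfs:///out/heatmap")
      }
    }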
06:42
It's very simple and it's very effective in a batch computation. I'm gonna show how we've implemented that in Accumulo for low latency MapReduce. So MapReduce is a computational paradigm. It's not tied to the MapReduce infrastructure
07:00
that Hadoop has. You can do it in other contexts as well, including Spark. So how does that look? Okay, so Accumulo is a distributed database built on top of Hadoop. It was based on Google's Bigtable paper, which came out in 2006. That paper spawned a number of implementations,
07:22
including HBase, which I think is the most widely used column family database on the Hadoop platform. But Cassandra's another instantiation of the same concept. You get column oriented storage. It's a key value store. I'm gonna go into more details about this in a second. But column oriented storage gives you nice compression
07:43
and also gives you an arbitrary number of columns. Each row can have a different set of columns. You have a more schema-less structure, although it's not no schema. One of the primary constraints of these distributed databases is that they impose a single type of indexing capability
08:03
and that's lexicographic indexes. Your data has to be sorted and the database will sort your data for you. And if you think back to the blocks that I just mentioned, it naturally conforms to that block level partitioning. So a block corresponds to a lexicographic subset
08:22
of your key space. And that's one of the primary challenges with indexing spatial data in an Accumulo column family database. I'm gonna go into more detail about that. We leverage a couple of nice things about Accumulo. Bloom filters help us to filter files that we don't need to access
08:41
to satisfy a query or a predicate. And server-side iterators are something that I think actually distinguishes Accumulo from the other databases like HBase. It's a natural extension point. You can drop in jar files that implement iterators, and they are slotted into the iterator stack
09:02
that's executing and traversing over your data. We use these to do spatio-temporal type predicates. So we do all of the D9IM topological predicates within iterators inside of the database. And we can also do analytics with these iterators. So I'm gonna show you that.
09:22
In more detail, I mentioned it's a key value store. The key, though, is actually broken up into a five-tuple consisting of a row ID, a column family, a column qualifier, a visibility and a timestamp. Visibility is the security marking within Accumulo; Accumulo is very sensitive to security. The timestamp is used to do versioning.
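As a minimal sketch of what writing one of those five-part keys looks like with Accumulo's client API (the row key, column names and value here are invented for illustration):

    import org.apache.accumulo.core.data.{Mutation, Value}
    import org.apache.accumulo.core.security.ColumnVisibility

    val m = new Mutation("01~9q8yy~tweet-1234")                // row ID: shard ~ geohash ~ feature ID
    m.put("attrs", "tweet_text",                               // column family, column qualifier
          new ColumnVisibility("public"),                      // visibility (the security marking)
          System.currentTimeMillis(),                          // timestamp, used for versioning
          new Value("FOSS4G is underway".getBytes("UTF-8")))   // the value: uninterpreted bytes
    // batchWriter.addMutation(m)  // a BatchWriter (not shown) would send this to a tablet server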
09:42
You can have multiple versions of a piece of data. And the value is an uninterpreted blob of bytes. Data is distributed. It's a distributed database obviously. So tables are broken into tablets and tablets are sent out across all of the tablet servers.
10:03
The tablet servers are processes that are running on all of the nodes in your cluster. And they are responsible for a single tablet or perhaps multiple tablets of a single table. So they know that they're specifically responsible for a subset of the entire key space.
10:21
And if a query happens to hit that subset, then that tablet server will be communicated with to satisfy that query. Accumulo has fantastic write performance. If you pre-split your table, you're essentially saying to each tablet server that you're responsible for this small subset.
10:42
And as your data comes in in a streaming fashion, you assign it to a particular tablet server based on its key. And so you're getting this dispersed write capability. You're spinning multiple disks as quickly as you can. That's how you get really good write performance with Accumulo.
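A hedged sketch of what pre-splitting a table looks like with Accumulo's API; the instance, credentials, table name and the choice of two-digit shard prefixes as split points are all assumptions:

    import java.util.TreeSet
    import org.apache.accumulo.core.client.ZooKeeperInstance
    import org.apache.accumulo.core.client.security.tokens.PasswordToken
    import org.apache.hadoop.io.Text

    val connector = new ZooKeeperInstance("myInstance", "zoo1,zoo2")
      .getConnector("user", new PasswordToken("secret"))        // assumed connection details
    val splits = new TreeSet[Text]()
    (0 until 12).foreach(i => splits.add(new Text(f"$i%02d")))  // one split per shard prefix, 00..11
    connector.tableOperations().addSplits("geomesa_records", splits)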
11:01
Like these other column family databases, HBase and Cassandra, Accumulo is quite low level. You have to lay your data out according to the way that your access patterns dictate. So in an RDBMS, in Postgres,
11:23
you lay out your data in a tabular form, and the database takes care of structuring the data on disk for performant access. In Accumulo, you have to map your data to that lexicographic index, which has implications for data layout. So I'm gonna talk about that with GeoMesa.
11:44
There are many libraries that you can use to simplify the process of development on top of Hadoop. On the left are a few libraries that are great for doing batch analytics. Some of these libraries, like Cascading and Scalding and Pig,
12:02
take your computation and in a sense, compile it to a set of staged MapReduce jobs. Spark does something very similar, but executes in its own execution environment for low latency purposes. On the right, you've got streaming analytics.
12:20
The canonical example is Storm, which is a directed acyclic graph representation of your computation. And then there's Spark as well, which has a sort of a new capability now called Spark Streaming, which I haven't had enough time to work with,
12:40
but it's pretty shiny and I'm looking forward to working with it. So let's talk a bit about GeoMesa and how we actually store spatiotemporal data and query it. There are three aspects to that that are critical for understanding it. One is we have to deal with this lexicographic index, and to do that, we use what's called
13:01
space-filling curves. That implies a physical data layout that has implications for performance. Finally, we have to address query planning in this structure so that we can actually respond to queries with arbitrary polygons. So the problem is that we have multi-dimensional data.
13:21
The dimensions of primary interest to this audience are lat, long, and time, but often we have dozens of other attributes as well. For lat, long, and time, we have to map those into a single linear, lexicographic dimension. To do that, we linearize the key space using geohashes.
13:42
Geohashes I'm gonna describe in the next set of slides. They're a form of space-filling curve, but they have some very nice properties: they're recursive prefix trees. You get nice compression, because they often share a prefix within Accumulo, and there's tunable precision. You can add or remove bits
14:01
to your representation of a geometry so that you can scale up to worldwide data sets or scale down to regional data sets and still use all of your cluster's resources. So, geohashes are a z-order curve.
14:20
There are Hilbert curves and a handful of other space-filling curves, but basically they map a multi-dimensional space into a single dimension. So remember, we've just got this lexicographic dimension which we can work with. A geohash interleaves the bits of successive splits along the longitude and latitude dimensions.
14:42
So the first split is on longitude: on the left you get a zero, on the right you get a one. At the next level you split on latitude, and you append a one if you're above the line, a zero if you're below the line, and you keep going until you get down to the level of resolution that you care about.
15:01
And the level of resolution that you care about is a function of the data boundaries of your problem domain. So if it's the world, you might go to 25 bits of resolution, but if it's a region, if it's the mid-Atlantic, you might go to 35 or 40 bits of resolution.
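A small sketch of that bit interleaving, assuming longitude is split first and a one bit means the upper or right half; grouping the resulting bits five at a time gives the base-32-style characters mentioned below:

    def geohashBits(lon: Double, lat: Double, nBits: Int): String = {
      var (lonLo, lonHi, latLo, latHi) = (-180.0, 180.0, -90.0, 90.0)
      val bits = new StringBuilder
      for (i <- 0 until nBits) {
        if (i % 2 == 0) {                              // even bits split the longitude range
          val mid = (lonLo + lonHi) / 2
          if (lon >= mid) { bits += '1'; lonLo = mid } else { bits += '0'; lonHi = mid }
        } else {                                       // odd bits split the latitude range
          val mid = (latLo + latHi) / 2
          if (lat >= mid) { bits += '1'; latLo = mid } else { bits += '0'; latHi = mid }
        }
      }
      bits.toString                                    // e.g. 25 bits is roughly a five-character geohash
    }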
15:21
This interleaving of bits induces a linear walk through the space, and the linear walk is lexicographic. So if you take that binary string and apply what is essentially a base-32 encoding (it's not exactly base 32, but it's similar), the lexicographic properties
15:40
are such that it traverses the space as you see here. You can go to any level of resolution as well, which is a nice property of geohashes. So how does this translate to laying out data within Accumulo and GeoMesa? As an example, we're gonna look at events in downtown San Francisco. So the first thing that we've done
16:01
is we've gridded our space down to 25 bits of resolution, which corresponds to five characters in a base-32 encoding. So we're gonna look at one of these coarse resolution blocks of data. So the first thing that we have to do
16:21
is you see our tablet servers on the right. The tablet servers are distributed across the CPUs, which are distributed across the disks. We wanna spin those disks in an optimal manner. So the first thing we have to do is allocate a slice of space in a structured manner on those disks. Now, what you see is that within that block
16:41
that we care about, 9Q8YY, downtown San Francisco, we've allocated a slice on all of our tablet servers. So we're actually gonna uniformly distribute the data to all of those tablet servers. And we do that by prefixing the data with a shard ID modulo the number of tablet servers that you would like to get,
17:01
the level of parallelism that you wanna get. That's represented by coloring the dots in the map. So all of the green dots go to tablet server one. All of the yellow dots go to tablet server two, and the red dots to tablet server three. And that happens within that level of resolution. But it's repeated for every one of these grid cells
17:23
in a structured way. So if a query comes in, we know that we have to hit these three tablet servers, but when we hit them, we can quickly jump to the slice that corresponds to 9Q8YY. So in our case, we're spinning three disks, but we're spinning them in a structured way so that we quickly traverse our data.
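A hypothetical sketch of that key layout (GeoMesa's real row key format is more involved): a shard prefix, computed from the feature modulo the desired parallelism, followed by the geohash, so every tablet server holds a slice of every coarse cell:

    def rowKey(geohash: String, featureId: String, numShards: Int): String = {
      val shard = math.abs(featureId.hashCode % numShards)   // e.g. 0, 1, 2 for three tablet servers
      f"$shard%02d~$geohash~$featureId"                      // e.g. "01~9q8yy~tweet-1234"
    }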
17:40
So how do we do query planning in this context? What I'm showing you here is an OGC CQL query with three predicates: a spatial predicate, the BBOX query at the top, a temporal during predicate, and an attribute predicate.
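The query being described might look something like the following ECQL, parsed here with GeoTools; the attribute names (geom, dtg, tweet_text) and the literal values are assumptions:

    import org.geotools.filter.text.ecql.ECQL

    val filter = ECQL.toFilter(
      "BBOX(geom, -122.45, 37.75, -122.40, 37.80) AND " +             // spatial predicate
      "dtg DURING 2014-09-01T00:00:00Z/2014-09-30T23:59:59Z AND " +   // temporal predicate
      "tweet_text LIKE '%FOSS4G%'")                                   // attribute predicate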
18:00
The idea of query planning in general is to minimize false positive disk reads (we don't wanna traverse data that we don't have to) and to maximize true positive disk throughput. So we wanna spin those disks, or as many disks as we can,
18:22
to get the data off of disk and satisfy the predicate query. Since we have three attributes in our predicates, we've got space, time, and an attribute called tweet text. GeoMesa actually has secondary indexes
18:42
on any of the attributes that you have in your data. So we have to choose the primary index, the one that reduces the cardinality of the results the most. So if you're doing a Postgres explain on your query, you'll often see it chooses a particular index and then does a sequential scan
19:01
across the results of that index and applies the predicates that it didn't use as an index or it does two indexes and it does a bitmap intersection of the results of those sets. In our case, we're going to talk about the spatio-temporal aspect. So assume for now that in this query,
19:22
we decide that the spatio-temporal predicates combined reduce the result set the most. So we're gonna ignore the attribute query, we're gonna actually apply that in parallel across our data. So the first thing that we have to do is we have to take our polygon and decompose it
19:41
into the set of covering geohashes that correspond to the ranges that we have to scan in our Accumulo database. We recursively iterate over the polygon using a priority queue, where the priority is based on the distance from the center of the geohash that we're looking at to the center
20:01
of our predicate polygon. And in this manner, we can optimally discover the covering geohashes and ignore any of the other geohashes that we don't have to traverse down into. So we get a set of geohashes at different resolutions.
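A simplified sketch of that decomposition: expand cells that intersect the query geometry's envelope, preferring those closest to its centre, and keep a cell as a scan range once it is fully contained or at the target resolution. GeoMesa's actual query planner is considerably more sophisticated:

    import org.locationtech.jts.geom.Envelope
    import scala.collection.mutable

    case class Cell(bits: String, env: Envelope) {
      def children: Seq[Cell] = {
        val midX = (env.getMinX + env.getMaxX) / 2
        val midY = (env.getMinY + env.getMaxY) / 2
        if (bits.length % 2 == 0) Seq(                 // even depth: split longitude
          Cell(bits + "0", new Envelope(env.getMinX, midX, env.getMinY, env.getMaxY)),
          Cell(bits + "1", new Envelope(midX, env.getMaxX, env.getMinY, env.getMaxY)))
        else Seq(                                      // odd depth: split latitude
          Cell(bits + "0", new Envelope(env.getMinX, env.getMaxX, env.getMinY, midY)),
          Cell(bits + "1", new Envelope(env.getMinX, env.getMaxX, midY, env.getMaxY)))
      }
      def dist(q: Envelope): Double =
        math.hypot(env.centre.x - q.centre.x, env.centre.y - q.centre.y)
    }

    def cover(query: Envelope, maxBits: Int): Seq[String] = {
      val start = Cell("", new Envelope(-180.0, 180.0, -90.0, 90.0))
      val queue = mutable.PriorityQueue(start)(Ordering.by((c: Cell) => -c.dist(query))) // closest first
      val ranges = mutable.Buffer.empty[String]
      while (queue.nonEmpty) {
        val c = queue.dequeue()
        if (c.env.intersects(query)) {
          if (c.bits.length >= maxBits || query.contains(c.env)) ranges += c.bits // keep as a scan range
          else queue ++= c.children                                               // refine further
        }
      }
      ranges.toSeq
    }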
20:20
So you can see kind of in the center there that there's a fairly large geohash. That's at a lower resolution than some of the ones around the edges that have to cover the complexity of the border of the polygon. Each of those geohashes corresponds to a scan range. So then we send the scan out to all of the different tablet servers, and we send with the scanner the attribute filter.
20:43
So we say, we know now that you only need to scan these small ranges of data, and you have to apply this attribute filter, which is tweet text LIKE 'FOSS4G', in parallel on the server side. So that's the sequential scan part, but it's against such a reduced subset
21:02
of the data that it's much faster. And that's the general idea behind GeoMesa's spatio-temporal querying. So that's the background material. Now we're gonna decompose and dissect three analytics
21:22
and how they execute across all of those components of the Hadoop stack that I put up as an image before. The three analytics are density computations, streaming analytics for things like anomaly detection or tracking and the final one is spatio-temporal event predictions.
21:42
So starting with density computations, this is a minimal stack, a minimal use of components within the stack. We wanna take dots on a map that have some information but not that much information and turn it into a heat map that has much more information. We already talked about how you might do that
22:01
in a MapReduce fashion, but what we can do is do that all within Accumulo. So I said that we have uniformly spread the data across all of our tablets, but the data represents that single cell in all of the tablets. So what we do is we send out an iterator
22:21
which we've stacked on top of this stack of iterators, the extension point of Accumulo, that traverses our data and initializes a sparse matrix. The sparse matrix for each tablet server covers the whole cell, but it doesn't cover all the data in the cell. So the map task is initializing the sparse matrix.
22:43
It's sent back as a sparse matrix, which is a compressed representation of all this data, and then on the client side, all of those matrices are summed together. That's our associative operation, and the result is a heat map.
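The client-side reduction is just an associative sum of those per-tablet-server matrices; a minimal sketch, representing a sparse matrix as a map keyed by (row, column), which is an assumed representation rather than GeoMesa's own:

    object DensityMerge {
      type SparseMatrix = Map[(Int, Int), Long]

      // Sum per-tablet-server sparse matrices cell by cell; summation is associative,
      // so the matrices can arrive and be folded in any order.
      def merge(matrices: Seq[SparseMatrix]): SparseMatrix =
        matrices.foldLeft(Map.empty[(Int, Int), Long]) { (acc, m) =>
          m.foldLeft(acc) { case (a, (cell, count)) =>
            a.updated(cell, a.getOrElse(cell, 0L) + count)
          }
        }
    }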
23:00
So a request comes in. In this case, it's via WMS, and it requests a heat map via a styling parameter in the SLD portion of the request. GeoServer is acting as the client of Accumulo in this case. It sends out requests to Accumulo, to each tablet server that has a slice of the data that we care about. Each tablet server executes and computes a sparse matrix
23:25
of the data that it knows about for that cell, sends it back to the client which aggregates it into a single representation and sends it out over the OGC request. So that's pretty simple. The second analytic that I wanted to talk about
23:42
utilizes Storm for streaming analytics. Some of the use cases are epidemiology, how diseases propagate around the world which is particularly apropos with the Ebola stuff that's happening now. Geofencing, you might wanna put a virtual polygon around an area and see when things enter
24:01
or leave the area of interest. Tracking problems, I already mentioned. One of the interesting things recently has been event detection in streams of data. There's a company called Jawbone that makes a Fitbit-like health monitor, and they had a really interesting
24:22
analysis of the sleep patterns of their users after the Napa earthquake. They could see how the sleep pattern was disrupted as you went out from the epicenter. And that's what's showing up on the top right there in that line chart.
24:40
And the URL is listed there as well. It's pretty interesting to go to. So you could take in a Twitter stream, you can monitor it, you can infer the sleep patterns or the disruptions of sleep patterns. You can cluster mentions for impact analysis and for potential rapid epicenter analysis for emergency resource allocation,
25:01
those types of applications. The architecture stack looks like this. You have GeoServer sitting in front of a Kafka queue. You have a fire hose of data, Twitter's fire hose of data as an external source being published to Kafka topics. The storm topology that's running
25:21
and represented by those bolts in between Accumulo and Kafka has a spout that's listening to the particular topics. A spout is Storm's vocabulary for something that pulls data into the computational topology. So the spout reads messages off the topic and then sends them to this computation
25:41
which might do filtering for messages about earthquakes and then do some sort of clustering, like DBSCAN, in one of the bolts on the far right. It writes to Accumulo. It also potentially reads from Accumulo to get static contextual information to improve the analytic, and the result is written both from Accumulo and from Storm out through a topic that GeoServer's listening to.
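A rough sketch of wiring up such a topology with Storm's TopologyBuilder (package names from recent Storm releases); the pass-through bolt below is a stand-in where the real filtering, DBSCAN clustering and Accumulo writes would go, and the broker, topic and topology names are assumptions:

    import org.apache.storm.{Config, StormSubmitter}
    import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
    import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
    import org.apache.storm.topology.base.BaseBasicBolt
    import org.apache.storm.tuple.{Fields, Tuple, Values}

    // Placeholder bolt: just forwards each tuple; real bolts would filter, cluster, or write to Accumulo.
    class PassThroughBolt extends BaseBasicBolt {
      override def execute(input: Tuple, collector: BasicOutputCollector): Unit =
        collector.emit(new Values(input.getValue(0)))
      override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
        declarer.declare(new Fields("message"))
    }

    object StreamingTopologySketch {
      def main(args: Array[String]): Unit = {
        val builder = new TopologyBuilder
        builder.setSpout("tweets",
          new KafkaSpout(KafkaSpoutConfig.builder("broker:9092", "tweets").build()))  // reads the Kafka topic
        builder.setBolt("filter", new PassThroughBolt).shuffleGrouping("tweets")
        builder.setBolt("cluster", new PassThroughBolt).shuffleGrouping("filter")
        builder.setBolt("store", new PassThroughBolt).shuffleGrouping("cluster")
        StormSubmitter.submitTopology("geo-stream", new Config, builder.createTopology())
      }
    }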
26:00
So what we've implemented within GeoServer is a data store that retrieves its data from Kafka, and it only retrieves the last 30 minutes of data. So every time you hit a WMS or a WFS against that,
26:21
you're gonna get that 30 minute cache. So it's a real time data source. So there's lots of applications of streaming analytics in this context and we use this sort of infrastructure and this set of components to do that. The last analytic that I wanna talk about
26:40
is spatiotemporal event prediction. The applications are real estate buying and selling patterns, and again epidemiology, but we're gonna work through a criminal incident prediction example, and this one's going to use traditional MapReduce as its computational backbone. So the idea is that we're going to model
27:01
a criminal's preferences for where they intend to commit a crime and we're gonna use spatial features as a proxy for the choice factors that they use when making these decisions. The underlying principle is that we're looking at crimes, economic crimes, not necessarily crimes of passion.
27:21
Economic crimes have a rational foundation to them, so we can model them and we can predict them. The example that I have up here explicitly is breaking and entering. So if you think about breaking and entering, there are some choices that you're going to make about where you might commit a breaking and entering crime.
27:42
You're gonna use factors like the nearest police station, the neighborhood demographics, the distance to the nearest highway on-ramp, lighting in the neighborhood of interest and a whole host of other factors, all of which are represented as vector features.
28:01
So in order to do this analysis, we need to take those vector features and the locations of historical events, historical breaking and entering events, and determine which of those vector features have a predictive impact on the activity.
28:21
So the first thing that happens is this comes in as a WPS request. The inputs to the model are a list of features to consider and the historical events. This breaks down into a two-stage MapReduce job. The first stage is a vector-to-raster transformation of those features.
28:40
We have to say, given any site on my map, what is its relationship to a police station? And we compute a distance to that police station. So we parallelize over the different features, and we have 50, 60, 100 different features. We send those out to map tasks within Hadoop.
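Each of those map tasks is essentially doing something like this simplified sketch of the vector-to-raster step, which computes, for every grid cell, the distance to the nearest feature (here, police stations as plain coordinate pairs; the grid parameters are assumptions):

    // For each cell in a width x height grid, compute the distance to the nearest station.
    def distanceRaster(stations: Seq[(Double, Double)], width: Int, height: Int,
                       minX: Double, minY: Double, cellSize: Double): Array[Array[Double]] =
      Array.tabulate(height, width) { (row, col) =>
        val cx = minX + (col + 0.5) * cellSize                  // cell centre x
        val cy = minY + (row + 0.5) * cellSize                  // cell centre y
        stations.map { case (sx, sy) => math.hypot(cx - sx, cy - sy) }.min
      }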
29:00
So the top task tracker might be working on the police stations problem, the middle one is working on the demographics problem, and so forth. Each one of those requests the data out of GeoMesa and brings it back and computes a raster representation of that data. Those raster fields are then sent back to GeoServer,
29:22
which is acting as the client again. GeoServer takes that and fuses it with the locations of historical events and it does that using a statistical model. So now it's estimated a model. It knows the weights of different factors and it needs to predict across the entire geospatial context
29:43
where the most likely places are that a criminal act is going to occur. So in order to do that, we have to apply this model, which may have an 80-dimensional matrix representation, to every discretized cell in our map
30:02
and that's an expensive operation. So we can parallelize that again by blocking out different portions of the map and sending each block to a different map task for execution, which is shown here. Each one is applying the model to a different region of space and the aggregation, the reduction is done in GeoServer
30:24
and the result is a threat surface, a breaking and entering threat surface in downtown San Francisco. So that concludes the talk that I was giving today. I've listed a bunch of references here.
30:41
Hopefully this is available online so if anybody's interested, you can go to any of these websites and I'm happy to take any questions from anybody.