Large-scale data extraction, structuring and matching using Python and Spark


Formal Metadata

Title
Large-scale data extraction, structuring and matching using Python and Spark
Title of Series
EuroPython 2017
Number of Parts
160
Author
Deep Kayal
License
CC Attribution - NonCommercial - ShareAlike 3.0 Unported:
You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.

Content Metadata

Abstract
Large-scale data extraction, structuring and matching using Python and Spark [EuroPython 2017 - Talk - 2017-07-14 - Anfiteatro 1] [Rimini, Italy]

Motivation - Matching data collections, with the aim to augment and integrate the information for any available data point that lies in two or more of these collections, is a problem that nowadays arises often. Notable examples of such data points are scientific publications, for which metadata and data are kept in various repositories, and users' profiles, whose metadata and data exist in several social networks or platforms. In our case, the collections were as follows: (1) a large dump of compressed data files on S3 containing archives in the form of zips, tars, bzips and gzips, which were expected to contain published papers in the form of XMLs and PDFs, amongst other files, and (2) a large store of XMLs, some of which are to be matched to Collection 1.

Problem Statement - The problems, then, are: (1) How to best unzip the compressed archives and extract the relevant files? (2) How to extract meta-information from the XML or PDF files? (3) How to match the meta-information from the two different collections? And all of this must be done in a big-data environment.

Presentation - https://drive.google.com/open?id=1hA9J80446Qh7nd8PMYZibtIR1WjMkdLXfDgwUlts7JM

The presentation will describe the solution process and the use of Python and Spark in the large-scale unzipping and extraction of files from archives, and how metadata was then extracted from the files to perform the matches on.
Transcript: English (auto-generated)
Well, I hope I'm audible enough and understandable, because people didn't seem to get the question I asked, so I hope I have better luck with this. So yeah, I would like to start with a vote of thanks to the organizers, actually, because
this has been a very enriching experience. The conference has been great, the talks were great, and the energy is so invigorating. And also to the Python community, I think it's great to have people developing an already wholesome language, so yeah, thanks a lot, everyone. My talk today is about large-scale extraction, structuring, and matching of data.
Even I don't remember that without looking at the heading, to be honest. It's a mouthful. But what I'm really trying to say is this, like, how did we manage to make sense of more than 100 million things at the workplace that I work in? My name is Deep Kayal, and let's do a quick introduction of who I am.
I work as a machine learning engineer in a company called Elsevier in Amsterdam. For all those of you who do not know Elsevier, it began more than 100 years ago as a publishing company, but right now it's foraying also into information analytics to derive useful
insights from scientific articles to help people in, for example, healthcare and education by, I don't know, giving them pairs of diseases and drugs which have occurred together in scientific literature, so that doctors know, if they're treating you
for an illness, what are the possible drugs that they can give you. For that, we need a huge pile of data to derive those insights out of. And as any information analytics company would have it, everything that we do relies on data.
So sometimes when you work with data, you have problems where you have a ton of unpreprocessed, unstructured data, and you're trying to essentially make sense of it. Sometimes you have this problem which is commonly known in literature as record linkage, where you have one sort of production quality database, your good data, and one data dump,
which is like a pile of data someone hand-delivered to you, which you had no control over, and which you kind of want to make sense of. We had the same problem, and we keep having the same problem sort of repeatedly,
because we want to enrich our production data as much as possible, and as data quality improves over time, such enrichments will continue to happen. And I'm sure for all those out here who work in the data industry, it's the same for you. So the problem was this. We specifically had many Hive tables where we had extracted relevant stuff
from scientific papers, mostly bibliographic information like a title or an abstract, a publication year, a DOI. A DOI, if you don't know, is a digital object identifier, which is like a primary key for scientific publications. It's mostly one-to-one, not always, but mostly.
So this is our sort of good data. We know exactly what it looks like, but the quality, of course, is still improving over time. And then we also had a data dump, so to say, which was all over the place, to be honest, because we had no control over it. It was delivered to us and is still being delivered to us in pieces by third-party vendors,
and they do their own magic to it. And they do all sorts of stuff like recursive zipped archives, PDFs instead of more structured content, and so on. So we didn't even know what we were looking at.
And it was a bigger problem than this because we had over 100 million files to deal with, so it was really hard to sort of have a summarizing inference from just taking a look at it manually. So when we got this problem to be solved, it was pretty daunting to start with,
to be honest, because there were too many things going on at the same time, and we didn't really know where to begin. But like all problems, you mustn't really hack through it, although hacker is a fancy term these days, but you should deal with stuff more scientifically. You try to break down the problem into atomic tasks, and you solve them as best as you can for each task,
and then you combine them at the end, much like you would do in divide-and-conquer algorithm. In our case, the relevant questions were, how do we untangle this mess? How do we realize what's in those archives? Once we do realize what is in the archives, how do we make sense of it and extract any useful information out of it?
Like if we know that the archive has a docx word file, then what can we do about extracting a title from the docx file to have matchable and meaningful information? Then using that information that we just extracted, how can we best match to our production database? And finally, which is a recurring question through all of these atomic subtasks,
is how do we do it at scale? Because we are talking about tens of millions to hundreds of millions of files. The tech stack, of course, was Python, which was the main programming language, and Spark for processing for the win. Python integrates surprisingly well with Spark,
so if you haven't tried it out already for your own problems, please do it. It's really great. To answer the first question of how to make sense of what is in the archives, you again have to be scientific. Of course, there's no way to generalizably automate such a process. Every compressed archive is inherently different.
You can do your best to pass on the workflow to other people, but you can't really pass on the code, per se, because it will not work for everything. But by looking at the archives, taking slices of them, looking at them manually, or also talking to the people who made the archive, if you have access to them, you can make some sort of a well-formulated assumption about the archive,
and then you can code it up and then generalize it. That's how you probably code anyway. So our data, when we took a look at it, seemed to be sort of nested, or recursed, compressed archives. It was zips of zips, or zips of gzips, and tars, and so on.
But what was in those zips was mostly PDFs, or XMLs, as you would expect most scientific papers to be. So let's start with a very small example of what I'm really saying. First, we see how we distribute the data on Spark.
If I don't go into the optimization details right now, it's because it's only 30 minutes, but I'd be happy to talk about it outside this. So let's just assume this command: you call binaryFiles on a Spark context. A Spark context, by the way, is how you talk to a Spark cluster.
When you initialize a Spark context, it means that you have told the resource manager that it should await commands. So you pass your sort of magnificent 1,000 tar.gz files to the Spark cluster via this binaryFiles command. Now Spark is sort of ready for processing these files.
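A minimal sketch of this loading step in PySpark; the S3 path and app name here are illustrative, not the ones from the talk:

from pyspark import SparkContext

sc = SparkContext(appName="archive-extraction")

# binaryFiles yields an RDD of (file_path, binary_content) pairs, one per archive.
archives = sc.binaryFiles("s3://my-bucket/dump/*.tar.gz")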
The next thing that you need to do is tell Spark what to do. For that, you need Python. So you write these two helper functions. The first function sort of extracts information from it. You see that it takes a variable x. Now, when you do the previous thing, Spark creates what we know as RDDs,
which is essentially a key-value pair. So the key is the name of the zip, for example, 1.zip. And the value is the whole binary content of the file. So here, when you see x is being split into x[0] and x[1], where x[0] is the name of the file, and x[1] is actually the whole binary, the byte content of the file.
Then you put it through your zip and tar and gzip libraries to actually get the contents of the file. You read in everything that is XML, in this specific case, but you can choose to do so with other types of content. And then you make a dictionary out of it, which is essentially, again, a key and a value.
The key is the path to that particular file within any zip file anywhere. And the value is the whole content of the file dumped in the dictionary. So now, if you run only the extract on Spark, you would have a list of such dictionaries, but you don't want that because that's not very parallelizable. What you want is actually a list of tuples,
like a list of key-value pairs. So you have to flatten the dictionary, and that's what the flattener function does. Once you have these functions and you've tested them, you sort of push them through the map function of Spark, which efficiently distributes your work. If you use a map function, it will distribute the work per data point,
but there are other functions which distribute more efficiently, but let's not go into that right now. You call a flat map then to flatten the contents, and then finally, you save everything that you did into what is known as a sequence file. Now, a sequence file, if you do not know what it is, is Hadoop's way of efficiently storing data.
Instead of storing one unit of information per file, what it does is it stores a massive file with, say, 1,000 files, and another massive file with 1,000 files, and this way it can effectively distribute all your files into the different nodes and executors that it has.
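A rough sketch of the two helpers and the pipeline described here. It only handles gzipped tars of XMLs, while the real archives were more varied; the paths and names are illustrative:

import io
import tarfile

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

def extract(x):
    # x is an (archive_path, binary_content) pair produced by binaryFiles.
    archive_name, payload = x
    contents = {}
    with tarfile.open(fileobj=io.BytesIO(payload), mode="r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile() and member.name.endswith(".xml"):
                data = tar.extractfile(member).read()
                # Key: path of the file inside the archive; value: its content.
                contents[archive_name + "/" + member.name] = data.decode("utf-8", "ignore")
    return contents

def flattener(d):
    # Turn {path: content, ...} into a list of (path, content) tuples.
    return list(d.items())

archives = sc.binaryFiles("s3://my-bucket/dump/*.tar.gz")
archives.map(extract).flatMap(flattener).saveAsSequenceFile("s3://my-bucket/xml-sequence")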
What you essentially did at the end is you produced this sequence file, which has a key, which is the name of the file, and a value, which is actually the whole content. I hope you can see it. So you achieved your first task of making sense of it all.
From a mangled mess of data, you managed to iron out the contents of the file and actually track it back to its source. Then the next problem is how to extract meaningful information out of it all. Here as well, the approach is the same. You have to sort of dig into the data
and do some sampling and make some assumptions, which are general enough for your use case. For our use case, we were trying to understand what kind of bibliographic information is in there, meaning titles and abstracts and DOIs and so on. And we were mostly looking at things which represented scientific articles,
so XML files and PDFs. XMLs are relatively simple because they're structured content anyway. You can just use Python's XML library in your own function. But with PDFs, we had a harder time because PDFs aren't really structured. It's an ongoing area of research, actually.
It hasn't been solved yet. There are many such tools which claim to structure PDFs, but none of them work with gold-set accuracy. What we used was this tool called CERMINE, which is, I think, by a university in Poland, which uses machine learning to effectively check
where you have things in a PDF to make sense of it by comparing fonts. So a header is a bigger font and a section is a smaller font and so on. And with both textual information and these lexical information, it kind of makes sense of a PDF and splits it into a more structured format. So let's not go into PDFs for now
because it's a bit more complicated. Let's see what we can do with an XML. Here you have an example XML. What you need to consider is that you perhaps want to extract the title from it. You see that it has the title within a tag which conveniently says title.
But that's mostly the case with these well-formed tags because a title of a scientific article can only be called so few things, or an abstract of a scientific article can only be called so few things. You can quite easily have a few rules which help you extract very general things
like a title, an abstract, or a DOI, or a year. It's, of course, much harder if you want to extract something like a section or figures and so on, but we don't want to do that. What we wanted to extract were more overall metadata information and that was pretty simple to do. So then you write your own small parser for the title. You use the XML library.
You push your XML through that as a string and you find everything that is called a title or a citation title. That is what we sort of learned when we looked at the data and most of our cases were covered. With that you just return it to another function later on.
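A small illustrative title parser along these lines, using Python's standard XML library; the exact tag names the talk relied on may have differed:

import xml.etree.ElementTree as ET

def parse_title(xml_string):
    try:
        root = ET.fromstring(xml_string)
    except ET.ParseError:
        return None
    # Return the text of the first element whose tag ends in 'title'
    # (this also covers tags like 'citation-title').
    for elem in root.iter():
        if isinstance(elem.tag, str) and elem.tag.lower().endswith("title"):
            if elem.text and elem.text.strip():
                return elem.text.strip()
    return None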
So with this XML, when you pass it through a general parser which is very easy to write, you actually get the title correctly extracted. You write such other parsers for, for example, abstract, or a DOI, or the journal volume, the issue, the page numbers, and so on,
and you put it together in a meta function which takes in your sequence file. So if you remember, the sequence file was a key and a value. The key was the name, the value was the content, and that is exactly what we're doing. The fname is the file name, which is the key, x[0], and the content, which is actually the whole file itself, is x[1].
We want to process the content and that's what we do in all these parsers. Once we have all the processed content back to us, we have to make Spark understand what it's getting back. For that, Spark understands what a row is, a row in a data frame, a table essentially. A row, for us, might be a collection of all these values that you see,
file name, DOI, volume, and so on. So you have to tell Spark that, hey, I am expecting a file name at the first value of the row, and a DOI at the second value, volume at the third, and so on. So you have to sort of create a struct. In our case, it's called OneRow.
So when you get all your parsed information back, you push it through OneRow to make a row that Spark understands, with all your information. Finally, you want to do this not just for one file, but for your entire repository of everything, represented by the sequence file. So you call the sequenceFile command, which makes an RDD,
and then you push your parser function through it, which gives you back rows. So once you map, meaning you push your function through the mapper and get the result back as a data frame, you have something like that, which is essentially a table of what you wanted.
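A condensed sketch of that step, assuming parsers such as parse_title exist as above; the extra parsers, field names and paths here are hypothetical:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# The struct that tells Spark which value sits at which position in the row.
OneRow = Row("file_name", "title", "doi", "volume")

def parse_record(x):
    fname, content = x
    return OneRow(fname,
                  parse_title(content),
                  parse_doi(content),      # hypothetical helper, analogous to parse_title
                  parse_volume(content))   # hypothetical helper

records = sc.sequenceFile("s3://my-bucket/xml-sequence")
df = records.map(parse_record).toDF()
df.show(5)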
So from very unstructured data, you have managed to make something realizable. And that's the quick recap here. We started off with a data dump of everything all over the place, and now we kind of know what we are dealing with. We extracted some meaningful information out of it. The final task that we had to do, and the most important, I guess,
without which the whole exercise falls apart, is the matching. So how do you match two things? There are many ways to match: approximate matches and exact matches. Joins are a good way to match; it's a very standard thing to do in SQL, if you have keys which you can trust, of course. And approximate matching is something which people do more and more.
There are techniques called like locality-sensitive hashing, or LSH, which is a very popular technique to do approximate matching. But again, let's not go into that, because I'm just trying to give you a hint of what you can do with Python and Spark. So let's see what we can do with only exact matches. Exact matches, as I said, rely a lot on how you pre-process data.
So if you have a title which is missing a dot at the end, versus which has a dot, if you do a join, you won't see it. So what we have to do is pre-process it a little efficiently to avoid such false negatives.
So the first step for that is to normalize the content. So here we have a quick normalizer, which essentially zips through the title, checks if there are any stop words.
Now, in natural language processing, there's this notion of stop words, which are frequently occurring words like if, or the, or when. So we first get rid of those, and then we get rid of all non-alphanumeric characters, like dashes, or percentages, or question marks. And then we lowercase everything.
So that becomes our match key. So with that function that we've defined, we load up the table. We push it through Spark again, telling Spark that, please take my title and apply the normalize function on it. This is very similar to pandas. I think if you've used pandas, you know what this does.
So it will take the title and make another column called norm_title, which normalizes your title in the way that you've described. So if you see here, management of acute kidney problems, if you quickly browse the norm_title, you see that the 'of' has disappeared because we removed a stop word.
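A sketch of that normalization step; the stop-word list, table name and column names are illustrative, not the real ones:

import re

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

STOP_WORDS = {"a", "an", "the", "of", "if", "or", "when", "and"}

def normalize(title):
    if title is None:
        return None
    # Drop stop words, strip non-alphanumeric characters, lowercase everything.
    words = [w for w in title.lower().split() if w not in STOP_WORDS]
    return re.sub(r"[^a-z0-9 ]", "", " ".join(words)).strip()

normalize_udf = F.udf(normalize, StringType())

titles = spark.table("production_titles")   # hypothetical Hive table
titles = titles.withColumn("norm_title", normalize_udf(F.col("title")))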
Now this is kind of a better matching key than your raw text. The next thing that you do to complete your match is join on different things, followed by unioning everything so that you have one match file. So you've joined by things like DOI. Of course, you have to check before that that the DOI
or any such key on which you're joining is non-empty, because if you join two things which are empty, you'll get tons of false matches. So you join things on, for example, DOI. You join the thing on the normalization of the title that you did. You can similarly normalize the abstract and join on that. All these joins, of course, are inner because you want a one,
hopefully one-to-one match. You finally do a big union, and then you take a distinct because you don't want duplicate rows. So finally what you have is this nifty little file where you have, on one side, a PUI.
It's just a primary key that we use. So the primary key of one of your databases, and the other side you have, for example, the file name to where this huge mess of things is located. So the primary key, again, of your messy data dump. Essentially what you have just managed to do is match everything against everything else.
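A sketch of those joins, assuming two data frames, prod_df and dump_df (hypothetical names), that both carry comparable doi and norm_title columns, with pui only on the production side and file_name only on the dump side:

from pyspark.sql import functions as F

# Join on DOI, but only where the DOI is non-empty on both sides.
prod_with_doi = prod_df.filter(F.col("doi") != "")
dump_with_doi = dump_df.filter(F.col("doi") != "")
by_doi = (prod_with_doi.join(dump_with_doi, on="doi", how="inner")
                       .select("pui", "file_name"))

# Join on the normalized title as well.
by_title = (prod_df.join(dump_df, on="norm_title", how="inner")
                   .select("pui", "file_name"))

# Union the match sets and drop duplicate pairs.
matches = by_doi.union(by_title).distinct()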
Now it's ready for enriching. In summary, what we managed to do here was we had some production quality good data. We had a pile of data lying around waiting to be used in enrichment, which we had very little clue about. We managed, in the end, to match everything together,
what was possible to be matched, and make the matched pairs ready for further products to make use of. We did that by effectively breaking down a huge problem into smaller, more tractable subproblems, and solving each of the problems by itself to the best of our abilities and approaches.
And we did everything using Python and Spark. So that concludes the whole talk about what you can do with Python and Spark in a relatively short fraction of time, just to give you a hint of everything. And feel free to reach out to me now, or outside, or over email
on the aforementioned address. And we're always looking for people to work for us on problems like these and more machine learning oriented and NLP oriented problems at Elsevier. So yeah, thanks a lot.
Thank you very much. Thank you. Do we have any questions? Hi. Great talk. Am I audible, first of all?
Yeah, you are. OK. Which version of Spark were you using for this? It was 2.1. So 2.1 even has LSH that I mentioned, for example. So it allows you to do even approximate matching flexibly. So yeah, 2.1. If I understood your presentation correctly, you have chosen RDDs over data frames?
No. You have to choose RDDs at first, because data frames assume structured data, right? So we didn't really have CSVs or things like that. We had to start off with a raw file, which we had no clue about when we started off. So it was embedded within an archive repository. So it was like a file within a zip within a zip.
So were you able to use the Catalyst optimizer? Sorry? Were you able to use the Catalyst optimizer, which directly works on the data frames? No, no, no. We had to do our own, basically. We had to partition it, yeah. So that's what I avoided here. We had to chunk those files by ourselves and do the memory management ourselves.
Yeah, good question, actually. Another question? Which library did you use to use Spark with Python?
Just PySpark. It's PySpark. I don't remember the version, maybe. Yeah, I don't remember the version, but PySpark. I think it's pretty standard. Yeah, thank you. How many nodes or slaves do you use in your architecture? How many nodes? I think it was our xlarge Amazon cluster with eight, I think.
OK, and for beginners, how hard is it to set it up, for example? It's maybe a day's worth of work. Not very hard, to be honest. To optimize it is a bit harder. If you want something which you can optimize, like the memory management which you can optimize very quickly,
I would suggest starting to read about Flink. Have you heard of Apache Flink? That's another side project which has a better memory optimizer. Just a minor comment. When you generated the product UI, if you could go back to that page,
you remove the stop words and you blast the white space and you end up with a very long run-on string, like management acute kidney syndrome. Yeah, why would you choose not to turn the white space into an underscore, so that you can recover it? Yeah, that's a good point.
I mean, we actually did much more than this, so this was just a quick thing to show you guys. We actually ended up using NLTK to remove even non-noun phrases, because, you know, if you have a title, the assumption is that someone is introducing some novel concept in a title, which is probably not a verb or an adjective.
It's more probably like a noun phrase. So you might have a title like cure for, I don't know, malaria or something. And in those cases, it's probably useful to remove everything but a noun phrase. Well, that was a bad example, actually. In longer titles, it's useful to...
What we found was that it was useful to not just remove stop words but also remove everything which wasn't noun phrases. So just keep the nouns. So there were many pre-processing things that we did and we sort of ended up with the best one that worked best for us. So this was just an example, but you're completely right.
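A rough sketch of that noun-only filtering, using NLTK part-of-speech tags as a stand-in for full noun-phrase chunking; the tag set and tokenizer choices here are just one possibility:

import nltk

# Requires the 'punkt' and 'averaged_perceptron_tagger' NLTK data packages.
def keep_nouns(title):
    tokens = nltk.word_tokenize(title.lower())
    tagged = nltk.pos_tag(tokens)
    # Keep only tokens tagged as nouns (NN, NNS, NNP, NNPS).
    return " ".join(word for word, tag in tagged if tag.startswith("NN"))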
That would probably have worked as well. Yeah, I mean, it's also possible to use word vectors or something like that. Yeah. Yeah, another good question. Thanks. Are there any other questions? I just wanted to mention to the audience, if they're interested in this,
that PySpark got released on Python's package index a few days or so ago. So you can now pip install it. And it's pretty recent news, so... Okay, thanks.
Thanks. Also to that, could you tell me what kind of reliability you can reach with this type of matching, or just in your data in general? Another good question. Actually, I avoided numbers here in this presentation because when you give people an accuracy, a lot of that depends on the configuration, right? How you did stuff and what kind of things you did to validate your results.
So I can mention it here. The throughput that we got was, I think we were crunching around 70 million files in five hours on a relatively small server with nothing fancy. And we got around 50% match rate with just the digital object identifier,
which I mentioned was like a primary key for files. We didn't get more than that because a lot of DOIs were missing in the data dump that we got. When we added titles and abstracts, it was around 76 or 80% towards the end. So we could match around 80% of the dump that we got to our own data to enrich it.
So yeah, is that kind of what you're looking for? Anybody else?
How do you handle multiple languages? I mean, this presentation is with English language, but I assume you have a lot of languages. So the short answer is that in XMLs, since it's tag specific, there's an attribute of a tag, which is a language, for example,
in most scientific content that comes to us. So you can actually filter a tag out and also see what language it is. We didn't just extract English. We extracted all the possible languages of things that the XML had, basically, before we matched it. So it was not just English. It was also... And it's not that complicated, actually.
It was relatively simple to infer, given that the data already has the attribute in the tag that lang equals ES or EN or something like that. We have time for two more questions. Give me a break, guys.
So any advantage of using PySpark doing the joins here compared to using a relational database? So we didn't really use a relational database because we did all of the dumping natively in Spark anyway. But we did compare what it would take
for a multi-threaded Java program to do this. And the throughput was something like 20,000 files an hour. It was significantly less. So I mentioned like 70 million files in five hours as opposed to 20,000 files in one hour. But yeah, it's a good thing to try to benchmark it against a database.
But as I said, we had a pile of data and the first thing was to, in fact, extract information meaningfully out of it into a database and structure it. We didn't even have a database to start with. We had one database, but we didn't really have another one. Thank you. Last quick question. What is your recommendation for doing streaming?
Oh, Flink for sure.
I think I just mentioned this to someone else as well because Spark, you said streaming, right? Yeah, so Spark is, I guess you know it already, but it streams in mini batches. And the memory optimizer for Spark is good, but it's still, I think it's not the best. So Flink is an up-and-coming project of Apache as well,
which is actually the other way around when you compare it to Spark. Spark streams in small batches and Flink is actually a streaming pipeline which batches in small bits of the stream. And it has a better streaming process and it has better memory management. So if you want to begin a project the proper way, which is a streaming project,
try using Apache Flink as well. I mean, try reading about and benchmarking Apache Flink. So why not Storm? Sorry? Why not Storm? Why not use Storm? Storm, yeah, sure. I haven't used it myself, I'm sorry. So I don't really have any experience of Storm,
but I have heard good things about it. So I said Flink because Spark and Flink might be similar in a few regards. Like, I don't know, you can do machine learning on both and stuff like that. I haven't really used Storm, so I don't have a benchmark to compare against, sorry. Thank you.
That's all the time we have for questions. So thank you again. Thanks a lot.