Exploring Apache Iceberg: A Modern Data Lake Stack
Formal Metadata

Title: Exploring Apache Iceberg: A Modern Data Lake Stack
Number of Parts: 131
License: CC Attribution - NonCommercial - ShareAlike 3.0 Unported: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared also in adapted form only under the conditions of this license.
Identifiers: 10.5446/69440 (DOI)
Transcript: English (auto-generated)
00:04
Hello. Thank you. Thanks for having me. Dobrý den, everyone. That's the most Czech I could learn on this trip. And let's get started and explore Iceberg. A little about me: I'm Gautami, a software engineer
00:20
at Bloomberg on the Enterprise Data Lake Engineering team. And as part of the Data Lake team, we work on a data engineering stack involving Python, Apache Iceberg, Spark, Trino, Airflow, et cetera. And I have recently started contributing to PyIceberg, an open source project that is the Python implementation of Iceberg.
00:41
And I flew in from New York to speak here at EuroPython today. And it's my first time speaking here, so definitely excited about that. A bit of context for anyone unfamiliar with Bloomberg. Bloomberg is a leading provider of financial data. What you see here is called the terminal.
01:00
It has been our flagship product for more than four decades now. It is really a comprehensive piece of software that allows users to do in-depth analysis on financial securities, get news, analytics, et cetera. And the main users are finance professionals, basically anyone interested in global capital markets.
01:22
This talk is less about the terminal itself and more about the data that powers it. So another part of Bloomberg's business is acting as a data vendor to larger enterprises. These are companies with software departments and research groups who want to consume this data to test out
01:41
their investment decisions. Say they might want to build machine learning models or predictive models on top of this data. And our engineering group ingests all of this data into Iceberg tables in Parquet format. So this talk is going to be about the issues we had with this raw data and how we
02:01
managed to ingest a quadrillion data points into our data lake. If someone's counting, that's like 15 zeros, a million billions. I dream about having a tiny fraction of that fortune sometimes. So let's dive in and see how we built a historical data lake
02:20
and tackled this data volume at scale. I also hope to convey some practical knowledge with code snippets on different ways to interact with Iceberg using Python. So first, let's take a look at the data to see why there are some pain points here. What you see here looks similar to a CSV file.
02:41
It's actually a Bloomberg format, which basically shows there's a timestamp. There are a bunch of rows representing financial securities. You can see Apple and IBM stocks and their stock prices. You might be wondering why this is even relevant today. That's right. But these snapshots were taken a long time ago
03:01
when some of today's widely used formats were not available. So we have tens of millions of these text files. Good thing they're on an S3-compliant object store. But some of them are very large. I'm talking petabytes of data here. And the data type information is not stored within the file itself.
03:21
We have to contact a database maintained by Bloomberg to be able to get the strongly typed data out of this. So the bottom line is that there are a large number of these files. And we need a significant amount of pre-processing to be able to get some value out of this for users, say building a time series or cross-joining data sets.
03:45
So in summary, we have tens of millions of these files. These are text files, which means everything is a string, and the data volume is on the order of petabytes. So we wanted to build a system to tackle these issues. And when we started to build such a system,
04:01
we also wanted some other guarantees for our data lake. One thing is having a golden copy of financial data to act as a single source of truth. And we also wanted an audit trail to track the history of these changes. And since we have been collecting these snapshots for four decades now, the schema is bound to evolve.
04:22
And we need a system that could handle that. And when we wanted to apply changes to any of this data, we wanted to apply them transactionally, upholding ACID guarantees, because data consistency and accuracy are of the utmost importance to us. Finally, our QC and internal Bloomberg teams
04:42
need to be able to query this data for supporting clients. And Apache Iceberg has been an ideal fit for these requirements. And what you see here is a core piece of our architecture in production. The input files are on an S3-compliant object store.
05:01
We use Apache Spark as a compute engine. And basically, we use Spark to read the input files and insert them into Iceberg. And for the storage layer, the actual data files themselves reside in S3 in Parquet format. So what is Iceberg, and what isn't it?
05:20
So Iceberg is an open table format. I know it's an abstract concept. I'll try my best to explain it. It is basically a standard or a specification that tells you how to manage a large collection of files in a distributed file system. So imagine it takes a bunch of files in your data store,
05:40
say S3, and manages them as if they were a database table. When you think of a database table, its schema can change. You can add, update, or delete data from it. Iceberg tracks all these changes for you. And the best part is you'll be able to query this distributed data using SQL, just like a SQL table.
06:02
For an analogy, think of having a massive library that keeps growing with new books every day. And Iceberg is like the librarian who keeps updating the library catalog so you can find the books you need. Again, it's an open table format, not a storage layer,
06:21
not a compute engine. So let's talk about how Iceberg does this. What you see on the right here is Iceberg's specification. I directly sourced it from their website. And this is a great way to understand what's happening under the hood. At the top level, what you see is called a catalog.
06:41
And why do you need a catalog? So if Iceberg has a way to manage all your distributed data as a table, now there are a bunch of these tables. And Iceberg would need a way to track all these tables. And that's where the catalog comes into the picture. What we are using here is a JDBC catalog, which is shown as the relational database
07:01
on the left here. And the first three columns in this catalog represent what the table is, what namespace and catalog it belongs to. And the fourth column, the most important one, is the one used to track the metadata location of the table. This is basically the root of your metadata tree.
07:20
So every row in the catalog represents an Iceberg table. And the table state itself is maintained in the metadata. And the first layer of the metadata tree is a metadata.json file. And there are files called manifest files, which are Avro files. And the actual data files themselves are in Parquet.
07:44
So why Iceberg? Let me also talk about some key features that truly set Iceberg apart. The process of creating these data files, the metadata files, and updating the metadata location in the catalog all happen within a single transaction.
08:01
So everything is atomic. No partial changes are committed. And if there are concurrent readers and writers accessing the same Iceberg table, the readers will always only see the data from the most recently committed metadata. And they will not be affected by any intermediate data changes due to concurrent writes.
08:22
And Iceberg also does conflict resolution out of the box when there are concurrent writers. So writers optimistically create these metadata files, assuming the current Iceberg version of the table won't change before they finish the commit. But if that changes, writers
08:40
can automatically retry based on the latest Iceberg table version. Iceberg also supports in-place table evolution. Let's say you have an Iceberg table called EuroPython with these three columns, session ID, session name, and session type. And now we want to add a new column, session duration. That is possible.
09:01
And it's as simple as issuing an alter table SQL statement. And schema changes are metadata only, so no data files are rewritten. You can also change the partition layout when your data volume changes. Data partitioning in general is a way to split large data into smaller chunks.
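(As a rough illustration of both of these in-place changes, a sketch in Spark SQL through Iceberg's SQL extensions; the catalog, namespace, table, and column names reuse the speaker's EuroPython example, and spark is a session configured for Iceberg as shown later in the talk.)

    # Add a column: metadata only, no data files are rewritten.
    spark.sql("""
        ALTER TABLE catalog.test.europython
        ADD COLUMNS (session_duration_minutes int)
    """)

    # Evolve the partition layout: the old yearly spec and the new monthly
    # spec co-exist, and again no existing data files are rewritten.
    spark.sql("""
        ALTER TABLE catalog.test.europython
        ADD PARTITION FIELD months(session_date)
    """)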
09:21
And by specifying a partitioning strategy, what you're essentially doing is grouping similar rows together when writing. So let's say today you have an Iceberg table that you have partitioned by year. You can see the 2023 and 2022 partitions here. And let's say your data volume changes. Maybe your application is growing so fast
09:41
and you want to optimize for your read queries. And now you want to partition by month. You can do that. Both these partitioning specs can co-exist together in Iceberg. And again, this is also metadata only. And no data files will be rewritten. So Iceberg basically eliminates the need for an expensive migration if you
10:01
want to change the partitioning strategy on big tables. Query planning. This is interesting. So Iceberg makes it really easy to push down your query filters down the metadata tree and onto the data layer itself. How does it do it? Basically, every node in this metadata layer
10:22
maintains metadata on the nodes below it. So this is the manifest file. It keeps track of the data files. So it knows what is the record count in your data file. What are the null value counts for the columns? What are the lower and upper bounds of the columns? And the next layer is manifest list.
10:41
It tracks a list of manifest files. So it knows what is the manifest file location and what data partitions exist in each of these manifest files. So it makes it really easy for Iceberg to only open the relevant manifest files based on your query filter. And once it does that, based on the lower and upper bounds
11:01
of the columns in the data files, it can prune the data files. It can only read the relevant ones. And if the underlying format is Parquet, you can now access only the relevant columns from your Parquet. That means reading only the necessary row groups and pages from Parquet. All this means is minimizing the amount
11:21
of data being retrieved from S3, reducing your costs, and increasing performance. Iceberg also has the ability to time travel. So it keeps track of changes to your table. So let's say you have an Iceberg table and you inserted some data into it. It creates a snapshot. A snapshot basically represents a table state
11:42
at any point in time. And let's say after a while, you made some more data operations, like appends and deletes. It creates more snapshots. So you have the ability to time travel. So you can query your Iceberg table as of a previous snapshot ID or at any previous time.
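(A minimal sketch of those time-travel queries in Spark SQL, using the Spark 3.3+ syntax; the table name, snapshot ID, and timestamp are placeholders.)

    # Query the table as of a specific snapshot ID...
    spark.sql("""
        SELECT * FROM catalog.test.europython
        VERSION AS OF 8510261580729819952
    """)

    # ...or as of a previous point in time.
    spark.sql("""
        SELECT * FROM catalog.test.europython
        TIMESTAMP AS OF '2024-07-10 00:00:00'
    """)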
12:01
So essentially, what we ended up with is a system that looks like this. Iceberg sits on top of the storage layer. And we layer distributed query engines, like Spark and Trino, on top of Iceberg, and then layer whatever interfaces that people are already using to perform their analysis.
12:21
We heavily use Spark and Trino right now. And there are well-maintained connectors for Spark and Trino: Spark for ingesting into Iceberg, and Trino for our support use cases, where our QC and internal Bloomberg teams need to query this data. The key thing that I want to highlight here
12:42
is how future-proof this design is. Since Iceberg is an open spec, there is no concern about vendor lock-in. So we are not concerned that we won't be able to use whatever is trending in the future. Because if a new query engine comes along, we can maybe find a connector from the open source community,
13:02
or we can easily write something on top of the Iceberg specification. So all this means is this design is scalable and flexible. We have been using Iceberg in production for, I guess, three years now. And we are confident that this would work even if our data volume changes tomorrow.
13:21
We are in the petabyte scale of things right now, but even if that changes, we are hoping that this flexibility will work well for us. So now that we have looked at the architecture, let's take a look at some code. So there are two different ways in which you can interact with Iceberg using Python,
13:43
using PySpark, which is the Python API for Apache Spark. And Spark is a distributed computing framework. Basically, it has a cluster, a bunch of nodes, against which your queries will run. Obviously, this is suited for larger workloads, large data sets.
14:01
On the other hand, PyIceberg is the Python implementation of the Iceberg spec. So you can run your application in a standalone Python environment on a single machine, and that makes it easy to access Iceberg tables without a Spark cluster.
14:20
The first and foremost thing when you want to set up a Spark session is having the config that Iceberg needs. This is a sample Spark session, and I'll walk you through the code line by line to see how we need to add the Iceberg config to set up the Spark session.
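(The slide itself isn't reproduced in this transcript, so here is a minimal sketch of the kind of session setup being described. The package coordinates, catalog name, warehouse path, and JDBC URI are all placeholders; the talk adds the runtime jar to the Spark jars folder, whereas this sketch pulls it in via spark.jars.packages.)

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("iceberg-demo")
        # Iceberg's Spark runtime provides the Iceberg integration.
        .config("spark.jars.packages",
                "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")
        # Iceberg-specific SQL extensions (ADD PARTITION FIELD, CALL procedures, ...).
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # Register a catalog named "catalog", backed by a JDBC database.
        .config("spark.sql.catalog.catalog",
                "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.catalog.catalog-impl",
                "org.apache.iceberg.jdbc.JdbcCatalog")
        .config("spark.sql.catalog.catalog.uri",
                "jdbc:postgresql://db-host:5432/iceberg_catalog")
        .config("spark.sql.catalog.catalog.warehouse",
                "s3a://my-bucket/warehouse")
        .getOrCreate()
    )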
14:42
So the first thing is adding the Iceberg Spark runtime jar. You add this to the Spark jars folder, and this provides the runtime dependencies needed to use Iceberg with Spark. And the Spark session builder is used to configure and initialize the Spark session. You need the Spark SQL extensions
15:01
to get Iceberg-specific custom SQL functionality. And one important thing before creating any Iceberg client is to set up a catalog. A catalog basically manages a collection of your Iceberg tables, so you need to specify a name for your catalog and the data warehouse location, basically the location where your data
15:22
and metadata files are gonna reside. And in this case, the catalog implementation is JDBC, so we have to provide the JDBC connection URI string. I masked the catalog name in this case, but you can basically give it whatever you want. So once this is done, you'll have a Spark session
15:41
with the catalog named whatever, and you will be able to load your Iceberg tables. Let me show you a code snippet on how to create an Iceberg table using the Spark session before. So this code snippet shows a create table statement, so basically create table if it doesn't exist,
16:01
catalog.test.europython, with your catalog name, your namespace, and your table name, using Iceberg. And table properties can be anything. There are a bunch of properties and Spark configuration that you can add here. What I'm providing here is the format version, which is the Iceberg specification version.
16:21
The Iceberg spec is constantly evolving, and right now, I think Iceberg spec version three is in the works. And lastly, specify your partitioning strategy. I'm providing partition by year of session date. So Iceberg basically computes the partition values
16:41
by taking this column, session date, and applying the year transform on it. And then AS SELECT from a data frame: I have already created a PySpark data frame, which I'm using to create this Iceberg table. And how do we query it? It's as simple as a select on the table name.
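(A sketch of that CREATE TABLE ... AS SELECT, assuming the session sketched above and a hypothetical PySpark data frame df of sessions.)

    # Expose the PySpark data frame to SQL.
    df.createOrReplaceTempView("sessions")

    spark.sql("""
        CREATE TABLE IF NOT EXISTS catalog.test.europython
        USING iceberg
        PARTITIONED BY (years(session_date))
        TBLPROPERTIES ('format-version' = '2')
        AS SELECT * FROM sessions
    """)

    # Querying is just a select on the catalog-qualified table name.
    spark.sql("SELECT * FROM catalog.test.europython").show()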
17:02
And it basically retrieves the data from the PySpark data frame that I created before. Let's take a look at the metadata JSON file. This is the metadata file that gets created after you run the create table statement. So you can see what schema this table has.
17:22
You can see the partitioning spec. You can see the current snapshot ID. Every data insert is associated with a snapshot ID in Iceberg. And you can see any custom table properties that you have provided. And a bunch of other stuff, like how many records are in this table? How many partitions have changed?
17:40
And stuff like that. And once the table is created, you should also be able to see your data and metadata files in the data warehouse location that you have specified while setting up the Spark session. In my case, this is an S3 object store, and that's what we see. We have the data and metadata directories.
18:03
Let's look at the order of operations that happen when we try to create an Iceberg table. So the first thing it does is creating a data file. So this is an immutable Parquet file, and you can see that this is residing in the partition session date.
18:20
The next step is creating a manifest file. A manifest file tracks data files. The next one would be manifest list, which tracks a list of manifest files. Finally, it is followed by metadata.json, which I showed before, with a snapshot ID.
18:40
Sorry, the one final step is updating this metadata location in the catalog itself. The next code snippet is about how you can evolve the schema. You can use a simple alter table statement to add a new column. And a new column is added. It has null values because there is no data in that column.
19:03
And if you want to find where this change is reflected, you have to look at the metadata.json file. You'll have a new schema present in the JSON file, and now your Iceberg table will also point to this new schema. You can see that there is a column,
19:21
session duration minutes in the new schema. And another interesting thing here is that columns in Iceberg are tracked by field IDs. And these field IDs are immutable. Your columns can always be renamed, or the order of your columns can change, even after your data files are written.
19:41
So Iceberg fetches these columns from your data files using field IDs. So let's look at the PyIsberg way of doing things now. As I mentioned, PyIsberg can run on a single machine in a standalone Python environment. So it's as simple as doing a pip install of PyIsberg.
20:02
And you can also install some optional dependencies like s3fs, if you want to interact with an S3 object store. Here it shows a SQLite database, because this code snippet is setting up a SQL catalog, which uses the local file system to store your data and metadata files.
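(A minimal sketch of that setup; the paths are placeholders, and the extras in the pip install comment are an assumption about which optional dependencies you want.)

    # pip install "pyiceberg[sql-sqlite,pyarrow,s3fs]"
    from pyiceberg.catalog.sql import SqlCatalog

    catalog = SqlCatalog(
        "default",
        uri="sqlite:////tmp/warehouse/pyiceberg_catalog.db",  # local SQLite catalog database
        warehouse="file:///tmp/warehouse",                    # where data and metadata files go
    )

    # A namespace has to exist before a table can be created in it.
    catalog.create_namespace("test")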
20:23
So the first thing, we set up a catalog, which is a SQL catalog in this case. You specify the data warehouse, you specify the connection string to your database, which is a local SQL database in this case. And we have a catalog. Now we have to create a namespace before creating a table.
20:41
That's what we do in the last line here: create a namespace called test. And there are multiple ways to create a table using PyIceberg. I'm going to show you a way in which I'm reading a Parquet file using PyArrow and then using that Arrow table to append to the PyIceberg table. So I first installed PyArrow,
21:02
which is the Python API for Apache Arrow, an in-memory format. And this function basically just reads the Parquet file and returns a PyArrow table, something like this. And the next step is creating the actual table itself.
21:20
You can just use the create table API on the catalog object that you created and say create table test.europython_pyiceberg with the schema of the PyArrow table we just created. And appending data is done with the append API: you just call append with the PyArrow data.
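(A sketch of that flow, assuming the catalog from the previous snippet and a placeholder Parquet file.)

    import pyarrow.parquet as pq

    # Read a Parquet file into an in-memory Arrow table.
    arrow_table = pq.read_table("/tmp/sessions.parquet")

    # Create the Iceberg table from the Arrow schema, then append the data.
    table = catalog.create_table(
        "test.europython_pyiceberg",
        schema=arrow_table.schema,
    )
    table.append(arrow_table)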
21:41
And this is the local file system where you can see how your data and metadata files are populated. So you can see the europython_pyiceberg table, and you have the data directory that has the Parquet data file and the metadata directory, which has a bunch of files, because I did create a table,
22:02
an empty table basically with just a schema. So that created a metadata.json, and then I appended some data into it. That created another metadata.json along with the manifest files that track the data files. And this is a code snippet to query some data from the PyIceberg table we created.
22:23
This works by using the scan API, for which you can provide a row filter. I'm providing something like duration minutes greater than or equal to 35, and you can specify which columns you want to project, which are session name and session date here, and limit the number of records that are being returned.
22:41
At this point in time, no data is being retrieved by PyIceberg, and you can also use the low-level plan files API, which returns a set of file tasks that tell you what files might contain the rows matching this query. Since I only have one data file, it just returned that one data file.
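(A sketch of that scan, reusing the speaker's filter and projection; the column names are theirs and the limit is arbitrary.)

    # Build a lazy scan; nothing is read from storage yet.
    scan = table.scan(
        row_filter="session_duration_minutes >= 35",
        selected_fields=("session_name", "session_date"),
        limit=10,
    )

    # plan_files() only returns the file tasks whose metadata says they
    # might contain matching rows.
    tasks = list(scan.plan_files())

    # Materialize the result when you actually need the rows.
    arrow_result = scan.to_arrow()
    pandas_result = scan.to_pandas()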
23:01
And in this case, it is actually up to the engines themselves, like Pandas or DuckDB, to filter the actual data files. You can easily convert a table scan to an Arrow or Pandas data frame. And those are the code snippets I wanted to show to help you understand how to get started
23:22
with your Iceberg journey. And this is a snippet from the next upcoming release of PyIceberg. So the community is trying its best to expand the Python API and achieve feature parity with Spark. And the next release has some exciting updates
23:40
with respect to partition writes and metadata tables. So stay tuned and see if that's something you want to get involved with. And as a team, we are definitely exploring new horizons and trying to adopt PyIceberg wherever it's the right fit. And our team has also made some amazing contributions
24:00
to the PyIceberg community. That is where I want to end this today. Thank you for having me. I'll stick around for any questions, so feel free to reach out to me. Thank you.
24:22
Thank you so much, Gautami. Do you have any questions? Yeah, thank you for the interesting talk. I actually have two questions. Have you done any comparison between Delta tables and Iceberg? Because basically the features that you mentioned are also available in Delta tables. Yeah, I knew that this question would come up.
24:42
So I think when my team started exploring different table formats, we did explore Delta Lake as well. But when we first started, when we adopted Iceberg, the Trino connector for Delta Lake was missing, if I remember correctly. And that's why we went with Iceberg. This was like three years ago.
25:01
So yeah. Okay, thank you. And what about compaction? Does Iceberg provide compaction? I mean, the thing is that as your data grows, you will have more and more Parquet files, maybe small files. Does it provide some means of compaction, binary compaction,
25:21
in order to concatenate some Parquet files into bigger Parquet files? Yeah, it has a stored procedure that you can run for data compaction. It is used for the purpose that you just mentioned. It will compact smaller files into the required size. Yeah, it's there. Thank you. Hello, thank you for sharing.
25:40
I'm wondering, because the main feature of this Iceberg catalog, from what I understand, is the time traveling and having snapshots, okay? Because from what I understand, it's like the Hive catalog, but with more features on it, like time traveling,
26:01
having some snapshots, taking queries. But how does Iceberg manage the life cycle of these snapshots? And how is, yes, the life cycle of it? Yeah, so there is a stored procedure called expire snapshots. So based on your use case,
26:20
you can use that procedure to expire older snapshots, and that would delete any unused data files, the ones no longer referenced by the remaining snapshots. So that's how it maintains that. So I have to manually handle this, right? Or is there a config file or something like this where the snapshots expire
26:43
or something like this? There is this expire snapshots procedure that you can set up as a job that runs at a regular interval and takes care of expiring the snapshots for you. Yeah, okay. Thank you.
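(For reference, a hedged sketch of the two maintenance procedures discussed in this Q&A, invoked through Iceberg's Spark SQL extensions; the table name and retention timestamp are placeholders.)

    # Compact small data files into larger ones.
    spark.sql("""
        CALL catalog.system.rewrite_data_files(table => 'test.europython')
    """)

    # Expire old snapshots; data files no longer referenced by any remaining
    # snapshot are deleted. Typically scheduled as a recurring job.
    spark.sql("""
        CALL catalog.system.expire_snapshots(
            table => 'test.europython',
            older_than => TIMESTAMP '2024-06-01 00:00:00'
        )
    """)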
27:01
I actually have two questions. First, you mentioned that Iceberg is currently on that third version. Do you have any experience with upgrading between versions, and how painful was that? And my other question: do you have anything with Iceberg where you wish it would do more, or where you think it's not working out as well as you hoped initially? I think I'm going to take one question after the other.
27:22
For your first one, I think the specs are backward compatible, so we didn't have any issues when we migrated from spec version one to two. And I think even in the next version, they're going to add encryption, but it's backward compatible. So that was our experience. Can you please repeat your second question?
27:42
Yes, my second question is, do you have any parts of Iceberg that you think could be improved, or that didn't work out as well as you hoped when you first started using it? I have to think about this. Maybe not, but we are trying to add features on the PyIceberg side.
28:00
So we definitely need Spark jobs for ingesting big data sets. But then we also have a workflow wherein we ingest data sets every day, so the data is very small, like MBs to GBs or something like that. So we definitely want to use PyIceberg there. So we are, I think, focusing on adding more features to PyIceberg,
28:22
so we could leverage that in a pipeline as well. Yeah. Thank you. Okay, thank you, Gautami, for your great, amazing presentation. Thank you so much.