pg_shard: Shard and Scale Out PostgreSQL
Formal Metadata
Title: pg_shard: Shard and Scale Out PostgreSQL
Series: PGCon 2015, talk 16 of 29
License: CC Attribution - ShareAlike 3.0 Unported: You are free to use, adapt and copy, distribute and transmit the work or content in adapted or unchanged form for any legal and non-commercial purpose, as long as the work is attributed to the author in the manner specified by the author or licensor and the work or content is shared, also in adapted form, only under the conditions of this license.
Identifiers: 10.5446/19139 (DOI)
Production Place: Ottawa, Canada
Transcript: English (auto-generated)
00:12
Hey, I'm Ozgun. I'm one of the founders at Citus Data. Prior to Citus, I was a software developer in the distributed systems engineering team at Amazon.com.
00:23
Today I'm going to talk about pg_shard, a sharding and scaling extension for PostgreSQL. I have about 35 slides and it's a fairly technical talk. So if you have any questions, please feel free to interrupt. Before we start, I have just one slide to put things into context.
00:41
As a quick question, prior to this talk, how many of you have heard of CitusDB? And how many of you have heard of pg_shard? And just to clarify, CitusDB and pg_shard are two separate products that complement each other. pg_shard targets the real-time reads and writes use case.
01:02
So the high-throughput reads and writes use case. In other words, it targets the NoSQL use case. And CitusDB is more applicable when you have big data sets. And you want to analyze that data in real time. So you can think of CitusDB as your massively parallel processing database. And both CitusDB and pg_shard shard and replicate your data in exactly the same way.
01:26
They share the same metadata. And that's why the two products are fully compatible with each other. And also CitusDB doesn't fork Postgres, but rather extends it through the planner and executor hooks. So that's the context slide about the two products.
01:42
Now onto the talk outline. First, I'm going to talk about motivating pg_shard. I'm going to first talk about the use cases where pg_shard is applicable. Then I'll talk about the data. How does pg_shard lay out the data in the cluster? How does the cluster dynamically scale out as you add new machines into it?
02:01
And how is the data replicated? Next, I'm going to talk about the execution logic. That's the computation there. What happens when you send a query to the pg_shard cluster? How do we route that query? What happens when there are failures during the routing of the query? And last, I'm going to present pg_shard's product roadmap.
02:22
And Marco is going to do a cool, live, awesome demo. Yeah? Yeah? No expectations. And this is the talk outline. Let's start with the motivation. Why pg_shard? Where did it come from? We initially started out by saying, CitusDB only supports batch data loads.
02:42
So you take your data, you copy your data in. And now we had one customer who came in and said, I'd like to insert my data in real time. We said, CitusDB currently only supports batch loads. And they were like, isn't this PostgreSQL? Can't I just write my own library to do this? And so they went ahead and wrote their own library.
03:01
We had a second customer who came and asked the same thing. And by the third customer, we got the hint. We now took a step back and started talking to existing PostgreSQL users. You could say we did a survey of the use cases where pg_shard was applicable. And what people were doing when they started running into the limitations of a single PostgreSQL database.
03:22
We saw two themes. One was doing sharding at the application level. So you take your data and it's the application that manages the sharding and the replication. It is obviously a lot of effort for the end user. You need to understand distributed systems. You need to think through what happens when you have failures in those distributed systems.
03:43
And the second theme that we saw was people took their data in PostgreSQL, denormalized it, dumped it, and loaded it into a NoSQL database. And of course, when you do that, you're giving away a lot of the functionality that PostgreSQL provides. Then we started asking, if you had the ideal PostgreSQL scaling solution, what are your top two, three wishes?
04:07
Of course, when we asked for the top ten things, it makes up a long list. We were curious, what are the most important things for the end users? And we had two answers. First, people said, I don't want to think about how to balance my cluster when I add new machines.
04:24
And similarly, I don't want to think about failure handling in the cluster. That should just work. The second one was, we want things to be simple. I don't want to set up multiple components and configure them. Also, if I'm using PostgreSQL 9.3, it should just work with PostgreSQL 9.3.
04:42
If I want JSONB out of 9.4, it should just work with that. And we took all of that in and tied it to these two architectural decisions. The first decision, dynamic scaling, is what I'm going to talk about next. There we use the concept of logical shards to make scaling out and failure handling easy.
05:04
For the simplicity decision, we leveraged PostgreSQL's extension APIs. If you look at PostgreSQL's planner and executor, they are built to read in data from disk or memory. In other words, PostgreSQL's executor operates by pulling in the data. But if you're building a distributed database, you want to take that query, transform that query,
05:25
and push your computations to the worker nodes. So, your distributed query planner and executor need to be fundamentally different than those of PostgreSQL and also fully cooperate with PostgreSQL's logic. And we'll cover this part right after the logical sharding part.
05:42
Let's start by looking at scaling out a cluster. First, I'm going to talk about how partitioning has been done in the past. Let's say you have three nodes. You partition your data set into these three nodes. A third of the data goes to node 1, a third of it goes to node 2, and so forth.
06:02
And the partitioning dimension in here is time, but it could really be anything. And the partitioning method could really be anything. The idea is, let's say you have 12 terabytes of data, and now you're putting 4 terabytes of it in one of the nodes into a table there, and 4 terabytes goes to another table on node 2.
06:21
Any ideas how this could introduce problems when you want to scale out, when you add a new machine into the cluster? Yes. So, let's say you add a new machine into your cluster. Now that you added a new machine, you need to rebalance your cluster,
06:40
because that's the point of adding a new machine, obviously. And what you're going to have to do is to transfer large data sets. So, you're transferring a terabyte of data over the network. And this is over a gigabit network, and the transfer itself is going to take hours. And not only that, you need to coordinate the transfer from node 1, node 2, and node 3, and make sure that they complete.
07:02
And while you're transferring this data, because that's a lengthy operation, you may have failures in the transfer operation itself. So, getting this right, doing this right, is a lot of work, and not only is it a lot of work, it's also lengthy work, because of the time it takes to transfer this data around. Now that we've seen the scaling out issue,
07:22
let's also take a look at how failures are handled in traditional partitioning.
07:43
Yeah, for range partitioning, that's true. So, if you had hash partitioning, say you hash partition on customer ID, you'll need to rehash. We now introduce replication into this picture. This is the replication part. And we're using exact replicas in this setup.
08:02
So, node 4 is an exact replica of node 1. Node 5 is an exact replica of node 2. Let's see what happens when we have a failure. So, when you have a temporary failure in node 1, what will happen is node 4 will start taking on twice the load. In this case, your cluster's throughput and latency is bottlenecked on node 4.
08:24
And in a cluster of six nodes, this isn't a big deal, but imagine the case where you had 100 machines in your cluster, and you had a failure. That 100 machine cluster is now bottlenecked on that failed machine's replica. So, you don't get much out of having 100 machines in your cluster if a single machine will become the bottleneck.
08:42
Also, if this node doesn't come back up, if it's a permanent failure, you need to re-replicate four terabytes of data from a node that might already be the bottleneck. Any ideas on how to resolve this issue?
09:05
Yes. Here comes logical sharding. I believe Hadoop distributed file system, HDFS, was the first solution to introduce this at the system level. HDFS doesn't partition the data into nodes.
09:21
It partitions them into much smaller shards. In this case, they are 512 megs each. That number could obviously be configured. And then in this diagram, we have the dynamic scaling out case. We introduce a new node, number 4, into the cluster. And now we want to dynamically scale out the data.
09:40
And this is easy. Your shard rebalancer just needs to move shard number 4 from node 1, shard number 5 from node 2, and shard number 6 from node 3. And if you have a failure during the transfer step, that's okay. Because these shards are small, you can just restart the operation from scratch. In other words, the rebalancing operations that you want to perform,
10:01
they become much easier and much more flexible. The second benefit comes with the replication case. In this example, PgShard replicated your shards using a round robin policy. And the idea is, no two nodes are exact replicas of each other. Let's say we fail node number 1.
10:22
Again, in this case, shard 1's load goes to node 2, shard 6's load goes to node 6, and so forth. So when you have a temporary failure, you're evenly distributing the load across the machines in the cluster. Each node will take on about 20% more load.
10:41
And if this failure were a permanent one, again, re-replication becomes much easier because you're not re-replicating from a single machine. You're effectively re-replicating from the entire cluster, with some pieces of the shards in there. And not so impressive when you only have six machines,
11:01
but imagine you had 100 machines in your cluster. In that case, failures are much more common, and then these operations will become much easier. Quick checkpoint question, how many of you have known about these problems with traditional partitioning before? About half.
11:27
And that's how that cluster maps over to pg_shard. This is an example pg_shard cluster. In here, we have the worker nodes. These worker nodes are just regular PostgreSQL databases. There is nothing special about these worker nodes.
11:43
And now we have a metadata server up there. The metadata server is where you go. It's again a PostgreSQL database. When you go in and create extension pg_shard, that guy becomes a metadata server. That's where you create your distributed table. That's where you send your queries,
12:00
and then that's how the queries get farmed out. The metadata server can also be called the coordinator node or the master node. And that's like the guy that you talk to when you want to run queries. And the metadata server is the authority on this metadata. The metadata is tiny. It's typically on the order of megabytes,
12:21
so you can easily copy it around or even reconstruct it. And the metadata changes only when one of three things happen. One, you create a new table, and as a result you create new shards. Two, you rebalance those shards in the cluster. The previous rebalancing example, either when you had a failure and you had to rebalance,
12:40
or you had the new machines and you had to rebalance. And three, when you fail writing to a particular shard, you need to update that shard's metadata to reflect that it's unhealthy. And the challenge is keeping this metadata consistent in the face of failures. To make this metadata example concrete,
13:01
here's a psql command line on one of the metadata tables. Here, on the left-hand side, we have identifiers for shards. And on the right-hand side, we have the hash token ranges that correspond to each shard. And as an example, a query comes into the system, say for a table that's partitioned on customer ID,
13:22
pg_shard hashes the customer ID using PostgreSQL's hash functions and gets a hash value. It then looks up in this metadata to find the hash range that corresponds to that hash value, and then finds the shard or shards that correspond to the query. Again, let's say in this case we hash the customer ID, and we got a value of minus 1.6 billion.
13:43
You look up minus 1.6 billion in this table, going all the way down, it's here. And then that query maps to shard 10,006, and we forward the request to that particular machine. That's the state on the master node.
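As a minimal sketch of that lookup, for a hypothetical table partitioned on an integer customer ID (the schema and table names below follow pg_shard's pgs_distribution_metadata layout, but treat the exact names as an assumption):

    -- Hash a sample partition key value with PostgreSQL's built-in
    -- integer hash function, then find the shard whose hash token
    -- range covers that value.
    SELECT id AS shard_id
    FROM pgs_distribution_metadata.shard
    WHERE relation_id = 'customer_reviews'::regclass
      AND hashint4(42) BETWEEN min_value::integer AND max_value::integer;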
14:01
For the worker nodes, they are regular PostgreSQL instances. Again, nothing special about the worker nodes. And on the workers, one shard placement corresponds to one PostgreSQL table. If you log into a worker node and do a backslash-d, you will see multiple tables. And then to ensure that the table names are unique,
14:21
pg_shard automatically extends the table name behind the covers. So if your table's name is click_events and your shard ID is 1011, the corresponding shard table's name will be click_events_1011 on the worker nodes. One thing to keep in mind
14:41
is that you currently create your index and constraint definitions before you distribute your table. So that wraps up the part on how we lay out the data in pg_shard. Let's talk about the logic. The way we're thinking about pg_shard is that it strikes a balance between usability and SQL functionality coverage.
15:02
After talking to PostgreSQL users, we found that they just wanted a simple component to get them rolling. And pg_shard is just that. It's a drop-in PostgreSQL extension. You say create extension. It's there. You then start running SQL commands against your distributed tables. In that sense, you don't have to have any special functions
15:22
or do anything special to query your data. A lot of the key value stores out there as a comparison already have the notion of distribution baked into their APIs. So you have a key and a value, and you hash the key and you forward the request, whereas SQL as a language doesn't have the concept of distribution built into it.
15:41
And that's the balance we want to strike here. We want to support a subset of SQL without requiring any changes to your application. Here's a simple example that sets up pg_shard and a distributed table. You go in, you say create extension pg_shard. You create table customer_reviews, a very normal, regular, nothing-special PostgreSQL table there.
16:04
That table will soon become a shell table on the master node. And to do that, you invoke the master_create_distributed_table user-defined function, which designates this customer_reviews table as a distributed table. You also specify the partitioning column in that user-defined function.
16:23
Then you call master_create_worker_shards. And what this function does is it connects to the worker nodes, and then it replays the table schema on the worker nodes, the index definitions, the constraint definitions, and creates the shards and their replicas on the worker nodes.
16:40
There are two final arguments for this function. 16 is the number of shards you want to have in your cluster. This number could be 1,000. This number could be 10,000. It depends on the resources and the size of your cluster. And two is the replication factor. So if you have a replication factor of two, in case one of the replicas fails, that table and its data is still available.
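Put together, the whole setup from this example is just four statements (the column list on the table is illustrative):

    CREATE EXTENSION pg_shard;

    CREATE TABLE customer_reviews (
        customer_id text NOT NULL,
        review_date date,
        review_rating integer,
        review_body jsonb
    );

    -- Designate the table as distributed, partitioned on customer_id.
    SELECT master_create_distributed_table('customer_reviews', 'customer_id');

    -- Create 16 shards for it, each replicated to 2 worker nodes.
    SELECT master_create_worker_shards('customer_reviews', 16, 2);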
17:06
And when we hook into the planning and execution stage... Yes, pg_shard, that's true. So if you have, say, 100 CPU cores,
17:21
one bit of math that existing customers are using: say you have 100 CPU cores in your cluster, it's good to set that number to 1,000. That way you'll be able to scale out efficiently, like by a factor of 10x. You configure a connection to the master,
17:41
so it knows where all the shards are. Yeah, the master node already has that information in the metadata. So when you call master_create_worker_shards, that's when that metadata, the shard and shard placement metadata, gets filled in. But how does it know the additional bits? I mean, for the worker nodes, what?
18:03
Oh, there is a pg_worker_list.conf. So there is a config file, yeah.
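For reference, pg_worker_list.conf lives in the metadata server's data directory and simply lists the worker nodes, one hostname and port per line, roughly like this:

    # hostname port
    worker-101 5432
    worker-102 5432

Yes? Based on your experience, is there a downside to having a large number of shards?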
18:22
At some point there is. If you have a million, then you have a million tables in your cluster. And then there is a limitation. We had 16 in there because the first thing people do is they set this up on their laptops, and then they have two databases, and that's the value where we think if you had a million in there,
18:42
then they would have a million tables on the worker nodes. So the typical math that we do is you want to scale out typically to 10x: with 100 cores, 1,000 shards; with 1,000 cores, like 10,000. And tying this into the planning and execution part,
19:00
I don't know how many of you are familiar with the internal C APIs that Postgres provides, but one of the wonderful things it gives us is hooks, and you have about a dozen of them. With these hooks, you can take over any part of the command's lifetime. These are very granular. You can also chain hooks together to use more than one. And hooks are very flexible. There is no strict contract about what you have to do with them.
19:23
This also means you need to be very careful when overriding them because of the lack of the contracts. In summary, the hook APIs give us a very first-class way of hooking into the PostgreSQL system and cooperating with it. Here's the overview of our planning phase.
19:42
And just as a side note, you can have regular tables and distributed tables within the same database. For example, you can have one large JSONB table that's distributed and a dozen small regular tables all within the same database. And taking a step back, a query comes in.
20:01
PostgreSQL parses the query. We then check if that query is for a distributed table. If it's for a local table, we defer to PostgreSQL. If it's distributed, we find the partition key. We apply partition pruning and find shards that are involved in the query. We then take the parsed query or queries
20:20
and use the same deparse logic as in PostgreSQL proper to generate back SQL statements. Here's a detailed example of planning in pg_shard. An insert query comes into the cluster. PostgreSQL parses it. We then hook into the planner and see that the customer_reviews table
20:40
is a distributed table. We separate out clauses on the hashed partition column. In this case, there is only one. It's that one, customer_id equals a constant. And then we apply PostgreSQL's hashing function and look at the range that that hashed value corresponds to and pick the shard or shards involved in the query. For this, we use the same constraint exclusion mechanism
21:02
as in PostgreSQL. And when we're done there, we take that parsed query and deparse it using PostgreSQL's ruleutils.c to get back the SQL statement or statements to forward to the worker nodes. This step is important because on a worker node,
21:20
you're going to have multiple shards, so we need to identify which shard you want to write to. Any questions on how this planning piece works?
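As a sketch of that rewrite, with hypothetical values: the statement you send to the master references the logical table, and the deparsed statement forwarded to the workers references the shard-extended name.

    -- Sent by the application to the master node:
    INSERT INTO customer_reviews (customer_id, review_rating)
    VALUES ('HN802', 5);

    -- After pruning to shard 10006, the deparsed statement that
    -- pg_shard forwards to the workers holding that shard's placements:
    INSERT INTO customer_reviews_10006 (customer_id, review_rating)
    VALUES ('HN802', 5);

And then there is the part of the actual execution. You have to be a bit careful here because we have a distributed system.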
21:42
Parallel requests are going on. We do have full consistency on writes, which means if you write something, you're going to see it when you read it back. There is none of this potential to read from a stale shard and get back stale results as you do in some of the document stores. Replicas need to be visited in order, in order for the constraints to work.
22:02
And if any of the replica writes succeed during the modification, you consider the query as successful. If a write to a replica has failed, you mark in its metadata that the shard is inactive. Here's a visual representation of that. A new insert query comes into the master node. We determine that shard six is the shard that we want to hit.
22:23
And in this case, the replication factor is two, so we need to touch two nodes. And in here, connections are possibly already open. We use a session-bound connection cache, actually with logic very similar to postgres_fdw, and we send this query to all the nodes that have those replicas.
22:44
Here worker node number three has failed, but number one succeeded, so we consider the query as a success. And the master node needs to do a bit of bookkeeping for the failure. It marks shard number six's placement, on node number three here, as inactive in the metadata. And then shard number six must now be repaired
23:01
in order to restore the replication factor. And to do this repair, the user currently calls a user-defined function.
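As a sketch, the repair call looks roughly like this, assuming pg_shard's master_copy_shard_placement UDF and hypothetical node names; it copies a healthy placement over the inactive one:

    -- Repair shard 10006's inactive placement on worker-103 by
    -- copying the data from the healthy placement on worker-101.
    SELECT master_copy_shard_placement(10006,
                                       'worker-101', 5432,
                                       'worker-103', 5432);

The single-shard select logic is very similar to the insert. We find the shard which contains the result, push down the computation, get the results, and give them back to the user.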
23:22
We can dynamically fail over to other replicas. While that's happening, this will be invisible to the end user. And one difference from the insert is that read failures currently do not modify the metadata state. This is the part that's different from the insert. And the use case we're targeting here with the single shard is the common key-value access pattern.
23:42
So you have JSONB records, you're doing writes, and now you're looking them up. Here's an example. The query comes in. The master node looks at the WHERE clauses and says, okay, this is shard number three. It sends the request to worker node one. Let's say worker one had an intermittent failure,
24:02
and the master node finds another worker with shard three. Still in the context of the same query, because the client doesn't know any better, it sends the query there, gets the results, and gives them back to the client. And one difference between the insert case and this use case is the shard state, the metadata on the master node,
24:21
isn't marked, because it's a read query. So this covers the single-shard select use case. And then there's the multi-shard select. What pg_shard does for the multi-shard is it pushes down the filters and projections, gets the remaining data to the master node, does the aggregations on the master node,
24:41
and any remaining operations and transformations on the master node.
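Two hedged examples against the customer_reviews table from earlier show the difference. The first is a single-shard key lookup that gets routed to the one shard owning the key; the second is a multi-shard select where the filter and projection are pushed down and the final aggregation happens on the master node:

    -- Single-shard select: routed to the one shard owning this key.
    SELECT review_body
    FROM customer_reviews
    WHERE customer_id = 'HN802';

    -- Multi-shard select: filters are pushed to every shard, and the
    -- master node aggregates the partial data it gets back.
    SELECT count(*)
    FROM customer_reviews
    WHERE review_rating >= 4;

The part about full distribution, taking a query and fully distributing it, that's CitusDB's bread and butter. CitusDB will apply many transformations and will handle distributed joins.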
25:01
What's new? I guess the big news is that pg_shard is already in production at Neustar Analytics. It was first released in December; version 1.1 was released in April. And part of the, actually the primary reason we're here is we're looking for your feedback. If you're interested in pg_shard, if there are one or two or three features
25:20
that would make you use pg_shard, please get in touch with us. We have the project on GitHub. There are forums on GitHub. We have issues there, or email us and tell us, hey, if pg_shard had range partitioning, if it was easier to do fully automated recovery, I would use it. Or I need more SQL coverage. Please communicate with us.
25:42
In terms of the new parts, again, version 1.1 was released. It has the shard repair UDF, better integration, select and projection pushdowns, four to five times faster inserts. And one thing to keep in mind is when you're doing inserts, you want to do these inserts concurrently, because each insert is issuing a network round trip.
26:02
And then a good number of bug fixes; thanks to Josh and others, maybe half of them were opened by Josh and the other half were opened by existing customers going into production. pg_shard 1.2 will be released in July. It has support for partitioning by composite types. This is in case you want to partition by two columns rather than one.
26:23
Like one example is you have the app ID and user ID, and you want to partition by two columns. People are currently looking at doing it using composite types, and 1.2 will have that feature. It has distributed copy, certain aggregate pushdowns to do the pushdowns better, and again, bug fixes.
26:42
That's 1.2. What's next for 2.0? As it stands, pg_shard scales out the data. And if your workload is CPU intensive, the single metadata server becomes bottlenecked on CPU between 20 to 40K inserts per second, depending obviously on your hardware.
27:00
This is because the metadata server does the parsing and the routing. And pg_shard 2.0 will resolve that. It will go fully distributed, and it will fully distribute the metadata. We have four proposals on the table, and we discussed them during the unconference day session. One of these designs involves bidirectionally replicating the metadata,
27:22
and the metadata updates to all the nodes. This approach works best when you're capturing immutable events. That is, you have a lot of inserts, but not many updates or deletes. And again, the tricky thing there is, how do you handle failures? Like when these happen, and how do these guys talk to each other and resolve those failures? The second design we discussed is superior
27:42
in that it handles concurrent updates and deletes well, but it requires more work. In this design, we have multiple replication groups, and each query gets served by that replication group's primary. Of course, for this second approach to be simple for the end user, the primary and secondary failure logic needs to be robust and automated.
28:02
And we'll talk more about the pros and cons of these designs on our technical blog in upcoming weeks. To summarize, pg_shard is a simple sharding extension for PostgreSQL. It uses many small logical shards to make moving the data around faster and easier.
28:21
You can dynamically scale out by adding new machines, or handle node failures, thanks to these many small logical shards. It's standard SQL, no special functions, and it's a standard PostgreSQL extension. You say, create extension, create table, distribute table, and you get rolling.
28:42
And it goes great with JSONB. We think JSONB with pg_shard makes PostgreSQL the most interesting, if not the best, NoSQL database. Thank you.
29:04
Let's take two questions, and then do the demo, because the demo is awesome too.
29:21
You'd get a warning message? Yes. So the warning would be, the select on this node failed, and that's how it's communicated to the end user today. That would again be a warning message.
29:40
For purposes of pg_shard, yeah, yeah. For the inactive ones, because for the select it's easy, there is no state change. For the other one, you'll actually see that in the state, in the metadata state. It will be there.
30:05
You can't. The way people typically use a distributed table with other regular tables is, they do an operation on the distributed table, typically create a temporary table or view on the metadata server, the master node, and then they join those results together. Yeah, yeah.
30:29
If that's something you'd like, again, if there is a particular use case that you're interested in, please feel free to join the discussion on GitHub. Last question, and then we can do the questions
30:41
after Marco's demo. I guess it's possible. The metadata server, the metadata itself is tiny,
31:03
so we haven't really thought of it in that sense. If it gets to millions... not with pg_shard, but we have users, again, who are using CitusDB with about a million shards, and they're doing fine. I would expect Hadoop runs into that problem typically on the order of thousands of machines.
31:21
If we get there, I'd be more than happy, and then we could... As in, for the redundancy, I skipped that over in the slides. You can copy the metadata either using a Postgres backup, or you could have an EBS-backed volume.
31:41
You could take a backup of it, and you can even recreate that metadata from the cluster itself. In 2.0, that will all be resolved because it will be fully distributed.
32:02
I think the description was a futuristic demo of a JSONB table with a dynamically changing row and columnar store behind it, which scales and is highly available. So we put it together. So what you're looking at now is the view on the... Now we are looking on you from the future.
32:21
This is me from the future. Right. And we wanted to put something together. Also, it's a bit more interesting than just, you know, this is how we set it up. So we used some actual data. We took the last month of GitHub events data as a publicly available dataset. It's about 22 gigabytes,
32:41
and we put it on a cluster on EC2, about five worker nodes, one master node. So currently, we're looking at the master node. There's an events table. I mean, it's just one big list of events in JSON that GitHub publishes. And so the events table has some ID,
33:02
some timestamp, an event type, and then there are some JSONB fields that come from those event blobs that GitHub publishes. And we have an index on there. And so this table is managed by pg_shard. So this is a distributed table. So any query I do on this table
33:22
goes into, like, the executor hooks and the planner hooks of pg_shard. Now, I set this table up with range partitioning. We don't support range partitioning, but we kind of sort of do. So don't try this at home. You could potentially set it up with range partitioning.
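A sketch of what that events table might look like; the column names here are assumptions, not the exact ones from the demo:

    CREATE TABLE events (
        event_id bigint,
        event_type text,
        created_at timestamp,
        repo jsonb,
        payload jsonb
    );

    CREATE INDEX events_created_at_idx ON events (created_at);

    -- Hand the table over to pg_shard, partitioned on the timestamp.
    SELECT master_create_distributed_table('events', 'created_at');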
33:42
And so what we have here is a bunch of shards, actually, in this case, about 1,000 shards. And the idea in the system, if you run it in production, is you have one kind of open-ended shard. So there's one that ends in the year 3000. So, I mean, the system will only work
34:01
for the next 985 years. But basically, if you do an insert with your current time, or at least something after 6 o'clock, it will end up in that shard. And at some point, we rotate the shard. So we make a new one, and new inserts go to that one. So I can do, I could probably set up
34:21
a file so I don't have to type the inserts myself, because the JSONB columns have a lot of stuff in them. You'll see it in a moment. So I'm going to do a couple of inserts. It's pretty ugly. But, so these are, like, push events. So this is one on GitHub.
34:42
Done a bunch of commits. You do git push, then that registers an event and, like, all the commits are in there. So what just happened is the data didn't actually get stored in this table on this server. Actually, we have one of the worker nodes here. And so these are shards.
35:02
These are shard placements, as we call them. And so this worker node contains about 300 shard placements. And there's five worker nodes. And if I looked at my metadata correctly, so there's a shard with identifier 104543
35:22
that is currently catching all the inserts. So I should now have on my worker node, unless I already inserted the data before, then this will be slightly embarrassing, but yeah, I have five rows I just inserted. I did five inserts in that file.
35:42
So it routed the insert to this shard placement, but also to another replica. So I can select them from there on the worker node, where it's just a regular Postgres table. I can also do stuff with that table. I can set up triggers and stuff. I can also just query the distributed table.
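A sketch of those two checks, using the shard ID from the demo:

    -- On the worker node: the shard placement is just a regular
    -- Postgres table whose name carries the shard ID suffix.
    SELECT count(*) FROM events_104543;

    -- On the master node: query the distributed table transparently.
    SELECT event_type, created_at
    FROM events
    ORDER BY created_at DESC
    LIMIT 5;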
36:04
I have some, not really. So this would be get those five events from one of the replicas in a kind of transparent way, because now I'm querying the events table, which is sharded across five nodes,
36:21
but I'm getting the result as if it was a regular table. A bit of messy result due to JSONB. I have some nicer queries. But another thing I wanted to show you is this other crazy idea of having a dynamically changing row and columnar store.
36:41
So there's the cstore_fdw extension, which lets you kind of... it's a foreign data wrapper, which lets you store PostgreSQL tables in a columnar format, and it can compress the data for you quite well. So I have a script, or I wrote a script
37:00
that actually goes through all the shards and then runs a kind of compression script. So it basically creates a foreign table, which is going to be a cstore table, inserts the existing data into it, and then replaces the old table with the compressed table. So I can actually do this more or less in parallel.
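A rough sketch of what that per-shard compression does, assuming a cstore_fdw version that supports INSERT ... SELECT, and reusing the hypothetical column list from above (shard names are hypothetical too):

    -- One-time setup on each worker node:
    CREATE EXTENSION cstore_fdw;
    CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;

    -- For each shard table being compressed:
    CREATE FOREIGN TABLE events_104500_compressed (
        event_id bigint,
        event_type text,
        created_at timestamp,
        repo jsonb,
        payload jsonb
    ) SERVER cstore_server OPTIONS (compression 'pglz');

    INSERT INTO events_104500_compressed SELECT * FROM events_104500;
    DROP TABLE events_104500;
    ALTER FOREIGN TABLE events_104500_compressed RENAME TO events_104500;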
37:22
So my script takes a date. So let's say I wanted to compress all the data for the month of May. Now before I do that, I want to quickly check how big is that right now. So it's a bit of an ugly query because we're going to have both foreign tables
37:40
and regular tables. So this sums the foreign tables and the regular tables. So there's 8.9, or let's say 9 gigabytes of data at the moment. And so, and all the tables are regular tables. So if I run the compressed table script, I'm going to connect to all the workers, kind of in parallel, and run the compression function.
38:01
So that's going to happen now. What's cool is this is also the master. I can still run queries. This is an interesting query. I can still run queries while I'm doing compression because the compression is sort of transactional. I take a table and then just replace it on the fly. But I lose anything I set up on that table.
38:22
I lose triggers and stuff. So that might be a problem. So this is actually a query that basically gets commit messages; here's one from pg_shard: adds support for pushing down aggregates. So you can see that's all ongoing. I mean, this is all the GitHub data that there is. It's about 17 million events, actually.
38:44
So if I do a count(*), I should get about, yeah, 17 million. Now, so my compression job probably finished by now. And I mean, it ran in parallel. So it's pretty fast.
39:02
If I now look at the size of my table, it went down from nine gigabytes to about 7.4. So I mean, it didn't compress all the data, it compressed about half. Potentially I could get like a 50% reduction in storage size. So for archival purposes, it works quite well.
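That size check is roughly a sum over both kinds of tables, using cstore_fdw's cstore_table_size() for the foreign ones; a sketch, with a hypothetical shard-name pattern:

    SELECT pg_size_pretty(sum(
               CASE WHEN c.relkind = 'f' THEN cstore_table_size(c.oid::regclass)
                    ELSE pg_table_size(c.oid)
               END))
    FROM pg_class c
    WHERE c.relname LIKE 'events\_%';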
39:20
But also, for the old data, rather than doing quick lookups, which is rather what pg_shard is for, I might want to run more analytical workloads on it, and then actually compressing the data can even provide some free performance benefits. Maybe another thing I just wanted to try,
39:42
because I also added... Could you do a backslash-d on this node? Yeah, sure. That's one thing I didn't show you. So now a bunch of the tables are actually foreign tables. So we have a mixed columnar and row-based storage, where the old data is columnar and compressed, and new data is kind of row-based.
40:02
So another thing I wanted to try, because I also promised a demo where I actually kill a server during the demo. Well, let me do it the simple way now and just kill the server.
40:25
So what happens if you do something like this in pg_shard... you get some nasty error, but you will still get the result, right? Because it will try to connect to the worker that's no longer alive.
40:40
I mean, your application can ignore those, and you still get the result because it will just fail over, transparently to your application. So if one of the worker nodes fails, it still answers queries. Yeah, I'm impressed this worked, actually. The answer is correct, yes. No, we just randomly pick a number.
41:03
You will get an error message.
41:20
You will get an error message. It will say, I can't compute the correct result. There are a few cases where, like, you cannot sort of nicely do it
41:55
in a transactional way. So what you would currently have to do
42:01
is probably write a script that alters all the shards first and then alters the master table. It depends a bit. I mean, there's certain things that you probably cannot do in that way. I mean, you can add a column easily. Probably if you remove a column, you can do the master first and then on the workers.
42:21
But probably if you have NOT NULL columns, you might get into some of those cases. But you can work around the lack of DDL propagation. We can provide you with that script. And then I guess if it's important to you, it can be scheduled, prioritized for version 2.0. Yep.
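A sketch of that workaround for adding a column, with hypothetical shard names: run the ALTER against every shard placement on the workers first, then against the shell table on the master.

    -- On each worker node, for every placement of the table:
    ALTER TABLE events_104500 ADD COLUMN org jsonb;
    ALTER TABLE events_104501 ADD COLUMN org jsonb;
    -- ... and so on for the remaining shard placements.

    -- Then on the master node, the shell table itself:
    ALTER TABLE events ADD COLUMN org jsonb;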
42:43
So pg_shard, JSONB, a dynamically changing columnar and row store, and failure handling, all at the same time. Yay!
43:21
That's a good question. I would, because the communication between the master node and the worker nodes is SQL. And so as long as the deparsed SQL queries that get generated on the master node are compatible with what the... They would be compatible, I guess, unless you're... I would expect unless you're going from 7.0 to 9.4,
43:41
I would expect them to be compatible. 9.3. 9.3. Yeah, but that's the expectation. I think if... Yeah, yeah. Why would you want to try?
44:06
Any other questions? Oh, sorry.
44:22
Yep. Yep. pg_shard doesn't. CitusDB does, pg_shard doesn't.
44:48
We don't. Do we? No, because it's a very hard concept when you have a distributed system. Yeah, because we don't know in one shard what's in another shard. So if you want to enforce that,
45:02
like you have to do a lot more network round trips at the very least, and so it becomes much... Like it becomes... You start addressing a different use case, but it's more real-time, you know, reasonably fast. And the happy case is easy to handle. The question is what happens when you have failures in your cluster. Like, how do you ensure consistency
45:20
while also being available. It's not supported. One of the ideas for making it fully distributed is to bidirectionally replicate the metadata.
45:42
That's one of the design proposals. The second one is it's basically using say PostgreSQL streaming replication and sharding into replication groups. So you could have... Which makes the whole update... Because you're hitting the...
46:01
If there is a... And the idea is we take the health information out of the metadata, and then the only thing that remains is the shard and shard placement metadata, which is very easy to replicate. But let's say you have the metadata here, and it could really be cached anywhere. And you have a primary, secondary setup.
46:33
So you have this. You have this.
46:41
And then your shards are here. So you're sharding into replication groups, and the queries can hit either here, here, there. So the replication group would manage the shards, the health of the shards that it has. And then you have...
47:04
We are distributing the database. We are handling the consistency stuff differently in the sense that the metadata without the health information, when you take out the health information, it's very easy to keep that metadata consistent. And then here, the cluster, the replication group would be managing its primary
47:21
and then its secondaries. If this guy fails, these machines in here will be responsible for picking the new primary and keeping that data consistent. When a query comes into the system, you route the query to that replication group, and then that replication group handles that query itself. Well, there is a third alternative, which is this extension would just work on Amazon RDS
47:45
if Amazon supported the pg_shard extension. So if that's something you'd like to see, please talk to Grant. It would just work, just because RDS already takes care of the high-availability part of the picture, and when you have the high-availability part, this is really easy to resolve. And it would just work on RDS.
48:06
Yes. No, you're not.
48:37
You can handle, in this story, you can handle the distributed transactions
48:40
that touch the same shard and are within that machine; and then the distributed transactions that span across machines, that's a use case we're not intending to handle, like the financial transaction use case. You start a transaction, you debit money from my account, you credit it to your account, you commit the transaction, and our accounts live on different machines. We're not handling that in pg_shard currently.
49:11
Could you repeat the question? No, there is nothing special.
49:20
Worker nodes are regular PostgreSQL databases. The only thing, and you may want to do that, is if you want the shard repair function, you can create extension pg_shard there, and that will load the shard repair function. If you don't need the shard repair function, you can just use plain PostgreSQL databases, and they will become regular worker nodes.
49:40
I think we're out of time. Thank you for coming.