Scalability Strategies Primer: Database Sharding
Introduction
We’ve entered an era of application development that increasingly requires scalable solutions to everyday problems. By and large, the methods used to scale an application to massive proportions are fairly standardized. With a modicum of time spent researching database scalability methods, one can get the gist of the strategies involved in solving many common scalability problems. The actual implementations of those methods, however, have rarely been seen outside the companies and developers who built them.
This article is a primer, intended to shine some much-needed light on the logical, process-oriented implementations of these scalability strategies in the form of a broad introduction. More specifically, my intention is to teach the majority of these implementations by example. Examples will be formatted as imagined scenarios modeling real-life scalability challenges.
The technology focus of this article is intended to be as non-specific as possible. However, in scenarios or example implementations where it is necessary to define a specific software stack, I have chosen the typical LAMP (Linux, Apache2, MySQL, PHP) web development stack.
This article is intended for intermediate developers familiar with the intricacies of a data access layer and its role within the typical web application.
Brief Author’s Note
Most industry-standard development practices, frameworks, and design pattern fundamentals assume, as part of their intended usage, that the system they run on performs well enough to provide a consistently reliable environment. There are obvious exceptions to this horrifyingly broad generalization, any distributed design pattern for one, but I argue that it holds true for much of what the software industry focuses its development on. I have found, through a combination of personal experience and fairly extensive research, that most of what is taught to and learned by the typical developer assumes the absence of scale. As a result, the solutions to many problems of scale are often counter-intuitive.
Before delving further into the subject at hand, I encourage the reader to keep an open mind and to push past any initial sense of being a bit like Alice in Wonderland, the feeling that what you are reading runs counter to much of what you’ve previously been taught or learned. Keeping in mind that there is no silver bullet, no solution that can solve all your scalability problems without compromising other parts of your application, will help hammer home the reasoning behind many of the theories presented here.
Simply stated, the generally accepted development solutions to most business requirements break down at scale and must be approached from an often counter-intuitively different angle.
Database Scalability
It’s important to understand some basic scalability terminology, since this article uses it to explain the more complex concepts and specific scalability solutions presented later.
Let’s start by defining that terminology and distinguishing some of the terms from their often misused forms. Text in brackets within some of the definitions below gives a more database-centric form of the word or phrase.
Performance – The speed at which a system is able to service a single transaction.
Scalability – The ability of a system to maintain its quality of service as overall system load increases.
Scaling Vertically – To scale a system vertically is to increase the scalability of the system by adding resources to each individual component within the system.
Scaling Horizontally – To scale a system horizontally is to increase the scalability of the system via the addition of individual components to the system.
Vertical Dataset Partitioning – To partition a [database table] dataset vertically is to partition a dataset by field [or table column].
Horizontal Dataset Partitioning – To partition a [database table] dataset horizontally is to partition a dataset by data entry [or table row].
System – A system, as it is being used in the context of this article, is a general purpose term used to describe a collection of components that are interconnected in both software and hardware, and that combine to create an integrated whole.
Shard – A single instance of a database, often run on a commodity server, which houses a subset of the system’s total dataset. Each shard within a system typically shares the same topically based schema, though the actual data on each shard is unique to that shard.
Did you notice the differentiation made between the definition of Performance and Scalability? Both words are often used interchangeably to mean one or the other, but they are in fact starkly different in definition. I thought it wise to make a concerted effort to bring these two terms to attention, as it is important to realize that this article attempts to tackle the problems specific to rolling out a solution to the Scalability of a system and not the Performance of a system. Also, please note how each definition below Scalability uses one or more previously defined terms within its own definition. We’re building upon concepts here in a way that I hope is easy to follow.
I’d like to get even more specific about the kinds of scalability problems we’re trying to solve with the strategies presented here. This article focuses on scaling horizontally by partitioning horizontally (by row). You may alternatively decide to scale horizontally by partitioning vertically (by column) as necessary, but in my experience, you end up needing to partition horizontally at some point along the path to massive scalability regardless of your decision to partition vertically. Because of this inevitability, I’ve chosen to focus my efforts on an article that details the horizontal partitioning of a database schema.
Partitioning Your Dataset
To better explain the reasoning behind partitioning your dataset, we need to first answer the following fundamental two-part question:
Why are we partitioning our dataset and how does it help us to achieve scalability of our application?
It is difficult, if not impossible, to massively scale your data layer when the data is limited to a single server. Whether the limiting factor is hardware cost, or you’ve already equipped your server with the highest-performing hardware available, we ultimately find ourselves up against a wall: there are inherent limits to what vertically scaling our hardware can achieve. If we instead take our dataset schema, duplicate it onto multiple servers (shards), and split (or partition) the data on the original single server into roughly equal portions distributed amongst our new set of shards, we can parallelize our query load across them. Adding more shards to the existing set gives near-limitless scalability potential. The theory behind how this works is simple, but the execution is a fair bit more complex thanks to a series of scale-specific issues one encounters along the way.
We’ll continue with the concepts behind partitioning the actual dataset. There are numerous dimensions, or data subsets (often topically organized tables), along which you may choose to partition your dataset, and the right choice depends heavily on your particular problem domain’s requirements. This process usually involves denormalizing your schema. At small scale, a normalized schema is ideal, but at large scale, denormalized data makes scalability much simpler; we’ll get into why that is shortly. Now I’m going to be completely honest here: deciding how to partition a dataset involves some serious thought, so let’s break down a few of the more commonly encountered, real-life dataset scenarios and walk through each by example.
I’ve divided each dataset scenario first by its lookup type, then by its partitioning method. I’ve done this for a couple of reasons. First, the way in which your web application will find where a specific piece of data (commonly a table row) resides within a partitioned system is different depending on the partitioning method used. Secondly, each lookup type has several trade-offs associated with its use, and therefore I think it appropriate that each lookup type be discussed at length.
Algorithmic Lookups
To perform an algorithmic lookup is to compute the shard location of the data with a simple rule, or algorithm, applied to some portion of the data itself.
Date and Time Partitioning
Partitioning by a dataset entry’s relevant date and time is one of the simplest ways to perform an algorithmic lookup. Often the relevant date and time that you are performing the lookup on is the date and time of the entry’s insertion into the dataset, though this is not always the case.
Take, for example, a common billing scenario involving invoices. If you are working with a large dataset that is overpowering a single server within your system, you may opt to partition that server’s dataset over multiple servers, or shards. In doing so, you relieve much of the contention on the origin server’s resources. In our example, we’re presented with the following problem:
“Our system has a single database server. That database component is overutilized, nearly to the point of failure, by intermittent but long running queries. The primary issue is the sheer amount of data within the dataset. To put this in more specific terms, we are working with approximately 25 million invoices added evenly over a period of 5 years.”
Given the above scenario, we can say the following:
- Low Volume of Reads/Writes – we aren’t working with a high volume of reads or writes
- Resource Intensive Reads – although the database reads are intermittent, they are long running, and each one consumes a large amount of both CPU and disk resources
- High Volume of Data – we’re working with a large volume of data
- Even Distribution of Data – our dataset is large, but it’s evenly distributed over 5 years at about 5 million entries per year
Now that we’ve broken this problem down by its core characteristics, we can begin to apply our sharding solution!
In this scenario, we’re fortunate enough to be working with a predictable dataset. Based upon historical precedents, our dataset will continue to grow over time at a rate of 5 million invoices per year. The typical commodity server is capable of handling a table of 5 million records without much difficulty (the actual number of records is arbitrary in the context of this example). Because of this, we can easily partition our dataset of 25 million records by relocating each year’s worth of records onto a separate database shard. If we apply this strategy we end up with five commodity servers, or shards, each running a separate instance of MySQL. Within each instance of MySQL is a single Invoices table containing one year’s worth of invoices or five million records.
Each year going forward, we’ll add another shard to our system, and on it will reside a single instance of MySQL with its table containing that year’s worth of records. Assuming we continue to experience linear growth and fairly evenly distributed and consistent query execution across all shards, we’ve now positioned ourselves for long-term scalability.
Determining where to insert a new row is simple. Assuming that each shard is labeled starting with zero, the following pseudo-code expression will produce the shard to insert into.
insertShardId = currentDate.Year - startDateOfFirstRecord.Year
The above expression can also be used to determine which shard we need to query in order to retrieve the data that we need. If we need to retrieve invoices from a year ago, we would use the following pseudocode.
queryShardId = (currentDate.Year - 1) - startDateOfFirstRecord.Year
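The lookup rule above can be sketched in Python as follows. This is an illustration under assumptions, not a definitive implementation: `FIRST_RECORD_YEAR` is a hypothetical constant (here, a first invoice in 2004), shards are numbered from zero as in the text, and `shards_for_range` is a hypothetical helper that applies the same rule to a span of dates.

```python
from datetime import date

# Hypothetical: the year of the earliest invoice. Shard 0 holds that year.
FIRST_RECORD_YEAR = 2004


def shard_for_date(d: date) -> int:
    """Return the zero-based shard id that stores records dated `d`."""
    shard_id = d.year - FIRST_RECORD_YEAR
    if shard_id < 0:
        raise ValueError(f"{d} predates the first record")
    return shard_id


def shards_for_range(start: date, end: date) -> list[int]:
    """Return every shard id needed to cover the inclusive range [start, end]."""
    return list(range(shard_for_date(start), shard_for_date(end) + 1))


today = date(2009, 6, 15)
print(shard_for_date(today))                                 # 5 -> insert here
print(shard_for_date(today.replace(year=today.year - 1)))    # 4 -> last year's invoices
print(shards_for_range(date(2007, 3, 1), date(2008, 2, 1)))  # [3, 4]
```

With five years of data in shards 0 through 4, the sixth year's inserts land on shard 5, matching the plan of adding one shard per year.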
This strategy is fairly straightforward and lends itself well to parallelizing queries that span multiple shards. For example, if we need to query records over two years, we would split the query into two separate queries, each running on the shard that holds that year’s data. Running these queries in parallel takes advantage of the additional resources our new system provides: running a query on two shards can approach twice the speed of running nearly the same query on a single database server, less the overhead of coordinating the queries and combining their results.
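Because each shard is a separate server, the application can issue the per-shard queries concurrently. A minimal sketch using Python's standard `concurrent.futures` thread pool follows; `SHARDS` and `query_shard` are hypothetical stand-ins for real MySQL connections and queries. Threads are a reasonable fit here because the real work is waiting on network I/O, and the actual speedup depends on how evenly the work is spread across shards.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-ins for per-year shards:
# shard id -> rows of (invoice_id, year).
SHARDS = {
    3: [(101, 2007), (102, 2007)],
    4: [(201, 2008)],
}


def query_shard(shard_id: int) -> list[tuple[int, int]]:
    """Stand-in for running the same SELECT against one shard's database."""
    return list(SHARDS[shard_id])


def query_in_parallel(shard_ids: list[int]) -> list[tuple[int, int]]:
    """Send the query to every shard at once and concatenate the results."""
    with ThreadPoolExecutor(max_workers=max(1, len(shard_ids))) as pool:
        per_shard = pool.map(query_shard, shard_ids)  # results keep input order
        return [row for rows in per_shard for row in rows]


print(query_in_parallel([3, 4]))  # [(101, 2007), (102, 2007), (201, 2008)]
```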
Row Count Partitioning
Another common alternative to Date and Time partitioning is Row Count Partitioning. Row Count Partitioning utilizes the same basic concepts as Date and Time partitioning, with the following exceptions:
- We aren’t limited to a single type of dataset field as our criteria. We can simply use a common, albeit simplistic, benchmark for determining dataset size – the dataset’s row count.
- We don’t need to worry about the rate at which our dataset grows, because the row count is itself a measurable result of that growth.
Let’s apply the Row Count Partitioning strategy to the same scenario we used in the Date and Time Partitioning strategy. I’ve restated the scenario here so you don’t have to navigate backwards:
“Our system has a single database server. That database component is overutilized, nearly to the point of failure, by intermittent but long running queries. The primary issue is the sheer amount of data within the dataset. To put this in more specific terms, we are working with approximately 25 million invoices added evenly over a period of 5 years, though this can’t be guaranteed.”
Given the above scenario, we can say the following:
- Low Volume of Reads/Writes – we aren’t working with a high volume of reads or writes
- Resource Intensive Reads – although the database reads are intermittent, they are long running, and each one consumes a large amount of both CPU and disk resources
- High Volume of Data – we’re working with a large volume of data
- Even Distribution of Data – our dataset is large, but it’s evenly distributed over 5 years at about 5 million entries per year, though this can’t be guaranteed
We no longer have to worry about whether we’re working with a predictable dataset. Based upon historical precedents, our dataset will continue to grow over time at a rate of 5 million invoices per year, but this now can’t be guaranteed. The typical commodity server is capable of handling a table of 5 million records without much difficulty. Because of this, we can easily partition our dataset of 25 million records by relocating five million records at a time onto a separate database shard. If we apply this strategy we end up with five shards, each running a separate instance of MySQL. Within each instance of MySQL is a single table containing five million records worth of invoices.
Whenever the active shard reaches five million records, we’ll add another shard to our system, and on it will reside a single instance of MySQL with a table that takes the next five million records. Since we no longer depend on our dataset maintaining its apparent even distribution, we are positioned for long-term scalability even if growth becomes non-linear.
Determining where to insert a new row is simple. We simply set a configuration variable, within our application, indicating which of our shards is currently actively taking on additional rows. As the active shard reaches its maximum row count threshold, our application simply needs to increment (or otherwise change) the configuration variable to the next shard ready for use.
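The active-shard configuration variable described above can be sketched as a small allocator. This is a minimal single-process illustration: `RowCountAllocator` is a hypothetical name, and in a real deployment the pointer and row count would live in shared configuration (or a database) and be updated atomically so that every application server agrees on the active shard.

```python
class RowCountAllocator:
    """Tracks which shard accepts inserts and advances when it is full."""

    def __init__(self, max_rows_per_shard: int, active_shard: int = 0,
                 rows_in_active: int = 0):
        self.max_rows = max_rows_per_shard
        self.active_shard = active_shard      # the configuration variable from the text
        self.rows_in_active = rows_in_active  # rows already stored on the active shard

    def shard_for_insert(self) -> int:
        """Return the shard for the next row, moving on once the threshold is hit."""
        if self.rows_in_active >= self.max_rows:
            self.active_shard += 1
            self.rows_in_active = 0
        self.rows_in_active += 1
        return self.active_shard


# A tiny threshold of 2 rows makes the rollover visible.
allocator = RowCountAllocator(max_rows_per_shard=2)
print([allocator.shard_for_insert() for _ in range(5)])  # [0, 0, 1, 1, 2]
```

For the scenario in the text, `RowCountAllocator(5_000_000, active_shard=4, rows_in_active=5_000_000)` describes five full shards, so the next insert goes to shard 5.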
Now here’s where the flexibility we gain by working with a row count makes querying the dataset slightly more difficult. The insert rule above only tells us where new rows go, so it can’t tell us which shard holds the data we need to retrieve. If we need to retrieve invoices from a year ago, we have to query all of our shards, since we have no way of knowing where the records from the last year were inserted.
This isn’t necessarily as bad as it sounds. What we’ve done here is build a highly parallelized version of our existing system. Now, instead of one server querying through our dataset to find what we need, we have five shards doing the work in parallel, over smaller subsets of the same dataset. We would then take the results from each shard and combine them in our application layer, resulting in a significantly faster method of querying a large dataset.
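Combining the per-shard results in the application layer is simplest when each shard returns its rows already sorted: the application can then do a k-way merge instead of re-sorting everything. A minimal sketch using Python's standard `heapq.merge`, with hypothetical shard results. Note that a global limit (for example, the first ten invoices in date order) has to be applied after merging, with each shard returning up to that many rows.

```python
import heapq
import itertools
from datetime import date

# Hypothetical per-shard results, each already ordered by invoice date
# (as if every shard ran the same "... ORDER BY invoiceDate" query).
shard_results = [
    [(date(2008, 7, 1), "INV-17"), (date(2009, 1, 3), "INV-90")],
    [(date(2008, 9, 12), "INV-44")],
    [],  # a shard with no matching rows
]


def merge_sorted(results, limit=None):
    """Merge sorted per-shard lists into one sorted list, optionally truncated."""
    merged = heapq.merge(*results)  # lazy k-way merge
    return list(itertools.islice(merged, limit))  # a limit of None keeps everything


print([inv for _, inv in merge_sorted(shard_results)])           # ['INV-17', 'INV-44', 'INV-90']
print([inv for _, inv in merge_sorted(shard_results, limit=2)])  # ['INV-17', 'INV-44']
```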
It’s important to understand that each method of partitioning your dataset is better suited to some problems than to others. The Row Count Partitioning strategy is best used with a large dataset whose queries often run across a majority of the data.
This strategy is fairly straightforward and lends itself well to parallelizing queries that span multiple shards. For example, if we need to query records over all five years, we would split the query into five separate queries, each running on a separate shard. Running them in parallel can approach five times the speed of running nearly the same query on a single server, less the overhead of coordinating the queries and merging their results.
Master Index Lookups
Using a Master Index Lookup isn’t nearly as simple or intuitive as using an Algorithmic Lookup. It is, however, necessary when partitioning along a dataset field whose values are not inherently ordered in sequence, so that no simple rule can compute where a record lives.
Domain Partitioning
One of the more complex partitioning methods, Domain Partitioning provides the highest amount of flexibility in choosing one’s partitioned dimension.
Determining what dimension to partition on is highly dependent on the nature of one’s database schema. However, in order to properly illustrate how you might do so, I will use a typical user-centric database pseudo-schema as our working example. I’m hoping that this example will clearly describe the process involved in determining a dimension to partition in such a way that you will be able to infer how to alter this strategy for your particular application.
The key to understanding how to partition a database schema is to first determine if you are working with a schema that can be appropriately partitioned in the first place. A schema that is well suited for partitioning along a particular dimension will have the following properties:
- The schema’s tables are naturally topically segregated as opposed to topically cross-related. For example, most tables relate to a User table, and contain rows that are particular to a single User row.
- The schema’s most significant topical tables, if split, are unlikely to require many joins across them. For example, if we are partitioning along the User table, it should be unlikely that we will need to join multiple Users that each reside on a different shard.
- The amount of data in the satellite tables related to a single row of the partitioned table (for example, all of one user’s profile and blog data) is, and will remain, small enough not to become a performance bottleneck.
Please note that although the above properties make for a best case scenario dataset, failure of your dataset to have all of them does not imply that your dataset cannot be partitioned using the Domain Partitioning strategy.
Domain Partitioning consists of two main server types: an Index Shard and one or more Domain Shards.
The Index Shard serves as the primary lookup mechanism. At a minimum, it contains a single index mapping the values of the dataset’s partitioned dimension to their Domain Shard locations. An application using the Domain Partitioning strategy first connects to the Index Shard and queries it for the shard location of the requested data. Once the Index Shard returns that location, the application connects to the Domain Shard storing the requested data, executes the original query there, and receives its result.
You may be wondering whether there is a lot of overhead involved in always connecting to the Index Shard and querying it to determine where the second, data-retrieving query should be executed. You would be correct to assume that there is some overhead, but it is often insignificant compared to the gain in overall system performance that comes from this strategy’s parallelization. In most dataset scenarios, the Index Shard holds a relatively small amount of data, which means its tables are likely to be stored entirely in memory. Combine that with the low latencies one can achieve on a typical gigabit LAN and the connection pooling used by most applications, and we can safely assume that the Index Shard will not become a major bottleneck within the system (have fun cutting down this statement in the comments, I already know it’s coming :)
Let’s proceed through our user-centric scenario by detailing it out further:
Given the above scenario, let’s start by choosing our partitioning dimension. We know that all of the data within our dataset is directly or indirectly topically related to a single entity – a user. We also know that users being added to the system are the primary source of our system’s enormous growth. Since users are defined by their user row within the user table, we can confidently say that by partitioning the user table across multiple shards, we can distribute load in a manageable manner. The user table, and its rows, is our chosen partition dimension.
Defining the Index Shard schema
Now that we know our partition dimension is a user, we need a unique identifier for each user so that we can use it to look up our user. This is easily done by adding a userId key column to the user table. Having the userId key is important, but it isn’t very useful for looking up a user on its own, since most users are queried by their username. We can take this a step further and say that authenticating a user is nearly as important as identifying them by username, and authenticating a user requires both a username and a password. Armed with this knowledge, we can begin to specify the schema of our Index Shard’s first couple of tables.
The most obvious required table within the Index Shard lookup database is a shard table. This table has the following columns:
shardId – used as the unique identifier for a shard
connectionString – a connection string used to connect to a shard
status – used to signify a shard’s status as online, offline, or in active insert mode
createdDate – the date the shard was added to the system, used for historical purposes
The second table we need to define is the user_lookup table. This table has the following columns:
userId – used to uniquely identify a user. It is the same userId used to identify the user within the user table on the user’s Domain Shard.
shardId – used to identify the shard that the user is currently located on
username – the username corresponding to the userId. It is the same username used within the user table on the user’s Domain Shard.
password – the password corresponding to the userId. It is the same password used within the user table on the user’s Domain Shard.
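The two Index Shard tables might be declared as follows. This is a sketch under assumptions: SQLite (Python's built-in `sqlite3` module) stands in for MySQL so the example runs anywhere, the column types and the `'active_insert'` status value are illustrative choices rather than part of the original design, and `shard0`/`shard1` are placeholder connection strings.

```python
import sqlite3

INDEX_SCHEMA = """
CREATE TABLE shard (
    shardId          INTEGER PRIMARY KEY,
    connectionString TEXT NOT NULL,
    status           TEXT NOT NULL
                     CHECK (status IN ('online', 'offline', 'active_insert')),
    createdDate      TEXT NOT NULL
);
CREATE TABLE user_lookup (
    userId   INTEGER NOT NULL,
    shardId  INTEGER NOT NULL REFERENCES shard (shardId),
    username TEXT NOT NULL UNIQUE,  -- UNIQUE also creates an index for lookups
    password TEXT NOT NULL          -- in practice, store a salted hash instead
);
"""

index_db = sqlite3.connect(":memory:")
index_db.executescript(INDEX_SCHEMA)
with index_db:  # commits the inserts
    index_db.executemany(
        "INSERT INTO shard VALUES (?, ?, ?, ?)",
        [(0, "shard0", "online", "2009-01-01"),
         (1, "shard1", "active_insert", "2009-06-01")],
    )

active = index_db.execute(
    "SELECT shardId, connectionString FROM shard WHERE status = 'active_insert'"
).fetchone()
print(active)  # (1, 'shard1')
```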
Now let’s run through the four common SQL command types, SELECT, INSERT, UPDATE, and DELETE, and apply each to our sharded system, noting how the procedures differ.
Insert Scenario: A new user signs up.
- Connect to the Index Shard using an application configuration-level connection string.
- Query the shard table and retrieve the shard row that represents the current shard with a status of active insert mode.
- Disconnect from the Index Shard.
- Connect to the Domain Shard as specified by the previously retrieved shard row’s connectionString.
- Insert the user’s sign-up information into the user table, retrieving the generated userId as a result.
- Insert the user’s remaining creation information into the user table’s related tables as necessary (e.g. user_profile, user_blog).
- Disconnect from the Domain Shard.
- Connect to the Index Shard using an application configuration-level connection string.
- Insert the new user’s lookup information into the user_lookup table, using the shardId from the retrieved shard table and the userId from the Domain Shard’s user table, for the new location of the user’s information.
- Disconnect from the Index Shard.
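The sign-up steps might look like this in code. It is a sketch under stated assumptions: in-memory SQLite databases stand in for the MySQL servers, `DOMAIN_SHARDS` is a hypothetical dictionary mapping a connectionString to an open connection in place of a real connect/disconnect, and the `user`/`user_profile` columns are illustrative. Two caveats follow from the design itself: the Domain Shard and Index Shard writes are not atomic together, so a failure between them leaves an orphaned user row, and auto-increment userId values can repeat across shards, which is why username is the reliable lookup key here.

```python
import sqlite3

# Hypothetical setup: an Index Shard and two Domain Shards, all in-memory SQLite.
index_db = sqlite3.connect(":memory:")
index_db.executescript("""
    CREATE TABLE shard (shardId INTEGER PRIMARY KEY, connectionString TEXT, status TEXT);
    CREATE TABLE user_lookup (userId INTEGER, shardId INTEGER,
                              username TEXT UNIQUE, password TEXT);
    INSERT INTO shard VALUES (0, 'shard0', 'online'), (1, 'shard1', 'active_insert');
""")
DOMAIN_SHARDS = {}  # connectionString -> connection (stand-in for connecting)
for conn_str in ("shard0", "shard1"):
    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE user (userId INTEGER PRIMARY KEY, username TEXT, password TEXT);
        CREATE TABLE user_profile (userId INTEGER, bio TEXT);
    """)
    DOMAIN_SHARDS[conn_str] = db


def sign_up(username: str, password: str, bio: str) -> tuple[int, int]:
    """Create a user on the active shard and register it in the Index Shard."""
    # Ask the Index Shard which shard is in active insert mode.
    shard_id, conn_str = index_db.execute(
        "SELECT shardId, connectionString FROM shard WHERE status = 'active_insert'"
    ).fetchone()
    # Write the user row and its related rows to that Domain Shard.
    domain = DOMAIN_SHARDS[conn_str]
    with domain:  # commits on success, rolls back on error
        user_id = domain.execute(
            "INSERT INTO user (username, password) VALUES (?, ?)", (username, password)
        ).lastrowid
        domain.execute("INSERT INTO user_profile VALUES (?, ?)", (user_id, bio))
    # Record the new user's location in the Index Shard.
    with index_db:
        index_db.execute(
            "INSERT INTO user_lookup VALUES (?, ?, ?, ?)",
            (user_id, shard_id, username, password),
        )
    return shard_id, user_id


print(sign_up("alice", "s3cret", "Hello!"))  # (1, 1)
```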
Update Scenario: A user changes their password.
- Connect to the Index Shard using an application configuration-level connection string.
- Query the user_lookup table, using the username and current password of the user, and retrieve the user_lookup row that contains the user’s lookup information.
- Query the shard table and retrieve the shard row that represents the user’s Domain Shard location.
- Update the retrieved user_lookup row, changing the password field to the user’s new password.
- Disconnect from the Index Shard.
- Connect to the Domain Shard as specified by the previously retrieved shard row’s connectionString.
- Update the user’s user row, found using the userId as retrieved earlier from the user_lookup table, changing the password field to the user’s new password.
- Disconnect from the Domain Shard.
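A sketch of the password change, under similar assumptions: in-memory SQLite databases stand in for the MySQL servers, `DOMAIN_SHARDS` is a hypothetical map from connectionString to an open connection, and the sample user is made up. It folds the two Index Shard queries into a single JOIN and updates the Index Shard before the Domain Shard, following the order of the steps; the two updates are not atomic together.

```python
import sqlite3

# Hypothetical setup: an Index Shard and one Domain Shard ("shard0") that
# already stores user 7, alice.
index_db = sqlite3.connect(":memory:")
index_db.executescript("""
    CREATE TABLE shard (shardId INTEGER PRIMARY KEY, connectionString TEXT, status TEXT);
    CREATE TABLE user_lookup (userId INTEGER, shardId INTEGER,
                              username TEXT UNIQUE, password TEXT);
    INSERT INTO shard VALUES (0, 'shard0', 'online');
    INSERT INTO user_lookup VALUES (7, 0, 'alice', 'old-pw');
""")
DOMAIN_SHARDS = {"shard0": sqlite3.connect(":memory:")}
DOMAIN_SHARDS["shard0"].executescript("""
    CREATE TABLE user (userId INTEGER PRIMARY KEY, username TEXT, password TEXT);
    INSERT INTO user VALUES (7, 'alice', 'old-pw');
""")


def change_password(username: str, current_pw: str, new_pw: str) -> bool:
    """Update the password in both the Index Shard and the user's Domain Shard."""
    # Find the user's lookup row and shard location in one query.
    row = index_db.execute(
        """SELECT l.userId, s.connectionString
           FROM user_lookup AS l JOIN shard AS s ON s.shardId = l.shardId
           WHERE l.username = ? AND l.password = ?""",
        (username, current_pw),
    ).fetchone()
    if row is None:
        return False  # unknown username or wrong current password
    user_id, conn_str = row
    with index_db:
        index_db.execute(
            "UPDATE user_lookup SET password = ? WHERE username = ?", (new_pw, username)
        )
    with DOMAIN_SHARDS[conn_str] as domain:
        domain.execute("UPDATE user SET password = ? WHERE userId = ?", (new_pw, user_id))
    return True


print(change_password("alice", "old-pw", "new-pw"))  # True
print(change_password("alice", "old-pw", "other"))   # False: old password no longer valid
```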
Delete Scenario: A user closes their account.
- Connect to the Index Shard using an application configuration-level connection string.
- Query the user_lookup table, using the username and current password of the user, and retrieve the user_lookup row that contains the user’s lookup information, saving it for later use.
- Query the shard table and retrieve the shard row that represents the user’s Domain Shard location.
- Delete the user’s user_lookup row, using the user’s username and password, or userId to find the user’s row.
- Disconnect from the Index Shard.
- Connect to the Domain Shard as specified by the previously retrieved shard row’s connectionString.
- Delete the user’s user row, along with the user’s rows in related tables (e.g. user_profile, user_blog), found using the userId retrieved earlier from the user_lookup table.
- Disconnect from the Domain Shard.
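Closing an account can be sketched the same way. As with the other scenario sketches, in-memory SQLite stands in for MySQL, and `DOMAIN_SHARDS` and `RELATED_TABLES` are hypothetical names. Deleting the lookup row first makes the account unreachable right away; if the Domain Shard delete then fails, the leftover rows are orphaned but invisible to the application.

```python
import sqlite3

# Hypothetical setup: one Domain Shard holding user 7 and a related profile row.
index_db = sqlite3.connect(":memory:")
index_db.executescript("""
    CREATE TABLE shard (shardId INTEGER PRIMARY KEY, connectionString TEXT, status TEXT);
    CREATE TABLE user_lookup (userId INTEGER, shardId INTEGER,
                              username TEXT UNIQUE, password TEXT);
    INSERT INTO shard VALUES (0, 'shard0', 'online');
    INSERT INTO user_lookup VALUES (7, 0, 'alice', 'pw');
""")
DOMAIN_SHARDS = {"shard0": sqlite3.connect(":memory:")}
DOMAIN_SHARDS["shard0"].executescript("""
    CREATE TABLE user (userId INTEGER PRIMARY KEY, username TEXT, password TEXT);
    CREATE TABLE user_profile (userId INTEGER, bio TEXT);
    INSERT INTO user VALUES (7, 'alice', 'pw');
    INSERT INTO user_profile VALUES (7, 'Hello!');
""")
RELATED_TABLES = ("user_profile",)  # hypothetical; user_blog etc. would go here too


def close_account(username: str, password: str) -> bool:
    """Remove the user from the Index Shard, then from their Domain Shard."""
    row = index_db.execute(
        """SELECT l.userId, s.connectionString
           FROM user_lookup AS l JOIN shard AS s ON s.shardId = l.shardId
           WHERE l.username = ? AND l.password = ?""",
        (username, password),
    ).fetchone()
    if row is None:
        return False  # unknown username or wrong password
    user_id, conn_str = row
    with index_db:
        index_db.execute("DELETE FROM user_lookup WHERE username = ?", (username,))
    with DOMAIN_SHARDS[conn_str] as domain:
        for table in RELATED_TABLES:  # table names come from code, not user input
            domain.execute(f"DELETE FROM {table} WHERE userId = ?", (user_id,))
        domain.execute("DELETE FROM user WHERE userId = ?", (user_id,))
    return True


print(close_account("alice", "pw"))  # True
```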
Select Scenario: A system visitor views a user’s profile page.
- Connect to the Index Shard using an application configuration-level connection string.
- Query the user_lookup table, using the username or userId of the user, and retrieve the user_lookup row that contains the user’s lookup information.
- Query the shard table and retrieve the shard row that represents the user’s Domain Shard location.
- Disconnect from the Index Shard.
- Connect to the Domain Shard as specified by the previously retrieved shard row’s connectionString.
- Query the user table to retrieve the user’s basic information, using the previously retrieved userId.
- As necessary, query the user’s additional profile information and blog entries via the user_profile, user_blog, and user_blog_entry tables.
- Disconnect from the Domain Shard.
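The read path is the two-step lookup at the heart of Domain Partitioning: ask the Index Shard where the user lives, then query that Domain Shard. A minimal sketch with in-memory SQLite standing in for MySQL; `DOMAIN_SHARDS`, `load_profile`, and the sample data are hypothetical.

```python
import sqlite3
from typing import Optional

# Hypothetical setup: bob (userId 3) lives on shard1. All data is made up.
index_db = sqlite3.connect(":memory:")
index_db.executescript("""
    CREATE TABLE shard (shardId INTEGER PRIMARY KEY, connectionString TEXT, status TEXT);
    CREATE TABLE user_lookup (userId INTEGER, shardId INTEGER,
                              username TEXT UNIQUE, password TEXT);
    INSERT INTO shard VALUES (0, 'shard0', 'online'), (1, 'shard1', 'online');
    INSERT INTO user_lookup VALUES (3, 1, 'bob', 'pw');
""")
DOMAIN_SHARDS = {name: sqlite3.connect(":memory:") for name in ("shard0", "shard1")}
for db in DOMAIN_SHARDS.values():
    db.executescript("""
        CREATE TABLE user (userId INTEGER PRIMARY KEY, username TEXT, password TEXT);
        CREATE TABLE user_profile (userId INTEGER, bio TEXT);
    """)
DOMAIN_SHARDS["shard1"].executescript("""
    INSERT INTO user VALUES (3, 'bob', 'pw');
    INSERT INTO user_profile VALUES (3, 'I write a popular blog.');
""")


def load_profile(username: str) -> Optional[dict]:
    """Two-step read: ask the Index Shard where the user lives, then query that shard."""
    row = index_db.execute(
        """SELECT l.userId, s.connectionString
           FROM user_lookup AS l JOIN shard AS s ON s.shardId = l.shardId
           WHERE l.username = ?""",
        (username,),
    ).fetchone()
    if row is None:
        return None  # no such user
    user_id, conn_str = row
    domain = DOMAIN_SHARDS[conn_str]
    user = domain.execute(
        "SELECT userId, username FROM user WHERE userId = ?", (user_id,)
    ).fetchone()
    if user is None:
        return None  # index and shard disagree; a real system would log this
    bio = domain.execute(
        "SELECT bio FROM user_profile WHERE userId = ?", (user_id,)
    ).fetchone()
    return {"userId": user[0], "username": user[1], "bio": bio[0] if bio else None}


print(load_profile("bob"))  # {'userId': 3, 'username': 'bob', 'bio': 'I write a popular blog.'}
```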
Our system has now been successfully Domain Partitioned. As a result, we have enabled our system to scale along the User dimension far into the future. By adding shards to our system and updating the necessary shard table rows, we can increase the capacity of our system with little effort.
Dealing with Hotspots and Rebalancing Shards
It is nearly inevitable that, at some point in the lifecycle of any of the previously discussed scalability strategies, we will encounter a scenario that requires redistributing our data to prevent continued performance degradation on a single overloaded shard.
Real-life examples of this kind of behavior are numerous. Using our user-centric theme, one might notice a hotspot around a specific user who is getting a higher than usual query hit rate due to some level of importance. That user may be a prominent blogger or some other similarly popular individual. As a result of their prominence, we can imagine their user profile receiving orders of magnitude more pageviews than the average user’s profile.
Building upon our Domain Partitioning strategy and enumerating a further set of specifics, we can find a suitable solution to the hotspot problem. Let’s imagine that each row within our user table contains a unique identifier. This unique identifier is a Globally Unique Identifier (GUID), and its uniqueness is guaranteed throughout the system. If we also require that any related tables, such as the user_profile, user_blog, and user_blog_entry tables, use GUIDs as their table keys, we can easily move all of the data associated with a single user between any of our user Domain Shards by migrating the data and updating the user_lookup table. These migrations could be automated, but they are likely to be rare enough that a manual migration may be preferred, given that it is sometimes difficult to estimate how many resources to dedicate to a hotspot user or group of users.
A simple automated migration process might look something like the following:
- Connect to the Index Shard using an application configuration-level connection string.
- Query the user_lookup table, using the user’s username or userId, and retrieve the user_lookup row that contains the user’s lookup information.
- Query the shard table and retrieve both the shard row for the user’s current Domain Shard and the shard row for the user’s new Domain Shard.
- Update the retrieved user_lookup row, changing the shardId field to the user’s new shard.
- Disconnect from the Index Shard.
- Connect to the user’s new Domain Shard as specified by the previously retrieved shard row’s connectionString.
- Insert the user’s original user row data into the user’s new shard’s user table.
- Insert the user’s remaining information into the new shard’s related tables as necessary (e.g. user_profile, user_blog).
- Disconnect from the Domain Shard.
- Connect to the user’s old Domain Shard as specified by the user’s originally retrieved shard row’s connectionString.
- Delete the user’s user row and related rows, found using the userId retrieved earlier from the user_lookup table.
- Disconnect from the Domain Shard.
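A sketch of a single-user migration, assuming GUID keys as described above. In-memory SQLite stands in for MySQL, `DOMAIN_SHARDS`, `RELATED_TABLES`, `shard_conn`, and `migrate_user` are hypothetical names, and the user is assumed not to be writing during the move (a real migration would block or queue those writes). One deliberate difference from the listed steps: this sketch copies the rows to the new shard before repointing user_lookup, so a read arriving mid-migration always finds the data on whichever shard the index names.

```python
import sqlite3
import uuid

# Every Domain Shard shares this schema; keys are GUIDs stored as text.
DOMAIN_SCHEMA = """
    CREATE TABLE user (userId TEXT PRIMARY KEY, username TEXT, password TEXT);
    CREATE TABLE user_profile (profileId TEXT PRIMARY KEY, userId TEXT, bio TEXT);
"""
RELATED_TABLES = ("user_profile",)  # hypothetical; user_blog etc. would follow

# Hypothetical setup: carol lives on shard0 and is about to move to shard1.
index_db = sqlite3.connect(":memory:")
index_db.executescript("""
    CREATE TABLE shard (shardId INTEGER PRIMARY KEY, connectionString TEXT, status TEXT);
    CREATE TABLE user_lookup (userId TEXT PRIMARY KEY, shardId INTEGER,
                              username TEXT UNIQUE, password TEXT);
    INSERT INTO shard VALUES (0, 'shard0', 'online'), (1, 'shard1', 'online');
""")
DOMAIN_SHARDS = {name: sqlite3.connect(":memory:") for name in ("shard0", "shard1")}
for db in DOMAIN_SHARDS.values():
    db.executescript(DOMAIN_SCHEMA)

carol = str(uuid.uuid4())
with index_db:
    index_db.execute("INSERT INTO user_lookup VALUES (?, 0, 'carol', 'pw')", (carol,))
with DOMAIN_SHARDS["shard0"] as db:
    db.execute("INSERT INTO user VALUES (?, 'carol', 'pw')", (carol,))
    db.execute("INSERT INTO user_profile VALUES (?, ?, 'Hotspot!')",
               (str(uuid.uuid4()), carol))


def shard_conn(shard_id: int) -> sqlite3.Connection:
    """Resolve a shardId to a connection via the Index Shard's shard table."""
    (conn_str,) = index_db.execute(
        "SELECT connectionString FROM shard WHERE shardId = ?", (shard_id,)
    ).fetchone()
    return DOMAIN_SHARDS[conn_str]


def migrate_user(user_id: str, new_shard_id: int) -> None:
    """Copy a user's rows to another shard, repoint the index, then clean up."""
    (old_shard_id,) = index_db.execute(
        "SELECT shardId FROM user_lookup WHERE userId = ?", (user_id,)
    ).fetchone()
    if old_shard_id == new_shard_id:
        return  # already there
    old, new = shard_conn(old_shard_id), shard_conn(new_shard_id)
    tables = ("user",) + RELATED_TABLES
    # 1. Copy the user's rows; GUID keys practically cannot collide on `new`.
    with new:
        for table in tables:
            rows = old.execute(
                f"SELECT * FROM {table} WHERE userId = ?", (user_id,)
            ).fetchall()
            if rows:
                marks = ", ".join("?" * len(rows[0]))
                new.executemany(f"INSERT INTO {table} VALUES ({marks})", rows)
    # 2. Repoint the lookup so new requests go to the new shard.
    with index_db:
        index_db.execute(
            "UPDATE user_lookup SET shardId = ? WHERE userId = ?", (new_shard_id, user_id)
        )
    # 3. Remove the originals, related rows before the user row.
    with old:
        for table in reversed(tables):
            old.execute(f"DELETE FROM {table} WHERE userId = ?", (user_id,))


migrate_user(carol, 1)
```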
It’s important to realize that migrating dimension entities this way is a storage and performance trade-off. A true GUID is a relatively large key type, usually between 16 and 36 bytes depending on whether it is stored in binary form or as text. Keys of this size make migrations simple, but they can have a negative effect on queries running through the dataset. Be sure to weigh your options and plan according to your particular business requirements.
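The size difference is easy to check with Python's standard `uuid` module. The comparison uses MySQL's 4-byte INT and 8-byte BIGINT key types; the text form is 36 characters, which is 36 bytes in a single-byte character set. How much the extra width costs in practice depends on your indexes and workload.

```python
import uuid

guid = uuid.uuid4()
print(len(guid.bytes))  # 16 -> raw binary form, e.g. a BINARY(16) column
print(len(guid.hex))    # 32 -> hex digits without dashes
print(len(str(guid)))   # 36 -> canonical text form with dashes

# For comparison, MySQL integer keys: INT is 4 bytes, BIGINT is 8 bytes.
INT_BYTES, BIGINT_BYTES = 4, 8
# How many times wider than an INT key the binary and text forms are:
print(len(guid.bytes) // INT_BYTES, len(str(guid)) // INT_BYTES)  # 4 9
```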
Adding High-Availability
Up to this point, this article has discussed only solutions to database scalability challenges. However, if one is to design a database solution that is both highly scalable and highly available, a few other points need to be applied to the design principles presented so far. Please note that high availability is worthy of its own article; this section is merely a brief introduction to one of the more common failover strategies.
Providing high-availability, or failover functionality to ensure continued operation of our system, is a concept that is independent of the scalability of a system. Therefore, we can add high-availability to any scenario previously used to explain the application of our scalability strategies. There are also numerous availability strategies that one can use, similar to the differences in how we apply scalability strategies. I’ll be reviewing one of the more common availability strategies used in MySQL – master to master replication.
Master to Master Replication
Master-to-master replication uses MySQL’s replication capabilities to keep redundant copies of the data on two servers, effectively creating a small redundant cluster.
This strategy requires two MySQL servers, which together form a single shard as defined in any one of our previously imagined scenarios. Each server within the shard’s cluster acts as both a replication master and a replication slave: it is the master for its peer and, at the same time, a slave of that peer. As mentioned earlier, each shard is partitioned along a specific topical dimension. This topical dimension, implemented via a database table, must contain a unique identifier. That identifier must be unique not just on a single machine, but either globally or at least across the two servers that make up our highly available shard. This uniqueness is really the only data-level requirement for this availability strategy.
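The article goes on to use GUIDs, but for integer keys MySQL offers another common way to make identifiers unique across exactly two servers: the `auto_increment_increment` and `auto_increment_offset` server variables. The sketch below only simulates, in Python, the sequences those settings produce on an empty table; the specific values are an assumed example configuration.

```python
from itertools import count, islice

# MySQL computes auto-increment values as
#   auto_increment_offset + N * auto_increment_increment
# Assumed example settings for a two-server shard cluster:
#   server 1: auto_increment_increment = 2, auto_increment_offset = 1
#   server 2: auto_increment_increment = 2, auto_increment_offset = 2

def generated_ids(offset, increment, n):
    """Return the first n ids a server with these settings hands out (empty table)."""
    return list(islice(count(offset, increment), n))

server_1 = generated_ids(offset=1, increment=2, n=5)  # [1, 3, 5, 7, 9]
server_2 = generated_ids(offset=2, increment=2, n=5)  # [2, 4, 6, 8, 10]

# Odd ids only ever come from server 1 and even ids from server 2, so rows
# inserted on either Master cannot collide when replicated to the other.
assert not set(server_1) & set(server_2)
```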
To keep the explanation as simple as possible, let’s assume that our unique identifier is a Globally Unique Identifier (GUID). When we insert a row into our primary dimension table on our first Master, our replication configuration replicates that command to its slave, which in our case is the second Master. Because the same configuration is set up on our second Master, inserting a row into the primary dimension table on the second Master replicates the command to the first Master. This replication setup effectively results in a highly available, hot-failover solution.
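The two-way flow described above can be modeled with a toy Python class. This illustrates the idea only and is not how MySQL replication is implemented. The `Master` class, its in-memory `rows` dictionary, and the synchronous forwarding are all simplifications.

```python
import uuid

class Master:
    """Toy model of one server in a master-to-master pair.

    Each server is a master to its peer and a slave of it. Replicated events
    carry the originating server's id so they are not sent back to where they
    started, loosely modeled on how MySQL tags binary log events with
    server_id. Real MySQL replication is asynchronous; this toy applies
    changes synchronously.
    """

    def __init__(self, server_id):
        self.server_id = server_id
        self.rows = {}     # GUID -> row data (the primary dimension table)
        self.peer = None   # the other Master in the shard cluster

    def insert(self, data):
        key = str(uuid.uuid4())  # GUID key: will not collide with the peer's keys
        self._apply(key, data, origin=self.server_id)
        return key

    def _apply(self, key, data, origin):
        if key in self.rows:
            raise KeyError(f"duplicate key {key}")
        self.rows[key] = data
        # Forward to the peer (acting as our slave) unless the event came from it.
        if self.peer is not None and self.peer.server_id != origin:
            self.peer._apply(key, data, origin)

def make_pair():
    """Wire two Masters together so each replicates to the other."""
    first, second = Master(1), Master(2)
    first.peer, second.peer = second, first
    return first, second
```

Inserting on either Master leaves both holding the same rows, which is what allows either one to take over if the other fails.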
Any table that we wish to replicate this way needs a GUID as its key, so that rows inserted on either Master never collide when they are replicated to the other. In practice, one should give every table within a shard a GUID key so as to ensure the complete global independence of the dataset contained within the shard.
Next, our application layer needs to be aware of both servers within the shard cluster. This gives our application two endpoints to which it may connect, execute commands, and retrieve data. A common way to use both servers equally is a simple round-robin algorithm that spreads connections fairly evenly between the two servers.
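A minimal round-robin selector might look like the following. The class name and the host names are hypothetical.

```python
from itertools import cycle

class RoundRobinShard:
    """Hand out the servers of one shard cluster in turn."""

    def __init__(self, endpoints):
        self._endpoints = cycle(endpoints)

    def next_endpoint(self):
        return next(self._endpoints)

# Hypothetical host names for the two Masters of one shard.
shard = RoundRobinShard(["db-shard1-a:3306", "db-shard1-b:3306"])
picks = [shard.next_endpoint() for _ in range(4)]  # alternates a, b, a, b
```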
A single server failure within the shard cluster does not result in the loss of any significant amount of data, because the replication mechanism has been continuously copying each server's commands to the other. To take advantage of this failover, our application layer needs to be aware of what happens when one server fails. Simply put, if one server within a shard cluster fails, the application layer needs to detect this and redirect its SQL commands to the remaining server while the failed server is recovered manually.
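Building on round-robin selection, the sketch below skips any server that has failed and keeps routing commands to the survivor until someone marks the failed server as recovered. `FailoverShard` and the `run` callable it accepts are hypothetical. A real data access layer would wrap an actual MySQL client and decide for itself which errors count as a server failure.

```python
class FailoverShard:
    """Round-robin over a shard cluster's servers, skipping any marked down."""

    def __init__(self, endpoints):
        self.endpoints = list(endpoints)
        self.down = set()
        self._next = 0

    def execute(self, sql, run):
        # `run(endpoint, sql)` is a hypothetical callable that sends the SQL to
        # one server and raises ConnectionError if that server is unreachable.
        for _ in range(len(self.endpoints)):
            endpoint = self.endpoints[self._next % len(self.endpoints)]
            self._next += 1
            if endpoint in self.down:
                continue
            try:
                return run(endpoint, sql)
            except ConnectionError:
                # Stop sending traffic here until an operator recovers it.
                self.down.add(endpoint)
        raise ConnectionError("no healthy servers left in this shard cluster")

    def mark_recovered(self, endpoint):
        """Call once the failed server has been repaired and resynchronized."""
        self.down.discard(endpoint)
```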
Wrap-Up
Originally, this article was meant to be no more than a few pages long, coupled with a coded implementation of a simple PHP sharding framework. About halfway through, I realized that the concepts themselves were so important to a complete understanding of how a sharding implementation might work, and the subject itself so immense, that I wouldn’t be doing the reader justice by cutting the explanations short and stopping before the level of logical detail that is reasonable for an introduction. Hence, I rewrote much of the original article in a format built around real-life scenarios that explain many of the more advanced partitioning strategies. I hope you, the reader, agree that I made the correct choice; please let me know if you don’t.
I feel confident saying that we’ve covered much of what I consider to be the bare conceptual basics of horizontally scaling your application’s database layer, and that was the intent: to provide a solid primer and no more. This subject is easily worthy of a book, and in fact, the absence of such literature was largely the motivation for this article. I hope to expound further on this subject in a series of future posts, but in the meantime, I’d like to hear some feedback on the format I’ve chosen. Did you find the “teach by example” theme easy to follow? Did the article provide a good base to build from? Were there any particularly confusing sections? Please comment and let me know so that I may further refine my writing style for this type of technical article.
Postgres == no need for sharding?
Posted by: SFPUG | December 16, 2008 at 07:18 PM
Hi SFPUG,
Most of the above sharding strategies are just as applicable to Postgres as to MySQL (or SQL Server, Oracle, etc.). Database sharding is really an RDBMS-independent scalability solution.
Posted by: Max Indelicato | December 16, 2008 at 09:08 PM
Why are you storing the password in the Index Shard?
How would you query the system if it receives a URL to fetch a particular blog entry, for instance http://theblog.com/BLOG-ID ? Would we then always need URLs that contain the user's ID (http://theblog.com/USER-ID/BLOG-ID)?
What if we have a page that lists all the top blogs (which may be on different shards)? Would we then need to combine the results of multiple queries within the application? If yes, would it not be a problem to do *sorting* and *pagination* of huge datasets within the application layer (as opposed to the database layer)?
Is it possible (performance-wise) to use joins (and UNIONs) with *remote tables* (if the db supports it like PostgreSQL) within the database layer itself? Would that be better than doing it on the application layer?
Posted by: Nikhil Gupte | December 17, 2008 at 10:59 PM
Great article! Covers the base of scaling, and very much needed.
Thank you
Posted by: Niklas | December 18, 2008 at 03:19 AM
Good article.
You haven't talked about using LIMIT or ORDER BY clauses in the user shards.
What if you want to display a list of users, alphabetically ordered?
Posted by: Bogdan | December 18, 2008 at 04:08 AM
Hi Nikhil,
I've numbered my responses to each of your questions for clarity:
1.) I stored the password in the Index Shard as an authentication optimization. Really, my intent was to show that you can use the Index Shard to index data beyond just the data's key. Having the password in the Index Shard means that upon authentication, we don't need to make another trip to a user's Domain Shard, because all we're doing is checking the username and password anyway. Again, though, this is an optimization, and you could just as well keep the password solely in the user's Domain Shard if you wished.
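As a rough illustration of that optimization, authentication can be answered entirely from the Index Shard. The table layout and function names are hypothetical, and SQLite stands in for MySQL. Storing a salted PBKDF2 hash rather than the password itself is an assumption added here; the reply above simply says "password".

```python
import hashlib
import hmac
import os
import sqlite3

# Hypothetical Index Shard table. The salted hash columns are an assumption.
index_shard = sqlite3.connect(":memory:")
index_shard.execute(
    "CREATE TABLE user_lookup (username TEXT PRIMARY KEY, salt BLOB, "
    "passwordHash BLOB, userId TEXT, shardId INTEGER)")

def hash_password(password, salt):
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)

def register(username, password, user_id, shard_id):
    salt = os.urandom(16)
    with index_shard:
        index_shard.execute(
            "INSERT INTO user_lookup VALUES (?, ?, ?, ?, ?)",
            (username, salt, hash_password(password, salt), user_id, shard_id))

def authenticate(username, password):
    """One query against the Index Shard; no trip to the user's Domain Shard.

    Returns (userId, shardId) on success so later requests know where to go,
    or None if the username is unknown or the password does not match.
    """
    row = index_shard.execute(
        "SELECT salt, passwordHash, userId, shardId FROM user_lookup WHERE username = ?",
        (username,)).fetchone()
    if row is None:
        return None
    salt, stored_hash, user_id, shard_id = row
    if hmac.compare_digest(stored_hash, hash_password(password, salt)):
        return user_id, shard_id
    return None
```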
2.) To obtain a blog entry from just its blogId, you would create another index table in the Index Shard. An index table with the following columns would probably do what you need:
* userId
* blogId
Using the small table above, you would simply query by blogId and retrieve the userId. If you joined that table with the original user index table, you could then find the shardId and retrieve the blog entries from the Domain Shard specified by that shardId.
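The lookup described above amounts to one join inside the Index Shard. In this sketch SQLite stands in for MySQL, and `blog_lookup`, the column types and `locate_blog` are assumed names.

```python
import sqlite3

# Hypothetical Index Shard holding the original user index table plus a new
# blogId -> userId index table.
index_shard = sqlite3.connect(":memory:")
index_shard.executescript("""
    CREATE TABLE user_lookup (userId INTEGER PRIMARY KEY, shardId INTEGER);
    CREATE TABLE blog_lookup (blogId INTEGER PRIMARY KEY, userId INTEGER);
""")

def locate_blog(blog_id):
    """Resolve a bare blogId (e.g. from http://theblog.com/BLOG-ID) to the
    owning userId and the shardId of the Domain Shard that stores the entry.
    Returns None if the blogId is unknown."""
    return index_shard.execute(
        """SELECT u.userId, u.shardId
           FROM blog_lookup AS b
           JOIN user_lookup AS u ON u.userId = b.userId
           WHERE b.blogId = ?""",
        (blog_id,)).fetchone()
```

With the (userId, shardId) pair in hand, the application connects to that Domain Shard and fetches the entry, so the URL does not need to carry the user's ID.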
3.) Listing the top blogs would likely require some data duplication. You're correct that pagination and sorting are difficult in the application layer, and as such, they should be avoided there if possible. I'd recommend creating a dedicated Domain Shard just for the blog list, using one of the other partitioning methods, such as Row Count Partitioning. A cron job, or even a small application, could compile the top blogs in the background on a separate server and insert them into the blog list Domain Shard. It's important to remember that each partitioning strategy has its place, and you may use them in conjunction with each other within the same system.
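The background job described above might look roughly like this. It merges each shard's own top rows into one list, writes that list to a dedicated table, and then serves pages with an ordinary LIMIT/OFFSET query. The function names, the `top_blogs` table and the (score, blogId) row shape are assumptions, and SQLite stands in for the blog list Domain Shard.

```python
import heapq
import sqlite3

def compile_top_blogs(per_shard_rows, limit):
    """Merge each Domain Shard's own top rows into one global top list.

    per_shard_rows: one list of (score, blogId) tuples per shard, e.g. the
    result of a per-shard "ORDER BY score DESC LIMIT n" query. The merge is
    exact only if every shard supplies at least `limit` rows (or all it has).
    """
    all_rows = (row for rows in per_shard_rows for row in rows)
    return heapq.nlargest(limit, all_rows)

def publish_top_blogs(list_shard, top):
    """Replace the contents of the dedicated blog list shard's table."""
    with list_shard:
        list_shard.execute("DELETE FROM top_blogs")
        list_shard.executemany(
            "INSERT INTO top_blogs (ranking, blogId, score) VALUES (?, ?, ?)",
            [(ranking, blog_id, score)
             for ranking, (score, blog_id) in enumerate(top, start=1)])

def page(list_shard, page_no, per_page):
    """Sorting and pagination become a plain query on a single database."""
    rows = list_shard.execute(
        "SELECT blogId FROM top_blogs ORDER BY ranking LIMIT ? OFFSET ?",
        (per_page, (page_no - 1) * per_page)).fetchall()
    return [blog_id for (blog_id,) in rows]

list_shard = sqlite3.connect(":memory:")
list_shard.execute(
    "CREATE TABLE top_blogs (ranking INTEGER PRIMARY KEY, blogId TEXT, score INTEGER)")
```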
4.) In a sharded approach, I would recommend avoiding joins across shards altogether. They can get tricky and almost always result in an overuse of resources. The solution here would be to use another partitioning method, as in question 3.
Hope that answers your questions!
Posted by: Max Indelicato | December 18, 2008 at 10:22 AM
Hi Bogdan,
See the answer to Nikhil's question 3 that I provided above. Use each partitioning strategy for what it's intended for and combine them on the same set of data as necessary.
Posted by: Max Indelicato | December 18, 2008 at 10:28 AM
You kids!...
This stuff we call Database Federation. Ever heard of that?
While you need to muck about in your application code, we can just add new servers and rebalance the data, and that is it. No application or application configuration changes required...
Posted by: JAlexoid | December 18, 2008 at 03:36 PM
Database federation traditionally results in a SPOF (single point of failure): the database that is acting as the view. ***[Database federation also doesn't provide parallelized queries upon a single whole dataset spread amongst the federated servers.]*** - EDIT: I actually believe I'm wrong on this point.
Beyond that, the issue is lack of control. If you do in fact hit the limits of scalability afforded to you by a specific vendor's implementation of database federation, you're out of luck. You can't easily pick apart a vendor's software to optimize as necessary, and a vendor rarely has your application's requirements in mind when designing its federation feature set.
With database sharding as I've described it in this article, you're allowed all of the above, in addition to everything that traditional RDBMS database federation provides. You can think of database sharding as a custom approach to traditional database federation. I'll admit that canned database federation is a far easier solution; it's just not the right solution for every problem.
These are some of the reasons that many of the larger organizations that see obscene amounts of load on their databases (Facebook, Myspace, etc.) go with the custom database sharding strategy.
Posted by: Max Indelicato | December 18, 2008 at 04:04 PM
Awesome job, thanks for the article.
Posted by: rhnet | December 19, 2008 at 08:25 PM
Hi, thanks for the article. I have a question. It will sound silly for somebody who's been practicing db sharding, but please bear with me.
Let's say I have shards that store proverbial invoices. Since each shard is a stand-alone db instance, it will have an "invoices" table with an auto-increment PK. Each shard auto-increments the PK value on its own, so values are not unique among shards. In such a case I cannot expose an invoice by its id: www.example.com/invoices/123 could return multiple invoices from multiple shards. Am I missing anything here?
Thank you,
Yuriy
Posted by: Yuriy | December 28, 2008 at 04:22 AM
Hi Yuriy,
You've asked a really good question. In fact, I'm going to write a blog post on it tonight so that I can get into more depth than I can on a response to your comment. Keep a look out for it later on and feel free to comment on that post if your question isn't answered sufficiently in the post.
Posted by: Max Indelicato | December 28, 2008 at 02:44 PM
Sounds good! Thanks Max.
Posted by: Yuriy | December 28, 2008 at 02:56 PM