Yves managed to compile an embedded version of Redis by just changing a single line in the original codebase. With it, we’re managing to get about 250,000
SET transactions per second, but it crashes or deadlocks if we try to go any faster. We suspect this is due to some nasty multi-threading issues: Redis is mostly single-threaded, yet uses a couple of extra threads for slow IO operations like disk snapshots.
At this stage of the game, we’re really not sure where we should go with this aspect of our project. On one hand, an embedded version of Redis would reduce transaction latency from 250 microseconds to 1 microsecond. On the other hand, there are ways to work around this issue by using pipelining, Lua scripting, and custom commands.
We have yet to confirm this, but we strongly believe that we won’t face such issues if we use the embedded Redis data pathway only for READ transactions, while using the standard client-server pathway for all WRITE transactions. This might be good enough because most of the use cases that require low latency, such as graph traversals, usually require READ transactions only.
By the same token, such transactions would be a lot faster in native C or C++, and we’re likely to use a C++ library like Boost for them anyway. Therefore, the case for an embedded version of Redis is getting less and less obvious as our architecture solidifies.
Following today’s work with Jim, Yves is working on turning our current codebase into a Node.js application that Jim will then package as a Docker container. This will allow us to deploy it as a set of workers on customer instances.
Jim and I spent the last six hours reviewing our architecture for Hypercube. As a result, we came up with some major simplifications of its dataflow. This will make its implementation and maintenance quite a bit simpler, without any major loss of functionality or performance.
The first major decision was to have a static number of shards. This will make it easier to isolate core.stoic from our hypercube workers through a static set of queues (one queue per shard). And this will ensure that we do not have to implement complex resharding operations, which Redis is not really designed for.
Instead of redistributing records across shards (resharding) whenever a new shard is needed, shards will be redistributed across cores whenever more cores are needed. To do so, we will use presharding, as described in this article. With this model, we will initially deploy multiple shards per core, on an initial set of cores. Then, as shards grow in size, we will add more cores and move shards around, while keeping a constant number of shards.
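To make the presharding idea concrete, here is a minimal sketch of a fixed shard count being spread over a growing set of cores. The function name and round-robin placement are illustrative assumptions, not our actual code; the point is that shard IDs never change, so records never need to be rehashed.

```javascript
// Sketch of presharding: the shard count is fixed for the lifetime of the
// instance; only the shard-to-core assignment changes when cores are added.
function assignShards(shardCount, coreCount) {
  const assignment = new Map(); // coreId -> [shardIds]
  for (let core = 0; core < coreCount; core++) assignment.set(core, []);
  for (let shard = 0; shard < shardCount; shard++) {
    // Round-robin placement; a real deployment could balance by shard size.
    assignment.get(shard % coreCount).push(shard);
  }
  return assignment;
}

// 16 shards on 4 cores: 4 shards per core.
// Growing to 8 cores later only moves shards (2 per core); no resharding.
```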
In order to size things properly and reduce resharding to an absolute minimum, we will offer different classes of instances, each optimized for a different dataset size:
- Small: 10GB to 100GB with 16 shards
- Medium: 100GB to 1TB with 256 shards
- Large: 1TB to 10TB with 2,048 shards
According to this basic analysis, shards will never be smaller than 0.391GB and never larger than 6.25GB. Knowing that Node.js requires a minimum of 64MB and Redis just 1MB, our marginal overhead with the smallest shards will be 65MB over 391MB, or about 17%. With the largest shards, it will be just over 1%. This is essentially the price to pay for a full order of magnitude of dynamic elasticity.
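The shard-size bounds and overhead figures above can be checked with a quick back-of-the-envelope computation (65MB per shard = 64MB for Node.js + 1MB for Redis):

```javascript
// Verify the shard-size bounds and worst-case per-shard overhead quoted above.
const classes = [
  { name: 'Small',  minGB: 10,   maxGB: 100,   shards: 16 },
  { name: 'Medium', minGB: 100,  maxGB: 1000,  shards: 256 },
  { name: 'Large',  minGB: 1000, maxGB: 10000, shards: 2048 },
];
const overheadMB = 64 + 1; // Node.js + Redis per shard

for (const c of classes) {
  const minShardGB = c.minGB / c.shards;          // smallest shard for this class
  const maxShardGB = c.maxGB / c.shards;          // largest shard for this class
  const worstOverhead = overheadMB / (minShardGB * 1000); // overhead at min size
  console.log(c.name, minShardGB.toFixed(3), maxShardGB.toFixed(3),
              (worstOverhead * 100).toFixed(1) + '%');
}
// Medium has the smallest shards (100GB / 256 = 0.391GB), hence the ~17% figure.
```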
This overhead could be further reduced by running multiple Redis servers for every Node.js worker, in which case we could easily increase the number of pre-allocated shards by an order of magnitude, thereby offering two full orders of magnitude of elasticity without requiring any kind of resharding. If we go for this solution, we will have multiple shards per worker, otherwise, we will have a single shard per worker.
Central Redis Queuing Server
The second major decision was to deploy a central Redis queuing server between core.stoic and our workers, without using the Bull job manager. Instead, we will use standard Redis lists to implement basic FIFO (first in, first out) queues, following a common pattern described in this article. With regular hardware and no clustering, we should have no problem achieving a throughput of about 250,000 messages per second (one LPUSH and one RPOP per message), which should be plenty enough for most use cases.
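The pattern itself is simple: producers push onto the head of a Redis list, consumers pop from the tail. A minimal sketch, with an in-memory array standing in for the Redis list (a real client such as node-redis would issue the LPUSH/RPOP commands instead):

```javascript
// FIFO queue using Redis-list semantics: LPUSH at the head, RPOP at the tail.
class FifoQueue {
  constructor() { this.items = []; }
  lpush(msg) { this.items.unshift(msg); } // like Redis LPUSH (insert at head)
  rpop() { return this.items.pop(); }     // like Redis RPOP (remove from tail)
}

const q = new FifoQueue();
q.lpush('update-1');
q.lpush('update-2');
q.rpop(); // → 'update-1' (first in, first out)
```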
According to this architecture, core.stoic will push updates into queues managed by this server, and workers will pull updates from these queues. Therefore, core.stoic won’t have to know anything about the workers, the workers won’t have to know anything about core.stoic, and the queues won’t have to know anything about anything.
To further simplify things, we’ve decided to use one queue (one Redis list) per shard. Since the number of shards will be set in advance and will never change, the number of queues will be the same. And since sharding will be done by simple UUID pooling, core.stoic will know which queue a record update should be pushed into, while workers will know which queues they should pull updates from. It really could not get any simpler.
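The routing logic reduces to hashing a record’s UUID into a fixed shard index and deriving the queue name from it. The hash function below (FNV-1a) and the queue naming scheme are assumptions for illustration; any stable hash works, since the shard count never changes:

```javascript
const SHARD_COUNT = 256; // medium instance; fixed for the instance lifetime

// Stable 32-bit FNV-1a hash of the record UUID.
function shardFor(uuid) {
  let h = 0x811c9dc5;
  for (let i = 0; i < uuid.length; i++) {
    h ^= uuid.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h % SHARD_COUNT;
}

// One Redis list per shard; core.stoic pushes here, the shard's worker pulls.
function queueFor(uuid) {
  return `queue:shard:${shardFor(uuid)}`;
}
```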
The third major decision was to only implement support for record-level updates, instead of supporting both record-level and bulk updates. The reason is that bulk updates including many records at once (millions or more) would require a different API with a direct connection from the workers to core.stoic. This would complicate our architecture while creating a tight coupling between core.stoic and the workers, which is something we would like to avoid if possible.
Instead, we will take advantage of the fact that core.stoic can stream updates in batches, and we will batch our updates using pre-defined time windows (every second, for example). According to this model, core.stoic will push batches of updates to every shard-level queue at regular intervals. At full throttle, if we were to stream 250,000 records/second across 256 shards (medium instance), we would have about 1,000 records per message. Our recent tests show that we can index about 50,000 records per second on a single worker, so indexing 1,000 records will take about 20ms. As a result, this architecture should allow us to get records from ElasticSearch to the Hypercube in about 1 second (latency), at a rate of a quarter million records per second.
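The time-window batching could be sketched as follows; `pushBatch` is a hypothetical sink standing in for the actual queue push, and the per-shard buffering mirrors the one-queue-per-shard layout:

```javascript
// Accumulate per-shard updates and flush them as one message per time window.
class Batcher {
  constructor(shardCount, windowMs, pushBatch) {
    this.buffers = Array.from({ length: shardCount }, () => []);
    this.pushBatch = pushBatch; // (shard, records) => void, hypothetical sink
    this.timer = setInterval(() => this.flush(), windowMs);
  }
  add(shard, record) { this.buffers[shard].push(record); }
  flush() {
    this.buffers.forEach((records, shard) => {
      if (records.length > 0) {
        this.pushBatch(shard, records); // one queue message per shard per window
        this.buffers[shard] = [];
      }
    });
  }
  stop() { clearInterval(this.timer); this.flush(); }
}
```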
With this architecture, bulk loading of records after mass import will be done by streaming small batches of about 1,000 records at a time, and loading a billion records (1TB) should take no more than 4,000 seconds, or just over an hour. Clearly, we should not be the bottleneck here. Your network connection will be…
Another benefit of this architecture is that we will optimize our dataflow for batches of a few hundred records. To do so, we will refactor our store and index loading APIs to take advantage of Redis pipelining, in case our attempt at developing an embedded version of Redis fails. Through pipelining, we will pay the 250 microsecond client-server overhead only once per batch of records. In other words, with a batch of 250 records, our average overhead per record will be about 1 microsecond, which is what it would have been with an embedded Redis. Conclusion: having an embedded Redis is just a nice-to-have now.
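The amortization argument in numbers, using the figures above:

```javascript
// A 250 µs round trip paid once per pipelined batch, instead of once per record.
const roundTripUs = 250; // client-server overhead per round trip
const batchSize = 250;   // records per pipelined batch

const overheadPerRecordUs = roundTripUs / batchSize;
console.log(overheadPerRecordUs); // 1 µs per record, on par with embedded Redis
```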
Dedicated Hexastore Worker
Finally, we will deploy a dedicated worker for the Hexastore alongside our shard-related workers. This is due to the fact that graph structures are really difficult to parallelize, and complex graph traversal algorithms do not deal well with MapReduce (read this article for a striking example). As a result, our hexastore will be deployed on a single Redis server. On Amazon, this will give us up to 240GB of RAM. Knowing that our hexastore requires roughly 100 bytes per relation, this will allow us to store about 2.4B relations. And if you were to use a dedicated server with 2TB of RAM, you’d be able to manage 20B relations. Unless you’re Facebook or LinkedIn, this should be plenty enough to get started.
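The capacity estimate follows directly from the 100-bytes-per-relation figure:

```javascript
// Hexastore capacity for a given amount of RAM, at ~100 bytes per relation.
const bytesPerRelation = 100;
const relationsFor = (ramGB) => (ramGB * 1e9) / bytesPerRelation;

relationsFor(240);  // 2.4 billion relations (largest Amazon instance)
relationsFor(2000); // 20 billion relations (dedicated 2TB server)
```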
Let’s build all this now…
Hugues recently added a plugin to ElasticSearch allowing us to perform joins with a reasonable level of performance. This will allow us to do things like sorting on relationship fields. It will also improve the performance of the Grid perspective quite a bit.
Our new Multiple Relationship datatype has been deployed. Testing it now…
The only problem with this basic mapping is that it would create many hashes and many arrays, which is something that Redis is not designed to handle particularly well. If we want to avoid implementing a custom data type for it, we could use a single Redis hash for every store of the hexastore. Keys for this hash would be the concatenation of the two keys for the first and second objects in our current in-memory stores, and values would be a string joining the values of the arrays in our current stores.
Such an approach would make set operations very fast, but it would dramatically slow down del operations. Since the former are much more frequent than the latter, this might be an acceptable trade-off, but it is certainly far from ideal.
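A minimal sketch of that mapping, with a plain object standing in for the Redis hash (HSET/HDEL in practice) and an assumed separator character that must not occur in keys or values, makes the asymmetry between set and del visible:

```javascript
const SEP = '\u0000'; // assumed separator; must not appear in keys or values

const store = {}; // stands in for one Redis hash per hexastore store

// set is a single field write: fast.
function set(key1, key2, values) {
  store[key1 + SEP + key2] = values.join(SEP);
}

// del must read, split, filter, and rewrite the whole joined string,
// which is why it is so much slower than set in this mapping.
function del(key1, key2, value) {
  const field = key1 + SEP + key2;
  const kept = store[field].split(SEP).filter((v) => v !== value);
  if (kept.length > 0) store[field] = kept.join(SEP);
  else delete store[field];
}
```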
I’ll give this one a few more hours before firing my code editor…