This is the fourth blog post in our series about real-time personalization at Paytm. In this post we go over the major lessons we learned from architecting, building, deploying, and managing the personalization engine.
Cache is Not Free
Before making the first release of the real-time personalization engine, we started conducting load tests. Our initial estimate for usage and traffic was 1000 requests per second. We used the Gor tool to replay traffic into our Akka cluster, and we measured the results with Datadog metrics shipped to Datadog's cloud-hosted service. We built dashboards showing system performance under test load and the traffic level the system could sustain. The results showed that the Akka cluster itself could handle request rates well above 1000 requests per second. However, our system does not exist in a vacuum: it depends on other systems, and their performance affects ours.
The major external dependency that needed more attention was an ElasticSearch cluster hosted by another team. This cluster serves a pool of catalog products as candidates for scoring by our real-time machine learning models. For every request to our system, we make several search queries to it. The computation then pauses until the results come back from ElasticSearch (thanks to Akka's asynchronous model, this wait is non-blocking), and only then can we resume the computation and serve the response. In other words, our response time depends on the performance of the external ElasticSearch cluster. Since this cluster belongs to another system already in production, we had to be mindful of the load we were adding. During our load tests, the ElasticSearch cluster's performance started to degrade significantly when the request rate rose above 800 requests per second. It became apparent that 800 requests per second was approximately the limit the cluster could handle.
The engineering team tackled this challenge quickly, starting with the log files collected by our ELK setup. We noticed that the requests made to the external ElasticSearch cluster are very similar for most customers who visit the website with similar product category preferences. Products in the catalog do change during the day: stock availability, prices, and discounts may all change. But we noticed that the products returned by our queries do not change within a short period of time, for example within 15 minutes.
This short-term stability of query results pointed to an obvious solution: cache it (surprise!). The machine learning models handle the per-customer customization on top of the candidate products, and the candidates do not need to change within a short window of 10 to 15 minutes. After evaluating a few candidate solutions, we found that Redis, as offered by Amazon AWS, was the most reasonable choice for us. We quickly set up a Redis instance and made it accessible from all hosts in the application cluster. We changed one of the components so that ElasticSearch results are cached, with the request as the key and the returned payload as the value, using a Time to Live (TTL) of 15 minutes. The outcome was better than anyone on the team had expected: the cache hit rate reached as high as 99% with the load test data. Given that the external ElasticSearch cluster could handle up to 800 requests per second without issue, we were optimistic that Redis would let us support far more than 800 requests per second. The math is simple: if only 1% of requests miss the cache and reach ElasticSearch, then 800 / 0.01 = 80,000 requests per second. By using a cache, we had increased the theoretical request rate our ElasticSearch dependency could support by 100x. Even though the rest of our application cluster could not yet handle that rate, we could now scale the system without worrying about the ElasticSearch part anymore.
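To make the caching pattern and the throughput arithmetic concrete, here is a minimal cache-aside sketch. It is not our production code: an in-memory dict stands in for Redis, the key is assumed to be the serialized ElasticSearch request, and `TTLCache`, `get_or_fetch`, and `effective_capacity` are hypothetical names. In Redis the same pattern maps to a `GET`, followed on a miss by a `SET` with an `EX` expiry.

```python
import time


class TTLCache:
    """In-memory stand-in for the shared Redis cache (cache-aside with a TTL)."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock      # injectable so tests can control time
        self._store = {}        # key -> (value, expires_at)
        self.hits = 0
        self.misses = 0

    def get_or_fetch(self, key, fetch):
        """Return the cached value for key, calling fetch(key) on a miss or expiry."""
        now = self.clock()
        entry = self._store.get(key)
        if entry is not None and entry[1] > now:
            self.hits += 1
            return entry[0]
        self.misses += 1
        value = fetch(key)      # e.g. the ElasticSearch query for this request
        self._store[key] = (value, now + self.ttl)
        return value


def effective_capacity(backend_limit_rps, hit_rate):
    """Front-end request rate the backend can absorb when only misses reach it."""
    return backend_limit_rps / (1.0 - hit_rate)
```

With a 900-second TTL, repeated identical queries within 15 minutes hit the cache, and `effective_capacity(800, 0.99)` comes out at roughly 80,000 requests per second (floating-point rounding aside), matching the estimate above.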
Once we finished the rest of the performance testing, we released the Real-time Personalization Engine on time. We set up on-call duties for the team using PagerDuty. Our typical latency was 50ms, with no noticeable incidents. Everybody on the team was happy and slept like a baby (at least for a few nights).
A week after the release, our on-call engineer received a PagerDuty alert in the middle of the night (in India it was around the morning traffic peak) reporting that the system's overall latency had shot up sixfold, from 50ms to 300ms.
We started looking at the Datadog dashboards we had set up for monitoring system performance. They showed a request rate of 3200 requests per second, while the average cache hit rate for ElasticSearch requests remained very high. All system components seemed to be working fine, except... well, as you might have guessed, our favourite cache.
There were a few issues:
- Redis fetch latency rose sharply once the request rate went beyond about 3000 requests per second. The cache, which was supposed to speed things up, had become the bottleneck and was slowing down the whole system.
- Every 15 minutes, there was a huge spike in the number of requests the system made to the external ElasticSearch cluster. Whenever this happened, the system slowed down further.
Further investigation revealed that the first issue happened because the cache fetch requests had saturated the network link between our cluster and the Redis host. Simply put, the 750 Mbit/s network was too slow :-). The second issue was caused by the fixed 15-minute TTL we had set for all cache items. Due to the sheer number of customer requests, most cache keys were warmed up almost immediately after system start-up, so they also expired at about the same time, 15 minutes later. Once refreshed together, they expired together again, which explains the cyclic pattern of request spikes every 15 minutes.
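To put the bandwidth limit in perspective, here is a back-of-the-envelope calculation from the numbers above. It assumes, as a simplification, that the 750 Mbit/s link carried only cache traffic and that saturation set in at about 3000 requests per second in one direction:

```latex
\begin{aligned}
\frac{750 \times 10^{6}\ \text{bit/s}}{8\ \text{bit/byte}} &= 93.75 \times 10^{6}\ \text{byte/s} \\
\frac{93.75 \times 10^{6}\ \text{byte/s}}{3000\ \text{req/s}} &= 31{,}250\ \text{byte/req} \approx 31\ \text{KB per request}
\end{aligned}
```

In other words, once all the cache lookups for one customer request add up to roughly 31 KB on the wire, a single host's link is full, however fast Redis itself is.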
Once we had found the causes, the fixes were pretty straightforward:
- Avoid saturating the bandwidth by reducing the size of the data transferred back and forth between the actor system and Redis. We applied Kryo serialization to our cache keys and values, which reduced cache item size by 30%.
- Use sharding. Our initial setup had only one Redis host, whose network bandwidth is finite at the end of the day. Instead of a single Redis host, we set up a cluster of Redis hosts that share the key space evenly, which reduces the network traffic to any single host.
- Mitigate the periodic spikes. We randomized the TTL of each cache item between 10 and 15 minutes. This greatly smoothed the spikes caused by cache misses.
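The last two fixes can be sketched in a few lines. This is an illustration under stated assumptions, not our production code: the host names are hypothetical, `shard_for` uses simple modulo hashing (Redis Cluster itself maps keys to 16384 hash slots instead), and `expiry_histogram` simulates the worst case where every key is warmed at the same instant.

```python
import hashlib
import random


def jittered_ttl(rng, min_s=600, max_s=900):
    """TTL in seconds drawn uniformly from [min_s, max_s] (10 to 15 minutes),
    so keys warmed at the same moment expire at different times."""
    return rng.randint(min_s, max_s)


def shard_for(key, hosts):
    """Route a cache key to one Redis host by hashing it (simple modulo sharding)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return hosts[int.from_bytes(digest[:8], "big") % len(hosts)]


def expiry_histogram(n_keys, ttl_fn, bucket_s=60):
    """All keys warmed at t=0: count how many expire in each one-minute bucket."""
    counts = {}
    for _ in range(n_keys):
        bucket = ttl_fn() // bucket_s
        counts[bucket] = counts.get(bucket, 0) + 1
    return counts


if __name__ == "__main__":
    rng = random.Random(42)
    print("fixed TTL:   ", expiry_histogram(10_000, lambda: 900))
    print("jittered TTL:", expiry_histogram(10_000, lambda: jittered_ttl(rng)))
    hosts = ["redis-a", "redis-b", "redis-c"]  # hypothetical host names
    print(shard_for("category=shoes", hosts))
```

With a fixed TTL, all 10,000 keys expire in the same minute; with the jittered TTL, expirations spread across minutes 10 to 15, so no single minute sees more than about a fifth of the misses. On the serialization side, a 30% smaller cache item means the same link carries roughly 1 / 0.7 ≈ 1.43 times as many lookups, all else being equal, and sharding across N hosts divides per-host traffic by roughly N.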
In short, we learnt from this process that a cache should be treated like any other component in your system. Design your caching from the beginning, and estimate its load and its access patterns. Without care, it can become your next bottleneck.
Actor is Cheap
Well, I’m talking about Akka actors, of course. As introduced in previous blog posts, our Akka-based system creates a new orchestrator actor for each customer request. This actor is responsible for handling the entire workflow of the recommendation process: fetch the required metadata, interact with the different machine learning components, assemble and rank partial results, and send the final result back to the front end to be displayed to the customer.
When we first designed this approach, we were hesitant and worried that creating one actor for each request could be a problem for the stability and scalability of the system. In practice, it has proven not to be an issue at all. At our current request rate, the Akka cluster creates thousands or even tens of thousands of actors every second without any problems. Of course, we took great care to ensure each actor is stopped promptly after its request is served, whether successfully or unsuccessfully, so that no actor lives indefinitely in memory.
The per-request actor approach also greatly simplifies the programming model, because the programmer can take advantage of the Akka paradigm of the “single-threaded illusion” within each actor: an actor processes its messages one at a time, so its internal state needs no locks.
We plan to keep applying this approach in our next system.
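The per-request orchestrator pattern can be illustrated with a small Python asyncio analogue. It is not Akka and not our code: a coroutine with its own mailbox plays the role of the actor, and `component`, `orchestrate`, and the canned scores are hypothetical stand-ins for our machine learning components. It shows the three properties described above: one short-lived orchestrator per request, state touched by only one task at a time, and guaranteed cleanup whether the request succeeds or times out.

```python
import asyncio
import random

# Hypothetical replies from two ML components: name -> [(product_id, score), ...]
CANNED_SCORES = {
    "cf": [("p1", 0.9), ("p2", 0.4)],
    "popular": [("p2", 0.7), ("p3", 0.5)],
}


async def component(name, mailbox):
    """Stand-in for an ML component that replies to the orchestrator's mailbox."""
    await asyncio.sleep(random.uniform(0.001, 0.01))  # simulated I/O latency
    await mailbox.put((name, CANNED_SCORES[name]))


async def orchestrate(components, timeout=0.05):
    """One short-lived orchestrator per request (asyncio analogue of an actor)."""
    mailbox = asyncio.Queue()
    partials = {}  # touched only by this coroutine, one message at a time
    workers = [asyncio.create_task(component(c, mailbox)) for c in components]

    async def collect():
        while len(partials) < len(components):
            name, scores = await mailbox.get()
            partials[name] = scores

    try:
        await asyncio.wait_for(collect(), timeout)
    except asyncio.TimeoutError:
        pass  # serve whatever arrived in time
    finally:
        for w in workers:  # always clean up, on success or failure
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    # Merge partial results, keeping each product's best score, then rank.
    best = {}
    for scores in partials.values():
        for product, score in scores:
            best[product] = max(score, best.get(product, float("-inf")))
    return sorted(best, key=best.get, reverse=True)
```

Running many requests concurrently is just `asyncio.gather` over many `orchestrate(...)` calls; each one builds its own mailbox and state and leaves nothing behind when it returns.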
Write Affects Read
As discussed before, our Real-time Recommendation Engine is backed by several machine learning models trained offline by batch jobs written in Apache Spark. The output of the training jobs is exported to an online store used by the recommendation system.
Because of our system's strict latency requirement (50ms) and our constantly growing request rate, we need a performant and scalable storage system. Among multiple possible options, we picked Cassandra as our main storage technology.
Our trained machine learning models are a mix of different hierarchical models. Most prominently, we have a collaborative filtering model that is mainly matrix-based: its real-time evaluation is a matrix-vector product, where the vector is keyed by customer ID. A cache would therefore not help much, because the majority of our requests come from distinct customers. As a result, the real-time system has to make thousands or tens of thousands of fetch requests to Cassandra during peak time.
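A minimal sketch of that evaluation step, assuming a dict stands in for the per-customer Cassandra lookup and that candidate item vectors have already been fetched. The names, the two-dimensional factors, and the empty-list fallback for unknown customers are all hypothetical.

```python
def score_candidates(customer_vec, item_vecs):
    """Matrix-vector product: one dot product per candidate item row."""
    scores = {}
    for item, vec in item_vecs.items():
        if len(vec) != len(customer_vec):
            raise ValueError(f"dimension mismatch for {item}")
        scores[item] = sum(c * v for c, v in zip(customer_vec, vec))
    return scores


def recommend(customer_id, customer_factors, item_vecs, k=10):
    """Rank candidates for one customer; the vector fetch is a per-customer point read."""
    customer_vec = customer_factors.get(customer_id)  # stand-in for a Cassandra read
    if customer_vec is None:
        return []  # hypothetical: unknown customers fall back to another model
    scores = score_candidates(customer_vec, item_vecs)
    return sorted(scores, key=scores.get, reverse=True)[:k]


customer_factors = {"c42": [1.0, 0.5]}  # hypothetical customer factors
item_vecs = {"p1": [0.2, 0.9], "p2": [0.8, 0.1], "p3": [0.1, 0.1]}
print(recommend("c42", customer_factors, item_vecs, k=2))  # ['p2', 'p1']
```

Because every request needs a different customer vector, the lookup on the `customer_factors.get(...)` line is what turns into one Cassandra read per request.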
Although Cassandra's performance can generally be improved simply by adding hosts and increasing the replication factor, we still learnt a few important practices that greatly improved overall storage performance without incurring extra cost. The most significant ones concern mitigating the performance impact on read operations while the backend is writing to the Cassandra cluster.
As mentioned earlier, the machine learning training output is exported to Cassandra from our offline batch jobs. Those jobs run daily to generate the most up-to-date models. We soon noticed, not surprisingly, that read latency increased while the batch jobs were writing to the Cassandra tables. When Cassandra was also busy with internal bookkeeping, such as SSTable compaction, reads degraded even further, to the point of violating our service SLA. To mitigate the performance impact of writes, we did the following, which proved very effective:
- Schedule our batch write jobs during off-peak traffic hours. This works well because the end users of our systems are customers in a single time zone: late at night in India there are very few requests per second, and we can use that time for our writes.
- Writing to Cassandra directly through its client API can degrade read performance if the new data is not immediately used by the client. This is because the new data fills up Cassandra's memtables and the disk cache, so all reads that still depend on the old data have to be fetched from disk again. To overcome this issue, we create SSTables locally using Spark, mapping each Spark partition to one SSTable, and then upload the SSTables to Cassandra. This strategy is ideal for our scenario because we can only use the new model once all of its data is available. The data is uploaded to Cassandra without affecting its disk cache and memtables. Additionally, uploading an SSTable is much faster than writing batches of records through the client API.
- In our case, new data arrives daily, and we need a strategy to reclaim disk space efficiently without hurting read performance. We configured Cassandra to use the Time Window Compaction Strategy (TWCS). This strategy works best for time-series data, and it makes deleting old data cheap: we simply set a TTL on the data written into each SSTable. When compaction starts, Cassandra only needs to check the SSTable metadata to decide whether the whole file can be deleted, which makes removing outdated data very efficient.
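The metadata-only deletion idea behind TWCS can be modelled in a few lines. This is a simplified illustration, not Cassandra's implementation: it assumes every row in a file carries the same TTL, uses daily windows, and ignores details Cassandra also checks (such as `gc_grace_seconds` and overlapping SSTables). `SSTableMeta` and the helper functions are hypothetical.

```python
from dataclasses import dataclass

DAY = 86_400  # seconds


@dataclass(frozen=True)
class SSTableMeta:
    """Simplified per-file metadata; real SSTables track much more."""
    name: str
    min_write_ts: int  # seconds since epoch
    max_write_ts: int
    ttl: int           # assumption: one TTL for every row in the file


def window_of(sst, window_size=DAY):
    """TWCS-style bucket: a file belongs to the window of its newest data."""
    return sst.max_write_ts // window_size


def group_by_window(sstables, window_size=DAY):
    windows = {}
    for sst in sstables:
        windows.setdefault(window_of(sst, window_size), []).append(sst.name)
    return windows


def fully_expired(sst, now):
    """If even the newest row has expired, drop the whole file from metadata alone."""
    return sst.max_write_ts + sst.ttl <= now


def droppable(sstables, now):
    return [s.name for s in sstables if fully_expired(s, now)]
```

With one file per daily export and a two-day TTL, the file from day 0 becomes droppable at the start of day 3 without reading a single row, which is the property that keeps space reclamation cheap for daily model data.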
It was definitely an enjoyable and enlightening engineering experience to build the real-time recommendation engine at Paytm. At the time of writing this blog post (January 2017), we handle over 50 million requests every day, we serve recommendations for over 40 million customers, our peak traffic is over 3000 requests per second, and our average response time is approximately 50ms. Based on these lessons, we compiled a checklist of best practices that we now use when building subsequent machine learning production systems at Paytm Labs. We will share some of those items here.
- Use centralized logging (ELK) to allow for real-time debugging.
- Ensure a clean exit from the cluster to eliminate memory leaks.
- Use the Kamon TraceContext ID to correlate messages across components when debugging a distributed system.
- When using the Sigar native library, multiple JVMs on the same host must run from different folders; otherwise the JVM will crash.
- Add an increasing numeric version to the actor system name for backward-incompatible deployments.
- Expose the cluster membership state and the state of all routers, including their routees, through an API endpoint.
- Run at least 3 dedicated Akka seed nodes, and avoid restarting them with each deployment.
- Only restart your seed nodes when the actor system name version changes.
- Use a split-brain resolver to handle network partitions and maintain a reliable cluster state.
- Configuration and dashboard metrics should be peer-reviewed as strictly as code.
- Review Cassandra's memory configuration; do not rely on the defaults.
- Dashboard metrics drive decisions about the system state, so they must be defined accurately before they can be trusted.
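The correlation-ID item in the checklist can be illustrated with a Python analogue. This does not reproduce Kamon's TraceContext API; it is a hypothetical sketch using the standard library's `contextvars` and `logging` modules to show the idea: assign one ID per incoming request, carry it implicitly through every call, and stamp it on every log line so that centralized logging (ELK) can stitch a request's path back together.

```python
import contextvars
import logging
import uuid

# Hypothetical correlation ID, carried implicitly through sync and async calls.
trace_id = contextvars.ContextVar("trace_id", default="-")


class TraceIdFilter(logging.Filter):
    """Stamp every log record with the current trace ID."""

    def filter(self, record):
        record.trace_id = trace_id.get()
        return True


def call_downstream(payload, logger):
    logger.info("querying candidates for %s", payload)  # inherits the same ID


def handle_request(payload, logger):
    token = trace_id.set(uuid.uuid4().hex)  # one new ID per incoming request
    try:
        logger.info("received %s", payload)
        call_downstream(payload, logger)
        return trace_id.get()
    finally:
        trace_id.reset(token)  # never leak the ID into the next request


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.addFilter(TraceIdFilter())
    handler.setFormatter(logging.Formatter("%(trace_id)s %(message)s"))
    log = logging.getLogger("personalization")
    log.setLevel(logging.INFO)
    log.addHandler(handler)
    handle_request("category=shoes", log)
```

Both log lines for a request carry the same ID, and the ID is reset when the request finishes.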