I have been meaning to write up some of the best practices I have seen people successfully deploy while designing distributed systems (going back about two years), and wanted to share some of them:
- Observability (telemetry data): Logs, metrics, and traces are the three pillars of observability. (Traces follow individual requests through a system, illustrating the whole ecosystem a request passes through.) How many times has a short URL been used? What was the user's location? Where are we going to store these statistics? This is one module that should be present in every distributed, scalable architecture design, for a simple reason: tracking each user request in a distributed system composed of microservices is very hard in the absence of a tracking mechanism. Just as we do profiling in the kernel for operating systems, we want to use tracing to understand where each request is spending its time and why. Prometheus and Grafana are two open-source systems with which we can monitor each request visually. Prometheus is used for monitoring and alerting: it scrapes and stores time-series data. Use the client libraries provided by Prometheus to instrument application code, and use Alertmanager to handle alerts. It is very useful for machine-centric monitoring and for querying time-series numeric data in a microservices architecture. The data from Prometheus is graphed using Grafana. The only issue with the above is that it requires a lot of coding and is time consuming. If the platform is hosted on the cloud, we could use an API gateway that takes care of a lot of this so the user does not have to worry about it. We can see which microservice is calling a database and whether it is misbehaving, effectively DDoSing the DB, exceeding its limits, and leaving the database unresponsive.
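To make the Prometheus part concrete, here is a minimal sketch (assuming the official `prometheus_client` Python library) of instrumenting a hypothetical short-URL service with a counter and a histogram; the metric names, labels, and port are illustrative, not a prescribed schema:

```python
# Minimal sketch: exposing request counts and latency from a short-URL service.
from prometheus_client import Counter, Histogram, start_http_server
import time

# Counts how many times each short URL was resolved, broken down by user location.
REDIRECTS = Counter(
    "short_url_redirects_total",
    "Number of times a short URL was resolved",
    ["short_code", "country"],
)

# Tracks where each request spends its time, similar to profiling.
RESOLVE_LATENCY = Histogram(
    "short_url_resolve_seconds",
    "Time spent resolving a short URL",
)

def resolve(short_code: str, country: str) -> str:
    with RESOLVE_LATENCY.time():
        REDIRECTS.labels(short_code=short_code, country=country).inc()
        # ... look up the long URL in the datastore (omitted) ...
        return "https://example.com/long-url"

if __name__ == "__main__":
    start_http_server(8000)   # Prometheus scrapes http://<host>:8000/metrics
    while True:
        resolve("abc123", "US")
        time.sleep(1)
```

Prometheus would then scrape the `/metrics` endpoint on port 8000, and Grafana would graph the resulting series.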
- Spark accumulators & Hadoop counters: Accumulators in Spark are variables used for aggregating information across executors (operations like sum and max, but not average). We use them to count things like the number of blank log lines, the number of times the network failed, etc. Spark guarantees accumulator updates only inside actions, not transformations: if a task is restarted and the lineage recomputed, updates made inside actions are not applied again, so they are counted only once. An accumulator update placed inside map()/filter() won't actually happen until some action runs on the RDD (computation inside transformations happens lazily). We should use tools for tracing and monitoring these counters; one such tool is Zipkin, which attaches a unique trace ID to each request and tracks it across microservices.
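A minimal PySpark sketch of the blank-line counter described above; the input path is illustrative. Note that the accumulator only moves once an action runs, and only updates made inside actions are guaranteed to be counted exactly once on task retries:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("blank-line-counter").getOrCreate()
sc = spark.sparkContext

blank_lines = sc.accumulator(0)   # driver-side aggregate, written to by executors

def tag_blank(line):
    if line.strip() == "":
        blank_lines.add(1)        # inside a transformation: lazy, and may be
                                  # re-applied if the task is retried
    return line

lines = sc.textFile("hdfs:///logs/app.log").map(tag_blank)

# Nothing has run yet; the accumulator stays at 0 until an action triggers the job.
total = lines.count()
print("total lines:", total, "blank lines:", blank_lines.value)
```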
- MySQL sharding: How do we get MySQL to support millions of QPS (queries per second)? Scaling MySQL beyond 1 TB can be a challenge in itself; past that point, performance issues start hitting the database. Manually coding sharding with good engineering practices is feasible but hard and error prone. We can put some sharding logic in application code, but it's not easy, and resharding and then modifying the SQL queries' WHERE clauses is not scalable in the long run as data grows exponentially (see the sketch below for what this looks like). One way to scale MySQL is to use the open-source Vitess. It's a "sharding middleware" that sits above MySQL and provides auto-sharding, eliminating manual sharding and the need to modify WHERE clauses in SQL queries. Vitess will also kill() misbehaving queries (a client effectively DDoSing the DB). If a query fetches more rows than it should, Vitess throws an error, keeping the MySQL system healthy. Another clever solution Vitess has for reads: if multiple queries are hitting the same data/row, Vitess holds off all queries but one, lets it finish, and then shares that one query's result with the others, acting like a cache!
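For contrast, here is a rough sketch of the application-level sharding that Vitess replaces (hash the shard key, pick a MySQL host, and keep every WHERE clause consistent with that routing); the hosts, credentials, and the `pymysql` driver are illustrative assumptions:

```python
import hashlib
import pymysql

# Illustrative shard map; resharding means changing this list AND migrating data.
SHARDS = [
    {"host": "mysql-shard-0.internal", "db": "app_0"},
    {"host": "mysql-shard-1.internal", "db": "app_1"},
    {"host": "mysql-shard-2.internal", "db": "app_2"},
    {"host": "mysql-shard-3.internal", "db": "app_3"},
]

def shard_for(user_id: int) -> dict:
    # Hash the shard key so users spread evenly across shards.
    digest = hashlib.md5(str(user_id).encode()).hexdigest()
    return SHARDS[int(digest, 16) % len(SHARDS)]

def get_orders(user_id: int):
    cfg = shard_for(user_id)
    conn = pymysql.connect(host=cfg["host"], user="app",
                           password="not-a-real-password", database=cfg["db"])
    try:
        with conn.cursor() as cur:
            # The WHERE clause must always include the shard key, otherwise the
            # query silently returns partial results from a single shard.
            cur.execute("SELECT id, total FROM orders WHERE user_id = %s", (user_id,))
            return cur.fetchall()
    finally:
        conn.close()
```

Every new query path has to repeat this routing logic, and changing the shard count means migrating data and touching application code, which is exactly the burden the middleware removes.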
- How do we scale reads? In the Vitess-backed MySQL cluster above, we add read replicas. Without Vitess, the application then has to load balance across the read replicas itself; Vitess does this for us. If one replica goes down, it balances traffic uniformly across the others. And if a master goes down, it promotes a replica to be the new master and continues serving traffic as if nothing happened, with essentially no downtime.
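A minimal sketch of how reads might flow through vtgate over the plain MySQL protocol, assuming an illustrative endpoint and keyspace; targeting `keyspace@replica` asks Vitess to route the query to a healthy read replica, so the application never tracks individual replicas or failovers itself:

```python
import pymysql

# One endpoint (vtgate), not an individual MySQL replica; host/port are illustrative.
conn = pymysql.connect(
    host="vtgate.internal",
    port=15306,
    user="app",
    password="not-a-real-password",
    database="commerce@replica",   # @replica => route reads to replicas
)

with conn.cursor() as cur:
    cur.execute("SELECT id, total FROM orders WHERE user_id = %s", (42,))
    print(cur.fetchall())
conn.close()
```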
- Using multiple east-west traffic management platforms: Normally people use Istio for managing east-west traffic within a cluster. In the presence of multiple clusters, we can build what is effectively a virtual mesh, with each cluster running its own Istio instance, or one cluster using Istio and another using a different approach (for example, library-based circuit breaking with Hystrix). The issue now is that each cluster has its own sources, destinations, and policy rules; with two clusters we have multiple sources, multiple destinations, and more than one set of policy rules, which leads to a lot of problems. One recommendation is to avoid using two different east-west traffic platforms; another is to use one cluster and manage traffic at a higher level so that sources, destinations, and policy rules do not conflict.
- Hotspot problem in sharding: Sharding is a very common way to achieve high performance. One of the key problems is the shard key: how do we decide it, and what should be used as the key? One common approach is to shard by user_id; another is to shard by content_id. A third is a hybrid approach that also mixes in the epoch time (the number of seconds elapsed since 1 Jan 1970). This takes care of the hotspot problem where a user becomes very popular, or a lot of requests come from a single user, leading to imbalanced shards with one shard getting far more traffic than the others. With the hybrid key, each request has the epoch time appended to it, making it unique.
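A small sketch of such a hybrid shard key, assuming an illustrative shard count and key layout; the epoch timestamp spreads a hot user's requests across shards over time:

```python
import hashlib
import time

NUM_SHARDS = 64   # illustrative

def shard_key(content_id: str, user_id: int) -> str:
    epoch = int(time.time())          # seconds since 1 Jan 1970
    return f"{user_id}:{content_id}:{epoch}"

def shard_for(key: str) -> int:
    # Hash the composite key and map it onto a shard.
    digest = hashlib.sha1(key.encode()).hexdigest()
    return int(digest, 16) % NUM_SHARDS

key = shard_key("video-9f3a", user_id=12345)
print(shard_for(key))   # the same user's requests land on different shards over time
```

The trade-off is that reading back all of one user's data may now require fanning out to several shards.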
- Container orchestration: The most popular orchestrator is Kubernetes, but there are others as well, namely Docker Swarm, Apache Mesos, and Red Hat OpenShift. Kubernetes is very good at keeping the container infrastructure healthy by periodically probing each container to ensure it is healthy. If the orchestrator doesn't get a response to a health probe, k8s (as it's popularly called) removes, replaces, and restarts that container. But there are some finer details regarding k8s that can become an issue while operating a large cluster of containers. One of them is autoscaling: if average CPU across the pods exceeds a target (e.g. 70%), the k8s controller (the Horizontal Pod Autoscaler) increases the number of pods up to a configurable maximum, and scales it back down toward a configured minimum as load drops (a sketch of this scaling rule follows the list below). Upscaling and downscaling take time, and that lag is where one has to code in logic at the application level. Some problems with k8s I have seen are:
- Kubernetes won’t automatically guarantee that resources are properly allocated between different workloads running in a cluster. To enforce that, you need to configure resource quotas manually.
- Kubernetes is not very good at helping you optimize your costs. It doesn’t notify you if the servers in a cluster are only being used at e.g. 10% capacity, which may mean you’re wasting money on an over-provisioned infrastructure.
- When the scheduler decides to move a pod to another node to free up resources on a given (virtual or physical) node, the pod is destroyed and recreated (the same pod with a different name, on a different node). This is also how we horizontally scale an application. For stateless microservices this is fine, since the state is stored in the database and extra instances can be taken down without affecting application availability; that's why people call these services "ephemeral". But what about all the operational metrics we collected? Do we lose all of those? Unfortunately, anything kept locally in the pod is lost unless we explicitly export it from the application code, which is why observability is very tricky and hard in a k8s cluster.
- Common k8s problems like crash loops are very hard to debug given the nature of stateless microservices, ephemeral pods, and services scaling up and down on demand. A common scenario when upscaling a cluster: we have x amount of memory available and the container needs x-10, so it gets scheduled and runs on the cluster. But once the container's requirement exceeds x, it is no longer allowed to run (it gets OOM-killed), the application crashes, and k8s restarts it, leading to an infinite start --> crash --> restart loop (a CrashLoopBackOff).
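For the autoscaling point above, here is a sketch of the scaling rule the Horizontal Pod Autoscaler applies, desired = ceil(current * currentUtilization / targetUtilization), clamped to configured bounds; the 70% target and the replica limits are illustrative, and the real controller adds tolerances and stabilization windows on top of this:

```python
import math

def desired_replicas(current_replicas: int,
                     current_cpu_pct: float,
                     target_cpu_pct: float = 70.0,
                     min_replicas: int = 2,
                     max_replicas: int = 10) -> int:
    # Core HPA rule: scale the replica count by the ratio of observed to target utilization.
    desired = math.ceil(current_replicas * (current_cpu_pct / target_cpu_pct))
    # Clamp to the configured bounds.
    return max(min_replicas, min(max_replicas, desired))

print(desired_replicas(4, 95.0))   # scale out: 4 pods at 95% CPU -> 6 pods
print(desired_replicas(4, 20.0))   # scale in:  4 pods at 20% CPU -> 2 pods (min)
```

The gap between when this number changes and when new pods are actually ready to serve traffic is the lag the application logic has to tolerate.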