http://highscalability.com/blog/2015/9/14/how-uber-scales-their-real-time-market-platform.html
http://allenlsy.com/uber-realtime-architecture
http://highscalability.com/blog/2012/6/18/google-on-latency-tolerant-systems-making-a-predictable-whol.html
Video
http://www.infoq.com/br/presentations/scaling-uber-with-nodejs
http://www.infoq.com/presentations/uber-market-platform
Architecture Overview
What drives it all are riders and drivers on their mobile phones running native applications.
The backend is primarily servicing mobile phone traffic. Clients talk to the backend over mobile data and the best effort Internet.
Clients connect to the dispatch system which matches drivers and riders, the supply and demand.
Dispatch is written almost entirely in Node.js.
Maps/ETA (estimated time of arrival). For Dispatch to make an intelligent choice it’s necessary to get maps and routing information.
Street maps and historical travel times are used to estimate current travel times.
The language depends largely on which system is being integrated with, so there’s Python, C++, and Java.
Databases. A lot of different databases are used.
The oldest systems were built on Postgres.
Redis is used a lot. Some are behind Twemproxy. Some are behind a custom clustering system.
MySQL
Uber is building their own distributed column store that’s orchestrating a bunch of MySQL instances.
Some of the Dispatch services are keeping state in Riak.
Post trip pipeline. A lot of processing must happen after a trip has completed.
Collect ratings.
Send emails.
Update databases.
Schedule payments.
Written in Python.
Money. Uber integrates with many payment systems.
- Geo by supply makes a coarse first-pass filter to get nearby candidates that meet the requirements.
- Then the list and requirements are sent to routing / ETA to compute an ETA for each candidate, measuring how near they are by the road system rather than geographically.
- Sort by ETA then send it back to supply to offer it to a driver.
- In airports they have to emulate a virtual taxi queue. Supply must be queued in order to take into account the order in which they arrive.
- The problem is the data for in-process trips may not be in the backup datacenter. Rather than replicate data they use driver phones as a source of trip data.
- The Dispatch system periodically sends an encrypted State Digest down to driver phones. Now suppose there’s a datacenter failover. The next time a driver phone sends a location update, the Dispatch system will detect that it doesn’t know about this trip and ask the phone for the State Digest. The Dispatch system then updates itself from the State Digest and the trip keeps going as if nothing happened.
- Node runs in a single process, so some method must be devised to run Node on multiple CPUs on the same machine and across multiple machines.
A good solution is to have backup requests with cross-server cancellation. This is baked into TChannel as a first-class feature. A request is sent to Service B(1) along with the information that the request is also being sent to Service B(2). Then, after some delay, the request is sent to Service B(2). When B(1) completes the request it cancels the request on B(2). Because of the delay, in the common case B(2) performs no work. But if B(1) does fail, then B(2) will process the request and return a reply with lower latency than if B(1) had been tried first, a timeout had occurred, and then B(2) had been tried.
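A minimal sketch of the backup-request idea, using Python's asyncio. The names `call_replica` and `with_backup` are made up for illustration and are not TChannel APIs. One simplification to note: here the caller cancels the slower request locally, whereas the text describes B(1) cancelling the request on B(2) directly.

```python
import asyncio


async def call_replica(name: str, latency: float, fail: bool = False) -> str:
    """Hypothetical stand-in for an RPC to one instance of Service B."""
    await asyncio.sleep(latency)
    if fail:
        raise ConnectionError(f"{name} failed")
    return f"reply from {name}"


async def with_backup(primary, backup, backup_delay: float) -> str:
    """Call primary now and backup after backup_delay; the first success wins."""

    async def delayed_backup():
        # In the common case this sleep is cancelled before backup() ever runs.
        await asyncio.sleep(backup_delay)
        return await backup()

    pending = {
        asyncio.create_task(primary()),
        asyncio.create_task(delayed_backup()),
    }
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if task.exception() is None:
                    return task.result()
        raise ConnectionError("all replicas failed")
    finally:
        # Cancel whatever is still outstanding so no redundant work is done.
        for task in pending:
            task.cancel()
```

When the primary answers before `backup_delay` elapses, the backup is never contacted. When the primary fails quickly, the caller waits only for the remainder of the delay plus the backup's latency, rather than for a full timeout followed by a retry.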
http://www.infoq.com/br/presentations/a-arquitetura-de-sistemas-de-tempo-real-da-uber
Membership changes
Separate failure detection from membership updates
• Do not rely on a single peer for failure detection
• Membership changes via gossip-like protocols
SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol
Data Center Failure
• How do we replicate trip data?
• Constant updates
• Write heavy
• Temporal, and minimal loss expected
Key insight: each driver application already has the trip data
Realtime Trip Replication (RTTR)
RTTR
• A key-value store on the phone
• A timeseries store for partner gps points on the phone
• Piggyback on existing communication protocols
• All data encrypted
Data in Realtime
Ops need realtime analytics
Dispatch needs data for decisions
An event-based data platform
state driver_arrived
from_state driver_accepted
timestamp 13244323342
latitude 12.23
longitude 30.00
• Reliable replication of states
• Canonical state representation
• Domain specific APIs
Canonical representation of states
Consistency matters
• Normalize your events if possible. E.g., no PII
• More generally: keep apps robust by minimizing assumptions
• Introduce context to correlate events.
E.g., trip ID, root service
• Static join
• State tracking
Aggregation
Tip: Emit events for edges instead of nodes
Option 1: two events
• driver_dispatched
• driver_rejected
Option 2: single event
• driver_rejected
• parent: driver_dispatched
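A rough sketch of why edge events help aggregation. Each event carries both the previous state and the new state, so a rate can be computed by counting events without joining separate event streams. The `StateEvent` class, its field names (beyond `state`, `from_state`, and `timestamp`, which appear in the sample record), and `rejection_rate` are assumptions made for illustration.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class StateEvent:
    trip_id: str      # assumed context field used to correlate events
    from_state: str   # the "parent" state: where the transition started
    state: str        # the state that was entered
    timestamp: int


def transition_counts(events):
    """Count each (from_state, state) edge."""
    return Counter((e.from_state, e.state) for e in events)


def rejection_rate(events):
    """Share of transitions out of driver_dispatched that ended in rejection."""
    counts = transition_counts(events)
    dispatched = sum(
        n for (src, _), n in counts.items() if src == "driver_dispatched"
    )
    rejected = counts[("driver_dispatched", "driver_rejected")]
    return rejected / dispatched if dispatched else 0.0
```

With node-style events (Option 1), the same figure would need a join between `driver_dispatched` and `driver_rejected` events on some shared key.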
Apache Storm for Event Processing
Lessons Learned
• Know your data
• Really small graph
• Lots of them over time
• No need to have a graph database
Boolean query is a must have
• Pre-aggregation is nice, but keep all the dimensions for GROUP BY queries
• Build a query service first
Domain Specific Query Service
Separation of concerns
• Application teams care about business logic
• Someone has to worry about optimization, caching, indexing, scaling, etc.
/driverAcceptanceRate?geo_dist(10,[37,22])&time_range(2015-02-04,2015-03-06)&aggregate(timeseries(7d))&eq(msg.driverId,1)
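To make the shape of such a query concrete, here is a small sketch that splits the example URL into an endpoint name and a list of operator clauses. The function `parse_query` is hypothetical. How Uber's query service actually parses or executes these clauses is not described in the notes.

```python
import re


def parse_query(url: str):
    """Split '/endpoint?op1(args)&op2(args)' into (endpoint, [(op, raw_args), ...])."""
    path, _, query = url.partition("?")
    clauses = []
    for clause in query.split("&"):
        match = re.fullmatch(r"(\w+)\((.*)\)", clause)
        if match is None:
            raise ValueError(f"not an operator clause: {clause!r}")
        # Arguments are kept as raw text; interpreting them is left to the service.
        clauses.append((match.group(1), match.group(2)))
    return path.lstrip("/"), clauses
```

Application teams would only write the clauses. Concerns such as caching and indexing would sit behind whatever evaluates the parsed form.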
http://www.infoq.com/br/presentations/scaling-uber-with-nodejs
Trip State Machine (Extended)
Version 1
• PHP dispatch
• Outsourced to remote contractors in Midwest
• Half the code in Spanish
• Flat file
• Lifetime: 6-9 months
Tradeoffs
• Learning curve
• Scalability
• Performance
• Library ecosystem
• Database drivers
• Documentation
• Monitoring
• Production operations
Version 2
• Lifetime: 9 months
• Developed in house
• Node.js application
• Prototyped on 0.2
• Launched in production with 0.4
• MongoDB datastore
Expect the unexpected
Version 3
MongoDB did not scale with the volume of GPS logs (global write lock)
• Swapped MongoDB for Redis and flat files
Decoupling storage of different types of data
• Node.js MongoDB client failed to recognize replica set topology changes
Be wary of immature client libraries
Focus on driving business value
Capacity planning, forecasting, and load testing are your friends
Measure everything
Version 4
Nickname: The Grid
• Multi-process dispatch
• Peer assignment
• Redis is now considered the source of truth
• Use the Lua interpreter for atomic operations
• Fan out to all city peers to find nearby cars
Version 5
Break out services as needed
Understand V8 to optimize Node.js applications
Stateless applications…
No single points of failure…
Replicated data stores…
Dynamic application topology…
Version 6
Version 5 - haproxy
Do the obvious
Pros
• every application is horizontally scalable
• flexible, partially dynamic topology
• failure recovery manual in the worst case
• supports primary business case very well
• conservative estimates 1-2 years of runway
Never be satisfied
Cons
• what happens when a city outscales the capacity of a single Redis instance?
• who wants to wake up in the middle of the night for server crashes?
• what about future business use cases?
World Class
• city agnostic dispatch application
• “stateless” applications
• scale to 100x current load
• flexible data model
Every now and then it’s okay to bend the rules
So why did we stick with Node.js?
• JavaScript is easy to learn
• Simple interface with thorough documentation
• Lends itself to fast prototyping
• Asynchronous, nimble
• Avoid concurrency challenges
• Increasingly mature module ecosystem
How to win with Node.js?
• measure everything - particularly response times and event loop lag
• learn to take heap dumps to debug memory issues
• strace, perf, flame graphs are necessary tools for improving performance
• small, reusable components to reduce duplication
Uber Unveils its Realtime Market Platform
The new dispatch system has two major services: supply, the drivers, and demand, the riders. These services track all the capabilities and the state machines of supply and demand. For instance, the supply service knows how many seats a vehicle has or whether it can fit a wheelchair. The dispatch system has a third service, called Disco (Dispatch Optimization), whose main function is to match supply and demand. Disco enables Uber to "look into the future" and to use information as it comes in. For instance, the old dispatch system only looked at currently available supply. As most partners are usually busy, this approach allowed Uber to maintain a global index. The new dispatch system is more efficient, but it requires much more data. Uber wants this new system to handle one million writes a second and a much higher read rate, so it needed to shard its data.
To achieve that kind of scale, Uber chose to use Google's S2 Geometry Library. S2 is able to split a sphere into cells, each with an id. The Earth is roughly spherical, so S2 can represent each square centimeter of it with a 64-bit integer. S2 has two important properties for Uber: it is possible to define each cell's resolution and it is possible to find the cells that cover a given area. Uber uses 3.31 km² cells to shard its data. All this new data enables Uber to reduce wait times, extra driving by partners and the overall estimated times of arrival (ETA). So, what happens when a rider wants to use Uber? Uber uses the rider location and S2's area coverage function to look for drivers that can be matched with a rider. Uber then chooses the shortest ETA, taking into account not only the drivers who are available, but also those that will become available in time to pick up the rider.
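The two-stage lookup described above (coarse filter by covering cells, then ranking by ETA) can be sketched as follows. This is not the S2 API: a flat latitude/longitude grid stands in for S2 cells, and `CELL_DEG`, `SupplyIndex`, and the `eta_seconds` callback are all assumptions made for illustration.

```python
import math
from collections import defaultdict

CELL_DEG = 0.02  # assumed grid size in degrees; a stand-in for an S2 cell level


def cell_id(lat: float, lng: float):
    """Map a position to the grid cell that contains it."""
    return (math.floor(lat / CELL_DEG), math.floor(lng / CELL_DEG))


def covering_cells(lat: float, lng: float, rings: int = 1):
    """Cells covering the area around a point: its own cell plus neighbours."""
    row, col = cell_id(lat, lng)
    span = range(-rings, rings + 1)
    return [(row + dr, col + dc) for dr in span for dc in span]


class SupplyIndex:
    """Drivers bucketed by cell, so a lookup touches only a few cells."""

    def __init__(self):
        self._by_cell = defaultdict(dict)  # cell -> {driver_id: (lat, lng)}
        self._cell_of = {}                 # driver_id -> current cell

    def update(self, driver_id: str, lat: float, lng: float) -> None:
        old = self._cell_of.get(driver_id)
        if old is not None:
            del self._by_cell[old][driver_id]
        new = cell_id(lat, lng)
        self._by_cell[new][driver_id] = (lat, lng)
        self._cell_of[driver_id] = new

    def candidates(self, lat: float, lng: float):
        found = {}
        for cell in covering_cells(lat, lng):
            found.update(self._by_cell.get(cell, {}))
        return found


def match(index: SupplyIndex, lat: float, lng: float, eta_seconds):
    """Coarse filter by cell, then order candidates by road ETA, shortest first."""
    found = index.candidates(lat, lng)
    return sorted(found, key=lambda d: eta_seconds(found[d], (lat, lng)))
```

Because the cell id is also a natural shard key, each cell's drivers could live on a different node. This sketch keeps everything in one process.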
The dispatch system is mostly built with NodeJS, meaning that it is single-threaded. Uber wants to take advantage of all cores of a machine, but it also needs to add new nodes to the system with ease. Ranney also argues that servers need to be stateful, or else the datastores won't be able to cope with the load. Uber thus opted to treat all Dispatch processes the same, whether they are running on the same machine or not. They've built ringpop to handle this problem. Ringpop uses a consistent hash ring, also used by Amazon's Dynamo, memcached or Riak, to distribute state across nodes. To manage cluster membership and failure detection, ringpop uses SWIM, which stands for Scalable Weakly-consistent Infection-style Process Group Membership Protocol. It is the same gossip protocol that's used by HashiCorp's Serf. Ringpop uses TChannel, also built by Uber, as its RPC protocol.
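A minimal consistent hash ring, to illustrate the general technique rather than ringpop's own implementation. The `HashRing` class, the use of MD5, and the number of virtual points per node are assumptions chosen for the sketch.

```python
import bisect
import hashlib


class HashRing:
    def __init__(self, nodes=(), replicas: int = 100):
        self._replicas = replicas  # virtual points per node (assumed value)
        self._points = []          # sorted hash positions on the ring
        self._owner = {}           # hash position -> node name
        for node in nodes:
            self.add(node)

    @staticmethod
    def _hash(key: str) -> int:
        # MD5 is used only as a convenient, stable hash for this sketch.
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def add(self, node: str) -> None:
        for i in range(self._replicas):
            point = self._hash(f"{node}#{i}")
            self._owner[point] = node
            bisect.insort(self._points, point)

    def remove(self, node: str) -> None:
        self._points = [p for p in self._points if self._owner[p] != node]
        self._owner = {p: n for p, n in self._owner.items() if n != node}

    def lookup(self, key: str) -> str:
        """Return the node owning the first ring position at or after the key's hash."""
        if not self._points:
            raise LookupError("ring is empty")
        i = bisect.bisect(self._points, self._hash(key)) % len(self._points)
        return self._owner[self._points[i]]
```

The useful property is that removing a node only reassigns the keys that node owned. Keys owned by the surviving nodes stay where they are, which keeps state movement small when membership changes.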
TChannel is inspired by Finagle's multiplex RPC protocol, Mux, which was built by Twitter. Uber felt the need to create its own protocol mainly because it needed to support multiple languages (JavaScript and Python), tracing and encapsulation. Ranney told the audience that Uber is moving away from HTTP+JSON and towards Thrift over TChannel. Ranney claimed that TChannel is twenty times faster than HTTP when used in NodeJS.
Most of Uber's architectural choices are driven by availability and performance, as it is easy for drivers and riders to turn to the competition. At Uber, everything has to be retryable (and thus idempotent) and killable, including databases. Each piece of the system must be built on the assumption that the only way to shut down a process is by crashing. All these constraints also favour small services, so that if any one crashes the disruption is contained.
The proliferation of small services and their extreme distribution can have an impact on performance: the overall latency of a request is greater than or equal to the latency of the slowest component. Ranney likes the approach of Google's Jeff Dean on this subject. For instance, TChannel supports "backup requests with cross-server cancellation". This means that the same request might be sent to two instances of the same service, with a slight delay between the two. The first instance to reply handles cancelling the request on the second instance, to cut redundant work.
Uber's approach to data center failure is ingenious. No data is replicated across data centers, as that puts a lot of constraints on availability and consistency. Instead, Uber uses the drivers' phones to distribute the data. Given that the drivers' phones post location updates to the server every four seconds, the server periodically replies with an encrypted state digest. If a data center fails, the driver's phone will contact a new data center to post a location update. The new data center doesn't know anything about this particular driver, so it asks for the state digest and picks up from there.
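The failover flow can be sketched in a few lines. Everything here is illustrative: `Dispatch`, `DriverPhone`, and the digest layout are invented, encryption of the digest is omitted, and the digest is refreshed on every location update rather than periodically.

```python
import json


class DriverPhone:
    def __init__(self, driver_id: str):
        self.driver_id = driver_id
        self.state_digest = None  # opaque bytes held on behalf of the server


class Dispatch:
    """One datacenter's view of in-progress trips (in memory only)."""

    def __init__(self):
        self.trips = {}  # driver_id -> trip state

    def start_trip(self, phone: DriverPhone, trip_id: str) -> None:
        trip = {"trip_id": trip_id, "state": "driver_accepted"}
        self.trips[phone.driver_id] = trip
        phone.state_digest = json.dumps(trip).encode()

    def on_location_update(self, phone: DriverPhone, location) -> None:
        trip = self.trips.get(phone.driver_id)
        if trip is None and phone.state_digest is not None:
            # Unknown trip, e.g. after a failover: recover it from the phone.
            trip = json.loads(phone.state_digest)
            self.trips[phone.driver_id] = trip
        if trip is None:
            return  # driver has no active trip
        trip["location"] = list(location)
        # Reply with a fresh digest (the source says this is encrypted).
        phone.state_digest = json.dumps(trip).encode()
```

A second `Dispatch` instance that has never seen the driver recovers the trip on the first location update it receives, which is the behaviour the paragraph above describes.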
The dispatch system is mostly built with NodeJS, but Ranney mentioned Uber wants to switch to io.js, a NodeJS fork. Ranney also briefly talked about other components of Uber's architecture. Maps and ETAs are written in several languages, such as C++ and Java, due to the need to integrate with different kinds of services. All their business logic is written in Python. Uber is building their own column-oriented distributed data store but they also use Postgres, Redis, MySQL and Riak.
Videos
Designing for failure: How Uber scaled its realtime market platform - Matt Ranney (Uber)
A general theory of reactivity - Kris Kowal (Uber)
Real-time Engineering at Uber and the Evolution of an Event-Driven Architecture - Jeff Wolski
http://highscalability.com/blog/2016/9/28/how-uber-manages-a-million-writes-per-second-using-mesos-and.html
Uber uses a datacenter OS like Mesos. By statistically multiplexing services on the same machines you need 30% fewer machines, which saves money. Mesos was chosen because at the time Mesos was the only product proven to work with cluster sizes of 10s of thousands of machines.
- Performance is good: mean read latency: 13 ms and write latency: 25 ms, and P99s look good.
- For their largest clusters they are able to support more than a million writes/sec and ~100k reads/sec.
- Agility is more important than performance
They want to run everything on Mesos, including stateful services like Cassandra and Kafka.
- Mesos is a datacenter OS that allows you to program against your datacenter like it’s a single pool of resources.
- Uber has built their own sharded database on top of MySQL, called Schemaless. The idea is Cassandra and Schemaless will be the two data storage options in Uber. Existing Riak installations will be moved to Cassandra.
- A single machine can run services of different kinds.
- Statistically multiplexing services on the same machine can lead to needing 30% fewer machines. This is a finding from an experiment run at Google on Borg.
- If, for example, one service uses a lot of CPU, it matches well with a service that uses a lot of storage or memory, and the two services can be run efficiently on the same server. Machine utilization goes up.
- Uber has about 20 Cassandra clusters now and plans on having 100 in the future.
- Agility is more important than performance. You need to be able to manage these clusters and perform different operations on them in a smooth manner.
- Why run Cassandra in a container and not just on the whole machine?
- You want to store hundreds of gigabytes of data, but you also want it replicated on multiple machines and also across datacenters.
- You also want resource isolation and performance isolation across different clusters.
- It’s very hard to get all that in a single shared cluster. If you, for example, made a 1000 node Cassandra cluster it would not scale or it would also have performance interference across different clusters.
- Largest 2 clusters: more than a million writes/sec and ~100k reads/sec
- One of the clusters is storing the location that is sent out every 30 seconds by both the driver and rider apps.
- Mean read latency: 13 ms and write latency: 25 ms
- Mostly use the LOCAL_QUORUM consistency level (which gives strong consistency within the local datacenter when used for both reads and writes)
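A short worked example of why quorum reads plus quorum writes give strong consistency. A quorum is a majority of the replicas in the local datacenter, and a read is guaranteed to see the latest write whenever the read set and the write set must overlap. The replication factor of 3 used in the checks is an assumption. The notes do not state Uber's setting.

```python
def quorum(replication_factor: int) -> int:
    """Majority of replicas: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


def read_sees_latest_write(rf: int, read_replicas: int, write_replicas: int) -> bool:
    """True when any read set must intersect any write set (R + W > RF)."""
    return read_replicas + write_replicas > rf


rf = 3  # assumed replication factor per datacenter
q = quorum(rf)
print(q, read_sees_latest_write(rf, q, q))  # prints "2 True"
print(read_sees_latest_write(rf, 1, 1))     # prints "False"
```

With RF = 3, a quorum is 2 replicas, so one replica can be down without blocking reads or writes, and 2 + 2 > 3 guarantees overlap. Reading and writing at a single replica (1 + 1) does not.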
Mesos Backgrounder
- Mesos abstracts CPU, memory, and storage away from machines.
- You are not looking at individual machines, you are looking at and programming to a pool of resources.
- Linear scalability. Can run on 10s of thousands of machines.
- Highly available. Zookeeper is used for leader election amongst a configurable number of replicas.
- Can launch Docker containers or Mesos containers.
- Pluggable resource isolation. Cgroups memory and CPU isolator for Linux. There’s a Posix isolator. There are different isolation mechanisms for different OSes.
- Two level scheduler. Resources from Mesos agents are offered to different frameworks. Frameworks schedule their own tasks on top of these offers.
- Cassandra is operationally simple. It’s a homogeneous cluster. There’s no master. There are no special nodes in the cluster.
- Cassandra has a sufficiently rich data model. It has columns, composite keys, counters, secondary indexes, etc.
- Good integration with open source software. Hadoop, Spark, Hive all have connectors to talk to Cassandra.
Multi-datacenter support
- Independent installations of Mesos are set up in each datacenter.
- Independent instances of the framework are set up in each datacenter.
- The frameworks talk to each other and periodically exchange seeds.
- That’s all that is needed for Cassandra. By bootstrapping the seeds of the other datacenter the nodes can gossip the topology and figure out what the nodes are.
- Round trip ping latency between data centers is 77.8 ms.
- The asynchronous replication latency: P50: 44.69 ms; P95: 46.38 ms; P99: 47.44 ms.
http://highscalability.com/blog/2016/10/12/lessons-learned-from-scaling-uber-to-2000-engineers-1000-ser.html
It's a Conway's Law thing: you get so many services because that's the only way so many people can be hired and become productive.
Microservices are a way of replacing human communication with API coordination. Rather than people talking and dealing with team politics, it's easier for teams to simply write new code. Scaling the traffic is not the issue. Scaling the team and the product feature release rate is the primary driver.
The time when things are most likely to break is when you change them.
Microservices allow teams to be formed quickly and run independently. People are being added all the time. Teams need to be formed quickly and put to work on something where they can reason about the boundaries.
Own your own uptime. You run the code that you write. All the service teams are on call for the services they run in production.
Use the best tool for the job. But best in what way? Best to write? Best to run? Best because I know it? Best because there are libraries? When you dig into it, best doesn’t mean a lot.
What if it breaks? How do you troubleshoot? How do you figure out where in the chain of services the break occurred? How do you make sure the right people get paged and the right corrective actions are taken to fix the problem?
- Dispatch: When development was brought in house it was written in Node.js and is now moving to Go.
- Core Service: the rest of the system, was originally written in Python and is now moving to Go.
- Maps was eventually brought in house and those teams are using Python and Java.
- The Data Engineering team writes their code in Python and Java.
- The in-house Metric system is written in Go.
- You start to notice that’s a lot of languages. Microservices allow you to use lots of languages.
- Teams can write in different languages and still communicate with each other. It works, but there are costs:
- Hard to share code.
- Hard to move between teams. Knowledge built up on one platform doesn’t transfer to another platform. Anyone can learn of course, but there’s a switching cost.
- What I Wish I Knew: having multiple languages can fragment culture. By embracing the microservices everywhere you can end up with camps. There’s a node camp, a Go camp, etc. It’s natural, people organize around tribes, but there’s a cost to embracing the strategy of having lots of languages everywhere.
- At scale with lots and lots of people joining really quickly the weaknesses of HTTP start to show up. Like what are status codes for? What are headers for? What goes in the query string? Is this RESTful? What method is it?
JSON is great: you can look at it with your eyeballs and read it. But without types it’s a crazy mess, though not right away; the problems pop up later. When someone changes something and, a couple of hops downstream, a service was depending on some subtle interpretation of empty string versus null, or some type coercion in one language versus another, it causes a huge mess that takes forever to sort out. Types on interfaces would have fixed all of these problems.
RPCs are slower than procedure calls.
Politics happen whenever you make a decision that violates this property: Company > Team > Self.
What I Wish I Knew: if people are working on some kind of platform type feature it doesn’t sound good to hear Amazon has just released your thing as a service.
- You still try to rationalize why you should use your own private thing as a service.
- When there are problems, logging itself can make those problems worse by logging too much. The log needs backpressure to drop log entries when overloaded. Wish that had been put in the system earlier.
- The idea is to put pressure back on developers to log smarter, not harder.
- Uber created uber-go/zap for structured logging.
- Fanout causes a lot of performance problems.
- Distributed tracing is how you track down fanout problems. Without a way to understand a request's journey through the architecture it will be difficult to track down fanout problems.
- Uber is using OpenTracing and Zipkin.
- Another approach is to use logs. Each log entry has a common ID that threads all the services together.
- An example is given of a tricky case. The top level had a massive fanout, all to the same service. When you look at the service it looks good: every request is fast and consistent. The problem was that the top-level service got a list of IDs and was calling the service for each ID. Even concurrently that will take too long. Just use a batch command. Without tracing it would have been very hard to find this problem.
- Another example is a service that made many thousands of service calls. Though each call was fast the large number of them made the service slow. It turns out when traversing a list and changing a property it magically turned into a database request. Yet the database team says the database is working great because each operation is fast, but they will wonder why there are so many operations.
- The overhead of tracing can change the results. Tracing is a lot of work. One option is to not trace all requests. Trace a statistically significant portion of the requests. Uber traces about 1% of requests.
- What I Wish I Knew: Tracing requires cross-language context propagation.
- Because all these different languages are used with all these different frameworks, getting context about the request, like what user it is, are they authenticated, what geo-fence are they in, that becomes very complicated if there’s no place to put this context that will get propagated.
- Any dependent requests a service makes must propagate context even though they may not understand it. This feature would have saved so much time if it had been added a long time ago.
- Solution: run tests on production during off-peak hours.
- Causes lots of problems.
- It blows up all the metrics.
- You don’t want people to think there’s more load than there really is. To fix the problem it gets back to the context propagation problem.
- Make sure all test traffic requests have some context that says this is a test request, so metrics are handled differently. That has to be plumbed all the way through the system.
- What I Wish I Knew: what we really want to do is run load through all the services all the time because a lot of bugs only show up when traffic hits its peak.
- Want to keep systems near their peaks and then back off as real traffic increases.
- Wishes the system was long ago built to handle test traffic and account for it differently.
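A small sketch of the context-propagation idea behind test traffic. Each service forwards the request context unchanged, including keys it does not understand, and the metrics layer uses a flag in that context to count test requests separately. The service names, the `is_test` key, and the `record` helper are all hypothetical.

```python
from collections import Counter

metrics = Counter()


def record(name: str, ctx: dict) -> None:
    """Count a request, keeping test traffic out of the production numbers."""
    bucket = "test" if ctx.get("is_test") else "prod"
    metrics[(name, bucket)] += 1


def pricing_service(request: dict, ctx: dict) -> dict:
    record("pricing.requests", ctx)
    return {"fare": 10}


def trip_service(request: dict, ctx: dict) -> dict:
    record("trip.requests", ctx)
    # Forward the whole context, even keys this service never inspects
    # (here "geo_fence"), so downstream services still receive them.
    return pricing_service(request, dict(ctx))


trip_service({}, {"is_test": True, "geo_fence": "sf"})
trip_service({}, {"geo_fence": "sf"})
```

After these two calls each service has one request in its `test` bucket and one in its `prod` bucket, so load tests run in production do not inflate the real traffic figures.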
What I Wish I Knew: Not everybody likes failure testing, like Chaos Monkey, especially if you have to add it in later.
- What we should have done is made failure testing happen to you if you liked it or not. It’s just part of production. Your service just has to withstand random killings and slowings and perturbations of operations.
http://allenlsy.com/uber-realtime-architecture
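Use cases for Uber's real-time system:
- Matching riders to drivers
- Computing fares
- Estimating trip time, driver arrival time, etc.
To take a more detailed example, UberEATS is Uber's food delivery service. The real-time system also estimates delivery times for it. Factors to consider include:
- How busy the restaurant is at the moment
- How long the dish takes to prepare
- Traffic conditions on the road
- Roughly how many delivery vehicles are available for dispatch
All events from riders and drivers are collected by Kafka. Kafka uses the publish-subscribe (pub-sub) model. Communication between the microservices across Uber's system also goes through Kafka. Uber uses tools such as Samza or Flink for stream processing. Kafka is also used to record database schema changes.
The diagram in the original post shows the overall architecture of Uber's real-time system. Kafka events come mainly from the client apps, web APIs / services, and databases. Kafka then distributes the event messages to other components. Surge computes trip fares. ELK handles logging and provides data for debugging. Samza / Flink process real-time data, supporting data analysis and alerting. AWS S3 and Hadoop also record the messages for offline data analysis.
Uber's requirements for this system are:
- Low latency: API latency within 5 ms
- High availability: at least 99.99% availability, which means no more than about an hour of downtime per year (roughly 53 minutes)
- Data replication across multiple datacenters
- Support for auditing of messages
In Uber's Kafka architecture, the regional Kafka cluster in each region aggregates messages toward a central cluster. Within this, the Kafka REST proxy has a fallback, a secondary Kafka. This is because in Uber's Kafka use cases some needs demand very high availability, such as a user booking a ride, while other data may be lost but requires very low latency, such as logging data.
To achieve low latency and high availability, the two most important points are to use extensive batching and asynchronous processing at every step, and never to block the server.
For example, a queue message is sent from Uber's mobile app to Kafka through the proxy client library inside the app. When the proxy client receives a message, it acknowledges immediately and then puts the message into a buffer. When the buffer fills up, all the messages in it are packed into a batch and sent together to the Kafka proxy server. This achieves high throughput and low latency. Further along, the Kafka proxy server does the same: on receiving the batch it acknowledges immediately, then regroups the messages by their destination broker, buffers them, packs them into batches, and sends them to the regional Kafka.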
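The acknowledge-then-batch behaviour of the proxy client can be sketched as below. `BatchingProducer` and its parameters are invented for illustration and are not Uber's library. The sketch is synchronous for brevity, whereas the real client is described as non-blocking and asynchronous.

```python
class BatchingProducer:
    def __init__(self, send_batch, batch_size: int = 3):
        self._send_batch = send_batch  # callable that ships one batch downstream
        self._batch_size = batch_size  # assumed threshold for "buffer is full"
        self._buffer = []

    def produce(self, message) -> bool:
        """Buffer the message and acknowledge at once, before it reaches Kafka."""
        self._buffer.append(message)
        if len(self._buffer) >= self._batch_size:
            self.flush()
        return True

    def flush(self) -> None:
        """Send everything buffered so far as a single batch."""
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._send_batch(batch)


sent = []
producer = BatchingProducer(sent.append, batch_size=3)
for i in range(4):
    producer.produce(f"event-{i}")
# At this point one batch of three has been sent; "event-3" is still buffered.
```

The early acknowledgement is what makes the call fast, and it is also why a crash can lose whatever is still sitting in the buffer.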
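Uber implemented the proxy client library itself. Its features are:
- High throughput: non-blocking, asynchronous, batched
- When the Kafka server fails, it can cache messages locally, then gradually send them to the server once it recovers
- Topic discovery: similar in spirit to service discovery. Topic discovery automatically finds which topic is on which Kafka cluster
uReplicator was built to improve on Kafka's MirrorMaker. When a broker is removed, Kafka rebalances the partitions. When copying partitions from one cluster to another, MirrorMaker becomes very slow if a cluster already has many partitions (for example, more than 500).
Some special use cases, such as payments, are not suited to the buffer-plus-batch mechanism. In the flow described above, if the app or the proxy server crashes, the buffered data is lost.
Uber's Kafka therefore also has to support a synchronous mode. Messages such as payments are sent synchronously from the proxy client to the proxy server; they are not put into a buffer, and the proxy client does not acknowledge immediately. When the proxy server sends a message to the regional Kafka, at least three servers must return an ack before the proxy server returns an ack to the proxy client. Here latency is increased in order to guarantee strong consistency. Uber's system supports fine-tuning the mechanism at each step to find the configuration that suits it best.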