use current node load estimation when placing search jobs #6390
trinity-1686a wants to merge 10 commits into
| fn compute_split_cost(split_metadata: &SplitMetadata) -> usize { | ||
| pub(crate) fn compute_split_cost(num_docs: u64) -> usize { | ||
| // TODO this formula could be tuned a lot more. The general idea is that there is a fixed | ||
| // cost to searching a split, plus a somewhat-linear cost depending on the size of the split |
This should account for whether the request has aggregations, or ideally use a cost derived from the query.
I'm not sure exactly how to factor that in; we don't have a good cost model for aggregations.
A simple count() group by X on a low-cardinality field usually isn't very expensive, but a cardinality(Y) group by Z where both fields are high-cardinality is much more expensive, though I have no idea by how much. It also depends on the selectivity of the query.
We could probably get a very rough estimate from the query AST. Can you add a TODO to eventually track aggregation cost and estimate query selectivity?
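To make the shape of the cost discussed in this thread concrete, here is a minimal sketch of a "fixed cost plus roughly linear cost" formula. The fixed term of 5 follows the constant mentioned later in this conversation; the divisor, the function names, and the aggregation multiplier are assumptions made for illustration and are not taken from the PR.

```rust
/// Fixed per-split overhead. The value 5 echoes the constant mentioned in
/// the review discussion; treat it as illustrative.
const FIXED_SPLIT_COST: usize = 5;

/// Hypothetical scale: one extra cost unit per this many documents.
const DOCS_PER_COST_UNIT: u64 = 100_000;

/// Fixed overhead plus a term that grows linearly with the document count.
pub fn split_cost_sketch(num_docs: u64) -> usize {
    FIXED_SPLIT_COST + (num_docs / DOCS_PER_COST_UNIT) as usize
}

/// Hypothetical extension suggested by the review: make the cost depend on
/// whether the request carries aggregations. The 2x factor is a placeholder,
/// not a measured value.
pub fn split_cost_with_aggs_sketch(num_docs: u64, has_aggregations: bool) -> usize {
    let base = split_cost_sketch(num_docs);
    if has_aggregations {
        base * 2
    } else {
        base
    }
}
```

With these placeholder constants, an empty split costs 5 and a split of one million documents costs 15, or 30 when aggregations are present. A real model would also need to account for aggregation type, field cardinality, and query selectivity, as noted above.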
| // Seed each candidate node with its current load so the placer avoids | ||
| // routing work to already-loaded nodes. If a node fails to report its | ||
| // load (error or timeout), `load` stays `None`: we still route work | ||
| // there if all other nodes are overloaded, but we prefer reachable |
Maybe we should return an error instead if every node is either unreachable or overloaded.
| } | ||
| async fn get_load(&self) -> usize { | ||
| self.searcher_context.search_permit_provider.get_load() |
Would it be useful to add a metric to track the load per node?
| const GET_LOAD_TIMEOUT: Duration = Duration::from_millis(200); | ||
| let load_futures = candidate_nodes.iter_mut().map(|node| { | ||
| let mut client = node.client.clone(); | ||
| async move { tokio::time::timeout(GET_LOAD_TIMEOUT, client.get_load()).await } |
I wonder if we should cache the load for 1 second to reduce traffic.
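A short-lived cache like the one suggested here could look roughly like the sketch below. `CachedLoad` and its methods are hypothetical names, not part of the PR; the clock is passed in explicitly so the expiry logic is easy to test.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-node cache holding the last reported load for a short
/// time-to-live, so repeated placements can skip the load request.
pub struct CachedLoad {
    ttl: Duration,
    /// When the load was fetched, and the value that was reported.
    entry: Option<(Instant, usize)>,
}

impl CachedLoad {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entry: None }
    }

    /// Returns the cached load if it is younger than `ttl` at time `now`.
    pub fn get(&self, now: Instant) -> Option<usize> {
        match self.entry {
            Some((fetched_at, load))
                if now.saturating_duration_since(fetched_at) < self.ttl =>
            {
                Some(load)
            }
            _ => None,
        }
    }

    /// Records a freshly fetched load.
    pub fn put(&mut self, now: Instant, load: usize) {
        self.entry = Some((now, load));
    }
}
```

The trade-off is staleness: with a 1 second TTL, several placement rounds may act on the same load value, so a burst of requests could pile onto a node that looked idle when it was last sampled.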
| let total_load: usize = jobs.iter().map(|job| job.cost()).sum(); | ||
| // Compute `target_load` using only reachable nodes (those with a known |
Braindump:
I think there's a risk here with regard to caching.
Assume node A has the highest affinity for split X.
=> We can assume node A has its caches filled for split X.
Scenario: high load.
node A = 110, node B = 100.
=> We end up placing queries for split X on node B. This has a higher cost because node B needs to fill its caches for split X (2x query cost?). The formula we use always adds a constant 5 for filling caches.
This may cause cache evictions on node B, which increases the real cost further.
=> higher total load on the cluster
The query will take longer on node B, which will report the cost for longer, so this has self-correcting properties.
We can expect some splits to be queried more often because they are newer, so we want more nodes serving them (for some time), and this goes in the right direction.
When placing jobs, first query all nodes for their current load, then bias placement toward less-loaded nodes to even out load across the cluster.
This addresses a problem where some nodes could be overloaded while others were underloaded, causing queueing even though not all nodes were at max capacity.
In testing, this slightly improved p50+ latency under constant light load and increased the max QPS reachable before latency explodes. Metrics also showed all searchers busy, whereas previously some were only working part of the time.
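The placement idea described above can be sketched as a simplified greedy pass. This is an illustration under stated assumptions, not the PR's implementation: `NodeSketch`, `place_jobs_sketch`, and the per-job affinity lists are hypothetical, and unlike the PR (which may still route to a node with unknown load when the others are overloaded), this sketch only uses such nodes when no node reported a load at all.

```rust
/// Hypothetical node record for this sketch.
pub struct NodeSketch {
    /// Load reported by the node; `None` if the request failed or timed out.
    pub reported_load: Option<usize>,
    /// Total cost of the jobs placed on this node during this round.
    pub assigned: usize,
}

impl NodeSketch {
    /// Reported load plus newly assigned cost, if the load is known.
    fn known_total(&self) -> Option<usize> {
        self.reported_load.map(|load| load + self.assigned)
    }
}

/// Places each job and returns the chosen node index per job.
/// Each job is `(cost, node indices ordered by decreasing affinity)`.
/// Assumes `nodes` is non-empty and every affinity list is non-empty.
pub fn place_jobs_sketch(
    nodes: &mut [NodeSketch],
    jobs: &[(usize, Vec<usize>)],
) -> Vec<usize> {
    let reachable = nodes.iter().filter(|n| n.reported_load.is_some()).count();
    let current: usize = nodes.iter().filter_map(|n| n.reported_load).sum();
    let incoming: usize = jobs.iter().map(|(cost, _)| *cost).sum();
    // Per-node budget if all load were spread evenly over reachable nodes
    // (rounded up). With no reachable node the budget is never consulted.
    let target = if reachable == 0 {
        usize::MAX
    } else {
        (current + incoming + reachable - 1) / reachable
    };

    let mut placement = Vec::with_capacity(jobs.len());
    for (cost, affinity) in jobs {
        let cost = *cost;
        // First choice: highest-affinity reachable node that stays in budget.
        let within_budget = affinity.iter().copied().find(|&i| {
            nodes[i]
                .known_total()
                .map_or(false, |total| total + cost <= target)
        });
        // Otherwise: least-loaded reachable node, then the top-affinity node.
        let chosen = within_budget
            .or_else(|| {
                affinity
                    .iter()
                    .copied()
                    .filter(|&i| nodes[i].reported_load.is_some())
                    .min_by_key(|&i| nodes[i].known_total())
            })
            .unwrap_or(affinity[0]);
        nodes[chosen].assigned += cost;
        placement.push(chosen);
    }
    placement
}
```

For example, with reported loads of 10, 0, and unknown, and four jobs of cost 5 that all prefer node 0, the budget is 15: the first job stays on node 0 and the other three go to node 1, leaving both reachable nodes at a total of 15 and the unreachable node untouched.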
future improvements: