Monotonic binning with PAVA
In credit-risk modelling, a common requirement is to maintain monotonicity of the target variable with respect to risk drivers. Think PD modelling where the target is the probability of default and the risk drivers are customer characteristics and/or their financial situation.
A useful technique for this is monotonic binning. The idea is to bin the continuous variable in some user-defined way (e.g., evenly-spaced breakpoints or quantile-based), then merge consecutive bins so that their default rates are monotonic.
PAVA, or the pooled adjacent violators algorithm, is a classic method for solving optimization problems with monotonicity constraints, such as isotonic regression. In this post, we adapt its core idea to solve the monotonic-binning problem.
We’ve simplified the implementation as much as possible. What you see below is just the bare bones. Even though the code is short, it deserves some analysis for its complexity, which we’ll get to next.
PAVA
We maintain two dynamic states:
- A list of pools that represents current clusters
- A list of values that stores the statistics (number of defaults and total in each cluster)
We iteratively compare the current pool and the next one (based on their values):
- If the constraint is met, we advance by one index.
- If the constraint is violated, we merge the current and next pools and backtrack to check again. We keep merging and backtracking as needed until the constraint holds.
That’s it!
def pava(x, constraint=lambda a, b: a[0] / a[1] <= b[0] / b[1]):
"""x : list of pairs of counts [(n_default, total), ...]"""
n = len(x)
active_pools = [[i] for i in range(n)]
active_values = list(np.asarray(x))
i = 0
while i < len(active_pools) - 1:
if not constraint(active_values[i], active_values[i + 1]):
# Violation found, merge pools
active_pools[i].extend(active_pools[i + 1])
del active_pools[i + 1]
active_values[i] = active_values[i] + active_values[i + 1]
del active_values[i + 1]
# Step back to check for new violations with the merged pool
if i > 0:
i -= 1
else:
i += 1
return active_pools
Complexity
Because the index can both advance and backtrack, it's not immediately obvious what the time complexity is.
We can think of the process as alternating between two phases:
- A forward merge occurs, followed by backtracking and merging until monotonicity is restored.
- Or, if no merge is needed, the index keeps advancing by 1 until it encounters a violation that forces a merge.
Index advancement happens at most n times, because merging doesn’t affect how many positions remain ahead of the current index. Every time we advance, there are fewer spots left to advance into.
Let \(b_i\) be the number of backtracks during the i-th forward-merge round. We can show:
Since merges—and thus backtracks—occur at most n times and each merge is \(O(1)\), the overall complexity is \(O(n)\).
The catch
A sharp reader will notice that removing elements from a Python list (del
) inside the loop is actually \(O(n)\), so the binning algorithm as written is \(O(n^2)\).
To truly achieve \(O(n)\) complexity, you’d need a data structure that supports \(O(1)\) element removal (like a linked list), which we’re not using here for clarity.
Bonus: stack-based approach
Since rewriting the function pava
with a custom linked-list feels a bit obscure, I ask an AI (Gemini 2.5 pro) to find an alternative
i feel that the code with linked link is harder to follow because of all the method calling swapping things around. is there a cleaner way?
It’s as short as the original but achieves \(O(n)\) without needing a linked list:
def pavai(x, constraint=lambda a, b: a[0] / a[1] <= b[0] / b[1]):
"""
x: list of pairs of counts, e.g., [(numerator, denominator), ...]
"""
if not x:
return []
stack = [] # stores tuples of (value, indices) for each active pool
for i, val in enumerate(x):
current_value = np.asarray(val)
current_indices = [i]
# --- Backwards Merging ---
# While the stack isn’t empty and the top pool violates the constraint
# with the current pool, merge them.
while stack and not constraint(stack[-1][0], current_value):
prev_value, prev_indices = stack.pop()
# Merge it into the current pool
current_value += prev_value
current_indices = prev_indices + current_indices
# Now that all violations are resolved, push the new pool onto the stack
stack.append((current_value, current_indices))
# --- Finalization: unpack the stack into the final list of pools ---
final_pools = [indices for value, indices in stack]
return final_pools