Algorithms: Quick Notes#
Bitmasks#
Bitmasking performs operations at the bit level (often faster and more memory-efficient).

Commonly used for:

- subset representation (a set of small integers),
- DP over subsets,
- toggling flags,
- fast membership checks.
Quick patterns:

- Iterate set bits: `while s: lsb = s & -s; ...; s -= lsb`
- Count bits (Python): `s.bit_count()`
- Enumerate all subsets of a mask: `sub = mask; while sub: ...; sub = (sub - 1) & mask` (also include `0` separately)

Convention: `k` is 0-indexed from the LSB (least-significant bit).
| Operation | Expression | Notes |
|---|---|---|
| Test kth bit | `s & (1 << k)` | non-zero means set |
| Turn on kth bit | `s \|= (1 << k)` | common bug: don't assign `s = (1 << k)`, which discards the other bits |
| Turn off kth bit | `s &= ~(1 << k)` | clears bit |
| Toggle kth bit | `s ^= (1 << k)` | flips bit |
| Multiply by 2^n | `s << n` | beware overflow in fixed-width languages |
| Divide by 2^n | `s >> n` | signed right shift is arithmetic in many languages |
| Intersection | `s & t` | set intersection |
| Union | `s \| t` | set union |
| Set subtraction | `s & ~t` | elements in s not in t |
| Extract lowest set bit | `s & (-s)` | works with two's complement |
| Clear lowest set bit | `s &= (s - 1)` | removes one lowest set bit |
| Is power of two | `s > 0 and (s & (s - 1)) == 0` | `s > 0` excludes zero |
| XOR swap (not recommended) | `x ^= y; y ^= x; x ^= y` | use a temp var for clarity |
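The patterns above can be exercised in a short sketch (the helper names `set_bits` and `subsets_of` are illustrative, not from the notes):

```python
def set_bits(s):
    """Yield the indices of set bits, lowest first."""
    while s:
        lsb = s & -s                 # isolate lowest set bit
        yield lsb.bit_length() - 1   # 0-indexed position from the LSB
        s -= lsb


def subsets_of(mask):
    """Yield every submask of `mask`, including 0."""
    sub = mask
    while sub:
        yield sub
        sub = (sub - 1) & mask
    yield 0
```

For `s = 0b10110`, `set_bits` yields 1, 2, 4; `subsets_of(0b101)` yields 5, 4, 1, 0.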
Sorting#
Complexity summary#
| Algorithm | Best | Average | Worst | Space | Stable |
|---|---|---|---|---|---|
| Quicksort | Ω(n log n) | Θ(n log n) | O(n^2) | O(log n) | No |
| Mergesort | Ω(n log n) | Θ(n log n) | O(n log n) | O(n) | Yes |
| Timsort (Python/Java) | Ω(n) | Θ(n log n) | O(n log n) | O(n) | Yes |
| Heapsort | Ω(n log n) | Θ(n log n) | O(n log n) | O(1) | No |
| Bubble sort | Ω(n) | Θ(n^2) | O(n^2) | O(1) | Yes |
| Insertion sort | Ω(n) | Θ(n^2) | O(n^2) | O(1) | Yes |
| Selection sort | Ω(n^2) | Θ(n^2) | O(n^2) | O(1) | No |
| Counting sort | Ω(n+k) | Θ(n+k) | O(n+k) | O(k) | Yes |
| Radix sort | Ω(n·d) | Θ(n·d) | O(n·d) | O(n+k) | Yes (typical LSD) |
| Bucket sort | Ω(n+k) | Θ(n+k) | O(n^2) | O(n+k) | Depends |

`k` = value range (counting/bucket), `d` = number of digits/passes (radix).
Quicksort#
- Partition around pivot; recursively sort left/right.
- Average fast in practice; cache-friendly.
- Worst case `O(n^2)` if pivots are bad (fix: random pivot / median-of-three).
- Stable: No.
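A minimal in-place sketch using a random pivot (Lomuto partition; one way to dodge the `O(n^2)` worst case on already-sorted input):

```python
import random


def quicksort(a, lo=0, hi=None):
    """In-place quicksort with a random pivot (Lomuto partition)."""
    if hi is None:
        hi = len(a) - 1
    if lo >= hi:
        return
    # Random pivot guards against sorted/adversarial inputs
    p = random.randint(lo, hi)
    a[p], a[hi] = a[hi], a[p]
    pivot = a[hi]
    i = lo
    for j in range(lo, hi):
        if a[j] < pivot:
            a[i], a[j] = a[j], a[i]
            i += 1
    a[i], a[hi] = a[hi], a[i]   # pivot lands at its final index i
    quicksort(a, lo, i - 1)
    quicksort(a, i + 1, hi)
```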

Mergesort#
- Divide array, sort halves, merge.
- Predictable `O(n log n)` worst case; stable.
- Needs extra memory: `O(n)`.
- Great for linked lists and external sorting (disk).
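A top-down sketch returning a new list (the `<=` in the merge keeps equal elements in input order, which is what makes it stable):

```python
def mergesort(a):
    """Stable mergesort returning a new list. O(n log n) time, O(n) space."""
    if len(a) <= 1:
        return list(a)
    mid = len(a) // 2
    left = mergesort(a[:mid])
    right = mergesort(a[mid:])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        # <= preserves the relative order of equal elements (stability)
        if left[i] <= right[j]:
            out.append(left[i])
            i += 1
        else:
            out.append(right[j])
            j += 1
    out.extend(left[i:])
    out.extend(right[j:])
    return out
```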

Timsort#
- Hybrid of mergesort + insertion sort on “runs”.
- Excellent for partially sorted data.
- Used by Python (`sorted`, `.sort`) and Java (for objects).
Heapsort#
- Build the heap in `O(n)`, then extract the max/min `n` times.
- In-place: `O(1)` auxiliary space.
- Not stable; typically slower than quicksort in practice.
Counting / Radix / Bucket (when to use)#
- Counting sort: integers in a small range `[0..k)`. Fast and stable.
- Radix sort: fixed-length integers/strings; uses a stable counting sort per digit.
- Bucket sort: when values are uniformly distributed; choose buckets well.
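A stable counting-sort sketch for values in `[0, k)` (the left-to-right output pass is what preserves stability, which radix sort relies on):

```python
def counting_sort(nums, k):
    """Stable counting sort for integers in [0, k). O(n + k) time, O(n + k) extra."""
    count = [0] * k
    for x in nums:
        count[x] += 1
    # Turn counts into starting output positions for each value
    total = 0
    for v in range(k):
        count[v], total = total, total + count[v]
    out = [None] * len(nums)
    for x in nums:           # left-to-right pass preserves stability
        out[count[x]] = x
        count[x] += 1
    return out
```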
Permutations#
- Definition: order matters.
- Count (distinct items): `n!`
- Count (choose `r` from `n`, order matters): `nPr = n! / (n-r)!`
- Typical interview use:
  - generate all permutations (`O(n!)`),
  - permutations with duplicates (needs sorting + skipping duplicates),
  - next permutation (greedy + reverse suffix).
```python
from collections import Counter


def permutations(nums):
    """All permutations of distinct items. O(n!)."""
    res = []
    used = [False] * len(nums)
    path = []

    def dfs():
        if len(path) == len(nums):
            res.append(path.copy())
            return
        for i in range(len(nums)):
            if used[i]:
                continue
            used[i] = True
            path.append(nums[i])
            dfs()
            path.pop()
            used[i] = False

    dfs()
    return res


def unique_permutations(nums):
    """Permutations with duplicates handled. O(n!) worst-case, prunes duplicates."""
    res = []
    counter = Counter(nums)
    path = []
    n = len(nums)

    def dfs():
        if len(path) == n:
            res.append(path.copy())
            return
        for x in list(counter.keys()):
            if counter[x] == 0:
                continue
            counter[x] -= 1
            path.append(x)
            dfs()
            path.pop()
            counter[x] += 1

    dfs()
    return res
```

Combinations#
- Definition: order does not matter.
- Count: `nCr = n! / (r!(n-r)!)`
- Typical interview use:
  - generate `k`-combinations via backtracking,
  - subset generation,
  - "choose/not choose" DP patterns.
```python
def combinations(nums, k):
    """All k-combinations (order doesn't matter)."""
    res = []
    path = []

    def dfs(i):
        if len(path) == k:
            res.append(path.copy())
            return
        if i == len(nums):
            return

        # choose
        path.append(nums[i])
        dfs(i + 1)
        path.pop()

        # skip
        dfs(i + 1)

    dfs(0)
    return res


def combinations_1_to_n(n, k):
    """Classic: choose k numbers from 1..n (with pruning)."""
    res = []
    path = []

    def dfs(start):
        if len(path) == k:
            res.append(path.copy())
            return
        # pruning: still need (k - len(path)) more numbers
        for x in range(start, n + 1):
            if (n - x + 1) < (k - len(path)):
                break
            path.append(x)
            dfs(x + 1)
            path.pop()

    dfs(1)
    return res
```

Tree Algorithms#
- Views of a Tree: given a binary tree, "views" are what you see from a side/top/bottom.
- Left view: first node at each depth (BFS level-order, pick first).
- Right view: last node at each depth (BFS level-order, pick last).
- Top view: first node encountered for each horizontal distance (HD).
- Bottom view: last node encountered for each horizontal distance (HD).
- Common implementation:
  - BFS with a queue storing `(node, depth, hd)`.
  - Use maps:
    - left/right: `depth -> value`
    - top/bottom: `hd -> value`
```python
from collections import deque


class Node:
    def __init__(self, val, left=None, right=None):
        self.val = val
        self.left = left
        self.right = right


def left_view(root):
    if not root:
        return []
    q = deque([(root, 0)])
    first = {}
    while q:
        node, depth = q.popleft()
        if depth not in first:
            first[depth] = node.val
        if node.left: q.append((node.left, depth + 1))
        if node.right: q.append((node.right, depth + 1))
    return [first[d] for d in sorted(first)]


def right_view(root):
    if not root:
        return []
    q = deque([(root, 0)])
    last = {}
    while q:
        node, depth = q.popleft()
        last[depth] = node.val
        if node.left: q.append((node.left, depth + 1))
        if node.right: q.append((node.right, depth + 1))
    return [last[d] for d in sorted(last)]


def top_view(root):
    """First node encountered at each horizontal distance (hd)."""
    if not root:
        return []
    q = deque([(root, 0)])  # (node, hd)
    seen = {}
    while q:
        node, hd = q.popleft()
        if hd not in seen:
            seen[hd] = node.val
        if node.left: q.append((node.left, hd - 1))
        if node.right: q.append((node.right, hd + 1))
    return [seen[h] for h in sorted(seen)]


def bottom_view(root):
    """Last node encountered at each horizontal distance (hd)."""
    if not root:
        return []
    q = deque([(root, 0)])
    seen = {}
    while q:
        node, hd = q.popleft()
        seen[hd] = node.val
        if node.left: q.append((node.left, hd - 1))
        if node.right: q.append((node.right, hd + 1))
    return [seen[h] for h in sorted(seen)]
```

Dynamic Programming#
- Longest Common Subsequence
  - Problem: given strings `A`, `B`, find the longest sequence appearing in both in order (not necessarily contiguous).
  - Classic DP: `dp[i][j]` = LCS length of `A[:i]` and `B[:j]`
    - Transition:
      - if `A[i-1] == B[j-1]`: `dp[i][j] = 1 + dp[i-1][j-1]`
      - else: `dp[i][j] = max(dp[i-1][j], dp[i][j-1])`
  - Time: `O(n*m)`, Space: `O(n*m)` (or `O(min(n,m))` for length only)
```python
def lcs_length(a: str, b: str) -> int:
    """Length only, O(n*m) time, O(min(n,m)) space."""
    if len(a) < len(b):
        a, b = b, a  # make b shorter for less memory
    prev = [0] * (len(b) + 1)
    for i in range(1, len(a) + 1):
        cur = [0] * (len(b) + 1)
        for j in range(1, len(b) + 1):
            if a[i - 1] == b[j - 1]:
                cur[j] = 1 + prev[j - 1]
            else:
                cur[j] = max(prev[j], cur[j - 1])
        prev = cur
    return prev[-1]


def lcs_string(a: str, b: str) -> str:
    """Reconstruct one LCS, O(n*m) time and space."""
    n, m = len(a), len(b)
    dp = [[0] * (m + 1) for _ in range(n + 1)]

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            if a[i - 1] == b[j - 1]:
                dp[i][j] = 1 + dp[i - 1][j - 1]
            else:
                dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])

    # backtrack
    i, j = n, m
    out = []
    while i > 0 and j > 0:
        if a[i - 1] == b[j - 1]:
            out.append(a[i - 1])
            i -= 1
            j -= 1
        elif dp[i - 1][j] >= dp[i][j - 1]:
            i -= 1
        else:
            j -= 1

    return "".join(reversed(out))
```

Graph Algorithms#
Traversal: Depth First Search#
- Explore as far as possible before backtracking.
- Uses: connected components, cycle detection, topological sort, bridges/articulation points.
- Time: `O(|V| + |E|)`
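An iterative sketch over an adjacency-list dict (the name `dfs_iterative` is illustrative):

```python
def dfs_iterative(adj, start):
    """Iterative DFS over an adjacency-list graph; returns visit order."""
    seen = {start}
    order = []
    stack = [start]
    while stack:
        u = stack.pop()
        order.append(u)
        # Push neighbors in reverse so the first-listed neighbor is popped first
        for v in reversed(adj.get(u, [])):
            if v not in seen:
                seen.add(v)
                stack.append(v)
    return order
```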

Traversal: Breadth First Search#
- Level-by-level exploration.
- Uses: shortest paths in unweighted graphs, bipartite check.
- Time: `O(|V| + |E|)`
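A sketch of the shortest-path use case on an unweighted graph (`bfs_distances` is an illustrative name; `-1` marks unreachable nodes):

```python
from collections import deque


def bfs_distances(adj, src):
    """Unweighted shortest distances from src over an adjacency list."""
    dist = [-1] * len(adj)
    dist[src] = 0
    q = deque([src])
    while q:
        u = q.popleft()
        for v in adj[u]:
            if dist[v] == -1:          # first visit is the shortest
                dist[v] = dist[u] + 1
                q.append(v)
    return dist
```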

Topological Sort#
- Linear ordering such that for every edge `u -> v`, `u` comes before `v`.
- Two common methods:
  - Kahn's algorithm (in-degree + queue)
  - DFS finishing times (reverse postorder)
- Time: `O(|V| + |E|)`
- If you cannot process all nodes (Kahn) ⇒ the graph has a cycle.
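A sketch of Kahn's algorithm, including the cycle check (returns `None` when not all nodes can be processed):

```python
from collections import deque


def topo_sort_kahn(n, edges):
    """Kahn's algorithm. Returns a topological order, or None if there is a cycle."""
    adj = [[] for _ in range(n)]
    indeg = [0] * n
    for u, v in edges:
        adj[u].append(v)
        indeg[v] += 1
    q = deque(u for u in range(n) if indeg[u] == 0)
    order = []
    while q:
        u = q.popleft()
        order.append(u)
        for v in adj[u]:
            indeg[v] -= 1
            if indeg[v] == 0:
                q.append(v)
    # Fewer than n processed nodes means some in-degree never hit 0: a cycle
    return order if len(order) == n else None
```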
Shortest path: Dijkstra#
- Single-source shortest paths with non-negative edge weights.
- Typical implementation: adjacency list + min-heap (priority queue).
- Complexity:
  - With heap: `O((V+E) log V)` (typical)
  - With array: `O(V^2)` (dense graphs)
- Pitfalls:
- Do not use with negative weights.
- Use "lazy deletion" in the heap: ignore outdated `(dist, node)` pairs.

```python
import heapq


def dijkstra(n, edges, src):
    """
    n: number of nodes labeled [0..n-1]
    edges: adjacency list {u: [(v, w), ...]}
    weights must be non-negative
    Returns: dist list
    """
    INF = 10**18
    dist = [INF] * n
    dist[src] = 0
    pq = [(0, src)]  # (dist, node)

    while pq:
        d, u = heapq.heappop(pq)
        if d != dist[u]:
            continue  # outdated entry (lazy deletion)
        for v, w in edges.get(u, []):
            nd = d + w
            if nd < dist[v]:
                dist[v] = nd
                heapq.heappush(pq, (nd, v))

    return dist
```

Shortest path: Bellman–Ford#
- Handles negative weights; detects negative cycles.
- Time: `O(|V||E|)`
- If an edge can still be relaxed on the V-th iteration ⇒ a negative cycle is reachable from the source.
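A compact edge-list sketch with the extra pass for negative-cycle detection (`bellman_ford` is an illustrative name):

```python
def bellman_ford(n, edges, src):
    """edges: list of (u, v, w). Returns (dist, has_negative_cycle)."""
    INF = float("inf")
    dist = [INF] * n
    dist[src] = 0
    for _ in range(n - 1):
        changed = False
        for u, v, w in edges:
            if dist[u] != INF and dist[u] + w < dist[v]:
                dist[v] = dist[u] + w
                changed = True
        if not changed:   # early exit: already converged
            break
    # One more pass: any further relaxation means a reachable negative cycle
    has_neg = any(dist[u] != INF and dist[u] + w < dist[v]
                  for u, v, w in edges)
    return dist, has_neg
```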

Shortest path: Floyd–Warshall#
- All-pairs shortest paths.
- DP over intermediate nodes.
- Time: `O(|V|^3)`, Space: `O(|V|^2)`
- Also used for transitive closure.
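The triple loop over intermediate nodes can be sketched as (`floyd_warshall` is an illustrative name; edges are directed):

```python
def floyd_warshall(n, edges):
    """edges: list of (u, v, w), directed. Returns the n x n distance matrix."""
    INF = float("inf")
    dist = [[INF] * n for _ in range(n)]
    for i in range(n):
        dist[i][i] = 0
    for u, v, w in edges:
        dist[u][v] = min(dist[u][v], w)   # keep the cheapest parallel edge
    for k in range(n):                    # allow k as an intermediate node
        for i in range(n):
            for j in range(n):
                if dist[i][k] + dist[k][j] < dist[i][j]:
                    dist[i][j] = dist[i][k] + dist[k][j]
    return dist
```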
Minimum Spanning Tree (MST): Prim#
- Greedy MST for connected, undirected weighted graph.
- “Grow” a tree by always adding cheapest outgoing edge.
- Time: `O(E log V)` with a heap.

```python
# A Python program for Prim's MST for the
# adjacency list representation of a graph.
# Source: GeeksForGeeks (adapted to Python 3).

from collections import defaultdict
import sys


class Heap():
    def __init__(self):
        self.array = []
        self.size = 0
        self.pos = []

    def newMinHeapNode(self, v, dist):
        return [v, dist]

    # Swap two nodes of the min heap (needed for minHeapify)
    def swapMinHeapNode(self, a, b):
        self.array[a], self.array[b] = self.array[b], self.array[a]

    # Heapify at the given idx. Also updates the positions of
    # nodes when they are swapped; positions are needed for
    # decreaseKey().
    def minHeapify(self, idx):
        smallest = idx
        left = 2 * idx + 1
        right = 2 * idx + 2

        if left < self.size and self.array[left][1] < self.array[smallest][1]:
            smallest = left

        if right < self.size and self.array[right][1] < self.array[smallest][1]:
            smallest = right

        # Swap if idx is not the smallest
        if smallest != idx:
            # Swap positions
            self.pos[self.array[smallest][0]] = idx
            self.pos[self.array[idx][0]] = smallest

            # Swap nodes
            self.swapMinHeapNode(smallest, idx)

            self.minHeapify(smallest)

    # Extract the minimum node from the heap
    def extractMin(self):
        # Return None if the heap is empty
        if self.isEmpty():
            return None

        # Store the root node
        root = self.array[0]

        # Replace the root node with the last node
        lastNode = self.array[self.size - 1]
        self.array[0] = lastNode

        # Update the position of the last node
        self.pos[lastNode[0]] = 0
        self.pos[root[0]] = self.size - 1

        # Reduce heap size and heapify the root
        self.size -= 1
        self.minHeapify(0)

        return root

    def isEmpty(self):
        return self.size == 0

    def decreaseKey(self, v, dist):
        # Get the index of v in the heap array
        i = self.pos[v]

        # Update its dist value
        self.array[i][1] = dist

        # Travel up while the tree is not heapified.
        # This is an O(log n) loop.
        while i > 0 and self.array[i][1] < self.array[(i - 1) // 2][1]:
            # Swap this node with its parent
            self.pos[self.array[i][0]] = (i - 1) // 2
            self.pos[self.array[(i - 1) // 2][0]] = i
            self.swapMinHeapNode(i, (i - 1) // 2)

            # Move to the parent index
            i = (i - 1) // 2

    # Check whether vertex v is still in the min heap
    def isInMinHeap(self, v):
        return self.pos[v] < self.size


def printArr(parent, n):
    for i in range(1, n):
        print("%d - %d" % (parent[i], i))


class Graph():
    def __init__(self, V):
        self.V = V
        self.graph = defaultdict(list)

    # Adds an edge to an undirected graph
    def addEdge(self, src, dest, weight):
        # Add an edge from src to dest at the beginning of the
        # adjacency list of src. Each node is [destination, weight].
        self.graph[src].insert(0, [dest, weight])

        # Since the graph is undirected, also add an edge
        # from dest to src
        self.graph[dest].insert(0, [src, weight])

    # The main function that prints the Minimum Spanning Tree (MST)
    # using Prim's algorithm. Runs in O(E log V).
    def PrimMST(self):
        # Number of vertices in the graph
        V = self.V

        # Key values used to pick the minimum-weight edge in the cut
        key = []

        # List to store the constructed MST
        parent = []

        # minHeap represents the set of vertices not yet in the MST
        minHeap = Heap()

        # Initialize the min heap with all vertices. Key values of
        # all vertices (except the 0th vertex) are initially infinite.
        for v in range(V):
            parent.append(-1)
            key.append(sys.maxsize)
            minHeap.array.append(minHeap.newMinHeapNode(v, key[v]))
            minHeap.pos.append(v)

        # Make the key value of the 0th vertex 0 so that it is
        # extracted first
        minHeap.pos[0] = 0
        key[0] = 0
        minHeap.decreaseKey(0, key[0])

        # Initially the size of the min heap is V
        minHeap.size = V

        # The min heap contains all nodes not yet added to the MST
        while not minHeap.isEmpty():
            # Extract the vertex with minimum key value
            newHeapNode = minHeap.extractMin()
            u = newHeapNode[0]

            # Traverse all adjacent vertices of u (the extracted
            # vertex) and update their key values
            for pCrawl in self.graph[u]:
                v = pCrawl[0]

                # If v is not yet in the MST and the edge (u, v) is
                # cheaper than v's current key, update the key
                if minHeap.isInMinHeap(v) and pCrawl[1] < key[v]:
                    key[v] = pCrawl[1]
                    parent[v] = u

                    # Update the key in the min heap as well
                    minHeap.decreaseKey(v, key[v])

        printArr(parent, V)


# Driver program to test the above functions
graph = Graph(9)
graph.addEdge(0, 1, 4)
graph.addEdge(0, 7, 8)
graph.addEdge(1, 2, 8)
graph.addEdge(1, 7, 11)
graph.addEdge(2, 3, 7)
graph.addEdge(2, 8, 2)
graph.addEdge(2, 5, 4)
graph.addEdge(3, 4, 9)
graph.addEdge(3, 5, 14)
graph.addEdge(4, 5, 10)
graph.addEdge(5, 6, 2)
graph.addEdge(6, 7, 1)
graph.addEdge(6, 8, 6)
graph.addEdge(7, 8, 7)
graph.PrimMST()
```

Minimum Spanning Tree (MST): Kruskal#
- Sort edges by weight; add if it doesn’t form a cycle (DSU/Union-Find).
- Time: `O(E log E)` (sorting dominates).

```python
# Python program for Kruskal's algorithm to find the
# Minimum Spanning Tree of a given connected,
# undirected and weighted graph. Source: GeeksForGeeks (adapted).


# Class to represent a graph
class Graph:
    def __init__(self, vertices):
        self.V = vertices  # No. of vertices
        self.graph = []

    # Add an edge to the graph
    def addEdge(self, u, v, w):
        self.graph.append([u, v, w])

    # Find the set of element i
    # (uses the path compression technique)
    def find(self, parent, i):
        if parent[i] != i:
            parent[i] = self.find(parent, parent[i])  # compress the path
        return parent[i]

    # Union of the sets containing x and y
    # (uses union by rank)
    def union(self, parent, rank, x, y):
        xroot = self.find(parent, x)
        yroot = self.find(parent, y)

        # Attach the smaller-rank tree under the root of
        # the higher-rank tree (union by rank)
        if rank[xroot] < rank[yroot]:
            parent[xroot] = yroot
        elif rank[xroot] > rank[yroot]:
            parent[yroot] = xroot

        # If ranks are equal, pick one as root
        # and increment its rank by one
        else:
            parent[yroot] = xroot
            rank[xroot] += 1

    # The main function to construct the MST using
    # Kruskal's algorithm
    def KruskalMST(self):

        result = []  # this will store the resultant MST

        i = 0  # index into the sorted edge list
        e = 0  # number of edges taken so far

        # Step 1: sort all edges in non-decreasing order of
        # weight. (If we are not allowed to change the given
        # graph, sort a copy instead.)
        self.graph = sorted(self.graph, key=lambda item: item[2])

        parent = []
        rank = []

        # Create V subsets with single elements
        for node in range(self.V):
            parent.append(node)
            rank.append(0)

        # An MST of a connected graph has exactly V-1 edges
        while e < self.V - 1:

            # Step 2: pick the smallest remaining edge and
            # advance the index for the next iteration
            u, v, w = self.graph[i]
            i = i + 1
            x = self.find(parent, u)
            y = self.find(parent, v)

            # If including this edge doesn't cause a cycle,
            # include it in the result; else discard the edge
            if x != y:
                e = e + 1
                result.append([u, v, w])
                self.union(parent, rank, x, y)

        # Print the contents of result[] to display the built MST
        print("Following are the edges in the constructed MST")
        for u, v, weight in result:
            print("%d -- %d == %d" % (u, v, weight))


# Driver code
g = Graph(4)
g.addEdge(0, 1, 10)
g.addEdge(0, 2, 6)
g.addEdge(0, 3, 5)
g.addEdge(1, 3, 15)
g.addEdge(2, 3, 4)

g.KruskalMST()
```

Greedy Algorithms#
- Greedy algorithms pick the locally optimal option at each step.
- Typically applicable when there is:
  - optimal substructure,
  - the greedy choice property (a local choice can be extended to a global optimum).
- Examples:
  - interval scheduling
  - Huffman coding
  - MST (Prim/Kruskal)
  - Dijkstra (greedy choice on the next closest node)
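Interval scheduling is the canonical greedy example: sort by end time and take every interval that starts at or after the last chosen end. A sketch (`max_non_overlapping` is an illustrative name; intervals are half-open, so touching endpoints are allowed):

```python
def max_non_overlapping(intervals):
    """Greedy interval scheduling: maximum set of non-overlapping intervals."""
    chosen = []
    last_end = float("-inf")
    for start, end in sorted(intervals, key=lambda iv: iv[1]):
        if start >= last_end:       # compatible with everything chosen so far
            chosen.append((start, end))
            last_end = end
    return chosen
```

Sorting by end time is what makes the greedy choice safe: the earliest-finishing compatible interval leaves the most room for the rest.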
Probability: mapping to a larger sample space#
Sometimes you’re given probabilities on a coarse sample space and must expand to a finer one.
- Key identity:
  - If coarse events `A1, A2, ...` partition the space and the fine event is `B`:
    `P(B) = Σ P(B | Ai) P(Ai)` (Law of Total Probability)
- Mapping rule (coarse → refined outcomes):
  - For each coarse outcome `Ai`, assign probabilities to refined outcomes `{rij}` such that:
    - `P(rij) = P(Ai) * P(rij | Ai)`
    - and `Σ_j P(rij | Ai) = 1`
- Common interview pattern:
  - "Given P(even)=0.6, P(odd)=0.4, and uniform within parity, compute P(roll=2)."
    `P(2) = P(even) * 1/3 = 0.2`
```python
def refine_distribution(coarse_probs, conditional_refinement):
    """
    coarse_probs: dict coarse_outcome -> P(coarse)
    conditional_refinement: dict coarse_outcome -> dict refined_outcome -> P(refined | coarse)

    Returns refined_probs: dict refined_outcome -> P(refined)

    Requires each conditional distribution to sum to ~1.
    """
    refined = {}
    for coarse, p_coarse in coarse_probs.items():
        cond = conditional_refinement[coarse]
        for r, p_r_given_c in cond.items():
            refined[r] = refined.get(r, 0.0) + p_coarse * p_r_given_c
    return refined
```

Piecewise functions: Tax slabs#
- Tax slabs are typically progressive:
- Income is split into brackets, each taxed at its rate.
- Total tax is sum over brackets.
- Implementation pattern:
  - iterate brackets in ascending order,
  - taxable amount in a bracket = `min(income, upper) - lower` (clamped to ≥ 0),
  - add `taxable * rate`.
- Pitfalls:
- inclusive/exclusive boundaries,
- handling “no upper bound” last slab,
- rounding rules.
```python
def tax_progressive(income, slabs):
    """
    slabs: list of (lower_inclusive, upper_exclusive_or_None, rate)
    Example:
        slabs = [
            (0, 250000, 0.0),
            (250000, 500000, 0.05),
            (500000, 1000000, 0.20),
            (1000000, None, 0.30),
        ]
    """
    tax = 0.0
    for lo, hi, rate in slabs:
        if income <= lo:
            break
        upper = income if hi is None else min(income, hi)
        taxable = max(0.0, upper - lo)
        tax += taxable * rate
        if hi is not None and income < hi:
            break
    return tax
```

Streams of Data (online algorithms)#
When data arrives continuously and you can’t store everything.
- Common tasks + standard tools:
  - Running mean/variance: Welford's algorithm (`O(1)` memory)
  - Top-k / median:
    - top-k: min-heap of size k
    - median: two heaps (max-heap left, min-heap right)
  - Sliding window max/min: monotonic deque
  - Random sampling from an unknown-length stream: reservoir sampling
  - Membership (approx): Bloom filter
  - Frequency (approx): Count-Min Sketch / Misra–Gries (heavy hitters)
- Key concept:
- Choose exact vs approximate depending on memory/latency constraints.
```python
import random


def reservoir_sample(stream, k, rng=random):
    """
    Sample k items uniformly from a stream of unknown length.
    Returns a list of size min(k, n).
    """
    reservoir = []
    for i, x in enumerate(stream, start=1):
        if i <= k:
            reservoir.append(x)
        else:
            j = rng.randint(1, i)
            if j <= k:
                reservoir[j - 1] = x
    return reservoir
```

```python
import math


class RunningStats:
    """Welford's algorithm: online mean/variance in O(1) memory."""
    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.M2 = 0.0

    def add(self, x: float):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        delta2 = x - self.mean
        self.M2 += delta * delta2

    def variance(self):
        if self.n < 2:
            return 0.0
        return self.M2 / (self.n - 1)  # sample variance

    def stddev(self):
        return math.sqrt(self.variance())
```

```python
from collections import deque


def sliding_window_max(nums, k):
    """
    Monotonic deque: O(n)
    Returns list of max of each window of size k.
    """
    dq = deque()  # store indices, values decreasing
    out = []
    for i, x in enumerate(nums):
        while dq and dq[0] <= i - k:
            dq.popleft()
        while dq and nums[dq[-1]] <= x:
            dq.pop()
        dq.append(i)
        if i >= k - 1:
            out.append(nums[dq[0]])
    return out
```

```python
import heapq


class RunningMedian:
    """
    Two heaps:
      left: max-heap (store negatives)
      right: min-heap
    """
    def __init__(self):
        self.left = []
        self.right = []

    def add(self, x: float):
        if not self.left or x <= -self.left[0]:
            heapq.heappush(self.left, -x)
        else:
            heapq.heappush(self.right, x)

        # rebalance
        if len(self.left) > len(self.right) + 1:
            heapq.heappush(self.right, -heapq.heappop(self.left))
        elif len(self.right) > len(self.left):
            heapq.heappush(self.left, -heapq.heappop(self.right))

    def median(self):
        if not self.left and not self.right:
            return None
        if len(self.left) == len(self.right):
            return (-self.left[0] + self.right[0]) / 2.0
        return -self.left[0]
```