Stripe是一家全球领先的金融科技公司,总部位于旧金山,短短十余年间,已经从一家初创成长为全球最大的独角兽公司,在多处都设有办事处。 Stripe OA 相比amazon,google等难一点。但是经常练真题还是能在半小时内A出来。如果OA过了很快就能发vo。还是值得试一试,北美等oa/vo不熟的都可以问。
分享2道 Stripe OA 经典题(包含解答),都是2026 Intern 最近的题。
Stripe OA Q1
题目模拟负载均衡的状态题,按请求顺序处理 WebSocket 连接。先实现最少连接数的负载均衡,再支持连接断开,最后引入基于 objectId 的粘性路由,保证同一个 notebook 始终落在同一台 Jupyter server 上,即使牺牲负载均衡。
Part 1:基础负载均衡(test cases 1–4)
规则:
- 对每个 CONNECT 请求,选择当前活跃连接数最少的 server
- 如果有多个 server 并列最少,选index 最小的
- 不考虑用户、object,只按当前负载
Part 2:处理断连(test cases 5–8)
在 Part 1 基础上,新增:
DISCONNECT 请求
- 表示某个 connection 从之前分配的 server 上断开
- 需要:
- 找到这个 connection 当初连到的 server
- 将该 server 的活跃连接数减 1
假设:所有 DISCONNECT 都是合法的(一定对应之前的 CONNECT)
Part 3:基于 objectId 的路由(test cases 9–11)
新增一个非常关键的约束:
如果多个连接使用同一个 objectId(同一个 notebook),
那么它们必须被路由到同一个 server。
原因:Jupyter server 内存中保存 notebook 状态,同一个 object 必须落在同一个 server,保证一致性。
解题思路
- 维护每台服务器的活跃连接数
cnt、连接明细con、以及对象到目标机的绑定obj_to_t与引用计数obj_use。 CONNECT:若对象已绑定则必须发往该机;否则在未禁用且未满的机器里选“最小负载,若并列取较小下标”。成功时输出一条日志。DISCONNECT:从所在机器移除并更新对象引用计数;若对象计数归零则解除绑定。SHUTDOWN:把目标机上的连接按“连接ID首次出现的顺序”驱逐,临时将该机加入disabled,先全部断开再按既有规则逐个重连(依然受对象固定与容量限制约束),最后解除禁用。
代码
def route_request(numTargets, maxConnectionsPerTarget, requests):
n, cap = numTargets, maxConnectionsPerTarget
cnt = [0] * n
con = {}
per_t = [set() for _ in range(n)]
obj_to_t = {}
obj_use = {}
disabled = set()
logs = []
seen_order = {}
order_seq = 0
def pick_target(exclude_disabled=True):
best_t, best_load = None, None
for i in range(n):
if exclude_disabled and i in disabled:
continue
if cnt[i] >= cap:
continue
if best_t is None or cnt[i] < best_load or (cnt[i] == best_load and i < best_t):
best_t, best_load = i, cnt[i]
return best_t
def attach(cid, uid, oid):
if oid in obj_to_t:
t = obj_to_t[oid]
if t in disabled or cnt[t] >= cap:
return False
else:
t = pick_target()
if t is None:
return False
con[cid] = (t, uid, oid)
per_t[t].add(cid)
cnt[t] += 1
obj_use[oid] = obj_use.get(oid, 0) + 1
obj_to_t[oid] = t
logs.append(f"{cid},{uid},{t+1}")
return True
def detach(cid):
v = con.pop(cid, None)
if not v:
return
t, _, oid = v
if cid in per_t[t]:
per_t[t].remove(cid)
cnt[t] -= 1
k = obj_use.get(oid, 0) - 1
if k <= 0:
obj_use.pop(oid, None)
obj_to_t.pop(oid, None)
else:
obj_use[oid] = k
for line in requests:
parts = [x.strip() for x in line.split(",")]
op = parts[0]
if op == "CONNECT":
_, cid, uid, oid = parts
if cid not in seen_order:
order_seq += 1
seen_order[cid] = order_seq
attach(cid, uid, oid)
elif op == "DISCONNECT":
_, cid, uid, oid = parts
if cid in con:
detach(cid)
else:
_, idx = parts
t = int(idx) - 1
disabled.add(t)
evict_list = sorted(
[cid for cid in per_t[t] if cid in con and con[cid][0] == t],
key=lambda c: (seen_order.get(c, float("inf")), c)
)
triples = []
for cid in evict_list:
_, uid, oid = con[cid]
triples.append((cid, uid, oid))
for cid in evict_list:
detach(cid)
per_t[t].clear()
for cid, uid, oid in triples:
attach(cid, uid, oid)
disabled.discard(t)
return logsStripe OA Q2
题目是一个风控规则引擎题,需要根据交易和对应规则,对商户风险分进行多轮全量更新。先初始化 base score,再按规则分别处理金额阈值的乘法更新,以及同一用户高频交易的加法累积,最后按商户字典序输出结果。
规则 1:金额阈值 × 乘法因子
- 如果某笔交易的
amount > rule.min_transaction_amount - 则:
current_score *= rule.multiplicative_factor
规则 2:同一用户高频交易 + 加法因子
- 如果 同一个 customer_id 对 同一个 merchant_id 的交易次数 ≥ 3
- 则:
- 从第 3 笔开始
- 累加对应规则的 additive_factor
- 并且是累积生效
解题思路
- 先读取
merchants_list,初始化每个商家的base_score。 - 将
transactions_list和rules_list按索引配对,得到包含交易与对应规则的对象。 - 分三次遍历:① 若交易金额大于阈值,乘上
multiplicative_factor;② 对同一商家+客户的交易数≥3,则累加所有additive_factor;③ 对同一商家+客户+小时的交易数≥3,根据小时区间决定加/减penalty。 - 最后按商家 ID 字典序输出
"merchant,score"。
代码
from typing import List, Dict, Tuple
def calculate_merchant_fraud_score(transactions_list: List[str],
rules_list: List[str],
merchants_list: List[str]) -> List[str]:
score: Dict[str, int] = {}
merchants: List[str] = []
for s in merchants_list:
m, base = [x.strip() for x in s.split(',', 1)]
score[m] = int(base)
merchants.append(m)
txs: List[Tuple[str, int, str, int, int, int, int, int]] = []
for i in range(len(transactions_list)):
t0, a0, c0, h0 = [x.strip() for x in transactions_list[i].split(',')]
r0 = [x.strip() for x in rules_list[i].split(',')]
txs.append((t0, int(a0), c0, int(h0),
int(r0[0]), int(r0[1]), int(r0[2]), int(r0[3])))
for m, amt, c, h, min_amt, mul, add, pen in txs:
if m in score and amt > min_amt:
score[m] = score[m] * mul
cnt_pair: Dict[str, int] = {}
sum_add: Dict[str, int] = {}
for m, amt, c, h, min_amt, mul, add, pen in txs:
if m not in score:
continue
key = f"{m}#{c}"
cnt_pair[key] = cnt_pair.get(key, 0) + 1
sum_add[key] = sum_add.get(key, 0) + add
for key, cnt in cnt_pair.items():
if cnt >= 3:
m = key.split('#', 1)[0]
score[m] = score.get(m, 0) + sum_add.get(key, 0)
cnt_hour: Dict[str, int] = {}
sum_pen: Dict[str, int] = {}
hour_by_key: Dict[str, int] = {}
for m, amt, c, h, min_amt, mul, add, pen in txs:
if m not in score:
continue
key = f"{m}#{c}#{h}"
cnt_hour[key] = cnt_hour.get(key, 0) + 1
sum_pen[key] = sum_pen.get(key, 0) + pen
hour_by_key.setdefault(key, h)
for key, cnt in cnt_hour.items():
if cnt >= 3:
h = hour_by_key[key]
sign = 1 if 12 <= h <= 17 else (-1 if (9 <= h <= 11) or (18 <= h <= 21) else 0)
if sign:
m = key.split('#', 1)[0]
score[m] = score.get(m, 0) + sign * sum_pen[key]
merchants.sort()
return [f"{m},{score.get(m, 0)}" for m in merchants]Stripe 的OA做完很快会有结果,这个简历投递是10月月初投递,10/13收到OA,当天完成,11/17 Tech Screen Invite,11/21 Tech Screen,11/27 VO Invite,果然非大厂只要简历硬+OA全AC,不用refer也能拿面。Stripe的后续面试也已经整理好了,可以参考。
了解更多
Stripe 面试 过程里,面试官更看重解题思路的表达和技术细节的思考,Coding时边写边讲会很加分 Stripe VO的节奏很稳定,Stripe的Tag题目也比较多,相比其他公司,难度和流程都比较容易掌握。最近也面了Stripe、亚马逊、TikTok等公司的VO,基本都顺利通过。每轮下来都比较顺利,没把握的可以多练习。
Interview Aid 专注北美技术岗位的面试、OA帮助,如果你近期也拿到Stripe的面试,可以参考Stripe 一亩三分地的Timeline,也可以与我们联系获取相关资料,同时我们免费提供Stripe真题参考。