在金融交易、尤其是数字货币和新兴市场中,“自成交”(Wash Trade)是一种古老但愈发猖獗的市场操纵行为。它通过创造虚假的交易活跃度来误导投资者,扭曲资产的真实价值。本文面向具备扎实工程基础的中高级工程师与架构师,旨在从计算机科学第一性原理出发,系统性地剖析自成交的识别挑战,并层层递进,设计一套从简单批处理到高实时性、可演进的流式风控系统。我们将深入探讨图论、状态管理、流式计算等核心原理,并给出关键的实现代码与架构权衡,最终形成一套能在生产环境中落地的完整方案。
现象与问题背景
自成交,或称“刷量”、“对敲”,是指单一实体或关联实体同时扮演买方和卖方,与自身或关联方进行交易。这种行为本身不创造任何经济价值,其目的在于制造虚假的交易量和市场繁荣景象。在股票市场,这是被严格禁止的违法行为。但在监管尚不完善的数字货币交易所、NFT 市场或一些电商平台的刷单场景中,它却屡见不鲜。
其核心动机包括:
- 吸引流动性:一个新的交易所或交易对上线初期,通过机器人刷量,使其在行情数据网站(如 CoinMarketCap)上排名靠前,吸引真实用户入场。
- 价格操纵:通过在特定价格区间内密集对敲,制造价格支撑或阻力的假象,诱导其他交易者追涨杀跌。
- 获得交易挖矿奖励:部分平台会根据交易量返还平台币或手续费,自成交成为低成本套利的方式。
从技术视角看,识别自成交的难点在于其模式的复杂性和隐蔽性。简单的“账户A买,账户A卖”早已过时。攻击者会使用一系列复杂的策略来规避检测:
- 多账户关联:使用同一实控人下的多个账户(例如,同一身份认证信息、同一设备指纹、同一充提币地址)进行对敲。
- 环路交易:不仅仅是 A 与 B 之间,可能形成 A → B → C → A 的交易环路。
- 时间延迟:交易并非瞬间完成,而是在一段时间内交错进行,以模仿真实市场行为。
- 价格伪装:交易价格会轻微波动,而非固定在一个点上。
因此,一个有效的风控系统必须能够穿透这些伪装,从海量的交易数据中,近乎实时地识别出这些潜在的操纵行为。这不仅是合规要求,更是维护平台信誉和用户资产安全的核心能力。
关键原理拆解
在设计具体的系统之前,我们必须回归到计算机科学的基础理论。自成交识别本质上是一个在海量、高速的数据流中进行模式匹配与异常检测的问题。其背后依赖于几个核心的理论基石。
第一性原理:图论与环路检测 (Graph Theory & Cycle Detection)
我们可以将整个交易市场抽象成一个有向图 G = (V, E)。其中,顶点 V 代表交易账户,有向边 E 代表一笔交易(从买方指向卖方)。每一笔交易 (u, v) 代表账户 u 从账户 v 购买了资产。在这个模型下,最简单的自成交 (A → B, B → A) 在图中构成了一个长度为 2 的环。更复杂的模式 (A → B → C → A) 则是一个长度为 3 的环。
因此,自成交识别问题在理论上可以被转化为一个动态图中的加权环路检测问题。这里的“动态”意味着图的边是随时间不断增加的。“加权”可以指交易量、金额等。然而,在全市场范围内实时构建图并执行标准的环路检测算法(如基于深度优先搜索 DFS)的计算复杂度是极高的。一个拥有百万用户的交易所,其图的规模巨大,DFS 的时间复杂度为 O(V+E),对于流式数据来说是不可接受的。工程实践中,我们必须寻求剪枝和优化的方法,例如,将检测范围限制在“关联账户”这个子图内。
第二性原理:状态流处理 (Stateful Stream Processing)
交易数据是典型的无限数据流。我们需要在一个时间窗口内,记住“谁和谁交易过”,以便在反向交易出现时进行匹配。这正是状态流处理的核心。当一笔交易 A → B 到达时,我们需要在处理节点的状态中记录这个事件。当后续一笔交易 B → A 到达时,我们查询状态,若发现之前的记录,则匹配成功。
这里的核心是状态(State)和时间窗口(Window)。状态需要被高效地存储和访问,通常是键控状态(keyed state),例如以 `(user_id, trading_pair)` 作为 key。时间窗口定义了我们“记忆”的有效期。例如,我们只关心 1 小时内发生的反向交易。这就引出了一个关键的工程问题:状态管理。状态存在哪里?内存还是磁盘?如何保证故障恢复?现代流处理框架如 Apache Flink 提供了基于检查点(Checkpoint)机制的精确一次(Exactly-once)状态一致性保证,其底层通常使用像 RocksDB 这样的嵌入式 KV 存储来管理大于内存的庞大状态,这是实现可靠检测的基础。
第三性原理:统计学与异常检测 (Statistics & Anomaly Detection)
高级的自成交行为不会留下明显的环路特征,但其统计学特征往往与正常交易行为有显著差异。这需要我们从统计学角度建立正常行为的基线,并识别出异常点。
- 交易频率与周期性:机器人刷量往往呈现出非自然的、高度规律的交易频率。通过傅里叶变换等时序分析方法可以检测这种周期性。
- 盈亏分析:在忽略手续费的情况下,自成交的总盈亏应趋近于零。我们可以计算一个账户在某个时间窗口内,针对特定对手方的净损益,如果长期接近零,则嫌疑增大。
- 价格偏离度:自成交的价格通常紧贴市场中间价,很少出现偏离较大的市价单。计算其交易价格与市场最优报价(BBO)的偏离度的分布,可以作为一个有效特征。
这些统计特征的计算同样需要在流式环境中完成,通常通过在不同的时间窗口(滚动、滑动窗口)上进行聚合运算(如 COUNT, AVG, STDDEV)来实现。
系统架构总览
基于以上原理,我们设计一个分层、可演进的自成交检测系统。架构的核心是围绕一个强大的流处理平台构建,并辅以高效的数据管道和存储组件。
以下是该系统的逻辑架构图的文字描述:
- 数据源 (Data Source): 交易核心的撮合引擎(Matching Engine)是原始数据的生产者。每当一笔交易撮合成功,它会产生一条成交记录(Trade Log)。
- 数据总线 (Data Bus): 所有成交记录被实时发送到 Apache Kafka 集群。Kafka 作为高吞吐、低延迟的分布式消息队列,充当了整个系统的数据总线,实现了撮合引擎与风控系统的解耦。数据按交易对(如 BTC/USDT)进行分区(Partitioning),保证同一交易对的撮合顺序。
- 流处理引擎 (Stream Processing Engine): Apache Flink 是这个架构的核心。一个 Flink 集群消费 Kafka 中的交易流。Flink 作业(Job)内部会执行多个逻辑步骤:
- 数据清洗与关联:消费原始 Trade Log,并实时关联用户画像数据(如用户实控人 ID、设备指纹等),将匿名账户ID关联到具体的实体上。
- 规则/特征计算:应用一系列算法,包括基于图的环路检测和基于窗口的统计特征计算。
- 模型推理(可选):对于更高级的场景,可以加载预训练的机器学习模型(如 GNN、Isolation Forest),进行实时推理。
- 状态与元数据存储 (State & Metadata Storage):
- Flink 状态后端 (State Backend): Flink 自身管理大量的运算状态。对于高性能场景,使用 RocksDBStateBackend,它将状态数据存储在本地磁盘,并通过 Checkpoint 机制备份到 HDFS 或 S3 等分布式文件系统,实现容错。
- 外部存储 (External Storage): Redis 或其他高速 KV 存储用于存放需要跨作业或被外部服务频繁查询的元数据,例如用户的风险等级、账户关联关系等。
- 分析与存储层 (Analytical & Storage Layer):
- Flink 计算出的疑似自成交事件(我们称之为“风险信号”)被发送到另一个 Kafka Topic。
- ClickHouse 或类似的高性能列式数据库订阅这个 Topic,将风险信号持久化。ClickHouse 极快的聚合查询性能非常适合风控分析师进行后续的深入调查和报表生成。
- 处置与告警 (Action & Alerting): 一个独立的微服务(Alerting Service)消费风险信号,根据预设的规则执行处置动作,例如:发送邮件/短信告警、自动冻结可疑账户的提币权限、或者将高风险交易从交易量统计中剔除(即“清洗”)。
核心模块设计与实现
我们深入到 Flink 作业内部,看看最关键的两个识别模块如何用代码实现。这里我们使用 Flink 的 DataStream API 作为示例。
模块一:基于关联实体的实时环路检测
这是最直接也最有效的检测手段。我们的目标是发现“同一实控人下的两个账户在短时间内互相交易”。
首先,数据模型需要被丰富。原始的 Trade Log 只有 `taker_account_id` 和 `maker_account_id`,我们需要将它们 enrich 为 `taker_user_id` 和 `maker_user_id`。
// 1. 数据模型
public class EnrichedTrade {
public long tradeId;
public String symbol; // 交易对 e.g., BTC_USDT
public long buyerUserId; // 买方实控人ID
public long sellerUserId; // 卖方实控人ID
public BigDecimal price;
public BigDecimal quantity;
public long timestamp;
}
// 2. Flink 作业逻辑
DataStream<EnrichedTrade> tradeStream = source
.flatMap(new UserEnrichmentFlatMap()); // 关联用户实控人ID
// 过滤出买卖双方为同一实控人的交易
DataStream<EnrichedTrade> selfTrades = tradeStream
.filter(trade -> trade.buyerUserId == trade.sellerUserId);
// 关键逻辑:检测 A->B, B->A 对敲
// Key by (实控人ID, 交易对)
KeyedStream<EnrichedTrade, Tuple2<Long, String>> keyedStream = selfTrades
.keyBy(trade -> Tuple2.of(trade.buyerUserId, trade.symbol));
// 使用 ProcessFunction 进行有状态的检测
DataStream<WashTradeAlert> alerts = keyedStream.process(new PairMatchingProcessor(Time.minutes(5)));
这里的 `PairMatchingProcessor` 是核心。它需要维护一个状态,记录下“谁在什么时间点买了什么”。当一个反向交易来临时,它就去查询这个状态。
public class PairMatchingProcessor extends KeyedProcessFunction<Tuple2<Long, String>, EnrichedTrade, WashTradeAlert> {
// 状态句柄:存储买方账户 -> (卖方账户, 时间戳)
// Key: buyer_account_id, Value: Map<seller_account_id, timestamp>
// 我们用一个嵌套的Map来记录交易方向,实际生产中会更复杂
private transient MapState<Long, Map<Long, Long>> recentTrades;
private final long windowMillis;
public PairMatchingProcessor(Time window) {
this.windowMillis = window.toMilliseconds();
}
@Override
public void open(Configuration parameters) {
MapStateDescriptor<Long, Map<Long, Long>> descriptor =
new MapStateDescriptor<>("recent-trades", Long.class, Map.class);
recentTrades = getRuntimeContext().getMapState(descriptor);
}
@Override
public void processElement(EnrichedTrade trade, Context ctx, Collector<WashTradeAlert> out) throws Exception {
long buyer = trade.buyerAccountId; // 假设EnrichedTrade中有账户ID
long seller = trade.sellerAccountId;
long currentTime = ctx.timestamp();
// 检查是否存在反向交易: seller -> buyer
Map<Long, Long> reverseTrades = recentTrades.get(seller);
if (reverseTrades != null && reverseTrades.containsKey(buyer)) {
long previousTimestamp = reverseTrades.get(buyer);
if (currentTime - previousTimestamp < windowMillis) {
out.collect(new WashTradeAlert(trade.buyerUserId, trade.symbol, trade.tradeId, ...));
// 匹配成功后可以清理状态避免重复告警
reverseTrades.remove(buyer);
if (reverseTrades.isEmpty()) {
recentTrades.remove(seller);
} else {
recentTrades.put(seller, reverseTrades);
}
return;
}
}
// 未找到反向交易,将当前交易存入状态
Map<Long, Long> trades = recentTrades.get(buyer);
if (trades == null) {
trades = new HashMap<>();
}
trades.put(seller, currentTime);
recentTrades.put(buyer, trades);
// 注册一个定时器,用于清理过期的状态
// 这是一个非常重要的工程实践,防止状态无限增长
ctx.timerService().registerEventTimeTimer(currentTime + windowMillis);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<WashTradeAlert> out) throws Exception {
// 当定时器触发时,清理早于 (timestamp - windowMillis) 的状态
// 遍历 recentTrades,删除过期的条目。实现略。
// 这是保证状态大小可控的关键,否则内存会爆炸。
}
}
极客工程师点评: 上面的代码看似简单,但魔鬼在细节中。`onTimer` 的状态清理逻辑是生产环境的命脉。如果清理不及时或逻辑错误,Flink 作业的 state 会无限膨胀,最终导致 checkpoint 失败和作业崩溃。另外,`MapState` 的 value 如果是一个巨大的 Map,序列化和反序列化的开销会很大。在极端情况下,可能需要考虑更优化的数据结构,或者使用 RocksDB 的列族(Column Family)特性来管理状态。
模块二:基于滑动窗口的统计特征计算
对于更狡猾的对手,我们需要计算统计特征。例如,计算一个用户在特定交易对上 5 分钟内的“自成交率”(自成交量 / 总交易量)和“盈亏比”。
// Key by (实控人ID, 交易对)
keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new TradeStatsAggregator())
.filter(stats -> stats.selfTradeRatio > 0.8 && Math.abs(stats.pnl) < 1.0) // 规则:自成交率 > 80% 且盈亏接近于0
.map(stats -> new WashTradeAlert(...)); // 生成告警
// 自定义聚合器
public class TradeStatsAggregator implements AggregateFunction<EnrichedTrade, TradeStatsAccumulator, TradeStatsResult> {
@Override
public TradeStatsAccumulator createAccumulator() {
return new TradeStatsAccumulator();
}
@Override
public TradeStatsAccumulator add(EnrichedTrade value, TradeStatsAccumulator acc) {
acc.totalVolume += value.quantity.doubleValue();
acc.totalTurnover += value.price.multiply(value.quantity).doubleValue();
if (value.buyerUserId == value.sellerUserId) {
acc.selfTradeVolume += value.quantity.doubleValue();
}
// PNL 计算逻辑...
return acc;
}
@Override
public TradeStatsResult getResult(TradeStatsAccumulator acc) {
// 计算最终结果
return new TradeStatsResult(acc.selfTradeVolume / acc.totalVolume, acc.pnl, ...);
}
@Override
public TradeStatsAccumulator merge(TradeStatsAccumulator a, TradeStatsAccumulator b) {
// 合并逻辑...
return ...;
}
}
极客工程师点评: 滑动窗口的计算开销很大,因为每个元素会属于多个窗口。`SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1))` 意味着每分钟计算一次过去 5 分钟的数据。如果数据量巨大,这会导致严重的性能问题。在实践中,需要仔细评估窗口大小和滑动步长。有时候,我们会用一些近似算法,或者用 `ProcessFunction` 手动实现更灵活的窗口逻辑,以牺牲一些精度换取性能。
性能优化与高可用设计
一个生产级的风控系统,不仅要算得准,还要跑得稳、跑得快。
- 对抗层 (Trade-off 分析):
- 实时性 vs. 资源成本: 毫秒级的实时检测需要巨大的内存和 CPU 资源来维护状态和执行计算。而分钟级的近实时检测,则允许更多的批处理和优化,成本显著降低。业务上需要明确,我们是需要在交易完成的瞬间就阻止它(通常做不到),还是在几分钟内发现并处置。绝大多数场景是后者。
– 准确率 vs. 召回率: 规则设得太严(如,必须是 A->B, B->A 的完美匹配),会导致很多变种被漏掉(低召回率)。规则设得太松(如,只要是同一实控人下的账户交易就算),会导致大量正常交易被误报(低准确率)。这是一个持续对抗和调优的过程,通常会引入风险分值的概念,而不是简单的“是/否”。
- 状态访问优化: 避免在 Flink 的状态中存储大的对象。尽量使用原始类型或简单的 POJO。对于 RocksDB,调整其内存、block cache、compaction 策略是深水区,但对性能至关重要。
- 数据倾斜处理: 如果某个交易对或某个用户的交易量特别大(例如,做市商),会导致 Flink 的某些 sub-task 成为瓶颈。需要进行两阶段聚合(先在局部预聚合,再全局聚合)或为热点 key 加盐(salting)来打散负载。
- 背压(Backpressure)监控: 监控 Flink UI 上的背压指示,如果出现背压,说明下游处理不过来。需要从数据源(Kafka 分区数)、算子并行度、状态操作效率等方面综合排查。
- Flink Checkpointing/Savepointing: 必须开启 Checkpoint,并将其存储在 HDFS/S3 等高可用的分布式文件系统上。这是 Flink 作业故障后能从上一个一致性状态恢复的唯一保证。
- Kafka 数据可靠性: Kafka Topic 的副本数(replication-factor)至少为 3,并配置 `acks=all` 来保证消息不丢失。
- 端到端 Exactly-once: 通过 Flink 的两阶段提交 Sink(TwoPhaseCommitSink)或幂等写入,可以实现从 Kafka 源到 ClickHouse 目标端的端到端精确一次语义,确保风险信号不重不丢。
架构演进与落地路径
没有一个复杂的系统是一蹴而就的。自成交风控系统的建设也应遵循演进式架构的思路。
第一阶段:离线批处理 (The MVP)
在项目初期,不需要立刻上 Flink。最快的方式是:将每日的交易日志(T+1)导入到 Hive、Spark SQL 或 ClickHouse 中。然后,风控分析师可以直接编写 SQL 语句来挖掘可疑模式。例如,用自连接(self-join)找出两个账户在一天内的双向交易。这种方式延迟高,但实现成本极低,可以快速验证规则的有效性,并为后续的实时系统积累经验和标签数据。
-- ClickHouse 示例 SQL
SELECT
t1.buyer_user_id,
t1.symbol,
count() AS wash_trade_pairs
FROM trades AS t1
INNER JOIN trades AS t2 ON t1.buyer_user_id = t2.seller_user_id
AND t1.seller_user_id = t2.buyer_user_id
AND t1.symbol = t2.symbol
WHERE
t1.event_date = '2023-10-27' AND t2.event_date = '2023-10-27'
AND t1.buyer_user_id = t1.seller_user_id -- 假设提前关联好了实控人ID
AND t1.trade_id != t2.trade_id
GROUP BY
t1.buyer_user_id,
t1.symbol
HAVING wash_trade_pairs > 10;
第二阶段:实时流计算 (The Core System)
当离线规则被验证,且业务对实时性提出更高要求时,就进入本文重点讨论的阶段。搭建以 Kafka + Flink + ClickHouse 为核心的实时流处理系统。从最简单的环路检测和核心统计指标开始,逐步上线。这个阶段的目标是将检测延迟从小时/天级别降低到秒/分钟级别。
第三阶段:智能化与机器学习 (The Advanced Stage)
道高一尺,魔高一丈。当简单的规则被攻击者摸透后,他们会采用更复杂的策略来规避。此时,需要引入机器学习。第二阶段计算出的各种统计特征(自成交率、盈亏比、价格偏离度、交易频率等)可以作为特征向量,输入到异常检测模型中。
- 无监督学习: 在没有大量标注数据的情况下,可以使用孤立森林(Isolation Forest)、DBSCAN 等算法来发现与正常交易行为模式迥异的“异常团簇”。
- 有监督学习: 如果通过人工分析积累了足够多的黑白样本,可以训练如 GBDT(Gradient Boosting Decision Tree)、XGBoost 等分类模型,来对每一笔交易或每一个用户在某时间段内的行为进行风险评分。
- 图神经网络 (GNN): 对于复杂的环路和团伙作案模式,GNN 提供了更强大的图特征学习能力,能够自动从交易拓扑结构中学习高阶特征,是未来发展的方向。
这个阶段,系统架构会变得更加复杂,需要引入特征平台(Feature Store)、模型训练平台和模型服务(Model Serving)等 MLOps 组件。但其核心思想,依然是建立在对交易行为的深刻理解和扎实的工程系统之上。
延伸阅读与相关资源
-
想系统性规划股票、期货、外汇或数字币等多资产的交易系统建设,可以参考我们的
交易系统整体解决方案。 -
如果你正在评估撮合引擎、风控系统、清结算、账户体系等模块的落地方式,可以浏览
产品与服务
中关于交易系统搭建与定制开发的介绍。 -
需要针对现有架构做评估、重构或从零规划,可以通过
联系我们
和架构顾问沟通细节,获取定制化的技术方案建议。