Stream 流式编程硬核实践:惰性/短路、无副作用与并行流陷阱
大约 4 分钟
Stream 流式编程硬核实践:惰性/短路、无副作用与并行流陷阱
新手一屏速览
- 中间操作惰性;终端操作触发执行;短路操作(anyMatch/findFirst)可提前结束
- 流处理中避免可见副作用(特别是并行流);优先使用无捕获/纯函数
- 并行流适合大数据量且易切分的场景;I/O 重负载与关联计算慎用
1. 管道模型与操作分类
- 中间操作:map/filter/flatMap/peek/distinct/sorted/limit/skip(懒执行,返回新流)
- 终端操作:forEach/collect/reduce/anyMatch/findFirst/count(触发执行)
- 有状态中间操作(distinct/sorted)可能需要缓冲,增大内存与延迟
2. 无副作用与引用透明
- map/filter 中避免修改外部状态或共享集合;确保纯函数,便于并行与测试
- 输出收集使用
Collectors,如toList/toSet/groupingBy/partitioningBy - 自定义 Collector 时,确定
supplier/accumulator/combiner/finisher的线程安全与结合律
3. 装箱与分配控制
IntStream/LongStream/DoubleStream可避免频繁装箱;链中优先使用原始类型流- 热路径注意
map产生临时对象;必要时回退命令式循环减少分配
4. 并行流适用与坑位
- 适用:CPU 密集、成本均匀、易切分(如数组/ArrayList)、大规模数据
- 坑位:I/O 受限;数据来源难以拆分(如链表);有序性要求强;存在共享状态或锁
- 调度:并行流使用 ForkJoin 公共池,与其他任务竞争;必要时自建池(不推荐默认)
5. reduce 与 collect
最近建一些几十个工作内推群,各大城市都有,群里目前已经收集了很多内推岗位,大厂、中厂、小厂、外包都有。 欢迎HR、开发、测试、运维和产品加入。

扫描下方微信,备注:网站+所在城市,即可拉你进工作内推群。

int sum = IntStream.range(0, 1_000_000).reduce(0, Integer::sum);
var list = Stream.of("a","bb","c").collect(Collectors.toList());- reduce 要求结合律与无副作用;字符串拼接可用
Collectors.joining
6. 自定义 Collector 示例
public static <T> Collector<T, ?, Map<T, Long>> countingMap() {
return Collector.of(HashMap::new,
(m, t) -> m.merge(t, 1L, Long::sum),
(m1, m2) -> { m2.forEach((k,v) -> m1.merge(k,v,Long::sum)); return m1; },
Collections::unmodifiableMap);
}7. 工程清单与反模式
- 清单:原始类型流优先;避免副作用;明确有状态操作的内存影响;为性能关键路径做 JMH
- 反模式:在并行流中写共享集合;链路中过度创建临时对象;在难以拆分数据上开并行
8. 练习
- 使用自定义 Collector 实现 top-N 统计(含并行支持)
- 比较 for 循环、顺序流、并行流在不同数据规模与操作成本下的吞吐
示例代码(可直接复制运行)
示例一:纯函数管道与短路操作
import java.util.*;
import java.util.stream.*;
public class StreamPipelineDemo {
public static void main(String[] args) {
// map/filter 是惰性的,终端操作 count 才触发
long c = IntStream.rangeClosed(1, 100)
.map(x -> x * 2) // 纯函数,无副作用
.filter(x -> x % 3 == 0) // 纯函数过滤
.count(); // 终端操作
System.out.println(c);
// 短路:anyMatch 在找到后立即返回
boolean has = Stream.of("alpha","beta","gamma")
.peek(s -> {}) // 演示惰性:无输出
.anyMatch(s -> s.startsWith("g"));
System.out.println(has);
}
}示例二:避免装箱的原始类型流
import java.util.stream.*;
public class IntStreamSum {
public static void main(String[] args) {
int sum = IntStream.range(0, 1_000_000)
.map(x -> x + 1)
.sum(); // 避免 Integer 装箱
System.out.println(sum);
}
}示例三:自定义 Collector(元素计数 Map)
import java.util.*;
import java.util.stream.*;
public class CountingCollectorDemo {
public static <T> Collector<T, ?, Map<T, Long>> countingMap() {
return Collector.of(
HashMap::new,
(m, t) -> m.merge(t, 1L, Long::sum),
(m1, m2) -> { m2.forEach((k,v) -> m1.merge(k,v,Long::sum)); return m1; },
Collections::unmodifiableMap
);
}
public static void main(String[] args) {
Map<String, Long> freq = Stream.of("a","b","a","c","b","a")
.collect(countingMap());
System.out.println(freq);
}
}示例四:并行流与共享状态陷阱(错误 vs 正确)
import java.util.*;
import java.util.stream.*;
public class ParallelPitfall {
public static void main(String[] args) {
// 错误:并行流写共享 ArrayList,存在竞态与异常风险
List<Integer> target = Collections.synchronizedList(new ArrayList<>());
// 即使同步包装,也可能序列不稳定且成本高
IntStream.range(0, 10_000).parallel().forEach(target::add);
System.out.println("Size (risky): " + target.size());
// 正确:使用收集器生成结果容器
List<Integer> ok = IntStream.range(0, 10_000).parallel().boxed().collect(Collectors.toList());
System.out.println("Size (safe): " + ok.size());
}
}