并发工具与容器:Executor 进阶、Future/CompletableFuture、并发集合
大约 3 分钟
并发工具与容器:Executor 进阶、Future/CompletableFuture、并发集合
新手一屏速览
- 优先使用 ThreadPoolExecutor 并暴露指标;拒绝策略与压测阈值化
- CompletableFuture 提供组合/异常链;注意线程池与上下文传播
- 并发集合与同步工具配合:CHM、BlockingQueue、LongAdder、Semaphore/Latch/Barrier
1. ThreadPoolExecutor 进阶
- 自定义队列(ArrayBlockingQueue/LinkedBlockingQueue/SynchronousQueue)
- 饱和策略:按压测设定队列长度与拒绝策略;暴露拒绝数/队列长度/耗时分布
2. Future 与超时
Future<String> f = pool.submit(callable);
try { f.get(200, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { f.cancel(true); }- 为每次调用设置合理超时;取消时传递中断
3. CompletableFuture 编排与异常
CompletableFuture<String> f =
CompletableFuture.supplyAsync(() -> loadUser(id), pool)
.thenCombineAsync(CompletableFuture.supplyAsync(() -> loadOrders(id), pool),
(u, o) -> render(u, o), pool)
.exceptionally(ex -> fallback());- 使用
handle/exceptionally/whenComplete进行异常链管理;避免吞错
4. 并发集合与计数器
- ConcurrentHashMap 与 compute 合理使用;LongAdder 用于高并发热点计数
- 队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(可能较大)、SynchronousQueue(直接移交)
5. 同步工具
最近建一些几十个工作内推群,各大城市都有,群里目前已经收集了很多内推岗位,大厂、中厂、小厂、外包都有。 欢迎HR、开发、测试、运维和产品加入。

扫描下方微信,备注:网站+所在城市,即可拉你进工作内推群。

- Semaphore 控制并发许可;CountDownLatch 一次性栅栏;CyclicBarrier 可循环
- Phaser 处理阶段性任务;注意线程泄露与中断处理
6. 上下文传播与观测
- 异步线程间的 MDC/上下文传播使用库(如装饰器或 TransmittableThreadLocal);暴露指标至监控系统
- 对关键链路建立超时/舱壁/断路器,与线程池策略协同
7. 练习
- 使用 CompletableFuture 并行拉取三类远程数据,合并结果,设计超时与降级
- 使用不同队列与参数配置压测 ThreadPoolExecutor,并输出拒绝策略效果
示例代码(可直接复制运行)
示例一:ThreadPoolExecutor 有界队列与拒绝策略
import java.util.concurrent.*;
public class BoundedPool {
public static void main(String[] args) {
ThreadPoolExecutor ex = new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(8),
r -> { Thread t = new Thread(r); t.setName("biz-" + t.getId()); return t; },
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i=0;i<50;i++) {
final int id = i;
ex.execute(() -> {
try { Thread.sleep(50); } catch (InterruptedException ignored) {}
System.out.println("task " + id + " by " + Thread.currentThread().getName());
});
}
ex.shutdown();
}
}示例二:Future 超时与取消
import java.util.concurrent.*;
public class FutureTimeout {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
Future<String> f = es.submit(() -> { Thread.sleep(500); return "ok"; });
try { System.out.println(f.get(100, TimeUnit.MILLISECONDS)); }
catch (TimeoutException te) { f.cancel(true); System.out.println("cancelled"); }
es.shutdownNow();
}
}示例三:CompletableFuture 组合与异常链
import java.util.concurrent.*;
public class CFCompose {
static String loadUser(int id){ return "U"+id; }
static String loadOrders(int id){ return "O"+id; }
static String render(String u, String o){ return u+"-"+o; }
static String fallback(){ return "fallback"; }
public static void main(String[] args) {
Executor e = Executors.newFixedThreadPool(4);
CompletableFuture<String> f =
CompletableFuture.supplyAsync(() -> loadUser(1), e)
.thenCombineAsync(CompletableFuture.supplyAsync(() -> loadOrders(1), e),
(u, o) -> render(u, o), e)
.exceptionally(ex -> fallback());
System.out.println(f.join());
((ExecutorService) e).shutdown();
}
}示例四:ConcurrentHashMap 与 LongAdder
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
public class CHMAndAdder {
public static void main(String[] args) throws Exception {
ConcurrentHashMap<String, LongAdder> counter = new ConcurrentHashMap<>();
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i=0;i<1000;i++) {
es.execute(() -> counter.computeIfAbsent("k", k -> new LongAdder()).increment());
}
es.shutdown(); es.awaitTermination(1, TimeUnit.SECONDS);
System.out.println(counter.get("k").sum());
}
}示例五:同步工具(Semaphore 与 CountDownLatch)
import java.util.concurrent.*;
public class SyncToolsDemo {
public static void main(String[] args) throws Exception {
Semaphore sem = new Semaphore(2); // 并发许可 2
CountDownLatch latch = new CountDownLatch(3); // 等待 3 个任务结束
ExecutorService es = Executors.newFixedThreadPool(3);
for (int i=0;i<3;i++) {
final int id = i;
es.execute(() -> {
try {
sem.acquire();
Thread.sleep(100);
System.out.println("job " + id);
} catch (InterruptedException ignored) {
} finally {
sem.release();
latch.countDown();
}
});
}
latch.await();
es.shutdown();
}
}