Java 并发:组合对象
【Java 并发】系列是对《Java Concurrency in Practice》的学习整理。
一、状态空间#
首先要明确的是处理并发问题时,我们到底在保护什么?当然是一个对象中的 field。如果这些 field 都是基本类型(primitive)的,那么这些 field 就构成了这个对象的完整状态(state);如果其中一个 field 指向另一个对象,那么这个状态应当还包括这个对象中的 field,以此类推。
所谓的保护就是要使每一个对象的状态保持在合理、安全的范围内,这个范围就是安全的状态空间(state space)。所以对于不会改动的 field,尽量使用 final 修饰使其不可变,这样可以减小状态空间,简化对对象可能状态的分析。分析包含两个部分:
- 始终通过不变约束(invariant)来判断某种状态是否合法;
- 通过后验条件(postcondition)来判断某种状态转移(state transition)是否合法。
不变约束确定了一个 field 的范围,或者说状态空间。比如一个 int 类型的 field 就需要始终保持在 [Integer.MIN_VALUE, Integer.MAX_VALUE] 之间;而一些 field 又需要满足不能是负值。
所谓后验条件,就是在执行一段代码后必须成立的条件,这个条件只能在运行期确定。比如对一个计数量自增:count++
,就需要一些同步策略(synchronization policy)保证这个操作的原子性,否则这个状态转移就是不安全的。
同步策略定义了对象如何协调对其状态的访问,并且不会违背他的不变约束和后验条件。它将不可变性、线程限制和锁结合起来,从而维护线程的安全性。为了不违背不变约束,通常用一个synchronazed 函数对 field 进行封装,保证再写入时不会变成一个非法的状态;为了不违背后验条件,涉及状态转移的操作必须满足原子性。
// 伪代码
public final class Counter {
public AtomicLong count = new AtomicLong(0); // 保证其原子性以满足后验条件
private long value = 0;
public synchronized long getValue() {
return value;
}
public synchronized long increment() { // 用函数封装以满足不变约束
if (value == Long.MAX_VALUE)
throw new IllegalStateException("counter overflow");
return ++value;
}
}
二、组合对象的线程安全#
对于一个组合对象,状态空间更加复杂,维持其线程安全就会变得困难。下面是采用监视器模式(monitor pattern)来实现线程安全的例子——车辆跟踪器。采用这种方式,线程安全由 MonitorVehicleTracker 本身来保证。所以,涉及到私有对象 locations 发布的方法都必须用 synchronized 修饰。
为了不破坏封装性,getLocations()
返回的是 locations 对象的一个深拷贝。虽然 locations 本身是 final 修饰,不可变的,但是其内部的 MutablePoint 却是可变的。所以,在深拷贝时,总是创建一个新的 MutablePoint 加入到 result 中,最后返回一个不可修改的 result 封装对象。之所以要完成这些看似额外的操作,正是 MutablePoint 的可变性(线程不完全)导致的。
@ThreadSafe
public class MonitorVehicleTracker {
private final Map<String, MutablePoint> locations;
public MonitorVehicleTracker(
Map<String, MutablePoint> locations) {
this.locations = deepCopy(locations);
}
// 需要 synchronized 同步
public synchronized Map<String, MutablePoint> getLocations() {
return deepCopy(locations); // 深拷贝
}
// 需要 synchronized 同步
public synchronized MutablePoint getLocation(String id) {
MutablePoint loc = locations.get(id);
return loc == null ? null : new MutablePoint(loc);
}
// 需要 synchronized 同步
public synchronized void setLocation(String id, int x, int y) {
MutablePoint loc = locations.get(id);
if (loc == null)
throw new IllegalArgumentException("No such ID: " + id);
loc.x = x;
loc.y = y;
}
private static Map<String, MutablePoint>
deepCopy( Map<String, MutablePoint> m) {
Map<String, MutablePoint> result =
new HashMap<String, MutablePoint>();
for (String id : m.keySet())
result.put(id, new MutablePoint(m.get(id)));
return Collections.unmodifiableMap(result);
} }
@NotThreadSafe
public class MutablePoint {
public int x, y;
public MutablePoint() {
x = 0;
y = 0;
}
public MutablePoint(MutablePoint p) {
this.x = p.x;
this.y = p.y;
}
}
如果将可变(线程不安全)的 MutablePoint 改为不可变(线程安全)的 Point,我们在获取 point 时,就不需要对其进行深拷贝,因为其他线程根本无法修改它。
@Immutable
public class Point {
public final int x, y;
public Point(int x, int y) {
this.x = x;
this.y = y;
}
}
同样的思路,不妨把 locations 也设计成线程安全的,这样的话,对其发布时就不需要额外同步了。JDK 提供了 ConcurrentMap 作为线程安全的 Map。
@ThreadSafe
public class DelegatingVehicleTracker {
private final ConcurrentMap<String, Point> locations;
// 可以获取实时的数据
private final Map<String, Point> unmodifiableMap;
public DelegatingVehicleTracker(Map<String, Point> points) {
locations = new ConcurrentHashMap<String, Point>(points);
unmodifiableMap = Collections.unmodifiableMap(locations);
}
public Map<String, Point> getLocations() {
return unmodifiableMap;
}
public Point getLocation(String id) {
return locations.get(id);
}
public void setLocation(String id, int x, int y) {
// 由于不可变,需要 new 一个新的 point
if (locations.replace(id, new Point(x, y)) == null)
throw new IllegalArgumentException(
"invalid vehicle name: " + id);
}
}
由于采用 ConcurrentMap 的 locations 本身是线程安全的,DelegatingVehicleTracker 就不需要额外的处理了,这种方式叫做委托(delegate)。同时需要注意到由于 Point 是不可变的,在 setLocation()
时,就需要用一个新的 point 进行替换了。
将 Point 设计成不可变的一大好处是 getLocations()
时,只需要返回对象构造时就创建好的 unmodifiableMap 即可。由于 unmodifiableMap 封装了 locations,所以通过 unmodifiableMap 就可以获得实时数据。当然,这种实时性也就意味着无法获得同一时间所有车辆的位置信息了(通过深拷贝则可以做到),实际使用中,应当权衡利弊选择模型。
2.1 委托的局限#
车辆跟踪器只有一个 field,即状态变量,如果一个对象有多个状态变量是否都能通过委托来实现线程安全呢?只要多个线程安全的状态变量彼此独立,且类的操作不包括任何无效状态转换,就可以将线程安全委托给这些状态变量。如果多个状态变量之间存在某些约束,那仍然需要额外的线程安全保证。
比如下面的 NumberRange 类就没有完整的保护不变约束——lower 应始终小于等于 upper。即使 lower 和 upper 都是用线程安全的类 AtomicInteger,但是他们之间的约束并没有被有效处理,check-then-act 并不是线程安全的,因为它不能保证原子性。
public class NumberRange {
// INVARIANT: lower <= upper
private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);
// 应当 synchronized 修饰
public void setLower(int i) {
// Warning -- unsafe check-then-act
if (i > upper.get())
throw new IllegalArgumentException(
"can’t set lower to " + i + " > upper");
lower.set(i);
}
// 应当 synchronized 修饰
public void setUpper(int i) {
// Warning -- unsafe check-then-act
if (i < lower.get())
throw new IllegalArgumentException(
"can’t set upper to " + i + " < lower");
upper.set(i); }
public boolean isInRange(int i) {
return (i >= lower.get() && i <= upper.get());
}
}
这一局限与 volatile 类似,由于 volatile 只能满足可见性而不满足原子性,所以一个变量只有在没有参与设计其他状态变量的不变约束时,才适合声明为 volatile 类型。
2.2 状态的发布#
再回到车辆跟踪器的例子,不管是可变的 MutablePoint 还是不可变的 Point,都存在一定的局限。有时候放宽约束,使它变成一个安全但是可变的 SafePoint 是不错的选择。
@ThreadSafe
public class SafePoint {
private int x, y;
private SafePoint(int[] a) {
this(a[0], a[1]);
}
public SafePoint(SafePoint p) {
this(p.get());
}
public SafePoint(int x, int y) {
this.x = x;
this.y = y;
}
public synchronized int[] get() {
return new int[] { x, y };
}
public synchronized void set(int x, int y) {
this.x = x; this.y = y;
}
}
PublishingVehicleTracker 和 DelegatingVehicleTracker 类似,不同之处在于,修改一个 safePoint 并不需要重新创建。
@ThreadSafe
public class PublishingVehicleTracker {
private final Map<String, SafePoint> locations;
private final Map<String, SafePoint> unmodifiableMap;
public PublishingVehicleTracker( Map<String, SafePoint> locations) {
this.locations = new ConcurrentHashMap<String, SafePoint>(locations);
this.unmodifiableMap = Collections.unmodifiableMap(this.locations);
}
public Map<String, SafePoint> getLocations() {
return unmodifiableMap;
}
public SafePoint getLocation(String id) {
return locations.get(id);
}
public void setLocation(String id, int x, int y) {
if (!locations.containsKey(id))
throw new IllegalArgumentException(
"invalid vehicle name: " + id);
// 不需要创建新的 safePoint
locations.get(id).set(x, y);
}
}
SafePoint 的确使得调用它变得更加灵活,但是也带来了问题:虽然 unmodifiableMap 是不可修改的,但是其内部的 safePoint 仍然能被修改,从而改变车辆的位置。如果后续加入一些与车辆位置向约束的状态变量,那么线程就不安全了。
三、向已有的线程安全类添加功能#
当我们使用已有的线程安全类却仍然缺少某些功能时,就需要想这些类添加功能,有三种方法:
- 直接修改源码,通常比较困难;
- 扩展一个线程安全类,需要明确知道被扩展类的类型;
- 使用一个 Helper 类对线程安全类扩展功能;
- 直接组合一个类,且不要求其线程安全。
使用第三种方法很容易犯一个错误,如下所示。
@NotThreadSafe
public class ListHelper<E> {
// 并不能明确知道其类型
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
...
public synchronized boolean putIfAbsent(E x) {
boolean absent = !list.contains(x);
if (absent)
list.add(x);
return absent;
}
}
问题在于锁的对象并不一致,当 synchronized 修饰一个非静态方法时,实际上就是对 this(一个 ListHelper 实例) 上锁,而 list 本身根本无法感知到这个实例,更不可能上同一把锁。通过查看 Collections.synchronizedList()
的注解可以知道,这个方法返回的对象是用自身进行上锁的,所以 Helper 可以这样实现:
@ThreadSafe
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
...
public boolean putIfAbsent(E x) {
synchronized (list) {
boolean absent = !list.contains(x);
if (absent)
list.add(x);
return absent;
}
}
}
第四种方法严格上来说不算是向线程安全类添加功能,因为他实际上可以直接封装线程不安全的类,比如传入一个 List,并不需要其是线程安全的:
@ThreadSafe
public class ImprovedList<T> implements List<T> {
private final List<T> list;
public ImprovedList(List<T> list) {
this.list = list;
}
public synchronized boolean putIfAbsent(T x) {
boolean contains = list.contains(x);
if (contains)
list.add(x);
return !contains;
}
public synchronized void clear() {
list.clear();
}
// ... similarly delegate other List methods
}
其实就是自己实现 Collections.synchronizedList()
,不过可以有选择地实现某些功能。