《Java并发编程实战》-6

取消与关闭


行为良好的软件能够完善地处理失败,关闭和取消等过程。

7.1任务取消

取消某个操作的原因:

  • 用户请求取消
  • 有时间限制的操作
  • 应用程序时间
  • 错误
  • 关闭

7.1.1 中断

在Java的API或语言规范中,并没有将中断与任何取消语言关联起来,但实际是激昂,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

public class Thread {
    // 中断目标线程
    public void  interupt() { ... }
    // 返回目标线程的中断状态
    public boolean isInterrupted() { ...}
    // 静态, 将清楚当前线程的中断状态,并返回它之前的值,这也是清楚中断状态的唯一方法
    public static boolean interrupted() { ... }
}

非阻塞状态下中断时,它的中断状态将被设置,中断操作将变得“有粘性” -- 如果不触发InterruptedEXception,那么中断状态将一直保持,直到明确地清除中断状态。

调用interrupt并不意味着立刻停止目标线程正在进行的工作,而只是传递了请求中断的消息。中断操作并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己(这些时刻也被称为取消点)。

通常,中断是实现取消的最合理方式。

通过中断来取消。

class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!Thread.currentThread().isInterrupted()) {
            queue.put( p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* 允许线程推出 */
        }
    }

    public void cancel() { interrupt(); }

}

7.1.2 响应中断

两种实用策略可以用于处理InterruptedException

  • 传递异常 (可能在执行某个特定于任务的清除操作之后),从而使你的方法也可以成为可中断的阻塞方法。
  • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规的任务和库代码中都不应该屏蔽中断请求。

不可取消的任务在推出前恢复中断

public Task getNextTask(BlockingQueue<Task> queue) {
    boolean interrupted = false;
    try {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            interrupted = ture;
        }
    } finally {
        if (interrupted) 
            Thread.currentThread().interrupt();
    }
}

7.1.4 示例:计时器运行

7.1.5 通过Future来实现取消

当Future.get()抛出InterruptedException或TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

7.1.6 处理不可中断的阻塞

  • Java.io包中的同步Socket I/O。
  • java.io包的同步I/O。
  • Selector的异步I/O。
  • 获取某个锁。

7.1.7 采用newTaskFor来封装非标准的取消

7.2 停止基于线程的服务

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期的方法。

7.2.1 示例:日志服务

7.2.2 关闭ExecutorService

7.2.3 “毒丸”对象

7.2.4 示例:只执行一次的服务

7.2.5 shutdownNow的局限性

7.3 处理非正常的线程终止

未捕获异常的处理

将异常写入日志的UncaughtExceptionHandler

public class UEHLogger implements THread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, 
        "Thread terminated with exception: " + t.getName(), e);
    }
}

在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

7.4 JVM关闭

7.4.1 关闭钩子

7.4.2 守护线程

此外,守护线程通常不能用来代替应用程序管理中各个服务的生命周期。

7.4.3 终结器

避免使用终结器。

小结

在任务、线程、服务以及应用程序等模块中的生命周期结束问题,可能会增加它们在设计和实现时的负责性。Java并没有提供某种抢占式的机制来取消操作或者终结线程。相反,它提供了一种协作式的中断机制来实现取消操作,但这要依赖于如何构建取消操作的协议,以及能否始终遵循这些协议。通过使用FutureTask和Executor框架,可以帮助我们构建可取消的任务和服务。

《Java并发编程实战》-5

《Java并发编程实战》-5

6.1.1 串行地执行renwu

6.1.2 显式地为任务创建线程

6.1.3 无限制创建线程的不足

  • 线程生命周期的开销非常高。
  • 资源消耗。
  • 稳定性。

6.2 Executir框架

6.2.1 示例:基于Executor的Web服务器

6.2.2 执行策略

每当看到new Thread(runnable).start()时,并且你希望获得一种更灵活的执行策略时,请考虑使用Excecutor来代替Thread。

6.2.3 线程池

  • newFixedThreadPool
  • newCachedTheadPool
  • newSingleThread
  • newScheduleThreadPool

6.2.4 Executor的声明周期

6.2.5 延迟任务与周期任务

6.3 找出可利用的并行性

6.3.1 示例:串行的页面渲染器

6.3.2 携带结果的任务Callable与Future

6.3.3 实例:使用Future实现页面渲染器

6.3.4 在异构任务并行化中存在的局限

6.3.6 实例:使用CompletionService实现页面渲染器

6.3.7 为任务设置时限

6.3.8 示例:旅行预订门户网站

小结

Executor框架将任务提交与执行策略解耦开来,同时还支持多种不同类型的执行策略。
要想将应用程序分解为不同的任务时获得最大的好处,必须定义清晰的任务边界。

《Java并发编程实战》-4

《Java并发编程实战》-4

5.1 同步容器类

Collections.synchronizedXxx等工厂方法创建的同步实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

5.5.1 同步容器的问题

在使用客户端加锁的Vector上的复合操作

public static Object getList(Vector list) {
    synchronized (list) {
        int lastIndex = list.size() - 1;
        return list.get(lastIndex);
    }
}

public static void deleteLast(Vector list) {
    synchronized (this) {
        int lastIndex = list.size() - 1;
        list.remove(lastIndex);
    }
}

5.1.2 迭代器与ConcurrentModificationException

当容器类发现容器在迭代过程中被修改时,就会抛出一个"ConcurrentModificationException
",这就是“快速失败”。

5.1.3 隐藏迭代器

正如封装对象的状态有助于维持不变性条件,封装对象的同步机制同样有助确保实施同步策略。

5.2 并发容器

通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

并发容器 同步容器 普通容器
BlockingQueue ConcurrentLinkedQueue PriorityQueue
ConcurrentHashMap, SkipListMap hashtable, Collections.synchroniedList(TreeSet set) hashmap,SortedMap, SortedSet
CopyOnWriteArrayList Vector, Collections.synchroniedList(List list) ArrayList, LinkedList
CopyOnWriteArraySet, ConcurrentSkipListListSet Collections.synchroniedSet(Set set) LinkedHashSet, HashSet, TreeSet

5.2.1 ConcurrentHashMap

ConcurrentHashMap使用更细粒度的加锁机制---分段锁来实现更大程度的共享。

ConcurrentHashMap具有弱一致性(Weakly Consistent),而非"及时失败"。

只有当应用程序需要加锁Map以进行独占访问时,才应该放弃使用ConcurrentHashMap。

5.2.2 额外的原子Map操作

public interface ConcurrentMap<K,V> extends Map<K,V> {
    // 仅当K没有相应的映射值时才插入
    V putIfAbsent(K key, V value);

    // 仅当K被映射到oldValue时才替换为newValue
    boolean remove(K key, V value);

    // 仅当K被映射到oldValue时才替换为newValue
    boolean replace(K key, V oldValue, V newValue);

    // 仅当K被映射到某个值时才替换为newValue
    boolean replace(K key, V newValue);
}

5.2.3 CopyOnWriteArrayList

CopyOnWriteArrayLsit用于替代同步List,某些情况下它提供了更好的并发行性能,并且在迭代期间不需要对容器进行加锁或复制(类似地,CopyOnWriteArraySet的作用是替代同步Set)。

每当修改容器时都会复制底层数组,这需要一定的开销。

仅当迭代操作远远多于修改操作时,才应该使用"写入时负责"容器。

5.3 阻塞队列和生产者-消费者模式

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

BlockingQueue的方法:

  • put 如果队列满则阻塞
  • take 如果队列为空则阻塞
  • offer 如果队列满则返回一个失败状态,一般是false
  • poll 如果队列空则返回一个表示空的数据,一般是null

BlockingQueue的多种实现:

  • LinkedBlockingQueue 基于链表,FIFO
  • ArrayBlockingQueue 基于数组, FIFO
  • PriorityBlockingQueue 按优先级排序的队列
  • SynchronousQueue 它不会为队列中元素维护储存空间,它维护一组线程,这些线程在等待着吧元素加入或者移出队列。

5.3.1 示例:桌面搜索

5.3.2 串行线程封闭

对象池利用了串行线程封闭,将一个对象“借给”一个请求线程。

还可以通过ConcurrentMap的原子方法remove或者AtomicReference的原子方法compareAndSet来转笔可变对象的所有的所有权(但必须确保只有一个线程能接受被转移的对象)。

双端队列与工作密取

Deque是一个双端队列,实现了在队头和队尾的高效插入和移除。具体实现包括ArrayList和LinkedBlockingDeque。

双端队列适用于工作密取模式。

5.4 阻塞方法与中断方法

  • 传递
  • 恢复中断
public class TackRunnable implements Runnable {
    try {
            processTack(queue.take());
        } catch (InterruptedException e) {
            // 恢复被中断的状态
            Thread.currentThread().interrupt();
        }
}

5.5 同步工具类

5.5.1 闭锁

CoutDwonLatch,倒计时门闩,又名闭锁、倒计时计数器,是一种同步工具类,可以延迟线程大的进度直到其到达终止状态。

闭锁可以用来确保某些活动直到其他活动都完成后才能继续执行。

  • 确保某个计算在其需要得所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须现在这个闭锁上等待。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务启动后会释放锁S,这样其他依赖S的服务才能继续执行。
  • 等待直到某个操作的所有参与者(例如,在多个玩家游戏中的所有玩家)都就绪再继续执行。这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

5.5.2 FutureTask

FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于一下三种状态:

  • 等待运行(Wating to run)
  • 正在运行(Running)
  • 运行完成(Completed)

信号量

计数信号量用来控制同时访问某个某个特殊资源的操作数量,或者同时执行某个指定操作的数量。计算信号量还可以用来实现某种资源池,或者对容器施加边界。

5.5.4 栅栏

栅栏类似于闭锁,他能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程线程。

CyclicBarrier可以使一定数量的参与反复地在栅栏位置汇集,他在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。

另一种形式的栅栏使Exchange,他是一种两方栅栏,各方在栅栏位置上交换数据。当两分法执行不对称的操作时,Exchanger会非常有用,例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区读取数据。

5.6 构建高效且可伸缩的结果缓存

使用ConcurrentHashMap和FutureTask来构架缓存

public interface Computable<A, V> {
    V compute(A arg) throws InterruptedExcetion;
}

public class Menoizer<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A, V> c;

    public Menoizer(Computable<A, V> c) { this.c = c; }
    while (true) {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                public V call() throws InterruptedException {
                    return c.compute(args);
                }
            };
            FutureTask<V> ft = new FutureTask<V>(eval);
            // 复合操作(“若没有则添加”)
            f = cache.putIfAbsent(arg, ft);
            if(f == null) {
                f = ft;
                ft.run();
            }
        }
        try{
            // 若正在计算,则阻塞等待结果
            return f.get();
        } catch (CancellationException e) {
            // 出现异常则去除缓存,防止缓存污染
            cache.renove(arg,f);
        } catch (ExecutionException e) {
            throw launderThrowable(w.getCause())
        }
    }
}

第一部分小结

  • 可变状态是至关重要的。
    所有的并发问题都可以归结为如何协调对并发状态的访问,可变状态越少,就越容易确保线程安全。
  • 尽量将域声明为final类型,除非需要它们是可变的。
  • 不可变对象一定是线程安全的。
    不可变对象能极大地降低并发编程的复杂性。它们更为简单而且安全,可以任意共享而无须使用加锁或保护性复制等机制。
  • 封装有助于管理复杂性
    在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么要这么做?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略。
  • 用锁来保护每个可变变量。
  • 当保护同一个不变性条件中的所有变量时,要使用用一个锁。
  • 在执行复合操作期间,要持有锁。
  • 如果从多个线程中访问同一个可变变量没有同步机制,那么程序会出现问题。
  • 不要故作聪明地推断出不需要使用同步。
  • 在设计过程中考虑线程安全,或者在文档中明确地指出它不是线程安全的。
  • 将同步策略文档化。