线程与线程池
基本概念¶
线程¶
- Java使用Thread管理线程,创建Thread时JVM会在专门保留的内存区域中分配一大块空间。包含许多内容
- 程序计数器,指示要执行的下一条字节码
- 支持Java代码执行的栈,包含线程到达当前执行结点前所调用过的方法的相关信息以及正在执行的方法的本地变量(基本类型和对象引用)
- 堆由所有线程共享
- 本地代码栈
- 本地线程变量存储
-
用于控制线程的状态维护变
-
获取机器上的处理器数量
Runtime.getRuntime().availableProcessors();
- 通常来说线程的最佳数目就是可用处理器的数量
- 由于超线程只是会大幅加快上下文切换的速度,而并不会增加实际的计算吞吐量。因此设置计算密集型程序时不应该考虑超线程
- 大部分情况由JVM决定即可
死锁¶
- 循环等待造成无限循环
- 死锁发生的条件
- 互斥:任务使用的至少一项资源不是共享的(即同一时间只能被一个对象使用,其他对象必须等待)
- 至少一个任务必须持有一项资源,并且等待正在被另一个任务持有的资源。
- 不能从一个任务中夺走一项资源。
- 会发生循环等待。
线程池¶
- 线程池类型
- FixedThreadPool:创建一个固定大小的线程池。每提交一个任务就创建一个线程,直到线程达到线程池的最大大小。
ExecutorService executor = Executors.newFixedThreadPool(int nThreads);
- 当所有线程都在忙时,新提交的任务不会立即执行,而是会放入线程池的工作队列中等待。
- CachedThreadPool:创建一个可缓存的线程池。如果线程池长度超过处理需求,可灵活回收空闲线程,若无可回收,则新建线程。(无数目限制,除非超过内存)
ExecutorService executor = Executors.newCachedThreadPool();
- SingleThreadExecutor:创建一个单线程的Executor。如果这个线程因异常而结束,会创建一个新线程来继续执行后续的任务。
ExecutorService executor = Executors.newSingleThreadExecutor();
- ScheduledThreadPool:创建一个大小无限制的线程池。支持定时以及周期性执行任务的需求。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(int corePoolSize); scheduler.schedule(Callable <V> callable, long delay, TimeUnit unit);
- WorkStealingPool:基于工作窃取的线程池。适用于大量的短任务,使用多个队列减少竞争。(使用双端队列为每个线程维护任务,如果一个线程没有任务会从别的线程“窃取”任务)
-
ExecutorService executor = Executors.newWorkStealingPool(int parallelism);
-
基本使用方法
execute(Runnable command)
: 提交不需要返回值的任务。submit(Runnable task)
: 提交需要返回值的任务,并返回Future
对象。submit(Callable<T> task)
: 提交有返回值的任务,返回Future<T>
。executor.shutdown();
关闭线程池,仍然会执行完已经提交的任务,但是不接受新任务。
具体操作¶
线程的创建与运行¶
- 使用线程运行对象的函数式接口
class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while (countDown-- > 0) { System.out.print(status()); Thread.yield(); //后面解释 } } } public class BasicThreads { public static void main(String[] args) { //把任务装进线程里 Thread t = new Thread(new LiftOff(10)); t.start(); System.out.println("Waiting for LiftOff"); } }
-
Runnable的内容就是线程的载荷
-
或者直接为线程添加run方法
ublic class SimpleThread extends Thread { private int countDown = 5; private static int threadCount = 0; public SimpleThread() { super(Integer.toString(++threadCount)); start(); } public String toString() { return "#" + getName() + "(" + countDown + "), "; } public void run() { while (true) { System.out.print(this); if (--countDown == 0) return; } } public static void main(String[] args) { for (int i = 0; i < 5; i++) { new SimpleThread(); } } } //更简洁的写法 public class MoreBasicThreads { public static void main(String[] args) { for (int i = 0; i < 5; i++) new Thread(new LiftOff(10)).start(); System.out.println("Waiting for LiftOff"); } }
-
使用线程池
//结合线程池 public class CachedThreadPool { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) exec.execute(new LiftOff(10)); exec.shutdown(); } }
-
睡眠
TimeUnit.MILLISECONDS.sleep(100);
-
让位
Yield
- 向调度程序提示当前线程愿意放弃其当前对处理器的使用。(只是一种启发性质的建议)
- yield会临时暂停当前线程,让有同样优先级的正在等待的线程有机会执行
- 若没有正在等待的线程或者所有正在等待的线程的优先级都较低,则继续运行
-
yield方法不保证当前的线程会暂停或者停止,但是可以保证当前线程在调用yield方法时会放弃CPU
-
线程优先级
Thread.currentThread().setPriority(priority);
- 常量
Thread.MIN_PRIORITY Thread.MAX_PRIORITY
-
尽量不要做
-
守护(Daemon)进程
- 守护程序线程是一种线程,它不会阻止 JVM 在程序完成但线程仍在运行时退出。
Thread daemon = new Thread(new SimpleDaemons()); daemon.setDaemon(true); // Must call before start() daemon.start();
线程局部变量¶
-
线程局部变量,即这些变量对于每个使用它的线程都有独立初始化的副本。
-
定义一个ThreadLocal变量:
private static ThreadLocal<MyObject> myThreadLocal = new ThreadLocal<>();
ThreadLocal
对象不需要一定要在使用它的线程的托管类(即实现Runnable
接口或继承自Thread
类的类)内部定义。ThreadLocal
实例可以定义在任何地方,只要它对将要使用它的线程可见即可。关键是每个线程都可以通过这个ThreadLocal
实例访问到其独立初始化的副本。
-
设置值
myThreadLocal.set(new MyObject());
- 获取
MyObject obj = myThreadLocal.get();
- 清理
myThreadLocal.remove();
class Accessor implements Runnable { private final int id; public Accessor(int idn) { id = idn; } public void run() { while (!Thread.currentThread().isInterrupted()) { ThreadLocalVariableHolder.increment(); System.out.println(this); Thread.yield(); } } public String toString() { return "#" + id + ": " + ThreadLocalVariableHolder.get(); } } public class ThreadLocalVariableHolder { private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() { private Random rand = new Random(47); protected synchronized Integer initialValue() { return rand.nextInt(10000); } }; public static void increment() { value.set(value.get() + 1); } public static int get() { return value.get(); } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) exec.execute(new Accessor(i)); TimeUnit.SECONDS.sleep(3); // Run for a while exec.shutdownNow(); // All Accessors will quit } }
共享资源与线程安全¶
共享资源¶
- 资源竞争:正在运行的任务不止有一个时,任何任务都可能同时对一个共享资源进行读或写操作
- 守护进程
- 一个定时运行的进程或线程,它监控应用程序的执行时间,如果某个任务或整个应用程序运行时间超过预定阈值,它将执行中止操作。
- 解决资源竞争
- 第一个访问资源的任务对资源上锁,其它任务将无法使用资源直到锁被解除,使用资源的新的任务会再次对资源上锁。即共享资源的访问操作串行化,被称为互斥锁。
锁¶
使用显式lock对象
- 显式创建加锁解锁
- Lock lock = new ReentrantLock();
- lock.lock();
- lock.unlock();
- lock语句应该与try-finally配合确保锁一定会被释放
//尝试获取锁
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
if(captured)
lock.unlock();
}
//有最大时间限制
try {
captured = lock.tryLock(2, TimeUnit.SECONDS);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println(
"tryLock(2, TimeUnit.SECONDS): " + captured);
} finally {
if(captured)
lock.unlock();
}
- 更高级的锁
ReentrantLock
- 如果一个线程已经持有了锁,它可以再次请求并获得锁而不会被阻塞。
ReentrantLock
会维护一个持有锁的计数器来跟踪锁的重入次数,线程每请求一次锁,计数器就增加一,每释放一次锁,计数器就减一。当计数器归零时,锁被释放。
synchronized¶
- 一个任务想要执行由 synchronized 保护的代码时,编译器会检查锁是否可用,如果可用,该任务便会获得锁,执行代码然后释放锁。
- 声明在普通方法上时:锁定的是包含该方法的对象实例
- 声明在静态方法上:锁定的是该类的Class 对象。(获取整个类的静态锁,而不是实例对象的锁,尽量避免直接使用,这回造成很大开销)
public synchronized void synchronizedMethod()
- 如果在对一个接下来会被另一个线程读取的变量进行写操作或者操作一个可能刚被另一个线程写完操作的变量,就必须使用同步
-
比如一个变量有自增以及获取值得方法,那这两个方法都应该被 synchronized 修饰
-
对具体的实例对象加锁
public static boolean findReplace(EditBuffer buf, String s, String t) { synchronized (buf) { int i = buf.toString().indexOf(s); if (i == -1) { return false; } buf.delete(i, s.length()); buf.insert(i, t); return true; } }
-
指向防止多个线程同时访问方法中的部分代码,而不是整个方法,要隔离的代码区域就是临界区。
synchronized(this) { // 同步代码块 }//参数表示要锁定的对象
-
必须先获得被锁定对象的锁才能进入代码块
- 使用临界区主要是为了提升性能,即不锁定不需要锁定但是耗时的部分
- 在其它对象上进行同步
- 通过传入其他对象作为参数,可以实现在其他对象而不是对象自身上操作锁
线程协同¶
底层直接协同¶
-
join等待线程(等待另一个线程完成之后继续)
class Joiner extends Thread { private Sleeper sleeper; // 一个自定义线程类型 public Joiner(String name, Sleeper sleeper) { super(name); this.sleeper = sleeper; start(); } public void run() { try { sleeper.join(); } catch (InterruptedException e) { System.out.println("Interrupted"); } System.out.println(getName() + " join completed"); } }
-
wait():
- 当一个线程调用
wait()
时,它会释放当前持有的监视器锁(这是与sleep的区别),并暂停执行,直到另一个线程调用相同对象的notify()
或notifyAll()
方法。 -
调用
wait()
后,当前线程进入对象的等待集(wait set),处于阻塞状态,直到被通知(或中断)。 -
notify():
notify()
方法用于唤醒在此对象监视器上等待的单个线程。-
如果有多个线程都在等待,那么会随机选择一个线程进行唤醒。被唤醒的线程将尝试重新获取对象监视器的锁。
-
notifyAll():
notifyAll()
方法用于唤醒在此对象监视器上等待的所有线程。-
一旦调用
notifyAll()
,所有处于等待集中的线程都会被唤醒,然后竞争尝试重新获取对象监视器的锁。 -
例子:汽车打蜡
class Car { private boolean waxOn = false; public synchronized void wax() { System.out.println("Wax On by " + Thread.currentThread().getName()); waxOn = true; notifyAll(); } public synchronized void buff() { System.out.println("Wax Off by " + Thread.currentThread().getName()); waxOn = false; notifyAll(); } public synchronized void waitForWaxing() throws InterruptedException { while (waxOn == false) wait(); } public synchronized void waitForBuffing() throws InterruptedException { while (waxOn == true) wait(); } } class WaxOn implements Runnable { private Car car; private String name; public WaxOn(Car c, String name) { this.car = c; this.name = name; } public void run() { Thread.currentThread().setName(name); try { while (!Thread.interrupted()) { car.waitForBuffing(); TimeUnit.MILLISECONDS.sleep(200); car.wax(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; private String name; public WaxOff(Car c, String name) { this.car = c; this.name = name; } public void run() { Thread.currentThread().setName(name); try { while (!Thread.interrupted()) { car.waitForWaxing(); TimeUnit.MILLISECONDS.sleep(500); car.buff(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax Off task"); } } public class WaxOMatic { public static void main(String[] args) throws Exception { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car, "A-OFF")); exec.execute(new WaxOn(car, "B-ON")); exec.execute(new WaxOn(car, "C-ON")); TimeUnit.SECONDS.sleep(5); // Run for a while... exec.shutdownNow(); // Interrupt all tasks } }
- 出现了问题!因为检查之后放弃了锁,在这个短暂的空闲会出问题!
CountDownLauch¶
- 用于在一组线程之间进行同步,它允许一个或多个线程等待直到在其他线程中进行的一组操作完成。
CountDownLatch
在创建时被初始化为一个给定的计数值(称为“计数”)。这个计数是指需要等待完成的操作数量。CountDownLatch latch = new CountDownLatch(N); // N是计数值
- countDown() 方法:每次调用
countDown()
方法都会将计数减少一。这通常在某个操作完成后被调用。 - await() 方法:一个或多个线程调用
await()
方法会使这些线程在CountDownLatch
上等待,直到计数达到零。
//N个等1个
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
//1个等N个
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}