线程安全的数据类型
原子对象¶
- 原子性:
- 原子操作是指不会被线程调度器中断的操作
- 比如++就有读取、写入等多个基本操作组成,并不具有原子性
-
满足原子性不代表不会出问题:
- 可见性:一个任务进行的修改,是否对另一个任务是立即可见的
-
java.util.concurrent.atomic
包中包含了一系列的类,这些类提供了原子操作的能力。可以在多线程环境下安全地操作值,而无需使用synchronized
关键字。 -
AtomicIntege
、AtomicReference
等一系列对象 -
一定程度上替代stnchroized,原子对象可以在多线程环境进行无锁的线程安全操作,实现了原子性可见性
- 比如判断原子对象的值和操作之间就可能存在问题!这时还是要使用线程锁
- 即多对象多操作时不能仅仅依赖源自对象
- 作为共享变量时的优先考虑
- 原子类型保证了当多个线程同时更新同一个变量时的线程安全。这意味着每次操作都是完整的,不会被其他线程的操作中断。
线程安全集合¶
-
ConcurrentHashMap
是HashMap
的线程安全版本。它通过使用分段锁(锁分离技术)提供更高的并发性。 -
ConcurrentSkipListMap
: 键值对存储,类似于TreeMap
,但线程安全。基于跳表的并发集合,提供了一种可排序的并发集合实现。 -
ConcurrentSkipListSet
: 类似于TreeSet
的并发变体。 -
ConcurrentLinkedQueue
: 一个高效的线程安全无界队列。非阻塞算法,适用于高并发情况 -
ConcurrentLinkedDeque
: 双端队列版本,支持两端的插入和移除。 -
BlockingQueue
是一个队列,支持阻塞的插入和移除操作。常见的实现有ArrayBlockingQueue
,LinkedBlockingQueue
,PriorityBlockingQueue
,SynchronousQueue
等。 -
DelayQueue
用于放置实现了Delayed
接口的元素,其中的元素只能在其指定的延迟过期后才能从队列中取走。class DelayedElement implements Delayed { private final long delayTime; // 延迟时间 private final long expire; // 到期时间 public DelayedElement(long delay, TimeUnit unit) { this.delayTime = TimeUnit.MILLISECONDS.convert(delay, unit); this.expire = System.currentTimeMillis() + delayTime; } //距离到期的时间 @Override public long getDelay(TimeUnit unit) { long diff = expire - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } //比较延迟 @Override public int compareTo(Delayed o) { DelayedElement that = (DelayedElement) o; return Long.compare(this.expire, that.expire); } }
-
无锁集合的实现策略
- 复制策略:
- 修改是在部分数据结构的一个单独副本上进行的。只有修改完成后才会与主数据结构进行交换,然后读取方才能看见修改。
- 注意读取放看不到未完成的修改
-
CAS策略:
- 从内存中取出一个值,计算新值的同时保存旧值,计算完成后笔算旧值是否与当前内存中的值相同,如果不相同就读取新值并进行重复计算(内存被修改了)
-
synchronized创建同步版本的集合
synchronizedCollection(Collection<T> c)
: 返回指定集合的同步(线程安全)版本。synchronizedList(List<T> list)
: 返回指定列表的同步(线程安全)版本。synchronizedMap(Map<K,V> m)
: 返回指定映射的同步(线程安全)版本。synchronizedSet(Set<T> s)
: 返回指定集合的同步(线程安全)版本。
并行流(推荐)¶
-
.parallel()
将流转化为一个并行流,对于大数据集,使用并行流可以显著提高性能。public class ParallelPrime { static final int COUNT = 100_000; public static boolean isPrime(long n) { return rangeClosed(2, (long)Math.sqrt(n)) .noneMatch(i -> n % i == 0); } public static void main(String [] args) throws IOException { Timer timer = new Timer(); List <String> primes = iterate(2, i -> i + 1) .parallel() // [1] .filter(ParallelPrime:: isPrime) .limit(COUNT) .mapToObj(Long:: toString) .collect(Collectors.toList()); System.out.println(timer.duration()); Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE); } }
-
流的并行化会将输入数据拆分为多个片段,针对独立片段可以使用各种算法
-
数组的切分非常轻量均匀
- 但是链表的切分只会拆分为第一个元素和剩余部分
- 因此range(无状态生成器)并行可以加速计算
-
但是iterate(迭代生成器)不能被很好的划分,相反使用并发反而会拖慢程序的执行
-
不要盲目使用
parallel
并不总是能让程序运行更快 - 使用并行流生成随机顺序序列(这是由于并行后读取顺序不确定)
List <Integer> x = IntStream.range(0, 30)
.limit(10)
.parallel()
.boxed()
.collect(Collectors.toList());