信号量(Semaphore)是一种控制多线程(进程)访问共享资源的同步机制,是由荷兰的Dijkstra大佬在1962年前后提出来的。
信号量的原理 信号量机制包含以下几个核心概念:
信号量S,整型变量,需要初始化值大于0
P原语,荷兰语Prolaag(probeer te verlagen),表示减少信号量,该操作必须是原子的
V原语,荷兰语Verhogen,表示增加信号量,该操作必须是原子的
从上图不难看出信号量的两个核心操作,P和V:
P操作,原子减少S,然后如果S < 0
,则阻塞当前线程
V操作,原子增加S,然后如果S <= 0
,则唤醒一个阻塞的线程
信号量一般被用来控制多线程对共享资源的访问,允许最多S个线程同时访问临界区,多于S个的线程会被P操作阻塞,直到有线程执行完临界区代码后,调用V操作唤醒。所以PV操作必须是成对出现的。
那么信号量可以用来干什么呢?
信号量似乎天生就是为限流而生的,我们可以很容易用信号量实现一个限流器 。
信号量可以用来实现互斥锁 ,初始化信号量S = 1
,这样就只能有一个线程能访问临界区。很明显这是一个不可重入的锁。
信号量甚至能够实现条件变量,比如阻塞队列
动手实现一个信号量 学习这些经典理论的时候,最好的办法还是用自己熟悉的编程语言实现一遍。Java并发包提供了一个信号量的java.util.concurrent.Semaphore
,是用AbstractQueuedSynchronizer
的共享模式实现的,以后会单独分析关于AQS相关的原理,这里不再展开描述,其核心思想是CAS。 下面是我用Java实现的一个简单的信号量,这里使用synchronized
来替代互斥锁
信号量实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class Semaphore { private int s; public Semaphore (int s) { this .s = s; } public synchronized void p (int decr) { s -= decr; if (s < 0 ) { try { wait(); } catch (InterruptedException e) { } } } public synchronized void v (int incr) { s += incr; if (s <= 0 ) { notify(); } } }
用信号量限流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Limiter implements Executor { private Semaphore semaphore; public Limiter (int limit) { semaphore = new Semaphore(limit); } public void execute (Runnable runnable) { if (runnable != null ) { new Thread(() -> { semaphore.p(1 ); runnable.run(); semaphore.v(1 ); }).start(); } } }
用信号量实现互斥锁 1 2 3 4 5 6 7 8 9 10 11 12 public class SemaphoreLock { private Semaphore semaphore = new Semaphore(1 ); public void lock () { semaphore.p(1 ); } public void unlock () { semaphore.v(1 ); } }
用信号量实现阻塞队列 实现阻塞队列需要两个信号量和一个锁(锁也可以用信号量代替)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class SemaphoreBlockingQueue <T > { private Semaphore notFull; private Semaphore notEmpty; private SemaphoreLock lock = new SemaphoreLock(); private Object[] table; private int size; public SemaphoreBlockingQueue (int cap) { if (cap < 1 ) { throw new IllegalArgumentException("capacity must be > 0" ); } notEmpty = new Semaphore(0 ); notFull = new Semaphore(cap); table = new Object[cap]; } public void add (T t) { notFull.p(1 ); lock.lock(); table[size++] = t; lock.unlock(); notEmpty.v(1 ); } @SuppressWarnings ("unchecked" ) public T poll () { T element; notEmpty.p(1 ); lock.lock(); element = (T) table[--size]; lock.unlock(); notFull.v(1 ); return element; } }
本文整理自
信号量机制
仅做个人学习总结所用,遵循CC 4.0 BY-SA版权协议,如有侵权请联系删除!