当前位置:首页 » 《随便一记》 » 正文

Semaphore 信号量_程序员的暴击的博客

1 人参与  2022年03月06日 13:25  分类 : 《随便一记》  评论

点击全文阅读


信号量,用来限制能同时访问共享资源的线程上限。

作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数

在这里插入图片描述
Semaphore维护了一个许可集,其实就是一定数量的“许可证”。
当有线程想要访问共享资源时,需要先获取(acquire)的许可;如果许可不够了,线程需要一直等待,直到许可可用。当线程使用完共享资源后,可以归还(release)许可,以供其它需要的线程使用。

和CountDownLatch区别

CountDownLatch:

同步状态State > 0表示资源不可用,所有线程需要等待;State == 0表示资源可用,所有线程可以同时访问

Semaphore:

剩余许可数 < 0表示没有许可数了共享资源不可用,所有线程需要等待; 许可剩余数 ≥ 0表示存在许可数了共享资源可用,所有线程可以同时访问

public static void main(String[] args) {

    Semaphore semaphore = new Semaphore(3);

    for (int i = 1; i <=6; i++) {

        new Thread(()->{
            //得到
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"抢到车位");
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()+"离开车位");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                semaphore.release();
            }
        },String.valueOf(i)).start();
    }
}
限流
  • 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机
    线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)

    当资源数和线程数使用semohore 比较合适比如数据库连接池:一个线程数对应一个线程资源

  • 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,
    注意下面的实现中线程数和数据库连接数是相等的

public class PoolSemaphoreStream {
    public static void main(String[] args) {
        Pool pool = new Pool(2);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                Connection conn = pool.borrow();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                pool.free(conn);
            }).start();
        }
    }
}

class Pool {
    // 1. 连接池大小
    private final int poolSize;

    // 2. 连接对象数组
    private final Connection[] connections;

    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private final AtomicIntegerArray states;

    private final Semaphore semaphore;

    // 4. 构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        try {
            Driver driver =new com.mysql.cj.jdbc.Driver();

            DriverManager.registerDriver(driver);
            for (int i = 0; i < poolSize; i++) {

                connections[i] =  DriverManager.getConnection("jdbc:mysql://localhost:3306/myemployees","root","123123");;
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    // 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if(states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    System.out.println(("borrow {}" + connections[i]));
                    return connections[i];
                }
            }
        }
        // 不会执行到这里
        return null;
    }
    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                System.out.println(("free {}" + conn));
                semaphore.release();
                break;
            }
        }
    }
}

单位时间内限流

public class TestController {
private RateLimiter limiter = RateLimiter.create(50);
@GetMapping("/test")
public String test() {
  // limiter.acquire();
    return "ok";
  }
}

原理

semaphore.acquire(); 获得,假设如果已经满了,等待,等待被释放为止!semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一 刚开始,permits(state)为 3,这时 5 个线程来获取资源。

在这里插入图片描述
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

在这里插入图片描述
这时 Thread-4 释放了 permits,状态如下

在这里插入图片描述
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

在这里插入图片描述
内部结构
Semaphore 通过内部类实现了AQS框架提供的接口, 内部类分别实现了公平/非公平策略。


public Semaphore(int permits) {
        sync = new NonfairSync(permits);
 }
 
//可以看出Semaphore分公平与非公平
 public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }
//抽象的Sync继承与AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);//设置Semaphore的许可数
        }

        final int getPermits() {
            return getState();
        }

       //非公平的尝试获取共享锁
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();//获取可以使用的许可数
                int remaining = available - acquires;//剩余许可数
                //剩下许可数为<0或者CAS 修改总许可数为剩余许可数成功返回remaining
                //如果许可已经用完, 返回负数, 表示获取失败;如果 cas 重试成功, 返回正数, 表示获取成功
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

       //尝试释放共享锁
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();//当前许可数
                //其他的锁比如countDownLatch 和ReentrantLock这里都是减,因为释放许可的时候需要将使用掉的许可数+releases
                //这样其他线程才能获取新的许可
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))//修改当前许可数为新的许可数next
                    return true;
            }
        }

       //减少许可数
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
       // 立即获取所有可用许可证,当许可证数为0或者修改许可证为0的时候返回
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

//非公平许可
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }
        //调用 Sync的nonfairTryAcquireShared
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
   

//公平许可
static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }
        //获取共享锁
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //有前驱节点在同步队列里,说明等待队列里有其他线程正在获取许可,则直接返回-1,体现公平性
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;//剩余许可数
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }


public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//AQS的可中断获取共享锁操作
    }

 public void release() {
        sync.releaseShared(1);//AQS的释放共享锁操作
    }

Semaphore 资源就是许可证的数量:

  • 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) ≥ 0:资源可用
  • 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) < 0:资源不可用
  • 只能限制同时访问资源的线程数,至于对数据一致性的控制,Semaphore是不关心的。如果是只有一个许可的Semaphore,可以当作锁使用。
  • Semaphore 的限流是限制线程数,而不是限制资源数当资源数和线程数使用semohore 比较合适比如数据库连接池:一个线程数对应一个线程资源

点击全文阅读


本文链接:http://zhangshiyu.com/post/35818.html

线程  获取  资源  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1