并发编程由浅及深(一)

并发编程重要吗?当然重要,因为并发在我们的项目中真实存在,如果你不能充分了解它那么很可能造成严重的生产事故。最近隔壁项目组出了一个问题,每次请求接口之后都发现线程固定增加了5个,而且线程数一直增加没有减少,他们怀疑是中间件的问题,但实际上是因为他们的代码中线程池使用不当造成。所以了解并发是很有必要的。

并发编程由浅及深

  • 1.基础概念
  • 2.Thread
    • 2.1 线程的创建
    • 2.2 线程的中断方法
    • 2.3 线程的等待唤醒机制
  • 3.ThreadLocal(源码)
    • 3.1 new ThreadLocal<>()
    • 3.2 ThreadLocal#set()
      • 3.2.1ThreadLocal#createMap
        • 3.2.1.1 new ThreadLocalMap()
        • 3.2.1.2 ThreadLocal#getMap
    • 3.3 myThreadLocal.get() 方法
      • 3.3.1 map.getEntry(this) 方法
    • 3.4 myThreadLocal.remove() 方法
      • 3.4.1 m.remove(this)
  • 4.ReentrantLock(非公平锁实现源码)
    • 4.1 new ReentrantLock()
    • 4.2 lock.lock()
      • 4.2.1 sync.lock()
        • 4.2.1.1 acquire(1)
          • 4.2.1.1.1 tryAcquire
            • 4.2.1.1.1.1 nonfairTryAcquire()方法
          • 4.2.1.1.2 addWaiter
            • 4.2.1.1.2.1 enq()
          • 4.2.1.1.3 acquireQueued()
            • 4.2.1.1.3.1 shouldParkAfterFailedAcquire()
            • 4.2.1.1.3.1 parkAndCheckInterrupt()
    • 4.3 lock.unlock()
      • 4.3.1AbstractQueuedSynchronizer#release
        • 4.3.1.1 ReentrantLock.Sync#tryRelease
        • 4.3.1.2 AbstractQueuedSynchronizer#unparkSuccessor
  • 5.ArrayBlockingQueue
    • 5.1 ArrayBlockingQueue#ArrayBlockingQueue(int)
      • 5.1.1 ArrayBlockingQueue#ArrayBlockingQueue(int, boolean)
    • 5.2 ArrayBlockingQueue#put
      • 5.2.1 ArrayBlockingQueue#enqueue
      • 5.2.2 AbstractQueuedSynchronizer.ConditionObject#await()
        • 5.2.2.1 AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
        • 5.2.2.2 AbstractQueuedSynchronizer#fullyRelease
        • 5.2.2.2.1 AbstractQueuedSynchronizer#release
        • 5.2.2.3 AbstractQueuedSynchronizer#isOnSyncQueue
    • 5.2 ArrayBlockingQueue#take
    • 5.2.1 ArrayBlockingQueue#dequeue
  • 6 CountDownLatch
    • 6.1 CountDownLatch#CountDownLatch
    • 6.2 CountDownLatch#await()
      • 6.2.1 AbstractQueuedSynchronizer#acquireSharedInterruptibly
        • 6.2.1.1 CountDownLatch.Sync#tryAcquireShared
        • 6.2.1.2 AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
    • 6.3 CountDownLatch#countDown
      • 6.3.1 AbstractQueuedSynchronizer#releaseShared
      • 6.3.1.1 CountDownLatch.Sync#tryReleaseShared
      • 6.3.1.2 AbstractQueuedSynchronizer#doReleaseShared
      • 6.3.1.2.1 AbstractQueuedSynchronizer#unparkSuccessor

1.基础概念

  1. 并行:两台饮水机,两个人去接水一人使用一台
  2. 并发:一台饮水机,两个人去接水,需要等待一个人接完才行
  3. 异步:不需要等待结果返回
  4. 同步:需要等待结果返回
  5. 线程:是运行在进程内:比如音乐播放器打开之后,在随机播放着音乐 的同时,还可以继续搜索我们想听的音乐。可以理解有两个线程分别处理着音乐播放和音乐搜索
  6. 进程:是一个具体的应用示例:比如打开音乐播放器就是运行了一个进程
  7. 线程上下文切换:cpu调度线程时切换线程同时需要保存线程相关数据,cpu和内存之间

2.Thread

2.1 线程的创建

线程的创建到底有几种?这个答案其实不是固定。要看回答的方向

There are two ways to create a new thread of execution
eg1:new Thread().start()
eg2:R implements Runnable

上面这句话是Thread类上面的注释,也就是官方说是两种,但是实际使用过程中根据根据这两种又有衍生和变化。概括有一下几种

1. new Thread().start();
2. R implements Runnable
	new Thread(R).start();
3. C implements Callable
	new Thread(new FutureTask<>(C)).start();
4. R implements Runnable
	new ThreadPoolExecutor().execute(R);

2.2 线程的中断方法

1. Thread.interrupt() 给线程加一个中断标记
2. Thread.interrupted() 判断线程是否中断,会清除标记
3. Thread.isInterrupted() 判断线程是否中断,不会清除标记

2.3 线程的等待唤醒机制

1. synchronizedwait(),notify()
2. locksupport 的 park(),unpark()
3. condition wait() signal()

3.ThreadLocal(源码)

下面这段代码大概就是我们使用ThreadLocal的方式,接下来将根据这段代码中的方法对ThreadLocal原理进行剖析。

    // 1.new
    ThreadLocal<Map<String,String>> myThreadLocal = new ThreadLocal<>();

    // 2.存值
    Map<String,String> map = new HashMap<>();
    map.put("org","0001");
    myThreadLocal.set(map);

    // 3.获取值
    map = myThreadLocal.get();
    String org = map.get("org");
    System.out.println(org);

    // 4.清空
    myThreadLocal.remove();

3.1 new ThreadLocal<>()

    public ThreadLocal() {
    }

3.2 ThreadLocal#set()

    public void set(T value) {
    	// 获取当前线程
        Thread t = Thread.currentThread();
        // 获取thread对应的map
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
        	// 创建map
            createMap(t, value);
    }

3.2.1ThreadLocal#createMap

    void createMap(Thread t, T firstValue) {
        // 给thread的threadLocals属性赋值
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
3.2.1.1 new ThreadLocalMap()
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
			// 创建初始长度为16的数组
            table = new Entry[INITIAL_CAPACITY];
            // firstKey传的是this 即myThreadLocal对象本身
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            // 数组中存入值:key=myThreadLocal,firstValue=map
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }
3.2.1.2 ThreadLocal#getMap
    ThreadLocalMap getMap(Thread t) {
    	// 获取thread中的ThreadLocalMap属性
        return t.threadLocals;
    }

执行完myThreadLocal.set(map),最终结构如下图:
在这里插入图片描述

3.3 myThreadLocal.get() 方法

    public T get() {
        Thread t = Thread.currentThread();
        // 获取thread的属性ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null) {
        	// 获取key为myThreadLocal的Entry 
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                // 返回value即为set存储的map
                return result;
            }
        }
        // 如果当前线程未set值,默认返回null
        return setInitialValue();
    }

3.3.1 map.getEntry(this) 方法

       private Entry getEntry(ThreadLocal<?> key) {
       	   // 和set时一样,使用myThreadLocal对象获取数组的下标
           int i = key.threadLocalHashCode & (table.length - 1);
           Entry e = table[i];
           // 当前下标位置数组值不为null,且key值相等。即为get返回值
           if (e != null && e.get() == key)
               return e;
           else
               return getEntryAfterMiss(key, i, e);
       }

3.4 myThreadLocal.remove() 方法

     public void remove() {
     	// 获取当前程的属性ThreadLocalMap
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
         	// 移除当前myThreadLocal对象所在下标的数组的值
             m.remove(this);
     }

3.4.1 m.remove(this)

        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            // 获取对象下标的数组值
            for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

4.ReentrantLock(非公平锁实现源码)

下面的源码实现主要列的是非公平锁,公平锁代码和非公平锁差不多所以鼓励大家自己看看。看源码前请大家想象一个场景,现在有两个线程,第一个线程获取锁成功了在执行业务逻辑时,此时第二个线程过来加锁。下面的源码分析的就是上面这样一个场景。

常规使用如下:

 // 1.创建 ReentrantLock 对象
        ReentrantLock lock = new ReentrantLock(); 
        // 2.获取锁
        lock.lock(); 
        try {
           // 业务逻辑
        } finally {
            // 3.释放锁
            lock.unlock(); 
        }

4.1 new ReentrantLock()

    public ReentrantLock() {
    	// 给ReentrantLock属性赋值;NonfairSync实际上就是抽象的队列同步器
        sync = new NonfairSync();
    }

NonfairSync类的继承关系
在这里插入图片描述

4.2 lock.lock()

    public void lock() {
        sync.lock();
    }

4.2.1 sync.lock()

   final void lock() {
   		// 使用CAS的方式尝试将同步等待队列中的state由0改为1
       if (compareAndSetState(0, 1))
       		// 修改成功代表获取锁成功,将独占线程赋值为当前线程
           setExclusiveOwnerThread(Thread.currentThread());
       else
       		// 1.获取锁
           acquire(1);
   }
4.2.1.1 acquire(1)
    public final void acquire(int arg) {
    	// tryAcquire尝试获取一次锁,如果失败了addWaiter进入队列,然后再次获取锁acquireQueued
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
4.2.1.1.1 tryAcquire
	protected final boolean tryAcquire(int acquires) {
			// 非公平获取锁
            return nonfairTryAcquire(acquires);
        }
4.2.1.1.1.1 nonfairTryAcquire()方法
	// acquires从2.1方法中传过来 固定为1
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获取同步等待队列的state属性,默认值为0,如果不为0代表有线程获取锁成功
        int c = getState();
        if (c == 0) {
        	// 如果当前没有线程获取到锁,就直接cas加锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // c!=0,当前已有线程获取了锁。判断是不是同一个线程
        else if (current == getExclusiveOwnerThread()) {
        	// 可重入锁实现 通过state值自增判断重入几次
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
4.2.1.1.2 addWaiter
	// node从4.2.1.1传递的是null
    private Node addWaiter(Node mode) {
    	// 构建node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 1.将尾节点赋值给变量pred
        Node pred = tail;
        // 2.判断尾节点是否为空,为空则代表链表为空未进行初始化(双向指针构造的)
        if (pred != null) {
        	// 2.1 尾节点不为空,则将当前node节点添加到链表尾部
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 2.2 尾节点为空,则构建链表,并且插入
        enq(node);
        return node;
    }
4.2.1.1.2.1 enq()
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 1.当尾节点为空,构造链表(new node()设置为头节点,并且尾节点和头节点指向同一个node)
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            // 2.尾节点不为空,将当前获取锁失败的线程所构建的node作为尾节点,并且修改双指针指向
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

我们假设thread1,先获取到了锁,那么thread2执行完上面的enq()方法,双向链表形状大概如下图所示
在这里插入图片描述

4.2.1.1.3 acquireQueued()
	// node为获取锁失败的线程构建的node,arg为1
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//1.获取node节点的前置节点
                final Node p = node.predecessor();
                //2 如果前置节点为头节点,则链表中实际只有一个线程在等待。再次尝试获取锁,如果获取到了锁,将当前node设置为head
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //3 判断线程是否应该阻塞,修改node节点waitStatus==-1等待被唤醒,并且阻塞线程
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
4.2.1.1.3.1 shouldParkAfterFailedAcquire()

Node有一个volatile int waitStatus属性
默认=0;SIGNAL=-1;CANCELLED=1;CONDITION=-2;PROPAGATE=-3

	// pred:当前线程构建节点的前置节点 node:当前线程构建的节点
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 第二次循环waitStatus==-1,因为4.2.1.1.3 有for (;;)所以会再次进入
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        	// 第一次循环waitStatus==0,默认走这里。通过cas将0修改为-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

执行完shouldParkAfterFailedAcquire方法后,链表状态大概如下图:
在这里插入图片描述

4.2.1.1.3.1 parkAndCheckInterrupt()
	// 底层使用UNSAFE.park将线程阻塞在这里
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

4.3 lock.unlock()

    public void unlock() {
        sync.release(1);
    }

4.3.1AbstractQueuedSynchronizer#release

    public final boolean release(int arg) {
    	// 1.尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	// 2.唤醒等待的后置节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
4.3.1.1 ReentrantLock.Sync#tryRelease
		// releases==1
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            // 如果释放锁的线程不等于加锁线程抛异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 释放锁。即将独占线程置为null,恢复state=0
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
4.3.1.2 AbstractQueuedSynchronizer#unparkSuccessor
	// 这里的node==head
    private void unparkSuccessor(Node node) {
    	// ws==-1
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // head节点的下一个节点即为thread2所在节点。唤醒阻塞中的thread2
        if (s != null)
            LockSupport.unpark(s.thread);
    }

5.ArrayBlockingQueue

整个源码逻辑很多,下面梳理的代码,以最简单的情况为例。一个Producer线程向队列中添加元素,一个Consumer线程从队列中获取元素。有了这个前提再看下面的源码会更加容易一些。

常见使用方式如下:

	......省略
    ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(16);
	......省略
    abq.put(new Random(100).nextInt());
	......省略
    abq.take();

5.1 ArrayBlockingQueue#ArrayBlockingQueue(int)

	// capacity初始容量
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

5.1.1 ArrayBlockingQueue#ArrayBlockingQueue(int, boolean)

	// fair是否公平? 默认false
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 创建指定长度的数组
        this.items = new Object[capacity];
        // 创建非公平锁
        lock = new ReentrantLock(fair);
        // 获取condition
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

5.2 ArrayBlockingQueue#put

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 加锁(可中断锁)
        lock.lockInterruptibly();
        try {
        	// count为数组中存的元素个数 相等则将producer线程await在这里
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

5.2.1 ArrayBlockingQueue#enqueue

    private void enqueue(E x) {
        final Object[] items = this.items;
        // putIndex数组下标>>存数下标 默认putIndex==0
        items[putIndex] = x;
        // 循环数组实现。数组存满之后,下标恢复为0
        if (++putIndex == items.length)
            putIndex = 0;
        // count记录数组中存的元素格式
        count++;
        // 队列中有元素之后,唤醒consumer线程进行消费
        notEmpty.signal();
    }

5.2.2 AbstractQueuedSynchronizer.ConditionObject#await()

     public final void await() throws InterruptedException {
         if (Thread.interrupted())
             throw new InterruptedException();
          // 见 5.2.2.1 
         Node node = addConditionWaiter();
         // 这里返回savedState==1
         int savedState = fullyRelease(node);
         int interruptMode = 0;
         // isOnSyncQueue 返回fasle,所以while条件成立
         while (!isOnSyncQueue(node)) {
         	// producer 线程在这里park阻塞
             LockSupport.park(this);
             if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                 break;
         }
         if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
             interruptMode = REINTERRUPT;
         if (node.nextWaiter != null) // clean up if cancelled
             unlinkCancelledWaiters();
         if (interruptMode != 0)
             reportInterruptAfterWait(interruptMode);
     }
5.2.2.1 AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
    private Node addConditionWaiter() {
    	// lastWaiter默认为null
        Node t = lastWaiter;
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

第一次执行完addConditionWaiter方法,Node大概是如下结构
在这里插入图片描述

5.2.2.2 AbstractQueuedSynchronizer#fullyRelease
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
        	// 在5.2中先加锁了,所以这里默认返回savedState==1
            int savedState = getState();
            // 这里if条件成立 直接return 1
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
5.2.2.2.1 AbstractQueuedSynchronizer#release
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
        	// 前面的代码没有给head赋值 所以这里null==head,这里直接返回true
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
5.2.2.3 AbstractQueuedSynchronizer#isOnSyncQueue
  final boolean isOnSyncQueue(Node node) {
  	  // node前面执行addConditionWaiter方法时,-2==waitStatus 。所以这里直接return fale
      if (node.waitStatus == Node.CONDITION || node.prev == null)
          return false;
      if (node.next != null) // If has successor, it must be on queue
          return true;
      return findNodeFromTail(node);
  }

5.2 ArrayBlockingQueue#take

   public E take() throws InterruptedException {
       final ReentrantLock lock = this.lock;
       // 先加锁
       lock.lockInterruptibly();
       try {
       	   // 当数组中元素数量=0时,让consumer线程await在这里
           while (count == 0)
               notEmpty.await();
           // 队列中有元素时 获取数组元素
           return dequeue();
       } finally {
           lock.unlock();
       }
   }

5.2.1 ArrayBlockingQueue#dequeue

   private E dequeue() {
       final Object[] items = this.items;
       // takeIndex为获取数组时的下标 默认从0开始
       E x = (E) items[takeIndex];
       items[takeIndex] = null;
       // 当取到数组最后一个元素值重置下标
       if (++takeIndex == items.length)
           takeIndex = 0;
       // 没消费一个元素 数组元素个数-1
       count--;
       if (itrs != null)
           itrs.elementDequeued();
       // 当队列中元素被消费后 唤醒producer线程进行put
       notFull.signal();
       return x;
   }

6 CountDownLatch

源码分析的场景如下:第一个线程启动后调用await方法被阻塞,第二个线程调用countDown方法将state置为0,再唤醒第一个阻塞的线程

常见使用方法如下:

   CountDownLatch countDownLatch = new CountDownLatch(1);
   
   countDownLatch.await();
   
   countDownLatch.countDown();

6.1 CountDownLatch#CountDownLatch

  // 这里为了简单,我们示例中假设cont==1
  public CountDownLatch(int count) {
      if (count < 0) throw new IllegalArgumentException("count < 0");
      // Sync extends AbstractQueuedSynchronizer (这一步相当于state=count)
      this.sync = new Sync(count);
  }

6.2 CountDownLatch#await()

    public void await() throws InterruptedException {
    	// 判断state!=0时阻塞线程
        sync.acquireSharedInterruptibly(1);
    }

6.2.1 AbstractQueuedSynchronizer#acquireSharedInterruptibly

   public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
        // 判断state是否等于0 ,因为初始化时我们设置state==1,所以这 -1<0  if成立
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }
6.2.1.1 CountDownLatch.Sync#tryAcquireShared
   protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
   }
6.2.1.2 AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
	// addWaiter 在前面ReentrantLock中讲过。主要构建双向链表
   final Node node = addWaiter(Node.SHARED);
   boolean failed = true;
   try {
       for (;;) {
           final Node p = node.predecessor();
           if (p == head) {
           	    // 这里又判断 state是否等于0 ,所以if条件不成立
               int r = tryAcquireShared(arg);
               if (r >= 0) {
                   setHeadAndPropagate(node, r);
                   p.next = null; // help GC
                   failed = false;
                   return;
               }
           }
           // 最终线程会被park方法阻塞在这里。这个方法在ReentrantLock中讲过
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               throw new InterruptedException();
       }
   } finally {
       if (failed)
           cancelAcquire(node);
   }
}

6.3 CountDownLatch#countDown

  public void countDown() {
  	  // 执行state-1,当sate==0,唤醒阻塞的线程
      sync.releaseShared(1);
  }

6.3.1 AbstractQueuedSynchronizer#releaseShared

 public final boolean releaseShared(int arg) {
 	 // if条件成立
     if (tryReleaseShared(arg)) {
         doReleaseShared();
         return true;
     }
     return false;
 }

6.3.1.1 CountDownLatch.Sync#tryReleaseShared

  protected boolean tryReleaseShared(int releases) {
      for (;;) {
      	  // 这里getState()==1
          int c = getState();
          if (c == 0)
              return false;
          int nextc = c-1;
          // 使用cas将state从1修改为0
          if (compareAndSetState(c, nextc))
              return nextc == 0;
      }
  }

6.3.1.2 AbstractQueuedSynchronizer#doReleaseShared

到这里再看一下前面的node的结构
在这里插入图片描述

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 根据前面构建的node的结构,接下来的两个if条件都是满足的
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; 
                    // 这里唤醒后面的阻塞线程
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

6.3.1.2.1 AbstractQueuedSynchronizer#unparkSuccessor

// 这里node是前面传进来的head
private void unparkSuccessor(Node node) {
   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);
   // s节点的线程就是我们自己被阻塞的线程
   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       s = null;
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }
   // 通过unpark唤醒我们前面通过park方法阻塞的线程
   if (s != null)
       LockSupport.unpark(s.thread);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/146639.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

景联文科技:驾驭数据浪潮,赋能AI产业——全球领先的数据标注解决方案供应商

根据IDC相关数据统计&#xff0c;全球数据量正在经历爆炸式增长&#xff0c;预计将从2016年的16.1ZB猛增至2025年的163ZB&#xff0c;其中大部分是非结构化数据&#xff0c;被直接利用&#xff0c;必须通过数据标注转化为AI可识别的格式&#xff0c;才能最大限度地发挥其应用价…

飞书开发学习笔记(六)-网页应用免登

飞书开发学习笔记(六)-网页应用免登 一.上一例的问题修正 在上一例中&#xff0c;飞书登录查看网页的界面显示是有误的&#xff0c;看了代码&#xff0c;理论上登录成功之后&#xff0c;应该显示用户名等信息。 最后的res.nickName是用户名&#xff0c;res.i18nName.en_us是英…

leetcode二分查找算法题

目录 1.二分查找2.在排序数组中查找元素的第一个和最后一个位置3.x的平方根4.搜索插入位置5.山脉数组的峰顶索引6. 寻找峰值7.寻找旋转排序数组中的最小值8.8.0~n-1中缺失的数字 1.二分查找 二分查找 class Solution { public:int search(vector<int>& nums, int …

Ubuntu 17.10 “Artful Aardvark” 发布首个 Beta

Ubuntu 17.10 “Artful Aardvark” 首个 Beta 版已发布。 按照 Ubuntu 17.10 的发布日程 &#xff0c;Ubuntu 17.10 首个 beta 版按时发布了。不过参与本次测试版的没有 Ubuntu 官方风味版本&#xff08;要尝试的话可以考虑每日构建 ISO&#xff09;&#xff0c;包括了 Kubunt…

uniapp插件开发

安装android studio&#xff1a;安装目录下bin下的此文件&#xff0c;是用来修改分配给android studio的占用内存。 Android 11足够用。 创建新项目&#xff1a; 目录结构介绍&#xff1a; UI组件介绍&#xff1a;在设计程序界面时可以使用可视化拖拽的方式&#xff0c;没有必要…

RT-Thread STM32F407 五步完成OLED移植

这里使用RT-Thread Studio提供的IIC API驱动函数进行移植 第一步&#xff0c;进入RT-Thread Settings配置IIC驱动 第二步&#xff0c;进入board.h&#xff0c;定义IIC宏 第三步&#xff0c;进入STM32CubeMX&#xff0c;配置时钟及IIC 第四步&#xff0c;添加oled.c及oled…

Mozilla 面向基于 Debian 的 Linux 发行版

导读Mozilla 公司今天发布新闻稿&#xff0c;表示面向 Debian、Ubuntu 和 Linux Mint 等基于 Debian 的发行版&#xff0c;推出了.deb 格式的 Firefox Nightly 浏览器安装包&#xff0c;便于用户在上述发行版中更轻松地安装。 本次更新的亮点之一在于采用 APT 存储库&#xff0…

毫米波雷达模块的目标检测与跟踪

毫米波雷达技术在目标检测与跟踪方面具有独特的优势&#xff0c;其高精度、不受光照影响等特点使其在汽车、军事、工业等领域广泛应用。本文深入探讨毫米波雷达模块在目标检测与跟踪方面的研究现状、关键技术以及未来发展方向。 随着科技的不断进步&#xff0c;毫米波雷达技术在…

Zabbix 5.0部署(centos7+server+MySQL+Apache)

环境 系统IPZABBIX版本主机名centos7192.168.231.2195.0zabbix-server 安装zabbix 我选择版本是zabbix-5.0 zabbix的官网是Zabbix :: The Enterprise-Class Open Source Network Monitoring Solution 安装Zabbix软件源 rpm -Uvh https://repo.zabbix.com/zabbix/5.0/rhel/7/…

【开发工具】gitee还不用会?我直接拿捏 >_>

&#x1f308;键盘敲烂&#xff0c;年薪30万&#x1f308; 目录 git的一些前置操作 如何获取本地仓库 本地仓库的操作 远程仓库操作 合并两个仓库&#xff08;通用方法&#xff09; 从远程仓库拉取文件报错 fatal:refusing to merge unrelated histories 分支操作 注意&…

人工智能基础_机器学习033_多项式回归升维_多项式回归代码实现_非线性数据预测_升维后的数据对非线性数据预测---人工智能工作笔记0073

然后我们来实际的操作一下看看,多项式升维的作用,其实就是为了,来对,非线性的数据进行拟合. 我们直接看代码 import numpy as np import matplotlib.pyplot as plt from sklearn.linear_model import LinearRegression X=np.linspace(-1,11,num=100) 从-1到11中获取100个数…

MVC使用的设计模式

MVC使用的设计模式 一、背景 MVC模式是"Model-View-Controller"的缩写&#xff0c;中文翻译为"模式-视图-控制器"。MVC应用程序总是由这三个部分组成。Event(事件)导致Controller改变Model或View&#xff0c;或者同时改变两者。只要Controller改变了Model…

No210.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

JVM虚拟机:垃圾回收之三色标记

本文重点 在前面的课程中我们已经学习了垃圾回收器CMS和G1,其中CMS和G1中的mixedGC都存在四个过程,这四个过程中有一个过程叫做并发标记,也就是说程序一边运行,一边标记垃圾。这个过程最困难的是:如果在标记垃圾的时候,如果对象的引用关系发生了改变,此时应该如何处理?…

Spark通过三种方式创建DataFrame

通过toDF方法创建DataFrame 通过toDF的方法创建 集合rdd中元素类型是样例类的时候&#xff0c;转成DataFrame之后列名默认是属性名集合rdd中元素类型是元组的时候&#xff0c;转成DataFrame之后列名默认就是_N集合rdd中元素类型是元组/样例类的时候&#xff0c;转成DataFrame…

【Python】一文带你掌握数据容器之集合,字典

目录&#xff1a; 一、集合 思考&#xff1a;我们目前接触到了列表、元组、字符串三个数据容器了。基本满足大多数的使用场景为何又需要学习新的集合类型呢? 通过特性来分析: &#xff08;1&#xff09;列表可修改、支持重复元素且有序 &#xff08;2&#xff09;元组、字符…

数据结构第四课 -----线性表之栈

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

常见排序算法实现

&#x1f495;"每一天都是值得被热爱的"&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;常见排序算法实现 1.排序的概念 所谓排序&#xff0c;就是按照特定顺序重新排列序列的操作 排序的稳定性&#xff1a; 当一个序列中存在相同的元素时 排序过…

1、NPC 三电平SVPWM simulink仿真

1、SVPWM时间计算函数&#xff0c;是从matlab的SVPWM3L_TimingCalculation.p文件中反汇编出来的函数&#xff1a; function [TgABC_On ,TgABC_Off ,Sn ]SVPWM3L_TimingCalculation_frompfile (Vref ,DeltaVdc ,Fsw ) %#codegen %coder .allowpcode (plain ); TgABC_On [0 ,0 ,…

Genio 700安卓核心板-MT8390安卓核心板规格参数

Genio 700(MT8390)安卓核心板是一款专门针对智能家居、互动零售、工业和商业应用的高性能边缘人工智能物联网平台。它集成了高度响应的边缘处理、先进的多媒体功能、各种传感器和连接选项&#xff0c;并支持多任务操作系统。 )安卓核心板采用高效的芯片内人工智能多处理器(APU)…