七、并发工具(上)

一、自定义线程池

1)背景:

  • 在 QPS 量比较高的情况下,我们不可能说所有的访问都创建一个线程执行,这会导致内存占用过高,甚至有可能出现 out of memory
  • 另外也要考虑 cpu 核数,如果请求超过了cpu核数,那么有一部分线程就会收到限制,然后等cpu时间片结束会进行一个上下文切换,频繁的上下文也会影响性能。

总和以上,我们可以引出线程池的概念,也就是结合前面的享元模式,创建一批线程,复用线程,既可以较少内存占用,又可以较少线程上下文切换。

2)任务数量放不满 Blocking Queue

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
        for (int i = 0; i < 5; i ++ ) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("{}", j);
            });
        }
    }
}
@Slf4j(topic = "c.TestPool")
class ThreadPool {
    // 线程池大小
    private int capacity;
    // 队列
    private BlockQueue<Runnable> blockQueue;
    // 线程队列
    private HashSet<Worker> workers = new HashSet<>();
    private long timeout;
    private TimeUnit timeUnit;

    public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize) {
        this.timeout = timeout;
        this.timeUnit = unit;
        this.capacity = capacity;
        blockQueue = new BlockQueue<>(queueSize);
    }
    // 执行任务
    public void execute(Runnable runnable) {
        synchronized (workers) {
            if (workers.size() < capacity) {
                Worker worker = new Worker(runnable);
                log.debug("开始创建线程...{}", worker);
                workers.add(worker);
                worker.start();
            } else {
                log.debug("线程已满...加入队列");
                blockQueue.put(runnable);
            }
        }

    }

    class Worker extends Thread{
        private Runnable runnable;

        public Worker(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            //执行任务
            // 1) 当任务不为空是,执行任务
            // 2) 当任务执行完毕,接着从队列中获取任务执行
            //这里有赋值不用上锁,是因为poll跟take实现都加了锁
            //            while ((runnable != null) || (runnable = blockQueue.take()) != null) {
            while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("正在执行。。。{}", runnable);
                    runnable.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    runnable = null;
                }
            }
            synchronized (workers) {
                log.debug("{} 被移除了", this);
                workers.remove(this);
            }
        }
    }
}

@Slf4j(topic = "c.TestPool")
class BlockQueue<T> {
    // 队列大小
    private int capacity;

    //存放队列
    private Deque<T> deque = new ArrayDeque<>();

    // 一把锁
    private ReentrantLock lock = new ReentrantLock();

    // 等待条件
    private Condition fullCondition = lock.newCondition();
    private Condition emptyCondition = lock.newCondition();

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    public int size() {
        return deque.size();
    }

    // 获取任务 设置超时时间
    public T poll(long time, TimeUnit unit) {
        try {
            lock.lock();
            long nanos = unit.toNanos(time);
            while (deque.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        log.debug("超时结束");
                        return null;
                    }
                    //进行等待
                    nanos = emptyCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            return deque.removeFirst();
        } finally {
            fullCondition.signal();
            lock.unlock();
        }
    }
    // 获取任务
    public T take() {
        try {
            lock.lock();
            while (deque.isEmpty()) {
                try {
                    emptyCondition.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            //            log.debug("删除队列头...");
            return deque.removeFirst();
        } finally {
            fullCondition.signal();
            lock.unlock();
        }
    }

    // 阻塞添加存放任务
    public void put(T runnable) {
        try {
            lock.lock();
            while (deque.size() == capacity) {
                log.debug("等待加入任务队列 {} ...", runnable);
                try {
                    fullCondition.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("存放到队列尾");
            deque.addLast(runnable);
            emptyCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}

2)任务数量放满 Blocking Queue (改进)

1. 带超时时间的 阻塞添加

// 带超时时间的存放任务
public boolean offer(T runnable, long time, TimeUnit unit) {
    try {
        lock.lock();
        long nanos = unit.toNanos(time);
        while (deque.size() == capacity) {
            log.debug("等待加入任务队列 {} ...", runnable);
            try {
                if (nanos <= 0) {
                    log.debug("等待加入任务队列超时 {} ...", runnable);
                    return false;
                }
                nanos = fullCondition.awaitNanos(nanos);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        log.debug("线程已满...加入队列{}",runnable);
        deque.addLast(runnable);
        emptyCondition.signal();
        return true;
    } finally {
        lock.unlock();
    }
}

2. 设计模式 之 策略模式

如果存放的队列已经满了,之前的版本就进入阻塞死等了,那么我们可以改进的方式

  • 带超时时间的等待
  • 让调用者放弃执行
  • 让调用者自己执行
  • 让调用者抛出异常
  • ... 等

有很多方式,如果写死的话,就会有很多else if,那么定死了,没有扩展性,所以使用策略模式,将权利下放,利用函数式接口,让调用者传进来拒绝策略。



import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
            // 1、 死等
            //queue.put(task);
            // 2、 带超时时间等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);
            // 3、 让调用者放弃
//            log.debug("放弃任务{}", task);
            // 4、 让调用者抛出异常
//            throw new RuntimeException("跑出异常" + task);
            // 5、 让调用者自己执行
            task.run();
        });
        for (int i = 0; i < 3; i ++ ) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                    log.debug("{}", j);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            });
        }
    }
}

@FunctionalInterface
interface RejectFunction<T> {
    void reject(BlockQueue<T> tBlockQueue, T task);


}
@Slf4j(topic = "c.TestPool")
class ThreadPool {
    // 线程池大小
    private int capacity;
    // 队列
    private BlockQueue<Runnable> blockQueue;
    // 线程队列
    private HashSet<Worker> workers = new HashSet<>();
    private long timeout;
    private TimeUnit timeUnit;
    private RejectFunction<Runnable> rejectFunction;

    public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize, RejectFunction<Runnable> rejectFunction) {
        this.timeout = timeout;
        this.timeUnit = unit;
        this.capacity = capacity;
        blockQueue = new BlockQueue<>(queueSize);
        this.rejectFunction = rejectFunction;
    }
    // 执行任务
    public void execute(Runnable runnable) {
        synchronized (workers) {
            if (workers.size() < capacity) {
                Worker worker = new Worker(runnable);
                log.debug("开始创建线程...{}", worker);
                workers.add(worker);
                worker.start();
            } else {
//                blockQueue.put(runnable);
                blockQueue.tryPut(rejectFunction, runnable);
            }
        }

    }

    class Worker extends Thread{
        private Runnable runnable;

        public Worker(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            //执行任务
            // 1) 当任务不为空是,执行任务
            // 2) 当任务执行完毕,接着从队列中获取任务执行
            //这里有赋值不用上锁,是因为poll跟take实现都加了锁
//            while ((runnable != null) || (runnable = blockQueue.take()) != null) {
            while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("正在执行。。。{}", runnable);
                    runnable.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    runnable = null;
                }
            }
            synchronized (workers) {
                log.debug("{} 被移除了", this);
                workers.remove(this);
            }
        }
    }
}

@Slf4j(topic = "c.BlockQueue")
class BlockQueue<T> {
    // 队列大小
    private int capacity;

    //存放队列
    private Deque<T> deque = new ArrayDeque<>();

    // 一把锁
    private ReentrantLock lock = new ReentrantLock();

    // 等待条件
    private Condition fullCondition = lock.newCondition();
    private Condition emptyCondition = lock.newCondition();

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    public int size() {
        return deque.size();
    }

    // 获取任务 设置超时时间
    public T poll(long time, TimeUnit unit) {
        try {
            lock.lock();
            long nanos = unit.toNanos(time);
            while (deque.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        log.debug("超时结束");
                        return null;
                    }
                    //进行等待
                    nanos = emptyCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            return deque.removeFirst();
        } finally {
            fullCondition.signal();
            lock.unlock();
        }
    }
    // 获取任务
    public T take() {
        try {
            lock.lock();
            while (deque.isEmpty()) {
                try {
                    emptyCondition.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
//            log.debug("删除队列头...");
            return deque.removeFirst();
        } finally {
            fullCondition.signal();
            lock.unlock();
        }
    }

    // 存放任务
    public void put(T runnable) {
        try {
            lock.lock();
            while (deque.size() == capacity) {
                log.debug("等待加入任务队列 {} ...", runnable);
                try {
                    fullCondition.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("线程已满...加入队列{}",runnable);
            deque.addLast(runnable);
            emptyCondition.signal();
        } finally {
            lock.unlock();
        }
    }
    // 带超时时间的存放任务
    public boolean offer(T runnable, long time, TimeUnit unit) {
        try {
            lock.lock();
            long nanos = unit.toNanos(time);
            while (deque.size() == capacity) {
                log.debug("等待加入任务队列 {} ...", runnable);
                try {
                    if (nanos <= 0) {
                        log.debug("等待加入任务队列超时 {} ...", runnable);
                        return false;
                    }
                    nanos = fullCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("线程已满...加入队列{}",runnable);
            deque.addLast(runnable);
            emptyCondition.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectFunction<T> rejectFunction , T runnable) {
        lock.lock();
        try {
            if (deque.size() == capacity) {
                rejectFunction.reject(this, runnable);
            } else {
                log.debug("线程已满...加入队列{}",runnable);
                deque.addLast(runnable);
                emptyCondition.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

二、ThreadPoolExecutor

1)线程池状态

ThreadPoolExecutor 使用 int 的高3位来表示线程池状态,低29位表示线程数量。

从数字上比较 RUNNING > SHUTDOWN > STOP > TIDYING > TERMINATED

为什么111 比 00小,因为是一个整数的高3位,所以第一位为1表示负数。

问题:

为什么使用一个整数表示线程池状态和线程数量,不用两个变量来表示呢?

答:

因为要保证在对 线程池状态 和 线程数量 进行赋值操作时的原子性

那么他存在一个变量中,就只需要一次cas操作就可以保证原子性,既可以保存状态信息,也可以保存数量信息。

如果存在两个变量中,就需要两次cas操作才可以保证原子性,才可以对两个变量进行赋值操作。

2)构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

假设 核心线程= 2 救急线程=1 阻塞队列=2

  • 一开始任务阻塞队列空闲,线程也空闲
  • 任务1 创建 核心线程1 执行
  • 任务2 创建 核心线程2 执行
  • 核心线程满了,任务3 进入 阻塞队列
  • 核心线程满了,任务4 进入 阻塞队列
  • 核心线程满了, 阻塞队列满了,任务5 创建 救急线程1 执行
  • 核心线程满了, 阻塞队列满了,救急线程满了,任务6执行拒绝策略

救急线程空闲一定时间会自动销毁,等待下次用到在创建

核心线程一直运行。

触发救急线程创建的前提 是配合有界队列来使用

如果线程到达 最大线程数量 仍然有新的任务,这时会执行拒绝策略。

拒绝策略 jdk 提供了 4 种实现,其他著名框架也提供了实现。

3)newFixedThreadPool (固定大小线程池)

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
//可以给第二参数,传线程工厂,提供创建新线程的名字。

特点:

  • 核心线程 == 最大线程数 (没有救急线程),因此无需超时时间
  • 阻塞队列是无界的,没有指定大小,可以放任务数量任务

评价

适用于任务量已知,相对耗时的任务

4)newCachedThreadPool (带缓冲线程池)

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

特点

  • 核心线程为0,最大线程为Integer.MAX_VALUE,救急线程的空闲生存时间 60s
    • 也就是全部都是救急线程(60s后可以回收)
    • 救急线程可以无限创建
  • 队列采用 SynchronousQueue ,特点是没有容量,没有线程来获取的时候放不进去任务(类似于 一手交钱、一手交货)
  • 刚好当前线程池全部都是救急线程,每个任务都会创建一个新的线程来获取任务,合适

评价

整个线程池会不断创建新的线程,任务量有多少线程就会创建多少,然后空闲60s后释放线程

适用于任务量比较密集,但是任务执行时间比较短的情况

5)newSingleThreadExecutor (单线程线程池)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

使用场景:

希望多个任务串行执行。线程数固定为1,当任务数量多于1时,会存放到阻塞队列中(无界)。任务执行完毕,线程也不会被摧毁释放。

那么跟 固定大小线程池设置成1 或者 自己创建一个线程执行 有什么区别

  • 自己创建一个线程执行,如果中间发生了异常,那么线程销毁了,阻塞队列中的任务就不执行了,没有什么补救措施,但是如果单线程线程池还会创建一个新的线程,保证正常工作。
  • newFixedThreadPool(1) 固定大小线程池设置成1,返回的对象还是可以修改的
    • 返回的对象还是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize 等方法修改。
  • 单线程线程池 newSingleThreadExecutor() 始终线程数量为1,不能修改
    • 返回的对象 通过new FinalizableDelegatedExecutorService进行了包装,应用了一种装饰器模式,只对外暴露 ExecutorService 接口, 因此不能调用 ThreadPoolExecutor 中特有方法修改。

6)提交任务

1. ALL

2. submit 演示

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建固定大小线程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        
        //带有返回结果的submit执行,可以使用lambda表达式
        Future<String> future = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.debug("执行");
                Thread.sleep(1000);
                return "ok";
            }
        });
        //获取值,会等待submit执行完,相当于保护性暂停模式
        log.debug("{}", future.get());
    }
}

3. invokeAll 演示

import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2Show {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建固定大小线程池
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // 调用 invokeAll 执行一个链表中的所有任务,然后将所有返回值收集成一个list 类型是Future
        // 可以设置超时时间, 如果规定时间内没有执行完所有任务,直接终止
        List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(2000);
                    return "3";
                }
        ));
        // 便利所有Future,等待所有执行完毕之后输出
        // 如果线程数量是2 任务3进入队列等 所以等待时间 500 + 2000
        // 如果线程数量是3 任务3直接与信工 等待时间 2000
        futures.forEach(f -> {
            try {
                log.debug("{}", f.get());
            } catch (ExecutionException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

    }
}

4. invokeAny 演示

import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建固定大小线程池
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // 调用 invokeAny 执行一个链表中的所有任务,然后将执行最快的一个任务的返回值返回,然后其他任务不执行了
        // 如果只有一个线程, 那么只有第一个任务被执行,所以只会返回第一个任务的结果
        String result = pool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin");
                    Thread.sleep(1000);
                    log.debug("end");
                    return "1";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(500);
                    log.debug("end");
                    return "2";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(2000);
                    log.debug("end");
                    return "3";
                }
        ));
        //最后拿到 的是2  然后1 跟 3的end没输出
        log.debug("{}", result);

    }
}

7)关闭线程池

shutdown

showdownNow

其他方法

三、设计模式 之 异步模式 之 工作线程

1) 定义

  • 让有限的工作线程(Worker Thread)可以轮流异步处理无限多的任务,也可以归类为分工模式。
  • 他的典型实现是 线程池,体现了经典设计模式中的享元模式。

例如:餐厅的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客户分配一位服务员,那么成本就太高了

(对比另外一种多线程设计模式:Thread-Pre-Message 这个是一个任务创建一个新线程处理)

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。

例如:如果一个餐厅的工人既要招呼客人(任务类型A),又要后厨做菜(任务类型B) 显然效率很低,因为他全都要干,

这时候如果分成招呼客人一批人(线程池A),做菜一批人(线程池B)更为合理,更细致的分工效率跟更高。

2)饥饿

这里的饥饿跟 synchronized 那里的饥饿有点区别,这里不是因为锁导致的,而是因为线程不足导致的,现象类似于死锁,但是不是锁导致的。

一般是固定大小线程池有饥饿现象。

  • 有两个工人在同一个线程池中表示两个线程。
  • 两个工人是全能的,既能做饭,又能处理点餐,有以下两种阶段情景
    • 1、有一个客人点餐,工人A处理点餐,等待上菜,工人B做菜,工人A上菜,一气呵成,分工合作很顺利
    • 2、有两个客人同时点餐,工人A和工人B同时处理点餐,同时等待上菜,没有额外工人(线程)去做菜,这时候就死锁了。

情况1


import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

@Slf4j(topic = "c.pool")
public class Test3Executor {
    static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 有人点餐,执行任务
        pool.execute(() -> {
            log.debug("点餐");
            Future<String> future = pool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    log.debug("做菜");
                    Thread.sleep(1000);
                    return list.get(0);
                }
            });
            try {
                log.debug("做完{}", future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }
}
// 21:17:54.256 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:17:54.261 [pool-1-thread-2] DEBUG c.pool - 做菜
// 21:17:55.262 [pool-1-thread-1] DEBUG c.pool - 做完红烧牛肉1

情况2:

import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

@Slf4j(topic = "c.pool")
public class Test3Executor {
    static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 有人点餐,执行任务
        pool.execute(() -> {
            log.debug("点餐");
            Future<String> future = pool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    log.debug("做菜");
                    Thread.sleep(1000);
                    return list.get(0);
                }
            });
            try {
                log.debug("做完{}", future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
        // 有人点餐,执行任务
        pool.execute(() -> {
            log.debug("点餐");
            Future<String> future = pool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    log.debug("做菜");
                    Thread.sleep(1000);
                    return list.get(0);
                }
            });
            try {
                log.debug("做完{}", future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });

    }
}
// 21:19:36.930 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:19:36.930 [pool-1-thread-2] DEBUG c.pool - 点餐

3)解决饥饿

解决1(不全面):

  • 增加核心线程来解决,但是如果同时比较多,还是没有办法解决。

解决2(全面):

  • 不同功能的线程放到不同线程池
package com.itheima.test8;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

@Slf4j(topic = "c.pool")
public class Test3Executor {
    static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");

    public static void main(String[] args) {
        ExecutorService cookPool = Executors.newFixedThreadPool(1);
        ExecutorService waitPool = Executors.newFixedThreadPool(1);
        // 有人点餐,执行任务
        waitPool.execute(() -> {
            log.debug("点餐1");
            Future<String> future = cookPool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    log.debug("做菜1");
                    Thread.sleep(1000);
                    return list.get(0);
                }
            });
            try {
                log.debug("做完1{}", future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
        // 有人点餐,执行任务
        waitPool.execute(() -> {
            log.debug("点餐2");
            Future<String> future = cookPool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    log.debug("做菜2");
                    Thread.sleep(1000);
                    return list.get(2);
                }
            });
            try {
                log.debug("做完2{}", future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });

    }
}
// 21:35:52.924 [pool-2-thread-1] DEBUG c.pool - 点餐1
// 21:35:52.924 [pool-2-thread-2] DEBUG c.pool - 点餐2
// 21:35:52.927 [pool-1-thread-1] DEBUG c.pool - 做菜1
// 21:35:53.928 [pool-1-thread-1] DEBUG c.pool - 做菜2
// 21:35:53.928 [pool-2-thread-1] DEBUG c.pool - 做完1红烧牛肉1
// 21:35:54.930 [pool-2-thread-2] DEBUG c.pool - 做完2牛肉3

4)创建多少线程池合适?

  • 过小会导致不能充分利用cpu系统资源,容易导致饥饿
  • 过大会导致频繁发生线程上下文切换,占用更多内存

1. CPU 密集型运算

通常采用 cpu核数 + 1 能够实现最优的CPU利用率, +1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,

额外的这个线程能够顶上去,保证CPU时钟周期不被浪费。常用于数据分析时。

2. I/O 密集型运算

CPU 不总是在繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但是当你执行I/O操作时、远程RPC调用、数据库操作时等,这时候CPU资源就空闲了

你可以利用多线程提高他的利用率。

经验公式如下:

  • 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间 +等待时间) / CPU 计算时间

例如

4 核CPU计算时间是50%,其他等待时间是50%,期望cpu被100%利用, 套用公式

4 * 100% * 100% / 50% = 8

四、任务调度线程池

有些时候我们需要延迟执行任务(延迟几秒后执行),或者需要循环执行任务(每隔几秒执行一次),这时候就需要用到任务调度的线程池了。

JDK5的时候加入任务调度线程池

JDK5之前 可以使用java.util.Timer 来实现定时功能

1. Timer

优点:简单易用,

缺点:所有任务都由一个独立线程执行,所有任务都是串行执行的,如果中间有一个任务发生异常,后面的任务也不会运行了。

@Slf4j(topic = "c.pool")
public class Test4 {
    public static void main(String[] args) {
        Timer timer = new Timer();

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task 1");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task 2");
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        log.debug("start");
        // 一秒后执行 task1 task2
        timer.schedule(task1, 1000);
        timer.schedule(task2, 1000);
    }
}
// 16:17:24.730 [main] DEBUG c.pool - start
// 16:17:25.734 [Timer-0] DEBUG c.pool - task 1
// 16:17:27.735 [Timer-0] DEBUG c.pool - task 2

2. ScheduledExecutorService

解决了Timer的缺点,抛出异常不会终止其他任务,还可以设置多个线程参与调度。

如果ScheduledExecutorService(1) 设置成1,那么也是串行执行,但是不会终止其他任务

a. 延迟任务
package com.itheima.test8;

import lombok.extern.slf4j.Slf4j;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j(topic = "c.pool")
public class Test4 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        pool.schedule(() -> {
            log.debug("task1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1, TimeUnit.SECONDS);
        pool.schedule(() -> {
            log.debug("task2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1, TimeUnit.SECONDS);
    }
}
// 16:23:45.724 [pool-1-thread-1] DEBUG c.pool - task1
// 16:23:45.724 [pool-1-thread-2] DEBUG c.pool - task2
b. 定时任务
ⅰ. scheduleAtFixedRate
@Slf4j(topic = "c.pool")
public class Test4 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        log.debug("start");
        // 延迟1秒执行,然后每隔1秒执行一次
        // 如果任务的延迟比定时还长,那么所有任务会在上一个任务执行完之后 紧接着运行
        pool.scheduleAtFixedRate(() -> {
            log.debug("task1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}
// 16:33:06.263 [main] DEBUG c.pool - start
// 16:33:07.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:09.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:11.303 [pool-1-thread-1] DEBUG c.pool - task1
ⅱ. scheduleWithFixedDelay
@Slf4j(topic = "c.pool")
public class Test4 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        log.debug("start");
        // 不管任务执行多长时间,都会等他工作完成之后再延迟一秒执行一次
        pool.scheduleWithFixedDelay(() -> {
            log.debug("task1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}
// 16:35:55.014 [main] DEBUG c.pool - start
// 16:35:56.048 [pool-1-thread-1] DEBUG c.pool - task1
// 16:35:59.051 [pool-1-thread-1] DEBUG c.pool - task1
// 16:36:02.051 [pool-1-thread-1] DEBUG c.pool - task1

正确处理线程池异常

  • try catch 手动捕捉
  • 普通线程池可以使用配合feture的返回值get()等待,如果有异常,返回值会是异常描述

3. 线程池应用

如何让每周日 18:00:00 定时执行任务?

import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestTaskRun {
    // 如何让每周日 18:00:00 定时执行任务?
    public static void main(String[] args) {
        // 获取当前时间
        LocalDateTime now = LocalDateTime.now();
        System.out.println("now = " + now);
        // 获取本周周日时间
        LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
        // 如果当前时间 > 本周周四,那么获取的time是不对的,需要是下周的
        if (now.isAfter(time)) {
            time = time.plusWeeks(1);
        }
        System.out.println("time = " + time);
        long period = 1000 * 60 * 60 * 24 * 7; // 一周时间
        long initialDelay = Duration.between(now, time).toMillis();  //获取两个时间之间的毫秒差值
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(() -> {
            System.out.println("running...");
        }, initialDelay, period, TimeUnit.MILLISECONDS);
    }
}

五、Tomcat 线程池

浏览器发出一个请求

首先会经过LimitLatch(作用是限流,控制最大连接数,防止太多连接把服务器压垮),类似于J.U.C 中的 Semaphore 后面再讲

当没有超过最大连接数,到达第二个组件 Acceptor(本质上是一个线程,不断循环看有没有新的连接,主要就是负责接受socket连接,其他不管)

然后Poller (也是一个线程,死循环看连接上是否有这种可读的事件发生,但是也是负责监听是否有,如果有交给一个Executor来执行)来负责看是否有IO的读写操作

一旦Poller 发现可读,会封装一个任务对象(socketProcessor实现了runnable),提交个Executor线程池处理

Executor 线程池中的工作线程(核心救急线程模式)最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor ,行为稍有不同

  • 如果总线程数达到 最大线程数 并且队列也满了
    • 这时不会立刻抛出 RejectedExecutionException 异常
    • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat- 7.0.42

tomcat 线程池配置

tomcat 线程池相对于 java原生线程池,tomcat进行了一些改进,主要体现在核心在于Tomcat自己定义的任务队列。

TaskQueue —— offer(Runnable o)方法

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private transient volatile ThreadPoolExecutor parent = null;

    public TaskQueue() {
        super();
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }
    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) {
            return super.offer(o);
        }
        //we are maxed out on threads, simply queue the object
        // 如果线程数量已经达到最大数量,则进入队列等待执行
        if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }
        //we have idle threads, just add it to the queue
        // 执行到这里 最大线程数 > 当前线程数 > 核心线程数。
        // 如果提交的任务数 <= 当前线程数则说明存在空闲线程,则提交到任务队列,等待执行
        if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {
            return super.offer(o);
        }
        //if we have less threads than maximum force creation of a new thread
        // 执行到这,说明提交的任务数已经大于当前线程数,需要创建新的线程
        if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {
            return false;
        }
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }
}
  • submittedCount:Tomcat自定义的ThreadPoolExecutor中使用该字段来标记当前已提交的任务数,提交加一,执行完减一。
  • poolSizeNoLock:当前执行的线程数

  • 判断当前是否需要创建空闲线程执行任务,而不是全放入队列,因为LinkedBlockingQueue默认是无界的,但也可以设计最大数量;

六、Fork / Join线程池

1)概念

  • JDK 1.7 之后加入的新的线程池实现, 使用了分治思想,适用于能够进行任务拆分的 cpu 密集型运算。
  • 所谓任务拆分,是一个大任务拆分算法上相同的小任务,直到不能再拆分然后求解,最后合并,类似于归并排序,裴波那契数列等思想
  • 分治思想的基础上加入了多线程,每个任务的分解和合并交给不同线程执行,进一步提升运算效率。
  • 默认会创建与 cpu 核数大小相同的线程池。

2)使用

1. 拆分不完善(导致串行)

package com.itheima.test8;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {
    public static void main(String[] args) {
        ForkJoinPool poll = new ForkJoinPool(4);
        // 运行调用很简单
        System.out.println(poll.invoke(new MyTask(5)));
    }
}

@Slf4j(topic = "c.MyTask")
// 计算机1 - n之间的和
class MyTask extends RecursiveTask<Integer> {

    private int n;

    public MyTask(int n) {
        this.n = n;
    }

    @Override
    public String toString() {
        return "MyTask{" +
                "n=" + n +
                '}';
    }

    @Override
    protected Integer compute() {
        // 递归结束条件
        if (n == 1) {
            log.debug("join() {}", n);
            return 1;
        }

        //分解成子问题
        MyTask myTask = new MyTask(n - 1);
        myTask.fork();   //分配线程执行
        log.debug("fork() {} + {}", n, myTask);
        //获取线程返回结果
        Integer join = myTask.join();
        log.debug("join() {} + {} = {}", n, myTask, n + join);
        return n + join;  //然后拼接子问题返回答案
    }
}
// 21:07:44.838 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - fork() 5 + MyTask{n=4}
// 21:07:44.838 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - fork() 4 + MyTask{n=3}
// 21:07:44.838 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - fork() 2 + MyTask{n=1}
// 21:07:44.838 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - fork() 3 + MyTask{n=2}
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 1
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 2 + MyTask{n=1} = 3
// 21:07:44.842 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - join() 3 + MyTask{n=2} = 6
// 21:07:44.842 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - join() 4 + MyTask{n=3} = 10
// 21:07:44.842 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - join() 5 + MyTask{n=4} = 15
// 15

2. 分治思想(并行)

package com.itheima.test8;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {
    public static void main(String[] args) {
        ForkJoinPool poll = new ForkJoinPool(4);
        // 运行调用很简单
        System.out.println(poll.invoke(new AddTask(1, 5)));
    }
}
@Slf4j(topic = "c.AddTask")
// 计算机1 - n之间的和
class AddTask extends RecursiveTask<Integer> {

    private int start;
    private int end;

    public AddTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "AddTask{" +
                "start=" + start +
                ", end=" + end +
                '}';
    }

    @Override
    protected Integer compute() {
        // 递归结束条件
        if (start == end) {
            log.debug("join() {}", start);
            return start;
        }
        if (end == start + 1) {
            log.debug("join() {}", end + start);
            return end + start;
        }

        //分解成子问题
        int mid = (start + end) / 2;
        AddTask addTask1 = new AddTask(start, mid);
        addTask1.fork();   //分配线程执行
        log.debug("fork()  {}", addTask1);
        AddTask addTask2 = new AddTask(mid + 1, end);
        addTask2.fork();   //分配线程执行
        log.debug("fork()  {}", addTask2);
        //获取线程返回结果
        Integer join = addTask1.join() + addTask2.join();
        log.debug("join() {} , {} = {}", start, end, join);
        return join;  //然后拼接子问题返回答案
    }
}

七、AQS原理

1)概述

2)自定义锁

自定义一个同步器组件,例如可重入锁那些都是这么做。在你自定义的同步器组件内部定义一个AQS类的内部类,所有操作交给这个内部类完成,我们的自定义同步组件只需要调用这个内部类的方法

这里要注意,acquire()是AQS实现的模板方法可以直接用,tryAquire()是自定义的加锁方法, 模板会调用此自定义实现方法

它的acquire、release方法是aqs的固定机制,比如怎么把线程阻塞,怎么加入队列等等,而tryxx是自己实现的,自己决定是否可重入,是否可共享等等

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j(topic = "c.MyLock")
public class Test6Lock {
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            log.debug("lock");
            lock.lock();
            log.debug("lock");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                log.debug("unlock");
                lock.unlock();
            }
        }).start();
//        new Thread(() -> {
//            lock.lock();
//            log.debug("lock");
//            try {
//                Thread.sleep(1);
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            } finally {
//                log.debug("unlock");
//                lock.unlock();
//            }
//        }).start();
    }
}

// 自定义锁 (不可重入锁)
class MyLock implements Lock {
    class MyAQS extends AbstractQueuedSynchronizer {
        // 独占锁  同步器类
        @Override
        protected boolean tryAcquire(int arg) {
            // cas 操作是否可以获得锁
            if (compareAndSetState(0, 1)) {
                //然后锁的Owner 是当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 释放锁之后清空Owner
            setExclusiveOwnerThread(null);
            // 因为是volatile 所以因为写屏障,所以前面不会重排指令到state后面,同步到主存
            setState(0);
            return true;
        }

        @Override  //是否持有独占锁
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition() {
            return new ConditionObject();
        }

    }
    MyAQS sync = new MyAQS();

    @Override  //加锁 (加锁失败,进入队列)
    public void lock() {
        sync.acquire(1);
    }

    @Override  // 加锁,但是可以打断
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override // 尝试加锁(一次)
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override  // 尝试加锁,带超时
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override  //释放锁
    public void unlock() {
        sync.release(1);  //释放锁,底层还会唤醒等待的线程
    }

    @Override   //创建条件变量
    public Condition newCondition() {
        return sync.newCondition();
    }
}

八、ReentrantLock 原理

1)加锁成功流程

非公平锁实现原理:

加锁解锁流程:

构造器方法,默认是非公平锁实现

public ReentrantLock() {
	sync = new NonfairSync();
}

NonfairSync 继承自 AQS,原理类似于刚才的AQS实现自定义锁

2)加锁失败流程

第一次尝试:如果已经被别人加锁了,调用acquire

第二次尝试:如果这时候加锁的人已经释放,则不需要继续往下,否则就创建AddWaiter构造一个链表(即head、tail),添加一个Node链表节点,加入队列

  • 首次创建会创建两个,第一个Node称为 Dummy(哑元)或哨兵,用来占位,并不关联线程。
  • 黄色三角形表示该Node的 waitStatus 状态,默认 0 为正常状态

进入acquireQueue () 逻辑

第三个尝试:循环判断当前节点的前驱节点是不是头结点,说明他是第一个节点,有资格去再次尝试获取锁

进入 shouldParkAfterFailAcquire()逻辑

把前驱节点改成状态改成 -1,代表前驱节点有资格唤醒后继结点,因为如果进行park ,需要有人唤醒,第一次会方法会返回false

第四次尝试:再次尝试获取,获取失败,再次进入shouldParkAfterFailAcquire,因为前驱节点node已经是状态-1,所以这次返回true,进行一个park,阻塞住

3)解锁竞争成功流程

如果有多个线程经历上述过程竞争失败,变成这个样子

这时候 Thread - 0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

然后判断头结点是不是不为空,并且 waitStatus 状态不为0,那么就执行 unparkSuccessor 唤醒距离头结点最近的节点,然后他有资格继续尝试获取锁

获取到锁之后,设置state = 1, 并且 exclusiveOwnerThread = 当前线程,然后将头结点替换成当前节点 】

4)解锁竞争失败流程

因为是非公平锁,此时有另外一个线程Thread4,不是队列中的,而是第一次访问的线程,那么就会发生一个竞争

Thread-1 如果竞争失败,Thread-4 的线程就会设置成 exclusiveOwnerThread ,state = 1

Thread - 1 竞争失败再次进入 acquireQueue 流程,获取锁失败,重新进入 park 阻塞

公平锁的意思也就是 新线程来的时候会不会直接加入队列中。

5)锁重入原理

6)可打断原理

不可打断锁原理:

打断不会影响到获取锁停止,只有获得锁之后才会返回打断标记

可打断原理:

如果打断之后直接抛出异常,不会继续获取锁了

7)公平锁原理

公平锁原理:新线程进来获取锁的时候,判断是不是队列中有线程等待,来判断是否可以进行竞争锁。

8)条件变量 - await

每个条件变量其实对应着一个等待队列, 其实现类是 ConditionObject

开始Thread-0持有锁,调用await,进入 COnditionObject 的 addConditionWaiter 流程

创建 新的 Node 状态为 -2(Node.CONDITION) ,关联 Thread-0,加入等待队列尾部

然后进入 AQS 的fullyRelease 流程, 释放同步器上的锁,也就是当前线程的所有锁

为什么是fullyRelease 不是 因为可能发生了锁重入,所以需要释放所有锁

state = 0, exclusiveOwnerThread = null

然后unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread - 1 竞争成功。

state = 1, exclusiveOwnerThread = thread1

9)条件变量 - signal

假设 Thread- 1 调用 signal 要来唤醒 条件变量中等待的 Thread - 0

调用 signal 方法,然后唤醒 条件变量中的 第一个节点,然后条件变量中的当前点 置为null,

转移成功 然后加入到等待队列队尾等待

转移失败 因为有些时候有时限的等待超时,或者被打断,那么就没有必要加入到队列队尾

加入队列,获取前一个节点,然后设置state 为 -1,因为最后一个是0(前一个节点设置成-1,表示有资格唤醒unpark新加入的线程)

如果前一个节点的状态 > 0 ,表示被取消了,那么需要唤醒当前线程,去找能够当做前驱节点的点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/363650.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【多个SpringBoot模块项目如何变成聚合项目】

【前言】 项目虽然是Eureka、OpenFeign 进行服务注册和服务调用&#xff0c;但是每个模块都是一个单独的SpringBoot&#xff0c;启动每个模块都需要单独启动一个idea,觉得这个过于繁琐&#xff0c;现在想把项目变成一个聚合项目&#xff0c;只需要启动一个idea即可。 【过程】…

09. 异常处理

目录 1、前言 2、常见的异常 3、异常处理try...except...finally 4、异常信息解读 5、raise 6、自定义异常 7、小结 1、前言 在编程中&#xff0c;异常&#xff08;Exception&#xff09;是程序在运行期间检测到的错误或异常状况。当程序执行过程中发生了一些无法继续执…

vue前端html导出pdf

package.json中添加依赖 调用方&#xff1a; import htmlToPdf from ../../../utils/file/htmlToPdf.js// 下载方法&#xff0c;pdfDownloadDpi为onClickDownLoad() {htmlToPdf.getPdf(标题1, jsfgyzcpgxmShow, this.pdfDownloadDpi)}htmlToPdf.js // 页面导出为pdf格式 imp…

Backtrader 文档学习- Broker - Cheat-On-Open

Backtrader 文档学习- Broker - Cheat-On-Open 1.概述 V1.9.44.116增加了Cheat On Open的支持。对于全押的人来说&#xff0c;这似乎是一个必需的功能&#xff0c;用bar的收盘价后进行计算&#xff0c;希望与开盘价相匹配。 当开盘价差距&#xff08;上涨或下跌&#xff0c;取…

杂题——试题-算法训练-P0602

分析&#xff1a; 把要重排序的数字转成数组对数组进行排序&#xff0c;从小到大排序数组转成字符串&#xff0c;字符串转成数字&#xff0c;得到最小数再把最小数的字符串反转&#xff0c;得到最大数注意&#xff1a; 在java语言中&#xff0c;如果使用Arrays.toString(digits…

DevOps系列文章之 Git命令:过滤日志

使用git log命令查找特定提交历史的高级技巧。其中的一些技巧配合格式化日志命令使用有奇效。 按照数量过滤 使用git log命令时最基本的过滤方式就是按照可以显示的日志条数进行过滤。如果你只对最近几次提交感兴趣&#xff0c;这会节省在页面上查看所有提交的麻烦。 git lo…

成熟的汽车制造供应商协同平台 要具备哪些功能特性?

汽车行业是一个产业链长且“重”的行业&#xff0c;整个业务流程包括了研发、设计、采购、库存、生产、销售、售后等一系列环节&#xff0c;在每一个环节都涉及到很多信息交换的需求。对内要保证研发、采购、营销等业务环节信息流通高效安全&#xff0c;对外要与上、下游合作伙…

springboot-前后端分离——第一篇

本篇主要对前后端分离的一些基础知识进行总结&#xff0c;主要对HTTP请求协议、HTTP响应格式、Http协议解析等进行总结。重点在于简单了解前端如何向服务端发送请求&#xff0c;服务端如何接收请求并返回响应结果。 一、简单案例&#xff1a; 首先创建一个springboot项目&…

使用pygame建立一个简单的使用键盘方向键移动的方块小游戏

import pygame import sys# 初始化pygame pygame.init()# 设置窗口大小 screen_size (640, 480) # 创建窗口 screen pygame.display.set_mode(screen_size) # 设置窗口标题 pygame.display.set_caption("使用键盘方向键移动的方块的简单小游戏")# 设置颜色 bg_colo…

帅气的性能监控平台Grafana(Windows下使用Grafana监控系统指标与GPU指标)

帅气的性能监控平台Grafana&#xff08;Windows下使用Grafana监控系统指标与GPU指标&#xff09; 前情提要 系统环境准备 windows_exporter下载 nvidia_gpu_exporter下载 prometheus下载 Grafana下载 安装指导 windows_exporter安装与nvidia_gpu_exporter安装 promethe…

ApacheNginx配置ssl证书

一、Apache配置ssl Linux版本&#xff1a;CentOS Linux release 7.9.2009 (Core) Apache版本&#xff1a;Apache/2.4.6 (CentOS) 1、安装Apache&#xff08;使用默认yum源&#xff09; [root10-35-1-25 ~]# yum -y install httpd2、查Apache版本&启动Apache [root10-35-…

深度解读NVMe计算存储协议-2

近日&#xff0c;NVME协议组织为了解决这些性能问题并为供应商提供标准化机制&#xff0c;在其架构中集成优化的计算功能&#xff0c;开发了NVM Express (NVMe) 计算存储特性。 计算存储的核心特性包括两个命令集&#xff1a;计算程序集和子系统本地内存。 其中&#xff0c;计算…

postgresql|数据库|pg_repack插件的部署和使用

一&#xff0c; 表和索引的膨胀现象 Postgres SQL 实现的MVCC的机制不同于 oracle &#xff0c; mysql innodb 的 undo tablespace 的机制。 表上所用的更新和删除等操作的行为&#xff0c;都不会实际的删除或修改&#xff0c;而是标记为死元祖 &#xff08;dead rows or dead…

非鸿蒙官方低代码源码生成器

介绍 鸿蒙低代码可视化开发神器快速对鸿蒙ArkUI生成源码&#xff0c;结合类似小程序类似设计&#xff0c;页面设计底部菜单&#xff0c;支持宫格组件、轮播图、图文列表、图片组件、文本内容组件&#xff0c;快速对接第三方HttpApi。通过鸿蒙扩展axios扩展库加载数据源&#x…

jmeter+nmon+crontab简单的执行接口定时压测

一、概述 临时接到任务要对系统的接口进行压测&#xff0c;上面的要求就是&#xff1a;压测&#xff0c;并发2000 在不熟悉系统的情况下&#xff0c;按目前的需求&#xff0c;需要做的步骤&#xff1a; 需要有接口脚本需要能监控系统性能需要能定时执行脚本 二、观察 >…

Spring的事件监听机制

这里写自定义目录标题 1. 概述&#xff08;重点&#xff09;2. ApplicationEventMulticaster2.1 SimpleApplicationEventMulticaster2.2 AbstractApplicationEventMulticaster 3. ApplicationListener3.1 注册监听器3.2 自定义 4. SpringApplicationRunListeners 1. 概述&#…

协会认证!百望云荣获信创工委会年度“卓越贡献成员单位”称号

当前&#xff0c;新一轮科技革命和产业变革正加速重塑全球经济结构&#xff0c;强化企业科技创新的主体地位&#xff0c;推动创新链、产业链、人才链深度融合&#xff0c;加快科技成果产业化进程至关重要。 近日&#xff0c;中国电子工业标准化技术协会信息技术应用创新工作委员…

对付勒索病毒,复杂的往往无法落地

一道道复杂门墙防护安全&#xff0c; 还是一个精密的锁更安全&#xff1f; &#x1f447;&#x1f447;&#x1f447; 在网络数据安全问题频发的当下&#xff0c;除了常规的备份、灾备措施以外&#xff0c;企业是否有做好应对最坏情况的准备&#xff1f;一旦病毒绕过了一道道…

shell - 免交互

一.Here Document 免交互 1. 交互的概念 交互&#xff1a;当计算机播放某多媒体程序的时候&#xff0c;编程人员可以发出指令控制该程序的运行&#xff0c;而不是程序单方面执行下去&#xff0c;程序在接受到编程人员相应的指令后而相应地做出反应。 对于Linux操作系统中&…

ztest中ddof起什么作用

⭐️ statsmodels 中 ztest 基本使用 statsmodels 也是一个强大的统计分析库&#xff0c;提供了丰富的统计模型和检验功能。对于 Z 检验&#xff0c;statsmodels 提供了 ztest 函数。 以下是使用 statsmodels 进行 Z 检验的示例&#xff1a; from statsmodels.stats.weights…