• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

按照执行时间轮询任务(串行或并行)

互联网 diligentman 3天前 8次浏览

自己设计的,不知道有没有坑,慎用

package cn.cloudwalk.stars.service.dealy;

import cn.cloudwalk.stars.service.dealy.cruise.CruisePoint;
import cn.cloudwalk.stars.service.dealy.cruise.CruiseRoi;
import cn.cloudwalk.stars.service.dealy.cruise.CruiseTask;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.*;

/**
 * Round-robin polling of cruise tasks, either serially or in parallel.
 *
 * <p>Three operations are offered: start polling a task, stop polling a task, and query the
 * ROIs a task is currently working on. Every polled queue is served by one worker submitted to
 * {@link #executor}; the worker looks at the head of its queue, waits for that ROI's period on
 * a delay queue, then moves the ROI to the tail and continues with the next one.
 *
 * <p>All state is static and shared, so the methods may be called from any thread.
 */
@Component
public class RoundRobinTask {

    /**
     * Stand-in for "now" that is compared against each ROI's start and end time.
     * NOTE(review): the original code calls this a simulated current time; replace it with a
     * real clock reading once the unit of the ROI start/end time is confirmed.
     */
    private static final int SIMULATED_CURRENT_TIME = 2;

    /** Pause applied after a full pass over a queue in which every ROI was outside its window. */
    private static final long IDLE_BACKOFF_MILLIS = 1000L;

    /** ROI queues being polled, keyed by task id (one queue for serial, one per point for parallel). */
    private static final ConcurrentHashMap<String, Set<ConcurrentLinkedQueue<CruiseRoi>>> taskRoiMap =
            new ConcurrentHashMap<>();

    /** Cancellable handles of the workers polling each task, keyed by task id. */
    private static final ConcurrentHashMap<String, Set<FutureTask<Void>>> taskWorkerMap =
            new ConcurrentHashMap<>();

    /** Pool that runs the polling workers; each worker occupies one of its 100 threads. */
    static ExecutorService executor = Executors.newFixedThreadPool(100);

    /**
     * Starts a serial task: the ROIs of all points are chained into a single queue (points
     * ordered by sort number, ROIs ordered by sort number within each point) and polled by one
     * worker.
     *
     * <p>Each ROI's sort number is overwritten with its 1-based position in that queue.
     *
     * @param cruiseTask task whose points and ROIs are to be polled
     * @throws InterruptedException never thrown by this implementation; kept for source
     *     compatibility with existing callers
     */
    public static void startSerialTask(CruiseTask cruiseTask) throws InterruptedException {
        ConcurrentLinkedQueue<CruiseRoi> roiList = new ConcurrentLinkedQueue<>();
        int index = 0;
        for (CruisePoint point : sortedPoints(cruiseTask)) {
            for (CruiseRoi cruiseRoi : sortedRois(point)) {
                // Renumber the ROI according to its position in the task-wide queue.
                index++;
                cruiseRoi.setSortNum(index);
                roiList.add(cruiseRoi);
            }
        }

        // Publish the queue before the worker starts so it can be queried and stopped.
        Set<ConcurrentLinkedQueue<CruiseRoi>> roiSet = new HashSet<>();
        roiSet.add(roiList);
        taskRoiMap.put(cruiseTask.getId(), roiSet);

        submitWorker(cruiseTask.getId(), roiList);
    }

    /**
     * Starts a parallel task: every point that has ROIs gets its own queue (ROIs ordered by
     * sort number) and its own worker, so the points are polled concurrently.
     *
     * <p>Each ROI's sort number is overwritten with its 1-based position inside its point's
     * queue.
     *
     * @param cruiseTask task whose points and ROIs are to be polled
     * @throws InterruptedException never thrown by this implementation; kept for source
     *     compatibility with existing callers
     */
    public static void startParallelTask(CruiseTask cruiseTask) throws InterruptedException {
        Set<ConcurrentLinkedQueue<CruiseRoi>> roiSet = new HashSet<>();
        for (CruisePoint point : sortedPoints(cruiseTask)) {
            List<CruiseRoi> rois = sortedRois(point);
            if (rois.isEmpty()) {
                continue;
            }
            ConcurrentLinkedQueue<CruiseRoi> roiList = new ConcurrentLinkedQueue<>();
            int index = 0;
            for (CruiseRoi cruiseRoi : rois) {
                // Renumber the ROI according to its position in this point's queue.
                index++;
                cruiseRoi.setSortNum(index);
                roiList.add(cruiseRoi);
            }
            roiSet.add(roiList);
        }

        // Publish the queues before any worker starts so they can be queried and stopped.
        taskRoiMap.put(cruiseTask.getId(), roiSet);
        for (ConcurrentLinkedQueue<CruiseRoi> roiList : roiSet) {
            submitWorker(cruiseTask.getId(), roiList);
        }
    }

    /** Returns a copy of the task's points ordered by sort number (unset sort numbers last). */
    private static List<CruisePoint> sortedPoints(CruiseTask cruiseTask) {
        List<CruisePoint> points = cruiseTask.getPointList();
        if (CollectionUtils.isEmpty(points)) {
            return Collections.emptyList();
        }
        // Sort a copy: Stream.sorted() never reorders its source, and the caller's list
        // should not be reordered either.
        List<CruisePoint> sorted = new ArrayList<>(points);
        sorted.sort(Comparator.comparing(
                CruisePoint::getSortNum, Comparator.nullsLast(Comparator.naturalOrder())));
        return sorted;
    }

    /** Returns a copy of the point's ROIs ordered by sort number (unset sort numbers last). */
    private static List<CruiseRoi> sortedRois(CruisePoint point) {
        List<CruiseRoi> rois = point.getRoiList();
        if (CollectionUtils.isEmpty(rois)) {
            return Collections.emptyList();
        }
        List<CruiseRoi> sorted = new ArrayList<>(rois);
        sorted.sort(Comparator.comparing(
                CruiseRoi::getSortNum, Comparator.nullsLast(Comparator.naturalOrder())));
        return sorted;
    }

    /** Registers a cancellable worker for the queue under the task id and hands it to the pool. */
    private static void submitWorker(String taskId, ConcurrentLinkedQueue<CruiseRoi> roiList) {
        Runnable job = () -> {
            processTask(taskId, roiList);
            System.out.println(Thread.currentThread().getName() + "线程执行完毕");
        };
        FutureTask<Void> worker = new FutureTask<>(job, null);
        // Register before executing so stopTask can always reach the worker; a worker that is
        // cancelled before it starts simply never runs.
        taskWorkerMap
                .computeIfAbsent(taskId, id -> ConcurrentHashMap.<FutureTask<Void>>newKeySet())
                .add(worker);
        executor.execute(worker);
    }

    /**
     * Polls one ROI queue until the queue is empty or the worker thread is interrupted.
     *
     * <p>An ROI outside its time window is moved to the tail immediately. An ROI inside its
     * window stays at the head (so {@link #getExecuteRoi(String)} reports it) while the worker
     * waits on a delay queue, and is moved to the tail afterwards.
     *
     * @param taskId id of the task the queue belongs to (used for diagnostics)
     * @param roiList queue to poll; a {@code null} queue is ignored
     */
    private static void processTask(String taskId, ConcurrentLinkedQueue<CruiseRoi> roiList) {
        if (roiList == null) {
            return;
        }
        int currentTime = SIMULATED_CURRENT_TIME;
        // Used purely as a timer: one element is put and taken per executed ROI.
        DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
        int consecutiveSkips = 0;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                CruiseRoi cruiseRoi = roiList.peek();
                if (cruiseRoi == null) {
                    // Queue is empty (never filled, or cleared by stopTask).
                    break;
                }

                if (currentTime < cruiseRoi.getStartTime() || currentTime > cruiseRoi.getEndTime()) {
                    // Outside the valid window: skip this ROI.
                    System.out.println("跳过围界" + cruiseRoi);
                    rotate(roiList);
                    consecutiveSkips++;
                    if (consecutiveSkips >= roiList.size()) {
                        // A whole pass found nothing to run; pause instead of spinning.
                        Thread.sleep(IDLE_BACKOFF_MILLIS);
                        consecutiveSkips = 0;
                    }
                } else {
                    consecutiveSkips = 0;
                    // Wait on the delay queue until this ROI's period has elapsed.
                    System.out.println("执行围界" + cruiseRoi);
                    delayQueue.put(new DelayTask(cruiseRoi, cruiseRoi.getPeriod()));
                    delayQueue.take();
                    // Done: move the ROI to the tail.
                    rotate(roiList);
                }
            }
        } catch (InterruptedException e) {
            // Stop was requested while waiting; keep the flag set for the caller.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Report instead of silently dropping the failure; the worker ends here.
            System.err.println("Polling of task " + taskId + " aborted: " + e);
        }
    }

    /** Moves the head of the queue to its tail; does nothing when the queue is empty. */
    private static void rotate(ConcurrentLinkedQueue<CruiseRoi> roiList) {
        CruiseRoi head = roiList.poll();
        if (head != null) {
            roiList.offer(head);
        }
    }

    /**
     * Stops polling a task: cancels (interrupts) its workers and discards its ROI queues.
     * Unknown or {@code null} ids are ignored apart from the log line.
     *
     * @param taskId id of the task to stop
     */
    public static void stopTask(String taskId) {
        if (taskId != null) {
            // Interrupt the workers first so they leave any wait immediately.
            Set<FutureTask<Void>> workers = taskWorkerMap.remove(taskId);
            if (workers != null) {
                for (FutureTask<Void> worker : workers) {
                    worker.cancel(true);
                }
            }

            // Empty the queues as well; an empty queue also makes a worker exit its loop.
            Set<ConcurrentLinkedQueue<CruiseRoi>> queues = taskRoiMap.remove(taskId);
            if (queues != null) {
                for (ConcurrentLinkedQueue<CruiseRoi> queue : queues) {
                    queue.clear();
                }
            }
        }

        System.out.println("终止当前轮询的预案,taskId:" + taskId);
    }

    /**
     * Returns the ROI at the head of each queue of the task, i.e. the ROI each worker is
     * currently looking at.
     *
     * @param taskId id of the task to query
     * @return a new list with one ROI per non-empty queue; empty when the task is unknown
     */
    public static List<CruiseRoi> getExecuteRoi(String taskId) {
        List<CruiseRoi> roiList = new ArrayList<>();
        if (taskId == null) {
            return roiList;
        }
        Set<ConcurrentLinkedQueue<CruiseRoi>> queues = taskRoiMap.get(taskId);
        if (queues == null) {
            return roiList;
        }
        // Plain loop: the result list is not thread-safe, so no parallel stream here.
        for (ConcurrentLinkedQueue<CruiseRoi> queue : queues) {
            CruiseRoi head = queue.peek();
            if (head != null) {
                roiList.add(head);
            }
        }
        return roiList;
    }

    /** Builds a demo ROI with a 10000 period and a 1..10 time window. */
    private static CruiseRoi demoRoi(String id, int sortNum) {
        CruiseRoi roi = new CruiseRoi();
        roi.setId(id);
        roi.setSortNum(sortNum);
        roi.setPeriod(10000);
        roi.setStartTime(1);
        roi.setEndTime(10);
        return roi;
    }

    /** Builds a demo point with the given id and sort number. */
    private static CruisePoint demoPoint(String id, int sortNum) {
        CruisePoint point = new CruisePoint();
        point.setId(id);
        point.setSortNum(sortNum);
        return point;
    }

    /**
     * Demo: starts one parallel task with three points (two of them carrying two ROIs each),
     * prints the executing ROIs twice, then stops the task and shuts the pool down.
     */
    public static void main(String[] args) throws InterruptedException {
        // Initialise the task(s).
        for (int i = 1; i <= 1; i++) {
            String taskId = "预案" + i;
            CruiseTask cruiseTask = new CruiseTask();
            cruiseTask.setId(taskId);

            List<CruisePoint> pointList = new ArrayList<>();
            pointList.add(demoPoint("1", 1));
            pointList.add(demoPoint("2", 2));
            // The third point now gets its own id and sort number.
            pointList.add(demoPoint("3", 3));

            List<CruiseRoi> roi1s = new ArrayList<>();
            roi1s.add(demoRoi(taskId + ":1-1", 1));
            roi1s.add(demoRoi(taskId + ":1-2", 2));
            pointList.get(0).setRoiList(roi1s);

            List<CruiseRoi> roi2s = new ArrayList<>();
            roi2s.add(demoRoi(taskId + ":2-1", 1));
            roi2s.add(demoRoi(taskId + ":2-2", 2));
            pointList.get(1).setRoiList(roi2s);

            cruiseTask.setPointList(pointList);
            startParallelTask(cruiseTask);
        }
        System.out.println("当前预案正在执行的ROI:"+getExecuteRoi("预案1"));
        Thread.sleep(30000);
        System.out.println("当前预案正在执行的ROI:"+getExecuteRoi("预案1"));
        Thread.sleep(100000);
        stopTask("预案1");
        // Pool threads are non-daemon; shut the pool down so the demo JVM can exit.
        executor.shutdown();
    }
}


程序员灯塔
转载请注明原文链接:按照执行时间轮询任务(串行或并行)
喜欢 (0)