您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

实例介绍Java基于quasar实现协程池

2025/11/4 2:03:29发布12次查看
本篇文章给大家带来了关于java的相关知识,其中主要整理了基于quasar实现协程池的相关问题,一个线程可以多个协程,一个进程也可以单独拥有多个协程,线程进程都是同步机制,而协程则是异步,下面一起来看一下,希望对大家有帮助。
推荐学习:《java视频教程》
业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)
一个线程可以多个协程,一个进程也可以单独拥有多个协程。
线程进程都是同步机制,而协程则是异步。
协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。
线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。
协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的cpu资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, ui线程, 或新建新程.。
线程是协程的资源。协程通过interceptor来间接使用线程这个资源。
废话不多说,直接上代码:
导入包:
        <dependency>            <groupid>co.paralleluniverse</groupid>            <artifactid>quasar-core</artifactid>            <version>0.7.9</version>            <classifier>jdk8</classifier>        </dependency>
worktools工具类:
package com.example.ai;import co.paralleluniverse.fibers.fiber;import co.paralleluniverse.fibers.suspendexecution;import co.paralleluniverse.strands.suspendablerunnable;import java.util.concurrent.arrayblockingqueue;public class worktools {    //协程池中默认协程的个数为5    private static int work_num = 5;    //队列默认任务为100    private static int task_count = 100;    //工做协程数组    private fiber[] workthreads;    //等待队列    private final arrayblockingqueue<suspendablerunnable> taskqueue;    //用户在构造这个协程池时,但愿启动的协程数    private final int workernum;    //构造方法:建立具备默认协程个数的协程池    public worktools() {        this(work_num,task_count);    }    //建立协程池,worknum为协程池中工做协程的个数    public worktools(int workernum, int taskcount) {        if (workernum <= 0) { workernum = work_num; } if (taskcount <= 0) { taskcount = task_count; } this.workernum = workernum; taskqueue = new arrayblockingqueue(taskcount); workthreads = new fiber[workernum]; for (int i = 0; i < workernum; i++) { int finali = i; workthreads[i] = new fiber<>(new suspendablerunnable() {                @override                public void run() throws suspendexecution, interruptedexception {                    suspendablerunnable runnable = null;                    while (true){                        try{                            //取任务,没有则阻塞。                            runnable = taskqueue.take();                        }catch (exception e){                            system.out.println(e.getmessage());                        }                        //存在任务则运行。                        if(runnable != null){                            runnable.run();                        }                        runnable = null;                    }                }            });  //new一个工做协程            workthreads[i].start();  //启动工做协程        }        runtime.getruntime().availableprocessors();    }    //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定    public void execute(suspendablerunnable task) {        try {            taskqueue.put(task);   //put:阻塞接口的插入        } catch (exception e) {            // todo: handle exception            system.out.println(阻塞);        }    }    //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁    public void destory() {        //工做协程中止工做,且置为null        system.out.println(ready close thread...);        for (int i = 0; i < workernum; i++) {            workthreads[i] = null; //help gc        }        taskqueue.clear();  //清空等待队列    }    //覆盖tostring方法,返回协程信息:工做协程个数和已完成任务个数    @override    public string tostring() {        return workthread number: + workernum +  ==分割线== wait task number: + taskqueue.size();    }}
测试代码:
package com.example.ai;import co.paralleluniverse.strands.suspendablerunnable;import lombok.sneakythrows;import org.springframework.boot.autoconfigure.springbootapplication;import java.util.concurrent.countdownlatch;@springbootapplicationpublic class aiapplication {    @sneakythrows    public static void main(string[] args) {        //等待协程任务完毕后再结束主线程        countdownlatch cdl = new countdownlatch(50);        //开启5个协程,50个任务列队。        worktools mythreadpool = new worktools(5, 50);        for (int i = 0; i< 50; i++){            int finali = i;            mythreadpool.execute(new suspendablerunnable() {                @override                public void run() {                    system.out.println(finali);                    try {                        //延迟1秒                        thread.sleep(1000);                        cdl.countdown();                    } catch (interruptedexception e) {                        system.out.println(阻塞中);                    }                }            });        }        //阻塞        cdl.await();    }}
具体代码都有注释了,自行了解。我也是以线程池写法实现。
当前为解决问题:在协程阻塞过程中fiber类会报阻塞警告,满脸懵逼啊,看着很讨厌。暂时没有办法处理,看各位大神谁有招下方评论提供给下思路。万分感谢~
推荐学习:《java视频教程》
以上就是实例介绍java基于quasar实现协程池的详细内容。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product