推荐学习:《java视频教程》
业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)
一个线程可以多个协程,一个进程也可以单独拥有多个协程。
线程进程都是同步机制,而协程则是异步。
协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。
线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。
协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的cpu资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, ui线程, 或新建新程.。
线程是协程的资源。协程通过interceptor来间接使用线程这个资源。
废话不多说,直接上代码:
导入包:
<dependency> <groupid>co.paralleluniverse</groupid> <artifactid>quasar-core</artifactid> <version>0.7.9</version> <classifier>jdk8</classifier> </dependency>
worktools工具类:
package com.example.ai;import co.paralleluniverse.fibers.fiber;import co.paralleluniverse.fibers.suspendexecution;import co.paralleluniverse.strands.suspendablerunnable;import java.util.concurrent.arrayblockingqueue;public class worktools { //协程池中默认协程的个数为5 private static int work_num = 5; //队列默认任务为100 private static int task_count = 100; //工做协程数组 private fiber[] workthreads; //等待队列 private final arrayblockingqueue<suspendablerunnable> taskqueue; //用户在构造这个协程池时,但愿启动的协程数 private final int workernum; //构造方法:建立具备默认协程个数的协程池 public worktools() { this(work_num,task_count); } //建立协程池,worknum为协程池中工做协程的个数 public worktools(int workernum, int taskcount) { if (workernum <= 0) { workernum = work_num; } if (taskcount <= 0) { taskcount = task_count; } this.workernum = workernum; taskqueue = new arrayblockingqueue(taskcount); workthreads = new fiber[workernum]; for (int i = 0; i < workernum; i++) { int finali = i; workthreads[i] = new fiber<>(new suspendablerunnable() { @override public void run() throws suspendexecution, interruptedexception { suspendablerunnable runnable = null; while (true){ try{ //取任务,没有则阻塞。 runnable = taskqueue.take(); }catch (exception e){ system.out.println(e.getmessage()); } //存在任务则运行。 if(runnable != null){ runnable.run(); } runnable = null; } } }); //new一个工做协程 workthreads[i].start(); //启动工做协程 } runtime.getruntime().availableprocessors(); } //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定 public void execute(suspendablerunnable task) { try { taskqueue.put(task); //put:阻塞接口的插入 } catch (exception e) { // todo: handle exception system.out.println(阻塞); } } //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁 public void destory() { //工做协程中止工做,且置为null system.out.println(ready close thread...); for (int i = 0; i < workernum; i++) { workthreads[i] = null; //help gc } taskqueue.clear(); //清空等待队列 } //覆盖tostring方法,返回协程信息:工做协程个数和已完成任务个数 @override public string tostring() { return workthread number: + workernum + ==分割线== wait task number: + taskqueue.size(); }}
测试代码:
package com.example.ai;import co.paralleluniverse.strands.suspendablerunnable;import lombok.sneakythrows;import org.springframework.boot.autoconfigure.springbootapplication;import java.util.concurrent.countdownlatch;@springbootapplicationpublic class aiapplication { @sneakythrows public static void main(string[] args) { //等待协程任务完毕后再结束主线程 countdownlatch cdl = new countdownlatch(50); //开启5个协程,50个任务列队。 worktools mythreadpool = new worktools(5, 50); for (int i = 0; i< 50; i++){ int finali = i; mythreadpool.execute(new suspendablerunnable() { @override public void run() { system.out.println(finali); try { //延迟1秒 thread.sleep(1000); cdl.countdown(); } catch (interruptedexception e) { system.out.println(阻塞中); } } }); } //阻塞 cdl.await(); }}
具体代码都有注释了,自行了解。我也是以线程池写法实现。
当前为解决问题:在协程阻塞过程中fiber类会报阻塞警告,满脸懵逼啊,看着很讨厌。暂时没有办法处理,看各位大神谁有招下方评论提供给下思路。万分感谢~
推荐学习:《java视频教程》
以上就是实例介绍java基于quasar实现协程池的详细内容。
