首页后端开发其他后端知识在Java中实现协程的思路和方法是什么

在Java中实现协程的思路和方法是什么

时间2024-03-25 21:26:03发布访客分类其他后端知识浏览953
导读:在这篇文章中我们会学习到关于“在Java中实现协程的思路和方法是什么”的知识,小编觉得挺不错的,现在分享给大家,也给大家做个参考,希望对大家学习或工作能有帮助。下面就请大家跟着小编的思路一起来学习一下吧。 业务场景:gola...
在这篇文章中我们会学习到关于“在Java中实现协程的思路和方法是什么”的知识,小编觉得挺不错的,现在分享给大家,也给大家做个参考,希望对大家学习或工作能有帮助。下面就请大家跟着小编的思路一起来学习一下吧。


业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)

一个线程可以多个协程,一个进程也可以单独拥有多个协程。

线程进程都是同步机制,而协程则是异步。

协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。

线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。

协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程.。

线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。

废话不多说,直接上代码:

导入包:

        dependency>
    
            groupId>
    co.paralleluniverse/groupId>
    
            artifactId>
    quasar-core/artifactId>
    
            version>
    0.7.9/version>
    
            classifier>
    jdk8/classifier>
    
        /dependency>
    
WorkTools工具类:
package com.example.ai;
    

import co.paralleluniverse.fibers.Fiber;
    
import co.paralleluniverse.fibers.SuspendExecution;
    
import co.paralleluniverse.strands.SuspendableRunnable;
    

import java.util.concurrent.ArrayBlockingQueue;



public class WorkTools {
    
    //协程池中默认协程的个数为5
    private static int WORK_NUM = 5;
    
    //队列默认任务为100
    private static int TASK_COUNT = 100;
    

    //工做协程数组
    private Fiber[] workThreads;
    
    //等待队列
    private final ArrayBlockingQueueSuspendableRunnable>
     taskQueue;
    

    //用户在构造这个协程池时,但愿启动的协程数
    private final int workerNum;



    //构造方法:建立具备默认协程个数的协程池
    public WorkTools() {
    
        this(WORK_NUM,TASK_COUNT);

    }


    //建立协程池,workNum为协程池中工做协程的个数
    public WorkTools(int workerNum, int taskCount) {

        if (workerNum = 0) {
    
            workerNum = WORK_NUM;

        }

        if (taskCount = 0) {
    
            taskCount = TASK_COUNT;

        }
    
        this.workerNum = workerNum;
    
        taskQueue = new ArrayBlockingQueue(taskCount);
    
        workThreads = new Fiber[workerNum];
    
        for (int i = 0;
     i  workerNum;
 i++) {
    
            int finalI = i;
    
            workThreads[i] = new Fiber>
(new SuspendableRunnable() {

                @Override
                public void run() throws SuspendExecution, InterruptedException {
    
                    SuspendableRunnable runnable = null;

                    while (true){

                        try{
    
                            //取任务,没有则阻塞。
                            runnable = taskQueue.take();

                        }
catch (Exception e){
    
                            System.out.println(e.getMessage());

                        }

                        //存在任务则运行。
                        if(runnable != null){
    
                            runnable.run();

                        }
    

                        runnable = null;

                    }

                }

            }
    );
      //new一个工做协程

            workThreads[i].start();
  //启动工做协程

        }
    

        Runtime.getRuntime().availableProcessors();

    }

    //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
    public void execute(SuspendableRunnable task) {

        try {
    
            taskQueue.put(task);
   //put:阻塞接口的插入
        }
 catch (Exception e) {
    
            // TODO: handle exception
            System.out.println("阻塞");

        }

    }

    //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
    public void destory() {
    
        //工做协程中止工做,且置为null
        System.out.println("ready close thread...");
    
        for (int i = 0;
     i  workerNum;
 i++) {
    

            workThreads[i] = null;
 //help gc
        }
    
        taskQueue.clear();
  //清空等待队列
    }

    //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
    @Override
    public String toString() {
    
        return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();

    }

}
    

测试代码:

package com.example.ai;
    

import co.paralleluniverse.strands.SuspendableRunnable;
    
import lombok.SneakyThrows;
    

import org.springframework.boot.autoconfigure.SpringBootApplication;
    

import java.util.concurrent.CountDownLatch;




@SpringBootApplication


public class AiApplication {


    @SneakyThrows
    public static void main(String[] args) {
    
        //等待协程任务完毕后再结束主线程
        CountDownLatch cdl = new CountDownLatch(50);
    
        //开启5个协程,50个任务列队。
        WorkTools myThreadPool = new WorkTools(5, 50);
    
        for (int i = 0;
     i 50;
 i++){
    
            int finalI = i;

            myThreadPool.execute(new SuspendableRunnable() {

                @Override
                public void run() {
    
                    System.out.println(finalI);

                    try {
    
                        //延迟1秒
                        Thread.sleep(1000);
    
                        cdl.countDown();

                    }
 catch (InterruptedException e) {
    
                        System.out.println("阻塞中");

                    }

                }

            }
    );


        }
    
        //阻塞
        cdl.await();

    }


}
    

具体代码都有注释了,自行了解。我也是以线程池写法实现。

当前为解决问题:在协程阻塞过程中Fiber类会报阻塞警告,满脸懵逼啊,看着很讨厌。暂时没有办法处理,看各位大神谁有招下方评论提供给下思路。万分感谢~


通过以上内容的阐述,相信大家对“在Java中实现协程的思路和方法是什么”已经有了进一步的了解,更多相关的问题,欢迎关注网络或到官网咨询客服。

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: 在Java中实现协程的思路和方法是什么
本文地址: https://pptw.com/jishu/653048.html
详解Java中如何实现经典的排序算法 Java栈和队列的定义、应用、实现和操作是什么

游客 回复需填写必要信息