首页后端开发JAVA聊聊如何自定义parallelStream的线程池

聊聊如何自定义parallelStream的线程池

时间2023-10-21 14:43:03发布访客分类JAVA浏览873
导读:序本文主要研究一下parallelStream怎么使用自定义的线程池ForkJoinPooljava/util/concurrent/ForkJoinPool.javapublic class ForkJoinPool extends Ab...

本文主要研究一下parallelStream怎么使用自定义的线程池

ForkJoinPool

java/util/concurrent/ForkJoinPool.java

public class ForkJoinPool extends AbstractExecutorService {


    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
    
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
    
        checkPermission();

    }


    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
    
        this.workerNamePrefix = workerNamePrefix;
    
        this.factory = factory;
    
        this.ueh = handler;
    
        this.config = (parallelism &
     SMASK) | mode;
    
        long np = (long)(-parallelism);
     // offset ctl counts
        this.ctl = ((np  AC_SHIFT) &
     AC_MASK) | ((np  TC_SHIFT) &
     TC_MASK);

    }


    private static ForkJoinPool makeCommonPool() {
    
        int parallelism = -1;
    
        ForkJoinWorkerThreadFactory factory = null;
    
        UncaughtExceptionHandler handler = null;

        try {
      // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
    
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
    
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
    
            if (pp != null)
                parallelism = Integer.parseInt(pp);
    
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
    
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());

        }
 catch (Exception ignore) {

        }

        if (factory == null) {
    
            if (System.getSecurityManager() == null)
                factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
    
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();

        }
    
        if (parallelism  0 &
    &
     // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) = 0)
            parallelism = 1;
    
        if (parallelism >
     MAX_CAP)
            parallelism = MAX_CAP;
    
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");

    }


}
    

parallelStream默认使用的是common的ForkJoinPool,可以通过系统属性来设置parallelism等

ForkJoinPoolFactoryBean

org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java

public class ForkJoinPoolFactoryBean implements FactoryBeanForkJoinPool>
, InitializingBean, DisposableBean {
    

	private boolean commonPool = false;
    

	private int parallelism = Runtime.getRuntime().availableProcessors();
    

	private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
    

	@Nullable
	private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
    

	private boolean asyncMode = false;
    

	private int awaitTerminationSeconds = 0;
    

	@Nullable
	private ForkJoinPool forkJoinPool;


	//......

	@Override
	public void destroy() {

		if (this.forkJoinPool != null) {
    
			// Ignored for the common pool.
			this.forkJoinPool.shutdown();
    

			// Wait for all tasks to terminate - works for the common pool as well.
			if (this.awaitTerminationSeconds >
 0) {

				try {
    
					this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);

				}

				catch (InterruptedException ex) {
    
					Thread.currentThread().interrupt();

				}

			}

		}

	}


}
	

spring3.1提供了ForkJoinPoolFactoryBean,可以用于创建并托管forkJoinPool

示例

配置

@Configuration
public class ForkJoinConfig {


    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
    
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
    
        factoryBean.setCommonPool(false);
    
        // NOTE LIFO_QUEUE FOR working steal from tail of queue
        factoryBean.setAsyncMode(true);
     // NOTE true FIFO_QUEUE, false LIFO_QUEUE
        factoryBean.setParallelism(10);
    
        // factoryBean.setUncaughtExceptionHandler();
    
        factoryBean.setAwaitTerminationSeconds(60);
    
        return factoryBean;

    }

}
    

使用

    @Autowired
    ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;


    public void streamParallel() throws ExecutionException, InterruptedException {
    
        ListTodoTask>
     result = forkJoinPoolFactoryBean.getObject().submit(new CallableListTodoTask>
    >
() {
    
            @Override
            public ListTodoTask>
 call() throws Exception {
    
                return IntStream.rangeClosed(1, 20).parallel().mapToObj(i ->
 {

                    log.info("thread:{
}
    ", Thread.currentThread().getName());
    
                    return new TodoTask(i, "name"+i);

                }
    ).collect(Collectors.toList());

            }

        }
    ).get();
    
        result.stream().forEach(System.out::println);

    }
    

common的workerName前缀为ForkJoinPool.commonPool-worker-

自定义的workerName前缀默认为ForkJoinPool- nextPoolId() -worker-

小结

parallelStream默认使用的是commonPool,是static代码块默认初始化,针对个别场景可以自定义ForkJoinPool,将parallelStream作为一个任务丢进去,这样子就不会影响默认的commonPool。

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: 聊聊如何自定义parallelStream的线程池
本文地址: https://pptw.com/jishu/504563.html
php设计模式终篇:一文读懂:依赖注入、控制反转、IoC容器 GC 的两种判定方法

游客 回复需填写必要信息