博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java-CompletionService
阅读量:6281 次
发布时间:2019-06-22

本文共 4578 字,大约阅读时间需要 15 分钟。

JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务,生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

任务:

现在要向服务器发送HTTP请求,服务端对于每个请求都需要做很多额外操作,很消耗时间,则可以将每个请求接受之后,提交到CompletionService异步处理,等执行完毕之后,在返回给客户端。

import java.util.concurrent.Callable;  import java.util.concurrent.CompletionService;  import java.util.concurrent.ExecutorCompletionService;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;  import java.util.concurrent.Future;  public class CompletionServiceTest {
private ExecutorService threadPool = Executors.newCachedThreadPool(); private CompletionService
completionService = new ExecutorCompletionService
( Executors.newCachedThreadPool()); public CompletionServiceTest() { new Thread() { public void run() { while (true) { try { Future
f = completionService.take(); /** * 获取响应信息,返回给客户端 * 如果completionService任务队列为空,此处将阻塞 */ Response resp = f.get(); System.out.println(resp.getId()); } catch (Exception e) { System.out.println("Exception happened:"+e.getMessage()); } } }; }.start(); } class Request{ private int rid; private String body; public int getRid() { return rid; } public void setRid(int rid) { this.rid = rid; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } class Response { private int id; private String body; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } class HTTPExecutor { public Future
execute(final Request request) { Future
f = threadPool.submit(new Callable
() { public Response call() throws Exception { Response response = new Response(); Thread.currentThread().sleep(3000); response.setId(request.getRid()); response.setBody("response"); return response; } }); return f; } } public void submitHTTP(final Request request) { completionService.submit(new Callable
() { public Response call() throws Exception { return new HTTPExecutor().execute(request).get(); } }); } public static void main(String[] args) { CompletionServiceTest t = new CompletionServiceTest(); for (int i = 0; i < 10; i++) { /** * 发送10个HTTP请求 */ Request request =t.new Request(); request.setRid(i); request.setBody("request"); t.submitHTTP(request); } } }

ExecutorCompletionService源码

public ExecutorCompletionService(Executor executor) {          if (executor == null)              throw new NullPointerException();          this.executor = executor;          this.aes = (executor instanceof AbstractExecutorService) ?              (AbstractExecutorService) executor : null;          this.completionQueue = new LinkedBlockingQueue
>(); } public ExecutorCompletionService(Executor executor, BlockingQueue
> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } public Future
submit(Callable
task) { if (task == null) throw new NullPointerException(); RunnableFuture
f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }

通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际执行任务,内部管理了一个阻塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步执行该RunnableFuture,当其状态变为done后,添加CompletionService的阻塞队列中,外部通过调用take()(阻塞)或者poll()(非阻塞,为空返回null)方法获取执行结果。

转载地址:http://pysva.baihongyu.com/

你可能感兴趣的文章
IOE,为什么去IOE?
查看>>
java 用反射简单应用,将Object简单转换成map
查看>>
Storm中的Worker
查看>>
dangdang.ddframe.job中页面修改表达式后进行检查
查看>>
Web基础架构:负载均衡和LVS
查看>>
Linux下c/c++相对路径动态库的生成与使用
查看>>
SHELL实现跳板机,只允许用户执行少量允许的命令
查看>>
SpringBoot 整合Redis
查看>>
2014上半年大片早知道
查看>>
Android 6.0指纹识别App开发案例
查看>>
正文提取算法
查看>>
轻松学PHP
查看>>
Linux中的网络监控命令
查看>>
this的用法
查看>>
windows下安装redis
查看>>
CentOS7 yum 安装git
查看>>
启动日志中频繁出现以下信息
查看>>
httpd – 对Apache的DFOREGROUND感到困惑
查看>>
分布式锁的一点理解
查看>>
idea的maven项目,install下载重复下载本地库中已有的jar包,而且下载后jar包都是lastupdated问题...
查看>>