文章目录
- 4.4 线程应用实例
- 4.4.1 等待超时模式
- 4.4.2 一个简单的数据库连接池示例
- 4.4.3 线程池技术及其示例
- 4.4.4 一个基于线程池技术的简单 Web 服务器
 
4.4 线程应用实例
4.4.1 等待超时模式
开发人员经常会遇到这样的方法调用场景:调用一个方法时等待一段时间(一般来说期给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。
前面的章节介绍了等待/通知的经典范式,即加锁、条件循环和处理逻辑3个步骤,而这种范式无法做到超时等待。而超时等待的加入,只需要对经典范式做出非常小的改动,改动内容如下所示。
假设超时时间段是T,那么可以推断出在当前时间now+T之后就会超时。
定义如下变量。
- 等待持续时间:REMAINING=T。
- 超时时间:FUTURE=now+T。
这时仅需要wait(REMAINING)即可,在wait(REMAINING)返回之后会将执行:REMAINING=FUTURE-now。如果REMAINING小于等于0,表示已经超时,直接退出否则将继续执行wait(REMAINING)。
上述描述等待超时模式的伪代码如下。
public synchronized Object get(long mills) throws InerruptedException {
	long future = System.currentTimeMillis() + mills;
	long remaining = mills;
	
	// 当超时大于0并且result返回值不满足要求
	while((result == null) && remaining > 0) {
		wait(remaining);
		remaining = future - System.currentTimeMillis();
	}
	
	return result;
}
可以看出,等待超时模式就是在等待/通知范式基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过长,也不会“永久”阻塞调用者,而是会按照调用者的要求“按时”返回。
4.4.2 一个简单的数据库连接池示例
我们使用等待超时模式来构造一个简单的数据库连接池,在示例中模拟从连接池中获取、使用和释放连接的过程,而客户端获取连接的过程被设定为等待超时的模式,也就是在1000毫秒内如果无法获取到可用连接,将会返回给客户端一个null。设定连接池的大小为10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。
首先看一下连接池的定义。它通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用方需要先调用fetchConnection(long) 方法来指定在多少毫秒内超时来获取连接,当连接使用完成后,需要调用releaseConnection(Connection) 方法将连接放回线程池,示例代码如下:
import java.sql.Connection;
import java.util.LinkedList;
public class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<>();
    public ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // 连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }
    // 在mills内无法获取到连接,将会返回nu11
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            // 完成超时
            if (mills <= 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return  result;
            }
        }
    }
}
由于java.sql.Connection是一个接口,最终的实现是由数据库驱动提供方来实现的,考虑到只是个示例,我们通过动态代理构造了一个Connection,该Connection 的代理实现仅仅是在commit()方法调用时休眠100毫秒,示例代码如下。
public class ConnectionDriver {
    static class ConnectionHandler implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals("commit")) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }
    // 创建一个Connection的代理,在commit时休眠100毫秒
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
                new Class<?>[] {Connection.class}, new ConnectionHandler());
    }
}
下面通过一个示例来测试简易数据库连接池的工作情况,模拟客户端ConnectionRunner获取、使用、最后释放连接的过程,当它使用时连接将会增加获取到连接的数量,反之,将会增加未获取到连接的数量,示例代码如下。
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class ConnectionPoolTest {
    static ConnectionPool pool = new ConnectionPool(10);
    // 保证所有ConnectionRunner能够同时开始
    static CountDownLatch start = new CountDownLatch(1);
    // main线程将会等待所有ConnectionRunnerJ结束后才能继续执行
    static CountDownLatch end;
    public static void main(String[] args) throws Exception {
        // 线程数量,可以修改线程数量进行观察
        int threadCount = 10;
        end = new CountDownLatch(threadCount);
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection: " + got);
        System.out.println("not got connection " + notGot);
    }
    static class ConnectionRunner implements Runnable {
        int count;
        AtomicInteger got;
        AtomicInteger notGot;
        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }
        @Override
        public void run() {
            try {
                start.await();
            } catch (Exception exception) {
            }
            while (count > 0) {
                try {
                    // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                    // 分别统计连接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception exception) {
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}
上述示例中使用了CountDownLatch来确保ConnectionRunnerThread 能够同时开始执行并且在全部结束之后,才使main线程从等待状态中返回。当前设定的场景是10个线程同时运行获取连接池(10个连接)中的连接,通过调节线程数量来观察未获取到连接的情况。线程数、总获取次数、获取到的数量、未获取到的数量以及未获取到的比率,如下表所示(本书作者机器CPU:i7-3635QM,内存为8GB,实际输出可能与此表不同)。
线程数量与连接获取的关系:
| 线程数量 | 总获取次数 | 获取到次数 | 未获取到次数 | 未获取到比率 | 
|---|---|---|---|---|
| 10 | 200 | 200 | 0 | 0% | 
| 20 | 400 | 387 | 13 | 3.25% | 
| 30 | 600 | 542 | 58 | 9.67% | 
| 40 | 800 | 700 | 100 | 12.5% | 
| 50 | 1000 | 828 | 172 | 17.2% | 
从表中的数据统计可以看出,在资源一定的情况下(连接池中的10个连接),随着客户端线程的逐步增加,客户端出现超时无法获取连接的比率不断升高。虽然客户端线程在这种超时获取的模式下会出现连接无法获取的情况,但是它能够保证客户端线程不会一直挂在连接获取的操作上,而是“按时”返回,并告知客户端连接获取出现问题,是系统的一种自我保护机制。数据库连接池的设计也可以复用到其他的资源获取的场景,针对昂贵资源(比如数据库连接)的获取都应该加以超时限制。
4.4.3 线程池技术及其示例
对于服务端的程序,经常面对的是客户端传入的短小(执行时间短、工作内容较为单一)任务,需要服务端快速处理并返回结果。如果服务端每次接受到一个任务,创建一个线程然后进行执行,这在原型阶段是个不错的选择,但是面对成千上万的任务递交进服务器时如果还是采用一个任务一个线程的方式,那么将会创建数以万记的线程,这不是一个好的选择。因为这会使操作系统频繁的进行线程上下文切换,无故增加系统的负载,而线程的创建和消亡都是需要耗费系统资源的,也无疑浪费了系统资源。
线程池技术能够很好地解决这个问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓的劣化。
下面先看一下一个简单的线程池接口定义,示例代码如下。
public interface ThreadPool<Job extends Runnable> {
    // 执行一个Job,这个Job需要实现Runnable
    void execute(Job job);
    // 关闭线程池
    void shutdown();
    // 增加工作者线程
    void addWorkers(int num);
    // 减少工作者线程
    void removeWorker(int num);
    // 得到正在等待执行的任务数量
    int getJobSize();
}
客户端可以通过 execute(Job)方法将Job提交入线程池执行,而客户端自身不用等待 Job的执行完成。除了execute(Job)方法以外,线程池接口提供了增大 / 减少工作者线程以及关闭线程池的方法。这里工作者线程代表着一个重复执行Job的线程,而每个由客户端提交的Job都将进人到一个工作队列中等待工作者线程的处理。
接下来是线程池接口的默认实现,示例代码如下。
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    // 线程池默认的数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 这是一个工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    // 工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    // 工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // 线程编号生成
    private AtomicLong threadNum = new AtomicLong();
    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }
    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }
    public void execute(Job job) {
        if (job != null) {
            // 添加一个工作,然后进行通知
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }
    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }
    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            // 限制新增的Worker数量不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            // 按照给定的数量停止Worker
            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }
    public int getJobSize() {
        return jobs.size();
    }
    // 初始化线程工作者
    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }
    // 工作者,负责消费任务
    class Worker implements Runnable {
        // 是否工作
        private volatile boolean running = true;
        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    // 如果工作者列表是空的,那么就wait
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException exception) {
                            // 感知到外部对WorkerThread的中断操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 取出一个Job
                    job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception exception) {
                        // 忽略Job执行中的Exception
                    }
                }
            }
        }
        public void shutdown() {
            running = false;
        }
    }
}
从线程池的实现可以看到,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加Job,而每个工作者线程会不断地从jobs上取出一个Job进行执行,当jobs为空时,工作者线程进入等待状态。
添加一个Job后,对工作队列jobs调用了其notify()方法,而不是notifyAll()方法,因为能够确定有工作者线程被唤醒,这时使用notify()方法将会比notifyAll()方法获得更小的开销(避免将等待队列中的线程全部移动到阻塞队列中)。
可以看到,线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列上取工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。
4.4.4 一个基于线程池技术的简单 Web 服务器
目前的浏览器都支持多线程访问,比如说在请求一个HTML页面的时候,页面中包含的图片资源、样式资源会被浏览器发起并发的获取,这样用户就不会遇到一直等到一个图完全下载完成才能继续查看文字内容的尴尬情况。
如果 web服务器是单线程的,多线程的浏览器也没有用武之地,因为服务端还是一个请求一个请求的顺序处理。因此,大部分Web服务器都是支持并发访问的。常用的Java Web服务器,如Tomcat、Jetty,在其处理请求的过程中都使用到了线程池技术。
下面通过使用前一节中的线程池来构造一个简单的Web服务器,这个Web服务器用来处理HTTP请求,目前只能处理简单的文本和JPG图片内容。这个Web服务器使用main线程不断地接受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处理多个客户端请求,示例代码所示。
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class SimpleHttpServer {
    // 处理HttpRequest的线程池
    static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(1);
    // SimpleHttpServer的根路径
    static String basePath;
    static ServerSocket serverSocket;
    // 服务监听端口
    static int port = 8080;
    public static void setPort(int port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }
    public static void setBasePath(String basePath) {
        if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()) {
            SimpleHttpServer.basePath = basePath;
        }
    }
    // 启动SimpleHttpServer
    public static void start() throws Exception {
        serverSocket = new ServerSocket(port);
        Socket socket = null;
        while ((socket = serverSocket.accept()) != null) {
            // 接收一个客户端Socket,生成一个HttpRequestHandler,放入线程池执行
            threadPool.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }
    static class HttpRequestHandler implements Runnable {
        private Socket socket;
        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }
        @Override
        public void run() {
            String line = null;
            BufferedReader br = null;
            BufferedReader reader = null;
            PrintWriter out = null;
            InputStream in = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                // 由相对路径计算出绝对路径
                String filePath = basePath + header.split(" ")[1];
                out = new PrintWriter(socket.getOutputStream());
                // 如果请求资源的后缀为jpg或者ico,则读取资源并输出
                if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    in = new FileInputStream(filePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int i = 0;
                    while ((i = in.read()) != -1) {
                        baos.write(i);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-Length: " + array.length);
                    out.println("");
                    socket.getOutputStream().write(array, 0, array.length);
                } else {
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
                    out = new PrintWriter(socket.getOutputStream());
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    while ((line = br.readLine()) != null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (Exception e) {
                out.println("HTTP/1.1 500");
                out.println("");
                out.flush();
            } finally {
                close(br,in, reader, out, socket);
            }
        }
    }
    // 关闭流或者Socket
    private static void close(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                try {
                    closeable.close();
                } catch (Exception e) {
                }
            }
        }
    }
}
该 Web服务器处理用户请求的时序图如下。

在上图中,SimpleHttpServer在建立了与客户端的连接之后,并不会处理客户端的请求,而是将其包装成 HttpRequestHandler 并交由线程池处理。在线程池中的 Worker 处理客户端请求的同时,SimpleHttpServer能够继续完成后续客户端连接的建立,不会阻塞后续客户端的请求。
接下来,通过一个测试对比来认识线程池技术带来服务器吞吐量的提高。我们准备了个简单的 HTML 页面,内容如下。
<html>
	<head>
		<title>测试页面</title>
	</head>
	<body align = "center">
		<h1>第一张图片</h1>
		<img src="1.jpg" align="middle"/>
		<h1>第二张图片</h1>
		<img src="2.jpg" align="middle"/>
		<h1>第三张图片</h1>
		<img src="3.jpg" align="middle"/>
	</body>
</html>
将SimpleHttpServer的根目录设定到该HTML页面所在目录,并启动SimpleHttpServer,通过 Apache HTTP server benchmarking tool(版本 2.3)来测试不同线程数下,SimpleHttpServer的吞吐量表现。
测试场景是5000次请求,分10个线程并发执行,测试内容主要考察响应时间(越小越好)和每秒查询的数量(越高越好),测试结果如下表所示(本书作者机器CPU:i7-3635QM,内存为 8GB,实际输出可能与此表不同)。
| 线程池线程数量 | 1 | 5 | 10 | 
|---|---|---|---|
| 响应时间(ms) | 0.352 | 0.246 | 0.163 | 
| 每秒查询的数量 | 3076 | 4065 | 6123 | 
| 测试完成时间(s) | 1.625 | 1.230 | 0.816 | 
可以看到,随着线程池中线程数量的增加,SimpleHttpServer的吞吐量不断增大,响应时间不断变小,线程池的作用非常明显。
但是,线程池中线程数量并不是越多越好,具体的数量需要评估每个任务的处理时间以及当前计算机的处理器能力和数量。使用的线程过少,无法发挥处理器的性能;使用的线程过多,将会增加系统的无故开销,起到相反的作用。



















