Java并发编程

为什么需要并发编程?

计算机不是万能的,因为他有自己的处理速度,当我们开发Web应用或者其他通讯应用,会有很多用户并发(同时)访问时,在服务器端我们就必须做并发处理。

Java支持多线程的,每个请求一个线程来处理。行业经验不丰富的情况下,你可能考虑到是使用同步来保证资源的正常调用。要知道同步是非常消耗计算机资源的,因此这篇文章推荐大家使用Java并发编程包。

java.util.concurrent

在并发编程中很常用的实用工具类。

理解队列机制

就好比我们中午去食堂吃饭一样,我们需要等待前面的兄弟伙饭打好了,才能轮到后边一位。而这个过程我们可以用“先进先出”来表示。但有时候,有的人认识食堂的人,那么他就可能插队打饭。

在concurrent包中也就是实现了这一队列机制。下边是简单的示意图:

队列先进先出示意图

球1是最先进去的,最先出来。而矩形框就是我们的队列池。

调度线程与维护线程

这两个线程由java.util.concurrent包中的类提供

  1. 数据池必须要我们能访问它并且能存放数据到里面。

  2. 在程序启动时候间隔几秒或直接启动两个线程,

    • 调度线程:进行轮循启动 缓存的调度线程(这个负责取得数据池数据构造数据传递线程并传给维护线程池[ThreadPool],由维护线程池自动调用)
    • 维护线程:是将获取的数据进行进一步处理,要么存数据库,要么通过Http请求发送给其他服务。
  3. 异常处理机制,我们不能保证数据传递过程不会报出异常,这里我们就要进行异常处理,出现异常,获取异常数据,重新推进数据池中。

并发编程实现

// 线程池管理

package org.marker.mq;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;




/**
 * 线程池管理
 * @Date 2012-12-29
 * */
public class ThreadPoolManager {

    //线程实例变量
    private static volatile ThreadPoolManager poolManager;

    // 线程池维护线程的最少数量
    private final static int CORE_POOL_SIZE  = 4;

    // 线程池维护线程的最大数量
    private final static int MAX_POOL_SIZE   = 15;

    // 线程池维护线程所允许的空闲时间
    private final static int KEEP_ALIVE_TIME = 0;

    // 线程池所使用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 15;

    // 消息缓冲队列
    private Queue<String> msgQueue = new LinkedList<String>();



    // 访问消息缓存的调度线程
    final Runnable accessBufferThread = new Runnable() {
        public void run() {
            if (hasMoreAcquire()) {//如果有请求内容,则创建一个新的AccessDBThread,并添加到线程池中
                String msg = (String) msgQueue.poll();
                Runnable task = new AccessDBThread(msg);
                threadPool.execute(task); 
            }
        }
    };


    //异常处理消息
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(((AccessDBThread) r).msg + "消息放入队列中重新等待执行");
            msgQueue.offer(((AccessDBThread) r).msg);
        }
    };


    //管理数据库访问的线程池
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);



    //调度线程池(参数含义查看API)
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate( accessBufferThread, 10, 1, TimeUnit.MILLISECONDS);



    //私有构造方法
    private ThreadPoolManager() { }


    /**
     * 创建线程池实例
     * @return ThreadPoolManager
     */
    public static ThreadPoolManager getInstance() {
        if(poolManager == null) poolManager = new ThreadPoolManager();
        return poolManager;
    }


    //判断是否有更多的
    private boolean hasMoreAcquire() {
        return !msgQueue.isEmpty();
    }


    /**
     * 推送消息到池里
     * */
    public boolean put(String msg) {
        if(msgQueue.size() < 10){
            return msgQueue.offer(msg);
        }else{
            return false;
        }
    }

}

// 排队对象

package org.marker.mq;



/**
 * 数据库操作
 * @author marker
 * */
public class AccessDBThread implements Runnable {

    public String msg;

    public AccessDBThread(String msg) {
        this.msg = msg;
    }

    public void run() {
        // 向数据库中添加Msg变量值
        System.out.println("Added the message: " + msg + " into the Database");
    }

}

// 测试代码

package org.marker.mq;

public class TestDriver {

    ThreadPoolManager pool = ThreadPoolManager.getInstance();

    public void sendMsg(String msg) {
        if(!pool.put(msg + "记录一条日志 ")){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.sendMsg(msg);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10000; i++) {
            new TestDriver().sendMsg(Integer.toString(i));
        }

    }

}

在这个并发编程实现中,并没有说理解处理数据库插入,而是交给线程池有时间有优先级的排队处理。

来源: 雨林博客(www.yl-blog.com)