BlockingQueue队列处理高并发下的日志
前言
当系统流量负载比较高时,业务日志的写入操作也要纳入系统性能考量之内,如若处理不当,将影响系统的正常业务操作,之前写过一篇《spring boot通过MQ消费log4j2的日志》的博文,采用了RabbitMQ消息中间件来存储抗高并发下的日志,因为引入了中间件,操作使用起来可能没那么简便,今天分享使用多线程消费阻塞队列的方式来处理我们的海量日志
what阻塞队列?
阻塞队列(BlockingQueue)是区别于普通队列多了两个附加操作的线程安全的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
1.声明存储固定消息的队列
/** * Created by kl on 2017/3/20. * Content :销售操作日志队列 */ public class SalesLogQueue{ //队列大小 public static final int QUEUE_MAX_SIZE = 1000; private static SalesLogQueue alarmMessageQueue = new SalesLogQueue(); //阻塞队列 private BlockingQueueblockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); private SalesLogQueue(){} public static SalesLogQueue getInstance() { return alarmMessageQueue; } /** * 消息入队 * @param salesLog * @return */ public boolean push(SalesLog salesLog) { return this.blockingQueue.add(salesLog);//队列满了就抛出异常,不阻塞 } /** * 消息出队 * @return */ public SalesLog poll() { SalesLog result = null; try { result = this.blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return result; } /** * 获取队列大小 * @return */ public int size() { return this.blockingQueue.size(); } }
ps:因为业务原因,采用add的方式入队,队列满了就抛异常,不阻塞
2.消息入队
消息入队可以在任何需要保存日志的地方操作,如aop统一拦截日志处理,filter过滤请求日志处理,或者耦合的业务日志,记住,不阻塞入队操作,不然将影响正常的业务操作,如下为filter统一处理请求日志:
/** * Created by kl on 2017/3/20. * Content :访问请求拦截,保存操作日志 */ public class SalesLogFilter implements Filter { private RoleResourceService resourceService; @Override public void init(FilterConfig filterConfig) throws ServletException { ServletContext context = filterConfig.getServletContext(); ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context); resourceService = ctx.getBean(RoleResourceService.class); } @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { try { HttpServletRequest request = (HttpServletRequest) servletRequest; String requestUrl = request.getRequestURI(); String requestType=request.getMethod(); String ipAddress = HttpClientUtil.getIpAddr(request); Map resource=resourceService.getResource(); String context=resource.get(requestUrl); //动态url正则匹配 if(StringUtil.isNull(context)){ for(Map.Entry entry:resource.entrySet()){ String resourceUrl= entry.getKey(); if(requestUrl.matches(resourceUrl)){ context=entry.getValue(); break; } } } SalesLog log=new SalesLog(); log.setCreateDate(new Timestamp(System.currentTimeMillis())); log.setContext(context); log.setOperateUser(UserTokenUtil.currentUser.get().get("realname")); log.setRequestIp(ipAddress); log.setRequestUrl(requestUrl); log.setRequestType(requestType); SalesLogQueue.getInstance().push(log); }catch (Exception e){ e.printStackTrace(); } filterChain.doFilter(servletRequest, servletResponse); } @Override public void destroy() { } }
3.消息出队被消费
BlockingQueue是线程安全的,所以可以放心的在多个线程中去处理队列中的消息,如下代码声明了一个两个大小的固定线程池,并添加了两个线程去处理队列中的消息
/** * Created by kl on 2017/3/20. * Content :启动消费操作日志队列的线程 */ @Component public class ConsumeSalesLogQueue { @Autowired SalesLogService salesLogService; @PostConstruct public void startrtThread() { ExecutorService e = Executors.newFixedThreadPool(2);//两个大小的固定线程池 e.submit(new PollSalesLog(salesLogService)); e.submit(new PollSalesLog(salesLogService)); } class PollSalesLog implements Runnable { SalesLogService salesLogService; public PollSalesLog(SalesLogService salesLogService) { this.salesLogService = salesLogService; } @Override public void run() { while (true) { try { SalesLog salesLog = SalesLogQueue.getInstance().poll(); if(salesLog!=null){ salesLogService.saveSalesLog(salesLog); } } catch (Exception e) { e.printStackTrace(); } } } } }
参考博文如下,对BlockingQueue队列更多了解,可读一读如下的博文:
详细分析Java并发集合ArrayBlockingQueue的用法