分布式多线程性能调优
使用多线程优化接口

 
 
	//下单业务
    public Object order( long userId){
        long start = System.currentTimeMillis();//方法的开始时间戳(ms)
        JSONObject orderInfo = remoteService.createOrder(userId);
        Callable<JSONObject> callable1 = new Callable<JSONObject>() {
            @Override
            public JSONObject call() throws Exception {
                JSONObject goodsInfo = remoteService.dealGoods(orderInfo);
                return goodsInfo;
            }
        };
        Callable<JSONObject> callable2 = new Callable<JSONObject>() {
            @Override
            public JSONObject call() throws Exception {
                JSONObject pointsInfo = remoteService.dealPoints(orderInfo);
                return pointsInfo;
            }
        };
        Callable<JSONObject> callable3 = new Callable<JSONObject>() {
            @Override
            public JSONObject call() throws Exception {
                JSONObject deliverInfo = remoteService.dealDeliver(orderInfo);
                return deliverInfo;
            }
        };
        LeeFutureTask<JSONObject> task1 = new LeeFutureTask(callable1);
        LeeFutureTask<JSONObject> task2 = new LeeFutureTask(callable2);
        LeeFutureTask<JSONObject> task3 = new LeeFutureTask(callable3);
        Thread thread1 =new Thread(task1);
        Thread thread2 =new Thread(task2);
        Thread thread3 =new Thread(task3);
        thread1.start();
        thread2.start();
        thread3.start();
        try {
            orderInfo.putAll(task1.get());
            orderInfo.putAll(task2.get());
            orderInfo.putAll(task3.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        long end = System.currentTimeMillis();//方法的开始时间戳(ms)
        System.out.println(end-start);//打印这个方法的执行时间
        return orderInfo;
    }
 
后台批处理的优化

public JSONObject orderFastbatch (long userId) throws Exception{
        JSONObject orderInfo = remoteService.createOrderFast(userId);
        //JSONObject goodsInfo = remoteService.dealGoodsFast(orderInfo); //这里是单个的请求,想做成批量+异步的方式
        //orderInfo.putAll(goodsInfo);
        CompletableFuture<JSONObject> future = new CompletableFuture<>();
        Request request = new Request();
        request.future =future;
        request.object = orderInfo;
        queue.add(request);
        return future.get(); //这里类似于FutureTask  的get方法,去异步的方式拿结果
    }
    //定义一个JUC中的MQ
    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();
    class Request{
        JSONObject object;
        CompletableFuture<JSONObject> future;
    }
    @PostConstruct
    public void DoBiz(){
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                int size =queue.size();
                if(size ==0) return;//没有数据在queue中,定时任务不需要执行什么
                if(size >1000) size=1000;//这里限制每次批量封装的最多是1000
                List<JSONObject> ListJSONRepuest = new ArrayList<>();
                List<Request>  ListRequest = new ArrayList<>();
                for( int i =0 ;i <size;i++){
                    Request request =queue.poll();//从MQ中拉取
                    ListRequest.add(request);
                    ListJSONRepuest.add(request.object);
                }
                //调用了多少次批量接口
                List<JSONObject> ListJSONReponse  = remoteService.dealGoodsFastBatch(ListJSONRepuest);
                System.out.println("调用批量接口,本地组装的数据:"+size+"条");
                for(JSONObject  JSONReponse:ListJSONReponse){//这里可以使用hashmap 的方式减少一轮遍历。
                    for(Request  request:ListRequest){
                        String  request_OrderId =request.object.get("orderId").toString();
                        String  response_OrderId =JSONReponse.get("orderId").toString();
                       if(request_OrderId.equals(response_OrderId)){
                           request.future.complete(JSONReponse);
                       }
                    }
                }
            }
        }, 3000, 50, TimeUnit.MILLISECONDS);
    }
 
//处理库存信息 (批量接口)  1000,
    public List<JSONObject> dealGoodsFastBatch( List<JSONObject> objectList) {
        List<JSONObject> list = objectList;
        Iterator it = list.iterator();
        while(it.hasNext()){
            JSONObject result =(JSONObject)it.next();
            result.put("dealGoods", "ok");
        }
        return  list;
    }
 
批处理与MySQL的综合性优化
如果做了批量处理的话。
 1W个请求,高并发请求,其实里面针对的修改库存会集中在一些热点数据8000个在一个商品。
 应用层基于批量的接口进行优化。
 伪代码:
 for循环遍历list,把所有相同的goods_id放到hashmap进行扣减计数即可。
分布式锁
单机锁:sync、lock
 MySQL去实现分布式锁–for update (行锁)
Redis
分布式锁的第一种常见方案:Redis来实现分布式锁。Redis key-value键值对的数据库–内存。
Redis的分布式锁的实现逻辑:
 1、加锁,setnx key value
 1)为了避免死锁问题setnx完之后设置TTL失效时间
 2)为了TTL的失效的时候业务还未完成导致的多个应用抢到锁的BUG,这里可以使用一个守护线程,来不断的去续锁(延长key的TTL)
2、解锁del key
 无论是加锁,还是解锁,这里涉及到多个命令。要解决原子性问题:
 1、复合命令实现加锁。set lock 9527 ex 10 nx
 2、解锁的逻辑中:在del之前一定要判断:只有持有锁的应用或线程,才能去解锁成功,否则都是失败。value做文章。存一个唯一标识。
if (get ==应用的保存的值)del
else释放锁失败
 
使用Lua脚本
锁的可重入。A抢到了锁,没有释放锁之前,依然可以lock进入加锁逻辑的。
Zookeeper

 1、连接ZK、创建分布式锁的根节点/lock
 2、一个线程获取锁时,create ()在/lock西面创建1个临时顺序节点3、使用getChildren()获取当前所有字节点,并且对子节点进行排序
 4、判断当前创建的节点是否是所有子节点中最小的节点,如果是,则获取锁->执行对应的业务逻辑->在结束的时候使用delete()方法删除该节点。
 否则需要等待其他的节点的释放锁、
 5、当一个线程需要释放锁的,删除该节点,其他的线程获取当前节点前的一个节点来获取锁。




![[Kubernetes]2. k8s集群中部署基于nodejs golang的项目以及Pod、Deployment详解](https://img-blog.csdnimg.cn/direct/cbbb0d5fdb324f3cb506557929e84537.png)














