2023-07-29
原文作者:说好不能打脸 原文地址:https://yinwj.blog.csdn.net/article/details/51577512

5-4、ESB-Client端的ActiveBrokerContext

本小节开始,我们将按照前文介绍的ESB-Client的核心步骤,一点一点的给出ESB-Client端和ESB-Broker进行交互的核心代码。为了方便在ESB-Client端进行ESB-Broker的交互,我们设计了一个ActiveBrokerContext类。读者可以将这个类理解为“ESB-Broker交互上下文”,在ESB-Client端中所有和ESB-Broker的交互过程都由在这里进行控制。我们首先看一下这个类中的主要定义:

    package esb.client;
    
    ......
    
    /**
     * 这是一个“上下文管理器”,用于为ESB-Client的客户端提供各种和ESB中间件相关的操作<br>
     * 例如,开始人员可以通过这个类获取当前为其服务的Active_Broker调用信息;
     * 还可以通过它主动通知某个Broker已经不可用了。<br>
     * @author yinwenjie
     */
    public class ActiveBrokerContext {
        /**
         * 是否已经完成初始化动作
         */
        private static Boolean IS_STARTED = false;
        /**
         * 是否正在进行初始化动作
         */
        private static Boolean IS_STARTING = false;
        /**
         * ESB-Client上唯一一个ZK客户端对象
         */
        private static CuratorFramework ZK_CLIENT;
        /**
         * uuid是这个ESB-Client的唯一编号。用来识别究竟是哪个ESB-Client变更了远程ESB-Broker的状态的
         */
        private final static String CLIENT_UUID = UUID.randomUUID().toString();
        /**
         * 日志
         */
        private final static Log LOGGER = LogFactory.getLog(ActiveBrokerContext.class);
        /**
         * 启动ESB-Client的上下文
         * @param zkConnectings 实例程序中为:"192.168.61.140:2181"
         * @return
         */
        public Boolean start(String zkConnectings) {......}
        /**
         * 这个方法将从当前“active_Broker” Path Node的所有子节点中
         * 选择一个可用的节点作为为本ESB-Client服务的节点
         * @return 
         */
        public ESBBrokerInfo randomTarget() {......}
        /**
         * 该方法用于向ZK服务端报告当前ESB-Broker出现错误
         */
        public void reportTargetError() {......}
        /**
         * 向外部调用者返回当前ESB-Client的唯一编号
         * @return
         */
        public String getUUID() {......}
        /**
         * 这个私有方法才是主要的选择方法,由start()方法和randomTarget()方法重用
         * @return
         */
        private ESBBrokerInfo doRandomTarget() {......}
    }

以上代码片段中每一个重要的静态变量和公共方法的作用都已经通过相应的注解说明的比较清楚了,唯一要说明的是doRandomTarget这个私有方法。在上一小节中的图例说明中,我们可以看到“选择一个ESB-Broker”这个操作由两个事件原由:一个是“完成业务部分的启动”后进行触发,另一个是由ZK服务端产生的事件触发。不过无论是哪一种事件原由触发的其选择ESB-Broker的过程都是相同的,所以我们试图通过一个名叫doRandomTarget的私有方法重用这一部分代码。

那么包括本小节在内的后续几个小节,我们将一起来填充这个类中的各个方法的代码片段。在笔者博客的下载区中也会提供这个类完整代码的下载, 以方便各位读者进行阅读(http://download.csdn.net/detail/yinwenjie/9600996)。

5-5、ESB-Client选择ESB-Broker

选择的核心过程是比较简单的,既是通过一个随机数并基于当前从zk服务端查询到的所有服务器的数量进行区域操作即可。取余的结果将是这个ESB-Client所选择的ESB-Broker在整个ESB-Brokers集合中的索引位置的依据,如下图所示:

202307292201317821.png

作为ESB中间件的设计和开发人员,您当然可以采用其它算法过程来计算为这个ESB-Client所服务的ESB-Broker。例如,您可以以ESB-Broker已经关联的ESB-Client的数量为权重依据,计算新的ESB-Client所关联的ESB-Broker。在这种加权计算方式中,当前较少关联ESB-Client的ESB-Broker将被有限分配给新的ESB-Client进行服务。但是真的计算方式也对已有的数据组织结构和算法过程提出了新的要求:

  • 您要知道当前每个ESB-Broker下已经关联了哪些ESB-Client,就至少需要在zookeeper服务端的每一个ESB-Broker Path Node下记录这些ESB-Client信息,那么ESB-Broker Path Node的特性就不能是EPHEMERAL(临时节点)类型的,因为 zookeeper规定临时节点Path Node下不能创建子节点 。如果您非要这么做,那么已设计好的ESB-Broker节点的变化事件通知机制就要连带做相应的调整。
  • 另外由于计算方式所依据的权中是当前ESB-Broker所关联的ESB-Client数量,那么 为了保证这个权值依据是准确的,就需要当前同时启动的多个ESB-Client在选择“为自己服务的ESB-Broker节点”这一步骤时一个一个的进行顺序启动,否则权值数据就会不准确 。这相较于“取余”的计算方式下不要求ESB-Client一个一个进行顺序启动而言,前者需要编写更多的控制代码:您需要利用zookeeper提供的EPHEMERAL_SEQUENTIAL特性来控制当前正在启动/发生状态变更的ESB-Client进行顺序启动。

这里我们给出“取余”计算的代码片段,请注意这个代码片段正式上一小节ActiveBrokerContext这个类定义中提到的doRandomTarget()私有方法的代码实现,这个方法将由start()方法和randomTarget()方法重用:

    ......
    /**
     * 这个私有方法才是主要的选择方法,由start()方法和randomTarget()方法重用
     * @return
     */
    private ESBBrokerInfo doRandomTarget() {
        /*
         * 处理过程在文章的文字描述部分已经进行了说明:
         * 1、首先查询当前zookeeper服务端,“active_Broker” Path Node的所有子节点信息
         * 这些信息就是所有的ESB-Broker节点信息, 并且为了处理方便,将它们转换为对象
         * 2、生成一个随机数,开始进行取余操作
         * 3、如果取到的broker的状态是正常的,则处理正常结束
         * 4、如果取到的broker状态时错误的,则试图使用Broker通用的测试地址进行一次连接
         *      4.1、如果连接成功,则更改zk服务端的broker状态为“正常”后,
         *      返回当前的broker并结束整个过程。
         *      4.2、如果连接失败,则将取余基数-1后,再进行第3步操作。
         *      直到取余基数  <= 0错误结束,或者选择到一个状态正确的broker节点
         * */
    
        // 1、========================
        // 按照之前的设计要求,每一个ESB-Broker的Path Node由一个固定的Broker单词 + IP信息构成
        // 而Path Node的Data区域存储了一个布尔型,表示该ESB-Broker的状态是否正常
        List<String> esbBrokePaths = null;
        try {
            esbBrokePaths = ZK_CLIENT.getChildren().forPath("/active_Brokers");
        } catch (Exception e) {
            LOGGER.error(e.getMessage() , e);
            return null;
        }
        if(esbBrokePaths == null || esbBrokePaths.isEmpty()) {
            LOGGER.warn("没有发现任何ESB-Brokes信息,启动过程终止");
            return null;
        }
    
        // 组装成对象,这样方便后续操作
        List<ESBBrokerInfo> esbBrokerInfos = new ArrayList<ESBBrokerInfo>();
        for (String esbBrokePath : esbBrokePaths) {
            String ip = esbBrokePath.split("_")[1];
            byte[] dataBytes = null;
            try {
                dataBytes = ZK_CLIENT.getData().forPath("/active_Brokers/" + esbBrokePath);
            } catch (Exception e) {
                LOGGER.error(e.getMessage() , e);
                return null;
            } 
    
            // 获取path node的data。注意,data有两种存储可能
            // 一种是包括了ESB-Client的UUID,一种是没有包括的
            String dataContext = new String(dataBytes);
            Boolean enable = null;
            if(dataContext.indexOf(",") != -1) {
                enable = Boolean.parseBoolean(new String(dataContext.split("\\,")[0]));
            } else {
                enable = Boolean.parseBoolean(new String(dataContext));
            }
    
            // 生成对象并赋值
            ESBBrokerInfo brokerInfo = new ESBBrokerInfo();
            brokerInfo.setEnable(enable);
            brokerInfo.setIp(ip);
            esbBrokerInfos.add(brokerInfo);
        }
    
        // 执行到这里,我们就有了当前所有的acvite_Brokers信息
        // 现在可以开始进行随机选择了
        int modBase = esbBrokerInfos.size();
        int random = new Random().nextInt(Integer.MAX_VALUE);
        int targetBrokerIndex = -1;
        // 一直寻找,要么找到可用的broker,要么在寻找esbBrokerInfos.size()次数后失败退出
        for(;;) {
            // 这种情况很特殊,可能的情况是所有的ESB-Broker节点状态都不正常
            if(modBase <= 0) {
                LOGGER.warn("并没有发现任何可用的ESB-Broker节点!启动过程终止");
                return null;
            }
    
            // 是否取得了正确的ESBBroker信息
            targetBrokerIndex = random % modBase;
            ESBBrokerInfo brokerInfo = esbBrokerInfos.get(targetBrokerIndex);
            // 如果条件成立,说明取得了状态正确的broker信息
            // 正常的情况下,运行到这里就结束了
            if(brokerInfo.getEnable()) {
                LOGGER.info("已选择到新的ESB-Broker节点:" + brokerInfo.getIp() + " !选择过程结束");
                return brokerInfo;
            }
    
            // 进行测试地址的调用。
            boolean isNormalBroker = false;
            String ip = brokerInfo.getIp();
            URLConnection connection = null;
            try {
                // 注意这个地址,这个地址可以是ESB-Broker启动时
                // Camel默认加载的一个http路由开始地址。
                URL testUrl = new URL("http://" + ip + ":8880/testBrokerStatus");
                connection = testUrl.openConnection();
                HttpURLConnection httpURLConnection = (HttpURLConnection)connection;
                if(httpURLConnection.getResponseCode() == 200) {
                    isNormalBroker = true;
                } 
            } catch(Exception e) {
                LOGGER.error(e.getMessage() , e);
                return null;
            }
    
            //如果条件成立,说明调用测试没有成功。
            //这时基准余数会 - 1(即余数 + 1)
            if(!isNormalBroker) {
                modBase--;
                continue;
            }
    
            // 如果执行到这里,说明测试成功。这时要更改该Path Node节点的远程状态
            LOGGER.info("已选择到新的节点:" + ip + " 并恢复其状态!选择过程结束");
            try {
                String data = "true," + CLIENT_UUID;
                ZK_CLIENT.setData().forPath("/active_Brokers/Broker_" + ip , data.getBytes());
            } catch (Exception e) { 
                LOGGER.error(e.getMessage() , e);
            } 
            return brokerInfo;
        } 
    }
    ......

这里要说明一个重要问题,即是如果选择到了一个状态为false的ESB-Broker后doRandomTarget的处理过程。当出现这种情况后,ESB-Client将会主动连接到这个ESB-Broker上一个默认提供的HTTP请求路径(这个路径是在ESB-Broker启动时默认加载到CamelContext中的),已确定当前这个ESB-Broker节点真的无法工作。如果这个HTTP请求路径连接失败了那倒是好说,ESB-Client会将取余的基数减一后重新进行ESB-Broker的选择。

但是如果ESB-Client成功请求到这个HTTP路径并且成功返回了响应,那么说明这个ESB-Broker已经回复工作了。这时ESB-Client会主动更改远程zookeeper服务上当前ESB-Broker节点描述的状态:从false变更为true,并且向调用者返回当前的ESB-Broker节点信息作为选择出来的服务节点。那么,问题来了:当远程zookeeper服务上当前ESB-Broker节点的状态发生变更后,所有的ESB-Client节点都会受到这个事件——包括这个ESB-Client本身, 这样就造成了这个触发事件的ESB-Client本身又要在事件触发时进行一次ESB-Broker节点的选择操作 ,而这次重复的选择操作对于这个触发事件的ESB-Client来说是没有实际意义的。

为了避免这种一个事件中重复进行选择操作的情况发生,我们在每个独立工作的ESB-Client启动时就为其创建一个唯一的UUID信息。当ESB-Client主动对ESB-Broker状态进行更新时,除了会改变这个ESB-Broker节点的状态值本身,还会将ESB-Client的UUID一同写到远程Path Node上:

    ......
    try {
        String data = "true," + CLIENT_UUID;
        ZK_CLIENT.setData().forPath("/active_Brokers/Broker_" + ip , data.getBytes());
    } catch (Exception e) { 
        LOGGER.error(e.getMessage() , e);
    } 
    ......

这样当ESB-Client收到Path Node事件时,就可以在将要发生不必要重复选择操作发生前,根据UUID判断出是否是自己触发的这次事件。以下代码是ActiveBrokersListener中的相关代码片段,后续的讲解中我们还将详细介绍这个类:

    ......
    if(event.getType() == Type.CHILD_UPDATED) {
        ......
        // 2.1、如果条件成立,说明这个事件是本ESB-Client出发的。并且无需重新为本ESB-Client选择新的Broker
        if(StringUtils.equals(activeBrokerContext.getUUID(), uuidValue) && isBrokerNormal) {
            return;
        } 
        ......
    }
    ......

5-6、ESB-Client启动和重新选择

在按照之前处理流程图设计所编写的代码中,有两种情况会触发doRandomTarget()这个私有方法:一种情况是ESB-Client第一次启动时,这里所说的“启动”是指ESB-Client所代表的业务系统已经完成启动时业务相关的启动过程后进行的“确定ESB-Broker”的过程。另一种情况是由于一些原因, 需要ESB-Client在工作不停止的前提下清空之前的选择结果并重新选择新的ESB-Broker 。这里说到的“一些原因”目前在ESB-Client的设计中的表现就是:ESB-Client收到zookeeper服务端的事件,通知其ESB-Brokers集群状态已经发生了变化。

5-6-1、ESB-Client第一次启动

    ......
    @SuppressWarnings("resource")
    public Boolean start(String zkConnectings) {
        /*
         * 读者是否想到使用单例的方式来实现ActiveBrokerContext?
         * 但是单例是无法避免开发者出现调用两次start()方法的。
         * 所以这里使用悲观锁 + 状态记录的方式进行限制
         * */
        synchronized (ActiveBrokerContext.class) {
            try {
                while(IS_STARTING) {
                    LOGGER.warn("发现有其它线程在同时进行ActiveBrokerContext的启动...本次操作开始等待...");
                    ActiveBrokerContext.class.wait();
                }
                // 如果条件成立,说明已经正常执行过start这个方法了。不需要多次执行
                if(IS_STARTED) {
                    LOGGER.warn("发现start()方法已经正常执行过,无须重复执行。忽略本次执行");
                    return null;
                } 
    
                IS_STARTING = true;
            } catch(InterruptedException e) {
                LOGGER.error(e.getMessage() , e);
                IS_STARTING = false;
                ActiveBrokerContext.class.notifyAll();
                return null;
            }
        }
    
        // 启动ZK客户端
        ZK_CLIENT = CuratorFrameworkFactory.newClient(zkConnectings, 30000, 30000, new RetryNTimes(50, 3000));
        ZK_CLIENT.start();
    
        // 开始进行选择
        ESBBrokerInfo selectedBroker = doRandomTarget();
    
        // 添加对active_Brokers path node下子节点的监听操作
        PathChildrenCache watcher = null;
        try {
            watcher = new PathChildrenCache(ZK_CLIENT, "/active_Brokers", true);
            watcher.getListenable().addListener(new ActiveBrokersListener());
            watcher.start();
        } catch(Exception e) {
            LOGGER.error(e.getMessage() , e);
            // 试图关闭监听者
            if(watcher != null) {
                try {
                    watcher.close();
                } catch (IOException ioe) {
                    LOGGER.error(ioe.getMessage() , ioe);
                }
            }
            IS_STARTING = false;
            ActiveBrokerContext.class.notifyAll();
            return null;
        }
    
        // 执行到这里,说明整个start过程完成
        if(selectedBroker != null) {
            IS_STARTED = true;  
        }
        IS_STARTING = false;
    
        // 通知其他正在等待的线程
        synchronized (ActiveBrokerContext.class) {
            ActiveBrokerContext.class.notifyAll();
        } 
        return selectedBroker;
    }
    ......

为什么start方法中我们需要使用悲观锁呢?在start()被调用的同时,引用程序中负责远程zookeeper服务Path Node变化监听的线程A可能已经收到了来自于zookeeper服务端的事件变化通知(我们在start方法内部就建立了针对zookeeper服务端,active_Broker节点下子节点变化的监听),这个时候A线程中就 可能 会再次触发ESB-Broker的选择过程。另外还会出现的一种情况,是开发人员在Client应用程序启动时,在多个线程中各自调用了多次start()启动方法。当这种情况出现,就意味着会在这些start()方法调用中多次初始化ZK-Client。无论是以上哪种情况,都是需要避免的。所以对于ActiveBrokerContext上下文管理器的代码设计至少需要做到的就是:

  • 在业务程序(既ESB-Client)启动和工作的过程中需要做到,无论调用多少次start()方法,无论什么情况下调用start()方法,该方法都只正常执行一次,多余的方法调用将在start()方法内部被自动忽略掉。
  • 除start()方法以外,其它需要对远程zookeeper端数据进行读写操作的方法(在ActiveBrokerContext类中就是randomTarget方法和reportTargetError)都需要在start()方法正确执行后才能继续执行。至于后者这些方法彼此间的执行过程,由于不存在执行冲突所以也不必在ActiveBrokerContext中进行关注。

5-6-2、ESB-Client重新选择ESB-Broker

通过本文之前的文字描述,对于ESB-Client重新进行ESB-Broker选择的原因、过程、注意事项就不需要再进行赘述了,这里我们选择直接上代码。

其中的代码实现只有一点需要注意:就像上一小节提到的那样,randomTarget方法中也需要通过悲观锁 + 启动状态记录的方式进行执行顺序控制——它必须在一个start()方法正确执行后才能执行。至于是否有多个ESB-Client同时执行ESB-Broker的选择过程,这个方法内部就无需关注了。

    ......
    /**
     * 这个方法将从当前“active_Broker” Path Node的所有子节点中
     * 选择一个可用的节点作为为本ESB-Client服务的节点
     * @return 
     */
    public ESBBrokerInfo randomTarget() {
        /*
         * 首先需要注意的是:如果当前ActiveBrokerContext没有完成启动过程
         * 那么调用randomTarget方法的其它线程都将等待
         * */
        synchronized (ActiveBrokerContext.class) {
            try {
                while(IS_STARTING) {
                    LOGGER.warn("发现有其它线程在同时进行ActiveBrokerContext的启动...本次操作开始等待...");
                    ActiveBrokerContext.class.wait();
                }
                // 如果条件成立,说明已经正常执行过start这个方法了。不需要多次执行
                if(!IS_STARTED) {
                    LOGGER.warn("发现start()没有执行或执行失败!本操作退出...");
                    return null;
                }
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage() , e);
                return null;
            }
        }
    
        // 开始选择
        return this.doRandomTarget();
    }
    ......

5-7、ESB-Client的事件监听

ESB-Client中的主要要对远程zookeeper服务端active_Brokers Path Node的直接子节点的变化情况进行监听。包括其下子节点增加、子节点减少和子节点Data区域的数据发生变化的三种事件。而就如前文所述,在监听中进行子节点Data区域数据变化的处理时,还需要判断出重复进行ESB-Broker选择的情况并进行过滤。这个类的主要代码如下所示:

    ......
    /**
     * 这个事件监听用于监听zookeeper服务端的active_Brokers Path Node下任何直接子节点的变化
     * @author yinwenjie
     */
    public class ActiveBrokersListener implements PathChildrenCacheListener {
    
        /* (non-Javadoc)
         * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
         */
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            /*
             * 当收到active_Brokers Path Node下子节点的变化事件后
             * 本ESB-Client要做以下事情:
             * 
             * 1、如果当前事件是子节点添加或者子节点删除事件
             * 则该ESB-Client将清空ActiveBrokerContext中记录的已选择的ESB-Broker节点
             * 
             * 2、如果当前事件是子节点的修改事件(即data区域记录的ESB-Broker状态发生了变化)
             * 则该ESB-Client进行以下判断
             *      2.1、如果当前变化事件是由本ESB-Client触发的,且服务状态又是正常的,则忽略本事件的处理
             *      因为,ESB-Client和ESB-Broker的对应已经通过ActiveBrokerContext中的randomTarget()完成了变化
             *      2.2、如果当前变化事件不是由本ESB-Client触发的,则做步骤1所做的事情
             * */
    
            ActiveBrokerContext activeBrokerContext = new ActiveBrokerContext();
            // 1、==============
            if(event.getType() == Type.CHILD_ADDED || event.getType() == Type.CHILD_REMOVED) {
                this.doRandomTarget(activeBrokerContext);
            }
    
            // 2、=============
            if(event.getType() == Type.CHILD_UPDATED) {
                byte[] dataBytes = event.getData().getData();
                String dataContext = new String(dataBytes);
                // 注意,有可能path node的data区域可能并没有UUID存在。
                if(dataContext.indexOf(",") != -1) {
                    Boolean isBrokerNormal = Boolean.valueOf(dataContext.split("\\,")[0]);
                    String uuidValue = dataContext.split("\\,")[1];
                    // 2.1、如果条件成立,说明这个事件是本ESB-Client出发的。并且无需重新为本ESB-Client选择新的Broker
                    if(StringUtils.equals(activeBrokerContext.getUUID(), uuidValue) && isBrokerNormal) {
                        return;
                    } 
                } 
    
                // 2.2、执行到这里就是2.2的情况了
                this.doRandomTarget(activeBrokerContext);
            }
        }
    
        /**
         * @param activeBrokerContext
         * @throws Exception
         */
        private void doRandomTarget(ActiveBrokerContext activeBrokerContext) throws Exception {
            ESBBrokerInfo targetBroker = activeBrokerContext.randomTarget();
    
            // 如果条件成立,说明由于各种原因现在没有选择到新的节点
            if(targetBroker == null) {
                throw new RuntimeException("没有选择到新的ESB-Broker节点,建议终止ESB-Client的工作!");
            }
        }
    }
    ......

6、关于上传的代码示例

最近几篇关于“自己动手设计ESB”文章中所涉及的代码工程笔者已经上传到了CSDN的资源区域,下载地址为:http://download.csdn.net/detail/yinwenjie/9600996。其中包括了两个工程,ESB-Client工程是ESB中间件的客户端,它由需要集成到ESB中间件服务的各个业务系统进行引用;另一个工程ESB-Server,就是ESB-Broker的启动和工作工程。其中的代码涵盖了ESB-Broker的启动和动态更新RouteBuilder、Processor的代码。

需要注意ESB-Server一共有三个启动类,分别是:BootStartup、BootStartupV2和BootStartupV3。为什么会有三个启动类呢?细心的读者可以发现我们在这5篇介绍ESB设计的文章中,随着设计的深入我们对ESB-Broker端的设计做了若干次调整,我们调整了ESB-Broker相关的zookeeper服务端数据结构、调整了事件监听的处理过程。而这几个版本的ESB-Broker启动过程正是这一步一步调整的历史记录。不过还是建议读者使用BootStartupV3直接进行启动并测试。

======================
自己动手设计ESB(完)

阅读全文