2024-01-18
原文作者:hashcon 原文地址: https://zhanghaoxin.blog.csdn.net/article/details/51270847
3.5.1 后端连接获取与负载均衡

上一节我们讲了后端连接的基本建立和响应处理,那么这些后端连接是什么时候建立的呢?
首先,MyCat配置文件中,DataHost标签中有minIdle这个属性。代表在MyCat初始化时,会在这个DataHost上初始化维护多少个连接(这些连接可以理解为连接池)。每个前端Client连接会创建Session,而Session会根据命令的不同而创建不同的Handler。每个Handler会从连接池中拿出所需要的连接并使用。在连接池大小不够时,RW线程会异步驱使新建所需的连接补充连接池,但是连接数最大不能超过配置的maxCon。同时,如之前所述,有定时线程检查并回收空闲后端连接。但池中最小不会小于minCon。
我们可以通过后端连接的工厂方法的调用链来理解:

202401182020324761.png
看这个调用链,我们简述下大概的流程。

Created with Raphaël 2.2.0MyCat接受客户端连接并为之建立唯一绑定的SessionMyCat接受客户端的请求,计算路由根据请求和路由创建合适的handler,这里为SingleNodeHandler从PhysicalDBNode中获取后端连接尝试获取连接,连接够用?将请求发送给对应连接,处理完之后归还连接尝试异步创建新的连接通过DelegateResponseHandler将连接与之前的Handler,这里是SingleNodeHandler绑定yesno

我们先从Session看起,在MyCat中实现类为NonBlockingSession。在前端连接建立时,会创建绑定唯一的Session:
ServerConnectionFactory.java:

    protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
        SystemConfig sys = MycatServer.getInstance().getConfig().getSystem();
        ServerConnection c = new ServerConnection(channel);
        MycatServer.getInstance().getConfig().setSocketParams(c, true);
        c.setPrivileges(MycatPrivileges.instance());
        c.setQueryHandler(new ServerQueryHandler(c));
        c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c));
        // c.setPrepareHandler(new ServerPrepareHandler(c));
        c.setTxIsolation(sys.getTxIsolation());
        //创建绑定唯一Session
        c.setSession2(new NonBlockingSession(c));
        return c;
    }

Session主要处理事务,多节点转发协调等,由不同的ResponseHandler实现;

202401182020331902.png
这些ResponseHandler我们之后会在对应的模块去细细分析。这里先跳过。
查看SingleNodeHandler的处理方法
SingleNodeHandler.java:

    public void execute() throws Exception {
    	//从这里开始计算处理时间
    	startTime=System.currentTimeMillis();
    	ServerConnection sc = session.getSource();
    	this.isRunning = true;
    	this.packetId = 0;
    	final BackendConnection conn = session.getTarget(node);
    	//之前是否获取过Connection并且Connection有效
    	if (session.tryExistsCon(conn, node)) {
    		_execute(conn);
    	} else {
    		// create new connection
    		MycatConfig conf = MycatServer.getInstance().getConfig();
    		//从config中获取DataNode
    		PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
    		//获取对应的数据库连接
    		dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this,
                       node);
    	}
    
    }

从PhysicalDBNode中获取合适的连接:

    public void getConnection(String schema,boolean autoCommit, RouteResultsetNode rrs,
    		ResponseHandler handler, Object attachment) throws Exception {
    	checkRequest(schema);
    	//检查数据库连接池是否初始化成功,因为有reload命令
    	if (dbPool.isInitSuccess()) {
    		//根据是否能在读节点上运行获取连接,一般是判断是否为读请求,并且读请求不在事务中
    		if (rrs.canRunnINReadDB(autoCommit)) {
    			//根据负载均衡策略选择合适的后端连接
    			dbPool.getRWBanlanceCon(schema,autoCommit, handler, attachment,
    					this.database);
    		} else {
    			//直接选择当前连接池中的的后端连接
    			dbPool.getSource().getConnection(schema,autoCommit, handler, attachment);
    		}
    
    	} else {
    		throw new IllegalArgumentException("Invalid DataSource:"
    				+ dbPool.getActivedIndex());
    	}
    }

PhysicalDBPool类中有负载均衡,切换writeHost,控制write方式等(分别对应balance,writeType等标签)的实现。首先我们看如果有负载均衡策略(配置了balance)的获取连接的方式:

    public void getRWBanlanceCon(String schema, boolean autocommit,
                                 ResponseHandler handler, Object attachment, String database) throws Exception {
    
        PhysicalDatasource theNode = null;
        ArrayList<PhysicalDatasource> okSources = null;
        switch (banlance) {
            //所有读写节点参与read请求的负载均衡,除了当前活跃的写节点,balance=1
            case BALANCE_ALL_BACK: {
                //返回所有写节点和符合条件的读节点,不包括当前的写节点
                okSources = getAllActiveRWSources(true, false, checkSlaveSynStatus());
                if (okSources.isEmpty()) {
                    //如果结果即为空,返回当前写节点
                    theNode = this.getSource();
                } else {
                    //不为空,随机选一个
                    theNode = randomSelect(okSources);
                }
                break;
            }
            //所有读写节点参与read请求的负载均衡,balance=2
            case BALANCE_ALL: {
                //返回所有写节点和符合条件的读节点
                okSources = getAllActiveRWSources(true, true, checkSlaveSynStatus());
                //随机选一个
                theNode = randomSelect(okSources);
                break;
            }
            case BALANCE_ALL_READ: {
                //返回所有符合条件的读节点
                okSources = getAllActiveRWSources(false, false, checkSlaveSynStatus());
                //随机取一个
                theNode = randomSelect(okSources);
                break;
            }
            //不做负载均衡,balance=0或其他不为以上的值
            case BALANCE_NONE:
            default:
                // return default write data source
                theNode = this.getSource();
        }
    
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("select read source " + theNode.getName() + " for dataHost:" + this.getHostName());
        }
        theNode.getConnection(schema, autocommit, handler, attachment);
    }

其中涉及到的方法:

  1. 返回符合条件节点集:
    private ArrayList<PhysicalDatasource> getAllActiveRWSources(
                boolean includeWriteNode, boolean includeCurWriteNode, boolean filterWithSlaveThreshold) {
    
            int curActive = activedIndex;
            ArrayList<PhysicalDatasource> okSources = new ArrayList<PhysicalDatasource>(this.allDs.size());
            //判断写节点
            for (int i = 0; i < this.writeSources.length; i++) {
                PhysicalDatasource theSource = writeSources[i];
                //判断写节点是否是active,可能reload会置为inactive,可能多个写节点但是只有一个是活跃在用的(writeType=0)
                if (isAlive(theSource)) {
                    //负载均衡策略是否包含写节点
                    if (includeWriteNode) {
                        //判断是否包含当前活跃的写入节点
                        if (i == curActive && includeCurWriteNode == false) {
                            // not include cur active source
                        } else if (filterWithSlaveThreshold) {
                            //如果包含从节点同步延迟限制,检查同步状态
                            if (canSelectAsReadNode(theSource)) {
                                okSources.add(theSource);
                            } else {
                                //如果同步状态不对,则不添加这个写节点
                                continue;
                            }
    
                        } else {
                            okSources.add(theSource);
                        }
                    }
                    //检查theSource对应的读节点
                    if (!readSources.isEmpty()) {
    
                        // 检查theSource对应的读节点(从节点)
                        PhysicalDatasource[] allSlaves = this.readSources.get(i);
                        if (allSlaves != null) {
                            for (PhysicalDatasource slave : allSlaves) {
                                if (isAlive(slave)) {
                                    //如果包含从节点同步延迟限制,检查同步状态
                                    if (filterWithSlaveThreshold) {
                                        if (canSelectAsReadNode(slave)) {
                                            //如果同步状态正确,则把读节点加入
                                            okSources.add(slave);
                                        } else {
                                            continue;
                                        }
    
                                    } else {
                                        okSources.add(slave);
                                    }
                                }
                            }
                        }
                    }
    
                } else {
    
                    // TODO : add by zhuam
                    // 如果写节点不OK, 也要保证临时的读服务正常
                    if (this.dataHostConfig.isTempReadHostAvailable()) {
    
                        if (!readSources.isEmpty()) {
                            // check all slave nodes
                            PhysicalDatasource[] allSlaves = this.readSources.get(i);
                            if (allSlaves != null) {
                                for (PhysicalDatasource slave : allSlaves) {
                                    if (isAlive(slave)) {
    
                                        if (filterWithSlaveThreshold) {
                                            if (canSelectAsReadNode(slave)) {
                                                okSources.add(slave);
                                            } else {
                                                continue;
                                            }
    
                                        } else {
                                            okSources.add(slave);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
    
            }
            return okSources;
        }
  1. 检查是否判断主从延迟:
    private boolean checkSlaveSynStatus() {
            return (dataHostConfig.getSlaveThreshold() != -1)
                    && (dataHostConfig.getSwitchType() == DataHostConfig.SYN_STATUS_SWITCH_DS);
        }
  1. 随机选择节点:
    /**
         * TODO: modify by zhuam
         * <p/>
         * 随机选择,按权重设置随机概率。
         * 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
         *
         * @param okSources
         * @return
         */
        public PhysicalDatasource randomSelect(ArrayList<PhysicalDatasource> okSources) {
    
            if (okSources.isEmpty()) {
                return this.getSource();
    
            } else {
    
                int length = okSources.size();    // 总个数
                int totalWeight = 0;            // 总权重
                boolean sameWeight = true;        // 权重是否都一样
                for (int i = 0; i < length; i++) {
                    int weight = okSources.get(i).getConfig().getWeight();
                    totalWeight += weight;        // 累计总权重
                    if (sameWeight && i > 0
                            && weight != okSources.get(i - 1).getConfig().getWeight()) {      // 计算所有权重是否一样
                        sameWeight = false;
                    }
                }
    
                if (totalWeight > 0 && !sameWeight) {
    
                    // 如果权重不相同且权重大于0则按总权重数随机
                    int offset = random.nextInt(totalWeight);
    
                    // 并确定随机值落在哪个片断上
                    for (int i = 0; i < length; i++) {
                        offset -= okSources.get(i).getConfig().getWeight();
                        if (offset < 0) {
                            return okSources.get(i);
                        }
                    }
                }
    
                // 如果权重相同或权重为0则均等随机
                return okSources.get(random.nextInt(length));
    
                //int index = Math.abs(random.nextInt()) % okSources.size();
                //return okSources.get(index);
            }
        }
  1. 根据writeType获取当前writeHost方法:
    public PhysicalDatasource getSource() {
    
            switch (writeType) {
                //writeType=0,返回当前active的writeHost
                case WRITE_ONLYONE_NODE: {
                    return writeSources[activedIndex];
                }
                //writeType=1,随机发到一个writeHost
                case WRITE_RANDOM_NODE: {
    
                    int index = Math.abs(wnrandom.nextInt()) % writeSources.length;
                    PhysicalDatasource result = writeSources[index];
                    if (!this.isAlive(result)) {
    
                        // find all live nodes
                        ArrayList<Integer> alives = new ArrayList<Integer>(writeSources.length - 1);
                        for (int i = 0; i < writeSources.length; i++) {
                            if (i != index) {
                                if (this.isAlive(writeSources[i])) {
                                    alives.add(i);
                                }
                            }
                        }
    
                        if (alives.isEmpty()) {
                            result = writeSources[0];
                        } else {
                            // random select one
                            index = Math.abs(wnrandom.nextInt()) % alives.size();
                            result = writeSources[alives.get(index)];
    
                        }
                    }
    
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("select write source " + result.getName()
                                + " for dataHost:" + this.getHostName());
                    }
                    return result;
                }
                //参数不正确
                default: {
                    throw new java.lang.IllegalArgumentException("writeType is "
                            + writeType + " ,so can't return one write datasource ");
                }
            }
    
        }
阅读全文