3. 连接模块
3.5 后端连接
3.5.2 后端连接获取与维护管理
还是那之前的流程,
Created with Raphaël 2.1.0MyCat接受客户端连接并为之建立唯一绑定的SessionMyCat接受客户端的请求,计算路由根据请求和路由创建合适的handler,这里为SingleNodeHandler从PhysicalDBNode中获取后端连接尝试获取连接,连接够用?将请求发送给对应连接,处理完之后归还连接尝试异步创建新的连接通过DelegateResponseHandler将连接与之前的Handler,这里是SingleNodeHandler绑定yesno
现在我们到了尝试获取连接的阶段
PhysicalDataSource.java:
public void getConnection(String schema,boolean autocommit, final ResponseHandler handler,
final Object attachment) throws IOException {
//从当前连接map中拿取已建立好的后端连接
BackendConnection con = this.conMap.tryTakeCon(schema,autocommit);
if (con != null) {
//如果不为空,则绑定对应前端请求的handler
takeCon(con, handler, attachment, schema);
return;
} else {
//如果为空,新建连接
int activeCons = this.getActiveCount();//当前最大活动连接
if(activeCons+1>size){//下一个连接大于最大连接数
LOGGER.error("the max activeConnnections size can not be max than maxconnections");
throw new IOException("the max activeConnnections size can not be max than maxconnections");
}else{ // create connection
LOGGER.info("not ilde connection in pool,create new connection for " + this.name
+ " of schema "+schema);
createNewConnection(handler, attachment, schema);
}
}
}
private void createNewConnection(final ResponseHandler handler,
final Object attachment, final String schema) throws IOException {
//异步创建连接,将连接的handler绑定为DelegateResponseHandler
MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() {
public void run() {
try {
createNewConnection(new DelegateResponseHandler(handler) {
@Override
public void connectionError(Throwable e,
BackendConnection conn) {
handler.connectionError(e, conn);
}
@Override
public void connectionAcquired(BackendConnection conn) {
takeCon(conn, handler, attachment, schema);
}
}, schema);
} catch (IOException e) {
handler.connectionError(e, null);
}
}
});
}
异步调用工厂方法创建后端连接,这里为MySQLConnection
MySQLDataSource.java:
@Override
public void createNewConnection(ResponseHandler handler,String schema) throws IOException {
factory.make(this, handler,schema);
}
根据之前所述,MySQLConnection的工厂方法会先将NIOhandler设置为MySQLConnectionAuthenticator:
MySQLConnectionFactory.java:
public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
String schema) throws IOException {
//DBHost配置
DBHostConfig dsc = pool.getConfig();
//根据是否为NIO返回SocketChannel或者AIO的AsynchronousSocketChannel
NetworkChannel channel = openSocketChannel(MycatServer.getInstance()
.isAIO());
//新建MySQLConnection
MySQLConnection c = new MySQLConnection(channel, pool.isReadNode());
//根据配置初始化MySQLConnection
MycatServer.getInstance().getConfig().setSocketParams(c, false);
c.setHost(dsc.getIp());
c.setPort(dsc.getPort());
c.setUser(dsc.getUser());
c.setPassword(dsc.getPassword());
c.setSchema(schema);
//目前实际连接还未建立,handler为MySQL连接认证MySQLConnectionAuthenticator,传入的handler为后端连接处理器ResponseHandler
c.setHandler(new MySQLConnectionAuthenticator(c, handler));
c.setPool(pool);
c.setIdleTimeout(pool.getConfig().getIdleTimeout());
//AIO和NIO连接方式建立实际的MySQL连接
if (channel instanceof AsynchronousSocketChannel) {
((AsynchronousSocketChannel) channel).connect(
new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
(CompletionHandler) MycatServer.getInstance()
.getConnector());
} else {
//通过NIOConnector建立连接
((NIOConnector) MycatServer.getInstance().getConnector())
.postConnect(c);
}
return c;
}
这里传入的ResponseHandler为DelegateResponseHandler,在连接建立验证之后,会调用:
MySQLConnectionAuthenticator.java:
public void handle(byte[] data) {
//省略
//设置ResponseHandler
if (listener != null) {
listener.connectionAcquired(source);
}
//省略
}
DelegateResponseHandler.java:
private final ResponseHandler target;
@Override
public void connectionAcquired(BackendConnection conn) {
//将后端连接的ResponseHandler设置为target
target.connectionAcquired(conn);
}
这样,原来没获取到连接的ResponseHandler就获得需要的连接,之后进行处理。处理完后,归还到连接池中。
private void returnCon(BackendConnection c) {
//清空连接的Attachment
c.setAttachment(null);
//设置为未使用
c.setBorrowed(false);
//更新上次使用时间,用于清理空闲连接
c.setLastTime(TimeUtil.currentTimeMillis());
//获取连接池对应的队列
ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema());
//按照是否Autocommit分类归还连接
boolean ok = false;
if (c.isAutocommit()) {
ok = queue.getAutoCommitCons().offer(c);
} else {
ok = queue.getManCommitCons().offer(c);
}
//归还失败,关闭连接,记录
if (!ok) {
LOGGER.warn("can't return to pool ,so close con " + c);
c.close("can't return to pool ");
}
}
4.配置模块
MyCat实例初始化时究竟会有什么操作呢?看下MyCat程序入口:
MycatStartup.java:
public static void main(String[] args) {
//是否启用zk配置,/myid.properties中的loadZk属性决定,默认不启用,从本地xml文件中读取配置
ZkConfig.instance().initZk();
try {
String home = SystemConfig.getHomePath();
if (home == null) {
System.out.println(SystemConfig.SYS_HOME + " is not set.");
System.exit(-1);
}
// init
MycatServer server = MycatServer.getInstance();
server.beforeStart();
// startup
server.startup();
System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log");
while (true) {
Thread.sleep(300 * 1000);
}
} catch (Exception e) {
SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
LogLog.error(sdf.format(new Date()) + " startup error", e);
System.exit(-1);
}
}
从代码中,可以简单的分为三步:
- MycatServer.getInstance():获取MyCat实例,其实就是读取配置文件,并验证正确性等
- server.beforeStart():获取环境变量,日志配置
- server.startup():启动MyCat,启动线程,初始化线程池和连接池等。