4.配置模块
每个MyCatServer初始化时,会初始化:
MyCatServer.java:
public static final String NAME = "MyCat";
private static final long LOG_WATCH_DELAY = 60000L;
private static final long TIME_UPDATE_PERIOD = 20L;
private static final MycatServer INSTANCE = new MycatServer();
private static final Logger LOGGER = LoggerFactory.getLogger("MycatServer");
private final RouteService routerService;
private final CacheService cacheService;
private Properties dnIndexProperties;
//AIO连接群组
private AsynchronousChannelGroup[] asyncChannelGroups;
private volatile int channelIndex = 0;
//全局序列号
private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();
private final DynaClassLoader catletClassLoader;
private final SQLInterceptor sqlInterceptor;
private volatile int nextProcessor;
private BufferPool bufferPool;
private boolean aio = false;
//XA事务全局ID生成
private final AtomicLong xaIDInc = new AtomicLong();
private MycatServer() {
//读取文件配置
this.config = new MycatConfig();
//定时线程池,单线程线程池
scheduler = Executors.newSingleThreadScheduledExecutor();
//SQL记录器
this.sqlRecorder = new SQLRecorder(config.getSystem()
.getSqlRecordCount());
/**
* 是否在线,MyCat manager中有命令控制
* | offline | Change MyCat status to OFF |
* | online | Change MyCat status to ON |
*/
this.isOnline = new AtomicBoolean(true);
//缓存服务初始化
cacheService = new CacheService();
//路由计算初始化
routerService = new RouteService(cacheService);
// load datanode active index from properties
dnIndexProperties = loadDnIndexProps();
try {
//SQL解析器
sqlInterceptor = (SQLInterceptor) Class.forName(
config.getSystem().getSqlInterceptor()).newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
//catlet加载器
catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
+ File.separator + "catlet", config.getSystem()
.getCatletClassCheckSeconds());
//记录启动时间
this.startupTime = TimeUtil.currentTimeMillis();
}
第一步是读取文件配置,主要是三个文件:schema.xml,rule.xml和server.xml. 读取后的配置会加载到MyCatConfig中。
MyCatConfig.java:
public MycatConfig() {
//读取schema.xml,rule.xml和server.xml
ConfigInitializer confInit = new ConfigInitializer(true);
this.system = confInit.getSystem();
this.users = confInit.getUsers();
this.schemas = confInit.getSchemas();
this.dataHosts = confInit.getDataHosts();
this.dataNodes = confInit.getDataNodes();
for (PhysicalDBPool dbPool : dataHosts.values()) {
dbPool.setSchemas(getDataNodeSchemasOfDataHost(dbPool.getHostName()));
}
this.quarantine = confInit.getQuarantine();
this.cluster = confInit.getCluster();
//初始化重加载配置时间
this.reloadTime = TimeUtil.currentTimeMillis();
this.rollbackTime = -1L;
this.status = RELOAD;
//配置加载锁
this.lock = new ReentrantLock();
}
它们都通过ConfigInitializer读取:
public ConfigInitializer(boolean loadDataHost) {
//读取schema.xml
SchemaLoader schemaLoader = new XMLSchemaLoader();
//读取server.xml
XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);
schemaLoader = null;
//加载配置
this.system = configLoader.getSystemConfig();
this.users = configLoader.getUserConfigs();
this.schemas = configLoader.getSchemaConfigs();
//是否重新加载DataHost和对应的DataNode
if (loadDataHost) {
this.dataHosts = initDataHosts(configLoader);
this.dataNodes = initDataNodes(configLoader);
}
//权限管理
this.quarantine = configLoader.getQuarantineConfig();
this.cluster = initCobarCluster(configLoader);
//不同类型的全局序列处理器的配置加载
if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_MYSQLDB) {
IncrSequenceMySQLHandler.getInstance().load();
}
if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_LOCAL_TIME) {
IncrSequenceTimeHandler.getInstance().load();
}
//检查user与schema配置对应以及schema配置不为空
this.checkConfig();
}
4.1 rule.xml
读取schema之前会先读取rule.xml。
XmlSchemaLoader.java:
public XMLSchemaLoader(String schemaFile, String ruleFile) {
//先读取rule.xml
XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);
this.tableRules = ruleLoader.getTableRules();
ruleLoader = null;
this.dataHosts = new HashMap<String, DataHostConfig>();
this.dataNodes = new HashMap<String, DataNodeConfig>();
this.schemas = new HashMap<String, SchemaConfig>();
//读取加载schema配置
this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
}
public XMLSchemaLoader() {
this(null, null);
}
XMLRuleLoader.java:
public XMLRuleLoader(String ruleFile) {
// this.rules = new HashSet<RuleConfig>();
//rule名 -> rule
this.tableRules = new HashMap<String, TableRuleConfig>();
//function名 -> 具体分片算法
this.functions = new HashMap<String, AbstractPartitionAlgorithm>();
//默认为:/rule.dtd和/rule.xml
load(DEFAULT_DTD, ruleFile == null ? DEFAULT_XML : ruleFile);
}
public XMLRuleLoader() {
this(null);
}
private void load(String dtdFile, String xmlFile) {
InputStream dtd = null;
InputStream xml = null;
try {
dtd = XMLRuleLoader.class.getResourceAsStream(dtdFile);
xml = XMLRuleLoader.class.getResourceAsStream(xmlFile);
//读取出语意树
Element root = ConfigUtil.getDocument(dtd, xml)
.getDocumentElement();
//加载Function
loadFunctions(root);
//加载TableRule
loadTableRules(root);
} catch (ConfigException e) {
throw e;
} catch (Exception e) {
throw new ConfigException(e);
} finally {
if (dtd != null) {
try {
dtd.close();
} catch (IOException e) {
}
}
if (xml != null) {
try {
xml.close();
} catch (IOException e) {
}
}
}
}
ConfigUtil.java解析语意树:
public static Document getDocument(final InputStream dtd, InputStream xml) throws ParserConfigurationException,
SAXException, IOException {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setValidating(true);
factory.setNamespaceAware(false);
DocumentBuilder builder = factory.newDocumentBuilder();
builder.setEntityResolver(new EntityResolver() {
@Override
public InputSource resolveEntity(String publicId, String systemId) {
return new InputSource(dtd);
}
});
builder.setErrorHandler(new ErrorHandler() {
@Override
public void warning(SAXParseException e) {
}
@Override
public void error(SAXParseException e) throws SAXException {
throw e;
}
@Override
public void fatalError(SAXParseException e) throws SAXException {
throw e;
}
});
return builder.parse(xml);
}
加载functions, XmlRuleLoader.java
private void loadFunctions(Element root) throws ClassNotFoundException,
InstantiationException, IllegalAccessException,
InvocationTargetException {
NodeList list = root.getElementsByTagName("function");
for (int i = 0, n = list.getLength(); i < n; ++i) {
Node node = list.item(i);
if (node instanceof Element) {
Element e = (Element) node;
//获取name标签
String name = e.getAttribute("name");
//如果Map已有,则function重复
if (functions.containsKey(name)) {
throw new ConfigException("rule function " + name
+ " duplicated!");
}
//获取class标签
String clazz = e.getAttribute("class");
//根据class利用反射新建分片算法
AbstractPartitionAlgorithm function = createFunction(name, clazz);
ParameterMapping.mapping(function, ConfigUtil.loadElements(e));
//每个AbstractPartitionAlgorithm可能会实现init来初始化
function.init();
//放入functions map
functions.put(name, function);
}
}
}
private AbstractPartitionAlgorithm createFunction(String name, String clazz)
throws ClassNotFoundException, InstantiationException,
IllegalAccessException, InvocationTargetException {
Class<?> clz = Class.forName(clazz);
//判断是否继承AbstractPartitionAlgorithm
if (!AbstractPartitionAlgorithm.class.isAssignableFrom(clz)) {
throw new IllegalArgumentException("rule function must implements "
+ AbstractPartitionAlgorithm.class.getName() + ", name=" + name);
}
return (AbstractPartitionAlgorithm) clz.newInstance();
}
加载所有的function的node,每一个node就是一个AbstractPartitionAlgorithm,并放入functions这个map中;
private final Map<String, TableRuleConfig> tableRules;
对于每一个node,通过反射新建对应参数的AbstractPartitionAlgorithm。这样,所有的function就加载到了functions这个map中。
同理,加载TableRule,就加上了function是否存在的判断:
/**
* tableRule标签结构:
* <tableRule name="sharding-by-month">
* <rule>
* <columns>create_date</columns>
* <algorithm>partbymonth</algorithm>
* </rule>
* </tableRule>
* @param root
* @throws SQLSyntaxErrorException
*/
private void loadTableRules(Element root) throws SQLSyntaxErrorException {
//获取每个tableRule标签
NodeList list = root.getElementsByTagName("tableRule");
for (int i = 0, n = list.getLength(); i < n; ++i) {
Node node = list.item(i);
if (node instanceof Element) {
Element e = (Element) node;
//先判断是否重复
String name = e.getAttribute("name");
if (tableRules.containsKey(name)) {
throw new ConfigException("table rule " + name
+ " duplicated!");
}
//获取rule标签
NodeList ruleNodes = e.getElementsByTagName("rule");
int length = ruleNodes.getLength();
if (length > 1) {
throw new ConfigException("only one rule can defined :"
+ name);
}
//目前只处理第一个,未来可能有多列复合逻辑需求
//RuleConfig是保存着rule与function对应关系的对象
RuleConfig rule = loadRule((Element) ruleNodes.item(0));
String funName = rule.getFunctionName();
//判断function是否存在,获取function
AbstractPartitionAlgorithm func = functions.get(funName);
if (func == null) {
throw new ConfigException("can't find function of name :"
+ funName);
}
rule.setRuleAlgorithm(func);
//保存到tableRules
tableRules.put(name, new TableRuleConfig(name, rule));
}
}
}
这样,所有的tableRule和function就加载完毕。保存在一个变量中,就是tableRules:
XMLRuleLoader.java:
private final Map<String, TableRuleConfig> tableRules;
4.2 schema.xml:
public XMLSchemaLoader(String schemaFile, String ruleFile) {
//先读取rule.xml
XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);
//将tableRules拿出,用于这里加载Schema做rule有效判断,以及之后的分片路由计算
this.tableRules = ruleLoader.getTableRules();
//释放ruleLoader
ruleLoader = null;
this.dataHosts = new HashMap<String, DataHostConfig>();
this.dataNodes = new HashMap<String, DataNodeConfig>();
this.schemas = new HashMap<String, SchemaConfig>();
//读取加载schema配置
this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
}
private void load(String dtdFile, String xmlFile) {
InputStream dtd = null;
InputStream xml = null;
try {
dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile);
xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile);
Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();
//先加载所有的DataHost
loadDataHosts(root);
//再加载所有的DataNode
loadDataNodes(root);
//最后加载所有的Schema
loadSchemas(root);
} catch (ConfigException e) {
throw e;
} catch (Exception e) {
throw new ConfigException(e);
} finally {
if (dtd != null) {
try {
dtd.close();
} catch (IOException e) {
}
}
if (xml != null) {
try {
xml.close();
} catch (IOException e) {
}
}
}
}
先看下DataHostConfig这个类的结构:
XMLSchemaLoader.java:
private void loadDataHosts(Element root) {
NodeList list = root.getElementsByTagName("dataHost");
for (int i = 0, n = list.getLength(); i < n; ++i) {
Element element = (Element) list.item(i);
String name = element.getAttribute("name");
//判断是否重复
if (dataHosts.containsKey(name)) {
throw new ConfigException("dataHost name " + name + "duplicated!");
}
//读取最大连接数
int maxCon = Integer.valueOf(element.getAttribute("maxCon"));
//读取最小连接数
int minCon = Integer.valueOf(element.getAttribute("minCon"));
/**
* 读取负载均衡配置
* 1. balance="0", 不开启分离机制,所有读操作都发送到当前可用的 writeHost 上。
* 2. balance="1",全部的 readHost 和 stand by writeHost 参不 select 的负载均衡
* 3. balance="2",所有读操作都随机的在 writeHost、readhost 上分发。
* 4. balance="3",所有读请求随机的分发到 wiriterHost 对应的 readhost 执行,writerHost 不负担读压力
*/
int balance = Integer.valueOf(element.getAttribute("balance"));
/**
* 读取切换类型
* -1 表示不自动切换
* 1 默认值,自动切换
* 2 基于MySQL主从同步的状态决定是否切换
* 心跳询句为 show slave status
* 3 基于 MySQL galary cluster 的切换机制
*/
String switchTypeStr = element.getAttribute("switchType");
int switchType = switchTypeStr.equals("") ? -1 : Integer.valueOf(switchTypeStr);
//读取从延迟界限
String slaveThresholdStr = element.getAttribute("slaveThreshold");
int slaveThreshold = slaveThresholdStr.equals("") ? -1 : Integer.valueOf(slaveThresholdStr);
//如果 tempReadHostAvailable 设置大于 0 则表示写主机如果挂掉, 临时的读服务依然可用
String tempReadHostAvailableStr = element.getAttribute("tempReadHostAvailable");
boolean tempReadHostAvailable = tempReadHostAvailableStr.equals("") ? false : Integer.valueOf(tempReadHostAvailableStr) > 0;
/**
* 读取 写类型
* 这里只支持 0 - 所有写操作仅配置的第一个 writeHost
*/
String writeTypStr = element.getAttribute("writeType");
int writeType = "".equals(writeTypStr) ? PhysicalDBPool.WRITE_ONLYONE_NODE : Integer.valueOf(writeTypStr);
String dbDriver = element.getAttribute("dbDriver");
String dbType = element.getAttribute("dbType");
String filters = element.getAttribute("filters");
String logTimeStr = element.getAttribute("logTime");
long logTime = "".equals(logTimeStr) ? PhysicalDBPool.LONG_TIME : Long.valueOf(logTimeStr) ;
//读取心跳语句
String heartbeatSQL = element.getElementsByTagName("heartbeat").item(0).getTextContent();
//读取 初始化sql配置,用于oracle
NodeList connectionInitSqlList = element.getElementsByTagName("connectionInitSql");
String initConSQL = null;
if (connectionInitSqlList.getLength() > 0) {
initConSQL = connectionInitSqlList.item(0).getTextContent();
}
//读取writeHost
NodeList writeNodes = element.getElementsByTagName("writeHost");
DBHostConfig[] writeDbConfs = new DBHostConfig[writeNodes.getLength()];
Map<Integer, DBHostConfig[]> readHostsMap = new HashMap<Integer, DBHostConfig[]>(2);
for (int w = 0; w < writeDbConfs.length; w++) {
Element writeNode = (Element) writeNodes.item(w);
writeDbConfs[w] = createDBHostConf(name, writeNode, dbType, dbDriver, maxCon, minCon,filters,logTime);
NodeList readNodes = writeNode.getElementsByTagName("readHost");
//读取对应的每一个readHost
if (readNodes.getLength() != 0) {
DBHostConfig[] readDbConfs = new DBHostConfig[readNodes.getLength()];
for (int r = 0; r < readDbConfs.length; r++) {
Element readNode = (Element) readNodes.item(r);
readDbConfs[r] = createDBHostConf(name, readNode, dbType, dbDriver, maxCon, minCon,filters, logTime);
}
readHostsMap.put(w, readDbConfs);
}
}
DataHostConfig hostConf = new DataHostConfig(name, dbType, dbDriver,
writeDbConfs, readHostsMap, switchType, slaveThreshold, tempReadHostAvailable);
hostConf.setMaxCon(maxCon);
hostConf.setMinCon(minCon);
hostConf.setBalance(balance);
hostConf.setWriteType(writeType);
hostConf.setHearbeatSQL(heartbeatSQL);
hostConf.setConnectionInitSql(initConSQL);
hostConf.setFilters(filters);
hostConf.setLogTime(logTime);
dataHosts.put(hostConf.getName(), hostConf);
}
}
先读取每个DataHost的通用配置,之后读取每个DataHost对应的writeHost以及每个writeHost对应的readHost。配置好后,保存在:
private final Map<String, DataHostConfig> dataHosts;
之后读取载入DataHost:
XMLSchemaLoader.java:
private void loadDataNodes(Element root) {
//读取DataNode分支
NodeList list = root.getElementsByTagName("dataNode");
for (int i = 0, n = list.getLength(); i < n; i++) {
Element element = (Element) list.item(i);
String dnNamePre = element.getAttribute("name");
String databaseStr = element.getAttribute("database");
String host = element.getAttribute("dataHost");
//字符串不为空
if (empty(dnNamePre) || empty(databaseStr) || empty(host)) {
throw new ConfigException("dataNode " + dnNamePre + " define error ,attribute can't be empty");
}
//dnNames(name),databases(database),hostStrings(dataHost)都可以配置多个,以',', '$', '-'区分,但是需要保证database的个数*dataHost的个数=name的个数
//多个dataHost与多个database如果写在一个标签,则每个dataHost拥有所有database
//例如:<dataNode name="dn1$0-75" dataHost="localhost$1-10" database="db$0-759" />
//则为:localhost1拥有dn1$0-75,localhost2也拥有dn1$0-75(对应db$76-151)
String[] dnNames = io.mycat.util.SplitUtil.split(dnNamePre, ',', '$', '-');
String[] databases = io.mycat.util.SplitUtil.split(databaseStr, ',', '$', '-');
String[] hostStrings = io.mycat.util.SplitUtil.split(host, ',', '$', '-');
if (dnNames.length > 1 && dnNames.length != databases.length * hostStrings.length) {
throw new ConfigException("dataNode " + dnNamePre
+ " define error ,dnNames.length must be=databases.length*hostStrings.length");
}
if (dnNames.length > 1) {
List<String[]> mhdList = mergerHostDatabase(hostStrings, databases);
for (int k = 0; k < dnNames.length; k++) {
String[] hd = mhdList.get(k);
String dnName = dnNames[k];
String databaseName = hd[1];
String hostName = hd[0];
createDataNode(dnName, databaseName, hostName);
}
} else {
createDataNode(dnNamePre, databaseStr, host);
}
}
}
private void createDataNode(String dnName, String database, String host) {
DataNodeConfig conf = new DataNodeConfig(dnName, database, host);
if (dataNodes.containsKey(conf.getName())) {
throw new ConfigException("dataNode " + conf.getName() + " duplicated!");
}
if (!dataHosts.containsKey(host)) {
throw new ConfigException("dataNode " + dnName + " reference dataHost:" + host + " not exists!");
}
dataNodes.put(conf.getName(), conf);
}
生成的是DataNode类,放入:
private final Map<String, DataNodeConfig> dataNodes;