深入解析Tomcat 9的内部结构与代码实现
tomcat核心组件
我们先从tomcat的核心配置文件server.xml开始看起.下面是tomcat官网默认的server.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<Server port="8005" shutdown="SHUTDOWN">
<Listener className="org.apache.catalina.startup.VersionLoggerListener" />
<Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
<Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
<Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
<Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />
<GlobalNamingResources>
<Resource name="UserDatabase" auth="Container"
type="org.apache.catalina.UserDatabase"
description="User database that can be updated and saved"
factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
pathname="conf/tomcat-users.xml" />
</GlobalNamingResources>
<Service name="Catalina">
<Connector port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443" />
<Engine name="Catalina" defaultHost="localhost">
<Realm className="org.apache.catalina.realm.LockOutRealm">
<Realm className="org.apache.catalina.realm.UserDatabaseRealm"
resourceName="UserDatabase"/>
</Realm>
<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true">
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="localhost_access_log" suffix=".txt"
pattern="%h %l %u %t "%r" %s %b" />
</Host>
</Engine>
</Service>
</Server>
我们可以看到最外层是一个server
标签,代表整个tomcat服务器,内部有多个Listener
用于监听服务器的信息,还有GlobalNamingResources
用于JNDI相关。同级的还有Service
标签表示一个服务,service内部有Connector
标签,用于接受请求,接受到的请求传递给 Engine
,根据不同的域名分配给不同的Host
。我们看Host的标签就指向了tomcat的目录
每个目录就是一个应用,应用被tomcat封装成一个Context
。在应用中可以定义很多的serverlet,serverlet被tomcat定义为Wrapper
如果用一次请求来看tomcat的架构图如下
组件的生命周期
了解了tomcat的核心组件之后,我们发现他们都继承于一个Lifecycle,拥有生命周期方法
生命周期接口提供了以下方法
tomcat一共了一个LifecycleBase
的类来实现了Lifecycle的方法,然后自己又提供了一些抽象类来让子类实现核心的生命周期逻辑,采用模板方法。
Engine,Wrapper,Host,Context又属于一个 Container
容器。容器的特性是拥有一个Pipeline
管道,每个管道中又有若干的Valve
阀门,处理请求流程如图
启动流程
tomcat的入口函数是Bootstrap的main方法
.下面是简化代码
public static void main(String args[]) {
synchronized (daemonLock) {
// 创建一个bootstrap
Bootstrap bootstrap = new Bootstrap();
// 调用初始化方法,内部创建了catalina对象
bootstrap.init();
daemon = bootstrap;
}
String command = "start";
if (command.equals("start")) {
daemon.setAwait(true);
// 调用 bootstrap的load方法
daemon.load(args);
daemon.start();
if (null == daemon.getServer()) {
System.exit(1);
}
}
}
public void init() throws Exception {
initClassLoaders();
// 通过反射创建 catalina对象
Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
Object startupInstance = startupClass.getConstructor().newInstance();
catalinaDaemon = startupInstance;
}
我们可以看到在Bootstrap中先创建了一个bootstrap对象,然后在初始化中又创建了一个catalina对象。之后执行的是Bootstrap的load方法
private void load(String[] arguments) throws Exception {
// 调用catalina的load方法
String methodName = "load";
Object param[];
Class<?> paramTypes[];
if (arguments==null || arguments.length==0) {
paramTypes = null;
param = null;
} else {
paramTypes = new Class[1];
paramTypes[0] = arguments.getClass();
param = new Object[1];
param[0] = arguments;
}
Method method =
catalinaDaemon.getClass().getMethod(methodName, paramTypes);
if (log.isDebugEnabled()) {
log.debug("Calling startup class " + method);
}
method.invoke(catalinaDaemon, param);
}
我们可以看到在bootstrap方法中,其实是通过反射调用了catalina的load方法,其中就有解析server.xml文件的方法,tomcat的核心组件就是在该方法中创建完成
。
public void load() {
// 解析Server.xml 文件
parseServerXml(true);
// 解析完xml后就可以获得服务对象
Server s = getServer();
if (s == null) {
return;
}
getServer().setCatalina(this);
getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());
// Stream redirection
initStreams();
// Start the new server
try {
// 调用服务的生命周期init方法
getServer().init();
} catch (LifecycleException e) {
if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
throw new java.lang.Error(e);
} else {
log.error(sm.getString("catalina.initError"), e);
}
}
if(log.isInfoEnabled()) {
log.info(sm.getString("catalina.init", Long.toString(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1))));
}
}
解析完xml后获得的server里面的组件都没有初始化
,生命周期函数还没执行,在后续的getServer().init()
方法中进行初始化,其实内部是执行了 StandardServer的InitInternal方法
protected void initInternal() throws LifecycleException {
// JNDI初始化
globalNamingResources.init();
// service初始化
for (Service service : services) {
service.init();
}
}
我们可以看到在server初始化的时候,调用了Globalnaming和service的初始化。我们来看看service初始化都做了什么
protected void initInternal() throws LifecycleException {
super.initInternal();
// 初始化engine
if (engine != null) {
engine.init();
}
// 初始化 connector
synchronized (connectorsLock) {
for (Connector connector : connectors) {
connector.init();
}
}
}
我们可以看到Service里面初始化了Engine和Connector。engine的初始化内部没有做什么事情
。重点来看connector的初始化方法
protected void initInternal() throws LifecycleException {
super.initInternal();
if (protocolHandler == null) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
}
// 初始化了一个适配器
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
if (service != null) {
protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor());
}
try {
// 初始化了协议处理器
protocolHandler.init();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
}
}
connector里面初始化了一个适配器 CoyoteAdapter
和协议处理器protocolHandler.init()
继续往下跟
public void init() throws Exception {
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
endpoint.init();
}
我们发现协议处理器中又初始化了一个endpoint
端点,继续往下跟
public final void init() throws Exception {
if (bindOnInit) {
bindWithCleanup();
}
}
private void bindWithCleanup() throws Exception {
try {
bind();
} catch (Throwable t) {
// Ensure open sockets etc. are cleaned up if something goes
// wrong during bind
ExceptionUtils.handleThrowable(t);
unbind();
throw t;
}
}
public void bind() throws Exception {
initServerSocket();
setStopLatch(new CountDownLatch(1));
// Initialize SSL if needed
initialiseSsl();
}
protected void initServerSocket() throws Exception {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.bind(addr, getAcceptCount());
serverSock.configureBlocking(true); //mimic APR behavior
}
一路追踪源码,终于在 endpoint的initServerSocket
方法中看到了熟悉的开启Nio的代码,至此tomcat就算初始化完成。接下来又回到 bootstrap类继续往下执行。
daemon.load(args);
daemon.start();
load执行完成后,接下来就是执行start方法,其实整体流程和init差不多,需要注意的是在 engine的start方法中,engine属于ContainerBase
的子类,在ContainerBase方法中,实现了startInternal方法,我们来看engine调用start的逻辑
protected synchronized void startInternal() throws LifecycleException {
// Start our child containers, if any
Container children[] = findChildren();
List<Future<Void>> results = new ArrayList<>();
for (Container child : children) {
results.add(startStopExecutor.submit(new StartChild(child)));
}
// 启动管道
if (pipeline instanceof Lifecycle) {
((Lifecycle) pipeline).start();
}
}
private static class StartChild implements Callable<Void> {
private Container child;
public StartChild(Container child) {
this.child = child;
}
@Override
public Void call() throws LifecycleException {
child.start();
return null;
}
}
从代码中可以看出,在engine的start方法中其实就是用一个线程池并发的去startchild
组件,在启动完组件之后还需要让自己的pipeline
启动。
protected synchronized void startInternal() throws LifecycleException {
// Start the Valves in our pipeline (including the basic), if any
Valve current = first;
if (current == null) {
current = basic;
}
// 让管道里面的阀门启动
while (current != null) {
if (current instanceof Lifecycle) {
((Lifecycle) current).start();
}
current = current.getNext();
}
setState(LifecycleState.STARTING);
}
管道内部的启动也就是调用管道里面每一个valve
阀门的启动过程。具体细节不用太注意了,从刚才也可以看出来都是大同小异的东西。重点关注一下endpoint
的启动细节。
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// 真的工作的线程池
if (getExecutor() == null) {
createExecutor();
}
// 设置连接数限制
initializeConnectionLatch();
// 启动一个poller线程
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
// 启动一个 accept线程
startAcceptorThread();
}
}
// 创建线程池的方法
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
// 这里就是调优参数的 minSpareThreads 默认10 和 和 maxThreads 默认200
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
// 这里是tomcat调优的 maxConnections 参数 默认 8192
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) {
return null;
}
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections());
}
return connectionLimitLatch;
}
protected void startAcceptorThread() {
acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor";
acceptor.setThreadName(threadName);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
处理请求流程
Acceptor
我们先粗略的看了一下 endpoint的start方法,发现创建了一个线程池,默认初始大小是10,最大是200。然后又创建了一个poller线程和一个acceptor线程。所以接下来我们需要关注的是这些子线程都做了什么事情。我们先来看看Acceptor
的run方法
public void run() {
int errorDelay = 0;
long pauseStart = 0;
try {
while (!stopCalled) {
try {
//如果已经到达了最大连接数,就阻塞等待
endpoint.countUpOrAwaitConnection();
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
// 这就是服务端的socket accept到了一个客户端的连接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (!stopCalled && !endpoint.isPaused()) {
// 把客户端的socket注册到 selector中
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
}
}
} finally {
stopLatch.countDown();
}
state = AcceptorState.ENDED;
}
我们可以看到在acceptor中就是一个死循环一直去接受客户端的连接,注意!!!服务端接受客户端的连接是BIO!!!
endpoint.serverSocketAccept() 没有客户端连接进来是阻塞状态。收到客户端的socket之后就会调用 endpoint.setSocketOptions(socket)
方法。
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
// Set socket properties
// Disable blocking, polling will be used
socket.configureBlocking(false);
if (getUnixDomainSocketPath() == null) {
socketProperties.setProperties(socket.socket());
}
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
// 其他的都不用管,直接看到就是把这个socket让poller注册即可
poller.register(socketWrapper);
return true;
}
我们看到了 acceptor接受到了一个socket之后,会调用 poller的register方法
public void register(final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent event = null;
if (eventCache != null) {
event = eventCache.pop();
}
if (event == null) {
event = new PollerEvent(socketWrapper, OP_REGISTER);
} else {
event.reset(socketWrapper, OP_REGISTER);
}
// 添加事件
addEvent(event);
}
private final SynchronizedQueue<PollerEvent> events =
new SynchronizedQueue<>();
// 事件就是放到一个同步队列中去
private void addEvent(PollerEvent event) {
events.offer(event);
if (wakeupCounter.incrementAndGet() == 0) {
selector.wakeup();
}
}
我们可以看到,acceptor中就是阻塞的接受客户端的socket,然后不断的把socket放到 poller的阻塞队列中。至此acceptor的线程逻辑看完了,我们来看看poller的
Poller
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
// 从刚才register的事件中拉去socket然后注册到自己的selector中
hasEvents = events();
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// 如果有事件发生就会开始处理请求
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
if (socketWrapper != null) {
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
NioSocketWrapper socketWrapper = pe.getSocketWrapper();
SocketChannel sc = socketWrapper.getSocket().getIOChannel();
int interestOps = pe.getInterestOps();
try {
// 这就是 这就是把 selector和事件注册起来的方法, NIO原生方法
sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
}
return result;
}
我们可以看到在poller的run方法里面会去events中拉取数据,如果是Acceptor传递过来的socket就会注册到自己的selector中,并且也会监听selector上其他已经注册过的socket发出的事件然后进行处理。最终处理的方法是processSocket
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
// 拿到线程池!!!!
Executor executor = getExecutor();
if (dispatch && executor != null) {
// 让线程池执行
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
Worker
从上面的分析可以知道,poller就是管理者一个selector,注册acceptor传递过来的socket,并且把客户端发来的请求丢给 worker线程。那么我们现在就来看看worker线程的逻辑,也就是查看 SocketProcessorBase
的run方法逻辑。
protected void doRun() {
Poller poller = NioEndpoint.this.poller;
if (poller == null) {
socketWrapper.close();
return;
}
state = getHandler().process(socketWrapper, event);
}
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
S socket = wrapper.getSocket();
Processor processor = (Processor) wrapper.getCurrentProcessor();
if (processor == null) {
// 创建的processor是 http11processor
processor = getProtocol().createProcessor();
register(processor);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
}
}
state = processor.process(wrapper, status);
}
从上面的代码可以看出,每个请求在线程池中最终都是调用 Http11Processor
的 process方法
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
if (getLog().isDebugEnabled()) {
getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
}
state = dispatch(nextDispatch.getSocketStatus());
if (!dispatches.hasNext()) {
state = checkForPipelinedData(state, socketWrapper);
}
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
state = checkForPipelinedData(state, socketWrapper);
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ) {
// 走这里!!!
state = service(socketWrapper);
} else if (status == SocketEvent.CONNECT_FAIL) {
logAccess(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
从上面的代码可以看出process方法就是一通判断类型然后执行不同的方法,当读取数据的时候就是OPEN_READ
事件,所以执行service(socketWrapper)
方法
public SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !protocol.isPaused()) {
// Process the request in the adapter
getAdapter().service(request, response);
}
}
省略了一大堆代码之后只看关键代码就是调用了 adapter的service方法。这个adapter就是在 connector的init方法中设置的 CoyoteAdapter,所以我们看看CoyoteAdapter的service方法干了什么.
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
// 把请求封装成 rquest类型,然后把原生放到成员变量中
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// Set query string encoding
req.getParameters().setQueryStringCharset(connector.getURICharset());
}
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// 关键是这行,是调用了 engine的pipeline开始执行
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
上面代码可知CoyoteAdapter
就是封装了一次 request和response,然后开始执行 engine的pipeline获取第一个valve
开始执行。由于没有设置,所以走的是StandardEngineValve
的invoke
public final void invoke(Request request, Response response)
throws IOException, ServletException {
// Select the Host to be used for this Request
Host host = request.getHost();
if (host == null) {
// HTTP 0.9 or HTTP 1.0 request without a host when no default host
// is defined.
// Don't overwrite an existing error
if (!response.isError()) {
response.sendError(404);
}
return;
}
if (request.isAsyncSupported()) {
request.setAsyncSupported(host.getPipeline().isAsyncSupported());
}
// engine又掉了host的 invoke
host.getPipeline().getFirst().invoke(request, response);
}
后续的代码就是一路从 engine->host->context->wrapper 这么一路的调用链,最终停止在了StandardWrapperValve
public final void invoke(Request request, Response response)
throws IOException, ServletException {
// 创建了一个 servlet
servlet = wrapper.allocate();
// 创建了一个执行者链
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
// 执行filter
filterChain.doFilter(request.getRequest(),
response.getResponse());
}
从上面代码可以看出,就是创建了一个servlet,然后创建了一个filterchain然后调用
public void doFilter(ServletRequest request, ServletResponse response)
throws IOException, ServletException {
internalDoFilter(request,response);
}
private void internalDoFilter(ServletRequest request,
ServletResponse response)
throws IOException, ServletException {
// 执行filter链条
if (pos < n) {
ApplicationFilterConfig filterConfig = filters[pos++];
try {
Filter filter = filterConfig.getFilter();
if (request.isAsyncSupported() && "false".equalsIgnoreCase(
filterConfig.getFilterDef().getAsyncSupported())) {
request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
}
if( Globals.IS_SECURITY_ENABLED ) {
final ServletRequest req = request;
final ServletResponse res = response;
Principal principal =
((HttpServletRequest) req).getUserPrincipal();
Object[] args = new Object[]{req, res, this};
SecurityUtil.doAsPrivilege ("doFilter", filter, classType, args, principal);
} else {
filter.doFilter(request, response, this);
}
} catch (IOException | ServletException | RuntimeException e) {
throw e;
} catch (Throwable e) {
e = ExceptionUtils.unwrapInvocationTargetException(e);
ExceptionUtils.handleThrowable(e);
throw new ServletException(sm.getString("filterChain.filter"), e);
}
return;
}
// 所有filter执行结束之后就执行 servlet的service方法
servlet.service(request, response);
}
最后的servlet的service方法就是我们创建的servlet的service方法了。至此tomcat的调用流程也就结束了
上一篇: 如何启动和关闭Tomcat 9?