[prev in list] [next in list] [prev in thread] [next in thread]
List: mina-dev
Subject: Ask a question on the mina
From: <gw574813284 () sina ! com>
Date: 2015-04-15 14:15:33
Message-ID: 20150415141533.8AE7B718001 () webmail ! sinamail ! sina ! com ! cn
[Download RAW message or body]
[Attachment #2 (text/plain)]
Hi:I am in the process of using Mina, in a large number of concurrent test, will \
timeout situation,I set the readTimeOut to 54s, but still there will be individual \
session 1s timeout;The following code: package com.palmcity.bus.servers;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.palmcity.bus.code.TriffProtocolCodecFactory;
import com.palmcity.bus.handler.AcceptorHandler;
/**
* tcp中心服务器
* @author gouwei
*
*/
public class SocketAdapter extends AbstractProduct{
/**
* 缓冲区大小
*/
private static final int BUFFER_SIZE = 1024 * 500;
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
public Thread newThread(final Runnable r) {
return new Thread(null, r, "acceptor", 64 * 1024);
}
};
private OrderedThreadPoolExecutor executor;
public void startServer(){
try {
executor = new OrderedThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, \
THREAD_FACTORY);
acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() \
+ 1); acceptor.setReuseAddress(true);
acceptor.setBacklog(config.getMaxClinetNum());
acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
// acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, TIMEOUT);
acceptor.getSessionConfig().setReaderIdleTime(45);
//acceptor.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, TIMEOUT);
acceptor.getFilterChain().addLast("threadPool", new \
ExecutorFilter(executor)); //添 解 器 ,编 器
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new \
TriffProtocolCodecFactory())); //接收处理handler
acceptor.setHandler(new AcceptorHandler());
acceptor.bind(new InetSocketAddress(config.getPort()));
} catch (IOException e) {
logger.error("socket 服务启动异常!",e);
if(acceptor == null)
acceptor.dispose();
}
}
}
package com.palmcity.bus.handler;
import java.nio.ByteBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.palmcity.bus.conn.TCPConnectionFactory;
import com.palmcity.bus.conn.TcpConnection;
import com.palmcity.bus.data.BaseData;
import com.palmcity.bus.data.Command;
import com.palmcity.bus.data.CommandType;
import com.palmcity.bus.data.RepalyData;
import com.palmcity.bus.data.SrcRequestDataQueue;
import com.palmcity.bus.data.response.ResponseCode;
import com.palmcity.bus.handler.request.ReqLoginHandler;
import com.palmcity.bus.jdbc.TriffDBService;
import com.palmcity.bus.utils.Config;
import com.palmcity.bus.utils.MemcachedUtils;
import com.palmcity.bus.utils.SystemCached;
public class AcceptorHandler extends IoHandlerAdapter{
private static final Logger logger = \
LoggerFactory.getLogger(IoHandlerAdapter.class); protected TCPConnectionFactory \
tcpConnectionFactory = TCPConnectionFactory.getTcpConnectionFactory(); private final \
TriffDBService dbService = new TriffDBService(); private final String STOPMID = \
"STOPMID"; private final Config config = Config.getInstance();
public synchronized void sessionCreated(IoSession session) throws Exception {
logger.info("create session : "+ session.getId());
}
public synchronized void sessionOpened(IoSession session) throws Exception {
logger.info("open session : "+ session.getId());
}
public synchronized void sessionClosed(IoSession session) throws Exception {
logger.info("客户端链接以关闭,Address = " +session.getRemoteAddress() \
+" ,ID= "+ session.getAttribute(STOPMID)); if(session.getAttribute(STOPMID) != \
null){
TcpConnection tcpConn = \
tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID)); \
if(tcpConn !=null){ // tcpConn.setActive(false);
tcpConnectionFactory.remove(tcpConn.getStopMid());
MemcachedUtils.getInstance().delete(MemcachedUtils.STATE_CITYCODE_STOPMID+tcpConn.getStopMid());
// tcpConnectionFactory.put(tcpConn.getStopMid(), tcpConn);
dbService.shutdown(tcpConn.getNo(),tcpConn.getCityCode());
}
}
session.close(true);
}
public synchronized void sessionIdle(IoSession session, IdleStatus status) throws \
Exception { if (status == IdleStatus.READER_IDLE) {
logger.info("超过"+config.getTimeOut()+" \
秒没有收到客户端数据,关闭客户端链接,Address = " \
+session.getRemoteAddress() +" ,ID = "+session.getAttribute(STOPMID)); \
session.close(true); //future.awaitUninterruptibly();
}
/* if(bothIdleReceived && session.getAttribute(STOPMID) != null){
TcpConnection tcpConn = \
tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID)); \
if(tcpConn !=null){ tcpConn.setActive(false);
dbService.shutdown(tcpConn.getStopMid(),tcpConn.getCityCode());
}
}*/
//关闭链接
}
public synchronized void exceptionCaught(IoSession session, Throwable cause) \
throws Exception { // if (logger.isWarnEnabled()) {
logger.warn("system exception : ", cause);
//}
}
public synchronized void messageReceived(IoSession session, Object message) {
BaseData dataBean = (BaseData)message;
try{
TcpConnection conn = \
tcpConnectionFactory.getTcpConnection(dataBean.getStopMid()); \
//检查链接是否为登录操作 \
if(CommandType.login.toString().equals(dataBean.getCommand())){
//登录操作
TcpConnection tcpConnection = new TcpConnection();
tcpConnection.setSession(session);
session.setAttribute(STOPMID, dataBean.getStopMid());
ReqLoginHandler handler = new ReqLoginHandler();
handler.loginHandler(dataBean, tcpConnection);
tcpConnectionFactory.put(stopMid,tcpConn);
return ;
}else{
//其他操作
}
}
}catch(Exception ex){
logger.error("接收数据处理异常,msg = " + dataBean.toString());
//throw ex;
}
}
public void messageSent(IoSession session, Object message) throws Exception {
logger.debug("服务器发送消息:" + message );
}
public synchronized void inputClosed(IoSession session) throws Exception {
/*try{
if(session.getAttribute(STOPMID) != null){
TcpConnection tcpConn = \
tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID)); \
if(tcpConn !=null){ tcpConn.setActive(false);
dbService.shutdown(tcpConn.getStopMid(),tcpConn.getCityCode());
}
}
}catch(Exception ex){
logger.error(ex.getMessage(),ex);
}*/
session.close(true);
}
}
The log4j log:
2015-04-15 20:48:30 [INFO]-[TriffDecoder.java:63] \
recevier:{"type":"dataset","msgId":"21886b1b2a814e1b8944588bb1e6faa7","command":"login \
","cityCode":"130300","server":"192.168.3.101","stopMid":"1304100109","cipher":"5D7314ED6A7A820A0E8CF98ECA8D8A05"} \
2015-04-15 20:48:30 [INFO]-[AcceptorHandler.java:58] 超过45 \
秒没有收到客户端数据,关闭客户端链接,Address = /192.168.4.201:62557 \
,ID = 1304100109 2015-04-15 20:48:30 [INFO]-[AcceptorHandler.java:41] \
客户端链接以关闭,Address = null ,ID= 1304100109
I wonder why a large number of concurrent situation will cause this to happen? Using \
mina2.0.9;"1304100109" the client login time is 2015-04-15 20:48:30, and then Mina \
call the sessionIdle method to log time is 2015-04-15 20:48:30. Looking forward to \
your reply. 北京掌行通信息技术有限公司gouwei@chinatransinfo.com苟伟 \
18612215231
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic