目录
1、 TcpConnection.h
2、 TcpConnection.cc
1、 TcpConnection.h
TcpConnection底层绑定(管理)了一个Channel,Channel有事件被Poller通知后,会调用相应的回调,这些回调也是TcpConnection中包含的方法,将这些方法绑定了一下塞给channel作为回调,如果TcpConnection相应的底层对应的channel还在poller上注册着,还会感知到poller通知的事件,并且调用相应的回调,如果此时它对应的TcpConnection对象没有了(被人remove掉)怎么办?
tie使用弱智能指针记录,在处理事件的时候(handleEvent),肯定是被tie过,在这个方法里面将弱智能指针提升一下,如果不做任何的回调调用就说明TcpConnection这个对象已经没有了。
#pragma once
#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include <memory>
#include <string>
#include <atomic>
class Channel;
class EventLoop;
class Socket;
/**
* TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd
*
* => TcpConnection 设置回调 =》 Channel => Poller =>Channel的回调操作
*
*/
class TcpConnection:noncopyable,public std::enable_shared_from_this<TcpConnection>//获得当前对象的智能指针
{
public:
TcpConnection(EventLoop* loop,
const std::string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();
EventLoop* getLoop() const {return loop_;}
const std::string& name() const{return name_;}
const InetAddress& localAddress() const {return localAddr_;}
const InetAddress& peerAddress() const {return peerAddr_;}
bool connected() const {return state_==kConnected;}
//发送数据
void send(const std::string& buf);
//关闭连接
void shutdown();
void setConnectionCallback(const ConnectionCallback& cb)
{connectionCallback_=cb;}
void setMessageCallback(const MessageCallback& cb)
{messageCallback_=cb;}
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{writeCompleteCallback_=cb;}
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb,size_t highWaterMark)
{highWaterMarkCallback_=cb; highWaterMark_=highWaterMark;}
void setCloseCallback(const CloseCallback& cb)
{closeCallback_=cb;}
//连接建立
void connectEstablished();
//连接销毁
void connectDestroyed();
private:
enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting};
void setState(StateE state){state_=state;}
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const void* message,size_t len);
void shutdownInLoop();
EventLoop* loop_;//这里绝对不是baseLoop,因为TcpConnection都是在subloop里面管理的
const std::string name_;
std::atomic_int state_;
bool reading_;
//这里和Acceptor类似 Acceptor属于mainLoop TcpConnection属于subLoop
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
//回调
ConnectionCallback connectionCallback_;//有新连接时的回调
MessageCallback messageCallback_;//有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_;//消息发送完成以后的回调
HighWaterMarkCallback highWaterMarkCallback_;//高水位回调
CloseCallback closeCallback_;
size_t highWaterMark_;//设置水位线高度
//数据的缓冲区
Buffer inputBuffer_;//接收数据的缓冲区
Buffer outputBuffer_;//发送数据的缓冲区
};
2、 TcpConnection.cc
当在上层调用了某一connection的shutdown()方法,设置当前服务器端的状态是disconnecting,然后执行shutdownInLoop,因为shutdownInLoop关闭了socket的write端,poller就给channel通知了关闭事件,就回调TcpConnection::handclose()方法,handclose()方法相当于将channel所有的事件都去掉。
#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"
#include <functional>
#include <errno.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
static EventLoop *CheckLoopNotNull(EventLoop *loop)
{
if (loop == nullptr)
{
LOG_FATAL("%s:%s:%d TcpConnection Loop is null!\n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
//已建立连接客户端跟服务器之间的联系
TcpConnection::TcpConnection(EventLoop *loop,
const std::string &nameArg,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr)
: loop_(CheckLoopNotNull(loop))
,name_(nameArg)
,state_(kConnecting)
,reading_(true)
,socket_(new Socket(sockfd))
,channel_(new Channel(loop,sockfd))
,localAddr_(localAddr)
,peerAddr_(peerAddr)
,highWaterMark_(64*1024*1024)//64M
{
//下面给channel设置相应的回调函数,poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead,this,std::placeholders::_1)
);
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite,this)
);
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose,this)
);
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError,this)
);
LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
socket_->setKeepAlive(true);//保活机制
}
TcpConnection::~TcpConnection()
{
LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d\n",name_.c_str(),channel_->fd(),(int)state_);
}
void TcpConnection::send(const std::string& buf)
{
if(state_==kConnected)
{
if(loop_->isInLoopThread())
{
sendInLoop(buf.c_str(),buf.size());
}
else
{
loop_->runInLoop(std::bind(
&TcpConnection::sendInLoop,
this,
buf.c_str(),
buf.size()
));
}
}
}
/**
* 发送数据 应用写的快,而内核发送数据慢,需要把待发送数据写入缓冲区,而且设置了水位回调
*/
void TcpConnection::sendInLoop(const void* data,size_t len)
{
ssize_t nwrote=0;
size_t remaining=len;//没发送完的数据
bool faultError=false;//是否发生错误
//之前调用过该connection的shutdown,不能再进行发送了
if(state_==kDisconnected)
{
LOG_ERROR("disconnected,give up writing!");
return;
}
//表示channel_第一次开始写数据,而且缓冲区没有待发送数据
if(!channel_->isWriting()&&outputBuffer_.readableBytes()==0)
{
nwrote=::write(channel_->fd(),data,len);
if(nwrote>=0)//发送成功
{
remaining=len-nwrote;
if(remaining==0&&writeCompleteCallback_)
{
//既然在这里数据全部发送完成,就不用再给channel设置epollout事件了
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
}
else //nwrote<0
{
nwrote=0;
//errno==EWOULDBLOCK 由于非阻塞,没有数据时正常的返回
if(errno!=EWOULDBLOCK)//真正的错误
{
LOG_ERROR("TcpConnection::sendInLoop");
if(errno==EPIPE||errno==ECONNRESET)// SIGPIPE RESET 收到连接重置的请求
{
faultError=true;
}
}
}
}
//说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,然后给channel
//注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,调用writeCallback_回调方法
//也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成
if(!faultError&&remaining>0)
{
//目前发送缓冲区剩余的待发送数据的长度
size_t oldLen=outputBuffer_.readableBytes();
if(oldLen+remaining>=highWaterMark_
&&oldLen<highWaterMark_
&&highWaterMarkCallback_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_,shared_from_this(),oldLen+remaining)
);//调用水位线回调
}
outputBuffer_.append((char*)data+nwrote,remaining);//将待发送数据添加到缓冲区中
if(!channel_->isWriting())
{
channel_->enableWriting();//这里一定要注册channel的写事件,否则poller不会给channel通知epollout
}
}
}
// 关闭连接
void TcpConnection::shutdown()
{
if(state_==kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(
std::bind(&TcpConnection::shutdownInLoop,this)
);
}
}
void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriting())//说明outputBuffer中的数据已经全部发送完成
{
socket_->shutdownWrite();//关闭写端
}
}
// 连接建立
void TcpConnection::connectEstablished()
{
setState(kConnected);//设置连接成功状态,初始状态为kConnecting
channel_->tie(shared_from_this());//绑定channel,让这个channel记录一下TcpConnection对象存活的状态
channel_->enableReading();//同poller注册channel的epollin事件
//新连接建立,执行回调
connectionCallback_(shared_from_this());
}
// 连接销毁
void TcpConnection::connectDestroyed()
{
if(state_==kConnected)
{
setState(kDisconnected);
channel_->disableAll();//把channel的所有感兴趣的事件,从poller中del掉 相当于epoll_ctl
connectionCallback_(shared_from_this());//断开连接
}
channel_->remove();//把channel从poller中删除掉
}
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno=0;
ssize_t n=inputBuffer_.readFd(channel_->fd(),&savedErrno);
if(n>0)
{
//已建立连接的用户,有可读事件发生了,调用用户传入的回调操作omMessage
messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
}
else if(n==0)
{
handleClose();
}
else
{
errno=savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}
void TcpConnection::handleWrite()
{
if(channel_->isWriting())
{
int savedErrno=0;
ssize_t n=outputBuffer_.writeFd(channel_->fd(),&savedErrno);
if(n>0)
{
outputBuffer_.retrieve(n);
if(outputBuffer_.readableBytes()==0)//发送完成
{
channel_->disableWriting();//由可写变为不可写
if(writeCompleteCallback_)
{
//唤醒loop_对应的thread线程,执行回调
loop_->queueInLoop(//loop一定在TcpConnection所对应的线程中
std::bind(writeCompleteCallback_,shared_from_this())
);
}
if(state_==kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else//执行handlewrite,但channel并不是可写事件
{
LOG_ERROR("TcpConnection fd=%d is down,no more writing\n",channel_->fd());
}
}
//poller=>channel::closeCallback=>TcpConnection::handleClose 回调
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
setState(kDisconnected);
channel_->disableAll();//将channel感兴趣的事件从poller上全部删除
TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr);//执行连接关闭的回调
closeCallback_(connPtr);//关闭连接的回调 执行的是TcpServer::removeConnection回调方法
}
void TcpConnection::handleError()
{
int optval;
socklen_t optlen=sizeof optval;
int err=0;
if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen)<0)
{
err = errno;
}
else
{
err=optval;
}
LOG_ERROR("TcpConnection::handleError name:%s-SO_ERROR:%d \n",name_.c_str(),err);
}