重写muduo之TcpConnection

目录

1、 TcpConnection.h

2、 TcpConnection.cc


1、 TcpConnection.h

TcpConnection底层绑定(管理)了一个Channel,Channel有事件被Poller通知后,会调用相应的回调,这些回调也是TcpConnection中包含的方法,将这些方法绑定了一下塞给channel作为回调,如果TcpConnection相应的底层对应的channel还在poller上注册着,还会感知到poller通知的事件,并且调用相应的回调,如果此时它对应的TcpConnection对象没有了(被人remove掉)怎么办?

tie使用弱智能指针记录,在处理事件的时候(handleEvent),肯定是被tie过,在这个方法里面将弱智能指针提升一下,如果不做任何的回调调用就说明TcpConnection这个对象已经没有了。

#pragma once
#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"

#include <memory>
#include <string>
#include <atomic>

class Channel;
class EventLoop;
class Socket;

/**
 * TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd
 * 
 * => TcpConnection 设置回调 =》 Channel => Poller =>Channel的回调操作
 * 
*/
class TcpConnection:noncopyable,public std::enable_shared_from_this<TcpConnection>//获得当前对象的智能指针
{
public:
    TcpConnection(EventLoop* loop,
                const std::string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
    ~TcpConnection();

    EventLoop* getLoop() const {return loop_;}
    const std::string& name() const{return name_;}
    const InetAddress& localAddress() const {return localAddr_;}
    const InetAddress& peerAddress() const {return peerAddr_;}

    bool connected() const {return state_==kConnected;}

    //发送数据
    void send(const std::string& buf);
    //关闭连接
    void shutdown();

    void setConnectionCallback(const ConnectionCallback& cb)
    {connectionCallback_=cb;}

    void setMessageCallback(const MessageCallback& cb)
    {messageCallback_=cb;}

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    {writeCompleteCallback_=cb;}

    void setHighWaterMarkCallback(const HighWaterMarkCallback& cb,size_t highWaterMark)
    {highWaterMarkCallback_=cb; highWaterMark_=highWaterMark;}

    void setCloseCallback(const CloseCallback& cb)
    {closeCallback_=cb;}

    //连接建立
    void connectEstablished();
    //连接销毁
    void connectDestroyed();
private:
    enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting};
    void setState(StateE state){state_=state;}

    void handleRead(Timestamp receiveTime);
    void handleWrite();
    void handleClose();
    void handleError();

    void sendInLoop(const void* message,size_t len);

    void shutdownInLoop();

    EventLoop* loop_;//这里绝对不是baseLoop,因为TcpConnection都是在subloop里面管理的
    const std::string name_;
    std::atomic_int state_;
    bool reading_;

    //这里和Acceptor类似  Acceptor属于mainLoop  TcpConnection属于subLoop
    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;

    const InetAddress localAddr_;
    const InetAddress peerAddr_;

    //回调
    ConnectionCallback connectionCallback_;//有新连接时的回调
    MessageCallback messageCallback_;//有读写消息时的回调
    WriteCompleteCallback writeCompleteCallback_;//消息发送完成以后的回调
    HighWaterMarkCallback highWaterMarkCallback_;//高水位回调
    CloseCallback closeCallback_;
    size_t highWaterMark_;//设置水位线高度
    
    //数据的缓冲区
    Buffer inputBuffer_;//接收数据的缓冲区
    Buffer outputBuffer_;//发送数据的缓冲区
};

2、 TcpConnection.cc

当在上层调用了某一connection的shutdown()方法,设置当前服务器端的状态是disconnecting,然后执行shutdownInLoop,因为shutdownInLoop关闭了socket的write端,poller就给channel通知了关闭事件,就回调TcpConnection::handclose()方法,handclose()方法相当于将channel所有的事件都去掉。

#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"

#include <functional>
#include <errno.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

static EventLoop *CheckLoopNotNull(EventLoop *loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d TcpConnection Loop is null!\n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

//已建立连接客户端跟服务器之间的联系
TcpConnection::TcpConnection(EventLoop *loop,
                const std::string &nameArg,
                int sockfd,
                const InetAddress &localAddr,
                const InetAddress &peerAddr)
    : loop_(CheckLoopNotNull(loop))
    ,name_(nameArg)
    ,state_(kConnecting)
    ,reading_(true)
    ,socket_(new Socket(sockfd))
    ,channel_(new Channel(loop,sockfd))
    ,localAddr_(localAddr)
    ,peerAddr_(peerAddr)
    ,highWaterMark_(64*1024*1024)//64M
{
    //下面给channel设置相应的回调函数,poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead,this,std::placeholders::_1)
    );
    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite,this)
    );
    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose,this)
    );
    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError,this)
    );

    LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
    socket_->setKeepAlive(true);//保活机制

}

TcpConnection::~TcpConnection()
{
    LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d\n",name_.c_str(),channel_->fd(),(int)state_);
}


void TcpConnection::send(const std::string& buf)
{
    if(state_==kConnected)
    {
        if(loop_->isInLoopThread())
        {
            sendInLoop(buf.c_str(),buf.size());
        }
        else
        {
            loop_->runInLoop(std::bind(
                &TcpConnection::sendInLoop,
                this,
                buf.c_str(),
                buf.size()
            ));
        }
    }
}

/**
 * 发送数据 应用写的快,而内核发送数据慢,需要把待发送数据写入缓冲区,而且设置了水位回调
*/
void TcpConnection::sendInLoop(const void* data,size_t len)
{
    ssize_t nwrote=0;
    size_t remaining=len;//没发送完的数据
    bool faultError=false;//是否发生错误

    //之前调用过该connection的shutdown,不能再进行发送了
    if(state_==kDisconnected)
    {
        LOG_ERROR("disconnected,give up writing!");
        return;
    }

    //表示channel_第一次开始写数据,而且缓冲区没有待发送数据
    if(!channel_->isWriting()&&outputBuffer_.readableBytes()==0)
    {
        nwrote=::write(channel_->fd(),data,len);
        if(nwrote>=0)//发送成功
        {
            remaining=len-nwrote;
            if(remaining==0&&writeCompleteCallback_)
            {
                //既然在这里数据全部发送完成,就不用再给channel设置epollout事件了
                loop_->queueInLoop(
                    std::bind(writeCompleteCallback_,shared_from_this())
                );
            }
        }
        else //nwrote<0
        {
            nwrote=0;
            //errno==EWOULDBLOCK  由于非阻塞,没有数据时正常的返回
            if(errno!=EWOULDBLOCK)//真正的错误
            {
                LOG_ERROR("TcpConnection::sendInLoop");
                if(errno==EPIPE||errno==ECONNRESET)// SIGPIPE  RESET 收到连接重置的请求
                {
                    faultError=true;
                }
            }
        }
    }
    //说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,然后给channel
    //注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,调用writeCallback_回调方法
    //也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成
    if(!faultError&&remaining>0)
    {
        //目前发送缓冲区剩余的待发送数据的长度
        size_t oldLen=outputBuffer_.readableBytes();
        if(oldLen+remaining>=highWaterMark_
            &&oldLen<highWaterMark_
            &&highWaterMarkCallback_)
        {
            loop_->queueInLoop(
                std::bind(highWaterMarkCallback_,shared_from_this(),oldLen+remaining)
            );//调用水位线回调
        }
        outputBuffer_.append((char*)data+nwrote,remaining);//将待发送数据添加到缓冲区中
        if(!channel_->isWriting())
        {
            channel_->enableWriting();//这里一定要注册channel的写事件,否则poller不会给channel通知epollout
        }
    }
}

// 关闭连接
void TcpConnection::shutdown()
{
    if(state_==kConnected)
    {
        setState(kDisconnecting);
        loop_->runInLoop(
            std::bind(&TcpConnection::shutdownInLoop,this)
        );
    }
}

void TcpConnection::shutdownInLoop()
{
    if(!channel_->isWriting())//说明outputBuffer中的数据已经全部发送完成
    {
        socket_->shutdownWrite();//关闭写端
    }
}

// 连接建立
void TcpConnection::connectEstablished()
{
    setState(kConnected);//设置连接成功状态,初始状态为kConnecting
    channel_->tie(shared_from_this());//绑定channel,让这个channel记录一下TcpConnection对象存活的状态
    channel_->enableReading();//同poller注册channel的epollin事件

    //新连接建立,执行回调
    connectionCallback_(shared_from_this());
}

// 连接销毁
void TcpConnection::connectDestroyed()
{
    if(state_==kConnected)
    {
        setState(kDisconnected);
        channel_->disableAll();//把channel的所有感兴趣的事件,从poller中del掉  相当于epoll_ctl
        connectionCallback_(shared_from_this());//断开连接
    }
    channel_->remove();//把channel从poller中删除掉
}

void TcpConnection::handleRead(Timestamp receiveTime)
{
    int savedErrno=0;
    ssize_t n=inputBuffer_.readFd(channel_->fd(),&savedErrno);
    if(n>0)
    {
        //已建立连接的用户,有可读事件发生了,调用用户传入的回调操作omMessage
        messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
    }
    else if(n==0)
    {
        handleClose();
    }
    else
    {
        errno=savedErrno;
        LOG_ERROR("TcpConnection::handleRead");
        handleError();
    }
}
void TcpConnection::handleWrite()
{
    if(channel_->isWriting())
    {
        int savedErrno=0;
        ssize_t n=outputBuffer_.writeFd(channel_->fd(),&savedErrno);
        if(n>0)
        {
            outputBuffer_.retrieve(n);
            if(outputBuffer_.readableBytes()==0)//发送完成
            {
                channel_->disableWriting();//由可写变为不可写
                if(writeCompleteCallback_)
                {
                    //唤醒loop_对应的thread线程,执行回调
                    loop_->queueInLoop(//loop一定在TcpConnection所对应的线程中
                        std::bind(writeCompleteCallback_,shared_from_this())
                    );
                }
                if(state_==kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleWrite");
        }
    }
    else//执行handlewrite,但channel并不是可写事件
    {
        LOG_ERROR("TcpConnection fd=%d is down,no more writing\n",channel_->fd());
    }
}

//poller=>channel::closeCallback=>TcpConnection::handleClose  回调
void TcpConnection::handleClose()
{
    LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
    setState(kDisconnected);
    channel_->disableAll();//将channel感兴趣的事件从poller上全部删除

    TcpConnectionPtr connPtr(shared_from_this());
    connectionCallback_(connPtr);//执行连接关闭的回调
    closeCallback_(connPtr);//关闭连接的回调   执行的是TcpServer::removeConnection回调方法
}
void TcpConnection::handleError()
{
    int optval;
    socklen_t optlen=sizeof optval;
    int err=0;
    if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen)<0)
    {
        err = errno;
    }
    else
    {
        err=optval;
    }
    LOG_ERROR("TcpConnection::handleError name:%s-SO_ERROR:%d \n",name_.c_str(),err);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/618226.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

牛客Java面试题【面试】

牛客Java面试题【面试】 前言推荐牛客Java面试题【面试】第2章 Java笔面试高频考点&解题技巧1. Java基础[2.1 一、java-基础-1](https://www.nowcoder.com/study/live/689/2/1)1.1 为什么Java代码可以实现一次编写、到处运行&#xff1f;1.2 一个Java文件里可以有多个类吗&…

我觉得这个域名证书监控平台又吊打Uptimekuma了

前面我们讲过uptimekuma 如何监控域名证书&#xff0c;很多人都喜欢 uptimekuma 那高端暗黑的色系上&#xff0c;然而最实用就是它的域名证书监控和历史可用性图表的展示上了&#xff0c;如下如&#xff1a; 但是这个东西吧&#xff0c;好看吗&#xff1f;好看&#xff0c;有用…

关键点检测——面部情绪数据集

引言 亲爱的读者们&#xff0c;您是否在寻找某个特定的数据集&#xff0c;用于研究或项目实践&#xff1f;欢迎您在评论区留言&#xff0c;或者通过公众号私信告诉我&#xff0c;您想要的数据集的类型主题。小编会竭尽全力为您寻找&#xff0c;并在找到后第一时间与您分享。 …

开源web在线数据库设计软件 —— 筑梦之路

GitHub - drawdb-io/drawdb: Free, simple, and intuitive online database design tool and SQL generator. 简介 DrawDB是一款多功能且用户友好的在线工具&#xff0c;允许用户轻松设计数据库实体关系。通过简单直观的界面&#xff0c;DrawDB使用户能够创建图表、导出SQL脚本…

【机器学习300问】86、简述超参数优化的步骤?如何寻找最优的超参数组合?

本文想讲述清楚怎么样才能选出最优的超参数组合。关于什么是超参数&#xff1f;什么是超参数组合&#xff1f;本文不赘述&#xff0c;在之前我写的文章中有详细介绍哦&#xff01; 【机器学习300问】22、什么是超参数优化&#xff1f;常见超参数优化方法有哪些&#xff1f;htt…

AcWing-168生日蛋糕-搜索/剪枝

题目 思路 表面积和体积公式&#xff1a;以下分析参考自&#xff1a;AcWing 168. 生日蛋糕【图解推导】 - AcWing&#xff1b;AcWing 168. 关于四个剪枝的最清楚解释和再次优化 - AcWing 代码 #include<iostream> #include<cmath> using namespace std;const in…

http协议 tomcat如何访问资源 servlet理论介绍

tomcat介绍 bin是启动命令&#xff1b; conf是配置&#xff0c;可以修改端口号&#xff1b; lib是依赖的jar包&#xff1b; logs是日志 webapps是重点&#xff0c;在这里新建我们自己的javaWeb项目 tomcat如何访问资源 tomcat通过统一资源定位符&#xff08;URL&#xff09;来…

数据分析——业务数据描述

业务数据描述 前言一、数据收集数据信息来源企业内部数据源市场调查数据源公共数据源和第三方数据源 二、公司内部数据客户资料数据销售明细数据营销活动数据 三、市场调查数据观察法提问法实验法 四、公共数据五、第三方数据六、数据预处理七、数据清洗丢弃部分数据补全缺失的…

安卓开发--新建工程,新建虚拟手机,按键事件响应(含:Android中使用switch-case遇到case R.id.xxx报错)

安卓开发--新建工程&#xff0c;新建虚拟手机&#xff0c;按键事件响应 1.前言2.运行一个工程2.1布局一个Button2.2 button一般点击事件2.2 button属性点击事件2.2 button推荐点击事件&#xff08;含&#xff1a;Android中使用switch-case遇到case R.id.xxx报错&#xff09; 本…

PD-L1表达与免疫逃逸和免疫响应

免疫检查点信号转导和癌症免疫治疗&#xff08;文献&#xff09;-CSDN博客https://blog.csdn.net/hx2024/article/details/137470621?ops_request_misc%257B%2522request%255Fid%2522%253A%2522171551954416800184136566%2522%252C%2522scm%2522%253A%252220140713.130102334.…

ollama离线安装,在CPU运行它所支持的哪些量化的模型

在线安装的链接: Download Ollama on LinuxGet up and running with large language models.https://ollama.com/download/linux 离线安装教程: 下载install.sh: https://ollama.ai/install.sh

logback日志持久化

1、问题描述 使用logback持久化记录日志。 2、我的代码 logback是Springboot框架里自带的&#xff0c;所以只要引入“spring-boot-starter”就行了。无需额外引入logback依赖。 pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns&…

docker(五):DockerFile

文章目录 DockerFile1、Dockerfile构建过程解析2、DockerFile常用保留字命令FROMMAINTAINERRUNEXPOSEWORKDIRUSERENVADDCOPYVOLUMECMDENTRYPOINT总结 3、案例 DockerFile 1、Dockerfile构建过程解析 官网文档&#xff1a;https://docs.docker.com/reference/dockerfile/ Dock…

【JavaScript】DOM 事件的传播机制

事件与事件流 事件&#xff0c;这里指和网页进行互动。比如点击链接&#xff0c;移动鼠标等网页被触发&#xff0c;做出响应&#xff0c;形成交互。 js 采用事件监听器来监听事件是否发生。 事件流 事件流描述了从页面中接收事件的顺序。当一个事件发生在某个元素上时&…

匿名管道及其应用

目录 一、什么是匿名管道&#xff1f; 三、创建与使用匿名管道 三、匿名管道的特点 匿名管道的四种情况 匿名管道的五种特性 四、匿名管道的实践应用---进程池 在编程的世界中&#xff0c;匿名管道是一种非常重要的通信机制。今天&#xff0c;让我们一起来深入探讨一下匿…

「 安全设计 」68家国内外科技巨头和安全巨头参与了CISA发起的安全设计承诺,包含MFA、默认密码、CVE、VDP等七大承诺目标

美国网络安全和基础设施安全局&#xff08;CISA&#xff0c;CyberSecurity & Infrastructure Security Agency&#xff09;于2024年5月开始呼吁企业是时候将网络安全融入到技术产品的设计和制造中了&#xff0c;并发起了安全设计承诺行动&#xff0c;该承诺旨在补充和建立现…

数据挖掘原理与应用------分类预测

在数据挖掘和机器学习领域&#xff0c;TPR&#xff08;True Positive Rate&#xff09;是指在实际为阳性的情况下&#xff0c;模型正确预测为阳性的比例。TPR也被称为灵敏度&#xff08;Sensitivity&#xff09;或召回率&#xff08;Recall&#xff09;。它是评估分类模型性能的…

【LeetCode算法】1768. 交替合并字符串

提示&#xff1a;此文章仅作为本人记录日常学习使用&#xff0c;若有存在错误或者不严谨得地方欢迎指正。 文章目录 一、题目二、思路三、解决方案 一、题目 给你两个字符串 word1 和 word2 。请你从 word1 开始&#xff0c;通过交替添加字母来合并字符串。如果一个字符串比另…

bash tab 补全报错 bash: syntax error near unexpected token `(‘

使用 vim 编辑文件时&#xff0c;敲下 vim xxx 后&#xff0c;再键入 tab 键报进行补全报错 bash: syntax error near unexpected token (. 打开 bash 的命令执行详情 set -v 定位到具体的代码&#xff1a; 显然&#xff0c;代码位于 bash 补全的逻辑当中。 定位代码具体的…

搭建属于自己的AI知识库

前言 最近在看一本书《在线》&#xff0c;将所有数据都需要在线&#xff0c;才有生命力&#xff0c;那么我们的知识库也是。我们现在就可以用先进的大预言模型搭建属于自己的在线 AI 知识库&#xff0c;他就是 ChatGLM 智谱清言智能体。 它可以将自己的知识库与 ChatGLM 结合&…