Canal安装使用

一 Canal介绍

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

背景
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析了;

工作原理
在这里插入图片描述
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流);
在这里插入图片描述

二 linux下安装配置启动Canal

下载
首先,下载canal安装包:canal下载地址
在这里插入图片描述
安装步骤
1、创建一个canal文件夹,上传压缩包并解压
2、修改canal配置文件

vi conf/example/instance.properties

在这里插入图片描述
3、修改mysql配置

vi /etc/my.cnf 
log-bin=mysql-bin             #添加这一行就ok 
binlog-format=ROW           #选择row模式 
server-id=1       #配置mysql replaction需要定义,不能和canal的slaveI重复
binlog-do-db=micromall

4、执行mysql 创建canal用户

create user canal identified by 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

查看是否授权成功:

select * from user  where  user='canal'

canal的工作原理

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal );
  3. canal 解析 binary log 对象(原始为 byte 流);

5、启动canal

cd bin
./startup.sh

三 测试Canal

逻辑:
mysql开启主从同步
启动Canal,拿到mysql的binlog日志
消息中间件获取binlog日志
添加依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

修改数据库的值,发现日志里有新的输出
在这里插入图片描述

四 接入中间件

目的:修改mysql数据后,把消息同步到RocketMQ

修改instance 配置文件

vi conf/example/instance.properties
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

instance.properties完整版

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
#canal.instance.master.address=192.168.65.232:3306
canal.instance.master.address=192.168.65.232:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=micromall
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=micromall.pms_product,micromall.sms_flash_promotion_product_relation
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=productDetailChange
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

修改canal 配置文件

vi /usr/local/canal/conf/canal.properties
##################################################
#########                    MQ     #############
##################################################
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 192.168.1.150:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
#消息生产组名
canal.mq.producerGroup = Canal-Producer
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 30
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =

canal.properties完整版

#################################################
######### 		common argument		############# 
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
##################################################
#########                    MQ     #############
##################################################
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
#消息生产组名
canal.mq.producerGroup = canalProducer
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 30
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =

参数说明:
在这里插入图片描述

五 Canal内部原理

在这里插入图片描述

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)
mysql> show variables like 'binlog_format';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.00 sec)

解析开始:
com.alibaba.otter.canal.parse.inbound.AbstractEventParser#start

六 Canal集群高可用

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.

canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。
在这里插入图片描述
大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/445276.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

人工智能课题、模型源码

人工智能研究生毕业&#xff5e;深度学习、计算机视觉、时间序列预测&#xff08;LSTM、GRU、informer系列&#xff09;、python、人工智能项目代做和指导&#xff0c;各种opencv图像处理、图像分类模型&#xff08;vgg、resnet、mobilenet、efficientnet等&#xff09;、人脸检…

什么是Python网络爬虫

Python网络爬虫是一种自动化获取网页内容的程序。它可以通过发送HTTP请求&#xff0c;获取网页的HTML代码&#xff0c;并从中提取所需的数据。下面是一个简单的概述&#xff0c;帮助您理解Python网络爬虫的基本原理。 导入所需的库&#xff1a; 在Python中&#xff0c;我们可以…

大数据工程师的日常工作内容是干嘛?

前言 随着数字化时代的来临&#xff0c;大数据已经成为了许多领域不可或缺的重要资源。而大数据工程师掌握着处理、分析和应用大数据的核心技能。那么&#xff0c;大数据工程师的日常工作内容到底是什么呢&#xff1f; 我曾在智慧交通、用户画像及推荐、运营分析、平台研发等…

【异常 - 错误的更优解决方案】

目录&#xff1a; 前言异常&#xff08;一&#xff09; c语言原有的错误处理方式&#xff08;二&#xff09; 异常的概念&#xff08;三&#xff09;异常的使用1.异常的抛出与捕捉2.函数调用链中异常栈的展开原则 &#xff08;四&#xff09;5组测试及对应结论1.常规测试2.异常…

【布局:1688,阿里海外的新筹码?】1688重新布局跨境海外市场:第一步开放1688API数据采集接口

2023年底&#xff0c;阿里巴巴“古早”业务1688突然成为“重头戏”&#xff0c;尤其宣布正式布局跨境业务的消息&#xff0c;一度引发电商圈讨论。1688重新布局跨境海外市场&#xff1a;第一步开放1688API数据采集接口 2023年11月中旬&#xff0c;阿里财报分析师电话会上&…

保姆级讲解字符串函数(下篇)

目录 strtok的使用 strerror的使用 strstr的使用和函数模拟实现 strstr的使用 strstr函数模拟实现 接上篇&#xff1a;保姆级讲解字符串函数&#xff08;上篇&#xff09;&#xff0c;我们接着把剩下三个函数讲解完&#xff0c;继续跟着我的步伐一起学习呀. strtok的使用 …

2024蓝桥杯每日一题(多路归并)

一、第一题&#xff1a;鱼塘钓鱼 解题思路&#xff1a;多路归并优先队列 首先枚举能走到的距离然后再用优先队列将最大的值累加 【Python程序代码】 from heapq import * n int(input()) a [0] list(map(int,input().split())) b [0] list(map(int,input().spli…

解决ChatGPT发送消息没有反应

ChatGPT发消息没反应 今天照常使用ChatGPT来帮忙码代码&#xff0c;结果发现发出去的消息完全没有反应&#xff0c;即不给我处理&#xff0c;也没有抱任何的错误&#xff0c;按浏览器刷新&#xff0c;看起来很正常&#xff0c;可以查看历史对话&#xff0c;但是再次尝试还是一…

php集成修改数据库的字段

1.界面效果 2.代码 <?phpecho <form action"" method"post"><label for"table">表名:</label><input type"text" id"table" name"table"><br><div id"fieldsContaine…

在 .NET 项目中复制资源文件夹到生成目录

本文主要介绍在使用 Visual Studio 进行调试和发布时&#xff0c;如何在 .NET 项目中复制资源文件夹到生成目录。 1. 背景 在开发 .NET 项目的过程中&#xff0c;我们有时会遇到需要在 debug 、 release 或是发布时将资源文件夹复制到生成目录的需求。这些资源可能包括图片、配…

Mybaties-Plus saveBatch()、自定义批量插入、多线程批量插入性能测试和对比

一.背景 最近在做一个项目的时候&#xff0c;由于涉及到需要将一个系统的基础数据全量同步到另外一个系统中去&#xff0c;结果一看&#xff0c;基础数据有十几万条&#xff0c;作为小白的我&#xff0c;使用单元测试&#xff0c;写了一段代码&#xff0c;直接采用了MP(Mybati…

【Python编程基础6/6】双向选择的判断

目录 知识回顾 导入 if-else 执行顺序 特性 两种判断语句的对比 就近原则 空值 定义 非空 定义 在判断语句中的关系 应用场景 练习 Debug 总结 知识回顾 在上节课中&#xff0c;我们学习了 if 判断&#xff0c;如果布尔表达式成立&#xff0c;就执行后面的代码块…

数据结构(二)——线性表(顺序表)

二、线性表 2.1线性表的定义和基本操作 2.1.1 线性表的基本概念 线性表&#xff1a;是具有相同数据类型的 n 个数据元素的有限序列。(Eg:所有的整数按递增次序排列&#xff0c;不是顺序表&#xff0c;因为所有的整数是无限的)其中n为表长&#xff0c;当n0时线性表是一个空表…

kali当中不同的python版本切换(超简单)

kali当中本身就是自带两个python版本的 配置 update-alternatives --install /usr/bin/python python /usr/bin/python2 100 update-alternatives --install /usr/bin/python python /usr/bin/python3 150 切换版本 update-alternatives --config python 0 1 2编号选择一个即可…

人才推荐 | 高级半导体工艺工程师,美国凯斯西储大学电化学博士

编辑 / 木子 审核 / 朝阳 伟骅英才 伟骅英才致力于以大数据、区块链、AI人工智能等前沿技术打造开放的人力资本生态&#xff0c;用科技解决职业领域问题&#xff0c;提升行业数字化服务水平&#xff0c;提供创新型的产业与人才一体化服务的人力资源解决方案和示范平台&#x…

uniapp 云开发笔记

uniapp云开发官方文档https://uniapp.dcloud.io/uniCloud/learning.html 新建 关联云空间 云函数获取用户openID uniCloud API列表https://uniapp.dcloud.io/uniCloud/cf-functions.html#unicloud-api%E5%88%97%E8%A1%A8 自建云函数login event中包含前端传来的参数 uniCloud.…

LeetCode刷题笔记之两数相加【数组】【中等】

两数相加 刷题笔记 &#x1f565;日期&#xff1a; 2024/03/09 题目描述&#xff1a; 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同…

mysql 性能优化——磁盘刷脏页性能优化

前言 大家是不是感觉mysql 更新挺快的呀&#xff0c;有没有想过mysql 更新为什么那么快。按道理说&#xff0c;mysql 更新都是先找到这一行数据&#xff0c;然后在去更新。意味着&#xff0c;就有两次磁盘操作&#xff0c;一个是磁盘读&#xff0c;一个是磁盘写。如果真的是这…

使用 SPL 高效实现 Flink SLS Connector 下推

作者&#xff1a;潘伟龙&#xff08;豁朗&#xff09; 背景 日志服务 SLS 是云原生观测与分析平台&#xff0c;为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务&#xff0c;基于日志服务的便捷的数据接入能力&#xff0c;可以将系统日志、业务日志等接入 …

【鸿蒙开发】第十七章 Web组件(一)

1 Web概述 Web组件用于在应用程序中显示Web页面内容&#xff0c;为开发者提供页面加载、页面交互、页面调试等能力。 页面加载&#xff1a;Web组件提供基础的前端页面加载的能力&#xff0c;包括&#xff1a;加载网络页面、本地页面、html格式文本数据。 页面交互&#xff1a…