superset 二开增加 flink 数据源连接通过flink sql 查询数据

前言

superset 目前还不支持 flink 的数据源连接,目前我们公司在探索使用数据湖那一套东西:

  • 使用 flink 作为计算引擎
  • 使用 paimon + oss对象存储对接 flink 作为底层存储
  • 使用 superset 通过 flink gateway 查询 paimon 数据形成报表

增加flink数据源

界面配置

image.png
我们通过添加其他数据源连接来增加 flink 的数据源连接。
image.png
在填写 SQL_ALCHEMY_URI 的时候这里的 driver需要注意,后边在二开代码的时候,需要根据这个 driver 识别到不同的 engine。
我们是通过 flink gateway 提供的 HTTP 接口来进行 flink sql 查询的,所以这里的 host, port 就是 flink gateway 的地址。
在添加连接的时候必须指定 catalog,不然在 superset 的 sqllab 左下侧就没法显示对应的 databases 和 tables。
如果我们的连接需要一些额外参数,可以通过右侧的进阶添加一些额外的参数,在业务代码里使用:
image.png
我这里就指定了该连接使用的 catalog, 以及每次执行 sqllab 查询的时候初始化的一些命令。

代码开发

定义 flink.py 文件

我们需要在 superset/superset/db_engine_specs目录下新增一个 flink.py文件包含三个类:

  • FlinkClient: 用于和 flink gateway 交互执行 flink sql。
  • FlinkEngine: 模拟 mysql 的 cursor, 在一个 cursor 实例的生命周期内,就是和 flink gateway 的session 生命周期,当cursor 结束时,就是断开 session 的时候。
  • FlinkEngineSpec: 继承 superset 自身的 BaseEngineSpect, superset 的业务代码需要通过该类执行 sql 和查询结果。

FlinkClient

import logging
from typing import Any, Dict, Optional, Tuple, List, Union, Set
import time
import re

import requests
import sqlparse
from sqlalchemy import types, select
from sqlalchemy.orm import Session
from sqlalchemy.sql import text
from sqlalchemy.engine import  Engine

from superset.models.core import Database
from superset.config import FLINK_HOST
from superset.db_engine_specs.base import BaseEngineSpec
from superset.models.sql_lab import Query

logger = logging.getLogger(__name__)


class FlinkClient:
    result_type = {
        "NOT_READY": "NOT_READY",   # 表明 sql 还在执行中
        "PAYLOAD": "PAYLOAD",   # 表明 sql 已经在 flink 集群上执行了,需要 client 循环获取结果
        "EOS": "EOS"            # 表明已经获取到 sql 执行结果了,可以退出循环
    }

    result_kind = {
        "SUCCESS_WITH_CONTENT": "SUCCESS_WITH_CONTENT",     # 执行的是查询结果的 sql
        "SUCCESS": "SUCCESS"                                # 执行的是命令
    }

    def __init__(self, **kwargs):
        self.session_id = None
        self.operation_ids = []

        # 添加连接时在额外参数中填写的初始化命令
        # 在执行 sql 前会先执行话初始化命令
        self.init_commands = kwargs.get("init_commands", [])

        # FLINK_HOST 就是 flink gateway 的地址,我是从环境变量中获取的
        self.get_session_url = FLINK_HOST + "/v1/sessions", "POST"
        self.execute_statement_url = FLINK_HOST + "/v1/sessions/{SESSION_ID}/statements/", "POST"
        self.fetch_result_url = FLINK_HOST + "/v1/sessions/{SESSION_ID}/operations/{OPERATION_ID}/result/{BATCH_NUM}", "GET"

        self.kwargs = kwargs

    def __enter__(self):
        # 使用上下文模式,调用的时候获取 session 和执行初始化命令
        self.get_session()
        for c in self.init_commands:
            operation_id = self.execute(c)
            self.fetch_result(operation_id=operation_id)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            logger.error(f"flink gateway got error: {exc_type}, {exc_value}")
        return False

    def handle_request(self,
                       url: str,
                       method: str,
                       form_data: Dict[str, Any] = None,
                       json_data: Dict[str, Any] = None,
                       params: Dict[str, Any] = None,
                       headers: Dict[str, Any] = None,
                       timeout: Tuple[int, ...] = (10, 60)) -> Dict[str, Any]:
        try:
            kwargs = {
                "timeout": timeout,
                "headers": {"Content-Type": "application/json"}
            }
            if form_data:
                kwargs["data"] = form_data
            if params:
                kwargs["params"] = params
            if json_data:
                kwargs["json"] = json_data
            if headers:
                kwargs["headers"].update(headers)

            # logger.info(f"request to flink gateway url: {url}")
            # logger.info(f"request to flink gateway kwargs: {kwargs}")

            if method == 'GET':
                response = requests.get(url, **kwargs)
            elif method == 'POST':
                response = requests.post(url, **kwargs)
            elif method == 'PUT':
                response = requests.put(url, **kwargs)
            elif method == 'DELETE':
                response = requests.delete(url, **kwargs)
            else:
                raise ValueError("Unsupported HTTP method")

            response.raise_for_status()

            res = {
                'status_code': response.status_code,
                'headers': dict(response.headers),
                'data': response.json()
            }
            # logger.info(f"flink gateway response: {res}")

            return res
        except Exception as e:
            logger.error(f"flink gateway res error: {str(e)}")
            raise e

    def get_session(self):
        res = self.handle_request(self.get_session_url[0], self.get_session_url[1])
        self.session_id = res['data'].get('sessionHandle')

    def ping(self):
        operation_id = self.execute("select 1")
        return True if self.fetch_result(operation_id=operation_id) else False

    def execute(self, statement: str):
        # 执行 flink sql
        data = {"statement": statement}
        res = self.handle_request(
            self.execute_statement_url[0].format(SESSION_ID=self.session_id),
            self.execute_statement_url[1],
            json_data=data)
        self.operation_ids.append(res['data'].get('operationHandle'))
        return res['data'].get('operationHandle')

    def fetch_result(self, batch_num: int = 0, operation_id: str =None) -> Dict[str, Any]:
        """
        通过 flink gateway 获取执行结果:gateway 将 sql 提交至集群后返回 PAYLOAD 状态表示提交成功,
            否则返回 NOT_READY 状态。
        当提交至集群成功后,如果执行的 sql 是查询内容的,需要通过 batch_num(nextResultUri) 不断循环请求执行结果,
        直到 gateway 返回 EOS 状态,表示集群执行完毕,获取结果完毕。
        """
        url = self.fetch_result_url[0].format(SESSION_ID=self.session_id,
                                              OPERATION_ID=operation_id,
                                              BATCH_NUM=batch_num)
        res_data = []
        res = self.handle_request(url, self.fetch_result_url[1])
        #  后续考虑是否做成从环境变量中获取超时时间,且超时后是否考虑杀死集群上执行的任务
        timeout = 300  # flink gateway 提交任务至 session 集群超时时间
        while timeout and res['data']['resultType'] == self.result_type['NOT_READY']:
            time.sleep(1)
            timeout -= 1
            res = self.handle_request(url, self.fetch_result_url[1])

        # 等待集群执行完毕,获取结果
        if res['data']['resultKind'] == self.result_kind['SUCCESS_WITH_CONTENT']:
            timeout = 3600  # flink gateway 从集群获取结果超时时间
            while timeout and res['data']['resultType'] != self.result_type['EOS']:
                time.sleep(3)
                timeout -= 1
                res_data.extend(res['data']['results']['data'])
                logger.info(f"jobID: {res['data'].get('jobID')} waiting for result")
                next_result_url = FLINK_HOST + res['data']['nextResultUri']
                res = self.handle_request(next_result_url, self.fetch_result_url[1])
            res['data']['results']['data'] = res_data
        return res['data']['results']

    def get_schema_names(self, catalog: str) -> List[str]:
        operation_id = self.execute(f"use catalog {catalog}")
        self.fetch_result(operation_id=operation_id)
        operation_id = self.execute(f"show databases")
        res = self.fetch_result(operation_id=operation_id)
        return [i['fields'][0] for i in res['data']]

    def get_table_names(self, catalog: str, schema: str) -> List[str]:
        operation_id = self.execute(f"use catalog {catalog}")
        self.fetch_result(operation_id=operation_id)
        operation_id = self.execute(f"use {schema}")
        self.fetch_result(operation_id=operation_id)
        operation_id = self.execute("show tables")
        res = self.fetch_result(operation_id=operation_id)
        return [i['fields'][0] for i in res['data']]

    def get_columns(self, catalog: str, schema: str, table_name: str) -> List[str]:
        operation_id = self.execute(f"use catalog {catalog}")
        self.fetch_result(operation_id=operation_id)
        operation_id = self.execute(f"use {schema}")
        self.fetch_result(operation_id=operation_id)
        operation_id = self.execute(f"desc {table_name}")
        res = self.fetch_result(operation_id=operation_id)
        return [i['fields'] for i in res['data']]

这里与 flink gateway 交互获取结果有个需要注意的地方就是,我们将执行 sql 提交至 flink gateway 后, gateway resultType 会很快返回 PAYLOAD状态,这个时候不代表 sql 执行完了,代表的是集群在执行中了,我们可以阻塞获取执行结果了,然后我们在阻塞获取结果,当状态变为 EOS的时候,代表我们获取到了结果了,这个时候可以退出阻塞了。
官方的流程图说明如下:
image.png
这里需要了解下我们的客户端通过 HTTP 接口与 gateway 交互的流程,不熟悉的可以先通过官方文档了解下:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql-gateway/overview/

FlinkEngine
FlinkEngine 模拟的是类似 sql dialect 的 cursor, 通过该方法可以返回一个连接 flink gateway 的 client,在 cursor 的整个实例生命周期内使用的是同一个 FlinkClient 的 session。

class FlinkEngine:

    def __init__(self, catalog: str, schema: str = None,
                 init_commands: List[str] = None, **kwargs):
        self.catalog = catalog
        self.schema = schema
        self.init_commands = init_commands if init_commands else []
        self.kwargs = kwargs

        self.client: Optional[FlinkClient] = None

        self.columns = None

    @property
    def engine(self) -> 'FlinkEngine':
        return self

    def raw_connection(self) -> 'FlinkEngine':
        # 实例化 flink clint 生成一个 session, 当 cursor 结束时重置 client
        with FlinkClient(catalog=self.catalog, init_commands=self.init_commands) as c:
            self.client = c
        return self

    def cursor(self) -> 'FlinkEngine':
        """Return a new :py:class:`Cursor` object using the connection."""
        return self

    def close(self):
        self.client = None
        self.columns = None

    def commit(self):
        """Presto does not support transactions"""
        pass

    @property
    def description(self):
        """This read-only attribute is a sequence of 7-item sequences.

           Each of these sequences contains information describing one result column:

           - name
           - type_code
           - display_size (None in current implementation)
           - internal_size (None in current implementation)
           - precision (None in current implementation)
           - scale (None in current implementation)
           - null_ok (always True in current implementation)

           The ``type_code`` can be interpreted by comparing it to the Type Objects specified in the
           section below.
           """
        if self.columns is None:
            return None
        return [
            # name, type_code, display_size, internal_size, precision, scale, null_ok
            (col['name'], col['type'], None, None, None, None, True)
            for col in self.columns
        ]

FlinkEngineSpec
该方法需要继承 superset 的 BaseEngineSpec, 需要定义 engine 信息和 drivers 信息, 在 superset 的 sqllab 执行 sql 的时候会通过 drivers 定位到该方法执行。
所以在前边界面配置的时候需要注意连接信息中的 driver_name 要和类属性 drivers 匹配。

class FlinkEngineSpec(BaseEngineSpec):
    engine = "flink"
    engine_name = "Apache Flink"

    # 我们后边在业务代码中会通过判断连接的 driver_name 是否为 flink_driver 来调用该类中的方法
    # 因此需要注意前端界面配置是否一致
    drivers = {"flink_driver": "flink gateway engine"}
    default_driver = "flink_driver"

    client_init_commands = []

    @classmethod
    def get_schema_names(cls, catalog: str) -> List[str]:
        with FlinkClient(init_commands=cls.client_init_commands) as c:
            names = c.get_schema_names(catalog)
        return names

    @classmethod
    def get_table_names(cls, catalog: str, schema: str, database=None) -> List[str]:
        with FlinkClient(init_commands=cls.client_init_commands) as c:
            names = c.get_table_names(catalog, schema)
        return names

    @classmethod
    def get_view_names(cls, catalog: str, schema: str, database=None) -> Set[str]:
        return set()

    @classmethod
    def get_columns(cls, catalog: str, schema: str, table: str) -> List[Dict[str, Any]]:
        with FlinkClient(init_commands=cls.client_init_commands) as c:
            cs = c.get_columns(catalog, schema, table)
        result: List[Dict[str, Any]] = []
        for column in cs:
            column_spec = cls.get_column_spec(column[1])
            column_type = column_spec.sqla_type if column_spec else None
            if column_type is None:
                column_type = types.String()
            c = {
                "name": column[0],
                "type": column_type,
                "nullable": column[2],
                "default": None,
                "key": column[3]
            }

            try:
                c.update({"comment": column[6]})
            except Exception:
                pass
            result.append(c)
        return result

    @classmethod
    def get_pk_constraint(cls, catalog: str, schema: str, table: str) -> Dict[str, Any]:
        with FlinkClient(init_commands=cls.client_init_commands) as c:
            cs = c.get_columns(catalog, schema, table)
        pks = {"constrained_columns": None, "name": None}
        for column in cs:
            _type = column[3]
            if isinstance(_type, str) and _type.startswith("PRI"):
                matches = re.findall(r'\((.*?)\)', _type)
                pks["constrained_columns"] = [field.strip() for field in matches[0].split(',')]
                break
        return pks

    @classmethod
    def select_star(  # pylint: disable=too-many-arguments,too-many-locals
        cls,
        database: Database,
        table_name: str,
        engine: Engine,
        schema: Optional[str] = None,
        limit: int = 100,
        show_cols: bool = False,
        indent: bool = True,
        latest_partition: bool = True,
        cols: Optional[List[Dict[str, Any]]] = None,
    ) -> str:
        fields: Union[str, List[Any]] = "*"
        cols = cols or []
        if (show_cols or latest_partition) and not cols:
            cols = database.get_columns(table_name, schema)
        if show_cols:
            fields = cls._get_fields(cols)

        if schema:
            full_table_name = f"{schema}.{table_name}"
        else:
            full_table_name = f"{table_name}"

        qry = select(fields).select_from(text(full_table_name))

        if limit:
            qry = qry.limit(limit)
        if latest_partition:
            partition_query = cls.where_latest_partition(
                table_name, schema, database, qry, columns=cols
            )
            if partition_query is not None:
                qry = partition_query

        sql = str(qry.compile(compile_kwargs={"literal_binds": True}))

        if indent:
            sql = sqlparse.format(sql, reindent=True)
        return sql

    @classmethod
    def execute(  # pylint: disable=unused-argument
        cls,
        cursor: FlinkEngine,
        query: str,
        **kwargs: Any,
    ) -> None:
        """执行 flink sql 语句"""
        return cursor.client.execute(query)

    @classmethod
    def handle_cursor(cls, cursor: FlinkClient, query: Query, session: Session) -> None:
        """
        在执行 flink sql 执行过程中,执行一些动作:
        记录flink sql 任务的一些关键信息
        记录一些执行日志
        sleep 等待执行结果 等
        """
        return

    @classmethod
    def fetch_data(
        cls, cursor: FlinkEngine, limit: Optional[int] = None
    ) -> List[Tuple[Any, ...]]:
        res = cursor.client.fetch_result(operation_id=cursor.client.operation_ids[-1])
        cursor.columns = [{"name": i['name'], "type": i["logicalType"]["type"]} for i in
                          res.get('columns', [])]
        return [tuple(i['fields']) for i in res['data']]

    @classmethod
    def has_implicit_cancel(cls) -> bool:
        """
        该方法是sqllab 界面执行 sql 中点击暂停时调用的
        这里直接返回了 True, 因为 gateway 的 session 自己有过期时间
        我们也可以通过调用 gateway 的关闭 session 接口主动关闭
        """
        return True

    @classmethod
    def cancel_query(  # pylint: disable=unused-argument
        cls,
        cursor: FlinkClient,
        query: Query,
        cancel_query_id: str,
    ) -> bool:
        """
        该方法是sqllab 界面执行 sql 中点击暂停时调用的
        这里直接返回了 True, 因为 gateway 的 session 自己有过期时间
        我们也可以通过调用 gateway 的关闭 session 接口主动关闭
        """
        return True

修改测试连接逻辑

测试连接入口方法在 superset/databases/commands/test_connection.py下的 TestConnectionDatabaseCommand 类中的 run 方法,我们需要通过连接的 driver 来通过 FLinkClient 测试与 Flink gateway 的连接是否正常:
image.png

# flink 类型的连接走 flink gateway 验证
if database.driver == FLINK_DRIVER_NAME:
    from superset.db_engine_specs.flink import FlinkClient
    init_commands = database.get_encrypted_extra().get("init_commands", [])
    with FlinkClient(init_commands=init_commands) as c:
        if not c.ping():
            raise Exception("ping flink gateway err")
    return

修改 sqllab 界面逻辑

sqllab 界面需要修改获取库表信息和执行 sql 的接口逻辑:
image.png
查询库表字段信息的接口入口类都在 superset/superset/databases/api.py中:
image.png
api 入口的代码逻辑不需要修改。
获取库名称列表,修改 superset/superset/models/core.pyDatabase类中的 get_all_schema_names方法:
image.png

# flink 连接不走 sqlalchemy 的 create engine, 属于 FlinkEngineSpec
if self.driver == FLINK_DRIVER_NAME:
    extra = self.get_encrypted_extra()
    self.db_engine_spec.client_init_commands = extra.get("init_commands", [])
    return self.db_engine_spec.get_schema_names(extra['catalog'])

获取表名称列表,修改 superset/superset/models/core.pyDatabase类中的 get_all_table_names_in_schema方法和 get_all_view_names_in_schema方法:
image.png

# flink 连接不走 sqlalchemy 的 create engine, 属于 FlinkEngineSpec
if self.driver == FLINK_DRIVER_NAME:
    extra = self.get_encrypted_extra()
    self.db_engine_spec.client_init_commands = extra.get("init_commands", [])
    tables = {
        (table, schema)
        for table in self.db_engine_spec.get_table_names(
            extra['catalog'],
            schema
        )
    }
    return tables

image.png

if self.driver == FLINK_DRIVER_NAME:
    extra = self.get_encrypted_extra()
    self.db_engine_spec.client_init_commands = extra.get("init_commands", [])
    return {
        (view, schema)
        for view in self.db_engine_spec.get_view_names(
            extra['catalog'],
            schema
        )
    }

获取字段信息,修改 superset/superset/models/core.pyDatabase类中的 get_columns方法:
image.png

if self.driver == FLINK_DRIVER_NAME:
    extra = self.get_encrypted_extra()
    self.db_engine_spec.client_init_commands = extra.get("init_commands", [])
    return self.db_engine_spec.get_columns(extra["catalog"], schema, table_name)

获取表 comment 信息,修改 get_table_comment方法,这个目前还没有找到通过 flink sql 查询表 comment 信息的方法,这里直接返回空:
image.png

if self.driver == FLINK_DRIVER_NAME:
    return ""

获取索引信息,修改 get_indexes方法,返回空列表:
image.png

if self.driver == FLINK_DRIVER_NAME:
    return []

获取主键信息,修改get_pk_constraint方法:
image.png

if self.driver == FLINK_DRIVER_NAME:
    extra = self.get_encrypted_extra()
    self.db_engine_spec.client_init_commands = extra.get("init_commands", [])
    return self.db_engine_spec.get_pk_constraint(extra["catalog"], schema, table_name)

获取外键信息,修改get_foreign_keys方法:
image.png

if self.driver == FLINK_DRIVER_NAME:
    return []

执行 sql 相关的需要修改 _get_sqla_engine方法。
image.png

# VOYAH 如果是 flink_driver 就使用 FlinkEngineSpec.engine
if self.driver == FLINK_DRIVER_NAME:
    extra = self.get_encrypted_extra()
    self.db_engine_spec.client_init_commands = extra.get("init_commands", [])
    from superset.db_engine_specs.flink import FlinkEngine
    return FlinkEngine(schema=schema, **params)

总结

增加其他数据源连接,主要需要修改两个文件新增一个文件:

  • 修改 superset/databases/commands/test_connection.py中的TestConnectionDatabaseCommand 类中的 run 方法。

    修改 superset/superset/models/core.pyDatabase类中的get_all_table_names_in_schemaget_all_view_names_in_schemaget_columnsget_table_commentget_indexesget_pk_constraintget_foreign_keys_get_sqla_engine 方法。

  • superset/superset/db_engine_specs目录下新增一个 flink.py文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/479994.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Gavin Wood 精彩演讲|安全灵活 JAM 链,打造去中心化多核计算机

Polkadot 年度开发者大会 sub0 Asia 近期在泰国曼谷正式落幕。面对区块链行业的激烈竞争,Polkadot 创始人 Gavin Wood 在演讲中说明将如何利用 Polkadot 2.0 与 JAM 链带来新的技术创新,推动生态持续发展。 Polkadot 将推一个名为 JAM 链的新网络。JAM …

用傅里叶变换和反变换消除噪音信号干扰的软件实例

一、序言 场景一:噪音信号是数据采集处理的天敌,但无时无刻它都存在,于是,信号传输时进行屏蔽防护、模数转换时给予充分的采保时间、电路实现上低通带通处理,为了减小电解电容的感抗作用有时还附加上瓷片电容滤波&…

python的ITS 信息平台的设计与实现flask-django-nodejs-php

第二,陈列说明该系统实现所采用的架构、系统搭建采用的服务器、系统开发环境和使用的工具,以及系统后台采用的数据库。 最后,对系统进行全面测试,主要包括功能测试、查询性能测试、安全性能测试。 分析系统存在的不足以及将来改进…

深度学习pytorch——感知机(Perceptron)(持续更新)

什么是感知机? 感知机是由美国学者FrankRosenblatt在1957年提出来的。感知机是作为神经网络(深度学习)的起源的算法。因此,学习感知机的构造也就是学习通向神经网络和深度学习的一种重要思想。 感知机接收多个输入信号&#xff0c…

在服务器(Ubuntu20.04)安装用户级别的cuda11.8(以及仿照前面教程安装cuda11.3后安装cudnn和pytorch1.9.0)

1、cuda11.8的下载 首先在cuda官网下载我们需要的cuda版本,这里我下载的是cuda11.8(我的最高支持cuda12.0) 这里我直接使用wget命令下载不了,于是我直接在浏览器输入后面的链接下载到本地,之后再上传至服务器的&am…

如何使用人工智能和ChatGPT来优化营销转化率

人工智能 (AI) 和营销的交集正在彻底改变企业与客户互动的方式,最终改变营销转化率。人工智能能够分析大量数据、理解模式和自动执行任务,它不仅是一项创新技术,而且是营销领域的根本性转变。这种转变允许更加个性化、…

Loader和Plugin的区别?编写Loader,Plugin的思路。

一、区别 前面两节我们有提到Loader与Plugin对应的概念,先来回顾下 loader 是文件加载器,能够加载资源文件,并对这些文件进行一些处理,诸如编译、压缩等,最终一起打包到指定的文件中plugin 赋予了 webpack 各种灵活的…

Jupyter服务器端为R语言安装readr包

1.登录debian服务器 方式1.Windows10中可利用putty登录linux服务器 方式2.自从搭建了Jupyter服务器后,还可以从juypyter的终端来登录linux服务器 2.进入R语言命令行 3.安装readr包 >install.packages(‘readr’) …

四川宏博蓬达法律咨询有限公司:法律服务安全的新标杆

在这个法治社会,法律服务行业扮演着越来越重要的角色。四川宏博蓬达法律咨询有限公司,作为行业内的佼佼者,始终坚持以客户为中心,为客户提供专业、高效、安全的法律服务。 一、公司背景与实力展示 四川宏博蓬达法律咨询有限公司自…

python - 更改pdf中文本的字体高亮颜色(fitz模块)

import fitzdoc fitz.open(r"e:/test.pdf") pagedoc[0]# 按照指定的位置设置颜色 highlight page.add_highlight_annot((20, 500,60, 520)) highlight.set_colors(stroke[1, 1, 0]) # light red color (r, g, b) 颜色rgb每个除以255得出 highlight.update()# 按照…

docker 安装部署 jenkins

今天 小☀ 给大家普及一下什么是 jenkins!! Jenkins是一个开源软件项目,基于Java开发的持续集成工具。它提供了一个开放易用的软件平台,使软件项目可以进行持续集成。Jenkins起源于Hudson,主要用于持续、自动地构建、…

面试笔记——MySQL(主从同步原理、分库分表)

主从同步原理 主从同步结构:主库负责写数据,从库负责读数据,如图—— MySQL主从复制的核心就是二进制日志(BINLOG),它记录了所有的 DDL(数据定义语言)语句和 DML(数据操…

Tkinter 一文读懂

Tkinter 简介 Tkinter(即 tk interface,简称“Tk”)本质上是对 Tcl/Tk 软件包的 Python 接口封装,它是 Python 官方推荐的 GUI 工具包,属于 Python 自带的标准库模块,当您安装好 Python 后,就可…

使用PDFBox调整PDF每页格式

目录 一、内容没有图片 二、内容有图片 maven依赖&#xff0c;这里使用的是pdfbox的2.0.30版本 <dependency><groupId>org.apache.pdfbox</groupId><artifactId>pdfbox</artifactId><version>2.0.30</version></dependency>…

从零开始学Spring Boot系列-集成Kafka

Kafka简介 Apache Kafka是一个开源的分布式流处理平台&#xff0c;由LinkedIn公司开发和维护&#xff0c;后来捐赠给了Apache软件基金会。Kafka主要用于构建实时数据管道和流应用。它类似于一个分布式、高吞吐量的发布-订阅消息系统&#xff0c;可以处理消费者网站的所有动作流…

【Linux】内核空间动态内存申请

&#x1f525;博客主页&#xff1a;PannLZ &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 文章目录 内核空间动态内存申请1.kmalloc()2._ _get_free_pages()3.vmalloc() 内核空间动态内存申请 1.kmalloc() #include <linux/slab.h>vo…

Flask项目中使用蓝湖实现启动项配置——多个controller项

项目结构 # controller1-__init__.py from flask import Blueprintcont2_sale_blueprint Blueprint(cont2_sale_blueprint, __name__) cont2_user_blueprint Blueprint(cont2_user_blueprint, __name__) from . import user_controller from . import sale_controller# contr…

推荐一款很不错的vscode高亮插件

用过很多款高亮插件&#xff0c;总感觉大部分显示都很乱&#xff0c;但是其中有一款用起来很清晰明了&#xff0c;很喜欢&#xff01; 插件名字&#xff1a;select-highlight-cochineal-color 使用效果&#xff1a; 底色高亮让人感觉很清晰&#xff0c;一个好的高亮插件能让你…

VScode通过ssh连接github

通过ssh连接github 1.生成公钥和私钥2.设置config文件3.配置ssh免密登录4.远程仓库初始化 1.生成公钥和私钥 首先选择一个文件夹&#xff0c;右击 git bash here&#xff0c;在命令行输入命令&#xff0c;按下三次回车生成一个**.ssh文件夹**&#xff0c;一般在用户的user根目…

Django信号

一、介绍 Django有一个“信号调度器(signal dispatcher)”,当框架中的其他地方发生操作时,它可以通知一些解耦的应用程序 官网:信号 | Django 文档 | Django 1.1、内置的信号的使用 1.1.1、定义接收器函数 def my_callback(sender, **kwargs):print("Request finis…