OpenMetadata 1.3.1 Custom Connector Development Tutorial

Part 1: Developing a Generic Custom Connector

Official tutorial links:

1.https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors

2.https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector

(1) Create a custom connector class for the service type

Reference: https://docs.open-metadata.org/v1.3.x/sdk/python/build-connector/source#for-consumers-of-openmetadata-ingestion-to-define-custom-connectors-in-their-own-package-with-same-namespace

1. Create the custom connector

Example: my_csv_connector.py

"""
Custom Database Service that extracts metadata from a CSV file
"""
import csv
import traceback

from pydantic import BaseModel, ValidationError, validator
from pathlib import Path
from typing import Iterable, Optional, List, Dict, Any

from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import Either
from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from metadata.ingestion.api.steps import Source, InvalidSourceException
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import (
    CustomDatabaseConnection,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseService,
)
from metadata.generated.schema.entity.data.table import (
    Column,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class InvalidCsvConnectorException(Exception):
    """
    Sample data is not valid to be ingested
    """


class CsvModel(BaseModel):
    name: str
    column_names: List[str]
    column_types: List[str]

    @validator("column_names", "column_types", pre=True)
    def str_to_list(cls, value):
        """
        Assume the values are internally separated by ";"
        """
        return value.split(";")


class CsvConnector(Source):
    """
    Custom connector to ingest Database metadata.

    We'll suppose that we can read metadata from a CSV
    with a custom database name from a business_unit connection option.
    """

    # Constructor: read and validate the connection options
    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        self.config = config
        self.metadata = metadata
        # Read the service connection config
        self.service_connection = config.serviceConnection.__root__.config

        self.source_directory: str = (
            # Path to the CSV file
            self.service_connection.connectionOptions.__root__.get("source_directory")
        )
        if not self.source_directory:
            raise InvalidCsvConnectorException(
                "未获取到source_directory配置信息"
            )

        self.business_unit: str = (
            # Custom database name
            self.service_connection.connectionOptions.__root__.get("business_unit")
        )
        if not self.business_unit:
            raise InvalidCsvConnectorException(
                "未获取到business_unit配置信息"
            )

        self.data: Optional[List[CsvModel]] = None

        super().__init__()

    # Factory method invoked by the workflow
    @classmethod
    def create(
            cls, config_dict: dict, metadata_config: OpenMetadataConnection
    ) -> "CsvConnector":
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: CustomDatabaseConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, CustomDatabaseConnection):
            raise InvalidSourceException(
                f"Expected CustomDatabaseConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    # Static helper: safely parse one row (invalid rows return None)
    @staticmethod
    def read_row_safe(row: Dict[str, Any]):
        try:
            return CsvModel.parse_obj(row)
        except ValidationError:
            logger.warning(f"Error parsing row {row}. Skipping it.")

    # prepare: validate and read the source file
    def prepare(self):
        # Validate that the file exists
        source_data = Path(self.source_directory)
        if not source_data.exists():
            raise InvalidCsvConnectorException("Source Data path does not exist")

        try:
            with open(source_data, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                # Load every row
                self.data = [self.read_row_safe(row) for row in reader]
        except Exception as exc:
            logger.error("Unknown error reading the source file")
            raise exc

    def yield_create_request_database_service(self):
        yield Either(
            # Create the database service entity from the workflow source config
            right=self.metadata.get_create_service_from_source(
                entity=DatabaseService, config=self.config
            )
        )

    # Create the business database (named after business_unit)
    def yield_business_unit_db(self):
        # Pick up the service we just created (when not running from the UI)
        service_entity: DatabaseService = self.metadata.get_by_name(
            entity=DatabaseService, fqn=self.config.serviceName
        )
        yield Either(
            right=CreateDatabaseRequest(
                name=self.business_unit,
                service=service_entity.fullyQualifiedName,
            )
        )

    # Create the default schema
    def yield_default_schema(self):
        # Pick up the service we just created (if not UI)
        database_entity: Database = self.metadata.get_by_name(
            entity=Database, fqn=f"{self.config.serviceName}.{self.business_unit}"
        )

        yield Either(
            right=CreateDatabaseSchemaRequest(
                name="default",
                database=database_entity.fullyQualifiedName,
            )
        )

    # Create the tables from the CSV rows
    def yield_data(self):
        """
        Iterate over the data list to create tables
        """
        database_schema: DatabaseSchema = self.metadata.get_by_name(
            entity=DatabaseSchema,
            fqn=f"{self.config.serviceName}.{self.business_unit}.default",
        )
        # Error handling example:
        # suppose we had a failure we want to report
        # try:
        #     1/0
        # except Exception:
        #     yield Either(
        #         left=StackTraceError(
        #             name="My Error",
        #             error="Demoing one error",
        #             stackTrace=traceback.format_exc(),
        #         )
        #     )
        # Parse the CSV metadata (column names and types)
        for row in self.data:
            if row is None:
                # Skip rows that failed validation in read_row_safe
                continue
            yield Either(
                right=CreateTableRequest(
                    name=row.name,
                    databaseSchema=database_schema.fullyQualifiedName,
                    columns=[
                        Column(
                            name=model_col[0],
                            dataType=model_col[1],
                        )
                        for model_col in zip(row.column_names, row.column_types)
                    ],
                )
            )

    # Iterator: yield every metadata entity in order
    def _iter(self) -> Iterable[Entity]:
        # Database service
        yield from self.yield_create_request_database_service()
        # Business database
        yield from self.yield_business_unit_db()
        # Business schema
        yield from self.yield_default_schema()
        # Business tables
        yield from self.yield_data()

    # Test the connection
    def test_connection(self) -> None:
        pass

    # Close the connection
    def close(self):
        pass
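
For reference, here is a hypothetical sample.csv that the connector above can parse. The header matches the CsvModel fields, and the validator splits column_names and column_types on ";" (the table and column names are made up; the types must be valid OpenMetadata DataType values):

name,column_names,column_types
users,id;name;email,INT;STRING;STRING
orders,id;user_id;amount,INT;INT;DOUBLE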

(2) Package the custom connector into the ingestion image

Project layout:

[Screenshot: project directory layout]
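
Since the Dockerfile below installs the package with pip install --no-deps ., the project only needs a minimal setup.py. A sketch, assuming the connector code lives in a connector/ package (the package name and version here are made up):

from setuptools import setup, find_packages

setup(
    name="custom-csv-connector",  # hypothetical package name
    version="0.0.1",
    packages=find_packages(include=["connector", "connector.*"]),
)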

Dockerfile:

FROM openmetadata/ingestion:1.3.1

# Let's use the same workdir as the ingestion image
WORKDIR ingestion
USER airflow

# Install our custom connector
COPY connector connector
COPY setup.py .
COPY sample.csv .
#COPY person_info.proto .
RUN pip install --no-deps .

Build the service image:

docker build -t om-ingestion:build -f Dockerfile .

(3) Deploy the new ingestion service

docker-compose -f docker-compose-ingestion.yml up -d

docker-compose-ingestion.yml:

version: "3.9"
volumes:
  ingestion-volume-dag-airflow:
  ingestion-volume-dags:
  ingestion-volume-tmp:
  es-data:
services:  
  ingestion:
    container_name: om_ingestion
    image: om-ingestion:build
    environment:
      AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS: "/opt/airflow/dag_generated_configs"
      DB_SCHEME: ${AIRFLOW_DB_SCHEME:-postgresql+psycopg2}
      DB_HOST: ${AIRFLOW_DB_HOST:-host.docker.internal}
      DB_PORT: ${AIRFLOW_DB_PORT:-5432}
      AIRFLOW_DB: ${AIRFLOW_DB:-airflow_db}
      DB_USER: ${AIRFLOW_DB_USER:-airflow_user}
      DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}

      # extra connection-string properties for the database
      # EXAMPLE
      # require SSL (only for Postgres)
      # properties: "?sslmode=require"
      DB_PROPERTIES: ${AIRFLOW_DB_PROPERTIES:-}
      # To test the lineage backend
      # AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
      # AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow
      AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://host.docker.internal:8585/api
      AIRFLOW__LINEAGE__JWT_TOKEN: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDk3MDkyNDMsImV4cCI6bnVsbH0.U7XIYZjJAmJ-p3WTy4rTGGSzUxZeNpjOsHzrWRz7n-zAl-GZvznZWMKX5nSX_KwRHAK3UYuO1UX2-ZbeZxdpzhyumycNFyWzwMs8G6iEGoaM6doGhqCgHileco8wcAoaTXKHTnwa80ddWHt4dqZmikP7cIhLg9etKAepQNQibefewHbaLOoCrFyo9BqFeZzNaVBo1rogNtslWaDO6Wnk_rx0jxRLTy57Thq7R7YS_nZd-JVfYf72BEFHJ_WDZym4k-dusV0PWGzMPYIXq3s1KbpPBt_tUSz4cUrXbLuI5-ZsOWIvUhsLeHJDU-35-RymylhMrQ92kZjsy7v2nl6apQ
    entrypoint: /bin/bash
    command:
      - "/opt/airflow/ingestion_dependency.sh"
    expose:
      - 8080
    ports:
      - "8080:8080"
    networks:
      - app_net_ingestion
    volumes:
      - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
      - ingestion-volume-dags:/opt/airflow/dags
      - ingestion-volume-tmp:/tmp

networks:
  app_net_ingestion:
    ipam:
      driver: default
      config:
        - subnet: "172.16.240.0/24"

(4) Create a custom service of the matching type and test the ingestion

[Screenshot: create the custom service]

Click save and add the metadata ingestion pipeline:

[Screenshot: add the metadata ingestion pipeline]

[Screenshot: ingestion pipeline configuration]
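
To test the connector outside the UI, the same ingestion can be triggered from Python. A minimal sketch, assuming the image built above and the field names used by the official custom-connector demo (the serviceName, file path, hostPort, and JWT token are placeholders to adapt):

from metadata.workflow.metadata import MetadataWorkflow

config = {
    "source": {
        "type": "customdatabase",
        "serviceName": "custom_csv_demo",
        "serviceConnection": {
            "config": {
                "type": "CustomDatabase",
                "sourcePythonClass": "connector.my_csv_connector.CsvConnector",
                "connectionOptions": {
                    "source_directory": "/ingestion/sample.csv",
                    "business_unit": "demo_db",
                },
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "<ingestion-bot JWT>"},
        }
    },
}

workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()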

Part 2: Developing a Built-in Connector (StreamSets)

Official guide: https://docs.open-metadata.org/v1.3.x/developers/contribute/developing-a-new-connector

(1) Define the connection JSON schema (streamSetsConnection.json)

Path: openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/streamSetsConnection.json

{
  "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/streamSetsConnection.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "StreamSetsConnection",
  "description": "StreamSets Metadata Pipeline Connection Config",
  "type": "object",
  "javaType": "org.openmetadata.schema.services.connections.pipeline.StreamSetsConnection",
  "definitions": {
    "StreamSetsType": {
      "description": "Service type.",
      "type": "string",
      "enum": ["StreamSets"],
      "default": "StreamSets"
    },
    "basicAuthentication": {
      "title": "Username Authentication",
      "description": "Login username",
      "type":"object",
      "properties": {
        "username": {
          "title": "Username",
          "description": "StreamSets user to authenticate to the API.",
          "type": "string"
        }
      },
      "additionalProperties": false
    }
  },
  "properties": {
    "type": {
      "title": "Service Type",
      "description": "Service Type",
      "$ref": "#/definitions/StreamSetsType",
      "default": "StreamSets"
    },
    "hostPort": {
      "expose": true,
      "title": "Host And Port",
      "description": "Pipeline Service Management/UI URI.",
      "type": "string",
      "format": "uri"
    },
    "streamSetsConfig": {
      "title": "StreamSets Credentials Configuration",
      "description": "We support username authentication",
      "oneOf": [
        {
          "$ref": "#/definitions/basicAuthentication"
        }
      ]
    },
    "supportsMetadataExtraction": {
      "title": "Supports Metadata Extraction",
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    }
  },
  "additionalProperties": false,
  "required": ["hostPort", "streamSetsConfig"]
}
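
With the schema in place, a connection payload that validates against it looks like the following (the hostPort value is an assumption; 18630 is the default StreamSets Data Collector port):

{
  "type": "StreamSets",
  "hostPort": "http://localhost:18630",
  "streamSetsConfig": {
    "username": "admin"
  },
  "supportsMetadataExtraction": true
}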

(2) Develop the ingestion source code

Path: ingestion/src/metadata/ingestion/source/pipeline/streamsets/*

[Screenshot: streamsets source module layout]

1. StreamSets connection client (client.py)

import logging
import traceback
from typing import Any, Iterable, Optional

import requests
from requests import HTTPError
from requests.auth import HTTPBasicAuth

# 设置日志记录器
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

REQUESTS_TIMEOUT = 60 * 5


def clean_uri(uri: str) -> str:
    """清理URI,确保它以HTTP或HTTPS开头"""
    if not uri.startswith(("http://", "https://")):
        return "http://" + uri
    return uri


class StreamSetsClient:
    """
    Wrapper on top of the StreamSets Data Collector REST API
    """

    def __init__(
            self,
            host_port: str,
            username: Optional[str] = None,
            password: Optional[str] = None,
            verify: bool = False,
    ):
        self.api_endpoint = clean_uri(host_port) + "/rest"
        self.username = username
        self.password = password
        self.verify = verify
        self.headers = {"Content-Type": "application/json"}

    def get(self, path: str) -> Optional[Any]:
        """
        GET request wrapper
        """
        try:
            res = requests.get(
                f"{self.api_endpoint}/{path}",
                verify=self.verify,
                headers=self.headers,
                timeout=REQUESTS_TIMEOUT,
                auth=HTTPBasicAuth(self.username, self.password),
            )
            res.raise_for_status()
            return res.json()
        except HTTPError as err:
            logger.warning(f"Connection error calling the StreamSets API - {err}")
            raise err

        except ValueError as err:
            logger.warning(f"Cannot pick up the JSON from API response - {err}")
            raise err

        except Exception as err:
            logger.warning(f"Unknown error calling StreamSets API - {err}")
            raise err

    def list_pipelines(self) -> Iterable[dict]:
        """
        List all pipelines
        """
        try:
            return self.get("v1/pipelines")
        except Exception as err:
            logger.error(traceback.format_exc())
            raise err

    def get_pipeline_details(self, pipeline_id: str) -> dict:
        """
        Get a specific pipeline by ID
        """
        return self.get(f"v1/pipeline/{pipeline_id}?rev=0&get=pipeline")

    def test_list_pipeline_detail(self) -> Iterable[dict]:
        """
        Test API access for listing pipelines
        """
        return self.list_pipelines()
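
A quick way to smoke-test the client against a running Data Collector; the host, username, and password below are assumptions:

from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient

client = StreamSetsClient(
    host_port="http://localhost:18630",  # assumed local Data Collector
    username="admin",
    password="admin",
)
for pipeline in client.list_pipelines():
    print(pipeline.get("pipelineId"), "-", pipeline.get("title"))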

2. Connection and connection test (connection.py)

"""
Source connection handler
"""
from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    BasicAuthentication,
    StreamSetsConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient


def get_connection(connection: StreamSetsConnection) -> StreamSetsClient:
    """
    Create connection
    """
    if isinstance(connection.streamSetsConfig, BasicAuthentication):
        return StreamSetsClient(
            host_port=connection.hostPort,
            username=connection.streamSetsConfig.username,
            password="95bd7977208bc935cac3656f4a9eea3a",
            verify=False,
        )


def test_connection(
        metadata: OpenMetadata,
        client: StreamSetsClient,
        service_connection: StreamSetsConnection,
        automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    元数据工作流或自动化工作流期间
    测试连接。这可以作为一部分执行
    """

    def custom_executor():
        list(client.list_pipelines())

    test_fn = {"GetPipelines": custom_executor}

    test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )
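
Put together, the helpers above can be exercised like this sketch (the hostPort and username are assumptions; note that get_connection currently ignores any configured password and uses the hardcoded one):

from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    BasicAuthentication,
    StreamSetsConnection,
)

conn = StreamSetsConnection(
    hostPort="http://localhost:18630",  # assumed
    streamSetsConfig=BasicAuthentication(username="admin"),
)
client = get_connection(conn)
print(len(list(client.list_pipelines() or [])), "pipelines visible")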

3. Metadata extractor (metadata.py)

"""
Extract metadata from a StreamSets source
"""
import traceback
from typing import Iterable, List, Optional, Any

from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from pydantic import BaseModel, ValidationError

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    StreamSetsConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class StagesDetails(BaseModel):
    instanceName: str
    label: str
    stageType: str
    stageName: str
    description: str
    inputLanes: List[str]
    outputLanes: List[str]
    downstream_task_names: set[str] = set()

class StreamSetsPipelineDetails(BaseModel):
    """
    Defines the necessary StreamSets information
    """
    uuid: str
    pipelineId: str
    title: str
    name: str
    created: int
    creator: str
    description: str


class StreamsetsSource(PipelineServiceSource):
    """
    Implements the methods required to extract pipeline metadata from StreamSets
    """

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata):
        logger.info("create..........")
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        logger.info(f"WorkflowSource: {config}")
        connection: StreamSetsConnection = config.serviceConnection.__root__.config
        logger.info(f"StreamSetsConnection: {connection}")
        if not isinstance(connection, StreamSetsConnection):
            raise InvalidSourceException(
                f"Expected StreamSetsConnection, but got {connection}"
            )
        return cls(config, metadata)

    def yield_pipeline(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[CreatePipelineRequest]]:
        logger.info("yield_pipeline.......")
        try:
            connection_url = None
            if self.service_connection.hostPort:
                connection_url = (
                    f"{clean_uri(self.service_connection.hostPort)}/rest/v1/pipelines"
                )
            logger.info(f"pipeline_details:{pipeline_details}")
            logger.info(f"connection_url:{connection_url}")
            pipeline_request = CreatePipelineRequest(
                name=pipeline_details.name,
                displayName=pipeline_details.title,
                sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                tasks=self._get_tasks_from_details(pipeline_details),
                service=self.context.pipeline_service,
            )
            yield Either(right=pipeline_request)
            self.register_record(pipeline_request=pipeline_request)
        except TypeError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.pipelineId,
                    error=(
                        f"Error building pipeline information from {pipeline_details}"
                        f" - {err}"
                    ),
                    stackTrace=traceback.format_exc(),
                )
            )
        except ValidationError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.pipelineId,
                    error=f"Error building pydantic model for {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )

        except Exception as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.pipelineId,
                    error=f"Wild error ingesting pipeline {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )

    # Build the task list from the pipeline details
    def _get_tasks_from_details(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Optional[List[Task]]:
        logger.info("_get_tasks_from_details.......")
        logger.info(f"StreamSetsPipelineDetails:{pipeline_details}")
        try:
            stages = self.get_stages_by_pipeline(pipeline_details)
            return [
                Task(
                    name=stage.instanceName,
                    displayName=stage.label,
                    sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                    taskType=stage.stageType,
                    description=stage.description,
                    downstreamTasks=list(stage.downstream_task_names)
                    if stage.downstream_task_names
                    else [],
                )
                for stage in stages
            ]
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}."
            )
        return None

    def yield_pipeline_lineage_details(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[AddLineageRequest]]:
        """
        Convert the pipeline connections into lineage entities.
        :param pipeline_details: pipeline_details object from StreamSets
        :return: pipeline lineage requests (not implemented yet)
        """
        logger.info("yield_pipeline_lineage_details..........")
        pass

    def get_pipelines_list(
            self
    ) -> Iterable[StreamSetsPipelineDetails]:
        """Get the list of all pipelines"""
        logger.info("get_pipelines_list..........")
        # Fetch once and reuse instead of calling the API twice
        pipelines = self.connection.list_pipelines()
        if pipelines is not None:
            for list_pipeline in pipelines:
                logger.info(f"pipeline:{list_pipeline}")
                try:
                    yield StreamSetsPipelineDetails(
                        uuid=list_pipeline.get("uuid"),
                        pipelineId=list_pipeline.get("pipelineId"),
                        title=list_pipeline.get("title"),
                        name=list_pipeline.get("name"),
                        created=list_pipeline.get("created"),
                        creator=list_pipeline.get("creator"),
                        description=list_pipeline.get("description"),
                    )
                except (ValueError, KeyError, ValidationError) as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Cannot create StreamSetsPipelineDetails from {list_pipeline} - {err}"
                    )
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get pipelines from Process Group {list_pipeline} - {err}."
                    )

    # Map each input lane to the set of stages that consume it
    def get_stages_lane(
            self, stages: Optional[List[dict]]
    ) -> dict:
        logger.info("get_stages_lane......")
        input_lane_to_stage_map = {}
        for stage in stages:
            logger.info(f"stage_info:{stage}")
            for input_lane in stage.get("inputLanes", []):
                try:
                    # setdefault covers both the first and subsequent stages on a lane
                    input_lane_to_stage_map.setdefault(input_lane, set()).add(
                        stage.get("instanceName")
                    )
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get stages from Pipeline Details {stages} - {err}.")
        logger.info(f"input_lane_to_stage_map:{input_lane_to_stage_map}")
        return input_lane_to_stage_map

    def get_stages_by_pipeline(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[StagesDetails]:
        logger.info("get_stages_by_pipeline")
        pipeline_detail = self.connection.get_pipeline_details(pipeline_details.pipelineId)
        stages = []
        if pipeline_detail.get("stages"):
            stages = pipeline_detail.get("stages")
        input_lane_to_stage_map = self.get_stages_lane(stages)
        for stage in stages:
            logger.info(f"stage:{stage}")
            try:
                input_lanes = stage.get("inputLanes", [])
                output_lanes = stage.get("outputLanes", [])
                downstream_stage_names = set()
                for output_lane in output_lanes:
                    if output_lane in input_lane_to_stage_map:
                        for down_stage in input_lane_to_stage_map.get(output_lane, []):
                            downstream_stage_names.add(down_stage)
                yield StagesDetails(
                    instanceName=stage.get("instanceName"),
                    label=stage["uiInfo"].get("label"),
                    stageType=stage["uiInfo"].get("stageType"),
                    stageName=stage.get("stageName"),
                    description=stage["uiInfo"].get("description"),
                    inputLanes=input_lanes,
                    outputLanes=output_lanes,
                    downstream_task_names=downstream_stage_names
                )
            except (ValueError, KeyError, ValidationError) as err:
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Cannot create StagesDetails from {stage} - {err}"
                )
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Wild error encountered when trying to get pipelines from Process Group {stage} - {err}."
                )

    def get_pipeline_name(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> str:
        return pipeline_details.name

    def yield_pipeline_status(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[OMetaPipelineStatus]]:
        pass
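
To make the lane bookkeeping concrete, consider a hypothetical two-stage pipeline where an origin feeds a destination through one lane. get_stages_lane maps each input lane to the stages consuming it, and get_stages_by_pipeline then turns that map into downstream task names:

stages = [
    {"instanceName": "Origin_01", "inputLanes": [], "outputLanes": ["lane_1"]},
    {"instanceName": "Dest_01", "inputLanes": ["lane_1"], "outputLanes": []},
]
# get_stages_lane(stages) returns {"lane_1": {"Dest_01"}}
# Origin_01's outputLanes hit that map, so its downstream_task_names
# become {"Dest_01"}, which yield_pipeline converts into downstreamTasks.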

(3) Modify the frontend UI source to register the connector

Path: openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts

/*
 *  Copyright 2022 Collate.
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *  http://www.apache.org/licenses/LICENSE-2.0
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

import { cloneDeep } from 'lodash';
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
import databricksPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/databricksPipelineConnection.json';
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
import streamSetsConnection from '../jsons/connectionSchemas/connections/pipeline/streamSetsConnection.json';

export const getPipelineConfig = (type: PipelineServiceType) => {
  let schema = {};
  const uiSchema = { ...COMMON_UI_SCHEMA };
  switch (type) {
    case PipelineServiceType.Airbyte: {
      schema = airbyteConnection;

      break;
    }

    case PipelineServiceType.Airflow: {
      schema = airflowConnection;

      break;
    }
    case PipelineServiceType.GluePipeline: {
      schema = gluePipelineConnection;

      break;
    }
    case PipelineServiceType.Fivetran: {
      schema = fivetranConnection;

      break;
    }
    case PipelineServiceType.Dagster: {
      schema = dagsterConnection;

      break;
    }
    case PipelineServiceType.Nifi: {
      schema = nifiConnection;

      break;
    }
    case PipelineServiceType.StreamSets: {
      schema = streamSetsConnection;

      break;
    }
    case PipelineServiceType.DomoPipeline: {
      schema = domoPipelineConnection;

      break;
    }
    case PipelineServiceType.CustomPipeline: {
      schema = customPipelineConnection;

      break;
    }
    case PipelineServiceType.DatabricksPipeline: {
      schema = databricksPipelineConnection;

      break;
    }
    case PipelineServiceType.Spline: {
      schema = splineConnection;

      break;
    }

    default:
      break;
  }

  return cloneDeep({ schema, uiSchema });
};

(4) Add the connector's Markdown documentation to the UI source

Path: openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/StreamSets.md

# StreamSets
In this section, we provide guides and references for using the StreamSets connector.

## Requirements
The StreamSets connector supports one connection type:
- **Basic Authentication**: log in to StreamSets with a username.

You can find further information on the StreamSets connector in the [docs](https://docs.open-metadata.org/connectors/pipeline/StreamSets).

## Connection Details
$$section
### Host and Port $(id="hostPort")
Pipeline Service Management/UI URI. This should be specified as a URI string in the format `scheme://hostname:port`, for example `http://localhost:8443` or `http://host.docker.internal:8443`.
$$

$$section
### StreamSets Config $(id="StreamSetsConfig")
OpenMetadata supports basic authentication (username/password). See the Requirements section for details.
$$

$$section
### Username $(id="username")
Username to connect to StreamSets. This user should be able to send requests to the StreamSets API and access the `resources` endpoint.
$$

(5) Create a Java ClassConverter (optional)

Because streamSetsConfig is declared with oneOf, the server may need a custom ClassConverter to deserialize the config into the proper Java class; see the official developing-a-new-connector guide for details.

(6) Rebuild the images from the Dockerfiles

Server Dockerfile:

# Build stage
FROM alpine:3.19 AS build

COPY openmetadata-dist/target/openmetadata-*.tar.gz /
#COPY docker/openmetadata-start.sh /

RUN mkdir -p /opt/openmetadata && \
    tar zxvf openmetadata-*.tar.gz -C /opt/openmetadata --strip-components 1 && \
    rm openmetadata-*.tar.gz

# Final stage
FROM alpine:3.19

EXPOSE 8585

RUN adduser -D openmetadata && \
    apk update && \
    apk upgrade && \
    apk add --update --no-cache bash openjdk17-jre tzdata
ENV TZ=Asia/Shanghai

COPY --chown=openmetadata:openmetadata --from=build /opt/openmetadata /opt/openmetadata
COPY --chmod=755 docker/openmetadata-start.sh /

USER openmetadata

WORKDIR /opt/openmetadata
ENTRYPOINT [ "/bin/bash" ]
CMD ["/openmetadata-start.sh"]

Ingestion Dockerfile:

Path: ingestion/Dockerfile

FROM apache/airflow:2.7.3-python3.10

#FROM docker-compose-ingestion-ingestion:latest
USER root
RUN curl -sS https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl -sS https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
# Install Dependencies (listed in alphabetical order)
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -qq update \
    && apt-get -qq install -y \
    tzdata \
    alien \
    build-essential \
    default-libmysqlclient-dev \
    freetds-bin \
    freetds-dev \
    gcc \
    gnupg \
    libaio1 \
    libevent-dev \
    libffi-dev \
    libpq-dev \
    librdkafka-dev \
    libsasl2-dev \
    libsasl2-2 \
    libsasl2-modules \
    libsasl2-modules-gssapi-mit \
    libssl-dev \
    libxml2 \
    libkrb5-dev \
    openjdk-11-jre \
    openssl \
    postgresql \
    postgresql-contrib \
    tdsodbc \
    unixodbc \
    unixodbc-dev \
    unzip \
    vim \
    git \
    wget --no-install-recommends \
    # Accept MSSQL ODBC License
    && ACCEPT_EULA=Y apt-get install -y msodbcsql18 \
    && rm -rf /var/lib/apt/lists/*

RUN if [[ $(uname -m) == "arm64" || $(uname -m) == "aarch64" ]]; \
 then \
 wget -q https://download.oracle.com/otn_software/linux/instantclient/191000/instantclient-basic-linux.arm64-19.10.0.0.0dbru.zip -O /oracle-instantclient.zip && \
 unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
 else \
 wget -q https://download.oracle.com/otn_software/linux/instantclient/1917000/instantclient-basic-linux.x64-19.17.0.0.0dbru.zip -O /oracle-instantclient.zip && \
 unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
 fi

ENV LD_LIBRARY_PATH=/instantclient

# Security patches for base image
# monitor no fixed version for
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-LIBTASN16-3061097
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-MARIADB105-2940589
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-BIND9-3027852
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-EXPAT-3023031 we are already installed the latest
RUN echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/backports.list
RUN apt-get -qq update \
    && apt-get -qq install -t bullseye-backports -y \
    curl \
    libpcre2-8-0 \
    postgresql-common \
    expat \
    bind9

# Required for Starting Ingestion Container in Docker Compose
# Provide Execute Permissions to shell script
COPY --chown=airflow:0 --chmod=775 ingestion/ingestion_dependency.sh /opt/airflow
# Required for Ingesting Sample Data
COPY --chown=airflow:0 --chmod=775 ingestion /home/airflow/ingestion

COPY --chown=airflow:0 --chmod=775 openmetadata-airflow-apis /home/airflow/openmetadata-airflow-apis


# Required for Airflow DAGs of Sample Data
#COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags

USER airflow
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
ENV TZ=Asia/Shanghai

# Disable pip cache dir
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching
ENV PIP_NO_CACHE_DIR=1
# Make pip silent
ENV PIP_QUIET=1

RUN pip install --upgrade pip

WORKDIR /home/airflow/openmetadata-airflow-apis
RUN pip install "."

WORKDIR /home/airflow/ingestion


# Argument to provide the ingestion dependencies to install. Defaults to all.
ARG INGESTION_DEPENDENCY="all"
RUN pip install ".[${INGESTION_DEPENDENCY}]"

# Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
RUN echo "Image built for $(uname -m)"
RUN if [[ $(uname -m) != "aarch64" ]]; \
 then \
 pip install "ibm-db-sa~=0.4"; \
 fi

# bump python-daemon for https://github.com/apache/airflow/pull/29916
RUN pip install "python-daemon>=3.0.0"
# remove all airflow providers except for docker and cncf kubernetes
RUN pip freeze | grep "apache-airflow-providers" | grep --invert-match -E "docker|http|cncf" | xargs pip uninstall -y
# Uninstalling psycopg2-binary and installing psycopg2 instead 
# because the psycopg2-binary generates a architecture specific error 
# while authenticating connection with the airflow, psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient==2.1.1
# Make required folders for openmetadata-airflow-apis
RUN mkdir -p /opt/airflow/dag_generated_configs

EXPOSE 8080
# This is required as it's responsible to create airflow.cfg file
RUN airflow db init && rm -f /opt/airflow/airflow.db

(7) Build the service images

Build the server from the repository root:

docker build -t om-server:build -f docker/development/Dockerfile .

Build the ingestion image from the repository root:

docker build -t om-ingestion:build -f ingestion/Dockerfile .

(8) Deploy the new services

docker-compose -f docker/development/docker-compose-postgres.yml up -d

(9) Access the service and create a StreamSets metadata ingestion

[Screenshot: create the StreamSets pipeline service]

[Screenshot: StreamSets metadata ingested]
