Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.es</groupId>
    <artifactId>es-sparkstreaming</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch的客户端 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch依赖2.x的log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>com.fasterxml.jackson.core</groupId>-->
        <!--            <artifactId>jackson-databind</artifactId>-->
        <!--            <version>2.11.1</version>-->
        <!--        </dependency>-->
        <!--        &lt;!&ndash; junit单元测试 &ndash;&gt;-->
        <!--        <dependency>-->
        <!--            <groupId>junit</groupId>-->
        <!--            <artifactId>junit</artifactId>-->
        <!--            <version>4.12</version>-->
        <!--        </dependency>-->
    </dependencies>
</project>

2.功能实现

package com.atguigu.es

import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType

object SparkStreamingESTest {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        ds.foreachRDD(
            rdd => {
                rdd.foreach(
                    data => {
                        val client = new RestHighLevelClient(
                            RestClient.builder(new HttpHost("localhost",9200, "http"))
                        )

                        val ss = data.split(" ")

                        val request = new IndexRequest()
                        request.index("product").id(ss(0))
                        val json =
                            s"""
                              | {  "data" : "${ss(1)}" }
                              |""".stripMargin
                        request.source(json, XContentType.JSON)

                        val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)
                        println(response.getResult)
                        client.close()
                    }
                )
            }
        )

        ssc.start()
        ssc.awaitTermination()
    }
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/99027.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Scala集合继承体系图

Scala集合简介 1&#xff09; Scala 的集合有三大类&#xff1a;序列 Seq、集Set、映射 Map&#xff0c;所有的集合都扩展自 Iterable特质。 2&#xff09; 对于几乎所有的集合类&#xff0c;Scala 都同时提供了可变和不可变的版本&#xff0c;分别位于以下两个包 不可变集合…

[C++网络协议] I/O复用

具有代表性的并发服务器端实现模型和方法&#xff1a; 多进程服务器&#xff1a;通过创建多个进程提供服务。 多路复用服务器&#xff1a;通过捆绑并统一管理I/O对象提供服务。✔ 多线程服务器&#xff1a;通过生成与客户端等量的线程提供服务。 目录 1. I/O复用 2. select函…

Vue:关于声明式导航中的 跳转、高亮、以及两个类名的定制

声明式导航-导航链接 文章目录 声明式导航-导航链接router-link的两大特点&#xff08;能跳转、能高亮&#xff09;声明式导航-两个类名定制两个高亮类名 实现导航高亮&#xff0c;实现方式其实&#xff0c;css&#xff0c;JavaScript , Vue ,都可以实现。其实关于路由导航&…

Android Studio 的github 工程克隆

上文介绍了Android Studio 里的"Git 建立和简单操作。本文介绍从github 上的工程fork 和clone到本地&#xff0c;然后学习和改进。 本文参考 https://learntodroid.com/how-to-use-git-and-github-in-android-studio/ 克隆clone Github 仓库&#xff1a; 先 Fork 你选择…

stm32之25.FLASH闪存

打开标准库 源码--- int main(void) {uint32_t d;Led_init();key_init();/* 初始化串口1波特率为115200bps&#xff0c;若发送/接收数据有乱码&#xff0c;请检查PLL */usart1_init(115200);printf("this is flash test\r\n");/* 解锁FLASH&#xff08;闪存&#xf…

跨境做独立站,如何低成本引流?你的流量密码在这

大家都知道&#xff0c;海外的消费习惯与国内不同&#xff0c;独立站一向是海外消费者的最喜欢的购物方式之一&#xff0c;这也吸引了许多跨境商家开设独立站。 独立站不同于其他的第三方平台&#xff0c;其他平台可以靠平台自身流量来获得转化&#xff0c;而独立站本身没有流…

java八股文面试[多线程]——ThreadLocal底层原理和使用场景

源码分析&#xff1a; ThreadLocal中定义了ThreadLocalMap静态内部类&#xff0c;该内部类中又定义了Entry内部类。 ThreadLocalMap定了 Entry数组。 Set方法&#xff1a; Get方法&#xff1a; Thread中定义了两个ThreaLocalMap成员变量&#xff1a; Spring使用ThreadLocal解…

虚拟机Ubuntu20.04 网络连接器图标开机不显示怎么办

执行以下指令&#xff1a; sudo service network-manager stop sudo rm /var/lib/NetworkManager/NetworkManager.state sudo service network-manager start

nginx压缩ttf文件 mine.types的作用

最近在运维过程中&#xff0c;前端提到发现在linux上下载某ttl文件&#xff08;字体文件&#xff09;太大&#xff0c;传输过程比较慢&#xff0c;于是就想着使用nginx的gzip进行压缩&#xff0c;经过不断尝试&#xff0c;终于发现在nginx的配置目录/etc/nginx/mine.types 文件…

【UIPickerView-UIDatePicker-应用程序对象 Objective-C语言】

一、今天我们来学习三个东西 1.UIPickerView-UIDatePicker-应用程序对象 1.首先,来看数据选择控件 数据选择控件, 大家对这个数据选择控件,是怎么理解的, 1)数据选择控件,首先,是不是得有数据, 2)然后呢,你还得让用户能够选择, 3)最后,你还得是一个控件儿 那…

国产ETLCloud VS 开源Kettle ETL对比分析

ETLCloud VS Kettle ETLCloud和kettle是目前国内使用最广泛的两款免费ETL工具&#xff0c;本文将从多个角色对ETLCloud和kettle进行对比&#xff0c;方便用户快速了解到两款产品的差异并根据自已的需求选择相应的工具。 ETLCloud提供了对kettle流程的迁移功能&#xff0c;所以…

文件上传漏洞-upload靶场5-12关

文件上传漏洞-upload靶场5-12关通关笔记&#xff08;windows环境漏洞&#xff09; 简介 ​ 在前两篇文章中&#xff0c;已经说了分析上传漏的思路&#xff0c;在本篇文章中&#xff0c;将带领大家熟悉winodws系统存在的一些上传漏洞。 upload 第五关 &#xff08;大小写绕过…

gRPC-Gateway 快速实战

今天来分享一波 gRPC-Gateway &#xff0c; 之前咱们有分享过什么是 gRPC 及其使用方式&#xff0c;可以看看这些关于 gRPC 的历史文章&#xff1a; gRPC介绍 gRPC 客户端调用服务端需要连接池吗&#xff1f; gRPC的拦截器 gRPC的认证 分享一下 gRPC- HTTP网关 I 今天主要是分…

AI:05 - 基于深度学习的道路交通信号灯的检测与识别

随着人工智能的快速发展,基于深度学习的视觉算法在道路交通领域中起到了重要作用。本文将探讨如何利用深度学习技术实现道路交通信号灯的检测与识别,通过多处代码实例展示技术深度。 道路交通信号灯是指示交通参与者行驶和停止的重要信号。准确地检测和识别交通信号灯对于智…

最大子数组和【贪心算法】

最大子数组和 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 class Solution {public int maxSubArray(int[] nums) {//记录最大结果&…

【陈老板赠书活动 - 11期】- 【MySQL从入门到精通】

![ 陈老老老板&#x1f9b8; &#x1f468;‍&#x1f4bb;本文专栏&#xff1a;赠书活动专栏&#xff08;为大家争取的福利&#xff0c;免费送书&#xff09; &#x1f468;‍&#x1f4bb;本文简述&#xff1a;生活就像海洋,只有意志坚强的人,才能到达彼岸。 &#x1f468;‍…

高亮img、pdf重点部分(html2canvas、pdfjs-dist、react-pdf)

可用业务场景 报销单据审批中&#xff0c;高亮发票部分 需求 后台返回一张图片或者pdf、返回一组坐标&#xff0c;坐标类型[number,number,number,number]&#xff0c;分别代表了x、y、width、height。需要根据坐标在图片上高亮出来坐标位置。如下图 高亮的坐标是&#xff1…

UDP 广播

一、UDP 通信图解 UDP通信、本地套接字_呵呵哒(&#xffe3;▽&#xffe3;)"的博客-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/132523536?spm1001.2014.3001.5501 #include <sys/types.h> #include <sys/socket > ssize_t sendto(in…

【校招VIP】前端算法考点之大数据相关

考点介绍&#xff1a; 大数据的关键技术分为分析技术和处理技术&#xff0c;可用于大数据分析的关键技术主要包括A/B测试&#xff0c;关联规则挖掘&#xff0c;数据挖掘&#xff0c;集成学习&#xff0c;遗传算法&#xff0c;机器学习&#xff0c;自然语言处理&#xff0c;模式…

计算机视觉-YOYO-

目录 计算机视觉-YOYO 目标检测发展历程 区域卷积神经网络(R-CNN) Fast R-CNN Mask R-CNN模型 比如SSD、YOLO(1, 2, 3)、R-FCN 目标检测基础概念 边界框、锚框和交并比 边界框&#xff08;bounding box&#xff09; 锚框&#xff08;Anchor box&#xff09; 交并比 …