Wikimedia To Opensearch

概览

  • Wikimedia ⇒ Kafka ⇒ Opensearch
  • Java Library:OKhttp3和OkHttp EventSource;
  • 生产者:Wikimedia:WikimediaChangeHandler和WikimediaChangeProducer;
  • 消费者:Opensearch:OpenSearchConsumer,opensearch-java + httpclient5;
  • https://stream.wikimedia.org/v2/stream/recentchange
  • https://esjewett.github.io/wm-eventsource-demo
  • https://codepen.io/Krinkle/pen/BwEKgW?editors=1010
  • Rest Api使用OpenSearch Dashboard,在线可使用Bonsai.io;

Kafka环境



version: '3.8'
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    privileged: true
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: '/tmp/kafka-log'
      CLUSTER_ID: 'YWU3MzE1YmVmYzhiMTFlZT'

      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'

      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'

      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
  default:
    name: network-common
    external: true
  • vi /opt/kafka/config/kraft/server.properties
#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@192.168.0.123:9093

#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092

#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092

Opensearch

Open Search Prerequisite

# disable memory paging and swapping performance
sudo swapoff -a

# edit sysctl config
sudo vi /etc/sysctl.conf

# add line to define desired value or change exist
vm.max_map_count=262144

# reload kernel parameter using sysctl
sudo sysctl -p

# verify change
cat /proc/sys/vm/max_map_count

Open Search Compose

 docker pull opensearchproject/opensearch:1.3.16 && \
        docker pull opensearchproject/opensearch-dashboards:1.3.16


version: '3.8'
services:
  opensearch:
    image: opensearchproject/opensearch:1.3.16
    container_name: opensearch
    environment:
      discovery.type: single-node
      plugins.security.disabled: true
      compatibility.override_main_response_version: true
    ports:
      - "9200:9200"
      - "9600:9600"
  opensearch-dashboard:
    image: opensearchproject/opensearch-dashboards:1.3.16
    container_name: opensearch-dashboard
    ports:
      - "5601:5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
  • http://192.168.0.123:5601
  • https://192.168.0.123:9200

Producer

Producer Dependency

<properties>
        <okhttp.eventsource>2.7.1</okhttp.eventsource>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>

<dependency>
    <groupId>com.launchdarkly</groupId>
    <artifactId>okhttp-eventsource</artifactId>
    <version>${okhttp.eventsource}</version>
</dependency>

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

WikimediaChangeHandler

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import java.lang.invoke.MethodHandles;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WikimediaChangeHandler implements EventHandler {

        private static final Logger logger = LoggerFactory
                .getLogger(MethodHandles.lookup().lookupClass());

        String topic;
        KafkaProducer<String, String> kafkaProducer;

        public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer,String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onOpen() {}

    @Override
    public void onClosed() {
        kafkaProducer.close();
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
            logger.error(messageEvent.getData());
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    @Override
    public void onComment(String comment) {}

    @Override
    public void onError(Throwable t) {
            logger.error("Stream Reading Failure!", t);
    }
}

WikimediaChangeProducer

import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.EventHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.URI;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WikimediaChangeProducer {

        public static void main(String[] args) throws InterruptedException {
                String bootstrapServers = "192.168.0.123:9092";

            // create Producer properties
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

            String topic = "wikimedia.recentchange";

            EventHandler eventHandler = new WikimediaChangeHandler(kafkaProducer, topic);
            String url = "https://stream.wikimedia.org/v2/stream/recentchange";
            EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));

            builder.connectTimeout(Duration.ofMinutes(10));
            //注:需科学上网
            builder.proxy("127.0.0.1",1080);
            EventSource eventSource = builder.build();

            // start the producer in another thread
            eventSource.start();

            // we produce for 10 minutes and block the program until then
            TimeUnit.MINUTES.sleep(10);
          }

}

Consumer

Consumer Dependency

<properties>
        <opensearch.java>2.10.1</opensearch.java>
</properties>

<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>${opensearch.java}</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

OpenSearchConsumer

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.IndexResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenSearchConsumer {

        private static final Logger logger = LoggerFactory
                .getLogger(MethodHandles.lookup().lookupClass());

        public static OpenSearchClient connect(String scheme,String hostName,int port) {
                final HttpHost host = new HttpHost(scheme,hostName,port);

            final ApacheHttpClient5TransportBuilder builder =
                    ApacheHttpClient5TransportBuilder.builder(host);

            builder.setHttpClientConfigCallback(hcb -> {
                    final PoolingAsyncClientConnectionManager manager =
                            PoolingAsyncClientConnectionManagerBuilder.create().build();
                    return hcb.setConnectionManager(manager);
            });

            final OpenSearchTransport transport = builder.build();
            return new OpenSearchClient(transport);
        }

        public static OpenSearchClient connect() {
                return connect("http","192.168.0.123",9200);
        }

        public static boolean exist(OpenSearchClient client,String indexName)
                throws OpenSearchException, IOException {
                var existRequest = ExistsRequest.of(fn -> fn.index(indexName));
                BooleanResponse exist = client.indices().exists(existRequest);
                return exist.value();
        }

        public static void createIndex(OpenSearchClient client,
                String indexName) throws OpenSearchException, IOException {
                var exist = exist(client,indexName);
                if (exist) {
                        System.out.printf("index %s already exist!\n",indexName);
                } else {
                        var createRequest = new CreateIndexRequest.Builder().index(indexName).build();
                        client.indices().create(createRequest);
                }
        }

        //GET /_cat/indices?v
        public static void deleteIndex(OpenSearchClient client,
                String indexName) throws OpenSearchException, IOException {
                var exist = exist(client,indexName);
                if (!exist) {
                        System.out.printf("index %s not exist!\n",indexName);
                } else {
                        var deleteRequest = new DeleteIndexRequest.Builder().index(indexName).build();
                        client.indices().delete(deleteRequest);
                }
        }

        public static KafkaConsumer<String,String> createKafkaConsumer(){
                var boostrapServer = "192.168.0.123:9092";
                var groupId = "group-wikimedia-opensearch";
                Properties prop = new Properties();
                prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,boostrapServer);
                prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
                prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
                prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
                //earliest,latest etc.
                prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
                return new KafkaConsumer<>(prop);
        }

        public static void consume(OpenSearchClient client,String indexName)
                throws OpenSearchException, IOException {
                var consumer = createKafkaConsumer();
                var topic = "wikimedia.recentchange";
                consumer.subscribe(Arrays.asList(topic));

                while (true) {
                        var consumerRecord = consumer.poll(Duration.ofMillis(3000));
                        int recordCount = consumerRecord.count();
                        logger.info("receive %d record!",recordCount);

                        for (ConsumerRecord<String,String> cr : consumerRecord) {
                                //send record into OpenSearch
                                IndexData indexData = new IndexData(cr.value());
                                var indexRequest = new IndexRequest.Builder<IndexData>()
                                        .index(indexName).document(indexData).build();
                                IndexResponse response = client.index(indexRequest);
                                System.out.println(response.id());
                        }
                }
        }

        public static void main(String[] sa) throws OpenSearchException, IOException {
                var client = connect();
                var indexName = "wikimedia-opensearch";
                //createIndex(client, indexName);

                System.out.println("consuming start...");
                consume(client,indexName);
                System.out.println("consuming end...");
        }


        static class IndexData {
                private String wikiMediaValue;

                public IndexData(String wikiMediaValue) {
                        this.wikiMediaValue = wikiMediaValue;
                }

                @Override
                public String toString() {
                        return String.format("IndexData{wikiMediaValue='%s'}",wikiMediaValue);
                }

                public String getWikiMediaValue() {
                        return wikiMediaValue;
                }
                public void setWikiMediaValue(String wikiMediaValue) {
                        this.wikiMediaValue = wikiMediaValue;
                }
        }

}

Testing

Create Topic

  • 创建主题,并启动WikimediaChangeProducer;
  • 注:需科学上网,需科学上网,需科学上网;
./kafka-topics.sh \
        --bootstrap-server 192.168.0.123:9092  \
        --topic wikimedia.recentchange --create \
        --partitions 3 --replication-factor 1
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"93bf8bf8-4c36-4a49-b226-3a9311d2c906","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257037},"id":484017699,"type":"log","namespace":2,"title":"Участник:Sherbek Qarshiyev","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062090,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Sherbek Qarshiyev (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q46898283","request_id":"161daa3c-9f11-44f1-b042-209a3acabcf8","id":"65fb4ae3-86fd-47ab-a85f-f0dd26b04b66","dt":"2024-04-25T12:52:32Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257038},"id":2201214347,"type":"edit","namespace":0,"title":"Q46898283","title_url":"https://www.wikidata.org/wiki/Q46898283","comment":"/* wbsetclaimvalue:1| */ [[Property:P1476]]: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults","timestamp":1714049552,"user":"KrBot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2136975570&oldid=2136975561&rcid=2201214347","minor":false,"patrolled":true,"length":{"old":60872,"new":60869},"revision":{"old":2136975561,"new":2136975570},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Определено значение для утверждения: </span></span> <a href=\"/wiki/Property:P1476\" title=\"название | название произведения (книги, фильма, газетной статьи, произведения исполнительского искусства, веб-сайта)\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"ru\" dir=\"ltr\">название</span> <span class=\"wb-itemlink-id\">(P1476)</span></span></a>: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","request_id":"4c136271-a6cd-4ff6-a6b7-429d769ba5ba","id":"14fcefb7-1f73-40a9-9acc-539d97aa06c3","dt":"2024-04-25T12:52:32Z","domain":"en.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257039},"id":1769512861,"type":"edit","namespace":2,"title":"User:Ali Ahwazi/sandbox2","title_url":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","comment":"","timestamp":1714049552,"user":"Ali Ahwazi","bot":false,"notify_url":"https://en.wikipedia.org/w/index.php?diff=1220709553&oldid=1220709505","minor":false,"length":{"old":26671,"new":31548},"revision":{"old":1220709505,"new":1220709553},"server_url":"https://en.wikipedia.org","server_name":"en.wikipedia.org","server_script_path":"/w","wiki":"enwiki","parsedcomment":""}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","request_id":"be816a4f-be27-4c49-830a-31161665401f","id":"ec47eca3-b6a4-4b4f-ac71-b44369de940d","dt":"2024-04-25T12:52:33Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257040},"id":519472837,"type":"edit","namespace":100,"title":"Portail:Châteaux/Articles récents","title_url":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","comment":"+ [[Château de Mielmont]]","timestamp":1714049553,"user":"OrlodrimBot","bot":true,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561930&oldid=214559720&rcid=519472837","minor":false,"patrolled":true,"length":{"old":779,"new":779},"revision":{"old":214559720,"new":214561930},"server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"+ <a href=\"/wiki/Ch%C3%A2teau_de_Mielmont\" title=\"Château de Mielmont\">Château de Mielmont</a>"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"b8ab3286-f69f-466a-9a00-c5c12c176001","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257041},"id":484017700,"type":"log","namespace":2,"title":"Участник:Виктория Никитенко","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062091,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Виктория Никитенко (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}

20:52:31.937 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","request_id":"739f7d34-fd23-4b37-ab77-87c95663aeda","id":"34ab12f5-7256-48e5-a4f8-b40bf95316ad","dt":"2024-04-25T12:52:33Z","domain":"oc.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257042},"id":10815379,"type":"new","namespace":0,"title":"Pairac lo Chasteu","title_url":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","comment":"Redireccion cap a [[Pairac (lo Chasteu)]]","timestamp":1714049553,"user":"PairacLoChasteu","bot":false,"notify_url":"https://oc.wikipedia.org/w/index.php?oldid=2436522&rcid=10815379","minor":false,"patrolled":false,"length":{"new":32},"revision":{"new":2436522},"server_url":"https://oc.wikipedia.org","server_name":"oc.wikipedia.org","server_script_path":"/w","wiki":"ocwiki","parsedcomment":"Redireccion cap a <a href=\"/wiki/Pairac_(lo_Chasteu)\" title=\"Pairac (lo Chasteu)\">Pairac (lo Chasteu)</a>"}
20:52:31.938 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","request_id":"7d6530d5-10a3-4628-a101-bc2e75b9a92f","id":"2eb4865e-b4ee-407c-9e19-b871967ed9e1","dt":"2024-04-25T12:52:30Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257043},"id":519472838,"type":"categorize","namespace":14,"title":"Catégorie:Article à référence nécessaire","title_url":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","comment":"[[:20e armée (Union soviétique)]] ajoutée à la catégorie","timestamp":1714049550,"user":"Le Petit Chat","bot":false,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561929&oldid=209346180&rcid=519472838","server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"<a href=\"/wiki/20e_arm%C3%A9e_(Union_sovi%C3%A9tique)\" title=\"20e armée (Union soviétique)\">20e armée (Union soviétique)</a> ajoutée à la catégorie"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","request_id":"3d4237d1-1994-4c65-b41a-84ee1f1a05c6","id":"d9e3f133-2e22-4022-9449-65a78ed83452","dt":"2024-04-25T12:52:31Z","domain":"commons.wikimedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257044},"id":2478318778,"type":"categorize","namespace":14,"title":"Category:Milford, Derbyshire","title_url":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","comment":"[[:File:The King William pub - geograph.org.uk - 5560373.jpg]] added to category","timestamp":1714049551,"user":"WereSpielChequers","bot":false,"notify_url":"https://commons.wikimedia.org/w/index.php?diff=871189763&oldid=871189716&rcid=2478318778","server_url":"https://commons.wikimedia.org","server_name":"commons.wikimedia.org","server_script_path":"/w","wiki":"commonswiki","parsedcomment":"<a href=\"/wiki/File:The_King_William_pub_-_geograph.org.uk_-_5560373.jpg\" title=\"File:The King William pub - geograph.org.uk - 5560373.jpg\">File:The King William pub - geograph.org.uk - 5560373.jpg</a> added to category"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","request_id":"c408c6e9-7e92-4f96-8a99-1d518f3af5ed","id":"e94ece30-3416-41e9-860d-d3547f2248d6","dt":"2024-04-25T12:52:33Z","domain":"ko.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257045},"type":"log","namespace":0,"title":"조상환","title_url":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","comment":"","timestamp":1714049553,"user":"Cho Sang Hwan","bot":false,"log_id":0,"log_type":"abusefilter","log_action":"hit","log_params":{"action":"edit","filter":"71","actions":"tag","log":1521905},"log_action_comment":"Cho Sang Hwan님이 [[조상환]]에서 \"edit\" 동작을 하여 [[특수:편집필터/71|필터 71]]이(가) 작동하였습니다. 조치: 태그 ([[특수:편집필터기록/1521905|자세한 사항]])","server_url":"https://ko.wikipedia.org","server_name":"ko.wikipedia.org","server_script_path":"/w","wiki":"kowiki","parsedcomment":""}
20:52:31.940 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","request_id":"d2dfebf5-df74-43d5-860e-be964ee93420","id":"27e197b2-cfb1-4c65-9717-09bb25438f08","dt":"2024-04-25T12:52:33Z","domain":"os.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257046},"id":1911685,"type":"new","namespace":14,"title":"Категори:Хуссар Голландийы чи амардис, уыдон","title_url":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","comment":"Ног фарс, йæ код райдайы афтæ: «[[Категори:Нидерландты чи амардис, уыдон]] [[Категори:Хуссар Голландийы зындгонд адæм|Амард]]»","timestamp":1714049553,"user":"Taamu","bot":false,"notify_url":"https://os.wikipedia.org/w/index.php?oldid=558822&rcid=1911685","minor":false,"patrolled":true,"length":{"new":167},"revision":{"new":558822},"server_url":"https://os.wikipedia.org","server_name":"os.wikipedia.org","server_script_path":"/w","wiki":"oswiki","parsedcomment":"Ног фарс, йæ код райдайы афтæ: «<a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%9D%D0%B8%D0%B4%D0%B5%D1%80%D0%BB%D0%B0%D0%BD%D0%B4%D1%82%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD\" title=\"Категори:Нидерландты чи амардис, уыдон\">Категори:Нидерландты чи амардис, уыдон</a> <a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D0%B7%D1%8B%D0%BD%D0%B4%D0%B3%D0%BE%D0%BD%D0%B4_%D0%B0%D0%B4%C3%A6%D0%BC\" title=\"Категори:Хуссар Голландийы зындгонд адæм\">Амард»</a>"}

……

Consume Message

  • 启动OpenSearchConsumer
# 此步骤可选
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
        --topic wikimedia.recentchange --from-beginning
GET /_cat/indices?v

GET _search
{
  "query": {
    "match_all": {}
  }
}

GET /index_name/_search
{
  "query": {
    "match_all": {}
  }
}

Outro

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/622737.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录-算法训练营day38【动态规划01:理论基础、斐波那契数、爬楼梯、使用最小花费爬楼梯】

代码随想录-035期-算法训练营【博客笔记汇总表】-CSDN博客 第九章 动态规划part01● 理论基础 ● 509. 斐波那契数 ● 70. 爬楼梯 ● 746. 使用最小花费爬楼梯 详细布置 今天正式开始动态规划&#xff01;理论基础 无论大家之前对动态规划学到什么程度&#xff0c;一定要先看…

【Linux】了解信号产生的五种方式

文章目录 正文前的知识准备kill 命令查看信号man手册查看信号信号的处理方法 认识信号产生的5种方式1. 工具2. 键盘3. 系统调用kill 向任意进程发送任意信号raise 给调用方发送任意信号abort 给调用方发送SIGABRT信号 4. 软件条件5. 异常 正文前的知识准备 kill 命令查看信号 …

项目8-头像的上传

js实现头像上传并且预览图片功能以及提交 - 掘金 (juejin.cn) 我们简单建立一个表 1.前端知识储备 1.1 addClass的使用 1.基本语法 addClass() 方法向被选元素添加一个或多个类。 该方法不会移除已存在的 class 属性&#xff0c;仅仅添加一个或多个 class 属性。 提示&…

CentOS使用Docker搭建Nacos结合内网穿透实现无公网IP远程登录本地管理平台

文章目录 1. Docker 运行Nacos2. 本地访问Nacos3. Linux安装Cpolar4. 配置Nacos UI界面公网地址5. 远程访问 Nacos UI界面6. 固定Nacos UI界面公网地址7. 固定地址访问Nacos Nacos是阿里开放的一款中间件,也是一款服务注册中心&#xff0c;它主要提供三种功能&#xff1a;持久化…

LeetCode题练习与总结:不同的二叉搜索树Ⅱ--95

一、题目描述 给你一个整数 n &#xff0c;请你生成并返回所有由 n 个节点组成且节点值从 1 到 n 互不相同的不同 二叉搜索树 。可以按 任意顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,null,2,null,3],[1,null,3,2],[2,1,3],[3,1,null,nul…

CNN/TCN/LSTM/BiGRU-Attention到底哪个模型效果最好?注意力机制全家桶来啦!

​ 声明&#xff1a;文章是从本人公众号中复制而来&#xff0c;因此&#xff0c;想最新最快了解各类智能优化算法及其改进的朋友&#xff0c;可关注我的公众号&#xff1a;强盛机器学习&#xff0c;不定期会有很多免费代码分享~ 目录 数据介绍 效果展示 原理简介 代…

Python---NumPy万字总结【此篇文章内容难度较大,线性代数模块】(3)

NumPy的应用&#xff08;3&#xff09; 向量 向量&#xff08;vector&#xff09;也叫矢量&#xff0c;是一个同时具有大小和方向&#xff0c;且满足平行四边形法则的几何对象。与向量相对的概念叫标量或数量&#xff0c;标量只有大小&#xff0c;绝大多数情况下没有方向。我们…

Ubuntu 超级终端Terminator常用使用技巧

Ubuntu 超级终端Terminator常用使用技巧 Terminator 是一款功能强大的终端模拟器&#xff0c;它特别适合于需要同时管理多个终端会话的用户。以下是如何在 Ubuntu 上使用 Terminator 的详细指南&#xff1a; 安装 Terminator 如果你的系统尚未安装 Terminator&#xff0c;你…

Prompt Engineering ,Fine-tuning , RAG ?

Prompt Engineering ,Fine-tuning , RAG 总结&#xff1a;1 prompt engineering2 RAG (Retrieval Augmented Generation)**RAG特点****RAG优势****RAG劣势** 3 微调&#xff08;Fine-tuning&#xff09;**微调特点****微调优势****微调劣势** 4 三者共性和区别5 RAG和微调的适应…

港中深「户外自重构蜗牛机器人集群」登Nature子刊!

在科幻电影《超能陆战队》中&#xff0c;我们见证了一种由成千上万个微小磁性单元组成的机器人通过磁力相互连接&#xff0c;形成各种复杂的三维结构。香港中文大学&#xff08;深圳&#xff09;林天麟教授团队致力于将这一科幻转化为现实&#xff0c;近年来开发了一系列自由形…

C++基础与深度解析 | 数组 | vector | string

文章目录 一、数组1.一维数组2.多维数组 二、vector三、string 一、数组 1.一维数组 在C中&#xff0c;数组用于存储具有相同类型和特定大小的元素集合。数组在内存中是连续存储的&#xff0c;并且支持通过索引快速访问元素。 数组的声明&#xff1a; 数组的声明指定了元素的…

【基础技能】Windows常用快捷键

最近做知识管理&#xff0c;梳理了下个人技能&#xff0c;存在好多基础技能都是一知半解&#xff0c;用的时候都是现搜现查&#xff0c;没有形成一个完整的知识体系&#xff0c;导致一些基础不牢靠&#xff0c;需要再次筑基&#xff01; 于是就翻阅了微软的官网&#xff0c;撸…

5.13网络编程

只要在一个电脑中的两个进程之间可以通过网络进行通信那么拥有公网ip的两个计算机的通信是一样的。但是一个局域网中的两台电脑上的虚拟机是不能进行通信的&#xff0c;因为这两个虚拟机在电脑中又有各自的局域网所以通信很难实现。 socket套接字是一种用于网络间进行通信的方…

Linux进程间几种通信机制

一. 简介 经过前一篇文章的学习&#xff0c; 我们知道Linux下有两种标准&#xff1a;system V标准和 Posix标准。 System V 和 POSIX 标准是操作系统提供的接口标准&#xff0c;它们规定了操作系统应该如何实现一些基本功能&#xff0c;比如线程、进程间通信、文件处理等。 …

【APM】在Kubernetes中搭建OpenTelemetry+Loki+Tempo+Grafana链路追踪(一)

文章目录 1、最终效果2、前提准备2、环境信息3、服务集成&#xff08;Opentelemetry ->Tempo&#xff09;3.1 上报链路数据3.1.1 下载opentelemetry-agent3.1.2 启动配置业务app3.1.3 配置opentelemetry输入输出3.1.4 配置grafana datasource3.1.4.1 配置tempo3.1.4.2 配置l…

基于RK3568的鸿蒙通行一体机方案项目

鸿蒙通行一体机方案以鸿蒙版AIoT-3568X人工智能主板为核心平台&#xff0c;搭载OpenHarmony操作系统&#xff0c;使用自研算法和国产芯片&#xff0c;可管可控&#xff0c;并提供身份识别以及其他外设配件生态链支持。 01 项目概述 项目使用场景 鸿蒙版通行一体机方案凭借自主…

【云计算小知识】云管理的作用是什么?

云计算已经成为推动企业数字化转型&#xff0c;提升运营效率的重要力量。而在这个过程中&#xff0c;云管理作为确保云计算环境稳定、高效运行的关键环节&#xff0c;其作用愈发凸显。今天我们小编就给大家详细介绍一下云管理的作用是什么&#xff1f; 云管理的作用是什么&…

探索渲染农场的高性能奥秘

在当今数字化的时代&#xff0c;渲染农场正逐渐成为许多行业不可或缺的强大工具。那么&#xff0c;为什么我们说渲染农场是高性能的计算机系统呢&#xff1f;让我们深入剖析其中关键要点。 “渲染农场”拥有大规模的计算资源。它由众多高性能的计算机节点组成&#xff0c;这些…

Maven、JavaWeb基础开发

1 Maven介绍 1、标准化的项目结构 2、标准化的构建流程 3、依赖管理 4、依赖范围 2 JavaWeb基础开发 2.1 Http协议 1 Http请求数据格式 2 Http响应数据格式 2.2 Web服务器&#xff08;Tomcat&#xff09; VTS、FileServer使用Tomcat部署&#xff1b; 其他服务单元TESLA S…

vue3.0(七) 计算属性(computed)

文章目录 1 计算属性&#xff08;computed &#xff09;1.1 computed使用1.2 computed使用场景1.4 computed的注意点1.4 computed的原理1.5 computed的示例 computed 和 Methods 的区别 1 计算属性&#xff08;computed &#xff09; 在 Vue 3 中&#xff0c;computed 是一个用…