1. Maven 依赖
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.0</version>
</dependency>
2. 批量插入
/** Elasticsearch client injected by Spring; handed to the bulk ingester in {@code bulk2ES}. */
@Autowired
ElasticsearchClient esClient;

/** Passed to the ingester's {@code maxOperations}: operations buffered per bulk request. */
public static int BATCH_SIZE = 1000;

/** Passed to the ingester's {@code maxSize}: 10 * 1024 * 1024 = 10485760 (10 MiB if read as bytes). */
public static int BYTE_SIZE = 10 * 1024 * 1024;

/** Passed to the ingester's {@code maxConcurrentRequests}: bulk requests allowed in flight at once. */
public static int REQUEST_SIZE = 10;
/**
 * Indexes every element of {@code list} into the {@code EMPI_INDEX_NAME} index through a
 * {@link BulkIngester}, using each element's {@code getEmpi()} value as the document id.
 *
 * <p>The ingester is configured from {@code BATCH_SIZE}, {@code BYTE_SIZE} and
 * {@code REQUEST_SIZE}, with a 60 second flush interval and the listener returned by
 * {@link #getBulkIngester()}. It is opened in a try-with-resources block so that it is
 * closed on every path, including when adding an operation throws.
 *
 * <p>Note: the return value only reflects whether this method itself ran without an
 * exception. Failures reported asynchronously to the listener are logged there and do not
 * turn the result into {@code false}.
 *
 * @param list documents to index; {@code null} is logged and reported as a failure, an
 *             empty list is a no-op
 * @return {@code true} if all operations were submitted and the ingester closed without
 *         throwing, {@code false} if {@code list} is {@code null} or any exception occurred
 */
public boolean bulk2ES(List<EmpiPro> list) {
    if (list == null) {
        log.error("bulk2ES 入参 list 为 null,未执行写入");
        return false;
    }
    if (list.isEmpty()) {
        // Nothing to send: skip creating an ingester (and its flush timer) altogether.
        return true;
    }
    try {
        log.info("并发请求数量:{},单批次插入数量:{},最大请求大小:{}", REQUEST_SIZE, BATCH_SIZE, BYTE_SIZE);
        // Created outside the builder lambda because getBulkIngester() declares a checked exception.
        BulkListener<String> listener = getBulkIngester();
        try (BulkIngester<String> bulkIngester = BulkIngester.of(b -> b
                .client(esClient)
                .maxOperations(BATCH_SIZE)
                .maxSize(BYTE_SIZE)
                .maxConcurrentRequests(REQUEST_SIZE)
                .flushInterval(60, TimeUnit.SECONDS)
                .listener(listener)
        )) {
            for (EmpiPro empiPro : list) {
                IndexOperation<EmpiPro> indexOperation = new IndexOperation.Builder<EmpiPro>()
                        .index(EMPI_INDEX_NAME)
                        .id(empiPro.getEmpi())
                        .document(empiPro)
                        .build();
                BulkOperation bulkOperation = new BulkOperation.Builder()
                        .index(indexOperation)
                        .build();
                bulkIngester.add(bulkOperation);
            }
        }
        return true;
    } catch (Exception e) {
        // Route the failure through the logger instead of printing to stderr.
        log.error("批量写入ES失败,索引:{}", EMPI_INDEX_NAME, e);
        return false;
    }
}
/**
 * Creates the {@link BulkListener} that logs the lifecycle of each bulk request.
 *
 * <p>Despite its name, this method returns a listener, not an ingester. The listener
 * <ul>
 *   <li>logs the execution id and number of contexts before a request is sent,</li>
 *   <li>logs whether the response reported errors and, if it did, logs every item that
 *       carries an error together with its id, status and reason,</li>
 *   <li>logs the throwable when the whole request failed.</li>
 * </ul>
 *
 * @return a new listener instance
 * @throws Exception never thrown by this implementation; kept so the signature stays
 *                   compatible with existing callers and overrides
 */
public BulkListener<String> getBulkIngester() throws Exception {
    return new BulkListener<String>() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
            log.info("【beforeBulk】批次[{}】 携带 【{}】 请求数量", executionId, contexts.size());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
            log.info("【afterBulk】批次[{}】 提交数据量【{}】 提交结果【{}】", executionId, contexts.size(), response.errors() ? "失败" : "成功");
            if (response.errors()) {
                // A bulk response can be a partial failure: report each failed item so the
                // affected documents and the reason are visible in the log.
                response.items().forEach(item -> {
                    if (item.error() != null) {
                        log.error("【afterBulk】批次[{}】 文档[{}] 写入失败,状态码【{}】,原因【{}】",
                                executionId, item.id(), item.status(), item.error().reason());
                    }
                });
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
            // Trailing throwable is logged with its stack trace by the logging framework.
            log.error("Bulk request {} failed", executionId, failure);
        }
    };
}
3. 补充 ES 的配置类:
/**
 * Spring configuration that builds the {@link ElasticsearchClient} bean from the
 * {@code spring.data.elasticsearch.*} properties (host, scheme, port, user, pwd).
 */
@Configuration
public class EsConfig {

    @Value("${spring.data.elasticsearch.host}")
    private String host;

    @Value("${spring.data.elasticsearch.scheme}")
    private String scheme;

    @Value("${spring.data.elasticsearch.port}")
    private int port;

    @Value("${spring.data.elasticsearch.user}")
    private String userName;

    @Value("${spring.data.elasticsearch.pwd}")
    private String password;

    /**
     * Creates the client on top of a low-level {@link RestClient} that uses basic
     * authentication and a fixed five minute keep-alive for its connections, with a
     * Jackson-based JSON mapper.
     *
     * <p>NOTE(review): nothing in this class closes the RestClient/transport explicitly;
     * confirm whether container shutdown is expected to release it.
     *
     * @return the configured Elasticsearch client
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        // Basic-auth credentials registered for any host/port/realm.
        final CredentialsProvider basicAuth = new BasicCredentialsProvider();
        basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        // Constant keep-alive returned for every connection, in milliseconds.
        final long keepAliveMillis = Duration.ofMinutes(5).toMillis();

        RestClient lowLevelClient = RestClient
                .builder(new HttpHost(host, port, scheme))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(basicAuth)
                        .setKeepAliveStrategy((response, context) -> keepAliveMillis))
                .build();

        ElasticsearchTransport jsonTransport =
                new RestClientTransport(lowLevelClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(jsonTransport);
    }
}