1. Background
While recently evaluating SeaTunnel, I noticed that newer releases ship a web service (seatunnel-web) for creating and managing data synchronization jobs through a UI. It includes a log module that shows each job's execution status, including the number of rows read and the number of rows written. I was curious where those numbers come from; tracing the source code led me to Hazelcast, so I dug into it.
2. What is Hazelcast
Hazelcast is an open-source distributed In-Memory Data Grid (IMDG), used mainly for distributed computing and caching. Its key capabilities include:
- Distributed data structures: Hazelcast provides a set of distributed data structures such as Map, List, Set, and Queue that can be stored and accessed across the cluster.
- Caching: Hazelcast offers distributed caching, keeping data in memory for fast access. It supports several cache policies, such as LRU (Least Recently Used), LFU (Least Frequently Used), and TTL (Time to Live); a configuration sketch follows this list.
- Distributed computing: Hazelcast can spread computation tasks across multiple cluster nodes for parallel processing, increasing an application's throughput.
- High availability: Hazelcast uses distributed replication and failover to keep data reliable and available. It detects node failures automatically and migrates data and tasks when a member goes down.
- Scalability: Hazelcast scales horizontally by adding more nodes to the cluster. Nodes can be added or removed dynamically without stopping the application.
- Integrations: Hazelcast integrates with many applications and frameworks, such as Spring, Hibernate, and JCache, and with other distributed systems such as Apache Kafka and Apache Ignite.
- Multi-language support: Hazelcast provides clients for several languages, including Java, C#, C++, Python, and Node.js.
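The cache policies mentioned above can be set per map through configuration. Below is a minimal sketch, assuming a recent Hazelcast 4.x/5.x API; the map name "hot-data" and the size limits are made-up examples:

import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import java.util.concurrent.TimeUnit;

public class CacheConfigDemo {
    public static void main(String[] args) {
        Config config = new Config();
        // Entries in "hot-data" expire 60 seconds after creation and are evicted
        // LRU-style once a member holds more than 10,000 entries of this map.
        config.addMapConfig(new MapConfig("hot-data")
                .setTimeToLiveSeconds(60)
                .setEvictionConfig(new EvictionConfig()
                        .setEvictionPolicy(EvictionPolicy.LRU)
                        .setMaxSizePolicy(MaxSizePolicy.PER_NODE)
                        .setSize(10_000)));
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        IMap<String, String> hotData = instance.getMap("hot-data");
        hotData.put("k1", "v1");                       // uses the map-level TTL
        hotData.put("k2", "v2", 5, TimeUnit.SECONDS);  // per-entry TTL override
    }
}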
3. Use cases
- Caching: Hazelcast works as a high-performance distributed cache for an application's hot data.
- Distributed computing: Hazelcast provides a distributed computing framework that spreads tasks across cluster nodes for parallel processing, which suits industries such as finance, telecom, and e-commerce.
- Real-time data processing: Hazelcast can handle real-time data streams, supporting on-the-fly processing and analysis, for example in real-time monitoring or recommendation systems.
- Distributed session management: Hazelcast can manage distributed sessions, sharing them across nodes and supporting load balancing.
- Distributed data store: Hazelcast can act as a distributed data store for sharing data between multiple nodes.
4. Comparison with Redis
Hazelcast can be thought of as a NoSQL store, which inevitably brings up Redis, the one we use most. Both offer rich data structures such as maps and lists. So why not just use Redis? In my view there are a few considerations:
- Redis requires setting up a separate service, while embedded Hazelcast needs no extra components and works out of the box.
- Hazelcast uses the application server's own memory (somewhat like Caffeine), so it scales with the application and needs no external memory.
- Hazelcast's expiration support is not as flexible as Redis's.
- Hazelcast supports distributed computing: data is stored across multiple nodes, and the distributed-computing API can read from those nodes, compute, and return a result. This is an advantage over Redis.
- Redis can be shared by multiple applications and is decoupled from them, whereas Hazelcast is typically embedded in the application.
If distributed computing is not a requirement, simply pick whichever is more convenient. If your company has no shared infrastructure and the product belongs to your own team, Hazelcast is a perfectly good choice and saves you the setup, operation, and maintenance of Redis. Otherwise, stick with Redis.
However, if you need real-time stream processing, Hazelcast's distributed features are a good fit. Say we build a monitoring system that has to process data from many business systems: we can't realistically do all of that in Redis, MySQL, or a single machine's memory, so Hazelcast is worth a try.
5. How to use it
That was a lot of theory; how do you actually use it? Here is a Spring Boot embedded example.
- Add the Maven dependencies
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>your Hazelcast version</version>
</dependency>
<!-- Hazelcast Spring Boot integration (if needed) -->
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-spring-boot</artifactId>
    <version>your Hazelcast Spring Boot integration version</version>
</dependency>
- Code
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HazelcastService {

    @Autowired
    private HazelcastInstance hazelcastInstance;

    public void putData() {
        IMap<String, String> map = hazelcastInstance.getMap("my-map");
        map.put("key1", "value1");
    }

    public String getData(String key) {
        IMap<String, String> map = hazelcastInstance.getMap("my-map");
        return map.get(key);
    }
}
- Start it up
Start two instances of the service and you can see a cluster of two Hazelcast nodes form.
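For the two instances to join the same cluster, they must agree on the cluster name and be able to discover each other. With Spring Boot's Hazelcast auto-configuration, exposing a Config bean is typically enough to have an embedded HazelcastInstance created. A sketch under those assumptions ("demo-cluster" is an arbitrary name; multicast/auto-detection discovery is assumed to work on your network):

import com.hazelcast.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HazelcastConfiguration {

    @Bean
    public Config hazelcastConfig() {
        Config config = new Config();
        // Members with the same cluster name that can discover each other
        // (multicast is enabled by default) form one cluster automatically.
        config.setClusterName("demo-cluster");
        return config;
    }
}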
6. Source code
I want to look at the source code from two angles.
1. The job metrics view provided by seatunnel-web
- Find the endpoint that serves the job list with its metrics
@RequestMapping("/seatunnel/api/v1/task")
@RestController
public class TaskInstanceController {
@Autowired ITaskInstanceService<SeaTunnelJobInstanceDto> taskInstanceService;
@GetMapping("/jobMetrics")
@ApiOperation(value = "get the jobMetrics list ", httpMethod = "GET")
public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
@RequestAttribute(name = "userId") Integer userId,
@RequestParam(name = "jobDefineName", required = false) String jobDefineName,
@RequestParam(name = "executorName", required = false) String executorName,
@RequestParam(name = "stateType", required = false) String stateType,
@RequestParam(name = "startDate", required = false) String startTime,
@RequestParam(name = "endDate", required = false) String endTime,
@RequestParam("syncTaskType") String syncTaskType,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
return taskInstanceService.getSyncTaskInstancePaging(
userId,
jobDefineName,
executorName,
stateType,
startTime,
endTime,
syncTaskType,
pageNo,
pageSize);
}
}
- Step into the getSyncTaskInstancePaging method
public Result<PageInfo<SeaTunnelJobInstanceDto>> getSyncTaskInstancePaging(
Integer userId,
String jobDefineName,
String executorName,
String stateType,
String startTime,
String endTime,
String syncTaskType,
Integer pageNo,
Integer pageSize) {
JobDefinition jobDefinition = null;
IPage<SeaTunnelJobInstanceDto> jobInstanceIPage;
if (jobDefineName != null) {
jobDefinition = jobDefinitionDao.getJobByName(jobDefineName);
}
Result<PageInfo<SeaTunnelJobInstanceDto>> result = new Result<>();
PageInfo<SeaTunnelJobInstanceDto> pageInfo = new PageInfo<>(pageNo, pageSize);
result.setData(pageInfo);
baseService.putMsg(result, Status.SUCCESS);
Date startDate = dateConverter(startTime);
Date endDate = dateConverter(endTime);
if (jobDefinition != null) {
jobInstanceIPage =
jobInstanceDao.queryJobInstanceListPaging(
new Page<>(pageNo, pageSize),
startDate,
endDate,
jobDefinition.getId(),
syncTaskType);
} else {
jobInstanceIPage =
jobInstanceDao.queryJobInstanceListPaging(
new Page<>(pageNo, pageSize), startDate, endDate, null, syncTaskType);
}
List<SeaTunnelJobInstanceDto> records = jobInstanceIPage.getRecords();
if (CollectionUtils.isEmpty(records)) {
return result;
}
addJobDefineNameToResult(records);
addRunningTimeToResult(records);
// Key line: everything above is read from the local database; this call fetches data from Hazelcast and updates the local records
jobPipelineSummaryMetrics(records, syncTaskType, userId);
pageInfo.setTotal((int) jobInstanceIPage.getTotal());
pageInfo.setTotalList(records);
result.setData(pageInfo);
return result;
}
- Step into jobPipelineSummaryMetrics(records, syncTaskType, userId)
private void jobPipelineSummaryMetrics(
List<SeaTunnelJobInstanceDto> records, String syncTaskType, Integer userId) {
try {
ArrayList<Long> jobInstanceIdList = new ArrayList<>();
HashMap<Long, Long> jobInstanceIdAndJobEngineIdMap = new HashMap<>();
for (SeaTunnelJobInstanceDto jobInstance : records) {
if (jobInstance.getId() != null && jobInstance.getJobEngineId() != null) {
jobInstanceIdList.add(jobInstance.getId());
jobInstanceIdAndJobEngineIdMap.put(
jobInstance.getId(), Long.valueOf(jobInstance.getJobEngineId()));
}
}
Map<Long, JobSummaryMetricsRes> jobSummaryMetrics =
// fetch the metrics for each job instance
jobMetricsService.getALLJobSummaryMetrics(
userId,
jobInstanceIdAndJobEngineIdMap,
jobInstanceIdList,
syncTaskType);
for (SeaTunnelJobInstanceDto taskInstance : records) {
if (jobSummaryMetrics.get(taskInstance.getId()) != null) {
taskInstance.setWriteRowCount(
jobSummaryMetrics.get(taskInstance.getId()).getWriteRowCount());
taskInstance.setReadRowCount(
jobSummaryMetrics.get(taskInstance.getId()).getReadRowCount());
}
}
} catch (Exception e) {
for (SeaTunnelJobInstanceDto taskInstance : records) {
log.error(
"instance {} {} set instance and engine id error", taskInstance.getId(), e);
}
}
}
- Step into jobMetricsService.getALLJobSummaryMetrics(userId, jobInstanceIdAndJobEngineIdMap, jobInstanceIdList, syncTaskType)
@Override
public Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(
@NonNull Integer userId,
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
@NonNull String syncTaskType) {
log.info("jobInstanceIdAndJobEngineIdMap={}", jobInstanceIdAndJobEngineIdMap);
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY, userId);
List<JobInstance> allJobInstance = jobInstanceDao.getAllJobInstance(jobInstanceIdList);
if (allJobInstance.isEmpty()) {
log.warn(
"getALLJobSummaryMetrics : allJobInstance is empty, task id list is {}",
jobInstanceIdList);
return new HashMap<>();
}
Map<Long, JobSummaryMetricsRes> result = null;
Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine =
// fetch metrics for running jobs from the Hazelcast cluster members
getAllRunningJobMetricsFromEngine(
allJobInstance.get(0).getEngineName(),
allJobInstance.get(0).getEngineVersion());
// assemble the data differently depending on the task type
if (syncTaskType.equals("BATCH")) {
result =
getMatricsListIfTaskTypeIsBatch(
allJobInstance,
userId,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap);
} else if (syncTaskType.equals("STREAMING")) {
result =
getMatricsListIfTaskTypeIsStreaming(
allJobInstance,
userId,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap);
}
log.info("result is {}", result == null ? "null" : result.toString());
return result;
}
- Step into getAllRunningJobMetricsFromEngine(allJobInstance.get(0).getEngineName(), allJobInstance.get(0).getEngineVersion())
private Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetricsFromEngine(
String engineName, String engineVersion) {
Engine engine = new Engine(engineName, engineVersion);
IEngineMetricsExtractor engineMetricsExtractor =
(new EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor();
// as the name suggests, this fetches the jobs' metrics
return engineMetricsExtractor.getAllRunningJobMetrics();
}
- Step into engineMetricsExtractor.getAllRunningJobMetrics()
@Override
public Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetrics() {
HashMap<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsHashMap = new HashMap<>();
try {
// Look familiar? seaTunnelEngineProxy: from here on we really interact with Hazelcast to fetch the data
String allJobMetricsContent = seaTunnelEngineProxy.getAllRunningJobMetricsContent();
if (StringUtils.isEmpty(allJobMetricsContent)) {
return new HashMap<>();
}
JsonNode jsonNode = JsonUtils.stringToJsonNode(allJobMetricsContent);
Iterator<JsonNode> iterator = jsonNode.iterator();
while (iterator.hasNext()) {
LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap();
JsonNode next = iterator.next();
JsonNode sourceReceivedCount = next.get("metrics").get("SourceReceivedCount");
Long jobEngineId = 0L;
if (sourceReceivedCount != null && sourceReceivedCount.isArray()) {
for (JsonNode node : sourceReceivedCount) {
jobEngineId = node.get("tags").get("jobId").asLong();
Integer pipelineId = node.get("tags").get("pipelineId").asInt();
JobMetrics currPipelineMetrics =
getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);
currPipelineMetrics.setReadRowCount(
currPipelineMetrics.getReadRowCount() + node.get("value").asLong());
}
}
JsonNode sinkWriteCount = next.get("metrics").get("SinkWriteCount");
if (sinkWriteCount != null && sinkWriteCount.isArray()) {
for (JsonNode node : sinkWriteCount) {
jobEngineId = node.get("tags").get("jobId").asLong();
Integer pipelineId = node.get("tags").get("pipelineId").asInt();
JobMetrics currPipelineMetrics =
getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);
currPipelineMetrics.setWriteRowCount(
currPipelineMetrics.getWriteRowCount()
+ node.get("value").asLong());
}
}
JsonNode sinkWriteQPS = next.get("metrics").get("SinkWriteQPS");
if (sinkWriteQPS != null && sinkWriteQPS.isArray()) {
for (JsonNode node : sinkWriteQPS) {
Integer pipelineId = node.get("tags").get("pipelineId").asInt();
JobMetrics currPipelineMetrics =
getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);
currPipelineMetrics.setWriteQps(
currPipelineMetrics.getWriteQps()
+ (new Double(node.get("value").asDouble())).longValue());
}
}
JsonNode sourceReceivedQPS = next.get("metrics").get("SourceReceivedQPS");
if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) {
for (JsonNode node : sourceReceivedQPS) {
Integer pipelineId = node.get("tags").get("pipelineId").asInt();
JobMetrics currPipelineMetrics =
getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);
currPipelineMetrics.setReadQps(
currPipelineMetrics.getReadQps()
+ (new Double(node.get("value").asDouble())).longValue());
}
}
JsonNode cdcRecordEmitDelay = next.get("metrics").get("CDCRecordEmitDelay");
if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) {
Map<Integer, List<Long>> dataMap = new HashMap<>();
for (JsonNode node : cdcRecordEmitDelay) {
Integer pipelineId = node.get("tags").get("pipelineId").asInt();
long value = node.get("value").asLong();
dataMap.computeIfAbsent(pipelineId, n -> new ArrayList<>()).add(value);
}
dataMap.forEach(
(key, value) -> {
JobMetrics currPipelineMetrics =
getOrCreatePipelineMetricsMapStatusRunning(metricsMap, key);
OptionalDouble average =
value.stream().mapToDouble(a -> a).average();
currPipelineMetrics.setRecordDelay(
Double.valueOf(
average.isPresent()
? average.getAsDouble()
: 0)
.longValue());
});
}
log.info("jobEngineId={},metricsMap={}", jobEngineId, metricsMap);
allRunningJobMetricsHashMap.put(jobEngineId, metricsMap);
}
} catch (Exception e) {
e.printStackTrace();
}
return allRunningJobMetricsHashMap;
}
- If you have actually used the seatunnel-web UI, you will recognize that this is essentially where the monitoring data comes from.
- Step into seaTunnelEngineProxy.getAllRunningJobMetricsContent()
public String getAllRunningJobMetricsContent() {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
try {
return seaTunnelClient.getJobClient().getRunningJobMetrics();
} finally {
seaTunnelClient.close();
}
}
- The code is straightforward; keep following it.
public String getRunningJobMetrics() {
    return (String) this.hazelcastClient.requestOnMasterAndDecodeResponse(
            SeaTunnelGetRunningJobMetricsCodec.encodeRequest(),
            SeaTunnelGetRunningJobMetricsCodec::decodeResponse);
}
- hazelcastClient: looks familiar, right? SeaTunnel wraps its Hazelcast calls quite deeply. We are almost there; keep following the code.
public <S> S requestOnMasterAndDecodeResponse(@NonNull ClientMessage request, @NonNull Function<ClientMessage, Object> decoder) {
if (request == null) {
throw new NullPointerException("request is marked non-null but is null");
} else if (decoder == null) {
throw new NullPointerException("decoder is marked non-null but is null");
} else {
UUID masterUuid = this.hazelcastClient.getClientClusterService().getMasterMember().getUuid();
return this.requestAndDecodeResponse(masterUuid, request, decoder);
}
}
- It determines which Hazelcast member (the master) the data should be requested from, then makes the call.
public <S> S requestAndDecodeResponse(@NonNull UUID uuid, @NonNull ClientMessage request, @NonNull Function<ClientMessage, Object> decoder) {
if (uuid == null) {
throw new NullPointerException("uuid is marked non-null but is null");
} else if (request == null) {
throw new NullPointerException("request is marked non-null but is null");
} else if (decoder == null) {
throw new NullPointerException("decoder is marked non-null but is null");
} else {
ClientInvocation invocation = new ClientInvocation(this.hazelcastClient, request, (Object)null, uuid);
try {
ClientMessage response = (ClientMessage)invocation.invoke().get();
return this.serializationService.toObject(decoder.apply(response));
} catch (InterruptedException var6) {
Thread.currentThread().interrupt();
return null;
} catch (Throwable var7) {
throw ExceptionUtil.rethrow(var7);
}
}
}
- Pay special attention to ClientInvocation and ClientMessage, because the same types show up when we trace the Hazelcast API code below.
- Below that, the Hazelcast client sends the request and get() blocks until the response comes back.
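Stripped of SeaTunnel's own message codecs, the underlying pattern is an ordinary Hazelcast client talking to a running cluster. A minimal sketch of that client-side pattern (the cluster name and address are placeholders, not SeaTunnel's actual settings):

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

public class ClientDemo {
    public static void main(String[] args) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setClusterName("demo-cluster");                   // placeholder cluster name
        clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701");  // placeholder member address
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        try {
            // Every call goes over the network to a cluster member and blocks for the response,
            // much like the requestAndDecodeResponse(...).invoke().get() pattern shown above.
            IMap<String, String> map = client.getMap("my-map");
            map.put("key1", "value1");
            System.out.println(map.get("key1"));
        } finally {
            client.shutdown();
        }
    }
}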
2. The Hazelcast API
- For the Hazelcast API we will use the following code as the entry point into the source.
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HazelcastService {
@Autowired
private HazelcastInstance hazelcastInstance;
public void putData() {
IMap<String, String> map = hazelcastInstance.getMap("my-map");
map.put("key1", "value1");
}
public String getData(String key) {
IMap<String, String> map = hazelcastInstance.getMap("my-map");
return map.get(key);
}
}
- As you can see, using Hazelcast feels much like using ordinary Java data structures, so it is easy to get started with.
- Step into the get method of Hazelcast's map implementation
@Override
public V get(@Nonnull Object key) {
checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);
return toObject(getInternal(key));
}
- Step into getInternal
protected Object getInternal(Object key) {
// TODO: action for read-backup true is not well tested
Data keyData = toDataWithStrategy(key);
if (mapConfig.isReadBackupData()) {
Object fromBackup = readBackupDataOrNull(keyData);
if (fromBackup != null) {
return fromBackup;
}
}
MapOperation operation = operationProvider.createGetOperation(name, keyData);
operation.setThreadId(getThreadId());
return invokeOperation(keyData, operation);
}
- The key is converted into Hazelcast's serialized Data form, a Get operation is created, and the operation is invoked.
private Object invokeOperation(Data key, MapOperation operation) {
int partitionId = partitionService.getPartitionId(key);
operation.setThreadId(getThreadId());
try {
Object result;
if (statisticsEnabled) {
long startTimeNanos = Timer.nanos();
Future future = operationService
.createInvocationBuilder(SERVICE_NAME, operation, partitionId)
.setResultDeserialized(false)
.invoke();
result = future.get();
incrementOperationStats(operation, localMapStats, startTimeNanos);
} else {
Future future = operationService
.createInvocationBuilder(SERVICE_NAME, operation, partitionId)
.setResultDeserialized(false)
.invoke();
result = future.get();
}
return result;
} catch (Throwable t) {
throw rethrow(t);
}
}
- The operation is executed and an InvocationFuture is returned. InvocationFuture extends CompletableFuture, so if needed you can also compose asynchronous calls for more complex queries (a short sketch follows; the invoke() implementation is shown right after it).
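The same composability is exposed on the public API as well, for example through IMap.getAsync, which returns a CompletionStage in recent Hazelcast versions. A minimal sketch, written as an extra method on the HazelcastService from section 5:

    // non-blocking read: compose the result like any CompletableFuture chain
    public void getDataAsync(String key) {
        IMap<String, String> map = hazelcastInstance.getMap("my-map");
        map.getAsync(key)
                .thenApply(value -> value == null ? "default" : value)
                .thenAccept(value -> System.out.println("value = " + value));
    }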
@Override
public InvocationFuture invoke() {
op.setServiceName(serviceName);
Invocation invocation;
if (target == null) {
op.setPartitionId(partitionId).setReplicaIndex(replicaIndex);
invocation = new PartitionInvocation(
context, op, doneCallback, tryCount, tryPauseMillis, callTimeout, resultDeserialized,
failOnIndeterminateOperationState, connectionManager);
} else {
invocation = new TargetInvocation(
context, op, target, doneCallback, tryCount, tryPauseMillis,
callTimeout, resultDeserialized, connectionManager);
}
return async
? invocation.invokeAsync()
: invocation.invoke();
}
- You can see that the real work is done by different Invocation types, and the call is dispatched either synchronously or asynchronously. Let's follow the invoke path, which lands in invoke0.
private void invoke0(boolean isAsync) {
if (invokeCount > 0) {
throw new IllegalStateException("This invocation is already in progress");
} else if (isActive()) {
throw new IllegalStateException(
"Attempt to reuse the same operation in multiple invocations. Operation is " + op);
}
try {
setCallTimeout(op, callTimeoutMillis);
setCallerAddress(op, context.thisAddress);
op.setNodeEngine(context.nodeEngine);
boolean isAllowed = context.operationExecutor.isInvocationAllowed(op, isAsync);
if (!isAllowed && !isMigrationOperation(op)) {
throw new IllegalThreadStateException(Thread.currentThread() + " cannot make remote call: " + op);
}
doInvoke(isAsync);
} catch (Exception e) {
handleInvocationException(e);
}
}
- Continue into doInvoke
private void doInvoke(boolean isAsync) {
if (!engineActive()) {
return;
}
invokeCount++;
setInvocationTime(op, context.clusterClock.getClusterTime());
// We'll initialize the invocation before registering it. Invocation monitor iterates over
// registered invocations and it must observe completely initialized invocations.
Exception initializationFailure = null;
try {
initInvocationTarget();
} catch (Exception e) {
// We'll keep initialization failure and notify invocation with this failure
// after invocation is registered to the invocation registry.
initializationFailure = e;
}
if (!context.invocationRegistry.register(this)) {
return;
}
if (initializationFailure != null) {
notifyError(initializationFailure);
return;
}
if (isLocal()) {
doInvokeLocal(isAsync);
} else {
doInvokeRemote();
}
}
- A local call goes into doInvokeLocal; a call to a remote Hazelcast cluster goes into doInvokeRemote. With the embedded Spring Boot setup used here, the call is local.
- Since our example is a local call, we follow doInvokeLocal. I won't trace every step here; if you are interested, debug into it yourself. Roughly, it calls an execute method that puts the MapOperation (an Operation) onto a queue, and a thread pool executes it asynchronously. Let's focus on MapOperation.
public abstract class MapOperation extends AbstractNamedOperation
implements IdentifiedDataSerializable, ServiceNamespaceAware {
private static final boolean ASSERTION_ENABLED = MapOperation.class.desiredAssertionStatus();
protected transient MapService mapService;
protected transient RecordStore<Record> recordStore;
protected transient MapContainer mapContainer;
protected transient MapServiceContext mapServiceContext;
protected transient MapEventPublisher mapEventPublisher;
protected transient boolean createRecordStoreOnDemand = true;
protected transient boolean disposeDeferredBlocks = true;
private transient boolean canPublishWanEvent;
public MapOperation() {
}
public MapOperation(String name) {
this.name = name;
}
@Override
public final void beforeRun() throws Exception {
super.beforeRun();
mapService = getService();
mapServiceContext = mapService.getMapServiceContext();
mapEventPublisher = mapServiceContext.getMapEventPublisher();
try {
recordStore = getRecordStoreOrNull();
if (recordStore == null) {
mapContainer = mapServiceContext.getMapContainer(name);
} else {
mapContainer = recordStore.getMapContainer();
}
} catch (Throwable t) {
disposeDeferredBlocks();
throw rethrow(t, Exception.class);
}
canPublishWanEvent = canPublishWanEvent(mapContainer);
assertNativeMapOnPartitionThread();
innerBeforeRun();
}
protected void innerBeforeRun() throws Exception {
if (recordStore != null) {
recordStore.beforeOperation();
}
// Concrete classes can override this method.
}
@Override
public final void run() {
try {
runInternal();
} catch (NativeOutOfMemoryError e) {
rerunWithForcedEviction();
}
}
protected void runInternal() {
// Intentionally empty method body.
// Concrete classes can override this method.
}
private void rerunWithForcedEviction() {
try {
runWithForcedEvictionStrategies(this);
} catch (NativeOutOfMemoryError e) {
disposeDeferredBlocks();
throw e;
}
}
@Override
public final void afterRun() throws Exception {
afterRunInternal();
disposeDeferredBlocks();
super.afterRun();
}
protected void afterRunInternal() {
// Intentionally empty method body.
// Concrete classes can override this method.
}
@Override
public void afterRunFinal() {
if (recordStore != null) {
recordStore.afterOperation();
}
}
protected void assertNativeMapOnPartitionThread() {
if (!ASSERTION_ENABLED) {
return;
}
assert mapContainer.getMapConfig().getInMemoryFormat() != NATIVE
|| getPartitionId() != GENERIC_PARTITION_ID
: "Native memory backed map operations are not allowed to run on GENERIC_PARTITION_ID";
}
ILogger logger() {
return getLogger();
}
protected final CallerProvenance getCallerProvenance() {
return disableWanReplicationEvent() ? CallerProvenance.WAN : CallerProvenance.NOT_WAN;
}
private RecordStore getRecordStoreOrNull() {
int partitionId = getPartitionId();
if (partitionId == -1) {
return null;
}
PartitionContainer partitionContainer = mapServiceContext.getPartitionContainer(partitionId);
if (createRecordStoreOnDemand) {
return partitionContainer.getRecordStore(name);
} else {
return partitionContainer.getExistingRecordStore(name);
}
}
@Override
public void onExecutionFailure(Throwable e) {
disposeDeferredBlocks();
super.onExecutionFailure(e);
}
@Override
public void logError(Throwable e) {
ILogger logger = getLogger();
if (e instanceof NativeOutOfMemoryError) {
Level level = this instanceof BackupOperation ? Level.FINEST : Level.WARNING;
logger.log(level, "Cannot complete operation! -> " + e.getMessage());
} else {
// we need to introduce a proper method to handle operation failures (at the moment
// this is the only place where we can dispose native memory allocations on failure)
disposeDeferredBlocks();
super.logError(e);
}
}
void disposeDeferredBlocks() {
if (!disposeDeferredBlocks
|| recordStore == null
|| recordStore.getInMemoryFormat() != NATIVE) {
return;
}
recordStore.disposeDeferredBlocks();
}
private boolean canPublishWanEvent(MapContainer mapContainer) {
boolean canPublishWanEvent = mapContainer.isWanReplicationEnabled()
&& !disableWanReplicationEvent();
if (canPublishWanEvent) {
mapContainer.getWanReplicationDelegate().doPrepublicationChecks();
}
return canPublishWanEvent;
}
@Override
public String getServiceName() {
return MapService.SERVICE_NAME;
}
public boolean isPostProcessing(RecordStore recordStore) {
MapDataStore mapDataStore = recordStore.getMapDataStore();
return mapDataStore.isPostProcessingMapStore()
|| !mapContainer.getInterceptorRegistry().getInterceptors().isEmpty();
}
public void setThreadId(long threadId) {
throw new UnsupportedOperationException();
}
public long getThreadId() {
throw new UnsupportedOperationException();
}
protected final void invalidateNearCache(List<Data> keys) {
if (!mapContainer.hasInvalidationListener() || isEmpty(keys)) {
return;
}
Invalidator invalidator = getNearCacheInvalidator();
for (Data key : keys) {
invalidator.invalidateKey(key, name, getCallerUuid());
}
}
// TODO: improve here it's possible that client cannot manage to attach listener
public final void invalidateNearCache(Data key) {
if (!mapContainer.hasInvalidationListener() || key == null) {
return;
}
Invalidator invalidator = getNearCacheInvalidator();
invalidator.invalidateKey(key, name, getCallerUuid());
}
/**
* This method helps to add clearing Near Cache event only from
* one-partition which matches partitionId of the map name.
*/
protected final void invalidateAllKeysInNearCaches() {
if (mapContainer.hasInvalidationListener()) {
int partitionId = getPartitionId();
Invalidator invalidator = getNearCacheInvalidator();
if (partitionId == getNodeEngine().getPartitionService().getPartitionId(name)) {
invalidator.invalidateAllKeys(name, getCallerUuid());
} else {
invalidator.forceIncrementSequence(name, getPartitionId());
}
}
}
private Invalidator getNearCacheInvalidator() {
MapNearCacheManager mapNearCacheManager = mapServiceContext.getMapNearCacheManager();
return mapNearCacheManager.getInvalidator();
}
protected final void evict(Data justAddedKey) {
if (mapContainer.getEvictor() == Evictor.NULL_EVICTOR) {
return;
}
recordStore.evictEntries(justAddedKey);
disposeDeferredBlocks();
}
@Override
public int getFactoryId() {
return MapDataSerializerHook.F_ID;
}
@Override
public ObjectNamespace getServiceNamespace() {
MapContainer container = mapContainer;
if (container == null) {
MapService service = getService();
container = service.getMapServiceContext().getMapContainer(name);
}
return container.getObjectNamespace();
}
// for testing only
public void setMapService(MapService mapService) {
this.mapService = mapService;
}
// for testing only
public void setMapContainer(MapContainer mapContainer) {
this.mapContainer = mapContainer;
}
protected final void publishWanUpdate(Data dataKey, Object value) {
publishWanUpdateInternal(dataKey, value, false);
}
private void publishWanUpdateInternal(Data dataKey, Object value, boolean hasLoadProvenance) {
if (!canPublishWanEvent) {
return;
}
Record<Object> record = recordStore.getRecord(dataKey);
if (record == null) {
return;
}
Data dataValue = toHeapData(mapServiceContext.toData(value));
ExpiryMetadata expiryMetadata = recordStore.getExpirySystem().getExpiryMetadata(dataKey);
WanMapEntryView<Object, Object> entryView = createWanEntryView(
toHeapData(dataKey), dataValue, record, expiryMetadata,
getNodeEngine().getSerializationService());
mapEventPublisher.publishWanUpdate(name, entryView, hasLoadProvenance);
}
protected final void publishLoadAsWanUpdate(Data dataKey, Object value) {
publishWanUpdateInternal(dataKey, value, true);
}
protected final void publishWanRemove(@Nonnull Data dataKey) {
if (!canPublishWanEvent) {
return;
}
mapEventPublisher.publishWanRemove(name, toHeapData(dataKey));
}
protected boolean disableWanReplicationEvent() {
return false;
}
protected final TxnReservedCapacityCounter wbqCapacityCounter() {
return recordStore.getMapDataStore().getTxnReservedCapacityCounter();
}
protected final Data getValueOrPostProcessedValue(Record record, Data dataValue) {
if (!isPostProcessing(recordStore)) {
return dataValue;
}
return mapServiceContext.toData(record.getValue());
}
@Override
public TenantControl getTenantControl() {
return getNodeEngine().getTenantControlService()
.getTenantControl(MapService.SERVICE_NAME, name);
}
@Override
public boolean requiresTenantContext() {
return true;
}
}
- Since the operation is executed asynchronously by a thread, it must implement run(); find run() and step into runInternal(). There are many implementations; pick the one in the map package.
@Override
protected void runInternal() {
Object currentValue = recordStore.get(dataKey, false, getCallerAddress());
if (noCopyReadAllowed(currentValue)) {
// in case of a 'remote' call (e.g a client call) we prevent making
// an on-heap copy of the off-heap data
result = (Data) currentValue;
} else {
// in case of a local call, we do make a copy, so we can safely share
// it with e.g. near cache invalidation
result = mapService.getMapServiceContext().toData(currentValue);
}
}
- This is essentially where the value is fetched from the memory managed by Hazelcast. I won't debug every step; following it down you eventually reach this code:
public V get(Object key) {
int hash = hashOf(key);
return segmentFor(hash).get(key, hash);
}
- Looks familiar, right? Java's HashMap works the same way: hash the key to find its location, then fetch the value. The hashing here is a bit different from HashMap's, though, which comes down to Hazelcast's architecture; if you are interested in the internals, there is plenty of material online. In short, Hazelcast has the concept of partitions (shards): a put hashes the key to a partition, and that is the basis of Hazelcast's distribution.
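You can make this partitioning visible through the PartitionService. A small sketch, reusing hazelcastInstance from the earlier example (Partition is com.hazelcast.partition.Partition in Hazelcast 4+/5):

    // which partition does this key hash to, and which member currently owns that partition?
    Partition partition = hazelcastInstance.getPartitionService().getPartition("key1");
    System.out.println("partitionId = " + partition.getPartitionId() + ", owner = " + partition.getOwner());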
7. Conclusion
This article only covered Hazelcast's most basic usage; for the example shown you could just as well use Redis or a local cache. But once you have more advanced, real-world requirements, Hazelcast's distributed computing features become very useful. The source-code analysis also only followed the local call path; if you are interested, debug into the remote path as well. The essence is the same: a remote call needs to (1) discover nodes, (2) register nodes, and (3) make network calls to other nodes. SeaTunnel's usage is more sophisticated, adding a series of wrappers on top, but in the end it still makes a network call to another member and blocks on a future until the result comes back; and because everything is in memory, it is very fast.
One more thing I almost forgot: I kept talking about the distributed features, yet so far we only used get and put as a plain cache. Here is a brief look at the distributed execution API.
// Tasks submitted to an IExecutorService are serialized and may run on any member,
// so they must be Serializable; capturing hazelcastInstance in the lambda would break that.
IExecutorService executorService = hazelcastInstance.getExecutorService("myExecutor");
Runnable task = (Runnable & Serializable) () -> {
    // the task's logic goes here
    System.out.println("Executing task on a cluster member");
};
Future<?> future = executorService.submit(task);
future.get(); // wait for the task to finish
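To actually query data on every node and aggregate it, you would typically submit a Callable to all members and combine the partial results on the caller side. A minimal sketch under the same assumptions (LocalEntryCountTask is a made-up task name; tasks must be Serializable, and HazelcastInstanceAware gives them access to the member they run on):

import com.hazelcast.cluster.Member;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

// Made-up example task: each member reports how many entries of "my-map" it owns locally.
public class LocalEntryCountTask implements Callable<Integer>, Serializable, HazelcastInstanceAware {

    private transient HazelcastInstance hazelcastInstance;

    @Override
    public void setHazelcastInstance(HazelcastInstance instance) {
        this.hazelcastInstance = instance;
    }

    @Override
    public Integer call() {
        // localKeySet() only returns keys owned by this member's partitions
        return hazelcastInstance.getMap("my-map").localKeySet().size();
    }

    // Caller side: fan out to every member, then aggregate the partial results.
    public static int countAcrossCluster(HazelcastInstance hazelcastInstance) throws Exception {
        IExecutorService executor = hazelcastInstance.getExecutorService("myExecutor");
        Map<Member, Future<Integer>> results = executor.submitToAllMembers(new LocalEntryCountTask());
        int total = 0;
        for (Future<Integer> future : results.values()) {
            total += future.get(); // blocks until that member's result arrives
        }
        return total;
    }
}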
This lets you run work on the distributed nodes and aggregate the results back, a bit like MapReduce. Indeed, Hazelcast has also offered MapReduce-style computation for complex processing (in newer versions this role is taken over by the aggregation and Jet APIs); if you want to know more, it is worth looking up.