[华为北向网管NCE开发教程(6)消息订阅

1.作用

之前介绍的都是我们向网管NCE发起请求获取数据,消息订阅则反过来,是网管NCE系统给我们推送信息。其原理和MQ,JMS这些差不多,这里不过多累述。

2.场景

所支持订阅的场景有如下,以告警通知为例,当我们订阅告警通知以后,如果NCE网管有告警通知产生以后,就会给订阅的人发送一个通知(也就是实时告警推送)。那么我们就可以接收到如下的通知。

2024-06-06 00:09:30c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160140.0Z, X.733::EventType=securityAlarm, emsTime=20240605160142.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784751, isClearable=true, affectedTPList=21}
2024-06-06 00:09:36c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160147.0Z, X.733::EventType=securityAlarm, emsTime=20240605160149.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784752, isClearable=true, affectedTPList=21}
2024-06-06 00:09:43c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160155.0Z, X.733::EventType=securityAlarm, emsTime=20240605160156.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784753, isClearable=true, affectedTPList=21}
2024-06-06 00:09:50c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160202.0Z, X.733::EventType=securityAlarm, emsTime=20240605160203.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784755, isClearable=true, affectedTPList=21}
2024-06-06 00:10:01c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160213.0Z, X.733::EventType=securityAlarm, emsTime=20240605160214.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784756, isClearable=true, affectedTPList=21}

同理,如果我们订阅了文件传输状态通知,当存在文件传输完成的时候会收到如下通知,通知信息中包含了,文件传输完成后,文件的存储地址。

2024-06-06 10:15:26c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786334, fileName=pm/sdh/0605-0606/3145740.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:39c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786335, fileName=pm/sdh/0605-0606/3145734.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:42c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786336, fileName=pm/sdh/0605-0606/3145739.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
通知类型说明
NT_ALARM告警通知
NT_ALARM_UPDATED告警更新通知
NT_TCA性能越限告警通知
NT_OBJECT_CREATION对象创建通知
NT_OBJECT_DELETION对象删除通知
NT_ATTRIBUTE_VALUE_CHANGE属性改变通知
NT_STATE_CHANGE状态改变通知
NT_ROUTE_CHANGE路由改变通知
NT_PROTECTION_SWITCH保护倒换通知
NT_FILE_TRANSFER_STATUS文件传输状态通知
NT_EPROTECTION_SWITCH设备保护倒换通知事件
NT_ASON_RESOURCE_CHANGE智能资源改变通知
NT_PRBSTEST_STATUS伪随机码测试状态通知
NT_WDMPROTECTION_SWITCH波分保护倒换通知
NT_ATMPROTECTION_SWITCH ATM保护倒换通知
NT_RPRPROTECTION_SWITCH RPR保护组倒换通知事件格式
NT_IPPROTECTION_SWITCH Tunnel保护组倒换通知事件格式

3.如何开订阅(SpringBoot为例)

3.1登录NCE

3.1.1CorbaLoginReq

配置文件的登录参数如下

huawei: 
  nce: 
    login: 
      corba:
        host: 127.0.0.1
        port: 12001
        userName: 111111
        passWord: 111111

配置文件参数注入Spring Bean

import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;

import lombok.Data;

@Data
@SpringBootConfiguration
@ConfigurationProperties(prefix = "huawei.nce.login.corba")
public class CorbaLoginReq {
	
	private String host;
	
	private String port;
	
	private String userName;
	
	private String passWord;
}

3.1.2CorbaLoginRes

登录返回参数

import org.omg.DynamicAny.DynAnyFactory;

import lombok.Data;
import mtnm.tmforum.org.emsSession.EmsSession_I;

@Data
public class CorbaLoginRes {
	private org.omg.CORBA.ORB orb;
	private org.omg.PortableServer.POA rootPOA ;
	private EmsSession_I emsSession;
	private DynAnyFactory dynAnyFactory;
}

3.1.3TANmsSession_IImpl

import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.session.Session_I;
/**
 * NmsSession_IPOA for EMS(NCE) invoking. 
 * @author
 *
 */
public class TANmsSession_IImpl extends NmsSession_IPOA {
	public void eventLossCleared(String endTime) {
		log("TANmsSession_IImpl.eventLossCleared(String endTime) is invoked by EMS(NCE).");
		log("endTime:"+endTime);
	}
	public void eventLossOccurred(String startTime, String notificationId) {
		log("TANmsSession_IImpl.eventLossOccurred(String startTime, String notificationId) is invoked by EMS.");
		log("startTime:"+startTime+", notificationId:"+notificationId);
	}
	public Session_I associatedSession() {
		log("TANmsSession_IImpl.associatedSession() is invoked by EMS(NCE).");
		return null;
	}
	public void endSession() {
		log("TANmsSession_IImpl.endSession() is invoked by EMS(NCE).");
	}
	public void ping() {
		log("TANmsSession_IImpl.ping() is invoked by EMS(NCE).");
	}
	private static void log(String str){
		System.out.println(str);
	}
}

3.1.4BaseCorbaService

public interface BaseCorbaService {

	/**
	 * @description:登录华为nce-corba
	 * @author:hutao
	 * @mail:hutao1@epri.sgcc.com.cn
	 * @date:2024年3月1日 下午4:19:59
	 */
	CorbaLoginRes login();
	
	/**
	 * @description:清空登录
	 * @author:hutao
	 * @mail:hutao1@epri.sgcc.com.cn
	 * @date:2024年6月7日 下午3:24:02
	 */
	void clearLogin();
}
import java.util.Arrays;
import java.util.List;

import org.omg.CosNaming.NameComponent;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynAnyFactoryHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.collect.sdh.module.corba.entity.CorbaLoginReq;
import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.entity.TANmsSession_IImpl;
import com.collect.sdh.module.corba.service.BaseCorbaService;

import mtnm.tmforum.org.common.Common_IHolder;
import mtnm.tmforum.org.emsMgr.EMSMgr_I;
import mtnm.tmforum.org.emsMgr.EMSMgr_IHelper;
import mtnm.tmforum.org.emsSession.EmsSession_I;
import mtnm.tmforum.org.emsSession.EmsSession_IHolder;
import mtnm.tmforum.org.emsSession.EmsSession_IPackage.managerNames_THolder;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_I;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_IHelper;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_I;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_IHelper;
import mtnm.tmforum.org.equipment.EquipmentOrHolderIterator_IHolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolderList_THolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolder_T;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfoList_THolder;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfo_T;
import mtnm.tmforum.org.equipment.PhysicalLocationInfoList_THolder;
import mtnm.tmforum.org.equipment.PhysicalLocationInfo_T;
import mtnm.tmforum.org.globaldefs.NameAndStringValue_T;
import mtnm.tmforum.org.globaldefs.NamingAttributesIterator_IHolder;
import mtnm.tmforum.org.globaldefs.NamingAttributesList_THolder;
import mtnm.tmforum.org.globaldefs.ProcessingFailureException;
import mtnm.tmforum.org.managedElement.ManagedElementIterator_IHolder;
import mtnm.tmforum.org.managedElement.ManagedElementList_THolder;
import mtnm.tmforum.org.managedElement.ManagedElement_T;
import mtnm.tmforum.org.managedElement.ManagedElement_THolder;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_I;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_I;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetwork_T;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkIterator_IHolder;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkList_THolder;
import mtnm.tmforum.org.nmsSession.NmsSession_I;
import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.subnetworkConnection.CCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnectList_THolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnect_T;
import mtnm.tmforum.org.subnetworkConnection.Route_THolder;
import mtnm.tmforum.org.subnetworkConnection.SNCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnectionList_THolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_T;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointIterator_IHolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointList_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPoint_T;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkIterator_IHolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkList_THolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLink_T;

@Service
public class BaseCorbaServiceImpl implements BaseCorbaService {

	@Autowired
	private CorbaLoginReq loginReq;
	
	private CorbaLoginRes login;
	
	/**
	 * @description:清空登录
	 * @author:hutao
	 * @mail:hutao1@epri.sgcc.com.cn
	 * @date:2024年6月7日 下午3:24:02
	 */
	@Override
	public void clearLogin() {
		login = null;
	}
	
	/**
	 * @description:登录华为nce-corba
	 * @author:hutao
	 * @mail:hutao1@epri.sgcc.com.cn
	 * @date:2024年3月1日 下午4:19:59
	 */
	@Override
	public CorbaLoginRes login() {
		if(login != null) {
			/*本应该检测登录是否可用,如果可用,则返回登录信息,不可用则重新登录,(不知道是否可以使用emsSession.ping()来判断)
			  但是没找到华为有这个接口,因此如果出现不可抗力因素导致登录无效,例如网络中断
			  则通过com.collect.sdh.module.test.TestCorbaController.cleanLogin()清空登录
			*/	
			return login;
		}
		try {
			login = new CorbaLoginRes();
			String[] argv = new String[2];
			argv[0] = "-ORBInitRef";
			argv[1] = "NameService=corbaloc::" + loginReq.getHost() + ":" + loginReq.getPort() + "/NameService";
			org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, null);
			org.omg.PortableServer.POA rootPOA = org.omg.PortableServer.POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
			rootPOA.the_POAManager().activate();
			DynAnyFactory dynAnyFactory = DynAnyFactoryHelper.narrow(orb.resolve_initial_references("DynAnyFactory"));
			org.omg.CosNaming.NamingContextExt nc = org.omg.CosNaming.NamingContextExtHelper.narrow(orb.resolve_initial_references("NameService"));
			org.omg.CosNaming.NameComponent[] name;
			name = new NameComponent[5];
			name[0] = new NameComponent("TMF_MTNM", "Class");
			name[1] = new NameComponent("HUAWEI", "Vendor");
			name[2] = new NameComponent("Huawei/NCE", "EmsInstance");
			name[3] = new NameComponent("2.0", "Version");
			name[4] = new NameComponent("Huawei/NCE", "EmsSessionFactory_I");
			EmsSessionFactory_I emsSessionFactory = EmsSessionFactory_IHelper.narrow(nc.resolve(name));
			NmsSession_IPOA pNmsSessionServant = new TANmsSession_IImpl();
			NmsSession_I nmsSession = pNmsSessionServant._this(orb);
			EmsSession_IHolder emsSessionInterfaceHolder = new EmsSession_IHolder();
			emsSessionFactory.getEmsSession(loginReq.getUserName(), loginReq.getPassWord(), nmsSession, emsSessionInterfaceHolder);
			EmsSession_I emsSession = emsSessionInterfaceHolder.value;
			login.setDynAnyFactory(dynAnyFactory);
			login.setOrb(orb);
			login.setRootPOA(rootPOA);
			login.setEmsSession(emsSession);
			return login;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
}

3.2定制通知

3.2.1ConsumerNotice

需要实现接口:org.omg.CosNotifyComm.StructuredPushConsumerPOA

import java.util.HashMap;
import java.util.Map;

import org.omg.CosEventComm.Disconnected;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPushConsumerPOA;
import org.springframework.util.ObjectUtils;

import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.AnyUtil;

import lombok.extern.log4j.Log4j2;

/**
 * @description:消费通知
 * @author:hutao
 * @mail:hutao1@epri.sgcc.com.cn
 * @date:2024年5月7日 上午10:57:26
 */
@Log4j2
public class ConsumerNotice extends StructuredPushConsumerPOA{

	private CorbaLoginRes loginRes;
	
	public ConsumerNotice(CorbaLoginRes loginRes) {
		super();
		this.loginRes = loginRes;
	}

	private static Map<String, String> noticeTypes = new HashMap<>();
	
	static {
		noticeTypes.put("NT_ALARM", "告警通知");
		noticeTypes.put("NT_ALARM_UPDATED", "告警更新通知");
		noticeTypes.put("NT_TCA", "性能越限告警通知");
		noticeTypes.put("NT_OBJECT_CREATION", "对象创建通知");
		noticeTypes.put("NT_OBJECT_DELETION", "对象删除通知");
		noticeTypes.put("NT_ATTRIBUTE_VALUE_CHANGE", "属性改变通知");
		noticeTypes.put("NT_STATE_CHANGE", "状态改变通知");
		noticeTypes.put("NT_ROUTE_CHANGE", "路由改变通知");
		noticeTypes.put("NT_PROTECTION_SWITCH", "保护倒换通知");
		noticeTypes.put("NT_FILE_TRANSFER_STATUS", "文件传输状态通知");
		noticeTypes.put("NT_EPROTECTION_SWITCH", "设备保护倒换通知事件");
		noticeTypes.put("NT_ASON_RESOURCE_CHANGE", "智能资源改变通知");
		noticeTypes.put("NT_PRBSTEST_STATUS", "伪随机码测试状态通知");
		noticeTypes.put("NT_WDMPROTECTION_SWITCH", "波分保护倒换通知");
		noticeTypes.put("NT_ATMPROTECTION_SWITCH", "ATM保护倒换通知");
		noticeTypes.put("NT_RPRPROTECTION_SWITCH", "RPR保护组倒换通知事件格式");
		noticeTypes.put("NT_IPPROTECTION_SWITCH", "Tunnel保护组倒换通知事件格式");
	}
	
	@Override
	public void disconnect_structured_push_consumer() {
		log.info("Consumer disconnect_structured_push_consumer");
	}

	@Override
	public void push_structured_event(StructuredEvent event) throws Disconnected {
		String eventType = event.header.fixed_header.event_type.type_name;
		Map<String, Object> eventData = new HashMap<>(event.filterable_data.length);
		for (int i = 0; i < event.filterable_data.length; i++) {
			if (!ObjectUtils.isEmpty(event.filterable_data[i])) {
				eventData.put(event.filterable_data[i].name, AnyUtil.parseAny( event.filterable_data[i].value, loginRes.getDynAnyFactory()));
			}
		}
		log.info("收到事件通知:{}<{}>,通知参数:{}",eventType, noticeTypes.get(eventType), eventData);
	}

	@Override
	public void offer_change(EventType[] arg0, EventType[] arg1) throws InvalidEventType {
		
	}

}

3.2.2AnyUtil

用于解析返回的信息。

import org.omg.CORBA.Any;
import org.omg.CORBA.TCKind;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynArray;
import org.omg.DynamicAny.DynEnum;
import org.omg.DynamicAny.DynSequence;
import org.omg.DynamicAny.DynStruct;
import org.omg.DynamicAny.DynUnion;

/**
 * @description:org.omg.DynamicAny格式化工具
 * @author:hutao
 * @mail:hutao1@epri.sgcc.com.cn
 * @date:2024年5月7日 上午11:33:17
 */
public class AnyUtil {
	
	/**
	 * @description:格式化数据
	 * @author:hutao
	 * @mail:hutao1@epri.sgcc.com.cn
	 * @date:2024年5月7日 上午11:34:17
	 */
    public static String parseAny(Any any, DynAnyFactory factory){
		if( null==any ){
			return null;
		}
		StringBuilder result = new StringBuilder();
		try {
			switch (any.type().kind().value()) {
			case TCKind._tk_char:
				result.append(any.extract_char());break;
			case TCKind._tk_null:
				break;
			case TCKind._tk_boolean:
				result.append(any.extract_boolean());
				break;
			case TCKind._tk_short:
				result.append(any.extract_short());
				break;
			case TCKind._tk_long:
				result.append(any.extract_long());
				break;
			case TCKind._tk_double:
				result.append(any.extract_double());
				break;
			case TCKind._tk_float:
				result.append(any.extract_float());
				break;
			case TCKind._tk_octet:
				result.append(any.extract_octet());
				break;
			case TCKind._tk_ulong:
				result.append(any.extract_ulong());
				break;
			case TCKind._tk_string:
				result.append(any.extract_string());
				break;
			case TCKind._tk_enum:
			{
				DynEnum dynEnum = (DynEnum) factory.create_dyn_any(any);
				result.append(dynEnum.get_as_string());
				break;
			}
			case TCKind._tk_any:
			{
				any=factory.create_dyn_any(any).get_any();
				result.append(any);
				break;
			}
			case TCKind._tk_objref:
			{
				result.append(any.extract_Object());
				break;
			}
			case TCKind._tk_struct:
			case TCKind._tk_except:
			{
				DynStruct dynstruct = (DynStruct) factory.create_dyn_any(any);
				org.omg.DynamicAny.NameValuePair[] members = dynstruct.get_members();
				result.append("{");
				for (int i = 0; i < members.length; i++) {
					if(i>0){
						result.append(" ");
					}
					result.append(members[i].id).append(" ").append(parseAny(members[i].value, factory));
				}
				result.append("}");
				break;
			}
			case TCKind._tk_union:
				DynUnion dynunion = (DynUnion) factory.create_dyn_any(any);
				result.append(dynunion.member_name()).append(" ");
				result.append(parseAny(dynunion.member().to_any(), factory));
				break;
			case TCKind._tk_sequence:
				DynSequence dynseq = (DynSequence) factory.create_dyn_any(any);
				Any[] contents = dynseq.get_elements();
				result.append("{");
				for (int i = 0; i < contents.length; i++){
					result.append(parseAny(contents[i], factory));
				}
				result.append("}");
				break;
			case TCKind._tk_array:
				DynArray dynarray = (DynArray) factory.create_dyn_any(any);
				Any[] arrayContents = dynarray.get_elements();
				result.append("{");
				for (int i = 0; i < arrayContents.length; i++){
					result.append(parseAny(arrayContents[i], factory)).append("");
				}
				result.append("}");
				break;
			default:
				result.append(any.type().kind().value());
			}
		} catch (Exception ex) {
			ex.printStackTrace();
		}
		return new String(result.toString().getBytes(StandardCharsets.ISO_8859_1));
	}
}

3.3订阅通知

SubscribeNotice 实现 Runnable,即订阅的时候,另起一个线程来订阅。该线程负责订阅。

3.3.1SubscribeNotice

import org.omg.CORBA.IntHolder;
import org.omg.CORBA.Object;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannelHolder;
import org.omg.CosNotifyChannelAdmin.ProxySupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
import org.omg.CosNotifyComm.StructuredPushConsumerHelper;
import org.springframework.util.ObjectUtils;

import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.JsonUtils;

import lombok.extern.log4j.Log4j2;

/**
 * @description:订阅消费通知
 * @author:hutao
 * @mail:hutao1@epri.sgcc.com.cn
 * @date:2024年5月7日 上午9:33:18
 */
@Log4j2
public class SubscribeNotice implements Runnable{

	/**
	 * 登录corba成功后的参数
	 */
	private CorbaLoginRes loginRes;
	
	/**
	 * 记录订阅通知的通道ID的存储文件地址
	 */
	private String poxyIdPath;
	
	public SubscribeNotice(CorbaLoginRes loginRes, String poxyIdPath) {
		super();
		this.loginRes = loginRes;
		this.poxyIdPath = poxyIdPath;
	}

	@Override
	public void run() {
		try {
			//获取通道
			IntHolder poxyId = new IntHolder();
			poxyId.value = getPoxyId(poxyIdPath);
			EventChannelHolder eventChannel = new EventChannelHolder();
			loginRes.getEmsSession().getEventChannel(eventChannel);
			//ConsumerNotice extends StructuredPushConsumerPOA 为消费者
			ConsumerNotice consumerNotice = new ConsumerNotice(loginRes);
			ConsumerAdmin defaultConsumerAdmin = eventChannel.value.default_consumer_admin();
			//连接通道,如果发现通道已经打开,则先关闭之前的通道(已经打开的通道即使不可以,北向接口并未释放该接口的资源,但是会限制连接通道(数量 < 3))
			try {
				if (poxyId.value > 0){
					log.info("释放旧的消费通道:{}", poxyId.value);
					ProxySupplier oldSupplier = defaultConsumerAdmin.get_proxy_supplier(poxyId.value);
					assert (oldSupplier != null);
					StructuredProxyPushSupplier myOldPoxy = StructuredProxyPushSupplierHelper.narrow(oldSupplier);
					myOldPoxy.disconnect_structured_push_supplier();
				}
			}catch (Exception e) {
				e.printStackTrace();
			}
			ProxySupplier tmpSupplier = defaultConsumerAdmin.obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, poxyId);
			StructuredProxyPushSupplier proxyPushSupplier = StructuredProxyPushSupplierHelper.narrow(tmpSupplier);
			Object servant = loginRes.getRootPOA().servant_to_reference(consumerNotice);
			proxyPushSupplier.connect_structured_push_consumer(StructuredPushConsumerHelper.narrow(servant));
			savePoxyId(poxyIdPath, poxyId.value);
			log.info("保存此次的消费通道:{}", poxyId.value);
			loginRes.getOrb().run();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * @description:获取已经连接的消费通道ID
	 * @author:hutao
	 * @mail:hutao1@epri.sgcc.com.cn
	 * @date:2024年5月7日 上午10:40:57
	 */
	public int getPoxyId(String path) {
		int poxyId = -1;
		//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库
		String str = JsonUtils.readStringFromSystemPath(path);
		if(!ObjectUtils.isEmpty(str)) {
			poxyId = Integer.parseInt(str);
		}
		return poxyId;
	}
	
	/**
	 * @description:保存已经连接的消费通道ID
	 * @author:hutao
	 * @mail:hutao1@epri.sgcc.com.cn
	 * @date:2024年5月7日 上午10:41:33
	 */
	public void savePoxyId(String path, int poxyId) {
			//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库
		JsonUtils.writeStringToSystemPath(path, String.valueOf(poxyId));
	}
}

3.3.2JsonUtils

为了保证代码完整性,如果你完全抄上面的代码,这里提供了代码需要的两个文件操作示例


 public static String readStringFromSystemPath(String path) {
    	String data = "";
    	try {
    		InputStream inputStream = new FileInputStream(path);
    		byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);
    		data = new String(bdata, StandardCharsets.UTF_8);
		} catch (FileNotFoundException e) {
			log.info("文件不存在,文件地址:{}", path);
		} catch (Exception e) {
			log.info("读取文件失败,文件地址:{},失败原因:{}", path,e.getMessage());
		} 
		return data;
    }

 public static void writeStringToSystemPath(String filePath, String str) {
		Writer write = null;
		try {
			File file = new File(filePath);
			if(file.exists()) {
				file.delete();
			}
			if (!file.getParentFile().exists()) {
				file.getParentFile().mkdirs();
			}
			if(file.createNewFile()) {
				write = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8);
				write.write(str);
				write.flush();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(write !=null ) {
				try {
					write.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

3.4启动订阅

这里我们使用SpringBoot启动的时候启动订阅,即实现ApplicationRunner,然后使用线程池的单线程来启动上面我们编写的线程。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.service.BaseCorbaService;

/**
 * @description:启动订阅corba的消费
 * @author:hutao
 * @mail:hutao1@epri.sgcc.com.cn
 * @date:2024年5月7日 下午4:18:16
 */
@Component
public class SubscribeRunner implements ApplicationRunner  {
    
	@Value(value = "${file-save-path}")
	private String poxyIdPath;
	
	@Autowired
	private BaseCorbaService baseCorbaService;

	@Override
	public void run(ApplicationArguments args) throws Exception {
		poxyIdPath = poxyIdPath + "poxyId";
		CorbaLoginRes login = baseCorbaService.login();
		ExecutorService executor = Executors.newSingleThreadExecutor();
		executor.submit(new SubscribeNotice(login, poxyIdPath));
	}
}

4.效果展示

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/716501.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

建筑工程软件Revit中复杂大模型如何实现Web端轻量化?| HOOPS技术应用

建筑信息模型&#xff08; BIM&#xff09;技术在建筑工程中扮演着越来越重要的角色&#xff0c;而Autodesk Revit作为主流的BIM软件&#xff0c;被广泛应用于设计、施工和管理。然而&#xff0c;Revit生成的复杂大模型常常由于数据量庞大而难以直接在Web端展示和操作。这时&am…

linux日志管理之journalctl命令

一、日志查询 1.输出所有日志或按相关要求输出 输出所有日志 #journalctl查看实时日志 #journalctl -f查看最后n行 #journalctl -n 10不分页显示 #journalctl --no-pager适合阅读模式 #journalctl -p 3 -o json-pretty 查看内核日志 #journalctl -k 2.按服务查询 #journal…

植物大战僵尸杂交版最新pvzHE_v2.1.0含游戏窗口放大工具

植物大战僵尸杂交版是由B站”潜艇伟伟迷”UP主制作的一款同人策略塔防游戏&#xff0c;也叫pvzHE&#xff0c;该游戏由《植物大战僵尸》原版魔改而来&#xff0c;引入了创新的杂交合成系统&#xff0c;让玩家可以将不同植物进行杂交&#xff0c;创造出具有全新能力和外观的植物…

【低级错误笔记】debug的步入按钮为灰色

访问login方法没有加RestController 难怪点击登录的时候总是显示404资源错误 看到闪过一秒的无法访问/employee/login路径才发现这个思路……………………………………………………………………………………………………………………………………太无语了 拖拖拉拉从6.3到今天…

Digital Video Repair3.7.1.0 --一款免费的视频文件修复工具,供大家学习研究参考

下载地址&#xff1a; https://download.csdn.net/download/weixin_43097956/89431959

领夹麦克风哪个品牌音质最好?轻揭无线麦克风哪个品牌性价比高!

​随着短视频热潮的兴起&#xff0c;越来越多的人倾向于用vlog记录日常生活&#xff0c;同时借助短视频和直播平台开辟了副业。在这一过程中&#xff0c;麦克风在近两年内迅速发展&#xff0c;从最初的简单收音功能演变为拥有多样款式和功能&#xff0c;以满足视频创作的需求。…

用 微 / 积分思想妙解关于等比数列的和

同理&#xff0c;也是微积分思想&#xff1a; 求 (\sum_{k1}^n q^k) 的和&#xff1a; 我们知道几何级数的求和公式&#xff1a; ∑ k 0 n q k 1 − q n 1 1 − q (对于 q ≠ 1 ) \sum_{k0}^n q^k \frac{1-q^{n1}}{1-q} \quad \text{(对于 } q \neq 1\text{)} k0∑n​qk…

算法02 递归算法及其相关问题【C++实现】

递归 在编程中&#xff0c;我们把函数直接或者间接调用自身的过程叫做递归。 递归处理问题的过程是&#xff1a;通常把一个大型的复杂问题&#xff0c;转变成一个与原问题类似的&#xff0c;规模更小的问题来进行求解。 递归的三大要素 函数的参数。在用递归解决问题时&…

如何了解基金的估值

一、优秀的估值产品 钉大在《定投十年 财务自由》和《指数基金投资指南》中不止一次提到过要「结合估值来投资」&#xff0c;为此&#xff0c;他每个交易日他的公众号「银行螺丝钉」中都会发布他编制的基金估值表&#xff0c;最新的一期已经是第2281期了。 这是钉大昨天&#x…

一文快速认识环形光源——CCS光源

机器视觉系统中&#xff0c;光源起着重要作用&#xff0c;不同类型的光源应用也不同&#xff0c;选择合适的光源成像效果非常明显。今天我们一起来看看CCS光源——工业用环形光源LDR2系列。 LDR2系列是标准的环形光源&#xff0c;通过采用柔性基板&#xff0c;可创造任意角度。…

CPRI协议的理解——CPRI中的扰码

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 CPRI协议的理解——CPRI中的扰码 前言8B10B线路编码下的扰码发送端接收 64B66B线路编码下的扰码带有终止控制字符的控制块格式带有起始控制字符的控制块格式数据块格式 前言 …

9种编程语言的对比分析

在当今的软件开发领域&#xff0c;编程语言扮演着至关重要的角色。不同的编程语言各有其特点和适用场景&#xff0c;选择合适的编程语言能够提高开发效率和软件质量。本文将对十种常见的编程语言进行对比分析&#xff0c;帮助读者了解它们的优缺点和适用场景。 Java 特点&…

中小企业使用CRM系统的优势有哪些

中小企业如何在竞争激烈的市场中脱颖而出&#xff1f;除了优秀的产品和服务&#xff0c;一个高效的管理工具也是必不可少的。而客户关系管理&#xff08;CRM&#xff09;系统正是这样一个能帮助企业提升客户体验、优化内部管理流程的重要工具。接下来&#xff0c;让我们一起探讨…

freemarker 使用

首次使用freemarker遇到的全是坑,还好,各种问题,最终都解决了。芹菜加油 import com.lowagie.text.pdf.BaseFont; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.xhtmlrenderer.pdf.ITextRenderer;import java.io.Byte…

喜讯 | 全视通获得珠海市第七届“市长杯”工业设计大赛三等奖

近日&#xff0c;在珠海市举行的第七届“市长杯”工业设计大赛颁奖典礼上&#xff0c;珠海全视通信息技术有限公司&#xff08;以下简称“全视通”&#xff09;凭借创新的“医护对讲一体终端机”产品&#xff0c;历经激烈的竞争和严格的评选流程&#xff0c;包括大赛宣传发动、…

python-docx-template 的 Replace docx pictures 占位图片名称从哪来?

python-docx-template 的 Replace docx pictures 占位图片名称从哪来&#xff1f; 在 Word 中看占位图片名称用代码输出输出结果找对应图片 使用 replace_pic参考资料 在 Word 中看占位图片名称 右键图片 》查看可选文字 用代码输出 from docxtpl import DocxTemplate# 初始化…

二刷算法训练营Day30 | 回溯算法(6/6)

目录 详细布置&#xff1a; 1. 回溯总结 2. 332. 重新安排行程 3. 51. N 皇后 4. 37. 解数独 详细布置&#xff1a; 1. 回溯总结 回溯是递归的副产品&#xff0c;只要有递归就会有回溯&#xff0c;所以回溯法也经常和二叉树遍历&#xff0c;深度优先搜索混在一起&#x…

借助浏览器实现一个录屏插件?

说在前面 &#x1f388;不知道大家平时都是使用什么录屏软件呢&#xff1f;有没有想过只用JavaScript我们也可以快速实现一个录屏插件&#xff1f; 准备工作 开始写代码前我们需要先了解一下以下几点&#xff1a; 1、getDisplayMedia navigator.mediaDevices.getDisplayMedi…

【C++】AVL树/红黑树实现及map与set的封装

前言 【C】二叉树进阶&#xff08;二叉搜索树&#xff09; 这篇文章讲述了关于二叉搜索树知识&#xff0c;但是二叉搜索树有其自身的缺陷&#xff0c;假如往树中插入的元素有序或者接近有序&#xff0c;二叉搜索树就会退化成单支树&#xff0c;时间复杂度会退化成O(N)&#xff…

充电学习——0、电源管理

一、设备电源管理&#xff1a; 两种类型 1、系统睡眠模型&#xff1a; 设备驱动作为系统一部分&#xff0c;会跟随系统进入低功耗状态&#xff0c;suspend &#xff08;suspend-to-ram&#xff09; 一些驱动程序可以管理硬件的唤醒事件&#xff0c; 这一特性通过/sys/device/…