JRT实现缓存协议

上一篇介绍的借助ORM的增、删、改和DolerGet方法,ORM可以很精准的知道热点数据做内存缓存。那么就有一个问题存在,即部署了多个站点时候,如果用户在一个Web里修改数据了,那么其他Web的ORM是不知道这个变化的,其他Web还是缓存的老数据的话就会造成其他Web命中的缓存数据是老的,造成不可靠问题。

那么就需要一种机制来解决多Web缓存不一致问题,参照ECP实现,把Web分为主服务器和从服务器。主服务器启动TCP服务端,从服务器启动TCP客户端连接主服务器,从服务器和主服务器之间一直保留TCP长连接用来通知缓存变化数据。这样在一个服务器增、删、改数据后就能及时通知其他服务器更新缓存,这样所有服务器的缓存数据都是可信赖的。

首先提取发送数据的对象类型
在这里插入图片描述

然后实现ECP管理类ECPManager来管理启动服务端和客户端

package JRT.DAL.ORM.Global;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 企业缓存协议,基于TCP在多台服务器直接共享缓存的更新,从而确保每个Web下的缓存数据的可靠性,这里实现TCP服务端当做小机,其他服务当做客户端和小机连接。
 * 小机会分发增加数据(增加数据分发与否倒是不影响缓存可靠性,主要是修改和删除)、修改数据、删除数据的执行记录给每个客户端,客户端按收到的记录把数据推入缓存
 */
public class ECPManager {
    /**
     * 要推入TCP的缓存队列
     */
    public static ConcurrentLinkedDeque ECPQuen = new ConcurrentLinkedDeque();

    /**
     * 主处理进程
     */
    private static Thread MainThread = null;

    /**
     * 发送数据的操作对象
     */
    public static Socket Sender = null;

    /**
     * 写操作对象
     */
    public static PrintWriter Writer = null;

    /**
     * 数据编码
     */
    private static String Encode="GBK";

    /**
     * 缓存所有客户端
     */
    private static ConcurrentHashMap<String,Socket> AllClient= new ConcurrentHashMap<>();

    /**
     * IP地址
     */
    private static String IP="";

    /**
     * 端口
     */
    private static  int Port=1991;

    /**
     * 是否启用了ECP
     */
    private static boolean UseEcp=false;

    /**
     * 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可
     *
     * @param obj
     * @throws Exception
     */
    public static void InECPToQuen(ECPDto obj) throws Exception{
        ECPQuen.add(obj);
    }

    /**
     * push目前的ECP缓存数据到远程服务器,供GlobalManager定时器定时推送
     */
    public static void TryPushEcp() throws Exception
    {
        try {
            //如果是客户端,先检查连接
            if (!IP.isEmpty()) {
                //重连
                if (Sender == null || Sender.isClosed()) {
                    LogUtils.WriteSecurityLog("ECP尝试重连:" + IP + ":" + Port);
                    StartEcpManager(IP, Port);
                    Thread.sleep(1000);
                }
                if (Sender == null || Sender.isClosed()) {
                    return;
                } else {
                    LogUtils.WriteSecurityLog("ECP尝试重连成功:" + IP + ":" + Port);
                }
            }
            List<ECPDto> pushList=null;
            //从缓存队列里面弹出数据push
            while (ECPQuen.size() > 0) {
                ECPDto obj = (ECPDto)ECPQuen.pop();
                if (obj != null) {
                    //启用了ECP就push到服务端,否则就只更新自己缓存
                    if(UseEcp==true)
                    {
                        //初始化push列表
                        if(pushList==null)
                        {
                            pushList=new ArrayList<>();
                        }
                        pushList.add(obj);
                    }
                    else
                    {
                        //转换成数据实体推入缓存
                        JRT.DAL.ORM.Global.GlobalManager.InCache(obj);
                    }
                }
            }
            //直接列表一次推送,没有启用ECP的话这个列表一直是空
            if(pushList!=null)
            {
                //客户端推送
                if (!IP.isEmpty()) {
                    Writer.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));
                    Writer.flush();
                }
                //服务端推送
                else {
                    //给每个连接的客户端推送信息
                    for (String ip : AllClient.keySet()) {
                        Socket oneClient = AllClient.get(ip);
                        //移除关闭的客户端
                        if (oneClient.isClosed()) {
                            AllClient.remove(ip);
                        }
                        PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);
                        oneWriter.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));
                        oneWriter.flush();
                    }
                }
            }
        }
        catch (Exception ex)
        {
            LogUtils.WriteExceptionLog("推送数据到ECP异常", ex);
        }
    }

    /**
     * 启动ECP管理
     * @param ip
     * @param port
     */
    public static void StartEcpManager(String ip, int port)
    {
        IP=ip;
        Port=port;
        //当客户端
        if (!ip.isEmpty()) {
            MainThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //得到输入流
                    InputStream inputStream = null;
                    //创建Socket对象并连接到服务端
                    Socket socket = null;
                    try {
                        //创建Socket对象并连接到服务端
                        socket = new Socket(ip, port);
                        Sender = socket;
                        Writer = new PrintWriter(new OutputStreamWriter(Sender.getOutputStream(), Encode), false);
                        //得到输入流
                        inputStream = socket.getInputStream();
                        //IO读取
                        byte[] buf = new byte[10240];
                        int readlen = 0;
                        //阻塞读取数据
                        while ((readlen = inputStream.read(buf)) != -1) {
                            try {
                                String ecpJson = new String(buf, 0, readlen, Encode);
                                //得到ECP实体
                                List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);
                                if(dtoList!=null&&dtoList.size()>0)
                                {
                                    for(ECPDto dto:dtoList) {
                                        //转换成数据实体推入缓存
                                        JRT.DAL.ORM.Global.GlobalManager.InCache(dto);
                                    }
                                }
                            }
                            catch (Exception ee)
                            {
                                LogUtils.WriteExceptionLog("ECP处理主服务器数据异常", ee);
                            }
                        }
                    }
                    catch (IOException ex) {
                        LogUtils.WriteExceptionLog("ECP侦听TCP服务异常", ex);
                    }
                    finally {
                        Sender=null;
                        try {
                            if (inputStream != null) {
                                //关闭输入
                                inputStream.close();
                            }
                            if (Writer != null) {
                                Writer.flush();
                                //关闭输出
                                Writer.close();
                                Writer=null;
                            }
                            if (socket != null) {
                                // 关闭连接
                                socket.close();
                            }
                        }
                        catch (Exception ex) {
                            LogUtils.WriteExceptionLog("释放TCP资源异常", ex);
                        }

                    }
                }
            });

        }
        //当服务端
        else {
            MainThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //得到输入流
                    InputStream inputStream = null;
                    //创建Socket对象并连接到服务端
                    Socket socket = null;
                    try {
                        ServerSocket serverSocket = new ServerSocket(port);
                        //增加一个无限循环
                        while (true) {
                            //等待客户端连接,阻塞
                            Socket clientSocket = serverSocket.accept();
                            //按IP存客户端连接
                            String clientIP=clientSocket.getInetAddress().getHostAddress();
                            AllClient.put(clientIP,clientSocket);
                            //接收客户端消息
                            Thread ClientThread = new Thread(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        //得到输出流
                                        Writer = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream(), Encode), false);
                                        //得到输入流
                                        InputStream inputStream = clientSocket.getInputStream();
                                        //IO读取
                                        byte[] buf = new byte[10240];
                                        int readlen = 0;
                                        //阻塞读取数据
                                        while ((readlen = inputStream.read(buf)) != -1) {
                                            String ecpJson = new String(buf, 0, readlen, Encode);
                                            //得到ECP实体
                                            List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);
                                            if(dtoList!=null&&dtoList.size()>0)
                                            {
                                                for(ECPDto dto:dtoList) {
                                                    //转换成数据实体推入缓存
                                                    JRT.DAL.ORM.Global.GlobalManager.InCache(dto);
                                                }
                                                //给每个连接的客户端推送信息
                                                for (String ip : AllClient.keySet()) {
                                                    Socket oneClient = AllClient.get(ip);
                                                    //移除关闭的客户端
                                                    if (oneClient.isClosed()) {
                                                        AllClient.remove(ip);
                                                    }
                                                    PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);
                                                    oneWriter.print(ecpJson);
                                                    oneWriter.flush();
                                                }
                                            }
                                        }
                                    }
                                    catch (Exception ee)
                                    {
                                        LogUtils.WriteExceptionLog("ECP处理客户端数据异常", ee);
                                    }
                                }
                            });
                            ClientThread.start();
                        }
                    }
                    catch (IOException ex) {
                        LogUtils.WriteExceptionLog("侦听仪器TCP异常", ex);
                    }
                    finally {
                        try {
                            if (inputStream != null) {
                                //关闭输入
                                inputStream.close();
                            }
                            if (Writer != null) {
                                Writer.flush();
                                //关闭输出
                                Writer.close();
                            }
                            if (socket != null) {
                                // 关闭连接
                                socket.close();
                            }
                        } catch (Exception ex) {
                            LogUtils.WriteExceptionLog("释放TCP资源异常", ex);
                        }

                    }
                }
            });
        }
        //启动主进程
        MainThread.start();
        UseEcp=true;
    }

    /**
     * 返回ECP待处理队列的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewECPQuenDate() throws Exception
    {
        return JRT.Core.Util.JsonUtil.Object2Json(ECPQuen);
    }

}

ECP管理类再对接上GlobalManager

package JRT.DAL.ORM.Global;

import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

import JRT.Core.MultiPlatform.JRTConfigurtaion;
import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.OneGlobalNode;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 实现内存模拟global的效果
 */
public class GlobalManager {
    /**
     * 在内存里缓存热点数据
     */
    private static ConcurrentHashMap<String, ConcurrentHashMap<String, OneGlobalNode>> AllHotData = new ConcurrentHashMap<>();

    /**
     * 要缓存数据的队列
     */
    private static ConcurrentLinkedDeque TaskQuen = new ConcurrentLinkedDeque();

    /**
     * 管理缓存的定时器
     */
    private static Timer ManageTimer = new Timer();

    /**
     * 缓存的最大对象数量
     */
    public static Integer GlobalCacheNum = 100000;

    /**
     * 当前的缓存数量
     */
    private static AtomicInteger CurCacheNum=new AtomicInteger(0);

    /**
     * 最后删除数据的时间
     */
    private static Long LastDeleteTime=null;

    /**
     * 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可
     *
     * @param obj
     * @throws Exception
     */
    public static void InCache(ECPDto obj) throws Exception{
        TaskQuen.add(obj);
    }

    /**
     * 通过主键查询数据
     * @param model
     * @param id
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T> T DolerGet(T model,Object id) throws Exception
    {
        //实体的名称
        String modelName = model.getClass().getName();
        if(AllHotData.containsKey(modelName))
        {
            //命中数据,克隆返回
            if(AllHotData.get(modelName).containsKey(id))
            {
                OneGlobalNode node=AllHotData.get(modelName).get(id);
                //更新时间
                node.Time=JRT.Core.Util.TimeParser.GetTimeInMillis();
                Object retObj=JRT.Core.Util.JsonUtil.CloneObject(node.Data);
                return (T)retObj;
            }
        }
        return null;
    }

    /**
     * 启动缓存数据管理的线程
     */
    public static void StartGlobalManagerTask() throws Exception{
        //最大缓存数量
        String GlobalCacheNumConf = JRTConfigurtaion.Configuration("GlobalCacheNum");
        if (GlobalCacheNumConf != null && !GlobalCacheNumConf.isEmpty()) {
            GlobalCacheNum = JRT.Core.Util.Convert.ToInt32(GlobalCacheNumConf);
        }
        //定时任务
        TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    //缓存队列的数据并入缓存
                    while (TaskQuen.size() > 0) {
                        //处理要加入缓存的队列
                        DealOneDataQuen();
                    }
                    //尝试推送ECP数据到主服务
                    JRT.DAL.ORM.Global.ECPManager.TryPushEcp();
                    //清理多余的缓存数据,这里需要讲究算法,要求在上百万的缓存数据里快速找到时间最久远的数据
                    if(CurCacheNum.get()>GlobalCacheNum)
                    {
                        //每轮清理时间处于上次清理时间和当前时间前百分之5的老数据
                        long Diff=(JRT.Core.Util.TimeParser.GetTimeInMillis()-LastDeleteTime)/20;
                        //留下数据的最大时间
                        long LeftMaxTime=LastDeleteTime+Diff;
                        //遍历所有的热点数据
                        for (String model : AllHotData.keySet()) {
                            ConcurrentHashMap<String, OneGlobalNode> oneTableHot=AllHotData.get(model);
                            //记录要删除的数据
                            List<String> delList=new ArrayList<>();
                            for (String key : oneTableHot.keySet()) {
                                OneGlobalNode one=oneTableHot.get(key);
                                //需要删除的数据
                                if(one.Time<LeftMaxTime)
                                {
                                    delList.add(key);
                                }
                            }
                            //移除时间久的数据
                            for(String del:delList)
                            {
                                oneTableHot.remove(del);
                            }
                        }
                    }
                    //清理时间久远的缓存数据
                } catch (Exception ex) {
                    LogUtils.WriteExceptionLog("处理Global缓存异常", ex);
                }
            }
        };
        //尝试启动ECP管理器
        String IP = JRTConfigurtaion.Configuration("ECPIP");
        String Port = JRTConfigurtaion.Configuration("ECPPort");
        if(Port!=null&&!Port.isEmpty())
        {
            if(IP==null)
            {
                IP="";
            }
            //启动ECP管理器
            JRT.DAL.ORM.Global.ECPManager.StartEcpManager(IP,JRT.Core.Util.Convert.ToInt32(Port));
        }
        //启动缓存管理定时器
        ManageTimer.schedule(timerTask, 0, 500);

    }

    /**
     * 通过实体名称获得实体类型信息
     *
     * @param modelName 实体名称
     * @return
     */
    private static Class GetTypeByName(String modelName) throws Exception {
        return JRT.Core.Util.ReflectUtil.GetType(JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName") + ".Entity." + modelName, JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName"));
    }


    /**
     * 处理队列里的一条数据并入缓存
     */
    private static void DealOneDataQuen() {
        try {
            Object obj = TaskQuen.pop();
            if (obj != null) {
                ECPDto dto=(ECPDto)obj;
                //添加或者更新缓存
                if(dto.Cmd.equals("A")||dto.Cmd.equals("U")) {
                    Class type = GetTypeByName(dto.Model);
                    Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);
                    JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
                    //实体的名称
                    String modelName = dto.Model;
                    //得到数据的主键
                    String id = tableInfo.ID.Value.toString();
                    if (!AllHotData.containsKey(modelName)) {
                        ConcurrentHashMap<String, OneGlobalNode> map = new ConcurrentHashMap<>();
                        AllHotData.put(modelName, map);
                    }
                    //更新数据
                    if (AllHotData.get(modelName).containsKey(id)) {
                        AllHotData.get(modelName).get(id).Data = entity;
                        AllHotData.get(modelName).get(id).Time = JRT.Core.Util.TimeParser.GetTimeInMillis();
                    }
                    //加入到缓存
                    else {
                        OneGlobalNode node = new OneGlobalNode();
                        node.Data = entity;
                        node.Time = JRT.Core.Util.TimeParser.GetTimeInMillis();
                        AllHotData.get(modelName).put(id, node);
                        //缓存数量加1
                        CurCacheNum.addAndGet(1);
                        //记录时间
                        if (LastDeleteTime == null) {
                            LastDeleteTime = JRT.Core.Util.TimeParser.GetTimeInMillis();
                        }
                    }
                }
                //删除缓存
                else if(dto.Cmd.equals("D"))
                {
                    Class type = GetTypeByName(dto.Model);
                    Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);
                    JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
                    //实体的名称
                    String modelName = dto.Model;
                    //得到数据的主键
                    String id = tableInfo.ID.Value.toString();
                    if (AllHotData.containsKey(modelName)&&AllHotData.get(modelName).containsKey(id)) {
                        AllHotData.get(modelName).remove(id);
                    }
                }
                //清空表缓存
                else if(dto.Cmd.equals("CLEAR"))
                {
                    //实体的名称
                    String modelName = dto.Model;
                    if (AllHotData.containsKey(modelName)) {
                        AllHotData.get(modelName).clear();
                    }
                }
            }
        }
        catch (Exception ex) {
            LogUtils.WriteExceptionLog("处理Global缓存添加异常", ex);
        }
    }

    /**
     * 返回Global的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewGlobalJson() throws Exception
    {
        return JRT.Core.Util.JsonUtil.Object2Json(AllHotData);
    }

    /**
     * 返回Global待处理队列的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewGlobalTaskQuenDate() throws Exception
    {
        return JRT.Core.Util.JsonUtil.Object2Json(TaskQuen);
    }

}

增删改和DolerGet调整

/**
     * 保存对象,不抛异常,执行信息通过参数输出
     *
     * @param entity   实体对象
     * @param outParam 输出执行成功或失败信息,执行成功时输出执行记录主键
     * @param <T>      实体类型约束
     * @return影响行数
     */
    @Override
    public <T> int Save(T entity, OutValue outValue, OutParam outParam) throws Exception {
        int row = 0;
        PreparedStatement stmt = null;
        boolean innerT = false;     //标识是否内部开启事务
        String sql = "";
        try {
            //根据实体对象获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取插入SQL语句
            sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetInsertSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, hash, false);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行插入SQL:" + sql + ";SQL参数:" + JRT.Core.Util.JsonUtil.Object2Json(hash.GetParam()));
            //获取ID列
            String idKey = tableInfo.ID.Key;
            //声明式SQL,并设置参数
            if (!idKey.isEmpty()) {
                stmt = Manager().Connection().prepareStatement(sql, new String[]{idKey});
            } else {
                stmt = Manager().Connection().prepareStatement(sql);
            }
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            ResultSet rowID = stmt.getGeneratedKeys();
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            //保存成功返回记录主键,返回影响记录数 1
            if (row == 1) {
                if (rowID.next() && (!idKey.isEmpty())) {
                    outValue.Value = rowID.getInt(idKey);
                    //设置RowID到实体
                    JRT.Core.Util.ReflectUtil.SetObjValue(entity, tableInfo.ID.Key, rowID.getInt(idKey));
                    //尝试把数据推入缓存队列
                    Manager().TryPushToCache(entity, "A");
                }
            } else {
                outParam.Code = OutStatus.ERROR;
                outParam.Message = "保存数据失败,执行保存返回:" + row;
            }
            return row;
        } catch (Exception ex) {
            outParam.Code = OutStatus.ERROR;
            //操作异常,判断如果开启事务,则回滚事务
            if (Manager().Hastransaction) {
                if (!Manager().RollTransaction()) {
                    outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + "执行SQL:" + sql;
        }
        //操作结束释放资源
        finally {
            if (stmt != null) {
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
                Manager().Close();
            }
        }
        return row;
    }


/**
     * 更新实体对象
     *
     * @param entity        实体对象
     * @param param         更新条件,有条件就按条件更新,没有条件就按主键更新
     * @param outParam      输出执行成功或失败的信息
     * @param updateColName 更新属性名集合,无属性则更新实体的所有属性
     * @param joiner        连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and
     * @param operators     操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=
     * @param <T>           类型限定符
     * @return 影响行数
     */
    @Override
    public <T> int Update(T entity, HashParam param, OutParam outParam, List<String> updateColName, List<String> joiner, List<String> operators) throws Exception {
        PreparedStatement stmt = null;
        if (outParam == null) outParam = new OutParam();
        int row = 0;
        boolean innerT = false;     //标识是否内部开启事务
        try {
            //根据实体获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取更新的SQL语句
            String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetUpdateSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, updateColName, joiner, operators, hash);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行更新SQL:" + sql);
            //声明式SQL,并且设置参数
            stmt = Manager().Connection().prepareStatement(sql);
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            if (row == 1) {
                //尝试把数据推入缓存队列
                Manager().TryPushToCache(entity, "U");
            }
            outParam.Code = OutStatus.SUCCESS;
            outParam.Message = "更新数据成功。";
            return row;
        } catch (Exception ex) {
            //操作异常,判断如果开启了事务,就回滚事务
            outParam.Code = OutStatus.ERROR;
            if (Manager().Hastransaction) {
                if (!Manager().RollTransaction()) {
                    outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "更新数据失败!" + ex.getCause().getMessage();
        }
        //操作结束释放资源
        finally {
            if (stmt != null) {
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
                Manager().Close();
            }
        }
        return row;
    }


/**
     * 根据条件删除记录
     *
     * @param entity    实体对象
     * @param param     删除条件,有条件按条件删除,没有条件按主键删除
     * @param outParam  输出执行成功或失败的信息
     * @param joiner    多条件逻辑连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and
     * @param operators 操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=
     * @param <T>       类型限定符
     * @return 影响行数
     */
    @Override
    public <T> int Remove(T entity, HashParam param, OutParam outParam, List<String> joiner, List<String> operators) throws Exception {
        PreparedStatement stmt = null;
        if (outParam == null) outParam = new OutParam();
        int row = 0;
        try {
            //根据实体对象获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取删除SQL语句
            String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetDeleteSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, joiner, operators, hash);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行删除SQL:" + sql);
            //声明式SQL,并设置参数
            stmt = Manager().Connection().prepareStatement(sql);
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            if (row == 1) {
                //尝试把数据推入缓存队列
                Manager().TryPushToCache(entity, "D");
            }
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            outParam.Code = OutStatus.SUCCESS;
            outParam.Message = "删除数据成功。";
            return row;
        } catch (Exception ex) {
            //操作异常,判断如果开启了事务,就回滚事务
            outParam.Code = OutStatus.ERROR;
            if (Manager().Hastransaction) {
                if (!Manager().RollTransaction()) {
                    outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "更新数据失败!" + ex.getCause().getMessage();
        }
        //操作结束释放资源
        finally {
            if (stmt != null) {
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
                Manager().Close();
            }
        }
        return row;
    }

/**
     * 通过主键查询数据,带缓存的查询,用来解决关系库的复杂关系数据获取,顶替Cache的$g
     *
     * @param model 实体
     * @param id    主键
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> T DolerGet(T model, Object id) throws Exception {
        T ret = GlobalManager.DolerGet(model, id);
        //命中缓存直接返回
        if (ret != null) {
            return ret;
        }
        else {
            //调用数据库查询
            ret = GetById(model, id);
            //找到数据,推入缓存
            if(ret!=null) {
                ECPDto dto = new ECPDto();
                dto.Cmd = "A";
                dto.Model = ret.getClass().getSimpleName();
                dto.Data = JRT.Core.Util.JsonUtil.Object2Json(ret);
                //通知存入缓存
                GlobalManager.InCache(dto);
            }
        }
        return ret;
    }

/**
     * 通过主键查询数据,主业务数据没找到会按历史切割数量找历史表
     *
     * @param model
     * @param id
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> T GetById(T model, Object id) throws Exception {
        JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);
        List<ParamDto> param = new ArrayList<>();
        ParamDto p = new ParamDto();
        p.Key = tableInfo.ID.Key;
        p.Value = id;
        param.add(p);
        //创建实体集合
        List<T> lstObj = FindAll(model, param, "", -1, -1, "", null, null);
        //结果为空,返回一个新建的对象
        if (lstObj.size() == 0) {
            //从历史表取数据
            String HisTableName = tableInfo.TableInfo.HisTableName();
            if (!HisTableName.isEmpty()) {
                int cutNum=0;
                //指定了切割列按切割列切割
                if(!tableInfo.TableInfo.CutHisColName().isEmpty())
                {
                    cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());
                }
                else
                {
                    cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());
                }
                //除以历史页大小算到数据该放入哪个历史表
                int hisNum = cutNum/HistoryPage;
                //分割所有历史实体
                String[] HisTableNameArr = HisTableName.split("^");
                //存放页小于所有历史表数据就做移动
                if (hisNum < HisTableNameArr.length) {
                    String HisModelName = HisTableNameArr[hisNum];
                    //得到历史表的实体
                    Class cHis = GetTypeByName(HisModelName);
                    //克隆得到历史表的对象
                    Object hisData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);
                    //创建实体集合
                    List<T> lstHisObj = DolerFindAll(0,hisData, param, "", -1, -1, "", null, null);
                    //结果为空,返回一个新建的对象
                    if (lstHisObj.size() > 0) {
                        return lstHisObj.get(0);
                    }
                }
            }
            return null;
        }
        //否则返回第一个实体
        else {
            return lstObj.get(0);
        }
    }

    /**
     * 把数据安装维护的历史表大小移入历史表
     *
     * @param model 实体数据
     * @param <T>   泛型
     * @return 是否成功
     * @throws Exception
     */
    public <T> boolean MoveToHistory(T model) throws Exception {
        JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);
        String HisTableName = tableInfo.TableInfo.HisTableName();
        if (!HisTableName.isEmpty()) {
            //分割所有历史实体
            String[] HisTableNameArr = HisTableName.split("^");
            int cutNum=0;
            //指定了切割列按切割列切割
            if(!tableInfo.TableInfo.CutHisColName().isEmpty())
            {
                cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());
            }
            else
            {
                cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());
            }
            //除以历史页大小算到数据该放入哪个历史表
            int hisNum = cutNum/HistoryPage;
            //存放页小于所有历史表数据就做移动
            if (hisNum < HisTableNameArr.length) {
                String HisModelName = HisTableNameArr[hisNum];
                //得到历史表的实体
                Class cHis = GetTypeByName(HisModelName);
                //克隆得到历史表的对象
                Object newData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);
                OutParam out = new OutParam();
                //保存历史数据
                int saveRet = Save(newData, out);
                if (saveRet == 1) {
                    saveRet = Remove(model, out);
                }
                if (saveRet == 1) {
                    return true;
                }
            }
        }
        return false;
    }

连接管理类调整,根据是否带事务在操作执行成功后把数据推入ECP队列供Global管理器往主服务推送分发

package JRT.DAL.ORM.DBUtility;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import JRT.DAL.ORM.DBUtility.C3P0Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import JRT.DAL.ORM.DBUtility.IDbFactory;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 连接和事务管理
 */
public class DBManager {
    /**
     * 驱动名称
     */
    private String factoryName="";

    /**
     * 当前对象的驱动
     */
    private IDbFactory factory=null;


    /**
     * 存数据库连接对象
     */
    private Connection connection=null;

    /**
     * 要转入缓存的临时数据
     */
    private List<ECPDto> ToEcpTmp=null;

    /**
     * 尝试把数据推入缓存队列
     * @param obj 对象
     * @param Oper 操作 A:添加  U:更新  D:删除
     * @throws Exception
     */
    public void TryPushToCache(Object obj,String Oper) throws Exception
    {
        ECPDto dto=new ECPDto();
        dto.Cmd=Oper;
        dto.Model=obj.getClass().getSimpleName();
        dto.Data=JRT.Core.Util.JsonUtil.Object2Json(obj);
        //有事务就推缓存
        if(Hastransaction=true)
        {
            if(ToEcpTmp==null)
            {
                ToEcpTmp=new ArrayList<>();
            }
            ToEcpTmp.add(dto);
        }
        else
        {
            //没事务的直接推入缓存队列
            JRT.DAL.ORM.Global.ECPManager.InECPToQuen(dto);
        }
    }

    /**
     * 为每个数据库驱动存储工厂
     */
    private static ConcurrentHashMap<String, IDbFactory> hsFact = new ConcurrentHashMap<>();

    /**
     * 为每个数据库驱动存储连接池
     */
    private static ConcurrentHashMap<String, ComboPooledDataSource> hsPoll = new ConcurrentHashMap<>();

    /**
     * 得到驱动对象
     * @param factoryName
     * @return
     */
    public IDbFactory GetIDbFactory(String factoryName)
    {
        if(factory==null)
        {
            factory=hsFact.get(factoryName);
        }
        return factory;
    }


    /**
     * 尝试初始化连接池
     * @param factoryName
     */
    public static void TryInitConnPool(String factoryName) throws Exception
    {
        if(factoryName=="")
        {
            factoryName="LisMianDbFactory";
        }
        if(!hsPoll.containsKey(factoryName))
        {
            IDbFactory factory=JRT.Core.Context.ObjectContainer.GetTypeObject(factoryName);
            hsPoll.put(factoryName,C3P0Util.GetConnPool(factory));
            if(!hsFact.containsKey(factoryName))
            {
                hsFact.put(factoryName,factory);
            }
        }
    }

    /**
     * 构造函数
     * @param factName 驱动配置名称
     * @throws Exception
     */
    public DBManager(String factName) throws Exception
    {
        factoryName=factName;
        TryInitConnPool(factoryName);
    }

    /**
     * 存数据库连接对象
     */
    public Connection Connection() throws Exception
    {
        if(connection==null)
        {
            connection=hsPoll.get(factoryName).getConnection();
        }
        return connection;
    }

    /**
     * 标识是否开启事务
     */
    public boolean Hastransaction = false;

    /**
     * 存储开启多次事务的保存点,每次调用BeginTransaction开启事务是自动创建保存点
     */
    public LinkedList<Savepoint> Transpoints = new LinkedList<Savepoint>();

    /**
     * 获取开启的事务层级
     * @return
     */
    public int GetTransactionLevel()
    {
        return this.Transpoints.size();
    }

    /**
     * 释放数据库连接
     * @return true成功释放,false释放失败
     */
    public boolean Close() throws Exception
    {
        if(connection!=null)
        {
            connection.setAutoCommit(true);
            connection.close();
        }
        connection=null;
        return true;
    }


    /**
     * 此方法开启事务
     * @return  true开启事务成功,false开始事务失败
     */
    public boolean BeginTransaction() throws Exception
    {
        try
        {
            this.Connection().setAutoCommit(false);
            this.Hastransaction = true;
            Savepoint savepoint = this.Connection().setSavepoint();
            Transpoints.addLast(savepoint);
            return true;
        }

        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("开启事务失败!" + sqle.getMessage(), sqle);
        }
        return false;
    }

    /**
     * 回滚上一层事务
     * @return true回滚事务成功,false回滚事务失败
     */
    public boolean RollTransaction() throws Exception
    {
        //删除临时数据
        if(ToEcpTmp!=null) {
            ToEcpTmp.clear();
            ToEcpTmp=null;
        }
        //未开启事务时,算回滚事务成功
        if (!this.Hastransaction)
        {
            return true;
        }
        try
        {
            if (this.Transpoints.size() == 0)
            {
                this.Connection().rollback();
                this.Hastransaction = false;
            }
            else
            {
                Savepoint point = this.Transpoints.poll();
                this.Connection().rollback(point);
            }
            return true;
        }
        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("事务回滚失败!" + sqle.getMessage(),sqle);
            throw sqle;
        }
        finally
        {
            if (!this.Hastransaction)
            {
                Close();
            }
        }
    }

    /**
     * 回滚开启的全部事务
     * @return true回滚事务成功,false回滚事务失败
     */
    public boolean RollTransactionAll() throws Exception
    {
        //删除临时数据
        if(ToEcpTmp!=null) {
            ToEcpTmp.clear();
            ToEcpTmp=null;
        }
        //未开启事务时,算回滚事务成功
        if (!this.Hastransaction)
        {
            return true;
        }
        try
        {
            this.Connection().rollback();
            this.Hastransaction = false;
            return true;
        }
        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("回滚所有事务层级失败!" + sqle.getMessage(),sqle);
            throw sqle;
        }
        finally
        {
            Close();
        }
    }

    /**
     * 提交事务
     * @return true提交事务成功,false提交事务失败
     */
    public boolean CommitTransaction() throws Exception
    {
        try
        {
            //临时数据推入缓存
            if(ToEcpTmp!=null)
            {
                for(ECPDto obj:ToEcpTmp)
                {
                    //没事务的直接推入缓存队列
                    JRT.DAL.ORM.Global.ECPManager.InECPToQuen(obj);
                }
            }
            this.Connection().commit();
            this.Hastransaction = false;
            return true;
        }
        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("提交事务失败!" + sqle.getMessage(),sqle);
        }
        finally
        {
            //提交事务,不论成功与否,释放数据库连接
            try
            {
                Close();
            }
            catch (Exception ex)
            {

            }
        }
        return false;
    }

}





增加的配置
在这里插入图片描述

这样就有一个理论上比较靠谱的缓存机制了,业务用SQL查到主列表数据后,调用DolerGet的获得各种周边相关数据来组装给前台返回,就不用每个写业务的人自己考虑写复杂的级联查询数据库受不了,自己缓存数据时候缓存是否可靠,自己不缓存数据时候调用太多数据库交互又慢的问题。DolerGet基本可以满足比较无脑的多维取数据组装的要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/200870.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

局部性原理和伪共享

CPU Cache CPU Cache可以理解为CPU内部的高速缓存。CPU从内存读取数据时&#xff0c;将要读取的数据及其相邻地址的数据&#xff0c;即至少一个Cache Line&#xff0c;写入Cache&#xff0c;以便后续访问时提高读取速度。 CPU存在多级Cache&#xff0c;级别最高的离CPU最近&a…

实现电商平台与营销系统无缝集成:雅座的无代码开发与API连接

无代码开发&#xff1a;营销的新引擎 在数字化转型的浪潮中&#xff0c;无代码开发已成为企业提升效率、减少成本的新引擎。这种开发方式允许非技术人员通过图形界面构建应用程序&#xff0c;无需编写代码即可实现复杂功能。这对于营销、广告推广以及用户运营等业务尤为重要&a…

贪心 53. 最大子序和 122.买卖股票的最佳时机 II

53. 最大子序和 题目&#xff1a; 给定一个数组&#xff0c;有正有负&#xff0c;找出一个连续子序列的总和最大&#xff08;子数组最少一个&#xff09; 暴力思路&#xff1a; 双层for循环&#xff0c;记录每一次可能的子序列的总和&#xff0c;初始为整数最小值&#xff…

Go语言实现大模型分词器tokenizer

文章目录 前言核心结构体定义构造函数文本初始处理组词构建词组索引训练数据编码解码打印状态信息运行效果总结 前言 大模型的tokenizer用于将原始文本输入转化为模型可处理的输入形式。tokenizer将文本分割成单词、子词或字符&#xff0c;并将其编码为数字表示。大模型的toke…

ArkTS-取消标题与自定义标题栏

文章目录 取消标头自定义标题栏导入Resources自定义跳转动画关于底部tabBar导航文本输入(TextInput/TextArea)自定义样式添加事件可以是onChange可以是onSubmit List列表组件设置主轴方向 网格布局服务卡片-获取地理位置页面获取地理位置服务卡片获取地理位置 可以先看看&#…

wvp 视频监控平台抓包分析

抓包时机 下面的抓包时机是抓包文件最新&#xff0c;但是最有用的包 选择网卡开始抓包 如果之前已经选择网卡&#xff0c;直接开始抓包 停止抓包 重新抓包 sip播放过程分析 过滤条件 tcp.port 5060 and sip 可以看到有这些包 选择任何一个 &#xff0c;戍边右键--追踪流--…

【批处理常用命令及用法大全】

文章目录 1 echo 和 回显控制命令2 errorlevel程序返回码3 dir显示目录中的文件和子目录列表4 cd更改当前目录5 md创建目录6 rd删除目录7 del删除文件8 ren文件重命名9 cls清屏10 type显示文件内容11 copy拷贝文件12 title设置cmd窗口的标题13 ver显示系统版本14 label 和 vol设…

加密挖矿、AI发展刺激算力需求激增!去中心化算力时代已来临!

2009年1月3日&#xff0c;中本聪在芬兰赫尔辛基的一个小型服务器上挖出了比特币的创世区块&#xff0c;并获得了50BTC的出块奖励。自加密货币诞生第一天起&#xff0c;算力一直在行业扮演非常重要的角色。行业对算力的真实需求&#xff0c;也极大推动了芯片厂商的发展&#xff…

matlab三维地形图

matlab三维地形图 %%%%—————Code to draw 3D bathymetry—————————— %-------Created by bobo,10/10/2021-------------------- clear;clc;close all; ncdisp E:\data\etopo\scs_etopo.nc filenmE:\data\etopo\scs_etopo.nc; londouble(ncread(filenm,lon)); lat…

【深度学习笔记】06 softmax回归

06 softmax回归 softmax运算损失函数对数似然Fashion-MNIST数据集读取数据集读取小批量整合所有组件 softmax回归的从零开始实现初始化模型参数定义softmax操作定义模型定义损失函数分类精度训练预测 softmax回归的简洁实现 softmax运算 softmax函数能够将未规范化的预测变换为…

C语言——实现一个计算m~n(m<n)之间所有整数的和的简单函数。

#include <stdio.h>int sum(int m, int n) {int i;int sum 0;for ( i m; i <n; i){sum i;}return sum;}int main() { int m, n;printf("输入m和n&#xff1a;\n");scanf("%d,%d", &m, &n);printf("sum %d\n", sum(m, n)…

每日一题:LeetCode-202.面试题 08.06. 汉诺塔问题

每日一题系列&#xff08;day 07&#xff09; 前言&#xff1a; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f50e…

一款LED段码显示屏驱动芯片方案

一、基本概述 TM1620是一种LED&#xff08;发光二极管显示器&#xff09;驱动控制专用IC,内部集成有MCU数字接口、数据锁存器、LED驱动等电路。本产品质量可靠、稳定性好、抗干扰能力强。 二、基本特性 采用CMOS工艺 显示模式&#xff08;8段6位&#xff5e;10段4位&#xff…

【寒武纪(6)】MLU推理加速引擎MagicMind,最佳实践(二)混合精度

混合精度在精度损失范围内实现数倍的性能提升。 支持的量化特性 构建混合精度的流程 构建混合精度的流程如下&#xff0c;支持浮点或半精度编程&#xff0c;以及量化精度编程两种方式。 浮点或半精度 无需提供tensor分布量化编程需要设置tensor分布。 网络粒度和算子粒度的设…

LVS-NAT实验

实验前准备&#xff1a; LVS负载调度器&#xff1a;ens33&#xff1a;192.168.20.11 ens34&#xff1a;192.168.188.3 Web1节点服务器1&#xff1a;192.168.20.12 Web2节点服务器2&#xff1a;192.168.20.13 NFS服务器&#xff1a;192.168.20.14 客户端&#xff08;win11…

智能优化算法应用:基于布谷鸟算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于布谷鸟算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于布谷鸟算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.布谷鸟算法4.实验参数设定5.算法结果6.参考文献7.…

Unity中Shader变体优化

文章目录 前言一、在Unity中查看变体个数&#xff0c;以及有哪些变体二、若使用预定义的变体太多&#xff0c;我们只使用其中的几个变体&#xff0c;我们该怎么做优化一&#xff1a;可以直接定义需要的那个变体优化二&#xff1a;使用 skip_variants 剔除不需要的变体 三、变体…

TikTok如何破解限流?真假限流如何分辨?速来自测

Tiktok是目前增长较快的社交平台&#xff0c;也是中外年轻一代首选的社交平台&#xff0c;许多出海品牌已经看到了TikTok营销的潜力&#xff0c;专注于通过视频、电商入驻来加入TikTok这片蓝海&#xff0c;加深品牌影响力&#xff0c;获得变现。 然而TikTok新手往往都会遇到一…

基于PHP的校园兼职系统的设计与开发

基于PHP的校园兼职系统的设计与开发 摘要&#xff1a;从古代至今&#xff0c;教育都是国家培养人才的手段&#xff0c;在古代教育往往都是课堂式教育&#xff0c;在课堂内老师教导学生学习&#xff0c;而随着时间的推移&#xff0c;越来越多的在校大学生已经不满足于只在课堂上…

【数据库】基于索引的扫描算法,不同类型索引下的选择与连接操作,不同的代价及优化

基于索引的算法 ​专栏内容&#xff1a; 手写数据库toadb 本专栏主要介绍如何从零开发&#xff0c;开发的步骤&#xff0c;以及开发过程中的涉及的原理&#xff0c;遇到的问题等&#xff0c;让大家能跟上并且可以一起开发&#xff0c;让每个需要的人成为参与者。 本专栏会定期更…