Java Project -- A RabbitMQ-Style Message Queue -- Network Communication Protocol Design

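Contents

I. Introduction

II. Design

III. Code

1.Request

2.Response

3.BasicArguments

4.BasicReturns

IV. Method Classes

1.Declaring an exchange

2.Deleting an exchange

3.Declaring a queue

4.Deleting a queue

5.Creating a binding

6.Deleting a binding

7.Publishing a message

8.Consuming messages

9.Pushed message returns

V. Implementing the BrokerServer Class

VI. Implementing the Connection

1.ConnectionFactory class

2.Connection class

3.Channel class

VII. Summary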


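I. Introduction

  This article covers the last major part of the project: the design of the network communication protocol.

II. Design

  Producers and consumers are both clients, and both communicate with the Broker Server over the network.

  TCP serves as the underlying transport protocol. On top of it, a custom application-layer protocol lets clients remotely invoke the functionality the server provides. Every request and response travels as a 4-byte type, a 4-byte payload length, and then the payload bytes.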
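As a rough illustration of the framing this custom protocol relies on, the sketch below writes and reads one frame entirely in memory. It assumes the layout used by the read and write methods later in this article (a 4-byte type, a 4-byte payload length, then the payload). FrameDemo and its method names are hypothetical and are not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: it only illustrates the frame layout.
class FrameDemo {
    // Encodes one frame: 4-byte type, 4-byte payload length, payload bytes.
    static byte[] encode(int type, byte[] payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(type);
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the type field from the start of a frame.
    static int decodeType(byte[] frame) {
        try {
            return new DataInputStream(new ByteArrayInputStream(frame)).readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Skips the type, reads the length, then reads exactly that many payload bytes.
    static byte[] decodePayload(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            in.readInt();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode(0x3, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame bytes: " + frame.length);
        System.out.println("type: " + decodeType(frame));
        System.out.println("payload: " + new String(decodePayload(frame), StandardCharsets.UTF_8));
    }
}
```

Because the length is sent before the payload, the receiver always knows how many bytes belong to the current frame, which is what lets several requests share one TCP connection.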

  

 

III. Code

1.Request

public class Request {
    private int type;
    private int length;
    private byte[] payload;

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getPayload() {
        return payload;
    }

    public void setPayload(byte[] payload) {
        this.payload = payload;
    }
}

2.Response

public class Response {
    private int type;
    private int length;
    private byte[] payload;

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getPayload() {
        return payload;
    }

    public void setPayload(byte[] payload) {
        this.payload = payload;
    }
}

3.BasicArguments

/*
Payload of a Request
 */
public class BasicArguments implements Serializable {
    protected String rid;
    protected String channelId;

    public String getRid() {
        return rid;
    }

    public void setRid(String rid) {
        this.rid = rid;
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }
}

4.BasicReturns

/*
Payload of a Response
 */
public class BasicReturns implements Serializable {
    protected String rid;
    protected String channelId;
    protected boolean ok;

    public String getRid() {
        return rid;
    }

    public void setRid(String rid) {
        this.rid = rid;
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public boolean isOk() {
        return ok;
    }

    public void setOk(boolean ok) {
        this.ok = ok;
    }
}

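IV. Method Classes

Each method that VirtualHost exposes needs a class that represents its parameters. These classes extend BasicArguments, so the server can read the rid and channelId before it knows the concrete type.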
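The sketch below shows why a shared base class helps: the receiver can deserialize a payload as the base type, read the common fields, and only then downcast. BinaryTool comes from an earlier article in this series. Here it is assumed to wrap standard Java object serialization, and PayloadDemo, BaseArgs and DeleteExchangeArgs are hypothetical stand-ins for the project's classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo: stand-ins for BasicArguments, one method class, and BinaryTool.
class PayloadDemo {
    // Plays the role of BasicArguments: fields every request carries.
    static class BaseArgs implements Serializable {
        private static final long serialVersionUID = 1L;
        String rid;
        String channelId;
    }

    // Plays the role of a method class such as ExchangeDeleteArguments.
    static class DeleteExchangeArgs extends BaseArgs {
        private static final long serialVersionUID = 1L;
        String exchangeName;
    }

    // Assumed behaviour of BinaryTool.toBytes: plain Java serialization.
    static byte[] toBytes(Object object) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Assumed behaviour of BinaryTool.fromBytes.
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DeleteExchangeArgs sent = new DeleteExchangeArgs();
        sent.rid = "R-1";
        sent.channelId = "C-1";
        sent.exchangeName = "testExchange";

        // Receiver side: read the common fields first, then downcast by request type.
        BaseArgs received = (BaseArgs) fromBytes(toBytes(sent));
        System.out.println(received.rid + " on " + received.channelId);
        System.out.println(((DeleteExchangeArgs) received).exchangeName);
    }
}
```

A class that does not extend the base type cannot go through this path, because the first cast to the base type would fail at runtime.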

1.Declaring an exchange

public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private ExchangeType exchangeType;
    private boolean durable;
    private boolean autoDelete;
    private Map<String,Object> arguments;

    // Getters and setters are omitted here and in the classes below.
}

2.Deleting an exchange

public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
    private String exchangeName;
}

3.Declaring a queue

public class QueueDeclareArguments extends BasicArguments implements Serializable {
    private String queueName;
    private boolean durable;
    private boolean exclusive;
    private boolean autoDelete;
    private Map<String,Object> arguments;
}

4.Deleting a queue

public class QueueDeleteArguments extends BasicArguments implements Serializable {
    private String queueName;
}

5.Creating a binding

public class QueueBindArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String queueName;
    private String bindingKey;
}

6.Deleting a binding

public class QueueUnBindArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String queueName;
}

7.Publishing a message

public class BasicPublishArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String routingKey;
    private BasicProperties basicProperties;
    private byte[] body;
}

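8.Consuming messages

// Must extend BasicArguments: the server casts the parsed payload to this type,
// and the client sets rid and channelId on it.
public class BasicConsumeArguments extends BasicArguments implements Serializable {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
}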

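9.Pushed message returns

// Payload of the response that pushes a message to a subscribed consumer.
// It extends BasicReturns (not BasicArguments) because the server calls setOk() on it.
public class SubScribeReturns extends BasicReturns implements Serializable {
    private String consumerTag;
    private BasicProperties basicProperties;
    private byte[] body;
}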

V. Implementing the BrokerServer Class

public class BrokerServer {
    private ServerSocket serverSocket = null;
    private VirtualHost virtualHost = new VirtualHost("default");

    private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String,Socket>();

    private ExecutorService executorService = null;

    private volatile boolean runnable = true;

    public BrokerServer(int port) throws IOException {
        serverSocket = new ServerSocket(port);
    }

    public void start() throws IOException {
        System.out.println("[BrokerServer] Started!");
        executorService = Executors.newCachedThreadPool();
        try{
            while (runnable){
                Socket clientSocket = serverSocket.accept();
                // Handle each client connection on its own pooled thread
                executorService.submit(() ->{
                   processConnection(clientSocket);
                });
            }
        }catch (SocketException e){
            System.out.println("[BrokerServer] Server stopped!");
        }
    }

    public void stop() throws IOException {
        runnable = false;
        executorService.shutdownNow();
        serverSocket.close();
    }

    private void processConnection(Socket clientSocket){
        try(InputStream inputStream = clientSocket.getInputStream(); OutputStream outputStream = clientSocket.getOutputStream()){
            try(DataInputStream dataInputStream = new DataInputStream(inputStream)
                ; DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                while (true){
                    Request request = readRequest(dataInputStream);
                    Response response = process(request,clientSocket);
                    writeResponse(dataOutputStream,response);
                }
            }catch (EOFException | SocketException e){
                System.out.println("[BrokerServer] Connection closed! Client address: "+clientSocket.getInetAddress().toString()
                        +", port: "+clientSocket.getPort());
            }
        }catch (Exception e){
            System.out.println("[BrokerServer] Connection error!");
            e.printStackTrace();
        }finally {
            try{
                clientSocket.close();
                clearClosedSession(clientSocket);
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        // 1. Do a first-pass parse of the payload into the common base type
        BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
        System.out.println("[Request] rid="+basicArguments.getRid()+",channelId=" +basicArguments.getChannelId()+",type="
                +request.getType()+",length="+request.getLength());
        // 2. Use the type value to decide which operation to perform
        boolean ok = true;
        if(request.getType()==0x1){
            // Create a channel
            sessions.put(basicArguments.getChannelId(),clientSocket);
            System.out.println("[BrokerServer] Channel created! channelId="+basicArguments.getChannelId());
        } else if (request.getType()==0x2) {
            // Destroy a channel
            sessions.remove(basicArguments.getChannelId());
            System.out.println("[BrokerServer] Channel destroyed! channelId="+basicArguments.getChannelId());
        } else if (request.getType()==0x3) {
            // Declare an exchange
            ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
            ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType()
                    ,arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());
        } else if (request.getType()==0x4) {
            ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
            ok = virtualHost.exchangeDelete(arguments.getExchangeName());
        } else if (request.getType()==0x5) {
            QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
            ok = virtualHost.queueDeclare(arguments.getQueueName(),arguments.isDurable(),arguments.isExclusive()
                    ,arguments.isAutoDelete(),arguments.getArguments());
        } else if (request.getType()==0x6) {
            QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
            ok = virtualHost.queueDelete(arguments.getQueueName());
        } else if (request.getType()==0x7) {
            QueueBindArguments arguments = (QueueBindArguments) basicArguments;
            ok = virtualHost.queueBind(arguments.getExchangeName(),arguments.getQueueName(),arguments.getBindingKey());
        } else if (request.getType()==0x8) {
            QueueUnBindArguments arguments = (QueueUnBindArguments) basicArguments;
            ok = virtualHost.queueUnBind(arguments.getExchangeName(),arguments.getQueueName());
        } else if (request.getType()==0x9) {
            BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
            ok = virtualHost.basicPublish(arguments.getExchangeName(),arguments.getRoutingKey()
                    ,arguments.getBasicProperties(),arguments.getBody());
        }else if (request.getType()==0xa){
            BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
            ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {
                @Override
                // Callback: push a message the server received straight to the matching consumer client
                public void handleDelivery(String consumeTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                    // consumeTag is actually the channelId, so it can be used to look up the socket in sessions
                    // 1. Find the socket by channelId
                    Socket clientSocket = sessions.get(consumeTag);
                    if(clientSocket==null || clientSocket.isClosed()){
                        throw new MqException("[BrokerServer] The subscribing client has already closed!");
                    }
                    // 2. Build the response data
                    SubScribeReturns subScribeReturns = new SubScribeReturns();
                    subScribeReturns.setChannelId(consumeTag);
                    subScribeReturns.setRid("");  // A push has no matching request, so there is no rid to correlate
                    subScribeReturns.setOk(true);
                    subScribeReturns.setConsumerTag(consumeTag);
                    subScribeReturns.setBasicProperties(basicProperties);
                    subScribeReturns.setBody(body);
                    byte[] payload = BinaryTool.toBytes(subScribeReturns);
                    Response response = new Response();
                    // 0xc marks message data that the server pushes to a consumer client
                    response.setType(0xc);
                    response.setLength(payload.length);
                    response.setPayload(payload);
                    // 3. Write the data back to the client
                    DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                    writeResponse(dataOutputStream,response);
                }
            });
        } else if (request.getType()==0xb) {
            // Call basicAck to acknowledge the message
            BasicAckArguments arguments = (BasicAckArguments) basicArguments;
            ok = virtualHost.basicAck(arguments.getQueueName(),arguments.getMessageId());
        }else {
            // The type is invalid
            throw new MqException("[BrokerServer] Unknown type! type="+request.getType());
        }
        // 3. Build the response
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setChannelId(basicArguments.getChannelId());
        basicReturns.setRid(basicArguments.getRid());
        basicReturns.setOk(ok);
        byte[] payload = BinaryTool.toBytes(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(payload.length);
        response.setPayload(payload);
        System.out.println("[Response] rid="+basicReturns.getRid()+",channelId="+basicReturns.getChannelId()
                +",type="+response.getType()+",length="+response.getLength());
        return response;
    }

    private void writeResponse(DataOutputStream dataOutputStream,Response response) throws IOException {
        dataOutputStream.writeInt(response.getType());
        dataOutputStream.writeInt(response.getLength());
        dataOutputStream.write(response.getPayload());
        dataOutputStream.flush();
    }

    private Request readRequest(DataInputStream dataInputStream) throws IOException {
        Request request = new Request();
        request.setType(dataInputStream.readInt());
        request.setLength(dataInputStream.readInt());
        byte[] payload = new byte[request.getLength()];
        // readFully blocks until the whole payload has arrived and throws EOFException
        // if the stream ends first; a plain read() may return fewer bytes than requested
        dataInputStream.readFully(payload);
        request.setPayload(payload);
        return request;
    }

    private void clearClosedSession(Socket clientSocket){
        List<String> toDeleteChannelId = new ArrayList<>();
        for(Map.Entry<String,Socket> entry:sessions.entrySet()){
            if(entry.getValue()==clientSocket){
                toDeleteChannelId.add(entry.getKey());
            }
        }
        for(String channelId:toDeleteChannelId){
            sessions.remove(channelId);
        }
        System.out.println("[BrokerServer] Session cleanup finished! Removed channelId="+toDeleteChannelId);
    }
}
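The type values that process() branches on could also be collected in one place, which tends to be easier to read than bare hexadecimal literals. The enum below is only a sketch: the numeric codes are the ones used in the listing above, while FrameType and its constant names are hypothetical labels.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum naming the type codes that process() handles.
enum FrameType {
    CHANNEL_CREATE(0x1),
    CHANNEL_CLOSE(0x2),
    EXCHANGE_DECLARE(0x3),
    EXCHANGE_DELETE(0x4),
    QUEUE_DECLARE(0x5),
    QUEUE_DELETE(0x6),
    QUEUE_BIND(0x7),
    QUEUE_UNBIND(0x8),
    BASIC_PUBLISH(0x9),
    BASIC_CONSUME(0xa),
    BASIC_ACK(0xb),
    // Server-to-client push of a message to a subscribed consumer.
    DELIVER(0xc);

    private final int code;

    // Lookup table from wire code to constant, filled once when the enum loads.
    private static final Map<Integer, FrameType> BY_CODE = new HashMap<>();

    static {
        for (FrameType type : values()) {
            BY_CODE.put(type.code, type);
        }
    }

    FrameType(int code) {
        this.code = code;
    }

    int getCode() {
        return code;
    }

    // Rejects unknown codes, mirroring the final else branch of process().
    static FrameType fromCode(int code) {
        FrameType type = BY_CODE.get(code);
        if (type == null) {
            throw new IllegalArgumentException("unknown type: 0x" + Integer.toHexString(code));
        }
        return type;
    }

    public static void main(String[] args) {
        System.out.println(fromCode(0x9));
        System.out.println(fromCode(0xc));
    }
}
```

With such an enum, the if/else chain could become a switch over FrameType.fromCode(request.getType()), though the article's code keeps the raw integers.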

VI. Implementing the Connection

1.ConnectionFactory class

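public class ConnectionFactory {
    private String host;
    private int port;

    // Getters and setters for host and port are omitted.

    // The Connection constructor opens a socket, so its IOException is passed on to the caller
    public Connection newConnection() throws IOException {
        Connection connection = new Connection(host,port);
        return connection;
    }
}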

2.Connection class

public class Connection {
    private Socket socket =null;
    private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();

    private InputStream inputStream;
    private OutputStream outputStream;
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;

    private ExecutorService callbackPool = null;

    public Connection(String host,int port) throws IOException {
        socket = new Socket(host,port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);
        callbackPool = Executors.newFixedThreadPool(4);

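        // Reader thread: keeps reading responses from the socket and hands each one to its channel
        Thread t = new Thread(() ->{
           try {
               while(!socket.isClosed()){
                   Response response = readResponse();
                   dispatchResponse(response);
               }
           }catch (SocketException e){
               System.out.println("[Connection] Connection closed normally!");
           }catch (IOException | ClassNotFoundException | MqException e){
               System.out.println("[Connection] Connection closed abnormally!");
               e.printStackTrace();
           }
        });
        // Without start() the thread never runs and no response is ever read
        t.start();
    }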

    public void close(){
        try {
            callbackPool.shutdownNow();
            channelMap.clear();
            dataOutputStream.close();
            dataInputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if(response.getType()==0xc){
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if(channel==null){
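                // channel is null here, so take the id from the payload instead of dereferencing channel
                throw new MqException("[Connection] No channel on the client matches this message! channelId="+subScribeReturns.getChannelId());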
            }
            callbackPool.submit(() ->{
               try {
                   channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag()
                           , subScribeReturns.getBasicProperties(), subScribeReturns.getBody());
               }catch (MqException | IOException e){
                   e.printStackTrace();
               }
            });
        }else {
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if(channel==null){
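                // channel is null here, so take the id from the payload instead of dereferencing channel
                throw new MqException("[Connection] No channel on the client matches this message! channelId="+basicReturns.getChannelId());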
            }
            channel.putReturns(basicReturns);
        }
    }

    public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
        System.out.println("[Connection] Request sent! type="+request.getType()+",length="+request.getLength());
    }

    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        byte[] payload = new byte[response.getLength()];
        // readFully keeps reading until the payload is complete and throws EOFException
        // if the stream ends first; a single read() may return only part of the payload
        dataInputStream.readFully(payload);
        response.setPayload(payload);
        System.out.println("[Connection] Response received! type="+response.getType()+",length="+response.getLength());
        return response;
    }

    // channel.createChannel() sends a request over the socket, so IOException is declared here
    public Channel createChannel() throws IOException {
        String channelId = "C-"+ UUID.randomUUID().toString();
        Channel channel = new Channel(channelId,this);
        channelMap.put(channelId,channel);
        boolean ok = channel.createChannel();
        if(!ok){
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }
}

3.Channel class

public class Channel {
    private String channelId;
    private Connection connection;
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    private Consumer consumer = null;

    public Channel(String channelId,Connection connection){
        this.channelId = channelId;
        this.connection = connection;
    }

    public boolean createChannel() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());
        byte[] payload = BinaryTool.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns =waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }


    private String generateRid(){
        return "R-"+ UUID.randomUUID().toString();
    }

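    private BasicReturns waitResult(String rid){
        BasicReturns basicReturns = null;
        // Check the map while holding the lock that putReturns() notifies on;
        // otherwise a notifyAll() arriving between the check and wait() would be missed
        synchronized (this){
            while((basicReturns = basicReturnsMap.get(rid))==null){
                try {
                    wait();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
        basicReturnsMap.remove(rid);
        return basicReturns;
    }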

    public void putReturns(BasicReturns basicReturns){
        basicReturnsMap.put(basicReturns.getRid(),basicReturns);
        synchronized (this){
            notifyAll();
        }
    }

    public boolean close() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setRid(generateRid());
        basicArguments.setChannelId(channelId);
        byte[] payload = BinaryTool.toBytes(basicArguments);
        Request request = new Request();
        request.setType(0x2);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType
            , boolean durable, boolean autoDelete, Map<String,Object> arguments) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);
        byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);

        Request request = new Request();
        request.setType(0x3);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
        exchangeDeleteArguments.setRid(generateRid());
        exchangeDeleteArguments.setChannelId(channelId);
        exchangeDeleteArguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);
        Request request = new Request();
        request.setType(0x4);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns =waitResult(exchangeDeleteArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete
            ,Map<String,Object> arguments) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(generateRid());
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setExclusive(exclusive);
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);
        byte[] payload= BinaryTool.toBytes(queueDeclareArguments);
        Request request = new Request();
        request.setType(0x5);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
        queueDeleteArguments.setRid(generateRid());
        queueDeleteArguments.setChannelId(channelId);
        queueDeleteArguments.setQueueName(queueName);
        byte[] payload = BinaryTool.toBytes(queueDeleteArguments);
        Request request = new Request();
        request.setType(0x6);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean queueBind(String exchangeName,String queueName,String bindingKey) throws IOException {
        QueueBindArguments queueBindArguments = new QueueBindArguments();
        queueBindArguments.setRid(generateRid());
        queueBindArguments.setChannelId(channelId);
        queueBindArguments.setExchangeName(exchangeName);
        queueBindArguments.setQueueName(queueName);
        queueBindArguments.setBindingKey(bindingKey);
        byte[] payload = BinaryTool.toBytes(queueBindArguments);
        Request request = new Request();
        request.setType(0x7);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns =waitResult(queueBindArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean queueUnBind(String exchangeName,String queueName) throws IOException {
        QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();
        queueUnBindArguments.setRid(generateRid());
        queueUnBindArguments.setChannelId(channelId);
        queueUnBindArguments.setExchangeName(exchangeName);
        queueUnBindArguments.setQueueName(queueName);
        byte[] payload = BinaryTool.toBytes(queueUnBindArguments);
        Request request = new Request();
        request.setType(0x8);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueUnBindArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,byte[] body) throws IOException {
        BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
        basicPublishArguments.setRid(generateRid());
        basicPublishArguments.setChannelId(channelId);
        basicPublishArguments.setExchangeName(exchangeName);
        basicPublishArguments.setRoutingKey(routingKey);
        basicPublishArguments.setBasicProperties(basicProperties);
        basicPublishArguments.setBody(body);
        byte[] payload = BinaryTool.toBytes(basicPublishArguments);
        Request request = new Request();
        request.setType(0x9);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicPublishArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws MqException, IOException {
        if(this.consumer!=null){
            throw new MqException("[Channel] A callback has already been set! It cannot be set again!");
        }
        this.consumer = consumer;
        BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
        basicConsumeArguments.setRid(generateRid());
        basicConsumeArguments.setChannelId(channelId);
        basicConsumeArguments.setConsumerTag(channelId);
        basicConsumeArguments.setQueueName(queueName);
        basicConsumeArguments.setAutoAck(autoAck);
        byte[] payload = BinaryTool.toBytes(basicConsumeArguments);
        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
        return basicReturns.isOk();
    }


    public boolean basicAck(String queueName,String messageId) throws IOException {
        BasicAckArguments basicAckArguments = new BasicAckArguments();
        basicAckArguments.setRid(generateRid());
        basicAckArguments.setChannelId(channelId);
        basicAckArguments.setQueueName(queueName);
        basicAckArguments.setMessageId(messageId);
        byte[] payload = BinaryTool.toBytes(basicAckArguments);
        Request request = new Request();
        request.setType(0xb);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicAckArguments.getRid());
        return basicReturns.isOk();
    }
}
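The waitResult/putReturns pair is what turns the asynchronous socket into blocking method calls: the caller parks until the reader thread files a response under the same rid. The sketch below isolates that idea. ReplyWaiter is a hypothetical class, and plain strings stand in for BasicReturns.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of matching responses to blocked callers by request id.
class ReplyWaiter {
    private final ConcurrentHashMap<String, String> replies = new ConcurrentHashMap<>();

    // Blocks until a reply for this rid has been stored, then removes and returns it.
    String await(String rid) {
        synchronized (this) {
            String reply;
            // The check happens under the lock, so a notifyAll() cannot slip in
            // between the check and the wait().
            while ((reply = replies.remove(rid)) == null) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for " + rid, e);
                }
            }
            return reply;
        }
    }

    // Called by the reader thread when a response arrives.
    void put(String rid, String reply) {
        replies.put(rid, reply);
        synchronized (this) {
            notifyAll();
        }
    }

    public static void main(String[] args) {
        ReplyWaiter waiter = new ReplyWaiter();
        // The "reader thread" delivers replies in a different order than they are awaited.
        Thread reader = new Thread(() -> {
            waiter.put("R-2", "second");
            waiter.put("R-1", "first");
        });
        reader.start();
        System.out.println(waiter.await("R-1"));
        System.out.println(waiter.await("R-2"));
    }
}
```

Keying the map by rid means replies can arrive in any order and each caller still receives the one that belongs to its own request.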

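VII. Summary

  This article covered the last major part of this Java project, the "simulated message queue". The next article will build a sample application on top of the project and then extend it. Thanks for reading!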
