Netty--数据通信和心跳检测

2018-03-01 12:23:18来源:cnblogs.com作者:Mr.years人点击

分享

数据通信

概述:

netty的ReadTimeOut实现方案3

服务端:

public class Server {    public static void main(String[] args) throws Exception{                EventLoopGroup pGroup = new NioEventLoopGroup();        EventLoopGroup cGroup = new NioEventLoopGroup();                ServerBootstrap b = new ServerBootstrap();        b.group(pGroup, cGroup)         .channel(NioServerSocketChannel.class)         .option(ChannelOption.SO_BACKLOG, 1024)         //设置日志         .handler(new LoggingHandler(LogLevel.INFO))         .childHandler(new ChannelInitializer<SocketChannel>() {            protected void initChannel(SocketChannel sc) throws Exception {                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());                sc.pipeline().addLast(new ReadTimeoutHandler(5));                 sc.pipeline().addLast(new ServerHandler());            }        });                ChannelFuture cf = b.bind(8765).sync();                cf.channel().closeFuture().sync();        pGroup.shutdownGracefully();        cGroup.shutdownGracefully();            }}

主要是加入sc.pipeline().addLast(new ReadTimeoutHandler(5)); 

客户端:

public class Client {    private static class SingletonHolder {        static final Client instance = new Client();    }    public static Client getInstance() {        return SingletonHolder.instance;    }    private EventLoopGroup group;    private Bootstrap b;    private ChannelFuture cf;    private Client() {        group = new NioEventLoopGroup();        b = new Bootstrap();        b.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))                .handler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel sc) throws Exception {                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());                        // 超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)                        sc.pipeline().addLast(new ReadTimeoutHandler(5));                        sc.pipeline().addLast(new ClientHandler());                    }                });    }    public void connect() {        try {            this.cf = b.connect("127.0.0.1", 8765).sync();            System.out.println("远程服务器已经连接, 可以进行数据交换..");        } catch (Exception e) {            e.printStackTrace();        }    }    public ChannelFuture getChannelFuture() {        if (this.cf == null) {            this.connect();        }        if (!this.cf.channel().isActive()) {            this.connect();        }        return this.cf;    }    public static void main(String[] args) throws Exception {        final Client c = Client.getInstance();        // c.connect();        ChannelFuture cf = c.getChannelFuture();        for (int i = 1; i <= 3; i++) {            Request request = new Request();            request.setId("" + i);            request.setName("pro" + i);            request.setRequestMessage("数据信息" + i);            cf.channel().writeAndFlush(request);            TimeUnit.SECONDS.sleep(4);        }        cf.channel().closeFuture().sync();        new Thread(new Runnable() {            @Override            public void run() {                try {                    System.out.println("进入子线程...");                    ChannelFuture cf = c.getChannelFuture();                    System.out.println(cf.channel().isActive());                    System.out.println(cf.channel().isOpen());                    // 再次发送数据                    Request request = new Request();                    request.setId("" + 4);                    request.setName("pro" + 4);                    request.setRequestMessage("数据信息" + 4);                    cf.channel().writeAndFlush(request);                    cf.channel().closeFuture().sync();                    System.out.println("子线程结束.");                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }).start();        System.out.println("断开连接,主线程结束..");    }}

主要看getChannelFuture这个方法,this.cf == null是第一次连接的时候用到的,!this.cf.channel().isActive() 是连接超时后重新发起连接用到的。

其他的代码:

public class ClientHandler extends ChannelHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        try {            Response resp = (Response) msg;            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());        } finally {            ReferenceCountUtil.release(msg);        }    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}public class ServerHandler extends ChannelHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        Request request = (Request) msg;        System.out                .println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());        Response response = new Response();        response.setId(request.getId());        response.setName("response" + request.getId());        response.setResponseMessage("响应内容" + request.getId());        ctx.writeAndFlush(response);// .addListener(ChannelFutureListener.CLOSE);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}public final class MarshallingCodeCFactory {    /**     * 创建Jboss Marshalling解码器MarshallingDecoder     * @return MarshallingDecoder     */    public static MarshallingDecoder buildMarshallingDecoder() {        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");        //创建了MarshallingConfiguration对象,配置了版本号为5         final MarshallingConfiguration configuration = new MarshallingConfiguration();        configuration.setVersion(5);        //根据marshallerFactory和configuration创建provider        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);        //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);        return decoder;    }    /**     * 创建Jboss Marshalling编码器MarshallingEncoder     * @return MarshallingEncoder     */    public static MarshallingEncoder buildMarshallingEncoder() {        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");        final MarshallingConfiguration configuration = new MarshallingConfiguration();        configuration.setVersion(5);        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组        MarshallingEncoder encoder = new MarshallingEncoder(provider);        return encoder;    }}public class Request implements Serializable {    private static final long serialVersionUID = 1L;    private String id;    private String name;    private String requestMessage;    public String getId() {        return id;    }    public void setId(String id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public String getRequestMessage() {        return requestMessage;    }    public void setRequestMessage(String requestMessage) {        this.requestMessage = requestMessage;    }}public class Response implements Serializable {    private static final long serialVersionUID = 1L;    private String id;    private String name;    private String responseMessage;    public String getId() {        return id;    }    public void setId(String id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public String getResponseMessage() {        return responseMessage;    }    public void setResponseMessage(String responseMessage) {        this.responseMessage = responseMessage;    }}

 心跳检测

概述:

代码示例:

public class Server {    public static void main(String[] args) throws Exception {        EventLoopGroup pGroup = new NioEventLoopGroup();        EventLoopGroup cGroup = new NioEventLoopGroup();        ServerBootstrap b = new ServerBootstrap();        b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)                // 设置日志                .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {                    protected void initChannel(SocketChannel sc) throws Exception {                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());                        sc.pipeline().addLast(new ServerHeartBeatHandler());                    }                });        ChannelFuture cf = b.bind(8765).sync();        cf.channel().closeFuture().sync();        pGroup.shutdownGracefully();        cGroup.shutdownGracefully();    }}public class ServerHeartBeatHandler extends ChannelHandlerAdapter {    /** key:ip value:auth */    private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();    private static final String SUCCESS_KEY = "auth_success_key";    static {        AUTH_IP_MAP.put("192.168.1.200", "1234");    }    private boolean auth(ChannelHandlerContext ctx, Object msg) {        // System.out.println(msg);        String[] ret = ((String) msg).split(",");        String auth = AUTH_IP_MAP.get(ret[0]);        if (auth != null && auth.equals(ret[1])) {            ctx.writeAndFlush(SUCCESS_KEY);            return true;        } else {            ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);            return false;        }    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof String) {            auth(ctx, msg);        } else if (msg instanceof RequestInfo) {            RequestInfo info = (RequestInfo) msg;            System.out.println("--------------------------------------------");            System.out.println("当前主机ip为: " + info.getIp());            System.out.println("当前主机cpu情况: ");            HashMap<String, Object> cpu = info.getCpuPercMap();            System.out.println("总使用率: " + cpu.get("combined"));            System.out.println("用户使用率: " + cpu.get("user"));            System.out.println("系统使用率: " + cpu.get("sys"));            System.out.println("等待率: " + cpu.get("wait"));            System.out.println("空闲率: " + cpu.get("idle"));            System.out.println("当前主机memory情况: ");            HashMap<String, Object> memory = info.getMemoryMap();            System.out.println("内存总量: " + memory.get("total"));            System.out.println("当前内存使用量: " + memory.get("used"));            System.out.println("当前内存剩余量: " + memory.get("free"));            System.out.println("--------------------------------------------");            ctx.writeAndFlush("info received!");        } else {            ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);        }    }}public class Client {    public static void main(String[] args) throws Exception {        EventLoopGroup group = new NioEventLoopGroup();        Bootstrap b = new Bootstrap();        b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {            @Override            protected void initChannel(SocketChannel sc) throws Exception {                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());                sc.pipeline().addLast(new ClienHeartBeattHandler());            }        });        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();        cf.channel().closeFuture().sync();        group.shutdownGracefully();    }}public class ClienHeartBeattHandler extends ChannelHandlerAdapter {    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);    private ScheduledFuture<?> heartBeat;    // 主动向服务器发送认证信息    private InetAddress addr;    private static final String SUCCESS_KEY = "auth_success_key";    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        addr = InetAddress.getLocalHost();        String ip = addr.getHostAddress();        String key = "1234";        // 证书        String auth = ip + "," + key;        ctx.writeAndFlush(auth);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        try {            if (msg instanceof String) {                String ret = (String) msg;                if (SUCCESS_KEY.equals(ret)) {                    // 握手成功,主动发送心跳消息                    this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2,                            TimeUnit.SECONDS);                    System.out.println(msg);                } else {                    System.out.println(msg);                }            }        } finally {            ReferenceCountUtil.release(msg);        }    }    private class HeartBeatTask implements Runnable {        private final ChannelHandlerContext ctx;        public HeartBeatTask(final ChannelHandlerContext ctx) {            this.ctx = ctx;        }        @Override        public void run() {            try {                RequestInfo info = new RequestInfo();                // ip                info.setIp(addr.getHostAddress());                Sigar sigar = new Sigar();                // cpu prec                CpuPerc cpuPerc = sigar.getCpuPerc();                HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();                cpuPercMap.put("combined", cpuPerc.getCombined());                cpuPercMap.put("user", cpuPerc.getUser());                cpuPercMap.put("sys", cpuPerc.getSys());                cpuPercMap.put("wait", cpuPerc.getWait());                cpuPercMap.put("idle", cpuPerc.getIdle());                // memory                Mem mem = sigar.getMem();                HashMap<String, Object> memoryMap = new HashMap<String, Object>();                memoryMap.put("total", mem.getTotal() / 1024L);                memoryMap.put("used", mem.getUsed() / 1024L);                memoryMap.put("free", mem.getFree() / 1024L);                info.setCpuPercMap(cpuPercMap);                info.setMemoryMap(memoryMap);                ctx.writeAndFlush(info);            } catch (Exception e) {                e.printStackTrace();            }        }        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {            cause.printStackTrace();            if (heartBeat != null) {                heartBeat.cancel(true);                heartBeat = null;            }            ctx.fireExceptionCaught(cause);        }    }}public final class MarshallingCodeCFactory {    /**     * 创建Jboss Marshalling解码器MarshallingDecoder     *      * @return MarshallingDecoder     */    public static MarshallingDecoder buildMarshallingDecoder() {        // 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");        // 创建了MarshallingConfiguration对象,配置了版本号为5        final MarshallingConfiguration configuration = new MarshallingConfiguration();        configuration.setVersion(5);        // 根据marshallerFactory和configuration创建provider        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);        // 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);        return decoder;    }    /**     * 创建Jboss Marshalling编码器MarshallingEncoder     *      * @return MarshallingEncoder     */    public static MarshallingEncoder buildMarshallingEncoder() {        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");        final MarshallingConfiguration configuration = new MarshallingConfiguration();        configuration.setVersion(5);        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);        // 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组        MarshallingEncoder encoder = new MarshallingEncoder(provider);        return encoder;    }}public class RequestInfo implements Serializable {    private String ip;    private HashMap<String, Object> cpuPercMap;    private HashMap<String, Object> memoryMap;    // .. other field    public String getIp() {        return ip;    }    public void setIp(String ip) {        this.ip = ip;    }    public HashMap<String, Object> getCpuPercMap() {        return cpuPercMap;    }    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {        this.cpuPercMap = cpuPercMap;    }    public HashMap<String, Object> getMemoryMap() {        return memoryMap;    }    public void setMemoryMap(HashMap<String, Object> memoryMap) {        this.memoryMap = memoryMap;    }}

当client刚刚连接的时候,会发送认证信息到server端认证,认证通过后再定时发送心跳包。

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台