• 微信公众号:美女很有趣。 工作之余,放松一下,关注即送10G+美女照片!

使用Netty自定义实现Dubbo

开发技术 开发技术 4小时前 2次浏览

使用Netty自定义实现Dubbo

设计目标:

  使用 Netty 实现一个简单的 RPC 框架。 

 

设计需求:

  模仿 Dubbo,消费者和提供者共同约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。

 

设计说明:

  网络通信使用 Netty 4.1.20。

 

设计示图:

使用Netty自定义实现Dubbo

 

 

代码示例:

公共接口 HelloService

/**
 * 客户端与服务器端公共接口
 *
 * @author LJT
 * @date 2021/11/23 13:42
 */
public interface HelloService {

    String sayHello(String mes);

}

 

HelloServiceImpl

/**
 * 服务器端实现公共接口
 *
 * @author LJT
 * @date 2021/11/23 13:43
 */
public class HelloServiceImpl implements HelloService {

    private static int count = 0;

    // 服务端实现公共接口,重写里面的方法
    // 当收到客户端的消息时,返回响应的结果
    public String sayHello(String mes) {

        System.out.println("收到了客户端的消息=" + mes);

        // 根据请求不同的mes,返回不同的响应结果
        if (mes != null) {
            return "你好呀客户端,我已经收到你的消息【" + mes + "】第" + (++count) + "次";
        } else {
            return "你好呀客户端,我已经收到你的消息了";
        }

    }
}

 

ServerBootstrap

/**
 * ServerBootstrap 会启动一个服务提供者,即 NettyServer
 *
 * @author LJT
 * @date 2021/11/23 13:51
 */
public class ServerBootstrap {
    public static void main(String[] args) {

        // 启动服务,绑定本机的地址与端口
        NettyServer.startServer("127.0.0.1", 7000);

    }
}

 

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 完成NettyServer的初始化和启动
 *
 * @author LJT
 * @date 2021/11/23 13:54
 */
public class NettyServer {

    // 接收外来的调用传参
    public static void startServer(String hostName, int port) {
        startServer0(hostName, port);
    }

    // 初始化NettyServer并调用启动服务
    private static void startServer0(String hostName, int port) {

        // 服务端的两个group,一个用来接收请求,一个用来处理逻辑
        // 不写参数,默认是 cpu核数 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            
            // 配置启动参数
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler()); // 自定义业务处理器
                        }
                    });

            // 回调处理
            ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
            System.out.println("服务器启动成功,开始提供服务~~~");
            channelFuture.channel().closeFuture().sync();
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 最后关闭,防止浪费资源,要养成习惯
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

 

NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 自定义服务器处理器
 *
 * @author LJT
 * @date 2021/11/23 14:14
 */
// 这里继承 ChannelInboundHandlerAdapter 适配器
// 不需要再去管其它事情,只需要把精力集中重写需要的方法,处理业务逻辑
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 获取客户端发送的消息,打印消息
        System.out.println("msg=" + msg);

        // 客户端在调用服务器的 api 时,我们需要定义一个协议
        // 比如,每次发消息都必须是以某个字符串开头 “HelloService#hello#你好”
        // 在实际的开发中,类似二次解码,提取出有用的信息
        if (msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);

        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

 

ClientBootstrap

/**
 * 客户端启动引导
 *
 * @author LJT
 * @date 2021/11/23 15:16
 */
public class ClientBootstrap {

    // 这里定义协议头
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws Exception {

        // 创建一个消费者
        NettyClient customer = new NettyClient();

        // 创建代理对象,在实际的开发中,使用 spring 进行管理
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (; ; ) {
            Thread.sleep(2 * 1000);
            // 通过代理对象调用服务提供者的方法(服务)
            String res = service.sayHello("你好 dubbo~");
            System.out.println("调用的结果 res= " + res);
        }
    }
}

 

NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * NettyClient的创建与初始化
 *
 * @author LJT
 * @date 2021/11/23 14:40
 */
public class NettyClient {

    // 创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    // 编写方法使用代理模式,获取一个代理对象
    //开发中,使用 spring 获取,不需要太关注于底层细节
    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");
                    //{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码
                    if (client == null) {
                        initClient();
                    }

                    //设置要发给服务器端的信息
                    //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                    client.setPara(providerName + args[0]);

                    return executor.submit(client).get();

                });
    }

    //初始化客户端
    private static void initClient() {

        client = new NettyClientHandler();

        // 创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();

        // 配置初始化启动参数
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            // 连接请求的服务器地址与端口
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }  
      // 注意这里不需要关闭 group
} }

 

NettyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * 自定义客户端处理器
 *
 * @author LJT
 * @date 2021/11/23 14:24
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数

    // 与服务器的连接创建后,就会被调用, 这个方法是第一个被调用
    // 确保它是活的
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被调用  ");
        context = ctx; //因为我们在其它方法会使用到 ctx
    }

    //收到服务器的数据后,逻辑处理
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        notify(); //唤醒等待的线程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    // 被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead)
    // 注意这里需要加上 synchronized 不然会报异常
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被调用  ");
        context.writeAndFlush(para);
        //进行wait
        wait(); //等待channelRead 方法获取到服务器的结果后,唤醒
        System.out.println(" call2 被调用  ");
        return result; //服务方返回的结果

    }

    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}

 

运行结果:

使用Netty自定义实现Dubbo

 

 

使用Netty自定义实现Dubbo

 

 


程序员灯塔
转载请注明原文链接:使用Netty自定义实现Dubbo
喜欢 (0)