RabbitMQ 实现跨语言通信
做项目时遇到需要 Java 和 Python 通信的场景,RabbitMQ 可以较好地实现。

需求场景

做项目时遇到需要 Java 和 Python 通信的场景,摸索到了三种解决方案,并都正式在项目中实施过、踩坑过、改进过、整个推翻替换过。

其中 RabbitMQ 是我最终的方案,所以会重点阐述,其中略带提一下。

三种方式的实现

1. 直接调用

ProcessBuilderRuntime.exec() 两种方法,用法差不多,都可以传一个字符串数组。

String[] command = new String[]{"python", "C:\\Users\\lenovo\\Desktop\\java2python.py"};
proc = Runtime.getRuntime().exec(command);// 执行py文件

ProcessBuilder 可以是列表,官方推荐 ProcessBuilder

List<String> command = new ArrayList<>();
command.add("python");
command.add("C:\\Users\\lenovo\\Desktop\\java2python.py");
ProcessBuilder pb = new ProcessBuilder(command);
proc = pb.start();

Process 提供 InputStreamErrorStream 接收返回结果;为方便起见,可以将错误流重定向到输入流。

pb.redirectErrorStream(true);//错误流重定向
BufferedReader in = new BufferedReader(new InputStreamReader(proc.getInputStream()));

若需要立即获取其返回结果,需要用 waitFor() 方法。

//导致当前线程等待,如有必要,直到由此Process对象表示的进程已终止
code = proc.waitFor();

坑不坑:

  • Java 自带,开发方便(这个很重要……老板就给你两天时间你能怎么办)。
  • 作为一个进程开启,效率低,好在是官方类,使用时感觉控制的还行。
  • 返回结果难以接收。可以通过输入流或者写入文件读取,但是都很不好。
  • 目前只尝试了 Java 调用 Python,反过来感觉很难。

完整代码如下:

@Test
public void java2pythonTest(){
  int code = 1;
  Process proc;
  try {
    // Runtime.getRuntime()
//			String[] command = new String[]{"python", "C:\\Users\\lenovo\\Desktop\\java2python.py"};
//      proc = Runtime.getRuntime().exec(command);// 执行py文件

    List<String> command = new ArrayList<>();
    command.add("python");
    command.add("C:\\Users\\lenovo\\Desktop\\java2python.py");
    ProcessBuilder pb = new ProcessBuilder(command);
    pb.redirectErrorStream(true);//错误流重定向
    proc = pb.start();

    //用输入输出流来截取结果
    BufferedReader in = new BufferedReader(new InputStreamReader(proc.getInputStream(), "gbk"));
    String line = null;
    while ((line = in.readLine()) != null) {
      System.out.println("python> " + line);
    }
    in.close();

    //导致当前线程等待,如有必要,直到由此Process对象表示的进程已终止
    code = proc.waitFor();
    System.out.println("python> wait for code: " + code);

  } catch (IOException e) {
    e.printStackTrace();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

2. 通过 API 通信

两端都启动为一个 Web 项目,Java 端可以采用 Spring Boot 框架,Python 端用 Flask 框架。然后通过 Spring Cloud 来连接。

由于 Spring Cloud 非本人搭建,就不讲了。当然,你也没必要非要使用。

坑不坑:

  • 两端通信频繁的情况下,相对于第一种方案更好些
  • 开发有点烦,尤其是数据传输,如果用 Spring Cloud 会好些
  • 不适用于等待的情况,如 Python 端计算量大,没法立刻给出结果。Api 嘛,时间长肯定不太好

3. RabbitMQ 消息队列

因为实际公司开发中,Python 端计算量很大,需要等待很久,所以方法二也不太适用,后面有同事提出用消息队列的方式,可以不必等待,正好满足我的情况。

RabbitMQ 不仅要有接收端、发送端,还需要安装管理器。

管理系统

管理系统各平台都有,也有 docker 镜像。具体不赘述,看官网就行了。

安装后会有 Web 端图形界面,使用很方便,不需要命令行就可以完成。

大概介绍下用到的一些概念,后续深入的话再补充。

Virtual Hosts

相当于 Mysql 的库,相互独立,exchange、queue、message 不能互通。需要账户管理员权限。Web 端操作如下。

Queue

队列,基本通信模块,相当于一个消息缓冲区,消息进到队列里,再被发送出去。

Exchange

生产者发送消息时由 Exchange 来决定要给哪个 Queue。具体有 Direct、Fanout、Topic、Headers 四种类型,我用默认的一对一也就没有仔细了解。

Java 端实现

配置

我采用 Spring Boot 框架,自带 Spring AMQP 兼容 RabbitMQ,很方便推荐使用。直接在 yml 文件中配置相关参数。

spring:
  rabbitmq:
    host: 192.168.0.102
    port: 5672
    username: admin
    password: admin
    virtual-host: test

发送消息

AmqpTemplate 类用于发送消息,我写了个定时 3 秒发送时间:

@Component
public class TestScheduled {

    //rabbitMQ Queue
    public static final String PYTHON2JAVA = "python2java";
    public static final String JAVA2PYTHON = "java2python";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 3000)
    public void errorSpiderRerun(){
        String now = LocalDateTime.now().toString();
        String content = "java2python and now " + now;
        System.out.println(content);
        rabbitTemplate.convertAndSend(JAVA2PYTHON, content);
    }
}

如果没有客户端接收的话,会一直在队列中,Web 页面下可以看到:

接收消息

接收消息也非常简单,只需要添加 @RabbitListener 注解即可。

@RabbitListener(queues = PYTHON2JAVA)
public void listen(String message) {
    System.out.println(message);
}

Python 端实现

Python 端需要安装第三方库 pika

建立连接

def get_connect():
    """
    连接
    :return: 
    """
    config_pika_username = "admin"
    cofig_pika_password = "admin"
    config_pika_host = "192.168.0.102"
    config_pika_port = 5672
    config_pika_virtual_host = "test"
    cred = pika.PlainCredentials(config_pika_username, cofig_pika_password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=config_pika_host,
            port=config_pika_port,
            virtual_host=config_pika_virtual_host,
            credentials=cred,
            heartbeat=0
        ))
    return connection

定义回调函数

def callback(ch, method, properties, body):
    print(body)

等待接收

运行此方法,等待接收。

def build():
    """
    start_consuming
    :return: 
    """
    channel = get_connect().channel()
    channel.queue_declare(queue='java2python', durable=True)
    channel.basic_consume(queue='java2python',
                          auto_ack=True,
                          on_message_callback=callback)
    print(' consumer_start')
    channel.start_consuming()

if __name__ == '__main__':
    build()

发送消息

def producer():
    '''
    生产者
    :param rabbit_body: 发送内容
    '''
    # 创建通道
    connection = get_connect()
    channel = connection.channel()
    channel.queue_declare(queue='python2java', durable=True)

    # 发送内容
    message = "python2java and now " + str(datetime.now())
    print(message)
    channel.basic_publish(exchange='', routing_key='python2java', body=message)

坑不坑:

  • 非长连接方式,效率相对接口方式更高。
  • 如果项目所用框架如 Spring Boot 已经集成的话,开发会很方便。
  • 有担心长时间没有消息队列会断开连接的情况,也有遇到,但好像都已解决。

后续

只是简单介绍用到的类和方法,流于表面,后续如果频繁使用的话,需要深入了解。


Last modified on 2020-05-15