GraalVM系列(三):GraalJS多线程实践

Posted by Coding Ideal World on February 4, 2021

GraalVM系列(二):GraalVM核心特性实践 一文中介绍了GraalVM的几个核心特性。本文接着前文遗留的JS多线程调用问题展开讨论。

通过前文的测试我们可以看到GraalVM下不支持对JS的并发执行,具体而言引用官网的描述如下:

GraalVM支持的多线程执行的基本模型是"无共享(share-nothing)"模型,任何JavaScript开发人员都应该熟悉。

  1. 可以创建任意数量的JavaScript Contexts,但每次只能由一个线程使用

  2. 不允许并发访问JavaScript对象:任何JavaScript对象不能同时被一个以上的线程访问

  3. 允许并发访问Java对象:任何Java对象都可以被任何Java或JavaScript线程同时访问

  4. 一个JavaScript上下文不能被两个或两个以上的线程同时访问,但可以使用适当的同步从多个线程访问同一个上下文,以确保不会发生并发访问

— https://www.graalvm.org/reference-manual/js/Multithreading/

这个限制的本意是不打破JS在前端开发中单线程的模型以降低适配学习成本,但这也大大限制了其使用的场景。

怎么解决呢?我们查阅 https://github.com/oracle/graal/issues/2484 这个Issue,结合官网的介绍可以总结出两种方案。

使用Worker

这一方案的核心是通过Node的worker_threads模块启动一个Worker,在Worker中引用Java的阻塞队列(如:LinkedBlockingDeque)并等待数据(take),获取数据后发送(postMessage)给主线程处理,这样一来Worker线程阻塞用于接收,主线程非阻塞用于处理,在需要处理时从Java侧发送(offer)数据到阻塞队列即可。

但这一方案由于要启动Worker,所以必须使用Node工程启动,不能嵌入到Java工程中。示例代码可参考 https://medium.com/graalvm/multi-threaded-java-javascript-language-interoperability-in-graalvm-2f19c1f9c37b

使用Event Loop

这里有个示例工程实现了基于事件的并发访问: https://github.com/iitsoftware/graaljs-concurrency-problem 笔者未测试过。

本文给出使用Vertx EventBus的并发示例,示例工程见前文,完整代码见/multithreading目录。

PolyglotExchanger.java
/**
 * 与JS的交互类.
 *
 * @author gudaoxuri
 */
public class PolyglotExchanger {

    // 创建Vertx实例
    private static Vertx vertx = Vertx.vertx();
    // 创建EventBus实例
    private static EventBus eventBus = vertx.eventBus();

    /**
     * 由Java侧发起JS调用请求.
     *
     * @param funName 函数名
     * @param args    函数参数
     * @param promise 执行回调
     */
    public static void request(String funName, List<Object> args, Promise<Object> promise) {
        // 向JS中发起地址为"__js_invoke__"的事件
        eventBus.request("__js_invoke__", new JsonObject().put("funName", funName).put("args", args).toString(),
                (Handler<AsyncResult<Message<String>>>) event -> {
                    // 执行返回处理
                    if (event.failed()) {
                        promise.fail(event.cause());
                    } else {
                        promise.complete(event.result().body());
                    }
                });
    }

    /**
     * 由JS侧调用事件订阅.
     *
     * @param processFun 订阅处理函数
     */
    public static void consumer(Consumer<Message<String>> processFun) {
        eventBus.consumer("__js_invoke__", processFun::accept);
    }

    /**
     * 模拟JS调用Java发起HTTP请求.
     *
     * @param httpMethod HTTP方法
     * @param url        URL
     * @param body       Body
     * @param header     Header
     * @param fun        回调函数
     */
    public static void http(String httpMethod, String url, String body, Map<String, String> header, Consumer<String> fun) {
        // 模拟调用,这里仅返回请求的URL
        vertx.setTimer(1000, i -> fun.accept("<div>Hello:" + url + "</div>"));
    }

}
task.js
// 引用PolyglotExchanger类
const polyglotExchanger = Java.type('idealworld.train.graalvm.multithreading.PolyglotExchanger')

// 订阅事件
polyglotExchanger.consumer(event => {
    // 获取到订阅数据
    let data = JSON.parse(event.body())
    let funName = data.funName
    let args = data.args
    // 此处可以实现要处理的逻辑
    // 这里使用http调用逻辑为示例
    polyglotExchanger.http('GET', 'http://127.0.0.1/s?fun=' + funName + '&args=' + args, null, null, resp => {
        // 处理完成后执行回调函数返回结果,这里返回的是http调用的结果
        event.reply(resp)
    })
})
MultithreadingExample.java
/**
 * 多线程示例.
 *
 * @author gudaoxuri
 */
public class MultithreadingExample {

    private static final String TASK_JS = new BufferedReader(new InputStreamReader(MultithreadingExample.class.getResourceAsStream("/task.js")))
            .lines().collect(Collectors.joining("\n"));

    public static void main(String[] args) throws InterruptedException {
        var context = Context.newBuilder()
                .allowAllAccess(true)
                // 开启Java函数过滤以保障安全
                .allowHostClassLookup(s -> s.equalsIgnoreCase(PolyglotExchanger.class.getName()))
                .build();
        // 添加与Java交互的函数类
        context.eval(Source.create("js", TASK_JS));

        // 执行测试
        new Thread(() -> {
            while (true) {
                var promise = Promise.promise();
                PolyglotExchanger.request("fun1", new ArrayList<>(), promise);
                promise.future()
                        .onSuccess(resp -> System.out.println("result : " + resp))
                        .onFailure(e -> System.err.println("result : " + e.getMessage()));
            }
        }).start();
        new Thread(() -> {
            while (true) {
                var promise = Promise.promise();
                PolyglotExchanger.request("fun2", new ArrayList<>(), promise);
                promise.future()
                        .onSuccess(resp -> System.out.println("result : " + resp))
                        .onFailure(e -> System.err.println("result : " + e.getMessage()));
            }
        }).start();

        new CountDownLatch(1).await();
    }

}

执行后会输出如下信息:

……
result : <div>Hello:http://127.0.0.1/s?fun=fun2&args=</div>
result : <div>Hello:http://127.0.0.1/s?fun=fun2&args=</div>
result : <div>Hello:http://127.0.0.1/s?fun=fun1&args=</div>
result : <div>Hello:http://127.0.0.1/s?fun=fun2&args=</div>
result : <div>Hello:http://127.0.0.1/s?fun=fun1&args=</div>
result : <div>Hello:http://127.0.0.1/s?fun=fun2&args=</div>
……

这样我们就很方便地实现了对JS的并发访问。