Clojang, JInterface, & core.async

Posted on February 24, 2017 by oubiwann


Blog post image

The discussion on Slack the other day kicked off with a question about how messages are consumed in Clojang (and thus jiface and JInterface), and whether, in Clojure code, one could use core.async to do this.

We'll take a very quick deep (while at the same time superficial) dive into this, but then answer some broader questions to provide a better context for JInterface and the projects built upon it.

Receiving Messages

If you are using the clojang library, receiving messages is as simple as the following (assuming you've followed the best practice of adding the Clojang agent dependency in your project.clj file and set the :java-agents option):

(require '[clojang.core :refer [receive]]

(receive)

At which point, your code blocks until a message is received. When a message comes in, clojang will automatically convert the Java-OTP hybrid types to Clojure types; when you reply, the conversion will take place in the other direction.

A fuller examples is viewable here:

Under the hood (in the clojang library), a default OTP node and associated mbox are being used (using the Clojure-idiomatic library jiface). Under that hood, Erlang's JInterface Java library is using threads, sockets, and custom queues to handle in-coming and out-going messages.

In other words, not a lot of room for core.async, unfortunately.

However, there's a little bit of room :-)

Command & Convenience Channels

One way I use core.async in Clojang applications is to facilitate communications between Clojure functions (i.e., a Clojang server and client, both written in Clojure).

Here's a Clojure server that takes a core.async channel as an argument:

(defn run
  [cmd-chan]
  (log/info "Starting Clojure node with nodename ="
            (System/getProperty "node.sname"))
  (let [init-state 0]
    (loop [png-count init-state]
      (match (receive)
        [:register caller]
          (do
            (log/infof "Got :register request from %s ..." caller)
            (mbox/link (self) caller)
            (! caller :linked)
            (recur png-count))
        [:ping caller]
          (do
            (log/infof "Got :ping request from %s ..." caller)
            (! caller :pong)
            (recur (inc png-count)))
        [:get-ping-count caller]
          (do
            (log/infof "Got :get-ping-count request from %s ..."  caller)
            (! caller png-count)
            (recur png-count))
        [:stop caller]
          (do
            (log/warnf "Got :stop request from %s ..." caller)
            (! caller :stopping)
            :stopped)
        [:shutdown caller]
          (do
            (log/warnf "Got :shutdown request from %s ..." caller)
            (! caller :shutting-down)
            (async/>! cmd-chan :shutdown))
        [_ caller]
          (do
            (log/error "Bad message received: unknown command")
            (! caller [:error :unknown-command])
            (recur png-count))
        [_]
          (do
            (log/error "Bad message received: improperly formatted")
            (recur png-count))))))

In this particular case, it doesn't do too much: it simply sends a message back to the calling code when the server has been asked to shutdown. (Note that the :shutdown clause is sending two types of messages: first an OTP message reply is sent back to the calling OTP node – which could be either another OTP Clojure node or a BEAM node – and then a core.async message is pushed onto the channel that was passed to the server function. In the case where this code was copied from, that channel was started by the function that called (run).)

That's a really simple (and not strictly necessary) example of a command channel use case for core.async in a Clojang app. Here's another case:

(defn otp-bridge
  "This function creates the following in order to facilitate core.async
  communications with the Clojure OTP server:
  1. A dedicated mbox for the OTP bridge (what receives messages from the OTP server) 2. The pid for the dedicated mbox 3. A core.async channel for sending messages to the OTP server." [] (let [bridge-mbox (mbox/add :otpbrige) bridge-pid (mbox/get-pid bridge-mbox) bridge-chan (async/chan)] (async/go-loop [] (when-let [value (async/<! bridge-chan)] (! [value bridge-pid])) (recur)) {:mbox bridge-mbox :pid bridge-pid :channel bridge-chan}))

As the docstring above says, this function uses core.async to provide a wrapper for OTP communications with a Clojang server. Interesting and kind of fun, but not really tapping into the power of core.async.

Zhang

There is an experimental project for exploring ways in which Clojure/OTP applications could maximize core.async while communicating with nodes, services, and full distributed systems in the BEAM world:

Those two links provide some good introductory material, so I won't duplicate that here. Note, however, than while the jiface and clojang libs are stabilizing, little effort is being applied to zhang. As such, there's not much to the project currently.

Author oubiwann
Date February 24, 2017
Time 01:42:24
Category Information
Tags clojang clojure core.async internals jinterface lfecljapp zhang
Line Count 62
Word Count 688
Character Count 6546

Comments?
This blog doesn't use standard (embedded) comments; however, since the site is hosted on Github, if there is something you'd like to share, please do so by opening a "comment" ticket!