On merging of two asynchronous queues

This is a personal note on what I implemented for this signature https://github.com/keigoi/ocaml-mpst/blob/master/lib/lwt/mstream.mli.

Comments are appreciated, but the followings are not meant to be read by others than myself.


Suppose you have two channels which are asynchronous — asynchronous queues. More precisely, an asynchronous queue q has the following properties:

  1. A queue q consists of a triple (st,i,o): A stream st and two endsoutput end o and input end i. Writing a message on o is immediately enqueued in st, which can be taken from i.
  2. Writing on an output end is non-blocking.
  3. Ordering in a stream is preserved. In other words, writings on an output end are ordered, i.e. messages m1 and m2 written in this order can only be read as first m1 then m2 on the input end, not conversely.

And I was thinking about merging of them. Here, merging is a concept derived from End Point Projection of Multiparty Sesison Types, but here is about queues and their ends, not types.

Let me informally establish what I meant by "merging".

  • Rule i). Only two ends with the same modality (input/output) and the same payload type can be merged.
  • Rule ii). Merging of two ends invokes merging of streams and the other ends; i.e. if two input ends are merged, then two streams and two output ends associated with them are merged into one respectively (here I assume the queues are empty); and vice versa

Merging means that the writing on a merged output end preserves ordering: writings "m1 to o1 then m2 to o2" can be observed in this order at the other end.

Rule ii) means that the merged ends are indistinguishable from each other. I.e., after merging two output ends o1 and o2, writing a message m on either o1 or o2 will have the same effect: it writes into the queue connected to them, without any flag or timestamp or any labellings.

Another implication is that two simultaneous inputs/outputs on the same ends will cause a race: If a stream contains only a message m0, simultaneous inputs on i1 and i2 will block one of them. Similarly, simultaneous outputs on o1 and o2 with messages m1 and m2 respectively will result in a stream non-deterministically enqueued with m1 then m2, or m2 then m1.

It bloated up to more than 20 lines, but actually easy: Merging unifies two queues into one, and after merging, they can be treated equally after that.


Then, let's add more complexity. Now, an asynchronous queue has its own wrapper function which transforms a message right before it is read by an input end.

A wrapped queue (st,i,o,f) consists of four elements: stream st, input end i, output end o and wrapper function f. Then, how the merging above should be defined properly?

Here is my solution using GADTs: 

https://github.com/keigoi/ocaml-mpst/blob/master/lib/lwt/mstream.ml  (However, this is tainted with a specific scatter/gather implementation of ocaml-mpst.)

Background thoughs might be explained in the following entry or revision of this entry.