package component
A collection of several
process-generators that (mostly) yield processes to work on (or
produce) finite or infinite streams of values presented as channel.
All are designed to terminate cleanly -- 'i.e.' to closeIn
or
closeOut
all the channel.that they communicate on in the appropriate
direction for the type of port.
Some of these components were inspired by (or copied from) components from the Plug'n'Play collection of JCSP (without necessarily retaining the P'n'P names).
@version 03.20120824 @author Bernard Sufrin, Oxford $Revision: 247 $ $Date: 2017-10-20 15:00:00 +0100 (Fri, 20 Oct 2017) $
- Alphabetic
- By Inheritance
- component
- AnyRef
- Any
- Hide All
- Show All
- Public
- All
Type Members
-
implicit
class
ExtendCollection
[T] extends AnyRef
An implicit class that equips an
Iterable[T]
collection with methods that write all its members to an output port.An implicit class that equips an
Iterable[T]
collection with methods that write all its members to an output port.I confess that this class is here only because ScalaDoc will not generate documentation for a channel unless it defines one or more class-like entities.
Value Members
-
def
Integrator(in: channel.?[Long], out: channel.![Long]): PROC
A composite component that sums its input stream onto its output stream
A composite component that sums its input stream onto its output stream
in out [x,y,z,...] >------>|\ /|---------> [x,x+y,x+y+z,...] |+}----------->{ | +-->|/ \|--+ | | +----------<{(0)}<------+
-
def
Ticker(periodNS: Nanoseconds): ?[Unit]
Generate a
?[Unit]
to which an()
is repeatedly written by a server processperiodNS
nanoseconds after it is read.Generate a
?[Unit]
to which an()
is repeatedly written by a server processperiodNS
nanoseconds after it is read.The server terminates at the next tick after the port is closed.
+----------+ | periodNS |>-------------> () +----------+
-
def
Timer(periodNS: Nanoseconds, relativeTo: Nanoseconds = 0l): ?[Nanoseconds]
Generate a
?[Nanoseconds]
on which the system nanosecond clock is offered by a server process at least everyperiodNS
nanoseconds.Generate a
?[Nanoseconds]
on which the system nanosecond clock is offered by a server process at least everyperiodNS
nanoseconds. The offer is withdrawn if the port is not read between the moment of the offer and the moment at which the next offer is due. For example:Ticker(seconds(10))
yields a port that makes the absolute nanosecond clock available with a resolution of 10 seconds.
The server terminates at the next tick after the port is closed.
+----------+ | periodNS |>-------------> ... System.nanotime ... +----------+
-
def
console[T](in: channel.?[T]): PROC
Repeatedly write the string forms of values read from
in
onto the standard output stream. -
def
const[T](ts: T*)(out: channel.![T]): PROC
Output all the given
ts
onto the output port, then terminate.Output all the given
ts
onto the output port, then terminate.+------------+ t1, ... | t1, ... tn +---------> +------------+
-
def
copy[T](in: channel.?[T], out: channel.![T]): PROC
Repeatedly copy values from
in
toout
. -
def
exchanger[T](l: channel.?[T], r: channel.?[T], lo: channel.![T], hi: channel.![T])(implicit arg0: (T) ⇒ Ordered[T]): PROC
Repeatedly reads pairs inputs from its two input channel.and outputs them (in parallel, and ordered) to its two output channel.
Repeatedly reads pairs inputs from its two input channel.and outputs them (in parallel, and ordered) to its two output channel.
x, ...--->[\/]---> max(x,y), ... y, ...--->[/\]---> min(x,y), ...
Here is a four-channel sorting network composed of 5 such components.
-->[\/]--------->[\/]------------> -->[/\]---+ +-->[/\]--+ | | | | | +-->[\/]--> -->[\/]------+ +-->[/\]--> -->[/\]-+ | | | +---->[\/]---+ +------>[/\]------------->
-
def
filter[T](pass: (T) ⇒ Boolean)(in: channel.?[T], out: channel.![T]): PROC
Copy values from
in
toout
that satisfypass
. -
def
inj[T](in: ?[T], inj: ?[T], out: ![T]): PROC
Merges the streams
in
andinj
ontoout
, giving priority to data arriving oninj
. -
def
keyboard(out: channel.![String], prompt: ⇒ String): PROC
Repeatedly output lines read from the standard input stream, stopping when
out
either does not accept an output or can no longer accept an output; and using theprompt
thunk to generate a (new) prompt each time a line is to be read.Repeatedly output lines read from the standard input stream, stopping when
out
either does not accept an output or can no longer accept an output; and using theprompt
thunk to generate a (new) prompt each time a line is to be read. Ending the standard input stream (Control-D to the terminal on Unix-based operating systems) causesout
to be closed for output, as does closing the input port on the channel of whichout
is the output port. -
def
keyboard(out: channel.![String]): PROC
Repeatedly output lines read from the standard input stream, stopping when
out
either does not accept an output or can no longer accept an output.Repeatedly output lines read from the standard input stream, stopping when
out
either does not accept an output or can no longer accept an output. Ending the standard input stream (Control-D to the terminal on Unix-based operating systems) causesout
to be closed for output, as does closing the input port on the channel of whichout
is the output port. -
def
lines(pathName: String, out: channel.![String]): PROC
Repeatedly output lines read from the file with the given path name
-
def
lines(file: File, out: channel.![String]): PROC
Repeatedly output lines read from the given
File
-
def
lines(in: Reader, out: channel.![String]): PROC
Repeatedly output lines read from the given
Reader
. -
def
lines(in: LineNumberReader, out: channel.![String]): PROC
Repeatedly output lines read from the given LineNumberReader.
Repeatedly output lines read from the given LineNumberReader.
When
in
is associated with the user's terminal, rather than a file, there is a potential race condition that the code here does not forbid (explained below). Usekeyboard(out)
to play this role.The race condition results in an unnecessary invocation of in.readLine that will wait for input even though out is already closed.
This happens when a program reading from the keyboard terminates and closes channel downstream of the keyboard. This leaves an unconsummated readLine waiting at the keyboard itself, that has to be cleared by typing an end-line or EOF character.
The closing of out should really abort an in.readLine that is already in progress. But that would add unwarranted complexity to channel implementations.
-
def
map[I, O](f: (I) ⇒ O)(in: channel.?[I], out: channel.![O]): PROC
x, ... >-->[f]>-->f(x), ...
-
def
merge[T](ins: Seq[?[T]], out: ![T]): PROC
Merge several input streams Into a single output stream.
Merge several input streams Into a single output stream. Terminate when the output stream, or all the input streams have closed.
>---->|\ out ins : | }-----> >---->|/
-
def
prefix[T](ts: T*)(in: channel.?[T], out: channel.![T]): PROC
Output the given
ts
toout
, then copy values fromin
toout
, respecting the network-termination convention. -
def
sampler[T](periodNS: Long, in: ?[T], out: ![T]): PROC
Read data from
in
as channel.as it appears there, copying the most-recently read datum toout
everyperiodNS
nanoseconds. -
def
tail[T](in: channel.?[T], out: channel.![T]): PROC
Drop the first value read from
in
, then copy values fromin
toout
. -
def
tee[T](in: channel.?[T], outs: channel.![T]*): PROC
Copy from the given input stream to the given output streams, performing the outputs concurrently.
Copy from the given input stream to the given output streams, performing the outputs concurrently. Terminate when the input stream or any of the output streams is closed.
in /|----> x, ... x, ... >---->{ | : outs \|----> x, ...
-
def
zip[L, R](lin: channel.?[L], rin: channel.?[R], out: channel.![(L, R)]): PROC
Turns a pair of streams into a stream of pairs.
-
def
zipwith[L, R, O](f: (L, R) ⇒ O)(lin: channel.?[L], rin: channel.?[R], out: channel.![O]): PROC
Repeatedly input pairs of values
(l, r)
fromlin
, andrin
and sendf(l, r)
toout
.