pipeline.rdoc

Last Update: Thu Feb 26 17:38:27 CET 2004

Object-oriented Pipelines

Author: Christian Neukirchen <chneukirchen@yahoo.de>
License: See "License"
Last-change: 26feb2004 +chris+

Abstract

Pipelines (or pipes, for short) have played, and still play, an important role in daily Unix usage. As [1] points out, Unix pipes were implemented in 1972, but the idea existed back in 1971 as part of the Dartmouth Time Sharing System.

Standard Unix pipes suffer, however, from a number of difficulties, which the next section points out. The purpose of this paper is to bring the idea of Unix pipes to object-oriented languages, which can simplify many applications while avoiding the disadvantages of classical pipes.

A review of Unix pipes

Basically, pipes allow command chaining, by connecting the output of one program with the input of another.

To make pipes work, all those commands need to use the same "protocol", that is, the way the data is structured. Since Unix mostly handles textual data (especially as part of the Documenter’s Workbench), most tools use "line-based" text files. This protocol is built on top of another one, the "raw byte stream". Raw byte streams are the only kind of data Unix supports at the kernel level. Files, and therefore pipes too, store or transport raw byte streams.

This means we now have tools that work on two separate levels: character-wise on the one hand (characters are assumed to be 8 bits, i.e. 1 byte, long) and line-wise on the other.

Character-wise tools can work on line-based data, but line-wise tools are usually of no use on raw byte streams (i.e. data without a line-based protocol).

Please note that "line" doesn’t mean this data consists of chars separated by Carriage Return, Carriage Return and Line Feed or Line Feed alone. In fact, many tools allow modification of the so-called line separator, which usually is "\n", that is, a Line Feed. (If a certain tool doesn’t have such a flag, you still could use "tr" for this job.)

On top of this line-based protocol, some tools define another level, the "field level", which allows marking up fields as parts of lines. Common field separators are tabs, ";", ":" or simply whitespace. This allows the markup of two-dimensional data, for example tables.
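The three levels can be illustrated with a few lines of Ruby, the language used later in this paper. This is only a sketch: the sample data, the ":" field separator and the three helper names are assumptions made for illustration.

```ruby
# Hypothetical sample data: two colon-separated records, one per line.
SAMPLE = "chris:1000:/home/chris\nroot:0:/root\n"

# Byte level: the stream is nothing but a sequence of small integers.
def byte_level(stream)
  stream.bytes
end

# Line level: the same stream, split at the line separator.
def line_level(stream, separator = "\n")
  stream.split(separator)
end

# Field level: every line is split once more at the field separator.
def field_level(stream, field_separator = ":")
  line_level(stream).map { |line| line.split(field_separator) }
end

byte_level("a\n")    # [97, 10]
line_level(SAMPLE)   # ["chris:1000:/home/chris", "root:0:/root"]
field_level(SAMPLE)  # [["chris", "1000", "/home/chris"], ["root", "0", "/root"]]
```

Each level only reinterprets the bytes of the level below it, which is why every tool has to agree with its neighbours on the separators.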

Examples of character-wise tools are "tr", which translates chars, and "cat", which concatenates files or streams. Line-wise tools include "sort", which sorts lines, and "grep", which prints lines matching a pattern. Common field-wise tools include "cut", "join" and "awk".

These protocols are by no means the only ones; there are lots of others, for example paragraph-based tools such as "fmt", or troff filters, which have their own protocols (mostly line-wise, but not always). Also, with the rise of XML there are tools which map the basic Unix utils on top of XML, e.g. xmlstarlet [3]. These tools handle tree-based data.

This actually brings me to another important issue: since pipes only transport raw byte streams, each application has to define the protocol it uses on its own. Unfortunately, this leads to weird bugs and incompatibilities, which at least older versions of Unix were famous for, e.g. limited line lengths, dropping of NUL bytes and a lot more.

Furthermore, you can divide these tools into four orthogonal groups by their so-called pattern. Here are the important ones regarding pipes (see [2] for reference):

Filters

Filters read data from the input, process it, and write the results to the output. Filters only process a part of the data at a time, so they don’t block the pipe.

Examples include "cat", "tr", and the mighty "sed". "awk" is also often used in this context.

Sponges

Sponges are essentially like filters, but usually read the whole input (or at least significant parts of it) into memory, process it, and only then dump it to the output. Therefore, sponges block the pipe and remove the concurrency of the tools, which usually results in larger memory footprints and slower processing.

This is bad, but sometimes can’t be avoided. Examples are "sort", "tac", which reverses the lines of a file/stream, and "wc", which counts the lines, words and chars of a file/stream.

Many people would consider sponges filters too, but I separated them for reasons shown later.
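The difference between the two patterns can be sketched in plain Ruby. The method names upcase_filter and sort_sponge are made up for this illustration, and StringIO objects stand in for the two ends of a pipe.

```ruby
require "stringio"

# Filter: handles one line at a time and passes it on immediately,
# so only a single line is held in memory.
def upcase_filter(input, output)
  input.each_line { |line| output << line.upcase }
end

# Sponge: has to soak up the entire input before it can emit anything,
# because the first line of the result may be the last line of the input.
def sort_sponge(input, output)
  input.readlines.sort.each { |line| output << line }
end

filtered = StringIO.new
upcase_filter(StringIO.new("b\na\n"), filtered)
filtered.string  # "B\nA\n"

sorted = StringIO.new
sort_sponge(StringIO.new("b\na\n"), sorted)
sorted.string    # "a\nb\n"
```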

Sources

Sources generate data, and output them. They don’t read the input and therefore can be used at the beginning of the pipe only.

Examples of sources include "who", "ps" and "date".

Sources are also known as generators.

Sinks

Sinks read the input, but don’t output to the pipe (which doesn’t necessarily mean they drop the output).

Examples of sinks are "lpr" (the printer spooler) and "nc" (a tool to write to Internet sockets; at the same time "nc" is also a source, since it can retrieve data from sockets too).

Doing the step to object-orientation

In order to use the full power of pipes, the raw byte streams of Unix need to be replaced with something more object-oriented, that is, method chaining.

Since this also means that we can use neither the common Unix tools nor Unix pipes themselves anymore, it’s time to switch to a higher-level, object-oriented language. I’m going to use Ruby [4] for the rest of this paper.

Examples of usage

Since we have been talking about Unix tools all along, let’s port some of the simpler ones to Ruby!

Building "cat"

Cat will be the tool that takes filenames and emits byte actions for the chars inside them. It also sends a start_document message at the start and an end_document message at the end of each file.

 class Cat < Pipe
   def run(*filenames)
     filenames.each { |name|
       File.open(name) { |file|
         start_document
         file.each_byte { |b| byte b }
         end_document
       }
     }
   end
 end

As you can see, actions are emitted by calling them like ordinary methods.

For debugging purposes, let’s also write a Tracer that shows all messages (and their arguments) passed through the pipe.

 class Tracer < Pipe
   def method_missing(name, *args)
     p [name, *args]
     call_next_maybe name, *args
   end
 end

call_next_maybe is a special method provided by the Pipe class: it forwards the method name with the arguments args to the next object if there is one. That is, if there is no further object in the chain, it does nothing instead of raising an error (as call_next would).
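The difference between the two forwarding methods can be tried out in isolation. The following is a minimal sketch: the Pipeline module is a condensed copy of the one shown in the implementation section of this paper, while Strict, Lenient, the :ping message and the helper no_method_error? are hypothetical names introduced only for this example.

```ruby
# Condensed copy of the Pipeline mix-in from this paper,
# repeated here so that the sketch runs on its own.
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError,
            "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    unless @next_object.respond_to?(name) ||
           @next_object.respond_to?(:method_missing)
      raise NoMethodError,
            "undefined method `#{name}' to forward to #{@next_object}"
    end
    @next_object.send(name, *args)
  end

  alias call_next method_missing

  def call_next_maybe(name, *args)
    @next_object.send(name, *args) if @next_object
  end

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical stages that forward a made-up :ping message in the two ways.
class Strict < Pipe
  def ping
    call_next :ping          # raises when nothing follows in the chain
  end
end

class Lenient < Pipe
  def ping
    call_next_maybe :ping    # does nothing when nothing follows in the chain
  end
end

# Helper: true if the given block raises NoMethodError.
def no_method_error?
  yield
  false
rescue NoMethodError
  true
end

no_method_error? { Strict.new.ping }    # true
no_method_error? { Lenient.new.ping }   # false
```

A Tracer at the end of a chain relies on exactly this lenient behaviour, since it has no way of knowing whether another object follows it.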

Now, let’s do a quick test:

 c = Cat.new
 c | Tracer
 c.run "/tmp/foo"

This probably needs some explanation: a new Cat object is created and, in the second line, gets connected with an anonymous Tracer (if a class is passed to |, it is instantiated automatically, as a convenience). Finally, we run the initial method of the pipe to start processing. (In further examples, only the pipe definition will be shown.)

Once run, we get this as result:

 [:start_document]
 [:byte, 71]
 [:byte, 114]
 [:byte, 101]
 ...
 [:byte, 110]
 [:byte, 46]
 [:byte, 10]
 [:end_document]

Good, this seems to work. Now, let’s write a simple byte sink that prints to standard output too:

 class ShowBytes < Pipe
   def start_document; end
   def end_document; end

   def byte(b)
     print b.chr
   end
 end

The empty definitions of start_document and end_document are a bit special: Usually, unhandled methods just get forwarded to the next object. Since ShowBytes is a sink, however, we need to implement all methods of the protocol:

Sinks need to implement all methods of the protocol

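This is by design: an object at the end of the chain raises a NoMethodError for every message it doesn’t handle. If such messages were dropped silently instead, you wouldn’t get a NoMethodError when you misspell, e.g., start_document, and you would be very puzzled (believe me :^)). Therefore, sinks need to implement the whole protocol, even if they need to ignore parts of it.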
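What happens when a sink misses part of the protocol can be sketched as follows. The Pipeline module is a condensed copy of the one from the implementation section; StringSource (a stand-in for Cat that reads from a String instead of a file), CarefulSink, SloppySink and try_sink are hypothetical names used only in this example.

```ruby
# Condensed copy of the Pipeline mix-in from this paper,
# repeated here so that the sketch runs on its own.
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError,
            "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    unless @next_object.respond_to?(name) ||
           @next_object.respond_to?(:method_missing)
      raise NoMethodError,
            "undefined method `#{name}' to forward to #{@next_object}"
    end
    @next_object.send(name, *args)
  end

  alias call_next method_missing

  def call_next_maybe(name, *args)
    @next_object.send(name, *args) if @next_object
  end

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical source: emits the byte protocol for a String.
class StringSource < Pipe
  def run(string)
    start_document
    string.each_byte { |b| byte b }
    end_document
  end
end

# A sink that implements the whole protocol, even though it ignores it.
class CarefulSink < Pipe
  def start_document; end
  def end_document; end
  def byte(b); end
end

# A sink with a typo in one method name.
class SloppySink < Pipe
  def start_documnet; end    # should have been start_document
  def end_document; end
  def byte(b); end
end

# Runs a short document into the given sink and reports what happened.
def try_sink(sink_class)
  source = StringSource.new
  source | sink_class
  source.run "fleas\n"
  "ok"
rescue NoMethodError => e
  e.message
end

try_sink(CarefulSink)  # "ok"
try_sink(SloppySink)   # the NoMethodError message, which names start_document
```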

Ok, after defining ShowBytes, we run

 c | ShowBytes

and get:

 Great fleas have little fleas
   upon their backs to bite 'em,
 And little fleas have lesser fleas,
   and so ad infinitum.
 And the great fleas, themselves, in turn,
   have greater fleas to go on;
 While these again have greater still,
   and greater still, and so on.

This is obviously line-based data (the same poem is used in the first chapters of [5], by the way), so let’s write a filter to transform byte events into line events:

 class Byte2Line < Pipe
   def initialize
     @line = ""
   end

   def end_document
     line @line  unless @line == ""
     super                       # forward
   end

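   def byte(b)
     @line << b.chr              # b is an Integer; append it as a character

     if b.chr == "\n"            # ?\n is a String on Ruby 1.9+, so compare characters
       line @line
       @line = ""
     end
   end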
 end

byte is defined to append every char it receives to the instance variable @line and to emit the line once a newline arrives. Then the line gets cleared.

It’s possible, however, that a file does not end with a newline; that’s why end_document emits the remaining partial line, too. You also see something special here: a call to super forwards the method to the next object in the chain. As usual, super without arguments reuses the arguments of the current method, while super with arguments passes the new ones.
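Both forms of super can be seen side by side in a small sketch. The Pipeline module is a condensed copy of the one from the implementation section; CountLines, Chomp and LineCollector are hypothetical classes that exist only for this example.

```ruby
# Condensed copy of the Pipeline mix-in from this paper,
# repeated here so that the sketch runs on its own.
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError,
            "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    unless @next_object.respond_to?(name) ||
           @next_object.respond_to?(:method_missing)
      raise NoMethodError,
            "undefined method `#{name}' to forward to #{@next_object}"
    end
    @next_object.send(name, *args)
  end

  alias call_next method_missing

  def call_next_maybe(name, *args)
    @next_object.send(name, *args) if @next_object
  end

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# super without arguments: the line is forwarded unchanged.
class CountLines < Pipe
  attr_reader :count

  def initialize
    @count = 0
  end

  def line(l)
    @count += 1
    super
  end
end

# super with an argument: the next object receives the new value.
class Chomp < Pipe
  def line(l)
    super l.chomp
  end
end

# Hypothetical sink that remembers every line it receives.
class LineCollector < Pipe
  attr_reader :lines

  def initialize
    @lines = []
  end

  def line(l)
    @lines << l
  end
end

counter = CountLines.new
sink = LineCollector.new
counter | Chomp | sink
counter.line "Great fleas\n"
counter.line "have little fleas\n"
sink.lines     # ["Great fleas", "have little fleas"]
counter.count  # 2
```

Since neither Pipe nor Object defines line, super ends up in method_missing, which is what turns an ordinary Ruby super call into forwarding along the chain.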

Finally, we need to transform the line into bytes again. Line2Byte is used for that (we could have written a new sink of course, but why do something twice?):

 class Line2Byte < Pipe
   def line(l)
     l.each_byte { |b| byte b }
   end
 end

Line2Byte just emits a byte for every char in the line.

Implementing "sort"

As an example of a line-wise tool, and a sponge at the same time, we will now implement "sort":

 class Sort < Pipe
   def initialize
     @lines = []
   end

   def line(l)
     @lines << l
   end

   def end_document
     @lines.sort.each { |l| call_next :line, l }
     super
   end
 end

The structure of this sponge is typical: we first collect all lines in @lines, then sort and forward them in end_document. (That’s what we need end_document for, by the way, since it’s not possible to check for an EOF as with Unix pipes.)

call_next is used here since super can only forward the current method, not a different one.
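The same structure carries over to other sponges. As a sketch, here is a hypothetical Tac that reverses the order of the lines, modeled on "tac" from the review of Unix pipes. The Pipeline module is a condensed copy of the one from the implementation section; Tac, EventLog and the reset of @lines after each document are assumptions of this example, not part of the original code.

```ruby
# Condensed copy of the Pipeline mix-in from this paper,
# repeated here so that the sketch runs on its own.
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError,
            "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    unless @next_object.respond_to?(name) ||
           @next_object.respond_to?(:method_missing)
      raise NoMethodError,
            "undefined method `#{name}' to forward to #{@next_object}"
    end
    @next_object.send(name, *args)
  end

  alias call_next method_missing

  def call_next_maybe(name, *args)
    @next_object.send(name, *args) if @next_object
  end

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical sponge: buffers all lines and emits them in reverse order.
class Tac < Pipe
  def initialize
    @lines = []
  end

  def line(l)
    @lines << l                # soak up the line; nothing is forwarded yet
  end

  def end_document
    @lines.reverse.each { |l| call_next :line, l }
    @lines = []                # start empty for a following document
    super
  end
end

# Hypothetical sink that records the events it receives.
class EventLog < Pipe
  attr_reader :events

  def initialize
    @events = []
  end

  def start_document
    @events << [:start_document]
  end

  def line(l)
    @events << [:line, l]
  end

  def end_document
    @events << [:end_document]
  end
end

tac = Tac.new
log = EventLog.new
tac | log
tac.start_document
tac.line "one\n"
tac.line "two\n"
tac.end_document
log.events
# [[:start_document], [:line, "two\n"], [:line, "one\n"], [:end_document]]
```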

Now, we can run

 c | Byte2Line | Sort | Line2Byte | ShowBytes

and get:

   and greater still, and so on.
   and so ad infinitum.
   have greater fleas to go on;
   upon their backs to bite 'em,
 And little fleas have lesser fleas,
 And the great fleas, themselves, in turn,
 Great fleas have little fleas
 While these again have greater still,

Hmm, it’s probably better if we write a tool to unindent the lines first:

 class Unindent < Pipe
   def line(l)
     super l.lstrip
   end
 end

Unindent is a typical event filter: It just modifies the value of its argument and forwards it.

 c | Byte2Line | Unindent | Sort | Line2Byte | ShowBytes

Now, that’s better:

 And little fleas have lesser fleas,
 And the great fleas, themselves, in turn,
 Great fleas have little fleas
 While these again have greater still,
 and greater still, and so on.
 and so ad infinitum.
 have greater fleas to go on;
 upon their backs to bite 'em,

Finally, we have assorted poetry.

Implementing "wc"

Now, let’s count the words in this sorted poem:

 class WordCount < Pipe
   def start_document
     @words = 0
     super
   end

   def byte(b)
     if b.chr == " " || b.chr == "\n"   # compare characters; ?\s is a String on Ruby 1.9+
       @words += 1
     end
     super
   end

   def end_document
     words @words
     super
   end
 end

We listen on the pipe and count each occurrence of either space or newline. Then, in end_document, we generate a words event with the sum.

This needs to be output of course, so we inherit from ShowBytes, which already shows the data, and tell end_document to show the word count too:

 class Report < ShowBytes
   def words(w)
     @words = w
   end

   def end_document
     puts "#{@words} words"
     super
   end
 end

Let’s test:

 c | Byte2Line | Unindent | Sort | Line2Byte | WordCount | Report

Which results in:

 And little fleas have lesser fleas,
 And the great fleas, themselves, in turn,
 Great fleas have little fleas
 While these again have greater still,
 and greater still, and so on.
 and so ad infinitum.
 have greater fleas to go on;
 upon their backs to bite 'em,
 46 words

Checking with "wc" you see it works.

Implementation of Ruby pipes

The implementation of the above pipes is easy; it’s just about 40 lines of code.

In order to allow general usage, a mix-in Pipeline is defined. For convenience, there’s also a class Pipe which already includes Pipeline, ready to be inherited from.

Method chaining is very simple: each object in the pipe gets a new attribute @next_object holding a reference to the next object. Additionally, Pipeline defines method_missing (the method that is called when an object receives a message it cannot handle) to forward undefined methods, and to allow usage of super.

To allow the Unix-like syntax of |, this operator gets overloaded: it sets @next_object to its parameter, or to a new instance of it if it’s a class, and returns that object, so that in a chain such as a | B | C each stage gets connected to the one on its left.

The other methods are self-explanatory.

 module Pipeline
   attr_accessor :next_object

   def method_missing(name, *args)
     if @next_object
       if @next_object.respond_to?(name) ||
            @next_object.respond_to?(:method_missing)
         @next_object.send(name, *args)
       else
         raise NoMethodError,
               "undefined method `#{name}' to forward to #{@next_object}"
       end
     else
       raise NoMethodError,
             "undefined method `#{name}' for #{self} (and no pipeline defined)"
     end
   end

   alias :call_next :method_missing

   def call_next_maybe(name, *args)
     @next_object.send(name, *args)  if @next_object
   end

   def |(pipe)
     if pipe.kind_of? Class
       @next_object = pipe.new
     else
       @next_object = pipe
     end
   end
 end

 class Pipe
   include Pipeline
 end
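How | links the objects together can be made visible by following the next_object references. This is a small sketch: the Pipeline module is a condensed copy of the code above, while StageA, StageB and stage_names are hypothetical names introduced for illustration.

```ruby
# Condensed copy of the Pipeline mix-in from this paper,
# repeated here so that the sketch runs on its own.
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError,
            "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    unless @next_object.respond_to?(name) ||
           @next_object.respond_to?(:method_missing)
      raise NoMethodError,
            "undefined method `#{name}' to forward to #{@next_object}"
    end
    @next_object.send(name, *args)
  end

  alias call_next method_missing

  def call_next_maybe(name, *args)
    @next_object.send(name, *args) if @next_object
  end

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical stages that do nothing on their own.
class StageA < Pipe; end
class StageB < Pipe; end

# Follows the next_object references and lists the class of every stage.
def stage_names(head)
  names = []
  stage = head
  while stage
    names << stage.class.name
    stage = stage.next_object
  end
  names
end

head = Pipe.new
tail = head | StageA | StageB.new   # classes and instances can be mixed
stage_names(head)  # ["Pipe", "StageA", "StageB"]
tail.class         # StageB
```

Note that the whole expression evaluates to the last stage, not the first one. This is why the examples in this paper keep the source in a variable of its own (c) and call run on that.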

Outlook

I have shown how object-oriented pipelines work and how they are implemented. To stay within the scope of this paper, the examples have been rather simple and primitive, so it may not be possible to see the whole picture.

Also, some of these examples surely would be faster and more efficient if written using the Unix tools introduced in the beginning. The point is that you can transport all kinds of objects via these pipelines.
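As a small illustration of that point, the following sketch sends Hash records instead of bytes or lines through a pipeline. The Pipeline module is a condensed copy of the one from the implementation section; the record message, the classes AddTotal and Records and the field names are all assumptions made up for this example.

```ruby
# Condensed copy of the Pipeline mix-in from this paper,
# repeated here so that the sketch runs on its own.
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError,
            "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    unless @next_object.respond_to?(name) ||
           @next_object.respond_to?(:method_missing)
      raise NoMethodError,
            "undefined method `#{name}' to forward to #{@next_object}"
    end
    @next_object.send(name, *args)
  end

  alias call_next method_missing

  def call_next_maybe(name, *args)
    @next_object.send(name, *args) if @next_object
  end

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical filter: receives Hash records and adds a computed field.
class AddTotal < Pipe
  def record(r)
    super r.merge({ :total => r[:price] * r[:quantity] })
  end
end

# Hypothetical sink that keeps all records it receives.
class Records < Pipe
  attr_reader :all

  def initialize
    @all = []
  end

  def record(r)
    @all << r
  end
end

adder = AddTotal.new
sink = Records.new
adder | sink
adder.record({ :item => "flea collar", :price => 3, :quantity => 2 })
sink.all.first[:total]  # 6
```

No stage has to serialize or parse anything here: the Hash that leaves one object is the Hash that arrives at the next.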

Especially when the protocols get more complex than the simple ones introduced here, the parsing overhead of Unix pipes grows linearly with the number of tools in a pipeline, since each of them has to parse its input again. With object-oriented pipes it stays constant, however. If enough thought goes into the protocol, much faster and far more flexible applications can be written.

One example would be a full-scale XML-based website where requested documents go through the method chains, incorporate information received from external databases, get pre-processed a bit and finally get converted into different output formats, such as XHTML for ordinary browsers, WML for embedded devices or even PDF for printed output.

Last but not least, writing pipes is fun, mainly due to the incremental style. At every point in time, you can see whether things work so far, and you always have an overview of what remains to be done. This helps development a lot, as time estimation gets easier.

Appendix: Full Source

 module Pipeline
   attr_accessor :next_object

   def method_missing(name, *args)
     if @next_object
       if @next_object.respond_to?(name) ||
            @next_object.respond_to?(:method_missing)
         @next_object.send(name, *args)
       else
         raise NoMethodError,
               "undefined method `#{name}' to forward to #{@next_object}"
       end
     else
       raise NoMethodError,
             "undefined method `#{name}' for #{self} (and no pipeline defined)"
     end
   end

   alias :call_next :method_missing

   def call_next_maybe(name, *args)
     @next_object.send(name, *args)  if @next_object
   end

   def |(pipe)
     if pipe.kind_of? Class
       @next_object = pipe.new
     else
       @next_object = pipe
     end
   end
 end

 class Pipe
   include Pipeline
 end

 class Cat < Pipe
   def run(*filenames)
     filenames.each { |name|
       File.open(name) { |file|
         start_document
         file.each_byte { |b| byte b }
         end_document
       }
     }
   end
 end

 class Tracer < Pipe
   def method_missing(name, *args)
     p [name, *args]
     call_next_maybe name, *args
   end
 end

 class ShowBytes < Pipe
   def start_document; end
   def end_document; end
   def byte(b)
     print b.chr
   end
 end

 class Byte2Line < Pipe
   def initialize
     @line = ""
   end
   def end_document
     line @line  unless @line == ""
     super                       # forward
   end
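   def byte(b)
     @line << b.chr              # b is an Integer; append it as a character
     if b.chr == "\n"            # ?\n is a String on Ruby 1.9+, so compare characters
       line @line
       @line = ""
     end
   end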
 end

 class Line2Byte < Pipe
   def line(l)
     l.each_byte { |b| byte b }
   end
 end

 class Sort < Pipe
   def initialize
     @lines = []
   end
   def line(l)
     @lines << l
   end
   def end_document
     @lines.sort.each { |l| call_next :line, l }
     super
   end
 end

 class Unindent < Pipe
   def line(l)
     super l.lstrip
   end
 end

 class WordCount < Pipe
   def start_document
     @words = 0
     super
   end

   def byte(b)
     if b.chr == " " || b.chr == "\n"   # compare characters; ?\s is a String on Ruby 1.9+
       @words += 1
     end
     super
   end

   def end_document
     words @words
     super
   end
 end

 class Report < ShowBytes
   def words(w)
     @words = w
   end
   def end_document
     puts "#{@words} words"
     super
   end
 end

 c = Cat.new
 c | Tracer
 c.run "/tmp/foo"

Bibliography

1) cm.bell-labs.com/cm/cs/who/dmr/hist.html

2) www.faqs.org/docs/artu/ch11s06.html

3) xmlstar.sourceforge.net

4) www.ruby-lang.org

5) "The Unix Programming Environment" by Kernighan and Pike, Prentice-Hall, 1984

License

Copyright © 2004 Christian Neukirchen <chneukirchen@yahoo.de>

Permission is granted to copy, modify and/or distribute copies of this manual provided the copyright notice and this permission notice are preserved on all copies and the entire resulting derived work is distributed under the terms of a permission notice identical to this one.
