| Path: | /home/chris/pipeline.rdoc |
| Last Update: | Thu Feb 26 17:38:27 CET 2004 |
| Author: | Christian Neukirchen <chneukirchen@yahoo.de> |
| License: | See `License‘ |
| Last-change: | 26feb2004 +chris+ |
Pipelines (or pipes, for short) have played, and still play, an important role in daily Unix usage. As [1] points out, Unix pipes were implemented in 1972, but the idea existed back in 1971 as part of the Dartmouth Time Sharing System.
Standard Unix pipes suffer, however, from a number of difficulties, which the next chapter points out. The purpose of this paper is to bring the idea of Unix pipes into object-oriented languages, which can simplify many applications while avoiding the disadvantages of classical pipes.
Basically, pipes allow command chaining, by connecting the output of one program with the input of another.
To make pipes work, all those commands need to use the same "protocol", that is, the way the data is structured. Since Unix mostly handles textual data (especially as part of the Documenter’s Workbench), most tools use "line-based" text files. This protocol is built on top of another one, the "raw byte stream". Raw byte streams are the only kind of data Unix supports at the kernel level. Files, and therefore pipes too, store and transport raw byte streams.
This means we now have tools that work on two separate levels, on the one hand character-wise (characters are assumed to be 8 bits (1 byte) long) and on the other line-wise.
Character-wise tools can work on line-based data, but line-wise tools are usually of no use on raw byte streams (i.e. data without a line-based protocol).
Please note that "line" doesn’t mean this data consists of chars separated by Carriage Return, Carriage Return and Line Feed or Line Feed alone. In fact, many tools allow modification of the so-called line separator, which usually is "\n", that is, a Line Feed. (If a certain tool doesn’t have such a flag, you still could use "tr" for this job.)
On top of this line-based protocol, some tools define another level, the "field level", which allows marking up fields as part of lines. Common field separators are tabs, ";", ":" or simply whitespace. This allows the markup of two dimensional data, for example tables.
An example for a character-wise tool would be "tr", which translates chars or "cat" which concatenates files or streams. Line-wise tools include "sort", which sorts lines or "grep", a tool that prints lines matching a pattern. Field-wise, common tools include "cut", "join" and "awk".
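The three levels can be illustrated on a single string. The following is only a minimal sketch in Ruby (the language used later in this paper); the sample data, the ":" field separator and the helper names byte_level, line_level and field_level are made up for this illustration.

```ruby
# Sample data, invented for this illustration: two lines with ":"-separated fields.
DATA = "root:0:/root\nchris:1000:/home/chris\n"

# Byte level: the only thing a pipe really transports.
def byte_level(data)
  data.bytes
end

# Line level: the byte stream split at the line separator (usually "\n").
def line_level(data, separator = "\n")
  data.split(separator)
end

# Field level: every line split once more at the field separator.
def field_level(data, field_separator = ":")
  line_level(data).map { |line| line.split(field_separator) }
end
```

Each level only adds an interpretation on top of the previous one: field_level(DATA) needs line_level, which in turn works on the very same bytes that byte_level returns.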
These protocols are by no means the only ones. There are lots of others, for example paragraph-based tools such as "fmt", or troff filters, which have their own protocols (mostly line-wise, but not always). Also, with the rise of XML there are tools which map the basic Unix utilities onto XML, e.g. xmlstarlet [3]. These tools handle tree-based data.
This actually brings me to another important issue: Since pipes only transport raw byte streams, each application has to define the protocol it uses on its own. Unfortunately, this leads to weird bugs and incompatibilities, which at least older versions of Unix were famous for, e.g. limited line lengths, dropping of NUL bytes and a lot more.
Furthermore, you can divide these tools into four orthogonal groups by their so-called pattern. Here are the important ones regarding pipes (see [2] for reference):
Filters read data from input, process them, and write results to the output. Filters only process parts of the data at once, they don’t block the pipe.
Examples include "cat", "tr", and the mighty "sed". "awk" is also often used in this context.
Sponges are essentially like filters, but usually read the input (or at least significant parts of it) into memory, process it, and dump it to the output. Therefore, sponges block the pipe and remove the concurrency of the tools, which usually results in larger memory footprints and slower processing.
This is bad, but sometimes can’t be avoided (for example, "sort", or "tac", which reverses the lines in a file/stream or "wc" which counts lines, words and chars of a file/stream).
Many people would consider sponges filters too, but I separated them for reasons shown later.
Sources generate data, and output them. They don’t read the input and therefore can be used at the beginning of the pipe only.
Examples of sources include "who", "ps" and "date".
Sources are also known as generators.
Sinks read the input, but don’t output to the pipe (which doesn’t necessarily mean they drop the output).
Examples for sinks are "lpr" (the printer spooler) or "nc" (a tool to write to Internet sockets. At the same time "nc" is also a source, since it can retrieve data from sockets too).
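The difference between a filter and a sponge shows up in the order in which things happen. The following sketch imitates the four patterns inside a single Ruby process. It assumes a Ruby version that provides Enumerator::Lazy (2.0 or later), which is not what the rest of this paper uses; run_pipeline and the log format are hypothetical names chosen for the illustration.

```ruby
# Runs a tiny in-process "pipeline" over the given words and returns a log
# that records the order in which the stages touched each item.
def run_pipeline(words, with_sponge)
  log = []

  # Source: produces items and reads no input.
  source = words.each.lazy

  # Filter: handles one item at a time and passes it on immediately.
  filtered = source.map { |w| log << "filter #{w}"; w.upcase }

  # Sponge: sorting needs the complete input before anything can be emitted.
  stage = with_sponge ? filtered.sort : filtered

  # Sink: consumes the items and passes nothing on.
  stage.each { |w| log << "sink #{w}" }

  log
end
```

Without the sponge, the log alternates between filter and sink entries, because every item travels through the whole chain before the next one is read. With the sponge, all filter entries come first: the sink cannot see anything until the sponge has swallowed the complete input.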
In order to use the full power of pipes, the raw byte streams of Unix need to be replaced with something more object-oriented, namely method chaining.
Since this also means that we can use neither the common Unix tools nor Unix pipes, it’s time to switch to a higher-level, object-oriented language. I’m going to use Ruby [4] for the rest of this paper.
Since we have talked all the time of Unix tools before, let’s port some of the simpler ones to Ruby!
Cat will be the tool that takes filenames and emits byte actions for the chars inside them. It also sends start_document and end_document messages at the start and end of each file.
  class Cat < Pipe
    def run(*filenames)
      filenames.each { |name|
        File.open(name) { |file|
          start_document
          file.each_byte { |b| byte b }
          end_document
        }
      }
    end
  end
As you can see, actions are emitted by calling them as usual.
For debugging purposes, let’s also write a Tracer that shows all messages (and their arguments) passed through the pipe.
  class Tracer < Pipe
    def method_missing(name, *args)
      p [name, *args]
      call_next_maybe name, *args
    end
  end
call_next_maybe is a special function defined by the Pipe class: it runs the method name with the arguments args on the next object, if there is one. If there is no further object in the chain, it does nothing instead of raising an error (as call_next would).
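The difference between the two calls can be tried out in isolation. The sketch below uses a condensed copy of the Pipeline mix-in described in this paper (the error handling is simplified); Probe and Collector are hypothetical helper classes that exist only for this illustration.

```ruby
# Condensed copy of the Pipeline mix-in from this paper (simplified error handling).
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError, "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    @next_object.send(name, *args)
  end
  alias call_next method_missing

  def call_next_maybe(name, *args)
    @next_object.send(name, *args) if @next_object
  end

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical helper: sends the same action in both ways.
class Probe < Pipe
  def strict
    call_next :byte, 71         # raises if there is no next object
  end

  def lenient
    call_next_maybe :byte, 71   # does nothing if there is no next object
  end
end

# Hypothetical sink that remembers the bytes it has seen.
class Collector < Pipe
  attr_reader :bytes

  def initialize
    @bytes = []
  end

  def byte(b)
    @bytes << b
  end
end
```

On a Probe without a successor, lenient returns nil while strict raises NoMethodError. As soon as a Collector is attached with |, both calls deliver the byte.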
Now, let’s do a quick test:
  c = Cat.new
  c | Tracer
  c.run "/tmp/foo"
This probably needs some explanation: A new Cat object is created and, in the second line, gets connected with an anonymous Tracer (if classes get passed to |, they automatically get instantiated, as a convenience). Finally, we run the initial method of the pipe to start processing. (In further examples, only the pipe definition will be shown.)
Once run, we get this as result:
  [:start_document]
  [:byte, 71]
  [:byte, 114]
  [:byte, 101]
  ...
  [:byte, 110]
  [:byte, 46]
  [:byte, 10]
  [:end_document]
Good, this seems to work. Now, let’s write a simple byte sink that prints on standard output too:
  class ShowBytes < Pipe
    def start_document; end
    def end_document; end
    def byte(b)
      print b.chr
    end
  end
The empty definitions of start_document and end_document are a bit special: Usually, unhandled methods just get forwarded to the next object. Since ShowBytes is a sink, however, we need to implement all methods of the protocol:
Sinks need to implement all methods of the protocol
This is because if you misspell, e.g., start_document, you won’t get a NoMethodError and you will be very puzzled (believe me :^)). Therefore, sinks need to implement the whole protocol, even if they need to ignore parts of it.
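With the Pipeline mix-in described in this paper, an action that reaches the end of the chain without being handled raises a NoMethodError. The following sketch shows what that looks like for a sink that forgets part of the protocol. It uses a condensed copy of Pipeline; Emitter, CompleteSink, IncompleteSink and try_sink are hypothetical names introduced for the illustration.

```ruby
# Condensed copy of the Pipeline mix-in from this paper (simplified error handling).
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError, "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    @next_object.send(name, *args)
  end
  alias call_next method_missing

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical stand-in for Cat that takes bytes directly instead of filenames.
class Emitter < Pipe
  def run(bytes)
    start_document
    bytes.each { |b| byte b }
    end_document
  end
end

# Implements the whole protocol, even the parts it has little use for.
class CompleteSink < Pipe
  attr_reader :text

  def start_document; @text = String.new; end
  def end_document; end
  def byte(b); @text << b.chr; end
end

# Forgets start_document and end_document.
class IncompleteSink < Pipe
  def byte(b); end
end

# Runs the bytes through a sink of the given class. Returns the sink on
# success, or the error message if the chain could not handle an action.
def try_sink(sink_class, bytes)
  emitter = Emitter.new
  sink = emitter | sink_class
  emitter.run(bytes)
  sink
rescue NoMethodError => e
  e.message
end
```

try_sink(CompleteSink, [79, 75]) returns a sink whose text is "OK", while try_sink(IncompleteSink, [79, 75]) returns an error message that names start_document, the first action nobody handled.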
Ok, after defining ShowBytes, we run
c | ShowBytes
and get:
  Great fleas have little fleas
    upon their backs to bite 'em,
  And little fleas have lesser fleas,
    and so ad infinitum.
  And the great fleas, themselves, in turn,
    have greater fleas to go on;
  While these again have greater still,
    and greater still, and so on.
This is obviously line-based data (the same poem is used in the first chapters of [5], by the way), so let’s write a filter to transform byte events into line events:
  class Byte2Line < Pipe
    def initialize
      @line = ""
    end
    def end_document
      line @line unless @line == ""
      super                    # forward
    end
    def byte(b)
      @line << b
      if b == 10               # 10 is the byte value of "\n" (what ?\n yields on Ruby 1.8)
        line @line
        @line = ""
      end
    end
  end
byte is defined to append every char it receives to the instance variable @line and to emit the line once a newline has been received. Then the line gets cleared.
It’s possible, however, that a file does not end with a newline; that’s why there is end_document to emit that partial line, too. You also see something special here: a call to super forwards the method to the next object in the chain. As usual, super without arguments reuses the arguments of the current method, while super with arguments calls it with the new ones.
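Both forms of super can be seen side by side in the following sketch. It relies on the fact that a super call which finds no method in a superclass ends up in method_missing, which the Pipeline mix-in of this paper uses for forwarding. The Pipeline copy is condensed; Passthrough, Upcase, Lines and demo_super are hypothetical names.

```ruby
# Condensed copy of the Pipeline mix-in from this paper (simplified error handling).
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError, "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    @next_object.send(name, *args)
  end
  alias call_next method_missing

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical filter: forwards the action unchanged.
class Passthrough < Pipe
  def line(l)
    super            # same arguments, sent to the next object
  end
end

# Hypothetical filter: forwards the action with a modified argument.
class Upcase < Pipe
  def line(l)
    super l.upcase   # new argument, sent to the next object
  end
end

# Hypothetical sink that collects the lines it receives.
class Lines < Pipe
  attr_reader :lines

  def initialize
    @lines = []
  end

  def line(l)
    @lines << l
  end
end

# Sends one line through Passthrough | Upcase | Lines and returns what arrived.
def demo_super
  head = Passthrough.new
  sink = head | Upcase | Lines
  head.line "fleas"
  sink.lines
end
```

demo_super returns ["FLEAS"]: Passthrough hands the line on untouched, Upcase replaces the argument, and the sink stores whatever reaches it.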
Finally, we need to transform the line into bytes again. Line2Byte is used for that (we could have written a new sink of course, but why do something twice?):
  class Line2Byte < Pipe
    def line(l)
      l.each_byte { |b| byte b }
    end
  end
Line2Byte just emits a byte for every char in the line.
As an example of a line-wise tool, and a sponge at the same time, we will now implement "sort":
  class Sort < Pipe
    def initialize
      @lines = []
    end
    def line(l)
      @lines << l
    end
    def end_document
      @lines.sort.each { |l| call_next :line, l }
      super
    end
  end
The structure of this sponge is typical: We first put all lines into @lines, then sort them and forward them in end_document. (That’s what we need end_document for, by the way, since it’s not possible to check for an EOF as in Unix pipes.)
call_next is used here since super can’t run other methods than the current.
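The same structure carries over to other sponges. As a sketch, here is how "tac", mentioned earlier as a typical sponge, might look. Tac, Lines and demo_tac are hypothetical and not part of the code of this paper; the Pipeline copy is condensed.

```ruby
# Condensed copy of the Pipeline mix-in from this paper (simplified error handling).
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError, "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    @next_object.send(name, *args)
  end
  alias call_next method_missing

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical sponge: collects all lines and emits them in reverse order.
class Tac < Pipe
  def initialize
    @lines = []
  end

  def line(l)
    @lines << l            # swallow the line, forward nothing yet
  end

  def end_document
    # super could only forward end_document itself, hence call_next.
    @lines.reverse.each { |l| call_next :line, l }
    super
  end
end

# Hypothetical sink that collects the lines it receives.
class Lines < Pipe
  attr_reader :lines

  def start_document; @lines = []; end
  def line(l); @lines << l; end
  def end_document; end
end

# Feeds the given lines through Tac by hand and returns what the sink saw.
def demo_tac(input_lines)
  tac = Tac.new
  sink = tac | Lines
  tac.start_document       # not defined by Tac, so it is simply forwarded
  input_lines.each { |l| tac.line l }
  tac.end_document
  sink.lines
end
```

demo_tac(["a", "b", "c"]) returns ["c", "b", "a"]. Nothing reaches the sink before end_document, which is exactly the blocking behaviour that makes this a sponge.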
Now, we can run
c | Byte2Line | Sort | Line2Byte | ShowBytes
and get:
    and greater still, and so on.
    and so ad infinitum.
    have greater fleas to go on;
    upon their backs to bite 'em,
  And little fleas have lesser fleas,
  And the great fleas, themselves, in turn,
  Great fleas have little fleas
  While these again have greater still,
Hmm, it’s probably better if we write a tool to unindent the lines first:
  class Unindent < Pipe
    def line(l)
      super l.lstrip
    end
  end
Unindent is a typical event filter: It just modifies the value of its argument and forwards it.
c | Byte2Line | Unindent | Sort | Line2Byte | ShowBytes
Now, that’s better:
  And little fleas have lesser fleas,
  And the great fleas, themselves, in turn,
  Great fleas have little fleas
  While these again have greater still,
  and greater still, and so on.
  and so ad infinitum.
  have greater fleas to go on;
  upon their backs to bite 'em,
Finally, we have assorted poetry.
Now, let’s count the words in this sorted poem:
  class WordCount < Pipe
    def start_document
      @words = 0
      super
    end
    def byte(b)
      if b == 32 || b == 10    # byte values of " " and "\n" (?\s and ?\n on Ruby 1.8)
        @words += 1
      end
      super
    end
    def end_document
      words @words
      super
    end
  end
We listen on the pipe and count each occurrence of either space or newline. Then, in end_document, we generate a words event with the sum.
This needs to be output of course, so we inherit from ShowBytes, which already shows the data, and tell end_document to show the word count too:
  class Report < ShowBytes
    def words(w)
      @words = w
    end
    def end_document
      puts "#{@words} words"
      super
    end
  end
Let’s test:
c | Byte2Line | Unindent | Sort | Line2Byte | WordCount | Report
Which results in:
  And little fleas have lesser fleas,
  And the great fleas, themselves, in turn,
  Great fleas have little fleas
  While these again have greater still,
  and greater still, and so on.
  and so ad infinitum.
  have greater fleas to go on;
  upon their backs to bite 'em,
  46 words
Checking with "wc" you see it works.
The implementation of the above pipes is easy; it’s just about 40 lines of code.
In order to allow general usage, a mix-in Pipeline is defined. For convenience, there’s also a class Pipe which already includes Pipeline, ready for inheritance.
Method chaining is very simple: Each object in the pipe gets a new attribute @next_object holding a reference to the next object. Additionally, Pipeline defines method_missing (the method that is called when an object receives a message it cannot handle) to forward undefined methods and to allow usage of super.
To allow the Unix-like syntax of |, this operator gets overloaded: it essentially sets @next_object to its parameter, or to a new instance of it if the parameter is a class.
The other methods are self-explanatory.
  module Pipeline
    attr_accessor :next_object

    def method_missing(name, *args)
      if @next_object
        if @next_object.respond_to?(name) ||
           @next_object.respond_to?(:method_missing)
          @next_object.send(name, *args)
        else
          raise NoMethodError,
            "undefined method `#{name}' to forward to #{@next_object}"
        end
      else
        raise NoMethodError,
          "undefined method `#{name}' for #{self} (and no pipeline defined)"
      end
    end
    alias :call_next :method_missing

    def call_next_maybe(name, *args)
      @next_object.send(name, *args) if @next_object
    end

    def |(pipe)
      if pipe.kind_of? Class
        @next_object = pipe.new
      else
        @next_object = pipe
      end
    end
  end

  class Pipe
    include Pipeline
  end
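One detail of | is easy to miss: since the method ends with an assignment, it returns the next object, not the receiver. That is what makes a | B | C build a chain from left to right, and it is also why the examples keep the head of the pipe in a variable of its own. The sketch below makes this visible; it uses a condensed copy of Pipeline, and First, Second, Third and stages are hypothetical names.

```ruby
# Condensed copy of the Pipeline mix-in from this paper (forwarding omitted,
# since only the chaining is of interest here).
module Pipeline
  attr_accessor :next_object

  def |(pipe)
    # The value of the assignment, i.e. the next object, is the return value.
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Three hypothetical stages without any behaviour of their own.
class First < Pipe; end
class Second < Pipe; end
class Third < Pipe; end

# Walks the chain starting at head and lists the class name of every stage.
def stages(head)
  names = []
  stage = head
  while stage
    names << stage.class.name
    stage = stage.next_object
  end
  names
end
```

After head = First.new and tail = head | Second | Third.new, stages(head) lists all three classes, while tail is the Third instance at the end of the chain. Classes and ready-made instances can be mixed freely on the right-hand side.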
I have shown how object-oriented pipelines work and how they are implemented. To stay within the scope of this paper, the examples have been rather simple and primitive, so it may not be possible to see the whole picture.
Also, some of these examples surely would be faster and more efficient if written using the Unix tools introduced in the beginning. The point is that you can transport all kinds of objects via these pipelines.
Especially if the protocols become more complex than the simple ones introduced here, the parsing overhead of classical pipes grows linearly with the number of actions inside a pipeline. With object-oriented pipes, however, it stays constant. If enough thought goes into the protocol, much faster and far more flexible applications can be written.
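As a small illustration of transporting objects instead of bytes, the following sketch sends whole records (plain Ruby hashes) through a pipeline, so no stage has to parse anything. The record action, the classes Users, OnlyAdmins and Names, and the sample data are all hypothetical; the Pipeline copy is condensed.

```ruby
# Condensed copy of the Pipeline mix-in from this paper (simplified error handling).
module Pipeline
  attr_accessor :next_object

  def method_missing(name, *args)
    unless @next_object
      raise NoMethodError, "undefined method `#{name}' for #{self} (and no pipeline defined)"
    end
    @next_object.send(name, *args)
  end
  alias call_next method_missing

  def |(pipe)
    @next_object = pipe.kind_of?(Class) ? pipe.new : pipe
  end
end

class Pipe
  include Pipeline
end

# Hypothetical source: emits one record action per user.
class Users < Pipe
  def run(users)
    start_document
    users.each { |u| record u }
    end_document
  end
end

# Hypothetical filter: only lets records of administrators pass.
class OnlyAdmins < Pipe
  def record(r)
    super if r[:admin]
  end
end

# Hypothetical sink: collects the names of the records it receives.
class Names < Pipe
  attr_reader :names

  def start_document; @names = []; end
  def end_document; end
  def record(r); @names << r[:name]; end
end

# Builds Users | OnlyAdmins | Names, runs it and returns the collected names.
def admin_names(users)
  source = Users.new
  sink = source | OnlyAdmins | Names
  source.run(users)
  sink.names
end
```

Called with [{ :name => "root", :admin => true }, { :name => "chris", :admin => false }], admin_names returns ["root"]. The filter looks at a field of the record directly; with a byte-based protocol, it would first have to split lines and fields again.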
One example would be a full-scale XML-based website where requested documents go through the method chains, incorporate information received from external databases, get pre-processed a bit and finally get converted into different output formats such as XHTML for ordinary browsers, WML for embedded devices or even PDF for printing.
Last but not least, writing pipes is fun, mainly due to the incremental style. At every point in time, you can see whether things work so far, and you always have an overview of what remains to be done. This helps development a lot, as time estimation gets easier.
1) cm.bell-labs.com/cm/cs/who/dmr/hist.html
2) www.faqs.org/docs/artu/ch11s06.html
5) "The Unix Environment" by Kernighan and Pike, Prentice-Hall, 1984
Copyright © 2004 Christian Neukirchen <chneukirchen@yahoo.de>
Permission is granted to copy, modify and/or distribute copies of this manual provided the copyright notice and this permission notice are preserved on all copies and the entire resulting derived work is distributed under the terms of a permission notice identical to this one.