Revert "Add optional filters to stream.pipe()"

This reverts commit 24aded078fd6838d2f21934e57c7cc8dfd7303d1.
This commit is contained in:
Ryan Dahl 2011-01-01 16:41:39 -08:00
parent 1c7cd4aac3
commit 8f5f213a6f
2 changed files with 3 additions and 84 deletions

View File

@ -65,7 +65,7 @@ Resumes the incoming `'data'` events after a `pause()`.
Closes the underlying file descriptor. Stream will not emit any more events.
### stream.pipe(destination, [options], [filter])
### stream.pipe(destination, [options])
This is a `Stream.prototype` method available on all `Stream`s.
@ -92,48 +92,6 @@ NOTE: If the source stream does not support `pause()` and `resume()`, this funct
adds simple definitions which simply emit `'pause'` and `'resume'` events on
the source stream.
The `filter` argument is an optional callback which can be used to filter all
data passing through the pipe. This makes it easy to do arbitrary transforms
(like gzip) while still maintaining the proper throttling. `filter` gets
three arguments: a buffer, a write function, and a done function. Here is an
example of a chat which uses a `filter` to append each message with the
address of the sender.
var net = require('net');
var people = [];
function address(socket) {
return '<' + socket.remoteAddress + ':' + socket.remotePort + '> ';
}
net.Server(function (socket) {
socket.write("hello!\r\n");
people.forEach(function (p) {
socket.pipe(p, { end: false }, function (d, write, done) {
write(address(socket));
write(d);
done();
});
p.pipe(socket, { end: false }, function (d, write, done) {
write(address(p));
write(d);
done();
});
});
people.push(socket);
socket.on('end', function () {
people.splice(people.indexOf(socket), 1);
});
}).listen(8000);
## Writable Stream
A `Writable Stream` has the following methods, members, and events.

View File

@ -7,55 +7,16 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;
Stream.prototype.pipe = function(dest /* options, filter */) {
Stream.prototype.pipe = function(dest, options) {
var source = this;
// parse arguments
var options, filter;
if (typeof arguments[1] == 'object') {
options = arguments[1];
filter = arguments[2];
} else {
filter = arguments[1];
}
function ondata(chunk) {
// FIXME shouldn't need to test writable - this is working around bug.
// .writable should not change before a 'end' event is fired.
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
}
}
if (!filter) {
source.on('data', ondata);
} else {
//
// TODO: needs tests
//
var wait = false;
var waitQueue = [];
function done () {
wait = false;
// Drain the waitQueue
if (dest.writable && waitQueue.length) {
wait = true;
filter(waitQueue.shift(), ondata, done);
}
}
source.on('data', function (d) {
if (wait) {
waitQueue.push(d);
source.pause();
} else {
wait = true;
filter(d, ondata, done);
}
});
}
source.on('data', ondata);
function ondrain() {
if (source.readable) source.resume();