曲水流觞:上援下推
这篇博客的代码用 JavaScript 编写,力图省去一些不必要的语法细节。大部分现代语言都能复现这些代码,所以编程语言的选择不是最重要的。
循环
处理一组数据时,我们通常会用循环。循环的自由度很高,可以处理各种各样的情况。但是这样带来的问题是,循环的代码通常会变得很复杂,难以理解、维护和复用。这里我们用一个非常简单的例子说明这个问题。
// find the first 3 even squares, but skip the first one
let skip = 1
let take = 3
for (let i = 0; i < 10; i++) {
let sq = i * i;
if (sq % 2 == 0) {
if (skip-- <= 0) {
if (take-- > 0) {
console.log(sq); // prints 4, 16, 36
} else {
break;
}
}
}
}
我们能不能把这个循环的逻辑分解成几个独立的部分,然后组合起来呢?
批处理
一个常见的想法是,将每一步的处理结果保存下来,然后传递给下一步。这里我们给出一个上面例子的批处理版本。
function iota(n) {
let result = []
for (let i = 0; i < n; i++) {
result.push(i);
}
return result;
}
function map(mapper, array) {
let result = [];
for (const item of array) {
result.push(mapper(item));
}
return result;
}
function filter(predicate, array) {
let result = [];
for (const item of array) {
if (predicate(item)) {
result.push(item);
}
}
return result;
}
function take(n, array) {
let result = [];
for (const item of array) {
if (n-- > 0) {
result.push(item);
} else {
break;
}
}
return result;
}
function skip(n, array) {
let result = [];
for (const item of array) {
if (n-- <= 0) {
result.push(item);
}
}
return result;
}
function forEach(fn, array) {
for (const item of array) {
fn(item);
}
}
forEach(console.log, take(3, skip(1, filter(x => x % 2 == 0, map(x => x * x, iota(10))))));
这一套函数在 JavaScript 的数组上已经有了现成的部分实现(map
、filter
、forEach
),slice
和 skip
、take
功能类似但是原理不同。这里写出来是为了更好的说明问题。
这个版本的代码更加清晰,每个函数只做一件事情。但是这个版本的代码有两个问题:
- 每个函数都会生成一个新的数组,但是我们只需要最终的结果。这样会导致空间的浪费。
- 每个函数都会遍历整个数组,但是我们最终只需要三个元素。这样会导致时间的浪费。
作为一个例子,我们请求的数据还很廉价,流程还很简单。但是在真实场景中,如果数据请求成本高昂,流程更加复杂,这两种浪费将不可忽略。我们能不能做的更好呢?
流水线(上)
批处理的做法是一次处理所有元素的一个步骤,完成之后再进入下一个步骤。如果我们能够一次处理一个元素,一次性就把这个元素的所有步骤跑通,那么我们就可以避免上面的两个问题。这就是流水线的思想。
迭代器
批处理中用到的 for...of
就是针对迭代器的一个特性,只不过是一次性遍历所有元素。现在我们要手动操作迭代器实现一个流水线。
为了让不了解 JavaScript 的读者能够理解,我们先给出一个简单的迭代器的例子作为解释。
function iota(n) {
let i = 0;
return {
next() {
if (i < n) {
return { value: i++, done: false };
} else {
return { done: true };
}
}
};
}
这个函数返回一个迭代器,这个迭代器可以生成 0 到 n-1 的整数。
let it = iota(10);
let result = it.next();
while (!result.done) {
console.log(result.value);
result = it.next();
}
这个例子中,我们手动调用了迭代器的 next
方法,直到 done
为 true
。
为了方便起见,我们接下来使用这样一套工具:
function iterate(next) {
return {
[Symbol.iterator]() {
return this;
},
next
};
}
function more(value) {
return { done: false, value };
}
function done() {
return { done: true };
}
第一个函数可以让迭代器本身也是可迭代的对象,这样我们可以直接使用 for...of
语法。后两个函数可以让我们更方便的生成迭代器的返回值。
于是我们可以把批处理的代码重写如下:
function iota(n) {
let i = 0;
return iterate(() => i < n ? more(i++) : done());
}
function map(mapper, upstream) {
return iterate(() => {
let next = upstream.next();
return next.done ? done() : more(mapper(next.value));
});
}
function filter(predicate, upstream) {
return iterate(() => {
let next = upstream.next();
while (!next.done && !predicate(next.value)) {
next = upstream.next();
}
return next;
});
}
function take(n, upstream) {
return iterate(() => n-- > 0 ? upstream.next() : done());
}
function skip(n, upstream) {
return iterate(() => {
while (n-- > 0) {
if (upstream.next().done) {
return done();
}
}
return upstream.next();
});
}
function forEach(fn, upstream) {
for (const item of upstream) {
fn(item);
}
}
function toArray(upstream) {
const result = [];
forEach(item => result.push(item), upstream);
return result;
// or you can use spread operator
// return [...upstream];
}
forEach(console.log, take(3, skip(1, filter(x => x % 2 == 0, map(x => x * x, iota(10))))));
upstream
是上游迭代器,通过构造一个迭代器链,最下游的迭代器被调用一次,会按需调用上游迭代器,直到最上游的迭代器。最上游的迭代器会生成数据,然后数据会一级一级的传递到最下游的迭代器。
迭代器的理念虽好,但是编写迭代器的代码还是有点费脑子。例如 map
和 take
的 next
只需要调用至多一次上游迭代器,而 filter
和 skip
需要在 next
方法中循环调用上游迭代器。这是因为无论上游迭代器提供的数据是否符合条件,都需要为下游迭代器提供数据。同时,维护迭代器的状态也是一个问题。下面我们引入一个新的函数 flatMap
来研究一下。
扁平化(上)
有的时候我们需要把一个元素映射成多个元素,然后再处理。例如
let result = []
for (let i = 0; i < 3; i++) {
for (let j = 0; j <= i; j++) {
result.push(j);
}
}
console.log(result); // prints [0, 0, 1, 0, 1, 2]
如果这里用 map
和 iota
函数,我们会得到一个数组的数组,而不是一个扁平的数组。这就需要 flatMap
,它可以把一个元素映射成一个迭代器,然后把这个迭代器的所有元素串联起来。我们先给出它的批处理版本。
function flatMap(mapper, array) {
let result = [];
for (const item of array) {
for (const subitem of mapper(item)) {
result.push(subitem);
}
// or you can use spread operator:
// result.push(...mapper(item));
}
return result;
}
console.log(flatMap(x => iota(x + 1), iota(3))); // prints [0, 0, 1, 0, 1, 2]
但如果我们要实现一个迭代器版本的 flatMap
,我们会发现这个函数的实现变得非常复杂。
function flatMap(mapper, upstream) {
let iterator = null;
return iterate(() => {
while (true) {
if (iterator === null) {
let next = upstream.next();
if (next.done) {
return done();
}
iterator = mapper(next.value);
}
let next = iterator.next();
if (next.done) {
iterator = null;
continue;
}
return next;
}
});
}
console.log(toArray(flatMap(x => iota(x + 1), iota(3)))); // prints [0, 0, 1, 0, 1, 2]
生成器
为了解决手工编写迭代器代码的难题,我们可以使用生成器协程来简化这个过程。
function*
表明这个函数是一个生成器协程,调用这个函数会返回一个迭代器。函数中的 yield
关键字可以让一个函数在执行过程中暂停,然后再次调用时从暂停的地方继续执行,函数的执行过程就像一个迭代器一样。这样迭代器的状态就可以保存在函数的局部变量中,而不需要手动维护。
function* iota(n) {
for (let i = 0; i < n; i++) {
yield i;
}
}
function* map(mapper, upstream) {
for (const item of upstream) {
yield mapper(item);
}
}
function* filter(predicate, upstream) {
for (const item of upstream) {
if (predicate(item)) {
yield item;
}
}
}
function* take(n, upstream) {
for (const item of upstream) {
if (n-- <= 0) {
break;
}
yield item;
}
}
function* skip(n, upstream) {
for (const item of upstream) {
if (n-- > 0) {
continue;
}
yield item;
}
}
// forEach and toArray are the same as before since they are terminal operations
forEach(console.log, take(3, skip(1, filter(x => x % 2 == 0, map(x => x * x, iota(10))))));
不妨再来看看这个版本的 flatMap
。
function* flatMap(mapper, upstream) {
for (const item of upstream) {
for (const subitem of mapper(item)) {
yield subitem;
}
// or you can use yield* to delegate to another generator
// yield* mapper(item);
}
}
console.log(toArray(flatMap(x => iota(x + 1), iota(3))));
yield*
关键字可以让一个生成器协程委托给另一个生成器协程。生成器内部实际上维护了一个栈,每次调用 next
方法时,会从栈顶取出一个生成器协程执行。生成器耗尽,弹出栈顶的生成器协程,继续执行上一个生成器协程。这样的机制进一步省去了嵌套带来的麻烦。
生成器协程是一种非常强大的工具,可以让我们用一种非常直观的方式来处理迭代器。
链式调用(上)
不过套娃的方式还是有点繁琐,我们可以使用链式调用的方式来简化代码。不同的语言对这块的处理方式不尽相同,这里我们给出一个简单的实现。
对不了解 JavaScript 的读者来说,这里的代码可能有点难以理解,这里做一点简单的解释。
Object.assign
可以把多个对象合并成一个对象。第一个参数是目标对象,后面的参数是源对象。这个函数会把源对象的属性复制到目标对象上,并返回目标对象。
chain
函数的作用就是给上游的迭代器添加 map
、filter
、take
、skip
、flatMap
、forEach
、toArray
这几个方法,这样就可以链式调用了。
function chain(upstream) {
return Object.assign(upstream, {
map: mapper => chain(map(mapper, upstream)),
filter: predicate => chain(filter(predicate, upstream)),
take: n => chain(take(n, upstream)),
skip: n => chain(skip(n, upstream)),
flatMap: mapper => chain(flatMap(mapper, upstream)),
forEach: fn => forEach(fn, upstream),
toArray: () => toArray(upstream)
});
}
function decorate(source) {
return (...args) => chain(source(...args));
}
iota = decorate(iota);
iota(10)
.map(x => x * x)
.filter(x => x % 2 == 0)
.skip(1)
.take(3)
.forEach(console.log);
console.log(iota(3)
.flatMap(x => iota(x + 1))
.toArray());