Skip to main content

RAII

· 3 min read

构造器-析构器

#include <cstdio>

struct Locker {
Locker() {
puts("lock");
}
~Locker() {
puts("unlock");
}
};

int main() {
Locker locker;
}

控制块

class Locker implements AutoCloseable {
public Locker() {
System.out.println("lock");
}

@Override
public void close() {
System.out.println("unlock");
}

public static void main(String[] args) {
try (Locker locker = new Locker()) {

}
}
}

defer 语句

import "fmt"

func main() {
defer fmt.Println("unlock")
fmt.Println("lock")
}

思考题

这样写实现 RAII 有什么问题:

var r1 = open("r1");
var r2 = open("r2");
try {
// ...
} finally {
r1.close();
r2.close();
}

正确的写法应该是:

try (var r1 = open("r1"); var r2 = open("r2")) {
// ...
}

这种写法语前面的写法不同,而是等价于下面的写法:

var r1 = open("r1");
try {
var r2 = open("r2");
try {
// ...
} finally {
r2.close();
}
} finally {
r1.close();
}

因此这个语法糖相当于是把嵌套结构变成并列结构了,大大简化了代码编写。

曲水流觞:上援下推

· 29 min read
note

这篇博客的代码用 JavaScript 编写,力图省去一些不必要的语法细节。大部分现代语言都能复现这些代码,所以编程语言的选择不是最重要的。

循环

处理一组数据时,我们通常会用循环。循环的自由度很高,可以处理各种各样的情况。但是这样带来的问题是,循环的代码通常会变得很复杂,难以理解、维护和复用。这里我们用一个非常简单的例子说明这个问题。

// find the first 3 even squares, but skip the first one
let skip = 1
let take = 3
for (let i = 0; i < 10; i++) {
let sq = i * i;
if (sq % 2 == 0) {
if (skip-- <= 0) {
if (take-- > 0) {
console.log(sq); // prints 4, 16, 36
} else {
break;
}
}
}
}

我们能不能把这个循环的逻辑分解成几个独立的部分,然后组合起来呢?

批处理

一个常见的想法是,将每一步的处理结果保存下来,然后传递给下一步。这里我们给出一个上面例子的批处理版本。

function iota(n) {
let result = []
for (let i = 0; i < n; i++) {
result.push(i);
}
return result;
}

function map(mapper, array) {
let result = [];
for (const item of array) {
result.push(mapper(item));
}
return result;
}

function filter(predicate, array) {
let result = [];
for (const item of array) {
if (predicate(item)) {
result.push(item);
}
}
return result;
}

function take(n, array) {
let result = [];
for (const item of array) {
if (n-- > 0) {
result.push(item);
} else {
break;
}
}
return result;
}

function skip(n, array) {
let result = [];
for (const item of array) {
if (n-- <= 0) {
result.push(item);
}
}
return result;
}

function forEach(fn, array) {
for (const item of array) {
fn(item);
}
}

forEach(console.log, take(3, skip(1, filter(x => x % 2 == 0, map(x => x * x, iota(10))))));
info

这一套函数在 JavaScript 的数组上已经有了现成的部分实现(mapfilterforEach),sliceskiptake 功能类似但是原理不同。这里写出来是为了更好的说明问题。

这个版本的代码更加清晰,每个函数只做一件事情。但是这个版本的代码有两个问题:

  • 每个函数都会生成一个新的数组,但是我们只需要最终的结果。这样会导致空间的浪费。
  • 每个函数都会遍历整个数组,但是我们最终只需要三个元素。这样会导致时间的浪费。

作为一个例子,我们请求的数据还很廉价,流程还很简单。但是在真实场景中,如果数据请求成本高昂,流程更加复杂,这两种浪费将不可忽略。我们能不能做的更好呢?

流水线(上)

批处理的做法是一次处理所有元素的一个步骤,完成之后再进入下一个步骤。如果我们能够一次处理一个元素,一次性就把这个元素的所有步骤跑通,那么我们就可以避免上面的两个问题。这就是流水线的思想。

迭代器

批处理中用到的 for...of 就是针对迭代器的一个特性,只不过是一次性遍历所有元素。现在我们要手动操作迭代器实现一个流水线。

为了让不了解 JavaScript 的读者能够理解,我们先给出一个简单的迭代器的例子作为解释。

function iota(n) {
let i = 0;
return {
next() {
if (i < n) {
return { value: i++, done: false };
} else {
return { done: true };
}
}
};
}

这个函数返回一个迭代器,这个迭代器可以生成 0 到 n-1 的整数。

let it = iota(10);
let result = it.next();
while (!result.done) {
console.log(result.value);
result = it.next();
}

这个例子中,我们手动调用了迭代器的 next 方法,直到 donetrue

为了方便起见,我们接下来使用这样一套工具:

function iterate(next) {
return {
[Symbol.iterator]() {
return this;
},
next
};
}

function more(value) {
return { done: false, value };
}

function done() {
return { done: true };
}

第一个函数可以让迭代器本身也是可迭代的对象,这样我们可以直接使用 for...of 语法。后两个函数可以让我们更方便的生成迭代器的返回值。

于是我们可以把批处理的代码重写如下:

function iota(n) {
let i = 0;
return iterate(() => i < n ? more(i++) : done());
}

function map(mapper, upstream) {
return iterate(() => {
let next = upstream.next();
return next.done ? done() : more(mapper(next.value));
});
}

function filter(predicate, upstream) {
return iterate(() => {
let next = upstream.next();
while (!next.done && !predicate(next.value)) {
next = upstream.next();
}
return next;
});
}

function take(n, upstream) {
return iterate(() => n-- > 0 ? upstream.next() : done());
}

function skip(n, upstream) {
return iterate(() => {
while (n-- > 0) {
if (upstream.next().done) {
return done();
}
}
return upstream.next();
});
}

function forEach(fn, upstream) {
for (const item of upstream) {
fn(item);
}
}

function toArray(upstream) {
const result = [];
forEach(item => result.push(item), upstream);
return result;
// or you can use spread operator
// return [...upstream];
}

forEach(console.log, take(3, skip(1, filter(x => x % 2 == 0, map(x => x * x, iota(10))))));

upstream 是上游迭代器,通过构造一个迭代器链,最下游的迭代器被调用一次,会按需调用上游迭代器,直到最上游的迭代器。最上游的迭代器会生成数据,然后数据会一级一级的传递到最下游的迭代器。

迭代器的理念虽好,但是编写迭代器的代码还是有点费脑子。例如 maptakenext 只需要调用至多一次上游迭代器,而 filterskip 需要在 next 方法中循环调用上游迭代器。这是因为无论上游迭代器提供的数据是否符合条件,都需要为下游迭代器提供数据。同时,维护迭代器的状态也是一个问题。下面我们引入一个新的函数 flatMap 来研究一下。

扁平化(上)

有的时候我们需要把一个元素映射成多个元素,然后再处理。例如

let result = []
for (let i = 0; i < 3; i++) {
for (let j = 0; j <= i; j++) {
result.push(j);
}
}
console.log(result); // prints [0, 0, 1, 0, 1, 2]

如果这里用 mapiota 函数,我们会得到一个数组的数组,而不是一个扁平的数组。这就需要 flatMap,它可以把一个元素映射成一个迭代器,然后把这个迭代器的所有元素串联起来。我们先给出它的批处理版本。

function flatMap(mapper, array) {
let result = [];
for (const item of array) {
for (const subitem of mapper(item)) {
result.push(subitem);
}
// or you can use spread operator:
// result.push(...mapper(item));
}
return result;
}

console.log(flatMap(x => iota(x + 1), iota(3))); // prints [0, 0, 1, 0, 1, 2]

但如果我们要实现一个迭代器版本的 flatMap,我们会发现这个函数的实现变得非常复杂。

function flatMap(mapper, upstream) {
let iterator = null;
return iterate(() => {
while (true) {
if (iterator === null) {
let next = upstream.next();
if (next.done) {
return done();
}
iterator = mapper(next.value);
}
let next = iterator.next();
if (next.done) {
iterator = null;
continue;
}
return next;
}
});
}

console.log(toArray(flatMap(x => iota(x + 1), iota(3)))); // prints [0, 0, 1, 0, 1, 2]

生成器

为了解决手工编写迭代器代码的难题,我们可以使用生成器协程来简化这个过程。

function* 表明这个函数是一个生成器协程,调用这个函数会返回一个迭代器。函数中的 yield 关键字可以让一个函数在执行过程中暂停,然后再次调用时从暂停的地方继续执行,函数的执行过程就像一个迭代器一样。这样迭代器的状态就可以保存在函数的局部变量中,而不需要手动维护。

function* iota(n) {
for (let i = 0; i < n; i++) {
yield i;
}
}

function* map(mapper, upstream) {
for (const item of upstream) {
yield mapper(item);
}
}

function* filter(predicate, upstream) {
for (const item of upstream) {
if (predicate(item)) {
yield item;
}
}
}

function* take(n, upstream) {
for (const item of upstream) {
if (n-- <= 0) {
break;
}
yield item;
}
}

function* skip(n, upstream) {
for (const item of upstream) {
if (n-- > 0) {
continue;
}
yield item;
}
}

// forEach and toArray are the same as before since they are terminal operations

forEach(console.log, take(3, skip(1, filter(x => x % 2 == 0, map(x => x * x, iota(10))))));

不妨再来看看这个版本的 flatMap

function* flatMap(mapper, upstream) {
for (const item of upstream) {
for (const subitem of mapper(item)) {
yield subitem;
}
// or you can use yield* to delegate to another generator
// yield* mapper(item);
}
}

console.log(toArray(flatMap(x => iota(x + 1), iota(3))));

yield* 关键字可以让一个生成器协程委托给另一个生成器协程。生成器内部实际上维护了一个栈,每次调用 next 方法时,会从栈顶取出一个生成器协程执行。生成器耗尽,弹出栈顶的生成器协程,继续执行上一个生成器协程。这样的机制进一步省去了嵌套带来的麻烦。

生成器协程是一种非常强大的工具,可以让我们用一种非常直观的方式来处理迭代器。

链式调用(上)

不过套娃的方式还是有点繁琐,我们可以使用链式调用的方式来简化代码。不同的语言对这块的处理方式不尽相同,这里我们给出一个简单的实现。

info

对不了解 JavaScript 的读者来说,这里的代码可能有点难以理解,这里做一点简单的解释。

Object.assign 可以把多个对象合并成一个对象。第一个参数是目标对象,后面的参数是源对象。这个函数会把源对象的属性复制到目标对象上,并返回目标对象。

chain 函数的作用就是给上游的迭代器添加 mapfiltertakeskipflatMapforEachtoArray 这几个方法,这样就可以链式调用了。

function chain(upstream) {
return Object.assign(upstream, {
map: mapper => chain(map(mapper, upstream)),
filter: predicate => chain(filter(predicate, upstream)),
take: n => chain(take(n, upstream)),
skip: n => chain(skip(n, upstream)),
flatMap: mapper => chain(flatMap(mapper, upstream)),
forEach: fn => forEach(fn, upstream),
toArray: () => toArray(upstream)
});
}

function decorate(source) {
return (...args) => chain(source(...args));
}

iota = decorate(iota);

iota(10)
.map(x => x * x)
.filter(x => x % 2 == 0)
.skip(1)
.take(3)
.forEach(console.log);

console.log(iota(3)
.flatMap(x => iota(x + 1))
.toArray());

流水线(下)

流水线除了带来延时求值的好处,还有一个好处是可以并行处理。但是上面基于迭代器似乎不太容易实现并行处理。归根结底,这是因为

  • 一个迭代器的数据只能传递给下一个迭代器,而不能传递给多个迭代器。
  • 经过层层套娃,数据的源头被隐藏了,无法有效的分割来实现并行处理。

那我们能不能设计一个更加灵活的流水线呢?这需要一些逆向思维——能否不再是下游迭代器主动调用上游迭代器,而是上游迭代器主动把数据传递给下游迭代器呢?

回调

现在我们完全把上下游关系倒置,重新编写如下的流水线。

function iota(n, downstream) {
for (let i = 0; i < n; i++) {
downstream(i);
}
}

function map(mapper, downstream) {
return item => {
downstream(mapper(item));
};
}

function filter(predicate, downstream) {
return item => {
if (predicate(item)) {
downstream(item);
}
};
}

function take(n, downstream) {
return item => {
if (n-- > 0) {
downstream(item);
}
};
}

function skip(n, downstream) {
return item => {
if (n-- <= 0) {
downstream(item);
}
};
}

function forEach(fn) {
return item => {
fn(item);
};
}

function toArray(result) {
return item => {
result.push(item);
};
}

iota(10, map(x => x * x, filter(x => x % 2 == 0, skip(1, take(3, forEach(console.log))))));
let result = [];
iota(10, toArray(result));
console.log(result);

请注意最后一行的套娃顺序,已经完全反转了。与刚才的逻辑完全不同,整个流水线是由源头而非终点驱动的。现在每一个环节都只处理一个元素,然后把这个元素传递给下一个环节。这一点从这几个函数的定义就能看出来,非常简单直接。

短路

这样驱动的流水线有一个问题:当下游希望结束流水线时,上游并不知道。所以需要手动实现一个短路机制。

function iota(n, downstream) {
for (let i = 0; i < n; i++) {
if (downstream.cancelled()) {
return;
}
console.log(`iota: ${i}`); // for demonstration
downstream(i);
}
}

function delegateCancellable(downstream, fn) {
return Object.assign(fn, {
cancelled: () => downstream.cancelled()
});
}

function map(mapper, downstream) {
return delegateCancellable(downstream, item => {
downstream(mapper(item));
});
}

function filter(predicate, downstream) {
return delegateCancellable(downstream, item => {
if (predicate(item)) {
downstream(item);
}
});
}

function take(n, downstream) {
return Object.assign(item => {
if (n-- > 0) {
downstream(item);
}
}, {
cancelled: () => n <= 0 || downstream.cancelled()
});
}

function skip(n, downstream) {
return delegateCancellable(downstream, item => {
if (n-- <= 0) {
downstream(item);
}
});
}

function forEach(fn) {
return Object.assign(item => {
fn(item)
}, {
cancelled: () => false
});
}

iota(10, map(x => x * x, filter(x => x % 2 === 0, skip(1, take(3, forEach(console.log))))));

输出:

iota: 0
iota: 1
iota: 2
4
iota: 3
iota: 4
16
iota: 5
iota: 6
36

这样,我们就可以在任何一个环节中终止流水线。

为了简洁起见,后面的代码中我们省略了这个机制。

链式调用(下)(一)

这个形式看上去似乎不太容易嵌套——想要增加一步操作,在外套一层是很符合直觉的,但是往最里面塞一层却需要一些函数式的技巧。我们给出一个简单的链式调用的实现。

function chain(source) {
return Object.assign(source, {
map: mapper => chain(downstream => source(map(mapper, downstream))),
filter: predicate => chain(downstream => source(filter(predicate, downstream))),
take: n => chain(downstream => source(take(n, downstream))),
skip: n => chain(downstream => source(skip(n, downstream))),
forEach: fn => source(forEach(fn)),
toArray: () => {
let result = [];
source(toArray(result));
return result;
}
});
}

function decorate(source) {
return (...args) => chain(downstream => source(...args, downstream));
}

iota = decorate(iota);

iota(10)
.map(x => x * x)
.filter(x => x % 2 == 0)
.skip(1)
.take(3)
.forEach(console.log);

希望你暂时没被绕晕,因为魔法才刚刚开始,接下来,准备迎接函数式的洗礼吧!

科里化

所谓科里化,就是把一个多参数的函数转换成一系列单参数函数的过程。例如:

function add(x, y) {
return x + y;
}
let sum = add(1, 2);

可以转换成

function add(x) {
return y => x + y;
}
let sum = add(1)(2);

这样的好处是可以更加灵活的组合函数。

在我们的例子中,我们可以把每一个环节都转换成一个单参数函数,然后再组合起来。

function iota(n) {
return downstream => {
for (let i = 0; i < n; i++) {
downstream(i);
}
};
}

function map(mapper) {
return downstream => item => {
downstream(mapper(item));
};
}

function filter(predicate) {
return downstream => item => {
if (predicate(item)) {
downstream(item);
}
};
}

function take(n) {
return downstream => item => {
if (n-- > 0) {
downstream(item);
}
};
}

function skip(n) {
return downstream => item => {
if (n-- <= 0) {
downstream(item);
}
};
}

// forEach and toArray are the same as before since they are terminal operations

iota(10)(map(x => x * x)(filter(x => x % 2 == 0)(skip(1)(take(3)(forEach(console.log))))));

扁平化(下)

tip

开始之前,可以先阅读一下 Lambda Calculus 的介绍。

在流水线倒置之后,循环便从中间的过程中消失了,函数变得异常简单,这也为我们窥视他们的本质提供了一个绝佳的机会。

例如,如果自己观察上面 forEach 的实现,就会发现它其实等价于原封不动的返回了它的参数。

function forEach(fn) {
return item => {
fn(item);
};
// or you can return fn directly
// return fn;
}

如果你自己观察,就会发现这其实是 lambda calculus 中的 eta reduction

λy.xy=x\lambda y.xy = x

map 函数的定义:

function map(mapper) {
return downstream => item => {
downstream(mapper(item));
};
}

可以写成

const map = mapper => downstream => item => downstream(mapper(item));

函数参数的名字并不重要(alpha reduction),略去名字可得

const map = f => g => x => g(f(x));

记作 lambda calculus 中的形式:

map=λf.λg.λx.g(fx)\text{map}=\lambda f.\lambda g.\lambda x.g(fx)

你会意识到这是一种纯粹的函数组合:把 fg 组合起来,先调用 f,然后用结果再调用 g

现在,请你思考一下: flatMap 函数该如何定义?

你会发现,mapper(item) 返回的对象,实际上就是可以驱动下游的源头。直接将它用于驱动下游,就可以实现扁平化的效果。

function flatMap(mapper) {
return downstream => item => {
mapper(item)(downstream);
};
}

let result = [];
iota(3)(flatMap(x => iota(x + 1))(toArray(result)));
console.log(result);

也可以写成

const flatMap = mapper => downstream => item => mapper(item)(downstream);

略去名字可得

const flatMap = f => g => x => f(x)(g);

记作 lambda calculus 中的形式:

flatMap=λf.λg.λx.fxg\text{flatMap}=\lambda f.\lambda g.\lambda x.fxg

在这之前,你可能根本不会相信,f => g => x => g(f(x))f => g => x => f(x)(g) 竟然有这样的含义。

链式调用(下)(二)

我们不妨重新实现一次链式调用,但利用科里化的优势,采取不同的策略——把每一步存起来,到最后再进行组合。这个策略的好处在后面的小节会有所体现。

function chain(source) {
let actions = []

return Object.assign(source, {
pipe(action) {
actions.push(action)
return this
},
run(terminal) {
let dowstream = terminal;
while (actions.length > 0) {
dowstream = actions.pop()(dowstream)
}
source(dowstream)
},
map: mapper => source.pipe(map(mapper)),
filter: predicate => source.pipe(filter(predicate)),
take: n => source.pipe(take(n)),
skip: n => source.pipe(skip(n)),
forEach: fn => source.run(forEach(fn)),
toArray: () => {
let result = [];
source.run(toArray(result));
return result;
}
});
}

function decorate(source) {
return (...args) => chain(source(...args));
}

iota = decorate(iota)

iota(10)
.map(x => x * x)
.filter(x => x % 2 == 0)
.skip(1)
.take(3)
.forEach(console.log);

分割与并行

有了以上的内容作为铺垫,我们现在可以很容易的实现并行处理。

warning

很遗憾,一般来说 JavaScript 是单线程的,而 Web Worker 太重量级不适用于我们这种情景。所以接下来实现的并行只是一种模拟,大家就当咱只有一个核心的 CPU 就好了。如果你采用的是其他语言,可以很容易的实现真正的并行。

实现并行的核心在于,将数据源分割成两部分,然后分别传递给两个下游。所以我们需要先添加一个 split 方法。

function half(n) {
return Math.floor(n / 2);
}

function iota(n) {
return Object.assign(downstream => {
for (let i = 0; i < n; i++) {
downstream(i);
}
}, {
split: () => [range(0, half(n)), range(half(n), n)]
});
}

function range(start, end) {
return Object.assign(downstream => iota(end - start)(map(x => x + start)(downstream)), {
split: () => [range(start, half(start + end)), range(half(start + end), end)]
});
}

然后稍微改造一下我们的链式调用函数。这里不妨让我们假设 || 就是我添加的并行操作符。我故意让右边的操作符先执行,这样就可以模拟并行乱序的效果。

function chain(source) {
let actions = []
let is_parallel = false

return Object.assign(source, {
parallel() {
is_parallel = true
return this
},
pipe(action) {
actions.push(action)
return this
},
run(terminal) {
let downstream = terminal;
while (actions.length > 0) {
downstream = actions.pop()(downstream)
}
if (is_parallel) {
let [left, right] = source.split()
right(downstream) || left(downstream) // fake parallelism
} else {
source(downstream)
}
},
map: mapper => source.pipe(map(mapper)),
filter: predicate => source.pipe(filter(predicate)),
forEach: fn => source.run(forEach(fn))
});
}

function decorate(source) {
return (...args) => chain(source(...args))
}

iota = decorate(iota)
range = decorate(range)

range(2, 10)
.map(x => x * x)
.filter(x => x % 2 == 0)
.parallel()
.forEach(console.log) // prints 36, 64, 4, 16

对此,我们可以画一张示意图:

状态和顺序

你可能注意到我这里只用了 mapfiltertakeskip 似乎被省略了。这是因为

  • mapfilter 是无状态的,而 takeskip 是有状态的。
  • mapfilter 是无序的,而 takeskip 是有序的。

所谓有状态,是指一个函数的输出依赖于之前的输入。例如 take 函数,它需要记录已经处理了多少个元素。在并行化的情况下,这种状态是无法共享的。

所谓有序,是指元素之间有顺序关系。例如 skip 函数,它跳过的是开头的几个元素。在并行化的情况下,这种顺序是无法保证的。

有状态不一定要求有序,例如如果我们为我们的流水线添加一个 sort(),它是有状态的,但是不要求有序。有序一般要求有状态,不然这种序无法体现。

想要解决这个问题,我们就需要在遇到有状态和有序的函数时,将数据进行汇总,处理完之后然后再分发。大致的思路是这样的:

一般来说,子序列内部是有序的,但是子序列之间是无序的。所以合并的时候可以参考源头的分割的顺序。

不过,如果数据源本身就是无序的(如哈希表),那么 take 可以在所有线程取至多任意 n 元素后合并再取 n 个元素,这样就可以并行处理了。但是对于大多数情况,这样的技巧并不适用。

总的来说,这一部分的处理是比较复杂的,我就不给出代码了,留作习题

反思:曲水流殇,上下其手

在上面的文章里,我们介绍了流水线一些优点,如延时求值、短路求值、并行求值。流水线优点那么多,我应该处处使用它们吗?

当然不是。流水线同时也带来了很多问题,包括但不限于:

  • 流水线的调试比较困难,层层嵌套的函数调用栈会让你很难找到问题所在。
  • 流水线的结构会消耗很多内存,有时可能比要处理的数据本身还多,这对 GC 语言来说是不小的负担。
  • 流水线之间的操作难于协作,无法进行整体优化。(如将 sorttake 合并为 top-k 算法)
  • 流水线库提供的操作有限,强行复用代码丑陋,自己实现则比较麻烦,因此相比循环灵活性受限。
  • 流水线破坏了数据的局部性(尤其是对于小数组而言),这可能会导致缓存失效,性能下降。

正所谓,当你的手里只有一把锤子,那么所有的问题都会变成钉子。流水线是一种很好的工具,但是并不是万能的。有时候,我们还是需要回到循环和批处理的怀抱。

后记

标题是是两个成语,前者指流水线,后者表示本文的两个篇章——从上游获取元素(上拉)和向下游传递元素(下推)。

流水线在数据库管理系统里面又叫“火山模型”。批处理和流水线的选择在数据库管理系统中也是一个重要的问题。

很多语言的标准库提供对迭代器的操作(如 Python、Rust),这都是上拉流水线的实现。

我有一个 C++ 版本的上拉流水线的简单实现,可供参考

本文下推流水线部分的灵感来自于 Scala 和 Java 标准库的 Stream。两者用于分割的类分别叫做 StepperSpliterator

同为 JVM 语言的 Kotlin 标准库则同时提供了批处理和流水线两种风格的函数。其中流水线 Sequence 采用的是上拉。正如它名字所述,与另外两个语言的 Stream 不同,Sequence 只能串行,不能并行。

本文还有一个姊妹篇:迭代器的接口对比,介绍了不同语言的迭代器接口异同。

迭代器的接口对比

· 6 min read

迭代器的接口需要包含三种基本操作:

  • 判断是否还有元素
  • 获取当前元素
  • 移动到下一个元素

如图所示,迭代器的接口设计主要有下面几个流派:

C++ 的设计

C++ 将这三种操作分别封装成三个函数,分别是 operator!=operator*operator++

for (auto it = v.begin(); it != v.end(); ++it) {
std::cout << *it << std::endl;
}

这一设计最大的好处就是兼容了 C 语言的指针语义,使得指向数组的指针可以直接像迭代器一样使用。同时由于迭代器的三种操作是分开的,操作起来可以更加灵活。

但是问题也很明显,那就是迭代器本身不是自包含的,需要额外提供一个 end() 函数返回的迭代器作为终止条件。在遍历数组或链表时,这个迭代器尚且可以表示 past-the-last 元素,但是对于其他数据结构,这个迭代器可能就没有实际意义了。

C++20 引入了一种称为哨兵迭代器的设计。采用这种设计的迭代器不再需要专门准备一个终止迭代器,而是使用通用的无状态的哨兵迭代器。在与哨兵迭代器比较时,这类迭代器直接通过本身的状态来判断是否还有元素,使得迭代器实现了自包含。

Java 的设计

Java 将这三种操作封装成两个函数,分别是 hasNextnextnext 在返回当前元素的同时,还会移动到下一个元素。当然还有一种理解,认为 Java 的迭代器指向两个元素之间的空隙,next 的时候移动到下一个空隙的同时返回越过的元素。

Iterator<Integer> it = v.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}

Java 的设计需要迭代器有预测性,即我们希望在实际调用 next 之前就能通过 hasNext 知道是否还有元素。对于基于容器的迭代器来说,这可能比较容易实现。但是对于其他场景(如生成器协程),这种设计就不太适用了。因此在一些 Java 的迭代器实现中,hasNext 就已经计算出了下一个元素并缓存在了迭代器内部,next 直接返回这个缓存的元素。这会导致看似开销较小的 hasNext 操作实际上是一个隐含的 next 操作,而 next 操作并没有执行真正的计算。同时还需要注意,这里的 hasNext 应该被设计为幂等的操作,即多次调用 hasNext 应该返回相同的结果,且不影响 next 的结果。

C# 的设计

C# 将这三种操作封装成两个函数,分别是 MoveNextCurrentMoveNext 在判断是否还有元素的同时,还会移动到下一个元素。

IEnumerator<int> it = v.GetEnumerator();
while (it.MoveNext()) {
Console.WriteLine(it.Current);
}

与 Java 相比,C# 的设计是反其道而行之。幂等的操作变成了 Current,这导致所有的计算都应该发生在 MoveNext 之中,Current 只应该是一个访问操作。可以是访问容器,也可以是访问迭代器中的缓存。当然,如果你执意要在 Current 计算,也不是不可以。

Rust 和 JavaScript 的设计

Rust 将这三种操作封装成一个函数,即 nextnext 在返回当前元素的同时,还会移动到下一个元素。根据返回的 Option 类型来判断是否还有元素,有则返回 Some,无则返回 None

let mut it = v.iter();
while let Some(x) = it.next() {
println!("{}", x);
}

JavaScript 的迭代器设计与 Rust 类似,也是将这三种操作封装成一个函数,即 nextnext 在返回当前元素的同时,还会移动到下一个元素。根据返回的 done 字段来判断是否还有元素。

let it = v[Symbol.iterator]();
let next = it.next();
while (!next.done) {
console.log(next.value);
next = it.next();
}

这两个语言直接合并了三个操作,彻底抛弃了在迭代器中可能存在的缓存。返回的类型被套上了一层壳,应该先判断存在与否,再进行使用。

Python 的设计

Python __next__ 在返回当前元素的同时,还会移动到下一个元素。当元素遍历完毕时,会抛出 StopIteration 异常。

it = iter(v)
while True:
try:
print(next(it))
except StopIteration:
break

这种方式虽然比较低效,但实际上也一点都不快。

· One min read
风卷黄叶落寒塘
雨打青阶抹白霜
倏忽远眺凭栏望
何处话凄凉

伏案举手作文章
捶胸顿足写荒唐
蓦然回首寻过往
谁人诉衷肠

定风波·分配寄存器

· One min read
汇编构建 CFG
深度优先标次第
def use 连一起
遍历
溢出变量权重低

调用返回 ra 记
偏移
栈帧指针要对齐
函数开头结尾里
传递
存取 saved by callee

never say never? ——简介现代类型系统

· 17 min read
note

本文之探讨现代类型系统中的一些实践,供大家参考。多有不严谨之处,望学术大佬海涵。

类型与类型系统

我们先介绍一下什么是类型。

这里我们简单的把类型(type)定义为值(value)的集合。

例如有两个取值的布尔类型:

bool := {false, true}

一些类型的取值范围是有限的(例如大部分的基本类型),一些是无限的(如字符串)

字面类型

在不引起混淆的情况下,值本身也可以作为一种类型的记号,只包含这个值本身,即

false := {false}
true := {true}

这样的类型成为字面类型(Literal Type)。

联合类型

接着我们引入联合类型(Union Type)或和类型(Sum Type)的概念:

AB 的联合类型记作 A | B,该类型的值可以取 A 的值,也可以取 B 的值,即在 ABA\cup B 中取值。

那么我们可以把布尔类型定义为:

bool := false | true

例如在 Python 中

from typing import Literal

Mode = Literal['r', 'rb', 'w', 'wb']
TrueType = Literal[True]
def open(file: str, mode: Mode) -> TrueType:
return True

参数 mode 取某几个固定的字面量取值,而返回值一定是 True,这都可以用字面类型很好的表示。用上面的写法,即

Mode := 'r' | 'rb' | 'w' | 'wb'

交类型

既然有 Union Type 就有 Intersect Type。

AB 的交类型(Intersect Type)记作 A & B,该类型的值必须在 ABA\cap B 中取。例如

ReadMode := 'r' | 'rb'
BinaryMode := 'rb' | 'wb'

那么这两个类型的交类型就是 'rb'

积类型

既然有 Sum Type 就有 Product Type。所谓 Product,指的就是笛卡尔积。

AB 的交类型(Product Type)记作 A x B,该类型的值必须在 A×BA\times B 中取。例如

A = 1 | 2
B = a | b | c

那么

A x B = (1, a) | (1, b) | (1, c) | (2, a) | (2, b) | (2, c) 

可以看出,这其实就是元组(Tuple),因此积类型也可以记作 (A, B)

二元组还可以被扩展为 n 元组,这里不再赘述。

一元组的取值与原类型相同,故 (A) = A

零元组非常特殊,只有 () 一个取值,被称为单位类型 Unit

子类型

如果 BAB \subset A,则称 BA 的子类型(Subtyping)。

子类型是一个偏序关系。即满足下面三个性质:

  • 自反性:AAA \subset A
  • 反对称性:ABBA    A=BA\subset B \land B \subset A \implies A = B
  • 传递性:ABBC    ACA\subset B \land B\subset C \implies A\subset C

从数学本质上来说,子类型似乎只是子集在类型系统里的新名字。但由于面向对象编程(OOP)的发扬光大,子类型已经成为了非常重要的一个概念。

面向对象

若一个类型继承自另外一个类型,类图如下所示

根据 OOP 的规则,所有 B 的对象都是一个(is-a)A 的对象。即 bB,bA\forall b \in B, b \in A,也就是说 BAB \subset A。因此我们可以说 BA 的子类型。

一般来说,子类型的对象可以隐式转换为父类型,毕竟子类型的取值范围更小,这样的转换天经地义。这个概念可以推广到一般的类型系统中,如我们上面提到的 bool 类型,类图如下所示

因此,当一个类的所有子类确定,且本身不会有额外的对象(在面向对象里,就是一个抽象类或是接口),那么它的派生关系实际上可以写作所有子类型的联合类型。

显然,根据定义,任意两个类型 A B 都有公共父类型 A | B,同时有公共子类型 A & B,类图如下所示

A & B 常用于表示同时实现两个接口类型。根据定义,它能作为其中任意一个类型的对象被使用。

如果我们要尝试把所有类型的图画出来,你可能会本能的追问两个问题:这个图的顶端是谁?这个图的底端是谁?

其实很多 OOP 语言都实现成了“单根模型”,即所有的类都有一个共同的父类。如 java.lang.Object kotlin.Any System.Object 等等。基于这样的模型,类图的顶端自然就是这个单根。在之后的叙述中,我们把这个类型记作 any,表示所有类型的联合、所有类型的父类型。

null

现在我们来探究一个问题,空指针 null 是什么类型?

你可能会下意识的认为,既然所有类型都可以赋值为 null,那它理所应当是 any 类型的。这忽略了一个事实:只有子类型才可以直接转换为父类型。any 不是除了本身之外任何类型的父类型,所以 null 若是 any 则几乎不能赋值给任何类型。这显然是矛盾的。

稍加思索你会注意到,由于 null 可以赋值给任何类型,所以它实际上是任何类型的子类。即

实际上,还有一种非常容易思考的方法得出这个结论:既然 null 可以赋值给任何类型,那么所有类型的交集,自然而然有且仅有这一个 null 的取值。

“一切类型的子类型”,这一点可能相当的反直觉,但却符合逻辑与实践。许多带 null 语言的类型系统就是这样实现的。

根据类图,我们可以把把 any 称为 Top Type,null 称为 Bottom Type。用 null 作为 Bottom Type 并非是唯一的选择,我们在后面还会提到别的考虑。

函数类型

我们先来考虑一种特殊的类型——函数类型。

所谓函数类型,即用于描述接受某些参数 A,返回某些返回值 B 的类型,记作 A -> B

这里只有一个参数,有多个参数的情况,可以用各个参数类型的笛卡尔积来表示。同理,多个返回值的情况也可以轻松表示。即 (A1, A2, ..., An) -> (B1, B2, ..., Bn)

那么 void 怎么办?考虑到它什么也没有接受/返回,可以用 ()Unit 来表示。这个方案把多个参数和返回值用元组表示的规律相统一,成为了很多现代编程语言的实践。

函数型变

下面我们来讨论函数类型之间的类型关系,尤其是其形变(variance)。假设 BA 的子类型。

协变

考虑下面两个函数及其类型

f: () -> A
g: () -> B

由于 BA 的子类型,所以 g() 的结果是 B 也是 A。那么我们就能说,g 的类型同时也可以是 () -> A。既然所有 () -> B 都是 () -> A,那么前者就是后者的子类型,即

这样函数派生关系与类型派生关系方向相同的型变成为协变(covariance)

逆变

考虑下面两个函数和两个对象,及其类型

f: A -> ()
g: B -> ()
a: A
b: B

由于 BA 的子类型,所以不仅 f(a) 调用是合法的,而且 f(b) 调用也是合法的。那么我们就能说,f 的类型同时也可以是 B -> ()。既然所有 A -> () 都是 B -> (),那么前者就是后者的子类型,即

这样函数派生关系与类型派生关系方向相反的型变成为逆变(contravariance)

不变

考虑下面两个函数及其类型

f: A -> A
g: B -> B

显然对于参数而言,可以构成逆变,对于返回值而言,可以构成协变。但是合在一起,派生关系的方向就相互矛盾了。因此这样的函数类型称为不变(invariance)。而

f: A -> B
g: B -> A

则不同,虽然同时构成协变和逆变,但是两者的派生关系是相互平行的。故最终可以得出 A -> BB -> A 的子类型。

never

永不返回

当我们说,一个函数不返回任何东西,和一个函数不返回的时候,实际上含义是不一样的。前者会返回,只是没有返回值,后者则是不会回到调用函数的地方。一个典型的例子便是 C/C++ 的 exit 函数,声明如下所示:

[[noreturn]] void exit(int status);

在这里,[[noreturn]] 用来标注函数永远不会返回。你可能已经发觉,[[noreturn]] void 这两个在一起似乎有些多余。难道不会返回的函数还能声明为非 void 的情况吗?许多现代语言引入了 never 类型(一些语言称为 nothing),专门用于表示这种不会返回的函数类型。例如上面的函数就可以表示为

exit: int -> never

如果我们想要认为构造出一个返回 never 的函数,最简单的方法莫过于

  • 调用其它返回 never 的函数
  • 死循环
  • 抛出异常

抛出异常看似会返回到上一层栈帧,但实际上不会返回到函数的调用点,故同样是永不返回。

分支语句

考虑下面的语句:

a: A
b: B
value = cond ? a : b;

value 是什么类型?为了保证可以接受两个分支的值,显然应该是 A | B

假如把代码改为

value = cond ? a : throw Exception();

显然两个分支的类型分别是 Anever。由于第二个分支不可能返回,所以 value 的类型必然取一个分支的类型 A。根据前面的公式,即 A | never = A。这个恒等式实际上就说明 neverA\text{never} \subset A

稍加思索你会发现,对于任何类型,这个结果都满足。换句话,任何类型都是 never 的父类型。这也与实践相吻合:既然不会返回,那么实际上可以 never 当做任意类型的值来看待。这或许也说明,前面的 [[noreturn]] void 并列,并非是完全多余的。如果你希望它参与运算,或许换成其它类型也是合理的。

由于 never 实际上不可能有任何的取值,我们可以得出 never=\text{never} = \varnothing,这也符合上面的数学逻辑。

很显然,我们得出了一种新的 Bottom Type 的方案,那就是 never

现在还有个小问题没有解决:如果一个类型系统里面同时有 nevernull,怎么会出现两个 Bottom Type 呢?难道它们两个都是所有类型的子类型?

never?

空语义

要先回答这个问题,我们不妨来看看 null 究竟是什么意思。

null 最初用来表示空指针,引申为,一个不存在的对象。由于人们常常忘记检查它的存在,所以带来的问题比较多。现代的编程语言有两个实际上等价的解决方案:Option<T> 和 Nullability。

一种方法,Option<T> = T | (),即要么有值,要么是 ()。这样在使用的时候就必须先排除是 () 的情形。

另一种方法称为 Nullability,即默认普通的类型 T 不能包含 null,只有显式添加一个问号 T? 的时候,才可以是接受 null。显然 T? = T | null,是 T 的父类型,不能直接转换为 T,进而就需要额外的检查才能使用。

前一种方案往往用于没有 null 的情况。而我们如果要讨论 nevernull 并存,故讨论后一种方案更为合理。

结论

讲到这里其实答案已经呼之欲出,根据 T? = T | null

T==neverT = \varnothing = \text{never},那么 never?=null=null\text{never}? = \varnothing | \text{null} = \text{null}

也就是说,never? = null

根据定义,TT? 的子类型,所以最终我们得出了结论:never 才是真正的 Bottom Type。

我们也可以画出一个完整的,带 Nullability 的类图:

tip

推荐 BGM:Never Enough

表连接算法概览

· 5 min read

一般连接

形式语言与 SQL

连接两个表

RθSR \bowtie_\theta S
select *
from R, S
where theta
select *
from R
join S on theta

连接多个表

θ(R1,R2,,Rn)\Large \bowtie_\theta \normalsize(R_1, R_2, \cdots, R_n)
select *
from R1, R2, ... Rn
where theta
R1θ2R2θnRnR_1 \bowtie_{\theta_2} R_2 \cdots \bowtie_{\theta_{n}} R_n
select *
from R1
join R2 on theta2
...
join Rn on thetan
θ=i=2nθi\theta=\bigwedge_{i=2}^n\theta_i

Nested Loop Join

对于一般情况,多重循环是最朴素的算法

for r in R.tuples():
for s in S.tuples():
if predicate(r, s):
yield join(r, s)

可以通过分块遍历降低 IO 的性能损失

for rb in R.blocks():
for sb in S.blocks():
for r in rb:
for s in sb:
if predicate(r, s):
yield join(r, s)

等值连接

形式语言与 SQL

一种常见的连接是等值连接

关于一个属性的等值连接

R(id,),S(id,)RR.id=S.idSR(id, \cdots), S(id, \cdots)\\ R \bowtie_{R.id=S.id} S
select *
from R, S
where R.id = S.id
select *
from R
join S on R.id = S.id
select *
from R
join S using (id)
select *
from R
natural join S

关于多个属性的等值连接

R(id1,id2,),S(id1,id2,)RR.id1=S.id1R.id2=S.id2SR(id1, id2, \cdots), S(id1, id2, \cdots)\\ R \bowtie_{R.id1=S.id1 \land R.id2=S.id2 \land \cdots} S
select *
from R, S
where R.id1 = S.id1 and R.id2 = S.id2 and ...
select *
from R
join S on R.id1 = S.id1 and R.id2 = S.id2 and ...
select *
from R
join S using (id1, id2, ...)
select *
from R
natural join S

涉及等值查询问题,呼之欲出的就是两种方法:

  • 排序方法
  • 哈希方法

这两种方法在关联容器(即有序和无序的 setdict)的构造中相当有用,在表连接中也是如此。

如果其中一个关系在连接键上有索引,则可以直接采取 Index Nested Loop Join,否则则需要进行下面两种额外的工作。

Sort-Merge Join

RRSS 按照等值连接所需键排序,随后分别遍历两个关系;当遇到左右等值时,将等值的区间取出求笛卡尔积。

import functools
import itertools


def join(p):
r, s = p
return *r, *s


def smj(R, S, cmp):
R.sort(key=functools.cmp_to_key(cmp))
S.sort(key=functools.cmp_to_key(cmp))
ri = iter(R)
si = iter(S)
r = next(ri, None)
s = next(si, None)
while True:
if r is None or s is None:
return
while (c := cmp(r, s)) != 0:
if c < 0:
if (r := next(ri, None)) is None:
return
else:
if (s := next(si, None)) is None:
return

rc = [r]
sc = [s]

while (r := next(ri, None)) is not None and cmp(rc[0], r) == 0:
rc.append(r)

while (s := next(si, None)) is not None and cmp(sc[0], s) == 0:
sc.append(s)

yield from map(join, itertools.product(rc, sc))

如果连接的键是唯一的,则可以更进一步省去笛卡尔积的部分:

def join(r, s):
return *r, *s


def smj(R, S, cmp):
ri = iter(R)
si = iter(S)
while True:
if (r := next(ri, None)) is None or (s := next(si, None)) is None:
return
while (c := cmp(r, s)) != 0:
if c < 0:
if (r := next(ri, None)) is None:
return
else:
if (s := next(si, None)) is None:
return

yield join(r, s)

如果不想编写笛卡尔积的代码,又需要考虑到键不唯一的情况,可以考虑使用迭代器标记恢复之前的过程,但需要谨慎处理其中的边界条件:

struct iterator {
int *p = nullptr, *q = nullptr;
operator bool() const {
return p < q;
}
int operator*() const {
return *p;
}
iterator& operator++() {
++p;
return *this;
}
};

void smj(iterator r, iterator s) {
iterator mark;
while (r and s) {
if (!mark) {
while (*r != *s) {
if (*r < *s) {
++r;
if (!r) return;
} else {
++s;
if (!s) return;
}
}
mark = s;
}
if (*r == *s) {
emit(*r, *s);
++s;
if (s) {
continue;
}
}
s = mark;
++r;
mark = {};
}
}

Hash Join

用等值连接所需的键的哈希值建立 RR 的哈希表,再用 SS 等值连接的键去查询这个哈希表,得到的相同的行进行连接。

def join(r, s):
return *r, *s


def hj(R, S, hsh, eq):
build = {}
for s in S:
build.setdefault(hsh(s), []).append(s)
for r in R:
for s in build.get(hsh(r), []):
if eq(r, s):
yield join(r, s)

光线—三角形相交算法

· 3 min read

背景

在图形学编程中,经常需要求玩家准心对准的物体。该问题可转化为求与玩家视线相交且离玩家最近的三角形面片所对应的物体。因此需要一种光线-三角形相交算法来求出视线是否与某三角形相交、若相交,距离又是多少。下面介绍一种简单的基于参数和矩阵求解的算法。

原理

已知三角形的三个顶点 v0,v1,v2\pmb v_0,\pmb v_1,\pmb v_2,可以给出三角形内点的参数方程:

h=v0+α(v1v0)+β(v2v0)\pmb h=\pmb v_0+\alpha(\pmb v_1- \pmb v_0)+\beta(\pmb v_2- \pmb v_0)

其中参数 α,β\alpha,\beta 满足下面的条件:

α0,β0,α+β1\alpha\ge0,\beta\ge0,\alpha+\beta\le1

e1=v1v0e2=v2v0\pmb e_1=\pmb v_1-\pmb v_0\\ \pmb e_2=\pmb v_2-\pmb v_0\\

则可简记为:

h=v0+αe1+βe2\pmb h=\pmb v_0+\alpha\pmb e_1+\beta\pmb e_2

玩家从 p\pmb p 点往 r\pmb r 方向看去,与三角形所在平面的交点为 h\pmb h 的参数方程:

h=p+tr\pmb h=\pmb p+t\pmb r

其中参数 tt 满足:

t0t\ge0

如图所示:

联立两个方程

v0+αe1+βe2=p+tr\pmb v_0+\alpha\pmb e_1+\beta\pmb e_2=\pmb p+t\pmb r

移项

αe1+βe2tr=pv0\alpha\pmb e_1+\beta\pmb e_2-t\pmb r=\pmb p-\pmb v_0

(e1,e2,r)(αβt)=pv0(\pmb e_1,\pmb e_2,-\pmb r) \left(\begin{array}{r} \alpha\\ \beta\\ t\\ \end{array}\right)=\pmb p-\pmb v_0

A=(e1,e2,r)x=(αβt)b=pv0\pmb A=(\pmb e_1,\pmb e_2,-\pmb r)\\ \pmb x=\left(\begin{array}{r} \alpha\\ \beta\\ t\\ \end{array}\right)\\ \pmb b=\pmb p-\pmb v_0

也就是说

Ax=b\pmb A\pmb x=\pmb b

根据克拉默法则

xj=detAj(b)detAx_j=\dfrac{\det \pmb A_j(\pmb b)}{\det \pmb A}

α=det(b,e2,r)det(e1,e2,r)β=det(e1,b,r)det(e1,e2,r)t=det(e1,e2,b)det(e1,e2,r)\begin{array}{lll} \alpha&=&\dfrac{\det(\pmb b,\pmb e_2,-\pmb r)}{\det(\pmb e_1,\pmb e_2,-\pmb r)}\\ \beta&=&\dfrac{\det(\pmb e_1,\pmb b,-\pmb r)}{\det(\pmb e_1,\pmb e_2,-\pmb r)}\\ t&=&\dfrac{\det(\pmb e_1,\pmb e_2,\pmb b)}{\det(\pmb e_1,\pmb e_2,-\pmb r)} \end{array}

如果计算得出的参数在其限制范围内,则可以认为玩家的视线与三角形相交。解出的参数 tt 即为三角形面片与玩家的距离。

实现

该实现使用了 GLM 库。这里构造的矩阵实际上是所需矩阵的转置,但由于 detAT=detA\det A^T=\det A,因此不影响结果。

函数返回 -1 表示光线与三角形不相交,返回非负数表示三角形面片与玩家的距离。

#include <glm/glm.hpp>

float raytrace(glm::vec3 v0, glm::vec3 v1, glm::vec3 v2, glm::vec3 p, glm::vec3 r) {
glm::vec3 e1 = v1 - v0;
glm::vec3 e2 = v2 - v0;
glm::vec3 b = p - v0;
float den = glm::determinant(glm::mat3(e1, e2, -r));
if (den == 0) return -1;
float nom_u = glm::determinant(glm::mat3(b, e2, -r));
float nom_v = glm::determinant(glm::mat3(e1, b, -r));
float nom_t = glm::determinant(glm::mat3(e1, e2, b));
float u = nom_u / den;
float v = nom_v / den;
float t = nom_t / den;
if (u >= 0.0 && v >= 0.0 && u + v <= 1.0 && t >= 0) {
return t;
}
return -1;
}

<experimental/simd> 初体验

· 3 min read

之前用非标准的 simd 写过矩阵乘法加速,近期注意到 C++ 新的 Technical specifications——Parallelism library extensions v2 加入了 <experimental/simd>。于是我尝试用它重写了一下。

#include <iostream>
#include <string>
#include <functional>
#include <chrono>
#include <vector>
#include <cstring>
using namespace std;


double timeit(std::function<void()> test) {
auto start = std::chrono::system_clock::now();
test();
auto stop = std::chrono::system_clock::now();
std::chrono::duration<double, std::milli> time = stop - start;
return time.count();
}

#include <experimental/simd>
namespace stdx = std::experimental;

struct vec {
constexpr static int N = 256;

stdx::fixed_size_simd<double, 4> a[N];
double& operator[](int x) {
return ((double*) a)[x];
}
const double& operator[](int x) const {
return ((double*) a)[x];
}
double dot(const vec& x) const {
stdx::fixed_size_simd<double, 4> sum = 0;
for (int i = 0; i < N; i++)
sum += a[i] * x.a[i];
return stdx::reduce(sum);
}
};

class matrix {
constexpr static size_t m = 1024, n = 1024;
vector<double> e;
public:
explicit matrix(): e(m * n) {}
void random() {
for (size_t i = 0; i < m; ++i)
for (size_t j = 0; j < n; ++j)
at(i, j) = rand();
}
matrix(matrix const& that) = default;
matrix(matrix&&) = default;
matrix& operator=(matrix const& that) = default;
matrix& operator=(matrix&& that) = default;
double& at(size_t i, size_t j) { return e[i * m + j]; }
double const& at(size_t i, size_t j) const { return e[i * m + j]; }

matrix mul_plain(matrix const& that) const {
size_t p = that.m;
matrix product;
for (size_t i = 0; i < m; ++i)
for (size_t j = 0; j < n; ++j) {
for (size_t k = 0; k < p; ++k)
product.at(i, j) += at(i, k) * that.at(k, j);
}
return product;
}

matrix mul_simd(matrix const& that) const {
vector<vec> lines(m), columns(n);
for (int i = 0; i < m; ++i) {
for (int j = 0; j < n; ++j) {
lines[i][j] = at(i, j);
columns[j][i] = that.at(i, j);
}
}
matrix r;
for (int i = 0; i < m; ++i) {
for (int j = 0; j < n; ++j) {
r.at(i, j) = lines[i].dot(columns[j]);
}
}
return r;
}
};

double timeit(matrix (matrix::* test)(matrix const&) const, int times, const char* impl, double base = 0) {
double time = timeit([=] {
srand(1);
for (int i = 0; i < times; ++i) {
matrix m1, m2;
m1.random(); m2.random();
(m1.*test)(m2);
}
});
printf("'%5s' took %.2f ms to complete %d times 1024 x 1024 matrices multiplication", impl, time, times);
if (base && base != time) {
printf(", %.2f%% ", abs(base - time) / base * 100);
if (base > time) printf("faster");
else printf("slower");
}
puts("");
return time;
}


int main() {
double base = timeit(&matrix::mul_plain, 1, "plain");
timeit(&matrix::mul_simd, 1, "simd", base);
}

输出:

'plain' took 5458.48 ms to complete 1 times 1024 x 1024 matrices multiplication
' simd' took 232.27 ms to complete 1 times 1024 x 1024 matrices multiplication, 95.74% faster

速通统计学

· 2 min read

本文为 UESTC 《概率论与数理统计》课程统计学部分(6-10章)期末考试复习速通笔记,旨在背多分。

四个统计学分布

正态分布

XN(μ,σ2)E(X)=μ,D(X)=σ2X\sim N(\mu, \sigma^2)\\ E(X)=\mu,D(X)=\sigma^2

卡方分布

χ2=i=1nXi2χ2(n) 其中 XN(0,1)E(χ2)=n,D(χ2)=2n\chi^2=\sum_{i=1}^n X_i^2\sim\chi^2(n)\\ \text{ 其中 } X \sim N(0, 1)\\ E(\chi^2)=n,D(\chi^2)=2n

t 分布

又叫学生分布

T=XY/nt(n) 其中 XN(0,1),Yχ2(n)T=\dfrac{X}{\sqrt{Y/n}} \sim t(n)\\ \text{ 其中 } X \sim N(0, 1), Y\sim\chi^2(n)

F 分布

又叫费希尔(Fisher)分布

F=X/n1Y/n2F(n1,n2) 其中 Xχ2(n1),Yχ2(n2)F=\dfrac{X/n_1}{Y/n_2}\sim F(n_1,n_2)\\ \text{ 其中 } X\sim\chi^2(n_1), Y\sim\chi^2(n_2)

参数估计与假设检验

距估计

μ^=Xσ^2=n1nS2\hat{\mu}=\overline{X}\\ \hat{\sigma}^2=\dfrac{n-1}nS^2

极大似然估计

核心思想是使似然函数 LL 取极大值。即求出一个 θ\theta,使取出这个样本的概率最大。

L=i=1nP(Xi=xi)lnL=i=1nlnP(Xi=xi)lnLθ=0L=\prod_{i=1}^nP(X_i=x_i)\\ \ln L=\sum_{i=1}^n \ln P(X_i=x_i)\\ \dfrac{\partial \ln L}{\partial\theta}=0

区间估计与假设检验

  • 估计或检验 μ\mu,当 σ\sigma 已知时
U=Xμσ/nN(0,1)U=\dfrac{\overline{X}-\mu}{\sigma/\sqrt{n}}\sim N(0,1)
  • 估计或检验 μ\mu,当 σ\sigma 未知时
T=XμS/nt(n1)T=\dfrac{\overline{X}-\mu}{S/\sqrt{n}}\sim t(n-1)
  • 估计或检验 σ\sigma,当 μ\mu 未知时
χ2=n1σ2S2χ2(n1)\chi^2=\dfrac{n-1}{\sigma^2}S^2\sim\chi^2(n-1)

第一类错误:弃真(犯错概率为 α\alpha

第二类错误:纳伪

线性回归

lxy=i=1nxiyinxˉyˉlxx=i=1nxi2nxˉ2lyy=i=1nyi2nyˉ2b^=lxylxx,a^=yˉb^xˉσ^2=lyyb^2lxxn2R=lxylxxlyy>Rα(n2)l_{xy}=\sum_{i=1}^nx_iy_i-n\bar{x}\bar{y}\\ l_{xx}=\sum_{i=1}^nx_i^2-n\bar{x}^2\\ l_{yy}=\sum_{i=1}^ny_i^2-n\bar{y}^2\\ \hat{b}=\dfrac{l_{xy}}{l_{xx}}, \hat{a}=\bar{y}-\hat{b}\bar{x}\\ \hat{\sigma}^2=\dfrac{l_{yy}-\hat{b}^2l_{xx}}{n-2}\\ R=\dfrac{l_{xy}}{\sqrt{l_{xx}l_{yy}}}>R_\alpha(n-2)