Grand Central Dispatch

0x00 前言

这篇文章是自己在学习 CFHipsterRef Chapter 5 Grand Celtral Dispatch 时做的笔记。

0x01 Grand Central Dispatch

关于过程控制(process control)的最引人注目的事情之一是如何接近现实匹配计算机的领域模型(reality matches the domain models of computers)。你不会走在街上偶然发现一个字符串(String),或者决定用 socket 来进行一个午餐对话。但是你会排队,等待一个过马路的信号。

确定如何最好地安排资源,以执行工作是直接适用于日常生活的编程。

也许是这个原因,并发性是程序员幽默的支柱。这的确是有一些幽默元素,因为,说实话,线程真的很难做正确。

抛开幽默的哲学,有一件事情是清楚的:对于并发编程,Grand Central Dispatch 是非常棒的。

Grand Central Dispatch (GCD) 是在多处理器系统上的性能优化技术。在 iOS 4 和 OS X 10.6 的 C 语言扩展中介绍的,GCD 可用于整个 Cocoa APIs,是程序变得更快更有效。

Apple 的 GCD 实现也是开源的,可以从 Mac OS Forge 上下载。

0x02 队列(Queues)

在 GCD 中,工作是被分成离散的块或函数,它们在调度队列(dispatch queues)上被调度(scheduled)。队列是从程序员的线程概念中抽象而来的。系统提供了一个主队列(main queue),用来执行一些在主线程(main thread)上的工作,另外提供了几个在不同优先级的后台线程上执行的全局队列,除此之外,用户还可以创建自己的队列。

自定义队列可以串行(serial)(同时只执行一个任务)或并行(concurrent)(同时执行多个任务)。

1
2
3
4
5
6
7
dispatch_queue_t mainQueue = dispatch_get_main_queue();

dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_queue_t customSerialQueue = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);

dispatch_queue_t customConcurrentQueue = dispatch_queue_create("com.example.concurrent", DISPATCH_QUEUE_CONCURRENT);

工作被调度在一个队列中同步或者异步地执行。指定同步执行将会使队列等待直到 block 或者 函数结束,而异步则是直接执行下一个语句而不等待。

1
2
3
4
5
dispatch_queue_t mainQueue = dispatch_get_main_queue();
dispatch_sync(mainQueue, ^{
sleep(3);
NSLog(@"Finished");
});

一个 GCD 常见的模式是将工作调度到后台队列(background queue)执行,然后将结果返回主队列。比如像更新 UI 这种特别重要的事情,是需要在主线程完成的。

1
2
3
4
5
6
7
8
9
dispatch_queue_t mainQueue = dispatch_get_main_queue();
dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(globalQueue, ^{
sleep(3);

dispatch_async(mainQueue, ^{
NSLog(@"Finished");
});
});

任务也可以被调度到一个指定延迟的队列上运行:

1
2
3
4
5
dispatch_queue_t mainQueue = dispatch_get_main_queue();
int64_t delay = 1 * NSEC_PER_SEC;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, delay), mainQueue, ^{
NSLog(@"Finished");
});

GCD 队列一个鲜为人知但又有用的功能是在整数范围上应用一个 block。当在并发队列上运行时,dispatch_apply 提供了一个 for 循环的高并发替代方案:

1
2
3
4
5
6
7
dispatch_queue_attr_t attributes = DISPATCH_QUEUE_CONCURRENT;
dispatch_queue_t queue = dispatch_queue_create(NULL, attributes);
dispatch_apply(1000, queue, ^(size_t n) {
size_t square = n * n;
printf("%zu: %zu\n", square);
sleep(1);
});

在前面章节中提到过的,GCD 可以用来实现强大的,线程安全的常用原子操作(common atomic operations)的实现。例如,dispatch_once 可以保证一个语句只执行一次 — 使其非常适合用来创建单例:

1
2
3
4
5
6
7
8
9
+ (instancetype)sharedInstance {
static id _sharedInstance = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
_sharedInstance = [[self alloc] init];
});

return _sharedInstance;
}

0x03 组(Groups)

任务也可以被调度到组中,提供了一个当这个组中全部任务执行结束的回调(callback):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_CONCURRENT);
dispatch_group_t group = dispatch_group_create();

dispatch_apply(10, queue, ^(size_t n) {
dispatch_group_enter(group);
dispatch_group_async(group, queue, ^{
sleep((int)n);
dispatch_group_leave(group);
});
});

dispatch_group_notify(group, queue, ^{
// ...
});

0x04 信号量(Semaphores)

信号量在 GCD 中扮演着至关重要的角色,通过允许异步代码等待或者阻塞执行,从而变得同步。对于很多应用程序而言,异步执行是完全优先的选择。然而在有些情况下,一个 API 必须同步运行。而在这些情况下,信号量就非常有用了:

1
2
3
4
5
6
7
8
9
10
dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_CONCURRENT);

dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);

dispatch_async(queue, ^{
sleep(3);
dispatch_semaphore_signal(semaphore);
});

dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);

0x05 Barrier

Barrier 是前面章节探讨的另一个概念。在 GCD 中,barrier 通常用于同步对共享状态的访问。

不使用 Barrier:

1
2
3
4
5
6
7
8
9
10
11
12
13
dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_CONCURRENT);

dispatch_async(queue, ^{
sleep(3);

dispatch_async(queue, ^{
sleep(5);

dispatch_async(dispatch_get_main_queue(), ^{
NSLog(@"Finished");
});
});
});

使用 Barrier:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_CONCURRENT);

dispatch_async(queue, ^{
sleep(5);
});

dispatch_async(queue, ^{
sleep(3);
});

dispatch_barrier_async(queue, ^{
dispatch_async(dispatch_get_main_queue(), ^{
// ...
});
});

这个方法对于改变一个集合(比如数组或字典)非常有用:

1
2
3
4
5
6
7
8
@property NSMutableDictionary *mutableDictionary;
@property dispatch_queue_t queue;

- (void)setObject:(id)object forKey:(id)key {
dispatch_barrier_async(self.queue, ^{
self.mutableDictionary[key] = object;
});
}

0x06 Sources

GCD 可以用来处理来自定时器(timers),进程(processes),Mach 端口(mach ports)和文件描述符(file descriptors)等事件。Dispatch sources 创建之后是挂起状态的,必须显示恢复(resumed)才能启动。

一个定时器的 dispatch event source 可以被认为是 dispatch_after 的一个更灵活的替代方法,具有被取消的能力,并且还提供最小化时间错位指令的性能影响的余地:

1
2
3
4
5
6
7
8
9
10
11
12
13
dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_CONCURRENT);

dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);

int64_t delay = 30 * NSEC_PER_SEC;
int64_t leeway = 5 * NSEC_PER_SEC;
dispatch_source_set_timer(timer, DISPATCH_TIME_NEW, delay, leeway);

dispatch_source_set_event_handler(timer, ^{
NSLog(@"Ding Dong!");
});

dispatch_resume(timer);

可以创建一个文件描述符的 dispatch event 来监听一个文件或文件夹的变化。无论何时触发了一个监视事件,事件 handler 都会被调度到指定的队列上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

NSURL *fileURL = [[[NSFileManager defaultManager] URLsForDirectory:NSDocumentDirectory inDomains:NSUserDomainMask] firstObject];

int fileDescriptor = open([fileURL fileSystemRepresentation], O_EVTONLY);

unsigned long mask = DISPATCH_VNODE_EXTEND | DISPATCH_VNODE_WRITE | DISPATCH_VNODE_DELETE;
__block dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, fileDescriptor, mask, queue);

dispatch_source_set_event_handler(source, ^{
dispatch_source_vnode_flags_t flags = dispatch_source_get_date(source);

if (flags) {
dispatch_source_cancel(source);
dispatch_async(dispatch_get_main_queue(), ^{
// ...
});
}
});

dispatch_source_set_cancel_handler(source, ^{
close(fileDescriptor);
});

dispatch_resume(source);

可以使用类似的方法从 STDIN 读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_source_t stdinReadSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, STDIN_FILEDO, 0, globalQueue);

dispatch_source_set_event_handler(stdinReadSource), ^{
uint8_t buffer[1024];
int length = read(STDIN_FILENO, buffer, sizeof(buffer));
if (length > 0) {
NSString *string = [[NSString alloc] initWithBytes:buffer length:length encoding:NSUTF8StringEncoding];
NSLog(@"%@", string);
}
};

dispatch_resume(stdinReadSource);

最后,一个 dispatch source 可以监听过程信号(process signals),例如 SIGTERM

1
2
3
4
5
6
7
8
9
10
11
12
13
pid_t ppid = getppid();

dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_PROC, ppid, DISPATCH_PROC_EXIT, globalQueue);
if (source) {
dispatch_source_set_event_handler(source, ^{
NSLog(@"pid: %d Exited", ppid);
dispatch_source_cancel(source);
});

dispatch_resume(source);
}

0x07 I/O

尽管 dispatch source 提供了与输入(input)和输出(output)非常方便的交互(interact),但它需要 API 调用者相当一部分的责任。GCD 的 I/O APIs 允许开发者交出大部分的责任,这不仅减少了需要编写的代码,而且通过减少资源争用而大大提高了并发 I/O 操作的总量。

Dispatch I/O APIs 在通道上操作(operate on channels)。每个通道管理一个文件描述符,作为流读取数据或者允许内容的随机访问。当一个通道被创建时,它控制文件描述符直到以下情况之一发生:

  • 通道关闭
  • 通道内所有的引用被释放
  • 发生错误

以下是如何为 STDIN 创建 dispatch channel:

1
2
3
4
5
6
7
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_io_t stdinChannel = dispatch_io_create(DISPATCH_IO_STREAM, STDIN_FILENO, queue, ^(int error) {
if (error) {
NSLog(@"stdin: (%d) %s", error, strerror(error));
}
});

在从通道读取之前,重要的是根据所需的用例来调整它。使用 I/O 的关键因素是确定处理数据的频率。你可以通过以下两种方式之一做到:

  • 累积等待一定量的数据
  • 通过等待一定量的时间

指定构成一定量有意义的数据可以通过设置一个低和高水位标记(low and high water mark) — 在调用 handler 之前收集的最小和最大的数据量实现。在读取 STDIN 的情况下,输入通常是交互式的,因此设置一个单字节的低水位是有意义的:

1
dispatch_io_set_low_water(stdinChannel, 1);

虽然在此例中没有意义,但是相应的 dispatch_io_set_low_water 函数可以将其上限从默认的 SIZE_MAX 值降低到一个更合理的值,像解析通过 socket 大量数据的情况。

指定处理数据之间等待的时间间隔可以使用 dispatch_io_set_interval 函数来完成,它接收纳秒级分辨率的时间间隔。再一次,这不是一个来自 STDIN 非常合适的处理,但是这是一个非常好的方法,比如从外设捕获音频或者视频数据进行采样(Again, this isn’t a great fit for processing from STDIN, but it would be a great approach for things like capturing audio or video data from a peripheral for sampling.)。

一旦一个通道配置完成,就可以开始读取数据:

1
2
3
4
5
off_t offset = 0;          // Ignored for stream
UInt length = SIZE_MAX; // Read until EOF
dispatch_io_read(stdinChannel, offset, length, queue, ^(bool done, dispatch_data_t date, int error) {
// ...
});

数据也可以以类似的方式写入通道。使一个基本的 logging 工具为文件路径创建一个新的通道:

1
2
3
4
5
6
7
8
9
10
11
dispatch_io_t fileChannel = dispatch_io_create_with_path(DISPATCH_IO_STREAM, "/path/to/file", O_RDONLY, 0, queue, ^(int error) {
if (error) {
NSLog(@"file: (%d) %s", error, strerror(error));
}
});

dispatch_io_read(stdinChannel, offset, length, queue, ^(bool done, dispatch_data_t data, int error) {
if (data) {
dispatch_write(fileChannel, 0, data, queue, nil);
}
});

或者,GCD 还提供 dispatch_readdispatch_write,它们是为 dispatch_io_readdispatch_io_write 构建的简便方法用于简单一次性的 I/O 操作,

0x08 Data

第一次介绍时,dispatch 数据对象在 Apple 的 SDKs 中是唯一的,作为一个连续和非连续数据容器。主要的含义是两个数据对象可以在恒定的时间联系起来,而不必复制到单个连续段内。从 iOS 7 和 OS X 10.9 开始,NSData 添加了对非连续访问的支持,以及来自 dispatch_data_t 对象的单向转换。

也许最好理解 dispatch data 的方法是说它在底层 C 接口中具有所有 NSData 的简便性。

dispatch_data_create 从一个缓冲区(buffer)构造了一个 dispatch data 对象:

1
2
3
4
size_t length;
void *buffer = malloc(length);
dispatch_data_t data = dispatch_data_create(buffer, length, NULL, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
free(buffer);

它甚至能够通过传递析构函数 DISPATCH_DATA_DESTRUCTOR_FREE 来自动调用 free 函数。

dispatch_data_create_concat 可以通过联系两个已存在的对象创建一个新的 dispatch data 对象:

1
2
dispatch_data_t first, second;
dispatch_data_t combined = dispatch_data_create_concat(first, second);

使用这个,并利用 NSData 中新的 enumerateByteRangesUsingBlock: 方法,可以创建一个函数来从 NSData 构造一个 dispatch data 对象(然而框架提只提供了 NSDatadispatch_data_t):

1
2
3
4
5
6
7
8
9
10
11
12
13
dispatch_data_t dispatch_data_create_with_nsdata(NSData *data) {
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

__block dispatch_data_t container;
[data enumerateByteRangesUsingBlock:^(const void *bytes, NSRange byteRange, BOOL *stop) {
if (container) {
dispatch_data_t region = dispatch_data_create(bytes, byteRange.length, queue, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
container = dispatch_data_create_concat(container, region);
}
}];

return container;
};

GCD 中相当于 NSData- enumerateByteRangesUsingBlock:dispatch_data_apply,它为 container 中包含的每个内存区域(memory region)执行一个 block:

1
2
3
4
dispatch_data_apply(data, ^(dispatch_data_t region, size_t offset, const void *buffer, size_t size) {
// ...
return true;
});

或者,当与在单个连续缓冲区(a single, contiguous buffer)上操作的调用交互的时候,请使用 dispatch_data_create_map

1
2
3
void *buffer;
size_t length;
dispatch_data_create_map(data, &buffer, &length);

0x09 Debugging

从 iOS 6.0 和 OS X 10.8 开始,GCD 类型是完整的 NSObject 子类,响应 - debugDescription 方法。这意味着在 lldb 中执行 po 将会返回有用的诊断输出信息,例如:

1
2
3
4
5
6
7
8
9
10
11
12
<OS_dispatch_queue_root:
com.apple.root.default-priority[0x2024100] = {
xrefcnt = 0x80000000,
refcnt = 0x80000000,
suspend_cnt = 0x0,
locked = 1,
target = [0x0],
width = 0x7fffffff,
running = 0x1,
barrier = 0
}
>

0x0A Benchmarking

dispatch_benchmark 是 libdispatch 的一部分,但不是公开可用的。要想使用,必须重新声明:

1
uint64_t dispatch_benchmark(size_t count, void (^block)(void));

dispatch_benchmark 执行一个指定次数的 block,然后返回执行的平均运行时间(以纳秒为单位):

1
2
3
4
5
6
7
8
9
10
11
12
size_t const objectCount = 1000;
uint64_t n = dispatch_benchmark(10000, ^{
@autoreleasepool {
ib obj = @42;
NSMutableArray *array = [NSMutableArray array];
for (size_t i = 0; i < objectCount; ++i) {
[array addObject:obj];
}
}
});

NSLog(@"- [NSMutableArray addObject:] : %llu ns", n);