动手实现 Dispatch Group

Mike Ash Friday Q&A 中文译文:动手实现 Dispatch Group

作者 TommyWu
封面圖片: 动手实现 Dispatch Group

译文 · 原文: Friday Q&A 2013-08-16: Let's Build Dispatch Groups · 作者 Mike Ash

原文:https://www.mikeash.com/pyblog/friday-qa-2013-08-16-lets-build-dispatch-groups.html 发布:2013-08-16 作者:Mike Ash 译者:MiMo(mimo-v2.5-pro);代码块保留英文原样


调度组是用于同步多个任务的便捷工具,一位匿名读者建议将其作为今天” 让我们构建” 系列的主题。

概述

调度组提供四种基本操作:

  • 进入(Enter),表示任务已开始。
  • 退出(Exit),表示任务已完成。
  • 通知(Notify),当每个进入都有对应退出时,调用一个代码块。
  • 等待(Wait),类似通知,但是同步的。

你可以利用它发起一组并行操作并等待它们全部完成:

dispatch_group_t group = dispatch_group_create();
for(int i = 0; i < 100; i++)
{
dispatch_group_enter(group);
DoAsyncWorkWithCompletionBlock(^{
dispatch_group_leave(group);
});
}
dispatch_group_wait(group, DISPATCH_TIME_FOREVER);

你也可以用它来在所有任务完成时异步调用一个 block:

dispatch_group_t group = dispatch_group_create();
for(int i = 0; i < 100; i++)
{
dispatch_group_enter(group);
DoAsyncWorkWithCompletionBlock(^{
dispatch_group_leave(group);
});
}
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
UpdateUI();
});

既然它是 GCD(Grand Central Dispatch)的一部分,而且我们正在讨论异步操作,那么应该不言而喻:调度组(dispatch groups)的一个主要特性是所有操作都是线程安全的。

代码 与往常一样,我已在 GitHub 上发布了我的重新实现的完整代码: https://github.com/mikeash/MADispatchGroup

接口 ma_dispatch_group 的 API 与 dispatch_group 高度一致:

typedef struct ma_dispatch_group_internal *ma_dispatch_group_t;
ma_dispatch_group_t ma_dispatch_group_create(void);
void ma_dispatch_group_destroy(ma_dispatch_group_t group);
void ma_dispatch_group_enter(ma_dispatch_group_t group);
void ma_dispatch_group_leave(ma_dispatch_group_t group);
void ma_dispatch_group_notify(ma_dispatch_group_t group, void (^block)(void));
void ma_dispatch_group_wait(ma_dispatch_group_t group);

dispatch_group接口存在一些差异:

  • ma_dispatch_group_t不是 dispatch 对象,因此不采用 retain / release(引用计数)语义,而是使用单一的 destroy 函数进行清理。
  • 缺少dispatch_group_async函数。这些仅是对enterleavedispatch_async的简单封装,因此并非至关重要。
  • notify 函数不接收 dispatch queue 作为参数,而是会直接在最后一个调用leave的代码上下文中立即调用 block。将 block 封装到dispatch_async中非常简单,因此这并非重大改动。
  • wait 函数不支持超时。这极大地简化了代码,同时仍能展示整体概念。

字段
结构体struct ma_dispatch_group_internal包含两个字段:一个计数器和一个 action block:

struct ma_dispatch_group_internal {
uint32_t counter;
void (^action)(void);
};

计数器(counter)用于跟踪 enter 被调用的次数,且没有对应的 exit 调用。而 action block 则是通过 notify 函数设置的操作块。

创建与删除 创建一个新的组(group)非常简单。只需分配一块内存,并使用 calloc 来确保其初始化为零:

ma_dispatch_group_t ma_dispatch_group_create(void)
{
ma_dispatch_group_t group = calloc(1, sizeof *group);
return group;
}

销毁一个 group 同样简单。我假设任何设置了 notify 的 action set(动作集)总会在 group 被销毁前触发,并且 action block(动作块)会作为该过程的一部分被销毁。因此,destroy 中除了简单地调用 free 之外无需其他清理工作。

void ma_dispatch_group_destroy(ma_dispatch_group_t group)
{
free(group);
}

Enter 的实现极其简单。它仅涉及一次原子自增(atomic increment),使用了编译器内建(compiler builtin)的原子操作。

void ma_dispatch_group_enter(ma_dispatch_group_t group)
{
__sync_fetch_and_add(&group->counter, 1);
}

使用原子内建函数(atomic builtin)可确保该操作是线程安全的。ma_dispatch_group_leave 的实现稍显复杂。它首先执行一次原子递减操作:

void ma_dispatch_group_leave(ma_dispatch_group_t group)
{
uint32_t newCounterValue = __sync_sub_and_fetch(&group->counter, 1);

__sync_sub_and_fetch 这一内置函数首先执行一次原子递减操作,随后返回计数器的最新值。若此次为最后一次 leave 调用,则 newCounterValue 的值将为 0,此时应触发该组的同步通知动作。

if(newCounterValue == 0)
{

该动作可能尚未存在,例如当所有进入调用(enter calls)在通知函数(notify)调用前均以离开调用(leave)进行了平衡,因此需要对此进行检查:

if(group->action)
{

如果已设置了某个动作,则执行它。

group->action();

当它返回后,销毁该代码块并将动作设置为 NULL:

Block_release(group->action);
group->action = NULL;
}
}
}

Notify

ma_dispatch_group_notify 的实现虽然有趣,但本质上极为简单。从概念上讲,需要考虑两种完全不同的情况:

  • 仍有待处理的 enter 调用,且尚未与 leave 调用达成平衡。这种情况下,设置 group 的 action 代码块。

  • 所有 enter 调用都已与 leave 调用平衡。这种情况下,立即执行该代码块。

这看起来相当直接。然而,对第一种情况的简单实现会引发一个竞态条件(race condition)。考虑以下事件序列:

  • notify 函数检查 count,发现其非零。

  • 待处理的操作调用 leave,并将 count 减至零。

  • 将 count 减至零的操作会检查 action。由于没有设置任何 action,它不执行任何操作。

  • notify 函数设置 group 的 action。

  • action 永远不会运行,因为没有任何代码来执行它。

这个优雅的方案既修复了竞态条件(race condition),又将两种分离的情形合并为单一代码路径。解决方案是用一个进入/离开(enter/leave)对来封装赋值操作。由于在分配动作(action)时始终存在至少一个不平衡的进入操作,这实际上消除了第二种情形。这也解决了潜在的竞态条件问题,因为赋值操作至少发生在某个待处理的离开调用之前。函数的实现如下所示:

void ma_dispatch_group_notify(ma_dispatch_group_t group, void (^block)(void))
{
ma_dispatch_group_enter(group);
group->action = Block_copy(block);
ma_dispatch_group_leave(group);
}

ma_dispatch_group_wait 的实现概念简单,但代码有些复杂。它利用 ma_dispatch_group_notify 来完成大部分工作。思路很简单:调用 notify 时传入一个 block(代码块),记录该 block 何时被执行,然后等待这个 block 运行。关键在于如何高效地等待。

可以不考虑效率,直接采用轮询(poll)的方式。例如,以下是一个有效但略显笨拙的实现:

void ma_dispatch_group_wait(ma_dispatch_group_t group)
{
__block volatile int done = 0;
ma_dispatch_group_notify(group, ^{
done = 1;
});
while(!done)
/* nothing */;
}

然而,无谓地让 CPU 占用率达到 100% 并不是个好主意,所以我们需要做得更好。

实现这一目标有许多不同方法。我选择使用 pthread 条件变量(condition variables)。条件变量与互斥锁(mutex)配合使用,允许一个线程阻塞并等待另一个线程向它发送信号。在信号线程中,你需要执行:

pthread_mutex_lock(&lock);
// make your change
pthread_cond_broadcast(&cond); // or _signal
pthread_mutex_unlock(&lock);

这个锁确保了对等待线程的更改操作是原子的(atomic)。随后的 cond_broadcast 调用会通知任何正在等待的线程:是时候醒来了。

在等待线程中,你需要这样做:

pthread_mutex_lock(&lock);
while(!condition)
pthread_cond_wait(&cond);
pthread_mutex_unlock(&lock);

锁确保了对条件的检查与信号发送线程之间是原子(atomic,指操作不可分割)的。while 循环有两个作用。首先,可能条件在此之前已经被设置。这种情况下,while 循环避免了调用 pthread_cond_wait—— 由于信号先前已经发出,调用它将导致永远等待。其次,即使没有任何信号发送到条件变量,pthread_cond_wait 也可能返回。这被称为虚假唤醒(spurious wakeup),是条件变量内部实现方式所导致的结果。在发生虚假唤醒时,while 循环确保等待的线程不会过早退出。

ma_dispatch_group_wait 函数首先声明并初始化了一个互斥锁(mutex)和一个条件变量:

void ma_dispatch_group_wait(ma_dispatch_group_t group)
{
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);

接下来,它获取这些变量的指针:

pthread_mutex_t *mutexPtr = &mutex;
pthread_cond_t *condPtr = &cond;

这样做是为了规避与块(blocks)的不幸冲突。如果传递给 notify 的块直接捕获 mutexcond,它们会被复制。这些数据类型无法容忍被复制。具体来说,pthread_mutex_t 在某些实现中(至少在某些平台上)包含一些内部的对齐检查。为了避免费力去强制编译器满足该库的对齐需求,pthread_mutex_t 内部会预留一些额外的存储空间,然后在初始化时才会确定内部存储的正确对齐方式。本质上,有一个内部字段具有(至少)两个可能的位置,而具体位置由 init 决定。当变量被复制时,这种对齐可能不再正确,并可能导致崩溃。通过捕获这些变量的指针而非变量本身,可以避免复制及由此可能引发的崩溃。

还声明了一个 done 变量来跟踪块实际执行的时间:

__block int done = 0;

现在 notify 被调用,其参数是一个 block(代码块),该 block 依次获取锁、设置完成标志、通知等待线程、并释放锁:

ma_dispatch_group_notify(group, ^{
pthread_mutex_lock(mutexPtr);
done = 1;
pthread_cond_broadcast(condPtr);
pthread_mutex_unlock(mutexPtr);
});

在一切就绪之后,函数等待 done 被设置:

pthread_mutex_lock(mutexPtr);
while(!done)
pthread_cond_wait(condPtr, mutexPtr);
pthread_mutex_unlock(mutexPtr);
}

这就是全部内容!

结论 Dispatch groups(调度组)是一个极其有用的 API,能轻松协调多个异步操作(asynchronous actions),并在它们全部完成后执行后续代码。该 API 设计简洁,但功能强大。如此实用的功能其实现起来相对简单。只要思路得当,少量代码就能发挥大作用。

今天的内容就到这里。下次再来分享更多有趣的探索。Friday Q & A 栏目由读者建议驱动,在此期间,如有想了解的主题,请随时发送你的想法。


#Original (English)

Source: https://www.mikeash.com/pyblog/friday-qa-2013-08-16-lets-build-dispatch-groups.html

Dispatch groups are a handy facility for synchronizing multiple tasks, and an anonymous reader suggested them for the subject of today’s Let’s Build.

OverviewDispatch groups provide four basic operations:

  • Enter, to indicate that a task has begun.

  • Exit, to indicate that a task has completed.

  • Notify, which invokes a block when every enter gets a corresponding exit.

  • Wait, which is like notify, but synchronous.

You can use this to spin off a bunch of parallel operations and wait for them to complete:

dispatch_group_t group = dispatch_group_create();
for(int i = 0; i < 100; i++)
{
dispatch_group_enter(group);
DoAsyncWorkWithCompletionBlock(^{
dispatch_group_leave(group);
});
}
dispatch_group_wait(group, DISPATCH_TIME_FOREVER);

You can also use it to asynchronously invoke a block when they all complete:

dispatch_group_t group = dispatch_group_create();
for(int i = 0; i < 100; i++)
{
dispatch_group_enter(group);
DoAsyncWorkWithCompletionBlock(^{
dispatch_group_leave(group);
});
}
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
UpdateUI();
});

Given that it’s part of GCD and we’re talking about asynchronous operations, it should go without saying that a major feature of dispatch groups is that all operations are thread safe.

CodeAs usual, I have posted the full code for my reimplementation on GitHub here:

https://github.com/mikeash/MADispatchGroup

InterfaceThe API for ma_dispatch_group closely mirrors that of dispatch_group:

typedef struct ma_dispatch_group_internal *ma_dispatch_group_t;
ma_dispatch_group_t ma_dispatch_group_create(void);
void ma_dispatch_group_destroy(ma_dispatch_group_t group);
void ma_dispatch_group_enter(ma_dispatch_group_t group);
void ma_dispatch_group_leave(ma_dispatch_group_t group);
void ma_dispatch_group_notify(ma_dispatch_group_t group, void (^block)(void));
void ma_dispatch_group_wait(ma_dispatch_group_t group);

There are a few differences from the dispatch_group interface:

  • A ma_dispatch_group_t is not a dispatch object, so it doesn’t use retain/release semantics, instead using a single destroy function to clean one up.

  • The dispatch_group_async functions are missing. These are just simple wrappers around enter, leave, and dispatch_async, so not all that important.

  • The notify function doesn’t take a dispatch queue, and instead invokes the block immediately in the context of the last piece of code to call leave. It’s trivial to wrap the block in a dispatch_async, so this is no major change.

  • The wait function doesn’t take a timeout. This simplifies the code considerably while still illustrating the overall concept.

FieldsThe struct ma_dispatch_group_internal contains two fields, a counter and an action block:

struct ma_dispatch_group_internal {
uint32_t counter;
void (^action)(void);
};

The counter keeps track of how many times enter has been called without a corresponding exit. The action block is the action set by the notify function.

Creation and DeletionCreating a new group is extremely simple. Just allocate a chunk of memory, using calloc to ensure it’s zeroed:

ma_dispatch_group_t ma_dispatch_group_create(void)
{
ma_dispatch_group_t group = calloc(1, sizeof *group);
return group;
}

Destroying a group is likewise simple. I assume that any action set with notify always fires before a group is destroyed, and that the action block is destroyed as part of that. As such, there’s no cleanup required in destroy beyond simply calling free:

void ma_dispatch_group_destroy(ma_dispatch_group_t group)
{
free(group);
}

EnterThe implementation of ma_dispatch_group_enter is extremely simple. It’s just an atomic increment, using an atomic compiler builtin:

void ma_dispatch_group_enter(ma_dispatch_group_t group)
{
__sync_fetch_and_add(&group->counter, 1);
}

Using the atomic builtin ensures that this is thread safe. The implementation of ma_dispatch_group_leave is a bit more complex. It first performs an atomic decrement:

void ma_dispatch_group_leave(ma_dispatch_group_t group)
{
uint32_t newCounterValue = __sync_sub_and_fetch(&group->counter, 1);

The __sync_sub_and_fetch builtin first performs an atomic decrement, then returns the new value of the counter. If this was the last leave call, then newCounterValue will contain 0, and it’s time to execute the group’s notification action.

if(newCounterValue == 0)
{

The action may not exist yet, for example if all enter calls were balanced with a leave before the call to notify, so check for that:

if(group->action)
{

If an action has been set, execute it:

group->action();

Once it returns, destroy the block and set the action to NULL:

Block_release(group->action);
group->action = NULL;
}
}
}

NotifyThe implementation of ma_dispatch_group_notify is interesting, but ultimately extremely simple. Conceptually, there are two completely separate cases to consider:

  • There are still pending enter calls that have not been balanced with a leave. In that case, set the group’s action block.

  • All enter calls have been balanced with a leave. In that case, execute the block immediately.

Seems straightforward enough. However, a simple implementation of the first case creates a race condition. Consider the following sequence of events:

  • The notify function checks count and sees that it is non-zero.

  • The pending actions call leave and decrease the count to zero.

  • The action that decreases the count to zero checks the action. No action is set, so it does nothing.

  • The notify function sets the group’s action.

  • The action never runs, because no code is left to run it.

There’s an elegant solution that both fixes this race condition and consolidates both separate cases into a single code path. The solution is to wrap the assignment of the action in an enter/leave pair. This effectively eliminates the second case, since there’s always at least one unbalanced enter when assigning the action. This also solves the potential race condition, since the assignment occurs before at least one of the pending leave calls. Here’s what the function looks like:

void ma_dispatch_group_notify(ma_dispatch_group_t group, void (^block)(void))
{
ma_dispatch_group_enter(group);
group->action = Block_copy(block);
ma_dispatch_group_leave(group);
}

WaitThe implementation of ma_dispatch_group_wait is simple in concept, although a bit complex in code. It uses ma_dispatch_group_notify for most of the work. The idea is simply to call notify with a block that notes when it gets run, then wait for that block to run. The trick is how to efficiently wait.

It’s possible to not bother with efficiency and just poll. For example, this is a valid, albeit dumb, implementation:

void ma_dispatch_group_wait(ma_dispatch_group_t group)
{
__block volatile int done = 0;
ma_dispatch_group_notify(group, ^{
done = 1;
});
while(!done)
/* nothing */;
}

However, spinning the CPU at 100% for no reason is a bad idea, so let’s try to do better.

There are many different ways to implement this. I chose to use pthread condition variables. A condition variable pairs with a mutex and allows one thread to block and wait for another thread to signal it. In the signaling thread, you do:

pthread_mutex_lock(&lock);
// make your change
pthread_cond_broadcast(&cond); // or _signal
pthread_mutex_unlock(&lock);

The lock ensures that the change is atomic with respect to the waiting thread. The cond_broadcast call then informs any waiting threads that it’s time to wake up.

In the waiting thread, you do:

pthread_mutex_lock(&lock);
while(!condition)
pthread_cond_wait(&cond);
pthread_mutex_unlock(&lock);

The lock insures that the check for the condition is atomic with respect to the signaling thread. The while loop serves two purposes. First, it’s possible that the condition was already set beforehand. In this case, the while avoids calling pthread_cond_wait, which would end up waiting forever since the signal had already occurred previously. Second, it’s possible for pthread_cond_wait to return even when nothing has signaled the condition variable. This is known as spurious wakeup and is a consequence of how condition variables are implemented internally. In the event of spurious wakeup, the while loop ensures that the waiting thread doesn’t exit prematurely.

The ma_dispatch_group_wait function first declares and initializes a mutex and a condition variable:

void ma_dispatch_group_wait(ma_dispatch_group_t group)
{
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);

Next, it grabs pointers to these variables:

pthread_mutex_t *mutexPtr = &mutex;
pthread_cond_t *condPtr = &cond;

This is done to work around an unfortunate collision with blocks. If the block passed to notify were to capture mutex and cond directly, they would be copied. These data types don’t tolerate being copied. Specifically, pthread_mutex_t has some internal alignment checks, at least on some implementations. Rather than figure out how to force the compiler to meet the library’s alignment needs, pthread_mutex_t is set up with some extra storage internally, and then the proper alignment for the internal storage is figured out when it’s initialized. In essence, there’s an internal field that has (at least) two possible positions, and the position is decided by init. When the variable is copied, that alignment may no longer be correct, and this can lead to a crash. By instead capturing pointers to these variables, the copy and potential crash are avoided.

A done variable is also declared to track when the block actually executes:

__block int done = 0;

Now notify is called with a block that acquires the lock, sets done, notifies the waiting thread, and releases the lock:

ma_dispatch_group_notify(group, ^{
pthread_mutex_lock(mutexPtr);
done = 1;
pthread_cond_broadcast(condPtr);
pthread_mutex_unlock(mutexPtr);
});

With that in place, the function waits for done to be set:

pthread_mutex_lock(mutexPtr);
while(!done)
pthread_cond_wait(condPtr, mutexPtr);
pthread_mutex_unlock(mutexPtr);
}

That’s everything!

ConclusionDispatch groups are an extremely useful API that makes it easy to coordinate multiple asynchronous actions and execute followup code once they all complete. The API is bare, but highly functional. Such a useful facility is relatively simple to implement. With the right idea, a little code can go a long way.

That’s it for today. Come back next time for more wacky adventures. Friday Q&A is driven by reader suggestions so, in the meantime, please send in your ideas for topics to cover.