Семафоры и другие примитивы синхронизации в GCD

Разработка многопоточных приложений неизбежно ставит перед нами задачу синхронизации доступа к общим ресурсам. Grand Central Dispatch предлагает набор мощных примитивов синхронизации, которые помогают избежать гонок данных (race conditions) и других проблем многопоточности. В этой статье мы подробно рассмотрим каждый из этих инструментов и научимся выбирать оптимальный для конкретной задачи.


Семафоры: универсальный механизм синхронизации

Семафоры - один из самых гибких примитивов синхронизации, который позволяет контролировать доступ к ресурсам с ограниченной пропускной способностью.

Основы работы с DispatchSemaphore

// Создание семафора со значением счетчика 1
let semaphore = DispatchSemaphore(value: 1)

// Функция, использующая семафор для синхронизации
func synchronizedAccess() {
    // Декрементируем счетчик семафора, блокируемся если счетчик < 0
    semaphore.wait()

    // Критическая секция - только один поток может быть здесь
    accessSharedResource()

    // Инкрементируем счетчик семафора, сигнализируя другим потокам
    semaphore.signal()
}

Как работает семафор:

  1. Внутри семафора хранится счетчик (целое число)
  2. wait() уменьшает счетчик на 1:
    • Если результат ≥ 0, выполнение продолжается
    • Если результат < 0, поток блокируется до вызова signal()
  3. signal() увеличивает счетчик на 1:
    • Если есть заблокированные потоки, один из них разблокируется

Сценарии использования семафоров

1. Взаимное исключение (mutex)

Используя семафор с начальным значением 1, можно создать взаимное исключение:

let mutex = DispatchSemaphore(value: 1)
var sharedData = [String: Int]()

func updateSharedData(key: String, value: Int) {
    mutex.wait()
    defer { mutex.signal() }

    // Безопасное обновление общих данных
    sharedData[key] = value
}

2. Ограничение количества одновременных операций

Семафоры позволяют ограничить число параллельных операций:

// Ограничение одновременного выполнения 3 операциями
let limitingSemaphore = DispatchSemaphore(value: 3)

func processData(items: [Data]) {
    let queue = DispatchQueue.global(qos: .utility)

    for item in items {
        // Ожидаем, если уже запущены 3 операции
        limitingSemaphore.wait()

        queue.async {
            self.processItem(item)

            // Сигнализируем о завершении операции
            limitingSemaphore.signal()
        }
    }
}

3. Синхронизация асинхронных операций

Семафоры помогают сделать асинхронный код синхронным:

func fetchDataSynchronously() -> Data {
    let semaphore = DispatchSemaphore(value: 0)
    var fetchedData: Data?

    URLSession.shared.dataTask(with: url) { data, response, error in
        fetchedData = data
        semaphore.signal()
    }.resume()

    // Блокируем поток до получения данных
    semaphore.wait()

    return fetchedData ?? Data()
}

Внимание! Никогда не используйте такой подход на главном (UI) потоке.

Барьеры (Barriers): безопасная запись в параллельные очереди

Барьеры позволяют выполнять операции чтения параллельно, но гарантируют эксклюзивность операций записи.

Принцип работы барьеров

class ThreadSafeArray<T> {
    private var array = [T]()
    private let queue = DispatchQueue(label: "com.myapp.threadSafeArray", attributes: .concurrent)

    // Чтение - может выполняться параллельно
    func read() -> [T] {
        return queue.sync { self.array }
    }

    // Операция с барьером - выполняется эксклюзивно
    func append(_ element: T) {
        queue.async(flags: .barrier) {
            self.array.append(element)
        }
    }

    // Еще одна операция с барьером
    func removeAll() {
        queue.async(flags: .barrier) {
            self.array.removeAll()
        }
    }
}

Барьер гарантирует, что:

  • Все задачи, отправленные до барьера, завершатся перед выполнением задачи-барьера
  • Задача-барьер выполняется эксклюзивно
  • Задачи, отправленные после барьера, не начнутся до завершения задачи-барьера

Важно! Барьеры работают только с пользовательскими параллельными очередями. На последовательных или глобальных очередях .barrier не имеет эффекта барьера.

DispatchWorkItem: расширенный контроль над задачами

DispatchWorkItem предоставляет дополнительную гибкость для управления задачами, включая отмену.

// Создание задачи
let workItem = DispatchWorkItem {
    performExpensiveTask()
}

// Запуск задачи асинхронно
DispatchQueue.global().async(execute: workItem)

// Отмена задачи, если она еще не началась
workItem.cancel()

// Проверка состояния задачи
if workItem.isCancelled {
    print("Задача была отменена")
}

// Уведомление о завершении
workItem.notify(queue: .main) {
    print("Задача завершена или отменена")
}

Использование DispatchWorkItem для таймаутов

func taskWithTimeout() {
    let workItem = DispatchWorkItem {
        // Длительная операция
        performLongRunningTask()
    }

    // Запускаем задачу
    DispatchQueue.global().async(execute: workItem)

    // Устанавливаем таймаут в 5 секунд
    DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
        if !workItem.isCancelled {
            workItem.cancel()
            print("Задача отменена по таймауту")
        }
    }
}

DispatchSource: мониторинг системных событий

DispatchSource используется для мониторинга системных событий и выполнения задач при их возникновении.

Пример таймера с DispatchSource

class PreciseTimer {
    private let timer: DispatchSourceTimer
    private var isSuspended = true

    init(interval: TimeInterval, queue: DispatchQueue = .main, handler: @escaping () -> Void) {
        timer = DispatchSource.makeTimerSource(queue: queue)

        timer.setEventHandler(handler: handler)
        timer.schedule(deadline: .now(), repeating: interval)

        // Таймер начинает работу только после resume()
        timer.resume()
        isSuspended = false
    }

    func suspend() {
        if !isSuspended {
            timer.suspend()
            isSuspended = true
        }
    }

    func resume() {
        if isSuspended {
            timer.resume()
            isSuspended = false
        }
    }

    func cancel() {
        timer.cancel()
    }
}

Мониторинг файловой системы

func monitorFile(at path: String) {
    let fileDescriptor = open(path, O_EVTONLY)

    let source = DispatchSource.makeFileSystemObjectSource(
        fileDescriptor: fileDescriptor,
        eventMask: [.write, .delete],
        queue: .global()
    )

    source.setEventHandler {
        let flags = source.data

        if flags.contains(.write) {
            print("Файл был изменен")
        }

        if flags.contains(.delete) {
            print("Файл был удален")
            source.cancel()
        }
    }

    source.setCancelHandler {
        close(fileDescriptor)
    }

    source.resume()
}

Прочие механизмы синхронизации

OSSpinLock (устаревший, не рекомендуется)

Спин-блокировка - это примитив синхронизации, который в ожидании освобождения не переводит поток в сон, а продолжает проверять состояние блокировки в цикле, расходуя процессорное время.

Из-за проблем с приоритетным инверсией (низкоприоритетные потоки могут блокировать высокоприоритетные) и проблем с производительностью, Apple не рекомендует использовать OSSpinLock.

os_unfair_lock (современная замена OSSpinLock)

import os.lock

class ThreadSafeCounter {
    private var counter = 0
    private let lock = os_unfair_lock_t.allocate(capacity: 1)

    init() {
        lock.initialize(to: os_unfair_lock())
    }

    deinit {
        lock.deallocate()
    }

    func increment() -> Int {
        os_unfair_lock_lock(lock)
        defer { os_unfair_lock_unlock(lock) }

        counter += 1
        return counter
    }
}

NSLock и NSRecursiveLock

Objective-C объекты блокировки, которые более тяжеловесны, но имеют больше возможностей:

class SafeQueue<T> {
    private var items = [T]()
    private let lock = NSLock()

    func enqueue(_ item: T) {
        lock.lock()
        defer { lock.unlock() }

        items.append(item)
    }

    func dequeue() -> T? {
        lock.lock()
        defer { lock.unlock() }

        return items.isEmpty ? nil : items.removeFirst()
    }
}

NSRecursiveLock позволяет одному и тому же потоку захватывать блокировку несколько раз:

let recursiveLock = NSRecursiveLock()

func recursiveFunction(depth: Int) {
    recursiveLock.lock()
    defer { recursiveLock.unlock() }

    if depth > 0 {
        recursiveFunction(depth: depth - 1)
    }
}

Atomic Property (Swift 5.7+)

В Swift 5.7 добавлен атомарный доступ к свойствам с помощью @preconcurrency и глобальных акторов:

@globalActor
actor MyGlobalActor {
    static let shared = MyGlobalActor()
}

class Counter {
    @MyGlobalActor var count = 0

    @MyGlobalActor
    func increment() -> Int {
        count += 1
        return count
    }
}

Не забудьте загрузить приложение iJun в AppStore и подписаться на мой YouTube канал для получения дополнительных материалов по iOS-разработке.

Список литературы и дополнительных материалов

  1. DispatchSemaphore - Apple Developer Documentation
  2. Concurrency Programming Guide - Apple Developer Documentation
  3. WWDC 2016: Thread Sanitizer and Static Analysis
Также читайте:  Что такое @escaping в Swift?