Перейти к содержанию

3. AsyncSequence & AsyncStream

До этого момента мы с вами говорили только про задачи, которые исходят от пользователя, которые мы создаём сами, но что если нам нужно ловить последовательность каких-нибудь событий? например последовательность событий из NotificationCenter или своих событий - для этого у нас есть 2 инструмента: AsyncSequence и чуть более удобный инструмент - AsyncStream

AsyncSequence

AsyncSequence в Swift — это протокол, который позволяет работать с последовательностями асинхронных данных. В отличие от обычных последовательностей, которые возвращают значения немедленно, асинхронные последовательности могут приостанавливать выполнение для ожидания элементов.

Не стоит волноваться, на самом деле данный протокол очень похож на обычный Sequence. То есть мы как и в обычном случае выполняем просто итерации с помощью цикла for in. Но единственное отличие AsyncSequence в том, что мы ожидаем пока нам придёт какой-нибудь элемент последовательности, поэтому требуется использование await.

ВАЖНО AsyncSequence не генерирует значения и не содержит их - на самом деле данный протокол просто определяет способ доступа к элементам.

for await number in Counter(howHigh: 10) {
    print(number, terminator: " ")
} // Prints "1 2 3 4 5 6 7 8 9 10 "
  1. Создание Asynchronous Sequence:  Чтобы создать асинхронную последовательность, нужно реализовать протокол AsyncSequence. Это похоже на создание класса с определенными методами, с одной лишь разницей — он должен уметь работать "асинхронно", т.е., приостанавливаться и возобновляться.

  2. AsyncIteratorProtocol: Вместе с AsyncSequence, нам нужно реализовать AsyncIteratorProtocol, который определяет, как получать следующий элемент асинхронно.

  3. Цикл for await: for await — это специальный цикл, который позволяет перебрать элементы нашей асинхронной последовательности. Он похож на обычный for in, но предназначен для асинхронного использования.

Для понимания приложим пример создания AsyncSequence:

struct Counter: AsyncSequence {

    let limit: Int

    struct AsyncIterator : AsyncIteratorProtocol {
        let limit: Int
        var current = 1

        mutating func next() async -> Int? {
            guard !Task.isCancelled else {
                return nil
            }
 
            guard current <= limit else {
                return nil
            }
            let result = current
            current += 1
            returnresult
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(howHigh: limit)
    }
}

AsyncIterator

AsyncIterator — это часть механизма работы с AsyncSequence в Swift. Его задача — асинхронно предоставлять каждый следующий элемент из последовательности.

Ключевые моменты:

  1. Асинхронность: AsyncIterator позволяет нам получать элементы, которые могут быть доступны не сразу. Это очень важно, когда данные поступают, например, по сети или из какого-то ресурса, который не возвращает сразу все значения.

  2. Протокол AsyncIteratorProtocol: Чтобы создать AsyncIterator, нужно следовать протоколу AsyncIteratorProtocol. Он требует реализации одного основного метода: next.

Данного понимания будет достаточно на этапе начинающего разработчика

Async Stream

Гораздо чаще в разработке используется AsyncStream. AsyncStream: - это структура, которая реализует протокол AsyncSequence. Она предоставляет механизм для создания асинхронных последовательностей, где значения могут поступать в произвольное время. AsyncStream управляет внутренним состоянием и предоставляет методы для добавления новых значений (yield) и завершения последовательности (finish).

Простыми словами AsyncStream имеет более удобный интерфейс для работы с асинхронными последовательностями. Но как же пользоваться Async Stream?

Создание Async Stream

  1. Через замыкание

    let matchingLineNumbersStream = AsyncStream<Int> { continuation in  
        Task {  
            var lineNumber: Int = 0  
            for try await line in swiftFileUrl.lines {  
                lineNumber += 1 
                if matches(line) {  
                    continuation.yield(lineNumber)  
                 } 
             }
             continuation.finish()
        }  
    }
    
    Создание стрима через замыкание используется крайне редко, потому что это неочень удобно, в большинстве случаем используется следующий пункт

  2. makeStream() Данный метод возвращает сущность asyncStream и continuation, через который и можно добавлять задачи в asyncStream (см пример Задача с Собеседования)

Отправка задач в поток.

Задачки в stream отправляются через continuation с помощью метода yield

let stream = AsyncStream<Int> { continuation in  
    continuation.yield(5)
}

Обработка ивенов из стрима

let stream = AsyncStream<Int> { continuation in  
    continuation.yield(5)
}

Task.detached {
    for await event in stream { .... }
}

Завершение работы

После того как вы закончили обработку последовательности или вам больше не нужны события из стрима - необходимо в continuation вызывать метод finish

taskFromInterview.gif

Как в парадигме structured concurrency обрабатывать задачи последовательно? не нарушая порядка - первая задача пришла, первая обработалась, соблюдая принцип FIFO. Напишите реализацию вашей идеи

В данном вопросе конечно же ожидается, что кандидат работал и знает про AsyncStream. Реализация в коде выглядит так: streamExample.png

Пример выше позволяет очень удобно добавлять задачи в очередь для того, чтобы они обрабатывались последовательно.

taskFromInterview.gif

Предположим, что у меня есть источник данных, который генерирует события и отправляет их в AsyncStream. Я хочу получать эти события в двух разных местах. Как мне быть?

Пояснение: Есть код

let stream = AsyncStream<Int> { .... }

/// Можно ли использовать следующую конструкцию?

Task.detached {
    for await event in stream { .... }
}

Task.detached {
    for await event in stream { .... }
}

Ответ: Нет, такую конструкцию использовать нельзя! AsyncStream ну умеет работать с множественными подписчиками. А почему и как это обойти - мы рассмотрим в дальнейшем материале.

Итог

Из статьи выше должно быть просто понимание того, что в Structured Concurrency есть механизм для работы с последовательностями событий, что есть возможность организовать FIFO порядок выполнения задач. Вы можете самостоятельно попробовать написать несколько примеров, чтобы лучше понимать синтаксис AsyncSequence & AsyncStream или же перейти к решению задач в практикуме в разделе Junior.

Как они работают под капотом и углубленное изучение AsyncSequence/AsyncStream будет уже в разделе на мидл разработчика.