Перейти к содержанию

2. Task Group

Мы уже очень хорошо поговорили про базовые возможности Structured Concurrency. Но остался один базовый вопрос, который хотелось бы разобрать. Как нам запустить одновременно на исполнение несколько задач?

Async let

Стоит понимать, что когда мы пишем несколько await подряд, то данный код будет выполняться последовательно, как в примере ниже

Task(priority: .high) {
    let firstResult = await foo()
    let secondResult = await foo1()
    let thirdResult = await foo2()    
}

А что делать, если я хочу чтобы все эти функции выполнялись одновременно?

Task(priority: .high) {
    async let firstTask = foo()
    async let secondTask = foo1()
    async let thirdTask = foo2()    
}
Данная конструкция позволяет запустить на выполнение несколько задач одновременно и далее вы можете обрабатывать результаты как вам удобно.

Например:

Task(priority: .high) {
    async let firstTask = foo()
    async let secondTask = foo1()
    async let thirdTask = foo2()    

    let (firstResult, secondResult) = await (firstTask, secondTask)
    /// Выполняем работу
    let thirdResult = await thirdTask
    /// Обрабатываем thirdResult
}
Но что делать, если мне требуется запустить одновременно не 2-3 задачи, а например 20-30?

Task Group

Task Group является мощным средством для организации параллельной работы нескольких задач. Task Group позволяет группировать множество асинхронных задач внутри одной задачи, управлять их выполнением параллельно и собирать результаты, когда они становятся доступны. Это особенно полезно, когда количество задач или их параметры известны только во время выполнения.

  • Параллельное выполнение: Task Group позволяет запускать несколько асинхронных задач параллельно, что может значительно ускорить выполнение программы.

  • Гибкость: Вы можете динамически добавлять задачи в группу во время выполнения, что делает Task Group очень гибким инструментом для работы с асинхронными операциями.

  • Управление ошибками: Если одна из задач в группе завершается с ошибкой, вся группа задач будет отменена. Это обеспечивает удобный способ обработки ошибок в параллельно выполняемых задачах.

  • Сбор результатов: Task Group предоставляет удобный способ собирать и агрегировать результаты выполнения задач.

taskFromInterview.gifРассмотрим классическую задачу с собеседования

/// Есть функция, которая асинхронно загружает данные:
func fetchData(from url: URL) async throws -> Data {
    let (data, _) = try await URLSession.shared.data(from: url)
    return data
}

func loadMultipleURLs(urls: [URL]) async throws -> [Data] { ... }

1) Необходимо написать реализацию метода loadMultipleURLs

2) * Необходимо учесть порядок, чтобы для каждого урла в входящем массиве соответствовала правильная Data

Решение

Решим сначала пункт 1:

/// Есть функция, которая асинхронно загружает данные:
func fetchData(from url: URL) async throws -> Data {
    let (data, _) = try await URLSession.shared.data(from: url)
    return data
}

func loadMultipleURLs(urls: [URL]) async throws -> [Data] { 
    try await withThrowingTaskGroup(of: Data.self) { group in        
        // Добавляем задачи в группу:
        for url in urls {
            group.addTask {
                return try await fetchData(from: url)
            }
        }

        var results = [Data]()
        // Собираем результаты:
        for try await result in group {
            results.append(result)
        }

        return results
    }
}

А теперь решим пункт 2:

Для того, чтобы решить пункт 2 необходимо, чтобы кандидат понимал в какой момент теряется порядок, а теряется он в момент ожидания результатов, так как они прилетают туда по мере выполнения задач. Предложим ниже вариант решения задачи:

func fetchData(from url: URL, index: Int) async throws -> (Data, Int) {
    let (data, _) = try await URLSession.shared.data(from: url)
    return (data, index)
}

func loadMultipleURLs(urls: [URL]) async throws -> [Data] {
    try await withThrowingTaskGroup(of: (Data, Int).self,
                                    returning: [Data].self) { group in

    // Добавляем задачи в группу:
    urls.enumerated().forEach { index, url in
        group.addTask {
            try await self.fetchData(from: url, index: index)
        }
    }

    var results = [Data](repeating: Data(), count: urls.count)
    // Собираем результаты:
    for try await result in group {
        let resultData = result.0
        let initialIndex = result.1
        results[initialIndex] = resultData
    }
    return results
}

Виды Task Group

Все группы инициализируется одинаково, но имеют разное предназначение.

  • Task Group - дефолтная группа, которая используется в случае если задачи в группе не выкидывают ошибок.

  • Throwing Task Group - данная группа используется, если вам требуется обработать ошибки выполнения задач

Все группы выше хранят результаты своих задач до момента пока группа не закончит выполнение всех задач и не деинициализируется. На это стали жаловаться на форумах эпла. Были заданы следующие вопросы

"Результат пришел, я его обработал и зачем он хранится дальше?" "Группа выполняет тяжёлые задачи и живёт долго, очень грузится память, как с эти бороться?"

  • Discarding Task Group - является частью Swift Concurrency и предназначен для выполнения асинхронных задач в группе, где результаты отдельных задач не требуются или могут быть проигнорированы. DiscardingTaskGroup позволяет выполнить задачи параллельно без необходимости собирать их результаты.

ОЧЕНЬ ВАЖНО

НИ В КОЕМ СЛУЧАЕ НЕЛЬЗЯ СОХРАНЯТЬ ГРУППУ И ИСПОЛЬЗОВАТЬ ЕЁ ВНЕ ЗАМЫКАНИЯ. ЭТО МОЖЕТ ПРИВЕСТИ К ДЕГРАДАЦИИ ВСЕЙ СИСТЕМЫ.

Динамическое добавление задач в группу

func fetchData(from url: URL, index: Int) async throws -> (Data, Int) {
    let (data, _) = try await URLSession.shared.data(from: url)
    return (data, index)
}

func loadMultipleURLs(urls: [URL]) async throws -> [Data] {
    try await withThrowingTaskGroup(of: (Data, Int).self,
                                    returning: [Data].self) { group in

    // Добавляем задачи в группу:
    urls.enumerated().forEach { index, url in
        group.addTask {
            try await self.fetchData(from: url, index: index)
        }
    }

    var results = [Data](repeating: Data(), count: urls.count)
    // Собираем результаты:
    for try await result in group {
        let resultData = result.0
        if resultData.isDamaged {
            group.addTask {
                try await self.fetchData(from: url, index: index)
            }
        } else {
            let initialIndex = result.1
            results[initialIndex] = resultData
        }
    }
    return results
}
Обратите внимание на обработку результатов. Мы можем обработать результат и если он нам по каким-нибудь причинам не подошёл, мы можем отправить задачу повторно в группу. Очень важно следить за тем, чтобы не возникло бесконечного цикла

TaskGroup представляет собой мощный инструмент для организации асинхронного кода, обеспечивая гибкость, безопасность и эффективность при работе с параллельными задачами.