Arhn - архитектура программирования

Почему увеличение числа в 10 потоках Java не приводит к значению 10?

Я не понимаю, что значение «а» равно 0, почему «а» не равно 10, каков процесс выполнения этого кода, необходимо ли анализировать модель памяти Java? Вот мой тестовый код

package com.study.concurrent.demo;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

//@SpringBootTest
@Slf4j
class DemoApplicationTests {

    int a = 0;
    int b = 0;
    @Test
    void contextLoads() {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
//        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
//                try {
//                    semaphore.acquire();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                add();
                bdd();
//              log.info("a: {},concurrent_id: {}",a,Thread.currentThread().getName());
//                semaphore.release();
            });
        }
        executorService.shutdown();
        log.info("The final value of a:{}",a);
        log.info("The final value of b:{}",b);
    }


    public void add(){
        a++;
    }
    public void bdd(){
        b++;
    }

}

08.02.2021

  • Кроме того, вы должны (почти) всегда освобождать semaphore в блоке finally, чтобы убедиться, что вы случайно не забыли его освободить. 08.02.2021
  • Попробуйте использовать submit() вместо execute(). Submit возвращает Future, что позволяет вам дождаться завершения потока (или даже может просто получить значение, возвращаемое потоком). 08.02.2021
  • @markspace еще одна причина использовать submit для отправки Callable (как указано в последнем предложении) заключается в том, что Callable позволяют создавать проверенные исключения, поэтому вам не нужно беспокоиться об обработке InterruptedException явно. 08.02.2021
  • @developYan, в коде есть видимость, атомарность и проблемы «случается до». Я добавил ответ для обработки всех этих трех условий. 08.02.2021
  • Спасибо за ваши ответы, даже если они не приняты, . Ваши ответы развеяли мое замешательство 09.02.2021

Ответы:


1

Две причины:

  1. Вы не ждете завершения потоков, вы просто закрываете пул потоков (то есть заставляете пул потоков отклонять новые задачи, но продолжать обрабатывать существующие задачи).

  2. Вы не устанавливаете связь между операциями записи в пуле потоков и операциями чтения в основном потоке.

    Вы можете сделать это (среди других методов):

    1. Acquiring the semaphore before reading a;
    2. Используя submit вместо execute, чтобы получить Future<?> для каждой из отправленных задач, и вызывая метод Future.get() для всех возвращенных фьючерсов. Он задокументирован в Javadoc от ExecutorService что это устанавливает а бывает - раньше.

Первый пункт является основной причиной, по которой a выходит равным нулю: если я запускаю его локально и жду завершения пула потоков, a выходит на 10.

Однако тот факт, что он равен 10, не означает, что код работает правильно, не обращая внимания на второй момент: вам нужно применить модель памяти Java, чтобы иметь гарантии правильного функционирования.

08.02.2021
  • нет отношений «происходит до» или атомарности при чтении и записи в потоках пула потоков. Таким образом, нам может потребоваться исправить операцию обновления a и b, чтобы иметь предсказуемое поведение. 08.02.2021
  • Спасибо за ваш ответ, я забыл, что многопоточность и основной поток выполняются асинхронно, многопоточность также является случайной, и значение «b» из 10 не обязательно верно 09.02.2021

  • 2

    вопросы

    1. Видимость — несколько потоков обращаются к одной и той же переменной, и код не имеет никаких гарантий видимости.

    2. volatile может помочь с гарантией видимости

    3. Атомарность — несколько потоков обновляются с помощью операций a++ или b++. Это не атомарные операции. В первую очередь это набор операций 1. fetch a. 2. increment a. 3. update a. Переключение контекста может произойти в любом из этих состояний и привести к неправильному значению.

    4. Так что одной volatile наглядности для корректности недостаточно

    5. Используйте AtomicInteger, чтобы гарантировать атомарность операции приращения

    6. AtomicXXX может гарантировать атомарность одной операции

    7. Если возникла необходимость одновременного увеличения как a, так и b, то необходима некоторая форма синхронизации.

    8. Связь — это не связь между основным потоком и потоками задач исполнителя для передачи событий завершения.

    9. executorService.shutdown() не будет обеспечивать это общение

    10. Latch можно использовать для этого сообщения

    11. Или, как упомянул Энди, можно использовать Future

    Пример кода с AtomicInteger и Latch

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class DemoApplicationTests {
        final AtomicInteger a = new AtomicInteger(0);
        final AtomicInteger b = new AtomicInteger(0);
    
        void contextLoads() throws Exception {
            CountDownLatch latch = new CountDownLatch(10);
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            for (int i = 0; i < 10; i++) {
                executorService.execute(() -> {
                    add();
                    bdd();
                    latch.countDown();
                });
            }
            latch.await();
            executorService.shutdown();
            System.out.println("The final value of a:" + a);
            System.out.println("The final value of b:" + b);
        }
    
        public void add() {
            a.incrementAndGet();
        }
        public void bdd() {
            b.incrementAndGet();
        }
    
        public static void main(String[] args) throws Exception {
            new DemoApplicationTests().contextLoads();
        }
    }
    
    

    Неверное решение с threadpool size > 1 и CompletableFuture из-за условий гонки в a++, b++.

    Следующее может (мои знания ограничены и не могу подтвердить в любом случае) быть совершенно законным кодом для размера пула потоков 1 (скопировано из ответ Юджина)

    Но когда тот же код был выполнен с размером пула потоков › 1, это приведет к условиям гонки. (опять же намерение состоит в том, чтобы обсудить проблемы с несколькими потоками и видимостью данных как есть, а не проецировать ответ Юджина как неправильный. Ответ Юджина находится в контексте одного потока в пуле потоков и может быть совершенно правильным для однопоточного сценария пула потоков)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class DemoApplicationTests {
        int a = 0;
        int b = 0;
    
        void contextLoads() throws Exception {
            final int count = 10000;
            ExecutorService executorService = Executors.newFixedThreadPool(100);
            List<Runnable> list = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                Runnable r = () -> {
                    add();
                    bdd();
                };
                list.add(r);
            }
    
            CompletableFuture<?>[] futures = list.stream()
                .map(task -> CompletableFuture.runAsync(task, executorService))
                .toArray(CompletableFuture[]::new);
    
            CompletableFuture.allOf(futures).join();
    
            executorService.shutdown();
            System.out.println("The final value of a: " + a);
            System.out.println("The final value of b:" + b);
        }
    
        public void add() {
            a++;
        }
        public void bdd() {
            b++;
        }
    
        public static void main(String[] args) throws Exception {
            new DemoApplicationTests().contextLoads();
        }
    }
    
    

    Спасибо @Basil Bourque за исправление грамматических ошибок

    08.02.2021
  • ваш пример с AtomicInteger правильный, но зачем вам там CountDownLatch? 09.02.2021
  • @Eugene, поскольку мой код не использовал Futures для связи между задачами и основным потоком, я использовал CountDownLatch. Основное преимущество (просто личное понимание и может быть неверным): если только часть каждой задачи (скажем, начальная быстрая операция в памяти, за которой следует медленный ввод-вывод) требуется другой части программы, то я подумал, что CountDownLatch может быть полезная конструкция для координации. В противном случае пользователю придется разбить этот код на дополнительные детализированные задачи (отдельный набор задач для выполнения в памяти и ввода-вывода, чтобы ускорить другую ожидающую часть выполнения, но это может усложнить совместное использование данных). 09.02.2021
  • @Eugene, в отличие от приведенного выше комментария, даже удаление executorService.shutdown(); все равно приведет к правильному счету, напечатанному для a и b. (поскольку основной поток в примере полагается только на состояние этих двух значений). Атомарность и видимость управляются AtomicInteger, а ожидание/уведомление обрабатывается CountDownLatch. 09.02.2021
  • но AtomicInteger уже предлагает наглядность и атомарность. Если вы хотите видеть в результате 10, всегда CountDownLatch здесь не нужно. 09.02.2021
  • @Eugene, с таким кодом CountDownLatch используется для сообщения о завершении задач исполнителя основному потоку. 10.02.2021
  • @Eugene, мой предыдущий комментарий также гласит: Atomicity and visibility is controlled by AtomicInteger and wait/notify is handled by CountDownLatch. 10.02.2021
  • ты прав. Я пропустил, что нет awaitTermination. Но это заставило меня несколько раз почесать голову. Можно ли получить атомарность, без видимости? Потому что прямо сейчас, даже если вы отбросите CountDownLatch, вы все равно увидите 10, потому что AtomicInteger::incrementAndGet задокументировано для использования этого VarHandle::getAndAdd с надлежащей изменчивой семантикой... если это имеет смысл, что я пытаюсь сказать. 11.02.2021

  • 3

    В вашем пуле есть 1 поток, и вы отправляете в него 10 Runnables. Все они соберутся в queue, пока не наступит их очередь казнить. Вместо того, чтобы ждать, пока все они выполнят finish, вы вызываете shutDown, фактически говоря: больше никаких задач этот пул выполнять не будет. Когда именно это произойдет и сколько задач уже было обработано до того, как произошел вызов shutDown, сказать невозможно. Таким образом, вы получаете очень недетерминированный результат. Вы можете даже увидеть 10 в качестве вывода (иногда), но это не значит, что это правильно.

    Вместо этого вы можете подождать, пока пул завершит выполнение всех своих задач:

    executorService.awaitTermination(2, TimeUnit.SECONDS);
    executorService.shutdown();
    

    Что немного отстойно, так это то, что awaitTermination явно не упоминает, что если он вернет true, он установит отношение happens-before. Таким образом, чтобы быть педантичным с JLS, вам нужно будет работать, например, с этим Semaphore, чтобы установить необходимые гарантии.


    У вас есть гонка в вашем коде путем обновления общих a и b из нескольких потоков (даже если вы в настоящее время используете Executors.newFixedThreadPool(1)) без какой-либо синхронизации. Так что это тоже нуждается в исправлении. И Semaphore semaphore = new Semaphore(3); не поможет, поскольку вы все равно позволите 3 параллельным потокам работать с этими переменными; вам понадобится только один permit. Но тогда это действует как Lock, а не как Semaphore.

    08.02.2021
  • Согласно вопросу, в a и b между потоками возникают проблемы с видимостью для детерминированного увеличения значений. 08.02.2021
  • @Horse ... и ты хочешь сказать? 08.02.2021
  • Даже с этим изменением мы не можем гарантировать 10 в качестве вывода. Нам нужно обрабатывать a++ и b++ атомарно и с событиями до 08.02.2021
  • @Horse, а на чем именно основаны ваши рассуждения? 08.02.2021
  • согласно OP I don't understand the value of 'a' is 0,Why is 'a' not 10 ожидается получение 10 в качестве вычисленного значения. Эта операция public void add(){ a++; } не гарантирует видимость одного и того же значения в разных потоках, а также не гарантирует атомарного приращения значений. В соответствии с вашим текущим изменением мы можем дождаться завершения всех задач, прежде чем обращаться к a или b, но как мы гарантируем атомарные приращения правильно упорядоченного доступа к значениям? Я могу ошибаться, но это мое понимание. 08.02.2021
  • Независимо от количества потоков (в данном случае только 2 - 1. основной поток и 2. поток задач), есть ли гарантия свежести, когда эти потоки планируются пулом потоков (даже последовательно из-за размера 1) на нескольких ядрах на основе предыдущей копии значения? Возможно, с атомарностью все в порядке, поскольку в любой момент времени к значению обращается только 1 поток (из-за размера пула потоков — 1). 08.02.2021
  • @Конь, ты прав. Я был настолько поглощен пониманием того, обеспечит ли CompletableFuture::join необходимые гарантии видимости, что пропустил очевидное. спасибо за наводку здесь. 09.02.2021
  • Спасибо, что нашли время поделиться обновлением. 09.02.2021

  • 4

    Другие ответы правильные, с важными моментами. Кроме того, позвольте мне показать, как будущие технологии, разрабатываемые в Project Loom, упростят такой код.

    Проект Ткацкий станок

    Project Loom внесет некоторые изменения в Java. Экспериментальные сборки технологии Loom, основанные на ранней версии Java 17, уже доступно. Команда Loom запрашивает отзывы.

    AutoCloseable и попробуйте с ресурсами

    Одно изменение заключается в том, что ExecutorService расширяет AutoCloseable. Это означает, что мы можем использовать синтаксис try-with-resources для удобно и автоматически закрывать сервис после завершения блока try.

    Блокировать, пока отправленные задачи не будут выполнены

    Кроме того, поток управления блокируется в этом блоке попыток до тех пор, пока все отправленные задачи не будут выполнены/не пройдены/отменены. Нет необходимости отслеживать ход выполнения отдельных задач, если вам это не нужно.

    try (
            ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
    )
    {
        … submit tasks to the executor service, to be run on background threads.
    }
    // At this point, all submitted are done/failed/canceled. 
    // At this point, the executor service is automatically being shut down.
    

    AtomicInteger

    Как говорили другие, использование вами примитивов int для переменных a и b в потоках может завершиться неудачей из-за проблем с видимостью в Модель памяти Java. Один из вариантов — пометить их как volatile.

    Я предпочитаю альтернативу, используя классы Atomic…. Замените эти переменные int на AtomicInteger для переноса увеличивающегося числа счетчиков.

    Отметьте эти поля-члены final, чтобы каждый экземпляр никогда не заменялся.

    // Member fields
    final AtomicInteger a, b;
    
    // Constructor
    public Incrementor ( )
    {
        this.a = new AtomicInteger();
        this.b = new AtomicInteger();
    }
    

    Чтобы увеличить на единицу значение в AtomicInteger, мы вызываем incrementAndGet. Этот вызов возвращает новое увеличенное число. Поэтому я изменил сигнатуру ваших методов добавления, чтобы показать, что мы можем вернуть новое значение, если это когда-либо понадобится.

    // Logic
    public int addA ( )
    {
        return this.a.incrementAndGet();
    }
    
    public int addB ( )
    {
        return this.b.incrementAndGet();
    }
    

    Виртуальные потоки

    Еще одна функция, появившаяся в Project Loom, — это виртуальные потоки, также известные как волокна. Многие из этих облегченных потоков отображаются для работы в потоках платформы/ядра. Если ваш код часто блокируется, то использование виртуальных потоков значительно повысит производительность вашего приложения. Используйте новую функцию, вызвав Executors.newVirtualThreadExecutor.

    try (
            ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
    )
    { … }
    

    Пример класса

    Я написал класс Incrementor, похожий на ваш. Использование такого класса выглядит так:

            Incrementor incrementor = new Incrementor();
            try (
                    ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
            )
            {
                for ( int i = 0 ; i < 10 ; i++ )
                {
                    executorService.submit( ( ) -> {
                        int newValueA = incrementor.addA();
                        int newValueB = incrementor.addB();
                        System.out.println( "Thread " + Thread.currentThread().getId() + " incremented a & b to: " + newValueA + " & " + newValueB + " at " + Instant.now() );
                    } );
                }
            }
    

    Координация a и b

    Предостережение: как уже отмечали другие, такой код не увеличивает атомарно a и b вместе синхронно. Судя по всему, вам это не нужно, поэтому я игнорирую этот вопрос. Вы можете увидеть это поведение в действии в примере вывода запуска, показанном внизу ниже, где два потока чередуются во время их доступа к a и b. Выдержка здесь:

    Thread 24 incremented a & b to: 10 & 9 at 2021-02-09T02:21:30.270246Z
    Thread 23 incremented a & b to: 9 & 10 at 2021-02-09T02:21:30.270246Z
    

    Полный код класса

    Соберите весь этот код.

    Обратите внимание на простоту кода, когда (а) используется Loom и (б) используются константы Atomic…. Нет необходимости ни в семафорах, ни в защелках, ни в CompletableFuture, ни в вызове ExecutorService#shutdown.

    package work.basil.example;
    
    import java.time.Instant;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Incrementor
    {
        // Member fields
        final AtomicInteger a , b ;
    
        // Constructor
        public Incrementor ( )
        {
            this.a = new AtomicInteger();
            this.b = new AtomicInteger();
        }
    
        // Logic
        public int addA ( )
        {
            return this.a.incrementAndGet();
        }
    
        public int addB ( )
        {
            return this.b.incrementAndGet();
        }
    }
    

    И метод main для демонстрации использования этого класса.

        public static void main ( String[] args )
        {
            // Exercise this class by instantiating, then incrementing ten times.
            System.out.println( "INFO - `main` starting the demo. " + Instant.now() );
    
            Incrementor incrementor = new Incrementor();
            try (
                    ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
            )
            {
                for ( int i = 0 ; i < 10 ; i++ )
                {
                    executorService.submit( ( ) -> {
                        int newValueA = incrementor.addA();
                        int newValueB = incrementor.addB();
                        System.out.println( "Thread " + Thread.currentThread().getId() + " incremented a & b to: " + newValueA + " & " + newValueB + " at " + Instant.now() );
                    } );
                }
            }
    
            System.out.println( "INFO - At this point all submitted tasks are done/failed/canceled, and executor service is shutting down. " + Instant.now() );
            System.out.println( "incrementor.a.get() = " + incrementor.a.get() );
            System.out.println( "incrementor.b.get() = " + incrementor.b.get() );
            System.out.println( "INFO - `main` ending. " + Instant.now() );
        }
    

    Когда бег.

    INFO - `main` starting the demo. 2021-02-09T02:21:30.173816Z
    Thread 18 incremented a & b to: 4 & 4 at 2021-02-09T02:21:30.245812Z
    Thread 14 incremented a & b to: 1 & 1 at 2021-02-09T02:21:30.242306Z
    Thread 20 incremented a & b to: 6 & 6 at 2021-02-09T02:21:30.246784Z
    Thread 21 incremented a & b to: 8 & 8 at 2021-02-09T02:21:30.269666Z
    Thread 22 incremented a & b to: 7 & 7 at 2021-02-09T02:21:30.269666Z
    Thread 17 incremented a & b to: 3 & 3 at 2021-02-09T02:21:30.243580Z
    Thread 24 incremented a & b to: 10 & 9 at 2021-02-09T02:21:30.270246Z
    Thread 23 incremented a & b to: 9 & 10 at 2021-02-09T02:21:30.270246Z
    Thread 16 incremented a & b to: 2 & 2 at 2021-02-09T02:21:30.242335Z
    Thread 19 incremented a & b to: 5 & 5 at 2021-02-09T02:21:30.246646Z
    INFO - At this point all submitted tasks are done/failed/canceled, and executor service is shutting down. 2021-02-09T02:21:30.279542Z
    incrementor.a.get() = 10
    incrementor.b.get() = 10
    INFO - `main` ending. 2021-02-09T02:21:30.285862Z
    
    09.02.2021
  • Большое спасибо за напоминание о Project Loom. Я скачал на основе вашего ответа. 09.02.2021
  • Спасибо за щедрый ответ. это очень подробно 09.02.2021
  • @Horse Я очень рекомендую видео Рона Пресслера, если вы пробуете Loom. См. самые свежие из 2020 года. С момента первоначального дебюта Project Loom многое изменилось, например, больше не используется Fiber в качестве имени класса. 09.02.2021
  • @BasilBourque, конечно. Еще раз спасибо за то, что поделились информацией. 09.02.2021
  • не думайте, что ткацкий станок — панацея, по крайней мере пока. у них есть (большие?) проблемы с ThreadLocals, по крайней мере. Который частично работает с G1GC и странным образом на данный момент. насколько я понимаю, это еще далеко от реальности. 09.02.2021
  • @Евгений Согласен. Таким образом, мое использование слова «экспериментальный». Как сказано в словаре: (о новом изобретении или продукте), основанном на непроверенных идеях или методах и еще не утвержденном или доработанном 09.02.2021
  • Новые материалы

    Коллекции публикаций по глубокому обучению
    Последние пару месяцев я создавал коллекции последних академических публикаций по различным подполям глубокого обучения в моем блоге https://amundtveit.com - эта публикация дает обзор 25..

    Представляем: Pepita
    Фреймворк JavaScript с открытым исходным кодом Я знаю, что недостатка в фреймворках JavaScript нет. Но я просто не мог остановиться. Я хотел написать что-то сам, со своими собственными..

    Советы по коду Laravel #2
    1-) Найти // You can specify the columns you need // in when you use the find method on a model User::find(‘id’, [‘email’,’name’]); // You can increment or decrement // a field in..

    Работа с временными рядами спутниковых изображений, часть 3 (аналитика данных)
    Анализ временных рядов спутниковых изображений для данных наблюдений за большой Землей (arXiv) Автор: Рольф Симоэс , Жильберто Камара , Жильберто Кейрос , Фелипе Соуза , Педро Р. Андраде ,..

    3 способа решить квадратное уравнение (3-й мой любимый) -
    1. Методом факторизации — 2. Используя квадратичную формулу — 3. Заполнив квадрат — Давайте поймем это, решив это простое уравнение: Мы пытаемся сделать LHS,..

    Создание VR-миров с A-Frame
    Виртуальная реальность (и дополненная реальность) стали главными модными терминами в образовательных технологиях. С недорогими VR-гарнитурами, такими как Google Cardboard , и использованием..

    Демистификация рекурсии
    КОДЕКС Демистификация рекурсии Упрощенная концепция ошеломляющей О чем весь этот шум? Рекурсия, кажется, единственная тема, от которой у каждого начинающего студента-информатика..