Багатопотоковість у Java. Лекція 3: блокування та класи синхронізації потоків

1 червня
Володимир Фролов, Java-розробник, Микита Сізінцев, Android-розробник
Багатопотоковість у Java. Лекція 3: блокування та класи синхронізації потоків

Темну силу відчуваю я.
Даєш парсек за три роки.

У першій лекції ннашого курсу ми поділилися загальними відомостями про багатопотоковість, у другій — розглянули основи багатопотокових програм. Тепер обговоримо блокування та інші методи синхронізації потоків.

3.1 ReentrantLock

З появою Java 1.5 було додано пакет java.util.concurrent, що містить інструменти для роботи в багатопотоковому середовищі, зокрема блокування та колекції.

Пакет locks містить високорівневі блокування, використання яких є більше зручним у порівнянні з ключовим словом synchronized. Базовий інтерфейс блокувань — java.util.concurrent.locks.Lock. Методи цього інтерфейсу:

  • void lock() — захоплює блокування, якщо воно доступне. Якщо блокування зайняте іншим потоком, поточний потік, що виконує цей код, переходить у статус BLOCKED;
  • void lockInterruptibly() — робить те саме, що й метод lock(), але може перервати блокований потік і відновити виконання через InterruptedException;
  • boolean tryLock() — неблокуючий варіант методу lock(). Якщо вдалось отримати блокування, то метод повертає true;
  • boolean tryLock(long timeout, TimeUnit timeUnit) — те саме, що і tryLock(), за винятком того, що метод чекає певний час, перш ніж зупинити спробу отримання блокування;
  • void unlock() — відпускає блокування.

Захоплення та звільнення монітору — два незалежні методи: lock.lock() і lock.unlock(). Згадаймо, що стандартне блокування на основі ключового слова synchronized є синтаксичною конструкцією, тобто за її правильне використання відповідає компілятор:

Лістинг 1:

synchronized(synchronizedObj) {
//some code that should be synchronized
}

Наприклад, ось так: synchronized (synchronizedObj); написати не вийде.

Ключове слово synchronized можна використовувати так:

Лістинг 2:

Runnable r = new MyRunnable() { () ->
    System.out.println(“Hello!”);
}

Thread t = new Thread(r);

Для запуску потоку необхідно використовувати метод Thread.start(). Якщо викликати метод run(), він виконається у викликаючому потоці:

Лістинг 3:

Object a = new Object();
Object b = new Object();
Object c = new Object();
synchronized(a) {
    synchronized(b) {
        synchronized(c) {
        }
    }
}

Блокування c не знає, що було захоплено блокування b, відповідно, блокування b не знає, що було захоплено блокування a. Відпустити блокування a всередині блоку синхронізації блокування b або c не є можливим. Блокування на основі ReentrantLock таку можливість надає, тому що методи захоплення та звільнення блокування є окремими методами. Клас ReentrantLock покладає на програміста коректність захоплення та відпускання блокування. Записати це можна, наприклад так:

Лістинг 4:

private final Lock R_LOCK = ReentrantLock();
R_LOCK.lock();
try {
    //this code will be executed in synchronization section
} finally {
    R_LOCK.unlock();
}

Якщо відпустити блокування на один раз більше, тобто зробити rLock.unlock(); ще раз, отримаємо IllegalMonitorStateException. У лістингу 5 наведено правильний спосіб захоплення та відпускання блокування:

Лістинг 5:

private final Lock R_LOCK = ReentrantLock();
R_LOCK.lock();
try {
//this code will be executed in synchronization section
} finally {
R_LOCK.unlock();
}

Такі можливості Lock можуть стати у пригоді, коли проводиться робота, наприклад, з деревовидної структурою даних із декількох потоків. Якщо здійснюватиметься блокування кореневого вузла, при цьому змінюватимуться якісь дочірні вузли, продуктивність всієї системи буде низькою, тому що в кожен конкретний момент часу з деревом працюватиме лише один потік. Однак кожен вузол дерева можна зіставити з окремим блокуванням і використовувати те, піддерево якого підлягає зміні. Решту блокувань, розташовані вище по дереву, можна відпустити, щоб інші потоки могли їх захопити, та змінювати іншу частину дерева.

graph

Малюнок 1: Деревовидна структура даних

3.2 ЧЕСНЕ БЛОКУВАННЯ ТА ReentrantLock

Одна з реалізацій інтерфейсу Lock — клас ReenterantLock. Він дозволяє одному й тому самому потоку викликати метод lock, навіть якщо він його викликав раніше, без звільнення блокування.

Клас ReentrantLock, крім методів інтерфейсу Lock, має фабричний метод newCondition(). Цей метод повертає об'єкт Condition, який дозволяє додати поточний потік у wait set даного об'єкта Condition. Це дає можливість організовувати різні умови очікування по одному й тому самому блокуванню, чого не дозволяють ключове слово synchronized і зв'язки методів wait()/notify(). Для того, щоб об'єкт потрапив у wait set для даного Condition об'єкта, потрібно викликати метод await(). Щоб розбудити потік або потоки, які є у wait set, необхідно викликати методи signal() або signalAll(). Ці методи аналогічні методам wait(), notify() і notifyAll() у об'єкта Object. Оскільки методи wait(), notify() і notifyAll() у об'єкті Object — final, методам для об'єкта Condition придумали інші найменування.

Слід також зауважити, що об'єкт Condition має свої методи wait(), notify() і notifyAll(), проте настійно не рекомендується використовувати методи від об'єкта Object і методи від об'єкта Condition разом. Методи await(), signal() і signalAll() необхідно викликати, тільки якщо захопили відповідне блокування ReentrantLock. Кожен об'єкт Condition “знає”, від якого об'єкта блокування було породжено об'єкт. У зв'язці ReentrantLock і Condition для кожного об'єкта Condition є свій wait set, однак blocked set є загальним для всіх об'єктів Condition.

Wait set і blocked set — множини (sets), а не масив і не черга, і якщо потоків у множині більше одного, існує ймовірність, що якийсь потік перебуватиме в ній нескінченно довго та ніколи не розблокується. Можлива ситуація, коли потік, який очікує найдовше, розблоковано не буде взагалі.

Це демонструє програма в лістингу 6. У ньому три потоки в нескінченному циклі намагаються захопити блокування ref нескінченну кількість разів. Реалізація synchronized припускає сценарій, у якому потік А весь час володітиме блокуванням, а решта потоків, B та C, не встигатимуть його захопити. Це доводить що wait set і blocked set у synchronized — нечесні, тобто є неврегульованими колекціями. Ми бачимо конкуренцію, три потоки постійно намагаються захопити блокування. Захопивши блокування, попрацювавши та вийшовши, потік може знову захопити блокування. Решта потоків можуть продовжувати очікування. Реалізація synchronized допускає таку ситуацію. Щоб блокування було чесним, потрібно використовувати клас ReentrantLock, передавши в конструктор цього класу параметр true.

Лістинг 6:

public class AppFair {
    public static void main(String[] agrs) {
        final Object ref = new Object();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(ref) {
                        System.out.println(“A”);
                    }
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(ref) {
                        System.out.println(“B”);
                    }
                }
            }
        }).start();
            new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(ref) {
                        System.out.println(“C”);
                    }
                }
            }
        }).start();
    }
}

Якщо в конструктор ReentrantLock передати параметр true, виходить чесне блокування. Воно перетворює blocked set на впорядковану чергу.

Лістинг 7:

public class AppFair2 {
    public static void main(String[] agrs) {
        final Lock ref = new ReentrantLock(true);
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    ref.lock();
                    System.out.println(“A”);
                    ref.unlock();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    ref.lock();
                    System.out.println(“B”);
                    ref.unlock();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    ref.lock();
                    System.out.println(“C”);
                    ref.unlock();
                }
            }
        }).start();
    }
}

3.3 ReentrantReadWriteLock

ReentrantReadWriteLock — вже не монітор. Кажуть, що це read/write блокування або shared/exclusive блокування. В об'єкта ReadWriteLock можна отримати блокування окремо на читання і на запис. Якщо кілька разів викликати метод readLock(), отримаємо посилання на один і той самий об'єкт. Така сама ситуація — з методом writeLock(). Програма, яка демонструє, що readLock і writeLock залишаються одними й тими самими об'єктами при наступних викликах:

Лістинг 8:

ReadWriteLock lock = new ReentrantReadWriteLock();
Lock rLock1 = lock.readLock();
Lock rLock2 = lock.readLock();
Lock wLock1 = lock.writeLock();
Lock wLock2 = lock.writeLock();
System.out.println(rLock1 == rLock2);
System.out.println(wLock1 == wLock2);

Об'єкт write lock може мати будь-яку кількість об'єктів condition. У write lock може заходити тільки один потік.

ReadLock може захопити кілька потоків водночас. Якщо один потік отримав readLock, а потім інший потік намагається отримати writeLock, блокування на запис він не отримає і буде заблокований, тобто потрапить у blocked set. Якщо один потік захопив блокування на запис, а інший намагається зробити те саме, другий потік буде заблоковано. Якщо блокування на запис захоплене, а інший потік також хоче захопити блокування на читання, цей потік буде заблоковано.

Потоки, які захопили блокування для читання, не можуть виконуватись одночасно з потоком, який захопив блокування на запис.

image

Малюнок 2: Кілька читаючих потоків і заблокований потік запису

Коли захоплено кілька блокувань на читання, а інший потік намагається захопити блокування на запис, після чого знову відбувається читання, перевага віддається потоку на запис.

image

Малюнок 3: Заблоковані читаючі потоки

ReentrantReadWriteLock має два режими: справедливий і несправедливий. У несправедливому режимі порядок отримання блокувань не визначено, у справедливому — реалізовано чергу потоків. Коли поточне блокування відпущене, одиночний потік запису, що очікує довше за інші, отримає доступ до блокування. Або ж блокування отримає група потоків-читачів, яка чекає довше ніж потік-письменник.

Також є можливість зниження блокування у статусі. Можна отримати блокування запису та знизити його до блокування читання, відпустивши блокування запису. Однак зворотна операція неможлива.

3.4 StampedLock

Цей тип блокування з'явився у Java 8. Це спроба реалізувати оптимістичне блокування, яке не блокує поточний потік при читанні даних. Також StampedLock є спробою зробити версіоновану структуру даних. Це блокування може бути й оптимістичним (виходить блокування на читання, яке не блокує поточний потік), і песимістичним (якщо не вдається отримати блокування, потік, що читає дані, блокується). При отриманні блокування повертається штамп — число типу long. Це значення використовується при відпусканні блокування та для перевірки того, чи є отримане раніше блокування все ще валідним. Також блокування можна розглядати як номер поточної версії даних, яку це блокування захищає.

StampedLock — не reentrant, тому слід побоюватися повторних захоплень блокування, вони приведуть до deadlock`у. Найпростіший приклад використання блокування наведено в лістингу 9.

Лістинг 9:

Runnable r = new Runnable() {
    @Override
    public void run() {
        long stamp = lock.writeLock(); 
        try { 
            sleep(1); 
            map.put("foo", "bar"); 
        } finally { 
            lock.unlockWrite(stamp); 
        } 
    }
};
Runnable r2 = new Runnable() {
    @Override
    public void run() {
        long stamp = lock.readLock(); 
        try { 
            System.out.println(map.get("foo")); 
            sleep(1); 
        } finally { 
            lock.unlockRead(stamp); 
        }
    }
};

Цей клас також дозволяє конвертувати блокування на читання у блокування на запис. Оптимістичне блокування для читання, яке викликається за допомогою методу tryOptimisticRead(), відрізняється тим, що воно завжди повертатиме штамп, не блокуючи поточний потік. Незалежно від того, чи зайнятий ресурс, до якого воно звернулось. Якщо ресурс було заблоковано блокуванням для запису та інший потік намагається отримати блокування для читання, повернутий штамп дорівнюватиме нулю. У будь-який момент можна перевірити, чи є блокування валідним, за допомогою lock.validate(stamp). Якщо ресурс було заблоковано блокуванням на запис, а інший потік намагається одержати неоптимістичне блокування на читання, потік буде заблоковано.

Приклад використання оптимістичного блокування:

Лістинг 10:

public class OptimisticLock {
    public static void main(String[] args) {
        Runnable r1 = new Runnable() {
            @Overrride
            public void run() {
    long stamp = lock.tryOptimisticRead(); 
    try { 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(1);
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(2); 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                } finally { 
                    lock.unlock(stamp); 
    }
            }
        }
    }
}
public class OptimisticExample {
    public static void main(String[] args) {
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
    long stamp = lock.tryOptimisticRead(); 
    try { 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(1);
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(2); 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
    } finally { 
        lock.unlock(stamp); 
    } 
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
    long stamp = lock.writeLock(); 
    try { 
                    System.out.println("Write Lock acquired"); 
                    sleep(2); 
    } finally { 
                    lock.unlock(stamp); 
                    System.out.println("Write done"); 
    } 
            }
        };
        
        new Thread(r1).start();
        new Thread(r2).start();
    }
}

У лістингу 10 у потоці r1 захоплюється оптимістичне блокування, перевіряється його валідність і викликається метод Thread.sleep(), наприклад, для імітації якоїсь діяльності. У цей час у потоці r2 виходить блокування на запис, не чекаючи, поки звільниться блокування на читання.

Після отримання блокування на запис, блокування на читання припиняє бути правильним, навіть після закінчення запису. Таким чином, при використанні оптимістичних блокувань ви маєте постійно стежити за їхньою валідністю (перевіряти її потрібно вже після того, як виконано всі необхідні операції).

Ще один приклад із використанням оптимістичного блокування:

Лістинг 11:

double distanceFromOriginV1() { // A read-only method
     long stamp;
     if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
       double currentX = x;
       double currentY = y;
       if (sl.validate(stamp))
         return Math.sqrt(currentX * currentX + currentY * currentY);
     }
     stamp = sl.readLock(); // fall back to read lock
     try {
       double currentX = x;
       double currentY = y;
         return Math.sqrt(currentX * currentX + currentY * currentY);
     } finally {
       sl.unlockRead(stamp);
     }
}

Лістинг 12 — приклад читання з використанням оптимістичного блокування.

Лістинг 12:

double readValues() {
    long stamp;
    if ((stamp = sl.tryOptimisticRead()) != 0L) { 
        double currentX = //reading value from some shared object
        double currentY = //reading value from some shared object
        if (sl.validate(stamp)) {
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }
    }
    return <default value>;
}

У коді, представленому в лістингу 12, захоплюється оптимістичне блокування та читаються значення у локальні змінні. Зверніть увагу, що читання має відбуватись аж до примітивних типів або незмінних об'єктів, і читання має відбуватись у конструкції коду tryOptimisticLock()/validate(stamp). Усередині цього блоку небезпечно зберігати посилання на змінюваний об'єкт і після конструкції validate(stamp) використовувати змінні поля цього об'єкта. Необхідно дістатись необхідних вам надалі примітивних полів, незмінних об'єктів або створити копію змінюваного. Це потрібно, щоб уникнути зміни даних після виходу з блокування та виклику методу validate.

Більш того, не рекомендується всередині блоку tryOptimisticLock()/validate(stamp) робити незворотні зміни, якщо validate(stamp) не пройде. Власне, краще взагалі нічого, крім читання, у цьому блоці не робити. Блок tryOptimisticLock()/validate(stamp) є способом отримати несуперечливе представлення даних, а всі операції з цими даними краще виконати потім, коли успішний validate(stamp) дасть вам право вважати, що отримані дані дійсно не є суперечливими.

3.5 CowntDownLatch

CowntDownLatch — засувка зі зворотним відліком. Дозволяє задати значення лічильника, а коли його значення дорівнюватиме нулю, заблоковані потоки на цьому блокуванні будуть одночасно запущені. У конструктор CountDownLatch передається число. Потік, який використовує CountDownLatch, може зменшити це число та блокуватись. Для зменшення числа в лічильнику викликається метод countDown(). Після виклику цього методу потік продовжує своє виконання. Для того, щоб заблокувати потік, необхідно викликати метод await().

Розглянемо програму, яка моделює гонку машин. Спочатку кожна машина під'їжджає до стартової лінії, потім всі вони отримують команди: “На старт!”, “Увага!”, “Марш!”. Кожна машина представлена потоком. Припустимо, що кожна машина має постійну швидкість, і поки вона продовжує гонку, потік на певний час засинає. Швидкість машини вибирається довільно. Спочатку всі машини під'їжджають до старту. Це перевіряється циклом while. Як тільки всі машини під'їхали, даються три команди, лічильник у CountDownLatch стає рівним 0, і всі 5 потоків запускаються. Якийсь потік виконується швидше, інший — повільніше, потім програма завершує виконання. Метод await() аналогічний методу wait() у класі Object.

Лістинг 13:

public class Race {
    //Створюємо CountDownLatch на 8 «умов»
    private static final CountDownLatch START = new CountDownLatch(8);
    //Умовна довжина гоночної траси
    private static final int TRACK_LENGTH = 500000;
    public static void main(String[] args) throws InterruptedException {
        int carSpeed = 0;
        for (int i = 1; i <= 5; i++) {
            carSpeed = (int) (Math.random() * 100 + 50);
            new Thread(new Car(i, carSpeed)).start();
            Thread.sleep(1000);
        }
        while (START.getCount() > 3) { //Перевіряємо, чи зібрались усі автомобілі
            Thread.sleep(100);              //у стартовій прямій. Якщо ні, чекаємо 100ms
        }
        Thread.sleep(1000);
        System.out.println(«На старт!»);
        START.countDown();//Команду дано, зменшуємо лічильник на 1
        Thread.sleep(1000);
        System.out.println(Увага!»);
        START.countDown();//Команду дано, зменшуємо лічильник на 1
        Thread.sleep(1000);
        System.out.println(«Марш!»);
        START.countDown();//Команду дано, зменшуємо лічильник на 1
        //лічильник стає рівним нулю, і все очікуючі потоки
        //одночасно розблокуються
    }
    public static class Car implements Runnable {
        private int carNumber;
        private int carSpeed; //вважаємо, що швидкість автомобіля є постійною
        public Car(int carNumber, int carSpeed) {
            this.carNumber = carNumber;
            this.carSpeed = carSpeed;
        }
        @Override
        public void run() {
            try {
                System.out.printf("Автомобіль №%d під'їхав до стартової прямої. \ N", carNumber);
                //Автомобіль під'їхав до стартової прямої — умову виконано
                //зменшуємо лічильник на 1
                START.countDown();
                //метод await() блокує потік, що викликав його, доки
                //лічильник CountDownLatch не стане рівним 0
                START.await();
                Thread.sleep(TRACK_LENGTH / carSpeed);//чекаємо, поки автомобіль проїде трасу
                System.out.printf("Автомобіль №%d фінішував!\n", carNumber);
            } catch (InterruptedException e) {
            }
        }
    }
}

3.6 CyclicBarrier

Циклічний бар'єр дуже схожий на CountDownLatch, однак має кілька відмінностей:

  • Методи countDown() і await() об'єднані в один — await(), після виклику якого потік блокується, якщо число не дорівнює нулю.
  • Клас CyclicBarrier можна використовувати повторно. Як тільки значення стає рівним нулю, це значення відновлюється, і об'єкт класу можна використовувати заново
  • Як тільки значення лічильника стало рівним нулю, є можливість виконати додатковий runnable, який може бути передано в конструктор CyclicBarrier
image

Малюнок 4: Принцип роботи CyclicBarrier

3.7 Semaphore

Примітив синхронізації дозволяє певній кількості потоків виконувати критичну секцію коду, захищену семафором. Коли створюється семафор, у конструктор передається кількість пропусків. Під час захоплення семафору кількість допустимих пропусків зменшується. Коли семафор звільняється, кількість пропусків збільшується. Також у методі захоплення семафору можна вказати, яку кількість пропусків візьме потік. У методі звільнення семафору можна вказати кількість пропусків, яку буде повернуто: ця кількість не може бути більше, ніж захоплена кількість пропусків.

Коли другим параметром у конструктор передається true, потоки, які очікують отримання пропуску, тобто хочуть увійти у критичну секцію, шикуються у чергу. Тобто замість blocking set використовується черга типу FIFO.

Лістинг 14:

class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    // Not a particularly efficient data structure; just for demo
    protected Object[] items = new Object[MAX_AVAILABLE];
    protected boolean[] used = new boolean[MAX_AVAILABLE];
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }
    public void putItem(Object x) {
        if (markAsUnused(x)) {
            available.release();
        }
    }
    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }
    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else {
                    return false;
                }
            }
        }
        return false;
    }
}

3.8 Exchanger

Exchanger — точка синхронізації, яка дозволяє двом потокам обмінюватися значеннями. При створенні екземпляру цього об'єкта вказується тип об'єкта, яким обмінюватимуться потоки. В об'єкта цього класу є метод exchange, що приймає значення, яке потрібно передати іншому потоку. Метод повертає значення, яке інший потік передав поточному. Коли потік викликає метод exchange(), а інший потік не готовий до обміну значеннями, поточний потік переходить у стан WAITING. Зазначимо, що цей клас варто використовувати для обміну значеннями між двома потоками. Якщо один Exchanger використовується тільки одним потоком або кількість потоків-користувачів більше двох, то якийсь потік буде у стані WAITING нескінченно довго, поки не зупиниться програма. Якщо перший потік підготував значення, і метод exchange виконує другий потік, то другий потік після виконання методу не блокується. Також клас має перевантажений метод exchange, що приймає час, який потік знаходитиметься у стані WAITING. Після закінчення цього часу генерується виключення TimeOutException, що перевіряється.

3.9 Phaser

Phaser — найскладніший синхронізатор, який, втім, володіє найбільшим функціоналом. Phaser схожий на CountDownLatch, він дозволяє синхронізувати декілька потоків, причому їхня кількість може змінюватись у різних фазах. Phaser — блокування, яке можна перевикористати. Робота Phaser складається з виконання фаз — виконавши чергову фазу, потік викликає у Phaser метод arriveAndAwaitAdvance(). Роботу Phaser можна уявити як ланцюжок завдань або фаз, у кожній з яких може брати участь різна кількість потоків.

Щоб поточний потік брав участь у певній фазі обчислень, він реєструється викликом методу register() в об'єкта Phaser. Після того, як один потік закінчить свої обчислення та захоче почекати на інші потоки, він викликає метод arriveAndAwaitAdvance() у блокування. Це блокуючий метод. Потік продовжить своє виконання, коли кількість потоків, що спричинили метод arriveAndAwaitAdvance(), дорівнюватиме кількості зареєстрованих потоків, які викликали метод register(). Якщо потік не хоче брати участь у наступній фазі виконання з використанням Phaser, слід викликати метод arriveAndDeregister(). Якщо потік виконав свої обчислення, але не хоче чекати, поки інші потоки завершать етап своїх обчислень, потік має викликати метод arrive(). При переході на наступний етап виконується логіка малюнку 5:

image

Малюнок 5. Код, що виконується, коли Phaser переходить на наступний етап виконання

Максимальна кількість потоків, які можуть бути синхронізовані з використанням блокування Phaser, 65635. Також можна об'єднувати кілька блокувань Phaser у відношення пращур-нащадок. На цьому синхронізаторі можливо реалізувати CountDownLatch і CyclicBarrier.

CountDownLatch — аналог методу countDown ():

Лістинг 15:

phaser.arriveAndDeregister();

Аналог методу await():

Лістинг 16:

    if (!phaser.isTerminated()) {
        phaser.awaitAdvance(phaser.getPhase());
    }

Реалізація класу CyclicBarrier — просто виклик метод arriveAndAwaitAdvance().

3.10 ПРОЦЕСИ ПРИ НОРМАЛЬНОМУ ЗАВЕРШЕННІ ПОТОКУ

Розглянемо, як працює метод join(). Цей метод був спочатку у класі Thread, який з'явився ще у Java 1. Отже, можна зробити висновок, що всередині метод join використовує методи wait і notify. Це дійсно так: якщо зайти в метод join, можна побачити, що виконується ось такий код (код з очікуванням періоду часу не приведено):

Лістинг 17:

if (millis == 0) {
while (isAlive()) {
        wait(0);
    }
}

Якщо виконався рядок коду t.join(), потік, який викликав метод join на іншому потоці t, потрапляє у wait set потоку t. Виникає закономірне запитання: якщо у Java 1 не було ніяких додаткових засобів, як потік перекладався зі стану wait у стан blocked, а потім і в runnable? Щоб розібратись, розглянемо наступну програму:

Лістинг 18:

public class TestNotify {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new ParentThread());
        t.start();
        t.join();
        System.out.println("ExceptionHandlerExample thread finished");
    }
}
class ParentThread implements Runnable {
    @Override
    public void run() {
        final int numberOfChildrenThreads = 5;
        CountDownLatch latch = new CountDownLatch(numberOfChildrenThreads);
        for (int i = 0; i < numberOfChildrenThreads; ++i) {
            Thread thread = new Thread(new ChildThread(currentThread, latch, i));
            thread.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (Thread.currentThread()) {
        }
        System.out.println("Joined thread was finished");
    }
}
class ChildThread implements Runnable {
   private Thread parentThread;
   private CountDownLatch latch;
   private int n = 0;
   public ChildThread(Thread parentThread, CountDownLatch latch, int n) {
       this.parentThread = parentThread;
       this.latch = latch;
       this.n = n;
   }
   @Override
   public void run() {
       Thread currentTread = Thread.currentThread();
       currentTread.setName("Name = " + n);
       synchronized (parentThread) {
           latch.countDown();
           try {
               parentThread.wait();
               System.out.println("Thread" + currentTread.getName() +  "was notified");
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}

У лістингу 18 створюється потік parent thread, і до нього “приєднується” головний потік. Потім у wait set потоку parent thread додається п'ять дочірніх потоків. Конструкція в лістингу 19.

Лістинг 19:

synchronized (Thread.currentThread()) {
}

захоплює блокування на потоці parent thread. Це здійснюється на той випадок, якщо планувальник потоків вирішить переключити контекст з останнього п'ятого потоку після виконання рядку latch.countDown() на потік parent thread. Тоді потік parentThread може завершитись раніше, ніж п'ятий дочірній потік.

Якщо використовувати конструкцію, наведену в лістингу 19, такої ситуації не трапиться, тому що блокування parent thread буде захоплено останнім потоком. Потім потік parent thread завершує своє виконання, що підтверджує висновок рядка “Joined thread was finished”. Після цього показуються записи про те, що дочірні потоки було переведено зі стану waiting у стан blocked, а потім runnable. Тобто для потоку parent thread викликався метод notifyAll(). І це віртуальна Java-машина здійснює самостійно під капотом. Також при завершенні потоку змінна isAlive для потоку встановлюється у false. З прикладу в лістингу 18 можна зробити висновок, що коли виконання коду потоку проходить закриваючу фігурну дужку, потік ще виконує додаткові дії з коректної зупинки потоку.

  • Україна, Дніпро; Україна, Київ; Україна, Львів; Україна, Одеса; Україна, Харків; Україна, Херсон
    3 грудня
  • Україна, Дніпро; Україна, Київ; Україна, Львів; Україна, Одеса; Україна, Харків; Україна, Херсон
    Сьогодні