Легковесная синхронизация с помощью класса Interlocked

Те из вас, кто пришел из неуправляемого мира программирования на Win32 API, вероятно, знают о существовании семейства функций Interlocked. . . К счастью, эти функции предоставлены в распоряжение разработчиков С# через статические методы класса Interlocked из пространства имен System.Threading.

Иногда при выполнении множества потоков возникает необходимость сопровождения одной простой переменной — обычно типа значения, но, может быть, и объекта — между несколькими потоками. Например, предположим, что по какой-то причине нужно отслеживать количество работающих потоков в статической целочисленной переменной. Когда поток стартует, он увеличивает значение этой переменной, а когда завершается — уменьшает это значение.

Очевидно, что понадобится каким-то образом синхронизировать доступ к этому значению, поскольку планировщик может отобрать управление у одного потока и передать его другому, когда первый находится в процессе обновления значения счетчика.

Еще хуже, когда тот же код должен выполняться параллельно на многопроцессорной машине. Для этой задачи можно использовать Interlocked. Increment и Interlocked.Decrement. Эти методы гарантированно модифицируют значение атомарно среди всех процессоров системы.

Рассмотрим следующий пример:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
using System;
using System.Threading;
public class EntryPoint
{
static private volatile int numberThreads = 0; 
static private Random rnd = new Random();
private static void RndThreadFunc ()  {
// Управлять счетчиком потоков и ожидать произвольный 
// промежуток времени от 1 до 12 секунд. 
Interlocked.Increment( ref numberThreads ); 
try {
int time = rnd.Next ( 1000, 12000 ); 
Thread.Sleep ( time );
finally {
Interlocked.Decrement ( 
ref numberThreads );
}
private static void RptThreadFunc()  { 
while ( true )  {
int threadCount = 0; 
threadCount =
Interlocked.CompareExchange ( ref numberThreads, 0, 0 ); 
Console.WriteLine ( "{0} поток(ов) активно", threadCount ); 
Thread.Sleep ( 1000 ) ;
}
static void Main()  {
// Запустить потоки отчетов. 
Thread reporter =
new Thread( new ThreadStart(EntryPoint.RptThreadFunc) ); 
reporter.IsBackground = true; 
reporter.Start();
// Запустить потоки, ожидающие в течение случайного периода времени. 
Thread[] rndthreads = new Thread[ 50 ]; 
for( uint i = 0;
i < 50; ++i )  {
rndthreads[i] = new Thread( new ThreadStart(EntryPoint.RndThreadFunc)  );
rndthreads[i] .Start ();
}

Эта небольшая программа создает 50 потоков переднего плана, которые не делают ничего, кроме ожидания в течение произвольного периода времени между 1 и 12 секунд. Она также создает фоновый поток, который выдает отчет о том, сколько потоков активно в настоящий момент.

Если взглянуть на метод RndThreadFunc, являющийся функцией потока, которую используют 50 потоков программы, в нем можно увидеть инкрементирование и декрементирование целочисленного значения с использованием методов Interlocked.

Обратите внимание на применение блока finally, который гарантирует уменьшение счетчика, независимо от того, как завершится поток.

Можно было бы воспользоваться шаблоном Disposable с помощью ключевого слова using, упаковав инкремент и декремент счетчика в отдельный класс, реализующий IDisposable.

Это позволило бы избавиться от громоздкого блока finally. Но в этом случае он не поможет, так как необходимо создать ссылочный тип, который будет хранить целочисленную переменную счетчика, поскольку невозможно применять ref к целому числу как к полю вспомогательного класса.

Относительно кода предыдущего примера Джон Скит (Jon Skeet) высказал замечательную мысль, которая подчеркивает важность знания поведения типов в определенных ситуациях. Статическому экземпляру Random в предыдущем коде примера ничего не известно о потоках.

Поэтому если два потока почти одновременно вызовут его методы, то состояние экземпляра Random теоретически может оказаться несогласованным, что приведет его в мир неопределенного поведения. Джон советует пометить Random в оболочку другого типа, который обеспечит необходимую синхронизацию. Однако в данном примере такой прием затенил бы основную идею.

Выше были показаны методы Interlocked. Increment и Interlocked. Decrement в действии. Но как насчет Interlocked. CompareExchange, который применяется в потоке отчета? Вспомните, что поскольку множество потоков пытаются выполнить запись в переменную threadCount, поток отчета должен читать ее значение также в синхронизированном режиме. И здесь на помощь приходит Interlocked.CompareExchange.

Метод Interlocked.CompareExchange, как и можно предположить из его имени, позволяет обменивать значение одной переменной, если текущее значение эквивалентно сравниваемому значению другой переменной в атомарном режиме, и возвращает значение, которое было сохранено ранее в этом месте.

Поскольку в классе Interlocked не предусмотрен метод для простого чтения значения Int32 в атомарной операции, этот обмен значения переменной numberThreads выполняется с ее собственным значением, а в качестве побочного эффекта метод Interlocked.CompareExchange возвращает значение, которое было в элементе.

Последний метод класса Interlocked, который следует рассмотреть — CompareExchange. Как уже было показано, этот небольшой метод действительно удобен.

Он похож на Interlocked.CompareExchange в том отношении, что позволяет передать значение из места положения или элемента в атомарном режиме. Однако он выполняет такую передачу, только если первоначальное значение при сравнении оказывается эквивалентным предоставленному образцу, и все эти операции выполняются совместно и атомарно.

В любом случае метод всегда возвращает первоначальное значение. Одним из чрезвычайно удобных применений метода CompareExchange является создание легковесной спин-блокировки (spin lock). Особенность спин-блокировки в том, что если она не может захватить блокировку, то запускает маленький цикл, ожидая появления такой возможности.

Обычно при реализации спин-блокировки поток переводится в спящий режим на очень краткий период времени при каждой неудачной попытке захватить блокировку. Таким образом, планировщик потоков получает возможность предоставить время для выполнения другому потоку, пока происходит ожидание.

Если поток не нужно переводить в спящий режим, а только освободить его текущий квант времени, то методу Thread.Sleep можно передать значение 0. Ниже показан пример:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
using System;
using System.10;
using System.Threading;
public class MySpinLock
{
public MySpinLock ( int spinWait )  { 
this.spinWait = spinWait;
}
public void Enter ()  {
while ( Interlocked.CompareExchange (ref theLock, 1, 0) == 1 )  { 
// Блокировка занята, ожидать. 
Thread.Sleep ( spinWait );
}
public void Exit()  {
// Сбросить блокировку.
Interlocked.CompareExchange( ref theLock,
о );
}
private volatile int theLock = 0; 
private int spinWait;
}
public class MySpinLockManager : IDisposable {
public MySpinLockManager ( MySpinLock spinLock )  { 
this.spinLock = spinLock; spinLock.Enter();
}
public void Dispose ()  { 
spinLock.Exit();
}
private MySpinLock spinLock;
}
public class EntryPoint {
static private Random rnd = new Random();
private static MySpinLock logLock = new MySpinLock( 10 ) ;
private static StreamWriter fsLog =
new StreamWriter ( File.Open("log.txt",
FileMode.Append, FileAccess.Write, FileShare.None)  ); 
private static void RndThreadFunc()  {
using( new MySpinLockManager(logLock) )  { 
fsLog.WriteLine ( "Поток запускается" ); 
fsLog.Flush();
}
int time = rnd.Next ( 10, 200 ); 
Thread.Sleep( time );
using( new MySpinLockManager(logLock) )  { 
fsLog.WriteLine ( "Поток завершается" ); 
fsLog.Flush ();
}
static void Main()  {
// Запустить потоки, ожидающие в течение случайного периода времени. 
Thread[] rndthreads = new Thread[ 50 ]; 
for( uint i = 0; i < 50; ++i )  { 
rndthreads[i] =
new Thread( new ThreadStart (
EntryPoint.RndThreadFunc)  ); 
rndthreads[i].Start ();
}

Этот пример подобен предыдущему. В нем создается 50 потоков, которые ожидают в течение случайного периода времени. Однако вместо управления счетчиком потоков он выводит строку в журнальный файл. Поскольку эта запись осуществляется из множества потоков, и методы экземпляра StreamWriter не являются безопасными в отношении потоков, запись должна выполняться в безопасной манере внутри контекста блокировки. И здесь на помощь приходит класс MySpinLock.

Внутренне он управляется переменной блокировки в форме целочисленного значения и использует Interlocked. CompareExchange для регулировки доступа к блокировке.

Вызов Interlocked.CompareExchange в MySpinLock.Enter говорит следующее.

1. Если значение блокировки равно 0, заменить его значением 1, чтобы обозначить установку блокировки; в противном случае не делать ничего.

2. Если значение элемента уже содержит 1, значит, он занят, и нужно перевести поток в спящее состояние и ожидать.

Оба эти действия происходят в атомарном режиме через класс Interlocked, так что нет никакой возможности, чтобы более одного потока в единицу времени захватило блокировку. Когда вызывается метод MySpinLock.Exit, то все, что ему нужно сделать — это сбросить блокировку. Однако это также должно быть сделано атомарно — отсюда и вызов Interlocked.CompareExchange.

Поскольку внутренне блокировка представлена типом int (который является int32), можно просто установить нулевое значение в MySpinLock.Exit. Однако, как упоминалось в предыдущей врезке, следует проявлять осторожность с блокировкой 64-битного значения при работе на 32-разрядной машине. Что если инженер поддержки изменит лежащее в основе хранилище с int на intPtr (тип, указывающий размер указателя, который зависит от платформы) и не внесет изменения в месте, где выполняется сброс theLock?

В данном примере иллюстрируется применение идиомы “disposable/using” для реализации детерминированной деструкции, когда вводится другой класс, в данном случае MySpinLockManager, чтобы реализовать идиому RAIL Этот избавляет от необходимости повсеместного написания блоков finally.

Конечно, все равно приходится помнить о применении ключевого слова using, но если следовать этой идиоме более тщательно, чем в данном примере, то понадобится реализовать финализатор, который выдаст предупреждение в отладочной сборке, если объект не будет надлежащим образом освобожден.

Имейте в виду, что спин-блокировки, реализованные подобным образом, не реенте-рабельны. Другими словами, блокировка не может быть захвачена более одного раза, как это, например, может сделать критичный блок или мьютекс. Это не значит, что спин-блокировки нельзя использовать с техникой рекурсивного программирования.

Это просто означает необходимость освобождения блокировки перед рекурсивным вызовом, в противном случае будет спровоцировано состояние взаимной блокировки.

Если нужен реентерабельный механизм, можно воспользоваться более структурированными объектами ожидания, такими как класс Monitor, который рассматривается ниже, или же объекты ожидания ядра системы.

Кстати, если хотите увидеть некоторый, так сказать, фейерверк, попробуйте закомментировать использование спин-блокировки в методе RndThreadFunc и запустить код несколько раз. Скорее всего, вы заметите, что вывод в журнальном файле стал несколько некрасивым. И эта некрасивость усилится при выполнении этого же кода на многопроцессорной машине.

Вы можете следить за любыми ответами на эту запись через RSS 2.0 ленту. Вы можете оставить ответ, или trackback с вашего собственного сайта.

Оставьте отзыв

XHTML: Вы можете использовать следующие теги: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

 
Rambler's Top100