Асинхронные вызовы методов

Хотя управлять помещением элементов работы в пул потоков можно непосредственно через класс ThreadPool, более популярный способ использования пула потоков заключается в вызовах асинхронных делегатов.

При объявлении делегата CLR создает класс-наследник System.MulticastDelegate. Один из определенных в нем методов, Invoke, принимает точно ту же сигнатуру функции, что и определение делегата. Разумеется в С# предусмотрено синтаксическое сокращение для вызова метода Invoke. Но наряду с Invoke среда CLR также определяет два метода Beginlnvoke и Endlnvoke, являющиеся сердцем шаблона асинхронной обработки, которая используется CLR.

Базовая идея очевидным образом исходит из имен методов. При вызове Beginlnvoke на делегате операция откладывается для выполнения в другом потоке. Вызове метода Endlnvoke приводит к возврату результатов операции. Если операция не завершена на момент вызова Endlnvoke, вызывающий поток блокируется до тех пор, пока она не будет завершена.

Рассмотрим краткий пример, демонстрирующий общий шаблон в действии. Предположим, что есть метод, вычисляющий сумму налогов за год, и его требуется вызывать асинхронно, потому что его выполнение занимает ощутимое время.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
using System;
using System.Threading;
public class EntryPoint
{
// Объявление делегата для асинхронного вызова.
private delegate Decimal ComputeTaxesDelegate ( int year ); // Метод для вычисления налогов.
private static Decimal ComputeTaxes ( int year )  {
Console.WriteLine ( "Вычисление налогов в потоке {0}", Thread.CurrentThread.ManagedThreadld ) ;
// Здесь происходит длительное вычисление. 
Thread.Sleep ( 6000 ); 
return 4356.98M;
}
static void Main()  { // Выполним асинхронный вызов, создав делегат и вызвав его. 
ComputeTaxesDelegate work = new ComputeTaxesDelegate ( EntryPoint.ComputeTaxes ); 
IAsyncResult pendingOp = work.Beginlnvoke( 2004, null, null ); // Выполнить другую полезную работу. 
Thread.Sleep ( 3000 ) ; // Завершить асинхронный вызов.
Console.WriteLine ( "Ожидание завершения операции." ); 
Decimal result = work.Endlnvoke( pendingOp ); 
Console.WriteLine ( "Сумма налогов: {0}", result );
}

Первое, что заметно в шаблоне — это то, что сигнатура метода Beginlnvoke не соответствует методу Invoke. Это объясняется тем, что нужен некоторый способ идентификации определенного элемента работы, который только что был отложен вызовом Beginlnvoke.

Таким образом, Beginlnvoke возвращает ссылку на объект, реализующий интерфейс IAsyncResult. Этот объект подобен cookie-набору, который сохраняется для идентификации выполняющегося элемента работы. Через методы интерфейса IAsyncResult можно проверять состояние операции, например, ее готовность. Чуть дальше этот интерфейс рассматривается более подробно; там же описаны два параметра, добавленные в конец объявления метода Beginlnvoke, вместо которых было подставлено null.

Когда поток, запрошенный для выполнения операции, завершит свою работу, он вызывает Endlnvoke на делегате. Однако, поскольку метод должен иметь способ идентификации асинхронной операции, результат которой нужно получить, ему должен быть передан объект, полученный из метода Beginlnvoke. В данном примере вызов Endlnvoke будет заблокирован на некоторое время, до окончания операции.

Если в процессе асинхронного выполнения в пуле потоков целевого кода делегата будет сгенерировано исключение, оно сгенерируется повторно, когда инициирующий поток вызовет Endlnvoke.

Отчасти красота асинхронного шаблона IOU, реализованного делегатом, заключается в том, что вызванный код даже может не знать о том, что он вызван асинхронно.

Конечно, вызывать метод асинхронно не слишком практично, если он для этого не был предназначен, если он затрагивает данные системы, к которым обращаются другие методы, либо если не используются какие-то механизмы синхронизации.

Тем не менее, вся головная боль, связанная с созданием инфраструктуры асинхронного вызова метода, смягчается благодаря делегату, сгенерированному CLR, наряду с пулом потоков, связанным с процессом. Более того, инициатор асинхронного действия даже не обязан знать, как реализовано асинхронное поведение.

Теперь давайте внимательно рассмотрим интерфейс IAsyncResult объекта, возвращенного методом Beginlnvoke. Объявление интерфейса выглядит следующим образом:

1
2
3
4
5
6
7
public interface IAsyncResult
{
Object AsyncState {get;} 
WaitHandle AsyncWaitHandle {get;}
bool CompletedSynchronously {get;} 
bool IsCompleted {get;}
}

В предыдущем примере было решено ждать окончания вычислений, вызвав Endlnvoke. Вместо этого можно было бы ожидать WaitHandle, возвращенного свойством IasyncResult.AsyncWaitHandle перед вызовом Endlnvoke. В любом случае конечный результат был бы таким же. Однако тот факт, что интерфейс IAsyncResult представляет WaitHandle, позволяет при необходимости иметь несколько потоков в системе, ожидающих завершения одного действия.

Два других свойства позволяют проверить, завершена ли операция. Свойство IsCompleted просто возвращает булевское значение, представляющее этот факт. Можно было бы организовать цикл по пулу, периодически проверяющий значение этого флага.

Однако это оказалось бы намного менее эффективно, чем просто ожидание WaitHandle. Но, тем не менее, существует и такой вариант. Второе булевское свойство — это CompletedSynchronously. Шаблон асинхронной обработки в .NET Framework предусматривает возможность вызова Beginlnvoke для запуска работы в синхронном, а не асинхронном режиме. Свойство CompletedSynchronously позволяет определить, если это произойдет.

В текущей реализации CLR никогда этого не делает, когда делегаты вызываются асинхронно, но это может измениться в любое время. Однако, поскольку рекомендуется применять тот же асинхронный шаблон всякий раз, когда проектируется тип, который может быть вызван асинхронно, такая возможность была встроена в шаблон.

Например, предположим, что у вас имеется класс, в котором поддерживается метод для синхронного выполнения обобщенных операций. Если одна из таких операций просто возвращает номер версии класса, это значит, что она завершается быстро, и ее можно выполнять синхронно.

И, наконец, свойство Asynestate интерфейса IAsyncResult позволяет присоединить к асинхронному вызову любой тип специфичных контекстных данных. Это — второй из двух дополнительных параметров, добавленных в конец сигнатуры Beginlnvoke.

В предыдущем примере был передан null, потому что этот параметр был не нужен. Хотя результат операции получается через вызов Endlnvoke, таким образом, блокируя поток, можно было бы предпочесть получать уведомление о завершении операции через обратный вызов. Рассмотрим следующую модификацию показанного ранее примера:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
using System;
using System.Threading;
public class EntryPoint { // Объявление делегата для асинхронного вызова.
private delegate Decimal ComputeTaxesDelegate( int year ); // Метод для вычисления налогов.
private static Decimal ComputeTaxes( int year )  {
Console.WriteLine ( "Вычисление налогов в потоке {0}", Thread.CurrentThread.ManagedThreadld ); // Здесь происходит длительное вычисление. 
Thread.Sleep( 6000 ) ;
return 4356.98M;
}
private static void TaxesComputed( IAsyncResult ar )  { // Теперь получим результат. 
ComputeTaxesDelegate work = (ComputeTaxesDelegate) ar.AsyncState; 
Decimal result = work.Endlnvoke( ar ); 
Console.WriteLine ( "Сумма налогов: {0}", result );
}
static void Main()  { // Выполним асинхронный вызов, создав делегат и вызвав его. 
ComputeTaxesDelegate work = new ComputeTaxesDelegate( EntryPoint.ComputeTaxes );
work.Beginlnvoke( 2004, new AsyncCallback( EntryPoint.TaxesComputed), work ); // Выполнить другую полезную работу. 
Thread.Sleep ( 3000 ) ; // Завершить асинхронный вызов.
Console.WriteLine ( "Ожидание завершения операции." ); // Sleep используется только для примера! 
//В действительности необходимо ожидать событие,
// чтобы получить результат или что-то подобное. 
Thread.Sleep ( 4000 ) ;
}

Теперь вместо обращения к Endlnvoke из потока, который вызвал Beginlnvoke, пул потоков должен вызывать метод TaxesComputed через экземпляр делегата AsyncCallback, который передан в предпоследнем параметре Beginlnvoke. Делегат IAsyncCallback ссылается на метод, принимающий один параметр типа IAsyncResult и возвращающий void.

Использование обратного вызова для обработки результата довершает шаблон асинхронной обработки, позволяя потоку, запустившему операцию, продолжать работу, без необходимости явного ожидания рабочего потока.

Обратите внимание, что метод обратного вызова TaxesComputed для получения результата асинхронного вызова все равно должен вызвать Endlnvoke. Чтобы сделать это, однако, ему нужен экземпляр делегата. И здесь на помощь приходит объект контекста IAsyncResult.AsyncState.

В рассматриваемом примере IAsyncResult. AsyncState инициализируется так, чтобы указывать на делегат, который передается в последнем параметре Beginlnvoke. Главный поток, вызывающий Beginlnvoke, не нуждается в объекте, возвращенном этим вызовом, поскольку он никогда в действительности не опрашивает состояния операции, как и не ожидает явно ее завершения.

Задержка с помощью Sleep добавлена в конец метода Main просто для примера. Помните, что все потоки в пуле выполняются как фоновые. Поэтому если не ожидать в этой точке, то процесс завершится задолго до завершения операций.

Если вы хотите, чтобы асинхронная работа проходила в потоке переднего плана, лучше создайте новый класс, реализующий асинхронный шаблон Beginlnvoke/Endlnvoke и используйте поток переднего плана для выполнения работы. Никогда не меняйте фоновый статус потока в пуле через свойство IsBackgroundB текущем потоке. Даже если попробовать это сделать, это не даст никакого эффекта.

Важно понимать, что когда выполняется асинхронный код и осуществляется обратный вызов, работа происходит в контексте произвольного потока. Нельзя делать какие-либо предположения относительно того, какой поток будет выполнять код. Во многих отношениях эта техника подобна разработке драйверов на платформе Windows.

Применение обратных вызовов для обработки завершения элемента работы очень удобно при создании серверного процесса, обрабатывающего входящие запросы. Например, предположим, что есть процесс, прослушивающий определенный порт TCP/IP на предмет входящих запросов. Когда он получает такой запрос, то отвечает на него, пересылая запрошенную информацию.

Чтобы достичь максимальной эффективности, эти операции определенно должны выполняться асинхронно. Рассмотрим следующий пример, который прослушивает порт 1234 и, получив что-либо, просто отвечает отправкой строки “Hello World!”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
using System; 
using System.Text; 
using System.Threading; 
using System.Net; 
using System.Net.Sockets; 
public class EntryPoint {
private const int ConnectQueueLength = 4;
private const int ListenPort = 1234;
static void ListenForRequests()  { 
Socket listenSock = new Socket( AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp ); 
listenSock.Bind( new IPEndPoint(IPAddress.Any, ListenPort)  ); 
listenSock.Listen ( ConnectQueueLength ); 
while ( true )  {
using ( Socket newConnection = listenSock.Accept () )  { 
// Отправить данные, 
byte[] msg =
Encoding.UTF8.GetBytes ( "Hello World!" ); 
newConnection.Send ( msg, SocketFlags.None );
}
static void Main()  {
// Запустить прослушивающий поток. 
Thread listener = new Thread(new ThreadStart(EntryPoint.ListenForRequests) ) ; 
listener.IsBackground = true;
listener.Start();
Console.WriteLine( "Нажмите <Enter> для завершения" ); 
Console.ReadLine();
}

В коде примера создается дополнительный поток, который просто в цикле прослушивает входящие соединения и обслуживает их по мере поступления. С этим подходом связано много проблем, одна из которых состоит в том, что входящие соединения обрабатывает только один поток. Если соединения начнут поступать очень часто, поток быстро окажется перегруженным. Например, реальный веб-сервер может получать тысячи запросов в секунду.

Класс Socket реализует шаблон асинхронных вызовов .NET Framework. Используя этот шаблон, можно улучшить сервер, обслуживая входящие запросы с применением пула потоков, как показано ниже:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
using System; 
using System.Text; 
using System.Threading; 
using System.Net; 
using System.Net.Sockets; 
public class EntryPoint {
private const int ConnectQueueLength = 4;
private const int ListenPort = 1234;
static void ListenForRequests()  {
Socket listenSock = new Socket( AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp ); 
listenSock.Bind( new IPEndPoint(IPAddress.Any, ListenPort) ) ; 
listenSock.Listen( ConnectQueueLength );
while ( true )  {
Socket newConnection = listenSock.Accept();
byte[] msg = Encoding.UTF8.GetBytes( "Hello World!" );
newConnection.BeginSend( msg, 0, msg.Length, SocketFlags.None, null, null ) ;
}
static void Main()  {
// Запустить прослушивающий поток. 
Thread listener = new Thread(new ThreadStart(EntryPoint.ListenForRequests)  ); 
listener.IsBackground = true; 
listener.Start();
Console.WriteLine( "Нажмите <Enter> для завершения" ); 
Console.ReadLine();

Сервер стал немного эффективнее, поскольку теперь отправка данных для входящих соединений происходит в потоке, взятом из пула.

Этот код также демонстрирует стратегию “сделал и забыл”, характерную для использования асинхронного шаблона. Вызывающий код не заинтересован в возврате объекта, реализующего IAsyncResult, как и не заинтересован в установке метода обратного вызова, который должен быть вызван при завершении работы. Этот вызов в стиле “сделал и забыл” — отважная попытка повысить эффективность сервера. Однако результат неудовлетворителен, поскольку здесь пропал оператор using из предыдущей версии сервера.

Объект Socket не закрывается вовремя, и удаленные соединения остаются открытыми до тех пор, пока сборщик мусора не решит финализировать объекты Socket. Поэтому асинхронный вызов должен включать обратный вызов, чтобы закрыть соединение. Для прослушивающего потока было бы бессмысленно ожидать метода Ends end, поскольку это вернуло бы ту же неэффективность, которая существовала ранее.

Полученный из запуска асинхронной операции объект, реализующий IAsyncResult, должен реализовывать свойство IAsyncResult. AsyncWaitHandle, чтобы позволить пользователям получать дескриптор, на котором потом можно организовать ожидание. В случае Socket возвращается экземпляр OverlappedAsyncResult.

Этот класс в конечном итоге наследуется от System.Net.LazyAsyncResult. Он в действительности не создает события для ожидания до тех пор, пока к нему не произойдет обращение через свойство IAsyncResult.AsyncWaitHandle. Такое “ленивое” создание избавляет от ненужного создания объекта блокировки, который большую часть времени остается неиспользованным.

К тому же на объект OverlappedAsyncResult возлагается обязанность по окончании работы закрывать дескриптор операционной системы.

Однако перед тем как перейти к обратному вызову, присмотримся к потоку слушателя. Все, что он делает — ожидает входящие запросы. Не будет ли более эффективным организовать прослушивание тоже через потоки из пула? Разумеется!

Ниже приведен новый усовершенствованный сервер “Hello World!”, который в полной мере использует пул потоков процесса:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
using System;
using System.Text;
using System.Threading;
using System.Net;
using System.Net.Sockets;
public class EntryPoint {
private const int ConnectQueueLength = 4; 
private const int ListenPort = 1234; 
private const int MaxConnectionHandlers = 4; 
private static void HandleConnection( IAsyncResult ar )  { 
Socket listener = (Socket) ar.AsyncState; 
Socket newConnection = listener.EndAccept( ar ); 
byte[] msg = Encoding.UTF8.GetBytes( "Hello World!" ); 
newConnection.BeginSend( msg, 0, msg.Length, SocketFlags.None, new AsyncCallback(EntryPoint.CloseConnection) , newConnection ) ; // Поместить другой запрос в очередь, 
listener.BeginAccept(new AsyncCallback(EntryPoint.HandleConnection) , listener );
}
static void CloseConnection ( IAsyncResult ar )  { 
Socket theSocket = (Socket) ar.AsyncState; 
theSocket.Close() ;
}
static void Main()  { 
Socket listenSock = new Socket ( AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp ); 
listenSock.Bind( new IPEndPoint(IPAddress.Any, ListenPort) ); 
listenSock.Listen( ConnectQueueLength );
// Ожидать дескрипторов соединений, 
for ( int i = 0; 
i < MaxConnectionHandlers; 
++i )  { 
listenSock.BeginAccept( new AsyncCallback(EntryPoint.HandleConnection), listenSock );
}
Console.WriteLine( "Нажмите <Enter> для завершения" ); 
Console.ReadLine();
}

Теперь сервер “Hello World!” полностью использует пул потоков процесса и может обрабатывать входящие клиентские запросы с максимальной степенью параллельности. Кстати, протестировать соединение весьма просто, если использовать встроенный в Windows клиент Telnet.

Просто запустите Telnet из командной строки или из диалогового окна запуска программ, и введите команду для подключения к порту 1234 локальной машины при запущенном в другом командном окне процессе сервера: Microsoft Telnet> open 127.0.0.1 1234

Вы можете следить за любыми ответами на эту запись через RSS 2.0 ленту. Вы можете оставить ответ, или trackback с вашего собственного сайта.

Оставьте отзыв

XHTML: Вы можете использовать следующие теги: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

 
Rambler's Top100