//--------------------------------------------------------------------------
//
// Copyright (c) BUSHUOSX. All rights reserved.
//
// File: HttpClientHelper.cs
//
// Version:1.0.0.1
//
// Datetime:20170815
//
//--------------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace BUSHUOSX.HTTP
{
class HttpClientHelper
{
/// <summary>
/// http task state
/// </summary>
class TASKSTATE
{
public object userState;
public HttpClient httpClient;
public TASKSTATE(object state,HttpClient client)
{
userState = state;
httpClient = client;
}
}
/// <summary>
/// 不为Null时,任意http请求完成、取消、失败时回调
/// </summary>
Action<Task, object> _callbackOnAnyTaskComplited;
/// <summary>
/// 并发控制信号
/// </summary>
SemaphoreSlim _semaphore;
/// <summary>
/// 工作标记
/// </summary>
bool _isWorking;
/// <summary>
/// 共用httpclient。为Null时,为每个http请求new httpclient。
/// </summary>
HttpClient _httpClient;
/// <summary>
/// http request task 记录
/// </summary>
readonly ConcurrentDictionary<Task, TASKSTATE> _workingTasks = new ConcurrentDictionary<Task, TASKSTATE>();
/// <summary>
/// 启动的task计数。
/// </summary>
long _workingCount;//原子操作
/// <summary>
/// 任务取消标记
/// </summary>
CancellationTokenSource _cancelSource;
/// <summary>
/// 工作器监视线程
/// </summary>
Task _worker;
/// <summary>
/// 禁止自动启动任务
/// </summary>
bool _autoStartWorking = true;
/// <summary>
/// Http Request最大并发数
/// </summary>
public int MaxConcurrencyLevel { get; }
/// <summary>
/// Http Request超时设置,毫秒。小于等于0时,使用默认值。
/// </summary>
public int TimeoutMillisecond { get; }
/// <summary>
/// 可控制并发数的HttpClient
/// </summary>
/// <param name="concurrencyLevel">Http Request最大并发数</param>
/// <param name="callbackOnAnyTaskComplited">不为Null时,任意http请求完成、取消、失败时回调。注意:如果使用,请保证方法的线程安全。</param>
/// <param name="timeoutMillisecond">Http Request超时设置,毫秒。小于等于0时,使用默认值。</param>
/// <param name="httpClient">留空则每个任务使用一个新的HttpClient对象</param>
public HttpClientHelper(int concurrencyLevel, Action<Task, object> callbackOnAnyTaskComplited = null, int timeoutMillisecond = 0, HttpClient httpClient = null)
{
if (concurrencyLevel < 1)
{
throw new ArgumentOutOfRangeException("concurrencyLevel < 1");
}
MaxConcurrencyLevel = concurrencyLevel;
_callbackOnAnyTaskComplited = callbackOnAnyTaskComplited;
TimeoutMillisecond = timeoutMillisecond;
_httpClient = httpClient;
}
/// <summary>
/// 返回已发出的http请求数
/// </summary>
/// <returns></returns>
public int GetWorkingTasksCount()
{
return _workingTasks.Count;
}
//public KeyValuePair<Task, object>[] GetTasks()
//{
// return _workingTasks.ToArray();
//}
/// <summary>
/// 停止工作器
/// </summary>
/// <param name="cancelPendingRequests">取消已发起的所有http请求</param>
public void Stop(bool cancelPendingRequests = false)
{
_autoStartWorking = false;
notifyEndWorker();
_cancelSource.Cancel();
if (_httpClient != null)
{
_httpClient.CancelPendingRequests();
}
else
{
foreach (var item in _workingTasks)
{
if (item.Value.httpClient != null)
{
item.Value.httpClient.CancelPendingRequests();
}
}
}
_workingTasks.Clear();
}
/// <summary>
/// 重新启动工作器。stop之后使用。
/// </summary>
public void ReStart()
{
_autoStartWorking = true;
}
/// <summary>
/// 阻塞直到所有http任务完成。
/// </summary>
public void WaitWorkComplited()
{
if (_worker != null)
{
_worker.Wait();
}
}
/// <summary>
/// 通过 http client 获取 string。超过并发数时会阻塞,直到有http request完成。
/// </summary>
/// <param name="address">http url string</param>
/// <param name="state">用户状态。用于callbackOnAnyTaskComplited回调。</param>
/// <returns></returns>
public Task<string> GetString(string address, object state = null)
{
Interlocked.Increment(ref _workingCount);
notifyStartWorker();
if (!_isWorking)
{
Interlocked.Decrement(ref _workingCount);
return null;
}
_semaphore.Wait(_cancelSource.Token);
TASKSTATE tstate = new TASKSTATE(state, null);
HttpClient client;
try
{
if (_httpClient != null)
{
client = _httpClient;
}
else
{
client = new HttpClient();
if (TimeoutMillisecond > 0)
{
client.Timeout = TimeSpan.FromMilliseconds(TimeoutMillisecond);
}
tstate.httpClient = client;
}
var t = client.GetStringAsync(address);
_workingTasks[t] = tstate;
t.ContinueWith(anyTaskComplited,
_cancelSource.Token,
TaskContinuationOptions.OnlyOnRanToCompletion & TaskContinuationOptions.NotOnRanToCompletion,
TaskScheduler.Current);
return t;
}
catch (HttpRequestException e)
{
Interlocked.Decrement(ref _workingCount);
_semaphore.Release();
throw new HttpRequestException(e.Message, e.InnerException);
}
}
/// <summary>
/// 任意http task完成时回调
/// </summary>
/// <param name="task"></param>
private void anyTaskComplited(Task task)
{
TASKSTATE tstate;
_workingTasks.TryRemove(task, out tstate);
//Debug.Assert(tstate.userState == state);
_callbackOnAnyTaskComplited?.Invoke(task, tstate.userState);
_semaphore.Release();
Interlocked.Decrement(ref _workingCount);
}
/// <summary>
/// 工作器初始化
/// </summary>
private void notifyStartWorker()
{
if (_isWorking) return;
if (!_autoStartWorking) return;
//初始化
_isWorking = true;
Debug.WriteLine("httpClientWorker启动……");
_semaphore = new SemaphoreSlim(MaxConcurrencyLevel, MaxConcurrencyLevel);
_cancelSource = new CancellationTokenSource();
//_workingCount = 0;
if (_httpClient != null && TimeoutMillisecond > 0)
{
_httpClient.Timeout = TimeSpan.FromMilliseconds(TimeoutMillisecond);
}
_worker = Task.Run(new Action(workerMonitor), _cancelSource.Token);
_worker.ContinueWith(a=>{ notifyEndWorker(); },TaskContinuationOptions.NotOnRanToCompletion & TaskContinuationOptions.OnlyOnRanToCompletion);
}
/// <summary>
/// 工作器结束清理工作
/// </summary>
private void notifyEndWorker()
{
if (_isWorking)
{
_isWorking = false;
Debug.WriteLine("httpClientWorker结束……");
}
}
/// <summary>
/// 任务工作器监视器
/// </summary>
private void workerMonitor()
{
do
{
Thread.Sleep(1000);
} while (_workingTasks.Count > 0 || Interlocked.Read(ref _workingCount) > 0);
}
}
}