自制线程池2(限制最长执行时间、回调函数)
在 自制线程1 中,我们实现了一个,用于低优先级,可设定最大执行线程,可在执行任务过程或是在队列中的任务时销毁的线程不耻,主要应用于需要花费时间较长的任务的工作。
任务执行的时间长了,说不定因为某些原因,线程执行的任务就死在那了,针对这点加入这样一个功能,记录这个任务执行开始的时间,当它执行的时间如果超过了1h,我们就认为这个工作超过我们的预期了,将它Abort掉。并掉用回调函数ErrCallback,通知客户进行后续的处理工作。如果正常执行可以有一个SuccCallback来处理完成后的操作,当然两个回调是非强制性的。另外执行的任务可以返回一个结果值,改变了build-in threadpool 的WaitCallback没有返回值的设计
在设计线程池的功能是我曾经考虑单独为每个任务设定超时执行的时间,可是这样的话似乎要为每个任务创建一个定时器来监控任务的执行时间上限是否到了( 可能是我没有想到好的解决方案,高手支招哈 ),基于这种情况,大多数任务执行的时间都非常短,而且也不会出现特别的状况,实际当中,只有极个别的任务会因为一些特殊情况长期未完成任务,因此我考虑了使用一个线程来轮询,interval time设置的相关长一些,这样来清理异常任务占用的线程。而且这样实现起来也相对简单一些。下面给出基于自制线程1篇后的改进版线程池。
Code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Rhino.Commons;
namespace ThreadPool
{
public delegate object WaitCallback2( object state);
public delegate void SuccCallback( object state, object result);
public delegate void ErrCallback( object state);
/// <summary>
/// 此线程池的作用是将某一类特殊的任务交给此线程池执行,
/// 可以设定该线程池的最大线程数,
/// 这类线程池的优点时,占用的资源少,优先级低,
/// 适合于执行任务需要长期执行,不考虑时间因素的任务
/// 同时根据在传入线程池时的标记key,可以Aborted指定任务,
/// 若该任务正在执行或尚在执行队列中
/// </summary>
public static class MyThreadPool
{
static object obj = new object ();
static AutoResetEvent wait = new AutoResetEvent( false );
static AutoResetEvent wait2 = new AutoResetEvent( false );
static MyThreadPool()
{
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000 , true );
SetMaxThreadNum( 1 );
SetMaxExecTime( true , 10000 );
}
/// <summary>
/// 设定单线程允许执行任务的最长时间,该方法不能在运行时改变,须事前设定
/// </summary>
/// <param name="b"></param>
/// <param name="time"></param>
private static void SetMaxExecTime( bool b, int time)
{
IsLimitedExecTime = b;
MaxLimitedTime = time;
if (IsLimitedExecTime)
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state,
MaxLimitedTime, true );
}
private static void bb( object state, bool timedOut)
{
lock (obj)
{
Dictionary < string ,WorkerThread > temp = new Dictionary < string , WorkerThread > ();
foreach (var kvp in dict)
{
if (DateTime.Now.Subtract(kvp.Value.StartExecTime).TotalMilliseconds > MaxLimitedTime)
{
temp.Add(kvp.Key,kvp.Value);
}
}
foreach (var kvp in temp)
{
Aborted(kvp.Key);
}
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state, MaxLimitedTime, false );
}
}
public static int MaxLimitedTime { get ; set ; }
public static bool IsLimitedExecTime { get ; set ; }
private static void aa( object state, bool timedOut)
{
lock (obj)
{
// 判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
List < string > removeKey = new List < string > ();
List < WorkerThread > newTask = new List < WorkerThread > ();
// Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
foreach (var kvp in dict)
{ // kvp.Value.ThreadState == ThreadState.Unstarted ||
if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)
{
// dict.Remove(kvp.Key); // cancle because of lock
WorkerThread a = queue.FirstOrDefault();
if (a != null )
{
removeKey.Add(kvp.Key);
// addDict.Add(a.Key, kvp.Value.Change(a));
newTask.Add(kvp.Value.Change(a));
queue.RemoveAt( 0 );
// dict.Add(a.Key, kvp.Value.Change(a)); // cancle because of lock
// 将参数加到线程中,并改变线程的状态
// dict[a.Key].Thread.Resume();
}
else
break ;
// else
// {
// System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
// 2000, true);
// return;
// }
}
}
removeKey.ForEach(t => dict.Remove(t));
newTask.ForEach(t =>
{
t.StartExecTime = DateTime.Now;
dict.Add(t.Key, t);
t.Thread.Resume();
});
while (queue.Count > 0 && dict.Count < MaxThreadNum)
{
// 未到线程池最大池程数时,增加线程
WorkerThread b = queue.FirstOrDefault();
if (b != null )
{
queue.RemoveAt( 0 );
// Thread thd = new Thread(new ThreadStart(b.Exec));
// thd.Priority = ThreadPriority.Lowest;
// dict.Add(b.Key, thd);
// thd.Start();
WorkerThread wt = new WorkerThread();
wt.Start(b);
dict.Add(wt.Key, wt);
wt.Thread.Start();
// 将参数加到线程中,并改变线程的状态
}
}
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000 ,
true );
}
}
// private static int _maxThreadNum = 1;
public static int MaxThreadNum
{
// get { return _maxThreadNum; }
// set { _maxThreadNum = value; }
get ; set ;
}
public static void SetMaxThreadNum( int num)
{
if (num < 1 )
num = 1 ;
MaxThreadNum = num;
}
/// <summary>
/// 任务执行队列
/// </summary>
// static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
static List < WorkerThread > queue = new List < WorkerThread > ();
/// <summary>
/// 目前暂定为只使用一个线程,以免耗近资源
/// </summary>
static Dictionary < string , WorkerThread > dict = new Dictionary < string , WorkerThread > ( 1 );
private static object state;
private static WorkerThread FindSpecificWorkerThreadByKey( string key)
{
WorkerThread wt;
dict.TryGetValue(key, out wt);
return wt;
}
public static void Aborted( string key)
{
// lock (obj)
// {
#region old way now extract method FindSpecificWorkerThreadByKey to split this
// WorkerThread v;
// if (dict.TryGetValue(key, out v))
// {
// v.Thread.Abort();
// // 在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
// // 过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
// // ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
// // 阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
// // 如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
// // 迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
// // 用Join方法。
// v.Thread.Join();
// dict.Remove(key);
// }
#endregion
WorkerThread v = FindSpecificWorkerThreadByKey(key);
// 没有发现指定key的线程表示,对应该key的任务已经执行完了,不需要再来取消该任务
// 或者指的key的线程虽然还在但它的状态已变为suspended,任务已完成,将等待下一个任务,实际不需要终止该线程
// 只有但指定的key的任务在执行时才删除
if (v != null && v.Thread.ThreadState != ThreadState.Suspended)
{
dict.Remove(key);
/*
在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
用Join方法。
*/
v.Thread.Abort();
v.Thread.Join();
if (v.ErrorCallback != null )
v.ErrorCallback(v.State);
}
// 任务如果还在队列中则删除该任务
int index = queue.FindIndex(t => t.Key == key);
if (index >- 1 )
queue.RemoveAt(index);
wait.Set();
// }
}
public static void QueueUserWorkItem(WaitCallback2 callback, object state, string key,SuccCallback succ,ErrCallback err)
{
WorkerThread p = new WorkerThread()
{
WaitCallback = callback,
State = state,
Key = key,
ErrorCallback = err,
SuccessCallback = succ
};
// queue.Enqueue(p);
queue.Add(p);
wait.Set();
}
public static void QueueUserWorkItem(WaitCallback2 callback, object state,SuccCallback succ,ErrCallback err)
{
QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString(),succ,err);
}
public static void QueueUserWorkItem(WaitCallback2 callback, object state, string key)
{
// WorkerThread p = new WorkerThread()
// {
// WaitCallback = callback,
// State = state,
// Key = key
// };
/// /queue.Enqueue(p);
// queue.Add(p);
// wait.Set();
QueueUserWorkItem(callback, state, key, null , null );
}
public static void QueueUserWorkItem(WaitCallback2 callback, object state)
{
QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString());
}
}
public class WorkerThread
{
public DateTime StartExecTime { get ; set ; }
public Thread Thread { get ; set ; }
public string Key { get ; set ; }
public WaitCallback2 WaitCallback { get ; set ; }
public SuccCallback SuccessCallback { get ; set ; }
public ErrCallback ErrorCallback { get ; set ; }
public Object State { get ; set ; }
public void Exec()
{
while ( true )
{
if ( this .SuccessCallback != null )
this .SuccessCallback( this .State, this .WaitCallback( this .State));
else
this .WaitCallback( this .State);
this .Thread.Suspend();
}
}
public WorkerThread Change(WorkerThread wt)
{
this .Key = wt.Key;
this .WaitCallback = wt.WaitCallback;
this .State = wt.State;
this .StartExecTime = wt.StartExecTime;
this .ErrorCallback = wt.ErrorCallback;
this .SuccessCallback = wt.SuccessCallback;
return this ;
}
public void Start(WorkerThread wt)
{
this .Change(wt);
this .Thread = new Thread( new ThreadStart( this .Exec));
this .Thread.Priority = ThreadPriority.Lowest;
}
// public void Start(WaitCallback callback,Object state)
// {
// this.WaitCallback = callback;
// this.State = state;
// if(this.Thread==null){
// this.Thread = new Thread(new ThreadStart(this.Exec));
// this.Thread.Priority = ThreadPriority.Lowest;
// this.Thread.IsBackground = true;
// this.Thread.Start();
// return;
// }
// if(this.Thread.ThreadState==ThreadState.Suspended)
// {
// this.Thread.Resume();
// }
// }
}
}
下面是测试用例
Code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using ThreadPool;
namespace ThreadPoolTest
{
class Class6
{
static void Main( string [] args)
{
object state1 = " beijing " ;
object state2 = " zhejiang " ;
MyThreadPool.QueueUserWorkItem( new WaitCallback2(ExecMethod1),state1, new SuccCallback(SuccMethod), new ErrCallback(ErrMethod));
MyThreadPool.QueueUserWorkItem( new WaitCallback2(ExecMethod2), state2, new SuccCallback(SuccMethod), new ErrCallback(ErrMethod));
Console.ReadLine();
}
private static object ExecMethod2( object state)
{
while ( true )
{
Thread.Sleep( 1500 );
Console.WriteLine( " welcome to " + state);
}
return null ;
}
private static void ErrMethod( object state)
{
Console.WriteLine( " ErrMethod " );
Console.WriteLine( " transform params:{0} " ,state);
}
private static void SuccMethod( object state, object result)
{
Console.WriteLine( " SuccMethod " );
Console.WriteLine( " hey erick,you are now in {0},{1} " ,state,result);
}
private static object ExecMethod1( object state)
{
Console.WriteLine( " ExecMethod1 " );
return " welcome to " + state;
}
}
}
目前这个线程池还有这么三个问题,
1.初始时是没有线程的,当加入任务时根据需要会逐步增加到最大线程,可是之后即使线程空闲,也不会将闲置的线程不abort掉,目前还没想到该怎么解决请高手支招啊,
2.代码还待严重重构,目前先解决功能性的问题先。
3.测试不够严谨
请听我下回分解
PS:大家只想用一般性的线程池功能的话,推荐使用 smartthreadpool
查看更多关于自制线程池2(限制最长执行时间、回调函数)的详细内容...