好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

自制线程池2(限制最长执行时间、回调函数)

自制线程池2(限制最长执行时间、回调函数)

  在 自制线程1 中,我们实现了一个,用于低优先级,可设定最大执行线程,可在执行任务过程或是在队列中的任务时销毁的线程不耻,主要应用于需要花费时间较长的任务的工作。

  任务执行的时间长了,说不定因为某些原因,线程执行的任务就死在那了,针对这点加入这样一个功能,记录这个任务执行开始的时间,当它执行的时间如果超过了1h,我们就认为这个工作超过我们的预期了,将它Abort掉。并掉用回调函数ErrCallback,通知客户进行后续的处理工作。如果正常执行可以有一个SuccCallback来处理完成后的操作,当然两个回调是非强制性的。另外执行的任务可以返回一个结果值,改变了build-in threadpool 的WaitCallback没有返回值的设计

     在设计线程池的功能是我曾经考虑单独为每个任务设定超时执行的时间,可是这样的话似乎要为每个任务创建一个定时器来监控任务的执行时间上限是否到了( 可能是我没有想到好的解决方案,高手支招哈 ),基于这种情况,大多数任务执行的时间都非常短,而且也不会出现特别的状况,实际当中,只有极个别的任务会因为一些特殊情况长期未完成任务,因此我考虑了使用一个线程来轮询,interval time设置的相关长一些,这样来清理异常任务占用的线程。而且这样实现起来也相对简单一些。下面给出基于自制线程1篇后的改进版线程池。

Code
using  System;
using  System.Collections.Generic;
using  System.Linq;
using  System.Text;
using  System.Threading;
using  Rhino.Commons;

namespace  ThreadPool
{
     public   delegate   object  WaitCallback2( object  state);

     public   delegate   void  SuccCallback( object  state, object  result);

     public   delegate   void  ErrCallback( object  state);
     ///   <summary>
     ///  此线程池的作用是将某一类特殊的任务交给此线程池执行,
     ///  可以设定该线程池的最大线程数,
     ///  这类线程池的优点时,占用的资源少,优先级低,
     ///  适合于执行任务需要长期执行,不考虑时间因素的任务
     ///  同时根据在传入线程池时的标记key,可以Aborted指定任务,
     ///  若该任务正在执行或尚在执行队列中
     ///   </summary>
     public   static   class  MyThreadPool
    {
         static   object  obj  =   new   object ();
         static  AutoResetEvent wait  =   new  AutoResetEvent( false );
         static  AutoResetEvent wait2  =   new  AutoResetEvent( false );
         static  MyThreadPool()
        {
            System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 , true );
            SetMaxThreadNum( 1 );
            SetMaxExecTime( true ,  10000 );
        }
         ///   <summary>
         ///  设定单线程允许执行任务的最长时间,该方法不能在运行时改变,须事前设定
         ///   </summary>
         ///   <param name="b"></param>
         ///   <param name="time"></param>
         private   static   void  SetMaxExecTime( bool  b,  int  time)
        {
            IsLimitedExecTime  =  b;
            MaxLimitedTime  =  time;
             if  (IsLimitedExecTime)
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state,
                                                                        MaxLimitedTime,  true );

        }

         private   static   void  bb( object  state,  bool  timedOut)
        {
             lock (obj)
            {
                Dictionary < string ,WorkerThread >  temp = new  Dictionary < string , WorkerThread > ();
                 foreach (var kvp  in  dict)
                {
                     if (DateTime.Now.Subtract(kvp.Value.StartExecTime).TotalMilliseconds > MaxLimitedTime)
                    {
                        temp.Add(kvp.Key,kvp.Value);
                    }
                }
                 foreach (var kvp  in  temp)
                {
                    Aborted(kvp.Key);
                }
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state, MaxLimitedTime,  false );
            }
        }

         public   static   int  MaxLimitedTime {  get ;  set ; }
         public   static   bool  IsLimitedExecTime {  get ;  set ; }
         private   static   void  aa( object  state,  bool  timedOut)
        {
             lock  (obj)
            {
                 // 判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
                List < string >  removeKey  =   new  List < string > ();
                List < WorkerThread >  newTask = new  List < WorkerThread > ();
                 // Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
                 foreach  (var kvp  in  dict)
                { // kvp.Value.ThreadState == ThreadState.Unstarted || 
                     if  (kvp.Value.Thread.ThreadState  ==  ThreadState.Suspended)
                    {
                         // dict.Remove(kvp.Key); // cancle because of lock

                        WorkerThread a = queue.FirstOrDefault();
                         if  (a  !=   null )
                        {
                            removeKey.Add(kvp.Key);
                             // addDict.Add(a.Key, kvp.Value.Change(a));
                            newTask.Add(kvp.Value.Change(a));

                            queue.RemoveAt( 0 );
                             // dict.Add(a.Key, kvp.Value.Change(a)); // cancle because of lock
                             // 将参数加到线程中,并改变线程的状态
                             // dict[a.Key].Thread.Resume();
                        }
                         else
                             break ;
                         // else
                         // {
                         //     System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
                         //                                                             2000, true);
                         //     return;
                         // }

                    }
                }
                removeKey.ForEach(t => dict.Remove(t));
                newTask.ForEach(t  =>
                {
                    t.StartExecTime  =  DateTime.Now;
                    dict.Add(t.Key, t);
                    t.Thread.Resume();
                });
                 while  (queue.Count  >   0   &&  dict.Count  <  MaxThreadNum)
                {
                     // 未到线程池最大池程数时,增加线程
                    WorkerThread b  =  queue.FirstOrDefault();
                     if  (b != null )
                    {
                        queue.RemoveAt( 0 );
                         // Thread thd = new Thread(new ThreadStart(b.Exec));
                         // thd.Priority = ThreadPriority.Lowest;
                         // dict.Add(b.Key, thd);
                         // thd.Start();
                        WorkerThread wt  =   new  WorkerThread();
                        wt.Start(b);
                        dict.Add(wt.Key, wt);
                        wt.Thread.Start();

                         // 将参数加到线程中,并改变线程的状态
                    }


                }
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 ,
                                                                         true );
            }
        }


         // private static int _maxThreadNum = 1;
         public   static   int  MaxThreadNum
        {
             // get { return _maxThreadNum; }
             // set { _maxThreadNum = value; }
             get ;  set ;
        }
         public   static   void  SetMaxThreadNum( int  num)
        {
             if  (num  <   1 )
                num  =   1 ;
            MaxThreadNum  =  num;
        }

         ///   <summary>
         ///  任务执行队列
         ///   </summary>
         // static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
        
         static  List < WorkerThread >  queue = new  List < WorkerThread > ();
         ///   <summary>
         ///  目前暂定为只使用一个线程,以免耗近资源
         ///   </summary>
         static  Dictionary < string , WorkerThread >  dict  =   new  Dictionary < string , WorkerThread > ( 1 );

         private   static   object  state;
         private   static  WorkerThread FindSpecificWorkerThreadByKey( string  key)
        {
            WorkerThread wt;
            dict.TryGetValue(key,  out  wt);
             return  wt;
        }
         public   static   void  Aborted( string  key)
        {
             // lock (obj)
             // {
                 #region  old way now extract method FindSpecificWorkerThreadByKey to split this
                 // WorkerThread v;
                 // if (dict.TryGetValue(key, out v))
                 // {
                 //     v.Thread.Abort();
                 //      // 在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
                 //      // 过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
                 //      // ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
                 //      // 阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
                 //      // 如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
                 //      // 迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
                 //      // 用Join方法。
                 //     v.Thread.Join();
                 //     dict.Remove(key);
                 // }
                 #endregion
                WorkerThread v  =  FindSpecificWorkerThreadByKey(key);
                 // 没有发现指定key的线程表示,对应该key的任务已经执行完了,不需要再来取消该任务
                 // 或者指的key的线程虽然还在但它的状态已变为suspended,任务已完成,将等待下一个任务,实际不需要终止该线程
                 // 只有但指定的key的任务在执行时才删除
                 if  (v  !=   null   &&  v.Thread.ThreadState != ThreadState.Suspended)
                {
                    dict.Remove(key);
                     /*
                    在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
                    过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
                    ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
                    阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
                    如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
                    迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
                    用Join方法。
                     */
                    v.Thread.Abort();
                    v.Thread.Join();
                     if  (v.ErrorCallback  !=   null )
                        v.ErrorCallback(v.State);
                    
                }
                 // 任务如果还在队列中则删除该任务
                 int  index  =  queue.FindIndex(t  =>  t.Key  ==  key);
                 if  (index >- 1 )
                    queue.RemoveAt(index);
                wait.Set();
             // }
        }
         public   static   void  QueueUserWorkItem(WaitCallback2 callback,  object  state, string  key,SuccCallback succ,ErrCallback err)
        {
            WorkerThread p  =   new  WorkerThread()
            {
                WaitCallback  =  callback,
                State  =  state,
                Key  =  key,
                ErrorCallback = err,
                SuccessCallback = succ
            };
             // queue.Enqueue(p);
            queue.Add(p);
            wait.Set();
        }
         public   static   void  QueueUserWorkItem(WaitCallback2 callback,  object  state,SuccCallback succ,ErrCallback err)
        {
            QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString(),succ,err);
        }

         public   static   void  QueueUserWorkItem(WaitCallback2 callback,  object  state,  string  key)
        {
             // WorkerThread p = new WorkerThread()
             //             {
             //                 WaitCallback = callback,
             //                 State = state,
             //                 Key = key
             //             };
             /// /queue.Enqueue(p);
             // queue.Add(p);
             // wait.Set();
            QueueUserWorkItem(callback, state, key,  null ,  null );
        }
         public   static   void  QueueUserWorkItem(WaitCallback2 callback, object  state)
        {
            QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString());
        }

    }

public   class  WorkerThread
{
     public  DateTime StartExecTime {  get ;  set ; }
     public  Thread Thread {  get ;  set ; }
     public   string  Key {  get ;  set ; }
     public  WaitCallback2 WaitCallback {  get ;  set ; }
     public  SuccCallback SuccessCallback {  get ;  set ; }
     public  ErrCallback ErrorCallback {  get ;  set ; }
     public  Object State {  get ;  set ; }
     public   void  Exec()
    {
         while  ( true )
        {
             if  ( this .SuccessCallback  !=   null )
                 this .SuccessCallback( this .State, this .WaitCallback( this .State));
             else
                 this .WaitCallback( this .State);
             this .Thread.Suspend();
        }
    }
     public  WorkerThread Change(WorkerThread wt)
    {
         this .Key  =  wt.Key;
         this .WaitCallback  =  wt.WaitCallback;
         this .State  =  wt.State;
         this .StartExecTime  =  wt.StartExecTime;
         this .ErrorCallback  =  wt.ErrorCallback;
         this .SuccessCallback  =  wt.SuccessCallback;
         return   this ;
    }
     public   void  Start(WorkerThread wt)
    {
         this .Change(wt);
         this .Thread  =   new  Thread( new  ThreadStart( this .Exec));
         this .Thread.Priority  =  ThreadPriority.Lowest;
    }

     // public void Start(WaitCallback callback,Object state)
     // {
     //     this.WaitCallback = callback;
     //     this.State = state;
     //     if(this.Thread==null){
     //         this.Thread = new Thread(new ThreadStart(this.Exec));
     //         this.Thread.Priority = ThreadPriority.Lowest;
     //         this.Thread.IsBackground = true;
     //         this.Thread.Start();
     //         return;
     //     }
     //     if(this.Thread.ThreadState==ThreadState.Suspended)
     //     {
     //         this.Thread.Resume();
     //     }
     // }
}
}



下面是测试用例

Code
using  System;
using  System.Collections.Generic;
using  System.Linq;
using  System.Text;
using  System.Threading;
using  ThreadPool;

namespace  ThreadPoolTest
{
     class  Class6
    {
         static   void  Main( string [] args)
        {
             object  state1 = " beijing " ;
             object  state2  =   " zhejiang " ;
            MyThreadPool.QueueUserWorkItem( new  WaitCallback2(ExecMethod1),state1, new  SuccCallback(SuccMethod), new  ErrCallback(ErrMethod));
            MyThreadPool.QueueUserWorkItem( new  WaitCallback2(ExecMethod2), state2,  new  SuccCallback(SuccMethod),  new  ErrCallback(ErrMethod));
            Console.ReadLine();
        }

         private   static   object  ExecMethod2( object  state)
        {
             while ( true )
            {
                Thread.Sleep( 1500 );
                Console.WriteLine( " welcome to "   +  state);                
            }
             return   null ;
        }

         private   static   void  ErrMethod( object  state)
        {
            Console.WriteLine( " ErrMethod " );
            Console.WriteLine( " transform params:{0} " ,state);

        }

         private   static   void  SuccMethod( object  state, object  result)
        {
            Console.WriteLine( " SuccMethod " );
            Console.WriteLine( " hey erick,you are now in {0},{1} " ,state,result);
        }

         private   static   object  ExecMethod1( object  state)
        {
            Console.WriteLine( " ExecMethod1 " );
             return   " welcome to "   +  state;
        }
    }
}

目前这个线程池还有这么三个问题,

1.初始时是没有线程的,当加入任务时根据需要会逐步增加到最大线程,可是之后即使线程空闲,也不会将闲置的线程不abort掉,目前还没想到该怎么解决请高手支招啊,

2.代码还待严重重构,目前先解决功能性的问题先。

3.测试不够严谨

 请听我下回分解

PS:大家只想用一般性的线程池功能的话,推荐使用 smartthreadpool  

查看更多关于自制线程池2(限制最长执行时间、回调函数)的详细内容...

  阅读:37次