Simulate the Task mechanism in .net: Exploring the mysteries of asynchronous programming
Using Task in .net makes it easy to write asynchronous programs. In order to better understand Task and its scheduling mechanism, let’s simulate the implementation of Task in order to understand:
- What is Task
- How Tasks are scheduled
Basic Task simulation implementation
Start with the most basic Task usage
Task.Run(Action action)
The function of this command is to submit the action as a task to the scheduler, and the scheduler will arrange idle threads to process it.
We use Job to simulate Task
public class Job
{
private readonly Action _work;
public Job(Action work) => _work = work;
public JobStatus Status { get; internal set; }
internal protected virtual void Invoke()
{
Status = JobStatus.Running;
_work();
Status = JobStatus.Completed;
}
public void Start(JobScheduler? scheduler = null)
=> (scheduler ?? JobScheduler.Current).QueueJob(this);
public static Job Run(Action work)
{
var job = new Job(work);
job.Start();
return job;
}
}
public enum JobStatus
{
Created,
Scheduled,
Running,
Completed
}
The same static Run method as Task is also defined here, and its usage is similar to Task
Job.Run(() => Console.WriteLine($"Job1, thread:{Thread.CurrentThread.ManagedThreadId}"));
For comparison, the writing method when using Task is as follows, with the addition of the await keyword, which will be discussed later.
await Task.Run(()=>() => Console.WriteLine($"Task1, thread:{Thread.CurrentThread.ManagedThreadId}"));
When the Job.Run method is called, a Job will be created based on the given Action, and then job.Start() will be executed. However, the Job does not start execution immediately. Instead, it is submitted to the scheduler through the QueueJob method, and the scheduler Determine when the Job is executed. When the Job is actually executed, its Invoke method will be called. At this time, the given Action will be executed, and the status of the Job will be modified accordingly, from Running to Completed. To put it simply, the basic working process of .net Task is the same as this rough Job. It can be seen that Task/Job represents an operation with a certain status.
Scheduling based on thread pool
But the execution dependence of Task/Job and the scheduler are simulated here using JobScheduler. .net uses the thread pool-based scheduling strategy by default. We also simulate and implement a ThreadPoolJobScheduler
First, let’s take a look at JobScheduler. As an abstract base class, its QueueJob method will be implemented by a specific scheduler (ThreadPoolJobScheduler):
public abstract class JobScheduler
{
public abstract void QueueJob(Job job);
public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler();
}
The QueueJob implemented by ThreadPoolJobScheduler is as follows:
public class ThreadPoolJobScheduler : JobScheduler
{
public override void QueueJob(Job job)
{
job.Status = JobStatus.Scheduled;
var executionContext = ExecutionContext.Capture();
ThreadPool.QueueUserWorkItem(_ => ExecutionContext.Run(executionContext!,
_ => job.Invoke(), null));
}
}
ThreadPoolJobScheduler will submit the job to the thread pool and set the job status to Scheduled.
Use the specified thread for scheduling
The Current attribute of JobScheduler is set to thread-based scheduling by default. If there is another scheduler, it can be replaced, but why should it be replaced? This starts with the limitations of thread-based scheduling. For some tasks with higher priority, adopting this strategy may not be able to meet the needs. For example, when threads are busy, new tasks may not be executed for a long time. For this situation, .net can solve it by setting TaskCreationOptions.LongRunning. For analysis, first use a custom scheduler to solve this problem:
public class DedicatedThreadJobScheduler : JobScheduler
{
private readonly BlockingCollection _queues=new();
private readonly Thread[] _threads;
public DedicatedThreadJobScheduler(int threadCount)
{
_threads=new Thread[threadCount];
for(int index=0; indexthread.Start());
void Invoke(object? state){
while(true){
_queues.Take().Invoke();
}
}
}
public override void QueueJob(Job job)
{
_queues.Add(job);
}
}
When starting the DedicatedThreadJobScheduler, a specified number of threads will be started, and these threads will continuously take out tasks from the queue and execute them.
Next, let’s take a look at how to use .net’s TaskCreationOptions.LongRunning:
await Task.Factory.StartNew(LongRunningMethod, TaskCreationOptions.LongRunning);
static void LongRunningMethod()
{
// Simulate a long-running operation
Console.WriteLine("Long-running task started on thread {0}.", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10000);
Console.WriteLine("Long-running task finished on thread {0}.", Thread.CurrentThread.ManagedThreadId);
}
Arrangement of task sequence
When using Task, the await keyword is often used to control the order between multiple asynchronous tasks. Await is actually syntactic sugar. Before understanding await, let’s take a look at the most basic ContinueWith method.
var taskA = Task.Run(() => DateTime.Now);
var taskB = taskA.ContinueWith(time => Console.WriteLine(time.Result));
await taskB;
Imitating Task, we also add the ContinueWith method to the Job.
public class Job
{
private readonly Action _work;
private Job? _continue;
public Job(Action work) => _work = work;
public JobStatus Status { get; internal set; }
internal protected virtual void Invoke()
{
Status = JobStatus.Running;
_work();
Status = JobStatus.Completed;
_continue?.Start();
}
public void Start(JobScheduler? scheduler = null)
=> (scheduler ?? JobScheduler.Current).QueueJob(this);
public static Job Run(Action work)
{
var job = new Job(work);
job.Start();
return job;
}
public Job ContinueWith(Action tobeContinued)
{
if (_continue == null)
{
var job = new Job(() => tobeContinued(this));
_continue = job;
}
else
{
_continue.ContinueWith(tobeContinued);
}
return this;
}
}
This ContinueWith method will place the next Job to be executed in _continue, so that multiple sequentially executed Jobs will form a linked list.
When the execution of the Invoke method of the current Job ends, the next Job will be triggered to be scheduled.
Usage example:
Job.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("11");
}).ContinueWith(_ =>
{
Thread.Sleep(1000);
Console.WriteLine("12");
});
Further use of await keyword to control
To use await like Task, the Job needs to support the GetAwaiter method. For any type, as long as it has the GetAwaiter method, you can use the await keyword on it.
GetAwaiter
can be found in the Task class of c#
public TaskAwaiter GetAwaiter();
Then TaskAwaiter inherits the ICriticalNotifyCompletion interface
public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion
Imit the cat and the tiger, and also add the simplest JobAwaiter to the Job
public class Job
{
...
public JobAwaiter GetAwaiter() => new(this);
}
JobAwaiter is defined as follows:
public struct JobAwaiter : ICriticalNotifyCompletion
{
private readonly Job _job;
public readonly bool IsCompleted => _job.Status == JobStatus.Completed;
public JobAwaiter(Job job)
{
_job = job;
if (job.Status == JobStatus.Created)
{
job.Start();
}
}
public void GetResult() { }
public void OnCompleted(Action continuation)
{
_job.ContinueWith(_ => continuation());
}
public void UnsafeOnCompleted(Action continuation)
=> OnCompleted(continuation);
}
After adding await, the previous code can also be written like this:
await F1();
await F2();
static Job F1() => new Job(() =>
{
Thread.Sleep(1000);
Console.WriteLine("11");
});
static Job F2() => new Job(() =>
{
Thread.Sleep(1000);
Console.WriteLine("12");
});
Summary
Looking back at the two questions at the beginning, you can now try to give answers.
- What is Task? Task is a stateful operation (Created, Scheduled, Running, Completed). It is an abstraction of time-consuming operations. Just like a task in reality, its execution takes a relatively long time. time, it also has the basic processes of Created, Scheduled, Running and Completed. Of course, the results need to be obtained to complete the task. The Job here is relatively simple and does not simulate specific results;
- How Tasks are scheduled, thread pool-based scheduling is adopted by default. That is, after a Task is created, it is executed by an idle thread in the thread pool. Developers do not need to care about when and which thread executes it. In the specific implementation process,
However, due to the limitations of the .net global thread pool, when some special scenarios cannot be met (for example, a task needs to be executed immediately), the scheduling behavior can be changed through TaskCreationOptions;
In addition, await is syntactic sugar. The implementation behind it is based on GetAwaiter, which returns the implementation of the ICriticalNotifyCompletion interface and encapsulates ContinueWith.