线程

image-20241130220945830

创建

1
Thread thread1 = new Thread(Methode);

开始

1
thread1.Start();

等待完成

1
thread1.Join();

优先级

1
thread1.Priority=ThreadPriority.Highest;//优先级和线程调度器时间片都会影响顺序

名字

1
thread1.Name="001";

当前线程优先级

1
Thread.CurrentThread.Priority=ThreadPriority.Normal;

1
lock (counterLock) { counter += 1; }

监视器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//尝试进入临界区
if (Monitor.TryEnter(ticketslock, 2000))
{
try
{
......
}
finally
{
Monitor.Exit(ticketslock);
}
}
else
{//超时,未进入临界区
......
}

互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//可在进程内,也跨进程,filePath:共享的互斥体
using (var mutex = new Mutex(false, $"GlobalFileMutex:{filePath}"))
{
for (int i = 0; i < 10000; i++)
{
mutex.WaitOne();//获取互斥体
try
{
......
}
finally
{
mutex.ReleaseMutex();
}
}
}

读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
bool lockAcquired = false;
try
{

_lock.EnterWriteLock();//读
lockAcquired = true;//确保获取了锁
......
}
finally
{
if (lockAcquired)

_lock.ExitWriteLock();
}
-------------------------------------------------------------------------------------------------------------------
bool lockAcquired = false;
try
{
_lock.EnterReadLock();//写
lockAcquired = true;
}
finally
{
if (lockAcquired)
_lock.ExitReadLock();

}

信号量

1
using SemaphoreSlim semaphore = new SemaphoreSlim(initialCount: 3, maxCount: 3);//设置线程数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
semaphore.Wait();//等待信号量,数量-1,等待和释放不必在同一个线程中
Thread thread = new Thread(() => ProcessBooking(input));
thread.Start();

void ProcessBooking(string? input)
{
try
{
......
}
finally
{
semaphore.Release();//释放信号量
}
}

自动重置事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
using AutoResetEvent autoResetEvent = new AutoResetEvent(false);//初始信号false
//消费者线程
//autoResetEvent.WaitOne();
//
//生产者线程
//autoResetEvent.Set();
while (true)
{
userInput = Console.ReadLine()??"";
if (userInput=="go")
{
autoResetEvent.Set();
}
}

void Worker(){
while (true)
{
Console.WriteLine($"worker{Thread.CurrentThread.Name} thread is waiting for the signal");
autoResetEvent.WaitOne();
Console.WriteLine($"worker{Thread.CurrentThread.Name} thread proceeds");
Thread.Sleep(2000);
}
}

手动重置事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
using ManualResetEventSlim manualResetEvent = new ManualResetEventSlim(false);
while (true)
{
userInput = Console.ReadLine() ?? "";
if (userInput == "wt")
{
manualResetEvent.Reset();
}
if (userInput == "go")
{
manualResetEvent.Set();
}
}

void Work()
{
while (true)
{
if (userInput == "wt")
Console.WriteLine($"{Thread.CurrentThread.Name} is waiting for the signal...");
manualResetEvent.Wait();
Console.WriteLine($"{Thread.CurrentThread.Name} has Working.");
Thread.Sleep(1000);
}
}

双向信号传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

Queue<int> queue = new Queue<int>();

ManualResetEventSlim consumeEvent = new ManualResetEventSlim(false);
ManualResetEventSlim prodeceEvent = new ManualResetEventSlim(true);
int consumeCounter = 0;
object lockConsumerCounter = new object();
Thread[] consumerThreads = new Thread[3];

for (int i = 0; i < 3; i++)
{
consumerThreads[i] = new Thread(Consume);
consumerThreads[i].Name = $"Consumer{i + 1}";
consumerThreads[i].Start();
}
//生产者
while (true)
{
prodeceEvent.Wait();
prodeceEvent.Reset();
Console.WriteLine("To produce , enter 'p'");
var input = Console.ReadLine() ?? "";
if (input.ToLower() == "p")
{
for (int i = 1; i <= 10; i++)
{
queue.Enqueue(i);
Console.WriteLine($"Produced:{i}");
}
consumeEvent.Set();
}
}
//消费者行为
void Consume()
{
while (true)
{


int i = 0;
int j = 0;
consumeEvent.Wait();
while (queue.TryDequeue(out int item))
{
i++;//当前消费者消耗产品的总数
Thread.Sleep(500);
Console.WriteLine($"Consume:{item} product, from thead:{Thread.CurrentThread.Name}");
}
j++;
lock (lockConsumerCounter)
{//当前消费者在第一次消费完所有产品时,进入
if (j == 1 && i != 0)
{
Console.WriteLine($"{Thread.CurrentThread.Name} +{i}");
consumeCounter++;
//如果三个消费者把所有产品消耗完了
if (consumeCounter == 3)
{
consumeEvent.Reset();
prodeceEvent.Set();
consumeCounter = 0;
Console.WriteLine("----------------------------");
}

}
}
}
}

死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
object userLock=new object();
object orderLock=new object();

Thread thread=new Thread(ManageOrder);
thread.Start();

ManageUesr();

thread.Join();

Console.WriteLine("Program finished. press any key to exit.");
void ManageUesr()
{
lock (userLock)
{
Console.WriteLine("User Management acuired the user lock");
Thread.Sleep(2000);
lock (orderLock)
{
Console.WriteLine("User Management acquired the order lock");
}
}

}

void ManageOrder()
{
lock (orderLock)
{
Console.WriteLine("Order Management acuired the order lock");
Thread.Sleep(1000);
lock (userLock)
{
Console.WriteLine("Order Management acquired the user lock");
}
}
}

线程的状态

1
thread1.ThreadState

线程池

1
2
ThreadPool.QueueUserWorkItem(ProcessBooking,input);
Thread.CurrentThread.IsThreadPoolThread;//是否是线程池线程

线程中的异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
var exceptions = new List<Exception>();

object lockexceptions = new object();
Thread thread1 = new Thread(Work);
Thread thread2 = new Thread(Work);
thread1.Start();
thread2.Start();

thread1.Join();
thread2.Join();

foreach (var item in exceptions)
{
Console.WriteLine( item.Message);
}
void Work(object? obj)
{
try
{
throw new InvalidOperationException("An error occurred in this worker thread. This is expected.");
}
catch (Exception ex)
{
lock (lockexceptions)
exceptions.Add(ex);
}

}

任务

1
2
3
await Task.Run(() => Console.WriteLine("888"));
var tesk = Task.Run(() => Console.WriteLine("666"));
Console.WriteLine("777");

从任务返回结果

1
2
3
4
5
var tasks = Task.Run(() =>
{
......
return ......
});

Wait,WaitAll,Result

1
2
3
task.Wait();//等待任务完成
//Task.WaitAll(task,task1);//等待多个任务完成
//var result=task.Result;//阻塞线程,返回结果

ContinueWith

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
using System.Text.Json;

using var client = new HttpClient();
var task = client.GetStringAsync("https://pokeapi.co/api/v2/pokemon");
task.ContinueWith(t =>
{
var result = t.Result;
var doc=JsonDocument.Parse(result);
JsonElement root=doc.RootElement;
JsonElement results = root.GetProperty("results");
JsonElement firstPokemon = results[0];
Console.WriteLine($"{firstPokemon.GetProperty("name")},{firstPokemon.GetProperty("url")}");
});
Console.WriteLine("666");
Console.ReadLine();

WhenAll、Any

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//Console.WriteLine((tasks.Sum(t => t.Result)));//会阻塞主线程

//全部任务完成时
Task.WhenAll(tasks)
.ContinueWith(t =>
{
Console.WriteLine($"{t.Result.Sum()}");
});
//当有任务完成时
//Task.WhenAny(tasks)
// .ContinueWith(t =>
// {
// Console.WriteLine($"{t.Result.Result}");
// });

延续链与解包

image-20241202105506144

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
using System.Text.Json;

using var client = new System.Net.Http.HttpClient();
var taskListJson = client.GetStringAsync("https://pokeapi.co/api/v2/pokemon");
var taskGetfirstUrl = taskListJson.ContinueWith(t =>
{//得到上一个task的结果
var result = t.Result;
var doc = JsonDocument.Parse(result);
JsonElement root = doc.RootElement;
JsonElement results = root.GetProperty("results");
JsonElement firstPokemon = results[0];
return firstPokemon.GetProperty("url").ToString();
});
var taskGetDetail = taskGetfirstUrl.ContinueWith(t =>
{
var result = t.Result;
return client.GetStringAsync(result);
}).Unwrap();//解包
taskGetDetail.ContinueWith(t =>
{
var result = t.Result;
var doc = JsonDocument.Parse(result);
JsonElement root = doc.RootElement;
var results1 = root.GetProperty("name").ToString();
var results2 = root.GetProperty("weight").ToString();
var results3 = root.GetProperty("height").ToString();
Console.WriteLine($"name:{results1}\nweight:{results2}\nheight:{results3}");
});
Console.WriteLine("666");
Console.ReadLine();

任务中的异常处理

1
2
3
4
5
6
7
8
9
var task1 = client.GetStringAsync("https://pokeapi123.co/api/v2/pokemon");

if (task1.IsFaulted && task1.Exception != null) //抛出异常
{
foreach (var item in task1.Exception.InnerExceptions)
{
Console.WriteLine(item.Message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
var tasks = new[]
{
Task.Run(() =>
{
throw new InvalidOperationException("Invalid Operation");
}),
Task.Run(() =>
{
throw new ArgumentNullException("Argument null");
}),
Task.Run(() =>
{
throw new Exception("General exception");
}),
};

//var t=Task.WhenAll(tasks);
//t.Wait();
//var tesult=t.Result;//用result和wait都可以抛出异常

Task.WhenAll(tasks)
.ContinueWith(t =>
{
if (t.IsFaulted&&t.Exception!=null)
{
foreach (var item in t.Exception.InnerExceptions)
{
Console.WriteLine(item.Message);
}
}
});
Console.WriteLine("Press enter key to exit");
Console.ReadLine();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
using System.Text.Json;

using var client = new System.Net.Http.HttpClient();
var task1 = client.GetStringAsync("https://pokeapi123.co/api/v2/pokemon");
var task2 = task1.ContinueWith(
t =>
{
var result = t.Result;
var doc = JsonDocument.Parse(result);
JsonElement root = doc.RootElement;
JsonElement results = root.GetProperty("results");
JsonElement firstPokemon = results[0];
Console.WriteLine($"{firstPokemon.GetProperty("name")},{firstPokemon.GetProperty("url")}");
},
TaskContinuationOptions.NotOnFaulted//添加执行条件,当前一个任务无异常时执行
);

Console.WriteLine("666");
Console.ReadLine();

任务取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
using var cts=new CancellationTokenSource();
//cts.CancelAfter(1000);//超时,1秒后自动取消
var token=cts.Token;
var task=Task.Run (Work,token);


Console.WriteLine("To cancel,press 'c'");
var input = Console.ReadLine();
if (input == "c")
{
cts.Cancel();
}
task.Wait();
Console.WriteLine($"{task.Status}");
Console.ReadLine();

void Work()
{
Console.WriteLine("Started dong the work");
for (int i = 0; i < 100000; i++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine($"User requested cancellation at iteration:{i}");

//throw new OperationCanceledException();
//token.ThrowIfCancellationRequested();
break;
}
Thread.SpinWait(300000);
}
Console.WriteLine("Work is done.");
}

异步

异步与Await概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
using System.Text.Json;

output();

Console.WriteLine("666");
Console.ReadLine();

async void output()
{
using var client = new System.Net.Http.HttpClient();
var taskGetPokemonList = client.GetStringAsync("https://pokeapi.co/api/v2/pokemon");
var response = await taskGetPokemonList;


var doc = JsonDocument.Parse(response);
JsonElement root = doc.RootElement;
JsonElement results = root.GetProperty("results");
JsonElement firstPokemon = results[0];
Console.WriteLine($"{firstPokemon.GetProperty("name")},{firstPokemon.GetProperty("url")}");

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
namespace 异步与Await基本语法
{
internal class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("starting to do work");
var data= await WorkAsync();
Console.WriteLine($"{data}");
Console.WriteLine("press enter to exit");
Console.ReadLine();
}
static async Task<string> WorkAsync()
{
await Task.Delay(2000);
Console.WriteLine("Work is done");
return "666";
}
}
}

Await前后使用的是哪个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
namespace 异步与Await基本语法
{
internal class Program
{
static async Task Main(string[] args)//main函数使用了await,要把void main 改为task main
{
Console.WriteLine($"Main thread id:{Thread.CurrentThread.ManagedThreadId}");//1
Console.WriteLine("starting to do work");
// var data = await WorkAsync();
await WorkAsync();

//Console.WriteLine($"{data}");
Console.WriteLine($"data thread id:{Thread.CurrentThread.ManagedThreadId}");//x
Console.WriteLine("press enter to exit");
Console.ReadLine();
}
static async Task<string> WorkAsync()
{
Console.WriteLine($"task before thread id:{Thread.CurrentThread.ManagedThreadId}");//1
await Task.Delay(2000);
Console.WriteLine("Work is done");
Console.WriteLine($"task after id:{Thread.CurrentThread.ManagedThreadId}");//x
return "666";
}
}
}

返回值后的延续,改自“延续链解包”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
using System.Text.Json;

using var client = new System.Net.Http.HttpClient();
getInfoAsync();
Console.WriteLine("666");
Console.ReadLine();


async Task getInfoAsync()
{
Console.WriteLine("Working...");
var taskListJson = await client.GetStringAsync("https://pokeapi.co/api/v2/pokemon");
//得到第一个pokemon的url
Console.WriteLine("Working...");
var doc = JsonDocument.Parse(taskListJson);
JsonElement root = doc.RootElement;
JsonElement results = root.GetProperty("results");
JsonElement firstPokemon = results[0];
var url = firstPokemon.GetProperty("url").ToString();
Console.WriteLine("Working...");
var GetDetail = await client.GetStringAsync(url);
doc = JsonDocument.Parse(GetDetail);
root = doc.RootElement;
var results1 = root.GetProperty("name").ToString();
var results2 = root.GetProperty("weight").ToString();
var results3 = root.GetProperty("height").ToString();
Console.WriteLine($"name:{results1}\nweight:{results2}\nheight:{results3}");
}

异步方法中的异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var tasks = new[]
{
Task.Run(() =>
{
throw new InvalidOperationException("Invalid Operation");

}),
Task.Run(() =>
{
throw new ArgumentNullException("Argument null");
}),
Task.Run(() =>
{
throw new Exception("General exception");
}),
};

await Task.WhenAll(tasks);//抛出一个

Console.WriteLine("Press enter key to exit");
Console.ReadLine();

Await和同步上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private async void button1_Click(object sender, EventArgs e)
{
//ShowMessage(2, 3000);//这个会阻塞ui线程,会在一瞬间显示最终结果
await ShowMessage(2, 3000);
}


private async void button2_Click(object sender, EventArgs e)
{
// ShowMessage(2, 2000);
await ShowMessage(2, 2000);
}
private async Task ShowMessage(int v1, int v2)
{
object lk = new object();
//Thread.Sleep(v2);
await Task.Delay(v2);
lock (lk)
i *= v1;
label1.Text = i.ToString();
}

并行循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

int num = 10;
int[] array = new int[num];
for (int i = 0; i < num; i++)
{
array[i] = i + 1;
}
int sum = 0;
object locksum = new Object();
var st = DateTime.Now;

//并行循环for、foreach、invoke都会阻塞线程
Parallel.For(0, array.Length, i =>
{
lock (locksum)
{
sum += array[i];
Console.WriteLine($"{Task.CurrentId}\t{Thread.CurrentThread.IsThreadPoolThread}");
}
});

var et = DateTime.Now;
Console.WriteLine($"{sum}\r\n{(et - st).TotalMilliseconds}");
Console.ReadLine();
Parallel.Invoke(() =>
{
Console.WriteLine("I am one");
},
() =>
{
Console.WriteLine("I am two");
},
() =>
{
Console.WriteLine("I am three");
});


Console.ReadLine();

并行循环异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

int num = 100;
int[] array = new int[num];
for (int i = 0; i < num; i++)
{
array[i] = i + 1;
}
int sum = 0;
object locksum = new Object();
var st = DateTime.Now;
try
{

//添加state参数
Parallel.For(0, array.Length, (i, state) =>
{
lock (locksum)
{//无异常,则执行
if (!state.IsExceptional)
{
if (i == 65)
{
throw new InvalidOperationException("666");
}
sum += array[i];
Console.WriteLine($"{Task.CurrentId}{Thread.CurrentThread.IsThreadPoolThread}");
}
}
});
}
catch (Exception ex)
{
//捕获聚合异常
ex.Dump();
}

var et = DateTime.Now;
Console.WriteLine($"{sum}\r\n{(et - st).TotalMilliseconds}");
Console.ReadLine();


并行循环Stop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//添加state参数
Parallel.For(0, array.Length, (i, state) =>
{
lock (locksum)
{
if (!state.IsStopped)
{
if (i == 65)
{
state.Stop();
}
sum += array[i];
Console.WriteLine($"{Task.CurrentId}{Thread.CurrentThread.IsThreadPoolThread}");
}
}
});

并行循环Break

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Parallel.For(0, array.Length, (i, state) =>
{
lock (locksum)
{//
if (state.ShouldExitCurrentIteration && state.LowestBreakIteration < i)

return;

if (i == 65)
{
state.Break();
}
sum += array[i];
Console.WriteLine($"{Task.CurrentId}{Thread.CurrentThread.IsThreadPoolThread}");
}
});

并行循环Result

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

int num = 100;
int[] array = new int[num];
for (int i = 0; i < num; i++)
{
array[i] = i + 1;
}
int sum = 0;
object locksum = new Object();
ParallelLoopResult result;
var st = DateTime.Now;
try
{
result = Parallel.For(0, array.Length, (i, state) =>
{
lock (locksum)
{
if (state.ShouldExitCurrentIteration && state.LowestBreakIteration < i)
return;

if (i == 65)
{
state.Break();
//throw new InvalidOperationException();
}
sum += array[i];
Console.WriteLine($"{Task.CurrentId}{Thread.CurrentThread.IsThreadPoolThread}");
}
});result.Dump();
}
catch (Exception ex)
{
ex.Dump();
}

var et = DateTime.Now;
Console.WriteLine($"{sum}\r\n{(et - st).TotalMilliseconds}");
Console.ReadLine();


并行循环取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

using var cts = new CancellationTokenSource();
//cts.CancelAfter(1000);//超时,1秒后自动取消
var token = cts.Token;
var task = Task.Run(Work, token);


Console.WriteLine("To cancel,press 'c'");
var input = Console.ReadLine();
if (input == "c")
{
cts.Cancel();
}
task.Wait();
Console.WriteLine($"{task.Status}");
Console.ReadLine();

void Work()
{
try
{
Console.WriteLine("Started dong the work");
var options = new ParallelOptions { CancellationToken = cts.Token };
//添加并行配置
Parallel.For(0, 1000, options, i
=>
{
Console.WriteLine($"{DateTime.Now}");
Thread.SpinWait(300000000);
});
}
catch (Exception ex)
{
ex.Dump();
}
}

并行循环线程存储变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
var startTime = DateTime.Now;

int[] array = System.Linq.Enumerable.Range(0, 1000000000).ToArray();
object lockSum = new Object();
long sum = 0;
// () => 0,初始为0,tls局部存储变量
Parallel.For(
0,
array.Length,
() => 0,
(i, state, tls) =>
{
tls += array[i] + 1;
return tls;
},
tls =>
{
lock (lockSum)
{
sum += tls;
//Console.WriteLine($"{Task.CurrentId}");
}
}
);
var endTime = DateTime.Now;
Console.WriteLine($"{sum}\r\n{(endTime - startTime).TotalMilliseconds}");
Console.ReadLine();

PLINQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var items = System.Linq.Enumerable.Range(0, 200);
//并行查询
var eventnumber = items.AsParallel().AsOrdered().Where(i =>
{

Console.WriteLine($"当前数字\t{i}\t的线程id:{Thread.CurrentThread.ManagedThreadId}");
return i % 2 == 0;
});


Console.WriteLine("test......");
eventnumber.ForAll(e => Console.WriteLine($"{e}:threadID:{Thread.CurrentThread.ManagedThreadId}"));

//foreach (var element in eventnumber)
//{
// Console.WriteLine($"消费{element}:threadID:{Thread.CurrentThread.ManagedThreadId}");
//
//}
Console.ReadLine();

PLINQ缓冲模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var items = System.Linq.Enumerable.Range(0, 2000);

//并行查询,添加缓冲模式
var eventnumber = items
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Where(i =>
{
Console.WriteLine($"当前数字\t{i}\t的线程id:{Thread.CurrentThread.ManagedThreadId}");
return i % 2 == 0;
});

Console.WriteLine("test......");
//eventnumber延迟执行,foreach有合并行为,forall没有
foreach (var element in eventnumber)
{
Console.WriteLine($"消费{element}:threadID:{Thread.CurrentThread.ManagedThreadId}");
}
Console.ReadLine();

PLINQ异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
var items = System.Linq.Enumerable.Range(0, 20);


//并行查询
var eventnumber = items.AsParallel().WithMergeOptions(ParallelMergeOptions.Default).Where(i =>
{

Console.WriteLine($"当前数字{i}的线程id:{Thread.CurrentThread.ManagedThreadId}");
if (i == 5) throw new InvalidOperationException("has exception");
if (i == 19) throw new ArgumentNullException();
return i % 2 == 0;
});


Console.WriteLine("test......");
//只能在这里捕获,前面的是延迟执行的。。。
try
{

eventnumber.ForAll(e => Console.WriteLine($"{e}:threadID:{Thread.CurrentThread.ManagedThreadId}"));

}
catch (AggregateException ex)
{//捕获聚合异常中的所有异常
ex.Handle(e => { Console.WriteLine(e.Message); return true; });

}

PLINQ取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
var items = System.Linq.Enumerable.Range(0, 20);
using var cts = new CancellationTokenSource();//创建资源

//并行查询,使用令牌
var eventnumber = items
.AsParallel()
.WithCancellation(cts.Token)
.Where(i =>
{

Console.WriteLine($"当前数字{i}的线程id:{Thread.CurrentThread.ManagedThreadId}");
return i % 2 == 0;
});


Console.WriteLine("test......");
try
{

eventnumber.ForAll(e =>
{
if (e > 8)
{
cts.Cancel();//取消操作
}
Console.WriteLine($"{e}:threadID:{Thread.CurrentThread.ManagedThreadId}");
});

}
catch (OperationCanceledException ex)
{
ex.Dump();
}

并发栈

1
2
3
4
5
6
7
8
9
10
var stack=new ConcurrentStack<int>();
stack.Push(1);
stack.Push(2);
stack.Push(3);
stack.TryPop(out var result);
stack.TryPop(out var result1);
stack.TryPop(out var result2);
result.Dump();
result1.Dump();
result2.Dump();

并发队列、阻塞集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#nullable enable
ConcurrentQueue<string?> requestQueue = new ConcurrentQueue<string?>();
//一个包装器,对ConcurrentQueue进行包装
BlockingCollection<string?> collection = new BlockingCollection<string?>(requestQueue, 3);//设置限制3

Thread monitoringThread = new Thread(MonitorQueue);
monitoringThread.Start();

Console.WriteLine("Server is running ,type 'exit' to stop");
while (true)
{
string? input = Console.ReadLine();
if (input?.ToLower() == "exit")
{
collection.CompleteAdding();//停止添加
input.Dump();
break;

}
collection.Add(input);
Console.WriteLine($"Enqueued:{input};queue size:{collection.Count}");
}

void MonitorQueue()
{ //这个foreach不会自己结束
foreach (var request in collection.GetConsumingEnumerable())
{//检查是否完成
if (collection.IsCompleted)
{
break;
}
Thread processingThread = new Thread(() => ProcessInput(request));
processingThread.Start();
Thread.Sleep(2000);
}

}

void ProcessInput(string? input)
{
Thread.Sleep(2000);
(Convert.ToInt32(input)*10).Dump();
}