PLINQ: 并行LINQ在C#中的应用
.NET Framework 3.5 中引入了语言集成查询 (LINQ),它具有统一的模型,以类型安全方式查询任何 System.Collections.IEnumerable或 System.Collections.Generic.IEnumerable
并行 LINQ (PLINQ) 是 LINQ 模式的并行实现
注意:以下示例代码仅是演示用,其运行速度可能不如等效的顺序LINQ查询快
1、如何进行PLINQ查询
ParallelEnumerable类提供了一个扩展方法AsParallel()可以并行执行LINQ查询
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 }; 14 15 var result = list.AsParallel().Where(x => x > 30); 16 } 17 } 18 }
2、PLINQ查询的参数
WithDegreeOfParallelism:指定 PLINQ 应用于并行化查询的处理器的最大数量。
WithExecutionMode:指定 PLINQ 应如何并行化查询(即使是当默认行为是按顺序运行查询时)。
WithMergeOptions:提供有关 PLINQ 应如何(如果可能)将并行结果合并回使用线程上的一个序列的提示。
ParallelMergeOptions 枚举值如下:
AutoBuffered | 2 |
利用系统选定大小的输出缓冲区进行合并。 在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。 |
Default | 0 |
使用默认合并类型,即 AutoBuffered。 |
FullyBuffered | 3 |
利用整个输出缓冲区进行合并。 在向查询使用者提供任何结果之前,系统会先累计所有结果。 |
NotBuffered | 1 |
不利用输出缓冲区进行合并。 一旦计算出结果元素,就向查询使用者提供这些元素。 |
WithCancellation:指定 PLINQ 应定期监视请求取消时所提供的取消标记的状态以及取消执行。(这个操作和线程的取消操作是一致的,如果不理解线程中的取消操作,可以访问:https://docs.microsoft.com/zh-cn/dotnet/standard/threading/cancellation-in-managed-threads)
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var cts = new System.Threading.CancellationTokenSource(); 14 15 try 16 { 17 var result2 = list.AsParallel().Where(x => x > 30) 18 .WithDegreeOfParallelism(Environment.ProcessorCount) 19 .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 20 .WithMergeOptions(ParallelMergeOptions.Default) 21 .WithCancellation(cts.Token); 22 23 PrintResult(result2); 24 } 25 catch(OperationCanceledException) 26 { 27 Console.WriteLine("并行查询已被取消"); 28 } 29 } 30 31 32 static void PrintResult(IEnumerable<int> collection) 33 { 34 foreach (var item in collection) 35 { 36 Console.WriteLine(item); 37 } 38 } 39 } 40 }
3、PLINQ查询中常用的函数
AsOrdered:指定 PLINQ 应为查询的其余部分保留源序列的排序,或直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。
AsUnordered:指定保留源序列的排序不需要查询其余部分的 PLINQ。
AsSequential:指定查询的其余部分应像非并行的 LINQ 查询一样按顺序运行。
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading.Tasks;
6
7 namespace PLINQ_demo
8 {
9 class Program
10 {
11 static Func<int, bool> query => x => x > 30;
12 static void Main(string[] args)
13 {
14 var orderResult = list.AsParallel().AsOrdered().Where(query);//asc
15 //list.AsParallel().Where(query).OrderBy(x=>x);//asc
16 //list.AsParallel().Where(query).OrderByDescending(x => x);//desc
17 PrintResult(orderResult);
18
19 var unorderResult = list.AsParallel().AsUnordered().Where(query);
20 PrintResult(unorderResult);
21
22 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x));
23 Console.WriteLine("\n\n");
24
25 var linqResult = list.Where(query);
26 PrintResult(linqResult);
27
28 var pinqSequentialResult = list.AsParallel().AsSequential().Where(query);
29 PrintResult(pinqSequentialResult);
30
31 var forceExecutionMode = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).AsSequential().Where(query);
32 PrintResult(forceExecutionMode);
33 }
34
35
36 static void PrintResult(IEnumerable<int> collection)
37 {
38 foreach (var item in collection)
39 {
40 Console.WriteLine(item);
41 }
42
43 Console.WriteLine("\n\n");
44 }
45 }
46 }
4、ForAll
ForAll是一种多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回使用者线程的情况下并行处理结果。
如果并行执行查询,PLINQ 对源序列进行分区,以便多个线程能够并发处理不同部分,通常是在不同的线程中。 如果要在一个线程(例如,foreach)中使用结果,必须将每个线程的结果合并回一个序列中。 PLINQ 执行的合并类型具体视查询中的运算符而定。 例如,对结果强制施加新顺序的运算符必须缓冲所有线程中的全部元素。 从使用线程(以及应用用户)的角度来看,完全缓冲查询可能会运行很长时间,才能生成第一个结果。 默认情况下,其他运算符进行部分缓冲,并分批生成结果。 默认不缓冲的一个运算符是 ForAll。 它会立即生成所有线程中的所有元素。
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static Func<int, bool> query => x => x > 30; 12 static void Main(string[] args) 13 { 14 var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 }; 15 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 16 Console.WriteLine("\n\n"); 17 } 18 } 19 }
5、处理PLINQ查询中常用的异常
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading.Tasks;
6
7 namespace PLINQ_demo
8 {
9 class Program
10 {
11 static void Main(string[] args)
12 {
13 //普通 LINQ
14 try
15 {
16 tempList.Select(x => 100 / x).ToList();
17 }
18 catch(DivideByZeroException ex)
19 {
20 Console.WriteLine(ex.Message);
21 }
22
23 //PLINQ
24 try
25 {
26 tempList.AsParallel().Select(x => 100 / x).ToList();
27 }
28 catch(DivideByZeroException ex)
29 {
30 Console.WriteLine(ex.Message);
31 }
32 catch(AggregateException ex)
33 {
34 Console.WriteLine(ex.Message);
35 }
36
37 }
38 }
39 }
使用顺序LINQ查询:当除以0时,得到了DevideByZeroException异常
使用并行LINQ查询:当使用Asparallel()运行,除以0时,得到了AggregateException,因为现在是并行的方式运行,AggregateException将包含运行PLINQ查询期间的所有异常,可以使用Flatten和Handle方法来处理内部的DivideByZeroException类
示例如下:
1 try 2 { 3 tempList.AsParallel().Select(x => 100 / x).ToList(); 4 } 5 catch(DivideByZeroException ex) 6 { 7 Console.WriteLine(ex.Message); 8 } 9 catch(AggregateException ex) 10 { 11 var exceptions = ex.Flatten().InnerExceptions; 12 13 foreach (var item in exceptions) 14 { 15 Console.WriteLine(item); 16 } 17 }
6、数据分区
若要并行执行对数据源的操作,关键步骤之一是,将数据源分区 成多个部分,以供多个线程同时访问。 PLINQ 和任务并行库 (TPL) 提供了默认分区程序,在用户编写并行查询或 ForEach 循环时透明运行。 对于更高级的方案,可以插入自己的分区程序。
这里有点难,还需要再学习理解一下,后面再更新
下一篇: 速成LINQ指南:30分钟轻松掌握
推荐阅读
-
C# LINQ中First方法的实际应用案例
-
在C#中利用LINQ实现两个数据结构的交集操作指南
-
常见 C# 中的 LINQ 应用:Lambda 表达式使用方法
-
在Java中实现类似C# Linq的功能
-
PLINQ: 并行LINQ在C#中的应用
-
C#在AUTOCAD二次开发中的应用
-
【摩尔线程+Colossal-AI强强联手】MusaBert登上CLUE榜单TOP10:技术细节揭秘 - 技术实力:摩尔线程凭借"软硬兼备"的技术底蕴,让MusaBert得以从底层优化到顶层。其内置多功能GPU配备AI加速和并行计算模块,提供了全面的AI与科学计算支持,为AI推理和低资源条件下的大模型训练等场景带来了高效、经济且环保的算力。 - 算法层面亮点:依托Colossal-AI AI大模型开发系统,MusaBert在训练过程中展现出了卓越的并行性能与易用性,特别在预处理阶段对DataLoader进行了优化,适应低资源环境高效处理海量数据。同时,通过精细的建模优化、领域内数据增强以及Adan优化器等手段,挖掘和展示了预训练语言模型出色的语义理解潜力。基于MusaBert,摩尔线程自主研发的MusaSim通过对比学习方法微调,结合百万对标注数据,MusaSim在多个任务如语义相似度、意图识别和情绪分析中均表现出色。 - 数据资源丰富:MusaBert除了自家高质量语义相似数据外,还融合了悟道开源200GB数据、CLUE社区80GB数据,以及浪潮公司提供的1TB高质量数据,保证模型即便在较小规模下仍具备良好性能。 当前,MusaBert已成功应用于摩尔线程的智能客服与数字人项目,并广泛服务于语义相似度、情绪识别、阅读理解与声韵识别等领域。为了降低大模型开发和应用难度,MusaBert及其相关高质量模型代码已在Colossal-AI仓库开源,可快速训练优质中文BERT模型。同时,通过摩尔线程与潞晨科技的深度合作,仅需一张多功能GPU单卡便能高效训练MusaBert或更大规模的GPT2模型,显著降低预训练成本,进一步推动双方在低资源大模型训练领域的共享目标。 MusaBert荣登CLUE榜单TOP10,象征着摩尔线程与潞晨科技联合研发团队在中文预训练研究领域的领先地位。展望未来,双方将携手探索更大规模的自然语言模型研究,充分运用上游数据资源,产出更为强大的模型并开源。持续强化在摩尔线程多功能GPU上的大模型训练能力,特别是在消费级显卡等低资源环境下,致力于降低使用大模型训练的门槛与成本,推动人工智能更加普惠。而潞晨科技作为重要合作伙伴,将继续发挥关键作用。
-
在C#中理解继承与多态的实战应用
-
探讨无名内部类与lambda表达式在Java与C#中的应用
-
贪婪算法在 Python、JavaScript、Java、C++ 和 C# 中的多种实现及其在硬币变化、分数骑士、活动选择和使用哈夫曼编码的最小生成树问题中的应用实例