8天玩转并行开发——第三天 plinq的使用
原文?8天玩轉(zhuǎn)并行開發(fā)——第三天 plinq的使用
?相信在.net平臺下,我們都玩過linq,是的,linq讓我們的程序簡潔優(yōu)美,簡直玩的是愛不釋手,但是傳統(tǒng)的linq只是串行代碼,在并行的
年代如果linq不支持并行計算那該是多么遺憾的事情啊。
? ?當(dāng)然linq有很多種方式,比如linq to sql ,xml,object 等等,如果要將linq做成并行還是很簡單的,這里我就舉一個比較實際一點的例子,
我們知道為了更快的響應(yīng)用戶操作,碼農(nóng)們想盡了各種辦法,絞盡了腦汁,其中有一個辦法就是將數(shù)據(jù)庫數(shù)據(jù)預(yù)加載到內(nèi)存中,然后通過各種
數(shù)據(jù)結(jié)構(gòu)的手段來加速CURD,是的,比如一個排序地球人只能做到N(lgN),那么如果我還想再快一點的話該怎么辦呢?那么現(xiàn)在的并行就能發(fā)
揮巨大的優(yōu)勢,尤其是現(xiàn)在的服務(wù)器配置都是在8個硬件線程的情況下,你簡直會狂笑好幾天啊,好,不亂扯了。
?
1:AsParallel(并行化)
下面我們模擬給ConcurrentDictionary灌入1500w條記錄,看看串行和并行效率上的差異,注意我的老爺機是2個硬件線程。
1 using System;2 using System.Threading;
3 using System.Threading.Tasks;
4 using System.Diagnostics;
5 using System.Collections.Concurrent;
6 using System.Collections.Generic;
7
8 using System.Linq;
9
10 class Program
11 {
12 static void Main(string[] args)
13 {
14 var dic = LoadData();
15
16 Stopwatch watch = new Stopwatch();
17
18 watch.Start();
19
20 //串行執(zhí)行
21 var query1 = (from n in dic.Values
22 where n.Age > 20 && n.Age < 25
23 select n).ToList();
24
25 watch.Stop();
26
27 Console.WriteLine("串行計算耗費時間:{0}", watch.ElapsedMilliseconds);
28
29 watch.Restart();
30
31 var query2 = (from n in dic.Values.AsParallel()
32 where n.Age > 20 && n.Age < 25
33 select n).ToList();
34
35 watch.Stop();
36
37 Console.WriteLine("并行計算耗費時間:{0}", watch.ElapsedMilliseconds);
38
39 Console.Read();
40 }
41
42 public static ConcurrentDictionary<int, Student> LoadData()
43 {
44 ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
45
46 //預(yù)加載1500w條記錄
47 Parallel.For(0, 15000000, (i) =>
48 {
49 var single = new Student()
50 {
51 ID = i,
52 Name = "hxc" + i,
53 Age = i % 151,
54 CreateTime = DateTime.Now.AddSeconds(i)
55 };
56 dic.TryAdd(i, single);
57 });
58
59 return dic;
60 }
61
62 public class Student
63 {
64 public int ID { get; set; }
65
66 public string Name { get; set; }
67
68 public int Age { get; set; }
69
70 public DateTime CreateTime { get; set; }
71 }
72 }
執(zhí)行的結(jié)果還是比較震撼的,將近7倍,這是因為plinq的查詢引擎會盡量利用cpu的所有硬件線程。
?
2:常用方法的使用
<1> orderby?
? ? ? 有時候我們并不是簡單的select一下就ok了,可能需要將結(jié)果進(jìn)行orderby操作,并行化引擎會把要遍歷的數(shù)據(jù)分區(qū),然后在每個區(qū)上進(jìn)行
orderby操作,最后來一個總的orderby,這里很像算法中的“歸并排序”。
1 using System;2 using System.Threading;
3 using System.Threading.Tasks;
4 using System.Diagnostics;
5 using System.Collections.Concurrent;
6 using System.Collections.Generic;
7
8 using System.Linq;
9
10 class Program
11 {
12 static void Main(string[] args)
13 {
14 var dic = LoadData();
15
16 var query1 = (from n in dic.Values.AsParallel()
17 where n.Age > 20 && n.Age < 25
18 select n).ToList();
19
20
21 Console.WriteLine("默認(rèn)的時間排序如下:");
22 query1.Take(10).ToList().ForEach((i) =>
23 {
24 Console.WriteLine(i.CreateTime);
25 });
26
27 var query2 = (from n in dic.Values.AsParallel()
28 where n.Age > 20 && n.Age < 25
29 orderby n.CreateTime descending
30 select n).ToList();
31
32 Console.WriteLine("排序后的時間排序如下:");
33 query2.Take(10).ToList().ForEach((i) =>
34 {
35 Console.WriteLine(i.CreateTime);
36 });
37
38 Console.Read();
39 }
40
41 public static ConcurrentDictionary<int, Student> LoadData()
42 {
43 ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
44
45 //預(yù)加載1500w條記錄
46 Parallel.For(0, 15000000, (i) =>
47 {
48 var single = new Student()
49 {
50 ID = i,
51 Name = "hxc" + i,
52 Age = i % 151,
53 CreateTime = DateTime.Now.AddSeconds(i)
54 };
55 dic.TryAdd(i, single);
56 });
57
58 return dic;
59 }
60
61 public class Student
62 {
63 public int ID { get; set; }
64
65 public string Name { get; set; }
66
67 public int Age { get; set; }
68
69 public DateTime CreateTime { get; set; }
70 }
71 }
?
<2> sum(),average()等等這些聚合函數(shù)的效果跟orderby類型一樣,都是實現(xiàn)了類型歸并排序的效果,這里就不舉例子了。
?
3:指定并行度,這個我在前面文章也說過,為了不讓并行計算占用全部的硬件線程,或許可能要留一個線程做其他事情。
1 var query2 = (from n in dic.Values.AsParallel()2 .WithDegreeOfParallelism(Environment.ProcessorCount - 1)
3 where n.Age > 20 && n.Age < 25
4 orderby n.CreateTime descending
5 select n).ToList();
?
4: 了解ParallelEnumerable類
? ?首先這個類是Enumerable的并行版本,提供了很多用于查詢實現(xiàn)的一組方法,截個圖,大家看看是不是很熟悉,要記住,他們都是并行的。
下面列舉幾個簡單的例子。
1 class Program2 {
3 static void Main(string[] args)
4 {
5 ConcurrentBag<int> bag = new ConcurrentBag<int>();
6
7 var list = ParallelEnumerable.Range(0, 10000);
8
9 list.ForAll((i) =>
10 {
11 bag.Add(i);
12 });
13
14 Console.WriteLine("bag集合中元素個數(shù)有:{0}", bag.Count);
15
16 Console.WriteLine("list集合中元素個數(shù)總和為:{0}", list.Sum());
17
18 Console.WriteLine("list集合中元素最大值為:{0}", list.Max());
19
20 Console.WriteLine("list集合中元素第一個元素為:{0}", list.FirstOrDefault());
21
22 Console.Read();
23 }
24 }
?
5: plinq實現(xiàn)MapReduce算法
? mapReduce是一個非常流行的編程模型,用于大規(guī)模數(shù)據(jù)集的并行計算,非常的牛X啊,記得mongodb中就用到了這個玩意。
map: ?也就是“映射”操作,可以為每一個數(shù)據(jù)項建立一個鍵值對,映射完后會形成一個鍵值對的集合。
reduce:“化簡”操作,我們對這些巨大的“鍵值對集合“進(jìn)行分組,統(tǒng)計等等。
具體大家可以看看百科:http://baike.baidu.com/view/2902.htm
?
下面我舉個例子,用Mapreduce來實現(xiàn)一個對age的分組統(tǒng)計。
using System;using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
class Program
{
static void Main(string[] args)
{
List<Student> list = new List<Student>()
{
new Student(){ ID=1, Name="jack", Age=20},
new Student(){ ID=1, Name="mary", Age=25},
new Student(){ ID=1, Name="joe", Age=29},
new Student(){ ID=1, Name="Aaron", Age=25},
};
//這里我們會對age建立一組鍵值對
var map = list.AsParallel().ToLookup(i => i.Age, count => 1);
//化簡統(tǒng)計
var reduce = from IGrouping<int, int> singleMap
in map.AsParallel()
select new
{
Age = singleMap.Key,
Count = singleMap.Count()
};
///最后遍歷
reduce.ForAll(i =>
{
Console.WriteLine("當(dāng)前Age={0}的人數(shù)有:{1}人", i.Age, i.Count);
});
}
public class Student
{
public int ID { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public DateTime CreateTime { get; set; }
}
}
?
posted on 2014-02-10 21:03 NET未來之路 閱讀(...) 評論(...) 編輯 收藏轉(zhuǎn)載于:https://www.cnblogs.com/lonelyxmas/p/3543517.html
總結(jié)
以上是生活随笔為你收集整理的8天玩转并行开发——第三天 plinq的使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .Net高级技术——对象序列化
- 下一篇: HCL实验四