线程亲和性
知识点
什么是线程亲和性
线程亲和性(Thread Affinity)是指将线程绑定到特定的CPU核心或处理器上运行的技术。通过设置线程亲和性,可以控制线程在哪个CPU核心上执行,从而优化性能、减少缓存失效和提高系统的可预测性。
线程亲和性的作用
缓存优化:减少缓存失效,提高缓存命中率
NUMA优化:在NUMA架构中优化内存访问
实时性:提高实时应用的可预测性
热点避免:避免某些核心过度使用
调试优化:便于性能分析和调试
ProcessorAffinity属性
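在.NET中,可以通过Process.ProcessorAffinity属性设置整个进程的亲和性,通过System.Diagnostics.ProcessThread.ProcessorAffinity属性设置单个操作系统线程的亲和性;托管的System.Threading.Thread类本身没有ProcessorAffinity属性,因此下面的示例使用P/Invoke调用Windows API(SetThreadAffinityMask)来设置当前线程的亲和性。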
应用场景
高性能计算应用
实时系统
游戏引擎
音视频处理
网络服务器优化
注意事项
不当使用可能降低性能
操作系统调度器通常做得很好
应该基于性能测试做决定
考虑NUMA架构的影响
代码案例
案例1:基本线程亲和性设置
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Basic thread-affinity demo: starts up to four workers, each pinned to one CPU core
/// through the Win32 SetThreadAffinityMask API, plus one unpinned worker for comparison.
/// Windows only (P/Invoke into kernel32.dll).
/// </summary>
class BasicThreadAffinity
{
    // Interval between progress lines. Reporting by elapsed time (instead of every
    // 100000 operations) keeps console I/O from dominating the measured work.
    const int ProgressReportIntervalMs = 1000;

    // Win32 API declarations.
    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    [DllImport("kernel32.dll")]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    // The native function returns a DWORD (unsigned 32-bit).
    [DllImport("kernel32.dll")]
    static extern uint GetCurrentThreadId();

    /// <summary>
    /// Entry point: prints CPU/process affinity information, runs the pinned workers
    /// and the unpinned worker concurrently, and waits for all of them.
    /// </summary>
    static void Main(string[] args)
    {
        Console.WriteLine("线程亲和性基本示例");
        Console.WriteLine($"系统CPU核心数: {Environment.ProcessorCount}");

        // Show the affinity mask of the current process. Process is IDisposable.
        using (var process = Process.GetCurrentProcess())
        {
            Console.WriteLine($"进程亲和性掩码: 0x{process.ProcessorAffinity.ToInt64():X}");
        }

        // One pinned worker per core (at most four), plus one extra slot for the
        // unpinned comparison worker. A single array avoids needing System.Linq.
        int pinnedWorkerCount = Math.Min(4, Environment.ProcessorCount);
        Task[] tasks = new Task[pinnedWorkerCount + 1];
        for (int i = 0; i < pinnedWorkerCount; i++)
        {
            int coreId = i; // per-iteration copy captured by the lambda
            tasks[i] = Task.Run(() => WorkerWithAffinity(coreId));
        }

        // Comparison worker that leaves scheduling to the OS.
        tasks[pinnedWorkerCount] = Task.Run(() => NormalWorker());

        Task.WaitAll(tasks);
        Console.WriteLine("所有任务完成");
    }

    /// <summary>
    /// Pins the current thread to the core with index <paramref name="coreId"/>,
    /// runs the CPU-bound workload, then restores the thread's previous affinity.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void WorkerWithAffinity(int coreId)
    {
        uint threadId = GetCurrentThreadId();
        Console.WriteLine($"工作线程{coreId}: 线程ID={threadId}, 尝试绑定到CPU核心{coreId}");

        // SetThreadAffinityMask returns the previous mask on success and zero on failure.
        IntPtr previousAffinity = IntPtr.Zero;
        try
        {
            // Shift a 64-bit one: an int shift (1 << coreId) yields a wrong mask
            // for core indices of 31 and above.
            IntPtr affinityMask = new IntPtr(1L << coreId);
            previousAffinity = SetThreadAffinityMask(GetCurrentThread(), affinityMask);
            if (previousAffinity != IntPtr.Zero)
            {
                Console.WriteLine($"工作线程{coreId}: 成功绑定到CPU核心{coreId}");
            }
            else
            {
                Console.WriteLine($"工作线程{coreId}: 绑定到CPU核心{coreId}失败");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"工作线程{coreId}: 设置亲和性异常 - {ex.Message}");
        }

        try
        {
            // CPU-bound workload.
            PerformIntensiveWork($"绑定核心{coreId}", 3000);
        }
        finally
        {
            // Task.Run uses thread-pool threads, which are reused for unrelated work
            // afterwards; put the original mask back so the pinning does not leak.
            if (previousAffinity != IntPtr.Zero)
            {
                SetThreadAffinityMask(GetCurrentThread(), previousAffinity);
            }
        }
    }

    /// <summary>
    /// Runs the same CPU-bound workload without touching thread affinity.
    /// </summary>
    static void NormalWorker()
    {
        uint threadId = GetCurrentThreadId();
        Console.WriteLine($"普通工作线程: 线程ID={threadId}, 使用系统默认调度");
        PerformIntensiveWork("系统调度", 3000);
    }

    /// <summary>
    /// Spins on square-root computations for about <paramref name="durationMs"/>
    /// milliseconds, printing periodic progress and a final throughput summary.
    /// </summary>
    /// <param name="workerName">Label prefixed to every console line.</param>
    /// <param name="durationMs">How long to keep computing, in milliseconds.</param>
    static void PerformIntensiveWork(string workerName, int durationMs)
    {
        var stopwatch = Stopwatch.StartNew();
        long operations = 0;
        long nextReportMs = ProgressReportIntervalMs;
        Console.WriteLine($"{workerName}: 开始计算密集型工作");

        while (stopwatch.ElapsedMilliseconds < durationMs)
        {
            // One batch of 10000 operations between clock checks.
            for (int i = 0; i < 10000; i++)
            {
                Math.Sqrt(i * i + 1);
                operations++;
            }

            // Time-based progress report; see ProgressReportIntervalMs.
            if (stopwatch.ElapsedMilliseconds >= nextReportMs)
            {
                Console.WriteLine($"{workerName}: 已执行 {operations} 次操作, " +
                                  $"当前线程ID: {GetCurrentThreadId()}");
                nextReportMs += ProgressReportIntervalMs;
            }
        }

        stopwatch.Stop();
        Console.WriteLine($"{workerName}: 工作完成, 总操作数: {operations}, " +
                          $"耗时: {stopwatch.ElapsedMilliseconds}ms, " +
                          $"操作/秒: {operations * 1000.0 / stopwatch.ElapsedMilliseconds:F0}");
    }
}
案例2:NUMA感知的任务分配
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// NUMA-aware task distribution demo: queries the NUMA nodes of the machine, starts
/// up to four workers per node bound to that node's processor mask, and runs one
/// unbound worker for comparison. Windows only (P/Invoke into kernel32.dll).
/// </summary>
class NumaAwareTaskDistribution
{
    [DllImport("kernel32.dll", SetLastError = true)]
    static extern bool GetNumaHighestNodeNumber(out uint HighestNodeNumber);

    // The native Node parameter is a UCHAR, hence byte rather than uint.
    // NOTE(review): this API reports a single 64-bit mask; on machines with more than
    // 64 logical processors GetNumaNodeProcessorMaskEx may be needed — confirm.
    [DllImport("kernel32.dll", SetLastError = true)]
    static extern bool GetNumaNodeProcessorMask(byte Node, out ulong ProcessorMask);

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    /// <summary>
    /// Entry point: lists the NUMA nodes, starts the per-node workers and the
    /// comparison task, and waits for all of them to finish.
    /// </summary>
    static void Main(string[] args)
    {
        Console.WriteLine("NUMA感知任务分配示例");

        // Query the NUMA topology.
        var numaInfo = GetNumaInformation();
        if (numaInfo.Count == 0)
        {
            Console.WriteLine("系统不支持NUMA或获取NUMA信息失败");
            return;
        }

        Console.WriteLine($"检测到 {numaInfo.Count} 个NUMA节点:");
        foreach (var node in numaInfo)
        {
            Console.WriteLine($" 节点 {node.Key}: 处理器掩码 0x{node.Value:X}");
        }

        // Start workers on every NUMA node.
        List<Task> tasks = new List<Task>();
        foreach (var numaNode in numaInfo)
        {
            int nodeId = numaNode.Key;
            ulong processorMask = numaNode.Value;

            // At most four workers per node, never more than the node has processors.
            int tasksPerNode = Math.Min(4, GetCoreCountFromMask(processorMask));
            for (int i = 0; i < tasksPerNode; i++)
            {
                int taskIndex = i; // per-iteration copy captured by the lambda
                tasks.Add(Task.Run(() => NumaAwareWorker(nodeId, processorMask, taskIndex)));
            }
        }

        // Comparison task that does not set any affinity.
        Task normalTask = Task.Run(() => NormalTask());
        tasks.Add(normalTask);

        Task.WaitAll(tasks.ToArray());
        Console.WriteLine("所有任务完成");
    }

    /// <summary>
    /// Collects the processor mask of every NUMA node that reports a non-zero mask.
    /// </summary>
    /// <returns>
    /// Map from node number to processor mask; empty when the query fails or throws.
    /// </returns>
    static Dictionary<int, ulong> GetNumaInformation()
    {
        var numaInfo = new Dictionary<int, ulong>();
        try
        {
            if (GetNumaHighestNodeNumber(out uint highestNode))
            {
                // The node argument is a single byte, so never iterate past 255.
                uint lastNode = Math.Min(highestNode, (uint)byte.MaxValue);
                for (uint nodeId = 0; nodeId <= lastNode; nodeId++)
                {
                    if (GetNumaNodeProcessorMask((byte)nodeId, out ulong processorMask)
                        && processorMask != 0)
                    {
                        numaInfo.Add((int)nodeId, processorMask);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"获取NUMA信息失败: {ex.Message}");
        }
        return numaInfo;
    }

    /// <summary>
    /// Counts the set bits in <paramref name="mask"/>, i.e. the number of
    /// processors the mask selects.
    /// </summary>
    static int GetCoreCountFromMask(ulong mask)
    {
        int count = 0;
        while (mask != 0)
        {
            mask &= mask - 1; // clears the lowest set bit
            count++;
        }
        return count;
    }

    /// <summary>
    /// Binds the current thread to <paramref name="processorMask"/>, runs the
    /// workload, then restores the thread's previous affinity.
    /// </summary>
    /// <param name="numaNodeId">NUMA node number, used for logging only.</param>
    /// <param name="processorMask">Affinity mask of the node's processors.</param>
    /// <param name="taskIndex">Index of this worker within its node, for logging.</param>
    static void NumaAwareWorker(int numaNodeId, ulong processorMask, int taskIndex)
    {
        Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 开始执行");

        // SetThreadAffinityMask returns the previous mask on success and zero on failure.
        IntPtr previousAffinity = IntPtr.Zero;
        try
        {
            // Reinterpret the 64 mask bits as a signed value for IntPtr. In a 32-bit
            // process a mask that does not fit throws and is reported below.
            IntPtr affinityMask = new IntPtr(unchecked((long)processorMask));
            previousAffinity = SetThreadAffinityMask(GetCurrentThread(), affinityMask);
            if (previousAffinity != IntPtr.Zero)
            {
                Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 绑定到处理器掩码 0x{processorMask:X}");
            }
            else
            {
                // Report the failure instead of claiming the thread is bound.
                int error = Marshal.GetLastWin32Error();
                Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 设置亲和性失败 - 错误码 {error}");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 设置亲和性失败 - {ex.Message}");
        }

        try
        {
            // Allocate memory and compute on it.
            PerformNumaAwareWork($"NUMA节点{numaNodeId}-任务{taskIndex}");
        }
        finally
        {
            // Task.Run uses thread-pool threads, which are reused for unrelated work
            // afterwards; put the original mask back so the binding does not leak.
            if (previousAffinity != IntPtr.Zero)
            {
                SetThreadAffinityMask(GetCurrentThread(), previousAffinity);
            }
        }
    }

    /// <summary>
    /// Runs the same workload without setting any affinity.
    /// </summary>
    static void NormalTask()
    {
        Console.WriteLine("普通任务: 使用系统默认调度");
        PerformNumaAwareWork("普通任务");
    }

    /// <summary>
    /// Fills a one-million-element double array and makes five passes over it,
    /// printing a checksum per pass and the total elapsed time.
    /// </summary>
    /// <param name="taskName">Label prefixed to every console line.</param>
    static void PerformNumaAwareWork(string taskName)
    {
        var stopwatch = Stopwatch.StartNew();

        const int arraySize = 1000000;
        double[] data = new double[arraySize];
        Console.WriteLine($"{taskName}: 开始内存密集型计算");

        // Initialise the data.
        for (int i = 0; i < arraySize; i++)
        {
            data[i] = Math.Sin(i * 0.001);
        }

        // Several passes; each rewrites every element and sums the new values.
        for (int round = 0; round < 5; round++)
        {
            double sum = 0;
            for (int i = 0; i < arraySize; i++)
            {
                data[i] = Math.Sqrt(data[i] * data[i] + 1);
                sum += data[i];
            }
            Console.WriteLine($"{taskName}: 第{round + 1}轮完成, 校验和: {sum:F2}");
        }

        stopwatch.Stop();
        Console.WriteLine($"{taskName}: 工作完成, 耗时: {stopwatch.ElapsedMilliseconds}ms");
    }
}
案例3:高频交易系统模拟
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// High-frequency-trading simulation: four long-running stages (market data feed,
/// strategy, order execution, risk check), each pinned to its own CPU core and raised
/// to time-critical priority. Windows only (P/Invoke into kernel32.dll).
/// </summary>
class HighFrequencyTradingSimulation
{
    [DllImport("kernel32.dll", SetLastError = true)]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern bool SetThreadPriority(IntPtr hThread, int nPriority);

    // Name mirrors the Win32 constant.
    const int THREAD_PRIORITY_TIME_CRITICAL = 15;

    // Feed -> strategy queue and strategy -> executor queue.
    private static readonly ConcurrentQueue<MarketData> marketDataQueue = new ConcurrentQueue<MarketData>();
    private static readonly ConcurrentQueue<Order> orderQueue = new ConcurrentQueue<Order>();

    // Cleared by Main to ask every stage to stop.
    private static volatile bool isRunning = true;

    /// <summary>One simulated market tick.</summary>
    class MarketData
    {
        public string Symbol { get; set; }
        public decimal Price { get; set; }
        public long Timestamp { get; set; } // Stopwatch ticks
        public int Volume { get; set; }
    }

    /// <summary>One order produced by the strategy.</summary>
    class Order
    {
        public string Symbol { get; set; }
        public decimal Price { get; set; }
        public int Quantity { get; set; }
        public long Timestamp { get; set; } // Stopwatch ticks
        public string Side { get; set; } // "BUY" or "SELL"
    }

    /// <summary>
    /// Entry point: starts the four stages, lets them run for ten seconds, then
    /// signals shutdown and waits for them to exit.
    /// </summary>
    static void Main(string[] args)
    {
        Console.WriteLine("高频交易系统模拟 - 线程亲和性优化");
        Console.WriteLine($"系统CPU核心数: {Environment.ProcessorCount}");

        // Core allocation:
        //   core 0: market data feed
        //   core 1: strategy engine
        //   core 2: order execution
        //   core 3: risk management
        Task[] criticalTasks = new Task[4];
        criticalTasks[0] = StartDedicated(() => MarketDataReceiver(0));
        criticalTasks[1] = StartDedicated(() => TradingStrategy(1));
        criticalTasks[2] = StartDedicated(() => OrderExecutor(2));
        criticalTasks[3] = StartDedicated(() => RiskManager(3));

        // Run for ten seconds.
        Thread.Sleep(10000);
        isRunning = false;

        Task.WaitAll(criticalTasks);
        Console.WriteLine("交易系统停止");
    }

    /// <summary>
    /// Starts <paramref name="body"/> as a LongRunning task. The stages loop for the
    /// whole run and change their thread's affinity and priority, so they should not
    /// occupy (and permanently alter) shared thread-pool workers.
    /// </summary>
    static Task StartDedicated(Action body)
    {
        return Task.Factory.StartNew(
            body,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Produces random ticks for a fixed symbol list and enqueues them until shutdown.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void MarketDataReceiver(int coreId)
    {
        SetupCriticalThread("市场数据接收", coreId);
        var random = new Random();
        string[] symbols = { "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN" };

        while (isRunning)
        {
            // Simulate one incoming tick.
            var marketData = new MarketData
            {
                Symbol = symbols[random.Next(symbols.Length)],
                Price = 100 + (decimal)(random.NextDouble() * 50),
                Timestamp = Stopwatch.GetTimestamp(),
                Volume = random.Next(100, 1000)
            };
            marketDataQueue.Enqueue(marketData);

            // Pace the feed: one tick per Sleep(1), i.e. at most about one per
            // millisecond (the real interval depends on the OS timer resolution).
            Thread.Sleep(1);
        }
        Console.WriteLine("市场数据接收线程退出");
    }

    /// <summary>
    /// Consumes ticks and emits an order whenever the price moved by more than 2
    /// relative to the previously seen tick.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void TradingStrategy(int coreId)
    {
        SetupCriticalThread("策略引擎", coreId);
        decimal lastPrice = 100;

        while (isRunning)
        {
            if (marketDataQueue.TryDequeue(out MarketData data))
            {
                // Simple mean-reversion rule: sell after a rise, buy after a drop.
                decimal priceDiff = data.Price - lastPrice;
                if (Math.Abs(priceDiff) > 2)
                {
                    var order = new Order
                    {
                        Symbol = data.Symbol,
                        Price = data.Price,
                        Quantity = 100,
                        Timestamp = Stopwatch.GetTimestamp(),
                        Side = priceDiff > 0 ? "SELL" : "BUY"
                    };
                    orderQueue.Enqueue(order);
                    Console.WriteLine($"策略: 生成{order.Side}订单 {order.Symbol} @ {order.Price:F2}");
                }
                lastPrice = data.Price;
            }

            // Give up the rest of the time slice without a timed wait.
            Thread.Sleep(0);
        }
        Console.WriteLine("策略引擎线程退出");
    }

    /// <summary>
    /// Drains the order queue, printing each order with its queueing latency in
    /// microseconds; keeps draining after shutdown until the queue is empty.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void OrderExecutor(int coreId)
    {
        SetupCriticalThread("订单执行", coreId);
        int executedOrders = 0;

        while (isRunning || !orderQueue.IsEmpty)
        {
            if (orderQueue.TryDequeue(out Order order))
            {
                // Latency between order creation and execution, in microseconds.
                var executionTime = Stopwatch.GetTimestamp();
                var latency = (executionTime - order.Timestamp) * 1000000.0 / Stopwatch.Frequency;
                Console.WriteLine($"执行: {order.Side} {order.Quantity} {order.Symbol} @ {order.Price:F2}, " +
                                  $"延迟: {latency:F1}μs");
                executedOrders++;
                Thread.Sleep(0);
            }
            else
            {
                Thread.Sleep(1);
            }
        }
        Console.WriteLine($"订单执行线程退出,总执行订单: {executedOrders}");
    }

    /// <summary>
    /// Periodically checks the order backlog and the exposure limit until shutdown.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void RiskManager(int coreId)
    {
        SetupCriticalThread("风险管理", coreId);

        // NOTE(review): nothing in this class ever updates totalExposure, so the
        // exposure warning below cannot fire. It needs a feed from order execution.
        decimal totalExposure = 0;
        const decimal maxExposure = 100000;

        while (isRunning)
        {
            if (orderQueue.Count > 10)
            {
                Console.WriteLine("风险: 订单队列积压,可能存在风险");
            }
            if (totalExposure > maxExposure * 0.8m)
            {
                Console.WriteLine($"风险: 总敞口接近限制 ({totalExposure:F0}/{maxExposure:F0})");
            }

            // Risk checks run at a lower frequency than the other stages.
            Thread.Sleep(100);
        }
        Console.WriteLine("风险管理线程退出");
    }

    /// <summary>
    /// Pins the current thread to core <paramref name="coreId"/> and raises it to
    /// time-critical priority. Failures are reported on the console, never thrown.
    /// The priority is only raised once pinning succeeded, so a failed pin does not
    /// leave an unpinned time-critical thread behind.
    /// </summary>
    /// <param name="threadName">Label prefixed to every console line.</param>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void SetupCriticalThread(string threadName, int coreId)
    {
        try
        {
            // The mask has one bit per processor; a shift count outside the mask
            // width would wrap around and silently select a different core.
            if (coreId < 0 || coreId >= IntPtr.Size * 8)
            {
                Console.WriteLine($"{threadName}: 设置失败 - 无效的CPU核心编号 {coreId}");
                return;
            }

            // Shift a 64-bit one: an int shift yields a wrong mask for indices >= 31.
            IntPtr affinityMask = new IntPtr(1L << coreId);
            if (SetThreadAffinityMask(GetCurrentThread(), affinityMask) == IntPtr.Zero)
            {
                int error = Marshal.GetLastWin32Error();
                Console.WriteLine($"{threadName}: 设置失败 - 绑定到CPU核心{coreId}失败 (错误码 {error})");
                return;
            }

            if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL))
            {
                int error = Marshal.GetLastWin32Error();
                Console.WriteLine($"{threadName}: 设置失败 - 设置线程优先级失败 (错误码 {error})");
                return;
            }

            Console.WriteLine($"{threadName}: 绑定到CPU核心{coreId},设置高优先级");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{threadName}: 设置失败 - {ex.Message}");
        }
    }
}
案例4:多媒体处理管道
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Multimedia pipeline demo: capture, decode, image-processing and encode/output
/// stages, each running on its own thread pinned to a separate CPU core.
/// Windows only (P/Invoke into kernel32.dll).
/// </summary>
class MultimediaPipeline
{
    [DllImport("kernel32.dll", SetLastError = true)]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    // Cleared by Main to ask every stage to stop.
    private static volatile bool isProcessing = true;

    // Number of frames captured so far. Written only by the capture stage and
    // polled by the other three stages.
    private static int frameCount = 0;

    /// <summary>One captured video frame.</summary>
    class Frame
    {
        public int FrameNumber { get; set; }
        public byte[] Data { get; set; }
        public long Timestamp { get; set; } // Stopwatch ticks
    }

    /// <summary>
    /// Entry point: starts the four stages, lets them run for eight seconds, then
    /// signals shutdown and waits for them to exit.
    /// </summary>
    static void Main(string[] args)
    {
        Console.WriteLine("多媒体处理管道 - 线程亲和性优化");
        Console.WriteLine("模拟实时视频处理:捕获 -> 解码 -> 处理 -> 编码 -> 输出");
        if (Environment.ProcessorCount < 4)
        {
            Console.WriteLine("建议至少4个CPU核心以获得最佳性能");
        }

        // Stage-to-core allocation:
        //   core 0: video capture
        //   core 1: decode
        //   core 2: image processing
        //   core 3: encode and output
        Task[] pipelineTasks = new Task[4];
        pipelineTasks[0] = StartDedicated(() => VideoCaptureStage(0));
        pipelineTasks[1] = StartDedicated(() => DecodeStage(1));
        pipelineTasks[2] = StartDedicated(() => ImageProcessingStage(2));
        pipelineTasks[3] = StartDedicated(() => EncodeOutputStage(3));

        // Run for eight seconds.
        Thread.Sleep(8000);
        isProcessing = false;

        Task.WaitAll(pipelineTasks);
        Console.WriteLine("多媒体处理管道停止");
    }

    /// <summary>
    /// Starts <paramref name="body"/> as a LongRunning task. The stages loop for the
    /// whole run and change their thread's affinity, so they should not occupy (and
    /// permanently alter) shared thread-pool workers.
    /// </summary>
    static Task StartDedicated(Action body)
    {
        return Task.Factory.StartNew(
            body,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Simulates 30 fps capture of 1080p RGB frames and publishes the running frame
    /// count that the downstream stages poll.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void VideoCaptureStage(int coreId)
    {
        SetThreadAffinity("视频捕获", coreId);
        var stopwatch = Stopwatch.StartNew();
        int capturedFrames = 0;

        // One reusable 1080p RGB buffer. Frames are not handed to any other stage,
        // so allocating a fresh 6 MB array per frame only creates garbage.
        // If frames are ever queued downstream, each frame needs its own buffer.
        byte[] frameData = new byte[1920 * 1080 * 3];

        while (isProcessing)
        {
            // Frame N is due N/30 seconds after the start.
            var targetTime = capturedFrames * 1000.0 / 30.0;
            var currentTime = stopwatch.ElapsedMilliseconds;
            if (currentTime >= targetTime)
            {
                var frame = new Frame
                {
                    FrameNumber = capturedFrames,
                    Data = frameData,
                    Timestamp = Stopwatch.GetTimestamp()
                };

                // Fill with a per-frame constant colour; the channel values do not
                // depend on the pixel index, so compute them once.
                byte red = (byte)(capturedFrames % 256);
                byte green = (byte)((capturedFrames * 2) % 256);
                byte blue = (byte)((capturedFrames * 3) % 256);
                for (int i = 0; i < frame.Data.Length; i += 3)
                {
                    frame.Data[i] = red;
                    frame.Data[i + 1] = green;
                    frame.Data[i + 2] = blue;
                }

                Console.WriteLine($"捕获: 帧 {capturedFrames}, 时间戳: {frame.Timestamp}");
                capturedFrames++;

                // Publish the new frame. Without this the shared counter stays at
                // zero and the downstream stages never process anything.
                Interlocked.Increment(ref frameCount);

                // Simulated capture overhead.
                Thread.Sleep(2);
            }
            else
            {
                // Wait for the next frame slot.
                Thread.Sleep(1);
            }
        }
        Console.WriteLine($"视频捕获完成,总捕获帧数: {capturedFrames}");
    }

    /// <summary>
    /// Simulates decoding: handles one frame (5 ms) whenever the published frame
    /// count is ahead of this stage's own counter.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void DecodeStage(int coreId)
    {
        SetThreadAffinity("解码", coreId);
        int decodedFrames = 0;

        while (isProcessing)
        {
            // Volatile read so the capture thread's updates are observed.
            if (Volatile.Read(ref frameCount) > decodedFrames)
            {
                Console.WriteLine($"解码: 处理帧 {decodedFrames}");
                Thread.Sleep(5); // simulated decode time
                decodedFrames++;
            }
            else
            {
                Thread.Sleep(1);
            }
        }
        Console.WriteLine($"解码完成,总解码帧数: {decodedFrames}");
    }

    /// <summary>
    /// Simulates image processing: runs the CPU-bound filter once per published
    /// frame this stage has not handled yet.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void ImageProcessingStage(int coreId)
    {
        SetThreadAffinity("图像处理", coreId);
        int processedFrames = 0;

        while (isProcessing)
        {
            if (Volatile.Read(ref frameCount) > processedFrames)
            {
                Console.WriteLine($"处理: 应用特效到帧 {processedFrames}");
                PerformImageProcessing(processedFrames);
                processedFrames++;
            }
            else
            {
                Thread.Sleep(1);
            }
        }
        Console.WriteLine($"图像处理完成,总处理帧数: {processedFrames}");
    }

    /// <summary>
    /// Simulates encoding and output: handles one frame (8 ms) whenever the
    /// published frame count is ahead of this stage's own counter.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void EncodeOutputStage(int coreId)
    {
        SetThreadAffinity("编码输出", coreId);
        int encodedFrames = 0;

        while (isProcessing)
        {
            if (Volatile.Read(ref frameCount) > encodedFrames)
            {
                Console.WriteLine($"编码: 压缩并输出帧 {encodedFrames}");
                Thread.Sleep(8); // simulated encode time
                encodedFrames++;
            }
            else
            {
                Thread.Sleep(1);
            }
        }
        Console.WriteLine($"编码输出完成,总输出帧数: {encodedFrames}");
    }

    /// <summary>
    /// Pins the current thread to core <paramref name="coreId"/>. Failures are
    /// reported on the console, never thrown.
    /// </summary>
    /// <param name="stageName">Label prefixed to every console line.</param>
    /// <param name="coreId">Zero-based logical processor index to bind to.</param>
    static void SetThreadAffinity(string stageName, int coreId)
    {
        try
        {
            // The mask has one bit per processor; a shift count outside the mask
            // width would wrap around and silently select a different core.
            if (coreId < 0 || coreId >= IntPtr.Size * 8)
            {
                Console.WriteLine($"{stageName}: 设置线程亲和性失败 - 无效的CPU核心编号 {coreId}");
                return;
            }

            // Shift a 64-bit one: an int shift yields a wrong mask for indices >= 31.
            IntPtr affinityMask = new IntPtr(1L << coreId);
            if (SetThreadAffinityMask(GetCurrentThread(), affinityMask) != IntPtr.Zero)
            {
                Console.WriteLine($"{stageName}: 绑定到CPU核心{coreId}");
            }
            else
            {
                // Zero means the call failed, e.g. the core is not available to
                // this process.
                int error = Marshal.GetLastWin32Error();
                Console.WriteLine($"{stageName}: 设置线程亲和性失败 - 错误码 {error}");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{stageName}: 设置线程亲和性失败 - {ex.Message}");
        }
    }

    /// <summary>
    /// Simulates a CPU-bound filter: accumulates 50000 trigonometric terms and
    /// derives a small throw-away pixel buffer from the result.
    /// </summary>
    /// <param name="frameNumber">Frame index mixed into the computation.</param>
    static void PerformImageProcessing(int frameNumber)
    {
        const int iterations = 50000;
        double result = 0;
        for (int i = 0; i < iterations; i++)
        {
            // Simulated filter kernel.
            result += Math.Sin(i * 0.001 + frameNumber * 0.1) * Math.Cos(i * 0.002);
        }

        // Simulated per-pixel memory work.
        byte[] pixelBuffer = new byte[1024];
        for (int i = 0; i < pixelBuffer.Length; i++)
        {
            // The remainder keeps the sign of the dividend and result can be
            // negative; shift into [0, 256) before the cast, because converting a
            // negative double to byte is not well defined.
            double value = (result + frameNumber + i) % 256;
            if (value < 0)
            {
                value += 256;
            }
            pixelBuffer[i] = (byte)value;
        }
    }
}
知识点总结
线程亲和性的核心概念:
将线程绑定到特定CPU核心
优化缓存命中率和内存访问
提高系统可预测性
适用场景:
高性能计算应用
实时系统和游戏
多媒体处理
高频交易系统
设置方法:
Windows: SetThreadAffinityMask API
Linux: sched_setaffinity
.NET: Process.ProcessorAffinity(进程级)、ProcessThread.ProcessorAffinity(线程级)
性能考虑:
合理分配任务到不同核心
考虑NUMA架构影响
避免核心负载不均衡
最佳实践:
基于性能测试做决定
考虑任务特性和依赖关系
留出核心给操作系统调度
监控和调整策略
注意事项:
不当使用可能降低性能
操作系统调度器通常很智能
需要考虑系统整体负载
跨平台兼容性问题