admin 2025-11-01 15:24:14 世界杯的赛程

线程亲和性

知识点

什么是线程亲和性

线程亲和性(Thread Affinity)是指将线程绑定到特定的CPU核心或处理器上运行的技术。通过设置线程亲和性,可以控制线程在哪个CPU核心上执行,从而优化性能、减少缓存失效和提高系统的可预测性。

线程亲和性的作用

缓存优化:减少缓存失效,提高缓存命中率

NUMA优化:在NUMA架构中优化内存访问

实时性:提高实时应用的可预测性

热点避免:避免某些核心过度使用

调试优化:便于性能分析和调试

ProcessorAffinity属性

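在.NET中并没有Thread.ProcessorAffinity属性。可以通过Process.ProcessorAffinity设置整个进程的亲和性,通过ProcessThread.ProcessorAffinity设置某个操作系统线程的亲和性,或者使用P/Invoke调用Windows API(如SetThreadAffinityMask)设置当前线程的亲和性。注意线程池线程会被复用,在线程池任务中修改亲和性后,应在任务结束前恢复原来的设置。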

应用场景

高性能计算应用

实时系统

游戏引擎

音视频处理

网络服务器优化

注意事项

不当使用可能降低性能

操作系统调度器通常做得很好

应该基于性能测试做决定

考虑NUMA架构的影响

代码案例

案例1:基本线程亲和性设置

using System;

using System.Diagnostics;

using System.Runtime.InteropServices;

using System.Threading;

using System.Threading.Tasks;

/// <summary>
/// Basic thread-affinity example: pins up to four thread-pool workers to one
/// logical processor each via the Win32 <c>SetThreadAffinityMask</c> API and runs
/// one unpinned worker alongside them for comparison.
/// </summary>
/// <remarks>
/// Windows-only: every affinity call is a P/Invoke into kernel32.dll.
/// </remarks>
class BasicThreadAffinity
{
    // Win32: pseudo-handle that always refers to the calling thread.
    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    // Win32: returns the thread's previous affinity mask, or zero on failure
    // (for example when the requested mask is not a subset of the process mask).
    [DllImport("kernel32.dll")]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    // Win32: OS thread id of the calling thread (not the managed thread id).
    [DllImport("kernel32.dll")]
    static extern int GetCurrentThreadId();

    static void Main(string[] args)
    {
        Console.WriteLine("线程亲和性基本示例");

        // The P/Invokes above only exist on Windows; stop here instead of letting
        // every worker task fail with DllNotFoundException.
        if (Environment.OSVersion.Platform != PlatformID.Win32NT)
        {
            Console.WriteLine("此示例依赖Windows API (kernel32.dll),仅支持Windows");
            return;
        }

        Console.WriteLine($"系统CPU核心数: {Environment.ProcessorCount}");

        // Process wraps an OS handle, so dispose it once the mask has been read.
        using (var process = Process.GetCurrentProcess())
        {
            Console.WriteLine($"进程亲和性掩码: 0x{process.ProcessorAffinity.ToInt64():X}");
        }

        // One pinned worker per core, for at most four cores.
        Task[] tasks = new Task[Math.Min(4, Environment.ProcessorCount)];
        for (int i = 0; i < tasks.Length; i++)
        {
            int coreId = i; // per-iteration copy so each closure captures its own core
            tasks[i] = Task.Run(() => WorkerWithAffinity(coreId));
        }

        // Unpinned comparison worker, scheduled freely by the OS.
        Task normalTask = Task.Run(() => NormalWorker());

        // Combined by hand: Enumerable.Concat would need System.Linq, which this
        // example does not import.
        var allTasks = new Task[tasks.Length + 1];
        Array.Copy(tasks, allTasks, tasks.Length);
        allTasks[tasks.Length] = normalTask;
        Task.WaitAll(allTasks);

        Console.WriteLine("所有任务完成");
    }

    /// <summary>
    /// Pins the current thread-pool thread to <paramref name="coreId"/>, runs the
    /// CPU-bound workload, then restores the thread's previous affinity mask.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index to pin to.</param>
    static void WorkerWithAffinity(int coreId)
    {
        int threadId = GetCurrentThreadId();
        Console.WriteLine($"工作线程{coreId}: 线程ID={threadId}, 尝试绑定到CPU核心{coreId}");

        IntPtr previousAffinity = IntPtr.Zero;
        try
        {
            previousAffinity = PinCurrentThreadToCore(coreId);

            if (previousAffinity != IntPtr.Zero)
            {
                Console.WriteLine($"工作线程{coreId}: 成功绑定到CPU核心{coreId}");
            }
            else
            {
                Console.WriteLine($"工作线程{coreId}: 绑定到CPU核心{coreId}失败");
            }
        }
        catch (Exception ex)
        {
            // Best-effort demo: report and fall through to run the workload unpinned.
            Console.WriteLine($"工作线程{coreId}: 设置亲和性异常 - {ex.Message}");
        }

        try
        {
            // Run the CPU-bound workload.
            PerformIntensiveWork($"绑定核心{coreId}", 3000);
        }
        finally
        {
            // Task.Run borrows a thread-pool thread that is reused after this task;
            // without restoring, unrelated later work would stay pinned to this core.
            if (previousAffinity != IntPtr.Zero)
            {
                SetThreadAffinityMask(GetCurrentThread(), previousAffinity);
            }
        }
    }

    /// <summary>
    /// Restricts the calling thread to a single logical processor.
    /// </summary>
    /// <param name="coreId">Zero-based logical processor index.</param>
    /// <returns>
    /// The previous affinity mask, or <see cref="IntPtr.Zero"/> if
    /// <paramref name="coreId"/> does not fit in a pointer-sized mask or the
    /// Win32 call failed.
    /// </returns>
    static IntPtr PinCurrentThreadToCore(int coreId)
    {
        // Affinity masks are pointer-sized (32 or 64 bits); larger indices cannot
        // be expressed and would wrap around in the shift.
        if (coreId < 0 || coreId >= IntPtr.Size * 8)
        {
            return IntPtr.Zero;
        }

        // Shift in 64-bit arithmetic: an int `1 << coreId` wraps for coreId >= 32.
        ulong bits = 1UL << coreId;

        // new IntPtr(long) throws in a 32-bit process for values above int.MaxValue,
        // so narrow explicitly there.
        IntPtr mask = IntPtr.Size == 8
            ? new IntPtr(unchecked((long)bits))
            : new IntPtr(unchecked((int)bits));

        return SetThreadAffinityMask(GetCurrentThread(), mask);
    }

    /// <summary>
    /// Comparison worker: runs the same workload without touching affinity.
    /// </summary>
    static void NormalWorker()
    {
        int threadId = GetCurrentThreadId();
        Console.WriteLine($"普通工作线程: 线程ID={threadId}, 使用系统默认调度");

        // Same workload as the pinned workers.
        PerformIntensiveWork("系统调度", 3000);
    }

    /// <summary>
    /// Spins on square-root work for roughly <paramref name="durationMs"/>
    /// milliseconds and prints the achieved operation count and rate.
    /// </summary>
    /// <param name="workerName">Label used in console output.</param>
    /// <param name="durationMs">Wall-clock duration to keep working, in milliseconds.</param>
    static void PerformIntensiveWork(string workerName, int durationMs)
    {
        var stopwatch = Stopwatch.StartNew();
        long operations = 0;

        Console.WriteLine($"{workerName}: 开始计算密集型工作");

        while (stopwatch.ElapsedMilliseconds < durationMs)
        {
            // NOTE(review): the result is discarded, so the JIT may be able to drop
            // the call; accumulate it if this is ever used as a real benchmark.
            for (int i = 0; i < 10000; i++)
            {
                Math.Sqrt(i * i + 1);
                operations++;
            }

            // operations grows in steps of 10,000, so this fires every 10 outer passes.
            if (operations % 100000 == 0)
            {
                Console.WriteLine($"{workerName}: 已执行 {operations} 次操作, " +
                                  $"当前线程ID: {GetCurrentThreadId()}");
            }
        }

        stopwatch.Stop();

        // Clamp to 1 ms so a durationMs <= 0 run prints a rate instead of NaN.
        long elapsedForRate = Math.Max(1L, stopwatch.ElapsedMilliseconds);

        Console.WriteLine($"{workerName}: 工作完成, 总操作数: {operations}, " +
                          $"耗时: {stopwatch.ElapsedMilliseconds}ms, " +
                          $"操作/秒: {operations * 1000.0 / elapsedForRate:F0}");
    }
}

案例2:NUMA感知的任务分配

using System;

using System.Collections.Generic;

using System.Diagnostics;

using System.Runtime.InteropServices;

using System.Threading;

using System.Threading.Tasks;

/// <summary>
/// NUMA-aware task distribution example: enumerates NUMA nodes through Win32,
/// pins up to four workers per node to that node's processor mask, and runs one
/// unpinned worker for comparison.
/// </summary>
/// <remarks>
/// Windows-only: NUMA discovery and affinity are P/Invokes into kernel32.dll.
/// </remarks>
class NumaAwareTaskDistribution
{
    [DllImport("kernel32.dll")]
    static extern bool GetNumaHighestNodeNumber(out uint HighestNodeNumber);

    // Native signature takes a UCHAR node number. Per the Win32 docs this API only
    // covers a single processor group; GetNumaNodeProcessorMaskEx handles others.
    [DllImport("kernel32.dll")]
    static extern bool GetNumaNodeProcessorMask(byte Node, out ulong ProcessorMask);

    // Returns the previous affinity mask, or zero on failure.
    [DllImport("kernel32.dll")]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    static void Main(string[] args)
    {
        Console.WriteLine("NUMA感知任务分配示例");

        // The NUMA and affinity P/Invokes only exist on Windows.
        if (Environment.OSVersion.Platform != PlatformID.Win32NT)
        {
            Console.WriteLine("此示例依赖Windows API (kernel32.dll),仅支持Windows");
            return;
        }

        // Node id -> processor mask, for nodes that own at least one processor.
        Dictionary<int, ulong> numaInfo = GetNumaInformation();

        if (numaInfo.Count == 0)
        {
            Console.WriteLine("系统不支持NUMA或获取NUMA信息失败");
            return;
        }

        Console.WriteLine($"检测到 {numaInfo.Count} 个NUMA节点:");
        foreach (var node in numaInfo)
        {
            Console.WriteLine($" 节点 {node.Key}: 处理器掩码 0x{node.Value:X}");
        }

        var tasks = new List<Task>();

        foreach (var numaNode in numaInfo)
        {
            int nodeId = numaNode.Key;
            ulong processorMask = numaNode.Value;

            // At most four workers per node, and never more than the node has processors.
            int tasksPerNode = Math.Min(4, GetCoreCountFromMask(processorMask));

            for (int i = 0; i < tasksPerNode; i++)
            {
                int taskIndex = i; // per-iteration copy for the closure
                tasks.Add(Task.Run(() => NumaAwareWorker(nodeId, processorMask, taskIndex)));
            }
        }

        // Comparison worker without any affinity.
        Task normalTask = Task.Run(() => NormalTask());
        tasks.Add(normalTask);

        Task.WaitAll(tasks.ToArray());
        Console.WriteLine("所有任务完成");
    }

    /// <summary>
    /// Queries every NUMA node up to the highest reported node number.
    /// </summary>
    /// <returns>
    /// Map from node id to processor mask; nodes whose query fails or whose mask
    /// is zero are omitted. Empty if the highest-node query itself fails.
    /// </returns>
    static Dictionary<int, ulong> GetNumaInformation()
    {
        var numaInfo = new Dictionary<int, ulong>();

        try
        {
            if (GetNumaHighestNodeNumber(out uint highestNode))
            {
                // The per-node API takes a byte, so node ids above 255 cannot be
                // queried; the extra bound also keeps the loop finite if
                // highestNode == uint.MaxValue.
                for (uint nodeId = 0; nodeId <= highestNode && nodeId <= byte.MaxValue; nodeId++)
                {
                    if (GetNumaNodeProcessorMask((byte)nodeId, out ulong processorMask)
                        && processorMask != 0)
                    {
                        numaInfo.Add((int)nodeId, processorMask);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"获取NUMA信息失败: {ex.Message}");
        }

        return numaInfo;
    }

    /// <summary>
    /// Counts the set bits in <paramref name="mask"/>, i.e. processors in the mask.
    /// </summary>
    static int GetCoreCountFromMask(ulong mask)
    {
        int count = 0;
        while (mask != 0)
        {
            count += (int)(mask & 1);
            mask >>= 1;
        }
        return count;
    }

    /// <summary>
    /// Pins the current thread-pool thread to a NUMA node's processors, runs the
    /// memory workload, then restores the thread's previous affinity.
    /// </summary>
    /// <param name="numaNodeId">Node id, used for logging.</param>
    /// <param name="processorMask">Processor mask of the node.</param>
    /// <param name="taskIndex">Index of this worker within its node, for logging.</param>
    static void NumaAwareWorker(int numaNodeId, ulong processorMask, int taskIndex)
    {
        Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 开始执行");

        IntPtr previousAffinity = IntPtr.Zero;
        try
        {
            previousAffinity = SetThreadAffinityMask(GetCurrentThread(), ToAffinityMask(processorMask));

            // A zero return means the call failed, e.g. the node mask is not a
            // subset of the process affinity mask.
            if (previousAffinity != IntPtr.Zero)
            {
                Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 绑定到处理器掩码 0x{processorMask:X}");
            }
            else
            {
                Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 设置亲和性失败 - SetThreadAffinityMask返回0");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"NUMA任务 节点{numaNodeId}-{taskIndex}: 设置亲和性失败 - {ex.Message}");
        }

        try
        {
            // Allocate memory and compute on it.
            PerformNumaAwareWork($"NUMA节点{numaNodeId}-任务{taskIndex}");
        }
        finally
        {
            // Thread-pool threads are reused; do not leave this one pinned to the node.
            if (previousAffinity != IntPtr.Zero)
            {
                SetThreadAffinityMask(GetCurrentThread(), previousAffinity);
            }
        }
    }

    /// <summary>
    /// Converts a 64-bit processor mask into a pointer-sized affinity mask.
    /// </summary>
    /// <remarks>
    /// In a 32-bit process only the low 32 bits are representable; they are kept
    /// explicitly instead of letting <c>new IntPtr(long)</c> throw OverflowException.
    /// </remarks>
    static IntPtr ToAffinityMask(ulong mask)
    {
        return IntPtr.Size == 8
            ? new IntPtr(unchecked((long)mask))
            : new IntPtr(unchecked((int)mask));
    }

    /// <summary>
    /// Comparison worker: runs the same workload without touching affinity.
    /// </summary>
    static void NormalTask()
    {
        Console.WriteLine("普通任务: 使用系统默认调度");
        PerformNumaAwareWork("普通任务");
    }

    /// <summary>
    /// Allocates a 1,000,000-element double array, fills it, then performs five
    /// in-place sqrt passes, printing a checksum after each pass and the total time.
    /// </summary>
    /// <param name="taskName">Label used in console output.</param>
    static void PerformNumaAwareWork(string taskName)
    {
        var stopwatch = Stopwatch.StartNew();

        const int arraySize = 1000000;
        double[] data = new double[arraySize];

        Console.WriteLine($"{taskName}: 开始内存密集型计算");

        for (int i = 0; i < arraySize; i++)
        {
            data[i] = Math.Sin(i * 0.001);
        }

        for (int round = 0; round < 5; round++)
        {
            double sum = 0;
            for (int i = 0; i < arraySize; i++)
            {
                data[i] = Math.Sqrt(data[i] * data[i] + 1);
                sum += data[i];
            }
            Console.WriteLine($"{taskName}: 第{round + 1}轮完成, 校验和: {sum:F2}");
        }

        stopwatch.Stop();
        Console.WriteLine($"{taskName}: 工作完成, 耗时: {stopwatch.ElapsedMilliseconds}ms");
    }
}

案例3:高频交易系统模拟

using System;

using System.Collections.Concurrent;

using System.Diagnostics;

using System.Runtime.InteropServices;

using System.Threading;

using System.Threading.Tasks;

/// <summary>
/// Simulated trading pipeline (market data -> strategy -> order execution, plus
/// a risk monitor), each stage on a thread-pool thread pinned to one core and
/// raised to TIME_CRITICAL priority for the duration of the run.
/// </summary>
/// <remarks>
/// Windows-only: affinity and priority are set through kernel32 P/Invoke.
/// </remarks>
class HighFrequencyTradingSimulation
{
    // Returns the previous affinity mask, or zero on failure.
    [DllImport("kernel32.dll")]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    [DllImport("kernel32.dll")]
    static extern bool SetThreadPriority(IntPtr hThread, int nPriority);

    // Returns THREAD_PRIORITY_ERROR_RETURN on failure.
    [DllImport("kernel32.dll")]
    static extern int GetThreadPriority(IntPtr hThread);

    const int THREAD_PRIORITY_TIME_CRITICAL = 15;
    const int THREAD_PRIORITY_ERROR_RETURN = 0x7FFFFFFF;

    // Market data -> strategy hand-off.
    private static readonly ConcurrentQueue<MarketData> marketDataQueue = new ConcurrentQueue<MarketData>();

    // Strategy -> executor hand-off.
    private static readonly ConcurrentQueue<Order> orderQueue = new ConcurrentQueue<Order>();

    // Cleared by Main to stop every stage.
    private static volatile bool isRunning = true;

    class MarketData
    {
        public string Symbol { get; set; }
        public decimal Price { get; set; }
        public long Timestamp { get; set; } // Stopwatch ticks
        public int Volume { get; set; }
    }

    class Order
    {
        public string Symbol { get; set; }
        public decimal Price { get; set; }
        public int Quantity { get; set; }
        public long Timestamp { get; set; } // Stopwatch ticks at creation
        public string Side { get; set; } // "BUY" or "SELL"
    }

    static void Main(string[] args)
    {
        Console.WriteLine("高频交易系统模拟 - 线程亲和性优化");

        // Affinity and priority calls are Win32 P/Invokes.
        if (Environment.OSVersion.Platform != PlatformID.Win32NT)
        {
            Console.WriteLine("此示例依赖Windows API (kernel32.dll),仅支持Windows");
            return;
        }

        Console.WriteLine($"系统CPU核心数: {Environment.ProcessorCount}");

        // Core assignment:
        //   core 0: market data receiver
        //   core 1: strategy engine
        //   core 2: order executor
        //   core 3: risk manager
        // On machines with fewer cores the missing pins fail and are reported.
        Task[] criticalTasks = new Task[4];
        criticalTasks[0] = Task.Run(() => MarketDataReceiver(0));
        criticalTasks[1] = Task.Run(() => TradingStrategy(1));
        criticalTasks[2] = Task.Run(() => OrderExecutor(2));
        criticalTasks[3] = Task.Run(() => RiskManager(3));

        // Run for 10 seconds, then signal shutdown and wait for all stages.
        Thread.Sleep(10000);
        isRunning = false;
        Task.WaitAll(criticalTasks);

        Console.WriteLine("交易系统停止");
    }

    /// <summary>
    /// Producer: enqueues random quotes for a fixed symbol list until shutdown.
    /// </summary>
    static void MarketDataReceiver(int coreId)
    {
        SetupCriticalThread("市场数据接收", coreId, out IntPtr previousAffinity, out int previousPriority);
        try
        {
            var random = new Random();
            string[] symbols = { "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN" };

            while (isRunning)
            {
                var marketData = new MarketData
                {
                    Symbol = symbols[random.Next(symbols.Length)],
                    Price = 100 + (decimal)(random.NextDouble() * 50),
                    Timestamp = Stopwatch.GetTimestamp(),
                    Volume = random.Next(100, 1000)
                };

                marketDataQueue.Enqueue(marketData);

                // Sleep(1) allows at most about one quote per millisecond; the real
                // rate depends on the OS timer resolution.
                Thread.Sleep(1);
            }

            Console.WriteLine("市场数据接收线程退出");
        }
        finally
        {
            RestoreThread(previousAffinity, previousPriority);
        }
    }

    /// <summary>
    /// Consumer of quotes: emits a counter-trend order whenever the price moves
    /// by more than 2 relative to the previous quote (any symbol).
    /// </summary>
    static void TradingStrategy(int coreId)
    {
        SetupCriticalThread("策略引擎", coreId, out IntPtr previousAffinity, out int previousPriority);
        try
        {
            decimal lastPrice = 100;

            while (isRunning)
            {
                if (marketDataQueue.TryDequeue(out MarketData data))
                {
                    // Simple mean-reversion rule.
                    decimal priceDiff = data.Price - lastPrice;

                    if (Math.Abs(priceDiff) > 2)
                    {
                        var order = new Order
                        {
                            Symbol = data.Symbol,
                            Price = data.Price,
                            Quantity = 100,
                            Timestamp = Stopwatch.GetTimestamp(),
                            Side = priceDiff > 0 ? "SELL" : "BUY" // bet on reversion
                        };

                        orderQueue.Enqueue(order);
                        Console.WriteLine($"策略: 生成{order.Side}订单 {order.Symbol} @ {order.Price:F2}");
                    }

                    lastPrice = data.Price;
                }

                // Sleep(0) only gives up the rest of the time slice. With this thread
                // pinned at TIME_CRITICAL, the loop keeps its core effectively busy.
                Thread.Sleep(0);
            }

            Console.WriteLine("策略引擎线程退出");
        }
        finally
        {
            RestoreThread(previousAffinity, previousPriority);
        }
    }

    /// <summary>
    /// Drains the order queue (including after shutdown is signalled) and prints
    /// the creation-to-execution latency of each order in microseconds.
    /// </summary>
    static void OrderExecutor(int coreId)
    {
        SetupCriticalThread("订单执行", coreId, out IntPtr previousAffinity, out int previousPriority);
        try
        {
            int executedOrders = 0;

            while (isRunning || !orderQueue.IsEmpty)
            {
                if (orderQueue.TryDequeue(out Order order))
                {
                    var executionTime = Stopwatch.GetTimestamp();

                    // Stopwatch ticks -> microseconds.
                    var latency = (executionTime - order.Timestamp) * 1000000.0 / Stopwatch.Frequency;

                    Console.WriteLine($"执行: {order.Side} {order.Quantity} {order.Symbol} @ {order.Price:F2}, " +
                                      $"延迟: {latency:F1}μs");
                    executedOrders++;

                    Thread.Sleep(0);
                }
                else
                {
                    Thread.Sleep(1);
                }
            }

            Console.WriteLine($"订单执行线程退出,总执行订单: {executedOrders}");
        }
        finally
        {
            RestoreThread(previousAffinity, previousPriority);
        }
    }

    /// <summary>
    /// Periodic risk monitor: warns on order backlog every ~100 ms.
    /// </summary>
    static void RiskManager(int coreId)
    {
        SetupCriticalThread("风险管理", coreId, out IntPtr previousAffinity, out int previousPriority);
        try
        {
            // NOTE(review): nothing ever updates totalExposure, so the exposure
            // warning below can never fire; wire it to executed orders if needed.
            decimal totalExposure = 0;
            const decimal maxExposure = 100000;

            while (isRunning)
            {
                if (orderQueue.Count > 10)
                {
                    Console.WriteLine("风险: 订单队列积压,可能存在风险");
                }

                if (totalExposure > maxExposure * 0.8m)
                {
                    Console.WriteLine($"风险: 总敞口接近限制 ({totalExposure:F0}/{maxExposure:F0})");
                }

                // Risk checks can run at a lower frequency.
                Thread.Sleep(100);
            }

            Console.WriteLine("风险管理线程退出");
        }
        finally
        {
            RestoreThread(previousAffinity, previousPriority);
        }
    }

    /// <summary>
    /// Pins the calling thread to <paramref name="coreId"/> and raises it to
    /// TIME_CRITICAL, reporting whether each step succeeded.
    /// </summary>
    /// <param name="threadName">Label used in console output.</param>
    /// <param name="coreId">Zero-based logical processor index.</param>
    /// <param name="previousAffinity">Mask to restore, or zero if pinning failed.</param>
    /// <param name="previousPriority">
    /// Priority to restore, or THREAD_PRIORITY_ERROR_RETURN if it could not be read.
    /// </param>
    static void SetupCriticalThread(string threadName, int coreId,
                                    out IntPtr previousAffinity, out int previousPriority)
    {
        previousAffinity = IntPtr.Zero;
        previousPriority = THREAD_PRIORITY_ERROR_RETURN;

        try
        {
            IntPtr thread = GetCurrentThread();

            // Out-of-range core indices cannot be expressed in a pointer-sized mask.
            if (coreId >= 0 && coreId < IntPtr.Size * 8)
            {
                previousAffinity = SetThreadAffinityMask(thread, SingleCoreMask(coreId));
            }

            previousPriority = GetThreadPriority(thread);
            bool priorityRaised = SetThreadPriority(thread, THREAD_PRIORITY_TIME_CRITICAL);

            if (previousAffinity != IntPtr.Zero && priorityRaised)
            {
                Console.WriteLine($"{threadName}: 绑定到CPU核心{coreId},设置高优先级");
            }
            else
            {
                string affinityStatus = previousAffinity != IntPtr.Zero ? "成功" : "失败";
                string priorityStatus = priorityRaised ? "成功" : "失败";
                Console.WriteLine($"{threadName}: 设置失败 - 亲和性{affinityStatus}, 优先级{priorityStatus}");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{threadName}: 设置失败 - {ex.Message}");
        }
    }

    /// <summary>
    /// Undoes <see cref="SetupCriticalThread"/> on the calling thread. Thread-pool
    /// threads are reused, so they must not stay pinned at TIME_CRITICAL.
    /// </summary>
    static void RestoreThread(IntPtr previousAffinity, int previousPriority)
    {
        IntPtr thread = GetCurrentThread();

        if (previousPriority != THREAD_PRIORITY_ERROR_RETURN)
        {
            SetThreadPriority(thread, previousPriority);
        }

        if (previousAffinity != IntPtr.Zero)
        {
            SetThreadAffinityMask(thread, previousAffinity);
        }
    }

    /// <summary>
    /// Builds a pointer-sized mask with only bit <paramref name="coreId"/> set.
    /// The caller must ensure 0 &lt;= coreId &lt; IntPtr.Size * 8.
    /// </summary>
    static IntPtr SingleCoreMask(int coreId)
    {
        // 64-bit shift: an int `1 << coreId` wraps for coreId >= 32.
        ulong bits = 1UL << coreId;
        return IntPtr.Size == 8
            ? new IntPtr(unchecked((long)bits))
            : new IntPtr(unchecked((int)bits));
    }
}

案例4:多媒体处理管道

using System;

using System.Diagnostics;

using System.Runtime.InteropServices;

using System.Threading;

using System.Threading.Tasks;

/// <summary>
/// Simulated multimedia pipeline (capture -> decode -> process -> encode/output)
/// where each stage runs on its own thread-pool thread pinned to one core.
/// </summary>
/// <remarks>
/// Windows-only: affinity is set through kernel32 P/Invoke. The stages do not
/// pass real frames to each other; capture publishes a frame counter and each
/// downstream stage independently trails that counter.
/// </remarks>
class MultimediaPipeline
{
    // Returns the previous affinity mask, or zero on failure.
    [DllImport("kernel32.dll")]
    static extern IntPtr SetThreadAffinityMask(IntPtr hThread, IntPtr dwThreadAffinityMask);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread();

    // Cleared by Main to stop every stage.
    private static volatile bool isProcessing = true;

    // Frames captured so far. Only the capture stage writes it (Interlocked.Increment);
    // downstream stages read it with Volatile.Read.
    private static int frameCount = 0;

    class Frame
    {
        public int FrameNumber { get; set; }
        public byte[] Data { get; set; }
        public long Timestamp { get; set; } // Stopwatch ticks
    }

    static void Main(string[] args)
    {
        Console.WriteLine("多媒体处理管道 - 线程亲和性优化");
        Console.WriteLine("模拟实时视频处理:捕获 -> 解码 -> 处理 -> 编码 -> 输出");

        // Affinity calls are Win32 P/Invokes.
        if (Environment.OSVersion.Platform != PlatformID.Win32NT)
        {
            Console.WriteLine("此示例依赖Windows API (kernel32.dll),仅支持Windows");
            return;
        }

        if (Environment.ProcessorCount < 4)
        {
            Console.WriteLine("建议至少4个CPU核心以获得最佳性能");
        }

        // Stage-to-core assignment:
        //   core 0: video capture
        //   core 1: decode
        //   core 2: image processing
        //   core 3: encode and output
        Task[] pipelineTasks = new Task[4];
        pipelineTasks[0] = Task.Run(() => VideoCaptureStage(0));
        pipelineTasks[1] = Task.Run(() => DecodeStage(1));
        pipelineTasks[2] = Task.Run(() => ImageProcessingStage(2));
        pipelineTasks[3] = Task.Run(() => EncodeOutputStage(3));

        // Run for 8 seconds, then signal shutdown and wait for all stages.
        Thread.Sleep(8000);
        isProcessing = false;
        Task.WaitAll(pipelineTasks);

        Console.WriteLine("多媒体处理管道停止");
    }

    /// <summary>
    /// Produces synthetic 1080p RGB frames on a 30 fps schedule and publishes each
    /// one by incrementing <see cref="frameCount"/>.
    /// </summary>
    static void VideoCaptureStage(int coreId)
    {
        IntPtr previousAffinity = SetThreadAffinity("视频捕获", coreId);
        try
        {
            var stopwatch = Stopwatch.StartNew();
            int capturedFrames = 0;

            while (isProcessing)
            {
                // Frame n is due at n * (1000 / 30) ms, i.e. a 30 fps schedule.
                var targetTime = capturedFrames * 1000.0 / 30.0;
                var currentTime = stopwatch.ElapsedMilliseconds;

                if (currentTime >= targetTime)
                {
                    var frame = new Frame
                    {
                        FrameNumber = capturedFrames,
                        Data = new byte[1920 * 1080 * 3], // 1080p RGB
                        Timestamp = Stopwatch.GetTimestamp()
                    };

                    // Fill with synthetic pixel data.
                    for (int i = 0; i < frame.Data.Length; i += 3)
                    {
                        frame.Data[i] = (byte)(capturedFrames % 256);           // R
                        frame.Data[i + 1] = (byte)((capturedFrames * 2) % 256); // G
                        frame.Data[i + 2] = (byte)((capturedFrames * 3) % 256); // B
                    }

                    Console.WriteLine($"捕获: 帧 {capturedFrames}, 时间戳: {frame.Timestamp}");
                    capturedFrames++;

                    // Publish the frame; without this the downstream stages never see work.
                    Interlocked.Increment(ref frameCount);

                    // Simulated capture processing time.
                    Thread.Sleep(2);
                }
                else
                {
                    // Wait for the next frame slot.
                    Thread.Sleep(1);
                }
            }

            Console.WriteLine($"视频捕获完成,总捕获帧数: {capturedFrames}");
        }
        finally
        {
            RestoreThreadAffinity(previousAffinity);
        }
    }

    /// <summary>
    /// Simulated decoder: takes ~5 ms per frame while behind the capture counter.
    /// </summary>
    static void DecodeStage(int coreId)
    {
        IntPtr previousAffinity = SetThreadAffinity("解码", coreId);
        try
        {
            int decodedFrames = 0;

            while (isProcessing)
            {
                // A real decoder would receive compressed data from capture here.
                if (Volatile.Read(ref frameCount) > decodedFrames)
                {
                    Console.WriteLine($"解码: 处理帧 {decodedFrames}");

                    // Simulated decode time (slower than capture).
                    Thread.Sleep(5);
                    decodedFrames++;
                }
                else
                {
                    Thread.Sleep(1);
                }
            }

            Console.WriteLine($"解码完成,总解码帧数: {decodedFrames}");
        }
        finally
        {
            RestoreThreadAffinity(previousAffinity);
        }
    }

    /// <summary>
    /// Simulated effects stage: runs a CPU-bound kernel per frame while behind
    /// the capture counter.
    /// </summary>
    static void ImageProcessingStage(int coreId)
    {
        IntPtr previousAffinity = SetThreadAffinity("图像处理", coreId);
        try
        {
            int processedFrames = 0;

            while (isProcessing)
            {
                if (Volatile.Read(ref frameCount) > processedFrames)
                {
                    Console.WriteLine($"处理: 应用特效到帧 {processedFrames}");

                    PerformImageProcessing(processedFrames);
                    processedFrames++;
                }
                else
                {
                    Thread.Sleep(1);
                }
            }

            Console.WriteLine($"图像处理完成,总处理帧数: {processedFrames}");
        }
        finally
        {
            RestoreThreadAffinity(previousAffinity);
        }
    }

    /// <summary>
    /// Simulated encoder/output: takes ~8 ms per frame while behind the capture counter.
    /// </summary>
    static void EncodeOutputStage(int coreId)
    {
        IntPtr previousAffinity = SetThreadAffinity("编码输出", coreId);
        try
        {
            int encodedFrames = 0;

            while (isProcessing)
            {
                if (Volatile.Read(ref frameCount) > encodedFrames)
                {
                    Console.WriteLine($"编码: 压缩并输出帧 {encodedFrames}");

                    // Simulated encode time (the slowest stage).
                    Thread.Sleep(8);
                    encodedFrames++;
                }
                else
                {
                    Thread.Sleep(1);
                }
            }

            Console.WriteLine($"编码输出完成,总输出帧数: {encodedFrames}");
        }
        finally
        {
            RestoreThreadAffinity(previousAffinity);
        }
    }

    /// <summary>
    /// Pins the calling thread to <paramref name="coreId"/> and reports the outcome.
    /// </summary>
    /// <param name="stageName">Label used in console output.</param>
    /// <param name="coreId">Zero-based logical processor index.</param>
    /// <returns>The previous affinity mask, or <see cref="IntPtr.Zero"/> on failure.</returns>
    static IntPtr SetThreadAffinity(string stageName, int coreId)
    {
        try
        {
            // Out-of-range core indices cannot be expressed in a pointer-sized mask.
            IntPtr previous = coreId >= 0 && coreId < IntPtr.Size * 8
                ? SetThreadAffinityMask(GetCurrentThread(), SingleCoreMask(coreId))
                : IntPtr.Zero;

            if (previous != IntPtr.Zero)
            {
                Console.WriteLine($"{stageName}: 绑定到CPU核心{coreId}");
            }
            else
            {
                Console.WriteLine($"{stageName}: 设置线程亲和性失败");
            }

            return previous;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{stageName}: 设置线程亲和性失败 - {ex.Message}");
            return IntPtr.Zero;
        }
    }

    /// <summary>
    /// Restores a mask returned by <see cref="SetThreadAffinity"/>. Thread-pool
    /// threads are reused, so they must not stay pinned after the stage ends.
    /// </summary>
    static void RestoreThreadAffinity(IntPtr previousAffinity)
    {
        if (previousAffinity != IntPtr.Zero)
        {
            SetThreadAffinityMask(GetCurrentThread(), previousAffinity);
        }
    }

    /// <summary>
    /// Builds a pointer-sized mask with only bit <paramref name="coreId"/> set.
    /// The caller must ensure 0 &lt;= coreId &lt; IntPtr.Size * 8.
    /// </summary>
    static IntPtr SingleCoreMask(int coreId)
    {
        // 64-bit shift: an int `1 << coreId` wraps for coreId >= 32.
        ulong bits = 1UL << coreId;
        return IntPtr.Size == 8
            ? new IntPtr(unchecked((long)bits))
            : new IntPtr(unchecked((int)bits));
    }

    /// <summary>
    /// CPU-bound stand-in for an image filter: a trig loop followed by a small
    /// pixel-buffer write. Results are discarded.
    /// </summary>
    static void PerformImageProcessing(int frameNumber)
    {
        const int iterations = 50000;
        double result = 0;

        // Simulated filter computation.
        for (int i = 0; i < iterations; i++)
        {
            result += Math.Sin(i * 0.001 + frameNumber * 0.1) * Math.Cos(i * 0.002);
        }

        // Simulated per-pixel memory work.
        byte[] pixelBuffer = new byte[1024];
        for (int i = 0; i < pixelBuffer.Length; i++)
        {
            // Math.Abs keeps the operand non-negative: converting an out-of-range
            // (e.g. negative) double to byte yields an unspecified value in C#.
            pixelBuffer[i] = (byte)(Math.Abs(result + frameNumber + i) % 256);
        }
    }
}

知识点总结

线程亲和性的核心概念:

将线程绑定到特定CPU核心

优化缓存命中率和内存访问

提高系统可预测性

适用场景:

高性能计算应用

实时系统和游戏

多媒体处理

高频交易系统

设置方法:

Windows: SetThreadAffinityMask API

Linux: sched_setaffinity

.NET: Process.ProcessorAffinity(进程级)、ProcessThread.ProcessorAffinity(线程级)

性能考虑:

合理分配任务到不同核心

考虑NUMA架构影响

避免核心负载不均衡

最佳实践:

基于性能测试做决定

考虑任务特性和依赖关系

留出核心给操作系统调度

监控和调整策略

注意事项:

不当使用可能降低性能

操作系统调度器通常很智能

需要考虑系统整体负载

跨平台兼容性问题